### System-computed MessageQueue routing

```java
SendResult send = producer.send(message, 60 * 1000);
```

#### Other routing algorithms for system-computed MessageQueue routing

```java
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }

            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }
    // Default strategy: take the send counter modulo the current list of
    // master brokers and route to the broker at that position.
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
```

### Custom MessageQueue routing

```java
SendResult send = producer.send(message, new MessageQueueSelector() {
    /**
     *
     * @param mqs list of queues on the master brokers returned by the name server
     * @param msg the current message
     * @param arg
     * @return
     */
    @Override
    public MessageQueue select(List
```
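One common use of a custom selector is to keep all messages that share a business key on the same queue. The following is a minimal standalone sketch of that idea, not RocketMQ code: `KeyHashQueuePicker` and `pick` are hypothetical names, and the queue list is a generic `List` so the sketch runs without the client library.

```java
import java.util.List;

// Hypothetical standalone helper for illustration; not part of the RocketMQ API.
final class KeyHashQueuePicker {
    private KeyHashQueuePicker() {}

    /**
     * Picks one queue for the given routing key.
     * Equal keys map to the same queue as long as the queue list is unchanged.
     */
    static <Q> Q pick(List<Q> queues, Object routingKey) {
        if (queues == null || queues.isEmpty()) {
            throw new IllegalArgumentException("queues must not be empty");
        }
        // floorMod keeps the index non-negative even when hashCode() is negative.
        int index = Math.floorMod(routingKey.hashCode(), queues.size());
        return queues.get(index);
    }
}
```

Inside a `MessageQueueSelector.select(mqs, msg, arg)` callback, the same logic would amount to returning `KeyHashQueuePicker.pick(mqs, arg)`, where `arg` is the extra object passed to `producer.send(message, selector, arg)`.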
![](https://s1.ax1x.com/2020/03/15/81Rgc8.png)

#### Load balancing strategies

![](https://s1.ax1x.com/2020/03/15/81WDrF.png)

1. AllocateMessageQueueAveragely

### When load balancing happens

```java
// RebalanceService
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        // Start allocating queues
        this.mqClientFactory.doRebalance();
    }

    log.info(this.getServiceName() + " service end");
}
```

Implementation:

```java
/**
 * consumerGroup: name of the consumer group
 * currentCID: ID of the current consumer instance (a random number)
 * mqAll: list of queue info for this topic
 * cidAll: list of all consumers in the consumer group
 */
@Override
public List
```
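To make the averaging idea concrete, here is a minimal standalone sketch of how queues could be split evenly across the consumers of a group. It is an illustration written for this note, not the RocketMQ source: the class name `AverageAllocationSketch` is hypothetical, and it assumes every consumer sees `mqAll` and `cidAll` in the same order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of average allocation; not the RocketMQ implementation.
final class AverageAllocationSketch {
    private AverageAllocationSketch() {}

    /**
     * Returns the contiguous slice of mqAll owned by currentCID.
     * When the queues do not divide evenly, the first (size % consumers)
     * consumers each receive one extra queue.
     */
    static <Q> List<Q> allocate(String currentCID, List<Q> mqAll, List<String> cidAll) {
        List<Q> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCID);
        if (index < 0 || mqAll.isEmpty()) {
            return result; // unknown consumer or nothing to allocate
        }
        int base = mqAll.size() / cidAll.size();   // queues every consumer gets
        int extra = mqAll.size() % cidAll.size();  // leftover queues to spread
        int size = base + (index < extra ? 1 : 0);
        // Consumers before this one took `base` each, plus one extra for
        // each of them that falls within the leftover range.
        int start = index * base + Math.min(index, extra);
        for (int i = 0; i < size; i++) {
            result.add(mqAll.get(start + i));
        }
        return result;
    }
}
```

With 8 queues and 3 consumers, this sketch gives the first two consumers 3 queues each and the last one 2. With more consumers than queues, the trailing consumers receive an empty list.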