RocketMQ-2.RocketMQ的负载均衡

软件发布|下载排行|最新软件

当前位置:首页IT学院IT技术

RocketMQ-2.RocketMQ的负载均衡

Ronaldo7   2020-03-15 我要评论
[toc] # RocketMQ的负载均衡 ### producer对`MessageQueue`的负载均衡 ![](https://s1.ax1x.com/2020/03/14/8lOWV0.jpg) 通过调试代码可以知道,所谓的`MessageQueue`就是broker上的队列信息,每个topic在创建的时候可以指定相应的queue的数量。**也就是说,一个topic的消息存储在多个主broker中** ## producer负载均衡 producer端的负载均衡主要是在选择对应的broker。在producer发送消息的时候会对消息进行路由,看到底是路由到哪个broker。下面主要说下以下两种发送消息的方法:`系统计算路由MessageQueue`,`自定义路由MessageQueue`。
### 系统计算路由MessageQueue ```java SendResult send = producer.send(message, 60 * 1000); ``` #### 系统计算路由MessageQueue的其他路由算法 ```java public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { if (this.sendLatencyFaultEnable) { try { int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } // 默认策略(路由到当前的broker主节点列表取模后的broker中) return tpInfo.selectOneMessageQueue(lastBrokerName); } ``` ### 自定义路由MessageQueue ```java SendResult send = producer.send(message, new MessageQueueSelector() { /** * * @param mqs 通过name server返回的broker主节点列表 * @param msg 当前消息 * @param arg * @return */ @Override public MessageQueue select(List mqs, Message msg, Object arg) { int size = mqs.size(); long timeMillis = System.currentTimeMillis(); return mqs.get((int)timeMillis % size); } }, 60 * 1000); ``` ## Consumer的负载均衡 ### 消费端设置负责均衡策略 在`consumer.statrt()`中,consumer会对所订阅的topic上的messagequeue做负载均衡`DefaultConsumerPushImpl.start()`下的` this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); `, 默认返回的是`AllocateMessageQueueAveragely`
![](https://s1.ax1x.com/2020/03/15/81Rgc8.png) #### 负责均衡策略 ![](https://s1.ax1x.com/2020/03/15/81WDrF.png) 1. AllocateMessageQueueAveragely ### 负载均衡的时机 ```java // RebalanceService @Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { this.waitForRunning(waitInterval); // 开始进行分配 this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end"); } ``` 具体实现 ```java /** consumerGroup : 消费组名称 currentCID:当前消费者实例Id(随机数) mqAll: 该topic对应的queue的信息列表 cidAll: 消费组中所有的消费者列表 */ @Override public List allocate(String consumerGroup, String currentCID, List mqAll, List cidAll) { if (currentCID == null || currentCID.length() < 1) { throw new IllegalArgumentException("currentCID is empty"); } if (mqAll == null || mqAll.isEmpty()) { throw new IllegalArgumentException("mqAll is null or mqAll empty"); } if (cidAll == null || cidAll.isEmpty()) { throw new IllegalArgumentException("cidAll is null or cidAll empty"); } List result = new ArrayList(); if (!cidAll.contains(currentCID)) { log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", consumerGroup, currentCID, cidAll); return result; } int index = cidAll.indexOf(currentCID); int mod = mqAll.size() % cidAll.size(); int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size()); int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; int range = Math.min(averageSize, mqAll.size() - startIndex); for (int i = 0; i < range; i++) { result.add(mqAll.get((startIndex + i) % mqAll.size())); } return result; } ```

Copyright 2022 版权所有 软件发布 访问手机版

声明:所有软件和文章来自软件开发商或者作者 如有异议 请与本站联系 联系我们