大家好,我是小郭,今天主要来和大家聊一聊RocketMQ中的线程池是如何创建的,如何设置线程池数量,同时也可以从中去学习到一些线程池的实践和需要注意的一些细节。
在RocketMQ中存在了大量的对线程池的使用,从消息的生产到投递Broker中,到最后的消息消费每一个环节中都大量使用到线程池的地方,下面我们拿出几个不同类型的线程池来看一看。
在 NameServer的路由注册和剔除中,多次使用到了定时线程池
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( "NSScheduledThread"));
// 定时任务 每10s扫描一次Broker,移除失活Broker this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); //定时任务,每隔30s向集群中所有NameServer发送心跳包 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
FixedThreadPool常用于创建一个固定大小的线程池,
它的特点就是核心线程数量与最大线程数量一致,采用无界的阻塞队列 LinkedBlockingQueue,并且没有设置队列的大小默认是Integer.MAX_VALUE,适用于负载较重的场景
private ExecutorService remotingExecutor; this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); // 用来设置接收到消息后的处理方法 this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
核心线程数与最大线程数设置均为 Runtime.getRuntime().availableProcessors() ,可用的计算资源
阻塞队列设置为一个初始化50000长度的阻塞队列
keepAliveTime设置60s,超过则时间空闲的线程将被终止
private final ExecutorService defaultAsyncSenderExecutor; private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue; this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000); this.defaultAsyncSenderExecutor = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1000 * 60, TimeUnit.MILLISECONDS, this.asyncSenderThreadPoolQueue, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet()); } });
我们重点来看一下消费端的线程池是如何创建,它可以说是整个RocketMQ中最关键的一个线程池
为了提高消费速度,我们通常有两种方式来提高消费并行度
在消息监听的时候,利用线程池进行不断的拉取消息
提交消费请求,消息提交到内部的线程池
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);
创建内部线程池,核心参数核心线程数和最大线程数,主要是根据配置来进行设置
设置线程池名称以 ConsumeMessageThread_ 开头的,利于排查问题
阻塞队列是一个无界的阻塞队列LinkedBlockingQueue
private final BlockingQueue<Runnable> consumeRequestQueue; this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); this.consumeExecutor = new ThreadPoolExecutor( this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl(consumeThreadPrefix));
通过RocketMQ的源码,我们看到 consumeExecutor 线程池的创建也是非常简单的
根据线程池的原理我们知道,只有阻塞队列为满的情况下,不会创建临时线程
所以线程池内部持有的队列为一个无界队列,导致 consumeThreadMax 大于 consumeThreadMin,线程个数最大也只能 consumeThreadMin 个线程数量
在正常的业务场景中,启动应用之后,我们就不会再修改消费者线程数,但有可能突发业务高峰导致消息堆积,这时候我们就需要调整单个 Consumer 的消费并行线程数。
JDK允许线程池使用方通过ThreadPoolExecutor的实例来动态设置线程池的核心策略
@Override public void updateCorePoolSize(int corePoolSize) { if (corePoolSize > 0 && corePoolSize <= Short.MAX_VALUE && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) { this.consumeExecutor.setCorePoolSize(corePoolSize); } }
这两种方式都存在一定的痛点
针对上面的痛点问题,我们可以考虑封装线程池动态参数调整,首先肯定原来代码是毫无侵入性的,
同时通过管理页面对不同消费者组的线程池进行管理自由的随着业务波动进行平滑修改,降低线程池参数修改的成本。