线程池的学习和使用

软件发布|下载排行|最新软件

当前位置:首页IT学院IT技术

线程池的学习和使用

Ronaldo7   2020-03-06 我要评论
## 什么是线程池 线程池的作用是初始化一些线程,当有任务的时候,就从中启动一个来执行相关任务,执行完后,线程资源重新回收到线程池中,达到复用的效果,从而减少资源的开销 ## 创建线程池 在JDK中,`Executors`类已经帮我们封装了创建线程池的方法。 ```java Executors.newFixedThreadPool(); Executors.newCachedThreadPool(); Executors.newScheduledThreadPool(); ``` 但是点进去看的话, ```java public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } ``` 它的内部实现还是基于`ThreadPoolExecutor`来实现的。通过阿里代码规范插件扫描会提示我们用`ThreadPoolExecutor`去实现线程池。通过查看`ThreadPoolExecutor`的构造方法 ```java public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ... do something ... } ``` 我觉得有以下几方面的原因。 1. 可以灵活设置`keepAliveTime`(当线程池中线程数大于`corePoolSize`的数m, 为这m个线程设置的最长等待时间 ),节约系统资源。 2. `workQueue`:线程等待队列,在`Executors`中默认的是`LinkedBlockingQueue`。可以理解是一种无界的数组,当有不断有线程来的时候,可能会撑爆机器内存。 3. 可以设线程工厂,里面添加自己想要的一些元素,只需要实现JDK的`ThreadFactory`类。 4. 按照自己的业务设置合适的拒绝策略。策略有以下几种 1. AbortPolicy:直接抛出拒绝异常(继承自RuntimeException),会中断调用者的处理过程,所以除非有明确需求,一般不推荐 2. DiscardPolicy:默默丢弃无法加载的任务。 3. DiscardOldestPolicy:丢弃队列中最老的,然后再次尝试提交新任务。 4. CallerRunsPolicy:在调用者线程中(也就是说谁把 r 这个任务甩来的),运行当前被丢弃的任务。只会用调用者所在线程来运行任务,也就是说任务不会进入线程池。如果线程池已经被关闭,则直接丢弃该任务。 ## 使用线程池 ### 声明`ThreadFactory` ```java public class NacosSyncThreadFactory implements ThreadFactory { private final AtomicInteger threadNum = new AtomicInteger(1); private String threadPrefix = null; private ThreadGroup threadGroup; public NacosSyncThreadFactory(String prefix) { this.threadPrefix = "thread" + "-" + prefix + "-" ; threadGroup = Thread.currentThread().getThreadGroup(); } public NacosSyncThreadFactory() { this("pool"); } @Override public Thread newThread(Runnable r) { String name = threadPrefix + threadNum.incrementAndGet(); Thread thread = new Thread(threadGroup, r, name); return thread; } } ``` ### 创建线程池类 ```java public class MyThreadPool { private ThreadFactory threadFactory; private int threadNum; private BlockingQueue blockingQueue; private RejectedExecutionHandler handler; public MyThreadPool(ThreadFactory threadFactory, int threadNum, BlockingQueue blockingQueue, RejectedExecutionHandler handler ) { this.threadFactory = threadFactory; this.threadNum = threadNum; this.blockingQueue = blockingQueue; this.handler = handler; } public MyThreadPool() { this(Executors.defaultThreadFactory(), 10, new ArrayBlockingQueue(10), new ThreadPoolExecutor.AbortPolicy()); } public ThreadPoolExecutor initThreadPool(ThreadFactory threadFactory, int threadNum, BlockingQueue blockingQueue, RejectedExecutionHandler handler) { if (handler == null) { handler = new ThreadPoolExecutor.AbortPolicy(); } return new ThreadPoolExecutor(1, threadNum, 5, TimeUnit.SECONDS, blockingQueue, threadFactory, handler); } } ``` ### 调用线程池 1. 初始化线程池类 ```java MyThreadPool myThreadPool = new MyThreadPool(); threadPoolExecutor = myThreadPool.initThreadPool( new NacosSyncThreadFactory("nacos-sync"), threadNum, new ArrayBlockingQueue(10), new ThreadPoolExecutor.DiscardPolicy() ); } ``` 2. 创建Callable(FutureTask) ```java /** * 分页获取task信息 * @return */ private List getTask(int pageNum) { IPage page = new Page(pageNum, 25); IPage taskIPage = this.taskService.page(page); if (null == taskIPage || CollectionUtils.isEmpty(taskIPage.getRecords())) { return null; } return taskIPage.getRecords(); } // 执行任务 private FutureTask assembleTaskFuture(Task task) { FutureTask futureTask = new FutureTask(() -> { // 执行任务 this.doSyncWork(task); return "success"; }); return futureTask; } ``` 3. 执行任务(FutureTask) ```java public void zkSync() { // 获取数据总数,得到线程数 int count = this.taskService.count(); int pageSize = 25; int num = count / pageSize; int pageTotal = count % pageSize == 0 ? num : num + 1; log.info("========总记录数:{}=====总页数:{}", count, pageTotal); for (int i = 1; i <= pageTotal; i++) { List taskList = this.getTask(i); if (CollectionUtils.isEmpty(taskList)) { break; } List collect = taskList.stream().map(task -> task.getId()).collect(Collectors.toList()); taskList.forEach(task -> { FutureTask futureTask = this.assembleTaskFuture(task); threadPoolExecutor.execute(futureTask); }); } threadPoolExecutor.shutdown(); } ```

Copyright 2022 版权所有 软件发布 访问手机版

声明:所有软件和文章来自软件开发商或者作者 如有异议 请与本站联系 联系我们