Loading... # java线程池初探 java提供了Executor接口,它是所有线程池的父接口,其中定义了execute()方法,也就是任务的执行方法。ExecutorService接口继承了Executor接口,对其进行了一些扩展,主要添加了线程池生命周期的管理。我们通常使用的线程池有两大类,其都为ExecutorService接口的实现,**ThreadPoolExecutor**与**ForkJoinPool**。 ![线程池继承关系.jpg][1] ## ThreadPoolExecutor 比较常用的线程池,结构为一个工作线程容器与一个执行队列。工作线程去执行队列中取任务执行。 ![threadpool.jpg][2] 其构造方法为: ``` ThreadPoolExecutor tpe = new ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler); ``` 该构造方法有7个参数,分别是: 1. corePoolSize: 核心线程数 2. maximumPoolSize:最大线程数 3. keepAliveTime: 存活时间 4. unit: 时间单位 5. workQueue: 执行队列 6. threadFactory: 线程工厂 7. handler: 拒绝策略 线程池处理第一个任务时,会创建一个处理线程,当有新的任务到来时,判断有没有空闲线程,如果没有空闲线程,但是核心线程数没达到,就直接启动一个核心线程去处理,如果核心线程也满了,就放入执行队列中等待。当既没有空闲核心线程,且执行队列也满了时,判断是否达到最大线程数,如果没达到,就创建一个普通线程去处理该任务。当没有空闲线程且达到了最大线程数,并且队列也满了时,进来新任务会触发拒绝策略。存活时间为当线程空闲超过存活时间时,线程会被回收,通常情况下核心线程是不会被回收的,被回收的只有普通线程。线程工厂负责创建线程,一般我们需要自定义一个工厂的实现,加上标记,方便异常追踪,jdk也提供了一个默认实现**Executors.defaultThreadFactory()**。java也提供了四种默认的拒绝策略。 - AbortPolicy: 直接抛出异常,并丢弃任务 - DiscardPolicy: 丢弃任务,不抛出异常 - DiscardOldestPolicy: 丢去排队时间最久的任务 - CallerRunsPolicy: 调用者处理任务,就是哪个线程调用的线程池execute()方法,哪个线程去处理 正式工作中,我们一般自己实现RejectedExecutionHandler接口,当触发拒绝策略时,记录日志,并将被拒绝的任务存入消息队列或reids中,有时间再重新处理,或者提升硬件算力,减少被拒绝的任务数量。 java提供了几种默认的线程池实现 ### SingleThreadExecutor ``` ExecutorService service = Executors.newSingleThreadExecutor(); ``` 构造方法 ``` public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } ``` 这是一个单线程线程池,他最多只能有一个线程处理任务,可以保证任务的执行顺序。 弊端:执行队列是一个无界队列,当并发过高时,造成OOM。 ### CachedThreadPool ``` ExecutorService service = Executors.newCachedThreadPool(); ``` 构造方法 ``` public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } ``` 可以看出,最大线程数是int的最大值2^31-1,执行队列是一个SynchronousQueue,容量是0,也就是每次来一个新的任务当没有空闲线程时,都会启一个新的,该线程池可以保证每个任务都有线程执行。 弊端:线程最大数量为int最大值,并发过多时,大量线程切换消耗资源,导致CPU和内存占满。 ### FixedThreadPool ``` ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum); ``` 构造方法 ``` public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } ``` 该线程池直接启动n个核心线程处理任务,避免线程过多造成资源损耗。 弊端:同SingleThreadExecutor。 ### ScheduledThreadPool ``` ScheduledExecutorService service = Executors.newScheduledThreadPool(4); ``` 构造方法 ``` public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); } ``` 它的执行队列是一个DelayedWorkQueue,可以根据时间来执行任务,是一个任务调度线程池。提供了很多好用的api,比如 ``` public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); ``` 每过多长时间,执行一次任务。 弊端:复杂的调度任务,通常会用Quartz甚至自己开发框架,简单的用Timer完全可以搞定,聊胜于无。 **总结**:java提供的线程池大多有弊端,根据阿里巴巴的规范,禁止用java默认的线程池实现,要自己new一个ThreadPoolExecutor实例,并且自定义线程工厂。 ### 执行 调用线程池提供了两种任务执行方法: #### execute(Runnable runnable) 该方法传入一个Runnable直接交给线程池去执行,没有返回值。 #### submit(Callable callable) Callable,对Runnable进行了扩展,实现它需要重写call方法 ``` V call() throws Exception; ``` 该方法对应Runnable的run方法,区别是这个方法会有一个返回值。 submit方法传入一个Callable会异步非阻塞执行,并且返回一个Future。 Future,未来,也就是将来要给我返回来的结果,线程会在调用`future.get()`的位置阻塞,直到Callable的结果返回。 对于Future还有一个很好用的实现FutureTask ``` public class FutureTask<V> implements RunnableFuture<V> ``` 可以看到,它实现了一个RunnableFuture接口,什么是RunnableFuture ``` public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); } ``` 可以看到RunnableFuture实现了Runnable和Future两个接口,也就是说FutureTask既是一个任务,又是一个Future。 ``` FutureTask<Integer> task = new FutureTask<>(()->{ TimeUnit.MILLISECONDS.sleep(500); return 1000; }); //new Callable () { Integer call();} new Thread(task).start(); System.out.println(task.get()); //阻塞 ``` Future管理:CompletableFuture CompletableFuture可以将多个任务并行执行,并且等待某几个的结果返回。 比如我要获取淘宝,天猫,京东的价格信息,直到所有的信息返回执行后续步骤 ``` CompletableFuture<Double> futureTB = CompletableFuture.supplyAsync(()->priceOfTB()); CompletableFuture<Double> futureTM = CompletableFuture.supplyAsync(()->priceOfTM()); CompletableFuture<Double> futureJD = CompletableFuture.supplyAsync(()->priceOfJD()); ``` 三个任务并行执行 ``` CompletableFuture.allOf(futureTM, futureTB, futureJD).join(); ``` allOf传入n个CompletableFuture,阻塞,直到这些任务全部执行完成。 ``` CompletableFuture.anyOf(futureTM, futureTB, futureJD).join(); ``` anyOf传入n个CompletableFuture,阻塞,直到其中任意一个任务执行完成。 CompletableFuture还提供了对返回结果的链式处理。 ``` CompletableFuture.supplyAsync(()->priceOfTM()) .thenApply(String::valueOf) .thenApply(str-> "price " + str) .thenAccept(System.out::println); ``` CompletableFuture是基于ForkJoinPool实现的。 ## ForkJoinPool 该线程池可以将一个任务拆分成多个小任务,每个小任务还可以继续差分,直到满足我们的执行条件为止,这个过程就是fork,当任务执行完成的时候,结果会向它的上一级节点汇总,直到汇总到根节点,合并成一个结果,这个过程就是join。 ![forkjoin.jpg][3] 它的结构是一个工作线程容器,然后每个线程都有一个自己的执行队列。 ![forkjoinpool.jpg][4] ForkJoinPool的execute()方法支持两种类型的参数,一种是**RecursiveAction**,该类型没有返回值,另外一种是**RecursiveTask<V>**,会有一个返回值。 示例 计算1000000个数字之和,每个线程最多计算50000个。 ``` public class Test { static int[] nums = new int[1000000]; static final int MAX_NUM = 50000; static Random r = new Random(); //初始化数组,插入随机数 static { for(int i=0; i<nums.length; i++) { nums[i] = r.nextInt(100); } } public static void main(String[] args) throws IOException { //无返回值 ForkJoinPool fjp = new ForkJoinPool(); AddAction task = new AddAction(0, nums.length); fjp.execute(task); //有返回值 ForkJoinPool fjp1 = new ForkJoinPool(); AddTask task1 = new AddTask(0, nums.length); fjp1.execute(task1); long result = task1.join(); System.out.println(result); } static class AddAction extends RecursiveAction { int start, end; AddAction (int s, int e) { start = s; end = e; } //该方法会递归调用,无限拆分,直到小于等于50000 @Override protected void compute() { //如果小于50000个,直接累加返回结果 if(end-start <= MAX_NUM) { long sum = 0L; for(int i=start; i<end; i++) sum += nums[i]; System.out.println("from:" + start + " to:" + end + " = " + sum); //否则,分成两个子任务执行 } else { int middle = start + (end-start)/2; AddAction subTask1 = new AddAction (start, middle); AddAction subTask2 = new AddAction (middle, end); subTask1.fork(); subTask2.fork(); } } } static class AddTask extends RecursiveTask<Long> { private static final long serialVersionUID = 1L; int start, end; AddTask (int s, int e) { start = s; end = e; } //该方法会递归调用,无限拆分,直到小于等于50000 @Override protected Long compute() { //如果小于50000个,直接累加返回结果 if(end-start <= MAX_NUM) { long sum = 0L; for(int i=start; i<end; i++) sum += nums[i]; return sum; } //否则,分成两个子任务执行 int middle = start + (end-start)/2; AddTask subTask1 = new AddTask (start, middle); AddTask subTask2 = new AddTask (middle, end); subTask1.fork(); subTask2.fork(); //返回两个任务的累加结果 return subTask1.join() + subTask2.join(); } } } ``` ForkJoinPool也提供了默认实现,比如**WorkStealingPool** 该线程池,每个线程都有自己的任务队列,当某个线程队列中的任务执行完成时,会去其他线程的队列中取,这样就避免了有的线程压力过大,而有的线程又很空闲的问题。 ![fjwprker.jpg][5] java的parallelStream(并行流)底层也是通过ForkJoinPool实现的,可以把一个任务切分成多个子任务。 ``` nums.parallelStream().forEach(); ``` 它的执行速度比普通的forEach要快很多。 [1]: https://www.princelei.club/usr/uploads/2020/03/3885033506.jpg [2]: https://www.princelei.club/usr/uploads/2020/03/1438939417.jpg [3]: https://www.princelei.club/usr/uploads/2020/03/246713941.jpg [4]: https://www.princelei.club/usr/uploads/2020/03/1149313145.jpg [5]: https://www.princelei.club/usr/uploads/2020/03/812488425.jpg Last modification:June 11th, 2020 at 06:07 pm © 允许规范转载