ThreadPoolExecutor - 从JDK11源码理解Java线程池原理
在开发HearyHTTPd的过程中,为了有效利用多线程处理并发请求,我使用了Java的线程池机制。我查阅了JDK11中的线程池实现源码,本文对其原理进行进一步的梳理。
ThreadPoolExecutor - 从JDK11源码理解Java线程池原理
1 表层——Executors
JDK对外提供Executors
类的三个静态方法供调用,可以快速生成线程池:
1.1 newSingleThreadExecutor
1 | public static ExecutorService newSingleThreadExecutor() |
退化为只包含一个线程的“线程池”。
1.2 newFixedThreadPool
1 | public static ExecutorService newFixedThreadPool (int nThreads) |
包含固定数量线程的线程池。
1.3 newCachedThreadPool
1 | public static ExecutorService newCachedThreadPool() |
按需创建线程的线程池。
2 深一层——ThreadPoolExecutor
实质上,以上三个对外的静态方法,本质上都实例化了同一个类型,即:ThreadPoolExecutor
,该类继承自抽象类java.util.concurrent.AbstractExecutorService
,该抽象类实现了ExecutorService
接口,该接口又继承自Executor
接口。其中,ExecutorService
就是一般外部调用线程池实例的抽象接口。
ThreadPoolExecutor
提供构造函数:
1 | /** |
可以设置线程池的一系列参数:
- 核心线程池尺寸:线程池至少保有多少线程;
- 最大线程池尺寸:线程池最多能创建多少线程;
- 保活时间:线程数量如果超过核心线程数了,最多允许空闲多久,超过即终止线程;
- 时间单位:保活时间的时间单位,可以是纳秒、微秒、毫秒、秒、分钟、小时、天;
- 工作队列:提交给线程池的任务在执行前,会先放到工作队列中。
2.1 newSingleThreadExecutor
1 | /** |
newSingleThreadExecutor
具体设置参数为:
- 核心线程数和最大线程数都是1,这保证了线程池中有且只有一个线程。
- 因为只有1个线程,所以无所谓超时终止,因此保活时间为0。
- 提交给线程池的线程会放到一个
LinkedBlockingQueue
的实例中。- 这是一个默认无界的阻塞队列(可选有界以控制内存消耗)。
外面套了一层FinalizableDelegatedExecutorService
实际上是该Executors
类定义的一个内部静态类:
1 | private static class FinalizableDelegatedExecutorService |
- 只是实现了
finalize
方法,负责关闭线程池。
该类进一步继承自另一个内部静态类DelegatedExecutorService
,这是一个包装类,用于控制对外的提供的方法:
1 | /** |
2.2 newFixedThreadPool
1 | /** |
newFixedThreadPool
具体设置参数为:
- 核心线程数和最大线程数都是输入参数
nThreads
,这保证了线程池中有且只有nThreads
个线程。 - 因为只有
nThreads
个线程,所以无所谓超时终止,因此保活时间为0。 - 提交给线程池的线程会放到一个
LinkedBlockingQueue
的实例中。
除了可以设置多个线程,其他参数与上一个单线程的线程池非常相似。
2.3 newCachedThreadPool
1 | /** |
newCachedThreadPool
具体设置参数为:
- 核心线程数为0,意味着如果线程池闲置后,不会保留任何线程。
- 最大线程数为最大整数,意味着如果需要,线程数量可以变得非常大。
- 缓存线程池的保活时间是60秒,线程不会立即被销毁,空闲60内如果有新任务,可以直接复用空闲线程。
- 提交给线程池的线程会放到一个
SynchronousQueue
的实例中。
3 再深一层——从AbstractExecutorService到ThreadPoolExecutor
3.1 任务提交伊始——submit
外部使用线程池时,调用的是submit
方法。该方法在接口ExecutorService
中定义,在抽象类AbstractExecutorService
中实现。
具体地,在抽象类AbstractExecutorService
中,submit
的实现为:
1 | /** |
其中,FutureTask
负责统一把输入的无论是Runnable
还是Callable
都统一转换为FutureTask
实例,并以RunnableFuture
接口的抽象形式返回。
随后,调用execute(ftask)
方法来执行新提交的任务。该方法在抽象类的子类——ThreadPoolExecutor
中具体实现。
3.2 开始执行任务——execute
任务提交后需要执行起来,submit
中执行的方法execute(ftask)
在抽象类的子类——ThreadPoolExecutor.execute
中具体实现:
1 | /** |
可以看到,当工作线程较少,还不到核心线程数时,该方法会添加一个新线程,并把输入的Runnable command
交给新线程执行。
如果已经达到核心线程数,该方法进行了一系列谨慎的检查工作,并且把输入的Runnable command
加入到了workQueue
中,具体地:
检查线程池控制字判断线程池是否在工作,如果是,就把任务加入工作队列workQueue.offer(command)
:
- 如果任务能够顺利加入工作队列(true),那么会有工作线程去处理它。典型的工作队列
LinkedBlockingQueue
就属于这一类,这种情况下,线程数量不会超过corePoolSize
的核心线程数。- 当然,因为避免用重型锁,这里采用了CAS锁的形式,来避免加入任务后,线程池关闭、零线程的情况。
- 如果第二次检查发现线程池不运行了,就移除刚刚加入的任务,并
reject
(reject
方法会进一步调用RejectedExecutionHandler
实例的handler.rejectedExecution(command, this);
方法以便处理这种任务被线程池拒绝的情况); - 如果线程池还在运行,但是没有工作线程,就新建一个线程来处理工作队列中新加入的任务。
- 如果第二次检查发现线程池不运行了,就移除刚刚加入的任务,并
- 当然,因为避免用重型锁,这里采用了CAS锁的形式,来避免加入任务后,线程池关闭、零线程的情况。
- 如果任务不能顺利加入工作队列(false),那么就需要启动新的工作线程。典型的工作队列
SynchronousQueue
就属于这一类,如果没有线程阻塞在读取上,就无法插入新的任务,即会返回false。这么一来,就会启动新的线程,毕竟有阻塞在读取上的线程,才能加入新的任务。这意味着线程数量完全有可能超过corePoolSize
规定的核心线程数。
3.3 工作队列——BlockingQueue
工作队列是一个阻塞队列,用于解决生产者-消费者问题。也就是说,execute
扮演的是一个生产者的角色,它负责把检查过的任务加入到工作队列中,供线程池中的工作线程取出并执行。
在ThreadPoolExecutor
中,工作队列workQueue
是一个BlockingQueue<Runnable>
:
1 | /** |
3.4 添加工作线程——addWorker
在上述execute
方法中,要执行Runnable
任务,需要线程池中有工作线程,是通过调用addWorker
实现的。
具体地,线程池工作线程的创建和添加操作在ThreadPoolExecutor.addWorker
方法中具体实现:
1 | /** |
其中,会根据输入参数boolean core
来约束工作线程数的上限。如果core==true
,则线程数不超过corePoolSize
;否则线程数上限为maxPoolSize
。
3.5 线程池工作线程——runWorker
线程池中保有的工作线程作为消费者的一方,要从工作队列中取出任务并执行。
具体地,线程池工作线程的主循环在ThreadPoolExecutor.runWorker
方法中具体实现:
1 | /** |
开头task
初始值为firstTask
,是线程池的成员变量,用于引用初始任务,通常为空。因此,线程池的主线程作为workQueue
的消费者,通常情况下都是通过getTask()
方法来取出任务。
取出任务后,谨慎地进行线程池的状态检查,并在运行任务的前后,分别调用beforeExecute
和afterExecute
方法。这两个方法在ThreadPoolExecutor
中实现内容为空,也就是说不做任何事情。这两个方法是预留的,可以被继承实现,以增加额外的检查和功能(如:记录日志)。
运行任务显得很简单,线程池的工作线程执行Runnable
任务实例的task,run()
方法即可。
3.6 从工作队列中取任务——getTask
ThreadPoolExecutor.getTask()
的实现:
1 | /** |
可以看到,根据timed
变量,决定是限时等待型读取或是阻塞型读取。
- 如果
timed==true
,则在主循环中限时等待提取工作队列中的任务,即workQueue.poll
方法:- 如果工作队列中有任务,则立即返回;
- 如果没有,则等待指定时间,如果仍然没有,则返回
null
。 - (Retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for an element to become available.)
- 如果
timed==false
,则在主循环中阻塞式提取工作队列中任务,即workQueue.take
方法:- 如果工作队列中有任务,则立即返回;
- 如果没有,则线程阻塞,直到生产者加入任务后,有任务实例再返回。
4 J.U.C阻塞队列
上述“3.2
开始执行任务——execute”中解释了LinkedBlockingQueue与SynchronousQueue为什么会分别用于不同的ThreadPoolExecutor。尤其是同步队列,CachedThreadPool
依靠其插入失败就可以检测到没有数量匹配的读线程,由此增加线程池的线程数。
4.1 无容量的同步队列——SynchronousQueue
这个容器就比较特别了,虽然名字是队列,但实际上没有任何容量。
A blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. The head of the queue is the element that the first queued inserting thread is trying to add to the queue; if there is no such queued thread then no element is available for removal and poll() will return null. For purposes of other Collection methods (for example contains), a SynchronousQueue acts as an empty collection. This queue does not permit null elements.
使用该容器时,必须先取再插。也就是说,对于一个同步队列,如果没有任何线程在读取它,别的线程就无法对其插入新数据。通常,需要先启动一个线程读取同步队列,此时同步队列尚无数据,则该读线程会处于阻塞等待地状态。随后,启动一个线程向同步队列中插入数据,此时,阻塞等待数据的读线程会唤醒并读取插入数据。
因为其插入时必须要有读线程的特性,该容器被应用于检测读线程少于插入任务数量的情况,引导线程池增加新线程。
4.2 链表阻塞队列——LinkedBlockingQueue
不难理解,这是一个基于链表实现的阻塞队列。
An optionally-bounded blocking queue based on linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue. Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.
既然是链表实现,那一般理解是可以无界的,当然也可以指定大小限定为有界。
链表阻塞队列采用FIFO模式,队列头是最早插入的,队列尾是最新插入的,取出时依据FIFO顺序。
基于链表阻塞队列在并发应用中吞吐量通常比基于数组的阻塞队列更大,因为基于链表的阻塞队列不至于同步锁定整个数组容器。基于链表的阻塞队列实际上在读写时,锁定入队和出队的位置就可以了。
4.3 数组阻塞队列——ArrayBlockingQueue
不难理解,这是一个基于数组实现的阻塞队列。
A bounded blocking queue backed by an array. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue. This is a classic "bounded buffer", in which a fixed-sized array holds elements inserted by producers and extracted by consumers. Once created, the capacity cannot be changed. Attempts to put an element into a full queue will result in the operation blocking; attempts to take an element from an empty queue will similarly block.
This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. However, a queue constructed with fairness set to true grants threads access in FIFO order. Fairness generally decreases throughput but reduces variability and avoids starvation.
既然是基于数组实现的,那容器肯定是有界的,创建时就确定的,无法动态变化。这样一来,如果数组中存满了,再插入新的数据就需要阻塞至有元素被取走,同样地,如果数组中没有元素,读取操作需要阻塞至有元素被插入。
另外,数组阻塞队列还存在一个公平策略,如果严格要求保障FIFO的出入队列顺序,需要启用公平策略,这样可以避免饥饿问题(因为否则的话,可能有些元素长时间都不会被轮到取出来),但是也会减少吞吐量。