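ThreadPoolExecutor: Understanding How Java Thread Pools Work from the JDK 11 Source

While developing HearyHTTPd, I used Java's thread pool mechanism to handle concurrent requests efficiently across multiple threads. I read through the thread pool implementation in the JDK 11 source, and this article walks through how it works.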

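1 The surface: Executors

The JDK's Executors class exposes static factory methods for creating a thread pool quickly. This article looks at three of them: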

1.1 newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor()

Degenerates into a "thread pool" that contains only one thread.

1.2 newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads)

A thread pool with a fixed number of threads.

1.3 newCachedThreadPool

public static ExecutorService newCachedThreadPool()

A thread pool that creates threads on demand.
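As a minimal usage sketch, the three factory methods can be exercised through the common ExecutorService interface. The class name FactoryMethodsDemo and the helper runOn are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class FactoryMethodsDemo {
    // Runs one trivial task on the given pool, returns its result, and shuts the pool down.
    static int runOn(ExecutorService pool) {
        Callable<Integer> task = () -> 6 * 7;
        try {
            return pool.submit(task).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // always release the pool's worker threads
        }
    }

    public static void main(String[] args) {
        System.out.println(runOn(Executors.newSingleThreadExecutor())); // 42
        System.out.println(runOn(Executors.newFixedThreadPool(4)));     // 42
        System.out.println(runOn(Executors.newCachedThreadPool()));     // 42
    }
}
```

The calling code is identical for all three pools; only the threading behaviour behind the interface differs.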

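2 One level deeper: ThreadPoolExecutor

All three static factory methods above ultimately instantiate the same type, ThreadPoolExecutor. This class extends the abstract class java.util.concurrent.AbstractExecutorService, which implements the ExecutorService interface, which in turn extends the Executor interface. ExecutorService is the interface through which external code normally uses a thread pool instance.

Among its constructors, ThreadPoolExecutor provides this one: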

/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters, the default thread factory and the default rejected
* execution handler.
*
* <p>It may be more convenient to use one of the {@link Executors}
* factory methods instead of this general purpose constructor.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue} is null
*/
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

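The constructor sets a series of thread pool parameters:

  • Core pool size (corePoolSize): the number of threads kept in the pool even when they are idle, unless allowCoreThreadTimeOut is set;
  • Maximum pool size (maximumPoolSize): the maximum number of threads the pool may create;
  • Keep-alive time (keepAliveTime): when there are more threads than the core size, how long an excess thread may stay idle before it is terminated;
  • Time unit (unit): the unit of the keep-alive time, one of nanoseconds, microseconds, milliseconds, seconds, minutes, hours, or days;
  • Work queue (workQueue): holds tasks submitted to the pool before they are executed.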
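To make the five parameters concrete, here is a small sketch that calls the constructor directly. The class name ConstructorDemo and the chosen values (2, 4, 30 seconds, a queue bounded to 100 tasks) are arbitrary assumptions for illustration, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the parameter values are illustrative only.
class ConstructorDemo {
    static ThreadPoolExecutor newBoundedPool() {
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100); // bounded queue
        return new ThreadPoolExecutor(
                2,                 // corePoolSize
                4,                 // maximumPoolSize
                30L,               // keepAliveTime
                TimeUnit.SECONDS,  // unit
                workQueue);        // workQueue
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newBoundedPool();
        System.out.println(pool.getCorePoolSize());                  // 2
        System.out.println(pool.getMaximumPoolSize());               // 4
        System.out.println(pool.getKeepAliveTime(TimeUnit.SECONDS)); // 30
        System.out.println(pool.getPoolSize()); // 0: threads are only started when tasks arrive
        pool.shutdown();
    }
}
```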

2.1 newSingleThreadExecutor

/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

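newSingleThreadExecutor sets the parameters as follows:

  • Both the core pool size and the maximum pool size are 1, so the pool never runs more than one thread.
  • The keep-alive time only applies to threads beyond the core size; with a single thread there are none, so it is set to 0.
  • Tasks submitted to the pool are placed in a LinkedBlockingQueue instance.
    • By default this blocking queue is unbounded (it can optionally be given a capacity to limit memory use).

The FinalizableDelegatedExecutorService wrapped around the pool is a private static nested class defined in Executors: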

private static class FinalizableDelegatedExecutorService
        extends DelegatedExecutorService {
    FinalizableDelegatedExecutorService(ExecutorService executor) {
        super(executor);
    }
    @SuppressWarnings("deprecation")
    protected void finalize() {
        super.shutdown();
    }
}

  • It only overrides finalize, which shuts the thread pool down.

This class in turn extends another static nested class, DelegatedExecutorService, a wrapper that restricts which methods are exposed to callers:

/**
* A wrapper class that exposes only the ExecutorService methods
* of an ExecutorService implementation.
*/
private static class DelegatedExecutorService
        implements ExecutorService {
    // ...
}
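One observable effect of the wrapper can be sketched as follows: the executor returned by newFixedThreadPool(1) can be cast to ThreadPoolExecutor and reconfigured, while the one returned by newSingleThreadExecutor() cannot, because only the ExecutorService methods are exposed. DelegationDemo and isReconfigurable are hypothetical names, and the check for the fixed pool relies on the factory returning a bare ThreadPoolExecutor as in the JDK 11 source quoted in this article.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class, not part of the JDK.
class DelegationDemo {
    // True if the pool is a ThreadPoolExecutor, whose setters (for example
    // setMaximumPoolSize) would then be reachable through a cast.
    static boolean isReconfigurable(ExecutorService pool) {
        try {
            return pool instanceof ThreadPoolExecutor;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(isReconfigurable(Executors.newFixedThreadPool(1)));     // true
        System.out.println(isReconfigurable(Executors.newSingleThreadExecutor())); // false
    }
}
```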

2.2 newFixedThreadPool

/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

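newFixedThreadPool sets the parameters as follows:

  • Both the core pool size and the maximum pool size equal the argument nThreads, so the pool never runs more than nThreads threads.
  • With no threads beyond the core size, the idle timeout never applies, so the keep-alive time is 0.
  • Tasks submitted to the pool are placed in a LinkedBlockingQueue instance.

Apart from allowing more than one thread, the parameters closely match those of the single-thread pool above.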
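The following sketch illustrates the cap: when more tasks are submitted than there are threads, the surplus waits in the queue. FixedPoolDemo and snapshot are hypothetical names, and the cast to ThreadPoolExecutor assumes the factory returns the bare class, as in the JDK 11 source shown above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class, not part of the JDK.
class FixedPoolDemo {
    // Submits `tasks` blocking tasks and returns {pool size, queued tasks}
    // observed while every task is still pending.
    static int[] snapshot(int nThreads, int tasks) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] s = snapshot(2, 5);
        System.out.println(s[0] + " threads, " + s[1] + " queued"); // 2 threads, 3 queued
    }
}
```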

2.3 newCachedThreadPool

/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

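newCachedThreadPool sets the parameters as follows:

  • The core pool size is 0, so once the pool has gone idle it retains no threads.
  • The maximum pool size is Integer.MAX_VALUE, so the number of threads can grow very large if needed.
  • The keep-alive time is 60 seconds: an idle thread is not destroyed immediately, and a new task arriving within those 60 seconds can reuse it.
  • Submitted tasks go through a SynchronousQueue instance, which hands each task directly to a thread rather than storing it.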
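A short sketch of the on-demand growth: if every submitted task blocks, no idle thread is ever available for reuse, so each submission starts a new thread. CachedPoolDemo and poolSizeUnderLoad are hypothetical names, and the cast again assumes the factory returns a bare ThreadPoolExecutor as in the source above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class, not part of the JDK.
class CachedPoolDemo {
    // Submits `tasks` tasks that all block at once and returns the resulting pool size.
    static int poolSizeUnderLoad(int tasks) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // no worker becomes idle while we submit
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeUnderLoad(8)); // 8: one thread per concurrently blocked task
    }
}
```

This is also why an unbounded cached pool deserves care under bursty load: nothing in the pool itself limits the thread count.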

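3 Deeper still: from AbstractExecutorService to ThreadPoolExecutor

3.1 Where task submission begins: submit

External code typically hands tasks to a thread pool through the submit method. It is declared in the ExecutorService interface and implemented in the abstract class AbstractExecutorService.

Specifically, AbstractExecutorService implements submit as follows: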

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

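Here newTaskFor wraps the input, whether a Runnable or a Callable, in a FutureTask instance and returns it through the RunnableFuture interface.

submit then calls execute(ftask) to run the newly submitted task. That method is implemented in the abstract class's subclass, ThreadPoolExecutor.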
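From the caller's side, the effect of this wrapping can be sketched as below: every submit call returns a Future, a Runnable yields a null result, and an exception thrown by the task surfaces as an ExecutionException from get(). SubmitDemo and describe are hypothetical names used only for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the JDK.
class SubmitDemo {
    static String describe() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Runnable noop = () -> { };
            Callable<String> ok = () -> "done";
            Callable<String> bad = () -> { throw new IllegalStateException("boom"); };

            Future<?> f1 = pool.submit(noop);
            Future<String> f2 = pool.submit(ok);
            Future<String> f3 = pool.submit(bad);

            String failure;
            try {
                f3.get();
                failure = "none";
            } catch (ExecutionException e) {
                failure = e.getCause().getMessage(); // the task's own exception
            }
            return f1.get() + "," + f2.get() + "," + failure;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe()); // null,done,boom
    }
}
```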

3.2 Starting to run the task: execute

Once submitted, a task has to be run. The execute(ftask) call made in submit is implemented in ThreadPoolExecutor.execute:

/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@link RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task. The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread. If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

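As the code shows, when there are fewer worker threads than the core pool size, the method adds a new thread and hands it the incoming Runnable command to run.

Once the core pool size has been reached, the method performs a series of careful checks and tries to add the incoming Runnable command to workQueue. Specifically:

It checks the pool control word ctl to see whether the pool is running and, if so, tries to enqueue the task with workQueue.offer(command).

  • If the task is enqueued successfully (true), a worker thread will pick it up. The unbounded LinkedBlockingQueue is the typical queue of this kind; with it, the number of threads never exceeds corePoolSize.
    • To avoid a heavyweight lock, execute re-reads the atomic ctl value after enqueueing instead of locking. This covers the cases where the pool was shut down, or lost all of its threads, after the task was added.
      • If the second check finds that the pool is no longer running, it removes the task just added and calls reject (which in turn calls handler.rejectedExecution(command, this) on the RejectedExecutionHandler instance to deal with a task the pool has refused);
      • If the pool is still running but has no worker threads, it creates a new thread to handle the task just added to the work queue.
  • If the task cannot be enqueued (false), a new worker thread has to be started. SynchronousQueue is the typical queue of this kind: unless some thread is blocked waiting to take from it, no task can be inserted and offer returns false. A new thread is then started to run the task directly. This means the number of threads can well exceed corePoolSize, up to maximumPoolSize; if that limit has been reached as well, the task is rejected.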
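The three steps can be observed from outside with a deliberately tiny pool. The sketch below assumes corePoolSize 1, maximumPoolSize 2 and a queue of capacity 1, and uses a custom RejectedExecutionHandler that only counts rejections; ExecuteStepsDemo and run are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
class ExecuteStepsDemo {
    // Returns {pool size, queue size, rejected count} after four blocking tasks.
    static int[] run() {
        AtomicInteger rejected = new AtomicInteger();
        RejectedExecutionHandler countRejections =
                (task, executor) -> rejected.incrementAndGet();
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, queue, countRejections);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocking); // step 1: below corePoolSize, starts a core thread
            pool.execute(blocking); // step 2: core is full, task is queued
            pool.execute(blocking); // step 3: queue is full, starts a second thread
            pool.execute(blocking); // queue and pool are both full, task is rejected
            return new int[] { pool.getPoolSize(), queue.size(), rejected.get() };
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " threads, " + r[1] + " queued, " + r[2] + " rejected");
        // 2 threads, 1 queued, 1 rejected
    }
}
```

Note the ordering this implies: a bounded queue fills up before the pool grows past its core size, which is sometimes the opposite of what people expect.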

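3.3 The work queue: BlockingQueue

The work queue is a blocking queue, the classic tool for the producer-consumer problem. execute plays the producer: it puts checked tasks into the work queue, and the pool's worker threads take them out and run them.

In ThreadPoolExecutor, the work queue workQueue is a BlockingQueue<Runnable>: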

/**
* The queue used for holding tasks and handing off to worker
* threads. We do not require that workQueue.poll() returning
* null necessarily means that workQueue.isEmpty(), so rely
* solely on isEmpty to see if the queue is empty (which we must
* do for example when deciding whether to transition from
* SHUTDOWN to TIDYING). This accommodates special-purpose
* queues such as DelayQueues for which poll() is allowed to
* return null even if it may later return non-null when delays
* expire.
*/
private final BlockingQueue<Runnable> workQueue;
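The producer-consumer pattern itself can be sketched without a thread pool. In the example below, which is an independent illustration rather than code from the JDK, one producer puts integers into a small bounded queue and one consumer takes and sums them; ProducerConsumerDemo and sumThroughQueue are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the JDK.
class ProducerConsumerDemo {
    static int sumThroughQueue(int n) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2); // tiny on purpose
        int[] sum = new int[1];
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    sum[0] += queue.take(); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (int i = 1; i <= n; i++) {
                queue.put(i); // blocks while the queue is full
            }
            consumer.join(); // join makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(sumThroughQueue(10)); // 55
    }
}
```

The sketch uses the blocking put on the producer side for simplicity; execute, as shown earlier, uses the non-blocking offer so that a full queue can trigger thread creation or rejection instead of stalling the caller.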

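3.4 Adding worker threads: addWorker

To run a Runnable task, the execute method above needs worker threads in the pool; it obtains them by calling addWorker.

Worker threads are created and added to the pool in ThreadPoolExecutor.addWorker: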

/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;

        for (;;) {
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get(); // Re-read ctl
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();

                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    workerAdded = true;
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

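The boolean core parameter selects the upper bound on the number of worker threads: if core == true, the thread count may not exceed corePoolSize; otherwise the bound is maximumPoolSize.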
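The first half of addWorker depends on how ctl packs two values into one int. The sketch below reconstructs that layout from the JDK source as I understand it (run state in the high 3 bits, worker count in the low 29 bits); these constants are not shown in the excerpts in this article, so treat them as an illustrative assumption. The CAS loop is a simplified version that only admits workers while the pool is RUNNING, whereas the real method also allows some additions during SHUTDOWN. CtlSketch and tryAddWorker are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; a simplified model, not the JDK implementation.
class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29 low bits hold the worker count
    static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
    static final int RUNNING    = -1 << COUNT_BITS;      // run state lives in the high 3 bits
    static final int SHUTDOWN   =  0 << COUNT_BITS;

    static int ctlOf(int runState, int workerCount) { return runState | workerCount; }
    static int workerCountOf(int c) { return c & COUNT_MASK; }
    static boolean isRunning(int c) { return c < SHUTDOWN; }

    // Lock-free reservation of one worker slot, bounded by `bound`.
    static boolean tryAddWorker(AtomicInteger ctl, int bound) {
        for (;;) {
            int c = ctl.get();
            if (!isRunning(c) || workerCountOf(c) >= (bound & COUNT_MASK))
                return false;
            if (ctl.compareAndSet(c, c + 1))
                return true;
            // CAS lost a race with another thread; re-read ctl and retry
        }
    }

    public static void main(String[] args) {
        AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        System.out.println(tryAddWorker(ctl, 2));     // true
        System.out.println(tryAddWorker(ctl, 2));     // true
        System.out.println(tryAddWorker(ctl, 2));     // false: bound reached
        System.out.println(workerCountOf(ctl.get())); // 2
    }
}
```

Packing both values into one atomic int is what lets a single compare-and-set check the pool state and reserve a slot at the same time.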

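3.5 The pool's worker threads: runWorker

The worker threads kept by the pool act as consumers: they take tasks from the work queue and run them.

The main loop of a worker thread is implemented in ThreadPoolExecutor.runWorker: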

/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                try {
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

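At the start, task is initialized from firstTask, a field of the Worker (not of the pool) that refers to the task the worker was created to run. It is null when the worker was started without one, for example through addWorker(null, false). After that first task, the worker thread, as the consumer of workQueue, obtains every further task through getTask().

Once it has a task, the worker carefully checks the pool state, and calls beforeExecute and afterExecute immediately before and after running the task. Both methods have empty bodies in ThreadPoolExecutor, so by default they do nothing. They are hooks that subclasses can override to add extra checks or features (for example, logging).

Running the task itself is simple: the worker thread just calls task.run() on the Runnable instance.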
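As an illustration of the two hooks, the sketch below subclasses ThreadPoolExecutor and counts started and failed tasks. CountingPool and its fields are hypothetical names. Note that the failing task is passed to execute, not submit: with submit the exception would be captured inside the FutureTask and afterExecute would receive null.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass, not part of the JDK.
class CountingPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    CountingPool(int nThreads) {
        super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null) failed.incrementAndGet(); // t is the exception thrown by r.run()
    }

    // Runs one succeeding and one failing task; returns {started, failed}.
    static int[] demo() {
        CountingPool pool = new CountingPool(1);
        pool.execute(() -> { });
        // The uncaught exception below is also reported on stderr by the worker thread.
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { pool.started.get(), pool.failed.get() };
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " started, " + r[1] + " failed"); // 2 started, 1 failed
    }
}
```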

3.6 Taking tasks from the work queue: getTask

ThreadPoolExecutor.getTask() is implemented as follows:

/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();

        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

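As the code shows, the timed variable decides between a timed wait and a blocking read.

  • If timed == true, the loop takes a task from the work queue with a timed wait, using workQueue.poll:
    • if the work queue holds a task, it is returned immediately;
    • if not, the call waits for the given time and returns null if there is still no task.
    • (Retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for an element to become available.)
  • If timed == false, the loop takes a task with a blocking read, using workQueue.take:
    • if the work queue holds a task, it is returned immediately;
    • if not, the thread blocks until a producer adds a task, and then returns it.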
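The timed branch is what lets idle threads disappear. The sketch below makes it observable by enabling allowCoreThreadTimeOut on a one-thread pool with a short keep-alive time, so that timed is true even for the core thread; after the only task finishes, the idle worker's poll times out and the pool shrinks to zero. KeepAliveDemo and shrinksToZero are hypothetical names, and the 50 ms keep-alive and 5 s polling deadline are arbitrary choices.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class KeepAliveDemo {
    // True if the pool had one busy worker and dropped to zero workers once idle.
    static boolean shrinksToZero() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(true); // core threads now use the timed poll as well
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            int busy = pool.getPoolSize(); // the worker is still running its task
            release.countDown();

            // Wait for the idle worker to time out in getTask and exit.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return busy == 1 && pool.getPoolSize() == 0;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(shrinksToZero()); // true
    }
}
```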

4 J.U.C blocking queues

4.1 A synchronous queue with no capacity: SynchronousQueue

This container is unusual: although it is called a queue, it has no capacity at all.

A blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. The head of the queue is the element that the first queued inserting thread is trying to add to the queue; if there is no such queued thread then no element is available for removal and poll() will return null. For purposes of other Collection methods (for example contains), a SynchronousQueue acts as an empty collection. This queue does not permit null elements.
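The behaviour described in that passage can be sketched in a few lines: offer fails when no consumer is waiting, and put completes only by handing the element to a thread that takes it. SynchronousQueueDemo and its two methods are hypothetical names for this illustration.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the JDK.
class SynchronousQueueDemo {
    // With no thread waiting to take, a non-blocking insert cannot succeed.
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("task");
    }

    // Hands `item` from this thread to a consumer thread and returns what it received.
    static String handOff(String item) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // waits for a producer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.put(item); // blocks until the consumer takes the item
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(offerWithoutConsumer()); // false
        System.out.println(handOff("task"));        // task
    }
}
```

The false result from offer is exactly what pushes execute into starting a new thread in newCachedThreadPool.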
