ThreadPoolExecutor - 从JDK11源码理解Java线程池原理

在开发HearyHTTPd的过程中,为了有效利用多线程处理并发请求,我使用了Java的线程池机制。我查阅了JDK11中的线程池实现源码,本文对其原理进行进一步的梳理。

ThreadPoolExecutor - 从JDK11源码理解Java线程池原理

1 表层——Executors

JDK对外提供Executors类的三个静态方法供调用,可以快速生成线程池:

1.1 newSingleThreadExecutor

1
public static ExecutorService newSingleThreadExecutor()

退化为只包含一个线程的“线程池”。

1.2 newFixedThreadPool

1
public static ExecutorService newFixedThreadPool (int nThreads)

包含固定数量线程的线程池。

1.3 newCachedThreadPool

1
public static ExecutorService newCachedThreadPool()

按需创建线程的线程池。

2 深一层——ThreadPoolExecutor

实质上,以上三个对外的静态方法,本质上都实例化了同一个类型,即:ThreadPoolExecutor,该类继承自抽象类java.util.concurrent.AbstractExecutorService,该抽象类实现了ExecutorService接口,该接口又继承自Executor接口。其中,ExecutorService就是一般外部调用线程池实例的抽象接口。

ThreadPoolExecutor提供构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters, the default thread factory and the default rejected
* execution handler.
*
* <p>It may be more convenient to use one of the {@link Executors}
* factory methods instead of this general purpose constructor.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

可以设置线程池的一系列参数:

  • 核心线程池尺寸:线程池至少保有多少线程;
  • 最大线程池尺寸:线程池最多能创建多少线程;
  • 保活时间:线程数量如果超过核心线程数了,最多允许空闲多久,超过即终止线程;
  • 时间单位:保活时间的时间单位,可以是纳秒、微秒、毫秒、秒、分钟、小时、天;
  • 工作队列:提交给线程池的任务在执行前,会先放到工作队列中。

2.1 newSingleThreadExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

newSingleThreadExecutor具体设置参数为:

  • 核心线程数和最大线程数都是1,这保证了线程池中有且只有一个线程。
  • 因为只有1个线程,所以无所谓超时终止,因此保活时间为0。
  • 提交给线程池的线程会放到一个LinkedBlockingQueue的实例中。
    • 这是一个默认无界的阻塞队列(可选有界以控制内存消耗)。

外面套了一层FinalizableDelegatedExecutorService实际上是该Executors类定义的一个内部静态类:

1
2
3
4
5
6
7
8
9
10
private static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
@SuppressWarnings("deprecation")
protected void finalize() {
super.shutdown();
}
}
  • 只是实现了finalize方法,负责关闭线程池。

该类进一步继承自另一个内部静态类DelegatedExecutorService,这是一个包装类,用于控制对外的提供的方法:

1
2
3
4
5
6
7
8
/**
* A wrapper class that exposes only the ExecutorService methods
* of an ExecutorService implementation.
*/
private static class DelegatedExecutorService
implements ExecutorService {
// ...
}

2.2 newFixedThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

newFixedThreadPool具体设置参数为:

  • 核心线程数和最大线程数都是输入参数nThreads,这保证了线程池中有且只有nThreads个线程。
  • 因为只有nThreads个线程,所以无所谓超时终止,因此保活时间为0。
  • 提交给线程池的线程会放到一个LinkedBlockingQueue的实例中。

除了可以设置多个线程,其他参数与上一个单线程的线程池非常相似。

2.3 newCachedThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

newCachedThreadPool具体设置参数为:

  • 核心线程数为0,意味着如果线程池闲置后,不会保留任何线程。
  • 最大线程数为最大整数,意味着如果需要,线程数量可以变得非常大。
  • 缓存线程池的保活时间是60秒,线程不会立即被销毁,空闲60内如果有新任务,可以直接复用空闲线程。
  • 提交给线程池的线程会放到一个SynchronousQueue的实例中。

3 再深一层——从AbstractExecutorService到ThreadPoolExecutor

3.1 任务提交伊始——submit

外部使用线程池时,调用的是submit方法。该方法在接口ExecutorService中定义,在抽象类AbstractExecutorService中实现。

具体地,在抽象类AbstractExecutorService中,submit的实现为:

1
2
3
4
5
6
7
8
9
10
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

其中,FutureTask负责统一把输入的无论是Runnable还是Callable都统一转换为FutureTask实例,并以RunnableFuture接口的抽象形式返回。

随后,调用execute(ftask)方法来执行新提交的任务。该方法在抽象类的子类——ThreadPoolExecutor中具体实现。

3.2 开始执行任务——execute

任务提交后需要执行起来,submit中执行的方法execute(ftask)在抽象类的子类——ThreadPoolExecutor.execute中具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@link RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

可以看到,当工作线程较少,还不到核心线程数时,该方法会添加一个新线程,并把输入的Runnable command交给新线程执行。

如果已经达到核心线程数,该方法进行了一系列谨慎的检查工作,并且把输入的Runnable command加入到了workQueue中,具体地:

检查线程池控制字判断线程池是否在工作,如果是,就把任务加入工作队列workQueue.offer(command)

  • 如果任务能够顺利加入工作队列(true),那么会有工作线程去处理它。典型的工作队列LinkedBlockingQueue就属于这一类,这种情况下,线程数量不会超过corePoolSize的核心线程数。
    • 当然,因为避免用重型锁,这里采用了CAS锁的形式,来避免加入任务后,线程池关闭、零线程的情况。
      • 如果第二次检查发现线程池不运行了,就移除刚刚加入的任务,并rejectreject方法会进一步调用RejectedExecutionHandler实例的handler.rejectedExecution(command, this);方法以便处理这种任务被线程池拒绝的情况);
      • 如果线程池还在运行,但是没有工作线程,就新建一个线程来处理工作队列中新加入的任务。
  • 如果任务不能顺利加入工作队列(false),那么就需要启动新的工作线程。典型的工作队列SynchronousQueue就属于这一类,如果没有线程阻塞在读取上,就无法插入新的任务,即会返回false。这么一来,就会启动新的线程,毕竟有阻塞在读取上的线程,才能加入新的任务。这意味着线程数量完全有可能超过corePoolSize规定的核心线程数。

3.3 工作队列——BlockingQueue

工作队列是一个阻塞队列,用于解决生产者-消费者问题。也就是说,execute扮演的是一个生产者的角色,它负责把检查过的任务加入到工作队列中,供线程池中的工作线程取出并执行。

ThreadPoolExecutor中,工作队列workQueue是一个BlockingQueue<Runnable>

1
2
3
4
5
6
7
8
9
10
11
12
/**
* The queue used for holding tasks and handing off to worker
* threads. We do not require that workQueue.poll() returning
* null necessarily means that workQueue.isEmpty(), so rely
* solely on isEmpty to see if the queue is empty (which we must
* do for example when deciding whether to transition from
* SHUTDOWN to TIDYING). This accommodates special-purpose
* queues such as DelayQueues for which poll() is allowed to
* return null even if it may later return non-null when delays
* expire.
*/
private final BlockingQueue<Runnable> workQueue;

3.4 添加工作线程——addWorker

在上述execute方法中,要执行Runnable任务,需要线程池中有工作线程,是通过调用addWorker实现的。

具体地,线程池工作线程的创建和添加操作在ThreadPoolExecutor.addWorker方法中具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;

for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();

if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

其中,会根据输入参数boolean core来约束工作线程数的上限。如果core==true,则线程数不超过corePoolSize;否则线程数上限为maxPoolSize

3.5 线程池工作线程——runWorker

线程池中保有的工作线程作为消费者的一方,要从工作队列中取出任务并执行。

具体地,线程池工作线程的主循环在ThreadPoolExecutor.runWorker方法中具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

开头task初始值为firstTask,是线程池的成员变量,用于引用初始任务,通常为空。因此,线程池的主线程作为workQueue的消费者,通常情况下都是通过getTask()方法来取出任务。

取出任务后,谨慎地进行线程池的状态检查,并在运行任务的前后,分别调用beforeExecuteafterExecute方法。这两个方法在ThreadPoolExecutor中实现内容为空,也就是说不做任何事情。这两个方法是预留的,可以被继承实现,以增加额外的检查和功能(如:记录日志)。

运行任务显得很简单,线程池的工作线程执行Runnable任务实例的task,run()方法即可。

3.6 从工作队列中取任务——getTask

ThreadPoolExecutor.getTask()的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();

// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

可以看到,根据timed变量,决定是限时等待型读取或是阻塞型读取。

  • 如果timed==true,则在主循环中限时等待提取工作队列中的任务,即workQueue.poll方法:
    • 如果工作队列中有任务,则立即返回;
    • 如果没有,则等待指定时间,如果仍然没有,则返回null
    • (Retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for an element to become available.)
  • 如果timed==false,则在主循环中阻塞式提取工作队列中任务,即workQueue.take方法:
    • 如果工作队列中有任务,则立即返回;
    • 如果没有,则线程阻塞,直到生产者加入任务后,有任务实例再返回。

4 J.U.C阻塞队列

上述“3.2 开始执行任务——execute”中解释了LinkedBlockingQueue与SynchronousQueue为什么会分别用于不同的ThreadPoolExecutor。尤其是同步队列,CachedThreadPool依靠其插入失败就可以检测到没有数量匹配的读线程,由此增加线程池的线程数。

4.1 无容量的同步队列——SynchronousQueue

这个容器就比较特别了,虽然名字是队列,但实际上没有任何容量。

A blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. The head of the queue is the element that the first queued inserting thread is trying to add to the queue; if there is no such queued thread then no element is available for removal and poll() will return null. For purposes of other Collection methods (for example contains), a SynchronousQueue acts as an empty collection. This queue does not permit null elements.

使用该容器时,必须先取再插。也就是说,对于一个同步队列,如果没有任何线程在读取它,别的线程就无法对其插入新数据。通常,需要先启动一个线程读取同步队列,此时同步队列尚无数据,则该读线程会处于阻塞等待地状态。随后,启动一个线程向同步队列中插入数据,此时,阻塞等待数据的读线程会唤醒并读取插入数据。

因为其插入时必须要有读线程的特性,该容器被应用于检测读线程少于插入任务数量的情况,引导线程池增加新线程。

4.2 链表阻塞队列——LinkedBlockingQueue

不难理解,这是一个基于链表实现的阻塞队列。

An optionally-bounded blocking queue based on linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue. Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.

既然是链表实现,那一般理解是可以无界的,当然也可以指定大小限定为有界。

链表阻塞队列采用FIFO模式,队列头是最早插入的,队列尾是最新插入的,取出时依据FIFO顺序。

基于链表阻塞队列在并发应用中吞吐量通常比基于数组的阻塞队列更大,因为基于链表的阻塞队列不至于同步锁定整个数组容器。基于链表的阻塞队列实际上在读写时,锁定入队和出队的位置就可以了。

4.3 数组阻塞队列——ArrayBlockingQueue

不难理解,这是一个基于数组实现的阻塞队列。

A bounded blocking queue backed by an array. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.
This is a classic “bounded buffer”, in which a fixed-sized array holds elements inserted by producers and extracted by consumers. Once created, the capacity cannot be changed. Attempts to put an element into a full queue will result in the operation blocking; attempts to take an element from an empty queue will similarly block.

This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. However, a queue constructed with fairness set to true grants threads access in FIFO order. Fairness generally decreases throughput but reduces variability and avoids starvation.

既然是基于数组实现的,那容器肯定是有界的,创建时就确定的,无法动态变化。这样一来,如果数组中存满了,再插入新的数据就需要阻塞至有元素被取走,同样地,如果数组中没有元素,读取操作需要阻塞至有元素被插入。

另外,数组阻塞队列还存在一个公平策略,如果严格要求保障FIFO的出入队列顺序,需要启用公平策略,这样可以避免饥饿问题(因为否则的话,可能有些元素长时间都不会被轮到取出来),但是也会减少吞吐量。

支持我的写作!(Support my writing!)

欢迎关注我的其它发布渠道