Selector: Understanding Java I/O Multiplexing from the JDK 11 Source

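While reading the JDK 11 source, I found that although both platforms expose the same java.nio.channels.Selector, the Selector constructed by Selector.open() has a completely different underlying implementation on Windows than on Linux.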

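Selector is the core multiplexer in Java NIO. A thread registers a SocketChannel with a Selector together with the operations it is interested in (for example SelectionKey.OP_READ), and the Selector then picks out the SocketChannel instances whose I/O state matches those operations.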

Application layer

A thread registers a SocketChannel instance and its interest operations with the Selector:

try {
    socketChannel.register(this.selector, SelectionKey.OP_READ); // socketChannel is always Writable
    // socketChannel.register(this.selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    this.selector.wakeup();
} catch (ClosedChannelException e) {
    e.printStackTrace();
}

The Selector then yields the set of keys whose channels are ready for those operations, and the thread iterates over it:

try {
    this.selector.select();

    Set<SelectionKey> selectionKeySet = this.selector.selectedKeys();
    Iterator<SelectionKey> selectionKeys = selectionKeySet.iterator();

    while (selectionKeys.hasNext()) {
        SelectionKey selectionKey = selectionKeys.next();

        if (selectionKey.isReadable()) {
            selectionKey.cancel(); // avoid selecting the same channel again
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();

            HttpWorker httpWorker = new HttpWorker(webRoot, socketChannel);
            this.executorService.submit(httpWorker);
        }

        selectionKeys.remove();
    }

} catch (IOException e) {
    e.printStackTrace();
}
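To try the register-then-select cycle end to end, here is a minimal sketch that runs entirely over the loopback interface. It is not the article's HTTP server: the class name SelectorLoopbackDemo and the roundTrip helper are made up for illustration, the accepted channel stays registered instead of being cancelled and handed to a worker, and everything happens on one thread.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class SelectorLoopbackDemo {

    /** Sends message over a loopback connection and returns what the selector loop read back. */
    static String roundTrip(String message) {
        byte[] expected = message.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream received = new ByteArrayOutputStream();
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); // port 0: any free port
            server.configureBlocking(false);            // register() requires non-blocking mode
            server.register(selector, SelectionKey.OP_ACCEPT);

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                client.write(ByteBuffer.wrap(expected));
                ByteBuffer buffer = ByteBuffer.allocate(256);
                while (received.size() < expected.length) {
                    if (selector.select(5_000) == 0) {
                        throw new IOException("nothing became ready within 5 seconds");
                    }
                    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                    while (keys.hasNext()) {
                        SelectionKey key = keys.next();
                        keys.remove();                  // the selected-key set is never cleared for us
                        if (key.isAcceptable()) {
                            SocketChannel accepted = server.accept();
                            if (accepted != null) {
                                accepted.configureBlocking(false);
                                accepted.register(selector, SelectionKey.OP_READ);
                            }
                        } else if (key.isReadable()) {
                            SocketChannel channel = (SocketChannel) key.channel();
                            buffer.clear();
                            int n = channel.read(buffer);
                            if (n > 0) {
                                received.write(buffer.array(), 0, n);
                            } else if (n < 0) {
                                channel.close();        // peer closed; closing also cancels the key
                            }
                        }
                    }
                }
            } finally {
                for (SelectionKey key : selector.keys()) {
                    key.channel().close();              // release the accepted channel as well
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(received.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello selector"));
    }
}
```

The same selector first reports the listening channel as acceptable and later reports the accepted channel as readable, which is the multiplexing the rest of the article traces through the JDK.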

Abstraction layer

Selector

Selector.open()

External code obtains a Selector simply by calling Selector.open():

import java.nio.channels.Selector;

Selector selector = Selector.open();

In the abstract class Selector it is implemented as:

public abstract class Selector implements Closeable {

/* more */

/**
* Opens a selector.
*
* <p> The new selector is created by invoking the {@link
* java.nio.channels.spi.SelectorProvider#openSelector openSelector} method
* of the system-wide default {@link
* java.nio.channels.spi.SelectorProvider} object. </p>
*
* @return A new selector
*
* @throws IOException
* If an I/O error occurs
*/
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}

/* more */

}

This is really just a layer of abstraction: the call delegates to SelectorProvider, which supplies and opens the Selector instance.
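The delegation can be observed from the outside with public API only. The sketch below (the class name SelectorOpenDemo is hypothetical) opens one selector through Selector.open() and one directly through the default provider, then prints the concrete class names, which will differ by platform and JDK build.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

public class SelectorOpenDemo {

    /** True when Selector.open() returns a selector created by the system-wide default provider. */
    static boolean openedByDefaultProvider() {
        try (Selector selector = Selector.open()) {
            return selector.provider() == SelectorProvider.provider();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        SelectorProvider provider = SelectorProvider.provider();
        try (Selector viaFacade = Selector.open();
             Selector viaProvider = provider.openSelector()) {
            // Concrete class names depend on the operating system and JDK build.
            System.out.println("provider class      : " + provider.getClass().getName());
            System.out.println("Selector.open()     : " + viaFacade.getClass().getName());
            System.out.println("provider.openSelector(): " + viaProvider.getClass().getName());
        }
        System.out.println("same provider: " + openedByDefaultProvider());
    }
}
```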

Selector.select()

This is the core multiplexing operation: it selects the keys of the channels that are ready for I/O.

public abstract class Selector implements Closeable {

/* more */

/**
* Selects a set of keys whose corresponding channels are ready for I/O
* operations.
*
* <p> This method performs a blocking <a href="#selop">selection
* operation</a>. It returns only after at least one channel is selected,
* this selector's {@link #wakeup wakeup} method is invoked, the current
* thread is interrupted, or the given timeout period expires, whichever
* comes first.
*
* <p> This method does not offer real-time guarantees: It schedules the
* timeout as if by invoking the {@link Object#wait(long)} method. </p>
*
* @param timeout If positive, block for up to {@code timeout}
* milliseconds, more or less, while waiting for a
* channel to become ready; if zero, block indefinitely;
* must not be negative
*
* @return The number of keys, possibly zero,
* whose ready-operation sets were updated
*
* @throws IOException
* If an I/O error occurs
*
* @throws ClosedSelectorException
* If this selector is closed
*
* @throws IllegalArgumentException
* If the value of the timeout argument is negative
*/
public abstract int select(long timeout) throws IOException;

/* more */

}

The method is declared in the abstract class Selector, but its implementation lives in the subclass SelectorImpl.

SelectorImpl
/**
* Base Selector implementation class.
*/

abstract class SelectorImpl
extends AbstractSelector
{

/* more */

/**
* Selects the keys for channels that are ready for I/O operations.
*
* @param action the action to perform, can be null
* @param timeout timeout in milliseconds to wait, 0 to not wait, -1 to
* wait indefinitely
*/
protected abstract int doSelect(Consumer<SelectionKey> action, long timeout)
throws IOException;

private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
synchronized (this) {
ensureOpen();
if (inSelect)
throw new IllegalStateException("select in progress");
inSelect = true;
try {
synchronized (publicSelectedKeys) {
return doSelect(action, timeout);
}
} finally {
inSelect = false;
}
}
}

@Override
public final int select(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
}

@Override
public final int select() throws IOException {
return lockAndDoSelect(null, -1);
}

/* more */

}

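Both select methods of SelectorImpl call lockAndDoSelect. The no-argument select() passes null as the action, meaning that no callback is run for the selected keys, and -1 as the timeout, meaning wait indefinitely. select(long) likewise maps a timeout of 0 to -1.

Inside lockAndDoSelect, the synchronized keyword locks the current Selector object, and a nested synchronized block locks publicSelectedKeys, to keep concurrent callers in step. The inSelect flag additionally prevents a select from starting while another one is in progress. The real work is done by doSelect, which SelectorImpl declares but does not implement; the implementation is left to its subclasses, that is, to the implementation layer.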
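The timeout convention is easy to get backwards, because the public API uses 0 for "wait forever" while doSelect uses -1 for that and 0 for "do not wait". The sketch below models the template-method structure with a hypothetical MiniSelector class; it is a stripped-down stand-in, not the JDK class. The selectNow() mapping to 0 is an assumption based on the doSelect comment ("0 to not wait"), since that method is not part of the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;

public class SelectTemplateDemo {

    /** Hypothetical, simplified stand-in for SelectorImpl. */
    abstract static class MiniSelector {
        private boolean inSelect;

        /** Platform hook: timeout 0 means do not wait, -1 means wait indefinitely. */
        protected abstract int doSelect(long timeout);

        private int lockAndDoSelect(long timeout) {
            synchronized (this) {
                // synchronized is reentrant, so the flag is what stops the same
                // thread from starting a nested select from inside doSelect.
                if (inSelect) {
                    throw new IllegalStateException("select in progress");
                }
                inSelect = true;
                try {
                    return doSelect(timeout);
                } finally {
                    inSelect = false;
                }
            }
        }

        public final int select(long timeout) {
            if (timeout < 0) {
                throw new IllegalArgumentException("Negative timeout");
            }
            return lockAndDoSelect(timeout == 0 ? -1 : timeout);
        }

        public final int select() {
            return lockAndDoSelect(-1);
        }

        public final int selectNow() {   // assumed mapping, see lead-in
            return lockAndDoSelect(0);
        }
    }

    /** Records the timeout that each public entry point hands to the hook. */
    static List<Long> observedTimeouts() {
        List<Long> seen = new ArrayList<>();
        MiniSelector selector = new MiniSelector() {
            @Override
            protected int doSelect(long timeout) {
                seen.add(timeout);
                return 0;
            }
        };
        selector.select();
        selector.select(0);
        selector.select(250);
        selector.selectNow();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observedTimeouts());
    }
}
```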

SelectorProvider

SelectorProvider is an abstract class.

SelectorProvider.provider()

public abstract class SelectorProvider {

/* more */

/**
* Returns the system-wide default selector provider for this invocation of
* the Java virtual machine.
*
* <p> The first invocation of this method locates the default provider
* object as follows: </p>
*
* <ol>
*
* <li><p> If the system property
* {@code java.nio.channels.spi.SelectorProvider} is defined then it is
* taken to be the fully-qualified name of a concrete provider class.
* The class is loaded and instantiated; if this process fails then an
* unspecified error is thrown. </p></li>
*
* <li><p> If a provider class has been installed in a jar file that is
* visible to the system class loader, and that jar file contains a
* provider-configuration file named
* {@code java.nio.channels.spi.SelectorProvider} in the resource
* directory {@code META-INF/services}, then the first class name
* specified in that file is taken. The class is loaded and
* instantiated; if this process fails then an unspecified error is
* thrown. </p></li>
*
* <li><p> Finally, if no provider has been specified by any of the above
* means then the system-default provider class is instantiated and the
* result is returned. </p></li>
*
* </ol>
*
* <p> Subsequent invocations of this method return the provider that was
* returned by the first invocation. </p>
*
* @return The system-wide default selector provider
*/
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}

/* more */

}

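provider() is a lazily initialized singleton guarded by a synchronized lock; it returns an instance of type SelectorProvider.

Specifically, when no instance exists yet, one has to be created.

The instance is created through AccessController, which runs the creation as a privileged action.

According to the official documentation, AccessController is used for access control operations and decisions: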

The AccessController class is used for access control operations and decisions. More specifically, the AccessController class is used for three purposes:

  • to decide whether an access to a critical system resource is to be allowed or denied, based on the security policy currently in effect,
  • to mark code as being "privileged", thus affecting subsequent access determinations, and
  • to obtain a "snapshot" of the current calling context so access-control decisions from a different context can be made with respect to the saved context.

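In short, AccessController plays three roles:

  1. Checking permissions: deciding whether access to a critical system resource should be granted;
  2. Granting privileges: marking code as privileged so that it can carry out subsequent operations;
  3. Saving a snapshot: capturing the current calling context so that access-control decisions made from another context can take the saved context into account.

Here, AccessController.doPrivileged(...) plays the second role, granting privileges and running privileged code: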

Performs the specified PrivilegedAction with privileges enabled.

The action is performed with all of the permissions possessed by the caller's protection domain.

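The argument is an anonymous class that implements the PrivilegedAction interface and its run() method. Relying on the privileges granted by the enclosing call, run() instantiates a SelectorProvider. Instantiation tries three sources in priority order: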

  1. loadProviderFromProperty()
  2. loadProviderAsService()
  3. provider = sun.nio.ch.DefaultSelectorProvider.create()
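The three-step lookup above is a general pattern: explicit system property first, then a ServiceLoader lookup, then a built-in default. The sketch below reproduces that order generically. It is an illustration, not the JDK code: the class ProviderLookupDemo, the locate method, and the property name demo.sequence.provider are all hypothetical, and CharSequence stands in for SelectorProvider only so that the example runs without writing a provider.

```java
import java.util.Iterator;
import java.util.ServiceLoader;
import java.util.function.Supplier;

public class ProviderLookupDemo {

    /** Resolves an implementation of type in the same priority order as the three steps above. */
    static <T> T locate(Class<T> type, String propertyName, Supplier<T> platformDefault) {
        // 1. Explicit override: a fully qualified class name in a system property.
        String className = System.getProperty(propertyName);
        if (className != null) {
            try {
                Object instance = Class.forName(className, true, ClassLoader.getSystemClassLoader())
                        .getDeclaredConstructor()
                        .newInstance();
                return type.cast(instance);
            } catch (ReflectiveOperationException | ClassCastException e) {
                throw new IllegalStateException("cannot load " + className, e);
            }
        }
        // 2. First provider advertised through META-INF/services, if any.
        Iterator<T> services = ServiceLoader.load(type, ClassLoader.getSystemClassLoader()).iterator();
        if (services.hasNext()) {
            return services.next();
        }
        // 3. Built-in fallback, the role DefaultSelectorProvider.create() plays in the JDK.
        return platformDefault.get();
    }

    public static void main(String[] args) {
        Supplier<CharSequence> fallback = () -> "fallback";

        System.setProperty("demo.sequence.provider", "java.lang.StringBuilder");
        System.out.println(locate(CharSequence.class, "demo.sequence.provider", fallback).getClass());

        System.clearProperty("demo.sequence.provider");
        System.out.println(locate(CharSequence.class, "demo.sequence.provider", fallback));
    }
}
```

For the real provider, the first step corresponds to starting the JVM with the java.nio.channels.spi.SelectorProvider system property set to a provider class name.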
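First priority: loadProviderFromProperty

The first priority checks whether the system property java.nio.channels.spi.SelectorProvider is set. If it is, the named class is loaded; otherwise the method returns false.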

Service-provider classes for the java.nio.channels package.

The java.nio.channels.spi package provides a set of service-provider classes.

public abstract class SelectorProvider {

/* more */

private static boolean loadProviderFromProperty() {
String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
if (cn == null)
return false;
try {
@SuppressWarnings("deprecation")
Object tmp = Class.forName(cn, true,
ClassLoader.getSystemClassLoader()).newInstance();
provider = (SelectorProvider)tmp;
return true;
} catch (ClassNotFoundException x) {
throw new ServiceConfigurationError(null, x);
} catch (IllegalAccessException x) {
throw new ServiceConfigurationError(null, x);
} catch (InstantiationException x) {
throw new ServiceConfigurationError(null, x);
} catch (SecurityException x) {
throw new ServiceConfigurationError(null, x);
}
}

/* more */

}

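The method first reads the system property java.nio.channels.spi.SelectorProvider. If the property has no value, the method returns false; if it does, the value is used as the name of the SelectorProvider class to load.

Class.forName is told to use the system class loader, so the SelectorProvider class named in the system property is loaded dynamically at run time and then instantiated through newInstance(). Any failure along the way is rethrown as a ServiceConfigurationError.

Second priority: loadProviderAsService

If the system property java.nio.channels.spi.SelectorProvider needed by the first priority is not set, loading falls through to the second priority.

If a jar file visible to the system class loader contains a provider-configuration file named java.nio.channels.spi.SelectorProvider in its META-INF/services directory, the service is loaded through the system class loader.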

public abstract class SelectorProvider {

/* more */

private static boolean loadProviderAsService() {

ServiceLoader<SelectorProvider> sl =
ServiceLoader.load(SelectorProvider.class,
ClassLoader.getSystemClassLoader());
Iterator<SelectorProvider> i = sl.iterator();
for (;;) {
try {
if (!i.hasNext())
return false;
provider = i.next();
return true;
} catch (ServiceConfigurationError sce) {
if (sce.getCause() instanceof SecurityException) {
// Ignore the security exception, try the next provider
continue;
}
throw sce;
}
}
}

/* more */

}

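The method uses ServiceLoader to load the service and takes the first SelectorProvider instance it can see. A provider whose loading fails with a SecurityException is skipped and the next one is tried.

Final priority: sun.nio.ch.DefaultSelectorProvider

If neither of the above yields a SelectorProvider, sun.nio.ch.DefaultSelectorProvider is used as the last resort.

In practice, unless one of the first two mechanisms has been implemented and configured, this final priority is the one that takes effect.

DefaultSelectorProvider exposes a uniform interface and internally does nothing more than instantiate an implementation class; which class it instantiates depends on the operating system the JDK was built for.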

DefaultSelectorProvider

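DefaultSelectorProvider.create()

Specifically, sun.nio.ch.DefaultSelectorProvider exposes the same interface everywhere. Its create method is only a thin wrapper around a single new expression, but the class being instantiated differs between JDK builds for different operating systems:

  • On Windows JDK 11 it instantiates sun.nio.ch.WindowsSelectorProvider.
  • On Linux JDK 11 it instantiates sun.nio.ch.EPollSelectorProvider.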

Windows JDK11:

/**
* Creates this platform's default SelectorProvider
*/

public class DefaultSelectorProvider {

/**
* Prevent instantiation.
*/
private DefaultSelectorProvider() { }

/**
* Returns the default SelectorProvider.
*/
public static SelectorProvider create() {
return new sun.nio.ch.WindowsSelectorProvider();
}

}

Linux JDK11:

/**
* Creates this platform's default SelectorProvider
*/

public class DefaultSelectorProvider {

/**
* Prevent instantiation.
*/
private DefaultSelectorProvider() { }

/**
* Returns the default SelectorProvider.
*/
public static SelectorProvider create() {
return new EPollSelectorProvider();
}

}

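WindowsSelectorProvider and EPollSelectorProvider

Both classes at this layer extend the abstract class SelectorProviderImpl. They carry no special logic of their own and simply instantiate the corresponding SelectorImpl subclass.

This layer connects SelectorProvider to SelectorImpl.

How each SelectorImpl is implemented is analyzed in the implementation layer section that follows.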

WindowsSelectorProvider.openSelector()

/*
* SelectorProvider for sun.nio.ch.WindowsSelectorImpl.
*
* @author Konstantin Kladko
* @since 1.4
*/

public class WindowsSelectorProvider extends SelectorProviderImpl {

public AbstractSelector openSelector() throws IOException {
return new WindowsSelectorImpl(this);
}
}

EPollSelectorProvider.openSelector()

public class EPollSelectorProvider
extends SelectorProviderImpl
{
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}

public Channel inheritedChannel() throws IOException {
return InheritedChannel.getChannel();
}
}

Inheritance summary

Selector family

  • abstract class Selector
    • abstract class AbstractSelector extends Selector
      • abstract class SelectorImpl extends AbstractSelector
        • class WindowsSelectorImpl extends SelectorImpl
        • class EPollSelectorImpl extends SelectorImpl

SelectorProvider family

  • abstract class SelectorProvider
    • abstract class SelectorProviderImpl extends SelectorProvider
      • class WindowsSelectorProvider extends SelectorProviderImpl
      • class EPollSelectorProvider extends SelectorProviderImpl
  • class DefaultSelectorProvider
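The Selector half of this hierarchy can be checked on a running JVM by walking the superclass chain of whatever Selector.open() returns. This is a small sketch with a hypothetical class name, SelectorHierarchyDemo; the first entry of the printed chain is the platform-specific class and will vary between operating systems and JDK versions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.List;

public class SelectorHierarchyDemo {

    /** Class names from the runtime class of o up to, but excluding, java.lang.Object. */
    static List<String> superclassChain(Object o) {
        List<String> names = new ArrayList<>();
        for (Class<?> c = o.getClass(); c != null && c != Object.class; c = c.getSuperclass()) {
            names.add(c.getName());
        }
        return names;
    }

    /** Superclass chain of the selector that Selector.open() returns on this platform. */
    static List<String> selectorChain() {
        try (Selector selector = Selector.open()) {
            return superclassChain(selector);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Most specific class first, java.nio.channels.Selector last.
        selectorChain().forEach(System.out::println);
    }
}
```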

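Implementation layer

The Windows JDK 11 implementation

On Windows JDK 11, an instance of sun.nio.ch.WindowsSelectorProvider is what gets returned to the upper layers.

WindowsSelectorProvider

As a quick recap, WindowsSelectorProvider bridges the public SelectorProvider to the concrete WindowsSelectorImpl class.

It extends the abstract class SelectorProviderImpl as a concrete implementation that the abstraction layer calls into, and all it does is forward to one implementation class, WindowsSelectorImpl.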

public class WindowsSelectorProvider extends SelectorProviderImpl {

public AbstractSelector openSelector() throws IOException {
return new WindowsSelectorImpl(this);
}
}

WindowsSelectorImpl

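In short, WindowsSelectorImpl is built on a native polling method reached through JNI (poll0, whose buffers the comments in the SubSelector source describe as being passed to native select()). It is not a plain call, though: the implementation adds multithreading on top.

Why multithreading? Because a single native call can only handle a limited number of file descriptors, typically no more than 1024, as with select (this is the MAX_SELECTABLE_FDS constant that appears in the SubSelector source). Real applications can easily need to watch more descriptors than that at once. The Windows JDK 11 implementation therefore spreads the work over several threads: since one thread can handle only a bounded number of descriptors, a large set is split up and shared among multiple threads.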

Main data structures

| Type | Variable | Description |
| --- | --- | --- |
| SelectionKeyImpl[] | channelArray | The list of SelectableChannels serviced by this Selector. Every mod MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll array, where the corresponding entry is occupied by the wakeupSocket |
| PollArrayWrapper | pollWrapper | The global native poll array holds file descriptors and event masks |
| List<SelectThread> | threads | A list of helper threads for select. |
| Pipe | wakeupPipe | Pipe used as a wakeup object. |
| FdMap | fdMap | Maps file descriptors to their indices in pollArray |
| SubSelector | subSelector | SubSelector for the main thread |
| Object | interruptLock | Lock for interrupt triggering and clearing |
| Object | updateLock | Lock for the pending new registrations/updates, queued by implRegister and setEventOps |
| Deque<SelectionKeyImpl> | newKeys | Pending new registrations (see updateLock) |
| Deque<SelectionKeyImpl> | updateKeys | Pending updates (see updateLock) |

WindowsSelectorImpl.doSelect()

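How does JDK 11 on Windows select the SocketChannels that are in the requested state?

The abstraction layer's Selector.select() is implemented by SelectorImpl.select(), which mainly calls SelectorImpl.lockAndDoSelect(), which in turn calls SelectorImpl.doSelect(). On Windows JDK 11 that method is implemented by WindowsSelectorImpl.doSelect().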

/**
* A multi-threaded implementation of Selector for Windows.
*
* @author Konstantin Kladko
* @author Mark Reinhold
*/

class WindowsSelectorImpl extends SelectorImpl {

/* more */

@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
assert Thread.holdsLock(this);
this.timeout = timeout; // set selector timeout
processUpdateQueue();
processDeregisterQueue();
if (interruptTriggered) {
resetWakeupSocket();
return 0;
}
// Calculate number of helper threads needed for poll. If necessary
// threads are created here and start waiting on startLock
adjustThreadsCount();
finishLock.reset(); // reset finishLock
// Wakeup helper threads, waiting on startLock, so they start polling.
// Redundant threads will exit here after wakeup.
startLock.startThreads();
// do polling in the main thread. Main thread is responsible for
// first MAX_SELECTABLE_FDS entries in pollArray.
try {
begin();
try {
subSelector.poll();
} catch (IOException e) {
finishLock.setException(e); // Save this exception
}
// Main thread is out of poll(). Wakeup others and wait for them
if (threads.size() > 0)
finishLock.waitForHelperThreads();
} finally {
end();
}
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
finishLock.checkForException();
processDeregisterQueue();
int updated = updateSelectedKeys(action);
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
resetWakeupSocket();
return updated;
}

/* more */

}

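A call to WindowsSelectorImpl.doSelect() proceeds roughly as follows:

  1. It first brings its state up to date, processing new registrations and updates as well as cancelled keys.
  2. It then works out how many threads are needed and prepares the helper threads for the multithreaded poll.
    1. If the main thread alone can handle the current number of descriptors, no helper threads are started;
    2. If it cannot, helper threads are created and started to share the load.
  3. The main thread always does its own share of the polling through subSelector.poll(), which is the main thread invoking its own subSelector.
  4. If helper threads are involved, that is, when threads.size() > 0, the main thread waits for them to finish through finishLock.waitForHelperThreads().
  5. At that point polling is done. After some final checks and state updates, the method returns the number of keys updated by this doSelect call.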
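The coordination in steps 2 to 4 can be sketched in plain Java. This is a deliberately simplified model, not the JDK implementation: it creates fresh threads on every call and waits with a CountDownLatch, whereas the JDK keeps its helper threads alive and coordinates them through startLock and finishLock; the "poll" here is just a predicate over slot numbers, and the wakeup-socket slots are ignored. All names (HelperThreadPollSketch, countReady, pollSlice) are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntPredicate;

public class HelperThreadPollSketch {
    static final int MAX_PER_THREAD = 1024;   // mirrors MAX_SELECTABLE_FDS

    /** Stand-in for one native poll: counts the ready slots in [from, to). */
    static int pollSlice(int from, int to, IntPredicate ready) {
        int count = 0;
        for (int slot = from; slot < to; slot++) {
            if (ready.test(slot)) {
                count++;
            }
        }
        return count;
    }

    /** Splits total slots into slices of MAX_PER_THREAD and polls them in parallel. */
    static int countReady(int total, IntPredicate ready) {
        // The main thread covers the first slice, helpers cover the rest.
        int helpers = Math.max(0, (total + MAX_PER_THREAD - 1) / MAX_PER_THREAD - 1);
        CountDownLatch finished = new CountDownLatch(helpers);
        AtomicInteger readyCount = new AtomicInteger();

        for (int i = 0; i < helpers; i++) {
            int from = (i + 1) * MAX_PER_THREAD;
            int to = Math.min(total, from + MAX_PER_THREAD);
            Thread helper = new Thread(() -> {
                try {
                    readyCount.addAndGet(pollSlice(from, to, ready));
                } finally {
                    finished.countDown();     // tell the main thread this slice is done
                }
            }, "poll-helper-" + i);
            helper.setDaemon(true);
            helper.start();
        }

        // The main thread does its own share, then waits for the helpers.
        readyCount.addAndGet(pollSlice(0, Math.min(total, MAX_PER_THREAD), ready));
        try {
            finished.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for helpers", e);
        }
        return readyCount.get();
    }

    public static void main(String[] args) {
        System.out.println(countReady(2500, slot -> slot % 100 == 0));
    }
}
```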

WindowsSelectorImpl.SelectThread

WindowsSelectorImpl.SelectThread.run()

A helper thread is an instance of the WindowsSelectorImpl.SelectThread class, and the heart of a thread class is its run method.

// Represents a helper thread used for select.
private final class SelectThread extends Thread {
private final int index; // index of this thread
final SubSelector subSelector;
private long lastRun = 0; // last run number
private volatile boolean zombie;
// Creates a new thread
private SelectThread(int i) {
super(null, null, "SelectorHelper", 0, false);
this.index = i;
this.subSelector = new SubSelector(i);
//make sure we wait for next round of poll
this.lastRun = startLock.runsCounter;
}
void makeZombie() {
zombie = true;
}
boolean isZombie() {
return zombie;
}
public void run() {
while (true) { // poll loop
// wait for the start of poll. If this thread has become
// redundant, then exit.
if (startLock.waitForStart(this)) {
subSelector.freeFDSetBuffer();
return;
}
// call poll()
try {
subSelector.poll(index);
} catch (IOException e) {
// Save this exception and let other threads finish.
finishLock.setException(e);
}
// notify main thread, that this thread has finished, and
// wakeup others, if this thread is the first to finish.
finishLock.threadFinished();
}
}
}

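As the code shows, the core of the helper thread is the call to subSelector.poll(index), which polls the file descriptors this thread is responsible for.

So what does this subSelector do?

WindowsSelectorImpl.SubSelector

The main thread and the helper threads introduced above each own a subSelector instance, and both kinds of thread poll by calling into it: the main thread calls subSelector.poll() and the helpers call subSelector.poll(index).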

WindowsSelectorImpl.SubSelector.poll()
private final class SubSelector {
private final int pollArrayIndex; // starting index in pollArray to poll
// These arrays will hold result of native select().
// The first element of each array is the number of selected sockets.
// Other elements are file descriptors of selected sockets.
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
// Buffer for readfds, writefds and exceptfds structs that are passed
// to native select().
private final long fdsBuffer = unsafe.allocateMemory(SIZEOF_FD_SET * 3);

private SubSelector() {
this.pollArrayIndex = 0; // main thread
}

private SubSelector(int threadIndex) { // helper threads
this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS;
}

private int poll() throws IOException{ // poll for the main thread
return poll0(pollWrapper.pollArrayAddress,
Math.min(totalChannels, MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout, fdsBuffer);
}

private int poll(int index) throws IOException {
// poll for helper threads
return poll0(pollWrapper.pollArrayAddress +
(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
Math.min(MAX_SELECTABLE_FDS,
totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout, fdsBuffer);
}

private native int poll0(long pollAddress, int numfds,
int[] readFds, int[] writeFds, int[] exceptFds, long timeout, long fdsBuffer);

/* more */

}

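The source shows that SubSelector's poll() and poll(index) are both thin adapters over poll0(): each computes the starting address and length of its own slice of the poll array and then calls poll0().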
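The slice arithmetic in those two methods can be worked through with plain integers. In this sketch the main thread is represented by thread index -1, which is a convention of the example rather than of the JDK, so that one formula covers both poll() and poll(index). The helper-count formula is likewise an assumption for illustration, and the class name PollSliceDemo is hypothetical.

```java
public class PollSliceDemo {
    static final int MAX_SELECTABLE_FDS = 1024;

    /** First poll-array entry of a slice: 0 for the main thread (-1), (index + 1) * 1024 for helpers. */
    static int sliceStart(int threadIndex) {
        return (threadIndex + 1) * MAX_SELECTABLE_FDS;
    }

    /** Number of entries handed to poll0 for that slice, following the Math.min expressions above. */
    static int sliceLength(int threadIndex, int totalChannels) {
        return Math.min(MAX_SELECTABLE_FDS, totalChannels - sliceStart(threadIndex));
    }

    public static void main(String[] args) {
        int totalChannels = 2500;
        // Assumed for illustration: one helper per additional full or partial slice.
        int helpers = (totalChannels + MAX_SELECTABLE_FDS - 1) / MAX_SELECTABLE_FDS - 1;
        for (int index = -1; index < helpers; index++) {
            String who = (index < 0) ? "main thread" : "helper " + index;
            System.out.println(who + ": start=" + sliceStart(index)
                    + ", length=" + sliceLength(index, totalChannels));
        }
    }
}
```

With 2500 entries, the main thread and the first helper each take a full slice of 1024, and the second helper takes the remaining 452.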

WindowsSelectorImpl.SubSelector.poll0()

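As the source above shows, poll0 is not implemented in Java; it is a native method invoked through JNI.

The Linux JDK 11 implementation

On Linux JDK 11, an instance of sun.nio.ch.EPollSelectorProvider is what gets returned to the upper layers.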

EPollSelectorProvider

Similarly, Linux JDK 11 exposes its entry point through EPollSelectorProvider.

public class EPollSelectorProvider
extends SelectorProviderImpl
{
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}

public Channel inheritedChannel() throws IOException {
return InheritedChannel.getChannel();
}
}

openSelector instantiates an EPollSelectorImpl and returns it.

EPollSelectorImpl

EPollSelectorImpl.doSelect()

How does JDK 11 on Linux select the SocketChannels that are in the requested state?

In essence it calls EPoll.wait, which returns the number of file descriptors that are ready.

/**
* Linux epoll based Selector implementation
*/

class EPollSelectorImpl extends SelectorImpl {

/* more */

@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
assert Thread.holdsLock(this);

// epoll_wait timeout is int
int to = (int) Math.min(timeout, Integer.MAX_VALUE);
boolean blocking = (to != 0);
boolean timedPoll = (to > 0);

int numEntries;
processUpdateQueue();
processDeregisterQueue();
try {
begin(blocking);

do {
long startTime = timedPoll ? System.nanoTime() : 0;
numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
// timed poll interrupted so need to adjust timeout
long adjust = System.nanoTime() - startTime;
to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
if (to <= 0) {
// timeout expired so no retry
numEntries = 0;
}
}
} while (numEntries == IOStatus.INTERRUPTED);
assert IOStatus.check(numEntries);

} finally {
end(blocking);
}
processDeregisterQueue();
return processEvents(numEntries, action);
}

/* more */
}

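EPollSelectorImpl.doSelect follows much the same outline as the WindowsSelectorImpl version:

  1. It first checks and updates state, processing the update queue and the deregistration queue;
  2. It calls EPoll.wait to obtain the number of I/O file descriptors that are ready, retrying with a shortened timeout if a timed wait is interrupted;
  3. Finally it updates state and returns the number of keys updated by this doSelect call.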
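The retry loop around EPoll.wait is the subtle part: when a timed wait is interrupted, the time already spent is subtracted from the timeout before waiting again. The sketch below isolates that logic so it can run without epoll. The native wait is replaced by an IntUnaryOperator and the clock by a LongSupplier, both hypothetical seams added for the example, and INTERRUPTED is a local stand-in whose value is assumed rather than taken from the article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.IntUnaryOperator;
import java.util.function.LongSupplier;

public class TimedWaitRetrySketch {
    static final int INTERRUPTED = -3;   // local stand-in for IOStatus.INTERRUPTED (value assumed)

    /** Mirrors the do/while loop in doSelect: retry on interruption with the remaining timeout. */
    static int waitWithRetry(IntUnaryOperator nativeWait, long timeout, LongSupplier nanoClock) {
        int to = (int) Math.min(timeout, Integer.MAX_VALUE);   // the native timeout is an int
        boolean timedPoll = (to > 0);
        int numEntries;
        do {
            long startTime = timedPoll ? nanoClock.getAsLong() : 0;
            numEntries = nativeWait.applyAsInt(to);
            if (numEntries == INTERRUPTED && timedPoll) {
                // A timed wait was interrupted: shrink the timeout by the time already spent.
                long adjust = nanoClock.getAsLong() - startTime;
                to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
                if (to <= 0) {
                    numEntries = 0;      // timeout used up, so do not retry
                }
            }
        } while (numEntries == INTERRUPTED);
        return numEntries;
    }

    /** Fake wait: each call "takes" 30 ms; the first is interrupted, the second reports 2 events. */
    static List<Integer> timeoutsSeen() {
        List<Integer> seen = new ArrayList<>();
        long[] nowNanos = {0};
        int[] calls = {0};
        IntUnaryOperator fakeWait = to -> {
            seen.add(to);
            nowNanos[0] += TimeUnit.MILLISECONDS.toNanos(30);
            return calls[0]++ == 0 ? INTERRUPTED : 2;
        };
        waitWithRetry(fakeWait, 100, () -> nowNanos[0]);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(timeoutsSeen());
    }
}
```

With a 100 ms timeout and an interruption after 30 ms, the second wait is issued with the remaining 70 ms.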

EPoll

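epoll is the I/O multiplexer provided by the Linux kernel, and JDK 11 calls into it through JNI.

The EPoll class in JDK 11 is a thin wrapper; epoll itself is not implemented by the JDK.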

/**
* Provides access to the Linux epoll facility.
*/

class EPoll {
private EPoll() { }

private static final Unsafe unsafe = Unsafe.getUnsafe();

/**
* typedef union epoll_data {
* void *ptr;
* int fd;
* __uint32_t u32;
* __uint64_t u64;
* } epoll_data_t;
*
* struct epoll_event {
* __uint32_t events;
* epoll_data_t data;
* }
*/
private static final int SIZEOF_EPOLLEVENT = eventSize();
private static final int OFFSETOF_EVENTS = eventsOffset();
private static final int OFFSETOF_FD = dataOffset();

// opcodes
static final int EPOLL_CTL_ADD = 1;
static final int EPOLL_CTL_DEL = 2;
static final int EPOLL_CTL_MOD = 3;

// events
static final int EPOLLIN = 0x1;
static final int EPOLLOUT = 0x4;

// flags
static final int EPOLLONESHOT = (1 << 30);

/**
* Allocates a poll array to handle up to {@code count} events.
*/
static long allocatePollArray(int count) {
return unsafe.allocateMemory(count * SIZEOF_EPOLLEVENT);
}

/**
* Free a poll array
*/
static void freePollArray(long address) {
unsafe.freeMemory(address);
}

/**
* Returns event[i];
*/
static long getEvent(long address, int i) {
return address + (SIZEOF_EPOLLEVENT*i);
}

/**
* Returns event->data.fd
*/
static int getDescriptor(long eventAddress) {
return unsafe.getInt(eventAddress + OFFSETOF_FD);
}

/**
* Returns event->events
*/
static int getEvents(long eventAddress) {
return unsafe.getInt(eventAddress + OFFSETOF_EVENTS);
}

// -- Native methods --

private static native int eventSize();

private static native int eventsOffset();

private static native int dataOffset();

static native int create() throws IOException;

static native int ctl(int epfd, int opcode, int fd, int events);

static native int wait(int epfd, long pollAddress, int numfds, int timeout)
throws IOException;

static {
IOUtil.load();
}
}
EPoll.wait
static native int wait(int epfd, long pollAddress, int numfds, int timeout)
throws IOException;

This method presumably maps to the Linux epoll_wait system call. The Linux manual page (man epoll_wait) describes it as follows:

The epoll_wait() system call waits for events on the epoll(7) instance referred to by the file descriptor epfd. The memory area pointed to by events will contain the events that will be available for the caller. Up to maxevents are returned by epoll_wait(). The maxevents argument must be greater than zero.

The timeout argument specifies the number of milliseconds that epoll_wait() will block.

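In other words, EPoll.wait blocks for up to timeout milliseconds waiting for events on the epoll instance referred to by the file descriptor epfd, and the integer it returns is the number of events that occurred.

Summary

Tracing from the surface-level Selector down to the underlying WindowsSelectorImpl and EPollSelectorImpl, layer by layer, shows how clearly the JDK's design separates abstraction from implementation, in line with the dependency inversion principle: high-level callers should not depend on low-level implementations, low-level implementations should not be tailored to high-level callers, and both should depend on abstractions.

Because the Linux kernel already provides epoll, a capable multiplexer that copes with large numbers of concurrent connections, JDK 11 only needs to make native calls to the epoll system calls through JNI, and its implementation is correspondingly simple. Windows does not provide a multiplexing model like epoll, so to work around the limited number of connections a single poll can handle, JDK 11 applies divide and conquer: it enlists helper threads to share the load, and this dynamically sized multithreaded poll gives it the ability to handle a large number of concurrent connections.

Ultimately, on Windows and Linux alike, understanding multiplexing at a deeper level means studying how the operating system itself implements it.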