while (selectionKeys.hasNext()) { SelectionKeyselectionKey= selectionKeys.next();
if (selectionKey.isReadable()) { selectionKey.cancel(); // avoid repeating selecting the same channel SocketChannelsocketChannel= (SocketChannel) selectionKey.channel();
publicabstractclassSelectorimplementsCloseable { /* more */ /** * Opens a selector. * * <p> The new selector is created by invoking the {@link * java.nio.channels.spi.SelectorProvider#openSelector openSelector} method * of the system-wide default {@link * java.nio.channels.spi.SelectorProvider} object. </p> * * @return A new selector * * @throws IOException * If an I/O error occurs */ publicstatic Selector open()throws IOException { return SelectorProvider.provider().openSelector(); }
publicabstractclassSelectorimplementsCloseable { /* more */ /** * Selects a set of keys whose corresponding channels are ready for I/O * operations. * * <p> This method performs a blocking <a href="#selop">selection * operation</a>. It returns only after at least one channel is selected, * this selector's {@link #wakeup wakeup} method is invoked, the current * thread is interrupted, or the given timeout period expires, whichever * comes first. * * <p> This method does not offer real-time guarantees: It schedules the * timeout as if by invoking the {@link Object#wait(long)} method. </p> * * @param timeout If positive, block for up to {@code timeout} * milliseconds, more or less, while waiting for a * channel to become ready; if zero, block indefinitely; * must not be negative * * @return The number of keys, possibly zero, * whose ready-operation sets were updated * * @throws IOException * If an I/O error occurs * * @throws ClosedSelectorException * If this selector is closed * * @throws IllegalArgumentException * If the value of the timeout argument is negative */ publicabstractintselect(long timeout)throws IOException; /* more */ }
abstractclassSelectorImpl extendsAbstractSelector { /* more */ /** * Selects the keys for channels that are ready for I/O operations. * * @param action the action to perform, can be null * @param timeout timeout in milliseconds to wait, 0 to not wait, -1 to * wait indefinitely */ protectedabstractintdoSelect(Consumer<SelectionKey> action, long timeout) throws IOException;
/** * Returns the system-wide default selector provider for this invocation of * the Java virtual machine. * * <p> The first invocation of this method locates the default provider * object as follows: </p> * * <ol> * * <li><p> If the system property * {@code java.nio.channels.spi.SelectorProvider} is defined then it is * taken to be the fully-qualified name of a concrete provider class. * The class is loaded and instantiated; if this process fails then an * unspecified error is thrown. </p></li> * * <li><p> If a provider class has been installed in a jar file that is * visible to the system class loader, and that jar file contains a * provider-configuration file named * {@code java.nio.channels.spi.SelectorProvider} in the resource * directory {@code META-INF/services}, then the first class name * specified in that file is taken. The class is loaded and * instantiated; if this process fails then an unspecified error is * thrown. </p></li> * * <li><p> Finally, if no provider has been specified by any of the above * means then the system-default provider class is instantiated and the * result is returned. </p></li> * * </ol> * * <p> Subsequent invocations of this method return the provider that was * returned by the first invocation. </p> * * @return The system-wide default selector provider */ publicstatic SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( newPrivilegedAction<>() { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } }
The AccessController class is used for access control operations and
decisions. More specifically, the AccessController class is used for
three purposes:
to decide whether an access to a critical system resource is to be
allowed or denied, based on the security policy currently in
effect,
to mark code as being "privileged", thus affecting subsequent access
determinations, and
to obtain a "snapshot" of the current calling context so
access-control decisions from a different context can be made with
respect to the saved context.
The list of SelectableChannels serviced by this Selector. Every mod
MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
array, where the corresponding entry is occupied by the
wakeupSocket
PollArrayWrapper
pollWrapper
The global native poll array holds file decriptors and event
masks
List<SelectThread>
threads
A list of helper threads for select.
Pipe
wakeupPipe
Pipe used as a wakeup object.
FdMap
fdMap
Maps file descriptors to their indices in pollArray
SubSelector
subSelector
SubSelector for the main thread
Object
interruptLock
Lock for interrupt triggering and clearing
Object
updateLock
pending new registrations/updates, queued by implRegister and
setEventOps
/** * A multi-threaded implementation of Selector for Windows. * * @author Konstantin Kladko * @author Mark Reinhold */
classWindowsSelectorImplextendsSelectorImpl { /* more */ @Override protectedintdoSelect(Consumer<SelectionKey> action, long timeout) throws IOException { assert Thread.holdsLock(this); this.timeout = timeout; // set selector timeout processUpdateQueue(); processDeregisterQueue(); if (interruptTriggered) { resetWakeupSocket(); return0; } // Calculate number of helper threads needed for poll. If necessary // threads are created here and start waiting on startLock adjustThreadsCount(); finishLock.reset(); // reset finishLock // Wakeup helper threads, waiting on startLock, so they start polling. // Redundant threads will exit here after wakeup. startLock.startThreads(); // do polling in the main thread. Main thread is responsible for // first MAX_SELECTABLE_FDS entries in pollArray. try { begin(); try { subSelector.poll(); } catch (IOException e) { finishLock.setException(e); // Save this exception } // Main thread is out of poll(). Wakeup others and wait for them if (threads.size() > 0) finishLock.waitForHelperThreads(); } finally { end(); } // Done with poll(). Set wakeupSocket to nonsignaled for the next run. finishLock.checkForException(); processDeregisterQueue(); intupdated= updateSelectedKeys(action); // Done with poll(). Set wakeupSocket to nonsignaled for the next run. resetWakeupSocket(); return updated; } /* more */ }
// Represents a helper thread used for select. privatefinalclassSelectThreadextendsThread { privatefinalint index; // index of this thread final SubSelector subSelector; privatelonglastRun=0; // last run number privatevolatileboolean zombie; // Creates a new thread privateSelectThread(int i) { super(null, null, "SelectorHelper", 0, false); this.index = i; this.subSelector = newSubSelector(i); //make sure we wait for next round of poll this.lastRun = startLock.runsCounter; } voidmakeZombie() { zombie = true; } booleanisZombie() { return zombie; } publicvoidrun() { while (true) { // poll loop // wait for the start of poll. If this thread has become // redundant, then exit. if (startLock.waitForStart(this)) { subSelector.freeFDSetBuffer(); return; } // call poll() try { subSelector.poll(index); } catch (IOException e) { // Save this exception and let other threads finish. finishLock.setException(e); } // notify main thread, that this thread has finished, and // wakeup others, if this thread is the first to finish. finishLock.threadFinished(); } } }
privatefinalclassSubSelector { privatefinalint pollArrayIndex; // starting index in pollArray to poll // These arrays will hold result of native select(). // The first element of each array is the number of selected sockets. // Other elements are file descriptors of selected sockets. privatefinalint[] readFds = newint [MAX_SELECTABLE_FDS + 1]; privatefinalint[] writeFds = newint [MAX_SELECTABLE_FDS + 1]; privatefinalint[] exceptFds = newint [MAX_SELECTABLE_FDS + 1]; // Buffer for readfds, writefds and exceptfds structs that are passed // to native select(). privatefinallongfdsBuffer= unsafe.allocateMemory(SIZEOF_FD_SET * 3);
privateSubSelector() { this.pollArrayIndex = 0; // main thread }
The epoll_wait() system call waits for events on the epoll(7)
instance referred to by the file descriptor epfd. The memory area
pointed to by events will contain the events that will be available for
the caller. Up to maxevents are returned by epoll_wait(). The maxevents
argument must be greater than zero.
The timeout argument specifies the number of milliseconds that
epoll_wait() will block.