基于NIO的请求-响应双向Socket通信网络编程

本文将CyC2018的NIO Socket单向通信样例改写为双向通信,并记录其中的机制和容易出问题的要点。

基于NIO的请求-响应双向Socket通信网络编程

1 概述

NIO是Java的New IO的缩写,基于块来提高读写数据的性能,而非基于流。

通过Selector对SocketChannel进行非阻塞式处理,可以提高线程效率。

在传统的阻塞式IO中,Server端线程accept Client端发来的Socket连接之后,必须阻塞等待,直到数据到达、连接变为可读后才能继续处理;而有了select机制后,线程无需阻塞等待某一个连接,可以转而处理其他已经就绪的IO事件。

2 代码

2.1 服务器端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package sample;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

/*
[Server] started listening
[Server] selected acceptable socket
[Server] selected readable socket
[Server] starts reading
Client: 'Hello!'
[Server] finished reading
[Server] selected writable socket
[Server] starts writing
[Server] finished writing
[Server] closed writable socket
*/

/**
 * Single-threaded NIO request/response server.
 *
 * <p>For every accepted connection the server collects the request bytes until the
 * client half-closes its output (end of stream), prints the request, writes a fixed
 * response and closes the connection.
 *
 * <p>Reading is event driven: when a non-blocking {@code read} returns 0 the partially
 * received request is parked on the {@link SelectionKey} as an attachment and the
 * selector loop moves on, instead of spinning until the peer sends end of stream.
 */
public class NIOServer {

    private static final String HOST = "127.0.0.1";
    private static final int PORT = 8888;
    private static final int BUFFER_SIZE = 1024;
    private static final String RESPONSE = "Server: 'Have a nice day!'";

    /**
     * Starts the server and runs the selector loop until an unrecoverable error occurs.
     *
     * @param args unused
     * @throws IOException if the selector or the listening socket cannot be set up,
     *                     or if selecting/accepting fails
     */
    public static void main(String[] args) throws IOException {
        // try-with-resources releases the selector and the listening channel if the loop fails.
        try (Selector selector = Selector.open();
             ServerSocketChannel ssChannel = ServerSocketChannel.open()) {

            ssChannel.configureBlocking(false);
            ssChannel.register(selector, SelectionKey.OP_ACCEPT);

            ServerSocket serverSocket = ssChannel.socket();
            serverSocket.bind(new InetSocketAddress(HOST, PORT));

            System.out.println("[Server] started listening");

            while (true) {
                // selector blocks until at least one channel is selected
                selector.select();
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

                while (keyIterator.hasNext()) {
                    SelectionKey selectionKey = keyIterator.next();
                    // Remove first so the key is never left in the selected set,
                    // even when handling it throws.
                    keyIterator.remove();

                    try {
                        dispatch(selector, selectionKey);
                    } catch (IOException e) {
                        if (selectionKey.channel() instanceof ServerSocketChannel) {
                            // The listening socket itself failed: nothing left to serve.
                            throw e;
                        }
                        // A failure on one client must not take the whole server down.
                        dropConnection(selectionKey, e);
                    }
                }
            }
        }
    }

    /** Routes a selected key to the handler matching its ready operation. */
    private static void dispatch(Selector selector, SelectionKey selectionKey) throws IOException {
        if (!selectionKey.isValid()) {
            return;
        }
        if (selectionKey.isAcceptable()) {
            acceptConnection(selector, selectionKey);
        } else if (selectionKey.isReadable()) {
            readRequest(selectionKey);
        } else if (selectionKey.isWritable()) {
            writeResponse(selectionKey);
        }
    }

    /** Accepts a pending connection and registers it for read events. */
    private static void acceptConnection(Selector selector, SelectionKey selectionKey)
            throws IOException {
        System.out.println("[Server] selected acceptable socket");
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();

        // create a SocketChannel for accepted request socket
        SocketChannel socketChannel = serverSocketChannel.accept();
        if (socketChannel == null) {
            // In non-blocking mode accept() returns null when no connection is pending.
            return;
        }

        try {
            socketChannel.configureBlocking(false); // configure as non-blocking
            socketChannel.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            // Setup of this one connection failed: release it and keep listening.
            System.err.println("[Server] dropping connection: " + e);
            socketChannel.close();
        }
    }

    /**
     * Drains whatever request bytes are currently available.
     *
     * <p>The bytes received so far live in the key's attachment. The "selected" and
     * "starts reading" messages are printed on the first read event of a connection
     * only, so the log shows one read phase per request even if the data arrives in
     * several pieces. Once end of stream is seen the request is decoded, printed, and
     * the key is switched to write interest.
     */
    private static void readRequest(SelectionKey selectionKey) throws IOException {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer received = (ByteBuffer) selectionKey.attachment();

        if (received == null) {
            System.out.println("[Server] selected readable socket");
            System.out.println("[Server] starts reading");
            received = ByteBuffer.allocate(BUFFER_SIZE);
        }

        int n;
        do {
            if (!received.hasRemaining()) {
                // A full buffer would make read() return 0 forever; enlarge it first.
                received = grow(received);
            }
            n = socketChannel.read(received);
        } while (n > 0);

        if (n == -1) {
            // End of stream: the client has finished sending its request.
            received.flip();
            System.out.println(decodeUtf8(received));
            System.out.println("[Server] finished reading");

            selectionKey.attach(null);
            selectionKey.interestOps(SelectionKey.OP_WRITE);
        } else {
            // n == 0: no more data for now. Keep what we have and wait for the next
            // read event instead of busy-looping on the selector thread.
            selectionKey.attach(received);
        }
    }

    /** Sends the response and closes the connection. */
    private static void writeResponse(SelectionKey selectionKey) throws IOException {
        System.out.println("[Server] selected writable socket");
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();

        System.out.println("[Server] starts writing");
        writeDataToSocketChannel(socketChannel);
        System.out.println("[Server] finished writing");

        socketChannel.close();
        System.out.println("[Server] closed writable socket");
    }

    /**
     * Writes the complete response to the channel.
     *
     * <p>A non-blocking {@code write} may accept only part of the buffer, so the call
     * is repeated until every byte has been handed to the socket.
     */
    private static void writeDataToSocketChannel(SocketChannel sChannel) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(RESPONSE.getBytes(StandardCharsets.UTF_8));
        while (buffer.hasRemaining()) {
            sChannel.write(buffer); // read from buffer, write to channel
        }
    }

    /**
     * Decodes the remaining bytes of {@code bytes} as UTF-8 text.
     *
     * <p>The whole request is decoded in one step so that a multi-byte character is
     * never split, and bytes are never cast to {@code char} one at a time.
     *
     * @param bytes buffer positioned at the first byte to decode
     * @return the decoded text, empty if the buffer has no remaining bytes
     */
    static String decodeUtf8(ByteBuffer bytes) {
        return StandardCharsets.UTF_8.decode(bytes).toString();
    }

    /** Returns a buffer of twice the capacity holding the same bytes, ready for more input. */
    private static ByteBuffer grow(ByteBuffer full) {
        ByteBuffer bigger = ByteBuffer.allocate(full.capacity() * 2);
        full.flip();
        bigger.put(full);
        return bigger;
    }

    /** Cancels the key and closes its channel after a per-connection I/O failure. */
    private static void dropConnection(SelectionKey selectionKey, IOException cause) {
        System.err.println("[Server] dropping connection: " + cause);
        selectionKey.cancel();
        try {
            selectionKey.channel().close();
        } catch (IOException ignored) {
            // The connection is already broken; there is nothing more to clean up.
        }
    }
}

2.2 客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package sample;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/*
[Client] starts writing
[Client] finished writing
[Client] starts reading
Server: 'Have a nice day!'
[Client] fininshed reading
[Client] closed socket
*/

/**
 * Blocking client for the request/response exchange: sends one request, half-closes
 * the connection to signal the end of the request, then reads the response until the
 * server closes its side.
 */
public class NIOClient {

    /**
     * Runs one request/response round trip against the server on 127.0.0.1:8888.
     *
     * @param args unused
     * @throws IOException if connecting, writing or reading fails
     */
    public static void main(String[] args) throws IOException {
        // try-with-resources closes the socket even when the exchange fails part-way.
        try (Socket socket = new Socket("127.0.0.1", 8888)) {
            // write request to server
            OutputStream out = socket.getOutputStream();
            String s = "Client: 'Hello!'";
            System.out.println("[Client] starts writing");
            // Encode explicitly: the server replies in UTF-8, so do not depend on the
            // platform default charset here.
            out.write(s.getBytes(StandardCharsets.UTF_8));
            // Half-close: tells the server the request is complete (its read returns -1)
            // while keeping the input side open for the response.
            socket.shutdownOutput();
            System.out.println("[Client] finished writing");

            // read response from server
            InputStream in = socket.getInputStream();
            System.out.println("[Client] starts reading");
            System.out.println(readAll(in));
            System.out.println("[Client] fininshed reading");
        }
        // The socket has been closed by the try-with-resources block above.
        System.out.println("[Client] closed socket");
    }

    /**
     * Reads {@code in} until end of stream and decodes everything received as UTF-8.
     *
     * <p>Only the bytes actually returned by each {@code read} call are kept, and the
     * text is decoded once at the end so a multi-byte character split across two reads
     * is still decoded correctly.
     *
     * @param in stream to drain; not closed by this method
     * @return the decoded text, empty if the stream was already at end of stream
     * @throws IOException if reading fails
     */
    static String readAll(InputStream in) throws IOException {
        ByteArrayOutputStream received = new ByteArrayOutputStream();
        byte[] readBuffer = new byte[1024];
        int ret;
        while ((ret = in.read(readBuffer)) != -1) {
            received.write(readBuffer, 0, ret);
        }
        return new String(received.toByteArray(), StandardCharsets.UTF_8);
    }
}

2.3 实验

首先运行服务器端程序开始监听,然后运行客户端程序建立连接。

服务器端应看到如下输出:

1
2
3
4
5
6
7
8
9
10
[Server] started listening
[Server] selected acceptable socket
[Server] selected readable socket
[Server] starts reading
Client: 'Hello!'
[Server] finished reading
[Server] selected writable socket
[Server] starts writing
[Server] finished writing
[Server] closed writable socket
  • Server读取到了Client发来的请求Client: 'Hello!',并作出响应。

客户端应看到如下输出:

1
2
3
4
5
6
[Client] starts writing
[Client] finished writing
[Client] starts reading
Server: 'Have a nice day!'
[Client] fininshed reading
[Client] closed socket
  • Client发送请求给Server端,随后读取到Server端发来的响应Server: 'Have a nice day!'

3 注意点

相较于CyC2018的单向(仅请求)通信样例,我想要实现的是请求-响应式的双向通信。这意味着Client在发送完数据后,不可以关闭socket,因为还需要接收响应数据,应该在双向通信完成后再最终关闭socket。

当我将Client代码中的out.close()改为out.flush()后,发现服务器端陷入死循环。调试后才发现,服务器端除了第一次read能正确读取全部数据之外,后续read的返回值均为0(非阻塞模式下,暂时没有数据可读时read返回0),而正确情况下应该返回-1,表示End of Stream。

经查,发现其实需要调用socket.shutdownOutput();来关闭TCP的输出流,告知对方输出已结束,这样对方read才会正确返回-1,即EOS。

Socket.shutdownInput()的文档说明为:

shutdownInput public void shutdownInput() throws IOException Places the input stream for this socket at "end of stream". Any data sent to the input stream side of the socket is acknowledged and then silently discarded. If you read from a socket input stream after invoking this method on the socket, the stream's available method will return 0, and its read methods will return -1 (end of stream).

Throws: IOException - if an I/O error occurs when shutting down this socket. Since: 1.3 See Also: shutdownOutput(),close(), setSoLinger(boolean, int), isInputShutdown()

Socket.shutdownOutput()的文档说明为:

shutdownOutput public void shutdownOutput() throws IOException Disables the output stream for this socket. For a TCP socket, any previously written data will be sent followed by TCP's normal connection termination sequence. If you write to a socket output stream after invoking shutdownOutput() on the socket, the stream will throw an IOException. Throws: IOException - if an I/O error occurs when shutting down this socket. Since: 1.3 See Also: shutdownInput(), close(), setSoLinger(boolean, int), isOutputShutdown()

4 参考资料

CyC2018 套接字-nio-实例

Socket 通信中由 read 返回值造成的的死锁问题