network-connect.md 33.9 KB
Newer Older
沉默王二's avatar
沉默王二 已提交
1
---
沉默王二's avatar
沉默王二 已提交
2 3
title: Java NIO 网络编程实践聊天室:从入门到精通
shortTitle: NIO 实现简易版聊天室
沉默王二's avatar
沉默王二 已提交
4 5 6 7
category:
  - Java核心
tag:
  - Java NIO
沉默王二's avatar
沉默王二 已提交
8
description: Java NIO 网络编程实践涉及 SocketChannel、ServerSocketChannel、阻塞与非阻塞模式、Scatter 和 Gather 数据传输、异步套接字通道(AsynchronousSocketChannel 和 AsynchronousServerSocketChannel),以及简单聊天室实现。NIO 提供了高效、灵活且可扩展的 I/O 处理方式,适用于大型应用程序和高并发场景。
沉默王二's avatar
nio  
沉默王二 已提交
9
author: 沉默王二
沉默王二's avatar
沉默王二 已提交
10 11 12
head:
  - - meta
    - name: keywords
沉默王二's avatar
沉默王二 已提交
13
      content: java,nio,网络编程,SocketChannel,ServerSocketChannel,AsynchronousSocketChannel,AsynchronousServerSocketChannel,聊天室
沉默王二's avatar
沉默王二 已提交
14 15
---

沉默王二's avatar
沉默王二 已提交
16
# 12.5 NIO 实现简易版聊天室
沉默王二's avatar
nio  
沉默王二 已提交
17

沉默王二's avatar
nio  
沉默王二 已提交
18
在此之前,我们曾利用 Java 的套接字 Socket 和 ServerSocket 完成[网络编程](https://tobebetterjavaer.com/socket/socket.html),但 Socket 和 ServerSocket 是基于 Java IO 的,在网络编程方面,性能会比较差。[原因我们在之前也讲过](https://tobebetterjavaer.com/nio/nio-better-io.html)
沉默王二's avatar
沉默王二 已提交
19

沉默王二's avatar
nio  
沉默王二 已提交
20
那 Java NIO 的 SocketChannel 和 ServerSocketChannel 性能怎么样呢?
沉默王二's avatar
nio  
沉默王二 已提交
21

沉默王二's avatar
nio  
沉默王二 已提交
22
### SocketChannel 和 ServerSocketChannel
沉默王二's avatar
nio  
沉默王二 已提交
23

沉默王二's avatar
nio  
沉默王二 已提交
24
在学习 NIO 的[第一讲里](https://tobebetterjavaer.com/nio/nio-better-io.html),我们已经介绍过 SocketChannel 和 ServerSocketChannel了,这里再简单补充下。
沉默王二's avatar
nio  
沉默王二 已提交
25

沉默王二's avatar
nio  
沉默王二 已提交
26
ServerSocketChannel 用于创建服务器端套接字,而 SocketChannel 用于创建客户端套接字。它们都支持阻塞和非阻塞模式,通过设置其 blocking 属性来切换。阻塞模式下,读/写操作会一直阻塞直到完成,而非阻塞模式下,读/写操作会立即返回。
沉默王二's avatar
nio  
沉默王二 已提交
27

沉默王二's avatar
nio  
沉默王二 已提交
28
阻塞模式:
沉默王二's avatar
nio  
沉默王二 已提交
29

沉默王二's avatar
nio  
沉默王二 已提交
30 31
- 优点:编程简单,适合低并发场景。
- 缺点:性能较差,不适合高并发场景。
沉默王二's avatar
nio  
沉默王二 已提交
32

沉默王二's avatar
nio  
沉默王二 已提交
33
非阻塞模式:
沉默王二's avatar
nio  
沉默王二 已提交
34

沉默王二's avatar
nio  
沉默王二 已提交
35 36
- 优点:性能更好,适合高并发场景。
- 缺点:编程相对复杂。
沉默王二's avatar
nio  
沉默王二 已提交
37

沉默王二's avatar
nio  
沉默王二 已提交
38
我们来看一个简单的示例(阻塞模式下):
沉默王二's avatar
nio  
沉默王二 已提交
39

沉默王二's avatar
nio  
沉默王二 已提交
40
先来看 Server 端的:
沉默王二's avatar
nio  
沉默王二 已提交
41 42

```java
沉默王二's avatar
nio  
沉默王二 已提交
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
public class BlockingServer {
    public static void main(String[] args) throws IOException {
        // 创建服务器套接字
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 绑定端口
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));
        // 设置为阻塞模式(默认为阻塞模式)
        serverSocketChannel.configureBlocking(true);

        while (true) {
            // 接收客户端连接
            SocketChannel socketChannel = serverSocketChannel.accept();
            // 分配缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(1024);

            // 读取数据
            int bytesRead = socketChannel.read(buffer);
            while (bytesRead != -1) {
                buffer.flip();
                System.out.println(StandardCharsets.UTF_8.decode(buffer));
                buffer.clear();
                bytesRead = socketChannel.read(buffer);
            }
            // 关闭套接字
            socketChannel.close();
        }
    }
沉默王二's avatar
nio  
沉默王二 已提交
70 71
}
```
沉默王二's avatar
沉默王二 已提交
72

沉默王二's avatar
nio  
沉默王二 已提交
73
简单解释一下这段代码,也比较好理解。
沉默王二's avatar
沉默王二 已提交
74

沉默王二's avatar
nio  
沉默王二 已提交
75
首先创建服务器端套接字ServerSocketChannel,然后绑定 8080 端口,接着使用 while 循环监听客户端套接字。如果接收到客户端连接 SocketChannel,就从通道里读取数据到缓冲区 ByteBuffer,一直读到通道里没有数据,关闭当前通道。
沉默王二's avatar
沉默王二 已提交
76

沉默王二's avatar
nio  
沉默王二 已提交
77
其中 `serverSocketChannel.configureBlocking(true)` 用来设置通道为阻塞模式(可以缺省)。
沉默王二's avatar
沉默王二 已提交
78

沉默王二's avatar
nio  
沉默王二 已提交
79
再来看客户端的:
沉默王二's avatar
沉默王二 已提交
80 81

```java
沉默王二's avatar
nio  
沉默王二 已提交
82
public class BlockingClient {
沉默王二's avatar
沉默王二 已提交
83
    public static void main(String[] args) throws IOException {
沉默王二's avatar
nio  
沉默王二 已提交
84 85 86 87 88
        // 创建客户端套接字
        SocketChannel socketChannel = SocketChannel.open();
        // 连接服务器
        socketChannel.connect(new InetSocketAddress("localhost", 8080));
        // 分配缓冲区
沉默王二's avatar
沉默王二 已提交
89 90
        ByteBuffer buffer = ByteBuffer.allocate(1024);

沉默王二's avatar
nio  
沉默王二 已提交
91 92 93 94 95 96
        // 向服务器发送数据
        buffer.put("沉默王二,这是来自客户端的消息。".getBytes(StandardCharsets.UTF_8));
        buffer.flip();
        socketChannel.write(buffer);
        // 清空缓冲区
        buffer.clear();
沉默王二's avatar
沉默王二 已提交
97

沉默王二's avatar
nio  
沉默王二 已提交
98
        // 关闭套接字
沉默王二's avatar
沉默王二 已提交
99 100 101 102 103
        socketChannel.close();
    }
}
```

沉默王二's avatar
nio  
沉默王二 已提交
104
客户端代码就更简单了,建立通道 SocketChannel,连接服务器,然后在缓冲区里放一段数据,之后写入到通道中,关闭套接字。
沉默王二's avatar
沉默王二 已提交
105

沉默王二's avatar
nio  
沉默王二 已提交
106
先运行 BlockingServer,再运行 BlockingClient,可以在 Server 端的控制台收到以下信息。
沉默王二's avatar
沉默王二 已提交
107

沉默王二's avatar
nio  
沉默王二 已提交
108
![](https://cdn.tobebetterjavaer.com/stutymore/network-connect-20230407124624.png)
沉默王二's avatar
沉默王二 已提交
109

沉默王二's avatar
nio  
沉默王二 已提交
110
好,我们再来看非阻塞模式下的示例。
沉默王二's avatar
沉默王二 已提交
111

沉默王二's avatar
nio  
沉默王二 已提交
112
先来看 Server 端:
沉默王二's avatar
沉默王二 已提交
113

沉默王二's avatar
nio  
沉默王二 已提交
114 115 116 117 118 119 120 121 122 123 124 125 126 127
```java
public class NonBlockingServer {
    public static void main(String[] args) throws IOException {
        // 创建服务器套接字
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 绑定端口
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));
        // 设置为非阻塞模式
        serverSocketChannel.configureBlocking(false);

        // 创建选择器
        Selector selector = Selector.open();
        // 注册服务器套接字到选择器
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
沉默王二's avatar
沉默王二 已提交
128

沉默王二's avatar
nio  
沉默王二 已提交
129 130 131 132
        while (true) {
            selector.select();
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectedKeys.iterator();
沉默王二's avatar
沉默王二 已提交
133

沉默王二's avatar
nio  
沉默王二 已提交
134 135 136
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
沉默王二's avatar
沉默王二 已提交
137

沉默王二's avatar
nio  
沉默王二 已提交
138 139 140 141 142 143
                if (key.isAcceptable()) {
                    // 接收客户端连接
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ);
                }
沉默王二's avatar
沉默王二 已提交
144

沉默王二's avatar
nio  
沉默王二 已提交
145 146 147 148 149
                if (key.isReadable()) {
                    // 读取数据
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int bytesRead = socketChannel.read(buffer);
沉默王二's avatar
沉默王二 已提交
150

沉默王二's avatar
nio  
沉默王二 已提交
151 152 153 154 155 156 157 158 159 160 161
                    if (bytesRead != -1) {
                        buffer.flip();
                        System.out.print(StandardCharsets.UTF_8.decode(buffer));
                        buffer.clear();
                    } else {
                        // 客户端已断开连接,取消选择键并关闭通道
                        key.cancel();
                        socketChannel.close();
                    }
                }
            }
沉默王二's avatar
沉默王二 已提交
162 163 164 165 166
        }
    }
}
```

沉默王二's avatar
nio  
沉默王二 已提交
167
与之前阻塞模式相同的,我们就不再赘述了,只说不同的。
沉默王二's avatar
沉默王二 已提交
168

沉默王二's avatar
nio  
沉默王二 已提交
169
①、首先,创建一个 ServerSocketChannel,并将其设置为非阻塞模式。
沉默王二's avatar
沉默王二 已提交
170 171

```java
沉默王二's avatar
nio  
沉默王二 已提交
172 173 174
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
```
沉默王二's avatar
沉默王二 已提交
175

沉默王二's avatar
nio  
沉默王二 已提交
176
②、创建一个 Selector 实例,用于处理多个通道的事件。
沉默王二's avatar
沉默王二 已提交
177

沉默王二's avatar
nio  
沉默王二 已提交
178 179 180
```java
Selector selector = Selector.open();
```
沉默王二's avatar
沉默王二 已提交
181

沉默王二's avatar
nio  
沉默王二 已提交
182
③、将 ServerSocketChannel 注册到 Selector 上,并设置感兴趣的事件为 OP_ACCEPT。这意味着当有新的客户端连接请求时,Selector 会通知我们。
沉默王二's avatar
沉默王二 已提交
183

沉默王二's avatar
nio  
沉默王二 已提交
184 185 186
```java
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
```
沉默王二's avatar
沉默王二 已提交
187

沉默王二's avatar
nio  
沉默王二 已提交
188
看一下 OP_ACCEPT 的注释:
沉默王二's avatar
沉默王二 已提交
189

沉默王二's avatar
nio  
沉默王二 已提交
190
![](https://cdn.tobebetterjavaer.com/stutymore/network-connect-20230407130621.png)
沉默王二's avatar
沉默王二 已提交
191 192


沉默王二's avatar
nio  
沉默王二 已提交
193
④、循环处理 Selector 中的事件。首先调用 `selector.select()` 方法来等待感兴趣的事件发生。这个方法会阻塞,直到至少有一个感兴趣的事件发生。
沉默王二's avatar
沉默王二 已提交
194

沉默王二's avatar
nio  
沉默王二 已提交
195 196 197 198 199
```java
while (true) {
    int readyChannels = selector.select();
    if (readyChannels == 0) {
        continue;
沉默王二's avatar
沉默王二 已提交
200
    }
沉默王二's avatar
nio  
沉默王二 已提交
201
    // ...
沉默王二's avatar
沉默王二 已提交
202 203 204
}
```

沉默王二's avatar
nio  
沉默王二 已提交
205
⑤、当 `selector.select()` 返回时,我们可以通过 `selector.selectedKeys()` 获取所有已就绪的事件,并对其进行迭代处理。在处理事件时,根据 SelectionKey 的类型来执行相应的操作。
沉默王二's avatar
沉默王二 已提交
206 207

```java
沉默王二's avatar
nio  
沉默王二 已提交
208 209 210 211 212 213 214 215 216
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    // 处理事件
    // ...
    keyIterator.remove();
}
```
沉默王二's avatar
沉默王二 已提交
217

沉默王二's avatar
nio  
沉默王二 已提交
218
⑥、当 SelectionKey 的类型为 OP_ACCEPT 时,说明有新的客户端连接请求。此时,我们需要接受新的连接,并将新创建的 SocketChannel 设置为非阻塞模式。然后,将该 SocketChannel 注册到 Selector 上,并设置感兴趣的事件为 OP_READ。
沉默王二's avatar
沉默王二 已提交
219

沉默王二's avatar
nio  
沉默王二 已提交
220 221 222 223 224 225
```java
if (key.isAcceptable()) {
    ServerSocketChannel server = (ServerSocketChannel) key.channel();
    SocketChannel client = server.accept();
    client.configureBlocking(false);
    client.register(selector, SelectionKey.OP_READ);
沉默王二's avatar
沉默王二 已提交
226 227 228
}
```

沉默王二's avatar
nio  
沉默王二 已提交
229
⑦、当 SelectionKey 的类型为 OP_READ 时,说明有客户端发送了数据。我们需要从 SocketChannel 中读取数据,并进行相应的处理。
沉默王二's avatar
沉默王二 已提交
230

沉默王二's avatar
nio  
沉默王二 已提交
231 232 233 234 235 236 237 238 239
```java
if (key.isReadable()) {
    SocketChannel client = (SocketChannel) key.channel();
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int bytesRead = client.read(buffer);
    // 对读取到的数据进行处理
    // ...
}
```
沉默王二's avatar
沉默王二 已提交
240

沉默王二's avatar
nio  
沉默王二 已提交
241
⑧、(如果可以的话)当 SelectionKey 的类型为 OP_WRITE 时,说明可以向客户端发送数据。我们可以将要发送的数据写入 SocketChannel。
沉默王二's avatar
沉默王二 已提交
242

沉默王二's avatar
nio  
沉默王二 已提交
243 244 245 246 247 248 249
```java
if (key.isWritable()) {
    SocketChannel client = (SocketChannel) key.channel();
    ByteBuffer buffer = ByteBuffer.wrap("你好,客户端".getBytes());
   client.write(buffer);
}
```
沉默王二's avatar
沉默王二 已提交
250

沉默王二's avatar
nio  
沉默王二 已提交
251
不过,本例中并没有这一步。如果需要的话,可以按照这样的方式向客户端写入数据。
沉默王二's avatar
沉默王二 已提交
252

沉默王二's avatar
nio  
沉默王二 已提交
253
⑨、在服务器停止运行时,需要关闭 Selector 和 ServerSocketChannel,释放资源。
沉默王二's avatar
沉默王二 已提交
254

沉默王二's avatar
nio  
沉默王二 已提交
255 256 257 258
```java
key.cancel();
socketChannel.close();
```
沉默王二's avatar
沉默王二 已提交
259

沉默王二's avatar
nio  
沉默王二 已提交
260
好,接下来,我们来看客户端的。
沉默王二's avatar
沉默王二 已提交
261 262

```java
沉默王二's avatar
nio  
沉默王二 已提交
263
public class NonBlockingClient {
沉默王二's avatar
沉默王二 已提交
264
    public static void main(String[] args) throws IOException {
沉默王二's avatar
nio  
沉默王二 已提交
265 266 267
        // 创建客户端套接字
        SocketChannel socketChannel = SocketChannel.open();
        // 设置为非阻塞模式
沉默王二's avatar
沉默王二 已提交
268
        socketChannel.configureBlocking(false);
沉默王二's avatar
nio  
沉默王二 已提交
269 270
        // 连接服务器
        socketChannel.connect(new InetSocketAddress("localhost", 8080));
沉默王二's avatar
沉默王二 已提交
271

沉默王二's avatar
nio  
沉默王二 已提交
272 273
        while (!socketChannel.finishConnect()) {
            // 等待连接完成
沉默王二's avatar
沉默王二 已提交
274 275
        }

沉默王二's avatar
nio  
沉默王二 已提交
276 277
        // 分配缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
沉默王二's avatar
沉默王二 已提交
278

沉默王二's avatar
nio  
沉默王二 已提交
279 280 281 282 283 284 285
        // 向服务器发送数据
        String message = "你好,沉默王二,这是来自客户端的消息。";
        buffer.put(message.getBytes(StandardCharsets.UTF_8));
        buffer.flip();
        socketChannel.write(buffer);
        // 清空缓冲区
        buffer.clear();
沉默王二's avatar
沉默王二 已提交
286

沉默王二's avatar
nio  
沉默王二 已提交
287 288 289 290 291
        // 关闭套接字
        socketChannel.close();
    }
}
```
沉默王二's avatar
沉默王二 已提交
292

沉默王二's avatar
nio  
沉默王二 已提交
293
客户端代码依然比较简单,我们直接略过,不再解释。然后运行 Server,再运行 Client。可以运行多次,结果如下:
沉默王二's avatar
沉默王二 已提交
294

沉默王二's avatar
nio  
沉默王二 已提交
295
![](https://cdn.tobebetterjavaer.com/stutymore/network-connect-20230407131553.png)
沉默王二's avatar
沉默王二 已提交
296

沉默王二's avatar
nio  
沉默王二 已提交
297
### Scatter 和 Gather
沉默王二's avatar
沉默王二 已提交
298

沉默王二's avatar
nio  
沉默王二 已提交
299
Scatter 和 Gather 是 Java NIO 中两种高效的 I/O 操作,用于将数据分散到多个缓冲区或从多个缓冲区中收集数据。
沉默王二's avatar
沉默王二 已提交
300

沉默王二's avatar
nio  
沉默王二 已提交
301
Scatter(分散):它将从 Channel 读取的数据分散(写入)到多个缓冲区。这种操作可以在读取数据时将其分散到不同的缓冲区,有助于处理结构化数据。例如,我们可以将消息头、消息体和消息尾分别写入不同的缓冲区。
沉默王二's avatar
沉默王二 已提交
302

沉默王二's avatar
nio  
沉默王二 已提交
303
Gather(聚集):与 Scatter 相反,它将多个缓冲区中的数据聚集(读取)并写入到一个 Channel。这种操作允许我们在发送数据时从多个缓冲区中聚集数据。例如,我们可以将消息头、消息体和消息尾从不同的缓冲区中聚集到一起并写入到同一个 Channel。
沉默王二's avatar
沉默王二 已提交
304

沉默王二's avatar
nio  
沉默王二 已提交
305
来写一个完整的 demo,先看 Server。
沉默王二's avatar
沉默王二 已提交
306

沉默王二's avatar
nio  
沉默王二 已提交
307 308 309 310
```java
// 创建一个ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9000));
沉默王二's avatar
沉默王二 已提交
311

沉默王二's avatar
nio  
沉默王二 已提交
312 313
// 接受连接
SocketChannel socketChannel = serverSocketChannel.accept();
沉默王二's avatar
沉默王二 已提交
314

沉默王二's avatar
nio  
沉默王二 已提交
315 316 317
// Scatter:分散读取数据到多个缓冲区
ByteBuffer headerBuffer = ByteBuffer.allocate(128);
ByteBuffer bodyBuffer = ByteBuffer.allocate(1024);
沉默王二's avatar
沉默王二 已提交
318

沉默王二's avatar
nio  
沉默王二 已提交
319
ByteBuffer[] buffers = {headerBuffer, bodyBuffer};
沉默王二's avatar
沉默王二 已提交
320

沉默王二's avatar
nio  
沉默王二 已提交
321
long bytesRead = socketChannel.read(buffers);
沉默王二's avatar
沉默王二 已提交
322

沉默王二's avatar
nio  
沉默王二 已提交
323 324 325 326 327
// 输出缓冲区数据
headerBuffer.flip();
while (headerBuffer.hasRemaining()) {
    System.out.print((char) headerBuffer.get());
}
沉默王二's avatar
沉默王二 已提交
328

沉默王二's avatar
nio  
沉默王二 已提交
329
System.out.println();
沉默王二's avatar
沉默王二 已提交
330

沉默王二's avatar
nio  
沉默王二 已提交
331 332 333 334
bodyBuffer.flip();
while (bodyBuffer.hasRemaining()) {
    System.out.print((char) bodyBuffer.get());
}
沉默王二's avatar
沉默王二 已提交
335

沉默王二's avatar
nio  
沉默王二 已提交
336 337 338
// Gather:聚集数据从多个缓冲区写入到Channel
ByteBuffer headerResponse = ByteBuffer.wrap("Header Response".getBytes());
ByteBuffer bodyResponse = ByteBuffer.wrap("Body Response".getBytes());
沉默王二's avatar
沉默王二 已提交
339

沉默王二's avatar
nio  
沉默王二 已提交
340
ByteBuffer[] responseBuffers = {headerResponse, bodyResponse};
沉默王二's avatar
沉默王二 已提交
341

沉默王二's avatar
nio  
沉默王二 已提交
342
long bytesWritten = socketChannel.write(responseBuffers);
沉默王二's avatar
沉默王二 已提交
343

沉默王二's avatar
nio  
沉默王二 已提交
344 345 346 347
// 关闭连接
socketChannel.close();
serverSocketChannel.close();
```
沉默王二's avatar
沉默王二 已提交
348

沉默王二's avatar
nio  
沉默王二 已提交
349
再来看 Client:
沉默王二's avatar
沉默王二 已提交
350

沉默王二's avatar
nio  
沉默王二 已提交
351 352 353 354
```java
// 创建一个SocketChannel
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 9000));
沉默王二's avatar
沉默王二 已提交
355

沉默王二's avatar
nio  
沉默王二 已提交
356 357 358
// 发送数据到服务器
String header = "Header Content";
String body = "Body Content";
沉默王二's avatar
沉默王二 已提交
359

沉默王二's avatar
nio  
沉默王二 已提交
360 361
ByteBuffer headerBuffer = ByteBuffer.wrap(header.getBytes());
ByteBuffer bodyBuffer = ByteBuffer.wrap(body.getBytes());
沉默王二's avatar
沉默王二 已提交
362

沉默王二's avatar
nio  
沉默王二 已提交
363 364
ByteBuffer[] buffers = {headerBuffer, bodyBuffer};
socketChannel.write(buffers);
沉默王二's avatar
沉默王二 已提交
365

沉默王二's avatar
nio  
沉默王二 已提交
366 367 368
// 从服务器接收数据
ByteBuffer headerResponseBuffer = ByteBuffer.allocate(128);
ByteBuffer bodyResponseBuffer = ByteBuffer.allocate(1024);
沉默王二's avatar
沉默王二 已提交
369

沉默王二's avatar
nio  
沉默王二 已提交
370
ByteBuffer[] responseBuffers = {headerResponseBuffer, bodyResponseBuffer};
沉默王二's avatar
沉默王二 已提交
371

沉默王二's avatar
nio  
沉默王二 已提交
372
long bytesRead = socketChannel.read(responseBuffers);
沉默王二's avatar
沉默王二 已提交
373

沉默王二's avatar
nio  
沉默王二 已提交
374 375 376 377 378
// 输出接收到的数据
headerResponseBuffer.flip();
while (headerResponseBuffer.hasRemaining()) {
    System.out.print((char) headerResponseBuffer.get());
}
沉默王二's avatar
沉默王二 已提交
379

沉默王二's avatar
nio  
沉默王二 已提交
380 381 382 383
bodyResponseBuffer.flip();
while (bodyResponseBuffer.hasRemaining()) {
    System.out.print((char) bodyResponseBuffer.get());
}
沉默王二's avatar
沉默王二 已提交
384

沉默王二's avatar
nio  
沉默王二 已提交
385 386 387
// 关闭连接
socketChannel.close();
```
沉默王二's avatar
沉默王二 已提交
388

沉默王二's avatar
nio  
沉默王二 已提交
389
在这个示例中,我们使用了 Scattering 从 SocketChannel 分散读取数据到多个缓冲区,并使用 Gathering 将数据从多个缓冲区聚集写入到 SocketChannel。通过这种方式,我们可以方便地处理多个缓冲区中的数据。
沉默王二's avatar
沉默王二 已提交
390

沉默王二's avatar
沉默王二 已提交
391
### 异步套接字通道 AsynchronousSocketChannel 和 AsynchronousServerSocketChannel
沉默王二's avatar
沉默王二 已提交
392

沉默王二's avatar
沉默王二 已提交
393
AsynchronousSocketChannel 和 AsynchronousServerSocketChannel 是 Java 7 引入的异步 I/O 类,分别用于处理异步客户端 Socket 和服务器端 ServerSocket。异步 I/O 允许在 I/O 操作进行时执行其他任务,并在操作完成时接收通知,提高了并发处理能力。
沉默王二's avatar
沉默王二 已提交
394

沉默王二's avatar
沉默王二 已提交
395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501
来看一个简单的示例,先看服务器端。

```java
public class AsynchronousServer {

    public static void main(String[] args) throws IOException, InterruptedException {
        AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
        server.bind(new InetSocketAddress("localhost", 5000));

        System.out.println("服务器端启动");

        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Void attachment) {
                // 接收下一个连接请求
                server.accept(null, this);

                ByteBuffer buffer = ByteBuffer.allocate(1024);
                Future<Integer> readResult = client.read(buffer);

                try {
                    readResult.get();
                    buffer.flip();
                    String message = new String(buffer.array(), 0, buffer.remaining());
                    System.out.println("接收到的消息: " + message);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                exc.printStackTrace();
            }
        });

        // 为了让服务器继续运行,我们需要阻止 main 线程退出
        Thread.currentThread().join();
    }
}
```

代码结构和之前讲到的[异步文件通道 AsynchronousFileChannel](https://tobebetterjavaer.com/nio/buffer-channel.html) 比较相似,异步服务单套接字通道 AsynchronousServerSocketChannel 接收客户端连接,每当收到一个新的连接时,会调用 `completed()` 方法,然后读取客户端发送的数据并将其打印到控制台。

来简单分析一下吧。

①、创建了一个 AsynchronousServerSocketChannel 实例并将其打开。这个通道将用于监听客户端连接。

```java
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
```

②、调用 `accept()` 方法来接收客户端连接。这个方法需要一个 CompletionHandler 实例,当客户端连接成功时,`completed()` 方法会被调用。

```java
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() { ... });
```

③、实现 CompletionHandler,I/O 操作成功时,会调用 `completed()` 方法;当 I/O 操作失败时,会调用 `failed()` 方法。

```java
new CompletionHandler<AsynchronousSocketChannel, Void>() {
    @Override
    public void completed(AsynchronousSocketChannel client, Void attachment) { ... }

    @Override
    public void failed(Throwable exc, Void attachment) { ... }
}
```

在 completed 方法中,我们首先调用 `server.accept()` 来接收下一个连接请求。然后,我们创建一个缓冲区 ByteBuffer 并使用 `client.read()` 从客户端读取数据。在这个示例中,我们使用了一个 [Future](https://tobebetterjavaer.com/thread/callable-future-futuretask.html) 对象来等待读取操作完成。当读取完成时,我们将缓冲区的内容打印到控制台。

④、为了让服务器继续运行并接收客户端连接,我们需要阻止 main 线程退出。

```java
Thread.currentThread().join();
```

再来看客户端的:

```java
public class AsynchronousClient {

    public static void main(String[] args) {
        try {
            AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
            Future<Void> connectResult = client.connect(new InetSocketAddress("localhost", 5000));
            connectResult.get(); // 等待连接完成

            String message = "沉默王二,在吗?";
            ByteBuffer buffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
            Future<Integer> writeResult = client.write(buffer);
            writeResult.get(); // 等待发送完成

            System.out.println("消息发送完毕");

            client.close();
        } catch (IOException | InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}
```

就是简单的连接和写入数据,就不多做解释了。这里先运行一下 Server 端,然后再运行一下客户端,看一下结果。

![](https://cdn.tobebetterjavaer.com/stutymore/network-connect-20230407161351.png)
沉默王二's avatar
沉默王二 已提交
502

沉默王二's avatar
nio  
沉默王二 已提交
503
### 简单的聊天室
沉默王二's avatar
沉默王二 已提交
504

沉默王二's avatar
沉默王二 已提交
505 506 507
我们来通过 SocketChannel 和 ServerSocketChannel 实现一个 0.1 版的聊天室,先说一下需求,比较简单,服务端启动监听客户端请求,当客户端向服务器端发送信息后,服务器端接收到后把客户端消息回显给客户端,比较呆瓜,但可以先来看一下。

![](https://cdn.tobebetterjavaer.com/stutymore/network-connect-20230407164326.png)
沉默王二's avatar
沉默王二 已提交
508

沉默王二's avatar
沉默王二 已提交
509
我们来看服务器端代码:
沉默王二's avatar
沉默王二 已提交
510

沉默王二's avatar
沉默王二 已提交
511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869
```java
public class ChatServer {
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private static final int PORT = 8080;

    public ChatServer() {
        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("聊天室服务端启动了 " + PORT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void start() {
        try {
            while (true) {
                if (selector.select() > 0) {
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        handleKey(key);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void handleKey(SelectionKey key) throws IOException {
        if (key.isAcceptable()) {
            SocketChannel socketChannel = serverSocketChannel.accept();
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
            System.out.println("客户端连接上了: " + socketChannel.getRemoteAddress());
        } else if (key.isReadable()) {
            SocketChannel socketChannel = (SocketChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int read = socketChannel.read(buffer);
            if (read > 0) {
                buffer.flip();
                String msg = new String(buffer.array(), 0, read);
                System.out.println("客户端说: " + msg);
                socketChannel.write(ByteBuffer.wrap(("服务端回复: " + msg).getBytes()));
            }
        }
    }

    public static void main(String[] args) {
        new ChatServer().start();
    }
}
```

解释一下代码逻辑:

1、创建一个 ServerSocketChannel,并将其绑定到指定端口。

2、将 ServerSocketChannel 设置为非阻塞模式。

3、创建一个 Selector,并将 ServerSocketChannel 注册到它上面,监听 OP_ACCEPT 事件(等待客户端连接)。

4、无限循环,等待感兴趣的事件发生。

5、使用 `Selector.select()` 方法,等待已注册的通道中有事件发生。

6、获取到发生事件的通道的 SelectionKey。

7、判断 SelectionKey 的事件类型:

- a. 如果是 OP_ACCEPT 事件,说明有新的客户端连接进来。接受新的连接,并将新连接的 SocketChannel 注册到 Selector 上,监听 OP_READ 事件。
- b. 如果是 OP_READ 事件,说明客户端发送了消息。读取客户端发送的消息,并将其返回给客户端。
处理完毕后,清除已处理的 SelectionKey。

再来看一下客户端的代码:

```java
public class ChatClient {
    private Selector selector;
    private SocketChannel socketChannel;
    private static final String HOST = "localhost";
    private static final int PORT = 8080;

    public ChatClient() {
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
            System.out.println("连接到聊天室了");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void start() {
        new Thread(() -> {
            try {
                while (true) {
                    if (selector.select() > 0) {
                        for (SelectionKey key : selector.selectedKeys()) {
                            selector.selectedKeys().remove(key);
                            if (key.isReadable()) {
                                readMessage();
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        try (BufferedReader reader = new BufferedReader(new InputStreamReader(System.in
        ))) {
            String input;
            while ((input = reader.readLine()) != null) {
                sendMessage(input);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    private void sendMessage(String message) throws IOException {
        if (message != null && !message.trim().isEmpty()) {
            ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
            socketChannel.write(buffer);
        }
    }

    private void readMessage() throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int read = socketChannel.read(buffer);
        if (read > 0) {
            buffer.flip();
            String msg = new String(buffer.array(), 0, read);
            System.out.println(msg);
        }
    }

    public static void main(String[] args) {
        new ChatClient().start();
    }
}
```

解释一下代码逻辑:

1、创建一个 SocketChannel,并连接到指定的服务器地址和端口。

2、将 SocketChannel 设置为非阻塞模式。

3、创建一个 Selector,并将 SocketChannel 注册到它上面,监听 OP_READ 事件(等待接收服务器的消息)。

4、启动一个新线程用于读取用户在控制台输入的消息,并发送给服务器。

5、无限循环,等待感兴趣的事件发生。

6、使用` Selector.select()` 方法,等待已注册的通道中有事件发生。

7、获取到发生事件的通道的 SelectionKey。

8、判断 SelectionKey 的事件类型:

- a. 如果是 OP_READ 事件,说明服务器发送了消息。读取服务器发送的消息,并在控制台显示。
处理完毕后,清除已处理的 SelectionKey。

来看运行后的效果。

![](https://cdn.tobebetterjavaer.com/stutymore/network-connect-20230407164913.png)

好,接下来,我们来升级一下需求,也就是 0.2 版聊天室,要求服务器端也能从控制台敲入信息主动发送给客户端。

![](https://cdn.tobebetterjavaer.com/stutymore/network-connect-20230407165110.png)

来看服务器端代码:

```java
public class Chat2Server {

    public static void main(String[] args) throws IOException {
        // 创建一个 ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(8080));

        // 创建一个 Selector
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("聊天室服务端启动了");

        // 客户端连接
        AtomicReference<SocketChannel> clientRef = new AtomicReference<>();

        // 从控制台读取输入并发送给客户端
        Thread sendMessageThread = new Thread(() -> {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(System.in))) {
                while (true) {
                    System.out.println("输入服务器端消息: ");
                    String message = reader.readLine();
                    SocketChannel client = clientRef.get();
                    if (client != null && client.isConnected()) {
                        ByteBuffer buffer = ByteBuffer.wrap((message + "\n").getBytes());
                        client.write(buffer);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        sendMessageThread.start();

        while (true) {
            int readyChannels = selector.select();

            if (readyChannels == 0) {
                continue;
            }

            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();

                if (key.isAcceptable()) {
                    // 接受客户端连接
                    SocketChannel client = serverSocketChannel.accept();
                    System.out.println("客户端已连接");
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ);
                    clientRef.set(client);
                } else if (key.isReadable()) {
                    // 读取客户端消息
                    SocketChannel channel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int bytesRead = channel.read(buffer);

                    if (bytesRead > 0) {
                        buffer.flip();
                        byte[] bytes = new byte[buffer.remaining()];
                        buffer.get(bytes);
                        String message = new String(bytes).trim();
                        System.out.println("客户端消息: " + message);
                    }
                }
                keyIterator.remove();
            }
        }
    }
}
```

再来看客户端代码:

```java
public class Chat2Client {

    public static void main(String[] args) throws IOException {
        // 创建一个 SocketChannel
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.connect(new InetSocketAddress("localhost", 8080));

        // 创建一个 Selector
        Selector selector = Selector.open();
        socketChannel.register(selector, SelectionKey.OP_CONNECT);

        // 从控制台读取输入并发送给服务器端
        Thread sendMessageThread = new Thread(() -> {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(System.in))) {
                while (true) {
                    System.out.println("输入客户端消息: ");
                    String message = reader.readLine();
                    if (socketChannel.isConnected()) {
                        ByteBuffer buffer = ByteBuffer.wrap((message + "\n").getBytes());
                        socketChannel.write(buffer);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        sendMessageThread.start();

        while (true) {
            int readyChannels = selector.select();

            if (readyChannels == 0) {
                continue;
            }

            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();

                if (key.isConnectable()) {
                    // 连接到服务器
                    socketChannel.finishConnect();
                    socketChannel.register(selector, SelectionKey.OP_READ);
                    System.out.println("已连接到服务器");
                } else if (key.isReadable()) {
                    // 读取服务器端消息
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int bytesRead = socketChannel.read(buffer);

                    if (bytesRead > 0) {
                        buffer.flip();
                        byte[] bytes = new byte[buffer.remaining()];
                        buffer.get(bytes);
                        String message = new String(bytes).trim();
                        System.out.println("服务器端消息: " + message);
                    }
                }
                keyIterator.remove();
            }
        }
    }
}
```

运行 Server,再运行 Client,交互信息如下:

![](https://cdn.tobebetterjavaer.com/stutymore/network-connect-20230407180853.png)

我们使用了 Selector 和非阻塞 I/O,这使得服务器可以同时处理多个连接。所以我们在 Intellij IDEA 中可以再配置一个客户端,见下图(填上这四项内容)。

![](https://cdn.tobebetterjavaer.com/stutymore/network-connect-20230407181717.png)

然后启动,就可以完成一个 Server 和多个 Client 交互了。

![](https://cdn.tobebetterjavaer.com/stutymore/network-connect-20230407181906.png)

OK,关于聊天室,我们就先讲到这里。

### 小结

前面我们了解到,Java NIO 在文件 IO 上的性能其实和传统 IO 差不多,甚至在处理大文件的时候还有些甘拜下风,但 NIO 的主要作用体现在网络 IO 上,像 [Netty](https://tobebetterjavaer.com/netty/rumen.html) 框架底层其实就是 NIO,我们来做一下简单的总结吧。

SocketChannel(用于 TCP 连接)和 ServerSocketChannel(用于监听和接受新的 TCP 连接)可以用来替代传统的 Socket 和 ServerSocket 类,提供非阻塞模式。

NIO 支持阻塞和非阻塞模式。非阻塞模式允许程序在等待 I/O 时执行其他任务,从而提高并发性能。非阻塞模式的实现依赖于 Selector,它可以监控多个通道上的 I/O 事件。

NIO 支持将数据分散到多个 Buffer(Scatter)或从多个 Buffer 收集数据(Gather),提供了更高效的数据传输方式。

Java NIO.2 引入了 AsynchronousSocketChannel 和 AsynchronousServerSocketChannel,这些类提供了基于回调的异步 I/O 操作。异步套接字通道可以在完成 I/O 操作时自动触发回调函数,从而实现高效的异步处理。

最后,我们使用 NIO 实现了简单的聊天室功能。通过 ServerSocketChannel 和 SocketChannel 创建服务端和客户端,实现互相发送和接收消息。在处理多个客户端时,可以使用 Selector 来管理多个客户端连接,提高并发性能。

总之,Java NIO 网络编程实践提供了更高效、灵活且可扩展的 I/O 处理方式,对于大型应用程序和高并发场景具有显著优势。
沉默王二's avatar
沉默王二 已提交
870 871 872

---------

沉默王二's avatar
7600+  
沉默王二 已提交
873
GitHub 上标星 7600+ 的开源知识库《二哥的 Java 进阶之路》第一版 PDF 终于来了!包括Java基础语法、数组&字符串、OOP、集合框架、Java IO、异常处理、Java 新特性、网络编程、NIO、并发编程、JVM等等,共计 32 万余字,可以说是通俗易懂、风趣幽默……详情戳:[太赞了,GitHub 上标星 7600+ 的 Java 教程](https://tobebetterjavaer.com/overview/)
沉默王二's avatar
沉默王二 已提交
874

沉默王二's avatar
沉默王二 已提交
875 876

微信搜 **沉默王二** 或扫描下方二维码关注二哥的原创公众号沉默王二,回复 **222** 即可免费领取。
沉默王二's avatar
沉默王二 已提交
877

沉默王二's avatar
沉默王二 已提交
878
![](https://cdn.tobebetterjavaer.com/tobebetterjavaer/images/gongzhonghao.png)