# 三、NIO 对网络的支持
在本章中,我们将重点介绍 Java**新 IO**(**NIO**)包的`Buffer`和`Channels`类。NIO 是早期 Java IO API 和部分网络 API 的替代品。虽然 NIO 是一个广泛而复杂的主题,但我们感兴趣的是它如何为网络应用程序提供支持。
我们将探讨几个主题,包括以下内容:
* 缓冲区、通道和选择器之间的性质和关系
* 使用 NIO 技术构建客户端/服务器
* 处理多个客户端的过程
* 支持异步套接字通道
* 基本缓冲区操作
NIO 包为构建高效的网络应用程序提供了广泛的支持。
# Java NIO
Java NIO 使用三个核心类:
* `Buffer`:此保存读取或写入通道的信息
* `Channel`:这是一种类似流的技术,支持对数据源/接收器的异步读/写操作
* `Selector`:这是一个机制,在一个线程中处理多个通道
从概念上讲,缓冲区和通道共同处理数据。如下图所示,数据可以在缓冲区和通道之间的任意方向移动:
![Java NIO](img/B04915_03_01.jpg)
通道连接到一些外部数据源,而缓冲区用于内部处理数据。有几种类型的通道和缓冲区。下表列出了其中的一些。
频道表如下所示:
|
频道类别
|
意图
|
| --- | --- |
| `FileChannel` | 这连接到一个文件 |
| `DatagramChannel` | 这支持数据报套接字 |
| `SocketChannel` | 这支持流式套接字 |
| `ServerSocketChannel` | 这将侦听套接字请求 |
| `NetworkChannel` | 此支持网络套接字 |
| `AsynchronousSocketChannel` | 支持异步流套接字 |
缓冲器表如下:
|
缓冲类
|
支持的数据类型
|
| --- | --- |
| `ByteBuffer` | `byte` |
| `CharBuffer` | `char` |
| `DoubleBuffer` | `double` |
| `FloatBuffer` | `float` |
| `IntBuffer` | `int` |
| `LongBuffer` | `long` |
| `ShortBuffer` | `short` |
当应用程序使用许多低流量连接时,`Selector`类非常有用,这些连接可以使用单个线程处理。这比为每个连接创建线程更有效。这也是一种用于使应用程序更具可伸缩性的技术,我们将在[第 7 章](7.html "Chapter 7. Network Scalability")、*网络可伸缩性*中讨论。
在本章中,我们将创建客户端/服务器应用程序来说明通道和缓冲区之间的交互。这包括一个简单的时间服务器、一个用于演示可变长度消息的聊天服务器、一个用于演示处理多个客户端的一种技术的部件服务器,以及一个异步服务器。我们还将研究专门的缓冲区技术,包括批量传输和视图。
我们将从缓冲区的概述以及它们如何与通道一起工作开始讨论。
# 缓冲器介绍
缓冲区暂时保存数据,因为它正被移入和移出通道。创建缓冲区时,它是以固定大小或容量创建的。缓冲区的部分或全部内存可与几个`Buffer`类字段一起使用,用于管理缓冲区中的数据。
`Buffer`类是抽象的。但是,它拥有用于操作缓冲区的基本方法,包括:
* `capacity`:返回缓冲区中的元素数
* `limit`:返回不应该访问的缓冲区的第一个索引
* `position`:返回下一个要读写的元素的索引
元素取决于缓冲区类型。
`mark`和`reset`方法也控制缓冲器内的位置。`mark`方法将缓冲器的标记设置到其位置。`reset`方法将标记位置恢复到先前标记的位置。以下代码显示了各种缓冲项之间的关系:
```java
0 <= mark <= position <= limit <= capacity
```
缓冲区可以是**直接**或**非直接**。如果可能,直接缓冲区将尝试使用本机 IO 方法。直接缓冲区的创建往往更昂贵,但对于驻留在内存中更长时间的较大缓冲区,其执行效率更高。`allocateDirect`方法用于创建直接缓冲区,并接受指定缓冲区大小的整数。`allocate`方法也接受整数大小参数,但创建非直接缓冲区。
对于大多数操作而言,非直接缓冲区的效率不如直接缓冲区。但是,非直接缓冲区使用的内存将由 JVM 垃圾收集器回收,而直接内存缓冲区可能不在 JVM 的控制范围内。这使得非直接缓冲区的内存管理更加可预测。
有几种方法用于在通道和缓冲区之间传输数据。这些可分为以下两类:
* 绝对的或相对的
* 批量传输
* 使用基本数据类型
* 支持观点
* 压缩、复制和切片字节缓冲区
许多`Buffer`类的方法支持调用链接。put 类型方法将数据传输到缓冲区,而 get 类型方法从缓冲区检索信息。我们将在示例中广泛使用 get 和 put 方法。这些方法一次只传输一个字节。
这些 get 和 put 方法是相对于缓冲区中位置的当前位置的。还有一些绝对方法使用缓冲区中的索引来隔离特定的缓冲区元素。
批量数据传输连续的数据块。这些 get 和 put 方法使用字节数组作为其参数之一来保存数据。这些将在*批量数据传输*一节中讨论。
当`Buffer`类中的所有数据类型相同时,可以创建**视图**,允许使用特定数据类型(如`Float`方便地访问数据。我们将在*中使用视图*部分演示此缓冲区。
支持压缩、复制和切片类型的操作。压缩操作将移动缓冲区的内容,以消除已处理的数据。复制将创建缓冲区的副本,而切片将基于原始缓冲区的全部或部分创建新缓冲区。对其中一个缓冲区的更改将反映在另一个缓冲区中。但是,每个缓冲区的位置、限制和标记值是独立的。
让我们看看操作中的缓冲区,从创建缓冲区开始。
# 使用带有时间服务器的通道
在[第 1 章](1.html "Chapter 1. Getting Started with Network Programming")*网络编程入门*中介绍的时间服务器和客户端将在此处实现,以演示缓冲区和通道的使用。这些应用程序很简单,但它们说明了如何将缓冲区和通道一起使用。我们将首先创建一个服务器,然后创建一个使用该服务器的客户端。
## 创建时间服务器
下面的代码是`ServerSocketChannelTimeServer`类的初始声明,它将成为我们的时间服务器。`ServerSocketChannel`类的`open`方法创建一个`ServerSocketChannel`实例。`socket`方法检索通道的`ServerSocket`实例。然后,`bind`方法将此服务器套接字与端口`5000`关联。虽然`ServerSocketChannel`类有一个`close`方法,但使用 try with resources 块更容易:
```java
public class ServerSocketChannelTimeServer {
public static void main(String[] args) {
System.out.println("Time Server started");
try {
ServerSocketChannel serverSocketChannel =
ServerSocketChannel.open();
serverSocketChannel.socket().bind(
new InetSocketAddress(5000));
...
}
} catch (IOException ex) {
// Handle exceptions
}
}
}
```
服务器将进入一个无限循环,`accept`方法将阻塞该循环,直到收到来自客户端的请求。发生这种情况时,返回一个`SocketChannel`实例:
```java
while (true) {
System.out.println("Waiting for request ...");
SocketChannel socketChannel =
serverSocketChannel.accept();
```
假设此实例不为 null,将创建一个包含当前日期和时间的字符串:
```java
if (socketChannel != null) {
String dateAndTimeMessage = "Date: "
+ new Date(System.currentTimeMillis());
...
}
```
创建了一个大小为 64 字节的`ByteBuffer`实例。这对于大多数消息来说已经足够了。`put`方法将数据移动到缓冲区中。这是一个批量数据传输操作。如果缓冲区不够大,则引发`BufferOverflowException`异常:
```java
ByteBuffer buf = ByteBuffer.allocate(64);
buf.put(dateAndTimeMessage.getBytes());
```
我们需要调用`flip`方法,以便将其用于通道的写入操作。这具有设置的效果;限制设置为当前位置,位置设置为零。while 循环用于写出每个字节,当`hasRemaining`方法确定没有更多的字节可写时终止。最后一个操作是显示发送给客户端的消息:
```java
buf.flip();
while (buf.hasRemaining()) {
socketChannel.write(buf);
}
System.out.println("Sent: " + dateAndTimeMessage);
```
当服务器启动时,它将产生与以下输出类似的输出:
**服务器启动时间**
**等待请求。。。**
我们现在已经准备好创建我们的客户。
## 创建时间客户端
客户端是在`SocketChannelTimeClient`类中实现的,定义如下。为了简化示例,假定客户端与服务器位于同一台机器上。使用 IP 地址`127.0.0.1`创建`SocketAddress`实例,并与端口`5000`关联。`SocketChannel`类的`open`方法返回一个`SocketChannel`实例,该实例将用于处理 try with resources 块中服务器的响应:
```java
public class SocketChannelTimeClient {
public static void main(String[] args) {
SocketAddress address = new InetSocketAddress(
"127.0.0.1", 5000);
try (SocketChannel socketChannel =
SocketChannel.open(address)) {
...
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
```
在 try 块的主体中,创建了一个大小为 64 的`ByteBuffer`实例。使用小于实际消息的大小将使此示例复杂化。在*处理可变长度消息*部分,我们将重新检查缓冲区大小。消息从通道读取,并使用`read`方法放入`ByteBuffer`实例。然后翻转此缓冲区以准备处理。读取并显示每个字节:
```java
ByteBuffer byteBuffer = ByteBuffer.allocate(64);
int bytesRead = socketChannel.read(byteBuffer);
while (bytesRead > 0) {
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
System.out.print((char) byteBuffer.get());
}
System.out.println();
bytesRead = socketChannel.read(byteBuffer);
}
```
启动客户端时,其输出将类似于以下内容:
**日期:2015 年 8 月 18 日星期二 21:36:25 CDT**
服务器的输出现在将类似于以下输出:
**服务器启动时间**
**等待请求。。。**
**发送日期:2015 年 8 月 18 日星期二 21:36:25 CDT**
**等待请求。。。**
我们现在准备检查通道和缓冲区交互的细节。
# 聊天服务器/客户端应用程序
本节旨在更深入地演示缓冲器和通道如何协同工作。我们将使用来回传递消息的客户端和服务器应用程序。具体来说,我们将创建一个简单版本的聊天服务器。
我们将执行以下操作:
* 创建一个来回发送消息的服务器和客户端
* 演示如何处理可变长度消息
首先,我们将使用`sendFixedLengthMessage`和`receiveFixedLengthMessage`方法演示如何使用固定大小的消息。然后我们将使用`sendMessage`和`receiveMessage`方法来处理可变长度的消息。固定长度的消息更容易处理,但如果消息长度超过缓冲区的大小,则无法处理。与前面的示例相比,可变长度消息需要更仔细的处理。这些方法被放在一个名为`HelperMethods`的类中,以便在多个应用程序中使用。
## 聊天服务器
让我们从服务器开始。服务器在`ChatServer`类中定义,如下所述。创建一个`ServerSocketChannel`实例并绑定到端口`5000`。它将在 while 循环的主体中使用。`running`变量控制服务器的生存期。根据需要捕获异常。与前面的服务器一样,服务器将通过`accept`方法阻塞,直到客户端连接到服务器:
```java
public class ChatServer {
public ChatServer() {
System.out.println("Chat Server started");
try {
ServerSocketChannel serverSocketChannel =
ServerSocketChannel.open();
serverSocketChannel.socket().bind(
new InetSocketAddress(5000));
boolean running = true;
while (running) {
System.out.println("Waiting for request ...");
SocketChannel socketChannel
= serverSocketChannel.accept();
...
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
public static void main(String[] args) {
new ChatServer();
}
}
```
在此聊天/服务器应用程序中,通信受到限制。一旦建立连接,服务器将提示用户向客户端发送消息。客户端将等待收到此消息,然后提示其用户进行答复。回复将发送回服务器。此序列仅限于简化交互,以关注通道/缓冲区交互。
当建立连接时,服务器将显示一条消息,表明这一点,然后进入一个循环,如下所示。系统将提示用户输入消息。调用了`sendFixedLengthMessage`方法。如果用户输入`quit`,则向服务器发送终止消息,服务器终止。否则,消息被发送到服务器,然后服务器在`receiveFixedLengthMessage`方法阻塞,等待客户端响应:
```java
System.out.println("Connected to Client");
String message;
Scanner scanner = new Scanner(System.in);
while (true) {
System.out.print("> ");
message = scanner.nextLine();
if (message.equalsIgnoreCase("quit")) {
HelperMethods.sendFixedLengthMessage(
socketChannel, "Server terminating");
running = false;
break;
} else {
HelperMethods.sendFixedLengthMessage(
socketChannel, message);
System.out.println(
"Waiting for message from client ...");
System.out.println("Message: " + HelperMethods
.receiveFixedLengthMessage(socketChannel));
}
}
```
服务器启动时,其输出将如下所示:
**聊天服务器启动**
**等待请求。。。**
创建了服务器之后,让我们检查一下客户端应用程序。
## 聊天客户端
客户端应用程序使用`ChatClient`类,如下所述。其结构类似于以前的客户端应用程序。本地主机(`127.0.0.1`与端口`5000`一起使用。一旦建立连接,程序将进入无限循环并等待服务器向其发送消息:
```java
public class ChatClient {
public ChatClient() {
SocketAddress address =
new InetSocketAddress("127.0.0.1", 5000);
try (SocketChannel socketChannel =
SocketChannel.open(address)) {
System.out.println("Connected to Chat Server");
String message;
Scanner scanner = new Scanner(System.in);
while (true) {
System.out.println(
"Waiting for message from the server ...");
...
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
public static void main(String[] args) {
new ChatClient();
}
}
```
在循环中,程序以`receiveFixedLengthMessage`方法阻塞,直到服务器向其发送消息。然后将显示该消息,并提示用户发送回服务器的消息。如果消息为**退出**,则使用`sendFixedLengthMessage`方法向服务器发送终止消息,应用程序终止。否则,消息将发送到服务器,程序将等待另一条消息:
```java
System.out.println("Waiting for message from the server ...");
System.out.println("Message: "
+ HelperMethods.receiveFixedLengthMessage(
socketChannel));
System.out.print("> ");
message = scanner.nextLine();
if (message.equalsIgnoreCase("quit")) {
HelperMethods.sendFixedLengthMessage(
socketChannel, "Client terminating");
break;
}
HelperMethods.sendFixedLengthMessage(socketChannel, message);
```
通过创建客户端和 Apple T0 服务器,我们来看看它们是如何相互作用的。
## 服务器/客户端交互
服务器启动后,启动客户端应用程序。客户端的输出将如下所示:
**连接聊天服务器**
**正在等待来自服务器的消息。。。**
服务器输出将反映此连接:
**聊天服务器启动**
**等待请求。。。**
**已连接到客户端**
**>**
输入消息`Hello`。然后您将获得以下输出:
**>你好**
**发送:你好**
**正在等待客户端的消息。。。**
客户端现在将显示为:
**留言:您好**
**>**
输入回复`Hi!`客户端输出如下图所示:
**>你好!**
**森特:你好!**
**正在等待来自服务器的消息。。。**
服务器将显示为:
**留言:嗨!**
**>**
我们可以继续这个过程,直到任何一方输入`quit`命令。但是,输入超过 64 字节缓冲区限制的消息将导致引发`BufferOverflowException`异常。将`sendFixedLengthMessage`方法替换为`sendMessage`方法,将`receiveFixedLengthMessage`方法替换为`receiveMessage`方法将避免此问题。
让我们看看这些发送和接收方法是如何工作的。
## 帮助方法课程
接下来定义了`HelperMethods`类。它拥有以前使用的发送和接收方法。这些方法被声明为静态的,以便于访问:
```java
public class HelperMethods {
...
}
```
下面将显示固定长度的消息方法。它们的执行方式基本上与*使用带有时间服务器的通道*部分中使用的方法相同:
```java
public static void sendFixedLengthMessage(
SocketChannel socketChannel, String message) {
try {
ByteBuffer buffer = ByteBuffer.allocate(64);
buffer.put(message.getBytes());
buffer.flip();
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
System.out.println("Sent: " + message);
} catch (IOException ex) {
ex.printStackTrace();
}
}
public static String receiveFixedLengthMessage
(SocketChannel socketChannel) {
String message = "";
try {
ByteBuffer byteBuffer = ByteBuffer.allocate(64);
socketChannel.read(byteBuffer);
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
message += (char) byteBuffer.get();
}
} catch (IOException ex) {
ex.printStackTrace();
}
return message;
}
```
### 处理可变长度消息
本节讨论了处理可变长度消息的技术。可变长度消息的问题是我们不知道它们的长度。我们不能假设缓冲区未完全填满时,消息已到达末尾。虽然大多数消息可能都是这样,但如果消息长度与消息缓冲区的大小相同,那么我们可能会错过消息的结尾。
确定何时到达消息末尾的另一种方法是发送消息前缀的消息长度,或在消息末尾附加特殊的终止字符。我们选择后者。
### 注
此示例适用于 ASCII 字符。如果改用 Unicode 字符,则会生成一个`BufferOverflowException`异常。`CharBuffer`类用于字符数据,并提供与`ByteBuffer`类类似的功能。`CharBuffer`等级详见[http://docs.oracle.com/javase/8/docs/api/java/nio/CharBuffer.html](http://docs.oracle.com/javase/8/docs/api/java/nio/CharBuffer.html) 。
`0x00`的值用于标记消息的结束。我们之所以选择这个值,是因为它不容易被用户意外输入,因为它不可打印,并且恰好对应于字符串在语言(如 C)中通常以内部终止的方式。
在下面的`sendMessage`方法中,`put`方法在发送消息之前将该终止字节添加到消息的末尾。缓冲区大小是消息的长度加上一。否则,代码与用于发送固定长度消息的代码类似:
```java
public static void sendMessage(
SocketChannel socketChannel, String message) {
try {
ByteBuffer buffer =
ByteBuffer.allocate(message.length() + 1);
buffer.put(message.getBytes());
buffer.put((byte) 0x00);
buffer.flip();
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
System.out.println("Sent: " + message);
} catch (IOException ex) {
ex.printStackTrace();
}
}
```
在`receiveMessage`方法中,检查接收到的每个字节是否为终止字节。如果是,则返回消息。提取部分消息后,`clear`方法应用于`byteBuffer`变量。这种方法是必需的;否则,读取方法将返回`0`。该方法将缓冲器的位置设置回`0`和容量限制:
```java
public static String receiveMessage(SocketChannel socketChannel) {
try {
ByteBuffer byteBuffer = ByteBuffer.allocate(16);
String message = "";
while (socketChannel.read(byteBuffer) > 0) {
char byteRead = 0x00;
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
byteRead = (char) byteBuffer.get();
if (byteRead == 0x00) {
break;
}
message += byteRead;
}
if (byteRead == 0x00) {
break;
}
byteBuffer.clear();
}
return message;
} catch (IOException ex) {
ex.printStackTrace();
}
return "";
}
```
我们现在准备演示该应用程序。
## 正在运行聊天服务器/客户端应用程序
首先启动服务器。输出将如下所示:
**聊天服务器启动**
**等待请求。。。**
接下来,启动客户端,这将产生以下输出:
**连接聊天服务器**
**正在等待来自服务器的消息。。。**
这些用户在服务器和客户端之间的交换受到当前实现的限制。当两个应用程序都已启动时,客户端将等待来自服务器的消息。服务器窗口反映了这一点,如下所示:
**聊天服务器启动**
**等待请求。。。**
**已连接到客户端**
**>**
输入消息后,它将被发送到客户端。输入信息**你好**。客户端窗口现在将显示消息,如下所示:
**连接聊天服务器**
**正在等待来自服务器的消息。。。**
**留言:您好**
**>**
在服务器端,将显示以下输出:
**发送:你好**
**正在等待客户端的消息。。。**
我们现在可以从客户端向服务器发送消息。消息可以以这种方式交换,直到`quit`消息从任一应用程序发送。
# 处理多个客户
处理使用线程可以实现多个客户端。在本节中,我们将开发一个简单的部件服务器和客户端应用程序。服务器将使用一个单独的线程来处理每个客户端。此技术易于实现,但并不总是适用于要求更高的应用程序。我们将在[第 7 章](7.html "Chapter 7. Network Scalability")、*网络可扩展性*中介绍多任务的替代技术。
零件服务器在`PartsServer`类中实现,客户端在`PartsClient`类中实现。将为每个客户端创建一个`ClientHandler`类的新实例。此处理程序将接受零件价格的请求。客户端将向处理程序发送部件的名称。经办人将使用`PartsServer`的`getPrice`方法查询零件的价格。然后它会将价格返回给客户。
## 零件服务器
零件服务器使用`HashMap`变量保存零件信息。部件名称用作键,值存储为`Float`对象。`PartsServer`类在此声明:
```java
public class PartsServer {
private static final HashMap parts =
new HashMap<>();
public PartsServer() {
System.out.println("Part Server Started");
...
}
public static void main(String[] args) {
new PartsServer();
}
}
```
服务器启动后,调用`initializeParts`方法:
```java
initializeParts();
```
该方法如下:
```java
private void initializeParts() {
parts.put("Hammer", 12.55f);
parts.put("Nail", 1.35f);
parts.put("Pliers", 4.65f);
parts.put("Saw", 8.45f);
}
```
处理程序将使用`getPrice`方法检索零件的价格,如下所示:
```java
public static Float getPrice(String partName) {
return parts.get(partName);
}
```
调用`initializeParts`方法后,使用 try 块打开与客户端的连接,如图所示:
```java
try {
ServerSocketChannel serverSocketChannel =
ServerSocketChannel.open();
serverSocketChannel.socket().bind(
new InetSocketAddress(5000));
...
} catch (IOException ex) {
ex.printStackTrace();
}
```
接下来,一个无限循环将为每个客户端创建一个新的处理程序。虽然 Java 中有几种创建线程的方法,但接下来使用的方法是创建`ClientHandler`类的新实例,将客户端的套接字传递给该类的构造函数。此方法不限制应用程序创建的线程数,这使其容易受到拒绝服务攻击。在[第 7 章](7.html "Chapter 7. Network Scalability")*网络可扩展性*中,我们将研究几种可选的线程方法。
`ClientHandler`实例用作`Thread`类的参数。该类将创建一个新线程,该线程将执行`ClientHandler`类的`run`方法。但是,不应该直接调用`run`方法,而是调用`start`方法。此方法将创建线程所需的程序堆栈:
```java
while (true) {
System.out.println("Waiting for client ...");
SocketChannel socketChannel
= serverSocketChannel.accept();
new Thread(
new ClientHandler(socketChannel)).start();
}
```
服务器启动时,将显示以下输出:
**部件服务器启动**
**正在等待客户。。。**
让我们检查一下处理程序是如何工作的。
## 零件客户处理人
`ClientHandler`类在下面的代码中定义为。`socketChannel`实例变量用于连接客户端。在`run`方法中,将显示一条指示处理程序启动的消息。这不是必需的,但它将帮助我们了解服务器、客户端和处理程序是如何交互的。
进入一个无限循环,其中使用*HelperMethods 类*中开发的`receiveMessage`方法获取零件名称。一条`quit`消息将终止处理程序。否则调用`getPrice`方法,使用`sendMessage`方法返回给客户端:
```java
public class ClientHandler implements Runnable{
private final SocketChannel socketChannel;
public ClientHandler(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
public void run() {
System.out.println("ClientHandler Started for "
+ this.socketChannel);
String partName;
while (true) {
partName =
HelperMethods.receiveMessage(socketChannel);
if (partName.equalsIgnoreCase("quit")) {
break;
} else {
Float price = PartsServer.getPrice(partName);
HelperMethods.sendMessage(socketChannel, "" +
price);
}
}
System.out.println("ClientHandler Terminated for "
+ this.socketChannel);
}
}
```
我们将在向客户演示时观察`run`方法的输出。
## 零件客户
`PartsClient`类在下一个代码序列中定义。已建立与服务器的连接。将显示消息,指示客户端何时启动,服务器何时连接。在 while 循环中使用`Scanner`类从用户获取输入:
```java
public class PartsClient {
public PartsClient() {
System.out.println("PartsClient Started");
SocketAddress address =
new InetSocketAddress("127.0.0.1", 5000);
try (SocketChannel socketChannel =
SocketChannel.open(address)) {
System.out.println("Connected to Parts Server");
Scanner scanner = new Scanner(System.in);
while (true) {
...
}
System.out.println("PartsClient Terminated");
} catch (IOException ex) {
ex.printStackTrace();
}
}
public static void main(String[] args) {
new PartsClient();
}
}
```
循环体将提示用户输入零件名称。如果名称为 quit,则客户端将终止。否则,`sendMessage`方法会将名称发送给处理程序进行处理。客户端将在`receiveMessage`方法调用时阻塞,直到服务器响应。然后将显示此部件的价格:
```java
System.out.print("Enter part name: ");
String partName = scanner.nextLine();
if (partName.equalsIgnoreCase("quit")) {
HelperMethods.sendMessage(socketChannel, "quit");
break;
} else {
HelperMethods.sendMessage(socketChannel, partName);
System.out.println("The price is "
+ HelperMethods.receiveMessage(socketChannel));
}
```
现在,让我们看看它们是如何协同工作的。
## 运行部件客户端/服务器
首先启动服务器。服务器启动时将产生以下输出:
**部件服务器启动**
**正在等待客户。。。**
现在,启动客户端应用程序。您将获得以下输出:
**PartsClient 启动**
**连接到部件服务器**
**输入零件名称:**
输入零件名称,如`Hammer`。现在将显示客户端输出,如下所示。**发送:锤**输出是`sendMessage`方法的伪影,如果需要,可以通过修改`sendMessage`方法移除:
**PartsClient 启动**
**连接到部件服务器**
**输入零件名称:锤子**
**已发送:锤子**
**价格为 12.55**
**输入零件名称:**
在服务器端,您将获得与以下类似的输出。每当启动新客户端时,就会看到一条显示处理程序信息的消息:
**部件服务器启动**
**正在等待客户。。。**
**已为 java.nio.channels.SocketChannel 启动 ClientHandler[连接的本地=/127.0.0.1:5000 远程=/127.0.0.1:51132]**
**正在等待客户。。。**
**发送时间:12.55**
在客户端,我们可以继续检查价格,直到输入`quit`命令。此命令将终止客户端。一种可能的请求顺序如下:
**PartsClient 启动**
**连接到部件服务器**
**输入零件名称:锤子**
**已发送:锤子**
**价格为 12.55**
**输入零件名称:钳子**
**已发送:钳子**
**价格为 4.65**
**输入零件名称:saw**
**发送:saw**
**价格为空**
**输入零件名称:Saw**
**发送:Saw**
**价格为 8.45**
**输入零件名称:退出**
**发送:退出**
**当事人终止**
服务器将继续运行,因为可能有其他客户端正在查找价格信息。当客户端处理程序终止时,服务器将显示与以下类似的输出:
**为 java.nio.channels.SocketChannel 终止的 ClientHandler[连接的本地=/127.0.0.1:5000 远程=/127.0.0.1:51132]**
启动两个或多个客户端,观察它们如何与服务器交互。我们将在[第 7 章](7.html "Chapter 7. Network Scalability")、*网络可扩展性*中研究更复杂的扩展应用程序的方法。
# 异步套接字通道
异步通信包括发出一个请求,然后在不必等待请求完成的情况下继续执行其他一些操作。这称为非阻塞。
有三个类用于支持异步通道操作:
* `AsynchronousSocketChannel`:这是一个到套接字的简单异步通道
* `AsynchronousServerSocketChannel`:这是到服务器套接字的异步通道
* `AsynchronousDatagramChannel`:这是面向数据报的套接字的通道
`AsynchronousSocketChannel`类的读写方法是异步的。`AsynchronousServerSocketChannel`类拥有一个`accept`方法,该方法返回一个`AsynchronousSocketChannel`实例。此方法也是异步的。我们将在[第 6 章](6.html "Chapter 6. UDP and Multicasting")、*UDP 和多播*中讨论`AsynchronousDatagramChannel`类。
处理异步 I/O 操作有两种方式:
* 使用`java.util.concurrent`包中的`Future`接口
* 使用`CompletionHandler`接口
`Future`接口表示挂起的结果。这通过允许应用程序继续执行而不是阻塞来支持异步操作。使用此对象,可以使用以下方法之一:
* `isDone`方法
* `get`方法,阻塞直到完成
`get`方法重载,有一个版本支持超时。`CompletionHandler`实例在操作完成后被调用。这本质上是一个回调。我们将不在这里说明这种方法。
我们将分别开发一个名为`AsynchronousServerSocketChannelServer`和`AsynchronousSocketChannelClient`的异步服务器和客户端。客户端/服务器应用程序受到限制,只允许将消息从客户端发送到服务器。这将使我们能够关注应用程序的异步方面。
## 创建异步服务器套接字通道服务器
`AsynchronousServerSocketChannelServer`类在下一个代码序列中定义。此时会显示一条消息,指示服务器已启动,并输入一个 try with resources 块,其中创建了`AsynchronousServerSocketChannel`类的一个实例,并发生了实际工作:
```java
public class AsynchronousServerSocketChannelServer {
public AsynchronousServerSocketChannelServer() {
System.out.println("Asynchronous Server Started");
try (AsynchronousServerSocketChannel serverChannel
= AsynchronousServerSocketChannel.open()) {
...
} catch (IOException | InterruptedException
| ExecutionException ex) {
ex.printStackTrace();
}
}
public static void main(String[] args) {
new AsynchronousServerSocketChannelServer();
}
}
```
`bind`方法用于将代表`AsynchronousServerSocketChannel`实例的`serverChannel`变量与本地主机和端口`5000`关联:
```java
InetSocketAddress hostAddress
= new InetSocketAddress("localhost", 5000);
serverChannel.bind(hostAddress);
```
然后,服务器等待客户端连接。`Future`实例被`acceptResult`变量引用:
```java
System.out.println("Waiting for client to connect... ");
Future acceptResult = serverChannel.accept();
```
另一个 try 块用于处理客户端请求。它创建一个`AsynchronousSocketChannel`类的实例,该实例连接到客户端。`get`方法将阻塞,直到创建通道:
```java
try (AsynchronousSocketChannel clientChannel
= (AsynchronousSocketChannel) acceptResult.get()) {
...
}
```
try 块的主体将分配一个缓冲区,然后从通道读取以填充缓冲区。填充缓冲区后,对缓冲区应用`flip`方法,并处理和显示消息:
```java
System.out.println("Messages from client: ");
while ((clientChannel != null) && (clientChannel.isOpen())) {
ByteBuffer buffer = ByteBuffer.allocate(32);
Future result = clientChannel.read(buffer);
// Wait until buffer is ready using
// one of three techniques to be discussed
buffer.flip();
String message = new String(buffer.array()).trim();
System.out.println(message);
if (message.equals("quit")) {
break;
}
}
```
有三种方法可以确定缓冲区是否准备就绪。第一种技术使用`isDone`方法轮询`Future`对象(由结果变量表示),直到缓冲区准备就绪,如下所示:
```java
while (!result.isDone()) {
// do nothing
}
```
第二种技术使用`get`方法,在缓冲区准备就绪之前进行阻塞:
```java
result.get();
```
第三种技术也使用`get`方法,但使用超时来确定等待的时间。在本例中,它在超时前等待 10 秒:
```java
result.get(10, TimeUnit.SECONDS);
```
使用此版本的`get`方法时,需要将 catch 块添加到封闭的 try 块中,以处理`TimeoutException`异常。
当服务器启动时,我们得到以下输出:
**异步服务器启动**
**正在等待客户端连接。。。**
现在,让我们检查一下客户端。
## 创建异步套接字通道客户端
客户端是在下一个代码段中使用`AsynchronousSocketChannelClient`类实现的。将显示一条消息,指示客户端已启动,随后是创建`AsynchronousSocketChannel`实例的 try 块:
```java
public class AsynchronousSocketChannelClient {
public static void main(String[] args) {
System.out.println("Asynchronous Client Started");
try (AsynchronousSocketChannel client =
AsynchronousSocketChannel.open()) {
...
} catch (IOException | InterruptedException
| ExecutionException ex) {
// Handle exception
}
}
}
```
创建一个`InetSocketAddress`实例,指定服务器使用的地址和端口号。然后创建一个表示连接的`Future`对象。`get`方法将阻塞,直到连接完成:
```java
InetSocketAddress hostAddress =
new InetSocketAddress("localhost", 5000);
Future future = client.connect(hostAddress);
future.get();
```
建立连接后,将显示一条消息。输入无限循环,提示用户输入消息。`wrap`方法将用消息填充缓冲区。`write`方法将开始向`AsynchronousSocketChannel`实例写入消息,并返回一个`Future`对象。`isDone`方法用于等待写入完成。如果消息为**退出**,则客户端应用程序将终止:
```java
System.out.println("Client is started: " + client.isOpen());
System.out.println("Sending messages to server: ");
Scanner scanner = new Scanner(System.in);
String message;
while (true) {
System.out.print("> ");
message = scanner.nextLine();
ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
Future result = client.write(buffer);
while (!result.isDone()) {
// Wait
}
if (message.equalsIgnoreCase("quit")) {
break;
}
}
```
让我们来看看异步客户端/服务器在运行中的情况。
在服务器运行时,启动客户端应用程序。这将产生以下输出:
**异步客户端启动**
**客户端已启动:真**
**向服务器发送消息:**
**>**
服务器的输出现在显示如下:
**异步服务器启动**
**正在等待客户端连接。。。**
**来自客户端的消息:**
使用客户端,我们可以输入以下消息:
**>你好**
**>此消息来自异步客户端,发送到服务器**
**>退出**
这些将一次发送一个到服务器。从服务器上,我们将得到以下响应:
**你好**
**此消息来自 asynchr**
**onous 客户端,发送到**
**服务器**
**退出**
请注意,较长的消息已拆分为多行。这是使用服务器缓冲区大小仅为 32 字节的结果。更大的缓冲区可以避免这个问题。然而,除非我们知道将要发送的最大消息的大小,否则我们需要开发一种处理长消息的方法。这是留给读者的练习。
# 其他缓冲操作
最后,我们将研究一些其他有用的缓冲区操作。其中包括使用视图的缓冲区和阵列之间的批量数据传输以及只读缓冲区。
## 批量数据传输
批量传输是在缓冲区和数组之间传输数据的一种方式。有几种 get 和 put 类型方法支持批量数据传输。它们通常有两种版本。第一个版本使用单个参数,即传输数组。第二个版本也使用数组,但它有两个附加参数:数组中的起始索引和要传输的元素数。
为了演示这些技术,我们将使用一个`IntBuffer`缓冲区。我们将使用以下`displayBuffer`方法帮助我们了解数据传输的工作原理:
```java
public void displayBuffer(IntBuffer buffer) {
for (int i = 0; i < buffer.position(); i++) {
System.out.print(buffer.get(i) + " ");
}
System.out.println();
}
```
我们将从声明一个数组并将其内容传输到缓冲区开始。数组在以下语句中声明和初始化:
```java
int[] arr = {12, 51, 79, 54};
```
分配了一个比数组大的缓冲区,如下所示。数组大小和缓冲区中可用数据之间的差异很重要。如果处理不当,将引发异常:
```java
IntBuffer buffer = IntBuffer.allocate(6);
```
接下来,我们将使用 bulk`put`方法将数组的内容传输到缓冲区:
```java
buffer.put(arr);
```
然后使用以下语句显示缓冲区:
```java
System.out.println(buffer);
displayBuffer(buffer);
```
输出如下。整个阵列已传输,位置设置为下一个可用索引:
**java.nio.HeapIntBuffer[pos=4 lim=6 cap=6]**
**12517954**
由于缓冲区中还有空间,我们可以向其中传输更多数据。但是,我们必须小心不要尝试传输太多,否则将抛出异常。第一步是确定缓冲区中还有多少空间。如下图所示,`remaining`方法实现了这一点。bulk`put`语句然后将数组的前两个元素传输到缓冲区的最后两个位置,如下所示:
```java
int length = buffer.remaining();
buffer.put(arr, 0, length);
```
如果我们再次显示缓冲区及其内容,我们将得到以下输出:
**java.nio.HeapIntBuffer[pos=6 lim=6 cap=6]**
**125179541251**
`get`方法被重载以支持批量数据传输。我们可以修改`displayBuffer`方法来说明它是如何工作的,如下所示。创建一个与缓冲区内容大小相同的整数数组。`rewind`方法将缓冲器的位置移回零。然后,批量`get`方法执行传输,然后对每个循环执行一次,以实际显示其内容:
```java
public void displayBuffer(IntBuffer buffer) {
int arr[] = new int[buffer.position()];
buffer.rewind();
buffer.get(arr);
for(int element : arr) {
System.out.print(element + " ");
}
}
```
## 使用视图
一个视图镜像另一个缓冲区中的数据。对其中一个缓冲区的修改将影响另一个缓冲区。但是,位置和限制是独立的。视图可以通过多种方法创建,包括`duplicate`方法。在下面的示例中,使用 bulk`getBytes`方法对字符串创建一个缓冲区视图。然后创建视图:
```java
String contents = "Book";
ByteBuffer buffer = ByteBuffer.allocate(32);
buffer.put(contents.getBytes());
ByteBuffer duplicateBuffer = buffer.duplicate();
```
为了证明修改一个缓冲区会影响另一个缓冲区,将副本的第一个字符更改为字母“L”。然后显示每个缓冲区的第一个字节以确认更改:
```java
duplicateBuffer.put(0,(byte)0x4c); // 'L'
System.out.println("buffer: " + buffer.get(0));
System.out.println("duplicateBuffer: " +
duplicateBuffer.get(0));
```
输出将显示两个缓冲区中的字母都已更改。`slice`方法也将创建一个视图,但它只使用原始缓冲区的一部分。
## 使用只读缓冲区
默认情况下,缓冲区为读写。但是,它可以是只读或读写。要创建只读缓冲区,请使用缓冲区类的`asReadOnlyBuffer`方法。在下一个序列中,将创建只读缓冲区:
```java
ByteBuffer buffer = ByteBuffer.allocate(32);
ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
```
`isReadOnly`方法将确定缓冲区是否为只读,如下所示:
```java
System.out.println("Read-only: " +
readOnlyBuffer.isReadOnly());
```
只读缓冲区是与原始缓冲区不同的视图。对缓冲区的任何修改都会反映在另一个缓冲区中。
# 控制插座选项
可以配置套接字类的底层套接字实现。可用选项取决于套接字类型。通常,用于支持选项的实际机制是特定于操作系统的。此外,有时该选项只是对底层实现的提示。
下面显示的每个套接字类的可用选项都来自 Java API 文档:
|
班
|
选项名
|
描述
|
| --- | --- | --- |
| `SocketChannel` | `SO_SNDBUF` | 这是套接字发送缓冲区的大小 |
| `SO_RCVBUF` | 这是套接字接收缓冲区的大小 |
| `SO_KEEPALIVE` | 这使连接保持活动状态 |
| `SO_REUSEADDR` | 这将重新使用地址 |
| `SO_LINGER` | 如果存在数据,此将持续关闭(仅在阻塞模式下配置时) |
| `TCP_NODELAY` | 此将禁用 Nagle 算法 |
| `ServerSocketChannel` | `SO_RCVBUF` | 这是套接字接收缓冲区的大小 |
| `SO_REUSEADDR` | 此重新使用地址 |
| `AsynchronousSocketChannel` | `SO_SNDBUF` | 这是套接字发送缓冲区的大小 |
| `SO_RCVBUF` | 这是套接字接收缓冲区的大小 |
| `SO_KEEPALIVE` | 此使连接保持活动状态 |
| `SO_REUSEADDR` | 此重新使用地址 |
| `TCP_NODELAY` | 这将禁用 Nagle 算法 |
使用`setOption`方法配置插座选项。以下代码使用零件服务器部分中使用的服务器套接字通道说明了此方法:
```java
serverSocketChannel.setOption(SO_RCVBUF, 64);
```
第一个参数是`SocketOption`接口的一个实例。此接口定义选项的名称和类型方法。`StandardSocketOptions`类定义了一系列选项,这些选项实现了这个接口。例如,`SO_RCVBUF`实例定义如下:
```java
public static final SocketOption SO_RCVBUF;
```
可能还有其他特定于实施的选项可用。
# 总结
在本章中,我们研究了 NIO 通道和缓冲区类的使用。通道连接到外部源并将数据传输到缓冲区和从缓冲区传输数据。我们演示了通道套接字,它通过网络连接到另一个套接字。
缓冲区是数据的临时存储库。使用缓冲区允许按顺序或随机访问数据。有许多缓冲区操作,这使它成为许多应用程序的良好选择。
我们检查了几种类型的通道插座,包括`SocketChannel`、`ServerSocketChannel`和`AsynchronousSocketChannel`类。`ServerSocketChannel`类支持服务器,并使用`accept`方法阻塞,直到客户端请求连接。该方法将返回一个`SocketChannel`实例,该实例将连接到客户端的`SocketChannel`。`AsynchronousSocketChannel`和`AsynchronousSocketChannel`类支持异步通信,实现两个应用程序之间的无阻塞通信。也支持`DatagramChannel`,我们将在[第 6 章](6.html "Chapter 6. UDP and Multicasting")、*UDP 和多播*中研究。
我们解释了缓冲区和通道类如何协同工作,并说明了它们在几个客户端/服务器应用程序中的使用。我们还研究了一种使用线程处理多个客户端的简单方法。
我们演示了如何在数组和缓冲区之间执行批量数据传输。还检查了视图和只读缓冲区的使用。最后,我们介绍了如何配置底层 OS 套接字支持。
在下一章中,我们将使用许多此类和技术来支持其他客户端/服务器应用程序。