提交 f47fdc19 编写于 作者: W wizardforcel

2021-05-11 22:00:27

上级 7f2f2407
......@@ -8,7 +8,7 @@
* 非阻塞 API
* 反应式–响应迅速、弹性十足、富有弹性、信息驱动
* 反应流
* RxJava 公司
* RxJava
# 异步处理
......@@ -75,7 +75,7 @@ getAverage(ids.parallelStream()); //prints: 2.34 in 214 ms
这将是程序员们谈论的真正的异步处理。但是,在编写这样的代码之前,让我们先看看位于`java.util.concurrent`包中的`CompletableFuture`类。它完成了所描述的一切,甚至更多。
# 使用 CompletableFuture 对象
# 使用`CompletableFuture`对象
使用`CompletableFuture`对象,我们可以通过从`CompletableFuture`对象得到结果,将请求单独发送到测量系统。这正是我们在解释什么是异步处理时描述的场景。让我们在代码中演示一下:
......@@ -125,7 +125,7 @@ List<CompletableFuture<Double>> list = ids.stream()
术语**非阻塞**`java.nio`包一起使用。**非阻塞输入/输出****NIO**)支持密集的输入/输出操作。它描述了应用程序的实现方式:它不为每个请求指定一个执行线程,而是提供多个轻量级工作线程,这些线程以异步和异步方式进行处理同时
# java.io 文件包装与 java.nio 文件包裹
# `java.io`包与`java.nio`包
向外部存储器(例如硬盘驱动器)写入数据和从外部存储器(例如硬盘驱动器)读取数据的操作要比仅在存储器中进行的操作慢得多。`java.io`包中已经存在的类和接口工作得很好,但偶尔会成为性能瓶颈。创建新的`java.nio`包是为了提供更有效的 I/O 支持。
......@@ -139,7 +139,7 @@ List<CompletableFuture<Double>> list = ids.stream()
# 事件/运行循环
许多非阻塞系统基于**事件**(或**运行**)循环—一个持续执行的线程。它接收事件(请求、消息),然后将它们分派给相应的事件处理程序(worker)。事件处理程序没有什么特别之处。它们只是程序员专用于处理特定事件类型的方法(函数)。
许多非阻塞系统基于**事件**(或**运行**)循环—一个持续执行的线程。它接收事件(请求、消息),然后将它们分派给相应的事件处理程序(工作器)。事件处理程序没有什么特别之处。它们只是程序员专用于处理特定事件类型的方法(函数)。
这种设计被称为**反应器设计模式**。围绕处理并发事件和服务请求而构建,并命名为**反应式编程****反应式系统**,即对事件做出反应并对其进行并发处理。
......@@ -229,9 +229,9 @@ public static interface Flow.Processor<T,R>
在推送模式中,发布者可以在没有来自订户的任何请求的情况下呼叫`onNext()`。如果处理速度低于项目发布速度,订阅者可以使用各种策略来缓解压力。例如,它可以跳过项目或为临时存储创建一个缓冲区,希望项目生产速度会减慢,订户能够赶上
这是 reactivestreams 计划为支持具有非阻塞背压的异步数据流而定义的最小接口集。如您所见,它允许订阅者和发布者相互交谈并协调传入数据的速率,从而为我们在“反应式”部分讨论的背压问题提供了多种解决方案。
这是 ReactiveStreams 计划为支持具有非阻塞背压的异步数据流而定义的最小接口集。如您所见,它允许订阅者和发布者相互交谈并协调传入数据的速率,从而为我们在“反应式”部分讨论的背压问题提供了多种解决方案。
有许多方法可以实现这些接口。目前,在 JDK9 中,只有一个接口的实现:`SubmissionPublisher`类实现了`Flow.Publisher`。原因是这些接口不应该由应用程序开发人员使用。它是一个**服务提供者接口****SPI**),由反应流库的开发人员使用。如果需要的话,可以使用已经实现了我们已经提到的 reactivestreamsAPI 的现有工具箱之一:RxJava、Reactor、Akka Streams、Vert.x 或任何其他您喜欢的库。
有许多方法可以实现这些接口。目前,在 JDK9 中,只有一个接口的实现:`SubmissionPublisher`类实现了`Flow.Publisher`。原因是这些接口不应该由应用程序开发人员使用。它是一个**服务提供者接口****SPI**),由反应流库的开发人员使用。如果需要的话,可以使用已经实现了我们已经提到的 ReactiveStreamsAPI 的现有工具箱之一:RxJava、Reactor、Akka Streams、Vert.x 或任何其他您喜欢的库。
# RxJava 公司
......@@ -324,7 +324,7 @@ RxJava 提供了如此丰富的功能,我们无法在本书中详细地回顾
# 可观察类型
谈到 rxjava2api(请注意,它与 rxJava1 有很大的不同),我们将使用可以在[这个页面](http://reactivex.io/RxJava/2.x/javadoc/index.html)中找到的在线文档。
谈到 RxJava2API(请注意,它与 RxJava1 有很大的不同),我们将使用可以在[这个页面](http://reactivex.io/RxJava/2.x/javadoc/index.html)中找到的在线文档。
观察者订阅接收来自可观察对象的值,该对象可以表现为以下类型之一:
......@@ -384,7 +384,7 @@ System.out.println(list); //prints: []
```
我们使用`List`对象来捕获结果,因为您可能还记得,Lambda 表达式不允许使用非 final 变量。
我们使用`List`对象来捕获结果,因为您可能还记得,Lambda 表达式不允许使用非`final`变量。
如您所见,结果列表为空。这是因为执行管道计算时没有阻塞(异步)。因此,由于 100 毫秒的延迟,控件同时转到打印列表内容的最后一行,该行仍然是空的。我们可以在最后一行前面设置延迟:
......@@ -449,7 +449,7 @@ System.out.println(i3); //prints: 42
![](img/e4317b1f-255b-43df-8fd5-c4cdf0986b28.png)
第一条运行消息来自第 2 行,响应阻塞`blockingGet()`方法的调用。第一条空消息来自第 3 行。第 4 行抛出异常,因为超时设置为 15 毫秒,而实际处理设置为 100 毫秒延迟。第二条 Run 消息来自第 5 行,响应于`blockingGet()`方法调用。这一次,超时被设置为 150 毫秒,也就是超过 100 毫秒,并且该方法能够在超时结束之前返回。
第一条运行消息来自第 2 行,响应阻塞`blockingGet()`方法的调用。第一条空消息来自第 3 行。第 4 行抛出异常,因为超时设置为 15 毫秒,而实际处理设置为 100 毫秒延迟。第二条运行消息来自第 5 行,响应于`blockingGet()`方法调用。这一次,超时被设置为 150 毫秒,也就是超过 100 毫秒,并且该方法能够在超时结束之前返回。
最后两行(7 和 8)演示了有无超时的`blockingAwait()`方法的用法。此方法不返回值,但允许可观察管道运行其过程。有趣的是,即使将超时设置为小于管道完成所需时间的值,它也不会因异常而中断。显然,它是在管道处理完成之后开始等待的,除非它是一个稍后将被修复的缺陷(文档对此并不清楚)。
......@@ -516,7 +516,7 @@ for (int i = 0; i < 1_000_000; i++) {
}
```
`PublishProcessor`类扩展了`Flowable`,并有`onNext(Object o)`方法强制它发出传入的对象。在调用它之前,我们已经使用`Schedulers.io()`线程订阅了 observate。我们将在“多线程(Scheduler)”部分讨论调度器。
`PublishProcessor`类扩展了`Flowable`,并有`onNext(Object o)`方法强制它发出传入的对象。在调用它之前,我们已经使用`Schedulers.io()`线程订阅了`observate`。我们将在“多线程(调度器)”部分讨论调度器。
`subscribe()`方法有几个重载版本。我们决定使用一个接受两个`Consumer`函数的函数:第一个处理传入的值,第二个处理由任何管道操作引发的异常(类似于`Catch`块)。
......@@ -627,16 +627,16 @@ System.out.println(list); //prints: []
在我们的示例中,您已经看到了一些创建可观察对象的方法。在`Observable``Flowable``Single``Maybe``Completable`中还有许多其他工厂方法。但并不是所有下列方法都可以在这些接口中使用(参见注释;*所有*表示所有列出的接口都有它):
* `create()`:通过提供完整实现(all)创建一个`Observable`对象
* `defer()`:每次订阅一个新的`Observer`时创建一个新的`Observable`对象(all
* `create()`:通过提供完整实现(所有)创建一个`Observable`对象
* `defer()`:每次订阅一个新的`Observer`时创建一个新的`Observable`对象(所有
* `empty()`:创建一个空的`Observable`对象,该对象在订阅后立即完成(除`Single`外的所有对象)
* `never()`:创建一个`Observable`对象,它不发射任何东西,也不做任何事情;甚至不完成(all
* `error()`:创建一个`Observable`对象,该对象在订阅时立即发出异常(all
* `fromXXX()`:创建一个`Observable`对象,其中`XXX`可以为`Callable``Future`全部)、`Iterable``Array``Publisher``Observable``Flowable``Action``Runnable``Maybe``Completable`);这意味着它基于提供的函数或对象创建一个`Observable`对象
* `never()`:创建一个`Observable`对象,它不发射任何东西,也不做任何事情;甚至不完成(所有
* `error()`:创建一个`Observable`对象,该对象在订阅时立即发出异常(所有
* `fromXXX()`:创建一个`Observable`对象,其中`XXX`可以为`Callable``Future`所有)、`Iterable``Array``Publisher``Observable``Flowable``Action``Runnable``Maybe``Completable`);这意味着它基于提供的函数或对象创建一个`Observable`对象
* `generate()`:创建一个冷`Observable`对象,该对象基于提供的函数或对象生成值(仅限`Observable``Flowable`
* `range(), rangeLong(), interval(), intervalRange()`:创建一个`Observable`对象,该对象发出连续的`int``long`值,这些值受指定范围的限制或不受指定时间间隔的限制(仅限`Observable``Flowable`
* `just()`:根据提供的对象或一组对象(除`Completable`外的所有对象)创建一个`Observable`对象
* `timer()`:创建一个`Observable`对象,该对象在指定的时间之后发出`0L`信号(all),然后完成`Observable``Flowable`的操作
* `timer()`:创建一个`Observable`对象,该对象在指定的时间之后发出`0L`信号(所有),然后完成`Observable``Flowable`的操作
还有许多其他有用的方法,如`repeat()``startWith()`等。我们只是没有足够的空间来列出它们。[参考在线文档](http://reactivex.io/RxJava/2.x/javadoc/index.html)
......@@ -702,7 +702,7 @@ pauseMs(100);
这些运算符转换可观察对象发出的值:
* `buffer()`:根据提供的参数或使用提供的函数将发出的值收集到 bundle 中,并定期一次发出一个 bundle
* `buffer()`:根据提供的参数或使用提供的函数将发出的值收集到包裹中,并定期一次发出一个包裹
* `flatMap()`:基于当前可观察对象生成可观察对象,并将其插入到当前流中,这是最流行的操作符之一
* `groupBy()`:将当前`Observable`分为若干组可观察对象(`GroupedObservables`个对象)
* `map()`:使用提供的函数转换发出的值
......@@ -812,17 +812,17 @@ Observable.zip(obs1, obs2, obs1, (x,y,z) -> "("+x+y+z+")")
.subscribe(System.out::print); //prints: (oto)(nwn)(eoe)
```
# 从 XXX 转换
# 从`XXX`转换
这些运算符非常简单。以下是`Observable`类的 from XXX 操作符列表:
这些运算符非常简单。以下是`Observable`类的`XXX`转换操作符列表:
* `fromArray(T... items)`:从 varargs 创建`Observable`
* `fromArray(T... items)`:从可变参数创建`Observable`
* `fromCallable(Callable<T> supplier)`:从`Callable`函数创建`Observable`
* `fromFuture(Future<T> future)`:从`Future`对象创建`Observable`
* `fromFuture(Future<T> future, long timeout, TimeUnit unit)`:从`Future`对象创建`Observable`,超时参数应用于`future`
* `fromFuture(Future<T> future, long timeout, TimeUnit unit, Scheduler scheduler)`:从`Future`对象创建`Observable`,超时参数应用于`future`和调度程序(建议使用`Schedulers.io()`,请参阅“多线程(调度程序)”部分)
* `fromFuture(Future<T> future, Scheduler scheduler)`:从指定调度程序上的`Future`对象创建一个`Observable``Schedulers.io()`推荐,请参阅“多线程(调度程序)”部分)
* `fromIterable(Iterable<T> source)`:从 iterable 对象创建`Observable`(例如`List`
* `fromIterable(Iterable<T> source)`:从可迭代对象创建`Observable`(例如`List`
* `fromPublisher(Publisher<T> publisher)`:从`Publisher`对象创建`Observable`
# 异常处理
......@@ -831,7 +831,7 @@ Observable.zip(obs1, obs2, obs1, (x,y,z) -> "("+x+y+z+")")
但是,如果您需要在管道中间处理异常,以便值流可以由引发异常的操作符之后的其他操作符恢复和处理,那么以下操作符(及其多个重载版本)可以帮助您:
* `onErrorXXX()`:捕捉到异常时恢复提供的序列;XXX 表示运算符的操作:`onErrorResumeNext()``onErrorReturn()``onErrorReturnItem()`
* `onErrorXXX()`:捕捉到异常时恢复提供的序列;`XXX`表示运算符的操作:`onErrorResumeNext()``onErrorReturn()``onErrorReturnItem()`
* `retry()`:创建一个`Observable`,重复源发出的排放;如果调用`onError()`,则重新订阅源`Observable`
演示代码如下所示:
......@@ -859,20 +859,20 @@ Observable.error(new RuntimeException("MyException"))
这些操作符在管道中任何位置发生的特定事件上都被调用。它们的工作方式类似于“处理”部分中描述的操作符。
这些操作符的格式是`doXXX()`,其中 XXX 是事件的名称:`onComplete``onNext``onError`等。并不是所有的课程都有,有些课程在`Observable``Flowable``Single``Maybe``Completable`上略有不同。但是,我们没有空间列出所有这些类的所有变体,我们的概述将局限于`Observable`类的生命周期事件处理操作符的几个示例:
这些操作符的格式是`doXXX()`,其中`XXX`是事件的名称:`onComplete``onNext``onError`等。并不是所有的课程都有,有些课程在`Observable``Flowable``Single``Maybe``Completable`上略有不同。但是,我们没有空间列出所有这些类的所有变体,我们的概述将局限于`Observable`类的生命周期事件处理操作符的几个示例:
* `doOnSubscribe(Consumer<Disposable> onSubscribe)`:当观察者订阅时执行
* `doOnNext(Consumer<T> onNext)`:当源可观测调用`onNext`时,应用提供的`Consumer`功能
* `doAfterNext(Consumer<T> onAfterNext)`:将提供的`Consumer`功能推送到下游后应用于当前值
* `doOnEach(Consumer<Notification<T>> onNotification)`:对每个发出的值执行`Consumer`函数
* `doOnEach(Observer<T> observer)`:为每个发出的值及其发出的终端事件通知一个`Observer`
* `doOnComplete(Action onComplete)`:在源 observable 生成`onComplete`事件后,执行提供的`Action`函数
* `doOnComplete(Action onComplete)`:在源可观察对象生成`onComplete`事件后,执行提供的`Action`函数
* `doOnDispose(Action onDispose)`:管道被下游处理后执行提供的`Action`功能
* `doOnError(Consumer<Throwable> onError)`:发送`onError`事件时执行
* `doOnLifecycle(Consumer<Disposable> onSubscribe, Action onDispose)`:对相应的事件调用相应的`onSubscribe``onDispose`函数
* `doOnTerminate(Action onTerminate)`:当源可观测对象生成`onComplete`事件或引发异常(`onError`事件)时,执行提供的`Action`函数
* `doAfterTerminate(Action onFinally)`:在源可观测对象生成`onComplete`事件或引发异常(`onError`事件)后,执行提供的`Action`函数
* `doFinally(Action onFinally)`:在源 observable 生成`onComplete`事件或引发异常(`onError`事件)或下游处理管道后,执行提供的`Action`函数
* `doFinally(Action onFinally)`:在源可观测对象生成`onComplete`事件或引发异常(`onError`事件)或下游处理管道后,执行提供的`Action`函数
下面是演示代码:
......@@ -902,7 +902,7 @@ pauseMs(25);
* `dematerialize()`:反转`materialize()`运算符的结果
* `observeOn()`:指定`Observer`应遵守`Observable``Scheduler`(螺纹)(见“多线程(调度程序)”部分)
* `serialize()`:强制序列化发出的值和通知
* `subscribe()`:订阅一个 observable 的排放和通知;各种重载版本接受用于各种事件的回调,包括`onComplete``onError`;只有在调用`subscribe()`之后,值才开始流经管道
* `subscribe()`:订阅一个可观测对象的排放和通知;各种重载版本接受用于各种事件的回调,包括`onComplete``onError`;只有在调用`subscribe()`之后,值才开始流经管道
* `subscribeOn()`:使用指定的`Scheduler`异步订阅`Observer``Observable`(参见“多线程(调度程序)”部分)
* `timeInterval(), timestamp()`:将发出值的`Observable<T>`转换为`Observable<Timed<T>>`,然后相应地发出两次发射之间经过的时间量或时间戳
* `timeout()`:重复源`Observable`的发射;如果在指定的时间段后没有发射,则生成错误
......@@ -1003,7 +1003,7 @@ obs.onBackpressureLatest().subscribe();
# 多线程(调度程序)
默认情况下,RxJava 是单线程的。这意味着源 observable 及其所有操作符都会通知调用了`subscribe()`操作符的同一线程上的观察者。
默认情况下,RxJava 是单线程的。这意味着源可观测对象及其所有操作符都会通知调用了`subscribe()`操作符的同一线程上的观察者。
这里有两个操作符,`observeOn()``subscribeOn()`,允许将单个操作的执行移动到不同的线程。这些方法以一个`Scheduler`对象作为参数,该对象调度要在不同线程上执行的各个操作。
......
......@@ -67,7 +67,7 @@
为了演示与传统通信方法相比的替代方法,我们将使用 Vert.x,它是一个事件驱动的非阻塞轻量级多语言工具包。它允许您用 Java、JavaScript、Groovy、Ruby、Scala、Kotlin 和 Ceylon 编写组件。它支持一个异步编程模型和一个分布式事件总线,可以访问浏览器内的 JavaScript,从而允许创建实时 Web 应用程序。但是,由于本书的重点,我们将只使用 Java。
Vert.xAPI 有两个源代码树:第一个源代码树以`io.vertx.core`开头,第二个源代码树以`io.vertx.rxjava.core`开头。第二个源树是`io.vertx.core`类的反应版本。事实上,无功源树是基于非无功源的,所以这两个源树并不是不兼容的。相反,除了非反应式实现还提供了反应式版本。因为我们的讨论集中在反应式编程上,所以我们将主要使用`io.vertx.rxjava`源代码树的类和接口,也称为 **RX 化的 Vert.xapi**
Vert.xAPI 有两个源代码树:第一个源代码树以`io.vertx.core`开头,第二个源代码树以`io.vertx.rxjava.core`开头。第二个源树是`io.vertx.core`类的反应版本。事实上,无功源树是基于非无功源的,所以这两个源树并不是不兼容的。相反,除了非反应式实现还提供了反应式版本。因为我们的讨论集中在反应式编程上,所以我们将主要使用`io.vertx.rxjava`源代码树的类和接口,也称为 **RX 化的 Vert.x API**
首先,我们将向`pom.xml`文件添加以下依赖项,如下所示:
......@@ -105,13 +105,13 @@ Single<Message<String>> reply = vertx.eventBus().rxSend(address, msg);
`vertx`对象(它是`AbstractVerticle`的受保护属性,可用于每个垂直方向)允许访问事件总线和`rxSend()`调用方法。`Single<Message<String>>`返回值表示响应消息可以返回的回复;您可以订阅它,或者以任何其他方式处理它。
verticle 还可以注册为特定地址的消息接收器(使用者):
Verticle 还可以注册为特定地址的消息接收器(使用者):
```java
vertx.eventBus().consumer(address);
```
如果多个 verticle 注册为同一地址的消费者,那么`rxSend()`方法使用循环算法只将消息传递给这些消费者中的一个。
如果多个 Verticle 注册为同一地址的消费者,那么`rxSend()`方法使用循环算法只将消息传递给这些消费者中的一个。
或者,`publish()`方法可用于向使用相同地址注册的所有消费者传递消息:
......@@ -125,13 +125,13 @@ EventBus eb = vertx.eventBus().publish(address, msg);
# 微服务的反应系统
为了演示如果使用 Vert.x 实现,微服务的反应式系统会是什么样子,我们将创建一个 HTTP 服务器,它可以接受系统中基于 REST 的请求,向另一个 verticle 发送基于`EventBus`的消息,接收回复,并将响应发送回原始请求。
为了演示如果使用 Vert.x 实现,微服务的反应式系统会是什么样子,我们将创建一个 HTTP 服务器,它可以接受系统中基于 REST 的请求,向另一个 Verticle 发送基于`EventBus`的消息,接收回复,并将响应发送回原始请求。
为了演示这一切是如何工作的,我们还将编写一个程序,向系统生成 HTTP 请求,并允许您从外部测试系统。
# HTTP 服务器
让我们假设进入反应式系统演示的入口点是一个 HTTP 调用。这意味着我们需要创建一个充当 HTTP 服务器的 verticle。Vert.x 使这变得非常简单;下面垂直线中的三行就可以做到这一点:
让我们假设进入反应式系统演示的入口点是一个 HTTP 调用。这意味着我们需要创建一个充当 HTTP 服务器的 Verticle。Vert.x 使这变得非常简单;下面垂直线中的三行就可以做到这一点:
```java
HttpServer server = vertx.createHttpServer();
......@@ -265,7 +265,7 @@ private void processPostSomePathPublish(RoutingContext ctx){
从路径名可以猜到我们将使用`/some/path/send`路由发送`EventBus`消息,使用`/some/path/publish`路由发布`EventBus`消息,但是在实现相应的路由处理程序之前,我们先创建一个接收`EventBus`消息的垂直体。
# EventBus 消息接收器
# `EventBus`消息接收器
消息接收器的实现非常简单:
......@@ -331,7 +331,7 @@ RxHelper.deployVerticle(vertx, new MessageRcvVert("1", address));
如您所见,到达并执行了`MessageRcvVert`的最后一行,而创建的管道和我们传递给它的操作符的函数正在等待消息的发送。所以,我们现在就开始吧。
# EventBus 消息发送者
# `EventBus`消息发送者
正如我们所承诺的,我们现在将以更现实的方式重新实现`HttpServerVert`垂直面的处理程序。`GET`方法处理程序现在看起来像以下代码块:
......@@ -370,7 +370,7 @@ RxHelper.deployVerticle(vertx, new HttpServerVert(8082));
![](img/9e30c01f-6771-437d-8b70-5918d2e79525.png)
请注意,每个 verticle 都在自己的线程上运行。现在我们可以使用`curl`命令提交 HTTP`GET`请求,结果如下:
请注意,每个 Verticle 都在自己的线程上运行。现在我们可以使用`curl`命令提交 HTTP`GET`请求,结果如下:
![](img/bdaa1e47-f359-4eeb-85d6-de95fd9c9961.png)
......@@ -404,7 +404,7 @@ private void processPostSomePathSend(RoutingContext ctx){
}
```
对于 HTTP`POST`请求,我们希望发送 JSON 格式的有效负载,其值与我们作为 HTTP`GET`请求的参数发送的值相同。该方法的其余部分与`processGetSomePath()`实现非常相似。让我们再次部署`HttpServerVert``MessageRcvVert`verticles,然后用有效负载发出 HTTP`POST`请求,结果如下:
对于 HTTP`POST`请求,我们希望发送 JSON 格式的有效负载,其值与我们作为 HTTP`GET`请求的参数发送的值相同。该方法的其余部分与`processGetSomePath()`实现非常相似。让我们再次部署`HttpServerVert``MessageRcvVert` Verticles,然后用有效负载发出 HTTP`POST`请求,结果如下:
![](img/aade93a7-98e3-44a8-981a-564a72fbe360.png)
......@@ -448,7 +448,7 @@ private void processPostSomePathPublish(RoutingContext ctx){
# 反应系统演示
现在,让我们使用上一节中创建的 verticles 来组装和部署一个小型反应式系统:
现在,让我们使用上一节中创建的 Verticles 来组装和部署一个小型反应式系统:
```java
package com.packt.learnjava.ch16_microservices;
......@@ -466,7 +466,7 @@ public class ReactiveSystemDemo {
}
```
如您所见,我们将部署两个使用相同的`One`地址接收消息的 verticle 和一个使用`Two`地址的 verticle。如果我们运行上述程序,屏幕将显示以下消息:
如您所见,我们将部署两个使用相同的`One`地址接收消息的 Verticle 和一个使用`Two`地址的 Verticle。如果我们运行上述程序,屏幕将显示以下消息:
![](img/2f67915c-1df0-4f5a-be9f-12842ec550d6.png)
......@@ -490,9 +490,9 @@ public class ReactiveSystemDemo {
![](img/1ad7b15a-8f79-4890-b116-5108d2aaef11.png)
如您所见,`publish()`方法将消息发送到注册到指定地址的所有 verticle。注意,带有`ID="3"`(注册为`Two`地址)的 verticle 从未收到消息。
如您所见,`publish()`方法将消息发送到注册到指定地址的所有 Verticle。注意,带有`ID="3"`(注册为`Two`地址)的 Verticle 从未收到消息。
在我们结束这个被动系统演示之前,值得一提的是,Vert.x 允许您轻松地对 verticle 进行集群。您可以在 [Vert.x 文档](https://vertx.io/docs/vertx-core/java)中阅读此功能。
在我们结束这个被动系统演示之前,值得一提的是,Vert.x 允许您轻松地对 Verticle 进行集群。您可以在 [Vert.x 文档](https://vertx.io/docs/vertx-core/java)中阅读此功能。
# 摘要
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册