From 35c0a4f11de579406049062965f02907b5d652a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=A2=A6=E5=A2=83=E8=BF=B7=E7=A6=BB?= Date: Wed, 30 Oct 2019 22:05:52 +0800 Subject: [PATCH] =?UTF-8?q?Akka=E8=B7=AF=E7=94=B1=20=E7=BB=A7=E7=BB=AD?= =?UTF-8?q?=EF=BC=8C=E6=97=B6=E9=97=B4=EF=BC=9A2019-10-30=2022:05:51?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...56\344\270\216\344\275\277\347\224\250.md" | 288 +++++++++++++++++- 1 file changed, 287 insertions(+), 1 deletion(-) diff --git "a/docs/_posts/2019-10-27-Akka-Actor\347\232\204\350\267\257\347\224\261\351\205\215\347\275\256\344\270\216\344\275\277\347\224\250.md" "b/docs/_posts/2019-10-27-Akka-Actor\347\232\204\350\267\257\347\224\261\351\205\215\347\275\256\344\270\216\344\275\277\347\224\250.md" index af94baf1..823229b1 100644 --- "a/docs/_posts/2019-10-27-Akka-Actor\347\232\204\350\267\257\347\224\261\351\205\215\347\275\256\344\270\216\344\275\277\347\224\250.md" +++ "b/docs/_posts/2019-10-27-Akka-Actor\347\232\204\350\267\257\347\224\261\351\205\215\347\275\256\344\270\216\344\275\277\347\224\250.md" @@ -225,7 +225,7 @@ akka.actor.deployment { 在本节中,我们将描述如何创建不同类型的路由器Actor。 -本节中的路由器Actor是从名为parent的顶级Actor中创建的。注意,配置中的部署路径以/parent/开头,后面跟着路由器Actor的名称。 +本节中的路由器Actor是从名为parent的顶级Actor中创建的。注意,配置中的部署路径以/parent/开头,后面跟着路由器Actor的名称。下面都会演示各类路由器如何在代码中以编码的方式配置或如何在配置文件中配置(包括组和池)。 ```scala system.actorOf(Props[Parent], "parent") @@ -459,6 +459,292 @@ val router12: ActorRef = #### BroadcastPool 和 BroadcastGroup +广播路由器将接收到的消息转发给所有路由者。 + +在配置中定义的BroadCastPool: + +``` +akka.actor.deployment { + /parent/router13 { + router = broadcast-pool + nr-of-instances = 5 + } +} +``` + +```scala +val router13: ActorRef = + context.actorOf(FromConfig.props(Props[Worker]), "router13") +``` + +代码中定义的BroadCastPool: + +```scala +val router14: ActorRef = + context.actorOf(BroadcastPool(5).props(Props[Worker]), "router14") +``` + +在配置中定义的BroadCastGroup: + +``` +akka.actor.deployment { + /parent/router15 { + router = broadcast-group + routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"] + } +} +``` + +```scala +val router15: ActorRef = + context.actorOf(FromConfig.props(), "router15") +``` + +代码中定义的BroadCastGroup: + +```scala +val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3") +val router16: ActorRef = + context.actorOf(BroadcastGroup(paths).props(), "router16") +``` + + 广播路由器总是将每一条消息广播给他们的路由者。如果不想广播所有消息,则可以使用非广播路由器并根据需要使用广播消息。 + +#### ScatterGatherFirstCompletedPool 和 ScatterGatherFirstCompletedGroup + +ScatterGatherFirstCompletedRouter将把信息发送给所有的路由者。然后等待它得到的第一个回复。此结果将被发送回原发件人。其他答复被丢弃。 + +它期望在配置的持续时间内至少有一个回复,否则它将在akka.actor.Status.Failure中使用akka.pattern.AskTimeoutException进行回复。 + +在配置中定义的ScatterGatherFirstCompletedPool: + +``` +akka.actor.deployment { + /parent/router17 { + router = scatter-gather-pool + nr-of-instances = 5 + within = 10 seconds + } +} +``` + +```scala +val router17: ActorRef = + context.actorOf(FromConfig.props(Props[Worker]), "router17") +``` + +代码中定义的ScatterGatherFirstCompletedPool: + +```scala +val router18: ActorRef = + context.actorOf(ScatterGatherFirstCompletedPool(5, within = 10.seconds).props(Props[Worker]), "router18") +``` + +在配置中定义的ScatterGatherFirstCompletedGroup: + +``` +akka.actor.deployment { + /parent/router19 { + router = scatter-gather-group + routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"] + within = 10 seconds + } +} +``` + +```scala +val router19: ActorRef = + context.actorOf(FromConfig.props(), "router19") +``` + +代码中定义的ScatterGatherFirstCompletedGroup: + +```scala +val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3") +val router20: ActorRef = + context.actorOf(ScatterGatherFirstCompletedGroup(paths, within = 10.seconds).props(), "router20") +``` + +#### TailChoppingPool 和 TailChoppingGroup + +TailChoppingRouter首先将消息发送给一个随机选择的路由者,然后在一个小延迟后发送到第二个路由者(从其余的路由随机选择)等等。它等待第一次回复,然后返回给原始发件人。其他答复被丢弃。 + +该路由器的目标是通过对多个路由器执行冗余查询来减少延迟,前提是其它Actor之一的响应速度可能仍然比最初的更快。 + +PeterBailis的一篇博客文章很好地描述了这种优化:[做多余的工作来加速分布式查询](http://www.bailis.org/blog/doing-redundant-work-to-speed-up-distributed-queries/)。 + +在配置中定义的TailChoppingPool: + +``` +akka.actor.deployment { + /parent/router21 { + router = tail-chopping-pool + nr-of-instances = 5 + within = 10 seconds + tail-chopping-router.interval = 20 milliseconds + } +} +``` + +```scala +val router21: ActorRef = + context.actorOf(FromConfig.props(Props[Worker]), "router21") +``` + +代码中定义的TailChoppingPool: + +```scala +val router22: ActorRef = + context.actorOf(TailChoppingPool(5, within = 10.seconds, interval = 20.millis).props(Props[Worker]), "router22") +``` + +在配置中定义的TailChoppingGroup: + +```scala +akka.actor.deployment { + /parent/router23 { + router = tail-chopping-group + routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"] + within = 10 seconds + tail-chopping-router.interval = 20 milliseconds + } +} +``` + +```scala +val router23: ActorRef = + context.actorOf(FromConfig.props(), "router23") +``` + +代码中定义的TailChoppingGroup: + +```scala +val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3") +val router24: ActorRef = + context.actorOf(TailChoppingGroup(paths, within = 10.seconds, interval = 20.millis).props(), "router24") +``` + +#### ConsistentHashingPool 和 ConsistentHashingGroup + +ConsistentHashingPool使用一致性Hash来根据发送的消息选择路由。[本文](http://www.tom-e-white.com/2007/11/consistent-hashing.html)很好地了解了如何实现一致散列。 + +有三种方法可以为一致性Hash key定义要使用的数据。 + +* 您可以定义路由器的hashMapping,以将传入的消息映射到其一致性Hash key。这使得决定对发送方透明。 +* 这些消息可以实现akka.routing.ConsistentHashingRouter.ConsistentHashable。key是消息的一部分,与消息定义一起定义它很方便。 +* 可以将消息包装在akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope中,以定义用一致性Hash key的数据。发送者知道要使用的密钥。 + +这些定义一致性Hash key的方法可以同时用于一个路由器。首先尝试hashMapping。 + +Code example: + +```scala +import akka.actor.Actor +import akka.routing.ConsistentHashingRouter.ConsistentHashable + +class Cache extends Actor { + var cache = Map.empty[String, String] + + def receive = { + case Entry(key, value) => cache += (key -> value) + case Get(key) => sender() ! cache.get(key) + case Evict(key) => cache -= key + } +} + +final case class Evict(key: String) + +final case class Get(key: String) extends ConsistentHashable { + override def consistentHashKey: Any = key +} + +final case class Entry(key: String, value: String) +``` + +```scala +import akka.actor.Props +import akka.routing.ConsistentHashingPool +import akka.routing.ConsistentHashingRouter.ConsistentHashMapping +import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope + +def hashMapping: ConsistentHashMapping = { + case Evict(key) => key +} + +val cache: ActorRef = + context.actorOf(ConsistentHashingPool(10, hashMapping = hashMapping).props(Props[Cache]), name = "cache") + +cache ! ConsistentHashableEnvelope(message = Entry("hello", "HELLO"), hashKey = "hello") +cache ! ConsistentHashableEnvelope(message = Entry("hi", "HI"), hashKey = "hi") + +cache ! Get("hello") +expectMsg(Some("HELLO")) + +cache ! Get("hi") +expectMsg(Some("HI")) + +cache ! Evict("hi") +cache ! Get("hi") +expectMsg(None) +``` + +在上面的示例中,您可以看到GET消息实现了ConsistentHasable本身,而Entry消息被包装在ConsistentHashableEnKabe中。Evict消息由hashMapping部分函数处理。 + +ConsistentHashingPool在配置中定义: + +``` +akka.actor.deployment { + /parent/router25 { + router = consistent-hashing-pool + nr-of-instances = 5 + virtual-nodes-factor = 10 + } +} +``` + +```scala +val router25: ActorRef = + context.actorOf(FromConfig.props(Props[Worker]), "router25") +``` + +在代码中定义的ConsistentHashingPool: + +```scala +val router26: ActorRef = + context.actorOf(ConsistentHashingPool(5).props(Props[Worker]), "router26") +``` + +ConsistentHashingGroup在配置中定义: + +``` +akka.actor.deployment { + /parent/router27 { + router = consistent-hashing-group + routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"] + virtual-nodes-factor = 10 + } +} +``` + +```scala +val router27: ActorRef = + context.actorOf(FromConfig.props(), "router27") +``` + +在代码中定义的ConsistentHashingGroup: + +```scala +val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3") +val router28: ActorRef = + context.actorOf(ConsistentHashingGroup(paths).props(), "router28") +``` + +virtual-nodes-factor是在一致性hash节点环中使用的每个路由者的虚拟节点数,以使分布更加均匀。 + +### 特别处理的消息 + + + 未完。 -- GitLab