提交 35c0a4f1 编写于 作者: 梦境迷离's avatar 梦境迷离

Akka路由 继续,时间:2019-10-30 22:05:51

上级 9c532a49
......@@ -225,7 +225,7 @@ akka.actor.deployment {
在本节中,我们将描述如何创建不同类型的路由器Actor。
本节中的路由器Actor是从名为parent的顶级Actor中创建的。注意,配置中的部署路径以/parent/开头,后面跟着路由器Actor的名称。
本节中的路由器Actor是从名为parent的顶级Actor中创建的。注意,配置中的部署路径以/parent/开头,后面跟着路由器Actor的名称。下面都会演示各类路由器如何在代码中以编码的方式配置或如何在配置文件中配置(包括组和池)。
```scala
system.actorOf(Props[Parent], "parent")
......@@ -459,6 +459,292 @@ val router12: ActorRef =
#### BroadcastPool 和 BroadcastGroup
广播路由器将接收到的消息转发给所有路由者。
在配置中定义的BroadCastPool:
```
akka.actor.deployment {
/parent/router13 {
router = broadcast-pool
nr-of-instances = 5
}
}
```
```scala
val router13: ActorRef =
context.actorOf(FromConfig.props(Props[Worker]), "router13")
```
代码中定义的BroadCastPool:
```scala
val router14: ActorRef =
context.actorOf(BroadcastPool(5).props(Props[Worker]), "router14")
```
在配置中定义的BroadCastGroup:
```
akka.actor.deployment {
/parent/router15 {
router = broadcast-group
routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
}
}
```
```scala
val router15: ActorRef =
context.actorOf(FromConfig.props(), "router15")
```
代码中定义的BroadCastGroup:
```scala
val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3")
val router16: ActorRef =
context.actorOf(BroadcastGroup(paths).props(), "router16")
```
广播路由器总是将每一条消息广播给他们的路由者。如果不想广播所有消息,则可以使用非广播路由器并根据需要使用广播消息。
#### ScatterGatherFirstCompletedPool 和 ScatterGatherFirstCompletedGroup
ScatterGatherFirstCompletedRouter将把信息发送给所有的路由者。然后等待它得到的第一个回复。此结果将被发送回原发件人。其他答复被丢弃。
它期望在配置的持续时间内至少有一个回复,否则它将在akka.actor.Status.Failure中使用akka.pattern.AskTimeoutException进行回复。
在配置中定义的ScatterGatherFirstCompletedPool:
```
akka.actor.deployment {
/parent/router17 {
router = scatter-gather-pool
nr-of-instances = 5
within = 10 seconds
}
}
```
```scala
val router17: ActorRef =
context.actorOf(FromConfig.props(Props[Worker]), "router17")
```
代码中定义的ScatterGatherFirstCompletedPool:
```scala
val router18: ActorRef =
context.actorOf(ScatterGatherFirstCompletedPool(5, within = 10.seconds).props(Props[Worker]), "router18")
```
在配置中定义的ScatterGatherFirstCompletedGroup:
```
akka.actor.deployment {
/parent/router19 {
router = scatter-gather-group
routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
within = 10 seconds
}
}
```
```scala
val router19: ActorRef =
context.actorOf(FromConfig.props(), "router19")
```
代码中定义的ScatterGatherFirstCompletedGroup:
```scala
val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3")
val router20: ActorRef =
context.actorOf(ScatterGatherFirstCompletedGroup(paths, within = 10.seconds).props(), "router20")
```
#### TailChoppingPool 和 TailChoppingGroup
TailChoppingRouter首先将消息发送给一个随机选择的路由者,然后在一个小延迟后发送到第二个路由者(从其余的路由随机选择)等等。它等待第一次回复,然后返回给原始发件人。其他答复被丢弃。
该路由器的目标是通过对多个路由器执行冗余查询来减少延迟,前提是其它Actor之一的响应速度可能仍然比最初的更快。
PeterBailis的一篇博客文章很好地描述了这种优化:[做多余的工作来加速分布式查询](http://www.bailis.org/blog/doing-redundant-work-to-speed-up-distributed-queries/)
在配置中定义的TailChoppingPool:
```
akka.actor.deployment {
/parent/router21 {
router = tail-chopping-pool
nr-of-instances = 5
within = 10 seconds
tail-chopping-router.interval = 20 milliseconds
}
}
```
```scala
val router21: ActorRef =
context.actorOf(FromConfig.props(Props[Worker]), "router21")
```
代码中定义的TailChoppingPool:
```scala
val router22: ActorRef =
context.actorOf(TailChoppingPool(5, within = 10.seconds, interval = 20.millis).props(Props[Worker]), "router22")
```
在配置中定义的TailChoppingGroup:
```scala
akka.actor.deployment {
/parent/router23 {
router = tail-chopping-group
routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
within = 10 seconds
tail-chopping-router.interval = 20 milliseconds
}
}
```
```scala
val router23: ActorRef =
context.actorOf(FromConfig.props(), "router23")
```
代码中定义的TailChoppingGroup:
```scala
val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3")
val router24: ActorRef =
context.actorOf(TailChoppingGroup(paths, within = 10.seconds, interval = 20.millis).props(), "router24")
```
#### ConsistentHashingPool 和 ConsistentHashingGroup
ConsistentHashingPool使用一致性Hash来根据发送的消息选择路由。[本文](http://www.tom-e-white.com/2007/11/consistent-hashing.html)很好地了解了如何实现一致散列。
有三种方法可以为一致性Hash key定义要使用的数据。
* 您可以定义路由器的hashMapping,以将传入的消息映射到其一致性Hash key。这使得决定对发送方透明。
* 这些消息可以实现akka.routing.ConsistentHashingRouter.ConsistentHashable。key是消息的一部分,与消息定义一起定义它很方便。
* 可以将消息包装在akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope中,以定义用一致性Hash key的数据。发送者知道要使用的密钥。
这些定义一致性Hash key的方法可以同时用于一个路由器。首先尝试hashMapping。
Code example:
```scala
import akka.actor.Actor
import akka.routing.ConsistentHashingRouter.ConsistentHashable
class Cache extends Actor {
var cache = Map.empty[String, String]
def receive = {
case Entry(key, value) => cache += (key -> value)
case Get(key) => sender() ! cache.get(key)
case Evict(key) => cache -= key
}
}
final case class Evict(key: String)
final case class Get(key: String) extends ConsistentHashable {
override def consistentHashKey: Any = key
}
final case class Entry(key: String, value: String)
```
```scala
import akka.actor.Props
import akka.routing.ConsistentHashingPool
import akka.routing.ConsistentHashingRouter.ConsistentHashMapping
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
def hashMapping: ConsistentHashMapping = {
case Evict(key) => key
}
val cache: ActorRef =
context.actorOf(ConsistentHashingPool(10, hashMapping = hashMapping).props(Props[Cache]), name = "cache")
cache ! ConsistentHashableEnvelope(message = Entry("hello", "HELLO"), hashKey = "hello")
cache ! ConsistentHashableEnvelope(message = Entry("hi", "HI"), hashKey = "hi")
cache ! Get("hello")
expectMsg(Some("HELLO"))
cache ! Get("hi")
expectMsg(Some("HI"))
cache ! Evict("hi")
cache ! Get("hi")
expectMsg(None)
```
在上面的示例中,您可以看到GET消息实现了ConsistentHasable本身,而Entry消息被包装在ConsistentHashableEnKabe中。Evict消息由hashMapping部分函数处理。
ConsistentHashingPool在配置中定义:
```
akka.actor.deployment {
/parent/router25 {
router = consistent-hashing-pool
nr-of-instances = 5
virtual-nodes-factor = 10
}
}
```
```scala
val router25: ActorRef =
context.actorOf(FromConfig.props(Props[Worker]), "router25")
```
在代码中定义的ConsistentHashingPool:
```scala
val router26: ActorRef =
context.actorOf(ConsistentHashingPool(5).props(Props[Worker]), "router26")
```
ConsistentHashingGroup在配置中定义:
```
akka.actor.deployment {
/parent/router27 {
router = consistent-hashing-group
routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
virtual-nodes-factor = 10
}
}
```
```scala
val router27: ActorRef =
context.actorOf(FromConfig.props(), "router27")
```
在代码中定义的ConsistentHashingGroup:
```scala
val paths = List("/user/workers/w1", "/user/workers/w2", "/user/workers/w3")
val router28: ActorRef =
context.actorOf(ConsistentHashingGroup(paths).props(), "router28")
```
virtual-nodes-factor是在一致性hash节点环中使用的每个路由者的虚拟节点数,以使分布更加均匀。
### 特别处理的消息
未完。
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册