提交 8ca0c4e2 编写于 作者: 梦境迷离's avatar 梦境迷离

akka 路由,时间:2019-11-02 11:04:31

上级 84a43aba
......@@ -10,7 +10,7 @@ description: 介绍Actor的调度器(dispatcher)及其使用
* 目录
{:toc}
Akka的消息调度器(MessageDispatcher)是使Akka Actors“滴答”的东西,可以说它是机器的引擎。所有MessageDispatcher实现也都是一个ExecutionContext(参考“Scala的Future解读”),这意味着它们可以用于执行任意代码,例如 Future 实例。以下调度器与调度程序等同,均指Dispatcher。
Akka的消息调度器(MessageDispatcher)是使Akka Actors“滴答”的东西,可以说它是机器的引擎。所有MessageDispatcher实现也都是一个ExecutionContext(参考“Scala的Future解读”),这意味着它们可以用于执行任意代码,例如 Future 实例。以下调度器与调度程序等同,均指Dispatcher,可以理解为Spring的Servlet分派器
### 默认的调度器
......
......@@ -815,9 +815,245 @@ router ! Kill
### 动态调整大小的池
大多数池可以与固定数目的路由者一起使用,也可以通过调整大小的策略来动态地调整路由者的数量。
未完
有两种resizers(支持调整池大小)类型:默认Resizer和OptimalSizeExploringResizer
#### Default Resizer
默认的resizer根据压力调整池大小,以池中繁忙的路由者的百分比来衡量。当压力高于某一阈值时,它会增大池的大小;如果压力低于某一阈值,则会后退。这两个阈值都是可配置的,类似HashMap平衡因子。
配置中定义了默认resizer的池大小:
```
akka.actor.deployment {
/parent/router29 {
router = round-robin-pool
resizer {
lower-bound = 2
upper-bound = 15
messages-per-resize = 100
}
}
}
```
```scala
val router29: ActorRef =
context.actorOf(FromConfig.props(Props[Worker]), "router29")
```
还有几个配置选项可用,并在[配置](https://doc.akka.io/docs/akka/current/general/configuration.html)的akka.actor.deployment.default.resizer部分中进行了描述。
在代码中定义的resizer的池大小:
```scala
val resizer = DefaultResizer(lowerBound = 2, upperBound = 15)
val router30: ActorRef =
context.actorOf(RoundRobinPool(5, Some(resizer)).props(Props[Worker]), "router30")
```
还值得指出的是,如果在配置文件中定义路由器,则将使用此值而不是以编程方式发送的任何参数。即外部配置能覆盖编码的配置。
#### Optimal Size Exploring Resizer
OptimalSizeExploringResizer将池调整到提供最多消息吞吐量的最佳大小。我们暂称这个为调整器。
当您期望性能函数的池大小是凸函数时,此调整器最有效。例如,当您有CPU绑定的任务时,最佳大小将与CPU内核数绑定。当您的任务受IO限制时,最佳大小将限制为与该IO服务的并发连接的最佳数目--例如4节点弹性搜索集群可以最佳速度处理4-8个并发请求。
它通过跟踪每种池大小的消息吞吐量并定期执行以下三个调整大小操作(一次调整一次)来实现此目的:
* 如果在一段时间内没有看到所有路由者都被充分利用,请减小尺寸。
* 探索附近的随机池大小,以尝试收集吞吐量指标。
* 以更好的吞吐量指标(比其他任何附近的指标都更好)优化到附近的泳池大小
当池被充分利用时(即所有路由者都处于繁忙状态),它会在探索和优化之间随机选择。如果一段时间未完全使用该池,它会将该池的大小减小到最后看到的最大利用率乘以可配置的比率。
通过不断地探索和优化,调整器最终将达到最佳尺寸并保持在附近。当最佳尺寸更改时,它将开始朝着新尺寸前进。
它保留了性能日志,使其具有状态,并且具有比默认Resizer大的内存占用量。内存使用量为O(n),其中n是您允许的大小数,即upperBound-lowerBound
配置中定义的具有OptimalSizeExplorerResizer的池:
```
akka.actor.deployment {
/parent/router31 {
router = round-robin-pool
optimal-size-exploring-resizer {
enabled = on
action-interval = 5s
downsize-after-underutilized-for = 72h
}
}
}
```
```scala
val router31: ActorRef =
context.actorOf(FromConfig.props(Props[Worker]), "router31")
```
[配置](https://doc.akka.io/docs/akka/current/general/configuration.html)的akka​​.actor.deployment.default.optimal-size-exploring-resizer部分中提供了更多可用的配置选项并进行了描述。
调整大小是通过向Actor池发送消息来触发的,但是没有同步完成;相反,将消息发送到“Head”RouterActor以执行大小更改。因此,您不能依靠调整大小来在所有其他工作人员繁忙时立即创建新的工作人员,因为刚刚发送的消息将被排队到繁忙的Actor的邮箱中。若要解决此问题,请将池配置为使用平衡调度器,有关更多信息,请参见本博客的“Actor的调度器”。
### 如何在Akka内设计路由
从表面上看,路由器看起来像普通的Actor,但实际上它们的实现方式不同。路由器被设计成能够非常高效地接收信息并迅速传递给路由者。
普通的Actor可以用来路由消息,但是Actor的单线程处理可能成为瓶颈。路由器可以通过优化允许并发路由的message-processing管道来实现更高的吞吐量。这是通过将路由器的路由逻辑直接嵌入到其ActorRef中而不是在路由器Actor中实现的。发送到路由器的ActorRef的消息可以立即路由到路由者,完全绕过单线程路由器Actor。
这样做的代价是,路由代码的内部结构比路由器是用正常的Actor实现的要复杂得多。幸运的是,对于路由API的使用者来说,所有这些复杂性都是不可见的。但是,在实现自己的路由器时,应该注意到这一点。
### 自定义路由
如果找不到Akka提供的足以满足您需求的路由器,则可以创建自己的路由器。为了推出自己的路由器,您必须满足本节中说明的某些条件。
在创建自己的路由器之前,应考虑具有类似路由器行为的正常Actor是否可以像成熟的路由器一样完成这项工作。如上所述,与普通Actor相比,路由器的主要好处是其更高的性能。但是它们的编写比普通Actor要复杂一些。因此,如果在您的应用程序中可接受较低的最大吞吐量,则您可能希望坚持使用传统Actor。但是,本部分假定您希望获得最佳性能,因此将演示如何创建自己的路由器。
在本例中创建的路由器正在将每条消息复制到几个目的地。
从路由逻辑开始:
```scala
import scala.collection.immutable
import java.util.concurrent.ThreadLocalRandom
import akka.routing.RoundRobinRoutingLogic
import akka.routing.RoutingLogic
import akka.routing.Routee
import akka.routing.SeveralRoutees
class RedundancyRoutingLogic(nbrCopies: Int) extends RoutingLogic {
val roundRobin = RoundRobinRoutingLogic()
def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee = {
val targets = (1 to nbrCopies).map(_ => roundRobin.select(message, routees))
SeveralRoutees(targets)
}
}
```
将为每个消息调用select,在此示例中,通过循环使用,通过重用现有的RoundRobinRoutingLogic并将结果包装在SeveralRoutees实例中,通过循环选择一些目的地。SeveralRoutees会将消息发送到所有提供的路由
路由逻辑的实现必须是线程安全的,因为它可能在Actor之外使用。
```scala
final case class TestRoutee(n: Int) extends Routee {
override def send(message: Any, sender: ActorRef): Unit = ()
}
val logic = new RedundancyRoutingLogic(nbrCopies = 3)
val routees = for (n <- 1 to 7) yield TestRoutee(n)
val r1 = logic.select("msg", routees)
r1.asInstanceOf[SeveralRoutees].routees should be(Vector(TestRoutee(1), TestRoutee(2), TestRoutee(3)))
val r2 = logic.select("msg", routees)
r2.asInstanceOf[SeveralRoutees].routees should be(Vector(TestRoutee(4), TestRoutee(5), TestRoutee(6)))
val r3 = logic.select("msg", routees)
r3.asInstanceOf[SeveralRoutees].routees should be(Vector(TestRoutee(7), TestRoutee(1), TestRoutee(2)))
```
您可以在此处停止,并按照[简单路由器](https://doc.akka.io/docs/akka/current/routing.html#simple-router)中所述将RedundancyRoutingLogic与akka.routing.Router一起使用。
让我们继续,并使其成为一个独立的,可配置的,路由器Actor。
创建一个扩展Pool、Group或CustomRouterConfig的类。该类是路由逻辑的工厂,包含路由器的配置。在这里,我们把它变成一个Group。
```scala
import akka.dispatch.Dispatchers
import akka.routing.Group
import akka.routing.Router
import akka.japi.Util.immutableSeq
import com.typesafe.config.Config
final case class RedundancyGroup(routeePaths: immutable.Iterable[String], nbrCopies: Int) extends Group {
def this(config: Config) =
this(routeePaths = immutableSeq(config.getStringList("routees.paths")), nbrCopies = config.getInt("nbr-copies"))
override def paths(system: ActorSystem): immutable.Iterable[String] = routeePaths
override def createRouter(system: ActorSystem): Router =
new Router(new RedundancyRoutingLogic(nbrCopies))
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId
}
```
这完全可以当Akka提供的路由器Actor来使用。
```scala
for (n <- 1 to 10) system.actorOf(Props[Storage], "s" + n)
val paths = for (n <- 1 to 10) yield ("/user/s" + n)
val redundancy1: ActorRef =
system.actorOf(RedundancyGroup(paths, nbrCopies = 3).props(), name = "redundancy1")
redundancy1 ! "important"
```
注意,我们在RedundancyGroup中添加了一个构造函数,它接受Config参数。这使得在配置中定义它成为可能。
```
akka.actor.deployment {
/redundancy2 {
router = "jdocs.routing.RedundancyGroup"
routees.paths = ["/user/s1", "/user/s2", "/user/s3"]
nbr-copies = 5
}
}
```
注意路由器属性中的完全限定类名。路由器类必须扩展akka.routing.RouterConfig(Pool、Group或CustomRouterConfig),并具有一个com.typesafe.config.Config参数的构造函数。配置的部署部分将传递给构造函数。
```scala
val redundancy2: ActorRef = system.actorOf(FromConfig.props(), name = "redundancy2")
redundancy2 ! "very important"
```
### 配置调度器
[调度器](https://doc.akka.io/docs/akka/current/dispatchers.html)所述,池中已创建孩子的调度器将从Props中获取。
为了轻松定义池路由的调度器,您可以在配置的部署部分内联定义调度器。
```
akka.actor.deployment {
/poolWithDispatcher {
router = random-pool
nr-of-instances = 5
pool-dispatcher {
fork-join-executor.parallelism-min = 5
fork-join-executor.parallelism-max = 5
}
}
}
```
这是唯一需要为池启用专用调度程序的操作。
注意:如果使用一组Actor并路由到其路径,则它们仍将使用在其Props中为其配置的同一调度程序,因此在创建Actor调度程序后将无法对其进行更改。
“Head”路由器不能总是在同一个Dispatcher上运行,因为它不处理相同类型的消息,因此这个特殊的Actor不使用配置在Props中的调度程序,而是从RouterConfig获取路由器Dispatcher,默认为Actor系统的默认调度程序。所有标准路由器都允许在其构造函数或工厂方法中设置此属性,自定义路由器必须以适当的方式实现该方法。
```scala
val router: ActorRef = system.actorOf(
// “head” router actor will run on "router-dispatcher" dispatcher
// Worker routees will run on "pool-dispatcher" dispatcher
RandomPool(5, routerDispatcher = "router-dispatcher").props(Props[Worker]),
name = "poolWithDispatcher")
```
```scala
val router: ActorRef = system.actorOf(
// “head” router actor will run on "router-dispatcher" dispatcher
// Worker routees will run on "pool-dispatcher" dispatcher
RandomPool(5, routerDispatcher = "router-dispatcher").props(Props[Worker]),
name = "poolWithDispatcher")
```
注意:不允许将routerDispatcher配置为akka.dispatch.BalancingDispatcherConfigurator,因为其他任何Actor都不能处理用于特殊路由器Actor的消息。
* 使用搜狗翻译、百度翻译、谷歌翻译,仅供参考
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册