提交 a3de78b3 编写于 作者: 梦境迷离's avatar 梦境迷离

akka-http 服务端核心API,时间:2019-11-19 23:01:13

上级 9cee324c
......@@ -40,7 +40,7 @@ description: 主要介绍Lightbend平台的Akka与Java内存模型的关系
* actor随后的处理规则:actor对一条消息进行处理,发生在同一actor处理下一条消息之前。
注意:用外行的术语来说,这意味着当actor处理下一条消息时,对actor内部字段的更改是可见的。因此,actor中的字段不一定是易失性的或等效的。
> 用外行的术语来说,这意味着当actor处理下一条消息时,对actor内部字段的更改是可见的。因此,actor中的字段不一定是易失性的或等效的。
这两条规则只适用于同一个actor实例,如果使用不同的actor,则无效。
......
......@@ -36,7 +36,7 @@ Akka实施了一种称为“父母监督”的特定形式。actor只能由其
这种限制使得actor监督等级的形成隐含并鼓励合理的设计决策。应该指出的是,这也保证了行为者不可能成为孤儿,也不会被外部的监督者所依附,否则他们可能会不知不觉地被抓到。
此外,这将为actor应用程序(子树)提供一个自然而干净的关闭过程。
注意:监控相关的父-子通信发生在特殊的系统消息中,这些邮件有自己的邮箱,与用户消息分开。
> 监控相关的父-子通信发生在特殊的系统消息中,这些邮件有自己的邮箱,与用户消息分开。
这意味着,相对于普通消息,与监督相关的事件并不具有决定性的有序性。通常,用户不能影响正常消息和失败通知的顺序。
![监督图](../public/image/actor-guardian.png)
......@@ -86,7 +86,7 @@ AllForOneStrategy适用于以下情况:儿童群体之间有非常紧密的依
通常,使用“全对一”策略时,停止一个子程序(即不响应失败)不会自动终止其他子级的;这可以通过观察它们的生命周期来完成:如果终止的消息没有由主管处理,
它将抛出DeathPactException,该异常(取决于其主管)将重新启动它,默认的preRestart操作将终止所有的子级。当然,这也可以显式地处理。
注意,从“一对一”管理者创建一次性actor意味着临时actor升级的失败将影响所有永久actor。如果不需要,请安装一个中间管理器;这可以通过为工作进程声明一个大小为1的路由器来完成。
> 从“一对一”管理者创建一次性actor意味着临时actor升级的失败将影响所有永久actor。如果不需要,请安装一个中间管理器;这可以通过为工作进程声明一个大小为1的路由器来完成。
### 使用生命周期监控
......
......@@ -51,7 +51,7 @@ my-dispatcher {
}
```
注意:parallelism-max没有设置ForkJoinPool分配的线程总数的上限。它是一个专门设置会话池持续运行的热线程数,以减少处理新传入任务的延迟。您可以在JDK的[ForkJoinPool](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html)文档中阅读更多有关并行性的内容。
> parallelism-max没有设置ForkJoinPool分配的线程总数的上限。它是一个专门设置会话池持续运行的热线程数,以减少处理新传入任务的延迟。您可以在JDK的[ForkJoinPool](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html)文档中阅读更多有关并行性的内容。
另一个使用“thread-pool-executor”的示例:
......@@ -93,7 +93,7 @@ val myActor =
context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), "myactor1")
```
注意:您在WithDispatcher中指定的Dispatcher和部署配置中的Dispatcher属性实际上是配置的路径。所以在这个例子中它是一个属于顶级层次结构的调度器,你也可以把它作为一个属于子级别层次的调度器,你需要使用点来表示层级关系,比如:“foo.bar.my dispatcher”。actor的层次结构说明请参考“监督与监控”。
> 您在WithDispatcher中指定的Dispatcher和部署配置中的Dispatcher属性实际上是配置的路径。所以在这个例子中它是一个属于顶级层次结构的调度器,你也可以把它作为一个属于子级别层次的调度器,你需要使用点来表示层级关系,比如:“foo.bar.my dispatcher”。actor的层次结构说明请参考“监督与监控”。
### 调度器类型
......@@ -202,7 +202,7 @@ val myActor =
context.actorOf(Props[MyActor].withDispatcher("my-pinned-dispatcher"), "myactor3")
```
注意:根据上面的my-thread-pool-dispatcher示例,thread-pool-executor配置是不适用的。这是因为当使用PinnedDispatcher时,每个actor都会有自己的线程池,而这个线程池只有一个线程。
> 根据上面的my-thread-pool-dispatcher示例,thread-pool-executor配置是不适用的。这是因为当使用PinnedDispatcher时,每个actor都会有自己的线程池,而这个线程池只有一个线程。
不能保证随着时间的推移使用相同的线程,因为PinnedDispatcher有超时设置,以便在空闲actor的情况下减少资源使用。要始终使用相同的线程,您需要向PinnedDispatcher的配置中添加thread-pool-executor.allow-core-timeout=off。
......@@ -251,7 +251,7 @@ implicit val executionContext: ExecutionContext = context.dispatcher
如果所有可用线程都被阻塞,那么同一调度程序上的所有actor都将急需线程,并且无法处理传入的消息。
注意:如果可能的话,也应该避免阻塞API。尝试查找或构建反应式API,以便将阻塞最小化,或转移到专用调度程序。通常,当与现有库或系统集成时,不可能避免阻塞API。下面的解决方案解释如何正确处理阻塞操作。请注意,同样的提示适用于管理Akka中任何地方的阻塞操作,包括streams、http和其他构建在其之上的反应式库。
> 如果可能的话,也应该避免阻塞API。尝试查找或构建反应式API,以便将阻塞最小化,或转移到专用调度程序。通常,当与现有库或系统集成时,不可能避免阻塞API。下面的解决方案解释如何正确处理阻塞操作。请注意,同样的提示适用于管理Akka中任何地方的阻塞操作,包括streams、http和其他构建在其之上的反应式库。
```scala
class PrintActor extends Actor {
......
......@@ -50,7 +50,7 @@ akka.actor.mailbox.requirements {
现在,每次创建MyBoundedActor类型的Actor时,它都会尝试获得一个有界的邮箱。如果Actor在部署中配置了不同的邮箱(直接或通过具有指定邮箱类型的dispatcher),则将覆盖此映射配置。
注意:为Actor创建的邮箱中的队列类型将根据该特质中的所需类型进行检查,如果队列没有实现所需的类型,则Actor的创建将失败。
> 为Actor创建的邮箱中的队列类型将根据该特质中的所需类型进行检查,如果队列没有实现所需的类型,则Actor的创建将失败。
#### 调度器需要的消息队列类型
......@@ -139,7 +139,7 @@ Akka附带了许多邮箱实现,如下所示:
当达到容量并配置的mailbox-push-timeout-time为非0,其他有界限的邮箱实现将阻止发送者。
注意:以下邮箱只应与mailbox-push-timeout-time为0的一起使用,因为当mailbox-push-timeout-time非0时,下面所有的信箱都是阻塞的,反应式不推荐使用阻塞,万不得已也应当隔离出阻塞操作到独立的调度线程。
> 以下邮箱只应与mailbox-push-timeout-time为0的一起使用,因为当mailbox-push-timeout-time非0时,下面所有的信箱都是阻塞的,反应式不推荐使用阻塞,万不得已也应当隔离出阻塞操作到独立的调度线程。
* BoundedMailbox
* 实现:基于java.util.concurrent.LinkedBlockingQueue
......@@ -342,7 +342,7 @@ boundedMailbox extends MailboxType with ProducesMessageQueue[MyUnboundedMailbox.
然后将MailboxType的FQCN指定为dispatcher配置或邮箱配置中“mailbox-type”的值。
注意:确保包含一个构造函数,该构造函数需要akka.actor.ActorSystem.Settings和com.ypesafe.config.Config参数,因为此构造函数是以反射方式调用来构造你的邮箱类型的。作为第二个参数传入的配置是配置中使用此邮箱类型描述dispatcher或邮箱设置的部分;将对使用该配置类型的每个dispatcher或邮箱设置实例化邮箱类型一次。
> 确保包含一个构造函数,该构造函数需要akka.actor.ActorSystem.Settings和com.ypesafe.config.Config参数,因为此构造函数是以反射方式调用来构造你的邮箱类型的。作为第二个参数传入的配置是配置中使用此邮箱类型描述dispatcher或邮箱设置的部分;将对使用该配置类型的每个dispatcher或邮箱设置实例化邮箱类型一次。
您还可以使用邮箱作为调度器的要求(requirement ),如下所示:
......
......@@ -147,7 +147,7 @@ sender().!("reply")(context.parent) // alternative syntax (beware of the parens!
路由器Actor的监督策略可以配置为池的supervisorStrategy属性。如果没有提供配置,路由器默认为“总是升级(always escalate)”策略。这意味着错误会传递给路由器的主管进行处理。路由器的主管将决定如何处理任何错误。
注意路由器的主管将把这个错误当作路由器本身的错误来处理。因此,停止或重新启动的指令将导致路由器本身停止或重新启动。路由器反过来会导致其子节点停止并重新启动。
> 路由器的主管将把这个错误当作路由器本身的错误来处理。因此,停止或重新启动的指令将导致路由器本身停止或重新启动。路由器反过来会导致其子节点停止并重新启动。
应该指出的是,路由器的重新启动行为已经被覆盖,这样,重新启动时,同时仍然重新创建孩子,仍然将保持相同数目的Actor在池中。
......@@ -760,7 +760,7 @@ router ! Broadcast("Watch out for Davy Jones' locker")
在本例中,路由器接收广播消息,提取其有效载荷(“Watch out for Davy Jones' locker”),然后将有效载荷发送给路由器的所有路由者。由每个路由者Actor来处理接收到的有效载荷消息。
注意:当您对路由器使用BalancingPool时,不要使用广播消息。BalancingPool上的路由者们共享相同的邮箱实例,因此一些路由者可能会多次获得广播消息,而其他的路由者则得不到广播消息。
> 当您对路由器使用BalancingPool时,不要使用广播消息。BalancingPool上的路由者们共享相同的邮箱实例,因此一些路由者可能会多次获得广播消息,而其他的路由者则得不到广播消息。
#### 毒丸消息
......@@ -993,7 +993,7 @@ val redundancy1: ActorRef =
redundancy1 ! "important"
```
注意,我们在RedundancyGroup中添加了一个构造函数,它接受Config参数。这使得在配置中定义它成为可能。
> 我们在RedundancyGroup中添加了一个构造函数,它接受Config参数。这使得在配置中定义它成为可能。
```
akka.actor.deployment {
......@@ -1033,7 +1033,7 @@ akka.actor.deployment {
这是唯一需要为池启用专用调度程序的操作。
注意:如果使用一组Actor并路由到其路径,则它们仍将使用在其Props中为其配置的同一调度程序,因此在创建Actor调度程序后将无法对其进行更改。
> 如果使用一组Actor并路由到其路径,则它们仍将使用在其Props中为其配置的同一调度程序,因此在创建Actor调度程序后将无法对其进行更改。
“Head”路由器不能总是在同一个Dispatcher上运行,因为它不处理相同类型的消息,因此这个特殊的Actor不使用配置在Props中的调度程序,而是从RouterConfig获取路由器Dispatcher,默认为Actor系统的默认调度程序。所有标准路由器都允许在其构造函数或工厂方法中设置此属性,自定义路由器必须以适当的方式实现该方法。
......@@ -1053,7 +1053,7 @@ val router: ActorRef = system.actorOf(
name = "poolWithDispatcher")
```
注意:不允许将routerDispatcher配置为akka.dispatch.BalancingDispatcherConfigurator,因为其他任何Actor都不能处理用于特殊路由器Actor的消息。
> 不允许将routerDispatcher配置为akka.dispatch.BalancingDispatcherConfigurator,因为其他任何Actor都不能处理用于特殊路由器Actor的消息。
* 使用搜狗翻译、百度翻译、谷歌翻译,仅供参考
......
---
title: HTTP核心服务端API
categories:
- Akka
tags: [Akka-HTTP中文文档]
description: Akka-HTTP的核心服务器API主要实现了HTTP/1.1的基本功能,即akka-http-core本身没有实现高级的功能,诸如:请求路由,文件服务,压缩等
---
* 目录
{:toc}
> 版本 Akka-HTTP 10.1.10
这种设计使核心服务器API保持小巧轻便,易于理解和维护。HTTP/1.1基本功能如下:
* 连接管理
* 解析和渲染消息和标头
* 超时管理(用于请求和连接)
* 响应顺序(用于透明管道支持)
> 建议阅读[Implications of the streaming nature of Request/Response Entities](https://doc.akka.io/docs/akka-http/current/implications-of-streaming-http-entity.html)部分,因为它解释了底层的全堆栈流概念,当来自非“流优先”HTTP服务器的后台时,这可能是意外的。
## Streams 和 HTTP
Akka HTTP服务器是在[Streams](https://doc.akka.io/docs/akka/2.6.0/stream/index.html)之上实现的,在实现中以及在其API的所有级别上都大量使用它。
在连接级别,Akka HTTP提供与[Working with streaming IO](https://doc.akka.io/docs/akka/2.6.0/stream/stream-io.html)基本上相同的接口:套接字绑定表示为传入连接的流。该应用程序从该流源中提取连接,并为每个连接提供一个[Flow[HttpRequest, HttpResponse, _]](https://doc.akka.io/api/akka/2.5.23/akka/stream/scaladsl/Flow.html),以将请求“转换”为响应。
除了将绑定在服务器端的套接字视为[Source[IncomingConnection, _]](https://doc.akka.io/api/akka/2.5.23/akka/stream/scaladsl/Source.html),并将每个连接视为[Source[HttpRequest, _]](https://doc.akka.io/api/akka/2.5.23/akka/stream/scaladsl/Source.html)[Sink[HttpResponse, _]](https://doc.akka.io/api/akka/2.5.23/akka/stream/scaladsl/Sink.html),流抽象还存在于单个HTTP消息中:HTTP请求和响应的实体通常建模为[Source[ByteString, _]](https://doc.akka.io/api/akka/2.5.23/akka/stream/scaladsl/Source.html)。另请参阅[HTTP模型](https://doc.akka.io/docs/akka-http/current/common/http-model.html),以获取有关如何在Akka HTTP中表示HTTP消息的更多信息。
## 开始和停止
在最基本的级别上,通过调用[akka.http.scaladsl.Http](https://doc.akka.io/api/akka-http/10.1.10/akka/http/scaladsl/Http$.html)扩展的bind方法来绑定Akka HTTP服务器:
```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
Http().bind(interface = "localhost", port = 8080)
val bindingFuture: Future[Http.ServerBinding] =
serverSource.to(Sink.foreach { connection => //源中每个链接
println("Accepted new connection from " + connection.remoteAddress)
//开始处理链接
}).run()
```
Http().bind方法的参数指定要绑定到的接口和端口,并在处理传入的HTTP连接时表现感兴趣。此外,该方法还允许定义套接字选项以及根据您的需要配置服务器的大量设置。
bind方法的结果是[Source[Http.IncomingConnection]](https://doc.akka.io/api/akka/2.5.23/akka/stream/scaladsl/Source.html),它必须被应用程序耗尽以接受传入的连接。在将此源实现为处理管道的一部分之前,不会执行实际的绑定。万一绑定失败(例如因为端口已经忙),则实现的流将立即终止,并带有相应的异常。当传入连接源的订阅者取消其订阅时,绑定将被释放(即底层套接字未绑定)。或者,可以使用Http.ServerBinding实例的unbind()方法,该方法是在连接源实现过程中创建的。 Http.ServerBinding还提供了一种方法来获取绑定套接字的实际本地地址,例如,该方法在绑定到端口零(从而让操作系统选择可用端口)时非常有用。
## 请求/响应周期
接受新连接后,它将以Http.IncomingConnection的形式发布,它由远程地址和方法组成,以提供[Flow[HttpRequest, HttpResponse, _]](https://doc.akka.io/api/akka/2.5.23/akka/stream/scaladsl/Flow.html)用来处理通过此连接传入的请求。
通过使用处理程序调用handleWithXXX方法之一来处理请求,该处理程序可以是
* 一个用于handleWith的[Flow[HttpRequest, HttpResponse, _]](https://doc.akka.io/api/akka/2.5.23/akka/stream/scaladsl/Flow.html)
* 一个用于handleWithSyncHandler的函数 HttpRequest => HttpResponse
* 一个用于handleWithAsyncHandler的函数 HttpRequest => Future[HttpResponse]
这是一个完整的示例:
```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
val serverSource = Http().bind(interface = "localhost", port = 8080)
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
HttpResponse(entity = HttpEntity(
ContentTypes.`text/html(UTF-8)`,
"<html><body>Hello world!</body></html>"))
case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
HttpResponse(entity = "PONG!")
case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
sys.error("BOOM!")
case r: HttpRequest =>
r.discardEntityBytes() //耗尽传入的HTTP实体流这很重要!
HttpResponse(404, entity = "Unknown resource!")
}
val bindingFuture: Future[Http.ServerBinding] =
serverSource.to(Sink.foreach { connection =>
println("Accepted new connection from " + connection.remoteAddress)
connection handleWithSyncHandler requestHandler
//这相当于下面调用
// connection handleWith { Flow[HttpRequest] map requestHandler }
}).run()
```
在此示例中,通过使用handleWithSyncHandler(或等效地,Akka Stream的map运算符)使用函数HttpRequest => HttpResponse转换请求流来处理请求。根据使用情况,可以使用Akka Stream的组合器来提供请求处理程序的许多其他方式。如果应用程序提供[FLow](https://doc.akka.io/api/akka/2.5.23/akka/stream/scaladsl/Flow.html),则应用程序还有责任为每个请求生成一个准确的响应,并且响应的顺序与关联请求的顺序相匹配(如果启用了HTTP管道传输(其中多个传入请求的处理可能重叠),则这是相关的)。当依靠handleWithSyncHandler或handleWithAsyncHandler或map或mapAsync流运算符时,将自动满足此要求。
有关创建请求处理程序的更方便的高级DSL,请参见[“路由DSL概述”](https://doc.akka.io/docs/akka-http/current/routing-dsl/overview.html)
### 流式请求/响应实体
通过[HttpEntity](https://doc.akka.io/api/akka-http/10.1.10/akka/http/scaladsl/model/HttpEntity.html)的子类支持HTTP消息实体的流传输。在接收请求以及在许多情况下构造响应时,该应用程序必须能够处理流式实体。有关替代方法的描述,请参见[HttpEntity](https://doc.akka.io/docs/akka-http/current/common/http-model.html#httpentity)
如果您依赖Akka HTTP提供的[编组](https://doc.akka.io/docs/akka-http/current/common/marshalling.html)和/或[解组](https://doc.akka.io/docs/akka-http/current/common/unmarshalling.html)功能,则自定义类型与流实体之间的转换非常方便。这里是直接翻译的marshalling和unmarshalling。实际类似是Json的序列化这种可逆操作,同时需要依赖akka-http-spray-json。
### 断开连接
当处理[Flow](https://doc.akka.io/api/akka/2.5.23/akka/stream/scaladsl/Flow.html)取消其上游订阅或对等方关闭连接时(p2p对等网络),HTTP连接将关闭。通常更方便的替代方法是显式向HttpResponse添加Connection: close响应头。然后,此响应将是连接上的最后一个响应,并且在发送完连接后,服务器将主动关闭该连接。
如果请求实体已被取消(例如,通过将其附加到Sink.cancelled()或仅被部分使用(例如,通过使用take组合器)),连接也将被关闭,为防止此行为,应通过将其附加到Sink.ignore()来显式地耗尽它。
## 配置服务器端 HTTPS
有关在服务器端配置和使用HTTPS的详细文档,请参阅[服务器端HTTPS支持](https://doc.akka.io/docs/akka-http/current/server-side/server-https-support.html)
## 独立的 HTTP 层用法
由于其基于响应流的性质,Akka HTTP层与底层TCP接口是完全可分离的。虽然在大多数应用程序中,这个“特性”并不重要,但在某些情况下,能够针对不是来自网络而是来自其他来源的数据“运行”HTTP层(可能还有更高的层)是非常有用的。潜在的可能有用的场景包括测试、调试或低级事件源(例如通过重播网络流量)。重播网络流的原文是replaying network traffic,这里可能有误。
在服务器端,独立HTTP层形成一个[BidiFlow](https://doc.akka.io/api/akka/2.5.23/akka/stream/scaladsl/BidiFlow.html),其定义如下:
```scala
/**
* 服务器端HTTP层的类型(作为独立的BidiFlow)
* 可以放在TCP层上以形成HTTP服务器。
*
* {{{
* +------+
* HttpResponse ~>| |~> SslTlsOutbound
* | bidi |
* HttpRequest <~| |<~ SslTlsInbound
* +------+
* }}}
*/
type ServerLayer = BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed]
```
通过调用Http().serverLayer方法的两个重载之一来创建Http.ServerLayer的实例,该方法还允许进行不同程度的配置。
## 控制服务器并行性
请求处理可以在两个轴上并行化,方法是并行处理多个连接,并依赖HTTP管道在一个连接上发送多个请求,而无需先等待响应。在这两种情况下,客户端都控制正在进行的请求数。为了防止太多的请求导致过载,Akka HTTP可以限制它并行处理的请求数。
要限制同时打开的连接数,请使用akka.http.server.max-connections设置。此设置适用于所有Http.bindAndHandle* 方法。如果使用Http.bind,则传入连接由[Source[IncomingConnection, ...]](https://doc.akka.io/api/akka/2.5.23/akka/stream/scaladsl/Source.html)表示。使用Akka Stream的组合器应用背压以控制传入连接的流量,例如通过使用throttle或mapAsync。[背压 参考](https://www.zhihu.com/question/49618581/answer/117107570)
通常不鼓励使用HTTP管道传输([大多数浏览器都禁用了HTTP管道传输](https://en.wikipedia.org/w/index.php?title=HTTP_pipelining&oldid=700966692#Implementation_in_web_browsers)),但是Akka HTTP完全支持HTTP管道传输。该限制适用于两个级别。首先,存在akka.http.server.pipelining-limit配置设置,该设置可防止将超过给定数量的未完成请求提供给用户提供的处理程序流。另一方面,处理程序流本身可以应用任何类型的限制。如果使用Http.bindAndHandleAsync入口点,则可以指定parallelism参数(默认为1,表示禁用流水线)来控制每个连接的并发请求数。如果使用Http.bindAndHandle或Http.bind,则用户提供的处理程序流将通过应用背压完全控制它同时接受多少个请求。在这种情况下,您可以使用Akka Stream的mapAsync组合器和给定的并行度来限制并发处理请求的数量。实际上,管道限制配置和手动控制流的请求方式中,约束程度越高的一个将决定如何处理一个连接上的并行请求。这里很绕口,大致理解是一个是基于通道全局的配置,一个是在服务器处理连接时手动设置的并行度参数。
## 在低级别 API 中处理 HTTP 服务器故障
在多种情况下,初始化或运行Akka HTTP服务器时可能会发生故障。默认情况下,Akka将记录所有这些故障,但是有时除了记录故障之外,还可能希望对故障做出反应,例如,通过关闭actor系统或明确通知某些外部监视端点。
创建和实例化HTTP Server时,有很多事情可能会失败(类似地,同样适用于普通流式Tcp()服务器)。可能发生在堆栈不同层上的故障类型,从无法启动服务器开始,直到无法解组HttpRequest为止,故障的示例包括(从最外层到最内层):
* 无法绑定到指定的地址/端口,
* 接受新的IncomingConnections时失败,例如,当操作系统的文件描述符或内存不足时,
* 处理连接时失败,例如,如果传入的[HttpRequest](https://doc.akka.io/api/akka-http/10.1.10/akka/http/scaladsl/model/HttpRequest.html)格式错误。
本节介绍如何处理每种故障情况,以及在哪些情况下可能发生这些故障。
### 绑定失败
第一种失败类型是服务器无法绑定到给定端口时。例如,当该端口已被另一个应用程序占用时,或者该端口具有特权(即只能由root用户使用)时。在这种情况下,“binding future”将立即失效,我们可以通过侦听Future的完成情况对此做出反应:
```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.stream.ActorMaterializer
import scala.concurrent.Future
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
// needed for the future foreach in the end
implicit val executionContext = system.dispatcher
// let's say the OS won't allow us to bind to 80.
val (host, port) = ("localhost", 80)
val serverSource = Http().bind(host, port)
val bindingFuture: Future[ServerBinding] = serverSource
.to(handleConnections) // Sink[Http.IncomingConnection, _]
.run()
bindingFuture.failed.foreach { ex =>
log.error(ex, "Failed to bind to {}:{}!", host, port)
}
```
服务器成功绑定到端口后,[Source[IncomingConnection, _]](https://doc.akka.io/api/akka/2.5.23/akka/stream/scaladsl/Source.html)将开始运行并发出新的传入连接。从技术上讲,此源也可以发出故障信号,但是,仅在非常严重的情况下(例如文件描述符或系统可用的内存用完)才能发生故障,这样它就无法接受新的传入连接。在Akka Streams中处理故障是非常直接的,因为故障是通过流发出信号的,从发生故障的阶段开始,一直到下游的最后阶段。
### 连接源故障
在下面的示例中,我们添加了一个自定义[GraphStage](https://doc.akka.io/api/akka/2.5.23/akka/stream/stage/GraphStage.html),以对流的失败做出反应。我们会向failureMonitor actor发出信号,说明流停止的原因,然后让Actor处理其余的事件 - 也许它将决定重新启动服务器或关闭ActorSystem,但这不再是我们关注的问题。
```scala
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
val (host, port) = ("localhost", 8080)
val serverSource = Http().bind(host, port)
val failureMonitor: ActorRef = system.actorOf(MyExampleMonitoringActor.props)
val reactToTopLevelFailures = Flow[IncomingConnection]
.watchTermination()((_, termination) => termination.failed.foreach {
cause => failureMonitor ! cause
})
serverSource
.via(reactToTopLevelFailures)
.to(Sink.foreach { connection =>
println("Accepted new connection from " + connection.remoteAddress)
}).run()
}
class MyExampleMonitoringActor extends Actor {
override def receive: Actor.Receive = {
//绑定失败打印(这个错误好演示)
//receive: akka.stream.impl.io.ConnectionSourceStage$$anon$1$$anon$2: Bind failed because of java.net.BindException: Address already in use
case e: Throwable => println("receive: " + e)
}
```
### 连接错误
可能发生的第三种类型的故障是在正确建立连接后,然后突然终止,例如,由于客户端中止了底层的TCP连接。
要处理此故障,我们可以使用与上一片段相同的模式,但是将其应用于连接的Flow:
```scala
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
val (host, port) = ("localhost", 8080)
val serverSource = Http().bind(host, port)
val reactToConnectionFailure = Flow[HttpRequest]
.recover[HttpRequest] {
case ex =>
// handle the failure somehow
throw ex
}
val httpEcho = Flow[HttpRequest]
.via(reactToConnectionFailure)
.map { request =>
// simple streaming (!) "echo" response:
HttpResponse(entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, request.entity.dataBytes))
}
serverSource
.runForeach { con =>
con.handleWith(httpEcho)
}
```
请注意,这是当TCP连接正确关闭时,如果客户端刚刚离开,例如因为网络故障,它将不会被视为此类流故障。它将通过空闲[超时检测](https://doc.akka.io/docs/akka-http/current/common/timeouts.html#timeouts))。
这些失败可以描述为与基础设施或多或少相关,它们是绑定或连接失败。在大多数情况下,您不需要深入研究这些内容,因为Akka无论如何都会记录此类错误,对于此类问题,这是合理的默认设置。
为了进一步了解如何在实际路由层(即应用程序代码出现的地方)中处理异常,请参阅“[异常处理](https://doc.akka.io/docs/akka-http/current/routing-dsl/exception-handling.html)”,它明确地侧重于说明如何处理路由中引发的异常并将其转换为具有适当错误码的[HttpResponse](https://doc.akka.io/api/akka-http/10.1.10/akka/http/scaladsl/model/HttpResponse.html)和人类可读的故障描述。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册