提交 e7f49190 编写于 作者: 噼里啪啦嘣

37 完成

上级 9532d9c0
......@@ -3,6 +3,7 @@
# Elasticsearch Connector
This connector provides sinks that can request document actions to an [Elasticsearch](https://elastic.co/) Index. To use this connector, add one of the following dependencies to your project, depending on the version of the Elasticsearch installation:
该连接器提供了可向Elasticsearch Index 请求文档操作的接收器。要使用此连接器,请根据您的Elasticsearch安装版本将以下依赖项之一添加到您的项目中:
| Maven Dependency | Supported since | Elasticsearch version |
| --- | --- | --- |
......@@ -12,17 +13,20 @@ This connector provides sinks that can request document actions to an [Elasticse
| flink-connector-elasticsearch6_2.11 | 1.6.0 | 6 and later versions |
Note that the streaming connectors are currently not part of the binary distribution. See [here](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/linking.html) for information about how to package the program with the libraries for cluster execution.
## Installing Elasticsearch
Instructions for setting up an Elasticsearch cluster can be found [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html). Make sure to set and remember a cluster name. This must be set when creating an `ElasticsearchSink` for requesting document actions against your cluster.
## Elasticsearch Sink
The `ElasticsearchSink` uses a `TransportClient` (before 6.x) or `RestHighLevelClient` (starting with 6.x) to communicate with an Elasticsearch cluster.
The example below shows how to configure and create a sink:
......@@ -331,21 +335,27 @@ val esSinkBuilder = new ElasticsearchSink.Builer[String](
For Elasticsearch versions that still uses the now deprecated `TransportClient` to communicate with the Elasticsearch cluster (i.e., versions equal or below 5.x), note how a `Map` of `String`s is used to configure the `ElasticsearchSink`. This config map will be directly forwarded when creating the internally used `TransportClient`. The configuration keys are documented in the Elasticsearch documentation [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html). Especially important is the `cluster.name` parameter that must correspond to the name of your cluster.
对于仍然使用现在不推荐使用的Elasticsearch版本TransportClient与Elasticsearch集群通信(即等于或低于5.x的版本),请注意的Mapof String用来配置ElasticsearchSink。创建内部使用的时,将直接转发此配置映射TransportClient。配置密钥记录在此处的Elasticsearch文档中。特别重要的是cluster.name必须与集群名称相对应的参数。
For Elasticsearch 6.x and above, internally, the `RestHighLevelClient` is used for cluster communication. By default, the connector uses the default configurations for the REST client. To have custom configuration for the REST client, users can provide a `RestClientFactory` implementation when setting up the `ElasticsearchClient.Builder` that builds the sink.
对于Elasticsearch 6.x及更高版本,内部RestHighLevelClient使用进行集群通信。默认情况下,连接器使用REST客户端的默认配置。要为REST客户端进行自定义配置,用户可以RestClientFactory在设置ElasticsearchClient.Builder构建接收器的时提供实现。
Also note that the example only demonstrates performing a single index request for each incoming element. Generally, the `ElasticsearchSinkFunction` can be used to perform multiple requests of different types (ex., `DeleteRequest`, `UpdateRequest`, etc.).
Internally, each parallel instance of the Flink Elasticsearch Sink uses a `BulkProcessor` to send action requests to the cluster. This will buffer elements before sending them in bulk to the cluster. The `BulkProcessor` executes bulk requests one at a time, i.e. there will be no two concurrent flushes of the buffered actions in progress.
在内部,Flink Elasticsearch Sink的每个并行实例都使用a BulkProcessor向集群发送操作请求。这将缓冲元素,然后将它们批量发送到集群。在BulkProcessor执行批量传输请求一次一个,即会出现在正在进行的缓冲动作没有两个并发刷新。
### Elasticsearch Sinks and Fault Tolerance
### Elasticsearch Sinks and Fault Tolerance 接收器和容错
With Flink’s checkpointing enabled, the Flink Elasticsearch Sink guarantees at-least-once delivery of action requests to Elasticsearch clusters. It does so by waiting for all pending action requests in the `BulkProcessor` at the time of checkpoints. This effectively assures that all requests before the checkpoint was triggered have been successfully acknowledged by Elasticsearch, before proceeding to process more records sent to the sink.
启用Flink的检查点后,Flink Elasticsearch Sink可以确保将动作请求至少一次传递给Elasticsearch集群。通过BulkProcessor在检查点时等待所有待处理的操作请求来完成此操作。这有效地确保在继续处理发送到接收器的更多记录之前,Elasticsearch成功确认了触发检查点之前的所有请求。
More details on checkpoints and fault tolerance are in the [fault tolerance docs](//ci.apache.org/projects/flink/flink-docs-release-1.7/internals/stream_checkpointing.html).
To use fault tolerant Elasticsearch Sinks, checkpointing of the topology needs to be enabled at the execution environment:
要使用容错的Elasticsearch Sink,需要在执行环境中启用拓扑检查点:
......@@ -365,13 +375,15 @@ env.enableCheckpointing(5000) // checkpoint every 5000 msecs
**NOTE**: Users can disable flushing if they wish to do so, by calling **disableFlushOnCheckpoint()** on the created **ElasticsearchSink**. Be aware that this essentially means the sink will not provide any strong delivery guarantees anymore, even with checkpoint for the topology enabled.
### Communication using Embedded Node (only for Elasticsearch 1.x)
### Communication using Embedded Node (only for Elasticsearch 1.x) 使用嵌入式节点进行通信
For Elasticsearch versions 1.x, communication using an embedded node is also supported. See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html) for information about the differences between communicating with Elasticsearch with an embedded node and a `TransportClient`.
对于Elasticsearch 1.x版,还支持使用嵌入式节点的通信。请参阅此处,以获取有关通过嵌入式节点与Elasticsearch进行通信和a之间的区别的信息TransportClient。
Below is an example of how to create an `ElasticsearchSink` use an embedded node instead of a `TransportClient`:
......@@ -427,13 +439,15 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String
The difference is that now we do not need to provide a list of addresses of Elasticsearch nodes.
### Handling Failing Elasticsearch Requests
### Handling Failing Elasticsearch Requests 处理失败的Elasticsearch请求
Elasticsearch action requests may fail due to a variety of reasons, including temporarily saturated node queue capacity or malformed documents to be indexed. The Flink Elasticsearch Sink allows the user to specify how request failures are handled, by simply implementing an `ActionRequestFailureHandler` and providing it to the constructor.
由于各种原因,Elasticsearch操作请求可能会失败,包括临时饱和的节点队列容量或要建立索引的文档格式不正确。Flink Elasticsearch Sink允许用户通过简单地实现ActionRequestFailureHandler并将其提供给构造函数来指定如何处理请求失败。
Below is an example:
......@@ -497,35 +511,48 @@ input.addSink(new ElasticsearchSink(
The above example will let the sink re-add requests that failed due to queue capacity saturation and drop requests with malformed documents, without failing the sink. For all other failures, the sink will fail. If a `ActionRequestFailureHandler` is not provided to the constructor, the sink will fail for any kind of error.
上面的示例将使接收器重新添加由于队列容量饱和而失败的请求,并丢弃格式错误的文档的请求,而不会使接收器失败。对于所有其他失败,接收器将失败。如果ActionRequestFailureHandler未将a 提供给构造函数,则接收器将因任何类型的错误而失败。
Note that `onFailure` is called for failures that still occur only after the `BulkProcessor` internally finishes all backoff retry attempts. By default, the `BulkProcessor` retries to a maximum of 8 attempts with an exponential backoff. For more information on the behaviour of the internal `BulkProcessor` and how to configure it, please see the following section.
By default, if a failure handler is not provided, the sink uses a `NoOpFailureHandler` that simply fails for all kinds of exceptions. The connector also provides a `RetryRejectedExecutionFailureHandler` implementation that always re-add requests that have failed due to queue capacity saturation.
**IMPORTANT**: Re-adding requests back to the internal **BulkProcessor** on failures will lead to longer checkpoints, as the sink will also need to wait for the re-added requests to be flushed when checkpointing. For example, when using **RetryRejectedExecutionFailureHandler**, checkpoints will need to wait until Elasticsearch node queues have enough capacity for all the pending requests. This also means that if re-added requests never succeed, the checkpoint will never finish.
**Failure handling for Elasticsearch 1.x**: For Elasticsearch 1.x, it is not feasible to match the type of the failure because the exact type could not be retrieved through the older version Java client APIs (thus, the types will be general **Exception**s and only differ in the failure message). In this case, it is recommended to match on the provided REST status code.
**Elasticsearch 1.x的故障处理**:对于Elasticsearch 1.x,匹配故障的类型是不可行的,因为无法通过较旧版本的Java客户端API检索确切的类型(因此,这些类型将是通用Exception。并且仅在失败消息中有所不同)。在这种情况下,建议匹配提供的REST状态代码。
### Configuring the Internal Bulk Processor
### Configuring the Internal Bulk Processor 配置内部批量处理器
The internal `BulkProcessor` can be further configured for its behaviour on how buffered action requests are flushed, by setting the following values in the provided `Map<String, String>`:
BulkProcessor通过在提供的内容中设置以下值,可以进一步配置内部组件的行为,以了解如何刷新缓冲的动作请求Map<String, String>
* **bulk.flush.max.actions**: Maximum amount of actions to buffer before flushing.
* **bulk.flush.max.size.mb**: Maximum size of data (in megabytes) to buffer before flushing.
* **bulk.flush.interval.ms**: Interval at which to flush regardless of the amount or size of buffered actions.
* **bulk.flush.max.actions**: Maximum amount of actions to buffer before flushing. 刷新前要缓冲的最大操作数。
* **bulk.flush.max.size.mb**: Maximum size of data (in megabytes) to buffer before flushing. 刷新前要缓冲的最大数据大小(以兆字节为单位)。
* **bulk.flush.interval.ms**: Interval at which to flush regardless of the amount or size of buffered actions. 刷新间隔,无论缓冲操作的数量或大小如何。
For versions 2.x and above, configuring how temporary request errors are retried is also supported:
* **bulk.flush.backoff.enable**: Whether or not to perform retries with backoff delay for a flush if one or more of its actions failed due to a temporary `EsRejectedExecutionException`.
* 如果刷新的一个或多个操作由于临时原因而失败,是否对刷新执行延迟退避重试EsRejectedExecutionException。
* **bulk.flush.backoff.type**: The type of backoff delay, either `CONSTANT` or `EXPONENTIAL`
* **bulk.flush.backoff.delay**: The amount of delay for backoff. For constant backoff, this is simply the delay between each retry. For exponential backoff, this is the initial base delay.
* 延迟的延迟量。对于恒定的退避,这只是每次重试之间的延迟。对于指数补偿,这是初始基准延迟。
* **bulk.flush.backoff.retries**: The amount of backoff retries to attempt.
* 尝试尝试的退避重试次数。
More information about Elasticsearch can be found [here](https://elastic.co).
## Packaging the Elasticsearch Connector into an Uber-Jar
For the execution of your Flink program, it is recommended to build a so-called uber-jar (executable jar) containing all your dependencies (see [here](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/linking.html) for further information).
Alternatively, you can put the connector’s jar file into Flink’s `lib/` folder to make it available system-wide, i.e. for all job being run.
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册