

# Elasticsearch Connector

This connector provides sinks that can request document actions to an [Elasticsearch](https://elastic.co/) Index. To use this connector, add one of the following dependencies to your project, depending on the version of the Elasticsearch installation:

| Maven Dependency | Supported since | Elasticsearch version |
| --- | --- | --- |
| flink-connector-elasticsearch_2.11 | 1.0.0 | 1.x |
| flink-connector-elasticsearch2_2.11 | 1.0.0 | 2.x |
| flink-connector-elasticsearch5_2.11 | 1.3.0 | 5.x |
| flink-connector-elasticsearch6_2.11 | 1.6.0 | 6 and later versions |

Note that the streaming connectors are currently not part of the binary distribution. See [here](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/linking.html) for information about how to package the program with the libraries for cluster execution.

## Installing Elasticsearch

Instructions for setting up an Elasticsearch cluster can be found [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html). Make sure to set and remember a cluster name. This must be set when creating an `ElasticsearchSink` for requesting document actions against your cluster.

## Elasticsearch Sink

The `ElasticsearchSink` uses a `TransportClient` (before 6.x) or `RestHighLevelClient` (starting with 6.x) to communicate with an Elasticsearch cluster.

The examples below show how to configure and create a sink for each supported Elasticsearch version, in both Java and Scala.

Java, Elasticsearch 1.x:

```
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");

List<TransportAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));

input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
```

Java, Elasticsearch 2.x and 5.x:

```
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");

List<InetSocketAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));

input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
```

Java, Elasticsearch 6.x and above:

```
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

DataStream<String> input = ...;

List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));

// use an ElasticsearchSink.Builder to create an ElasticsearchSink
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<String>() {
        public IndexRequest createIndexRequest(String element) {
            Map<String, String> json = new HashMap<>();
            json.put("data", element);

            return Requests.indexRequest()
                    .index("my-index")
                    .type("my-type")
                    .source(json);
        }

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }
    }
);

// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
esSinkBuilder.setBulkFlushMaxActions(1);

// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory(
  restClientBuilder -> {
    restClientBuilder.setDefaultHeaders(...);
    restClientBuilder.setMaxRetryTimeoutMillis(...);
    restClientBuilder.setPathPrefix(...);
    restClientBuilder.setHttpClientConfigCallback(...);
  }
);

// finally, build and add the sink to the job's pipeline
input.addSink(esSinkBuilder.build());
```

Scala, Elasticsearch 1.x:

```
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer

import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.common.transport.TransportAddress

import java.net.InetAddress
import java.util.ArrayList
import java.util.HashMap
import java.util.List
import java.util.Map

val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
config.put("cluster.name", "my-cluster-name")
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1")

val transportAddresses = new java.util.ArrayList[TransportAddress]
transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300))
transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300))

input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)

    Requests.indexRequest()
            .index("my-index")
            .`type`("my-type")
            .source(json)
  }

  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    indexer.add(createIndexRequest(element))
  }
}))
```

Scala, Elasticsearch 2.x and 5.x:

```
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink

import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

import java.net.InetAddress
import java.net.InetSocketAddress
import java.util.ArrayList
import java.util.HashMap
import java.util.List
import java.util.Map

val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
config.put("cluster.name", "my-cluster-name")
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1")

val transportAddresses = new java.util.ArrayList[InetSocketAddress]
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))

input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)

    Requests.indexRequest()
            .index("my-index")
            .`type`("my-type")
            .source(json)
  }

  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    indexer.add(createIndexRequest(element))
  }
}))
```

Scala, Elasticsearch 6.x and above:

```
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink

import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

import java.util.ArrayList
import java.util.List

val input: DataStream[String] = ...

val httpHosts = new java.util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("127.0.0.1", 9300, "http"))
httpHosts.add(new HttpHost("10.2.3.1", 9300, "http"))

val esSinkBuilder = new ElasticsearchSink.Builder[String](
  httpHosts,
  new ElasticsearchSinkFunction[String] {
    def createIndexRequest(element: String): IndexRequest = {
      val json = new java.util.HashMap[String, String]
      json.put("data", element)

      Requests.indexRequest()
              .index("my-index")
              .`type`("my-type")
              .source(json)
    }

    override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
      indexer.add(createIndexRequest(element))
    }
  }
)

// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
esSinkBuilder.setBulkFlushMaxActions(1)

// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory(
  restClientBuilder => {
    restClientBuilder.setDefaultHeaders(...)
    restClientBuilder.setMaxRetryTimeoutMillis(...)
    restClientBuilder.setPathPrefix(...)
    restClientBuilder.setHttpClientConfigCallback(...)
  }
)

// finally, build and add the sink to the job's pipeline
input.addSink(esSinkBuilder.build)
```



For Elasticsearch versions that still use the now deprecated `TransportClient` to communicate with the Elasticsearch cluster (i.e., versions 5.x and below), note how a `Map` of `String`s is used to configure the `ElasticsearchSink`. This config map will be forwarded directly when creating the internally used `TransportClient`. The configuration keys are documented in the Elasticsearch documentation [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html). Especially important is the `cluster.name` parameter, which must correspond to the name of your cluster.

For Elasticsearch 6.x and above, the `RestHighLevelClient` is used internally for cluster communication. By default, the connector uses the default configuration of the REST client. To customize the REST client, users can provide a `RestClientFactory` implementation when setting up the `ElasticsearchSink.Builder` that builds the sink.

Also note that the example only demonstrates performing a single index request for each incoming element. Generally, the `ElasticsearchSinkFunction` can be used to perform multiple requests of different types (e.g., `DeleteRequest`, `UpdateRequest`, etc.).
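
For illustration, here is a minimal Java sketch of a sink function that emits more than one action per element. The second index name ("my-stale-index") and the use of the element value as the document id are placeholders chosen for this sketch, not something prescribed by the connector:

```
new ElasticsearchSinkFunction<String>() {
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        // index (or overwrite) the current value, using the element as a placeholder document id
        indexer.add(Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .id(element)
                .source(json));

        // additionally remove a (hypothetical) related document from another index
        indexer.add(Requests.deleteRequest("my-stale-index")
                .type("my-type")
                .id(element));
    }
};
```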

Internally, each parallel instance of the Flink Elasticsearch Sink uses a `BulkProcessor` to send action requests to the cluster. This will buffer elements before sending them in bulk to the cluster. The `BulkProcessor` executes bulk requests one at a time, i.e. there will be no two concurrent flushes of the buffered actions in progress.

### Elasticsearch Sinks and Fault Tolerance

With Flink’s checkpointing enabled, the Flink Elasticsearch Sink guarantees at-least-once delivery of action requests to Elasticsearch clusters. It does so by waiting for all pending action requests in the `BulkProcessor` at the time of checkpoints. This effectively assures that all requests before the checkpoint was triggered have been successfully acknowledged by Elasticsearch, before proceeding to process more records sent to the sink.

More details on checkpoints and fault tolerance are in the [fault tolerance docs](//ci.apache.org/projects/flink/flink-docs-release-1.7/internals/stream_checkpointing.html).

To use fault tolerant Elasticsearch Sinks, checkpointing of the topology needs to be enabled at the execution environment:


```
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
```





```
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
```



**NOTE**: Users can disable flushing if they wish to do so, by calling **disableFlushOnCheckpoint()** on the created **ElasticsearchSink**. Be aware that this essentially means the sink will not provide any strong delivery guarantees anymore, even with checkpointing for the topology enabled.
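
As a rough sketch (assuming the `esSinkBuilder` and `input` from the Elasticsearch 6.x example above), flushing on checkpoints could be disabled on the built sink before adding it to the pipeline:

```
ElasticsearchSink<String> esSink = esSinkBuilder.build();

// opt out of waiting for pending bulk requests at checkpoint time;
// this weakens the at-least-once guarantee described above
esSink.disableFlushOnCheckpoint();

input.addSink(esSink);
```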

### Communication using Embedded Node (only for Elasticsearch 1.x)

For Elasticsearch versions 1.x, communication using an embedded node is also supported. See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html) for information about the differences between communicating with Elasticsearch with an embedded node and a `TransportClient`.

Below is an example of how to create an `ElasticsearchSink` use an embedded node instead of a `TransportClient`:


```
DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "my-cluster-name");

input.addSink(new ElasticsearchSink<>(config, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
```





```
val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "my-cluster-name")

input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)

    Requests.indexRequest()
            .index("my-index")
            .`type`("my-type")
            .source(json)
  }

  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    indexer.add(createIndexRequest(element))
  }
}))
```



The difference is that now we do not need to provide a list of addresses of Elasticsearch nodes.

### Handling Failing Elasticsearch Requests

Elasticsearch action requests may fail due to a variety of reasons, including temporarily saturated node queue capacity or malformed documents to be indexed. The Flink Elasticsearch Sink allows the user to specify how request failures are handled, by simply implementing an `ActionRequestFailureHandler` and providing it to the constructor.

Below is an example:


```
DataStream<String> input = ...;

input.addSink(new ElasticsearchSink<>(
    config, transportAddresses,
    new ElasticsearchSinkFunction<String>() {...},
    new ActionRequestFailureHandler() {
        @Override
        public void onFailure(ActionRequest action,
                Throwable failure,
                int restStatusCode,
                RequestIndexer indexer) throws Throwable {

            if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
                // full queue; re-add document for indexing
                indexer.add(action);
            } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
                // malformed document; simply drop request without failing sink
            } else {
                // for all other failures, fail the sink
                // here the failure is simply rethrown, but users can also choose to throw custom exceptions
                throw failure;
            }
        }
}));
```





```
val input: DataStream[String] = ...

input.addSink(new ElasticsearchSink(
    config, transportAddresses,
    new ElasticsearchSinkFunction[String] {...},
    new ActionRequestFailureHandler {
        @throws(classOf[Throwable])
        override def onFailure(action: ActionRequest,
                failure: Throwable,
                restStatusCode: Int,
                indexer: RequestIndexer): Unit = {

            if (ExceptionUtils.containsThrowable(failure, classOf[EsRejectedExecutionException])) {
                // full queue; re-add document for indexing
                indexer.add(action)
            } else if (ExceptionUtils.containsThrowable(failure, classOf[ElasticsearchParseException])) {
                // malformed document; simply drop request without failing sink
            } else {
                // for all other failures, fail the sink
                // here the failure is simply rethrown, but users can also choose to throw custom exceptions
                throw failure
            }
        }
}))
```



The above example will let the sink re-add requests that failed due to queue capacity saturation and drop requests with malformed documents, without failing the sink. For all other failures, the sink will fail. If an `ActionRequestFailureHandler` is not provided to the constructor, the sink will fail for any kind of error.

Note that `onFailure` is called only for failures that still occur after the `BulkProcessor` internally finishes all backoff retry attempts. By default, the `BulkProcessor` retries up to a maximum of 8 attempts with exponential backoff. For more information on the behaviour of the internal `BulkProcessor` and how to configure it, please see the following section.

By default, if a failure handler is not provided, the sink uses a `NoOpFailureHandler` that simply fails for all kinds of exceptions. The connector also provides a `RetryRejectedExecutionFailureHandler` implementation that always re-adds requests that have failed due to queue capacity saturation.
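
As an example, assuming the `esSinkBuilder` from the Elasticsearch 6.x example earlier (for 5.x and below the handler is passed as an extra constructor argument instead, as shown above), the provided handler could be plugged in roughly like this:

```
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;

// re-add only requests that were rejected because of a saturated bulk queue;
// all other failures still fail the sink
esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());
```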

**IMPORTANT**: Re-adding requests back to the internal **BulkProcessor** on failures will lead to longer checkpoints, as the sink will also need to wait for the re-added requests to be flushed when checkpointing. For example, when using **RetryRejectedExecutionFailureHandler**, checkpoints will need to wait until Elasticsearch node queues have enough capacity for all the pending requests. This also means that if re-added requests never succeed, the checkpoint will never finish.

**Failure handling for Elasticsearch 1.x**: For Elasticsearch 1.x, it is not feasible to match on the type of the failure, because the exact type cannot be retrieved through the older version Java client APIs (the failures will be general **Exception**s that only differ in the failure message). In this case, it is recommended to match on the provided REST status code.
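
A minimal sketch of such a status-code based handler (the 429 "Too Many Requests" code used here is just one plausible choice for detecting a rejected request):

```
new ActionRequestFailureHandler() {
    @Override
    public void onFailure(ActionRequest action,
            Throwable failure,
            int restStatusCode,
            RequestIndexer indexer) throws Throwable {

        if (restStatusCode == 429) {
            // the node rejected the request because its queue was full; re-add it for indexing
            indexer.add(action);
        } else {
            // inspect the status code or failure message as needed; otherwise fail the sink
            throw failure;
        }
    }
};
```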

### Configuring the Internal Bulk Processor

The internal `BulkProcessor` can be further configured for how buffered action requests are flushed, by setting the following values in the provided `Map<String, String>`:

*   **bulk.flush.max.actions**: Maximum amount of actions to buffer before flushing.
*   **bulk.flush.max.size.mb**: Maximum size of data (in megabytes) to buffer before flushing.
*   **bulk.flush.interval.ms**: Interval at which to flush regardless of the amount or size of buffered actions.

For versions 2.x and above, configuring how temporary request errors are retried is also supported (a combined configuration sketch follows this list):

*   **bulk.flush.backoff.enable**: Whether or not to perform retries with backoff delay for a flush if one or more of its actions failed due to a temporary `EsRejectedExecutionException`.
*   **bulk.flush.backoff.type**: The type of backoff delay, either `CONSTANT` or `EXPONENTIAL`.
*   **bulk.flush.backoff.delay**: The amount of delay for backoff. For constant backoff, this is simply the delay between each retry. For exponential backoff, this is the initial base delay.
*   **bulk.flush.backoff.retries**: The amount of backoff retries to attempt.
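
As a rough combined sketch in Java (the concrete values are arbitrary examples, not recommendations), these keys go into the same configuration `Map` that is passed to the sink:

```
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");

// flush after 500 buffered actions, 5 MB of buffered data, or 60 seconds, whichever comes first
config.put("bulk.flush.max.actions", "500");
config.put("bulk.flush.max.size.mb", "5");
config.put("bulk.flush.interval.ms", "60000");

// retry temporarily rejected bulk requests with exponential backoff (Elasticsearch 2.x and above)
config.put("bulk.flush.backoff.enable", "true");
config.put("bulk.flush.backoff.type", "EXPONENTIAL");
config.put("bulk.flush.backoff.delay", "50");
config.put("bulk.flush.backoff.retries", "8");
```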

More information about Elasticsearch can be found [here](https://elastic.co).

## Packaging the Elasticsearch Connector into an Uber-Jar

For the execution of your Flink program, it is recommended to build a so-called uber-jar (executable jar) containing all your dependencies (see [here](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/linking.html) for further information).

Alternatively, you can put the connector’s jar file into Flink’s `lib/` folder to make it available system-wide, i.e. for all jobs being run.