This would be relevant mostly for users implementing custom `TypeSerializer`s for their state.
The old `TypeSerializerConfigSnapshot` abstraction is now deprecated, and will be fully removed in the future in favor of the new `TypeSerializerSnapshot`. For details and guides on how to migrate, please see [Migrating from deprecated serializer snapshot APIs before Flink 1.7](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/custom_serialization.html#migration-from-deprecated-serializer-snapshot-apis-before-Flink-1.7).
## Migrating from Flink 1.2 to Flink 1.3
There are a few APIs that have been changed since Flink 1.2. Most of the changes are documented in their specific documentation. The following is a consolidated list of API changes and links to details for migration when upgrading to Flink 1.3.
This would be relevant mostly for users implementing custom `TypeSerializer`s for their state.
Since Flink 1.3, two additional methods have been added that are related to serializer compatibility across savepoint restores. Please see [Handling serializer upgrades and compatibility](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility) for further details on how to implement these methods.
In Flink 1.2, `ProcessFunction` and its rich variant `RichProcessFunction` were introduced. As of Flink 1.3, `RichProcessFunction` has been removed and `ProcessFunction` is now always a `RichFunction` with access to the lifecycle methods and runtime context.
The CEP library in Flink 1.3 ships with a number of new features which have led to some changes in the API. Please visit the [CEP Migration docs](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/libs/cep.html#migrating-from-an-older-flink-version) for details.
### Logger dependencies removed from Flink core artifacts
In Flink 1.3, to make sure that users can use their own custom logging framework, core Flink artifacts are now clean of specific logger dependencies.
Example and quickstart archetypes already have loggers specified and should not be affected. For other custom projects, make sure to add logger dependencies. For example, in Maven’s `pom.xml`, you can add:
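The concrete dependency block is elided here; as a hedged sketch, a Log4j 1.x setup via SLF4J would add dependencies along these lines (the versions shown are illustrative and should be aligned with your Flink release):

```
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
```

Any SLF4J-compatible backend (e.g. Logback) can be substituted here, since Flink core now only depends on the SLF4J API.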
## Migrating from Flink 1.1 to Flink 1.2
As mentioned in the [State documentation](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html), Flink has two types of state: **keyed** and **non-keyed** state (also called **operator** state). Both types are available to both operators and user-defined functions. This document will guide you through the process of migrating your Flink 1.1 function code to Flink 1.2 and will present some important internal changes introduced in Flink 1.2 that concern the deprecation of the aligned window operators from Flink 1.1 (see [Aligned Processing Time Window Operators](#aligned-processing-time-window-operators)).
1. allow your functions to take advantage of the new features introduced in Flink 1.2, such as rescaling,
2. make sure that your new Flink 1.2 job will be able to resume execution from a savepoint generated by its Flink 1.1 predecessor.
After following the steps in this guide, you will be able to migrate your running job from Flink 1.1 to Flink 1.2 simply by taking a [savepoint](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/state/savepoints.html) with your Flink 1.1 job and giving it to your Flink 1.2 job as a starting point. This will allow the Flink 1.2 job to resume execution from where its Flink 1.1 predecessor left off.
As running examples for the remainder of this document we will use the `CountMapper` and the `BufferingSink` functions. The first is an example of a function with **keyed** state, while the second has **non-keyed** state. The code for the aforementioned two functions in Flink 1.1 is presented below:
The `CountMapper` is a `RichFlatMapFunction` which assumes a grouped-by-key input stream of the form `(word, 1)`. The function keeps a counter for each incoming key (`ValueState<Integer> counter`) and if the number of occurrences of a certain word surpasses the user-provided threshold, a tuple is emitted containing the word itself and the number of occurrences.
The `BufferingSink` is a `SinkFunction` that receives elements (potentially the output of the `CountMapper`) and buffers them until a certain user-specified threshold is reached, before emitting them to the final sink. This is a common way to avoid many expensive calls to a database or an external storage system. To do the buffering in a fault-tolerant manner, the buffered elements are kept in a list (`bufferedElements`) which is periodically checkpointed.
To leverage the new features of Flink 1.2, the code above should be modified to use the new state abstractions. After doing these changes, you will be able to change the parallelism of your job (scale up or down) and you are guaranteed that the new version of your job will start from where its predecessor left off.
**Keyed State:** Something to note before delving into the details of the migration process is that if your function has **only keyed state**, then the exact same code from Flink 1.1 also works for Flink 1.2 with full support for the new features and full backwards compatibility. Changes could be made just for better code organization, but this is just a matter of style.
With the above said, the rest of this section focuses on the **non-keyed state**.
#### Rescaling and new state abstractions
The first modification is the transition from the old `Checkpointed<T extends Serializable>` state interface to the new ones. In Flink 1.2, a stateful function can implement either the more general `CheckpointedFunction` interface, or the `ListCheckpointed<T extends Serializable>` interface, which is semantically closer to the old `Checkpointed` one.
In both cases, the non-keyed state is expected to be a `List` of _serializable_ objects, independent from each other, thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink` contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0, while `(test2, 2)` will go to task 1.
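The redistribution idea above can be illustrated with a small toy sketch (plain Python, not Flink code; the round-robin assignment is one possible redistribution scheme, chosen here only to make the "finest granularity" point concrete):

```
# Toy sketch: non-keyed list state is a list of independent entries,
# so the entries can be redistributed across subtasks when the
# parallelism changes.
def redistribute(state, parallelism):
    """Assign each checkpointed entry to one of `parallelism` subtasks."""
    tasks = [[] for _ in range(parallelism)]
    for i, entry in enumerate(state):
        tasks[i % parallelism].append(entry)
    return tasks

# With parallelism 1 the checkpoint holds both elements; after rescaling
# to parallelism 2, each subtask receives one of them.
checkpoint = [("test1", 2), ("test2", 2)]
print(redistribute(checkpoint, 2))  # → [[('test1', 2)], [('test2', 2)]]
```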
More details on the principles behind rescaling of both keyed state and non-keyed state can be found in the [State documentation](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/index.html).
Their semantics are the same as their counterparts in the old `Checkpointed` interface. The only difference is that now `snapshotState()` should return a list of objects to checkpoint, as stated earlier, and `restoreState` has to handle this list upon recovery. If the state is not re-partitionable, you can always return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`. The updated code for `BufferingSink` is included below:
As shown in the code, the updated function also implements the `CheckpointedRestoring` interface. This is for backwards compatibility reasons and more details will be explained at the end of this section.
As in Flink 1.1, `snapshotState()` is called whenever a checkpoint is performed, but now `initializeState()` (which is the counterpart of the `restoreState()`) is called every time the user-defined function is initialized, rather than only in the case that we are recovering from a failure. Given this, `initializeState()` is not only the place where different types of state are initialized, but also where state recovery logic is included. An implementation of the `CheckpointedFunction` interface for `BufferingSink` is presented below.
The `initializeState` takes as argument a `FunctionInitializationContext`. This is used to initialize the non-keyed state “container”. This is a container of type `ListState` where the non-keyed state objects are going to be stored upon checkpointing:
After initializing the container, we use the `isRestored()` method of the context to check if we are recovering after a failure. If this is `true`, _i.e._ we are recovering, the restore logic is applied.
As shown in the code of the modified `BufferingSink`, this `ListState` recovered during state initialization is kept in a class variable for future use in `snapshotState()`. There the `ListState` is cleared of all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint.
As a side note, the keyed state can also be initialized in the `initializeState()` method. This can be done using the `FunctionInitializationContext` given as argument, instead of the `RuntimeContext`, which is the case for Flink 1.1. If the `CheckpointedFunction` interface was to be used in the `CountMapper` example, the old `open()` method could be removed and the new `snapshotState()` and `initializeState()` methods would look like this:
So far we have seen how to modify our functions to take advantage of the new features introduced by Flink 1.2. The question that remains is “Can I make sure that my modified (Flink 1.2) job will start from where my already running job from Flink 1.1 stopped?”.
The answer is yes, and the way to do it is pretty straightforward. For the keyed state, you have to do nothing. Flink will take care of restoring the state from Flink 1.1. For the non-keyed state, your new function has to implement the `CheckpointedRestoring` interface, as shown in the code above. This has a single method, the familiar `restoreState()` from the old `Checkpointed` interface from Flink 1.1. As shown in the modified code of the `BufferingSink`, the `restoreState()` method is identical to its predecessor.
In Flink 1.1, and only when operating on _processing time_ with no specified evictor or trigger, the command `timeWindow()` on a keyed stream would instantiate a special type of `WindowOperator`. This could be either an `AggregatingProcessingTimeWindowOperator` or an `AccumulatingProcessingTimeWindowOperator`. Both of these operators are referred to as _aligned_ window operators as they assume their input elements arrive in order. This is valid when operating in processing time, as elements get as timestamp the wall-clock time at the moment they arrive at the window operator. These operators were restricted to using the memory state backend, and had optimized data structures for storing the per-window elements which leveraged the in-order input element arrival.
In Flink 1.2, the aligned window operators are deprecated, and all windowing operations go through the generic `WindowOperator`. This migration requires no change in the code of your Flink 1.1 job, as Flink will transparently read the state stored by the aligned window operators in your Flink 1.1 savepoint, translate it into a format that is compatible with the generic `WindowOperator`, and resume execution using the generic `WindowOperator`.
Note: Although deprecated, you can still use the aligned window operators in Flink 1.2 through special `WindowAssigners` introduced for exactly this purpose. These assigners are the `SlidingAlignedProcessingTimeWindows` and the `TumblingAlignedProcessingTimeWindows` assigners, for sliding and tumbling windows respectively. A Flink 1.2 job that uses aligned windowing has to be a new job, as there is no way to resume execution from a Flink 1.1 savepoint while using these operators.
This page provides instructions on how to run Flink in a _fully distributed fashion_ on a _static_ (but possibly heterogeneous) cluster.
## Requirements
### Software Requirements
Flink runs on all _UNIX-like environments_, e.g. **Linux**, **Mac OS X**, and **Cygwin** (for Windows) and expects the cluster to consist of **one master node** and **one or more worker nodes**. Before you start to set up the system, make sure you have the following software installed **on each node**:
Go to the [downloads page](http://flink.apache.org/downloads.html) and get the ready-to-run package. Make sure to pick the Flink package **matching your Hadoop version**. If you don’t plan to use Hadoop, pick any version.
After downloading the latest release, copy the archive to your master node and extract it:
Set the `jobmanager.rpc.address` key to point to your master node. You should also define the maximum amount of main memory the JVM is allowed to allocate on each node by setting the `jobmanager.heap.mb` and `taskmanager.heap.mb` keys.
These values are given in MB. If some worker nodes have more main memory that you want to allocate to the Flink system, you can overwrite the default value by setting the environment variable `FLINK_TM_HEAP` on those specific nodes.
Finally, you must provide a list of all nodes in your cluster which shall be used as worker nodes. Therefore, similar to the HDFS configuration, edit the file _conf/slaves_ and enter the IP/host name of each worker node. Each worker node will later run a TaskManager.
The following example illustrates the setup with three nodes (with IP addresses from _10.0.0.1_ to _10.0.0.3_ and hostnames _master_, _worker1_, _worker2_) and shows the contents of the configuration files (which need to be accessible at the same path on all machines):
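The file contents themselves are elided here; as a hedged sketch for this three-node example, they might look like the following (values are illustrative and must match your own addresses):

```
# conf/flink-conf.yaml (identical on all machines)
jobmanager.rpc.address: 10.0.0.1

# conf/slaves (identical on all machines; one worker per line)
10.0.0.2
10.0.0.3
```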
The Flink directory must be available on every worker under the same path. You can use a shared NFS directory, or copy the entire Flink directory to every worker node.
Please see the [configuration page](../config.html) for details and additional configuration options.
The following script starts a JobManager on the local node and connects via SSH to all worker nodes listed in the _slaves_ file to start the TaskManager on each node. Now your Flink system is up and running. The JobManager running on the local node will now accept jobs at the configured RPC port.
These are very important configuration values for Flink.
Assuming that you are on the master node and inside the Flink directory:
Start a YARN session with 4 Task Managers (each with 4 GB of Heapspace):
...
...
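The invocation itself is elided above; as a sketch, in Flink 1.7 a session with 4 TaskManagers of 4 GB each would typically be started like this (flag values are illustrative):

```
./bin/yarn-session.sh -n 4 -tm 4096
```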
Specify the `-s` flag for the number of processing slots per Task Manager. We recommend setting the number of slots to the number of processors per machine.
Once the session has been started, you can submit jobs to the cluster using the `./bin/flink` tool.
### Run a Flink job on YARN
...
...
## Flink YARN Session
Apache [Hadoop YARN](http://hadoop.apache.org/) is a cluster resource management framework. It allows running various distributed applications on top of a cluster. Flink runs on YARN next to other applications. Users do not have to set up or install anything if there is already a YARN setup.
Follow these instructions to learn how to launch a Flink Session within your YARN cluster.
A session will start all required Flink services (JobManager and TaskManagers) so that you can submit programs to the cluster. Note that you can run multiple programs per session.
The system will use the configuration in `conf/flink-conf.yaml`. Please follow our [configuration guide](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html) if you want to change something.
Flink on YARN will overwrite the following configuration parameters: `jobmanager.rpc.address` (because the JobManager is always allocated on different machines), `taskmanager.tmp.dirs` (we are using the tmp directories given by YARN) and `parallelism.default` if the number of slots has been specified.
If you don’t want to change the configuration file to set configuration parameters, there is the option to pass dynamic properties via the `-D` flag. So you can pass parameters this way: `-Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624`.
The example invocation starts 11 containers (even though only 10 containers were requested), since there is one additional container for the ApplicationMaster and Job Manager.
Once Flink is deployed in your YARN cluster, it will show you the connection details of the Job Manager.
Stop the YARN session by stopping the unix process (using CTRL+C) or by entering ‘stop’ into the client.
Flink on YARN will only start all requested containers if enough resources are available on the cluster. Most YARN schedulers account for the requested memory of the containers, some account also for the number of vcores. By default, the number of vcores is equal to the processing slots (`-s`) argument. The [`yarn.containers.vcores`](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#yarn-containers-vcores) setting allows overwriting the number of vcores with a custom value. In order for this parameter to work, you should enable CPU scheduling in your cluster.
If you do not want to keep the Flink YARN client running all the time, it’s also possible to start a _detached_ YARN session. The parameter for that is called `-d` or `--detached`.
In that case, the Flink YARN client will only submit Flink to the cluster and then close itself. Note that in this case it is not possible to stop the YARN session using Flink.
Use the _run_ action to submit a job to YARN. The client is able to determine the address of the JobManager. In the rare event of a problem, you can also pass the JobManager address using the `-m` argument. The JobManager address is visible in the YARN console.
If the TaskManagers do not show up after a minute, you should investigate the issue using the log files.
## Run a single Flink job on YARN
The documentation above describes how to start a Flink cluster within a Hadoop YARN environment. It is also possible to launch Flink within YARN only for executing a single job.
Please note that the client then expects the `-yn` value to be set (number of TaskManagers).
**_Example:_**
...
...
The command line options of the YARN session are also available with the `./bin/flink` tool. They are prefixed with a `y` or `yarn` (for the long argument options).
Note: You can use a different configuration directory per job by setting the environment variable `FLINK_CONF_DIR`. To use this, copy the `conf` directory from the Flink distribution and modify, for example, the logging settings on a per-job basis.
Note: It is possible to combine `-m yarn-cluster` with a detached YARN submission (`-yd`) to “fire and forget” a Flink job to the YARN cluster. In this case, your application will not get any accumulator results or exceptions from the ExecutionEnvironment.execute() call!
By default Flink will include the user jars into the system classpath when running a single job. This behavior can be controlled with the `yarn.per-job-cluster.include-user-jar` parameter.
When setting this parameter to `DISABLED`, Flink will include the jar in the user classpath instead. The user-jar position in the class path can be controlled by setting the parameter to one of the following:

* `ORDER`: (default) Adds the jar to the system class path based on the lexicographic order.
* `FIRST`: Adds the jar to the beginning of the system class path.
* `LAST`: Adds the jar to the end of the system class path.
## Recovery behavior of Flink on YARN
Flink’s YARN client has the following configuration parameters to control how to behave in case of container failures. These parameters can be set either from the `conf/flink-conf.yaml` or when starting the YARN session, using `-D` parameters.
* `yarn.reallocate-failed`: This parameter controls whether Flink should reallocate failed TaskManager containers. Default: true
* `yarn.maximum-failed-containers`: The maximum number of failed containers the ApplicationMaster accepts until it fails the YARN session. Default: The number of initially requested TaskManagers (`-n`).
* `yarn.application-attempts`: The number of ApplicationMaster (+ its TaskManager containers) attempts. If this value is set to 1 (default), the entire YARN session will fail when the Application master fails. Higher values specify the number of restarts of the ApplicationMaster by YARN.
There are many reasons why a Flink YARN session deployment can fail: a misconfigured Hadoop setup (HDFS permissions, YARN configuration), version incompatibilities (running Flink with vanilla Hadoop dependencies on Cloudera Hadoop), or other errors.
In cases where the Flink YARN session fails during the deployment itself, users have to rely on the logging capabilities of Hadoop YARN. The most useful feature for that is the [YARN log aggregation](http://hortonworks.com/blog/simplifying-user-logs-management-and-access-in-yarn/). To enable it, users have to set the `yarn.log-aggregation-enable` property to `true` in the `yarn-site.xml` file. Once that is enabled, users can use the following command to retrieve all log files of a (failed) YARN session.
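The retrieval command referenced above is the standard YARN log-aggregation CLI (substitute the application ID of your session):

```
yarn logs -applicationId <application ID>
```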
Note that it takes a few seconds after the session has finished until the logs show up.
### YARN Client console & Web interfaces
The Flink YARN client also prints error messages in the terminal if errors occur during runtime (for example if a TaskManager stops working after some time).
In addition to that, there is the YARN Resource Manager web interface (by default on port 8088). The port of the Resource Manager web interface is determined by the `yarn.resourcemanager.webapp.address` configuration value.
It allows accessing log files for running YARN applications and shows diagnostics for failed apps.
## Build YARN client for a specific Hadoop version
Users using Hadoop distributions from companies like Hortonworks, Cloudera or MapR might have to build Flink against their specific versions of Hadoop (HDFS) and YARN. Please read the [build instructions](//ci.apache.org/projects/flink/flink-docs-release-1.7/flinkDev/building.html) for more details.
Some YARN clusters use firewalls for controlling the network traffic between the cluster and the rest of the network. In those setups, Flink jobs can only be submitted to a YARN session from within the cluster’s network (behind the firewall). If this is not feasible for production use, Flink allows configuring a port range for all relevant services. With these ranges configured, users can also submit jobs to Flink across the firewall.
Currently, two services are needed to submit a job:
* The JobManager (ApplicationMaster in YARN)
* The BlobServer running within the JobManager.
When submitting a job to Flink, the BlobServer will distribute the jars with the user code to all worker nodes (TaskManagers). The JobManager receives the job itself and triggers the execution.
The two configuration parameters for specifying the ports are the following:
* `yarn.application-master.port`
* `blob.server.port`
These two configuration options accept single ports (for example: “50010”), ranges (“50000-50025”), or a combination of both (“50010,50011,50020-50025,50050-50075”).
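For example, a `flink-conf.yaml` fragment restricting both services to firewall-approved ranges might look like this (the ranges themselves are illustrative, not a recommendation):

```
yarn.application-master.port: 50010-50025
blob.server.port: 50100-50200
```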
This section briefly describes how Flink and YARN interact.
![](../img/FlinkOnYarn.svg)
The YARN client needs to access the Hadoop configuration to connect to the YARN resource manager and to HDFS. It determines the Hadoop configuration using the following strategy:
* Test if `YARN_CONF_DIR`, `HADOOP_CONF_DIR` or `HADOOP_CONF_PATH` is set (in that order). If one of these variables is set, it is used to read the configuration.
* If the above strategy fails (this should not be the case in a correct YARN setup), the client uses the `HADOOP_HOME` environment variable. If it is set, the client tries to access `$HADOOP_HOME/etc/hadoop` (Hadoop 2) and `$HADOOP_HOME/conf` (Hadoop 1).
When starting a new Flink YARN session, the client first checks if the requested resources (containers and memory) are available. After that, it uploads a jar that contains Flink and the configuration to HDFS (step 1).
The next step of the client is to request (step 2) a YARN container to start the _ApplicationMaster_ (step 3). Since the client registered the configuration and jar-file as a resource for the container, the NodeManager of YARN running on that particular machine will take care of preparing the container (e.g. downloading the files). Once that has finished, the _ApplicationMaster_ (AM) is started.
The _JobManager_ and AM are running in the same container. Once they have successfully started, the AM knows the address of the JobManager (its own host). It generates a new Flink configuration file for the TaskManagers (so that they can connect to the JobManager). The file is also uploaded to HDFS. Additionally, the _AM_ container serves Flink’s web interface. All ports the YARN code is allocating are _ephemeral ports_. This allows users to execute multiple Flink YARN sessions in parallel.
After that, the AM starts allocating the containers for Flink’s TaskManagers, which will download the jar file and the modified configuration from the HDFS. Once these steps are completed, Flink is set up and ready to accept Jobs.
The Mesos implementation consists of two components: The Application Master and the Worker. The workers are simple TaskManagers which are parameterized by the environment set up by the application master. The most sophisticated component of the Mesos implementation is the application master. The application master currently hosts the following components:
The scheduler is responsible for registering the framework with Mesos, requesting resources, and launching worker nodes. The scheduler continuously needs to report back to Mesos to ensure the framework is in a healthy state. To verify the health of the cluster, the scheduler monitors the spawned workers and marks them as failed and restarts them if necessary.
Flink’s Mesos scheduler itself is currently not highly available. However, it persists all necessary information about its state (e.g. configuration, list of workers) in Zookeeper. In the presence of a failure, it relies on an external system to bring up a new scheduler. The scheduler will then register with Mesos again and go through the reconciliation phase. In the reconciliation phase, the scheduler receives a list of running worker nodes. It matches these against the recovered information from Zookeeper and makes sure to bring the cluster back to the state before the failure.
The artifact server is responsible for providing resources to the worker nodes. The resources can be anything from the Flink binaries to shared secrets or configuration files. For instance, in non-containerized environments, the artifact server will provide the Flink binaries. What files will be served depends on the configuration overlay used.
The Dispatcher and the web interface provide a central point for monitoring, job submission, and other client interaction with the cluster (see [FLIP-6](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077)).
The startup script provides a way to configure and start the application master. All further configuration is then inherited by the worker nodes. This is achieved using configuration overlays. Configuration overlays provide a way to infer configuration from environment variables and config files which are shipped to the worker nodes.
This section refers to [DC/OS](https://dcos.io) which is a Mesos distribution with a sophisticated application management layer. It comes pre-installed with Marathon, a service to supervise applications and maintain their state in case of failures.
Once you have a DC/OS cluster, you may install Flink through the DC/OS Universe. In the search prompt, just search for Flink. Alternatively, you can use the DC/OS CLI:
```
dcos package install flink
```
Further information can be found in the [DC/OS examples documentation](https://github.com/dcos/examples/tree/master/1.8/flink).
After installation you have to configure the set of master and agent nodes by creating the files `MESOS_HOME/etc/mesos/masters` and `MESOS_HOME/etc/mesos/slaves`. These files contain in each row a single hostname on which the respective component will be started (assuming SSH access to these nodes).
In order to configure the Mesos agents, you have to create `MESOS_HOME/etc/mesos/mesos-agent-env.sh` or use the template found in the same directory, and adjust the agent settings there for your setup.
In order to run Java applications with Mesos you have to export `MESOS_NATIVE_JAVA_LIBRARY=MESOS_HOME/lib/libmesos.so` on Linux. Under Mac OS X you have to export `MESOS_NATIVE_JAVA_LIBRARY=MESOS_HOME/lib/libmesos.dylib`.
#### Deploying Mesos
In order to start your mesos cluster, use the deployment script `MESOS_HOME/sbin/mesos-start-cluster.sh`. In order to stop your mesos cluster, use the deployment script `MESOS_HOME/sbin/mesos-stop-cluster.sh`. More information about the deployment scripts can be found [here](http://mesos.apache.org/documentation/latest/deploy-scripts/).
Optionally, you may also [install Marathon](https://mesosphere.github.io/marathon/docs/) which enables you to run Flink in [high availability (HA) mode](#high-availability).
### Pre-installing Flink vs Docker/Mesos containers
You may install Flink on all of your Mesos Master and Agent nodes. You can also pull the binaries from the Flink web site during deployment and apply your custom configuration before launching the application master. A more convenient and easier to maintain approach is to use Docker containers to manage the Flink binaries and configuration.
In the `/bin` directory of the Flink distribution, you find two startup scripts which manage the Flink processes in a Mesos cluster:
1. `mesos-appmaster.sh`: This starts the Mesos application master which will register the Mesos scheduler. It is also responsible for starting up the worker nodes.
2. `mesos-taskmanager.sh`: The entry point for the Mesos worker processes. You don’t need to explicitly execute this script. It is automatically launched by the Mesos worker node to bring up a new TaskManager.
In order to run the `mesos-appmaster.sh` script you have to define `mesos.master` in the `flink-conf.yaml` or pass it via `-Dmesos.master=...` to the Java process.
When executing `mesos-appmaster.sh`, it will create a job manager on the machine where you executed the script. In contrast to that, the task managers will be run as Mesos tasks in the Mesos cluster.
It is possible to completely parameterize a Mesos application through Java properties passed to the Mesos application master. This also allows you to specify general Flink configuration parameters. For example:
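A sketch of such an invocation (the host name and parameter values are illustrative; `mesos.master` must point at your actual Mesos master):

```sh
bin/mesos-appmaster.sh \
    -Dmesos.master=master.example.org:5050 \
    -Djobmanager.heap.mb=1024 \
    -Dtaskmanager.heap.mb=3500 \
    -Dtaskmanager.numberOfTaskSlots=2 \
    -Dparallelism.default=10
```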
**Note:** If Flink is in [legacy mode](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#legacy), you should additionally define the number of task managers that are started by Mesos via [`mesos.initial-tasks`](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#mesos-initial-tasks).
You will need to run a service like Marathon or Apache Aurora which takes care of restarting the Flink master process in case of node or process failures. In addition, Zookeeper needs to be configured like described in the [High Availability section of the Flink docs](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/jobmanager_high_availability.html).
Marathon needs to be set up to launch the `bin/mesos-appmaster.sh` script. In particular, it should also adjust any configuration parameters for the Flink cluster.
For a list of Mesos specific configuration, refer to the [Mesos section](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#mesos) of the configuration documentation.
[Docker](https://www.docker.com) is a popular container runtime. There are Docker images for Apache Flink available on Docker Hub which can be used to deploy a session cluster. The Flink repository also contains tooling to create container images to deploy a job cluster.
Images for each supported combination of Hadoop and Scala are available, and tag aliases are provided for convenience.
每种受支持的 Hadoop 和 Scala 版本组合都有对应的镜像,并提供了 tag 别名以方便使用。
Beginning with Flink 1.5, image tags without a Hadoop version suffix (e.g. without `-hadoop28`) correspond to Hadoop-free releases of Flink that do not include a bundled Hadoop distribution.
**Note:** The Docker images are provided as a community project by individuals on a best-effort basis. They are not official releases by the Apache Flink PMC.
A Flink job cluster is a dedicated cluster which runs a single job. The job is part of the image and, thus, there is no extra job submission needed.
Flink作业集群是运行单个作业的专用集群。作业是镜像的一部分,因此不需要额外提交作业。
### Docker images
### Docker 镜像
The Flink job cluster image needs to contain the user code jars of the job for which the cluster is started. Therefore, one needs to build a dedicated container image for every job. The `flink-container` module contains a `build.sh` script which can be used to create such an image. Please see the [instructions](https://github.com/apache/flink/blob/release-1.7/flink-container/docker/README.md) for more details.
Example config files for a [session cluster](https://github.com/docker-flink/examples/blob/master/docker-compose.yml) and a [job cluster](https://github.com/apache/flink/blob/release-1.7/flink-container/docker/docker-compose.yml) are available on GitHub.
When the cluster is running, you can visit the web UI at [http://localhost:8081](http://localhost:8081). You can also use the web UI to submit a job to a session cluster.
Please follow [Kubernetes’ setup guide](https://kubernetes.io/docs/setup/) in order to deploy a Kubernetes cluster. If you want to run Kubernetes locally, we recommend using [MiniKube](https://kubernetes.io/docs/setup/minikube/).
**Note:** If using MiniKube please make sure to execute `minikube ssh 'sudo ip link set docker0 promisc on'` before deploying a Flink cluster. Otherwise Flink components are not able to self reference themselves through a Kubernetes service.
**注意:** 如果使用MiniKube,请确保在部署Flink集群前执行 `minikube ssh 'sudo ip link set docker0 promisc on'` 命令,否则 Flink 组件无法通过 Kubernetes 服务引用自身。
## Flink session cluster on Kubernetes
## Kubernetes 上的 Flink session 集群
A Flink session cluster is executed as a long-running Kubernetes Deployment. Note that you can run multiple Flink jobs on a session cluster. Each job needs to be submitted to the cluster after the cluster has been deployed.
You can then access the Flink UI via `kubectl proxy`:
可以通过 `kubectl proxy` 来访问Flink UI:
1. Run `kubectl proxy` in a terminal
2. Navigate to [http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy](http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy) in your browser
A Flink job cluster is a dedicated cluster which runs a single job. The job is part of the image and, thus, there is no extra job submission needed.
Flink作业集群是运行单个作业的专用集群。作业是镜像的一部分,因此不需要额外提交作业。
### Creating the job-specific image
### 创建专用作业的镜像
The Flink job cluster image needs to contain the user code jars of the job for which the cluster is started. Therefore, one needs to build a dedicated container image for every job. Please follow these [instructions](https://github.com/apache/flink/blob/release-1.7/flink-container/docker/README.md) to build the Docker image.
In order to deploy a job cluster on Kubernetes please follow these [instructions](https://github.com/apache/flink/blob/release-1.7/flink-container/kubernetes/README.md#deploy-flink-job-cluster).
The Deployment definitions use the pre-built image `flink:latest` which can be found [on Docker Hub](https://hub.docker.com/r/_/flink/). The image is built from this [Github repository](https://github.com/docker-flink/docker-flink).
Amazon Web Services offers cloud computing services on which you can run Flink.
Amazon Web Services 提供可以运行Flink的云计算服务。
## EMR: Elastic MapReduce
[Amazon Elastic MapReduce](https://aws.amazon.com/elasticmapreduce/) (Amazon EMR) is a web service that makes it easy to quickly set up a Hadoop cluster. This is the **recommended way** to run Flink on AWS as it takes care of setting up everything.
Flink is a supported application on Amazon EMR. [Amazon’s documentation](http://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html) describes configuring Flink, creating and monitoring a cluster, and working with jobs.
Amazon EMR services are regularly updated to new releases, but if the Flink version you need is not yet available, you can manually install it in a stock EMR cluster.
Amazon EMR 服务会定期更新到新版本,但如果所需的 Flink 版本尚不可用,也可以在 EMR 集群中手动安装。
**Create EMR Cluster**
**创建 EMR 集群**
The EMR documentation contains [examples showing how to start an EMR cluster](http://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-gs-launch-sample-cluster.html). You can follow that guide and install any EMR release. You don’t need to install the _All Applications_ part of the EMR release, but can stick to _Core Hadoop_.
**Note:** Access to S3 buckets requires [configuration of IAM roles](http://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-iam-roles.html) when creating an EMR cluster.
After creating your cluster, you can [connect to the master node](http://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-connect-master-node.html) and install Flink:
1. Go to the [Downloads Page](http://flink.apache.org/downloads.html) and **download a binary version of Flink matching the Hadoop version** of your EMR cluster, e.g. Hadoop 2.7 for EMR releases 4.3.0, 4.4.0, or 4.5.0.
2. Extract the Flink distribution and you are ready to deploy [Flink jobs via YARN](yarn_setup.html) after **setting the Hadoop config directory**:
1. 转到 [下载页](http://flink.apache.org/downloads.html) 并 **下载与EMR集群中Hadoop版本匹配的二进制版本**,例如,EMR 4.3.0、4.4.0 或 4.5.0 对应 Hadoop 2.7。
2. 解压 Flink,**设置 Hadoop 配置目录**后,即可部署 [Flink jobs via YARN](yarn_setup.html):
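For example, a minimal sketch of setting the Hadoop config directory (the path below is the typical EMR location and is an assumption, adjust it to your cluster):

```shell
# Typical Hadoop configuration directory on EMR; adjust if yours differs.
export HADOOP_CONF_DIR=/etc/hadoop/conf
```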
[Amazon Simple Storage Service](http://aws.amazon.com/s3/)(Amazon S3) provides cloud object storage for a variety of use cases. You can use S3 with Flink for **reading** and **writing data** as well in conjunction with the [streaming **state backends**](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/state/state_backends.html) or even as a YARN object storage.
Note that these examples are _not_ exhaustive and you can use S3 in other places as well, including your [high availability setup](../jobmanager_high_availability.html) or the [RocksDBStateBackend](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/state/state_backends.html#the-rocksdbstatebackend); everywhere that Flink expects a FileSystem URI.
For most use cases, you may use one of our shaded `flink-s3-fs-hadoop` and `flink-s3-fs-presto` S3 filesystem wrappers which are fairly easy to set up. For some cases, however, e.g. for using S3 as YARN’s resource storage dir, it may be necessary to set up a specific Hadoop S3 FileSystem implementation. Both ways are described below.
### Shaded Hadoop/Presto S3 file systems (recommended)
### Shaded Hadoop/Presto S3 文件系统 (推荐)
**Note:** You don’t have to configure this manually if you are running [Flink on EMR](#emr-elastic-mapreduce).
**Note:** 如果在EMR上运行Flink,则无需手动配置,请参阅 [Flink on EMR](#emr-elastic-mapreduce)。
To use either `flink-s3-fs-hadoop` or `flink-s3-fs-presto`, copy the respective JAR file from the `opt` directory to the `lib` directory of your Flink distribution before starting Flink, e.g.
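For example, for the Presto-based implementation (the `1.7.2` version suffix is illustrative and must match your distribution):

```sh
cp ./opt/flink-s3-fs-presto-1.7.2.jar ./lib/
```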
Both `flink-s3-fs-hadoop` and `flink-s3-fs-presto` register default FileSystem wrappers for URIs with the `s3://` scheme. In addition, `flink-s3-fs-hadoop` registers for `s3a://` and `flink-s3-fs-presto` for `s3p://`, so you can use both at the same time.
After setting up the S3 FileSystem wrapper, you need to make sure that Flink is allowed to access your S3 buckets.
设置S3 文件系统实现之后,需要保证Flink程序可以访问S3存储。
##### Identity and Access Management (IAM) (Recommended)
##### 身份和访问管理 (IAM) (推荐)
The recommended way of setting up credentials on AWS is via [Identity and Access Management (IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You can use IAM features to securely give Flink instances the credentials that they need in order to access S3 buckets. Details about how to do this are beyond the scope of this documentation. Please refer to the AWS user guide. What you are looking for are [IAM Roles](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html).
If you set this up correctly, you can manage access to S3 within AWS and don’t need to distribute any access keys to Flink.
如果正确设置此选项,则可以在AWS中管理对S3的访问,并且无需向Flink分发任何access keys。
##### Access Keys (Discouraged)
##### Access Keys (不推荐)
Access to S3 can be granted via your **access and secret key pair**. Please note that this is discouraged since the [introduction of IAM roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2).
### Hadoop-provided S3 file systems - manual setup
### Hadoop提供的S3文件系统-手动设置
**Note:** You don’t have to configure this manually if you are running [Flink on EMR](#emr-elastic-mapreduce).
**Note:** 在EMR上运行Flink则不需要配置,请参阅 [Flink on EMR](#emr-elastic-mapreduce)。
This setup is a bit more complex and we recommend using our shaded Hadoop/Presto file systems instead (see above) unless required otherwise, e.g. for using S3 as YARN’s resource storage dir via the `fs.defaultFS` configuration property in Hadoop’s `core-site.xml`.
1.`S3AFileSystem` (**recommended** for Hadoop 2.7 and later): file system for reading and writing regular files using Amazon’s SDK internally. No maximum file size and works with IAM roles.
2.`NativeS3FileSystem` (for Hadoop 2.6 and earlier): file system for reading and writing regular files. Maximum object size is 5GB and does not work with IAM roles.
This is the recommended S3 FileSystem implementation to use. It uses Amazon’s SDK internally and works with IAM roles (see [Configure Access Credentials](#configure-access-credentials-1)).
You need to point Flink to a valid Hadoop configuration, which contains the following properties in`core-site.xml`:
需要将 Flink 指向一个有效的 Hadoop 配置,其 `core-site.xml` 中包含以下属性:
...
...
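A sketch of what such an entry could look like; the exact key should be verified against your Hadoop version's s3a documentation:

```xml
<!-- Register S3AFileSystem for the s3a:// scheme (key is an assumption) -->
<property>
  <name>fs.s3a.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
```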
This registers `S3AFileSystem` as the default FileSystem for URIs with the `s3a://` scheme.
这会将 `S3AFileSystem` 注册为 `s3a://` scheme URI 的默认文件系统实现。
##### `NativeS3FileSystem`
This file system is limited to files up to 5GB in size and it does not work with IAM roles (see [Configure Access Credentials](#configure-access-credentials-1)), meaning that you have to manually configure your AWS credentials in the Hadoop config file.
You need to point Flink to a valid Hadoop configuration, which contains the following property in `core-site.xml`:
需要将 Flink 指向一个有效的 Hadoop 配置,其 `core-site.xml` 中包含以下属性:
...
...
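A sketch of the corresponding entry (verify the key against your Hadoop version):

```xml
<!-- Register NativeS3FileSystem for the s3:// scheme -->
<property>
  <name>fs.s3.impl</name>
  <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>
```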
This registers `NativeS3FileSystem` as the default FileSystem for URIs with the `s3://` scheme.
这会将 `NativeS3FileSystem` 注册为 `s3://` scheme URI 的默认文件系统实现。
#### Hadoop Configuration
#### Hadoop 配置
You can specify the [Hadoop configuration](../config.html#hdfs) in various ways pointing Flink to the path of the Hadoop configuration directory, for example
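One way is via the `fs.hdfs.hadoopconf` key in `flink-conf.yaml` (the path is a placeholder):

```yaml
fs.hdfs.hadoopconf: /path/to/etc/hadoop
```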
This registers `/path/to/etc/hadoop` as Hadoop’s configuration directory with Flink. Flink will look for the `core-site.xml` and `hdfs-site.xml` files in the specified directory.
##### Identity and Access Management (IAM) (Recommended)
##### 身份和访问管理 (IAM) (推荐)
When using `S3AFileSystem`, the recommended way of setting up credentials on AWS is via [Identity and Access Management (IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You can use IAM features to securely give Flink instances the credentials that they need in order to access S3 buckets. Details about how to do this are beyond the scope of this documentation. Please refer to the AWS user guide. What you are looking for are [IAM Roles](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html).
If you set this up correctly, you can manage access to S3 within AWS and don’t need to distribute any access keys to Flink.
如果正确设置此选项,则可以在AWS中管理对S3的访问,并且无需向Flink分发任何access keys。
##### Access Keys with `S3AFileSystem` (Discouraged)
##### 通过Access Keys 访问 `S3AFileSystem` (不推荐)
Access to S3 can be granted via your **access and secret key pair**. Please note that this is discouraged since the [introduction of IAM roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2).
For `S3AFileSystem` you need to configure both `fs.s3a.access.key` and `fs.s3a.secret.key` in Hadoop’s `core-site.xml`:
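A sketch with placeholder values:

```xml
<property>
  <name>fs.s3a.access.key</name>
  <value>YOUR_ACCESS_KEY</value>
</property>

<property>
  <name>fs.s3a.secret.key</name>
  <value>YOUR_SECRET_KEY</value>
</property>
```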
##### Access Keys with `NativeS3FileSystem` (Discouraged)
##### 通过Access Keys访问`NativeS3FileSystem` (不推荐)
Access to S3 can be granted via your **access and secret key pair**. But this is discouraged and you should use `S3AFileSystem` [with the required IAM roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2).
For `NativeS3FileSystem` you need to configure both `fs.s3.awsAccessKeyId` and `fs.s3.awsSecretAccessKey` in Hadoop’s `core-site.xml`:
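A sketch with placeholder values:

```xml
<property>
  <name>fs.s3.awsAccessKeyId</name>
  <value>YOUR_ACCESS_KEY</value>
</property>

<property>
  <name>fs.s3.awsSecretAccessKey</name>
  <value>YOUR_SECRET_KEY</value>
</property>
```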
#### Provide S3 FileSystem Dependency
#### 提供 S3 FileSystem 依赖
**Note:** You don’t have to configure this manually if you are running [Flink on EMR](#emr-elastic-mapreduce).
**Note:** 在EMR上运行Flink则无需配置,请参阅 [Flink on EMR](#emr-elastic-mapreduce)。
Hadoop’s S3 FileSystem clients are packaged in the `hadoop-aws` artifact (Hadoop version 2.6 and later). This JAR and all its dependencies need to be added to Flink’s classpath, i.e. the class path of both Job and TaskManagers. Depending on which FileSystem implementation and which Flink and Hadoop version you use, you need to provide different dependencies (see below).
There are multiple ways of adding JARs to Flink’s class path, the easiest being simply to drop the JARs in Flink’s `lib` folder. You need to copy the `hadoop-aws` JAR with all its dependencies. You can also export the directory containing these JARs as part of the `HADOOP_CLASSPATH` environment variable on all machines.
有多种方法可以将JAR添加到Flink的类路径中,最简单的方法就是将JAR放在Flink的 `lib` 目录下。需要复制 `hadoop-aws` JAR 及其所有依赖;也可以在所有机器上把包含这些 JAR 的目录加入 `HADOOP_CLASSPATH` 环境变量。
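A sketch of the environment-variable approach (the Hadoop path is hypothetical; point it at the directory that actually holds `hadoop-aws` and its dependencies):

```shell
# Hypothetical Hadoop location; adjust to your environment.
export HADOOP_CLASSPATH="$HADOOP_CLASSPATH:/opt/hadoop-2.7/share/hadoop/tools/lib/*"
```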
##### Flink for Hadoop 2.7
Depending on which file system you use, please add the following dependencies. You can find these as part of the Hadoop binaries in `hadoop-2.7/share/hadoop/tools/lib`:
* `aws-java-sdk-s3-1.11.183.jar` and its dependencies:
* `aws-java-sdk-s3-1.11.183.jar` 及其依赖:
* `aws-java-sdk-core-1.11.183.jar`
* `aws-java-sdk-kms-1.11.183.jar`
* `jackson-annotations-2.6.7.jar`
...
...
* `hadoop-aws-2.7.3.jar`
* `guava-11.0.2.jar`
Note that `hadoop-common` is available as part of Flink, but Guava is shaded by Flink.
注意 `hadoop-common` 是 Flink 的一部分,而 Guava 已被 Flink shade 处理。
##### Flink for Hadoop 2.6
Depending on which file system you use, please add the following dependencies. You can find these as part of the Hadoop binaries in `hadoop-2.6/share/hadoop/tools/lib`:
* `hadoop-aws-2.6.4.jar`
* `guava-11.0.2.jar`
Note that `hadoop-common` is available as part of Flink, but Guava is shaded by Flink.
注意 `hadoop-common` 是 Flink 的一部分,而 Guava 已被 Flink shade 处理。
##### Flink for Hadoop 2.4 and earlier
##### Flink for Hadoop 2.4 及更早版本
These Hadoop versions only have support for `NativeS3FileSystem`. This comes pre-packaged with Flink for Hadoop 2 as part of `hadoop-common`. You don’t need to add anything to the classpath.
The following sections list common issues when working with Flink on AWS.
下面列出了在AWS上使用Flink时的部分常见问题。
### Missing S3 FileSystem Configuration
If your job submission fails with an Exception message noting that `No file system found with scheme s3`, this means that no FileSystem has been configured for S3. Please check out the configuration sections for our [shaded Hadoop/Presto](#shaded-hadooppresto-s3-file-systems-recommended) or [generic Hadoop](#set-s3-filesystem) file systems for details on how to configure this properly.
如果作业提交失败并显示 `No file system found with scheme s3`,说明尚未为 S3 配置文件系统。请参照 [Shaded Hadoop/Presto S3 文件系统](#shaded-hadooppresto-s3-file-systems-recommended) 或者 [Hadoop基本配置](#set-s3-filesystem) 检查并确保配置正确。
...
...
### AWS Access Key ID and Secret Access Key Not Specified
If you see your job failing with an Exception noting that the `AWS Access Key ID and Secret Access Key must be specified as the username or password`, your access credentials have not been set up properly. Please refer to the access credential section for our [shaded Hadoop/Presto](#configure-access-credentials) or [generic Hadoop](#configure-access-credentials-1) file systems for details on how to configure this.
如果作业失败并显示异常 `AWS Access Key ID and Secret Access Key must be specified as the username or password`,说明未正确设置访问凭据。有关如何配置,请参阅 [shaded Hadoop/Presto](#configure-access-credentials) 或 [Hadoop基本配置](#configure-access-credentials-1)。
...
...
### ClassNotFoundException: NativeS3FileSystem/S3AFileSystem Not Found
If you see this Exception, the S3 FileSystem is not part of the class path of Flink. Please refer to [S3 FileSystem dependency section](#provide-s3-filesystem-dependency) for details on how to configure this properly.
### IOException: `400: Bad Request`
If you have configured everything properly, but get a `Bad Request` Exception **and** your S3 bucket is located in region `eu-central-1`, you might be running an S3 client, which does not support [Amazon’s signature version 4](http://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-authenticating-requests.html).
This should not apply to our shaded Hadoop/Presto S3 file systems but can occur for Hadoop-provided S3 file systems. In particular, all Hadoop versions up to 2.7.2 running `NativeS3FileSystem` (which depend on `JetS3t 0.9.0` instead of a version [>= 0.9.4](http://www.jets3t.org/RELEASE_NOTES.html)) are affected but users also reported this happening with the `S3AFileSystem`.
Except for changing the bucket region, you may also be able to solve this by [requesting signature version 4 for request authentication](https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingAWSSDK.html#specify-signature-version), e.g. by adding this to Flink’s JVM options in `flink-conf.yaml` (see [configuration](../config.html#common-options)):
除了更改 bucket 所在区域之外,也可以按照亚马逊的 [requesting signature version 4 for request authentication](https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingAWSSDK.html#specify-signature-version) 解决,例如在 `flink-conf.yaml` 中添加 JVM 参数(参阅 [配置](../config.html#common-options)):
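The JVM option in question is the AWS SDK's Signature Version 4 switch; a sketch of the `flink-conf.yaml` entry:

```yaml
env.java.opts: -Dcom.amazonaws.services.s3.enableV4
```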
### NullPointerException at org.apache.hadoop.fs.LocalDirAllocator
This Exception is usually caused by skipping the local buffer directory configuration `fs.s3a.buffer.dir` for the `S3AFileSystem`. Please refer to the [S3AFileSystem configuration](#s3afilesystem-recommended) section to see how to configure the `S3AFileSystem` properly.
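A sketch of the missing property in `core-site.xml` (the directory value is an example):

```xml
<!-- Local directories used to buffer large results prior to upload to S3 -->
<property>
  <name>fs.s3a.buffer.dir</name>
  <value>/tmp</value>
</property>
```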