Amazon 服务 (AWS) Amazon Web Services 提供可以运行Flink的云计算服务。 ## EMR: Elastic MapReduce [Amazon Elastic MapReduce](https://aws.amazon.com/elasticmapreduce/) (Amazon EMR) 是一种Web服务,可以轻松快速地设置Hadoop集群。因为它负责设置所有内容,所以在AWS上运行Flink **非常推荐**,使用这种方式 。 ### 标准 EMR 安装 Flink是Amazon EMR上受支持的应用程序。 [亚马逊的文档](http://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html) 描述了如何配置Flink、创建和监控集群以及处理作业。 ### 自定义 EMR 安装 Amazon EMR 会定期更新到最新版本, 也可以在EMR集群中安装不同版本的Flink **创建 EMR 集群** EMR文档包含了 [创建 EMR集群](http://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-gs-launch-sample-cluster.html)。通过该文档可以安装任何版本的EMR,不需要安装EMR中的_All Applications_部分,但是_Core Hadoop_是必须的。 注意 访问S3存储需要配置IAM角色,文档请参阅 [配置 IAM角色](http://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-iam-roles.html) 。 **在EMR集群中安装Flink** 在创建集群之后,连接到主节点并安装Flink,文档请参阅 [连接到主节点](http://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-connect-master-node.html) 。 1. 转到 [下载页](http://flink.apache.org/downloads.html) 并 **下载与EMR集群中Hadoop版本匹配的二进制版本** , 例如, Hadoop 2.7 for EMR releases 4.3.0, 4.4.0, or 4.5.0. 2. 解压Flink,**设置Hadoop配置目录**后,可以运行Flink作业,文档请参阅 [Flink jobs via YARN](yarn_setup.html) : ``` HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -m yarn-cluster -yn 1 examples/streaming/WordCount.jar ``` ## S3: 简单存储 [Amazon Simple Storage Service](http://aws.amazon.com/s3/) (Amazon S3)为各种实例提供云对象存储,S3可以作为Flink的数据**输入** 或 **输出** ,或者是为 [streaming **state backends**](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/state/state_backends.html) 、以及YARN提供对象存储。 通过以下格式指定路径来使用常规文件等S3对象: ``` s3:/// ``` endpoint 是单个文件或目录,例如: ``` // 从S3中读取 env.readTextFile("s3:///"); // 写入到S3 stream.writeAsText("s3:///"); // 使用S3作为 FsStatebackend env.setStateBackend(new FsStateBackend("s3:///")); ``` 上文的示例并没有列出全部情况,也可以在其他地方使用S3存储,包括 [高可用安装](../jobmanager_high_availability.html) 或 [RocksDBStateBackend](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/state/state_backends.html#the-rocksdbstatebackend)等 任何Flink期望的文件地址。 多数情况下我们使用 `flink-s3-fs-hadoop` 和 `flink-s3-fs-presto` 作为S3在Hadoop 上的实现 ,例如,使用S3作为YARN的资源存储目录,可能需要设置特定的Hadoop S3 FileSystem实现。两种方式如下所述。 ### Shaded Hadoop/Presto S3 文件系统 (推荐) **Note:** 如果在EMR上运行Flink,则无需手动配置,文档可以参阅 [Flink on EMR](#emr-elastic-mapreduce). 要使用 `flink-s3-fs-hadoop` 或 `flink-s3-fs-presto`,co在启动Flink之前将相应的JAR文件从 `opt` 目录拷贝到Flink的 `lib` 目录下,例如 ``` cp ./opt/flink-s3-fs-presto-1.7.1.jar ./lib/ ``` `flink-s3-fs-hadoop` 和 `flink-s3-fs-presto` 注册了不同的文件URI `s3://` 实现, `flink-s3-fs-hadoop` 还注册了 `s3a://`, `flink-s3-fs-presto` 注册了 `s3p://`,可以同时使用这些不同的URI。 #### 配置访问凭据 设置S3 文件系统实现之后,需要保证Flink程序可以访问S3存储。 ##### 身份和访问管理 (IAM) (推荐) 比较推荐使用身份和访问管理 (IAM)来设置Access key,可以通过IAM提供的功能来让Flink程序安全的访问S3存储。 详细可以参阅[文档身份和访问管理 (IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html)。具体用户角色权限控制可以参考 [IAM Roles](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html). 如果正确设置此选项,则可以在AWS中管理对S3的访问,并且Flink访问S3不需要任何keys ##### Access Keys (不推荐) 还可以通过 **access key**来对S3进行访问。相关内容可以参阅 [ IAM的角色](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2). 这种情况下,需要在Flink的 `flink-conf.yaml`文件中同时配置`s3.access-key` 和 `s3.secret-key` ``` s3.access-key: your-access-key s3.secret-key: your-secret-key ``` ### Hadoop提供的S3文件系统-手动设置 **Note:** 在EMR上运行Flink不需要配置,请参阅 [Flink on EMR](#emr-elastic-mapreduce). 因为这个设置比较复杂,所以除非是有其他需求,否则,还是建议才用上面的方式来实现。例如,修改Hadoop的 `core-site.xml`文件中的 `fs.defaultFS`配置来把S3作为存储目录. #### 设置 S3 文件系统 S3相关内容可以参阅 [Hadoop’s S3 FileSystem clients](https://wiki.apache.org/hadoop/AmazonS3): 1. `S3AFileSystem` (**推荐** Hadoop 2.7 及其以上版本使用): 用于在内部使用Amazon SDK读取和写入常规文件的文件系统。没有最大文件大小并且与IAM角色一起使用。 2. `NativeS3FileSystem` (Hadoop 2.6 及一起版本): 用于读写常规文件的文件系统。最大对象大小为5GB,不适用于IAM角色。 ##### `S3AFileSystem` (推荐) 推荐使用的S3 FileSystem实现。它在内部使用Amazon的SDK并与IAM角色一起使用 (参阅 [配置访问凭据](#configure-access-credentials-1)). 需要修改Flink中Hadoop的配置,配置文件 `core-site.xml`: ``` fs.s3.impl org.apache.hadoop.fs.s3a.S3AFileSystem fs.s3a.buffer.dir /tmp ``` 这里注册 `S3AFileSystem` 作为 `s3a://` URI开头的文件实现. ##### `NativeS3FileSystem` 此文件系统仅限于最大5GB的文件,并且不适用于IAM角色(参阅 [配置访问凭据](#configure-access-credentials-1)),所以需要在配置文件中配置AWS的access key。 需要修改Flink中Hadoop的配置,配置文件`core-site.xml`: ``` fs.s3.impl org.apache.hadoop.fs.s3native.NativeS3FileSystem ``` 这里注册 `NativeS3FileSystem` 作为 `s3://` URI开头文件的实现. #### Hadoop 配置 可以采用如下两种方式指定 [Hadoop 配置](../config.html#hdfs) 把Flink指向Hadoop的配置目录 * 设置环境变量`HADOOP_CONF_DIR`, * 设置 `flink-conf.yaml`文件中的 `fs.hdfs.hadoopconf`: ``` fs.hdfs.hadoopconf: /path/to/etc/hadoop ``` `/path/to/etc/hadoop` 被注册为Hadoop的配置目录. Flink 会在目录下查找 `core-site.xml` 和 `hdfs-site.xml` 文件。 #### 配置访问凭据 **Note:** 如果在EMR上直接运行Flink,则无需额外配置,请参阅 [Flink on EMR](#emr-elastic-mapreduce). Flink中使用 S3 存储时,需要保证Flink可以访问到S3存储。 ##### 身份和访问管理 (IAM) (推荐) 比较推荐使用身份和访问管理 (IAM)来设置Access key,可以通过IAM提供的功能来让Flink程序安全的访问S3存储。 详细可以参阅[文档身份和访问管理 (IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html)。具体用户角色权限控制可以参考 [IAM 角色介绍](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html). 如果正确设置此选项,则可以在AWS中管理对S3的访问,并且Flink访问S3不需要任何keys ##### Access Keys (不推荐) 还可以通过 **access key**来对S3进行访问。相关内容可以参阅 [ IAM的角色](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2). 请注意,这只适用于 `S3AFileSystem` 而不是 `NativeS3FileSystem`. ##### 通过Access Keys 访问 `S3AFileSystem` (不推荐) 可以用过 **Access Keys**来授权对S3存储的访问。 但是这种操作并不推荐,请参阅 [IAM 角色介绍](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2). 对于 `S3AFileSystem` 需要在Hadoop的`core-site.xml`文件中同时配置 `fs.s3a.access.key` 和 `fs.s3a.secret.key` : ``` fs.s3a.access.key fs.s3a.secret.key ``` ##### 通过Access Keys访问`NativeS3FileSystem` (不推荐) 可以用过 **Access Keys**来授权对S3存储的访问。 此文件系统已经过时,最好使用 `S3AFileSystem`来代替,详情参阅 [IAM 角色要求](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2). 对于`NativeS3FileSystem` 需要在Hadoop的 `core-site.xml`文件中同事配置 `fs.s3.awsAccessKeyId` 和 `fs.s3.awsSecretAccessKey` : ``` fs.s3.awsAccessKeyId fs.s3.awsSecretAccessKey ``` #### 提供 S3 FileSystem 依赖 **Note:** 在EMR上运行Flink则无需配置,请参阅 [Flink on EMR](#emr-elastic-mapreduce). Hadoop的S3 FileSystem客户端打包在 `hadoop-aws` jar中(Hadoop 2.6及其以后)。需要将JAR及其所有依赖项添加到Flink的类路径中,即Job和TaskManagers的类路径。根据使用的FileSystem实现以及使用的Flink和Hadoop版本,需要添加不同的依赖项(请参阅下文)。 有多种方法可以将JAR添加到Flink的类路径中,最简单的方法就是将JAR放在Flink的 `lib` 目录下。 拷贝 `hadoop-aws` JAR 和他的所有依赖复制到lib下,还可以把一个目录指定为 `HADOOP_CLASSPATH` 环境变量。 ##### Flink for Hadoop 2.7 根据您使用的操作系统,请添加以下依赖项。可以在`hadoop-2.7/share/hadoop/tools/lib`找到一部分: * `S3AFileSystem`: * `hadoop-aws-2.7.3.jar` * `aws-java-sdk-s3-1.11.183.jar` 及其依赖: * `aws-java-sdk-core-1.11.183.jar` * `aws-java-sdk-kms-1.11.183.jar` * `jackson-annotations-2.6.7.jar` * `jackson-core-2.6.7.jar` * `jackson-databind-2.6.7.jar` * `joda-time-2.8.1.jar` * `httpcore-4.4.4.jar` * `httpclient-4.5.3.jar` * `NativeS3FileSystem`: * `hadoop-aws-2.7.3.jar` * `guava-11.0.2.jar` 注意 `hadoop-common` 是Flink的一部分, 但 Guava不是。 ##### Flink for Hadoop 2.6 根据您使用的操作系统,请添加以下依赖项。可以在`hadoop-2.6/share/hadoop/tools/lib`找到一部分: * `S3AFileSystem`: * `hadoop-aws-2.6.4.jar` * `aws-java-sdk-1.7.4.jar` and its dependencies: * `jackson-annotations-2.1.1.jar` * `jackson-core-2.1.1.jar` * `jackson-databind-2.1.1.jar` * `joda-time-2.2.jar` * `httpcore-4.2.5.jar` * `httpclient-4.2.5.jar` * `NativeS3FileSystem`: * `hadoop-aws-2.6.4.jar` * `guava-11.0.2.jar` 注意 `hadoop-common` 是Flink的一部分, 但 Guava不是。 ##### Flink for Hadoop 2.4及其以下版本 2.4及其以下版本只支持 `NativeS3FileSystem`相应依赖已经包含在`hadoop-common`中了,所以不需要额外添加依赖。 ## 常见问题 下面列出了在AWS上使用Flink时的部分常见问题。 ### Missing S3 FileSystem Configuration 如果作业提交失败,并显示 `No file system found with scheme s3` 这说明S3文件系统的配置不正确,需要检查文件配置,参照 [Shaded Hadoop/Presto S3 文件系统 ](#shaded-hadooppresto-s3-file-systems-recommended) 或者 [Hadoop基本配置](#set-s3-filesystem) 保证配置正确性 ``` org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...] Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: No file system found with scheme s3, referenced in file URI 's3:///'. [...] Caused by: java.io.IOException: No file system found with scheme s3, referenced in file URI 's3:///'. at o.a.f.core.fs.FileSystem.get(FileSystem.java:296) at o.a.f.core.fs.Path.getFileSystem(Path.java:311) at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450) at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57) at o.a.f.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:156) ``` ### AWS Access Key ID and Secret Access Key Not Specified 如果作业失败并显示异常 `AWS Access Key ID and Secret Access Key must be specified as the username or password`, 未正确设置您的访问凭据。有关如何配置请参阅 [shaded Hadoop/Presto](#configure-access-credentials) or [Hadoop基本配置](#configure-access-credentials-1) 。 ``` org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...] Caused by: java.io.IOException: The given file URI (s3:///) points to the HDFS NameNode at , but the File System could not be initialized with that address: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively) [...] Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively) [...] at o.a.h.fs.s3.S3Credentials.initialize(S3Credentials.java:70) at o.a.h.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:80) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at o.a.h.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) at o.a.h.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at o.a.h.fs.s3native.$Proxy6.initialize(Unknown Source) at o.a.h.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:330) at o.a.f.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:321) ``` ### ClassNotFoundException: NativeS3FileSystem/S3AFileSystem Not Found 看到此异常,表示S3 FileSystem不是Flink的类路径的一部分请参阅 [S3 文件存储依赖](#provide-s3-filesystem-dependency) 。 ``` Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2186) at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460) at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352) at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280) at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311) at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450) at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57) at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:156) ... 25 more Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2154) at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2178) ... 32 more Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2060) at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2152) ... 33 more ``` ### IOException: `400: Bad Request` 如果您已正确配置所有内容,但是请求却显示 `Bad Request` 异常 **and** S3 bucket在 `eu-central-1`可用区, 可能是因为S3客户端不支持,请参阅[Amazon’s signature version 4](http://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-authenticating-requests.html). ``` [...] Caused by: java.io.IOException: s3:/// : 400 : Bad Request [...] Caused by: org.jets3t.service.impl.rest.HttpException [...] ``` 或 ``` com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: [...], AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: [...] ``` Hadoop/Presto S3 存储不会有问题,但是 Hadoop-provided S3 file systems会有。Hadoop versions 大于 2.7.2 运行于 `NativeS3FileSystem` (依赖 `JetS3t 0.9.0` 版本 [>= 0.9.4](http://www.jets3t.org/RELEASE_NOTES.html)) 都会受影响,部分 `S3AFileSystem`也可能出现该异常。 除了更改可用区之外,可以参阅亚马逊的 [requesting signature version 4 for request authentication](https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingAWSSDK.html#specify-signature-version)进行修改, 例如. 在 `flink-conf.yaml` 中添加JVM参数(参阅 [配置](../config.html#common-options)): ``` env.java.opts: -Dcom.amazonaws.services.s3.enableV4 ``` ### NullPointerException at org.apache.hadoop.fs.LocalDirAllocator 此异常通常是由跳过本地缓冲区目录配置引起 `S3AFileSystem`的`fs.s3a.buffer.dir`配置。参阅 [S3A文件系统配置](#s3afilesystem-recommended) 来正确配置. ``` [...] Caused by: java.lang.NullPointerException at o.a.h.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268) at o.a.h.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344) at o.a.h.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416) at o.a.h.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198) at o.a.h.fs.s3a.S3AOutputStream.(S3AOutputStream.java:87) at o.a.h.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410) at o.a.h.fs.FileSystem.create(FileSystem.java:907) at o.a.h.fs.FileSystem.create(FileSystem.java:888) at o.a.h.fs.FileSystem.create(FileSystem.java:785) at o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404) at o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48) at ... 25 more ```