# YARN 配置 ## 快速开始 ### 启动YARN 启动一个有 4 Task Managers (每个4 GB堆内存)的YARN session命令: ``` # get the hadoop2 package from the Flink download page at # http://flink.apache.org/downloads.html curl -O tar xvzf flink-1.7.1-bin-hadoop2.tgz cd flink-1.7.1/ ./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m ``` `-s`参数可以指定每个TaskManager的处理槽数。建议将插槽数设置为每台计算机的处理器数。 YARN启动后,可以通过 `./bin/flink` 工具来提交Flink作业. ### 在YARN上运行Flink作业 ``` # get the hadoop2 package from the Flink download page at # http://flink.apache.org/downloads.html curl -O tar xvzf flink-1.7.1-bin-hadoop2.tgz cd flink-1.7.1/ ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024m -ytm 4096m ./examples/batch/WordCount.jar ``` ## Flink YARN Session Apache [Hadoop YARN](http://hadoop.apache.org/) 是一个集群资源管理框架。它允许在群集上运行各种分布式应用程序。Flink在YARN上运行。如果已经有YARN,用户不必安装其他任何东西。 **系统配置需求** * 版本不低于Apache Hadoop 2.2 * HDFS(Hadoop分布式文件系统)(或Hadoop支持的其他分布式文件系统) 如果在使用Flink YARN时遇到问题,请查阅 [FAQ section](http://flink.apache.org/faq.html#yarn-deployment). ### 启动 Flink 会话 以下说明了解如何在YARN群集中启动Flink会话。 会话将启动Flink程序运行所必须的服务 (JobManager 和 TaskManagers) 这样就可以把程序提交到集群中.需要注意的是,可以在会话中运行多个程序 #### Download Flink 从 [下载页](http://flink.apache.org/downloads.html)下载Hadoop版本>2的安装包. 它包含所需的文件。 通过下面命令解压: ``` tar xvzf flink-1.7.1-bin-hadoop2.tgz cd flink-1.7.1/ ``` #### 启动一个会话 通过以下命令启动一个session ``` ./bin/yarn-session.sh ``` 命令将显示以下概述: ``` Usage: Required -n,--container Number of YARN container to allocate (=Number of Task Managers) Optional -D Dynamic properties -d,--detached Start detached -jm,--jobManagerMemory Memory for JobManager Container with optional unit (default: MB) -nm,--name Set a custom name for the application on YARN -q,--query Display available YARN resources (memory, cores) -qu,--queue Specify YARN queue. -s,--slots Number of slots per TaskManager -tm,--taskManagerMemory Memory per TaskManager Container with optional unit (default: MB) -z,--zookeeperNamespace Namespace to create the Zookeeper sub-paths for HA mode ``` 需要注意的是,客户端需要 `YARN_CONF_DIR` 或 `HADOOP_CONF_DIR` 环境变量来保证可以读取 YARN 和 HDFS 配置. **举例:** 以下命令可以分配10个TaskManager,每个TaskManager有8 GB内存和32个处理插槽 ``` ./bin/yarn-session.sh -n 10 -tm 8192 -s 32 ``` 系统会使用 `conf/flink-conf.yaml`文件中的配置. 需要更改配置可以参考 [configuration guide](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html) 。 YARN上的Flink将覆盖以下配置参数 `jobmanager.rpc.address` (因为JobManager可能会分配到不同的机器上运行), `taskmanager.tmp.dirs` (我们使用YARN给出的tmp目录) 以及 `parallelism.default` 如果已经指定插槽数 I如果不希望通过更改配置文件来设置配置参数,则可以选择通过`-D`标志传递动态属性。例如通过如下方式传递参数: `-Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624`. 例子中启动了11个容器(即使只请求了10个容器)因为ApplicationMaster和Job Manager还有一个额外的容器。 在YARN群集中部署Flink后,YARN会显示JobManager的详细信息。 按下CTRL + C 或者输入 ‘stop’可以停止YARN会话。 如果群集上资源从租,Flink会按照请求来启动容器。大多数YARN调度账户考虑所请求容器的内存量,还有些账户会思考vcores的数量。 默认情况下,vcores的数量等于处理slots (`-s`) 参数。 调整 [`yarn.containers.vcores`](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#yarn-containers-vcores) 可以自定义vcores的数值。 为了使此参数起作用,需要在群集中启用CPU调度。 #### 分离YARN 会话 如果不想让Flink程序一直运行,可以启动一个 _分离_ YARN 会话。参数称为 `-d` or `--detached`。 在这种情况下,Flink YARN客户端将仅向群集提交Flink,然后自行关闭。需要注意的是,在这种情况下,无法使用Flink停止YARN会话,需要使用 YARN 工具 (`yarn application -kill <appId>`) 来停止YARN会话。 #### 附加到已有Session 使用以下命令启动会话 ``` ./bin/yarn-session.sh ``` 命令将会显示一下内容 ``` Usage: Required -id,--applicationId YARN application Id ``` 正如前面所述, 必须设置`YARN_CONF_DIR` 和 `HADOOP_CONF_DIR` 环境变量来确保可以读取YARN和HDFS配置。 **举例:** 输入以下命令以附加到正在运行的Flink YARN会话 `application_1463870264508_0029`: ``` ./bin/yarn-session.sh -id application_1463870264508_0029 ``` 附加到正在运行的会话使用YARN ResourceManager来指定JobManagerRPC端口。 按下CTRL + C 或者输入 ‘stop’可以停止YARN会话。 ### 向Flink提交作业 使用以下命令将Flink程序提交到YARN群集: ``` ./bin/flink ``` 参数可以参阅 [command-line client](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/cli.html)文档。 该命令将会按照如下显示: ``` [...] Action "run" compiles and runs a program. Syntax: run [OPTIONS] "run" action arguments: -c,--class Class with the program entry point ("main" method or "getPlan()" method. Only needed if the JAR file does not specify the class in its manifest. -m,--jobmanager Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. -p,--parallelism The parallelism with which to run the program. Optional flag to override the default value specified in the configuration ``` 使用 _run_ 命令把作业提交给YARN。客户端能够确定JobManager的地址。 少数情况下可以通过 `-m` 参数来指定JobManager的地址。JobManager可以在YARN的控制台上可见 **示例** ``` wget -O LICENSE-2.0.txt http://www.apache.org/licenses/LICENSE-2.0.txt hadoop fs -copyFromLocal LICENSE-2.0.txt hdfs:/// ... ./bin/flink run ./examples/batch/WordCount.jar \ hdfs:///..../LICENSE-2.0.txt hdfs:///.../wordcount-result.txt ``` 如果出现以下错误,请检查所有TaskManagers都已启动: ``` Exception in thread "main" org.apache.flink.compiler.CompilerException: Available instances could not be determined from job manager: Connection timed out. ``` 可以在JobManager Web界面中检查TaskManagers的数量。接口的地址打印在YARN会话控制台中。 如果一分钟后TaskManagers没有出现,则需要使用日志文件确认问题。 ## 在 YARN上运行单作业 上面的文档描述了如何在YARN环境中启动Flink集群。也可以通过YARN中启动Flink来执行单个作业。 需要注意客户端可以通过 `-yn` 参数来指定 TaskManagers的数量 **_示例:_** ``` ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar ``` YARN会话的命令行选项也可用于 `./bin/flink`.以 `y` 或者 `yarn` 为前缀 注意: 可以为每个作业使用不同的配置目录`FLINK_CONF_DIR`,需要这样配置时,请拷贝 `conf`目录,然后分发到不同节点上,并修改对应的配置 注意:当对一个分离的YARN会话(`-yd`)使用`-m yarn-cluster`参数时,应用程序将不会从ExecutionEnvironment.execute()调用获得任何累加器结果或异常! ### jars 和 Classpath 默认情况下,Flink将在运行单个作业时将用户jar包含到系统类路径中。可以使用 `yarn.per-job-cluster.include-user-jar` 参数控制. 当设置为 `DISABLED` 时,Flink会使用用户类路径中的jar包 可以通过以下之一来控制类路径中的user-jar位置: * `ORDER`: (默认) 根据字典顺序将jar添加到系统类路径。 * `FIRST`: 将jar添加到系统类路径的开头。 * `LAST`: 将jar添加到系统类路径的末尾。 ## Flink 在 YARN上的故障恢复 运行在YARN上的Flink可以通过配置参数来控制发生故障时的行为,相关配置可以在 `conf/flink-conf.yaml` 文件配置或使用 `-D` 参数指定。 * `yarn.reallocate-failed`: Flink是否应重新分配失败的TaskManager容器。默认值:true。 * `yarn.maximum-failed-containers`: ApplicationMaster在YARN会话失败之前接受的最大失败容器数,默认值:最初请求的TaskManagers (`-n`)的数量。 * `yarn.application-attempts`: ApplicationMaster (+ TaskManager ) 尝试次数,如果此值设置为1(默认值),则当Application master失败时,整个YARN会话将失败。较高的值指定YARN重新启动ApplicationMaster的次数。 ## 调试失败的YARN会话 Flink YARN会话部署失败的原因有很多。Hadoop配置错误(HDFS权限,YARN配置),版本不兼容(在Cloudera Hadoop上运行Flink与vanilla Hadoop依赖关系)或其他错误。 ### 日志文件 在部署期间失败,调试时比较有用的一个手段就是YARN日志聚合,参阅 [YARN 日志聚合](http://hortonworks.com/blog/simplifying-user-logs-management-and-access-in-yarn/). 在` yarn-site.xml` 中配置 `yarn.log-aggregation-enable` 为true可以启用配置。 启用后,用户可以使用以下命令检索(失败的)YARN会话的所有日志文件。 ``` yarn logs -applicationId ``` 需要注意的是,会话结束后需要几秒钟才会显示日志。 ### YARN Client 终端 和 Web 界面 如果在运行期间发生错误,Flink YARN客户端还会在终端中打印错误消息(例如,如果TaskManager在一段时间后停止工作)。 除此之外,还有YARN Resource Manager Web界面(默认情况下在端口8088上)可以看到详情。端口由`yarn.resourcemanager.webapp.address`配置指定。 界面可以看到日志文件以运行YARN应用程序,显示失败应用程序和一些错误信息。 ## 构建特定版本的YARN客户端 使用Hortonworks,Cloudera或MapR等公司的Hadoop发行版的用户可能必须针对其特定版本的Hadoop(HDFS)和YARN构建Flink。相关内容可以查阅 [构建指南](//ci.apache.org/projects/flink/flink-docs-release-1.7/flinkDev/building.html) 。 ## 在防火墙后面的YARN上运行Flink 某些YARN群集使用防火墙来控制群集与网络其余部分之间的网络流量。在这些设置中,Flink作业只能从群集网络内(防火墙后面)提交到YARN会话。这种情况下需要放开Flink应用的相关端口。 目前两个与提交作业相关的服务: * JobManager (ApplicationMaster in YARN) * 在JobManager中运行的BlobServer。 向Flink提交作业时,BlobServer会将带有用户代码的jar分发给所有工作节点(TaskManagers)。JobManager自己接收作业并触发执行。 用于指定端口的两个配置参数如下: * `yarn.application-master.port` * `blob.server.port` 这两个配置选项可以配置,单个端口(例如:“50010”),范围(“50000-50025”)或两者的组合(“50010,50011,50020-50025,50050-50075”)。 (Hadoop使用类似的机制,比如配置`yarn.app.mapreduce.am.job.client.port-range`.) ## Flink和YARN的内部交互机制 本节简要介绍Flink和YARN如何交互。 ![](img/FlinkOnYarn.svg) ARN客户端需要访问Hadoop配置用来连接到YARN资源管理器和HDFS。使用以下策略确定Hadoop配置: * 按顺序测试 `YARN_CONF_DIR`, `HADOOP_CONF_DIR` 或者 `HADOOP_CONF_PATH` 是否存在,如果其中一个变量存在,则使用配置中的路径来读取配置。 * 如果上面的都不存在 (YARN设置正确不应该这样),则客户端正在使用 `HADOOP_HOME` 环境变量。 如果设置了则尝试访问 `$HADOOP_HOME/etc/hadoop` (Hadoop 2) 和 `$HADOOP_HOME/conf` (Hadoop 1)路径来读取配置。 启动新的Flink YARN会话时,客户端首先检查所请求的资源(容器和内存)是否可用。之后,将包含Flink和配置的jar上传到HDFS(步骤1)。 客户端的下一步是请求(步骤2)YARN容器以启动 _ApplicationMaster_ (步骤 3)。由于客户端将配置和jar文件注册为容器的资源,因此在该特定机器上运行的YARN的NodeManager将负责准备容器(例如,下载文件)。完成后,将启动 _ApplicationMaster_ (AM) 。 _JobManager_ 和 AM 在同一容器内,运行一旦它们成功启动,AM就知道JobManager(它自己的主机)的地址。它正在为TaskManagers生成一个新的Flink配置文件(以便它们可以连接到JobManager)。该文件也自动上传到HDFS中, _AM_ 还提供了Flink的WEB界面。YARN代码分配的所有端口都是 _临时端口_,这样用户可以并行执行多个Flink YARN会话。 之后,AM开始为Flink的TaskManagers分配容器,这将从HDFS下载jar文件和修改后的配置。完成这些步骤后,即可建立Flink并准备接受作业。