# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# dotenv
.env
# virtualenv
.venv
venv/
ENV/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.DS_Store
# gitbook
_book
# node.js
node_modules
# windows
Thumbs.db
# word
~$*.docx
~$*.doc
---
permalink: /404.html
---
<script>window.location.href = '/';</script>
flink.apachecn.org
# Contributing Guide
> Please be bold in translating and improving the translation. Although we strive for excellence, we do not require perfection, so please don't worry about making mistakes in translation: in most cases our server keeps a record of all translations, so you need not worry that a slip of yours will cause irreparable damage. (Adapted from Wikipedia)
Maintainer:
## Chapter List
+ [Apache Flink Documentation](docs/1.md)
+ [Dataflow Programming Model](docs/2.md)
+ [Distributed Runtime Environment](docs/3.md)
+ [DataStream API Tutorial](docs/4.md)
+ [Local Setup Tutorial](docs/5.md)
+ [Running Flink on Windows](docs/6.md)
+ [Examples](docs/7.md)
+ [Batch Examples](docs/8.md)
+ [Project Template for Java](docs/9.md)
+ [Project Template for Scala](docs/10.md)
+ [Configuring Dependencies, Connectors, Libraries](docs/11.md)
+ [Basic API Concepts](docs/12.md)
+ [Scala API Extensions](docs/13.md)
+ [Java Lambda Expressions](docs/14.md)
+ [Flink DataStream API Programming Guide](docs/15.md)
+ [Event Time](docs/16.md)
+ [Generating Timestamps / Watermarks](docs/17.md)
+ [Pre-defined Timestamp Extractors / Watermark Emitters](docs/18.md)
+ [State & Fault Tolerance](docs/19.md)
+ [Working with State](docs/20.md)
+ [The Broadcast State Pattern](docs/21.md)
+ [Checkpointing](docs/22.md)
+ [Queryable State Beta](docs/23.md)
+ [State Backends](docs/24.md)
+ [State Schema Evolution](docs/25.md)
+ [Custom Serialization for Managed State](docs/26.md)
+ [Operators](docs/27.md)
+ [Windows](docs/28.md)
+ [Joining](docs/29.md)
+ [Process Function (Low-level Operations)](docs/30.md)
+ [Asynchronous I/O for External Data Access](docs/31.md)
+ [Streaming Connectors](docs/32.md)
+ [Fault Tolerance Guarantees of Data Sources and Sinks](docs/33.md)
+ [Apache Kafka Connector](docs/34.md)
+ [Apache Cassandra Connector](docs/35.md)
+ [Amazon AWS Kinesis Streams Connector](docs/36.md)
+ [Elasticsearch Connector](docs/37.md)
+ [HDFS Connector](docs/38.md)
+ [Streaming File Sink](docs/39.md)
+ [RabbitMQ Connector](docs/40.md)
+ [Apache NiFi Connector](docs/41.md)
+ [Twitter Connector](docs/42.md)
+ [Side Outputs](docs/43.md)
+ [Python Programming Guide (Streaming) Beta](docs/44.md)
+ [Testing](docs/45.md)
+ [Experimental Features](docs/46.md)
+ [Flink DataSet API Programming Guide](docs/47.md)
+ [DataSet Transformations](docs/48.md)
+ [Fault Tolerance](docs/49.md)
+ [Iterations](docs/50.md)
+ [Zipping Elements in a DataSet](docs/51.md)
+ [Connectors](docs/52.md)
+ [Python Programming Guide Beta](docs/53.md)
+ [Hadoop Compatibility Beta](docs/54.md)
+ [Local Execution](docs/55.md)
+ [Cluster Execution](docs/56.md)
+ [Table API & SQL](docs/57.md)
+ [Concepts & Common API](docs/58.md)
+ [Streaming Concepts](docs/59.md)
+ [Dynamic Tables](docs/60.md)
+ [Time Attributes](docs/61.md)
+ [Joins in Continuous Queries](docs/62.md)
+ [Temporal Tables](docs/63.md)
+ [Detecting Patterns in Tables Beta](docs/64.md)
+ [Query Configuration](docs/65.md)
+ [Connect to External Systems](docs/66.md)
+ [Table API](docs/67.md)
+ [SQL](docs/68.md)
+ [Built-In Functions](docs/69.md)
+ [User-defined Sources & Sinks](docs/70.md)
+ [User-defined Functions](docs/71.md)
+ [SQL Client Beta](docs/72.md)
+ [Data Types & Serialization](docs/73.md)
+ [Register a custom serializer for your Flink program](docs/74.md)
+ [Execution Configuration](docs/75.md)
+ [Program Packaging and Distributed Execution](docs/76.md)
+ [Parallel Execution](docs/77.md)
+ [Execution Plans](docs/78.md)
+ [Restart Strategies](docs/79.md)
+ [FlinkCEP - Complex event processing for Flink](docs/80.md)
+ [Storm Compatibility Beta](docs/81.md)
+ [Gelly: Flink Graph API](docs/82.md)
+ [Graph API](docs/83.md)
+ [Iterative Graph Processing](docs/84.md)
+ [Library Methods](docs/85.md)
+ [Graph Algorithms](docs/86.md)
+ [Graph Generators](docs/87.md)
+ [Bipartite Graph](docs/88.md)
+ [FlinkML - Machine Learning for Flink](docs/89.md)
+ [Quickstart Guide](docs/90.md)
+ [Alternating Least Squares](docs/91.md)
+ [How to Contribute](docs/92.md)
+ [Cross Validation](docs/93.md)
+ [Distance Metrics](docs/94.md)
+ [k-Nearest Neighbors Join](docs/95.md)
+ [MinMax Scaler](docs/96.md)
+ [Multiple Linear Regression](docs/97.md)
+ [Looking under the hood of pipelines](docs/98.md)
+ [Polynomial Features](docs/99.md)
+ [Stochastic Outlier Selection](docs/100.md)
+ [Standard Scaler](docs/101.md)
+ [SVM using CoCoA](docs/102.md)
+ [Best Practices](docs/103.md)
+ [API Migration Guides](docs/104.md)
+ [Standalone Cluster](docs/105.md)
+ [YARN Setup](docs/106.md)
+ [Mesos Setup](docs/107.md)
+ [Docker Setup](docs/108.md)
+ [Kubernetes Setup](docs/109.md)
+ [Amazon Web Services (AWS)](docs/110.md)
+ [Google Compute Engine Setup](docs/111.md)
+ [MapR Setup](docs/112.md)
+ [Hadoop Integration](docs/113.md)
+ [JobManager High Availability (HA)](docs/114.md)
+ [Checkpoints](docs/115.md)
+ [Savepoints](docs/116.md)
+ [State Backends](docs/117.md)
+ [Tuning Checkpoints and Large State](docs/118.md)
+ [Configuration](docs/119.md)
+ [Production Readiness Checklist](docs/120.md)
+ [Command-Line Interface](docs/121.md)
+ [Scala REPL](docs/122.md)
+ [Kerberos Authentication Setup and Configuration](docs/123.md)
+ [SSL Setup](docs/124.md)
+ [File Systems](docs/125.md)
+ [Upgrading Applications and Flink Versions](docs/126.md)
+ [Metrics](docs/127.md)
+ [How to use logging](docs/128.md)
+ [History Server](docs/129.md)
+ [Monitoring Checkpointing](docs/130.md)
+ [Monitoring Back Pressure](docs/131.md)
+ [Monitoring REST API](docs/132.md)
+ [Debugging Windows & Event Time](docs/133.md)
+ [Debugging Classloading](docs/134.md)
+ [Application Profiling](docs/135.md)
+ [Importing Flink into an IDE](docs/136.md)
+ [Building Flink from Source](docs/137.md)
+ [Component Stack](docs/138.md)
+ [Data Streaming Fault Tolerance](docs/139.md)
+ [Jobs and Scheduling](docs/140.md)
+ [Task Lifecycle](docs/141.md)
+ [File Systems](docs/142.md)
## Process
### 1. Claim a Chapter
First check the [overall progress](https://github.com/apachecn/flink-doc-zh/issues/1) to make sure nobody has already claimed the chapter you want.
Then reply to the issue with "chapter + QQ number" (be sure to leave your QQ number).
### 2. Translate
Feel free to make reasonable use of machine translation (for example, [Google Translate](https://translate.google.cn/)), but be sure to make the result readable!
If you run into formatting problems, please fix them as you go.
### 3. Submit
+ `fork` the GitHub project
+ Put the translation under the `docs` folder
+ `push`
+ `pull request`
See the [GitHub beginner's guide](https://github.com/apachecn/kaggle/blob/dev/docs/GitHub).
# Flink 1.7 Chinese Documentation
![](docs/img/navbar-brand-logo.png)
> Original: [Apache Flink 1.7 Documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.7/)
>
> License: [CC BY-NC-SA 4.0](http://creativecommons.org/licenses/by-nc-sa/4.0/)
>
> Everyone is welcome to join in and improve it: one person can walk fast, but a group of people can walk farther.
* [Read Online](https://apachecn.github.io/flink-doc-zh)
* [Contributing Guide](CONTRIBUTING.md)
* [ApacheCN Big Data discussion group 152622464](http://shang.qq.com/wpa/qunwpa?idkey=30e5f1123a79867570f665aa3a483ca404b1c3f77737bc01ec520ed5f078ddef)
* [ApacheCN Learning Resources](http://www.apachecn.org/)
## Contact
### Maintainers
### Other
* Claim translations and track project progress: <https://github.com/apachecn/flink-doc-zh/issues/1>
* File an issue on our [apachecn/flink-doc-zh](https://github.com/apachecn/flink-doc-zh) GitHub repository.
* Send an email to `apachecn@163.com`.
* Contact the group owner/administrators in our [organization discussion group](http://www.apachecn.org/organization/348.html).
## Sponsor Us
![](https://www.apachecn.org/img/about/donate.jpg)
# Apache Flink Documentation
This documentation is for Apache Flink version 1.7. These pages were built at: 02/16/19, 01:02:08 AM UTC.
Apache Flink is an open source platform for distributed stream and batch data processing. Flink’s core is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams. Flink builds batch processing on top of the streaming engine, overlaying native iteration support, managed memory, and program optimization.
## First Steps
* **Concepts**: Start with the basic concepts of Flink’s [Dataflow Programming Model](concepts/programming-model.html) and [Distributed Runtime Environment](concepts/runtime.html). This will help you understand other parts of the documentation, including the setup and programming guides. We recommend you read these sections first.
* **Tutorials**:
* [Implement and run a DataStream application](./tutorials/datastream_api.html)
* [Setup a local Flink cluster](./tutorials/local_setup.html)
* **Programming Guides**: You can read our guides about [basic API concepts](dev/api_concepts.html) and the [DataStream API](dev/datastream_api.html) or the [DataSet API](dev/batch/index.html) to learn how to write your first Flink programs.
## Deployment
Before putting your Flink job into production, read the [Production Readiness Checklist](ops/production_ready.html).
## Release Notes
Release notes cover important changes between Flink versions. Please carefully read these notes if you plan to upgrade your Flink setup to a later version.
* [Release notes for Flink 1.7](release-notes/flink-1.7.html).
* [Release notes for Flink 1.6](release-notes/flink-1.6.html).
* [Release notes for Flink 1.5](release-notes/flink-1.5.html).
## External Resources
* **Flink Forward**: Talks from past conferences are available at the [Flink Forward](http://flink-forward.org/) website and on [YouTube](https://www.youtube.com/channel/UCY8_lgiZLZErZPF47a2hXMA). [Robust Stream Processing with Apache Flink](http://2016.flink-forward.org/kb_sessions/robust-stream-processing-with-apache-flink/) is a good place to start.
* **Training**: The [training materials](http://training.data-artisans.com/) from data Artisans include slides, exercises, and sample solutions.
* **Blogs**: The [Apache Flink](https://flink.apache.org/blog/) and [data Artisans](https://data-artisans.com/blog/) blogs publish frequent, in-depth technical articles about Flink.
# Project Template for Scala
## Build Tools
Flink projects can be built with different build tools. In order to get started quickly, Flink provides project templates for the following build tools:
* [SBT](#sbt)
* [Maven](#maven)
These templates help you to set up the project structure and to create the initial build files.
## SBT
### Create Project
You can scaffold a new project via either of the following two methods:
* [Use the **sbt template**](#sbt_template)
* [Run the **quickstart script**](#quickstart-script-sbt)
<figure class="highlight">
```
$ sbt new tillrohrmann/flink-project.g8
```
</figure>
This will prompt you for a couple of parameters (project name, flink version...) and then create a Flink project from the [flink-project template](https://github.com/tillrohrmann/flink-project.g8). You need sbt >= 0.13.13 to execute this command. You can follow this [installation guide](http://www.scala-sbt.org/download.html) to obtain it if necessary.
<figure class="highlight">
```
$ bash <(curl https://flink.apache.org/q/sbt-quickstart.sh)
```
</figure>
This will create a Flink project in the **specified** project directory.
### Build Project
In order to build your project you simply have to issue the `sbt clean assembly` command. This will create the fat-jar **your-project-name-assembly-0.1-SNAPSHOT.jar** in the directory **target/scala_your-major-scala-version/**.
### Run Project
In order to run your project you have to issue the `sbt run` command.
By default, this will run your job in the same JVM that `sbt` is running in. In order to run your job in a distinct JVM, add the following line to `build.sbt`:
<figure class="highlight">
```
fork in run := true
```
</figure>
#### IntelliJ
We recommend using [IntelliJ](https://www.jetbrains.com/idea/) for your Flink job development. In order to get started, you have to import your newly created project into IntelliJ. You can do this via `File -> New -> Project from Existing Sources...` and then choosing your project’s directory. IntelliJ will then automatically detect the `build.sbt` file and set everything up.
In order to run your Flink job, it is recommended to choose the `mainRunner` module as the classpath of your **Run/Debug Configuration**. This will ensure that all dependencies which are set to _provided_ will be available upon execution. You can configure the **Run/Debug Configurations** via `Run -> Edit Configurations...` and then choose `mainRunner` from the _Use classpath of module_ dropdown.
#### Eclipse
In order to import the newly created project into [Eclipse](https://eclipse.org/), you first have to create Eclipse project files for it. These project files can be created via the [sbteclipse](https://github.com/typesafehub/sbteclipse) plugin. Add the following line to your `PROJECT_DIR/project/plugins.sbt` file:
<figure class="highlight">
```
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")
```
</figure>
In `sbt` use the following command to create the Eclipse project files
<figure class="highlight">
```
> eclipse
```
</figure>
Now you can import the project into Eclipse via `File -> Import... -> Existing Projects into Workspace` and then select the project directory.
## Maven
### Requirements
The only requirements are working **Maven 3.0.4** (or higher) and **Java 8.x** installations.
### Create Project
Use one of the following commands to **create a project**:
* [Use **Maven archetypes**](#maven-archetype)
* [Run the **quickstart script**](#quickstart-script)
<figure class="highlight">
```
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-scala \
-DarchetypeVersion=1.7.1
```
</figure>
This allows you to **name your newly created project**. It will interactively ask you for the groupId, artifactId, and package name.
<figure class="highlight">
```
$ curl https://flink.apache.org/q/quickstart-scala.sh | bash -s 1.7.1
```
</figure>
### Inspect Project
There will be a new directory in your working directory. If you’ve used the _curl_ approach, the directory is called `quickstart`. Otherwise, it has the name of your `artifactId`:
<figure class="highlight">
```
$ tree quickstart/
quickstart/
├── pom.xml
└── src
└── main
├── resources
│   └── log4j.properties
└── scala
└── org
└── myorg
└── quickstart
├── BatchJob.scala
└── StreamingJob.scala
```
</figure>
The sample project is a **Maven project** containing two classes: _StreamingJob_ and _BatchJob_, the basic skeleton programs for a _DataStream_ and a _DataSet_ program, respectively. The _main_ method is the entry point of the program, both for in-IDE testing/execution and for proper deployments.
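For orientation, the following is a rough sketch of what the generated _StreamingJob_ skeleton looks like. The package name matches the quickstart defaults shown above, but the pipeline in the middle is purely illustrative; the generated file leaves that part empty for you to fill in.
<figure class="highlight">
```
package org.myorg.quickstart

import org.apache.flink.streaming.api.scala._

object StreamingJob {
  def main(args: Array[String]): Unit = {
    // Set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Illustrative pipeline; the real skeleton leaves this part for you to implement
    env.fromElements(1, 2, 3)
      .map(_ * 2)
      .print()

    // Trigger program execution
    env.execute("Flink Streaming Scala Skeleton")
  }
}
```
</figure>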
We recommend you **import this project into your IDE**.
IntelliJ IDEA supports Maven out of the box and offers a plugin for Scala development. From our experience, IntelliJ provides the best experience for developing Flink applications.
For Eclipse, you need the following plugins, which you can install from the provided Eclipse Update Sites:
* _Eclipse 4.x_
* [Scala IDE](http://download.scala-ide.org/sdk/lithium/e44/scala211/stable/site)
* [m2eclipse-scala](http://alchim31.free.fr/m2e-scala/update-site)
* [Build Helper Maven Plugin](https://repo1.maven.org/maven2/.m2e/connectors/m2eclipse-buildhelper/0.15.0/N/0.15.0.201207090124/)
* _Eclipse 3.8_
* [Scala IDE for Scala 2.11](http://download.scala-ide.org/sdk/helium/e38/scala211/stable/site) or [Scala IDE for Scala 2.10](http://download.scala-ide.org/sdk/helium/e38/scala210/stable/site)
* [m2eclipse-scala](http://alchim31.free.fr/m2e-scala/update-site)
* [Build Helper Maven Plugin](https://repository.sonatype.org/content/repositories/forge-sites/m2e-extras/0.14.0/N/0.14.0.201109282148/)
### Build Project
If you want to **build/package your project**, go to your project directory and run the `mvn clean package` command. You will **find a JAR file** that contains your application, plus connectors and libraries that you may have added as dependencies to the application: `target/<artifact-id>-<version>.jar`.
**Note:** If you use a different class than _StreamingJob_ as the application’s main class / entry point, we recommend you change the `mainClass` setting in the `pom.xml` file accordingly. That way, Flink can run the application from the JAR file without additionally specifying the main class.
## Next Steps
Write your application!
If you are writing a streaming application and you are looking for inspiration on what to write, take a look at the [Stream Processing Application Tutorial](//ci.apache.org/projects/flink/flink-docs-release-1.7/tutorials/datastream_api.html#writing-a-flink-program).
If you are writing a batch processing application and you are looking for inspiration on what to write, take a look at the [Batch Application Examples](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/examples.html).
For a complete overview of the APIs, have a look at the [DataStream API](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/datastream_api.html) and [DataSet API](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/index.html) sections.
[Here](//ci.apache.org/projects/flink/flink-docs-release-1.7/tutorials/local_setup.html) you can find out how to run an application outside the IDE on a local cluster.
If you have any trouble, ask on our [Mailing List](http://mail-archives.apache.org/mod_mbox/flink-user/). We are happy to provide help.
# Stochastic Outlier Selection
## Description
An outlier is one or more observations that deviate quantitatively from the majority of the data set and may be the subject of further investigation. Stochastic Outlier Selection (SOS), developed by Jeroen Janssens[[1]](#janssens), is an unsupervised outlier-selection algorithm that takes as input a set of vectors. The algorithm applies affinity-based outlier selection and outputs an outlier probability for each data point. Intuitively, a data point is considered an outlier when the other data points have insufficient affinity with it.
Outlier detection has applications in a number of fields, for example log analysis, fraud detection, noise removal, novelty detection, quality control, and sensor monitoring. If a sensor turns faulty, it is likely to output values that deviate markedly from the majority.
For more information, please consult the [PhD thesis of Jeroen Janssens](https://github.com/jeroenjanssens/phd-thesis) on outlier selection and one-class classification, which introduces the algorithm.
## Parameters
The stochastic outlier selection algorithm implementation can be controlled by the following parameters:
| Parameters | Description |
| --- | --- |
| **Perplexity** | Perplexity can be interpreted as the k in k-nearest-neighbor algorithms. The difference is that in SOS being a neighbor is not a binary property but a probabilistic one, and therefore a real number. It must be between 1 and n-1, where n is the number of points. A good starting point is the square root of the number of observations. (Default value: **30**) |
| **ErrorTolerance** | The accepted error tolerance to reduce computational time when approximating the affinity. It will sacrifice accuracy in return for reduced computational time. (Default value: **1e-20**) |
| **MaxIterations** | The maximum number of iterations to approximate the affinity of the algorithm. (Default value: **10**) |
## Example
<figure class="highlight">
```
val data = env.fromCollection(List(
  LabeledVector(0.0, DenseVector(1.0, 1.0)),
  LabeledVector(1.0, DenseVector(2.0, 1.0)),
  LabeledVector(2.0, DenseVector(1.0, 2.0)),
  LabeledVector(3.0, DenseVector(2.0, 2.0)),
  LabeledVector(4.0, DenseVector(5.0, 8.0)) // The outlier!
))

val sos = new StochasticOutlierSelection().setPerplexity(3)

val outputVector = sos
  .transform(data)
  .collect()

val expectedOutputVector = Map(
  0 -> 0.2790094479202896,
  1 -> 0.25775014551682535,
  2 -> 0.22136130977995766,
  3 -> 0.12707053787018444,
  4 -> 0.9922779902453757 // The outlier!
)

outputVector.foreach(output => expectedOutputVector(output._1) should be(output._2))
```
</figure>
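The example above only sets the perplexity. The other parameters in the table can be configured in the same builder style; the sketch below assumes the setters follow FlinkML's usual `setX` naming for the parameters listed above.
<figure class="highlight">
```
// Illustrative configuration sketch; setter names are assumed from the
// FlinkML convention of one setX method per parameter in the table above.
val tunedSos = new StochasticOutlierSelection()
  .setPerplexity(10)        // roughly the square root of the number of observations
  .setErrorTolerance(1e-10) // trade a little accuracy for less computation
  .setMaxIterations(20)     // cap on the affinity approximation iterations
```
</figure>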
**References**
[1]J.H.M. Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik. _Stochastic Outlier Selection_. Technical Report TiCC TR 2012-001, Tilburg University, Tilburg, the Netherlands, 2012.
# Standard Scaler
## Description
The standard scaler scales the given data set so that all features will have a user-specified mean and variance. In case the user does not provide a specific mean and standard deviation, the standard scaler transforms the features of the input data set to have mean equal to 0 and standard deviation equal to 1. Given a set of input data $x_1, x_2, \ldots, x_n$, with mean:
$$\bar{x} = \frac{1}{n} \sum_{i=1}^{n} x_i$$
and standard deviation:
$$\sigma_{x} = \sqrt{\frac{1}{n} \sum_{i=1}^{n} (x_i - \bar{x})^2}$$
The scaled data set $z_1, z_2, \ldots, z_n$ will be:
$$z_i = \textit{std} \cdot \frac{x_i - \bar{x}}{\sigma_{x}} + \textit{mean}$$
where $\textit{std}$ and $\textit{mean}$ are the user specified values for the standard deviation and mean.
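As a concrete illustration (the numbers are chosen here, not taken from the library documentation): for the three observations $x = (1, 2, 3)$ the mean is $\bar{x} = 2$ and the standard deviation is $\sigma_{x} = \sqrt{2/3} \approx 0.816$, so with the default $\textit{mean} = 0$ and $\textit{std} = 1$ the scaled values are approximately $z \approx (-1.22, 0, 1.22)$.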
## Operations
`StandardScaler` is a `Transformer`. As such, it supports the `fit` and `transform` operation.
### Fit
StandardScaler is trained on all subtypes of `Vector` or `LabeledVector`:
* `fit[T <: Vector]: DataSet[T] => Unit`
* `fit: DataSet[LabeledVector] => Unit`
### Transform
StandardScaler transforms all subtypes of `Vector` or `LabeledVector` into the respective type:
* `transform[T <: Vector]: DataSet[T] => DataSet[T]`
* `transform: DataSet[LabeledVector] => DataSet[LabeledVector]`
## Parameters
The standard scaler implementation can be controlled by the following two parameters:
| Parameters | Description |
| --- | --- |
| **Mean** | The mean of the scaled data set. (Default value: **0.0**) |
| **Std** | The standard deviation of the scaled data set. (Default value: **1.0**) |
## Examples
<figure class="highlight">
```
// Create standard scaler transformer
val scaler = StandardScaler()
  .setMean(10.0)
  .setStd(2.0)

// Obtain data set to be scaled
val dataSet: DataSet[Vector] = ...

// Learn the mean and standard deviation of the training data
scaler.fit(dataSet)

// Scale the provided data set to have mean=10.0 and std=2.0
val scaledDS = scaler.transform(dataSet)
```
</figure>
# SVM using CoCoA
## Description
Implements an SVM with soft-margin using the communication-efficient distributed dual coordinate ascent algorithm with hinge-loss function. The algorithm solves the following minimization problem:
$$\min_{\mathbf{w} \in \mathbb{R}^d} \frac{\lambda}{2} \left\lVert \mathbf{w} \right\rVert^2 + \frac{1}{n} \sum_{i=1}^{n} \ell_{i}\left(\mathbf{w}^{T} \mathbf{x}_{i}\right)$$
with $\mathbf{w}$ being the weight vector, $\lambda$ being the regularization constant, $\mathbf{x}_{i} \in \mathbb{R}^d$ being the data points and $\ell_{i}$ being the convex loss functions, which can also depend on the labels $y_{i} \in \mathbb{R}$. In the current implementation the regularizer is the $\ell_2$-norm and the loss functions are the hinge-loss functions:
$$\ell_{i} = \max\left(0, 1 - y_{i} \mathbf{w}^{T} \mathbf{x}_{i}\right)$$
With these choices, the problem definition is equivalent to an SVM with soft-margin. Thus, the algorithm allows us to train an SVM with soft-margin.
The minimization problem is solved by applying stochastic dual coordinate ascent (SDCA). In order to make the algorithm efficient in a distributed setting, the CoCoA algorithm calculates several iterations of SDCA locally on a data block before merging the local updates into a valid global state. This state is redistributed to the different data partitions where the next round of local SDCA iterations is then executed. The number of outer iterations and local SDCA iterations control the overall network costs, because there is only network communication required for each outer iteration. The local SDCA iterations are embarrassingly parallel once the individual data partitions have been distributed across the cluster.
The implementation of this algorithm is based on the work of [Jaggi et al.](http://arxiv.org/abs/1409.1458)
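To make the described control flow concrete, here is a schematic sketch of the outer/local iteration structure; it is illustrative only and not the FlinkML implementation, and `localSdca` stands in for the per-block SDCA steps.
<figure class="highlight">
```
// Schematic sketch of the CoCoA structure described above (illustrative only).
def cocoa(blocks: Seq[Array[(Array[Double], Double)]], // data split into blocks of (features, label)
          outerIterations: Int,
          localIterations: Int,
          dim: Int): Array[Double] = {

  // Placeholder for the SDCA steps executed locally on one block against the current w
  def localSdca(block: Array[(Array[Double], Double)],
                w: Array[Double],
                iterations: Int): Array[Double] = ???

  var w = Array.fill(dim)(0.0) // global weight vector, re-broadcast every outer iteration
  for (_ <- 1 to outerIterations) {
    // Each block computes a local update independently (embarrassingly parallel) ...
    val localUpdates = blocks.map(block => localSdca(block, w, localIterations))
    // ... and the local updates are merged (here: averaged) into the new global state.
    val summed = localUpdates.reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
    w = w.zip(summed).map { case (wi, ui) => wi + ui / blocks.size }
  }
  w
}
```
</figure>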
## Operations
`SVM` is a `Predictor`. As such, it supports the `fit` and `predict` operation.
### Fit
SVM is trained given a set of `LabeledVector`:
* `fit: DataSet[LabeledVector] => Unit`
### Predict
SVM predicts for all subtypes of FlinkML’s `Vector` the corresponding class label:
* `predict[T <: Vector]: DataSet[T] => DataSet[(T, Double)]`, where the `(T, Double)` tuple corresponds to (original_features, label)
If we call evaluate with a `DataSet[(Vector, Double)]`, we make a prediction on the class label for each example, and return a `DataSet[(Double, Double)]`. In each tuple the first element is the true value, as provided by the input `DataSet[(Vector, Double)]`, and the second element is the predicted value. You can then use these `(truth, prediction)` tuples to evaluate the algorithm’s performance.
* `predict: DataSet[(Vector, Double)] => DataSet[(Double, Double)]`
## Parameters
The SVM implementation can be controlled by the following parameters:
| Parameters | Description |
| --- | --- |
| **Blocks** | Sets the number of blocks into which the input data will be split. On each block the local stochastic dual coordinate ascent method is executed. This number should be set at least to the degree of parallelism. If no value is specified, then the parallelism of the input DataSet is used as the number of blocks. (Default value: **None**) |
| **Iterations** | Defines the maximum number of iterations of the outer loop method. In other words, it defines how often the SDCA method is applied to the blocked data. After each iteration, the locally computed weight vector updates have to be reduced to update the global weight vector value. The new weight vector is broadcast to all SDCA tasks at the beginning of each iteration. (Default value: **10**) |
| **LocalIterations** | Defines the maximum number of SDCA iterations. In other words, it defines how many data points are drawn from each local data block to calculate the stochastic dual coordinate ascent. (Default value: **10**) |
| **Regularization** | Defines the regularization constant of the SVM algorithm. The higher the value, the smaller will the 2-norm of the weight vector be. In case of a SVM with hinge loss this means that the SVM margin will be wider even though it might contain some false classifications. (Default value: **1.0**) |
| **Stepsize** | Defines the initial step size for the updates of the weight vector. The larger the step size is, the larger will be the contribution of the weight vector updates to the next weight vector value. The effective scaling of the updates is $\frac{stepsize}{blocks}$. This value has to be tuned in case that the algorithm becomes unstable. (Default value: **1.0**) |
| **ThresholdValue** | Defines the limiting value for the decision function above which examples are labeled as positive (+1.0). Examples with a decision function value below this value are classified as negative (-1.0). In order to get the raw decision function values you need to indicate it by using the OutputDecisionFunction parameter. (Default value: **0.0**) |
| **OutputDecisionFunction** | Determines whether the predict and evaluate functions of the SVM should return the distance to the separating hyperplane, or binary class labels. Setting this to true will return the raw distance to the hyperplane for each example. Setting it to false will return the binary class label (+1.0, -1.0) (Default value: **false**) |
| **Seed** | Defines the seed to initialize the random number generator. The seed directly controls which data points are chosen for the SDCA method. (Default value: **Random Long Integer**) |
## Examples
<figure class="highlight">
```
import org.apache.flink.api.scala._
import org.apache.flink.ml.math.Vector
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.RichExecutionEnvironment
val pathToTrainingFile: String = ???
val pathToTestingFile: String = ???
val env = ExecutionEnvironment.getExecutionEnvironment
// Read the training data set, from a LibSVM formatted file
val trainingDS: DataSet[LabeledVector] = env.readLibSVM(pathToTrainingFile)

// Create the SVM learner
val svm = SVM()
  .setBlocks(10)

// Learn the SVM model
svm.fit(trainingDS)

// Read the testing data set
val testingDS: DataSet[Vector] = env.readLibSVM(pathToTestingFile).map(_.vector)

// Calculate the predictions for the testing data set
val predictionDS: DataSet[(Vector, Double)] = svm.predict(testingDS)
```
</figure>
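The example above stops at `predict`. The following sketch shows how the `(truth, prediction)` evaluation described in the Predict section could be used; it reuses `env`, `svm` and `pathToTestingFile` from the example, assumes the LibSVM labels are the ±1.0 class labels, and the accuracy computation at the end is purely illustrative.
<figure class="highlight">
```
// Read the test data together with its labels
val testWithLabels: DataSet[(Vector, Double)] =
  env.readLibSVM(pathToTestingFile).map(lv => (lv.vector, lv.label))

// (truth, prediction) pairs as described in the Predict section
val truthAndPrediction: DataSet[(Double, Double)] = svm.evaluate(testWithLabels)

// Illustrative accuracy computed from the pairs
val accuracy: DataSet[Double] = truthAndPrediction
  .map { case (truth, prediction) => (if (truth == prediction) 1.0 else 0.0, 1L) }
  .reduce((a, b) => (a._1 + b._1, a._2 + b._2))
  .map { case (correct, total) => correct / total }
```
</figure>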
# Best Practices
This page contains a collection of best practices for Flink programmers on how to solve frequently encountered problems.
## Parsing command line arguments and passing them around in your Flink application
Almost all Flink applications, both batch and streaming, rely on external configuration parameters. They are used to specify input and output sources (like paths or addresses), system parameters (parallelism, runtime configuration), and application specific parameters (typically used within user functions).
Flink provides a simple utility called `ParameterTool` to provide some basic tooling for solving these problems. Please note that you don’t have to use the `ParameterTool` described here. Other frameworks such as [Commons CLI](https://commons.apache.org/proper/commons-cli/) and [argparse4j](http://argparse4j.sourceforge.net/) also work well with Flink.
### Getting your configuration values into the `ParameterTool`
The `ParameterTool` provides a set of predefined static methods for reading the configuration. The tool internally expects a `Map<String, String>`, so it’s very easy to integrate it with your own configuration style.
#### From `.properties` files
The following method will read a [Properties](https://docs.oracle.com/javase/tutorial/essential/environment/properties.html) file and provide the key/value pairs:
<figure class="highlight">
```
String propertiesFilePath = "/home/sam/flink/myjob.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath);
File propertiesFile = new File(propertiesFilePath);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
InputStream propertiesFileInputStream = new FileInputStream(file);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFileInputStream);
```
</figure>
#### From the command line arguments
This allows getting arguments like `--input hdfs:///mydata --elements 42` from the command line.
<figure class="highlight">
```
public static void main(String[] args) {
    ParameterTool parameter = ParameterTool.fromArgs(args);
    // .. regular code ..
}
```
</figure>
#### From system properties
When starting a JVM, you can pass system properties to it: `-Dinput=hdfs:///mydata`. You can also initialize the `ParameterTool` from these system properties:
<figure class="highlight">
```
ParameterTool parameter = ParameterTool.fromSystemProperties();
```
</figure>
### Using the parameters in your Flink program
Now that we’ve got the parameters from somewhere (see above) we can use them in various ways.
**Directly from the `ParameterTool`**
The `ParameterTool` itself has methods for accessing the values.
<figure class="highlight">
```
ParameterTool parameters = // ...
parameters.getRequired("input");
parameters.get("output", "myDefaultValue");
parameters.getLong("expectedCount", -1L);
parameters.getNumberOfParameters();
// .. there are more methods available.
```
</figure>
You can use the return values of these methods directly in the `main()` method of the client submitting the application. For example, you could set the parallelism of an operator like this:
<figure class="highlight">
```
ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.getInt("mapParallelism", 2);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);
```
</figure>
Since the `ParameterTool` is serializable, you can pass it to the functions themselves:
<figure class="highlight">
```
ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));
```
</figure>
and then use it inside the function for getting values from the command line.
#### Register the parameters globally
Parameters registered as global job parameters in the `ExecutionConfig` can be accessed as configuration values from the JobManager web interface and in all functions defined by the user.
Register the parameters globally:
<figure class="highlight">
```
ParameterTool parameters = ParameterTool.fromArgs(args);
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
```
</figure>
Access them in any rich user function:
<figure class="highlight">
```
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        ParameterTool parameters = (ParameterTool)
            getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        parameters.getRequired("input");
        // .. do more ..
    }
}
```
</figure>
## Naming large TupleX types
It is recommended to use POJOs (Plain Old Java Objects) instead of `TupleX` for data types with many fields. POJOs can also be used to give large `Tuple` types a name.
**Example**
Instead of using:
<figure class="highlight">
```
Tuple11<String, String, ..., String> var = new ...;
```
</figure>
It is much easier to create a custom type that extends the large Tuple type:
<figure class="highlight">
```
CustomType var = new ...;
public static class CustomType extends Tuple11<String, String, ..., String> {
// constructor matching super
}
```
</figure>
## Using Logback instead of Log4j
**Note: This tutorial is applicable starting from Flink 0.10**
Apache Flink uses [slf4j](http://www.slf4j.org/) as the logging abstraction in its code. Users are advised to use slf4j in their user functions as well.
Slf4j is a compile-time logging interface that can use different logging implementations at runtime, such as [log4j](http://logging.apache.org/log4j/2.x/) or [Logback](http://logback.qos.ch/).
Flink depends on Log4j by default. This page describes how to use Flink with Logback. Users have reported that they were also able to set up centralized logging with Graylog using this tutorial.
To get a logger instance in the code, use the following code:
<figure class="highlight">
```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyClass implements MapFunction {
private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
// ...
```
</figure>
### Use Logback when running Flink out of the IDE / from a Java application
In all cases where classes are executed with a classpath created by a dependency manager such as Maven, Flink will pull log4j into the classpath.
Therefore, you will need to exclude log4j from Flink’s dependencies. The following description will assume a Maven project created from a [Flink quickstart](./projectsetup/java_api_quickstart.html).
Change your project's `pom.xml` file like this:
<figure class="highlight">
```
<dependencies>
    <!-- Add the two required logback dependencies -->
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>1.1.3</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.1.3</version>
    </dependency>

    <!-- Add the log4j -> slf4j (-> logback) bridge into the classpath
         Hadoop is logging to log4j! -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>log4j-over-slf4j</artifactId>
        <version>1.7.7</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.7.1</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.7.1</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.7.1</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
```
</figure>
The following changes were made in the `<dependencies>` section:
* Exclude all `log4j` dependencies from all Flink dependencies: this causes Maven to ignore Flink’s transitive dependencies to log4j.
* Exclude the `slf4j-log4j12` artifact from Flink’s dependencies: since we are going to use the slf4j to logback binding, we have to remove the slf4j to log4j binding.
* Add the Logback dependencies: `logback-core` and `logback-classic`
* Add dependencies for `log4j-over-slf4j`. `log4j-over-slf4j` is a tool which allows legacy applications which are directly using the Log4j APIs to use the Slf4j interface. Flink depends on Hadoop which is directly using Log4j for logging. Therefore, we need to redirect all logger calls from Log4j to Slf4j which is in turn logging to Logback.
Please note that you need to manually add the exclusions to all new Flink dependencies you are adding to the pom file.
You may also need to check if other (non-Flink) dependencies are pulling in log4j bindings. You can analyze the dependencies of your project with `mvn dependency:tree`.
### Use Logback when running Flink on a cluster
This tutorial is applicable when running Flink on YARN or as a standalone cluster.
In order to use Logback instead of Log4j with Flink, you need to remove `log4j-1.2.xx.jar` and `slf4j-log4j12-xxx.jar` from the `lib/` directory.
Next, you need to put the following jar files into the `lib/` folder:
* `logback-classic.jar`
* `logback-core.jar`
* `log4j-over-slf4j.jar`: This bridge needs to be present in the classpath for redirecting logging calls from Hadoop (which is using Log4j) to Slf4j.
Note that you need to explicitly set the `lib/` directory when using a per-job YARN cluster.
The command to submit Flink on YARN with a custom logger is: `./bin/flink run -yt $FLINK_HOME/lib <... remaining arguments ...>`
# Standalone Cluster
This page provides instructions on how to run Flink in a _fully distributed fashion_ on a _static_ (but possibly heterogeneous) cluster.
## Requirements
### Software Requirements
Flink runs on all _UNIX-like environments_, e.g. **Linux**, **Mac OS X**, and **Cygwin** (for Windows), and expects the cluster to consist of **one master node** and **one or more worker nodes**. Before you start to set up the system, make sure you have the following software installed **on each node**:
* **Java 1.8.x** or higher,
* **ssh** (sshd must be running to use the Flink scripts that manage remote components)
If your cluster does not fulfill these software requirements you will need to install/upgrade it.
Having **passwordless SSH** and **the same directory structure** on all your cluster nodes will allow you to use our scripts to control everything.
### `JAVA_HOME` Configuration
Flink requires the `JAVA_HOME` environment variable to be set on the master and all worker nodes and point to the directory of your Java installation.
You can set this variable in `conf/flink-conf.yaml` via the `env.java.home` key.
## Flink Setup
Go to the [downloads page](http://flink.apache.org/downloads.html) and get the ready-to-run package. Make sure to pick the Flink package **matching your Hadoop version**. If you don’t plan to use Hadoop, pick any version.
After downloading the latest release, copy the archive to your master node and extract it:
<figure class="highlight">
```
tar xzf flink-*.tgz
cd flink-*
```
</figure>
### Configuring Flink
After having extracted the system files, you need to configure Flink for the cluster by editing _conf/flink-conf.yaml_.
Set the `jobmanager.rpc.address` key to point to your master node. You should also define the maximum amount of main memory the JVM is allowed to allocate on each node by setting the `jobmanager.heap.mb` and `taskmanager.heap.mb` keys.
These values are given in MB. If some worker nodes have more main memory which you want to allocate to the Flink system you can overwrite the default value by setting the environment variable `FLINK_TM_HEAP` on those specific nodes.
Finally, you must provide a list of all nodes in your cluster which shall be used as worker nodes. Therefore, similar to the HDFS configuration, edit the file _conf/slaves_ and enter the IP/host name of each worker node. Each worker node will later run a TaskManager.
The following example illustrates the setup with three nodes (with IP addresses from _10.0.0.1_ to _10.0.0.3_ and hostnames _master_, _worker1_, _worker2_) and shows the contents of the configuration files (which need to be accessible at the same path on all machines):
![](https://ci.apache.org/projects/flink/flink-docs-release-1.7/page/img/quickstart_cluster.png)
/path/to/**flink/conf/flink-conf.yaml**:
```
jobmanager.rpc.address: 10.0.0.1
```
/path/to/**flink/conf/slaves**:
```
10.0.0.2
10.0.0.3
```
The Flink directory must be available on every worker under the same path. You can use a shared NFS directory, or copy the entire Flink directory to every worker node.
Please see the [configuration page](../config.html) for details and additional configuration options.
In particular,
* the amount of available memory per JobManager (`jobmanager.heap.mb`),
* the amount of available memory per TaskManager (`taskmanager.heap.mb`),
* the number of available CPUs per machine (`taskmanager.numberOfTaskSlots`),
* the total number of CPUs in the cluster (`parallelism.default`) and
* the temporary directories (`taskmanager.tmp.dirs`)
are very important configuration values.
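As an illustration only (the values below are made up and must be adapted to your machines), these keys might be set together in `conf/flink-conf.yaml` like this:
```
jobmanager.rpc.address: 10.0.0.1
jobmanager.heap.mb: 1024
taskmanager.heap.mb: 4096
taskmanager.numberOfTaskSlots: 4
parallelism.default: 8
taskmanager.tmp.dirs: /tmp/flink
```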
### Starting Flink
The following script starts a JobManager on the local node and connects via SSH to all worker nodes listed in the _slaves_ file to start a TaskManager on each node. Your Flink system is then up and running, and the JobManager running on the local node will accept jobs at the configured RPC port.
Assuming that you are on the master node and inside the Flink directory:
<figure class="highlight">
```
bin/start-cluster.sh
```
</figure>
To stop Flink, there is also a `stop-cluster.sh` script.
### Adding JobManager/TaskManager Instances to a Cluster
You can add both JobManager and TaskManager instances to your running cluster with the `bin/jobmanager.sh` and `bin/taskmanager.sh` scripts.
#### Adding a JobManager
<figure class="highlight">
```
bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all
```
</figure>
#### Adding a TaskManager
<figure class="highlight">
```
bin/taskmanager.sh start|start-foreground|stop|stop-all
```
</figure>
Make sure to call these scripts on the hosts on which you want to start/stop the respective instance.
# YARN Setup
## Quickstart
### Start a long-running Flink cluster on YARN
Start a YARN session with 4 Task Managers (each with 4 GB of heap space):
<figure class="highlight">
```
# get the hadoop2 package from the Flink download page at
# http://flink.apache.org/downloads.html
curl -O <flink_hadoop2_download_url>
tar xvzf flink-1.7.1-bin-hadoop2.tgz
cd flink-1.7.1/
./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m
```
</figure>
Specify the `-s` flag for the number of processing slots per Task Manager. We recommend setting the number of slots to the number of processors per machine.
Once the session has been started, you can submit jobs to the cluster using the `./bin/flink` tool.
### Run a Flink job on YARN
<figure class="highlight">
```
# get the hadoop2 package from the Flink download page at
# http://flink.apache.org/downloads.html
curl -O <flink_hadoop2_download_url>
tar xvzf flink-1.7.1-bin-hadoop2.tgz
cd flink-1.7.1/
./bin/flink run -m yarn-cluster -yn 4 -yjm 1024m -ytm 4096m ./examples/batch/WordCount.jar
```
</figure>
## Flink YARN Session
Apache [Hadoop YARN](http://hadoop.apache.org/) is a cluster resource management framework. It allows running various distributed applications on top of a cluster. Flink runs on YARN next to other applications. Users do not have to set up or install anything if there is already a YARN setup.
**Requirements**
* at least Apache Hadoop 2.2
* HDFS (Hadoop Distributed File System) (or another distributed file system supported by Hadoop)
If you have trouble using the Flink YARN client, have a look at the [FAQ section](http://flink.apache.org/faq.html#yarn-deployment).
### Start Flink Session
Follow these instructions to learn how to launch a Flink Session within your YARN cluster.
A session will start all required Flink services (JobManager and TaskManagers) so that you can submit programs to the cluster. Note that you can run multiple programs per session.
#### Download Flink
Download a Flink package for Hadoop >= 2 from the [download page](http://flink.apache.org/downloads.html). It contains the required files.
Extract the package using:
<figure class="highlight">
```
tar xvzf flink-1.7.1-bin-hadoop2.tgz
cd flink-1.7.1/
```
</figure>
#### Start a Session
Use the following command to start a session
<figure class="highlight">
```
./bin/yarn-session.sh
```
</figure>
This command will show you the following overview:
<figure class="highlight">
```
Usage:
   Required
     -n,--container <arg>            Number of YARN container to allocate (=Number of Task Managers)
   Optional
     -D <arg>                        Dynamic properties
     -d,--detached                   Start detached
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -nm,--name                      Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for HA mode
```
</figure>
Please note that the Client requires the `YARN_CONF_DIR` or `HADOOP_CONF_DIR` environment variable to be set to read the YARN and HDFS configuration.
**Example:** Issue the following command to allocate 10 Task Managers, with 8 GB of memory and 32 processing slots each:
<figure class="highlight">
```
./bin/yarn-session.sh -n 10 -tm 8192 -s 32
```
</figure>
The system will use the configuration in `conf/flink-conf.yaml`. Please follow our [configuration guide](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html) if you want to change something.
Flink on YARN will overwrite the following configuration parameters `jobmanager.rpc.address` (because the JobManager is always allocated at different machines), `taskmanager.tmp.dirs` (we are using the tmp directories given by YARN) and `parallelism.default` if the number of slots has been specified.
If you don’t want to change the configuration file to set configuration parameters, there is the option to pass dynamic properties via the `-D` flag. So you can pass parameters this way: `-Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624`.
The example invocation starts 11 containers (even though only 10 containers were requested), since there is one additional container for the ApplicationMaster and Job Manager.
Once Flink is deployed in your YARN cluster, it will show you the connection details of the Job Manager.
Stop the YARN session by stopping the unix process (using CTRL+C) or by entering ‘stop’ into the client.
Flink on YARN will only start all requested containers if enough resources are available on the cluster. Most YARN schedulers account for the requested memory of the containers; some also account for the number of vcores. By default, the number of vcores is equal to the processing slots (`-s`) argument. The [`yarn.containers.vcores`](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#yarn-containers-vcores) setting allows overwriting the number of vcores with a custom value. In order for this parameter to work, you should enable CPU scheduling in your cluster.
#### Detached YARN Session
If you do not want to keep the Flink YARN client running all the time, it’s also possible to start a _detached_ YARN session. The parameter for that is called `-d` or `--detached`.
In that case, the Flink YARN client will only submit Flink to the cluster and then close itself. Note that in this case it’s not possible to stop the YARN session using Flink.
Use the YARN utilities (`yarn application -kill <appId>`) to stop the YARN session.
#### Attach to an existing Session
Use the following command to attach to an existing session:
<figure class="highlight">
```
./bin/yarn-session.sh
```
</figure>
This command will show you the following overview:
<figure class="highlight">
```
Usage:
   Required
     -id,--applicationId <yarnAppId>   YARN application Id
```
</figure>
As already mentioned, the `YARN_CONF_DIR` or `HADOOP_CONF_DIR` environment variable must be set so that the YARN and HDFS configuration can be read.
**Example:** Issue the following command to attach to running Flink YARN session `application_1463870264508_0029`:
<figure class="highlight">
```
./bin/yarn-session.sh -id application_1463870264508_0029
```
</figure>
Attaching to a running session uses YARN ResourceManager to determine Job Manager RPC port.
Stop the YARN session by stopping the unix process (using CTRL+C) or by entering ‘stop’ into the client.
### Submit Job to Flink
Use the following command to submit a Flink program to the YARN cluster:
<figure class="highlight">
```
./bin/flink
```
</figure>
Please refer to the documentation of the [command-line client](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/cli.html).
The command will show you a help menu like this:
<figure class="highlight">
```
[...]
Action "run" compiles and runs a program.
Syntax: run [OPTIONS] <jar-file> <arguments>
"run" action arguments:
-c,--class <classname> Class with the program entry point ("main"
method or "getPlan()" method. Only needed
if the JAR file does not specify the class
in its manifest.
-m,--jobmanager <host:port> Address of the JobManager (master) to
which to connect. Use this flag to connect
to a different JobManager than the one
specified in the configuration.
-p,--parallelism <parallelism> The parallelism with which to run the
program. Optional flag to override the
default value specified in the
configuration
```
</figure>
Use the _run_ action to submit a job to YARN. The client is able to determine the address of the JobManager. In the rare event of a problem, you can also pass the JobManager address using the `-m` argument. The JobManager address is visible in the YARN console.
**Example**
<figure class="highlight">
```
wget -O LICENSE-2.0.txt http://www.apache.org/licenses/LICENSE-2.0.txt
hadoop fs -copyFromLocal LICENSE-2.0.txt hdfs:/// ...
./bin/flink run ./examples/batch/WordCount.jar \
hdfs:///..../LICENSE-2.0.txt hdfs:///.../wordcount-result.txt
```
</figure>
If you see the following error, make sure that all TaskManagers have started:
<figure class="highlight">
```
Exception in thread "main" org.apache.flink.compiler.CompilerException:
Available instances could not be determined from job manager: Connection timed out.
```
</figure>
You can check the number of TaskManagers in the JobManager web interface. The address of this interface is printed in the YARN session console.
If the TaskManagers do not show up after a minute, you should investigate the issue using the log files.
## Run a single Flink job on YARN
The documentation above describes how to start a Flink cluster within a Hadoop YARN environment. It is also possible to launch Flink within YARN only for executing a single job.
Please note that the client then expects the `-yn` value to be set (number of TaskManagers).
**_Example:_**
<figure class="highlight">
```
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
```
</figure>
The command line options of the YARN session are also available with the `./bin/flink` tool. They are prefixed with a `y` or `yarn` (for the long argument options).
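As a sketch, the session options described above can be passed through `./bin/flink` with their `y` prefix; the memory and slot values here are only illustrative:

```
./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 2048 -ys 4 \
    ./examples/batch/WordCount.jar
```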
Note: You can use a different configuration directory per job by setting the environment variable `FLINK_CONF_DIR`. To use this, copy the `conf` directory from the Flink distribution and modify, for example, the logging settings on a per-job basis.
Note: It is possible to combine `-m yarn-cluster` with a detached YARN submission (`-yd`) to “fire and forget” a Flink job to the YARN cluster. In this case, your application will not get any accumulator results or exceptions from the ExecutionEnvironment.execute() call!
### User jars & Classpath
By default Flink will include the user jars into the system classpath when running a single job. This behavior can be controlled with the `yarn.per-job-cluster.include-user-jar` parameter.
When setting this to `DISABLED`, Flink will include the jar in the user classpath instead.
The position of the user jars in the class path can be controlled by setting the parameter to one of the following values (a configuration sketch follows the list):
* `ORDER`: (default) Adds the jar to the system class path based on the lexicographic order.
* `FIRST`: Adds the jar to the beginning of the system class path.
* `LAST`: Adds the jar to the end of the system class path.
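A minimal `conf/flink-conf.yaml` sketch that places the user jar at the beginning of the system class path:

```
yarn.per-job-cluster.include-user-jar: FIRST
```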
## Recovery behavior of Flink on YARN
Flink’s YARN client has the following configuration parameters to control its behavior in case of container failures. These parameters can be set either in `conf/flink-conf.yaml` or when starting the YARN session, using `-D` parameters (see the sketch after this list).
* `yarn.reallocate-failed`: This parameter controls whether Flink should reallocate failed TaskManager containers. Default: true
* `yarn.maximum-failed-containers`: The maximum number of failed containers the ApplicationMaster accepts until it fails the YARN session. Default: The number of initially requested TaskManagers (`-n`).
* `yarn.application-attempts`: The number of ApplicationMaster (+ its TaskManager containers) attempts. If this value is set to 1 (default), the entire YARN session will fail when the Application master fails. Higher values specify the number of restarts of the ApplicationMaster by YARN.
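A sketch of these settings in `conf/flink-conf.yaml` (the values are illustrative, not recommendations):

```
yarn.reallocate-failed: true
yarn.maximum-failed-containers: 100
yarn.application-attempts: 10
```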
## Debugging a failed YARN session
There are many reasons why a Flink YARN session deployment can fail: a misconfigured Hadoop setup (HDFS permissions, YARN configuration), version incompatibilities (for example, running Flink with vanilla Hadoop dependencies on Cloudera Hadoop), or other errors.
### Log Files
In cases where the Flink YARN session fails during the deployment itself, users have to rely on the logging capabilities of Hadoop YARN. The most useful feature for that is the [YARN log aggregation](http://hortonworks.com/blog/simplifying-user-logs-management-and-access-in-yarn/). To enable it, users have to set the `yarn.log-aggregation-enable` property to `true` in the `yarn-site.xml` file. Once that is enabled, users can use the following command to retrieve all log files of a (failed) YARN session.
<figure class="highlight">
```
yarn logs -applicationId <application ID>
```
</figure>
Note that it takes a few seconds after the session has finished until the logs show up.
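For reference, enabling log aggregation is a Hadoop-side setting; a minimal `yarn-site.xml` snippet looks roughly like this (consult your Hadoop documentation for related retention settings):

```
<!-- yarn-site.xml -->
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
```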
### YARN Client console & Web interfaces
The Flink YARN client also prints error messages in the terminal if errors occur during runtime (for example if a TaskManager stops working after some time).
In addition to that, there is the YARN Resource Manager web interface (by default on port 8088). The port of the Resource Manager web interface is determined by the `yarn.resourcemanager.webapp.address` configuration value.
It provides access to log files for running YARN applications and shows diagnostics for failed apps.
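For reference, this is a Hadoop property configured in `yarn-site.xml`; the host below is a placeholder:

```
<!-- yarn-site.xml -->
<property>
  <name>yarn.resourcemanager.webapp.address</name>
  <value>resourcemanager-host:8088</value>
</property>
```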
## Build YARN client for a specific Hadoop version
Users using Hadoop distributions from companies like Hortonworks, Cloudera or MapR might have to build Flink against their specific versions of Hadoop (HDFS) and YARN. Please read the [build instructions](//ci.apache.org/projects/flink/flink-docs-release-1.7/flinkDev/building.html) for more details.
## Running Flink on YARN behind Firewalls
Some YARN clusters use firewalls for controlling the network traffic between the cluster and the rest of the network. In those setups, Flink jobs can only be submitted to a YARN session from within the cluster’s network (behind the firewall). If this is not feasible for production use, Flink allows configuring a port range for all relevant services. With these ranges configured, users can also submit jobs to Flink across the firewall.
Currently, two services are needed to submit a job:
* The JobManager (ApplicationMaster in YARN)
* The BlobServer running within the JobManager.
When submitting a job to Flink, the BlobServer will distribute the jars with the user code to all worker nodes (TaskManagers). The JobManager receives the job itself and triggers the execution.
The two configuration parameters for specifying the ports are the following:
* `yarn.application-master.port`
* `blob.server.port`
These two configuration options accept single ports (for example: “50010”), ranges (“50000-50025”), or a combination of both (“50010,50011,50020-50025,50050-50075”).
(Hadoop uses a similar mechanism; there the configuration parameter is called `yarn.app.mapreduce.am.job.client.port-range`.)
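For example, restricting both services to fixed ranges in `conf/flink-conf.yaml` could look like this (the ranges are chosen only for illustration and must match the ports your firewall actually opens):

```
yarn.application-master.port: 50010-50025
blob.server.port: 50100-50125
```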
## Background / Internals
This section briefly describes how Flink and YARN interact.
![](https://ci.apache.org/projects/flink/flink-docs-release-1.7/fig/FlinkOnYarn.svg)
The YARN client needs to access the Hadoop configuration to connect to the YARN resource manager and to HDFS. It determines the Hadoop configuration using the following strategy:
* Test if `YARN_CONF_DIR`, `HADOOP_CONF_DIR` or `HADOOP_CONF_PATH` are set (in that order). If one of these variables is set, it is used to read the configuration.
* If the above strategy fails (this should not be the case in a correct YARN setup), the client uses the `HADOOP_HOME` environment variable. If it is set, the client tries to access `$HADOOP_HOME/etc/hadoop` (Hadoop 2) and `$HADOOP_HOME/conf` (Hadoop 1).
When starting a new Flink YARN session, the client first checks if the requested resources (containers and memory) are available. After that, it uploads a jar that contains Flink and the configuration to HDFS (step 1).
The next step of the client is to request (step 2) a YARN container to start the _ApplicationMaster_ (step 3). Since the client registered the configuration and jar-file as a resource for the container, the NodeManager of YARN running on that particular machine will take care of preparing the container (e.g. downloading the files). Once that has finished, the _ApplicationMaster_ (AM) is started.
The _JobManager_ and AM run in the same container. Once they have started successfully, the AM knows the address of the JobManager (its own host). It generates a new Flink configuration file for the TaskManagers (so that they can connect to the JobManager). The file is also uploaded to HDFS. Additionally, the _AM_ container serves Flink’s web interface. All ports the YARN code allocates are _ephemeral ports_. This allows users to execute multiple Flink YARN sessions in parallel.
After that, the AM starts allocating the containers for Flink’s TaskManagers, which will download the jar file and the modified configuration from the HDFS. Once these steps are completed, Flink is set up and ready to accept Jobs.
# Mesos Setup
## Background
The Mesos implementation consists of two components: The Application Master and the Worker. The workers are simple TaskManagers which are parameterized by the environment set up by the application master. The most sophisticated component of the Mesos implementation is the application master. The application master currently hosts the following components:
### Mesos Scheduler
The scheduler is responsible for registering the framework with Mesos, requesting resources, and launching worker nodes. The scheduler continuously needs to report back to Mesos to ensure the framework is in a healthy state. To verify the health of the cluster, the scheduler monitors the spawned workers and marks them as failed and restarts them if necessary.
Flink’s Mesos scheduler itself is currently not highly available. However, it persists all necessary information about its state (e.g. configuration, list of workers) in Zookeeper. In the presence of a failure, it relies on an external system to bring up a new scheduler. The scheduler will then register with Mesos again and go through the reconciliation phase. In the reconciliation phase, the scheduler receives a list of running worker nodes. It matches these against the recovered information from Zookeeper and brings the cluster back to the state it was in before the failure.
### Artifact Server
The artifact server is responsible for providing resources to the worker nodes. The resources can be anything from the Flink binaries to shared secrets or configuration files. For instance, in non-containerized environments, the artifact server will provide the Flink binaries. What files will be served depends on the configuration overlay used.
### Flink’s Dispatcher and Web Interface
The Dispatcher and the web interface provide a central point for monitoring, job submission, and other client interaction with the cluster (see [FLIP-6](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077)).
### Startup script and configuration overlays
The startup script provides a way to configure and start the application master. All further configuration is then inherited by the worker nodes. This is achieved using configuration overlays. Configuration overlays provide a way to infer configuration from environment variables and config files which are shipped to the worker nodes.
## DC/OS
This section refers to [DC/OS](https://dcos.io) which is a Mesos distribution with a sophisticated application management layer. It comes pre-installed with Marathon, a service to supervise applications and maintain their state in case of failures.
If you don’t have a running DC/OS cluster, please follow the [instructions on how to install DC/OS on the official website](https://dcos.io/install/).
Once you have a DC/OS cluster, you may install Flink through the DC/OS Universe. In the search prompt, just search for Flink. Alternatively, you can use the DC/OS CLI:
```
dcos package install flink
```
Further information can be found in the [DC/OS examples documentation](https://github.com/dcos/examples/tree/master/1.8/flink).
## Mesos without DC/OS
You can also run Mesos without DC/OS.
### Installing Mesos
Please follow the [instructions on how to setup Mesos on the official website](http://mesos.apache.org/getting-started/).
After installation you have to configure the set of master and agent nodes by creating the files `MESOS_HOME/etc/mesos/masters` and `MESOS_HOME/etc/mesos/slaves`. Each row of these files contains a single hostname on which the respective component will be started (assuming SSH access to these nodes).
Next you have to create `MESOS_HOME/etc/mesos/mesos-master-env.sh` or use the template found in the same directory. In this file, you have to define
```
export MESOS_work_dir=WORK_DIRECTORY
```
and it is recommended to uncomment
```
export MESOS_log_dir=LOGGING_DIRECTORY
```
In order to configure the Mesos agents, you have to create `MESOS_HOME/etc/mesos/mesos-agent-env.sh` or use the template found in the same directory. You have to configure
```
export MESOS_master=MASTER_HOSTNAME:MASTER_PORT
```
and uncomment
```
export MESOS_log_dir=LOGGING_DIRECTORY
export MESOS_work_dir=WORK_DIRECTORY
```
#### Mesos Library
In order to run Java applications with Mesos you have to export `MESOS_NATIVE_JAVA_LIBRARY=MESOS_HOME/lib/libmesos.so` on Linux. Under Mac OS X you have to export `MESOS_NATIVE_JAVA_LIBRARY=MESOS_HOME/lib/libmesos.dylib`.
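For example (substitute your actual `MESOS_HOME` path):

```
# Linux
export MESOS_NATIVE_JAVA_LIBRARY=MESOS_HOME/lib/libmesos.so

# Mac OS X
export MESOS_NATIVE_JAVA_LIBRARY=MESOS_HOME/lib/libmesos.dylib
```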
#### Deploying Mesos
To start your Mesos cluster, use the deployment script `MESOS_HOME/sbin/mesos-start-cluster.sh`. To stop it, use `MESOS_HOME/sbin/mesos-stop-cluster.sh`. More information about the deployment scripts can be found [here](http://mesos.apache.org/documentation/latest/deploy-scripts/).
### Installing Marathon
Optionally, you may also [install Marathon](https://mesosphere.github.io/marathon/docs/) which enables you to run Flink in [high availability (HA) mode](#high-availability).
### Pre-installing Flink vs Docker/Mesos containers
You may install Flink on all of your Mesos Master and Agent nodes. You can also pull the binaries from the Flink web site during deployment and apply your custom configuration before launching the application master. A more convenient and easier to maintain approach is to use Docker containers to manage the Flink binaries and configuration.
This is controlled via the following configuration entries:
```
mesos.resourcemanager.tasks.container.type: mesos or docker
```
If set to ‘docker’, specify the image name:
```
mesos.resourcemanager.tasks.container.image.name: image_name
```
### Standalone
In the `/bin` directory of the Flink distribution, you find two startup scripts which manage the Flink processes in a Mesos cluster:
1. `mesos-appmaster.sh` This starts the Mesos application master which will register the Mesos scheduler. It is also responsible for starting up the worker nodes.
2. `mesos-taskmanager.sh` The entry point for the Mesos worker processes. You don’t need to explicitly execute this script. It is automatically launched by the Mesos worker node to bring up a new TaskManager.
In order to run the `mesos-appmaster.sh` script you have to define `mesos.master` in the `flink-conf.yaml` or pass it via `-Dmesos.master=...` to the Java process.
When executing `mesos-appmaster.sh`, it will create a job manager on the machine where you executed the script. In contrast to that, the task managers will be run as Mesos tasks in the Mesos cluster.
#### General configuration
It is possible to completely parameterize a Mesos application through Java properties passed to the Mesos application master. This also allows specifying general Flink configuration parameters. For example:
```
bin/mesos-appmaster.sh \
-Dmesos.master=master.foobar.org:5050 \
-Djobmanager.heap.mb=1024 \
-Djobmanager.rpc.port=6123 \
-Drest.port=8081 \
-Dmesos.resourcemanager.tasks.mem=4096 \
-Dtaskmanager.heap.mb=3500 \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dparallelism.default=10
```
**Note:** If Flink is in [legacy mode](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#legacy), you should additionally define the number of task managers that are started by Mesos via [`mesos.initial-tasks`](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#mesos-initial-tasks).
### High Availability
You will need to run a service like Marathon or Apache Aurora which takes care of restarting the Flink master process in case of node or process failures. In addition, Zookeeper needs to be configured like described in the [High Availability section of the Flink docs](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/jobmanager_high_availability.html).
#### Marathon
Marathon needs to be set up to launch the `bin/mesos-appmaster.sh` script. In particular, it should also adjust any configuration parameters for the Flink cluster.
Here is an example configuration for Marathon:
```
{
"id": "flink",
"cmd": "$FLINK_HOME/bin/mesos-appmaster.sh -Djobmanager.heap.mb=1024 -Djobmanager.rpc.port=6123 -Drest.port=8081 -Dmesos.resourcemanager.tasks.mem=1024 -Dtaskmanager.heap.mb=1024 -Dtaskmanager.numberOfTaskSlots=2 -Dparallelism.default=2 -Dmesos.resourcemanager.tasks.cpus=1",
"cpus": 1.0,
"mem": 1024
}
```
When running Flink with Marathon, the whole Flink cluster including the job manager will be run as Mesos tasks in the Mesos cluster.
### Configuration parameters
For a list of Mesos specific configuration, refer to the [Mesos section](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#mesos) of the configuration documentation.
# Docker Setup
[Docker](https://www.docker.com) is a popular container runtime. There are Docker images for Apache Flink available on Docker Hub which can be used to deploy a session cluster. The Flink repository also contains tooling to create container images to deploy a job cluster.
## Flink session cluster
A Flink session cluster can be used to run multiple jobs. Each job needs to be submitted to the cluster after it has been deployed.
### Docker images
The [Flink Docker repository](https://hub.docker.com/_/flink/) is hosted on Docker Hub and serves images of Flink version 1.2.1 and later.
Images for each supported combination of Hadoop and Scala are available, and tag aliases are provided for convenience.
Beginning with Flink 1.5, image tags without a Hadoop version suffix (such as `-hadoop28`) correspond to Hadoop-free releases of Flink that do not include a bundled Hadoop distribution.
For example, the following aliases can be used: _(`1.5.y` indicates the latest release of Flink 1.5)_
* `flink:latest` → `flink:<latest-flink>-scala_<latest-scala>`
* `flink:1.5` → `flink:1.5.y-scala_2.11`
* `flink:1.5-hadoop27` → `flink:1.5.y-hadoop27-scala_2.11`
**Note:** The Docker images are provided as a community project by individuals on a best-effort basis. They are not official releases by the Apache Flink PMC.
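As a quick local sanity check, one of these aliased images can be pulled and started in the foreground; this is only a sketch, reusing the `jobmanager` argument and `JOB_MANAGER_RPC_ADDRESS` variable that also appear in the Kubernetes definitions later in this document:

```
docker pull flink:latest
docker run --rm -t -p 8081:8081 \
    -e JOB_MANAGER_RPC_ADDRESS=localhost \
    flink:latest jobmanager
```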
## Flink job cluster
A Flink job cluster is a dedicated cluster which runs a single job. The job is part of the image and, thus, there is no extra job submission needed.
### Docker images
The Flink job cluster image needs to contain the user code jars of the job for which the cluster is started. Therefore, one needs to build a dedicated container image for every job. The `flink-container` module contains a `build.sh` script which can be used to create such an image. Please see the [instructions](https://github.com/apache/flink/blob/release-1.7/flink-container/docker/README.md) for more details.
## Flink with Docker Compose
[Docker Compose](https://docs.docker.com/compose/) is a convenient way to run a group of Docker containers locally.
Example config files for a [session cluster](https://github.com/docker-flink/examples/blob/master/docker-compose.yml) and a [job cluster](https://github.com/apache/flink/blob/release-1.7/flink-container/docker/docker-compose.yml) are available on GitHub.
### Usage
* Launch a cluster in the foreground
```
docker-compose up
```
* Launch a cluster in the background
```
docker-compose up -d
```
* Scale the cluster up or down to _N_ TaskManagers
```
docker-compose scale taskmanager=<N>
```
* Kill the cluster
```
docker-compose kill
```
When the cluster is running, you can visit the web UI at [http://localhost:8081](http://localhost:8081). You can also use the web UI to submit a job to a session cluster.
To submit a job to a session cluster via the command line, you must copy the JAR to the JobManager container and submit the job from there.
For example:
```
$ JOBMANAGER_CONTAINER=$(docker ps --filter name=jobmanager --format={{.ID}})
$ docker cp path/to/jar "$JOBMANAGER_CONTAINER":/job.jar
$ docker exec -t -i "$JOBMANAGER_CONTAINER" flink run /job.jar
```
# Kubernetes Setup
This page describes how to deploy a Flink job and session cluster on [Kubernetes](https://kubernetes.io).
## Setup Kubernetes
Please follow [Kubernetes’ setup guide](https://kubernetes.io/docs/setup/) in order to deploy a Kubernetes cluster. If you want to run Kubernetes locally, we recommend using [MiniKube](https://kubernetes.io/docs/setup/minikube/).
**Note:** If using MiniKube please make sure to execute `minikube ssh 'sudo ip link set docker0 promisc on'` before deploying a Flink cluster. Otherwise Flink components are not able to reference themselves through a Kubernetes service.
## Flink session cluster on Kubernetes
A Flink session cluster is executed as a long-running Kubernetes Deployment. Note that you can run multiple Flink jobs on a session cluster. Each job needs to be submitted to the cluster after the cluster has been deployed.
A basic Flink session cluster deployment in Kubernetes has three components:
* a Deployment/Job which runs the JobManager
* a Deployment for a pool of TaskManagers
* a Service exposing the JobManager’s REST and UI ports
### Deploy Flink session cluster on Kubernetes
Using the resource definitions for a [session cluster](#session-cluster-resource-definitions), launch the cluster with the `kubectl` command:
```
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-deployment.yaml
kubectl create -f taskmanager-deployment.yaml
```
You can then access the Flink UI via `kubectl proxy`:
1. Run `kubectl proxy` in a terminal
2. Navigate to [http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy](http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy) in your browser
In order to terminate the Flink session cluster, use `kubectl`:
```
kubectl delete -f jobmanager-deployment.yaml
kubectl delete -f taskmanager-deployment.yaml
kubectl delete -f jobmanager-service.yaml
```
## Flink job cluster on Kubernetes
A Flink job cluster is a dedicated cluster which runs a single job. The job is part of the image and, thus, there is no extra job submission needed.
### Creating the job-specific image
The Flink job cluster image needs to contain the user code jars of the job for which the cluster is started. Therefore, one needs to build a dedicated container image for every job. Please follow these [instructions](https://github.com/apache/flink/blob/release-1.7/flink-container/docker/README.md) to build the Docker image.
### Deploy Flink job cluster on Kubernetes
In order to deploy a job cluster on Kubernetes please follow these [instructions](https://github.com/apache/flink/blob/release-1.7/flink-container/kubernetes/README.md#deploy-flink-job-cluster).
## Advanced Cluster Deployment
An early version of a [Flink Helm chart](https://github.com/docker-flink/examples) is available on GitHub.
## Appendix
### Session cluster resource definitions
The Deployment definitions use the pre-built image `flink:latest` which can be found [on Docker Hub](https://hub.docker.com/r/_/flink/). The image is built from this [GitHub repository](https://github.com/docker-flink/docker-flink).
`jobmanager-deployment.yaml`
<figure class="highlight">
```
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: flink-jobmanager
spec:
replicas: 1
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
containers:
- name: jobmanager
image: flink:latest
args:
- jobmanager
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob
- containerPort: 6125
name: query
- containerPort: 8081
name: ui
env:
- name: JOB_MANAGER_RPC_ADDRESS
value: flink-jobmanager
```
</figure>
`taskmanager-deployment.yaml`
<figure class="highlight">
```
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: flink-taskmanager
spec:
replicas: 2
template:
metadata:
labels:
app: flink
component: taskmanager
spec:
containers:
- name: taskmanager
image: flink:latest
args:
- taskmanager
ports:
- containerPort: 6121
name: data
- containerPort: 6122
name: rpc
- containerPort: 6125
name: query
env:
- name: JOB_MANAGER_RPC_ADDRESS
value: flink-jobmanager
```
</figure>
`jobmanager-service.yaml`
<figure class="highlight">
```
apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager
spec:
ports:
- name: rpc
port: 6123
- name: blob
port: 6124
- name: query
port: 6125
- name: ui
port: 8081
selector:
app: flink
component: jobmanager
```
</figure>
# Configuring Dependencies, Connectors, Libraries
Every Flink application depends on a set of Flink libraries. At the bare minimum, the application depends on the Flink APIs. Many applications depend in addition on certain connector libraries (like Kafka, Cassandra, etc.). When running Flink applications (either in a distributed deployment, or in the IDE for testing), the Flink runtime library must be available as well.
## Flink Core and Application Dependencies
As with most systems that run user-defined applications, there are two broad categories of dependencies and libraries in Flink:
* **Flink Core Dependencies**: Flink itself consists of a set of classes and dependencies that are needed to run the system, for example coordination, networking, checkpoints, failover, APIs, operations (such as windowing), resource management, etc. The set of all these classes and dependencies forms the core of Flink’s runtime and must be present when a Flink application is started.
These core classes and dependencies are packaged in the `flink-dist` jar. They are part of Flink’s `lib` folder and part of the basic Flink container images. Think of these dependencies as similar to Java’s core library (`rt.jar`, `charsets.jar`, etc.), which contains the classes like `String` and `List`.
The Flink Core Dependencies do not contain any connectors or libraries (CEP, SQL, ML, etc.) in order to avoid having an excessive number of dependencies and classes in the classpath by default. In fact, we try to keep the core dependencies as slim as possible to keep the default classpath small and avoid dependency clashes.
* The **User Application Dependencies** are all connectors, formats, or libraries that a specific user application needs.
The user application is typically packaged into an _application jar_, which contains the application code and the required connector and library dependencies.
The user application dependencies explicitly do not include the Flink DataSet / DataStream APIs and runtime dependencies, because those are already part of Flink’s Core Dependencies.
## Setting up a Project: Basic Dependencies
Every Flink application needs as the bare minimum the API dependencies, to develop against. For Maven, you can use the [Java Project Template](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/projectsetup/java_api_quickstart.html) or [Scala Project Template](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/projectsetup/scala_api_quickstart.html) to create a program skeleton with these initial dependencies.
When setting up a project manually, you need to add the following dependencies for the Java/Scala API (here presented in Maven syntax, but the same dependencies apply to other build tools such as Gradle or SBT as well).
<figure class="highlight">
```
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.7.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.7.1</version>
<scope>provided</scope>
</dependency>
```
</figure>
<figure class="highlight">
```
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.7.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.7.1</version>
<scope>provided</scope>
</dependency>
```
</figure>
**Important:** Please note that all these dependencies have their scope set to _provided_. That means that they are needed to compile against, but that they should not be packaged into the project’s resulting application jar file - these dependencies are Flink Core Dependencies, which are already available in any setup.
It is highly recommended to keep the dependencies in scope _provided_. If they are not set to _provided_, the best case is that the resulting JAR becomes excessively large, because it also contains all Flink core dependencies. The worst case is that the Flink core dependencies that are added to the application’s jar file clash with some of your own dependency versions (which is normally avoided through inverted classloading).
**Note on IntelliJ:** To make the applications run within IntelliJ IDEA, the Flink dependencies need to be declared in scope _compile_ rather than _provided_. Otherwise IntelliJ will not add them to the classpath and the in-IDE execution will fail with a `NoClassDefFoundError`. To avoid having to declare the dependency scope as _compile_ (which is not recommended, see above), the Java and Scala project templates linked above use a trick: they add a profile that selectively activates when the application is run in IntelliJ and only then promotes the dependencies to scope _compile_, without affecting the packaging of the JAR files.
## Adding Connector and Library Dependencies
Most applications need specific connectors or libraries to run, for example a connector to Kafka, Cassandra, etc. These connectors are not part of Flink’s core dependencies and must hence be added as dependencies to the application.
Below is an example adding the connector for Kafka 0.10 as a dependency (Maven syntax):
<figure class="highlight">
```
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.7.1</version>
</dependency>
```
</figure>
We recommend packaging the application code and all its required dependencies into one _jar-with-dependencies_ which we refer to as the _application jar_. The application jar can be submitted to an already running Flink cluster, or added to a Flink application container image.
Projects created from the [Java Project Template](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/projectsetup/java_api_quickstart.html) or [Scala Project Template](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/projectsetup/scala_api_quickstart.html) are configured to automatically include the application dependencies into the application jar when running `mvn clean package`. For projects that are not set up from those templates, we recommend adding the Maven Shade Plugin (as listed in the Appendix below) to build the application jar with all required dependencies.
**Important:** For Maven (and other build tools) to correctly package the dependencies into the application jar, these application dependencies must be specified in scope _compile_ (unlike the core dependencies, which must be specified in scope _provided_).
## Scala Versions
Scala versions (2.10, 2.11, 2.12, etc.) are not binary compatible with one another. For that reason, Flink for Scala 2.11 cannot be used with an application that uses Scala 2.12.
All Flink dependencies that (transitively) depend on Scala are suffixed with the Scala version that they are built for, for example `flink-streaming-scala_2.11`.
Developers that only use Java can pick any Scala version; Scala developers need to pick the Scala version that matches their application’s Scala version.
Please refer to the [build guide](//ci.apache.org/projects/flink/flink-docs-release-1.7/flinkDev/building.html#scala-versions) for details on how to build Flink for a specific Scala version.
**Note:** Because of major breaking changes in Scala 2.12, Flink 1.5 currently builds only for Scala 2.11. We aim to add support for Scala 2.12 in the next versions.
## Hadoop Dependencies
**General rule: It should never be necessary to add Hadoop dependencies directly to your application.** _(The only exception being when using existing Hadoop input-/output formats with Flink’s Hadoop compatibility wrappers)_
If you want to use Flink with Hadoop, you need to have a Flink setup that includes the Hadoop dependencies, rather than adding Hadoop as an application dependency. Please refer to the [Hadoop Setup Guide](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/hadoop.html) for details.
There are two main reasons for that design:
* Some Hadoop interaction happens in Flink’s core, possibly before the user application is started, for example setting up HDFS for checkpoints, authenticating via Hadoop’s Kerberos tokens, or deployment on YARN.
* Flink’s inverted classloading approach hides many transitive dependencies from the core dependencies. That applies not only to Flink’s own core dependencies, but also to Hadoop’s dependencies when present in the setup. That way, applications can use different versions of the same dependencies without running into dependency conflicts (and trust us, that’s a big deal, because Hadoop’s dependency tree is huge).
If you need Hadoop dependencies during testing or development inside the IDE (for example for HDFS access), please configure the scope of these dependencies as _test_ or _provided_.
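A sketch of such a dependency declaration (the artifact and version are examples only, not a recommendation):

```
<!-- example only: a Hadoop client dependency kept out of the application jar -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.8.3</version>
    <scope>provided</scope>
</dependency>
```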
## Appendix: Template for building a Jar with Dependencies
To build an application JAR that contains all dependencies required for declared connectors and libraries, you can use the following shade plugin definition:
<figure class="highlight">
```
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>my.programs.main.clazz</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
```
</figure>
# Google Compute Engine Setup
This documentation provides instructions on how to set up Flink fully automatically with Hadoop 1 or Hadoop 2 on top of a [Google Compute Engine](https://cloud.google.com/compute/) cluster. This is made possible by Google’s [bdutil](https://cloud.google.com/hadoop/bdutil), which starts a cluster and deploys Flink with Hadoop. To get started, just follow the steps below.
# Prerequisites
## Install Google Cloud SDK
Please follow the instructions on how to set up the [Google Cloud SDK](https://cloud.google.com/sdk/). In particular, make sure to authenticate with Google Cloud using the following command:
```
gcloud auth login
```
## Install bdutil
At the moment, there is no bdutil release yet which includes the Flink extension. However, you can get the latest version of bdutil with Flink support from [GitHub](https://github.com/GoogleCloudPlatform/bdutil):
```
git clone https://github.com/GoogleCloudPlatform/bdutil.git
```
After you have downloaded the source, change into the newly created `bdutil` directory and continue with the next steps.
# Deploying Flink on Google Compute Engine
## Set up a bucket
If you have not done so, create a bucket for the bdutil config and staging files. A new bucket can be created with gsutil:
```
gsutil mb gs://<bucket_name>
```
## Adapt the bdutil config
To deploy Flink with bdutil, adapt at least the following variables in `bdutil_env.sh`.
```
CONFIGBUCKET="<bucket_name>"
PROJECT="<compute_engine_project_name>"
NUM_WORKERS=<number_of_workers>
# set this to 'n1-standard-2' if you're using the free trial
GCE_MACHINE_TYPE="<gce_machine_type>"
# for example: "europe-west1-d"
GCE_ZONE="<gce_zone>"
```
## Adapt the Flink config
bdutil’s Flink extension handles the configuration for you. You may additionally adjust configuration variables in `extensions/flink/flink_env.sh`. If you want to make further configuration changes, please take a look at [configuring Flink](../config.html). You will have to restart Flink after changing its configuration, using `bin/stop-cluster` and `bin/start-cluster`.
## Bring up a cluster with Flink
To bring up the Flink cluster on Google Compute Engine, execute:
```
./bdutil -e extensions/flink/flink_env.sh deploy
```
## Run a Flink example job:
```
./bdutil shell
cd /home/hadoop/flink-install/bin
./flink run ../examples/batch/WordCount.jar gs://dataflow-samples/shakespeare/othello.txt gs://<bucket_name>/output
```
## Shut down your cluster
Shutting down a cluster is as simple as executing
```
./bdutil -e extensions/flink/flink_env.sh delete
```
# MapR Setup
This documentation provides instructions on how to prepare Flink for YARN executions on a [MapR](https://mapr.com/) cluster.
## Running Flink on YARN with MapR
The instructions below assume MapR version 5.2.0. They will guide you through submitting [Flink on YARN](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/yarn_setup.html) jobs or sessions to a MapR cluster.
### Building Flink for MapR
In order to run Flink on MapR, Flink needs to be built with MapR’s own Hadoop and Zookeeper distribution. Simply build Flink using Maven with the following command from the project root directory:
<figure class="highlight">
```
mvn clean install -DskipTests -Pvendor-repos,mapr \
-Dhadoop.version=2.7.0-mapr-1607 \
-Dzookeeper.version=3.4.5-mapr-1604
```
</figure>
The `vendor-repos` build profile adds MapR’s repository to the build so that MapR’s Hadoop / Zookeeper dependencies can be fetched. The `mapr` build profile additionally resolves some dependency clashes between MapR and Flink, as well as ensuring that the native MapR libraries on the cluster nodes are used. Both profiles must be activated.
By default the `mapr` profile builds with Hadoop / Zookeeper dependencies for MapR version 5.2.0, so you don’t need to explicitly override the `hadoop.version` and `zookeeper.version` properties. For different MapR versions, simply override these properties with appropriate values. The corresponding Hadoop / Zookeeper distributions for each MapR version can be found in the MapR documentation, for example [here](http://maprdocs.mapr.com/home/DevelopmentGuide/MavenArtifacts.html).
### Job Submission Client Setup
The client submitting Flink jobs to MapR also needs to be prepared with the setup steps below.
Ensure that MapR’s JAAS config file is picked up to avoid login failures:
<figure class="highlight">
```
export JVM_ARGS=-Djava.security.auth.login.config=/opt/mapr/conf/mapr.login.conf
```
</figure>
Make sure that the `yarn.nodemanager.resource.cpu-vcores` property is set in `yarn-site.xml`:
<figure class="highlight">
```
<!-- in /opt/mapr/hadoop/hadoop-2.7.0/etc/hadoop/yarn-site.xml -->
<configuration>
...
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>...</value>
</property>
...
</configuration>
```
</figure>
Also remember to set the `YARN_CONF_DIR` or `HADOOP_CONF_DIR` environment variables to the path where `yarn-site.xml` is located:
<figure class="highlight">
```
export YARN_CONF_DIR=/opt/mapr/hadoop/hadoop-2.7.0/etc/hadoop/
export HADOOP_CONF_DIR=/opt/mapr/hadoop/hadoop-2.7.0/etc/hadoop/
```
</figure>
Make sure that the MapR native libraries are picked up in the classpath:
<figure class="highlight">
```
export FLINK_CLASSPATH=/opt/mapr/lib/*
```
</figure>
If you’ll be starting Flink on YARN sessions with `yarn-session.sh`, the following is also required:
<figure class="highlight">
```
export CC_CLASSPATH=/opt/mapr/lib/*
```
</figure>
## Running Flink with a Secured MapR Cluster
_Note: In Flink 1.2.0, Flink’s Kerberos authentication for YARN execution has a bug that forbids it to work with MapR Security. Please upgrade to later Flink versions in order to use Flink with a secured MapR cluster. For more details, please see [FLINK-5949](https://issues.apache.org/jira/browse/FLINK-5949)._
Flink’s [Kerberos authentication](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/security-kerberos.html) is independent of [MapR’s Security authentication](http://maprdocs.mapr.com/home/SecurityGuide/Configuring-MapR-Security.html). With the above build procedures and environment variable setups, Flink does not require any additional configuration to work with MapR Security.
Users simply need to log in by using MapR’s `maprlogin` authentication utility. Users who have not acquired MapR login credentials will not be able to submit Flink jobs and will see errors such as:
<figure class="highlight">
```
java.lang.Exception: unable to establish the security context
Caused by: o.a.f.r.security.modules.SecurityModule$SecurityInstallException: Unable to set the Hadoop login user
Caused by: java.io.IOException: failure to login: Unable to obtain MapR credentials
```
</figure>
# Hadoop Integration
## Configuring Flink with Hadoop Classpaths
Flink will use the environment variable `HADOOP_CLASSPATH` to augment the classpath that is used when starting Flink components such as the Client, JobManager, or TaskManager. Most Hadoop distributions and cloud environments will not set this variable by default, so if the Hadoop classpath should be picked up by Flink, the environment variable must be exported on all machines that are running Flink components.
When running on YARN, this is usually not a problem because the components running inside YARN will be started with the Hadoop classpaths, but it can happen that the Hadoop dependencies must be in the classpath when submitting a job to YARN. For this, it’s usually enough to do a
<figure class="highlight">
```
export HADOOP_CLASSPATH=`hadoop classpath`
```
</figure>
in the shell. Note that `hadoop` is the Hadoop binary and that `classpath` is an argument that makes it print the configured Hadoop classpath.
# JobManager High Availability (HA)
The JobManager coordinates every Flink deployment. It is responsible for both _scheduling_ and _resource management_.
By default, there is a single JobManager instance per Flink cluster. This creates a _single point of failure_ (SPOF): if the JobManager crashes, no new programs can be submitted and running programs fail.
With JobManager High Availability, you can recover from JobManager failures and thereby eliminate the _SPOF_. You can configure high availability for both **standalone** and **YARN clusters**.
## Standalone Cluster High Availability
The general idea of JobManager high availability for standalone clusters is that there is a **single leading JobManager** at any time and **multiple standby JobManagers** to take over leadership in case the leader fails. This guarantees that there is **no single point of failure** and programs can make progress as soon as a standby JobManager has taken leadership. There is no explicit distinction between standby and master JobManager instances. Each JobManager can take the role of master or standby.
As an example, consider the following setup with three JobManager instances:
![](https://ci.apache.org/projects/flink/flink-docs-release-1.7/fig/jobmanager_ha_overview.png)
### Configuration
To enable JobManager High Availability you have to set the **high-availability mode** to _zookeeper_, configure a **ZooKeeper quorum** and set up a **masters file** with all JobManagers hosts and their web UI ports.
Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for _distributed coordination_ between all running JobManager instances. ZooKeeper is a separate service from Flink, which provides highly reliable distributed coordination via leader election and light-weight consistent state storage. Check out [ZooKeeper’s Getting Started Guide](http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html) for more information about ZooKeeper. Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper) installation.
#### Masters File (masters)
In order to start an HA-cluster configure the _masters_ file in `conf/masters`:
* **masters file**: The _masters file_ contains all hosts, on which JobManagers are started, and the ports to which the web user interface binds.
```
jobManagerAddress1:webUIPort1
[...]
jobManagerAddressX:webUIPortX
```
By default, the job manager will pick a _random port_ for inter process communication. You can change this via the **`high-availability.jobmanager.port`** key. This key accepts single ports (e.g. `50010`), ranges (`50000-50025`), or a combination of both (`50010,50011,50020-50025,50050-50075`).
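For example, to pin the inter-process communication port to a fixed range (the range itself is illustrative):

```
high-availability.jobmanager.port: 50000-50025
```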
#### Config File (flink-conf.yaml)
In order to start an HA-cluster add the following configuration keys to `conf/flink-conf.yaml`:
* **high-availability mode** (required): The _high-availability mode_ has to be set in `conf/flink-conf.yaml` to _zookeeper_ in order to enable high availability mode. Alternatively, this option can be set to the FQN of the factory class Flink should use to create the HighAvailabilityServices instance.
```
high-availability: zookeeper
```
* **ZooKeeper quorum** (required): A _ZooKeeper quorum_ is a replicated group of ZooKeeper servers, which provide the distributed coordination service.
```
high-availability.zookeeper.quorum: address1:2181[,...],addressX:2181
```
Each _addressX:port_ refers to a ZooKeeper server, which is reachable by Flink at the given address and port.
* **ZooKeeper root** (recommended): The _root ZooKeeper node_, under which all cluster nodes are placed.
```
high-availability.zookeeper.path.root: /flink
```
* **ZooKeeper cluster-id** (recommended): The _cluster-id ZooKeeper node_, under which all required coordination data for a cluster is placed.
```
high-availability.cluster-id: /default_ns # important: customize per cluster
```
**Important**: You should not set this value manually when running a YARN cluster, a per-job YARN session, or on another cluster manager. In those cases a cluster-id is automatically being generated based on the application id. Manually setting a cluster-id overrides this behaviour in YARN. Specifying a cluster-id with the -z CLI option, in turn, overrides manual configuration. If you are running multiple Flink HA clusters on bare metal, you have to manually configure separate cluster-ids for each cluster.
* **Storage directory** (required): JobManager metadata is persisted in the file system _storageDir_ and only a pointer to this state is stored in ZooKeeper.
```
high-availability.storageDir: hdfs:///flink/recovery
```
The `storageDir` stores all metadata needed to recover from a JobManager failure.
After configuring the masters and the ZooKeeper quorum, you can use the provided cluster startup scripts as usual. They will start an HA-cluster. Keep in mind that the **ZooKeeper quorum has to be running** when you call the scripts and make sure to **configure a separate ZooKeeper root path** for each HA cluster you are starting.
#### Example: Standalone Cluster with 2 JobManagers
1. **Configure high availability mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:
```
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one # important: customize per cluster
high-availability.storageDir: hdfs:///flink/recovery
```
2. **Configure masters** in `conf/masters`:
```
localhost:8081
localhost:8082
```
3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it’s only possible to run a single ZooKeeper server per machine):
```
server.0=localhost:2888:3888
```
4. **Start ZooKeeper quorum**:
```
$ bin/start-zookeeper-quorum.sh
Starting zookeeper daemon on host localhost.
```
5. **Start an HA-cluster**:
```
$ bin/start-cluster.sh
Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum.
Starting jobmanager daemon on host localhost.
Starting jobmanager daemon on host localhost.
Starting taskmanager daemon on host localhost.
```
6. **Stop ZooKeeper quorum and cluster**:
```
$ bin/stop-cluster.sh
Stopping taskmanager daemon (pid: 7647) on localhost.
Stopping jobmanager daemon (pid: 7495) on host localhost.
Stopping jobmanager daemon (pid: 7349) on host localhost.
$ bin/stop-zookeeper-quorum.sh
Stopping zookeeper daemon (pid: 7101) on host localhost.
```
## YARN Cluster High Availability
When running a highly available YARN cluster, **we don’t run multiple JobManager (ApplicationMaster) instances**, but only one, which is restarted by YARN on failures. The exact behaviour depends on the specific YARN version you are using.
### Configuration
#### Maximum Application Master Attempts (yarn-site.xml)
You have to configure the maximum number of attempts for the application masters for **your** YARN setup in `yarn-site.xml`:
<figure class="highlight">
```
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
<description>
The maximum number of application master execution attempts.
</description>
</property>
```
</figure>
The default for current YARN versions is 2 (meaning a single JobManager failure is tolerated).
#### Application Attempts (flink-conf.yaml)
In addition to the HA configuration ([see above](#configuration)), you have to configure the maximum attempts in `conf/flink-conf.yaml`:
```
yarn.application-attempts: 10
```
This means that the application can be restarted 9 times for failed attempts before YARN fails the application (9 retries + 1 initial attempt). Additional restarts can be performed by YARN if required by YARN operations: Preemption, node hardware failures or reboots, or NodeManager resyncs. These restarts are not counted against `yarn.application-attempts`, see [Jian Fang’s blog post](http://johnjianfang.blogspot.de/2015/04/the-number-of-maximum-attempts-of-yarn.html). It’s important to note that `yarn.resourcemanager.am.max-attempts` is an upper bound for the application restarts. Therefore, the number of application attempts set within Flink cannot exceed the YARN cluster setting with which YARN was started.
#### Container Shutdown Behaviour
* **YARN 2.3.0 < version < 2.4.0**. All containers are restarted if the application master fails.
* **YARN 2.4.0 < version < 2.6.0**. TaskManager containers are kept alive across application master failures. This has the advantage that the startup time is faster and that the user does not have to wait for obtaining the container resources again.
* **YARN 2.6.0 <= version**: Sets the attempt failure validity interval to Flink’s Akka timeout value. The attempt failure validity interval says that an application is only killed after the system has seen the maximum number of application attempts during one interval. This prevents a long-lasting job from depleting its application attempts.
**Note**: Hadoop YARN 2.4.0 has a major bug (fixed in 2.5.0) preventing container restarts from a restarted Application Master/Job Manager container. See [FLINK-4142](https://issues.apache.org/jira/browse/FLINK-4142) for details. We recommend using at least Hadoop 2.5.0 for high availability setups on YARN.
#### Example: Highly Available YARN Session
1. **Configure HA mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:
```
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.storageDir: hdfs:///flink/recovery
high-availability.zookeeper.path.root: /flink
yarn.application-attempts: 10
```
2. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it’s only possible to run a single ZooKeeper server per machine):
```
server.0=localhost:2888:3888
```
3. **Start ZooKeeper quorum**:
```
$ bin/start-zookeeper-quorum.sh
Starting zookeeper daemon on host localhost.
```
4. **Start an HA-cluster**:
```
$ bin/yarn-session.sh -n 2
```
## Configuring for Zookeeper Security
If ZooKeeper is running in secure mode with Kerberos, you can override the following configurations in `flink-conf.yaml` as necessary:
```
zookeeper.sasl.service-name: zookeeper # default is "zookeeper". If the ZooKeeper quorum is configured
# with a different service name then it can be supplied here.
zookeeper.sasl.login-context-name: Client # default is "Client". The value needs to match one of the values
# configured in "security.kerberos.login.contexts".
```
For more information on Flink configuration for Kerberos security, please see [here](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html). You can also find [here](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/security-kerberos.html) further details on how Flink internally sets up Kerberos-based security.
## Bootstrap ZooKeeper
If you don’t have a running ZooKeeper installation, you can use the helper scripts, which ship with Flink.
There is a ZooKeeper configuration template in `conf/zoo.cfg`. You can configure the hosts to run ZooKeeper on with the `server.X` entries, where X is a unique ID of each server:
```
server.X=addressX:peerPort:leaderPort
[...]
server.Y=addressY:peerPort:leaderPort
```
The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on each of the configured hosts. The started processes start ZooKeeper servers via a Flink wrapper, which reads the configuration from `conf/zoo.cfg` and makes sure to set some required configuration values for convenience. In production setups, it is recommended to manage your own ZooKeeper installation.
# Checkpoints
## Overview
Checkpoints make state in Flink fault tolerant by allowing state and the corresponding stream positions to be recovered, thereby giving the application the same semantics as a failure-free execution.
See [Checkpointing](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/checkpointing.html) for how to enable and configure checkpoints for your program.
## Retained Checkpoints
Checkpoints are by default not retained and are only used to resume a job from failures. They are deleted when a program is cancelled. You can, however, configure periodic checkpoints to be retained. Depending on the configuration these _retained_ checkpoints are _not_ automatically cleaned up when the job fails or is canceled. This way, you will have a checkpoint around to resume from if your job fails.
<figure class="highlight">
```
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
```
</figure>
The `ExternalizedCheckpointCleanup` mode configures what happens with checkpoints when you cancel the job:
* **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case.
* **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails.
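As a minimal, hedged sketch of how the cleanup mode above is typically wired up together with checkpointing itself (the 60-second interval and the choice of `RETAIN_ON_CANCELLATION` are only illustrative):
<figure class="highlight">
```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// checkpoint every 60 seconds (interval chosen only for illustration)
env.enableCheckpointing(60000);

// keep the latest checkpoint around when the job is cancelled,
// so that it can be used to resume the job later
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
```
</figure>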
### Directory Structure
Similarly to [savepoints](savepoints.html), a checkpoint consists of a meta data file and, depending on the state backend, some additional data files. The meta data file and data files are stored in the directory that is configured via `state.checkpoints.dir` in the configuration files, and can also be specified per job in the code.
#### Configure globally via configuration files
<figure class="highlight">
```
state.checkpoints.dir: hdfs:///checkpoints/
```
</figure>
#### Configure per job when constructing the state backend
<figure class="highlight">
```
env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/"));
```
</figure>
### Difference to Savepoints
Checkpoints have a few differences from [savepoints](savepoints.html). They
* use a state backend specific (low-level) data format, may be incremental.
* do not support Flink specific features like rescaling.
### Resuming from a retained checkpoint
A job may be resumed from a checkpoint just as from a savepoint by using the checkpoint’s meta data file instead (see the [savepoint restore guide](../cli.html#restore-a-savepoint)). Note that if the meta data file is not self-contained, the jobmanager needs to have access to the data files it refers to (see [Directory Structure](#directory-structure) above).
<figure class="highlight">
```
$ bin/flink run -s :checkpointMetaDataPath [:runArgs]
```
</figure>
# Savepoints
## What is a Savepoint? How is a Savepoint different from a Checkpoint?
A Savepoint is a consistent image of the execution state of a streaming job, created via Flink’s [checkpointing mechanism](//ci.apache.org/projects/flink/flink-docs-release-1.7/internals/stream_checkpointing.html). You can use Savepoints to stop-and-resume, fork, or update your Flink jobs. Savepoints consist of two parts: a directory with (typically large) binary files on stable storage (e.g. HDFS, S3, …) and a (relatively small) meta data file. The files on stable storage represent the net data of the job’s execution state image. The meta data file of a Savepoint contains (primarily) pointers to all files on stable storage that are part of the Savepoint, in the form of absolute paths.
**Attention:** In order to allow upgrades between programs and Flink versions, it is important to check out the following section about [assigning IDs to your operators](#assigning-operator-ids).
Conceptually, Flink’s Savepoints are different from Checkpoints in a similar way that backups are different from recovery logs in traditional database systems. The primary purpose of Checkpoints is to provide a recovery mechanism in case of unexpected job failures. A Checkpoint’s lifecycle is managed by Flink, i.e. a Checkpoint is created, owned, and released by Flink - without user interaction. As a method of recovery and being periodically triggered, two main design goals for the Checkpoint implementation are i) being as lightweight to create and ii) being as fast to restore from as possible. Optimizations towards those goals can exploit certain properties, e.g. that the job code doesn’t change between the execution attempts. Checkpoints are usually dropped after the job was terminated by the user (except if explicitly configured as retained Checkpoints).
In contrast to all this, Savepoints are created, owned, and deleted by the user. Their use-case is for planned, manual backup and resume. For example, this could be an update of your Flink version, changing your job graph, changing parallelism, forking a second job like for a red/blue deployment, and so on. Of course, Savepoints must survive job termination. Conceptually, Savepoints can be a bit more expensive to produce and restore and focus more on portability and support for the previously mentioned changes to the job.
Those conceptual differences aside, the current implementations of Checkpoints and Savepoints basically use the same code and produce the same "format". However, there is currently one exception from this, and we might introduce more differences in the future. The exception is incremental checkpoints with the RocksDB state backend. They use some RocksDB-internal format instead of Flink’s native savepoint format. This makes them the first instance of a more lightweight checkpointing mechanism, compared to Savepoints.
## Assigning Operator IDs
It is **highly recommended** that you adjust your programs as described in this section in order to be able to upgrade your programs in the future. The main required change is to manually specify operator IDs via the **`uid(String)`** method. These IDs are used to scope the state of each operator.
<figure class="highlight">
```
DataStream<String> stream = env
// Stateful source (e.g. Kafka) with ID
.addSource(new StatefulSource())
.uid("source-id") // ID for the source operator
.shuffle()
// Stateful mapper with ID
.map(new StatefulMapper())
.uid("mapper-id") // ID for the mapper
// Stateless printing sink
.print(); // Auto-generated ID
```
</figure>
If you don’t specify the IDs manually they will be generated automatically. You can automatically restore from the savepoint as long as these IDs do not change. The generated IDs depend on the structure of your program and are sensitive to program changes. Therefore, it is highly recommended to assign these IDs manually.
### Savepoint State
You can think of a savepoint as holding a map of `Operator ID -> State` for each stateful operator:
<figure class="highlight">
```
Operator ID | State
------------+------------------------
source-id | State of StatefulSource
mapper-id | State of StatefulMapper
```
</figure>
In the above example, the print sink is stateless and hence not part of the savepoint state. By default, we try to map each entry of the savepoint back to the new program.
## Operations
You can use the [command line client](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/cli.html#savepoints) to _trigger savepoints_, _cancel a job with a savepoint_, _resume from savepoints_, and _dispose savepoints_.
With Flink >= 1.2.0 it is also possible to _resume from savepoints_ using the web UI.
### Triggering Savepoints
When triggering a savepoint, a new savepoint directory is created where the data as well as the meta data will be stored. The location of this directory can be controlled by [configuring a default target directory](#configuration) or by specifying a custom target directory with the trigger commands (see the [`:targetDirectory` argument](#trigger-a-savepoint)).
**Attention:** The target directory has to be a location accessible by both the JobManager(s) and TaskManager(s) e.g. a location on a distributed file-system.
For example with a `FsStateBackend` or `RocksDBStateBackend`:
<figure class="highlight">
```
# Savepoint target directory
/savepoints/
# Savepoint directory
/savepoints/savepoint-:shortjobid-:savepointid/
# Savepoint file contains the checkpoint meta data
/savepoints/savepoint-:shortjobid-:savepointid/_metadata
# Savepoint state
/savepoints/savepoint-:shortjobid-:savepointid/...
```
</figure>
**Note:** Although it looks as if the savepoints may be moved, it is currently not possible due to absolute paths in the `_metadata` file. Please follow [FLINK-5778](https://issues.apache.org/jira/browse/FLINK-5778) for progress on lifting this restriction.
Note that if you use the `MemoryStateBackend`, metadata _and_ savepoint state will be stored in the `_metadata` file. Since it is self-contained, you may move the file and restore from any location.
**Attention:** It is discouraged to move or delete the last savepoint of a running job, because this might interfere with failure recovery. Savepoints have side effects on exactly-once sinks; therefore, to ensure exactly-once semantics, the savepoint will be used for recovery if there is no checkpoint after the last savepoint.
#### Trigger a Savepoint
<figure class="highlight">
```
$ bin/flink savepoint :jobId [:targetDirectory]
```
</figure>
This will trigger a savepoint for the job with ID `:jobId` and return the path of the created savepoint. You need this path to restore and dispose savepoints.
#### Trigger a Savepoint with YARN
<figure class="highlight">
```
$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
```
</figure>
This will trigger a savepoint for the job with ID `:jobId` and YARN application ID `:yarnAppId`, and return the path of the created savepoint.
#### Cancel Job with Savepoint
<figure class="highlight">
```
$ bin/flink cancel -s [:targetDirectory] :jobId
```
</figure>
This will atomically trigger a savepoint for the job with ID `:jobId` and cancel the job. Furthermore, you can specify a target file system directory to store the savepoint in. The directory needs to be accessible by the JobManager(s) and TaskManager(s).
### Resuming from Savepoints
<figure class="highlight">
```
$ bin/flink run -s :savepointPath [:runArgs]
```
</figure>
This submits a job and specifies a savepoint to resume from. You may give a path to either the savepoint’s directory or the `_metadata` file.
#### Allowing Non-Restored State
By default the resume operation will try to map all state of the savepoint back to the program you are restoring with. If you dropped an operator, you can allow skipping state that cannot be mapped to the new program via the `--allowNonRestoredState` (short: `-n`) option:
<figure class="highlight">
```
$ bin/flink run -s :savepointPath -n [:runArgs]
```
</figure>
### Disposing Savepoints
<figure class="highlight">
```
$ bin/flink savepoint -d :savepointPath
```
</figure>
This disposes the savepoint stored in `:savepointPath`.
Note that it is possible to also manually delete a savepoint via regular file system operations without affecting other savepoints or checkpoints (recall that each savepoint is self-contained). Up to Flink 1.2, this was a more tedious task which was performed with the savepoint command above.
### Configuration
You can configure a default savepoint target directory via the `state.savepoints.dir` key. When triggering savepoints, this directory will be used to store the savepoint. You can overwrite the default by specifying a custom target directory with the trigger commands (see the [`:targetDirectory` argument](#trigger-a-savepoint)).
<figure class="highlight">
```
# Default savepoint target directory
state.savepoints.dir: hdfs:///flink/savepoints
```
</figure>
If you neither configure a default nor specify a custom target directory, triggering the savepoint will fail.
**Attention:** The target directory has to be a location accessible by both the JobManager(s) and TaskManager(s) e.g. a location on a distributed file-system.
## F.A.Q
### Should I assign IDs to all operators in my job?
As a rule of thumb, yes. Strictly speaking, it is sufficient to only assign IDs via the `uid` method to the stateful operators in your job. The savepoint only contains state for these operators and stateless operators are not part of the savepoint.
In practice, it is recommended to assign IDs to all operators, because some of Flink’s built-in operators, like the window operator, are also stateful and it is not obvious which built-in operators are actually stateful and which are not. If you are absolutely certain that an operator is stateless, you can skip the `uid` method. The sketch below illustrates this.
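For illustration only, a hedged sketch of assigning `uid`s to every operator in a small pipeline, including the built-in window operator and the sink; it assumes a `StreamExecutionEnvironment` named `env`, the `StatefulSource`, `CountingMapper`, and `StatefulSink` classes are hypothetical placeholders, and the 10-second window is arbitrary:
<figure class="highlight">
```
DataStream<Tuple2<String, Integer>> counts = env
    .addSource(new StatefulSource())   // hypothetical stateful source
    .uid("source-id")
    .map(new CountingMapper())         // hypothetical mapper emitting (word, 1) tuples
    .uid("mapper-id")
    .keyBy(0)
    .timeWindow(Time.seconds(10))      // the built-in window operator keeps state as well
    .sum(1)
    .uid("window-id");

counts.addSink(new StatefulSink())     // hypothetical sink, e.g. an exactly-once sink
    .uid("sink-id");
```
</figure>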
### What happens if I add a new operator that requires state to my job?
When you add a new operator to your job it will be initialized without any state. Savepoints contain the state of each stateful operator. Stateless operators are simply not part of the savepoint. The new operator behaves similarly to a stateless operator.
### What happens if I delete an operator that has state from my job?
By default, a savepoint restore will try to match all state back to the restored job. If you restore from a savepoint that contains state for an operator that has been deleted, this will therefore fail.
You can allow non-restored state by passing the `--allowNonRestoredState` (short: `-n`) option to the run command:
<figure class="highlight">
```
$ bin/flink run -s :savepointPath -n [:runArgs]
```
</figure>
### What happens if I reorder stateful operators in my job?
If you assigned IDs to these operators, they will be restored as usual.
If you did not assign IDs, the auto generated IDs of the stateful operators will most likely change after the reordering. This would result in you not being able to restore from a previous savepoint.
### What happens if I add or delete or reorder operators that have no state in my job?
If you assigned IDs to your stateful operators, the stateless operators will not influence the savepoint restore.
If you did not assign IDs, the auto generated IDs of the stateful operators will most likely change after the reordering. This would result in you not being able to restore from a previous savepoint.
### What happens when I change the parallelism of my program when restoring?
If the savepoint was triggered with Flink >= 1.2.0 and using no deprecated state API like `Checkpointed`, you can simply restore the program from a savepoint and specify a new parallelism.
If you are resuming from a savepoint triggered with Flink < 1.2.0 or using now-deprecated APIs you first have to migrate your job and savepoint to Flink >= 1.2.0 before being able to change the parallelism. See the [upgrading jobs and Flink versions guide](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/upgrading.html).
### Can I move the Savepoint files on stable storage?
The quick answer to this question is currently "no" because the meta data file references the files on stable storage as absolute paths for technical reasons. The longer answer is: if you MUST move the files for some reason, there are two potential workarounds. First, simpler but potentially more dangerous, you can use an editor to find the old paths in the meta data file and replace them with the new paths. Second, you can use the class `SavepointV2Serializer` as a starting point to programmatically read, manipulate, and rewrite the meta data file with the new paths.
# State Backends
Programs written in the [Data Stream API](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/datastream_api.html) often hold state in various forms:
* Windows gather elements or aggregates until they are triggered
* Transformation functions may use the key/value state interface to store values
* Transformation functions may implement the `CheckpointedFunction` interface to make their local variables fault tolerant
See also [state section](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/index.html) in the streaming API guide.
When checkpointing is activated, such state is persisted upon checkpoints to guard against data loss and recover consistently. How the state is represented internally, and how and where it is persisted upon checkpoints depends on the chosen **State Backend**.
## Available State Backends
Out of the box, Flink bundles these state backends:
* _MemoryStateBackend_
* _FsStateBackend_
* _RocksDBStateBackend_
If nothing else is configured, the system will use the MemoryStateBackend.
### The MemoryStateBackend
The _MemoryStateBackend_ holds data internally as objects on the Java heap. Key/value state and window operators hold hash tables that store the values, triggers, etc.
Upon checkpoints, this state backend will snapshot the state and send it as part of the checkpoint acknowledgement messages to the JobManager (master), which stores it on its heap as well.
The MemoryStateBackend can be configured to use asynchronous snapshots. While we strongly encourage the use of asynchronous snapshots to avoid blocking pipelines, please note that this is currently enabled by default. To disable this feature, users can instantiate a `MemoryStateBackend` with the corresponding boolean flag in the constructor set to `false` (this should only be used for debugging), e.g.:
<figure class="highlight">
```
new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
```
</figure>
Limitations of the MemoryStateBackend:
* The size of each individual state is by default limited to 5 MB. This value can be increased in the constructor of the MemoryStateBackend.
* Irrespective of the configured maximal state size, the state cannot be larger than the akka frame size (see [Configuration](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html)).
* The aggregate state must fit into the JobManager memory.
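As a hedged sketch of the first limitation above (the 10 MB figure is arbitrary), the per-state size limit can be raised through the constructor when setting the backend on the environment:
<figure class="highlight">
```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// raise the per-state size limit from the default 5 MB to 10 MB (value chosen only for illustration)
int maxStateSize = 10 * 1024 * 1024;
env.setStateBackend(new MemoryStateBackend(maxStateSize));
```
</figure>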
The MemoryStateBackend is encouraged for:
* Local development and debugging
* Jobs that do hold little state, such as jobs that consist only of record-at-a-time functions (Map, FlatMap, Filter, …). The Kafka Consumer requires very little state.
### The FsStateBackend
The _FsStateBackend_ is configured with a file system URL (type, address, path), such as “hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”.
The FsStateBackend holds in-flight data in the TaskManager’s memory. Upon checkpointing, it writes state snapshots into files in the configured file system and directory. Minimal metadata is stored in the JobManager’s memory (or, in high-availability mode, in the metadata checkpoint).
The FsStateBackend uses _asynchronous snapshots by default_ to avoid blocking the processing pipeline while writing state checkpoints. To disable this feature, users can instantiate a `FsStateBackend` with the corresponding boolean flag in the constructor set to `false`, e.g.:
<figure class="highlight">
```
new FsStateBackend(path, false);
```
</figure>
The FsStateBackend is encouraged for:
* Jobs with large state, long windows, large key/value states.
* All high-availability setups.
### The RocksDBStateBackend
The _RocksDBStateBackend_ is configured with a file system URL (type, address, path), such as “hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”.
The RocksDBStateBackend holds in-flight data in a [RocksDB](http://rocksdb.org) database that is (per default) stored in the TaskManager data directories. Upon checkpointing, the whole RocksDB database will be checkpointed into the configured file system and directory. Minimal metadata is stored in the JobManager’s memory (or, in high-availability mode, in the metadata checkpoint).
The RocksDBStateBackend always performs asynchronous snapshots.
Limitations of the RocksDBStateBackend:
* As RocksDB’s JNI bridge API is based on `byte[]`, the maximum supported size per key and per value is 2^31 bytes each. IMPORTANT: states that use merge operations in RocksDB (e.g. ListState) can silently accumulate value sizes > 2^31 bytes and will then fail on their next retrieval. This is currently a limitation of RocksDB JNI.
The RocksDBStateBackend is encouraged for:
* Jobs with very large state, long windows, large key/value states.
* All high-availability setups.
Note that the amount of state that you can keep is only limited by the amount of disk space available. This allows keeping very large state, compared to the FsStateBackend that keeps state in memory. This also means, however, that the maximum throughput that can be achieved will be lower with this state backend. All reads/writes from/to this backend have to go through de-/serialization to retrieve/store the state objects, which is also more expensive than always working with the on-heap representation as the heap-based backends are doing.
RocksDBStateBackend is currently the only backend that offers incremental checkpoints (see [here](large_state_tuning.html)).
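A minimal sketch of enabling incremental checkpoints when constructing the backend (the HDFS path is a placeholder, and the constructor may throw an `IOException` that the surrounding code has to handle):
<figure class="highlight">
```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// the second constructor argument enables incremental checkpoints
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints", true));
```
</figure>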
Certain RocksDB native metrics are available but disabled by default; you can find the full documentation [here](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#rocksdb-native-metrics).
## Configuring a State Backend
The default state backend, if you specify nothing, is the jobmanager (MemoryStateBackend). If you wish to establish a different default for all jobs on your cluster, you can do so by defining a new default state backend in **flink-conf.yaml**. The default state backend can be overridden on a per-job basis, as shown below.
### Setting the Per-job State Backend
The per-job state backend is set on the `StreamExecutionEnvironment` of the job, as shown in the example below:
<figure class="highlight">
```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
```
</figure>
<figure class="highlight">
```
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"))
```
</figure>
### Setting Default State Backend
A default state backend can be configured in the `flink-conf.yaml`, using the configuration key `state.backend`.
Possible values for the config entry are _jobmanager_ (MemoryStateBackend), _filesystem_ (FsStateBackend), _rocksdb_ (RocksDBStateBackend), or the fully qualified class name of the class that implements the state backend factory [FsStateBackendFactory](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java), such as `org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory` for RocksDBStateBackend.
The `state.checkpoints.dir` option defines the directory to which all backends write checkpoint data and meta data files. You can find more details about the checkpoint directory structure [here](checkpoints.html#directory-structure).
A sample section in the configuration file could look as follows:
<figure class="highlight">
```
# The backend that will be used to store operator state checkpoints
state.backend: filesystem
# Directory for storing checkpoints
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
```
</figure>
#### RocksDB State Backend Config Options
| Key | Default | Description |
| --- | --- | --- |
| `state.backend.rocksdb.localdir` | (none) | The local directory (on the TaskManager) where RocksDB puts its files. |
| `state.backend.rocksdb.timer-service.factory` | "HEAP" | This determines the factory for timer service state implementation. Options are either HEAP (heap-based, default) or ROCKSDB for an implementation based on RocksDB. |
# Production Readiness Checklist
## Production Readiness Checklist
The purpose of this production readiness checklist is to provide a condensed overview of configuration options that are important and need **careful considerations** if you plan to bring your Flink job into **production**. For most of these options Flink provides out-of-the-box defaults to make usage and adoption of Flink easier. For many users and scenarios, those defaults are good starting points for development and completely sufficient for “one-shot” jobs.
However, once you are planning to bring a Flink application to production the requirements typically increase. For example, you want your job to be (re-)scalable and to have a good upgrade story for your job and new Flink versions.
In the following, we present a collection of configuration options that you should check before your job goes into production.
### Set maximum parallelism for operators explicitly
Maximum parallelism is a configuration parameter that was newly introduced in Flink 1.2 and has important implications for the (re-)scalability of your Flink job. This parameter, which can be set on a per-job and/or per-operator granularity, determines the maximum parallelism to which you can scale operators. It is important to understand that (as of now) there is **no way to change** this parameter after your job has been started, except for restarting your job completely from scratch (i.e. with a new state, and not from a previous checkpoint/savepoint). Even if Flink would provide some way to change the maximum parallelism of existing savepoints in the future, you can already assume that for large states this is likely a long-running operation that you want to avoid. At this point, you might wonder why not just use a very high value as the default for this parameter. The reason behind this is that a high maximum parallelism can have some impact on your application’s performance and even state sizes, because Flink has to maintain certain metadata for its ability to rescale, which can increase with the maximum parallelism. In general, you should choose a maximum parallelism that is high enough to fit your future needs in scalability, but keeping it as low as possible can give slightly better performance. In particular, a maximum parallelism higher than 128 will typically result in slightly bigger state snapshots from the keyed backends.
Notice that maximum parallelism must fulfill the following conditions:
`0 < parallelism <= max parallelism <= 2^15`
You can set the maximum parallelism by `setMaxParallelism(int maxparallelism)`. By default, Flink will choose the maximum parallelism as a function of the parallelism when the job is first started:
* `128` : for all parallelism <= 128.
* `MIN(nextPowerOfTwo(parallelism + (parallelism / 2)), 2^15)` : for all parallelism > 128.
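A minimal, hedged sketch of setting the maximum parallelism for the whole job and overriding it for a single operator (the values and the `MySource`/`MyMapper` classes are only illustrative):
<figure class="highlight">
```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// job-wide upper bound for future rescaling (illustrative value)
env.setMaxParallelism(512);

env.addSource(new MySource())       // hypothetical source
   .map(new MyMapper())             // hypothetical mapper
   .setMaxParallelism(1024)         // per-operator override (illustrative value)
   .print();
```
</figure>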
### Set UUIDs for operators
As mentioned in the documentation for [savepoints](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/state/savepoints.html), users should set uids for operators. Those operator uids are important for Flink’s mapping of operator states to operators which, in turn, is essential for savepoints. By default operator uids are generated by traversing the JobGraph and hashing certain operator properties. While this is comfortable from a user perspective, it is also very fragile, as changes to the JobGraph (e.g. exchanging an operator) will result in new UUIDs. To establish a stable mapping, we need stable operator uids provided by the user through `setUid(String uid)`.
### Choice of state backend
Currently, Flink has the limitation that it can only restore the state from a savepoint for the same state backend that took the savepoint. For example, this means that we can not take a savepoint with a memory state backend, then change the job to use a RocksDB state backend and restore. While we are planning to make backends interoperable in the near future, they are not yet. This means you should carefully consider which backend you use for your job before going to production.
In general, we recommend using RocksDB because this is currently the only state backend that supports large states (i.e. state that exceeds the available main memory) and asynchronous snapshots. From our experience, asynchronous snapshots are very important for large states because they do not block the operators and Flink can write the snapshots without stopping stream processing. However, RocksDB can have worse performance than, for example, the memory-based state backends. If you are sure that your state will never exceed main memory and blocking the stream processing to write it is not an issue, you **could consider** to not use the RocksDB backends. However, at this point, we **strongly recommend** using RocksDB for production.
### Configure JobManager High Availability (HA)
The JobManager coordinates every Flink deployment. It is responsible for both _scheduling_ and _resource management_.
By default, there is a single JobManager instance per Flink cluster. This creates a _single point of failure_ (SPOF): if the JobManager crashes, no new programs can be submitted and running programs fail.
With JobManager High Availability, you can recover from JobManager failures and thereby eliminate the _SPOF_. We **strongly recommend** you configure [high availability](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/jobmanager_high_availability.html) for production.
# Scala REPL
Flink comes with an integrated interactive Scala Shell. It can be used in a local setup as well as in a cluster setup.
To use the shell with an integrated Flink cluster just execute:
<figure class="highlight">
```
bin/start-scala-shell.sh local
```
</figure>
in the root directory of your binary Flink directory. To run the Shell on a cluster, please see the Setup section below.
## Usage
The shell supports Batch and Streaming. Two different ExecutionEnvironments are automatically prebound after startup. Use “benv” and “senv” to access the Batch and Streaming environment respectively.
### DataSet API
The following example will execute the wordcount program in the Scala shell:
<figure class="highlight">
```
Scala-Flink> val text = benv.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
Scala-Flink> val counts = text
.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }.groupBy(0).sum(1)
Scala-Flink> counts.print()
```
</figure>
The print() command will automatically send the specified tasks to the JobManager for execution and will show the result of the computation in the terminal.
It is possible to write results to a file. However, in this case you need to call `execute` to run your program:
<figure class="highlight">
```
Scala-Flink> benv.execute("MyProgram")
```
</figure>
### DataStream API
Similar to the batch program above, we can execute a streaming program through the DataStream API:
<figure class="highlight">
```
Scala-Flink> val textStreaming = senv.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
Scala-Flink> val countsStreaming = textStreaming
.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }.keyBy(0).sum(1)
Scala-Flink> countsStreaming.print()
Scala-Flink> senv.execute("Streaming Wordcount")
```
</figure>
Note that in the streaming case, the print operation does not trigger execution directly.
The Flink Shell comes with command history and auto-completion.
## Adding external dependencies
It is possible to add external classpaths to the Scala shell. These will be sent to the JobManager automatically alongside your shell program when calling `execute`.
Use the parameter `-a <path/to/jar.jar>` or `--addclasspath <path/to/jar.jar>` to load additional classes.
<figure class="highlight">
```
bin/start-scala-shell.sh [local | remote <host> <port> | yarn] --addclasspath <path/to/jar.jar>
```
</figure>
## Setup
To get an overview of what options the Scala Shell provides, please use
<figure class="highlight">
```
bin/start-scala-shell.sh --help
```
</figure>
### Local
To use the shell with an integrated Flink cluster just execute:
<figure class="highlight">
```
bin/start-scala-shell.sh local
```
</figure>
### Remote
To use it with a running cluster, start the Scala shell with the keyword `remote` and supply the host and port of the JobManager:
<figure class="highlight">
```
bin/start-scala-shell.sh remote <hostname> <portnumber>
```
</figure>
### Yarn Scala Shell cluster
The shell can deploy a Flink cluster to YARN, which is used exclusively by the shell. The number of YARN containers can be controlled by the parameter `-n <arg>`. The shell deploys a new Flink cluster on YARN and connects to the cluster. You can also specify options for the YARN cluster such as memory for the JobManager, name of the YARN application, etc.
For example, to start a Yarn cluster for the Scala Shell with two TaskManagers use the following:
<figure class="highlight">
```
bin/start-scala-shell.sh yarn -n 2
```
</figure>
For all other options, see the full reference at the bottom.
### Yarn Session
If you have previously deployed a Flink cluster using the Flink Yarn Session, the Scala shell can connect with it using the following command:
<figure class="highlight">
```
bin/start-scala-shell.sh yarn
```
</figure>
## Full Reference
<figure class="highlight">
```
Flink Scala Shell
Usage: start-scala-shell.sh [local|remote|yarn] [options] <args>...
Command: local [options]
Starts Flink scala shell with a local Flink cluster
-a <path/to/jar> | --addclasspath <path/to/jar>
Specifies additional jars to be used in Flink
Command: remote [options] <host> <port>
Starts Flink scala shell connecting to a remote cluster
<host>
Remote host name as string
<port>
Remote port as integer
-a <path/to/jar> | --addclasspath <path/to/jar>
Specifies additional jars to be used in Flink
Command: yarn [options]
Starts Flink scala shell connecting to a yarn cluster
-n arg | --container arg
Number of YARN container to allocate (= Number of TaskManagers)
-jm arg | --jobManagerMemory arg
Memory for JobManager container with optional unit (default: MB)
-nm <value> | --name <value>
Set a custom name for the application on YARN
-qu <arg> | --queue <arg>
Specifies YARN queue
-s <arg> | --slots <arg>
Number of slots per TaskManager
-tm <arg> | --taskManagerMemory <arg>
Memory per TaskManager container with optional unit (default: MB)
-a <path/to/jar> | --addclasspath <path/to/jar>
Specifies additional jars to be used in Flink
--configDir <value>
The configuration directory.
-h | --help
Prints this usage text
```
</figure>
# Debugging Windows & Event Time
## Monitoring Current Event Time
Flink’s [event time](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html) and watermark support is a powerful feature for handling out-of-order events. However, it’s harder to understand what exactly is going on because the progress of time is tracked within the system.
The low watermarks of each task can be accessed through the Flink web interface or the [metrics system](//ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/metrics.html).
Each Task in Flink exposes a metric called `currentLowWatermark` that represents the lowest watermark received by this task. This long value represents the “current event time”. The value is calculated by taking the minimum of all watermarks received by upstream operators. This means that the event time tracked with watermarks is always dominated by the furthest-behind source.
The low watermark metric is accessible **using the web interface**, by choosing a task in the metric tab, and selecting the `<taskNr>.currentLowWatermark` metric. In the new box you’ll now be able to see the current low watermark of the task.
Another way of getting the metric is using one of the **metric reporters**, as described in the documentation for the [metrics system](//ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/metrics.html). For local setups, we recommend using the JMX metric reporter and a tool like [VisualVM](https://visualvm.github.io/).
## Handling Event Time Stragglers
* Approach 1: Watermark stays late (indicated completeness), windows fire early
* Approach 2: Watermark heuristic with maximum lateness, windows accept late data