提交 e2fd615c 编写于 作者: F Fabian Hueske

[FLINK-1076] Extended Hadoop Compatibility documentation to cover Hadoop functions

上级 3ebdebc0
......@@ -27,7 +27,7 @@
<li><a href="streaming_guide.html">Streaming Guide</a></li>
<li><a href="iterations.html">Iterations</a></li>
<li><a href="spargel_guide.html">Spargel Graph API</a></li>
<li><a href="hadoop_compatibility.html">Hadoop I/O Compatibility</a></li>
<li><a href="hadoop_compatibility.html">Hadoop Compatibility</a></li>
title: "Hadoop I/O Compatibility"
title: "Hadoop Compatibility"
Flink not only supports types that implement Apache Hadoop's `Writable` interface by default, but also provides a compatibility
layer that allows for using any class extending `org.apache.hadoop.mapred(uce).InputFormat` as a Flink `InputFormat` as well as any
class extending `org.apache.hadoop.mapred(uce).OutputFormat` as a Flink `OutputFormat`.
* This will be replaced by the TOC
Thus, Flink can handle Hadoop-related formats from the common `TextInputFormat` up to third-party components such as Hive through HCatalog's `HCatInputFormat`. Flink supports both the formats which use the old `org.apache.hadoop.mapred` API as well as the new `org.apache.hadoop.mapreduce` API.
Flink is compatible with many Apache Hadoop's MapReduce interfaces and allows to reuse a lot of code that was implemented for Hadoop MapReduce.
This document explains how to configure your Maven project correctly and shows an example.
You can:
- use Hadoop's `Writable` [data types](programming_guide.html#data-types) in Flink programs.
- use any Hadoop `InputFormat` as a [DataSource](programming_guide.html#data-sources).
- use any Hadoop `OutputFormat` as a [DataSink](programming_guide.html#data-sinks).
- use a Hadoop `Mapper` as [FlatMapFunction](dataset_transformations.html#flatmap).
- use a Hadoop `Reducer` as [GroupReduceFunction](dataset_transformations.html#groupreduce-on-grouped-dataset).
This document shows how to use existing Hadoop MapReduce code with Flink.
### Project Configuration
The Hadoop Compatibility Layer is part of the *addons* Maven project. All relevant classes are located in the `org.apache.flink.hadoopcompatibility` package. The package includes separate packages and classes for the Hadoop `mapred` and `mapreduce` API.
The Hadoop Compatibility Layer is part of the `flink-addons` Maven module. All relevant classes are located in the `org.apache.flink.hadoopcompatibility` package. It includes separate packages and classes for the Hadoop `mapred` and `mapreduce` APIs.
Add the following dependency to your `pom.xml` to use the Hadoop Compatibility Layer.
......@@ -24,52 +31,152 @@ Add the following dependency to your `pom.xml` to use the Hadoop Compatibility L
### Examples
### Using Hadoop Data Types
Flink supports all Hadoop `Writable` and `WritableComparable` data types out-of-the-box. You do not need to include the Hadoop Compatibility dependency, if you only want to use your Hadoop data types. See the [Programming Guide](programming_guide.html#data-types) for more details.
The following example shows how to read a file from Hadoop's `TextInputFormat`, count the words with Flink and output the result with Hadoop's `TextOutputFormat`.
### Using Hadoop InputFormats
Flink's `HadoopInputFormat` is initiated in line 5 and acts as a wrapper for the Hadoop `TextInputFormat`. An instance of `TextInputFormat` and the corresponding return types for key and value together with a Hadoop `Job` definition must be passed to the constructor of the `HadoopInputFormat`.
Flink provides a compatibility wrapper for Hadoop `InputFormats`. Any class that implements `org.apache.hadoop.mapred.InputFormat` or extends `org.apache.hadoop.mapreduce.InputFormat` is supported. Thus, Flink can handle Hadoop built-in formats such as `TextInputFormat` as well as external formats such as Hive's `HCatInputFormat`. Data read from Hadoop InputFormats is converted into a `DataSet<Tuple2<KEY,VALUE>>` where `KEY` is the key and `VALUE` is the value of the original Hadoop key-value pair.
Take care that you choose the right classes since `HadoopInputFormat` and `HadoopOutputFormat` exist in both packages for Hadoop's `mapred` and `mapreduce` API. For example, the `HadoopInputFormat` for the `mapred` API takes a Hadoop `JobConf` instance as parameter instead of `Job` instance.
Flink's InputFormat wrappers are
Flink's `HadoopOutputFormat` is initiated in a similar way with an instance of Hadoop's `TextOutputFormat` and the previously used Hadoop `Job` definition.
- `org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat` and
- `org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat`
Additional Hadoop properties can be set by calling the formats `getConfiguration()` (for `mapreduce` API) or `getJobConf()` (for `mapred` API) method.
and can be used as regular Flink [InputFormats](programming_guide.html#data-sources).
The following example shows how to use Hadoop's `TextInputFormat`.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Set up the Hadoop Input Format
// Set up the Hadoop TextInputFormat.
Job job = Job.getInstance();
HadoopInputFormat<LongWritable, Text> hadoopInputFormat =
new HadoopInputFormat<LongWritable, Text>(
new TextInputFormat(), LongWritable.class, Text.class, job
HadoopInputFormat<LongWritable, Text> hadoopIF =
// create the Flink wrapper.
new HadoopInputFormat<LongWritable, Text>(
// create the Hadoop InputFormat, specify key and value type, and job.
new TextInputFormat(), LongWritable.class, Text.class, job
TextInputFormat.addInputPath(job, new Path(inputPath));
// Create a Flink job with it
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
// Read data using the Hadoop TextInputFormat.
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
// Do something with the data.
### Using Hadoop OutputFormats
Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class that implements `org.apache.hadoop.mapred.OutputFormat` or extends `org.apache.hadoop.mapreduce.OutputFormat` is supported. The OutputFormat wrapper expects its input data to be a `DataSet<Tuple2<KEY,VALUE>>` where `KEY` is the key and `VALUE` is the value of the Hadoop key-value pair that is processed by the Hadoop OutputFormat.
Flink's OUtputFormat wrappers are
- `org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat` and
- `org.apache.flink.hadoopcompatibility.mapreduce.HadoopOutputFormat`
and can be used as regular Flink [OutputFormats](programming_guide.html#data-sinks).
The following example shows how to use Hadoop's `TextOutputFormat`.
// Obtain your result to emit.
DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
// Tokenize the line and convert from Writable "Text" to String for better handling
DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer());
// Set up the Hadoop TextOutputFormat.
HadoopOutputFormat<Text, IntWritable> hadoopOF =
// create the Flink wrapper.
new HadoopOutputFormat<Text, IntWritable>(
// set the Hadoop OutputFormat and specify the job.
new TextOutputFormat<Text, IntWritable>(), job
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));
// Sum up the words
DataSet<Tuple2<String, Integer>> result = words.groupBy(0).aggregate(Aggregations.SUM, 1);
// Emit data using the Hadoop TextOutputFormat.
**Please note:** At the moment, Hadoop OutputFormats must be executed with a parallelism of 1 (DOP = 1). This limitation will be resolved soon.
### Using Hadoop Mappers and Reducers
Hadoop Mappers are semantically equivalent to Flink's [FlatMapFunctions](dataset_transformations.html#flatmap) and Hadoop Reducers are equivalent to Flink's [GroupReduceFunctions](dataset_transformations.html#groupreduce-on-grouped-dataset). Flink provides wrappers for implementations of Hadoop MapReduce's `Mapper` and `Reducer` interfaces, i.e., you can reuse your Hadoop Mappers and Reducers in regular Flink programs. At the moment, only the Mapper and Reduce interfaces of Hadoop's mapred API (`org.apache.hadoop.mapred`) are supported.
The wrappers take a `DataSet<Tuple2<KEYIN,VALUEIN>>` as input and produce a `DataSet<Tuple2<KEYOUT,VALUEOUT>>` as output where `KEYIN` and `KEYOUT` are the keys and `VALUEIN` and `VALUEOUT` are the values of the Hadoop key-value pairs that are processed by the Hadoop functions. For Reducers, Flink offers a wrapper for a GroupReduceFunction with (`HadoopReduceCombineFunction`) and without a Combiner (`HadoopReduceFunction`). The wrappers accept an optional `JobConf` object to configure the Hadoop Mapper or Reducer.
Flink's function wrappers are
- `org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction`,
- `org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction`, and
- `org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction`.
and can be used as regular Flink [FlatMapFunctions](dataset_transformations.html#flatmap) or [GroupReduceFunctions](dataset_transformations.html#groupreduce-on-grouped-dataset).
The following example shows how to use Hadoop `Mapper` and `Reducer` functions.
// Obtain data to process somehow.
DataSet<Tuple2<Text, LongWritable>> text = [...]
DataSet<Tuple2<Text, LongWritable>> result = text
// use Hadoop Mapper (Tokenizer) as MapFunction
.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
new Tokenizer()
// use Hadoop Reducer (Counter) as Reduce- and CombineFunction
.reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
new Counter(), new Counter()
**Please note:** The Reducer wrapper works on groups as defined by Flink's [groupBy()](dataset_transformations.html#transformations-on-grouped-dataset) operation. It does not consider any custom partitioners, sort or grouping comparators you might have set in the `JobConf`.
### Complete Hadoop WordCount Example
The following example shows a complete WordCount implementation using Hadoop data types, Input- and OutputFormats, and Mapper and Reducer implementations.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Convert String back to Writable "Text" for use with Hadoop Output Format
DataSet<Tuple2<Text, IntWritable>> hadoopResult = result.map(new HadoopDatatypeMapper());
// Set up the Hadoop TextInputFormat.
Job job = Job.getInstance();
HadoopInputFormat<LongWritable, Text> hadoopIF =
new HadoopInputFormat<LongWritable, Text>(
new TextInputFormat(), LongWritable.class, Text.class, job
TextInputFormat.addInputPath(job, new Path(inputPath));
// Set up Hadoop Output Format
HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat =
new HadoopOutputFormat<Text, IntWritable>(
new TextOutputFormat<Text, IntWritable>(), job
hadoopOutputFormat.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
// Read data using the Hadoop TextInputFormat.
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
DataSet<Tuple2<Text, LongWritable>> result = text
// use Hadoop Mapper (Tokenizer) as MapFunction
.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
new Tokenizer()
// use Hadoop Reducer (Counter) as Reduce- and CombineFunction
.reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
new Counter(), new Counter()
// Set up the Hadoop TextOutputFormat.
HadoopOutputFormat<Text, IntWritable> hadoopOF =
new HadoopOutputFormat<Text, IntWritable>(
new TextOutputFormat<Text, IntWritable>(), job
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));
// Output & Execute
env.execute("Word Count");
// Emit data using the Hadoop TextOutputFormat.
A full running example can be found in `org.apache.flink.hadoopcompatibility.mapred(uce).example.WordCount`.
// Execute Program
env.execute("Hadoop WordCount");
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册