Commit 53d8f1f0 authored by: F Fabian Hueske

Revert "Added wrappers for Hadoop functions"

This reverts commit 74dded1c.
Parent 74dded1c
......@@ -27,7 +27,6 @@
<li><a href="streaming_guide.html">Streaming Guide</a></li>
<li><a href="iterations.html">Iterations</a></li>
<li><a href="spargel_guide.html">Spargel Graph API</a></li>
<li><a href="hadoop_compatibility.html">Hadoop Compatibility</a></li>
</ul>
</li>
......
---
title: "Hadoop Compatibility"
title: "Hadoop Compatability"
---
* This will be replaced by the TOC
{:toc}
Flink is compatible with many of Apache Hadoop's MapReduce interfaces and allows you to reuse a lot of code that was implemented for Hadoop MapReduce.
You can:
- use Hadoop's `Writable` [data types](programming_guide.html#data-types) in Flink programs.
- use any Hadoop `InputFormat` as a [DataSource](programming_guide.html#data-sources).
- use any Hadoop `OutputFormat` as a [DataSink](programming_guide.html#data-sinks).
- use a Hadoop `Mapper` as [FlatMapFunction](dataset_transformations.html#flatmap).
- use a Hadoop `Reducer` as [GroupReduceFunction](dataset_transformations.html#groupreduce-on-grouped-dataset).
This document shows how to use existing Hadoop MapReduce code with Flink.
### Project Configuration
The Hadoop Compatibility Layer is part of the `flink-addons` Maven module. All relevant classes are located in the `org.apache.flink.hadoopcompatibility` package. It includes separate packages and classes for the Hadoop `mapred` and `mapreduce` APIs.
Add the following dependency to your `pom.xml` to use the Hadoop Compatibility Layer.
~~~xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility</artifactId>
<version>{{site.FLINK_VERSION_STABLE}}</version>
</dependency>
~~~
### Using Hadoop Data Types
Flink supports all Hadoop `Writable` and `WritableComparable` data types out-of-the-box. You do not need to include the Hadoop Compatibility dependency if you only want to use your Hadoop data types. See the [Programming Guide](programming_guide.html#data-types) for more details.
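The following minimal sketch is not part of the original guide; it only illustrates Hadoop `Writable` types used directly as Flink data types, without the compatibility dependency. The element values are made up, and `outputPath` is a placeholder in the style of the other examples in this guide.
~~~java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableTypesExample {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Hadoop Writables can be used like any other Flink data type.
    DataSet<Tuple2<Text, IntWritable>> pairs = env.fromElements(
        new Tuple2<Text, IntWritable>(new Text("hadoop"), new IntWritable(1)),
        new Tuple2<Text, IntWritable>(new Text("flink"), new IntWritable(2)));

    // Write the pairs as text; "outputPath" is a placeholder for this sketch.
    pairs.writeAsText(outputPath);
    env.execute("Writable types example");
  }
}
~~~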
### Using Hadoop InputFormats
Flink provides a compatibility wrapper for Hadoop `InputFormats`. Any class that implements `org.apache.hadoop.mapred.InputFormat` or extends `org.apache.hadoop.mapreduce.InputFormat` is supported. Thus, Flink can handle Hadoop built-in formats such as `TextInputFormat` as well as external formats such as Hive's `HCatInputFormat`. Data read from Hadoop InputFormats is converted into a `DataSet<Tuple2<KEY,VALUE>>` where `KEY` is the key and `VALUE` is the value of the original Hadoop key-value pair.
Flink's InputFormat wrappers are
- `org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat` and
- `org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat`
and can be used as regular Flink [InputFormats](programming_guide.html#data-sources).
The following example shows how to use Hadoop's `TextInputFormat`.
~~~java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Set up the Hadoop TextInputFormat.
Job job = Job.getInstance();
HadoopInputFormat<LongWritable, Text> hadoopIF =
  // create the Flink wrapper.
  new HadoopInputFormat<LongWritable, Text>(
    // create the Hadoop InputFormat, specify key and value type, and job.
    new TextInputFormat(), LongWritable.class, Text.class, job
  );
TextInputFormat.addInputPath(job, new Path(inputPath));

// Read data using the Hadoop TextInputFormat.
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);

// Do something with the data.
[...]
~~~
### Using Hadoop OutputFormats
Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class that implements `org.apache.hadoop.mapred.OutputFormat` or extends `org.apache.hadoop.mapreduce.OutputFormat` is supported. The OutputFormat wrapper expects its input data to be a `DataSet<Tuple2<KEY,VALUE>>` where `KEY` is the key and `VALUE` is the value of the Hadoop key-value pair that is processed by the Hadoop OutputFormat.
Flink's OutputFormat wrappers are
- `org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat` and
- `org.apache.flink.hadoopcompatibility.mapreduce.HadoopOutputFormat`
and can be used as regular Flink [OutputFormats](programming_guide.html#data-sinks).
The following example shows how to use Hadoop's `TextOutputFormat`.
~~~java
// Obtain your result to emit.
DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]

// Set up the Hadoop TextOutputFormat.
Job job = Job.getInstance();
HadoopOutputFormat<Text, IntWritable> hadoopOF =
  // create the Flink wrapper.
  new HadoopOutputFormat<Text, IntWritable>(
    // set the Hadoop OutputFormat and specify the job.
    new TextOutputFormat<Text, IntWritable>(), job
  );
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));

// Emit data using the Hadoop TextOutputFormat.
hadoopResult.output(hadoopOF)
            .setParallelism(1);
~~~
**Please note:** At the moment, Hadoop OutputFormats must be executed with a parallelism of 1 (DOP = 1). This limitation will be resolved soon.
### Using Hadoop Mappers and Reducers
Hadoop Mappers are semantically equivalent to Flink's [FlatMapFunctions](dataset_transformations.html#flatmap) and Hadoop Reducers are equivalent to Flink's [GroupReduceFunctions](dataset_transformations.html#groupreduce-on-grouped-dataset). Flink provides wrappers for implementations of Hadoop MapReduce's `Mapper` and `Reducer` interfaces, i.e., you can reuse your Hadoop Mappers and Reducers in regular Flink programs. At the moment, only the `Mapper` and `Reducer` interfaces of Hadoop's mapred API (`org.apache.hadoop.mapred`) are supported.
The wrappers take a `DataSet<Tuple2<KEYIN,VALUEIN>>` as input and produce a `DataSet<Tuple2<KEYOUT,VALUEOUT>>` as output, where `KEYIN` and `KEYOUT` are the keys and `VALUEIN` and `VALUEOUT` are the values of the Hadoop key-value pairs that are processed by the Hadoop functions. For Reducers, Flink offers a wrapper for a GroupReduceFunction with a Combiner (`HadoopReduceCombineFunction`) and without a Combiner (`HadoopReduceFunction`). The wrappers accept an optional `JobConf` object to configure the Hadoop Mapper or Reducer.
Flink's function wrappers are
- `org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction`,
- `org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction`, and
- `org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction`
and can be used as regular Flink [FlatMapFunctions](dataset_transformations.html#flatmap) or [GroupReduceFunctions](dataset_transformations.html#groupreduce-on-grouped-dataset).
The following example shows how to use Hadoop `Mapper` and `Reducer` functions.
~~~java
// Obtain data to process somehow.
DataSet<Tuple2<LongWritable, Text>> text = [...]

DataSet<Tuple2<Text, LongWritable>> result = text
  // use Hadoop Mapper (Tokenizer) as FlatMapFunction
  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
    new Tokenizer()
  ))
  .groupBy(0)
  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
    new Counter(), new Counter()
  ));
~~~
**Please note:** The Reducer wrapper works on groups as defined by Flink's [groupBy()](dataset_transformations.html#transformations-on-grouped-dataset) operation. It does not consider any custom partitioners, sort or grouping comparators you might have set in the `JobConf`.
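As a complementary sketch (not from the original guide), the non-combinable `HadoopReduceFunction` can be applied to a grouped DataSet in the same way. It reuses the `Counter` Reducer from the example above and passes an optional `JobConf`; here `tokenized` stands for a `DataSet<Tuple2<Text, LongWritable>>` such as the output of the wrapped Tokenizer, and the configuration key is made up for illustration.
~~~java
// Optional JobConf to configure the Hadoop Reducer (key is a made-up example).
JobConf conf = new JobConf();
conf.set("my.custom.parameter", "some-value");

DataSet<Tuple2<Text, LongWritable>> counts = tokenized
  .groupBy(0)
  // use Hadoop Reducer (Counter) as a non-combinable GroupReduceFunction
  .reduceGroup(new HadoopReduceFunction<Text, LongWritable, Text, LongWritable>(
    new Counter(), conf
  ));
~~~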
### Complete Hadoop WordCount Example
The following example shows a complete WordCount implementation using Hadoop data types, Input- and OutputFormats, and Mapper and Reducer implementations.
~~~java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Set up the Hadoop TextInputFormat.
Job job = Job.getInstance();
HadoopInputFormat<LongWritable, Text> hadoopIF =
  new HadoopInputFormat<LongWritable, Text>(
    new TextInputFormat(), LongWritable.class, Text.class, job
  );
TextInputFormat.addInputPath(job, new Path(inputPath));

// Read data using the Hadoop TextInputFormat.
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);

DataSet<Tuple2<Text, LongWritable>> result = text
  // use Hadoop Mapper (Tokenizer) as FlatMapFunction
  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
    new Tokenizer()
  ))
  .groupBy(0)
  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
    new Counter(), new Counter()
  ));

// Set up the Hadoop TextOutputFormat.
HadoopOutputFormat<Text, LongWritable> hadoopOF =
  new HadoopOutputFormat<Text, LongWritable>(
    new TextOutputFormat<Text, LongWritable>(), job
  );
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));

// Emit data using the Hadoop TextOutputFormat.
result.output(hadoopOF)
      .setParallelism(1);

// Execute Program
env.execute("Hadoop WordCount");
~~~
To be written.
\ No newline at end of file
......@@ -17,14 +17,14 @@ This vertex-centric view makes it easy to express a large class of graph problem
Spargel API
-----------
The Spargel API is part of the *addons* Maven project. All relevant classes are located in the *org.apache.flink.spargel.java* package.
The Spargel API is part of the *addons* Maven project. All relevant classes are located in the *org.apache.flinkspargel.java* package.
Add the following dependency to your `pom.xml` to use Spargel.
~~~xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-spargel</artifactId>
<artifactId>spargel</artifactId>
<version>{{site.FLINK_VERSION_STABLE}}</version>
</dependency>
~~~
......@@ -110,4 +110,4 @@ The computation **terminates** after a specified *maximum number of supersteps*
<p class="text-center">
<img alt="Spargel Example" width="75%" src="img/spargel_example.png" />
</p>
</p>
\ No newline at end of file
......@@ -43,7 +43,6 @@ import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
......@@ -67,7 +66,7 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
private transient RecordReader<K, V> recordReader;
private transient boolean fetched = false;
private transient boolean hasNext;
public HadoopInputFormat() {
super();
}
......@@ -156,9 +155,6 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
@Override
public void open(HadoopInputSplit split) throws IOException {
this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
if (this.recordReader instanceof Configurable) {
((Configurable) this.recordReader).setConf(jobConf);
}
key = this.recordReader.createKey();
value = this.recordReader.createValue();
this.fetched = false;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.hadoopcompatibility.mapred;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reporter;
/**
* This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
*/
@SuppressWarnings("rawtypes")
public final class HadoopMapFunction<KEYIN extends WritableComparable, VALUEIN extends Writable,
KEYOUT extends WritableComparable, VALUEOUT extends Writable>
extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>>
implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
private static final long serialVersionUID = 1L;
private transient Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> mapper;
private transient JobConf jobConf;
private transient HadoopOutputCollector<KEYOUT,VALUEOUT> outputCollector;
private transient Reporter reporter;
/**
* Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
*
* @param hadoopMapper The Hadoop Mapper to wrap.
*/
public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper) {
this(hadoopMapper, new JobConf());
}
/**
* Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
* The Hadoop Mapper is configured with the provided JobConf.
*
* @param hadoopMapper The Hadoop Mapper to wrap.
* @param conf The JobConf that is used to configure the Hadoop Mapper.
*/
public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf) {
if(hadoopMapper == null) {
throw new NullPointerException("Mapper may not be null.");
}
if(conf == null) {
throw new NullPointerException("JobConf may not be null.");
}
this.mapper = hadoopMapper;
this.jobConf = conf;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.mapper.configure(jobConf);
this.reporter = new HadoopDummyReporter();
this.outputCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
}
@Override
public void flatMap(final Tuple2<KEYIN,VALUEIN> value, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
throws Exception {
outputCollector.setFlinkCollector(out);
mapper.map(value.f0, value.f1, outputCollector, reporter);
}
@SuppressWarnings("unchecked")
@Override
public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 2);
Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 3);
final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass);
final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(outValClass);
return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
}
/**
* Custom serialization methods.
* @see http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html
*/
private void writeObject(final ObjectOutputStream out) throws IOException {
out.writeObject(mapper.getClass());
jobConf.write(out);
}
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> mapperClass =
(Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
mapper = InstantiationUtil.instantiate(mapperClass);
jobConf = new JobConf();
jobConf.readFields(in);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.hadoopcompatibility.mapred;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
/**
* This wrapper maps a Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction.
*/
@SuppressWarnings("rawtypes")
@org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable
public final class HadoopReduceCombineFunction<KEYIN extends WritableComparable, VALUEIN extends Writable,
KEYOUT extends WritableComparable, VALUEOUT extends Writable>
extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
private static final long serialVersionUID = 1L;
private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
private transient Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> combiner;
private transient JobConf jobConf;
private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
private transient HadoopOutputCollector<KEYIN,VALUEIN> combineCollector;
private transient Reporter reporter;
/**
* Maps two Hadoop Reducer (mapred API) to a combinable Flink GroupReduceFunction.
*
* @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
* @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
*/
public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner) {
this(hadoopReducer, hadoopCombiner, new JobConf());
}
/**
* Maps two Hadoop Reducer (mapred API) to a combinable Flink GroupReduceFunction.
*
* @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
* @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
* @param conf The JobConf that is used to configure both Hadoop Reducers.
*/
public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner, JobConf conf) {
if(hadoopReducer == null) {
throw new NullPointerException("Reducer may not be null.");
}
if(hadoopCombiner == null) {
throw new NullPointerException("Combiner may not be null.");
}
if(conf == null) {
throw new NullPointerException("JobConf may not be null.");
}
this.reducer = hadoopReducer;
this.combiner = hadoopCombiner;
this.jobConf = conf;
}
@SuppressWarnings("unchecked")
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.reducer.configure(jobConf);
this.combiner.configure(jobConf);
this.reporter = new HadoopDummyReporter();
Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(inKeyClass);
this.combineCollector = new HadoopOutputCollector<KEYIN, VALUEIN>();
this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
}
@Override
public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
throws Exception {
reduceCollector.setFlinkCollector(out);
valueIterator.set(values.iterator());
reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter);
}
@Override
public void combine(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYIN,VALUEIN>> out) throws Exception {
combineCollector.setFlinkCollector(out);
valueIterator.set(values.iterator());
combiner.reduce(valueIterator.getCurrentKey(), valueIterator, combineCollector, reporter);
}
@SuppressWarnings("unchecked")
@Override
public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass);
final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(outValClass);
return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
}
/**
* Custom serialization methods.
* @see http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html
*/
private void writeObject(final ObjectOutputStream out) throws IOException {
out.writeObject(reducer.getClass());
out.writeObject(combiner.getClass());
jobConf.write(out);
}
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass =
(Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
reducer = InstantiationUtil.instantiate(reducerClass);
Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>> combinerClass =
(Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>>)in.readObject();
combiner = InstantiationUtil.instantiate(combinerClass);
jobConf = new JobConf();
jobConf.readFields(in);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.hadoopcompatibility.mapred;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
/**
* This wrapper maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
*/
@SuppressWarnings("rawtypes")
public final class HadoopReduceFunction<KEYIN extends WritableComparable, VALUEIN extends Writable,
KEYOUT extends WritableComparable, VALUEOUT extends Writable>
extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
private static final long serialVersionUID = 1L;
private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
private transient JobConf jobConf;
private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
private transient Reporter reporter;
/**
* Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
*
* @param hadoopReducer The Hadoop Reducer to wrap.
*/
public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer) {
this(hadoopReducer, new JobConf());
}
/**
* Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
*
* @param hadoopReducer The Hadoop Reducer to wrap.
* @param conf The JobConf that is used to configure the Hadoop Reducer.
*/
public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf) {
if(hadoopReducer == null) {
throw new NullPointerException("Reducer may not be null.");
}
if(conf == null) {
throw new NullPointerException("JobConf may not be null.");
}
this.reducer = hadoopReducer;
this.jobConf = conf;
}
@SuppressWarnings("unchecked")
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.reducer.configure(jobConf);
this.reporter = new HadoopDummyReporter();
this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(inKeyClass);
}
@Override
public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
throws Exception {
reduceCollector.setFlinkCollector(out);
valueIterator.set(values.iterator());
reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter);
}
@SuppressWarnings("unchecked")
@Override
public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass);
final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(outValClass);
return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
}
/**
* Custom serialization methods.
* @see http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html
*/
private void writeObject(final ObjectOutputStream out) throws IOException {
out.writeObject(reducer.getClass());
jobConf.write(out);
}
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass =
(Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
reducer = InstantiationUtil.instantiate(reducerClass);
jobConf = new JobConf();
jobConf.readFields(in);
}
}
......@@ -18,26 +18,22 @@
package org.apache.flink.hadoopcompatibility.mapred.example;
import java.io.IOException;
import java.util.Iterator;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat;
import org.apache.flink.util.Collector;
......@@ -48,7 +44,8 @@ import org.apache.hadoop.mapred.TextOutputFormat;
* This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to
* common Java types for better usage in a Flink job and how to use Hadoop Output Formats.
*/
public class HadoopMapredCompatWordCount {
@SuppressWarnings("serial")
public class WordCount {
public static void main(String[] args) throws Exception {
if (args.length < 2) {
......@@ -60,6 +57,7 @@ public class HadoopMapredCompatWordCount {
final String outputPath = args[1];
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(1);
// Set up the Hadoop Input Format
HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, new JobConf());
......@@ -68,66 +66,55 @@ public class HadoopMapredCompatWordCount {
// Create a Flink job with it
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
DataSet<Tuple2<Text, LongWritable>> words =
text.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()))
.groupBy(0).reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(new Counter(), new Counter()));
// Tokenize the line and convert from Writable "Text" to String for better handling
DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer());
// Sum up the words
DataSet<Tuple2<String, Integer>> result = words.groupBy(0).aggregate(Aggregations.SUM, 1);
// Convert String back to Writable "Text" for use with Hadoop Output Format
DataSet<Tuple2<Text, IntWritable>> hadoopResult = result.map(new HadoopDatatypeMapper());
// Set up Hadoop Output Format
HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat =
new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), new JobConf());
HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), new JobConf());
hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(outputPath));
// Output & Execute
words.output(hadoopOutputFormat).setParallelism(1);
env.execute("Hadoop Compat WordCount");
hadoopResult.output(hadoopOutputFormat);
env.execute("Word Count");
}
public static final class Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable> {
/**
* Splits a line into words and converts Hadoop Writables into normal Java data types.
*/
public static final class Tokenizer extends RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
@Override
public void map(LongWritable k, Text v, OutputCollector<Text, LongWritable> out, Reporter rep)
throws IOException {
public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String line = v.toString();
String line = value.f1.toString();
String[] tokens = line.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Text(token), new LongWritable(1l));
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
@Override
public void configure(JobConf arg0) { }
@Override
public void close() throws IOException { }
}
public static final class Counter implements Reducer<Text, LongWritable, Text, LongWritable> {
@Override
public void reduce(Text k, Iterator<LongWritable> vs, OutputCollector<Text, LongWritable> out, Reporter rep)
throws IOException {
long cnt = 0;
while(vs.hasNext()) {
cnt += vs.next().get();
}
out.collect(k, new LongWritable(cnt));
}
/**
* Converts Java data types to Hadoop Writables.
*/
public static final class HadoopDatatypeMapper extends RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
@Override
public void configure(JobConf arg0) { }
public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception {
return new Tuple2<Text, IntWritable>(new Text(value.f0), new IntWritable(value.f1));
}
@Override
public void close() throws IOException { }
}
}
......@@ -52,10 +52,10 @@ import org.apache.hadoop.mapred.TextInputFormat;
*
* <br /><br />
*
* <b>Note</b>: This example uses the out-dated Record API.
* <b>Note</b>: This example uses the out dated Record API.
* It is recommended to use the new Java API.
*
* @see org.apache.flink.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount
* @see org.apache.flink.hadoopcompatibility.mapred.example.WordCount
*/
public class WordCount implements Program, ProgramDescription {
......
......@@ -27,7 +27,7 @@ import org.apache.hadoop.util.Progressable;
*/
public class HadoopDummyProgressable implements Progressable {
@Override
public void progress() {
public void progress() {
}
}
......@@ -24,7 +24,6 @@ import java.io.IOException;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.mapred.JobConf;
......@@ -36,31 +35,33 @@ public class HadoopInputSplit implements InputSplit {
private transient org.apache.hadoop.mapred.InputSplit hadoopInputSplit;
@SuppressWarnings("unused")
private JobConf jobConf;
private int splitNumber;
private String hadoopInputSplitTypeName;
public org.apache.hadoop.mapred.InputSplit getHadoopInputSplit() {
return hadoopInputSplit;
}
public HadoopInputSplit() {
super();
}
public HadoopInputSplit(org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) {
this.hadoopInputSplit = hInputSplit;
this.hadoopInputSplitTypeName = hInputSplit.getClass().getName();
this.jobConf = jobconf;
}
@Override
public void write(DataOutputView out) throws IOException {
out.writeInt(splitNumber);
out.writeUTF(hadoopInputSplitTypeName);
jobConf.write(out);
hadoopInputSplit.write(out);
}
......@@ -70,7 +71,7 @@ public class HadoopInputSplit implements InputSplit {
this.hadoopInputSplitTypeName = in.readUTF();
if(hadoopInputSplit == null) {
try {
Class<? extends org.apache.hadoop.io.Writable> inputSplit =
Class<? extends org.apache.hadoop.io.Writable> inputSplit =
Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance( inputSplit );
}
......@@ -78,13 +79,7 @@ public class HadoopInputSplit implements InputSplit {
throw new RuntimeException("Unable to create InputSplit", e);
}
}
jobConf = new JobConf();
jobConf.readFields(in);
if (this.hadoopInputSplit instanceof Configurable) {
((Configurable) this.hadoopInputSplit).setConf(this.jobConf);
}
this.hadoopInputSplit.readFields(in);
}
@Override
......@@ -95,7 +90,7 @@ public class HadoopInputSplit implements InputSplit {
public void setSplitNumber(int splitNumber) {
this.splitNumber = splitNumber;
}
public void setHadoopInputSplit(
org.apache.hadoop.mapred.InputSplit hadoopInputSplit) {
this.hadoopInputSplit = hadoopInputSplit;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.hadoopcompatibility.mapred.wrapper;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.OutputCollector;
import java.io.IOException;
/**
* A Hadoop OutputCollector that wraps a Flink OutputCollector.
* On each call of collect() the data is forwarded to the wrapped Flink collector.
*
*/
@SuppressWarnings("rawtypes")
public final class HadoopOutputCollector<KEY extends WritableComparable, VALUE extends Writable>
implements OutputCollector<KEY,VALUE> {
private Collector<Tuple2<KEY,VALUE>> flinkCollector;
private final Tuple2<KEY,VALUE> outTuple = new Tuple2<KEY, VALUE>();
/**
* Set the wrapped Flink collector.
*
* @param flinkCollector The wrapped Flink OutputCollector.
*/
public void setFlinkCollector(Collector<Tuple2<KEY, VALUE>> flinkCollector) {
this.flinkCollector = flinkCollector;
}
/**
* Use the wrapped Flink collector to collect a key-value pair for Flink.
*
* @param key the key to collect
* @param val the value to collect
* @throws IOException if the key-value pair cannot be forwarded to the wrapped Flink collector.
*/
@Override
public void collect(final KEY key, final VALUE val) throws IOException {
this.outTuple.f0 = key;
this.outTuple.f1 = val;
this.flinkCollector.collect(outTuple);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.hadoopcompatibility.mapred.wrapper;
import java.util.Iterator;
import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
/**
* Wraps a Flink Tuple2 (key-value-pair) iterator into an iterator over the second (value) field.
*/
@SuppressWarnings("rawtypes")
public class HadoopTupleUnwrappingIterator<KEY extends WritableComparable, VALUE extends Writable>
extends TupleUnwrappingIterator<VALUE, KEY> implements java.io.Serializable {
private static final long serialVersionUID = 1L;
private Iterator<Tuple2<KEY,VALUE>> iterator;
private final WritableSerializer<KEY> keySerializer;
private boolean atFirst = false;
private KEY curKey = null;
private VALUE firstValue = null;
public HadoopTupleUnwrappingIterator(Class<KEY> keyClass) {
this.keySerializer = new WritableSerializer<KEY>(keyClass);
}
/**
* Set the Flink iterator to wrap.
*
* @param iterator The Flink iterator to wrap.
*/
@Override()
public void set(final Iterator<Tuple2<KEY,VALUE>> iterator) {
this.iterator = iterator;
if(this.hasNext()) {
final Tuple2<KEY, VALUE> tuple = iterator.next();
this.curKey = keySerializer.copy(tuple.f0);
this.firstValue = tuple.f1;
this.atFirst = true;
} else {
this.atFirst = false;
}
}
@Override
public boolean hasNext() {
if(this.atFirst) {
return true;
}
return iterator.hasNext();
}
@Override
public VALUE next() {
if(this.atFirst) {
this.atFirst = false;
return firstValue;
}
final Tuple2<KEY, VALUE> tuple = iterator.next();
return tuple.f1;
}
public KEY getCurrentKey() {
return this.curKey;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
......@@ -18,22 +18,22 @@
package org.apache.flink.test.hadoopcompatibility.mapred;
import org.apache.flink.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount;
import org.apache.flink.hadoopcompatibility.mapred.example.WordCount;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.JavaProgramTestBase;
public class HadoopMapredITCase extends JavaProgramTestBase {
public class HadoopInputOutputITCase extends JavaProgramTestBase {
protected String textPath;
protected String resultPath;
@Override
protected void preSubmit() throws Exception {
textPath = createTempFile("text.txt", WordCountData.TEXT);
resultPath = getTempDirPath("result");
this.setDegreeOfParallelism(4);
}
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath + "/1");
......@@ -41,7 +41,6 @@ public class HadoopMapredITCase extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
HadoopMapredCompatWordCount.main(new String[] { textPath, resultPath });
WordCount.main(new String[] { textPath, resultPath });
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.hadoopcompatibility.mapred;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class HadoopMapFunctionITCase extends JavaProgramTestBase {
private static int NUM_PROGRAMS = 3;
private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
private String expectedResult;
public HadoopMapFunctionITCase(Configuration config) {
super(config);
}
@Override
protected void preSubmit() throws Exception {
resultPath = getTempDirPath("result");
}
@Override
protected void testProgram() throws Exception {
expectedResult = MapperProgs.runProgram(curProgId, resultPath);
}
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(expectedResult, resultPath);
}
@Parameters
public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
for(int i=1; i <= NUM_PROGRAMS; i++) {
Configuration config = new Configuration();
config.setInteger("ProgramId", i);
tConfigs.add(config);
}
return toParameterList(tConfigs);
}
public static class MapperProgs {
public static String runProgram(int progId, String resultPath) throws Exception {
switch(progId) {
case 1: {
/*
* Test non-passing mapper
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
DataSet<Tuple2<IntWritable, Text>> nonPassingFlatMapDs = ds.
flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new NonPassingMapper()));
nonPassingFlatMapDs.writeAsText(resultPath);
env.execute();
// return expected result
return "\n";
}
case 2: {
/*
* Test data duplicating mapper
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
DataSet<Tuple2<IntWritable, Text>> duplicatingFlatMapDs = ds.
flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new DuplicatingMapper()));
duplicatingFlatMapDs.writeAsText(resultPath);
env.execute();
// return expected result
return "(1,Hi)\n" + "(1,HI)\n" +
"(2,Hello)\n" + "(2,HELLO)\n" +
"(3,Hello world)\n" + "(3,HELLO WORLD)\n" +
"(4,Hello world, how are you?)\n" + "(4,HELLO WORLD, HOW ARE YOU?)\n" +
"(5,I am fine.)\n" + "(5,I AM FINE.)\n" +
"(6,Luke Skywalker)\n" + "(6,LUKE SKYWALKER)\n" +
"(7,Comment#1)\n" + "(7,COMMENT#1)\n" +
"(8,Comment#2)\n" + "(8,COMMENT#2)\n" +
"(9,Comment#3)\n" + "(9,COMMENT#3)\n" +
"(10,Comment#4)\n" + "(10,COMMENT#4)\n" +
"(11,Comment#5)\n" + "(11,COMMENT#5)\n" +
"(12,Comment#6)\n" + "(12,COMMENT#6)\n" +
"(13,Comment#7)\n" + "(13,COMMENT#7)\n" +
"(14,Comment#8)\n" + "(14,COMMENT#8)\n" +
"(15,Comment#9)\n" + "(15,COMMENT#9)\n" +
"(16,Comment#10)\n" + "(16,COMMENT#10)\n" +
"(17,Comment#11)\n" + "(17,COMMENT#11)\n" +
"(18,Comment#12)\n" + "(18,COMMENT#12)\n" +
"(19,Comment#13)\n" + "(19,COMMENT#13)\n" +
"(20,Comment#14)\n" + "(20,COMMENT#14)\n" +
"(21,Comment#15)\n" + "(21,COMMENT#15)\n";
}
case 3: {
// Mapper configured via JobConf
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
JobConf conf = new JobConf();
conf.set("my.filterPrefix", "Hello");
DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
DataSet<Tuple2<IntWritable, Text>> hellos = ds.
flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new ConfigurableMapper(), conf));
hellos.writeAsText(resultPath);
env.execute();
// return expected result
return "(2,Hello)\n" +
"(3,Hello world)\n" +
"(4,Hello world, how are you?)\n";
}
default:
throw new IllegalArgumentException("Invalid program id");
}
}
}
public static class NonPassingMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
@Override
public void map(final IntWritable k, final Text v,
final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException {
if ( v.toString().contains("bananas") ) {
out.collect(k,v);
}
}
@Override
public void configure(final JobConf arg0) { }
@Override
public void close() throws IOException { }
}
public static class DuplicatingMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
@Override
public void map(final IntWritable k, final Text v,
final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException {
out.collect(k, v);
out.collect(k, new Text(v.toString().toUpperCase()));
}
@Override
public void configure(final JobConf arg0) { }
@Override
public void close() throws IOException { }
}
public static class ConfigurableMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
private String filterPrefix;
@Override
public void map(IntWritable k, Text v, OutputCollector<IntWritable, Text> out, Reporter r)
throws IOException {
if(v.toString().startsWith(filterPrefix)) {
out.collect(k, v);
}
}
@Override
public void configure(JobConf c) {
filterPrefix = c.get("my.filterPrefix");
}
@Override
public void close() throws IOException { }
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.hadoopcompatibility.mapred;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class HadoopReduceCombineFunctionITCase extends JavaProgramTestBase {
private static int NUM_PROGRAMS = 4;
private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
private String expectedResult;
public HadoopReduceCombineFunctionITCase(Configuration config) {
super(config);
}
@Override
protected void preSubmit() throws Exception {
resultPath = getTempDirPath("result");
}
@Override
protected void testProgram() throws Exception {
expectedResult = ReducerProgs.runProgram(curProgId, resultPath);
}
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(expectedResult, resultPath);
}
@Override
protected boolean skipCollectionExecution() {
if (this.curProgId == 3) {
return true;
}
return false;
}
@Parameters
public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
for(int i=1; i <= NUM_PROGRAMS; i++) {
Configuration config = new Configuration();
config.setInteger("ProgramId", i);
tConfigs.add(config);
}
return toParameterList(tConfigs);
}
public static class ReducerProgs {
public static String runProgram(int progId, String resultPath) throws Exception {
switch(progId) {
case 1: {
/*
* Test standard counting with combiner
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env).
map(new MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, IntWritable>>() {
private static final long serialVersionUID = 1L;
Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>();
@Override
public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
throws Exception {
outT.f0 = new IntWritable(v.f0.get() / 6);
outT.f1 = new IntWritable(1);
return outT;
}
});
DataSet<Tuple2<IntWritable, IntWritable>> counts = ds.
groupBy(0).
reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
new SumReducer(), new SumReducer()));
counts.writeAsText(resultPath);
env.execute();
// return expected result
return "(0,5)\n"+
"(1,6)\n" +
"(2,6)\n" +
"(3,4)\n";
}
case 2: {
/*
* Test ungrouped Hadoop reducer
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env).
map(new MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, IntWritable>>() {
private static final long serialVersionUID = 1L;
Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>();
@Override
public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
throws Exception {
outT.f0 = new IntWritable(0);
outT.f1 = v.f0;
return outT;
}
});
DataSet<Tuple2<IntWritable, IntWritable>> sum = ds.
reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
new SumReducer(), new SumReducer()));
sum.writeAsText(resultPath);
env.execute();
// return expected result
return "(0,231)\n";
}
case 3: {
/* Test combiner */
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env).
map(new MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, IntWritable>>() {
private static final long serialVersionUID = 1L;
Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>();
@Override
public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
throws Exception {
outT.f0 = v.f0;
outT.f1 = new IntWritable(1);
return outT;
}
});
DataSet<Tuple2<IntWritable, IntWritable>> counts = ds.
groupBy(0).
reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
new SumReducer(), new KeyChangingReducer()));
counts.writeAsText(resultPath);
env.execute();
// return expected result
return "(0,5)\n"+
"(1,6)\n" +
"(2,5)\n" +
"(3,5)\n";
}
case 4: {
/*
* Test configuration via JobConf
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
JobConf conf = new JobConf();
conf.set("my.cntPrefix", "Hello");
DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env).
map(new MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v)
throws Exception {
v.f0 = new IntWritable(v.f0.get() % 5);
return v;
}
});
DataSet<Tuple2<IntWritable, IntWritable>> hellos = ds.
groupBy(0).
reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
new ConfigurableCntReducer(), conf));
hellos.writeAsText(resultPath);
env.execute();
// return expected result
return "(0,0)\n"+
"(1,0)\n" +
"(2,1)\n" +
"(3,1)\n" +
"(4,1)\n";
}
default:
throw new IllegalArgumentException("Invalid program id");
}
}
}
public static class SumReducer implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
@Override
public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r)
throws IOException {
int sum = 0;
while(v.hasNext()) {
sum += v.next().get();
}
out.collect(k, new IntWritable(sum));
}
@Override
public void configure(JobConf arg0) { }
@Override
public void close() throws IOException { }
}
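/**
 * Hadoop Reducer that forwards every value unchanged but replaces the key by (key % 4).
 * Used as a combiner to verify that a combiner may emit keys that differ from its input keys.
 */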
public static class KeyChangingReducer implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
@Override
public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r)
throws IOException {
while(v.hasNext()) {
out.collect(new IntWritable(k.get() % 4), v.next());
}
}
@Override
public void configure(JobConf arg0) { }
@Override
public void close() throws IOException { }
}
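/**
 * Hadoop Reducer that counts, per key, the values starting with a prefix that is
 * read from the JobConf key "my.cntPrefix" in configure().
 */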
public static class ConfigurableCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
private String countPrefix;
@Override
public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
throws IOException {
int commentCnt = 0;
while(vs.hasNext()) {
String v = vs.next().toString();
if(v.startsWith(this.countPrefix)) {
commentCnt++;
}
}
out.collect(k, new IntWritable(commentCnt));
}
@Override
public void configure(final JobConf c) {
this.countPrefix = c.get("my.cntPrefix");
}
@Override
public void close() throws IOException { }
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.hadoopcompatibility.mapred;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
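/**
 * Integration tests for wrapping a Hadoop Reducer (without a separate combiner) as a Flink
 * GroupReduceFunction via HadoopReduceFunction.
 */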
@RunWith(Parameterized.class)
public class HadoopReduceFunctionITCase extends JavaProgramTestBase {
private static final int NUM_PROGRAMS = 3;
private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
private String expectedResult;
public HadoopReduceFunctionITCase(Configuration config) {
super(config);
}
@Override
protected void preSubmit() throws Exception {
resultPath = getTempDirPath("result");
}
@Override
protected void testProgram() throws Exception {
expectedResult = ReducerProgs.runProgram(curProgId, resultPath);
}
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(expectedResult, resultPath);
}
@Parameters
public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
for(int i=1; i <= NUM_PROGRAMS; i++) {
Configuration config = new Configuration();
config.setInteger("ProgramId", i);
tConfigs.add(config);
}
return toParameterList(tConfigs);
}
public static class ReducerProgs {
public static String runProgram(int progId, String resultPath) throws Exception {
switch(progId) {
case 1: {
/*
* Test standard grouping
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env).
map(new MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v)
throws Exception {
v.f0 = new IntWritable(v.f0.get() / 5);
return v;
}
});
DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = ds.
groupBy(0).
reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(new CommentCntReducer()));
commentCnts.writeAsText(resultPath);
env.execute();
// return expected result
return "(0,0)\n"+
"(1,3)\n" +
"(2,5)\n" +
"(3,5)\n" +
"(4,2)\n";
}
case 2: {
/*
* Test ungrouped Hadoop reducer
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = ds.
reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(new AllCommentCntReducer()));
commentCnts.writeAsText(resultPath);
env.execute();
// return expected result
return "(42,15)\n";
}
case 3: {
/*
* Test configuration via JobConf
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
JobConf conf = new JobConf();
conf.set("my.cntPrefix", "Hello");
DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env).
map(new MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v)
throws Exception {
v.f0 = new IntWritable(v.f0.get() % 5);
return v;
}
});
DataSet<Tuple2<IntWritable, IntWritable>> helloCnts = ds.
groupBy(0).
reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
new ConfigurableCntReducer(), conf));
helloCnts.writeAsText(resultPath);
env.execute();
// return expected result
return "(0,0)\n"+
"(1,0)\n" +
"(2,1)\n" +
"(3,1)\n" +
"(4,1)\n";
}
default:
throw new IllegalArgumentException("Invalid program id");
}
}
}
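/**
 * Hadoop Reducer that counts, per key, the values starting with "Comment".
 */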
public static class CommentCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
@Override
public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
throws IOException {
int commentCnt = 0;
while(vs.hasNext()) {
String v = vs.next().toString();
if(v.startsWith("Comment")) {
commentCnt++;
}
}
out.collect(k, new IntWritable(commentCnt));
}
@Override
public void configure(final JobConf arg0) { }
@Override
public void close() throws IOException { }
}
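/**
 * Hadoop Reducer that counts all values starting with "Comment" and emits the total
 * under the constant key 42, ignoring the input key.
 */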
public static class AllCommentCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
@Override
public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
throws IOException {
int commentCnt = 0;
while(vs.hasNext()) {
String v = vs.next().toString();
if(v.startsWith("Comment")) {
commentCnt++;
}
}
out.collect(new IntWritable(42), new IntWritable(commentCnt));
}
@Override
public void configure(final JobConf arg0) { }
@Override
public void close() throws IOException { }
}
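/**
 * Hadoop Reducer that counts, per key, the values starting with the prefix configured
 * under "my.cntPrefix" in the JobConf.
 */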
public static class ConfigurableCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
private String countPrefix;
@Override
public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
throws IOException {
int commentCnt = 0;
while(vs.hasNext()) {
String v = vs.next().toString();
if(v.startsWith(this.countPrefix)) {
commentCnt++;
}
}
out.collect(k, new IntWritable(commentCnt));
}
@Override
public void configure(final JobConf c) {
this.countPrefix = c.get("my.cntPrefix");
}
@Override
public void close() throws IOException { }
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.hadoopcompatibility.mapred;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
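/**
 * Provides a shuffled test DataSet of 21 (IntWritable, Text) pairs with keys 1 to 21,
 * used as common input for the Hadoop compatibility integration tests.
 */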
public class HadoopTestData {
public static DataSet<Tuple2<IntWritable, Text>> getKVPairDataSet(ExecutionEnvironment env) {
List<Tuple2<IntWritable, Text>> data = new ArrayList<Tuple2<IntWritable, Text>>();
data.add(new Tuple2<IntWritable, Text>(new IntWritable(1),new Text("Hi")));
data.add(new Tuple2<IntWritable, Text>(new IntWritable(2),new Text("Hello")));
data.add(new Tuple2<IntWritable, Text>(new IntWritable(3),new Text("Hello world")));
data.add(new Tuple2<IntWritable, Text>(new IntWritable(4),new Text("Hello world, how are you?")));
data.add(new Tuple2<IntWritable, Text>(new IntWritable(5),new Text("I am fine.")));
data.add(new Tuple2<IntWritable, Text>(new IntWritable(6),new Text("Luke Skywalker")));
data.add(new Tuple2<IntWritable, Text>(new IntWritable(7),new Text("Comment#1")));
data.add(new Tuple2<IntWritable, Text>(new IntWritable(8),new Text("Comment#2")));
data.add(new Tuple2<IntWritable, Text>(new IntWritable(9),new Text("Comment#3")));
data.add(new Tuple2<IntWritable, Text>(new IntWritable(10),new Text("Comment#4")));
data.add(new Tuple2<IntWritable, Text>(new IntWritable(11),new Text("Comment#5")));
data.add(new Tuple2<IntWritable, Text>(new IntWritable(12),new Text("Comment#6")));
data.add(new Tuple2<IntWritable, Text>(new IntWritable(13),new Text("Comment#7")));
data.add(new Tuple2<IntWritable, Text>(new IntWritable(14),new Text("Comment#8")));
data.add(new Tuple2<IntWritable, Text>(new IntWritable(15),new Text("Comment#9")));
data.add(new Tuple2<IntWritable, Text>(new IntWritable(16),new Text("Comment#10")));
data.add(new Tuple2<IntWritable, Text>(new IntWritable(17),new Text("Comment#11")));
data.add(new Tuple2<IntWritable, Text>(new IntWritable(18),new Text("Comment#12")));
data.add(new Tuple2<IntWritable, Text>(new IntWritable(19),new Text("Comment#13")));
data.add(new Tuple2<IntWritable, Text>(new IntWritable(20),new Text("Comment#14")));
data.add(new Tuple2<IntWritable, Text>(new IntWritable(21),new Text("Comment#15")));
Collections.shuffle(data);
return env.fromCollection(data);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.hadoopcompatibility.mapred.wrapper;
import java.util.ArrayList;
import java.util.NoSuchElementException;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
import org.apache.hadoop.io.IntWritable;
import org.junit.Assert;
import org.junit.Test;
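/**
 * Tests that HadoopTupleUnwrappingIterator exposes the values of (key, value) tuples one by one,
 * keeps the current key available via getCurrentKey(), and behaves correctly both for repeated
 * hasNext() calls and for next() calls without a preceding hasNext().
 */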
public class HadoopTupleUnwrappingIteratorTest {
@Test
public void testValueIterator() {
HadoopTupleUnwrappingIterator<IntWritable, IntWritable> valIt =
new HadoopTupleUnwrappingIterator<IntWritable, IntWritable>(IntWritable.class);
// many values
ArrayList<Tuple2<IntWritable, IntWritable>> tList = new ArrayList<Tuple2<IntWritable, IntWritable>>();
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(1)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(2)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(3)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(4)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(5)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(6)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(7)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(8)));
int expectedKey = 1;
int[] expectedValues = new int[] {1,2,3,4,5,6,7,8};
valIt.set(tList.iterator());
Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
for(int expectedValue : expectedValues) {
Assert.assertTrue(valIt.hasNext());
Assert.assertTrue(valIt.hasNext());
Assert.assertTrue(valIt.next().get() == expectedValue);
Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
}
Assert.assertFalse(valIt.hasNext());
Assert.assertFalse(valIt.hasNext());
Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
// one value
tList.clear();
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(2),new IntWritable(10)));
expectedKey = 2;
expectedValues = new int[]{10};
valIt.set(tList.iterator());
Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
for(int expectedValue : expectedValues) {
Assert.assertTrue(valIt.hasNext());
Assert.assertTrue(valIt.hasNext());
Assert.assertTrue(valIt.next().get() == expectedValue);
Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
}
Assert.assertFalse(valIt.hasNext());
Assert.assertFalse(valIt.hasNext());
Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
// more values
tList.clear();
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(10)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(4)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(7)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(9)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(21)));
expectedKey = 3;
expectedValues = new int[]{10,4,7,9,21};
valIt.set(tList.iterator());
Assert.assertTrue(valIt.hasNext());
Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
for(int expectedValue : expectedValues) {
Assert.assertTrue(valIt.hasNext());
Assert.assertTrue(valIt.hasNext());
Assert.assertTrue(valIt.next().get() == expectedValue);
Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
}
Assert.assertFalse(valIt.hasNext());
Assert.assertFalse(valIt.hasNext());
Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
// no has next calls
tList.clear();
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(5)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(8)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(42)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(-1)));
tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(0)));
expectedKey = 4;
expectedValues = new int[]{5,8,42,-1,0};
valIt.set(tList.iterator());
Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
for(int expectedValue : expectedValues) {
Assert.assertTrue(valIt.next().get() == expectedValue);
}
try {
valIt.next();
Assert.fail();
} catch (NoSuchElementException nsee) {
// expected
}
Assert.assertFalse(valIt.hasNext());
Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
}
}
......@@ -87,9 +87,6 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
protected void postSubmit() throws Exception {}
protected boolean skipCollectionExecution() {
return false;
};
// --------------------------------------------------------------------------------------------
// Test entry point
......@@ -144,12 +141,6 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
@Test
public void testJobCollectionExecution() throws Exception {
// check if collection execution should be skipped.
if(this.skipCollectionExecution()) {
return;
}
isCollectionExecution = true;
// pre-submit
......