Commit 35415f2a authored by Artem Tsikiridis, committed by Fabian Hueske

[FLINK-1076] Extend Hadoop compatibility. Added wrappers for stand-alone Map,...

[FLINK-1076] Extend Hadoop compatibility. Added wrappers for stand-alone Map, Reduce, and CombinableReduce functions.
Parent 53d8f1f0
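For orientation, the sketch below condenses the end-to-end usage of the new wrappers, based on the HadoopMapredCompatWordCount example included further down in this diff. It is a minimal sketch, not part of the commit: the input/output paths are placeholders, Tokenizer and Counter refer to the Mapper and Reducer defined in that example, the getJobConf() accessor on the input format is assumed to mirror the one shown on HadoopOutputFormat, and raw generic types are used as in the example's main() (which carries @SuppressWarnings({ "unchecked", "rawtypes" })).

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read lines with Hadoop's TextInputFormat wrapped as a Flink input format.
HadoopInputFormat<LongWritable, Text> input =
        new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, new JobConf());
FileInputFormat.addInputPath(input.getJobConf(), new Path("hdfs:///input"));   // placeholder path; getJobConf() assumed
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(input);

// Run the Hadoop Mapper as a flatMap, and the Hadoop Reducer (reused as combiner) as a grouped reduce.
DataSet<Tuple2<Text, LongWritable>> counts = text
        .flatMap(new HadoopMapFunction(new Tokenizer(), Text.class, LongWritable.class))
        .groupBy(0)
        .reduceGroup(new HadoopReduceCombineFunction(new Counter(), new Counter(), Text.class, LongWritable.class));

// Write the result with Hadoop's TextOutputFormat wrapped as a Flink output format.
HadoopOutputFormat<Text, LongWritable> output =
        new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), new JobConf());
TextOutputFormat.setOutputPath(output.getJobConf(), new Path("hdfs:///output"));   // placeholder path
counts.output(output);
env.execute("Hadoop Compat WordCount");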
......@@ -43,6 +43,7 @@ import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
......@@ -66,7 +67,7 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
private transient RecordReader<K, V> recordReader;
private transient boolean fetched = false;
private transient boolean hasNext;
public HadoopInputFormat() {
super();
}
......@@ -155,6 +156,9 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
@Override
public void open(HadoopInputSplit split) throws IOException {
this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
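// Some RecordReader implementations are Configurable; hand them the JobConf before first use.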
if (this.recordReader instanceof Configurable) {
((Configurable) this.recordReader).setConf(jobConf);
}
key = this.recordReader.createKey();
value = this.recordReader.createValue();
this.fetched = false;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.hadoopcompatibility.mapred;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopConfiguration;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reporter;
/**
* The wrapper for a Hadoop Mapper (mapred API). Parses a Hadoop JobConf object and initialises all operations related
* to mappers.
*/
public final class HadoopMapFunction<KEYIN extends WritableComparable<?>, VALUEIN extends Writable,
KEYOUT extends WritableComparable<?>, VALUEOUT extends Writable>
extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>>
implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
private static final long serialVersionUID = 1L;
private transient Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> mapper;
private transient JobConf jobConf;
private transient Class<KEYOUT> keyOutClass;
private transient Class<VALUEOUT> valOutClass;
private transient HadoopOutputCollector<KEYOUT,VALUEOUT> outputCollector;
private transient Reporter reporter;
public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper,
Class<KEYOUT> keyOutClass, Class<VALUEOUT> valOutClass) {
this(hadoopMapper, keyOutClass, valOutClass, new JobConf());
}
public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper,
Class<KEYOUT> keyOutClass, Class<VALUEOUT> valOutClass, JobConf conf) {
this.mapper = hadoopMapper;
this.keyOutClass = keyOutClass;
this.valOutClass = valOutClass;
this.jobConf = conf;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.mapper.configure(jobConf);
this.reporter = new HadoopDummyReporter();
this.outputCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
}
/**
* Wraps a Hadoop map() function call and uses a Flink collector to collect the result values.
* @param value The input value.
* @param out The collector for emitting result values.
*
* @throws Exception
*/
@Override
public void flatMap(final Tuple2<KEYIN,VALUEIN> value, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
throws Exception {
outputCollector.setFlinkCollector(out);
mapper.map(value.f0, value.f1, outputCollector, reporter);
}
@Override
public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(keyOutClass);
final WritableTypeInfo<VALUEOUT> valueTypeInfo = new WritableTypeInfo<VALUEOUT>(valOutClass);
return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypeInfo);
}
/**
* Custom serialization methods.
* @see http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html
*/
private void writeObject(final ObjectOutputStream out) throws IOException {
out.writeObject(mapper.getClass());
HadoopConfiguration.writeHadoopJobConf(jobConf, out);
}
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> mapperClass =
(Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
mapper = InstantiationUtil.instantiate(mapperClass);
jobConf = new JobConf();
jobConf.readFields(in);
}
}
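A minimal usage sketch for this wrapper (not part of the commit): MyMapper stands for any existing implementation of org.apache.hadoop.mapred.Mapper<LongWritable, Text, Text, LongWritable>, and text is a DataSet<Tuple2<LongWritable, Text>> such as one produced by HadoopInputFormat; raw generic types are used as in the example in this diff.

// Wrap the Hadoop Mapper so Flink runs it as a flatMap over Tuple2 records.
DataSet<Tuple2<Text, LongWritable>> mapped =
        text.flatMap(new HadoopMapFunction(new MyMapper(), Text.class, LongWritable.class, new JobConf()));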
......@@ -39,7 +39,6 @@ import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.util.ReflectionUtils;
public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>> {
private static final long serialVersionUID = 1L;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.hadoopcompatibility.mapred;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopConfiguration;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
/**
* The wrapper for a Hadoop Reducer (mapred API). Parses a Hadoop JobConf object and initialises all operations related
* to reducers and combiners.
*/
@org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable
public final class HadoopReduceCombineFunction<KEYIN extends WritableComparable<?>, VALUEIN extends Writable,
KEYOUT extends WritableComparable<?>, VALUEOUT extends Writable>
extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
private static final long serialVersionUID = 1L;
private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
private transient Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> combiner;
private transient JobConf jobConf;
private transient Class<KEYOUT> keyOutClass;
private transient Class<VALUEOUT> valOutClass;
private transient HadoopOutputCollector<KEYIN,VALUEIN> combineCollector;
private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
private transient Reporter reporter;
private transient ReducerTransformingIterator iterator;
public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner,
Class<KEYOUT> keyOutClass, Class<VALUEOUT> valOutClass) {
this(hadoopReducer, hadoopCombiner, keyOutClass, valOutClass, new JobConf());
}
public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner,
Class<KEYOUT> keyOutClass, Class<VALUEOUT> valOutClass, JobConf conf) {
this.reducer = hadoopReducer;
this.combiner = hadoopCombiner;
this.keyOutClass = keyOutClass;
this.valOutClass = valOutClass;
this.jobConf = conf;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.reducer.configure(jobConf);
if (this.combiner != null) {
this.combiner.configure(jobConf);
}
this.reporter = new HadoopDummyReporter();
this.combineCollector = new HadoopOutputCollector<KEYIN, VALUEIN>();
this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
this.iterator = new ReducerTransformingIterator();
}
/**
* A wrapping iterator for an iterator of key-value pairs that can be used as an iterator of values.
*/
private final class ReducerTransformingIterator extends TupleUnwrappingIterator<VALUEIN,KEYIN>
implements java.io.Serializable {
private static final long serialVersionUID = 1L;
private Iterator<Tuple2<KEYIN,VALUEIN>> iterator;
private KEYIN key;
private Tuple2<KEYIN,VALUEIN> first;
/**
* Set the iterator to wrap.
* @param iterator iterator to wrap
*/
@Override
public void set(final Iterator<Tuple2<KEYIN,VALUEIN>> iterator) {
this.iterator = iterator;
if(this.hasNext()) {
this.first = iterator.next();
this.key = this.first.f0;
}
}
@Override
public boolean hasNext() {
if(this.first != null) {
return true;
}
return iterator.hasNext();
}
@Override
public VALUEIN next() {
if(this.first != null) {
final VALUEIN val = this.first.f1;
this.first = null;
return val;
}
final Tuple2<KEYIN,VALUEIN> tuple = iterator.next();
return tuple.f1;
}
private KEYIN getKey() {
return WritableUtils.clone(this.key, jobConf);
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
/**
* Wraps a Hadoop reduce() function call and uses a Flink collector to collect the result values.
* @param values The iterator returning the group of values to be reduced.
* @param out The collector to emit the returned values.
*
* @throws Exception
*/
@Override
public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
throws Exception {
reduceCollector.setFlinkCollector(out);
iterator.set(values.iterator());
reducer.reduce(iterator.getKey(), iterator, reduceCollector, reporter);
}
/**
* Wraps a Hadoop combine() function call and uses a Flink collector to collect the result values.
* @param values The iterator returning the group of values to be reduced.
* @param out The collector to emit the returned values.
*
* @throws Exception
*/
@Override
public void combine(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYIN,VALUEIN>> out) throws Exception {
if (this.combiner == null) {
throw new RuntimeException("No combiner has been specified in Hadoop job. Flink reduce function is" +
"declared combinable. Invalid behaviour."); //This should not happen.
}
else {
combineCollector.setFlinkCollector(out);
iterator.set(values.iterator());
combiner.reduce(iterator.getKey(), iterator, combineCollector, reporter);
}
}
@Override
public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(keyOutClass);
final WritableTypeInfo<VALUEOUT> valueTypeInfo = new WritableTypeInfo<VALUEOUT>(valOutClass);
return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypeInfo);
}
/**
* Custom serialization methods.
* @see http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html
*/
private void writeObject(final ObjectOutputStream out) throws IOException {
out.writeObject(reducer.getClass());
out.writeObject(combiner.getClass());
HadoopConfiguration.writeHadoopJobConf(jobConf, out);
}
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass =
(Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
reducer = InstantiationUtil.instantiate(reducerClass);
Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>> combinerClass =
(Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>>)in.readObject();
combiner = InstantiationUtil.instantiate(combinerClass);
jobConf = new JobConf();
jobConf.readFields(in);
}
}
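A minimal usage sketch (not part of the commit): MyReducer stands for a hypothetical Hadoop Reducer<Text, LongWritable, Text, LongWritable> whose logic is also valid as a combiner, and mapped is a DataSet<Tuple2<Text, LongWritable>> as in the sketch above; raw generic types as in the example.

// Pass one instance as the reducer and one as the combiner; Flink may call combine() before reduce().
DataSet<Tuple2<Text, LongWritable>> reduced = mapped
        .groupBy(0)
        .reduceGroup(new HadoopReduceCombineFunction(new MyReducer(), new MyReducer(), Text.class, LongWritable.class));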
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.hadoopcompatibility.mapred;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopConfiguration;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
/**
* The wrapper for a Hadoop Reducer (mapred API). Parses a Hadoop JobConf object and initialises all operations related
* to reducers and combiners.
*/
public final class HadoopReduceFunction<KEYIN extends WritableComparable<?>, VALUEIN extends Writable,
KEYOUT extends WritableComparable<?>, VALUEOUT extends Writable>
extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
private static final long serialVersionUID = 1L;
private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
private transient Class<KEYOUT> keyOutClass;
private transient Class<VALUEOUT> valOutClass;
private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
private transient Reporter reporter;
private transient ReducerTransformingIterator iterator;
private JobConf jobConf;
public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
Class<KEYOUT> keyOutClass, Class<VALUEOUT> valOutClass) {
this(hadoopReducer, keyOutClass, valOutClass, new JobConf());
}
public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
Class<KEYOUT> keyOutClass, Class<VALUEOUT> valOutClass, JobConf conf) {
this.reducer = hadoopReducer;
this.keyOutClass = keyOutClass;
this.valOutClass = valOutClass;
this.jobConf = conf;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.reducer.configure(jobConf);
this.reporter = new HadoopDummyReporter();
this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
this.iterator = new ReducerTransformingIterator();
}
/**
* A wrapping iterator for an iterator of key-value pairs that can be used as an iterator of values.
*/
private final class ReducerTransformingIterator extends TupleUnwrappingIterator<VALUEIN,KEYIN>
implements java.io.Serializable {
private static final long serialVersionUID = 1L;
private Iterator<Tuple2<KEYIN,VALUEIN>> iterator;
private KEYIN key;
private Tuple2<KEYIN,VALUEIN> first;
/**
* Set the iterator to wrap.
* @param iterator iterator to wrap
*/
@Override
public void set(final Iterator<Tuple2<KEYIN,VALUEIN>> iterator) {
this.iterator = iterator;
if(this.hasNext()) {
this.first = iterator.next();
this.key = this.first.f0;
}
}
@Override
public boolean hasNext() {
if(this.first != null) {
return true;
}
return iterator.hasNext();
}
@Override
public VALUEIN next() {
if(this.first != null) {
final VALUEIN val = this.first.f1;
this.first = null;
return val;
}
final Tuple2<KEYIN,VALUEIN> tuple = iterator.next();
return tuple.f1;
}
private KEYIN getKey() {
return WritableUtils.clone(this.key, jobConf);
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
/**
* Wraps a Hadoop reduce() function call and uses a Flink collector to collect the result values.
* @param values The iterator returning the group of values to be reduced.
* @param out The collector to emit the returned values.
*
* @throws Exception
*/
@Override
public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
throws Exception {
reduceCollector.setFlinkCollector(out);
iterator.set(values.iterator());
reducer.reduce(iterator.getKey(), iterator, reduceCollector, reporter);
}
@Override
public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(keyOutClass);
final WritableTypeInfo<VALUEOUT> valueTypeInfo = new WritableTypeInfo<VALUEOUT>(valOutClass);
return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypeInfo);
}
/**
* Custom serialization methods.
* @see http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html
*/
private void writeObject(final ObjectOutputStream out) throws IOException {
out.writeObject(reducer.getClass());
HadoopConfiguration.writeHadoopJobConf(jobConf, out);
}
@SuppressWarnings("unchecked")
private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass =
(Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
reducer = InstantiationUtil.instantiate(reducerClass);
jobConf = new JobConf();
jobConf.readFields(in);
}
}
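When the Hadoop Reducer must not be applied as a combiner, the plain wrapper is used instead. A minimal sketch under the same assumptions as above (MyReducer and mapped are placeholders, not part of the commit):

// groupBy(0) groups on the key field of the Tuple2, mirroring Hadoop's grouping by key.
DataSet<Tuple2<Text, LongWritable>> reduced = mapped
        .groupBy(0)
        .reduceGroup(new HadoopReduceFunction(new MyReducer(), Text.class, LongWritable.class));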
......@@ -18,22 +18,26 @@
package org.apache.flink.hadoopcompatibility.mapred.example;
import org.apache.flink.api.common.functions.RichMapFunction;
import java.io.IOException;
import java.util.Iterator;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat;
import org.apache.flink.util.Collector;
......@@ -44,9 +48,9 @@ import org.apache.flink.util.Collector;
* This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to
* common Java types for better usage in a Flink job and how to use Hadoop Output Formats.
*/
@SuppressWarnings("serial")
public class WordCount {
public class HadoopMapredCompatWordCount {
@SuppressWarnings({ "unchecked", "rawtypes" })
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: WordCount <input path> <result path>");
......@@ -57,7 +61,6 @@ public class WordCount {
final String outputPath = args[1];
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(1);
// Set up the Hadoop Input Format
HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, new JobConf());
......@@ -66,55 +69,66 @@ public class WordCount {
// Create a Flink job with it
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
// Tokenize the line and convert from Writable "Text" to String for better handling
DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer());
// Sum up the words
DataSet<Tuple2<String, Integer>> result = words.groupBy(0).aggregate(Aggregations.SUM, 1);
// Convert String back to Writable "Text" for use with Hadoop Output Format
DataSet<Tuple2<Text, IntWritable>> hadoopResult = result.map(new HadoopDatatypeMapper());
DataSet<Tuple2<Text, LongWritable>> words =
text.flatMap(new HadoopMapFunction(new Tokenizer(), Text.class, LongWritable.class))
.groupBy(0).reduceGroup(new HadoopReduceCombineFunction(new Counter(), new Counter(), Text.class, LongWritable.class));
// Set up Hadoop Output Format
HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), new JobConf());
HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat =
new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), new JobConf());
hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(outputPath));
// Output & Execute
hadoopResult.output(hadoopOutputFormat);
env.execute("Word Count");
words.output(hadoopOutputFormat);
env.execute("Hadoop Compat WordCount");
}
/**
* Splits a line into words and converts Hadoop Writables into normal Java data types.
*/
public static final class Tokenizer extends RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
public static final class Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable> {
@Override
public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) {
public void map(LongWritable k, Text v, OutputCollector<Text, LongWritable> out, Reporter rep)
throws IOException {
// normalize and split the line
String line = value.f1.toString();
String line = v.toString();
String[] tokens = line.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
out.collect(new Text(token), new LongWritable(1L));
}
}
}
@Override
public void configure(JobConf arg0) { }
@Override
public void close() throws IOException { }
}
/**
* Converts Java data types to Hadoop Writables.
*/
public static final class HadoopDatatypeMapper extends RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
public static final class Counter implements Reducer<Text, LongWritable, Text, LongWritable> {
@Override
public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception {
return new Tuple2<Text, IntWritable>(new Text(value.f0), new IntWritable(value.f1));
public void reduce(Text k, Iterator<LongWritable> vs, OutputCollector<Text, LongWritable> out, Reporter rep)
throws IOException {
long cnt = 0;
while(vs.hasNext()) {
cnt += vs.next().get();
}
out.collect(k, new LongWritable(cnt));
}
@Override
public void configure(JobConf arg0) { }
@Override
public void close() throws IOException { }
}
}
......@@ -52,10 +52,10 @@ import org.apache.hadoop.mapred.TextInputFormat;
*
* <br /><br />
*
* <b>Note</b>: This example uses the out dated Record API.
* <b>Note</b>: This example uses the out-dated Record API.
* It is recommended to use the new Java API.
*
* @see org.apache.flink.hadoopcompatibility.mapred.example.WordCount
* @see org.apache.flink.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount
*/
public class WordCount implements Program, ProgramDescription {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.hadoopcompatibility.mapred.utils;
import org.apache.flink.runtime.fs.hdfs.DistributedFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.JobConf;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Map;
/**
* Merges the Hadoop configuration into a JobConf. This is necessary to pick up the HDFS configuration.
*/
public class HadoopConfiguration {
public static void mergeHadoopConf(JobConf jobConf) {
org.apache.hadoop.conf.Configuration hadoopConf = DistributedFileSystem.getHadoopConfiguration();
for (Map.Entry<String, String> e : hadoopConf) {
jobConf.set(e.getKey(), e.getValue());
}
}
/**
* Each task should get its own JobConf object when serializing.
* @param jobConf the jobConf to write
* @param out the outputstream to write to
* @throws IOException
*/
public static void writeHadoopJobConf(final JobConf jobConf, final ObjectOutputStream out) throws IOException{
final JobConf clonedConf = WritableUtils.clone(jobConf, new Configuration());
clonedConf.write(out);
}
}
\ No newline at end of file
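A small sketch (an assumption, not shown in this commit) of how mergeHadoopConf is intended to be used: fold the cluster's Hadoop/HDFS settings into a JobConf before handing it to the wrappers or formats above.

JobConf jobConf = new JobConf();
HadoopConfiguration.mergeHadoopConf(jobConf);   // copies the Hadoop/HDFS configuration entries into jobConf
HadoopInputFormat<LongWritable, Text> input =
        new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, jobConf);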
......@@ -29,7 +29,6 @@ import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptID;
public class HadoopUtils {
/**
......
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.flink.hadoopcompatibility.mapred.wrapper;
import org.apache.hadoop.util.Progressable;
......@@ -26,8 +25,9 @@ import org.apache.hadoop.util.Progressable;
*
*/
public class HadoopDummyProgressable implements Progressable {
@Override
public void progress() {
public void progress() {
// Nothing reported here.
}
}
......@@ -24,6 +24,7 @@ import java.io.IOException;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.mapred.JobConf;
......@@ -35,33 +36,31 @@ public class HadoopInputSplit implements InputSplit {
private transient org.apache.hadoop.mapred.InputSplit hadoopInputSplit;
@SuppressWarnings("unused")
private JobConf jobConf;
private int splitNumber;
private String hadoopInputSplitTypeName;
public org.apache.hadoop.mapred.InputSplit getHadoopInputSplit() {
return hadoopInputSplit;
}
public HadoopInputSplit() {
super();
}
public HadoopInputSplit(org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) {
this.hadoopInputSplit = hInputSplit;
this.hadoopInputSplitTypeName = hInputSplit.getClass().getName();
this.jobConf = jobconf;
}
@Override
public void write(DataOutputView out) throws IOException {
out.writeInt(splitNumber);
out.writeUTF(hadoopInputSplitTypeName);
jobConf.write(out);
hadoopInputSplit.write(out);
}
......@@ -71,7 +70,7 @@ public class HadoopInputSplit implements InputSplit {
this.hadoopInputSplitTypeName = in.readUTF();
if(hadoopInputSplit == null) {
try {
Class<? extends org.apache.hadoop.io.Writable> inputSplit =
Class<? extends org.apache.hadoop.io.Writable> inputSplit =
Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance( inputSplit );
}
......@@ -79,7 +78,13 @@ public class HadoopInputSplit implements InputSplit {
throw new RuntimeException("Unable to create InputSplit", e);
}
}
jobConf = new JobConf();
jobConf.readFields(in);
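// Configurable splits need the JobConf injected before readFields() so they can deserialize correctly.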
if (this.hadoopInputSplit instanceof Configurable) {
((Configurable) this.hadoopInputSplit).setConf(this.jobConf);
}
this.hadoopInputSplit.readFields(in);
}
@Override
......@@ -90,7 +95,7 @@ public class HadoopInputSplit implements InputSplit {
public void setSplitNumber(int splitNumber) {
this.splitNumber = splitNumber;
}
public void setHadoopInputSplit(
org.apache.hadoop.mapred.InputSplit hadoopInputSplit) {
this.hadoopInputSplit = hadoopInputSplit;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.hadoopcompatibility.mapred.wrapper;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.OutputCollector;
import java.io.IOException;
/**
* A Hadoop OutputCollector that wraps a Flink Collector.
* On each call to collect(), the key-value pair is forwarded to the wrapped Flink collector.
*/
public final class HadoopOutputCollector<KEY extends WritableComparable<?>, VALUE extends Writable>
implements OutputCollector<KEY,VALUE> {
private Collector<Tuple2<KEY,VALUE>> flinkCollector;
private final Tuple2<KEY,VALUE> outTuple = new Tuple2<KEY, VALUE>();
public void setFlinkCollector(Collector<Tuple2<KEY, VALUE>> flinkCollector) {
this.flinkCollector = flinkCollector;
}
/**
* Use the wrapped Flink collector to collect a key-value pair for Flink.
*
* @param key the key to collect
* @param val the value to collect
* @throws IOException if the key-value pair cannot be collected.
*/
@Override
public void collect(final KEY key, final VALUE val) throws IOException {
this.outTuple.f0 = key;
this.outTuple.f1 = val;
this.flinkCollector.collect(outTuple);
}
}
\ No newline at end of file
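The intended call pattern, as used by the wrapper functions earlier in this diff (a sketch, not part of the commit; out stands for the Flink Collector<Tuple2<Text, LongWritable>> passed into flatMap or reduce):

HadoopOutputCollector<Text, LongWritable> collector = new HadoopOutputCollector<Text, LongWritable>();
collector.setFlinkCollector(out);                            // bind the Flink collector for this call
collector.collect(new Text("word"), new LongWritable(1L));   // forwarded to out as a Tuple2<Text, LongWritable>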
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.hadoopcompatibility;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.junit.Assert;
import java.util.ArrayList;
import java.util.Arrays;
public abstract class HadoopTestBase extends JavaProgramTestBase {
/**
* Hadoop tests should not sort the result.
*/
public void compareResultsByLinesInMemory(String expectedResultStr, String resultPath) throws Exception {
ArrayList<String> list = new ArrayList<String>();
readAllResultLines(list, resultPath, false);
String[] result = list.toArray(new String[list.size()]);
String[] expected = expectedResultStr.isEmpty() ? new String[0] : expectedResultStr.split("\n");
Arrays.sort(expected);
Assert.assertEquals("Different number of lines in expected and obtained result.", expected.length, result.length);
Assert.assertArrayEquals(expected, result);
}
}
......@@ -18,22 +18,22 @@
package org.apache.flink.test.hadoopcompatibility.mapred;
import org.apache.flink.hadoopcompatibility.mapred.example.WordCount;
import org.apache.flink.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.JavaProgramTestBase;
public class HadoopInputOutputITCase extends JavaProgramTestBase {
public class HadoopMapredITCase extends JavaProgramTestBase {
protected String textPath;
protected String resultPath;
@Override
protected void preSubmit() throws Exception {
textPath = createTempFile("text.txt", WordCountData.TEXT);
resultPath = getTempDirPath("result");
this.setDegreeOfParallelism(1);
}
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath + "/1");
......@@ -41,6 +41,7 @@ public class HadoopInputOutputITCase extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
WordCount.main(new String[] { textPath, resultPath });
HadoopMapredCompatWordCount.main(new String[] { textPath, resultPath });
}
}