Commit ba7a19c1 authored by FelixNeutatz, committed by Fabian Hueske

[FLINK-1271] [hadoop] Remove Writable limitation from Hadoop format and function wrappers

This closes #287
Parent d62ab475
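To illustrate what this change enables (not part of the commit itself): with the Writable bound removed, the mapred HadoopInputFormat can be parameterized with plain Java types, and getProducedType() derives the TypeInformation through Flink's TypeExtractor instead of constructing WritableTypeInfo directly. The minimal sketch below mirrors the new HadoopInputFormatTest added further down in this diff; the constructor signature is the one used in that test.

// Minimal sketch of the effect of removing the Writable bound (mirrors the new test below).
import java.io.IOException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class WritableLimitationRemovedSketch {

    // Same shape as the DummyVoidKeyInputFormat defined in the new HadoopInputFormatTest below.
    public static class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> {
        @Override
        public RecordReader<Void, T> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
            return null; // not needed for the type-information check
        }
    }

    public static void main(String[] args) {
        // Long does not implement Writable; before this commit the generic bounds rejected it.
        HadoopInputFormat<Void, Long> format = new HadoopInputFormat<Void, Long>(
                new DummyVoidKeyInputFormat<Long>(), Void.class, Long.class, new JobConf());
        TypeInformation<Tuple2<Void, Long>> producedType = format.getProducedType();
        System.out.println(producedType); // TupleTypeInfo of (VOID, LONG), derived by the TypeExtractor
    }
}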
......@@ -25,6 +25,7 @@ import java.io.ObjectOutputStream;
import java.util.ArrayList;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.io.InputFormat;
......@@ -34,7 +35,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
......@@ -44,13 +44,12 @@ import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit;
import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.ReflectionUtils;
-public class HadoopInputFormat<K extends Writable, V extends Writable> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
+public class HadoopInputFormat<K, V> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
private static final long serialVersionUID = 1L;
......@@ -293,6 +292,6 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
@Override
public TypeInformation<Tuple2<K,V>> getProducedType() {
-return new TupleTypeInfo<Tuple2<K,V>>(new WritableTypeInfo<K>((Class<K>) keyClass), new WritableTypeInfo<V>((Class<V>) valueClass));
+return new TupleTypeInfo<Tuple2<K,V>>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass));
}
}
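The change above relies on Flink's TypeExtractor covering both Hadoop Writables and ordinary Java classes, which is what makes the explicit WritableTypeInfo construction redundant. A small sketch of that assumption follows; the exact TypeInformation subclasses returned are an assumption about this Flink version and are not stated in the diff.

// Sketch: TypeExtractor.createTypeInfo is the same call now used in getProducedType() above.
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.hadoop.io.Text;

public class TypeExtractionSketch {
    public static void main(String[] args) {
        // Expected to resolve to a Writable-aware TypeInformation for Hadoop types (assumption)...
        TypeInformation<?> writableType = TypeExtractor.createTypeInfo(Text.class);
        // ...and to a basic type information for plain Java types such as Long.
        TypeInformation<?> basicType = TypeExtractor.createTypeInfo(Long.class);
        System.out.println(writableType + " / " + basicType);
    }
}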
......@@ -29,14 +29,11 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reporter;
......@@ -45,8 +42,7 @@ import org.apache.hadoop.mapred.Reporter;
* This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
*/
@SuppressWarnings("rawtypes")
-public final class HadoopMapFunction<KEYIN extends WritableComparable, VALUEIN extends Writable,
-KEYOUT extends WritableComparable, VALUEOUT extends Writable>
+public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>>
implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
......@@ -108,8 +104,8 @@ public final class HadoopMapFunction<KEYIN extends WritableComparable, VALUEIN e
Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 2);
Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 3);
-final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass);
-final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(outValClass);
+final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
+final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
}
......
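A hypothetical usage sketch (not from this commit) of the HadoopMapFunction change above: the wrapped mapred Mapper may now declare non-Writable output types, and the produced Tuple2 type information is resolved through TypeExtractor.getForClass. The single-argument HadoopMapFunction constructor is assumed from the hadoop-compatibility module of this era; it is not shown in this diff.

// Hypothetical Mapper emitting plain String/Integer pairs instead of Writables.
import java.io.IOException;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class NonWritableMapperSketch {

    // Emits (line, line length); the output types no longer need to extend WritableComparable/Writable.
    public static class LineLengthMapper implements Mapper<LongWritable, Text, String, Integer> {
        @Override
        public void map(LongWritable key, Text value, OutputCollector<String, Integer> out, Reporter reporter) throws IOException {
            out.collect(value.toString(), value.getLength());
        }
        @Override
        public void configure(JobConf job) {}
        @Override
        public void close() throws IOException {}
    }

    public static void main(String[] args) {
        HadoopMapFunction<LongWritable, Text, String, Integer> mapFunction =
                new HadoopMapFunction<LongWritable, Text, String, Integer>(new LineLengthMapper());
        // Tuple2<String, Integer> type information, resolved via TypeExtractor.getForClass as in the diff.
        System.out.println(mapFunction.getProducedType());
    }
}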
......@@ -30,7 +30,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyProgressable;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
......@@ -41,7 +40,7 @@ import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.util.ReflectionUtils;
-public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>>, FinalizeOnMaster {
+public class HadoopOutputFormat<K,V> implements OutputFormat<Tuple2<K,V>>, FinalizeOnMaster {
private static final long serialVersionUID = 1L;
......
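For completeness, a hypothetical sketch of the mapred HadoopOutputFormat whose bound is removed above. The (OutputFormat, JobConf) constructor is an assumption about the hadoop-compatibility module of this era and is not shown in this diff; existing Writable-based pipelines keep working unchanged.

// Hypothetical sketch; constructor signature assumed, not taken from this diff.
import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

public class HadoopOutputFormatSketch {
    public static HadoopOutputFormat<Text, IntWritable> create(String outputPath) {
        JobConf jobConf = new JobConf();
        FileOutputFormat.setOutputPath(jobConf, new Path(outputPath));
        // K and V no longer need to extend Writable; Writable types such as Text/IntWritable still work.
        return new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), jobConf);
    }
}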
......@@ -29,15 +29,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
......@@ -47,8 +44,7 @@ import org.apache.hadoop.mapred.Reporter;
*/
@SuppressWarnings("rawtypes")
@org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable
-public final class HadoopReduceCombineFunction<KEYIN extends WritableComparable, VALUEIN extends Writable,
-KEYOUT extends WritableComparable, VALUEOUT extends Writable>
+public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
......@@ -132,9 +128,9 @@ public final class HadoopReduceCombineFunction<KEYIN extends WritableComparable,
public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
-final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass);
-final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(outValClass);
+final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
+final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
}
......
......@@ -29,15 +29,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
......@@ -46,8 +43,7 @@ import org.apache.hadoop.mapred.Reporter;
* This wrapper maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
*/
@SuppressWarnings("rawtypes")
-public final class HadoopReduceFunction<KEYIN extends WritableComparable, VALUEIN extends Writable,
-KEYOUT extends WritableComparable, VALUEOUT extends Writable>
+public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
......@@ -113,9 +109,9 @@ public final class HadoopReduceFunction<KEYIN extends WritableComparable, VALUEI
public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
-final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass);
-final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(outValClass);
+final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
+final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
}
......
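A hypothetical sketch covering the two reducer wrappers changed above: the Hadoop Reducer's type parameters no longer need to extend WritableComparable/Writable. The single-argument HadoopReduceFunction constructor, and the (reducer, combiner) constructor of HadoopReduceCombineFunction mentioned in the comment, are assumptions about the hadoop-compatibility module of this era; they are not shown in this diff.

// Hypothetical mapred Reducer operating on plain String/Integer pairs.
import java.io.IOException;
import java.util.Iterator;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class NonWritableReducerSketch {

    public static class SumReducer implements Reducer<String, Integer, String, Integer> {
        @Override
        public void reduce(String key, Iterator<Integer> values, OutputCollector<String, Integer> out, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next();
            }
            out.collect(key, sum);
        }
        @Override
        public void configure(JobConf job) {}
        @Override
        public void close() throws IOException {}
    }

    public static void main(String[] args) {
        // The same pattern is assumed for HadoopReduceCombineFunction(reducer, combiner).
        HadoopReduceFunction<String, Integer, String, Integer> reduceFunction =
                new HadoopReduceFunction<String, Integer, String, Integer>(new SumReducer());
        System.out.println(reduceFunction.getProducedType()); // Tuple2<String, Integer> via TypeExtractor
    }
}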
......@@ -20,8 +20,6 @@ package org.apache.flink.hadoopcompatibility.mapred.wrapper;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.OutputCollector;
import java.io.IOException;
......@@ -32,7 +30,7 @@ import java.io.IOException;
*
*/
@SuppressWarnings("rawtypes")
-public final class HadoopOutputCollector<KEY extends WritableComparable, VALUE extends Writable>
+public final class HadoopOutputCollector<KEY,VALUE>
implements OutputCollector<KEY,VALUE> {
private Collector<Tuple2<KEY,VALUE>> flinkCollector;
......@@ -63,4 +61,4 @@ public final class HadoopOutputCollector<KEY extends WritableComparable, VALUE e
this.flinkCollector.collect(outTuple);
}
-}
\ No newline at end of file
+}
......@@ -20,32 +20,30 @@ package org.apache.flink.hadoopcompatibility.mapred.wrapper;
import java.util.Iterator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
/**
* Wraps a Flink Tuple2 (key-value-pair) iterator into an iterator over the second (value) field.
*/
@SuppressWarnings("rawtypes")
-public class HadoopTupleUnwrappingIterator<KEY extends WritableComparable, VALUE extends Writable>
+public class HadoopTupleUnwrappingIterator<KEY,VALUE>
extends TupleUnwrappingIterator<VALUE, KEY> implements java.io.Serializable {
private static final long serialVersionUID = 1L;
private Iterator<Tuple2<KEY,VALUE>> iterator;
-private final WritableSerializer<KEY> keySerializer;
+private final TypeSerializer<KEY> keySerializer;
private boolean atFirst = false;
private KEY curKey = null;
private VALUE firstValue = null;
public HadoopTupleUnwrappingIterator(Class<KEY> keyClass) {
-this.keySerializer = new WritableSerializer<KEY>(keyClass);
+this.keySerializer = TypeExtractor.getForClass((Class<KEY>) keyClass).createSerializer();
}
/**
......
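A brief sketch of the serializer lookup now used above: the key serializer is obtained for an arbitrary key class through the TypeExtractor instead of being hard-wired to WritableSerializer. The no-argument createSerializer() call matches the TypeInformation API used in this diff.

// Sketch of the generic serializer lookup introduced in the constructor above.
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class KeySerializerSketch {
    public static <K> TypeSerializer<K> serializerFor(Class<K> keyClass) {
        // Works for Hadoop Writables as well as plain Java types such as String or Long.
        return TypeExtractor.getForClass(keyClass).createSerializer();
    }

    public static void main(String[] args) {
        System.out.println(serializerFor(String.class));
        System.out.println(serializerFor(Long.class));
    }
}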
......@@ -35,7 +35,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
......@@ -43,7 +42,6 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
import org.apache.flink.hadoopcompatibility.mapreduce.wrapper.HadoopInputSplit;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
......@@ -51,26 +49,27 @@ import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
-public class HadoopInputFormat<K extends Writable, V extends Writable> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
+public class HadoopInputFormat<K, V> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormat.class);
private org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat;
private Class<K> keyClass;
private Class<V> valueClass;
private org.apache.hadoop.conf.Configuration configuration;
private transient RecordReader<K, V> recordReader;
private boolean fetched = false;
private boolean hasNext;
public HadoopInputFormat() {
super();
}
public HadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
super();
this.mapreduceInputFormat = mapreduceInputFormat;
......@@ -79,46 +78,46 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
this.configuration = job.getConfiguration();
HadoopUtils.mergeHadoopConf(configuration);
}
public void setConfiguration(org.apache.hadoop.conf.Configuration configuration) {
this.configuration = configuration;
}
public org.apache.hadoop.mapreduce.InputFormat<K,V> getHadoopInputFormat() {
return this.mapreduceInputFormat;
}
public void setHadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat) {
this.mapreduceInputFormat = mapreduceInputFormat;
}
public org.apache.hadoop.conf.Configuration getConfiguration() {
return this.configuration;
}
// --------------------------------------------------------------------------------------------
// InputFormat
// --------------------------------------------------------------------------------------------
@Override
public void configure(Configuration parameters) {
// nothing to do
}
@Override
public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
// only gather base statistics for FileInputFormats
if(!(mapreduceInputFormat instanceof FileInputFormat)) {
return null;
}
JobContext jobContext = null;
try {
jobContext = HadoopUtils.instantiateJobContext(configuration, null);
} catch (Exception e) {
throw new RuntimeException(e);
}
final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
(FileBaseStatistics) cachedStats : null;
......@@ -127,7 +126,7 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
} catch (IOException ioex) {
if (LOG.isWarnEnabled()) {
LOG.warn("Could not determine statistics due to an io error: "
LOG.warn("Could not determine statistics due to an io error: "
+ ioex.getMessage());
}
} catch (Throwable t) {
......@@ -140,19 +139,19 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
// no statistics available
return null;
}
@Override
public HadoopInputSplit[] createInputSplits(int minNumSplits)
throws IOException {
configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
JobContext jobContext = null;
try {
jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
} catch (Exception e) {
throw new RuntimeException(e);
}
List<org.apache.hadoop.mapreduce.InputSplit> splits;
try {
splits = this.mapreduceInputFormat.getSplits(jobContext);
......@@ -160,18 +159,18 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
throw new IOException("Could not get Splits.", e);
}
HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
for(int i = 0; i < hadoopInputSplits.length; i++){
hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext);
}
return hadoopInputSplits;
}
@Override
public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
return new LocatableInputSplitAssigner(inputSplits);
}
@Override
public void open(HadoopInputSplit split) throws IOException {
TaskAttemptContext context = null;
......@@ -180,7 +179,7 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
} catch(Exception e) {
throw new RuntimeException(e);
}
try {
this.recordReader = this.mapreduceInputFormat
.createRecordReader(split.getHadoopInputSplit(), context);
......@@ -191,7 +190,7 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
this.fetched = false;
}
}
@Override
public boolean reachedEnd() throws IOException {
if(!this.fetched) {
......@@ -199,7 +198,7 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
}
return !this.hasNext;
}
private void fetchNext() throws IOException {
try {
this.hasNext = this.recordReader.nextKeyValue();
......@@ -209,7 +208,7 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
this.fetched = true;
}
}
@Override
public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
if(!this.fetched) {
......@@ -225,38 +224,38 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
throw new IOException("Could not get KeyValue pair.", e);
}
this.fetched = false;
return record;
}
@Override
public void close() throws IOException {
this.recordReader.close();
}
// --------------------------------------------------------------------------------------------
// Helper methods
// --------------------------------------------------------------------------------------------
private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths,
ArrayList<FileStatus> files) throws IOException {
private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths,
ArrayList<FileStatus> files) throws IOException {
long latestModTime = 0L;
// get the file info and check whether the cached statistics are still valid.
for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
final Path filePath = new Path(hadoopPath.toUri());
final FileSystem fs = FileSystem.get(filePath.toUri());
final FileStatus file = fs.getFileStatus(filePath);
latestModTime = Math.max(latestModTime, file.getModificationTime());
// enumerate all files and check their modification time stamp.
if (file.isDir()) {
FileStatus[] fss = fs.listStatus(filePath);
files.ensureCapacity(files.size() + fss.length);
for (FileStatus s : fss) {
if (!s.isDir()) {
files.add(s);
......@@ -267,50 +266,50 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
files.add(file);
}
}
// check whether the cached statistics are still valid, if we have any
if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
return cachedStats;
}
// calculate the whole length
long len = 0;
for (FileStatus s : files) {
len += s.getLen();
}
// sanity check
if (len <= 0) {
len = BaseStatistics.SIZE_UNKNOWN;
}
return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
}
// --------------------------------------------------------------------------------------------
// Custom serialization methods
// --------------------------------------------------------------------------------------------
private void writeObject(ObjectOutputStream out) throws IOException {
out.writeUTF(this.mapreduceInputFormat.getClass().getName());
out.writeUTF(this.keyClass.getName());
out.writeUTF(this.valueClass.getName());
this.configuration.write(out);
}
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
String hadoopInputFormatClassName = in.readUTF();
String keyClassName = in.readUTF();
String valueClassName = in.readUTF();
org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
configuration.readFields(in);
if(this.configuration == null) {
this.configuration = configuration;
}
try {
this.mapreduceInputFormat = (org.apache.hadoop.mapreduce.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
} catch (Exception e) {
......@@ -327,13 +326,13 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
throw new RuntimeException("Unable to find value class.", e);
}
}
// --------------------------------------------------------------------------------------------
// ResultTypeQueryable
// --------------------------------------------------------------------------------------------
@Override
public TypeInformation<Tuple2<K,V>> getProducedType() {
-return new TupleTypeInfo<Tuple2<K,V>>(new WritableTypeInfo<K>((Class<K>) keyClass), new WritableTypeInfo<V>((Class<V>) valueClass));
+return new TupleTypeInfo<Tuple2<K,V>>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass));
}
}
......@@ -30,7 +30,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
......@@ -40,7 +39,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>>, FinalizeOnMaster {
+public class HadoopOutputFormat<K,V> implements OutputFormat<Tuple2<K,V>>, FinalizeOnMaster {
private static final long serialVersionUID = 1L;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.hadoopcompatibility.mapred;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapred.FileInputFormat;
import org.junit.Test;
import java.io.IOException;
import static org.junit.Assert.fail;
public class HadoopInputFormatTest {
public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> {
public DummyVoidKeyInputFormat() {
}
@Override
public org.apache.hadoop.mapred.RecordReader<Void, T> getRecordReader(org.apache.hadoop.mapred.InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
return null;
}
}
@Test
public void checkTypeInformation() {
try {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Set up the Hadoop Input Format
Job job = Job.getInstance();
HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>( new DummyVoidKeyInputFormat(), Void.class, Long.class, new JobConf());
TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType();
TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
if(tupleType.isTupleType()) {
if(!((TupleTypeInfo)tupleType).equals(testTupleType)) {
fail("Tuple type information was not set correctly!");
}
} else {
fail("Type information was not set to tuple type information!");
}
}
catch (Exception ex) {
fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.hadoopcompatibility.mapreduce;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.junit.Test;
import java.io.IOException;
import static org.junit.Assert.fail;
public class HadoopInputFormatTest {
public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> {
public DummyVoidKeyInputFormat() {
}
@Override
public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
return null;
}
}
@Test
public void checkTypeInformation() {
try {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Set up the Hadoop Input Format
Job job = Job.getInstance();
HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>( new DummyVoidKeyInputFormat(), Void.class, Long.class, job);
TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType();
TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
if(tupleType.isTupleType()) {
if(!((TupleTypeInfo)tupleType).equals(testTupleType)) {
fail("Tuple type information was not set correctly!");
}
} else {
fail("Type information was not set to tuple type information!");
}
}
catch (Exception ex) {
fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
}
}
}