Commit 13968cd4 authored by Robert Metzger, committed by Stephan Ewen

[FLINK-1305] [FLINK-1304] Test for HadoopInputWrapper and NullWritable support

This closes #252
Parent de7f478f
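
In short: the commit adds an integration test that reads Hadoop SequenceFiles through the mapred HadoopInputFormat wrapper, including files whose keys are NullWritable. The following is a condensed sketch of that usage, distilled from case 2 of the new test below; the class name and file paths are placeholders, not part of the commit.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;

public class NullWritableSequenceFileSketch {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Wrap Hadoop's mapred SequenceFileInputFormat; the file's keys are NullWritable.
        JobConf jobConf = new JobConf();
        SequenceFileInputFormat.addInputPath(jobConf, new Path("/tmp/seqFileNullKey")); // placeholder input
        HadoopInputFormat<NullWritable, LongWritable> input =
                new HadoopInputFormat<NullWritable, LongWritable>(
                        new SequenceFileInputFormat<NullWritable, LongWritable>(),
                        NullWritable.class, LongWritable.class, jobConf);

        // Reading NullWritable keys relies on the WritableSerializer change further down.
        DataSet<Tuple2<NullWritable, LongWritable>> records = env.createInput(input);
        records.writeAsText("/tmp/seqFileNullKey-out"); // placeholder output
        env.execute();
    }
}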
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.hadoopcompatibility.mapred;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.LinkedList;
@RunWith(Parameterized.class)
public class HadoopIOFormatsITCase extends JavaProgramTestBase {
private static int NUM_PROGRAMS = 2;
private int curProgId = config.getInteger("ProgramId", -1);
private String[] resultPath;
private String[] expectedResult;
private String sequenceFileInPath;
private String sequenceFileInPathNull;
public HadoopIOFormatsITCase(Configuration config) {
super(config);
}
@Override
protected void preSubmit() throws Exception {
resultPath = new String[] {getTempDirPath("result0"), getTempDirPath("result1") };
File sequenceFile = createAndRegisterTempFile("seqFile");
sequenceFileInPath = sequenceFile.toURI().toString();
// Create a sequence file
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
FileSystem fs = FileSystem.get(URI.create(sequenceFile.getAbsolutePath()), conf);
Path path = new Path(sequenceFile.getAbsolutePath());
// ------------------ Long / Text Key Value pair: ------------
int kvCount = 4;
LongWritable key = new LongWritable();
Text value = new Text();
SequenceFile.Writer writer = null;
try {
writer = SequenceFile.createWriter( fs, conf, path, key.getClass(), value.getClass());
for (int i = 0; i < kvCount; i++) {
if (i == 1) {
// write key = 1 a bit more often than the others.
for (int a = 0; a < 15; a++) {
key.set(i);
value.set(i + " - somestring");
writer.append(key, value);
}
}
key.set(i);
value.set(i + " - somestring");
writer.append(key, value);
}
} finally {
IOUtils.closeStream(writer);
}
// ------------------ Null Key / Long Value pair: ------------
File sequenceFileNull = createAndRegisterTempFile("seqFileNullKey");
sequenceFileInPathNull = sequenceFileNull.toURI().toString();
path = new Path(sequenceFileInPathNull);
LongWritable value1 = new LongWritable();
SequenceFile.Writer writer1 = null;
try {
writer1 = SequenceFile.createWriter( fs, conf, path, NullWritable.class, value1.getClass());
for (int i = 0; i < kvCount; i++) {
value1.set(i);
writer1.append(NullWritable.get(), value1);
}
} finally {
IOUtils.closeStream(writer1);
}
}
@Override
protected void testProgram() throws Exception {
expectedResult = HadoopIOFormatPrograms.runProgram(curProgId, resultPath, sequenceFileInPath, sequenceFileInPathNull);
}
@Override
protected void postSubmit() throws Exception {
for(int i = 0; i < resultPath.length; i++) {
compareResultsByLinesInMemory(expectedResult[i], resultPath[i]);
}
}
@Parameters
public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
for(int i=1; i <= NUM_PROGRAMS; i++) {
Configuration config = new Configuration();
config.setInteger("ProgramId", i);
tConfigs.add(config);
}
return toParameterList(tConfigs);
}
public static class HadoopIOFormatPrograms {
public static String[] runProgram(int progId, String resultPath[], String sequenceFileInPath, String sequenceFileInPathNull) throws Exception {
switch(progId) {
case 1: {
/**
* Test sequence file, including a key access.
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
SequenceFileInputFormat<LongWritable, Text> sfif = new SequenceFileInputFormat<LongWritable, Text>();
JobConf hdconf = new JobConf();
SequenceFileInputFormat.addInputPath(hdconf, new Path(sequenceFileInPath));
HadoopInputFormat<LongWritable, Text> hif = new HadoopInputFormat<LongWritable, Text>(sfif, LongWritable.class, Text.class, hdconf);
DataSet<Tuple2<LongWritable, Text>> ds = env.createInput(hif);
DataSet<Tuple2<Long, Text>> summed = ds.map(new MapFunction<Tuple2<LongWritable, Text>, Tuple2<Long, Text>>() {
@Override
public Tuple2<Long, Text> map(Tuple2<LongWritable, Text> value) throws Exception {
return new Tuple2<Long, Text>(value.f0.get(), value.f1);
}
}).sum(0);
summed.writeAsText(resultPath[0]);
DataSet<String> res = ds.distinct(0).map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
@Override
public String map(Tuple2<LongWritable, Text> value) throws Exception {
return value.f1 + " - " + value.f0.get();
}
});
res.writeAsText(resultPath[1]);
env.execute();
// return expected result
return new String [] {"(21,3 - somestring)", "0 - somestring - 0\n" +
"1 - somestring - 1\n" +
"2 - somestring - 2\n" +
"3 - somestring - 3\n"};
}
case 2: {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
SequenceFileInputFormat<NullWritable, LongWritable> sfif = new SequenceFileInputFormat<NullWritable, LongWritable>();
JobConf hdconf = new JobConf();
SequenceFileInputFormat.addInputPath(hdconf, new Path(sequenceFileInPathNull));
HadoopInputFormat<NullWritable, LongWritable> hif = new HadoopInputFormat<NullWritable, LongWritable>(sfif, NullWritable.class, LongWritable.class, hdconf);
DataSet<Tuple2<NullWritable, LongWritable>> ds = env.createInput(hif);
DataSet<Tuple2<Void, Long>> res = ds.map(new MapFunction<Tuple2<NullWritable, LongWritable>, Tuple2<Void, Long>>() {
@Override
public Tuple2<Void, Long> map(Tuple2<NullWritable, LongWritable> value) throws Exception {
return new Tuple2<Void, Long>(null, value.f1.get());
}
});
DataSet<Tuple2<Void, Long>> res1 = res.groupBy(1).sum(1);
res1.writeAsText(resultPath[1]);
res.writeAsText(resultPath[0]);
env.execute();
// return expected result
return new String [] {"(null,2)\n" +
"(null,0)\n" +
"(null,1)\n" +
"(null,3)",
"(null,0)\n" +
"(null,1)\n" +
"(null,2)\n" +
"(null,3)"};
}
default:
throw new IllegalArgumentException("Invalid program id");
}
}
}
}
@@ -86,4 +86,39 @@ under the License.
</plugin>
</plugins>
</build>
<!-- See main pom.xml for explanation of profiles -->
<profiles>
<profile>
<id>hadoop-1</id>
<activation>
<property>
<!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
<!--hadoop1--><name>hadoop.profile</name><value>1</value>
</property>
</activation>
<dependencies>
<!-- "Old" Hadoop = MapReduce v1 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
</dependency>
</dependencies>
</profile>
<profile>
<id>hadoop-2</id>
<activation>
<property>
<!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
<!--hadoop2--><name>!hadoop.profile</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
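
Note on profile selection (mirroring the parent pom): the hadoop-1 profile is activated by setting the Maven property hadoop.profile to 1 (for example, mvn clean install -Dhadoop.profile=1), while the hadoop-2 profile is the default whenever that property is absent.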
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import com.esotericsoftware.kryo.Kryo;
@@ -44,6 +45,9 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
@Override
public T createInstance() {
if (typeClass == NullWritable.class) {
// NullWritable is a singleton (private constructor, obtained via NullWritable.get()),
// so it cannot be created reflectively; return the shared instance instead.
return (T) NullWritable.get();
}
return InstantiationUtil.instantiate(typeClass);
}
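
For context, a minimal sketch (not part of the commit, class name hypothetical) of why NullWritable needs this special case, assuming Hadoop's singleton accessor NullWritable.get():

import org.apache.hadoop.io.NullWritable;

public class NullWritableInstantiationSketch {
    public static void main(String[] args) {
        // The only supported way to obtain an instance is the shared singleton.
        NullWritable singleton = NullWritable.get();
        System.out.println("singleton: " + singleton);

        try {
            // A generic instantiate-by-reflection path (as used for other Writable types)
            // fails here because NullWritable's no-arg constructor is private.
            NullWritable.class.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            System.out.println("reflective instantiation failed as expected: " + e);
        }
    }
}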
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// ================================================================================================
// !!! NOTICE !!!
//
// This interface has been directly copied from the Apache Hadoop project.
// It has been added to this project to allow compiling against the type "Writable"
// without adding the heavyweight Hadoop dependency. This keeps the project dependencies
// lightweight.
//
// At runtime, the JVM will load either this interface, or the interface from a Hadoop jar,
// if present. In both cases, the dynamic class loading, linking, and method lookup will
// allow the types to interoperate as long as package name, class name, and method signature
// of this interface are kept strictly in sync with the version packaged with Hadoop.
//
// This is a core interface of the Hadoop project and has been stable across all releases.
//
// ================================================================================================
package org.apache.hadoop.io;
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
/**
* A serializable object which implements a simple, efficient, serialization
* protocol, based on {@link DataInput} and {@link DataOutput}.
*
* <p>Any <code>key</code> or <code>value</code> type in the Hadoop Map-Reduce
* framework implements this interface.</p>
*
* <p>Implementations typically implement a static <code>read(DataInput)</code>
* method which constructs a new instance, calls {@link #readFields(DataInput)}
* and returns the instance.</p>
*
* <p>Example:</p>
* <p><blockquote><pre>
* public class MyWritable implements Writable {
* // Some data
* private int counter;
* private long timestamp;
*
* // Default constructor to allow (de)serialization
* MyWritable() { }
*
* public void write(DataOutput out) throws IOException {
* out.writeInt(counter);
* out.writeLong(timestamp);
* }
*
* public void readFields(DataInput in) throws IOException {
* counter = in.readInt();
* timestamp = in.readLong();
* }
*
* public static MyWritable read(DataInput in) throws IOException {
* MyWritable w = new MyWritable();
* w.readFields(in);
* return w;
* }
* }
* </pre></blockquote></p>
*/
public interface Writable {
/**
* Serialize the fields of this object to <code>out</code>.
*
* @param out
* <code>DataOutput</code> to serialize this object into.
* @throws IOException
*/
void write(DataOutput out) throws IOException;
/**
* Deserialize the fields of this object from <code>in</code>.
*
* <p>
* For efficiency, implementations should attempt to re-use storage in the
* existing object where possible.
* </p>
*
* @param in
* <code>DataInput</code> to deserialize this object from.
* @throws IOException
*/
void readFields(DataInput in) throws IOException;
}