Commit b59e9b53 authored by StephanEwen

Add modifications to hadoop compatibility package as required for parquet testing.

Parent 0685ca1f
@@ -60,9 +60,11 @@ public class HadoopInputSplitWrapper implements InputSplit {
 		this.hadoopInputSplitTypeName = in.readUTF();
 		if(hadoopInputSplit == null) {
 			try {
-				Class inputSplit = Class.forName(hadoopInputSplitTypeName );
+				Class<? extends org.apache.hadoop.io.Writable> inputSplit =
+					Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
 				this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance( inputSplit );
-			} catch (Exception e) {
+			}
+			catch (Exception e) {
 				throw new RuntimeException("Unable to create InputSplit", e);
 			}
 		}
...
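The change above swaps a raw Class for a checked one: Class.forName(...).asSubclass(org.apache.hadoop.io.Writable.class) verifies at load time that the named split type actually implements Writable, which is what WritableFactories.newInstance(...) expects, so a bad type name fails with a clear ClassCastException instead of a later reflection error. A minimal standalone sketch of that fail-fast behavior (AsSubclassDemo is an illustrative name, assuming hadoop-common on the classpath):

    import org.apache.hadoop.io.Writable;

    public class AsSubclassDemo {
        public static void main(String[] args) throws Exception {
            // Succeeds: LongWritable implements Writable, so the checked
            // conversion of the Class object passes.
            Class<? extends Writable> ok =
                    Class.forName("org.apache.hadoop.io.LongWritable").asSubclass(Writable.class);
            System.out.println("loaded " + ok.getName());

            // Fails fast: String is not a Writable, so asSubclass throws a
            // ClassCastException here, at load time, rather than letting a
            // raw Class slip into WritableFactories.newInstance(...).
            try {
                Class.forName("java.lang.String").asSubclass(Writable.class);
            } catch (ClassCastException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }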
@@ -19,6 +19,7 @@ import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 
 import eu.stratosphere.types.BooleanValue;
@@ -27,6 +28,7 @@ import eu.stratosphere.types.DoubleValue;
 import eu.stratosphere.types.FloatValue;
 import eu.stratosphere.types.IntValue;
 import eu.stratosphere.types.LongValue;
+import eu.stratosphere.types.NullValue;
 import eu.stratosphere.types.Record;
 import eu.stratosphere.types.StringValue;
 import eu.stratosphere.types.Value;
@@ -35,18 +37,17 @@ import eu.stratosphere.types.Value;
 /**
  * Converter for the default hadoop writables.
  * Key will be in field 0, Value in field 1 of a Stratosphere Record.
- *
  */
 public class DefaultHadoopTypeConverter<K, V> implements HadoopTypeConverter<K, V> {
 	private static final long serialVersionUID = 1L;
 
 	@Override
 	public void convert(Record stratosphereRecord, K hadoopKey, V hadoopValue) {
 		stratosphereRecord.setField(0, convert(hadoopKey));
 		stratosphereRecord.setField(1, convert(hadoopValue));
 	}
 
-	private Value convert(Object hadoopType) {
+	protected Value convert(Object hadoopType) {
 		if(hadoopType instanceof org.apache.hadoop.io.LongWritable ) {
 			return new LongValue(((LongWritable)hadoopType).get());
 		}
...
@@ -68,6 +69,9 @@ public class DefaultHadoopTypeConverter<K, V> implements HadoopTypeConverter<K, V> {
 		if(hadoopType instanceof org.apache.hadoop.io.ByteWritable) {
 			return new ByteValue(((ByteWritable)hadoopType).get());
 		}
+		if (hadoopType instanceof NullWritable) {
+			return NullValue.getInstance();
+		}
 		throw new RuntimeException("Unable to convert Hadoop type ("+hadoopType.getClass().getCanonicalName()+") to Stratosphere.");
 	}
...
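Two things change in this file: NullWritable keys and values now map to NullValue.getInstance() instead of throwing, and convert(Object) goes from private to protected so subclasses can extend the type mapping. A hypothetical sketch of what the wider visibility enables (LenientHadoopTypeConverter and its fallback policy are invented for illustration, assuming the class sits in the same package as DefaultHadoopTypeConverter):

    import eu.stratosphere.types.NullValue;
    import eu.stratosphere.types.Value;

    // Hypothetical subclass: reuses the default writable conversions and adds
    // a custom policy for types the base class does not recognize.
    public class LenientHadoopTypeConverter<K, V> extends DefaultHadoopTypeConverter<K, V> {
        private static final long serialVersionUID = 1L;

        @Override
        protected Value convert(Object hadoopType) {
            try {
                // Delegate standard writables (LongWritable, Text,
                // NullWritable, ...) to the base implementation.
                return super.convert(hadoopType);
            } catch (RuntimeException e) {
                // Illustrative choice only: emit NullValue for unknown types
                // instead of failing the job.
                return NullValue.getInstance();
            }
        }
    }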
@@ -54,6 +54,7 @@ public class DefaultStratosphereTypeConverter<K,V> implements StratosphereTypeConverter<K,V> {
 		return convert(stratosphereRecord, 1, this.valueClass);
 	}
 
+	@SuppressWarnings("unchecked")
 	private<T> T convert(Record stratosphereType, int pos, Class<T> hadoopType) {
 		if(hadoopType == LongWritable.class ) {
 			return (T) new LongWritable((stratosphereType.getField(pos, LongValue.class)).getValue());
...
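The new @SuppressWarnings("unchecked") covers the return (T) ... branches below it: because generics are erased at runtime, the compiler cannot verify the cast to T, but the preceding hadoopType == SomeWritable.class guard guarantees it. A standalone sketch of the same guarded-cast pattern (UncheckedCastDemo and defaultInstance are illustrative names, assuming hadoop-common on the classpath):

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;

    public class UncheckedCastDemo {

        @SuppressWarnings("unchecked")
        static <T> T defaultInstance(Class<T> hadoopType) {
            if (hadoopType == LongWritable.class) {
                return (T) new LongWritable(0L); // safe: guarded by the class check
            }
            if (hadoopType == IntWritable.class) {
                return (T) new IntWritable(0);   // safe: guarded by the class check
            }
            throw new RuntimeException("Unsupported type: " + hadoopType.getName());
        }

        public static void main(String[] args) {
            LongWritable lw = defaultInstance(LongWritable.class);
            System.out.println(lw); // prints 0
        }
    }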