From b59e9b534542cd73d1ab274b68b8368dec73adf4 Mon Sep 17 00:00:00 2001 From: StephanEwen Date: Tue, 6 May 2014 16:52:40 +0200 Subject: [PATCH] Add modifications to hadoop compatibility package as required for parquet testing. --- .../hadoopcompatibility/HadoopInputSplitWrapper.java | 6 ++++-- .../datatypes/DefaultHadoopTypeConverter.java | 12 ++++++++---- .../datatypes/DefaultStratosphereTypeConverter.java | 1 + 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopInputSplitWrapper.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopInputSplitWrapper.java index 97e089d2a92..f92dcc36f4b 100644 --- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopInputSplitWrapper.java +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopInputSplitWrapper.java @@ -60,9 +60,11 @@ public class HadoopInputSplitWrapper implements InputSplit { this.hadoopInputSplitTypeName = in.readUTF(); if(hadoopInputSplit == null) { try { - Class inputSplit = Class.forName(hadoopInputSplitTypeName ); + Class inputSplit = + Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class); this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance( inputSplit ); - } catch (Exception e) { + } + catch (Exception e) { throw new RuntimeException("Unable to create InputSplit", e); } } diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/DefaultHadoopTypeConverter.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/DefaultHadoopTypeConverter.java index e6f950bd533..42f81de72c5 100644 --- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/DefaultHadoopTypeConverter.java +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/DefaultHadoopTypeConverter.java @@ -19,6 +19,7 @@ import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import eu.stratosphere.types.BooleanValue; @@ -27,6 +28,7 @@ import eu.stratosphere.types.DoubleValue; import eu.stratosphere.types.FloatValue; import eu.stratosphere.types.IntValue; import eu.stratosphere.types.LongValue; +import eu.stratosphere.types.NullValue; import eu.stratosphere.types.Record; import eu.stratosphere.types.StringValue; import eu.stratosphere.types.Value; @@ -35,18 +37,17 @@ import eu.stratosphere.types.Value; /** * Converter for the default hadoop writables. * Key will be in field 0, Value in field 1 of a Stratosphere Record. - * */ public class DefaultHadoopTypeConverter implements HadoopTypeConverter { private static final long serialVersionUID = 1L; @Override public void convert(Record stratosphereRecord, K hadoopKey, V hadoopValue) { - stratosphereRecord.setField(0, convert(hadoopKey)); - stratosphereRecord.setField(1, convert(hadoopValue)); + stratosphereRecord.setField(0, convert(hadoopKey)); + stratosphereRecord.setField(1, convert(hadoopValue)); } - private Value convert(Object hadoopType) { + protected Value convert(Object hadoopType) { if(hadoopType instanceof org.apache.hadoop.io.LongWritable ) { return new LongValue(((LongWritable)hadoopType).get()); } @@ -68,6 +69,9 @@ public class DefaultHadoopTypeConverter implements HadoopTypeConverter implements StratosphereTypeCo return convert(stratosphereRecord, 1, this.valueClass); } + @SuppressWarnings("unchecked") private T convert(Record stratosphereType, int pos, Class hadoopType) { if(hadoopType == LongWritable.class ) { return (T) new LongWritable((stratosphereType.getField(pos, LongValue.class)).getValue()); -- GitLab