提交 05c5b48a 编写于 作者: M Márton Balassi 提交者: Stephan Ewen

[streaming] Tuple serialization added

上级 34d53c2a
...@@ -19,11 +19,25 @@ import eu.stratosphere.api.java.typeutils.TypeInformation; ...@@ -19,11 +19,25 @@ import eu.stratosphere.api.java.typeutils.TypeInformation;
import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer; import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate; import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate; import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.types.StringValue;
public class TupleTest { public class TupleTest {
public Tuple readTuple(DataInput in, Class... basicTypes) public Tuple readTuple(DataInput in) throws IOException {
throws IOException {
StringValue typeVal = new StringValue();
typeVal.read(in);
// TODO: use Tokenizer
String[] types = typeVal.getValue().split(",");
Class[] basicTypes = new Class[types.length];
for (int i = 0; i < types.length; i++) {
try {
basicTypes[i] = Class.forName(types[i]);
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
TypeInformation<? extends Tuple> typeInfo = TupleTypeInfo TypeInformation<? extends Tuple> typeInfo = TupleTypeInfo
.getBasicTupleTypeInfo(basicTypes); .getBasicTupleTypeInfo(basicTypes);
...@@ -39,12 +53,17 @@ public class TupleTest { ...@@ -39,12 +53,17 @@ public class TupleTest {
private void writeTuple(Tuple tuple, DataOutput out) { private void writeTuple(Tuple tuple, DataOutput out) {
Class[] basicTypes = new Class[tuple.getArity()]; Class[] basicTypes = new Class[tuple.getArity()];
StringBuilder basicTypeNames = new StringBuilder();
for (int i = 0; i < basicTypes.length; i++) { for (int i = 0; i < basicTypes.length; i++) {
basicTypes[i] = tuple.getField(i).getClass(); basicTypes[i] = tuple.getField(i).getClass();
basicTypeNames.append(basicTypes[i].getName() + ",");
} }
TypeInformation<? extends Tuple> typeInfo = TupleTypeInfo TypeInformation<? extends Tuple> typeInfo = TupleTypeInfo
.getBasicTupleTypeInfo(basicTypes); .getBasicTupleTypeInfo(basicTypes);
StringValue typeVal = new StringValue(basicTypeNames.toString());
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
TupleSerializer<Tuple> tupleSerializer = (TupleSerializer<Tuple>) typeInfo TupleSerializer<Tuple> tupleSerializer = (TupleSerializer<Tuple>) typeInfo
.createSerializer(); .createSerializer();
...@@ -52,6 +71,7 @@ public class TupleTest { ...@@ -52,6 +71,7 @@ public class TupleTest {
tupleSerializer); tupleSerializer);
serializationDelegate.setInstance(tuple); serializationDelegate.setInstance(tuple);
try { try {
typeVal.write(out);
serializationDelegate.write(out); serializationDelegate.write(out);
} catch (IOException e) { } catch (IOException e) {
// TODO Auto-generated catch block // TODO Auto-generated catch block
...@@ -76,7 +96,7 @@ public class TupleTest { ...@@ -76,7 +96,7 @@ public class TupleTest {
buff.toByteArray())); buff.toByteArray()));
Tuple2<Integer, String> tupleOut = (Tuple2<Integer, String>) readTuple( Tuple2<Integer, String> tupleOut = (Tuple2<Integer, String>) readTuple(
in, Integer.class, String.class); in);
assertEquals(tupleOut.getField(0), 42); assertEquals(tupleOut.getField(0), 42);
} catch (IOException e) { } catch (IOException e) {
fail(); fail();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册