提交 2d17a6cd 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] StreamRecord serialization improvement

上级 53e54a60
......@@ -1153,55 +1153,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
return basicTypes;
}
/**
* Writes tuple to the specified DataOutput
*
* @param tuple
* Tuple to be written
* @param out
* Output chosen
* @throws IOException
*/
private void writeTuple(Tuple tuple, DataOutput out) throws IOException {
// TODO: exception for empty record - no getField
byte[] typesInByte = tupleTypesToByteArray(getTuple());
TupleTypeInfo<Tuple> typeInfo = new TupleTypeInfo<Tuple>(
tupleTypesFromByteArray(typesInByte));
TupleSerializer<Tuple> tupleSerializer = (TupleSerializer<Tuple>) typeInfo
.createSerializer();
SerializationDelegate<Tuple> serializationDelegate = new SerializationDelegate<Tuple>(
tupleSerializer);
serializationDelegate.setInstance(tuple);
out.write(typesInByte);
serializationDelegate.write(out);
}
/**
* Reads a tuple from the specified DataInput
*
* @param in
* Input chosen
* @return Tuple read
* @throws IOException
*/
private Tuple readTuple(DataInput in, int numberOfFields) throws IOException {
byte[] typesInByte = new byte[numberOfFields];
in.readFully(typesInByte, 0, numberOfFields);
TupleTypeInfo<Tuple> typeInfo = new TupleTypeInfo<Tuple>(
tupleTypesFromByteArray(typesInByte));
TupleSerializer<Tuple> tupleSerializer = typeInfo.createSerializer();
DeserializationDelegate<Tuple> dd = new DeserializationDelegate<Tuple>(tupleSerializer);
dd.setInstance(tupleSerializer.createInstance());
dd.read(in);
return dd.getInstance();
}
/**
* Write method definition for the IOReadableWritable interface
*/
......@@ -1213,8 +1164,19 @@ public class StreamRecord implements IOReadableWritable, Serializable {
out.writeByte(numOfFields);
out.writeInt(numOfTuples);
byte[] typesInByte = tupleTypesToByteArray(getTuple());
out.write(typesInByte);
TupleTypeInfo<Tuple> typeInfo = new TupleTypeInfo<Tuple>(
tupleTypesFromByteArray(typesInByte));
TupleSerializer<Tuple> tupleSerializer = (TupleSerializer<Tuple>) typeInfo
.createSerializer();
SerializationDelegate<Tuple> serializationDelegate = new SerializationDelegate<Tuple>(
tupleSerializer);
for (Tuple tuple : tupleBatch) {
writeTuple(tuple, out);
serializationDelegate.setInstance(tuple);
serializationDelegate.write(out);
}
}
......@@ -1231,8 +1193,19 @@ public class StreamRecord implements IOReadableWritable, Serializable {
tupleBatch = new ArrayList<Tuple>(numOfTuples);
byte[] typesInByte = new byte[numOfFields];
in.readFully(typesInByte, 0, numOfFields);
TupleTypeInfo<Tuple> typeInfo = new TupleTypeInfo<Tuple>(
tupleTypesFromByteArray(typesInByte));
TupleSerializer<Tuple> tupleSerializer = typeInfo.createSerializer();
DeserializationDelegate<Tuple> dd = new DeserializationDelegate<Tuple>(tupleSerializer);
for (int k = 0; k < numOfTuples; ++k) {
tupleBatch.add(readTuple(in, numOfFields));
dd.setInstance(tupleSerializer.createInstance());
dd.read(in);
tupleBatch.add(dd.getInstance());
}
}
......
......@@ -264,8 +264,12 @@ public class StreamRecordTest {
int num = 42;
String str = "above clouds";
Integer[] intArray = new Integer[] { 1, 2 };
StreamRecord rec = new StreamRecord(new Tuple3<Integer, String, Integer[]>(num, str,
intArray));
Tuple3<Integer, String, Integer[]> tuple1 = new Tuple3<Integer, String, Integer[]>(num,
str, intArray);
Tuple3<Integer, String, Integer[]> tuple2 = new Tuple3<Integer, String, Integer[]>(1, "",
new Integer[] { 1, 2 });
StreamRecord rec = new StreamRecord(tuple1);
rec.addTuple(tuple2);
try {
rec.write(out);
......@@ -273,13 +277,24 @@ public class StreamRecordTest {
StreamRecord newRec = new StreamRecord();
newRec.read(in);
assertEquals(2, newRec.getNumOfTuples());
@SuppressWarnings("unchecked")
Tuple3<Integer, String, Integer[]> tupleOut = (Tuple3<Integer, String, Integer[]>) newRec
Tuple3<Integer, String, Integer[]> tupleOut1 = (Tuple3<Integer, String, Integer[]>) newRec
.getTuple(0);
assertEquals(tupleOut.getField(0), 42);
assertEquals(str, tupleOut.getField(1));
assertArrayEquals(intArray, (Integer[]) tupleOut.getField(2));
assertEquals(tupleOut1.getField(0), 42);
assertEquals(str, tupleOut1.getField(1));
assertArrayEquals(intArray, (Integer[]) tupleOut1.getField(2));
@SuppressWarnings("unchecked")
Tuple3<Integer, String, Integer[]> tupleOut2 = (Tuple3<Integer, String, Integer[]>) newRec
.getTuple(1);
assertEquals(tupleOut2.getField(0), 1);
assertEquals("", tupleOut2.getField(1));
assertArrayEquals(new Integer[] { 1, 2 }, (Integer[]) tupleOut2.getField(2));
} catch (IOException e) {
fail();
e.printStackTrace();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册