From 2d17a6cda14199e0efdbce8b22dcc5249a14ac57 Mon Sep 17 00:00:00 2001 From: gyfora Date: Mon, 14 Jul 2014 16:29:09 +0200 Subject: [PATCH] [streaming] StreamRecord serialization improvement --- .../api/streamrecord/StreamRecord.java | 75 ++++++------------- .../api/streamrecord/StreamRecordTest.java | 27 +++++-- 2 files changed, 45 insertions(+), 57 deletions(-) diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java index 03db03b54d9..ebeb81dd3f2 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java @@ -1153,55 +1153,6 @@ public class StreamRecord implements IOReadableWritable, Serializable { return basicTypes; } - /** - * Writes tuple to the specified DataOutput - * - * @param tuple - * Tuple to be written - * @param out - * Output chosen - * @throws IOException - */ - private void writeTuple(Tuple tuple, DataOutput out) throws IOException { - // TODO: exception for empty record - no getField - byte[] typesInByte = tupleTypesToByteArray(getTuple()); - - TupleTypeInfo typeInfo = new TupleTypeInfo( - tupleTypesFromByteArray(typesInByte)); - TupleSerializer tupleSerializer = (TupleSerializer) typeInfo - .createSerializer(); - SerializationDelegate serializationDelegate = new SerializationDelegate( - tupleSerializer); - serializationDelegate.setInstance(tuple); - - out.write(typesInByte); - serializationDelegate.write(out); - } - - /** - * Reads a tuple from the specified DataInput - * - * @param in - * Input chosen - * @return Tuple read - * @throws IOException - */ - private Tuple readTuple(DataInput in, int numberOfFields) throws IOException { - - byte[] typesInByte = new byte[numberOfFields]; - in.readFully(typesInByte, 0, numberOfFields); - - TupleTypeInfo typeInfo = new TupleTypeInfo( - tupleTypesFromByteArray(typesInByte)); - TupleSerializer tupleSerializer = typeInfo.createSerializer(); - - DeserializationDelegate dd = new DeserializationDelegate(tupleSerializer); - dd.setInstance(tupleSerializer.createInstance()); - - dd.read(in); - return dd.getInstance(); - } - /** * Write method definition for the IOReadableWritable interface */ @@ -1213,8 +1164,19 @@ public class StreamRecord implements IOReadableWritable, Serializable { out.writeByte(numOfFields); out.writeInt(numOfTuples); + byte[] typesInByte = tupleTypesToByteArray(getTuple()); + out.write(typesInByte); + + TupleTypeInfo typeInfo = new TupleTypeInfo( + tupleTypesFromByteArray(typesInByte)); + TupleSerializer tupleSerializer = (TupleSerializer) typeInfo + .createSerializer(); + SerializationDelegate serializationDelegate = new SerializationDelegate( + tupleSerializer); + for (Tuple tuple : tupleBatch) { - writeTuple(tuple, out); + serializationDelegate.setInstance(tuple); + serializationDelegate.write(out); } } @@ -1231,8 +1193,19 @@ public class StreamRecord implements IOReadableWritable, Serializable { tupleBatch = new ArrayList(numOfTuples); + byte[] typesInByte = new byte[numOfFields]; + in.readFully(typesInByte, 0, numOfFields); + + TupleTypeInfo typeInfo = new TupleTypeInfo( + tupleTypesFromByteArray(typesInByte)); + TupleSerializer tupleSerializer = typeInfo.createSerializer(); + + DeserializationDelegate dd = new DeserializationDelegate(tupleSerializer); + for (int k = 0; k < numOfTuples; ++k) { - tupleBatch.add(readTuple(in, numOfFields)); + dd.setInstance(tupleSerializer.createInstance()); + dd.read(in); + tupleBatch.add(dd.getInstance()); } } diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java index 0fbdafe2209..349414ede32 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java @@ -264,8 +264,12 @@ public class StreamRecordTest { int num = 42; String str = "above clouds"; Integer[] intArray = new Integer[] { 1, 2 }; - StreamRecord rec = new StreamRecord(new Tuple3(num, str, - intArray)); + Tuple3 tuple1 = new Tuple3(num, + str, intArray); + Tuple3 tuple2 = new Tuple3(1, "", + new Integer[] { 1, 2 }); + StreamRecord rec = new StreamRecord(tuple1); + rec.addTuple(tuple2); try { rec.write(out); @@ -273,13 +277,24 @@ public class StreamRecordTest { StreamRecord newRec = new StreamRecord(); newRec.read(in); + + assertEquals(2, newRec.getNumOfTuples()); + @SuppressWarnings("unchecked") - Tuple3 tupleOut = (Tuple3) newRec + Tuple3 tupleOut1 = (Tuple3) newRec .getTuple(0); - assertEquals(tupleOut.getField(0), 42); - assertEquals(str, tupleOut.getField(1)); - assertArrayEquals(intArray, (Integer[]) tupleOut.getField(2)); + assertEquals(tupleOut1.getField(0), 42); + assertEquals(str, tupleOut1.getField(1)); + assertArrayEquals(intArray, (Integer[]) tupleOut1.getField(2)); + + @SuppressWarnings("unchecked") + Tuple3 tupleOut2 = (Tuple3) newRec + .getTuple(1); + assertEquals(tupleOut2.getField(0), 1); + assertEquals("", tupleOut2.getField(1)); + assertArrayEquals(new Integer[] { 1, 2 }, (Integer[]) tupleOut2.getField(2)); + } catch (IOException e) { fail(); e.printStackTrace(); -- GitLab