diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java index 79ef064bea863a914b6c19a06cec269439a91b5d..6e24314fd7dfcceb61ea18188a0847b2a3f8df7c 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java @@ -95,6 +95,13 @@ public class StreamRecord implements IOReadableWritable, Serializable { tupleBatch = new ArrayList(); } + + public StreamRecord(int numOfFields, int batchSize) { + this.numOfFields = numOfFields; + this.numOfTuples = 0; + tupleBatch = new ArrayList(batchSize); + + } /** * Creates a new batch of records containing only the given Tuple as element @@ -457,7 +464,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { */ public void setInteger(int fieldNumber, Integer i) throws NoSuchFieldException { - setInteger(0, i); + setInteger(0, fieldNumber,i); } /** @@ -662,13 +669,8 @@ public class StreamRecord implements IOReadableWritable, Serializable { } } - /** - * Creates a deep copy of the StreamRecord - * - * @return Copy of the StreamRecord - * - */ - public StreamRecord copy() { + + public StreamRecord copySerialized() { ByteArrayOutputStream buff = new ByteArrayOutputStream(); DataOutputStream out = new DataOutputStream(buff); @@ -684,6 +686,23 @@ public class StreamRecord implements IOReadableWritable, Serializable { return newRecord; } + + /** + * Creates a deep copy of the StreamRecord + * + * @return Copy of the StreamRecord + * + */ + public StreamRecord copy() { + StreamRecord newRecord= new StreamRecord(numOfFields,numOfTuples); + newRecord.uid=new StringValue(uid.getValue()); + + for(Tuple tuple: tupleBatch){ + newRecord.tupleBatch.add(StreamRecord.copyTuple(tuple)); + } + + return newRecord; + } /** * Creates deep copy of Tuple diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java index f2a4ad8835437d600e0a872d35b52fcab47bc6df..c5dbcfc5b2b66ded851947a8c918bfc3ded5bbda 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java @@ -42,8 +42,11 @@ public class StreamRecordTest { assertEquals("Stratosphere", record.getString(0)); assertEquals((Integer) 1, record.getInteger(1)); - record.setField(1, "Big Data"); - assertEquals("Big Data", record.getString(1)); + record.setString(0, "Big Data"); + record.setInteger(1, 2); + assertEquals("Big Data", record.getString(0)); + assertEquals((Integer) 2, record.getInteger(1)); + record.setTuple(new Tuple2("Big Data looks tiny from here.", 2L)); assertEquals(2, record.getNumOfFields()); @@ -66,9 +69,9 @@ public class StreamRecordTest { assertEquals("Big Data looks tiny from here.", tuple.getField(0)); assertEquals((Double) 2.5, tuple.getField(1)); - - record.setDouble(1,3.3); - + + record.setDouble(1, 3.3); + assertEquals("Big Data looks tiny from here.", tuple.getField(0)); assertEquals((Double) 2.5, tuple.getField(1)); } @@ -107,6 +110,16 @@ public class StreamRecordTest { assertFalse(a.getId().equals(b.getId())); assertFalse(a.getField(0).equals(b.getField(0))); + StreamRecord c = new StreamRecord(new Tuple1("Big")); + + long t = System.nanoTime(); + c.copySerialized(); + System.out.println("Serialized copy:\t" + (System.nanoTime() - t)); + + t = System.nanoTime(); + c.copy(); + System.out.println("New copy:\t" + (System.nanoTime() - t)); + } @Test @@ -163,20 +176,23 @@ public class StreamRecordTest { } } + @Test - public void tupleCopyTest(){ - Tuple2 t1 = new Tuple2("a",1); - @SuppressWarnings("unchecked") - Tuple2 t2 = (Tuple2) StreamRecord - .copyTuple(t1); - + public void tupleCopyTest() { + Tuple2 t1 = new Tuple2("a", 1); + + Tuple2 t2 = (Tuple2) StreamRecord.copyTuple(t1); + assertEquals("a", t2.getField(0)); assertEquals(1, t2.getField(1)); - + t1.setField(2, 1); assertEquals(1, t2.getField(1)); assertEquals(2, t1.getField(1)); + assertEquals(t1.getField(0).getClass(), t2.getField(0).getClass()); + assertEquals(t1.getField(1).getClass(), t2.getField(1).getClass()); + } }