diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java index 7043f5fa16dd621ae34e38e39b4b3ef8f43c4e48..2399cd08fa93453c69774af8464888cee5e40e7e 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java @@ -28,6 +28,28 @@ import java.util.List; import java.util.UUID; import eu.stratosphere.api.java.tuple.Tuple; +import eu.stratosphere.api.java.tuple.Tuple1; +import eu.stratosphere.api.java.tuple.Tuple10; +import eu.stratosphere.api.java.tuple.Tuple11; +import eu.stratosphere.api.java.tuple.Tuple12; +import eu.stratosphere.api.java.tuple.Tuple13; +import eu.stratosphere.api.java.tuple.Tuple14; +import eu.stratosphere.api.java.tuple.Tuple15; +import eu.stratosphere.api.java.tuple.Tuple16; +import eu.stratosphere.api.java.tuple.Tuple17; +import eu.stratosphere.api.java.tuple.Tuple18; +import eu.stratosphere.api.java.tuple.Tuple19; +import eu.stratosphere.api.java.tuple.Tuple2; +import eu.stratosphere.api.java.tuple.Tuple20; +import eu.stratosphere.api.java.tuple.Tuple21; +import eu.stratosphere.api.java.tuple.Tuple22; +import eu.stratosphere.api.java.tuple.Tuple3; +import eu.stratosphere.api.java.tuple.Tuple4; +import eu.stratosphere.api.java.tuple.Tuple5; +import eu.stratosphere.api.java.tuple.Tuple6; +import eu.stratosphere.api.java.tuple.Tuple7; +import eu.stratosphere.api.java.tuple.Tuple8; +import eu.stratosphere.api.java.tuple.Tuple9; import eu.stratosphere.api.java.typeutils.TupleTypeInfo; import eu.stratosphere.api.java.typeutils.TypeInformation; import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer; @@ -43,7 +65,7 @@ import eu.stratosphere.types.StringValue; * objects in Stratosphere stream processing. The elements of the batch are * Value arrays. */ -//TODO: update documentation +// TODO: update documentation public class StreamRecord implements IOReadableWritable, Serializable { private static final long serialVersionUID = 1L; @@ -52,6 +74,12 @@ public class StreamRecord implements IOReadableWritable, Serializable { private int numOfFields; private int numOfRecords; + private static final Class[] CLASSES = new Class[] { Tuple1.class, Tuple2.class, + Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, + Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, + Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, + Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class }; + // TODO implement equals, clone /** * Creates a new empty instance for read @@ -472,6 +500,22 @@ public class StreamRecord implements IOReadableWritable, Serializable { return newRecord; } + public static Tuple copyTuple(Tuple tuple) { + int numofFields = tuple.getArity(); + Tuple newTuple = null; + try { + newTuple = (Tuple) CLASSES[numofFields - 1].newInstance(); + } catch (Exception e) { + + } + + for (int i = 0; i < numofFields; i++) { + newTuple.setField(tuple.getField(i), i); + } + + return newTuple; + } + private void writeTuple(Tuple tuple, DataOutput out) { Class[] basicTypes = new Class[tuple.getArity()]; diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java index 29857693c15b09034722af1a4aa238e80d355056..c0eb1f831cf78cdd22a959b377b0a47e69db4d0e 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java @@ -162,5 +162,18 @@ public class StreamRecordTest { } } + @Test + public void tupleCopyTest(){ + Tuple2 t1 = new Tuple2("a",1); + Tuple2 t2 = (Tuple2) StreamRecord.copyTuple(t1); + + assertEquals("a", t2.getField(0)); + assertEquals(1, t2.getField(1)); + + t1.setField(2, 1); + assertEquals(1, t2.getField(1)); + assertEquals(2, t1.getField(1)); + + } }