diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java index 7b373a7af226d16e6977d39ff887293f0efff6f6..f6dee6c029bbd60862f2ff6f2f9142a102a292e4 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java @@ -614,12 +614,8 @@ public class StreamRecord implements IOReadableWritable, Serializable { * Tuple to set * @throws TupleSizeMismatchException */ - public void setTuple(Tuple tuple) throws TupleSizeMismatchException { - if (tuple.getArity() == numOfFields) { - setTuple(0, tuple); - } else { - throw (new TupleSizeMismatchException()); - } + public void setTuple(Tuple tuple) throws NoSuchTupleException, TupleSizeMismatchException { + setTuple(0, tuple); } /** @@ -653,9 +649,22 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param tuple * Tuple to be added as the next record of the batch */ - public void addTuple(Tuple tuple) { + public void addTuple(Tuple tuple) throws TupleSizeMismatchException{ + addTuple(numOfTuples,tuple); + } + + /** + * Checks if the number of fields are equal to the batch field size then + * inserts the deep copy of Tuple to the given position into the recordbatch + * + * @param index + * Position of the added tuple + * @param tuple + * Tuple to be added as the next record of the batch + */ + public void addTuple(int index, Tuple tuple) throws TupleSizeMismatchException{ if (tuple.getArity() == numOfFields) { - tupleBatch.add(copyTuple(tuple)); + tupleBatch.add(index, copyTuple(tuple)); numOfTuples++; } else { throw new TupleSizeMismatchException(); diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java index c0bdca0ff9e5838901199d03317bcc96a7cb6cb4..bb30063d44760eedd1d58ff0cb1f80abd7d325c3 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/StreamRecordTest.java @@ -25,6 +25,8 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import org.junit.Test; @@ -150,6 +152,18 @@ public class StreamRecordTest { assertEquals((Long) 0L, record.getLong(0, 2)); assertEquals(false, record.getBoolean(0, 3)); assertEquals((Double) 0., record.getDouble(0, 4)); + + record.addTuple(0,new Tuple5("Stratosphere", 1, + 2L, true, 3.5)); + + assertEquals(2, record.getNumOfTuples()); + + assertEquals("Stratosphere", record.getString(0, 0)); + assertEquals((Integer) 1, record.getInteger(0, 1)); + assertEquals((Long) 2L, record.getLong(0, 2)); + assertEquals(true, record.getBoolean(0, 3)); + assertEquals((Double) 3.5, record.getDouble(0, 4)); + } @Test