提交 bd87f043 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] streamrecord add/set tuple updated

上级 a6d09db3
......@@ -627,28 +627,22 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
/**
* Sets the first tuple in the batch
* Sets the first tuple in the batch with a deep copy of the given tuple
*
* @param tuple
* Tuple to set
* @throws TupleSizeMismatchException
*/
// TODO: refactor this functionality - why new list?
public void setTuple(Tuple tuple) throws TupleSizeMismatchException {
if (tuple.getArity() == numOfFields) {
if (numOfTuples != 1) {
tupleBatch = new ArrayList<Tuple>(1);
tupleBatch.add(tuple);
} else {
tupleBatch.set(0, tuple);
}
setTuple(0, tuple);
} else {
throw (new TupleSizeMismatchException());
}
}
/**
* Sets a tuple at the given position in the batch
* Sets a tuple at the given position in the batch with a deep copy of the given tuple
*
* @param tupleNumber
* Position of tuple in the batch
......@@ -661,7 +655,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throws NoSuchTupleException, TupleSizeMismatchException {
if (tuple.getArity() == numOfFields) {
try {
tupleBatch.set(tupleNumber, tuple);
tupleBatch.set(tupleNumber, copyTuple(tuple));
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchTupleException());
}
......@@ -672,14 +666,14 @@ public class StreamRecord implements IOReadableWritable, Serializable {
/**
* Checks if the number of fields are equal to the batch field size then
* adds the Tuple to the end of the batch
* adds the deep copy of Tuple to the end of the batch
*
* @param tuple
* Tuple to be added as the next record of the batch
*/
public void addTuple(Tuple tuple) {
if (tuple.getArity() == numOfFields) {
tupleBatch.add(tuple);
tupleBatch.add(copyTuple(tuple));
numOfTuples++;
} else {
throw new TupleSizeMismatchException();
......
......@@ -31,76 +31,116 @@ import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.api.java.tuple.Tuple5;
public class StreamRecordTest {
@Test
public void singleRecordSetGetTest() {
StreamRecord record = new StreamRecord(new Tuple2<String, Integer>(
"Stratosphere", 1));
StreamRecord record = new StreamRecord(new Tuple5<String, Integer, Long, Boolean, Double>(
"Stratosphere", 1, 2L, true, 3.5));
assertEquals(2, record.getNumOfFields());
assertEquals(5, record.getNumOfFields());
assertEquals(1, record.getNumOfTuples());
assertEquals("Stratosphere", record.getString(0));
assertEquals((Integer) 1, record.getInteger(1));
assertEquals((Long) 2L, record.getLong(2));
assertEquals(true, record.getBoolean(3));
assertEquals((Double) 3.5, record.getDouble(4));
record.setString(0, "Big Data");
record.setInteger(1, 2);
assertEquals("Big Data", record.getString(0));
assertEquals((Integer) 2, record.getInteger(1));
record.setTuple(new Tuple2<String, Long>(
"Big Data looks tiny from here.", 2L));
assertEquals(2, record.getNumOfFields());
assertEquals(1, record.getNumOfTuples());
assertEquals((Long) 2L, record.getLong(1));
record.setTuple(new Tuple2<String, Boolean>(
"Big Data looks tiny from here.", true));
assertEquals(2, record.getNumOfFields());
assertEquals(1, record.getNumOfTuples());
assertEquals(true, record.getBoolean(1));
record.setTuple(new Tuple2<String, Double>(
"Big Data looks tiny from here.", 2.5));
assertEquals(2, record.getNumOfFields());
assertEquals(1, record.getNumOfTuples());
assertEquals((Double) 2.5, record.getDouble(1));
Tuple2<String, Double> tuple = new Tuple2<String, Double>();
Tuple5<String, Integer, Long, Boolean, Double> tuple = new Tuple5<String, Integer, Long, Boolean, Double>();
record.getTupleInto(tuple);
assertEquals("Big Data looks tiny from here.", tuple.getField(0));
assertEquals((Double) 2.5, tuple.getField(1));
assertEquals("Stratosphere", tuple.getField(0));
assertEquals((Integer) 1, tuple.getField(1));
assertEquals((Long) 2L, tuple.getField(2));
assertEquals(true, tuple.getField(3));
assertEquals((Double) 3.5, tuple.getField(4));
record.setDouble(1, 3.3);
record.setString(0, "Streaming");
record.setInteger(1, 2);
record.setLong(2, 3L);
record.setBoolean(3, false);
record.setDouble(4, 4.5);
assertEquals("Streaming", record.getString(0));
assertEquals((Integer) 2, record.getInteger(1));
assertEquals((Long) 3L, record.getLong(2));
assertEquals(false, record.getBoolean(3));
assertEquals((Double) 4.5, record.getDouble(4));
record.setString(0, 0, "");
record.setInteger(0, 1, 0);
record.setLong(0, 2, 0L);
record.setBoolean(0, 3, false);
record.setDouble(0, 4, 0.);
assertEquals("", record.getString(0));
assertEquals((Integer) 0, record.getInteger(1));
assertEquals((Long) 0L, record.getLong(2));
assertEquals(false, record.getBoolean(3));
assertEquals((Double) 0., record.getDouble(4));
assertEquals("Big Data looks tiny from here.", tuple.getField(0));
assertEquals((Double) 2.5, tuple.getField(1));
}
@Test
public void batchRecordSetGetTest() {
StreamRecord record = new StreamRecord(new Tuple2<Integer, Integer>(1,
2));
record.addTuple(new Tuple2<Integer, Integer>(2, 2));
StreamRecord record = new StreamRecord(5,2);
Tuple5<String, Integer, Long, Boolean, Double> tuple = new Tuple5<String, Integer, Long, Boolean, Double>("Stratosphere", 1, 2L, true, 3.5);
record.addTuple(tuple);
tuple.setField("", 0);
tuple.setField(0, 1);
tuple.setField(0L, 2);
tuple.setField(false, 3);
tuple.setField(0., 4);
record.addTuple(tuple);
try {
record.addTuple(new Tuple1<String>("4"));
fail();
} catch (TupleSizeMismatchException e) {
}
assertEquals(2, record.getNumOfFields());
assertEquals(5, record.getNumOfFields());
assertEquals(2, record.getNumOfTuples());
assertEquals((Integer) 1, record.getInteger(0, 0));
assertEquals((Integer) 2, record.getInteger(1, 1));
record.setTuple(1, new Tuple2<Integer, Integer>(-1, -3));
assertEquals(-1, record.getField(1, 0));
assertEquals("Stratosphere", record.getString(0, 0));
assertEquals((Integer) 1, record.getInteger(0, 1));
assertEquals((Long) 2L, record.getLong(0, 2));
assertEquals(true, record.getBoolean(0, 3));
assertEquals((Double) 3.5, record.getDouble(0, 4));
assertEquals("", record.getString(1, 0));
assertEquals((Integer) 0, record.getInteger(1, 1));
assertEquals((Long) 0L, record.getLong(1, 2));
assertEquals(false, record.getBoolean(1, 3));
assertEquals((Double) 0., record.getDouble(1, 4));
record.setTuple(new Tuple5<String, Integer, Long, Boolean, Double>("", 0, 0L, false, 0.));
assertEquals(2, record.getNumOfFields());
assertEquals(5, record.getNumOfFields());
assertEquals(2, record.getNumOfTuples());
assertEquals("", record.getString(0, 0));
assertEquals((Integer) 0, record.getInteger(0, 1));
assertEquals((Long) 0L, record.getLong(0, 2));
assertEquals(false, record.getBoolean(0, 3));
assertEquals((Double) 0., record.getDouble(0, 4));
record.setTuple(1, new Tuple5<String, Integer, Long, Boolean, Double>("Stratosphere", 1,
2L, true, 3.5));
assertEquals("Stratosphere", record.getString(1, 0));
assertEquals((Integer) 1, record.getInteger(1, 1));
assertEquals((Long) 2L, record.getLong(1, 2));
assertEquals(true, record.getBoolean(1, 3));
assertEquals((Double) 3.5, record.getDouble(1, 4));
}
@Test
......@@ -138,9 +178,8 @@ public class StreamRecordTest {
final int ITERATION = 10000;
StreamRecord record = new StreamRecord(
new Tuple4<Integer, Long, String, String>(0, 42L,
"Stratosphere", "Streaming"));
StreamRecord record = new StreamRecord(new Tuple4<Integer, Long, String, String>(0, 42L,
"Stratosphere", "Streaming"));
long t = System.nanoTime();
for (int i = 0; i < ITERATION; i++) {
......@@ -194,19 +233,16 @@ public class StreamRecordTest {
int num = 42;
String str = "above clouds";
StreamRecord rec = new StreamRecord(new Tuple2<Integer, String>(num,
str));
StreamRecord rec = new StreamRecord(new Tuple2<Integer, String>(num, str));
try {
rec.write(out);
DataInputStream in = new DataInputStream(new ByteArrayInputStream(
buff.toByteArray()));
DataInputStream in = new DataInputStream(new ByteArrayInputStream(buff.toByteArray()));
StreamRecord newRec = new StreamRecord();
newRec.read(in);
@SuppressWarnings("unchecked")
Tuple2<Integer, String> tupleOut = (Tuple2<Integer, String>) newRec
.getTuple(0);
Tuple2<Integer, String> tupleOut = (Tuple2<Integer, String>) newRec.getTuple(0);
assertEquals(tupleOut.getField(0), 42);
} catch (IOException e) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册