提交 aa63c823 编写于 作者: M Márton Balassi 提交者: Stephan Ewen

[streaming] StreamRecord documentation update2

上级 bb01ea01
......@@ -160,36 +160,35 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
/**
* Returns the value of a field in the given position of a specific tuple in
* Returns the value of a field in the given position of the first tuple in
* the batch as an object, cast needed to obtain a typed version
*
* @param tupleNumber
* Position of the tuple in the batch
* @param fieldNumber
* Position of the field in the tuple
* @return value of the field
* @throws NoSuchTupleException
*/
public Object getField(int tupleNumber, int fieldNumber) {
try {
return tupleBatch.get(tupleNumber).getField(fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchTupleException());
}
public Object getField(int fieldNumber) throws NoSuchTupleException {
return getField(0, fieldNumber);
}
/**
* Returns the value of a field in the given position of the first tuple in
* Returns the value of a field in the given position of a specific tuple in
* the batch as an object, cast needed to obtain a typed version
*
* @param tupleNumber
* Position of the tuple in the batch
* @param fieldNumber
* Position of the field in the tuple
* @return value of the field
* @throws NoSuchTupleException
*/
public Object getField(int fieldNumber) {
public Object getField(int tupleNumber, int fieldNumber)
throws NoSuchTupleException {
try {
return tupleBatch.get(0).getField(fieldNumber);
return tupleBatch.get(tupleNumber).getField(fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchFieldException());
throw (new NoSuchTupleException());
}
}
......@@ -583,12 +582,31 @@ public class StreamRecord implements IOReadableWritable, Serializable {
return getTuple(0);
}
//TODO: doc from here on
/**
* Gets the fields of the first tuple of the batch into the parameter tuple
*
* @param tuple
* Target tuple
*/
public void getTupleInto(Tuple tuple) {
getTupleInto(0, tuple);
}
/**
* Gets the fields of the specified tuple of the batch into the parameter
* tuple
*
* @param tupleNumber
* Position of the tuple to be written out
*
* @param tuple
* Target tuple
*/
public void getTupleInto(int tupleNumber, Tuple tuple) {
if (tuple.getArity() == numOfFields) {
try {
Tuple source = tupleBatch.get(0);
Tuple source = tupleBatch.get(tupleNumber);
for (int i = 0; i < numOfFields; i++) {
tuple.setField(source.getField(i), i);
}
......@@ -602,19 +620,19 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
/**
* Sets a record at the given position in the batch
* Sets the first tuple in the batch
*
* @param recordNumber
* Position of record in the batch
* @param tuple
* Value to set
* Tuple to set
*/
public void setRecord(int recordNumber, Tuple tuple) {
// TODO: refactor this functionality - why new list?
public void setTuple(Tuple tuple) {
if (tuple.getArity() == numOfFields) {
try {
tupleBatch.set(recordNumber, tuple);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchTupleException());
if (numOfTuples != 1) {
tupleBatch = new ArrayList<Tuple>(1);
tupleBatch.add(tuple);
} else {
tupleBatch.set(0, tuple);
}
} else {
throw (new TupleSizeMismatchException());
......@@ -622,18 +640,19 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
/**
* Sets the first record in the batch
* Sets a tuple at the given position in the batch
*
* @param tupleNumber
* Position of tuple in the batch
* @param tuple
* Value to set
*/
public void setRecord(Tuple tuple) {
public void setTuple(int tupleNumber, Tuple tuple) {
if (tuple.getArity() == numOfFields) {
if (numOfTuples != 1) {
tupleBatch = new ArrayList<Tuple>(1);
tupleBatch.add(tuple);
} else {
tupleBatch.set(0, tuple);
try {
tupleBatch.set(tupleNumber, tuple);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchTupleException());
}
} else {
throw (new TupleSizeMismatchException());
......@@ -642,12 +661,12 @@ public class StreamRecord implements IOReadableWritable, Serializable {
/**
* Checks if the number of fields are equal to the batch field size then
* adds the Value array to the end of the batch
* adds the Tuple to the end of the batch
*
* @param tuple
* Value array to be added as the next record of the batch
* Tuple to be added as the next record of the batch
*/
public void addRecord(Tuple tuple) {
public void addTuple(Tuple tuple) {
if (tuple.getArity() == numOfFields) {
tupleBatch.add(tuple);
numOfTuples++;
......@@ -657,7 +676,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
/**
* Creates a copy of the StreamRecord
* Creates a deep copy of the StreamRecord
*
* @return Copy of the StreamRecord
*
......@@ -695,8 +714,17 @@ public class StreamRecord implements IOReadableWritable, Serializable {
return newTuple;
}
/**
* Writes tuple to the specified DataOutput
*
* @param tuple
* Tuple to be written
* @param out
* Output chosen
*/
private void writeTuple(Tuple tuple, DataOutput out) {
@SuppressWarnings("rawtypes")
Class[] basicTypes = new Class[tuple.getArity()];
StringBuilder basicTypeNames = new StringBuilder();
......@@ -725,12 +753,21 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
}
/**
* Reads a tuple from the specified DataInput
*
* @param in
* Input chosen
* @return Tuple read
* @throws IOException
*/
private Tuple readTuple(DataInput in) throws IOException {
StringValue typeVal = new StringValue();
typeVal.read(in);
// TODO: use Tokenizer
// TODO: use StringTokenizer
String[] types = typeVal.getValue().split(",");
@SuppressWarnings("rawtypes")
Class[] basicTypes = new Class[types.length];
for (int i = 0; i < types.length; i++) {
try {
......@@ -743,6 +780,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
TypeInformation<? extends Tuple> typeInfo = TupleTypeInfo
.getBasicTupleTypeInfo(basicTypes);
@SuppressWarnings("unchecked")
TupleSerializer<Tuple> tupleSerializer = (TupleSerializer<Tuple>) typeInfo
.createSerializer();
......@@ -753,6 +791,9 @@ public class StreamRecord implements IOReadableWritable, Serializable {
return dd.getInstance();
}
/**
* Write method definition for the IOReadableWritable interface
*/
@Override
public void write(DataOutput out) throws IOException {
uid.write(out);
......@@ -768,6 +809,9 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
}
/**
* Read method definition for the IOReadableWritable interface
*/
@Override
public void read(DataInput in) throws IOException {
uid.read(in);
......@@ -790,6 +834,9 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
}
/**
* Creates a String representation as a list of tuples
*/
public String toString() {
StringBuilder outputString = new StringBuilder("[");
......
......@@ -51,7 +51,7 @@ public class BatchWordCountSource extends UserSourceInvokable {
while (line != null) {
if (line != "") {
outRecord.addRecord(new Tuple2<String, Long>(line, timestamp));
outRecord.addTuple(new Tuple2<String, Long>(line, timestamp));
timestamp++;
if (timestamp % BATCH_SIZE == 0) {
emit(outRecord);
......
......@@ -45,17 +45,17 @@ public class StreamRecordTest {
record.setField(1, "Big Data");
assertEquals("Big Data", record.getString(1));
record.setRecord(new Tuple2<String, Long>("Big Data looks tiny from here.", 2L));
record.setTuple(new Tuple2<String, Long>("Big Data looks tiny from here.", 2L));
assertEquals(2, record.getNumOfFields());
assertEquals(1, record.getNumOfTuples());
assertEquals((Long) 2L, record.getLong(1));
record.setRecord(new Tuple2<String, Boolean>("Big Data looks tiny from here.", true));
record.setTuple(new Tuple2<String, Boolean>("Big Data looks tiny from here.", true));
assertEquals(2, record.getNumOfFields());
assertEquals(1, record.getNumOfTuples());
assertEquals(true, record.getBoolean(1));
record.setRecord(new Tuple2<String, Double>("Big Data looks tiny from here.", 2.5));
record.setTuple(new Tuple2<String, Double>("Big Data looks tiny from here.", 2.5));
assertEquals(2, record.getNumOfFields());
assertEquals(1, record.getNumOfTuples());
assertEquals((Double) 2.5, record.getDouble(1));
......@@ -76,9 +76,9 @@ public class StreamRecordTest {
@Test
public void batchRecordSetGetTest() {
StreamRecord record = new StreamRecord(new Tuple2<Integer, Integer>(1, 2));
record.addRecord(new Tuple2<Integer, Integer>(2, 2));
record.addTuple(new Tuple2<Integer, Integer>(2, 2));
try {
record.addRecord(new Tuple1<String>("4"));
record.addTuple(new Tuple1<String>("4"));
fail();
} catch (TupleSizeMismatchException e) {
}
......@@ -88,7 +88,7 @@ public class StreamRecordTest {
assertEquals((Integer) 1, record.getInteger(0, 0));
assertEquals((Integer) 2, record.getInteger(1, 1));
record.setRecord(1, new Tuple2<Integer, Integer>(-1, -3));
record.setTuple(1, new Tuple2<Integer, Integer>(-1, -3));
assertEquals(-1, record.getField(1, 0));
assertEquals(2, record.getNumOfFields());
......@@ -103,7 +103,7 @@ public class StreamRecordTest {
assertTrue(a.getField(0).equals(b.getField(0)));
assertTrue(a.getId().equals(b.getId()));
b.setId("2");
b.setRecord(new Tuple1<String>("Data"));
b.setTuple(new Tuple1<String>("Data"));
assertFalse(a.getId().equals(b.getId()));
assertFalse(a.getField(0).equals(b.getField(0)));
......@@ -113,20 +113,20 @@ public class StreamRecordTest {
public void exceptionTest() {
StreamRecord a = new StreamRecord(new Tuple1<String>("Big"));
try {
a.setRecord(4, new Tuple1<String>("Data"));
a.setTuple(4, new Tuple1<String>("Data"));
fail();
} catch (NoSuchTupleException e) {
}
try {
a.setRecord(new Tuple2<String, String>("Data", "Stratosphere"));
a.setTuple(new Tuple2<String, String>("Data", "Stratosphere"));
fail();
} catch (TupleSizeMismatchException e) {
}
StreamRecord b = new StreamRecord();
try {
b.addRecord(new Tuple2<String, String>("Data", "Stratosphere"));
b.addTuple(new Tuple2<String, String>("Data", "Stratosphere"));
fail();
} catch (TupleSizeMismatchException e) {
}
......@@ -153,6 +153,7 @@ public class StreamRecordTest {
StreamRecord newRec = new StreamRecord();
newRec.read(in);
@SuppressWarnings("unchecked")
Tuple2<Integer, String> tupleOut = (Tuple2<Integer, String>) newRec.getTuple(0);
assertEquals(tupleOut.getField(0), 42);
......@@ -165,7 +166,9 @@ public class StreamRecordTest {
@Test
public void tupleCopyTest(){
Tuple2<String, Integer> t1 = new Tuple2<String, Integer>("a",1);
Tuple2<String, Integer> t2 = (Tuple2<String, Integer>) StreamRecord.copyTuple(t1);
@SuppressWarnings("unchecked")
Tuple2<String, Integer> t2 = (Tuple2<String, Integer>) StreamRecord
.copyTuple(t1);
assertEquals("a", t2.getField(0));
assertEquals(1, t2.getField(1));
......
......@@ -116,7 +116,7 @@ public class AtLeastOnceBufferTest {
System.out.println("ADD - " + " exec. time (ns): " + (System.nanoTime() - nt));
record1.setRecord(new Tuple1<String>("R2"));
record1.setTuple(new Tuple1<String>("R2"));
record1.setId("1");
String id2 = record1.getId();
......@@ -192,7 +192,7 @@ public class AtLeastOnceBufferTest {
String id1 = record1.getId();
buffer.add(record1);
record1.setRecord(new Tuple1<String>("R2"));
record1.setTuple(new Tuple1<String>("R2"));
record1.setId("1");
String id2 = record1.getId();
buffer.add(record1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册