提交 adeee2b9 编写于 作者: M Márton Balassi 提交者: Stephan Ewen

[streaming] StreamRecord refactor

上级 05c5b48a
......@@ -62,11 +62,10 @@ import eu.stratosphere.types.Value;
* objects in Stratosphere stream processing. The elements of the batch are
* Value arrays.
*/
public class StreamRecord<T extends Tuple> implements IOReadableWritable,
Serializable {
public class StreamRecord implements IOReadableWritable, Serializable {
private static final long serialVersionUID = 1L;
private List<T> recordBatch;
private List<? extends Tuple> recordBatch;
private StringValue uid = new StringValue("");
private int numOfFields;
private int numOfRecords;
......@@ -82,23 +81,9 @@ public class StreamRecord<T extends Tuple> implements IOReadableWritable,
// TODO implement equals, clone
/**
* Creates a new empty batch of records and sets the field number to one
* Creates a new empty instance for read
*/
public StreamRecord() {
this.numOfFields = 1;
recordBatch = new ArrayList<T>();
}
/**
* Creates a new empty batch of records and sets the field number to the
* given number
*
* @param length
* Number of fields in the records
*/
public StreamRecord(int length) {
numOfFields = length;
recordBatch = new ArrayList<T>();
}
/**
......@@ -111,9 +96,11 @@ public class StreamRecord<T extends Tuple> implements IOReadableWritable,
* @param batchSize
* Number of records
*/
public StreamRecord(int length, int batchSize) {
numOfFields = length;
recordBatch = new ArrayList<T>(batchSize);
public StreamRecord(Tuple tuple, int batchSize) {
numOfFields = tuple.getArity();
Class<?> tupleClass = CLASSES[tuple.getArity()-1];
tuple = (tupleClass) tuple;
recordBatch = new ArrayList<>(batchSize);
}
/**
......@@ -328,17 +315,18 @@ public class StreamRecord<T extends Tuple> implements IOReadableWritable,
}
private void writeTuple(Tuple tuple, DataOutput out) {
// Class basicType = CLASSES[tuple.getArity()-1];
// TypeInformation<? extends Tuple> typeInfo =
// TupleTypeInfo.getBasicTupleTypeInfo(basicType);
Class[] basicTypes = new Class[tuple.getArity()];
StringBuilder basicTypeNames = new StringBuilder();
for (int i = 0; i < basicTypes.length; i++) {
basicTypes[i] = tuple.getField(i).getClass();
basicTypeNames.append(basicTypes[i].getName() + ",");
}
TypeInformation<? extends Tuple> typeInfo = TupleTypeInfo
.getBasicTupleTypeInfo(basicTypes);
StringValue typeVal = new StringValue(basicTypeNames.toString());
@SuppressWarnings("unchecked")
TupleSerializer<Tuple> tupleSerializer = (TupleSerializer<Tuple>) typeInfo
.createSerializer();
......@@ -346,6 +334,7 @@ public class StreamRecord<T extends Tuple> implements IOReadableWritable,
tupleSerializer);
serializationDelegate.setInstance(tuple);
try {
typeVal.write(out);
serializationDelegate.write(out);
} catch (IOException e) {
// TODO Auto-generated catch block
......@@ -353,7 +342,21 @@ public class StreamRecord<T extends Tuple> implements IOReadableWritable,
}
}
public Tuple readTuple(DataInput in, Class... basicTypes) throws IOException {
private Tuple readTuple(DataInput in) throws IOException {
StringValue typeVal = new StringValue();
typeVal.read(in);
// TODO: use Tokenizer
String[] types = typeVal.getValue().split(",");
Class[] basicTypes = new Class[types.length];
for (int i = 0; i < types.length; i++) {
try {
basicTypes[i] = Class.forName(types[i]);
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
TypeInformation<? extends Tuple> typeInfo = TupleTypeInfo
.getBasicTupleTypeInfo(basicTypes);
......@@ -400,9 +403,7 @@ public class StreamRecord<T extends Tuple> implements IOReadableWritable,
recordBatch = new ArrayList<T>();
for (int k = 0; k < numOfRecords; ++k) {
T tuple = null;
readTuple(tuple, in, numOfFields);
recordBatch.add(tuple);
recordBatch.add((T) readTuple(in));
}
}
......
......@@ -149,7 +149,7 @@ public class StreamRecordTest {
Tuple2<Integer, String> tuple = new Tuple2<Integer, String>();
StreamRecord<Tuple2<Integer, String>> newRec = new StreamRecord<Tuple2<Integer, String>>(
2);
Tuple2<Integer, String> tupleOut = (Tuple2<Integer, String>) newRec.readTuple(in, Integer.class, String.class);
Tuple2<Integer, String> tupleOut = newRec.read(in);
assertEquals(tupleOut.getField(0), tuple.getField(0));
} catch (IOException e) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册