提交 cc7d7ef2 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] StreamRecord copy updated

上级 b22a9e92
......@@ -95,6 +95,13 @@ public class StreamRecord implements IOReadableWritable, Serializable {
tupleBatch = new ArrayList<Tuple>();
}
public StreamRecord(int numOfFields, int batchSize) {
this.numOfFields = numOfFields;
this.numOfTuples = 0;
tupleBatch = new ArrayList<Tuple>(batchSize);
}
/**
* Creates a new batch of records containing only the given Tuple as element
......@@ -457,7 +464,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public void setInteger(int fieldNumber, Integer i)
throws NoSuchFieldException {
setInteger(0, i);
setInteger(0, fieldNumber,i);
}
/**
......@@ -662,13 +669,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
}
/**
* Creates a deep copy of the StreamRecord
*
* @return Copy of the StreamRecord
*
*/
public StreamRecord copy() {
public StreamRecord copySerialized() {
ByteArrayOutputStream buff = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(buff);
......@@ -684,6 +686,23 @@ public class StreamRecord implements IOReadableWritable, Serializable {
return newRecord;
}
/**
* Creates a deep copy of the StreamRecord
*
* @return Copy of the StreamRecord
*
*/
public StreamRecord copy() {
StreamRecord newRecord= new StreamRecord(numOfFields,numOfTuples);
newRecord.uid=new StringValue(uid.getValue());
for(Tuple tuple: tupleBatch){
newRecord.tupleBatch.add(StreamRecord.copyTuple(tuple));
}
return newRecord;
}
/**
* Creates deep copy of Tuple
......
......@@ -42,8 +42,11 @@ public class StreamRecordTest {
assertEquals("Stratosphere", record.getString(0));
assertEquals((Integer) 1, record.getInteger(1));
record.setField(1, "Big Data");
assertEquals("Big Data", record.getString(1));
record.setString(0, "Big Data");
record.setInteger(1, 2);
assertEquals("Big Data", record.getString(0));
assertEquals((Integer) 2, record.getInteger(1));
record.setTuple(new Tuple2<String, Long>("Big Data looks tiny from here.", 2L));
assertEquals(2, record.getNumOfFields());
......@@ -66,9 +69,9 @@ public class StreamRecordTest {
assertEquals("Big Data looks tiny from here.", tuple.getField(0));
assertEquals((Double) 2.5, tuple.getField(1));
record.setDouble(1,3.3);
record.setDouble(1, 3.3);
assertEquals("Big Data looks tiny from here.", tuple.getField(0));
assertEquals((Double) 2.5, tuple.getField(1));
}
......@@ -107,6 +110,16 @@ public class StreamRecordTest {
assertFalse(a.getId().equals(b.getId()));
assertFalse(a.getField(0).equals(b.getField(0)));
StreamRecord c = new StreamRecord(new Tuple1<String>("Big"));
long t = System.nanoTime();
c.copySerialized();
System.out.println("Serialized copy:\t" + (System.nanoTime() - t));
t = System.nanoTime();
c.copy();
System.out.println("New copy:\t" + (System.nanoTime() - t));
}
@Test
......@@ -163,20 +176,23 @@ public class StreamRecordTest {
}
}
@Test
public void tupleCopyTest(){
Tuple2<String, Integer> t1 = new Tuple2<String, Integer>("a",1);
@SuppressWarnings("unchecked")
Tuple2<String, Integer> t2 = (Tuple2<String, Integer>) StreamRecord
.copyTuple(t1);
public void tupleCopyTest() {
Tuple2<String, Integer> t1 = new Tuple2<String, Integer>("a", 1);
Tuple2 t2 = (Tuple2) StreamRecord.copyTuple(t1);
assertEquals("a", t2.getField(0));
assertEquals(1, t2.getField(1));
t1.setField(2, 1);
assertEquals(1, t2.getField(1));
assertEquals(2, t1.getField(1));
assertEquals(t1.getField(0).getClass(), t2.getField(0).getClass());
assertEquals(t1.getField(1).getClass(), t2.getField(1).getClass());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册