提交 3ac92935 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] removeTuple method added to streamrecord

上级 bd87f043
......@@ -74,13 +74,11 @@ public class StreamRecord implements IOReadableWritable, Serializable {
private int numOfFields;
private int numOfTuples;
private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class,
Tuple2.class, Tuple3.class, Tuple4.class, Tuple5.class,
Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class,
Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class,
Tuple18.class, Tuple19.class, Tuple20.class, Tuple21.class,
Tuple22.class };
private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class };
// TODO implement equals, clone
/**
......@@ -176,8 +174,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , NoSuchFieldException
*/
public Object getField(int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
public Object getField(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
return getField(0, fieldNumber);
}
......@@ -193,8 +190,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , NoSuchFieldException
*/
public Object getField(int tupleNumber, int fieldNumber)
throws NoSuchTupleException, NoSuchFieldException {
public Object getField(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
Tuple tuple;
try {
tuple = tupleBatch.get(tupleNumber);
......@@ -208,8 +205,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
}
public Object getFieldFast(int tupleNumber, int fieldNumber)
throws NoSuchTupleException, NoSuchFieldException {
public Object getFieldFast(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
Tuple tuple;
try {
tuple = tupleBatch.get(tupleNumber);
......@@ -232,8 +229,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , NoSuchFieldException
*/
public Boolean getBoolean(int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
public Boolean getBoolean(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
return getBoolean(0, fieldNumber);
}
......@@ -249,8 +245,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* , NoSuchFieldException
*/
// TODO: add exception for cast for all getters
public Boolean getBoolean(int tupleNumber, int fieldNumber)
throws NoSuchTupleException, NoSuchFieldException {
public Boolean getBoolean(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return (Boolean) getField(tupleNumber, fieldNumber);
}
......@@ -263,8 +259,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , NoSuchFieldException
*/
public Double getDouble(int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
public Double getDouble(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
return getDouble(0, fieldNumber);
}
......@@ -279,8 +274,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , NoSuchFieldException
*/
public Double getDouble(int tupleNumber, int fieldNumber)
throws NoSuchTupleException, NoSuchFieldException {
public Double getDouble(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return (Double) getField(tupleNumber, fieldNumber);
}
......@@ -293,8 +288,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , NoSuchFieldException
*/
public Integer getInteger(int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
public Integer getInteger(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
return getInteger(0, fieldNumber);
}
......@@ -309,8 +303,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , NoSuchFieldException
*/
public Integer getInteger(int tupleNumber, int fieldNumber)
throws NoSuchTupleException, NoSuchFieldException {
public Integer getInteger(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return (Integer) getField(tupleNumber, fieldNumber);
}
......@@ -323,8 +317,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , NoSuchFieldException
*/
public Long getLong(int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
public Long getLong(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
return getLong(0, fieldNumber);
}
......@@ -339,8 +332,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , NoSuchFieldException
*/
public Long getLong(int tupleNumber, int fieldNumber)
throws NoSuchTupleException, NoSuchFieldException {
public Long getLong(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return (Long) getField(tupleNumber, fieldNumber);
}
......@@ -353,8 +346,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , NoSuchFieldException
*/
public String getString(int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
public String getString(int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
return getString(0, fieldNumber);
}
......@@ -367,8 +359,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* Position of the field in the tuple
* @return value of the field as String
*/
public String getString(int tupleNumber, int fieldNumber)
throws NoSuchTupleException, NoSuchFieldException {
public String getString(int tupleNumber, int fieldNumber) throws NoSuchTupleException,
NoSuchFieldException {
return (String) getField(tupleNumber, fieldNumber);
}
......@@ -397,8 +389,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchFieldException
*/
// TODO: consider no such tuple exception and interaction with batch size
public void setField(int tupleNumber, int fieldNumber, Object o)
throws NoSuchFieldException {
public void setField(int tupleNumber, int fieldNumber, Object o) throws NoSuchFieldException {
try {
tupleBatch.get(tupleNumber).setField(o, fieldNumber);
} catch (IndexOutOfBoundsException e) {
......@@ -416,8 +407,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* New value
* @throws NoSuchFieldException
*/
public void setBoolean(int fieldNumber, Boolean b)
throws NoSuchFieldException {
public void setBoolean(int fieldNumber, Boolean b) throws NoSuchFieldException {
setBoolean(0, fieldNumber, b);
}
......@@ -433,8 +423,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* New value
* @throws NoSuchFieldException
*/
public void setBoolean(int tupleNumber, int fieldNumber, Boolean b)
throws NoSuchFieldException {
public void setBoolean(int tupleNumber, int fieldNumber, Boolean b) throws NoSuchFieldException {
setField(tupleNumber, fieldNumber, b);
}
......@@ -447,8 +436,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* New value
* @throws NoSuchFieldException
*/
public void setDouble(int fieldNumber, Double d)
throws NoSuchFieldException {
public void setDouble(int fieldNumber, Double d) throws NoSuchFieldException {
setDouble(0, fieldNumber, d);
}
......@@ -464,8 +452,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* New value
* @throws NoSuchFieldException
*/
public void setDouble(int tupleNumber, int fieldNumber, Double d)
throws NoSuchFieldException {
public void setDouble(int tupleNumber, int fieldNumber, Double d) throws NoSuchFieldException {
setField(tupleNumber, fieldNumber, d);
}
......@@ -479,8 +466,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* New value
* @throws NoSuchFieldException
*/
public void setInteger(int fieldNumber, Integer i)
throws NoSuchFieldException {
public void setInteger(int fieldNumber, Integer i) throws NoSuchFieldException {
setInteger(0, fieldNumber, i);
}
......@@ -496,8 +482,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* New value
* @throws NoSuchFieldException
*/
public void setInteger(int tupleNumber, int fieldNumber, Integer i)
throws NoSuchFieldException {
public void setInteger(int tupleNumber, int fieldNumber, Integer i) throws NoSuchFieldException {
setField(tupleNumber, fieldNumber, i);
}
......@@ -525,8 +510,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* New value
* @throws NoSuchFieldException
*/
public void setLong(int tupleNumber, int fieldNumber, Long l)
throws NoSuchFieldException {
public void setLong(int tupleNumber, int fieldNumber, Long l) throws NoSuchFieldException {
setField(tupleNumber, fieldNumber, l);
}
......@@ -539,8 +523,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* New value
* @throws NoSuchFieldException
*/
public void setString(int fieldNumber, String str)
throws NoSuchFieldException {
public void setString(int fieldNumber, String str) throws NoSuchFieldException {
setField(0, fieldNumber, str);
}
......@@ -556,8 +539,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* New value
* @throws NoSuchFieldException
*/
public void setString(int tupleNumber, int fieldNumber, String str)
throws NoSuchFieldException {
public void setString(int tupleNumber, int fieldNumber, String str) throws NoSuchFieldException {
setField(tupleNumber, fieldNumber, str);
}
......@@ -591,8 +573,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , TupleSizeMismatchException
*/
public void getTupleInto(Tuple tuple) throws NoSuchTupleException,
TupleSizeMismatchException {
public void getTupleInto(Tuple tuple) throws NoSuchTupleException, TupleSizeMismatchException {
getTupleInto(0, tuple);
}
......@@ -608,8 +589,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , TupleSizeMismatchException
*/
public void getTupleInto(int tupleNumber, Tuple tuple)
throws NoSuchTupleException, TupleSizeMismatchException {
public void getTupleInto(int tupleNumber, Tuple tuple) throws NoSuchTupleException,
TupleSizeMismatchException {
if (tuple.getArity() == numOfFields) {
try {
......@@ -642,7 +623,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
/**
* Sets a tuple at the given position in the batch with a deep copy of the given tuple
* Sets a tuple at the given position in the batch with a deep copy of the
* given tuple
*
* @param tupleNumber
* Position of tuple in the batch
......@@ -651,8 +633,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @throws NoSuchTupleException
* , TupleSizeMismatchException
*/
public void setTuple(int tupleNumber, Tuple tuple)
throws NoSuchTupleException, TupleSizeMismatchException {
public void setTuple(int tupleNumber, Tuple tuple) throws NoSuchTupleException,
TupleSizeMismatchException {
if (tuple.getArity() == numOfFields) {
try {
tupleBatch.set(tupleNumber, copyTuple(tuple));
......@@ -680,6 +662,20 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
}
/**
* Removes the tuple at the given position from the recordbatch and returns
* it
*
* @param index
* Index of tuple to remove
* @return Removed tuple
*/
public Tuple removeTuple(int index) {
numOfTuples--;
return tupleBatch.remove(index);
}
public StreamRecord copySerialized() {
ByteArrayOutputStream buff = new ByteArrayOutputStream();
......@@ -687,8 +683,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
StreamRecord newRecord = new StreamRecord();
try {
this.write(out);
DataInputStream in = new DataInputStream(new ByteArrayInputStream(
buff.toByteArray()));
DataInputStream in = new DataInputStream(new ByteArrayInputStream(buff.toByteArray()));
newRecord.read(in);
} catch (Exception e) {
......@@ -756,8 +751,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
basicTypes[i] = tuple.getField(i).getClass();
basicTypeNames.append(basicTypes[i].getName() + ",");
}
TypeInformation<? extends Tuple> typeInfo = TupleTypeInfo
.getBasicTupleTypeInfo(basicTypes);
TypeInformation<? extends Tuple> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(basicTypes);
StringValue typeVal = new StringValue(basicTypeNames.toString());
......@@ -801,14 +795,12 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
}
TypeInformation<? extends Tuple> typeInfo = TupleTypeInfo
.getBasicTupleTypeInfo(basicTypes);
TypeInformation<? extends Tuple> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(basicTypes);
@SuppressWarnings("unchecked")
TupleSerializer<Tuple> tupleSerializer = (TupleSerializer<Tuple>) typeInfo
.createSerializer();
DeserializationDelegate<Tuple> dd = new DeserializationDelegate<Tuple>(
tupleSerializer);
DeserializationDelegate<Tuple> dd = new DeserializationDelegate<Tuple>(tupleSerializer);
dd.setInstance(tupleSerializer.createInstance());
dd.read(in);
return dd.getInstance();
......
......@@ -140,7 +140,16 @@ public class StreamRecordTest {
assertEquals((Long) 2L, record.getLong(1, 2));
assertEquals(true, record.getBoolean(1, 3));
assertEquals((Double) 3.5, record.getDouble(1, 4));
record.removeTuple(1);
assertEquals(1, record.getNumOfTuples());
assertEquals("", record.getString(0, 0));
assertEquals((Integer) 0, record.getInteger(0, 1));
assertEquals((Long) 0L, record.getLong(0, 2));
assertEquals(false, record.getBoolean(0, 3));
assertEquals((Double) 0., record.getDouble(0, 4));
}
@Test
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册