提交 4f5b68d6 编写于 作者: Y Yingjun Wu 提交者: Stephan Ewen

[streaming] update internal state abstraction

上级 68ebfc48
......@@ -78,25 +78,31 @@ public class StreamRecord implements IOReadableWritable, Serializable {
Tuple12.class, Tuple13.class, Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class };
// TODO implement equals, clone
/**
* Creates a new empty instance for read
*/
public StreamRecord() {
}
public StreamRecord() {}
/**
 * Creates an empty batch for tuples with the given number of fields.
 *
 * @param numOfFields
 *            Number of fields in the tuples
 */
public StreamRecord(int numOfFields) {
	tupleBatch = new ArrayList<Tuple>();
	numOfTuples = 0;
	this.numOfFields = numOfFields;
}
/**
 * Creates an empty batch for tuples with the given number of fields, with
 * the backing list pre-sized to batchSize.
 *
 * @param numOfFields
 *            Number of fields in the tuples
 * @param batchSize
 *            Initial capacity of the backing tuple list
 */
public StreamRecord(int numOfFields, int batchSize) {
	tupleBatch = new ArrayList<Tuple>(batchSize);
	numOfTuples = 0;
	this.numOfFields = numOfFields;
}
public StreamRecord(StreamRecord record) {
this.numOfFields = record.getNumOfFields();
this.numOfTuples = 0;
tupleBatch = new ArrayList<Tuple>();
this.uid = new UID(Arrays.copyOf(record.getId().getId(), 20));
for (int i = 0; i < record.getNumOfTuples(); ++i) {
this.tupleBatch.add(copyTuple(record.getTuple(i)));
}
}
/**
......@@ -127,6 +133,10 @@ public class StreamRecord implements IOReadableWritable, Serializable {
this(tuple, 1);
}
/**
 * @return true if the tuple counter of this record is zero
 */
public boolean isEmpty() {
	return numOfTuples == 0;
}
/**
* @return Number of fields in the tuples
*/
......@@ -200,19 +210,20 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
}
public Object getFieldFast(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
Tuple tuple;
try {
tuple = tupleBatch.get(tupleNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchTupleException());
}
try {
return tuple.getFieldFast(fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchFieldException());
}
}
//I haven't seen any difference between getField() and getFieldFast...
// public Object getFieldFast(int tupleNumber, int fieldNumber) throws NoSuchTupleException, NoSuchFieldException {
// Tuple tuple;
// try {
// tuple = tupleBatch.get(tupleNumber);
// } catch (IndexOutOfBoundsException e) {
// throw (new NoSuchTupleException());
// }
// try {
// return tuple.getFieldFast(fieldNumber);
// } catch (IndexOutOfBoundsException e) {
// throw (new NoSuchFieldException());
// }
// }
/**
* Get a Boolean from the given field of the first Tuple of the batch
......@@ -901,6 +912,35 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw new TupleSizeMismatchException();
}
}
/**
 * Appends the given tuple to the end of the batch by delegating to
 * {@link #addShadowTuple(int, Tuple)} with the current tuple count as
 * position. The tuple is stored by reference, not copied.
 *
 * @param tuple
 *            Tuple to be added as the next record of the batch
 * @throws TupleSizeMismatchException
 *             if the arity of the tuple differs from the field count of the
 *             batch
 */
public void addShadowTuple(Tuple tuple) throws TupleSizeMismatchException {
	this.addShadowTuple(this.numOfTuples, tuple);
}
/**
 * Inserts the given tuple at the given position of the batch after checking
 * that its arity matches the field count of the batch. The tuple is stored
 * by reference, not copied.
 *
 * @param index
 *            Position of the added tuple
 * @param tuple
 *            Tuple to be inserted into the batch
 * @throws TupleSizeMismatchException
 *             if the arity of the tuple differs from the field count of the
 *             batch
 */
public void addShadowTuple(int index, Tuple tuple) throws TupleSizeMismatchException {
	if (tuple.getArity() != numOfFields) {
		throw new TupleSizeMismatchException();
	}
	tupleBatch.add(index, tuple);
	numOfTuples++;
}
public StreamRecord copySerialized() {
......@@ -959,6 +999,17 @@ public class StreamRecord implements IOReadableWritable, Serializable {
return newTuple;
}
/**
 * Appends every tuple of the given record to the end of this record by
 * passing each one to addTuple.
 *
 * @param record
 *            Record whose tuples are appended
 */
public void appendRecord(StreamRecord record) {
	// Read the bound once: if record is this very instance, a bound that is
	// re-evaluated on every pass would presumably grow with each addTuple
	// call and the loop would never finish.
	final int tuplesToAppend = record.getNumOfTuples();
	for (int i = 0; i < tuplesToAppend; ++i) {
		this.addTuple(record.getTuple(i));
	}
}
/**
* Converts tuple field types to a byte array
*
......
......@@ -18,16 +18,18 @@ package eu.stratosphere.streaming.examples.window.wordcount;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableInternalState;
import eu.stratosphere.streaming.state.WindowInternalState;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.WindowState;
public class WindowWordCountCounter extends UserTaskInvokable {
private int windowSize;
private int slidingStep;
private int computeGranularity;
private int windowFieldId;
private WindowInternalState<Integer> window;
private MutableInternalState<String, Integer> wordCounts;
private WindowState<Integer> window;
private MutableTableState<String, Integer> wordCounts;
private String word = "";
private Integer count = 0;
......@@ -38,8 +40,10 @@ public class WindowWordCountCounter extends UserTaskInvokable {
public WindowWordCountCounter() {
windowSize = 100;
slidingStep = 20;
window = new WindowInternalState<Integer>(windowSize, slidingStep);
wordCounts = new MutableInternalState<String, Integer>();
computeGranularity = 10;
windowFieldId = 2;
window = new WindowState<Integer>(windowSize, slidingStep, computeGranularity, windowFieldId);
wordCounts = new MutableTableState<String, Integer>();
}
private void incrementCompute(StreamRecord record) {
......
......@@ -18,12 +18,12 @@ package eu.stratosphere.streaming.examples.wordcount;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableInternalState;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.util.PerformanceCounter;
public class WordCountKvCounter extends UserTaskInvokable {
private MutableInternalState<String, Integer> wordCounts = new MutableInternalState<String, Integer>();
private MutableTableState<String, Integer> wordCounts = new MutableTableState<String, Integer>();
private String word = "";
private Integer count = 0;
......
package eu.stratosphere.streaming.state;
/**
 * Empty placeholder class; it declares no fields or methods yet.
 */
public class LogTableState {
}
package eu.stratosphere.streaming.state;
/**
 * Empty placeholder class; it declares no fields or methods yet.
 */
public class LogTableStateIterator {
}
......@@ -21,7 +21,7 @@ import java.util.Map;
/**
* The most general internal state that stores data in a mutable map.
*/
public class MutableInternalState<K, V> implements InternalState<K, V> {
public class MutableTableState<K, V> implements TableState<K, V> {
private Map<K, V> state=new LinkedHashMap<K, V>();
@Override
......@@ -49,9 +49,21 @@ public class MutableInternalState<K, V> implements InternalState<K, V> {
}
@Override
public StateIterator<K, V> getIterator() {
public TableStateIterator<K, V> getIterator() {
// TODO Auto-generated method stub
return new MutableStateIterator<K, V>(state.entrySet().iterator());
return new MutableTableStateIterator<K, V>(state.entrySet().iterator());
}
/**
 * Stub: serialization of the table state is not implemented yet.
 *
 * @return always null in the current implementation
 */
@Override
public String serialize() {
// TODO implement; this stub returns null
return null;
}
/**
 * Stub: deserialization of the table state is not implemented yet. The
 * argument is ignored and the state is left untouched.
 *
 * @param str
 *            serialized form; unused by the current stub
 */
@Override
public void deserialize(String str) {
// TODO implement; this stub does nothing
}
}
......@@ -20,10 +20,10 @@ import java.util.Map.Entry;
import eu.stratosphere.api.java.tuple.Tuple2;
public class MutableStateIterator<K, V> implements StateIterator<K, V>{
public class MutableTableStateIterator<K, V> implements TableStateIterator<K, V>{
private Iterator<Entry<K, V>> iterator;
public MutableStateIterator(Iterator<Entry<K, V>> iter){
public MutableTableStateIterator(Iterator<Entry<K, V>> iter){
iterator=iter;
}
......
......@@ -18,10 +18,12 @@ package eu.stratosphere.streaming.state;
/**
* An internal state interface that supports stateful operator.
*/
public interface InternalState<K, V> {
public interface TableState<K, V> {
public void put(K key, V value);
public V get(K key);
public void delete(K key);
public boolean containsKey(K key);
public StateIterator<K, V> getIterator();
public String serialize();
public void deserialize(String str);
public TableStateIterator<K, V> getIterator();
}
......@@ -20,7 +20,7 @@ import eu.stratosphere.api.java.tuple.Tuple2;
/**
* the iterator for internal states.
*/
public interface StateIterator<K, V>{
public interface TableStateIterator<K, V>{
public boolean hasNext();
public Tuple2<K, V> next();
}
......@@ -25,26 +25,55 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
* compose time based window operator by extending this class by splitting the
* stream into multiple mini batches.
*/
public class WindowInternalState<K> implements InternalState<K, StreamRecord> {
public class WindowState<K> {
private int windowSize;
private int slidingStep;
private int computeGranularity;
private int windowFieldId;
private int initTimestamp;
private int nextTimestamp;
private int currentRecordNum;
private int fullRecordNum;
private int slideRecordNum;
CircularFifoBuffer buffer;
public WindowInternalState(int windowSize, int slidingStep) {
currentRecordNum = 0;
fullRecordNum = windowSize;
slideRecordNum = slidingStep;
buffer = new CircularFifoBuffer(windowSize);
StreamRecord tempRecord;
/**
 * Creates a window state whose buffer holds windowSize / computeGranularity
 * mini batches.
 *
 * @param windowSize
 *            Size of the window
 * @param slidingStep
 *            Step by which the window slides
 * @param computeGranularity
 *            Span covered by one mini batch; windowSize and slidingStep are
 *            assumed to be multiples of it
 * @param windowFieldId
 *            Index of the tuple field read by pushBack to assign tuples to
 *            mini batches
 */
public WindowState(int windowSize, int slidingStep, int computeGranularity, int windowFieldId) {
	// -1 marks both timestamps as not yet initialised; pushBack checks for it.
	this.initTimestamp = -1;
	this.nextTimestamp = -1;
	this.currentRecordNum = 0;

	this.windowFieldId = windowFieldId;
	this.computeGranularity = computeGranularity;
	this.slidingStep = slidingStep;
	this.windowSize = windowSize;

	// Integer division: any remainder is silently dropped, hence the
	// divisibility assumption stated above.
	this.slideRecordNum = slidingStep / computeGranularity;
	this.fullRecordNum = windowSize / computeGranularity;
	this.buffer = new CircularFifoBuffer(this.fullRecordNum);
}
public void pushBack(StreamRecord records) {
buffer.add(records);
currentRecordNum += 1;
/**
 * Distributes the tuples of the given record into mini batches of
 * computeGranularity width, keyed by the integer value of the field at
 * windowFieldId. Each time a tuple's value exceeds the current boundary,
 * the mini batch collected so far is pushed into the buffer and a new one
 * is started. Tuples are added to the mini batch by reference.
 *
 * @param record
 *            Record whose tuples are added to the window; a record without
 *            tuples is ignored
 */
public void pushBack(StreamRecord record) {
	// Nothing to distribute; this also avoids reading tuple 0 of an empty
	// record during initialisation.
	if (record.getNumOfTuples() == 0) {
		return;
	}
	if (initTimestamp == -1) {
		// First tuple ever seen fixes the origin of the mini-batch grid.
		initTimestamp = (Integer) record.getTuple(0).getField(windowFieldId);
		nextTimestamp = initTimestamp + computeGranularity;
		tempRecord = new StreamRecord(record.getNumOfFields());
	}
	for (int i = 0; i < record.getNumOfTuples(); ++i) {
		int timestamp = (Integer) record.getTuple(i).getField(windowFieldId);
		// NOTE(review): a value equal to nextTimestamp stays in the current
		// mini batch because the comparison is strict - confirm intended.
		while (timestamp > nextTimestamp) {
			buffer.add(tempRecord);
			currentRecordNum += 1;
			tempRecord = new StreamRecord(record.getNumOfFields());
			// Advance the boundary; without this the loop condition never
			// changes and the loop does not terminate.
			nextTimestamp += computeGranularity;
		}
		tempRecord.addShadowTuple(record.getTuple(i));
	}
}
public StreamRecord popFront() {
StreamRecord frontRecord=(StreamRecord) buffer.get();
StreamRecord frontRecord = (StreamRecord) buffer.get();
buffer.remove();
return frontRecord;
}
......@@ -61,34 +90,4 @@ public class WindowInternalState<K> implements InternalState<K, StreamRecord> {
return false;
}
@Override
public void put(K key, StreamRecord value) {
// TODO Auto-generated method stub
}
@Override
public StreamRecord get(K key) {
// TODO Auto-generated method stub
return null;
}
@Override
public void delete(K key) {
// TODO Auto-generated method stub
}
@Override
public boolean containsKey(K key) {
// TODO Auto-generated method stub
return false;
}
@Override
public StateIterator<K, StreamRecord> getIterator() {
// TODO Auto-generated method stub
return null;
}
}
......@@ -18,15 +18,13 @@ package eu.stratosphere.streaming.state;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowStateIterator<K> implements StateIterator<K, StreamRecord>{
public class WindowStateIterator<K>{
@Override
public boolean hasNext() {
// TODO Auto-generated method stub
return false;
}
@Override
public Tuple2<K, StreamRecord> next() {
// TODO Auto-generated method stub
return null;
......
......@@ -241,7 +241,7 @@ public class StreamRecordTest {
t = System.nanoTime();
for (int i = 0; i < ITERATION; i++) {
record.getFieldFast(0, i % 4);
record.getField(0, i % 4);
}
t2 = System.nanoTime() - t;
System.out.println("getFieldFast:\t" + t2 + " ns");
......@@ -262,7 +262,7 @@ public class StreamRecordTest {
t = System.nanoTime();
for (int i = 0; i < ITERATION; i++) {
record20.getFieldFast(0, i % 20);
record20.getField(0, i % 20);
}
t2 = System.nanoTime() - t;
System.out.println("getFieldFast:\t" + t2 + " ns");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册