提交 98967ea7 编写于 作者: Y Yingjun Wu 提交者: Stephan Ewen

[streaming] user-defined window operator

上级 6f8f384b
......@@ -15,29 +15,35 @@
package eu.stratosphere.streaming.examples.window.wordcount;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableInternalState;
import eu.stratosphere.streaming.state.WindowInternalState;
public class WindowWordCountCounter extends UserTaskInvokable {
private int windowSize = 100;
private int slidingStep = 20;
private int windowSize;
private int slidingStep;
private WindowInternalState<Integer> window;
private MutableInternalState<String, Integer> wordCounts;
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private String word = "";
private Integer count = 0;
private Long timestamp = 0L;
private StreamRecord outRecord = new StreamRecord(new Tuple3<String, Integer, Long>());
private StreamRecord outRecord = new StreamRecord(
new Tuple3<String, Integer, Long>());
@Override
public void invoke(StreamRecord record) throws Exception {
word = record.getString(0);
timestamp = record.getLong(1);
/**
 * Creates the counter with a window of 100 records that slides every 20
 * records, and allocates the window buffer and the per-word count state.
 */
public WindowWordCountCounter() {
    this.windowSize = 100;
    this.slidingStep = 20;
    // The window keeps the raw records; the counts are kept separately per word.
    this.window = new WindowInternalState<Integer>(this.windowSize, this.slidingStep);
    this.wordCounts = new MutableInternalState<String, Integer>();
}
/**
 * Adds one occurrence of the incoming record's word to the running counts.
 * The word is read from field 0 of {@code record} and stored in the
 * {@code word} field; the resulting total is left in the {@code count} field.
 *
 * When the word is already tracked, its stored count is incremented. The
 * trailing statements set the count to 1 and store it.
 * NOTE(review): the diff hunk marker below hides at least one line, presumably
 * the {@code } else {} that makes those trailing statements the
 * first-occurrence branch. Confirm against the full file.
 */
private void incrementCompute(StreamRecord record) {
word = record.getString(0);
if (wordCounts.containsKey(word)) {
// Word already tracked: bump its stored count by one.
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
......@@ -45,12 +51,41 @@ public class WindowWordCountCounter extends UserTaskInvokable {
count = 1;
wordCounts.put(word, 1);
}
}
outRecord.setString(0, word);
outRecord.setInteger(1, count);
outRecord.setLong(2, timestamp);
/**
 * Removes one occurrence of a record's word from the running counts.
 * The word is read from field 0 of {@code record}. Both the {@code word} and
 * {@code count} fields are overwritten with that word and its reduced total.
 * NOTE(review): the looked-up count is unboxed directly, so this presumably
 * relies on the word having been counted before. Verify against callers.
 */
private void decrementCompute(StreamRecord record) {
    word = record.getString(0);
    count = wordCounts.get(word) - 1;
    if (count != 0) {
        // Other occurrences remain: store the reduced total.
        wordCounts.put(word, count);
        return;
    }
    // Last occurrence is gone: drop the entry instead of keeping a zero.
    wordCounts.delete(word);
}
emit(outRecord);
/**
 * Feeds one record into the sliding window and emits the current count of
 * the record's word when the window is due for output.
 *
 * While the window is still filling, the record is counted and appended, and
 * a result is emitted once, at the moment the window becomes full. Once the
 * window is full, the oldest record is popped and un-counted, the new record
 * is counted and appended, and a result is emitted whenever
 * {@code window.isComputable()} reports true.
 *
 * @param record the incoming record; its word is read from field 0
 * @throws Exception propagated from the compute helpers or from {@code emit}
 */
@Override
public void invoke(StreamRecord record) throws Exception {
    boolean shouldEmit;
    if (window.isFull()) {
        StreamRecord expiredRecord = window.popFront();
        // Retire the expired record BEFORE counting the new one. Both helpers
        // write the shared word/count fields, so whichever runs last decides
        // what gets emitted. Running the increment last makes the output
        // describe the incoming record, not the one that just left the window.
        decrementCompute(expiredRecord);
        incrementCompute(record);
        window.pushBack(record);
        shouldEmit = window.isComputable();
    } else {
        incrementCompute(record);
        window.pushBack(record);
        // Emit once, when this record is the one that fills the window.
        shouldEmit = window.isFull();
    }

    if (shouldEmit) {
        outRecord.setString(0, word);
        outRecord.setInteger(1, count);
        // NOTE(review): neither this method nor the compute helpers assign
        // timestamp, so this writes whatever the field currently holds
        // (it is initialised to 0L). Confirm whether it should be read from
        // the incoming record.
        outRecord.setLong(2, timestamp);
        emit(outRecord);
    }
}
}
......@@ -13,20 +13,82 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.wordcount;
package eu.stratosphere.streaming.state;
public class IncrementalWindow {
import org.apache.commons.collections.buffer.CircularFifoBuffer;
private int currentTupleNum;
private int fullTupleNum;
private int slideTupleNum;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public IncrementalWindow(int batchRange, int windowSize, int slidingStep) {
/**
 * The window state for the window operator. To be general enough, this class
 * implements a count-based window operator. Users can compose a time-based
 * window operator by extending this class and splitting the stream into
 * multiple mini-batches.
 */
public class WindowInternalState<K> implements InternalState<K, StreamRecord> {
private int currentRecordNum;
private int fullRecordNum;
private int slideRecordNum;
CircularFifoBuffer buffer;
/**
 * Creates an empty window state.
 *
 * @param windowSize  number of records at which the window counts as full;
 *                    also the capacity of the backing circular buffer
 * @param slidingStep number of records beyond a full window after which
 *                    {@link #isComputable()} reports true
 */
public WindowInternalState(int windowSize, int slidingStep) {
    this.buffer = new CircularFifoBuffer(windowSize);
    this.fullRecordNum = windowSize;
    this.slideRecordNum = slidingStep;
    this.currentRecordNum = 0;
}
/**
 * Appends a record to the back of the window buffer and advances the record
 * counter by one.
 *
 * @param records the record to append
 */
public void pushBack(StreamRecord records) {
    buffer.add(records);
    // Count the record only after the buffer has accepted it.
    ++currentRecordNum;
}
/**
 * Removes the oldest record from the window buffer and returns it.
 * The record counter is left unchanged by this call.
 *
 * @return the record that was at the front of the buffer
 */
public StreamRecord popFront() {
    // remove() hands back the element it removes, i.e. the current head.
    return (StreamRecord) buffer.remove();
}
/**
 * Reports whether the record counter has reached the configured window size.
 *
 * @return {@code true} once the counter is at least the window size
 */
public boolean isFull() {
    return fullRecordNum <= currentRecordNum;
}
/**
 * Reports whether a full sliding step has been counted beyond the window
 * size. This method is not a pure query: when it returns {@code true} it also
 * pulls the record counter back by one sliding step.
 *
 * @return {@code true} exactly when the counter equals window size plus
 *         sliding step at the time of the call
 */
public boolean isComputable() {
    if (currentRecordNum != fullRecordNum + slideRecordNum) {
        return false;
    }
    // Rewind by one step so the next step is measured from here.
    currentRecordNum -= slideRecordNum;
    return true;
}
void pushBack() {
/**
 * Keyed insertion declared by {@link InternalState}. Not implemented for the
 * window state: the call does nothing and the arguments are ignored.
 */
@Override
public void put(K key, StreamRecord value) {
// TODO: not implemented; records are added to the window through pushBack.
}
void popFront() {
/**
 * Keyed lookup declared by {@link InternalState}. Not implemented for the
 * window state.
 *
 * @return always {@code null}
 */
@Override
public StreamRecord get(K key) {
// TODO: not implemented; the key is ignored.
return null;
}
/**
 * Keyed removal declared by {@link InternalState}. Not implemented for the
 * window state: the call does nothing and the key is ignored.
 */
@Override
public void delete(K key) {
// TODO: not implemented; records leave the window through popFront.
}
/**
 * Key membership test declared by {@link InternalState}. Not implemented for
 * the window state.
 *
 * @return always {@code false}
 */
@Override
public boolean containsKey(K key) {
// TODO: not implemented; the key is ignored.
return false;
}
/**
 * Iterator accessor declared by {@link InternalState}. Not implemented for
 * the window state.
 *
 * @return always {@code null}, so callers must not dereference the result
 */
@Override
public StateIterator<K, StreamRecord> getIterator() {
// TODO: not implemented; no iterator over the buffered records exists yet.
return null;
}
}
package eu.stratosphere.streaming.state;
/**
 * Empty placeholder class; it currently declares no members.
 * NOTE(review): presumably intended as the iterator over window state.
 * Confirm the intended role before relying on it.
 */
public class WindowStateIterator {
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册