提交 6492af04 编写于 作者: G Gyula Fora 提交者: mbalassi

[streaming] Reduce and GroupReduce invokable refactor and performance tweak

上级 618effad
...@@ -17,30 +17,27 @@ ...@@ -17,30 +17,27 @@
package org.apache.flink.streaming.api.invokable.operator; package org.apache.flink.streaming.api.invokable.operator;
import java.io.IOException; import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.commons.math.util.MathUtils; import org.apache.commons.math.util.MathUtils;
import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord; import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.state.SlidingWindowState; import org.apache.flink.streaming.state.CircularFifoList;
public class BatchGroupReduceInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { public class BatchGroupReduceInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
protected GroupReduceFunction<IN, OUT> reducer; protected GroupReduceFunction<IN, OUT> reducer;
protected BatchIterator<IN> userIterator;
protected Iterable<IN> userIterable;
protected long slideSize; protected long slideSize;
protected long granularity;
protected int listSize;
protected transient SlidingWindowState<IN> state;
protected long batchSize; protected long batchSize;
protected int counter = 0; protected int granularity;
protected int batchPerSlide;
protected StreamBatch batch;
protected StreamBatch currentBatch;
protected long numberOfBatches;
public BatchGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long batchSize, public BatchGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long batchSize,
long slideSize) { long slideSize) {
...@@ -48,101 +45,117 @@ public class BatchGroupReduceInvokable<IN, OUT> extends StreamInvokable<IN, OUT> ...@@ -48,101 +45,117 @@ public class BatchGroupReduceInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
this.reducer = reduceFunction; this.reducer = reduceFunction;
this.batchSize = batchSize; this.batchSize = batchSize;
this.slideSize = slideSize; this.slideSize = slideSize;
this.granularity = MathUtils.gcd(batchSize, slideSize); this.granularity = (int) MathUtils.gcd(batchSize, slideSize);
this.listSize = (int) granularity; this.batchPerSlide = (int) (slideSize / granularity);
} this.numberOfBatches = batchSize / granularity;
this.batch = new StreamBatch();
@Override
protected void mutableInvoke() throws Exception {
throw new RuntimeException("Reducing mutable sliding batch is not supported.");
} }
@Override @Override
protected void immutableInvoke() throws Exception { protected void immutableInvoke() throws Exception {
if (getNextRecord() == null) { if ((reuse = recordIterator.next(reuse)) == null) {
throw new RuntimeException("DataStream must not be empty"); throw new RuntimeException("DataStream must not be empty");
} }
initializeAtFirstRecord(); while (reuse != null) {
StreamBatch batch = getBatch(reuse);
batch.addToBuffer(reuse.getObject());
while (reuse != null && !isStateFull()) { resetReuse();
collectOneUnit(); reuse = recordIterator.next(reuse);
} }
reduce();
while (reuse != null) { reduceLastBatch();
for (int i = 0; i < slideSize / granularity; i++) {
if (reuse != null) {
collectOneUnit();
}
}
reduce();
}
} }
protected boolean isStateFull() { @Override
return state.isFull(); // TODO: implement mutableInvoke for reduce
protected void mutableInvoke() throws Exception {
System.out.println("Immutable setting is used");
immutableInvoke();
} }
protected void initializeAtFirstRecord() { protected StreamBatch getBatch(StreamRecord<IN> next) {
counter = 0; return batch;
} }
protected void collectOneUnit() throws Exception { protected void reduce(StreamBatch batch) {
ArrayList<StreamRecord<IN>> list; this.currentBatch = batch;
callUserFunctionAndLogException();
if (!batchNotFull()) { }
list = new ArrayList<StreamRecord<IN>>();
} else {
list = new ArrayList<StreamRecord<IN>>(listSize);
do { protected void reduceLastBatch() {
list.add(reuse); batch.reduceLastBatch();
resetReuse();
} while (getNextRecord() != null && batchNotFull());
}
state.pushBack(list);
} }
protected StreamRecord<IN> getNextRecord() throws IOException { @Override
reuse = recordIterator.next(reuse); protected void callUserFunction() throws Exception {
if (reuse != null) { if(!currentBatch.circularList.isEmpty()){
counter++; reducer.reduce(currentBatch.circularList.getIterable(), collector);
} }
return reuse;
} }
protected boolean batchNotFull() { protected class StreamBatch implements Serializable {
if (counter < granularity) {
return true; private static final long serialVersionUID = 1L;
} else { private long counter;
counter = 0; protected long minibatchCounter;
return false;
protected CircularFifoList<IN> circularList;
public StreamBatch() {
this.circularList = new CircularFifoList<IN>();
this.counter = 0;
this.minibatchCounter = 0;
} }
}
protected void reduce() { public void addToBuffer(IN nextValue) throws Exception {
userIterator = state.getIterator(); circularList.add(nextValue);
callUserFunctionAndLogException();
}
@Override counter++;
protected void callUserFunction() throws Exception {
reducer.reduce(userIterable, collector);
}
@Override if (miniBatchEnd()) {
public void open(Configuration parameters) throws Exception { circularList.newSlide();
super.open(parameters); minibatchCounter++;
this.state = new SlidingWindowState<IN>(batchSize, slideSize, granularity); if (batchEnd()) {
userIterable = new BatchIterable(); reduceBatch();
} circularList.shiftWindow(batchPerSlide);
}
}
protected class BatchIterable implements Iterable<IN> { }
protected boolean miniBatchEnd() {
if( (counter % granularity) == 0){
counter = 0;
return true;
}else{
return false;
}
}
public boolean batchEnd() {
if (minibatchCounter == numberOfBatches) {
minibatchCounter -= batchPerSlide;
return true;
}
return false;
}
public void reduceBatch() {
reduce(this);
}
public void reduceLastBatch() {
if (!miniBatchEnd()) {
reduceBatch();
}
}
@Override @Override
public Iterator<IN> iterator() { public String toString(){
return userIterator; return circularList.toString();
} }
} }
......
...@@ -22,8 +22,6 @@ import java.util.Iterator; ...@@ -22,8 +22,6 @@ import java.util.Iterator;
import org.apache.commons.math.util.MathUtils; import org.apache.commons.math.util.MathUtils;
import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord; import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.state.NullableCircularBuffer; import org.apache.flink.streaming.state.NullableCircularBuffer;
...@@ -32,7 +30,6 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> { ...@@ -32,7 +30,6 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
protected ReduceFunction<OUT> reducer; protected ReduceFunction<OUT> reducer;
protected TypeSerializer<OUT> typeSerializer;
protected long slideSize; protected long slideSize;
...@@ -58,13 +55,12 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> { ...@@ -58,13 +55,12 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
protected void immutableInvoke() throws Exception { protected void immutableInvoke() throws Exception {
if ((reuse = recordIterator.next(reuse)) == null) { if ((reuse = recordIterator.next(reuse)) == null) {
throw new RuntimeException("DataStream must not be empty"); throw new RuntimeException("DataStream must not be empty");
} }
while (reuse != null) { while (reuse != null) {
StreamBatch batch = getBatch(reuse); StreamBatch batch = getBatch(reuse);
batch.reduceToBuffer(reuse); batch.reduceToBuffer(reuse.getObject());
resetReuse(); resetReuse();
reuse = recordIterator.next(reuse); reuse = recordIterator.next(reuse);
...@@ -74,26 +70,25 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> { ...@@ -74,26 +70,25 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
} }
protected void reduceLastBatch() throws Exception {
batch.reduceLastBatch();
}
protected StreamBatch getBatch(StreamRecord<OUT> next) {
return batch;
}
@Override @Override
// TODO: implement mutableInvoke for reduce
protected void mutableInvoke() throws Exception { protected void mutableInvoke() throws Exception {
System.out.println("Immutable setting is used"); System.out.println("Immutable setting is used");
immutableInvoke(); immutableInvoke();
} }
protected StreamBatch getBatch(StreamRecord<OUT> next) {
return batch;
}
protected void reduce(StreamBatch batch) { protected void reduce(StreamBatch batch) {
this.currentBatch = batch; this.currentBatch = batch;
callUserFunctionAndLogException(); callUserFunctionAndLogException();
} }
protected void reduceLastBatch() throws Exception {
batch.reduceLastBatch();
}
@Override @Override
protected void callUserFunction() throws Exception { protected void callUserFunction() throws Exception {
Iterator<OUT> reducedIterator = currentBatch.getIterator(); Iterator<OUT> reducedIterator = currentBatch.getIterator();
...@@ -114,12 +109,6 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> { ...@@ -114,12 +109,6 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
} }
} }
@Override
public void open(Configuration config) throws Exception {
super.open(config);
this.typeSerializer = inSerializer.getObjectSerializer();
}
protected class StreamBatch implements Serializable { protected class StreamBatch implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
...@@ -137,8 +126,8 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> { ...@@ -137,8 +126,8 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
} }
public void reduceToBuffer(StreamRecord<OUT> next) throws Exception { public void reduceToBuffer(OUT nextValue) throws Exception {
OUT nextValue = next.getObject();
if (currentValue != null) { if (currentValue != null) {
currentValue = reducer.reduce(currentValue, nextValue); currentValue = reducer.reduce(currentValue, nextValue);
} else { } else {
...@@ -163,12 +152,17 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> { ...@@ -163,12 +152,17 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
} }
protected boolean miniBatchEnd() { protected boolean miniBatchEnd() {
return (counter % granularity) == 0; if( (counter % granularity) == 0){
counter = 0;
return true;
}else{
return false;
}
} }
public boolean batchEnd() { public boolean batchEnd() {
if (counter == batchSize) { if (minibatchCounter == numberOfBatches) {
counter -= slideSize;
minibatchCounter -= batchPerSlide; minibatchCounter -= batchPerSlide;
return true; return true;
} }
...@@ -203,6 +197,11 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> { ...@@ -203,6 +197,11 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
public Iterator<OUT> getIterator() { public Iterator<OUT> getIterator() {
return circularBuffer.iterator(); return circularBuffer.iterator();
} }
@Override
public String toString(){
return circularBuffer.toString();
}
} }
......
...@@ -17,54 +17,44 @@ ...@@ -17,54 +17,44 @@
package org.apache.flink.streaming.api.invokable.operator; package org.apache.flink.streaming.api.invokable.operator;
import java.util.ArrayList; import java.util.HashMap;
import java.util.Iterator; import java.util.Map;
import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.streaming.api.streamrecord.StreamRecord; import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.state.MutableTableState;
public class GroupedBatchGroupReduceInvokable<IN, OUT> extends BatchGroupReduceInvokable<IN, OUT> { public class GroupedBatchGroupReduceInvokable<IN, OUT> extends BatchGroupReduceInvokable<IN, OUT> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
int keyPosition; int keyPosition;
private Iterator<StreamRecord<IN>> iterator; Map<Object, StreamBatch> streamBatches;
private MutableTableState<Object, List<IN>> values;
public GroupedBatchGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long batchSize, public GroupedBatchGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long batchSize,
long slideSize, int keyPosition) { long slideSize, int keyPosition) {
super(reduceFunction, batchSize, slideSize); super(reduceFunction, batchSize, slideSize);
this.keyPosition = keyPosition; this.keyPosition = keyPosition;
this.reducer = reduceFunction; this.streamBatches = new HashMap<Object, StreamBatch>();
values = new MutableTableState<Object, List<IN>>();
} }
private IN nextValue;
@Override @Override
protected void reduce() { protected StreamBatch getBatch(StreamRecord<IN> next) {
iterator = state.getStreamRecordIterator(); Object key = next.getField(keyPosition);
while (iterator.hasNext()) { StreamBatch batch = streamBatches.get(key);
StreamRecord<IN> nextRecord = iterator.next(); if(batch == null){
Object key = nextRecord.getField(keyPosition); batch=new StreamBatch();
nextValue = nextRecord.getObject(); streamBatches.put(key, batch);
List<IN> group = values.get(key);
if (group != null) {
group.add(nextValue);
} else {
group = new ArrayList<IN>();
group.add(nextValue);
values.put(key, group);
}
}
for (List<IN> group : values.values()) {
userIterable = group;
callUserFunctionAndLogException();
} }
values.clear(); return batch;
} }
@Override
protected void reduceLastBatch() {
for(StreamBatch batch: streamBatches.values()){
batch.reduceLastBatch();
}
}
} }
...@@ -36,13 +36,6 @@ public class GroupedBatchReduceInvokable<OUT> extends BatchReduceInvokable<OUT> ...@@ -36,13 +36,6 @@ public class GroupedBatchReduceInvokable<OUT> extends BatchReduceInvokable<OUT>
this.streamBatches = new HashMap<Object, StreamBatch>(); this.streamBatches = new HashMap<Object, StreamBatch>();
} }
@Override
protected void reduceLastBatch() throws Exception {
for(StreamBatch batch: streamBatches.values()){
batch.reduceLastBatch();
}
}
@Override @Override
protected StreamBatch getBatch(StreamRecord<OUT> next) { protected StreamBatch getBatch(StreamRecord<OUT> next) {
Object key = next.getField(keyPosition); Object key = next.getField(keyPosition);
...@@ -54,4 +47,11 @@ public class GroupedBatchReduceInvokable<OUT> extends BatchReduceInvokable<OUT> ...@@ -54,4 +47,11 @@ public class GroupedBatchReduceInvokable<OUT> extends BatchReduceInvokable<OUT>
return batch; return batch;
} }
@Override
protected void reduceLastBatch() throws Exception {
for(StreamBatch batch: streamBatches.values()){
batch.reduceLastBatch();
}
}
} }
...@@ -17,54 +17,102 @@ ...@@ -17,54 +17,102 @@
package org.apache.flink.streaming.api.invokable.operator; package org.apache.flink.streaming.api.invokable.operator;
import java.util.ArrayList; import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.streaming.api.invokable.util.TimeStamp; import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.streamrecord.StreamRecord; import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.state.MutableTableState;
public class GroupedWindowGroupReduceInvokable<IN, OUT> extends WindowGroupReduceInvokable<IN, OUT> { public class GroupedWindowGroupReduceInvokable<IN, OUT> extends WindowGroupReduceInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
int keyPosition; int keyPosition;
private Iterator<StreamRecord<IN>> iterator; Map<Object, StreamWindow> streamWindows;
private MutableTableState<Object, List<IN>> values; List<Object> cleanList;
long currentMiniBatchCount = 0;
public GroupedWindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize, public GroupedWindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize,
long slideInterval, int keyPosition, TimeStamp<IN> timestamp) { long slideInterval, int keyPosition, TimeStamp<IN> timestamp) {
super(reduceFunction, windowSize, slideInterval, timestamp); super(reduceFunction, windowSize, slideInterval, timestamp);
this.keyPosition = keyPosition; this.keyPosition = keyPosition;
this.reducer = reduceFunction; this.reducer = reduceFunction;
values = new MutableTableState<Object, List<IN>>(); this.streamWindows = new HashMap<Object, StreamWindow>();
} }
private IN nextValue; @Override
protected StreamBatch getBatch(StreamRecord<IN> next) {
Object key = next.getField(keyPosition);
StreamWindow window = streamWindows.get(key);
if (window == null) {
window = new GroupedStreamWindow();
window.minibatchCounter = currentMiniBatchCount;
streamWindows.put(key, window);
}
this.window = window;
return window;
}
@Override @Override
protected void reduce() { protected void reduceLastBatch() {
iterator = state.getStreamRecordIterator(); for (StreamBatch window : streamWindows.values()) {
while (iterator.hasNext()) { window.reduceLastBatch();
StreamRecord<IN> nextRecord = iterator.next(); }
Object key = nextRecord.getField(keyPosition); }
nextValue = nextRecord.getObject();
private void shiftGranularityAllWindows(){
List<IN> group = values.get(key); for (StreamBatch window : streamWindows.values()) {
if (group != null) { window.circularList.newSlide();
group.add(nextValue); window.minibatchCounter+=1;
} else { }
group = new ArrayList<IN>(); }
group.add(nextValue);
values.put(key, group); private void slideAllWindows(){
for (StreamBatch window : streamWindows.values()) {
window.circularList.shiftWindow(batchPerSlide);
}
}
private void reduceAllWindows() {
for (StreamBatch window : streamWindows.values()) {
window.minibatchCounter -= batchPerSlide;
window.reduceBatch();
}
}
protected class GroupedStreamWindow extends StreamWindow {
private static final long serialVersionUID = 1L;
public GroupedStreamWindow() {
super();
}
@Override
protected synchronized void checkWindowEnd(long timeStamp) {
nextRecordTime = timeStamp;
while (miniBatchEnd()) {
shiftGranularityAllWindows();
if (batchEnd()) {
reduceAllWindows();
slideAllWindows();
}
} }
currentMiniBatchCount = this.minibatchCounter;
} }
for (List<IN> group : values.values()) {
userIterable = group; @Override
callUserFunctionAndLogException(); public boolean batchEnd() {
if (minibatchCounter == numberOfBatches) {
return true;
}
return false;
} }
values.clear();
} }
private static final long serialVersionUID = 1L;
} }
...@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.invokable.operator; ...@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.invokable.operator;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.invokable.util.TimeStamp; import org.apache.flink.streaming.api.invokable.util.TimeStamp;
...@@ -30,85 +29,101 @@ public class GroupedWindowReduceInvokable<OUT> extends WindowReduceInvokable<OUT ...@@ -30,85 +29,101 @@ public class GroupedWindowReduceInvokable<OUT> extends WindowReduceInvokable<OUT
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private int keyPosition; private int keyPosition;
private Map<Object, StreamWindow> streamWindows;
private long currentMiniBatchCount = 0;
public GroupedWindowReduceInvokable(ReduceFunction<OUT> reduceFunction, long windowSize, public GroupedWindowReduceInvokable(ReduceFunction<OUT> reduceFunction, long windowSize,
long slideInterval, int keyPosition, TimeStamp<OUT> timestamp) { long slideInterval, int keyPosition, TimeStamp<OUT> timestamp) {
super(reduceFunction, windowSize, slideInterval, timestamp); super(reduceFunction, windowSize, slideInterval, timestamp);
this.keyPosition = keyPosition; this.keyPosition = keyPosition;
this.window = new GroupedStreamWindow(); this.streamWindows = new HashMap<Object, StreamWindow>();
this.batch = this.window; }
@Override
protected StreamBatch getBatch(StreamRecord<OUT> next) {
Object key = next.getField(keyPosition);
StreamWindow window = streamWindows.get(key);
if (window == null) {
window = new GroupedStreamWindow();
window.minibatchCounter = currentMiniBatchCount;
streamWindows.put(key, window);
}
this.window = window;
return window;
}
private void addToAllBuffers() {
for (StreamBatch window : streamWindows.values()) {
window.addToBuffer();
}
}
private void reduceAllWindows() {
for (StreamBatch window : streamWindows.values()) {
window.minibatchCounter -= batchPerSlide;
window.reduceBatch();
}
}
@Override
protected void reduceLastBatch() throws Exception {
for (StreamBatch window : streamWindows.values()) {
window.reduceLastBatch();
}
} }
@Override @Override
protected void callUserFunction() throws Exception { protected void callUserFunction() throws Exception {
@SuppressWarnings("unchecked") Iterator<OUT> reducedIterator = currentBatch.getIterator();
Iterator<Map<Object, OUT>> reducedIterator = (Iterator<Map<Object, OUT>>) batch.getIterator(); OUT reduced = null;
Map<Object, OUT> reducedValues = reducedIterator.next();
while (reducedIterator.hasNext() && reduced == null) {
reduced = reducedIterator.next();
}
while (reducedIterator.hasNext()) { while (reducedIterator.hasNext()) {
Map<Object, OUT> nextValues = reducedIterator.next(); OUT next = reducedIterator.next();
for (Entry<Object, OUT> entry : nextValues.entrySet()) { if (next != null) {
OUT currentValue = reducedValues.get(entry.getKey()); reduced = reducer.reduce(reduced, next);
if (currentValue == null) {
reducedValues.put(entry.getKey(), entry.getValue());
} else {
reducedValues.put(entry.getKey(), reducer.reduce(currentValue, entry.getValue()));
}
} }
} }
for (OUT value : reducedValues.values()) { if (reduced != null) {
collector.collect(value); collector.collect(reduced);
}else{
//remove window if no value received
streamWindows.remove(currentBatch);
} }
} }
protected class GroupedStreamWindow extends StreamWindow { protected class GroupedStreamWindow extends StreamWindow {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private Map<Object, OUT> currentValues;
public GroupedStreamWindow() { public GroupedStreamWindow() {
super(); super();
this.currentValues = new HashMap<Object, OUT>();
} }
@Override @Override
public void reduceToBuffer(StreamRecord<OUT> next) throws Exception { protected synchronized void checkWindowEnd(long timeStamp) {
nextRecordTime = timeStamp;
OUT nextValue = next.getObject(); while (miniBatchEnd()) {
Object key = next.getField(keyPosition); addToAllBuffers();
checkBatchEnd(timestamp.getTimestamp(nextValue)); if (batchEnd()) {
reduceAllWindows();
OUT currentValue = currentValues.get(key); }
if (currentValue != null) {
currentValues.put(key, reducer.reduce(currentValue, nextValue));
}else{
currentValues.put(key, nextValue);
} }
currentMiniBatchCount = this.minibatchCounter;
} }
@Override
public boolean miniBatchInProgress() {
return !currentValues.isEmpty();
};
@SuppressWarnings("unchecked")
@Override @Override
protected void addToBuffer() { public boolean batchEnd() {
Map<Object, OUT> reuseMap; if (minibatchCounter == numberOfBatches) {
return true;
if (circularBuffer.isFull()) {
reuseMap = (Map<Object, OUT>) circularBuffer.remove();
reuseMap.clear();
} else {
reuseMap = new HashMap<Object, OUT>(currentValues.size());
} }
return false;
circularBuffer.add(currentValues);
minibatchCounter++;
currentValues = reuseMap;
} }
} }
......
...@@ -17,46 +17,92 @@ ...@@ -17,46 +17,92 @@
package org.apache.flink.streaming.api.invokable.operator; package org.apache.flink.streaming.api.invokable.operator;
import java.io.IOException;
import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp; import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
public class WindowGroupReduceInvokable<IN, OUT> extends BatchGroupReduceInvokable<IN, OUT> { public class WindowGroupReduceInvokable<IN, OUT> extends BatchGroupReduceInvokable<IN, OUT> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private long startTime; private long startTime;
private long nextRecordTime; protected long nextRecordTime;
private TimeStamp<IN> timestamp; protected TimeStamp<IN> timestamp;
protected StreamWindow window;
public WindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize, public WindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize,
long slideInterval, TimeStamp<IN> timestamp) { long slideInterval, TimeStamp<IN> timestamp) {
super(reduceFunction, windowSize, slideInterval); super(reduceFunction, windowSize, slideInterval);
this.timestamp = timestamp; this.timestamp = timestamp;
this.startTime = timestamp.getStartTime(); this.startTime = timestamp.getStartTime();
this.window = new StreamWindow();
this.batch = this.window;
} }
@Override @Override
protected StreamRecord<IN> getNextRecord() throws IOException { public void open(Configuration config) throws Exception {
reuse = recordIterator.next(reuse); super.open(config);
if (reuse != null) { if (timestamp instanceof DefaultTimeStamp) {
nextRecordTime = timestamp.getTimestamp(reuse.getObject()); (new TimeCheck()).start();
} }
return reuse;
} }
@Override protected class StreamWindow extends StreamBatch {
protected boolean batchNotFull() {
if (nextRecordTime < startTime + granularity) { private static final long serialVersionUID = 1L;
return true;
} else { public StreamWindow() {
startTime += granularity; super();
return false; }
@Override
public void addToBuffer(IN nextValue) throws Exception {
checkWindowEnd(timestamp.getTimestamp(nextValue));
if (minibatchCounter >= 0) {
circularList.add(nextValue);
}
} }
protected synchronized void checkWindowEnd(long timeStamp) {
nextRecordTime = timeStamp;
while (miniBatchEnd()) {
circularList.newSlide();
minibatchCounter++;
if (batchEnd()) {
reduceBatch();
circularList.shiftWindow(batchPerSlide);
}
}
}
@Override
protected boolean miniBatchEnd() {
if (nextRecordTime < startTime + granularity) {
return false;
} else {
startTime += granularity;
return true;
}
}
} }
protected void mutableInvoke() throws Exception { private class TimeCheck extends Thread {
throw new RuntimeException("Reducing mutable sliding window is not supported."); @Override
public void run() {
while (true) {
try {
Thread.sleep(slideSize);
} catch (InterruptedException e) {
}
if (isRunning) {
window.checkWindowEnd(System.currentTimeMillis());
} else {
break;
}
}
}
} }
} }
...@@ -21,7 +21,6 @@ import org.apache.flink.api.common.functions.ReduceFunction; ...@@ -21,7 +21,6 @@ import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp; import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp; import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> { public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
...@@ -39,6 +38,14 @@ public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> { ...@@ -39,6 +38,14 @@ public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> {
this.batch = this.window; this.batch = this.window;
} }
@Override
public void open(Configuration config) throws Exception {
super.open(config);
if (timestamp instanceof DefaultTimeStamp) {
(new TimeCheck()).start();
}
}
protected class StreamWindow extends StreamBatch { protected class StreamWindow extends StreamBatch {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
...@@ -49,11 +56,10 @@ public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> { ...@@ -49,11 +56,10 @@ public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> {
} }
@Override @Override
public void reduceToBuffer(StreamRecord<OUT> next) throws Exception { public void reduceToBuffer(OUT nextValue) throws Exception {
OUT nextValue = next.getObject();
checkWindowEnd(timestamp.getTimestamp(nextValue));
checkBatchEnd(timestamp.getTimestamp(nextValue));
if (currentValue != null) { if (currentValue != null) {
currentValue = reducer.reduce(currentValue, nextValue); currentValue = reducer.reduce(currentValue, nextValue);
} else { } else {
...@@ -61,7 +67,7 @@ public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> { ...@@ -61,7 +67,7 @@ public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> {
} }
} }
protected synchronized void checkBatchEnd(long timeStamp) { protected synchronized void checkWindowEnd(long timeStamp) {
nextRecordTime = timeStamp; nextRecordTime = timeStamp;
while (miniBatchEnd()) { while (miniBatchEnd()) {
...@@ -93,14 +99,6 @@ public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> { ...@@ -93,14 +99,6 @@ public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> {
} }
@Override
public void open(Configuration config) throws Exception {
super.open(config);
if (timestamp instanceof DefaultTimeStamp) {
(new TimeCheck()).start();
}
}
private class TimeCheck extends Thread { private class TimeCheck extends Thread {
@Override @Override
public void run() { public void run() {
...@@ -110,7 +108,7 @@ public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> { ...@@ -110,7 +108,7 @@ public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> {
} catch (InterruptedException e) { } catch (InterruptedException e) {
} }
if (isRunning) { if (isRunning) {
window.checkBatchEnd(System.currentTimeMillis()); window.checkWindowEnd(System.currentTimeMillis());
} else { } else {
break; break;
} }
......
...@@ -33,11 +33,13 @@ public class CircularFifoList<T> implements Serializable { ...@@ -33,11 +33,13 @@ public class CircularFifoList<T> implements Serializable {
private Queue<T> queue; private Queue<T> queue;
private Queue<Long> slideSizes; private Queue<Long> slideSizes;
private long counter; private long counter;
private Iterable<T> iterable;
public CircularFifoList() { public CircularFifoList() {
this.queue = new LinkedList<T>(); this.queue = new LinkedList<T>();
this.slideSizes = new LinkedList<Long>(); this.slideSizes = new LinkedList<Long>();
this.counter = 0; this.counter = 0;
this.iterable = new ListIterable();
} }
public void add(T element) { public void add(T element) {
...@@ -51,18 +53,52 @@ public class CircularFifoList<T> implements Serializable { ...@@ -51,18 +53,52 @@ public class CircularFifoList<T> implements Serializable {
} }
public void shiftWindow() { public void shiftWindow() {
Long firstSlideSize = slideSizes.remove(); shiftWindow(1);
for (int i = 0; i < firstSlideSize; i++) { }
queue.remove();
public void shiftWindow(int numberOfSlides) {
if (numberOfSlides <= slideSizes.size()) {
for (int i = 0; i < numberOfSlides; i++) {
Long firstSlideSize = slideSizes.remove();
for (int j = 0; j < firstSlideSize; j++) {
queue.remove();
}
}
} else {
slideSizes.clear();
queue.clear();
counter = 0;
} }
} }
public Iterator<T> getIterator() { public Iterator<T> getIterator() {
return queue.iterator(); return queue.iterator();
} }
public Iterable<T> getIterable() {
return iterable;
}
private class ListIterable implements Iterable<T> {
@Override
public Iterator<T> iterator() {
return getIterator();
}
}
public boolean isEmpty() {
return queue.isEmpty();
}
@Override @Override
public String toString() { public String toString() {
return queue.toString(); return queue.toString();
} }
} }
...@@ -72,10 +72,10 @@ public class GroupedBatchGroupReduceTest { ...@@ -72,10 +72,10 @@ public class GroupedBatchGroupReduceTest {
@Test @Test
public void slidingBatchGroupReduceTest() { public void slidingBatchGroupReduceTest() {
GroupedBatchGroupReduceInvokable<Integer, String> invokable1 = new GroupedBatchGroupReduceInvokable<Integer, String>( GroupedBatchGroupReduceInvokable<Integer, String> invokable1 = new GroupedBatchGroupReduceInvokable<Integer, String>(
new MySlidingBatchReduce1(), 3, 2, 0); new MySlidingBatchReduce1(), 2, 2, 0);
List<String> expected = Arrays.asList("1", "1", END_OF_GROUP, "2", END_OF_GROUP, "2", List<String> expected = Arrays.asList("1", "1", END_OF_GROUP, "3", "3", END_OF_GROUP, "2",
END_OF_GROUP, "3", "3", END_OF_GROUP); END_OF_GROUP);
List<String> actual = MockInvokable.createAndExecute(invokable1, List<String> actual = MockInvokable.createAndExecute(invokable1,
Arrays.asList(1, 1, 2, 3, 3)); Arrays.asList(1, 1, 2, 3, 3));
...@@ -84,11 +84,12 @@ public class GroupedBatchGroupReduceTest { ...@@ -84,11 +84,12 @@ public class GroupedBatchGroupReduceTest {
GroupedBatchGroupReduceInvokable<Tuple2<Integer, String>, String> invokable2 = new GroupedBatchGroupReduceInvokable<Tuple2<Integer, String>, String>( GroupedBatchGroupReduceInvokable<Tuple2<Integer, String>, String> invokable2 = new GroupedBatchGroupReduceInvokable<Tuple2<Integer, String>, String>(
new MySlidingBatchReduce2(), 2, 2, 1); new MySlidingBatchReduce2(), 2, 2, 1);
expected = Arrays.asList("open", "1", "2", END_OF_GROUP, "open", "3", END_OF_GROUP, "open", expected = Arrays.asList("open", "1", "2", END_OF_GROUP, "open", "3", "3", END_OF_GROUP,
"4", END_OF_GROUP); "open", "4", END_OF_GROUP);
actual = MockInvokable.createAndExecute(invokable2, Arrays.asList( actual = MockInvokable.createAndExecute(invokable2, Arrays.asList(
new Tuple2<Integer, String>(1, "a"), new Tuple2<Integer, String>(2, "a"), new Tuple2<Integer, String>(1, "a"), new Tuple2<Integer, String>(2, "a"),
new Tuple2<Integer, String>(3, "b"), new Tuple2<Integer, String>(4, "a"))); new Tuple2<Integer, String>(3, "b"), new Tuple2<Integer, String>(3, "b"),
new Tuple2<Integer, String>(4, "a")));
assertEquals(expected, actual); assertEquals(expected, actual);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.MockInvokable;
import org.apache.flink.util.Collector;
import org.junit.Test;
public class GroupedWindowGroupReduceInvokableTest {
@Test
public void windowReduceTest() {
List<Tuple2<String, Integer>> inputs2 = new ArrayList<Tuple2<String, Integer>>();
inputs2.add(new Tuple2<String, Integer>("a", 1));
inputs2.add(new Tuple2<String, Integer>("a", 2));
inputs2.add(new Tuple2<String, Integer>("b", 2));
inputs2.add(new Tuple2<String, Integer>("b", 2));
inputs2.add(new Tuple2<String, Integer>("b", 5));
inputs2.add(new Tuple2<String, Integer>("a", 7));
inputs2.add(new Tuple2<String, Integer>("b", 9));
inputs2.add(new Tuple2<String, Integer>("b", 10));
//1,2-4,5-7,8-10
List<Tuple2<String, Integer>> expected2 = new ArrayList<Tuple2<String, Integer>>();
expected2.add(new Tuple2<String, Integer>("a", 3));
expected2.add(new Tuple2<String, Integer>("b", 4));
expected2.add(new Tuple2<String, Integer>("b", 5));
expected2.add(new Tuple2<String, Integer>("a", 7));
expected2.add(new Tuple2<String, Integer>("b", 10));
GroupedWindowGroupReduceInvokable<Tuple2<String, Integer>, Tuple2<String, Integer>> invokable2 = new GroupedWindowGroupReduceInvokable<Tuple2<String, Integer>, Tuple2<String, Integer>>(
new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
private static final long serialVersionUID = 1L;
@Override
public void reduce(Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>("", 0);
for (@SuppressWarnings("unused") Tuple2<String, Integer> value : values) {
}
for (Tuple2<String, Integer> value : values) {
outTuple.f0 = value.f0;
outTuple.f1 += value.f1;
}
out.collect(outTuple);
}
}, 2, 3, 0, new TimeStamp<Tuple2<String, Integer>>() {
private static final long serialVersionUID = 1L;
@Override
public long getTimestamp(Tuple2<String, Integer> value) {
return value.f1;
}
@Override
public long getStartTime() {
return 1;
}
});
List<Tuple2<String, Integer>> actual2 = MockInvokable.createAndExecute(invokable2, inputs2);
assertEquals(new HashSet<Tuple2<String, Integer>>(expected2),
new HashSet<Tuple2<String, Integer>>(actual2));
assertEquals(expected2.size(), actual2.size());
}
}
...@@ -80,15 +80,15 @@ public class WindowGroupReduceInvokableTest { ...@@ -80,15 +80,15 @@ public class WindowGroupReduceInvokableTest {
List<Long> timestamps = Arrays.asList(101L, 102L, 103L, 104L, 105L, 106L, 107L, 108L, 109L, List<Long> timestamps = Arrays.asList(101L, 102L, 103L, 104L, 105L, 106L, 107L, 108L, 109L,
110L); 110L);
expectedResults.add(Arrays.asList("1", "2", "3", EOW, "3", "4", "5", EOW, "5", "6", "7", expectedResults.add(Arrays.asList("1", "2", "3", EOW, "3", "4", "5", EOW, "5", "6", "7",
EOW, "7", "8", "9", EOW, "8", "9", "10", EOW)); EOW, "7", "8", "9", EOW, "9", "10", EOW));
invokables.add(new WindowGroupReduceInvokable<Integer, String>(new MySlidingWindowReduce(), invokables.add(new WindowGroupReduceInvokable<Integer, String>(new MySlidingWindowReduce(),
windowSize, slideSize, new MyTimestamp(timestamps))); windowSize, slideSize, new MyTimestamp(timestamps)));
windowSize = 10; windowSize = 10;
slideSize = 5; slideSize = 5;
timestamps = Arrays.asList(101L, 103L, 121L, 122L, 123L, 124L, 180L, 181L, 185L, 190L); timestamps = Arrays.asList(101L, 103L, 121L, 122L, 123L, 124L, 180L, 181L, 185L, 190L);
expectedResults.add(Arrays.asList("1", "2", EOW, EOW, EOW, "3", "4", "5", "6", EOW, "3", expectedResults.add(Arrays.asList("1", "2", EOW, "3", "4", "5", "6", EOW, "3",
"4", "5", "6", EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, "7", EOW, "7", "4", "5", "6", EOW, "7", EOW, "7",
"8", "9", EOW, "8", "9", "10", EOW)); "8", "9", EOW, "8", "9", "10", EOW));
invokables.add(new WindowGroupReduceInvokable<Integer, String>(new MySlidingWindowReduce(), invokables.add(new WindowGroupReduceInvokable<Integer, String>(new MySlidingWindowReduce(),
windowSize, slideSize, new MyTimestamp(timestamps))); windowSize, slideSize, new MyTimestamp(timestamps)));
......
...@@ -44,9 +44,9 @@ public class WindowReduceInvokableTest { ...@@ -44,9 +44,9 @@ public class WindowReduceInvokableTest {
inputs.add(10); inputs.add(10);
inputs.add(11); inputs.add(11);
inputs.add(11); inputs.add(11);
//1,2,3,4-3,4,5,6-5,6,7,8-7,8,9,10-9,10,11 // 1,2,3,4-3,4,5,6-5,6,7,8-7,8,9,10-9,10,11
//12-12-5-10-32 // 12-12-5-10-32
List<Integer> expected = new ArrayList<Integer>(); List<Integer> expected = new ArrayList<Integer>();
expected.add(12); expected.add(12);
expected.add(12); expected.add(12);
...@@ -54,7 +54,6 @@ public class WindowReduceInvokableTest { ...@@ -54,7 +54,6 @@ public class WindowReduceInvokableTest {
expected.add(10); expected.add(10);
expected.add(32); expected.add(32);
WindowReduceInvokable<Integer> invokable = new WindowReduceInvokable<Integer>( WindowReduceInvokable<Integer> invokable = new WindowReduceInvokable<Integer>(
new ReduceFunction<Integer>() { new ReduceFunction<Integer>() {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
...@@ -77,7 +76,6 @@ public class WindowReduceInvokableTest { ...@@ -77,7 +76,6 @@ public class WindowReduceInvokableTest {
} }
}); });
assertEquals(expected, MockInvokable.createAndExecute(invokable, inputs)); assertEquals(expected, MockInvokable.createAndExecute(invokable, inputs));
List<Tuple2<String, Integer>> inputs2 = new ArrayList<Tuple2<String, Integer>>(); List<Tuple2<String, Integer>> inputs2 = new ArrayList<Tuple2<String, Integer>>();
...@@ -89,7 +87,7 @@ public class WindowReduceInvokableTest { ...@@ -89,7 +87,7 @@ public class WindowReduceInvokableTest {
inputs2.add(new Tuple2<String, Integer>("a", 7)); inputs2.add(new Tuple2<String, Integer>("a", 7));
inputs2.add(new Tuple2<String, Integer>("b", 9)); inputs2.add(new Tuple2<String, Integer>("b", 9));
inputs2.add(new Tuple2<String, Integer>("b", 10)); inputs2.add(new Tuple2<String, Integer>("b", 10));
List<Tuple2<String, Integer>> expected2 = new ArrayList<Tuple2<String, Integer>>(); List<Tuple2<String, Integer>> expected2 = new ArrayList<Tuple2<String, Integer>>();
expected2.add(new Tuple2<String, Integer>("a", 3)); expected2.add(new Tuple2<String, Integer>("a", 3));
expected2.add(new Tuple2<String, Integer>("b", 4)); expected2.add(new Tuple2<String, Integer>("b", 4));
...@@ -97,7 +95,6 @@ public class WindowReduceInvokableTest { ...@@ -97,7 +95,6 @@ public class WindowReduceInvokableTest {
expected2.add(new Tuple2<String, Integer>("a", 7)); expected2.add(new Tuple2<String, Integer>("a", 7));
expected2.add(new Tuple2<String, Integer>("b", 10)); expected2.add(new Tuple2<String, Integer>("b", 10));
GroupedWindowReduceInvokable<Tuple2<String, Integer>> invokable2 = new GroupedWindowReduceInvokable<Tuple2<String, Integer>>( GroupedWindowReduceInvokable<Tuple2<String, Integer>> invokable2 = new GroupedWindowReduceInvokable<Tuple2<String, Integer>>(
new ReduceFunction<Tuple2<String, Integer>>() { new ReduceFunction<Tuple2<String, Integer>>() {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
...@@ -121,7 +118,6 @@ public class WindowReduceInvokableTest { ...@@ -121,7 +118,6 @@ public class WindowReduceInvokableTest {
} }
}); });
List<Tuple2<String, Integer>> actual2 = MockInvokable.createAndExecute(invokable2, inputs2); List<Tuple2<String, Integer>> actual2 = MockInvokable.createAndExecute(invokable2, inputs2);
assertEquals(new HashSet<Tuple2<String, Integer>>(expected2), assertEquals(new HashSet<Tuple2<String, Integer>>(expected2),
new HashSet<Tuple2<String, Integer>>(actual2)); new HashSet<Tuple2<String, Integer>>(actual2));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册