提交 618effad 编写于 作者: S szape 提交者: mbalassi

[streaming] Added CoBatchReduceInvokable, CoWindowReduceInvokable,...

[streaming] Added CoBatchReduceInvokable, CoWindowReduceInvokable, CoGroupedBatchReduceInvokable, CoGroupedWindowReduceInvokable
上级 39ac5ab0
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator.co;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.commons.math.util.MathUtils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.state.NullableCircularBuffer;
/**
 * Co-invokable that reduces each of its two inputs independently over
 * count-based sliding batches.
 *
 * <p>
 * Every input has its own batch size and slide size. Records are pre-reduced
 * into "mini-batches" whose length (the granularity) is the greatest common
 * divisor of the batch size and the slide size; finished mini-batches are kept
 * in a circular buffer. Whenever a full batch has been collected, the buffered
 * mini-batches are reduced with the {@link CoReduceFunction}, the result is
 * mapped to the output type and emitted through the collector.
 *
 * @param <IN1>
 *            type of the records of the first input
 * @param <IN2>
 *            type of the records of the second input
 * @param <OUT>
 *            type of the emitted records
 */
public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
    private static final long serialVersionUID = 1L;

    /** User function providing the reduce and map steps for both inputs. */
    protected CoReduceFunction<IN1, IN2, OUT> coReducer;

    /** Object serializers, set in {@link #open(Configuration)}. */
    protected TypeSerializer<IN1> typeSerializer1;
    protected TypeSerializer<IN2> typeSerializer2;

    /** Number of records the batch advances by between two emissions. */
    protected long slideSize1;
    protected long slideSize2;

    /** Number of records that make up one full batch. */
    protected long batchSize1;
    protected long batchSize2;

    /** Mini-batch length: gcd(batchSize, slideSize). */
    protected int granularity1;
    protected int granularity2;

    /** Number of mini-batches in one slide: slideSize / granularity. */
    protected long batchPerSlide1;
    protected long batchPerSlide2;

    /** Number of mini-batches in one full batch: batchSize / granularity. */
    protected long numberOfBatches1;
    protected long numberOfBatches2;

    /** Batch state used for each input by the default getBatch methods. */
    protected StreamBatch<IN1> batch1;
    protected StreamBatch<IN2> batch2;

    /** Batch that the next callUserFunction1/2 invocation will reduce. */
    protected StreamBatch<IN1> currentBatch1;
    protected StreamBatch<IN2> currentBatch2;

    /**
     * Creates the invokable and derives the mini-batch bookkeeping values from
     * the given batch and slide sizes.
     *
     * @param coReducer
     *            user function used to reduce and map both inputs
     * @param batchSize1
     *            batch size of the first input
     * @param batchSize2
     *            batch size of the second input
     * @param slideSize1
     *            slide size of the first input
     * @param slideSize2
     *            slide size of the second input
     */
    public CoBatchReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, long batchSize1,
            long batchSize2, long slideSize1, long slideSize2) {
        super(coReducer);
        this.coReducer = coReducer;
        this.batchSize1 = batchSize1;
        this.batchSize2 = batchSize2;
        this.slideSize1 = slideSize1;
        this.slideSize2 = slideSize2;
        this.granularity1 = (int) MathUtils.gcd(batchSize1, slideSize1);
        this.granularity2 = (int) MathUtils.gcd(batchSize2, slideSize2);
        this.batchPerSlide1 = slideSize1 / granularity1;
        this.batchPerSlide2 = slideSize2 / granularity2;
        this.numberOfBatches1 = batchSize1 / granularity1;
        this.numberOfBatches2 = batchSize2 / granularity2;
        this.batch1 = new StreamBatch<IN1>(batchSize1, slideSize1);
        this.batch2 = new StreamBatch<IN2>(batchSize2, slideSize2);
    }

    /**
     * Reads records until the record iterator returns 0, dispatching each
     * record to the handler of the input it came from. When the iterator
     * returns 0 the remaining buffered data of both inputs is reduced.
     */
    @Override
    public void immutableInvoke() throws Exception {
        while (true) {
            int next = recordIterator.next(reuse1, reuse2);
            if (next == 0) {
                // 0 is treated as end of input: flush what is still buffered.
                reduceLastBatch1();
                reduceLastBatch2();
                break;
            } else if (next == 1) {
                handleStream1();
                resetReuse1();
            } else {
                handleStream2();
                resetReuse2();
            }
        }
    }

    /** Adds the current record of the first input to its batch. */
    @Override
    protected void handleStream1() throws Exception {
        StreamBatch<IN1> batch1 = getBatch1(reuse1);
        reduceToBuffer1(reuse1, batch1);
    }

    /** Adds the current record of the second input to its batch. */
    @Override
    protected void handleStream2() throws Exception {
        StreamBatch<IN2> batch2 = getBatch2(reuse2);
        reduceToBuffer2(reuse2, batch2);
    }

    /**
     * Returns the batch the given first-input record belongs to. This
     * implementation ignores the record and always returns the single shared
     * batch.
     */
    protected StreamBatch<IN1> getBatch1(StreamRecord<IN1> next) {
        return batch1;
    }

    /**
     * Returns the batch the given second-input record belongs to. This
     * implementation ignores the record and always returns the single shared
     * batch.
     */
    protected StreamBatch<IN2> getBatch2(StreamRecord<IN2> next) {
        return batch2;
    }

    @Override
    // TODO: implement mutableInvoke for reduce
    protected void mutableInvoke() throws Exception {
        // No mutable variant exists yet, so fall back to the immutable one.
        System.out.println("Immutable setting is used");
        immutableInvoke();
    }

    /** Marks the batch as current and runs the user function for input 1. */
    protected void reduce1(StreamBatch<IN1> batch) {
        this.currentBatch1 = batch;
        callUserFunctionAndLogException1();
    }

    /** Marks the batch as current and runs the user function for input 2. */
    protected void reduce2(StreamBatch<IN2> batch) {
        this.currentBatch2 = batch;
        callUserFunctionAndLogException2();
    }

    /** Reduces whatever is left in the first input's batch at end of input. */
    protected void reduceLastBatch1() throws Exception {
        reduceLastBatch1(batch1);
    }

    /** Reduces whatever is left in the second input's batch at end of input. */
    protected void reduceLastBatch2() throws Exception {
        reduceLastBatch2(batch2);
    }

    /**
     * Reduces the non-null mini-batch values of the current first-input batch
     * with {@code reduce1} and emits {@code map1} of the result. Nothing is
     * emitted when the buffer holds no non-null value.
     */
    @Override
    protected void callUserFunction1() throws Exception {
        Iterator<IN1> reducedIterator = currentBatch1.getIterator();
        IN1 reduced = null;

        // Skip leading null entries to find the first value to reduce into.
        while (reducedIterator.hasNext() && reduced == null) {
            reduced = reducedIterator.next();
        }

        while (reducedIterator.hasNext()) {
            IN1 next = reducedIterator.next();
            if (next != null) {
                reduced = coReducer.reduce1(reduced, next);
            }
        }
        if (reduced != null) {
            collector.collect(coReducer.map1(reduced));
        }
    }

    /**
     * Reduces the non-null mini-batch values of the current second-input batch
     * with {@code reduce2} and emits {@code map2} of the result. Nothing is
     * emitted when the buffer holds no non-null value.
     */
    @Override
    protected void callUserFunction2() throws Exception {
        Iterator<IN2> reducedIterator = currentBatch2.getIterator();
        IN2 reduced = null;

        // Skip leading null entries to find the first value to reduce into.
        while (reducedIterator.hasNext() && reduced == null) {
            reduced = reducedIterator.next();
        }

        while (reducedIterator.hasNext()) {
            IN2 next = reducedIterator.next();
            if (next != null) {
                reduced = coReducer.reduce2(reduced, next);
            }
        }
        if (reduced != null) {
            collector.collect(coReducer.map2(reduced));
        }
    }

    /** Opens the parent invokable and caches the object serializers. */
    @Override
    public void open(Configuration config) throws Exception {
        super.open(config);
        this.typeSerializer1 = serializer1.getObjectSerializer();
        this.typeSerializer2 = serializer2.getObjectSerializer();
    }

    /**
     * Folds a first-input record into the running mini-batch value of the
     * given batch, closes the mini-batch when it is complete and triggers a
     * reduce when a full batch has been collected.
     *
     * @param next
     *            record to add
     * @param streamBatch
     *            batch state to update
     */
    public void reduceToBuffer1(StreamRecord<IN1> next, StreamBatch<IN1> streamBatch)
            throws Exception {
        IN1 nextValue = next.getObject();
        if (streamBatch.currentValue != null) {
            streamBatch.currentValue = coReducer.reduce1(streamBatch.currentValue, nextValue);
        } else {
            streamBatch.currentValue = nextValue;
        }
        streamBatch.counter++;
        if (streamBatch.miniBatchEnd()) {
            streamBatch.addToBuffer();
            if (streamBatch.batchEnd()) {
                reduceBatch1(streamBatch);
            }
        }
    }

    /**
     * Folds a second-input record into the running mini-batch value of the
     * given batch, closes the mini-batch when it is complete and triggers a
     * reduce when a full batch has been collected.
     *
     * @param next
     *            record to add
     * @param streamBatch
     *            batch state to update
     */
    public void reduceToBuffer2(StreamRecord<IN2> next, StreamBatch<IN2> streamBatch)
            throws Exception {
        IN2 nextValue = next.getObject();
        if (streamBatch.currentValue != null) {
            streamBatch.currentValue = coReducer.reduce2(streamBatch.currentValue, nextValue);
        } else {
            streamBatch.currentValue = nextValue;
        }
        streamBatch.counter++;
        if (streamBatch.miniBatchEnd()) {
            streamBatch.addToBuffer();
            if (streamBatch.batchEnd()) {
                reduceBatch2(streamBatch);
            }
        }
    }

    /**
     * Flushes the given first-input batch at end of input: an unfinished
     * mini-batch is pushed to the buffer, up to
     * {@code numberOfBatches1 - minibatchCounter} of the oldest buffered
     * entries are dropped and the remaining entries, if any, are reduced.
     *
     * @param streamBatch
     *            batch state to flush
     */
    public void reduceLastBatch1(StreamBatch<IN1> streamBatch) throws Exception {
        if (streamBatch.miniBatchInProgress()) {
            streamBatch.addToBuffer();
        }
        if (streamBatch.minibatchCounter >= 0) {
            long toDrop = numberOfBatches1 - streamBatch.minibatchCounter;
            // The buffer can hold fewer entries than toDrop (for example when
            // this input never delivered a record), so stop once it is empty
            // instead of calling remove() on an empty buffer.
            for (long i = 0; i < toDrop && !streamBatch.circularBuffer.isEmpty(); i++) {
                streamBatch.circularBuffer.remove();
            }
            if (!streamBatch.circularBuffer.isEmpty()) {
                reduce1(streamBatch);
            }
        }
    }

    /**
     * Flushes the given second-input batch at end of input: an unfinished
     * mini-batch is pushed to the buffer, up to
     * {@code numberOfBatches2 - minibatchCounter} of the oldest buffered
     * entries are dropped and the remaining entries, if any, are reduced.
     *
     * @param streamBatch
     *            batch state to flush
     */
    public void reduceLastBatch2(StreamBatch<IN2> streamBatch) throws Exception {
        if (streamBatch.miniBatchInProgress()) {
            streamBatch.addToBuffer();
        }
        if (streamBatch.minibatchCounter >= 0) {
            long toDrop = numberOfBatches2 - streamBatch.minibatchCounter;
            // The buffer can hold fewer entries than toDrop (for example when
            // this input never delivered a record), so stop once it is empty
            // instead of calling remove() on an empty buffer.
            for (long i = 0; i < toDrop && !streamBatch.circularBuffer.isEmpty(); i++) {
                streamBatch.circularBuffer.remove();
            }
            if (!streamBatch.circularBuffer.isEmpty()) {
                reduce2(streamBatch);
            }
        }
    }

    /** Reduces a completed first-input batch. */
    public void reduceBatch1(StreamBatch<IN1> streamBatch) {
        reduce1(streamBatch);
    }

    /** Reduces a completed second-input batch. */
    public void reduceBatch2(StreamBatch<IN2> streamBatch) {
        reduce2(streamBatch);
    }

    /**
     * Bookkeeping for one sliding batch: the running value of the mini-batch
     * in progress, the record and mini-batch counters and the circular buffer
     * holding the finished mini-batch values.
     *
     * @param <IN>
     *            type of the batched records
     */
    protected class StreamBatch<IN> implements Serializable {
        private static final long serialVersionUID = 1L;

        /** Records counted towards the current batch. */
        protected long counter;
        /** Mini-batches counted towards the current batch. */
        protected long minibatchCounter;
        /** Pre-reduced value of the mini-batch in progress, or null. */
        protected IN currentValue;
        protected long batchSize;
        protected long slideSize;
        protected long granularity;
        protected long batchPerSlide;
        protected long numberOfBatches;
        /** Finished mini-batch values; capacity is batchSize / granularity. */
        protected NullableCircularBuffer circularBuffer;

        /**
         * @param batchSize
         *            number of records in a full batch
         * @param slideSize
         *            number of records the batch slides by
         */
        public StreamBatch(long batchSize, long slideSize) {
            this.batchSize = batchSize;
            this.slideSize = slideSize;
            this.granularity = (int) MathUtils.gcd(batchSize, slideSize);
            this.batchPerSlide = slideSize / granularity;
            this.circularBuffer = new NullableCircularBuffer((int) (batchSize / granularity));
            this.counter = 0;
            this.minibatchCounter = 0;
            this.currentValue = null;
            this.numberOfBatches = batchSize / granularity;
        }

        /** Stores the running value as a finished mini-batch and resets it. */
        protected void addToBuffer() {
            circularBuffer.add(currentValue);
            minibatchCounter++;
            currentValue = null;
        }

        /** @return true when the record counter is a multiple of the granularity */
        protected boolean miniBatchEnd() {
            return (counter % granularity) == 0;
        }

        /**
         * Checks whether a full batch has been collected. If so, both counters
         * are moved back by one slide so that counting continues for the next,
         * overlapping batch.
         *
         * @return true when the record counter has reached the batch size
         */
        public boolean batchEnd() {
            if (counter == batchSize) {
                counter -= slideSize;
                minibatchCounter -= batchPerSlide;
                return true;
            }
            return false;
        }

        /** @return true when a running mini-batch value is present */
        public boolean miniBatchInProgress() {
            return currentValue != null;
        }

        /** @return iterator over the buffered mini-batch values */
        @SuppressWarnings("unchecked")
        public Iterator<IN> getIterator() {
            return circularBuffer.iterator();
        }
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator.co;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
/**
 * Grouped variant of {@link CoBatchReduceInvokable}: every key of each input
 * gets its own sliding batch, created the first time the key is seen.
 *
 * @param <IN1>
 *            type of the records of the first input
 * @param <IN2>
 *            type of the records of the second input
 * @param <OUT>
 *            type of the emitted records
 */
public class CoGroupedBatchReduceInvokable<IN1, IN2, OUT> extends
        CoBatchReduceInvokable<IN1, IN2, OUT> {
    private static final long serialVersionUID = 1L;

    /** Field positions the keys are read from, one per input. */
    int keyPosition1;
    int keyPosition2;

    /** Per-key batch state, one map per input. */
    Map<Object, StreamBatch<IN1>> streamBatches1;
    Map<Object, StreamBatch<IN2>> streamBatches2;

    /**
     * @param coReducer
     *            user function used to reduce and map both inputs
     * @param batchSize1
     *            batch size of the first input
     * @param batchSize2
     *            batch size of the second input
     * @param slideSize1
     *            slide size of the first input
     * @param slideSize2
     *            slide size of the second input
     * @param keyPosition1
     *            key field position in first-input records
     * @param keyPosition2
     *            key field position in second-input records
     */
    public CoGroupedBatchReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer,
            long batchSize1, long batchSize2, long slideSize1, long slideSize2, int keyPosition1,
            int keyPosition2) {
        super(coReducer, batchSize1, batchSize2, slideSize1, slideSize2);
        this.streamBatches1 = new HashMap<Object, StreamBatch<IN1>>();
        this.streamBatches2 = new HashMap<Object, StreamBatch<IN2>>();
        this.keyPosition1 = keyPosition1;
        this.keyPosition2 = keyPosition2;
    }

    /** Flushes the batch of every key seen on the first input. */
    protected void reduceLastBatch1() throws Exception {
        for (Map.Entry<Object, StreamBatch<IN1>> perKey : streamBatches1.entrySet()) {
            reduceLastBatch1(perKey.getValue());
        }
    }

    /** Flushes the batch of every key seen on the second input. */
    protected void reduceLastBatch2() throws Exception {
        for (Map.Entry<Object, StreamBatch<IN2>> perKey : streamBatches2.entrySet()) {
            reduceLastBatch2(perKey.getValue());
        }
    }

    /**
     * Looks up the batch registered for the record's key on the first input,
     * registering a new one if the key has not been seen before.
     */
    @Override
    protected StreamBatch<IN1> getBatch1(StreamRecord<IN1> next) {
        final Object groupKey = next.getField(keyPosition1);
        final StreamBatch<IN1> known = streamBatches1.get(groupKey);
        if (known != null) {
            return known;
        }
        final StreamBatch<IN1> created = new StreamBatch<IN1>(batchSize1, slideSize1);
        streamBatches1.put(groupKey, created);
        return created;
    }

    /**
     * Looks up the batch registered for the record's key on the second input,
     * registering a new one if the key has not been seen before.
     */
    @Override
    protected StreamBatch<IN2> getBatch2(StreamRecord<IN2> next) {
        final Object groupKey = next.getField(keyPosition2);
        final StreamBatch<IN2> known = streamBatches2.get(groupKey);
        if (known != null) {
            return known;
        }
        final StreamBatch<IN2> created = new StreamBatch<IN2>(batchSize2, slideSize2);
        streamBatches2.put(groupKey, created);
        return created;
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator.co;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
/**
 * Grouped variant of {@link CoWindowReduceInvokable}: within each time-based
 * mini-batch the records of an input are pre-reduced per key, and at the end
 * of a window the per-key values of all buffered mini-batches are merged and
 * emitted.
 *
 * @param <IN1>
 *            type of the records of the first input
 * @param <IN2>
 *            type of the records of the second input
 * @param <OUT>
 *            type of the emitted records
 */
public class CoGroupedWindowReduceInvokable<IN1, IN2, OUT> extends
        CoWindowReduceInvokable<IN1, IN2, OUT> {
    private static final long serialVersionUID = 1L;

    /** Field positions the keys are read from, one per input. */
    private int keyPosition1;
    private int keyPosition2;

    /**
     * @param coReducer
     *            user function used to reduce and map both inputs
     * @param windowSize1
     *            window size of the first input
     * @param windowSize2
     *            window size of the second input
     * @param slideInterval1
     *            slide interval of the first input
     * @param slideInterval2
     *            slide interval of the second input
     * @param keyPosition1
     *            key field position in first-input records
     * @param keyPosition2
     *            key field position in second-input records
     * @param timestamp1
     *            timestamp extractor for the first input
     * @param timestamp2
     *            timestamp extractor for the second input
     */
    public CoGroupedWindowReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer,
            long windowSize1, long windowSize2, long slideInterval1, long slideInterval2,
            int keyPosition1, int keyPosition2, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
        super(coReducer, windowSize1, windowSize2, slideInterval1, slideInterval2, timestamp1,
                timestamp2);
        this.keyPosition1 = keyPosition1;
        // Previously never assigned, which left the second input keyed on
        // field 0 regardless of the argument.
        this.keyPosition2 = keyPosition2;
        // Replace the plain windows created by the parent with grouped ones.
        this.batch1 = new GroupedStreamWindow<IN1>(windowSize1, slideInterval1);
        this.batch2 = new GroupedStreamWindow<IN2>(windowSize2, slideInterval2);
    }

    /**
     * Merges the per-key maps buffered for the first input with
     * {@code reduce1} and emits {@code map1} of every resulting value. The
     * merge is done in place on the first map returned by the buffer iterator.
     */
    @Override
    protected void callUserFunction1() throws Exception {
        @SuppressWarnings("unchecked")
        Iterator<Map<Object, IN1>> reducedIterator = (Iterator<Map<Object, IN1>>) batch1
                .getIterator();
        Map<Object, IN1> reducedValues = reducedIterator.next();

        while (reducedIterator.hasNext()) {
            Map<Object, IN1> nextValues = reducedIterator.next();
            for (Entry<Object, IN1> entry : nextValues.entrySet()) {
                IN1 currentValue = reducedValues.get(entry.getKey());
                if (currentValue == null) {
                    reducedValues.put(entry.getKey(), entry.getValue());
                } else {
                    reducedValues.put(entry.getKey(),
                            coReducer.reduce1(currentValue, entry.getValue()));
                }
            }
        }
        for (IN1 value : reducedValues.values()) {
            collector.collect(coReducer.map1(value));
        }
    }

    /**
     * Merges the per-key maps buffered for the second input with
     * {@code reduce2} and emits {@code map2} of every resulting value. The
     * merge is done in place on the first map returned by the buffer iterator.
     */
    @Override
    protected void callUserFunction2() throws Exception {
        @SuppressWarnings("unchecked")
        Iterator<Map<Object, IN2>> reducedIterator = (Iterator<Map<Object, IN2>>) batch2
                .getIterator();
        Map<Object, IN2> reducedValues = reducedIterator.next();

        while (reducedIterator.hasNext()) {
            Map<Object, IN2> nextValues = reducedIterator.next();
            for (Entry<Object, IN2> entry : nextValues.entrySet()) {
                IN2 currentValue = reducedValues.get(entry.getKey());
                if (currentValue == null) {
                    reducedValues.put(entry.getKey(), entry.getValue());
                } else {
                    reducedValues.put(entry.getKey(),
                            coReducer.reduce2(currentValue, entry.getValue()));
                }
            }
        }
        for (IN2 value : reducedValues.values()) {
            collector.collect(coReducer.map2(value));
        }
    }

    /**
     * Closes any mini-batches that ended before the record's timestamp, then
     * folds the first-input record into the running value of its key.
     *
     * @param next
     *            record to add
     * @param streamBatch
     *            window state to update; cast to {@link GroupedStreamWindow}
     */
    @Override
    public void reduceToBuffer1(StreamRecord<IN1> next, StreamBatch<IN1> streamBatch)
            throws Exception {
        IN1 nextValue = next.getObject();
        Object key = next.getField(keyPosition1);
        checkBatchEnd1(timestamp1.getTimestamp(nextValue), streamBatch);

        IN1 currentValue = ((GroupedStreamWindow<IN1>) streamBatch).currentValues.get(key);
        if (currentValue != null) {
            ((GroupedStreamWindow<IN1>) streamBatch).currentValues.put(key,
                    coReducer.reduce1(currentValue, nextValue));
        } else {
            ((GroupedStreamWindow<IN1>) streamBatch).currentValues.put(key, nextValue);
        }
    }

    /**
     * Closes any mini-batches that ended before the record's timestamp, then
     * folds the second-input record into the running value of its key.
     *
     * @param next
     *            record to add
     * @param streamBatch
     *            window state to update; cast to {@link GroupedStreamWindow}
     */
    @Override
    public void reduceToBuffer2(StreamRecord<IN2> next, StreamBatch<IN2> streamBatch)
            throws Exception {
        IN2 nextValue = next.getObject();
        Object key = next.getField(keyPosition2);
        checkBatchEnd2(timestamp2.getTimestamp(nextValue), streamBatch);

        IN2 currentValue = ((GroupedStreamWindow<IN2>) streamBatch).currentValues.get(key);
        if (currentValue != null) {
            ((GroupedStreamWindow<IN2>) streamBatch).currentValues.put(key,
                    coReducer.reduce2(currentValue, nextValue));
        } else {
            ((GroupedStreamWindow<IN2>) streamBatch).currentValues.put(key, nextValue);
        }
    }

    /**
     * Window state whose mini-batch in progress is a map from key to
     * pre-reduced value instead of a single value.
     *
     * @param <IN>
     *            type of the windowed records
     */
    protected class GroupedStreamWindow<IN> extends StreamWindow<IN> {
        private static final long serialVersionUID = 1L;

        /** Running per-key values of the mini-batch in progress. */
        private Map<Object, IN> currentValues;

        public GroupedStreamWindow(long windowSize, long slideInterval) {
            super(windowSize, slideInterval);
            this.currentValues = new HashMap<Object, IN>();
        }

        /** @return true when at least one key has a running value */
        @Override
        public boolean miniBatchInProgress() {
            return !currentValues.isEmpty();
        }

        /**
         * Stores the running per-key map as a finished mini-batch. When the
         * buffer is full its oldest map is removed, cleared and reused as the
         * next running map; otherwise a new map is allocated.
         */
        @SuppressWarnings("unchecked")
        @Override
        protected void addToBuffer() {
            Map<Object, IN> reuseMap;
            if (circularBuffer.isFull()) {
                reuseMap = (Map<Object, IN>) circularBuffer.remove();
                reuseMap.clear();
            } else {
                reuseMap = new HashMap<Object, IN>(currentValues.size());
            }
            circularBuffer.add(currentValues);
            minibatchCounter++;
            currentValues = reuseMap;
        }
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator.co;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
/**
 * Time-based variant of {@link CoBatchReduceInvokable}: mini-batches and
 * windows are delimited by record timestamps instead of record counts.
 *
 * <p>
 * For an input whose timestamp extractor is a {@link DefaultTimeStamp}, a
 * helper thread is started in {@link #open(Configuration)} that periodically
 * advances the window using {@code System.currentTimeMillis()}.
 *
 * @param <IN1>
 *            type of the records of the first input
 * @param <IN2>
 *            type of the records of the second input
 * @param <OUT>
 *            type of the emitted records
 */
public class CoWindowReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokable<IN1, IN2, OUT> {
    private static final long serialVersionUID = 1L;

    /** Start time of the mini-batch currently being filled, per input. */
    protected long startTime1;
    protected long startTime2;

    /** Timestamp most recently passed to checkBatchEnd1/2. */
    protected long nextRecordTime1;
    protected long nextRecordTime2;

    /** Timestamp extractors, one per input. */
    protected TimeStamp<IN1> timestamp1;
    protected TimeStamp<IN2> timestamp2;

    /**
     * @param coReducer
     *            user function used to reduce and map both inputs
     * @param windowSize1
     *            window size of the first input
     * @param windowSize2
     *            window size of the second input
     * @param slideInterval1
     *            slide interval of the first input
     * @param slideInterval2
     *            slide interval of the second input
     * @param timestamp1
     *            timestamp extractor for the first input
     * @param timestamp2
     *            timestamp extractor for the second input
     */
    public CoWindowReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1,
            long windowSize2, long slideInterval1, long slideInterval2, TimeStamp<IN1> timestamp1,
            TimeStamp<IN2> timestamp2) {
        super(coReducer, windowSize1, windowSize2, slideInterval1, slideInterval2);
        this.timestamp1 = timestamp1;
        this.timestamp2 = timestamp2;
        this.startTime1 = timestamp1.getStartTime();
        this.startTime2 = timestamp2.getStartTime();
        // Replace the count-based batches created by the parent with windows.
        this.batch1 = new StreamWindow<IN1>(windowSize1, slideInterval1);
        this.batch2 = new StreamWindow<IN2>(windowSize2, slideInterval2);
    }

    /**
     * Closes any mini-batches that ended before the record's timestamp, then
     * folds the first-input record into the running mini-batch value.
     *
     * @param next
     *            record to add
     * @param streamWindow
     *            window state to update
     */
    @Override
    public void reduceToBuffer1(StreamRecord<IN1> next, StreamBatch<IN1> streamWindow)
            throws Exception {
        IN1 nextValue = next.getObject();
        checkBatchEnd1(timestamp1.getTimestamp(nextValue), streamWindow);

        if (streamWindow.currentValue != null) {
            streamWindow.currentValue = coReducer.reduce1(streamWindow.currentValue, nextValue);
        } else {
            streamWindow.currentValue = nextValue;
        }
    }

    /**
     * Closes any mini-batches that ended before the record's timestamp, then
     * folds the second-input record into the running mini-batch value.
     *
     * @param next
     *            record to add
     * @param streamWindow
     *            window state to update
     */
    @Override
    public void reduceToBuffer2(StreamRecord<IN2> next, StreamBatch<IN2> streamWindow)
            throws Exception {
        IN2 nextValue = next.getObject();
        checkBatchEnd2(timestamp2.getTimestamp(nextValue), streamWindow);

        if (streamWindow.currentValue != null) {
            streamWindow.currentValue = coReducer.reduce2(streamWindow.currentValue, nextValue);
        } else {
            streamWindow.currentValue = nextValue;
        }
    }

    /**
     * Advances the first input's window up to the given timestamp: for every
     * mini-batch interval that has elapsed, the running value is pushed to the
     * buffer and, when a full window is complete, the window is reduced.
     *
     * @param timeStamp
     *            timestamp to advance to
     * @param streamWindow
     *            window state to update; must be a {@link StreamWindow}
     */
    protected synchronized void checkBatchEnd1(long timeStamp, StreamBatch<IN1> streamWindow) {
        nextRecordTime1 = timeStamp;

        while (miniBatchEnd1()) {
            ((StreamWindow<IN1>) streamWindow).addToBuffer();
            if (((StreamWindow<IN1>) streamWindow).batchEnd()) {
                reduceBatch1((StreamWindow<IN1>) streamWindow);
            }
        }
    }

    /**
     * Advances the second input's window up to the given timestamp: for every
     * mini-batch interval that has elapsed, the running value is pushed to the
     * buffer and, when a full window is complete, the window is reduced.
     *
     * @param timeStamp
     *            timestamp to advance to
     * @param streamWindow
     *            window state to update; must be a {@link StreamWindow}
     */
    protected synchronized void checkBatchEnd2(long timeStamp, StreamBatch<IN2> streamWindow) {
        nextRecordTime2 = timeStamp;

        while (miniBatchEnd2()) {
            (streamWindow).addToBuffer();
            if (((StreamWindow<IN2>) streamWindow).batchEnd()) {
                reduceBatch2(streamWindow);
            }
        }
    }

    /**
     * Tests whether the latest first-input timestamp lies beyond the current
     * mini-batch; if it does, the mini-batch start is moved forward by one
     * granularity step.
     *
     * @return true when a mini-batch boundary was crossed
     */
    protected boolean miniBatchEnd1() {
        if (nextRecordTime1 < startTime1 + granularity1) {
            return false;
        } else {
            startTime1 += granularity1;
            return true;
        }
    }

    /**
     * Tests whether the latest second-input timestamp lies beyond the current
     * mini-batch; if it does, the mini-batch start is moved forward by one
     * granularity step.
     *
     * @return true when a mini-batch boundary was crossed
     */
    protected boolean miniBatchEnd2() {
        if (nextRecordTime2 < startTime2 + granularity2) {
            return false;
        } else {
            startTime2 += granularity2;
            return true;
        }
    }

    /**
     * Batch state whose end is detected by the number of finished mini-batches
     * rather than by the number of records.
     *
     * @param <IN>
     *            type of the windowed records
     */
    protected class StreamWindow<IN> extends StreamBatch<IN> {
        private static final long serialVersionUID = 1L;

        public StreamWindow(long windowSize, long slideInterval) {
            super(windowSize, slideInterval);
        }

        /**
         * @return true when the mini-batch counter has reached the number of
         *         mini-batches per window; the counter is then moved back by
         *         one slide
         */
        @Override
        public boolean batchEnd() {
            if (minibatchCounter == numberOfBatches) {
                minibatchCounter -= batchPerSlide;
                return true;
            }
            return false;
        }
    }

    /**
     * Opens the parent invokable and starts a wall-clock timer thread for each
     * input that uses a {@link DefaultTimeStamp}.
     */
    @Override
    public void open(Configuration config) throws Exception {
        super.open(config);
        if (timestamp1 instanceof DefaultTimeStamp) {
            (new TimeCheck1()).start();
        }
        if (timestamp2 instanceof DefaultTimeStamp) {
            (new TimeCheck2()).start();
        }
    }

    /**
     * Timer thread for the first input: every {@code slideSize1} milliseconds
     * it advances the window to the current system time, for as long as
     * {@code isRunning} is true.
     */
    private class TimeCheck1 extends Thread {
        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(slideSize1);
                } catch (InterruptedException e) {
                    // Treat an interrupt as a request to stop: keep the flag
                    // visible to callers and end the timer thread.
                    Thread.currentThread().interrupt();
                    break;
                }
                if (isRunning) {
                    checkBatchEnd1(System.currentTimeMillis(), (StreamWindow<IN1>) batch1);
                } else {
                    break;
                }
            }
        }
    }

    /**
     * Timer thread for the second input: every {@code slideSize2} milliseconds
     * it advances the window to the current system time, for as long as
     * {@code isRunning} is true.
     */
    private class TimeCheck2 extends Thread {
        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(slideSize2);
                } catch (InterruptedException e) {
                    // Treat an interrupt as a request to stop: keep the flag
                    // visible to callers and end the timer thread.
                    Thread.currentThread().interrupt();
                    break;
                }
                if (isRunning) {
                    checkBatchEnd2(System.currentTimeMillis(), (StreamWindow<IN2>) batch2);
                } else {
                    break;
                }
            }
        }
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoBatchReduceInvokable;
import org.apache.flink.streaming.util.MockCoInvokable;
import org.junit.Test;
/**
 * Runs {@link CoBatchReduceInvokable} with batch size 3 and slide size 2 on
 * two integer inputs and compares the sorted output with the expected sums.
 */
public class CoBatchReduceTest {

    /** Adds integers on both inputs and renders each result as a string. */
    private static class MyCoReduceFunction implements CoReduceFunction<Integer, Integer, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer reduce1(Integer value1, Integer value2) {
            int sum = value1 + value2;
            return sum;
        }

        @Override
        public Integer reduce2(Integer value1, Integer value2) {
            int sum = value1 + value2;
            return sum;
        }

        @Override
        public String map1(Integer value) {
            return value.toString();
        }

        @Override
        public String map2(Integer value) {
            return value.toString();
        }
    }

    @Test
    public void coBatchReduceTest() {
        // First input: the integers 1..10.
        List<Integer> firstInput = new ArrayList<Integer>();
        int number = 1;
        while (number <= 10) {
            firstInput.add(number);
            number++;
        }

        List<Integer> secondInput = new ArrayList<Integer>();
        for (int value : new int[] { 1, 2, -1, -3, -4 }) {
            secondInput.add(value);
        }

        List<String> expected = new ArrayList<String>();
        for (String sum : new String[] { "6", "12", "18", "24", "19", "2", "-8", "-4" }) {
            expected.add(sum);
        }

        CoBatchReduceInvokable<Integer, Integer, String> invokable = new CoBatchReduceInvokable<Integer, Integer, String>(
                new MyCoReduceFunction(), 3L, 3L, 2L, 2L);

        List<String> result = MockCoInvokable.createAndExecute(invokable, firstInput, secondInput);

        // Output order across the two inputs is not fixed, so compare sorted.
        Collections.sort(expected);
        Collections.sort(result);
        assertEquals(expected, result);
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchReduceInvokable;
import org.apache.flink.streaming.util.MockCoInvokable;
import org.junit.Test;
/**
 * Runs {@link CoGroupedBatchReduceInvokable} with batch size 3 and slide size
 * 2, keyed on field 0, on two identical keyed inputs and compares the sorted
 * output with the expected sums.
 */
public class CoGroupedBatchReduceTest {

    /** Adds the integer fields on both inputs and renders them as strings. */
    private static class MyCoReduceFunction implements
            CoReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<String, Integer> reduce1(Tuple2<String, Integer> value1,
                Tuple2<String, Integer> value2) {
            return new Tuple2<String, Integer>("a", value1.f1 + value2.f1);
        }

        @Override
        public Tuple2<String, Integer> reduce2(Tuple2<String, Integer> value1,
                Tuple2<String, Integer> value2) {
            return new Tuple2<String, Integer>("a", value1.f1 + value2.f1);
        }

        @Override
        public String map1(Tuple2<String, Integer> value) {
            return value.f1.toString();
        }

        @Override
        public String map2(Tuple2<String, Integer> value) {
            return value.f1.toString();
        }
    }

    /** Builds a fresh copy of the keyed input used for both streams. */
    private static List<Tuple2<String, Integer>> keyedInput() {
        String[] keys = { "a", "a", "b", "b", "b", "a", "b", "b" };
        int[] values = { 1, 2, 2, 2, 5, 7, 9, 10 };
        List<Tuple2<String, Integer>> records = new ArrayList<Tuple2<String, Integer>>();
        for (int i = 0; i < keys.length; i++) {
            records.add(new Tuple2<String, Integer>(keys[i], values[i]));
        }
        return records;
    }

    @Test
    public void coGroupedBatchReduceTest() {
        List<Tuple2<String, Integer>> inputs1 = keyedInput();
        List<Tuple2<String, Integer>> inputs2 = keyedInput();

        // Both inputs carry the same data, so each expected value appears twice.
        String[] perInput = { "10", "7", "9", "24", "10" };
        List<String> expected = new ArrayList<String>();
        for (int input = 0; input < 2; input++) {
            for (String sum : perInput) {
                expected.add(sum);
            }
        }

        CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, Integer>, String> invokable = new CoGroupedBatchReduceInvokable<Tuple2<String, Integer>, Tuple2<String, Integer>, String>(
                new MyCoReduceFunction(), 3L, 3L, 2L, 2L, 0, 0);

        List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);

        // Output order is not fixed, so compare sorted.
        Collections.sort(expected);
        Collections.sort(result);
        assertEquals(expected, result);
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.MockCoInvokable;
import org.junit.Test;
/**
 * Runs {@link CoGroupedWindowReduceInvokable} with window size 3 and slide
 * interval 2, keyed on field 0, on two identical keyed inputs with scripted
 * timestamps and compares the sorted output with the expected sums.
 */
public class CoGroupedWindowReduceTest {

    /** Adds the integer fields on both inputs and renders them as strings. */
    private static class MyCoReduceFunction implements
            CoReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<String, Integer> reduce1(Tuple2<String, Integer> value1,
                Tuple2<String, Integer> value2) {
            return new Tuple2<String, Integer>("a", value1.f1 + value2.f1);
        }

        @Override
        public Tuple2<String, Integer> reduce2(Tuple2<String, Integer> value1,
                Tuple2<String, Integer> value2) {
            return new Tuple2<String, Integer>("a", value1.f1 + value2.f1);
        }

        @Override
        public String map1(Tuple2<String, Integer> value) {
            return value.f1.toString();
        }

        @Override
        public String map2(Tuple2<String, Integer> value) {
            return value.f1.toString();
        }
    }

    /**
     * Timestamp source that ignores the record and hands out the scripted
     * timestamps one after another; the first one is also the start time.
     */
    public static final class MyTimeStamp<T> implements TimeStamp<T> {
        private static final long serialVersionUID = 1L;

        private Iterator<Long> timestamps;
        private long start;

        public MyTimeStamp(List<Long> timestamps) {
            this.start = timestamps.get(0);
            this.timestamps = timestamps.iterator();
        }

        @Override
        public long getTimestamp(T value) {
            return timestamps.next();
        }

        @Override
        public long getStartTime() {
            return start;
        }
    }

    List<Long> timestamps = Arrays.asList(0L, 1L, 1L, 2L, 2L, 8L, 8L, 10L);

    /** Builds a fresh copy of the keyed input used for both streams. */
    private static List<Tuple2<String, Integer>> keyedInput() {
        String[] keys = { "a", "a", "b", "b", "b", "a", "b", "b" };
        int[] values = { 1, 2, 2, 2, 5, 7, 9, 10 };
        List<Tuple2<String, Integer>> records = new ArrayList<Tuple2<String, Integer>>();
        for (int i = 0; i < keys.length; i++) {
            records.add(new Tuple2<String, Integer>(keys[i], values[i]));
        }
        return records;
    }

    @Test
    public void coGroupedWindowReduceTest() {
        List<Tuple2<String, Integer>> inputs1 = keyedInput();
        List<Tuple2<String, Integer>> inputs2 = keyedInput();

        // Both inputs carry the same data, so each expected value appears twice.
        List<String> perInput = Arrays.asList("3", "9", "7", "7", "9", "7", "19");
        List<String> expected = new ArrayList<String>();
        expected.addAll(perInput);
        expected.addAll(perInput);

        CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, Integer>, String> invokable = new CoGroupedWindowReduceInvokable<Tuple2<String, Integer>, Tuple2<String, Integer>, String>(
                new MyCoReduceFunction(), 3L, 3L, 2L, 2L, 0, 0,
                new MyTimeStamp<Tuple2<String, Integer>>(timestamps),
                new MyTimeStamp<Tuple2<String, Integer>>(timestamps));

        List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);

        // Output order is not fixed, so compare sorted.
        Collections.sort(expected);
        Collections.sort(result);
        assertEquals(expected, result);
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable.operator;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowReduceInvokable;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.MockCoInvokable;
import org.junit.Test;
/**
 * Runs {@link CoWindowReduceInvokable} with window size 3 and slide interval 2
 * on two integer inputs with scripted timestamps and compares the sorted
 * output with the expected sums.
 */
public class CoWindowReduceTest {

    /** Adds integers on both inputs and renders each result as a string. */
    private static class MyCoReduceFunction implements CoReduceFunction<Integer, Integer, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer reduce1(Integer value1, Integer value2) {
            int sum = value1 + value2;
            return sum;
        }

        @Override
        public Integer reduce2(Integer value1, Integer value2) {
            int sum = value1 + value2;
            return sum;
        }

        @Override
        public String map1(Integer value) {
            return value.toString();
        }

        @Override
        public String map2(Integer value) {
            return value.toString();
        }
    }

    /**
     * Timestamp source that ignores the record and hands out the scripted
     * timestamps one after another; the first one is also the start time.
     */
    public static final class MyTimeStamp<T> implements TimeStamp<T> {
        private static final long serialVersionUID = 1L;

        private Iterator<Long> timestamps;
        private long start;

        public MyTimeStamp(List<Long> timestamps) {
            this.start = timestamps.get(0);
            this.timestamps = timestamps.iterator();
        }

        @Override
        public long getTimestamp(T value) {
            return timestamps.next();
        }

        @Override
        public long getStartTime() {
            return start;
        }
    }

    List<Long> timestamps1 = Arrays.asList(0L, 1L, 1L, 1L, 2L, 2L, 2L, 3L, 8L, 10L);
    List<Long> timestamps2 = Arrays.asList(0L, 5L, 5L, 6L, 6L);

    @Test
    public void coWindowReduceTest() {
        // First input: the integers 1..10.
        List<Integer> firstInput = new ArrayList<Integer>();
        int number = 1;
        while (number <= 10) {
            firstInput.add(number);
            number++;
        }

        List<Integer> secondInput = new ArrayList<Integer>(Arrays.asList(1, 2, -1, -3, -4));

        List<String> expected = new ArrayList<String>(Arrays.asList("28", "26", "9", "19", "1",
                "-6"));

        CoWindowReduceInvokable<Integer, Integer, String> invokable = new CoWindowReduceInvokable<Integer, Integer, String>(
                new MyCoReduceFunction(), 3L, 3L, 2L, 2L, new MyTimeStamp<Integer>(timestamps1),
                new MyTimeStamp<Integer>(timestamps2));

        List<String> result = MockCoInvokable.createAndExecute(invokable, firstInput, secondInput);

        // Output order across the two inputs is not fixed, so compare sorted.
        Collections.sort(expected);
        Collections.sort(result);
        assertEquals(expected, result);
    }
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册