提交 49812f3c 编写于 作者: S szape 提交者: mbalassi

[streaming] Several bugfixes and doc updates

上级 7af127ea
......@@ -416,8 +416,8 @@ To use this function the user needs to call, the `iteration.setMaxWaitTime(milli
The usage of rich functions are essentially the same as in the core Flink API. All transformations that take as argument a user-defined function can instead take a rich function as argument:
~~~java
dataStream.map(new RichMapFunction<String, Integer>() {
public Integer map(String value) { return value.toString(); }
dataStream.map(new RichMapFunction<Integer, String>() {
public String map(Integer value) { return value.toString(); }
});
~~~
......@@ -549,7 +549,7 @@ The API provided is the [same](#kafka_source_close) as the one for `KafkaSource`
#### Building A Topology
To use a Kafka connector as a source in Flink call the `addSource()` function with a new instance of the class which extends `KafkaSource` as parameter:
```java
~~~java
DataStream<String> stream1 = env.
addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
.print();
......@@ -564,7 +564,7 @@ The followings have to be provided for the `MyKafkaSource()` constructor in orde
Similarly to use a Kafka connector as a sink in Flink call the `addSink()` function with a new instance of the class which extends `KafkaSink`:
```java
~~~java
DataStream<String> stream2 = env
.addSource(new MySource())
.addSink(new MyKafkaSink("test", "localhost:9092"));
......@@ -668,7 +668,7 @@ The API provided is the [same](#flume_source_close) as the one for `FlumeSource`
#### Building A Topology
To use a Flume connector as a source in Flink call the `addSource()` function with a new instance of the class which extends `FlumeSource` as parameter:
```java
~~~java
DataStream<String> dataStream1 = env
.addSource(new MyFlumeSource("localhost", 41414))
.print();
......@@ -681,7 +681,7 @@ The followings have to be provided for the `MyFlumeSource()` constructor in orde
Similarly to use a Flume connector as a sink in Flink call the `addSink()` function with a new instance of the class which extends `FlumeSink`
```java
~~~java
DataStream<String> dataStream2 = env
.fromElements("one", "two", "three", "four", "five", "q")
.addSink(new MyFlumeSink("localhost", 42424));
......@@ -824,7 +824,7 @@ The followings have to be provided for the `MyRabbitMQSource()` constructor in o
Similarly to use a RabbitMQ connector as a sink in Flink call the `addSink()` function with a new instance of the class which extends `RabbitMQSink`
```java
~~~java
DataStream<String> dataStream2 = env
.fromElements("one", "two", "three", "four", "five", "q")
.addSink(new MyRMQSink("localhost", "hello"));
......@@ -847,7 +847,7 @@ Twitter Streaming API provides opportunity to connect to the stream of tweets ma
In order to connect to Twitter stream the user has to register their program and acquire the necessary information for the authentication. The process is described below.
#### Acquiring the authentication information
First of all, a Twitter account is needed. Sign up for free at [twitter.com/signup](https://twitter.com/signup) or sing in at Twitter's [Application Management](https://apps.twitter.com/) and register the application by clicking on the "Create New App" button. Fill out a form about your program and accept the Terms and Conditions.
First of all, a Twitter account is needed. Sign up for free at [twitter.com/signup](https://twitter.com/signup) or sign in at Twitter's [Application Management](https://apps.twitter.com/) and register the application by clicking on the "Create New App" button. Fill out a form about your program and accept the Terms and Conditions.
After selecting the application you the API key and API secret (called `consumerKey` and `sonsumerSecret` in `TwitterSource` respectively) is located on the "API Keys" tab. The necessary access token data (`token` and `secret`) can be acquired here.
Remember to keep these pieces of information a secret and do not push them to public repositories.
......
......@@ -22,6 +22,8 @@ import java.util.Iterator;
import org.apache.commons.math.util.MathUtils;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.state.NullableCircularBuffer;
......@@ -40,6 +42,8 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
protected StreamBatch batch;
protected StreamBatch currentBatch;
protected TypeSerializer<OUT> serializer;
public BatchReduceInvokable(ReduceFunction<OUT> reduceFunction, long batchSize, long slideSize) {
super(reduceFunction);
this.reducer = reduceFunction;
......@@ -48,7 +52,6 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
this.granularity = (int) MathUtils.gcd(batchSize, slideSize);
this.batchPerSlide = slideSize / granularity;
this.numberOfBatches = batchSize / granularity;
this.batch = new StreamBatch();
}
@Override
......@@ -57,7 +60,7 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
throw new RuntimeException("DataStream must not be empty");
}
while (reuse != null) {
while (reuse != null) {
StreamBatch batch = getBatch(reuse);
batch.reduceToBuffer(reuse.getObject());
......@@ -65,7 +68,7 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
resetReuse();
reuse = recordIterator.next(reuse);
}
reduceLastBatch();
}
......@@ -86,7 +89,7 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
}
protected void reduceLastBatch() throws Exception {
batch.reduceLastBatch();
batch.reduceLastBatch();
}
@Override
......@@ -101,7 +104,7 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
while (reducedIterator.hasNext()) {
OUT next = reducedIterator.next();
if (next != null) {
reduced = reducer.reduce(reduced, next);
reduced = reducer.reduce(serializer.copy(reduced), serializer.copy(next));
}
}
if (reduced != null) {
......@@ -115,6 +118,7 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
protected long counter;
protected long minibatchCounter;
protected OUT currentValue;
boolean changed;
protected NullableCircularBuffer circularBuffer;
......@@ -123,13 +127,13 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
this.circularBuffer = new NullableCircularBuffer((int) (batchSize / granularity));
this.counter = 0;
this.minibatchCounter = 0;
this.changed = false;
}
public void reduceToBuffer(OUT nextValue) throws Exception {
if (currentValue != null) {
currentValue = reducer.reduce(currentValue, nextValue);
currentValue = reducer.reduce(serializer.copy(currentValue), serializer.copy(nextValue));
} else {
currentValue = nextValue;
}
......@@ -147,20 +151,20 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
protected void addToBuffer() {
circularBuffer.add(currentValue);
changed = true;
minibatchCounter++;
currentValue = null;
}
protected boolean miniBatchEnd() {
if( (counter % granularity) == 0){
if ((counter % granularity) == 0) {
counter = 0;
return true;
}else{
} else {
return false;
}
}
public boolean batchEnd() {
if (minibatchCounter == numberOfBatches) {
minibatchCounter -= batchPerSlide;
......@@ -170,39 +174,50 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
}
public void reduceLastBatch() throws Exception {
if (miniBatchInProgress()) {
addToBuffer();
}
if (minibatchCounter >= 0) {
for (long i = 0; i < (numberOfBatches - minibatchCounter); i++) {
circularBuffer.remove();
if (changed == true && minibatchCounter >= 0) {
if (circularBuffer.isFull()) {
for (long i = 0; i < (numberOfBatches - minibatchCounter); i++) {
if (!circularBuffer.isEmpty()) {
circularBuffer.remove();
}
}
}
if (!circularBuffer.isEmpty()) {
reduce(this);
}
}
}
public boolean miniBatchInProgress(){
public boolean miniBatchInProgress() {
return currentValue != null;
}
public void reduceBatch() {
reduce(this);
changed = false;
}
@SuppressWarnings("unchecked")
public Iterator<OUT> getIterator() {
return circularBuffer.iterator();
}
@Override
public String toString(){
public String toString() {
return circularBuffer.toString();
}
}
@Override
public void open(Configuration config) throws Exception{
serializer = inSerializer.getObjectSerializer();
this.batch = new StreamBatch();
}
}
......@@ -40,8 +40,8 @@ public class GroupedBatchReduceInvokable<OUT> extends BatchReduceInvokable<OUT>
protected StreamBatch getBatch(StreamRecord<OUT> next) {
Object key = next.getField(keyPosition);
StreamBatch batch = streamBatches.get(key);
if(batch == null){
batch=new StreamBatch();
if (batch == null) {
batch = new StreamBatch();
streamBatches.put(key, batch);
}
return batch;
......@@ -49,9 +49,9 @@ public class GroupedBatchReduceInvokable<OUT> extends BatchReduceInvokable<OUT>
@Override
protected void reduceLastBatch() throws Exception {
for(StreamBatch batch: streamBatches.values()){
for (StreamBatch batch : streamBatches.values()) {
batch.reduceLastBatch();
}
}
}
}
......@@ -48,6 +48,9 @@ public class GroupedWindowGroupReduceInvokable<IN, OUT> extends WindowGroupReduc
StreamWindow window = streamWindows.get(key);
if (window == null) {
window = new GroupedStreamWindow();
for (int i = 0; i < currentMiniBatchCount; i++) {
window.circularList.newSlide();
}
streamWindows.put(key, window);
}
this.window = window;
......
......@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.invokable.operator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.common.functions.ReduceFunction;
......@@ -71,29 +70,6 @@ public class GroupedWindowReduceInvokable<OUT> extends WindowReduceInvokable<OUT
window.reduceLastBatch();
}
}
@Override
protected void callUserFunction() throws Exception {
Iterator<OUT> reducedIterator = currentBatch.getIterator();
OUT reduced = null;
while (reducedIterator.hasNext() && reduced == null) {
reduced = reducedIterator.next();
}
while (reducedIterator.hasNext()) {
OUT next = reducedIterator.next();
if (next != null) {
reduced = reducer.reduce(reduced, next);
}
}
if (reduced != null) {
collector.collect(reduced);
}else{
//remove window if no value received
streamWindows.remove(currentBatch);
}
}
protected class GroupedStreamWindow extends StreamWindow {
......@@ -123,7 +99,6 @@ public class GroupedWindowReduceInvokable<OUT> extends WindowReduceInvokable<OUT
}
return false;
}
}
......
......@@ -34,13 +34,13 @@ public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> {
super(reduceFunction, windowSize, slideInterval);
this.timestamp = timestamp;
this.startTime = timestamp.getStartTime();
this.window = new StreamWindow();
this.batch = this.window;
}
@Override
public void open(Configuration config) throws Exception {
super.open(config);
this.window = new StreamWindow();
this.batch = this.window;
if (timestamp instanceof DefaultTimeStamp) {
(new TimeCheck()).start();
}
......@@ -61,7 +61,7 @@ public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> {
checkWindowEnd(timestamp.getTimestamp(nextValue));
if (currentValue != null) {
currentValue = reducer.reduce(currentValue, nextValue);
currentValue = reducer.reduce(serializer.copy(currentValue), serializer.copy(nextValue));
} else {
currentValue = nextValue;
}
......
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
......@@ -31,8 +31,6 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
private static final long serialVersionUID = 1L;
protected CoReduceFunction<IN1, IN2, OUT> coReducer;
protected TypeSerializer<IN1> typeSerializer1;
protected TypeSerializer<IN2> typeSerializer2;
protected long slideSize1;
protected long slideSize2;
......@@ -48,6 +46,8 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
protected StreamBatch<IN2> batch2;
protected StreamBatch<IN1> currentBatch1;
protected StreamBatch<IN2> currentBatch2;
protected TypeSerializer<IN1> serializer1;
protected TypeSerializer<IN2> serializer2;
public CoBatchReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, long batchSize1,
long batchSize2, long slideSize1, long slideSize2) {
......@@ -63,8 +63,6 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
this.batchPerSlide2 = slideSize2 / granularity2;
this.numberOfBatches1 = batchSize1 / granularity1;
this.numberOfBatches2 = batchSize2 / granularity2;
this.batch1 = new StreamBatch<IN1>(batchSize1, slideSize1);
this.batch2 = new StreamBatch<IN2>(batchSize2, slideSize2);
}
@Override
......@@ -142,11 +140,11 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
while (reducedIterator.hasNext()) {
IN1 next = reducedIterator.next();
if (next != null) {
reduced = coReducer.reduce1(reduced, next);
reduced = coReducer.reduce1(serializer1.copy(reduced), serializer1.copy(next));
}
}
if (reduced != null) {
collector.collect(coReducer.map1(reduced));
collector.collect(coReducer.map1(serializer1.copy(reduced)));
}
}
......@@ -162,26 +160,29 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
while (reducedIterator.hasNext()) {
IN2 next = reducedIterator.next();
if (next != null) {
reduced = coReducer.reduce2(reduced, next);
reduced = coReducer.reduce2(serializer2.copy(reduced), serializer2.copy(next));
}
}
if (reduced != null) {
collector.collect(coReducer.map2(reduced));
collector.collect(coReducer.map2(serializer2.copy(reduced)));
}
}
@Override
public void open(Configuration config) throws Exception {
super.open(config);
this.typeSerializer1 = serializer1.getObjectSerializer();
this.typeSerializer2 = serializer2.getObjectSerializer();
this.batch1 = new StreamBatch<IN1>(batchSize1, slideSize1);
this.batch2 = new StreamBatch<IN2>(batchSize2, slideSize2);
this.serializer1 = srSerializer1.getObjectSerializer();
this.serializer2 = srSerializer2.getObjectSerializer();
}
public void reduceToBuffer1(StreamRecord<IN1> next, StreamBatch<IN1> streamBatch)
throws Exception {
IN1 nextValue = next.getObject();
if (streamBatch.currentValue != null) {
streamBatch.currentValue = coReducer.reduce1(streamBatch.currentValue, nextValue);
streamBatch.currentValue = coReducer.reduce1(
serializer1.copy(streamBatch.currentValue), serializer1.copy(nextValue));
} else {
streamBatch.currentValue = nextValue;
}
......@@ -200,7 +201,8 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
throws Exception {
IN2 nextValue = next.getObject();
if (streamBatch.currentValue != null) {
streamBatch.currentValue = coReducer.reduce2(streamBatch.currentValue, nextValue);
streamBatch.currentValue = coReducer.reduce2(
serializer2.copy(streamBatch.currentValue), serializer2.copy(nextValue));
} else {
streamBatch.currentValue = nextValue;
}
......@@ -220,9 +222,13 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
streamBatch.addToBuffer();
}
if (streamBatch.minibatchCounter >= 0) {
for (long i = 0; i < (numberOfBatches1 - streamBatch.minibatchCounter); i++) {
streamBatch.circularBuffer.remove();
if (streamBatch.changed == true && streamBatch.minibatchCounter >= 0) {
if (streamBatch.circularBuffer.isFull()) {
for (long i = 0; i < (numberOfBatches1 - streamBatch.minibatchCounter); i++) {
if (!streamBatch.circularBuffer.isEmpty()) {
streamBatch.circularBuffer.remove();
}
}
}
if (!streamBatch.circularBuffer.isEmpty()) {
reduce1(streamBatch);
......@@ -236,9 +242,11 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
streamBatch.addToBuffer();
}
if (streamBatch.minibatchCounter >= 0) {
if (streamBatch.changed == true && streamBatch.minibatchCounter >= 0) {
for (long i = 0; i < (numberOfBatches2 - streamBatch.minibatchCounter); i++) {
streamBatch.circularBuffer.remove();
if (!streamBatch.circularBuffer.isEmpty()) {
streamBatch.circularBuffer.remove();
}
}
if (!streamBatch.circularBuffer.isEmpty()) {
reduce2(streamBatch);
......@@ -249,10 +257,12 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
public void reduceBatch1(StreamBatch<IN1> streamBatch) {
reduce1(streamBatch);
streamBatch.changed = false;
}
public void reduceBatch2(StreamBatch<IN2> streamBatch) {
reduce2(streamBatch);
streamBatch.changed = false;
}
protected class StreamBatch<IN> implements Serializable {
......@@ -266,6 +276,7 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
protected long granularity;
protected long batchPerSlide;
protected long numberOfBatches;
boolean changed;
protected NullableCircularBuffer circularBuffer;
......@@ -279,10 +290,13 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
this.minibatchCounter = 0;
this.currentValue = null;
this.numberOfBatches = batchSize / granularity;
this.changed = false;
}
protected void addToBuffer() {
circularBuffer.add(currentValue);
changed = true;
minibatchCounter++;
currentValue = null;
}
......
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
......
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
......@@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
......@@ -38,10 +39,6 @@ public class CoGroupedWindowReduceInvokable<IN1, IN2, OUT> extends
super(coReducer, windowSize1, windowSize2, slideInterval1, slideInterval2, timestamp1,
timestamp2);
this.keyPosition1 = keyPosition1;
this.batch1 = new GroupedStreamWindow<IN1>(windowSize1, slideInterval1);
this.batch2 = new GroupedStreamWindow<IN2>(windowSize2, slideInterval2);
// this.batch1 = this.window1;
// this.batch2 = this.window2;
}
@Override
......@@ -128,6 +125,13 @@ public class CoGroupedWindowReduceInvokable<IN1, IN2, OUT> extends
}
@Override
public void open(Configuration config) throws Exception {
super.open(config);
this.batch1 = new GroupedStreamWindow<IN1>(batchSize1, slideSize1);
this.batch2 = new GroupedStreamWindow<IN2>(batchSize2, slideSize2);
}
protected class GroupedStreamWindow<IN> extends StreamWindow<IN> {
private static final long serialVersionUID = 1L;
......@@ -156,6 +160,7 @@ public class CoGroupedWindowReduceInvokable<IN1, IN2, OUT> extends
}
circularBuffer.add(currentValues);
changed = true;
minibatchCounter++;
currentValues = reuseMap;
}
......
......@@ -39,8 +39,8 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU
protected CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator;
protected StreamRecord<IN1> reuse1;
protected StreamRecord<IN2> reuse2;
protected StreamRecordSerializer<IN1> serializer1;
protected StreamRecordSerializer<IN2> serializer2;
protected StreamRecordSerializer<IN1> srSerializer1;
protected StreamRecordSerializer<IN2> srSerializer2;
public void initialize(Collector<OUT> collector,
CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator,
......@@ -52,22 +52,22 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU
this.reuse1 = serializer1.createInstance();
this.reuse2 = serializer2.createInstance();
this.serializer1 = serializer1;
this.serializer2 = serializer2;
this.srSerializer1 = serializer1;
this.srSerializer2 = serializer2;
this.isMutable = isMutable;
}
protected void resetReuseAll() {
this.reuse1 = serializer1.createInstance();
this.reuse2 = serializer2.createInstance();
this.reuse1 = srSerializer1.createInstance();
this.reuse2 = srSerializer2.createInstance();
}
protected void resetReuse1() {
this.reuse1 = serializer1.createInstance();
this.reuse1 = srSerializer1.createInstance();
}
protected void resetReuse2() {
this.reuse2 = serializer2.createInstance();
this.reuse2 = srSerializer2.createInstance();
}
@Override
......
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
......@@ -32,9 +32,6 @@ public class CoWindowReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokab
protected TimeStamp<IN1> timestamp1;
protected TimeStamp<IN2> timestamp2;
// protected StreamWindow<IN1> window1;
// protected StreamWindow<IN2> window2;
public CoWindowReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1,
long windowSize2, long slideInterval1, long slideInterval2, TimeStamp<IN1> timestamp1,
TimeStamp<IN2> timestamp2) {
......@@ -43,10 +40,7 @@ public class CoWindowReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokab
this.timestamp2 = timestamp2;
this.startTime1 = timestamp1.getStartTime();
this.startTime2 = timestamp2.getStartTime();
this.batch1 = new StreamWindow<IN1>(windowSize1, slideInterval1);
this.batch2 = new StreamWindow<IN2>(windowSize2, slideInterval2);
// this.batch1 = this.window1;
// this.batch2 = this.window2;
}
@Override
......@@ -57,7 +51,8 @@ public class CoWindowReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokab
checkBatchEnd1(timestamp1.getTimestamp(nextValue), streamWindow);
if (streamWindow.currentValue != null) {
streamWindow.currentValue = coReducer.reduce1(streamWindow.currentValue, nextValue);
streamWindow.currentValue = coReducer.reduce1(
serializer1.copy(streamWindow.currentValue), serializer1.copy(nextValue));
} else {
streamWindow.currentValue = nextValue;
}
......@@ -71,7 +66,8 @@ public class CoWindowReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokab
checkBatchEnd2(timestamp2.getTimestamp(nextValue), streamWindow);
if (streamWindow.currentValue != null) {
streamWindow.currentValue = coReducer.reduce2(streamWindow.currentValue, nextValue);
streamWindow.currentValue = coReducer.reduce2(
serializer2.copy(streamWindow.currentValue), serializer2.copy(nextValue));
} else {
streamWindow.currentValue = nextValue;
}
......@@ -138,6 +134,8 @@ public class CoWindowReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokab
@Override
public void open(Configuration config) throws Exception {
super.open(config);
this.batch1 = new StreamWindow<IN1>(batchSize1, slideSize1);
this.batch2 = new StreamWindow<IN2>(batchSize2, slideSize2);
if (timestamp1 instanceof DefaultTimeStamp) {
(new TimeCheck1()).start();
}
......
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
......@@ -120,12 +120,10 @@ public class CoBatchReduceTest {
expected.add("18");
expected.add("26");
expected.add("34");
expected.add("19");
expected.add("abc");
expected.add("cde");
expected.add("efg");
expected.add("ghi");
expected.add("i");
List<String> result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2);
......
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
......@@ -132,12 +132,9 @@ public class CoGroupedBatchReduceTest {
List<String> expected = new ArrayList<String>();
expected.add("10");
expected.add("19");
expected.add("12");
expected.add("33");
expected.add("19");
expected.add("ace");
expected.add("egi");
expected.add("i");
expected.add("bdf");
expected.add("fh");
......
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
......
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
......
......@@ -50,8 +50,6 @@ public class GroupedBatchReduceTest {
expected.add(3);
expected.add(3);
expected.add(15);
expected.add(1);
expected.add(5);
GroupedBatchReduceInvokable<Integer> invokable = new GroupedBatchReduceInvokable<Integer>(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册