提交 c438cff8 编写于 作者: A Aljoscha Krettek 提交者: Fabian Hueske

[FLINK-2577] [streaming] Fix Stalling Watermarks when Sources Close

Before, when one source closed early it would not emit watermarks
anymore. Downstream operations didn't know about this and expected
watermarks to keep on coming. This led to watermarks not being
forwarded anymore.

Now, when a source closes it will emit a final watermark with timestamp
Long.MAX_VALUE. This will have the effect of allowing the watermarks
from the other operations to propagate through, because the watermark is
defined as the minimum over all inputs.

The Long.MAX_VALUE watermark has the added benefit of notifying
operations that no more elements will arrive in the future.

This closes #1060
上级 9c2eaa8d
......@@ -44,6 +44,7 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
public void run(Object lockingObject, Output<StreamRecord<T>> collector) throws Exception {
SourceFunction.SourceContext<T> ctx;
if (userFunction instanceof EventTimeSourceFunction) {
ctx = new ManualWatermarkContext<T>(lockingObject, collector);
} else if (executionConfig.getAutoWatermarkInterval() > 0) {
......@@ -55,6 +56,10 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
}
userFunction.run(ctx);
// This will mostly emit a final +Inf Watermark to make the Watermark logic work
// when some sources finish before others do
ctx.close();
}
public void cancel() {
......@@ -235,6 +240,11 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
public void close() {
watermarkTimer.cancel(true);
scheduleExecutor.shutdownNow();
// emit one last +Inf watermark to make downstream watermark processing work
// when some sources close early
synchronized (lockingObject) {
output.emitWatermark(new Watermark(Long.MAX_VALUE));
}
}
}
......@@ -278,6 +288,12 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
}
@Override
public void close() {}
public void close() {
// emit one last +Inf watermark to make downstream watermark processing work
// when some sources close early
synchronized (lockingObject) {
output.emitWatermark(new Watermark(Long.MAX_VALUE));
}
}
}
}
......@@ -36,8 +36,8 @@ public class CoStreamFlatMap<IN1, IN2, OUT>
// We keep track of watermarks from both inputs, the combined input is the minimum
// Once the minimum advances we emit a new watermark for downstream operators
private long combinedWatermark = Long.MIN_VALUE;
private long input1Watermark = Long.MAX_VALUE;
private long input2Watermark = Long.MAX_VALUE;
private long input1Watermark = Long.MIN_VALUE;
private long input2Watermark = Long.MIN_VALUE;
public CoStreamFlatMap(CoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
super(flatMapper);
......@@ -66,7 +66,7 @@ public class CoStreamFlatMap<IN1, IN2, OUT>
public void processWatermark1(Watermark mark) throws Exception {
input1Watermark = mark.getTimestamp();
long newMin = Math.min(input1Watermark, input2Watermark);
if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE && input2Watermark != Long.MAX_VALUE) {
if (newMin > combinedWatermark) {
combinedWatermark = newMin;
output.emitWatermark(new Watermark(combinedWatermark));
}
......@@ -76,7 +76,7 @@ public class CoStreamFlatMap<IN1, IN2, OUT>
public void processWatermark2(Watermark mark) throws Exception {
input2Watermark = mark.getTimestamp();
long newMin = Math.min(input1Watermark, input2Watermark);
if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE && input2Watermark != Long.MAX_VALUE) {
if (newMin > combinedWatermark) {
combinedWatermark = newMin;
output.emitWatermark(new Watermark(combinedWatermark));
}
......
......@@ -32,8 +32,8 @@ public class CoStreamMap<IN1, IN2, OUT>
// We keep track of watermarks from both inputs, the combined input is the minimum
// Once the minimum advances we emit a new watermark for downstream operators
private long combinedWatermark = Long.MIN_VALUE;
private long input1Watermark = Long.MAX_VALUE;
private long input2Watermark = Long.MAX_VALUE;
private long input1Watermark = Long.MIN_VALUE;
private long input2Watermark = Long.MIN_VALUE;
public CoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper) {
super(mapper);
......@@ -53,7 +53,7 @@ public class CoStreamMap<IN1, IN2, OUT>
public void processWatermark1(Watermark mark) throws Exception {
input1Watermark = mark.getTimestamp();
long newMin = Math.min(input1Watermark, input2Watermark);
if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE && input2Watermark != Long.MAX_VALUE) {
if (newMin > combinedWatermark) {
combinedWatermark = newMin;
output.emitWatermark(new Watermark(combinedWatermark));
}
......@@ -63,7 +63,7 @@ public class CoStreamMap<IN1, IN2, OUT>
public void processWatermark2(Watermark mark) throws Exception {
input2Watermark = mark.getTimestamp();
long newMin = Math.min(input1Watermark, input2Watermark);
if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE && input2Watermark != Long.MAX_VALUE) {
if (newMin > combinedWatermark) {
combinedWatermark = newMin;
output.emitWatermark(new Watermark(combinedWatermark));
}
......
......@@ -35,8 +35,8 @@ public class CoStreamReduce<IN1, IN2, OUT>
// We keep track of watermarks from both inputs, the combined input is the minimum
// Once the minimum advances we emit a new watermark for downstream operators
private long combinedWatermark = Long.MIN_VALUE;
private long input1Watermark = Long.MAX_VALUE;
private long input2Watermark = Long.MAX_VALUE;
private long input1Watermark = Long.MIN_VALUE;
private long input2Watermark = Long.MIN_VALUE;
public CoStreamReduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
super(coReducer);
......@@ -68,7 +68,7 @@ public class CoStreamReduce<IN1, IN2, OUT>
public void processWatermark1(Watermark mark) throws Exception {
input1Watermark = mark.getTimestamp();
long newMin = Math.min(input1Watermark, input2Watermark);
if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE && input2Watermark != Long.MAX_VALUE) {
if (newMin > combinedWatermark) {
combinedWatermark = newMin;
output.emitWatermark(new Watermark(combinedWatermark));
}
......@@ -78,7 +78,7 @@ public class CoStreamReduce<IN1, IN2, OUT>
public void processWatermark2(Watermark mark) throws Exception {
input2Watermark = mark.getTimestamp();
long newMin = Math.min(input1Watermark, input2Watermark);
if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE && input2Watermark != Long.MAX_VALUE) {
if (newMin > combinedWatermark) {
combinedWatermark = newMin;
output.emitWatermark(new Watermark(combinedWatermark));
}
......
......@@ -52,8 +52,8 @@ public class CoStreamWindow<IN1, IN2, OUT>
// We keep track of watermarks from both inputs, the combined input is the minimum
// Once the minimum advances we emit a new watermark for downstream operators
private long combinedWatermark = Long.MIN_VALUE;
private long input1Watermark = Long.MAX_VALUE;
private long input2Watermark = Long.MAX_VALUE;
private long input1Watermark = Long.MIN_VALUE;
private long input2Watermark = Long.MIN_VALUE;
public CoStreamWindow(CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize,
long slideInterval, TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) {
......@@ -105,7 +105,7 @@ public class CoStreamWindow<IN1, IN2, OUT>
public void processWatermark1(Watermark mark) throws Exception {
input1Watermark = mark.getTimestamp();
long newMin = Math.min(input1Watermark, input2Watermark);
if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE && input2Watermark != Long.MAX_VALUE) {
if (newMin > combinedWatermark) {
combinedWatermark = newMin;
output.emitWatermark(new Watermark(combinedWatermark));
}
......@@ -115,7 +115,7 @@ public class CoStreamWindow<IN1, IN2, OUT>
public void processWatermark2(Watermark mark) throws Exception {
input2Watermark = mark.getTimestamp();
long newMin = Math.min(input1Watermark, input2Watermark);
if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE && input2Watermark != Long.MAX_VALUE) {
if (newMin > combinedWatermark) {
combinedWatermark = newMin;
output.emitWatermark(new Watermark(combinedWatermark));
}
......
......@@ -33,6 +33,11 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
* In some cases a watermark is only a heuristic and operators should be able to deal with
* late elements. They can either discard those or update the result and emit updates/retractions
* to downstream operations.
*
* <p>
* When a source closes it will emit a final watermark with timestamp {@code Long.MAX_VALUE}. When
* an operator receives this it will know that no more input will be arriving in the future.
*
*/
public class Watermark extends StreamElement {
......
......@@ -91,6 +91,9 @@ public class TimestampITCase {
* arrive at operators throughout a topology.
*
* <p>
* This also checks whether watermarks keep propagating if a source closes early.
*
* <p>
* This only uses map to test the workings of watermarks in a complete, running topology. All
* tasks and stream operators have dedicated tests that test the watermark propagation
* behaviour.
......@@ -109,9 +112,9 @@ public class TimestampITCase {
DataStream<Integer> source1 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS));
DataStream<Integer> source2 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS));
DataStream<Integer> source2 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS / 2));
source1
source1.union(source2)
.map(new IdentityMap())
.connect(source2).map(new IdentityCoMap())
.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
......@@ -121,11 +124,32 @@ public class TimestampITCase {
// verify that all the watermarks arrived at the final custom operator
for (int i = 0; i < PARALLELISM; i++) {
for (int j = 0; j < NUM_WATERMARKS; j++) {
if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) {
// There can be two cases, either we get NUM_WATERMARKS + 1 watermarks or
// (NUM_WATERMARKS / 2) + 1 watermarks. This depends on which source gets to run first.
// If source1 runs first we jump directly to +Inf and skip all the intermediate
// watermarks. If source2 runs first we see the intermediate watermarks from
// NUM_WATERMARKS/2 to +Inf.
if (CustomOperator.finalWatermarks[i].size() == NUM_WATERMARKS + 1) {
for (int j = 0; j < NUM_WATERMARKS; j++) {
if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) {
Assert.fail("Wrong watermark.");
}
}
if (!CustomOperator.finalWatermarks[i].get(NUM_WATERMARKS).equals(new Watermark(Long.MAX_VALUE))) {
Assert.fail("Wrong watermark.");
}
} else {
for (int j = 0; j < NUM_WATERMARKS / 2; j++) {
if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) {
Assert.fail("Wrong watermark.");
}
}
if (!CustomOperator.finalWatermarks[i].get(NUM_WATERMARKS / 2).equals(new Watermark(Long.MAX_VALUE))) {
Assert.fail("Wrong watermark.");
}
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册