提交 bdb306a1 编写于 作者: A Aljoscha Krettek

[FLINK-3024] Fix TimestampExtractor.getCurrentWatermark() Behaviour

Previously the internal currentWatermark would be updated even if the
value returned from getCurrentWatermark was lower than the current
watermark.

This can lead to problems with chaining because the watermark is
directly forwarded without going through the watermark logic that
ensures correct behaviour (monotonically increasing).

This adds a test that verifies that the timestamp extractor does not
emit decreasing watermarks.
上级 14c24e79
......@@ -79,10 +79,10 @@ public class ExtractTimestampsOperator<T>
public void trigger(long timestamp) throws Exception {
// register next timer
registerTimer(System.currentTimeMillis() + watermarkInterval, this);
long lastWatermark = currentWatermark;
currentWatermark = userFunction.getCurrentWatermark();
long newWatermark = userFunction.getCurrentWatermark();
if (currentWatermark > lastWatermark) {
if (newWatermark > currentWatermark) {
currentWatermark = newWatermark;
// emit watermark
output.emitWatermark(new Watermark(currentWatermark));
}
......
......@@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
......@@ -378,6 +379,73 @@ public class TimestampITCase {
}
}
/**
* This test verifies that the timestamp extractor does not emit decreasing watermarks even
*
*/
@Test
public void testTimestampExtractorWithDecreasingCustomWatermarkEmit() throws Exception {
final int NUM_ELEMENTS = 10;
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
env.setParallelism(1);
env.getConfig().disableSysoutLogging();
env.getConfig().enableTimestamps();
env.getConfig().setAutoWatermarkInterval(1);
DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
int index = 0;
while (index < NUM_ELEMENTS) {
ctx.collect(index);
Thread.sleep(100);
ctx.collect(index - 1);
latch.await();
index++;
}
}
@Override
public void cancel() {
}
});
source1.assignTimestamps(new TimestampExtractor<Integer>() {
@Override
public long extractTimestamp(Integer element, long currentTimestamp) {
return element;
}
@Override
public long extractWatermark(Integer element, long currentTimestamp) {
return element - 1;
}
@Override
public long getCurrentWatermark() {
return Long.MIN_VALUE;
}
})
.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
.transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
env.execute();
// verify that we get NUM_ELEMENTS watermarks
for (int j = 0; j < NUM_ELEMENTS; j++) {
if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j - 1))) {
Assert.fail("Wrong watermark.");
}
}
if (!CustomOperator.finalWatermarks[0].get(NUM_ELEMENTS).equals(new Watermark(Long.MAX_VALUE))) {
Assert.fail("Wrong watermark.");
}
}
/**
* This tests whether the program throws an exception when an event-time source tries
* to emit without timestamp.
......@@ -442,6 +510,10 @@ public class TimestampITCase {
public static List<Watermark>[] finalWatermarks = new List[PARALLELISM];
private long oldTimestamp;
public CustomOperator() {
setChainingStrategy(ChainingStrategy.ALWAYS);
}
@Override
public void processElement(StreamRecord<Integer> element) throws Exception {
if (element.getTimestamp() != element.getValue()) {
......@@ -473,6 +545,10 @@ public class TimestampITCase {
public static class TimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
public TimestampCheckingOperator() {
setChainingStrategy(ChainingStrategy.ALWAYS);
}
@Override
public void processElement(StreamRecord<Integer> element) throws Exception {
if (element.getTimestamp() != element.getValue()) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册