提交 4b648870 编写于 作者: A Aljoscha Krettek

[FLINK-2936] Fix ClassCastException for Event-Time source

Before, would throw a ClassCastException when emitting watermarks with
timestamp/watermark multiplexing disabled.
上级 c4a2d60c
......@@ -46,7 +46,7 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
final ExecutionConfig executionConfig = getExecutionConfig();
if (userFunction instanceof EventTimeSourceFunction) {
ctx = new ManualWatermarkContext<T>(lockingObject, collector);
ctx = new ManualWatermarkContext<T>(lockingObject, collector, getRuntimeContext().getExecutionConfig().areTimestampsEnabled());
} else if (executionConfig.getAutoWatermarkInterval() > 0) {
ctx = new AutomaticWatermarkContext<T>(lockingObject, collector, executionConfig);
} else if (executionConfig.areTimestampsEnabled()) {
......@@ -261,11 +261,13 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
private final Object lockingObject;
private final Output<StreamRecord<T>> output;
private final StreamRecord<T> reuse;
private final boolean watermarkMultiplexingEnabled;
public ManualWatermarkContext(Object lockingObject, Output<StreamRecord<T>> output) {
public ManualWatermarkContext(Object lockingObject, Output<StreamRecord<T>> output, boolean watermarkMultiplexingEnabled) {
this.lockingObject = lockingObject;
this.output = output;
this.reuse = new StreamRecord<T>(null);
this.watermarkMultiplexingEnabled = watermarkMultiplexingEnabled;
}
@Override
......@@ -283,7 +285,9 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
@Override
public void emitWatermark(Watermark mark) {
output.emitWatermark(mark);
if (watermarkMultiplexingEnabled) {
output.emitWatermark(mark);
}
}
@Override
......@@ -296,7 +300,9 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
// emit one last +Inf watermark to make downstream watermark processing work
// when some sources close early
synchronized (lockingObject) {
output.emitWatermark(new Watermark(Long.MAX_VALUE));
if (watermarkMultiplexingEnabled) {
output.emitWatermark(new Watermark(Long.MAX_VALUE));
}
}
}
}
......
......@@ -23,6 +23,7 @@ import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.taskmanager.MultiShotLatch;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
......@@ -133,7 +134,7 @@ public class TimestampITCase {
source1.union(source2)
.map(new IdentityMap())
.connect(source2).map(new IdentityCoMap())
.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
.addSink(new NoOpSink<Integer>());
env.execute();
......@@ -293,7 +294,7 @@ public class TimestampITCase {
});
extractOp
.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
.transform("Timestamp Check",
BasicTypeInfo.INT_TYPE_INFO,
new TimestampCheckingOperator());
......@@ -362,7 +363,7 @@ public class TimestampITCase {
return Long.MIN_VALUE;
}
})
.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
.transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
......@@ -429,7 +430,7 @@ public class TimestampITCase {
return Long.MIN_VALUE;
}
})
.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
.transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
......@@ -503,23 +504,50 @@ public class TimestampITCase {
env.execute();
}
/**
* This verifies that an event time source works when setting stream time characteristic to
* processing time. In this case, the watermarks should just be swallowed.
*/
@Test
public void testEventTimeSourceWithProcessingTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
env.setParallelism(2);
env.getConfig().disableSysoutLogging();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.getConfig().disableTimestamps();
DataStream<Integer> source1 = env.addSource(new MyTimestampSource(0, 10));
source1
.map(new IdentityMap())
.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(false));
env.execute();
// verify that we don't get any watermarks, the source is used as watermark source in
// other tests, so it normally emits watermarks
Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 0);
}
@SuppressWarnings("unchecked")
public static class CustomOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
List<Watermark> watermarks;
public static List<Watermark>[] finalWatermarks = new List[PARALLELISM];
private long oldTimestamp;
private final boolean timestampsEnabled;
public CustomOperator() {
public CustomOperator(boolean timestampsEnabled) {
setChainingStrategy(ChainingStrategy.ALWAYS);
this.timestampsEnabled = timestampsEnabled;
}
@Override
public void processElement(StreamRecord<Integer> element) throws Exception {
if (element.getTimestamp() != element.getValue()) {
Assert.fail("Timestamps are not properly handled.");
if (timestampsEnabled) {
if (element.getTimestamp() != element.getValue()) {
Assert.fail("Timestamps are not properly handled.");
}
}
oldTimestamp = element.getTimestamp();
output.collect(element);
}
......
......@@ -62,7 +62,7 @@ public class SourceFunctionUtil<T> {
final Object lockingObject = new Object();
SourceFunction.SourceContext<T> ctx;
if (sourceFunction instanceof EventTimeSourceFunction) {
ctx = new StreamSource.ManualWatermarkContext<T>(lockingObject, collector);
ctx = new StreamSource.ManualWatermarkContext<T>(lockingObject, collector, true);
} else {
ctx = new StreamSource.NonWatermarkContext<T>(lockingObject, collector);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册