提交 819fb270 编写于 作者: K kl0u 提交者: Aljoscha Krettek

[FLINK-3647] Change StreamSource to use Processing-Time Clock Service

上级 4b5a7890
...@@ -250,8 +250,8 @@ public abstract class AbstractStreamOperator<OUT> ...@@ -250,8 +250,8 @@ public abstract class AbstractStreamOperator<OUT>
} }
/** /**
* Register a timer callback. At the specified time the {@link Triggerable} will be invoked. * Register a timer callback. At the specified time the provided {@link Triggerable} will
* This call is guaranteed to not happen concurrently with method calls on the operator. * be invoked. This call is guaranteed to not happen concurrently with method calls on the operator.
* *
* @param time The absolute time in milliseconds. * @param time The absolute time in milliseconds.
* @param target The target to be triggered. * @param target The target to be triggered.
......
...@@ -21,12 +21,10 @@ import org.apache.flink.annotation.Internal; ...@@ -21,12 +21,10 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/** /**
* {@link StreamOperator} for streaming sources. * {@link StreamOperator} for streaming sources.
...@@ -195,7 +193,6 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> ...@@ -195,7 +193,6 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
private final Output<StreamRecord<T>> output; private final Output<StreamRecord<T>> output;
private final StreamRecord<T> reuse; private final StreamRecord<T> reuse;
private final ScheduledExecutorService scheduleExecutor;
private final ScheduledFuture<?> watermarkTimer; private final ScheduledFuture<?> watermarkTimer;
private final long watermarkInterval; private final long watermarkInterval;
...@@ -216,29 +213,10 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> ...@@ -216,29 +213,10 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
this.output = outputParam; this.output = outputParam;
this.watermarkInterval = watermarkInterval; this.watermarkInterval = watermarkInterval;
this.reuse = new StreamRecord<T>(null); this.reuse = new StreamRecord<T>(null);
this.scheduleExecutor = Executors.newScheduledThreadPool(1); long now = owner.getCurrentProcessingTime();
this.watermarkTimer = owner.registerTimer(now + watermarkInterval,
this.watermarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() { new WatermarkEmittingTask(owner, lockingObjectParam, outputParam));
@Override
public void run() {
final long currentTime = System.currentTimeMillis();
if (currentTime > nextWatermarkTime) {
// align the watermarks across all machines. this will ensure that we
// don't have watermarks that creep along at different intervals because
// the machine clocks are out of sync
final long watermarkTime = currentTime - (currentTime % watermarkInterval);
synchronized (lockingObjectParam) {
if (currentTime > nextWatermarkTime) {
outputParam.emitWatermark(new Watermark(watermarkTime));
nextWatermarkTime += watermarkInterval;
}
}
}
}
}, 0, watermarkInterval, TimeUnit.MILLISECONDS);
} }
@Override @Override
...@@ -246,14 +224,21 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> ...@@ -246,14 +224,21 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
owner.checkAsyncException(); owner.checkAsyncException();
synchronized (lockingObject) { synchronized (lockingObject) {
final long currentTime = System.currentTimeMillis(); final long currentTime = owner.getCurrentProcessingTime();
output.collect(reuse.replace(element, currentTime)); output.collect(reuse.replace(element, currentTime));
// this is to avoid lock contention in the lockingObject by
// sending the watermark before the firing of the watermark
// emission task.
if (currentTime > nextWatermarkTime) { if (currentTime > nextWatermarkTime) {
// in case we jumped some watermarks, recompute the next watermark time // in case we jumped some watermarks, recompute the next watermark time
final long watermarkTime = currentTime - (currentTime % watermarkInterval); final long watermarkTime = currentTime - (currentTime % watermarkInterval);
nextWatermarkTime = watermarkTime + watermarkInterval; nextWatermarkTime = watermarkTime + watermarkInterval;
output.emitWatermark(new Watermark(watermarkTime)); output.emitWatermark(new Watermark(watermarkTime));
// we do not need to register another timer here
// because the emitting task will do so.
} }
} }
} }
...@@ -276,7 +261,6 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> ...@@ -276,7 +261,6 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
// we can shutdown the timer now, no watermarks will be needed any more // we can shutdown the timer now, no watermarks will be needed any more
watermarkTimer.cancel(true); watermarkTimer.cancel(true);
scheduleExecutor.shutdownNow();
} }
} }
...@@ -288,13 +272,47 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> ...@@ -288,13 +272,47 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
@Override @Override
public void close() { public void close() {
watermarkTimer.cancel(true); watermarkTimer.cancel(true);
scheduleExecutor.shutdownNow(); }
private class WatermarkEmittingTask implements Triggerable {
private final StreamSource<?, ?> owner;
private final Object lockingObject;
private final Output<StreamRecord<T>> output;
private WatermarkEmittingTask(StreamSource<?, ?> src, Object lock, Output<StreamRecord<T>> output) {
this.owner = src;
this.lockingObject = lock;
this.output = output;
}
@Override
public void trigger(long timestamp) {
final long currentTime = owner.getCurrentProcessingTime();
if (currentTime > nextWatermarkTime) {
// align the watermarks across all machines. this will ensure that we
// don't have watermarks that creep along at different intervals because
// the machine clocks are out of sync
final long watermarkTime = currentTime - (currentTime % watermarkInterval);
synchronized (lockingObject) {
if (currentTime > nextWatermarkTime) {
output.emitWatermark(new Watermark(watermarkTime));
nextWatermarkTime += watermarkInterval;
}
}
}
owner.registerTimer(owner.getCurrentProcessingTime() + watermarkInterval,
new WatermarkEmittingTask(owner, lockingObject, output));
}
} }
} }
/** /**
* A SourceContext for event time. Sources may directly attach timestamps and generate * A SourceContext for event time. Sources may directly attach timestamps and generate
* watermarks, but if records are emitted without timestamps, no timetamps are automatically * watermarks, but if records are emitted without timestamps, no timestamps are automatically
* generated and attached. The records will simply have no timestamp in that case. * generated and attached. The records will simply have no timestamp in that case.
* *
* Streaming topologies can use timestamp assigner functions to override the timestamps * Streaming topologies can use timestamp assigner functions to override the timestamps
......
...@@ -35,14 +35,22 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement; ...@@ -35,14 +35,22 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
import org.junit.Test; import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.ScheduledFuture;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
...@@ -58,7 +66,7 @@ public class StreamSourceOperatorTest { ...@@ -58,7 +66,7 @@ public class StreamSourceOperatorTest {
final List<StreamElement> output = new ArrayList<>(); final List<StreamElement> output = new ArrayList<>();
setupSourceOperator(operator); setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null);
operator.run(new Object(), new CollectorOutput<String>(output)); operator.run(new Object(), new CollectorOutput<String>(output));
assertEquals(1, output.size()); assertEquals(1, output.size());
...@@ -75,7 +83,7 @@ public class StreamSourceOperatorTest { ...@@ -75,7 +83,7 @@ public class StreamSourceOperatorTest {
new StreamSource<>(new InfiniteSource<String>()); new StreamSource<>(new InfiniteSource<String>());
setupSourceOperator(operator); setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null);
operator.cancel(); operator.cancel();
// run and exit // run and exit
...@@ -95,7 +103,7 @@ public class StreamSourceOperatorTest { ...@@ -95,7 +103,7 @@ public class StreamSourceOperatorTest {
new StreamSource<>(new InfiniteSource<String>()); new StreamSource<>(new InfiniteSource<String>());
setupSourceOperator(operator); setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null);
// trigger an async cancel in a bit // trigger an async cancel in a bit
new Thread("canceler") { new Thread("canceler") {
...@@ -128,7 +136,7 @@ public class StreamSourceOperatorTest { ...@@ -128,7 +136,7 @@ public class StreamSourceOperatorTest {
new StoppableStreamSource<>(new InfiniteSource<String>()); new StoppableStreamSource<>(new InfiniteSource<String>());
setupSourceOperator(operator); setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null);
operator.stop(); operator.stop();
// run and stop // run and stop
...@@ -147,7 +155,7 @@ public class StreamSourceOperatorTest { ...@@ -147,7 +155,7 @@ public class StreamSourceOperatorTest {
new StoppableStreamSource<>(new InfiniteSource<String>()); new StoppableStreamSource<>(new InfiniteSource<String>());
setupSourceOperator(operator); setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null);
// trigger an async cancel in a bit // trigger an async cancel in a bit
new Thread("canceler") { new Thread("canceler") {
...@@ -166,18 +174,61 @@ public class StreamSourceOperatorTest { ...@@ -166,18 +174,61 @@ public class StreamSourceOperatorTest {
assertTrue(output.isEmpty()); assertTrue(output.isEmpty());
} }
@Test
public void testAutomaticWatermarkContext() throws Exception {
// regular stream source operator
final StoppableStreamSource<String, InfiniteSource<String>> operator =
new StoppableStreamSource<>(new InfiniteSource<String>());
long watermarkInterval = 10;
TestTimeServiceProvider timeProvider = new TestTimeServiceProvider();
setupSourceOperator(operator, TimeCharacteristic.IngestionTime, watermarkInterval, timeProvider);
final List<StreamElement> output = new ArrayList<>();
StreamSource.AutomaticWatermarkContext<String> ctx =
new StreamSource.AutomaticWatermarkContext<>(
operator,
operator.getContainingTask().getCheckpointLock(),
new CollectorOutput<String>(output),
operator.getExecutionConfig().getAutoWatermarkInterval());
// periodically emit the watermarks
// even though we start from 1 the watermark are still
// going to be aligned with the watermark interval.
for (long i = 1; i < 100; i += watermarkInterval) {
timeProvider.setCurrentTime(i);
}
assertTrue(output.size() == 9);
long nextWatermark = 0;
for (StreamElement el : output) {
nextWatermark += watermarkInterval;
Watermark wm = (Watermark) el;
assertTrue(wm.getTimestamp() == nextWatermark);
}
}
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private static <T> void setupSourceOperator(StreamSource<T, ?> operator) { private static <T> void setupSourceOperator(StreamSource<T, ?> operator,
TimeCharacteristic timeChar,
long watermarkInterval,
final TimeServiceProvider timeProvider) {
ExecutionConfig executionConfig = new ExecutionConfig(); ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setAutoWatermarkInterval(watermarkInterval);
StreamConfig cfg = new StreamConfig(new Configuration()); StreamConfig cfg = new StreamConfig(new Configuration());
cfg.setTimeCharacteristic(TimeCharacteristic.EventTime); cfg.setTimeCharacteristic(timeChar);
Environment env = new DummyEnvironment("MockTwoInputTask", 1, 0); Environment env = new DummyEnvironment("MockTwoInputTask", 1, 0);
StreamTask<?, ?> mockTask = mock(StreamTask.class); StreamTask<?, ?> mockTask = mock(StreamTask.class);
when(mockTask.getName()).thenReturn("Mock Task"); when(mockTask.getName()).thenReturn("Mock Task");
when(mockTask.getCheckpointLock()).thenReturn(new Object()); when(mockTask.getCheckpointLock()).thenReturn(new Object());
...@@ -186,9 +237,44 @@ public class StreamSourceOperatorTest { ...@@ -186,9 +237,44 @@ public class StreamSourceOperatorTest {
when(mockTask.getExecutionConfig()).thenReturn(executionConfig); when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap()); when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
operator.setup(mockTask, cfg, (Output< StreamRecord<T>>) mock(Output.class)); doAnswer(new Answer<ScheduledFuture>() {
@Override
public ScheduledFuture answer(InvocationOnMock invocation) throws Throwable {
final long execTime = (Long) invocation.getArguments()[0];
final Triggerable target = (Triggerable) invocation.getArguments()[1];
if (timeProvider == null) {
throw new RuntimeException("The time provider is null");
}
timeProvider.registerTimer(execTime, new Runnable() {
@Override
public void run() {
try {
target.trigger(execTime);
} catch (Exception e) {
e.printStackTrace();
}
}
});
return null;
}
}).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
doAnswer(new Answer<Long>() {
@Override
public Long answer(InvocationOnMock invocation) throws Throwable {
if (timeProvider == null) {
throw new RuntimeException("The time provider is null");
}
return timeProvider.getCurrentProcessingTime();
}
}).when(mockTask).getCurrentProcessingTime();
operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class));
} }
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
private static final class FiniteSource<T> implements SourceFunction<T>, StoppableFunction { private static final class FiniteSource<T> implements SourceFunction<T>, StoppableFunction {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册