提交 ff80dc04 编写于 作者: S Stephan Ewen

[FLINK-3554] [streaming] Emit a MAX Watermark after finite sources finished

This closes #1750
上级 93733cc0
......@@ -20,12 +20,12 @@ package org.apache.flink.streaming.examples.ml.util;
public class IncrementalLearningSkeletonData {
public static final String RESULTS = "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" +
"1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "0\n" + "0\n" +
"1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "0\n" +
"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
"0\n" + "0\n" + "0\n" + "0\n";
"0\n" + "0\n" + "0\n" + "0\n" + "0\n";
private IncrementalLearningSkeletonData() {
}
......
......@@ -296,9 +296,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
Tuple2.of(new Event(2, "end", 2.0), 8L),
Tuple2.of(new Event(1, "middle", 5.0), 7L),
Tuple2.of(new Event(3, "middle", 6.0), 9L),
Tuple2.of(new Event(3, "end", 7.0), 7L),
// last element for high final watermark
Tuple2.of(new Event(3, "end", 7.0), 100L)
Tuple2.of(new Event(3, "end", 7.0), 7L)
).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event,Long>>() {
@Override
......
......@@ -18,16 +18,23 @@
package org.apache.flink.runtime.minicluster
import java.util
import akka.actor.{ActorRef, ActorSystem}
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.io.FileOutputFormat
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.io.network.netty.NettyConfig
import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
import org.apache.flink.runtime.messages.JobManagerMessages
import org.apache.flink.runtime.messages.JobManagerMessages.{StoppingFailure, StoppingResponse, RunningJobsStatus, RunningJobs}
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.util.EnvironmentInformation
import scala.concurrent.Await
/**
* Local Flink mini cluster which executes all [[TaskManager]]s and the [[JobManager]] in the same
* JVM. It extends the [[FlinkMiniCluster]] by having convenience functions to setup Flink's
......@@ -211,4 +218,34 @@ class LocalFlinkMiniCluster(
JobManager.ARCHIVE_NAME
}
}
// --------------------------------------------------------------------------
// Actions on running jobs
// --------------------------------------------------------------------------
def currentlyRunningJobs: Iterable[JobID] = {
val leader = getLeaderGateway(timeout)
val future = leader.ask(JobManagerMessages.RequestRunningJobsStatus, timeout)
.mapTo[RunningJobsStatus]
Await.result(future, timeout).runningJobs.map(_.getJobId)
}
def getCurrentlyRunningJobsJava(): java.util.List[JobID] = {
val list = new java.util.ArrayList[JobID]()
currentlyRunningJobs.foreach(list.add)
list
}
def stopJob(id: JobID) : Unit = {
val leader = getLeaderGateway(timeout)
val response = leader.ask(new JobManagerMessages.StopJob(id), timeout)
.mapTo[StoppingResponse]
val rc = Await.result(response, timeout)
rc match {
case failure: StoppingFailure =>
throw new Exception(s"Stopping the job with ID $id failed.", failure.cause)
case _ =>
}
}
}
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.operators.testutils;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Future;
......@@ -105,7 +106,7 @@ public class DummyEnvironment implements Environment {
@Override
public Map<String, Future<Path>> getDistributedCacheEntries() {
return null;
return Collections.emptyMap();
}
@Override
......
......@@ -41,7 +41,14 @@ public class StoppableStreamSource<OUT, SRC extends SourceFunction<OUT> & Stoppa
super(sourceFunction);
}
/**
* Marks the source a stopped and calls {@link StoppableFunction#stop()} on the user function.
*/
public void stop() {
// important: marking the source as stopped has to happen before the function is stopped.
// the flag that tracks this status is volatile, so the memory model also guarantees
// the happens-before relationship
markCanceledOrStopped();
userFunction.stop();
}
}
......@@ -42,14 +42,19 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
private transient SourceFunction.SourceContext<OUT> ctx;
private transient volatile boolean canceledOrStopped = false;
public StreamSource(SRC sourceFunction) {
super(sourceFunction);
this.chainingStrategy = ChainingStrategy.HEAD;
}
public void run(final Object lockingObject, final Output<StreamRecord<OUT>> collector) throws Exception {
final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
final SourceFunction.SourceContext<OUT> ctx;
switch (timeCharacteristic) {
case EventTime:
......@@ -66,19 +71,60 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
throw new Exception(String.valueOf(timeCharacteristic));
}
userFunction.run(ctx);
ctx.close();
// copy to a field to give the 'cancel()' method access
this.ctx = ctx;
try {
userFunction.run(ctx);
// if we get here, then the user function either exited after being done (finite source)
// or the function was canceled or stopped. For the finite source case, we should emit
// a final watermark that indicates that we reached the end of event-time
if (!isCanceledOrStopped()) {
ctx.emitWatermark(Watermark.MAX_WATERMARK);
}
} finally {
// make sure that the context is closed in any case
ctx.close();
}
}
public void cancel() {
// important: marking the source as stopped has to happen before the function is stopped.
// the flag that tracks this status is volatile, so the memory model also guarantees
// the happens-before relationship
markCanceledOrStopped();
userFunction.cancel();
// the context may not be initialized if the source was never running.
if (ctx != null) {
ctx.close();
}
}
/**
* Marks this source as canceled or stopped.
*
* <p>This indicates that any exit of the {@link #run(Object, Output)} method
* cannot be interpreted as the result of a finite source.
*/
protected void markCanceledOrStopped() {
this.canceledOrStopped = true;
}
/**
* Checks whether the source has been canceled or stopped.
* @return True, if the source is canceled or stopped, false is not.
*/
protected boolean isCanceledOrStopped() {
return canceledOrStopped;
}
/**
* Checks whether any asynchronous thread (checkpoint trigger, timer, watermark generator, ...)
* has caused an exception. If one of these threads caused an exception, this method will
* throw that exception.
*/
void checkAsyncException() {
getContainingTask().checkTimerException();
}
......
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
......@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
......@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.watermark;
import org.apache.flink.annotation.PublicEvolving;
......@@ -38,11 +39,15 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
* <p>
* When a source closes it will emit a final watermark with timestamp {@code Long.MAX_VALUE}. When
* an operator receives this it will know that no more input will be arriving in the future.
*
*/
@PublicEvolving
public class Watermark extends StreamElement {
public final class Watermark extends StreamElement {
/** The watermark that signifies end-of-event-time */
public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
// ------------------------------------------------------------------------
/** The timestamp of the watermark */
private final long timestamp;
......@@ -60,6 +65,8 @@ public class Watermark extends StreamElement {
return timestamp;
}
// ------------------------------------------------------------------------
@Override
public boolean equals(Object o) {
return this == o ||
......
......@@ -61,9 +61,9 @@ public class SourceFunctionTest {
assertEquals(expectedList, actualList);
}
@Test
public void socketTextStreamTest() throws Exception {
// TODO: does not work because we cannot set the internal socket anymore
// TODO: does not work because we cannot set the internal socket anymore
// @Test
// public void socketTextStreamTest() throws Exception {
// List<String> expectedList = Arrays.asList("a", "b", "c");
// List<String> actualList = new ArrayList<String>();
//
......@@ -80,5 +80,5 @@ public class SourceFunctionTest {
// actualList.add(source.next());
// }
// assertEquals(expectedList, actualList);
}
// }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.runtime.operators;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StoppableStreamSource;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@SuppressWarnings("serial")
public class StreamSourceOperatorTest {
@Test
public void testEmitMaxWatermarkForFiniteSource() throws Exception {
// regular stream source operator
StreamSource<String, FiniteSource<String>> operator =
new StreamSource<>(new FiniteSource<String>());
final List<StreamElement> output = new ArrayList<>();
setupSourceOperator(operator);
operator.run(new Object(), new CollectorOutput<String>(output));
assertEquals(1, output.size());
assertEquals(Watermark.MAX_WATERMARK, output.get(0));
}
@Test
public void testNoMaxWatermarkOnImmediateCancel() throws Exception {
final List<StreamElement> output = new ArrayList<>();
// regular stream source operator
final StreamSource<String, InfiniteSource<String>> operator =
new StreamSource<>(new InfiniteSource<String>());
setupSourceOperator(operator);
operator.cancel();
// run and exit
operator.run(new Object(), new CollectorOutput<String>(output));
assertTrue(output.isEmpty());
}
@Test
public void testNoMaxWatermarkOnAsyncCancel() throws Exception {
final List<StreamElement> output = new ArrayList<>();
final Thread runner = Thread.currentThread();
// regular stream source operator
final StreamSource<String, InfiniteSource<String>> operator =
new StreamSource<>(new InfiniteSource<String>());
setupSourceOperator(operator);
// trigger an async cancel in a bit
new Thread("canceler") {
@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException ignored) {}
operator.cancel();
runner.interrupt();
}
}.start();
// run and wait to be canceled
try {
operator.run(new Object(), new CollectorOutput<String>(output));
}
catch (InterruptedException ignored) {}
assertTrue(output.isEmpty());
}
@Test
public void testNoMaxWatermarkOnImmediateStop() throws Exception {
final List<StreamElement> output = new ArrayList<>();
// regular stream source operator
final StoppableStreamSource<String, InfiniteSource<String>> operator =
new StoppableStreamSource<>(new InfiniteSource<String>());
setupSourceOperator(operator);
operator.stop();
// run and stop
operator.run(new Object(), new CollectorOutput<String>(output));
assertTrue(output.isEmpty());
}
@Test
public void testNoMaxWatermarkOnAsyncStop() throws Exception {
final List<StreamElement> output = new ArrayList<>();
// regular stream source operator
final StoppableStreamSource<String, InfiniteSource<String>> operator =
new StoppableStreamSource<>(new InfiniteSource<String>());
setupSourceOperator(operator);
// trigger an async cancel in a bit
new Thread("canceler") {
@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException ignored) {}
operator.stop();
}
}.start();
// run and wait to be stopped
operator.run(new Object(), new CollectorOutput<String>(output));
assertTrue(output.isEmpty());
}
// ------------------------------------------------------------------------
@SuppressWarnings("unchecked")
private static <T> void setupSourceOperator(StreamSource<T, ?> operator) {
ExecutionConfig executionConfig = new ExecutionConfig();
StreamConfig cfg = new StreamConfig(new Configuration());
cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
Environment env = new DummyEnvironment("MockTwoInputTask", 1, 0);
StreamTask<?, ?> mockTask = mock(StreamTask.class);
when(mockTask.getName()).thenReturn("Mock Task");
when(mockTask.getCheckpointLock()).thenReturn(new Object());
when(mockTask.getConfiguration()).thenReturn(cfg);
when(mockTask.getEnvironment()).thenReturn(env);
when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
operator.setup(mockTask, cfg, (Output< StreamRecord<T>>) mock(Output.class));
}
// ------------------------------------------------------------------------
private static final class FiniteSource<T> implements SourceFunction<T>, StoppableFunction {
@Override
public void run(SourceContext<T> ctx) {}
@Override
public void cancel() {}
@Override
public void stop() {}
}
private static final class InfiniteSource<T> implements SourceFunction<T>, StoppableFunction {
private volatile boolean running = true;
@Override
public void run(SourceContext<T> ctx) throws Exception {
while (running) {
Thread.sleep(20);
}
}
@Override
public void cancel() {
running = false;
}
@Override
public void stop() {
running = false;
}
}
// ------------------------------------------------------------------------
private static class CollectorOutput<T> implements Output<StreamRecord<T>> {
private final List<StreamElement> list;
private CollectorOutput(List<StreamElement> list) {
this.list = list;
}
@Override
public void emitWatermark(Watermark mark) {
list.add(mark);
}
@Override
public void collect(StreamRecord<T> record) {
list.add(record);
}
@Override
public void close() {}
}
}
......@@ -74,8 +74,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
ctx.collect(Tuple2.of("a", 7));
ctx.collect(Tuple2.of("a", 8));
// so we get a final big watermark
ctx.collect(Tuple2.of("a", 20));
// source is finite, so it will have an implicit MAX watermark when it finishes
}
@Override
......@@ -96,8 +95,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
ctx.collect(Tuple2.of("c", 7));
ctx.collect(Tuple2.of("c", 8));
// so we get a final big watermark
ctx.collect(Tuple2.of("a", 20));
// source is finite, so it will have an implicit MAX watermark when it finishes
}
@Override
......@@ -172,8 +170,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
ctx.collect(Tuple3.of("a", "j", 7));
ctx.collect(Tuple3.of("a", "k", 8));
// so we get a final big watermark
ctx.collect(Tuple3.of("a", "k", 20));
// source is finite, so it will have an implicit MAX watermark when it finishes
}
@Override
......@@ -194,8 +191,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
ctx.collect(Tuple3.of("a", "x", 6));
ctx.collect(Tuple3.of("a", "z", 8));
// so we get a final high watermark
ctx.collect(Tuple3.of("a", "z", 20));
// source is finite, so it will have an implicit MAX watermark when it finishes
}
@Override
......@@ -272,8 +268,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
ctx.collect(Tuple3.of("a", "j", 7));
ctx.collect(Tuple3.of("a", "k", 8));
// so we get a final high watermark
ctx.collect(Tuple3.of("a", "k", 20));
// source is finite, so it will have an implicit MAX watermark when it finishes
}
@Override
......
......@@ -74,13 +74,12 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
ctx.collect(Tuple2.of("a", 7));
ctx.collect(Tuple2.of("a", 8));
// so that we get a high final watermark to process the previously sent elements
ctx.collect(Tuple2.of("a", 20));
// source is finite, so it will have an implicit MAX watermark when it finishes
}
@Override
public void cancel() {
}
public void cancel() {}
}).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());
source1
......@@ -139,8 +138,7 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
ctx.collect(Tuple2.of("b", 5));
ctx.collect(Tuple2.of("a", 5));
// so that we get a high final watermark to process the previously sent elements
ctx.collect(Tuple2.of("a", 20));
// source is finite, so it will have an implicit MAX watermark when it finishes
}
@Override
......
......@@ -18,8 +18,10 @@
package org.apache.flink.streaming.timestamp;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
......@@ -52,7 +54,8 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
......@@ -156,14 +159,88 @@ public class TimestampITCase {
System.err.println(CustomOperator.finalWatermarks[i].get(k));
}
Assert.fail("Wrong watermark.");
fail("Wrong watermark.");
}
}
assertFalse(CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size()-1).equals(new Watermark(Long.MAX_VALUE)));
assertEquals(Watermark.MAX_WATERMARK,
CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size()-1));
}
}
@Test
public void testWatermarkPropagationNoFinalWatermarkOnStop() throws Exception {
// for this test to work, we need to be sure that no other jobs are being executed
while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
Thread.sleep(100);
}
final int NUM_WATERMARKS = 10;
long initialTime = 0L;
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getLeaderRPCPort());
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(PARALLELISM);
env.getConfig().disableSysoutLogging();
DataStream<Integer> source1 = env.addSource(new MyTimestampSourceInfinite(initialTime, NUM_WATERMARKS));
DataStream<Integer> source2 = env.addSource(new MyTimestampSourceInfinite(initialTime, NUM_WATERMARKS / 2));
source1.union(source2)
.map(new IdentityMap())
.connect(source2).map(new IdentityCoMap())
.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
.addSink(new NoOpSink<Integer>());
new Thread("stopper") {
@Override
public void run() {
try {
// try until we get the running jobs
List<JobID> running;
while ((running = cluster.getCurrentlyRunningJobsJava()).isEmpty()) {
Thread.sleep(100);
}
JobID id = running.get(0);
// send stop until the job is stopped
do {
cluster.stopJob(id);
Thread.sleep(50);
} while (!cluster.getCurrentlyRunningJobsJava().isEmpty());
}
catch (Throwable t) {
t.printStackTrace();
}
}
}.start();
env.execute();
// verify that all the watermarks arrived at the final custom operator
for (int i = 0; i < PARALLELISM; i++) {
// we are only guaranteed to see NUM_WATERMARKS / 2 watermarks because the
// other source stops emitting after that
for (int j = 0; j < NUM_WATERMARKS / 2; j++) {
if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) {
System.err.println("All Watermarks: ");
for (int k = 0; k <= NUM_WATERMARKS / 2; k++) {
System.err.println(CustomOperator.finalWatermarks[i].get(k));
}
fail("Wrong watermark.");
}
}
assertNotEquals(Watermark.MAX_WATERMARK,
CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size()-1));
}
}
/**
* These check whether timestamps are properly assigned at the sources and handled in
......@@ -200,8 +277,7 @@ public class TimestampITCase {
@Test
public void testDisabledTimestamps() throws Exception {
final int NUM_ELEMENTS = 10;
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getLeaderRPCPort());
......@@ -222,7 +298,7 @@ public class TimestampITCase {
}
/**
* This thests whether timestamps are properly extracted in the timestamp
* This tests whether timestamps are properly extracted in the timestamp
* extractor and whether watermarks are also correctly forwared from this with the auto watermark
* interval.
*/
......@@ -279,7 +355,10 @@ public class TimestampITCase {
Assert.fail("Wrong watermark. Expected: " + j + " Found: " + wm + " All: " + CustomOperator.finalWatermarks[0]);
}
}
assertFalse(CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size()-1).equals(new Watermark(Long.MAX_VALUE)));
// the input is finite, so it should have a MAX Watermark
assertEquals(Watermark.MAX_WATERMARK,
CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1));
}
/**
......@@ -339,7 +418,10 @@ public class TimestampITCase {
Assert.fail("Wrong watermark.");
}
}
assertFalse(CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size()-1).equals(new Watermark(Long.MAX_VALUE)));
// the input is finite, so it should have a MAX Watermark
assertEquals(Watermark.MAX_WATERMARK,
CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1));
}
/**
......@@ -400,7 +482,9 @@ public class TimestampITCase {
Assert.fail("Wrong watermark.");
}
}
assertFalse(CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size()-1).equals(new Watermark(Long.MAX_VALUE)));
// the input is finite, so it should have a MAX Watermark
assertEquals(Watermark.MAX_WATERMARK,
CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1));
}
/**
......@@ -710,8 +794,8 @@ public class TimestampITCase {
public static class MyTimestampSource implements SourceFunction<Integer> {
private long initialTime;
private int numWatermarks;
private final long initialTime;
private final int numWatermarks;
public MyTimestampSource(long initialTime, int numWatermarks) {
this.initialTime = initialTime;
......@@ -730,6 +814,41 @@ public class TimestampITCase {
public void cancel() {}
}
public static class MyTimestampSourceInfinite implements SourceFunction<Integer>, StoppableFunction {
private final long initialTime;
private final int numWatermarks;
private volatile boolean running = true;
public MyTimestampSourceInfinite(long initialTime, int numWatermarks) {
this.initialTime = initialTime;
this.numWatermarks = numWatermarks;
}
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
for (int i = 0; i < numWatermarks; i++) {
ctx.collectWithTimestamp(i, initialTime + i);
ctx.emitWatermark(new Watermark(initialTime + i));
}
while (running) {
Thread.sleep(20);
}
}
@Override
public void cancel() {
running = false;
}
@Override
public void stop() {
running = false;
}
}
public static class MyNonWatermarkingSource implements SourceFunction<Integer> {
int numWatermarks;
......
......@@ -102,7 +102,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
}
}).when(mockTask).createStateBackend(any(String.class), any(TypeSerializer.class));
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage(), e);
}
doAnswer(new Answer<Void>() {
......
......@@ -28,12 +28,9 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.mockito.stubbing.OngoingStubbing;
import java.util.concurrent.ConcurrentLinkedQueue;
......
......@@ -56,8 +56,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
ctx.collect(("a", 7))
ctx.collect(("a", 8))
// so that we get a high final watermark to process the previously sent elements
ctx.collect(("a", 20))
// source is finite, so it will have an implicit MAX watermark when it finishes
}
def cancel() {}
......@@ -73,8 +72,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
ctx.collect(("c", 7))
ctx.collect(("c", 8))
// so that we get a high final watermark to process the previously sent elements
ctx.collect(("c", 20))
// source is finite, so it will have an implicit MAX watermark when it finishes
}
def cancel() {
......@@ -126,8 +124,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
ctx.collect(("a", "j", 7))
ctx.collect(("a", "k", 8))
// so that we get a high final watermark to process the previously sent elements
ctx.collect(("a", "k", 20))
// source is finite, so it will have an implicit MAX watermark when it finishes
}
def cancel() {}
......@@ -145,8 +142,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
ctx.collect(("a", "x", 6))
ctx.collect(("a", "z", 8))
// so that we get a high final watermark to process the previously sent elements
ctx.collect(("a", "z", 20))
// source is finite, so it will have an implicit MAX watermark when it finishes
}
def cancel() {}
......@@ -208,8 +204,7 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
ctx.collect(("a", "j", 7))
ctx.collect(("a", "k", 8))
// so that we get a high final watermark to process the previously sent elements
ctx.collect(("a", "k", 20))
// source is finite, so it will have an implicit MAX watermark when it finishes
}
def cancel() {}
......
......@@ -59,8 +59,7 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
ctx.collect(("a", 7))
ctx.collect(("a", 8))
// so we get a big watermark to trigger processing of the previous elements
ctx.collect(("a", 20))
// source is finite, so it will have an implicit MAX watermark when it finishes
}
def cancel() {
......@@ -107,8 +106,7 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
ctx.collect(("b", 5))
ctx.collect(("a", 5))
// so we get a big watermark to trigger processing of the previous elements
ctx.collect(("a", 20))
// source is finite, so it will have an implicit MAX watermark when it finishes
}
def cancel() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册