提交 6051f5cb 编写于 作者: K Kostas Kloudas

[hotfix] Fixing test instabilities in streaming.runtime.tasks.

上级 971c5633
......@@ -123,6 +123,8 @@ public class OneInputStreamTaskTest extends TestLogger {
expectedOutput.add(new StreamRecord<String>("Hello", initialTime + 1));
expectedOutput.add(new StreamRecord<String>("Ciao", initialTime + 2));
testHarness.waitForInputProcessing();
testHarness.endInput();
testHarness.waitForTaskCompletion();
......@@ -564,6 +566,7 @@ public class OneInputStreamTaskTest extends TestLogger {
assertEquals(numberChainedTasks, TestingStreamOperator.numberRestoreCalls);
TestingStreamOperator.numberRestoreCalls = 0;
TestingStreamOperator.numberSnapshotCalls = 0;
}
@Test
......@@ -654,6 +657,9 @@ public class OneInputStreamTaskTest extends TestLogger {
assertEquals(numRecords, numRecordsInCounter.getCount());
assertEquals(numRecords * 2 * 2 * 2, numRecordsOutCounter.getCount());
testHarness.endInput();
testHarness.waitForTaskCompletion();
}
static class DuplicatingOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
......@@ -889,6 +895,11 @@ public class OneInputStreamTaskTest extends TestLogger {
public static boolean openCalled = false;
public static boolean closeCalled = false;
TestOpenCloseMapFunction() {
openCalled = false;
closeCalled = false;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
......
......@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.BlockingQueue;
......@@ -46,8 +47,14 @@ import static org.junit.Assert.assertTrue;
@SuppressWarnings("serial")
public class SourceExternalCheckpointTriggerTest {
private static final OneShotLatch ready = new OneShotLatch();
private static final MultiShotLatch sync = new MultiShotLatch();
private static OneShotLatch ready = new OneShotLatch();
private static MultiShotLatch sync = new MultiShotLatch();
@Before
public void resetLatches() {
ready = new OneShotLatch();
sync = new MultiShotLatch();
}
@Test
@SuppressWarnings("unchecked")
......
......@@ -255,6 +255,11 @@ public class SourceStreamTaskTest {
public static boolean openCalled = false;
public static boolean closeCalled = false;
OpenCloseTestSource() {
openCalled = false;
closeCalled = false;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
......
......@@ -103,11 +103,13 @@ public class TwoInputStreamTaskTest {
expectedOutput.add(new StreamRecord<String>("1337", initialTime + 2));
testHarness.waitForInputProcessing();
testHarness.endInput();
testHarness.waitForTaskCompletion();
Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseMapFunction.closeCalled);
Assert.assertTrue("RichFunction methods were not called.", TestOpenCloseMapFunction.closeCalled);
TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
}
......@@ -435,6 +437,9 @@ public class TwoInputStreamTaskTest {
assertEquals(numRecords1 + numRecords2, numRecordsInCounter.getCount());
assertEquals((numRecords1 + numRecords2) * 2 * 2 * 2, numRecordsOutCounter.getCount());
testHarness.endInput();
testHarness.waitForTaskCompletion();
}
static class DuplicatingOperator extends AbstractStreamOperator<String> implements TwoInputStreamOperator<String, String, String> {
......@@ -562,6 +567,11 @@ public class TwoInputStreamTaskTest {
public static boolean openCalled = false;
public static boolean closeCalled = false;
TestOpenCloseMapFunction() {
openCalled = false;
closeCalled = false;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册