提交 69f7f6d9 编写于 作者: A Aljoscha Krettek

[FLINK-2845] Fix TimestampITCase.testWatermarkPropagation()

上级 f8f747f2
......@@ -47,6 +47,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
......@@ -141,42 +142,11 @@ public class TimestampITCase {
// verify that all the watermarks arrived at the final custom operator
for (int i = 0; i < PARALLELISM; i++) {
// There can be two cases, either we get NUM_WATERMARKS + 1 watermarks or
// (NUM_WATERMARKS / 2) + 1 watermarks. This depends on which source get's to run first.
// If source1 runs first we jump directly to +Inf and skip all the intermediate
// watermarks. If source2 runs first we see the intermediate watermarks from
// NUM_WATERMARKS/2 to +Inf.
if (CustomOperator.finalWatermarks[i].size() == NUM_WATERMARKS + 1) {
for (int j = 0; j < NUM_WATERMARKS; j++) {
if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) {
System.err.println("All Watermarks: ");
for (int k = 0; k <= NUM_WATERMARKS; k++) {
System.err.println(CustomOperator.finalWatermarks[i].get(k));
}
Assert.fail("Wrong watermark.");
}
}
if (!CustomOperator.finalWatermarks[i].get(NUM_WATERMARKS).equals(new Watermark(Long.MAX_VALUE))) {
System.err.println("All Watermarks: ");
for (int k = 0; k <= NUM_WATERMARKS; k++) {
System.err.println(CustomOperator.finalWatermarks[i].get(k));
}
Assert.fail("Wrong watermark.");
}
} else {
for (int j = 0; j < NUM_WATERMARKS / 2; j++) {
if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) {
System.err.println("All Watermarks: ");
for (int k = 0; k <= NUM_WATERMARKS / 2; k++) {
System.err.println(CustomOperator.finalWatermarks[i].get(k));
}
Assert.fail("Wrong watermark.");
}
}
if (!CustomOperator.finalWatermarks[i].get(NUM_WATERMARKS / 2).equals(new Watermark(Long.MAX_VALUE))) {
// we are only guaranteed to see NUM_WATERMARKS / 2 watermarks in order, because
// after that source2 emits Long.MAX_VALUE which could match with an arbitrary
// mark from source 1, for example, we could see 0,1,2,4,5,7,MAX
for (int j = 0; j < NUM_WATERMARKS / 2; j++) {
if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) {
System.err.println("All Watermarks: ");
for (int k = 0; k <= NUM_WATERMARKS / 2; k++) {
System.err.println(CustomOperator.finalWatermarks[i].get(k));
......@@ -184,9 +154,14 @@ public class TimestampITCase {
Assert.fail("Wrong watermark.");
}
}
if (!CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size() - 1).equals(new Watermark(Long.MAX_VALUE))) {
System.err.println("All Watermarks: ");
for (int k = 0; k <= NUM_WATERMARKS; k++) {
System.err.println(CustomOperator.finalWatermarks[i].get(k));
}
Assert.fail("Wrong watermark.");
}
}
}
......@@ -615,6 +590,9 @@ public class TimestampITCase {
@Override
public void processWatermark(Watermark mark) throws Exception {
for (Watermark previousMark: watermarks) {
assertTrue(previousMark.getTimestamp() < mark.getTimestamp());
}
watermarks.add(mark);
latch.trigger();
output.emitWatermark(mark);
......@@ -623,7 +601,7 @@ public class TimestampITCase {
@Override
public void open() throws Exception {
super.open();
watermarks = new ArrayList<Watermark>();
watermarks = new ArrayList<>();
}
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册