提交 361947d6 编写于 作者: S Stephan Ewen

[FLINK-2648] [tests] Fix flaky CombineTaskTest and improve cancelling in GroupReduceCombineDriver

上级 4b6eae5a
......@@ -184,7 +184,9 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
}
// sort, combine, and send the final batch
sortAndCombine();
if (running) {
sortAndCombine();
}
}
private void sortAndCombine() throws Exception {
......
......@@ -53,10 +53,12 @@ public class CombineTaskTest
private final ArrayList<Tuple2<Integer, Integer>> outList = new ArrayList<Tuple2<Integer, Integer>>();
@SuppressWarnings("unchecked")
private final TypeSerializer<Tuple2<Integer, Integer>> serializer = new TupleSerializer<Tuple2<Integer, Integer>>(
(Class<Tuple2<Integer, Integer>>) (Class<?>) Tuple2.class,
new TypeSerializer<?>[] { IntSerializer.INSTANCE, IntSerializer.INSTANCE });
private final TypeComparator<Tuple2<Integer, Integer>> comparator = new TupleComparator<Tuple2<Integer, Integer>>(
new int[]{0},
new TypeComparator<?>[] { new IntComparator(true) },
......@@ -179,9 +181,14 @@ public class CombineTaskTest
testTask.cancel();
// make sure it reacts to the canceling in some time
taskRunner.join(5000);
long deadline = System.currentTimeMillis() + 10000;
do {
taskRunner.interrupt();
taskRunner.join(5000);
}
while (taskRunner.isAlive() && System.currentTimeMillis() < deadline);
assertFalse("Task did not cancel properly within in 5 seconds.", taskRunner.isAlive());
assertFalse("Task did not cancel properly within in 10 seconds.", taskRunner.isAlive());
}
catch (Exception e) {
e.printStackTrace();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册