提交 d5f2647a 编写于 作者: L lincoln-lil 提交者: Stephan Ewen

[FLINK-5883] [core] Re-adding the Exception-thrown code for...

[FLINK-5883] [core] Re-adding the Exception-thrown code for ListKeyGroupedIterator when the iterator is requested the second time

This closes #3392
上级 3f700caf
......@@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.util;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.util.TraversableOnceException;
import java.io.IOException;
import java.util.Iterator;
......@@ -78,6 +79,7 @@ public final class ListKeyGroupedIterator<E> {
this.comparator.setReference(this.lookahead);
this.valuesIterator.next = this.lookahead;
this.lookahead = null;
this.valuesIterator.iteratorAvailable = true;
return true;
}
......@@ -96,6 +98,7 @@ public final class ListKeyGroupedIterator<E> {
// the keys do not match, so we have a new group. store the current key
this.comparator.setReference(next);
this.valuesIterator.next = next;
this.valuesIterator.iteratorAvailable = true;
return true;
}
}
......@@ -160,7 +163,9 @@ public final class ListKeyGroupedIterator<E> {
public final class ValuesIterator implements Iterator<E>, Iterable<E> {
private E next;
private boolean iteratorAvailable = true;
private final TypeSerializer<E> serializer;
private ValuesIterator(E first, TypeSerializer<E> serializer) {
......@@ -191,7 +196,12 @@ public final class ListKeyGroupedIterator<E> {
@Override
public Iterator<E> iterator() {
return this;
if (iteratorAvailable) {
iteratorAvailable = false;
return this;
} else {
throw new TraversableOnceException();
}
}
public E getCurrent() {
......
......@@ -19,6 +19,7 @@
package org.apache.flink.test.iterative;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
......@@ -115,11 +116,12 @@ public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase
@Override
public void coGroup(Iterable<Tuple2<Long, Long>> candidates, Iterable<Tuple2<Long, Long>> current, Collector<Tuple2<Long, Long>> out) {
if (!current.iterator().hasNext()) {
Iterator<Tuple2<Long, Long>> iterator = current.iterator();
if (!iterator.hasNext()) {
throw new RuntimeException("Error: Id not encountered before.");
}
Tuple2<Long, Long> old = current.iterator().next();
Tuple2<Long, Long> old = iterator.next();
long minimumComponentID = Long.MAX_VALUE;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册