From 3832d7b7a0216e5cf6e5100bb56b5a703d4fb79e Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Tue, 2 Dec 2014 17:34:46 +0100 Subject: [PATCH] [FLINK-1137] Enhance MutableObjectIterator with non-reuse next() This is in preparation for configurable object-reuse mode. We previously referred to this as mutable object vs. mutable object safe mode or some such thing. --- .../flink/streaming/io/CoReaderIterator.java | 9 +-- .../flink/streaming/util/MockContext.java | 11 ++++ .../flink/util/MutableObjectIterator.java | 20 +++++-- .../BroadcastVariableMaterialization.java | 8 +-- .../disk/ChannelReaderInputViewIterator.java | 14 +++++ .../runtime/io/disk/InputViewIterator.java | 9 +++ .../iterative/io/HashPartitionIterator.java | 33 +++++++++++ .../flink/runtime/operators/DataSinkTask.java | 5 +- .../operators/hash/CompactingHashTable.java | 21 +++++++ .../runtime/operators/hash/HashPartition.java | 23 +++++++- .../operators/hash/MutableHashTable.java | 50 ++++++++++++++++ .../AbstractBlockResettableIterator.java | 10 ++++ .../BlockResettableMutableObjectIterator.java | 36 ++++++++++- ...illingResettableMutableObjectIterator.java | 35 ++++++++++- .../sort/FixedLengthRecordSorter.java | 27 +++++++++ .../runtime/operators/sort/MergeIterator.java | 29 +++++++++ .../operators/sort/NormalizedKeySorter.java | 30 ++++++++++ .../operators/util/ReaderIterator.java | 33 +++++++++-- .../plugable/DeserializationDelegate.java | 39 +----------- .../NonReusingDeserializationDelegate.java | 57 ++++++++++++++++++ .../ReusingDeserializationDelegate.java | 59 +++++++++++++++++++ .../util/EmptyMutableObjectIterator.java | 11 ++++ .../util/KeyGroupedMutableObjectIterator.java | 37 ++++++++++++ .../util/RegularToMutableObjectIterator.java | 9 +++ .../operators/hash/HashTableITCase.java | 30 ++++++++++ .../sort/MassiveStringSortingITCase.java | 19 ++++++ .../operators/sort/MergeIteratorTest.java | 17 ++++++ .../operators/sort/MockRecordReader.java | 26 ++++++++ 
.../DelayingInfinitiveInputIterator.java | 10 +++- .../testutils/InfiniteInputIterator.java | 9 +++ .../MutableObjectIteratorWrapper.java | 13 ++++ .../testutils/RandomIntPairGenerator.java | 12 ++++ .../runtime/operators/testutils/TestData.java | 40 ++++++++++++- .../testutils/UniformIntPairGenerator.java | 33 +++++++++++ .../testutils/UniformRecordGenerator.java | 34 +++++++++++ .../testutils/UniformStringPairGenerator.java | 33 +++++++++++ .../operators/testutils/UnionIterator.java | 18 ++++++ .../util/KeyGroupedIteratorImmutableTest.java | 14 +++++ .../runtime/util/KeyGroupedIteratorTest.java | 15 +++++ .../misc/MassiveCaseClassSortingITCase.scala | 10 ++++ 40 files changed, 884 insertions(+), 64 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/plugable/NonReusingDeserializationDelegate.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/plugable/ReusingDeserializationDelegate.java diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java index e4110e7280b..ed90c03d6c3 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.plugable.DeserializationDelegate; +import org.apache.flink.runtime.plugable.ReusingDeserializationDelegate; /** * A CoReaderIterator wraps a {@link CoRecordReader} producing records of two @@ -31,15 +32,15 @@ public class CoReaderIterator { private final CoRecordReader, DeserializationDelegate> reader; // the // source - protected final DeserializationDelegate 
delegate1; - protected final DeserializationDelegate delegate2; + protected final ReusingDeserializationDelegate delegate1; + protected final ReusingDeserializationDelegate delegate2; public CoReaderIterator( CoRecordReader, DeserializationDelegate> reader, TypeSerializer serializer1, TypeSerializer serializer2) { this.reader = reader; - this.delegate1 = new DeserializationDelegate(serializer1); - this.delegate2 = new DeserializationDelegate(serializer2); + this.delegate1 = new ReusingDeserializationDelegate(serializer1); + this.delegate2 = new ReusingDeserializationDelegate(serializer2); } public int next(T1 target1, T2 target2) throws IOException { diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java index 87bedb2af1e..5537052d82f 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java @@ -72,6 +72,17 @@ public class MockContext implements StreamTaskContext { } return reuse; } + + @Override + public StreamRecord next() throws IOException { + if (listIterator.hasNext()) { + StreamRecord result = new StreamRecord(); + result.setObject(listIterator.next()); + return result; + } else { + return null; + } + } } public List getOutputs() { diff --git a/flink-core/src/main/java/org/apache/flink/util/MutableObjectIterator.java b/flink-core/src/main/java/org/apache/flink/util/MutableObjectIterator.java index b7b41d40fff..ea5ed78b17d 100644 --- a/flink-core/src/main/java/org/apache/flink/util/MutableObjectIterator.java +++ b/flink-core/src/main/java/org/apache/flink/util/MutableObjectIterator.java @@ -21,16 +21,18 @@ import java.io.IOException; /** * A simple iterator interface. 
The key differences to the {@link java.util.Iterator} are that this - * iterator accepts an object into which it can place the content if the object is mutable, and that - * it consolidates the logic in a single next() function, rather than in two different - * functions such as hasNext() and next(). + * iterator also has a next() method that accepts an object into which it can + * place the content if the object is mutable, and that it consolidates the logic in a single + * next() function, rather than in two different functions such as + * hasNext() and next(). * * @param The element type of the collection iterated over. */ public interface MutableObjectIterator { /** - * Gets the next element from the collection. The contents of that next element is put into the given target object. + * Gets the next element from the collection. The contents of that next element are put into the + * given target object. * * @param reuse The target object into which to place next element if E is mutable. * @return The filled object or null if the iterator is exhausted @@ -39,4 +41,14 @@ public interface MutableObjectIterator { * serialization / deserialization logic */ public E next(E reuse) throws IOException; + + /** + * Gets the next element from the collection. The reader must create a new instance itself. 
+ * + * @return The object or null if the iterator is exhausted + * + * @throws IOException Thrown, if a problem occurred in the underlying I/O layer or in the + * serialization / deserialization logic + */ + public E next() throws IOException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java index f7aebb69fa4..5b5f2f25c45 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java @@ -30,7 +30,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.runtime.io.network.api.MutableReader; import org.apache.flink.runtime.operators.RegularPactTask; import org.apache.flink.runtime.operators.util.ReaderIterator; -import org.apache.flink.runtime.plugable.DeserializationDelegate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,10 +95,11 @@ public class BroadcastVariableMaterialization { try { @SuppressWarnings("unchecked") - final MutableReader> typedReader = (MutableReader>) reader; + final MutableReader typedReader = (MutableReader) reader; @SuppressWarnings("unchecked") final TypeSerializer serializer = ((TypeSerializerFactory) serializerFactory).getSerializer(); - + + @SuppressWarnings("unchecked") final ReaderIterator readerIterator = new ReaderIterator(typedReader, serializer); if (materializer) { @@ -111,7 +111,7 @@ public class BroadcastVariableMaterialization { ArrayList data = new ArrayList(); T element; - while ((element = readerIterator.next(serializer.createInstance())) != null) { + while ((element = readerIterator.next()) != null) { data.add(element); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java index f38aa2535f0..6007db913b6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java @@ -92,4 +92,18 @@ public class ChannelReaderInputViewIterator implements MutableObjectIterator< return null; } } + + @Override + public E next() throws IOException + { + try { + return this.accessors.deserialize(this.inView); + } catch (EOFException eofex) { + final List freeMem = this.inView.close(); + if (this.freeMemTarget != null) { + this.freeMemTarget.addAll(freeMem); + } + return null; + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/InputViewIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/InputViewIterator.java index 74562c202b9..4f37549bffa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/InputViewIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/InputViewIterator.java @@ -45,4 +45,13 @@ public class InputViewIterator implements MutableObjectIterator return null; } } + + @Override + public E next() throws IOException { + try { + return this.serializer.deserialize(this.inputView); + } catch (EOFException e) { + return null; + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java index 2daf5cf68ef..209fb796fb8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java @@ -65,6 +65,24 @@ public class HashPartitionIterator implements MutableObjectIterator return reuse; } + @Override + public BT next() throws 
IOException { + if (currentPartition == null) { + if (!partitions.hasNext()) { + return null; + } + currentPartition = partitions.next(); + currentPartition.setReadPosition(0); + } + + try { + return serializer.deserialize(currentPartition); + } catch (EOFException e) { + return advanceAndRead(); + } + + } + /* jump to the next partition and continue reading from that */ private BT advanceAndRead(BT reuse) throws IOException { if (!partitions.hasNext()) { @@ -81,4 +99,19 @@ public class HashPartitionIterator implements MutableObjectIterator return reuse; } + /* jump to the next partition and continue reading from that */ + private BT advanceAndRead() throws IOException { + if (!partitions.hasNext()) { + return null; + } + currentPartition = partitions.next(); + currentPartition.setReadPosition(0); + + try { + return serializer.deserialize(currentPartition); + } catch (EOFException e) { + return advanceAndRead(); + } + } + } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java index 74c625f8bcb..610ab0661f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.operators; +import org.apache.flink.runtime.plugable.DeserializationDelegate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.io.CleanupWhenUnsuccessful; @@ -38,7 +39,6 @@ import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; import org.apache.flink.runtime.operators.util.CloseableInputProvider; import org.apache.flink.runtime.operators.util.ReaderIterator; import org.apache.flink.runtime.operators.util.TaskConfig; -import org.apache.flink.runtime.plugable.DeserializationDelegate; import org.apache.flink.util.MutableObjectIterator; /** @@ -334,9 +334,8 @@ 
public class DataSinkTask extends AbstractInvokable { this.inputTypeSerializerFactory = this.config.getInputSerializer(0, getUserCodeClassLoader()); - MutableReader> reader = (MutableReader>) inputReader; @SuppressWarnings({ "rawtypes" }) - final MutableObjectIterator iter = new ReaderIterator(reader, this.inputTypeSerializerFactory.getSerializer()); + final MutableObjectIterator iter = new ReaderIterator(inputReader, this.inputTypeSerializerFactory.getSerializer()); this.reader = (MutableObjectIterator)iter; // final sanity check diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java index 4f10e0b740a..4c39f28a7cb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java @@ -1238,6 +1238,27 @@ public class CompactingHashTable extends AbstractMutableHashTable{ } } + @Override + public T next() throws IOException { + // This is just a copy of the above, I wanted to keep the two separate, + // in case we change something later. Plus, it keeps the diff clean... 
:D + if(done || this.table.closed.get()) { + return null; + } else if(!cache.isEmpty()) { + return cache.remove(cache.size()-1); + } else { + while(!done && cache.isEmpty()) { + done = !fillCache(); + } + if(!done) { + return cache.remove(cache.size()-1); + } else { + return null; + } + } + } + + /** * utility function that inserts all entries from a bucket and its overflow buckets into the cache * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java index 08acd16544e..23a415ddd60 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java @@ -44,8 +44,8 @@ import org.apache.flink.util.MutableObjectIterator; /** * - * @param The type of the build side records. - * @param The type of the probe side records. + * @tparam BT The type of the build side records. + * @tparam PT The type of the probe side records. 
*/ public class HashPartition extends AbstractPagedInputView implements SeekableDataInputView { @@ -620,7 +620,24 @@ public class HashPartition extends AbstractPagedInputView implements See return null; } } - + + public final BT next() throws IOException + { + final int pos = getCurrentPositionInSegment(); + final int buffer = HashPartition.this.currentBufferNum; + + this.currentPointer = (((long) buffer) << HashPartition.this.segmentSizeBits) + pos; + + try { + BT result = HashPartition.this.buildSideSerializer.deserialize(HashPartition.this); + this.currentHashCode = this.comparator.hash(result); + return result; + } catch (EOFException eofex) { + return null; + } + } + + protected final long getPointer() { return this.currentPointer; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java index 67f1ea25ea6..e69ef172717 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java @@ -1386,6 +1386,56 @@ public class MutableHashTable implements MemorySegmentSource { this.numInSegment = 0; } } + + public BT next() { + // loop over all segments that are involved in the bucket (original bucket plus overflow buckets) + while (true) { + + while (this.numInSegment < this.countInSegment) { + + final int thisCode = this.bucket.getInt(this.posInSegment); + this.posInSegment += HASH_CODE_LEN; + + // check if the hash code matches + if (thisCode == this.searchHashCode) { + // get the pointer to the pair + final long pointer = this.bucket.getLong(this.bucketInSegmentOffset + + BUCKET_POINTER_START_OFFSET + (this.numInSegment * POINTER_LEN)); + this.numInSegment++; + + // deserialize the key to check whether it is really equal, or whether we had only a hash collision + try { + 
this.partition.setReadPosition(pointer); + BT result = this.accessor.deserialize(this.partition); + if (this.comparator.equalToReference(result)) { + this.lastPointer = pointer; + return result; + } + } + catch (IOException ioex) { + throw new RuntimeException("Error deserializing key or value from the hashtable: " + + ioex.getMessage(), ioex); + } + } + else { + this.numInSegment++; + } + } + + // this segment is done. check if there is another chained bucket + final long forwardPointer = this.bucket.getLong(this.bucketInSegmentOffset + HEADER_FORWARD_OFFSET); + if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) { + return null; + } + + final int overflowSegNum = (int) (forwardPointer >>> 32); + this.bucket = this.overflowSegments[overflowSegNum]; + this.bucketInSegmentOffset = (int) (forwardPointer & 0xffffffff); + this.countInSegment = this.bucket.getShort(this.bucketInSegmentOffset + HEADER_COUNT_OFFSET); + this.posInSegment = this.bucketInSegmentOffset + BUCKET_HEADER_LENGTH; + this.numInSegment = 0; + } + } public void writeBack(BT value) throws IOException { final SeekableDataOutputView outView = this.partition.getWriteView(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/AbstractBlockResettableIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/AbstractBlockResettableIterator.java index 421e0c7c1e6..730d19a8d2d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/AbstractBlockResettableIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/AbstractBlockResettableIterator.java @@ -173,4 +173,14 @@ abstract class AbstractBlockResettableIterator implements MemoryBlockIterator return null; } } + + protected T getNextRecord() throws IOException { + if (this.numRecordsReturned < this.numRecordsInBuffer) { + this.numRecordsReturned++; + return this.serializer.deserialize(this.readView); + } else { + return null; 
+ } + } + } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/BlockResettableMutableObjectIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/BlockResettableMutableObjectIterator.java index 4fe7dbba114..abce462ab4a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/BlockResettableMutableObjectIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/BlockResettableMutableObjectIterator.java @@ -103,7 +103,41 @@ public class BlockResettableMutableObjectIterator extends AbstractBlockResett } } } - + + @Override + public T next() throws IOException { + // check for the left over element + if (this.readPhase) { + return getNextRecord(); + } else { + // writing phase. check for leftover first + T result = null; + if (this.leftOverReturned) { + // get next record + if ((result = this.input.next()) != null) { + if (writeNextRecord(result)) { + return result; + } else { + // did not fit into memory, keep as leftover + this.leftOverRecord = this.serializer.copy(result); + this.leftOverReturned = false; + this.fullWriteBuffer = true; + return null; + } + } else { + this.noMoreBlocks = true; + return null; + } + } else if (this.fullWriteBuffer) { + return null; + } else { + this.leftOverReturned = true; + return this.leftOverRecord; + } + } + } + + public void reset() { // a reset always goes to the read phase diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIterator.java index 1d0b5407e25..5467ae91ac7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIterator.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIterator.java @@ -159,7 +159,40 @@ public class SpillingResettableMutableObjectIterator implements ResettableMut } } } - + + @Override + public T next() throws IOException { + T result = null; + if (this.inView != null) { + // reading, any subsequent pass + if (this.currentElementNum < this.elementCount) { + try { + result = this.serializer.deserialize(this.inView); + } catch (IOException e) { + throw new RuntimeException("SpillingIterator: Error reading element from buffer.", e); + } + this.currentElementNum++; + return result; + } else { + return null; + } + } else { + // writing pass (first) + if ((result = this.input.next()) != null) { + try { + this.serializer.serialize(result, this.buffer); + } catch (IOException e) { + throw new RuntimeException("SpillingIterator: Error writing element to buffer.", e); + } + this.elementCount++; + return result; + } else { + return null; + } + } + } + + public void consumeAndCacheRemainingData() throws IOException { // check that we are in the first pass and that more input data is left if (this.inView == null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java index 46399e91959..f05694b693e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java @@ -358,6 +358,33 @@ public final class FixedLengthRecordSorter implements InMemorySorter { return null; } } + + @Override + public T next() { + if (this.currentTotal < this.numTotal) { + + if (this.currentInSegment >= this.numPerSegment) { + this.currentInSegment = 0; + this.currentSegmentIndex++; + this.in.set(sortBuffer.get(this.currentSegmentIndex), 0); + } + + 
this.currentTotal++; + this.currentInSegment++; + + try { + // This might blow up in our face, but we ignore the readWithNormalization/ + // writeWithNormalization methods for now. + return this.comp.readWithKeyDenormalization(null, this.in); + } + catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + else { + return null; + } + } }; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java index 7a5012de04d..f3dc50eb457 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java @@ -93,6 +93,35 @@ public class MergeIterator implements MutableObjectIterator { } } + /** + * Gets the next smallest element, with respect to the definition of order implied by + * the {@link TypeSerializer} provided to this iterator. + * + * @return The next smallest element, or null if the iterator is exhausted. 
+ * + * @see org.apache.flink.util.MutableObjectIterator#next(java.lang.Object) + */ + @Override + public E next() throws IOException + { + if (this.heap.size() > 0) { + // get the smallest element + final HeadStream top = this.heap.peek(); + E result = this.serializer.copy(top.getHead()); + + // read an element + if (!top.nextHead()) { + this.heap.poll(); + } else { + this.heap.adjustTop(); + } + return result; + } + else { + return null; + } + } + // ============================================================================================ // Internal Classes that wrap the sorted input streams // ============================================================================================ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java index c69474a9951..c3827081304 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java @@ -322,6 +322,11 @@ public final class NormalizedKeySorter implements InMemorySorter { this.recordBuffer.setReadPosition(pointer); return this.serializer.deserialize(reuse, this.recordBuffer); } + + private final T getRecordFromBuffer(long pointer) throws IOException { + this.recordBuffer.setReadPosition(pointer); + return this.serializer.deserialize(this.recordBuffer); + } private final int compareRecords(long pointer1, long pointer2) { this.recordBuffer.setReadPosition(pointer1); @@ -431,6 +436,31 @@ public final class NormalizedKeySorter implements InMemorySorter { return null; } } + + @Override + public T next() + { + if (this.current < this.size) { + this.current++; + if (this.currentOffset > lastIndexEntryOffset) { + this.currentOffset = 0; + this.currentIndexSegment = sortIndex.get(++this.currentSegment); + } + + long pointer = 
this.currentIndexSegment.getLong(this.currentOffset); + this.currentOffset += indexEntrySize; + + try { + return getRecordFromBuffer(pointer); + } + catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + else { + return null; + } + } }; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/ReaderIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/ReaderIterator.java index 0d29d5e5454..606c50ce451 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/ReaderIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/ReaderIterator.java @@ -24,6 +24,8 @@ import java.io.IOException; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.io.network.api.MutableReader; import org.apache.flink.runtime.plugable.DeserializationDelegate; +import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; +import org.apache.flink.runtime.plugable.ReusingDeserializationDelegate; import org.apache.flink.util.MutableObjectIterator; @@ -32,9 +34,10 @@ import org.apache.flink.util.MutableObjectIterator; */ public final class ReaderIterator implements MutableObjectIterator { - private final MutableReader> reader; // the source + private final MutableReader reader; // the source - private final DeserializationDelegate delegate; + private final ReusingDeserializationDelegate reusingDelegate; + private final NonReusingDeserializationDelegate nonReusingDelegate; /** * Creates a new iterator, wrapping the given reader. 
@@ -43,15 +46,33 @@ public final class ReaderIterator implements MutableObjectIterator { */ public ReaderIterator(MutableReader> reader, TypeSerializer serializer) { this.reader = reader; - this.delegate = new DeserializationDelegate(serializer); + this.reusingDelegate = new ReusingDeserializationDelegate(serializer); + this.nonReusingDelegate = new NonReusingDeserializationDelegate(serializer); } @Override + @SuppressWarnings("unchecked") public T next(T target) throws IOException { - this.delegate.setInstance(target); + this.reusingDelegate.setInstance(target); try { - if (this.reader.next(this.delegate)) { - return this.delegate.getInstance(); + if (this.reader.next(this.reusingDelegate)) { + return this.reusingDelegate.getInstance(); + } else { + return null; + } + + } + catch (InterruptedException e) { + throw new IOException("Reader interrupted.", e); + } + } + + @Override + @SuppressWarnings("unchecked") + public T next() throws IOException { + try { + if (this.reader.next(this.nonReusingDelegate)) { + return this.nonReusingDelegate.getInstance(); } else { return null; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/DeserializationDelegate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/DeserializationDelegate.java index fbe91eda922..9ca5954dee2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/DeserializationDelegate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/DeserializationDelegate.java @@ -15,45 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - - package org.apache.flink.runtime.plugable; -import java.io.IOException; - -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - - -public class DeserializationDelegate implements IOReadableWritable { - - private T instance; - - private final TypeSerializer serializer; - - - public DeserializationDelegate(TypeSerializer serializer) { - this.serializer = serializer; - } - - - public void setInstance(T instance) { - this.instance = instance; - } - - public T getInstance() { - return instance; - } - @Override - public void write(DataOutputView out) throws IOException { - throw new IllegalStateException("Serialization method called on DeserializationDelegate."); - } +public interface DeserializationDelegate extends IOReadableWritable { + void setInstance(T instance); - @Override - public void read(DataInputView in) throws IOException { - this.instance = this.serializer.deserialize(this.instance, in); - } + T getInstance(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/NonReusingDeserializationDelegate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/NonReusingDeserializationDelegate.java new file mode 100644 index 00000000000..859f354d07e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/NonReusingDeserializationDelegate.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.runtime.plugable; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; + + +public class NonReusingDeserializationDelegate implements DeserializationDelegate { + + private T instance; + + private final TypeSerializer serializer; + + + public NonReusingDeserializationDelegate(TypeSerializer serializer) { + this.serializer = serializer; + } + + public void setInstance(T instance) { + this.instance = instance; + } + + public T getInstance() { + return instance; + } + + @Override + public void write(DataOutputView out) throws IOException { + throw new IllegalStateException("Serialization method called on DeserializationDelegate."); + } + + @Override + public void read(DataInputView in) throws IOException { + this.instance = this.serializer.deserialize(in); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/ReusingDeserializationDelegate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/ReusingDeserializationDelegate.java new file mode 100644 index 00000000000..f3c254b4847 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/plugable/ReusingDeserializationDelegate.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.runtime.plugable; + +import java.io.IOException; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + + +public class ReusingDeserializationDelegate implements DeserializationDelegate { + + private T instance; + + private final TypeSerializer serializer; + + + public ReusingDeserializationDelegate(TypeSerializer serializer) { + this.serializer = serializer; + } + + @Override + public void setInstance(T instance) { + this.instance = instance; + } + + @Override + public T getInstance() { + return instance; + } + + @Override + public void write(DataOutputView out) throws IOException { + throw new IllegalStateException("Serialization method called on DeserializationDelegate."); + } + + @Override + public void read(DataInputView in) throws IOException { + this.instance = this.serializer.deserialize(this.instance, in); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyMutableObjectIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyMutableObjectIterator.java index 71fb30f21ef..12ae5c1883c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyMutableObjectIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EmptyMutableObjectIterator.java @@ 
-54,4 +54,15 @@ public final class EmptyMutableObjectIterator implements MutableObjectIterato public E next(E target) { return null; } + + /** + * Always returns null. + * + * @see MutableObjectIterator#next() + */ + @Override + public E next() { + return null; + } + } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedMutableObjectIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedMutableObjectIterator.java index 88097cdb4c3..c139aca8a32 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedMutableObjectIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedMutableObjectIterator.java @@ -167,5 +167,42 @@ public final class KeyGroupedMutableObjectIterator { ioex.getMessage(), ioex); } } + + @Override + public E next() + { + if (KeyGroupedMutableObjectIterator.this.next == null || KeyGroupedMutableObjectIterator.this.nextIsFresh) { + return null; + } + if (this.nextIsUnconsumed) { + return this.serializer.copy(KeyGroupedMutableObjectIterator.this.next); + } + + E result = null; + try { + if ((result = KeyGroupedMutableObjectIterator.this.iterator.next()) != null) { + // check whether the keys are equal + if (!this.comparator.equalToReference(result)) { + // moved to the next key, no more values here + KeyGroupedMutableObjectIterator.this.next = + this.serializer.copy(result); + KeyGroupedMutableObjectIterator.this.nextIsFresh = true; + return null; + } + // same key, next value is in "next" + return result; + } + else { + // backing iterator is consumed + KeyGroupedMutableObjectIterator.this.next = null; + return null; + } + } + catch (IOException ioex) { + throw new RuntimeException("An error occurred while reading the next record: " + + ioex.getMessage(), ioex); + } + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/RegularToMutableObjectIterator.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/RegularToMutableObjectIterator.java index f2fea80fb36..8eb17c43726 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/RegularToMutableObjectIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/RegularToMutableObjectIterator.java @@ -47,4 +47,13 @@ public class RegularToMutableObjectIterator implements MutableObjectIterator< return null; } } + + @Override + public T next() { + if (this.iterator.hasNext()) { + return this.serializer.copy(this.iterator.next()); + } else { + return null; + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java index 877c5cccef3..27ece69d988 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java @@ -1433,6 +1433,20 @@ public class HashTableITCase { return null; } } + + @Override + public Record next() { + if (this.numLeft > 0) { + this.numLeft--; + Record result = new Record(2); + result.setField(0, this.key); + result.setField(1, this.value); + return result; + } + else { + return null; + } + } } // ============================================================================================ @@ -1466,6 +1480,22 @@ public class HashTableITCase { return null; } } + + @Override + public IntPair next() { + if (this.numLeft > 0) { + this.numLeft--; + + IntPair result = new IntPair(); + result.setKey(this.key); + result.setValue(this.value); + return result; + } + else { + return null; + } + } + } // ============================================================================================ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java index 9dec847499a..8db9934d4cf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java @@ -274,6 +274,11 @@ public class MassiveStringSortingITCase { public String next(String reuse) throws IOException { return reader.readLine(); } + + @Override + public String next() throws IOException { + return reader.readLine(); + } } private static final class StringTupleReaderMutableObjectIterator implements MutableObjectIterator> { @@ -296,6 +301,20 @@ public class MassiveStringSortingITCase { reuse.f1 = parts; return reuse; } + + @Override + public Tuple2 next() throws IOException { + String line = reader.readLine(); + if (line == null) { + return null; + } + + String[] parts = line.split(" "); + Tuple2 result = new Tuple2(); + result.f0 = parts[0]; + result.f1 = parts; + return result; + } } // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java index 7d681d8951a..c2214560e0a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java @@ -77,6 +77,23 @@ public class MergeIteratorTest return null; } } + + @Override + public Record next() + { + if (current < keys.length) { + key.setKey(keys[current]); + value.setValue(values[current]); + current++; + Record result = new Record(2); + result.setField(0, key); + result.setField(1, value); + return result; + } + else { + return null; + } + } }; } diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MockRecordReader.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MockRecordReader.java index 738e7fc4b8a..e5b8ed092fb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MockRecordReader.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MockRecordReader.java @@ -65,6 +65,32 @@ public class MockRecordReader implements MutableObjectIterator { } } + @Override + public Record next() { + Record r = null; + while (r == null) { + try { + r = queue.take(); + } catch (InterruptedException iex) { + throw new RuntimeException("Reader was interrupted."); + } + } + + if (r == SENTINEL) { + // put the sentinel back, to ensure that repeated calls do not block + try { + queue.put(r); + } catch (InterruptedException e) { + throw new RuntimeException("Reader was interrupted."); + } + return null; + } else { + Record result = new Record(r.getNumFields()); + r.copyTo(result); + return result; + } + } + public void emit(Record element) throws InterruptedException { queue.put(element.createCopy()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingInfinitiveInputIterator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingInfinitiveInputIterator.java index 7e61a1819de..f21d53f8091 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingInfinitiveInputIterator.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingInfinitiveInputIterator.java @@ -37,5 +37,13 @@ public class DelayingInfinitiveInputIterator extends InfiniteInputIterator catch (InterruptedException e) { } return super.next(reuse); } - + + @Override + public Record next() { + try { + Thread.sleep(delay); + } + catch (InterruptedException e) { } + return super.next(); + } } diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteInputIterator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteInputIterator.java index 2ae103b9ccf..5c10f5e7b9b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteInputIterator.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteInputIterator.java @@ -37,4 +37,13 @@ public class InfiniteInputIterator implements MutableObjectIterator reuse.setField(1, val2); return reuse; } + + @Override + public Record next() { + Record result = new Record(2); + result.setField(0, val1); + result.setField(1, val2); + return result; + } + } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MutableObjectIteratorWrapper.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MutableObjectIteratorWrapper.java index 8a01f121483..88f16fc9aac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MutableObjectIteratorWrapper.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MutableObjectIteratorWrapper.java @@ -50,4 +50,17 @@ public class MutableObjectIteratorWrapper implements MutableObjectIterator return null; } } + + @Override + public IntPair next() { + if (this.count++ < this.numRecords) { + IntPair result = new IntPair(); + result.setKey(this.rnd.nextInt()); + result.setValue(this.rnd.nextInt()); + return result; + } else { + return null; + } + } public void reset() { this.rnd = new Random(this.seed); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java index c28e542b078..5fe1303a89c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java @@ -181,6 +181,17 @@ public final class TestData { return reuse; } + public Record next() { + this.key.setKey(keyMode == KeyMode.SORTED ? ++counter : Math.abs(random.nextInt() % keyMax) + 1); + if (this.valueMode != ValueMode.CONSTANT) { + this.value.setValue(randomString()); + } + Record result = new Record(2); + result.setField(0, this.key); + result.setField(1, this.value); + return result; + } + public boolean next(org.apache.flink.types.Value[] target) { this.key.setKey(keyMode == KeyMode.SORTED ? ++counter : Math.abs(random.nextInt() % keyMax) + 1); // TODO change this to something proper @@ -264,6 +275,17 @@ public final class TestData { return null; } } + + @Override + public Record next() { + if (counter < numberOfRecords) { + counter++; + return generator.next(); + } + else { + return null; + } + } public void reset() { this.counter = 0; @@ -306,7 +328,23 @@ public final class TestData { return null; } } - + + @Override + public Record next() { + if (pos < this.numPairs) { + this.value.setValue(this.valueValue + ' ' + pos); + Record result = new Record(2); + result.setField(0, this.key); + result.setField(1, this.value); + pos++; + return result; + } + else { + return null; + } + } + + public void reset() { this.pos = 0; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntPairGenerator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntPairGenerator.java index 5d820b952a5..a7e2a7c0cb7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntPairGenerator.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntPairGenerator.java @@ -67,4 +67,37 @@ public class UniformIntPairGenerator implements MutableObjectIterator return target; } + + @Override + public IntPair next() { + IntPair result = new IntPair(); + 
if(!repeatKey) { + if(valCnt >= numVals) { + return null; + } + + result.setKey(keyCnt++); + result.setValue(valCnt); + + if(keyCnt == numKeys) { + keyCnt = 0; + valCnt++; + } + } else { + if(keyCnt >= numKeys) { + return null; + } + + result.setKey(keyCnt); + result.setValue(valCnt++); + + if(valCnt == numVals) { + valCnt = 0; + keyCnt++; + } + } + + return result; + } + } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformRecordGenerator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformRecordGenerator.java index 8e350530546..b628f059e14 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformRecordGenerator.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformRecordGenerator.java @@ -81,4 +81,38 @@ public class UniformRecordGenerator implements MutableObjectIterator { reuse.updateBinaryRepresenation(); return reuse; } + + @Override + public Record next() { + if(!repeatKey) { + if(valCnt >= numVals+startVal) { + return null; + } + + key.setValue(keyCnt++); + value.setValue(valCnt); + + if(keyCnt == numKeys+startKey) { + keyCnt = startKey; + valCnt++; + } + } else { + if(keyCnt >= numKeys+startKey) { + return null; + } + key.setValue(keyCnt); + value.setValue(valCnt++); + + if(valCnt == numVals+startVal) { + valCnt = startVal; + keyCnt++; + } + } + + Record result = new Record(2); + result.setField(0, this.key); + result.setField(1, this.value); + result.updateBinaryRepresenation(); + return result; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformStringPairGenerator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformStringPairGenerator.java index ef697a8de80..45a44facb44 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformStringPairGenerator.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformStringPairGenerator.java @@ -69,4 +69,37 @@ public class UniformStringPairGenerator implements MutableObjectIterator= numVals) { + return null; + } + + result.setKey(Integer.toString(keyCnt++)); + result.setValue(Integer.toBinaryString(valCnt)); + + if(keyCnt == numKeys) { + keyCnt = 0; + valCnt++; + } + } else { + if(keyCnt >= numKeys) { + return null; + } + + result.setKey(Integer.toString(keyCnt)); + result.setValue(Integer.toBinaryString(valCnt++)); + + if(valCnt == numVals) { + valCnt = 0; + keyCnt++; + } + } + + return result; + } + + } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java index 824e96bde1e..3a76ebdd6c2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java @@ -57,4 +57,22 @@ public class UnionIterator implements MutableObjectIterator } } } + + @Override + public E next() throws IOException + { + E targetStaging = this.currentSource.next(); + if (targetStaging != null) { + return targetStaging; + } else { + if (this.nextSources.size() > 0) { + this.currentSource = this.nextSources.remove(0); + return next(); + } + else { + return null; + } + } + } + } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutableTest.java index 54a14925737..3d1a80b5598 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorImmutableTest.java @@ -78,6 +78,20 @@ public class KeyGroupedIteratorImmutableTest { return null; } } + + 
@Override + public Record next() throws IOException { + if (it.hasNext()) { + IntStringPair pair = it.next(); + Record result = new Record(2); + result.setField(0, pair.getInteger()); + result.setField(1, pair.getString()); + return result; + } + else { + return null; + } + } }; final RecordSerializer serializer = RecordSerializer.get(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java index 85ca9a98813..39ff077e8e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyGroupedIteratorTest.java @@ -79,6 +79,21 @@ public class KeyGroupedIteratorTest { return null; } } + + @Override + public Record next() throws IOException { + if (it.hasNext()) { + IntStringPair pair = it.next(); + Record result = new Record(2); + result.setField(0, pair.getInteger()); + result.setField(1, pair.getString()); + return result; + } + else { + return null; + } + } + }; final RecordSerializer serializer = RecordSerializer.get(); diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala index a34c7d8e0d5..7c6bcafcb73 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala @@ -216,6 +216,16 @@ class StringTupleReader(val reader: BufferedReader) extends MutableObjectIterato val parts = line.split(" ") StringTuple(parts(0), parts(1), parts) } + + override def next(): StringTuple = { + val line = reader.readLine() + if (line == null) { + return null + } + val parts = line.split(" ") + StringTuple(parts(0), parts(1), parts) + } + } class DummyInvokable 
extends AbstractInvokable { -- GitLab