diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java index feeceb0d7ad8b4c910f1f3b88c8098061bcf65cc..582f280941ee7d15406c77ef77ac80cbe77a2614 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java @@ -51,6 +51,8 @@ public class GroupReduceDriver implements PactDriver comparator; + private boolean mutableObjectMode = false; + private volatile boolean running; // ------------------------------------------------------------------------ diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutable.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutable.java new file mode 100644 index 0000000000000000000000000000000000000000..7d093d56c5ea8f4ef2a045f072b3d0a7f20ac44f --- /dev/null +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutable.java @@ -0,0 +1,202 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + +package eu.stratosphere.pact.runtime.util; + +import java.io.IOException; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import eu.stratosphere.api.common.typeutils.TypeComparator; +import eu.stratosphere.api.common.typeutils.TypeSerializer; +import eu.stratosphere.util.MutableObjectIterator; + +/** + * The KeyValueIterator returns a key and all values that belong to the key (share the same key). + * + */ +public final class KeyGroupedIteratorImmutable { + + private final MutableObjectIterator iterator; + + private final TypeSerializer serializer; + + private final TypeComparator comparator; + + private ValuesIterator valuesIterator; + + private E lastKeyRecord; + + private E lookahead; + + private boolean done; + + /** + * Initializes the KeyGroupedIterator. It requires an iterator which returns its result + * sorted by the key fields. + * + * @param iterator An iterator over records, which are sorted by the key fields, in any order. + * @param serializer The serializer for the data type iterated over. + * @param comparator The comparator for the data type iterated over. + */ + public KeyGroupedIteratorImmutable(MutableObjectIterator iterator, + TypeSerializer serializer, TypeComparator comparator) + { + if (iterator == null || serializer == null || comparator == null) { + throw new NullPointerException(); + } + + this.iterator = iterator; + this.serializer = serializer; + this.comparator = comparator; + } + + /** + * Moves the iterator to the next key. This method may skip any values that have not yet been returned by the + * iterator created by the {@link #getValues()} method. Hence, if called multiple times it "removes" key groups. + * + * @return true, if the input iterator has an other group of records with the same key. + */ + public boolean nextKey() throws IOException { + + if (lookahead != null) { + // common case: whole value-iterator was consumed and a new key group is available. + this.comparator.setReference(this.lookahead); + this.valuesIterator.next = this.lookahead; + this.lastKeyRecord = this.lookahead; + this.lookahead = null; + return true; + } + + // first element, empty/done, or the values iterator was not entirely consumed + if (this.done) { + return false; + } + + if (this.valuesIterator != null) { + // values was not entirely consumed. move to the next key + // Required if user code / reduce() method did not read the whole value iterator. + E next = this.serializer.createInstance(); + while (true) { + if ((next = this.iterator.next(next)) != null) { + if (!this.comparator.equalToReference(next)) { + // the keys do not match, so we have a new group. store the current key + this.comparator.setReference(next); + this.valuesIterator.next = next; + this.lastKeyRecord = next; + return true; + } + } + else { + // input exhausted + this.valuesIterator.next = null; + this.valuesIterator = null; + this.lastKeyRecord = null; + this.done = true; + return false; + } + } + } + else { + // first element + // get the next element + E first = this.iterator.next(this.serializer.createInstance()); + if (first != null) { + this.comparator.setReference(first); + this.valuesIterator = new ValuesIterator(first); + this.lastKeyRecord = first; + return true; + } + else { + // empty input, set everything null + this.done = true; + return false; + } + } + } + + private E advanceToNext() { + try { + E next = this.iterator.next(serializer.createInstance()); + if (next != null) { + if (comparator.equalToReference(next)) { + // same key + return next; + } else { + // moved to the next key, no more values here + this.lookahead = next; + return null; + } + } + else { + // backing iterator is consumed + this.done = true; + return null; + } + } + catch (IOException e) { + throw new RuntimeException("An error occurred while reading the next record.", e); + } + } + + public E getCurrent() { + return lastKeyRecord; + } + + public TypeComparator getComparatorWithCurrentReference() { + return this.comparator; + } + + /** + * Returns an iterator over all values that belong to the current key. The iterator is initially null + * (before the first call to {@link #nextKey()} and after all keys are consumed. In general, this method returns + * always a non-null value, if a previous call to {@link #nextKey()} return true. + * + * @return Iterator over all values that belong to the current key. + */ + public ValuesIterator getValues() { + return this.valuesIterator; + } + + // -------------------------------------------------------------------------------------------- + + public final class ValuesIterator implements Iterator { + + private E next; + + private ValuesIterator(E first) { + this.next = first; + } + + @Override + public boolean hasNext() { + return next != null; + } + + @Override + public E next() { + if (this.next != null) { + E current = this.next; + this.next = KeyGroupedIteratorImmutable.this.advanceToNext(); + return current; + } else { + throw new NoSuchElementException(); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } +} diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutableTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutableTest.java new file mode 100644 index 0000000000000000000000000000000000000000..4b8342292c3b7a5eb272346464042a3277699d68 --- /dev/null +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/util/KeyGroupedIteratorImmutableTest.java @@ -0,0 +1,353 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + +package eu.stratosphere.pact.runtime.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparator; +import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializer; +import eu.stratosphere.types.IntValue; +import eu.stratosphere.types.Record; +import eu.stratosphere.types.StringValue; +import eu.stratosphere.util.MutableObjectIterator; + +/** + * Test for the safe key grouped iterator, which advances in windows containing the same key and provides a sub-iterator + * over the records with the same key. + */ +public class KeyGroupedIteratorImmutableTest { + + private MutableObjectIterator sourceIter; // the iterator that provides the input + + private KeyGroupedIteratorImmutable psi; // the grouping iterator, progressing in key steps + + @Before + public void setup() + { + final ArrayList source = new ArrayList(); + + // add elements to the source + source.add(new IntStringPair(new IntValue(1), new StringValue("A"))); + source.add(new IntStringPair(new IntValue(2), new StringValue("B"))); + source.add(new IntStringPair(new IntValue(3), new StringValue("C"))); + source.add(new IntStringPair(new IntValue(3), new StringValue("D"))); + source.add(new IntStringPair(new IntValue(4), new StringValue("E"))); + source.add(new IntStringPair(new IntValue(4), new StringValue("F"))); + source.add(new IntStringPair(new IntValue(4), new StringValue("G"))); + source.add(new IntStringPair(new IntValue(5), new StringValue("H"))); + source.add(new IntStringPair(new IntValue(5), new StringValue("I"))); + source.add(new IntStringPair(new IntValue(5), new StringValue("J"))); + source.add(new IntStringPair(new IntValue(5), new StringValue("K"))); + source.add(new IntStringPair(new IntValue(5), new StringValue("L"))); + + + this.sourceIter = new MutableObjectIterator() { + final Iterator it = source.iterator(); + + @Override + public Record next(Record reuse) throws IOException { + if (it.hasNext()) { + IntStringPair pair = it.next(); + reuse.setField(0, pair.getInteger()); + reuse.setField(1, pair.getString()); + return reuse; + } + else { + return null; + } + } + }; + + final RecordSerializer serializer = RecordSerializer.get(); + @SuppressWarnings("unchecked") + final RecordComparator comparator = new RecordComparator(new int[] {0}, new Class[] {IntValue.class}); + + this.psi = new KeyGroupedIteratorImmutable(this.sourceIter, serializer, comparator); + } + + @Test + public void testNextKeyOnly() throws Exception + { + try { + Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + + Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + + Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + + Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + + Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + + Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey()); + Assert.assertNull("KeyGroupedIterator must not have another value.", this.psi.getValues()); + + Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey()); + Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey()); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("The test encountered an unexpected exception."); + } + } + + @Test + public void testFullIterationThroughAllValues() throws IOException + { + try { + // Key 1, Value A + Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(1)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), this.psi.getValues().next().getField(1, StringValue.class)); + Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext()); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + + // Key 2, Value B + Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(2)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), this.psi.getValues().next().getField(1, StringValue.class)); + Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext()); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + + // Key 3, Values C, D + Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), this.psi.getValues().next().getField(1, StringValue.class)); + Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), this.psi.getValues().next().getField(1, StringValue.class)); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + try { + this.psi.getValues().next(); + Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next()."); + } + catch (NoSuchElementException nseex) {} + Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext()); + try { + this.psi.getValues().next(); + Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next()."); + } + catch (NoSuchElementException nseex) {} + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + + // Key 4, Values E, F, G + Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class)); + Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("F"), this.psi.getValues().next().getField(1, StringValue.class)); + Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("G"), this.psi.getValues().next().getField(1, StringValue.class)); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext()); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(4)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 4, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + + // Key 5, Values H, I, J, K, L + Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("H"), this.psi.getValues().next().getField(1, StringValue.class)); + Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class)); + Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("J"), this.psi.getValues().next().getField(1, StringValue.class)); + Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("K"), this.psi.getValues().next().getField(1, StringValue.class)); + Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("L"), this.psi.getValues().next().getField(1, StringValue.class)); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + try { + this.psi.getValues().next(); + Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next()."); + } + catch (NoSuchElementException nseex) {} + Assert.assertFalse("KeyGroupedIterator must not have another value.", this.psi.getValues().hasNext()); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + try { + this.psi.getValues().next(); + Assert.fail("A new KeyGroupedIterator must not have any value available and hence throw an exception on next()."); + } + catch (NoSuchElementException nseex) {} + + Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey()); + Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey()); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("The test encountered an unexpected exception."); + } + } + + @Test + public void testMixedProgress() throws Exception + { + try { + // Progression only via nextKey() and hasNext() - Key 1, Value A + Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); + + // Progression only through nextKey() - Key 2, Value B + Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + + // Progression first though haNext() and next(), then through hasNext() - Key 3, Values C, D + Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), this.psi.getValues().next().getField(1, StringValue.class)); + Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(3)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + + // Progression first via next() only, then hasNext() only Key 4, Values E, F, G + Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("E"), this.psi.getValues().next().getField(1, StringValue.class)); + Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); + + // Key 5, Values H, I, J, K, L + Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("H"), this.psi.getValues().next().getField(1, StringValue.class)); + Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); + Assert.assertTrue("KeyGroupedIterator returned a wrong key.", this.psi.getComparatorWithCurrentReference().equalToReference(new Record(new IntValue(5)))); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 5, this.psi.getCurrent().getField(0, IntValue.class).getValue()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("I"), this.psi.getValues().next().getField(1, StringValue.class)); + Assert.assertTrue("KeyGroupedIterator must have another value.", this.psi.getValues().hasNext()); + + // end + Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey()); + Assert.assertFalse("KeyGroupedIterator must not have another key.", this.psi.nextKey()); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("The test encountered an unexpected exception."); + } + } + + @Test + public void testHasNextDoesNotOverweiteCurrentRecord() throws Exception + { + try { + Iterator valsIter = null; + Record rec = null; + + Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + valsIter = this.psi.getValues(); + Assert.assertNotNull("Returned Iterator must not be null", valsIter); + Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext()); + rec = valsIter.next(); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, rec.getField(0, IntValue.class).getValue()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), rec.getField(1, StringValue.class)); + Assert.assertFalse("KeyGroupedIterator must have another value.", valsIter.hasNext()); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 1, rec.getField(0, IntValue.class).getValue()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("A"), rec.getField(1, StringValue.class)); + Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext()); + + Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + valsIter = this.psi.getValues(); + Assert.assertNotNull("Returned Iterator must not be null", valsIter); + Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext()); + rec = valsIter.next(); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, rec.getField(0, IntValue.class).getValue()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), rec.getField(1, StringValue.class)); + Assert.assertFalse("KeyGroupedIterator must have another value.", valsIter.hasNext()); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 2, rec.getField(0, IntValue.class).getValue()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("B"), rec.getField(1, StringValue.class)); + Assert.assertFalse("KeyGroupedIterator's value iterator must not have another value.", valsIter.hasNext()); + + Assert.assertTrue("KeyGroupedIterator must have another key.", this.psi.nextKey()); + valsIter = this.psi.getValues(); + Assert.assertNotNull("Returned Iterator must not be null", valsIter); + Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext()); + rec = valsIter.next(); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), rec.getField(1, StringValue.class)); + Assert.assertTrue("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext()); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("C"), rec.getField(1, StringValue.class)); + rec = valsIter.next(); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class)); + Assert.assertFalse("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext()); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class)); + Assert.assertFalse("KeyGroupedIterator's value iterator must have another value.", valsIter.hasNext()); + Assert.assertEquals("KeyGroupedIterator returned a wrong key.", 3, rec.getField(0, IntValue.class).getValue()); + Assert.assertEquals("KeyGroupedIterator returned a wrong value.", new StringValue("D"), rec.getField(1, StringValue.class)); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("The test encountered an unexpected exception."); + } + } + + private static final class IntStringPair + { + private final IntValue integer; + private final StringValue string; + + IntStringPair(IntValue integer, StringValue string) { + this.integer = integer; + this.string = string; + } + + public IntValue getInteger() { + return integer; + } + + public StringValue getString() { + return string; + } + } +}