提交 f5471bdc 编写于 作者: K Kurt Young 提交者: Kurt Young

[FLINK-6394] [runtime] Respect object reuse configuration when executing group combining function

This closes #3803.
上级 a49ab6c1
......@@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
......@@ -162,7 +163,8 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxFileHandles)
{
return new CombiningSpillingThread(exceptionHandler, queues, parentTask,
memoryManager, ioManager, serializerFactory.getSerializer(), comparator, sortReadMemory, writeMemory, maxFileHandles);
memoryManager, ioManager, serializerFactory.getSerializer(),
comparator, sortReadMemory, writeMemory, maxFileHandles, objectReuseEnabled);
}
// ------------------------------------------------------------------------
......@@ -172,16 +174,20 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
protected class CombiningSpillingThread extends SpillingThread {
private final TypeComparator<E> comparator2;
private final boolean objectReuseEnabled;
public CombiningSpillingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues,
AbstractInvokable parentTask, MemoryManager memManager, IOManager ioManager,
TypeSerializer<E> serializer, TypeComparator<E> comparator,
List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxNumFileHandles)
List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxNumFileHandles,
boolean objectReuseEnabled)
{
super(exceptionHandler, queues, parentTask, memManager, ioManager, serializer, comparator,
sortReadMemory, writeMemory, maxNumFileHandles);
this.comparator2 = comparator.duplicate();
this.objectReuseEnabled = objectReuseEnabled;
}
/**
......@@ -315,7 +321,8 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
// set up the combining helpers
final InMemorySorter<E> buffer = element.buffer;
final CombineValueIterator<E> iter = new CombineValueIterator<E>(buffer, this.serializer.createInstance());
final CombineValueIterator<E> iter = new CombineValueIterator<E>(
buffer, this.serializer.createInstance(), this.objectReuseEnabled);
final WriterCollector<E> collector = new WriterCollector<E>(output, this.serializer);
int i = 0;
......@@ -454,7 +461,6 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
// the list with the target iterators
final MergeIterator<E> mergeIterator = getMergingIterator(channelIDs, readBuffers, channelAccesses, null);
final ReusingKeyGroupedIterator<E> groupedIter = new ReusingKeyGroupedIterator<E>(mergeIterator, this.serializer, this.comparator2);
// create a new channel writer
final FileIOChannel.ID mergedChannelID = this.ioManager.createChannel();
......@@ -469,8 +475,18 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
// combine and write to disk
try {
while (groupedIter.nextKey()) {
combineStub.combine(groupedIter.getValues(), collector);
if (objectReuseEnabled) {
final ReusingKeyGroupedIterator<E> groupedIter = new ReusingKeyGroupedIterator<>(
mergeIterator, this.serializer, this.comparator2);
while (groupedIter.nextKey()) {
combineStub.combine(groupedIter.getValues(), collector);
}
} else {
final NonReusingKeyGroupedIterator<E> groupedIter = new NonReusingKeyGroupedIterator<>(
mergeIterator, this.comparator2);
while (groupedIter.nextKey()) {
combineStub.combine(groupedIter.getValues(), collector);
}
}
}
catch (Exception e) {
......@@ -505,7 +521,9 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
private final InMemorySorter<E> buffer; // the buffer from which values are returned
private E record;
private E recordReuse;
private final boolean objectReuseEnabled;
private int last; // the position of the last value to be returned
......@@ -519,9 +537,10 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
* @param buffer
* The buffer to get the values from.
*/
public CombineValueIterator(InMemorySorter<E> buffer, E instance) {
public CombineValueIterator(InMemorySorter<E> buffer, E instance, boolean objectReuseEnabled) {
this.buffer = buffer;
this.record = instance;
this.recordReuse = instance;
this.objectReuseEnabled = objectReuseEnabled;
}
/**
......@@ -547,9 +566,14 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
public E next() {
if (this.position <= this.last) {
try {
this.record = this.buffer.getRecord(this.record, this.position);
E record;
if (objectReuseEnabled) {
record = this.buffer.getRecord(this.recordReuse, this.position);
} else {
record = this.buffer.getRecord(this.position);
}
this.position++;
return this.record;
return record;
}
catch (IOException ioex) {
LOG.error("Error retrieving a value from a buffer.", ioex);
......
......@@ -164,6 +164,7 @@ public final class NonReusingKeyGroupedIterator<E> implements KeyGroupedIterator
*
* @return Iterator over all values that belong to the current key.
*/
@Override
public ValuesIterator getValues() {
    // hand out the values iterator held in this object's field
    return valuesIterator;
}
......
......@@ -19,6 +19,7 @@
package org.apache.flink.runtime.operators.sort;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.NoSuchElementException;
......@@ -183,6 +184,42 @@ public class CombiningUnilateralSortMergerITCase extends TestLogger {
Assert.assertTrue(comb.opened == comb.closed);
}
/**
 * Feeds a fixed number of values per key through a {@link CombiningUnilateralSortMerger}
 * that uses {@link MaterializedCountCombiner}, then checks that the per-key sums
 * produced by the reducing iterator are correct.
 *
 * <p>The sorter is always closed, also when an assertion fails, and the test fails
 * explicitly if no result is produced at all.
 */
@Test
public void testCombineSpillingDisableObjectReuse() throws Exception {
    final int numKeys = 10000;
    final int valuesPerKey = 100;
    // every key receives the values 0 .. valuesPerKey - 1, so each per-key sum is 0 + 1 + ... + 99 = 4950
    final int expectedSum = valuesPerKey * (valuesPerKey - 1) / 2;

    TestData.MockTuple2Reader<Tuple2<Integer, Integer>> reader = TestData.getIntIntTupleReader();

    LOG.debug("initializing sortmerger");

    MaterializedCountCombiner comb = new MaterializedCountCombiner();

    // set maxNumFileHandles = 2 to trigger multiple channel merging
    // NOTE(review): the trailing 'false' is presumably the object-reuse flag (per the test name) - confirm against the constructor
    Sorter<Tuple2<Integer, Integer>> merger = new CombiningUnilateralSortMerger<>(comb,
        this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory2, this.comparator2,
        0.01, 2, 0.005f, true /* use large record handler */, false);

    try {
        final Tuple2<Integer, Integer> rec = new Tuple2<>();

        for (int key = 0; key < numKeys; key++) {
            rec.setField(key, 0);
            for (int value = 0; value < valuesPerKey; value++) {
                rec.setField(value, 1);
                reader.emit(rec);
            }
        }
        reader.close();

        MutableObjectIterator<Tuple2<Integer, Integer>> iterator = merger.getIterator();
        Iterator<Integer> result = getReducingIterator(iterator, serializerFactory2.getSerializer(), comparator2.duplicate());

        // guard against a vacuous pass: the loop below checks nothing when the iterator is empty
        Assert.assertTrue("The sorter did not return any result.", result.hasNext());
        while (result.hasNext()) {
            Assert.assertEquals(expectedSum, result.next().intValue());
        }
    } finally {
        // release the sorter's resources even if emitting or an assertion failed
        merger.close();
    }
}
@Test
public void testSortAndValidate() throws Exception
{
......@@ -332,7 +369,39 @@ public class CombiningUnilateralSortMergerITCase extends TestLogger {
closed = true;
}
}
// --------------------------------------------------------------------------------------------
/**
 * Combiner that first stores every value of a group in a list and only afterwards
 * folds the stored tuples into a single output tuple: field 0 carries the first field
 * of the last stored tuple, field 1 carries the sum of all second fields.
 */
public static class MaterializedCountCombiner
        extends RichGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>
        implements GroupCombineFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {

    private static final long serialVersionUID = 1L;

    @Override
    public void combine(Iterable<Tuple2<Integer, Integer>> values, Collector<Tuple2<Integer, Integer>> out) {
        // phase 1: keep a reference to every element handed out by the input
        final ArrayList<Tuple2<Integer, Integer>> buffered = new ArrayList<>();
        final Iterator<Tuple2<Integer, Integer>> input = values.iterator();
        while (input.hasNext()) {
            buffered.add(input.next());
        }

        // phase 2: fold the stored elements into one result tuple
        final Tuple2<Integer, Integer> combined = new Tuple2<>();
        int sum = 0;
        for (int idx = 0; idx < buffered.size(); idx++) {
            final Tuple2<Integer, Integer> current = buffered.get(idx);
            combined.setField(current.f0, 0);
            sum += current.f1;
        }
        combined.setField(sum, 1);

        out.collect(combined);
    }

    @Override
    public void reduce(Iterable<Tuple2<Integer, Integer>> values,
            Collector<Tuple2<Integer, Integer>> out) throws Exception {
        // left empty: this class only provides an implementation for combine()
    }
}
private static Iterator<Integer> getReducingIterator(MutableObjectIterator<Tuple2<Integer, Integer>> data, TypeSerializer<Tuple2<Integer, Integer>> serializer, TypeComparator<Tuple2<Integer, Integer>> comparator) {
final ReusingKeyGroupedIterator<Tuple2<Integer, Integer>> groupIter = new ReusingKeyGroupedIterator<> (data, serializer, comparator);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册