From bce3a68d97f759a7f5731daf7dd25886f6b4d1ba Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Thu, 16 Jun 2016 14:39:23 +0200 Subject: [PATCH] [FLINK-4082] Add Setting for enabling/disabling LargeRecordHandler By default this is set to disabled because there are known issues when users specify a custom TypeInformation. --- .../flink/configuration/ConfigConstants.java | 12 +++++++++++- .../plantranslate/JobGraphGenerator.java | 8 ++++++++ .../flink/runtime/operators/BatchTask.java | 6 ++++-- .../flink/runtime/operators/DataSinkTask.java | 1 + .../sort/CombiningUnilateralSortMerger.java | 11 ++++++----- .../operators/sort/UnilateralSortMerger.java | 18 +++++++++--------- .../runtime/operators/util/TaskConfig.java | 14 +++++++++++++- .../operators/ReduceTaskExternalITCase.java | 4 ++-- .../runtime/operators/ReduceTaskTest.java | 2 +- .../CombiningUnilateralSortMergerITCase.java | 6 +++--- .../operators/sort/ExternalSortITCase.java | 11 ++++++----- .../sort/ExternalSortLargeRecordsITCase.java | 8 ++++---- .../testutils/BinaryOperatorTestBase.java | 1 + .../operators/testutils/DriverTestBase.java | 2 +- .../testutils/UnaryOperatorTestBase.java | 4 +++- .../util/HashVsSortMiniBenchmark.java | 6 ++++-- .../test/manual/MassiveStringSorting.java | 6 ++++-- .../test/manual/MassiveStringValueSorting.java | 6 ++++-- .../manual/MassiveCaseClassSortingITCase.scala | 2 +- 19 files changed, 86 insertions(+), 42 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index b67a11de169..a7fc274e00d 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -257,7 +257,12 @@ public final class ConfigConstants { */ public static final String FS_STREAM_OPENING_TIMEOUT_KEY = "taskmanager.runtime.fs_timeout"; - + /** + * Whether to use the LargeRecordHandler when spilling. + */ + public static final String USE_LARGE_RECORD_HANDLER_KEY = "taskmanager.runtime.large-record-handler"; + + // -------- Common Resource Framework Configuration (YARN & Mesos) -------- /** @@ -758,6 +763,11 @@ public final class ConfigConstants { */ public static final int DEFAULT_FS_STREAM_OPENING_TIMEOUT = 0; + /** + * Whether to use the LargeRecordHandler when spilling. + */ + public static final boolean DEFAULT_USE_LARGE_RECORD_HANDLER = false; + // ------ Common Resource Framework Configuration (YARN & Mesos) ------ diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java index a5ae00ca928..12c5dfcd715 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java @@ -126,6 +126,8 @@ public class JobGraphGenerator implements Visitor { private final int defaultMaxFan; private final float defaultSortSpillingThreshold; + + private final boolean useLargeRecordHandler; private int iterationIdEnumerator = 1; @@ -143,6 +145,7 @@ public class JobGraphGenerator implements Visitor { public JobGraphGenerator() { this.defaultMaxFan = ConfigConstants.DEFAULT_SPILLING_MAX_FAN; this.defaultSortSpillingThreshold = ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD; + this.useLargeRecordHandler = ConfigConstants.DEFAULT_USE_LARGE_RECORD_HANDLER; } public JobGraphGenerator(Configuration config) { @@ -150,6 +153,10 @@ public class JobGraphGenerator implements Visitor { ConfigConstants.DEFAULT_SPILLING_MAX_FAN); this.defaultSortSpillingThreshold = config.getFloat(ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD_KEY, ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD); + this.useLargeRecordHandler = config.getBoolean( + ConfigConstants.USE_LARGE_RECORD_HANDLER_KEY, + ConfigConstants.DEFAULT_USE_LARGE_RECORD_HANDLER); + } /** @@ -1051,6 +1058,7 @@ public class JobGraphGenerator implements Visitor { config.setRelativeMemoryInput(inputNum, c.getRelativeMemoryLocalStrategy()); config.setFilehandlesInput(inputNum, this.defaultMaxFan); config.setSpillingThresholdInput(inputNum, this.defaultSortSpillingThreshold); + config.setUseLargeRecordHandler(this.useLargeRecordHandler); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java index 36965ab1bfb..68995d8d182 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java @@ -926,7 +926,8 @@ public class BatchTask extends AbstractInvokable impleme UnilateralSortMerger sorter = new UnilateralSortMerger(getMemoryManager(), getIOManager(), this.inputIterators[inputNum], this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum), this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum), - this.config.getSpillingThresholdInput(inputNum), this.getExecutionConfig().isObjectReuseEnabled()); + this.config.getSpillingThresholdInput(inputNum), this.config.getUseLargeRecordHandler(), + this.getExecutionConfig().isObjectReuseEnabled()); // set the input to null such that it will be lazily fetched from the input strategy this.inputs[inputNum] = null; this.localStrategies[inputNum] = sorter; @@ -962,7 +963,8 @@ public class BatchTask extends AbstractInvokable impleme (GroupCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum], this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum), this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum), - this.config.getSpillingThresholdInput(inputNum), this.getExecutionConfig().isObjectReuseEnabled()); + this.config.getSpillingThresholdInput(inputNum), this.getTaskConfig().getUseLargeRecordHandler(), + this.getExecutionConfig().isObjectReuseEnabled()); cSorter.setUdfConfiguration(this.config.getStubParameters()); // set the input to null such that it will be lazily fetched from the input strategy diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java index b73c85ae60d..c77a9ae9593 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java @@ -144,6 +144,7 @@ public class DataSinkTask extends AbstractInvokable { this.reader, this, this.inputTypeSerializerFactory, compFact.createComparator(), this.config.getRelativeMemoryInput(0), this.config.getFilehandlesInput(0), this.config.getSpillingThresholdInput(0), + this.config.getUseLargeRecordHandler(), this.getExecutionConfig().isObjectReuseEnabled()); this.localStrategy = sorter; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java index 855ee21106e..a02ced2adab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java @@ -103,11 +103,12 @@ public class CombiningUnilateralSortMerger extends UnilateralSortMerger { public CombiningUnilateralSortMerger(GroupCombineFunction combineStub, MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator input, AbstractInvokable parentTask, TypeSerializerFactory serializerFactory, TypeComparator comparator, - double memoryFraction, int maxNumFileHandles, float startSpillingFraction, boolean objectReuseEnabled) + double memoryFraction, int maxNumFileHandles, float startSpillingFraction, + boolean handleLargeRecords, boolean objectReuseEnabled) throws IOException, MemoryAllocationException { this(combineStub, memoryManager, ioManager, input, parentTask, serializerFactory, comparator, - memoryFraction, -1, maxNumFileHandles, startSpillingFraction, objectReuseEnabled); + memoryFraction, -1, maxNumFileHandles, startSpillingFraction, handleLargeRecords, objectReuseEnabled); } /** @@ -136,12 +137,12 @@ public class CombiningUnilateralSortMerger extends UnilateralSortMerger { MutableObjectIterator input, AbstractInvokable parentTask, TypeSerializerFactory serializerFactory, TypeComparator comparator, double memoryFraction, int numSortBuffers, int maxNumFileHandles, - float startSpillingFraction, boolean objectReuseEnabled) + float startSpillingFraction, boolean handleLargeRecords, boolean objectReuseEnabled) throws IOException, MemoryAllocationException { super(memoryManager, ioManager, input, parentTask, serializerFactory, comparator, - memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, true, - objectReuseEnabled); + memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, + handleLargeRecords, objectReuseEnabled); this.combineStub = combineStub; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java index 0fa24f20ee4..f70be290e2f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java @@ -156,28 +156,28 @@ public class UnilateralSortMerger implements Sorter { // ------------------------------------------------------------------------ public UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager, - MutableObjectIterator input, AbstractInvokable parentTask, + MutableObjectIterator input, AbstractInvokable parentTask, TypeSerializerFactory serializerFactory, TypeComparator comparator, double memoryFraction, int maxNumFileHandles, float startSpillingFraction, - boolean objectReuseEnabled) + boolean handleLargeRecords, boolean objectReuseEnabled) throws IOException, MemoryAllocationException { this(memoryManager, ioManager, input, parentTask, serializerFactory, comparator, - memoryFraction, -1, maxNumFileHandles, startSpillingFraction, objectReuseEnabled); + memoryFraction, -1, maxNumFileHandles, startSpillingFraction, handleLargeRecords, objectReuseEnabled); } - + public UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager, - MutableObjectIterator input, AbstractInvokable parentTask, + MutableObjectIterator input, AbstractInvokable parentTask, TypeSerializerFactory serializerFactory, TypeComparator comparator, double memoryFraction, int numSortBuffers, int maxNumFileHandles, - float startSpillingFraction, boolean objectReuseEnabled) + float startSpillingFraction, boolean handleLargeRecords, boolean objectReuseEnabled) throws IOException, MemoryAllocationException { this(memoryManager, ioManager, input, parentTask, serializerFactory, comparator, - memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, true, + memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, handleLargeRecords, objectReuseEnabled); } - + public UnilateralSortMerger(MemoryManager memoryManager, List memory, IOManager ioManager, MutableObjectIterator input, AbstractInvokable parentTask, @@ -202,7 +202,7 @@ public class UnilateralSortMerger implements Sorter { { this(memoryManager, memoryManager.allocatePages(parentTask, memoryManager.computeNumberOfPages(memoryFraction)), ioManager, input, parentTask, serializerFactory, comparator, - numSortBuffers, maxNumFileHandles, startSpillingFraction, noSpillingMemory, true, + numSortBuffers, maxNumFileHandles, startSpillingFraction, noSpillingMemory, handleLargeRecords, objectReuseEnabled); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java index 2c452938b96..03a8910c483 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java @@ -168,7 +168,11 @@ public class TaskConfig implements Serializable { private static final String SORT_SPILLING_THRESHOLD_DRIVER = "sort-spill-threshold.driver"; private static final String SORT_SPILLING_THRESHOLD_INPUT_PREFIX = "sort-spill-threshold.input."; - + + private static final String USE_LARGE_RECORD_HANDLER = "sort-spill.large-record-handler"; + + private static final boolean USE_LARGE_RECORD_HANDLER_DEFAULT = false; + // ----------------------------------- Iterations --------------------------------------------- private static final String NUMBER_OF_ITERATIONS = "iterative.num-iterations"; @@ -692,6 +696,14 @@ public class TaskConfig implements Serializable { public float getSpillingThresholdInput(int inputNum) { return this.config.getFloat(SORT_SPILLING_THRESHOLD_INPUT_PREFIX + inputNum, 0.7f); } + + public void setUseLargeRecordHandler(boolean useLargeRecordHandler) { + this.config.setBoolean(USE_LARGE_RECORD_HANDLER, useLargeRecordHandler); + } + + public boolean getUseLargeRecordHandler() { + return this.config.getBoolean(USE_LARGE_RECORD_HANDLER, USE_LARGE_RECORD_HANDLER_DEFAULT); + } // -------------------------------------------------------------------------------------------- // Parameters for Function Chaining diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java index 1d7fdb8422d..2184302d01a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java @@ -136,7 +136,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase testTask = new GroupReduceDriver<>(); @@ -182,7 +182,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase testTask = new GroupReduceDriver<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java index e05f7d286b5..7c92dbab168 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java @@ -130,7 +130,7 @@ public class ReduceTaskTest extends DriverTestBase(new MockCombiningReduceStub(), getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false), getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), this.perSortFractionMem, - 4, 0.8f, true); + 4, 0.8f, true /* use large record handler */, true); addInput(sorter.getIterator()); GroupReduceDriver testTask = new GroupReduceDriver<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java index 38e3db83960..0f636ef774e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java @@ -117,7 +117,7 @@ public class CombiningUnilateralSortMergerITCase { Sorter> merger = new CombiningUnilateralSortMerger<>(comb, this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory2, this.comparator2, - 0.25, 64, 0.7f, false); + 0.25, 64, 0.7f, true /* use large record handler */, false); final Tuple2 rec = new Tuple2<>(); rec.setField(1, 1); @@ -156,7 +156,7 @@ public class CombiningUnilateralSortMergerITCase { Sorter> merger = new CombiningUnilateralSortMerger<>(comb, this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory2, this.comparator2, - 0.01, 64, 0.005f, true); + 0.01, 64, 0.005f, true /* use large record handler */, true); final Tuple2 rec = new Tuple2<>(); rec.setField(1, 1); @@ -203,7 +203,7 @@ public class CombiningUnilateralSortMergerITCase { Sorter> merger = new CombiningUnilateralSortMerger<>(comb, this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory1, this.comparator1, - 0.25, 2, 0.7f, false); + 0.25, 2, 0.7f, true /* use large record handler */, false); // emit data LOG.debug("emitting data"); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java index b19591b81f0..a5235f3981a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java @@ -114,7 +114,7 @@ public class ExternalSortITCase { Sorter> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager, source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator, - (double)64/78, 2, 0.9f, true); + (double)64/78, 2, 0.9f, true /*use large record handler*/, true); // emit data LOG.debug("Reading and sorting data..."); @@ -163,7 +163,7 @@ public class ExternalSortITCase { Sorter> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager, source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator, - (double)64/78, 10, 2, 0.9f, false); + (double)64/78, 10, 2, 0.9f, true /*use large record handler*/, false); // emit data LOG.debug("Reading and sorting data..."); @@ -212,7 +212,7 @@ public class ExternalSortITCase { Sorter> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager, source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator, - (double)16/78, 64, 0.7f, true); + (double)16/78, 64, 0.7f, true /*use large record handler*/, true); // emit data LOG.debug("Reading and sorting data..."); @@ -264,7 +264,7 @@ public class ExternalSortITCase { Sorter> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager, source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator, - (double)64/78, 16, 0.7f, false); + (double)64/78, 16, 0.7f, true /*use large record handler*/, false); // emit data LOG.debug("Emitting data..."); @@ -321,7 +321,8 @@ public class ExternalSortITCase { LOG.debug("Initializing sortmerger..."); Sorter merger = new UnilateralSortMerger(this.memoryManager, this.ioManager, - generator, this.parentTask, serializerFactory, comparator, (double)64/78, 4, 0.7f, true); + generator, this.parentTask, serializerFactory, comparator, (double)64/78, 4, 0.7f, + true /*use large record handler*/, true); // emit data LOG.debug("Emitting data..."); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java index c806766fdbd..c0b00583d9f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java @@ -128,7 +128,7 @@ public class ExternalSortLargeRecordsITCase { this.memoryManager, this.ioManager, source, this.parentTask, new RuntimeSerializerFactory>(serializer, (Class>) (Class) Tuple2.class), - comparator, 1.0, 1, 128, 0.7f, false); + comparator, 1.0, 1, 128, 0.7f, true /* use large record handler */ , false); // check order MutableObjectIterator> iterator = sorter.getIterator(); @@ -198,7 +198,7 @@ public class ExternalSortLargeRecordsITCase { this.memoryManager, this.ioManager, source, this.parentTask, new RuntimeSerializerFactory>(serializer, (Class>) (Class) Tuple2.class), - comparator, 1.0, 1, 128, 0.7f, true); + comparator, 1.0, 1, 128, 0.7f, true /*use large record handler*/, true); // check order MutableObjectIterator> iterator = sorter.getIterator(); @@ -283,7 +283,7 @@ public class ExternalSortLargeRecordsITCase { this.memoryManager, this.ioManager, source, this.parentTask, new RuntimeSerializerFactory>(serializer, (Class>) (Class) Tuple2.class), - comparator, 1.0, 1, 128, 0.7f, false); + comparator, 1.0, 1, 128, 0.7f, true /*use large record handler*/, false); // check order MutableObjectIterator> iterator = sorter.getIterator(); @@ -354,7 +354,7 @@ public class ExternalSortLargeRecordsITCase { this.memoryManager, this.ioManager, source, this.parentTask, new RuntimeSerializerFactory>(serializer, (Class>) (Class) Tuple2.class), - comparator, 1.0, 1, 128, 0.7f, true); + comparator, 1.0, 1, 128, 0.7f, true /*use large record handler*/, true); // check order MutableObjectIterator> iterator = sorter.getIterator(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java index 766123f3931..7531b995c24 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java @@ -151,6 +151,7 @@ public class BinaryOperatorTestBase extends TestLog this.perSortFractionMem, 32, 0.8f, + true /*use large record handler*/, false ); this.sorters.add(sorter); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java index 3a69fab6917..e9a0ba5ce3d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java @@ -146,7 +146,7 @@ public class DriverTestBase extends TestLogger implements Ta public void addInputSorted(MutableObjectIterator input, RecordComparator comp) throws Exception { UnilateralSortMerger sorter = new UnilateralSortMerger( this.memManager, this.ioManager, input, this.owner, RecordSerializerFactory.get(), comp, - this.perSortFractionMem, 32, 0.8f, true); + this.perSortFractionMem, 32, 0.8f, true /*use large record handler*/, true); this.sorters.add(sorter); this.inputs.add(null); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java index 50bb1ee3c33..ff12e764208 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java @@ -148,7 +148,9 @@ public class UnaryOperatorTestBase extends TestLogg this.memManager, this.ioManager, input, this.owner, this.getInputSerializer(0), comp, - this.perSortFractionMem, 32, 0.8f, false); + this.perSortFractionMem, 32, 0.8f, + true /*use large record handler*/, + false); } public void addDriverComparator(TypeComparator comparator) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java index 6205de4d381..4da2c8cc5b7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java @@ -130,11 +130,13 @@ public class HashVsSortMiniBenchmark { final UnilateralSortMerger> sorter1 = new UnilateralSortMerger<>( this.memoryManager, this.ioManager, input1, this.parentTask, this.serializer1, - this.comparator1.duplicate(), (double)MEMORY_FOR_SORTER/MEMORY_SIZE, 128, 0.8f, true); + this.comparator1.duplicate(), (double)MEMORY_FOR_SORTER/MEMORY_SIZE, 128, 0.8f, + true /*use large record handler*/, true); final UnilateralSortMerger> sorter2 = new UnilateralSortMerger<>( this.memoryManager, this.ioManager, input2, this.parentTask, this.serializer2, - this.comparator2.duplicate(), (double)MEMORY_FOR_SORTER/MEMORY_SIZE, 128, 0.8f, true); + this.comparator2.duplicate(), (double)MEMORY_FOR_SORTER/MEMORY_SIZE, 128, 0.8f, + true /*use large record handler*/, true); final MutableObjectIterator> sortedInput1 = sorter1.getIterator(); final MutableObjectIterator> sortedInput2 = sorter2.getIterator(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java index c9bd56bb702..9821b056e5b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java @@ -91,7 +91,8 @@ public class MassiveStringSorting { MutableObjectIterator inputIterator = new StringReaderMutableObjectIterator(reader); sorter = new UnilateralSortMerger(mm, ioMan, inputIterator, new DummyInvokable(), - new RuntimeSerializerFactory(serializer, String.class), comparator, 1.0, 4, 0.8f, false); + new RuntimeSerializerFactory(serializer, String.class), comparator, 1.0, 4, 0.8f, + true /* use large record handler */, false); MutableObjectIterator sortedData = sorter.getIterator(); @@ -184,7 +185,8 @@ public class MassiveStringSorting { MutableObjectIterator> inputIterator = new StringTupleReaderMutableObjectIterator(reader); sorter = new UnilateralSortMerger>(mm, ioMan, inputIterator, new DummyInvokable(), - new RuntimeSerializerFactory>(serializer, (Class>) (Class) Tuple2.class), comparator, 1.0, 4, 0.8f, false); + new RuntimeSerializerFactory>(serializer, (Class>) (Class) Tuple2.class), comparator, 1.0, 4, 0.8f, + true /* use large record handler */, false); diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java index 9a016ccc494..9e37b79fc24 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java @@ -91,7 +91,8 @@ public class MassiveStringValueSorting { MutableObjectIterator inputIterator = new StringValueReaderMutableObjectIterator(reader); sorter = new UnilateralSortMerger(mm, ioMan, inputIterator, new DummyInvokable(), - new RuntimeSerializerFactory(serializer, StringValue.class), comparator, 1.0, 4, 0.8f, true); + new RuntimeSerializerFactory(serializer, StringValue.class), comparator, 1.0, 4, 0.8f, + true /* use large record handler */, true); MutableObjectIterator sortedData = sorter.getIterator(); @@ -187,7 +188,8 @@ public class MassiveStringValueSorting { MutableObjectIterator> inputIterator = new StringValueTupleReaderMutableObjectIterator(reader); sorter = new UnilateralSortMerger>(mm, ioMan, inputIterator, new DummyInvokable(), - new RuntimeSerializerFactory>(serializer, (Class>) (Class) Tuple2.class), comparator, 1.0, 4, 0.8f, false); + new RuntimeSerializerFactory>(serializer, (Class>) (Class) Tuple2.class), comparator, 1.0, 4, 0.8f, + true /* use large record handler */, false); diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala index d58eda435f2..5eb1e8b5a48 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala @@ -98,7 +98,7 @@ class MassiveCaseClassSortingITCase { sorter = new UnilateralSortMerger[StringTuple](mm, ioMan, inputIterator, new DummyInvokable(), new RuntimeSerializerFactory[StringTuple](serializer, classOf[StringTuple]), - comparator, 1.0, 4, 0.8f, false) + comparator, 1.0, 4, 0.8f, true /*use large record handler*/, false) val sortedData = sorter.getIterator reader.close() -- GitLab