提交 d529749c 编写于 作者: A Aljoscha Krettek 提交者: Stephan Ewen

[FLINK-1285] Make Merge-Join aware of object-reuse setting

This closes #259
上级 b7b32a05
......@@ -22,6 +22,7 @@ package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator;
import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator;
import org.apache.flink.runtime.operators.sort.NonReusingMergeMatchIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.functions.FlatJoinFunction;
......@@ -32,7 +33,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator;
import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator;
import org.apache.flink.runtime.operators.sort.MergeMatchIterator;
import org.apache.flink.runtime.operators.sort.ReusingMergeMatchIterator;
import org.apache.flink.runtime.operators.util.JoinTaskIterator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.Collector;
......@@ -125,7 +126,7 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT
if (this.objectReuseEnabled) {
switch (ls) {
case MERGE:
this.matchIterator = new MergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
this.matchIterator = new ReusingMergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
break;
case HYBRIDHASH_BUILD_FIRST:
......@@ -140,7 +141,7 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT
} else {
switch (ls) {
case MERGE:
this.matchIterator = new MergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
this.matchIterator = new NonReusingMergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
break;
case HYBRIDHASH_BUILD_FIRST:
......
......@@ -36,41 +36,37 @@ import org.apache.flink.runtime.util.ResettableIterator;
* access to the data in that block.
*
*/
public class BlockResettableIterator<T> extends AbstractBlockResettableIterator<T> implements ResettableIterator<T> {
public class NonReusingBlockResettableIterator<T> extends AbstractBlockResettableIterator<T> implements ResettableIterator<T> {
public static final Logger LOG = LoggerFactory.getLogger(BlockResettableIterator.class);
public static final Logger LOG = LoggerFactory.getLogger(NonReusingBlockResettableIterator.class);
// ------------------------------------------------------------------------
protected Iterator<T> input;
private T nextElement;
protected T nextElement;
private final T reuseElement;
protected T leftOverElement;
private T leftOverElement;
protected boolean readPhase;
private boolean readPhase;
private boolean noMoreBlocks;
protected boolean noMoreBlocks;
// ------------------------------------------------------------------------
public BlockResettableIterator(MemoryManager memoryManager, Iterator<T> input,
TypeSerializer<T> serializer, int numPages, AbstractInvokable ownerTask)
public NonReusingBlockResettableIterator(MemoryManager memoryManager, Iterator<T> input,
TypeSerializer<T> serializer, int numPages,
AbstractInvokable ownerTask)
throws MemoryAllocationException
{
this(memoryManager, serializer, numPages, ownerTask);
this.input = input;
}
public BlockResettableIterator(MemoryManager memoryManager,
TypeSerializer<T> serializer, int numPages, AbstractInvokable ownerTask)
public NonReusingBlockResettableIterator(MemoryManager memoryManager, TypeSerializer<T> serializer, int numPages, AbstractInvokable ownerTask)
throws MemoryAllocationException
{
super(serializer, memoryManager, numPages, ownerTask);
this.reuseElement = serializer.createInstance();
}
// ------------------------------------------------------------------------
......@@ -83,8 +79,6 @@ public class BlockResettableIterator<T> extends AbstractBlockResettableIterator<
nextBlock();
}
@Override
public boolean hasNext() {
......@@ -92,7 +86,7 @@ public class BlockResettableIterator<T> extends AbstractBlockResettableIterator<
if (this.nextElement == null) {
if (this.readPhase) {
// read phase, get next element from buffer
T tmp = getNextRecord(this.reuseElement);
T tmp = getNextRecord();
if (tmp != null) {
this.nextElement = tmp;
return true;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.operators.resettable;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
/**
* Implementation of an iterator that fetches a block of data into main memory and offers resettable
* access to the data in that block.
*
*/
public class ReusingBlockResettableIterator<T> extends NonReusingBlockResettableIterator<T> {
public static final Logger LOG = LoggerFactory.getLogger(ReusingBlockResettableIterator.class);
private final T reuseElement;
// ------------------------------------------------------------------------
public ReusingBlockResettableIterator(MemoryManager memoryManager, Iterator<T> input,
TypeSerializer<T> serializer, int numPages,
AbstractInvokable ownerTask)
throws MemoryAllocationException
{
this(memoryManager, serializer, numPages, ownerTask);
this.input = input;
}
public ReusingBlockResettableIterator(MemoryManager memoryManager, TypeSerializer<T>
serializer, int numPages, AbstractInvokable ownerTask)
throws MemoryAllocationException
{
super(memoryManager, serializer, numPages, ownerTask);
this.reuseElement = serializer.createInstance();
}
// ------------------------------------------------------------------------
@Override
public boolean hasNext() {
try {
if (this.nextElement == null) {
if (this.readPhase) {
// read phase, get next element from buffer
T tmp = getNextRecord(this.reuseElement);
if (tmp != null) {
this.nextElement = tmp;
return true;
} else {
return false;
}
} else {
if (this.input.hasNext()) {
final T next = this.input.next();
if (writeNextRecord(next)) {
this.nextElement = next;
return true;
} else {
this.leftOverElement = next;
return false;
}
} else {
this.noMoreBlocks = true;
return false;
}
}
} else {
return true;
}
} catch (IOException ioex) {
throw new RuntimeException("Error (de)serializing record in block resettable iterator.", ioex);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.operators.sort;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.resettable.NonReusingBlockResettableIterator;
import org.apache.flink.runtime.operators.resettable.SpillingResettableIterator;
import org.apache.flink.runtime.operators.util.JoinTaskIterator;
import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
/**
* An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
* matching through a sort-merge join strategy.
*/
public class NonReusingMergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O> {
/**
* The log used by this iterator to log messages.
*/
private static final Logger LOG = LoggerFactory.getLogger(NonReusingMergeMatchIterator.class);
// --------------------------------------------------------------------------------------------
private TypePairComparator<T1, T2> comp;
private NonReusingKeyGroupedIterator<T1> iterator1;
private NonReusingKeyGroupedIterator<T2> iterator2;
private final TypeSerializer<T1> serializer1;
private final TypeSerializer<T2> serializer2;
private final NonReusingBlockResettableIterator<T2> blockIt; // for N:M cross products with same key
private final List<MemorySegment> memoryForSpillingIterator;
private final MemoryManager memoryManager;
private final IOManager ioManager;
// --------------------------------------------------------------------------------------------
public NonReusingMergeMatchIterator(
MutableObjectIterator<T1> input1,
MutableObjectIterator<T2> input2,
TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
TypePairComparator<T1, T2> pairComparator,
MemoryManager memoryManager,
IOManager ioManager,
int numMemoryPages,
AbstractInvokable parentTask)
throws MemoryAllocationException
{
if (numMemoryPages < 2) {
throw new IllegalArgumentException("Merger needs at least 2 memory pages.");
}
this.comp = pairComparator;
this.serializer1 = serializer1;
this.serializer2 = serializer2;
this.memoryManager = memoryManager;
this.ioManager = ioManager;
this.iterator1 = new NonReusingKeyGroupedIterator<T1>(input1, this.serializer1, comparator1.duplicate());
this.iterator2 = new NonReusingKeyGroupedIterator<T2>(input2, this.serializer2, comparator2.duplicate());
final int numPagesForSpiller = numMemoryPages > 20 ? 2 : 1;
this.blockIt = new NonReusingBlockResettableIterator<T2>(this.memoryManager, this.serializer2,
(numMemoryPages - numPagesForSpiller), parentTask);
this.memoryForSpillingIterator = memoryManager.allocatePages(parentTask, numPagesForSpiller);
}
@Override
public void open() throws IOException {}
@Override
public void close() {
if (this.blockIt != null) {
try {
this.blockIt.close();
}
catch (Throwable t) {
LOG.error("Error closing block memory iterator: " + t.getMessage(), t);
}
}
this.memoryManager.release(this.memoryForSpillingIterator);
}
@Override
public void abort() {
close();
}
/**
* Calls the <code>JoinFunction#match()</code> method for all two key-value pairs that share the same key and come
* from different inputs. The output of the <code>match()</code> method is forwarded.
* <p>
* This method first zig-zags between the two sorted inputs in order to find a common
* key, and then calls the match stub with the cross product of the values.
*
* @throws Exception Forwards all exceptions from the user code and the I/O system.
*
* @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector)
*/
@Override
public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
throws Exception
{
if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) {
// consume all remaining keys (hack to prevent remaining inputs during iterations, lets get rid of this soon)
while (this.iterator1.nextKey());
while (this.iterator2.nextKey());
return false;
}
final TypePairComparator<T1, T2> comparator = this.comp;
comparator.setReference(this.iterator1.getCurrent());
T2 current2 = this.iterator2.getCurrent();
// zig zag
while (true) {
// determine the relation between the (possibly composite) keys
final int comp = comparator.compareToReference(current2);
if (comp == 0) {
break;
}
if (comp < 0) {
if (!this.iterator2.nextKey()) {
return false;
}
current2 = this.iterator2.getCurrent();
}
else {
if (!this.iterator1.nextKey()) {
return false;
}
comparator.setReference(this.iterator1.getCurrent());
}
}
// here, we have a common key! call the match function with the cross product of the
// values
final NonReusingKeyGroupedIterator<T1>.ValuesIterator values1 = this.iterator1.getValues();
final NonReusingKeyGroupedIterator<T2>.ValuesIterator values2 = this.iterator2.getValues();
final T1 firstV1 = values1.next();
final T2 firstV2 = values2.next();
final boolean v1HasNext = values1.hasNext();
final boolean v2HasNext = values2.hasNext();
// check if one side is already empty
// this check could be omitted if we put this in MatchTask.
// then we can derive the local strategy (with build side).
if (v1HasNext) {
if (v2HasNext) {
// both sides contain more than one value
// TODO: Decide which side to spill and which to block!
crossMwithNValues(firstV1, values1, firstV2, values2, matchFunction, collector);
} else {
crossSecond1withNValues(firstV2, firstV1, values1, matchFunction, collector);
}
} else {
if (v2HasNext) {
crossFirst1withNValues(firstV1, firstV2, values2, matchFunction, collector);
} else {
// both sides contain only one value
matchFunction.join(firstV1, firstV2, collector);
}
}
return true;
}
/**
* Crosses a single value from the first input with N values, all sharing a common key.
* Effectively realizes a <i>1:N</i> match (join).
*
* @param val1 The value form the <i>1</i> side.
* @param firstValN The first of the values from the <i>N</i> side.
* @param valsN Iterator over remaining <i>N</i> side values.
*
* @throws Exception Forwards all exceptions thrown by the stub.
*/
private void crossFirst1withNValues(final T1 val1, final T2 firstValN,
final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
throws Exception
{
T1 copy1 = this.serializer1.copy(val1);
matchFunction.join(copy1, firstValN, collector);
// set copy and match first element
boolean more = true;
do {
final T2 nRec = valsN.next();
if (valsN.hasNext()) {
copy1 = this.serializer1.copy(val1);
matchFunction.join(copy1, nRec, collector);
} else {
matchFunction.join(val1, nRec, collector);
more = false;
}
}
while (more);
}
/**
* Crosses a single value from the second side with N values, all sharing a common key.
* Effectively realizes a <i>N:1</i> match (join).
*
* @param val1 The value form the <i>1</i> side.
* @param firstValN The first of the values from the <i>N</i> side.
* @param valsN Iterator over remaining <i>N</i> side values.
*
* @throws Exception Forwards all exceptions thrown by the stub.
*/
private void crossSecond1withNValues(T2 val1, T1 firstValN,
Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector)
throws Exception
{
T2 copy2 = this.serializer2.copy(val1);
matchFunction.join(firstValN, copy2, collector);
// set copy and match first element
boolean more = true;
do {
final T1 nRec = valsN.next();
if (valsN.hasNext()) {
copy2 = this.serializer2.copy(val1);
matchFunction.join(nRec, copy2, collector);
} else {
matchFunction.join(nRec, val1, collector);
more = false;
}
}
while (more);
}
/**
* @param firstV1
* @param spillVals
* @param firstV2
* @param blockVals
*/
private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals,
final T2 firstV2, final Iterator<T2> blockVals,
final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
throws Exception
{
// ==================================================
// We have one first (head) element from both inputs (firstV1 and firstV2)
// We have an iterator for both inputs.
// we make the V1 side the spilling side and the V2 side the blocking side.
// In order to get the full cross product without unnecessary spilling, we do the
// following:
// 1) cross the heads
// 2) cross the head of the spilling side against the first block of the blocking side
// 3) cross the iterator of the spilling side with the head of the block side
// 4) cross the iterator of the spilling side with the first block
// ---------------------------------------------------
// If the blocking side has more than one block, we really need to make the spilling side fully
// resettable. For each further block on the block side, we do:
// 5) cross the head of the spilling side with the next block
// 6) cross the spilling iterator with the next block.
// match the first values first
T1 copy1 = this.serializer1.copy(firstV1);
T2 blockHeadCopy = this.serializer2.copy(firstV2);
T1 spillHeadCopy = null;
// --------------- 1) Cross the heads -------------------
matchFunction.join(copy1, firstV2, collector);
// for the remaining values, we do a block-nested-loops join
SpillingResettableIterator<T1> spillIt = null;
try {
// create block iterator on the second input
this.blockIt.reopen(blockVals);
// ------------- 2) cross the head of the spilling side with the first block ------------------
while (this.blockIt.hasNext()) {
final T2 nextBlockRec = this.blockIt.next();
copy1 = this.serializer1.copy(firstV1);
matchFunction.join(copy1, nextBlockRec, collector);
}
this.blockIt.reset();
// spilling is required if the blocked input has data beyond the current block.
// in that case, create the spilling iterator
final Iterator<T1> leftSideIter;
final boolean spillingRequired = this.blockIt.hasFurtherInput();
if (spillingRequired)
{
// more data than would fit into one block. we need to wrap the other side in a spilling iterator
// create spilling iterator on first input
spillIt = new SpillingResettableIterator<T1>(spillVals, this.serializer1,
this.memoryManager, this.ioManager, this.memoryForSpillingIterator);
leftSideIter = spillIt;
spillIt.open();
spillHeadCopy = this.serializer1.copy(firstV1);
}
else {
leftSideIter = spillVals;
}
// cross the values in the v1 iterator against the current block
while (leftSideIter.hasNext()) {
final T1 nextSpillVal = leftSideIter.next();
copy1 = this.serializer1.copy(nextSpillVal);
// -------- 3) cross the iterator of the spilling side with the head of the block side --------
T2 copy2 = this.serializer2.copy(blockHeadCopy);
matchFunction.join(copy1, copy2, collector);
// -------- 4) cross the iterator of the spilling side with the first block --------
while (this.blockIt.hasNext()) {
T2 nextBlockRec = this.blockIt.next();
// get instances of key and block value
copy1 = this.serializer1.copy(nextSpillVal);
matchFunction.join(copy1, nextBlockRec, collector);
}
// reset block iterator
this.blockIt.reset();
}
// if everything from the block-side fit into a single block, we are done.
// note that in this special case, we did not create a spilling iterator at all
if (!spillingRequired) {
return;
}
// here we are, because we have more blocks on the block side
// loop as long as there are blocks from the blocked input
while (this.blockIt.nextBlock())
{
// rewind the spilling iterator
spillIt.reset();
// ------------- 5) cross the head of the spilling side with the next block ------------
while (this.blockIt.hasNext()) {
copy1 = this.serializer1.copy(spillHeadCopy);
final T2 nextBlockVal = blockIt.next();
matchFunction.join(copy1, nextBlockVal, collector);
}
this.blockIt.reset();
// -------- 6) cross the spilling iterator with the next block. ------------------
while (spillIt.hasNext())
{
// get value from resettable iterator
final T1 nextSpillVal = spillIt.next();
// cross value with block values
while (this.blockIt.hasNext()) {
// get instances of key and block value
final T2 nextBlockVal = this.blockIt.next();
copy1 = this.serializer1.copy(nextSpillVal);
matchFunction.join(copy1, nextBlockVal, collector);
}
// reset block iterator
this.blockIt.reset();
}
// reset v1 iterator
spillIt.reset();
}
}
finally {
if (spillIt != null) {
this.memoryForSpillingIterator.addAll(spillIt.close());
}
}
}
}
......@@ -33,7 +33,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.resettable.BlockResettableIterator;
import org.apache.flink.runtime.operators.resettable.NonReusingBlockResettableIterator;
import org.apache.flink.runtime.operators.resettable.SpillingResettableIterator;
import org.apache.flink.runtime.operators.util.JoinTaskIterator;
import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
......@@ -45,12 +45,12 @@ import org.apache.flink.util.MutableObjectIterator;
* An implementation of the {@link JoinTaskIterator} that realizes the
* matching through a sort-merge join strategy.
*/
public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O> {
public class ReusingMergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O> {
/**
* The log used by this iterator to log messages.
*/
private static final Logger LOG = LoggerFactory.getLogger(MergeMatchIterator.class);
private static final Logger LOG = LoggerFactory.getLogger(ReusingMergeMatchIterator.class);
// --------------------------------------------------------------------------------------------
......@@ -72,7 +72,7 @@ public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O
private T2 blockHeadCopy;
private final BlockResettableIterator<T2> blockIt; // for N:M cross products with same key
private final NonReusingBlockResettableIterator<T2> blockIt; // for N:M cross products with same key
private final List<MemorySegment> memoryForSpillingIterator;
......@@ -82,10 +82,16 @@ public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O
// --------------------------------------------------------------------------------------------
public MergeMatchIterator(MutableObjectIterator<T1> input1, MutableObjectIterator<T2> input2,
public ReusingMergeMatchIterator(
MutableObjectIterator<T1> input1,
MutableObjectIterator<T2> input2,
TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2, TypePairComparator<T1, T2> pairComparator,
MemoryManager memoryManager, IOManager ioManager, int numMemoryPages, AbstractInvokable parentTask)
TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
TypePairComparator<T1, T2> pairComparator,
MemoryManager memoryManager,
IOManager ioManager,
int numMemoryPages,
AbstractInvokable parentTask)
throws MemoryAllocationException
{
if (numMemoryPages < 2) {
......@@ -108,7 +114,7 @@ public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O
this.iterator2 = new ReusingKeyGroupedIterator<T2>(input2, this.serializer2, comparator2.duplicate());
final int numPagesForSpiller = numMemoryPages > 20 ? 2 : 1;
this.blockIt = new BlockResettableIterator<T2>(this.memoryManager, this.serializer2,
this.blockIt = new NonReusingBlockResettableIterator<T2>(this.memoryManager, this.serializer2,
(numMemoryPages - numPagesForSpiller), parentTask);
this.memoryForSpillingIterator = memoryManager.allocatePages(parentTask, numPagesForSpiller);
}
......
......@@ -29,7 +29,6 @@ import org.apache.flink.api.common.typeutils.record.RecordSerializer;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.resettable.BlockResettableIterator;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
......@@ -38,7 +37,7 @@ import org.junit.Before;
import org.junit.Test;
public class BlockResettableIteratorTest
public class NonReusingBlockResettableIteratorTest
{
private static final int MEMORY_CAPACITY = 3 * 128 * 1024;
......@@ -85,7 +84,7 @@ public class BlockResettableIteratorTest
{
final AbstractInvokable memOwner = new DummyInvokable();
// create the resettable Iterator
final BlockResettableIterator<Record> iterator = new BlockResettableIterator<Record>(
final NonReusingBlockResettableIterator<Record> iterator = new NonReusingBlockResettableIterator<Record>(
this.memman, this.reader, this.serializer, 1, memOwner);
// open the iterator
iterator.open();
......@@ -124,7 +123,7 @@ public class BlockResettableIteratorTest
{
final AbstractInvokable memOwner = new DummyInvokable();
// create the resettable Iterator
final BlockResettableIterator<Record> iterator = new BlockResettableIterator<Record>(
final NonReusingBlockResettableIterator<Record> iterator = new NonReusingBlockResettableIterator<Record>(
this.memman, this.reader, this.serializer, 2, memOwner);
// open the iterator
iterator.open();
......@@ -164,7 +163,7 @@ public class BlockResettableIteratorTest
{
final AbstractInvokable memOwner = new DummyInvokable();
// create the resettable Iterator
final BlockResettableIterator<Record> iterator = new BlockResettableIterator<Record>(
final NonReusingBlockResettableIterator<Record> iterator = new NonReusingBlockResettableIterator<Record>(
this.memman, this.reader, this.serializer, 12, memOwner);
// open the iterator
iterator.open();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.operators.resettable;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.record.RecordSerializer;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class ReusingBlockResettableIteratorTest
{
private static final int MEMORY_CAPACITY = 3 * 128 * 1024;
private static final int NUM_VALUES = 20000;
private MemoryManager memman;
private Iterator<Record> reader;
private List<Record> objects;
private final TypeSerializer<Record> serializer = RecordSerializer.get();
@Before
public void startup() {
// set up IO and memory manager
this.memman = new DefaultMemoryManager(MEMORY_CAPACITY, 1);
// create test objects
this.objects = new ArrayList<Record>(20000);
for (int i = 0; i < NUM_VALUES; ++i) {
this.objects.add(new Record(new IntValue(i)));
}
// create the reader
this.reader = objects.iterator();
}
@After
public void shutdown() {
this.objects = null;
// check that the memory manager got all segments back
if (!this.memman.verifyEmpty()) {
Assert.fail("A memory leak has occurred: Not all memory was properly returned to the memory manager.");
}
this.memman.shutdown();
this.memman = null;
}
@Test
public void testSerialBlockResettableIterator() throws Exception
{
final AbstractInvokable memOwner = new DummyInvokable();
// create the resettable Iterator
final ReusingBlockResettableIterator<Record> iterator = new ReusingBlockResettableIterator<Record>(
this.memman, this.reader, this.serializer, 1, memOwner);
// open the iterator
iterator.open();
// now test walking through the iterator
int lower = 0;
int upper = 0;
do {
lower = upper;
upper = lower;
// find the upper bound
while (iterator.hasNext()) {
Record target = iterator.next();
int val = target.getField(0, IntValue.class).getValue();
Assert.assertEquals(upper++, val);
}
// now reset the buffer a few times
for (int i = 0; i < 5; ++i) {
iterator.reset();
int count = 0;
while (iterator.hasNext()) {
Record target = iterator.next();
int val = target.getField(0, IntValue.class).getValue();
Assert.assertEquals(lower + (count++), val);
}
Assert.assertEquals(upper - lower, count);
}
} while (iterator.nextBlock());
Assert.assertEquals(NUM_VALUES, upper);
// close the iterator
iterator.close();
}
@Test
public void testDoubleBufferedBlockResettableIterator() throws Exception
{
final AbstractInvokable memOwner = new DummyInvokable();
// create the resettable Iterator
final ReusingBlockResettableIterator<Record> iterator = new ReusingBlockResettableIterator<Record>(
this.memman, this.reader, this.serializer, 2, memOwner);
// open the iterator
iterator.open();
// now test walking through the iterator
int lower = 0;
int upper = 0;
do {
lower = upper;
upper = lower;
// find the upper bound
while (iterator.hasNext()) {
Record target = iterator.next();
int val = target.getField(0, IntValue.class).getValue();
Assert.assertEquals(upper++, val);
}
// now reset the buffer a few times
for (int i = 0; i < 5; ++i) {
iterator.reset();
int count = 0;
while (iterator.hasNext()) {
Record target = iterator.next();
int val = target.getField(0, IntValue.class).getValue();
Assert.assertEquals(lower + (count++), val);
}
Assert.assertEquals(upper - lower, count);
}
} while (iterator.nextBlock());
Assert.assertEquals(NUM_VALUES, upper);
// close the iterator
iterator.close();
}
@Test
public void testTwelveFoldBufferedBlockResettableIterator() throws Exception
{
final AbstractInvokable memOwner = new DummyInvokable();
// create the resettable Iterator
final ReusingBlockResettableIterator<Record> iterator = new ReusingBlockResettableIterator<Record>(
this.memman, this.reader, this.serializer, 12, memOwner);
// open the iterator
iterator.open();
// now test walking through the iterator
int lower = 0;
int upper = 0;
do {
lower = upper;
upper = lower;
// find the upper bound
while (iterator.hasNext()) {
Record target = iterator.next();
int val = target.getField(0, IntValue.class).getValue();
Assert.assertEquals(upper++, val);
}
// now reset the buffer a few times
for (int i = 0; i < 5; ++i) {
iterator.reset();
int count = 0;
while (iterator.hasNext()) {
Record target = iterator.next();
int val = target.getField(0, IntValue.class).getValue();
Assert.assertEquals(lower + (count++), val);
}
Assert.assertEquals(upper - lower, count);
}
} while (iterator.nextBlock());
Assert.assertEquals(NUM_VALUES, upper);
// close the iterator
iterator.close();
}
}
......@@ -37,8 +37,6 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.sort.MergeIterator;
import org.apache.flink.runtime.operators.sort.MergeMatchIterator;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
......@@ -55,7 +53,7 @@ import org.junit.Before;
import org.junit.Test;
@SuppressWarnings("deprecation")
public class SortMergeMatchIteratorITCase {
public class NonReusingSortMergeMatchIteratorITCase {
// total memory
private static final int MEMORY_SIZE = 1024 * 1024 * 16;
......@@ -143,8 +141,8 @@ public class SortMergeMatchIteratorITCase {
input2.reset();
// compare with iterator values
MergeMatchIterator<Record, Record, Record> iterator =
new MergeMatchIterator<Record, Record, Record>(
NonReusingMergeMatchIterator<Record, Record, Record> iterator =
new NonReusingMergeMatchIterator<Record, Record, Record>(
input1, input2, this.serializer1, this.comparator1, this.serializer2, this.comparator2,
this.pairComparator, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
......@@ -230,8 +228,8 @@ public class SortMergeMatchIteratorITCase {
// we create this sort-merge iterator with little memory for the block-nested-loops fall-back to make sure it
// needs to spill for the duplicate keys
MergeMatchIterator<Record, Record, Record> iterator =
new MergeMatchIterator<Record, Record, Record>(
NonReusingMergeMatchIterator<Record, Record, Record> iterator =
new NonReusingMergeMatchIterator<Record, Record, Record>(
input1, input2, this.serializer1, this.comparator1, this.serializer2, this.comparator2,
this.pairComparator, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.operators.sort;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.record.RecordComparator;
import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
import org.apache.flink.api.common.typeutils.record.RecordSerializer;
import org.apache.flink.api.java.record.functions.JoinFunction;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
import org.apache.flink.runtime.operators.testutils.TestData.Generator;
import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
import org.apache.flink.types.Record;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@SuppressWarnings("deprecation")
public class ReusingSortMergeMatchIteratorITCase {
// total memory
private static final int MEMORY_SIZE = 1024 * 1024 * 16;
private static final int PAGES_FOR_BNLJN = 2;
// the size of the left and right inputs
private static final int INPUT_1_SIZE = 20000;
private static final int INPUT_2_SIZE = 1000;
// random seeds for the left and right input data generators
private static final long SEED1 = 561349061987311L;
private static final long SEED2 = 231434613412342L;
// dummy abstract task
private final AbstractInvokable parentTask = new DummyInvokable();
private IOManager ioManager;
private MemoryManager memoryManager;
private TypeSerializer<Record> serializer1;
private TypeSerializer<Record> serializer2;
private TypeComparator<Record> comparator1;
private TypeComparator<Record> comparator2;
private TypePairComparator<Record, Record> pairComparator;
@SuppressWarnings("unchecked")
@Before
public void beforeTest() {
this.serializer1 = RecordSerializer.get();
this.serializer2 = RecordSerializer.get();
this.comparator1 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
this.comparator2 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
this.pairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[]{TestData.Key.class});
this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
this.ioManager = new IOManagerAsync();
}
@After
public void afterTest() {
if (this.ioManager != null) {
this.ioManager.shutdown();
if (!this.ioManager.isProperlyShutDown()) {
Assert.fail("I/O manager failed to properly shut down.");
}
this.ioManager = null;
}
if (this.memoryManager != null) {
Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
this.memoryManager.verifyEmpty());
this.memoryManager.shutdown();
this.memoryManager = null;
}
}
@Test
public void testMerge() {
try {
final Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
final Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
// collect expected data
final Map<TestData.Key, Collection<Match>> expectedMatchesMap = matchValues(
collectData(input1),
collectData(input2));
final JoinFunction matcher = new MatchRemovingMatcher(expectedMatchesMap);
final Collector<Record> collector = new DiscardingOutputCollector<Record>();
// reset the generators
generator1.reset();
generator2.reset();
input1.reset();
input2.reset();
// compare with iterator values
ReusingMergeMatchIterator<Record, Record, Record> iterator =
new ReusingMergeMatchIterator<Record, Record, Record>(
input1, input2, this.serializer1, this.comparator1, this.serializer2, this.comparator2,
this.pairComparator, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
iterator.open();
while (iterator.callWithNextKey(matcher, collector));
iterator.close();
// assert that each expected match was seen
for (Entry<TestData.Key, Collection<Match>> entry : expectedMatchesMap.entrySet()) {
Assert.assertTrue("Collection for key " + entry.getKey() + " is not empty", entry.getValue().isEmpty());
}
}
catch (Exception e) {
e.printStackTrace();
Assert.fail("An exception occurred during the test: " + e.getMessage());
}
}
@Test
public void testMergeWithHighNumberOfCommonKeys()
{
// the size of the left and right inputs
final int INPUT_1_SIZE = 200;
final int INPUT_2_SIZE = 100;
final int INPUT_1_DUPLICATES = 10;
final int INPUT_2_DUPLICATES = 4000;
final int DUPLICATE_KEY = 13;
try {
final Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
final Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
final TestData.GeneratorIterator gen1Iter = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
final TestData.GeneratorIterator gen2Iter = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
final TestData.ConstantValueIterator const1Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
final TestData.ConstantValueIterator const2Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
final List<MutableObjectIterator<Record>> inList1 = new ArrayList<MutableObjectIterator<Record>>();
inList1.add(gen1Iter);
inList1.add(const1Iter);
final List<MutableObjectIterator<Record>> inList2 = new ArrayList<MutableObjectIterator<Record>>();
inList2.add(gen2Iter);
inList2.add(const2Iter);
MutableObjectIterator<Record> input1 = new MergeIterator<Record>(inList1, serializer1, comparator1.duplicate());
MutableObjectIterator<Record> input2 = new MergeIterator<Record>(inList2, serializer2, comparator2.duplicate());
// collect expected data
final Map<TestData.Key, Collection<Match>> expectedMatchesMap = matchValues(
collectData(input1),
collectData(input2));
// re-create the whole thing for actual processing
// reset the generators and iterators
generator1.reset();
generator2.reset();
const1Iter.reset();
const2Iter.reset();
gen1Iter.reset();
gen2Iter.reset();
inList1.clear();
inList1.add(gen1Iter);
inList1.add(const1Iter);
inList2.clear();
inList2.add(gen2Iter);
inList2.add(const2Iter);
input1 = new MergeIterator<Record>(inList1, serializer1, comparator1.duplicate());
input2 = new MergeIterator<Record>(inList2, serializer2, comparator2.duplicate());
final JoinFunction matcher = new MatchRemovingMatcher(expectedMatchesMap);
final Collector<Record> collector = new DiscardingOutputCollector<Record>();
// we create this sort-merge iterator with little memory for the block-nested-loops fall-back to make sure it
// needs to spill for the duplicate keys
ReusingMergeMatchIterator<Record, Record, Record> iterator =
new ReusingMergeMatchIterator<Record, Record, Record>(
input1, input2, this.serializer1, this.comparator1, this.serializer2, this.comparator2,
this.pairComparator, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
iterator.open();
while (iterator.callWithNextKey(matcher, collector));
iterator.close();
// assert that each expected match was seen
for (Entry<TestData.Key, Collection<Match>> entry : expectedMatchesMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
Assert.fail("Collection for key " + entry.getKey() + " is not empty");
}
}
}
catch (Exception e) {
e.printStackTrace();
Assert.fail("An exception occurred during the test: " + e.getMessage());
}
}
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
private Map<TestData.Key, Collection<Match>> matchValues(
Map<TestData.Key, Collection<TestData.Value>> leftMap,
Map<TestData.Key, Collection<TestData.Value>> rightMap)
{
Map<TestData.Key, Collection<Match>> map = new HashMap<TestData.Key, Collection<Match>>();
for (TestData.Key key : leftMap.keySet()) {
Collection<TestData.Value> leftValues = leftMap.get(key);
Collection<TestData.Value> rightValues = rightMap.get(key);
if (rightValues == null) {
continue;
}
if (!map.containsKey(key)) {
map.put(key, new ArrayList<Match>());
}
Collection<Match> matchedValues = map.get(key);
for (TestData.Value leftValue : leftValues) {
for (TestData.Value rightValue : rightValues) {
matchedValues.add(new Match(leftValue, rightValue));
}
}
}
return map;
}
private Map<TestData.Key, Collection<TestData.Value>> collectData(MutableObjectIterator<Record> iter)
throws Exception
{
Map<TestData.Key, Collection<TestData.Value>> map = new HashMap<TestData.Key, Collection<TestData.Value>>();
Record pair = new Record();
while ((pair = iter.next(pair)) != null) {
TestData.Key key = pair.getField(0, TestData.Key.class);
if (!map.containsKey(key)) {
map.put(new TestData.Key(key.getKey()), new ArrayList<TestData.Value>());
}
Collection<TestData.Value> values = map.get(key);
values.add(new TestData.Value(pair.getField(1, TestData.Value.class).getValue()));
}
return map;
}
/**
* Private class used for storage of the expected matches in a hashmap.
*/
private static class Match {
private final Value left;
private final Value right;
public Match(Value left, Value right) {
this.left = left;
this.right = right;
}
@Override
public boolean equals(Object obj) {
Match o = (Match) obj;
return this.left.equals(o.left) && this.right.equals(o.right);
}
@Override
public int hashCode() {
return this.left.hashCode() ^ this.right.hashCode();
}
@Override
public String toString() {
return left + ", " + right;
}
}
private static final class MatchRemovingMatcher extends JoinFunction {
private static final long serialVersionUID = 1L;
private final Map<TestData.Key, Collection<Match>> toRemoveFrom;
protected MatchRemovingMatcher(Map<TestData.Key, Collection<Match>> map) {
this.toRemoveFrom = map;
}
@Override
public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception {
TestData.Key key = rec1.getField(0, TestData.Key.class);
TestData.Value value1 = rec1.getField(1, TestData.Value.class);
TestData.Value value2 = rec2.getField(1, TestData.Value.class);
Collection<Match> matches = this.toRemoveFrom.get(key);
if (matches == null) {
Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected.");
}
boolean contained = matches.remove(new Match(value1, value2));
if (!contained) {
Assert.fail("Produced match was not contained: " + key + " - " + value1 + ":" + value2);
}
if (matches.isEmpty()) {
this.toRemoveFrom.remove(key);
}
}
}
}
......@@ -32,7 +32,7 @@ import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator;
import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator;
import org.apache.flink.runtime.operators.sort.MergeMatchIterator;
import org.apache.flink.runtime.operators.sort.ReusingMergeMatchIterator;
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
......@@ -143,8 +143,8 @@ public class HashVsSortMiniBenchmark {
final MutableObjectIterator<Record> sortedInput2 = sorter2.getIterator();
// compare with iterator values
MergeMatchIterator<Record, Record, Record> iterator =
new MergeMatchIterator<Record, Record, Record>(sortedInput1, sortedInput2,
ReusingMergeMatchIterator<Record, Record, Record> iterator =
new ReusingMergeMatchIterator<Record, Record, Record>(sortedInput1, sortedInput2,
this.serializer1.getSerializer(), this.comparator1, this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
this.memoryManager, this.ioManager, MEMORY_PAGES_FOR_MERGE, this.parentTask);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册