提交 cf9fad89 编写于 作者: S StephanEwen

Proper open/close handling of combine stubs in combining sorter.

Combine stubs are lazily initialized upon spilling.
Adjusted combining sorter test cases.
上级 c61bc993
......@@ -23,6 +23,10 @@ import org.apache.log4j.PatternLayout;
public class LogUtils {
public static void initializeDefaultConsoleLogger() {
initializeDefaultConsoleLogger(Level.INFO);
}
public static void initializeDefaultTestConsoleLogger() {
initializeDefaultConsoleLogger(Level.WARN);
}
......
......@@ -26,6 +26,7 @@ import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.services.iomanager.BlockChannelAccess;
import eu.stratosphere.nephele.services.iomanager.BlockChannelWriter;
import eu.stratosphere.nephele.services.iomanager.Channel;
......@@ -61,12 +62,9 @@ import eu.stratosphere.pact.runtime.util.KeyGroupedIterator;
* spill) which communicate through a set of blocking queues (forming a closed loop).
* Memory is allocated using the {@link MemoryManager} interface. Thus the component will most likely not exceed the
* user-provided memory limits.
*
* @author Stephan Ewen
* @author Fabian Hueske
*/
public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
{
public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
// ------------------------------------------------------------------------
// Constants & Fields
// ------------------------------------------------------------------------
......@@ -78,7 +76,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
private final GenericReducer<E, ?> combineStub; // the user code stub that does the combining
private final boolean combineLastMerge; // Flag indicating whether the last merge also combines the values.
private Configuration udfConfig;
// ------------------------------------------------------------------------
......@@ -110,11 +108,11 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
public CombiningUnilateralSortMerger(GenericReducer<E, ?> combineStub, MemoryManager memoryManager, IOManager ioManager,
MutableObjectIterator<E> input, AbstractInvokable parentTask,
TypeSerializer<E> serializer, TypeComparator<E> comparator,
long totalMemory, int maxNumFileHandles, float startSpillingFraction, boolean combineLastMerge)
long totalMemory, int maxNumFileHandles, float startSpillingFraction)
throws IOException, MemoryAllocationException
{
this(combineStub, memoryManager, ioManager, input, parentTask, serializer, comparator,
totalMemory, -1, maxNumFileHandles, startSpillingFraction, combineLastMerge);
totalMemory, -1, maxNumFileHandles, startSpillingFraction);
}
/**
......@@ -144,23 +142,23 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
MutableObjectIterator<E> input, AbstractInvokable parentTask,
TypeSerializer<E> serializer, TypeComparator<E> comparator,
long totalMemory, int numSortBuffers, int maxNumFileHandles,
float startSpillingFraction, boolean combineLastMerge)
float startSpillingFraction)
throws IOException, MemoryAllocationException
{
super(memoryManager, ioManager, input, parentTask, serializer, comparator,
totalMemory, numSortBuffers, maxNumFileHandles, startSpillingFraction, false);
this.combineStub = combineStub;
this.combineLastMerge = combineLastMerge;
}
public void setUdfConfiguration(Configuration config) {
this.udfConfig = config;
}
// ------------------------------------------------------------------------
// Factory Methods
// ------------------------------------------------------------------------
/* (non-Javadoc)
* @see eu.stratosphere.pact.runtime.sort.UnilateralSortMerger#getSpillingThread(eu.stratosphere.pact.runtime.sort.ExceptionHandler, eu.stratosphere.pact.runtime.sort.UnilateralSortMerger.CircularQueues, eu.stratosphere.nephele.template.AbstractInvokable, eu.stratosphere.nephele.services.memorymanager.MemoryManager, eu.stratosphere.nephele.services.iomanager.IOManager, eu.stratosphere.pact.runtime.plugable.TypeSerializer, eu.stratosphere.pact.runtime.plugable.TypeComparator, java.util.List, java.util.List, int)
*/
@Override
protected ThreadBase<E> getSpillingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues,
AbstractInvokable parentTask, MemoryManager memoryManager, IOManager ioManager,
......@@ -175,8 +173,8 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
// Threads
// ------------------------------------------------------------------------
protected class CombiningSpillingThread extends SpillingThread
{
protected class CombiningSpillingThread extends SpillingThread {
private final TypeComparator<E> comparator2;
public CombiningSpillingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues,
......@@ -193,8 +191,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
/**
* Entry point of the thread.
*/
public void go() throws IOException
{
public void go() throws IOException {
// ------------------- In-Memory Cache ------------------------
final Queue<CircularElement<E>> cache = new ArrayDeque<CircularElement<E>>();
......@@ -256,20 +253,24 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
iterators.size() == 1 ? iterators.get(0) :
new MergeIterator<E>(iterators, this.serializer, this.comparator);
if (CombiningUnilateralSortMerger.this.combineLastMerge) {
KeyGroupedIterator<E> iter = new KeyGroupedIterator<E>(resIter, this.serializer, this.comparator2);
setResultIterator(new CombiningIterator<E>(CombiningUnilateralSortMerger.this.combineStub, iter, this.serializer));
} else {
setResultIterator(resIter);
}
setResultIterator(resIter);
return;
}
}
// ------------------- Spilling Phase ------------------------
final GenericReducer<E, ?> combineStub = CombiningUnilateralSortMerger.this.combineStub;
// now that we are actually spilling, take the combiner, and open it
try {
Configuration conf = CombiningUnilateralSortMerger.this.udfConfig;
combineStub.open(conf == null ? new Configuration() : conf);
}
catch (Throwable t) {
throw new IOException("The user-defined combiner failed in its 'open()' method.", t);
}
final Channel.Enumerator enumerator = this.ioManager.createChannelEnumerator();
final Channel.Enumerator enumerator = this.ioManager.createChannelEnumerator();
List<ChannelWithBlockCount> channelIDs = new ArrayList<ChannelWithBlockCount>();
......@@ -318,8 +319,6 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
final InMemorySorter<E> buffer = element.buffer;
final CombineValueIterator<E> iter = new CombineValueIterator<E>(buffer, this.serializer.createInstance());
final WriterCollector<E> collector = new WriterCollector<E>(output, this.serializer);
final GenericReducer<E, ?> combineStub = CombiningUnilateralSortMerger.this.combineStub;
int i = 0;
int stop = buffer.size() - 1;
......@@ -374,6 +373,21 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
// clear the sort buffers, but do not return the memory to the manager, as we use it for merging
disposeSortBuffers(false);
if (LOG.isDebugEnabled())
LOG.debug("Closing combiner user code.");
// close the user code
try {
combineStub.close();
}
catch (Throwable t) {
throw new IOException("The user-defined combiner failed in its 'close()' method.", t);
}
if (LOG.isDebugEnabled())
LOG.debug("User code closed.");
// ------------------- Merging Phase ------------------------
......@@ -407,12 +421,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
// set the target for the user iterator
// if the final merge combines, create a combining iterator around the merge iterator,
// otherwise not
if (CombiningUnilateralSortMerger.this.combineLastMerge) {
KeyGroupedIterator<E> iter = new KeyGroupedIterator<E>(mergeIterator, this.serializer, this.comparator2);
setResultIterator(new CombiningIterator<E>(CombiningUnilateralSortMerger.this.combineStub, iter, this.serializer));
} else {
setResultIterator(mergeIterator);
}
setResultIterator(mergeIterator);
}
// done
......@@ -490,8 +499,8 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
* This class implements an iterator over values from a sort buffer. The iterator returns the values of a given
* interval.
*/
private static final class CombineValueIterator<E> implements Iterator<E>
{
private static final class CombineValueIterator<E> implements Iterator<E> {
private final InMemorySorter<E> buffer; // the buffer from which values are returned
private final E record;
......@@ -524,22 +533,13 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
this.position = first;
}
/*
* (non-Javadoc)
* @see java.util.Iterator#hasNext()
*/
@Override
public boolean hasNext() {
return this.position <= this.last;
}
/*
* (non-Javadoc)
* @see java.util.Iterator#next()
*/
@Override
public E next()
{
public E next() {
if (this.position <= this.last) {
try {
this.buffer.getRecord(this.record, this.position);
......@@ -556,15 +556,10 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
}
}
/*
* (non-Javadoc)
* @see java.util.Iterator#remove()
*/
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
// ------------------------------------------------------------------------
......@@ -572,8 +567,8 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
/**
* A simple collector that collects Key and Value and writes them into a given <code>Writer</code>.
*/
private static final class WriterCollector<E> implements Collector<E>
{
private static final class WriterCollector<E> implements Collector<E> {
private final ChannelWriterOutputView output; // the writer to write to
private final TypeSerializer<E> serializer;
......@@ -588,12 +583,6 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
this.serializer = serializer;
}
/*
* (non-Javadoc)
* @see
* eu.stratosphere.pact.common.stub.Collector#collect(eu.stratosphere.pact.common.type.Key,
* eu.stratosphere.pact.common.type.Value)
*/
@Override
public void collect(E record) {
try {
......@@ -604,102 +593,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E>
}
}
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.common.stub.Collector#close()
*/
@Override
public void close() {}
}
// ------------------------------------------------------------------------
/**
* A simple collector that collects Key and Value and puts them into an <tt>ArrayList</tt>.
*/
private static final class ListCollector<E> implements Collector<E>
{
private final ArrayDeque<E> list; // the list to collect pairs in
private final TypeSerializer<E> serializer; // the serializer that creates copies
/**
* Creates a new collector that collects output in the given list.
*
* @param list The list to collect output in.
*/
private ListCollector(ArrayDeque<E> list, TypeSerializer<E> serializer) {
this.list = list;
this.serializer = serializer;
}
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.common.stub.Collector#collect(eu.stratosphere.pact.common.type.Key, eu.stratosphere.pact.common.type.Value)
*/
@Override
public void collect(E record) {
this.list.add(this.serializer.createCopy(record));
}
/*
* (non-Javadoc)
* @see eu.stratosphere.pact.common.stub.Collector#close()
*/
@Override
public void close() {
// does nothing
}
}
// ------------------------------------------------------------------------
private static final class CombiningIterator<E> implements MutableObjectIterator<E>
{
private final GenericReducer<E, ?> combineStub;
private final KeyGroupedIterator<E> iterator;
private final ArrayDeque<E> results;
private final ListCollector<E> collector;
private final TypeSerializer<E> serializer;
private CombiningIterator(GenericReducer<E, ?> combineStub, KeyGroupedIterator<E> iterator, TypeSerializer<E> serializer)
{
this.combineStub = combineStub;
this.iterator = iterator;
this.serializer = serializer;
this.results = new ArrayDeque<E>();
this.collector = new ListCollector<E>(this.results, serializer);
}
/* (non-Javadoc)
* @see eu.stratosphere.pact.runtime.util.ReadingIterator#next(java.lang.Object)
*/
@Override
public boolean next(E target) throws IOException
{
try {
while (this.results.isEmpty() && this.iterator.nextKey()) {
this.combineStub.combine(this.iterator.getValues(), this.collector);
}
}
catch (Exception ex) {
throw new RuntimeException("An exception occurred in the combiner user code: " + ex.getMessage(), ex);
}
if (!this.results.isEmpty()) {
this.serializer.copyTo(this.results.poll(), target);
return true;
}
else {
return false;
}
}
}
}
......@@ -720,14 +720,13 @@ public class RegularPactTask<S extends Stub, OT> extends AbstractTask implements
}
// instantiate ourselves a combiner. we should not use the stub, because the sort and the
// subsequent reduce would otherwise share it multithreaded
// subsequent reduce would otherwise share it multi-threaded
final S localStub;
try {
final Class<S> userCodeFunctionType = this.driver.getStubType();
// if the class is null, the driver has no user code
if (userCodeFunctionType != null && GenericReducer.class.isAssignableFrom(userCodeFunctionType)) {
localStub = initStub(userCodeFunctionType);
localStub.open(this.config.getStubParameters());
} else {
throw new IllegalStateException("Performing combining sort outside a reduce task!");
}
......@@ -741,7 +740,9 @@ public class RegularPactTask<S extends Stub, OT> extends AbstractTask implements
(GenericReducer) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
this.config.getMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
this.config.getSpillingThresholdInput(inputNum), false);
this.config.getSpillingThresholdInput(inputNum));
cSorter.setUdfConfiguration(this.config.getStubParameters());
// set the input to null such that it will be lazily fetched from the input strategy
this.inputs[inputNum] = null;
this.localStrategies[inputNum] = cSorter;
......
......@@ -15,18 +15,23 @@
package eu.stratosphere.pact.runtime.sort;
import java.io.IOException;
import java.util.Comparator;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.NoSuchElementException;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
......@@ -35,6 +40,7 @@ import eu.stratosphere.pact.common.stubs.Collector;
import eu.stratosphere.pact.common.stubs.ReduceStub;
import eu.stratosphere.pact.common.type.PactRecord;
import eu.stratosphere.pact.common.type.base.PactInteger;
import eu.stratosphere.pact.common.util.LogUtils;
import eu.stratosphere.pact.common.util.MutableObjectIterator;
import eu.stratosphere.pact.generic.types.TypeComparator;
import eu.stratosphere.pact.generic.types.TypeSerializer;
......@@ -45,13 +51,11 @@ import eu.stratosphere.pact.runtime.test.util.TestData;
import eu.stratosphere.pact.runtime.test.util.TestData.Key;
import eu.stratosphere.pact.runtime.test.util.TestData.Generator.KeyMode;
import eu.stratosphere.pact.runtime.test.util.TestData.Generator.ValueMode;
import eu.stratosphere.pact.runtime.util.KeyGroupedIterator;
/**
* @author Fabian Hueske
* @author Stephan Ewen
*/
public class CombiningUnilateralSortMergerITCase
{
public class CombiningUnilateralSortMergerITCase {
private static final Log LOG = LogFactory.getLog(CombiningUnilateralSortMergerITCase.class);
private static final long SEED = 649180756312423613L;
......@@ -74,10 +78,15 @@ public class CombiningUnilateralSortMergerITCase
private TypeComparator<PactRecord> comparator;
@BeforeClass
public static void setup() {
LogUtils.initializeDefaultTestConsoleLogger();
}
@SuppressWarnings("unchecked")
@Before
public void beforeTest()
{
public void beforeTest() {
this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE);
this.ioManager = new IOManager();
......@@ -86,8 +95,7 @@ public class CombiningUnilateralSortMergerITCase
}
@After
public void afterTest()
{
public void afterTest() {
this.ioManager.shutdown();
if (!this.ioManager.isProperlyShutDown()) {
Assert.fail("I/O Manager was not properly shut down.");
......@@ -111,10 +119,52 @@ public class CombiningUnilateralSortMergerITCase
LOG.debug("initializing sortmerger");
Sorter<PactRecord> merger = new CombiningUnilateralSortMerger<PactRecord>(new TestCountCombiner(),
this.memoryManager, this.ioManager,
reader, this.parentTask, this.serializer, this.comparator,
64 * 1024 * 1024, 64, 0.7f, true);
TestCountCombiner comb = new TestCountCombiner();
Sorter<PactRecord> merger = new CombiningUnilateralSortMerger<PactRecord>(comb,
this.memoryManager, this.ioManager, reader, this.parentTask, this.serializer, this.comparator,
64 * 1024 * 1024, 64, 0.7f);
final PactRecord rec = new PactRecord();
rec.setField(1, new PactInteger(1));
final TestData.Key key = new TestData.Key();
for (int i = 0; i < noKeyCnt; i++) {
for (int j = 0; j < noKeys; j++) {
key.setKey(j);
rec.setField(0, key);
reader.emit(rec);
}
}
reader.close();
MutableObjectIterator<PactRecord> iterator = merger.getIterator();
Iterator<Integer> result = getReducingIterator(iterator, serializer, comparator.duplicate());
while (result.hasNext()) {
Assert.assertEquals(noKeyCnt, result.next().intValue());
}
merger.close();
// if the combiner was opened, it must have been closed
Assert.assertTrue(comb.opened == comb.closed);
}
@Test
public void testCombineSpilling() throws Exception {
int noKeys = 100;
int noKeyCnt = 10000;
MockRecordReader reader = new MockRecordReader();
LOG.debug("initializing sortmerger");
TestCountCombiner comb = new TestCountCombiner();
Sorter<PactRecord> merger = new CombiningUnilateralSortMerger<PactRecord>(comb,
this.memoryManager, this.ioManager, reader, this.parentTask, this.serializer, this.comparator,
3 * 1024 * 1024, 64, 0.005f);
final PactRecord rec = new PactRecord();
rec.setField(1, new PactInteger(1));
......@@ -131,12 +181,15 @@ public class CombiningUnilateralSortMergerITCase
MutableObjectIterator<PactRecord> iterator = merger.getIterator();
PactRecord target = new PactRecord();
while (iterator.next(target)) {
Assert.assertEquals(noKeyCnt, target.getField(1, PactInteger.class).getValue());
Iterator<Integer> result = getReducingIterator(iterator, serializer, comparator.duplicate());
while (result.hasNext()) {
Assert.assertEquals(noKeyCnt, result.next().intValue());
}
merger.close();
// if the combiner was opened, it must have been closed
Assert.assertTrue(comb.opened == comb.closed);
}
@Test
......@@ -156,10 +209,11 @@ public class CombiningUnilateralSortMergerITCase
// merge iterator
LOG.debug("initializing sortmerger");
Sorter<PactRecord> merger = new CombiningUnilateralSortMerger<PactRecord>(new TestCountCombiner2(),
this.memoryManager, this.ioManager,
reader, this.parentTask, this.serializer, this.comparator,
64 * 1024 * 1024, 2, 0.7f, true);
TestCountCombiner2 comb = new TestCountCombiner2();
Sorter<PactRecord> merger = new CombiningUnilateralSortMerger<PactRecord>(comb,
this.memoryManager, this.ioManager, reader, this.parentTask, this.serializer, this.comparator,
64 * 1024 * 1024, 2, 0.7f);
// emit data
LOG.debug("emitting data");
......@@ -207,17 +261,24 @@ public class CombiningUnilateralSortMergerITCase
}
merger.close();
// if the combiner was opened, it must have been closed
Assert.assertTrue(comb.opened == comb.closed);
}
// --------------------------------------------------------------------------------------------
public class TestCountCombiner extends ReduceStub
{
public static class TestCountCombiner extends ReduceStub {
private final PactInteger count = new PactInteger();
public volatile boolean opened = false;
public volatile boolean closed = false;
@Override
public void combine(Iterator<PactRecord> values, Collector<PactRecord> out)
{
public void combine(Iterator<PactRecord> values, Collector<PactRecord> out) {
PactRecord rec = null;
int cnt = 0;
while (values.hasNext()) {
......@@ -231,16 +292,27 @@ public class CombiningUnilateralSortMergerITCase
}
@Override
public void reduce(Iterator<PactRecord> values, Collector<PactRecord> out) {
// yo, nothing, mon
public void reduce(Iterator<PactRecord> values, Collector<PactRecord> out) {}
@Override
public void open(Configuration parameters) throws Exception {
opened = true;
}
@Override
public void close() throws Exception {
closed = true;
}
}
public class TestCountCombiner2 extends ReduceStub
{
public static class TestCountCombiner2 extends ReduceStub {
public volatile boolean opened = false;
public volatile boolean closed = false;
@Override
public void combine(Iterator<PactRecord> values, Collector<PactRecord> out)
{
public void combine(Iterator<PactRecord> values, Collector<PactRecord> out) {
PactRecord rec = null;
int cnt = 0;
while (values.hasNext()) {
......@@ -255,5 +327,65 @@ public class CombiningUnilateralSortMergerITCase
public void reduce(Iterator<PactRecord> values, Collector<PactRecord> out) {
// yo, nothing, mon
}
@Override
public void open(Configuration parameters) throws Exception {
opened = true;
}
@Override
public void close() throws Exception {
closed = true;
}
}
private static Iterator<Integer> getReducingIterator(MutableObjectIterator<PactRecord> data, TypeSerializer<PactRecord> serializer, TypeComparator<PactRecord> comparator) {
final KeyGroupedIterator<PactRecord> groupIter = new KeyGroupedIterator<PactRecord>(data, serializer, comparator);
return new Iterator<Integer>() {
private boolean hasNext = false;
@Override
public boolean hasNext() {
if (hasNext) {
return true;
}
try {
hasNext = groupIter.nextKey();
} catch (IOException e) {
throw new RuntimeException(e);
}
return hasNext;
}
@Override
public Integer next() {
if (hasNext()) {
hasNext = false;
Iterator<PactRecord> values = groupIter.getValues();
PactRecord rec = null;
int cnt = 0;
while (values.hasNext()) {
rec = values.next();
cnt += rec.getField(1, PactInteger.class).getValue();
}
return cnt;
} else {
throw new NoSuchElementException();
}
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
}
......@@ -132,7 +132,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<GenericReducer<Pact
try {
sorter = new CombiningUnilateralSortMerger<PactRecord>(new MockCombiningReduceStub(),
getMemoryManager(), getIOManager(), new UniformPactRecordGenerator(keyCnt, valCnt, false),
getOwningNepheleTask(), PactRecordSerializer.get(), this.comparator.duplicate(), this.perSortMem, 2, 0.8f, false);
getOwningNepheleTask(), PactRecordSerializer.get(), this.comparator.duplicate(), this.perSortMem, 2, 0.8f);
addInput(sorter.getIterator());
ReduceDriver<PactRecord, PactRecord> testTask = new ReduceDriver<PactRecord, PactRecord>();
......@@ -176,7 +176,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<GenericReducer<Pact
try {
sorter = new CombiningUnilateralSortMerger<PactRecord>(new MockCombiningReduceStub(),
getMemoryManager(), getIOManager(), new UniformPactRecordGenerator(keyCnt, valCnt, false),
getOwningNepheleTask(), PactRecordSerializer.get(), this.comparator.duplicate(), this.perSortMem, 2, 0.8f, false);
getOwningNepheleTask(), PactRecordSerializer.get(), this.comparator.duplicate(), this.perSortMem, 2, 0.8f);
addInput(sorter.getIterator());
ReduceDriver<PactRecord, PactRecord> testTask = new ReduceDriver<PactRecord, PactRecord>();
......
......@@ -127,7 +127,7 @@ public class ReduceTaskTest extends DriverTestBase<GenericReducer<PactRecord, Pa
try {
sorter = new CombiningUnilateralSortMerger<PactRecord>(new MockCombiningReduceStub(),
getMemoryManager(), getIOManager(), new UniformPactRecordGenerator(keyCnt, valCnt, false),
getOwningNepheleTask(), PactRecordSerializer.get(), this.comparator.duplicate(), this.perSortMem, 4, 0.8f, false);
getOwningNepheleTask(), PactRecordSerializer.get(), this.comparator.duplicate(), this.perSortMem, 4, 0.8f);
addInput(sorter.getIterator());
ReduceDriver<PactRecord, PactRecord> testTask = new ReduceDriver<PactRecord, PactRecord>();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册