Commit 0b73b438 authored by Stephan Ewen

[FLINK-2240] [runtime] Pass flag to configure use of bloom filters through runtime configuration.

Also make sure that most tests run with bloom filters enabled, to increase test coverage.
Parent 61dcae39
......@@ -244,11 +244,6 @@ free for objects created by user-defined functions. (DEFAULT: 0.7)
This parameter is only evaluated, if `taskmanager.memory.size` is not set.
- `jobclient.polling.interval`: The interval (in seconds) in which the client
polls the JobManager for the status of its job (DEFAULT: 2).
- `taskmanager.runtime.max-fan`: The maximal fan-in for external merge joins and
fan-out for spilling hash tables. Limits the number of file handles per operator,
but may cause intermediate merging/partitioning, if set too small (DEFAULT: 128).
- `taskmanager.runtime.sort-spilling-threshold`: A sort operation starts spilling
when this fraction of its memory budget is full (DEFAULT: 0.8).
- `taskmanager.heartbeat-interval`: The interval in which the TaskManager sends
heartbeats to the JobManager.
- `jobmanager.max-heartbeat-delay-before-failure.msecs`: The maximum time that a
......@@ -324,6 +319,16 @@ sample exceeds this value (possible because of misconfiguration of the parser),
the sampling aborts. This value can be overridden for a specific input with the
input format's parameters (DEFAULT: 2097152 (= 2 MiBytes)).
### Runtime Algorithms
- `taskmanager.runtime.max-fan`: The maximal fan-in for external merge joins and
fan-out for spilling hash tables. Limits the number of file handles per operator,
but may cause intermediate merging/partitioning, if set too small (DEFAULT: 128).
- `taskmanager.runtime.sort-spilling-threshold`: A sort operation starts spilling
when this fraction of its memory budget is full (DEFAULT: 0.8).
- `taskmanager.runtime.hashjoin-bloom-filters`: If true, the hash join uses bloom filters to pre-filter records against spilled partitions. (DEFAULT: true)
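As a sketch of how the new flag is consumed: users would typically set `taskmanager.runtime.hashjoin-bloom-filters: false` in the Flink configuration file, and the join drivers in this commit read it back through the TaskManager's `Configuration`. The constants are the ones introduced in `ConfigConstants` below; the class name and the act of disabling the filter here are purely illustrative.

```java
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

public class BloomFilterFlagExample {

    public static void main(String[] args) {
        // Illustrative only: flip the flag off in a configuration ...
        Configuration config = new Configuration();
        config.setBoolean(ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY, false);

        // ... and read it back the way the join drivers in this commit do,
        // falling back to the default (true) when the key is not set.
        boolean hashJoinUseBitMaps = config.getBoolean(
            ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY,
            ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);

        System.out.println("hash join bloom filters: " + hashJoinUseBitMaps);
    }
}
```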
## YARN
......
......@@ -172,6 +172,8 @@ public final class ConfigConstants {
*/
public static final String TASK_MANAGER_MAX_REGISTRATION_DURATION = "taskmanager.maxRegistrationDuration";
// --------------------------- Runtime Algorithms -------------------------------
/**
* Parameter for the maximum fan for out-of-core algorithms.
* Corresponds to the maximum fan-in for merge-sorts and the maximum fan-out
......@@ -184,18 +186,17 @@ public final class ConfigConstants {
* sorter will start spilling to disk.
*/
public static final String DEFAULT_SORT_SPILLING_THRESHOLD_KEY = "taskmanager.runtime.sort-spilling-threshold";
/**
* Parameter to switch hash join bloom filters for spilled partitions on and off.
*/
public static final String RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY = "taskmanager.runtime.hashjoin-bloom-filters";
/**
* The config parameter defining the timeout for filesystem stream opening.
* A value of 0 indicates infinite waiting.
*/
public static final String FS_STREAM_OPENING_TIMEOUT_KEY = "taskmanager.runtime.fs_timeout";
/**
* Whether to use a bloom filter during the probe phase to pre-filter the probe records that
* are spilled to disk, in order to minimize the number of spilled probe records.
*/
public static final String HASHJOIN_ENABLE_BLOOMFILTER = "hashjoin.bloomfilter.enabled";
// ------------------------ YARN Configuration ------------------------
......@@ -543,6 +544,13 @@ public final class ConfigConstants {
* The default task manager's maximum registration duration
*/
public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION = "Inf";
// ------------------------ Runtime Algorithms ------------------------
/**
* Default setting for the switch for hash join bloom filters for spilled partitions.
*/
public static final boolean DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS = true;
/**
* The default value for the maximum spilling fan in/out.
......@@ -558,15 +566,9 @@ public final class ConfigConstants {
* The default timeout for filesystem stream opening: infinite (means max long milliseconds).
*/
public static final int DEFAULT_FS_STREAM_OPENING_TIMEOUT = 0;
/**
* Enables the bloom filter for hash joins, as it improves hash join performance in most cases.
*/
public static final boolean DEAFULT_HASHJOIN_ENABLE_BLOOMFILTER = true;
// ------------------------ YARN Configuration ------------------------
/**
* Minimum amount of Heap memory to subtract from the requested TaskManager size.
* We came up with these values experimentally.
......
......@@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import java.util.Map;
import java.util.concurrent.Future;
......@@ -72,14 +73,11 @@ public interface Environment {
Configuration getTaskConfiguration();
/**
* @return The task manager configuration
*/
Configuration getTaskManagerConfiguration();
/**
* @return Hostname of the task manager
* Gets the task manager info, with configuration and hostname.
*
* @return The task manager info, with configuration and hostname.
*/
String getHostname();
TaskManagerRuntimeInfo getTaskManagerInfo();
/**
* Returns the job-wide configuration object that was attached to the JobGraph.
......
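For context, a minimal sketch of the `TaskManagerRuntimeInfo` shape that this interface change relies on, inferred from its usages in this commit (e.g. `taskManagerInfo.getConfiguration()` and `taskManagerInfo.getHostname()` in `RuntimeEnvironment`, and `new TaskManagerRuntimeInfo("localhost", new Configuration())` in the test harness below). The actual class may carry more than this:

```java
import org.apache.flink.configuration.Configuration;

// Hypothetical minimal shape, inferred from usages in this commit.
public class TaskManagerRuntimeInfo {

    private final String hostname;              // hostname of the TaskManager
    private final Configuration configuration;  // the TaskManager's configuration

    public TaskManagerRuntimeInfo(String hostname, Configuration configuration) {
        this.hostname = hostname;
        this.configuration = configuration;
    }

    public String getHostname() {
        return hostname;
    }

    public Configuration getConfiguration() {
        return configuration;
    }
}
```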
......@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashMatchIterator;
import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashMatchIterator;
import org.apache.flink.runtime.operators.hash.ReusingBuildFirstReOpenableHashMatchIterator;
......@@ -74,7 +75,10 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
this.taskContext.getTaskConfig().getPairComparatorFactory(this.taskContext.getUserCodeClassLoader());
double availableMemory = config.getRelativeMemoryDriver();
boolean hashJoinUseBitMaps = taskContext.getTaskManagerInfo().getConfiguration().getBoolean(
ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY,
ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);
ExecutionConfig executionConfig = taskContext.getExecutionConfig();
objectReuseEnabled = executionConfig.isObjectReuseEnabled();
......@@ -89,7 +93,8 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
this.taskContext.getMemoryManager(),
this.taskContext.getIOManager(),
this.taskContext.getOwningNepheleTask(),
availableMemory);
availableMemory,
hashJoinUseBitMaps);
} else if (buildSideIndex == 1 && probeSideIndex == 0) {
......@@ -102,7 +107,8 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
this.taskContext.getMemoryManager(),
this.taskContext.getIOManager(),
this.taskContext.getOwningNepheleTask(),
availableMemory);
availableMemory,
hashJoinUseBitMaps);
} else {
throw new Exception("Error: Inconsistent setup for repeatable hash join driver.");
......@@ -118,7 +124,8 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
this.taskContext.getMemoryManager(),
this.taskContext.getIOManager(),
this.taskContext.getOwningNepheleTask(),
availableMemory);
availableMemory,
hashJoinUseBitMaps);
} else if (buildSideIndex == 1 && probeSideIndex == 0) {
......@@ -131,7 +138,8 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
this.taskContext.getMemoryManager(),
this.taskContext.getIOManager(),
this.taskContext.getOwningNepheleTask(),
availableMemory);
availableMemory,
hashJoinUseBitMaps);
} else {
throw new Exception("Error: Inconsistent setup for repeatable hash join driver.");
......@@ -148,12 +156,10 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
@Override
public void run() throws Exception {
final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();
while (this.running && matchIterator != null && matchIterator.callWithNextKey(matchStub, collector));
}
@Override
......
......@@ -19,25 +19,27 @@
package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator;
import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator;
import org.apache.flink.runtime.operators.sort.NonReusingMergeInnerJoinIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator;
import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator;
import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator;
import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator;
import org.apache.flink.runtime.operators.sort.NonReusingMergeInnerJoinIterator;
import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator;
import org.apache.flink.runtime.operators.util.JoinTaskIterator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The join driver implements the logic of a join operator at runtime. It instantiates either
* hash or sort-merge based strategies to find joining pairs of records.
......@@ -115,19 +117,40 @@ public class JoinDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1
if (LOG.isDebugEnabled()) {
LOG.debug("Join Driver object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
}
boolean hashJoinUseBitMaps = taskContext.getTaskManagerInfo().getConfiguration().getBoolean(
ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY,
ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);
// create and return joining iterator according to provided local strategy.
if (objectReuseEnabled) {
switch (ls) {
case MERGE:
this.joinIterator = new ReusingMergeInnerJoinIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
this.joinIterator = new ReusingMergeInnerJoinIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator12(comparator1, comparator2),
memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
break;
case HYBRIDHASH_BUILD_FIRST:
this.joinIterator = new ReusingBuildFirstHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
this.joinIterator = new ReusingBuildFirstHashMatchIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator21(comparator1, comparator2),
memoryManager, ioManager,
this.taskContext.getOwningNepheleTask(),
fractionAvailableMemory,
hashJoinUseBitMaps);
break;
case HYBRIDHASH_BUILD_SECOND:
this.joinIterator = new ReusingBuildSecondHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
this.joinIterator = new ReusingBuildSecondHashMatchIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator12(comparator1, comparator2),
memoryManager, ioManager,
this.taskContext.getOwningNepheleTask(),
fractionAvailableMemory,
hashJoinUseBitMaps);
break;
default:
throw new Exception("Unsupported driver strategy for join driver: " + ls.name());
......@@ -135,14 +158,32 @@ public class JoinDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1
} else {
switch (ls) {
case MERGE:
this.joinIterator = new NonReusingMergeInnerJoinIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
this.joinIterator = new NonReusingMergeInnerJoinIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator12(comparator1, comparator2),
memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
break;
case HYBRIDHASH_BUILD_FIRST:
this.joinIterator = new NonReusingBuildFirstHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
this.joinIterator = new NonReusingBuildFirstHashMatchIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator21(comparator1, comparator2),
memoryManager, ioManager,
this.taskContext.getOwningNepheleTask(),
fractionAvailableMemory,
hashJoinUseBitMaps);
break;
case HYBRIDHASH_BUILD_SECOND:
this.joinIterator = new NonReusingBuildSecondHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
this.joinIterator = new NonReusingBuildSecondHashMatchIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator12(comparator1, comparator2),
memoryManager, ioManager,
this.taskContext.getOwningNepheleTask(),
fractionAvailableMemory,
hashJoinUseBitMaps);
break;
default:
throw new Exception("Unsupported driver strategy for join driver: " + ls.name());
......
......@@ -16,16 +16,14 @@
* limitations under the License.
*/
package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.functions.Function;
/**
* The interface to be implemented by all pact drivers that run alone (or as the primary driver) in a nephele task.
* The driver is the code that deals with everything that is specific to a certain PACT. It implements the actual
* <i>map</i> or <i>reduce</i> specific code.
* The interface to be implemented by all drivers that run alone (or as the primary driver) in a task.
* A driver implements the actual code to perform a batch operation, like <i>map()</i>,
* <i>reduce()</i>, <i>join()</i>, or <i>coGroup()</i>.
*
* @see PactTaskContext
*
......@@ -37,7 +35,7 @@ public interface PactDriver<S extends Function, OT> {
void setup(PactTaskContext<S, OT> context);
/**
* Gets the number of inputs (= Nephele Gates and Readers) that the task has.
* Gets the number of inputs that the task has.
*
* @return The number of inputs.
*/
......
......@@ -26,15 +26,14 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
/**
* A runtime task is the task that is executed by the flink engine inside a task vertex.
* It typically has a {@link PactDriver}, and optionally multiple chained drivers. In addition, it
* deals with the runtime setup and teardown and the control-flow logic. The latter appears especially
* in the case of iterations.
* The task context gives a driver (e.g., {@link MapDriver}, or {@link JoinDriver}) access to
* the runtime components and configuration that it can use to fulfil its task.
*
* @param <S> The UDF type.
* @param <OT> The produced data type.
......@@ -44,6 +43,8 @@ import org.apache.flink.util.MutableObjectIterator;
public interface PactTaskContext<S, OT> {
TaskConfig getTaskConfig();
TaskManagerRuntimeInfo getTaskManagerInfo();
ClassLoader getUserCodeClassLoader();
......
......@@ -60,6 +60,7 @@ import org.apache.flink.runtime.operators.util.ReaderIterator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
......@@ -660,7 +661,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
*/
protected void initInputReaders() throws Exception {
final int numInputs = getNumTaskInputs();
final MutableReader<?>[] inputReaders = new MutableReader[numInputs];
final MutableReader<?>[] inputReaders = new MutableReader<?>[numInputs];
int currentReaderOffset = 0;
......@@ -705,7 +706,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
*/
protected void initBroadcastInputReaders() throws Exception {
final int numBroadcastInputs = this.config.getNumBroadcastInputs();
final MutableReader<?>[] broadcastInputReaders = new MutableReader[numBroadcastInputs];
final MutableReader<?>[] broadcastInputReaders = new MutableReader<?>[numBroadcastInputs];
int currentReaderOffset = config.getNumInputs();
......@@ -737,8 +738,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
*/
protected void initInputsSerializersAndComparators(int numInputs, int numComparators) throws Exception {
this.inputSerializers = new TypeSerializerFactory<?>[numInputs];
this.inputComparators = numComparators > 0 ? new TypeComparator[numComparators] : null;
this.inputIterators = new MutableObjectIterator[numInputs];
this.inputComparators = numComparators > 0 ? new TypeComparator<?>[numComparators] : null;
this.inputIterators = new MutableObjectIterator<?>[numInputs];
ClassLoader userCodeClassLoader = getUserCodeClassLoader();
......@@ -764,7 +765,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
* Creates all the serializers and iterators for the broadcast inputs.
*/
protected void initBroadcastInputsSerializers(int numBroadcastInputs) throws Exception {
this.broadcastInputSerializers = new TypeSerializerFactory[numBroadcastInputs];
this.broadcastInputSerializers = new TypeSerializerFactory<?>[numBroadcastInputs];
ClassLoader userCodeClassLoader = getUserCodeClassLoader();
......@@ -787,8 +788,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
final MemoryManager memMan = getMemoryManager();
final IOManager ioMan = getIOManager();
this.localStrategies = new CloseableInputProvider[numInputs];
this.inputs = new MutableObjectIterator[numInputs];
this.localStrategies = new CloseableInputProvider<?>[numInputs];
this.inputs = new MutableObjectIterator<?>[numInputs];
this.excludeFromReset = new boolean[numInputs];
this.inputIsCached = new boolean[numInputs];
this.inputIsAsyncMaterialized = new boolean[numInputs];
......@@ -807,8 +808,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
// acts as a pipeline breaker. this one should only be there, if a pipeline breaker is needed.
// the second variant spills to the side and will not read unless the result is also consumed
// in a pipelined fashion.
this.resettableInputs = new SpillingResettableMutableObjectIterator[numInputs];
this.tempBarriers = new TempBarrier[numInputs];
this.resettableInputs = new SpillingResettableMutableObjectIterator<?>[numInputs];
this.tempBarriers = new TempBarrier<?>[numInputs];
for (int i = 0; i < numInputs; i++) {
final int memoryPages;
......@@ -1043,6 +1044,11 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
return this.config;
}
@Override
public TaskManagerRuntimeInfo getTaskManagerInfo() {
return getEnvironment().getTaskManagerInfo();
}
@Override
public MemoryManager getMemoryManager() {
return getEnvironment().getMemoryManager();
......
......@@ -32,6 +32,7 @@ import java.util.List;
* Common methods for all Hash Join Iterators.
*/
public class HashMatchIteratorBase {
public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
TypeSerializer<BT> buildSideSerializer,
TypeComparator<BT> buildSideComparator,
......@@ -41,11 +42,15 @@ public class HashMatchIteratorBase {
MemoryManager memManager,
IOManager ioManager,
AbstractInvokable ownerTask,
double memoryFraction) throws MemoryAllocationException {
double memoryFraction,
boolean useBloomFilters) throws MemoryAllocationException {
final int numPages = memManager.computeNumberOfPages(memoryFraction);
final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
return new MutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
return new MutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
buildSideComparator, probeSideComparator, pairComparator,
memorySegments, ioManager,
useBloomFilters);
}
}
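For intuition about the sizing above (illustrative numbers, not from this commit): if the memory manager uses 32 KiB pages and `memoryFraction` corresponds to 256 MiB of its budget, `computeNumberOfPages` yields 8192, and those 8192 segments become the hash table's working memory.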
......@@ -24,10 +24,9 @@ import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
......@@ -46,22 +45,16 @@ import org.apache.flink.runtime.operators.util.BloomFilter;
import org.apache.flink.runtime.util.MathUtils;
import org.apache.flink.util.MutableObjectIterator;
/**
* An implementation of a Hybrid Hash Join. The join starts operating in memory and gradually starts
* spilling contents to disk, when the memory is not sufficient. It does not need to know a priori
* how large the input will be.
* <p>
* The design of this class follows on many parts the design presented in
* "Hash joins and hash teams in Microsoft SQL Server", by Goetz Graefe et al. In its current state, the
* implementation lacks features like dynamic role reversal, partition tuning, or histogram guided partitioning.
*<p>
*
*
* <hr>
*
* The layout of the buckets inside a memory segment is as follows:
* <p>The design of this class follows on many parts the design presented in
* "Hash joins and hash teams in Microsoft SQL Server", by Goetz Graefe et al. In its current state, the
* implementation lacks features like dynamic role reversal, partition tuning, or histogram guided partitioning.</p>
*
* <p>The layout of the buckets inside a memory segment is as follows:</p>
* <pre>
* +----------------------------- Bucket x ----------------------------
* |Partition (1 byte) | Status (1 byte) | element count (2 bytes) |
......@@ -189,8 +182,6 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
*/
private static final long BUCKET_FORWARD_POINTER_NOT_SET = ~0x0L;
// private static final byte BUCKET_STATUS_SPILLED = 1;
/**
* Constant for the bucket status, indicating that the bucket is in memory.
*/
......@@ -274,11 +265,14 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
*/
protected final int bucketsPerSegmentBits;
/**
* An estimate for the average record length.
*/
private final int avgRecordLen;
/** Flag to enable/disable bloom filters for spilled partitions */
private final boolean useBloomFilters;
// ------------------------------------------------------------------------
/**
......@@ -296,8 +290,6 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
*/
private HashBucketIterator<BT, PT> bucketIterator;
// private LazyHashBucketIterator<BT, PT> lazyBucketIterator;
/**
* Iterator over the elements from the probe side.
*/
......@@ -319,6 +311,10 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
* of hash-codes and pointers to the elements.
*/
protected MemorySegment[] buckets;
/** The bloom filter utility used to transform hash buckets of spilled partitions into a
* probabilistic filter */
private BloomFilter bloomFilter;
/**
* The number of buckets in the current table. The bucket array is not necessarily fully
......@@ -353,25 +349,35 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
protected boolean furtherPartitioning = false;
private boolean running = true;
private BloomFilter bloomFilter;
// ------------------------------------------------------------------------
// Construction and Teardown
// ------------------------------------------------------------------------
public MutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<PT> probeSideSerializer,
TypeComparator<BT> buildSideComparator, TypeComparator<PT> probeSideComparator,
TypePairComparator<PT, BT> comparator,
List<MemorySegment> memorySegments, IOManager ioManager)
{
this(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, comparator,
memorySegments, ioManager, true);
}
public MutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<PT> probeSideSerializer,
TypeComparator<BT> buildSideComparator, TypeComparator<PT> probeSideComparator,
TypePairComparator<PT, BT> comparator, List<MemorySegment> memorySegments, IOManager ioManager)
TypePairComparator<PT, BT> comparator,
List<MemorySegment> memorySegments,
IOManager ioManager,
boolean useBloomFilters)
{
this(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, comparator,
memorySegments, ioManager, DEFAULT_RECORD_LEN);
memorySegments, ioManager, DEFAULT_RECORD_LEN, useBloomFilters);
}
public MutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<PT> probeSideSerializer,
TypeComparator<BT> buildSideComparator, TypeComparator<PT> probeSideComparator,
TypePairComparator<PT, BT> comparator, List<MemorySegment> memorySegments,
IOManager ioManager, int avgRecordLen)
IOManager ioManager, int avgRecordLen, boolean useBloomFilters)
{
// some sanity checks first
if (memorySegments == null) {
......@@ -390,6 +396,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
this.recordComparator = comparator;
this.availableMemory = memorySegments;
this.ioManager = ioManager;
this.useBloomFilters = useBloomFilters;
this.avgRecordLen = avgRecordLen > 0 ? avgRecordLen :
buildSideSerializer.getLength() == -1 ? DEFAULT_RECORD_LEN : buildSideSerializer.getLength();
......@@ -551,16 +558,12 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
}
public boolean nextRecord() throws IOException {
final boolean probeProcessing = processProbeIter();
if(probeProcessing) {
return true;
}
return prepareNextPartition();
return probeProcessing || prepareNextPartition();
}
public HashBucketIterator<BT, PT> getMatchesFor(PT record) throws IOException
{
public HashBucketIterator<BT, PT> getMatchesFor(PT record) throws IOException {
final TypeComparator<PT> probeAccessors = this.probeSideComparator;
final int hash = hash(probeAccessors.hash(record), this.currentRecursionDepth);
final int posHashCode = hash % this.numBuckets;
......@@ -585,32 +588,6 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
}
}
// public LazyHashBucketIterator<BT, PT> getLazyMatchesFor(PT record) throws IOException
// {
// final TypeComparator<PT> probeAccessors = this.probeSideComparator;
// final int hash = hash(probeAccessors.hash(record), this.currentRecursionDepth);
// final int posHashCode = hash % this.numBuckets;
//
// // get the bucket for the given hash code
// final int bucketArrayPos = posHashCode >> this.bucketsPerSegmentBits;
// final int bucketInSegmentOffset = (posHashCode & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
// final MemorySegment bucket = this.buckets[bucketArrayPos];
//
// // get the basic characteristics of the bucket
// final int partitionNumber = bucket.get(bucketInSegmentOffset + HEADER_PARTITION_OFFSET);
// final HashPartition<BT, PT> p = this.partitionsBeingBuilt.get(partitionNumber);
//
// // for an in-memory partition, process set the return iterators, else spill the probe records
// if (p.isInMemory()) {
// this.recordComparator.setReference(record);
// this.lazyBucketIterator.set(bucket, p.overflowSegments, p, hash, bucketInSegmentOffset);
// return this.lazyBucketIterator;
// }
// else {
// throw new IllegalStateException("Method is not applicable to partially spilled hash tables.");
// }
// }
public PT getCurrentProbeRecord() {
return this.probeIterator.getCurrent();
}
......@@ -739,7 +716,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
}
}
final private int getEstimatedMaxBucketEntries(int numBuffers, int bufferSize, int numBuckets, int recordLenBytes) {
private int getEstimatedMaxBucketEntries(int numBuffers, int bufferSize, int numBuckets, int recordLenBytes) {
final long totalSize = ((long) bufferSize) * numBuffers;
final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_OVERHEAD_BYTES);
final long maxNumRecordsStorable = (MAX_RECURSION_DEPTH + 1) * numRecordsStorable;
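A hedged worked example for the estimate above (the values of RECORD_OVERHEAD_BYTES and MAX_RECURSION_DEPTH are assumed, not taken from this diff): with 128 buffers of 32 KiB, totalSize is 4 MiB; at recordLenBytes = 100 and an assumed 8 bytes of per-record overhead, about 4,194,304 / 108 ≈ 38,800 records are storable per pass; with an assumed MAX_RECURSION_DEPTH of 3, the maximum estimate becomes (3 + 1) × 38,800 ≈ 155,000 entries.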
......@@ -1092,9 +1069,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
this.buckets = table;
this.numBuckets = numBuckets;
boolean enableBloomFilter = GlobalConfiguration.getBoolean(
ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, ConfigConstants.DEAFULT_HASHJOIN_ENABLE_BLOOMFILTER);
if (enableBloomFilter) {
if (useBloomFilters) {
initBloomFilter(numBuckets);
}
}
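Conceptually, the filtering that `initBloomFilter` enables works as follows: when a partition is spilled, the hash codes recorded in its in-memory buckets are folded into a compact probabilistic bit set, and probe-side records whose hash codes definitely do not occur in the spilled partition can be discarded immediately instead of being spilled and re-read. A self-contained sketch of that idea, assuming a standard bloom filter with double hashing (this is not Flink's `org.apache.flink.runtime.operators.util.BloomFilter`):

```java
// Conceptual illustration only; NOT the Flink BloomFilter implementation.
final class SimpleBloomFilter {

    private final long[] bits;
    private final int numBits;
    private final int numHashes;

    SimpleBloomFilter(int numBits, int numHashes) {
        this.bits = new long[(numBits + 63) / 64];
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    /** Records a build-side hash code by setting k bit positions derived from it. */
    void add(int hashCode) {
        for (int i = 1; i <= numHashes; i++) {
            int pos = ((hashCode + i * (hashCode >>> 16)) & 0x7fffffff) % numBits;
            bits[pos >>> 6] |= 1L << (pos & 63);
        }
    }

    /**
     * Returns false only if the hash code was definitely never added, so a probe
     * record that tests false can be dropped instead of being spilled to disk.
     */
    boolean mightContain(int hashCode) {
        for (int i = 1; i <= numHashes; i++) {
            int pos = ((hashCode + i * (hashCode >>> 16)) & 0x7fffffff) % numBits;
            if ((bits[pos >>> 6] & (1L << (pos & 63))) == 0) {
                return false;
            }
        }
        return true;
    }
}
```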
......@@ -1107,8 +1082,8 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
this.numBuckets = 0;
if (this.buckets != null) {
for (int i = 0; i < this.buckets.length; i++) {
this.availableMemory.add(this.buckets[i]);
for (MemorySegment bucket : this.buckets) {
this.availableMemory.add(bucket);
}
this.buckets = null;
}
......@@ -1138,9 +1113,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
}
final HashPartition<BT, PT> p = partitions.get(largestPartNum);
boolean enableBloomFilter = GlobalConfiguration.getBoolean(
ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, ConfigConstants.DEAFULT_HASHJOIN_ENABLE_BLOOMFILTER);
if (enableBloomFilter) {
if (useBloomFilters) {
buildBloomFilterForBucketsInPartition(largestPartNum, p);
}
......@@ -1196,7 +1169,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
buildBloomFilterForExtraOverflowSegments(bucketInSegmentPos, bucket, p);
}
final private void buildBloomFilterForExtraOverflowSegments(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
private void buildBloomFilterForExtraOverflowSegments(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
int totalCount = 0;
boolean skip = false;
long forwardPointer = bucket.getLong(bucketInSegmentPos + HEADER_FORWARD_OFFSET);
......@@ -1207,7 +1180,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
break;
}
MemorySegment overflowSegment = p.overflowSegments[overflowSegNum];
int bucketInOverflowSegmentOffset = (int) (forwardPointer & 0xffffffff);
int bucketInOverflowSegmentOffset = (int) forwardPointer;
final int count = overflowSegment.getShort(bucketInOverflowSegmentOffset + HEADER_COUNT_OFFSET);
totalCount += count;
......@@ -1587,93 +1560,6 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
}
} // end HashBucketIterator
// ======================================================================================================
// public static final class LazyHashBucketIterator<BT, PT> {
//
// private final TypePairComparator<PT, BT> comparator;
//
// private MemorySegment bucket;
//
// private MemorySegment[] overflowSegments;
//
// private HashPartition<BT, PT> partition;
//
// private int bucketInSegmentOffset;
//
// private int searchHashCode;
//
// private int posInSegment;
//
// private int countInSegment;
//
// private int numInSegment;
//
// private LazyHashBucketIterator(TypePairComparator<PT, BT> comparator) {
// this.comparator = comparator;
// }
//
//
// void set(MemorySegment bucket, MemorySegment[] overflowSegments, HashPartition<BT, PT> partition,
// int searchHashCode, int bucketInSegmentOffset) {
//
// this.bucket = bucket;
// this.overflowSegments = overflowSegments;
// this.partition = partition;
// this.searchHashCode = searchHashCode;
// this.bucketInSegmentOffset = bucketInSegmentOffset;
//
// this.posInSegment = this.bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
// this.countInSegment = bucket.getShort(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
// this.numInSegment = 0;
// }
//
// public boolean next(BT target) {
// // loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
// while (true) {
//
// while (this.numInSegment < this.countInSegment) {
//
// final int thisCode = this.bucket.getInt(this.posInSegment);
// this.posInSegment += HASH_CODE_LEN;
//
// // check if the hash code matches
// if (thisCode == this.searchHashCode) {
// // get the pointer to the pair
// final long pointer = this.bucket.getLong(this.bucketInSegmentOffset +
// BUCKET_POINTER_START_OFFSET + (this.numInSegment * POINTER_LEN));
// this.numInSegment++;
//
// // check whether it is really equal, or whether we had only a hash collision
// LazyDeSerializable lds = (LazyDeSerializable) target;
// lds.setDeSerializer(this.partition, this.partition.getWriteView(), pointer);
// if (this.comparator.equalToReference(target)) {
// return true;
// }
// }
// else {
// this.numInSegment++;
// }
// }
//
// // this segment is done. check if there is another chained bucket
// final long forwardPointer = this.bucket.getLong(this.bucketInSegmentOffset + HEADER_FORWARD_OFFSET);
// if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
// return false;
// }
//
// final int overflowSegNum = (int) (forwardPointer >>> 32);
// this.bucket = this.overflowSegments[overflowSegNum];
// this.bucketInSegmentOffset = (int) (forwardPointer & 0xffffffff);
// this.countInSegment = this.bucket.getShort(this.bucketInSegmentOffset + HEADER_COUNT_OFFSET);
// this.posInSegment = this.bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
// this.numInSegment = 0;
// }
// }
// }
// ======================================================================================================
......
......@@ -67,16 +67,16 @@ public class NonReusingBuildFirstHashMatchIterator<V1, V2, O> extends HashMatchI
TypePairComparator<V2, V1> pairComparator,
MemoryManager memManager, IOManager ioManager,
AbstractInvokable ownerTask,
double memoryFraction)
throws MemoryAllocationException
{
double memoryFraction,
boolean useBitmapFilters) throws MemoryAllocationException {
this.memManager = memManager;
this.firstInput = firstInput;
this.secondInput = secondInput;
this.probeSideSerializer = serializer2;
this.hashJoin = getHashJoin(serializer1, comparator1, serializer2, comparator2,
pairComparator, memManager, ioManager, ownerTask, memoryFraction);
pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
}
// --------------------------------------------------------------------------------------------
......
......@@ -48,25 +48,32 @@ public class NonReusingBuildFirstReOpenableHashMatchIterator<V1, V2, O> extends
MemoryManager memManager,
IOManager ioManager,
AbstractInvokable ownerTask,
double memoryFraction)
throws MemoryAllocationException
{
double memoryFraction,
boolean useBitmapFilters) throws MemoryAllocationException {
super(firstInput, secondInput, serializer1, comparator1, serializer2,
comparator2, pairComparator, memManager, ioManager, ownerTask,
memoryFraction);
memoryFraction, useBitmapFilters);
reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
}
@Override
public <BT, PT> MutableHashTable<BT, PT> getHashJoin(TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
TypePairComparator<PT, BT> pairComparator,
MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction)
throws MemoryAllocationException
{
MemoryManager memManager, IOManager ioManager,
AbstractInvokable ownerTask,
double memoryFraction,
boolean useBitmapFilters) throws MemoryAllocationException {
final int numPages = memManager.computeNumberOfPages(memoryFraction);
final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
buildSideComparator, probeSideComparator, pairComparator,
memorySegments, ioManager, useBitmapFilters);
}
/**
......@@ -76,5 +83,4 @@ public class NonReusingBuildFirstReOpenableHashMatchIterator<V1, V2, O> extends
public void reopenProbe(MutableObjectIterator<V2> probeInput) throws IOException {
reopenHashTable.reopenProbe(probeInput);
}
}
......@@ -66,16 +66,16 @@ public class NonReusingBuildSecondHashMatchIterator<V1, V2, O> extends HashMatch
TypePairComparator<V1, V2> pairComparator,
MemoryManager memManager, IOManager ioManager,
AbstractInvokable ownerTask,
double memoryFraction)
throws MemoryAllocationException
{
double memoryFraction,
boolean useBitmapFilters) throws MemoryAllocationException {
this.memManager = memManager;
this.firstInput = firstInput;
this.secondInput = secondInput;
this.probeSideSerializer = serializer1;
this.hashJoin = getHashJoin(serializer2, comparator2, serializer1,
comparator1, pairComparator, memManager, ioManager, ownerTask, memoryFraction);
comparator1, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
}
// --------------------------------------------------------------------------------------------
......
......@@ -48,11 +48,12 @@ public class NonReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends
MemoryManager memManager,
IOManager ioManager,
AbstractInvokable ownerTask,
double memoryFraction)
throws MemoryAllocationException
{
double memoryFraction,
boolean useBitmapFilters) throws MemoryAllocationException {
super(firstInput, secondInput, serializer1, comparator1, serializer2,
comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction);
comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
reopenHashTable = (ReOpenableMutableHashTable<V2, V1>) hashJoin;
}
......@@ -62,12 +63,17 @@ public class NonReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends
TypeComparator<BT> buildSideComparator,
TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
TypePairComparator<PT, BT> pairComparator,
MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction)
throws MemoryAllocationException
{
MemoryManager memManager, IOManager ioManager,
AbstractInvokable ownerTask,
double memoryFraction,
boolean useBitmapFilters) throws MemoryAllocationException {
final int numPages = memManager.computeNumberOfPages(memoryFraction);
final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
buildSideComparator, probeSideComparator, pairComparator,
memorySegments, ioManager, useBitmapFilters);
}
/**
......@@ -77,5 +83,4 @@ public class NonReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends
public void reopenProbe(MutableObjectIterator<V1> probeInput) throws IOException {
reopenHashTable.reopenProbe(probeInput);
}
}
......@@ -33,17 +33,12 @@ import org.apache.flink.util.MutableObjectIterator;
public class ReOpenableMutableHashTable<BT, PT> extends MutableHashTable<BT, PT> {
/**
* Channel for the spilled partitions
*/
/** Channel for the spilled partitions */
private final FileIOChannel.Enumerator spilledInMemoryPartitions;
/**
* Stores the initial partitions and a list of the files that contain the spilled contents
*/
/** Stores the initial partitions and a list of the files that contain the spilled contents */
private List<HashPartition<BT, PT>> initialPartitions;
/**
* The values of these variables are stored here after the initial open()
* Required to restore the initial state before each additional probe phase.
......@@ -58,16 +53,17 @@ public class ReOpenableMutableHashTable<BT, PT> extends MutableHashTable<BT, PT>
TypeComparator<BT> buildSideComparator,
TypeComparator<PT> probeSideComparator,
TypePairComparator<PT, BT> comparator,
List<MemorySegment> memorySegments, IOManager ioManager) {
List<MemorySegment> memorySegments, IOManager ioManager,
boolean useBitmapFilters) {
super(buildSideSerializer, probeSideSerializer, buildSideComparator,
probeSideComparator, comparator, memorySegments, ioManager);
probeSideComparator, comparator, memorySegments, ioManager, useBitmapFilters);
keepBuildSidePartitions = true;
spilledInMemoryPartitions = ioManager.createChannelEnumerator();
}
@Override
public void open(MutableObjectIterator<BT> buildSide,
MutableObjectIterator<PT> probeSide) throws IOException {
public void open(MutableObjectIterator<BT> buildSide, MutableObjectIterator<PT> probeSide) throws IOException {
super.open(buildSide, probeSide);
initialPartitions = new ArrayList<HashPartition<BT, PT>>(partitionsBeingBuilt);
initialPartitionFanOut = (byte) partitionsBeingBuilt.size();
......
......@@ -71,9 +71,9 @@ public class ReusingBuildFirstHashMatchIterator<V1, V2, O> extends HashMatchIter
MemoryManager memManager,
IOManager ioManager,
AbstractInvokable ownerTask,
double memoryFraction)
throws MemoryAllocationException
{
double memoryFraction,
boolean useBitmapFilters) throws MemoryAllocationException {
this.memManager = memManager;
this.firstInput = firstInput;
this.secondInput = secondInput;
......@@ -83,7 +83,7 @@ public class ReusingBuildFirstHashMatchIterator<V1, V2, O> extends HashMatchIter
this.tempBuildSideRecord = serializer1.createInstance();
this.hashJoin = getHashJoin(serializer1, comparator1, serializer2,
comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction);
comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
}
// --------------------------------------------------------------------------------------------
......
......@@ -48,25 +48,32 @@ public class ReusingBuildFirstReOpenableHashMatchIterator<V1, V2, O> extends Reu
MemoryManager memManager,
IOManager ioManager,
AbstractInvokable ownerTask,
double memoryFraction)
double memoryFraction,
boolean useBitmapFilters)
throws MemoryAllocationException
{
super(firstInput, secondInput, serializer1, comparator1, serializer2,
comparator2, pairComparator, memManager, ioManager, ownerTask,
memoryFraction);
memoryFraction, useBitmapFilters);
reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
}
@Override
public <BT, PT> MutableHashTable<BT, PT> getHashJoin(TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
TypePairComparator<PT, BT> pairComparator,
MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction)
throws MemoryAllocationException
{
MemoryManager memManager, IOManager ioManager,
AbstractInvokable ownerTask,
double memoryFraction,
boolean useBitmapFilters) throws MemoryAllocationException {
final int numPages = memManager.computeNumberOfPages(memoryFraction);
final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
buildSideComparator, probeSideComparator, pairComparator,
memorySegments, ioManager, useBitmapFilters);
}
/**
......@@ -76,5 +83,4 @@ public class ReusingBuildFirstReOpenableHashMatchIterator<V1, V2, O> extends Reu
public void reopenProbe(MutableObjectIterator<V2> probeInput) throws IOException {
reopenHashTable.reopenProbe(probeInput);
}
}
......@@ -71,9 +71,9 @@ public class ReusingBuildSecondHashMatchIterator<V1, V2, O> extends HashMatchIte
MemoryManager memManager,
IOManager ioManager,
AbstractInvokable ownerTask,
double memoryFraction)
throws MemoryAllocationException
{
double memoryFraction,
boolean useBitmapFilters) throws MemoryAllocationException {
this.memManager = memManager;
this.firstInput = firstInput;
this.secondInput = secondInput;
......@@ -83,7 +83,7 @@ public class ReusingBuildSecondHashMatchIterator<V1, V2, O> extends HashMatchIte
this.tempBuildSideRecord = serializer2.createInstance();
this.hashJoin = getHashJoin(serializer2, comparator2, serializer1, comparator1, pairComparator,
memManager, ioManager, ownerTask, memoryFraction);
memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
}
// --------------------------------------------------------------------------------------------
......
......@@ -48,24 +48,31 @@ public class ReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends Re
MemoryManager memManager,
IOManager ioManager,
AbstractInvokable ownerTask,
double memoryFraction)
throws MemoryAllocationException
{
double memoryFraction,
boolean useBitmapFilters) throws MemoryAllocationException {
super(firstInput, secondInput, serializer1, comparator1, serializer2,
comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction);
comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
reopenHashTable = (ReOpenableMutableHashTable<V2, V1>) hashJoin;
}
@Override
public <BT, PT> MutableHashTable<BT, PT> getHashJoin(TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
TypePairComparator<PT, BT> pairComparator,
MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction)
throws MemoryAllocationException
{
MemoryManager memManager, IOManager ioManager,
AbstractInvokable ownerTask,
double memoryFraction,
boolean useBitmapFilters) throws MemoryAllocationException {
final int numPages = memManager.computeNumberOfPages(memoryFraction);
final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
buildSideComparator, probeSideComparator, pairComparator,
memorySegments, ioManager, useBitmapFilters);
}
/**
......@@ -75,5 +82,4 @@ public class ReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends Re
public void reopenProbe(MutableObjectIterator<V1> probeInput) throws IOException {
reopenHashTable.reopenProbe(probeInput);
}
}
......@@ -75,9 +75,7 @@ public class RuntimeEnvironment implements Environment {
private final AccumulatorRegistry accumulatorRegistry;
private final Configuration taskManagerConfiguration;
private final String hostname;
private final TaskManagerRuntimeInfo taskManagerInfo;
// ------------------------------------------------------------------------
......@@ -124,8 +122,7 @@ public class RuntimeEnvironment implements Environment {
this.writers = checkNotNull(writers);
this.inputGates = checkNotNull(inputGates);
this.jobManager = checkNotNull(jobManager);
this.taskManagerConfiguration = checkNotNull(taskManagerInfo).getConfiguration();
this.hostname = taskManagerInfo.getHostname();
this.taskManagerInfo = checkNotNull(taskManagerInfo);
}
// ------------------------------------------------------------------------
......@@ -176,13 +173,8 @@ public class RuntimeEnvironment implements Environment {
}
@Override
public Configuration getTaskManagerConfiguration(){
return taskManagerConfiguration;
}
@Override
public String getHostname(){
return hostname;
public TaskManagerRuntimeInfo getTaskManagerInfo() {
return taskManagerInfo;
}
@Override
......
......@@ -33,6 +33,7 @@ import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.PactTaskContext;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
......@@ -62,6 +63,8 @@ public class TestTaskContext<S, T> implements PactTaskContext<S, T> {
private ExecutionConfig executionConfig = new ExecutionConfig();
private TaskManagerRuntimeInfo taskManageInfo;
// --------------------------------------------------------------------------------------------
// Constructors
// --------------------------------------------------------------------------------------------
......@@ -70,6 +73,7 @@ public class TestTaskContext<S, T> implements PactTaskContext<S, T> {
public TestTaskContext(long memoryInBytes) {
this.memoryManager = new DefaultMemoryManager(memoryInBytes, 1, 32 * 1024, true);
this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", new Configuration());
}
// --------------------------------------------------------------------------------------------
......@@ -155,6 +159,11 @@ public class TestTaskContext<S, T> implements PactTaskContext<S, T> {
return null;
}
@Override
public TaskManagerRuntimeInfo getTaskManagerInfo() {
return this.taskManageInfo;
}
@Override
@SuppressWarnings("unchecked")
public <X> MutableObjectIterator<X> getInput(int index) {
......
......@@ -24,9 +24,6 @@ import java.util.List;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
......@@ -40,6 +37,7 @@ import org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator;
import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
......@@ -192,10 +190,6 @@ public class MutableHashTablePerformanceBenchmark {
InputIterator buildIterator = new InputIterator(buildSize, buildStep, buildScope);
InputIterator probeIterator = new InputIterator(probeSize, probeStep, probeScope);
Configuration conf = new Configuration();
conf.setBoolean(ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, enableBloomFilter);
GlobalConfiguration.includeConfiguration(conf);
// allocate the memory for the HashTable
List<MemorySegment> memSegments;
try {
......@@ -212,7 +206,7 @@ public class MutableHashTablePerformanceBenchmark {
final MutableHashTable<StringPair, StringPair> join = new MutableHashTable<StringPair, StringPair>(
this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
memSegments, ioManager);
memSegments, ioManager, enableBloomFilter);
join.open(buildIterator, probeIterator);
final StringPair recordReuse = new StringPair();
......
......@@ -155,7 +155,7 @@ public class NonReusingHashMatchIteratorITCase {
new NonReusingBuildFirstHashMatchIterator<Record, Record, Record>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0);
this.memoryManager, ioManager, this.parentTask, 1.0, true);
iterator.open();
......@@ -242,7 +242,7 @@ public class NonReusingHashMatchIteratorITCase {
new NonReusingBuildFirstHashMatchIterator<Record, Record, Record>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0);
this.memoryManager, ioManager, this.parentTask, 1.0, true);
iterator.open();
......@@ -291,7 +291,7 @@ public class NonReusingHashMatchIteratorITCase {
new NonReusingBuildSecondHashMatchIterator<Record, Record, Record>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0);
this.memoryManager, ioManager, this.parentTask, 1.0, true);
iterator.open();
......@@ -378,7 +378,7 @@ public class NonReusingHashMatchIteratorITCase {
new NonReusingBuildSecondHashMatchIterator<Record, Record, Record>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0);
this.memoryManager, ioManager, this.parentTask, 1.0, true);
iterator.open();
......@@ -425,7 +425,7 @@ public class NonReusingHashMatchIteratorITCase {
new NonReusingBuildSecondHashMatchIterator<IntPair, Record, Record>(
input1, input2, this.pairSerializer, this.pairComparator,
this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
this.memoryManager, this.ioManager, this.parentTask, 1.0);
this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
iterator.open();
......@@ -472,7 +472,7 @@ public class NonReusingHashMatchIteratorITCase {
new NonReusingBuildFirstHashMatchIterator<IntPair, Record, Record>(
input1, input2, this.pairSerializer, this.pairComparator,
this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
this.memoryManager, this.ioManager, this.parentTask, 1.0);
this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
iterator.open();
......
......@@ -239,7 +239,7 @@ public class NonReusingReOpenableHashTableITCase {
new NonReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record>(
buildInput, probeInput, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0);
this.memoryManager, ioManager, this.parentTask, 1.0, true);
iterator.open();
// do first join with both inputs
......@@ -277,7 +277,7 @@ public class NonReusingReOpenableHashTableITCase {
//
//
private final MutableObjectIterator<Record> getProbeInput(final int numKeys,
private MutableObjectIterator<Record> getProbeInput(final int numKeys,
final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
MutableObjectIterator<Record> probe1 = new UniformRecordGenerator(numKeys, probeValsPerKey, true);
MutableObjectIterator<Record> probe2 = new ConstantsKeyValuePairsIterator(repeatedValue1, 17, 5);
......@@ -334,9 +334,9 @@ public class NonReusingReOpenableHashTableITCase {
final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
memSegments, ioManager);
memSegments, ioManager, true);
for(int probe = 0; probe < NUM_PROBES; probe++) {
for (int probe = 0; probe < NUM_PROBES; probe++) {
// create a probe input that gives 10 million pairs with 10 values sharing a key
MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
if(probe == 0) {
......@@ -348,9 +348,8 @@ public class NonReusingReOpenableHashTableITCase {
Record record;
final Record recordReuse = new Record();
while (join.nextRecord())
{
int numBuildValues = 0;
while (join.nextRecord()) {
long numBuildValues = 0;
final Record probeRec = join.getCurrentProbeRecord();
int key = probeRec.getField(0, IntValue.class).getValue();
......@@ -370,10 +369,10 @@ public class NonReusingReOpenableHashTableITCase {
Long contained = map.get(key);
if (contained == null) {
contained = Long.valueOf(numBuildValues);
contained = numBuildValues;
}
else {
contained = Long.valueOf(contained.longValue() + numBuildValues);
contained = contained + numBuildValues;
}
map.put(key, contained);
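Two things change in this accumulation loop: the per-key counter widens from `int` to `long`, so very large build sides cannot overflow it, and the explicit `Long.valueOf(...)` calls give way to plain autoboxing, which the compiler translates to the same thing. On Java 8 the whole null-check-then-add pattern could further collapse into `Map.merge`; it is shown here only as a comparison, not something this commit uses (presumably because the codebase did not yet require Java 8):

```java
import java.util.HashMap;
import java.util.Map;

public class MergeSketch {
	public static void main(String[] args) {
		Map<Integer, Long> map = new HashMap<>();
		long numBuildValues = 10L;
		// merge() inserts the value when the key is absent and
		// otherwise combines it with the existing one via Long::sum,
		// replacing the explicit null check and manual boxing.
		map.merge(42, numBuildValues, Long::sum);
		map.merge(42, numBuildValues, Long::sum);
		System.out.println(map.get(42)); // 20
	}
}
```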
......@@ -450,11 +449,12 @@ public class NonReusingReOpenableHashTableITCase {
final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
memSegments, ioManager);
for(int probe = 0; probe < NUM_PROBES; probe++) {
memSegments, ioManager, true);
for (int probe = 0; probe < NUM_PROBES; probe++) {
// create a probe input that gives 10 million pairs with 10 values sharing a key
MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
if(probe == 0) {
if (probe == 0) {
join.open(buildInput, probeInput);
} else {
join.reopenProbe(probeInput);
......@@ -462,9 +462,8 @@ public class NonReusingReOpenableHashTableITCase {
Record record;
final Record recordReuse = new Record();
while (join.nextRecord())
{
int numBuildValues = 0;
while (join.nextRecord()) {
long numBuildValues = 0;
final Record probeRec = join.getCurrentProbeRecord();
int key = probeRec.getField(0, IntValue.class).getValue();
......@@ -484,10 +483,10 @@ public class NonReusingReOpenableHashTableITCase {
Long contained = map.get(key);
if (contained == null) {
contained = Long.valueOf(numBuildValues);
contained = numBuildValues;
}
else {
contained = Long.valueOf(contained.longValue() + numBuildValues);
contained = contained + numBuildValues;
}
map.put(key, contained);
......
......@@ -155,7 +155,7 @@ public class ReusingHashMatchIteratorITCase {
new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0);
this.memoryManager, ioManager, this.parentTask, 1.0, true);
iterator.open();
......@@ -242,7 +242,7 @@ public class ReusingHashMatchIteratorITCase {
new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0);
this.memoryManager, ioManager, this.parentTask, 1.0, true);
iterator.open();
......@@ -291,7 +291,7 @@ public class ReusingHashMatchIteratorITCase {
new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0);
this.memoryManager, ioManager, this.parentTask, 1.0, true);
iterator.open();
......@@ -378,7 +378,7 @@ public class ReusingHashMatchIteratorITCase {
new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0);
this.memoryManager, ioManager, this.parentTask, 1.0, true);
iterator.open();
......@@ -425,7 +425,7 @@ public class ReusingHashMatchIteratorITCase {
new ReusingBuildSecondHashMatchIterator<IntPair, Record, Record>(
input1, input2, this.pairSerializer, this.pairComparator,
this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
this.memoryManager, this.ioManager, this.parentTask, 1.0);
this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
iterator.open();
......@@ -472,7 +472,7 @@ public class ReusingHashMatchIteratorITCase {
new ReusingBuildFirstHashMatchIterator<IntPair, Record, Record>(
input1, input2, this.pairSerializer, this.pairComparator,
this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
this.memoryManager, this.ioManager, this.parentTask, 1.0);
this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
iterator.open();
......
......@@ -238,7 +238,7 @@ public class ReusingReOpenableHashTableITCase {
new ReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record>(
buildInput, probeInput, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0);
this.memoryManager, ioManager, this.parentTask, 1.0, true);
iterator.open();
// do first join with both inputs
......@@ -276,7 +276,7 @@ public class ReusingReOpenableHashTableITCase {
//
//
private final MutableObjectIterator<Record> getProbeInput(final int numKeys,
private MutableObjectIterator<Record> getProbeInput(final int numKeys,
final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
MutableObjectIterator<Record> probe1 = new UniformRecordGenerator(numKeys, probeValsPerKey, true);
MutableObjectIterator<Record> probe2 = new ConstantsKeyValuePairsIterator(repeatedValue1, 17, 5);
......@@ -289,8 +289,7 @@ public class ReusingReOpenableHashTableITCase {
}
@Test
public void testSpillingHashJoinWithMassiveCollisions() throws IOException
{
public void testSpillingHashJoinWithMassiveCollisions() throws IOException {
// the following two values are known to have a hash-code collision on the initial level.
// we use them to make sure one partition grows over-proportionally large
final int REPEATED_VALUE_1 = 40559;
......@@ -311,9 +310,6 @@ public class ReusingReOpenableHashTableITCase {
builds.add(build2);
builds.add(build3);
MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds);
// allocate the memory for the HashTable
List<MemorySegment> memSegments;
......@@ -333,7 +329,7 @@ public class ReusingReOpenableHashTableITCase {
final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
memSegments, ioManager);
memSegments, ioManager, true);
for(int probe = 0; probe < NUM_PROBES; probe++) {
// create a probe input that gives 10 million pairs with 10 values sharing a key
......@@ -347,9 +343,8 @@ public class ReusingReOpenableHashTableITCase {
Record record;
final Record recordReuse = new Record();
while (join.nextRecord())
{
int numBuildValues = 0;
while (join.nextRecord()) {
long numBuildValues = 0;
final Record probeRec = join.getCurrentProbeRecord();
int key = probeRec.getField(0, IntValue.class).getValue();
......@@ -369,10 +364,10 @@ public class ReusingReOpenableHashTableITCase {
Long contained = map.get(key);
if (contained == null) {
contained = Long.valueOf(numBuildValues);
contained = numBuildValues;
}
else {
contained = Long.valueOf(contained.longValue() + numBuildValues);
contained = contained + numBuildValues;
}
map.put(key, contained);
......@@ -449,8 +444,9 @@ public class ReusingReOpenableHashTableITCase {
final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
memSegments, ioManager);
for(int probe = 0; probe < NUM_PROBES; probe++) {
memSegments, ioManager, true);
for (int probe = 0; probe < NUM_PROBES; probe++) {
// create a probe input that gives 10 million pairs with 10 values sharing a key
MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
if(probe == 0) {
......@@ -463,7 +459,7 @@ public class ReusingReOpenableHashTableITCase {
while (join.nextRecord())
{
int numBuildValues = 0;
long numBuildValues = 0;
final Record probeRec = join.getCurrentProbeRecord();
int key = probeRec.getField(0, IntValue.class).getValue();
......@@ -483,10 +479,10 @@ public class ReusingReOpenableHashTableITCase {
Long contained = map.get(key);
if (contained == null) {
contained = Long.valueOf(numBuildValues);
contained = numBuildValues;
}
else {
contained = Long.valueOf(contained.longValue() + numBuildValues);
contained = contained + numBuildValues;
}
map.put(key, contained);
......@@ -526,5 +522,4 @@ public class ReusingReOpenableHashTableITCase {
}
return copy;
}
}
......@@ -26,6 +26,7 @@ import java.util.LinkedList;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.junit.Assert;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.util.FunctionUtils;
......@@ -69,11 +70,11 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
private final List<UnilateralSortMerger<Record>> sorters;
private final AbstractInvokable owner;
private final Configuration config;
private final TaskConfig taskConfig;
private final TaskManagerRuntimeInfo taskManageInfo;
protected final long perSortMem;
protected final double perSortFractionMem;
......@@ -111,11 +112,9 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
this.sorters = new ArrayList<UnilateralSortMerger<Record>>();
this.owner = new DummyInvokable();
this.config = new Configuration();
this.taskConfig = new TaskConfig(this.config);
this.taskConfig = new TaskConfig(new Configuration());
this.executionConfig = executionConfig;
this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", new Configuration());
}
@Parameterized.Parameters
......@@ -279,7 +278,10 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
return this.taskConfig;
}
@Override
public TaskManagerRuntimeInfo getTaskManagerInfo() {
return this.taskManageInfo;
}
@Override
public ExecutionConfig getExecutionConfig() {
......
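The test harness now caches a `TaskManagerRuntimeInfo` built from a hostname and a fresh `Configuration`, and serves it through the new `getTaskManagerInfo()` accessor, mirroring the runtime interface the drivers query for the bloom-filter flag. The class itself is not part of this diff; judging only from these call sites, a minimal shape would be the following sketch (the accessor names are assumptions):

```java
import org.apache.flink.configuration.Configuration;

/** Minimal sketch of what the call sites require of
 * TaskManagerRuntimeInfo; only the constructor signature is
 * visible in this diff, the accessors are guessed. */
public class TaskManagerRuntimeInfoSketch {

	private final String hostname;
	private final Configuration configuration;

	public TaskManagerRuntimeInfoSketch(String hostname, Configuration configuration) {
		this.hostname = hostname;
		this.configuration = configuration;
	}

	public String getHostname() {
		return hostname;
	}

	public Configuration getConfiguration() {
		return configuration;
	}
}
```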
......@@ -18,7 +18,6 @@
package org.apache.flink.runtime.operators.testutils;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.fs.Path;
......@@ -43,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.types.Record;
import org.apache.flink.util.MutableObjectIterator;
import org.mockito.invocation.InvocationOnMock;
......@@ -193,13 +193,8 @@ public class MockEnvironment implements Environment {
}
@Override
public Configuration getTaskManagerConfiguration(){
return new UnmodifiableConfiguration(new Configuration());
}
@Override
public String getHostname(){
return "localhost";
public TaskManagerRuntimeInfo getTaskManagerInfo() {
return new TaskManagerRuntimeInfo("localhost", new UnmodifiableConfiguration(new Configuration()));
}
@Override
......
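In the mock environments the two separate overrides `getTaskManagerConfiguration()` and `getHostname()` collapse into a single `getTaskManagerInfo()` that returns both values in one object. Inferred from the removed and added overrides (the full `Environment` interface is not in this excerpt), the signature change amounts to:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;

/** Before: two accessors on Environment, as the removed overrides suggest. */
interface EnvironmentBefore {
	Configuration getTaskManagerConfiguration();
	String getHostname();
}

/** After: one accessor bundling hostname and configuration. */
interface EnvironmentAfter {
	TaskManagerRuntimeInfo getTaskManagerInfo();
}
```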
......@@ -36,6 +36,7 @@ import org.apache.flink.runtime.operators.PactTaskContext;
import org.apache.flink.runtime.operators.ResettablePactDriver;
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
......@@ -54,7 +55,9 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> implements PactT
protected static final long DEFAULT_PER_SORT_MEM = 16 * 1024 * 1024;
protected static final int PAGE_SIZE = 32 * 1024;
protected static final int PAGE_SIZE = 32 * 1024;
private final TaskManagerRuntimeInfo taskManageInfo;
private final IOManager ioManager;
......@@ -110,6 +113,8 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> implements PactT
this.executionConfig = executionConfig;
this.comparators = new ArrayList<TypeComparator<IN>>(2);
this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", new Configuration());
}
@Parameterized.Parameters
......@@ -291,6 +296,11 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> implements PactT
return this.memManager;
}
@Override
public TaskManagerRuntimeInfo getTaskManagerInfo() {
return this.taskManageInfo;
}
@Override
public <X> MutableObjectIterator<X> getInput(int index) {
MutableObjectIterator<IN> in = this.input;
......
......@@ -187,7 +187,7 @@ public class HashVsSortMiniBenchmark {
new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
input1, input2, this.serializer1.getSerializer(), this.comparator1,
this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE);
this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, true);
iterator.open();
......@@ -226,7 +226,7 @@ public class HashVsSortMiniBenchmark {
new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
input1, input2, this.serializer1.getSerializer(), this.comparator1,
this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE);
this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, true);
iterator.open();
......
......@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.JobID;
......@@ -45,6 +46,7 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
......@@ -86,7 +88,8 @@ public class StreamMockEnvironment implements Environment {
private final int bufferSize;
public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, long memorySize,
MockInputSplitProvider inputSplitProvider, int bufferSize) {
this.jobConfiguration = jobConfig;
this.taskConfiguration = taskConfig;
this.inputs = new LinkedList<InputGate>();
......@@ -293,13 +296,8 @@ public class StreamMockEnvironment implements Environment {
}
@Override
public Configuration getTaskManagerConfiguration(){
return new UnmodifiableConfiguration(new Configuration());
}
@Override
public String getHostname(){
return "localhost";
public TaskManagerRuntimeInfo getTaskManagerInfo() {
return new TaskManagerRuntimeInfo("localhost", new UnmodifiableConfiguration(new Configuration()));
}
}