diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 56331858d66e7e41c2f87fca982da58f03aaad6b..6db32c5ad5e4a8f395bd77037b0029e9f7175d26 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -841,6 +841,7 @@ public abstract class DataSet<T> {
 		switch(strategy) {
 			case OPTIMIZER_CHOOSES:
 			case REPARTITION_SORT_MERGE:
+			case REPARTITION_HASH_FIRST:
 			case REPARTITION_HASH_SECOND:
 			case BROADCAST_HASH_SECOND:
 				return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.LEFT_OUTER);
@@ -891,6 +892,7 @@ public abstract class DataSet<T> {
 			case OPTIMIZER_CHOOSES:
 			case REPARTITION_SORT_MERGE:
 			case REPARTITION_HASH_FIRST:
+			case REPARTITION_HASH_SECOND:
 			case BROADCAST_HASH_FIRST:
 				return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.RIGHT_OUTER);
 			default:
@@ -938,6 +940,8 @@ public abstract class DataSet<T> {
 		switch(strategy) {
 			case OPTIMIZER_CHOOSES:
 			case REPARTITION_SORT_MERGE:
+			case REPARTITION_HASH_FIRST:
+			case REPARTITION_HASH_SECOND:
 				return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.FULL_OUTER);
 			default:
 				throw new InvalidProgramException("Invalid JoinHint for FullOuterJoin: "+strategy);
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
index 794f0d405e41c96c8c4836ceac7a0526891920f9..9f2aa41970fb5f785ad212f969e3a295bf4a0bef 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
@@ -181,7 +181,7 @@ public class FullOuterJoinOperatorTest {
 		this.testFullOuterStrategies(JoinHint.REPARTITION_SORT_MERGE);
 	}
 
-	@Test(expected = InvalidProgramException.class)
+	@Test
 	public void testFullOuterStrategy3() {
 		this.testFullOuterStrategies(JoinHint.REPARTITION_HASH_SECOND);
 	}
@@ -191,7 +191,7 @@ public class FullOuterJoinOperatorTest {
 		this.testFullOuterStrategies(JoinHint.BROADCAST_HASH_SECOND);
 	}
 
-	@Test(expected = InvalidProgramException.class)
+	@Test
 	public void testFullOuterStrategy5() {
 		this.testFullOuterStrategies(JoinHint.REPARTITION_HASH_FIRST);
 	}
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java
index cab06c2dee33d448faa7a465a40ad39f07d03c56..bfcc3e874d2453796645a8cff166ae0a47fb0412 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java
@@ -192,7 +192,7 @@ public class LeftOuterJoinOperatorTest {
 		this.testLeftOuterStrategies(JoinHint.BROADCAST_HASH_SECOND);
 	}
 
-	@Test(expected = InvalidProgramException.class)
+	@Test
 	public void testLeftOuterStrategy5() {
 		this.testLeftOuterStrategies(JoinHint.REPARTITION_HASH_FIRST);
 	}
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java
index 411edd5d711181d517964e4b7947771d07d87bdf..709d830e5ff8b7f624e4a6a59f629bedf4633058 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java
@@ -181,7 +181,7 @@ public class RightOuterJoinOperatorTest {
 		this.testRightOuterStrategies(JoinHint.REPARTITION_SORT_MERGE);
 	}
 
-	@Test(expected = InvalidProgramException.class)
+	@Test
 	public void testRightOuterStrategy3() {
 		this.testRightOuterStrategies(JoinHint.REPARTITION_HASH_SECOND);
 	}
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
index 78d61d1e586a4b7b1b5764feb61860d8b59cea82..9cc5e48b9c58ef7c4949c15191aefe1e6bb40a20 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
@@ -202,10 +202,14 @@ public abstract class CostEstimator {
 			break;
 		case HYBRIDHASH_BUILD_FIRST:
 		case RIGHT_HYBRIDHASH_BUILD_FIRST:
+		case LEFT_HYBRIDHASH_BUILD_FIRST:
+		case FULL_OUTER_HYBRIDHASH_BUILD_FIRST:
 			addHybridHashCosts(firstInput, secondInput, driverCosts, costWeight);
 			break;
 		case HYBRIDHASH_BUILD_SECOND:
 		case LEFT_HYBRIDHASH_BUILD_SECOND:
+		case RIGHT_HYBRIDHASH_BUILD_SECOND:
+		case FULL_OUTER_HYBRIDHASH_BUILD_SECOND:
 			addHybridHashCosts(secondInput, firstInput, driverCosts, costWeight);
 			break;
 		case HYBRIDHASH_BUILD_FIRST_CACHED:
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java
index 0784de3b82c61028add617f5cfc8649d937e58a3..7c5c849cd08c7e4db5255ebd167c1faf5682acc9 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java
@@ -25,8 +25,12 @@ import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoi
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.operators.AbstractJoinDescriptor;
+import org.apache.flink.optimizer.operators.HashFullOuterJoinBuildFirstDescriptor;
+import org.apache.flink.optimizer.operators.HashFullOuterJoinBuildSecondDescriptor;
+import org.apache.flink.optimizer.operators.HashLeftOuterJoinBuildFirstDescriptor;
 import org.apache.flink.optimizer.operators.HashLeftOuterJoinBuildSecondDescriptor;
 import org.apache.flink.optimizer.operators.HashRightOuterJoinBuildFirstDescriptor;
+import org.apache.flink.optimizer.operators.HashRightOuterJoinBuildSecondDescriptor;
 import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
 import org.apache.flink.optimizer.operators.SortMergeFullOuterJoinDescriptor;
 import org.apache.flink.optimizer.operators.SortMergeLeftOuterJoinDescriptor;
@@ -99,8 +103,10 @@ public class OuterJoinNode extends TwoInputNode {
 			case BROADCAST_HASH_SECOND:
 				list.add(new HashLeftOuterJoinBuildSecondDescriptor(this.keys1, this.keys2, true, false));
 				break;
-			case BROADCAST_HASH_FIRST:
 			case REPARTITION_HASH_FIRST:
+				list.add(new HashLeftOuterJoinBuildFirstDescriptor(this.keys1, this.keys2, false, true));
+				break;
+			case BROADCAST_HASH_FIRST:
 			default:
 				throw new CompilerException("Invalid join hint: " + hint + " for left outer join");
 		}
@@ -124,8 +130,10 @@ public class OuterJoinNode extends TwoInputNode {
 			case BROADCAST_HASH_FIRST:
 				list.add(new HashRightOuterJoinBuildFirstDescriptor(this.keys1, this.keys2, true, false));
 				break;
-			case BROADCAST_HASH_SECOND:
 			case REPARTITION_HASH_SECOND:
+				list.add(new HashRightOuterJoinBuildSecondDescriptor(this.keys1, this.keys2, false, true));
+				break;
+			case BROADCAST_HASH_SECOND:
 			default:
 				throw new CompilerException("Invalid join hint: " + hint + " for right outer join");
 		}
@@ -142,10 +150,14 @@ public class OuterJoinNode extends TwoInputNode {
 			case REPARTITION_SORT_MERGE:
 				list.add(new SortMergeFullOuterJoinDescriptor(this.keys1, this.keys2));
 				break;
+			case REPARTITION_HASH_FIRST:
+				list.add(new HashFullOuterJoinBuildFirstDescriptor(this.keys1, this.keys2));
+				break;
 			case REPARTITION_HASH_SECOND:
-			case BROADCAST_HASH_SECOND:
+				list.add(new HashFullOuterJoinBuildSecondDescriptor(this.keys1, this.keys2));
+				break;
 			case BROADCAST_HASH_FIRST:
-			case REPARTITION_HASH_FIRST:
+			case BROADCAST_HASH_SECOND:
 			default:
 				throw new CompilerException("Invalid join hint: " + hint + " for full outer join");
 		}
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildFirstDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildFirstDescriptor.java
new file mode 100644
index 0000000000000000000000000000000000000000..49852484b617a9ffe420ba1bd28c3a4b146a9d34
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildFirstDescriptor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+import java.util.Collections;
+import java.util.List;
+
+public class HashFullOuterJoinBuildFirstDescriptor extends AbstractJoinDescriptor {
+
+	public HashFullOuterJoinBuildFirstDescriptor(FieldList keys1, FieldList keys2) {
+		super(keys1, keys2, false, false, true);
+	}
+
+	@Override
+	public DriverStrategy getStrategy() {
+		return DriverStrategy.FULL_OUTER_HYBRIDHASH_BUILD_FIRST;
+	}
+
+	@Override
+	protected List<LocalPropertiesPair> createPossibleLocalProperties() {
+		// all properties are possible
+		return Collections.singletonList(new LocalPropertiesPair(new RequestedLocalProperties(), new RequestedLocalProperties()));
+	}
+
+	@Override
+	public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
+			LocalProperties produced1, LocalProperties produced2) {
+		return true;
+	}
+
+	@Override
+	public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
+
+		String nodeName = "FullOuterJoin("+node.getOperator().getName()+")";
+		return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2);
+	}
+
+	@Override
+	public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
+		return new LocalProperties();
+	}
+}
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildSecondDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildSecondDescriptor.java
new file mode 100644
index 0000000000000000000000000000000000000000..d605a19179af48aa04c7d8906963b80b7548e6bd
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildSecondDescriptor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+import java.util.Collections;
+import java.util.List;
+
+public class HashFullOuterJoinBuildSecondDescriptor extends AbstractJoinDescriptor {
+	public HashFullOuterJoinBuildSecondDescriptor(FieldList keys1, FieldList keys2) {
+		super(keys1, keys2, false, false, true);
+	}
+
+	@Override
+	public DriverStrategy getStrategy() {
+		return DriverStrategy.FULL_OUTER_HYBRIDHASH_BUILD_SECOND;
+	}
+
+	@Override
+	protected List<LocalPropertiesPair> createPossibleLocalProperties() {
+		// all properties are possible
+		return Collections.singletonList(new LocalPropertiesPair(new RequestedLocalProperties(), new RequestedLocalProperties()));
+	}
+
+	@Override
+	public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
+			LocalProperties produced1, LocalProperties produced2) {
+		return true;
+	}
+
+	@Override
+	public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
+
+		String nodeName = "FullOuterJoin("+node.getOperator().getName()+")";
+		return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2);
+	}
+
+	@Override
+	public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
+		return new LocalProperties();
+	}
+}
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildFirstDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildFirstDescriptor.java
new file mode 100644
index 0000000000000000000000000000000000000000..ab4e1065911833abd0008ed692536dd7e624daf6
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildFirstDescriptor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+import java.util.Collections;
+import java.util.List;
+
+public class HashLeftOuterJoinBuildFirstDescriptor extends AbstractJoinDescriptor {
+	public HashLeftOuterJoinBuildFirstDescriptor(FieldList keys1, FieldList keys2,
+			boolean broadcastSecondAllowed, boolean repartitionAllowed) {
+		super(keys1, keys2, false, broadcastSecondAllowed, repartitionAllowed);
+	}
+
+	@Override
+	public DriverStrategy getStrategy() {
+		return DriverStrategy.LEFT_HYBRIDHASH_BUILD_FIRST;
+	}
+
+	@Override
+	protected List<OperatorDescriptorDual.LocalPropertiesPair> createPossibleLocalProperties() {
+		// all properties are possible
+		return Collections.singletonList(new OperatorDescriptorDual.LocalPropertiesPair(new RequestedLocalProperties(), new RequestedLocalProperties()));
+	}
+
+	@Override
+	public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
+			LocalProperties produced1, LocalProperties produced2) {
+		return true;
+	}
+
+	@Override
+	public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
+
+		String nodeName = "LeftOuterJoin("+node.getOperator().getName()+")";
+		return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2);
+	}
+
+	@Override
+	public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
+		return new LocalProperties();
+	}
+}
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildSecondDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildSecondDescriptor.java
new file mode 100644
index 0000000000000000000000000000000000000000..7bb8f1e972e8c447c25a98d7e4959b5a04a72430
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildSecondDescriptor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+import java.util.Collections;
+import java.util.List;
+
+public class HashRightOuterJoinBuildSecondDescriptor extends AbstractJoinDescriptor {
+	public HashRightOuterJoinBuildSecondDescriptor(FieldList keys1, FieldList keys2,
+			boolean broadcastFirstAllowed, boolean repartitionAllowed) {
+		super(keys1, keys2, broadcastFirstAllowed, false, repartitionAllowed);
+	}
+
+	@Override
+	public DriverStrategy getStrategy() {
+		return DriverStrategy.RIGHT_HYBRIDHASH_BUILD_SECOND;
+	}
+
+	@Override
+	protected List<LocalPropertiesPair> createPossibleLocalProperties() {
+		// all properties are possible
+		return Collections.singletonList(new LocalPropertiesPair(new RequestedLocalProperties(), new RequestedLocalProperties()));
+	}
+
+	@Override
+	public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
+			LocalProperties produced1, LocalProperties produced2) {
+		return true;
+	}
+
+	@Override
+	public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
+
+		String nodeName = "RightOuterJoin("+node.getOperator().getName()+")";
+		return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2);
+	}
+
+	@Override
+	public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
+		return new LocalProperties();
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
index 9ac2ed62519562904d300064a4cd4d379f3f163f..e034dd6504c24a8edba253d8f638001e076c693e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
@@ -95,6 +95,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 					this.taskContext.getOwningNepheleTask(),
 					availableMemory,
 					false,
+					false,
 					hashJoinUseBitMaps);
 
 
@@ -110,6 +111,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 					this.taskContext.getOwningNepheleTask(),
 					availableMemory,
 					false,
+					false,
 					hashJoinUseBitMaps);
 
 		} else {
@@ -128,6 +130,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 					this.taskContext.getOwningNepheleTask(),
 					availableMemory,
 					false,
+					false,
 					hashJoinUseBitMaps);
 
 
@@ -143,6 +146,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 					this.taskContext.getOwningNepheleTask(),
 					availableMemory,
 					false,
+					false,
 					hashJoinUseBitMaps);
 
 		} else {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index d01594eef9c363708ef6b29da048773e7ec13173..e43350bdb64581e43ae57928777f2d5877050ae2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -88,12 +88,20 @@ public enum DriverStrategy {
 	HYBRIDHASH_BUILD_FIRST_CACHED(BuildFirstCachedJoinDriver.class, null, FULL_DAM, MATERIALIZING, 2),
 	// cached variant of HYBRIDHASH_BUILD_SECOND, that can only be used inside of iterations
 	HYBRIDHASH_BUILD_SECOND_CACHED(BuildSecondCachedJoinDriver.class, null, MATERIALIZING, FULL_DAM, 2),
-	
-	// right outer join, the first input is build side, the second side is probe side of a hybrid hash table
+
+	// right outer join, the first input is build side, the second input is probe side of a hybrid hash table.
 	RIGHT_HYBRIDHASH_BUILD_FIRST(RightOuterJoinDriver.class, null, FULL_DAM, MATERIALIZING, 2),
-	// left outer join, the second input is build side, the first side is probe side of a hybrid hash table
+	// right outer join, the first input is probe side, the second input is build side of a hybrid hash table.
+	RIGHT_HYBRIDHASH_BUILD_SECOND(RightOuterJoinDriver.class, null, MATERIALIZING, FULL_DAM, 2),
+	// left outer join, the first input is build side, the second input is probe side of a hybrid hash table.
+	LEFT_HYBRIDHASH_BUILD_FIRST(LeftOuterJoinDriver.class, null, FULL_DAM, MATERIALIZING, 2),
+	// left outer join, the first input is probe side, the second input is build side of a hybrid hash table.
 	LEFT_HYBRIDHASH_BUILD_SECOND(LeftOuterJoinDriver.class, null, MATERIALIZING, FULL_DAM, 2),
-	
+	// full outer join, the first input is build side, the second input is the probe side of a hybrid hash table.
+	FULL_OUTER_HYBRIDHASH_BUILD_FIRST(FullOuterJoinDriver.class, null, FULL_DAM, MATERIALIZING, 2),
+	// full outer join, the first input is probe side, the second input is the build side of a hybrid hash table.
+	FULL_OUTER_HYBRIDHASH_BUILD_SECOND(FullOuterJoinDriver.class, null, MATERIALIZING, FULL_DAM, 2),
+
 	// the second input is inner loop, the first input is outer loop and block-wise processed
 	NESTEDLOOP_BLOCKED_OUTER_FIRST(CrossDriver.class, null, MATERIALIZING, FULL_DAM, 0),
 	// the first input is inner loop, the second input is outer loop and block-wise processed
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
index 2c01fec2e294fd29dc9f0725a1d14b127615da3a..a41a6ec6e664f3c3f311f4733817e037c8cde33f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
@@ -24,6 +24,10 @@ import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator;
 import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator;
 import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;
@@ -66,6 +70,28 @@ public class FullOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<IT1, IT2, OT> {
 						numPages,
 						super.taskContext.getOwningNepheleTask()
 				);
+			case FULL_OUTER_HYBRIDHASH_BUILD_FIRST:
+				return new ReusingBuildFirstHashJoinIterator<>(in1, in2,
+					serializer1, comparator1,
+					serializer2, comparator2,
+					pairComparatorFactory.createComparator21(comparator1, comparator2),
+					memoryManager, ioManager,
+					this.taskContext.getOwningNepheleTask(),
+					driverMemFraction,
+					true,
+					true,
+					false);
+			case FULL_OUTER_HYBRIDHASH_BUILD_SECOND:
+				return new ReusingBuildSecondHashJoinIterator<>(in1, in2,
+					serializer1, comparator1,
+					serializer2, comparator2,
+					pairComparatorFactory.createComparator12(comparator1, comparator2),
+					memoryManager, ioManager,
+					this.taskContext.getOwningNepheleTask(),
+					driverMemFraction,
+					true,
+					true,
+					false);
 			default:
 				throw new Exception("Unsupported driver strategy for full outer join driver: " + driverStrategy.name());
 		}
@@ -102,8 +128,30 @@ public class FullOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<IT1, IT2, OT> {
 						numPages,
 						super.taskContext.getOwningNepheleTask()
 				);
+			case FULL_OUTER_HYBRIDHASH_BUILD_FIRST:
+				return new NonReusingBuildFirstHashJoinIterator<>(in1, in2,
+					serializer1, comparator1,
+					serializer2, comparator2,
+					pairComparatorFactory.createComparator21(comparator1, comparator2),
+					memoryManager, ioManager,
+					this.taskContext.getOwningNepheleTask(),
+					driverMemFraction,
+					true,
+					true,
+					false);
+			case FULL_OUTER_HYBRIDHASH_BUILD_SECOND:
+				return new NonReusingBuildSecondHashJoinIterator<>(in1, in2,
+					serializer1, comparator1,
+					serializer2, comparator2,
+					pairComparatorFactory.createComparator12(comparator1, comparator2),
+					memoryManager, ioManager,
+					this.taskContext.getOwningNepheleTask(),
+					driverMemFraction,
+					true,
+					true,
+					false);
 			default:
 				throw new Exception("Unsupported driver strategy for full outer join driver: " + driverStrategy.name());
 		}
 	}
-}
\ No newline at end of file
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
index c55843a619aaa5d6a85d4051b2c1f0ee8fa3786b..f7ad8d1e3f9e2345d6b72eea1894af416bf63f5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
@@ -141,6 +141,7 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT2, OT>, OT> {
 						this.taskContext.getOwningNepheleTask(),
 						fractionAvailableMemory,
 						false,
+						false,
 						hashJoinUseBitMaps);
 				break;
 			case HYBRIDHASH_BUILD_SECOND:
@@ -155,6 +156,7 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT2, OT>, OT> {
 						this.taskContext.getOwningNepheleTask(),
 						fractionAvailableMemory,
 						false,
+						false,
 						hashJoinUseBitMaps);
 				break;
 			default:
@@ -173,6 +175,7 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT2, OT>, OT> {
 						this.taskContext.getOwningNepheleTask(),
 						fractionAvailableMemory,
 						false,
+						false,
 						hashJoinUseBitMaps);
 				break;
 			case HYBRIDHASH_BUILD_SECOND:
@@ -187,6 +190,7 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT2, OT>, OT> {
 						this.taskContext.getOwningNepheleTask(),
 						fractionAvailableMemory,
 						false,
+						false,
 						hashJoinUseBitMaps);
 				break;
 			default:
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator;
 import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator;
 import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator;
 import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator;
 import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator;
@@ -68,6 +70,17 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<IT1, IT2, OT> {
 						numPages,
 						super.taskContext.getOwningNepheleTask()
 				);
+			case LEFT_HYBRIDHASH_BUILD_FIRST:
+				return new ReusingBuildFirstHashJoinIterator<>(in1, in2,
+					serializer1, comparator1,
+					serializer2, comparator2,
+					pairComparatorFactory.createComparator21(comparator1, comparator2),
+					memoryManager, ioManager,
+					this.taskContext.getOwningNepheleTask(),
+					driverMemFraction,
+					false,
+					true,
+					false);
 			case LEFT_HYBRIDHASH_BUILD_SECOND:
 				return new ReusingBuildSecondHashJoinIterator<>(in1, in2,
 					serializer1, comparator1,
@@ -77,6 +90,7 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<IT1, IT2, OT> {
 					this.taskContext.getOwningNepheleTask(),
 					driverMemFraction,
 					true,
+					false,
 					false);
 			default:
 				throw new Exception("Unsupported driver strategy for left outer join driver: " + driverStrategy.name());
@@ -114,6 +128,17 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<IT1, IT2, OT> {
 						numPages,
 						super.taskContext.getOwningNepheleTask()
 				);
+			case LEFT_HYBRIDHASH_BUILD_FIRST:
+				return new NonReusingBuildFirstHashJoinIterator<>(in1, in2,
+					serializer1, comparator1,
+					serializer2, comparator2,
+					pairComparatorFactory.createComparator21(comparator1, comparator2),
+					memoryManager, ioManager,
+					this.taskContext.getOwningNepheleTask(),
+					driverMemFraction,
+					false,
+					true,
+					false);
 			case LEFT_HYBRIDHASH_BUILD_SECOND:
 				return new NonReusingBuildSecondHashJoinIterator<>(in1, in2,
 					serializer1, comparator1,
@@ -123,9 +148,10 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<IT1, IT2, OT> {
 					this.taskContext.getOwningNepheleTask(),
 					driverMemFraction,
 					true,
+					false,
 					false);
 			default:
 				throw new Exception("Unsupported driver strategy for left outer join driver: " + driverStrategy.name());
 		}
 	}
-}
\ No newline at end of file
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator;
 import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator;
 import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator;
 import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator;
@@ -68,9 +70,21 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<IT1, IT2, OT> {
 					this.taskContext.getOwningNepheleTask(),
 					driverMemFraction,
 					true,
+					false,
+					false);
+			case RIGHT_HYBRIDHASH_BUILD_SECOND:
+				return new ReusingBuildSecondHashJoinIterator<>(in1, in2,
+					serializer1, comparator1,
+					serializer2, comparator2,
+					pairComparatorFactory.createComparator12(comparator1, comparator2),
+					memoryManager, ioManager,
+					this.taskContext.getOwningNepheleTask(),
+					driverMemFraction,
+					false,
+					true,
 					false);
 			default:
 				throw new Exception("Unsupported driver strategy for right outer join driver: " + driverStrategy.name());
 		}
 	}
 
@@ -123,9 +137,21 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<IT1, IT2, OT> {
 					this.taskContext.getOwningNepheleTask(),
 					driverMemFraction,
 					true,
+					false,
+					false);
+			case RIGHT_HYBRIDHASH_BUILD_SECOND:
+				return new NonReusingBuildSecondHashJoinIterator<>(in1, in2,
+					serializer1, comparator1,
+					serializer2, comparator2,
+					pairComparatorFactory.createComparator12(comparator1, comparator2),
+					memoryManager, ioManager,
+					this.taskContext.getOwningNepheleTask(),
+					driverMemFraction,
+					false,
+					true,
 					false);
 			default:
 				throw new Exception("Unsupported driver strategy for right outer join driver: " + driverStrategy.name());
 		}
 	}
-}
\ No newline at end of file
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index 0bf4433b490499f14884b4b01a34140d60a9ed48..ce4e8271fc5dc43fe1422f49c3a178d74c0b29af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.flink.runtime.operators.util.BitSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,7 +58,7 @@ import org.apache.flink.util.MutableObjectIterator;
  *
  * +----------------------------- Bucket x ----------------------------
  * |Partition (1 byte) | Status (1 byte) | element count (2 bytes) |
- * | next-bucket-in-chain-pointer (8 bytes) | reserved (4 bytes) |
+ * | next-bucket-in-chain-pointer (8 bytes) | probedFlags (2 bytes) | reserved (2 bytes) |
  * |
  * |hashCode 1 (4 bytes) | hashCode 2 (4 bytes) | hashCode 3 (4 bytes) |
  * | ... hashCode n-1 (4 bytes) | hashCode n (4 bytes)
@@ -67,7 +68,7 @@ import org.apache.flink.util.MutableObjectIterator;
  * |
  * +---------------------------- Bucket x + 1--------------------------
  * |Partition (1 byte) | Status (1 byte) | element count (2 bytes) |
- * | next-bucket-in-chain-pointer (8 bytes) | reserved (4 bytes) |
+ * | next-bucket-in-chain-pointer (8 bytes) | probedFlags (2 bytes) | reserved (2 bytes) |
  * |
  * |hashCode 1 (4 bytes) | hashCode 2 (4 bytes) | hashCode 3 (4 bytes) |
  * | ... hashCode n-1 (4 bytes) | hashCode n (4 bytes)
@@ -163,7 +164,12 @@ public class MutableHashTable implements MemorySegmentSource {
 	 * Offset of the field in the bucket header that holds the forward pointer to its
 	 * first overflow bucket.
 	 */
-	private static final int HEADER_FORWARD_OFFSET = 4;	
+	private static final int HEADER_FORWARD_OFFSET = 4;
+	
+	/**
+	 * Offset of the field in the bucket header that holds the probed bit set.
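+	 * It follows the partition byte (1), the status byte (1), the element count (2 bytes) and the
+	 * 8-byte forward pointer, i.e. it starts at byte 12 of the header.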
+	 */
+	static final int HEADER_PROBED_FLAGS_OFFSET = 12;
 	
 	/**
 	 * Constant for the forward pointer, indicating that the pointer is not set. 
@@ -301,7 +307,8 @@ public class MutableHashTable implements MemorySegmentSource {
 	protected MemorySegment[] buckets;
 
 	/** The bloom filter utility used to transform hash buckets of spilled partitions into a
-	 * probabilistic filter */
+	 * probabilistic filter
+	 */
 	private BloomFilter bloomFilter;
 	
 	/**
@@ -333,11 +340,24 @@ public class MutableHashTable implements MemorySegmentSource {
 	 * If true, build side partitions are kept for multiple probe steps.
 	 */
 	protected boolean keepBuildSidePartitions;
+
+	/**
+	 * BitSet which is used to mark whether an element (in the build side) has been matched successfully during
+	 * the probe phase. As each bucket holds at most 9 elements, 2 bytes (16 bits) are enough for the set.
+	 */
+	private final BitSet probedSet = new BitSet(2);
 	
 	protected boolean furtherPartitioning;
 	
 	private boolean running = true;
 	
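+	/** Whether the build side is the outer side of this join, i.e. whether unmatched build-side records must be emitted. */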
+	private boolean buildSideOuterJoin = false;
+	
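+	/** Iterator over the build-side records that were never matched during the probe phase. */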
+	private MutableObjectIterator<BT> unmatchedBuildIterator;
+	
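+	/** True while probe records are being matched; false while the unmatched build-side records are emitted. */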
+	private boolean probeMatchedPhase = true;
+	
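+	/** Whether the unmatched build-side records of the current partition have already been handed out. */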
+	private boolean unmatchedBuildVisited = false;
 	
 	// ------------------------------------------------------------------------
 	//                         Construction and Teardown
@@ -422,13 +442,32 @@ public class MutableHashTable implements MemorySegmentSource {
 	
 	/**
 	 * Opens the hash join. This method reads the build-side input and constructs the initial
-	 * hash table, gradually spilling partitions that do not fit into memory. 
-	 * 
+	 * hash table, gradually spilling partitions that do not fit into memory.
+	 *
+	 * @param buildSide Build side input.
+	 * @param probeSide Probe side input.
 	 * @throws IOException Thrown, if an I/O problem occurs while spilling a partition.
 	 */
 	public void open(final MutableObjectIterator<BT> buildSide, final MutableObjectIterator<PT> probeSide)
-	throws IOException
-	{
+		throws IOException {
+
+		open(buildSide, probeSide, false);
+	}
+	
+	/**
+	 * Opens the hash join. This method reads the build-side input and constructs the initial
+	 * hash table, gradually spilling partitions that do not fit into memory.
+	 *
+	 * @param buildSide      Build side input.
+	 * @param probeSide      Probe side input.
+	 * @param buildOuterJoin Whether this is an outer join on the build side.
+	 * @throws IOException Thrown, if an I/O problem occurs while spilling a partition.
+	 */
+	public void open(final MutableObjectIterator<BT> buildSide, final MutableObjectIterator<PT> probeSide,
+		boolean buildOuterJoin) throws IOException {
+
+		this.buildSideOuterJoin = buildOuterJoin;
+
 		// sanity checks
 		if (!this.closed.compareAndSet(true, false)) {
 			throw new IllegalStateException("Hash Join cannot be opened, because it is currently not closed.");
@@ -446,12 +485,16 @@ public class MutableHashTable implements MemorySegmentSource {
 		this.probeIterator = new ProbeIterator<PT>(probeSide, this.probeSideSerializer.createInstance());
 		
 		// the bucket iterator can remain constant over the time
-		this.bucketIterator = new HashBucketIterator<BT, PT>(this.buildSideSerializer, this.recordComparator);
+		this.bucketIterator = new HashBucketIterator<BT, PT>(this.buildSideSerializer, this.recordComparator, probedSet, buildOuterJoin);
 	}
 	
 	protected boolean processProbeIter() throws IOException{
 		final ProbeIterator<PT> probeIter = this.probeIterator;
 		final TypeComparator<PT> probeAccessors = this.probeSideComparator;
+
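+		// nothing to probe while the unmatched build-side records are being emitted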
+		if (!this.probeMatchedPhase) {
+			return false;
+		}
 		
 		PT next;
 		while ((next = probeIter.next()) != null) {
@@ -486,11 +529,39 @@ public class MutableHashTable implements MemorySegmentSource {
 			}
 		}
 		// -------------- partition done ---------------
-		
+
 		return false;
 	}
 	
+	protected boolean processUnmatchedBuildIter() throws IOException  {
+		if (this.unmatchedBuildVisited) {
+			return false;
+		}
+		
+		this.probeMatchedPhase = false;
+		UnmatchedBuildIterator<BT, PT> unmatchedBuildIter = new UnmatchedBuildIterator<>(this.buildSideSerializer, this.numBuckets,
+			this.bucketsPerSegmentBits, this.bucketsPerSegmentMask, this.buckets, this.partitionsBeingBuilt, probedSet);
+		this.unmatchedBuildIterator = unmatchedBuildIter;
+		
+		// There may be no unmatched build elements at all, so we verify that here to make sure we do not return (null, null) to the user.
+		if (unmatchedBuildIter.next() == null) {
+			this.unmatchedBuildVisited = true;
+			return false;
+		}
+		
+		unmatchedBuildIter.back();
+		
+		// While visiting the unmatched build elements, the probe element is null, and the unmatchedBuildIterator
+		// iterates over all the unmatched build elements, so we return false on the second call of this method.
+		this.unmatchedBuildVisited = true;
+		return true;
+	}
+	
 	protected boolean prepareNextPartition() throws IOException {
+		
+		this.probeMatchedPhase = true;
+		this.unmatchedBuildVisited = false;
+		
 		// finalize and cleanup the partitions of the current table
 		int buffersAvailable = 0;
 		for (int i = 0; i < this.partitionsBeingBuilt.size(); i++) {
@@ -551,9 +622,11 @@ public class MutableHashTable implements MemorySegmentSource {
 	}
 	
 	public boolean nextRecord() throws IOException {
-
-		final boolean probeProcessing = processProbeIter();
-		return probeProcessing || prepareNextPartition();
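+		// for a build-side outer join, additionally hand out the build-side records that were never probed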
+		if (buildSideOuterJoin) {
+			return processProbeIter() || processUnmatchedBuildIter() || prepareNextPartition();
+		} else {
+			return processProbeIter() || prepareNextPartition();
+		}
 	}
 	
 	public HashBucketIterator<BT, PT> getMatchesFor(PT record) throws IOException {
@@ -582,11 +655,19 @@ public class MutableHashTable implements MemorySegmentSource {
 	}
 	
 	public PT getCurrentProbeRecord() {
-		return this.probeIterator.getCurrent();
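+		// the current probe record is only defined during the probe-matching phase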
+		if (this.probeMatchedPhase) {
+			return this.probeIterator.getCurrent();
+		} else {
+			return null;
+		}
 	}
 	
-	public HashBucketIterator<BT, PT> getBuildSideIterator() {
-		return this.bucketIterator;
+	public MutableObjectIterator<BT> getBuildSideIterator() {
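+		// in the unmatched-build phase, hand out the iterator over the never-probed build-side records instead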
+		if (this.probeMatchedPhase) {
+			return this.bucketIterator;
+		} else {
+			return this.unmatchedBuildIterator;
+		}
 	}
 	
 	/**
@@ -976,7 +1057,10 @@ public class MutableHashTable implements MemorySegmentSource {
 			overflowSeg.putLong(overflowBucketOffset + BUCKET_POINTER_START_OFFSET, pointer); // pointer
 			
 			// set the count to one
-			overflowSeg.putShort(overflowBucketOffset + HEADER_COUNT_OFFSET, (short) 1); 
+			overflowSeg.putShort(overflowBucketOffset + HEADER_COUNT_OFFSET, (short) 1);
+
+			// initialize the probed bit set to 0.
+			overflowSeg.putShort(overflowBucketOffset + HEADER_PROBED_FLAGS_OFFSET, (short) 0);
 		}
 	}
 	
@@ -1049,6 +1133,7 @@ public class MutableHashTable implements MemorySegmentSource {
 				seg.put(bucketOffset + HEADER_STATUS_OFFSET, BUCKET_STATUS_IN_MEMORY);
 				seg.putShort(bucketOffset + HEADER_COUNT_OFFSET, (short) 0);
 				seg.putLong(bucketOffset + HEADER_FORWARD_OFFSET, BUCKET_FORWARD_POINTER_NOT_SET);
+				seg.putShort(bucketOffset + HEADER_PROBED_FLAGS_OFFSET, (short) 0);
 			}
 			
 			table[i] = seg;
@@ -1417,14 +1502,19 @@ public class MutableHashTable implements MemorySegmentSource {
 		private MemorySegment originalBucket;
 		
 		private long lastPointer;
+	
+		private BitSet probedSet;
 		
-		
-		HashBucketIterator(TypeSerializer<BT> accessor, TypePairComparator<PT, BT> comparator) {
+		private boolean isBuildOuterJoin = false;
+	
+		HashBucketIterator(TypeSerializer<BT> accessor, TypePairComparator<PT, BT> comparator,
+			BitSet probedSet, boolean isBuildOuterJoin) {
 			this.accessor = accessor;
 			this.comparator = comparator;
+			this.probedSet = probedSet;
+			this.isBuildOuterJoin = isBuildOuterJoin;
 		}
 		
-		
 		void set(MemorySegment bucket, MemorySegment[] overflowSegments, HashPartition<BT, PT> partition,
 				int searchHashCode, int bucketInSegmentOffset)
 		{
@@ -1435,26 +1525,24 @@ public class MutableHashTable implements MemorySegmentSource {
 			this.searchHashCode = searchHashCode;
 			this.bucketInSegmentOffset = bucketInSegmentOffset;
 			this.originalBucketInSegmentOffset = bucketInSegmentOffset;
-			
 			this.posInSegment = this.bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
 			this.countInSegment = bucket.getShort(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
 			this.numInSegment = 0;
 		}
 		
-
 		public BT next(BT reuse) {
 			// loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
 			while (true) {
-				
+				probedSet.setMemorySegment(bucket, this.bucketInSegmentOffset + HEADER_PROBED_FLAGS_OFFSET);
 				while (this.numInSegment < this.countInSegment) {
 					
 					final int thisCode = this.bucket.getInt(this.posInSegment);
 					this.posInSegment += HASH_CODE_LEN;
-						
+					
 					// check if the hash code matches
 					if (thisCode == this.searchHashCode) {
 						// get the pointer to the pair
-						final long pointer = this.bucket.getLong(this.bucketInSegmentOffset + 
+						final long pointer = this.bucket.getLong(this.bucketInSegmentOffset +
 													BUCKET_POINTER_START_OFFSET + (this.numInSegment * POINTER_LEN));
 						this.numInSegment++;
 						
@@ -1463,16 +1551,17 @@ public class MutableHashTable implements MemorySegmentSource {
 							this.partition.setReadPosition(pointer);
 							reuse = this.accessor.deserialize(reuse, this.partition);
 							if (this.comparator.equalToReference(reuse)) {
+								if (isBuildOuterJoin) {
+									probedSet.set(numInSegment - 1);
+								}
 								this.lastPointer = pointer;
 								return reuse;
 							}
-						}
-						catch (IOException ioex) {
+						} catch (IOException ioex) {
 							throw new RuntimeException("Error deserializing key or value from the hashtable: " +
 									ioex.getMessage(), ioex);
 						}
-					}
-					else {
+					} else {
 						this.numInSegment++;
 					}
 				}
@@ -1495,7 +1584,7 @@ public class MutableHashTable implements MemorySegmentSource {
 		public BT next() {
 			// loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
 			while (true) {
-
+				probedSet.setMemorySegment(bucket, this.bucketInSegmentOffset + HEADER_PROBED_FLAGS_OFFSET);
 				while (this.numInSegment < this.countInSegment) {
 
 					final int thisCode = this.bucket.getInt(this.posInSegment);
@@ -1505,7 +1594,7 @@ public class MutableHashTable implements MemorySegmentSource {
 					if (thisCode == this.searchHashCode) {
 						// get the pointer to the pair
 						final long pointer = this.bucket.getLong(this.bucketInSegmentOffset +
-								BUCKET_POINTER_START_OFFSET + (this.numInSegment * POINTER_LEN));
+							BUCKET_POINTER_START_OFFSET + (this.numInSegment * POINTER_LEN));
 						this.numInSegment++;
 
 						// deserialize the key to check whether it is really equal, or whether we had only a hash collision
@@ -1513,16 +1602,17 @@ public class MutableHashTable implements MemorySegmentSource {
 							this.partition.setReadPosition(pointer);
 							BT result = this.accessor.deserialize(this.partition);
 							if (this.comparator.equalToReference(result)) {
+								if (isBuildOuterJoin) {
+									probedSet.set(numInSegment - 1);
+								}
 								this.lastPointer = pointer;
 								return result;
 							}
-						}
-						catch (IOException ioex) {
+						} catch (IOException ioex) {
 							throw new RuntimeException("Error deserializing key or value from the hashtable: " +
-									ioex.getMessage(), ioex);
+								ioex.getMessage(), ioex);
 						}
-					}
-					else {
+					} else {
 						this.numInSegment++;
 					}
 				}
@@ -1558,7 +1648,227 @@ public class MutableHashTable implements MemorySegmentSource {
 		}
 
 	} // end HashBucketIterator
+	
+	/**
+	 * Iterates over all the elements in memory which have not been probed during the probe phase.
+	 */
+	public static class UnmatchedBuildIterator<BT, PT> implements MutableObjectIterator<BT> {
+	
+		private final TypeSerializer<BT> accessor;
+	
+		private final long totalBucketNumber;
+		
+		private final int bucketsPerSegmentBits;
+		
+		private final int bucketsPerSegmentMask;
+		
+		private final MemorySegment[] buckets;
+		
+		private final ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt;
+		
+		private final BitSet probedSet;
+		
+		private MemorySegment bucketSegment;
+		
+		private MemorySegment[] overflowSegments;
+		
+		private HashPartition<BT, PT> partition;
+		
+		private int scanCount;
+		
+		private int bucketInSegmentOffset;
+		
+		private int countInSegment;
+		
+		private int numInSegment;
+		
+		UnmatchedBuildIterator(
+			TypeSerializer<BT> accessor,
+			long totalBucketNumber,
+			int bucketsPerSegmentBits,
+			int bucketsPerSegmentMask,
+			MemorySegment[] buckets,
+			ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt,
+			BitSet probedSet) {
+			
+			this.accessor = accessor;
+			this.totalBucketNumber = totalBucketNumber;
+			this.bucketsPerSegmentBits = bucketsPerSegmentBits;
+			this.bucketsPerSegmentMask = bucketsPerSegmentMask;
+			this.buckets = buckets;
+			this.partitionsBeingBuilt = partitionsBeingBuilt;
+			this.probedSet = probedSet;
+			init();
+		}
+		
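+		/**
+		 * Positions the iterator at the first in-memory bucket, skipping over spilled buckets.
+		 */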
+		private void init() {
+			scanCount = -1;
+			while (!moveToNextBucket()) {
+				if (scanCount >= totalBucketNumber) {
+					break;
+				}
+			}
+		}
+		
+		public BT next(BT reuse) {
+			// search for an unprobed record in the bucket; if none is found, move to the next bucket and continue searching.
+			while (true) {
+				BT result = nextInBucket(reuse);
+				if (result == null) {
+					if (!moveToNextOnHeapBucket()) {
+						return null;
+					}
+				} else {
+					return result;
+				}
+			}
+		}
+		
+		public BT next() {
+			// search for an unprobed record in the bucket; if none is found, move to the next bucket and continue searching.
+			while (true) {
+				BT result = nextInBucket();
+				if (result == null) {
+					// return null when there are no more buckets.
+					if (!moveToNextOnHeapBucket()) {
+						return null;
+					}
+				} else {
+					return result;
+				}
+			}
+		}
+
+		/**
+		 * Loops until the iterator has moved to the next on-heap bucket. Returns true once an on-heap
+		 * bucket has been reached, and false if there are no more buckets.
+		 */
+		private boolean moveToNextOnHeapBucket() {
+			while (!moveToNextBucket()) {
+				if (scanCount >= totalBucketNumber) {
+					return false;
+				}
+			}
+			return true;
+		}
+
+		/**
+		 * Moves to the next bucket. Returns true if it is an on-heap bucket, and false if the bucket
+		 * has been spilled or there are no more buckets.
+		 */
+		private boolean moveToNextBucket() {
+			scanCount++;
+			if (scanCount > totalBucketNumber - 1) {
+				return false;
+			}
+			// move to the next bucket and update the current bucket status with the new bucket's information.
+			final int bucketArrayPos = scanCount >> this.bucketsPerSegmentBits;
+			final int currentBucketInSegmentOffset = (scanCount & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
+			MemorySegment currentBucket = this.buckets[bucketArrayPos];
+			final int partitionNumber = currentBucket.get(currentBucketInSegmentOffset + HEADER_PARTITION_OFFSET);
+			final HashPartition<BT, PT> p = this.partitionsBeingBuilt.get(partitionNumber);
+			if (p.isInMemory()) {
+				setBucket(currentBucket, p.overflowSegments, p, currentBucketInSegmentOffset);
+				return true;
+			} else {
+				return false;
+			}
+		}
+	
+		// update the current bucket status with the given bucket's information.
+		private void setBucket(MemorySegment bucket, MemorySegment[] overflowSegments, HashPartition<BT, PT> partition,
+			int bucketInSegmentOffset) {
+			this.bucketSegment = bucket;
+			this.overflowSegments = overflowSegments;
+			this.partition = partition;
+			this.bucketInSegmentOffset = bucketInSegmentOffset;
+			this.countInSegment = bucket.getShort(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
+			this.numInSegment = 0;
+			// reset probedSet with probedFlags offset in this bucket.
+			this.probedSet.setMemorySegment(bucketSegment, this.bucketInSegmentOffset + HEADER_PROBED_FLAGS_OFFSET);
+		}
+	
+		private BT nextInBucket(BT reuse) {
+			// loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
+			while (true) {
+				while (this.numInSegment < this.countInSegment) {
+					boolean probed = probedSet.get(numInSegment);
+					if (!probed) {
+						final long pointer = this.bucketSegment.getLong(this.bucketInSegmentOffset +
+							BUCKET_POINTER_START_OFFSET + (this.numInSegment * POINTER_LEN));
+						try {
+							this.partition.setReadPosition(pointer);
+							reuse = this.accessor.deserialize(reuse, this.partition);
+							this.numInSegment++;
+							return reuse;
+						} catch (IOException ioex) {
+							throw new RuntimeException("Error deserializing key or value from the hashtable: " +
+								ioex.getMessage(), ioex);
+						}
+					} else {
+						this.numInSegment++;
+					}
+				}
 
+				// this segment is done. check if there is another chained bucket
+				final long forwardPointer = this.bucketSegment.getLong(this.bucketInSegmentOffset + HEADER_FORWARD_OFFSET);
+				if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
+					return null;
+				}
+
+				final int overflowSegNum = (int) (forwardPointer >>> 32);
+				this.bucketSegment = this.overflowSegments[overflowSegNum];
+				this.bucketInSegmentOffset = (int) forwardPointer;
+				this.countInSegment = this.bucketSegment.getShort(this.bucketInSegmentOffset + HEADER_COUNT_OFFSET);
+				this.numInSegment = 0;
+				// reset probedSet with probedFlags offset in this bucket.
+				this.probedSet.setMemorySegment(bucketSegment, this.bucketInSegmentOffset + HEADER_PROBED_FLAGS_OFFSET);
+			}
+		}
+	
+		private BT nextInBucket() {
+			// loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
+			while (true) {
+				while (this.numInSegment < this.countInSegment) {
+					boolean probed = probedSet.get(numInSegment);
+					if (!probed) {
+						final long pointer = this.bucketSegment.getLong(this.bucketInSegmentOffset +
+							BUCKET_POINTER_START_OFFSET + (this.numInSegment * POINTER_LEN));
+						try {
+							this.partition.setReadPosition(pointer);
+							BT result = this.accessor.deserialize(this.partition);
+							this.numInSegment++;
+							return result;
+						} catch (IOException ioex) {
+							throw new RuntimeException("Error deserializing key or value from the hashtable: " +
+								ioex.getMessage(), ioex);
+						}
+					} else {
+						this.numInSegment++;
+					}
+				}
+	
+				// this segment is done. check if there is another chained bucket
+				final long forwardPointer = this.bucketSegment.getLong(this.bucketInSegmentOffset + HEADER_FORWARD_OFFSET);
+				if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
+					return null;
+				}
+
+				final int overflowSegNum = (int) (forwardPointer >>> 32);
+				this.bucketSegment = this.overflowSegments[overflowSegNum];
+				this.bucketInSegmentOffset = (int) forwardPointer;
+				this.countInSegment = this.bucketSegment.getShort(this.bucketInSegmentOffset + HEADER_COUNT_OFFSET);
+				this.numInSegment = 0;
+				// reset probedSet with probedFlags offset in this bucket.
+				this.probedSet.setMemorySegment(bucketSegment, this.bucketInSegmentOffset + HEADER_PROBED_FLAGS_OFFSET);
+			}
+		}
+		
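+		/**
+		 * Steps back to the previous element, so that the record just peeked at via {@code next()}
+		 * is returned again by the following call.
+		 */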
+		public void back() {
+			this.numInSegment--;
+		}
+	}
+	
 	// ======================================================================================================
 	
 	public static final class ProbeIterator {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashJoinIterator.java
index 3b940c2238b3c3ae8ab93c89593c2310432a748f..2c21619ab9a5d17d751b78edef9fba4311b8750e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashJoinIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashJoinIterator.java
@@ -53,7 +53,9 @@ public class NonReusingBuildFirstHashJoinIterator extends HashJoinIte
 
 	private final MutableObjectIterator<V2> secondInput;
 
-	private final boolean joinWithEmptyBuildSide;
+	private final boolean probeSideOuterJoin;
+
+	private final boolean buildSideOuterJoin;
 
 	private volatile boolean running = true;
 
@@ -70,7 +72,8 @@ public class NonReusingBuildFirstHashJoinIterator extends HashJoinIte
 			MemoryManager memManager, IOManager ioManager,
 			AbstractInvokable ownerTask,
 			double memoryFraction,
-			boolean joinWithEmptyBuildSide,
+			boolean probeSideOuterJoin,
+			boolean buildSideOuterJoin,
 			boolean useBitmapFilters) throws MemoryAllocationException {
 		
 		this.memManager = memManager;
@@ -78,10 +81,11 @@ public class NonReusingBuildFirstHashJoinIterator extends HashJoinIte
 		this.secondInput = secondInput;
 		this.probeSideSerializer = serializer2;
 
-		if(useBitmapFilters && joinWithEmptyBuildSide) {
+		if(useBitmapFilters && probeSideOuterJoin) {
 			throw new IllegalArgumentException("Bitmap filter may not be activated for joining with empty build side");
 		}
-		this.joinWithEmptyBuildSide = joinWithEmptyBuildSide;
+		this.probeSideOuterJoin = probeSideOuterJoin;
+		this.buildSideOuterJoin = buildSideOuterJoin;
 
 		this.hashJoin = getHashJoin(serializer1, comparator1, serializer2, comparator2,
 				pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
@@ -91,7 +95,7 @@ public class NonReusingBuildFirstHashJoinIterator extends HashJoinIte
 	
 	@Override
 	public void open() throws IOException, MemoryAllocationException, InterruptedException {
-		this.hashJoin.open(this.firstInput, this.secondInput);
+		this.hashJoin.open(this.firstInput, this.secondInput, this.buildSideOuterJoin);
 	}
 	
 
@@ -112,14 +116,14 @@ public class NonReusingBuildFirstHashJoinIterator extends HashJoinIte
 		if (this.hashJoin.nextRecord())
 		{
 			// we have a next record, get the iterators to the probe and build side values
-			final MutableHashTable.HashBucketIterator<V1, V2> buildSideIterator = this.hashJoin.getBuildSideIterator();
-			V1 nextBuildSideRecord;
-			
+			final MutableObjectIterator<V1> buildSideIterator = this.hashJoin.getBuildSideIterator();
+			final V2 probeRecord = this.hashJoin.getCurrentProbeRecord();
+			V1 nextBuildSideRecord = buildSideIterator.next();
+
 			// get the first build side value
-			if ((nextBuildSideRecord = buildSideIterator.next()) != null) {
+			if (probeRecord != null && nextBuildSideRecord != null) {
 				V1 tmpRec;
-				final V2 probeRecord = this.hashJoin.getCurrentProbeRecord();
-				
+
 				// check if there is another build-side value
 				if ((tmpRec = buildSideIterator.next()) != null) {
 					// more than one build-side value --> copy the probe side
@@ -144,13 +148,23 @@ public class NonReusingBuildFirstHashJoinIterator extends HashJoinIte
 					// only single pair matches
 					matchFunction.join(nextBuildSideRecord, probeRecord, collector);
 				}
-			}
-			else if(joinWithEmptyBuildSide) {
-				// build side is empty, join with null
-				final V2 probeRecord = this.hashJoin.getCurrentProbeRecord();
+			} else {
+				// for a probe-side outer join, join the current probe record with null.
+				if (probeSideOuterJoin && probeRecord != null && nextBuildSideRecord == null) {
+					matchFunction.join(null, probeRecord, collector);
+				}
 
-				matchFunction.join(null, probeRecord, collector);
+				// for a build-side outer join, iterate over all build records that have not been probed before,
+				// and join each of them with null.
+				if (buildSideOuterJoin && probeRecord == null && nextBuildSideRecord != null) {
+					matchFunction.join(nextBuildSideRecord, null, collector);
+
+					while (this.running && ((nextBuildSideRecord = buildSideIterator.next()) != null)) {
+						matchFunction.join(nextBuildSideRecord, null, collector);
+					}
+				}
 			}
+
 			return true;
 		}
 		else {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashJoinIterator.java
index 77521af9139de776213ebb6bcd67c9b66bf9dc4a..1c193d15c07271d7928c6db23bf1b300c76e3f95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashJoinIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashJoinIterator.java
@@ -49,12 +49,13 @@ public class NonReusingBuildFirstReOpenableHashJoinIterator extends N
 			IOManager ioManager,
 			AbstractInvokable ownerTask,
 			double memoryFraction,
-			boolean joinWithEmptyBuildSide,
+			boolean probeSideOuterJoin,
+			boolean buildSideOuterJoin,
 			boolean useBitmapFilters) throws MemoryAllocationException {
 		
 		super(firstInput, secondInput, serializer1, comparator1, serializer2,
 				comparator2, pairComparator, memManager, ioManager, ownerTask,
-				memoryFraction, joinWithEmptyBuildSide, useBitmapFilters);
+				memoryFraction, probeSideOuterJoin, buildSideOuterJoin, useBitmapFilters);
 		
 		reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashJoinIterator.java
index 9ea0b743f7d3b3578a4a049d834a6504e5d60b1e..2ac22aea4c84ba51a982994f1ca6b948b82ae313 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashJoinIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashJoinIterator.java
@@ -52,7 +52,9 @@ public class NonReusingBuildSecondHashJoinIterator extends HashJoinIt
 
 	private final MutableObjectIterator<V2> secondInput;
 
-	private final boolean joinWithEmptyBuildSide;
+	private final boolean buildSideOuterJoin;
+
+	private final boolean probeSideOuterJoin;
 
 	private volatile boolean running = true;
 
@@ -69,7 +71,8 @@ public class NonReusingBuildSecondHashJoinIterator extends HashJoinIt
 			MemoryManager memManager, IOManager ioManager,
 			AbstractInvokable ownerTask,
 			double memoryFraction,
-			boolean joinWithEmptyBuildSide,
+			boolean probeSideOuterJoin,
+			boolean buildSideOuterJoin,
 			boolean useBitmapFilters) throws MemoryAllocationException {
 		
 		this.memManager = memManager;
@@ -77,20 +80,21 @@ public class NonReusingBuildSecondHashJoinIterator extends HashJoinIt
 		this.secondInput = secondInput;
 		this.probeSideSerializer = serializer1;
 
-		if(useBitmapFilters && joinWithEmptyBuildSide) {
+		if(useBitmapFilters && probeSideOuterJoin) {
 			throw new IllegalArgumentException("Bitmap filter may not be activated for joining with empty build side");
 		}
-		this.joinWithEmptyBuildSide = joinWithEmptyBuildSide;
+		this.probeSideOuterJoin = probeSideOuterJoin;
+		this.buildSideOuterJoin = buildSideOuterJoin;
 		
 		this.hashJoin = getHashJoin(serializer2, comparator2, serializer1,
 				comparator1, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
 	public void open() throws IOException, MemoryAllocationException, InterruptedException {
-		this.hashJoin.open(this.secondInput, this.firstInput);
+		this.hashJoin.open(this.secondInput, this.firstInput, buildSideOuterJoin);
 	}
 
 	@Override
@@ -110,14 +114,14 @@ public class NonReusingBuildSecondHashJoinIterator extends HashJoinIt
 		if (this.hashJoin.nextRecord())
 		{
 			// we have a next record, get the iterators to the probe and build side values
-			final MutableHashTable.HashBucketIterator<V2, V1> buildSideIterator = this.hashJoin.getBuildSideIterator();
-			V2 nextBuildSideRecord;
-			
-			// get the first build side value
-			if ((nextBuildSideRecord = buildSideIterator.next()) != null) {
+			final MutableObjectIterator<V2> buildSideIterator = this.hashJoin.getBuildSideIterator();
+
+			final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();
+			V2 nextBuildSideRecord = buildSideIterator.next();
+
+			if (probeRecord != null && nextBuildSideRecord != null) {
 				V2 tmpRec;
-				final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();
-				
+
 				// check if there is another build-side value
 				if ((tmpRec = buildSideIterator.next()) != null) {
 					// more than one build-side value --> copy the probe side
@@ -142,13 +146,22 @@ public class NonReusingBuildSecondHashJoinIterator extends HashJoinIt
 					// only single pair matches
 					matchFunction.join(probeRecord, nextBuildSideRecord, collector);
 				}
-			}
-			else if(joinWithEmptyBuildSide) {
-				// build side is empty, join with null
-				final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();
+			} else {
+				// if this is a probe-side outer join, join the current probe record with null.
+				if (probeSideOuterJoin && probeRecord != null && nextBuildSideRecord == null) {
+					matchFunction.join(probeRecord, null, collector);
+				}
 
-				matchFunction.join(probeRecord, null, collector);
+				// if this is a build-side outer join, emit every build-side record that was never
+				// matched during the probe phase, joined with null.
+				if (buildSideOuterJoin && probeRecord == null && nextBuildSideRecord != null) {
+					matchFunction.join(null, nextBuildSideRecord, collector);
+					while (this.running && ((nextBuildSideRecord = buildSideIterator.next()) != null)) {
+						matchFunction.join(null, nextBuildSideRecord, collector);
+					}
+				}
 			}
+
 			return true;
 		}
 		else {
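For orientation: in a build-second iterator the probe side is the first (left) input, so the two flags select the logical outer-join type as sketched below. This helper is hypothetical, shown only to make the flag semantics concrete:

import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;

// Hypothetical helper; returns { probeSideOuterJoin, buildSideOuterJoin } for a build-second hash join.
final class BuildSecondOuterJoinFlags {
	static boolean[] forJoinType(OuterJoinType type) {
		switch (type) {
			case LEFT:
				return new boolean[] { true, false };  // retain unmatched left (probe) records, null on the build side
			case RIGHT:
				return new boolean[] { false, true };  // retain unmatched right (build) records, null on the probe side
			case FULL:
				return new boolean[] { true, true };
			default:
				throw new IllegalArgumentException("Unknown outer join type: " + type);
		}
	}
}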
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashJoinIterator.java
index c9c9165efa507b58b8ff3005a392254c417d03c4..c9d7d0d2ddb559d9433d33f16c4ad1d4e405bc4e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashJoinIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashJoinIterator.java
@@ -49,12 +49,13 @@ public class NonReusingBuildSecondReOpenableHashJoinIterator extends
 			IOManager ioManager,
 			AbstractInvokable ownerTask,
 			double memoryFraction,
-			boolean joinWithEmptyBuildSide,
+			boolean probeSideOuterJoin,
+			boolean buildSideOuterJoin,
 			boolean useBitmapFilters) throws MemoryAllocationException {
 		
 		super(firstInput, secondInput, serializer1, comparator1, serializer2,
 				comparator2, pairComparator, memManager, ioManager, ownerTask,
-				memoryFraction, joinWithEmptyBuildSide, useBitmapFilters);
+				memoryFraction, probeSideOuterJoin, buildSideOuterJoin, useBitmapFilters);
 		
 		reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
index b7a72622e2ba96d0d43b3a8df2202350f257078c..b9ddff8d6a0920bdd3e5167304e0cb9993c78eaf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
@@ -63,8 +63,8 @@ public class ReOpenableMutableHashTable extends MutableHashTable
 	}
 	
 	@Override
-	public void open(MutableObjectIterator<BT> buildSide, MutableObjectIterator<PT> probeSide) throws IOException {
-		super.open(buildSide, probeSide);
+	public void open(MutableObjectIterator<BT> buildSide, MutableObjectIterator<PT> probeSide, boolean buildSideOuterJoin) throws IOException {
+		super.open(buildSide, probeSide, buildSideOuterJoin);
 		initialPartitions = new ArrayList<HashPartition<BT, PT>>( partitionsBeingBuilt );
 		initialPartitionFanOut = (byte) partitionsBeingBuilt.size();
 		initialBucketCount = this.numBuckets;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashJoinIterator.java
index c1e601d241e936334719c69e9b09af7b69c404be..78e0ab6bc6886590448b09d9fb7c53e860eb458a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashJoinIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashJoinIterator.java
@@ -56,7 +56,9 @@ public class ReusingBuildFirstHashJoinIterator extends HashJoinIterat
 	
 	private final MutableObjectIterator<V2> secondInput;
 
-	private final boolean joinWithEmptyBuildSide;
+	private final boolean probeSideOuterJoin;
+
+	private final boolean buildSideOuterJoin;
 	
 	private volatile boolean running = true;
 	
@@ -74,7 +76,8 @@ public class ReusingBuildFirstHashJoinIterator extends HashJoinIterat
 			IOManager ioManager,
 			AbstractInvokable ownerTask,
 			double memoryFraction,
-			boolean joinWithEmptyBuildSide,
+			boolean probeSideOuterJoin,
+			boolean buildSideOuterJoin,
 			boolean useBitmapFilters) throws MemoryAllocationException {
 		
 		this.memManager = memManager;
@@ -82,10 +85,11 @@ public class ReusingBuildFirstHashJoinIterator extends HashJoinIterat
 		this.secondInput = secondInput;
 		this.probeSideSerializer = serializer2;
 
-		if(useBitmapFilters && joinWithEmptyBuildSide) {
+		if(useBitmapFilters && probeSideOuterJoin) {
 			throw new IllegalArgumentException("Bitmap filter may not be activated for joining with empty build side");
 		}
-		this.joinWithEmptyBuildSide = joinWithEmptyBuildSide;
+		this.probeSideOuterJoin = probeSideOuterJoin;
+		this.buildSideOuterJoin = buildSideOuterJoin;
 		
 		this.nextBuildSideObject = serializer1.createInstance();
 		this.tempBuildSideRecord = serializer1.createInstance();
@@ -98,7 +102,7 @@ public class ReusingBuildFirstHashJoinIterator extends HashJoinIterat
 	
 	@Override
 	public void open() throws IOException, MemoryAllocationException, InterruptedException {
-		this.hashJoin.open(this.firstInput, this.secondInput);
+		this.hashJoin.open(this.firstInput, this.secondInput, buildSideOuterJoin);
 	}
 	
 
@@ -119,40 +123,34 @@ public class ReusingBuildFirstHashJoinIterator extends HashJoinIterat
 		if (this.hashJoin.nextRecord())
 		{
 			// we have a next record, get the iterators to the probe and build side values
-			final MutableHashTable.HashBucketIterator<V1, V2> buildSideIterator = this.hashJoin.getBuildSideIterator();
-			V1 nextBuildSideRecord = this.nextBuildSideObject;
-			
-			// get the first build side value
-			if ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null) {
-				V1 tmpRec = this.tempBuildSideRecord;
-				final V2 probeRecord = this.hashJoin.getCurrentProbeRecord();
-				
-				// check if there is another build-side value
-				if ((tmpRec = buildSideIterator.next(tmpRec)) != null) {
+			final MutableObjectIterator<V1> buildSideIterator = this.hashJoin.getBuildSideIterator();
 
-					// call match on the first pair
+			final V2 probeRecord = this.hashJoin.getCurrentProbeRecord();
+			V1 nextBuildSideRecord = buildSideIterator.next(this.nextBuildSideObject);
+
+			if (probeRecord != null && nextBuildSideRecord != null) {
+				matchFunction.join(nextBuildSideRecord, probeRecord, collector);
+
+				while (this.running && ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null)) {
 					matchFunction.join(nextBuildSideRecord, probeRecord, collector);
-					
-					// call match on the second pair
-					matchFunction.join(tmpRec, probeRecord, collector);
-					
+				}
+			} else {
+				if (probeSideOuterJoin && probeRecord != null && nextBuildSideRecord == null) {
+					matchFunction.join(null, probeRecord, collector);
+				}
+
+				if (buildSideOuterJoin && probeRecord == null && nextBuildSideRecord != null) {
+					// call match on the first pair
+					matchFunction.join(nextBuildSideRecord, null, collector);
+
 					while (this.running && ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null)) {
 						// call match on the next pair
 						// make sure we restore the value of the probe side record
-						matchFunction.join(nextBuildSideRecord, probeRecord, collector);
+						matchFunction.join(nextBuildSideRecord, null, collector);
 					}
 				}
-				else {
-					// only single pair matches
-					matchFunction.join(nextBuildSideRecord, probeRecord, collector);
-				}
 			}
-			else if(joinWithEmptyBuildSide) {
-				// build side is empty, join with null
-				final V2 probeRecord = this.hashJoin.getCurrentProbeRecord();
 
-				matchFunction.join(null, probeRecord, collector);
-			}
 			return true;
 		}
 		else {
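The reusing variant above differs mainly in record materialization: each call to next(reuse) may fill and return the supplied instance or hand back a different one, so code must always continue with the returned reference. A small illustrative helper showing that contract (assumed MutableObjectIterator semantics; not part of the patch):

import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.util.MutableObjectIterator;

final class ReuseContractSketch {
	// Drains an iterator using a single reusable instance and counts the records.
	static <T> int drain(MutableObjectIterator<T> iterator, TypeSerializer<T> serializer) throws IOException {
		T reuse = serializer.createInstance();
		int count = 0;
		T record;
		while ((record = iterator.next(reuse)) != null) {
			count++;
			reuse = record; // the iterator may return a different instance; keep using the returned one
		}
		return count;
	}
}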
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashJoinIterator.java
index 1cc3f9124e880854c1766be15d708e4f958a99dc..5e29bc58fa1c29151f430eb0d2a41ef8f0e8f4fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashJoinIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashJoinIterator.java
@@ -49,13 +49,14 @@ public class ReusingBuildFirstReOpenableHashJoinIterator extends Reus
 			IOManager ioManager,
 			AbstractInvokable ownerTask,
 			double memoryFraction,
-			boolean joinWithEmptyBuildSide,
+			boolean probeSideOuterJoin,
+			boolean buildSideOuterJoin,
 			boolean useBitmapFilters)
 		throws MemoryAllocationException
 	{
 		super(firstInput, secondInput, serializer1, comparator1, serializer2,
 				comparator2, pairComparator, memManager, ioManager, ownerTask,
-				memoryFraction, joinWithEmptyBuildSide, useBitmapFilters);
+				memoryFraction, probeSideOuterJoin, buildSideOuterJoin, useBitmapFilters);
 		reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashJoinIterator.java
index 4402665499c049d04f3046c067678467cd845daa..fc9788c85d7ddd744310370f4605260744b3d811 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashJoinIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashJoinIterator.java
@@ -56,7 +56,9 @@ public class ReusingBuildSecondHashJoinIterator extends HashJoinItera
 	
 	private final MutableObjectIterator<V2> secondInput;
 
-	private final boolean joinWithEmptyBuildSide;
+	private final boolean probeSideOuterJoin;
+
+	private final boolean buildSideOuterJoin;
 	
 	private volatile boolean running = true;
 	
@@ -74,7 +76,8 @@ public class ReusingBuildSecondHashJoinIterator extends HashJoinItera
 			IOManager ioManager,
 			AbstractInvokable ownerTask,
 			double memoryFraction,
-			boolean joinWithEmptyBuildSide,
+			boolean probeSideOuterJoin,
+			boolean buildSideOuterJoin,
 			boolean useBitmapFilters) throws MemoryAllocationException {
 		
 		this.memManager = memManager;
@@ -82,10 +85,11 @@ public class ReusingBuildSecondHashJoinIterator extends HashJoinItera
 		this.secondInput = secondInput;
 		this.probeSideSerializer = serializer1;
 
-		if(useBitmapFilters && joinWithEmptyBuildSide) {
+		if(useBitmapFilters && probeSideOuterJoin) {
 			throw new IllegalArgumentException("Bitmap filter may not be activated for joining with empty build side");
 		}
-		this.joinWithEmptyBuildSide = joinWithEmptyBuildSide;
+		this.probeSideOuterJoin = probeSideOuterJoin;
+		this.buildSideOuterJoin = buildSideOuterJoin;
 		
 		this.nextBuildSideObject = serializer2.createInstance();
 		this.tempBuildSideRecord = serializer2.createInstance();
@@ -98,7 +102,7 @@ public class ReusingBuildSecondHashJoinIterator extends HashJoinItera
 	
 	@Override
 	public void open() throws IOException, MemoryAllocationException, InterruptedException {
-		this.hashJoin.open(this.secondInput, this.firstInput);
+		this.hashJoin.open(this.secondInput, this.firstInput, buildSideOuterJoin);
 	}
 
 	@Override
@@ -118,39 +122,29 @@ public class ReusingBuildSecondHashJoinIterator extends HashJoinItera
 		if (this.hashJoin.nextRecord())
 		{
 			// we have a next record, get the iterators to the probe and build side values
-			final MutableHashTable.HashBucketIterator<V2, V1> buildSideIterator = this.hashJoin.getBuildSideIterator();
-			V2 nextBuildSideRecord = this.nextBuildSideObject;
-			
-			// get the first build side value
-			if ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null) {
-				V2 tmpRec = this.tempBuildSideRecord;
-				final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();
-				
-				// check if there is another build-side value
-				if ((tmpRec = buildSideIterator.next(tmpRec)) != null) {
-					// call match on the first pair
+			final MutableObjectIterator<V2> buildSideIterator = this.hashJoin.getBuildSideIterator();
+			final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();
+			V2 nextBuildSideRecord = buildSideIterator.next(this.nextBuildSideObject);
+
+			if (probeRecord != null && nextBuildSideRecord != null) {
+				matchFunction.join(probeRecord, nextBuildSideRecord, collector);
+
+				while (this.running && ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null)) {
 					matchFunction.join(probeRecord, nextBuildSideRecord, collector);
-					
-					// call match on the second pair
-					matchFunction.join(probeRecord, tmpRec, collector);
-					
+				}
+			} else {
+				if (probeSideOuterJoin && probeRecord != null && nextBuildSideRecord == null) {
+					matchFunction.join(probeRecord, null, collector);
+				}
+
+				if (buildSideOuterJoin && probeRecord == null && nextBuildSideRecord != null) {
+					matchFunction.join(null, nextBuildSideRecord, collector);
 					while (this.running && ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null)) {
-						// call match on the next pair
-						// make sure we restore the value of the probe side record
-						matchFunction.join(probeRecord, nextBuildSideRecord, collector);
+						matchFunction.join(null, nextBuildSideRecord, collector);
 					}
 				}
-				else {
-					// only single pair matches
-					matchFunction.join(probeRecord, nextBuildSideRecord, collector);
-				}
 			}
-			else if(joinWithEmptyBuildSide) {
-				// build side is empty, join with null
-				final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();
 
-				matchFunction.join(probeRecord, null, collector);
-			}
 			return true;
 		}
 		else {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashJoinIterator.java
index 190398fde82f378b6c0d86ab1fc8f680f8a7cfdc..e603ea8597b5b787590fa0a87b48eb7003ba2ff4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashJoinIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashJoinIterator.java
@@ -49,12 +49,13 @@ public class ReusingBuildSecondReOpenableHashJoinIterator extends Reu
 			IOManager ioManager,
 			AbstractInvokable ownerTask,
 			double memoryFraction,
-			boolean joinWithEmptyBuildSide,
+			boolean probeSideOuterJoin,
+			boolean buildSideOuterJoin,
 			boolean useBitmapFilters) throws MemoryAllocationException {
 		
 		super(firstInput, secondInput, serializer1, comparator1, serializer2,
 				comparator2, pairComparator, memManager, ioManager, ownerTask,
-				memoryFraction, joinWithEmptyBuildSide, useBitmapFilters);
+				memoryFraction, probeSideOuterJoin, buildSideOuterJoin, useBitmapFilters);
 		
 		reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BitSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BitSet.java
new file mode 100644
index 0000000000000000000000000000000000000000..60547279549963a8dedf238b5acce241ae246f11
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BitSet.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.core.memory.MemorySegment;
+
+public class BitSet {
+	private MemorySegment memorySegment;
+
+	// Offset of this BitSet within the MemorySegment's byte array.
+	private int offset;
+
+	// The size of this BitSet in bytes.
+	private int byteLength;
+
+	// The size of this BitSet in bits.
+	private int bitLength;
+
+	private final int BYTE_POSITION_MASK = 0xfffffff8;
+	private final int BYTE_INDEX_MASK = 0x00000007;
+
+	public BitSet(int byteSize) {
+		Preconditions.checkArgument(byteSize > 0, "byteSize should be greater than 0.");
+		this.byteLength = byteSize;
+		this.bitLength = byteSize << 3;
+	}
+
+	public void setMemorySegment(MemorySegment memorySegment, int offset) {
+		Preconditions.checkArgument(memorySegment != null, "MemorySegment can not be null.");
+		Preconditions.checkArgument(offset >= 0, "Offset should be a non-negative integer.");
+		Preconditions.checkArgument(offset + byteLength <= memorySegment.size(),
+			"Could not set MemorySegment: the remaining buffer is too small.");
+		this.memorySegment = memorySegment;
+		this.offset = offset;
+	}
+
+	/**
+	 * Sets the bit at the specified index.
+	 *
+	 * @param index - position
+	 */
+	public void set(int index) {
+		Preconditions.checkArgument(index < bitLength && index >= 0,
+			String.format("Input index [%d] is out of the BitSet's range [0, %d).", index, bitLength));
+
+		int byteIndex = (index & BYTE_POSITION_MASK) >>> 3;
+		byte current = memorySegment.get(offset + byteIndex);
+		current |= (1 << (index & BYTE_INDEX_MASK));
+		memorySegment.put(offset + byteIndex, current);
+	}
+
+	/**
+	 * Returns true if the bit at the specified index is set.
+	 *
+	 * @param index - position
+	 * @return - value at the bit position
+	 */
+	public boolean get(int index) {
+		Preconditions.checkArgument(index < bitLength && index >= 0,
+			String.format("Input index [%d] is out of the BitSet's range [0, %d).", index, bitLength));
+		
+		int byteIndex = (index & BYTE_POSITION_MASK) >>> 3;
+		byte current = memorySegment.get(offset + byteIndex);
+		return (current & (1 << (index & BYTE_INDEX_MASK))) != 0;
+	}
+
+	/**
+	 * Returns the number of bits in this BitSet.
+	 */
+	public int bitSize() {
+		return bitLength;
+	}
+
+	/**
+	 * Clear the bit set.
+	 */
+	public void clear() {
+		for (int i = 0; i < byteLength; i++) {
+			memorySegment.put(offset + i, (byte) 0);
+		}
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder output = new StringBuilder();
+		output.append("BitSet:\n");
+		output.append("\tMemorySegment:").append(memorySegment.size()).append("\n");
+		output.append("\tOffset:").append(offset).append("\n");
+		output.append("\tLength:").append(byteLength).append("\n");
+		return output.toString();
+	}
+}
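A short usage sketch for the new BitSet utility; it assumes MemorySegmentFactory.allocateUnpooledSegment is an acceptable way to obtain backing memory in an example setting:

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.operators.util.BitSet;

public class BitSetExample {
	public static void main(String[] args) {
		MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(64);

		BitSet bitSet = new BitSet(64);      // 64 bytes -> 512 addressable bits
		bitSet.setMemorySegment(segment, 0);
		bitSet.clear();                      // the class does not zero its memory on its own

		bitSet.set(42);
		System.out.println(bitSet.get(42));  // true
		System.out.println(bitSet.get(43));  // false
	}
}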
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
index 4afa11434c92a5ad9491dbed54a16c1d08700ebe..ed709c0376113481fe7fd328e8ccc5641b0798ef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
@@ -172,7 +172,7 @@ public class HashTableITCase {
 		int numRecordsInJoinResult = 0;
 		
 		while (join.nextRecord()) {
-			HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+			MutableObjectIterator<Record> buildSide = join.getBuildSideIterator();
 			while (buildSide.next(recordReuse) != null) {
 				numRecordsInJoinResult++;
 			}
@@ -222,7 +222,7 @@ public class HashTableITCase {
 		int numRecordsInJoinResult = 0;
 		
 		while (join.nextRecord()) {
-			HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+			MutableObjectIterator<Record> buildSide = join.getBuildSideIterator();
 			while (buildSide.next(recordReuse) != null) {
 				numRecordsInJoinResult++;
 			}
@@ -279,7 +279,7 @@ public class HashTableITCase {
 			
 			int key = 0;
 			
-			HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+			MutableObjectIterator<Record> buildSide = join.getBuildSideIterator();
 			if ((record = buildSide.next(recordReuse)) != null) {
 				numBuildValues = 1;
 				key = record.getField(0, IntValue.class).getValue();
@@ -392,7 +392,7 @@ public class HashTableITCase {
 			final Record probeRec = join.getCurrentProbeRecord();
 			int key = probeRec.getField(0, IntValue.class).getValue();
 			
-			HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+			MutableObjectIterator<Record> buildSide = join.getBuildSideIterator();
 			if ((record = buildSide.next(recordReuse)) != null) {
 				numBuildValues = 1;
 				Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue()); 
@@ -505,7 +505,7 @@ public class HashTableITCase {
 			final Record probeRec = join.getCurrentProbeRecord();
 			int key = probeRec.getField(0, IntValue.class).getValue();
 			
-			HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+			MutableObjectIterator<Record> buildSide = join.getBuildSideIterator();
 			if ((record = buildSide.next(recordReuse)) != null) {
 				numBuildValues = 1;
 				Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue()); 
@@ -608,7 +608,7 @@ public class HashTableITCase {
 
 		try {
 			while (join.nextRecord()) {	
-				HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+				MutableObjectIterator<Record> buildSide = join.getBuildSideIterator();
 				if (buildSide.next(recordReuse) == null) {
 					fail("No build side values found for a probe key.");
 				}
@@ -666,7 +666,7 @@ public class HashTableITCase {
 		int numRecordsInJoinResult = 0;
 		
 		while (join.nextRecord()) {
-			HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+			MutableObjectIterator<Record> buildSide = join.getBuildSideIterator();
 			while (buildSide.next(recordReuse) != null) {
 				numRecordsInJoinResult++;
 			}
@@ -715,7 +715,7 @@ public class HashTableITCase {
 		* NUM_PROBE_VALS;
 		
 		while (join.nextRecord()) {
-			HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+			MutableObjectIterator<Record> buildSide = join.getBuildSideIterator();
 			while (buildSide.next(recordReuse) != null) {
 				numRecordsInJoinResult++;
 			}
@@ -770,7 +770,7 @@ public class HashTableITCase {
 		int numRecordsInJoinResult = 0;
 		
 		while (join.nextRecord()) {
-			HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+			MutableObjectIterator<Record> buildSide = join.getBuildSideIterator();
 			while (buildSide.next(recordReuse) != null) {
 				numRecordsInJoinResult++;
 			}
@@ -819,7 +819,7 @@ public class HashTableITCase {
 		int numRecordsInJoinResult = 0;
 		
 		while (join.nextRecord()) {
-			HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+			MutableObjectIterator<Record> buildSide = join.getBuildSideIterator();
 			while (buildSide.next(recordReuse) != null) {
 				numRecordsInJoinResult++;
 			}
@@ -879,7 +879,7 @@ public class HashTableITCase {
 			
 			int key = 0;
 			
-			HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
+			MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
 			if ((record = buildSide.next(recordReuse)) != null) {
 				numBuildValues = 1;
 				key = record.getKey();
@@ -995,7 +995,7 @@ public class HashTableITCase {
 			final IntPair probeRec = join.getCurrentProbeRecord();
 			int key = probeRec.getKey();
 			
-			HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
+			MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
 			if ((record = buildSide.next(recordReuse)) != null) {
 				numBuildValues = 1;
 				Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getKey()); 
@@ -1107,7 +1107,7 @@ public class HashTableITCase {
 			final IntPair probeRec = join.getCurrentProbeRecord();
 			int key = probeRec.getKey();
 			
-			HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
+			MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
 			if ((record = buildSide.next(recordReuse)) != null) {
 				numBuildValues = 1;
 				Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getKey()); 
@@ -1210,7 +1210,7 @@ public class HashTableITCase {
 		try {
 			while (join.nextRecord())
 			{	
-				HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
+				MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
 				if (buildSide.next(recordReuse) == null) {
 					fail("No build side values found for a probe key.");
 				}
@@ -1267,7 +1267,7 @@ public class HashTableITCase {
 		int numRecordsInJoinResult = 0;
 		
 		while (join.nextRecord()) {
-			HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
+			MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
 			while (buildSide.next(recordReuse) != null) {
 				numRecordsInJoinResult++;
 			}
@@ -1316,7 +1316,7 @@ public class HashTableITCase {
 		* NUM_PROBE_VALS;
 		
 		while (join.nextRecord()) {
-			HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
+			MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
 			while (buildSide.next(recordReuse) != null) {
 				numRecordsInJoinResult++;
 			}
@@ -1363,7 +1363,7 @@ public class HashTableITCase {
 		int numRecordsInJoinResult = 0;
 		
 		while (join.nextRecord()) {
-			HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
+			MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
 			while (buildSide.next(recordReuse) != null) {
 				numRecordsInJoinResult++;
 			}
@@ -1386,7 +1386,7 @@ public class HashTableITCase {
 		numRecordsInJoinResult = 0;
 		
 		while (join.nextRecord()) {
-			HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
+			MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
 			while (buildSide.next(recordReuse) != null) {
 				numRecordsInJoinResult++;
 			}
@@ -1439,7 +1439,7 @@ public class HashTableITCase {
 		int numRecordsInJoinResult = 0;
 
 		while (join.nextRecord()) {
-			HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
+			MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
 			while (buildSide.next(recordReuse) != null) {
 				numRecordsInJoinResult++;
 			}
@@ -1462,7 +1462,7 @@ public class HashTableITCase {
 		numRecordsInJoinResult = 0;
 
 		while (join.nextRecord()) {
-			HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
+			MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
 			while (buildSide.next(recordReuse) != null) {
 				numRecordsInJoinResult++;
 			}
@@ -1524,7 +1524,7 @@ public class HashTableITCase {
 		int numRecordsInJoinResult = 0;
 
 		while (join.nextRecord()) {
-			HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
+			MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
 			while (buildSide.next(recordReuse) != null) {
 				numRecordsInJoinResult++;
 			}
@@ -1534,6 +1534,106 @@ public class HashTableITCase {
 		join.close();
 		this.memManager.release(join.getFreedMemory());
 	}
+
+	@Test
+	public void testHashWithBuildSideOuterJoin1() throws Exception {
+		final int NUM_KEYS = 20000;
+		final int BUILD_VALS_PER_KEY = 1;
+		final int PROBE_VALS_PER_KEY = 1;
+
+		// create a build input that gives 40000 pairs with 1 value sharing the same key
+		MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(2 * NUM_KEYS, BUILD_VALS_PER_KEY, false);
+
+		// create a probe input that gives 20000 pairs with 1 value sharing a key
+		MutableObjectIterator<IntPair> probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
+
+		// allocate the memory for the HashTable
+		List<MemorySegment> memSegments;
+		try {
+			// 33 is the minimum number of pages required to perform the hash join on these inputs
+			memSegments = this.memManager.allocatePages(MEM_OWNER, 33);
+		}
+		catch (MemoryAllocationException maex) {
+			fail("Memory for the Join could not be provided.");
+			return;
+		}
+
+		// ----------------------------------------------------------------------------------------
+
+		final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair, IntPair>(
+			this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
+			this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
+			memSegments, ioManager);
+		join.open(buildInput, probeInput, true);
+
+		final IntPair recordReuse = new IntPair();
+		int numRecordsInJoinResult = 0;
+
+		while (join.nextRecord()) {
+			MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
+			while (buildSide.next(recordReuse) != null) {
+				numRecordsInJoinResult++;
+			}
+		}
+		Assert.assertEquals("Wrong number of records in join result.", 2 * NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult);
+
+		join.close();
+		this.memManager.release(join.getFreedMemory());
+	}
+	
+	@Test
+	public void testHashWithBuildSideOuterJoin2() throws Exception {
+		final int NUM_KEYS = 40000;
+		final int BUILD_VALS_PER_KEY = 2;
+		final int PROBE_VALS_PER_KEY = 1;
+		
+		// The key ranges of the probe and build sides fully overlap, so no unmatched build-side elements
+		// remain after the probe phase; make sure the build-side outer join still works correctly in this case.
+		
+		// create a build input that gives 80000 pairs with 2 values sharing the same key
+		MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+		
+		// create a probe input that gives 40000 pairs with 1 value sharing a key
+		MutableObjectIterator<IntPair> probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
+		
+		// allocate the memory for the HashTable
+		List<MemorySegment> memSegments;
+		try {
+			// 33 is the minimum number of pages required to perform the hash join on these inputs
+			memSegments = this.memManager.allocatePages(MEM_OWNER, 33);
+		}
+		catch (MemoryAllocationException maex) {
+			fail("Memory for the Join could not be provided.");
+			return;
+		}
+		
+		// ----------------------------------------------------------------------------------------
+		
+		final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair, IntPair>(
+			this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
+			this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
+			memSegments, ioManager);
+		join.open(buildInput, probeInput, true);
+		
+		final IntPair recordReuse = new IntPair();
+		int numRecordsInJoinResult = 0;
+		
+		while (join.nextRecord()) {
+			MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
+			IntPair next = buildSide.next(recordReuse);
+			if (next == null && join.getCurrentProbeRecord() == null) {
+				fail("Should not return a join result in which both the probe and the build element are null.");
+			}
+			while (next != null) {
+				numRecordsInJoinResult++;
+				next = buildSide.next(recordReuse);
+			}
+		}
+		Assert.assertEquals("Wrong number of records in join result.", NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult);
+		
+		join.close();
+		this.memManager.release(join.getFreedMemory());
+	}
 	
 	// ============================================================================================
 	
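The new tests above rely on the changed MutableHashTable contract: when opened with buildSideOuterJoin = true, nextRecord() keeps returning true after the probe input is exhausted; getCurrentProbeRecord() then returns null and getBuildSideIterator() walks the build records that never found a match. A sketch of the resulting consumption loop (an assumed helper mirroring the test code, not part of the patch):

import java.io.IOException;

import org.apache.flink.runtime.operators.hash.MutableHashTable;
import org.apache.flink.runtime.operators.testutils.types.IntPair;
import org.apache.flink.util.MutableObjectIterator;

final class OuterJoinConsumptionSketch {
	static int countJoinResults(MutableHashTable<IntPair, IntPair> join) throws IOException {
		final IntPair reuse = new IntPair();
		int results = 0;
		while (join.nextRecord()) {
			// getCurrentProbeRecord() == null here marks a group of unmatched build-side records
			MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
			while (buildSide.next(reuse) != null) {
				results++;
			}
		}
		return results;
	}
}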
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
index 6ef6d47f2a0a427f13858d9e7afb67c982d42fb8..7c385fc661a1c36f6890ff65c91e2753f4ef5f2f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
@@ -131,7 +131,7 @@ public class HashTableTest {
 			
 			try {
 				while (table.nextRecord()) {
-					MutableHashTable.HashBucketIterator<Tuple2<Long, byte[]>, Long> matches = table.getBuildSideIterator();
+					MutableObjectIterator<Tuple2<Long, byte[]>> matches = table.getBuildSideIterator();
 					while (matches.next() != null);
 				}
 			}
@@ -240,7 +240,7 @@ public class HashTableTest {
 				new ByteArrayIterator(1, 128,(byte) 1)));
 
 		while(table.nextRecord()) {
-			MutableHashTable.HashBucketIterator<byte[], byte[]> iterator = table.getBuildSideIterator();
+			MutableObjectIterator<byte[]> iterator = table.getBuildSideIterator();
 
 			int counter = 0;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java
index aff78135e66725e3d89f175f947cbc016dfde261..cc5c4726f58bb645987322db1dde9777e70ae3ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java
@@ -147,7 +147,7 @@ public class NonReusingHashJoinIteratorITCase {
 					new NonReusingBuildFirstHashJoinIterator<>(
 						input1, input2, this.recordSerializer, this.record1Comparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-						this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
+						this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
 			
 			iterator.open();
 			
@@ -234,7 +234,7 @@ public class NonReusingHashJoinIteratorITCase {
 					new NonReusingBuildFirstHashJoinIterator<>(
 						input1, input2, this.recordSerializer, this.record1Comparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-						this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
+						this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
 
 			iterator.open();
 			
@@ -283,7 +283,7 @@ public class NonReusingHashJoinIteratorITCase {
 				new NonReusingBuildSecondHashJoinIterator<>(
 					input1, input2, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
+					this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
 
 			iterator.open();
 			
@@ -370,7 +370,7 @@ public class NonReusingHashJoinIteratorITCase {
 				new NonReusingBuildSecondHashJoinIterator<>(
 					input1, input2, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
+					this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
 			
 			iterator.open();
 			
@@ -417,7 +417,7 @@ public class NonReusingHashJoinIteratorITCase {
 					new NonReusingBuildSecondHashJoinIterator<>(
 						input1, input2, this.pairSerializer, this.pairComparator,
 						this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
-						this.memoryManager, this.ioManager, this.parentTask, 1.0, false, true);
+						this.memoryManager, this.ioManager, this.parentTask, 1.0, false, false, true);
 			
 			iterator.open();
 			
@@ -464,7 +464,7 @@ public class NonReusingHashJoinIteratorITCase {
 					new NonReusingBuildFirstHashJoinIterator<>(
 						input1, input2, this.pairSerializer, this.pairComparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
-						this.memoryManager, this.ioManager, this.parentTask, 1.0, false, true);
+						this.memoryManager, this.ioManager, this.parentTask, 1.0, false, false, true);
 			
 			iterator.open();
 			
@@ -484,43 +484,43 @@ public class NonReusingHashJoinIteratorITCase {
 			Assert.fail("An exception occurred during the test: " + e.getMessage());
 		}
 	}
-
+	
 	@Test
-	public void testBuildFirstJoinWithEmptyBuild() {
+	public void testBuildFirstAndProbeSideOuterJoin() {
 		try {
 			TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			TupleGenerator generator2 = new TupleGenerator(SEED2, 1000, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-
+	
 			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
 			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-
+	
 			// collect expected data
 			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = rightOuterJoinTuples(
 					collectTupleData(input1),
 					collectTupleData(input2));
-
+	
 			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
 			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
-
+	
 			// reset the generators
 			generator1.reset();
 			generator2.reset();
 			input1.reset();
 			input2.reset();
-
+	
 			// compare with iterator values
 			NonReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
 					new NonReusingBuildFirstHashJoinIterator<>(
 							input1, input2, this.recordSerializer, this.record1Comparator,
 							this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-							this.memoryManager, ioManager, this.parentTask, 1.0, true, false);
-
+							this.memoryManager, ioManager, this.parentTask, 1.0, true, false, false);
+	
 			iterator.open();
-
+	
 			while (iterator.callWithNextKey(matcher, collector));
-
+	
 			iterator.close();
-
+	
 			// assert that each expected match was seen
 			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
@@ -533,43 +533,190 @@ public class NonReusingHashJoinIteratorITCase {
 			Assert.fail("An exception occurred during the test: " + e.getMessage());
 		}
 	}
-
+	
+	@Test
+	public void testBuildFirstAndBuildSideOuterJoin() {
+		try {
+			TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator2 = new TupleGenerator(SEED2, 1000, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+	
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+	
+			// collect expected data
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = leftOuterJoinTuples(
+				collectTupleData(input1),
+				collectTupleData(input2));
+	
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+	
+			// reset the generators
+			generator1.reset();
+			generator2.reset();
+			input1.reset();
+			input2.reset();
+	
+			// compare with iterator values
+			NonReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new NonReusingBuildFirstHashJoinIterator<>(
+					input1, input2, this.recordSerializer, this.record1Comparator,
+					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+					this.memoryManager, ioManager, this.parentTask, 1.0, false, true, false);
+	
+			iterator.open();
+	
+			while (iterator.callWithNextKey(matcher, collector));
+	
+			iterator.close();
+	
+			// assert that each expected match was seen
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
 	@Test
-	public void testBuildSecondJoinWithEmptyBuild() {
+	public void testBuildFirstAndFullOuterJoin() {
+		try {
+			TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator2 = new TupleGenerator(SEED2, 1000, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+	
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+	
+			// collect expected data
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = fullOuterJoinTuples(
+				collectTupleData(input1),
+				collectTupleData(input2));
+	
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+	
+			// reset the generators
+			generator1.reset();
+			generator2.reset();
+			input1.reset();
+			input2.reset();
+	
+			// compare with iterator values
+				NonReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new NonReusingBuildFirstHashJoinIterator<>(
+					input1, input2, this.recordSerializer, this.record1Comparator,
+					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+					this.memoryManager, ioManager, this.parentTask, 1.0, true, true, false);
+	
+			iterator.open();
+	
+			while (iterator.callWithNextKey(matcher, collector));
+	
+			iterator.close();
+	
+			// assert that each expected match was seen
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBuildSecondAndProbeSideOuterJoin() {
 		try {
 			TupleGenerator generator1 = new TupleGenerator(SEED1, 1000, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-
+	
 			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
 			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-
+	
 			// collect expected data
 			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = leftOuterJoinTuples(
 					collectTupleData(input1),
 					collectTupleData(input2));
-
+	
 			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
 			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
-
+	
 			// reset the generators
 			generator1.reset();
 			generator2.reset();
 			input1.reset();
 			input2.reset();
-
+	
 			// compare with iterator values
 			NonReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
 					new NonReusingBuildSecondHashJoinIterator<>(
 							input1, input2, this.recordSerializer, this.record1Comparator,
 							this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-							this.memoryManager, ioManager, this.parentTask, 1.0, true, false);
-
+							this.memoryManager, ioManager, this.parentTask, 1.0, true, false, false);
+	
 			iterator.open();
-
+	
 			while (iterator.callWithNextKey(matcher, collector));
-
+	
 			iterator.close();
-
+	
+			// assert that each expected match was seen
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBuildSecondAndBuildSideOuterJoin() {
+		try {
+			TupleGenerator generator1 = new TupleGenerator(SEED1, 1000, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+	
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+	
+			// collect expected data
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = rightOuterJoinTuples(
+				collectTupleData(input1),
+				collectTupleData(input2));
+	
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+	
+			// reset the generators
+			generator1.reset();
+			generator2.reset();
+			input1.reset();
+			input2.reset();
+	
+			// compare with iterator values
+				NonReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new NonReusingBuildSecondHashJoinIterator<>(
+					input1, input2, this.recordSerializer, this.record1Comparator,
+					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+					this.memoryManager, ioManager, this.parentTask, 1.0, false, true, false);
+	
+			iterator.open();
+	
+			while (iterator.callWithNextKey(matcher, collector));
+	
+			iterator.close();
+	
 			// assert that each expected match was seen
 			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
@@ -583,6 +730,55 @@ public class NonReusingHashJoinIteratorITCase {
 		}
 	}
 	
+	@Test
+	public void testBuildSecondAndFullOuterJoin() {
+		try {
+			TupleGenerator generator1 = new TupleGenerator(SEED1, 1000, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+	
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+	
+			// collect expected data
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = fullOuterJoinTuples(
+				collectTupleData(input1),
+				collectTupleData(input2));
+	
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+	
+			// reset the generators
+			generator1.reset();
+			generator2.reset();
+			input1.reset();
+			input2.reset();
+	
+			// compare with iterator values
+				NonReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new NonReusingBuildSecondHashJoinIterator<>(
+					input1, input2, this.recordSerializer, this.record1Comparator,
+					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+					this.memoryManager, ioManager, this.parentTask, 1.0, true, true, false);
+	
+			iterator.open();
+	
+			while (iterator.callWithNextKey(matcher, collector));
+	
+			iterator.close();
+	
+			// assert that each expected match was seen
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+		
 	// --------------------------------------------------------------------------------------------
 	//                                    Utilities
 	// --------------------------------------------------------------------------------------------
@@ -680,6 +876,53 @@ public class NonReusingHashJoinIteratorITCase {
 
 		return map;
 	}
+
+	public static Map<Integer, Collection<TupleMatch>> fullOuterJoinTuples(
+		Map<Integer, Collection<String>> leftMap,
+		Map<Integer, Collection<String>> rightMap)
+	{
+		Map<Integer, Collection<TupleMatch>> map = new HashMap<>();
+
+		for (Integer key : rightMap.keySet()) {
+			Collection<String> leftValues = leftMap.get(key);
+			Collection<String> rightValues = rightMap.get(key);
+
+			if (!map.containsKey(key)) {
+				map.put(key, new ArrayList<TupleMatch>());
+			}
+
+			Collection<TupleMatch> matchedValues = map.get(key);
+
+			for (String rightValue : rightValues) {
+				if (leftValues != null) {
+					for (String leftValue : leftValues) {
+						matchedValues.add(new TupleMatch(leftValue, rightValue));
+					}
+				}
+				else {
+					matchedValues.add(new TupleMatch(null, rightValue));
+				}
+			}
+		}
+
+		for (Integer key : leftMap.keySet()) {
+			Collection<String> leftValues = leftMap.get(key);
+			Collection<String> rightValues = rightMap.get(key);
+			if (rightValues == null) {
+				if (!map.containsKey(key)) {
+					map.put(key, new ArrayList<TupleMatch>());
+				}
+
+				Collection<TupleMatch> matchedValues = map.get(key);
+
+				for (String leftValue : leftValues) {
+					matchedValues.add(new TupleMatch(leftValue, null));
+				}
+			}
+		}
+
+		return map;
+	}
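As a quick sanity check of the helper above, a hypothetical input and its expected output (TupleMatch pairs shown as (left, right)):

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

final class FullOuterJoinTuplesExample {
	public static void main(String[] args) {
		Map<Integer, Collection<String>> left = new HashMap<>();
		left.put(1, Arrays.asList("a"));   // left-only key
		left.put(2, Arrays.asList("b"));   // matched key

		Map<Integer, Collection<String>> right = new HashMap<>();
		right.put(2, Arrays.asList("x"));
		right.put(3, Arrays.asList("y"));  // right-only key

		// fullOuterJoinTuples(left, right) is expected to produce:
		//   1 -> [(a, null)]   the unmatched left value is padded with null
		//   2 -> [(b, x)]      the matched key yields the cross product
		//   3 -> [(null, y)]   the unmatched right value is padded with null
	}
}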
 	
 	public static Map> joinIntPairs(
 			Map> leftMap,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
index 814d0ed13724c7a3d1de00bbdbf90bae79b5fbde..576cbd4f53372c8473bcaedb0c89a23780c8c038 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
 import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch;
 import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
@@ -228,7 +227,7 @@ public class NonReusingReOpenableHashTableITCase {
 				new NonReusingBuildFirstReOpenableHashJoinIterator<>(
 						buildInput, probeInput, this.recordSerializer, this.record1Comparator,
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
+					this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
 
 		iterator.open();
 		// do first join with both inputs
@@ -342,8 +341,8 @@ public class NonReusingReOpenableHashTableITCase {
 
 				final Tuple2<Integer, String> probeRec = join.getCurrentProbeRecord();
 				Integer key = probeRec.f0;
-
-				HashBucketIterator<Tuple2<Integer, String>, Tuple2<Integer, String>> buildSide = join.getBuildSideIterator();
+				
+				MutableObjectIterator<Tuple2<Integer, String>> buildSide = join.getBuildSideIterator();
 				if ((record = buildSide.next(recordReuse)) != null) {
 					numBuildValues = 1;
 					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
@@ -456,8 +455,8 @@ public class NonReusingReOpenableHashTableITCase {
 
 				final Tuple2<Integer, String> probeRec = join.getCurrentProbeRecord();
 				Integer key = probeRec.f0;
-
-				HashBucketIterator<Tuple2<Integer, String>, Tuple2<Integer, String>> buildSide = join.getBuildSideIterator();
+				
+				MutableObjectIterator<Tuple2<Integer, String>> buildSide = join.getBuildSideIterator();
 				if ((record = buildSide.next(recordReuse)) != null) {
 					numBuildValues = 1;
 					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
index af814c3b173328a229ee6b4bf5bcfd2cae79bf8e..1d846ed8edfc08e95ad76eb634d6fd5692ca19b6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
@@ -55,6 +55,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.fullOuterJoinTuples;
 import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.joinTuples;
 import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.leftOuterJoinTuples;
 import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.rightOuterJoinTuples;
@@ -155,7 +156,7 @@ public class ReusingHashJoinIteratorITCase {
 					new ReusingBuildFirstHashJoinIterator<>(
 						input1, input2, this.recordSerializer, this.record1Comparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-						this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
+						this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
 			
 			iterator.open();
 			
@@ -242,7 +243,7 @@ public class ReusingHashJoinIteratorITCase {
 					new ReusingBuildFirstHashJoinIterator<>(
 						input1, input2, this.recordSerializer, this.record1Comparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-						this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
+						this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
 
 			iterator.open();
 			
@@ -291,7 +292,7 @@ public class ReusingHashJoinIteratorITCase {
 				new ReusingBuildSecondHashJoinIterator<>(
 					input1, input2, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
+					this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
 
 			iterator.open();
 			
@@ -378,7 +379,7 @@ public class ReusingHashJoinIteratorITCase {
 				new ReusingBuildSecondHashJoinIterator<>(
 					input1, input2, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
+					this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
 			
 			iterator.open();
 			
@@ -425,7 +426,7 @@ public class ReusingHashJoinIteratorITCase {
 					new ReusingBuildSecondHashJoinIterator<>(
 						input1, input2, this.pairSerializer, this.pairComparator,
 						this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
-						this.memoryManager, this.ioManager, this.parentTask, 1.0, false, true);
+						this.memoryManager, this.ioManager, this.parentTask, 1.0, false, false, true);
 			
 			iterator.open();
 			
@@ -472,7 +473,7 @@ public class ReusingHashJoinIteratorITCase {
 					new ReusingBuildFirstHashJoinIterator<>(
 						input1, input2, this.pairSerializer, this.pairComparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
-						this.memoryManager, this.ioManager, this.parentTask, 1.0, false, true);
+						this.memoryManager, this.ioManager, this.parentTask, 1.0, false, false, true);
 			
 			iterator.open();
 			
@@ -492,9 +493,9 @@ public class ReusingHashJoinIteratorITCase {
 			Assert.fail("An exception occurred during the test: " + e.getMessage());
 		}
 	}
-
+	
 	@Test
-	public void testBuildFirstJoinWithEmptyBuild() {
+	public void testBuildFirstAndProbeSideOuterJoin() {
 		try {
 			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 1000, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
@@ -506,7 +507,7 @@ public class ReusingHashJoinIteratorITCase {
 			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = rightOuterJoinTuples(
 					collectTupleData(input1),
 					collectTupleData(input2));
-
+	
 			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
 			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
 
@@ -515,20 +516,69 @@ public class ReusingHashJoinIteratorITCase {
 			generator2.reset();
 			input1.reset();
 			input2.reset();
-
+	
 			// compare with iterator values
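+			// trailing flags are presumably (probeSideOuterJoin, buildSideOuterJoin, useBitmapFilters):
+			// probe-side outer preserves the probe (second) input, matching rightOuterJoinTuples above;
+			// the filters stay off because they would drop exactly the unmatched probe records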
 			ReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
 					new ReusingBuildFirstHashJoinIterator<>(
 							input1, input2, this.recordSerializer, this.record1Comparator,
 							this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-							this.memoryManager, ioManager, this.parentTask, 1.0, true, false);
-
+							this.memoryManager, ioManager, this.parentTask, 1.0, true, false, false);
+	
 			iterator.open();
-
+	
 			while (iterator.callWithNextKey(matcher, collector));
-
+	
 			iterator.close();
-
+		
+			// assert that each expected match was seen
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBuildFirstAndBuildSideOuterJoin() {
+		try {
+			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 1000, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ 	
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+	
+			// collect expected data
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = leftOuterJoinTuples(
+				collectTupleData(input1),
+				collectTupleData(input2));
+	
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+	
+			// reset the generators
+			generator1.reset();
+			generator2.reset();
+			input1.reset();
+			input2.reset();
+	
+			// compare with iterator values
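+			// build-side outer (second flag): unmatched records of the build (first) input are also
+			// emitted, which corresponds to the leftOuterJoinTuples expectation above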
+			ReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new ReusingBuildFirstHashJoinIterator<>(
+					input1, input2, this.recordSerializer, this.record1Comparator,
+					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+					this.memoryManager, ioManager, this.parentTask, 1.0, false, true, false);
+	
+			iterator.open();
+	
+			while (iterator.callWithNextKey(matcher, collector));
+	
+			iterator.close();
+	
 			// assert that each expected match was seen
 			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
@@ -541,43 +591,190 @@ public class ReusingHashJoinIteratorITCase {
 			Assert.fail("An exception occurred during the test: " + e.getMessage());
 		}
 	}
+	
+	@Test
+	public void testBuildFirstAndFullOuterJoin() {
+		try {
+			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 1000, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+	
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
 
+			// collect expected data
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = fullOuterJoinTuples(
+				collectTupleData(input1),
+				collectTupleData(input2));
+	
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+	
+			// reset the generators
+			generator1.reset();
+			generator2.reset();
+			input1.reset();
+			input2.reset();
+	
+			// compare with iterator values
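+			// both outer flags set: unmatched records from either input are emitted (full outer join)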
+			ReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new ReusingBuildFirstHashJoinIterator<>(
+					input1, input2, this.recordSerializer, this.record1Comparator,
+					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+					this.memoryManager, ioManager, this.parentTask, 1.0, true, true, false);
+	
+			iterator.open();
+	
+			while (iterator.callWithNextKey(matcher, collector));
+	
+			iterator.close();
+	
+			// assert that each expected match was seen
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
 	@Test
-	public void testBuildSecondJoinWithEmptyBuild() {
+	public void testBuildSecondAndProbeSideOuterJoin() {
 		try {
 			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 1000, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-
+	
 			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
 			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-
+	
 			// collect expected data
 			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = leftOuterJoinTuples(
 					collectTupleData(input1),
 					collectTupleData(input2));
-
+	
 			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
 			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
-
+	
 			// reset the generators
 			generator1.reset();
 			generator2.reset();
 			input1.reset();
 			input2.reset();
-
+	
 			// compare with iterator values
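+			// the build side is now the second input, so probe-side outer preserves the first input,
+			// matching the leftOuterJoinTuples expectation above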
 			ReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
 					new ReusingBuildSecondHashJoinIterator<>(
 							input1, input2, this.recordSerializer, this.record1Comparator,
 							this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-							this.memoryManager, ioManager, this.parentTask, 1.0, true, false);
-
+							this.memoryManager, ioManager, this.parentTask, 1.0, true, false, false);
+	
 			iterator.open();
-
+	
 			while (iterator.callWithNextKey(matcher, collector));
-
+	
 			iterator.close();
-
+	
+			// assert that each expected match was seen
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBuildSecondAndBuildSideOuterJoin() {
+		try {
+			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 1000, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+	
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+	
+			// collect expected data
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = rightOuterJoinTuples(
+				collectTupleData(input1),
+				collectTupleData(input2));
+	
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+	
+			// reset the generators
+			generator1.reset();
+			generator2.reset();
+			input1.reset();
+			input2.reset();
+	
+			// compare with iterator values
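+			// build-side outer on the second input preserves its unmatched records, i.e. a right outer join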
+			ReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new ReusingBuildSecondHashJoinIterator<>(
+					input1, input2, this.recordSerializer, this.record1Comparator,
+					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+					this.memoryManager, ioManager, this.parentTask, 1.0, false, true, false);
+	
+			iterator.open();
+	
+			while (iterator.callWithNextKey(matcher, collector));
+	
+			iterator.close();
+	
+			// assert that each expected match was seen
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBuildSecondAndFullOuterJoin() {
+		try {
+			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 1000, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+	
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+	
+			// collect expected data
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = fullOuterJoinTuples(
+				collectTupleData(input1),
+				collectTupleData(input2));
+	
+			final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+	
+			// reset the generators
+			generator1.reset();
+			generator2.reset();
+			input1.reset();
+			input2.reset();
+	
+			// compare with iterator values
+			ReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new ReusingBuildSecondHashJoinIterator<>(
+					input1, input2, this.recordSerializer, this.record1Comparator,
+					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+					this.memoryManager, ioManager, this.parentTask, 1.0, true, true, false);
+	
+			iterator.open();
+	
+			while (iterator.callWithNextKey(matcher, collector));
+	
+			iterator.close();
+	
 			// assert that each expected match was seen
 			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
@@ -590,7 +787,6 @@ public class ReusingHashJoinIteratorITCase {
 			Assert.fail("An exception occurred during the test: " + e.getMessage());
 		}
 	}
-
 	
 	// --------------------------------------------------------------------------------------------
 	//                                    Utilities
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
index d2229477a6af07787397800dcdf375444583d4fb..6afde162d71e436b8ffe6a97e0c212f3f3f7edaf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
@@ -44,7 +44,6 @@ import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin;
 import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch;
-import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
@@ -230,7 +229,7 @@ public class ReusingReOpenableHashTableITCase {
 				new ReusingBuildFirstReOpenableHashJoinIterator<>(
 						buildInput, probeInput, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
+					this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
 		
 		iterator.open();
 		// do first join with both inputs
@@ -341,7 +340,7 @@ public class ReusingReOpenableHashTableITCase {
 				final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
 				Integer key = probeRec.f0;
 				
-				HashBucketIterator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
+				MutableObjectIterator<Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
 				if ((record = buildSide.next(recordReuse)) != null) {
 					numBuildValues = 1;
 					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0); 
@@ -456,7 +455,7 @@ public class ReusingReOpenableHashTableITCase {
 				final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
 				Integer key = probeRec.f0;
 				
-				HashBucketIterator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
+				MutableObjectIterator<Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
 				if ((record = buildSide.next(recordReuse)) != null) {
 					numBuildValues = 1;
 					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0); 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BitSetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BitSetTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..ec8ae2b4cf861a40be437444f2fddb96657827ea
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BitSetTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators.util;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class BitSetTest {
+
+	private BitSet bitSet;
+	int byteSize = 1024;
+	MemorySegment memorySegment = MemorySegmentFactory.allocateUnpooledSegment(byteSize);
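+	// a 1024-byte unpooled segment backs the BitSet, i.e. 8 * 1024 addressable bits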
+
+	@Before
+	public void init() {
+		bitSet = new BitSet(byteSize);
+		bitSet.setMemorySegment(memorySegment, 0);
+		bitSet.clear();
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void verifyBitSetSize1() {
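+		// offset 1 presumably leaves fewer than byteSize bytes in the segment, so the call must fail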
+		bitSet.setMemorySegment(memorySegment, 1);
+	}
+	
+	@Test(expected = IllegalArgumentException.class)
+	public void verifyBitSetSize2() {
+		bitSet.setMemorySegment(null, 1);
+	}
+	
+	@Test(expected = IllegalArgumentException.class)
+	public void verifyBitSetSize3() {
+		bitSet.setMemorySegment(memorySegment, -1);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void verifyInputIndex1() {
+		bitSet.set(8 * byteSize + 1);
+	}
+	
+	@Test(expected = IllegalArgumentException.class)
+	public void verifyInputIndex2() {
+		bitSet.set(-1);
+	}
+
+	@Test
+	public void testSetValues() {
+		int bitSize = bitSet.bitSize();
+		assertEquals(8 * byteSize, bitSize);
+		for (int i = 0; i < bitSize; i++) {
+			assertFalse(bitSet.get(i));
+			if (i % 2 == 0) {
+				bitSet.set(i);
+			}
+		}
+
+		for (int i = 0; i < bitSize; i++) {
+			if (i % 2 == 0) {
+				assertTrue(bitSet.get(i));
+			} else {
+				assertFalse(bitSet.get(i));
+			}
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 4f18494fbf8c5b02eedcf7a016ad300635596c71..6205de4d381a719128ceb95b7aa0ab24c382bd80 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -130,11 +130,11 @@ public class HashVsSortMiniBenchmark {
 			
 			final UnilateralSortMerger<Tuple2<Integer, String>> sorter1 = new UnilateralSortMerger<>(
 					this.memoryManager, this.ioManager, input1, this.parentTask, this.serializer1, 
-					this.comparator1.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f, true);
+					this.comparator1.duplicate(), (double)MEMORY_FOR_SORTER/MEMORY_SIZE, 128, 0.8f, true);
 			
 			final UnilateralSortMerger<Tuple2<Integer, String>> sorter2 = new UnilateralSortMerger<>(
 					this.memoryManager, this.ioManager, input2, this.parentTask, this.serializer2, 
-					this.comparator2.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f, true);
+					this.comparator2.duplicate(), (double)MEMORY_FOR_SORTER/MEMORY_SIZE, 128, 0.8f, true);
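+					// the sorters take a relative memory fraction here, so the absolute budget is normalized by the total memory size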
 			
 			final MutableObjectIterator<Tuple2<Integer, String>> sortedInput1 = sorter1.getIterator();
 			final MutableObjectIterator<Tuple2<Integer, String>> sortedInput2 = sorter2.getIterator();
@@ -184,7 +184,7 @@ public class HashVsSortMiniBenchmark {
 					new ReusingBuildFirstHashJoinIterator<>(
 						input1, input2, this.serializer1.getSerializer(), this.comparator1, 
 							this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
-							this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, false, true);
+							this.memoryManager, this.ioManager, this.parentTask, 1, false, false, true);
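+							// memory fraction 1.0 replaces the former absolute MEMORY_SIZE argument; no outer sides, bitmap filters on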
 			
 			iterator.open();
 			
@@ -223,7 +223,7 @@ public class HashVsSortMiniBenchmark {
 					new ReusingBuildSecondHashJoinIterator<>(
 						input1, input2, this.serializer1.getSerializer(), this.comparator1, 
 						this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
-						this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, false, true);
+						this.memoryManager, this.ioManager, this.parentTask, 1, false, false, true);
 			
 			iterator.open();
 			
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java
index c2dca6655d2b5be40092c557aa3a1382db4a7b38..5215a3662316947b80f2d50b8c043ef5db8e9f6f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.javaApiOperators;
 
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.RichFlatJoinFunction;
@@ -30,6 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.tuple.Tuple7;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
@@ -57,14 +59,24 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
 
 	@Test
 	public void testLeftOuterJoin2() throws Exception {
-		testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_SECOND);
+		testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_FIRST);
 	}
 
 	@Test
 	public void testLeftOuterJoin3() throws Exception {
+		testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_SECOND);
+	}
+
+	@Test
+	public void testLeftOuterJoin4() throws Exception {
 		testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_SECOND);
 	}
 
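+	// broadcasting the outer (first) side of a left outer join would replicate its unmatched rows
+	// across parallel instances, so this hint is expected to be rejected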
+	@Test (expected = InvalidProgramException.class)
+	public void testLeftOuterJoin5() throws Exception {
+		testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_FIRST);
+	}
+
 	private void testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception {
 		/*
 		 * UDF Join on tuples with key field positions
@@ -102,9 +114,19 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
 
 	@Test
 	public void testRightOuterJoin3() throws Exception {
+		testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_SECOND);
+	}
+
+	@Test
+	public void testRightOuterJoin4() throws Exception {
 		testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_FIRST);
 	}
 
+	@Test (expected = InvalidProgramException.class)
+	public void testRightOuterJoin5() throws Exception {
+		testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_SECOND);
+	}
+
 	private void testRightOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception {
 		/*
 		 * UDF Join on tuples with key field positions
@@ -135,6 +157,26 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
 		testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_SORT_MERGE);
 	}
 
+	@Test
+	public void testFullOuterJoin2() throws Exception {
+		testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_FIRST);
+	}
+
+	@Test
+	public void testFullOuterJoin3() throws Exception {
+		testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_SECOND);
+	}
+
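+	// neither input of a full outer join may be broadcast, since both sides must be preserved exactly once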
+	@Test (expected = InvalidProgramException.class)
+	public void testFullOuterJoin4() throws Exception {
+		testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_FIRST);
+	}
+
+	@Test (expected = InvalidProgramException.class)
+	public void testFullOuterJoin5() throws Exception {
+		testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_SECOND);
+	}
+
 	private void testFullOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception {
 		/*
 		 * UDF Join on tuples with key field positions
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java b/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
index cb20e72e42b346ffa22d8dbd81105631ca751a57..06921967b62b51170d40750a85b716c6f2a97389 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
@@ -150,7 +150,7 @@ public class HashTableRecordWidthCombinations {
 
 					try {
 						while (table.nextRecord()) {
-							MutableHashTable.HashBucketIterator<Tuple2<Long, byte[]>, Long> matches = table.getBuildSideIterator();
+							MutableObjectIterator<Tuple2<Long, byte[]>> matches = table.getBuildSideIterator();
 							while (matches.next() != null);
 						}
 					}