提交 eaf540fd 编写于 作者: C chengxiang li

[FLINK-2871] Support hash-based outer joins that build the hash table on the outer side.

This commit adds hash join support for the following outer join cases:
    1. left outer join with REPARTITION_HASH_FIRST.
    2. right outer join with REPARTITION_HASH_SECOND.
    3. full outer join with REPARTITION_HASH_FIRST and REPARTITION_HASH_SECOND.

This closes #1469
上级 a7d4334f
......@@ -841,6 +841,7 @@ public abstract class DataSet<T> {
switch(strategy) {
case OPTIMIZER_CHOOSES:
case REPARTITION_SORT_MERGE:
case REPARTITION_HASH_FIRST:
case REPARTITION_HASH_SECOND:
case BROADCAST_HASH_SECOND:
return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.LEFT_OUTER);
......@@ -891,6 +892,7 @@ public abstract class DataSet<T> {
case OPTIMIZER_CHOOSES:
case REPARTITION_SORT_MERGE:
case REPARTITION_HASH_FIRST:
case REPARTITION_HASH_SECOND:
case BROADCAST_HASH_FIRST:
return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.RIGHT_OUTER);
default:
......@@ -938,6 +940,8 @@ public abstract class DataSet<T> {
switch(strategy) {
case OPTIMIZER_CHOOSES:
case REPARTITION_SORT_MERGE:
case REPARTITION_HASH_FIRST:
case REPARTITION_HASH_SECOND:
return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.FULL_OUTER);
default:
throw new InvalidProgramException("Invalid JoinHint for FullOuterJoin: "+strategy);
......
......@@ -181,7 +181,7 @@ public class FullOuterJoinOperatorTest {
this.testFullOuterStrategies(JoinHint.REPARTITION_SORT_MERGE);
}
@Test(expected = InvalidProgramException.class)
@Test
public void testFullOuterStrategy3() {
this.testFullOuterStrategies(JoinHint.REPARTITION_HASH_SECOND);
}
......@@ -191,7 +191,7 @@ public class FullOuterJoinOperatorTest {
this.testFullOuterStrategies(JoinHint.BROADCAST_HASH_SECOND);
}
@Test(expected = InvalidProgramException.class)
@Test
public void testFullOuterStrategy5() {
this.testFullOuterStrategies(JoinHint.REPARTITION_HASH_FIRST);
}
......
......@@ -192,7 +192,7 @@ public class LeftOuterJoinOperatorTest {
this.testLeftOuterStrategies(JoinHint.BROADCAST_HASH_SECOND);
}
@Test(expected = InvalidProgramException.class)
@Test
public void testLeftOuterStrategy5() {
this.testLeftOuterStrategies(JoinHint.REPARTITION_HASH_FIRST);
}
......
......@@ -181,7 +181,7 @@ public class RightOuterJoinOperatorTest {
this.testRightOuterStrategies(JoinHint.REPARTITION_SORT_MERGE);
}
@Test(expected = InvalidProgramException.class)
@Test
public void testRightOuterStrategy3() {
this.testRightOuterStrategies(JoinHint.REPARTITION_HASH_SECOND);
}
......
......@@ -202,10 +202,14 @@ public abstract class CostEstimator {
break;
case HYBRIDHASH_BUILD_FIRST:
case RIGHT_HYBRIDHASH_BUILD_FIRST:
case LEFT_HYBRIDHASH_BUILD_FIRST:
case FULL_OUTER_HYBRIDHASH_BUILD_FIRST:
addHybridHashCosts(firstInput, secondInput, driverCosts, costWeight);
break;
case HYBRIDHASH_BUILD_SECOND:
case LEFT_HYBRIDHASH_BUILD_SECOND:
case RIGHT_HYBRIDHASH_BUILD_SECOND:
case FULL_OUTER_HYBRIDHASH_BUILD_SECOND:
addHybridHashCosts(secondInput, firstInput, driverCosts, costWeight);
break;
case HYBRIDHASH_BUILD_FIRST_CACHED:
......
......@@ -25,8 +25,12 @@ import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoi
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.operators.AbstractJoinDescriptor;
import org.apache.flink.optimizer.operators.HashFullOuterJoinBuildFirstDescriptor;
import org.apache.flink.optimizer.operators.HashFullOuterJoinBuildSecondDescriptor;
import org.apache.flink.optimizer.operators.HashLeftOuterJoinBuildFirstDescriptor;
import org.apache.flink.optimizer.operators.HashLeftOuterJoinBuildSecondDescriptor;
import org.apache.flink.optimizer.operators.HashRightOuterJoinBuildFirstDescriptor;
import org.apache.flink.optimizer.operators.HashRightOuterJoinBuildSecondDescriptor;
import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
import org.apache.flink.optimizer.operators.SortMergeFullOuterJoinDescriptor;
import org.apache.flink.optimizer.operators.SortMergeLeftOuterJoinDescriptor;
......@@ -99,8 +103,10 @@ public class OuterJoinNode extends TwoInputNode {
case BROADCAST_HASH_SECOND:
list.add(new HashLeftOuterJoinBuildSecondDescriptor(this.keys1, this.keys2, true, false));
break;
case BROADCAST_HASH_FIRST:
case REPARTITION_HASH_FIRST:
list.add(new HashLeftOuterJoinBuildFirstDescriptor(this.keys1, this.keys2, false, true));
break;
case BROADCAST_HASH_FIRST:
default:
throw new CompilerException("Invalid join hint: " + hint + " for left outer join");
}
......@@ -124,8 +130,10 @@ public class OuterJoinNode extends TwoInputNode {
case BROADCAST_HASH_FIRST:
list.add(new HashRightOuterJoinBuildFirstDescriptor(this.keys1, this.keys2, true, false));
break;
case BROADCAST_HASH_SECOND:
case REPARTITION_HASH_SECOND:
list.add(new HashRightOuterJoinBuildSecondDescriptor(this.keys1, this.keys2, false, true));
break;
case BROADCAST_HASH_SECOND:
default:
throw new CompilerException("Invalid join hint: " + hint + " for right outer join");
}
......@@ -142,10 +150,14 @@ public class OuterJoinNode extends TwoInputNode {
case REPARTITION_SORT_MERGE:
list.add(new SortMergeFullOuterJoinDescriptor(this.keys1, this.keys2));
break;
case REPARTITION_HASH_FIRST:
list.add(new HashFullOuterJoinBuildFirstDescriptor(this.keys1, this.keys2));
break;
case REPARTITION_HASH_SECOND:
case BROADCAST_HASH_SECOND:
list.add(new HashFullOuterJoinBuildSecondDescriptor(this.keys1, this.keys2));
break;
case BROADCAST_HASH_FIRST:
case REPARTITION_HASH_FIRST:
case BROADCAST_HASH_SECOND:
default:
throw new CompilerException("Invalid join hint: " + hint + " for full outer join");
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.optimizer.operators;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.optimizer.dag.TwoInputNode;
import org.apache.flink.optimizer.dataproperties.LocalProperties;
import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.runtime.operators.DriverStrategy;
import java.util.Collections;
import java.util.List;
/**
 * Optimizer descriptor for a full outer join that is executed with the
 * {@link DriverStrategy#FULL_OUTER_HYBRIDHASH_BUILD_FIRST} driver strategy.
 *
 * <p>The descriptor places no requirements on the local properties of either input and
 * reports default (empty) local properties for its output.
 */
public class HashFullOuterJoinBuildFirstDescriptor extends AbstractJoinDescriptor {

    /**
     * Creates the descriptor for the given join keys.
     *
     * @param keys1 key fields of the first input
     * @param keys2 key fields of the second input
     */
    public HashFullOuterJoinBuildFirstDescriptor(FieldList keys1, FieldList keys2) {
        // NOTE(review): judging by the sibling left/right descriptors, the three flags are
        // (broadcastFirstAllowed, broadcastSecondAllowed, repartitionAllowed) -- confirm
        // against AbstractJoinDescriptor.
        super(keys1, keys2, false, false, true);
    }

    /** Returns the driver strategy this descriptor stands for. */
    @Override
    public DriverStrategy getStrategy() {
        return DriverStrategy.FULL_OUTER_HYBRIDHASH_BUILD_FIRST;
    }

    /**
     * Returns a single pair of default-constructed local property requests, one per input;
     * the join accepts any local properties.
     */
    @Override
    protected List<LocalPropertiesPair> createPossibleLocalProperties() {
        final RequestedLocalProperties anyForFirst = new RequestedLocalProperties();
        final RequestedLocalProperties anyForSecond = new RequestedLocalProperties();
        return Collections.singletonList(new LocalPropertiesPair(anyForFirst, anyForSecond));
    }

    /** Always reports the requested and produced properties as compatible. */
    @Override
    public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
            LocalProperties produced1, LocalProperties produced2) {
        return true;
    }

    /**
     * Creates the plan node for this join, named {@code FullOuterJoin(<operator name>)}.
     *
     * @param in1 channel delivering the first input
     * @param in2 channel delivering the second input
     * @param node optimizer node the plan node is created for
     * @return the dual-input plan node using this descriptor's strategy and keys
     */
    @Override
    public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
        final StringBuilder label = new StringBuilder("FullOuterJoin(");
        label.append(node.getOperator().getName()).append(")");
        return new DualInputPlanNode(node, label.toString(), in1, in2, getStrategy(), this.keys1, this.keys2);
    }

    /** Returns freshly created default local properties, independent of both inputs. */
    @Override
    public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
        return new LocalProperties();
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.optimizer.operators;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.optimizer.dag.TwoInputNode;
import org.apache.flink.optimizer.dataproperties.LocalProperties;
import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.runtime.operators.DriverStrategy;
import java.util.Collections;
import java.util.List;
public class HashFullOuterJoinBuildSecondDescriptor extends AbstractJoinDescriptor {
public HashFullOuterJoinBuildSecondDescriptor(FieldList keys1, FieldList keys2) {
super(keys1, keys2, false, false, true);
}
@Override
public DriverStrategy getStrategy() {
return DriverStrategy.FULL_OUTER_HYBRIDHASH_BUILD_SECOND;
}
@Override
protected List<LocalPropertiesPair> createPossibleLocalProperties() {
// all properties are possible
return Collections.singletonList(new LocalPropertiesPair(new RequestedLocalProperties(), new RequestedLocalProperties()));
}
@Override
public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
LocalProperties produced1, LocalProperties produced2) {
return true;
}
@Override
public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
String nodeName = "FullOuterJoin("+node.getOperator().getName()+")";
return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2);
}
@Override
public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
return new LocalProperties();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.optimizer.operators;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.optimizer.dag.TwoInputNode;
import org.apache.flink.optimizer.dataproperties.LocalProperties;
import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.runtime.operators.DriverStrategy;
import java.util.Collections;
import java.util.List;
/**
 * Optimizer descriptor for a left outer join that is executed with the
 * {@link DriverStrategy#LEFT_HYBRIDHASH_BUILD_FIRST} driver strategy.
 *
 * <p>The descriptor places no requirements on the local properties of either input and
 * reports default (empty) local properties for its output.
 */
public class HashLeftOuterJoinBuildFirstDescriptor extends AbstractJoinDescriptor {

    /**
     * Creates the descriptor for the given join keys and shipping options.
     *
     * @param keys1 key fields of the first input
     * @param keys2 key fields of the second input
     * @param broadcastSecondAllowed forwarded to the parent descriptor as its second
     *        broadcast flag; the first broadcast flag is always passed as {@code false}
     * @param repartitionAllowed forwarded to the parent descriptor unchanged
     */
    public HashLeftOuterJoinBuildFirstDescriptor(FieldList keys1, FieldList keys2,
            boolean broadcastSecondAllowed, boolean repartitionAllowed) {
        super(keys1, keys2, false, broadcastSecondAllowed, repartitionAllowed);
    }

    /** Returns the driver strategy this descriptor stands for. */
    @Override
    public DriverStrategy getStrategy() {
        return DriverStrategy.LEFT_HYBRIDHASH_BUILD_FIRST;
    }

    /**
     * Returns a single pair of default-constructed local property requests, one per input;
     * the join accepts any local properties.
     */
    @Override
    protected List<OperatorDescriptorDual.LocalPropertiesPair> createPossibleLocalProperties() {
        final RequestedLocalProperties anyForFirst = new RequestedLocalProperties();
        final RequestedLocalProperties anyForSecond = new RequestedLocalProperties();
        return Collections.singletonList(
                new OperatorDescriptorDual.LocalPropertiesPair(anyForFirst, anyForSecond));
    }

    /** Always reports the requested and produced properties as compatible. */
    @Override
    public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
            LocalProperties produced1, LocalProperties produced2) {
        return true;
    }

    /**
     * Creates the plan node for this join, named {@code LeftOuterJoin(<operator name>)}.
     *
     * @param in1 channel delivering the first input
     * @param in2 channel delivering the second input
     * @param node optimizer node the plan node is created for
     * @return the dual-input plan node using this descriptor's strategy and keys
     */
    @Override
    public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
        final StringBuilder label = new StringBuilder("LeftOuterJoin(");
        label.append(node.getOperator().getName()).append(")");
        return new DualInputPlanNode(node, label.toString(), in1, in2, getStrategy(), this.keys1, this.keys2);
    }

    /** Returns freshly created default local properties, independent of both inputs. */
    @Override
    public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
        return new LocalProperties();
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.optimizer.operators;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.optimizer.dag.TwoInputNode;
import org.apache.flink.optimizer.dataproperties.LocalProperties;
import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.runtime.operators.DriverStrategy;
import java.util.Collections;
import java.util.List;
/**
 * Optimizer descriptor for a right outer join that is executed with the
 * {@link DriverStrategy#RIGHT_HYBRIDHASH_BUILD_SECOND} driver strategy.
 *
 * <p>The descriptor places no requirements on the local properties of either input and
 * reports default (empty) local properties for its output.
 */
public class HashRightOuterJoinBuildSecondDescriptor extends AbstractJoinDescriptor {

    /**
     * Creates the descriptor for the given join keys and shipping options.
     *
     * @param keys1 key fields of the first input
     * @param keys2 key fields of the second input
     * @param broadcastFirstAllowed forwarded to the parent descriptor as its first
     *        broadcast flag; the second broadcast flag is always passed as {@code false}
     * @param repartitionAllowed forwarded to the parent descriptor unchanged
     */
    public HashRightOuterJoinBuildSecondDescriptor(FieldList keys1, FieldList keys2,
            boolean broadcastFirstAllowed, boolean repartitionAllowed) {
        super(keys1, keys2, broadcastFirstAllowed, false, repartitionAllowed);
    }

    /** Returns the driver strategy this descriptor stands for. */
    @Override
    public DriverStrategy getStrategy() {
        return DriverStrategy.RIGHT_HYBRIDHASH_BUILD_SECOND;
    }

    /**
     * Returns a single pair of default-constructed local property requests, one per input;
     * the join accepts any local properties.
     */
    @Override
    protected List<LocalPropertiesPair> createPossibleLocalProperties() {
        final RequestedLocalProperties anyForFirst = new RequestedLocalProperties();
        final RequestedLocalProperties anyForSecond = new RequestedLocalProperties();
        return Collections.singletonList(new LocalPropertiesPair(anyForFirst, anyForSecond));
    }

    /** Always reports the requested and produced properties as compatible. */
    @Override
    public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
            LocalProperties produced1, LocalProperties produced2) {
        return true;
    }

    /**
     * Creates the plan node for this join, named {@code RightOuterJoin(<operator name>)}.
     *
     * @param in1 channel delivering the first input
     * @param in2 channel delivering the second input
     * @param node optimizer node the plan node is created for
     * @return the dual-input plan node using this descriptor's strategy and keys
     */
    @Override
    public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
        final StringBuilder label = new StringBuilder("RightOuterJoin(");
        label.append(node.getOperator().getName()).append(")");
        return new DualInputPlanNode(node, label.toString(), in1, in2, getStrategy(), this.keys1, this.keys2);
    }

    /** Returns freshly created default local properties, independent of both inputs. */
    @Override
    public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
        return new LocalProperties();
    }
}
......@@ -95,6 +95,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
this.taskContext.getOwningNepheleTask(),
availableMemory,
false,
false,
hashJoinUseBitMaps);
......@@ -110,6 +111,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
this.taskContext.getOwningNepheleTask(),
availableMemory,
false,
false,
hashJoinUseBitMaps);
} else {
......@@ -128,6 +130,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
this.taskContext.getOwningNepheleTask(),
availableMemory,
false,
false,
hashJoinUseBitMaps);
......@@ -143,6 +146,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
this.taskContext.getOwningNepheleTask(),
availableMemory,
false,
false,
hashJoinUseBitMaps);
} else {
......
......@@ -88,12 +88,20 @@ public enum DriverStrategy {
HYBRIDHASH_BUILD_FIRST_CACHED(BuildFirstCachedJoinDriver.class, null, FULL_DAM, MATERIALIZING, 2),
// cached variant of HYBRIDHASH_BUILD_SECOND, that can only be used inside of iterations
HYBRIDHASH_BUILD_SECOND_CACHED(BuildSecondCachedJoinDriver.class, null, MATERIALIZING, FULL_DAM, 2),
// right outer join, the first input is build side, the second side is probe side of a hybrid hash table
// right outer join, the first input is build side, the second input is probe side of a hybrid hash table.
RIGHT_HYBRIDHASH_BUILD_FIRST(RightOuterJoinDriver.class, null, FULL_DAM, MATERIALIZING, 2),
// left outer join, the second input is build side, the first side is probe side of a hybrid hash table
// right outer join, the first input is probe side, the second input is build side of a hybrid hash table.
RIGHT_HYBRIDHASH_BUILD_SECOND(RightOuterJoinDriver.class, null, FULL_DAM, MATERIALIZING, 2),
// left outer join, the first input is build side, the second input is probe side of a hybrid hash table.
LEFT_HYBRIDHASH_BUILD_FIRST(LeftOuterJoinDriver.class, null, MATERIALIZING, FULL_DAM, 2),
// left outer join, the first input is probe side, the second input is build side of a hybrid hash table.
LEFT_HYBRIDHASH_BUILD_SECOND(LeftOuterJoinDriver.class, null, MATERIALIZING, FULL_DAM, 2),
// full outer join, the first input is build side, the second input is the probe side of a hybrid hash table.
FULL_OUTER_HYBRIDHASH_BUILD_FIRST(FullOuterJoinDriver.class, null, FULL_DAM, MATERIALIZING, 2),
// full outer join, the first input is probe side, the second input is the build side of a hybrid hash table.
FULL_OUTER_HYBRIDHASH_BUILD_SECOND(FullOuterJoinDriver.class, null, MATERIALIZING, FULL_DAM, 2),
// the second input is inner loop, the first input is outer loop and block-wise processed
NESTEDLOOP_BLOCKED_OUTER_FIRST(CrossDriver.class, null, MATERIALIZING, FULL_DAM, 0),
// the first input is inner loop, the second input is outer loop and block-wise processed
......
......@@ -24,6 +24,10 @@ import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator;
import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator;
import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator;
import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator;
import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator;
import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator;
import org.apache.flink.runtime.operators.util.JoinTaskIterator;
......@@ -66,6 +70,28 @@ public class FullOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
numPages,
super.taskContext.getOwningNepheleTask()
);
case FULL_OUTER_HYBRIDHASH_BUILD_FIRST:
return new ReusingBuildFirstHashJoinIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator21(comparator1, comparator2),
memoryManager, ioManager,
this.taskContext.getOwningNepheleTask(),
driverMemFraction,
true,
true,
false);
case FULL_OUTER_HYBRIDHASH_BUILD_SECOND:
return new ReusingBuildSecondHashJoinIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator12(comparator1, comparator2),
memoryManager, ioManager,
this.taskContext.getOwningNepheleTask(),
driverMemFraction,
true,
true,
false);
default:
throw new Exception("Unsupported driver strategy for full outer join driver: " + driverStrategy.name());
}
......@@ -102,8 +128,30 @@ public class FullOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
numPages,
super.taskContext.getOwningNepheleTask()
);
case FULL_OUTER_HYBRIDHASH_BUILD_FIRST:
return new NonReusingBuildFirstHashJoinIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator21(comparator1, comparator2),
memoryManager, ioManager,
this.taskContext.getOwningNepheleTask(),
driverMemFraction,
true,
true,
false);
case FULL_OUTER_HYBRIDHASH_BUILD_SECOND:
return new NonReusingBuildSecondHashJoinIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator12(comparator1, comparator2),
memoryManager, ioManager,
this.taskContext.getOwningNepheleTask(),
driverMemFraction,
true,
true,
false);
default:
throw new Exception("Unsupported driver strategy for full outer join driver: " + driverStrategy.name());
}
}
}
\ No newline at end of file
}
......@@ -141,6 +141,7 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT
this.taskContext.getOwningNepheleTask(),
fractionAvailableMemory,
false,
false,
hashJoinUseBitMaps);
break;
case HYBRIDHASH_BUILD_SECOND:
......@@ -152,6 +153,7 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT
this.taskContext.getOwningNepheleTask(),
fractionAvailableMemory,
false,
false,
hashJoinUseBitMaps);
break;
default:
......@@ -176,6 +178,7 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT
this.taskContext.getOwningNepheleTask(),
fractionAvailableMemory,
false,
false,
hashJoinUseBitMaps);
break;
case HYBRIDHASH_BUILD_SECOND:
......@@ -187,6 +190,7 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT
this.taskContext.getOwningNepheleTask(),
fractionAvailableMemory,
false,
false,
hashJoinUseBitMaps);
break;
default:
......
......@@ -24,7 +24,9 @@ import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator;
import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator;
import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator;
import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator;
import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator;
import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator;
......@@ -68,6 +70,17 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
numPages,
super.taskContext.getOwningNepheleTask()
);
case LEFT_HYBRIDHASH_BUILD_FIRST:
return new ReusingBuildFirstHashJoinIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator21(comparator1, comparator2),
memoryManager, ioManager,
this.taskContext.getOwningNepheleTask(),
driverMemFraction,
false,
true,
false);
case LEFT_HYBRIDHASH_BUILD_SECOND:
return new ReusingBuildSecondHashJoinIterator<>(in1, in2,
serializer1, comparator1,
......@@ -77,6 +90,7 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
this.taskContext.getOwningNepheleTask(),
driverMemFraction,
true,
false,
false);
default:
throw new Exception("Unsupported driver strategy for left outer join driver: " + driverStrategy.name());
......@@ -114,6 +128,17 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
numPages,
super.taskContext.getOwningNepheleTask()
);
case LEFT_HYBRIDHASH_BUILD_FIRST:
return new NonReusingBuildFirstHashJoinIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator21(comparator1, comparator2),
memoryManager, ioManager,
this.taskContext.getOwningNepheleTask(),
driverMemFraction,
false,
true,
false);
case LEFT_HYBRIDHASH_BUILD_SECOND:
return new NonReusingBuildSecondHashJoinIterator<>(in1, in2,
serializer1, comparator1,
......@@ -123,9 +148,10 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
this.taskContext.getOwningNepheleTask(),
driverMemFraction,
true,
false,
false);
default:
throw new Exception("Unsupported driver strategy for left outer join driver: " + driverStrategy.name());
}
}
}
\ No newline at end of file
}
......@@ -25,7 +25,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator;
import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator;
import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator;
import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator;
import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator;
import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator;
import org.apache.flink.runtime.operators.util.JoinTaskIterator;
......@@ -77,6 +79,18 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<
this.taskContext.getOwningNepheleTask(),
driverMemFraction,
true,
false,
false);
case RIGHT_HYBRIDHASH_BUILD_SECOND:
return new ReusingBuildSecondHashJoinIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator12(comparator1, comparator2),
memoryManager, ioManager,
this.taskContext.getOwningNepheleTask(),
driverMemFraction,
false,
true,
false);
default:
throw new Exception("Unsupported driver strategy for right outer join driver: " + driverStrategy.name());
......@@ -123,9 +137,21 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<
this.taskContext.getOwningNepheleTask(),
driverMemFraction,
true,
false,
false);
case RIGHT_HYBRIDHASH_BUILD_SECOND:
return new NonReusingBuildSecondHashJoinIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator12(comparator1, comparator2),
memoryManager, ioManager,
this.taskContext.getOwningNepheleTask(),
driverMemFraction,
false,
true,
false);
default:
throw new Exception("Unsupported driver strategy for right outer join driver: " + driverStrategy.name());
}
}
}
\ No newline at end of file
}
......@@ -53,7 +53,9 @@ public class NonReusingBuildFirstHashJoinIterator<V1, V2, O> extends HashJoinIte
private final MutableObjectIterator<V2> secondInput;
private final boolean joinWithEmptyBuildSide;
private final boolean probeSideOuterJoin;
private final boolean buildSideOuterJoin;
private volatile boolean running = true;
......@@ -70,7 +72,8 @@ public class NonReusingBuildFirstHashJoinIterator<V1, V2, O> extends HashJoinIte
MemoryManager memManager, IOManager ioManager,
AbstractInvokable ownerTask,
double memoryFraction,
boolean joinWithEmptyBuildSide,
boolean probeSideOuterJoin,
boolean buildSideOuterJoin,
boolean useBitmapFilters) throws MemoryAllocationException {
this.memManager = memManager;
......@@ -78,10 +81,11 @@ public class NonReusingBuildFirstHashJoinIterator<V1, V2, O> extends HashJoinIte
this.secondInput = secondInput;
this.probeSideSerializer = serializer2;
if(useBitmapFilters && joinWithEmptyBuildSide) {
if(useBitmapFilters && probeSideOuterJoin) {
throw new IllegalArgumentException("Bitmap filter may not be activated for joining with empty build side");
}
this.joinWithEmptyBuildSide = joinWithEmptyBuildSide;
this.probeSideOuterJoin = probeSideOuterJoin;
this.buildSideOuterJoin = buildSideOuterJoin;
this.hashJoin = getHashJoin(serializer1, comparator1, serializer2, comparator2,
pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
......@@ -91,7 +95,7 @@ public class NonReusingBuildFirstHashJoinIterator<V1, V2, O> extends HashJoinIte
@Override
public void open() throws IOException, MemoryAllocationException, InterruptedException {
this.hashJoin.open(this.firstInput, this.secondInput);
this.hashJoin.open(this.firstInput, this.secondInput, this.buildSideOuterJoin);
}
......@@ -112,14 +116,14 @@ public class NonReusingBuildFirstHashJoinIterator<V1, V2, O> extends HashJoinIte
if (this.hashJoin.nextRecord())
{
// we have a next record, get the iterators to the probe and build side values
final MutableHashTable.HashBucketIterator<V1, V2> buildSideIterator = this.hashJoin.getBuildSideIterator();
V1 nextBuildSideRecord;
final MutableObjectIterator<V1> buildSideIterator = this.hashJoin.getBuildSideIterator();
final V2 probeRecord = this.hashJoin.getCurrentProbeRecord();
V1 nextBuildSideRecord = buildSideIterator.next();
// get the first build side value
if ((nextBuildSideRecord = buildSideIterator.next()) != null) {
if (probeRecord != null && nextBuildSideRecord != null) {
V1 tmpRec;
final V2 probeRecord = this.hashJoin.getCurrentProbeRecord();
// check if there is another build-side value
if ((tmpRec = buildSideIterator.next()) != null) {
// more than one build-side value --> copy the probe side
......@@ -144,13 +148,23 @@ public class NonReusingBuildFirstHashJoinIterator<V1, V2, O> extends HashJoinIte
// only single pair matches
matchFunction.join(nextBuildSideRecord, probeRecord, collector);
}
}
else if(joinWithEmptyBuildSide) {
// build side is empty, join with null
final V2 probeRecord = this.hashJoin.getCurrentProbeRecord();
} else {
// while probe side outer join, join current probe record with null.
if (probeSideOuterJoin && probeRecord != null && nextBuildSideRecord == null) {
matchFunction.join(null, probeRecord, collector);
}
matchFunction.join(null, probeRecord, collector);
// while build side outer join, iterate all build records which have not been probed before,
// and join with null.
if (buildSideOuterJoin && probeRecord == null && nextBuildSideRecord != null) {
matchFunction.join(nextBuildSideRecord, null, collector);
while (this.running && ((nextBuildSideRecord = buildSideIterator.next()) != null)) {
matchFunction.join(nextBuildSideRecord, null, collector);
}
}
}
return true;
}
else {
......
......@@ -49,12 +49,13 @@ public class NonReusingBuildFirstReOpenableHashJoinIterator<V1, V2, O> extends N
IOManager ioManager,
AbstractInvokable ownerTask,
double memoryFraction,
boolean joinWithEmptyBuildSide,
boolean probeSideOuterJoin,
boolean buildSideOuterJoin,
boolean useBitmapFilters) throws MemoryAllocationException {
super(firstInput, secondInput, serializer1, comparator1, serializer2,
comparator2, pairComparator, memManager, ioManager, ownerTask,
memoryFraction, joinWithEmptyBuildSide, useBitmapFilters);
memoryFraction, probeSideOuterJoin, buildSideOuterJoin, useBitmapFilters);
reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
}
......
......@@ -52,7 +52,9 @@ public class NonReusingBuildSecondHashJoinIterator<V1, V2, O> extends HashJoinIt
private final MutableObjectIterator<V2> secondInput;
private final boolean joinWithEmptyBuildSide;
private final boolean buildSideOuterJoin;
private final boolean probeSideOuterJoin;
private volatile boolean running = true;
......@@ -69,7 +71,8 @@ public class NonReusingBuildSecondHashJoinIterator<V1, V2, O> extends HashJoinIt
MemoryManager memManager, IOManager ioManager,
AbstractInvokable ownerTask,
double memoryFraction,
boolean joinWithEmptyBuildSide,
boolean probeSideOuterJoin,
boolean buildSideOuterJoin,
boolean useBitmapFilters) throws MemoryAllocationException {
this.memManager = memManager;
......@@ -77,20 +80,21 @@ public class NonReusingBuildSecondHashJoinIterator<V1, V2, O> extends HashJoinIt
this.secondInput = secondInput;
this.probeSideSerializer = serializer1;
if(useBitmapFilters && joinWithEmptyBuildSide) {
if(useBitmapFilters && probeSideOuterJoin) {
throw new IllegalArgumentException("Bitmap filter may not be activated for joining with empty build side");
}
this.joinWithEmptyBuildSide = joinWithEmptyBuildSide;
this.probeSideOuterJoin = probeSideOuterJoin;
this.buildSideOuterJoin = buildSideOuterJoin;
this.hashJoin = getHashJoin(serializer2, comparator2, serializer1,
comparator1, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
}
// --------------------------------------------------------------------------------------------
@Override
public void open() throws IOException, MemoryAllocationException, InterruptedException {
this.hashJoin.open(this.secondInput, this.firstInput);
this.hashJoin.open(this.secondInput, this.firstInput, buildSideOuterJoin);
}
@Override
......@@ -110,14 +114,14 @@ public class NonReusingBuildSecondHashJoinIterator<V1, V2, O> extends HashJoinIt
if (this.hashJoin.nextRecord())
{
// we have a next record, get the iterators to the probe and build side values
final MutableHashTable.HashBucketIterator<V2, V1> buildSideIterator = this.hashJoin.getBuildSideIterator();
V2 nextBuildSideRecord;
// get the first build side value
if ((nextBuildSideRecord = buildSideIterator.next()) != null) {
final MutableObjectIterator<V2> buildSideIterator = this.hashJoin.getBuildSideIterator();
final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();
V2 nextBuildSideRecord = buildSideIterator.next();
if (probeRecord != null && nextBuildSideRecord != null) {
V2 tmpRec;
final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();
// check if there is another build-side value
if ((tmpRec = buildSideIterator.next()) != null) {
// more than one build-side value --> copy the probe side
......@@ -142,13 +146,22 @@ public class NonReusingBuildSecondHashJoinIterator<V1, V2, O> extends HashJoinIt
// only single pair matches
matchFunction.join(probeRecord, nextBuildSideRecord, collector);
}
}
else if(joinWithEmptyBuildSide) {
// build side is empty, join with null
final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();
} else {
// while probe side outer join, join current probe record with null.
if (probeSideOuterJoin && probeRecord != null && nextBuildSideRecord == null) {
matchFunction.join(probeRecord, null, collector);
}
matchFunction.join(probeRecord, null, collector);
// while build side outer join, iterate all build records which have not been probed before,
// and join with null.
if (buildSideOuterJoin && probeRecord == null && nextBuildSideRecord != null) {
matchFunction.join(null, nextBuildSideRecord, collector);
while (this.running && ((nextBuildSideRecord = buildSideIterator.next()) != null)) {
matchFunction.join(null, nextBuildSideRecord, collector);
}
}
}
return true;
}
else {
......
......@@ -49,12 +49,13 @@ public class NonReusingBuildSecondReOpenableHashJoinIterator<V1, V2, O> extends
IOManager ioManager,
AbstractInvokable ownerTask,
double memoryFraction,
boolean joinWithEmptyBuildSide,
boolean probeSideOuterJoin,
boolean buildSideOuterJoin,
boolean useBitmapFilters) throws MemoryAllocationException {
super(firstInput, secondInput, serializer1, comparator1, serializer2,
comparator2, pairComparator, memManager, ioManager, ownerTask,
memoryFraction, joinWithEmptyBuildSide, useBitmapFilters);
memoryFraction, probeSideOuterJoin, buildSideOuterJoin, useBitmapFilters);
reopenHashTable = (ReOpenableMutableHashTable<V2, V1>) hashJoin;
}
......
......@@ -63,8 +63,8 @@ public class ReOpenableMutableHashTable<BT, PT> extends MutableHashTable<BT, PT>
}
@Override
public void open(MutableObjectIterator<BT> buildSide, MutableObjectIterator<PT> probeSide) throws IOException {
super.open(buildSide, probeSide);
public void open(MutableObjectIterator<BT> buildSide, MutableObjectIterator<PT> probeSide, boolean buildSideOuterJoin) throws IOException {
super.open(buildSide, probeSide, buildSideOuterJoin);
initialPartitions = new ArrayList<HashPartition<BT, PT>>( partitionsBeingBuilt );
initialPartitionFanOut = (byte) partitionsBeingBuilt.size();
initialBucketCount = this.numBuckets;
......
......@@ -56,7 +56,9 @@ public class ReusingBuildFirstHashJoinIterator<V1, V2, O> extends HashJoinIterat
private final MutableObjectIterator<V2> secondInput;
private final boolean joinWithEmptyBuildSide;
private final boolean probeSideOuterJoin;
private final boolean buildSideOuterJoin;
private volatile boolean running = true;
......@@ -74,7 +76,8 @@ public class ReusingBuildFirstHashJoinIterator<V1, V2, O> extends HashJoinIterat
IOManager ioManager,
AbstractInvokable ownerTask,
double memoryFraction,
boolean joinWithEmptyBuildSide,
boolean probeSideOuterJoin,
boolean buildSideOuterJoin,
boolean useBitmapFilters) throws MemoryAllocationException {
this.memManager = memManager;
......@@ -82,10 +85,11 @@ public class ReusingBuildFirstHashJoinIterator<V1, V2, O> extends HashJoinIterat
this.secondInput = secondInput;
this.probeSideSerializer = serializer2;
if(useBitmapFilters && joinWithEmptyBuildSide) {
if(useBitmapFilters && probeSideOuterJoin) {
throw new IllegalArgumentException("Bitmap filter may not be activated for joining with empty build side");
}
this.joinWithEmptyBuildSide = joinWithEmptyBuildSide;
this.probeSideOuterJoin = probeSideOuterJoin;
this.buildSideOuterJoin = buildSideOuterJoin;
this.nextBuildSideObject = serializer1.createInstance();
this.tempBuildSideRecord = serializer1.createInstance();
......@@ -98,7 +102,7 @@ public class ReusingBuildFirstHashJoinIterator<V1, V2, O> extends HashJoinIterat
@Override
public void open() throws IOException, MemoryAllocationException, InterruptedException {
this.hashJoin.open(this.firstInput, this.secondInput);
this.hashJoin.open(this.firstInput, this.secondInput, buildSideOuterJoin);
}
......@@ -119,40 +123,34 @@ public class ReusingBuildFirstHashJoinIterator<V1, V2, O> extends HashJoinIterat
if (this.hashJoin.nextRecord())
{
// we have a next record, get the iterators to the probe and build side values
final MutableHashTable.HashBucketIterator<V1, V2> buildSideIterator = this.hashJoin.getBuildSideIterator();
V1 nextBuildSideRecord = this.nextBuildSideObject;
// get the first build side value
if ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null) {
V1 tmpRec = this.tempBuildSideRecord;
final V2 probeRecord = this.hashJoin.getCurrentProbeRecord();
// check if there is another build-side value
if ((tmpRec = buildSideIterator.next(tmpRec)) != null) {
final MutableObjectIterator<V1> buildSideIterator = this.hashJoin.getBuildSideIterator();
// call match on the first pair
final V2 probeRecord = this.hashJoin.getCurrentProbeRecord();
V1 nextBuildSideRecord = buildSideIterator.next(this.nextBuildSideObject);
if (probeRecord != null && nextBuildSideRecord != null) {
matchFunction.join(nextBuildSideRecord, probeRecord, collector);
while (this.running && ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null)) {
matchFunction.join(nextBuildSideRecord, probeRecord, collector);
// call match on the second pair
matchFunction.join(tmpRec, probeRecord, collector);
}
} else {
if (probeSideOuterJoin && probeRecord != null && nextBuildSideRecord == null) {
matchFunction.join(null, probeRecord, collector);
}
if (buildSideOuterJoin && probeRecord == null && nextBuildSideRecord != null) {
// call match on the first pair
matchFunction.join(nextBuildSideRecord, null, collector);
while (this.running && ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null)) {
// call match on the next pair
// make sure we restore the value of the probe side record
matchFunction.join(nextBuildSideRecord, probeRecord, collector);
matchFunction.join(nextBuildSideRecord, null, collector);
}
}
else {
// only single pair matches
matchFunction.join(nextBuildSideRecord, probeRecord, collector);
}
}
else if(joinWithEmptyBuildSide) {
// build side is empty, join with null
final V2 probeRecord = this.hashJoin.getCurrentProbeRecord();
matchFunction.join(null, probeRecord, collector);
}
return true;
}
else {
......
......@@ -49,13 +49,14 @@ public class ReusingBuildFirstReOpenableHashJoinIterator<V1, V2, O> extends Reus
IOManager ioManager,
AbstractInvokable ownerTask,
double memoryFraction,
boolean joinWithEmptyBuildSide,
boolean probeSideOuterJoin,
boolean buildSideOuterJoin,
boolean useBitmapFilters)
throws MemoryAllocationException
{
super(firstInput, secondInput, serializer1, comparator1, serializer2,
comparator2, pairComparator, memManager, ioManager, ownerTask,
memoryFraction, joinWithEmptyBuildSide, useBitmapFilters);
memoryFraction, probeSideOuterJoin, buildSideOuterJoin, useBitmapFilters);
reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
}
......
......@@ -56,7 +56,9 @@ public class ReusingBuildSecondHashJoinIterator<V1, V2, O> extends HashJoinItera
private final MutableObjectIterator<V2> secondInput;
private final boolean joinWithEmptyBuildSide;
private final boolean probeSideOuterJoin;
private final boolean buildSideOuterJoin;
private volatile boolean running = true;
......@@ -74,7 +76,8 @@ public class ReusingBuildSecondHashJoinIterator<V1, V2, O> extends HashJoinItera
IOManager ioManager,
AbstractInvokable ownerTask,
double memoryFraction,
boolean joinWithEmptyBuildSide,
boolean probeSideOuterJoin,
boolean buildSideOuterJoin,
boolean useBitmapFilters) throws MemoryAllocationException {
this.memManager = memManager;
......@@ -82,10 +85,11 @@ public class ReusingBuildSecondHashJoinIterator<V1, V2, O> extends HashJoinItera
this.secondInput = secondInput;
this.probeSideSerializer = serializer1;
if(useBitmapFilters && joinWithEmptyBuildSide) {
if(useBitmapFilters && probeSideOuterJoin) {
throw new IllegalArgumentException("Bitmap filter may not be activated for joining with empty build side");
}
this.joinWithEmptyBuildSide = joinWithEmptyBuildSide;
this.probeSideOuterJoin = probeSideOuterJoin;
this.buildSideOuterJoin = buildSideOuterJoin;
this.nextBuildSideObject = serializer2.createInstance();
this.tempBuildSideRecord = serializer2.createInstance();
......@@ -98,7 +102,7 @@ public class ReusingBuildSecondHashJoinIterator<V1, V2, O> extends HashJoinItera
@Override
public void open() throws IOException, MemoryAllocationException, InterruptedException {
this.hashJoin.open(this.secondInput, this.firstInput);
this.hashJoin.open(this.secondInput, this.firstInput, buildSideOuterJoin);
}
@Override
......@@ -118,39 +122,29 @@ public class ReusingBuildSecondHashJoinIterator<V1, V2, O> extends HashJoinItera
if (this.hashJoin.nextRecord())
{
// we have a next record, get the iterators to the probe and build side values
final MutableHashTable.HashBucketIterator<V2, V1> buildSideIterator = this.hashJoin.getBuildSideIterator();
V2 nextBuildSideRecord = this.nextBuildSideObject;
// get the first build side value
if ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null) {
V2 tmpRec = this.tempBuildSideRecord;
final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();
// check if there is another build-side value
if ((tmpRec = buildSideIterator.next(tmpRec)) != null) {
// call match on the first pair
final MutableObjectIterator<V2> buildSideIterator = this.hashJoin.getBuildSideIterator();
final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();
V2 nextBuildSideRecord = buildSideIterator.next(this.nextBuildSideObject);
if (probeRecord != null && nextBuildSideRecord != null) {
matchFunction.join(probeRecord, nextBuildSideRecord, collector);
while (this.running && ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null)) {
matchFunction.join(probeRecord, nextBuildSideRecord, collector);
// call match on the second pair
matchFunction.join(probeRecord, tmpRec, collector);
}
} else {
if (probeSideOuterJoin && probeRecord != null && nextBuildSideRecord == null) {
matchFunction.join(probeRecord, null, collector);
}
if (buildSideOuterJoin && probeRecord == null && nextBuildSideRecord != null) {
matchFunction.join(null, nextBuildSideRecord, collector);
while (this.running && ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null)) {
// call match on the next pair
// make sure we restore the value of the probe side record
matchFunction.join(probeRecord, nextBuildSideRecord, collector);
matchFunction.join(null, nextBuildSideRecord, collector);
}
}
else {
// only single pair matches
matchFunction.join(probeRecord, nextBuildSideRecord, collector);
}
}
else if(joinWithEmptyBuildSide) {
// build side is empty, join with null
final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();
matchFunction.join(probeRecord, null, collector);
}
return true;
}
else {
......
......@@ -49,12 +49,13 @@ public class ReusingBuildSecondReOpenableHashJoinIterator<V1, V2, O> extends Reu
IOManager ioManager,
AbstractInvokable ownerTask,
double memoryFraction,
boolean joinWithEmptyBuildSide,
boolean probeSideOuterJoin,
boolean buildSideOuterJoin,
boolean useBitmapFilters) throws MemoryAllocationException {
super(firstInput, secondInput, serializer1, comparator1, serializer2,
comparator2, pairComparator, memManager, ioManager, ownerTask,
memoryFraction, joinWithEmptyBuildSide, useBitmapFilters);
memoryFraction, probeSideOuterJoin, buildSideOuterJoin, useBitmapFilters);
reopenHashTable = (ReOpenableMutableHashTable<V2, V1>) hashJoin;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.operators.util;
import com.google.common.base.Preconditions;
import org.apache.flink.core.memory.MemorySegment;
/**
 * A fixed-size bit set whose bits are stored in a region of a {@link MemorySegment}.
 *
 * <p>The size (in bytes) is fixed at construction time. The backing memory is attached
 * afterwards via {@link #setMemorySegment(MemorySegment, int)} and may be re-attached to a
 * different segment/offset later. Bit {@code i} is stored in the byte at
 * {@code offset + (i / 8)}, at bit position {@code i % 8} counted from the least significant bit.
 *
 * <p>{@link #set(int)}, {@link #get(int)} and {@link #clear()} require that a memory segment
 * has been attached before they are called.
 */
public class BitSet {

	/** Clears the three low bits of a bit index; shifted right by 3 this yields the byte index. */
	private static final int BYTE_POSITION_MASK = 0xfffffff8;

	/** Selects the three low bits of a bit index, i.e. the position of the bit inside its byte. */
	private static final int BYTE_INDEX_MASK = 0x00000007;

	/** Largest byte size whose bit count ({@code byteSize * 8}) still fits into an int. */
	private static final int MAX_BYTE_SIZE = Integer.MAX_VALUE >>> 3;

	/** The memory segment that holds the bits; {@code null} until a segment is attached. */
	private MemorySegment memorySegment;

	/** Offset (in bytes) inside the memory segment at which this bit set starts. */
	private int offset;

	/** The size of this bit set in bytes. */
	private final int byteLength;

	/** The size of this bit set in bits. */
	private final int bitLength;

	/**
	 * Creates a bit set covering {@code byteSize} bytes. No memory is attached yet.
	 *
	 * @param byteSize size of the bit set in bytes, must be greater than 0 and small enough
	 *                 that the number of bits fits into an int
	 * @throws IllegalArgumentException if {@code byteSize} is out of that range
	 */
	public BitSet(int byteSize) {
		Preconditions.checkArgument(byteSize > 0, "bits size should be greater than 0.");
		// guard against a silent int overflow of byteSize << 3
		Preconditions.checkArgument(byteSize <= MAX_BYTE_SIZE, "bits size is too large to be addressed by an int index.");
		this.byteLength = byteSize;
		this.bitLength = byteSize << 3;
	}

	/**
	 * Attaches the memory that backs this bit set. The content of the segment is not modified;
	 * call {@link #clear()} to reset all bits.
	 *
	 * @param memorySegment the segment holding the bits, must not be null
	 * @param offset byte offset inside the segment at which the bit set starts, must not be negative
	 * @throws IllegalArgumentException if the segment is null, the offset is negative, or the
	 *                                  segment is too small to hold the bit set at that offset
	 */
	public void setMemorySegment(MemorySegment memorySegment, int offset) {
		Preconditions.checkArgument(memorySegment != null, "MemorySegment can not be null.");
		Preconditions.checkArgument(offset >= 0, "Offset should be positive integer.");
		// written as a subtraction so that a very large offset cannot overflow the comparison
		Preconditions.checkArgument(offset <= memorySegment.size() - byteLength,
			"Could not set MemorySegment, the remain buffers is not enough.");
		this.memorySegment = memorySegment;
		this.offset = offset;
	}

	/**
	 * Sets the bit at specified index.
	 *
	 * @param index - position
	 * @throws IllegalArgumentException if {@code index} is negative or not smaller than {@link #bitSize()}
	 */
	public void set(int index) {
		checkIndex(index);

		int byteIndex = (index & BYTE_POSITION_MASK) >>> 3;
		byte current = memorySegment.get(offset + byteIndex);
		current |= (1 << (index & BYTE_INDEX_MASK));
		memorySegment.put(offset + byteIndex, current);
	}

	/**
	 * Returns true if the bit is set in the specified index.
	 *
	 * @param index - position
	 * @return - value at the bit position
	 * @throws IllegalArgumentException if {@code index} is negative or not smaller than {@link #bitSize()}
	 */
	public boolean get(int index) {
		checkIndex(index);

		int byteIndex = (index & BYTE_POSITION_MASK) >>> 3;
		byte current = memorySegment.get(offset + byteIndex);
		return (current & (1 << (index & BYTE_INDEX_MASK))) != 0;
	}

	/**
	 * Number of bits
	 */
	public int bitSize() {
		return bitLength;
	}

	/**
	 * Clear the bit set.
	 */
	public void clear() {
		for (int i = 0; i < byteLength; i++) {
			memorySegment.put(offset + i, (byte) 0);
		}
	}

	@Override
	public String toString() {
		StringBuilder output = new StringBuilder();
		output.append("BitSet:\n");
		// toString() must not fail when no segment has been attached yet
		output.append("\tMemorySegment:");
		if (memorySegment == null) {
			output.append("null");
		} else {
			output.append(memorySegment.size());
		}
		output.append("\n");
		output.append("\tOffset:").append(offset).append("\n");
		output.append("\tLength:").append(byteLength).append("\n");
		return output.toString();
	}

	/**
	 * Verifies that the given bit index lies inside this bit set. The error message is only
	 * built when the check fails, so valid accesses do not pay for string formatting.
	 */
	private void checkIndex(int index) {
		if (index < 0 || index >= bitLength) {
			throw new IllegalArgumentException(String.format(
				"Input Index[%d] is out of the BitSet valid range [0, %d).", index, bitLength));
		}
	}
}
......@@ -172,7 +172,7 @@ public class HashTableITCase {
int numRecordsInJoinResult = 0;
while (join.nextRecord()) {
HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
MutableObjectIterator<Record> buildSide = join.getBuildSideIterator();
while (buildSide.next(recordReuse) != null) {
numRecordsInJoinResult++;
}
......@@ -222,7 +222,7 @@ public class HashTableITCase {
int numRecordsInJoinResult = 0;
while (join.nextRecord()) {
HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
MutableObjectIterator<Record> buildSide = join.getBuildSideIterator();
while (buildSide.next(recordReuse) != null) {
numRecordsInJoinResult++;
}
......@@ -279,7 +279,7 @@ public class HashTableITCase {
int key = 0;
HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
MutableObjectIterator<Record> buildSide = join.getBuildSideIterator();
if ((record = buildSide.next(recordReuse)) != null) {
numBuildValues = 1;
key = record.getField(0, IntValue.class).getValue();
......@@ -392,7 +392,7 @@ public class HashTableITCase {
final Record probeRec = join.getCurrentProbeRecord();
int key = probeRec.getField(0, IntValue.class).getValue();
HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
MutableObjectIterator<Record> buildSide = join.getBuildSideIterator();
if ((record = buildSide.next(recordReuse)) != null) {
numBuildValues = 1;
Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
......@@ -505,7 +505,7 @@ public class HashTableITCase {
final Record probeRec = join.getCurrentProbeRecord();
int key = probeRec.getField(0, IntValue.class).getValue();
HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
MutableObjectIterator<Record> buildSide = join.getBuildSideIterator();
if ((record = buildSide.next(recordReuse)) != null) {
numBuildValues = 1;
Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
......@@ -608,7 +608,7 @@ public class HashTableITCase {
try {
while (join.nextRecord()) {
HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
MutableObjectIterator<Record> buildSide = join.getBuildSideIterator();
if (buildSide.next(recordReuse) == null) {
fail("No build side values found for a probe key.");
}
......@@ -666,7 +666,7 @@ public class HashTableITCase {
int numRecordsInJoinResult = 0;
while (join.nextRecord()) {
HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
MutableObjectIterator<Record> buildSide = join.getBuildSideIterator();
while (buildSide.next(recordReuse) != null) {
numRecordsInJoinResult++;
}
......@@ -715,7 +715,7 @@ public class HashTableITCase {
* NUM_PROBE_VALS;
while (join.nextRecord()) {
HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
MutableObjectIterator<Record> buildSide = join.getBuildSideIterator();
while (buildSide.next(recordReuse) != null) {
numRecordsInJoinResult++;
}
......@@ -770,7 +770,7 @@ public class HashTableITCase {
int numRecordsInJoinResult = 0;
while (join.nextRecord()) {
HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
while (buildSide.next(recordReuse) != null) {
numRecordsInJoinResult++;
}
......@@ -819,7 +819,7 @@ public class HashTableITCase {
int numRecordsInJoinResult = 0;
while (join.nextRecord()) {
HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
while (buildSide.next(recordReuse) != null) {
numRecordsInJoinResult++;
}
......@@ -879,7 +879,7 @@ public class HashTableITCase {
int key = 0;
HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
if ((record = buildSide.next(recordReuse)) != null) {
numBuildValues = 1;
key = record.getKey();
......@@ -995,7 +995,7 @@ public class HashTableITCase {
final IntPair probeRec = join.getCurrentProbeRecord();
int key = probeRec.getKey();
HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
if ((record = buildSide.next(recordReuse)) != null) {
numBuildValues = 1;
Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getKey());
......@@ -1107,7 +1107,7 @@ public class HashTableITCase {
final IntPair probeRec = join.getCurrentProbeRecord();
int key = probeRec.getKey();
HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
if ((record = buildSide.next(recordReuse)) != null) {
numBuildValues = 1;
Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getKey());
......@@ -1210,7 +1210,7 @@ public class HashTableITCase {
try {
while (join.nextRecord())
{
HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
if (buildSide.next(recordReuse) == null) {
fail("No build side values found for a probe key.");
}
......@@ -1267,7 +1267,7 @@ public class HashTableITCase {
int numRecordsInJoinResult = 0;
while (join.nextRecord()) {
HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
while (buildSide.next(recordReuse) != null) {
numRecordsInJoinResult++;
}
......@@ -1316,7 +1316,7 @@ public class HashTableITCase {
* NUM_PROBE_VALS;
while (join.nextRecord()) {
HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
while (buildSide.next(recordReuse) != null) {
numRecordsInJoinResult++;
}
......@@ -1363,7 +1363,7 @@ public class HashTableITCase {
int numRecordsInJoinResult = 0;
while (join.nextRecord()) {
HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
while (buildSide.next(recordReuse) != null) {
numRecordsInJoinResult++;
}
......@@ -1386,7 +1386,7 @@ public class HashTableITCase {
numRecordsInJoinResult = 0;
while (join.nextRecord()) {
HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
while (buildSide.next(recordReuse) != null) {
numRecordsInJoinResult++;
}
......@@ -1439,7 +1439,7 @@ public class HashTableITCase {
int numRecordsInJoinResult = 0;
while (join.nextRecord()) {
HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
while (buildSide.next(recordReuse) != null) {
numRecordsInJoinResult++;
}
......@@ -1462,7 +1462,7 @@ public class HashTableITCase {
numRecordsInJoinResult = 0;
while (join.nextRecord()) {
HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
while (buildSide.next(recordReuse) != null) {
numRecordsInJoinResult++;
}
......@@ -1524,7 +1524,7 @@ public class HashTableITCase {
int numRecordsInJoinResult = 0;
while (join.nextRecord()) {
HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
while (buildSide.next(recordReuse) != null) {
numRecordsInJoinResult++;
}
......@@ -1534,6 +1534,106 @@ public class HashTableITCase {
join.close();
this.memManager.release(join.getFreedMemory());
}
/**
 * Opens the hash table in build-side outer join mode with a build input of twice as many
 * keys as the probe input, and checks that the number of build-side records handed out
 * equals the size of the build input.
 */
@Test
public void testHashWithBuildSideOuterJoin1() throws Exception {
	final int keyCount = 20000;
	final int buildValuesPerKey = 1;
	final int probeValuesPerKey = 1;

	// build input: 40000 pairs, 1 value per key
	MutableObjectIterator<IntPair> buildSideInput = new UniformIntPairGenerator(2 * keyCount, buildValuesPerKey, false);

	// probe input: 20000 pairs, 1 value per key
	MutableObjectIterator<IntPair> probeSideInput = new UniformIntPairGenerator(keyCount, probeValuesPerKey, true);

	// reserve the memory pages for the hash table
	List<MemorySegment> pages;
	try {
		// 33 pages is the minimum the hash join needs for these inputs
		pages = this.memManager.allocatePages(MEM_OWNER, 33);
	}
	catch (MemoryAllocationException maex) {
		fail("Memory for the Join could not be provided.");
		return;
	}

	// ----------------------------------------------------------------------------------------

	final MutableHashTable<IntPair, IntPair> hashTable = new MutableHashTable<IntPair, IntPair>(
		this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
		this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
		pages, ioManager);
	// third argument switches on the build-side outer join behaviour
	hashTable.open(buildSideInput, probeSideInput, true);

	final IntPair reuse = new IntPair();
	int resultCount = 0;

	// count every build-side record returned for every step of the table
	while (hashTable.nextRecord()) {
		MutableObjectIterator<IntPair> buildSideRecords = hashTable.getBuildSideIterator();
		for (IntPair rec = buildSideRecords.next(reuse); rec != null; rec = buildSideRecords.next(reuse)) {
			resultCount++;
		}
	}

	final int expectedCount = 2 * keyCount * buildValuesPerKey * probeValuesPerKey;
	Assert.assertEquals("Wrong number of records in join result.", expectedCount, resultCount);

	hashTable.close();
	this.memManager.release(hashTable.getFreedMemory());
}
/**
 * Opens the hash table in build-side outer join mode with build and probe inputs that share
 * the same keys. Per the test setup every build record is expected to be matched during
 * probing, so the table must never report a step where both the probe record and the
 * build-side iterator are empty.
 */
@Test
public void testHashWithBuildSideOuterJoin2() throws Exception {
	final int keyCount = 40000;
	final int buildValuesPerKey = 2;
	final int probeValuesPerKey = 1;

	// Both sides use the same keys, so no unmatched build records should remain after the
	// probe phase; the build side outer join has to cope with that case as well.

	// build input: 80000 pairs, 2 values per key
	MutableObjectIterator<IntPair> buildSideInput = new UniformIntPairGenerator(keyCount, buildValuesPerKey, false);

	// probe input: 40000 pairs, 1 value per key
	MutableObjectIterator<IntPair> probeSideInput = new UniformIntPairGenerator(keyCount, probeValuesPerKey, true);

	// reserve the memory pages for the hash table
	List<MemorySegment> pages;
	try {
		// 33 pages is the minimum the hash join needs for these inputs
		pages = this.memManager.allocatePages(MEM_OWNER, 33);
	}
	catch (MemoryAllocationException maex) {
		fail("Memory for the Join could not be provided.");
		return;
	}

	// ----------------------------------------------------------------------------------------

	final MutableHashTable<IntPair, IntPair> hashTable = new MutableHashTable<IntPair, IntPair>(
		this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
		this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
		pages, ioManager);
	// third argument switches on the build-side outer join behaviour
	hashTable.open(buildSideInput, probeSideInput, true);

	final IntPair reuse = new IntPair();
	int resultCount = 0;

	while (hashTable.nextRecord()) {
		MutableObjectIterator<IntPair> buildSideRecords = hashTable.getBuildSideIterator();
		IntPair current = buildSideRecords.next(reuse);

		// a step without a build record must at least carry a probe record
		if (current == null && hashTable.getCurrentProbeRecord() == null) {
			fail("Should not return join result that both probe and build element are null.");
		}

		for (; current != null; current = buildSideRecords.next(reuse)) {
			resultCount++;
		}
	}

	final int expectedCount = keyCount * buildValuesPerKey * probeValuesPerKey;
	Assert.assertEquals("Wrong number of records in join result.", expectedCount, resultCount);

	hashTable.close();
	this.memManager.release(hashTable.getFreedMemory());
}
// ============================================================================================
......
......@@ -131,7 +131,7 @@ public class HashTableTest {
try {
while (table.nextRecord()) {
MutableHashTable.HashBucketIterator<Tuple2<Long, byte[]>, Long> matches = table.getBuildSideIterator();
MutableObjectIterator<Tuple2<Long, byte[]>> matches = table.getBuildSideIterator();
while (matches.next() != null);
}
}
......@@ -240,7 +240,7 @@ public class HashTableTest {
new ByteArrayIterator(1, 128,(byte) 1)));
while(table.nextRecord()) {
MutableHashTable.HashBucketIterator<byte[], byte[]> iterator = table.getBuildSideIterator();
MutableObjectIterator<byte[]> iterator = table.getBuildSideIterator();
int counter = 0;
......
......@@ -147,7 +147,7 @@ public class NonReusingHashJoinIteratorITCase {
new NonReusingBuildFirstHashJoinIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
iterator.open();
......@@ -234,7 +234,7 @@ public class NonReusingHashJoinIteratorITCase {
new NonReusingBuildFirstHashJoinIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
iterator.open();
......@@ -283,7 +283,7 @@ public class NonReusingHashJoinIteratorITCase {
new NonReusingBuildSecondHashJoinIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
iterator.open();
......@@ -370,7 +370,7 @@ public class NonReusingHashJoinIteratorITCase {
new NonReusingBuildSecondHashJoinIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
iterator.open();
......@@ -417,7 +417,7 @@ public class NonReusingHashJoinIteratorITCase {
new NonReusingBuildSecondHashJoinIterator<>(
input1, input2, this.pairSerializer, this.pairComparator,
this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
this.memoryManager, this.ioManager, this.parentTask, 1.0, false, true);
this.memoryManager, this.ioManager, this.parentTask, 1.0, false, false, true);
iterator.open();
......@@ -464,7 +464,7 @@ public class NonReusingHashJoinIteratorITCase {
new NonReusingBuildFirstHashJoinIterator<>(
input1, input2, this.pairSerializer, this.pairComparator,
this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
this.memoryManager, this.ioManager, this.parentTask, 1.0, false, true);
this.memoryManager, this.ioManager, this.parentTask, 1.0, false, false, true);
iterator.open();
......@@ -484,43 +484,43 @@ public class NonReusingHashJoinIteratorITCase {
Assert.fail("An exception occurred during the test: " + e.getMessage());
}
}
@Test
public void testBuildFirstJoinWithEmptyBuild() {
public void testBuildFirstAndProbeSideOuterJoin() {
try {
TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
TupleGenerator generator2 = new TupleGenerator(SEED2, 1000, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
// collect expected data
final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = rightOuterJoinTuples(
collectTupleData(input1),
collectTupleData(input2));
final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
// reset the generators
generator1.reset();
generator2.reset();
input1.reset();
input2.reset();
// compare with iterator values
NonReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
new NonReusingBuildFirstHashJoinIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, true, false);
this.memoryManager, ioManager, this.parentTask, 1.0, true, false, false);
iterator.open();
while (iterator.callWithNextKey(matcher, collector));
iterator.close();
// assert that each expected match was seen
for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
......@@ -533,43 +533,190 @@ public class NonReusingHashJoinIteratorITCase {
Assert.fail("An exception occurred during the test: " + e.getMessage());
}
}
/**
 * Runs a hash join that builds on the first input with only the second of the two
 * outer-join flags enabled, and checks the produced pairs against
 * {@code leftOuterJoinTuples(input1, input2)}.
 *
 * <p>Judging by the sibling test names, the second flag is the build-side outer join
 * switch; the meaning of the trailing {@code false} is not visible here - TODO confirm
 * against the iterator constructor.
 */
@Test
public void testBuildFirstAndBuildSideOuterJoin() {
	try {
		final TupleGenerator leftGenerator = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
		final TupleGenerator rightGenerator = new TupleGenerator(SEED2, 1000, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);

		final TestData.TupleGeneratorIterator leftInput = new TestData.TupleGeneratorIterator(leftGenerator, INPUT_1_SIZE);
		final TestData.TupleGeneratorIterator rightInput = new TestData.TupleGeneratorIterator(rightGenerator, INPUT_2_SIZE);

		// reference result, computed in memory from a first pass over both inputs
		final Map<Integer, Collection<TupleMatch>> pendingMatches = leftOuterJoinTuples(
			collectTupleData(leftInput),
			collectTupleData(rightInput));

		// the join function presumably removes every pair it receives from pendingMatches
		final FlatJoinFunction joinFunction = new TupleMatchRemovingJoin(pendingMatches);
		final Collector<Tuple2<Integer, String>> sink = new DiscardingOutputCollector<>();

		// rewind generators and iterators so the join reads the same data again
		leftGenerator.reset();
		rightGenerator.reset();
		leftInput.reset();
		rightInput.reset();

		// run the join under test
		final NonReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinIterator =
			new NonReusingBuildFirstHashJoinIterator<>(
				leftInput, rightInput, this.recordSerializer, this.record1Comparator,
				this.recordSerializer, this.record2Comparator, this.recordPairComparator,
				this.memoryManager, ioManager, this.parentTask, 1.0, false, true, false);

		joinIterator.open();

		boolean moreKeys = true;
		while (moreKeys) {
			moreKeys = joinIterator.callWithNextKey(joinFunction, sink);
		}

		joinIterator.close();

		// anything still left in the reference result was never produced by the join
		for (Entry<Integer, Collection<TupleMatch>> remaining : pendingMatches.entrySet()) {
			if (remaining.getValue().isEmpty()) {
				continue;
			}
			Assert.fail("Collection for key " + remaining.getKey() + " is not empty");
		}
	}
	catch (Exception ex) {
		ex.printStackTrace();
		Assert.fail("An exception occurred during the test: " + ex.getMessage());
	}
}
/**
 * Runs a hash join that builds on the first input with both outer-join flags enabled
 * and checks the produced pairs against {@code fullOuterJoinTuples(input1, input2)}.
 */
@Test
// NOTE(review): two method headers follow back to back. This span looks like a rendered diff in which
// the first header (testBuildSecondJoinWithEmptyBuild) is the pre-change line and the second one its
// replacement - confirm before treating this as compilable source.
public void testBuildSecondJoinWithEmptyBuild() {
public void testBuildFirstAndFullOuterJoin() {
try {
TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
TupleGenerator generator2 = new TupleGenerator(SEED2, 1000, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
// collect expected data
final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = fullOuterJoinTuples(
collectTupleData(input1),
collectTupleData(input2));
// the matcher presumably removes every pair it receives from expectedMatchesMap
final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
// reset the generators
// (the inputs were consumed once while collecting the expected data above)
generator1.reset();
generator2.reset();
input1.reset();
input2.reset();
// compare with iterator values
NonReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
new NonReusingBuildFirstHashJoinIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, true, true, false);
iterator.open();
// drain the join; all work happens inside callWithNextKey, hence the empty loop body
while (iterator.callWithNextKey(matcher, collector));
iterator.close();
// assert that each expected match was seen
for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
Assert.fail("Collection for key " + entry.getKey() + " is not empty");
}
}
}
catch (Exception e) {
// any exception is reported as a test failure after printing its stack trace
e.printStackTrace();
Assert.fail("An exception occurred during the test: " + e.getMessage());
}
}
/**
 * Runs a hash join that builds on the second input with only the first of the two
 * outer-join flags enabled, and checks the produced pairs against
 * {@code leftOuterJoinTuples(input1, input2)}.
 */
@Test
public void testBuildSecondAndProbeSideOuterJoin() {
try {
TupleGenerator generator1 = new TupleGenerator(SEED1, 1000, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
// collect expected data
final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = leftOuterJoinTuples(
collectTupleData(input1),
collectTupleData(input2));
// the matcher presumably removes every pair it receives from expectedMatchesMap
final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
// reset the generators
// (the inputs were consumed once while collecting the expected data above)
generator1.reset();
generator2.reset();
input1.reset();
input2.reset();
// compare with iterator values
NonReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
new NonReusingBuildSecondHashJoinIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
// NOTE(review): two closing argument lines follow. The first has one boolean fewer and looks like
// the pre-change line of a rendered diff; the second looks like its replacement - confirm.
this.memoryManager, ioManager, this.parentTask, 1.0, true, false);
this.memoryManager, ioManager, this.parentTask, 1.0, true, false, false);
iterator.open();
// drain the join; all work happens inside callWithNextKey, hence the empty loop body
while (iterator.callWithNextKey(matcher, collector));
iterator.close();
// assert that each expected match was seen
for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
Assert.fail("Collection for key " + entry.getKey() + " is not empty");
}
}
}
catch (Exception e) {
// any exception is reported as a test failure after printing its stack trace
e.printStackTrace();
Assert.fail("An exception occurred during the test: " + e.getMessage());
}
}
@Test
public void testBuildSecondAndBuildSideOuterJoin() {
try {
TupleGenerator generator1 = new TupleGenerator(SEED1, 1000, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
// collect expected data
final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = rightOuterJoinTuples(
collectTupleData(input1),
collectTupleData(input2));
final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
// reset the generators
generator1.reset();
generator2.reset();
input1.reset();
input2.reset();
// compare with iterator values
NonReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
new NonReusingBuildSecondHashJoinIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, false, true, false);
iterator.open();
while (iterator.callWithNextKey(matcher, collector));
iterator.close();
// assert that each expected match was seen
for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
......@@ -583,6 +730,55 @@ public class NonReusingHashJoinIteratorITCase {
}
}
/**
 * Runs a hash join that builds on the second input with both outer-join flags enabled
 * and checks the produced pairs against {@code fullOuterJoinTuples(input1, input2)}.
 */
@Test
public void testBuildSecondAndFullOuterJoin() {
	try {
		final TupleGenerator leftGenerator = new TupleGenerator(SEED1, 1000, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
		final TupleGenerator rightGenerator = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);

		final TestData.TupleGeneratorIterator leftInput = new TestData.TupleGeneratorIterator(leftGenerator, INPUT_1_SIZE);
		final TestData.TupleGeneratorIterator rightInput = new TestData.TupleGeneratorIterator(rightGenerator, INPUT_2_SIZE);

		// reference result, computed in memory from a first pass over both inputs
		final Map<Integer, Collection<TupleMatch>> pendingMatches = fullOuterJoinTuples(
			collectTupleData(leftInput),
			collectTupleData(rightInput));

		// the join function presumably removes every pair it receives from pendingMatches
		final FlatJoinFunction joinFunction = new TupleMatchRemovingJoin(pendingMatches);
		final Collector<Tuple2<Integer, String>> sink = new DiscardingOutputCollector<>();

		// rewind generators and iterators so the join reads the same data again
		leftGenerator.reset();
		rightGenerator.reset();
		leftInput.reset();
		rightInput.reset();

		// run the join under test
		final NonReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinIterator =
			new NonReusingBuildSecondHashJoinIterator<>(
				leftInput, rightInput, this.recordSerializer, this.record1Comparator,
				this.recordSerializer, this.record2Comparator, this.recordPairComparator,
				this.memoryManager, ioManager, this.parentTask, 1.0, true, true, false);

		joinIterator.open();

		boolean moreKeys = true;
		while (moreKeys) {
			moreKeys = joinIterator.callWithNextKey(joinFunction, sink);
		}

		joinIterator.close();

		// anything still left in the reference result was never produced by the join
		for (Entry<Integer, Collection<TupleMatch>> remaining : pendingMatches.entrySet()) {
			if (remaining.getValue().isEmpty()) {
				continue;
			}
			Assert.fail("Collection for key " + remaining.getKey() + " is not empty");
		}
	}
	catch (Exception ex) {
		ex.printStackTrace();
		Assert.fail("An exception occurred during the test: " + ex.getMessage());
	}
}
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
......@@ -680,6 +876,53 @@ public class NonReusingHashJoinIteratorITCase {
return map;
}
/**
 * Builds the reference result of a full outer join over two key-grouped inputs.
 *
 * <p>For every key of {@code rightMap}, each right value is paired with every left value
 * stored under the same key, or with {@code null} when {@code leftMap} has no entry for
 * that key. Afterwards every key of {@code leftMap} without an entry in {@code rightMap}
 * contributes one pair per left value with {@code null} as the right side.
 *
 * @param leftMap values of the left input grouped by key
 * @param rightMap values of the right input grouped by key
 * @return a new map from join key to the pairs expected for that key
 */
public static Map<Integer, Collection<TupleMatch>> fullOuterJoinTuples(
	Map<Integer, Collection<String>> leftMap,
	Map<Integer, Collection<String>> rightMap)
{
	final Map<Integer, Collection<TupleMatch>> result = new HashMap<>();

	// pass 1: all right-side keys, matched against the left side where possible
	for (Map.Entry<Integer, Collection<String>> rightEntry : rightMap.entrySet()) {
		final Integer key = rightEntry.getKey();
		final Collection<String> leftValues = leftMap.get(key);

		Collection<TupleMatch> matches = result.get(key);
		if (matches == null) {
			matches = new ArrayList<>();
			result.put(key, matches);
		}

		for (String rightValue : rightEntry.getValue()) {
			if (leftValues == null) {
				// no left partner for this key: right value is emitted alone
				matches.add(new TupleMatch(null, rightValue));
				continue;
			}
			for (String leftValue : leftValues) {
				matches.add(new TupleMatch(leftValue, rightValue));
			}
		}
	}

	// pass 2: left-side keys that never occurred on the right side
	for (Map.Entry<Integer, Collection<String>> leftEntry : leftMap.entrySet()) {
		final Integer key = leftEntry.getKey();
		if (rightMap.get(key) != null) {
			// already covered by pass 1
			continue;
		}

		Collection<TupleMatch> matches = result.get(key);
		if (matches == null) {
			matches = new ArrayList<>();
			result.put(key, matches);
		}

		for (String leftValue : leftEntry.getValue()) {
			matches.add(new TupleMatch(leftValue, null));
		}
	}

	return result;
}
public static Map<Integer, Collection<TupleIntPairMatch>> joinIntPairs(
Map<Integer, Collection<Integer>> leftMap,
......
......@@ -29,7 +29,6 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch;
import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
......@@ -228,7 +227,7 @@ public class NonReusingReOpenableHashTableITCase {
new NonReusingBuildFirstReOpenableHashJoinIterator<>(
buildInput, probeInput, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
iterator.open();
// do first join with both inputs
......@@ -342,8 +341,8 @@ public class NonReusingReOpenableHashTableITCase {
final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
Integer key = probeRec.f0;
HashBucketIterator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
MutableObjectIterator<Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
if ((record = buildSide.next(recordReuse)) != null) {
numBuildValues = 1;
Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
......@@ -456,8 +455,8 @@ public class NonReusingReOpenableHashTableITCase {
final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
Integer key = probeRec.f0;
HashBucketIterator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
MutableObjectIterator<Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
if ((record = buildSide.next(recordReuse)) != null) {
numBuildValues = 1;
Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
......
......@@ -55,6 +55,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.fullOuterJoinTuples;
import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.joinTuples;
import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.leftOuterJoinTuples;
import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.rightOuterJoinTuples;
......@@ -155,7 +156,7 @@ public class ReusingHashJoinIteratorITCase {
new ReusingBuildFirstHashJoinIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
iterator.open();
......@@ -242,7 +243,7 @@ public class ReusingHashJoinIteratorITCase {
new ReusingBuildFirstHashJoinIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
iterator.open();
......@@ -291,7 +292,7 @@ public class ReusingHashJoinIteratorITCase {
new ReusingBuildSecondHashJoinIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
iterator.open();
......@@ -378,7 +379,7 @@ public class ReusingHashJoinIteratorITCase {
new ReusingBuildSecondHashJoinIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
iterator.open();
......@@ -425,7 +426,7 @@ public class ReusingHashJoinIteratorITCase {
new ReusingBuildSecondHashJoinIterator<>(
input1, input2, this.pairSerializer, this.pairComparator,
this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
this.memoryManager, this.ioManager, this.parentTask, 1.0, false, true);
this.memoryManager, this.ioManager, this.parentTask, 1.0, false, false, true);
iterator.open();
......@@ -472,7 +473,7 @@ public class ReusingHashJoinIteratorITCase {
new ReusingBuildFirstHashJoinIterator<>(
input1, input2, this.pairSerializer, this.pairComparator,
this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
this.memoryManager, this.ioManager, this.parentTask, 1.0, false, true);
this.memoryManager, this.ioManager, this.parentTask, 1.0, false, false, true);
iterator.open();
......@@ -492,9 +493,9 @@ public class ReusingHashJoinIteratorITCase {
Assert.fail("An exception occurred during the test: " + e.getMessage());
}
}
@Test
public void testBuildFirstJoinWithEmptyBuild() {
public void testBuildFirstAndProbeSideOuterJoin() {
try {
TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 1000, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
......@@ -506,7 +507,7 @@ public class ReusingHashJoinIteratorITCase {
final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = rightOuterJoinTuples(
collectTupleData(input1),
collectTupleData(input2));
final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
......@@ -515,20 +516,69 @@ public class ReusingHashJoinIteratorITCase {
generator2.reset();
input1.reset();
input2.reset();
// compare with iterator values
ReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
new ReusingBuildFirstHashJoinIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, true, false);
this.memoryManager, ioManager, this.parentTask, 1.0, true, false, false);
iterator.open();
while (iterator.callWithNextKey(matcher, collector));
iterator.close();
// assert that each expected match was seen
for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
Assert.fail("Collection for key " + entry.getKey() + " is not empty");
}
}
}
catch (Exception e) {
e.printStackTrace();
Assert.fail("An exception occurred during the test: " + e.getMessage());
}
}
@Test
public void testBuildFirstAndBuildSideOuterJoin() {
try {
TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 1000, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
// collect expected data
final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = leftOuterJoinTuples(
collectTupleData(input1),
collectTupleData(input2));
final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
// reset the generators
generator1.reset();
generator2.reset();
input1.reset();
input2.reset();
// compare with iterator values
ReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
new ReusingBuildFirstHashJoinIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, false, true, false);
iterator.open();
while (iterator.callWithNextKey(matcher, collector));
iterator.close();
// assert that each expected match was seen
for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
......@@ -541,43 +591,190 @@ public class ReusingHashJoinIteratorITCase {
Assert.fail("An exception occurred during the test: " + e.getMessage());
}
}
/**
 * Runs an object-reusing hash join that builds on the first input with both outer-join
 * flags enabled and checks the produced pairs against
 * {@code fullOuterJoinTuples(input1, input2)}.
 */
@Test
public void testBuildFirstAndFullOuterJoin() {
	try {
		final TestData.TupleGenerator leftGenerator = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
		final TestData.TupleGenerator rightGenerator = new TestData.TupleGenerator(SEED2, 1000, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);

		final TestData.TupleGeneratorIterator leftInput = new TestData.TupleGeneratorIterator(leftGenerator, INPUT_1_SIZE);
		final TestData.TupleGeneratorIterator rightInput = new TestData.TupleGeneratorIterator(rightGenerator, INPUT_2_SIZE);

		// reference result, computed in memory from a first pass over both inputs
		final Map<Integer, Collection<TupleMatch>> pendingMatches = fullOuterJoinTuples(
			collectTupleData(leftInput),
			collectTupleData(rightInput));

		// the join function presumably removes every pair it receives from pendingMatches
		final FlatJoinFunction joinFunction = new TupleMatchRemovingJoin(pendingMatches);
		final Collector<Tuple2<Integer, String>> sink = new DiscardingOutputCollector<>();

		// rewind generators and iterators so the join reads the same data again
		leftGenerator.reset();
		rightGenerator.reset();
		leftInput.reset();
		rightInput.reset();

		// run the join under test
		final ReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinIterator =
			new ReusingBuildFirstHashJoinIterator<>(
				leftInput, rightInput, this.recordSerializer, this.record1Comparator,
				this.recordSerializer, this.record2Comparator, this.recordPairComparator,
				this.memoryManager, ioManager, this.parentTask, 1.0, true, true, false);

		joinIterator.open();

		boolean moreKeys = true;
		while (moreKeys) {
			moreKeys = joinIterator.callWithNextKey(joinFunction, sink);
		}

		joinIterator.close();

		// anything still left in the reference result was never produced by the join
		for (Entry<Integer, Collection<TupleMatch>> remaining : pendingMatches.entrySet()) {
			if (remaining.getValue().isEmpty()) {
				continue;
			}
			Assert.fail("Collection for key " + remaining.getKey() + " is not empty");
		}
	}
	catch (Exception ex) {
		ex.printStackTrace();
		Assert.fail("An exception occurred during the test: " + ex.getMessage());
	}
}
/**
 * Runs an object-reusing hash join that builds on the second input with only the first
 * of the two outer-join flags enabled, and checks the produced pairs against
 * {@code leftOuterJoinTuples(input1, input2)}.
 */
@Test
// NOTE(review): two method headers follow back to back. This span looks like a rendered diff in which
// the first header (testBuildSecondJoinWithEmptyBuild) is the pre-change line and the second one its
// replacement - confirm before treating this as compilable source.
public void testBuildSecondJoinWithEmptyBuild() {
public void testBuildSecondAndProbeSideOuterJoin() {
try {
TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 1000, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
// collect expected data
final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = leftOuterJoinTuples(
collectTupleData(input1),
collectTupleData(input2));
// the matcher presumably removes every pair it receives from expectedMatchesMap
final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
// reset the generators
// (the inputs were consumed once while collecting the expected data above)
generator1.reset();
generator2.reset();
input1.reset();
input2.reset();
// compare with iterator values
ReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
new ReusingBuildSecondHashJoinIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
// NOTE(review): two closing argument lines follow. The first has one boolean fewer and looks like
// the pre-change line of a rendered diff; the second looks like its replacement - confirm.
this.memoryManager, ioManager, this.parentTask, 1.0, true, false);
this.memoryManager, ioManager, this.parentTask, 1.0, true, false, false);
iterator.open();
// drain the join; all work happens inside callWithNextKey, hence the empty loop body
while (iterator.callWithNextKey(matcher, collector));
iterator.close();
// assert that each expected match was seen
for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
Assert.fail("Collection for key " + entry.getKey() + " is not empty");
}
}
}
catch (Exception e) {
// any exception is reported as a test failure after printing its stack trace
e.printStackTrace();
Assert.fail("An exception occurred during the test: " + e.getMessage());
}
}
/**
 * Runs an object-reusing hash join that builds on the second input with only the second
 * of the two outer-join flags enabled, and checks the produced pairs against
 * {@code rightOuterJoinTuples(input1, input2)}.
 *
 * <p>Judging by the sibling test names, the second flag is the build-side outer join
 * switch; the meaning of the trailing {@code false} is not visible here - TODO confirm
 * against the iterator constructor.
 */
@Test
public void testBuildSecondAndBuildSideOuterJoin() {
	try {
		final TestData.TupleGenerator leftGenerator = new TestData.TupleGenerator(SEED1, 1000, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
		final TestData.TupleGenerator rightGenerator = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);

		final TestData.TupleGeneratorIterator leftInput = new TestData.TupleGeneratorIterator(leftGenerator, INPUT_1_SIZE);
		final TestData.TupleGeneratorIterator rightInput = new TestData.TupleGeneratorIterator(rightGenerator, INPUT_2_SIZE);

		// reference result, computed in memory from a first pass over both inputs
		final Map<Integer, Collection<TupleMatch>> pendingMatches = rightOuterJoinTuples(
			collectTupleData(leftInput),
			collectTupleData(rightInput));

		// the join function presumably removes every pair it receives from pendingMatches
		final FlatJoinFunction joinFunction = new TupleMatchRemovingJoin(pendingMatches);
		final Collector<Tuple2<Integer, String>> sink = new DiscardingOutputCollector<>();

		// rewind generators and iterators so the join reads the same data again
		leftGenerator.reset();
		rightGenerator.reset();
		leftInput.reset();
		rightInput.reset();

		// run the join under test
		final ReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinIterator =
			new ReusingBuildSecondHashJoinIterator<>(
				leftInput, rightInput, this.recordSerializer, this.record1Comparator,
				this.recordSerializer, this.record2Comparator, this.recordPairComparator,
				this.memoryManager, ioManager, this.parentTask, 1.0, false, true, false);

		joinIterator.open();

		boolean moreKeys = true;
		while (moreKeys) {
			moreKeys = joinIterator.callWithNextKey(joinFunction, sink);
		}

		joinIterator.close();

		// anything still left in the reference result was never produced by the join
		for (Entry<Integer, Collection<TupleMatch>> remaining : pendingMatches.entrySet()) {
			if (remaining.getValue().isEmpty()) {
				continue;
			}
			Assert.fail("Collection for key " + remaining.getKey() + " is not empty");
		}
	}
	catch (Exception ex) {
		ex.printStackTrace();
		Assert.fail("An exception occurred during the test: " + ex.getMessage());
	}
}
@Test
public void testBuildSecondAndFullOuterJoin() {
try {
TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 1000, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
// collect expected data
final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = fullOuterJoinTuples(
collectTupleData(input1),
collectTupleData(input2));
final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
// reset the generators
generator1.reset();
generator2.reset();
input1.reset();
input2.reset();
// compare with iterator values
ReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
new ReusingBuildSecondHashJoinIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, true, true, false);
iterator.open();
while (iterator.callWithNextKey(matcher, collector));
iterator.close();
// assert that each expected match was seen
for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
......@@ -590,7 +787,6 @@ public class ReusingHashJoinIteratorITCase {
Assert.fail("An exception occurred during the test: " + e.getMessage());
}
}
// --------------------------------------------------------------------------------------------
// Utilities
......
......@@ -44,7 +44,6 @@ import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin;
import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch;
import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
......@@ -230,7 +229,7 @@ public class ReusingReOpenableHashTableITCase {
new ReusingBuildFirstReOpenableHashJoinIterator<>(
buildInput, probeInput, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
iterator.open();
// do first join with both inputs
......@@ -341,7 +340,7 @@ public class ReusingReOpenableHashTableITCase {
final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
Integer key = probeRec.f0;
HashBucketIterator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
MutableObjectIterator<Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
if ((record = buildSide.next(recordReuse)) != null) {
numBuildValues = 1;
Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
......@@ -456,7 +455,7 @@ public class ReusingReOpenableHashTableITCase {
final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
Integer key = probeRec.f0;
HashBucketIterator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
MutableObjectIterator<Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
if ((record = buildSide.next(recordReuse)) != null) {
numBuildValues = 1;
Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.operators.util;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
 * Unit tests for {@link BitSet}: argument validation of {@code setMemorySegment} and
 * {@code set}, and a round trip of {@code set}/{@code get} over every bit.
 */
public class BitSetTest {

	/** Instance under test; re-created before every test by {@link #init()}. */
	private BitSet bitSet;

	/** Size in bytes passed to the BitSet constructor and used to allocate the backing segment. */
	int byteSize = 1024;

	/** Backing memory, allocated with exactly {@code byteSize} bytes. */
	MemorySegment memorySegment = MemorySegmentFactory.allocateUnpooledSegment(byteSize);

	/** Creates a fresh BitSet over the segment at offset 0 and clears it. */
	@Before
	public void init() {
		bitSet = new BitSet(byteSize);
		bitSet.setMemorySegment(memorySegment, 0);
		bitSet.clear();
	}

	/**
	 * Offset 1 into a segment of exactly {@code byteSize} bytes is expected to be rejected
	 * (presumably because the bit set would no longer fit - confirm against BitSet).
	 */
	@Test(expected = IllegalArgumentException.class)
	public void verifyBitSetSize1() {
		bitSet.setMemorySegment(memorySegment, 1);
	}

	/** A {@code null} segment is expected to be rejected. */
	@Test(expected = IllegalArgumentException.class)
	public void verifyBitSetSize2() {
		bitSet.setMemorySegment(null, 1);
	}

	/** A negative offset is expected to be rejected. */
	@Test(expected = IllegalArgumentException.class)
	public void verifyBitSetSize3() {
		bitSet.setMemorySegment(memorySegment, -1);
	}

	/** An index beyond the last bit ({@code 8 * byteSize + 1}) is expected to be rejected. */
	@Test(expected = IllegalArgumentException.class)
	public void verifyInputIndex1() {
		bitSet.set(8 * byteSize + 1);
	}

	/** A negative index is expected to be rejected. */
	@Test(expected = IllegalArgumentException.class)
	public void verifyInputIndex2() {
		bitSet.set(-1);
	}

	/**
	 * Checks that all bits start cleared, sets every even bit, and verifies that exactly
	 * the even bits read back as set.
	 */
	@Test
	public void testSetValues() {
		int bitSize = bitSet.bitSize();
		// expected value first, per the JUnit convention, and derived from byteSize
		// instead of repeating the literal 1024
		assertEquals(8 * byteSize, bitSize);

		for (int i = 0; i < bitSize; i++) {
			// bit i has not been written yet, so it must still be cleared
			assertFalse(bitSet.get(i));
			if (i % 2 == 0) {
				bitSet.set(i);
			}
		}

		for (int i = 0; i < bitSize; i++) {
			if (i % 2 == 0) {
				assertTrue(bitSet.get(i));
			} else {
				assertFalse(bitSet.get(i));
			}
		}
	}
}
......@@ -130,11 +130,11 @@ public class HashVsSortMiniBenchmark {
final UnilateralSortMerger<Tuple2<Integer, String>> sorter1 = new UnilateralSortMerger<>(
this.memoryManager, this.ioManager, input1, this.parentTask, this.serializer1,
this.comparator1.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f, true);
this.comparator1.duplicate(), (double)MEMORY_FOR_SORTER/MEMORY_SIZE, 128, 0.8f, true);
final UnilateralSortMerger<Tuple2<Integer, String>> sorter2 = new UnilateralSortMerger<>(
this.memoryManager, this.ioManager, input2, this.parentTask, this.serializer2,
this.comparator2.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f, true);
this.comparator2.duplicate(), (double)MEMORY_FOR_SORTER/MEMORY_SIZE, 128, 0.8f, true);
final MutableObjectIterator<Tuple2<Integer, String>> sortedInput1 = sorter1.getIterator();
final MutableObjectIterator<Tuple2<Integer, String>> sortedInput2 = sorter2.getIterator();
......@@ -184,7 +184,7 @@ public class HashVsSortMiniBenchmark {
new ReusingBuildFirstHashJoinIterator<>(
input1, input2, this.serializer1.getSerializer(), this.comparator1,
this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, false, true);
this.memoryManager, this.ioManager, this.parentTask, 1, false, false, true);
iterator.open();
......@@ -223,7 +223,7 @@ public class HashVsSortMiniBenchmark {
new ReusingBuildSecondHashJoinIterator<>(
input1, input2, this.serializer1.getSerializer(), this.comparator1,
this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, false, true);
this.memoryManager, this.ioManager, this.parentTask, 1, false, false, true);
iterator.open();
......
......@@ -18,6 +18,7 @@
package org.apache.flink.test.javaApiOperators;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.RichFlatJoinFunction;
......@@ -30,6 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
......@@ -57,14 +59,24 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
/**
 * Runs the left outer join tuple test with an explicit repartition-hash join hint.
 *
 * NOTE(review): this body lists both the REPARTITION_HASH_SECOND and the
 * REPARTITION_HASH_FIRST call; it looks like a diff rendering in which the first
 * call is the removed line -- confirm against the actual file.
 */
@Test
public void testLeftOuterJoin2() throws Exception {
testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_SECOND);
testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_FIRST);
}
/** Runs the left outer join tuple test with the REPARTITION_HASH_SECOND join hint. */
@Test
public void testLeftOuterJoin3() throws Exception {
testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_SECOND);
}
/** Runs the left outer join tuple test with the BROADCAST_HASH_SECOND join hint. */
@Test
public void testLeftOuterJoin4() throws Exception {
testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_SECOND);
}
/**
 * Runs the left outer join tuple test with the BROADCAST_HASH_FIRST join hint and
 * expects an InvalidProgramException for this hint.
 */
@Test (expected = InvalidProgramException.class)
public void testLeftOuterJoin5() throws Exception {
testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_FIRST);
}
private void testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception {
/*
* UDF Join on tuples with key field positions
......@@ -102,9 +114,19 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
/** Runs the right outer join tuple test with the REPARTITION_HASH_SECOND join hint. */
@Test
public void testRightOuterJoin3() throws Exception {
testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_SECOND);
}
/** Runs the right outer join tuple test with the BROADCAST_HASH_FIRST join hint. */
@Test
public void testRightOuterJoin4() throws Exception {
testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_FIRST);
}
/**
 * Runs the right outer join tuple test with the BROADCAST_HASH_SECOND join hint and
 * expects an InvalidProgramException for this hint.
 */
@Test (expected = InvalidProgramException.class)
public void testRightOuterJoin5() throws Exception {
testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_SECOND);
}
private void testRightOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception {
/*
* UDF Join on tuples with key field positions
......@@ -135,6 +157,26 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_SORT_MERGE);
}
/** Runs the full outer join tuple test with the REPARTITION_HASH_FIRST join hint. */
@Test
public void testFullOuterJoin2() throws Exception {
testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_FIRST);
}
/** Runs the full outer join tuple test with the REPARTITION_HASH_SECOND join hint. */
@Test
public void testFullOuterJoin3() throws Exception {
testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_SECOND);
}
/**
 * Runs the full outer join tuple test with the BROADCAST_HASH_FIRST join hint and
 * expects an InvalidProgramException for this hint.
 */
@Test (expected = InvalidProgramException.class)
public void testFullOuterJoin4() throws Exception {
testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_FIRST);
}
/**
 * Runs the full outer join tuple test with the BROADCAST_HASH_SECOND join hint and
 * expects an InvalidProgramException for this hint.
 */
@Test (expected = InvalidProgramException.class)
public void testFullOuterJoin5() throws Exception {
testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_SECOND);
}
private void testFullOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception {
/*
* UDF Join on tuples with key field positions
......
......@@ -150,7 +150,7 @@ public class HashTableRecordWidthCombinations {
try {
while (table.nextRecord()) {
MutableHashTable.HashBucketIterator<Tuple2<Long, byte[]>, Long> matches = table.getBuildSideIterator();
MutableObjectIterator<Tuple2<Long, byte[]>> matches = table.getBuildSideIterator();
while (matches.next() != null);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册