提交 217b03e6 编写于 作者: U Ufuk Celebi 提交者: Stephan Ewen

[FLINK-1110] Implement collection-based execution for coGroup

上级 d9ed4ad4
......@@ -16,35 +16,49 @@
* limitations under the License.
*/
package org.apache.flink.api.common.operators.base;
import java.util.List;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.DualInputOperator;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeinfo.CompositeType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.GenericPairComparator;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.util.Collector;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
/**
* @see org.apache.flink.api.common.functions.CoGroupFunction
*/
public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1, IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT> {
/**
* The ordering for the order inside a group from input one.
*/
private Ordering groupOrder1;
/**
* The ordering for the order inside a group from input two.
*/
private Ordering groupOrder2;
// --------------------------------------------------------------------------------------------
private boolean combinableFirst;
......@@ -56,98 +70,102 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
this.combinableFirst = false;
this.combinableSecond = false;
}
public CoGroupOperatorBase(FT udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) {
this(new UserCodeObjectWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name);
}
public CoGroupOperatorBase(Class<? extends FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) {
this(new UserCodeClassWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name);
}
// --------------------------------------------------------------------------------------------
/**
* Sets the order of the elements within a group for the given input.
*
*
* @param inputNum The number of the input (here either <i>0</i> or <i>1</i>).
* @param order The order for the elements in a group.
* @param order The order for the elements in a group.
*/
public void setGroupOrder(int inputNum, Ordering order) {
if (inputNum == 0) {
this.groupOrder1 = order;
} else if (inputNum == 1) {
}
else if (inputNum == 1) {
this.groupOrder2 = order;
} else {
}
else {
throw new IndexOutOfBoundsException();
}
}
/**
* Sets the order of the elements within a group for the first input.
*
*
* @param order The order for the elements in a group.
*/
public void setGroupOrderForInputOne(Ordering order) {
setGroupOrder(0, order);
}
/**
* Sets the order of the elements within a group for the second input.
*
*
* @param order The order for the elements in a group.
*/
public void setGroupOrderForInputTwo(Ordering order) {
setGroupOrder(1, order);
}
/**
* Gets the value order for an input, i.e. the order of elements within a group.
* If no such order has been set, this method returns null.
*
*
* @param inputNum The number of the input (here either <i>0</i> or <i>1</i>).
* @return The group order.
*/
public Ordering getGroupOrder(int inputNum) {
if (inputNum == 0) {
return this.groupOrder1;
} else if (inputNum == 1) {
}
else if (inputNum == 1) {
return this.groupOrder2;
} else {
}
else {
throw new IndexOutOfBoundsException();
}
}
/**
* Gets the order of elements within a group for the first input.
* If no such order has been set, this method returns null.
*
*
* @return The group order for the first input.
*/
public Ordering getGroupOrderForInputOne() {
return getGroupOrder(0);
}
/**
* Gets the order of elements within a group for the second input.
* If no such order has been set, this method returns null.
*
*
* @return The group order for the second input.
*/
public Ordering getGroupOrderForInputTwo() {
return getGroupOrder(1);
}
// --------------------------------------------------------------------------------------------
public boolean isCombinableFirst() {
return this.combinableFirst;
}
public void setCombinableFirst(boolean combinableFirst) {
this.combinableFirst = combinableFirst;
}
public boolean isCombinableSecond() {
return this.combinableSecond;
}
......@@ -156,9 +174,182 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
this.combinableSecond = combinableSecond;
}
// ------------------------------------------------------------------------
@Override
protected List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext runtimeContext) throws Exception {
// TODO Auto-generated method stub
return null;
protected List<OUT> executeOnCollections(List<IN1> input1, List<IN2> input2, RuntimeContext ctx) throws Exception {
// --------------------------------------------------------------------
// Setup
// --------------------------------------------------------------------
TypeInformation<IN1> inputType1 = getOperatorInfo().getFirstInputType();
TypeInformation<IN2> inputType2 = getOperatorInfo().getSecondInputType();
int[] inputKeys1 = getKeyColumns(0);
int[] inputKeys2 = getKeyColumns(1);
boolean[] inputSortDirections1 = new boolean[inputKeys1.length];
boolean[] inputSortDirections2 = new boolean[inputKeys2.length];
Arrays.fill(inputSortDirections1, true);
Arrays.fill(inputSortDirections2, true);
final TypeComparator<IN1> inputComparator1 = getTypeComparator(inputType1, inputKeys1, inputSortDirections1);
final TypeComparator<IN2> inputComparator2 = getTypeComparator(inputType2, inputKeys2, inputSortDirections2);
CoGroupSortListIterator<IN1, IN2> coGroupIterator =
new CoGroupSortListIterator<IN1, IN2>(input1, inputComparator1, input2, inputComparator2);
// --------------------------------------------------------------------
// Run UDF
// --------------------------------------------------------------------
CoGroupFunction<IN1, IN2, OUT> function = userFunction.getUserCodeObject();
FunctionUtils.setFunctionRuntimeContext(function, ctx);
FunctionUtils.openFunction(function, parameters);
List<OUT> result = new ArrayList<OUT>();
Collector<OUT> resultCollector = new ListCollector<OUT>(result);
while (coGroupIterator.next()) {
function.coGroup(coGroupIterator.getValues1(), coGroupIterator.getValues2(), resultCollector);
}
FunctionUtils.closeFunction(function);
return result;
}
private <T> TypeComparator<T> getTypeComparator(TypeInformation<T> inputType, int[] inputKeys, boolean[] inputSortDirections) {
if (!(inputType instanceof CompositeType)) {
throw new InvalidProgramException("Input types of coGroup must be composite types.");
}
return ((CompositeType<T>) inputType).createComparator(inputKeys, inputSortDirections);
}
private static class CoGroupSortListIterator<IN1, IN2> {
private static enum MatchStatus {
NONE_REMAINED, FIRST_REMAINED, SECOND_REMAINED, FIRST_EMPTY, SECOND_EMPTY
}
private final ListKeyGroupedIterator<IN1> iterator1;
private final ListKeyGroupedIterator<IN2> iterator2;
private final TypePairComparator<IN1, IN2> pairComparator;
private MatchStatus matchStatus;
private Iterable<IN1> firstReturn;
private Iterable<IN2> secondReturn;
private CoGroupSortListIterator(
List<IN1> input1, final TypeComparator<IN1> inputComparator1,
List<IN2> input2, final TypeComparator<IN2> inputComparator2) {
this.pairComparator = new GenericPairComparator<IN1, IN2>(inputComparator1, inputComparator2);
this.iterator1 = new ListKeyGroupedIterator<IN1>(input1, inputComparator1);
this.iterator2 = new ListKeyGroupedIterator<IN2>(input2, inputComparator2);
// ----------------------------------------------------------------
// Sort
// ----------------------------------------------------------------
Collections.sort(input1, new Comparator<IN1>() {
@Override
public int compare(IN1 o1, IN1 o2) {
return inputComparator1.compare(o1, o2);
}
});
Collections.sort(input2, new Comparator<IN2>() {
@Override
public int compare(IN2 o1, IN2 o2) {
return inputComparator2.compare(o1, o2);
}
});
}
private boolean next() throws IOException {
boolean firstEmpty = true;
boolean secondEmpty = true;
if (this.matchStatus != MatchStatus.FIRST_EMPTY) {
if (this.matchStatus == MatchStatus.FIRST_REMAINED) {
// comparator is still set correctly
firstEmpty = false;
}
else {
if (this.iterator1.nextKey()) {
this.pairComparator.setReference(iterator1.getValues().getCurrent());
firstEmpty = false;
}
}
}
if (this.matchStatus != MatchStatus.SECOND_EMPTY) {
if (this.matchStatus == MatchStatus.SECOND_REMAINED) {
secondEmpty = false;
}
else {
if (iterator2.nextKey()) {
secondEmpty = false;
}
}
}
if (firstEmpty && secondEmpty) {
// both inputs are empty
return false;
}
else if (firstEmpty && !secondEmpty) {
// input1 is empty, input2 not
this.firstReturn = Collections.emptySet();
this.secondReturn = this.iterator2.getValues();
this.matchStatus = MatchStatus.FIRST_EMPTY;
return true;
}
else if (!firstEmpty && secondEmpty) {
// input1 is not empty, input 2 is empty
this.firstReturn = this.iterator1.getValues();
this.secondReturn = Collections.emptySet();
this.matchStatus = MatchStatus.SECOND_EMPTY;
return true;
}
else {
// both inputs are not empty
final int comp = this.pairComparator.compareToReference(iterator2.getValues().getCurrent());
if (0 == comp) {
// keys match
this.firstReturn = this.iterator1.getValues();
this.secondReturn = this.iterator2.getValues();
this.matchStatus = MatchStatus.NONE_REMAINED;
}
else if (0 < comp) {
// key1 goes first
this.firstReturn = this.iterator1.getValues();
this.secondReturn = Collections.emptySet();
this.matchStatus = MatchStatus.SECOND_REMAINED;
}
else {
// key 2 goes first
this.firstReturn = Collections.emptySet();
this.secondReturn = this.iterator2.getValues();
this.matchStatus = MatchStatus.FIRST_REMAINED;
}
return true;
}
}
private Iterable<IN1> getValues1() {
return firstReturn;
}
private Iterable<IN2> getValues2() {
return secondReturn;
}
}
}
......@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.operators.util;
import org.apache.flink.api.common.typeutils.TypeComparator;
......@@ -26,14 +27,11 @@ import java.util.NoSuchElementException;
/**
* The KeyValueIterator returns a key and all values that belong to the key (share the same key).
*
*/
public final class ListKeyGroupedIterator<E> {
private final List<E> input;
// private final TypeSerializer<E> serializer;
private final TypeComparator<E> comparator;
private ValuesIterator valuesIterator;
......@@ -57,8 +55,9 @@ public final class ListKeyGroupedIterator<E> {
}
this.input = input;
// this.serializer = serializer;
this.comparator = comparator;
this.done = input.isEmpty() ? true : false;
}
/**
......@@ -186,5 +185,9 @@ public final class ListKeyGroupedIterator<E> {
public Iterator<E> iterator() {
return this;
}
public E getCurrent() {
return next;
}
}
}
......@@ -35,7 +35,7 @@ import java.util.Arrays;
import java.util.List;
@SuppressWarnings("serial")
public class FlatMapOperatorCollectionExecutionTest implements Serializable {
public class FlatMapOperatorCollectionTest implements Serializable {
@Test
public void testExecuteOnCollection() {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.operators.base;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.builder.Tuple2Builder;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class CoGroupOperatorCollectionTest implements Serializable {
@Test
public void testExecuteOnCollection() {
try {
List<Tuple2<String, Integer>> input1 = Arrays.asList(
new Tuple2Builder<String, Integer>()
.add("foo", 1)
.add("foobar", 1)
.add("foo", 1)
.add("bar", 1)
.add("foo", 1)
.add("foo", 1)
.build()
);
List<Tuple2<String, Integer>> input2 = Arrays.asList(
new Tuple2Builder<String, Integer>()
.add("foo", 1)
.add("foo", 1)
.add("bar", 1)
.add("foo", 1)
.add("barfoo", 1)
.add("foo", 1)
.build()
);
final RuntimeContext ctx = new RuntimeUDFContext("Test UDF", 4, 0);
{
SumCoGroup udf = new SumCoGroup();
List<Tuple2<String, Integer>> result = getCoGroupOperator(udf)
.executeOnCollections(input1, input2, ctx);
Assert.assertTrue(udf.isClosed);
Set<Tuple2<String, Integer>> expected = new HashSet<Tuple2<String, Integer>>(
Arrays.asList(new Tuple2Builder<String, Integer>()
.add("foo", 8)
.add("bar", 2)
.add("foobar", 1)
.add("barfoo", 1)
.build()
)
);
Assert.assertEquals(expected, new HashSet(result));
}
{
List<Tuple2<String, Integer>> result = getCoGroupOperator(new SumCoGroup())
.executeOnCollections(Collections.EMPTY_LIST, Collections.EMPTY_LIST, ctx);
Assert.assertEquals(0, result.size());
}
} catch (Throwable t) {
t.printStackTrace();
Assert.fail(t.getMessage());
}
}
private class SumCoGroup extends RichCoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {
private boolean isOpened = false;
private boolean isClosed = false;
@Override
public void open(Configuration parameters) throws Exception {
isOpened = true;
RuntimeContext ctx = getRuntimeContext();
Assert.assertEquals("Test UDF", ctx.getTaskName());
Assert.assertEquals(4, ctx.getNumberOfParallelSubtasks());
Assert.assertEquals(0, ctx.getIndexOfThisSubtask());
}
@Override
public void coGroup(
Iterable<Tuple2<String, Integer>> first,
Iterable<Tuple2<String, Integer>> second,
Collector<Tuple2<String, Integer>> out) throws Exception {
Assert.assertTrue(isOpened);
Assert.assertFalse(isClosed);
String f0 = null;
int sumF1 = 0;
for (Tuple2<String, Integer> input : first) {
f0 = (f0 == null) ? input.f0 : f0;
sumF1 += input.f1;
}
for (Tuple2<String, Integer> input : second) {
f0 = (f0 == null) ? input.f0 : f0;
sumF1 += input.f1;
}
out.collect(new Tuple2<String, Integer>(f0, sumF1));
}
@Override
public void close() throws Exception {
isClosed = true;
}
}
private CoGroupOperatorBase<Tuple2<String, Integer>, Tuple2<String, Integer>,
Tuple2<String, Integer>, CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>,
Tuple2<String, Integer>>> getCoGroupOperator(
RichCoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> udf) {
return new CoGroupOperatorBase<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>,
CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>>(
udf,
new BinaryOperatorInformation<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>(
TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"),
TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"),
TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>")
),
new int[]{0},
new int[]{0},
"coGroup on Collections"
);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册