提交 a6d188cd 编写于 作者: F Fabian Hueske

Removed combine methods from CoGroup function and some minor improvements.

上级 9e1db403
......@@ -31,27 +31,4 @@ public interface GenericCoGrouper<V1, V2, O> extends Function {
*/
void coGroup(Iterator<V1> records1, Iterator<V2> records2, Collector<O> out) throws Exception;
/**
* This method must be overridden by CoGoup UDFs that want to make use of the combining feature
* on their first input. In addition, the extending class must be annotated as CombinableFirst.
*
* The use of the combiner is typically a pre-reduction of the data.
*
* @param records The records to be combined.
* @param out The collector to write the result to.
*
*/
V1 combineFirst(Iterator<V1> records) throws Exception;
/**
* This method must be overridden by CoGoup UDFs that want to make use of the combining feature
* on their second input. In addition, the extending class must be annotated as CombinableSecond.
*
* The use of the combiner is typically a pre-reduction of the data.
*
* @param records The records to be combined.
* @param out The collector to write the result to.
*
*/
V2 combineSecond(Iterator<V2> records) throws Exception;
}
......@@ -14,14 +14,11 @@
**********************************************************************************************************************/
package eu.stratosphere.api.java.functions;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Iterator;
import eu.stratosphere.api.common.functions.AbstractFunction;
import eu.stratosphere.api.common.functions.GenericCoGrouper;
import eu.stratosphere.api.java.operators.Keys;
import eu.stratosphere.util.Collector;
/**
......@@ -35,7 +32,7 @@ import eu.stratosphere.util.Collector;
*
* set1.coGroup(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyCoGroupFunction());
* </pre>
* The keys can be defined in various ways, such as through field names, tuple field positions, or key extractors.
* The keys can be defined through tuple field positions or key extractors.
* See {@link Keys} for details.
* <p>
* Some keys may only be contained in one of the two original data sets. In that case, the CoGroup function is invoked
......@@ -48,57 +45,11 @@ public abstract class CoGroupFunction<IN1, IN2, OUT> extends AbstractFunction im
/**
* The core method of the CoGroupFunction. This method is called for each pair of groups that have the same
* key. The elements of the groups are returned by the repestive iterators.
* key. The elements of the groups are returned by the respective iterators.
*
* It is possible that one of the two groups is empty, in which case the respective iterator has no elements.
*/
@Override
public abstract void coGroup(Iterator<IN1> first, Iterator<IN2> second, Collector<OUT> out) throws Exception;
/**
* This function is not implemented by default. It must be overridden, if the function declares through the
* {@link CombinableFirst} annotation that the first input is combinable.
*
* @see eu.stratosphere.api.common.functions.GenericCoGrouper#combineFirst(java.util.Iterator)
*/
@Override
public IN1 combineFirst(Iterator<IN1> records) throws Exception {
throw new UnsupportedOperationException("combineFirst() is not overridden by this UDF. " +
"Using the CombinableFirst annotation requires that this method is overridden.");
}
/**
* This function is not implemented by default. It must be overridden, if the function declares through the
* {@link CombinableFirst} annotation that the first input is combinable.
*
* @see eu.stratosphere.api.common.functions.GenericCoGrouper#combineFirst(java.util.Iterator)
*/
@Override
public IN2 combineSecond(Iterator<IN2> records) throws Exception {
throw new UnsupportedOperationException("combineSecond() is not overridden by this UDF. " +
"Using the CombinableSecond annotation requires that this method is overridden.");
}
// --------------------------------------------------------------------------------------------
// Annotations for to declare individual CoGroup inputs combinable
// --------------------------------------------------------------------------------------------
/**
* This marker interface can be added to a CoGroup function implementation. It declares its first input
* combinable. Similar as for the {@link GroupReduceFunction} function, the framework may invoke the
* {@link CoGroupFunction#combineFirst(Iterator)} function to reduce the data volume early.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public static @interface CombinableFirst {};
/**
* This marker interface can be added to a CoGroup function implementation. It declares its second input
* combinable. Similar as for the {@link GroupReduceFunction} function, the framework may invoke the
* {@link CoGroupFunction#combineSecond(Iterator)} function to reduce the data volume early.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public static @interface CombinableSecond {};
}
......@@ -103,17 +103,7 @@ public class PlanUnwrappingCoGroupOperator<I1, I2, OUT, K>
public void coGroup(Iterator<Tuple2<K, I1>> records1, Iterator<Tuple2<K, I2>> records2, Collector<OUT> out) throws Exception {
this.wrappedFunction.coGroup(new UnwrappingKeyIterator<K, I1>(records1), new UnwrappingKeyIterator<K, I2>(records2), out);
}
@Override
public Tuple2<K, I1> combineFirst(Iterator<Tuple2<K, I1>> records) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public Tuple2<K, I2> combineSecond(Iterator<Tuple2<K, I2>> records) throws Exception {
throw new UnsupportedOperationException();
}
}
public static class UnwrappingKeyIterator<K, I1> implements Iterator<I1> {
......
......@@ -43,39 +43,4 @@ public abstract class CoGroupFunction extends AbstractFunction implements Generi
@Override
public abstract void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) throws Exception;
/**
* This method must be overridden by CoGoup UDFs that want to make use of the combining feature
* on their first input. In addition, the extending class must be annotated as CombinableFirst.
* <p>
* The use of the combiner is typically a pre-reduction of the data.
*
* @param records The records to be combined.
* @param out The collector to write the result to.
*
* @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
* runtime catches an exception, it aborts the combine task and lets the fail-over logic
* decide whether to retry the combiner execution.
*/
@Override
public Record combineFirst(Iterator<Record> records) throws Exception {
throw new UnsupportedOperationException();
}
/**
* This method must be overridden by CoGoup UDFs that want to make use of the combining feature
* on their second input. In addition, the extending class must be annotated as CombinableSecond.
* <p>
* The use of the combiner is typically a pre-reduction of the data.
*
* @param records The records to be combined.
* @param out The collector to write the result to.
*
* @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
* runtime catches an exception, it aborts the combine task and lets the fail-over logic
* decide whether to retry the combiner execution.
*/
@Override
public Record combineSecond(Iterator<Record> records) throws Exception {
throw new UnsupportedOperationException();
}
}
......@@ -13,10 +13,6 @@
package eu.stratosphere.api.java.record.operators;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
......@@ -106,22 +102,14 @@ public class CoGroupOperator extends CoGroupOperatorBase<CoGroupFunction> implem
@Override
public boolean isCombinableFirst() {
return super.isCombinableFirst() || getUserCodeWrapper().getUserCodeAnnotation(CombinableFirst.class) != null;
return false;
}
@Override
public boolean isCombinableSecond() {
return super.isCombinableSecond() || getUserCodeWrapper().getUserCodeAnnotation(CombinableSecond.class) != null;
return false;
}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public static @interface CombinableFirst {};
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public static @interface CombinableSecond {};
// --------------------------------------------------------------------------------------------
......@@ -168,7 +156,7 @@ public class CoGroupOperator extends CoGroupOperatorBase<CoGroupFunction> implem
}
/**
* Creates a Builder with the provided {@link JoinFunction} implementation. This method is intended
* Creates a Builder with the provided {@link CoGroupFunction} implementation. This method is intended
* for special case sub-types only.
*
* @param udf The {@link CoGroupFunction} implementation for this CoGroup operator.
......
......@@ -27,7 +27,6 @@ import eu.stratosphere.api.java.record.functions.FunctionAnnotation.ConstantFiel
import eu.stratosphere.api.java.record.io.CsvInputFormat;
import eu.stratosphere.api.java.record.io.CsvOutputFormat;
import eu.stratosphere.api.java.record.operators.CoGroupOperator;
import eu.stratosphere.api.java.record.operators.CoGroupOperator.CombinableFirst;
import eu.stratosphere.api.java.record.operators.JoinOperator;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.test.recordJobs.graph.WorksetConnectedComponents.DuplicateLongMap;
......@@ -77,7 +76,6 @@ public class CoGroupConnectedComponentsITCase extends RecordAPITestBase {
// --------------------------------------------------------------------------------------------
@CombinableFirst
@ConstantFieldsFirst(0)
@ConstantFieldsSecond(0)
public static final class MinIdAndUpdate extends CoGroupFunction implements Serializable {
......@@ -108,20 +106,6 @@ public class CoGroupConnectedComponentsITCase extends RecordAPITestBase {
out.collect(old);
}
}
@Override
public Record combineFirst(Iterator<Record> records) {
Record next = null;
long min = Long.MAX_VALUE;
while (records.hasNext()) {
next = records.next();
min = Math.min(min, next.getField(1, LongValue.class).getValue());
}
newComponentId.setValue(min);
next.setField(1, newComponentId);
return next;
}
}
@SuppressWarnings("unchecked")
......
......@@ -119,10 +119,5 @@ public class CustomCompensatableDotProductCoGroup extends AbstractFunction imple
aggregator.reset();
}
}
@Override
public VertexWithRankAndDangling combineFirst(Iterator<VertexWithRankAndDangling> records) { throw new UnsupportedOperationException(); }
@Override
public VertexWithRank combineSecond(Iterator<VertexWithRank> records) { throw new UnsupportedOperationException(); }
}
......@@ -18,9 +18,9 @@ import java.util.Iterator;
import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.Program;
import eu.stratosphere.api.common.operators.DeltaIteration;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.DeltaIteration;
import eu.stratosphere.api.java.record.functions.CoGroupFunction;
import eu.stratosphere.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst;
import eu.stratosphere.api.java.record.functions.FunctionAnnotation.ConstantFieldsSecond;
......@@ -29,7 +29,6 @@ import eu.stratosphere.api.java.record.io.CsvOutputFormat;
import eu.stratosphere.api.java.record.operators.CoGroupOperator;
import eu.stratosphere.api.java.record.operators.JoinOperator;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.api.java.record.operators.CoGroupOperator.CombinableFirst;
import eu.stratosphere.test.recordJobs.graph.WorksetConnectedComponents.DuplicateLongMap;
import eu.stratosphere.test.recordJobs.graph.WorksetConnectedComponents.NeighborWithComponentIDJoin;
import eu.stratosphere.types.LongValue;
......@@ -44,7 +43,6 @@ public class ConnectedComponentsWithCoGroup implements Program {
private static final long serialVersionUID = 1L;
@CombinableFirst
@ConstantFieldsFirst(0)
@ConstantFieldsSecond(0)
public static final class MinIdAndUpdate extends CoGroupFunction implements Serializable {
......@@ -76,19 +74,6 @@ public class ConnectedComponentsWithCoGroup implements Program {
}
}
@Override
public Record combineFirst(Iterator<Record> records) {
Record next = null;
long min = Long.MAX_VALUE;
while (records.hasNext()) {
next = records.next();
min = Math.min(min, next.getField(1, LongValue.class).getValue());
}
newComponentId.setValue(min);
next.setField(1, newComponentId);
return next;
}
}
@SuppressWarnings("unchecked")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册