From a6d188cda8ba527db6e0f3cee57df85821be53af Mon Sep 17 00:00:00 2001 From: Fabian Hueske Date: Thu, 15 May 2014 18:03:21 +0200 Subject: [PATCH] Removed combine methods from CoGroup function and some minor improvements. --- .../common/functions/GenericCoGrouper.java | 23 -------- .../api/java/functions/CoGroupFunction.java | 55 +------------------ .../PlanUnwrappingCoGroupOperator.java | 12 +--- .../record/functions/CoGroupFunction.java | 35 ------------ .../record/operators/CoGroupOperator.java | 18 +----- .../CoGroupConnectedComponentsITCase.java | 16 ------ .../CustomCompensatableDotProductCoGroup.java | 7 +-- .../graph/ConnectedComponentsWithCoGroup.java | 17 +----- 8 files changed, 9 insertions(+), 174 deletions(-) diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericCoGrouper.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericCoGrouper.java index 5caff99415d..4d7f59039ab 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericCoGrouper.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericCoGrouper.java @@ -31,27 +31,4 @@ public interface GenericCoGrouper extends Function { */ void coGroup(Iterator records1, Iterator records2, Collector out) throws Exception; - /** - * This method must be overridden by CoGoup UDFs that want to make use of the combining feature - * on their first input. In addition, the extending class must be annotated as CombinableFirst. - * - * The use of the combiner is typically a pre-reduction of the data. - * - * @param records The records to be combined. - * @param out The collector to write the result to. - * - */ - V1 combineFirst(Iterator records) throws Exception; - - /** - * This method must be overridden by CoGoup UDFs that want to make use of the combining feature - * on their second input. In addition, the extending class must be annotated as CombinableSecond. - * - * The use of the combiner is typically a pre-reduction of the data. - * - * @param records The records to be combined. - * @param out The collector to write the result to. - * - */ - V2 combineSecond(Iterator records) throws Exception; } diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/CoGroupFunction.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/CoGroupFunction.java index 5ea597763ec..d9ea8e330d3 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/CoGroupFunction.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/CoGroupFunction.java @@ -14,14 +14,11 @@ **********************************************************************************************************************/ package eu.stratosphere.api.java.functions; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; import java.util.Iterator; import eu.stratosphere.api.common.functions.AbstractFunction; import eu.stratosphere.api.common.functions.GenericCoGrouper; +import eu.stratosphere.api.java.operators.Keys; import eu.stratosphere.util.Collector; /** @@ -35,7 +32,7 @@ import eu.stratosphere.util.Collector; * * set1.coGroup(set2).where().equalTo().with(new MyCoGroupFunction()); * - * The keys can be defined in various ways, such as through field names, tuple field positions, or key extractors. + * The keys can be defined through tuple field positions or key extractors. * See {@link Keys} for details. *

* Some keys may only be contained in one of the two original data sets. In that case, the CoGroup function is invoked @@ -48,57 +45,11 @@ public abstract class CoGroupFunction extends AbstractFunction im /** * The core method of the CoGroupFunction. This method is called for each pair of groups that have the same - * key. The elements of the groups are returned by the repestive iterators. + * key. The elements of the groups are returned by the respective iterators. * * It is possible that one of the two groups is empty, in which case the respective iterator has no elements. */ @Override public abstract void coGroup(Iterator first, Iterator second, Collector out) throws Exception; - - - /** - * This function is not implemented by default. It must be overridden, if the function declares through the - * {@link CombinableFirst} annotation that the first input is combinable. - * - * @see eu.stratosphere.api.common.functions.GenericCoGrouper#combineFirst(java.util.Iterator) - */ - @Override - public IN1 combineFirst(Iterator records) throws Exception { - throw new UnsupportedOperationException("combineFirst() is not overridden by this UDF. " + - "Using the CombinableFirst annotation requires that this method is overridden."); - } - /** - * This function is not implemented by default. It must be overridden, if the function declares through the - * {@link CombinableFirst} annotation that the first input is combinable. - * - * @see eu.stratosphere.api.common.functions.GenericCoGrouper#combineFirst(java.util.Iterator) - */ - @Override - public IN2 combineSecond(Iterator records) throws Exception { - throw new UnsupportedOperationException("combineSecond() is not overridden by this UDF. " + - "Using the CombinableSecond annotation requires that this method is overridden."); - } - - // -------------------------------------------------------------------------------------------- - // Annotations for to declare individual CoGroup inputs combinable - // -------------------------------------------------------------------------------------------- - - /** - * This marker interface can be added to a CoGroup function implementation. It declares its first input - * combinable. Similar as for the {@link GroupReduceFunction} function, the framework may invoke the - * {@link CoGroupFunction#combineFirst(Iterator)} function to reduce the data volume early. - */ - @Retention(RetentionPolicy.RUNTIME) - @Target(ElementType.TYPE) - public static @interface CombinableFirst {}; - - /** - * This marker interface can be added to a CoGroup function implementation. It declares its second input - * combinable. Similar as for the {@link GroupReduceFunction} function, the framework may invoke the - * {@link CoGroupFunction#combineSecond(Iterator)} function to reduce the data volume early. - */ - @Retention(RetentionPolicy.RUNTIME) - @Target(ElementType.TYPE) - public static @interface CombinableSecond {}; } diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java index e1d4e17f3e6..ecef5147605 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java @@ -103,17 +103,7 @@ public class PlanUnwrappingCoGroupOperator public void coGroup(Iterator> records1, Iterator> records2, Collector out) throws Exception { this.wrappedFunction.coGroup(new UnwrappingKeyIterator(records1), new UnwrappingKeyIterator(records2), out); } - - - @Override - public Tuple2 combineFirst(Iterator> records) throws Exception { - throw new UnsupportedOperationException(); - } - - @Override - public Tuple2 combineSecond(Iterator> records) throws Exception { - throw new UnsupportedOperationException(); - } + } public static class UnwrappingKeyIterator implements Iterator { diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/CoGroupFunction.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/CoGroupFunction.java index 41fb8935e41..3a81d34402c 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/CoGroupFunction.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/CoGroupFunction.java @@ -43,39 +43,4 @@ public abstract class CoGroupFunction extends AbstractFunction implements Generi @Override public abstract void coGroup(Iterator records1, Iterator records2, Collector out) throws Exception; - /** - * This method must be overridden by CoGoup UDFs that want to make use of the combining feature - * on their first input. In addition, the extending class must be annotated as CombinableFirst. - *

- * The use of the combiner is typically a pre-reduction of the data. - * - * @param records The records to be combined. - * @param out The collector to write the result to. - * - * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the - * runtime catches an exception, it aborts the combine task and lets the fail-over logic - * decide whether to retry the combiner execution. - */ - @Override - public Record combineFirst(Iterator records) throws Exception { - throw new UnsupportedOperationException(); - } - - /** - * This method must be overridden by CoGoup UDFs that want to make use of the combining feature - * on their second input. In addition, the extending class must be annotated as CombinableSecond. - *

- * The use of the combiner is typically a pre-reduction of the data. - * - * @param records The records to be combined. - * @param out The collector to write the result to. - * - * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the - * runtime catches an exception, it aborts the combine task and lets the fail-over logic - * decide whether to retry the combiner execution. - */ - @Override - public Record combineSecond(Iterator records) throws Exception { - throw new UnsupportedOperationException(); - } } diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/CoGroupOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/CoGroupOperator.java index c50264278d9..159cf3404dd 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/CoGroupOperator.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/CoGroupOperator.java @@ -13,10 +13,6 @@ package eu.stratosphere.api.java.record.operators; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -106,22 +102,14 @@ public class CoGroupOperator extends CoGroupOperatorBase implem @Override public boolean isCombinableFirst() { - return super.isCombinableFirst() || getUserCodeWrapper().getUserCodeAnnotation(CombinableFirst.class) != null; + return false; } @Override public boolean isCombinableSecond() { - return super.isCombinableSecond() || getUserCodeWrapper().getUserCodeAnnotation(CombinableSecond.class) != null; + return false; } - @Retention(RetentionPolicy.RUNTIME) - @Target(ElementType.TYPE) - public static @interface CombinableFirst {}; - - @Retention(RetentionPolicy.RUNTIME) - @Target(ElementType.TYPE) - public static @interface CombinableSecond {}; - // -------------------------------------------------------------------------------------------- @@ -168,7 +156,7 @@ public class CoGroupOperator extends CoGroupOperatorBase implem } /** - * Creates a Builder with the provided {@link JoinFunction} implementation. This method is intended + * Creates a Builder with the provided {@link CoGroupFunction} implementation. This method is intended * for special case sub-types only. * * @param udf The {@link CoGroupFunction} implementation for this CoGroup operator. diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsITCase.java index 50c9fad48d8..f299f2e8cfc 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsITCase.java +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsITCase.java @@ -27,7 +27,6 @@ import eu.stratosphere.api.java.record.functions.FunctionAnnotation.ConstantFiel import eu.stratosphere.api.java.record.io.CsvInputFormat; import eu.stratosphere.api.java.record.io.CsvOutputFormat; import eu.stratosphere.api.java.record.operators.CoGroupOperator; -import eu.stratosphere.api.java.record.operators.CoGroupOperator.CombinableFirst; import eu.stratosphere.api.java.record.operators.JoinOperator; import eu.stratosphere.api.java.record.operators.MapOperator; import eu.stratosphere.test.recordJobs.graph.WorksetConnectedComponents.DuplicateLongMap; @@ -77,7 +76,6 @@ public class CoGroupConnectedComponentsITCase extends RecordAPITestBase { // -------------------------------------------------------------------------------------------- - @CombinableFirst @ConstantFieldsFirst(0) @ConstantFieldsSecond(0) public static final class MinIdAndUpdate extends CoGroupFunction implements Serializable { @@ -108,20 +106,6 @@ public class CoGroupConnectedComponentsITCase extends RecordAPITestBase { out.collect(old); } } - - @Override - public Record combineFirst(Iterator records) { - Record next = null; - long min = Long.MAX_VALUE; - while (records.hasNext()) { - next = records.next(); - min = Math.min(min, next.getField(1, LongValue.class).getValue()); - } - - newComponentId.setValue(min); - next.setField(1, newComponentId); - return next; - } } @SuppressWarnings("unchecked") diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java index a24a75d9589..927d380f646 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java @@ -119,10 +119,5 @@ public class CustomCompensatableDotProductCoGroup extends AbstractFunction imple aggregator.reset(); } } - - @Override - public VertexWithRankAndDangling combineFirst(Iterator records) { throw new UnsupportedOperationException(); } - - @Override - public VertexWithRank combineSecond(Iterator records) { throw new UnsupportedOperationException(); } + } diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/graph/ConnectedComponentsWithCoGroup.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/graph/ConnectedComponentsWithCoGroup.java index 923e4ad50f7..c50301827f3 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/graph/ConnectedComponentsWithCoGroup.java +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/graph/ConnectedComponentsWithCoGroup.java @@ -18,9 +18,9 @@ import java.util.Iterator; import eu.stratosphere.api.common.Plan; import eu.stratosphere.api.common.Program; +import eu.stratosphere.api.common.operators.DeltaIteration; import eu.stratosphere.api.common.operators.FileDataSink; import eu.stratosphere.api.common.operators.FileDataSource; -import eu.stratosphere.api.common.operators.DeltaIteration; import eu.stratosphere.api.java.record.functions.CoGroupFunction; import eu.stratosphere.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst; import eu.stratosphere.api.java.record.functions.FunctionAnnotation.ConstantFieldsSecond; @@ -29,7 +29,6 @@ import eu.stratosphere.api.java.record.io.CsvOutputFormat; import eu.stratosphere.api.java.record.operators.CoGroupOperator; import eu.stratosphere.api.java.record.operators.JoinOperator; import eu.stratosphere.api.java.record.operators.MapOperator; -import eu.stratosphere.api.java.record.operators.CoGroupOperator.CombinableFirst; import eu.stratosphere.test.recordJobs.graph.WorksetConnectedComponents.DuplicateLongMap; import eu.stratosphere.test.recordJobs.graph.WorksetConnectedComponents.NeighborWithComponentIDJoin; import eu.stratosphere.types.LongValue; @@ -44,7 +43,6 @@ public class ConnectedComponentsWithCoGroup implements Program { private static final long serialVersionUID = 1L; - @CombinableFirst @ConstantFieldsFirst(0) @ConstantFieldsSecond(0) public static final class MinIdAndUpdate extends CoGroupFunction implements Serializable { @@ -76,19 +74,6 @@ public class ConnectedComponentsWithCoGroup implements Program { } } - @Override - public Record combineFirst(Iterator records) { - Record next = null; - long min = Long.MAX_VALUE; - while (records.hasNext()) { - next = records.next(); - min = Math.min(min, next.getField(1, LongValue.class).getValue()); - } - - newComponentId.setValue(min); - next.setField(1, newComponentId); - return next; - } } @SuppressWarnings("unchecked") -- GitLab