diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericCoGrouper.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericCoGrouper.java index 5caff99415d6d94d03d32aff5180e90695513647..4d7f59039abab96b1c18cb2a28c9afcf7c7d83ce 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericCoGrouper.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericCoGrouper.java @@ -31,27 +31,4 @@ public interface GenericCoGrouper extends Function { */ void coGroup(Iterator records1, Iterator records2, Collector out) throws Exception; - /** - * This method must be overridden by CoGoup UDFs that want to make use of the combining feature - * on their first input. In addition, the extending class must be annotated as CombinableFirst. - * - * The use of the combiner is typically a pre-reduction of the data. - * - * @param records The records to be combined. - * @param out The collector to write the result to. - * - */ - V1 combineFirst(Iterator records) throws Exception; - - /** - * This method must be overridden by CoGoup UDFs that want to make use of the combining feature - * on their second input. In addition, the extending class must be annotated as CombinableSecond. - * - * The use of the combiner is typically a pre-reduction of the data. - * - * @param records The records to be combined. - * @param out The collector to write the result to. - * - */ - V2 combineSecond(Iterator records) throws Exception; } diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/CoGroupFunction.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/CoGroupFunction.java index 5ea597763ec5eb23f58055ccaf253ef4d1985096..d9ea8e330d363b78bb84f960ade5181e3cf3f65d 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/CoGroupFunction.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/CoGroupFunction.java @@ -14,14 +14,11 @@ **********************************************************************************************************************/ package eu.stratosphere.api.java.functions; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; import java.util.Iterator; import eu.stratosphere.api.common.functions.AbstractFunction; import eu.stratosphere.api.common.functions.GenericCoGrouper; +import eu.stratosphere.api.java.operators.Keys; import eu.stratosphere.util.Collector; /** @@ -35,7 +32,7 @@ import eu.stratosphere.util.Collector; * * set1.coGroup(set2).where().equalTo().with(new MyCoGroupFunction()); * - * The keys can be defined in various ways, such as through field names, tuple field positions, or key extractors. + * The keys can be defined through tuple field positions or key extractors. * See {@link Keys} for details. *

* Some keys may only be contained in one of the two original data sets. In that case, the CoGroup function is invoked @@ -48,57 +45,11 @@ public abstract class CoGroupFunction extends AbstractFunction im /** * The core method of the CoGroupFunction. This method is called for each pair of groups that have the same - * key. The elements of the groups are returned by the repestive iterators. + * key. The elements of the groups are returned by the respective iterators. * * It is possible that one of the two groups is empty, in which case the respective iterator has no elements. */ @Override public abstract void coGroup(Iterator first, Iterator second, Collector out) throws Exception; - - - /** - * This function is not implemented by default. It must be overridden, if the function declares through the - * {@link CombinableFirst} annotation that the first input is combinable. - * - * @see eu.stratosphere.api.common.functions.GenericCoGrouper#combineFirst(java.util.Iterator) - */ - @Override - public IN1 combineFirst(Iterator records) throws Exception { - throw new UnsupportedOperationException("combineFirst() is not overridden by this UDF. " + - "Using the CombinableFirst annotation requires that this method is overridden."); - } - /** - * This function is not implemented by default. It must be overridden, if the function declares through the - * {@link CombinableFirst} annotation that the first input is combinable. - * - * @see eu.stratosphere.api.common.functions.GenericCoGrouper#combineFirst(java.util.Iterator) - */ - @Override - public IN2 combineSecond(Iterator records) throws Exception { - throw new UnsupportedOperationException("combineSecond() is not overridden by this UDF. " + - "Using the CombinableSecond annotation requires that this method is overridden."); - } - - // -------------------------------------------------------------------------------------------- - // Annotations for to declare individual CoGroup inputs combinable - // -------------------------------------------------------------------------------------------- - - /** - * This marker interface can be added to a CoGroup function implementation. It declares its first input - * combinable. Similar as for the {@link GroupReduceFunction} function, the framework may invoke the - * {@link CoGroupFunction#combineFirst(Iterator)} function to reduce the data volume early. - */ - @Retention(RetentionPolicy.RUNTIME) - @Target(ElementType.TYPE) - public static @interface CombinableFirst {}; - - /** - * This marker interface can be added to a CoGroup function implementation. It declares its second input - * combinable. Similar as for the {@link GroupReduceFunction} function, the framework may invoke the - * {@link CoGroupFunction#combineSecond(Iterator)} function to reduce the data volume early. - */ - @Retention(RetentionPolicy.RUNTIME) - @Target(ElementType.TYPE) - public static @interface CombinableSecond {}; } diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java index e1d4e17f3e6095571eee5dbd593ffb1d977f090e..ecef514760568ee49c361f4e9dc1b355e46e4531 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java @@ -103,17 +103,7 @@ public class PlanUnwrappingCoGroupOperator public void coGroup(Iterator> records1, Iterator> records2, Collector out) throws Exception { this.wrappedFunction.coGroup(new UnwrappingKeyIterator(records1), new UnwrappingKeyIterator(records2), out); } - - - @Override - public Tuple2 combineFirst(Iterator> records) throws Exception { - throw new UnsupportedOperationException(); - } - - @Override - public Tuple2 combineSecond(Iterator> records) throws Exception { - throw new UnsupportedOperationException(); - } + } public static class UnwrappingKeyIterator implements Iterator { diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/CoGroupFunction.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/CoGroupFunction.java index 41fb8935e41803ab3465ce8777445509c02b0a1a..3a81d34402ce80b95552542bab539a17d6e2d1fd 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/CoGroupFunction.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/CoGroupFunction.java @@ -43,39 +43,4 @@ public abstract class CoGroupFunction extends AbstractFunction implements Generi @Override public abstract void coGroup(Iterator records1, Iterator records2, Collector out) throws Exception; - /** - * This method must be overridden by CoGoup UDFs that want to make use of the combining feature - * on their first input. In addition, the extending class must be annotated as CombinableFirst. - *

- * The use of the combiner is typically a pre-reduction of the data. - * - * @param records The records to be combined. - * @param out The collector to write the result to. - * - * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the - * runtime catches an exception, it aborts the combine task and lets the fail-over logic - * decide whether to retry the combiner execution. - */ - @Override - public Record combineFirst(Iterator records) throws Exception { - throw new UnsupportedOperationException(); - } - - /** - * This method must be overridden by CoGoup UDFs that want to make use of the combining feature - * on their second input. In addition, the extending class must be annotated as CombinableSecond. - *

- * The use of the combiner is typically a pre-reduction of the data. - * - * @param records The records to be combined. - * @param out The collector to write the result to. - * - * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the - * runtime catches an exception, it aborts the combine task and lets the fail-over logic - * decide whether to retry the combiner execution. - */ - @Override - public Record combineSecond(Iterator records) throws Exception { - throw new UnsupportedOperationException(); - } } diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/CoGroupOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/CoGroupOperator.java index c50264278d9a33a138a726ed212a445efbed713f..159cf3404ddffdc0faed2db06cc4ceb9c9eac1c2 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/CoGroupOperator.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/CoGroupOperator.java @@ -13,10 +13,6 @@ package eu.stratosphere.api.java.record.operators; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -106,22 +102,14 @@ public class CoGroupOperator extends CoGroupOperatorBase implem @Override public boolean isCombinableFirst() { - return super.isCombinableFirst() || getUserCodeWrapper().getUserCodeAnnotation(CombinableFirst.class) != null; + return false; } @Override public boolean isCombinableSecond() { - return super.isCombinableSecond() || getUserCodeWrapper().getUserCodeAnnotation(CombinableSecond.class) != null; + return false; } - @Retention(RetentionPolicy.RUNTIME) - @Target(ElementType.TYPE) - public static @interface CombinableFirst {}; - - @Retention(RetentionPolicy.RUNTIME) - @Target(ElementType.TYPE) - public static @interface CombinableSecond {}; - // -------------------------------------------------------------------------------------------- @@ -168,7 +156,7 @@ public class CoGroupOperator extends CoGroupOperatorBase implem } /** - * Creates a Builder with the provided {@link JoinFunction} implementation. This method is intended + * Creates a Builder with the provided {@link CoGroupFunction} implementation. This method is intended * for special case sub-types only. * * @param udf The {@link CoGroupFunction} implementation for this CoGroup operator. diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsITCase.java index 50c9fad48d87bb3ea9b7ec442f0b3bf212262826..f299f2e8cfc4027c655a38830f44ed2b85a22fc0 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsITCase.java +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsITCase.java @@ -27,7 +27,6 @@ import eu.stratosphere.api.java.record.functions.FunctionAnnotation.ConstantFiel import eu.stratosphere.api.java.record.io.CsvInputFormat; import eu.stratosphere.api.java.record.io.CsvOutputFormat; import eu.stratosphere.api.java.record.operators.CoGroupOperator; -import eu.stratosphere.api.java.record.operators.CoGroupOperator.CombinableFirst; import eu.stratosphere.api.java.record.operators.JoinOperator; import eu.stratosphere.api.java.record.operators.MapOperator; import eu.stratosphere.test.recordJobs.graph.WorksetConnectedComponents.DuplicateLongMap; @@ -77,7 +76,6 @@ public class CoGroupConnectedComponentsITCase extends RecordAPITestBase { // -------------------------------------------------------------------------------------------- - @CombinableFirst @ConstantFieldsFirst(0) @ConstantFieldsSecond(0) public static final class MinIdAndUpdate extends CoGroupFunction implements Serializable { @@ -108,20 +106,6 @@ public class CoGroupConnectedComponentsITCase extends RecordAPITestBase { out.collect(old); } } - - @Override - public Record combineFirst(Iterator records) { - Record next = null; - long min = Long.MAX_VALUE; - while (records.hasNext()) { - next = records.next(); - min = Math.min(min, next.getField(1, LongValue.class).getValue()); - } - - newComponentId.setValue(min); - next.setField(1, newComponentId); - return next; - } } @SuppressWarnings("unchecked") diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java index a24a75d958955902eb9e4cc239111ece99c5cdfb..927d380f646bb7c4171c77dbea7463ea2c3cc94f 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java @@ -119,10 +119,5 @@ public class CustomCompensatableDotProductCoGroup extends AbstractFunction imple aggregator.reset(); } } - - @Override - public VertexWithRankAndDangling combineFirst(Iterator records) { throw new UnsupportedOperationException(); } - - @Override - public VertexWithRank combineSecond(Iterator records) { throw new UnsupportedOperationException(); } + } diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/graph/ConnectedComponentsWithCoGroup.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/graph/ConnectedComponentsWithCoGroup.java index 923e4ad50f7f84710dade59f01a48a84054a4d34..c50301827f335632b6908655eff4dc366ad6188c 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/graph/ConnectedComponentsWithCoGroup.java +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/graph/ConnectedComponentsWithCoGroup.java @@ -18,9 +18,9 @@ import java.util.Iterator; import eu.stratosphere.api.common.Plan; import eu.stratosphere.api.common.Program; +import eu.stratosphere.api.common.operators.DeltaIteration; import eu.stratosphere.api.common.operators.FileDataSink; import eu.stratosphere.api.common.operators.FileDataSource; -import eu.stratosphere.api.common.operators.DeltaIteration; import eu.stratosphere.api.java.record.functions.CoGroupFunction; import eu.stratosphere.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst; import eu.stratosphere.api.java.record.functions.FunctionAnnotation.ConstantFieldsSecond; @@ -29,7 +29,6 @@ import eu.stratosphere.api.java.record.io.CsvOutputFormat; import eu.stratosphere.api.java.record.operators.CoGroupOperator; import eu.stratosphere.api.java.record.operators.JoinOperator; import eu.stratosphere.api.java.record.operators.MapOperator; -import eu.stratosphere.api.java.record.operators.CoGroupOperator.CombinableFirst; import eu.stratosphere.test.recordJobs.graph.WorksetConnectedComponents.DuplicateLongMap; import eu.stratosphere.test.recordJobs.graph.WorksetConnectedComponents.NeighborWithComponentIDJoin; import eu.stratosphere.types.LongValue; @@ -44,7 +43,6 @@ public class ConnectedComponentsWithCoGroup implements Program { private static final long serialVersionUID = 1L; - @CombinableFirst @ConstantFieldsFirst(0) @ConstantFieldsSecond(0) public static final class MinIdAndUpdate extends CoGroupFunction implements Serializable { @@ -76,19 +74,6 @@ public class ConnectedComponentsWithCoGroup implements Program { } } - @Override - public Record combineFirst(Iterator records) { - Record next = null; - long min = Long.MAX_VALUE; - while (records.hasNext()) { - next = records.next(); - min = Math.min(min, next.getField(1, LongValue.class).getValue()); - } - - newComponentId.setValue(min); - next.setField(1, newComponentId); - return next; - } } @SuppressWarnings("unchecked")