>> getMessagingBcastVars() {
+ return this.bcVarsMessaging;
+ }
+
+
+}
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
index 8488be695c570440db793bddc87df181d3bc826b..79da664aa98dca08ef84be92c28801567ff10285 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
@@ -60,9 +60,7 @@ import com.google.common.base.Preconditions;
*
*
* Vertex-centric graph iterations are instantiated by the
- * {@link #withPlainEdges(DataSet, VertexUpdateFunction, MessagingFunction, int)} method, or the
- * {@link #withValuedEdges(DataSet, VertexUpdateFunction, MessagingFunction, int)} method, depending on whether
- * the graph's edges are carrying values.
+ * {@link #withEdges(DataSet, VertexUpdateFunction, MessagingFunction, int)} method.
*
* @param The type of the vertex key (the vertex identifier).
* @param The type of the vertex value (the state of the vertex).
@@ -84,7 +82,7 @@ public class VertexCentricIteration
private DataSet> initialVertices;
- private IterationConfiguration configuration;
+ private VertexCentricConfiguration configuration;
// ----------------------------------------------------------------------------------
@@ -362,14 +360,14 @@ public class VertexCentricIteration
*
* @param parameters the configuration parameters
*/
- public void configure(IterationConfiguration parameters) {
+ public void configure(VertexCentricConfiguration parameters) {
this.configuration = parameters;
}
/**
* @return the configuration parameters of this vertex-centric iteration
*/
- public IterationConfiguration getIterationConfiguration() {
+ public VertexCentricConfiguration getIterationConfiguration() {
return this.configuration;
}
}
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
index 561c87a5c25443350c13cbfc77db57c221ee47c6..9122053b419aaa8f46b3b41b00d026a2e1ed4cfc 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
@@ -114,7 +114,7 @@ public abstract class VertexUpdateFunction impl
/**
* Gets the broadcast data set registered under the given name. Broadcast data sets
* are available on all parallel instances of a function. They can be registered via
- * {@link VertexCentricIteration#addBroadcastSetForUpdateFunction(String, org.apache.flink.api.java.DataSet)}.
+ * {@link org.apache.flink.graph.spargel.VertexCentricConfiguration#addBroadcastSetForUpdateFunction(String, org.apache.flink.api.java.DataSet)}.
*
* @param name The name under which the broadcast set is registered.
* @return The broadcast data set.
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
new file mode 100644
index 0000000000000000000000000000000000000000..5f5f8b29d562fac9a2008e85cce4f3346e205b15
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GSAConfiguration;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.GatherSumApplyIteration;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.IterationConfiguration;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.LongValue;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase {
+
+ public GatherSumApplyConfigurationITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ private String resultPath;
+ private String expectedResult;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
+ }
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expectedResult, resultPath);
+ }
+
+ @Test
+ public void testRunWithConfiguration() throws Exception {
+ /*
+ * Test Graph's runGatherSumApplyIteration when configuration parameters are provided
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
+ TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper());
+
+ // create the configuration object
+ GSAConfiguration parameters = new GSAConfiguration();
+
+ parameters.addBroadcastSetForGatherFunction("gatherBcastSet", env.fromElements(1, 2, 3));
+ parameters.addBroadcastSetForSumFunction("sumBcastSet", env.fromElements(4, 5, 6));
+ parameters.addBroadcastSetForApplyFunction("applyBcastSet", env.fromElements(7, 8, 9));
+ parameters.registerAggregator("superstepAggregator", new LongSumAggregator());
+
+ Graph result = graph.runGatherSumApplyIteration(new Gather(), new Sum(),
+ new Apply(), 10, parameters);
+
+ result.getVertices().writeAsCsv(resultPath, "\n", "\t");
+ env.execute();
+
+ expectedResult = "1 11\n" +
+ "2 11\n" +
+ "3 11\n" +
+ "4 11\n" +
+ "5 11";
+ }
+
+ @Test
+ public void testIterationConfiguration() throws Exception {
+
+ /*
+ * Test name, parallelism and solutionSetUnmanaged parameters
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ GatherSumApplyIteration iteration = GatherSumApplyIteration
+ .withEdges(TestGraphUtils.getLongLongEdgeData(env), new DummyGather(),
+ new DummySum(), new DummyApply(), 10);
+
+ GSAConfiguration parameters = new GSAConfiguration();
+ parameters.setName("gelly iteration");
+ parameters.setParallelism(2);
+ parameters.setSolutionSetUnmanagedMemory(true);
+
+ iteration.configure(parameters);
+
+ Assert.assertEquals("gelly iteration", iteration.getIterationConfiguration().getName(""));
+ Assert.assertEquals(2, iteration.getIterationConfiguration().getParallelism());
+ Assert.assertEquals(true, iteration.getIterationConfiguration().isSolutionSetUnmanagedMemory());
+
+ DataSet> result = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration);
+
+ result.writeAsCsv(resultPath, "\n", "\t");
+ env.execute();
+ expectedResult = "1 11\n" +
+ "2 12\n" +
+ "3 13\n" +
+ "4 14\n" +
+ "5 15";
+ }
+
+ @SuppressWarnings("serial")
+ private static final class Gather extends GatherFunction {
+
+ @Override
+ public void preSuperstep() {
+
+ // test bcast variable
+ @SuppressWarnings("unchecked")
+ List> bcastSet = (List>)(List>)getBroadcastSet("gatherBcastSet");
+ Assert.assertEquals(1, bcastSet.get(0));
+ Assert.assertEquals(2, bcastSet.get(1));
+ Assert.assertEquals(3, bcastSet.get(2));
+
+ // test aggregator
+ if (getSuperstepNumber() == 2) {
+ long aggrValue = ((LongValue)getPreviousIterationAggregate("superstepAggregator")).getValue();
+
+ Assert.assertEquals(7, aggrValue);
+ }
+ }
+
+ public Long gather(Neighbor neighbor) {
+ return neighbor.getNeighborValue();
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class Sum extends SumFunction {
+
+ LongSumAggregator aggregator = new LongSumAggregator();
+
+ @Override
+ public void preSuperstep() {
+
+ // test bcast variable
+ @SuppressWarnings("unchecked")
+ List> bcastSet = (List>)(List>)getBroadcastSet("sumBcastSet");
+ Assert.assertEquals(4, bcastSet.get(0));
+ Assert.assertEquals(5, bcastSet.get(1));
+ Assert.assertEquals(6, bcastSet.get(2));
+
+ // test aggregator
+ aggregator = getIterationAggregator("superstepAggregator");
+ }
+
+ public Long sum(Long newValue, Long currentValue) {
+ long superstep = getSuperstepNumber();
+ aggregator.aggregate(superstep);
+ return 0l;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class Apply extends ApplyFunction {
+
+ LongSumAggregator aggregator = new LongSumAggregator();
+
+ @Override
+ public void preSuperstep() {
+
+ // test bcast variable
+ @SuppressWarnings("unchecked")
+ List> bcastSet = (List>)(List>)getBroadcastSet("applyBcastSet");
+ Assert.assertEquals(7, bcastSet.get(0));
+ Assert.assertEquals(8, bcastSet.get(1));
+ Assert.assertEquals(9, bcastSet.get(2));
+
+ // test aggregator
+ aggregator = getIterationAggregator("superstepAggregator");
+ }
+
+ public void apply(Long summedValue, Long origValue) {
+ long superstep = getSuperstepNumber();
+ aggregator.aggregate(superstep);
+ setResult(origValue + 1);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class DummyGather extends GatherFunction {
+
+ public Long gather(Neighbor neighbor) {
+ return neighbor.getNeighborValue();
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class DummySum extends SumFunction {
+
+ public Long sum(Long newValue, Long currentValue) {
+ return 0l;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class DummyApply extends ApplyFunction {
+
+ public void apply(Long summedValue, Long origValue) {
+ setResult(origValue + 1);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static final class AssignOneMapper implements MapFunction, Long> {
+
+ public Long map(Vertex value) {
+ return 1l;
+ }
+ }
+}
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
index b49707085cc3df7f7b5c64eb7cff08e0e5163335..4e8412cadf94899542f804f76c83c33216d6b220 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
@@ -27,12 +27,12 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.spargel.IterationConfiguration;
+import org.apache.flink.graph.IterationConfiguration;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexCentricConfiguration;
import org.apache.flink.graph.spargel.VertexCentricIteration;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
-import org.apache.flink.graph.utils.VertexToTuple2Map;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.LongValue;
import org.junit.After;
@@ -78,7 +78,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper());
// create the configuration object
- IterationConfiguration parameters = new IterationConfiguration();
+ VertexCentricConfiguration parameters = new VertexCentricConfiguration();
parameters.addBroadcastSetForUpdateFunction("updateBcastSet", env.fromElements(1, 2, 3));
parameters.addBroadcastSetForMessagingFunction("messagingBcastSet", env.fromElements(4, 5, 6));
@@ -87,7 +87,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
Graph result = graph.runVertexCentricIteration(
new UpdateFunction(), new MessageFunction(), 10, parameters);
- result.getVertices().map(new VertexToTuple2Map()).writeAsCsv(resultPath, "\n", "\t");
+ result.getVertices().writeAsCsv(resultPath, "\n", "\t");
env.execute();
expectedResult = "1 11\n" +
"2 11\n" +
@@ -108,7 +108,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
.withEdges(TestGraphUtils.getLongLongEdgeData(env), new DummyUpdateFunction(),
new DummyMessageFunction(), 10);
- IterationConfiguration parameters = new IterationConfiguration();
+ VertexCentricConfiguration parameters = new VertexCentricConfiguration();
parameters.setName("gelly iteration");
parameters.setParallelism(2);
parameters.setSolutionSetUnmanagedMemory(true);
@@ -121,7 +121,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
DataSet> result = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration);
- result.map(new VertexToTuple2Map()).writeAsCsv(resultPath, "\n", "\t");
+ result.writeAsCsv(resultPath, "\n", "\t");
env.execute();
expectedResult = "1 11\n" +
"2 12\n" +