提交 0efa6441 编写于 作者: G Greg Hogan

[FLINK-4135] [gelly] Replace ChecksumHashCode as GraphAnalytic

Adds a GraphAnalytic to replace the checksumHashCode Java and Scala
utility functions.

This closes #2188
上级 149e7a01
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.Utils
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.util.AbstractID
import scala.reflect.ClassTag
package object utils {
/**
* This class provides utility methods for computing checksums over a Graph.
*
* @param self Graph
*/
implicit class GraphUtils[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
TypeInformation : ClassTag](val self: Graph[K, VV, EV]) {
/**
* Convenience method to get the count (number of elements) of a Graph
* as well as the checksum (sum over element hashes). The vertex and
* edge DataSets are processed in a single job and the resultant counts
* and checksums are merged locally.
*
* @return the checksum over the vertices and edges
*/
@throws(classOf[Exception])
def checksumHashCode(): Utils.ChecksumHashCode = {
val verticesId = new AbstractID().toString
self.getVertices.output(new Utils.ChecksumHashCodeHelper[Vertex[K,VV]](verticesId))
val edgesId = new AbstractID().toString
self.getEdges.output(new Utils.ChecksumHashCodeHelper[Edge[K,EV]](edgesId))
val res = self.getWrappedGraph.getContext.execute()
val checksum = res.getAccumulatorResult[Utils.ChecksumHashCode](verticesId)
checksum.add(res.getAccumulatorResult[Utils.ChecksumHashCode](edgesId))
checksum
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.scala.test.util
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.scala.utils._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.test.util.MultipleProgramsTestBase
import org.junit.Assert.assertEquals
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@RunWith(classOf[Parameterized])
class GraphUtilsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testChecksumHashCodeVerticesAndEdges() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val checksum = graph.checksumHashCode()
assertEquals(checksum.getCount, 12L)
assertEquals(checksum.getChecksum, 19665L)
}
}
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.apache.flink.graph.library.metric;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.Utils;
import org.apache.flink.graph.AbstractGraphAnalytic;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.util.AbstractID;
/**
* Convenience method to get the count (number of elements) of a Graph
* as well as the checksum (sum over element hashes). The vertex and
* edge DataSets are processed in a single job and the resultant counts
* and checksums are merged locally.
*
* @param <K> graph ID type
* @param <VV> vertex value type
* @param <EV> edge value type
*/
public class ChecksumHashCode<K, VV, EV>
extends AbstractGraphAnalytic<K, VV, EV, Utils.ChecksumHashCode> {
private String verticesId = new AbstractID().toString();
private String edgesId = new AbstractID().toString();
@Override
public ChecksumHashCode<K, VV, EV> run(Graph<K, VV, EV> input)
throws Exception {
super.run(input);
input
.getVertices()
.output(new Utils.ChecksumHashCodeHelper<Vertex<K, VV>>(verticesId))
.name("ChecksumHashCode vertices");
input
.getEdges()
.output(new Utils.ChecksumHashCodeHelper<Edge<K, EV>>(edgesId))
.name("ChecksumHashCode edges");
return this;
}
@Override
public Utils.ChecksumHashCode getResult() {
JobExecutionResult res = env.getLastJobExecutionResult();
Utils.ChecksumHashCode checksum = res.getAccumulatorResult(verticesId);
checksum.add(res.<Utils.ChecksumHashCode>getAccumulatorResult(edgesId));
return checksum;
}
}
......@@ -18,45 +18,15 @@
package org.apache.flink.graph.utils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.Utils;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.AbstractID;
import static org.apache.flink.api.java.typeutils.ValueTypeInfo.LONG_VALUE_TYPE_INFO;
public class GraphUtils {
/**
* Convenience method to get the count (number of elements) of a Graph
* as well as the checksum (sum over element hashes). The vertex and
* edge DataSets are processed in a single job and the resultant counts
* and checksums are merged locally.
*
* @param graph Graph over which to compute the count and checksum
* @return the checksum over the vertices and edges
*/
public static <K, VV, EV> Utils.ChecksumHashCode checksumHashCode(Graph<K, VV, EV> graph) throws Exception {
final String verticesId = new AbstractID().toString();
graph.getVertices().output(new Utils.ChecksumHashCodeHelper<Vertex<K, VV>>(verticesId)).name("ChecksumHashCode vertices");
final String edgesId = new AbstractID().toString();
graph.getEdges().output(new Utils.ChecksumHashCodeHelper<Edge<K, EV>>(edgesId)).name("ChecksumHashCode edges");
JobExecutionResult res = graph.getContext().execute();
Utils.ChecksumHashCode checksum = res.<Utils.ChecksumHashCode>getAccumulatorResult(verticesId);
checksum.add(res.<Utils.ChecksumHashCode>getAccumulatorResult(edgesId));
return checksum;
}
/**
* Count the number of elements in a DataSet.
*
......
......@@ -18,10 +18,10 @@
package org.apache.flink.graph.asm.degree.filter.undirected;
import org.apache.flink.api.java.Utils.ChecksumHashCode;
import org.apache.flink.api.java.Utils;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.asm.AsmTestBase;
import org.apache.flink.graph.utils.GraphUtils;
import org.apache.flink.graph.library.metric.ChecksumHashCode;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
......@@ -62,8 +62,10 @@ extends AsmTestBase {
@Test
public void testWithRMatGraph()
throws Exception {
ChecksumHashCode checksum = GraphUtils.checksumHashCode(undirectedRMatGraph
.run(new MaximumDegree<LongValue, NullValue, NullValue>(16)));
Utils.ChecksumHashCode checksum = undirectedRMatGraph
.run(new MaximumDegree<LongValue, NullValue, NullValue>(16))
.run(new ChecksumHashCode<LongValue, NullValue, NullValue>())
.execute();
assertEquals(805, checksum.getCount());
assertEquals(0x0000000008028b43L, checksum.getChecksum());
......
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.apache.flink.graph.library.metric;
import org.apache.flink.api.java.Utils;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.asm.AsmTestBase;
import org.apache.flink.graph.test.TestGraphUtils;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class ChecksumHashCodeTest
extends AsmTestBase {
@Test
public void testSmallGraph() throws Exception {
Graph<Long, Long, Long> graph = Graph.fromDataSet(
TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env),
env);
Utils.ChecksumHashCode checksum = graph
.run(new ChecksumHashCode<Long, Long, Long>())
.execute();
assertEquals(checksum.getCount(), 12L);
assertEquals(checksum.getChecksum(), 19665L);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.test.util;
import static org.junit.Assert.assertEquals;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.Utils.ChecksumHashCode;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.test.TestGraphUtils;
import org.apache.flink.graph.utils.GraphUtils;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class GraphUtilsITCase extends MultipleProgramsTestBase {
public GraphUtilsITCase(TestExecutionMode mode){
super(mode);
}
@Test
public void testChecksumHashCodeVerticesAndEdges() throws Exception {
/*
* Test checksum hashcode
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(
TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env),
env);
ChecksumHashCode checksum = GraphUtils.checksumHashCode(graph);
assertEquals(checksum.getCount(), 12L);
assertEquals(checksum.getChecksum(), 19665L);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册