diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java index ccf2bb18c7078289856719087b3b632a48689585..26291055b9a794b1cec50d641c0f5129f031c0cf 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java @@ -144,7 +144,7 @@ public class CommunityDetection implements GraphAlgorithm 0) { // find the label with the highest score from the ones received - double maxScore = Double.MIN_VALUE; + double maxScore = -Double.MAX_VALUE; long maxScoreLabel = vertex.getValue().f0; for (long curLabel : receivedLabelsWithScores.keySet()) { diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/CommunityDetectionTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/CommunityDetectionTest.java new file mode 100644 index 0000000000000000000000000000000000000000..cbabcfebe9e2e2c5814865baf722626bc5060705 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/CommunityDetectionTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.graph.library; + +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; +import org.apache.flink.graph.generator.SingletonEdgeGraph; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link CommunityDetection}. + */ +public class CommunityDetectionTest extends AsmTestBase { + + @Test + public void testWithSimpleGraph() throws Exception { + Graph result = undirectedSimpleGraph + .mapVertices(v -> (long) v.getId().getValue(), + new TypeHint>(){}.getTypeInfo()) + .mapEdges(e -> (double) e.getTarget().getValue() + e.getSource().getValue(), + new TypeHint>(){}.getTypeInfo()) + .run(new CommunityDetection<>(10, 0.5)); + + String expectedResult = + "(0,3)\n" + + "(1,5)\n" + + "(2,5)\n" + + "(3,3)\n" + + "(4,5)\n" + + "(5,5)\n"; + + TestBaseUtils.compareResultAsText(result.getVertices().collect(), expectedResult); + } + + @Test + public void testWithSingletonEdgeGraph() throws Exception { + Graph result = new SingletonEdgeGraph(env, 1) + .generate() + .mapVertices(v -> v.getId().getValue(), + new TypeHint>(){}.getTypeInfo()) + .mapEdges(e -> 1.0, + new TypeHint>(){}.getTypeInfo()) + .run(new CommunityDetection<>(10, 0.5)); + + String expectedResult = + "(0,0)\n" + + "(1,1)\n"; + + TestBaseUtils.compareResultAsText(result.getVertices().collect(), expectedResult); + } + + @Test + public void testWithEmptyGraphWithVertices() throws Exception { + emptyGraphWithVertices + .mapVertices(v -> 0L, + new TypeHint>(){}.getTypeInfo()) + .mapEdges(e -> 0.0, + new TypeHint>(){}.getTypeInfo()) + .run(new CommunityDetection<>(10, 0.5)); + } + + @Test + public void testWithEmptyGraphWithoutVertices() throws Exception { + emptyGraphWithoutVertices + .mapVertices(v -> 0L, + new TypeHint>(){}.getTypeInfo()) + .mapEdges(e -> 0.0, + new TypeHint>(){}.getTypeInfo()) + .run(new CommunityDetection<>(10, 0.5)); + } + + @Test + public void testWithRMatGraph() throws Exception { + Graph result = undirectedRMatGraph(8, 4) + .mapVertices(v -> v.getId().getValue(), + new TypeHint>(){}.getTypeInfo()) + .mapEdges(e -> (double) e.getTarget().getValue() - e.getSource().getValue(), + new TypeHint>(){}.getTypeInfo()) + .run(new CommunityDetection<>(10, 0.5)); + + Checksum checksum = new ChecksumHashCode>() + .run(result.getVertices()) + .execute(); + + assertEquals(184, checksum.getCount()); + assertEquals(0x00000000000cdc96L, checksum.getChecksum()); + } +}