doujutun3207 / flink
Commit f9552d8d
Authored Jun 23, 2016 by Greg Hogan

[FLINK-3898] [gelly] Adamic-Adar Similarity

This closes #2160

Parent: 90cfe0a7

Showing 7 changed files with 641 additions and 6 deletions (+641 -6)
docs/apis/batch/libs/gelly.md (+23 -0)
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java (+1 -1)
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java (+3 -3)
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java (+456 -0)
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java (+2 -2)
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java (+22 -0)
flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/AdamicAdarTest.java (+134 -0)
docs/apis/batch/libs/gelly.md
View file @ f9552d8d
...
...
@@ -1832,6 +1832,7 @@ Gelly has a growing collection of graph algorithms for easily analyzing large-sc
 * [Triangle Enumerator](#triangle-enumerator)
 * [Hyperlink-Induced Topic Search](#hyperlink-induced-topic-search)
 * [Summarization](#summarization)
+* [Adamic-Adar](#adamic-adar)
 * [Jaccard Index](#jaccard-index)
 * [Local Clustering Coefficient](#local-clustering-coefficient)
 * [Global Clustering Coefficient](#global-clustering-coefficient)
...
...
@@ -2074,6 +2075,28 @@ The algorithm takes a directed, vertex (and possibly edge) attributed graph as i
 vertex represents a group of vertices and each edge represents a group of edges from the input graph. Furthermore, each
 vertex and edge in the output graph stores the common group value and the number of represented elements.
 
+### Adamic-Adar
+
+#### Overview
+Adamic-Adar measures the similarity between pairs of vertices as the sum of the inverse logarithm of degree over shared
+neighbors. Scores are non-negative and unbounded. A vertex with higher degree has greater overall influence but is less
+influential to each pair of neighbors.
+
+#### Details
+The algorithm first annotates each vertex with the inverse of the logarithm of the vertex degree then joins this score
+onto edges by source vertex. Grouping on the source vertex, each pair of neighbors is emitted with the vertex score.
+Grouping on two-paths, the Adamic-Adar score is summed.
+
+See the [Jaccard Index](#jaccard-index) library method for a similar algorithm.
+
+#### Usage
+The algorithm takes a simple, undirected graph as input and outputs a `DataSet` of tuples containing two vertex IDs and
+the Adamic-Adar similarity score. The graph ID type must be `Comparable` and `Copyable`.
+
+* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
+* `setMinimumRatio`: filter out Adamic-Adar scores less than the given ratio times the average score
+* `setMinimumScore`: filter out Adamic-Adar scores less than the given minimum
+
 ### Jaccard Index
 
 #### Overview
...
...
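The three steps in the Details text added to gelly.md (score each vertex, emit each pair of its neighbors with that score, sum per pair) can be sketched in plain Java without Flink. This is a minimal illustration rather than the library implementation. The class name `AdamicAdarSketch`, the string pair key, and the edge list are assumptions; the edge list was chosen to be consistent with the degrees and vertex pairs that appear in the expected output of `AdamicAdarTest` and is not shown in this commit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class AdamicAdarSketch {

    /** Returns the score for every vertex pair "t,u" (t < u) that shares at least one neighbor. */
    static Map<String, Double> scores(int vertexCount, int[][] edges) {
        // Adjacency lists for a simple, undirected graph.
        List<List<Integer>> neighbors = new ArrayList<>();
        for (int v = 0; v < vertexCount; v++) {
            neighbors.add(new ArrayList<>());
        }
        for (int[] edge : edges) {
            neighbors.get(edge[0]).add(edge[1]);
            neighbors.get(edge[1]).add(edge[0]);
        }

        Map<String, Double> result = new TreeMap<>();
        for (int s = 0; s < vertexCount; s++) {
            List<Integer> adjacent = neighbors.get(s);
            int degree = adjacent.size();
            if (degree < 2) {
                continue; // fewer than two neighbors: no pair to emit
            }
            // Step 1: vertex score 1/log(d(s)).
            double vertexScore = 1.0 / Math.log(degree);
            // Step 2: emit each pair of neighbors (t, u) of s with that score.
            for (int i = 0; i < degree; i++) {
                for (int j = i + 1; j < degree; j++) {
                    int t = Math.min(adjacent.get(i), adjacent.get(j));
                    int u = Math.max(adjacent.get(i), adjacent.get(j));
                    // Step 3: sum the scores per pair.
                    result.merge(t + "," + u, vertexScore, Double::sum);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical edge list (assumption, see the note above).
        int[][] edges = {{0, 1}, {0, 2}, {1, 2}, {1, 3}, {2, 3}, {3, 4}, {3, 5}};
        scores(6, edges).forEach((pair, score) -> System.out.println("(" + pair + "," + score + ")"));
    }
}
```

The sketch uses `double` and an in-memory map for clarity; the commit itself works on `FloatValue` fields inside Flink `DataSet` operators.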
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
View file @ f9552d8d
...
...
@@ -80,7 +80,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
     }
 
     @ForwardedFields("*->f0")
-    private class LinkVertexToSingletonNeighbor
+    private static class LinkVertexToSingletonNeighbor
     implements MapFunction<LongValue, Edge<LongValue, NullValue>> {
 
         private LongValue source = new LongValue();
...
...
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
View file @ f9552d8d
...
...
@@ -126,7 +126,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
      *
      * @param <T> ID type
      */
-    private class SplitTriangles<T>
+    private static class SplitTriangles<T>
     implements FlatMapFunction<TriangleListing.Result<T>, Tuple2<T, LongValue>> {
         private LongValue one = new LongValue(1);
...
...
@@ -159,7 +159,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
      * @param <T> ID type
      */
     @FunctionAnnotation.ForwardedFields("0")
-    private class CountTriangles<T>
+    private static class CountTriangles<T>
     implements ReduceFunction<Tuple2<T, LongValue>> {
         @Override
         public Tuple2<T, LongValue> reduce(Tuple2<T, LongValue> left, Tuple2<T, LongValue> right)
...
...
@@ -176,7 +176,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
      */
     @FunctionAnnotation.ForwardedFieldsFirst("0; 1.0->1.0")
     @FunctionAnnotation.ForwardedFieldsSecond("0")
-    private class JoinVertexDegreeWithTriangleCount<T>
+    private static class JoinVertexDegreeWithTriangleCount<T>
     implements JoinFunction<Vertex<T, Degrees>, Tuple2<T, LongValue>, Result<T>> {
         private LongValue zero = new LongValue(0);
...
...
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
0 → 100644
View file @ f9552d8d
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.library.similarity;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
import org.apache.flink.graph.library.similarity.AdamicAdar.Result;
import org.apache.flink.graph.utils.Murmur3_32;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.types.FloatValue;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;

/**
* http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf
* <br/>
* Adamic-Adar measures the similarity between pairs of vertices as the sum of
* the inverse logarithm of degree over shared neighbors. Scores are non-negative
* and unbounded. A vertex with higher degree has greater overall influence but
* is less influential to each pair of neighbors.
* <br/>
* This implementation produces similarity scores for each pair of vertices
* in the graph with at least one shared neighbor; equivalently, this is the
* set of all non-zero Adamic-Adar coefficients.
* <br/>
* The input graph must be a simple, undirected graph containing no duplicate
* edges or self-loops.
*
* @param <K> graph ID type
* @param <VV> vertex value type
* @param <EV> edge value type
*/
public class AdamicAdar<K extends CopyableValue<K>, VV, EV>
implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {

    private static final int GROUP_SIZE = 64;

    private static final String SUM_OF_SCORES_AND_NUMBER_OF_NEIGHBOR_PAIRS = "sum of scores and number of vertices";

    // Optional configuration
    private float minimumScore = 0.0f;

    private float minimumRatio = 0.0f;

    private int littleParallelism = PARALLELISM_DEFAULT;

    /**
     * Filter out Adamic-Adar scores less than the given minimum.
     *
     * @param score minimum score
     * @return this
     */
    public AdamicAdar<K, VV, EV> setMinimumScore(float score) {
        Preconditions.checkArgument(score >= 0, "Minimum score must be non-negative");

        this.minimumScore = score;

        return this;
    }

    /**
     * Filter out Adamic-Adar scores less than the given ratio times the average score.
     *
     * @param ratio minimum ratio
     * @return this
     */
    public AdamicAdar<K, VV, EV> setMinimumRatio(float ratio) {
        Preconditions.checkArgument(ratio >= 0, "Minimum ratio must be non-negative");

        this.minimumRatio = ratio;

        return this;
    }

    /**
     * Override the parallelism of operators processing small amounts of data.
     *
     * @param littleParallelism operator parallelism
     * @return this
     */
    public AdamicAdar<K, VV, EV> setLittleParallelism(int littleParallelism) {
        Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT,
            "The parallelism must be greater than zero.");

        this.littleParallelism = littleParallelism;

        return this;
    }

    /*
     * Implementation notes:
     *
     * The requirement that "K extends CopyableValue<K>" can be removed when
     * Flink has a self-join which performs the skew distribution handled by
     * GenerateGroupSpans / GenerateGroups / GenerateGroupPairs.
     */

    @Override
    public DataSet<Result<K>> run(Graph<K, VV, EV> input)
            throws Exception {
        // s, d(s), 1/log(d(s))
        DataSet<Tuple3<K, LongValue, FloatValue>> inverseLogDegree = input
            .run(new VertexDegree<K, VV, EV>()
                .setParallelism(littleParallelism))
            .map(new VertexInverseLogDegree<K>())
                .setParallelism(littleParallelism)
                .name("Vertex score");

        // s, t, 1/log(d(s))
        DataSet<Tuple3<K, K, FloatValue>> sourceInverseLogDegree = input
            .getEdges()
            .join(inverseLogDegree, JoinHint.REPARTITION_HASH_SECOND)
            .where(0)
            .equalTo(0)
            .projectFirst(0, 1)
            .<Tuple3<K, K, FloatValue>>projectSecond(2)
                .setParallelism(littleParallelism)
                .name("Edge score");

        // group span, s, t, 1/log(d(s))
        DataSet<Tuple4<IntValue, K, K, FloatValue>> groupSpans = sourceInverseLogDegree
            .groupBy(0)
            .sortGroup(1, Order.ASCENDING)
            .reduceGroup(new GenerateGroupSpans<K>())
                .setParallelism(littleParallelism)
                .name("Generate group spans");

        // group, s, t, 1/log(d(s))
        DataSet<Tuple4<IntValue, K, K, FloatValue>> groups = groupSpans
            .rebalance()
                .setParallelism(littleParallelism)
                .name("Rebalance")
            .flatMap(new GenerateGroups<K>())
                .setParallelism(littleParallelism)
                .name("Generate groups");

        // t, u, 1/log(d(s)) where (s, t) and (s, u) are edges in graph
        DataSet<Tuple3<K, K, FloatValue>> twoPaths = groups
            .groupBy(0, 1)
            .sortGroup(2, Order.ASCENDING)
            .reduceGroup(new GenerateGroupPairs<K>())
                .name("Generate group pairs");

        // t, u, adamic-adar score
        GroupReduceOperator<Tuple3<K, K, FloatValue>, Result<K>> scores = twoPaths
            .groupBy(0, 1)
            .reduceGroup(new ComputeScores<K>(minimumScore, minimumRatio))
                .name("Compute scores");

        if (minimumRatio > 0.0f) {
            // total score, number of pairs of neighbors
            DataSet<Tuple2<FloatValue, LongValue>> sumOfScoresAndNumberOfNeighborPairs = inverseLogDegree
                .map(new ComputeScoreFromVertex<K>())
                    .setParallelism(littleParallelism)
                    .name("Average score")
                .sum(0)
                .andSum(1);

            scores
                .withBroadcastSet(sumOfScoresAndNumberOfNeighborPairs, SUM_OF_SCORES_AND_NUMBER_OF_NEIGHBOR_PAIRS);
        }

        return scores;
    }

    /**
     * Compute the inverse logarithm of the vertex degree. This is computed
     * before enumerating neighbor pairs since logarithm and division are quite
     * computationally intensive.
     *
     * @param <T> ID type
     */
    @FunctionAnnotation.ForwardedFields("0; 1")
    private static class VertexInverseLogDegree<T>
    implements MapFunction<Vertex<T, LongValue>, Tuple3<T, LongValue, FloatValue>> {
        private Tuple3<T, LongValue, FloatValue> output = new Tuple3<>(null, null, new FloatValue());

        @Override
        public Tuple3<T, LongValue, FloatValue> map(Vertex<T, LongValue> value)
                throws Exception {
            output.f0 = value.f0;
            output.f1 = value.f1;

            long degree = value.f1.getValue();
            // when the degree is one the logarithm is zero so avoid dividing by this value
            float inverseLogDegree = (degree == 1) ? 0.0f : 1.0f / (float)Math.log(value.f1.getValue());
            output.f2.setValue(inverseLogDegree);

            return output;
        }
    }

    /**
     * @see JaccardIndex.GenerateGroupSpans
     *
     * @param <T> ID type
     */
    @FunctionAnnotation.ForwardedFields("0->1; 1->2 ; 2->3")
    private static class GenerateGroupSpans<T>
    implements GroupReduceFunction<Tuple3<T, T, FloatValue>, Tuple4<IntValue, T, T, FloatValue>> {
        private IntValue groupSpansValue = new IntValue();

        private Tuple4<IntValue, T, T, FloatValue> output = new Tuple4<>(groupSpansValue, null, null, null);

        @Override
        public void reduce(Iterable<Tuple3<T, T, FloatValue>> values, Collector<Tuple4<IntValue, T, T, FloatValue>> out)
                throws Exception {
            int groupCount = 0;
            int groupSpans = 1;

            groupSpansValue.setValue(groupSpans);

            for (Tuple3<T, T, FloatValue> edge : values) {
                output.f1 = edge.f0;
                output.f2 = edge.f1;
                output.f3 = edge.f2;

                out.collect(output);

                if (++groupCount == GROUP_SIZE) {
                    groupCount = 0;
                    groupSpansValue.setValue(++groupSpans);
                }
            }
        }
    }

    /**
     * @see JaccardIndex.GenerateGroups
     *
     * @param <T> ID type
     */
    @FunctionAnnotation.ForwardedFields("1; 2; 3")
    private static class GenerateGroups<T>
    implements FlatMapFunction<Tuple4<IntValue, T, T, FloatValue>, Tuple4<IntValue, T, T, FloatValue>> {
        @Override
        public void flatMap(Tuple4<IntValue, T, T, FloatValue> value, Collector<Tuple4<IntValue, T, T, FloatValue>> out)
                throws Exception {
            int spans = value.f0.getValue();

            for (int idx = 0 ; idx < spans ; idx++ ) {
                value.f0.setValue(idx);
                out.collect(value);
            }
        }
    }

    /**
     * @see JaccardIndex.GenerateGroupPairs
     *
     * @param <T> ID type
     */
    @FunctionAnnotation.ForwardedFields("3->2")
    private static class GenerateGroupPairs<T extends CopyableValue<T>>
    implements GroupReduceFunction<Tuple4<IntValue, T, T, FloatValue>, Tuple3<T, T, FloatValue>> {
        private Tuple3<T, T, FloatValue> output = new Tuple3<>();

        private boolean initialized = false;

        private List<T> visited = new ArrayList<>(GROUP_SIZE);

        @Override
        public void reduce(Iterable<Tuple4<IntValue, T, T, FloatValue>> values, Collector<Tuple3<T, T, FloatValue>> out)
                throws Exception {
            int visitedCount = 0;

            for (Tuple4<IntValue, T, T, FloatValue> edge : values) {
                output.f1 = edge.f2;
                output.f2 = edge.f3;

                for (int i = 0 ; i < visitedCount ; i++) {
                    output.f0 = visited.get(i);
                    out.collect(output);
                }

                if (visitedCount < GROUP_SIZE) {
                    if (! initialized) {
                        initialized = true;

                        for (int i = 0 ; i < GROUP_SIZE ; i++) {
                            visited.add(edge.f2.copy());
                        }
                    } else {
                        edge.f2.copyTo(visited.get(visitedCount));
                    }

                    visitedCount += 1;
                }
            }
        }
    }

    /**
     * Compute the sum of scores emitted by the vertex over all pairs of neighbors.
     *
     * @param <T> ID type
     */
    private static class ComputeScoreFromVertex<T>
    implements MapFunction<Tuple3<T, LongValue, FloatValue>, Tuple2<FloatValue, LongValue>> {
        private FloatValue sumOfScores = new FloatValue();

        private LongValue numberOfNeighborPairs = new LongValue();

        private Tuple2<FloatValue, LongValue> output = new Tuple2<>(sumOfScores, numberOfNeighborPairs);

        @Override
        public Tuple2<FloatValue, LongValue> map(Tuple3<T, LongValue, FloatValue> value)
                throws Exception {
            long degree = value.f1.getValue();
            long neighborPairs = degree * (degree - 1) / 2;

            sumOfScores.setValue(value.f2.getValue() * neighborPairs);
            numberOfNeighborPairs.setValue(neighborPairs);

            return output;
        }
    }

    /**
     * Compute the Adamic-Adar similarity as the sum over common neighbors of
     * the inverse logarithm of degree.
     *
     * @param <T> ID type
     */
    @FunctionAnnotation.ForwardedFields("0; 1")
    private static class ComputeScores<T>
    extends RichGroupReduceFunction<Tuple3<T, T, FloatValue>, Result<T>> {
        private float minimumScore;

        private float minimumRatio;

        private Result<T> output = new Result<>();

        public ComputeScores(float minimumScore, float minimumRatio) {
            this.minimumScore = minimumScore;
            this.minimumRatio = minimumRatio;
        }

        @Override
        public void open(Configuration parameters)
                throws Exception {
            super.open(parameters);

            if (minimumRatio > 0.0f) {
                Collection<Tuple2<FloatValue, LongValue>> var;
                var = getRuntimeContext().getBroadcastVariable(SUM_OF_SCORES_AND_NUMBER_OF_NEIGHBOR_PAIRS);
                Tuple2<FloatValue, LongValue> sumAndCount = var.iterator().next();

                float averageScore = sumAndCount.f0.getValue() / sumAndCount.f1.getValue();

                minimumScore = Math.max(minimumScore, averageScore * minimumRatio);
            }
        }

        @Override
        public void reduce(Iterable<Tuple3<T, T, FloatValue>> values, Collector<Result<T>> out)
                throws Exception {
            float sum = 0;
            Tuple3<T, T, FloatValue> edge = null;

            for (Tuple3<T, T, FloatValue> next : values) {
                edge = next;
                sum += next.f2.getValue();
            }

            if (sum >= minimumScore) {
                output.f0 = edge.f0;
                output.f1 = edge.f1;
                output.f2.setValue(sum);
                out.collect(output);
            }
        }
    }

    /**
     * Wraps the vertex type to encapsulate results from the Adamic-Adar algorithm.
     *
     * @param <T> ID type
     */
    public static class Result<T>
    extends Edge<T, FloatValue> {
        public static final int HASH_SEED = 0xe405f6d1;

        private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);

        /**
         * No-args constructor.
         */
        public Result() {
            f2 = new FloatValue();
        }

        /**
         * Get the Adamic-Adar score, equal to the sum over common neighbors of
         * the inverse logarithm of degree
         *
         * @return Adamic-Adar score
         */
        public FloatValue getAdamicAdarScore() {
            return f2;
        }

        public String toVerboseString() {
            return "Vertex IDs: (" + f0 + ", " + f1 + "), adamic-adar score: " + getAdamicAdarScore();
        }

        @Override
        public int hashCode() {
            return hasher.reset()
                .hash(f0.hashCode())
                .hash(f1.hashCode())
                .hash(f2.getValue())
                .hash();
        }
    }
}
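The implementation notes in AdamicAdar.java mention that GenerateGroupSpans, GenerateGroups and GenerateGroupPairs spread the neighbor pairs of a high-degree vertex over several groups. As a rough check of that scheme, the sketch below simulates only the counting for a single source vertex, based on a reading of the three functions above: the i-th neighbor in sorted order carries span i / 64 + 1, is replicated into groups 0 through i / 64, and inside each group every element is paired with at most the first 64 elements seen. The class and method names are assumptions, and no Flink types are involved.

```java
final class GroupPairsSketch {
    // Same constant as GROUP_SIZE in AdamicAdar.
    static final int GROUP_SIZE = 64;

    /** Counts the neighbor pairs emitted for one source vertex with the given degree. */
    static long countPairs(int degree) {
        long emitted = 0;
        // Number of groups that receive at least one neighbor.
        int groups = (degree + GROUP_SIZE - 1) / GROUP_SIZE;
        for (int g = 0; g < groups; g++) {
            int visited = 0;
            // Group g receives every neighbor whose index is at least g * GROUP_SIZE.
            for (int i = g * GROUP_SIZE; i < degree; i++) {
                // The current neighbor is paired with each remembered neighbor.
                emitted += visited;
                // Only the first GROUP_SIZE neighbors of a group are remembered.
                if (visited < GROUP_SIZE) {
                    visited++;
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        for (int degree : new int[] {3, 64, 65, 130}) {
            long expected = (long) degree * (degree - 1) / 2;
            System.out.println(degree + ": " + countPairs(degree) + " pairs, expected " + expected);
        }
    }
}
```

Under this reading each unordered pair of neighbors is emitted exactly once, in the group of its lower-indexed member, so the total matches d(d-1)/2 while no single group holds more than 64 remembered IDs.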
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
View file @ f9552d8d
...
...
@@ -245,7 +245,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
                     throw new RuntimeException("Degree overflows IntValue");
                 }
 
-                // group span, u, v, d(u)
+                // group span, u, v, d(v)
                 output.f1 = edge.f0;
                 output.f2 = edge.f1;
                 output.f3.setValue((int)degree);
...
...
@@ -364,7 +364,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
      * @param <T> ID type
      */
     @FunctionAnnotation.ForwardedFields("0; 1")
-    private class ComputeScores<T>
+    private static class ComputeScores<T>
     implements GroupReduceFunction<Tuple3<T, T, IntValue>, Result<T>> {
         private boolean unboundedScores;
...
...
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Murmur3_32.java
View file @ f9552d8d
...
...
@@ -57,6 +57,28 @@ public class Murmur3_32 implements Serializable {
         return this;
     }
 
+    /**
+     * Process a {@code double} value.
+     *
+     * @param input 64-bit input value
+     * @return this
+     */
+    public Murmur3_32 hash(double input) {
+        hash(Double.doubleToLongBits(input));
+        return this;
+    }
+
+    /**
+     * Process a {@code float} value.
+     *
+     * @param input 32-bit input value
+     * @return this
+     */
+    public Murmur3_32 hash(float input) {
+        hash(Float.floatToIntBits(input));
+        return this;
+    }
+
     /**
      * Process an {@code integer} value.
      *
...
...
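The two new overloads convert a floating-point value to its IEEE 754 bit pattern and forward it to an integer overload. The sketch below shows only that conversion step using the standard `Float.floatToIntBits` and `Double.doubleToLongBits` methods; the class `FloatBitsSketch` and its `bits` helpers are hypothetical and the Murmur3 mixing itself is not reproduced.

```java
final class FloatBitsSketch {

    /** Bit pattern of a float, as used by the float overload before hashing. */
    static int bits(float input) {
        return Float.floatToIntBits(input);
    }

    /** Bit pattern of a double, as used by the double overload before hashing. */
    static long bits(double input) {
        return Double.doubleToLongBits(input);
    }

    public static void main(String[] args) {
        System.out.println(Integer.toHexString(bits(1.0f)));
        System.out.println(Long.toHexString(bits(1.0)));
        // 0.0f and -0.0f compare equal with == but have different bit patterns.
        System.out.println(bits(0.0f) == bits(-0.0f));
        // floatToIntBits maps every NaN to one canonical pattern.
        System.out.println(Integer.toHexString(bits(Float.intBitsToFloat(0x7fc00001))));
    }
}
```

One consequence of hashing bit patterns is that positive and negative zero feed different inputs to the hash, while all NaN values feed the same one.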
flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/AdamicAdarTest.java
0 → 100644
View file @ f9552d8d
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.library.similarity;

import org.apache.commons.math3.random.JDKRandomGenerator;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.asm.AsmTestBase;
import org.apache.flink.graph.asm.simple.undirected.Simplify;
import org.apache.flink.graph.generator.RMatGraph;
import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
import org.apache.flink.graph.generator.random.RandomGenerableFactory;
import org.apache.flink.graph.library.similarity.AdamicAdar.Result;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class AdamicAdarTest
extends AsmTestBase {

    private float[] ilog = {
        1.0f / (float)Math.log(2),
        1.0f / (float)Math.log(3),
        1.0f / (float)Math.log(3),
        1.0f / (float)Math.log(4),
        1.0f / (float)Math.log(1),
        1.0f / (float)Math.log(1)
    };

    @Test
    public void testSimpleGraph()
            throws Exception {
        DataSet<Result<IntValue>> aa = undirectedSimpleGraph
            .run(new AdamicAdar<IntValue, NullValue, NullValue>());

        String expectedResult =
            "(0,1," + ilog[2] + ")\n" +
            "(0,2," + ilog[1] + ")\n" +
            "(0,3," + (ilog[1] + ilog[2]) + ")\n" +
            "(1,2," + (ilog[0] + ilog[3]) + ")\n" +
            "(1,3," + ilog[2] + ")\n" +
            "(1,4," + ilog[3] + ")\n" +
            "(1,5," + ilog[3] + ")\n" +
            "(2,3," + ilog[1] + ")\n" +
            "(2,4," + ilog[3] + ")\n" +
            "(2,5," + ilog[3] + ")\n" +
            "(4,5," + ilog[3] + ")";

        TestBaseUtils.compareResultAsText(aa.collect(), expectedResult);
    }

    @Test
    public void testSimpleGraphWithMinimumScore()
            throws Exception {
        DataSet<Result<IntValue>> aa = undirectedSimpleGraph
            .run(new AdamicAdar<IntValue, NullValue, NullValue>()
                .setMinimumScore(0.75f));

        String expectedResult =
            "(0,1," + ilog[2] + ")\n" +
            "(0,2," + ilog[1] + ")\n" +
            "(0,3," + (ilog[1] + ilog[2]) + ")\n" +
            "(1,2," + (ilog[0] + ilog[3]) + ")\n" +
            "(1,3," + ilog[2] + ")\n" +
            "(2,3," + ilog[1] + ")";

        TestBaseUtils.compareResultAsText(aa.collect(), expectedResult);
    }

    @Test
    public void testSimpleGraphWithMinimumRatio()
            throws Exception {
        DataSet<Result<IntValue>> aa = undirectedSimpleGraph
            .run(new AdamicAdar<IntValue, NullValue, NullValue>()
                .setMinimumRatio(1.5f));

        String expectedResult =
            "(0,3," + (ilog[1] + ilog[2]) + ")\n" +
            "(1,2," + (ilog[0] + ilog[3]) + ")";

        TestBaseUtils.compareResultAsText(aa.collect(), expectedResult);
    }

    @Test
    public void testCompleteGraph()
            throws Exception {
        float expectedScore = (completeGraphVertexCount - 2) / (float)Math.log(completeGraphVertexCount - 1);

        DataSet<Result<LongValue>> aa = completeGraph
            .run(new AdamicAdar<LongValue, NullValue, NullValue>());

        for (Result<LongValue> result : aa.collect()) {
            assertEquals(expectedScore, result.getAdamicAdarScore().getValue(), 0.00001);
        }
    }

    @Test
    public void testRMatGraph()
            throws Exception {
        long vertexCount = 1 << 8;
        long edgeCount = 8 * vertexCount;

        RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();

        Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
            .generate()
            .run(new Simplify<LongValue, NullValue, NullValue>(false));

        DataSet<Result<LongValue>> aa = graph
            .run(new AdamicAdar<LongValue, NullValue, NullValue>());

        assertEquals(13954, aa.count());
    }
}
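The cutoff exercised by testSimpleGraphWithMinimumRatio can be worked through by hand. The sketch below mirrors the arithmetic of ComputeScoreFromVertex and ComputeScores.open in plain Java: each vertex contributes its inverse log degree once per pair of neighbors, the average is taken over all neighbor pairs, and the effective minimum is the larger of the configured minimum score and the ratio times that average. The class `MinimumRatioSketch` is hypothetical, it uses `double` where the commit uses `float`, and the degree list {2, 3, 3, 4, 1, 1} is an assumption read off the `ilog` array in the test.

```java
final class MinimumRatioSketch {

    /** Average score per neighbor pair, following ComputeScoreFromVertex. */
    static double averageScore(long[] degrees) {
        double sumOfScores = 0.0;
        long numberOfNeighborPairs = 0;
        for (long degree : degrees) {
            long neighborPairs = degree * (degree - 1) / 2;
            // As in VertexInverseLogDegree, degree one is scored zero to avoid dividing by log(1).
            double inverseLogDegree = (degree == 1) ? 0.0 : 1.0 / Math.log(degree);
            sumOfScores += inverseLogDegree * neighborPairs;
            numberOfNeighborPairs += neighborPairs;
        }
        // Note: yields NaN when no vertex has two or more neighbors.
        return sumOfScores / numberOfNeighborPairs;
    }

    /** Effective cutoff, following ComputeScores.open. */
    static double threshold(double minimumScore, double minimumRatio, long[] degrees) {
        return Math.max(minimumScore, averageScore(degrees) * minimumRatio);
    }

    public static void main(String[] args) {
        long[] degrees = {2, 3, 3, 4, 1, 1}; // assumed degrees, see the note above
        System.out.println("average score: " + averageScore(degrees));
        System.out.println("cutoff for ratio 1.5: " + threshold(0.0, 1.5, degrees));
    }
}
```

With these assumed degrees there are 13 neighbor pairs, and the cutoff for a ratio of 1.5 comes out near 1.296. Only the two summed scores in the test's expected output, for pairs (0,3) and (1,2), reach it. The average is taken over neighbor pairs (two-paths), not over distinct vertex pairs, so a pair with several shared neighbors is counted once per shared neighbor.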