Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
e0f2440d
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
e0f2440d
编写于
9月 10, 2014
作者:
K
Kostas Tzoumas
提交者:
Aljoscha Krettek
9月 22, 2014
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Transitive closure Scala example
上级
a8dd9587
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
224 addition
and
1 deletion
+224
-1
flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
...ache/flink/example/java/graph/TransitiveClosureNaive.java
+22
-0
flink-examples/flink-scala-examples/pom.xml
flink-examples/flink-scala-examples/pom.xml
+23
-1
flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
...e/flink/examples/scala/graph/TransitiveClosureNaive.scala
+119
-0
flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureITCase.java
...nk/test/exampleScalaPrograms/TransitiveClosureITCase.java
+60
-0
未找到文件。
flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
浏览文件 @
e0f2440d
...
...
@@ -19,6 +19,7 @@
package
org.apache.flink.example.java.graph
;
import
org.apache.flink.api.common.functions.CoGroupFunction
;
import
org.apache.flink.api.common.functions.GroupReduceFunction
;
import
org.apache.flink.api.common.functions.JoinFunction
;
import
org.apache.flink.api.java.DataSet
;
...
...
@@ -30,6 +31,9 @@ import org.apache.flink.api.java.tuple.Tuple2;
import
org.apache.flink.example.java.graph.util.ConnectedComponentsData
;
import
org.apache.flink.util.Collector
;
import
java.util.HashSet
;
import
java.util.Set
;
@SuppressWarnings
(
"serial"
)
public
class
TransitiveClosureNaive
implements
ProgramDescription
{
...
...
@@ -73,6 +77,24 @@ public class TransitiveClosureNaive implements ProgramDescription {
}
});
DataSet
<
Tuple2
<
Long
,
Long
>>
newPaths
=
paths
.
coGroup
(
nextPaths
)
.
where
(
0
).
equalTo
(
0
)
.
with
(
new
CoGroupFunction
<
Tuple2
<
Long
,
Long
>,
Tuple2
<
Long
,
Long
>,
Tuple2
<
Long
,
Long
>>()
{
Set
prevSet
=
new
HashSet
<
Tuple2
<
Long
,
Long
>>();
@Override
public
void
coGroup
(
Iterable
<
Tuple2
<
Long
,
Long
>>
prevPaths
,
Iterable
<
Tuple2
<
Long
,
Long
>>
nextPaths
,
Collector
<
Tuple2
<
Long
,
Long
>>
out
)
throws
Exception
{
for
(
Tuple2
<
Long
,
Long
>
prev
:
prevPaths
)
{
prevSet
.
add
(
prev
);
}
for
(
Tuple2
<
Long
,
Long
>
next:
nextPaths
)
{
if
(!
prevSet
.
contains
(
next
))
{
out
.
collect
(
next
);
}
}
}
});
DataSet
<
Tuple2
<
Long
,
Long
>>
transitiveClosure
=
paths
.
closeWith
(
nextPaths
);
...
...
flink-examples/flink-scala-examples/pom.xml
浏览文件 @
e0f2440d
...
...
@@ -281,7 +281,29 @@ under the License.
</execution>
-->
<execution>
<id>
TransitiveClosureNaive
</id>
<phase>
package
</phase>
<goals>
<goal>
jar
</goal>
</goals>
<configuration>
<classifier>
TransitiveClosureNaive
</classifier>
<archive>
<manifestEntries>
<program-class>
org.apache.flink.examples.scala.graph.TransitiveClosureNaive
</program-class>
</manifestEntries>
</archive>
<includes>
<include>
**/wordcount/TransitiveClosureNaive*.class
</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>
...
...
flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
浏览文件 @
e0f2440d
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.examples.scala.graph
import
org.apache.flink.api.scala._
import
org.apache.flink.example.java.graph.util.ConnectedComponentsData
import
org.apache.flink.util.Collector
object
TransitiveClosureNaive
{
def
main
(
args
:
Array
[
String
])
:
Unit
=
{
if
(!
parseParameters
(
args
))
{
return
}
val
env
=
ExecutionEnvironment
.
getExecutionEnvironment
val
edges
=
getEdgesDataSet
(
env
)
val
paths
=
edges
.
iterateWithTermination
(
maxIterations
)
{
prevPaths
:
DataSet
[(
Long
,
Long
)]
=>
val
nextPaths
=
prevPaths
.
join
(
edges
)
.
where
(
1
).
equalTo
(
0
)
{
(
left
,
right
)
=>
Some
((
left
.
_1
,
right
.
_2
))
}
.
union
(
prevPaths
)
.
groupBy
(
0
,
1
)
.
reduce
((
l
,
r
)
=>
l
)
val
terminate
=
prevPaths
.
coGroup
(
nextPaths
)
.
where
(
0
).
equalTo
(
0
)
{
(
prev
,
next
,
out
:
Collector
[(
Long
,
Long
)])
=>
{
val
prevPaths
=
prev
.
toList
for
(
n
<-
next
)
if
(!
prevPaths
.
contains
(
n
))
out
.
collect
(
n
)
}
}
(
nextPaths
,
terminate
)
}
if
(
fileOutput
)
paths
.
writeAsCsv
(
outputPath
,
"\n"
,
" "
)
else
paths
.
print
()
env
.
execute
(
"Scala Transitive Closure Example"
)
}
private
var
fileOutput
:
Boolean
=
false
private
var
edgesPath
:
String
=
null
private
var
outputPath
:
String
=
null
private
var
maxIterations
:
Int
=
10
private
def
parseParameters
(
programArguments
:
Array
[
String
])
:
Boolean
=
{
if
(
programArguments
.
length
>
0
)
{
fileOutput
=
true
if
(
programArguments
.
length
==
3
)
{
edgesPath
=
programArguments
(
0
)
outputPath
=
programArguments
(
1
)
maxIterations
=
Integer
.
parseInt
(
programArguments
(
2
))
}
else
{
System
.
err
.
println
(
"Usage: TransitiveClosure <edges path> <result path> <max number of iterations>"
)
return
false
}
}
else
{
System
.
out
.
println
(
"Executing TransitiveClosure example with default parameters and built-in default data."
)
System
.
out
.
println
(
" Provide parameters to read input data from files."
)
System
.
out
.
println
(
" See the documentation for the correct format of input files."
)
System
.
out
.
println
(
" Usage: TransitiveClosure <edges path> <result path> <max number of iterations>"
)
}
return
true
}
private
def
getEdgesDataSet
(
env
:
ExecutionEnvironment
)
:
DataSet
[(
Long
,
Long
)]
=
{
if
(
fileOutput
)
{
env
.
readCsvFile
[(
Long
,
Long
)](
edgesPath
,
fieldDelimiter
=
' '
,
includedFields
=
Array
(
0
,
1
))
.
map
{
x
=>
(
x
.
_1
,
x
.
_2
)}
}
else
{
val
edgeData
=
ConnectedComponentsData
.
EDGES
map
{
case
Array
(
x
,
y
)
=>
(
x
.
asInstanceOf
[
Long
],
y
.
asInstanceOf
[
Long
])
}
env
.
fromCollection
(
edgeData
)
}
}
}
///**
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
...
...
flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureITCase.java
0 → 100644
浏览文件 @
e0f2440d
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.test.exampleScalaPrograms
;
import
java.io.BufferedReader
;
import
org.apache.flink.examples.scala.graph.TransitiveClosureNaive
;
import
org.apache.flink.test.testdata.ConnectedComponentsData
;
import
org.apache.flink.test.testdata.TransitiveClosureData
;
import
org.apache.flink.test.util.JavaProgramTestBase
;
public
class
TransitiveClosureITCase
extends
JavaProgramTestBase
{
private
static
final
long
SEED
=
0xBADC0FFEEBEEF
L
;
private
static
final
int
NUM_VERTICES
=
1000
;
private
static
final
int
NUM_EDGES
=
10000
;
private
String
edgesPath
;
private
String
resultPath
;
@Override
protected
void
preSubmit
()
throws
Exception
{
edgesPath
=
createTempFile
(
"edges.txt"
,
ConnectedComponentsData
.
getRandomOddEvenEdges
(
NUM_EDGES
,
NUM_VERTICES
,
SEED
));
resultPath
=
getTempFilePath
(
"results"
);
}
@Override
protected
void
testProgram
()
throws
Exception
{
TransitiveClosureNaive
.
main
(
new
String
[]
{
edgesPath
,
resultPath
,
"5"
});
}
@Override
protected
void
postSubmit
()
throws
Exception
{
for
(
BufferedReader
reader
:
getResultReader
(
resultPath
))
{
TransitiveClosureData
.
checkOddEvenResult
(
reader
);
}
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录