doujutun3207 / flink
Commit ec0b00d6
Authored July 11, 2014 by Stephan Ewen

[FLINK-1018] Add tests to verify correct placement of pipeline breakers with broadcast variables

Parent: a8224861

3 changed files, with 170 additions and 5 deletions (+170 −5):

    flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java   +5 −5
    flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java          +137 −0
    flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java  +28 −0
flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java

@@ -359,6 +359,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 		}
 	}
 	
+	@SuppressWarnings({"unchecked", "deprecation"})
 	@Test
 	public void testBranchEachContractType() {
 		try {
@@ -374,7 +375,6 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 				.name("Reduce 1")
 				.build();
 		
-		@SuppressWarnings("unchecked")
 		JoinOperator match1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
 				.input1(sourceB, sourceB, sourceC)
 				.input2(sourceC)
@@ -434,10 +434,10 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 				.build();
 		
 		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cogroup7);
-//		sink.addInput(sourceA);
-//		sink.addInput(co3);
-//		sink.addInput(co4);
-//		sink.addInput(co1);
+		sink.addInput(sourceA);
+		sink.addInput(cogroup3);
+		sink.addInput(cogroup4);
+		sink.addInput(cogroup1);
 		
 		// return the PACT plan
 		Plan plan = new Plan(sink, "Branching of each contract type");
flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
0 → 100644
浏览文件 @
ec0b00d6
/***********************************************************************************************************************
*
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package
org.apache.flink.compiler
;
import
static
org
.
junit
.
Assert
.*;
import
org.junit.Test
;
import
org.apache.flink.api.common.Plan
;
import
org.apache.flink.api.java.DataSet
;
import
org.apache.flink.api.java.ExecutionEnvironment
;
import
org.apache.flink.api.java.IterativeDataSet
;
import
org.apache.flink.compiler.plan.BulkIterationPlanNode
;
import
org.apache.flink.compiler.plan.OptimizedPlan
;
import
org.apache.flink.compiler.plan.SingleInputPlanNode
;
import
org.apache.flink.compiler.plan.SinkPlanNode
;
import
org.apache.flink.compiler.plandump.PlanJSONDumpGenerator
;
import
org.apache.flink.compiler.testfunctions.IdentityMapper
;
import
org.apache.flink.compiler.testfunctions.SelectOneReducer
;
@SuppressWarnings
(
"serial"
)
public
class
PipelineBreakerTest
extends
CompilerTestBase
{
@Test
public
void
testPipelineBreakerWithBroadcastVariable
()
{
try
{
ExecutionEnvironment
env
=
ExecutionEnvironment
.
getExecutionEnvironment
();
env
.
setDegreeOfParallelism
(
64
);
DataSet
<
Long
>
source
=
env
.
generateSequence
(
1
,
10
).
map
(
new
IdentityMapper
<
Long
>());
DataSet
<
Long
>
result
=
source
.
map
(
new
IdentityMapper
<
Long
>())
.
map
(
new
IdentityMapper
<
Long
>())
.
withBroadcastSet
(
source
,
"bc"
);
result
.
print
();
Plan
p
=
env
.
createProgramPlan
();
OptimizedPlan
op
=
compileNoStats
(
p
);
SinkPlanNode
sink
=
op
.
getDataSinks
().
iterator
().
next
();
SingleInputPlanNode
mapper
=
(
SingleInputPlanNode
)
sink
.
getInput
().
getSource
();
assertTrue
(
mapper
.
getInput
().
getTempMode
().
breaksPipeline
());
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
fail
(
e
.
getMessage
());
}
}
@Test
public
void
testPipelineBreakerBroadcastedAllReduce
()
{
try
{
ExecutionEnvironment
env
=
ExecutionEnvironment
.
getExecutionEnvironment
();
env
.
setDegreeOfParallelism
(
64
);
DataSet
<
Long
>
sourceWithMapper
=
env
.
generateSequence
(
1
,
10
).
map
(
new
IdentityMapper
<
Long
>());
DataSet
<
Long
>
bcInput1
=
sourceWithMapper
.
map
(
new
IdentityMapper
<
Long
>())
.
reduce
(
new
SelectOneReducer
<
Long
>());
DataSet
<
Long
>
bcInput2
=
env
.
generateSequence
(
1
,
10
);
DataSet
<
Long
>
result
=
sourceWithMapper
.
map
(
new
IdentityMapper
<
Long
>())
.
withBroadcastSet
(
bcInput1
,
"bc1"
)
.
withBroadcastSet
(
bcInput2
,
"bc2"
);
result
.
print
();
Plan
p
=
env
.
createProgramPlan
();
OptimizedPlan
op
=
compileNoStats
(
p
);
SinkPlanNode
sink
=
op
.
getDataSinks
().
iterator
().
next
();
SingleInputPlanNode
mapper
=
(
SingleInputPlanNode
)
sink
.
getInput
().
getSource
();
assertTrue
(
mapper
.
getInput
().
getTempMode
().
breaksPipeline
());
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
fail
(
e
.
getMessage
());
}
}
@Test
public
void
testPipelineBreakerBroadcastedPartialSolution
()
{
try
{
ExecutionEnvironment
env
=
ExecutionEnvironment
.
getExecutionEnvironment
();
env
.
setDegreeOfParallelism
(
64
);
DataSet
<
Long
>
initialSource
=
env
.
generateSequence
(
1
,
10
);
IterativeDataSet
<
Long
>
iteration
=
initialSource
.
iterate
(
100
);
DataSet
<
Long
>
sourceWithMapper
=
env
.
generateSequence
(
1
,
10
).
map
(
new
IdentityMapper
<
Long
>());
DataSet
<
Long
>
bcInput1
=
sourceWithMapper
.
map
(
new
IdentityMapper
<
Long
>())
.
reduce
(
new
SelectOneReducer
<
Long
>());
DataSet
<
Long
>
result
=
sourceWithMapper
.
map
(
new
IdentityMapper
<
Long
>())
.
withBroadcastSet
(
iteration
,
"bc2"
)
.
withBroadcastSet
(
bcInput1
,
"bc1"
);
iteration
.
closeWith
(
result
).
print
();
Plan
p
=
env
.
createProgramPlan
();
OptimizedPlan
op
=
compileNoStats
(
p
);
SinkPlanNode
sink
=
op
.
getDataSinks
().
iterator
().
next
();
BulkIterationPlanNode
iterationPlanNode
=
(
BulkIterationPlanNode
)
sink
.
getInput
().
getSource
();
SingleInputPlanNode
mapper
=
(
SingleInputPlanNode
)
iterationPlanNode
.
getRootOfStepFunction
();
assertTrue
(
mapper
.
getInput
().
getTempMode
().
breaksPipeline
());
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
fail
(
e
.
getMessage
());
}
}
}
flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java (new file, mode 100644)

/***********************************************************************************************************************
 *
 * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 **********************************************************************************************************************/
package org.apache.flink.compiler.testfunctions;

import org.apache.flink.api.java.functions.ReduceFunction;

public class SelectOneReducer<T> extends ReduceFunction<T> {

	private static final long serialVersionUID = 1L;

	@Override
	public T reduce(T value1, T value2) throws Exception {
		return value1;
	}
}