Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
d0f2db06
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
d0f2db06
编写于
11月 20, 2014
作者:
S
Stephan Ewen
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[FLINK-1264] [compiler] Properly forward custom partitioners to the runtime
上级
b3e5ed0b
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
82 addition
and
7 deletion
+82
-7
flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
...link/compiler/plantranslate/NepheleJobGraphGenerator.java
+9
-7
flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
.../org/apache/flink/test/misc/CustomPartitioningITCase.java
+73
-0
未找到文件。
flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
浏览文件 @
d0f2db06
...
...
@@ -1072,19 +1072,21 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
if
(
channel
.
getShipStrategy
()
==
ShipStrategyType
.
PARTITION_RANGE
)
{
final
DataDistribution
dataDistribution
=
channel
.
getDataDistribution
();
if
(
dataDistribution
!=
null
)
{
if
(
dataDistribution
!=
null
)
{
sourceConfig
.
setOutputDataDistribution
(
dataDistribution
,
outputIndex
);
}
else
{
throw
new
RuntimeException
(
"Range partitioning requires data distribution"
);
// TODO: inject code and configuration for automatic histogram generation
}
}
// if (targetContract instanceof GenericDataSink) {
// final DataDistribution distri = ((GenericDataSink) targetContract).getDataDistribution();
// if (distri != null) {
// configForOutputShipStrategy.setOutputDataDistribution(distri);
// }
// }
if
(
channel
.
getShipStrategy
()
==
ShipStrategyType
.
PARTITION_CUSTOM
)
{
if
(
channel
.
getPartitioner
()
!=
null
)
{
sourceConfig
.
setOutputPartitioner
(
channel
.
getPartitioner
(),
outputIndex
);
}
else
{
throw
new
CompilerException
(
"The ship strategy was set to custom partitioning, but no partitioner was set."
);
}
}
// ---------------- configure the receiver -------------------
if
(
isBroadcast
)
{
...
...
flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
0 → 100644
浏览文件 @
d0f2db06
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.test.misc
;
import
org.apache.flink.api.common.functions.Partitioner
;
import
org.apache.flink.api.common.functions.RichMapFunction
;
import
org.apache.flink.api.java.ExecutionEnvironment
;
import
org.apache.flink.api.java.functions.KeySelector
;
import
org.apache.flink.api.java.io.DiscardingOutputFormat
;
import
org.apache.flink.test.util.JavaProgramTestBase
;
import
org.junit.Assert
;
@SuppressWarnings
(
"serial"
)
public
class
CustomPartitioningITCase
extends
JavaProgramTestBase
{
@Override
protected
void
testProgram
()
throws
Exception
{
ExecutionEnvironment
env
=
ExecutionEnvironment
.
getExecutionEnvironment
();
if
(!
isCollectionExecution
())
{
Assert
.
assertTrue
(
env
.
getDegreeOfParallelism
()
>
1
);
}
env
.
generateSequence
(
1
,
1000
)
.
partitionCustom
(
new
AllZeroPartitioner
(),
new
IdKeySelector
<
Long
>())
.
map
(
new
FailExceptInPartitionZeroMapper
())
.
output
(
new
DiscardingOutputFormat
<
Long
>());
env
.
execute
();
}
public
static
class
FailExceptInPartitionZeroMapper
extends
RichMapFunction
<
Long
,
Long
>
{
@Override
public
Long
map
(
Long
value
)
throws
Exception
{
if
(
getRuntimeContext
().
getIndexOfThisSubtask
()
==
0
)
{
return
value
;
}
else
{
throw
new
Exception
(
"Received data in a partition other than partition 0"
);
}
}
}
public
static
class
AllZeroPartitioner
implements
Partitioner
<
Long
>
{
@Override
public
int
partition
(
Long
key
,
int
numPartitions
)
{
return
0
;
}
}
public
static
class
IdKeySelector
<
T
>
implements
KeySelector
<
T
,
T
>
{
@Override
public
T
getKey
(
T
value
)
{
return
value
;
}
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录