Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
f0cc5d64
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
f0cc5d64
编写于
11月 27, 2014
作者:
G
Gyula Fora
提交者:
mbalassi
12月 05, 2014
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[FLINK-1279] [streaming] Forward partitioning changed to use round-robin method
上级
64baa00b
变更
13
显示空白变更内容
内联
并排
Showing
13 changed file
with
65 addition
and
102 deletion
+65
-102
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
.../java/org/apache/flink/streaming/api/JobGraphBuilder.java
+3
-4
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
...org/apache/flink/streaming/api/datastream/DataStream.java
+3
-11
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
...e/flink/streaming/api/datastream/IterativeDataStream.java
+3
-3
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/BroadcastPartitioner.java
...che/flink/streaming/partitioner/BroadcastPartitioner.java
+8
-2
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/DistributePartitioner.java
...he/flink/streaming/partitioner/DistributePartitioner.java
+9
-11
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java
...apache/flink/streaming/partitioner/FieldsPartitioner.java
+3
-3
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ForwardPartitioner.java
...pache/flink/streaming/partitioner/ForwardPartitioner.java
+0
-43
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/GlobalPartitioner.java
...apache/flink/streaming/partitioner/GlobalPartitioner.java
+4
-4
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ShufflePartitioner.java
...pache/flink/streaming/partitioner/ShufflePartitioner.java
+3
-4
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java
...apache/flink/streaming/partitioner/StreamPartitioner.java
+19
-8
flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
...c/test/java/org/apache/flink/streaming/api/PrintTest.java
+1
-0
flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/DistributePartitionerTest.java
...link/streaming/partitioner/DistributePartitionerTest.java
+1
-1
flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ForwardPartitionerTest.java
...e/flink/streaming/partitioner/ForwardPartitionerTest.java
+8
-8
未找到文件。
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
浏览文件 @
f0cc5d64
...
...
@@ -37,8 +37,8 @@ import org.apache.flink.streaming.api.streamvertex.CoStreamVertex;
import
org.apache.flink.streaming.api.streamvertex.StreamIterationHead
;
import
org.apache.flink.streaming.api.streamvertex.StreamIterationTail
;
import
org.apache.flink.streaming.api.streamvertex.StreamVertex
;
import
org.apache.flink.streaming.partitioner.ForwardPartitioner
;
import
org.apache.flink.streaming.partitioner.StreamPartitioner
;
import
org.apache.flink.streaming.partitioner.StreamPartitioner.PartitioningStrategy
;
import
org.apache.flink.streaming.state.OperatorState
;
import
org.apache.flink.streaming.util.serialization.TypeWrapper
;
import
org.slf4j.Logger
;
...
...
@@ -80,7 +80,6 @@ public class JobGraphBuilder {
private
Map
<
String
,
Long
>
iterationWaitTime
;
private
Map
<
String
,
Map
<
String
,
OperatorState
<?>>>
operatorStates
;
/**
* Creates an new {@link JobGraph} with the given name. A JobGraph is a DAG
* and consists of sources, tasks (intermediate vertices) and sinks.
...
...
@@ -355,7 +354,7 @@ public class JobGraphBuilder {
StreamConfig
config
=
new
StreamConfig
(
upStreamVertex
.
getConfiguration
());
if
(
partitionerObject
.
get
Class
().
equals
(
ForwardPartitioner
.
class
)
)
{
if
(
partitionerObject
.
get
Strategy
()
==
PartitioningStrategy
.
FORWARD
)
{
downStreamVertex
.
connectNewDataSetAsInput
(
upStreamVertex
,
DistributionPattern
.
POINTWISE
);
}
else
{
...
...
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
浏览文件 @
f0cc5d64
...
...
@@ -66,7 +66,6 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import
org.apache.flink.streaming.partitioner.BroadcastPartitioner
;
import
org.apache.flink.streaming.partitioner.DistributePartitioner
;
import
org.apache.flink.streaming.partitioner.FieldsPartitioner
;
import
org.apache.flink.streaming.partitioner.ForwardPartitioner
;
import
org.apache.flink.streaming.partitioner.ShufflePartitioner
;
import
org.apache.flink.streaming.partitioner.StreamPartitioner
;
import
org.apache.flink.streaming.util.keys.FieldsKeySelector
;
...
...
@@ -127,7 +126,7 @@ public class DataStream<OUT> {
this
.
jobGraphBuilder
=
environment
.
getJobGraphBuilder
();
this
.
userDefinedNames
=
new
ArrayList
<
String
>();
this
.
selectAll
=
false
;
this
.
partitioner
=
new
ForwardPartitioner
<
OUT
>(
);
this
.
partitioner
=
new
DistributePartitioner
<
OUT
>(
true
);
this
.
outTypeWrapper
=
outTypeWrapper
;
this
.
mergedStreams
=
new
ArrayList
<
DataStream
<
OUT
>>();
this
.
mergedStreams
.
add
(
this
);
...
...
@@ -158,13 +157,6 @@ public class DataStream<OUT> {
}
/**
* Partitioning strategy on the stream.
*/
public
static
enum
ConnectionType
{
SHUFFLE
,
BROADCAST
,
FIELD
,
FORWARD
,
DISTRIBUTE
}
/**
* Returns the ID of the {@link DataStream}.
*
...
...
@@ -341,7 +333,7 @@ public class DataStream<OUT> {
* @return The DataStream with shuffle partitioning set.
*/
public
DataStream
<
OUT
>
forward
()
{
return
setConnectionType
(
new
ForwardPartitioner
<
OUT
>(
));
return
setConnectionType
(
new
DistributePartitioner
<
OUT
>(
true
));
}
/**
...
...
@@ -351,7 +343,7 @@ public class DataStream<OUT> {
* @return The DataStream with shuffle partitioning set.
*/
public
DataStream
<
OUT
>
distribute
()
{
return
setConnectionType
(
new
DistributePartitioner
<
OUT
>());
return
setConnectionType
(
new
DistributePartitioner
<
OUT
>(
false
));
}
/**
...
...
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
浏览文件 @
f0cc5d64
...
...
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.datastream;
import
java.util.Arrays
;
import
java.util.List
;
import
org.apache.flink.streaming.partitioner.
Forward
Partitioner
;
import
org.apache.flink.streaming.partitioner.
Distribute
Partitioner
;
/**
* The iterative data stream represents the start of an iteration in a
...
...
@@ -91,8 +91,8 @@ public class IterativeDataStream<IN> extends
for
(
DataStream
<
IN
>
stream
:
iterationTail
.
mergedStreams
)
{
String
inputID
=
stream
.
getId
();
jobGraphBuilder
.
setEdge
(
inputID
,
returnStream
.
getId
(),
new
ForwardPartitioner
<
IN
>(),
0
,
name
,
false
);
jobGraphBuilder
.
setEdge
(
inputID
,
returnStream
.
getId
(),
new
DistributePartitioner
<
IN
>(
true
),
0
,
name
,
false
);
}
return
iterationTail
;
...
...
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/BroadcastPartitioner.java
浏览文件 @
f0cc5d64
...
...
@@ -26,16 +26,21 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
* @param <T>
* Type of the Tuple
*/
public
class
BroadcastPartitioner
<
T
>
implement
s
StreamPartitioner
<
T
>
{
public
class
BroadcastPartitioner
<
T
>
extend
s
StreamPartitioner
<
T
>
{
private
static
final
long
serialVersionUID
=
1L
;
int
[]
returnArray
;
boolean
set
;
int
setNumber
;
public
BroadcastPartitioner
()
{
super
(
PartitioningStrategy
.
BROADCAST
);
}
@Override
public
int
[]
selectChannels
(
SerializationDelegate
<
StreamRecord
<
T
>>
record
,
int
numberOfOutputChannels
)
{
if
(
set
)
{
if
(
set
&&
setNumber
==
numberOfOutputChannels
)
{
return
returnArray
;
}
else
{
this
.
returnArray
=
new
int
[
numberOfOutputChannels
];
...
...
@@ -43,6 +48,7 @@ public class BroadcastPartitioner<T> implements StreamPartitioner<T> {
returnArray
[
i
]
=
i
;
}
set
=
true
;
setNumber
=
numberOfOutputChannels
;
return
returnArray
;
}
}
...
...
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/DistributePartitioner.java
浏览文件 @
f0cc5d64
...
...
@@ -27,22 +27,20 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
* @param <T>
* Type of the Tuple
*/
public
class
DistributePartitioner
<
T
>
implement
s
StreamPartitioner
<
T
>
{
public
class
DistributePartitioner
<
T
>
extend
s
StreamPartitioner
<
T
>
{
private
static
final
long
serialVersionUID
=
1L
;
private
int
currentChannelIndex
;
private
int
[]
returnArray
;
private
int
[]
returnArray
=
new
int
[]
{-
1
};
public
DistributePartitioner
()
{
this
.
currentChannelIndex
=
0
;
this
.
returnArray
=
new
int
[
1
];
public
DistributePartitioner
(
boolean
forward
)
{
super
(
forward
?
PartitioningStrategy
.
FORWARD
:
PartitioningStrategy
.
DISTRIBUTE
);
}
@Override
public
int
[]
selectChannels
(
SerializationDelegate
<
StreamRecord
<
T
>>
record
,
int
numberOfOutputChannels
)
{
returnArray
[
0
]
=
currentChannelIndex
;
currentChannelIndex
=
(
currentChannelIndex
+
1
)
%
numberOfOutputChannels
;
return
returnArray
;
this
.
returnArray
[
0
]
=
(
this
.
returnArray
[
0
]
+
1
)
%
numberOfOutputChannels
;
return
this
.
returnArray
;
}
}
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java
浏览文件 @
f0cc5d64
...
...
@@ -28,15 +28,15 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
* @param <T>
* Type of the Tuple
*/
public
class
FieldsPartitioner
<
T
>
implement
s
StreamPartitioner
<
T
>
{
public
class
FieldsPartitioner
<
T
>
extend
s
StreamPartitioner
<
T
>
{
private
static
final
long
serialVersionUID
=
1L
;
private
int
[]
returnArray
;
private
int
[]
returnArray
=
new
int
[
1
];
;
KeySelector
<
T
,
?>
keySelector
;
public
FieldsPartitioner
(
KeySelector
<
T
,
?>
keySelector
)
{
super
(
PartitioningStrategy
.
FIELDS
);
this
.
keySelector
=
keySelector
;
this
.
returnArray
=
new
int
[
1
];
}
@Override
...
...
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ForwardPartitioner.java
已删除
100755 → 0
浏览文件 @
64baa00b
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.streaming.partitioner
;
import
org.apache.flink.runtime.plugable.SerializationDelegate
;
import
org.apache.flink.streaming.api.streamrecord.StreamRecord
;
/**
* Partitioner that forwards the tuples to the local subtask of the output vertex
*
* @param <T>
* Type of the Tuple
*/
public
class
ForwardPartitioner
<
T
>
implements
StreamPartitioner
<
T
>
{
private
static
final
long
serialVersionUID
=
1L
;
private
int
[]
returnArray
;
public
ForwardPartitioner
()
{
this
.
returnArray
=
new
int
[]{
0
};
}
@Override
public
int
[]
selectChannels
(
SerializationDelegate
<
StreamRecord
<
T
>>
record
,
int
numberOfOutputChannels
)
{
return
returnArray
;
}
}
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/GlobalPartitioner.java
浏览文件 @
f0cc5d64
...
...
@@ -21,13 +21,13 @@ import org.apache.flink.runtime.plugable.SerializationDelegate;
import
org.apache.flink.streaming.api.streamrecord.StreamRecord
;
//Group to the partitioner with the lowest id
public
class
GlobalPartitioner
<
T
>
implement
s
StreamPartitioner
<
T
>
{
public
class
GlobalPartitioner
<
T
>
extend
s
StreamPartitioner
<
T
>
{
private
static
final
long
serialVersionUID
=
1L
;
private
int
[]
returnArray
;
private
int
[]
returnArray
=
new
int
[]
{
0
}
;
public
GlobalPartitioner
()
{
this
.
returnArray
=
new
int
[]
{
0
}
;
super
(
PartitioningStrategy
.
GLOBAL
)
;
}
@Override
...
...
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ShufflePartitioner.java
浏览文件 @
f0cc5d64
...
...
@@ -29,16 +29,15 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
* @param <T>
* Type of the Tuple
*/
public
class
ShufflePartitioner
<
T
>
implement
s
StreamPartitioner
<
T
>
{
public
class
ShufflePartitioner
<
T
>
extend
s
StreamPartitioner
<
T
>
{
private
static
final
long
serialVersionUID
=
1L
;
private
Random
random
=
new
Random
();
private
int
[]
returnArray
;
private
int
[]
returnArray
=
new
int
[
1
]
;
public
ShufflePartitioner
()
{
this
.
random
=
new
Random
();
this
.
returnArray
=
new
int
[
1
];
super
(
PartitioningStrategy
.
SHUFFLE
);
}
@Override
...
...
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java
浏览文件 @
f0cc5d64
...
...
@@ -22,12 +22,23 @@ import org.apache.flink.runtime.io.network.api.ChannelSelector;
import
org.apache.flink.runtime.plugable.SerializationDelegate
;
import
org.apache.flink.streaming.api.streamrecord.StreamRecord
;
/**
* Empty interface to encapsulate partitioners.
*
* @param <T>
* Type of the Tuple
*/
public
interface
StreamPartitioner
<
T
>
extends
ChannelSelector
<
SerializationDelegate
<
StreamRecord
<
T
>>>,
Serializable
{
public
abstract
class
StreamPartitioner
<
T
>
implements
ChannelSelector
<
SerializationDelegate
<
StreamRecord
<
T
>>>,
Serializable
{
public
enum
PartitioningStrategy
{
FORWARD
,
DISTRIBUTE
,
SHUFFLE
,
BROADCAST
,
GLOBAL
,
FIELDS
;
}
private
static
final
long
serialVersionUID
=
1L
;
private
PartitioningStrategy
strategy
;
public
StreamPartitioner
(
PartitioningStrategy
strategy
)
{
this
.
strategy
=
strategy
;
}
public
PartitioningStrategy
getStrategy
()
{
return
strategy
;
}
}
flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
浏览文件 @
f0cc5d64
...
...
@@ -27,6 +27,7 @@ import org.junit.Test;
public
class
PrintTest
implements
Serializable
{
private
static
final
long
serialVersionUID
=
1L
;
private
static
final
long
MEMORYSIZE
=
32
;
private
static
final
class
IdentityMap
implements
MapFunction
<
Long
,
Long
>
{
...
...
flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/DistributePartitionerTest.java
浏览文件 @
f0cc5d64
...
...
@@ -34,7 +34,7 @@ public class DistributePartitionerTest {
@Before
public
void
setPartitioner
()
{
distributePartitioner
=
new
DistributePartitioner
<
Tuple
>();
distributePartitioner
=
new
DistributePartitioner
<
Tuple
>(
false
);
}
@Test
...
...
flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ForwardPartitionerTest.java
浏览文件 @
f0cc5d64
...
...
@@ -27,14 +27,14 @@ import org.junit.Test;
public
class
ForwardPartitionerTest
{
private
Forward
Partitioner
<
Tuple
>
forwardPartitioner
;
private
Distribute
Partitioner
<
Tuple
>
forwardPartitioner
;
private
StreamRecord
<
Tuple
>
streamRecord
=
new
StreamRecord
<
Tuple
>();
private
SerializationDelegate
<
StreamRecord
<
Tuple
>>
sd
=
new
SerializationDelegate
<
StreamRecord
<
Tuple
>>(
null
);
@Before
public
void
setPartitioner
()
{
forwardPartitioner
=
new
ForwardPartitioner
<
Tuple
>(
);
forwardPartitioner
=
new
DistributePartitioner
<
Tuple
>(
true
);
}
@Test
...
...
@@ -49,7 +49,7 @@ public class ForwardPartitionerTest {
public
void
testSelectChannelsInterval
()
{
sd
.
setInstance
(
streamRecord
);
assertEquals
(
0
,
forwardPartitioner
.
selectChannels
(
sd
,
1
)[
0
]);
assertEquals
(
0
,
forwardPartitioner
.
selectChannels
(
sd
,
2
)[
0
]);
assertEquals
(
0
,
forwardPartitioner
.
selectChannels
(
sd
,
1024
)[
0
]);
assertEquals
(
1
,
forwardPartitioner
.
selectChannels
(
sd
,
2
)[
0
]);
assertEquals
(
2
,
forwardPartitioner
.
selectChannels
(
sd
,
1024
)[
0
]);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录