Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
945fc023
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
945fc023
编写于
2月 01, 2016
作者:
Z
zentol
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[FLINK-3309] [py] Resolve Maven warnings
上级
d3808c7b
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
38 addition
and
12 deletion
+38
-12
flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
...in/java/org/apache/flink/python/api/PythonPlanBinder.java
+28
-8
flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/NestedKeyDiscarder.java
...e/flink/python/api/functions/util/NestedKeyDiscarder.java
+1
-0
flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java
...apache/flink/python/api/functions/util/SerializerMap.java
+2
-1
flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
...pache/flink/python/api/streaming/data/PythonReceiver.java
+1
-1
flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
.../apache/flink/python/api/streaming/data/PythonSender.java
+2
-0
flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
...che/flink/python/api/streaming/plan/PythonPlanSender.java
+2
-1
flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
...e/flink/python/api/streaming/util/SerializationUtils.java
+2
-1
未找到文件。
flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
浏览文件 @
945fc023
...
...
@@ -91,7 +91,7 @@ public class PythonPlanBinder {
private
static
String
FLINK_HDFS_PATH
=
"hdfs:/tmp"
;
public
static
final
String
FLINK_TMP_DATA_DIR
=
System
.
getProperty
(
"java.io.tmpdir"
)
+
File
.
separator
+
"flink_data"
;
private
HashMap
<
Integer
,
Object
>
sets
=
new
HashMap
();
private
HashMap
<
Integer
,
Object
>
sets
=
new
HashMap
<>
();
public
ExecutionEnvironment
env
;
private
PythonPlanStreamer
streamer
;
...
...
@@ -386,6 +386,7 @@ public class PythonPlanBinder {
return
info
.
parallelism
==
-
1
?
env
.
getParallelism
()
:
info
.
parallelism
;
}
@SuppressWarnings
(
"unchecked"
)
private
void
createCsvSource
(
PythonOperationInfo
info
)
throws
IOException
{
if
(!(
info
.
types
instanceof
TupleTypeInfo
))
{
throw
new
RuntimeException
(
"The output type of a csv source has to be a tuple. The derived type is "
+
info
);
...
...
@@ -395,36 +396,39 @@ public class PythonPlanBinder {
String
fieldD
=
info
.
fieldDelimiter
;
TupleTypeInfo
<?>
types
=
(
TupleTypeInfo
)
info
.
types
;
sets
.
put
(
info
.
setID
,
env
.
createInput
(
new
TupleCsvInputFormat
(
path
,
lineD
,
fieldD
,
types
),
info
.
types
).
setParallelism
(
getParallelism
(
info
)).
name
(
"CsvSource"
)
.
map
(
new
SerializerMap
()).
setParallelism
(
getParallelism
(
info
)).
name
(
"CsvSourcePostStep"
));
.
map
(
new
SerializerMap
<>
()).
setParallelism
(
getParallelism
(
info
)).
name
(
"CsvSourcePostStep"
));
}
private
void
createTextSource
(
PythonOperationInfo
info
)
throws
IOException
{
sets
.
put
(
info
.
setID
,
env
.
readTextFile
(
info
.
path
).
setParallelism
(
getParallelism
(
info
)).
name
(
"TextSource"
)
.
map
(
new
SerializerMap
()).
setParallelism
(
getParallelism
(
info
)).
name
(
"TextSourcePostStep"
));
.
map
(
new
SerializerMap
<
String
>
()).
setParallelism
(
getParallelism
(
info
)).
name
(
"TextSourcePostStep"
));
}
private
void
createValueSource
(
PythonOperationInfo
info
)
throws
IOException
{
sets
.
put
(
info
.
setID
,
env
.
fromElements
(
info
.
values
).
setParallelism
(
getParallelism
(
info
)).
name
(
"ValueSource"
)
.
map
(
new
SerializerMap
()).
setParallelism
(
getParallelism
(
info
)).
name
(
"ValueSourcePostStep"
));
.
map
(
new
SerializerMap
<>
()).
setParallelism
(
getParallelism
(
info
)).
name
(
"ValueSourcePostStep"
));
}
private
void
createSequenceSource
(
PythonOperationInfo
info
)
throws
IOException
{
sets
.
put
(
info
.
setID
,
env
.
generateSequence
(
info
.
from
,
info
.
to
).
setParallelism
(
getParallelism
(
info
)).
name
(
"SequenceSource"
)
.
map
(
new
SerializerMap
()).
setParallelism
(
getParallelism
(
info
)).
name
(
"SequenceSourcePostStep"
));
.
map
(
new
SerializerMap
<
Long
>
()).
setParallelism
(
getParallelism
(
info
)).
name
(
"SequenceSourcePostStep"
));
}
@SuppressWarnings
(
"unchecked"
)
private
void
createCsvSink
(
PythonOperationInfo
info
)
throws
IOException
{
DataSet
parent
=
(
DataSet
)
sets
.
get
(
info
.
parentID
);
parent
.
map
(
new
StringTupleDeserializerMap
()).
setParallelism
(
getParallelism
(
info
)).
name
(
"CsvSinkPreStep"
)
.
writeAsCsv
(
info
.
path
,
info
.
lineDelimiter
,
info
.
fieldDelimiter
,
info
.
writeMode
).
setParallelism
(
getParallelism
(
info
)).
name
(
"CsvSink"
);
}
@SuppressWarnings
(
"unchecked"
)
private
void
createTextSink
(
PythonOperationInfo
info
)
throws
IOException
{
DataSet
parent
=
(
DataSet
)
sets
.
get
(
info
.
parentID
);
parent
.
map
(
new
StringDeserializerMap
()).
setParallelism
(
getParallelism
(
info
))
.
writeAsText
(
info
.
path
,
info
.
writeMode
).
setParallelism
(
getParallelism
(
info
)).
name
(
"TextSink"
);
}
@SuppressWarnings
(
"unchecked"
)
private
void
createPrintSink
(
PythonOperationInfo
info
)
throws
IOException
{
DataSet
parent
=
(
DataSet
)
sets
.
get
(
info
.
parentID
);
parent
.
map
(
new
StringDeserializerMap
()).
setParallelism
(
getParallelism
(
info
)).
name
(
"PrintSinkPreStep"
)
...
...
@@ -432,11 +436,11 @@ public class PythonPlanBinder {
}
private
void
createBroadcastVariable
(
PythonOperationInfo
info
)
throws
IOException
{
UdfOperator
op1
=
(
UdfOperator
)
sets
.
get
(
info
.
parentID
);
DataSet
op2
=
(
DataSet
)
sets
.
get
(
info
.
otherID
);
UdfOperator
<?>
op1
=
(
UdfOperator
)
sets
.
get
(
info
.
parentID
);
DataSet
<?>
op2
=
(
DataSet
)
sets
.
get
(
info
.
otherID
);
op1
.
withBroadcastSet
(
op2
,
info
.
name
);
Configuration
c
=
((
UdfOperator
)
op1
)
.
getParameters
();
Configuration
c
=
op1
.
getParameters
();
if
(
c
==
null
)
{
c
=
new
Configuration
();
...
...
@@ -460,6 +464,7 @@ public class PythonPlanBinder {
sets
.
put
(
info
.
setID
,
ao
.
setParallelism
(
getParallelism
(
info
)).
name
(
"Aggregation"
));
}
@SuppressWarnings
(
"unchecked"
)
private
void
createDistinctOperation
(
PythonOperationInfo
info
)
throws
IOException
{
DataSet
op
=
(
DataSet
)
sets
.
get
(
info
.
parentID
);
sets
.
put
(
info
.
setID
,
op
.
distinct
(
info
.
keys
).
setParallelism
(
getParallelism
(
info
)).
name
(
"Distinct"
)
...
...
@@ -476,6 +481,7 @@ public class PythonPlanBinder {
sets
.
put
(
info
.
setID
,
op1
.
groupBy
(
info
.
keys
));
}
@SuppressWarnings
(
"unchecked"
)
private
void
createHashPartitionOperation
(
PythonOperationInfo
info
)
throws
IOException
{
DataSet
op1
=
(
DataSet
)
sets
.
get
(
info
.
parentID
);
sets
.
put
(
info
.
setID
,
op1
.
partitionByHash
(
info
.
keys
).
setParallelism
(
getParallelism
(
info
))
...
...
@@ -498,12 +504,14 @@ public class PythonPlanBinder {
}
}
@SuppressWarnings
(
"unchecked"
)
private
void
createUnionOperation
(
PythonOperationInfo
info
)
throws
IOException
{
DataSet
op1
=
(
DataSet
)
sets
.
get
(
info
.
parentID
);
DataSet
op2
=
(
DataSet
)
sets
.
get
(
info
.
otherID
);
sets
.
put
(
info
.
setID
,
op1
.
union
(
op2
).
setParallelism
(
getParallelism
(
info
)).
name
(
"Union"
));
}
@SuppressWarnings
(
"unchecked"
)
private
void
createCoGroupOperation
(
PythonOperationInfo
info
)
{
DataSet
op1
=
(
DataSet
)
sets
.
get
(
info
.
parentID
);
DataSet
op2
=
(
DataSet
)
sets
.
get
(
info
.
otherID
);
...
...
@@ -513,6 +521,7 @@ public class PythonPlanBinder {
sets
.
put
(
info
.
setID
,
new
CoGroupRawOperator
(
op1
,
op2
,
key1
,
key2
,
pcg
,
info
.
types
,
info
.
name
).
setParallelism
(
getParallelism
(
info
)));
}
@SuppressWarnings
(
"unchecked"
)
private
void
createCrossOperation
(
DatasizeHint
mode
,
PythonOperationInfo
info
)
{
DataSet
op1
=
(
DataSet
)
sets
.
get
(
info
.
parentID
);
DataSet
op2
=
(
DataSet
)
sets
.
get
(
info
.
otherID
);
...
...
@@ -540,11 +549,13 @@ public class PythonPlanBinder {
}
}
@SuppressWarnings
(
"unchecked"
)
private
void
createFilterOperation
(
PythonOperationInfo
info
)
{
DataSet
op1
=
(
DataSet
)
sets
.
get
(
info
.
parentID
);
sets
.
put
(
info
.
setID
,
op1
.
mapPartition
(
new
PythonMapPartition
(
info
.
setID
,
info
.
types
)).
setParallelism
(
getParallelism
(
info
)).
name
(
info
.
name
));
}
@SuppressWarnings
(
"unchecked"
)
private
void
createFlatMapOperation
(
PythonOperationInfo
info
)
{
DataSet
op1
=
(
DataSet
)
sets
.
get
(
info
.
parentID
);
sets
.
put
(
info
.
setID
,
op1
.
mapPartition
(
new
PythonMapPartition
(
info
.
setID
,
info
.
types
)).
setParallelism
(
getParallelism
(
info
)).
name
(
info
.
name
));
...
...
@@ -565,21 +576,25 @@ public class PythonPlanBinder {
}
}
@SuppressWarnings
(
"unchecked"
)
private
DataSet
applyGroupReduceOperation
(
DataSet
op1
,
PythonOperationInfo
info
)
{
return
op1
.
reduceGroup
(
new
IdentityGroupReduce
()).
setCombinable
(
false
).
name
(
"PythonGroupReducePreStep"
).
setParallelism
(
getParallelism
(
info
))
.
mapPartition
(
new
PythonMapPartition
(
info
.
setID
,
info
.
types
)).
setParallelism
(
getParallelism
(
info
)).
name
(
info
.
name
);
}
@SuppressWarnings
(
"unchecked"
)
private
DataSet
applyGroupReduceOperation
(
UnsortedGrouping
op1
,
PythonOperationInfo
info
)
{
return
op1
.
reduceGroup
(
new
IdentityGroupReduce
()).
setCombinable
(
false
).
setParallelism
(
getParallelism
(
info
)).
name
(
"PythonGroupReducePreStep"
)
.
mapPartition
(
new
PythonMapPartition
(
info
.
setID
,
info
.
types
)).
setParallelism
(
getParallelism
(
info
)).
name
(
info
.
name
);
}
@SuppressWarnings
(
"unchecked"
)
private
DataSet
applyGroupReduceOperation
(
SortedGrouping
op1
,
PythonOperationInfo
info
)
{
return
op1
.
reduceGroup
(
new
IdentityGroupReduce
()).
setCombinable
(
false
).
setParallelism
(
getParallelism
(
info
)).
name
(
"PythonGroupReducePreStep"
)
.
mapPartition
(
new
PythonMapPartition
(
info
.
setID
,
info
.
types
)).
setParallelism
(
getParallelism
(
info
)).
name
(
info
.
name
);
}
@SuppressWarnings
(
"unchecked"
)
private
void
createJoinOperation
(
DatasizeHint
mode
,
PythonOperationInfo
info
)
{
DataSet
op1
=
(
DataSet
)
sets
.
get
(
info
.
parentID
);
DataSet
op2
=
(
DataSet
)
sets
.
get
(
info
.
otherID
);
...
...
@@ -592,6 +607,7 @@ public class PythonPlanBinder {
}
}
@SuppressWarnings
(
"unchecked"
)
private
DataSet
createDefaultJoin
(
DataSet
op1
,
DataSet
op2
,
String
[]
firstKeys
,
String
[]
secondKeys
,
DatasizeHint
mode
,
int
parallelism
)
{
switch
(
mode
)
{
case
NONE:
...
...
@@ -608,11 +624,13 @@ public class PythonPlanBinder {
}
}
@SuppressWarnings
(
"unchecked"
)
private
void
createMapOperation
(
PythonOperationInfo
info
)
{
DataSet
op1
=
(
DataSet
)
sets
.
get
(
info
.
parentID
);
sets
.
put
(
info
.
setID
,
op1
.
mapPartition
(
new
PythonMapPartition
(
info
.
setID
,
info
.
types
)).
setParallelism
(
getParallelism
(
info
)).
name
(
info
.
name
));
}
@SuppressWarnings
(
"unchecked"
)
private
void
createMapPartitionOperation
(
PythonOperationInfo
info
)
{
DataSet
op1
=
(
DataSet
)
sets
.
get
(
info
.
parentID
);
sets
.
put
(
info
.
setID
,
op1
.
mapPartition
(
new
PythonMapPartition
(
info
.
setID
,
info
.
types
)).
setParallelism
(
getParallelism
(
info
)).
name
(
info
.
name
));
...
...
@@ -629,11 +647,13 @@ public class PythonPlanBinder {
}
}
@SuppressWarnings
(
"unchecked"
)
private
DataSet
applyReduceOperation
(
DataSet
op1
,
PythonOperationInfo
info
)
{
return
op1
.
reduceGroup
(
new
IdentityGroupReduce
()).
setCombinable
(
false
).
setParallelism
(
getParallelism
(
info
)).
name
(
"PythonReducePreStep"
)
.
mapPartition
(
new
PythonMapPartition
(
info
.
setID
,
info
.
types
)).
setParallelism
(
getParallelism
(
info
)).
name
(
info
.
name
);
}
@SuppressWarnings
(
"unchecked"
)
private
DataSet
applyReduceOperation
(
UnsortedGrouping
op1
,
PythonOperationInfo
info
)
{
return
op1
.
reduceGroup
(
new
IdentityGroupReduce
()).
setCombinable
(
false
).
setParallelism
(
getParallelism
(
info
)).
name
(
"PythonReducePreStep"
)
.
mapPartition
(
new
PythonMapPartition
(
info
.
setID
,
info
.
types
)).
setParallelism
(
getParallelism
(
info
)).
name
(
info
.
name
);
...
...
flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/NestedKeyDiscarder.java
浏览文件 @
945fc023
...
...
@@ -23,6 +23,7 @@ Utility function to extract values from 2 Key-Value Tuples after a DefaultJoin.
@ForwardedFields
(
"f0.f1->f0; f1.f1->f1"
)
public
class
NestedKeyDiscarder
<
IN
>
implements
MapFunction
<
IN
,
Tuple2
<
byte
[],
byte
[]>>
{
@Override
@SuppressWarnings
(
"unchecked"
)
public
Tuple2
<
byte
[],
byte
[]>
map
(
IN
value
)
throws
Exception
{
Tuple2
<
Tuple2
<
Tuple
,
byte
[]>,
Tuple2
<
Tuple
,
byte
[]>>
x
=
(
Tuple2
<
Tuple2
<
Tuple
,
byte
[]>,
Tuple2
<
Tuple
,
byte
[]>>)
value
;
return
new
Tuple2
<>(
x
.
f0
.
f1
,
x
.
f1
.
f1
);
...
...
flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java
浏览文件 @
945fc023
...
...
@@ -20,9 +20,10 @@ import org.apache.flink.python.api.streaming.util.SerializationUtils.Serializer;
Utility function to serialize values, usually directly from data sources.
*/
public
class
SerializerMap
<
IN
>
implements
MapFunction
<
IN
,
byte
[]>
{
private
Serializer
serializer
=
null
;
private
Serializer
<
IN
>
serializer
=
null
;
@Override
@SuppressWarnings
(
"unchecked"
)
public
byte
[]
map
(
IN
value
)
throws
Exception
{
if
(
serializer
==
null
)
{
serializer
=
SerializationUtils
.
getSerializer
(
value
);
...
...
flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
浏览文件 @
945fc023
...
...
@@ -124,7 +124,7 @@ public class PythonReceiver implements Serializable {
}
byte
[]
value
=
new
byte
[
fileBuffer
.
getInt
()];
fileBuffer
.
get
(
value
);
return
new
Tuple2
(
keys
,
value
);
return
new
Tuple2
<>
(
keys
,
value
);
}
}
...
...
flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
浏览文件 @
945fc023
...
...
@@ -95,6 +95,7 @@ public class PythonSender<IN> implements Serializable {
* @return size of the written buffer
* @throws IOException
*/
@SuppressWarnings
(
"unchecked"
)
public
int
sendRecord
(
Object
value
)
throws
IOException
{
fileBuffer
.
clear
();
int
group
=
0
;
...
...
@@ -126,6 +127,7 @@ public class PythonSender<IN> implements Serializable {
* @return size of the written buffer
* @throws IOException
*/
@SuppressWarnings
(
"unchecked"
)
public
int
sendBuffer
(
Iterator
i
,
int
group
)
throws
IOException
{
fileBuffer
.
clear
();
...
...
flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
浏览文件 @
945fc023
...
...
@@ -28,8 +28,9 @@ public class PythonPlanSender implements Serializable {
this
.
output
=
new
DataOutputStream
(
output
);
}
@SuppressWarnings
(
"unchecked"
)
public
void
sendRecord
(
Object
record
)
throws
IOException
{
byte
[]
data
=
SerializationUtils
.
getSerializer
(
record
).
serialize
(
record
);
output
.
write
(
data
);
}
}
\ No newline at end of file
}
flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
浏览文件 @
945fc023
...
...
@@ -244,9 +244,10 @@ public class SerializationUtils {
}
}
@SuppressWarnings
(
"unchecked"
)
@Override
public
byte
[]
serializeWithoutTypeInfo
(
Tuple
value
)
{
ArrayList
<
byte
[]>
bits
=
new
ArrayList
();
ArrayList
<
byte
[]>
bits
=
new
ArrayList
<>
();
int
totalSize
=
0
;
for
(
int
x
=
0
;
x
<
serializer
.
length
;
x
++)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录