Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
e1c059d3
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
e1c059d3
编写于
7月 14, 2014
作者:
G
gaborhermann
提交者:
Stephan Ewen
8月 18, 2014
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[streaming] Shorter log messages
上级
8951d921
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
19 addition
and
20 deletion
+19
-20
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultToleranceBuffer.java
...a/eu/stratosphere/streaming/api/FaultToleranceBuffer.java
+0
-1
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
...n/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
+12
-12
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java
.../streaming/api/streamcomponent/StreamComponentHelper.java
+1
-1
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSink.java
...tratosphere/streaming/api/streamcomponent/StreamSink.java
+2
-2
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSource.java
...atosphere/streaming/api/streamcomponent/StreamSource.java
+1
-1
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamTask.java
...tratosphere/streaming/api/streamcomponent/StreamTask.java
+3
-3
未找到文件。
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultToleranceBuffer.java
浏览文件 @
e1c059d3
...
...
@@ -196,7 +196,6 @@ public class FaultToleranceBuffer {
*/
public
void
failRecord
(
String
recordID
)
{
// Create new id to avoid double counting acks
log
.
warn
(
"Fail ID: "
+
recordID
);
StreamRecord
newRecord
=
removeRecord
(
recordID
).
setId
(
channelID
);
addRecord
(
newRecord
);
reEmit
(
newRecord
);
...
...
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
浏览文件 @
e1c059d3
...
...
@@ -90,7 +90,7 @@ public class JobGraphBuilder {
config
.
setString
(
"componentName"
,
sourceName
);
components
.
put
(
sourceName
,
source
);
numberOfInstances
.
put
(
sourceName
,
1
);
log
.
debug
(
"S
ource set
: "
+
sourceName
);
log
.
debug
(
"S
OURCE
: "
+
sourceName
);
}
/**
...
...
@@ -116,7 +116,7 @@ public class JobGraphBuilder {
config
.
setString
(
"componentName"
,
sourceName
);
components
.
put
(
sourceName
,
source
);
numberOfInstances
.
put
(
sourceName
,
1
);
log
.
debug
(
"S
ource set
: "
+
sourceName
);
log
.
debug
(
"S
OURCE
: "
+
sourceName
);
}
/**
...
...
@@ -138,7 +138,7 @@ public class JobGraphBuilder {
config
.
setString
(
"componentName"
,
taskName
);
components
.
put
(
taskName
,
task
);
numberOfInstances
.
put
(
taskName
,
1
);
log
.
debug
(
"T
ask set
: "
+
taskName
);
log
.
debug
(
"T
ASK
: "
+
taskName
);
}
/**
...
...
@@ -164,7 +164,7 @@ public class JobGraphBuilder {
config
.
setString
(
"componentName"
,
taskName
);
components
.
put
(
taskName
,
task
);
numberOfInstances
.
put
(
taskName
,
parallelism
);
log
.
debug
(
"T
ask set
: "
+
taskName
);
log
.
debug
(
"T
ASK
: "
+
taskName
);
}
/**
...
...
@@ -186,7 +186,7 @@ public class JobGraphBuilder {
config
.
setString
(
"componentName"
,
sinkName
);
components
.
put
(
sinkName
,
sink
);
numberOfInstances
.
put
(
sinkName
,
1
);
log
.
debug
(
"S
ink set
: "
+
sinkName
);
log
.
debug
(
"S
INK
: "
+
sinkName
);
}
/**
...
...
@@ -212,7 +212,7 @@ public class JobGraphBuilder {
config
.
setString
(
"componentName"
,
sinkName
);
components
.
put
(
sinkName
,
sink
);
numberOfInstances
.
put
(
sinkName
,
1
);
log
.
debug
(
"
Sink set
: "
+
sinkName
);
log
.
debug
(
"
TASK
: "
+
sinkName
);
}
/**
...
...
@@ -247,14 +247,14 @@ public class JobGraphBuilder {
"partitionerClass_"
+
upStreamComponent
.
getNumberOfForwardConnections
(),
PartitionerClass
);
log
.
debug
(
"C
omponents connected with
"
+
PartitionerClass
.
getSimpleName
()
+
"
:
"
+
upStreamComponentName
+
"
to
"
+
downStreamComponentName
);
log
.
debug
(
"C
ONNECTED:
"
+
PartitionerClass
.
getSimpleName
()
+
"
-
"
+
upStreamComponentName
+
"
->
"
+
downStreamComponentName
);
}
catch
(
JobGraphDefinitionException
e
)
{
log
.
error
(
"Cannot connect components with "
+
PartitionerClass
.
getSimpleName
()
+
" : "
+
upStreamComponentName
+
"
to
"
+
upStreamComponentName
+
"
->
"
+
downStreamComponentName
,
e
);
}
}
...
...
@@ -338,8 +338,8 @@ public class JobGraphBuilder {
keyPosition
);
addOutputChannels
(
upStreamComponentName
);
log
.
debug
(
"C
omponents connected by field:
"
+
upStreamComponentName
+
"
to "
+
downStreamComponentName
+
" by key position
"
log
.
debug
(
"C
ONNECTED: FIELD PARTITIONING -
"
+
upStreamComponentName
+
"
-> "
+
downStreamComponentName
+
", KEY:
"
+
keyPosition
);
}
catch
(
JobGraphDefinitionException
e
)
{
log
.
error
(
"Cannot connect components by field: "
...
...
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java
浏览文件 @
e1c059d3
...
...
@@ -180,7 +180,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
else
{
partitioners
.
add
(
partitioner
.
newInstance
());
}
log
.
debug
(
"Partitioner set: "
+
partitioner
.
getSimpleName
()
+
" with "
log
.
trace
(
"Partitioner set: "
+
partitioner
.
getSimpleName
()
+
" with "
+
nrOutput
+
" outputs"
);
}
catch
(
Exception
e
)
{
log
.
error
(
...
...
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSink.java
浏览文件 @
e1c059d3
...
...
@@ -60,7 +60,7 @@ public class StreamSink extends AbstractOutputTask {
@Override
public
void
invoke
()
throws
Exception
{
log
.
debug
(
"S
ink
"
+
name
+
" invoked"
);
log
.
debug
(
"S
INK
"
+
name
+
" invoked"
);
boolean
hasInput
=
true
;
while
(
hasInput
)
{
hasInput
=
false
;
...
...
@@ -82,6 +82,6 @@ public class StreamSink extends AbstractOutputTask {
}
}
System
.
out
.
println
(
"Result: "
+
userFunction
.
getResult
());
log
.
debug
(
"S
ink
"
+
name
+
" invoke finished"
);
log
.
debug
(
"S
INK
"
+
name
+
" invoke finished"
);
}
}
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamSource.java
浏览文件 @
e1c059d3
...
...
@@ -84,7 +84,7 @@ public class StreamSource extends AbstractInputTask<RandIS> {
@Override
public
void
invoke
()
throws
Exception
{
log
.
debug
(
"S
ource
"
+
name
+
" invoked with instance id "
+
sourceInstanceID
);
log
.
debug
(
"S
OURCE
"
+
name
+
" invoked with instance id "
+
sourceInstanceID
);
userFunction
.
invoke
();
}
...
...
flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamTask.java
浏览文件 @
e1c059d3
...
...
@@ -81,7 +81,7 @@ public class StreamTask extends AbstractTask {
@Override
public
void
invoke
()
throws
Exception
{
log
.
debug
(
"T
ask
"
+
name
+
" invoked with instance id "
+
taskInstanceID
);
log
.
debug
(
"T
ASK
"
+
name
+
" invoked with instance id "
+
taskInstanceID
);
boolean
hasInput
=
true
;
while
(
hasInput
)
{
...
...
@@ -98,12 +98,12 @@ public class StreamTask extends AbstractTask {
log
.
debug
(
"ACK: "
+
id
+
" -- "
+
name
);
}
catch
(
Exception
e
)
{
streamTaskHelper
.
threadSafePublish
(
new
FailEvent
(
id
),
input
);
log
.
warn
(
"
INVOKE
FAILED: "
+
id
+
" -- "
+
name
+
" -- due to "
+
e
.
getMessage
());
log
.
warn
(
"FAILED: "
+
id
+
" -- "
+
name
+
" -- due to "
+
e
.
getMessage
());
}
}
}
}
log
.
debug
(
"T
ask
"
+
name
+
"invoke finished with instance id "
+
taskInstanceID
);
log
.
debug
(
"T
ASK
"
+
name
+
"invoke finished with instance id "
+
taskInstanceID
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录