Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
62c5a3c0
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
62c5a3c0
编写于
6月 20, 2016
作者:
A
Aljoscha Krettek
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[FLINK-3714] Rename getCleanupTimeForWindow to cleanupTime in WindowOperator
上级
0104a926
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
7 addition
and
7 deletion
+7
-7
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
...streaming/runtime/operators/windowing/WindowOperator.java
+7
-7
未找到文件。
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
浏览文件 @
62c5a3c0
...
...
@@ -323,7 +323,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
});
//
check if the window is already inactiv
e
//
drop if the window is already lat
e
if
(
isLate
(
actualWindow
))
{
LOG
.
info
(
"Dropped element "
+
element
+
" for window "
+
actualWindow
+
" due to lateness."
);
continue
;
...
...
@@ -352,7 +352,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
else
{
for
(
W
window:
elementWindows
)
{
//
check if the window is already inactiv
e
//
drop if the window is already lat
e
if
(
isLate
(
window
))
{
LOG
.
info
(
"Dropped element "
+
element
+
" for window "
+
window
+
" due to lateness."
);
continue
;
...
...
@@ -528,7 +528,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
* considered when triggering.
*/
protected
boolean
isLate
(
W
window
)
{
return
(
windowAssigner
.
isEventTime
()
&&
(
getCleanupTimeForWindow
(
window
)
<=
currentWatermark
));
return
(
windowAssigner
.
isEventTime
()
&&
(
cleanupTime
(
window
)
<=
currentWatermark
));
}
/**
...
...
@@ -537,7 +537,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
* the window whose state to discard
*/
protected
void
registerCleanupTimer
(
W
window
)
{
long
cleanupTime
=
getCleanupTimeForWindow
(
window
);
long
cleanupTime
=
cleanupTime
(
window
);
if
(
windowAssigner
.
isEventTime
())
{
context
.
registerEventTimeTimer
(
cleanupTime
);
}
else
{
...
...
@@ -551,7 +551,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
* the window whose state to discard
*/
protected
void
deleteCleanupTimer
(
W
window
)
{
long
cleanupTime
=
getCleanupTimeForWindow
(
window
);
long
cleanupTime
=
cleanupTime
(
window
);
if
(
windowAssigner
.
isEventTime
())
{
context
.
deleteEventTimeTimer
(
cleanupTime
);
}
else
{
...
...
@@ -568,7 +568,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
*
* @param window the window whose cleanup time we are computing.
*/
private
long
getCleanupTimeForWindow
(
W
window
)
{
private
long
cleanupTime
(
W
window
)
{
long
cleanupTime
=
window
.
maxTimestamp
()
+
allowedLateness
;
return
cleanupTime
>=
window
.
maxTimestamp
()
?
cleanupTime
:
Long
.
MAX_VALUE
;
}
...
...
@@ -585,7 +585,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
* @return {@code true} if it is time to clean up the window state, {@code false} otherwise.
*/
protected
final
boolean
isCleanupTime
(
W
window
,
long
time
)
{
long
cleanupTime
=
getCleanupTimeForWindow
(
window
);
long
cleanupTime
=
cleanupTime
(
window
);
return
cleanupTime
==
time
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录