Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
680c2c3e
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
680c2c3e
编写于
2月 03, 2016
作者:
U
Ufuk Celebi
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[hotfix, yarn] Exit JVM after YARN actor system shut down
This closes #1582, #1576.
上级
35ec26cd
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
97 addition
and
1 deletion
+97
-1
flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
...src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+6
-0
flink-yarn/src/main/scala/org/apache/flink/yarn/YarnProcessShutDownThread.java
...cala/org/apache/flink/yarn/YarnProcessShutDownThread.java
+82
-0
flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
...rc/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+9
-1
未找到文件。
flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
浏览文件 @
680c2c3e
...
...
@@ -183,6 +183,12 @@ class YarnJobManager(
// Shutdown and discard all queued messages
context
.
system
.
shutdown
()
// Await actor system termination and shut down JVM
new
YarnProcessShutDownThread
(
log
.
logger
,
context
.
system
,
FiniteDuration
(
10
,
SECONDS
)).
start
()
case
RegisterApplicationClient
=>
val
client
=
sender
()
...
...
flink-yarn/src/main/scala/org/apache/flink/yarn/YarnProcessShutDownThread.java
0 → 100644
浏览文件 @
680c2c3e
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.yarn
;
import
akka.actor.ActorSystem
;
import
org.slf4j.Logger
;
import
scala.concurrent.duration.Duration
;
import
java.util.concurrent.TimeoutException
;
import
static
com
.
google
.
common
.
base
.
Preconditions
.
checkNotNull
;
/**
* JVM shut down thread awaiting actor system shut down for a certain amount
* of time before exiting the JVM.
*
* <p>On some Linux distributions, YARN is not able to stop containers, because
* the <code>kill</code> command has different arguments. For example when
* running Flink on GCE ("Debian GNU/Linux 7.9 (wheezy)"), YARN containers will
* not properly shut down when we don't call <code>System.exit()</code>.
*/
class
YarnProcessShutDownThread
extends
Thread
{
/** Log of the corresponding YARN process. */
private
final
Logger
log
;
/** Actor system to await termination of. */
private
final
ActorSystem
actorSystem
;
/** Actor system termination timeout before shutting down the JVM. */
private
final
Duration
terminationTimeout
;
/**
* Creates a shut down thread.
*
* @param log Log of the corresponding YARN process.
* @param actorSystem Actor system to await termination of.
* @param terminationTimeout Actor system termination timeout before
* shutting down the JVM.
*/
public
YarnProcessShutDownThread
(
Logger
log
,
ActorSystem
actorSystem
,
Duration
terminationTimeout
)
{
this
.
log
=
checkNotNull
(
log
,
"Logger"
);
this
.
actorSystem
=
checkNotNull
(
actorSystem
,
"Actor system"
);
this
.
terminationTimeout
=
checkNotNull
(
terminationTimeout
,
"Termination timeout"
);
}
@Override
public
void
run
()
{
try
{
actorSystem
.
awaitTermination
(
terminationTimeout
);
}
catch
(
Exception
e
)
{
if
(
e
instanceof
TimeoutException
)
{
log
.
error
(
"Actor system shut down timed out."
,
e
);
}
else
{
log
.
error
(
"Failure during actor system shut down."
,
e
);
}
}
finally
{
log
.
info
(
"Shutdown completed. Stopping JVM."
);
System
.
exit
(
0
);
}
}
}
flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
浏览文件 @
680c2c3e
...
...
@@ -23,9 +23,11 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager
import
org.apache.flink.runtime.io.network.NetworkEnvironment
import
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
import
org.apache.flink.runtime.memory.MemoryManager
import
org.apache.flink.runtime.taskmanager.
{
NetworkEnvironmentConfiguration
,
TaskManagerConfiguration
,
TaskManager
}
import
org.apache.flink.runtime.taskmanager.
{
TaskManagerConfiguration
,
TaskManager
}
import
org.apache.flink.yarn.YarnMessages.StopYarnSession
import
scala.concurrent.duration._
/** An extension of the TaskManager that listens for additional YARN related
* messages.
*/
...
...
@@ -55,5 +57,11 @@ class YarnTaskManager(
log
.
info
(
s
"Stopping YARN TaskManager with final application status $status "
+
s
"and diagnostics: $diagnostics"
)
context
.
system
.
shutdown
()
// Await actor system termination and shut down JVM
new
YarnProcessShutDownThread
(
log
.
logger
,
context
.
system
,
FiniteDuration
(
10
,
SECONDS
)).
start
()
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录