Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
e26d90fc
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
e26d90fc
编写于
11月 10, 2018
作者:
S
Shuyi Chen
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success
This closes #7078
上级
c8675b84
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
19 addition
and
1 deletion
+19
-1
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
...sts/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+2
-0
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
.../java/org/apache/flink/yarn/YarnFlinkResourceManager.java
+2
-0
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
.../main/java/org/apache/flink/yarn/YarnResourceManager.java
+2
-1
flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
...a/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
+9
-0
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
...t/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+4
-0
未找到文件。
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
浏览文件 @
e26d90fc
...
...
@@ -165,6 +165,8 @@ public abstract class YarnTestBase extends TestLogger {
YARN_CONFIGURATION
.
setInt
(
YarnConfiguration
.
DEBUG_NM_DELETE_DELAY_SEC
,
3600
);
YARN_CONFIGURATION
.
setBoolean
(
YarnConfiguration
.
LOG_AGGREGATION_ENABLED
,
false
);
YARN_CONFIGURATION
.
setInt
(
YarnConfiguration
.
NM_VCORES
,
666
);
// memory is overwritten in the MiniYARNCluster.
YARN_CONFIGURATION
.
set
(
"yarn.scheduler.capacity.resource-calculator"
,
"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
);
// so we have to change the number of cores for testing.
YARN_CONFIGURATION
.
setInt
(
YarnConfiguration
.
RM_AM_EXPIRY_INTERVAL_MS
,
20000
);
// 20 seconds expiry (to ensure we properly heartbeat with YARN).
}
...
...
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
浏览文件 @
e26d90fc
...
...
@@ -438,6 +438,8 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
numPendingContainerRequests
=
Math
.
max
(
0
,
numPendingContainerRequests
-
1
);
LOG
.
info
(
"Received new container: {} - Remaining pending container requests: {}"
,
container
.
getId
(),
numPendingContainerRequests
);
resourceManagerClient
.
removeContainerRequest
(
new
AMRMClient
.
ContainerRequest
(
container
.
getResource
(),
null
,
null
,
container
.
getPriority
()));
// decide whether to return the container, or whether to start a TaskManager
if
(
numRegistered
+
containersInLaunch
.
size
()
<
numRequired
)
{
...
...
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
浏览文件 @
e26d90fc
...
...
@@ -361,7 +361,8 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
"Received new container: {} - Remaining pending container requests: {}"
,
container
.
getId
(),
numPendingContainerRequests
);
resourceManagerClient
.
removeContainerRequest
(
new
AMRMClient
.
ContainerRequest
(
container
.
getResource
(),
null
,
null
,
container
.
getPriority
()));
if
(
numPendingContainerRequests
>
0
)
{
numPendingContainerRequests
--;
...
...
flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
浏览文件 @
e26d90fc
...
...
@@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.Container;
import
org.apache.hadoop.yarn.api.records.ContainerId
;
import
org.apache.hadoop.yarn.api.records.ContainerLaunchContext
;
import
org.apache.hadoop.yarn.api.records.NodeId
;
import
org.apache.hadoop.yarn.api.records.Priority
;
import
org.apache.hadoop.yarn.api.records.Resource
;
import
org.apache.hadoop.yarn.client.api.AMRMClient
;
import
org.apache.hadoop.yarn.client.api.NMClient
;
import
org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
;
...
...
@@ -69,8 +71,11 @@ import scala.concurrent.duration.Deadline;
import
scala.concurrent.duration.FiniteDuration
;
import
static
org
.
junit
.
Assert
.
assertEquals
;
import
static
org
.
mockito
.
Mockito
.
any
;
import
static
org
.
mockito
.
Mockito
.
doAnswer
;
import
static
org
.
mockito
.
Mockito
.
mock
;
import
static
org
.
mockito
.
Mockito
.
times
;
import
static
org
.
mockito
.
Mockito
.
verify
;
import
static
org
.
mockito
.
Mockito
.
when
;
/**
...
...
@@ -125,6 +130,8 @@ public class YarnFlinkResourceManagerTest extends TestLogger {
1
),
i
));
when
(
mockContainer
.
getNodeId
()).
thenReturn
(
NodeId
.
newInstance
(
"container"
,
1234
));
when
(
mockContainer
.
getResource
()).
thenReturn
(
Resource
.
newInstance
(
200
,
1
));
when
(
mockContainer
.
getPriority
()).
thenReturn
(
Priority
.
UNDEFINED
);
containerList
.
add
(
mockContainer
);
}
...
...
@@ -233,6 +240,8 @@ public class YarnFlinkResourceManagerTest extends TestLogger {
int
numberOfRegisteredResources
=
(
Integer
)
Await
.
result
(
numberOfRegisteredResourcesFuture
,
deadline
.
timeLeft
());
verify
(
resourceManagerClient
,
times
(
numInitialTaskManagers
)).
removeContainerRequest
(
any
(
AMRMClient
.
ContainerRequest
.
class
));
assertEquals
(
numInitialTaskManagers
,
numberOfRegisteredResources
);
}
finally
{
if
(
resourceManager
!=
null
)
{
...
...
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
浏览文件 @
e26d90fc
...
...
@@ -401,6 +401,8 @@ public class YarnResourceManagerTest extends TestLogger {
resourceManager
.
onContainersAllocated
(
ImmutableList
.
of
(
testingContainer
));
verify
(
mockResourceManagerClient
).
addContainerRequest
(
any
(
AMRMClient
.
ContainerRequest
.
class
));
verify
(
mockResourceManagerClient
).
removeContainerRequest
(
any
(
AMRMClient
.
ContainerRequest
.
class
));
verify
(
mockNMClient
).
startContainer
(
eq
(
testingContainer
),
any
(
ContainerLaunchContext
.
class
));
// Remote task executor registers with YarnResourceManager.
...
...
@@ -496,6 +498,8 @@ public class YarnResourceManagerTest extends TestLogger {
resourceManager
.
onContainersAllocated
(
ImmutableList
.
of
(
testingContainer
));
verify
(
mockResourceManagerClient
).
addContainerRequest
(
any
(
AMRMClient
.
ContainerRequest
.
class
));
verify
(
mockResourceManagerClient
).
removeContainerRequest
(
any
(
AMRMClient
.
ContainerRequest
.
class
));
verify
(
mockNMClient
).
startContainer
(
eq
(
testingContainer
),
any
(
ContainerLaunchContext
.
class
));
// Callback from YARN when container is Completed, pending request can not be fulfilled by pending
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录