From e26d90fc86b266978b4bac84fe02ca34b62983fe Mon Sep 17 00:00:00 2001 From: Shuyi Chen Date: Sat, 10 Nov 2018 00:42:49 -0800 Subject: [PATCH] [FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success This closes #7078 --- .../test/java/org/apache/flink/yarn/YarnTestBase.java | 2 ++ .../org/apache/flink/yarn/YarnFlinkResourceManager.java | 2 ++ .../java/org/apache/flink/yarn/YarnResourceManager.java | 3 ++- .../apache/flink/yarn/YarnFlinkResourceManagerTest.java | 9 +++++++++ .../org/apache/flink/yarn/YarnResourceManagerTest.java | 4 ++++ 5 files changed, 19 insertions(+), 1 deletion(-) diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index 3763f6592af..f1e6a3a767c 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -165,6 +165,8 @@ public abstract class YarnTestBase extends TestLogger { YARN_CONFIGURATION.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600); YARN_CONFIGURATION.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false); YARN_CONFIGURATION.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster. + YARN_CONFIGURATION.set("yarn.scheduler.capacity.resource-calculator", + "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"); // so we have to change the number of cores for testing. YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000); // 20 seconds expiry (to ensure we properly heartbeat with YARN). } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java index 8e686bbbe34..3327505e32d 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java @@ -438,6 +438,8 @@ public class YarnFlinkResourceManager extends FlinkResourceManager impleme "Received new container: {} - Remaining pending container requests: {}", container.getId(), numPendingContainerRequests); - + resourceManagerClient.removeContainerRequest(new AMRMClient.ContainerRequest( + container.getResource(), null, null, container.getPriority())); if (numPendingContainerRequests > 0) { numPendingContainerRequests--; diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java index 10b2ce97d6f..d665df6bc7c 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java @@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; @@ -69,8 +71,11 @@ import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** @@ -125,6 +130,8 @@ public class YarnFlinkResourceManagerTest extends TestLogger { 1), i)); when(mockContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234)); + when(mockContainer.getResource()).thenReturn(Resource.newInstance(200, 1)); + when(mockContainer.getPriority()).thenReturn(Priority.UNDEFINED); containerList.add(mockContainer); } @@ -233,6 +240,8 @@ public class YarnFlinkResourceManagerTest extends TestLogger { int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft()); + verify(resourceManagerClient, times(numInitialTaskManagers)).removeContainerRequest( + any(AMRMClient.ContainerRequest.class)); assertEquals(numInitialTaskManagers, numberOfRegisteredResources); } finally { if (resourceManager != null) { diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index d41d42d7a05..ee325dad0f7 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -401,6 +401,8 @@ public class YarnResourceManagerTest extends TestLogger { resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + verify(mockResourceManagerClient).removeContainerRequest( + any(AMRMClient.ContainerRequest.class)); verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); // Remote task executor registers with YarnResourceManager. @@ -496,6 +498,8 @@ public class YarnResourceManagerTest extends TestLogger { resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + verify(mockResourceManagerClient).removeContainerRequest( + any(AMRMClient.ContainerRequest.class)); verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); // Callback from YARN when container is Completed, pending request can not be fulfilled by pending -- GitLab