diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index 3763f6592afeee36866d92eade3aa9bd5fa56489..f1e6a3a767cd929b872b16315cecc36b4c3bead4 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -165,6 +165,8 @@ public abstract class YarnTestBase extends TestLogger { YARN_CONFIGURATION.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600); YARN_CONFIGURATION.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false); YARN_CONFIGURATION.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster. + YARN_CONFIGURATION.set("yarn.scheduler.capacity.resource-calculator", + "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"); // so we have to change the number of cores for testing. YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000); // 20 seconds expiry (to ensure we properly heartbeat with YARN). } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java index 8e686bbbe34c1035074251019f7dc2e503dd66c5..3327505e32df56d2f90a404d8fa83365ae13b63b 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java @@ -438,6 +438,8 @@ public class YarnFlinkResourceManager extends FlinkResourceManager impleme "Received new container: {} - Remaining pending container requests: {}", container.getId(), numPendingContainerRequests); - + resourceManagerClient.removeContainerRequest(new AMRMClient.ContainerRequest( + container.getResource(), null, null, container.getPriority())); if (numPendingContainerRequests > 0) { numPendingContainerRequests--; diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java index 10b2ce97d6fe482614fd60d409d079b2a58671b6..d665df6bc7ce071d3da61242120021334e62ccd6 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java @@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; @@ -69,8 +71,11 @@ import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** @@ -125,6 +130,8 @@ public class YarnFlinkResourceManagerTest extends TestLogger { 1), i)); when(mockContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234)); + when(mockContainer.getResource()).thenReturn(Resource.newInstance(200, 1)); + when(mockContainer.getPriority()).thenReturn(Priority.UNDEFINED); containerList.add(mockContainer); } @@ -233,6 +240,8 @@ public class YarnFlinkResourceManagerTest extends TestLogger { int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft()); + verify(resourceManagerClient, times(numInitialTaskManagers)).removeContainerRequest( + any(AMRMClient.ContainerRequest.class)); assertEquals(numInitialTaskManagers, numberOfRegisteredResources); } finally { if (resourceManager != null) { diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index d41d42d7a05f97030f65d981840dd44bcd77775b..ee325dad0f7b1da111405db99c2b4ad7170d3c6d 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -401,6 +401,8 @@ public class YarnResourceManagerTest extends TestLogger { resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + verify(mockResourceManagerClient).removeContainerRequest( + any(AMRMClient.ContainerRequest.class)); verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); // Remote task executor registers with YarnResourceManager. @@ -496,6 +498,8 @@ public class YarnResourceManagerTest extends TestLogger { resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + verify(mockResourceManagerClient).removeContainerRequest( + any(AMRMClient.ContainerRequest.class)); verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); // Callback from YARN when container is Completed, pending request can not be fulfilled by pending