未验证 提交 36d9a756 编写于 作者: S shuai-xu 提交者: Till Rohrmann

[FLINK-12038][tests] Harden YARNITCase

Only kill Yarn application if it does not properly terminate.

This closes #9175.
上级 8dec21f5
......@@ -18,6 +18,7 @@
package org.apache.flink.yarn;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
......@@ -32,11 +33,14 @@ import org.apache.flink.yarn.util.YarnTestUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
......@@ -50,6 +54,10 @@ import static org.junit.Assert.assertThat;
*/
public class YARNITCase extends YarnTestBase {
private final Duration yarnAppTerminateTimeout = Duration.ofSeconds(10);
private final int sleepIntervalInMS = 100;
@BeforeClass
public static void setup() {
YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-per-job");
......@@ -113,16 +121,37 @@ public class YARNITCase extends YarnTestBase {
assertThat(jobResult, is(notNullValue()));
assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
waitApplicationFinishedElseKillIt(applicationId, yarnAppTerminateTimeout, yarnClusterDescriptor);
} finally {
if (clusterClient != null) {
clusterClient.shutdown();
}
if (applicationId != null) {
yarnClusterDescriptor.killCluster(applicationId);
}
}
}
});
}
private void waitApplicationFinishedElseKillIt(
ApplicationId applicationId,
Duration timeout,
YarnClusterDescriptor yarnClusterDescriptor) throws Exception {
Deadline deadline = Deadline.now().plus(timeout);
YarnApplicationState state = getYarnClient().getApplicationReport(applicationId).getYarnApplicationState();
while (state != YarnApplicationState.FINISHED) {
if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
Assert.fail("Application became FAILED or KILLED while expecting FINISHED");
}
if (deadline.isOverdue()) {
yarnClusterDescriptor.killCluster(applicationId);
Assert.fail("Application didn't finish before timeout");
}
sleep(sleepIntervalInMS);
state = getYarnClient().getApplicationReport(applicationId).getYarnApplicationState();
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册