提交 5b2ad7f0 编写于 作者: M Maximilian Michels

[FLINK-1946] reduce verbosity of Yarn cluster setup

This removes repeated printing of messages retrieved from the Yarn
cluster. Only new messages are printed.

- reduce waiting time between subsequent cluster queries

This closes #2147
上级 3b593632
......@@ -727,8 +727,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
yarnClient.submitApplication(appContext);
LOG.info("Waiting for the cluster to be allocated");
int waittime = 0;
final long startTime = System.currentTimeMillis();
ApplicationReport report;
YarnApplicationState lastAppState = YarnApplicationState.NEW;
loop: while( true ) {
try {
report = yarnClient.getApplicationReport(appId);
......@@ -750,14 +751,16 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
LOG.info("YARN application has been deployed successfully.");
break loop;
default:
LOG.info("Deploying cluster, current state " + appState);
if(waittime > 60000) {
if (appState != lastAppState) {
LOG.info("Deploying cluster, current state " + appState);
}
if(System.currentTimeMillis() - startTime > 60000) {
LOG.info("Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
}
}
waittime += 1000;
Thread.sleep(1000);
lastAppState = appState;
Thread.sleep(250);
}
// print the application id for user to cancel themselves.
if (isDetachedMode()) {
......
......@@ -155,22 +155,21 @@ public class YarnClusterClient extends ClusterClient {
logAndSysout("Waiting until all TaskManagers have connected");
while (true) {
GetClusterStatusResponse status = getClusterStatus();
if (status != null) {
if (status.numRegisteredTaskManagers() < clusterDescriptor.getTaskManagerCount()) {
logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/"
+ clusterDescriptor.getTaskManagerCount() + ")");
} else {
for (GetClusterStatusResponse currentStatus, lastStatus = null;; lastStatus = currentStatus) {
currentStatus = getClusterStatus();
if (currentStatus != null && !currentStatus.equals(lastStatus)) {
logAndSysout("TaskManager status (" + currentStatus.numRegisteredTaskManagers() + "/"
+ clusterDescriptor.getTaskManagerCount() + ")");
if (currentStatus.numRegisteredTaskManagers() >= clusterDescriptor.getTaskManagerCount()) {
logAndSysout("All TaskManagers are connected");
break;
}
} else {
} else if (lastStatus == null) {
logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
}
try {
Thread.sleep(500);
Thread.sleep(250);
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for TaskManagers");
System.err.println("Thread is interrupted");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册