提交 976bacc6 编写于 作者: S Stephan Ewen

[FLINK-2994] [client] Report error cause when jobs switch to failing.

For jobs that do not switch to FAILED, but rather RESTARTING, this now prints the error cause
as well. Also minor improvement to exception printing in CliFrontend.
上级 0de13b50
...@@ -125,8 +125,6 @@ public class CliFrontend { ...@@ -125,8 +125,6 @@ public class CliFrontend {
private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class); private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class);
private final File configDirectory;
private final Configuration config; private final Configuration config;
private final FiniteDuration askTimeout; private final FiniteDuration askTimeout;
...@@ -155,12 +153,12 @@ public class CliFrontend { ...@@ -155,12 +153,12 @@ public class CliFrontend {
public CliFrontend(String configDir) throws Exception { public CliFrontend(String configDir) throws Exception {
// configure the config directory // configure the config directory
this.configDirectory = new File(configDir); File configDirectory = new File(configDir);
LOG.info("Using configuration directory " + this.configDirectory.getAbsolutePath()); LOG.info("Using configuration directory " + configDirectory.getAbsolutePath());
// load the configuration // load the configuration
LOG.info("Trying to load configuration file"); LOG.info("Trying to load configuration file");
GlobalConfiguration.loadConfiguration(this.configDirectory.getAbsolutePath()); GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
this.config = GlobalConfiguration.getConfiguration(); this.config = GlobalConfiguration.getConfiguration();
// load the YARN properties // load the YARN properties
...@@ -175,13 +173,9 @@ public class CliFrontend { ...@@ -175,13 +173,9 @@ public class CliFrontend {
Properties yarnProperties = new Properties(); Properties yarnProperties = new Properties();
try { try {
InputStream is = new FileInputStream(propertiesFile); try (InputStream is = new FileInputStream(propertiesFile)) {
try {
yarnProperties.load(is); yarnProperties.load(is);
} }
finally {
is.close();
}
} }
catch (IOException e) { catch (IOException e) {
throw new Exception("Cannot read the YARN properties file", e); throw new Exception("Cannot read the YARN properties file", e);
...@@ -915,9 +909,9 @@ public class CliFrontend { ...@@ -915,9 +909,9 @@ public class CliFrontend {
} }
LOG.error("Error while running the command.", t); LOG.error("Error while running the command.", t);
System.err.println("\n------------------------------------------------------------");
System.err.println(" The program finished with the following exception:\n");
t.printStackTrace(); t.printStackTrace();
System.err.println();
System.err.println("The exception above occurred while trying to run your command.");
return 1; return 1;
} }
......
...@@ -32,6 +32,7 @@ import org.apache.flink.runtime.akka.ListeningBehaviour; ...@@ -32,6 +32,7 @@ import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.ExecutionGraphMessages; import org.apache.flink.runtime.messages.ExecutionGraphMessages;
...@@ -113,9 +114,9 @@ public class JobClientActor extends FlinkUntypedActor implements LeaderRetrieval ...@@ -113,9 +114,9 @@ public class JobClientActor extends FlinkUntypedActor implements LeaderRetrieval
// =========== State Change Messages =============== // =========== State Change Messages ===============
if (message instanceof ExecutionGraphMessages.ExecutionStateChanged) { if (message instanceof ExecutionGraphMessages.ExecutionStateChanged) {
logAndPrintMessage(message); logAndPrintMessage((ExecutionGraphMessages.ExecutionStateChanged) message);
} else if (message instanceof ExecutionGraphMessages.JobStatusChanged) { } else if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
logAndPrintMessage(message); logAndPrintMessage((ExecutionGraphMessages.JobStatusChanged) message);
} }
// ============ JobManager ActorRef resolution =============== // ============ JobManager ActorRef resolution ===============
...@@ -276,13 +277,30 @@ public class JobClientActor extends FlinkUntypedActor implements LeaderRetrieval ...@@ -276,13 +277,30 @@ public class JobClientActor extends FlinkUntypedActor implements LeaderRetrieval
return leaderSessionID; return leaderSessionID;
} }
private void logAndPrintMessage(Object message) { private void logAndPrintMessage(ExecutionGraphMessages.ExecutionStateChanged message) {
LOG.info(message.toString()); LOG.info(message.toString());
if (sysoutUpdates) { if (sysoutUpdates) {
System.out.println(message.toString()); System.out.println(message.toString());
} }
} }
private void logAndPrintMessage(ExecutionGraphMessages.JobStatusChanged message) {
// by default, this only prints the status, and not any exception.
// in state FAILING, we report the exception in addition
if (message.newJobStatus() != JobStatus.FAILING || message.error() == null) {
LOG.info(message.toString());
if (sysoutUpdates) {
System.out.println(message.toString());
}
} else {
LOG.info(message.toString(), message.error());
if (sysoutUpdates) {
System.out.println(message.toString());
message.error().printStackTrace(System.out);
}
}
}
@Override @Override
public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) { public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
getSelf().tell( getSelf().tell(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册