From 976bacc65908cc35382ddbbcdc80249a700bc2d3 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Tue, 10 Nov 2015 19:42:58 +0100 Subject: [PATCH] [FLINK-2994] [client] Report error cause when jobs switch to failing. For jobs that do not switch to FAILED, but rather RESTARTING, this now prints the error cause as well. Also minor improvement to exception printing in CliFrontend. --- .../org/apache/flink/client/CliFrontend.java | 18 +++++--------- .../flink/runtime/client/JobClientActor.java | 24 ++++++++++++++++--- 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 2211012c648..933a22ca1b0 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -125,8 +125,6 @@ public class CliFrontend { private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class); - private final File configDirectory; - private final Configuration config; private final FiniteDuration askTimeout; @@ -155,12 +153,12 @@ public class CliFrontend { public CliFrontend(String configDir) throws Exception { // configure the config directory - this.configDirectory = new File(configDir); - LOG.info("Using configuration directory " + this.configDirectory.getAbsolutePath()); + File configDirectory = new File(configDir); + LOG.info("Using configuration directory " + configDirectory.getAbsolutePath()); // load the configuration LOG.info("Trying to load configuration file"); - GlobalConfiguration.loadConfiguration(this.configDirectory.getAbsolutePath()); + GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath()); this.config = GlobalConfiguration.getConfiguration(); // load the YARN properties @@ -175,13 +173,9 @@ public class CliFrontend { Properties yarnProperties = new Properties(); try { - InputStream is = new FileInputStream(propertiesFile); - try { + try (InputStream is = new FileInputStream(propertiesFile)) { yarnProperties.load(is); } - finally { - is.close(); - } } catch (IOException e) { throw new Exception("Cannot read the YARN properties file", e); @@ -915,9 +909,9 @@ public class CliFrontend { } LOG.error("Error while running the command.", t); + System.err.println("\n------------------------------------------------------------"); + System.err.println(" The program finished with the following exception:\n"); t.printStackTrace(); - System.err.println(); - System.err.println("The exception above occurred while trying to run your command."); return 1; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java index d08046be343..b1177309775 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.ExecutionGraphMessages; @@ -113,9 +114,9 @@ public class JobClientActor extends FlinkUntypedActor implements LeaderRetrieval // =========== State Change Messages =============== if (message instanceof ExecutionGraphMessages.ExecutionStateChanged) { - logAndPrintMessage(message); + logAndPrintMessage((ExecutionGraphMessages.ExecutionStateChanged) message); } else if (message instanceof ExecutionGraphMessages.JobStatusChanged) { - logAndPrintMessage(message); + logAndPrintMessage((ExecutionGraphMessages.JobStatusChanged) message); } // ============ JobManager ActorRef resolution =============== @@ -276,13 +277,30 @@ public class JobClientActor extends FlinkUntypedActor implements LeaderRetrieval return leaderSessionID; } - private void logAndPrintMessage(Object message) { + private void logAndPrintMessage(ExecutionGraphMessages.ExecutionStateChanged message) { LOG.info(message.toString()); if (sysoutUpdates) { System.out.println(message.toString()); } } + private void logAndPrintMessage(ExecutionGraphMessages.JobStatusChanged message) { + // by default, this only prints the status, and not any exception. + // in state FAILING, we report the exception in addition + if (message.newJobStatus() != JobStatus.FAILING || message.error() == null) { + LOG.info(message.toString()); + if (sysoutUpdates) { + System.out.println(message.toString()); + } + } else { + LOG.info(message.toString(), message.error()); + if (sysoutUpdates) { + System.out.println(message.toString()); + message.error().printStackTrace(System.out); + } + } + } + @Override public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) { getSelf().tell( -- GitLab