diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 2211012c648c5742c81fa9a1e68722f8053c01ae..933a22ca1b083d101c03660580d429f206eb2518 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -125,8 +125,6 @@ public class CliFrontend { private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class); - private final File configDirectory; - private final Configuration config; private final FiniteDuration askTimeout; @@ -155,12 +153,12 @@ public class CliFrontend { public CliFrontend(String configDir) throws Exception { // configure the config directory - this.configDirectory = new File(configDir); - LOG.info("Using configuration directory " + this.configDirectory.getAbsolutePath()); + File configDirectory = new File(configDir); + LOG.info("Using configuration directory " + configDirectory.getAbsolutePath()); // load the configuration LOG.info("Trying to load configuration file"); - GlobalConfiguration.loadConfiguration(this.configDirectory.getAbsolutePath()); + GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath()); this.config = GlobalConfiguration.getConfiguration(); // load the YARN properties @@ -175,13 +173,9 @@ public class CliFrontend { Properties yarnProperties = new Properties(); try { - InputStream is = new FileInputStream(propertiesFile); - try { + try (InputStream is = new FileInputStream(propertiesFile)) { yarnProperties.load(is); } - finally { - is.close(); - } } catch (IOException e) { throw new Exception("Cannot read the YARN properties file", e); @@ -915,9 +909,9 @@ public class CliFrontend { } LOG.error("Error while running the command.", t); + System.err.println("\n------------------------------------------------------------"); + System.err.println(" The program finished with the following exception:\n"); t.printStackTrace(); - System.err.println(); - System.err.println("The exception above occurred while trying to run your command."); return 1; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java index d08046be34341b7f1f9491f11fa66ab328e9aa44..b1177309775b1f7636c6688330df2addf5275ae5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.ExecutionGraphMessages; @@ -113,9 +114,9 @@ public class JobClientActor extends FlinkUntypedActor implements LeaderRetrieval // =========== State Change Messages =============== if (message instanceof ExecutionGraphMessages.ExecutionStateChanged) { - logAndPrintMessage(message); + logAndPrintMessage((ExecutionGraphMessages.ExecutionStateChanged) message); } else if (message instanceof ExecutionGraphMessages.JobStatusChanged) { - logAndPrintMessage(message); + logAndPrintMessage((ExecutionGraphMessages.JobStatusChanged) message); } // ============ JobManager ActorRef resolution =============== @@ -276,13 +277,30 @@ public class JobClientActor extends FlinkUntypedActor implements LeaderRetrieval return leaderSessionID; } - private void logAndPrintMessage(Object message) { + private void logAndPrintMessage(ExecutionGraphMessages.ExecutionStateChanged message) { LOG.info(message.toString()); if (sysoutUpdates) { System.out.println(message.toString()); } } + private void logAndPrintMessage(ExecutionGraphMessages.JobStatusChanged message) { + // by default, this only prints the status, and not any exception. + // in state FAILING, we report the exception in addition + if (message.newJobStatus() != JobStatus.FAILING || message.error() == null) { + LOG.info(message.toString()); + if (sysoutUpdates) { + System.out.println(message.toString()); + } + } else { + LOG.info(message.toString(), message.error()); + if (sysoutUpdates) { + System.out.println(message.toString()); + message.error().printStackTrace(System.out); + } + } + } + @Override public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) { getSelf().tell(