diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java index 07a552bcd8526180e5d1b278b0b2b5302ad69222..a59407cb44f2bd76f0e68c05dd4130f7eb9d45b5 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java @@ -22,8 +22,6 @@ import org.apache.flink.streaming.util.ClusterUtil; public class LocalStreamEnvironment extends StreamExecutionEnvironment { - protected static ClassLoader userClassLoader; - /** * Executes the JobGraph of the on a mini cluster of CLusterUtil with a * default name. @@ -32,7 +30,8 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment { */ @Override public JobExecutionResult execute() throws Exception { - return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getParallelism()); + return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getParallelism(), + getConfig().isSysoutLoggingEnabled()); } /** @@ -45,6 +44,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment { */ @Override public JobExecutionResult execute(String jobName) throws Exception { - return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), getParallelism()); + return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), getParallelism(), + getConfig().isSysoutLoggingEnabled()); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java index 3308ab7dc324fb57a185077600c19e40160161ac..c3060fb307e48e6fa29b115f94026a37f8e5abed 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java @@ -69,11 +69,12 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment { this.host = host; this.port = port; this.jarFiles = new ArrayList(); - for (int i = 0; i < jarFiles.length; i++) { - File file = new File(jarFiles[i]); + for (String jarFile : jarFiles) { + File file = new File(jarFile); try { JobWithJars.checkJarFile(file); - } catch (IOException e) { + } + catch (IOException e) { throw new RuntimeException("Problem with jar file " + file.getAbsolutePath(), e); } this.jarFiles.add(file); @@ -113,7 +114,8 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment { Configuration configuration = jobGraph.getJobConfiguration(); Client client = new Client(new InetSocketAddress(host, port), configuration, JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()), -1); - + client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled()); + try { JobSubmissionResult result = client.run(jobGraph, true); if(result instanceof JobExecutionResult) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java index 64b7bd81c38feaf3c68d931d3b09afe35db9a50e..409e5eda60fff780fa4c75c0a400523d651a23a1 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java @@ -42,9 +42,8 @@ public class ClusterUtil { * memorySize * @return The result of the job execution, containing elapsed time and accumulators. */ - public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int parallelism, long memorySize) - throws Exception { - + public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int parallelism, long memorySize, + boolean printDuringExecution) throws Exception { Configuration configuration = jobGraph.getJobConfiguration(); LocalFlinkMiniCluster exec = null; @@ -58,7 +57,7 @@ public class ClusterUtil { try { exec = new LocalFlinkMiniCluster(configuration, true); - SerializedJobExecutionResult result = exec.submitJobAndWait(jobGraph, true); + SerializedJobExecutionResult result = exec.submitJobAndWait(jobGraph, printDuringExecution); return result.toJobExecutionResult(ClusterUtil.class.getClassLoader()); } finally { @@ -68,7 +67,8 @@ public class ClusterUtil { } } - public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int numOfSlots) throws Exception { - return runOnMiniCluster(jobGraph, numOfSlots, -1); + public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int numOfSlots, + boolean printDuringExecution) throws Exception { + return runOnMiniCluster(jobGraph, numOfSlots, -1, printDuringExecution); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java index fe2fdb821ac550a616e6d462d3b1bf9fc3af9963..c4bd09569379b976ecbcbdc6e62bd19220cef10d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java @@ -100,6 +100,7 @@ public class StreamCheckpointingITCase { "localhost", cluster.getJobManagerRPCPort()); env.setParallelism(PARALLELISM); env.enableCheckpointing(200); + env.getConfig().disableSysoutLogging(); DataStream stream = env.addSource(new RichParallelSourceFunction() {