From b70431239a5e18555866addb41ee6edf2b79ff60 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Mon, 13 Apr 2015 19:02:47 +0200 Subject: [PATCH] [FLINK-1878] [streaming] Stream environments accept a flag that controls sysout logging during execution. --- .../api/environment/LocalStreamEnvironment.java | 8 ++++---- .../api/environment/RemoteStreamEnvironment.java | 10 ++++++---- .../org/apache/flink/streaming/util/ClusterUtil.java | 12 ++++++------ .../checkpointing/StreamCheckpointingITCase.java | 1 + 4 files changed, 17 insertions(+), 14 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java index 07a552bcd85..a59407cb44f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java @@ -22,8 +22,6 @@ import org.apache.flink.streaming.util.ClusterUtil; public class LocalStreamEnvironment extends StreamExecutionEnvironment { - protected static ClassLoader userClassLoader; - /** * Executes the JobGraph of the on a mini cluster of CLusterUtil with a * default name. @@ -32,7 +30,8 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment { */ @Override public JobExecutionResult execute() throws Exception { - return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getParallelism()); + return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getParallelism(), + getConfig().isSysoutLoggingEnabled()); } /** @@ -45,6 +44,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment { */ @Override public JobExecutionResult execute(String jobName) throws Exception { - return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), getParallelism()); + return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), getParallelism(), + getConfig().isSysoutLoggingEnabled()); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java index 3308ab7dc32..c3060fb307e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java @@ -69,11 +69,12 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment { this.host = host; this.port = port; this.jarFiles = new ArrayList(); - for (int i = 0; i < jarFiles.length; i++) { - File file = new File(jarFiles[i]); + for (String jarFile : jarFiles) { + File file = new File(jarFile); try { JobWithJars.checkJarFile(file); - } catch (IOException e) { + } + catch (IOException e) { throw new RuntimeException("Problem with jar file " + file.getAbsolutePath(), e); } this.jarFiles.add(file); @@ -113,7 +114,8 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment { Configuration configuration = jobGraph.getJobConfiguration(); Client client = new Client(new InetSocketAddress(host, port), configuration, JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()), -1); - + client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled()); + try { JobSubmissionResult result = client.run(jobGraph, true); if(result instanceof JobExecutionResult) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java index 64b7bd81c38..409e5eda60f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java @@ -42,9 +42,8 @@ public class ClusterUtil { * memorySize * @return The result of the job execution, containing elapsed time and accumulators. */ - public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int parallelism, long memorySize) - throws Exception { - + public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int parallelism, long memorySize, + boolean printDuringExecution) throws Exception { Configuration configuration = jobGraph.getJobConfiguration(); LocalFlinkMiniCluster exec = null; @@ -58,7 +57,7 @@ public class ClusterUtil { try { exec = new LocalFlinkMiniCluster(configuration, true); - SerializedJobExecutionResult result = exec.submitJobAndWait(jobGraph, true); + SerializedJobExecutionResult result = exec.submitJobAndWait(jobGraph, printDuringExecution); return result.toJobExecutionResult(ClusterUtil.class.getClassLoader()); } finally { @@ -68,7 +67,8 @@ public class ClusterUtil { } } - public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int numOfSlots) throws Exception { - return runOnMiniCluster(jobGraph, numOfSlots, -1); + public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int numOfSlots, + boolean printDuringExecution) throws Exception { + return runOnMiniCluster(jobGraph, numOfSlots, -1, printDuringExecution); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java index fe2fdb821ac..c4bd0956937 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java @@ -100,6 +100,7 @@ public class StreamCheckpointingITCase { "localhost", cluster.getJobManagerRPCPort()); env.setParallelism(PARALLELISM); env.enableCheckpointing(200); + env.getConfig().disableSysoutLogging(); DataStream stream = env.addSource(new RichParallelSourceFunction() { -- GitLab