提交 b7043123 编写于 作者: S Stephan Ewen

[FLINK-1878] [streaming] Stream environments accept a flag that controls...

[FLINK-1878] [streaming] Stream environments accept a flag that controls sysout logging during execution.
上级 df7c61e2
...@@ -22,8 +22,6 @@ import org.apache.flink.streaming.util.ClusterUtil; ...@@ -22,8 +22,6 @@ import org.apache.flink.streaming.util.ClusterUtil;
public class LocalStreamEnvironment extends StreamExecutionEnvironment { public class LocalStreamEnvironment extends StreamExecutionEnvironment {
protected static ClassLoader userClassLoader;
/** /**
* Executes the JobGraph of the on a mini cluster of CLusterUtil with a * Executes the JobGraph of the on a mini cluster of CLusterUtil with a
* default name. * default name.
...@@ -32,7 +30,8 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment { ...@@ -32,7 +30,8 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
*/ */
@Override @Override
public JobExecutionResult execute() throws Exception { public JobExecutionResult execute() throws Exception {
return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getParallelism()); return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getParallelism(),
getConfig().isSysoutLoggingEnabled());
} }
/** /**
...@@ -45,6 +44,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment { ...@@ -45,6 +44,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
*/ */
@Override @Override
public JobExecutionResult execute(String jobName) throws Exception { public JobExecutionResult execute(String jobName) throws Exception {
return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), getParallelism()); return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), getParallelism(),
getConfig().isSysoutLoggingEnabled());
} }
} }
...@@ -69,11 +69,12 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment { ...@@ -69,11 +69,12 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
this.host = host; this.host = host;
this.port = port; this.port = port;
this.jarFiles = new ArrayList<File>(); this.jarFiles = new ArrayList<File>();
for (int i = 0; i < jarFiles.length; i++) { for (String jarFile : jarFiles) {
File file = new File(jarFiles[i]); File file = new File(jarFile);
try { try {
JobWithJars.checkJarFile(file); JobWithJars.checkJarFile(file);
} catch (IOException e) { }
catch (IOException e) {
throw new RuntimeException("Problem with jar file " + file.getAbsolutePath(), e); throw new RuntimeException("Problem with jar file " + file.getAbsolutePath(), e);
} }
this.jarFiles.add(file); this.jarFiles.add(file);
...@@ -113,6 +114,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment { ...@@ -113,6 +114,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
Configuration configuration = jobGraph.getJobConfiguration(); Configuration configuration = jobGraph.getJobConfiguration();
Client client = new Client(new InetSocketAddress(host, port), configuration, Client client = new Client(new InetSocketAddress(host, port), configuration,
JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()), -1); JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()), -1);
client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
try { try {
JobSubmissionResult result = client.run(jobGraph, true); JobSubmissionResult result = client.run(jobGraph, true);
......
...@@ -42,9 +42,8 @@ public class ClusterUtil { ...@@ -42,9 +42,8 @@ public class ClusterUtil {
* memorySize * memorySize
* @return The result of the job execution, containing elapsed time and accumulators. * @return The result of the job execution, containing elapsed time and accumulators.
*/ */
public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int parallelism, long memorySize) public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int parallelism, long memorySize,
throws Exception { boolean printDuringExecution) throws Exception {
Configuration configuration = jobGraph.getJobConfiguration(); Configuration configuration = jobGraph.getJobConfiguration();
LocalFlinkMiniCluster exec = null; LocalFlinkMiniCluster exec = null;
...@@ -58,7 +57,7 @@ public class ClusterUtil { ...@@ -58,7 +57,7 @@ public class ClusterUtil {
try { try {
exec = new LocalFlinkMiniCluster(configuration, true); exec = new LocalFlinkMiniCluster(configuration, true);
SerializedJobExecutionResult result = exec.submitJobAndWait(jobGraph, true); SerializedJobExecutionResult result = exec.submitJobAndWait(jobGraph, printDuringExecution);
return result.toJobExecutionResult(ClusterUtil.class.getClassLoader()); return result.toJobExecutionResult(ClusterUtil.class.getClassLoader());
} }
finally { finally {
...@@ -68,7 +67,8 @@ public class ClusterUtil { ...@@ -68,7 +67,8 @@ public class ClusterUtil {
} }
} }
public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int numOfSlots) throws Exception { public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int numOfSlots,
return runOnMiniCluster(jobGraph, numOfSlots, -1); boolean printDuringExecution) throws Exception {
return runOnMiniCluster(jobGraph, numOfSlots, -1, printDuringExecution);
} }
} }
...@@ -100,6 +100,7 @@ public class StreamCheckpointingITCase { ...@@ -100,6 +100,7 @@ public class StreamCheckpointingITCase {
"localhost", cluster.getJobManagerRPCPort()); "localhost", cluster.getJobManagerRPCPort());
env.setParallelism(PARALLELISM); env.setParallelism(PARALLELISM);
env.enableCheckpointing(200); env.enableCheckpointing(200);
env.getConfig().disableSysoutLogging();
DataStream<String> stream = env.addSource(new RichParallelSourceFunction<String>() { DataStream<String> stream = env.addSource(new RichParallelSourceFunction<String>() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册