提交 bf29de98 编写于 作者: M Maximilian Michels

[streaming] delegate JobGraph generation to Client class

上级 7fdaa4e2
......@@ -341,14 +341,14 @@ public class Client {
}
public JobExecutionResult runBlocking(OptimizedPlan compiledPlan, List<URL> libraries, List<URL> classpaths,
public JobExecutionResult runBlocking(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths,
ClassLoader classLoader) throws ProgramInvocationException
{
JobGraph job = getJobGraph(compiledPlan, libraries, classpaths);
return runBlocking(job, classLoader);
}
public JobSubmissionResult runDetached(OptimizedPlan compiledPlan, List<URL> libraries, List<URL> classpaths,
public JobSubmissionResult runDetached(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths,
ClassLoader classLoader) throws ProgramInvocationException
{
JobGraph job = getJobGraph(compiledPlan, libraries, classpaths);
......
......@@ -49,6 +49,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
import org.apache.flink.storm.util.StormConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;
import scala.Some;
import scala.concurrent.Await;
import scala.concurrent.Future;
......@@ -185,7 +186,10 @@ public class FlinkClient {
topology.getConfig().setGlobalJobParameters(new StormConfig(this.conf));
}
final JobGraph jobGraph = topology.getStreamGraph().getJobGraph(name);
final StreamGraph streamGraph = topology.getStreamGraph();
streamGraph.setJobName(name);
final JobGraph jobGraph = streamGraph.getJobGraph();
jobGraph.addJar(new Path(uploadedJarUri));
final Configuration configuration = jobGraph.getJobConfiguration();
......
......@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.storm.util.StormConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -73,7 +74,10 @@ public class FlinkLocalCluster {
topology.getConfig().setGlobalJobParameters(new StormConfig(conf));
}
JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
StreamGraph streamGraph = topology.getStreamGraph();
streamGraph.setJobName(topologyName);
JobGraph jobGraph = streamGraph.getJobGraph();
this.flink.submitJobDetached(jobGraph);
}
......
......@@ -26,6 +26,7 @@ import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -81,8 +82,11 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
@Override
public JobExecutionResult execute(String jobName) throws Exception {
// transform the streaming program into a JobGraph
JobGraph jobGraph = getStreamGraph().getJobGraph(jobName);
StreamGraph streamGraph = getStreamGraph();
streamGraph.setJobName(jobName);
JobGraph jobGraph = streamGraph.getJobGraph();
Configuration configuration = new Configuration();
configuration.addAll(jobGraph.getJobConfiguration());
......
......@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.environment;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
......@@ -35,9 +34,8 @@ import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -166,36 +164,28 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
@Override
public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
JobGraph jobGraph = getStreamGraph().getJobGraph(jobName);
StreamGraph streamGraph = getStreamGraph();
streamGraph.setJobName(jobName);
transformations.clear();
return executeRemotely(jobGraph);
return executeRemotely(streamGraph);
}
/**
* Executes the remote job.
*
* @param jobGraph
* jobGraph to execute
* @param streamGraph
* Stream Graph to execute
* @return The result of the job execution, containing elapsed time and accumulators.
*/
private JobExecutionResult executeRemotely(JobGraph jobGraph) throws ProgramInvocationException {
private JobExecutionResult executeRemotely(StreamGraph streamGraph) throws ProgramInvocationException {
if (LOG.isInfoEnabled()) {
LOG.info("Running remotely at {}:{}", host, port);
}
for (URL jarFile : jarFiles) {
try {
jobGraph.addJar(new Path(jarFile.toURI()));
} catch (URISyntaxException e) {
throw new ProgramInvocationException("URL is invalid", e);
}
}
ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths,
getClass().getClassLoader());
Configuration configuration = new Configuration();
configuration.addAll(jobGraph.getJobConfiguration());
configuration.addAll(this.config);
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
......@@ -211,7 +201,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
}
try {
return client.runBlocking(jobGraph, usercodeClassLoader);
return client.runBlocking(streamGraph, jarFiles, globalClasspaths, usercodeClassLoader);
}
catch (ProgramInvocationException e) {
throw e;
......
......@@ -20,15 +20,15 @@ package org.apache.flink.streaming.api.environment;
import java.net.URL;
import java.util.List;
import com.google.common.base.Preconditions;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -69,33 +69,23 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
@Override
public JobExecutionResult execute() throws Exception {
return execute(null);
return execute(DEFAULT_JOB_NAME);
}
@Override
public JobExecutionResult execute(String jobName) throws Exception {
Preconditions.checkNotNull("Streaming Job name should not be null.");
JobGraph jobGraph;
if (jobName == null) {
jobGraph = this.getStreamGraph().getJobGraph();
} else {
jobGraph = this.getStreamGraph().getJobGraph(jobName);
}
StreamGraph streamGraph = this.getStreamGraph();
streamGraph.setJobName(jobName);
transformations.clear();
// attach all necessary jar files to the JobGraph
for (URL file : jars) {
jobGraph.addJar(new Path(file.toURI()));
}
jobGraph.setClasspaths(classpaths);
// execute the programs
if (wait) {
return client.runBlocking(jobGraph, userCodeClassLoader);
return client.runBlocking(streamGraph, jars, classpaths, userCodeClassLoader);
} else {
JobSubmissionResult result = client.runDetached(jobGraph, userCodeClassLoader);
JobSubmissionResult result = client.runDetached(streamGraph, jars, classpaths, userCodeClassLoader);
LOG.warn("Job was executed in detached mode, the results will be available on completion.");
return JobExecutionResult.fromJobSubmissionResult(result);
}
......
......@@ -126,6 +126,10 @@ public class StreamGraph extends StreamingPlan {
return executionConfig;
}
public String getJobName() {
return jobName;
}
public void setJobName(String jobName) {
this.jobName = jobName;
}
......@@ -554,20 +558,9 @@ public class StreamGraph extends StreamingPlan {
}
/**
* Gets the assembled {@link JobGraph} and adds a default name for it.
* Gets the assembled {@link JobGraph}.
*/
public JobGraph getJobGraph() {
return getJobGraph(jobName);
}
/**
* Gets the assembled {@link JobGraph} and adds a user specified name for
* it.
*
* @param jobGraphName
* name of the jobGraph
*/
public JobGraph getJobGraph(String jobGraphName) {
// temporarily forbid checkpointing for iterative jobs
if (isIterative() && isCheckpointingEnabled() && !forceCheckpoint) {
throw new UnsupportedOperationException(
......@@ -576,11 +569,9 @@ public class StreamGraph extends StreamingPlan {
+ "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
}
setJobName(jobGraphName);
StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);
return jobgraphGenerator.createJobGraph(jobGraphName);
return jobgraphGenerator.createJobGraph(jobName);
}
@Override
......
......@@ -83,7 +83,7 @@ public class StreamingJobGraphGenerator {
}
public JobGraph createJobGraph(String jobName) {
jobGraph = new JobGraph(jobName);
jobGraph = new JobGraph(streamGraph.getJobName());
// make sure that all vertices start immediately
jobGraph.setScheduleMode(ScheduleMode.ALL);
......
......@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
/**
......@@ -41,7 +42,9 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
@Override
public JobExecutionResult execute(String jobName) throws Exception {
final JobGraph jobGraph = getStreamGraph().getJobGraph(jobName);
final StreamGraph streamGraph = getStreamGraph();
streamGraph.setJobName(jobName);
final JobGraph jobGraph = streamGraph.getJobGraph();
return executor.submitJobAndWait(jobGraph, false);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册