diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index e93025f19e5a62063593b02e73391d3bcf9a2e78..ab3af854a63386c1a8f43d2758a9783ec324255a 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -40,6 +40,7 @@ import java.util.Properties; import akka.actor.ActorSystem; import org.apache.commons.cli.CommandLine; +import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.accumulators.AccumulatorHelper; @@ -920,7 +921,18 @@ public class CliFrontend { System.err.println("\n------------------------------------------------------------"); System.err.println(" The program finished with the following exception:\n"); - t.printStackTrace(); + if (t.getCause() instanceof InvalidProgramException) { + System.err.println(t.getCause().getMessage()); + StackTraceElement[] trace = t.getCause().getStackTrace(); + for (StackTraceElement ele: trace) { + System.err.println("\t" + ele.toString()); + if (ele.getMethodName().equals("main")) { + break; + } + } + } else { + t.printStackTrace(); + } return 1; } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java index 7d226d2ace17bcec7e43fcd525afe2b0b463c7ad..8f92c51aa00a2df92f1deab0f986cda00b74d68a 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java @@ -242,9 +242,8 @@ public class Client { } else if (prog.isUsingInteractiveMode()) { LOG.info("Starting program in interactive mode"); - ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getClasspaths(), - prog.getUserCodeClassLoader(), parallelism, true); - + ContextEnvironment.setAsContext(new ContextEnvironmentFactory(this, prog.getAllLibraries(), + prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, true)); // invoke here try { prog.invokeInteractiveModeForExecution(); @@ -269,18 +268,18 @@ public class Client { } else if (prog.isUsingInteractiveMode()) { LOG.info("Starting program in interactive mode"); - ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getClasspaths(), - prog.getUserCodeClassLoader(), parallelism, false); + ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, prog.getAllLibraries(), + prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, false); + ContextEnvironment.setAsContext(factory); // invoke here try { prog.invokeInteractiveModeForExecution(); + return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute(); } finally { ContextEnvironment.unsetContext(); } - - return new JobSubmissionResult(lastJobID); } else { throw new RuntimeException("PackagedProgram does not have a valid invocation mode."); diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java index d5a28fc9ce74af2d49b03b9428c614c92565fcf9..1e3d0d49c7644aa47ee179160a24fe83e1465233 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java @@ -23,41 +23,30 @@ import java.util.List; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.ExecutionEnvironmentFactory; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * Execution Environment for remote execution with the Client. + * Execution Environment for remote execution with the Client in blocking fashion. */ public class ContextEnvironment extends ExecutionEnvironment { - private static final Logger LOG = LoggerFactory.getLogger(ContextEnvironment.class); + protected final Client client; - private final Client client; + protected final List jarFilesToAttach; - private final List jarFilesToAttach; - - private final List classpathsToAttach; - - private final ClassLoader userCodeClassLoader; - - private final boolean wait; - + protected final List classpathsToAttach; + protected final ClassLoader userCodeClassLoader; public ContextEnvironment(Client remoteConnection, List jarFiles, List classpaths, - ClassLoader userCodeClassLoader, boolean wait) { + ClassLoader userCodeClassLoader) { this.client = remoteConnection; this.jarFilesToAttach = jarFiles; this.classpathsToAttach = classpaths; this.userCodeClassLoader = userCodeClassLoader; - this.wait = wait; } @Override @@ -65,17 +54,8 @@ public class ContextEnvironment extends ExecutionEnvironment { Plan p = createProgramPlan(jobName); JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.classpathsToAttach, this.userCodeClassLoader); - - if (wait) { - this.lastJobExecutionResult = client.runBlocking(toRun, getParallelism()); - return this.lastJobExecutionResult; - } - else { - JobSubmissionResult result = client.runDetached(toRun, getParallelism()); - LOG.warn("Job was executed in detached mode, the results will be available on completion."); - this.lastJobExecutionResult = JobExecutionResult.fromJobSubmissionResult(result); - return this.lastJobExecutionResult; - } + this.lastJobExecutionResult = client.runBlocking(toRun, getParallelism()); + return this.lastJobExecutionResult; } @Override @@ -93,10 +73,6 @@ public class ContextEnvironment extends ExecutionEnvironment { jobID = JobID.generate(); } - public boolean isWait() { - return wait; - } - @Override public String toString() { return "Context Environment (parallelism = " + (getParallelism() == -1 ? "default" : getParallelism()) @@ -114,63 +90,18 @@ public class ContextEnvironment extends ExecutionEnvironment { public List getClasspaths(){ return classpathsToAttach; } + + public ClassLoader getUserCodeClassLoader() { + return userCodeClassLoader; + } // -------------------------------------------------------------------------------------------- - static void setAsContext(Client client, List jarFilesToAttach, List classpathsToAttach, - ClassLoader userCodeClassLoader, int defaultParallelism, boolean wait) - { - ContextEnvironmentFactory factory = new ContextEnvironmentFactory(client, jarFilesToAttach, - classpathsToAttach, userCodeClassLoader, defaultParallelism, wait); + static void setAsContext(ContextEnvironmentFactory factory) { initializeContextEnvironment(factory); } static void unsetContext() { resetContextEnvironment(); } - - // -------------------------------------------------------------------------------------------- - - /** - * The factory that instantiates the environment to be used when running jobs that are - * submitted through a pre-configured client connection. - * This happens for example when a job is submitted from the command line. - */ - public static class ContextEnvironmentFactory implements ExecutionEnvironmentFactory { - - private final Client client; - - private final List jarFilesToAttach; - - private final List classpathsToAttach; - - private final ClassLoader userCodeClassLoader; - - private final int defaultParallelism; - - private final boolean wait; - - - public ContextEnvironmentFactory(Client client, List jarFilesToAttach, - List classpathsToAttach, ClassLoader userCodeClassLoader, int defaultParallelism, - boolean wait) - { - this.client = client; - this.jarFilesToAttach = jarFilesToAttach; - this.classpathsToAttach = classpathsToAttach; - this.userCodeClassLoader = userCodeClassLoader; - this.defaultParallelism = defaultParallelism; - this.wait = wait; - } - - @Override - public ExecutionEnvironment createExecutionEnvironment() { - ContextEnvironment env = new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach, - userCodeClassLoader, wait); - if (defaultParallelism > 0) { - env.setParallelism(defaultParallelism); - } - return env; - } - } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..55f705b5ea2a10efd705162b86bf20e28c156618 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.program; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.ExecutionEnvironmentFactory; + +import java.net.URL; +import java.util.List; + +/** + * The factory that instantiates the environment to be used when running jobs that are + * submitted through a pre-configured client connection. + * This happens for example when a job is submitted from the command line. + */ +public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory { + + private final Client client; + + private final List jarFilesToAttach; + + private final List classpathsToAttach; + + private final ClassLoader userCodeClassLoader; + + private final int defaultParallelism; + + private final boolean wait; + + private ExecutionEnvironment lastEnvCreated; + + + public ContextEnvironmentFactory(Client client, List jarFilesToAttach, + List classpathsToAttach, ClassLoader userCodeClassLoader, int defaultParallelism, + boolean wait) + { + this.client = client; + this.jarFilesToAttach = jarFilesToAttach; + this.classpathsToAttach = classpathsToAttach; + this.userCodeClassLoader = userCodeClassLoader; + this.defaultParallelism = defaultParallelism; + this.wait = wait; + } + + @Override + public ExecutionEnvironment createExecutionEnvironment() { + if (!wait && lastEnvCreated != null) { + throw new InvalidProgramException("Multiple enviornments cannot be created in detached mode"); + } + + lastEnvCreated = wait ? + new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader) : + new DetachedEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader); + if (defaultParallelism > 0) { + lastEnvCreated.setParallelism(defaultParallelism); + } + return lastEnvCreated; + } + + public ExecutionEnvironment getLastEnvCreated() { + return lastEnvCreated; + } +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java new file mode 100644 index 0000000000000000000000000000000000000000..0b1ae1d012b07d22cd398ab4bdde5fed421c4404 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.program; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.Plan; +import org.apache.flink.optimizer.plan.FlinkPlan; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URL; +import java.util.List; +import java.util.Map; + +/** + * Execution Environment for remote execution with the Client in detached mode. + */ +public class DetachedEnvironment extends ContextEnvironment { + + /** Keeps track of the program plan for the Client to access. */ + private FlinkPlan detachedPlan; + + private static final Logger LOG = LoggerFactory.getLogger(DetachedEnvironment.class); + + public DetachedEnvironment(Client remoteConnection, List jarFiles, List classpaths, ClassLoader userCodeClassLoader) { + super(remoteConnection, jarFiles, classpaths, userCodeClassLoader); + } + + @Override + public JobExecutionResult execute(String jobName) throws Exception { + Plan p = createProgramPlan(jobName); + setDetachedPlan(Client.getOptimizedPlan(client.compiler, p, getParallelism())); + LOG.warn("Job was executed in detached mode, the results will be available on completion."); + this.lastJobExecutionResult = DetachedJobExecutionResult.INSTANCE; + return this.lastJobExecutionResult; + } + + public void setDetachedPlan(FlinkPlan plan) { + if (detachedPlan == null) { + detachedPlan = plan; + } else { + throw new InvalidProgramException(DetachedJobExecutionResult.DETACHED_MESSAGE + + DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE); + } + } + + /** + * Finishes this Context Environment's execution by explicitly running the plan constructed. + */ + JobSubmissionResult finalizeExecute() throws ProgramInvocationException { + return client.runDetached(detachedPlan, jarFilesToAttach, classpathsToAttach, userCodeClassLoader); + } + + public static final class DetachedJobExecutionResult extends JobExecutionResult { + + public static final DetachedJobExecutionResult INSTANCE = new DetachedJobExecutionResult(); + + static final String DETACHED_MESSAGE = "Job was submitted in detached mode. "; + + static final String EXECUTE_TWICE_MESSAGE = "Only one call to execute is allowed. "; + + static final String EAGER_FUNCTION_MESSAGE = "Please make sure your program doesn't call " + + "an eager execution function [collect, print, printToErr, count]. "; + + static final String JOB_RESULT_MESSAGE = "Results of job execution, such as accumulators," + + " runtime, job id etc. are not available. "; + + private DetachedJobExecutionResult() { + super(null, -1, null); + } + + @Override + public long getNetRuntime() { + throw new InvalidProgramException(DETACHED_MESSAGE + JOB_RESULT_MESSAGE); + } + + @Override + public T getAccumulatorResult(String accumulatorName) { + throw new InvalidProgramException(DETACHED_MESSAGE + JOB_RESULT_MESSAGE + EAGER_FUNCTION_MESSAGE); + } + + @Override + public Map getAllAccumulatorResults() { + throw new InvalidProgramException(DETACHED_MESSAGE + JOB_RESULT_MESSAGE); + } + + @Override + public Integer getIntCounterResult(String accumulatorName) { + throw new InvalidProgramException(DETACHED_MESSAGE + JOB_RESULT_MESSAGE); + } + + @Override + public JobID getJobID() { + throw new InvalidProgramException(DETACHED_MESSAGE + JOB_RESULT_MESSAGE); + } + } +} diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index 1fbf68183189a63b48b9630eea377d6e13807eac..cc32d9c448eb2c5bb3af777b683421afcf7af8fd 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -31,6 +31,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.program.DetachedEnvironment.DetachedJobExecutionResult; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.optimizer.DataStatistics; @@ -75,6 +76,9 @@ public class ClientTest { private ActorSystem jobManagerSystem; + private static final String ACCUMULATOR_NAME = "test_accumulator"; + + private static final String FAIL_MESSAGE = "Invalid program should have thrown ProgramInvocationException."; @Before public void setUp() throws Exception { @@ -117,6 +121,75 @@ public class ClientTest { } } + /** + * Tests that invalid detached mode programs fail. + */ + @Test + public void testDetachedMode() throws Exception{ + jobManagerSystem.actorOf(Props.create(SuccessReturningActor.class), JobManager.JOB_MANAGER_NAME()); + Client out = new Client(config); + + try { + PackagedProgram prg = new PackagedProgram(TestExecuteTwice.class); + out.runDetached(prg, 1); + fail(FAIL_MESSAGE); + } catch (ProgramInvocationException e) { + assertEquals( + DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE, + e.getCause().getMessage()); + } + + try { + PackagedProgram prg = new PackagedProgram(TestEager.class); + out.runDetached(prg, 1); + fail(FAIL_MESSAGE); + } catch (ProgramInvocationException e) { + assertEquals( + DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE + DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE, + e.getCause().getMessage()); + } + + try { + PackagedProgram prg = new PackagedProgram(TestGetRuntime.class); + out.runDetached(prg, 1); + fail(FAIL_MESSAGE); + } catch (ProgramInvocationException e) { + assertEquals( + DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE, + e.getCause().getMessage()); + } + + try { + PackagedProgram prg = new PackagedProgram(TestGetJobID.class); + out.runDetached(prg, 1); + fail(FAIL_MESSAGE); + } catch (ProgramInvocationException e) { + assertEquals( + DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE, + e.getCause().getMessage()); + } + + try { + PackagedProgram prg = new PackagedProgram(TestGetAccumulator.class); + out.runDetached(prg, 1); + fail(FAIL_MESSAGE); + } catch (ProgramInvocationException e) { + assertEquals( + DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE + DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE, + e.getCause().getMessage()); + } + + try { + PackagedProgram prg = new PackagedProgram(TestGetAllAccumulator.class); + out.runDetached(prg, 1); + fail(FAIL_MESSAGE); + } catch (ProgramInvocationException e) { + assertEquals( + DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE, + e.getCause().getMessage()); + } + } + /** * This test verifies correct job submission messaging logic and plan translation calls. */ @@ -304,4 +377,58 @@ public class ClientTest { return "TestOptimizerPlan "; } } + + private static final class TestExecuteTwice { + + public static void main(String args[]) throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.fromElements(1, 2).output(new DiscardingOutputFormat()); + env.execute(); + env.fromElements(1, 2).collect(); + } + } + + private static final class TestEager { + + public static void main(String args[]) throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.fromElements(1, 2).collect(); + } + } + + private static final class TestGetRuntime { + + public static void main(String args[]) throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.fromElements(1, 2).output(new DiscardingOutputFormat()); + env.execute().getNetRuntime(); + } + } + + private static final class TestGetJobID { + + public static void main(String args[]) throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.fromElements(1, 2).output(new DiscardingOutputFormat()); + env.execute().getJobID(); + } + } + + private static final class TestGetAccumulator { + + public static void main(String args[]) throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.fromElements(1, 2).output(new DiscardingOutputFormat()); + env.execute().getAccumulatorResult(ACCUMULATOR_NAME); + } + } + + private static final class TestGetAllAccumulator { + + public static void main(String args[]) throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.fromElements(1, 2).output(new DiscardingOutputFormat()); + env.execute().getAllAccumulatorResults(); + } + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java index 240c9d24a6f918bacae96d0f69a5a2e7a9a8b7a8..7a68fc52a556e27605486e36ca8a5f3132a6952a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java @@ -17,17 +17,13 @@ package org.apache.flink.streaming.api.environment; -import java.net.URL; -import java.util.List; - import com.google.common.base.Preconditions; import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.JobSubmissionResult; -import org.apache.flink.client.program.Client; -import org.apache.flink.client.program.JobWithJars; +import org.apache.flink.client.program.ContextEnvironment; +import org.apache.flink.client.program.DetachedEnvironment; + import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.GlobalConfiguration; - import org.apache.flink.streaming.api.graph.StreamGraph; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,42 +32,19 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { private static final Logger LOG = LoggerFactory.getLogger(StreamContextEnvironment.class); - private final List jars; + private final ContextEnvironment ctx; - private final List classpaths; - - private final Client client; - - private final ClassLoader userCodeClassLoader; - - private final boolean wait; - - protected StreamContextEnvironment(Client client, List jars, List classpaths, int parallelism, - boolean wait) { - this.client = client; - this.jars = jars; - this.classpaths = classpaths; - this.wait = wait; - - this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jars, classpaths, - getClass().getClassLoader()); - - if (parallelism > 0) { - setParallelism(parallelism); - } - else { - // determine parallelism + protected StreamContextEnvironment(ContextEnvironment ctx) { + this.ctx = ctx; + if (ctx.getParallelism() > 0) { + setParallelism(ctx.getParallelism()); + } else { setParallelism(GlobalConfiguration.getInteger( ConfigConstants.DEFAULT_PARALLELISM_KEY, ConfigConstants.DEFAULT_PARALLELISM)); } } - @Override - public JobExecutionResult execute() throws Exception { - return execute(DEFAULT_JOB_NAME); - } - @Override public JobExecutionResult execute(String jobName) throws Exception { Preconditions.checkNotNull("Streaming Job name should not be null."); @@ -82,12 +55,12 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { transformations.clear(); // execute the programs - if (wait) { - return client.runBlocking(streamGraph, jars, classpaths, userCodeClassLoader); - } else { - JobSubmissionResult result = client.runDetached(streamGraph, jars, classpaths, userCodeClassLoader); + if (ctx instanceof DetachedEnvironment) { LOG.warn("Job was executed in detached mode, the results will be available on completion."); - return JobExecutionResult.fromJobSubmissionResult(result); + ((DetachedEnvironment) ctx).setDetachedPlan(streamGraph); + return DetachedEnvironment.DetachedJobExecutionResult.INSTANCE; + } else { + return ctx.getClient().runBlocking(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader()); } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 3c961f08c297ec99a4fc7034d7ba846c47d2b14d..5cc0007652bbd07ba680bb80cf442c98d01ca437 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -39,7 +39,6 @@ import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.ValueTypeInfo; -import org.apache.flink.client.program.Client; import org.apache.flink.client.program.ContextEnvironment; import org.apache.flink.client.program.OptimizerPlanEnvironment; import org.apache.flink.client.program.PreviewPlanEnvironment; @@ -70,7 +69,6 @@ import org.apache.flink.util.SplittableIterator; import java.io.IOException; import java.io.Serializable; -import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -1272,9 +1270,7 @@ public abstract class StreamExecutionEnvironment { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); if (env instanceof ContextEnvironment) { - ContextEnvironment ctx = (ContextEnvironment) env; - return createContextEnvironment(ctx.getClient(), ctx.getJars(), ctx.getClasspaths(), - ctx.getParallelism(), ctx.isWait()); + return new StreamContextEnvironment((ContextEnvironment) env); } else if (env instanceof OptimizerPlanEnvironment | env instanceof PreviewPlanEnvironment) { return new StreamPlanEnvironment(env); } else { @@ -1282,12 +1278,6 @@ public abstract class StreamExecutionEnvironment { } } - private static StreamExecutionEnvironment createContextEnvironment( - Client client, List jars, List classpaths, int parallelism, boolean wait) - { - return new StreamContextEnvironment(client, jars, classpaths, parallelism, wait); - } - /** * Creates a {@link LocalStreamEnvironment}. The local execution environment * will run the program in a multi-threaded fashion in the same JVM as the