diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index c38c49c63959b47affd677489184f9e35dd31c31..55244c3fe9d94a18943ef653a4f5d9fac957db89 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -233,7 +233,7 @@ public class CliFrontend { * * @param args Command line arguments for the info action. */ - protected void info(String[] args) throws CliArgsException, FileNotFoundException, ProgramInvocationException { + protected void info(String[] args) throws Exception { LOG.info("Running 'info' command."); final Options commandOptions = CliFrontendParser.getInfoCommandOptions(); @@ -265,7 +265,10 @@ public class CliFrontend { LOG.info("Creating program plan dump"); - Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, parallelism, true); + final Configuration effectiveConfiguration = + getEffectiveConfiguration(commandLine, programOptions, program.getJobJarAndDependencies()); + + Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, effectiveConfiguration, parallelism, true); String jsonPlan = FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(pipeline); if (jsonPlan != null) { diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java index 87ff1a4f515b772ddf02bde54475c5bf562d3c0c..4f9b40e5b9d7bcf1d843ed7d961b152dd63cf6b0 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java @@ -21,6 +21,7 @@ package org.apache.flink.client.program; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironmentFactory; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.JobClient; /** @@ -34,7 +35,8 @@ public class OptimizerPlanEnvironment extends ExecutionEnvironment { return pipeline; } - public OptimizerPlanEnvironment(int parallelism) { + public OptimizerPlanEnvironment(Configuration configuration, int parallelism) { + super(configuration); if (parallelism > 0) { setParallelism(parallelism); } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java index 5761e950ed125e92fcb464db2de3d70c368028c0..6d2ff10f6dcb88a7d7f5e486a4b8013f1741f9b1 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java @@ -59,7 +59,7 @@ public enum PackagedProgramUtils { int defaultParallelism, @Nullable JobID jobID, boolean suppressOutput) throws ProgramInvocationException { - final Pipeline pipeline = getPipelineFromProgram(packagedProgram, defaultParallelism, suppressOutput); + final Pipeline pipeline = getPipelineFromProgram(packagedProgram, configuration, defaultParallelism, suppressOutput); final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, defaultParallelism); if (jobID != null) { @@ -93,6 +93,7 @@ public enum PackagedProgramUtils { public static Pipeline getPipelineFromProgram( PackagedProgram program, + Configuration configuration, int parallelism, boolean suppressOutput) throws CompilerException, ProgramInvocationException { final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); @@ -116,9 +117,9 @@ public enum PackagedProgramUtils { } // temporary hack to support the optimizer plan preview - OptimizerPlanEnvironment benv = new OptimizerPlanEnvironment(parallelism); + OptimizerPlanEnvironment benv = new OptimizerPlanEnvironment(configuration, parallelism); benv.setAsContext(); - StreamPlanEnvironment senv = new StreamPlanEnvironment(parallelism); + StreamPlanEnvironment senv = new StreamPlanEnvironment(configuration, parallelism); senv.setAsContext(); try { diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StreamPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/StreamPlanEnvironment.java index 86b05af90663afb6b107701053b056b2bd36f182..b2ce783153e4752c42fee49dc5feece8e79366c8 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/StreamPlanEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/StreamPlanEnvironment.java @@ -19,6 +19,7 @@ package org.apache.flink.client.program; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.dag.Pipeline; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory; @@ -37,7 +38,8 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment { return pipeline; } - public StreamPlanEnvironment(int parallelism) { + public StreamPlanEnvironment(Configuration configuration, int parallelism) { + super(configuration); if (parallelism > 0) { setParallelism(parallelism); } diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java index dd8f345b3089e27445d09c7933ea9f32d127e0d9..6750cca90822b5a8922bcb5bf9b52f7d898c18aa 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java @@ -301,7 +301,7 @@ public class CliFrontendPackageProgramTest extends TestLogger { Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), c); // we expect this to fail with a "ClassNotFoundException" - Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(prog, 666, true); + Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(prog, c, 666, true); FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(pipeline); fail("Should have failed with a ClassNotFoundException"); } diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index 6f291e722e98761de7d907458bad786ecb096cd6..93248a1b863d69d5eed5b9d3bb4cabd7cbd741cd 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -234,7 +234,7 @@ public class ClientTest extends TestLogger { .build(); Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config); - Plan plan = (Plan) PackagedProgramUtils.getPipelineFromProgram(prg, 1, true); + Plan plan = (Plan) PackagedProgramUtils.getPipelineFromProgram(prg, new Configuration(), 1, true); OptimizedPlan op = optimizer.compile(plan); assertNotNull(op); diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java index 0c258e41d74b4c414e8e182c64679d86acb48338..b47928d137c92620167f406cbe3a62d97e8c3397 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java @@ -63,7 +63,7 @@ public class ExecutionPlanCreationTest { config.setInteger(JobManagerOptions.PORT, mockJmAddress.getPort()); Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config); - Plan plan = (Plan) PackagedProgramUtils.getPipelineFromProgram(prg, -1, true); + Plan plan = (Plan) PackagedProgramUtils.getPipelineFromProgram(prg, config, -1, true); OptimizedPlan op = optimizer.compile(plan); assertNotNull(op); diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/OptimizerPlanEnvironmentTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/OptimizerPlanEnvironmentTest.java index 0bd89e194c9a146f271ba6db10bc6a4e56ecd1e1..9e952e1f55d0cb12a9ee7322970c233665f96425 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/OptimizerPlanEnvironmentTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/OptimizerPlanEnvironmentTest.java @@ -18,6 +18,8 @@ package org.apache.flink.client.program; +import org.apache.flink.configuration.Configuration; + import org.junit.Assert; import org.junit.Test; @@ -47,7 +49,7 @@ public class OptimizerPlanEnvironmentTest { try { // Flink will throw an error because no job graph will be generated by the main method. - PackagedProgramUtils.getPipelineFromProgram(packagedProgram, 1, suppressOutput); + PackagedProgramUtils.getPipelineFromProgram(packagedProgram, new Configuration(), 1, suppressOutput); Assert.fail("This should have failed to create the Flink Plan."); } catch (ProgramInvocationException e) { // Test that that Flink captured the expected stdout/stderr diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsTest.java new file mode 100644 index 0000000000000000000000000000000000000000..59e75fc1f7a6f9278d986028d1c93a4231c158fc --- /dev/null +++ b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.program; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.dag.Pipeline; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; + +import org.junit.Test; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests {@link PackagedProgramUtils}. + */ +public class PackagedProgramUtilsTest { + + /** + * This tests whether configuration forwarding from a {@link Configuration} to the environment + * works. + */ + @Test + public void testDataSetConfigurationForwarding() throws Exception { + assertPrecondition(ExecutionEnvironment.getExecutionEnvironment().getConfig()); + + PackagedProgram packagedProgram = PackagedProgram.newBuilder() + .setEntryPointClassName(DataSetTestProgram.class.getName()) + .build(); + + Configuration config = createConfigurationWithOption(); + + Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram( + packagedProgram, + config, + 1 /* parallelism */, + false /* suppress output */); + + ExecutionConfig executionConfig = ((Plan) pipeline).getExecutionConfig(); + + assertExpectedOption(executionConfig); + } + + /** + * This tests whether configuration forwarding from a {@link Configuration} to the environment + * works. + */ + @Test + public void testDataStreamConfigurationForwarding() throws Exception { + assertPrecondition(ExecutionEnvironment.getExecutionEnvironment().getConfig()); + + PackagedProgram packagedProgram = PackagedProgram.newBuilder() + .setEntryPointClassName(DataStreamTestProgram.class.getName()) + .build(); + + Configuration config = createConfigurationWithOption(); + + Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram( + packagedProgram, + config, + 1 /* parallelism */, + false /* suppress output */); + + ExecutionConfig executionConfig = ((StreamGraph) pipeline).getExecutionConfig(); + + assertExpectedOption(executionConfig); + } + + private static void assertPrecondition(ExecutionConfig executionConfig) { + // we want to test forwarding with this config, ensure that the default is what we expect. + assertThat(executionConfig.isAutoTypeRegistrationDisabled(), is(false)); + } + + private static void assertExpectedOption(ExecutionConfig executionConfig) { + // we want to test forwarding with this config, ensure that the default is what we expect. + assertThat(executionConfig.isAutoTypeRegistrationDisabled(), is(true)); + } + + private static Configuration createConfigurationWithOption() { + Configuration config = new Configuration(); + config.set(PipelineOptions.AUTO_TYPE_REGISTRATION, false); + return config; + } + + /** Test Program for the DataSet API. */ + public static class DataSetTestProgram { + public static void main(String[] args) throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.fromElements("hello").print(); + env.execute(); + } + } + + /** Test Program for the DataStream API. */ + public static class DataStreamTestProgram { + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.fromElements("hello").print(); + env.execute(); + } + } +}