From a24734ea339872763306b44770678c4ace6a369f Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Wed, 1 Apr 2020 17:36:49 +0200 Subject: [PATCH] [FLINK-16560] Forward Configuration in PackagedProgramUtils#getPipelineFromProgram Before, when using PackagedProgramUtils (for example in the standalone cluster entrypoint or the web ui) the Flink Configuration would not be applied to the execution environment. This also adds a test that verifies that we forward configuration. --- .../apache/flink/client/cli/CliFrontend.java | 7 +- .../program/OptimizerPlanEnvironment.java | 4 +- .../client/program/PackagedProgramUtils.java | 7 +- .../client/program/StreamPlanEnvironment.java | 4 +- .../cli/CliFrontendPackageProgramTest.java | 2 +- .../flink/client/program/ClientTest.java | 2 +- .../program/ExecutionPlanCreationTest.java | 2 +- .../program/OptimizerPlanEnvironmentTest.java | 4 +- .../program/PackagedProgramUtilsTest.java | 123 ++++++++++++++++++ 9 files changed, 144 insertions(+), 11 deletions(-) create mode 100644 flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsTest.java diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index c38c49c6395..55244c3fe9d 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -233,7 +233,7 @@ public class CliFrontend { * * @param args Command line arguments for the info action. */ - protected void info(String[] args) throws CliArgsException, FileNotFoundException, ProgramInvocationException { + protected void info(String[] args) throws Exception { LOG.info("Running 'info' command."); final Options commandOptions = CliFrontendParser.getInfoCommandOptions(); @@ -265,7 +265,10 @@ public class CliFrontend { LOG.info("Creating program plan dump"); - Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, parallelism, true); + final Configuration effectiveConfiguration = + getEffectiveConfiguration(commandLine, programOptions, program.getJobJarAndDependencies()); + + Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, effectiveConfiguration, parallelism, true); String jsonPlan = FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(pipeline); if (jsonPlan != null) { diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java index 87ff1a4f515..4f9b40e5b9d 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java @@ -21,6 +21,7 @@ package org.apache.flink.client.program; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironmentFactory; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.JobClient; /** @@ -34,7 +35,8 @@ public class OptimizerPlanEnvironment extends ExecutionEnvironment { return pipeline; } - public OptimizerPlanEnvironment(int parallelism) { + public OptimizerPlanEnvironment(Configuration configuration, int parallelism) { + super(configuration); if (parallelism > 0) { setParallelism(parallelism); } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java index 5761e950ed1..6d2ff10f6dc 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java @@ -59,7 +59,7 @@ public enum PackagedProgramUtils { int defaultParallelism, @Nullable JobID jobID, boolean suppressOutput) throws ProgramInvocationException { - final Pipeline pipeline = getPipelineFromProgram(packagedProgram, defaultParallelism, suppressOutput); + final Pipeline pipeline = getPipelineFromProgram(packagedProgram, configuration, defaultParallelism, suppressOutput); final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, defaultParallelism); if (jobID != null) { @@ -93,6 +93,7 @@ public enum PackagedProgramUtils { public static Pipeline getPipelineFromProgram( PackagedProgram program, + Configuration configuration, int parallelism, boolean suppressOutput) throws CompilerException, ProgramInvocationException { final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); @@ -116,9 +117,9 @@ public enum PackagedProgramUtils { } // temporary hack to support the optimizer plan preview - OptimizerPlanEnvironment benv = new OptimizerPlanEnvironment(parallelism); + OptimizerPlanEnvironment benv = new OptimizerPlanEnvironment(configuration, parallelism); benv.setAsContext(); - StreamPlanEnvironment senv = new StreamPlanEnvironment(parallelism); + StreamPlanEnvironment senv = new StreamPlanEnvironment(configuration, parallelism); senv.setAsContext(); try { diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StreamPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/StreamPlanEnvironment.java index 86b05af9066..b2ce783153e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/StreamPlanEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/StreamPlanEnvironment.java @@ -19,6 +19,7 @@ package org.apache.flink.client.program; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.dag.Pipeline; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory; @@ -37,7 +38,8 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment { return pipeline; } - public StreamPlanEnvironment(int parallelism) { + public StreamPlanEnvironment(Configuration configuration, int parallelism) { + super(configuration); if (parallelism > 0) { setParallelism(parallelism); } diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java index dd8f345b308..6750cca9082 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java @@ -301,7 +301,7 @@ public class CliFrontendPackageProgramTest extends TestLogger { Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), c); // we expect this to fail with a "ClassNotFoundException" - Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(prog, 666, true); + Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(prog, c, 666, true); FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(pipeline); fail("Should have failed with a ClassNotFoundException"); } diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index 6f291e722e9..93248a1b863 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -234,7 +234,7 @@ public class ClientTest extends TestLogger { .build(); Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config); - Plan plan = (Plan) PackagedProgramUtils.getPipelineFromProgram(prg, 1, true); + Plan plan = (Plan) PackagedProgramUtils.getPipelineFromProgram(prg, new Configuration(), 1, true); OptimizedPlan op = optimizer.compile(plan); assertNotNull(op); diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java index 0c258e41d74..b47928d137c 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java @@ -63,7 +63,7 @@ public class ExecutionPlanCreationTest { config.setInteger(JobManagerOptions.PORT, mockJmAddress.getPort()); Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config); - Plan plan = (Plan) PackagedProgramUtils.getPipelineFromProgram(prg, -1, true); + Plan plan = (Plan) PackagedProgramUtils.getPipelineFromProgram(prg, config, -1, true); OptimizedPlan op = optimizer.compile(plan); assertNotNull(op); diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/OptimizerPlanEnvironmentTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/OptimizerPlanEnvironmentTest.java index 0bd89e194c9..9e952e1f55d 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/OptimizerPlanEnvironmentTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/OptimizerPlanEnvironmentTest.java @@ -18,6 +18,8 @@ package org.apache.flink.client.program; +import org.apache.flink.configuration.Configuration; + import org.junit.Assert; import org.junit.Test; @@ -47,7 +49,7 @@ public class OptimizerPlanEnvironmentTest { try { // Flink will throw an error because no job graph will be generated by the main method. - PackagedProgramUtils.getPipelineFromProgram(packagedProgram, 1, suppressOutput); + PackagedProgramUtils.getPipelineFromProgram(packagedProgram, new Configuration(), 1, suppressOutput); Assert.fail("This should have failed to create the Flink Plan."); } catch (ProgramInvocationException e) { // Test that that Flink captured the expected stdout/stderr diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsTest.java new file mode 100644 index 00000000000..59e75fc1f7a --- /dev/null +++ b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.program; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.dag.Pipeline; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; + +import org.junit.Test; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests {@link PackagedProgramUtils}. + */ +public class PackagedProgramUtilsTest { + + /** + * This tests whether configuration forwarding from a {@link Configuration} to the environment + * works. + */ + @Test + public void testDataSetConfigurationForwarding() throws Exception { + assertPrecondition(ExecutionEnvironment.getExecutionEnvironment().getConfig()); + + PackagedProgram packagedProgram = PackagedProgram.newBuilder() + .setEntryPointClassName(DataSetTestProgram.class.getName()) + .build(); + + Configuration config = createConfigurationWithOption(); + + Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram( + packagedProgram, + config, + 1 /* parallelism */, + false /* suppress output */); + + ExecutionConfig executionConfig = ((Plan) pipeline).getExecutionConfig(); + + assertExpectedOption(executionConfig); + } + + /** + * This tests whether configuration forwarding from a {@link Configuration} to the environment + * works. + */ + @Test + public void testDataStreamConfigurationForwarding() throws Exception { + assertPrecondition(ExecutionEnvironment.getExecutionEnvironment().getConfig()); + + PackagedProgram packagedProgram = PackagedProgram.newBuilder() + .setEntryPointClassName(DataStreamTestProgram.class.getName()) + .build(); + + Configuration config = createConfigurationWithOption(); + + Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram( + packagedProgram, + config, + 1 /* parallelism */, + false /* suppress output */); + + ExecutionConfig executionConfig = ((StreamGraph) pipeline).getExecutionConfig(); + + assertExpectedOption(executionConfig); + } + + private static void assertPrecondition(ExecutionConfig executionConfig) { + // we want to test forwarding with this config, ensure that the default is what we expect. + assertThat(executionConfig.isAutoTypeRegistrationDisabled(), is(false)); + } + + private static void assertExpectedOption(ExecutionConfig executionConfig) { + // we want to test forwarding with this config, ensure that the default is what we expect. + assertThat(executionConfig.isAutoTypeRegistrationDisabled(), is(true)); + } + + private static Configuration createConfigurationWithOption() { + Configuration config = new Configuration(); + config.set(PipelineOptions.AUTO_TYPE_REGISTRATION, false); + return config; + } + + /** Test Program for the DataSet API. */ + public static class DataSetTestProgram { + public static void main(String[] args) throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.fromElements("hello").print(); + env.execute(); + } + } + + /** Test Program for the DataStream API. */ + public static class DataStreamTestProgram { + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.fromElements("hello").print(); + env.execute(); + } + } +} -- GitLab