From ec61cf21276fb6eea599b40ea999613ffeef48ef Mon Sep 17 00:00:00 2001
From: Huang Xingbo
Date: Fri, 17 May 2019 11:17:23 +0800
Subject: [PATCH] [FLINK-12327][python] Adds support to submit Python Table API job in CliFrontend

This closes #8472
---
 docs/ops/cli.md                               |  66 ++++++
 docs/ops/cli.zh.md                            |  63 ++++-
 .../apache/flink/client/cli/CliFrontend.java  |  56 +++--
 .../flink/client/cli/CliFrontendParser.java   |  29 +++
 .../flink/client/cli/ProgramOptions.java      |  74 +++++-
 .../flink/client/program/PackagedProgram.java |  32 ++-
 .../flink/client/python/PythonDriver.java     | 168 +++++++++++++
 .../client/python/PythonGatewayServer.java    |  21 +-
 .../flink/client/python/PythonUtil.java       | 223 ++++++++++++++++++
 .../flink/client/python/PythonDriverTest.java | 104 ++++++++
 .../flink/client/python/PythonUtilTest.java   | 118 +++++++++
 flink-dist/pom.xml                            |   3 +
 flink-dist/src/main/assemblies/bin.xml        |   7 +
 flink-python/pyflink/find_flink_home.py       |   3 +
 flink-python/pyflink/java_gateway.py          |  14 +-
 .../pyflink/table/examples/batch/__init__.py  |  17 ++
 .../table/examples/batch/word_count.py        |  79 +++++++
 17 files changed, 1032 insertions(+), 45 deletions(-)
 create mode 100644 flink-clients/src/main/java/org/apache/flink/client/python/PythonDriver.java
 create mode 100644 flink-clients/src/main/java/org/apache/flink/client/python/PythonUtil.java
 create mode 100644 flink-clients/src/test/java/org/apache/flink/client/python/PythonDriverTest.java
 create mode 100644 flink-clients/src/test/java/org/apache/flink/client/python/PythonUtilTest.java
 create mode 100644 flink-python/pyflink/table/examples/batch/__init__.py
 create mode 100644 flink-python/pyflink/table/examples/batch/word_count.py

diff --git a/docs/ops/cli.md b/docs/ops/cli.md
index b414b30dd91..505207dd8b2 100644
--- a/docs/ops/cli.md
+++ b/docs/ops/cli.md
@@ -47,6 +47,12 @@ available.
 {:toc}

 ## Examples
+### Job Submission Examples
+-----------------------------
+
+These examples show how to submit a job via the CLI.
+
+
- Run example program with no arguments: @@ -88,6 +94,53 @@ available. ./examples/batch/WordCount.jar \ --input hdfs:///user/hamlet.txt --output hdfs:///user/wordcount_out +
+ +
+ +- Run Python Table program: + + ./bin/flink run -py examples/python/table/batch/word_count.py -j + +- Run Python Table program with pyFiles: + + ./bin/flink run -py examples/python/table/batch/word_count.py -j \ + -pyfs file:///user.txt,hdfs:///$namenode_address/username.txt + +- Run Python Table program with pyFiles and pyModule: + + ./bin/flink run -pym batch.word_count -pyfs examples/python/table/batch -j + +- Run Python Table program with parallelism 16: + + ./bin/flink run -p 16 -py examples/python/table/batch/word_count.py -j + +- Run Python Table program with flink log output disabled: + + ./bin/flink run -q -py examples/python/table/batch/word_count.py -j + +- Run Python Table program in detached mode: + + ./bin/flink run -d examples/python/table/batch/word_count.py -j + +- Run Python Table program on a specific JobManager: + + ./bin/flink run -m myJMHost:8081 \ + -py examples/python/table/batch/word_count.py \ + -j + +- Run Python Table program using a [per-job YARN cluster]({{site.baseurl}}/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn) with 2 TaskManagers: + + ./bin/flink run -m yarn-cluster -yn 2 \ + -py examples/python/table/batch/word_count.py \ + -j +
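How these `-py`/`-pyfs` options reach Python at runtime: the CLI does not hand them to a user JAR; it rewrites them into arguments for an internal `PythonDriver` entry point (see the `ProgramOptions` and `PythonDriver` changes further down in this patch). The following self-contained sketch mirrors that rewrite; the class name `PyArgsRewriteSketch` is illustrative only and not part of the patch:

    import java.util.Arrays;

    public class PyArgsRewriteSketch {
        // Rewrites "-py <file> [-pyfs <files>] <other args>" (CLI form) into
        // "py <file> [pyfs <files>] <other args>" (PythonDriver form).
        static String[] rewrite(String pyFile, String pyFiles, String[] otherArgs) {
            int extra = (pyFiles == null) ? 2 : 4;
            String[] newArgs = new String[otherArgs.length + extra];
            newArgs[0] = "py";
            newArgs[1] = pyFile;
            if (pyFiles != null) {
                newArgs[2] = "pyfs";
                newArgs[3] = pyFiles;
            }
            System.arraycopy(otherArgs, 0, newArgs, extra, otherArgs.length);
            return newArgs;
        }

        public static void main(String[] args) {
            // corresponds to: ./bin/flink run -py word_count.py -pyfs deps.zip --input in.txt
            String[] driverArgs = rewrite("word_count.py", "deps.zip", new String[] {"--input", "in.txt"});
            // prints: [py, word_count.py, pyfs, deps.zip, --input, in.txt]
            System.out.println(Arrays.toString(driverArgs));
        }
    }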
+
+### Job Management Examples
+-----------------------------
+
+These examples show how to manage a job via the CLI.
+
 - Display the optimized execution plan for the WordCount example program as JSON:

        ./bin/flink info ./examples/batch/WordCount.jar \
                        --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out

@@ -251,6 +304,19 @@ Action "run" compiles and runs a program.
                                          program. Optional flag to override the
                                          default value specified in the
                                          configuration.
+     -py,--python                        Python script with the program entry
+                                         point. The dependent resources can be
+                                         configured with the `--pyFiles` option.
+     -pyfs,--pyFiles                     Attach custom python files for job.
+                                         Comma can be used as the separator to
+                                         specify multiple files. The standard
+                                         python resource file suffixes such as
+                                         .py/.egg/.zip are all supported.
+                                         (e.g.: --pyFiles file:///tmp/myresource.zip
+                                         ,hdfs:///$namenode_address/myresource2.zip)
+     -pym,--pyModule                     Python module with the program entry
+                                         point. This option must be used in
+                                         conjunction with `--pyFiles`.
     -q,--sysoutLogging                   If present, suppress logging output to
                                         standard out.
     -s,--fromSavepoint                   Path to a savepoint to restore the job
diff --git a/docs/ops/cli.zh.md b/docs/ops/cli.zh.md
index 7c020479756..93f16fb62fd 100644
--- a/docs/ops/cli.zh.md
+++ b/docs/ops/cli.zh.md
@@ -47,6 +47,12 @@ available.
 {:toc}

 ## Examples
+### 作业提交示例
+-----------------------------
+
+这些示例展示了如何通过命令行（CLI）提交一个作业。
+
+
- Run example program with no arguments: @@ -82,11 +88,57 @@ available. ./examples/batch/WordCount.jar \ --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out -- Run example program using a [per-job YARN cluster]({{site.baseurl}}/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn) with 2 TaskManagers: +- Run example program using a [per-job YARN cluster]({{site.baseurl}}/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn) with 2 TaskManagers: ./bin/flink run -m yarn-cluster -yn 2 \ ./examples/batch/WordCount.jar \ --input hdfs:///user/hamlet.txt --output hdfs:///user/wordcount_out + +
+ +
+ +- 提交一个Python Table的作业: + + ./bin/flink run -py WordCount.py -j + +- 提交一个有多个依赖的Python Table的作业: + + ./bin/flink run -py examples/python/table/batch/word_count.py -j \ + -pyfs file:///user.txt,hdfs:///$namenode_address/username.txt + +- 提交一个有多个依赖的Python Table的作业,Python作业的主入口通过pym选项指定: + + ./bin/flink run -pym batch.word_count -pyfs examples/python/table/batch -j + +- 提交一个指定并发度为16的Python Table的作业: + + ./bin/flink run -p 16 -py examples/python/table/batch/word_count.py -j + +- 提交一个关闭flink日志输出的Python Table的作业: + + ./bin/flink run -q -py examples/python/table/batch/word_count.py -j + +- 提交一个运行在detached模式下的Python Table的作业: + + ./bin/flink run -d examples/python/table/batch/word_count.py -j + +- 提交一个运行在指定JobManager上的Python Table的作业: + + ./bin/flink run -m myJMHost:8081 \ + -py examples/python/table/batch/word_count.py \ + -j + +- 提交一个运行在有两个TaskManager的[per-job YARN cluster]({{site.baseurl}}/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn)的Python Table的作业: + + ./bin/flink run -m yarn-cluster -yn 2 \ + -py examples/python/table/batch/word_count.py \ + -j + +
+ +### 作业管理示例 +----------------------------- - Display the optimized execution plan for the WordCount example program as JSON: @@ -251,6 +303,15 @@ Action "run" compiles and runs a program. program. Optional flag to override the default value specified in the configuration. + -py,--python 指定Python作业的入口,依赖的资源文件可以通过 + `--pyFiles`进行指定。 + -pyfs,--pyFiles 指定Python作业依赖的一些自定义的python文件, + 如果有多个文件,可以通过逗号(,)进行分隔。支持 + 常用的python资源文件,例如(.py/.egg/.zip)。 + (例如:--pyFiles file:///tmp/myresource.zip + ,hdfs:///$namenode_address/myresource2.zip) + -pym,--pyModule 指定python程序的运行的模块入口,这个选项必须配合 + `--pyFiles`一起使用。 -q,--sysoutLogging If present, suppress logging output to standard out. -s,--fromSavepoint Path to a savepoint to restore the job diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index c6b5c9abe4a..c591e6e5d5f 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -32,6 +32,7 @@ import org.apache.flink.client.program.PackagedProgramUtils; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.client.program.ProgramMissingJobException; import org.apache.flink.client.program.ProgramParametrizationException; +import org.apache.flink.client.python.PythonDriver; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; @@ -185,8 +186,11 @@ public class CliFrontend { return; } - if (runOptions.getJarFilePath() == null) { - throw new CliArgsException("The program JAR file was not specified."); + if (!runOptions.isPython()) { + // Java program should be specified a JAR file + if (runOptions.getJarFilePath() == null) { + throw new CliArgsException("Java program should be specified a JAR file."); + } } final PackagedProgram program; @@ -771,12 +775,42 @@ public class CliFrontend { String jarFilePath = options.getJarFilePath(); List classpaths = options.getClasspaths(); - if (jarFilePath == null) { - throw new IllegalArgumentException("The program JAR file was not specified."); + String entryPointClass; + File jarFile = null; + if (options.isPython()) { + // If the job is specified a jar file + if (jarFilePath != null) { + jarFile = getJarFile(jarFilePath); + } + // The entry point class of python job is PythonDriver + entryPointClass = PythonDriver.class.getCanonicalName(); + } else { + if (jarFilePath == null) { + throw new IllegalArgumentException("The program JAR file was not specified."); + } + jarFile = getJarFile(jarFilePath); + // Get assembler class + entryPointClass = options.getEntryPointClassName(); } - File jarFile = new File(jarFilePath); + PackagedProgram program = entryPointClass == null ? + new PackagedProgram(jarFile, classpaths, programArgs) : + new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs); + + program.setSavepointRestoreSettings(options.getSavepointRestoreSettings()); + + return program; + } + /** + * Gets the JAR file from the path. + * + * @param jarFilePath The path of JAR file + * @return The JAR file + * @throws FileNotFoundException The JAR file does not exist. 
+ */ + private File getJarFile(String jarFilePath) throws FileNotFoundException { + File jarFile = new File(jarFilePath); // Check if JAR file exists if (!jarFile.exists()) { throw new FileNotFoundException("JAR file does not exist: " + jarFile); @@ -784,17 +818,7 @@ public class CliFrontend { else if (!jarFile.isFile()) { throw new FileNotFoundException("JAR file is not a file: " + jarFile); } - - // Get assembler class - String entryPointClass = options.getEntryPointClassName(); - - PackagedProgram program = entryPointClass == null ? - new PackagedProgram(jarFile, classpaths, programArgs) : - new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs); - - program.setSavepointRestoreSettings(options.getSavepointRestoreSettings()); - - return program; + return jarFile; } // -------------------------------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java index cea399808f5..5872a54bd01 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java @@ -118,6 +118,20 @@ public class CliFrontendParser { public static final Option STOP_AND_DRAIN = new Option("d", "drain", false, "Send MAX_WATERMARK before taking the savepoint and stopping the pipelne."); + static final Option PY_OPTION = new Option("py", "python", true, + "Python script with the program entry point. " + + "The dependent resources can be configured with the `--pyFiles` option."); + + static final Option PYFILES_OPTION = new Option("pyfs", "pyFiles", true, + "Attach custom python files for job. " + + "Comma can be used as the separator to specify multiple files. " + + "The standard python resource file suffixes such as .py/.egg/.zip are all supported." + + "(eg: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)"); + + static final Option PYMODULE_OPTION = new Option("pym", "pyModule", true, + "Python module with the program entry point. 
" + + "This option must be used in conjunction with `--pyFiles`."); + static { HELP_OPTION.setRequired(false); @@ -165,6 +179,15 @@ public class CliFrontendParser { STOP_WITH_SAVEPOINT.setOptionalArg(true); STOP_AND_DRAIN.setRequired(false); + + PY_OPTION.setRequired(false); + PY_OPTION.setArgName("python"); + + PYFILES_OPTION.setRequired(false); + PYFILES_OPTION.setArgName("pyFiles"); + + PYMODULE_OPTION.setRequired(false); + PYMODULE_OPTION.setArgName("pyModule"); } private static final Options RUN_OPTIONS = getRunCommandOptions(); @@ -186,6 +209,9 @@ public class CliFrontendParser { options.addOption(DETACHED_OPTION); options.addOption(SHUTDOWN_IF_ATTACHED_OPTION); options.addOption(YARN_DETACHED_OPTION); + options.addOption(PY_OPTION); + options.addOption(PYFILES_OPTION); + options.addOption(PYMODULE_OPTION); return options; } @@ -196,6 +222,9 @@ public class CliFrontendParser { options.addOption(LOGGING_OPTION); options.addOption(DETACHED_OPTION); options.addOption(SHUTDOWN_IF_ATTACHED_OPTION); + options.addOption(PY_OPTION); + options.addOption(PYFILES_OPTION); + options.addOption(PYMODULE_OPTION); return options; } diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java index da03d64048c..30b38675e39 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java @@ -36,6 +36,9 @@ import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION; +import static org.apache.flink.client.cli.CliFrontendParser.PYFILES_OPTION; +import static org.apache.flink.client.cli.CliFrontendParser.PYMODULE_OPTION; +import static org.apache.flink.client.cli.CliFrontendParser.PY_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.SHUTDOWN_IF_ATTACHED_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION; @@ -62,17 +65,71 @@ public abstract class ProgramOptions extends CommandLineOptions { private final SavepointRestoreSettings savepointSettings; + /** + * Flag indicating whether the job is a Python job. + */ + private final boolean isPython; + protected ProgramOptions(CommandLine line) throws CliArgsException { super(line); String[] args = line.hasOption(ARGS_OPTION.getOpt()) ? - line.getOptionValues(ARGS_OPTION.getOpt()) : - line.getArgs(); + line.getOptionValues(ARGS_OPTION.getOpt()) : + line.getArgs(); + + isPython = line.hasOption(PY_OPTION.getOpt()) | line.hasOption(PYMODULE_OPTION.getOpt()); + // If specified the option -py(--python) + if (line.hasOption(PY_OPTION.getOpt())) { + // Cannot use option -py and -pym simultaneously. + if (line.hasOption(PYMODULE_OPTION.getOpt())) { + throw new CliArgsException("Cannot use option -py and -pym simultaneously."); + } + // The cli cmd args which will be transferred to PythonDriver will be transformed as follows: + // CLI cmd : -py ${python.py} pyfs [optional] ${py-files} [optional] ${other args}. + // PythonDriver args: py ${python.py} [optional] pyfs [optional] ${py-files} [optional] ${other args}. + // -------------------------------transformed------------------------------------------------------- + // e.g. 
+			// e.g. -py wordcount.py (CLI cmd) -----------> py wordcount.py (PythonDriver args)
+			// e.g. -py wordcount.py -pyfs file:///AAA.py,hdfs:///BBB.py --input in.txt --output out.txt (CLI cmd)
+			// -----> py wordcount.py pyfs file:///AAA.py,hdfs:///BBB.py --input in.txt --output out.txt (PythonDriver args)
+			String[] newArgs;
+			int argIndex;
+			if (line.hasOption(PYFILES_OPTION.getOpt())) {
+				newArgs = new String[args.length + 4];
+				newArgs[2] = PYFILES_OPTION.getOpt();
+				newArgs[3] = line.getOptionValue(PYFILES_OPTION.getOpt());
+				argIndex = 4;
+			} else {
+				newArgs = new String[args.length + 2];
+				argIndex = 2;
+			}
+			newArgs[0] = PY_OPTION.getOpt();
+			newArgs[1] = line.getOptionValue(PY_OPTION.getOpt());
+			System.arraycopy(args, 0, newArgs, argIndex, args.length);
+			args = newArgs;
+		}
+
+		// If the option -pym(--pyModule) is specified
+		if (line.hasOption(PYMODULE_OPTION.getOpt())) {
+			// The option -pym must be accompanied by --pyFiles.
+			if (!line.hasOption(PYFILES_OPTION.getOpt())) {
+				throw new CliArgsException("-pym must be used in conjunction with `--pyFiles`");
+			}
+			// The CLI args that will be passed on to PythonDriver are transformed as follows:
+			// CLI cmd : -pym ${py-module} -pyfs ${py-files} [${other args}]
+			// PythonDriver args: pym ${py-module} pyfs ${py-files} [${other args}]
+			// e.g. -pym AAA.fun -pyfs AAA.zip (CLI cmd) ----> pym AAA.fun pyfs AAA.zip (PythonDriver args)
+			String[] newArgs = new String[args.length + 4];
+			newArgs[0] = PYMODULE_OPTION.getOpt();
+			newArgs[1] = line.getOptionValue(PYMODULE_OPTION.getOpt());
+			newArgs[2] = PYFILES_OPTION.getOpt();
+			newArgs[3] = line.getOptionValue(PYFILES_OPTION.getOpt());
+			System.arraycopy(args, 0, newArgs, 4, args.length);
+			args = newArgs;
+		}

 		if (line.hasOption(JAR_OPTION.getOpt())) {
 			this.jarFilePath = line.getOptionValue(JAR_OPTION.getOpt());
-		}
-		else if (args.length > 0) {
+		} else if (!isPython && args.length > 0) {
 			jarFilePath = args[0];
 			args = Arrays.copyOfRange(args, 1, args.length);
 		}
@@ -95,7 +152,7 @@ public abstract class ProgramOptions extends CommandLineOptions {

 		this.classpaths = classpaths;

 		this.entryPointClass = line.hasOption(CLASS_OPTION.getOpt()) ?
-			line.getOptionValue(CLASS_OPTION.getOpt()) : null;
+			line.getOptionValue(CLASS_OPTION.getOpt()) : null;

 		if (line.hasOption(PARALLELISM_OPTION.getOpt())) {
 			String parString = line.getOptionValue(PARALLELISM_OPTION.getOpt());
@@ -156,4 +213,11 @@ public abstract class ProgramOptions extends CommandLineOptions {
 	public SavepointRestoreSettings getSavepointRestoreSettings() {
 		return savepointSettings;
 	}
+
+	/**
+	 * Indicates whether the job is a Python job.
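+	 * @return true if the job was submitted with the -py or -pym option, false otherwise.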
+ */ + public boolean isPython() { + return isPython; + } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java index 8f5ccba993c..77b5d295159 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java @@ -21,6 +21,7 @@ package org.apache.flink.client.program; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.Program; import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.client.python.PythonDriver; import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.dag.DataSinkNode; import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; @@ -90,6 +91,11 @@ public class PackagedProgram { private SavepointRestoreSettings savepointSettings = SavepointRestoreSettings.none(); + /** + * Flag indicating whether the job is a Python job. + */ + private final boolean isPython; + /** * Creates an instance that wraps the plan defined in the jar file using the given * argument. @@ -169,18 +175,21 @@ public class PackagedProgram { * may be a missing / wrong class or manifest files. */ public PackagedProgram(File jarFile, List classpaths, @Nullable String entryPointClassName, String... args) throws ProgramInvocationException { - if (jarFile == null) { - throw new IllegalArgumentException("The jar file must not be null."); - } + // Whether the job is a Python job. + isPython = entryPointClassName != null && entryPointClassName.equals(PythonDriver.class.getCanonicalName()); - URL jarFileUrl; - try { - jarFileUrl = jarFile.getAbsoluteFile().toURI().toURL(); - } catch (MalformedURLException e1) { - throw new IllegalArgumentException("The jar file path is invalid."); - } + URL jarFileUrl = null; + if (jarFile != null) { + try { + jarFileUrl = jarFile.getAbsoluteFile().toURI().toURL(); + } catch (MalformedURLException e1) { + throw new IllegalArgumentException("The jar file path is invalid."); + } - checkJarFile(jarFileUrl); + checkJarFile(jarFileUrl); + } else if (!isPython) { + throw new IllegalArgumentException("The jar file must not be null."); + } this.jarFile = jarFileUrl; this.args = args == null ? new String[0] : args; @@ -191,7 +200,7 @@ public class PackagedProgram { } // now that we have an entry point, we can extract the nested jar files (if any) - this.extractedTempLibraries = extractContainedLibraries(jarFileUrl); + this.extractedTempLibraries = jarFileUrl == null ? 
Collections.emptyList() : extractContainedLibraries(jarFileUrl);
 		this.classpaths = classpaths;
 		this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(getAllLibraries(), classpaths, getClass().getClassLoader());
@@ -233,6 +242,7 @@

 		// load the entry point class
 		this.mainClass = entryPointClass;
+		isPython = entryPointClass == PythonDriver.class;

 		// if the entry point is a program, instantiate the class and get the plan
 		if (Program.class.isAssignableFrom(this.mainClass)) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/python/PythonDriver.java b/flink-clients/src/main/java/org/apache/flink/client/python/PythonDriver.java
new file mode 100644
index 00000000000..e43a24eec98
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/python/PythonDriver.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.python;
+
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import py4j.GatewayServer;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A main class used to launch Python applications. It executes python as a
+ * subprocess and then has it connect back to the JVM to access system properties, etc.
+ */
+public class PythonDriver {
+	private static final Logger LOG = LoggerFactory.getLogger(PythonDriver.class);
+
+	public static void main(String[] args) {
+		// the python job needs at least 2 args.
+		// e.g. py a.py ...
+		// e.g. pym a.b -pyfs a.zip ...
+		if (args.length < 2) {
+			LOG.error("At least two arguments are required: the python file or the python module of the program entry point.");
+			System.exit(1);
+		}
+		// parse args
+		Map<String, List<String>> parsedArgs = parseOptions(args);
+		// start gateway server
+		GatewayServer gatewayServer = startGatewayServer();
+		// prepare python env
+
+		// map each file name to its Path
+		Map<String, Path> filePathMap = new HashMap<>();
+		// commands which will be executed in the python process.
+		List<String> commands = constructPythonCommands(filePathMap, parsedArgs);
+		try {
+			// prepare the exec environment of the python process.
+			PythonUtil.PythonEnvironment pythonEnv = PythonUtil.preparePythonEnvironment(filePathMap);
+			// set env variable PYFLINK_GATEWAY_PORT so that the python process can connect back to this gateway.
+			pythonEnv.systemEnv.put("PYFLINK_GATEWAY_PORT", String.valueOf(gatewayServer.getListeningPort()));
+			// start the python process.
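+			// note: startPythonProcess prepends the configured python executable to these commands;
+			// waitFor() below blocks until the Python program finishes, and a non-zero exit code fails the driver.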
+			Process pythonProcess = PythonUtil.startPythonProcess(pythonEnv, commands);
+			int exitCode = pythonProcess.waitFor();
+			if (exitCode != 0) {
+				throw new RuntimeException("Python process exits with code: " + exitCode);
+			}
+		} catch (Throwable e) {
+			LOG.error("Running the python process failed", e);
+		} finally {
+			gatewayServer.shutdown();
+		}
+	}
+
+	/**
+	 * Creates a GatewayServer that runs in a daemon thread.
+	 *
+	 * @return The created GatewayServer
+	 */
+	public static GatewayServer startGatewayServer() {
+		InetAddress localhost = InetAddress.getLoopbackAddress();
+		GatewayServer gatewayServer = new GatewayServer.GatewayServerBuilder()
+			.javaPort(0)
+			.javaAddress(localhost)
+			.build();
+		Thread thread = new Thread(gatewayServer::start);
+		thread.setName("py4j-gateway");
+		thread.setDaemon(true);
+		thread.start();
+		try {
+			thread.join();
+		} catch (InterruptedException e) {
+			LOG.error("The gateway server thread join failed.", e);
+			System.exit(1);
+		}
+		return gatewayServer;
+	}
+
+	/**
+	 * Constructs the Python commands which will be executed in the python process.
+	 *
+	 * @param filePathMap maps each python file name to its path
+	 * @param parsedArgs the parsed args
+	 */
+	public static List<String> constructPythonCommands(Map<String, Path> filePathMap, Map<String, List<String>> parsedArgs) {
+		List<String> commands = new ArrayList<>();
+		if (parsedArgs.containsKey("py")) {
+			String pythonFile = parsedArgs.get("py").get(0);
+			Path pythonFilePath = new Path(pythonFile);
+			filePathMap.put(pythonFilePath.getName(), pythonFilePath);
+			commands.add(pythonFilePath.getName());
+		}
+		if (parsedArgs.containsKey("pym")) {
+			String pyModule = parsedArgs.get("pym").get(0);
+			commands.add("-m");
+			commands.add(pyModule);
+		}
+		if (parsedArgs.containsKey("pyfs")) {
+			List<String> pyFiles = parsedArgs.get("pyfs");
+			for (String pyFile : pyFiles) {
+				Path pyFilePath = new Path(pyFile);
+				filePathMap.put(pyFilePath.getName(), pyFilePath);
+			}
+		}
+		if (parsedArgs.containsKey("args")) {
+			commands.addAll(parsedArgs.get("args"));
+		}
+		return commands;
+	}
+
+	/**
+	 * Parses the args into a map.
+	 *
+	 * @param args e.g. ["py", "xxx.py",
+	 *             "pyfs", "a.py,b.py,c.py",
+	 *             "--input", "in.txt"]
+	 * @return e.g. {"py"->List("xxx.py"), "pyfs"->List("a.py","b.py","c.py"), "args"->List("--input","in.txt")}
+	 */
+	public static Map<String, List<String>> parseOptions(String[] args) {
+		Map<String, List<String>> parsedArgs = new HashMap<>();
+		int argIndex = 0;
+		boolean isEntrypointSpecified = false;
+		// valid args must start with the python file or python module entry point and its value.
+		if (args[0].equals("py") || args[0].equals("pym")) {
+			parsedArgs.put(args[0], Collections.singletonList(args[1]));
+			argIndex = 2;
+			isEntrypointSpecified = true;
+		}
+		if (isEntrypointSpecified && args.length > 2 && args[2].equals("pyfs")) {
+			List<String> pyFilesList = new ArrayList<>(Arrays.asList(args[3].split(",")));
+			parsedArgs.put(args[2], pyFilesList);
+			argIndex = 4;
+		}
+		if (!isEntrypointSpecified) {
+			throw new RuntimeException("The Python entry point has not been specified. It can be specified with the option -py or -pym.");
+		}
+		// any remaining args are collected under the "args" key.
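+		// e.g. for ["py", "job.py", "--input", "in.txt"] argIndex is 2, so ["--input", "in.txt"] is stored under "args".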
+		if (args.length > argIndex) {
+			List<String> otherArgList = new ArrayList<>(args.length - argIndex);
+			otherArgList.addAll(Arrays.asList(args).subList(argIndex, args.length));
+			parsedArgs.put("args", otherArgList);
+		}
+		return parsedArgs;
+	}
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/python/PythonGatewayServer.java b/flink-clients/src/main/java/org/apache/flink/client/python/PythonGatewayServer.java
index 6432a67ccdb..64f2ef1d382 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/python/PythonGatewayServer.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/python/PythonGatewayServer.java
@@ -25,6 +25,7 @@
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.nio.file.Files;

 /**
  * The Py4j Gateway Server provides RPC service for user's python process.
@@ -56,15 +57,17 @@
 		// Tells python side the port of our java rpc server
 		String handshakeFilePath = System.getenv("_PYFLINK_CONN_INFO_PATH");
 		File handshakeFile = new File(handshakeFilePath);
-		if (handshakeFile.createNewFile()) {
-			FileOutputStream fileOutputStream = new FileOutputStream(handshakeFile);
-			DataOutputStream stream = new DataOutputStream(fileOutputStream);
-			stream.writeInt(boundPort);
-			stream.close();
-			fileOutputStream.close();
-		} else {
-			System.out.println("Can't create handshake file: " + handshakeFilePath + ", now exit...");
-			return;
+		File tmpPath = Files.createTempFile(handshakeFile.getParentFile().toPath(),
+			"connection", ".info").toFile();
+		FileOutputStream fileOutputStream = new FileOutputStream(tmpPath);
+		DataOutputStream stream = new DataOutputStream(fileOutputStream);
+		stream.writeInt(boundPort);
+		stream.close();
+		fileOutputStream.close();
+
+		if (!tmpPath.renameTo(handshakeFile)) {
+			System.out.println("Unable to write connection information to handshake file: " + handshakeFilePath + ", now exit...");
+			System.exit(1);
 		}

 		// Exit on EOF or broken pipe. This ensures that the server dies
diff --git a/flink-clients/src/main/java/org/apache/flink/client/python/PythonUtil.java b/flink-clients/src/main/java/org/apache/flink/client/python/PythonUtil.java
new file mode 100644
index 00000000000..b9012a38fa4
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/python/PythonUtil.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.client.python; + +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.FileUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.FileSystems; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * The util class help to prepare Python env and run the python process. + */ +public final class PythonUtil { + private static final Logger LOG = LoggerFactory.getLogger(PythonUtil.class); + + private static final String FLINK_OPT_DIR = System.getenv("FLINK_OPT_DIR"); + + private static final String FLINK_OPT_DIR_PYTHON = FLINK_OPT_DIR + File.separator + "python"; + + /** + * Wraps Python exec environment. + */ + public static class PythonEnvironment { + public String workingDirectory; + + public String pythonExec = "python"; + + public String pythonPath; + + Map systemEnv = new HashMap<>(); + } + + /** + * The hook thread that delete the tmp working dir of python process after the python process shutdown. + */ + private static class ShutDownPythonHook extends Thread { + private Process p; + private String pyFileDir; + + public ShutDownPythonHook(Process p, String pyFileDir) { + this.p = p; + this.pyFileDir = pyFileDir; + } + + public void run() { + + p.destroyForcibly(); + + if (pyFileDir != null) { + File pyDir = new File(pyFileDir); + FileUtils.deleteDirectoryQuietly(pyDir); + } + } + } + + + /** + * Prepares PythonEnvironment to start python process. + * + * @param filePathMap map file name to its file path. + * @return PythonEnvironment the Python environment which will be executed in Python process. + */ + public static PythonEnvironment preparePythonEnvironment(Map filePathMap) { + PythonEnvironment env = new PythonEnvironment(); + + // 1. setup temporary local directory for the user files + String tmpDir = System.getProperty("java.io.tmpdir") + + File.separator + "pyflink" + UUID.randomUUID(); + + Path tmpDirPath = new Path(tmpDir); + try { + FileSystem fs = tmpDirPath.getFileSystem(); + if (fs.exists(tmpDirPath)) { + fs.delete(tmpDirPath, true); + } + fs.mkdirs(tmpDirPath); + } catch (IOException e) { + LOG.error("Prepare tmp directory failed.", e); + } + + env.workingDirectory = tmpDirPath.toString(); + + StringBuilder pythonPathEnv = new StringBuilder(); + + pythonPathEnv.append(env.workingDirectory); + + // 2. create symbolLink in the working directory for the pyflink dependency libs. + List pythonLibs = getLibFiles(FLINK_OPT_DIR_PYTHON); + for (java.nio.file.Path libPath : pythonLibs) { + java.nio.file.Path symbolicLinkFilePath = FileSystems.getDefault().getPath(env.workingDirectory, + libPath.getFileName().toString()); + createSymbolicLinkForPyflinkLib(libPath, symbolicLinkFilePath); + pythonPathEnv.append(File.pathSeparator); + pythonPathEnv.append(symbolicLinkFilePath.toString()); + } + + // 3. copy relevant python files to tmp dir and set them in PYTHONPATH. 
+		filePathMap.forEach((sourceFileName, sourcePath) -> {
+			Path targetPath = new Path(tmpDirPath, sourceFileName);
+			try {
+				FileUtils.copy(sourcePath, targetPath, true);
+			} catch (IOException e) {
+				LOG.error("Copying files to the tmp dir failed", e);
+			}
+			String targetFileName = targetPath.toString();
+			pythonPathEnv.append(File.pathSeparator);
+			pythonPathEnv.append(targetFileName);
+
+		});
+
+		env.pythonPath = pythonPathEnv.toString();
+		return env;
+	}
+
+	/**
+	 * Gets the pyflink dependency libs in the specified directory.
+	 *
+	 * @param libDir The lib directory
+	 */
+	public static List<java.nio.file.Path> getLibFiles(String libDir) {
+		final List<java.nio.file.Path> libFiles = new ArrayList<>();
+		SimpleFileVisitor<java.nio.file.Path> finder = new SimpleFileVisitor<java.nio.file.Path>() {
+			@Override
+			public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) throws IOException {
+				// exclude .txt files
+				if (!file.toString().endsWith(".txt")) {
+					libFiles.add(file);
+				}
+				return FileVisitResult.CONTINUE;
+			}
+		};
+		try {
+			Files.walkFileTree(FileSystems.getDefault().getPath(libDir), finder);
+		} catch (IOException e) {
+			LOG.error("Getting the pyflink dependency libs failed.", e);
+		}
+		return libFiles;
+	}
+
+	/**
+	 * Creates a symbolic link in the working directory for a pyflink lib.
+	 *
+	 * @param libPath the pyflink lib file path.
+	 * @param symbolicLinkPath the symbolic link to the pyflink lib.
+	 */
+	public static void createSymbolicLinkForPyflinkLib(java.nio.file.Path libPath, java.nio.file.Path symbolicLinkPath) {
+		try {
+			Files.createSymbolicLink(symbolicLinkPath, libPath);
+		} catch (IOException e) {
+			LOG.error("Creating the symbolic link for the pyflink lib failed.", e);
+			LOG.info("Trying to copy the pyflink lib to the working directory instead.");
+			try {
+				Files.copy(libPath, symbolicLinkPath);
+			} catch (IOException ex) {
+				LOG.error("Copying the pyflink lib to the working directory failed", ex);
+			}
+		}
+	}
+
+	/**
+	 * Starts the python process.
+	 *
+	 * @param pythonEnv the python environment in which the python process runs.
+	 * @param commands the commands that the python process will execute.
+	 * @return the Process representing the python process.
+	 * @throws IOException Thrown if an error occurs when the python process starts.
+	 */
+	public static Process startPythonProcess(PythonEnvironment pythonEnv, List<String> commands) throws IOException {
+		ProcessBuilder pythonProcessBuilder = new ProcessBuilder();
+		Map<String, String> env = pythonProcessBuilder.environment();
+		env.put("PYTHONPATH", pythonEnv.pythonPath);
+		pythonEnv.systemEnv.forEach(env::put);
+		commands.add(0, pythonEnv.pythonExec);
+		pythonProcessBuilder.command(commands);
+		// set the working directory.
+		pythonProcessBuilder.directory(new File(pythonEnv.workingDirectory));
+		// redirect the stderr to stdout
+		pythonProcessBuilder.redirectErrorStream(true);
+		// let the child process write to the same output as the parent process.
+		pythonProcessBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
+		Process process = pythonProcessBuilder.start();
+		if (!process.isAlive()) {
+			throw new RuntimeException("Failed to start Python process. 
"); + } + + // Make sure that the python sub process will be killed when JVM exit + ShutDownPythonHook hook = new ShutDownPythonHook(process, pythonEnv.workingDirectory); + Runtime.getRuntime().addShutdownHook(hook); + + return process; + } +} diff --git a/flink-clients/src/test/java/org/apache/flink/client/python/PythonDriverTest.java b/flink-clients/src/test/java/org/apache/flink/client/python/PythonDriverTest.java new file mode 100644 index 00000000000..0b6f570e75a --- /dev/null +++ b/flink-clients/src/test/java/org/apache/flink/client/python/PythonDriverTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.python; + +import org.apache.flink.core.fs.Path; + +import org.junit.Assert; +import org.junit.Test; +import py4j.GatewayServer; + +import java.io.IOException; +import java.net.Socket; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Tests for the {@link PythonDriver}. 
+ */ +public class PythonDriverTest { + @Test + public void testStartGatewayServer() { + GatewayServer gatewayServer = PythonDriver.startGatewayServer(); + try { + Socket socket = new Socket("localhost", gatewayServer.getListeningPort()); + assert socket.isConnected(); + } catch (IOException e) { + throw new RuntimeException("Connect Gateway Server failed"); + } finally { + gatewayServer.shutdown(); + } + } + + @Test + public void testConstructCommands() { + Map filePathMap = new HashMap<>(); + Map> parseArgs = new HashMap<>(); + parseArgs.put("py", Collections.singletonList("xxx.py")); + List pyFilesList = new ArrayList<>(); + pyFilesList.add("a.py"); + pyFilesList.add("b.py"); + pyFilesList.add("c.py"); + parseArgs.put("pyfs", pyFilesList); + List otherArgs = new ArrayList<>(); + otherArgs.add("--input"); + otherArgs.add("in.txt"); + parseArgs.put("args", otherArgs); + List commands = PythonDriver.constructPythonCommands(filePathMap, parseArgs); + Path pythonPath = filePathMap.get("xxx.py"); + Assert.assertNotNull(pythonPath); + Assert.assertEquals(pythonPath.getName(), "xxx.py"); + Path aPyFilePath = filePathMap.get("a.py"); + Assert.assertNotNull(aPyFilePath); + Assert.assertEquals(aPyFilePath.getName(), "a.py"); + Path bPyFilePath = filePathMap.get("b.py"); + Assert.assertNotNull(bPyFilePath); + Assert.assertEquals(bPyFilePath.getName(), "b.py"); + Path cPyFilePath = filePathMap.get("c.py"); + Assert.assertNotNull(cPyFilePath); + Assert.assertEquals(cPyFilePath.getName(), "c.py"); + Assert.assertEquals(3, commands.size()); + Assert.assertEquals(commands.get(0), "xxx.py"); + Assert.assertEquals(commands.get(1), "--input"); + Assert.assertEquals(commands.get(2), "in.txt"); + } + + @Test + public void testParseOptions() { + String[] args = {"py", "xxx.py", "pyfs", "a.py,b.py,c.py", "--input", "in.txt"}; + Map> parsedArgs = PythonDriver.parseOptions(args); + List pythonMainFile = parsedArgs.get("py"); + Assert.assertNotNull(pythonMainFile); + Assert.assertEquals(1, pythonMainFile.size()); + Assert.assertEquals(pythonMainFile.get(0), args[1]); + List pyFilesList = parsedArgs.get("pyfs"); + Assert.assertEquals(3, pyFilesList.size()); + String[] pyFiles = args[3].split(","); + for (int i = 0; i < pyFiles.length; i++) { + assert pyFilesList.get(i).equals(pyFiles[i]); + } + List otherArgs = parsedArgs.get("args"); + for (int i = 4; i < args.length; i++) { + Assert.assertEquals(otherArgs.get(i - 4), args[i]); + } + } +} diff --git a/flink-clients/src/test/java/org/apache/flink/client/python/PythonUtilTest.java b/flink-clients/src/test/java/org/apache/flink/client/python/PythonUtilTest.java new file mode 100644 index 00000000000..4b14cede4e3 --- /dev/null +++ b/flink-clients/src/test/java/org/apache/flink/client/python/PythonUtilTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.python; + +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +/** + * Tests for the {@link PythonUtil}. + */ +public class PythonUtilTest { + private Path sourceTmpDirPath; + private Path targetTmpDirPath; + private FileSystem sourceFs; + private FileSystem targetFs; + + @Before + public void prepareTestEnvironment() { + String sourceTmpDir = System.getProperty("java.io.tmpdir") + + File.separator + "source_" + UUID.randomUUID(); + String targetTmpDir = System.getProperty("java.io.tmpdir") + + File.separator + "target_" + UUID.randomUUID(); + + sourceTmpDirPath = new Path(sourceTmpDir); + targetTmpDirPath = new Path(targetTmpDir); + try { + sourceFs = sourceTmpDirPath.getFileSystem(); + if (sourceFs.exists(sourceTmpDirPath)) { + sourceFs.delete(sourceTmpDirPath, true); + } + sourceFs.mkdirs(sourceTmpDirPath); + targetFs = targetTmpDirPath.getFileSystem(); + if (targetFs.exists(targetTmpDirPath)) { + targetFs.delete(targetTmpDirPath, true); + } + targetFs.mkdirs(targetTmpDirPath); + } catch (IOException e) { + throw new RuntimeException("initial PythonUtil test environment failed"); + } + } + + @Test + public void testStartPythonProcess() { + PythonUtil.PythonEnvironment pythonEnv = new PythonUtil.PythonEnvironment(); + pythonEnv.workingDirectory = targetTmpDirPath.toString(); + pythonEnv.pythonPath = targetTmpDirPath.toString(); + List commands = new ArrayList<>(); + Path pyPath = new Path(targetTmpDirPath, "word_count.py"); + try { + targetFs.create(pyPath, FileSystem.WriteMode.OVERWRITE); + File pyFile = new File(pyPath.toString()); + String pyProgram = "#!/usr/bin/python\n" + + "# -*- coding: UTF-8 -*-\n" + + "import sys\n" + + "\n" + + "if __name__=='__main__':\n" + + "\tfilename = sys.argv[1]\n" + + "\tfo = open(filename, \"w\")\n" + + "\tfo.write( \"hello world\")\n" + + "\tfo.close()"; + Files.write(pyFile.toPath(), pyProgram.getBytes(), StandardOpenOption.WRITE); + Path result = new Path(targetTmpDirPath, "word_count_result.txt"); + commands.add(pyFile.getName()); + commands.add(result.getName()); + Process pythonProcess = PythonUtil.startPythonProcess(pythonEnv, commands); + int exitCode = pythonProcess.waitFor(); + if (exitCode != 0) { + throw new RuntimeException("Python process exits with code: " + exitCode); + } + String cmdResult = new String(Files.readAllBytes(new File(result.toString()).toPath())); + Assert.assertEquals(cmdResult, "hello world"); + pythonProcess.destroyForcibly(); + targetFs.delete(pyPath, true); + targetFs.delete(result, true); + } catch (IOException | InterruptedException e) { + throw new RuntimeException("test start Python process failed " + e.getMessage()); + } + } + + @After + public void cleanEnvironment() { + try { + sourceFs.delete(sourceTmpDirPath, true); + targetFs.delete(targetTmpDirPath, true); + } catch (IOException e) { + throw new RuntimeException("delete tmp dir failed " + e.getMessage()); + } + } +} diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml index 52a6466e513..1350f109588 100644 --- a/flink-dist/pom.xml +++ b/flink-dist/pom.xml @@ -572,6 +572,9 @@ under 
the License. py4j org.apache.flink.api.python.py4j + + py4j.* + diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml index 3eb5698c19f..788ec1bbdfc 100644 --- a/flink-dist/src/main/assemblies/bin.xml +++ b/flink-dist/src/main/assemblies/bin.xml @@ -242,6 +242,13 @@ under the License. 0755 + + + ../flink-python/pyflink/table/examples + examples/python/table + 0755 + + diff --git a/flink-python/pyflink/find_flink_home.py b/flink-python/pyflink/find_flink_home.py index 049136864a3..98064908cbe 100644 --- a/flink-python/pyflink/find_flink_home.py +++ b/flink-python/pyflink/find_flink_home.py @@ -28,6 +28,9 @@ def _find_flink_home(): # If the environment has set FLINK_HOME, trust it. if 'FLINK_HOME' in os.environ: return os.environ['FLINK_HOME'] + elif 'FLINK_ROOT_DIR' in os.environ: + os.environ['FLINK_HOME'] = os.environ['FLINK_ROOT_DIR'] + return os.environ['FLINK_ROOT_DIR'] else: try: flink_root_dir = os.path.abspath(os.path.dirname(os.path.abspath(__file__)) + "/../../") diff --git a/flink-python/pyflink/java_gateway.py b/flink-python/pyflink/java_gateway.py index e5c8330e551..b218d231662 100644 --- a/flink-python/pyflink/java_gateway.py +++ b/flink-python/pyflink/java_gateway.py @@ -28,7 +28,6 @@ from threading import RLock from py4j.java_gateway import java_import, JavaGateway, GatewayParameters from pyflink.find_flink_home import _find_flink_home - _gateway = None _lock = RLock() @@ -46,6 +45,9 @@ def get_gateway(): _gateway = JavaGateway(gateway_parameters=gateway_param) else: _gateway = launch_gateway() + + # import the flink view + import_flink_view(_gateway) return _gateway @@ -97,6 +99,14 @@ def launch_gateway(): gateway = JavaGateway( gateway_parameters=GatewayParameters(port=gateway_port, auto_convert=True)) + return gateway + + +def import_flink_view(gateway): + """ + import the classes used by PyFlink. + :param gateway:gateway connected to JavaGateWayServer + """ # Import the classes used by PyFlink java_import(gateway.jvm, "org.apache.flink.table.api.*") java_import(gateway.jvm, "org.apache.flink.table.api.java.*") @@ -109,5 +119,3 @@ def launch_gateway(): java_import(gateway.jvm, "org.apache.flink.api.java.ExecutionEnvironment") java_import(gateway.jvm, "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment") - - return gateway diff --git a/flink-python/pyflink/table/examples/batch/__init__.py b/flink-python/pyflink/table/examples/batch/__init__.py new file mode 100644 index 00000000000..65b48d4d79b --- /dev/null +++ b/flink-python/pyflink/table/examples/batch/__init__.py @@ -0,0 +1,17 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ diff --git a/flink-python/pyflink/table/examples/batch/word_count.py b/flink-python/pyflink/table/examples/batch/word_count.py new file mode 100644 index 00000000000..a324af4747c --- /dev/null +++ b/flink-python/pyflink/table/examples/batch/word_count.py @@ -0,0 +1,79 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import os +import tempfile + +from pyflink.table import TableEnvironment, TableConfig +from pyflink.table.table_sink import CsvTableSink +from pyflink.table.table_source import CsvTableSource +from pyflink.table.types import DataTypes + + +# TODO: the word_count.py is just a test example for CLI. +# After pyflink have aligned Java Table API Connectors, this example will be improved. +def word_count(): + tmp_dir = tempfile.gettempdir() + source_path = tmp_dir + '/streaming.csv' + if os.path.isfile(source_path): + os.remove(source_path) + content = "line Licensed to the Apache Software Foundation ASF under one " \ + "line or more contributor license agreements See the NOTICE file " \ + "line distributed with this work for additional information " \ + "line regarding copyright ownership The ASF licenses this file " \ + "to you under the Apache License Version the " \ + "License you may not use this file except in compliance " \ + "with the License" + + with open(source_path, 'w') as f: + for word in content.split(" "): + f.write(",".join([word, "1"])) + f.write("\n") + f.flush() + f.close() + + t_config = TableConfig.Builder().as_batch_execution().set_parallelism(1).build() + t_env = TableEnvironment.create(t_config) + + field_names = ["word", "cout"] + field_types = [DataTypes.STRING, DataTypes.LONG] + + # register Orders table in table environment + t_env.register_table_source( + "Word", + CsvTableSource(source_path, field_names, field_types)) + + # register Results table in table environment + tmp_dir = tempfile.gettempdir() + tmp_csv = tmp_dir + '/streaming2.csv' + if os.path.isfile(tmp_csv): + os.remove(tmp_csv) + + t_env.register_table_sink( + "Results", + field_names, field_types, CsvTableSink(tmp_csv)) + + t_env.scan("Word") \ + .group_by("word") \ + .select("word, count(1) as count") \ + .insert_into("Results") + + t_env.execute() + + +if __name__ == '__main__': + word_count() -- GitLab