Commit ec61cf21 authored by Huang Xingbo, committed by sunjincheng121

[FLINK-12327][python] Adds support to submit Python Table API job in CliFrontend

This closes #8472
Parent 3a5bf893
......@@ -47,6 +47,12 @@ available.
{:toc}
## Examples
### Job Submission Examples
These examples show how to submit a job via the CLI.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
- Run example program with no arguments:
......@@ -88,6 +94,53 @@ available.
./examples/batch/WordCount.jar \
--input hdfs:///user/hamlet.txt --output hdfs:///user/wordcount_out
</div>
<div data-lang="python" markdown="1">
- Run Python Table program:
./bin/flink run -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar>
- Run Python Table program with pyFiles:
./bin/flink run -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar> \
-pyfs file:///user.txt,hdfs:///$namenode_address/username.txt
- Run Python Table program with pyFiles and pyModule:
./bin/flink run -pym batch.word_count -pyfs examples/python/table/batch -j <path/to/flink-table.jar>
- Run Python Table program with parallelism 16:
./bin/flink run -p 16 -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar>
- Run Python Table program with flink log output disabled:
./bin/flink run -q -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar>
- Run Python Table program in detached mode:
./bin/flink run -d -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar>
- Run Python Table program on a specific JobManager:
./bin/flink run -m myJMHost:8081 \
-py examples/python/table/batch/word_count.py \
-j <path/to/flink-table.jar>
- Run Python Table program using a [per-job YARN cluster]({{site.baseurl}}/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn) with 2 TaskManagers:
./bin/flink run -m yarn-cluster -yn 2 \
-py examples/python/table/batch/word_count.py \
-j <path/to/flink-table.jar>
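- The options above can be combined; as an illustrative sketch (not an example shipped with this commit), a module-based job submitted in detached mode with parallelism 4 might look like:
./bin/flink run -d -p 4 -pym batch.word_count -pyfs examples/python/table/batch -j <path/to/flink-table.jar>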
</div>
### Job Management Examples
These examples show how to manage a job via the CLI.
- Display the optimized execution plan for the WordCount example program as JSON:
./bin/flink info ./examples/batch/WordCount.jar \
......@@ -251,6 +304,19 @@ Action "run" compiles and runs a program.
program. Optional flag to override the
default value specified in the
configuration.
-py,--python <python-file> Python script with the program entry
point. The dependent resources can be
configured with the `--pyFiles` option.
-pyfs,--pyFiles <python-files> Attach custom python files for the job.
Comma can be used as the separator to
specify multiple files. The standard
python resource file suffixes such as
.py/.egg/.zip are all supported.
(e.g.: --pyFiles file:///tmp/myresource.zip
,hdfs:///$namenode_address/myresource2.zip)
-pym,--pyModule <python-module> Python module with the program entry
point. This option must be used in
conjunction with ` --pyFiles`.
-q,--sysoutLogging If present, suppress logging output to
standard out.
-s,--fromSavepoint <savepointPath> Path to a savepoint to restore the job
......
......@@ -47,6 +47,12 @@ available.
{:toc}
## Examples
### Job Submission Examples
These examples show how to submit a job via the CLI.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
- Run example program with no arguments:
......@@ -82,11 +88,57 @@ available.
./examples/batch/WordCount.jar \
--input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
- Run example program using a [per-job YARN cluster]({{site.baseurl}}/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn) with 2 TaskManagers:
- Run example program using a [per-job YARN cluster]({{site.baseurl}}/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn) with 2 TaskManagers:
./bin/flink run -m yarn-cluster -yn 2 \
./examples/batch/WordCount.jar \
--input hdfs:///user/hamlet.txt --output hdfs:///user/wordcount_out
</div>
<div data-lang="python" markdown="1">
- Run Python Table program:
./bin/flink run -py WordCount.py -j <path/to/flink-table.jar>
- Run Python Table program with pyFiles:
./bin/flink run -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar> \
-pyfs file:///user.txt,hdfs:///$namenode_address/username.txt
- Run Python Table program with pyFiles and pyModule (the program entry point is specified via the pym option):
./bin/flink run -pym batch.word_count -pyfs examples/python/table/batch -j <path/to/flink-table.jar>
- Run Python Table program with parallelism 16:
./bin/flink run -p 16 -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar>
- Run Python Table program with flink log output disabled:
./bin/flink run -q -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar>
- Run Python Table program in detached mode:
./bin/flink run -d -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar>
- Run Python Table program on a specific JobManager:
./bin/flink run -m myJMHost:8081 \
-py examples/python/table/batch/word_count.py \
-j <path/to/flink-table.jar>
- Run Python Table program using a [per-job YARN cluster]({{site.baseurl}}/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn) with 2 TaskManagers:
./bin/flink run -m yarn-cluster -yn 2 \
-py examples/python/table/batch/word_count.py \
-j <path/to/flink-table.jar>
</div>
### Job Management Examples
These examples show how to manage a job via the CLI.
- Display the optimized execution plan for the WordCount example program as JSON:
......@@ -251,6 +303,15 @@ Action "run" compiles and runs a program.
program. Optional flag to override the
default value specified in the
configuration.
-py,--python <python-file> Python script with the program entry
point. The dependent resources can be
configured with the `--pyFiles` option.
-pyfs,--pyFiles <python-files> Attach custom python files for the job.
Comma can be used as the separator to
specify multiple files. The standard
python resource file suffixes such as
.py/.egg/.zip are all supported.
(e.g.: --pyFiles file:///tmp/myresource.zip
,hdfs:///$namenode_address/myresource2.zip)
-pym,--pyModule <python-module> Python module with the program entry
point. This option must be used in
conjunction with `--pyFiles`.
-q,--sysoutLogging If present, suppress logging output to
standard out.
-s,--fromSavepoint <savepointPath> Path to a savepoint to restore the job
......
......@@ -32,6 +32,7 @@ import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.ProgramMissingJobException;
import org.apache.flink.client.program.ProgramParametrizationException;
import org.apache.flink.client.python.PythonDriver;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
......@@ -185,8 +186,11 @@ public class CliFrontend {
return;
}
if (runOptions.getJarFilePath() == null) {
throw new CliArgsException("The program JAR file was not specified.");
if (!runOptions.isPython()) {
// a JAR file must be specified for a Java program
if (runOptions.getJarFilePath() == null) {
throw new CliArgsException("Java program should be specified a JAR file.");
}
}
final PackagedProgram program;
......@@ -771,12 +775,42 @@ public class CliFrontend {
String jarFilePath = options.getJarFilePath();
List<URL> classpaths = options.getClasspaths();
if (jarFilePath == null) {
throw new IllegalArgumentException("The program JAR file was not specified.");
String entryPointClass;
File jarFile = null;
if (options.isPython()) {
// If a JAR file was specified for the job
if (jarFilePath != null) {
jarFile = getJarFile(jarFilePath);
}
// The entry point class of a python job is PythonDriver
entryPointClass = PythonDriver.class.getCanonicalName();
} else {
if (jarFilePath == null) {
throw new IllegalArgumentException("The program JAR file was not specified.");
}
jarFile = getJarFile(jarFilePath);
// Get assembler class
entryPointClass = options.getEntryPointClassName();
}
File jarFile = new File(jarFilePath);
PackagedProgram program = entryPointClass == null ?
new PackagedProgram(jarFile, classpaths, programArgs) :
new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);
program.setSavepointRestoreSettings(options.getSavepointRestoreSettings());
return program;
}
/**
* Gets the JAR file from the path.
*
* @param jarFilePath The path of JAR file
* @return The JAR file
* @throws FileNotFoundException The JAR file does not exist.
*/
private File getJarFile(String jarFilePath) throws FileNotFoundException {
File jarFile = new File(jarFilePath);
// Check if JAR file exists
if (!jarFile.exists()) {
throw new FileNotFoundException("JAR file does not exist: " + jarFile);
......@@ -784,17 +818,7 @@ public class CliFrontend {
else if (!jarFile.isFile()) {
throw new FileNotFoundException("JAR file is not a file: " + jarFile);
}
// Get assembler class
String entryPointClass = options.getEntryPointClassName();
PackagedProgram program = entryPointClass == null ?
new PackagedProgram(jarFile, classpaths, programArgs) :
new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);
program.setSavepointRestoreSettings(options.getSavepointRestoreSettings());
return program;
return jarFile;
}
// --------------------------------------------------------------------------------------------
......
......@@ -118,6 +118,20 @@ public class CliFrontendParser {
public static final Option STOP_AND_DRAIN = new Option("d", "drain", false,
"Send MAX_WATERMARK before taking the savepoint and stopping the pipelne.");
static final Option PY_OPTION = new Option("py", "python", true,
"Python script with the program entry point. " +
"The dependent resources can be configured with the `--pyFiles` option.");
static final Option PYFILES_OPTION = new Option("pyfs", "pyFiles", true,
"Attach custom python files for job. " +
"Comma can be used as the separator to specify multiple files. " +
"The standard python resource file suffixes such as .py/.egg/.zip are all supported." +
"(eg: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)");
static final Option PYMODULE_OPTION = new Option("pym", "pyModule", true,
"Python module with the program entry point. " +
"This option must be used in conjunction with `--pyFiles`.");
static {
HELP_OPTION.setRequired(false);
......@@ -165,6 +179,15 @@ public class CliFrontendParser {
STOP_WITH_SAVEPOINT.setOptionalArg(true);
STOP_AND_DRAIN.setRequired(false);
PY_OPTION.setRequired(false);
PY_OPTION.setArgName("python");
PYFILES_OPTION.setRequired(false);
PYFILES_OPTION.setArgName("pyFiles");
PYMODULE_OPTION.setRequired(false);
PYMODULE_OPTION.setArgName("pyModule");
}
private static final Options RUN_OPTIONS = getRunCommandOptions();
......@@ -186,6 +209,9 @@ public class CliFrontendParser {
options.addOption(DETACHED_OPTION);
options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
options.addOption(YARN_DETACHED_OPTION);
options.addOption(PY_OPTION);
options.addOption(PYFILES_OPTION);
options.addOption(PYMODULE_OPTION);
return options;
}
......@@ -196,6 +222,9 @@ public class CliFrontendParser {
options.addOption(LOGGING_OPTION);
options.addOption(DETACHED_OPTION);
options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
options.addOption(PY_OPTION);
options.addOption(PYFILES_OPTION);
options.addOption(PYMODULE_OPTION);
return options;
}
......
......@@ -36,6 +36,9 @@ import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PYFILES_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PYMODULE_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PY_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.SHUTDOWN_IF_ATTACHED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
......@@ -62,17 +65,71 @@ public abstract class ProgramOptions extends CommandLineOptions {
private final SavepointRestoreSettings savepointSettings;
/**
* Flag indicating whether the job is a Python job.
*/
private final boolean isPython;
protected ProgramOptions(CommandLine line) throws CliArgsException {
super(line);
String[] args = line.hasOption(ARGS_OPTION.getOpt()) ?
line.getOptionValues(ARGS_OPTION.getOpt()) :
line.getArgs();
line.getOptionValues(ARGS_OPTION.getOpt()) :
line.getArgs();
isPython = line.hasOption(PY_OPTION.getOpt()) || line.hasOption(PYMODULE_OPTION.getOpt());
// If the option -py (--python) is specified
if (line.hasOption(PY_OPTION.getOpt())) {
// Cannot use option -py and -pym simultaneously.
if (line.hasOption(PYMODULE_OPTION.getOpt())) {
throw new CliArgsException("Cannot use option -py and -pym simultaneously.");
}
// The CLI cmd args which will be transferred to PythonDriver are transformed as follows:
// CLI cmd : -py ${python.py} [-pyfs ${py-files}] [${other args}]
// PythonDriver args: py ${python.py} [pyfs ${py-files}] [${other args}]
// -------------------------------transformed-------------------------------------------------------
// e.g. -py wordcount.py (CLI cmd) -----------> py wordcount.py (PythonDriver args)
// e.g. -py wordcount.py -pyfs file:///AAA.py,hdfs:///BBB.py --input in.txt --output out.txt (CLI cmd)
//      -----> py wordcount.py pyfs file:///AAA.py,hdfs:///BBB.py --input in.txt --output out.txt (PythonDriver args)
String[] newArgs;
int argIndex;
if (line.hasOption(PYFILES_OPTION.getOpt())) {
newArgs = new String[args.length + 4];
newArgs[2] = PYFILES_OPTION.getOpt();
newArgs[3] = line.getOptionValue(PYFILES_OPTION.getOpt());
argIndex = 4;
} else {
newArgs = new String[args.length + 2];
argIndex = 2;
}
newArgs[0] = PY_OPTION.getOpt();
newArgs[1] = line.getOptionValue(PY_OPTION.getOpt());
System.arraycopy(args, 0, newArgs, argIndex, args.length);
args = newArgs;
}
// If the option -pym (--pyModule) is specified
if (line.hasOption(PYMODULE_OPTION.getOpt())) {
// The option -pym must be used together with the option --pyFiles.
if (!line.hasOption(PYFILES_OPTION.getOpt())) {
throw new CliArgsException("-pym must be used in conjunction with `--pyFiles`");
}
// The CLI cmd args which will be transferred to PythonDriver are transformed as follows:
// CLI cmd : -pym ${py-module} -pyfs ${py-files} [${other args}]
// PythonDriver args: pym ${py-module} pyfs ${py-files} [${other args}]
// e.g. -pym AAA.fun -pyfs AAA.zip (CLI cmd) ----> pym AAA.fun pyfs AAA.zip (PythonDriver args)
String[] newArgs = new String[args.length + 4];
newArgs[0] = PYMODULE_OPTION.getOpt();
newArgs[1] = line.getOptionValue(PYMODULE_OPTION.getOpt());
newArgs[2] = PYFILES_OPTION.getOpt();
newArgs[3] = line.getOptionValue(PYFILES_OPTION.getOpt());
System.arraycopy(args, 0, newArgs, 4, args.length);
args = newArgs;
}
if (line.hasOption(JAR_OPTION.getOpt())) {
this.jarFilePath = line.getOptionValue(JAR_OPTION.getOpt());
}
else if (args.length > 0) {
} else if (!isPython && args.length > 0) {
jarFilePath = args[0];
args = Arrays.copyOfRange(args, 1, args.length);
}
......@@ -95,7 +152,7 @@ public abstract class ProgramOptions extends CommandLineOptions {
this.classpaths = classpaths;
this.entryPointClass = line.hasOption(CLASS_OPTION.getOpt()) ?
line.getOptionValue(CLASS_OPTION.getOpt()) : null;
line.getOptionValue(CLASS_OPTION.getOpt()) : null;
if (line.hasOption(PARALLELISM_OPTION.getOpt())) {
String parString = line.getOptionValue(PARALLELISM_OPTION.getOpt());
......@@ -156,4 +213,11 @@ public abstract class ProgramOptions extends CommandLineOptions {
public SavepointRestoreSettings getSavepointRestoreSettings() {
return savepointSettings;
}
/**
* Indicates whether the job is a Python job.
*/
public boolean isPython() {
return isPython;
}
}
......@@ -21,6 +21,7 @@ package org.apache.flink.client.program;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.Program;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.client.python.PythonDriver;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.dag.DataSinkNode;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
......@@ -90,6 +91,11 @@ public class PackagedProgram {
private SavepointRestoreSettings savepointSettings = SavepointRestoreSettings.none();
/**
* Flag indicating whether the job is a Python job.
*/
private final boolean isPython;
/**
* Creates an instance that wraps the plan defined in the jar file using the given
* argument.
......@@ -169,18 +175,21 @@ public class PackagedProgram {
* may be a missing / wrong class or manifest files.
*/
public PackagedProgram(File jarFile, List<URL> classpaths, @Nullable String entryPointClassName, String... args) throws ProgramInvocationException {
if (jarFile == null) {
throw new IllegalArgumentException("The jar file must not be null.");
}
// Whether the job is a Python job.
isPython = entryPointClassName != null && entryPointClassName.equals(PythonDriver.class.getCanonicalName());
URL jarFileUrl;
try {
jarFileUrl = jarFile.getAbsoluteFile().toURI().toURL();
} catch (MalformedURLException e1) {
throw new IllegalArgumentException("The jar file path is invalid.");
}
URL jarFileUrl = null;
if (jarFile != null) {
try {
jarFileUrl = jarFile.getAbsoluteFile().toURI().toURL();
} catch (MalformedURLException e1) {
throw new IllegalArgumentException("The jar file path is invalid.");
}
checkJarFile(jarFileUrl);
checkJarFile(jarFileUrl);
} else if (!isPython) {
throw new IllegalArgumentException("The jar file must not be null.");
}
this.jarFile = jarFileUrl;
this.args = args == null ? new String[0] : args;
......@@ -191,7 +200,7 @@ public class PackagedProgram {
}
// now that we have an entry point, we can extract the nested jar files (if any)
this.extractedTempLibraries = extractContainedLibraries(jarFileUrl);
this.extractedTempLibraries = jarFileUrl == null ? Collections.emptyList() : extractContainedLibraries(jarFileUrl);
this.classpaths = classpaths;
this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(getAllLibraries(), classpaths, getClass().getClassLoader());
......@@ -233,6 +242,7 @@ public class PackagedProgram {
// load the entry point class
this.mainClass = entryPointClass;
isPython = entryPointClass == PythonDriver.class;
// if the entry point is a program, instantiate the class and get the plan
if (Program.class.isAssignableFrom(this.mainClass)) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client.python;
import org.apache.flink.core.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import py4j.GatewayServer;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* A main class used to launch Python applications. It executes python as a
* subprocess and then has it connect back to the JVM to access system properties, etc.
*/
public class PythonDriver {
private static final Logger LOG = LoggerFactory.getLogger(PythonDriver.class);
public static void main(String[] args) {
// the python job needs at least 2 args.
// e.g. py a.py ...
// e.g. pym a.b pyfs a.zip ...
if (args.length < 2) {
LOG.error("Required at least two arguments, only python file or python module is available.");
System.exit(1);
}
// parse args
Map<String, List<String>> parsedArgs = parseOptions(args);
// start gateway server
GatewayServer gatewayServer = startGatewayServer();
// prepare python env
// map filename to its Path
Map<String, Path> filePathMap = new HashMap<>();
// commands which will be executed in the python process.
List<String> commands = constructPythonCommands(filePathMap, parsedArgs);
try {
// prepare the exec environment of the python process.
PythonUtil.PythonEnvironment pythonEnv = PythonUtil.preparePythonEnvironment(filePathMap);
// set the env variable PYFLINK_GATEWAY_PORT so that the python process can connect to the gateway.
pythonEnv.systemEnv.put("PYFLINK_GATEWAY_PORT", String.valueOf(gatewayServer.getListeningPort()));
// start the python process.
Process pythonProcess = PythonUtil.startPythonProcess(pythonEnv, commands);
int exitCode = pythonProcess.waitFor();
if (exitCode != 0) {
throw new RuntimeException("Python process exits with code: " + exitCode);
}
} catch (Throwable e) {
LOG.error("Run python process failed", e);
} finally {
gatewayServer.shutdown();
}
}
/**
* Creates a GatewayServer that runs in a daemon thread.
*
* @return The created GatewayServer
*/
public static GatewayServer startGatewayServer() {
InetAddress localhost = InetAddress.getLoopbackAddress();
GatewayServer gatewayServer = new GatewayServer.GatewayServerBuilder()
.javaPort(0)
.javaAddress(localhost)
.build();
Thread thread = new Thread(gatewayServer::start);
thread.setName("py4j-gateway");
thread.setDaemon(true);
thread.start();
try {
thread.join();
} catch (InterruptedException e) {
LOG.error("The gateway server thread join failed.", e);
System.exit(1);
}
return gatewayServer;
}
/**
* Constructs the Python commands which will be executed in the python process.
*
* @param filePathMap maps python file names to their paths
* @param parsedArgs parsed args
*/
public static List<String> constructPythonCommands(Map<String, Path> filePathMap, Map<String, List<String>> parsedArgs) {
List<String> commands = new ArrayList<>();
if (parsedArgs.containsKey("py")) {
String pythonFile = parsedArgs.get("py").get(0);
Path pythonFilePath = new Path(pythonFile);
filePathMap.put(pythonFilePath.getName(), pythonFilePath);
commands.add(pythonFilePath.getName());
}
if (parsedArgs.containsKey("pym")) {
String pyModule = parsedArgs.get("pym").get(0);
commands.add("-m");
commands.add(pyModule);
}
if (parsedArgs.containsKey("pyfs")) {
List<String> pyFiles = parsedArgs.get("pyfs");
for (String pyFile : pyFiles) {
Path pyFilePath = new Path(pyFile);
filePathMap.put(pyFilePath.getName(), pyFilePath);
}
}
if (parsedArgs.containsKey("args")) {
commands.addAll(parsedArgs.get("args"));
}
return commands;
}
/**
* Parses the args to the map format.
*
* @param args ["py", "xxx.py",
* "pyfs", "a.py,b.py,c.py",
* "--input", "in.txt"]
* @return {"py"->List("xxx.py"),"pyfs"->List("a.py","b.py","c.py"),"args"->List("--input","in.txt")}
*/
public static Map<String, List<String>> parseOptions(String[] args) {
Map<String, List<String>> parsedArgs = new HashMap<>();
int argIndex = 0;
boolean isEntrypointSpecified = false;
// valid args must start with the py or pym option followed by its value.
if (args[0].equals("py") || args[0].equals("pym")) {
parsedArgs.put(args[0], Collections.singletonList(args[1]));
argIndex = 2;
isEntrypointSpecified = true;
}
if (isEntrypointSpecified && args.length > 2 && args[2].equals("pyfs")) {
List<String> pyFilesList = new ArrayList<>(Arrays.asList(args[3].split(",")));
parsedArgs.put(args[2], pyFilesList);
argIndex = 4;
}
if (!isEntrypointSpecified) {
throw new RuntimeException("The Python entrypoint has not been specified. It can be specified with option -py or -pym");
}
// any remaining args are collected under the key "args".
if (args.length > argIndex) {
List<String> otherArgList = new ArrayList<>(args.length - argIndex);
otherArgList.addAll(Arrays.asList(args).subList(argIndex, args.length));
parsedArgs.put("args", otherArgList);
}
return parsedArgs;
}
}
......@@ -25,6 +25,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.file.Files;
/**
* The Py4J Gateway Server provides an RPC service for the user's python process.
......@@ -56,15 +57,17 @@ public class PythonGatewayServer {
// Tells python side the port of our java rpc server
String handshakeFilePath = System.getenv("_PYFLINK_CONN_INFO_PATH");
File handshakeFile = new File(handshakeFilePath);
if (handshakeFile.createNewFile()) {
FileOutputStream fileOutputStream = new FileOutputStream(handshakeFile);
DataOutputStream stream = new DataOutputStream(fileOutputStream);
stream.writeInt(boundPort);
stream.close();
fileOutputStream.close();
} else {
System.out.println("Can't create handshake file: " + handshakeFilePath + ", now exit...");
return;
File tmpPath = Files.createTempFile(handshakeFile.getParentFile().toPath(),
"connection", ".info").toFile();
FileOutputStream fileOutputStream = new FileOutputStream(tmpPath);
DataOutputStream stream = new DataOutputStream(fileOutputStream);
stream.writeInt(boundPort);
stream.close();
fileOutputStream.close();
if (!tmpPath.renameTo(handshakeFile)) {
System.out.println("Unable to write connection information to handshake file: " + handshakeFilePath + ", now exit...");
System.exit(1);
}
// Exit on EOF or broken pipe. This ensures that the server dies
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client.python;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
/**
* A utility class that helps prepare the Python environment and launch the python process.
*/
public final class PythonUtil {
private static final Logger LOG = LoggerFactory.getLogger(PythonUtil.class);
private static final String FLINK_OPT_DIR = System.getenv("FLINK_OPT_DIR");
private static final String FLINK_OPT_DIR_PYTHON = FLINK_OPT_DIR + File.separator + "python";
/**
* Wraps Python exec environment.
*/
public static class PythonEnvironment {
public String workingDirectory;
public String pythonExec = "python";
public String pythonPath;
Map<String, String> systemEnv = new HashMap<>();
}
/**
* A shutdown hook thread that deletes the temporary working directory of the python process after that process shuts down.
*/
private static class ShutDownPythonHook extends Thread {
private Process p;
private String pyFileDir;
public ShutDownPythonHook(Process p, String pyFileDir) {
this.p = p;
this.pyFileDir = pyFileDir;
}
public void run() {
p.destroyForcibly();
if (pyFileDir != null) {
File pyDir = new File(pyFileDir);
FileUtils.deleteDirectoryQuietly(pyDir);
}
}
}
/**
* Prepares PythonEnvironment to start python process.
*
* @param filePathMap maps file names to their file paths.
* @return the PythonEnvironment used to start the python process.
*/
public static PythonEnvironment preparePythonEnvironment(Map<String, Path> filePathMap) {
PythonEnvironment env = new PythonEnvironment();
// 1. setup temporary local directory for the user files
String tmpDir = System.getProperty("java.io.tmpdir") +
File.separator + "pyflink" + UUID.randomUUID();
Path tmpDirPath = new Path(tmpDir);
try {
FileSystem fs = tmpDirPath.getFileSystem();
if (fs.exists(tmpDirPath)) {
fs.delete(tmpDirPath, true);
}
fs.mkdirs(tmpDirPath);
} catch (IOException e) {
LOG.error("Prepare tmp directory failed.", e);
}
env.workingDirectory = tmpDirPath.toString();
StringBuilder pythonPathEnv = new StringBuilder();
pythonPathEnv.append(env.workingDirectory);
// 2. create symbolic links in the working directory for the pyflink dependency libs.
List<java.nio.file.Path> pythonLibs = getLibFiles(FLINK_OPT_DIR_PYTHON);
for (java.nio.file.Path libPath : pythonLibs) {
java.nio.file.Path symbolicLinkFilePath = FileSystems.getDefault().getPath(env.workingDirectory,
libPath.getFileName().toString());
createSymbolicLinkForPyflinkLib(libPath, symbolicLinkFilePath);
pythonPathEnv.append(File.pathSeparator);
pythonPathEnv.append(symbolicLinkFilePath.toString());
}
// 3. copy relevant python files to tmp dir and set them in PYTHONPATH.
filePathMap.forEach((sourceFileName, sourcePath) -> {
Path targetPath = new Path(tmpDirPath, sourceFileName);
try {
FileUtils.copy(sourcePath, targetPath, true);
} catch (IOException e) {
LOG.error("Copy files to tmp dir failed", e);
}
String targetFileName = targetPath.toString();
pythonPathEnv.append(File.pathSeparator);
pythonPathEnv.append(targetFileName);
});
env.pythonPath = pythonPathEnv.toString();
return env;
}
/**
* Gets the pyflink dependency libs in the specified directory.
*
* @param libDir The lib directory
*/
public static List<java.nio.file.Path> getLibFiles(String libDir) {
final List<java.nio.file.Path> libFiles = new ArrayList<>();
SimpleFileVisitor<java.nio.file.Path> finder = new SimpleFileVisitor<java.nio.file.Path>() {
@Override
public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) throws IOException {
// exclude .txt file
if (!file.toString().endsWith(".txt")) {
libFiles.add(file);
}
return FileVisitResult.CONTINUE;
}
};
try {
Files.walkFileTree(FileSystems.getDefault().getPath(libDir), finder);
} catch (IOException e) {
LOG.error("Gets pyflink dependent libs failed.", e);
}
return libFiles;
}
/**
* Creates a symbolic link in the working directory for a pyflink lib.
*
* @param libPath the pyflink lib file path.
* @param symbolicLinkPath the symbolic link to pyflink lib.
*/
public static void createSymbolicLinkForPyflinkLib(java.nio.file.Path libPath, java.nio.file.Path symbolicLinkPath) {
try {
Files.createSymbolicLink(symbolicLinkPath, libPath);
} catch (IOException e) {
LOG.error("Create symbol link for pyflink lib failed.", e);
LOG.info("Try to copy pyflink lib to working directory");
try {
Files.copy(libPath, symbolicLinkPath);
} catch (IOException ex) {
LOG.error("Copy pylink lib to working directory failed", ex);
}
}
}
/**
* Starts python process.
*
* @param pythonEnv the python environment in which the process will run.
* @param commands the commands that python process will execute.
* @return the started python process.
* @throws IOException Thrown if an error occurs while starting the python process.
*/
public static Process startPythonProcess(PythonEnvironment pythonEnv, List<String> commands) throws IOException {
ProcessBuilder pythonProcessBuilder = new ProcessBuilder();
Map<String, String> env = pythonProcessBuilder.environment();
env.put("PYTHONPATH", pythonEnv.pythonPath);
pythonEnv.systemEnv.forEach(env::put);
commands.add(0, pythonEnv.pythonExec);
pythonProcessBuilder.command(commands);
// set the working directory.
pythonProcessBuilder.directory(new File(pythonEnv.workingDirectory));
// redirect the stderr to stdout
pythonProcessBuilder.redirectErrorStream(true);
// make the child process inherit the output of the parent process.
pythonProcessBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
Process process = pythonProcessBuilder.start();
if (!process.isAlive()) {
throw new RuntimeException("Failed to start Python process. ");
}
// Make sure that the python sub process will be killed when the JVM exits
ShutDownPythonHook hook = new ShutDownPythonHook(process, pythonEnv.workingDirectory);
Runtime.getRuntime().addShutdownHook(hook);
return process;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client.python;
import org.apache.flink.core.fs.Path;
import org.junit.Assert;
import org.junit.Test;
import py4j.GatewayServer;
import java.io.IOException;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Tests for the {@link PythonDriver}.
*/
public class PythonDriverTest {
@Test
public void testStartGatewayServer() {
GatewayServer gatewayServer = PythonDriver.startGatewayServer();
try {
Socket socket = new Socket("localhost", gatewayServer.getListeningPort());
assert socket.isConnected();
} catch (IOException e) {
throw new RuntimeException("Connect Gateway Server failed");
} finally {
gatewayServer.shutdown();
}
}
@Test
public void testConstructCommands() {
Map<String, Path> filePathMap = new HashMap<>();
Map<String, List<String>> parseArgs = new HashMap<>();
parseArgs.put("py", Collections.singletonList("xxx.py"));
List<String> pyFilesList = new ArrayList<>();
pyFilesList.add("a.py");
pyFilesList.add("b.py");
pyFilesList.add("c.py");
parseArgs.put("pyfs", pyFilesList);
List<String> otherArgs = new ArrayList<>();
otherArgs.add("--input");
otherArgs.add("in.txt");
parseArgs.put("args", otherArgs);
List<String> commands = PythonDriver.constructPythonCommands(filePathMap, parseArgs);
Path pythonPath = filePathMap.get("xxx.py");
Assert.assertNotNull(pythonPath);
Assert.assertEquals(pythonPath.getName(), "xxx.py");
Path aPyFilePath = filePathMap.get("a.py");
Assert.assertNotNull(aPyFilePath);
Assert.assertEquals(aPyFilePath.getName(), "a.py");
Path bPyFilePath = filePathMap.get("b.py");
Assert.assertNotNull(bPyFilePath);
Assert.assertEquals(bPyFilePath.getName(), "b.py");
Path cPyFilePath = filePathMap.get("c.py");
Assert.assertNotNull(cPyFilePath);
Assert.assertEquals(cPyFilePath.getName(), "c.py");
Assert.assertEquals(3, commands.size());
Assert.assertEquals(commands.get(0), "xxx.py");
Assert.assertEquals(commands.get(1), "--input");
Assert.assertEquals(commands.get(2), "in.txt");
}
@Test
public void testParseOptions() {
String[] args = {"py", "xxx.py", "pyfs", "a.py,b.py,c.py", "--input", "in.txt"};
Map<String, List<String>> parsedArgs = PythonDriver.parseOptions(args);
List<String> pythonMainFile = parsedArgs.get("py");
Assert.assertNotNull(pythonMainFile);
Assert.assertEquals(1, pythonMainFile.size());
Assert.assertEquals(pythonMainFile.get(0), args[1]);
List<String> pyFilesList = parsedArgs.get("pyfs");
Assert.assertEquals(3, pyFilesList.size());
String[] pyFiles = args[3].split(",");
for (int i = 0; i < pyFiles.length; i++) {
assert pyFilesList.get(i).equals(pyFiles[i]);
}
List<String> otherArgs = parsedArgs.get("args");
for (int i = 4; i < args.length; i++) {
Assert.assertEquals(otherArgs.get(i - 4), args[i]);
}
}
}
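As a convenience (module name assumed, since the module layout is not shown in this diff), the new tests can be run locally with Maven:

    mvn test -pl flink-clients -Dtest='PythonDriverTest,PythonUtilTest'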
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client.python;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
/**
* Tests for the {@link PythonUtil}.
*/
public class PythonUtilTest {
private Path sourceTmpDirPath;
private Path targetTmpDirPath;
private FileSystem sourceFs;
private FileSystem targetFs;
@Before
public void prepareTestEnvironment() {
String sourceTmpDir = System.getProperty("java.io.tmpdir") +
File.separator + "source_" + UUID.randomUUID();
String targetTmpDir = System.getProperty("java.io.tmpdir") +
File.separator + "target_" + UUID.randomUUID();
sourceTmpDirPath = new Path(sourceTmpDir);
targetTmpDirPath = new Path(targetTmpDir);
try {
sourceFs = sourceTmpDirPath.getFileSystem();
if (sourceFs.exists(sourceTmpDirPath)) {
sourceFs.delete(sourceTmpDirPath, true);
}
sourceFs.mkdirs(sourceTmpDirPath);
targetFs = targetTmpDirPath.getFileSystem();
if (targetFs.exists(targetTmpDirPath)) {
targetFs.delete(targetTmpDirPath, true);
}
targetFs.mkdirs(targetTmpDirPath);
} catch (IOException e) {
throw new RuntimeException("initial PythonUtil test environment failed");
}
}
@Test
public void testStartPythonProcess() {
PythonUtil.PythonEnvironment pythonEnv = new PythonUtil.PythonEnvironment();
pythonEnv.workingDirectory = targetTmpDirPath.toString();
pythonEnv.pythonPath = targetTmpDirPath.toString();
List<String> commands = new ArrayList<>();
Path pyPath = new Path(targetTmpDirPath, "word_count.py");
try {
targetFs.create(pyPath, FileSystem.WriteMode.OVERWRITE);
File pyFile = new File(pyPath.toString());
String pyProgram = "#!/usr/bin/python\n" +
"# -*- coding: UTF-8 -*-\n" +
"import sys\n" +
"\n" +
"if __name__=='__main__':\n" +
"\tfilename = sys.argv[1]\n" +
"\tfo = open(filename, \"w\")\n" +
"\tfo.write( \"hello world\")\n" +
"\tfo.close()";
Files.write(pyFile.toPath(), pyProgram.getBytes(), StandardOpenOption.WRITE);
Path result = new Path(targetTmpDirPath, "word_count_result.txt");
commands.add(pyFile.getName());
commands.add(result.getName());
Process pythonProcess = PythonUtil.startPythonProcess(pythonEnv, commands);
int exitCode = pythonProcess.waitFor();
if (exitCode != 0) {
throw new RuntimeException("Python process exits with code: " + exitCode);
}
String cmdResult = new String(Files.readAllBytes(new File(result.toString()).toPath()));
Assert.assertEquals(cmdResult, "hello world");
pythonProcess.destroyForcibly();
targetFs.delete(pyPath, true);
targetFs.delete(result, true);
} catch (IOException | InterruptedException e) {
throw new RuntimeException("test start Python process failed " + e.getMessage());
}
}
@After
public void cleanEnvironment() {
try {
sourceFs.delete(sourceTmpDirPath, true);
targetFs.delete(targetTmpDirPath, true);
} catch (IOException e) {
throw new RuntimeException("delete tmp dir failed " + e.getMessage());
}
}
}
......@@ -572,6 +572,9 @@ under the License.
<relocation>
<pattern>py4j</pattern>
<shadedPattern>org.apache.flink.api.python.py4j</shadedPattern>
<includes>
<include>py4j.*</include>
</includes>
</relocation>
</relocations>
</configuration>
......
......@@ -242,6 +242,13 @@ under the License.
<fileMode>0755</fileMode>
</fileSet>
<!-- copy python table example to examples of dist -->
<fileSet>
<directory>../flink-python/pyflink/table/examples</directory>
<outputDirectory>examples/python/table</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>
</fileSets>
</assembly>
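Given the fileSet above, the example scripts land in the binary distribution under examples/python/table, so the batch word count used throughout the CLI docs resolves to (path illustrative):

    <flink-dist>/examples/python/table/batch/word_count.py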
......@@ -28,6 +28,9 @@ def _find_flink_home():
# If the environment has set FLINK_HOME, trust it.
if 'FLINK_HOME' in os.environ:
return os.environ['FLINK_HOME']
elif 'FLINK_ROOT_DIR' in os.environ:
os.environ['FLINK_HOME'] = os.environ['FLINK_ROOT_DIR']
return os.environ['FLINK_ROOT_DIR']
else:
try:
flink_root_dir = os.path.abspath(os.path.dirname(os.path.abspath(__file__)) + "/../../")
......
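Illustration (values assumed): with this change, either environment variable lets pyflink locate the Flink distribution, with FLINK_HOME taking precedence:

    export FLINK_HOME=/opt/flink
    # or, using the fallback added here:
    export FLINK_ROOT_DIR=/opt/flink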
......@@ -28,7 +28,6 @@ from threading import RLock
from py4j.java_gateway import java_import, JavaGateway, GatewayParameters
from pyflink.find_flink_home import _find_flink_home
_gateway = None
_lock = RLock()
......@@ -46,6 +45,9 @@ def get_gateway():
_gateway = JavaGateway(gateway_parameters=gateway_param)
else:
_gateway = launch_gateway()
# import the flink view
import_flink_view(_gateway)
return _gateway
......@@ -97,6 +99,14 @@ def launch_gateway():
gateway = JavaGateway(
gateway_parameters=GatewayParameters(port=gateway_port, auto_convert=True))
return gateway
def import_flink_view(gateway):
"""
Import the classes used by PyFlink.

:param gateway: gateway connected to the JavaGatewayServer
"""
# Import the classes used by PyFlink
java_import(gateway.jvm, "org.apache.flink.table.api.*")
java_import(gateway.jvm, "org.apache.flink.table.api.java.*")
......@@ -109,5 +119,3 @@ def launch_gateway():
java_import(gateway.jvm, "org.apache.flink.api.java.ExecutionEnvironment")
java_import(gateway.jvm,
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment")
return gateway
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import os
import tempfile
from pyflink.table import TableEnvironment, TableConfig
from pyflink.table.table_sink import CsvTableSink
from pyflink.table.table_source import CsvTableSource
from pyflink.table.types import DataTypes
# TODO: word_count.py is just a test example for the CLI.
# Once pyflink has aligned with the Java Table API connectors, this example will be improved.
def word_count():
tmp_dir = tempfile.gettempdir()
source_path = tmp_dir + '/streaming.csv'
if os.path.isfile(source_path):
os.remove(source_path)
content = "line Licensed to the Apache Software Foundation ASF under one " \
"line or more contributor license agreements See the NOTICE file " \
"line distributed with this work for additional information " \
"line regarding copyright ownership The ASF licenses this file " \
"to you under the Apache License Version the " \
"License you may not use this file except in compliance " \
"with the License"
with open(source_path, 'w') as f:
for word in content.split(" "):
f.write(",".join([word, "1"]))
f.write("\n")
f.flush()
f.close()
t_config = TableConfig.Builder().as_batch_execution().set_parallelism(1).build()
t_env = TableEnvironment.create(t_config)
field_names = ["word", "cout"]
field_types = [DataTypes.STRING, DataTypes.LONG]
# register Orders table in table environment
t_env.register_table_source(
"Word",
CsvTableSource(source_path, field_names, field_types))
# register Results table in table environment
tmp_dir = tempfile.gettempdir()
tmp_csv = tmp_dir + '/streaming2.csv'
if os.path.isfile(tmp_csv):
os.remove(tmp_csv)
t_env.register_table_sink(
"Results",
field_names, field_types, CsvTableSink(tmp_csv))
t_env.scan("Word") \
.group_by("word") \
.select("word, count(1) as count") \
.insert_into("Results")
t_env.execute()
if __name__ == '__main__':
word_count()
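Per the CLI documentation updated in this commit, the example above can be submitted with:

    ./bin/flink run -py examples/python/table/batch/word_count.py -j <path/to/flink-table.jar>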