Commit 247cad1c authored by zentol, committed by Maximilian Michels

[FLINK-1924] minor refactoring of the Python API

- code formatting
- simpler python process initialization
- renaming of the python connection following the switch to TCP

This closes #616.
Parent fceb90a3
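For the "simpler python process initialization" item, the duplicated Python 2 / Python 3 launch branches in PythonPlanBinder and PythonStreamer collapse into a single path selection, as the diff below shows. The following is a minimal, hypothetical sketch of that pattern only; the class name, method shape, and hard-coded stand-in paths are illustrative and not part of the commit:

import java.io.IOException;

// Hypothetical sketch of the consolidated launch pattern; the constants below
// stand in for the configured FLINK_PYTHON2/3_BINARY_PATH values.
public class PythonLaunchSketch {

    static final String PYTHON2_BINARY_PATH = "python";
    static final String PYTHON3_BINARY_PATH = "python3";

    static Process startPlanProcess(boolean usePython3, String planFile, String args) throws IOException {
        // Select the interpreter once instead of duplicating the whole branch per version.
        String pythonBinaryPath = usePython3 ? PYTHON3_BINARY_PATH : PYTHON2_BINARY_PATH;

        // Probe the binary first so a misconfigured path fails with a clear message.
        try {
            Runtime.getRuntime().exec(pythonBinaryPath);
        } catch (IOException ex) {
            throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.");
        }

        // Launch the actual plan process with the selected interpreter.
        return Runtime.getRuntime().exec(pythonBinaryPath + " -B " + planFile + args);
    }
}

The Python-side changes are mechanical: the *UDPMappedFileConnection classes are renamed to *TCPMappedFileConnection after the switch to TCP, and the input buffers become byte strings (b"").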
@@ -54,11 +54,7 @@ public abstract class PlanBinder<INFO extends OperationInfo> {
public static boolean DEBUG = false;
public static void setLocalMode() {
FLINK_HDFS_PATH = System.getProperty("java.io.tmpdir") + "/flink";
}
-protected HashMap<Integer, Object> sets;
+protected HashMap<Integer, Object> sets = new HashMap();
public static ExecutionEnvironment env;
protected Receiver receiver;
@@ -16,7 +16,6 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
-import java.util.HashMap;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
@@ -57,8 +56,8 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
public static final String FLINK_PYTHON2_BINARY_KEY = "python.binary.python2";
public static final String FLINK_PYTHON3_BINARY_KEY = "python.binary.python3";
-public static String FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
-public static String FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
+public static String FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
+public static String FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
private static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + "/flink_plan";
protected static final String FLINK_PYTHON_REL_LOCAL_PATH = "/resources/python";
@@ -104,13 +103,14 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
split = x;
}
}
try {
prepareFiles(Arrays.copyOfRange(args, 0, split == 0 ? 1 : split));
startPython(Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length));
receivePlan();
if (env instanceof LocalEnvironment) {
-FLINK_HDFS_PATH = "file:/tmp/flink";
+FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + "/flink";
}
distributeFiles(env);
@@ -172,7 +172,6 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
}
private void startPython(String[] args) throws IOException {
-sets = new HashMap();
StringBuilder argsBuilder = new StringBuilder();
for (String arg : args) {
argsBuilder.append(" ").append(arg);
@@ -180,23 +179,15 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
receiver = new Receiver(null);
receiver.open(null);
-if (usePython3) {
-try {
-Runtime.getRuntime().exec(FLINK_PYTHON3_BINARY_PATH);
-} catch (IOException ex) {
-throw new RuntimeException(FLINK_PYTHON3_BINARY_KEY + "=" + FLINK_PYTHON3_BINARY_PATH + " does not point to a valid python binary.");
-}
-process = Runtime.getRuntime().exec(FLINK_PYTHON3_BINARY_PATH + " -B "
-+ FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME + argsBuilder.toString());
-} else {
-try {
-Runtime.getRuntime().exec(FLINK_PYTHON2_BINARY_PATH);
-} catch (IOException ex) {
-throw new RuntimeException(FLINK_PYTHON2_BINARY_KEY + "=" + FLINK_PYTHON2_BINARY_PATH + " does not point to a valid python binary.");
-}
-process = Runtime.getRuntime().exec(FLINK_PYTHON2_BINARY_PATH + " -B "
-+ FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME + argsBuilder.toString());
+String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
+try {
+Runtime.getRuntime().exec(pythonBinaryPath);
+} catch (IOException ex) {
+throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.");
+}
+process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME + argsBuilder.toString());
new StreamPrinter(process.getInputStream()).start();
new StreamPrinter(process.getErrorStream()).start();
@@ -210,7 +201,7 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
if (value != 0) {
throw new RuntimeException("Plan file caused an error. Check log-files for details.");
}
-} catch (IllegalThreadStateException ise) {
+} catch (IllegalThreadStateException ise) {//Process still running
}
}
@@ -23,9 +23,7 @@ import static org.apache.flink.languagebinding.api.java.common.PlanBinder.FLINK_
import org.apache.flink.languagebinding.api.java.common.streaming.StreamPrinter;
import org.apache.flink.languagebinding.api.java.common.streaming.Streamer;
import org.apache.flink.languagebinding.api.java.python.PythonPlanBinder;
-import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON2_BINARY_KEY;
import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
-import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON3_BINARY_KEY;
import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
/**
@@ -85,21 +83,15 @@ public class PythonStreamer extends Streamer {
importString.append(FLINK_PYTHON_PLAN_NAME.substring(1, FLINK_PYTHON_PLAN_NAME.length() - 3));
}
-if (usePython3) {
-try {
-Runtime.getRuntime().exec(FLINK_PYTHON3_BINARY_PATH);
-} catch (IOException ex) {
-throw new RuntimeException(FLINK_PYTHON3_BINARY_KEY + "=" + FLINK_PYTHON3_BINARY_PATH + " does not point to a valid python binary.");
-}
-pb.command(FLINK_PYTHON3_BINARY_PATH, "-O", "-B", executorPath, "" + server.getLocalPort());
-} else {
-try {
-Runtime.getRuntime().exec(FLINK_PYTHON2_BINARY_PATH);
-} catch (IOException ex) {
-throw new RuntimeException(FLINK_PYTHON2_BINARY_KEY + "=" + FLINK_PYTHON2_BINARY_PATH + " does not point to a valid python binary.");
-}
-pb.command(FLINK_PYTHON2_BINARY_PATH, "-O", "-B", executorPath, "" + server.getLocalPort());
+String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
+try {
+Runtime.getRuntime().exec(pythonBinaryPath);
+} catch (IOException ex) {
+throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.");
+}
+pb.command(pythonBinaryPath, "-O", "-B", executorPath, "" + server.getLocalPort());
if (debug) {
socket.setSoTimeout(0);
LOG.info("Waiting for Python Process : " + function.getRuntimeContext().getTaskName()
@@ -60,7 +60,7 @@ class OneWayBusyBufferingMappedFileConnection(object):
self._file_output_buffer.write(b'\x01')
-class BufferingUDPMappedFileConnection(object):
+class BufferingTCPMappedFileConnection(object):
def __init__(self, input_file=tempfile.gettempdir() + "/flink_data/input", output_file=tempfile.gettempdir() + "/flink_data/output", socket=None):
self._input_file = open(input_file, "rb+")
self._output_file = open(output_file, "rb+")
@@ -71,7 +71,7 @@ class BufferingUDPMappedFileConnection(object):
self._out = deque()
self._out_size = 0
-self._input = ""
+self._input = b""
self._input_offset = 0
self._input_size = 0
self._was_last = False
@@ -124,13 +124,13 @@ class BufferingUDPMappedFileConnection(object):
self._was_last = False
self._input_size = 0
self._input_offset = 0
-self._input = ""
+self._input = b""
-class TwinBufferingUDPMappedFileConnection(BufferingUDPMappedFileConnection):
+class TwinBufferingTCPMappedFileConnection(BufferingTCPMappedFileConnection):
def __init__(self, input_file=tempfile.gettempdir() + "/flink/data/input", output_file=tempfile.gettempdir() + "/flink/data/output", socket=None):
-super(TwinBufferingUDPMappedFileConnection, self).__init__(input_file, output_file, socket)
-self._input = ["", ""]
+super(TwinBufferingTCPMappedFileConnection, self).__init__(input_file, output_file, socket)
+self._input = [b"", b""]
self._input_offset = [0, 0]
self._input_size = [0, 0]
self._was_last = [False, False]
@@ -161,6 +161,6 @@ class TwinBufferingUDPMappedFileConnection(BufferingUDPMappedFileConnection):
self._was_last = [False, False]
self._input_size = [0, 0]
self._input_offset = [0, 0]
-self._input = ["", ""]
+self._input = [b"", b""]
@@ -26,7 +26,7 @@ class CoGroupFunction(Function.Function):
self._keys2 = None
def _configure(self, input_file, output_file, port):
-self._connection = Connection.TwinBufferingUDPMappedFileConnection(input_file, output_file, port)
+self._connection = Connection.TwinBufferingTCPMappedFileConnection(input_file, output_file, port)
self._iterator = Iterator.Iterator(self._connection, 0)
self._iterator2 = Iterator.Iterator(self._connection, 1)
self._cgiter = Iterator.CoGroupIterator(self._iterator, self._iterator2, self._keys1, self._keys2)
@@ -35,7 +35,7 @@ class Function(object):
self._meta = None
def _configure(self, input_file, output_file, port):
-self._connection = Connection.BufferingUDPMappedFileConnection(input_file, output_file, port)
+self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
self._iterator = Iterator.Iterator(self._connection)
self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
self._configure_chain(Collector.Collector(self._connection))
@@ -31,13 +31,13 @@ class GroupReduceFunction(Function.Function):
def _configure(self, input_file, output_file, port):
if self._combine:
-self._connection = Connection.BufferingUDPMappedFileConnection(input_file, output_file, port)
+self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
self._iterator = Iterator.Iterator(self._connection)
self._collector = Collector.Collector(self._connection)
self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
self._run = self._run_combine
else:
-self._connection = Connection.BufferingUDPMappedFileConnection(input_file, output_file, port)
+self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
self._iterator = Iterator.Iterator(self._connection)
self._group_iterator = Iterator.GroupIterator(self._iterator, self._keys)
self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
@@ -29,13 +29,13 @@ class ReduceFunction(Function.Function):
def _configure(self, input_file, output_file, port):
if self._combine:
-self._connection = Connection.BufferingUDPMappedFileConnection(input_file, output_file, port)
+self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
self._iterator = Iterator.Iterator(self._connection)
self._collector = Collector.Collector(self._connection)
self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
self._run = self._run_combine
else:
-self._connection = Connection.BufferingUDPMappedFileConnection(input_file, output_file, port)
+self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
self._iterator = Iterator.Iterator(self._connection)
if self._keys is None:
self._run = self._run_allreduce
@@ -30,7 +30,6 @@ from flink.functions.MapFunction import MapFunction
from flink.functions.MapPartitionFunction import MapPartitionFunction
from flink.functions.ReduceFunction import ReduceFunction
def deduct_output_type(dataset):
skip = set([_Identifier.GROUP, _Identifier.SORT, _Identifier.UNION])
source = set([_Identifier.SOURCE_CSV, _Identifier.SOURCE_TEXT, _Identifier.SOURCE_VALUE])