Commit a09bd4c6 authored by Wei Zhong, committed by sunjincheng121

[FLINK-12311][python] Add base Python framework and add Scan, Projection, and Filter operator support

This closes #8267

Make Travis tests green.

Fix documents, refine the shell script name, and optimize program logic.

Adjust the code examples in the Python documents to be consistent with the Scala documents.

Refactor the flink-python project structure and remove Java files from the flink-python package.

Delete unnecessary dependencies and plugins.
Parent 4e505c67
......@@ -14,7 +14,9 @@ tmp
*.iml
*.swp
*.jar
*.zip
*.log
*.pyc
.DS_Store
build-target
flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/avro/
......@@ -23,6 +25,9 @@ flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/gener
flink-runtime-web/web-dashboard/node/
flink-runtime-web/web-dashboard/node_modules/
flink-runtime-web/web-dashboard/web/
flink-python/dist/
flink-python/build/
flink-python/pyflink.egg-info/
atlassian-ide-plugin.xml
out/
/docs/api
......
......@@ -297,6 +297,81 @@ val result = orders.where('b === "red")
</tr>
</tbody>
</table>
</div>
<div data-lang="python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Operators</th>
<th class="text-center">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>
<strong>Scan</strong><br>
<span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
</td>
<td>
<p>Similar to the FROM clause in a SQL query. Performs a scan of a registered table.</p>
{% highlight python %}
orders = table_env.scan("Orders")
{% endhighlight %}
</td>
</tr>
<tr>
<td>
<strong>Select</strong><br>
<span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
</td>
<td>
<p>Similar to a SQL SELECT statement. Performs a select operation.</p>
{% highlight python %}
orders = table_env.scan("Orders")
result = orders.select("a, c as d")
{% endhighlight %}
<p>You can use the star (<code>*</code>) as a wildcard to select all of the columns in the table.</p>
{% highlight python %}
result = orders.select("*")
{% endhighlight %}
</td>
</tr>
<tr>
<td>
<strong>Alias</strong><br>
<span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
</td>
<td>
<p>Renames fields.</p>
{% highlight python %}
orders = table_env.scan("Orders")
result = orders.alias("x, y, z, t")
{% endhighlight %}
</td>
</tr>
<tr>
<td>
<strong>Where / Filter</strong><br>
<span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
</td>
<td>
<p>Similar to a SQL WHERE clause. Filters out rows that do not pass the filter predicate.</p>
{% highlight python %}
orders = table_env.scan("Orders")
result = orders.where("b === 'red'")
{% endhighlight %}
or
{% highlight python %}
orders = table_env.scan("Orders")
result = orders.filter("a % 2 === 0")
{% endhighlight %}
</td>
</tr>
</tbody>
</table>
</div>
</div>
......
......@@ -297,6 +297,81 @@ val result = orders.where('b === "red")
</tr>
</tbody>
</table>
</div>
<div data-lang="python" markdown="1">
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">操作</th>
<th class="text-center">描述</th>
</tr>
</thead>
<tbody>
<tr>
<td>
<strong>Scan</strong><br>
<span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
</td>
<td>
<p>类似于SQL请求中的FROM子句,将一个环境中已注册的表转换成Table对象。</p>
{% highlight python %}
orders = table_env.scan("Orders");
{% endhighlight %}
</td>
</tr>
<tr>
<td>
<strong>Select</strong><br>
<span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
</td>
<td>
<p>类似于SQL请求中的SELECT子句,执行一个select操作。</p>
{% highlight python %}
orders = table_env.scan("Orders");
result = orders.select("a, c as d");
{% endhighlight %}
<p>您可以使用星号 (<code>*</code>) 表示选择表中的所有列。</p>
{% highlight python %}
result = orders.select("*");
{% endhighlight %}
</td>
</tr>
<tr>
<td>
<strong>Alias</strong><br>
<span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
</td>
<td>
<p>重命名字段。</p>
{% highlight python %}
orders = table_env.scan("Orders");
result = orders.alias("x, y, z, t");
{% endhighlight %}
</td>
</tr>
<tr>
<td>
<strong>Where / Filter</strong><br>
<span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
</td>
<td>
<p>类似于SQL请求中的WHERE子句,过滤掉表中不满足条件的行。</p>
{% highlight python %}
orders = table_env.scan("Orders");
result = orders.where("b === 'red'");
{% endhighlight %}
or
{% highlight python %}
orders = table_env.scan("Orders");
result = orders.filter("a % 2 === 0");
{% endhighlight %}
</td>
</tr>
</tbody>
</table>
</div>
</div>
......
......@@ -86,6 +86,13 @@ under the License.
<artifactId>flink-shaded-guava</artifactId>
</dependency>
<!-- Python API dependencies -->
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>0.10.8.1</version>
</dependency>
<!-- ================== test dependencies ================== -->
<dependency>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.python;
import py4j.GatewayServer;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetAddress;
/**
* The Py4j Gateway Server provides RPC service for user's python process.
*/
public class PythonGatewayServer {
/**
* Main method to start a local GatewayServer on an ephemeral port.
* It tells the Python side about the chosen port via a file.
*
* <p>See: py4j.GatewayServer.main()
*/
public static void main(String[] args) throws IOException {
InetAddress localhost = InetAddress.getLoopbackAddress();
GatewayServer gatewayServer = new GatewayServer.GatewayServerBuilder()
.javaPort(0)
.javaAddress(localhost)
.build();
gatewayServer.start();
int boundPort = gatewayServer.getListeningPort();
if (boundPort == -1) {
System.out.println("GatewayServer failed to bind; exiting");
System.exit(1);
}
// Tell the Python side the port of our Java RPC server
String handshakeFilePath = System.getenv("_PYFLINK_CONN_INFO_PATH");
File handshakeFile = new File(handshakeFilePath);
if (handshakeFile.createNewFile()) {
FileOutputStream fileOutputStream = new FileOutputStream(handshakeFile);
DataOutputStream stream = new DataOutputStream(fileOutputStream);
stream.writeInt(boundPort);
stream.close();
fileOutputStream.close();
} else {
System.out.println("Could not create handshake file: " + handshakeFilePath + ", exiting...");
System.exit(1);
}
// Exit on EOF or broken pipe. This ensures that the server dies
// if its parent program dies.
while (System.in.read() != -1) {
// Do nothing
}
gatewayServer.shutdown();
}
}
......@@ -568,6 +568,12 @@ under the License.
<projectName>Apache Flink</projectName>
</transformer>
</transformers>
<relocations>
<relocation>
<pattern>py4j</pattern>
<shadedPattern>org.apache.flink.api.python.py4j</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
<execution>
......
......@@ -178,4 +178,11 @@
<fileMode>0644</fileMode>
</file>
</files>
<fileSets>
<fileSet>
<directory>../flink-python/lib/</directory>
<outputDirectory>opt/python/</outputDirectory>
<fileMode>0644</fileMode>
</fileSet>
</fileSets>
</assembly>
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# =====================================================================
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/config.sh
if [ "$FLINK_IDENT_STRING" = "" ]; then
FLINK_IDENT_STRING="$USER"
fi
FLINK_CLASSPATH=`constructFlinkClassPath`
ARGS=()
while [[ $# -gt 0 ]]
do
key="$1"
case $key in
-c|--class)
DRIVER=$2
shift
shift
;;
*)
ARGS+=("$1")
shift
;;
esac
done
log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-client-$HOSTNAME.log
log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)
TABLE_JAR_PATH=`echo "$FLINK_ROOT_DIR"/opt/flink-table*.jar`
exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -cp "${FLINK_CLASSPATH}:${TABLE_JAR_PATH}" ${DRIVER} "${ARGS[@]}"
......@@ -34,6 +34,7 @@ See bundled license files for details.
- com.esotericsoftware.kryo:kryo:2.24.0
- com.esotericsoftware.minlog:minlog:1.2
- org.clapper:grizzled-slf4j_2.11:1.3.2
- net.sf.py4j:py4j:0.10.8.1
The following dependencies all share the same BSD license which you find under licenses/LICENSE.scala.
......
Copyright (c) 2009-2018, Barthelemy Dagenais and individual contributors. All
rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
- Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
- The name of the author may not be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Apache Flink Python API
Apache Flink is an open source stream processing framework with powerful stream- and batch-processing capabilities.
Learn more about Flink at [http://flink.apache.org/](http://flink.apache.org/)
This package allows you to write Flink programs in Python, but it is currently an early version and will change in future releases.
In this initial version only the Table API is supported. You can find the documentation at [https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html](https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html)
## Installation
Currently, you can install PyFlink from Flink source code.
First, build the whole Flink project using `mvn clean install -DskipTests` and set the environment variable FLINK_HOME to the `build-target` directory under the root directory of Flink.
Then enter the directory where this README.md file is located and run `python setup.py install` to install PyFlink on your device.
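For example, starting from the root directory of the Flink source tree (paths here follow the layout described above):

    mvn clean install -DskipTests
    export FLINK_HOME=`pwd`/build-target
    cd flink-python
    python setup.py install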
## Running Tests
Currently, you can run an end-to-end test of PyFlink from the directory where this file is located with the following command:
PYTHONPATH=$PYTHONPATH:./ python ./pyflink/table/tests/test_end_to_end.py
## Python Requirements
PyFlink depends on Py4J (currently version 0.10.8.1).
Copyright (c) 2009-2018, Barthelemy Dagenais and individual contributors. All
rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
- Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
- The name of the author may not be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>flink-parent</artifactId>
<groupId>org.apache.flink</groupId>
<version>1.9-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>flink-python</artifactId>
<name>flink-python</name>
<packaging>pom</packaging>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>clean</id>
<phase>clean</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target>
<delete>
<fileset dir="${project.basedir}/pyflink"
includes="**/*.pyc"/>
</delete>
<delete file="${project.basedir}/lib/pyflink.zip"/>
<delete dir="${project.basedir}/target"/>
</target>
</configuration>
</execution>
<execution>
<id>generate-resources</id>
<phase>generate-resources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target>
<delete>
<fileset dir="${project.basedir}/pyflink"
includes="**/*.pyc"/>
</delete>
<delete file="${project.basedir}/lib/pyflink.zip"/>
<zip destfile="${project.basedir}/lib/pyflink.zip">
<fileset dir="${project.basedir}"
includes="pyflink/**/*"/>
</zip>
</target>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from __future__ import print_function
import os
import sys
def _find_flink_home():
"""
Find the FLINK_HOME.
"""
# If the environment has set FLINK_HOME, trust it.
if 'FLINK_HOME' in os.environ:
return os.environ['FLINK_HOME']
else:
try:
flink_root_dir = os.path.abspath(os.path.dirname(os.path.abspath(__file__)) + "/../../")
build_target = flink_root_dir + "/build-target"
pyflink_file = build_target + "/bin/pyflink-gateway-server.sh"
if os.path.isfile(pyflink_file):
os.environ['FLINK_HOME'] = build_target
return build_target
except Exception:
pass
print("Could not find valid FLINK_HOME in current environment.", file=sys.stderr)
sys.exit(-1)
if __name__ == "__main__":
print(_find_flink_home())
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import os
import platform
import shutil
import signal
import struct
import tempfile
import time
from subprocess import Popen, PIPE
from threading import RLock
from py4j.java_gateway import java_import, JavaGateway, GatewayParameters
from pyflink.find_flink_home import _find_flink_home
_gateway = None
_lock = RLock()
def get_gateway():
# type: () -> JavaGateway
global _gateway
global _lock
with _lock:
if _gateway is None:
# if Java Gateway is already running
if 'PYFLINK_GATEWAY_PORT' in os.environ:
gateway_port = int(os.environ['PYFLINK_GATEWAY_PORT'])
gateway_param = GatewayParameters(port=gateway_port, auto_convert=True)
_gateway = JavaGateway(gateway_parameters=gateway_param)
else:
_gateway = launch_gateway()
return _gateway
def launch_gateway():
# type: () -> JavaGateway
"""
Launch the JVM gateway server and connect to it.
"""
FLINK_HOME = _find_flink_home()
# TODO windows support
on_windows = platform.system() == "Windows"
if on_windows:
raise Exception("Windows system is not supported currently.")
script = "./bin/pyflink-gateway-server.sh"
command = [os.path.join(FLINK_HOME, script)]
command += ['-c', 'org.apache.flink.api.python.PythonGatewayServer']
# Create a temporary directory where the gateway server should write the connection information.
conn_info_dir = tempfile.mkdtemp()
try:
fd, conn_info_file = tempfile.mkstemp(dir=conn_info_dir)
os.close(fd)
os.unlink(conn_info_file)
env = dict(os.environ)
env["_PYFLINK_CONN_INFO_PATH"] = conn_info_file
def preexec_func():
# ignore ctrl-c / SIGINT
signal.signal(signal.SIGINT, signal.SIG_IGN)
# Launch the Java gateway.
# We open a pipe to stdin so that the Java gateway can die when the pipe is broken
p = Popen(command, stdin=PIPE, preexec_fn=preexec_func, env=env)
while p.poll() is None and not os.path.isfile(conn_info_file):
time.sleep(0.1)
if not os.path.isfile(conn_info_file):
raise Exception("Java gateway process exited before sending its port number")
with open(conn_info_file, "rb") as info:
gateway_port = struct.unpack("!I", info.read(4))[0]
finally:
shutil.rmtree(conn_info_dir)
# Connect to the gateway
gateway = JavaGateway(gateway_parameters=GatewayParameters(port=gateway_port, auto_convert=True))
# Import the classes used by PyFlink
java_import(gateway.jvm, "org.apache.flink.table.api.*")
java_import(gateway.jvm, "org.apache.flink.table.api.java.*")
java_import(gateway.jvm, "org.apache.flink.table.api.dataview.*")
java_import(gateway.jvm, "org.apache.flink.table.sources.*")
java_import(gateway.jvm, "org.apache.flink.table.sinks.*")
java_import(gateway.jvm, "org.apache.flink.api.common.typeinfo.TypeInformation")
java_import(gateway.jvm, "org.apache.flink.api.common.typeinfo.Types")
java_import(gateway.jvm, "org.apache.flink.api.java.ExecutionEnvironment")
java_import(gateway.jvm, "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment")
return gateway
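A minimal usage sketch, assuming a built Flink distribution is discoverable via FLINK_HOME; the gateway's jvm view exposes the classes imported above:

from pyflink.java_gateway import get_gateway

gateway = get_gateway()  # connects to a running gateway or launches a new one
# Static members of the imported classes are reachable through the jvm view,
# e.g. the TypeInformation constants also used by pyflink.util.type_utils:
j_string_type = gateway.jvm.Types.STRING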
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""
Important classes of Flink Table API:
- :class:`pyflink.table.TableEnvironment`
Main entry point for :class:`Table` and SQL functionality
- :class:`pyflink.table.Table`
The core component of the Table API.
Use the methods of :class:`Table` to transform data.
- :class:`pyflink.table.TableConfig`
A config to define the runtime behavior of the Table API.
It is necessary when creating :class:`TableEnvironment`.
- :class:`pyflink.table.TableSource`
Defines an external data source as a table.
- :class:`pyflink.table.TableSink`
Specifies how to emit a table to an external system or location.
"""
from pyflink.table.table import Table
from pyflink.table.table_config import TableConfig
from pyflink.table.table_environment import TableEnvironment, StreamTableEnvironment, BatchTableEnvironment
from pyflink.table.table_sink import TableSink, CsvTableSink
from pyflink.table.table_source import TableSource, CsvTableSource
from pyflink.table.types import DataTypes
__all__ = [
'TableEnvironment',
'StreamTableEnvironment',
'BatchTableEnvironment',
'Table',
'TableConfig',
'TableSink',
'TableSource',
'CsvTableSink',
'CsvTableSource',
'DataTypes'
]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from py4j.java_gateway import get_method
__all__ = ['Table']
class Table(object):
"""
A :class:`Table` is the core component of the Table API.
Similar to how the batch and streaming APIs have DataSet and DataStream,
the Table API is built around :class:`Table`.
Use the methods of :class:`Table` to transform data.
Example:
::
>>> t_config = TableConfig.Builder().as_streaming_execution().set_parallelism(1).build()
>>> t_env = TableEnvironment.get_table_environment(t_config)
>>> ...
>>> t_env.register_table_source("source", ...)
>>> t = t_env.scan("source")
>>> t.select(...)
...
>>> t_env.register_table_sink("result", ...)
>>> t.insert_into("result")
>>> t_env.execute()
Operations such as :func:`~pyflink.table.Table.join`, :func:`~pyflink.table.Table.select`,
:func:`~pyflink.table.Table.where` and :func:`~pyflink.table.Table.group_by`
take arguments in an expression string. Please refer to the documentation for
the expression syntax.
"""
def __init__(self, j_table):
self._j_table = j_table
def select(self, fields):
"""
Performs a selection operation. Similar to a SQL SELECT statement. The field expressions
can contain complex expressions.
Example:
::
>>> tab.select("key, value + 'hello'")
:param fields: Expression string.
:return: Result table.
"""
return Table(self._j_table.select(fields))
def alias(self, fields):
"""
Renames the fields of the expression result. Use this to disambiguate fields before
joining two tables.
Example:
::
>>> tab.alias("a, b")
:param fields: Field list expression string.
:return: Result table.
"""
return Table(get_method(self._j_table, "as")(fields))
def filter(self, predicate):
"""
Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
clause.
Example:
::
>>> tab.filter("name = 'Fred'")
:param predicate: Predicate expression string.
:return: Result table.
"""
return Table(self._j_table.filter(predicate))
def where(self, predicate):
"""
Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
clause.
Example:
::
>>> tab.where("name = 'Fred'")
:param predicate: Predicate expression string.
:return: Result table.
"""
return Table(self._j_table.where(predicate))
def insert_into(self, table_name):
"""
Writes the :class:`Table` to a :class:`TableSink` that was registered under the specified name.
Example:
::
>>> tab.insert_into("print")
:param table_name: Name of the :class:`TableSink` to which the :class:`Table` is written.
"""
self._j_table.insertInto(table_name)
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
__all__ = ['TableConfig']
class TableConfig(object):
"""
A config to define the runtime behavior of the Table API.
"""
def __init__(self):
self._is_stream = None
self._parallelism = None
@property
def is_stream(self):
return self._is_stream
@is_stream.setter
def is_stream(self, is_stream):
self._is_stream = is_stream
@property
def parallelism(self):
return self._parallelism
@parallelism.setter
def parallelism(self, parallelism):
self._parallelism = parallelism
class Builder(object):
def __init__(self):
self._is_stream = None
self._parallelism = None
def as_streaming_execution(self):
"""
Configures streaming execution mode.
If this method is called, :class:`StreamTableEnvironment` will be created.
:return: Builder
"""
self._is_stream = True
return self
def as_batch_execution(self):
"""
Configures batch execution mode.
If this method is called, :class:`BatchTableEnvironment` will be created.
:return: Builder
"""
self._is_stream = False
return self
def set_parallelism(self, parallelism):
"""
Sets the parallelism for all operations.
:param parallelism: The parallelism.
:return: Builder
"""
self._parallelism = parallelism
return self
def build(self):
"""
Builds :class:`TableConfig` object.
:return: TableConfig
"""
config = TableConfig()
config.parallelism = self._parallelism
config.is_stream = self._is_stream
return config
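A short sketch of the builder in use, mirroring the end-to-end test added in this commit:

from pyflink.table import TableConfig

# Streaming mode with parallelism 1; as_batch_execution() selects batch mode instead.
t_config = TableConfig.Builder().as_streaming_execution().set_parallelism(1).build()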
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from abc import ABCMeta
from pyflink.java_gateway import get_gateway
from pyflink.table import Table
from pyflink.util import type_utils, utils
__all__ = [
'BatchTableEnvironment',
'StreamTableEnvironment',
'TableEnvironment'
]
class TableEnvironment(object):
"""
The abstract base class for batch and stream TableEnvironments.
"""
__metaclass__ = ABCMeta
def __init__(self, j_tenv):
self._j_tenv = j_tenv
def from_table_source(self, table_source):
"""
Creates a table from a table source.
:param table_source: The table source used as table.
:return: The result table.
"""
return Table(self._j_tenv.fromTableSource(table_source._j_table_source))
def register_table(self, name, table):
"""
Registers a :class:`Table` under a unique name in the TableEnvironment's catalog.
Registered tables can be referenced in SQL queries.
:param name: The name under which the table will be registered.
:param table: The table to register.
"""
self._j_tenv.registerTable(name, table._j_table)
def register_table_source(self, name, table_source):
"""
Registers an external :class:`TableSource` in this :class:`TableEnvironment`'s catalog.
Registered tables can be referenced in SQL queries.
:param name: The name under which the :class:`TableSource` is registered.
:param table_source: The :class:`TableSource` to register.
"""
self._j_tenv.registerTableSource(name, table_source._j_table_source)
def register_table_sink(self, name, field_names, field_types, table_sink):
"""
Registers an external :class:`TableSink` with given field names and types in this
:class:`TableEnvironment`'s catalog.
Registered sink tables can be referenced in SQL DML statements.
:param name: The name under which the :class:`TableSink` is registered.
:param field_names: The field names to register with the :class:`TableSink`.
:param field_types: The field types to register with the :class:`TableSink`.
:param table_sink: The :class:`TableSink` to register.
"""
gateway = get_gateway()
j_field_names = utils.to_jarray(gateway.jvm.String, field_names)
j_field_types = utils.to_jarray(gateway.jvm.TypeInformation,
[type_utils.to_java_type(field_type) for field_type in field_types])
self._j_tenv.registerTableSink(name, j_field_names, j_field_types, table_sink._j_table_sink)
def scan(self, *table_path):
"""
Scans a registered table and returns the resulting :class:`Table`.
A table to scan must be registered in the TableEnvironment. It can be registered
either as a TableSource or as a Table.
Examples:
Scanning a directly registered table
::
>>> tab = t_env.scan("tableName")
Scanning a table from a registered catalog
::
>>> tab = t_env.scan("catalogName", "dbName", "tableName")
:param table_path: The path of the table to scan.
:raises Exception: if no table is found under the given table path.
:return: The resulting :class:`Table`
"""
gateway = get_gateway()
j_table_paths = utils.to_jarray(gateway.jvm.String, table_path)
j_table = self._j_tenv.scan(j_table_paths)
return Table(j_table)
def execute(self, job_name=None):
"""
Triggers the program execution.
:param job_name: Optional, desired name of the job.
"""
if job_name is not None:
self._j_tenv.execEnv().execute(job_name)
else:
self._j_tenv.execEnv().execute()
@classmethod
def get_table_environment(cls, table_config):
"""
Returns a :class:`StreamTableEnvironment` or a :class:`BatchTableEnvironment`
which matches the :class:`TableConfig`'s content.
:param table_config: The TableConfig for the new TableEnvironment.
:return: Desired :class:`TableEnvironment`.
"""
gateway = get_gateway()
if table_config.is_stream:
j_execution_env = gateway.jvm.StreamExecutionEnvironment.getExecutionEnvironment()
j_tenv = gateway.jvm.StreamTableEnvironment.create(j_execution_env)
t_env = StreamTableEnvironment(j_tenv)
else:
j_execution_env = gateway.jvm.ExecutionEnvironment.getExecutionEnvironment()
j_tenv = gateway.jvm.BatchTableEnvironment.create(j_execution_env)
t_env = BatchTableEnvironment(j_tenv)
if table_config.parallelism is not None:
t_env._j_tenv.execEnv().setParallelism(table_config.parallelism)
return t_env
class StreamTableEnvironment(TableEnvironment):
def __init__(self, j_tenv):
self._j_tenv = j_tenv
super(StreamTableEnvironment, self).__init__(j_tenv)
class BatchTableEnvironment(TableEnvironment):
def __init__(self, j_tenv):
self._j_tenv = j_tenv
super(BatchTableEnvironment, self).__init__(j_tenv)
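A minimal sketch showing how the config drives environment selection (a JVM gateway must be reachable):

from pyflink.table import TableConfig, TableEnvironment

t_config = TableConfig.Builder().as_batch_execution().build()
# Returns a BatchTableEnvironment here; a streaming config would yield a
# StreamTableEnvironment instead.
t_env = TableEnvironment.get_table_environment(t_config)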
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.java_gateway import get_gateway
__all__ = ['TableSink', 'CsvTableSink']
class TableSink(object):
"""
A :class:`TableSink` specifies how to emit a table to an external system or location.
"""
def __init__(self, j_table_sink):
self._j_table_sink = j_table_sink
class WriteMode(object):
NO_OVERWRITE = 0
OVERWRITE = 1
class CsvTableSink(TableSink):
"""
A simple :class:`TableSink` to emit data as CSV files.
:param path: The output path to write the Table to.
:param field_delimiter: The field delimiter.
:param num_files: The number of files to write to.
:param write_mode: The write mode to specify whether existing files are overwritten or not.
"""
def __init__(self, path, field_delimiter=',', num_files=1, write_mode=None):
# type: (str, str, int, int) -> None
gateway = get_gateway()
if write_mode == WriteMode.NO_OVERWRITE:
j_write_mode = gateway.jvm.scala.Option.apply(
gateway.jvm.org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE)
elif write_mode == WriteMode.OVERWRITE:
j_write_mode = gateway.jvm.scala.Option.apply(
gateway.jvm.org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE)
elif write_mode is None:
j_write_mode = gateway.jvm.scala.Option.empty()
else:
raise Exception('Unsupported write_mode: %s' % write_mode)
j_some_field_delimiter = gateway.jvm.scala.Option.apply(field_delimiter)
j_some_num_files = gateway.jvm.scala.Option.apply(num_files)
j_csv_table_sink = gateway.jvm.CsvTableSink(
path, j_some_field_delimiter, j_some_num_files, j_write_mode)
super(CsvTableSink, self).__init__(j_csv_table_sink)
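A hedged construction sketch ('/tmp/output.csv' is a placeholder path, not from the commit):

from pyflink.table.table_sink import CsvTableSink, WriteMode

# Overwrite any existing file at the placeholder path.
sink = CsvTableSink("/tmp/output.csv", field_delimiter=',', num_files=1,
write_mode=WriteMode.OVERWRITE)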
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.java_gateway import get_gateway
from pyflink.table.types import DataType
from pyflink.util import type_utils
from pyflink.util import utils
__all__ = ['TableSource', 'CsvTableSource']
class TableSource(object):
"""
Defines a table from an external system or location.
"""
def __init__(self, j_table_source):
self._j_table_source = j_table_source
class CsvTableSource(TableSource):
"""
A :class:`TableSource` for simple CSV files with a
(logically) unlimited number of fields.
:param source_path: The path to the CSV file.
:param field_names: The names of the table fields.
:param field_types: The types of the table fields.
"""
def __init__(self, source_path, field_names, field_types):
# type: (str, list[str], list[DataType]) -> None
gateway = get_gateway()
j_field_names = utils.to_jarray(gateway.jvm.String, field_names)
j_field_types = utils.to_jarray(gateway.jvm.TypeInformation,
[type_utils.to_java_type(field_type) for field_type in field_types])
super(CsvTableSource, self).__init__(gateway.jvm.CsvTableSource(source_path, j_field_names, j_field_types))
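A matching source sketch (the field names and placeholder path are illustrative, taken from the end-to-end test below):

from pyflink.table.table_source import CsvTableSource
from pyflink.table.types import DataTypes

source = CsvTableSource("/tmp/input.csv",
["a", "b", "c"],
[DataTypes.INT, DataTypes.STRING, DataTypes.STRING])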
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import os
import tempfile
from pyflink.find_flink_home import _find_flink_home
from pyflink.table import TableEnvironment, TableConfig
from pyflink.table.table_sink import CsvTableSink
from pyflink.table.table_source import CsvTableSource
from pyflink.table.types import DataTypes
def test_end_to_end():
tmp_dir = tempfile.gettempdir()
source_path = tmp_dir + '/streaming.csv'
if os.path.isfile(source_path):
os.remove(source_path)
with open(source_path, 'w') as f:
lines = '1,hi,hello\n' + '2,hi,hello\n'
f.write(lines)
_find_flink_home()
print("using %s as FLINK_HOME..." % os.environ["FLINK_HOME"])
t_config = TableConfig.Builder().as_streaming_execution().set_parallelism(1).build()
t_env = TableEnvironment.get_table_environment(t_config)
field_names = ["a", "b", "c"]
field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
# register Orders table in table environment
t_env.register_table_source(
"Orders",
CsvTableSource(source_path, field_names, field_types))
# register Results table in table environment
tmp_dir = tempfile.gettempdir()
tmp_csv = tmp_dir + '/streaming2.csv'
if os.path.isfile(tmp_csv):
os.remove(tmp_csv)
t_env.register_table_sink(
"Results",
field_names, field_types, CsvTableSink(tmp_csv))
t_env.scan("Orders") \
.where("a > 0") \
.select("a + 1, b, c") \
.insert_into("Results")
t_env.execute()
with open(tmp_csv, 'r') as f:
lines = f.read()
assert lines == '2,hi,hello\n' + '3,hi,hello\n'
print("test passed, the log file is under this directory: %s/log" % os.environ["FLINK_HOME"])
if __name__ == '__main__':
test_end_to_end()
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import sys
if sys.version > '3':
xrange = range
__all__ = ['DataTypes']
class DataType(object):
"""
Base class for data types.
"""
@classmethod
def type_name(cls):
return cls.__name__[:-4].lower()
def __hash__(self):
return hash(self.type_name())
def __eq__(self, other):
return self.type_name() == other.type_name()
def __ne__(self, other):
return self.type_name() != other.type_name()
class DataTypeSingleton(type):
"""
Metaclass for DataType
"""
_instances = {}
def __call__(cls):
if cls not in cls._instances:
cls._instances[cls] = super(DataTypeSingleton, cls).__call__()
return cls._instances[cls]
class AtomicType(DataType):
"""
An internal type used to represent everything that is not
null, arrays, structs, or maps.
"""
class NumericType(AtomicType):
"""
Numeric data types.
"""
class IntegralType(NumericType):
"""
Integral data types.
"""
__metaclass__ = DataTypeSingleton
class FractionalType(NumericType):
"""
Fractional data types.
"""
class StringType(AtomicType):
"""
String data type. SQL VARCHAR
"""
__metaclass__ = DataTypeSingleton
class BooleanType(AtomicType):
"""
Boolean data type. SQL BOOLEAN
"""
__metaclass__ = DataTypeSingleton
class ByteType(IntegralType):
"""
Byte data type. SQL TINYINT
"""
class CharType(IntegralType):
"""
Char data type. SQL CHAR
"""
class ShortType(IntegralType):
"""
Short data type. SQL SMALLINT (16 bits)
"""
class IntegerType(IntegralType):
"""
Int data type. SQL INT (32 bits)
"""
class LongType(IntegralType):
"""
Long data type. SQL BIGINT (64 bits)
"""
class FloatType(FractionalType):
"""
Float data type. SQL FLOAT
"""
__metaclass__ = DataTypeSingleton
class DoubleType(FractionalType):
"""
Double data type. SQL DOUBLE
"""
__metaclass__ = DataTypeSingleton
class DateType(AtomicType):
"""
Date data type. SQL DATE
"""
__metaclass__ = DataTypeSingleton
class TimeType(AtomicType):
"""
Time data type. SQL TIME
"""
__metaclass__ = DataTypeSingleton
class TimestampType(AtomicType):
"""
Timestamp data type. SQL TIMESTAMP
"""
__metaclass__ = DataTypeSingleton
class DataTypes(object):
"""
Utils for types
"""
STRING = StringType()
BOOLEAN = BooleanType()
BYTE = ByteType()
CHAR = CharType()
SHORT = ShortType()
INT = IntegerType()
LONG = LongType()
FLOAT = FloatType()
DOUBLE = DoubleType()
DATE = DateType()
TIME = TimeType()
TIMESTAMP = TimestampType()
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import sys
from threading import RLock
from pyflink.java_gateway import get_gateway
from pyflink.table.types import DataTypes
if sys.version > '3':
xrange = range
_data_types_mapping = None
_lock = RLock()
def to_java_type(py_type):
global _data_types_mapping
global _lock
if _data_types_mapping is None:
with _lock:
if _data_types_mapping is None:
gateway = get_gateway()
TYPES = gateway.jvm.org.apache.flink.api.common.typeinfo.Types
_data_types_mapping = {
DataTypes.STRING: TYPES.STRING,
DataTypes.BOOLEAN: TYPES.BOOLEAN,
DataTypes.BYTE: TYPES.BYTE,
DataTypes.CHAR: TYPES.CHAR,
DataTypes.SHORT: TYPES.SHORT,
DataTypes.INT: TYPES.INT,
DataTypes.LONG: TYPES.LONG,
DataTypes.FLOAT: TYPES.FLOAT,
DataTypes.DOUBLE: TYPES.DOUBLE,
DataTypes.DATE: TYPES.SQL_DATE,
DataTypes.TIME: TYPES.SQL_TIME,
DataTypes.TIMESTAMP: TYPES.SQL_TIMESTAMP
}
return _data_types_mapping[py_type]
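For example (a sketch; requires a reachable JVM gateway):

from pyflink.table.types import DataTypes
from pyflink.util.type_utils import to_java_type

j_int = to_java_type(DataTypes.INT)  # org.apache.flink.api.common.typeinfo.Types.INT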
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.java_gateway import get_gateway
def to_jarray(j_type, arr):
"""
Convert a Python list to a Java array.
:param j_type: the Java type of the array elements
:param arr: the Python list to convert
:return: a Java array containing the elements of arr
"""
gateway = get_gateway()
j_arr = gateway.new_array(j_type, len(arr))
for i in range(0, len(arr)):
j_arr[i] = arr[i]
return j_arr
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
__version__ = "0.1.0"
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from __future__ import print_function
import io
import os
import sys
from setuptools import setup
if sys.version_info < (2, 7):
print("Python versions prior to 2.7 are not supported for PyFlink.",
file=sys.stderr)
sys.exit(-1)
this_directory = os.path.abspath(os.path.dirname(__file__))
version_file = os.path.join(this_directory, 'pyflink/version.py')
try:
exec(open(version_file).read())
except IOError:
print("Failed to load PyFlink version file for packaging. " +
"'%s' not found!" % version_file,
file=sys.stderr)
sys.exit(-1)
VERSION = __version__ # noqa
with io.open(os.path.join(this_directory, 'README.md'), 'r', encoding='utf-8') as f:
long_description = f.read()
setup(
name='pyflink',
version=VERSION,
packages=['pyflink',
'pyflink.table',
'pyflink.util'],
url='http://flink.apache.org',
license='http://www.apache.org/licenses/LICENSE-2.0',
author='Flink Developers',
author_email='dev@flink.apache.org',
install_requires=['py4j==0.10.8.1'],
description='Apache Flink Python API',
long_description=long_description,
long_description_content_type='text/markdown',
classifiers=[
'Development Status :: 1 - Planning',
'License :: OSI Approved :: Apache Software License',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.7']
)
......@@ -85,6 +85,7 @@ under the License.
<module>flink-yarn-tests</module>
<module>flink-fs-tests</module>
<module>flink-docs</module>
<module>flink-python</module>
</modules>
<properties>
......@@ -1411,6 +1412,9 @@ under the License.
<exclude>flink-jepsen/store/**</exclude>
<exclude>flink-jepsen/docker/id_rsa*</exclude>
<exclude>flink-jepsen/docker/nodes</exclude>
<!-- Py4j -->
<exclude>flink-python/lib/py4j-LICENSE.txt</exclude>
</excludes>
</configuration>
</plugin>
......
......@@ -84,8 +84,9 @@ block_infected=0
# a) are not deployed during a release
# b) exist only for dev purposes
# c) no-one should depend on them
# Exclude flink-python because there are currently two flink-python modules; the logic below goes wrong in that situation.
e2e_modules=$(find flink-end-to-end-tests -mindepth 2 -maxdepth 5 -name 'pom.xml' -printf '%h\n' | sort -u | tr '\n' ',')
excluded_modules=\!${e2e_modules//,/,\!},!flink-docs
excluded_modules=\!${e2e_modules//,/,\!},!flink-docs,!flink-python,!flink-libraries/flink-python
echo "Analyzing modules for Scala dependencies using 'mvn dependency:tree'."
echo "If you haven't built the project, please do so first by running \"mvn clean install -DskipTests\""
......