Unverified commit 8ab2f932, authored by Chesnay Schepler and committed by GitHub

[FLINK-12903][py] Remove old Python APIs

Parent 9ef4b74b
......@@ -5789,17 +5789,6 @@ Copyright 2018 The Apache Software Foundation
flink-cep
Copyright 2014-2019 The Apache Software Foundation
flink-streaming-python
Copyright 2014-2019 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Python Software Foundation License. (https://opensource.org/licenses/PythonSoftFoundation.php)
See bundled license files for details.
- org.python:jython-standalone:2.7.1
flink-metrics-graphite
Copyright 2014-2019 The Apache Software Foundation
......@@ -5901,7 +5890,3 @@ Copyright 2006-2019 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
flink-python
Copyright 2014-2019 The Apache Software Foundation
......@@ -91,13 +91,6 @@ under the License.
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-python_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-core</artifactId>
......@@ -271,13 +264,6 @@ under the License.
<!-- start optional Flink libraries -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-python_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.binary.version}</artifactId>
......@@ -371,7 +357,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-python</artifactId>
<artifactId>flink-python_${scala.binary.version}</artifactId>
<classifier>java-binding</classifier>
<version>${project.version}</version>
<scope>provided</scope>
......
......@@ -234,20 +234,6 @@ under the License.
<include>flink-gelly-examples_${scala.binary.version}-${project.version}.jar</include>
</includes>
</fileSet>
<!-- copy python example to examples of dist -->
<fileSet>
<directory>../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example</directory>
<outputDirectory>examples/python/batch</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>
<!-- copy python streaming example to examples of dist -->
<fileSet>
<directory>../flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/examples</directory>
<outputDirectory>examples/python/streaming</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>
<!-- copy python table example to examples of dist -->
<fileSet>
......
......@@ -169,25 +169,9 @@
<fileMode>0644</fileMode>
</file>
<!-- Batch Python API -->
<file>
<source>../flink-libraries/flink-python/target/flink-python_${scala.binary.version}-${project.version}.jar</source>
<outputDirectory>opt</outputDirectory>
<destName>flink-python_${scala.binary.version}-${project.version}.jar</destName>
<fileMode>0644</fileMode>
</file>
<!-- Streaming Python API -->
<file>
<source>../flink-libraries/flink-streaming-python/target/flink-streaming-python_${scala.binary.version}-${project.version}.jar</source>
<outputDirectory>opt</outputDirectory>
<destName>flink-streaming-python_${scala.binary.version}-${project.version}.jar</destName>
<fileMode>0644</fileMode>
</file>
<!-- Python -->
<file>
<source>../flink-python/target/flink-python-${project.version}-java-binding.jar</source>
<source>../flink-python/target/flink-python_${scala.binary.version}-${project.version}-java-binding.jar</source>
<outputDirectory>opt</outputDirectory>
<destName>flink-python-${project.version}-java-binding.jar</destName>
<fileMode>0644</fileMode>
......
#!/bin/bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/config.sh
"$FLINK_BIN_DIR"/flink run --class org.apache.flink.streaming.python.api.PythonStreamBinder -v "$FLINK_HOME"/opt/flink-streaming-python*.jar "$@"
::###############################################################################
:: Licensed to the Apache Software Foundation (ASF) under one
:: or more contributor license agreements. See the NOTICE file
:: distributed with this work for additional information
:: regarding copyright ownership. The ASF licenses this file
:: to you under the Apache License, Version 2.0 (the
:: "License"); you may not use this file except in compliance
:: with the License. You may obtain a copy of the License at
::
:: http://www.apache.org/licenses/LICENSE-2.0
::
:: Unless required by applicable law or agreed to in writing, software
:: distributed under the License is distributed on an "AS IS" BASIS,
:: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
:: See the License for the specific language governing permissions and
:: limitations under the License.
::###############################################################################
@echo off
setlocal EnableDelayedExpansion
SET bin=%~dp0
SET FLINK_HOME=%bin%..
"%FLINK_HOME%\bin\flink" run -v "%FLINK_HOME%"\opt\flink-python_*.jar %*
#!/bin/bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/config.sh
"$FLINK_BIN_DIR"/flink run -v "$FLINK_HOME"/opt/flink-python_*.jar "$@"
......@@ -50,8 +50,6 @@ echo "Flink distribution directory: $FLINK_DIR"
run_test "State Migration end-to-end test from 1.6" "$END_TO_END_DIR/test-scripts/test_state_migration.sh"
run_test "State Evolution end-to-end test" "$END_TO_END_DIR/test-scripts/test_state_evolution.sh"
run_test "Batch Python Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_python_wordcount.sh"
run_test "Streaming Python Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_python_wordcount.sh"
run_test "Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_wordcount.sh file"
run_test "Shaded Hadoop S3A end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_wordcount.sh hadoop"
run_test "Shaded Presto S3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_wordcount.sh presto"
......
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
source "$(dirname "$0")"/common.sh
start_cluster
$FLINK_DIR/bin/pyflink.sh $FLINK_DIR/examples/python/batch/WordCount.py - $TEST_INFRA_DIR/test-data/words $TEST_DATA_DIR/out/py_wc_out
check_result_hash "BatchPythonWordCount" $TEST_DATA_DIR/out/py_wc_out "dd9d7a7bbc8b52747c7d4e15c9d2b069"
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
source "$(dirname "$0")"/common.sh
start_cluster
$FLINK_DIR/bin/pyflink-stream.sh $FLINK_DIR/examples/python/streaming/word_count.py - --input $TEST_INFRA_DIR/test-data/words --output $TEST_DATA_DIR/out/py_wc_out
check_result_hash "StreamingPythonWordCount" $TEST_DATA_DIR/out/py_wc_out "f6d079fb475e8b745caa6c45b1ac1dff"
\ No newline at end of file
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-libraries</artifactId>
<version>1.9-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>flink-python_${scala.binary.version}</artifactId>
<name>flink-python</name>
<packaging>jar</packaging>
<build>
<resources>
<resource>
<!-- include the zip generated by the assembly-plugin in the jar as a resource -->
<directory>target</directory>
<includes>
<include>python-source.zip</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<!-- generate zip containing the flink python library -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/assembly/python.xml</descriptor>
</descriptors>
<finalName>python-source</finalName>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
<executions>
<execution>
<phase>generate-resources</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade-flink</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.flink.python.api.PythonPlanBinder</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>generate-config-docs</id>
<activation>
<property>
<name>generate-config-docs</name>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.7</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
<configuration>
<target>
<mkdir dir="${rootDir}/${generated.docs.dir}"/>
<java classname="org.apache.flink.docs.configuration.ConfigOptionsDocGenerator" fork="true">
<classpath refid="maven.compile.classpath" />
<arg value="${rootDir}/${generated.docs.dir}/" />
<!--package with configuration classes-->
<arg value="org.apache.flink.python.api" />
</java>
</target>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<dependencies>
<!-- core dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id>python</id>
<formats>
<format>zip</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/python/org/apache/flink/python/api/flink</directory>
<outputDirectory>flink</outputDirectory>
</fileSet>
</fileSets>
</assembly>
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.flink.python.api;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
/**
* Generic container for all information required to apply an operation to the DataSet API.
*/
public class PythonOperationInfo {
public final String identifier;
public final int parentID; //DataSet that an operation is applied on
public final int otherID; //secondary DataSet
public final int setID; //ID for new DataSet
public final List<String> keys;
public final List<String> keys1; //join/cogroup keys
public final List<String> keys2; //join/cogroup keys
public final TypeInformation<?> types; //typeinformation about output type
public final List<Object> values;
public final int count;
public final String field;
public final Order order;
public final String path;
public final String fieldDelimiter;
public final String lineDelimiter;
public final long frm;
public final long to;
public final WriteMode writeMode;
public final boolean toError;
public final String name;
public final boolean usesUDF;
public final int parallelism;
public final int envID;
public PythonOperationInfo(PythonPlanStreamer streamer, int environmentID) throws IOException {
identifier = (String) streamer.getRecord();
parentID = (Integer) streamer.getRecord(true);
otherID = (Integer) streamer.getRecord(true);
field = "f0.f" + (Integer) streamer.getRecord(true);
int encodedOrder = (Integer) streamer.getRecord(true);
switch (encodedOrder) {
case 0:
order = Order.NONE;
break;
case 1:
order = Order.ASCENDING;
break;
case 2:
order = Order.DESCENDING;
break;
case 3:
order = Order.ANY;
break;
default:
order = Order.NONE;
break;
}
keys = Collections.unmodifiableList(Arrays.asList(normalizeKeys(streamer.getRecord(true))));
keys1 = Collections.unmodifiableList(Arrays.asList(normalizeKeys(streamer.getRecord(true))));
keys2 = Collections.unmodifiableList(Arrays.asList(normalizeKeys(streamer.getRecord(true))));
Object tmpType = streamer.getRecord();
types = tmpType == null ? null : getForObject(tmpType);
usesUDF = (Boolean) streamer.getRecord();
name = (String) streamer.getRecord();
lineDelimiter = (String) streamer.getRecord();
fieldDelimiter = (String) streamer.getRecord();
writeMode = ((Integer) streamer.getRecord(true)) == 1
? WriteMode.OVERWRITE
: WriteMode.NO_OVERWRITE;
path = (String) streamer.getRecord();
frm = (Long) streamer.getRecord();
to = (Long) streamer.getRecord();
setID = (Integer) streamer.getRecord(true);
toError = (Boolean) streamer.getRecord();
count = (Integer) streamer.getRecord(true);
int valueCount = (Integer) streamer.getRecord(true);
List<Object> valueList = new ArrayList<>(valueCount);
for (int x = 0; x < valueCount; x++) {
valueList.add(streamer.getRecord());
}
values = valueList;
parallelism = (Integer) streamer.getRecord(true);
envID = environmentID;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("SetID: ").append(setID).append("\n");
sb.append("ParentID: ").append(parentID).append("\n");
sb.append("OtherID: ").append(otherID).append("\n");
sb.append("Name: ").append(name).append("\n");
sb.append("Types: ").append(types).append("\n");
sb.append("Keys1: ").append(keys1).append("\n");
sb.append("Keys2: ").append(keys2).append("\n");
sb.append("Keys: ").append(keys).append("\n");
sb.append("Count: ").append(count).append("\n");
sb.append("Field: ").append(field).append("\n");
sb.append("Order: ").append(order.toString()).append("\n");
sb.append("Path: ").append(path).append("\n");
sb.append("FieldDelimiter: ").append(fieldDelimiter).append("\n");
sb.append("LineDelimiter: ").append(lineDelimiter).append("\n");
sb.append("From: ").append(frm).append("\n");
sb.append("To: ").append(to).append("\n");
sb.append("WriteMode: ").append(writeMode).append("\n");
sb.append("toError: ").append(toError).append("\n");
return sb.toString();
}
enum DatasizeHint {
NONE,
TINY,
HUGE
}
//====Utility=======================================================================================================
private static String[] normalizeKeys(Object keys) {
if (keys instanceof Tuple) {
Tuple tupleKeys = (Tuple) keys;
if (tupleKeys.getArity() == 0) {
return new String[0];
}
if (tupleKeys.getField(0) instanceof Integer) {
String[] stringKeys = new String[tupleKeys.getArity()];
for (int x = 0; x < stringKeys.length; x++) {
stringKeys[x] = "f0.f" + (Integer) tupleKeys.getField(x);
}
return stringKeys;
}
if (tupleKeys.getField(0) instanceof String) {
return tupleToStringArray(tupleKeys);
}
throw new RuntimeException("Key argument contains field that is neither an int nor a String: " + tupleKeys);
}
if (keys instanceof int[]) {
int[] intKeys = (int[]) keys;
String[] stringKeys = new String[intKeys.length];
for (int x = 0; x < stringKeys.length; x++) {
stringKeys[x] = "f0.f" + intKeys[x];
}
return stringKeys;
}
throw new RuntimeException("Key argument is neither an int[] nor a Tuple: " + keys.toString());
}
private static String[] tupleToStringArray(Tuple tuple) {
String[] keys = new String[tuple.getArity()];
for (int y = 0; y < tuple.getArity(); y++) {
keys[y] = (String) tuple.getField(y);
}
return keys;
}
}
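For illustration, a self-contained sketch of the key normalization performed by normalizeKeys above: integer key indices are rewritten to "f0.fN" field expressions addressing the serialized key tuple nested at position f0. The class name and main driver below are added only for this example.
public class NormalizeKeysSketch {
	public static void main(String[] args) {
		// Mirrors the int[] branch of normalizeKeys shown above.
		int[] intKeys = {0, 2};
		String[] stringKeys = new String[intKeys.length];
		for (int x = 0; x < stringKeys.length; x++) {
			stringKeys[x] = "f0.f" + intKeys[x];
		}
		System.out.println(String.join(", ", stringKeys)); // prints: f0.f0, f0.f2
	}
}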
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.python.api;
import org.apache.flink.configuration.ConfigOption;
import static org.apache.flink.configuration.ConfigOptions.key;
/**
* Configuration options for the Python API.
*/
public class PythonOptions {
/**
* The config parameter defining the path to the python binary to use.
*/
public static final ConfigOption<String> PYTHON_BINARY_PATH =
key("python.binary.path")
.defaultValue("python")
.withDeprecatedKeys("python.binary.python2", "python.binary.python3");
/**
* The config parameter defining the size of the memory-mapped files, in kb.
* This value must be large enough to ensure that the largest serialized record can be written completely into
* the file.
*
* <p>Every task will allocate 2 memory-files, each with this size.
*/
public static final ConfigOption<Long> MMAP_FILE_SIZE =
key("python.mmap.size.kb")
.defaultValue(4L);
/**
* The config parameter defining where temporary plan-related files are stored on the client.
*/
public static final ConfigOption<String> PLAN_TMP_DIR =
key("python.plan.tmp.dir")
.noDefaultValue();
/**
* The config parameter defining where the memory-mapped files will be created.
*/
public static final ConfigOption<String> DATA_TMP_DIR =
key("python.mmap.tmp.dir")
.noDefaultValue();
private PythonOptions() {
}
}
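A minimal usage sketch for these options, assuming a plain Flink Configuration and illustrative values; note that MMAP_FILE_SIZE is given in KB and shifted into bytes by the sender/receiver classes later in this commit.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.api.PythonOptions;

public class PythonOptionsSketch {
	public static void main(String[] args) {
		Configuration config = new Configuration();
		config.setString(PythonOptions.PYTHON_BINARY_PATH, "/usr/bin/python3"); // illustrative path
		config.setLong(PythonOptions.MMAP_FILE_SIZE, 8L); // 8 KB per memory-mapped file
		// PythonSender/PythonReceiver convert KB to bytes with a left shift:
		long mappedFileSizeBytes = config.getLong(PythonOptions.MMAP_FILE_SIZE) << 10;
		System.out.println(mappedFileSizeBytes); // prints: 8192
	}
}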
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.flink.python.api.functions;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.api.streaming.data.PythonDualInputStreamer;
import org.apache.flink.util.Collector;
import java.io.IOException;
/**
* CoGroupFunction that uses a python script.
*
* @param <IN1>
* @param <IN2>
* @param <OUT>
*/
public class PythonCoGroup<IN1, IN2, OUT> extends RichCoGroupFunction<IN1, IN2, OUT> implements ResultTypeQueryable<OUT> {
private static final long serialVersionUID = -3997396583317513873L;
private final PythonDualInputStreamer<IN1, IN2, OUT> streamer;
private final transient TypeInformation<OUT> typeInformation;
public PythonCoGroup(Configuration config, int envID, int setID, TypeInformation<OUT> typeInformation) {
this.typeInformation = typeInformation;
streamer = new PythonDualInputStreamer<>(this, config, envID, setID, typeInformation instanceof PrimitiveArrayTypeInfo);
}
/**
* Opens this function.
*
* @param config configuration
* @throws IOException
*/
@Override
public void open(Configuration config) throws IOException {
streamer.open();
streamer.sendBroadCastVariables(config);
}
/**
* Calls the external python function.
*
* @param first
* @param second
* @param out collector
* @throws IOException
*/
@Override
public final void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<OUT> out) throws Exception {
streamer.streamBufferWithGroups(first.iterator(), second.iterator(), out);
}
/**
* Closes this function.
*
* @throws IOException
*/
@Override
public void close() throws IOException {
streamer.close();
}
@Override
public TypeInformation<OUT> getProducedType() {
return typeInformation;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.flink.python.api.functions;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.api.streaming.data.PythonSingleInputStreamer;
import org.apache.flink.util.Collector;
import java.io.IOException;
/**
* Multi-purpose class, usable by all operations that use a Python script with one input source and possibly
* differing input/output types.
*
* @param <IN>
* @param <OUT>
*/
public class PythonMapPartition<IN, OUT> extends RichMapPartitionFunction<IN, OUT> implements ResultTypeQueryable<OUT> {
private static final long serialVersionUID = 3866306483023916413L;
private final PythonSingleInputStreamer<IN, OUT> streamer;
private final transient TypeInformation<OUT> typeInformation;
public PythonMapPartition(Configuration config, int envId, int setId, TypeInformation<OUT> typeInformation) {
this.typeInformation = typeInformation;
streamer = new PythonSingleInputStreamer<>(this, config, envId, setId, typeInformation instanceof PrimitiveArrayTypeInfo);
}
/**
* Opens this function.
*
* @param config configuration
* @throws IOException
*/
@Override
public void open(Configuration config) throws IOException {
streamer.open();
streamer.sendBroadCastVariables(config);
}
@Override
public void mapPartition(Iterable<IN> values, Collector<OUT> out) throws Exception {
streamer.streamBufferWithoutGroups(values.iterator(), out);
}
/**
* Closes this function.
*
* @throws IOException
*/
@Override
public void close() throws IOException {
streamer.close();
}
@Override
public TypeInformation<OUT> getProducedType() {
return typeInformation;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.flink.python.api.functions.util;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.util.Collector;
/**
* Utility function to group and sort data.
* @param <IN> input type
*/
@ForwardedFields("*->*")
public class IdentityGroupReduce<IN> implements GroupReduceFunction<IN, IN> {
@Override
public final void reduce(Iterable<IN> values, Collector<IN> out) throws Exception {
for (IN value : values) {
out.collect(value);
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.flink.python.api.functions.util;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple2;
/**
* Utility function to extract the value from a Key-Value Tuple.
*/
@ForwardedFields("f1->*")
public class KeyDiscarder<T> implements MapFunction<Tuple2<T, byte[]>, byte[]> {
@Override
public byte[] map(Tuple2<T, byte[]> value) throws Exception {
return value.f1;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.flink.python.api.functions.util;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
/**
* Utility function to extract values from 2 Key-Value Tuples after a DefaultJoin.
* @param <IN> input type
*/
@ForwardedFields("f0.f1->f0; f1.f1->f1")
public class NestedKeyDiscarder<IN> implements MapFunction<IN, Tuple2<byte[], byte[]>> {
@Override
@SuppressWarnings("unchecked")
public Tuple2<byte[], byte[]> map(IN value) throws Exception {
Tuple2<Tuple2<Tuple, byte[]>, Tuple2<Tuple, byte[]>> x = (Tuple2<Tuple2<Tuple, byte[]>, Tuple2<Tuple, byte[]>>) value;
return new Tuple2<>(x.f0.f1, x.f1.f1);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.flink.python.api.functions.util;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.python.api.streaming.util.SerializationUtils;
import org.apache.flink.python.api.streaming.util.SerializationUtils.Serializer;
/**
* Utility function to serialize values, usually directly from data sources.
*/
public class SerializerMap<IN> implements MapFunction<IN, byte[]> {
private transient Serializer<IN> serializer;
@Override
@SuppressWarnings("unchecked")
public byte[] map(IN value) throws Exception {
if (serializer == null) {
serializer = SerializationUtils.getSerializer(value);
}
return serializer.serialize(value);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.flink.python.api.functions.util;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.ConfigConstants;
/**
* Utility function to deserialize strings, used for non-CSV sinks.
*/
public class StringDeserializerMap implements MapFunction<byte[], String> {
@Override
public String map(byte[] value) throws Exception {
//discard type byte and size
return new String(value, 5, value.length - 5, ConfigConstants.DEFAULT_CHARSET);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.flink.python.api.functions.util;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.ConfigConstants;
/**
* Utility function to deserialize strings, used for CSV sinks.
*/
public class StringTupleDeserializerMap implements MapFunction<byte[], Tuple1<String>> {
@Override
public Tuple1<String> map(byte[] value) throws Exception {
//5 = string type byte + string size
return new Tuple1<>(new String(value, 5, value.length - 5, ConfigConstants.DEFAULT_CHARSET));
}
}
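Both deserializer maps above strip a 5-byte header (one type byte plus a 4-byte size) before decoding the UTF-8 payload (ConfigConstants.DEFAULT_CHARSET is UTF-8). Below is a self-contained sketch of that framing; the type byte value is a placeholder, since the real one is written by the Python-side serializer, which is not part of this diff.
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class StringFrameSketch {
	public static void main(String[] args) {
		byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
		// Assumed frame layout: [type: 1 byte][size: 4 bytes][payload]
		ByteBuffer frame = ByteBuffer.allocate(5 + payload.length);
		frame.put((byte) 0); // placeholder type byte
		frame.putInt(payload.length);
		frame.put(payload);
		byte[] value = frame.array();
		// Same decoding as StringDeserializerMap.map: skip the 5 header bytes.
		System.out.println(new String(value, 5, value.length - 5, StandardCharsets.UTF_8)); // prints: hello
	}
}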
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.python.api.streaming.data;
import org.apache.flink.configuration.Configuration;
import java.io.IOException;
/**
* This class is a {@link PythonSender} for operations with two input streams.
*
* @param <IN1> first input type
* @param <IN2> second input type
*/
public class PythonDualInputSender<IN1, IN2> extends PythonSender {
private static final long serialVersionUID = 614115041181108878L;
private transient Serializer<IN1> serializer1;
private transient Serializer<IN2> serializer2;
protected PythonDualInputSender(Configuration config) {
super(config);
}
/**
* Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values
* in the iterator are of the same type. This method does NOT take care of synchronization. The caller must
* guarantee that the file may be written to before calling this method.
*
* @param input iterator containing records
* @return size of the written buffer
* @throws IOException
*/
public int sendBuffer1(SingleElementPushBackIterator<IN1> input) throws IOException {
if (serializer1 == null) {
IN1 value = input.next();
serializer1 = getSerializer(value);
input.pushBack(value);
}
return sendBuffer(input, serializer1);
}
/**
* Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values
* in the iterator are of the same type. This method does NOT take care of synchronization. The caller must
* guarantee that the file may be written to before calling this method.
*
* @param input iterator containing records
* @return size of the written buffer
* @throws IOException
*/
public int sendBuffer2(SingleElementPushBackIterator<IN2> input) throws IOException {
if (serializer2 == null) {
IN2 value = input.next();
serializer2 = getSerializer(value);
input.pushBack(value);
}
return sendBuffer(input, serializer2);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.python.api.streaming.data;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import java.net.SocketTimeoutException;
import java.util.Iterator;
/**
* This class is a {@link PythonStreamer} for operations with two input streams.
*
* @param <IN1> first input type
* @param <IN2> second input type
* @param <OUT> output type
*/
public class PythonDualInputStreamer<IN1, IN2, OUT> extends PythonStreamer<PythonDualInputSender<IN1, IN2>, OUT> {
private static final long serialVersionUID = -607175070491761873L;
public PythonDualInputStreamer(AbstractRichFunction function, Configuration config, int envID, int setID, boolean usesByteArray) {
super(function, config, envID, setID, usesByteArray, new PythonDualInputSender<IN1, IN2>(config));
}
/**
* Sends all values contained in both iterators to the external process and collects all results.
*
* @param iterator1 first input stream
* @param iterator2 second input stream
* @param c collector
*/
public final void streamBufferWithGroups(Iterator<IN1> iterator1, Iterator<IN2> iterator2, Collector<OUT> c) {
SingleElementPushBackIterator<IN1> i1 = new SingleElementPushBackIterator<>(iterator1);
SingleElementPushBackIterator<IN2> i2 = new SingleElementPushBackIterator<>(iterator2);
try {
int size;
if (i1.hasNext() || i2.hasNext()) {
while (true) {
int sig = in.readInt();
switch (sig) {
case SIGNAL_BUFFER_REQUEST_G0:
if (i1.hasNext()) {
size = sender.sendBuffer1(i1);
sendWriteNotification(size, i1.hasNext());
}
break;
case SIGNAL_BUFFER_REQUEST_G1:
if (i2.hasNext()) {
size = sender.sendBuffer2(i2);
sendWriteNotification(size, i2.hasNext());
}
break;
case SIGNAL_FINISHED:
return;
case SIGNAL_ERROR:
try {
outPrinter.join();
} catch (InterruptedException e) {
outPrinter.interrupt();
}
try {
errorPrinter.join();
} catch (InterruptedException e) {
errorPrinter.interrupt();
}
throw new RuntimeException(
"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
default:
receiver.collectBuffer(c, sig);
sendReadConfirmation();
break;
}
}
}
} catch (SocketTimeoutException ignored) {
throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
} catch (Exception e) {
throw new RuntimeException("Critical failure for task " + function.getRuntimeContext().getTaskName() + ". " + msg.get(), e);
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.flink.python.api.streaming.data;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.api.PythonOptions;
import org.apache.flink.util.Collector;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
/**
* This class is used to read data from memory-mapped files.
*/
public class PythonReceiver<OUT> implements Serializable {
private static final long serialVersionUID = -2474088929850009968L;
private transient RandomAccessFile inputRAF;
private transient FileChannel inputChannel;
private transient MappedByteBuffer fileBuffer;
private final long mappedFileSizeBytes;
private final boolean readAsByteArray;
private transient Deserializer<OUT> deserializer;
public PythonReceiver(Configuration config, boolean usesByteArray) {
readAsByteArray = usesByteArray;
mappedFileSizeBytes = config.getLong(PythonOptions.MMAP_FILE_SIZE) << 10;
}
//=====Setup========================================================================================================
@SuppressWarnings("unchecked")
public void open(File inputFile) throws IOException {
deserializer = (Deserializer<OUT>) (readAsByteArray ? new ByteArrayDeserializer() : new TupleDeserializer());
inputFile.getParentFile().mkdirs();
if (inputFile.exists()) {
inputFile.delete();
}
inputFile.createNewFile();
inputRAF = new RandomAccessFile(inputFile, "rw");
inputRAF.setLength(mappedFileSizeBytes);
inputRAF.seek(mappedFileSizeBytes - 1);
inputRAF.writeByte(0);
inputRAF.seek(0);
inputChannel = inputRAF.getChannel();
fileBuffer = inputChannel.map(FileChannel.MapMode.READ_WRITE, 0, mappedFileSizeBytes);
}
public void close() throws IOException {
closeMappedFile();
}
private void closeMappedFile() throws IOException {
inputChannel.close();
inputRAF.close();
}
//=====IO===========================================================================================================
/**
* Reads a buffer of the given size from the memory-mapped file, and collects all records contained. This method
* assumes that all values in the buffer are of the same type. This method does NOT take care of synchronization.
* The user must guarantee that the buffer was completely written before calling this method.
*
* @param c Collector to collect records
* @param bufferSize size of the buffer
* @throws IOException
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public void collectBuffer(Collector<OUT> c, int bufferSize) throws IOException {
fileBuffer.position(0);
while (fileBuffer.position() < bufferSize) {
c.collect(deserializer.deserialize());
}
}
//=====Deserializer=================================================================================================
private interface Deserializer<T> {
T deserialize();
}
private class ByteArrayDeserializer implements Deserializer<byte[]> {
@Override
public byte[] deserialize() {
int size = fileBuffer.getInt();
byte[] value = new byte[size];
fileBuffer.get(value);
return value;
}
}
private class TupleDeserializer implements Deserializer<Tuple2<Tuple, byte[]>> {
@Override
public Tuple2<Tuple, byte[]> deserialize() {
int keyTupleSize = fileBuffer.get();
Tuple keys = createTuple(keyTupleSize);
for (int x = 0; x < keyTupleSize; x++) {
byte[] data = new byte[fileBuffer.getInt()];
fileBuffer.get(data);
keys.setField(data, x);
}
byte[] value = new byte[fileBuffer.getInt()];
fileBuffer.get(value);
return new Tuple2<>(keys, value);
}
}
public static Tuple createTuple(int size) {
try {
return Tuple.getTupleClass(size).newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
}
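To make the TupleDeserializer wire format concrete, here is a standalone sketch that builds and decodes one key-value frame in a plain ByteBuffer instead of the memory-mapped file; the layout is taken from the code above: [key arity: 1 byte], then per key field [size: int][bytes], then [value size: int][bytes].
import java.nio.ByteBuffer;

public class KeyValueFrameSketch {
	public static void main(String[] args) {
		byte[] key = {1, 2};
		byte[] value = {9};
		ByteBuffer buf = ByteBuffer.allocate(1 + 4 + key.length + 4 + value.length);
		buf.put((byte) 1);        // key tuple arity
		buf.putInt(key.length);   // size of key field 0
		buf.put(key);
		buf.putInt(value.length); // size of the value
		buf.put(value);
		buf.flip();
		// Decode, mirroring TupleDeserializer.deserialize:
		int keyTupleSize = buf.get();
		for (int x = 0; x < keyTupleSize; x++) {
			byte[] data = new byte[buf.getInt()];
			buf.get(data); // key field x
		}
		byte[] out = new byte[buf.getInt()];
		buf.get(out);
		System.out.println(out[0]); // prints: 9
	}
}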
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.flink.python.api.streaming.data;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.api.PythonOptions;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
/**
* General-purpose class to write data to memory-mapped files.
*/
public abstract class PythonSender implements Serializable {
private static final long serialVersionUID = -2004095650353962110L;
public static final byte TYPE_ARRAY = 63;
public static final byte TYPE_KEY_VALUE = 62;
public static final byte TYPE_VALUE_VALUE = 61;
private transient RandomAccessFile outputRAF;
private transient FileChannel outputChannel;
private transient MappedByteBuffer fileBuffer;
private final long mappedFileSizeBytes;
private final Configuration config;
protected PythonSender(Configuration config) {
this.config = config;
mappedFileSizeBytes = config.getLong(PythonOptions.MMAP_FILE_SIZE) << 10;
}
//=====Setup========================================================================================================
public void open(File outputFile) throws IOException {
outputFile.mkdirs();
if (outputFile.exists()) {
outputFile.delete();
}
outputFile.createNewFile();
outputRAF = new RandomAccessFile(outputFile, "rw");
outputRAF.setLength(mappedFileSizeBytes);
outputRAF.seek(mappedFileSizeBytes - 1);
outputRAF.writeByte(0);
outputRAF.seek(0);
outputChannel = outputRAF.getChannel();
fileBuffer = outputChannel.map(FileChannel.MapMode.READ_WRITE, 0, mappedFileSizeBytes);
}
public void close() throws IOException {
closeMappedFile();
}
private void closeMappedFile() throws IOException {
outputChannel.close();
outputRAF.close();
}
//=====IO===========================================================================================================
/**
* Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values
* in the iterator are of the same type. This method does NOT take care of synchronization. The caller must
* guarantee that the file may be written to before calling this method.
*
* @param input iterator containing records
* @param serializer serializer for the input records
* @return size of the written buffer
* @throws IOException
*/
protected <IN> int sendBuffer(SingleElementPushBackIterator<IN> input, Serializer<IN> serializer) throws IOException {
fileBuffer.clear();
while (input.hasNext()) {
IN value = input.next();
ByteBuffer bb = serializer.serialize(value);
if (bb.remaining() > mappedFileSizeBytes) {
throw new RuntimeException("Serialized object does not fit into a single buffer.");
}
if (bb.remaining() <= fileBuffer.remaining()) {
fileBuffer.put(bb);
} else {
input.pushBack(value);
break;
}
}
int size = fileBuffer.position();
return size;
}
//=====Serializer===================================================================================================
@SuppressWarnings("unchecked")
protected <IN> Serializer<IN> getSerializer(IN value) {
if (value instanceof byte[]) {
return (Serializer<IN>) new ArraySerializer();
}
if (((Tuple2<?, ?>) value).f0 instanceof byte[]) {
return (Serializer<IN>) new ValuePairSerializer();
}
if (((Tuple2<?, ?>) value).f0 instanceof Tuple) {
return (Serializer<IN>) new KeyValuePairSerializer();
}
throw new IllegalArgumentException("This object can't be serialized: " + value);
}
/**
* Base class for all serializers used by {@link PythonSender} classes to write container objects.
*
* <p>These serializers must be kept in sync with the python counterparts.
*
* @param <T> input type
*/
protected abstract static class Serializer<T> {
protected ByteBuffer buffer;
/**
* Serializes the given value into a {@link ByteBuffer}.
*
* @param value value to serialize
* @return ByteBuffer containing serialized record
*/
public ByteBuffer serialize(T value) {
serializeInternal(value);
buffer.flip();
return buffer;
}
protected abstract void serializeInternal(T value);
}
private static class ArraySerializer extends Serializer<byte[]> {
@Override
public void serializeInternal(byte[] value) {
buffer = ByteBuffer.allocate(value.length + 1);
buffer.put(TYPE_ARRAY);
buffer.put(value);
}
}
private static class ValuePairSerializer extends Serializer<Tuple2<byte[], byte[]>> {
@Override
public void serializeInternal(Tuple2<byte[], byte[]> value) {
buffer = ByteBuffer.allocate(1 + value.f0.length + value.f1.length);
buffer.put(TYPE_VALUE_VALUE);
buffer.put(value.f0);
buffer.put(value.f1);
}
}
private static class KeyValuePairSerializer extends Serializer<Tuple2<Tuple, byte[]>> {
@Override
public void serializeInternal(Tuple2<Tuple, byte[]> value) {
int keySize = 0;
for (int x = 0; x < value.f0.getArity(); x++) {
keySize += ((byte[]) value.f0.getField(x)).length;
}
buffer = ByteBuffer.allocate(5 + keySize + value.f1.length);
buffer.put(TYPE_KEY_VALUE);
buffer.put((byte) value.f0.getArity());
for (int x = 0; x < value.f0.getArity(); x++) {
buffer.put((byte[]) value.f0.getField(x));
}
buffer.put(value.f1);
}
}
}
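As a quick illustration of the simplest frame written by the serializers above, this sketch reproduces ArraySerializer's output for a byte[] record: one type byte (TYPE_ARRAY = 63, as declared in PythonSender) followed by the raw payload.
import java.nio.ByteBuffer;

public class ArrayFrameSketch {
	public static void main(String[] args) {
		byte[] value = {1, 2, 3};
		// Mirrors ArraySerializer.serializeInternal.
		ByteBuffer buffer = ByteBuffer.allocate(value.length + 1);
		buffer.put((byte) 63); // PythonSender.TYPE_ARRAY
		buffer.put(value);
		buffer.flip();
		System.out.println(buffer.remaining()); // prints: 4 (1 type byte + 3 payload bytes)
	}
}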
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.python.api.streaming.data;
import org.apache.flink.configuration.Configuration;
import java.io.IOException;
/**
* This class is a {@link PythonSender} for operations with one input stream.
*
* @param <IN> input type
*/
public class PythonSingleInputSender<IN> extends PythonSender {
private static final long serialVersionUID = 614115041181108878L;
private transient Serializer<IN> serializer;
protected PythonSingleInputSender(Configuration config) {
super(config);
}
/**
* Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values
* in the iterator are of the same type. This method does NOT take care of synchronization. The caller must
* guarantee that the file may be written to before calling this method.
*
* @param input iterator containing records
* @return size of the written buffer
* @throws IOException
*/
public int sendBuffer(SingleElementPushBackIterator<IN> input) throws IOException {
if (serializer == null) {
IN value = input.next();
serializer = getSerializer(value);
input.pushBack(value);
}
return sendBuffer(input, serializer);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.python.api.streaming.data;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import java.net.SocketTimeoutException;
import java.util.Iterator;
/**
* This class is a {@link PythonStreamer} for operations with one input stream.
* @param <IN> input type
* @param <OUT> output type
*/
public class PythonSingleInputStreamer<IN, OUT> extends PythonStreamer<PythonSingleInputSender<IN>, OUT> {
private static final long serialVersionUID = -5149905918522069034L;
public PythonSingleInputStreamer(AbstractRichFunction function, Configuration config, int envID, int setID, boolean usesByteArray) {
super(function, config, envID, setID, usesByteArray, new PythonSingleInputSender<IN>(config));
}
/**
* Sends all values contained in the iterator to the external process and collects all results.
*
* @param iterator input stream
* @param c collector
*/
public final void streamBufferWithoutGroups(Iterator<IN> iterator, Collector<OUT> c) {
SingleElementPushBackIterator<IN> i = new SingleElementPushBackIterator<>(iterator);
try {
int size;
if (i.hasNext()) {
while (true) {
int sig = in.readInt();
switch (sig) {
case SIGNAL_BUFFER_REQUEST:
if (i.hasNext()) {
size = sender.sendBuffer(i);
sendWriteNotification(size, i.hasNext());
} else {
throw new RuntimeException("External process requested data even though none is available.");
}
break;
case SIGNAL_FINISHED:
return;
case SIGNAL_ERROR:
try {
outPrinter.join();
} catch (InterruptedException e) {
outPrinter.interrupt();
}
try {
errorPrinter.join();
} catch (InterruptedException e) {
errorPrinter.interrupt();
}
throw new RuntimeException(
"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
default:
receiver.collectBuffer(c, sig);
sendReadConfirmation();
break;
}
}
}
} catch (SocketTimeoutException ignored) {
throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg.get());
} catch (Exception e) {
throw new RuntimeException("Critical failure for task " + function.getRuntimeContext().getTaskName() + ". " + msg.get(), e);
}
}
}
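Both streamBufferWithGroups and streamBufferWithoutGroups implement the same request/response handshake: the external Python process either requests the next input buffer, reports an error, signals completion, or announces a result buffer whose size is the signal value itself. Below is a condensed simulation of that loop; the signal constants are placeholders, since the real values are defined in PythonStreamer, which is collapsed in this diff.
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

public class HandshakeSketch {
	// Placeholder signal values; the real constants live in PythonStreamer.
	static final int SIGNAL_BUFFER_REQUEST = 0;
	static final int SIGNAL_FINISHED = -1;

	public static void main(String[] args) {
		// Simulated signals from the external process: two input-buffer requests,
		// one result buffer of 42 bytes, then completion.
		Queue<Integer> signals = new ArrayDeque<>(Arrays.asList(
			SIGNAL_BUFFER_REQUEST, SIGNAL_BUFFER_REQUEST, 42, SIGNAL_FINISHED));
		while (true) {
			int sig = signals.remove();
			if (sig == SIGNAL_BUFFER_REQUEST) {
				System.out.println("write next input buffer to the mapped file, then send a write notification");
			} else if (sig == SIGNAL_FINISHED) {
				System.out.println("done");
				return;
			} else {
				// Any other value is treated as the size of a result buffer to collect.
				System.out.println("collect " + sig + " result bytes, then send a read confirmation");
			}
		}
	}
}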
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.python.api.streaming.data;
import org.apache.flink.util.Preconditions;
import java.util.Iterator;
/**
* This class is a wrapper for an {@link Iterator} that supports pushing back a single record.
*
* @param <IN> input type
*/
class SingleElementPushBackIterator<IN> {
private IN pushBack;
private final Iterator<IN> iterator;
SingleElementPushBackIterator(Iterator<IN> iterator) {
this.pushBack = null;
this.iterator = iterator;
}
public boolean hasNext() {
return pushBack != null || iterator.hasNext();
}
public IN next() {
if (pushBack != null) {
IN obj = pushBack;
pushBack = null;
return obj;
} else {
return iterator.next();
}
}
public void pushBack(IN element) {
Preconditions.checkState(pushBack == null, "Already contains an element that was pushed back. This indicates a programming error.");
pushBack = element;
}
}
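A minimal usage sketch of the push-back contract, mirroring how the senders above peek at the first record to pick a serializer and then push it back. The class is package-private, so this would live in the same package; the driver is illustrative.
import java.util.Arrays;
import java.util.Iterator;

public class PushBackSketch {
	public static void main(String[] args) {
		Iterator<String> base = Arrays.asList("a", "b").iterator();
		SingleElementPushBackIterator<String> it = new SingleElementPushBackIterator<>(base);
		String first = it.next(); // peek at the first element
		it.pushBack(first);       // make it available again; only one element may be pushed back
		System.out.println(it.next());    // prints: a
		System.out.println(it.next());    // prints: b
		System.out.println(it.hasNext()); // prints: false
	}
}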
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
\ No newline at end of file