diff --git a/docs/apis/batch/python.md b/docs/apis/batch/python.md
index 0f55124cc9a14095ef6eb815ae49616055383673..9158dfbde68b46dc8f05647b0f1e2e26d6e060a2 100644
--- a/docs/apis/batch/python.md
+++ b/docs/apis/batch/python.md
@@ -309,6 +309,16 @@ result = data1.cross(data2)

Produces the union of two data sets.

 {% highlight python %}
 data.union(data2)
+{% endhighlight %}
+
+    ZipWithIndex
+

+    Assigns consecutive indexes to each element. For more information, please refer to
+    the [Zip Elements Guide](zip_elements_guide.html#zip-with-a-dense-index).

+{% highlight python %}
+data.zip_with_index()
 {% endhighlight %}
diff --git a/docs/apis/batch/zip_elements_guide.md b/docs/apis/batch/zip_elements_guide.md
index 59f723ab90c555ba1483ca93e7a223a1011d5f78..e3e93b53eb29d3b155d33f2544ab3f14a5107516 100644
--- a/docs/apis/batch/zip_elements_guide.md
+++ b/docs/apis/batch/zip_elements_guide.md
@@ -34,7 +34,7 @@ This document shows how {% gh_link /flink-java/src/main/java/org/apache/flink/ap
 
 ### Zip with a Dense Index
 `zipWithIndex` assigns consecutive labels to the elements, receiving a data set as input and returning a new data set of `(unique id, initial value)` 2-tuples.
 This process requires two passes, first counting then labeling elements, and cannot be pipelined due to the synchronization of counts.
-The alternative `zipWIthUniqueId` works in a pipelined fashion and is preferred when a unique labeling is sufficient.
+The alternative `zipWithUniqueId` works in a pipelined fashion and is preferred when a unique labeling is sufficient.
 
 For example, the following code:
@@ -66,6 +66,21 @@ env.execute()
 {% endhighlight %}
+
+{% highlight python %}
+from flink.plan.Environment import get_environment
+
+env = get_environment()
+env.set_parallelism(2)
+input = env.from_elements("A", "B", "C", "D", "E", "F", "G", "H")
+
+result = input.zip_with_index()
+
+result.write_text(result_path)
+env.execute()
+{% endhighlight %}
+
+
 may yield the tuples: (0,G), (1,H), (2,A), (3,B), (4,C), (5,D), (6,E), (7,F)
@@ -74,7 +89,7 @@ may yield the tuples: (0,G), (1,H), (2,A), (3,B), (4,C), (5,D), (6,E), (7,F)
 
 ### Zip with a Unique Identifier
 In many cases one may not need to assign consecutive labels.
-`zipWIthUniqueId` works in a pipelined fashion, speeding up the label assignment process. This method receives a data set as input and returns a new data set of `(unique id, initial value)` 2-tuples.
+`zipWithUniqueId` works in a pipelined fashion, speeding up the label assignment process. This method receives a data set as input and returns a new data set of `(unique id, initial value)` 2-tuples.
 
 For example, the following code:
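As an aside (not part of this patch), the two-pass scheme the guide describes — count the elements of every partition, turn the counts into per-partition offsets, then label — can be illustrated in a few lines of plain Python; the function and data below are made up for the illustration only:

```python
# Illustrative only: the dense-index scheme described in the guide above,
# written as plain Python over in-memory "partitions".
def zip_with_dense_index(partitions):
    # pass 1: count the elements of each partition
    counts = [len(p) for p in partitions]
    # the offset of partition i is the sum of the counts of all earlier partitions
    offsets = [sum(counts[:i]) for i in range(len(partitions))]
    # pass 2: label each element with offset + local position
    return [(offsets[i] + pos, value)
            for i, p in enumerate(partitions)
            for pos, value in enumerate(p)]

# Two partitions, matching the parallelism-2 example above:
print(zip_with_dense_index([["G", "H"], ["A", "B", "C", "D", "E", "F"]]))
# [(0, 'G'), (1, 'H'), (2, 'A'), (3, 'B'), (4, 'C'), (5, 'D'), (6, 'E'), (7, 'F')]
```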
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index e67099e99c51bc92d87688e340bc478e8b38c9aa..10aded8f0fff55f566ed8c9a2f09c978767bf634 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -130,6 +130,7 @@ public class PythonStreamer implements Serializable {
 		processOutput.write("operator\n".getBytes());
 		processOutput.write(("" + server.getLocalPort() + "\n").getBytes());
 		processOutput.write((id + "\n").getBytes());
+		processOutput.write((this.function.getRuntimeContext().getIndexOfThisSubtask() + "\n").getBytes());
 		processOutput.write((inputFilePath + "\n").getBytes());
 		processOutput.write((outputFilePath + "\n").getBytes());
 		processOutput.flush();
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
index 4cb337ab4a2e288f1a442b769eef7be0ce1aa4ff..83f563bf523d98078c98649f591dc03ef690e3ce 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
@@ -25,13 +25,13 @@ class CoGroupFunction(Function.Function):
         self._keys1 = None
         self._keys2 = None
 
-    def _configure(self, input_file, output_file, port, env, info):
+    def _configure(self, input_file, output_file, port, env, info, subtask_index):
         self._connection = Connection.TwinBufferingTCPMappedFileConnection(input_file, output_file, port)
         self._iterator = Iterator.Iterator(self._connection, env, 0)
         self._iterator2 = Iterator.Iterator(self._connection, env, 1)
         self._cgiter = Iterator.CoGroupIterator(self._iterator, self._iterator2, self._keys1, self._keys2)
         self._collector = Collector.Collector(self._connection, env, info)
-        self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
+        self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector, subtask_index)
         if info.chained_info is not None:
             info.chained_info.operator._configure_chain(self.context, self._collector, info.chained_info)
             self._collector = info.chained_info.operator
@@ -53,4 +53,4 @@ class CoGroupFunction(Function.Function):
         collector._close()
 
     def co_group(self, iterator1, iterator2, collector):
-        pass
\ No newline at end of file
+        pass
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
index dfe6a283acf4c1d21a428a8d1627b9598e9c8b35..45a0f2ec7f9a4863e6d5a8bd517d8de34adf6173 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
@@ -32,11 +32,11 @@ class Function(object):
         self.context = None
         self._env = None
 
-    def _configure(self, input_file, output_file, port, env, info):
+    def _configure(self, input_file, output_file, port, env, info, subtask_index):
         self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
         self._iterator = Iterator.Iterator(self._connection, env)
         self._collector = Collector.Collector(self._connection, env, info)
-        self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
+        self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector, subtask_index)
         self._env = env
         if info.chained_info is not None:
             info.chained_info.operator._configure_chain(self.context, self._collector, info.chained_info)
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
index 8d1934cda97ae6d6f51ce4b4f0aed250e32a4766..77b53a22ae4f171c873a553bcf2d1d38bca675a6 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
@@ -25,8 +25,8 @@ class GroupReduceFunction(Function.Function):
     def __init__(self):
         super(GroupReduceFunction, self).__init__()
 
-    def _configure(self, input_file, output_file, port, env, info):
-        super(GroupReduceFunction, self)._configure(input_file, output_file, port, env, info)
+    def _configure(self, input_file, output_file, port, env, info, subtask_index):
+        super(GroupReduceFunction, self)._configure(input_file, output_file, port, env, info, subtask_index)
         if len(info.key1) == 0:
             self._run = self._run_all_group_reduce
         else:
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
index b1d22019270f66b39735cfdef859e2ca4181004f..08af276bfab73915da6eaaade96614e02e324ca2 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
@@ -24,8 +24,8 @@ class ReduceFunction(Function.Function):
     def __init__(self):
         super(ReduceFunction, self).__init__()
 
-    def _configure(self, input_file, output_file, port, env, info):
-        super(ReduceFunction, self)._configure(input_file, output_file, port, env, info)
+    def _configure(self, input_file, output_file, port, env, info, subtask_index):
+        super(ReduceFunction, self)._configure(input_file, output_file, port, env, info, subtask_index)
         if len(info.key1) == 0:
             self._run = self._run_all_reduce
         else:
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/RuntimeContext.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/RuntimeContext.py
index 2977eb561268fbc6fe5246cb00c9117a25a8493c..04608d4a0b0a0c34f87c7ba7ad4977c30927bdb7 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/RuntimeContext.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/RuntimeContext.py
@@ -18,13 +18,17 @@ class RuntimeContext(object):
-    def __init__(self, iterator, collector):
+    def __init__(self, iterator, collector, subtask_index):
         self.iterator = iterator
         self.collector = collector
         self.broadcast_variables = dict()
+        self.subtask_id = subtask_index
 
     def _add_broadcast_variable(self, name, var):
         self.broadcast_variables[name] = var
 
     def get_broadcast_variable(self, name):
         return self.broadcast_variables[name]
+
+    def get_index_of_this_subtask(self):
+        return self.subtask_id
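For reviewers, a minimal usage sketch of the new `get_index_of_this_subtask()` accessor from a user-defined function. The class name `TagWithSubtask` and the element values are made up for the sketch; the import paths and API calls are assumed to match the existing flink-python DataSet API plus the accessor added above:

```python
from flink.plan.Environment import get_environment
from flink.functions.MapPartitionFunction import MapPartitionFunction

class TagWithSubtask(MapPartitionFunction):
    def map_partition(self, iterator, collector):
        # self.context is the RuntimeContext configured above, now carrying the subtask index
        subtask = self.context.get_index_of_this_subtask()
        for value in iterator:
            collector.collect((subtask, value))

env = get_environment()
env.set_parallelism(2)
env.from_elements("A", "B", "C", "D")\
   .map_partition(TagWithSubtask())\
   .output()
env.execute()
```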
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
index 698c236c1a3c96e4e5b7b570b4cfaf8136a8ee3e..fa832599fb5ed17c93ddf6d4b206e5d964c57b81 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
@@ -15,7 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-import copy
+import collections
 import types as TYPES
 
 from flink.plan.Constants import _Identifier, WriteMode, _createKeyValueTypeInfo, _createArrayTypeInfo
@@ -572,6 +572,48 @@ class DataSet(object):
         self._info.parallelism.value = parallelism
         return self
 
+    def count_elements_per_partition(self):
+        """
+        Method that counts the elements in each partition of the DataSet.
+        :return: A DataSet of (subtask index, element count) 2-tuples.
+        """
+        class CountElementsPerPartitionMapper(MapPartitionFunction):
+            def map_partition(self, iterator, collector):
+                counter = 0
+                for x in iterator:
+                    counter += 1
+
+                collector.collect((self.context.get_index_of_this_subtask(), counter))
+        return self.map_partition(CountElementsPerPartitionMapper())
+
+    def zip_with_index(self):
+        """
+        Method that assigns a unique Long value to all elements of the DataSet. The generated values are consecutive.
+        :return: A DataSet of Tuples consisting of consecutive ids and initial values.
+        """
+        element_count = self.count_elements_per_partition()
+        class ZipWithIndexMapper(MapPartitionFunction):
+            start = -1
+
+            def _run(self):
+                offsets = self.context.get_broadcast_variable("counts")
+                offsets = sorted(offsets, key=lambda t: t[0])  # sort by task ID
+                offsets = collections.deque(offsets)
+
+                # compute the offset for each partition
+                for i in range(self.context.get_index_of_this_subtask()):
+                    self.start += offsets[i][1]
+
+                super(ZipWithIndexMapper, self)._run()
+
+            def map_partition(self, iterator, collector):
+                for value in iterator:
+                    self.start += 1
+                    collector.collect((self.start, value))
+        return self\
+            .map_partition(ZipWithIndexMapper())\
+            .with_broadcast_set("counts", element_count)
+
 
 class OperatorSet(DataSet):
     def __init__(self, env, info):
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index 3dbce45384f3b0584b3bb344baac693fb306ccd3..9d08baf5671b121c5176c32429b925eec9137d26 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -184,6 +184,7 @@ class Environment(object):
         port = int(sys.stdin.readline().rstrip('\n'))
         id = int(sys.stdin.readline().rstrip('\n'))
+        subtask_index = int(sys.stdin.readline().rstrip('\n'))
         input_path = sys.stdin.readline().rstrip('\n')
         output_path = sys.stdin.readline().rstrip('\n')
 
@@ -193,7 +194,7 @@ class Environment(object):
             if set.id == id:
                 used_set = set
                 operator = set.operator
-        operator._configure(input_path, output_path, port, self, used_set)
+        operator._configure(input_path, output_path, port, self, used_set, subtask_index)
         operator._go()
         operator._close()
         sys.stdout.flush()
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
index c0a44149f8cba64c6c4f180ca2e10e0755d7f2e9..223ff681dc1554a3ddfbb6317f2f2802d026cab8 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
@@ -42,9 +42,16 @@ if __name__ == "__main__":
 
     d6 = env.from_elements(1, 1, 12)
 
+    d7 = env.generate_sequence(0, 999)
+
     #Generate Sequence Source
-    d7 = env.generate_sequence(1, 5)\
-        .map(Id()).map_partition(Verify([1,2,3,4,5], "Sequence")).output()
+    d7.map(Id()).map_partition(Verify(range(1000), "Sequence")).output()
+
+    #Zip with Index
+    #check that IDs (first field of each element) are consecutive
+    d7.zip_with_index()\
+        .map(lambda x: x[0])\
+        .map_partition(Verify(range(1000), "ZipWithIndex")).output()
 
     #CSV Source/Sink
     csv_data = env.read_csv("src/test/python/org/apache/flink/python/api/data_csv", (INT, INT, STRING))
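To make the `start = -1` initialization in `ZipWithIndexMapper` easier to follow: with parallelism 2 and broadcast counts `[(0, 2), (1, 6)]` (the distribution used in the documentation example), the per-subtask starting ids work out as below. This is a standalone sketch for illustration, not code from the patch:

```python
counts = [(0, 2), (1, 6)]  # (subtask index, element count), as broadcast under "counts"

def first_index(subtask_index):
    start = -1                      # same initial value as ZipWithIndexMapper.start
    for i in range(subtask_index):  # add the counts of all lower-numbered subtasks
        start += counts[i][1]
    return start + 1                # map_partition() increments before collecting

print(first_index(0))  # 0 -> subtask 0 emits ids 0 and 1
print(first_index(1))  # 2 -> subtask 1 emits ids 2 through 7
```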