Commit 93c61c09 authored by Shannon Quinn, committed by zentol

[FLINK-3626] [py] Add zipWithIndex()

This closes #2136

Create a task_id message in PythonStreamer that is passed through
to the underlying process and included in the RuntimeContext, where
it is accessible to the user and to functions.
Parent e04239dd
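As a rough, hypothetical illustration of the plumbing described in the commit message above (not part of the commit itself), a user-defined function can read the new subtask index from its runtime context. The import path and class name below are assumptions based on the usual flink.functions layout:

```python
from flink.functions.MapPartitionFunction import MapPartitionFunction

class TagWithSubtaskIndex(MapPartitionFunction):
    """Hypothetical example: prefix each element with the index of the subtask that processed it."""
    def map_partition(self, iterator, collector):
        # get_index_of_this_subtask() is the accessor added to RuntimeContext by this commit
        subtask = self.context.get_index_of_this_subtask()
        for value in iterator:
            collector.collect((subtask, value))
```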
@@ -309,6 +309,16 @@ result = data1.cross(data2)
<p>Produces the union of two data sets.</p>
{% highlight python %}
data.union(data2)
{% endhighlight %}
</td>
</tr>
<tr>
<td><strong>ZipWithIndex</strong></td>
<td>
<p>Assigns consecutive indexes to each element. For more information, please refer to
the <a href="zip_elements_guide.html#zip-with-a-dense-index">Zip Elements Guide</a>.</p>
{% highlight python %}
data.zip_with_index()
{% endhighlight %}
</td>
</tr>
@@ -34,7 +34,7 @@ This document shows how {% gh_link /flink-java/src/main/java/org/apache/flink/ap
### Zip with a Dense Index
`zipWithIndex` assigns consecutive labels to the elements, receiving a data set as input and returning a new data set of `(unique id, initial value)` 2-tuples.
This process requires two passes, first counting then labeling elements, and cannot be pipelined due to the synchronization of counts.
The alternative `zipWIthUniqueId` works in a pipelined fashion and is preferred when a unique labeling is sufficient.
The alternative `zipWithUniqueId` works in a pipelined fashion and is preferred when a unique labeling is sufficient.
For example, the following code:
<div class="codetabs" markdown="1">
@@ -66,6 +66,21 @@ env.execute()
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight python %}
from flink.plan.Environment import get_environment
env = get_environment()
env.set_parallelism(2)
input = env.from_elements("A", "B", "C", "D", "E", "F", "G", "H")
result = input.zip_with_index()
result.write_text(result_path)
env.execute()
{% endhighlight %}
</div>
</div>
may yield the tuples: (0,G), (1,H), (2,A), (3,B), (4,C), (5,D), (6,E), (7,F)
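As an aside (not part of the commit), the offsets behind output like the tuples above come from the two-pass scheme implemented later in this diff: pass one counts elements per subtask, pass two turns the sorted counts into per-subtask starting labels. A plain-Python sketch, with counts chosen to be consistent with the sample output above:

```python
# Hypothetical illustration of the offset computation behind zip_with_index:
# pass 1 produced (subtask index, element count) pairs, pass 2 derives offsets.
counts = [(1, 6), (0, 2)]            # e.g. subtask 0 held "G", "H"; subtask 1 held "A".."F"
counts.sort(key=lambda t: t[0])      # sort by subtask index, as ZipWithIndexMapper does

offsets = {}
start = 0
for subtask, count in counts:
    offsets[subtask] = start         # first label this subtask will hand out
    start += count

print(offsets)                       # {0: 0, 1: 2} -> ids 0-1 for subtask 0, 2-7 for subtask 1
```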
@@ -74,7 +89,7 @@ may yield the tuples: (0,G), (1,H), (2,A), (3,B), (4,C), (5,D), (6,E), (7,F)
### Zip with a Unique Identifier
In many cases one may not need to assign consecutive labels.
`zipWIthUniqueId` works in a pipelined fashion, speeding up the label assignment process. This method receives a data set as input and returns a new data set of `(unique id, initial value)` 2-tuples.
`zipWithUniqueId` works in a pipelined fashion, speeding up the label assignment process. This method receives a data set as input and returns a new data set of `(unique id, initial value)` 2-tuples.
For example, the following code:
<div class="codetabs" markdown="1">
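The Java/Scala example for this section is collapsed in the diff view above. Purely as an aside, a unique (but non-consecutive) labeling can be pipelined because each subtask can draw from its own disjoint sequence without a counting pass; one simple such scheme (not necessarily the exact one Flink uses) looks like this:

```python
# Hypothetical sketch of pipelined unique labeling: no global count is needed,
# because every subtask emits labels from its own disjoint arithmetic sequence.
def unique_labels(values, subtask_index, parallelism):
    for counter, value in enumerate(values):
        yield (subtask_index + counter * parallelism, value)

# With parallelism 2: subtask 0 labels 0, 2, 4, ...; subtask 1 labels 1, 3, 5, ...
print(list(unique_labels(["A", "B", "C"], 0, 2)))  # [(0, 'A'), (2, 'B'), (4, 'C')]
print(list(unique_labels(["D", "E", "F"], 1, 2)))  # [(1, 'D'), (3, 'E'), (5, 'F')]
```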
@@ -130,6 +130,7 @@ public class PythonStreamer implements Serializable {
processOutput.write("operator\n".getBytes());
processOutput.write(("" + server.getLocalPort() + "\n").getBytes());
processOutput.write((id + "\n").getBytes());
processOutput.write((this.function.getRuntimeContext().getIndexOfThisSubtask() + "\n").getBytes());
processOutput.write((inputFilePath + "\n").getBytes());
processOutput.write((outputFilePath + "\n").getBytes());
processOutput.flush();
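For orientation (not part of this hunk), the Python worker reads the handshake fields in the same order the Java side writes them above; a hypothetical stand-in for that read sequence, mirroring the Environment.py change later in this diff, is roughly:

```python
import sys

# Hypothetical stand-in for the handshake read on the Python side; the order must
# match the writes in PythonStreamer: marker, port, operator id, subtask index, paths.
mode = sys.stdin.readline().rstrip('\n')                 # "operator" marker (where the real
                                                         # code consumes this is a detail)
port = int(sys.stdin.readline().rstrip('\n'))
operator_id = int(sys.stdin.readline().rstrip('\n'))
subtask_index = int(sys.stdin.readline().rstrip('\n'))   # field added by this commit
input_path = sys.stdin.readline().rstrip('\n')
output_path = sys.stdin.readline().rstrip('\n')
```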
@@ -25,13 +25,13 @@ class CoGroupFunction(Function.Function):
self._keys1 = None
self._keys2 = None
def _configure(self, input_file, output_file, port, env, info):
def _configure(self, input_file, output_file, port, env, info, subtask_index):
self._connection = Connection.TwinBufferingTCPMappedFileConnection(input_file, output_file, port)
self._iterator = Iterator.Iterator(self._connection, env, 0)
self._iterator2 = Iterator.Iterator(self._connection, env, 1)
self._cgiter = Iterator.CoGroupIterator(self._iterator, self._iterator2, self._keys1, self._keys2)
self._collector = Collector.Collector(self._connection, env, info)
self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector, subtask_index)
if info.chained_info is not None:
info.chained_info.operator._configure_chain(self.context, self._collector, info.chained_info)
self._collector = info.chained_info.operator
@@ -53,4 +53,4 @@ class CoGroupFunction(Function.Function):
collector._close()
def co_group(self, iterator1, iterator2, collector):
pass
\ No newline at end of file
pass
@@ -32,11 +32,11 @@ class Function(object):
self.context = None
self._env = None
def _configure(self, input_file, output_file, port, env, info):
def _configure(self, input_file, output_file, port, env, info, subtask_index):
self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
self._iterator = Iterator.Iterator(self._connection, env)
self._collector = Collector.Collector(self._connection, env, info)
self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector, subtask_index)
self._env = env
if info.chained_info is not None:
info.chained_info.operator._configure_chain(self.context, self._collector, info.chained_info)
@@ -25,8 +25,8 @@ class GroupReduceFunction(Function.Function):
def __init__(self):
super(GroupReduceFunction, self).__init__()
def _configure(self, input_file, output_file, port, env, info):
super(GroupReduceFunction, self)._configure(input_file, output_file, port, env, info)
def _configure(self, input_file, output_file, port, env, info, subtask_index):
super(GroupReduceFunction, self)._configure(input_file, output_file, port, env, info, subtask_index)
if len(info.key1) == 0:
self._run = self._run_all_group_reduce
else:
@@ -24,8 +24,8 @@ class ReduceFunction(Function.Function):
def __init__(self):
super(ReduceFunction, self).__init__()
def _configure(self, input_file, output_file, port, env, info):
super(ReduceFunction, self)._configure(input_file, output_file, port, env, info)
def _configure(self, input_file, output_file, port, env, info, subtask_index):
super(ReduceFunction, self)._configure(input_file, output_file, port, env, info, subtask_index)
if len(info.key1) == 0:
self._run = self._run_all_reduce
else:
@@ -18,13 +18,17 @@
class RuntimeContext(object):
def __init__(self, iterator, collector):
def __init__(self, iterator, collector, subtask_index):
self.iterator = iterator
self.collector = collector
self.broadcast_variables = dict()
self.subtask_id = subtask_index
def _add_broadcast_variable(self, name, var):
self.broadcast_variables[name] = var
def get_broadcast_variable(self, name):
return self.broadcast_variables[name]
def get_index_of_this_subtask(self):
return self.subtask_id
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import copy
import collections
import types as TYPES
from flink.plan.Constants import _Identifier, WriteMode, _createKeyValueTypeInfo, _createArrayTypeInfo
@@ -572,6 +572,48 @@ class DataSet(object):
self._info.parallelism.value = parallelism
return self
def count_elements_per_partition(self):
"""
Method that goes over all the elements in each partition in order to retrieve the total number of elements.
:return: A DataSet containing Tuples of subtask index, number of elements mappings.
"""
class CountElementsPerPartitionMapper(MapPartitionFunction):
def map_partition(self, iterator, collector):
counter = 0
for x in iterator:
counter += 1
collector.collect((self.context.get_index_of_this_subtask(), counter))
return self.map_partition(CountElementsPerPartitionMapper())
def zip_with_index(self):
"""
Method that assigns a unique Long value to all elements of the DataSet. The generated values are consecutive.
:return: A DataSet of Tuples consisting of consecutive ids and initial values.
"""
element_count = self.count_elements_per_partition()
class ZipWithIndexMapper(MapPartitionFunction):
start = -1
def _run(self):
offsets = self.context.get_broadcast_variable("counts")
offsets = sorted(offsets, key=lambda t: t[0]) # sort by task ID
offsets = collections.deque(offsets)
# compute the offset for each partition
for i in range(self.context.get_index_of_this_subtask()):
self.start += offsets[i][1]
super(ZipWithIndexMapper, self)._run()
def map_partition(self, iterator, collector):
for value in iterator:
self.start += 1
collector.collect((self.start, value))
return self\
.map_partition(ZipWithIndexMapper())\
.with_broadcast_set("counts", element_count)
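Purely as an illustration of the two DataSet methods added above (not part of the commit), a minimal job exercising them might look like the following; the sinks and element choice are arbitrary:

```python
from flink.plan.Environment import get_environment

# A minimal sketch exercising count_elements_per_partition() and zip_with_index().
env = get_environment()
env.set_parallelism(2)

data = env.from_elements("A", "B", "C", "D", "E", "F", "G", "H")

# (subtask index, element count) pairs, one tuple per partition
data.count_elements_per_partition().output()

# (consecutive id, value) pairs across all partitions
data.zip_with_index().output()

env.execute()
```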
class OperatorSet(DataSet):
def __init__(self, env, info):
@@ -184,6 +184,7 @@ class Environment(object):
port = int(sys.stdin.readline().rstrip('\n'))
id = int(sys.stdin.readline().rstrip('\n'))
subtask_index = int(sys.stdin.readline().rstrip('\n'))
input_path = sys.stdin.readline().rstrip('\n')
output_path = sys.stdin.readline().rstrip('\n')
@@ -193,7 +194,7 @@ class Environment(object):
if set.id == id:
used_set = set
operator = set.operator
operator._configure(input_path, output_path, port, self, used_set)
operator._configure(input_path, output_path, port, self, used_set, subtask_index)
operator._go()
operator._close()
sys.stdout.flush()
@@ -42,9 +42,16 @@ if __name__ == "__main__":
d6 = env.from_elements(1, 1, 12)
d7 = env.generate_sequence(0, 999)
#Generate Sequence Source
d7 = env.generate_sequence(1, 5)\
.map(Id()).map_partition(Verify([1,2,3,4,5], "Sequence")).output()
d7.map(Id()).map_partition(Verify(range(1000), "Sequence")).output()
#Zip with Index
#check that IDs (first field of each element) are consecutive
d7.zip_with_index()\
.map(lambda x: x[0])\
.map_partition(Verify(range(1000), "ZipWithIndex")).output()
#CSV Source/Sink
csv_data = env.read_csv("src/test/python/org/apache/flink/python/api/data_csv", (INT, INT, STRING))