未验证 提交 2ed4cf7c 作者: HuangXingBo 提交者: GitHub

[FLINK-16252][python] Optimize the output emit logic to remove unnecessary overhead

上级 293fd5a9
......@@ -19,7 +19,6 @@
import datetime
import cloudpickle
from apache_beam.runners.common import _OutputProcessor
from apache_beam.runners.worker import operation_specs
from apache_beam.runners.worker import bundle_processor
from apache_beam.runners.worker.operations import Operation
......@@ -37,9 +36,8 @@ class ScalarFunctionOperation(Operation):
def __init__(self, name, spec, counter_factory, sampler, consumers):
super(ScalarFunctionOperation, self).__init__(name, spec, counter_factory, sampler)
for tag, op_consumers in consumers.items():
for consumer in op_consumers:
self.add_receiver(consumer, 0)
self.consumer = consumers['output'][0]
self._value_coder_impl = self.consumer.windowed_coder.wrapped_value_coder.get_impl()
self.variable_dict = {}
self.scalar_funcs = []
......@@ -49,18 +47,15 @@ class ScalarFunctionOperation(Operation):
def setup(self):
# Lifecycle hook: first delegates to the base-class setup().
# NOTE(review): this text is a rendered diff whose +/- markers were lost.
# The hunk header (-49,18 +47,15) and the commit title suggest that the
# _OutputProcessor construction below is the code REMOVED by this commit,
# leaving only the super().setup() call -- confirm against the raw diff.
super(ScalarFunctionOperation, self).setup()
# Builds an _OutputProcessor over the first receiver set (self.receivers[0]),
# with no window function, no tagged receivers and no per-element counter.
self.output_processor = _OutputProcessor(
window_fn=None,
main_receivers=self.receivers[0],
tagged_receivers=None,
per_element_output_counter=None)
def start(self):
# Lifecycle hook: runs the base-class start() while the operation's
# scoped_start_state context manager is active.
# NOTE(review): presumably the scoped state attributes time spent in start()
# to this operation for sampling/metrics -- confirm against Beam's Operation.
with self.scoped_start_state:
super(ScalarFunctionOperation, self).start()
def process(self, o):
# Applies self.func to the value of the incoming element `o` and emits the
# result downstream.
# NOTE(review): this text is a rendered diff whose +/- markers were lost, so
# two alternative emit paths appear back to back. The hunk line counts
# (-49,18 +47,15) suggest the process_outputs call is the removed path and the
# three statements after it are its replacement -- confirm against the raw
# diff. Read literally, self.func(o.value) would be evaluated twice.
# Old path (presumably removed): hand a one-element result list to the
# _OutputProcessor created in setup().
self.output_processor.process_outputs(o, [self.func(o.value)])
# New path (presumably added): write the result directly into the output
# stream of the consumer captured in __init__.
output_stream = self.consumer.output_stream
# Third positional argument is True -- presumably the coder's "nested" flag;
# TODO confirm against the Beam CoderImpl.encode_to_stream signature.
self._value_coder_impl.encode_to_stream(self.func(o.value), output_stream, True)
# Gives the stream the chance to flush; whether it actually flushes is decided
# inside maybe_flush(), which is not visible here.
output_stream.maybe_flush()
def finish(self):
super(ScalarFunctionOperation, self).finish()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册