diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py index ddac492e1d383531b98ba0a3153893cbe0632722..9d4398fbc110f456ae5b364e97e4d35f0fdfa76b 100644 --- a/python/pipeline/operator.py +++ b/python/pipeline/operator.py @@ -575,7 +575,7 @@ class Op(object): _LOGGER.debug("{} Stop.".format(op_info_prefix)) self._finalize(is_thread_op) break - if len(parsed_data_dict) == 0: + if len(preped_data_dict) == 0: continue # process @@ -610,7 +610,7 @@ class Op(object): try: for data_id, err_channeldata in err_channeldata_dict.items(): self._push_to_output_channels( - data=error_channeldata, + data=err_channeldata, channels=output_channels, client_need_profile=need_profile_dict[data_id], profile_set=profile_dict[data_id]) diff --git a/python/pipeline/pipeline_server.py b/python/pipeline/pipeline_server.py index 6ad34e343c6891a33e5cbd1728d0a65fd598d748..04d2c4fcb4c806aeb92d38ba589f6993cc836450 100644 --- a/python/pipeline/pipeline_server.py +++ b/python/pipeline/pipeline_server.py @@ -114,7 +114,9 @@ class PipelineServer(object): worker.join() else: server = grpc.server( - futures.ThreadPoolExecutor(max_workers=self._worker_num)) + futures.ThreadPoolExecutor(max_workers=self._worker_num), + options=[('grpc.max_send_message_length', 256 * 1024 * 1024), + ('grpc.max_receive_message_length', 256 * 1024 * 1024)]) pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server( PipelineServicer(self._response_op, self._conf), server) server.add_insecure_port('[::]:{}'.format(self._port)) @@ -122,7 +124,9 @@ class PipelineServer(object): server.wait_for_termination() def _run_server_func(self, bind_address, response_op, dag_conf, worker_idx): - options = (('grpc.so_reuseport', 1), ) + options = [('grpc.so_reuseport', 1), + ('grpc.max_send_message_length', 256 * 1024 * 1024), + ('grpc.max_send_message_length', 256 * 1024 * 1024)] server = grpc.server( futures.ThreadPoolExecutor( max_workers=1, ), options=options)