From 29a45f36152fd26063d0c9bd0d9586263aa21045 Mon Sep 17 00:00:00 2001 From: barrierye Date: Fri, 17 Jul 2020 19:14:44 +0800 Subject: [PATCH] update code --- python/pipeline/operator.py | 12 ++++++------ python/pipeline/pipeline_server.py | 10 ++++------ 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py index 2664b50b..0e5e66ae 100644 --- a/python/pipeline/operator.py +++ b/python/pipeline/operator.py @@ -21,6 +21,7 @@ import logging import func_timeout import os import sys +import numpy as np from numpy import * from .proto import pipeline_service_pb2 @@ -481,10 +482,9 @@ class Op(object): class RequestOp(Op): """ RequestOp do not run preprocess, process, postprocess. """ - def __init__(self, concurrency=1): + def __init__(self): # PipelineService.name = "@G" - super(RequestOp, self).__init__( - name="@G", input_ops=[], concurrency=concurrency) + super(RequestOp, self).__init__(name="@G", input_ops=[]) # init op try: self.init_op() @@ -507,9 +507,8 @@ class RequestOp(Op): class ResponseOp(Op): """ ResponseOp do not run preprocess, process, postprocess. """ - def __init__(self, input_ops, concurrency=1): - super(ResponseOp, self).__init__( - name="@R", input_ops=input_ops, concurrency=concurrency) + def __init__(self, input_ops): + super(ResponseOp, self).__init__(name="@R", input_ops=input_ops) # init op try: self.init_op() @@ -525,6 +524,7 @@ class ResponseOp(Op): feed = channeldata.parse() # ndarray to string: # https://stackoverflow.com/questions/30167538/convert-a-numpy-ndarray-to-stringor-bytes-and-convert-it-back-to-numpy-ndarray + np.set_printoptions(threshold=np.nan) for name, var in feed.items(): resp.value.append(var.__repr__()) resp.key.append(name) diff --git a/python/pipeline/pipeline_server.py b/python/pipeline/pipeline_server.py index 7e20425a..8d4e87d8 100644 --- a/python/pipeline/pipeline_server.py +++ b/python/pipeline/pipeline_server.py @@ -126,8 +126,8 @@ class PipelineServer(object): show_info = (i == 0) worker = multiprocessing.Process( target=self._run_server_func, - args=(bind_address, self._response_op, self._dag_config, - self._worker_num)) + args=(bind_address, self._response_op, + self._dag_config)) worker.start() workers.append(worker) for worker in workers: @@ -141,13 +141,11 @@ class PipelineServer(object): server.start() server.wait_for_termination() - def _run_server_func(self, bind_address, response_op, dag_config, - worker_num): + def _run_server_func(self, bind_address, response_op, dag_config): options = (('grpc.so_reuseport', 1), ) server = grpc.server( futures.ThreadPoolExecutor( - max_workers=worker_num, ), - options=options) + max_workers=1, ), options=options) pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server( ProcessPipelineService(response_op, dag_config), server) server.add_insecure_port(bind_address) -- GitLab