提交 29a45f36 编写于 作者: B barrierye

update code

上级 674cfda8
...@@ -21,6 +21,7 @@ import logging ...@@ -21,6 +21,7 @@ import logging
import func_timeout import func_timeout
import os import os
import sys import sys
import numpy as np
from numpy import * from numpy import *
from .proto import pipeline_service_pb2 from .proto import pipeline_service_pb2
...@@ -481,10 +482,9 @@ class Op(object): ...@@ -481,10 +482,9 @@ class Op(object):
class RequestOp(Op): class RequestOp(Op):
""" RequestOp do not run preprocess, process, postprocess. """ """ RequestOp do not run preprocess, process, postprocess. """
def __init__(self, concurrency=1): def __init__(self):
# PipelineService.name = "@G" # PipelineService.name = "@G"
super(RequestOp, self).__init__( super(RequestOp, self).__init__(name="@G", input_ops=[])
name="@G", input_ops=[], concurrency=concurrency)
# init op # init op
try: try:
self.init_op() self.init_op()
...@@ -507,9 +507,8 @@ class RequestOp(Op): ...@@ -507,9 +507,8 @@ class RequestOp(Op):
class ResponseOp(Op): class ResponseOp(Op):
""" ResponseOp do not run preprocess, process, postprocess. """ """ ResponseOp do not run preprocess, process, postprocess. """
def __init__(self, input_ops, concurrency=1): def __init__(self, input_ops):
super(ResponseOp, self).__init__( super(ResponseOp, self).__init__(name="@R", input_ops=input_ops)
name="@R", input_ops=input_ops, concurrency=concurrency)
# init op # init op
try: try:
self.init_op() self.init_op()
...@@ -525,6 +524,7 @@ class ResponseOp(Op): ...@@ -525,6 +524,7 @@ class ResponseOp(Op):
feed = channeldata.parse() feed = channeldata.parse()
# ndarray to string: # ndarray to string:
# https://stackoverflow.com/questions/30167538/convert-a-numpy-ndarray-to-stringor-bytes-and-convert-it-back-to-numpy-ndarray # https://stackoverflow.com/questions/30167538/convert-a-numpy-ndarray-to-stringor-bytes-and-convert-it-back-to-numpy-ndarray
np.set_printoptions(threshold=np.nan)
for name, var in feed.items(): for name, var in feed.items():
resp.value.append(var.__repr__()) resp.value.append(var.__repr__())
resp.key.append(name) resp.key.append(name)
......
...@@ -126,8 +126,8 @@ class PipelineServer(object): ...@@ -126,8 +126,8 @@ class PipelineServer(object):
show_info = (i == 0) show_info = (i == 0)
worker = multiprocessing.Process( worker = multiprocessing.Process(
target=self._run_server_func, target=self._run_server_func,
args=(bind_address, self._response_op, self._dag_config, args=(bind_address, self._response_op,
self._worker_num)) self._dag_config))
worker.start() worker.start()
workers.append(worker) workers.append(worker)
for worker in workers: for worker in workers:
...@@ -141,13 +141,11 @@ class PipelineServer(object): ...@@ -141,13 +141,11 @@ class PipelineServer(object):
server.start() server.start()
server.wait_for_termination() server.wait_for_termination()
def _run_server_func(self, bind_address, response_op, dag_config, def _run_server_func(self, bind_address, response_op, dag_config):
worker_num):
options = (('grpc.so_reuseport', 1), ) options = (('grpc.so_reuseport', 1), )
server = grpc.server( server = grpc.server(
futures.ThreadPoolExecutor( futures.ThreadPoolExecutor(
max_workers=worker_num, ), max_workers=1, ), options=options)
options=options)
pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server( pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server(
ProcessPipelineService(response_op, dag_config), server) ProcessPipelineService(response_op, dag_config), server)
server.add_insecure_port(bind_address) server.add_insecure_port(bind_address)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册