Commit e92124ae authored by barrierye

update py-serving to pipeline-serving

Parent 02bce891
...@@ -7,8 +7,10 @@ endif()
 if (SERVER)
   if (NOT WITH_GPU)
+    file(INSTALL pipeline DESTINATION paddle_serving_server)
     file(GLOB_RECURSE SERVING_SERVER_PY_FILES paddle_serving_server/*.py)
   else()
+    file(INSTALL pipeline DESTINATION paddle_serving_server_gpu)
     file(GLOB_RECURSE SERVING_SERVER_PY_FILES paddle_serving_server_gpu/*.py)
   endif()
   set(PY_FILES ${SERVING_SERVER_PY_FILES})
......
wget --no-check-certificate https://fleet.bj.bcebos.com/text_classification_data.tar.gz
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/imdb-demo/imdb_model.tar.gz
tar -zxvf text_classification_data.tar.gz
tar -zxvf imdb_model.tar.gz
...@@ -11,12 +11,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from paddle_serving_client.pyclient import PyClient
+from paddle_serving_client.pipeline import PipelineClient
 import numpy as np
 from paddle_serving_app.reader import IMDBDataset
 from line_profiler import LineProfiler
-client = PyClient()
+client = PipelineClient()
 client.connect('localhost:8080')
 lp = LineProfiler()
......
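The remainder of this demo is elided above. A plausible sketch of how it might continue, assuming PipelineClient exposes a predict(feed=..., fetch=...) call and using the IMDBDataset reader shown in the imports; the predict method, the feed/fetch keys, and the file paths are assumptions, not confirmed by this diff:

imdb_dataset = IMDBDataset()
imdb_dataset.load_resource('imdb.vocab')  # hypothetical vocabulary path
with open('test_data/part-0') as fin:     # hypothetical test-data path
    for line in fin:
        word_ids, label = imdb_dataset.get_words_and_label(line)
        fetch_map = client.predict(
            feed={"words": word_ids}, fetch=["prediction"])
        print(fetch_map)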
...@@ -13,8 +13,8 @@
 # limitations under the License.
 # pylint: disable=doc-string-missing
-from paddle_serving_server.pyserver import Op
-from paddle_serving_server.pyserver import PyServer
+from paddle_serving_server.pipeline import Op
+from paddle_serving_server.pipeline import PipelineServer
 import numpy as np
 import logging
...@@ -62,7 +62,7 @@ cnn_op = Op(name="cnn",
 combine_op = CombineOp(
     name="combine", inputs=[bow_op, cnn_op], concurrency=1, timeout=-1, retry=1)
-pyserver = PyServer(
+pyserver = PipelineServer(
     use_multithread=True,
     client_type='grpc',
     use_future=False,
......
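The elided tail of this demo presumably wires the DAG into the server. A minimal sketch using the PipelineServer methods added later in this commit (add_ops, prepare_server, run_server); the port and worker count are illustrative, and read_op/bow_op are assumed to be defined in the elided lines:

pyserver.add_ops([read_op, bow_op, cnn_op, combine_op])
pyserver.prepare_server(port=8080, worker_num=4)
pyserver.run_server()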
...@@ -18,7 +18,7 @@ from .proto import general_python_service_pb2_grpc
 import numpy as np
-class PyClient(object):
+class PipelineClient(object):
     def __init__(self):
         self._channel = None
......
This diff is collapsed.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .operator import Op
from .pipeline_server import PipelineServer
This diff is collapsed.
...@@ -12,3 +12,391 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
import threading
import multiprocessing
from paddle_serving_client import MultiLangClient, Client
from concurrent import futures
import logging
import func_timeout
from .channel import ThreadChannel, ProcessChannel, ChannelDataEcode, ChannelData, ChannelDataType
class Op(object):
def __init__(self,
name,
inputs,
server_model=None,
server_port=None,
device=None,
client_config=None,
server_name=None,
fetch_names=None,
concurrency=1,
timeout=-1,
retry=2):
self._is_run = False
self.name = name # to identify the type of OP, it must be globally unique
self._concurrency = concurrency # amount of concurrency
self.set_input_ops(inputs)
self.with_serving = False
self._client_config = client_config
self._server_name = server_name
self._fetch_names = fetch_names
self._server_model = server_model
self._server_port = server_port
self._device = device
if self._client_config is not None and \
self._server_name is not None and \
self._fetch_names is not None:
self.with_serving = True
self._timeout = timeout
self._retry = max(1, retry)
self._input = None
self._outputs = []
self._profiler = None
def init_profiler(self, profiler):
self._profiler = profiler
def _profiler_record(self, string):
if self._profiler is None:
return
self._profiler.record(string)
def init_client(self, client_type, client_config, server_name, fetch_names):
if not self.with_serving:
logging.debug("{} no client".format(self.name))
return
logging.debug("{} client_config: {}".format(self.name, client_config))
logging.debug("{} server_name: {}".format(self.name, server_name))
logging.debug("{} fetch_names: {}".format(self.name, fetch_names))
if client_type == 'brpc':
self._client = Client()
elif client_type == 'grpc':
self._client = MultiLangClient()
else:
raise ValueError("unknow client type: {}".format(client_type))
self._client.load_client_config(client_config)
self._client.connect([server_name])
self._fetch_names = fetch_names
def get_input_channel(self):
return self._input
def get_input_ops(self):
return self._input_ops
def set_input_ops(self, ops):
if not isinstance(ops, list):
ops = [] if ops is None else [ops]
self._input_ops = []
for op in ops:
if not isinstance(op, Op):
raise TypeError(
self._log('input op must be Op type, not {}'.format(
type(op))))
self._input_ops.append(op)
def add_input_channel(self, channel):
if not isinstance(channel, (ThreadChannel, ProcessChannel)):
raise TypeError(
self._log('input channel must be Channel type, not {}'.format(
type(channel))))
channel.add_consumer(self.name)
self._input = channel
def get_output_channels(self):
return self._outputs
def add_output_channel(self, channel):
if not isinstance(channel, (ThreadChannel, ProcessChannel)):
raise TypeError(
self._log('output channel must be Channel type, not {}'.format(
type(channel))))
channel.add_producer(self.name)
self._outputs.append(channel)
def preprocess(self, channeldata):
if isinstance(channeldata, dict):
raise NotImplementedError(
'this Op has multiple previous inputs. Please override this method'
)
feed = channeldata.parse()
return feed
def midprocess(self, data, use_future=True):
if not isinstance(data, dict):
raise Exception(
self._log(
'data must be dict type (the output of preprocess()), but got {}'.
format(type(data))))
logging.debug(self._log('data: {}'.format(data)))
logging.debug(self._log('fetch: {}'.format(self._fetch_names)))
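# with a gRPC (MultiLang) client and use_future=True, predict() returns a
# future instead of the result; _run later wraps it in a CHANNEL_FUTURE
# ChannelData whose callback_func is postprocess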
if isinstance(self._client, MultiLangClient):
call_result = self._client.predict(
feed=data, fetch=self._fetch_names, asyn=use_future)
else:
call_result = self._client.predict(
feed=data, fetch=self._fetch_names)
logging.debug(self._log("get call_result"))
return call_result
def postprocess(self, output_data):
return output_data
def stop(self):
self._input.stop()
for channel in self._outputs:
channel.stop()
self._is_run = False
def _parse_channeldata(self, channeldata):
data_id, error_channeldata = None, None
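# channeldata arrives as a dict {op_name: ChannelData} when this Op has
# multiple predecessor Ops; all entries are assumed to carry the same
# request id, so only the first one is read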
if isinstance(channeldata, dict):
parsed_data = {}
key = list(channeldata.keys())[0]
data_id = channeldata[key].id
for _, data in channeldata.items():
if data.ecode != ChannelDataEcode.OK.value:
error_channeldata = data
break
else:
data_id = channeldata.id
if channeldata.ecode != ChannelDataEcode.OK.value:
error_channeldata = channeldata
return data_id, error_channeldata
def _push_to_output_channels(self, data, channels, name=None):
if name is None:
name = self.name
for channel in channels:
channel.push(data, name)
def start_with_process(self, client_type, use_future):
processes = []
for concurrency_idx in range(self._concurrency):
p = multiprocessing.Process(
target=self._run,
args=(concurrency_idx, self.get_input_channel(),
self.get_output_channels(), client_type, use_future))
p.start()
processes.append(p)
return processes
def start_with_thread(self, client_type, use_future):
threads = []
for concurrency_idx in range(self._concurrency):
t = threading.Thread(
target=self._run,
args=(concurrency_idx, self.get_input_channel(),
self.get_output_channels(), client_type, use_future))
t.start()
threads.append(t)
return threads
def _run(self, concurrency_idx, input_channel, output_channels, client_type,
use_future):
# create client based on client_type
self.init_client(client_type, self._client_config, self._server_name,
self._fetch_names)
op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
log = self._get_log_func(op_info_prefix)
self._is_run = True
while self._is_run:
self._profiler_record("{}-get_0".format(op_info_prefix))
channeldata = input_channel.front(self.name)
self._profiler_record("{}-get_1".format(op_info_prefix))
logging.debug(log("input_data: {}".format(channeldata)))
data_id, error_channeldata = self._parse_channeldata(channeldata)
# error data in predecessor Op
if error_channeldata is not None:
self._push_to_output_channels(error_channeldata,
output_channels)
continue
# preprocess
try:
self._profiler_record("{}-prep_0".format(op_info_prefix))
preped_data = self.preprocess(channeldata)
self._profiler_record("{}-prep_1".format(op_info_prefix))
except NotImplementedError as e:
# preprocess function not implemented
error_info = log(e)
logging.error(error_info)
self._push_to_output_channels(
ChannelData(
ecode=ChannelDataEcode.NOT_IMPLEMENTED.value,
error_info=error_info,
data_id=data_id),
output_channels)
continue
except TypeError as e:
# Error type in channeldata.datatype
error_info = log(e)
logging.error(error_info)
self._push_to_output_channels(
ChannelData(
ecode=ChannelDataEcode.TYPE_ERROR.value,
error_info=error_info,
data_id=data_id),
output_channels)
continue
except Exception as e:
error_info = log(e)
logging.error(error_info)
self._push_to_output_channels(
ChannelData(
ecode=ChannelDataEcode.UNKNOW.value,
error_info=error_info,
data_id=data_id),
output_channels)
continue
# midprocess
midped_data = None
if self.with_serving:
ecode = ChannelDataEcode.OK.value
self._profiler_record("{}-midp_0".format(op_info_prefix))
if self._timeout <= 0:
try:
midped_data = self.midprocess(preped_data, use_future)
except Exception as e:
ecode = ChannelDataEcode.UNKNOW.value
error_info = log(e)
logging.error(error_info)
else:
for i in range(self._retry):
try:
midped_data = func_timeout.func_timeout(
self._timeout,
self.midprocess,
args=(preped_data, use_future))
except func_timeout.FunctionTimedOut as e:
if i + 1 >= self._retry:
ecode = ChannelDataEcode.TIMEOUT.value
error_info = log(e)
logging.error(error_info)
else:
logging.warn(
log("timeout, retry({})".format(i + 1)))
except Exception as e:
ecode = ChannelDataEcode.UNKNOW.value
error_info = log(e)
logging.error(error_info)
break
else:
break
if ecode != ChannelDataEcode.OK.value:
self._push_to_output_channels(
ChannelData(
ecode=ecode, error_info=error_info,
data_id=data_id),
output_channels)
continue
self._profiler_record("{}-midp_1".format(op_info_prefix))
else:
midped_data = preped_data
# postprocess
output_data = None
self._profiler_record("{}-postp_0".format(op_info_prefix))
if self.with_serving and client_type == 'grpc' and use_future:
# use call_future
output_data = ChannelData(
datatype=ChannelDataType.CHANNEL_FUTURE.value,
future=midped_data,
data_id=data_id,
callback_func=self.postprocess)
else:
try:
postped_data = self.postprocess(midped_data)
except Exception as e:
ecode = ChannelDataEcode.UNKNOW.value
error_info = log(e)
logging.error(error_info)
self._push_to_output_channels(
ChannelData(
ecode=ecode, error_info=error_info,
data_id=data_id),
output_channels)
continue
if not isinstance(postped_data, dict):
ecode = ChannelDataEcode.TYPE_ERROR.value
error_info = log("output of postprocess funticon must be " \
"dict type, but get {}".format(type(postped_data)))
logging.error(error_info)
self._push_to_output_channels(
ChannelData(
ecode=ecode, error_info=error_info,
data_id=data_id),
output_channels)
continue
output_data = ChannelData(
ChannelDataType.CHANNEL_NPDATA.value,
npdata=postped_data,
data_id=data_id)
self._profiler_record("{}-postp_1".format(op_info_prefix))
# push data to the output channels (reached only on success)
self._profiler_record("{}-push_0".format(op_info_prefix))
self._push_to_output_channels(output_data, output_channels)
self._profiler_record("{}-push_1".format(op_info_prefix))
def _log(self, info):
return "{} {}".format(self.name, info)
def _get_log_func(self, op_info_prefix):
def log_func(info_str):
return "{} {}".format(op_info_prefix, info_str)
return log_func
def get_concurrency(self):
return self._concurrency
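The default preprocess/postprocess above are the extension points for user Ops: preprocess must be overridden when an Op has multiple predecessors, and postprocess must return a dict. A minimal sketch of such a fan-in Op, in the spirit of the demo's CombineOp; the "prediction" key and the averaging logic are illustrative assumptions, not taken from this commit:

class CombineOp(Op):
    def preprocess(self, channeldata):
        # channeldata is a dict {op_name: ChannelData} because this Op has
        # multiple predecessor Ops, so the default preprocess (which raises
        # NotImplementedError for dict input) must be overridden
        return {name: data.parse()["prediction"]
                for name, data in channeldata.items()}

    def postprocess(self, output_data):
        # Op._run requires postprocess to return a dict
        avg = sum(output_data.values()) / len(output_data)
        return {"prediction": avg}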
class VirtualOp(Op):
''' For connecting two channels. '''
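# _topo_sort inserts a VirtualOp whenever an edge skips a DAG level: the
# virtual node simply forwards every ChannelData it receives to its output
# channels under the original producer's name (see _run below)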
def __init__(self, name, concurrency=1):
super(VirtualOp, self).__init__(
name=name, inputs=None, concurrency=concurrency)
self._virtual_pred_ops = []
def add_virtual_pred_op(self, op):
self._virtual_pred_ops.append(op)
def add_output_channel(self, channel):
if not isinstance(channel, (ThreadChannel, ProcessChannel)):
raise TypeError(
self._log('output channel must be Channel type, not {}'.format(
type(channel))))
for op in self._virtual_pred_ops:
channel.add_producer(op.name)
self._outputs.append(channel)
def _run(self, concurrency_idx, input_channel, output_channels, client_type,
use_future):
op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
log = self._get_log_func(op_info_prefix)
self._is_run = True
while self._is_run:
self._profiler_record("{}-get_0".format(op_info_prefix))
channeldata = input_channel.front(self.name)
self._profiler_record("{}-get_1".format(op_info_prefix))
self._profiler_record("{}-push_0".format(op_info_prefix))
if isinstance(channeldata, dict):
for name, data in channeldata.items():
self._push_to_output_channels(
data, channels=output_channels, name=name)
else:
self._push_to_output_channels(
channeldata,
channels=output_channels,
name=self._virtual_pred_ops[0].name)
self._profiler_record("{}-push_1".format(op_info_prefix))
...@@ -12,3 +12,467 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
import threading
import multiprocessing
import multiprocessing.queues
import sys
if sys.version_info.major == 2:
import Queue
elif sys.version_info.major == 3:
import queue as Queue
else:
raise Exception("Error Python version")
import os
from paddle_serving_client import MultiLangClient, Client
from concurrent import futures
import numpy as np
import grpc
from ..proto import general_python_service_pb2 as pyservice_pb2
from ..proto import pyserving_channel_pb2 as channel_pb2
from ..proto import general_python_service_pb2_grpc
import logging
import random
import time
import func_timeout
import enum
import collections
import copy
from .operator import Op, VirtualOp
from .channel import ThreadChannel, ProcessChannel, ChannelData, ChannelDataEcode, ChannelDataType
from .profiler import TimeProfiler
_profiler = TimeProfiler()
class GeneralPythonService(
general_python_service_pb2_grpc.GeneralPythonServiceServicer):
def __init__(self, in_channel, out_channel, retry=2):
super(GeneralPythonService, self).__init__()
self.name = "#G"
self.set_in_channel(in_channel)
self.set_out_channel(out_channel)
logging.debug(self._log(in_channel.debug()))
logging.debug(self._log(out_channel.debug()))
#TODO:
# multi-lock for different clients
# different locks for server and client
self._id_lock = threading.Lock()
self._cv = threading.Condition()
self._global_resp_dict = {}
self._id_counter = 0
self._retry = retry
self._receive_func = threading.Thread(
target=GeneralPythonService._receive_out_channel_func, args=(self, ))
self._receive_func.start()
def _log(self, info_str):
return "[{}] {}".format(self.name, info_str)
def set_in_channel(self, in_channel):
if not isinstance(in_channel, (ThreadChannel, ProcessChannel)):
raise TypeError(
self._log('in_channel must be Channel type, but got {}'.format(
type(in_channel))))
in_channel.add_producer(self.name)
self._in_channel = in_channel
def set_out_channel(self, out_channel):
if not isinstance(out_channel, (ThreadChannel, ProcessChannel)):
raise TypeError(
self._log('out_channel must be Channel type, but got {}'.format(
type(out_channel))))
out_channel.add_consumer(self.name)
self._out_channel = out_channel
def _receive_out_channel_func(self):
while True:
channeldata = self._out_channel.front(self.name)
if not isinstance(channeldata, ChannelData):
raise TypeError(
self._log('data must be ChannelData type, but got {}'.
format(type(channeldata))))
with self._cv:
data_id = channeldata.id
self._global_resp_dict[data_id] = channeldata
self._cv.notify_all()
def _get_next_id(self):
with self._id_lock:
self._id_counter += 1
return self._id_counter - 1
def _get_data_in_global_resp_dict(self, data_id):
resp = None
with self._cv:
while data_id not in self._global_resp_dict:
self._cv.wait()
resp = self._global_resp_dict.pop(data_id)
self._cv.notify_all()
return resp
def _pack_data_for_infer(self, request):
logging.debug(self._log('start inference'))
data_id = self._get_next_id()
npdata = {}
try:
for idx, name in enumerate(request.feed_var_names):
logging.debug(
self._log('name: {}'.format(request.feed_var_names[idx])))
logging.debug(
self._log('data: {}'.format(request.feed_insts[idx])))
npdata[name] = np.frombuffer(
request.feed_insts[idx], dtype=request.type[idx])
npdata[name].shape = np.frombuffer(
request.shape[idx], dtype="int32")
except Exception as e:
return ChannelData(
ecode=ChannelDataEcode.RPC_PACKAGE_ERROR.value,
error_info="rpc package error",
data_id=data_id), data_id
else:
return ChannelData(
datatype=ChannelDataType.CHANNEL_NPDATA.value,
npdata=npdata,
data_id=data_id), data_id
def _pack_data_for_resp(self, channeldata):
logging.debug(self._log('get channeldata'))
resp = pyservice_pb2.Response()
resp.ecode = channeldata.ecode
if resp.ecode == ChannelDataEcode.OK.value:
if channeldata.datatype == ChannelDataType.CHANNEL_PBDATA.value:
for inst in channeldata.pbdata.insts:
resp.fetch_insts.append(inst.data)
resp.fetch_var_names.append(inst.name)
resp.shape.append(inst.shape)
resp.type.append(inst.type)
elif channeldata.datatype in (ChannelDataType.CHANNEL_FUTURE.value,
ChannelDataType.CHANNEL_NPDATA.value):
feed = channeldata.parse()
for name, var in feed.items():
resp.fetch_insts.append(var.tobytes())
resp.fetch_var_names.append(name)
resp.shape.append(
np.array(
var.shape, dtype="int32").tobytes())
resp.type.append(str(var.dtype))
else:
raise TypeError(
self._log("Error type({}) in datatype.".format(
channeldata.datatype)))
else:
resp.error_info = channeldata.error_info
return resp
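Both _pack_data_for_infer and _pack_data_for_resp move tensors as raw byte buffers plus an int32 shape buffer and a dtype string. A self-contained sketch of that round-trip, using plain numpy with no serving dependencies:

import numpy as np

var = np.arange(6, dtype="float32").reshape(2, 3)
# pack, as in _pack_data_for_resp
payload = var.tobytes()
shape_buf = np.array(var.shape, dtype="int32").tobytes()
dtype_str = str(var.dtype)  # "float32"
# unpack, as in _pack_data_for_infer
restored = np.frombuffer(payload, dtype=dtype_str)
restored.shape = np.frombuffer(shape_buf, dtype="int32")
assert (restored == var).all()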
def inference(self, request, context):
_profiler.record("{}-prepack_0".format(self.name))
data, data_id = self._pack_data_for_infer(request)
_profiler.record("{}-prepack_1".format(self.name))
resp_channeldata = None
for i in range(self._retry):
logging.debug(self._log('push data'))
_profiler.record("{}-push_0".format(self.name))
self._in_channel.push(data, self.name)
_profiler.record("{}-push_1".format(self.name))
logging.debug(self._log('wait for infer'))
_profiler.record("{}-fetch_0".format(self.name))
resp_channeldata = self._get_data_in_global_resp_dict(data_id)
_profiler.record("{}-fetch_1".format(self.name))
if resp_channeldata.ecode == ChannelDataEcode.OK.value:
break
if i + 1 < self._retry:
logging.warn("retry({}): {}".format(
i + 1, resp_channeldata.error_info))
_profiler.record("{}-postpack_0".format(self.name))
resp = self._pack_data_for_resp(resp_channeldata)
_profiler.record("{}-postpack_1".format(self.name))
_profiler.print_profile()
return resp
class PipelineServer(object):
def __init__(self,
use_multithread=True,
client_type='brpc',
use_future=False,
retry=2,
profile=False):
self._channels = []
self._user_ops = []
self._actual_ops = []
self._port = None
self._worker_num = None
self._in_channel = None
self._out_channel = None
self._retry = retry
self._use_multithread = use_multithread
self._client_type = client_type
self._use_future = use_future
if not self._use_multithread:
self._manager = multiprocessing.Manager()
if profile:
raise Exception(
"profiling is not yet supported in the multiprocess version")
if self._use_future:
raise Exception("futures cannot be used in the multiprocess version")
if self._client_type == 'brpc' and self._use_future:
logging.warn("the brpc implementation cannot use futures")
_profiler.enable(profile)
def add_channel(self, channel):
self._channels.append(channel)
def add_op(self, op):
self._user_ops.append(op)
def add_ops(self, ops):
self._user_ops.extend(ops)
def gen_desc(self):
logging.info('the service description for PaaS will be generated here')
pass
def _topo_sort(self):
indeg_num = {}
que_idx = 0 # scroll queue
ques = [Queue.Queue() for _ in range(2)]
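# Kahn-style topological sort with two alternating queues: ques[que_idx]
# holds the Ops of the current DAG level (a "view"), while the other queue
# collects Ops whose in-degree drops to zero, i.e. the next level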
for op in self._user_ops:
if len(op.get_input_ops()) == 0:
op.name = "#G" # update read_op.name
break
outdegs = {op.name: [] for op in self._user_ops}
zero_indeg_num, zero_outdeg_num = 0, 0
for idx, op in enumerate(self._user_ops):
# check that each Op name is globally unique
if op.name in indeg_num:
raise Exception("the name of Op must be unique")
indeg_num[op.name] = len(op.get_input_ops())
if indeg_num[op.name] == 0:
ques[que_idx].put(op)
zero_indeg_num += 1
for pred_op in op.get_input_ops():
outdegs[pred_op.name].append(op)
if zero_indeg_num != 1:
raise Exception("DAG contains multiple input Ops")
for _, succ_list in outdegs.items():
if len(succ_list) == 0:
zero_outdeg_num += 1
if zero_outdeg_num != 1:
raise Exception("DAG contains multiple output Ops")
# topo sort to get dag_views
dag_views = []
sorted_op_num = 0
while True:
que = ques[que_idx]
next_que = ques[(que_idx + 1) % 2]
dag_view = []
while que.qsize() != 0:
op = que.get()
dag_view.append(op)
sorted_op_num += 1
for succ_op in outdegs[op.name]:
indeg_num[succ_op.name] -= 1
if indeg_num[succ_op.name] == 0:
next_que.put(succ_op)
dag_views.append(dag_view)
if next_que.qsize() == 0:
break
que_idx = (que_idx + 1) % 2
if sorted_op_num < len(self._user_ops):
raise Exception("not legal DAG")
# create channels and virtual ops
def name_generator(prefix):
def number_generator():
idx = 0
while True:
yield "{}{}".format(prefix, idx)
idx += 1
return number_generator()
def gen_channel(name_gen):
# the built-in next() works with both Python 2 and Python 3 generators
if self._use_multithread:
channel = ThreadChannel(name=next(name_gen))
else:
channel = ProcessChannel(self._manager, name=next(name_gen))
return channel
def gen_virtual_op(name_gen):
return VirtualOp(name=next(name_gen))
virtual_op_name_gen = name_generator("vir")
channel_name_gen = name_generator("chl")
virtual_ops = []
channels = []
input_channel = None
actual_view = None
for v_idx, view in enumerate(dag_views):
if v_idx + 1 >= len(dag_views):
break
next_view = dag_views[v_idx + 1]
if actual_view is None:
actual_view = view
actual_next_view = []
pred_op_of_next_view_op = {}
for op in actual_view:
# find actual succ op in next view and create virtual op
for succ_op in outdegs[op.name]:
if succ_op in next_view:
if succ_op not in actual_next_view:
actual_next_view.append(succ_op)
if succ_op.name not in pred_op_of_next_view_op:
pred_op_of_next_view_op[succ_op.name] = []
pred_op_of_next_view_op[succ_op.name].append(op)
else:
# create virtual op
virtual_op = gen_virtual_op(virtual_op_name_gen)
virtual_ops.append(virtual_op)
outdegs[virtual_op.name] = [succ_op]
actual_next_view.append(virtual_op)
pred_op_of_next_view_op[virtual_op.name] = [op]
virtual_op.add_virtual_pred_op(op)
actual_view = actual_next_view
# create channel
processed_op = set()
for o_idx, op in enumerate(actual_next_view):
if op.name in processed_op:
continue
channel = gen_channel(channel_name_gen)
channels.append(channel)
logging.debug("{} => {}".format(channel.name, op.name))
op.add_input_channel(channel)
pred_ops = pred_op_of_next_view_op[op.name]
if v_idx == 0:
input_channel = channel
else:
# if pred_op is a virtual Op, it registers its ancestor Ops as producers of the channel
for pred_op in pred_ops:
logging.debug("{} => {}".format(pred_op.name,
channel.name))
pred_op.add_output_channel(channel)
processed_op.add(op.name)
# find Ops that share the same predecessors so they can reuse this channel
for other_op in actual_next_view[o_idx + 1:]:
if other_op.name in processed_op:
continue
other_pred_ops = pred_op_of_next_view_op[other_op.name]
if len(other_pred_ops) != len(pred_ops):
continue
same_flag = True
for pred_op in pred_ops:
if pred_op not in other_pred_ops:
same_flag = False
break
if same_flag:
logging.debug("{} => {}".format(channel.name,
other_op.name))
other_op.add_input_channel(channel)
processed_op.add(other_op.name)
output_channel = gen_channel(channel_name_gen)
channels.append(output_channel)
last_op = dag_views[-1][0]
last_op.add_output_channel(output_channel)
self._actual_ops = virtual_ops
for op in self._user_ops:
if len(op.get_input_ops()) == 0:
# skip the read Op
continue
self._actual_ops.append(op)
self._channels = channels
for c in channels:
logging.debug(c.debug())
return input_channel, output_channel
def prepare_server(self, port, worker_num):
self._port = port
self._worker_num = worker_num
input_channel, output_channel = self._topo_sort()
self._in_channel = input_channel
self._out_channel = output_channel
for op in self._actual_ops:
if op.with_serving:
self.prepare_serving(op)
self.gen_desc()
def _run_ops(self):
threads_or_proces = []
for op in self._actual_ops:
op.init_profiler(_profiler)
if self._use_multithread:
threads_or_proces.extend(
op.start_with_thread(self._client_type, self._use_future))
else:
threads_or_proces.extend(
op.start_with_process(self._client_type, self._use_future))
return threads_or_proces
def _stop_ops(self):
for op in self._actual_ops:
op.stop()
def run_server(self):
op_threads_or_proces = self._run_ops()
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=self._worker_num))
general_python_service_pb2_grpc.add_GeneralPythonServiceServicer_to_server(
GeneralPythonService(self._in_channel, self._out_channel,
self._retry), server)
server.add_insecure_port('[::]:{}'.format(self._port))
server.start()
server.wait_for_termination()
self._stop_ops() # TODO
for x in op_threads_or_proces:
x.join()
def prepare_serving(self, op):
model_path = op._server_model
port = op._server_port
device = op._device
if self._client_type == "grpc":
if device == "cpu":
cmd = "(Use grpc impl) python -m paddle_serving_server.serve" \
" --model {} --thread 4 --port {} --use_multilang &>/dev/null &".format(model_path, port)
else:
cmd = "(Use grpc impl) python -m paddle_serving_server_gpu.serve" \
" --model {} --thread 4 --port {} --use_multilang &>/dev/null &".format(model_path, port)
elif self._client_type == "brpc":
if device == "cpu":
cmd = "(Use brpc impl) python -m paddle_serving_server.serve" \
" --model {} --thread 4 --port {} &>/dev/null &".format(model_path, port)
else:
cmd = "(Use brpc impl) python -m paddle_serving_server_gpu.serve" \
" --model {} --thread 4 --port {} &>/dev/null &".format(model_path, port)
else:
raise Exception("unknow client type: {}".format(self._client_type))
# run a server (not managed by PipelineServing)
logging.info("run a server (not in PipelineServing): {}".format(cmd))
...@@ -12,3 +12,50 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
import os
import sys
if sys.version_info.major == 2:
import Queue
elif sys.version_info.major == 3:
import queue as Queue
else:
raise Exception("Error Python version")
import time
class TimeProfiler(object):
def __init__(self):
self._pid = os.getpid()
self._print_head = 'PROFILE\tpid:{}\t'.format(self._pid)
self._time_record = Queue.Queue()
self._enable = False
def enable(self, enable):
self._enable = enable
def record(self, name_with_tag):
if self._enable is False:
return
name_with_tag = name_with_tag.split("_")
tag = name_with_tag[-1]
name = '_'.join(name_with_tag[:-1])
self._time_record.put((name, tag, int(round(time.time() * 1000000))))
def print_profile(self):
if self._enable is False:
return
sys.stderr.write(self._print_head)
tmp = {}
while not self._time_record.empty():
name, tag, timestamp = self._time_record.get()
if name in tmp:
ptag, ptimestamp = tmp.pop(name)
sys.stderr.write("{}_{}:{} ".format(name, ptag, ptimestamp))
sys.stderr.write("{}_{}:{} ".format(name, tag, timestamp))
else:
tmp[name] = (tag, timestamp)
sys.stderr.write('\n')
for name, item in tmp.items():
tag, timestamp = item
self._time_record.put((name, tag, timestamp))
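A short usage sketch of TimeProfiler, following the tag convention used by the Ops above (a `_0`/`_1` suffix pair brackets one stage); the stage name here is illustrative:

profiler = TimeProfiler()
profiler.enable(True)
profiler.record("[demo|0]-prep_0")  # stage start (timestamp in microseconds)
# ... the work being measured ...
profiler.record("[demo|0]-prep_1")  # stage end
profiler.print_profile()  # stderr: PROFILE pid:<pid> [demo|0]-prep_0:<ts> [demo|0]-prep_1:<ts>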
...@@ -42,7 +42,8 @@ REQUIRED_PACKAGES = [
 ]
 packages=['paddle_serving_server',
-          'paddle_serving_server.proto']
+          'paddle_serving_server.proto',
+          'paddle_serving_server.pipeline']
 package_dir={'paddle_serving_server':
              '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server',
......
...@@ -43,7 +43,8 @@ REQUIRED_PACKAGES = [
 packages=['paddle_serving_server_gpu',
-          'paddle_serving_server_gpu.proto']
+          'paddle_serving_server_gpu.proto',
+          'paddle_serving_server_gpu.pipeline']
 package_dir={'paddle_serving_server_gpu':
              '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_server_gpu',
......