# Pipeline-backed default web service helpers.
#
# The classes below rely on two names imported at module level in
# web_service.py:
#   from paddle_serving_server import pipeline
#   from paddle_serving_server.pipeline.util import AvailablePortGenerator


class DefaultPipelineServer(object):
    """Build and run a pipeline of RequestOp -> user op -> ResponseOp.

    The methods are meant to be called in this order (the order used by
    ``DefaultPipelineWebService.prepare_server`` / ``run_service``):
    ``create_internel_op_class``, ``create_local_rpc_service_handler``,
    ``init_pipeline_server``, ``prepare_pipeline_server`` and finally
    ``start_pipeline_server``.
    """

    def __init__(self, available_port_generator):
        """Create the underlying ``pipeline.PipelineServer``.

        Args:
            available_port_generator: object handed unchanged to
                ``pipeline.LocalRpcServiceHandler`` when the local RPC
                service handler is created.
        """
        self.server = pipeline.PipelineServer()
        self.available_port_generator = available_port_generator

    def create_internel_op_class(self, f_preprocess, f_postprocess):
        """Define the ``pipeline.Op`` subclass that wraps the user hooks.

        The generated class is stored in ``self.internel_op_class``.

        Args:
            f_preprocess: callable taking the request dict and returning
                the value used as the op's preprocess result.
            f_postprocess: callable taking ``(input_dict, fetch_dict)`` and
                returning the op's postprocess result.
        """

        class InternelOp(pipeline.Op):
            # f_preprocess and f_postprocess use variables
            # in closures, so init_op function is not necessary.
            def preprocess(self, input_dicts):
                # The op has exactly one input op; this unpacking raises
                # ValueError if input_dicts does not hold exactly one item.
                (_, input_dict), = input_dicts.items()
                return f_preprocess(input_dict)

            def postprocess(self, input_dicts, fetch_dict):
                (_, input_dict), = input_dicts.items()
                return f_postprocess(input_dict, fetch_dict)

        self.internel_op_class = InternelOp

    def create_local_rpc_service_handler(self, model_config, workdir,
                                         thread_num, devices, mem_optim,
                                         ir_optim):
        """Create ``self.local_rpc_service_handler``.

        All arguments are forwarded unchanged, by keyword, to
        ``pipeline.LocalRpcServiceHandler`` together with the port
        generator given to the constructor.
        """
        self.local_rpc_service_handler = pipeline.LocalRpcServiceHandler(
            model_config=model_config,
            workdir=workdir,
            thread_num=thread_num,
            devices=devices,
            mem_optim=mem_optim,
            ir_optim=ir_optim,
            available_port_generator=self.available_port_generator)

    def init_pipeline_server(self,
                             internel_op_name,
                             internel_op_fetch_list=None,
                             internel_op_concurrency=4,
                             internel_op_timeout=-1,
                             internel_op_retry=1,
                             internel_op_batch_size=1,
                             internel_op_auto_batching_timeout=None):
        """Wire RequestOp -> internal op -> ResponseOp into the server.

        Requires ``create_internel_op_class`` and
        ``create_local_rpc_service_handler`` to have been called first,
        because it reads ``self.internel_op_class`` and
        ``self.local_rpc_service_handler``.

        Args:
            internel_op_name: name given to the internal op.
            internel_op_fetch_list: fetch list for the internal op; ``None``
                (the default) means a fresh empty list.
            internel_op_concurrency: forwarded as ``concurrency``.
            internel_op_timeout: forwarded as ``timeout``.
            internel_op_retry: forwarded as ``retry``.
            internel_op_batch_size: forwarded as ``batch_size``.
            internel_op_auto_batching_timeout: forwarded as
                ``auto_batching_timeout``.
        """
        # A literal [] default would be one list object shared by every
        # call; build a new list per call instead.
        if internel_op_fetch_list is None:
            internel_op_fetch_list = []

        read_op = pipeline.RequestOp()
        internel_op = self.internel_op_class(
            name=internel_op_name,
            input_ops=[read_op],
            fetch_list=internel_op_fetch_list,
            local_rpc_service_handler=self.local_rpc_service_handler,
            concurrency=internel_op_concurrency,
            timeout=internel_op_timeout,
            retry=internel_op_retry,
            batch_size=internel_op_batch_size,
            auto_batching_timeout=internel_op_auto_batching_timeout)
        response_op = pipeline.ResponseOp(input_ops=[internel_op])
        self.server.set_response_op(response_op)

    def prepare_pipeline_server(self,
                                rpc_port,
                                http_port,
                                worker_num,
                                build_dag_each_worker=False,
                                is_thread_op=False,
                                client_type="brpc",
                                retry=1,
                                use_profile=False,
                                tracer_interval_s=-1):
        """Assemble the server configuration dict and apply it.

        The dict is passed to ``self.server.prepare_server(yml_dict=...)``.

        Args:
            rpc_port: stored under ``"port"``.
            http_port: stored under ``"grpc_gateway_port"``.
            worker_num: stored under ``"worker_num"``.
            build_dag_each_worker: stored under ``"build_dag_each_worker"``.
            is_thread_op: stored under ``dag.is_thread_op``.
            client_type: stored under ``dag.client_type``.
            retry: stored under ``dag.retry``.
            use_profile: stored under ``dag.use_profile``.
            tracer_interval_s: stored under ``dag.tracer.interval_s``.
        """
        default_server_conf = {
            "port": rpc_port,
            "worker_num": worker_num,
            "build_dag_each_worker": build_dag_each_worker,
            "grpc_gateway_port": http_port,
            "dag": {
                "is_thread_op": is_thread_op,
                "client_type": client_type,
                "retry": retry,
                "use_profile": use_profile,
                "tracer": {
                    "interval_s": tracer_interval_s,
                },
            },
        }
        self.server.prepare_server(yml_dict=default_server_conf)

    def start_pipeline_server(self):
        """Start the local RPC service, then run the pipeline server."""
        self.server.start_local_rpc_service()
        self.server.run_server()


class DefaultPipelineWebService(object):
    """Web service that serves one model through a DefaultPipelineServer.

    Subclasses customise behaviour by overriding ``preprocess`` and
    ``postprocess``; the base implementations pass data through unchanged.
    """

    def __init__(self, name="default"):
        """Initialise the service.

        Args:
            name: service name; also used as the internal op's name.
        """
        self.name = name
        self.port = None
        self.model_config = None
        self.gpus = ""  # use cpu
        # NOTE(review): 12000 is presumably the first port the generator
        # probes -- confirm against AvailablePortGenerator.
        self.available_port_generator = AvailablePortGenerator(12000)
        self.default_pipeline_server = DefaultPipelineServer(
            self.available_port_generator)

    def load_model_config(self, model_config):
        """Remember the model configuration used by ``prepare_server``."""
        self.model_config = model_config

    def prepare_server(self,
                       workdir="workdir",
                       port=9393,
                       thread_num=2,
                       grpc_worker_num=4,
                       mem_optim=True,
                       ir_optim=False):
        """Configure the pipeline server behind this web service.

        Args:
            workdir: forwarded to the local RPC service handler.
            port: HTTP port of the web service.
            thread_num: forwarded to the local RPC service handler.
            grpc_worker_num: used as the pipeline server's ``worker_num``.
            mem_optim: forwarded to the local RPC service handler.
            ir_optim: forwarded to the local RPC service handler.

        Raises:
            SystemExit: if ``port`` is reported as unavailable by the port
                generator.
        """
        if not self.available_port_generator.port_is_available(port):
            raise SystemExit("Failed to prepare server: port({}) is not"
                             " available".format(port))
        self.port = port

        server = self.default_pipeline_server
        server.create_internel_op_class(self.preprocess, self.postprocess)
        server.create_local_rpc_service_handler(
            model_config=self.model_config,
            workdir=workdir,
            thread_num=thread_num,
            devices=self.gpus,
            mem_optim=mem_optim,
            ir_optim=ir_optim)
        server.init_pipeline_server(internel_op_name=self.name)
        # The RPC port is taken from the generator; the user-chosen port
        # is used for HTTP.
        server.prepare_pipeline_server(
            rpc_port=self.available_port_generator.next(),
            http_port=self.port,
            worker_num=grpc_worker_num)

    def run_service(self):
        """Print the service URL and start the pipeline server."""
        import socket
        try:
            local_ip = socket.gethostbyname(socket.gethostname())
        except socket.error:
            # The address is only used for the message below; an
            # unresolvable hostname must not prevent the service starting.
            local_ip = "127.0.0.1"
        print("web service address: http://{}:{}/prediction"
              .format(local_ip, self.port))
        self.default_pipeline_server.start_pipeline_server()

    def preprocess(self, feed_dict):
        """Hook run before prediction; returns ``feed_dict`` unchanged."""
        return feed_dict

    def postprocess(self, feed_dict, fetch_dict):
        """Hook run after prediction; returns ``fetch_dict`` unchanged."""
        return fetch_dict