diff --git a/python/paddle_serving_server/serve.py b/python/paddle_serving_server/serve.py
index 088e3928f4409eaac4d42d771a72ecc9d13fdbce..b57d2253dbe1f14caff50eb79543f224b8d0ec45 100644
--- a/python/paddle_serving_server/serve.py
+++ b/python/paddle_serving_server/serve.py
@@ -19,6 +19,7 @@ Usage:
 """
 import argparse
 from .web_service import WebService
+from flask import Flask, request
 
 
 def parse_args():  # pylint: disable=doc-string-missing
@@ -88,3 +89,20 @@ if __name__ == "__main__":
     service.prepare_server(
         workdir=args.workdir, port=args.port, device=args.device)
     service.run_server()
+
+    app_instance = Flask(__name__)
+
+    @app_instance.before_first_request
+    def init():
+        service._launch_web_service()
+
+    service_name = "/" + service.name + "/prediction"
+
+    @app_instance.route(service_name, methods=["POST"])
+    def run():
+        return service.get_prediction(request)
+
+    app_instance.run(host="0.0.0.0",
+                     port=service.port,
+                     threaded=False,
+                     processes=4)
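With this change the Flask app lives in serve.py and the RPC client is built
lazily via before_first_request inside the serving process. A minimal
client-side sketch for exercising the new HTTP route follows; the port
(9393), service name ("serving"), and feed/fetch keys ("x", "price") are
assumptions for illustration, not values fixed by this patch.

# hypothetical client for the POST route registered above
import requests

url = "http://127.0.0.1:9393/serving/prediction"  # assumes --port 9393, name "serving"
payload = {
    "x": [0.0137, -0.1136, 0.2553, -0.0692],  # hypothetical feed variable
    "fetch": ["price"],                       # hypothetical fetch target
}

# get_prediction() reads request.json, so the body must be JSON
resp = requests.post(url, json=payload)
print(resp.json())  # {"result": ...} on success; HTTP 400 if "fetch" is missing
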
diff --git a/python/paddle_serving_server/web_service.py b/python/paddle_serving_server/web_service.py
index e94916ccf371022544707e7bb8e03d37045e54b5..c1a86eaecc899c987bd346f8a747fb486d4789ee 100755
--- a/python/paddle_serving_server/web_service.py
+++ b/python/paddle_serving_server/web_service.py
@@ -50,44 +50,33 @@ class WebService(object):
         self.device = device
 
     def _launch_web_service(self):
-        app_instance = Flask(__name__)
-        client_service = Client()
-        client_service.load_client_config(
+        self.client_service = Client()
+        self.client_service.load_client_config(
             "{}/serving_server_conf.prototxt".format(self.model_config))
-        client_service.connect(["0.0.0.0:{}".format(self.port + 1)])
-        service_name = "/" + self.name + "/prediction"
+        self.client_service.connect(["0.0.0.0:{}".format(self.port + 1)])
 
-        @app_instance.route(service_name, methods=['POST'])
-        def get_prediction():
-            if not request.json:
-                abort(400)
-            if "fetch" not in request.json:
-                abort(400)
-            try:
-                feed, fetch = self.preprocess(request.json,
-                                              request.json["fetch"])
-                if isinstance(feed, list):
-                    fetch_map_batch = client_service.predict(
-                        feed_batch=feed, fetch=fetch)
-                    fetch_map_batch = self.postprocess(
-                        feed=request.json,
-                        fetch=fetch,
-                        fetch_map=fetch_map_batch)
-                    result = {"result": fetch_map_batch}
-                elif isinstance(feed, dict):
-                    if "fetch" in feed:
-                        del feed["fetch"]
-                    fetch_map = client_service.predict(feed=feed, fetch=fetch)
-                    result = self.postprocess(
-                        feed=request.json, fetch=fetch, fetch_map=fetch_map)
-            except ValueError:
-                result = {"result": "Request Value Error"}
-            return result
-
-        app_instance.run(host="0.0.0.0",
-                         port=self.port,
-                         threaded=False,
-                         processes=1)
+    def get_prediction(self, request):
+        if not request.json:
+            abort(400)
+        if "fetch" not in request.json:
+            abort(400)
+        try:
+            feed, fetch = self.preprocess(request.json, request.json["fetch"])
+            if isinstance(feed, list):
+                fetch_map_batch = self.client_service.predict(
+                    feed_batch=feed, fetch=fetch)
+                fetch_map_batch = self.postprocess(
+                    feed=request.json, fetch=fetch, fetch_map=fetch_map_batch)
+                result = {"result": fetch_map_batch}
+            elif isinstance(feed, dict):
+                if "fetch" in feed:
+                    del feed["fetch"]
+                fetch_map = self.client_service.predict(feed=feed, fetch=fetch)
+                result = self.postprocess(
+                    feed=request.json, fetch=fetch, fetch_map=fetch_map)
+        except ValueError:
+            result = {"result": "Request Value Error"}
+        return result
 
     def run_server(self):
         import socket
@@ -96,11 +85,7 @@ class WebService(object):
         print("http://{}:{}/{}/prediction".format(localIP, self.port,
                                                   self.name))
         p_rpc = Process(target=self._launch_rpc_service)
-        p_web = Process(target=self._launch_web_service)
         p_rpc.start()
-        p_web.start()
-        p_web.join()
-        p_rpc.join()
 
     def preprocess(self, feed={}, fetch=[]):
         return feed, fetch
diff --git a/python/paddle_serving_server_gpu/serve.py b/python/paddle_serving_server_gpu/serve.py
index cb82e02cbec83324a6cb6029208325d8ce38e263..916af05ab6c6741b6504ce8f7660f6c7648c50f2 100644
--- a/python/paddle_serving_server_gpu/serve.py
+++ b/python/paddle_serving_server_gpu/serve.py
@@ -21,6 +21,7 @@ import argparse
 import os
 from multiprocessing import Pool, Process
 from paddle_serving_server_gpu import serve_args
+from flask import Flask, request
 
 
 def start_gpu_card_model(index, gpuid, args):  # pylint: disable=doc-string-missing
@@ -114,3 +115,20 @@ if __name__ == "__main__":
     web_service.prepare_server(
         workdir=args.workdir, port=args.port, device=args.device)
     web_service.run_server()
+
+    app_instance = Flask(__name__)
+
+    @app_instance.before_first_request
+    def init():
+        web_service._launch_web_service()
+
+    service_name = "/" + web_service.name + "/prediction"
+
+    @app_instance.route(service_name, methods=["POST"])
+    def run():
+        return web_service.get_prediction(request)
+
+    app_instance.run(host="0.0.0.0",
+                     port=web_service.port,
+                     threaded=False,
+                     processes=4)
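Both serve.py entry points rely on the same Flask pattern, so a standalone
sketch may help: app.run(threaded=False, processes=4) makes werkzeug fork
worker processes, and before_first_request defers building the RPC client
until a request is actually being handled, so no connection is created in
the parent and then inherited across fork(). The names below (DummyClient,
/whoami) are illustrative only, not part of paddle-serving.

import os
from flask import Flask, jsonify

app = Flask(__name__)
client = None  # populated lazily, inside the serving process


class DummyClient(object):  # stand-in for paddle_serving_client.Client
    def __init__(self):
        self.pid = os.getpid()


@app.before_first_request
def init():
    global client
    client = DummyClient()  # deferred until a request reaches this process


@app.route("/whoami", methods=["GET"])
def whoami():
    # reports the pid of the process in which the client was created
    return jsonify({"worker_pid": client.pid})


if __name__ == "__main__":
    # mirrors the app_instance.run() call in both serve.py files
    app.run(host="0.0.0.0", port=9393, threaded=False, processes=4)
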
diff --git a/python/paddle_serving_server_gpu/web_service.py b/python/paddle_serving_server_gpu/web_service.py
index 5d507c9475047d6c7eb65a2b2c5799221cf194b5..1bb8e93b24117c7545245809fab21af53af22dce 100755
--- a/python/paddle_serving_server_gpu/web_service.py
+++ b/python/paddle_serving_server_gpu/web_service.py
@@ -11,17 +11,16 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#!flask/bin/python
-# pylint: disable=doc-string-missing
 
 from flask import Flask, request, abort
-from multiprocessing import Pool, Process, Queue
 from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server
 import paddle_serving_server_gpu as serving
+from multiprocessing import Pool, Process, Queue
 from paddle_serving_client import Client
-from .serve import start_multi_card
-import time
-import random
+from paddle_serving_server_gpu.serve import start_multi_card
+
+import sys
+import numpy as np
 
 
 class WebService(object):
@@ -29,7 +28,6 @@ class WebService(object):
         self.name = name
         self.gpus = []
         self.rpc_service_list = []
-        self.input_queues = []
 
     def load_model_config(self, model_config):
         self.model_config = model_config
@@ -66,12 +64,6 @@ class WebService(object):
         return server
 
     def _launch_rpc_service(self, service_idx):
-        if service_idx == 0:
-            self.rpc_service_list[service_idx].check_local_bin()
-            if not self.rpc_service_list[service_idx].use_local_bin:
-                self.rpc_service_list[service_idx].download_bin()
-        else:
-            time.sleep(3)
         self.rpc_service_list[service_idx].run_server()
 
     def prepare_server(self, workdir="", port=9393, device="gpu", gpuid=0):
@@ -93,87 +85,30 @@ class WebService(object):
                     gpuid,
                     thread_num=10))
 
-    def producers(self, inputqueue, endpoint):
-        client = Client()
-        client.load_client_config("{}/serving_server_conf.prototxt".format(
+    def _launch_web_service(self):
+        gpu_num = len(self.gpus)
+        self.client = Client()
+        self.client.load_client_config("{}/serving_server_conf.prototxt".format(
             self.model_config))
-        client.connect([endpoint])
-        while True:
-            request_json = inputqueue.get()
-            try:
-                feed, fetch = self.preprocess(request_json,
-                                              request_json["fetch"])
-                if isinstance(feed, list):
-                    fetch_map_batch = client.predict(
-                        feed_batch=feed, fetch=fetch)
-                    fetch_map_batch = self.postprocess(
-                        feed=request_json,
-                        fetch=fetch,
-                        fetch_map=fetch_map_batch)
-                    result = {"result": fetch_map_batch}
-                elif isinstance(feed, dict):
-                    if "fetch" in feed:
-                        del feed["fetch"]
-                    fetch_map = client.predict(feed=feed, fetch=fetch)
-                    result = self.postprocess(
-                        feed=request_json, fetch=fetch, fetch_map=fetch_map)
-                self.output_queue.put(result)
-            except ValueError:
-                self.output_queue.put(-1)
-
-    def _launch_web_service(self, gpu_num):
-        app_instance = Flask(__name__)
-        service_name = "/" + self.name + "/prediction"
-
-        self.input_queues = []
-        self.output_queue = Queue()
-        for i in range(gpu_num):
-            self.input_queues.append(Queue())
-
-        producer_list = []
-        for i, input_q in enumerate(self.input_queues):
-            producer_processes = Process(
-                target=self.producers,
-                args=(
-                    input_q,
-                    "0.0.0.0:{}".format(self.port + 1 + i), ))
-            producer_list.append(producer_processes)
-
-        for p in producer_list:
-            p.start()
-
-        client = Client()
-        client.load_client_config("{}/serving_server_conf.prototxt".format(
-            self.model_config))
-        client.connect(["0.0.0.0:{}".format(self.port + 1)])
-
-        self.idx = 0
-
-        @app_instance.route(service_name, methods=['POST'])
-        def get_prediction():
-            if not request.json:
-                abort(400)
-            if "fetch" not in request.json:
-                abort(400)
-
-            self.input_queues[self.idx].put(request.json)
-
-            #self.input_queues[0].put(request.json)
-            self.idx += 1
-            if self.idx >= len(self.gpus):
-                self.idx = 0
-            result = self.output_queue.get()
-            if not isinstance(result, dict) and result == -1:
-                result = {"result": "Request Value Error"}
-            return result
-
-        app_instance.run(host="0.0.0.0",
-                         port=self.port,
-                         threaded=False,
-                         processes=1)
-
-        for p in producer_list:
-            p.join()
+
endpoints = "" + if gpu_num > 0: + for i in range(gpu_num): + endpoints += "127.0.0.1:{},".format(self.port + i + 1) + else: + endpoints = "127.0.0.1:{}".format(self.port + 1) + self.client.connect([endpoints]) + + def get_prediction(self, request): + if not request.json: + abort(400) + if "fetch" not in request.json: + abort(400) + feed, fetch = self.preprocess(request.json, request.json["fetch"]) + fetch_map_batch = self.client.predict(feed=feed, fetch=fetch) + fetch_map_batch = self.postprocess( + feed=request.json, fetch=fetch, fetch_map=fetch_map_batch) + result = {"result": fetch_map_batch} + return result def run_server(self): import socket @@ -188,13 +123,6 @@ class WebService(object): for p in server_pros: p.start() - p_web = Process( - target=self._launch_web_service, args=(len(self.gpus), )) - p_web.start() - p_web.join() - for p in server_pros: - p.join() - def preprocess(self, feed={}, fetch=[]): return feed, fetch