# coding:utf-8 # Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License" # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import argparse import os import platform import json import multiprocessing import time import signal import paddlehub as hub from paddlehub.commands import register from paddlehub.serving import app_compat as app from paddlehub.env import CONF_HOME from paddlehub.serving.http_server import run_all, StandaloneApplication from paddlehub.utils import log from paddlehub.utils.utils import is_port_occupied from paddlehub.server.server import CacheUpdater def number_of_workers(): ''' Get suitable quantity of workers based on empirical formula. ''' return (multiprocessing.cpu_count() * 2) + 1 def pid_is_exist(pid: int): ''' Try to kill process by PID. Args: pid(int): PID of process to be killed. Returns: True if PID will be killed. Examples: .. code-block:: python pid_is_exist(pid=8866) ''' try: os.kill(pid, 0) except: return False else: return True @register(name='hub.serving', description='Start Module Serving or Bert Service for online predicting.') class ServingCommand: name = "serving" module_list = [] def dump_pid_file(self): ''' Write PID info to file. ''' pid = os.getpid() filepath = os.path.join(CONF_HOME, "serving_" + str(self.args.port) + ".json") if os.path.exists(filepath): os.remove(filepath) with open(filepath, "w") as fp: info = {"pid": pid, "module": self.args.modules, "start_time": time.time()} json.dump(info, fp) @staticmethod def load_pid_file(filepath: str, port: int = None): ''' Read PID info from file. ''' if port is None: port = os.path.basename(filepath).split(".")[0].split("_")[1] if not os.path.exists(filepath): log.logger.error( "PaddleHub Serving config file is not exists, please confirm the port [%s] you specified is correct." % port) return False with open(filepath, "r") as fp: info = json.load(fp) return info def stop_serving(self, port: int): ''' Stop PaddleHub-Serving by port. ''' filepath = os.path.join(CONF_HOME, "serving_" + str(port) + ".json") info = self.load_pid_file(filepath, port) if info is False: return pid = info["pid"] module = info["module"] start_time = info["start_time"] CacheUpdater("hub_serving_stop", module=module, addition={"period_time": time.time() - start_time}).start() if os.path.exists(filepath): os.remove(filepath) if not pid_is_exist(pid): log.logger.info("PaddleHub Serving has been stopped.") return log.logger.info("PaddleHub Serving will stop.") if platform.system() == "Windows": os.kill(pid, signal.SIGTERM) else: try: os.killpg(pid, signal.SIGTERM) except ProcessLookupError: os.kill(pid, signal.SIGTERM) @staticmethod def start_bert_serving(args): ''' Start bert serving server. ''' if platform.system() != "Linux": log.logger.error("Error. Bert Service only support linux.") return False if is_port_occupied("127.0.0.1", args.port) is True: log.logger.error("Port %s is occupied, please change it." % args.port) return False from paddle_gpu_serving.run import BertServer bs = BertServer(with_gpu=args.use_gpu) bs.with_model(model_name=args.modules[0]) CacheUpdater("hub_bert_service", module=args.modules[0], version="0.0.0").start() bs.run(gpu_index=args.gpu, port=int(args.port)) def preinstall_modules(self): ''' Install module by PaddleHub and get info of this module. ''' for key, value in self.modules_info.items(): init_args = value["init_args"] CacheUpdater("hub_serving_start", module=key, version=init_args.get("version", "0.0.0")).start() if "directory" not in init_args: init_args.update({"name": key}) m = hub.Module(**init_args) method_name = m.serving_func_name if method_name is None: raise RuntimeError("{} cannot be use for " "predicting".format(key)) exit(1) serving_method = getattr(m, method_name) category = str(m.type).split("/")[0].upper() self.modules_info[key].update({ "method_name": method_name, "version": m.version, "category": category, "module": m, "name": m.name, "serving_method": serving_method }) def start_app_with_args(self): ''' Start one PaddleHub-Serving instance by arguments with gunicorn. ''' module = self.modules_info if module is not None: port = self.args.port if is_port_occupied("127.0.0.1", port) is True: log.logger.error("Port %s is occupied, please change it." % port) return False self.preinstall_modules() options = {"bind": "0.0.0.0:%s" % port, "workers": self.args.workers} self.dump_pid_file() StandaloneApplication(app.create_app(init_flag=False, configs=self.modules_info), options).run() else: log.logger.error("Lack of necessary parameters!") def start_zmq_serving_with_args(self): ''' Start one PaddleHub-Serving instance by arguments with zmq. ''' if self.modules_info is not None: for module, info in self.modules_info.items(): CacheUpdater("hub_serving_start", module=module, version=info['init_args']['version']).start() front_port = self.args.port if is_port_occupied("127.0.0.1", front_port) is True: log.logger.error("Port %s is occupied, please change it." % front_port) return False back_port = int(front_port) + 1 for index in range(100): if not is_port_occupied("127.0.0.1", back_port): break else: back_port = int(back_port) + 1 else: raise RuntimeError( "Port from %s to %s is occupied, please use another port" % (int(front_port) + 1, back_port)) self.dump_pid_file() run_all(self.modules_info, self.args.gpu, front_port, back_port) else: log.logger.error("Lack of necessary parameters!") def start_single_app_with_args(self): ''' Start one PaddleHub-Serving instance by arguments with flask. ''' module = self.modules_info if module is not None: port = self.args.port if is_port_occupied("127.0.0.1", port) is True: log.logger.error("Port %s is occupied, please change it." % port) return False self.preinstall_modules() self.dump_pid_file() app.run(configs=self.modules_info, port=port) else: log.logger.error("Lack of necessary parameters!") def start_serving(self): ''' Start PaddleHub-Serving with flask and gunicorn ''' if self.args.use_gpu: if self.args.use_multiprocess: log.logger.warning('`use_multiprocess` will be ignored if specify `use_gpu`.') self.start_zmq_serving_with_args() else: if self.args.use_multiprocess: if platform.system() == "Windows": log.logger.warning( "Warning: Windows cannot use multiprocess working mode, PaddleHub Serving will switch to single process mode" ) self.start_single_app_with_args() else: self.start_app_with_args() else: self.start_single_app_with_args() @staticmethod def show_help(): str = "serving