未验证 提交 2fdb232c 编写于 作者: B Bin Long 提交者: GitHub

Merge pull request #402 from ShenYuhan/update_v2_0301

update serving v2
...@@ -27,6 +27,8 @@ from paddlehub.commands.base_command import BaseCommand, ENTRY ...@@ -27,6 +27,8 @@ from paddlehub.commands.base_command import BaseCommand, ENTRY
from paddlehub.serving import app_single as app from paddlehub.serving import app_single as app
from paddlehub.common.dir import CONF_HOME from paddlehub.common.dir import CONF_HOME
from paddlehub.common.hub_server import CacheUpdater from paddlehub.common.hub_server import CacheUpdater
from paddlehub.serving.model_service.base_model_service import cv_module_info
from paddlehub.serving.model_service.base_model_service import nlp_module_info
import multiprocessing import multiprocessing
import time import time
import signal import signal
...@@ -105,6 +107,11 @@ class ServingCommand(BaseCommand): ...@@ -105,6 +107,11 @@ class ServingCommand(BaseCommand):
self.parser.add_argument("--gpu", "-i", nargs="?", default=0) self.parser.add_argument("--gpu", "-i", nargs="?", default=0)
self.parser.add_argument( self.parser.add_argument(
"--use_singleprocess", action="store_true", default=False) "--use_singleprocess", action="store_true", default=False)
self.parser.add_argument(
"--modules_info", "-mi", default={}, type=json.loads)
self.parser.add_argument(
"--workers", "-w", nargs="?", default=number_of_workers())
self.modules_info = {}
def dump_pid_file(self): def dump_pid_file(self):
pid = os.getpid() pid = os.getpid()
...@@ -184,76 +191,59 @@ class ServingCommand(BaseCommand): ...@@ -184,76 +191,59 @@ class ServingCommand(BaseCommand):
except: except:
return False return False
@staticmethod def preinstall_modules(self):
def preinstall_modules(modules): for key, value in self.modules_info.items():
configs = [] init_args = value["init_args"]
module_exist = {} CacheUpdater(
if modules is not None: "hub_serving_start",
for module in modules: module=key,
module_name = module if "==" not in module else \ version=init_args.get("version", "0.0.0")).start()
module.split("==")[0]
module_version = None if "==" not in module else \ if "dir" not in init_args:
module.split("==")[1] init_args.update({"name": key})
if module_exist.get(module_name, "") != "": m = hub.Module(**init_args)
print(module_name, "==", module_exist.get(module_name), method_name = m.serving_func_name
" will be ignored cause new version is specified.") if method_name is None:
configs.pop() raise RuntimeError("{} cannot be use for "
module_exist.update({module_name: module_version}) "predicting".format(key))
try: exit(1)
CacheUpdater( category = str(m.type).split("/")[0].upper()
"hub_serving_start", self.modules_info[key].update({
module=module_name, "method_name": method_name,
version=module_version).start() "code_version": m.code_version,
m = hub.Module(name=module_name, version=module_version) "version": m.version,
method_name = m.desc.attr.map.data['default_signature'].s "category": category,
if method_name == "": "module": m,
raise RuntimeError("{} cannot be use for " "name": m.name
"predicting".format(module_name)) })
configs.append({
"module": module_name, def start_app_with_file(self):
"version": m.version, port = self.args.config.get("port", 8866)
"category": str(m.type).split("/")[0].upper()
})
except Exception as err:
print(err, ", start PaddleHub Serving unsuccessfully.")
exit(1)
return configs
def start_app_with_file(self, configs, workers):
port = configs.get("port", 8866)
if ServingCommand.is_port_occupied("127.0.0.1", port) is True: if ServingCommand.is_port_occupied("127.0.0.1", port) is True:
print("Port %s is occupied, please change it." % port) print("Port %s is occupied, please change it." % port)
return False return False
modules = configs.get("modules_info") self.modules_info = self.args.config.get("modules_info")
module = [str(i["module"]) + "==" + str(i["version"]) for i in modules] self.preinstall_modules()
module_info = ServingCommand.preinstall_modules(module)
for index in range(len(module_info)):
modules[index].update(module_info[index])
options = { options = {
"bind": "0.0.0.0:%s" % port, "bind": "0.0.0.0:%s" % port,
"workers": workers, "workers": self.args.workers,
"pid": "./pid.txt" "pid": "./pid.txt"
} }
configs["modules_info"] = modules
self.dump_pid_file() self.dump_pid_file()
StandaloneApplication( StandaloneApplication(
app.create_app(init_flag=False, configs=configs), options).run() app.create_app(init_flag=False, configs=self.modules_info),
options).run()
def start_single_app_with_file(self, configs): def start_single_app_with_file(self):
use_gpu = configs.get("use_gpu", False) port = self.args.config.get("port", 8866)
port = configs.get("port", 8866)
if ServingCommand.is_port_occupied("127.0.0.1", port) is True: if ServingCommand.is_port_occupied("127.0.0.1", port) is True:
print("Port %s is occupied, please change it." % port) print("Port %s is occupied, please change it." % port)
return False return False
configs = configs.get("modules_info") self.modules_info = self.args.config.get("modules_info")
module = [str(i["module"]) + "==" + str(i["version"]) for i in configs] self.preinstall_modules()
module_info = ServingCommand.preinstall_modules(module)
for index in range(len(module_info)):
configs[index].update(module_info[index])
self.dump_pid_file() self.dump_pid_file()
app.run(use_gpu, configs=configs, port=port) app.run(configs=self.modules_info, port=port)
@staticmethod @staticmethod
def start_multi_app_with_file(configs): def start_multi_app_with_file(configs):
...@@ -270,23 +260,15 @@ class ServingCommand(BaseCommand): ...@@ -270,23 +260,15 @@ class ServingCommand(BaseCommand):
def start_app_with_args(self, workers): def start_app_with_args(self, workers):
module = self.args.modules module = self.args.modules
if module is not None: if module is not None:
use_gpu = self.args.use_gpu
port = self.args.port port = self.args.port
if ServingCommand.is_port_occupied("127.0.0.1", port) is True: if ServingCommand.is_port_occupied("127.0.0.1", port) is True:
print("Port %s is occupied, please change it." % port) print("Port %s is occupied, please change it." % port)
return False return False
module_info = ServingCommand.preinstall_modules(module) self.preinstall_modules()
[
item.update({
"batch_size": 1,
"queue_size": 20
}) for item in module_info
]
options = {"bind": "0.0.0.0:%s" % port, "workers": workers} options = {"bind": "0.0.0.0:%s" % port, "workers": workers}
configs = {"use_gpu": use_gpu, "modules_info": module_info}
self.dump_pid_file() self.dump_pid_file()
StandaloneApplication( StandaloneApplication(
app.create_app(init_flag=False, configs=configs), app.create_app(init_flag=False, configs=self.modules_info),
options).run() options).run()
else: else:
print("Lack of necessary parameters!") print("Lack of necessary parameters!")
...@@ -294,41 +276,27 @@ class ServingCommand(BaseCommand): ...@@ -294,41 +276,27 @@ class ServingCommand(BaseCommand):
def start_single_app_with_args(self): def start_single_app_with_args(self):
module = self.args.modules module = self.args.modules
if module is not None: if module is not None:
use_gpu = self.args.use_gpu
port = self.args.port port = self.args.port
if ServingCommand.is_port_occupied("127.0.0.1", port) is True: if ServingCommand.is_port_occupied("127.0.0.1", port) is True:
print("Port %s is occupied, please change it." % port) print("Port %s is occupied, please change it." % port)
return False return False
module_info = ServingCommand.preinstall_modules(module) self.preinstall_modules()
[
item.update({
"batch_size": 1,
"queue_size": 20
}) for item in module_info
]
self.dump_pid_file() self.dump_pid_file()
app.run(use_gpu, configs=module_info, port=port) app.run(configs=self.modules_info, port=port)
else: else:
print("Lack of necessary parameters!") print("Lack of necessary parameters!")
def start_multi_app_with_args(self): def start_multi_app_with_args(self):
module = self.args.modules module = self.args.modules
if module is not None: if module is not None:
use_gpu = self.args.use_gpu
port = self.args.port port = self.args.port
workers = number_of_workers() workers = number_of_workers()
if ServingCommand.is_port_occupied("127.0.0.1", port) is True: if ServingCommand.is_port_occupied("127.0.0.1", port) is True:
print("Port %s is occupied, please change it." % port) print("Port %s is occupied, please change it." % port)
return False return False
module_info = ServingCommand.preinstall_modules(module) self.preinstall_modules()
[
item.update({
"batch_size": 1,
"queue_size": 20
}) for item in module_info
]
options = {"bind": "0.0.0.0:%s" % port, "workers": workers} options = {"bind": "0.0.0.0:%s" % port, "workers": workers}
configs = {"use_gpu": use_gpu, "modules_info": module_info} configs = {"modules_info": self.module_info}
StandaloneApplication( StandaloneApplication(
app.create_app(init_flag=False, configs=configs), app.create_app(init_flag=False, configs=configs),
options).run() options).run()
...@@ -336,31 +304,51 @@ class ServingCommand(BaseCommand): ...@@ -336,31 +304,51 @@ class ServingCommand(BaseCommand):
else: else:
print("Lack of necessary parameters!") print("Lack of necessary parameters!")
def link_module_info(self):
if self.args.config:
if os.path.exists(self.args.config):
with open(self.args.config, "r") as fp:
self.args.config = json.load(fp)
self.modules_info = self.args.config["modules_info"]
else:
raise RuntimeError("{} not exists.".format(self.args.config))
exit(1)
else:
for item in self.args.modules:
version = None
if "==" in item:
module = item.split("==")[0]
version = item.split("==")[1]
else:
module = item
self.modules_info.update({
module: {
"init_args": {
"version": version
},
"predict_args": {
"use_gpu": self.args.use_gpu
}
}
})
def start_serving(self): def start_serving(self):
config_file = self.args.config
single_mode = self.args.use_singleprocess single_mode = self.args.use_singleprocess
if config_file is not None: if self.args.config is not None:
if os.path.exists(config_file): self.args.workers = self.args.config.get("workers",
with open(config_file, "r") as fp:
configs = json.load(fp)
use_multiprocess = configs.get("use_multiprocess", False)
if single_mode is True:
ServingCommand.start_single_app_with_file(configs)
elif platform.system() == "Windows":
print(
"Warning: Windows cannot use multiprocess working "
"mode, PaddleHub Serving will switch to single process mode"
)
ServingCommand.start_single_app_with_file(configs)
else:
if use_multiprocess is True:
self.start_app_with_file(configs,
number_of_workers()) number_of_workers())
else: use_multiprocess = self.args.config.get("use_multiprocess", False)
self.start_app_with_file(configs, 1) if use_multiprocess is False:
self.start_single_app_with_file()
elif platform.system() == "Windows":
print(
"Warning: Windows cannot use multiprocess working "
"mode, PaddleHub Serving will switch to single process mode"
)
self.start_single_app_with_file()
else: else:
print("config_file ", config_file, "not exists.") self.start_app_with_file()
else: else:
if single_mode is True: if single_mode is True:
self.start_single_app_with_args() self.start_single_app_with_args()
...@@ -372,7 +360,7 @@ class ServingCommand(BaseCommand): ...@@ -372,7 +360,7 @@ class ServingCommand(BaseCommand):
self.start_single_app_with_args() self.start_single_app_with_args()
else: else:
if self.args.use_multiprocess is True: if self.args.use_multiprocess is True:
self.start_app_with_args(number_of_workers()) self.start_app_with_args(self.args.workers)
else: else:
self.start_app_with_args(1) self.start_app_with_args(1)
...@@ -393,10 +381,10 @@ class ServingCommand(BaseCommand): ...@@ -393,10 +381,10 @@ class ServingCommand(BaseCommand):
str += "\tPre-install modules via the parameter list.\n" str += "\tPre-install modules via the parameter list.\n"
str += "--port/-p XXXX\n" str += "--port/-p XXXX\n"
str += "\tUse port XXXX for serving.\n" str += "\tUse port XXXX for serving.\n"
str += "--use_gpu\n"
str += "\tUse gpu for predicting if you specify the parameter.\n"
str += "--use_multiprocess\n" str += "--use_multiprocess\n"
str += "\tChoose multoprocess mode, cannot be use on Windows.\n" str += "\tChoose multoprocess mode, cannot be use on Windows.\n"
str += "--modules_info\n"
str += "\tSet module config in PaddleHub Serving."
str += "--config/-c file_path\n" str += "--config/-c file_path\n"
str += "\tUse configs in file to start PaddleHub Serving. " str += "\tUse configs in file to start PaddleHub Serving. "
str += "Other parameters will be ignored if you specify the parameter.\n" str += "Other parameters will be ignored if you specify the parameter.\n"
...@@ -422,6 +410,7 @@ class ServingCommand(BaseCommand): ...@@ -422,6 +410,7 @@ class ServingCommand(BaseCommand):
except: except:
ServingCommand.show_help() ServingCommand.show_help()
return False return False
self.link_module_info()
if self.args.sub_command == "start": if self.args.sub_command == "start":
if self.args.bert_service == "bert_service": if self.args.bert_service == "bert_service":
ServingCommand.start_bert_serving(self.args) ServingCommand.start_bert_serving(self.args)
......
...@@ -65,10 +65,14 @@ def base64s_to_cvmats(base64s): ...@@ -65,10 +65,14 @@ def base64s_to_cvmats(base64s):
return base64s return base64s
def handle_mask_results(results): def handle_mask_results(results, data_len):
result = [] result = []
if len(results) <= 0: if len(results) <= 0 and data_len != 0:
return results return [{
"data": "No face.",
"id": i,
"path": ""
} for i in range(1, data_len + 1)]
_id = results[0]["id"] _id = results[0]["id"]
_item = { _item = {
"data": [], "data": [],
...@@ -87,6 +91,15 @@ def handle_mask_results(results): ...@@ -87,6 +91,15 @@ def handle_mask_results(results):
"id": item.get("id", _id) "id": item.get("id", _id)
} }
result.append(_item) result.append(_item)
for index in range(1, data_len + 1):
if index > len(result):
result.append({"data": "No face.", "id": index, "path": ""})
elif result[index - 1]["id"] != index:
result.insert(index - 1, {
"data": "No face.",
"id": index,
"path": ""
})
return result return result
......
...@@ -68,6 +68,19 @@ def runnable(func): ...@@ -68,6 +68,19 @@ def runnable(func):
return _wrapper return _wrapper
_module_serving_func = {}
def serving(func):
mod = func.__module__ + "." + inspect.stack()[1][3]
_module_serving_func[mod] = func.__name__
def _wrapper(*args, **kwargs):
return func(*args, **kwargs)
return _wrapper
def moduleinfo(name, version, author, author_email, summary, type): def moduleinfo(name, version, author, author_email, summary, type):
def _wrapper(cls): def _wrapper(cls):
if not issubclass(cls, Module): if not issubclass(cls, Module):
...@@ -135,6 +148,8 @@ class Module(object): ...@@ -135,6 +148,8 @@ class Module(object):
self._run_func = getattr(self, _run_func_name) self._run_func = getattr(self, _run_func_name)
else: else:
self._run_func = None self._run_func = None
self._serving_func_name = _module_serving_func.get(mod, None)
self._code_version = "v2"
self._directory = directory self._directory = directory
self._initialize(**kwargs) self._initialize(**kwargs)
self._is_initialize = True self._is_initialize = True
...@@ -221,6 +236,10 @@ class Module(object): ...@@ -221,6 +236,10 @@ class Module(object):
def is_runnable(self): def is_runnable(self):
return self._run_func != None return self._run_func != None
@property
def serving_func_name(self):
return self._serving_func_name
def _initialize(self): def _initialize(self):
pass pass
...@@ -303,6 +322,10 @@ class ModuleV1(Module): ...@@ -303,6 +322,10 @@ class ModuleV1(Module):
self._recover_variable_info(self.program) self._recover_variable_info(self.program)
@property @property
def serving_func_name(self):
serving_func_name = self.desc.attr.map.data['default_signature'].s
return serving_func_name if serving_func_name != "" else None
def desc(self): def desc(self):
return self._desc return self._desc
...@@ -553,6 +576,10 @@ class ModuleV1(Module): ...@@ -553,6 +576,10 @@ class ModuleV1(Module):
def is_runnable(self): def is_runnable(self):
return self.default_signature != None return self.default_signature != None
@property
def code_version(self):
return self._code_version
def context(self, def context(self,
sign_name=None, sign_name=None,
for_test=False, for_test=False,
......
此差异已折叠。
...@@ -16,6 +16,92 @@ import six ...@@ -16,6 +16,92 @@ import six
import abc import abc
class BaseModuleInfo(object):
def __init__(self):
self._modules_info = {}
self._modules = []
def set_modules_info(self, modules_info):
# dict of modules info.
self._modules_info = modules_info
# list of modules name.
self._modules = list(self._modules_info.keys())
def get_module_info(self, module_name):
return self._modules_info[module_name]
def add_module(self, module_name, module_info):
self._modules_info.update(module_info)
self._modules.append(module_name)
def get_module(self, module_name):
return self.get_module_info(module_name).get("module", None)
@property
def modules_info(self):
return self._modules_info
class CVModuleInfo(BaseModuleInfo):
def __init__(self):
self.cv_module_method = {
"vgg19_imagenet": "predict_classification",
"vgg16_imagenet": "predict_classification",
"vgg13_imagenet": "predict_classification",
"vgg11_imagenet": "predict_classification",
"shufflenet_v2_imagenet": "predict_classification",
"se_resnext50_32x4d_imagenet": "predict_classification",
"se_resnext101_32x4d_imagenet": "predict_classification",
"resnet_v2_50_imagenet": "predict_classification",
"resnet_v2_34_imagenet": "predict_classification",
"resnet_v2_18_imagenet": "predict_classification",
"resnet_v2_152_imagenet": "predict_classification",
"resnet_v2_101_imagenet": "predict_classification",
"pnasnet_imagenet": "predict_classification",
"nasnet_imagenet": "predict_classification",
"mobilenet_v2_imagenet": "predict_classification",
"googlenet_imagenet": "predict_classification",
"alexnet_imagenet": "predict_classification",
"yolov3_coco2017": "predict_object_detection",
"ultra_light_fast_generic_face_detector_1mb_640":
"predict_object_detection",
"ultra_light_fast_generic_face_detector_1mb_320":
"predict_object_detection",
"ssd_mobilenet_v1_pascal": "predict_object_detection",
"pyramidbox_face_detection": "predict_object_detection",
"faster_rcnn_coco2017": "predict_object_detection",
"cyclegan_cityscapes": "predict_gan",
"deeplabv3p_xception65_humanseg": "predict_semantic_segmentation",
"ace2p": "predict_semantic_segmentation",
"pyramidbox_lite_server_mask": "predict_mask",
"pyramidbox_lite_mobile_mask": "predict_mask"
}
super(CVModuleInfo, self).__init__()
@property
def cv_modules(self):
return self._modules
def add_module(self, module_name, module_info):
if "CV" == module_info[module_name].get("category", ""):
self._modules_info.update(module_info)
self._modules.append(module_name)
class NLPModuleInfo(BaseModuleInfo):
def __init__(self):
super(NLPModuleInfo, self).__init__()
@property
def nlp_modules(self):
return self._modules
def add_module(self, module_name, module_info):
if "NLP" == module_info[module_name].get("category", ""):
self._modules_info.update(module_info)
self._modules.append(module_name)
class BaseModelService(object): class BaseModelService(object):
def _initialize(self): def _initialize(self):
pass pass
...@@ -31,3 +117,7 @@ class BaseModelService(object): ...@@ -31,3 +117,7 @@ class BaseModelService(object):
@abc.abstractmethod @abc.abstractmethod
def _post_processing(self, data): def _post_processing(self, data):
pass pass
cv_module_info = CVModuleInfo()
nlp_module_info = NLPModuleInfo()
{ {
"modules_info": [ "modules_info": {
{ "yolov3_darknet53_coco2017": {
"module": "lac", "init_args": {
"version": "1.0.0", "version": "1.0.0"
"batch_size": 200 },
"predict_args": {
"batch_size": 1,
"use_gpu": false
}
},
"lac": {
"init_args": {
"version": "2.1.0",
"user_dict": "./dict.txt"
},
"predict_args": {
"batch_size": 1,
"use_gpu": false
}
}
}, },
{
"module": "senta_lstm",
"version": "1.0.0",
"batch_size": 1
},
{
"module": "yolov3_darknet53_coco2017",
"version": "1.0.0",
"batch_size": 1
},
{
"module": "faster_rcnn_coco2017",
"version": "1.0.0",
"batch_size": 1
}
],
"use_gpu": false,
"port": 8866, "port": 8866,
"use_multiprocess": true, "use_multiprocess": false,
"workers": 3 "workers": 2
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册