未验证 提交 853c1e5b 编写于 作者: B Bin Long 提交者: GitHub

Merge pull request #284 from ShenYuhan/fix_serving_multiprocessing_bug

Fix serving multiprocessing bug
......@@ -24,6 +24,32 @@ import socket
import json
import paddlehub as hub
from paddlehub.commands.base_command import BaseCommand, ENTRY
from paddlehub.serving import app_single as app
import multiprocessing
import gunicorn.app.base
def number_of_workers():
    """Return the default gunicorn worker count: twice the CPU count, plus one."""
    return 2 * multiprocessing.cpu_count() + 1
class StandaloneApplication(gunicorn.app.base.BaseApplication):
    """Embed a WSGI application in gunicorn, configured programmatically.

    Wraps *app* so it can be served with ``StandaloneApplication(app, opts).run()``
    instead of launching gunicorn from the command line.
    """

    def __init__(self, app, options=None):
        # Keep the raw option mapping; filtering happens in load_config().
        self.options = options or {}
        self.application = app
        super(StandaloneApplication, self).__init__()

    def load_config(self):
        """Push recognized, non-None options into gunicorn's config."""
        for key, value in self.options.items():
            if value is not None and key in self.cfg.settings:
                self.cfg.set(key.lower(), value)

    def load(self):
        """Return the WSGI application object gunicorn should serve."""
        return self.application
class ServingCommand(BaseCommand):
......@@ -110,8 +136,77 @@ class ServingCommand(BaseCommand):
return configs
@staticmethod
def start_serving(args):
config_file = args.config
def start_single_app_with_file(configs):
    """Start single-process serving from a parsed config-file dict.

    Args:
        configs: dict parsed from the serving config file; reads the keys
            "use_gpu" (default False), "port" (default 8866) and
            "modules_info" (list of per-module dicts with "module"/"version").

    Returns:
        False if the requested port is already occupied; otherwise blocks
        inside ``app.run()`` until the server stops.
    """
    use_gpu = configs.get("use_gpu", False)
    port = configs.get("port", 8866)
    # is_port_occupied returns a bool; compare by truthiness, not `is True`.
    if ServingCommand.is_port_occupied("127.0.0.1", port):
        print("Port %s is occupied, please change it." % port)
        return False
    # Use a distinct name instead of shadowing the outer `configs` dict.
    module_configs = configs.get("modules_info")
    modules = [
        str(item["module"]) + "==" + str(item["version"])
        for item in module_configs
    ]
    module_info = ServingCommand.preinstall_modules(modules)
    # Merge the preinstall metadata back into each module's config entry.
    for config, info in zip(module_configs, module_info):
        config.update(info)
    app.run(use_gpu, configs=module_configs, port=port)
@staticmethod
def start_multi_app_with_file(configs):
    """Start multi-process (gunicorn) serving from a parsed config-file dict.

    Args:
        configs: dict parsed from the serving config file; reads the keys
            "port" (default 8866) and "workers" (default from
            ``number_of_workers()``), and is passed through to
            ``app.create_app`` for module setup.

    Returns:
        False if the requested port is already occupied; otherwise blocks
        inside gunicorn's ``run()`` until the server stops.
    """
    port = configs.get("port", 8866)
    # is_port_occupied returns a bool; compare by truthiness, not `is True`.
    if ServingCommand.is_port_occupied("127.0.0.1", port):
        print("Port %s is occupied, please change it." % port)
        return False
    workers = configs.get("workers", number_of_workers())
    options = {"bind": "0.0.0.0:%s" % port, "workers": workers}
    # init_flag=False: each gunicorn worker initializes modules itself.
    StandaloneApplication(
        app.create_app(init_flag=False, configs=configs), options).run()
    print("PaddleHub-Serving has been stopped.")
def start_single_app_with_args(self):
    """Start single-process serving from command-line arguments.

    Reads ``self.args.modules``, ``self.args.use_gpu`` and ``self.args.port``.

    Returns:
        False if the requested port is occupied or required arguments are
        missing; otherwise blocks inside ``app.run()``.
    """
    module = self.args.modules
    if module is not None:
        use_gpu = self.args.use_gpu
        port = self.args.port
        # is_port_occupied returns a bool; compare by truthiness.
        if ServingCommand.is_port_occupied("127.0.0.1", port):
            print("Port %s is occupied, please change it." % port)
            return False
        module_info = ServingCommand.preinstall_modules(module)
        # Plain loop instead of a side-effect list comprehension.
        for item in module_info:
            item.update({"batch_size": 1, "queue_size": 20})
        app.run(use_gpu, configs=module_info, port=port)
    else:
        print("Lack of necessary parameters!")
def start_multi_app_with_args(self):
    """Start multi-process (gunicorn) serving from command-line arguments.

    Reads ``self.args.modules``, ``self.args.use_gpu`` and ``self.args.port``;
    worker count defaults to ``number_of_workers()``.

    Returns:
        False if the requested port is occupied or required arguments are
        missing; otherwise blocks inside gunicorn's ``run()``.
    """
    module = self.args.modules
    if module is not None:
        use_gpu = self.args.use_gpu
        port = self.args.port
        workers = number_of_workers()
        # is_port_occupied returns a bool; compare by truthiness.
        if ServingCommand.is_port_occupied("127.0.0.1", port):
            print("Port %s is occupied, please change it." % port)
            return False
        module_info = ServingCommand.preinstall_modules(module)
        # Plain loop instead of a side-effect list comprehension.
        for item in module_info:
            item.update({"batch_size": 1, "queue_size": 20})
        options = {"bind": "0.0.0.0:%s" % port, "workers": workers}
        configs = {"use_gpu": use_gpu, "modules_info": module_info}
        # init_flag=False: each gunicorn worker initializes modules itself.
        StandaloneApplication(
            app.create_app(init_flag=False, configs=configs),
            options).run()
        print("PaddleHub-Serving has been stopped.")
    else:
        print("Lack of necessary parameters!")
def start_serving(self):
config_file = self.args.config
if config_file is not None:
if os.path.exists(config_file):
with open(config_file, "r") as fp:
......@@ -123,56 +218,24 @@ class ServingCommand(BaseCommand):
"Warning: Windows cannot use multiprocess working "
"mode, Hub-Serving will switch to single process mode"
)
from paddlehub.serving import app_single as app
ServingCommand.start_single_app_with_file(configs)
else:
from paddlehub.serving import app
ServingCommand.start_multi_app_with_file(configs)
else:
from paddlehub.serving import app_single as app
use_gpu = configs.get("use_gpu", False)
port = configs.get("port", 8866)
if ServingCommand.is_port_occupied("127.0.0.1",
port) is True:
print("Port %s is occupied, please change it." % port)
return False
configs = configs.get("modules_info")
module = [
str(i["module"]) + "==" + str(i["version"])
for i in configs
]
module_info = ServingCommand.preinstall_modules(module)
for index in range(len(module_info)):
configs[index].update(module_info[index])
app.run(use_gpu, configs=configs, port=port)
ServingCommand.start_single_app_with_file(configs)
else:
print("config_file ", config_file, "not exists.")
else:
if args.use_multiprocess is True:
if self.args.use_multiprocess is True:
if platform.system() == "Windows":
print(
"Warning: Windows cannot use multiprocess working "
"mode, Hub-Serving will switch to single process mode")
from paddlehub.serving import app_single as app
self.start_single_app_with_args()
else:
from paddlehub.serving import app
else:
from paddlehub.serving import app_single as app
module = args.modules
if module is not None:
use_gpu = args.use_gpu
port = args.port
if ServingCommand.is_port_occupied("127.0.0.1", port) is True:
print("Port %s is occupied, please change it." % (port))
return False
module_info = ServingCommand.preinstall_modules(module)
[
item.update({
"batch_size": 1,
"queue_size": 20
}) for item in module_info
]
app.run(use_gpu, configs=module_info, port=port)
self.start_multi_app_with_args()
else:
print("Lack of necessary parameters!")
self.start_single_app_with_args()
@staticmethod
def show_help():
......@@ -199,15 +262,15 @@ class ServingCommand(BaseCommand):
def execute(self, argv):
try:
args = self.parser.parse_args()
self.args = self.parser.parse_args()
except:
ServingCommand.show_help()
return False
if args.sub_command == "start":
if args.bert_service == "bert_service":
ServingCommand.start_bert_serving(args)
elif args.bert_service is None:
ServingCommand.start_serving(args)
if self.args.sub_command == "start":
if self.args.bert_service == "bert_service":
ServingCommand.start_bert_serving(self.args)
elif self.args.bert_service is None:
self.start_serving()
else:
ServingCommand.show_help()
else:
......
......@@ -206,7 +206,15 @@ def predict_semantic_segmentation(module, input_img, id, batch_size, extra={}):
return results_pack
def create_app():
def create_app(init_flag=False, configs=None):
if init_flag is False:
if configs is None:
raise RuntimeError("Lack of necessary configs.")
global use_gpu, time_out
time_out = 60
use_gpu = configs.get("use_gpu", False)
config_with_file(configs.get("modules_info", []))
app_instance = Flask(__name__)
app_instance.config["JSON_AS_ASCII"] = False
gunicorn_logger = logging.getLogger('gunicorn.error')
......@@ -220,7 +228,6 @@ def create_app():
@app_instance.before_request
def before_request():
request.data = {"id": utils.md5(request.remote_addr + str(time.time()))}
pass
@app_instance.route("/get/modules", methods=["GET", "POST"])
def get_modules_info():
......@@ -330,8 +337,8 @@ def run(is_use_gpu=False, configs=None, port=8866, timeout=60):
else:
print("Start failed cause of missing configuration.")
return
my_app = create_app()
my_app.run(host="0.0.0.0", port=port, debug=False)
my_app = create_app(init_flag=True)
my_app.run(host="0.0.0.0", port=port, debug=False, threaded=False)
print("PaddleHub-Serving has been stopped.")
......
......@@ -7,12 +7,12 @@ configuration for gunicorn
import multiprocessing
bind = '0.0.0.0:8888'
backlog = 2048
workers = multiprocessing.cpu_count()
threads = 2
workers = multiprocessing.cpu_count() * 2 + 1
threads = 1
worker_class = 'sync'
worker_connections = 1000
timeout = 30
keepalive = 2
timeout = 500
keepalive = 40
daemon = False
loglevel = 'info'
errorlog = '-'
......
......@@ -4,28 +4,33 @@
"module": "lac",
"version": "1.0.0",
"batch_size": 200,
"queue_size": 200
"queue_size": 200,
"category": "NLP"
},
{
"module": "senta_lstm",
"version": "1.0.0",
"batch_size": 1,
"queue_size": 200
"queue_size": 200,
"category": "NLP"
},
{
"module": "yolov3_coco2017",
"module": "yolov3_darknet53_coco2017",
"version": "1.0.0",
"batch_size": 1,
"queue_size": 10
"queue_size": 10,
"category": "CV"
},
{
"module": "faster_rcnn_coco2017",
"version": "1.0.0",
"batch_size": 1,
"queue_size": 10
"queue_size": 10,
"category": "CV"
}
],
"use_gpu": false,
"port": 8866,
"use_multiprocess": false
"use_multiprocess": true,
"workers": 3
}
......@@ -23,3 +23,6 @@ numpy < 1.17.0 ; python_version < "3"
# pandas no longer support python2 in version 0.25 and above
pandas ; python_version >= "3"
pandas < 0.25.0 ; python_version < "3"
# gunicorn not support windows
gunicorn >= 19.10.0; sys_platform == "darwin" or sys_platform == "linux"
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册