提交 ad857bb4 编写于 作者: T TeslaZhao 提交者: felixhjh

Merge pull request #1455 from felixhjh/develop

Add new feature PaddleServing and Pipeline stop command
上级 cf3a762f
# coding:utf-8
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''
This module is used to store environmental variables in PaddleServing.
SERVING_HOME --> the root directory for storing Paddleserving related data. Default to the current directory of starting PaddleServing . Users can
change the default value through the SERVING_HOME environment variable.
CONF_HOME --> Store the default configuration files.
'''
import os
import sys
def _get_user_home():
return os.path.expanduser(os.getcwd())
def _get_serving_home():
if 'SERVING_HOME' in os.environ:
home_path = os.environ['SERVING_HOME']
if os.path.exists(home_path):
if os.path.isdir(home_path):
return home_path
else:
raise RuntimeError('The environment variable SERVING_HOME {} is not a directory.'.format(home_path))
else:
return home_path
return os.path.join(_get_user_home())
def _get_sub_home(directory):
home = os.path.join(_get_serving_home(), directory)
if not os.path.exists(home):
os.makedirs(home)
return home
SERVING_HOME = _get_serving_home()
CONF_HOME = _get_sub_home("")
...@@ -31,6 +31,9 @@ elif sys.version_info.major == 3: ...@@ -31,6 +31,9 @@ elif sys.version_info.major == 3:
from contextlib import closing from contextlib import closing
import socket import socket
from paddle_serving_server.env import CONF_HOME
import signal
from paddle_serving_server.util import *
# web_service.py is still used by Pipeline. # web_service.py is still used by Pipeline.
...@@ -106,6 +109,7 @@ def is_gpu_mode(unformatted_gpus): ...@@ -106,6 +109,7 @@ def is_gpu_mode(unformatted_gpus):
def serve_args(): def serve_args():
parser = argparse.ArgumentParser("serve") parser = argparse.ArgumentParser("serve")
parser.add_argument("server", type=str, default="start",nargs="?", help="stop or start PaddleServing")
parser.add_argument( parser.add_argument(
"--thread", "--thread",
type=int, type=int,
...@@ -366,17 +370,83 @@ class MainService(BaseHTTPRequestHandler): ...@@ -366,17 +370,83 @@ class MainService(BaseHTTPRequestHandler):
self.wfile.write(json.dumps(response).encode()) self.wfile.write(json.dumps(response).encode())
def stop_serving(command : str, port : int = None):
'''
Stop PaddleServing by port.
Args:
command(str): stop->SIGINT, kill->SIGKILL
port(int): Default to None, kill all processes in ProcessInfo.json.
Not None, kill the specific process relating to port
Returns:
True if stop serving successfully.
False if error occured
Examples:
.. code-block:: python
stop_serving("stop", 9494)
'''
filepath = os.path.join(CONF_HOME, "ProcessInfo.json")
infoList = load_pid_file(filepath)
if infoList is False:
return False
lastInfo = infoList[-1]
for info in infoList:
storedPort = info["port"]
pid = info["pid"]
model = info["model"]
start_time = info["start_time"]
if port is not None:
if port in storedPort:
kill_stop_process_by_pid(command ,pid)
infoList.remove(info)
if len(infoList):
with open(filepath, "w") as fp:
json.dump(infoList, fp)
else:
os.remove(filepath)
return True
else:
if lastInfo == info:
raise ValueError(
"Please confirm the port [%s] you specified is correct." %
port)
else:
pass
else:
kill_stop_process_by_pid(command ,pid)
if lastInfo == info:
os.remove(filepath)
return True
if __name__ == "__main__": if __name__ == "__main__":
# args.device is not used at all. # args.device is not used at all.
# just keep the interface. # just keep the interface.
# so --device should not be recommended at the HomePage. # so --device should not be recommended at the HomePage.
args = serve_args() args = serve_args()
if args.server == "stop" or args.server == "kill":
result = 0
if "--port" in sys.argv:
result = stop_serving(args.server, args.port)
else:
result = stop_serving(args.server)
if result == 0:
os._exit(0)
else:
os._exit(-1)
for single_model_config in args.model: for single_model_config in args.model:
if os.path.isdir(single_model_config): if os.path.isdir(single_model_config):
pass pass
elif os.path.isfile(single_model_config): elif os.path.isfile(single_model_config):
raise ValueError("The input of --model should be a dir not file.") raise ValueError("The input of --model should be a dir not file.")
if port_is_available(args.port):
portList = [args.port]
dump_pid_file(portList, args.model)
if args.use_encryption_model: if args.use_encryption_model:
p_flag = False p_flag = False
p = None p = None
......
# coding:utf-8
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import signal
import os
import time
import json
from paddle_serving_server.env import CONF_HOME
def pid_is_exist(pid: int):
'''
Try to kill process by PID.
Args:
pid(int): PID of process to be killed.
Returns:
True if PID will be killed.
Examples:
.. code-block:: python
pid_is_exist(pid=8866)
'''
try:
os.kill(pid, 0)
except:
return False
else:
return True
def kill_stop_process_by_pid(command : str, pid : int):
'''
using different signals to kill process group by PID .
Args:
command(str): stop->SIGINT, kill->SIGKILL
pid(int): PID of process to be killed.
Returns:
None
Examples:
.. code-block:: python
kill_stop_process_by_pid("stop", 9494)
'''
if not pid_is_exist(pid):
print("Process [%s] has been stopped."%pid)
return
try:
if command == "stop":
os.killpg(pid, signal.SIGINT)
elif command == "kill":
os.killpg(pid, signal.SIGKILL)
except ProcessLookupError:
if command == "stop":
os.kill(pid, signal.SIGINT)
elif command == "kill":
os.kill(pid, signal.SIGKILL)
def dump_pid_file(portList, model):
'''
Write PID info to file.
Args:
portList(List): PiplineServing includes http_port and rpc_port
PaddleServing include one port
model(str): 'Pipline' for PiplineServing
Specific model list for ServingModel
Returns:
None
Examples:
.. code-block:: python
dump_pid_file([9494, 10082], 'serve')
'''
pid = os.getpid()
pidInfoList = []
filepath = os.path.join(CONF_HOME, "ProcessInfo.json")
if os.path.exists(filepath):
if os.path.getsize(filepath):
with open(filepath, "r") as fp:
pidInfoList = json.load(fp)
# delete old pid data when new port number is same as old's
for info in pidInfoList:
storedPort = list(info["port"])
interList = list(set(portList)&set(storedPort))
if interList:
pidInfoList.remove(info)
with open(filepath, "w") as fp:
info ={"pid": pid, "port" : portList, "model" : str(model), "start_time" : time.time()}
pidInfoList.append(info)
json.dump(pidInfoList, fp)
def load_pid_file(filepath: str):
'''
Read PID info from file.
'''
if not os.path.exists(filepath):
raise ValueError(
"ProcessInfo.json file is not exists, All processes of PaddleServing has been stopped.")
return False
if os.path.getsize(filepath):
with open(filepath, "r") as fp:
infoList = json.load(fp)
return infoList
else:
os.remove(filepath)
print("ProcessInfo.json file is empty, All processes of PaddleServing has been stopped.")
return False
...@@ -23,12 +23,15 @@ import multiprocessing ...@@ -23,12 +23,15 @@ import multiprocessing
import yaml import yaml
import io import io
import time import time
import os
from .proto import pipeline_service_pb2_grpc, pipeline_service_pb2 from .proto import pipeline_service_pb2_grpc, pipeline_service_pb2
from . import operator from . import operator
from . import dag from . import dag
from . import util from . import util
from . import channel from . import channel
from paddle_serving_server.env import CONF_HOME
from paddle_serving_server.util import dump_pid_file
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
...@@ -78,7 +81,6 @@ def _reserve_port(port): ...@@ -78,7 +81,6 @@ def _reserve_port(port):
finally: finally:
sock.close() sock.close()
class PipelineServer(object): class PipelineServer(object):
""" """
Pipeline Server : grpc gateway + grpc server. Pipeline Server : grpc gateway + grpc server.
...@@ -198,7 +200,14 @@ class PipelineServer(object): ...@@ -198,7 +200,14 @@ class PipelineServer(object):
self._http_port): self._http_port):
raise SystemExit("Failed to prepare_server: http_port({}) " raise SystemExit("Failed to prepare_server: http_port({}) "
"is already used".format(self._http_port)) "is already used".format(self._http_port))
# write the port info into ProcessInfo.json
portList = []
if self._http_port is not None:
portList.append(self._rpc_port)
if self._rpc_port is not None:
portList.append(self._http_port)
if len(portList):
dump_pid_file(portList, "pipline")
self._worker_num = conf["worker_num"] self._worker_num = conf["worker_num"]
self._build_dag_each_worker = conf["build_dag_each_worker"] self._build_dag_each_worker = conf["build_dag_each_worker"]
self._init_ops(conf["op"]) self._init_ops(conf["op"])
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册