Commit da09478a authored by TeslaZhao

Support multiple devices (cpu/gpu/trt/xpu/arm) in pipeline mode

Parent 7a8ee364
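For reference, the device_type values introduced by this commit map to execution engines as follows. This is an illustrative summary (not part of the commit) that mirrors the branch added to LocalServiceHandler.__init__ further down:

# Illustrative summary (not part of the commit) of the device_type enum
# handled in LocalServiceHandler.__init__ below.
DEVICE_TYPES = {
    -1: "unset: fall back to `devices` ('' -> cpu, '0,1' -> gpu)",
    0: "cpu",
    1: "gpu",
    2: "gpu with Nvidia TensorRT",
    3: "arm cpu via Paddle-Lite",
    4: "kunlun xpu via Paddle-Lite",
}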
@@ -20,6 +20,9 @@ op:
 #uci model path
 model_config: ResNet50_vd_model
+#Device type: when unset, determined by `devices` (CPU/GPU). 0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu
+device_type: 1
 #Device IDs: "" or unset means CPU prediction; "0" or "0,1,2" means GPU prediction on the listed GPU cards
 devices: "0" # "0,1"
......
@@ -20,7 +20,10 @@ op:
 #uci model path
 model_config: uci_housing_model
-#Device IDs: "" or unset means CPU prediction; "0" or "0,1,2" means GPU prediction on the listed GPU cards
+#Device type: when unset, determined by `devices` (CPU/GPU). 0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu
+device_type: 0
+#Device IDs: the hardware is chosen by device_type first. "" or unset means CPU prediction; "0" or "0,1,2" means GPU prediction on the listed GPU cards
 devices: "" # "0,1"
 #Client type: brpc, grpc or local_predictor. local_predictor starts no Serving service; prediction runs in-process
......
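A minimal launch sketch for a config like the one above, following the usual pipeline example flow of this era; the Op subclass and the op name "uci" here are hypothetical (the name must match the op entry in config.yml), and this is not code from the commit:

# A minimal sketch, assuming the standard pipeline example flow.
from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp, PipelineServer


class UciOp(Op):
    def preprocess(self, input_dicts):
        # Single upstream op: pass its feed dict straight to the predictor.
        (_, input_dict), = input_dicts.items()
        return input_dict


read_op = RequestOp()
uci_op = UciOp(name="uci", input_ops=[read_op])  # "uci" must match config.yml
response_op = ResponseOp(input_ops=[uci_op])

server = PipelineServer()
server.set_response_op(response_op)
server.prepare_server("config.yml")  # device_type/devices are read from here
server.run_server()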
@@ -38,14 +38,12 @@ class LocalServiceHandler(object):
                  client_type='local_predictor',
                  workdir="",
                  thread_num=2,
+                 device_type=-1,
                  devices="",
                  fetch_names=None,
                  mem_optim=True,
                  ir_optim=False,
                  available_port_generator=None,
-                 use_trt=False,
-                 use_lite=False,
-                 use_xpu=False,
                  use_profile=False):
         """
         Initialization of LocalServiceHandler
@@ -55,15 +53,14 @@ class LocalServiceHandler(object):
             client_type: brpc, grpc and local_predictor[default]
             workdir: work directory
             thread_num: number of threads, concurrent quantity.
+            device_type: support multiple devices. -1=Not set, determined by
+                `devices`. 0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu
             devices: gpu id list[gpu], "" default[cpu]
             fetch_names: get fetch names out of LocalServiceHandler in
                 local_predictor mode. fetch_names_ is compatible for Client().
             mem_optim: use memory/graphics memory optimization, True default.
             ir_optim: use calculation chart optimization, False default.
             available_port_generator: generate available ports
-            use_trt: use nvidia tensorRt engine, False default.
-            use_lite: use Paddle-Lite engine, False default.
-            use_xpu: run predict on Baidu Kunlun, False default.
             use_profile: use profiling, False default.

         Returns:
@@ -74,28 +71,61 @@ class LocalServiceHandler(object):
         self._model_config = model_config
         self._port_list = []
-        self._device_type = "cpu"
-        if devices == "":
-            # cpu
-            devices = [-1]
-            if use_lite:
-                self._device_type = "arm"
-                self._port_list.append(available_port_generator.next())
-                _LOGGER.info("Model({}) will be launch in arm device. Port({})"
-                             .format(model_config, self._port_list))
-            else:
-                self._device_type = "cpu"
-                self._port_list.append(available_port_generator.next())
-                _LOGGER.info("Model({}) will be launch in cpu device. Port({})"
-                             .format(model_config, self._port_list))
-        else:
-            # gpu
-            self._device_type = "gpu"
-            devices = [int(x) for x in devices.split(",")]
+        self._device_name = "cpu"
+        self._use_gpu = False
+        self._use_trt = False
+        self._use_lite = False
+        self._use_xpu = False
+
+        if device_type == -1:
+            # device_type is not set, determined by `devices`
+            if devices == "":
+                # CPU
+                self._device_name = "cpu"
+                devices = [-1]
+            else:
+                # GPU
+                self._device_name = "gpu"
+                self._use_gpu = True
+                devices = [int(x) for x in devices.split(",")]
+        elif device_type == 0:
+            # CPU
+            self._device_name = "cpu"
+            devices = [-1]
+        elif device_type == 1:
+            # GPU
+            self._device_name = "gpu"
+            self._use_gpu = True
+            devices = [int(x) for x in devices.split(",")]
+        elif device_type == 2:
+            # Nvidia TensorRT
+            self._device_name = "gpu"
+            self._use_gpu = True
+            devices = [int(x) for x in devices.split(",")]
+            self._use_trt = True
+        elif device_type == 3:
+            # ARM CPU
+            self._device_name = "arm"
+            devices = [-1]
+            self._use_lite = True
+        elif device_type == 4:
+            # Kunlun XPU
+            self._device_name = "arm"
+            devices = [int(x) for x in devices.split(",")]
+            self._use_lite = True
+            self._use_xpu = True
+        else:
+            _LOGGER.error(
+                "LocalServiceHandler initialization fail. device_type={}"
+                .format(device_type))
+
+        if client_type == "brpc" or client_type == "grpc":
             for _ in devices:
                 self._port_list.append(available_port_generator.next())
-            _LOGGER.info("Model({}) will be launch in gpu device: {}. Port({})"
-                         .format(model_config, devices, self._port_list))
+            _LOGGER.info("Create ports for devices:{}. Port:{}"
+                         .format(devices, self._port_list))
         self._client_type = client_type
         self._workdir = workdir
         self._devices = devices
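The if/elif chain above is effectively a pure mapping from (device_type, devices) to engine flags. A standalone restatement for illustration (a hypothetical helper, not part of the commit):

def resolve_device(device_type, devices):
    """Restates the branch above: maps (device_type, devices) to
    (device_name, use_gpu, use_trt, use_lite, use_xpu, device_ids).
    Unlike the code above, an empty `devices` string falls back to [-1]
    here instead of raising on split."""
    ids = [int(x) for x in devices.split(",")] if devices else [-1]
    if device_type == -1:  # unset: decided by `devices`
        return (("gpu", True, False, False, False, ids) if devices
                else ("cpu", False, False, False, False, [-1]))
    table = {
        0: ("cpu", False, False, False, False, [-1]),
        1: ("gpu", True, False, False, False, ids),
        2: ("gpu", True, True, False, False, ids),    # Nvidia TensorRT
        3: ("arm", False, False, True, False, [-1]),  # ARM CPU, Paddle-Lite
        4: ("arm", False, False, True, True, ids),    # Kunlun XPU
    }
    if device_type not in table:
        raise ValueError("unsupported device_type: {}".format(device_type))
    return table[device_type]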
@@ -105,14 +135,21 @@ class LocalServiceHandler(object):
         self._local_predictor_client = None
         self._rpc_service_list = []
         self._server_pros = []
-        self._use_trt = use_trt
-        self._use_lite = use_lite
-        self._use_xpu = use_xpu
         self._use_profile = use_profile
-        self.fetch_names_ = fetch_names
+        self._fetch_names = fetch_names
+
+        _LOGGER.info(
+            "Models({}) will be launched by device {}. use_gpu:{}, "
+            "use_trt:{}, use_lite:{}, use_xpu:{}, device_type:{}, devices:{}, "
+            "mem_optim:{}, ir_optim:{}, use_profile:{}, thread_num:{}, "
+            "client_type:{}, fetch_names:{}".format(
+                model_config, self._device_name, self._use_gpu, self._use_trt,
+                self._use_lite, self._use_xpu, device_type, self._devices,
+                self._mem_optim, self._ir_optim, self._use_profile,
+                self._thread_num, self._client_type, self._fetch_names))

     def get_fetch_list(self):
-        return self.fetch_names_
+        return self._fetch_names

     def get_port_list(self):
         return self._port_list
@@ -149,22 +186,17 @@ class LocalServiceHandler(object):
         from paddle_serving_app.local_predict import LocalPredictor
         if self._local_predictor_client is None:
             self._local_predictor_client = LocalPredictor()
-            use_gpu = False
-            use_lite = False
-            if self._device_type == "gpu":
-                use_gpu = True
-            elif self._device_type == "arm":
-                use_lite = True
             self._local_predictor_client.load_model_config(
                 model_path=self._model_config,
-                use_gpu=use_gpu,
+                use_gpu=self._use_gpu,
                 gpu_id=self._devices[concurrency_idx],
                 use_profile=self._use_profile,
                 thread_num=self._thread_num,
                 mem_optim=self._mem_optim,
                 ir_optim=self._ir_optim,
                 use_trt=self._use_trt,
-                use_lite=use_lite,
+                use_lite=self._use_lite,
                 use_xpu=self._use_xpu)
         return self._local_predictor_client
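For context, the same flags can be passed straight to LocalPredictor. This sketch mirrors the load_model_config call above with illustrative values (the model path comes from the uci example config; it is not code from the commit):

# Mirrors the load_model_config call above with illustrative values.
from paddle_serving_app.local_predict import LocalPredictor

predictor = LocalPredictor()
predictor.load_model_config(
    model_path="uci_housing_model",
    use_gpu=False,      # True for device_type 1/2 (self._use_gpu)
    gpu_id=0,
    use_profile=False,
    thread_num=2,
    mem_optim=True,
    ir_optim=False,
    use_trt=False,      # True for device_type == 2
    use_lite=False,     # True for device_type == 3 or 4
    use_xpu=False)      # True for device_type == 4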
@@ -174,7 +206,7 @@ class LocalServiceHandler(object):
     def _prepare_one_server(self, workdir, port, gpuid, thread_num, mem_optim,
                             ir_optim):
         """
-        According to _device_type, generating one CpuServer or GpuServer, and
+        According to self._device_name, generating one Cpu/Gpu/Arm Server, and
         setting the model config and startup params.

         Args:
@@ -188,7 +220,7 @@ class LocalServiceHandler(object):
         Returns:
             server: CpuServer/GpuServer
         """
-        if self._device_type == "cpu":
+        if self._device_name == "cpu":
             from paddle_serving_server import OpMaker, OpSeqMaker, Server
             op_maker = OpMaker()
             read_op = op_maker.create('general_reader')
@@ -225,9 +257,9 @@ class LocalServiceHandler(object):
         server.load_model_config(self._model_config)
         server.prepare_server(
-            workdir=workdir, port=port, device=self._device_type)
-        if self.fetch_names_ is None:
-            self.fetch_names_ = server.get_fetch_list()
+            workdir=workdir, port=port, device=self._device_name)
+        if self._fetch_names is None:
+            self._fetch_names = server.get_fetch_list()
         return server

     def _start_one_server(self, service_idx):
@@ -264,7 +296,7 @@ class LocalServiceHandler(object):
         """
         Start multiple processes and start one server in each process
         """
-        for i, service in enumerate(self._rpc_service_list):
+        for i, _ in enumerate(self._rpc_service_list):
             p = multiprocessing.Process(
                 target=self._start_one_server, args=(i, ))
             p.daemon = True
......
@@ -134,6 +134,7 @@ class Op(object):
         self.model_config = None
         self.workdir = None
         self.thread_num = self.concurrency
+        self.device_type = -1
         self.devices = ""
         self.mem_optim = False
         self.ir_optim = False
@@ -153,6 +154,7 @@ class Op(object):
             self.client_type = local_service_conf.get("client_type")
             self.workdir = local_service_conf.get("workdir")
             self.thread_num = local_service_conf.get("thread_num")
+            self.device_type = local_service_conf.get("device_type")
             self.devices = local_service_conf.get("devices")
             self.mem_optim = local_service_conf.get("mem_optim")
             self.ir_optim = local_service_conf.get("ir_optim")
@@ -168,6 +170,7 @@ class Op(object):
                     client_type=self.client_type,
                     workdir=self.workdir,
                     thread_num=self.thread_num,
+                    device_type=self.device_type,
                     devices=self.devices,
                     mem_optim=self.mem_optim,
                     ir_optim=self.ir_optim)
@@ -188,8 +191,11 @@ class Op(object):
                     client_type=self.client_type,
                     workdir=self.workdir,
                     thread_num=self.thread_num,
+                    device_type=self.device_type,
                     devices=self.devices,
-                    fetch_names=self._fetch_names)
+                    fetch_names=self._fetch_names,
+                    mem_optim=self.mem_optim,
+                    ir_optim=self.ir_optim)
                 if self._client_config is None:
                     self._client_config = service_handler.get_client_config(
                     )
@@ -550,7 +556,8 @@ class Op(object):
                 args=(concurrency_idx, self._get_input_channel(),
                       self._get_output_channels(), False, trace_buffer,
                       self.model_config, self.workdir, self.thread_num,
-                      self.devices, self.mem_optim, self.ir_optim))
+                      self.device_type, self.devices, self.mem_optim,
+                      self.ir_optim))
             p.daemon = True
             p.start()
             process.append(p)
@@ -583,7 +590,8 @@ class Op(object):
                 args=(concurrency_idx, self._get_input_channel(),
                       self._get_output_channels(), True, trace_buffer,
                       self.model_config, self.workdir, self.thread_num,
-                      self.devices, self.mem_optim, self.ir_optim))
+                      self.device_type, self.devices, self.mem_optim,
+                      self.ir_optim))
             # When a process exits, it attempts to terminate
             # all of its daemonic child processes.
             t.daemon = True
@@ -991,7 +999,7 @@ class Op(object):
     def _run(self, concurrency_idx, input_channel, output_channels,
              is_thread_op, trace_buffer, model_config, workdir, thread_num,
-             devices, mem_optim, ir_optim):
+             device_type, devices, mem_optim, ir_optim):
         """
         _run() is the entry function of OP process / thread model. When client
         type is local_predictor in process mode, the CUDA environment needs to
@@ -1009,6 +1017,7 @@ class Op(object):
             model_config: model config path
             workdir: work directory
             thread_num: number of threads, concurrent quantity
+            device_type: support multiple devices
             devices: gpu id list[gpu], "" default[cpu]
             mem_optim: use memory/graphics memory optimization, True default.
             ir_optim: use calculation chart optimization, False default.
@@ -1017,7 +1026,6 @@ class Op(object):
             None
         """
         op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
-        tid = threading.current_thread().ident

         # init ops
         profiler = None
@@ -1028,6 +1036,7 @@ class Op(object):
                     client_type="local_predictor",
                     workdir=workdir,
                     thread_num=thread_num,
+                    device_type=device_type,
                     devices=devices,
                     mem_optim=mem_optim,
                     ir_optim=ir_optim)
......
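Taken together, the Op changes thread device_type from the yaml conf into every LocalServiceHandler the Op builds. A hypothetical local_service_conf dict, for illustration only; the keys are the ones Op reads via local_service_conf.get(...) above, and the values mirror the new defaults added below:

# Hypothetical conf dict; not code from the commit.
local_service_conf = {
    "model_config": "uci_housing_model",
    "client_type": "local_predictor",
    "workdir": "",
    "thread_num": 2,
    "device_type": 0,   # force CPU regardless of `devices`
    "devices": "",
    "mem_optim": True,
    "ir_optim": False,
}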
@@ -234,6 +234,7 @@ class PipelineServer(object):
             "local_service_conf": {
                 "workdir": "",
                 "thread_num": 2,
+                "device_type": -1,
                 "devices": "",
                 "mem_optim": True,
                 "ir_optim": False,
@@ -389,6 +390,7 @@ class ServerYamlConfChecker(object):
         default_conf = {
             "workdir": "",
             "thread_num": 2,
+            "device_type": -1,
             "devices": "",
             "mem_optim": True,
             "ir_optim": False,
@@ -397,6 +399,7 @@ class ServerYamlConfChecker(object):
             "model_config": str,
             "workdir": str,
             "thread_num": int,
+            "device_type": int,
             "devices": str,
             "mem_optim": bool,
             "ir_optim": bool,
......
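A minimal sketch of what the checker above enforces, assuming PyYAML is available and a config.yml shaped like the examples at the top of this diff (the lookup path to local_service_conf is an assumption):

# Minimal type-check sketch; key/type pairs copied from the checker above.
import yaml

CONF_TYPES = {
    "model_config": str,
    "workdir": str,
    "thread_num": int,
    "device_type": int,
    "devices": str,
    "mem_optim": bool,
    "ir_optim": bool,
}

with open("config.yml") as f:
    conf = yaml.safe_load(f)

local_conf = conf["op"]["local_service_conf"]  # nesting is an assumption
for key, expected in CONF_TYPES.items():
    if key in local_conf and not isinstance(local_conf[key], expected):
        raise TypeError("{} must be {}, got {}".format(
            key, expected.__name__, type(local_conf[key]).__name__))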