未验证 提交 d107b701 编写于 作者: J Jiawei Wang 提交者: GitHub

Merge pull request #969 from TeslaZhao/develop

support multiple devices(cpu/gpu/trt/xpu/arm) in pipeline mode
......@@ -20,6 +20,9 @@ op:
#uci模型路径
model_config: ResNet50_vd_model
#计算硬件类型: 空缺时由devices决定(CPU/GPU),0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu
device_type: 1
#计算硬件ID,当devices为""或不写时为CPU预测;当devices为"0", "0,1,2"时为GPU预测,表示使用的GPU卡
devices: "0" # "0,1"
......
......@@ -20,7 +20,10 @@ op:
#uci模型路径
model_config: uci_housing_model
#计算硬件ID,当devices为""或不写时为CPU预测;当devices为"0", "0,1,2"时为GPU预测,表示使用的GPU卡
#计算硬件类型: 空缺时由devices决定(CPU/GPU),0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu
device_type: 0
#计算硬件ID,优先由device_type决定硬件类型。devices为""或空缺时为CPU预测;当为"0", "0,1,2"时为GPU预测,表示使用的GPU卡
devices: "" # "0,1"
#client类型,包括brpc, grpc和local_predictor.local_predictor不启动Serving服务,进程内预测
......
......@@ -20,6 +20,7 @@ import google.protobuf.text_format
import numpy as np
import argparse
import paddle.fluid as fluid
import paddle.inference as inference
from .proto import general_model_config_pb2 as m_config
from paddle.fluid.core import PaddleTensor
from paddle.fluid.core import AnalysisConfig
......@@ -125,14 +126,13 @@ class LocalPredictor(object):
if use_lite:
config.enable_lite_engine(
precision_mode = PrecisionType.Float32,
zero_copy = True,
passes_filter = [],
ops_filter = []
)
precision_mode=inference.PrecisionType.Float32,
zero_copy=True,
passes_filter=[],
ops_filter=[])
if use_xpu:
config.enable_xpu(100 * 1024 * 1024)
config.enable_xpu(8 * 1024 * 1024)
self.predictor = create_paddle_predictor(config)
......
......@@ -38,14 +38,12 @@ class LocalServiceHandler(object):
client_type='local_predictor',
workdir="",
thread_num=2,
device_type=-1,
devices="",
fetch_names=None,
mem_optim=True,
ir_optim=False,
available_port_generator=None,
use_trt=False,
use_lite=False,
use_xpu=False,
use_profile=False):
"""
Initialization of localservicehandler
......@@ -55,15 +53,14 @@ class LocalServiceHandler(object):
client_type: brpc, grpc and local_predictor[default]
workdir: work directory
thread_num: number of threads, concurrent quantity.
device_type: support multiple devices. -1=Not set, determined by
`devices`. 0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu
devices: gpu id list[gpu], "" default[cpu]
fetch_names: get fetch names out of LocalServiceHandler in
local_predictor mode. fetch_names_ is compatible for Client().
mem_optim: use memory/graphics memory optimization, True default.
ir_optim: use calculation chart optimization, False default.
available_port_generator: generate available ports
use_trt: use nvidia tensorRt engine, False default.
use_lite: use Paddle-Lite engine, False default.
use_xpu: run predict on Baidu Kunlun, False default.
use_profile: use profiling, False default.
Returns:
......@@ -74,28 +71,61 @@ class LocalServiceHandler(object):
self._model_config = model_config
self._port_list = []
self._device_type = "cpu"
if devices == "":
# cpu
devices = [-1]
if use_lite:
self._device_type = "arm"
self._port_list.append(available_port_generator.next())
_LOGGER.info("Model({}) will be launch in arm device. Port({})"
.format(model_config, self._port_list))
self._device_name = "cpu"
self._use_gpu = False
self._use_trt = False
self._use_lite = False
self._use_xpu = False
if device_type == -1:
# device_type is not set, determined by `devices`,
if devices == "":
# CPU
self._device_name = "cpu"
devices = [-1]
else:
self._device_type = "cpu"
self._port_list.append(available_port_generator.next())
_LOGGER.info("Model({}) will be launch in cpu device. Port({})"
.format(model_config, self._port_list))
else:
# gpu
self._device_type = "gpu"
# GPU
self._device_name = "gpu"
self._use_gpu = True
devices = [int(x) for x in devices.split(",")]
elif device_type == 0:
# CPU
self._device_name = "cpu"
devices = [-1]
elif device_type == 1:
# GPU
self._device_name = "gpu"
self._use_gpu = True
devices = [int(x) for x in devices.split(",")]
elif device_type == 2:
# Nvidia Tensor RT
self._device_name = "gpu"
self._use_gpu = True
devices = [int(x) for x in devices.split(",")]
self._use_trt = True
elif device_type == 3:
# ARM CPU
self._device_name = "arm"
devices = [-1]
self._use_lite = True
elif device_type == 4:
# Kunlun XPU
self._device_name = "arm"
devices = [int(x) for x in devices.split(",")]
self._use_lite = True
self._use_xpu = True
else:
_LOGGER.error(
"LocalServiceHandler initialization fail. device_type={}"
.format(device_type))
if client_type == "brpc" or client_type == "grpc":
for _ in devices:
self._port_list.append(available_port_generator.next())
_LOGGER.info("Model({}) will be launch in gpu device: {}. Port({})"
.format(model_config, devices, self._port_list))
_LOGGER.info("Create ports for devices:{}. Port:{}"
.format(devices, self._port_list))
self._client_type = client_type
self._workdir = workdir
self._devices = devices
......@@ -105,14 +135,21 @@ class LocalServiceHandler(object):
self._local_predictor_client = None
self._rpc_service_list = []
self._server_pros = []
self._use_trt = use_trt
self._use_lite = use_lite
self._use_xpu = use_xpu
self._use_profile = use_profile
self.fetch_names_ = fetch_names
self._fetch_names = fetch_names
_LOGGER.info(
"Models({}) will be launched by device {}. use_gpu:{}, "
"use_trt:{}, use_lite:{}, use_xpu:{}, device_type:{}, devices:{}, "
"mem_optim:{}, ir_optim:{}, use_profile:{}, thread_num:{}, "
"client_type:{}, fetch_names:{}".format(
model_config, self._device_name, self._use_gpu, self._use_trt,
self._use_lite, self._use_xpu, device_type, self._devices,
self._mem_optim, self._ir_optim, self._use_profile,
self._thread_num, self._client_type, self._fetch_names))
def get_fetch_list(self):
return self.fetch_names_
return self._fetch_names
def get_port_list(self):
return self._port_list
......@@ -149,22 +186,17 @@ class LocalServiceHandler(object):
from paddle_serving_app.local_predict import LocalPredictor
if self._local_predictor_client is None:
self._local_predictor_client = LocalPredictor()
use_gpu = False
use_lite = False
if self._device_type == "gpu":
use_gpu = True
elif self._device_type == "arm":
use_lite = True
self._local_predictor_client.load_model_config(
model_path=self._model_config,
use_gpu=use_gpu,
use_gpu=self._use_gpu,
gpu_id=self._devices[concurrency_idx],
use_profile=self._use_profile,
thread_num=self._thread_num,
mem_optim=self._mem_optim,
ir_optim=self._ir_optim,
use_trt=self._use_trt,
use_lite=use_lite,
use_lite=self._use_lite,
use_xpu=self._use_xpu)
return self._local_predictor_client
......@@ -174,7 +206,7 @@ class LocalServiceHandler(object):
def _prepare_one_server(self, workdir, port, gpuid, thread_num, mem_optim,
ir_optim):
"""
According to _device_type, generating one CpuServer or GpuServer, and
According to self._device_name, generating one Cpu/Gpu/Arm Server, and
setting the model config amd startup params.
Args:
......@@ -188,7 +220,7 @@ class LocalServiceHandler(object):
Returns:
server: CpuServer/GpuServer
"""
if self._device_type == "cpu":
if self._device_name == "cpu":
from paddle_serving_server import OpMaker, OpSeqMaker, Server
op_maker = OpMaker()
read_op = op_maker.create('general_reader')
......@@ -225,9 +257,9 @@ class LocalServiceHandler(object):
server.load_model_config(self._model_config)
server.prepare_server(
workdir=workdir, port=port, device=self._device_type)
if self.fetch_names_ is None:
self.fetch_names_ = server.get_fetch_list()
workdir=workdir, port=port, device=self._device_name)
if self._fetch_names is None:
self._fetch_names = server.get_fetch_list()
return server
def _start_one_server(self, service_idx):
......@@ -264,7 +296,7 @@ class LocalServiceHandler(object):
"""
Start multiple processes and start one server in each process
"""
for i, service in enumerate(self._rpc_service_list):
for i, _ in enumerate(self._rpc_service_list):
p = multiprocessing.Process(
target=self._start_one_server, args=(i, ))
p.daemon = True
......
......@@ -134,6 +134,7 @@ class Op(object):
self.model_config = None
self.workdir = None
self.thread_num = self.concurrency
self.device_type = -1
self.devices = ""
self.mem_optim = False
self.ir_optim = False
......@@ -153,6 +154,7 @@ class Op(object):
self.client_type = local_service_conf.get("client_type")
self.workdir = local_service_conf.get("workdir")
self.thread_num = local_service_conf.get("thread_num")
self.device_type = local_service_conf.get("device_type")
self.devices = local_service_conf.get("devices")
self.mem_optim = local_service_conf.get("mem_optim")
self.ir_optim = local_service_conf.get("ir_optim")
......@@ -168,6 +170,7 @@ class Op(object):
client_type=self.client_type,
workdir=self.workdir,
thread_num=self.thread_num,
device_type=self.device_type,
devices=self.devices,
mem_optim=self.mem_optim,
ir_optim=self.ir_optim)
......@@ -188,8 +191,11 @@ class Op(object):
client_type=self.client_type,
workdir=self.workdir,
thread_num=self.thread_num,
device_type=self.device_type,
devices=self.devices,
fetch_names=self._fetch_names)
fetch_names=self._fetch_names,
mem_optim=self.mem_optim,
ir_optim=self.ir_optim)
if self._client_config is None:
self._client_config = service_handler.get_client_config(
)
......@@ -550,7 +556,8 @@ class Op(object):
args=(concurrency_idx, self._get_input_channel(),
self._get_output_channels(), False, trace_buffer,
self.model_config, self.workdir, self.thread_num,
self.devices, self.mem_optim, self.ir_optim))
self.device_type, self.devices, self.mem_optim,
self.ir_optim))
p.daemon = True
p.start()
process.append(p)
......@@ -583,7 +590,8 @@ class Op(object):
args=(concurrency_idx, self._get_input_channel(),
self._get_output_channels(), True, trace_buffer,
self.model_config, self.workdir, self.thread_num,
self.devices, self.mem_optim, self.ir_optim))
self.device_type, self.devices, self.mem_optim,
self.ir_optim))
# When a process exits, it attempts to terminate
# all of its daemonic child processes.
t.daemon = True
......@@ -991,7 +999,7 @@ class Op(object):
def _run(self, concurrency_idx, input_channel, output_channels,
is_thread_op, trace_buffer, model_config, workdir, thread_num,
devices, mem_optim, ir_optim):
device_type, devices, mem_optim, ir_optim):
"""
_run() is the entry function of OP process / thread model.When client
type is local_predictor in process mode, the CUDA environment needs to
......@@ -1009,6 +1017,7 @@ class Op(object):
model_config: model config path
workdir: work directory
thread_num: number of threads, concurrent quantity
device_type: support multiple devices
devices: gpu id list[gpu], "" default[cpu]
mem_optim: use memory/graphics memory optimization, True default.
ir_optim: use calculation chart optimization, False default.
......@@ -1017,7 +1026,6 @@ class Op(object):
None
"""
op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
tid = threading.current_thread().ident
# init ops
profiler = None
......@@ -1028,6 +1036,7 @@ class Op(object):
client_type="local_predictor",
workdir=workdir,
thread_num=thread_num,
device_type=device_type,
devices=devices,
mem_optim=mem_optim,
ir_optim=ir_optim)
......
......@@ -234,6 +234,7 @@ class PipelineServer(object):
"local_service_conf": {
"workdir": "",
"thread_num": 2,
"device_type": -1,
"devices": "",
"mem_optim": True,
"ir_optim": False,
......@@ -389,6 +390,7 @@ class ServerYamlConfChecker(object):
default_conf = {
"workdir": "",
"thread_num": 2,
"device_type": -1,
"devices": "",
"mem_optim": True,
"ir_optim": False,
......@@ -397,6 +399,7 @@ class ServerYamlConfChecker(object):
"model_config": str,
"workdir": str,
"thread_num": int,
"device_type": int,
"devices": str,
"mem_optim": bool,
"ir_optim": bool,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册