Commit 70b2ae05 authored by wangjiawei04

Merge remote-tracking branch 'root/develop' into pddet

......@@ -108,7 +108,6 @@ void PredictorClient::set_predictor_conf(const std::string &conf_path,
_predictor_path = conf_path;
_predictor_conf = conf_file;
}
int PredictorClient::destroy_predictor() {
_api.thrd_finalize();
_api.destroy();
......@@ -160,6 +159,7 @@ int PredictorClient::batch_predict(
VLOG(2) << "fetch general model predictor done.";
VLOG(2) << "float feed name size: " << float_feed_name.size();
VLOG(2) << "int feed name size: " << int_feed_name.size();
VLOG(2) << "max body size : " << brpc::fLU64::FLAGS_max_body_size;
Request req;
for (auto &name : fetch_name) {
req.add_fetch_var_names(name);
......@@ -179,12 +179,16 @@ int PredictorClient::batch_predict(
tensor_vec.push_back(inst->add_tensor_array());
}
VLOG(2) << "batch [" << bi << "] int_feed_name and float_feed_name"
VLOG(2) << "batch [" << bi << "] int_feed_name and float_feed_name "
<< "prepared";
int vec_idx = 0;
VLOG(2) << "tensor_vec size " << tensor_vec.size() << " float shape "
<< float_shape.size();
for (auto &name : float_feed_name) {
int idx = _feed_name_to_idx[name];
Tensor *tensor = tensor_vec[idx];
VLOG(2) << "prepare float feed " << name << " shape size "
<< float_shape[vec_idx].size();
for (int j = 0; j < float_shape[vec_idx].size(); ++j) {
tensor->add_shape(float_shape[vec_idx][j]);
}
......@@ -202,6 +206,8 @@ int PredictorClient::batch_predict(
for (auto &name : int_feed_name) {
int idx = _feed_name_to_idx[name];
Tensor *tensor = tensor_vec[idx];
VLOG(2) << "prepare int feed " << name << " shape size "
<< int_shape[vec_idx].size();
for (int j = 0; j < int_shape[vec_idx].size(); ++j) {
tensor->add_shape(int_shape[vec_idx][j]);
}
......@@ -243,8 +249,11 @@ int PredictorClient::batch_predict(
postprocess_start = client_infer_end;
for (auto &name : fetch_name) {
int idx = _fetch_name_to_idx[name];
// int idx = _fetch_name_to_idx[name];
int idx = 0;
int shape_size = res.insts(0).tensor_array(idx).shape_size();
VLOG(2) << "fetch var " << name << " index " << idx << " shape size "
<< shape_size;
predict_res_batch._shape_map[name].resize(shape_size);
for (int i = 0; i < shape_size; ++i) {
predict_res_batch._shape_map[name][i] =
......@@ -258,11 +267,14 @@ int PredictorClient::batch_predict(
res.insts(0).tensor_array(idx).lod(i);
}
}
idx += 1;
}
for (auto &name : fetch_name) {
int idx = _fetch_name_to_idx[name];
// int idx = _fetch_name_to_idx[name];
int idx = 0;
if (_fetch_name_to_type[name] == 0) {
VLOG(2) << "ferch var " << name << "type int";
predict_res_batch._int64_value_map[name].resize(
res.insts(0).tensor_array(idx).int64_data_size());
int size = res.insts(0).tensor_array(idx).int64_data_size();
......@@ -271,6 +283,7 @@ int PredictorClient::batch_predict(
res.insts(0).tensor_array(idx).int64_data(i);
}
} else {
VLOG(2) << "fetch var " << name << "type float";
predict_res_batch._float_value_map[name].resize(
res.insts(0).tensor_array(idx).float_data_size());
int size = res.insts(0).tensor_array(idx).float_data_size();
......@@ -279,6 +292,7 @@ int PredictorClient::batch_predict(
res.insts(0).tensor_array(idx).float_data(i);
}
}
idx += 1;
}
postprocess_end = timeline.TimeStampUS();
}
......
......@@ -64,6 +64,8 @@ int GeneralResponseOp::inference() {
std::shared_ptr<PaddleGeneralModelConfig> model_config =
resource.get_general_model_config();
VLOG(2) << "max body size : " << brpc::fLU64::FLAGS_max_body_size;
std::vector<int> fetch_index;
fetch_index.resize(req->fetch_var_names_size());
for (int i = 0; i < req->fetch_var_names_size(); ++i) {
......
......@@ -111,7 +111,9 @@ class Client(object):
self.result_handle_ = PredictorRes()
self.client_handle_ = PredictorClient()
self.client_handle_.init(path)
read_env_flags = ["profile_client", "profile_server"]
if "FLAGS_max_body_size" not in os.environ:
os.environ["FLAGS_max_body_size"] = str(512 * 1024 * 1024)
read_env_flags = ["profile_client", "profile_server", "max_body_size"]
self.client_handle_.init_gflags([sys.argv[
0]] + ["--tryfromenv=" + ",".join(read_env_flags)])
self.feed_names_ = [var.alias_name for var in model_conf.feed_var]
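
A minimal client-side sketch (not part of this commit): since the Client hunk above reads FLAGS_max_body_size from the environment and forwards it through --tryfromenv, exporting the variable before the Client is constructed is enough to raise the brpc body-size limit past the new 512 MB default. The import path follows the usual paddle_serving_client API; the config path and endpoint below are hypothetical.

    import os

    # Must be exported before the Client is created; otherwise the
    # 512 MB default set in the hunk above takes effect.
    os.environ["FLAGS_max_body_size"] = str(1024 * 1024 * 1024)  # 1 GB

    from paddle_serving_client import Client  # assumed import path

    client = Client()
    client.load_client_config("serving_client_conf/serving_client_conf.prototxt")  # hypothetical path
    client.connect(["127.0.0.1:9292"])  # hypothetical endpoint
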
......@@ -223,8 +225,6 @@ class Client(object):
for i, feed_i in enumerate(feed_batch):
int_slot = []
float_slot = []
int_shape = []
float_shape = []
for key in feed_i:
if key not in self.feed_names_:
raise ValueError("Wrong feed name: {}.".format(key))
......
......@@ -89,6 +89,7 @@ class Server(object):
self.num_threads = 4
self.port = 8080
self.reload_interval_s = 10
self.max_body_size = 64 * 1024 * 1024
self.module_path = os.path.dirname(paddle_serving_server.__file__)
self.cur_path = os.getcwd()
self.use_local_bin = False
......@@ -100,6 +101,14 @@ class Server(object):
def set_num_threads(self, threads):
self.num_threads = threads
def set_max_body_size(self, body_size):
if body_size >= self.max_body_size:
self.max_body_size = body_size
else:
print(
"max_body_size is less than default value, will use default value in service."
)
def set_port(self, port):
self.port = port
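
A minimal server-side sketch (not part of this diff) of the new setter in use, mirroring the start_standard_model() hunks further down; OpMaker, OpSeqMaker, the op names, and run_server() are assumed from the public paddle_serving_server API, and the model path and port are hypothetical.

    from paddle_serving_server import OpMaker, OpSeqMaker, Server  # assumed import path

    op_maker = OpMaker()
    read_op = op_maker.create('general_reader')      # assumed op names
    infer_op = op_maker.create('general_infer')
    response_op = op_maker.create('general_response')

    op_seq_maker = OpSeqMaker()
    op_seq_maker.add_op(read_op)
    op_seq_maker.add_op(infer_op)
    op_seq_maker.add_op(response_op)

    server = Server()
    server.set_op_sequence(op_seq_maker.get_op_sequence())
    server.set_num_threads(4)
    server.set_max_body_size(512 * 1024 * 1024)  # raise the 64 MB default added above
    server.load_model_config("serving_server_model")  # hypothetical path
    server.prepare_server(workdir="workdir", port=9393, device="cpu")
    server.run_server()
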
......@@ -292,7 +301,8 @@ class Server(object):
"-resource_file {} " \
"-workflow_path {} " \
"-workflow_file {} " \
"-bthread_concurrency {} ".format(
"-bthread_concurrency {} " \
"-max_body_size {} ".format(
self.bin_path,
self.workdir,
self.infer_service_fn,
......@@ -304,7 +314,8 @@ class Server(object):
self.resource_fn,
self.workdir,
self.workflow_fn,
self.num_threads)
self.num_threads,
self.max_body_size)
print("Going to Run Command")
print(command)
os.system(command)
......@@ -91,6 +91,7 @@ class Monitor(object):
model_name))
return model_name
tar_model_path = os.path.join(local_tmp_path, model_name)
_LOGGER.info("try to unpack remote file({})".format(tar_model_path))
if not tarfile.is_tarfile(tar_model_path):
raise Exception('not a tar packaged file type. {}'.format(
self._check_param_help('remote_model_name', model_name)))
......@@ -105,10 +106,11 @@ class Monitor(object):
self._check_param_help('local_tmp_path', local_tmp_path)))
finally:
os.remove(tar_model_path)
_LOGGER.debug('remove packed file({}).'.format(model_name))
_LOGGER.debug('remove packed file({}).'.format(tar_model_path))
_LOGGER.info('using unpacked filename: {}.'.format(
unpacked_filename))
if not os.path.exists(unpacked_filename):
if not os.path.exists(
os.path.join(local_tmp_path, unpacked_filename)):
raise Exception('file not exist. {}'.format(
self._check_param_help('unpacked_filename',
unpacked_filename)))
......@@ -124,13 +126,14 @@ class Monitor(object):
'_local_tmp_path', '_interval'
]
self._print_params(params)
if not os.path.exists(self._local_tmp_path):
_LOGGER.info('mkdir: {}'.format(self._local_tmp_path))
os.makedirs(self._local_tmp_path)
local_tmp_path = os.path.join(self._local_path, self._local_tmp_path)
_LOGGER.info('local_tmp_path: {}'.format(local_tmp_path))
if not os.path.exists(local_tmp_path):
_LOGGER.info('mkdir: {}'.format(local_tmp_path))
os.makedirs(local_tmp_path)
while True:
[flag, timestamp] = self._exist_remote_file(
self._remote_path, self._remote_donefile_name,
self._local_tmp_path)
self._remote_path, self._remote_donefile_name, local_tmp_path)
if flag:
if self._remote_donefile_timestamp is None or \
timestamp != self._remote_donefile_timestamp:
......@@ -139,15 +142,15 @@ class Monitor(object):
self._remote_donefile_timestamp = timestamp
self._pull_remote_dir(self._remote_path,
self._remote_model_name,
self._local_tmp_path)
local_tmp_path)
_LOGGER.info('pull remote model({}).'.format(
self._remote_model_name))
unpacked_filename = self._decompress_model_file(
self._local_tmp_path, self._remote_model_name,
local_tmp_path, self._remote_model_name,
self._unpacked_filename)
self._update_local_model(
self._local_tmp_path, unpacked_filename,
self._local_path, self._local_model_name)
self._update_local_model(local_tmp_path, unpacked_filename,
self._local_path,
self._local_model_name)
_LOGGER.info('update local model({}).'.format(
self._local_model_name))
self._update_local_donefile(self._local_path,
......@@ -220,7 +223,12 @@ class HadoopMonitor(Monitor):
local_dirpath = os.path.join(local_tmp_path, dirname)
if os.path.exists(local_dirpath):
_LOGGER.info('remove old temporary model file({}).'.format(dirname))
shutil.rmtree(local_dirpath)
if self._unpacked_filename is None:
# the remote file is model folder.
shutil.rmtree(local_dirpath)
else:
# the remote file is a packed model file
os.remove(local_dirpath)
remote_dirpath = os.path.join(remote_path, dirname)
cmd = '{} -get {} {} 2>/dev/null'.format(self._cmd_prefix,
remote_dirpath, local_dirpath)
......@@ -301,8 +309,8 @@ class FTPMonitor(Monitor):
os.path.join(remote_path, remote_dirname), name,
os.path.join(local_tmp_path, remote_dirname), overwrite)
else:
self._download_remote_file(remote_dirname, name,
local_tmp_path, overwrite)
self._download_remote_file(remote_dirpath, name,
local_dirpath, overwrite)
except ftplib.error_perm:
_LOGGER.debug('{} is file.'.format(remote_dirname))
self._download_remote_file(remote_path, remote_dirname,
......@@ -325,17 +333,17 @@ class GeneralMonitor(Monitor):
def _get_local_file_timestamp(self, filename):
return os.path.getmtime(filename)
def _exist_remote_file(self, path, filename, local_tmp_path):
remote_filepath = os.path.join(path, filename)
def _exist_remote_file(self, remote_path, filename, local_tmp_path):
remote_filepath = os.path.join(remote_path, filename)
url = '{}/{}'.format(self._general_host, remote_filepath)
_LOGGER.debug('remote file url: {}'.format(url))
cmd = 'wget -N -P {} {} &>/dev/null'.format(local_tmp_path, url)
# only for check donefile, which is not a folder.
cmd = 'wget -nd -N -P {} {} &>/dev/null'.format(local_tmp_path, url)
_LOGGER.debug('wget cmd: {}'.format(cmd))
if os.system(cmd) != 0:
_LOGGER.debug('remote file({}) not exist.'.format(filename))
_LOGGER.debug('remote file({}) not exist.'.format(remote_filepath))
return [False, None]
else:
_LOGGER.debug('download remote file({}).'.format(filename))
timestamp = self._get_local_file_timestamp(
os.path.join(local_tmp_path, filename))
return [True, timestamp]
......@@ -344,7 +352,13 @@ class GeneralMonitor(Monitor):
remote_dirpath = os.path.join(remote_path, dirname)
url = '{}/{}'.format(self._general_host, remote_dirpath)
_LOGGER.debug('remote file url: {}'.format(url))
cmd = 'wget -nH -r -P {} {} &>/dev/null'.format(local_tmp_path, url)
if self._unpacked_filename is None:
# the remote file is model folder.
cmd = 'wget -nH -r -P {} {} &>/dev/null'.format(
os.path.join(local_tmp_path, dirname), url)
else:
# the remote file is a packed model file
cmd = 'wget -nd -N -P {} {} &>/dev/null'.format(local_tmp_path, url)
_LOGGER.debug('wget cmd: {}'.format(cmd))
if os.system(cmd) != 0:
raise Exception('pull remote dir failed. {}'.format(
......@@ -352,7 +366,11 @@ class GeneralMonitor(Monitor):
def parse_args():
''' parse args. '''
""" parse args.
Returns:
parser.parse_args().
"""
parser = argparse.ArgumentParser(description="Monitor")
parser.add_argument(
"--type", type=str, default='general', help="Type of remote server")
......
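
To summarize the monitor changes: the pull logic now distinguishes a remote model folder (downloaded recursively, cleaned up with shutil.rmtree) from a packed model file (downloaded flat, cleaned up with os.remove). Below is a minimal sketch, not part of this diff, of how the GeneralMonitor selects its wget command; the host, paths, and model names are hypothetical.

    import os

    def build_pull_cmd(general_host, remote_path, dirname, local_tmp_path, unpacked_filename):
        # Mirrors _pull_remote_dir above: choose the wget form by artifact type.
        url = '{}/{}'.format(general_host, os.path.join(remote_path, dirname))
        if unpacked_filename is None:
            # remote artifact is a model folder: mirror it into its own subdirectory
            return 'wget -nH -r -P {} {} &>/dev/null'.format(
                os.path.join(local_tmp_path, dirname), url)
        # remote artifact is a packed model file: fetch the single file into local_tmp_path
        return 'wget -nd -N -P {} {} &>/dev/null'.format(local_tmp_path, url)

    print(build_pull_cmd('http://example.com', 'models', 'uci_housing', '_tmp', None))
    print(build_pull_cmd('http://example.com', 'models', 'uci_housing.tar.gz', '_tmp', 'uci_housing'))
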
......@@ -41,6 +41,11 @@ def parse_args(): # pylint: disable=doc-string-missing
"--device", type=str, default="cpu", help="Type of device")
parser.add_argument(
"--mem_optim", type=bool, default=False, help="Memory optimize")
parser.add_argument(
"--max_body_size",
type=int,
default=512 * 1024 * 1024,
help="Limit sizes of messages")
return parser.parse_args()
......@@ -52,6 +57,7 @@ def start_standard_model(): # pylint: disable=doc-string-missing
workdir = args.workdir
device = args.device
mem_optim = args.mem_optim
max_body_size = args.max_body_size
if model == "":
print("You must specify your serving model")
......@@ -72,6 +78,7 @@ def start_standard_model(): # pylint: disable=doc-string-missing
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(thread_num)
server.set_memory_optimize(mem_optim)
server.set_max_body_size(max_body_size)
server.load_model_config(model)
server.prepare_server(workdir=workdir, port=port, device=device)
......
......@@ -46,6 +46,11 @@ def serve_args():
"--name", type=str, default="None", help="Default service name")
parser.add_argument(
"--mem_optim", type=bool, default=False, help="Memory optimize")
parser.add_argument(
"--max_body_size",
type=int,
default=512 * 1024 * 1024,
help="Limit sizes of messages")
return parser.parse_args()
......@@ -114,6 +119,7 @@ class Server(object):
self.num_threads = 4
self.port = 8080
self.reload_interval_s = 10
self.max_body_size = 64 * 1024 * 1024
self.module_path = os.path.dirname(paddle_serving_server.__file__)
self.cur_path = os.getcwd()
self.check_cuda()
......@@ -126,6 +132,14 @@ class Server(object):
def set_num_threads(self, threads):
self.num_threads = threads
def set_max_body_size(self, body_size):
if body_size >= self.max_body_size:
self.max_body_size = body_size
else:
print(
"max_body_size is less than default value, will use default value in service."
)
def set_port(self, port):
self.port = port
......@@ -324,7 +338,8 @@ class Server(object):
"-workflow_path {} " \
"-workflow_file {} " \
"-bthread_concurrency {} " \
"-gpuid {} ".format(
"-gpuid {} " \
"-max_body_size {} ".format(
self.bin_path,
self.workdir,
self.infer_service_fn,
......@@ -337,7 +352,8 @@ class Server(object):
self.workdir,
self.workflow_fn,
self.num_threads,
self.gpuid,)
self.gpuid,
self.max_body_size)
print("Going to Run Comand")
print(command)
......
This diff is collapsed.
......@@ -35,6 +35,7 @@ def start_gpu_card_model(index, gpuid, args): # pylint: disable=doc-string-miss
thread_num = args.thread
model = args.model
mem_optim = args.mem_optim
max_body_size = args.max_body_size
workdir = "{}_{}".format(args.workdir, gpuid)
if model == "":
......@@ -56,6 +57,7 @@ def start_gpu_card_model(index, gpuid, args): # pylint: disable=doc-string-miss
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(thread_num)
server.set_memory_optimize(mem_optim)
server.set_max_body_size(max_body_size)
server.load_model_config(model)
server.prepare_server(workdir=workdir, port=port, device=device)
......