提交 049defb0 编写于 作者: M MRXLT

fix conflict

......@@ -45,12 +45,12 @@ class PredictorRes {
~PredictorRes() {}
public:
const std::vector<std::vector<int64_t>> & get_int64_by_name(
const std::string & name) {
const std::vector<std::vector<int64_t>>& get_int64_by_name(
const std::string& name) {
return _int64_map[name];
}
const std::vector<std::vector<float>> & get_float_by_name(
const std::string & name) {
const std::vector<std::vector<float>>& get_float_by_name(
const std::string& name) {
return _float_map[name];
}
......@@ -71,7 +71,7 @@ class PredictorClient {
void set_predictor_conf(const std::string& conf_path,
const std::string& conf_file);
int create_predictor_by_desc(const std::string & sdk_desc);
int create_predictor_by_desc(const std::string& sdk_desc);
int create_predictor();
int destroy_predictor();
......@@ -81,7 +81,8 @@ class PredictorClient {
const std::vector<std::vector<int64_t>>& int_feed,
const std::vector<std::string>& int_feed_name,
const std::vector<std::string>& fetch_name,
PredictorRes & predict_res); // NOLINT
PredictorRes& predict_res, // NOLINT
const int& pid);
std::vector<std::vector<float>> predict(
const std::vector<std::vector<float>>& float_feed,
......
......@@ -132,13 +132,13 @@ int PredictorClient::create_predictor() {
_api.thrd_initialize();
}
int PredictorClient::predict(
const std::vector<std::vector<float>>& float_feed,
const std::vector<std::string>& float_feed_name,
const std::vector<std::vector<int64_t>>& int_feed,
const std::vector<std::string>& int_feed_name,
const std::vector<std::string>& fetch_name,
PredictorRes & predict_res) { // NOLINT
int PredictorClient::predict(const std::vector<std::vector<float>> &float_feed,
const std::vector<std::string> &float_feed_name,
const std::vector<std::vector<int64_t>> &int_feed,
const std::vector<std::string> &int_feed_name,
const std::vector<std::string> &fetch_name,
PredictorRes &predict_res,
const int &pid) { // NOLINT
predict_res._int64_map.clear();
predict_res._float_map.clear();
Timer timeline;
......@@ -218,6 +218,7 @@ int PredictorClient::predict(
VLOG(2) << "fetch name: " << name;
if (_fetch_name_to_type[name] == 0) {
int len = res.insts(0).tensor_array(idx).int64_data_size();
VLOG(2) << "fetch tensor : " << name << " type: int64 len : " << len;
predict_res._int64_map[name].resize(1);
predict_res._int64_map[name][0].resize(len);
for (int i = 0; i < len; ++i) {
......@@ -226,6 +227,7 @@ int PredictorClient::predict(
}
} else if (_fetch_name_to_type[name] == 1) {
int len = res.insts(0).tensor_array(idx).float_data_size();
VLOG(2) << "fetch tensor : " << name << " type: float32 len : " << len;
predict_res._float_map[name].resize(1);
predict_res._float_map[name][0].resize(len);
for (int i = 0; i < len; ++i) {
......@@ -240,11 +242,12 @@ int PredictorClient::predict(
if (FLAGS_profile_client) {
std::ostringstream oss;
oss << "PROFILE\t"
<< "pid:" << pid << "\t"
<< "prepro_0:" << preprocess_start << " "
<< "prepro_1:" << preprocess_end << " "
<< "client_infer_0:" << client_infer_start << " "
<< "client_infer_1:" << client_infer_end << " ";
if (FLAGS_profile_server) {
int op_num = res.profile_time_size() / 2;
for (int i = 0; i < op_num; ++i) {
......@@ -252,10 +255,10 @@ int PredictorClient::predict(
oss << "op" << i << "_1:" << res.profile_time(i * 2 + 1) << " ";
}
}
oss << "postpro_0:" << postprocess_start << " ";
oss << "postpro_1:" << postprocess_end;
fprintf(stderr, "%s\n", oss.str().c_str());
}
return 0;
......@@ -342,7 +345,7 @@ std::vector<std::vector<std::vector<float>>> PredictorClient::batch_predict(
}
VLOG(2) << "batch [" << bi << "] "
<< "itn feed value prepared";
<< "int feed value prepared";
}
int64_t preprocess_end = timeline.TimeStampUS();
......
......@@ -31,13 +31,15 @@ PYBIND11_MODULE(serving_client, m) {
py::class_<PredictorRes>(m, "PredictorRes", py::buffer_protocol())
.def(py::init())
.def("get_int64_by_name",
[](PredictorRes &self, std::string & name) {
[](PredictorRes &self, std::string &name) {
return self.get_int64_by_name(name);
}, py::return_value_policy::reference)
},
py::return_value_policy::reference)
.def("get_float_by_name",
[](PredictorRes &self, std::string & name) {
[](PredictorRes &self, std::string &name) {
return self.get_float_by_name(name);
}, py::return_value_policy::reference);
},
py::return_value_policy::reference);
py::class_<PredictorClient>(m, "PredictorClient", py::buffer_protocol())
.def(py::init())
......@@ -56,26 +58,29 @@ PYBIND11_MODULE(serving_client, m) {
self.set_predictor_conf(conf_path, conf_file);
})
.def("create_predictor_by_desc",
[](PredictorClient &self, const std::string & sdk_desc) {
self.create_predictor_by_desc(sdk_desc); })
[](PredictorClient &self, const std::string &sdk_desc) {
self.create_predictor_by_desc(sdk_desc);
})
.def("create_predictor",
[](PredictorClient &self) { self.create_predictor(); })
.def("destroy_predictor",
[](PredictorClient &self) { self.destroy_predictor(); })
.def("predict",
[](PredictorClient &self,
const std::vector<std::vector<float>> &float_feed,
const std::vector<std::string> &float_feed_name,
const std::vector<std::vector<int64_t>> &int_feed,
const std::vector<std::string> &int_feed_name,
const std::vector<std::string> &fetch_name,
PredictorRes & predict_res) {
const std::vector<std::vector<float>> &float_feed,
const std::vector<std::string> &float_feed_name,
const std::vector<std::vector<int64_t>> &int_feed,
const std::vector<std::string> &int_feed_name,
const std::vector<std::string> &fetch_name,
PredictorRes &predict_res,
const int &pid) {
return self.predict(float_feed,
float_feed_name,
int_feed,
int_feed_name,
fetch_name,
predict_res);
predict_res,
pid);
})
.def("batch_predict",
[](PredictorClient &self,
......
......@@ -36,6 +36,7 @@ class BertService():
self.show_ids = show_ids
self.do_lower_case = do_lower_case
self.retry = retry
self.pid = os.getpid()
self.profile = True if ("FLAGS_profile_client" in os.environ and
os.environ["FLAGS_profile_client"]) else False
......@@ -78,7 +79,8 @@ class BertService():
}
prepro_end = time.time()
if self.profile:
print("PROFILE\tbert_pre_0:{} bert_pre_1:{}".format(
print("PROFILE\tpid:{}\tbert_pre_0:{} bert_pre_1:{}".format(
self.pid,
int(round(prepro_start * 1000000)),
int(round(prepro_end * 1000000))))
fetch_map = self.client.predict(feed=feed, fetch=fetch)
......@@ -111,7 +113,8 @@ class BertService():
feed_batch.append(feed)
prepro_end = time.time()
if self.profile:
print("PROFILE\tbert_pre_0:{} bert_pre_1:{}".format(
print("PROFILE\tpid:{}\tbert_pre_0:{} bert_pre_1:{}".format(
self.pid,
int(round(prepro_start * 1000000)),
int(round(prepro_end * 1000000))))
fetch_map_batch = self.client.batch_predict(
......@@ -120,7 +123,6 @@ class BertService():
def test():
bc = BertService(
model_name='bert_chinese_L-12_H-768_A-12',
max_seq_len=20,
......@@ -130,16 +132,25 @@ def test():
config_file = './serving_client_conf/serving_client_conf.prototxt'
fetch = ["pooled_output"]
bc.load_client(config_file, server_addr)
batch_size = 4
batch_size = 1
batch = []
for line in sys.stdin:
if len(batch) < batch_size:
batch.append([line.strip()])
if batch_size == 1:
result = bc.run_general([[line.strip()]], fetch)
print(result)
else:
result = bc.run_batch_general(batch, fetch)
batch = []
for r in result:
print(r)
if len(batch) < batch_size:
batch.append([line.strip()])
else:
result = bc.run_batch_general(batch, fetch)
batch = []
for r in result:
print(r)
if len(batch) > 0:
result = bc.run_batch_general(batch, fetch)
batch = []
for r in result:
print(r)
if __name__ == '__main__':
......
......@@ -31,8 +31,6 @@ op_seq_maker.add_op(general_response_op)
server = Server()
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(4)
server.set_local_bin(
"~/github/Serving/build_server/core/general-server/serving")
server.load_model_config(sys.argv[1])
port = int(sys.argv[2])
......
wget https://paddle-serving.bj.bcebos.com/bert_example/data-c.txt --no-check-certificate
......@@ -11,25 +11,38 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from paddle_serving_client import Client
from paddle_serving_client.utils import MultiThreadRunner
from paddle_serving_client.utils import benchmark_args
import time
import paddle
import sys
from paddle_serving_server_gpu import OpMaker
from paddle_serving_server_gpu import OpSeqMaker
from paddle_serving_server_gpu import Server
import requests
op_maker = OpMaker()
read_op = op_maker.create('general_reader')
general_infer_op = op_maker.create('general_infer')
args = benchmark_args()
op_seq_maker = OpSeqMaker()
op_seq_maker.add_op(read_op)
op_seq_maker.add_op(general_infer_op)
def single_func(idx, resource):
if args.request == "rpc":
client = Client()
client.load_client_config(args.model)
client.connect([args.endpoint])
train_reader = paddle.batch(paddle.reader.shuffle(
paddle.dataset.uci_housing.train(), buf_size=500), batch_size=1)
start = time.time()
for data in train_reader():
fetch_map = client.predict(feed={"x": data[0][0]}, fetch=["price"])
end = time.time()
return [[end - start]]
elif args.request == "http":
train_reader = paddle.batch(paddle.reader.shuffle(
paddle.dataset.uci_housing.train(), buf_size=500), batch_size=1)
start = time.time()
for data in train_reader():
r = requests.post('http://{}/uci/prediction'.format(args.endpoint),
data = {"x": data[0]})
end = time.time()
return [[end - start]]
server = Server()
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(12)
server.load_model_config(sys.argv[1])
port = int(sys.argv[2])
server.prepare_server(workdir="work_dir1", port=port, device="gpu")
server.run_server()
multi_thread_runner = MultiThreadRunner()
result = multi_thread_runner.run(single_func, args.thread, {})
print(result)
......@@ -19,7 +19,7 @@ cat test.data | python test_client_batch.py inference.conf 4 > result
设备 :Intel(R) Xeon(R) Gold 6271 CPU @ 2.60GHz * 48
模型 :IMDB-CNN
模型 :[CNN](https://github.com/PaddlePaddle/Serving/blob/develop/python/examples/imdb/nets.py)
server thread num : 16
......
......@@ -13,55 +13,45 @@
# limitations under the License.
import sys
import time
import requests
from imdb_reader import IMDBDataset
from paddle_serving_client import Client
from paddle_serving_client.metric import auc
from paddle_serving_client.utils import MultiThreadRunner
import time
from paddle_serving_client.utils import benchmark_args
args = benchmark_args()
def predict(thr_id, resource):
client = Client()
client.load_client_config(resource["conf_file"])
client.connect(resource["server_endpoint"])
thread_num = resource["thread_num"]
file_list = resource["filelist"]
line_id = 0
prob = []
label_list = []
dataset = []
for fn in file_list:
fin = open(fn)
for line in fin:
if line_id % thread_num == thr_id - 1:
group = line.strip().split()
words = [int(x) for x in group[1:int(group[0])]]
label = [int(group[-1])]
feed = {"words": words, "label": label}
dataset.append(feed)
line_id += 1
fin.close()
def single_func(idx, resource):
imdb_dataset = IMDBDataset()
imdb_dataset.load_resource(args.vocab)
filelist_fn = args.filelist
filelist = []
start = time.time()
fetch = ["acc", "cost", "prediction"]
for inst in dataset:
fetch_map = client.predict(feed=inst, fetch=fetch)
prob.append(fetch_map["prediction"][1])
label_list.append(label[0])
with open(filelist_fn) as fin:
for line in fin:
filelist.append(line.strip())
filelist = filelist[idx::args.thread]
if args.request == "rpc":
client = Client()
client.load_client_config(args.model)
client.connect([args.endpoint])
for fn in filelist:
fin = open(fn)
for line in fin:
word_ids, label = imdb_dataset.get_words_and_label(line)
fetch_map = client.predict(feed={"words": word_ids},
fetch=["prediction"])
elif args.request == "http":
for fn in filelist:
fin = open(fn)
for line in fin:
word_ids, label = imdb_dataset.get_words_and_label(line)
r = requests.post("http://{}/imdb/prediction".format(args.endpoint),
data={"words": word_ids})
end = time.time()
client.release()
return [prob, label_list, [end - start]]
if __name__ == '__main__':
conf_file = sys.argv[1]
data_file = sys.argv[2]
resource = {}
resource["conf_file"] = conf_file
resource["server_endpoint"] = ["127.0.0.1:9293"]
resource["filelist"] = [data_file]
resource["thread_num"] = int(sys.argv[3])
thread_runner = MultiThreadRunner()
result = thread_runner.run(predict, int(sys.argv[3]), resource)
return [[end - start]]
print("total time {} s".format(sum(result[-1]) / len(result[-1])))
multi_thread_runner = MultiThreadRunner()
result = multi_thread_runner.run(single_func, args.thread, {})
print(result)
wget --no-check-certificate https://fleet.bj.bcebos.com/text_classification_data.tar.gz
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/imdb-demo/imdb_model.tar.gz
tar -zxvf text_classification_data.tar.gz
#wget --no-check-certificate https://paddle-serving.bj.bcebos.com/imdb-demo%2Fimdb.tar.gz
#tar -xzf imdb-demo%2Fimdb.tar.gz
tar -zxvf imdb_model.tar.gz
......@@ -30,6 +30,14 @@ class IMDBDataset(dg.MultiSlotDataGenerator):
self._pattern = re.compile(r'(;|,|\.|\?|!|\s|\(|\))')
self.return_value = ("words", [1, 2, 3, 4, 5, 6]), ("label", [0])
def get_words_only(self, line):
sent = line.lower().replace("<br />", " ").strip()
words = [x for x in self._pattern.split(sent) if x and x != " "]
feas = [
self._vocab[x] if x in self._vocab else self._unk_id for x in words
]
return feas
def get_words_and_label(self, line):
send = '|'.join(line.split('|')[:-1]).lower().replace("<br />",
" ").strip()
......
wget https://paddle-serving.bj.bcebos.com/imdb-demo%2Fimdb_service.tar.gz
wget https://paddle-serving.bj.bcebos.com/imdb-demo/imdb_service.tar.gz
tar -xzf imdb_service.tar.gz
wget --no-check-certificate https://fleet.bj.bcebos.com/text_classification_data.tar.gz
tar -zxvf text_classification_data.tar.gz
......
......@@ -49,8 +49,9 @@ if __name__ == "__main__":
dataset.set_batch_size(128)
dataset.set_filelist(filelist)
dataset.set_thread(10)
from nets import bow_net
avg_cost, acc, prediction = bow_net(data, label, dict_dim)
from nets import lstm_net
model_name = "imdb_lstm"
avg_cost, acc, prediction = lstm_net(data, label, dict_dim)
optimizer = fluid.optimizer.SGD(learning_rate=0.01)
optimizer.minimize(avg_cost)
......@@ -65,6 +66,7 @@ if __name__ == "__main__":
program=fluid.default_main_program(), dataset=dataset, debug=False)
logger.info("TRAIN --> pass: {}".format(i))
if i == 5:
serving_io.save_model("imdb_model", "imdb_client_conf",
serving_io.save_model("{}_model".format(model_name),
"{}_client_conf".format(model_name),
{"words": data}, {"prediction": prediction},
fluid.default_main_program())
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_serving_client import Client
from imdb_reader import IMDBDataset
import sys
client = Client()
client.load_client_config(sys.argv[1])
client.connect(["127.0.0.1:9393"])
# you can define any english sentence or dataset here
# This example reuses imdb reader in training, you
# can define your own data preprocessing easily.
imdb_dataset = IMDBDataset()
imdb_dataset.load_resource(sys.argv[2])
for line in sys.stdin:
group = line.strip().split()
words = [int(x) for x in group[1:int(group[0]) + 1]]
label = [int(group[-1])]
feed = {"words": words, "label": label}
word_ids, label = imdb_dataset.get_words_and_label(line)
feed = {"words": word_ids, "label": label}
fetch = ["acc", "cost", "prediction"]
fetch_map = client.predict(feed=feed, fetch=fetch)
print("{} {}".format(fetch_map["prediction"][1], label[0]))
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_serving_client import Client
import sys
import subprocess
from multiprocessing import Pool
import time
def predict(p_id, p_size, data_list):
client = Client()
client.load_client_config(conf_file)
client.connect(["127.0.0.1:8010"])
result = []
for line in data_list:
group = line.strip().split()
words = [int(x) for x in group[1:int(group[0])]]
label = [int(group[-1])]
feed = {"words": words, "label": label}
fetch = ["acc", "cost", "prediction"]
fetch_map = client.predict(feed=feed, fetch=fetch)
#print("{} {}".format(fetch_map["prediction"][1], label[0]))
result.append([fetch_map["prediction"][1], label[0]])
return result
def predict_multi_thread(p_num):
data_list = []
with open(data_file) as f:
for line in f.readlines():
data_list.append(line)
start = time.time()
p = Pool(p_num)
p_size = len(data_list) / p_num
result_list = []
for i in range(p_num):
result_list.append(
p.apply_async(predict,
[i, p_size, data_list[i * p_size:(i + 1) * p_size]]))
p.close()
p.join()
for i in range(p_num):
result = result_list[i].get()
for j in result:
print("{} {}".format(j[0], j[1]))
cost = time.time() - start
print("{} threads cost {}".format(p_num, cost))
if __name__ == '__main__':
conf_file = sys.argv[1]
data_file = sys.argv[2]
p_num = int(sys.argv[3])
predict_multi_thread(p_num)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
from paddle_serving_server import OpMaker
from paddle_serving_server import OpSeqMaker
from paddle_serving_server import Server
op_maker = OpMaker()
read_op = op_maker.create('general_reader')
general_infer_op = op_maker.create('general_infer')
general_response_op = op_maker.create('general_response')
op_seq_maker = OpSeqMaker()
op_seq_maker.add_op(read_op)
op_seq_maker.add_op(general_infer_op)
op_seq_maker.add_op(general_response_op)
server = Server()
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(4)
server.load_model_config(sys.argv[1])
port = int(sys.argv[2])
server.prepare_server(workdir="work_dir1", port=port, device="cpu")
server.run_server()
......@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!flask/bin/python
from paddle_serving_server.web_service import WebService
from imdb_reader import IMDBDataset
import sys
......@@ -27,7 +26,7 @@ class IMDBService(WebService):
if "words" not in feed:
exit(-1)
res_feed = {}
res_feed["words"] = self.dataset.get_words_and_label(feed["words"])[0]
res_feed["words"] = self.dataset.get_words_only(feed["words"])[0]
return res_feed, fetch
imdb_service = IMDBService(name="imdb")
......
......@@ -5,8 +5,9 @@ import sys
profile_file = sys.argv[1]
def prase(line, counter):
event_list = line.split(" ")
def prase(pid_str, time_str, counter):
pid = pid_str.split(":")[1]
event_list = time_str.split(" ")
trace_list = []
for event in event_list:
name, ts = event.split(":")
......@@ -19,7 +20,7 @@ def prase(line, counter):
event_dict = {}
event_dict["name"] = name
event_dict["tid"] = 0
event_dict["pid"] = 0
event_dict["pid"] = pid
event_dict["ts"] = ts
event_dict["ph"] = ph
......@@ -36,7 +37,7 @@ if __name__ == "__main__":
for line in f.readlines():
line = line.strip().split("\t")
if line[0] == "PROFILE":
trace_list = prase(line[1], counter)
trace_list = prase(line[1], line[2], counter)
counter += 1
for trace in trace_list:
all_list.append(trace)
......
......@@ -78,6 +78,7 @@ class Client(object):
self.feed_types_ = {}
self.feed_names_to_idx_ = {}
self.rpath()
self.pid = os.getpid()
def rpath(self):
lib_path = os.path.dirname(paddle_serving_client.__file__)
......@@ -160,6 +161,7 @@ class Client(object):
int_feed_names = []
float_feed_names = []
fetch_names = []
for key in feed:
self.shape_check(feed, key)
if key not in self.feed_names_:
......@@ -177,7 +179,7 @@ class Client(object):
ret = self.client_handle_.predict(float_slot, float_feed_names,
int_slot, int_feed_names, fetch_names,
self.result_handle_)
self.result_handle_, self.pid)
result_map = {}
for i, name in enumerate(fetch_names):
......
......@@ -11,16 +11,28 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import subprocess
import argparse
from multiprocessing import Pool
def benchmark_args():
parser = argparse.ArgumentParser("benchmark")
parser.add_argument("--thread", type=int, default=10, help="concurrecy")
parser.add_argument("--model", type=str, default="", help="model for evaluation")
parser.add_argument("--endpoint", type=str, default="127.0.0.1:9292", help="endpoint of server")
parser.add_argument("--request", type=str, default="rpc", help="mode of service")
return parser.parse_args()
class MultiThreadRunner(object):
def __init__(self):
pass
def run(self, thread_func, thread_num, global_resource):
os.environ["http_proxy"] = ""
os.environ["https_proxy"] = ""
p = Pool(thread_num)
result_list = []
for i in range(thread_num):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册