Commit a8806e95 authored by MRXLT

fix conflict

```diff
@@ -45,12 +45,12 @@ class PredictorRes {
   ~PredictorRes() {}
  public:
-  const std::vector<std::vector<int64_t>> & get_int64_by_name(
-      const std::string & name) {
+  const std::vector<std::vector<int64_t>>& get_int64_by_name(
+      const std::string& name) {
     return _int64_map[name];
   }
-  const std::vector<std::vector<float>> & get_float_by_name(
-      const std::string & name) {
+  const std::vector<std::vector<float>>& get_float_by_name(
+      const std::string& name) {
     return _float_map[name];
   }
```
```diff
@@ -71,7 +71,7 @@ class PredictorClient {
   void set_predictor_conf(const std::string& conf_path,
                           const std::string& conf_file);
-  int create_predictor_by_desc(const std::string & sdk_desc);
+  int create_predictor_by_desc(const std::string& sdk_desc);
   int create_predictor();
   int destroy_predictor();
@@ -81,7 +81,8 @@ class PredictorClient {
       const std::vector<std::vector<int64_t>>& int_feed,
       const std::vector<std::string>& int_feed_name,
       const std::vector<std::string>& fetch_name,
-      PredictorRes& predict_res);  // NOLINT
+      PredictorRes& predict_res,  // NOLINT
+      const int& pid);
   std::vector<std::vector<float>> predict(
       const std::vector<std::vector<float>>& float_feed,
...
```
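The new `pid` parameter lets each client process tag its own profile records. A minimal sketch of the intended flow on the Python side, mirroring the wrapper changes later in this commit (names follow the `Client` class shown below):

```python
import os

# Assumed flow: the client records its process id once at construction
# and forwards it on every predict() call, so PROFILE records from
# concurrent client processes can be told apart.
pid = os.getpid()
# client_handle.predict(float_slot, float_feed_names,
#                       int_slot, int_feed_names,
#                       fetch_names, result_handle, pid)
```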
```diff
@@ -132,13 +132,13 @@ int PredictorClient::create_predictor() {
   _api.thrd_initialize();
 }
-int PredictorClient::predict(
-    const std::vector<std::vector<float>>& float_feed,
-    const std::vector<std::string>& float_feed_name,
-    const std::vector<std::vector<int64_t>>& int_feed,
-    const std::vector<std::string>& int_feed_name,
-    const std::vector<std::string>& fetch_name,
-    PredictorRes & predict_res) {  // NOLINT
+int PredictorClient::predict(const std::vector<std::vector<float>> &float_feed,
+                             const std::vector<std::string> &float_feed_name,
+                             const std::vector<std::vector<int64_t>> &int_feed,
+                             const std::vector<std::string> &int_feed_name,
+                             const std::vector<std::string> &fetch_name,
+                             PredictorRes &predict_res,
+                             const int &pid) {  // NOLINT
   predict_res._int64_map.clear();
   predict_res._float_map.clear();
   Timer timeline;
@@ -218,6 +218,7 @@ int PredictorClient::predict(
     VLOG(2) << "fetch name: " << name;
     if (_fetch_name_to_type[name] == 0) {
       int len = res.insts(0).tensor_array(idx).int64_data_size();
+      VLOG(2) << "fetch tensor : " << name << " type: int64 len : " << len;
       predict_res._int64_map[name].resize(1);
       predict_res._int64_map[name][0].resize(len);
       for (int i = 0; i < len; ++i) {
@@ -226,6 +227,7 @@ int PredictorClient::predict(
       }
     } else if (_fetch_name_to_type[name] == 1) {
       int len = res.insts(0).tensor_array(idx).float_data_size();
+      VLOG(2) << "fetch tensor : " << name << " type: float32 len : " << len;
       predict_res._float_map[name].resize(1);
       predict_res._float_map[name][0].resize(len);
       for (int i = 0; i < len; ++i) {
@@ -240,6 +242,7 @@ int PredictorClient::predict(
   if (FLAGS_profile_client) {
     std::ostringstream oss;
     oss << "PROFILE\t"
+        << "pid:" << pid << "\t"
         << "prepro_0:" << preprocess_start << " "
         << "prepro_1:" << preprocess_end << " "
         << "client_infer_0:" << client_infer_start << " "
@@ -342,7 +345,7 @@ std::vector<std::vector<std::vector<float>>> PredictorClient::batch_predict(
     }
     VLOG(2) << "batch [" << bi << "] "
-            << "itn feed value prepared";
+            << "int feed value prepared";
   }
   int64_t preprocess_end = timeline.TimeStampUS();
...
```
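With `FLAGS_profile_client` set, the client now emits one tab-separated profile record per request, prefixed with the process id. An illustrative sketch of the resulting line (the pid and microsecond timestamps are made up; the field layout follows the `ostringstream` code above):

```python
# Illustrative client-side PROFILE record as assembled above:
sample = ("PROFILE\tpid:6237\t"
          "prepro_0:1583231234000000 prepro_1:1583231234000120 "
          "client_infer_0:1583231234000150 ...")
```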
```diff
@@ -31,13 +31,15 @@ PYBIND11_MODULE(serving_client, m) {
   py::class_<PredictorRes>(m, "PredictorRes", py::buffer_protocol())
       .def(py::init())
       .def("get_int64_by_name",
-           [](PredictorRes &self, std::string & name) {
+           [](PredictorRes &self, std::string &name) {
              return self.get_int64_by_name(name);
-           }, py::return_value_policy::reference)
+           },
+           py::return_value_policy::reference)
       .def("get_float_by_name",
-           [](PredictorRes &self, std::string & name) {
+           [](PredictorRes &self, std::string &name) {
              return self.get_float_by_name(name);
-           }, py::return_value_policy::reference);
+           },
+           py::return_value_policy::reference);
   py::class_<PredictorClient>(m, "PredictorClient", py::buffer_protocol())
       .def(py::init())
@@ -56,8 +58,9 @@ PYBIND11_MODULE(serving_client, m) {
             self.set_predictor_conf(conf_path, conf_file);
           })
       .def("create_predictor_by_desc",
-           [](PredictorClient &self, const std::string & sdk_desc) {
-             self.create_predictor_by_desc(sdk_desc); })
+           [](PredictorClient &self, const std::string &sdk_desc) {
+             self.create_predictor_by_desc(sdk_desc);
+           })
       .def("create_predictor",
            [](PredictorClient &self) { self.create_predictor(); })
       .def("destroy_predictor",
@@ -69,13 +72,15 @@ PYBIND11_MODULE(serving_client, m) {
              const std::vector<std::vector<int64_t>> &int_feed,
              const std::vector<std::string> &int_feed_name,
              const std::vector<std::string> &fetch_name,
-             PredictorRes & predict_res) {
+             PredictorRes &predict_res,
+             const int &pid) {
             return self.predict(float_feed,
                                 float_feed_name,
                                 int_feed,
                                 int_feed_name,
                                 fetch_name,
-                                predict_res);
+                                predict_res,
+                                pid);
           })
       .def("batch_predict",
            [](PredictorClient &self,
...
```
```diff
@@ -36,6 +36,7 @@ class BertService():
         self.show_ids = show_ids
         self.do_lower_case = do_lower_case
         self.retry = retry
+        self.pid = os.getpid()
         self.profile = True if ("FLAGS_profile_client" in os.environ and
                                 os.environ["FLAGS_profile_client"]) else False
@@ -78,7 +79,8 @@ class BertService():
         }
         prepro_end = time.time()
         if self.profile:
-            print("PROFILE\tbert_pre_0:{} bert_pre_1:{}".format(
+            print("PROFILE\tpid:{}\tbert_pre_0:{} bert_pre_1:{}".format(
+                self.pid,
                 int(round(prepro_start * 1000000)),
                 int(round(prepro_end * 1000000))))
         fetch_map = self.client.predict(feed=feed, fetch=fetch)
@@ -111,7 +113,8 @@ class BertService():
             feed_batch.append(feed)
         prepro_end = time.time()
         if self.profile:
-            print("PROFILE\tbert_pre_0:{} bert_pre_1:{}".format(
+            print("PROFILE\tpid:{}\tbert_pre_0:{} bert_pre_1:{}".format(
+                self.pid,
                 int(round(prepro_start * 1000000)),
                 int(round(prepro_end * 1000000))))
         fetch_map_batch = self.client.batch_predict(
@@ -120,7 +123,6 @@ class BertService():
 def test():
     bc = BertService(
         model_name='bert_chinese_L-12_H-768_A-12',
         max_seq_len=20,
@@ -130,9 +132,13 @@ def test():
     config_file = './serving_client_conf/serving_client_conf.prototxt'
     fetch = ["pooled_output"]
     bc.load_client(config_file, server_addr)
-    batch_size = 4
+    batch_size = 1
     batch = []
     for line in sys.stdin:
+        if batch_size == 1:
+            result = bc.run_general([[line.strip()]], fetch)
+            print(result)
+        else:
             if len(batch) < batch_size:
                 batch.append([line.strip()])
             else:
@@ -140,6 +146,11 @@ def test():
                     batch = []
                     for r in result:
                         print(r)
+    if len(batch) > 0:
+        result = bc.run_batch_general(batch, fetch)
+        batch = []
+        for r in result:
+            print(r)
 if __name__ == '__main__':
...
```
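For reference, a minimal sketch of the new `batch_size == 1` path in `test()` (the endpoint is a placeholder, and constructor arguments not visible in the hunk are omitted and assumed to have defaults):

```python
# Sketch only: assumes a BERT serving endpoint at the placeholder address.
bc = BertService(
    model_name='bert_chinese_L-12_H-768_A-12', max_seq_len=20)
bc.load_client('./serving_client_conf/serving_client_conf.prototxt',
               '127.0.0.1:9292')
result = bc.run_general([["a single sentence to embed"]], ["pooled_output"])
print(result)
```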
```diff
@@ -31,8 +31,6 @@ op_seq_maker.add_op(general_response_op)
 server = Server()
 server.set_op_sequence(op_seq_maker.get_op_sequence())
 server.set_num_threads(4)
-server.set_local_bin(
-    "~/github/Serving/build_server/core/general-server/serving")
 server.load_model_config(sys.argv[1])
 port = int(sys.argv[2])
...
```
```
wget https://paddle-serving.bj.bcebos.com/bert_example/data-c.txt --no-check-certificate
```
```diff
@@ -11,25 +11,38 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import os
+from paddle_serving_client import Client
+from paddle_serving_client.utils import MultiThreadRunner
+from paddle_serving_client.utils import benchmark_args
+import time
+import paddle
 import sys
-import time
-import paddle
-from paddle_serving_server_gpu import OpMaker
-from paddle_serving_server_gpu import OpSeqMaker
-from paddle_serving_server_gpu import Server
+import requests
-op_maker = OpMaker()
-read_op = op_maker.create('general_reader')
-general_infer_op = op_maker.create('general_infer')
-op_seq_maker = OpSeqMaker()
-op_seq_maker.add_op(read_op)
-op_seq_maker.add_op(general_infer_op)
+args = benchmark_args()
+def single_func(idx, resource):
+    if args.request == "rpc":
+        client = Client()
+        client.load_client_config(args.model)
+        client.connect([args.endpoint])
+        train_reader = paddle.batch(paddle.reader.shuffle(
+            paddle.dataset.uci_housing.train(), buf_size=500), batch_size=1)
+        start = time.time()
+        for data in train_reader():
+            fetch_map = client.predict(feed={"x": data[0][0]}, fetch=["price"])
+        end = time.time()
+        return [[end - start]]
+    elif args.request == "http":
+        train_reader = paddle.batch(paddle.reader.shuffle(
+            paddle.dataset.uci_housing.train(), buf_size=500), batch_size=1)
+        start = time.time()
+        for data in train_reader():
+            r = requests.post('http://{}/uci/prediction'.format(args.endpoint),
+                              data={"x": data[0]})
+        end = time.time()
+        return [[end - start]]
-server = Server()
-server.set_op_sequence(op_seq_maker.get_op_sequence())
-server.set_num_threads(12)
-server.load_model_config(sys.argv[1])
-port = int(sys.argv[2])
-server.prepare_server(workdir="work_dir1", port=port, device="gpu")
-server.run_server()
+multi_thread_runner = MultiThreadRunner()
+result = multi_thread_runner.run(single_func, args.thread, {})
+print(result)
```
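The benchmark relies on a simple contract: `single_func(idx, resource)` runs one thread's share of the load and returns a list of metric lists, which `MultiThreadRunner` collects across worker processes. A minimal sketch of that contract:

```python
import time

def single_func(idx, resource):
    """One benchmark worker; idx is the thread index."""
    start = time.time()
    # ... issue this worker's requests here ...
    end = time.time()
    return [[end - start]]  # one inner list per reported metric

# MultiThreadRunner().run(single_func, thread_num, {}) gathers the
# per-thread return values into one result list.
```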
```diff
@@ -19,7 +19,7 @@ cat test.data | python test_client_batch.py inference.conf 4 > result
 Device: Intel(R) Xeon(R) Gold 6271 CPU @ 2.60GHz * 48
-Model: IMDB-CNN
+Model: [CNN](https://github.com/PaddlePaddle/Serving/blob/develop/python/examples/imdb/nets.py)
 server thread num : 16
...
```
```diff
@@ -13,55 +13,45 @@
 # limitations under the License.
 import sys
+import time
+import requests
+from imdb_reader import IMDBDataset
 from paddle_serving_client import Client
-from paddle_serving_client.metric import auc
 from paddle_serving_client.utils import MultiThreadRunner
-import time
+from paddle_serving_client.utils import benchmark_args
+args = benchmark_args()
-def predict(thr_id, resource):
-    client = Client()
-    client.load_client_config(resource["conf_file"])
-    client.connect(resource["server_endpoint"])
-    thread_num = resource["thread_num"]
-    file_list = resource["filelist"]
-    line_id = 0
-    prob = []
-    label_list = []
-    dataset = []
-    for fn in file_list:
-        fin = open(fn)
-        for line in fin:
-            if line_id % thread_num == thr_id - 1:
-                group = line.strip().split()
-                words = [int(x) for x in group[1:int(group[0])]]
-                label = [int(group[-1])]
-                feed = {"words": words, "label": label}
-                dataset.append(feed)
-            line_id += 1
-        fin.close()
-    start = time.time()
-    fetch = ["acc", "cost", "prediction"]
-    for inst in dataset:
-        fetch_map = client.predict(feed=inst, fetch=fetch)
-        prob.append(fetch_map["prediction"][1])
-        label_list.append(label[0])
-    end = time.time()
-    client.release()
-    return [prob, label_list, [end - start]]
+def single_func(idx, resource):
+    imdb_dataset = IMDBDataset()
+    imdb_dataset.load_resource(args.vocab)
+    filelist_fn = args.filelist
+    filelist = []
+    start = time.time()
+    with open(filelist_fn) as fin:
+        for line in fin:
+            filelist.append(line.strip())
+    filelist = filelist[idx::args.thread]
+    if args.request == "rpc":
+        client = Client()
+        client.load_client_config(args.model)
+        client.connect([args.endpoint])
+        for fn in filelist:
+            fin = open(fn)
+            for line in fin:
+                word_ids, label = imdb_dataset.get_words_and_label(line)
+                fetch_map = client.predict(feed={"words": word_ids},
+                                           fetch=["prediction"])
+    elif args.request == "http":
+        for fn in filelist:
+            fin = open(fn)
+            for line in fin:
+                word_ids, label = imdb_dataset.get_words_and_label(line)
+                r = requests.post("http://{}/imdb/prediction".format(args.endpoint),
+                                  data={"words": word_ids})
+    end = time.time()
+    return [[end - start]]
-if __name__ == '__main__':
-    conf_file = sys.argv[1]
-    data_file = sys.argv[2]
-    resource = {}
-    resource["conf_file"] = conf_file
-    resource["server_endpoint"] = ["127.0.0.1:9293"]
-    resource["filelist"] = [data_file]
-    resource["thread_num"] = int(sys.argv[3])
-    thread_runner = MultiThreadRunner()
-    result = thread_runner.run(predict, int(sys.argv[3]), resource)
-    print("total time {} s".format(sum(result[-1]) / len(result[-1])))
+multi_thread_runner = MultiThreadRunner()
+result = multi_thread_runner.run(single_func, args.thread, {})
+print(result)
```
```diff
 wget --no-check-certificate https://fleet.bj.bcebos.com/text_classification_data.tar.gz
+wget --no-check-certificate https://paddle-serving.bj.bcebos.com/imdb-demo/imdb_model.tar.gz
 tar -zxvf text_classification_data.tar.gz
-#wget --no-check-certificate https://paddle-serving.bj.bcebos.com/imdb-demo%2Fimdb.tar.gz
-#tar -xzf imdb-demo%2Fimdb.tar.gz
+tar -zxvf imdb_model.tar.gz
```
```diff
@@ -30,6 +30,14 @@ class IMDBDataset(dg.MultiSlotDataGenerator):
         self._pattern = re.compile(r'(;|,|\.|\?|!|\s|\(|\))')
         self.return_value = ("words", [1, 2, 3, 4, 5, 6]), ("label", [0])
 
+    def get_words_only(self, line):
+        sent = line.lower().replace("<br />", " ").strip()
+        words = [x for x in self._pattern.split(sent) if x and x != " "]
+        feas = [
+            self._vocab[x] if x in self._vocab else self._unk_id for x in words
+        ]
+        return feas
+
     def get_words_and_label(self, line):
         send = '|'.join(line.split('|')[:-1]).lower().replace("<br />",
                                                               " ").strip()
...
```
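A hedged usage sketch of the new `get_words_only` helper, which converts a raw sentence into vocabulary ids without expecting the label suffix that `get_words_and_label` parses (the vocab path is a placeholder; `load_resource` is the same call used by `test_client.py` below):

```python
from imdb_reader import IMDBDataset

imdb_dataset = IMDBDataset()
imdb_dataset.load_resource("imdb.vocab")  # placeholder vocab path

# Plain text in, word ids out -- no trailing label required:
word_ids = imdb_dataset.get_words_only("this movie is great")
```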
```diff
-wget https://paddle-serving.bj.bcebos.com/imdb-demo%2Fimdb_service.tar.gz
+wget https://paddle-serving.bj.bcebos.com/imdb-demo/imdb_service.tar.gz
 tar -xzf imdb_service.tar.gz
 wget --no-check-certificate https://fleet.bj.bcebos.com/text_classification_data.tar.gz
 tar -zxvf text_classification_data.tar.gz
...
```
```diff
@@ -49,8 +49,9 @@ if __name__ == "__main__":
     dataset.set_batch_size(128)
     dataset.set_filelist(filelist)
     dataset.set_thread(10)
-    from nets import bow_net
-    avg_cost, acc, prediction = bow_net(data, label, dict_dim)
+    from nets import lstm_net
+    model_name = "imdb_lstm"
+    avg_cost, acc, prediction = lstm_net(data, label, dict_dim)
     optimizer = fluid.optimizer.SGD(learning_rate=0.01)
     optimizer.minimize(avg_cost)
@@ -65,6 +66,7 @@ if __name__ == "__main__":
         program=fluid.default_main_program(), dataset=dataset, debug=False)
     logger.info("TRAIN --> pass: {}".format(i))
     if i == 5:
-        serving_io.save_model("imdb_model", "imdb_client_conf",
+        serving_io.save_model("{}_model".format(model_name),
+                              "{}_client_conf".format(model_name),
                               {"words": data}, {"prediction": prediction},
                               fluid.default_main_program())
```
```diff
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from paddle_serving_client import Client
+from imdb_reader import IMDBDataset
 import sys
 
 client = Client()
 client.load_client_config(sys.argv[1])
 client.connect(["127.0.0.1:9393"])
 
+# you can define any english sentence or dataset here
+# This example reuses imdb reader in training, you
+# can define your own data preprocessing easily.
+imdb_dataset = IMDBDataset()
+imdb_dataset.load_resource(sys.argv[2])
+
 for line in sys.stdin:
-    group = line.strip().split()
-    words = [int(x) for x in group[1:int(group[0]) + 1]]
-    label = [int(group[-1])]
-    feed = {"words": words, "label": label}
+    word_ids, label = imdb_dataset.get_words_and_label(line)
+    feed = {"words": word_ids, "label": label}
     fetch = ["acc", "cost", "prediction"]
     fetch_map = client.predict(feed=feed, fetch=fetch)
     print("{} {}".format(fetch_map["prediction"][1], label[0]))
...
```
```python
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_serving_client import Client
import sys
import subprocess
from multiprocessing import Pool
import time


def predict(p_id, p_size, data_list):
    client = Client()
    client.load_client_config(conf_file)
    client.connect(["127.0.0.1:8010"])
    result = []
    for line in data_list:
        group = line.strip().split()
        words = [int(x) for x in group[1:int(group[0])]]
        label = [int(group[-1])]
        feed = {"words": words, "label": label}
        fetch = ["acc", "cost", "prediction"]
        fetch_map = client.predict(feed=feed, fetch=fetch)
        #print("{} {}".format(fetch_map["prediction"][1], label[0]))
        result.append([fetch_map["prediction"][1], label[0]])
    return result


def predict_multi_thread(p_num):
    data_list = []
    with open(data_file) as f:
        for line in f.readlines():
            data_list.append(line)
    start = time.time()
    p = Pool(p_num)
    # integer division so the slice bounds below stay ints on Python 3
    p_size = len(data_list) // p_num
    result_list = []
    for i in range(p_num):
        result_list.append(
            p.apply_async(predict,
                          [i, p_size, data_list[i * p_size:(i + 1) * p_size]]))
    p.close()
    p.join()
    for i in range(p_num):
        result = result_list[i].get()
        for j in result:
            print("{} {}".format(j[0], j[1]))
    cost = time.time() - start
    print("{} threads cost {}".format(p_num, cost))


if __name__ == '__main__':
    conf_file = sys.argv[1]
    data_file = sys.argv[2]
    p_num = int(sys.argv[3])
    predict_multi_thread(p_num)
```
```python
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
from paddle_serving_server import OpMaker
from paddle_serving_server import OpSeqMaker
from paddle_serving_server import Server

op_maker = OpMaker()
read_op = op_maker.create('general_reader')
general_infer_op = op_maker.create('general_infer')
general_response_op = op_maker.create('general_response')

op_seq_maker = OpSeqMaker()
op_seq_maker.add_op(read_op)
op_seq_maker.add_op(general_infer_op)
op_seq_maker.add_op(general_response_op)

server = Server()
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(4)

server.load_model_config(sys.argv[1])
port = int(sys.argv[2])
server.prepare_server(workdir="work_dir1", port=port, device="cpu")
server.run_server()
```
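Presumably the server script is launched with the model config directory and port as positional arguments, matching the `test_client.py` above, which connects to port 9393:

```python
# Hypothetical invocation (argv[1] = model config directory, argv[2] = port;
# the directory name is assumed, the port matches the client above):
#     python test_server.py imdb_lstm_model 9393
```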
```diff
@@ -11,7 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#!flask/bin/python
 from paddle_serving_server.web_service import WebService
 from imdb_reader import IMDBDataset
 import sys
@@ -27,7 +26,7 @@ class IMDBService(WebService):
         if "words" not in feed:
             exit(-1)
         res_feed = {}
-        res_feed["words"] = self.dataset.get_words_and_label(feed["words"])[0]
+        res_feed["words"] = self.dataset.get_words_only(feed["words"])[0]
         return res_feed, fetch

 imdb_service = IMDBService(name="imdb")
...
```
```diff
@@ -5,8 +5,9 @@ import sys
 profile_file = sys.argv[1]
 
-def prase(line, counter):
-    event_list = line.split(" ")
+def prase(pid_str, time_str, counter):
+    pid = pid_str.split(":")[1]
+    event_list = time_str.split(" ")
     trace_list = []
     for event in event_list:
         name, ts = event.split(":")
@@ -19,7 +20,7 @@ def prase(line, counter):
             event_dict = {}
             event_dict["name"] = name
             event_dict["tid"] = 0
-            event_dict["pid"] = 0
+            event_dict["pid"] = pid
             event_dict["ts"] = ts
             event_dict["ph"] = ph
@@ -36,7 +37,7 @@ if __name__ == "__main__":
     for line in f.readlines():
         line = line.strip().split("\t")
         if line[0] == "PROFILE":
-            trace_list = prase(line[1], counter)
+            trace_list = prase(line[1], line[2], counter)
             counter += 1
     for trace in trace_list:
         all_list.append(trace)
...
```
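The `name`/`tid`/`pid`/`ts`/`ph` fields match the Chrome trace-event JSON format (an assumption based on those field names, not stated in the diff), so with the pid now carried through, events from different client processes land on separate rows when the collected list is dumped and opened in chrome://tracing. A sketch:

```python
import json

# Assumption: all_list holds the event dicts built by prase(); wrapping
# them as {"traceEvents": [...]} makes the file loadable in chrome://tracing.
with open("timeline.json", "w") as f:
    json.dump({"traceEvents": all_list}, f)
```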
```diff
@@ -78,6 +78,7 @@ class Client(object):
         self.feed_types_ = {}
         self.feed_names_to_idx_ = {}
         self.rpath()
+        self.pid = os.getpid()
 
     def rpath(self):
         lib_path = os.path.dirname(paddle_serving_client.__file__)
@@ -160,6 +161,7 @@ class Client(object):
         int_feed_names = []
         float_feed_names = []
         fetch_names = []
+
         for key in feed:
             self.shape_check(feed, key)
             if key not in self.feed_names_:
@@ -177,7 +179,7 @@ class Client(object):
         ret = self.client_handle_.predict(float_slot, float_feed_names,
                                           int_slot, int_feed_names, fetch_names,
-                                          self.result_handle_)
+                                          self.result_handle_, self.pid)
 
         result_map = {}
         for i, name in enumerate(fetch_names):
...
```
```diff
@@ -11,16 +11,28 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import os
 import sys
 import subprocess
+import argparse
 from multiprocessing import Pool
 
+
+def benchmark_args():
+    parser = argparse.ArgumentParser("benchmark")
+    parser.add_argument("--thread", type=int, default=10, help="concurrency")
+    parser.add_argument("--model", type=str, default="", help="model for evaluation")
+    parser.add_argument("--endpoint", type=str, default="127.0.0.1:9292",
+                        help="endpoint of server")
+    parser.add_argument("--request", type=str, default="rpc", help="mode of service")
+    return parser.parse_args()
+
 
 class MultiThreadRunner(object):
     def __init__(self):
         pass
 
     def run(self, thread_func, thread_num, global_resource):
+        os.environ["http_proxy"] = ""
+        os.environ["https_proxy"] = ""
         p = Pool(thread_num)
         result_list = []
         for i in range(thread_num):
...
```
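`benchmark_args()` centralizes the flags that every benchmark script shares, and `run()` now clears `http_proxy`/`https_proxy` so the worker processes reach the endpoint directly rather than through a system proxy. A sketch of consuming the shared flags (the values noted are the defaults defined above):

```python
from paddle_serving_client.utils import benchmark_args

args = benchmark_args()
print(args.thread)    # 10 unless --thread is given
print(args.model)     # client config path, empty by default
print(args.endpoint)  # "127.0.0.1:9292" by default
print(args.request)   # "rpc" or "http"
```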