未验证 提交 c6e63ce7 编写于 作者: M MRXLT 提交者: GitHub

Merge branch 'develop' into develop

......@@ -31,8 +31,9 @@ DEFINE_bool(print_output, false, "print output flag");
DEFINE_int32(thread_num, 1, "thread num");
std::atomic<int> g_concurrency(0);
std::vector<uint64_t> time_list;
std::vector<std::vector<uint64_t>> time_list;
std::vector<uint64_t> request_list;
int turns = 1000000 / FLAGS_batch;
namespace {
inline uint64_t time_diff(const struct timeval& start_time,
......@@ -97,7 +98,7 @@ int run(int argc, char** argv, int thread_id) {
while (g_concurrency.load() >= FLAGS_thread_num) {
}
g_concurrency++;
time_list[thread_id].resize(turns);
while (index < file_size) {
// uint64_t key = strtoul(buffer, NULL, 10);
......@@ -121,47 +122,12 @@ int run(int argc, char** argv, int thread_id) {
}
++seek_counter;
uint64_t seek_cost = time_diff(seek_start, seek_end);
seek_cost_total += seek_cost;
if (seek_cost > seek_cost_max) {
seek_cost_max = seek_cost;
}
if (seek_cost < seek_cost_min) {
seek_cost_min = seek_cost;
}
time_list[thread_id][request - 1] = seek_cost;
keys.clear();
values.clear();
}
}
/*
if (keys.size() > 0) {
int ret = 0;
values.resize(keys.size());
TIME_FLAG(seek_start);
ret = cube->seek(FLAGS_dict, keys, &values);
TIME_FLAG(seek_end);
if (ret != 0) {
LOG(WARNING) << "cube seek failed";
} else if (FLAGS_print_output) {
for (size_t i = 0; i < keys.size(); ++i) {
fprintf(stdout,
"key:%lu value:%s\n",
keys[i],
string_to_hex(values[i].buff).c_str());
}
}
++seek_counter;
uint64_t seek_cost = time_diff(seek_start, seek_end);
seek_cost_total += seek_cost;
if (seek_cost > seek_cost_max) {
seek_cost_max = seek_cost;
}
if (seek_cost < seek_cost_min) {
seek_cost_min = seek_cost;
}
}
*/
g_concurrency--;
// fclose(key_file);
......@@ -171,12 +137,6 @@ int run(int argc, char** argv, int thread_id) {
LOG(WARNING) << "destroy cube api failed err=" << ret;
}
uint64_t seek_cost_avg = seek_cost_total / seek_counter;
LOG(INFO) << "seek cost avg = " << seek_cost_avg;
LOG(INFO) << "seek cost max = " << seek_cost_max;
LOG(INFO) << "seek cost min = " << seek_cost_min;
time_list[thread_id] = seek_cost_avg;
request_list[thread_id] = request;
return 0;
......@@ -188,6 +148,7 @@ int run_m(int argc, char** argv) {
request_list.resize(thread_num);
time_list.resize(thread_num);
std::vector<std::thread*> thread_pool;
TIME_FLAG(main_start);
for (int i = 0; i < thread_num; i++) {
thread_pool.push_back(new std::thread(run, argc, argv, i));
}
......@@ -195,27 +156,33 @@ int run_m(int argc, char** argv) {
thread_pool[i]->join();
delete thread_pool[i];
}
TIME_FLAG(main_end);
uint64_t sum_time = 0;
uint64_t max_time = 0;
uint64_t min_time = 1000000;
uint64_t request_num = 0;
for (int i = 0; i < thread_num; i++) {
sum_time += time_list[i];
if (time_list[i] > max_time) {
max_time = time_list[i];
for (int j = 0; j < request_list[i]; j++) {
sum_time += time_list[i][j];
if (time_list[i][j] > max_time) {
max_time = time_list[i][j];
}
if (time_list[i][j] < min_time) {
min_time = time_list[i][j];
}
if (time_list[i] < min_time) {
min_time = time_list[i];
}
request_num += request_list[i];
}
uint64_t mean_time = sum_time / thread_num;
LOG(INFO) << thread_num << " thread seek cost"
<< " avg = " << std::to_string(mean_time)
<< " max = " << std::to_string(max_time)
<< " min = " << std::to_string(min_time);
LOG(INFO) << " total_request = " << std::to_string(request_num) << " speed = "
<< std::to_string(1000000 * thread_num / mean_time) // mean_time us
uint64_t mean_time = sum_time / (thread_num * turns);
uint64_t main_time = time_diff(main_start, main_end);
LOG(INFO) << "\n"
<< thread_num << " thread seek cost"
<< "\navg = " << std::to_string(mean_time)
<< "\nmax = " << std::to_string(max_time)
<< "\nmin = " << std::to_string(min_time);
LOG(INFO) << "\ntotal_request = " << std::to_string(request_num)
<< "\nspeed = " << std::to_string(request_num * 1000000 /
main_time) // mean_time us
<< " query per second";
return 0;
}
......
......@@ -90,6 +90,9 @@ int GeneralDistKVInferOp::inference() {
keys.begin() + key_idx);
key_idx += dataptr_size_pairs[i].second;
}
Timer timeline;
int64_t cube_start = timeline.TimeStampUS();
timeline.Start();
rec::mcube::CubeAPI *cube = rec::mcube::CubeAPI::instance();
std::vector<std::string> table_names = cube->get_table_names();
if (table_names.size() == 0) {
......@@ -97,7 +100,7 @@ int GeneralDistKVInferOp::inference() {
return -1;
}
int ret = cube->seek(table_names[0], keys, &values);
int64_t cube_end = timeline.TimeStampUS();
if (values.size() != keys.size() || values[0].buff.size() == 0) {
LOG(ERROR) << "cube value return null";
}
......@@ -153,9 +156,7 @@ int GeneralDistKVInferOp::inference() {
VLOG(2) << "infer batch size: " << batch_size;
Timer timeline;
int64_t start = timeline.TimeStampUS();
timeline.Start();
if (InferManager::instance().infer(
engine_name().c_str(), &infer_in, out, batch_size)) {
......@@ -165,6 +166,8 @@ int GeneralDistKVInferOp::inference() {
int64_t end = timeline.TimeStampUS();
CopyBlobInfo(input_blob, output_blob);
AddBlobInfo(output_blob, cube_start);
AddBlobInfo(output_blob, cube_end);
AddBlobInfo(output_blob, start);
AddBlobInfo(output_blob, end);
return 0;
......
......@@ -75,14 +75,17 @@ if __name__ == '__main__':
multi_thread_runner = MultiThreadRunner()
endpoint_list = ["127.0.0.1:9292"]
#result = single_func(0, {"endpoint": endpoint_list})
start = time.time()
result = multi_thread_runner.run(single_func, args.thread,
{"endpoint": endpoint_list})
print(result)
end = time.time()
total_cost = end - start
avg_cost = 0
qps = 0
for i in range(args.thread):
avg_cost += result[0][i * 2 + 0]
qps += result[0][i * 2 + 1]
avg_cost = avg_cost / args.thread
print("total cost: {}".format(total_cost))
print("average total cost {} s.".format(avg_cost))
print("qps {} ins/s".format(qps))
rm profile_log
export FLAGS_profile_client=1
export FLAGS_profile_server=1
for thread_num in 1 2 4 8 16
wget https://paddle-serving.bj.bcebos.com/unittest/ctr_cube_unittest.tar.gz --no-check-certificate
tar xf ctr_cube_unittest.tar.gz
mv models/ctr_client_conf ./
mv models/ctr_serving_model_kv ./
mv models/data ./cube/
wget https://paddle-serving.bj.bcebos.com/others/cube_app.tar.gz --no-check-certificate
tar xf cube_app.tar.gz
mv cube_app/cube* ./cube/
sh cube_prepare.sh &
python test_server.py ctr_serving_model_kv > serving_log 2>&1 &
for thread_num in 1 4 16
do
for batch_size in 1 4 16 64 256
for batch_size in 1 4 16 64
do
$PYTHONROOT/bin/python benchmark.py --thread $thread_num --batch_size $batch_size --model serving_client_conf/serving_client_conf.prototxt --request rpc > profile 2>&1
echo "batch size : $batch_size"
......@@ -11,6 +25,8 @@ do
echo "========================================"
echo "batch size : $batch_size" >> profile_log
$PYTHONROOT/bin/python ../util/show_profile.py profile $thread_num >> profile_log
tail -n 2 profile >> profile_log
tail -n 3 profile >> profile_log
done
done
ps -ef|grep 'serving'|grep -v grep|cut -c 9-15 | xargs kill -9
rm profile_log
wget https://paddle-serving.bj.bcebos.com/unittest/ctr_cube_unittest.tar.gz --no-check-certificate
tar xf ctr_cube_unittest.tar.gz
mv models/ctr_client_conf ./
mv models/ctr_serving_model_kv ./
mv models/data ./cube/
wget https://paddle-serving.bj.bcebos.com/others/cube_app.tar.gz --no-check-certificate
tar xf cube_app.tar.gz
mv cube_app/cube* ./cube/
sh cube_prepare.sh &
cp ../../../build_server/core/cube/cube-api/cube-cli .
python gen_key.py
for thread_num in 1 4 16 32
do
for batch_size in 1000
do
./cube-cli -config_file ./cube/conf/cube.conf -keys key -dict test_dict -thread_num $thread_num --batch $batch_size > profile 2>&1
echo "batch size : $batch_size"
echo "thread num : $thread_num"
echo "========================================"
echo "batch size : $batch_size" >> profile_log
echo "thread num : $thread_num" >> profile_log
tail -n 7 profile | head -n 4 >> profile_log
tail -n 2 profile >> profile_log
done
done
ps -ef|grep 'cube'|grep -v grep|cut -c 9-15 | xargs kill -9
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import random
with open("key", "w") as f:
for i in range(1000000):
f.write("{}\n".format(random.randint(0, 999999)))
......@@ -33,5 +33,9 @@ server = Server()
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(4)
server.load_model_config(sys.argv[1])
server.prepare_server(workdir="work_dir1", port=9292, device="cpu")
server.prepare_server(
workdir="work_dir1",
port=9292,
device="cpu",
cube_conf="./cube/conf/cube.conf")
server.run_server()
......@@ -33,5 +33,9 @@ server = Server()
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(4)
server.load_model_config(sys.argv[1])
server.prepare_server(workdir="work_dir1", port=9292, device="cpu")
server.prepare_server(
workdir="work_dir1",
port=9292,
device="cpu",
cube_conf="./cube/conf/cube.conf")
server.run_server()
......@@ -33,5 +33,9 @@ server = Server()
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(4)
server.load_model_config(sys.argv[1], sys.argv[2])
server.prepare_server(workdir="work_dir1", port=9292, device="cpu")
server.prepare_server(
workdir="work_dir1",
port=9292,
device="cpu",
cube_conf="./cube/conf/cube.conf")
server.run_server()
......@@ -33,5 +33,9 @@ server = Server()
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(4)
server.load_model_config(sys.argv[1], sys.argv[2])
server.prepare_server(workdir="work_dir1", port=9292, device="cpu")
server.prepare_server(
workdir="work_dir1",
port=9292,
device="cpu",
cube_conf="./cube/conf/cube.conf")
server.run_server()
......@@ -24,7 +24,7 @@ import json
import base64
from paddle_serving_client import Client
from paddle_serving_client.utils import MultiThreadRunner
from paddle_serving_client.utils import benchmark_args
from paddle_serving_client.utils import benchmark_args, show_latency
from paddle_serving_app.reader import Sequential, File2Image, Resize
from paddle_serving_app.reader import CenterCrop, RGB2BGR, Transpose, Div, Normalize
......@@ -38,7 +38,11 @@ seq_preprocess = Sequential([
def single_func(idx, resource):
file_list = []
turns = 10
turns = resource["turns"]
latency_flags = False
if os.getenv("FLAGS_serving_latency"):
latency_flags = True
latency_list = []
for file_name in os.listdir("./image_data/n01440764"):
file_list.append(file_name)
img_list = []
......@@ -56,6 +60,7 @@ def single_func(idx, resource):
start = time.time()
for i in range(turns):
if args.batch_size >= 1:
l_start = time.time()
feed_batch = []
i_start = time.time()
for bi in range(args.batch_size):
......@@ -69,6 +74,9 @@ def single_func(idx, resource):
int(round(i_end * 1000000))))
result = client.predict(feed=feed_batch, fetch=fetch)
l_end = time.time()
if latency_flags:
latency_list.append(l_end * 1000 - l_start * 1000)
else:
print("unsupport batch size {}".format(args.batch_size))
......@@ -88,6 +96,8 @@ def single_func(idx, resource):
r = requests.post(
server, data=req, headers={"Content-Type": "application/json"})
end = time.time()
if latency_flags:
return [[end - start], latency_list]
return [[end - start]]
......@@ -96,11 +106,21 @@ if __name__ == '__main__':
endpoint_list = [
"127.0.0.1:9292", "127.0.0.1:9293", "127.0.0.1:9294", "127.0.0.1:9295"
]
result = multi_thread_runner.run(single_func, args.thread,
{"endpoint": endpoint_list})
turns = 100
start = time.time()
result = multi_thread_runner.run(
single_func, args.thread, {"endpoint": endpoint_list,
"turns": turns})
#result = single_func(0, {"endpoint": endpoint_list})
end = time.time()
total_cost = end - start
avg_cost = 0
for i in range(args.thread):
avg_cost += result[0][i]
avg_cost = avg_cost / args.thread
print("average total cost {} s.".format(avg_cost))
print("total cost: {}s".format(end - start))
print("each thread cost: {}s.".format(avg_cost))
print("qps: {}samples/s".format(args.batch_size * args.thread * turns /
total_cost))
if os.getenv("FLAGS_serving_latency"):
show_latency(result[1])
......@@ -2,7 +2,7 @@ rm profile_log*
export CUDA_VISIBLE_DEVICES=0,1,2,3
export FLAGS_profile_server=1
export FLAGS_profile_client=1
python -m paddle_serving_server_gpu.serve --model $1 --port 9292 --thread 4 --gpu_ids 0,1,2,3 2> elog > stdlog &
python -m paddle_serving_server_gpu.serve --model $1 --port 9292 --thread 4 --gpu_ids 0,1,2,3 --mem_optim --ir_optim 2> elog > stdlog &
sleep 5
gpu_id=0
......
......@@ -12,6 +12,7 @@ else
fi
sleep 5
#warm up
$PYTHONROOT/bin/python3 benchmark.py --thread 4 --batch_size 1 --model $2/serving_client_conf.prototxt --request rpc > profile 2>&1
echo -e "import psutil\ncpu_utilization=psutil.cpu_percent(1,False)\nprint('CPU_UTILIZATION:', cpu_utilization)\n" > cpu_utilization.py
......
......@@ -25,6 +25,7 @@ from contextlib import closing
import collections
import fcntl
import shutil
import numpy as np
import grpc
from .proto import multi_lang_general_model_service_pb2
......@@ -230,7 +231,7 @@ class Server(object):
infer_service.workflows.extend(["workflow1"])
self.infer_service_conf.services.extend([infer_service])
def _prepare_resource(self, workdir):
def _prepare_resource(self, workdir, cube_conf):
self.workdir = workdir
if self.resource_conf == None:
with open("{}/{}".format(workdir, self.general_model_config_fn),
......@@ -242,6 +243,11 @@ class Server(object):
if "dist_kv" in node.name:
self.resource_conf.cube_config_path = workdir
self.resource_conf.cube_config_file = self.cube_config_fn
if cube_conf == None:
raise ValueError(
"Please set the path of cube.conf while use dist_kv op."
)
shutil.copy(cube_conf, workdir)
if "quant" in node.name:
self.resource_conf.cube_quant_bits = 8
self.resource_conf.model_toolkit_path = workdir
......@@ -366,7 +372,11 @@ class Server(object):
os.chdir(self.cur_path)
self.bin_path = self.server_path + "/serving"
def prepare_server(self, workdir=None, port=9292, device="cpu"):
def prepare_server(self,
workdir=None,
port=9292,
device="cpu",
cube_conf=None):
if workdir == None:
workdir = "./tmp"
os.system("mkdir {}".format(workdir))
......@@ -377,7 +387,7 @@ class Server(object):
if not self.port_is_available(port):
raise SystemExit("Port {} is already used".format(port))
self.set_port(port)
self._prepare_resource(workdir)
self._prepare_resource(workdir, cube_conf)
self._prepare_engine(self.model_config_paths, device)
self._prepare_infer_service(port)
self.workdir = workdir
......@@ -645,7 +655,11 @@ class MultiLangServer(object):
server_config_paths)
self.bclient_config_path_ = client_config_path
def prepare_server(self, workdir=None, port=9292, device="cpu"):
def prepare_server(self,
workdir=None,
port=9292,
device="cpu",
cube_conf=None):
if not self._port_is_available(port):
raise SystemExit("Prot {} is already used".format(port))
default_port = 12000
......@@ -656,7 +670,10 @@ class MultiLangServer(object):
self.port_list_.append(default_port + i)
break
self.bserver_.prepare_server(
workdir=workdir, port=self.port_list_[0], device=device)
workdir=workdir,
port=self.port_list_[0],
device=device,
cube_conf=cube_conf)
self.set_port(port)
def _launch_brpc_service(self, bserver):
......
......@@ -26,7 +26,7 @@ from contextlib import closing
import argparse
import collections
import fcntl
import shutil
import numpy as np
import grpc
from .proto import multi_lang_general_model_service_pb2
......@@ -285,7 +285,7 @@ class Server(object):
infer_service.workflows.extend(["workflow1"])
self.infer_service_conf.services.extend([infer_service])
def _prepare_resource(self, workdir):
def _prepare_resource(self, workdir, cube_conf):
self.workdir = workdir
if self.resource_conf == None:
with open("{}/{}".format(workdir, self.general_model_config_fn),
......@@ -297,6 +297,11 @@ class Server(object):
if "dist_kv" in node.name:
self.resource_conf.cube_config_path = workdir
self.resource_conf.cube_config_file = self.cube_config_fn
if cube_conf == None:
raise ValueError(
"Please set the path of cube.conf while use dist_kv op."
)
shutil.copy(cube_conf, workdir)
self.resource_conf.model_toolkit_path = workdir
self.resource_conf.model_toolkit_file = self.model_toolkit_fn
self.resource_conf.general_model_path = workdir
......@@ -406,7 +411,11 @@ class Server(object):
os.chdir(self.cur_path)
self.bin_path = self.server_path + "/serving"
def prepare_server(self, workdir=None, port=9292, device="cpu"):
def prepare_server(self,
workdir=None,
port=9292,
device="cpu",
cube_conf=None):
if workdir == None:
workdir = "./tmp"
os.system("mkdir {}".format(workdir))
......@@ -418,7 +427,7 @@ class Server(object):
raise SystemExit("Port {} is already used".format(port))
self.set_port(port)
self._prepare_resource(workdir)
self._prepare_resource(workdir, cube_conf)
self._prepare_engine(self.model_config_paths, device)
self._prepare_infer_service(port)
self.workdir = workdir
......@@ -690,7 +699,11 @@ class MultiLangServer(object):
server_config_paths)
self.bclient_config_path_ = client_config_path
def prepare_server(self, workdir=None, port=9292, device="cpu"):
def prepare_server(self,
workdir=None,
port=9292,
device="cpu",
cube_conf=None):
if not self._port_is_available(port):
raise SystemExit("Prot {} is already used".format(port))
default_port = 12000
......@@ -701,7 +714,10 @@ class MultiLangServer(object):
self.port_list_.append(default_port + i)
break
self.bserver_.prepare_server(
workdir=workdir, port=self.port_list_[0], device=device)
workdir=workdir,
port=self.port_list_[0],
device=device,
cube_conf=cube_conf)
self.set_port(port)
def _launch_brpc_service(self, bserver):
......
......@@ -229,10 +229,7 @@ function python_run_criteo_ctr_with_cube() {
check_cmd "mv models/data ./cube/"
check_cmd "mv models/ut_data ./"
cp ../../../build-server-$TYPE/output/bin/cube* ./cube/
mkdir -p $PYTHONROOT/lib/python2.7/site-packages/paddle_serving_server/serving-cpu-avx-openblas-0.1.3/
yes | cp ../../../build-server-$TYPE/output/demo/serving/bin/serving $PYTHONROOT/lib/python2.7/site-packages/paddle_serving_server/serving-cpu-avx-openblas-0.1.3/
sh cube_prepare.sh &
check_cmd "mkdir work_dir1 && cp cube/conf/cube.conf ./work_dir1/"
python test_server.py ctr_serving_model_kv &
sleep 5
check_cmd "python test_client.py ctr_client_conf/serving_client_conf.prototxt ./ut_data >score"
......@@ -257,10 +254,7 @@ function python_run_criteo_ctr_with_cube() {
check_cmd "mv models/data ./cube/"
check_cmd "mv models/ut_data ./"
cp ../../../build-server-$TYPE/output/bin/cube* ./cube/
mkdir -p $PYTHONROOT/lib/python2.7/site-packages/paddle_serving_server_gpu/serving-gpu-0.1.3/
yes | cp ../../../build-server-$TYPE/output/demo/serving/bin/serving $PYTHONROOT/lib/python2.7/site-packages/paddle_serving_server_gpu/serving-gpu-0.1.3/
sh cube_prepare.sh &
check_cmd "mkdir work_dir1 && cp cube/conf/cube.conf ./work_dir1/"
python test_server_gpu.py ctr_serving_model_kv &
sleep 5
# for warm up
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册