提交 886fe9e7 编写于 作者: B barriery 提交者: GitHub

Merge branch 'develop' into pipeline-update

...@@ -31,8 +31,9 @@ DEFINE_bool(print_output, false, "print output flag"); ...@@ -31,8 +31,9 @@ DEFINE_bool(print_output, false, "print output flag");
DEFINE_int32(thread_num, 1, "thread num"); DEFINE_int32(thread_num, 1, "thread num");
std::atomic<int> g_concurrency(0); std::atomic<int> g_concurrency(0);
std::vector<uint64_t> time_list; std::vector<std::vector<uint64_t>> time_list;
std::vector<uint64_t> request_list; std::vector<uint64_t> request_list;
int turns = 1000000 / FLAGS_batch;
namespace { namespace {
inline uint64_t time_diff(const struct timeval& start_time, inline uint64_t time_diff(const struct timeval& start_time,
...@@ -97,7 +98,7 @@ int run(int argc, char** argv, int thread_id) { ...@@ -97,7 +98,7 @@ int run(int argc, char** argv, int thread_id) {
while (g_concurrency.load() >= FLAGS_thread_num) { while (g_concurrency.load() >= FLAGS_thread_num) {
} }
g_concurrency++; g_concurrency++;
time_list[thread_id].resize(turns);
while (index < file_size) { while (index < file_size) {
// uint64_t key = strtoul(buffer, NULL, 10); // uint64_t key = strtoul(buffer, NULL, 10);
...@@ -121,47 +122,12 @@ int run(int argc, char** argv, int thread_id) { ...@@ -121,47 +122,12 @@ int run(int argc, char** argv, int thread_id) {
} }
++seek_counter; ++seek_counter;
uint64_t seek_cost = time_diff(seek_start, seek_end); uint64_t seek_cost = time_diff(seek_start, seek_end);
seek_cost_total += seek_cost; time_list[thread_id][request - 1] = seek_cost;
if (seek_cost > seek_cost_max) {
seek_cost_max = seek_cost;
}
if (seek_cost < seek_cost_min) {
seek_cost_min = seek_cost;
}
keys.clear(); keys.clear();
values.clear(); values.clear();
} }
} }
/*
if (keys.size() > 0) {
int ret = 0;
values.resize(keys.size());
TIME_FLAG(seek_start);
ret = cube->seek(FLAGS_dict, keys, &values);
TIME_FLAG(seek_end);
if (ret != 0) {
LOG(WARNING) << "cube seek failed";
} else if (FLAGS_print_output) {
for (size_t i = 0; i < keys.size(); ++i) {
fprintf(stdout,
"key:%lu value:%s\n",
keys[i],
string_to_hex(values[i].buff).c_str());
}
}
++seek_counter;
uint64_t seek_cost = time_diff(seek_start, seek_end);
seek_cost_total += seek_cost;
if (seek_cost > seek_cost_max) {
seek_cost_max = seek_cost;
}
if (seek_cost < seek_cost_min) {
seek_cost_min = seek_cost;
}
}
*/
g_concurrency--; g_concurrency--;
// fclose(key_file); // fclose(key_file);
...@@ -171,12 +137,6 @@ int run(int argc, char** argv, int thread_id) { ...@@ -171,12 +137,6 @@ int run(int argc, char** argv, int thread_id) {
LOG(WARNING) << "destroy cube api failed err=" << ret; LOG(WARNING) << "destroy cube api failed err=" << ret;
} }
uint64_t seek_cost_avg = seek_cost_total / seek_counter;
LOG(INFO) << "seek cost avg = " << seek_cost_avg;
LOG(INFO) << "seek cost max = " << seek_cost_max;
LOG(INFO) << "seek cost min = " << seek_cost_min;
time_list[thread_id] = seek_cost_avg;
request_list[thread_id] = request; request_list[thread_id] = request;
return 0; return 0;
...@@ -188,6 +148,7 @@ int run_m(int argc, char** argv) { ...@@ -188,6 +148,7 @@ int run_m(int argc, char** argv) {
request_list.resize(thread_num); request_list.resize(thread_num);
time_list.resize(thread_num); time_list.resize(thread_num);
std::vector<std::thread*> thread_pool; std::vector<std::thread*> thread_pool;
TIME_FLAG(main_start);
for (int i = 0; i < thread_num; i++) { for (int i = 0; i < thread_num; i++) {
thread_pool.push_back(new std::thread(run, argc, argv, i)); thread_pool.push_back(new std::thread(run, argc, argv, i));
} }
...@@ -195,27 +156,33 @@ int run_m(int argc, char** argv) { ...@@ -195,27 +156,33 @@ int run_m(int argc, char** argv) {
thread_pool[i]->join(); thread_pool[i]->join();
delete thread_pool[i]; delete thread_pool[i];
} }
TIME_FLAG(main_end);
uint64_t sum_time = 0; uint64_t sum_time = 0;
uint64_t max_time = 0; uint64_t max_time = 0;
uint64_t min_time = 1000000; uint64_t min_time = 1000000;
uint64_t request_num = 0; uint64_t request_num = 0;
for (int i = 0; i < thread_num; i++) { for (int i = 0; i < thread_num; i++) {
sum_time += time_list[i]; for (int j = 0; j < request_list[i]; j++) {
if (time_list[i] > max_time) { sum_time += time_list[i][j];
max_time = time_list[i]; if (time_list[i][j] > max_time) {
max_time = time_list[i][j];
}
if (time_list[i][j] < min_time) {
min_time = time_list[i][j];
} }
if (time_list[i] < min_time) {
min_time = time_list[i];
} }
request_num += request_list[i]; request_num += request_list[i];
} }
uint64_t mean_time = sum_time / thread_num; uint64_t mean_time = sum_time / (thread_num * turns);
LOG(INFO) << thread_num << " thread seek cost" uint64_t main_time = time_diff(main_start, main_end);
<< " avg = " << std::to_string(mean_time) LOG(INFO) << "\n"
<< " max = " << std::to_string(max_time) << thread_num << " thread seek cost"
<< " min = " << std::to_string(min_time); << "\navg = " << std::to_string(mean_time)
LOG(INFO) << " total_request = " << std::to_string(request_num) << " speed = " << "\nmax = " << std::to_string(max_time)
<< std::to_string(1000000 * thread_num / mean_time) // mean_time us << "\nmin = " << std::to_string(min_time);
LOG(INFO) << "\ntotal_request = " << std::to_string(request_num)
<< "\nspeed = " << std::to_string(request_num * 1000000 /
main_time) // mean_time us
<< " query per second"; << " query per second";
return 0; return 0;
} }
......
...@@ -90,6 +90,9 @@ int GeneralDistKVInferOp::inference() { ...@@ -90,6 +90,9 @@ int GeneralDistKVInferOp::inference() {
keys.begin() + key_idx); keys.begin() + key_idx);
key_idx += dataptr_size_pairs[i].second; key_idx += dataptr_size_pairs[i].second;
} }
Timer timeline;
int64_t cube_start = timeline.TimeStampUS();
timeline.Start();
rec::mcube::CubeAPI *cube = rec::mcube::CubeAPI::instance(); rec::mcube::CubeAPI *cube = rec::mcube::CubeAPI::instance();
std::vector<std::string> table_names = cube->get_table_names(); std::vector<std::string> table_names = cube->get_table_names();
if (table_names.size() == 0) { if (table_names.size() == 0) {
...@@ -97,7 +100,7 @@ int GeneralDistKVInferOp::inference() { ...@@ -97,7 +100,7 @@ int GeneralDistKVInferOp::inference() {
return -1; return -1;
} }
int ret = cube->seek(table_names[0], keys, &values); int ret = cube->seek(table_names[0], keys, &values);
int64_t cube_end = timeline.TimeStampUS();
if (values.size() != keys.size() || values[0].buff.size() == 0) { if (values.size() != keys.size() || values[0].buff.size() == 0) {
LOG(ERROR) << "cube value return null"; LOG(ERROR) << "cube value return null";
} }
...@@ -153,9 +156,7 @@ int GeneralDistKVInferOp::inference() { ...@@ -153,9 +156,7 @@ int GeneralDistKVInferOp::inference() {
VLOG(2) << "infer batch size: " << batch_size; VLOG(2) << "infer batch size: " << batch_size;
Timer timeline;
int64_t start = timeline.TimeStampUS(); int64_t start = timeline.TimeStampUS();
timeline.Start();
if (InferManager::instance().infer( if (InferManager::instance().infer(
engine_name().c_str(), &infer_in, out, batch_size)) { engine_name().c_str(), &infer_in, out, batch_size)) {
...@@ -165,6 +166,8 @@ int GeneralDistKVInferOp::inference() { ...@@ -165,6 +166,8 @@ int GeneralDistKVInferOp::inference() {
int64_t end = timeline.TimeStampUS(); int64_t end = timeline.TimeStampUS();
CopyBlobInfo(input_blob, output_blob); CopyBlobInfo(input_blob, output_blob);
AddBlobInfo(output_blob, cube_start);
AddBlobInfo(output_blob, cube_end);
AddBlobInfo(output_blob, start); AddBlobInfo(output_blob, start);
AddBlobInfo(output_blob, end); AddBlobInfo(output_blob, end);
return 0; return 0;
......
...@@ -3,7 +3,7 @@ export CUDA_VISIBLE_DEVICES=0,1,2,3 ...@@ -3,7 +3,7 @@ export CUDA_VISIBLE_DEVICES=0,1,2,3
export FLAGS_profile_server=1 export FLAGS_profile_server=1
export FLAGS_profile_client=1 export FLAGS_profile_client=1
export FLAGS_serving_latency=1 export FLAGS_serving_latency=1
python3 -m paddle_serving_server_gpu.serve --model $1 --port 9292 --thread 4 --gpu_ids 0,1,2,3 --mem_optim False --ir_optim True 2> elog > stdlog & python3 -m paddle_serving_server_gpu.serve --model $1 --port 9292 --thread 4 --gpu_ids 0,1,2,3 --mem_optim --ir_optim 2> elog > stdlog &
hostname=`echo $(hostname)|awk -F '.baidu.com' '{print $1}'` hostname=`echo $(hostname)|awk -F '.baidu.com' '{print $1}'`
sleep 5 sleep 5
gpu_id=0 gpu_id=0
......
...@@ -75,14 +75,17 @@ if __name__ == '__main__': ...@@ -75,14 +75,17 @@ if __name__ == '__main__':
multi_thread_runner = MultiThreadRunner() multi_thread_runner = MultiThreadRunner()
endpoint_list = ["127.0.0.1:9292"] endpoint_list = ["127.0.0.1:9292"]
#result = single_func(0, {"endpoint": endpoint_list}) #result = single_func(0, {"endpoint": endpoint_list})
start = time.time()
result = multi_thread_runner.run(single_func, args.thread, result = multi_thread_runner.run(single_func, args.thread,
{"endpoint": endpoint_list}) {"endpoint": endpoint_list})
print(result) end = time.time()
total_cost = end - start
avg_cost = 0 avg_cost = 0
qps = 0 qps = 0
for i in range(args.thread): for i in range(args.thread):
avg_cost += result[0][i * 2 + 0] avg_cost += result[0][i * 2 + 0]
qps += result[0][i * 2 + 1] qps += result[0][i * 2 + 1]
avg_cost = avg_cost / args.thread avg_cost = avg_cost / args.thread
print("total cost: {}".format(total_cost))
print("average total cost {} s.".format(avg_cost)) print("average total cost {} s.".format(avg_cost))
print("qps {} ins/s".format(qps)) print("qps {} ins/s".format(qps))
rm profile_log rm profile_log
export FLAGS_profile_client=1 export FLAGS_profile_client=1
export FLAGS_profile_server=1 export FLAGS_profile_server=1
for thread_num in 1 2 4 8 16
wget https://paddle-serving.bj.bcebos.com/unittest/ctr_cube_unittest.tar.gz --no-check-certificate
tar xf ctr_cube_unittest.tar.gz
mv models/ctr_client_conf ./
mv models/ctr_serving_model_kv ./
mv models/data ./cube/
wget https://paddle-serving.bj.bcebos.com/others/cube_app.tar.gz --no-check-certificate
tar xf cube_app.tar.gz
mv cube_app/cube* ./cube/
sh cube_prepare.sh &
python test_server.py ctr_serving_model_kv > serving_log 2>&1 &
for thread_num in 1 4 16
do do
for batch_size in 1 4 16 64 256 for batch_size in 1 4 16 64
do do
$PYTHONROOT/bin/python benchmark.py --thread $thread_num --batch_size $batch_size --model serving_client_conf/serving_client_conf.prototxt --request rpc > profile 2>&1 $PYTHONROOT/bin/python benchmark.py --thread $thread_num --batch_size $batch_size --model serving_client_conf/serving_client_conf.prototxt --request rpc > profile 2>&1
echo "batch size : $batch_size" echo "batch size : $batch_size"
...@@ -11,6 +25,8 @@ do ...@@ -11,6 +25,8 @@ do
echo "========================================" echo "========================================"
echo "batch size : $batch_size" >> profile_log echo "batch size : $batch_size" >> profile_log
$PYTHONROOT/bin/python ../util/show_profile.py profile $thread_num >> profile_log $PYTHONROOT/bin/python ../util/show_profile.py profile $thread_num >> profile_log
tail -n 2 profile >> profile_log tail -n 3 profile >> profile_log
done done
done done
ps -ef|grep 'serving'|grep -v grep|cut -c 9-15 | xargs kill -9
rm profile_log
wget https://paddle-serving.bj.bcebos.com/unittest/ctr_cube_unittest.tar.gz --no-check-certificate
tar xf ctr_cube_unittest.tar.gz
mv models/ctr_client_conf ./
mv models/ctr_serving_model_kv ./
mv models/data ./cube/
wget https://paddle-serving.bj.bcebos.com/others/cube_app.tar.gz --no-check-certificate
tar xf cube_app.tar.gz
mv cube_app/cube* ./cube/
sh cube_prepare.sh &
cp ../../../build_server/core/cube/cube-api/cube-cli .
python gen_key.py
for thread_num in 1 4 16 32
do
for batch_size in 1000
do
./cube-cli -config_file ./cube/conf/cube.conf -keys key -dict test_dict -thread_num $thread_num --batch $batch_size > profile 2>&1
echo "batch size : $batch_size"
echo "thread num : $thread_num"
echo "========================================"
echo "batch size : $batch_size" >> profile_log
echo "thread num : $thread_num" >> profile_log
tail -n 7 profile | head -n 4 >> profile_log
tail -n 2 profile >> profile_log
done
done
ps -ef|grep 'cube'|grep -v grep|cut -c 9-15 | xargs kill -9
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import random
with open("key", "w") as f:
for i in range(1000000):
f.write("{}\n".format(random.randint(0, 999999)))
...@@ -33,5 +33,9 @@ server = Server() ...@@ -33,5 +33,9 @@ server = Server()
server.set_op_sequence(op_seq_maker.get_op_sequence()) server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(4) server.set_num_threads(4)
server.load_model_config(sys.argv[1]) server.load_model_config(sys.argv[1])
server.prepare_server(workdir="work_dir1", port=9292, device="cpu") server.prepare_server(
workdir="work_dir1",
port=9292,
device="cpu",
cube_conf="./cube/conf/cube.conf")
server.run_server() server.run_server()
...@@ -33,5 +33,9 @@ server = Server() ...@@ -33,5 +33,9 @@ server = Server()
server.set_op_sequence(op_seq_maker.get_op_sequence()) server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(4) server.set_num_threads(4)
server.load_model_config(sys.argv[1]) server.load_model_config(sys.argv[1])
server.prepare_server(workdir="work_dir1", port=9292, device="cpu") server.prepare_server(
workdir="work_dir1",
port=9292,
device="cpu",
cube_conf="./cube/conf/cube.conf")
server.run_server() server.run_server()
...@@ -33,5 +33,9 @@ server = Server() ...@@ -33,5 +33,9 @@ server = Server()
server.set_op_sequence(op_seq_maker.get_op_sequence()) server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(4) server.set_num_threads(4)
server.load_model_config(sys.argv[1], sys.argv[2]) server.load_model_config(sys.argv[1], sys.argv[2])
server.prepare_server(workdir="work_dir1", port=9292, device="cpu") server.prepare_server(
workdir="work_dir1",
port=9292,
device="cpu",
cube_conf="./cube/conf/cube.conf")
server.run_server() server.run_server()
...@@ -33,5 +33,9 @@ server = Server() ...@@ -33,5 +33,9 @@ server = Server()
server.set_op_sequence(op_seq_maker.get_op_sequence()) server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(4) server.set_num_threads(4)
server.load_model_config(sys.argv[1], sys.argv[2]) server.load_model_config(sys.argv[1], sys.argv[2])
server.prepare_server(workdir="work_dir1", port=9292, device="cpu") server.prepare_server(
workdir="work_dir1",
port=9292,
device="cpu",
cube_conf="./cube/conf/cube.conf")
server.run_server() server.run_server()
...@@ -24,7 +24,7 @@ import json ...@@ -24,7 +24,7 @@ import json
import base64 import base64
from paddle_serving_client import Client from paddle_serving_client import Client
from paddle_serving_client.utils import MultiThreadRunner from paddle_serving_client.utils import MultiThreadRunner
from paddle_serving_client.utils import benchmark_args from paddle_serving_client.utils import benchmark_args, show_latency
from paddle_serving_app.reader import Sequential, File2Image, Resize from paddle_serving_app.reader import Sequential, File2Image, Resize
from paddle_serving_app.reader import CenterCrop, RGB2BGR, Transpose, Div, Normalize from paddle_serving_app.reader import CenterCrop, RGB2BGR, Transpose, Div, Normalize
...@@ -38,7 +38,11 @@ seq_preprocess = Sequential([ ...@@ -38,7 +38,11 @@ seq_preprocess = Sequential([
def single_func(idx, resource): def single_func(idx, resource):
file_list = [] file_list = []
turns = 10 turns = resource["turns"]
latency_flags = False
if os.getenv("FLAGS_serving_latency"):
latency_flags = True
latency_list = []
for file_name in os.listdir("./image_data/n01440764"): for file_name in os.listdir("./image_data/n01440764"):
file_list.append(file_name) file_list.append(file_name)
img_list = [] img_list = []
...@@ -56,6 +60,7 @@ def single_func(idx, resource): ...@@ -56,6 +60,7 @@ def single_func(idx, resource):
start = time.time() start = time.time()
for i in range(turns): for i in range(turns):
if args.batch_size >= 1: if args.batch_size >= 1:
l_start = time.time()
feed_batch = [] feed_batch = []
i_start = time.time() i_start = time.time()
for bi in range(args.batch_size): for bi in range(args.batch_size):
...@@ -69,6 +74,9 @@ def single_func(idx, resource): ...@@ -69,6 +74,9 @@ def single_func(idx, resource):
int(round(i_end * 1000000)))) int(round(i_end * 1000000))))
result = client.predict(feed=feed_batch, fetch=fetch) result = client.predict(feed=feed_batch, fetch=fetch)
l_end = time.time()
if latency_flags:
latency_list.append(l_end * 1000 - l_start * 1000)
else: else:
print("unsupport batch size {}".format(args.batch_size)) print("unsupport batch size {}".format(args.batch_size))
...@@ -88,6 +96,8 @@ def single_func(idx, resource): ...@@ -88,6 +96,8 @@ def single_func(idx, resource):
r = requests.post( r = requests.post(
server, data=req, headers={"Content-Type": "application/json"}) server, data=req, headers={"Content-Type": "application/json"})
end = time.time() end = time.time()
if latency_flags:
return [[end - start], latency_list]
return [[end - start]] return [[end - start]]
...@@ -96,11 +106,21 @@ if __name__ == '__main__': ...@@ -96,11 +106,21 @@ if __name__ == '__main__':
endpoint_list = [ endpoint_list = [
"127.0.0.1:9292", "127.0.0.1:9293", "127.0.0.1:9294", "127.0.0.1:9295" "127.0.0.1:9292", "127.0.0.1:9293", "127.0.0.1:9294", "127.0.0.1:9295"
] ]
result = multi_thread_runner.run(single_func, args.thread, turns = 100
{"endpoint": endpoint_list}) start = time.time()
result = multi_thread_runner.run(
single_func, args.thread, {"endpoint": endpoint_list,
"turns": turns})
#result = single_func(0, {"endpoint": endpoint_list}) #result = single_func(0, {"endpoint": endpoint_list})
end = time.time()
total_cost = end - start
avg_cost = 0 avg_cost = 0
for i in range(args.thread): for i in range(args.thread):
avg_cost += result[0][i] avg_cost += result[0][i]
avg_cost = avg_cost / args.thread avg_cost = avg_cost / args.thread
print("average total cost {} s.".format(avg_cost)) print("total cost: {}s".format(end - start))
print("each thread cost: {}s.".format(avg_cost))
print("qps: {}samples/s".format(args.batch_size * args.thread * turns /
total_cost))
if os.getenv("FLAGS_serving_latency"):
show_latency(result[1])
...@@ -2,14 +2,14 @@ rm profile_log ...@@ -2,14 +2,14 @@ rm profile_log
export CUDA_VISIBLE_DEVICES=0,1,2,3 export CUDA_VISIBLE_DEVICES=0,1,2,3
export FLAGS_profile_server=1 export FLAGS_profile_server=1
export FLAGS_profile_client=1 export FLAGS_profile_client=1
python -m paddle_serving_server_gpu.serve --model $1 --port 9292 --thread 4 --gpu_ids 0,1,2,3 2> elog > stdlog & python -m paddle_serving_server_gpu.serve --model $1 --port 9292 --thread 4 --gpu_ids 0,1,2,3 --mem_optim --ir_optim 2> elog > stdlog &
sleep 5 sleep 5
#warm up #warm up
$PYTHONROOT/bin/python benchmark.py --thread 8 --batch_size 1 --model $2/serving_client_conf.prototxt --request rpc > profile 2>&1 $PYTHONROOT/bin/python benchmark.py --thread 8 --batch_size 1 --model $2/serving_client_conf.prototxt --request rpc > profile 2>&1
for thread_num in 4 8 16 for thread_num in 1 4 8 16
do do
for batch_size in 1 4 16 64 for batch_size in 1 4 16 64
do do
......
...@@ -8,9 +8,9 @@ hostname=`echo $(hostname)|awk -F '.baidu.com' '{print $1}'` ...@@ -8,9 +8,9 @@ hostname=`echo $(hostname)|awk -F '.baidu.com' '{print $1}'`
sleep 5 sleep 5
for thread_num in 4 8 16 for thread_num in 1 4 8 16
do do
for batch_size in 1 4 16 64 256 for batch_size in 1 4 16 64
do do
job_bt=`date '+%Y%m%d%H%M%S'` job_bt=`date '+%Y%m%d%H%M%S'`
python benchmark.py --thread $thread_num --batch_size $batch_size --model $2/serving_client_conf.prototxt --request rpc > profile 2>&1 python benchmark.py --thread $thread_num --batch_size $batch_size --model $2/serving_client_conf.prototxt --request rpc > profile 2>&1
......
...@@ -25,6 +25,7 @@ from contextlib import closing ...@@ -25,6 +25,7 @@ from contextlib import closing
import collections import collections
import fcntl import fcntl
import shutil
import numpy as np import numpy as np
import grpc import grpc
from .proto import multi_lang_general_model_service_pb2 from .proto import multi_lang_general_model_service_pb2
...@@ -230,7 +231,7 @@ class Server(object): ...@@ -230,7 +231,7 @@ class Server(object):
infer_service.workflows.extend(["workflow1"]) infer_service.workflows.extend(["workflow1"])
self.infer_service_conf.services.extend([infer_service]) self.infer_service_conf.services.extend([infer_service])
def _prepare_resource(self, workdir): def _prepare_resource(self, workdir, cube_conf):
self.workdir = workdir self.workdir = workdir
if self.resource_conf == None: if self.resource_conf == None:
with open("{}/{}".format(workdir, self.general_model_config_fn), with open("{}/{}".format(workdir, self.general_model_config_fn),
...@@ -242,6 +243,11 @@ class Server(object): ...@@ -242,6 +243,11 @@ class Server(object):
if "dist_kv" in node.name: if "dist_kv" in node.name:
self.resource_conf.cube_config_path = workdir self.resource_conf.cube_config_path = workdir
self.resource_conf.cube_config_file = self.cube_config_fn self.resource_conf.cube_config_file = self.cube_config_fn
if cube_conf == None:
raise ValueError(
"Please set the path of cube.conf while use dist_kv op."
)
shutil.copy(cube_conf, workdir)
if "quant" in node.name: if "quant" in node.name:
self.resource_conf.cube_quant_bits = 8 self.resource_conf.cube_quant_bits = 8
self.resource_conf.model_toolkit_path = workdir self.resource_conf.model_toolkit_path = workdir
...@@ -366,7 +372,11 @@ class Server(object): ...@@ -366,7 +372,11 @@ class Server(object):
os.chdir(self.cur_path) os.chdir(self.cur_path)
self.bin_path = self.server_path + "/serving" self.bin_path = self.server_path + "/serving"
def prepare_server(self, workdir=None, port=9292, device="cpu"): def prepare_server(self,
workdir=None,
port=9292,
device="cpu",
cube_conf=None):
if workdir == None: if workdir == None:
workdir = "./tmp" workdir = "./tmp"
os.system("mkdir {}".format(workdir)) os.system("mkdir {}".format(workdir))
...@@ -377,7 +387,7 @@ class Server(object): ...@@ -377,7 +387,7 @@ class Server(object):
if not self.port_is_available(port): if not self.port_is_available(port):
raise SystemExit("Port {} is already used".format(port)) raise SystemExit("Port {} is already used".format(port))
self.set_port(port) self.set_port(port)
self._prepare_resource(workdir) self._prepare_resource(workdir, cube_conf)
self._prepare_engine(self.model_config_paths, device) self._prepare_engine(self.model_config_paths, device)
self._prepare_infer_service(port) self._prepare_infer_service(port)
self.workdir = workdir self.workdir = workdir
...@@ -645,7 +655,11 @@ class MultiLangServer(object): ...@@ -645,7 +655,11 @@ class MultiLangServer(object):
server_config_paths) server_config_paths)
self.bclient_config_path_ = client_config_path self.bclient_config_path_ = client_config_path
def prepare_server(self, workdir=None, port=9292, device="cpu"): def prepare_server(self,
workdir=None,
port=9292,
device="cpu",
cube_conf=None):
if not self._port_is_available(port): if not self._port_is_available(port):
raise SystemExit("Prot {} is already used".format(port)) raise SystemExit("Prot {} is already used".format(port))
default_port = 12000 default_port = 12000
...@@ -656,7 +670,10 @@ class MultiLangServer(object): ...@@ -656,7 +670,10 @@ class MultiLangServer(object):
self.port_list_.append(default_port + i) self.port_list_.append(default_port + i)
break break
self.bserver_.prepare_server( self.bserver_.prepare_server(
workdir=workdir, port=self.port_list_[0], device=device) workdir=workdir,
port=self.port_list_[0],
device=device,
cube_conf=cube_conf)
self.set_port(port) self.set_port(port)
def _launch_brpc_service(self, bserver): def _launch_brpc_service(self, bserver):
......
...@@ -26,7 +26,7 @@ from contextlib import closing ...@@ -26,7 +26,7 @@ from contextlib import closing
import argparse import argparse
import collections import collections
import fcntl import fcntl
import shutil
import numpy as np import numpy as np
import grpc import grpc
from .proto import multi_lang_general_model_service_pb2 from .proto import multi_lang_general_model_service_pb2
...@@ -285,7 +285,7 @@ class Server(object): ...@@ -285,7 +285,7 @@ class Server(object):
infer_service.workflows.extend(["workflow1"]) infer_service.workflows.extend(["workflow1"])
self.infer_service_conf.services.extend([infer_service]) self.infer_service_conf.services.extend([infer_service])
def _prepare_resource(self, workdir): def _prepare_resource(self, workdir, cube_conf):
self.workdir = workdir self.workdir = workdir
if self.resource_conf == None: if self.resource_conf == None:
with open("{}/{}".format(workdir, self.general_model_config_fn), with open("{}/{}".format(workdir, self.general_model_config_fn),
...@@ -297,6 +297,11 @@ class Server(object): ...@@ -297,6 +297,11 @@ class Server(object):
if "dist_kv" in node.name: if "dist_kv" in node.name:
self.resource_conf.cube_config_path = workdir self.resource_conf.cube_config_path = workdir
self.resource_conf.cube_config_file = self.cube_config_fn self.resource_conf.cube_config_file = self.cube_config_fn
if cube_conf == None:
raise ValueError(
"Please set the path of cube.conf while use dist_kv op."
)
shutil.copy(cube_conf, workdir)
self.resource_conf.model_toolkit_path = workdir self.resource_conf.model_toolkit_path = workdir
self.resource_conf.model_toolkit_file = self.model_toolkit_fn self.resource_conf.model_toolkit_file = self.model_toolkit_fn
self.resource_conf.general_model_path = workdir self.resource_conf.general_model_path = workdir
...@@ -406,7 +411,11 @@ class Server(object): ...@@ -406,7 +411,11 @@ class Server(object):
os.chdir(self.cur_path) os.chdir(self.cur_path)
self.bin_path = self.server_path + "/serving" self.bin_path = self.server_path + "/serving"
def prepare_server(self, workdir=None, port=9292, device="cpu"): def prepare_server(self,
workdir=None,
port=9292,
device="cpu",
cube_conf=None):
if workdir == None: if workdir == None:
workdir = "./tmp" workdir = "./tmp"
os.system("mkdir {}".format(workdir)) os.system("mkdir {}".format(workdir))
...@@ -418,7 +427,7 @@ class Server(object): ...@@ -418,7 +427,7 @@ class Server(object):
raise SystemExit("Port {} is already used".format(port)) raise SystemExit("Port {} is already used".format(port))
self.set_port(port) self.set_port(port)
self._prepare_resource(workdir) self._prepare_resource(workdir, cube_conf)
self._prepare_engine(self.model_config_paths, device) self._prepare_engine(self.model_config_paths, device)
self._prepare_infer_service(port) self._prepare_infer_service(port)
self.workdir = workdir self.workdir = workdir
...@@ -690,7 +699,11 @@ class MultiLangServer(object): ...@@ -690,7 +699,11 @@ class MultiLangServer(object):
server_config_paths) server_config_paths)
self.bclient_config_path_ = client_config_path self.bclient_config_path_ = client_config_path
def prepare_server(self, workdir=None, port=9292, device="cpu"): def prepare_server(self,
workdir=None,
port=9292,
device="cpu",
cube_conf=None):
if not self._port_is_available(port): if not self._port_is_available(port):
raise SystemExit("Prot {} is already used".format(port)) raise SystemExit("Prot {} is already used".format(port))
default_port = 12000 default_port = 12000
...@@ -701,7 +714,10 @@ class MultiLangServer(object): ...@@ -701,7 +714,10 @@ class MultiLangServer(object):
self.port_list_.append(default_port + i) self.port_list_.append(default_port + i)
break break
self.bserver_.prepare_server( self.bserver_.prepare_server(
workdir=workdir, port=self.port_list_[0], device=device) workdir=workdir,
port=self.port_list_[0],
device=device,
cube_conf=cube_conf)
self.set_port(port) self.set_port(port)
def _launch_brpc_service(self, bserver): def _launch_brpc_service(self, bserver):
......
...@@ -229,10 +229,7 @@ function python_run_criteo_ctr_with_cube() { ...@@ -229,10 +229,7 @@ function python_run_criteo_ctr_with_cube() {
check_cmd "mv models/data ./cube/" check_cmd "mv models/data ./cube/"
check_cmd "mv models/ut_data ./" check_cmd "mv models/ut_data ./"
cp ../../../build-server-$TYPE/output/bin/cube* ./cube/ cp ../../../build-server-$TYPE/output/bin/cube* ./cube/
mkdir -p $PYTHONROOT/lib/python2.7/site-packages/paddle_serving_server/serving-cpu-avx-openblas-0.1.3/
yes | cp ../../../build-server-$TYPE/output/demo/serving/bin/serving $PYTHONROOT/lib/python2.7/site-packages/paddle_serving_server/serving-cpu-avx-openblas-0.1.3/
sh cube_prepare.sh & sh cube_prepare.sh &
check_cmd "mkdir work_dir1 && cp cube/conf/cube.conf ./work_dir1/"
python test_server.py ctr_serving_model_kv & python test_server.py ctr_serving_model_kv &
sleep 5 sleep 5
check_cmd "python test_client.py ctr_client_conf/serving_client_conf.prototxt ./ut_data >score" check_cmd "python test_client.py ctr_client_conf/serving_client_conf.prototxt ./ut_data >score"
...@@ -257,10 +254,7 @@ function python_run_criteo_ctr_with_cube() { ...@@ -257,10 +254,7 @@ function python_run_criteo_ctr_with_cube() {
check_cmd "mv models/data ./cube/" check_cmd "mv models/data ./cube/"
check_cmd "mv models/ut_data ./" check_cmd "mv models/ut_data ./"
cp ../../../build-server-$TYPE/output/bin/cube* ./cube/ cp ../../../build-server-$TYPE/output/bin/cube* ./cube/
mkdir -p $PYTHONROOT/lib/python2.7/site-packages/paddle_serving_server_gpu/serving-gpu-0.1.3/
yes | cp ../../../build-server-$TYPE/output/demo/serving/bin/serving $PYTHONROOT/lib/python2.7/site-packages/paddle_serving_server_gpu/serving-gpu-0.1.3/
sh cube_prepare.sh & sh cube_prepare.sh &
check_cmd "mkdir work_dir1 && cp cube/conf/cube.conf ./work_dir1/"
python test_server_gpu.py ctr_serving_model_kv & python test_server_gpu.py ctr_serving_model_kv &
sleep 5 sleep 5
# for warm up # for warm up
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册