Commit b1af50c1 authored by Dong Daxiang, committed by GitHub

Merge pull request #156 from guru4elephant/server_profiler

Server profiler
......@@ -26,4 +26,5 @@ endif()
if (NOT CLIENT_ONLY)
add_subdirectory(predictor)
add_subdirectory(general-server)
add_subdirectory(util)
endif()
......@@ -50,7 +50,10 @@ class PredictorClient {
void set_predictor_conf(const std::string& conf_path,
const std::string& conf_file);
int create_predictor_by_desc(const std::string & sdk_desc);
int create_predictor();
int destroy_predictor();
std::vector<std::vector<float>> predict(
const std::vector<std::vector<float>>& float_feed,
......
......@@ -83,7 +83,22 @@ void PredictorClient::set_predictor_conf(const std::string &conf_path,
_predictor_conf = conf_file;
}
int PredictorClient::destroy_predictor() {
_api.thrd_finalize();
_api.destroy();
return 0;
}
int PredictorClient::create_predictor_by_desc(const std::string & sdk_desc) {
if (_api.create(sdk_desc) != 0) {
LOG(ERROR) << "Predictor Creation Failed";
return -1;
}
_api.thrd_initialize();
return 0;
}
int PredictorClient::create_predictor() {
VLOG(2) << "Predictor path: " << _predictor_path
<< " predictor file: " << _predictor_conf;
if (_api.create(_predictor_path.c_str(), _predictor_conf.c_str()) != 0) {
LOG(ERROR) << "Predictor Creation Failed";
return -1;
......@@ -101,7 +116,9 @@ std::vector<std::vector<float>> PredictorClient::predict(
if (fetch_name.size() == 0) {
return fetch_result;
}
fetch_result.resize(fetch_name.size());
// we save infer_us at fetch_result[fetch_name.size()]
fetch_result.resize(fetch_name.size() + 1);
_api.thrd_clear();
_predictor = _api.fetch_predictor("general_model");
......@@ -179,6 +196,8 @@ std::vector<std::vector<float>> PredictorClient::predict(
*(const float *)res.insts(0).tensor_array(idx).data(i).c_str();
}
}
fetch_result[fetch_name.size()].resize(1);
fetch_result[fetch_name.size()][0] = res.mean_infer_us();
}
return fetch_result;
......
......@@ -41,8 +41,13 @@ PYBIND11_MODULE(serving_client, m) {
const std::string &conf_file) {
self.set_predictor_conf(conf_path, conf_file);
})
.def("create_predictor_by_desc",
[](PredictorClient &self, const std::string & sdk_desc) {
self.create_predictor_by_desc(sdk_desc); })
.def("create_predictor",
[](PredictorClient &self) { self.create_predictor(); })
.def("destroy_predictor",
[](PredictorClient &self) { self.destroy_predictor(); })
.def("predict",
[](PredictorClient &self,
const std::vector<std::vector<float>> &float_feed,
......
......@@ -3,7 +3,7 @@ include(op/CMakeLists.txt)
include(proto/CMakeLists.txt)
add_executable(serving ${serving_srcs})
add_dependencies(serving pdcodegen fluid_cpu_engine pdserving paddle_fluid
opencv_imgcodecs cube-api)
opencv_imgcodecs cube-api utils)
if (WITH_GPU)
add_dependencies(serving fluid_gpu_engine)
endif()
......@@ -23,6 +23,7 @@ target_link_libraries(serving paddle_fluid ${paddle_depend_libs})
target_link_libraries(serving pdserving)
target_link_libraries(serving cube-api)
target_link_libraries(serving utils)
target_link_libraries(serving kvdb rocksdb)
......
......@@ -21,12 +21,13 @@
#include "core/predictor/framework/infer.h"
#include "core/predictor/framework/memory.h"
#include "core/predictor/framework/resource.h"
#include "core/util/include/timer.h"
namespace baidu {
namespace paddle_serving {
namespace serving {
using baidu::paddle_serving::Timer;
using baidu::paddle_serving::predictor::MempoolWrapper;
using baidu::paddle_serving::predictor::general_model::Tensor;
using baidu::paddle_serving::predictor::general_model::Response;
......@@ -54,10 +55,15 @@ int GeneralInferOp::inference() {
TensorVector *out = butil::get_object<TensorVector>();
int batch_size = (*in)[0].shape[0];
// infer
Timer timeline;
double infer_time = 0.0;
timeline.Start();
if (InferManager::instance().infer(GENERAL_MODEL_NAME, in, out, batch_size)) {
LOG(ERROR) << "Failed do infer in fluid model: " << GENERAL_MODEL_NAME;
return -1;
}
timeline.Pause();
infer_time = timeline.ElapsedUS();
const Request *req = dynamic_cast<const Request *>(get_request_message());
......@@ -79,6 +85,8 @@ int GeneralInferOp::inference() {
// response inst with only fetch_var_names
Response *res = mutable_data<Response>();
res->set_mean_infer_us(infer_time);
for (int i = 0; i < batch_size; ++i) {
FetchInst *fetch_inst = res->add_insts();
for (auto & idx : fetch_index) {
......
......@@ -200,14 +200,6 @@ int GeneralReaderOp::inference() {
}
VLOG(2) << "read data from client success";
// print request
std::ostringstream oss;
int64_t *example = reinterpret_cast<int64_t *>((*in)[0].data.data());
for (int i = 0; i < 10; i++) {
oss << *(example + i) << " ";
}
VLOG(2) << "head element of first feed var : " << oss.str();
//
return 0;
}
DEFINE_OP(GeneralReaderOp);
......
......@@ -40,6 +40,7 @@ message Request {
message Response {
repeated FetchInst insts = 1;
optional float mean_infer_us = 2;
};
service GeneralModelService {
......
......@@ -32,6 +32,10 @@ class EndpointConfigManager {
EndpointConfigManager()
: _last_update_timestamp(0), _current_endpointmap_id(1) {}
int create(const std::string & sdk_desc_str);
int load(const std::string & sdk_desc_str);
int create(const char* path, const char* file);
int load();
......
......@@ -31,6 +31,8 @@ class PredictorApi {
int register_all();
int create(const std::string & sdk_desc_str);
int create(const char* path, const char* file);
int thrd_initialize();
......
......@@ -40,6 +40,7 @@ message Request {
message Response {
repeated FetchInst insts = 1;
optional float mean_infer_us = 2;
};
service GeneralModelService {
......
......@@ -26,6 +26,13 @@ namespace sdk_cpp {
using configure::SDKConf;
int EndpointConfigManager::create(const std::string& sdk_desc_str) {
if (load(sdk_desc_str) != 0) {
LOG(ERROR) << "Failed reload endpoint config";
return -1;
}
return 0;
}
int EndpointConfigManager::create(const char* path, const char* file) {
_endpoint_config_path = path;
_endpoint_config_file = file;
......@@ -38,6 +45,46 @@ int EndpointConfigManager::create(const char* path, const char* file) {
return 0;
}
int EndpointConfigManager::load(const std::string& sdk_desc_str) {
try {
SDKConf sdk_conf;
sdk_conf.ParseFromString(sdk_desc_str);
VariantInfo default_var;
if (init_one_variant(sdk_conf.default_variant_conf(), default_var) != 0) {
LOG(ERROR) << "Failed read default var conf";
return -1;
}
uint32_t ep_size = sdk_conf.predictors_size();
for (uint32_t ei = 0; ei < ep_size; ++ei) {
EndpointInfo ep;
if (init_one_endpoint(sdk_conf.predictors(ei), ep, default_var) != 0) {
LOG(ERROR) << "Failed read endpoint info at: " << ei;
return -1;
}
std::map<std::string, EndpointInfo>::iterator it;
if (_ep_map.find(ep.endpoint_name) != _ep_map.end()) {
LOG(ERROR) << "Cannot insert duplicated endpoint"
<< ", ep name: " << ep.endpoint_name;
}
std::pair<std::map<std::string, EndpointInfo>::iterator, bool> r =
_ep_map.insert(std::make_pair(ep.endpoint_name, ep));
if (!r.second) {
LOG(ERROR) << "Failed insert endpoint, name" << ep.endpoint_name;
return -1;
}
}
} catch (std::exception& e) {
LOG(ERROR) << "Failed load configure" << e.what();
return -1;
}
LOG(INFO) << "Success reload endpoint config file, id: "
<< _current_endpointmap_id;
return 0;
}
int EndpointConfigManager::load() {
try {
SDKConf sdk_conf;
......
......@@ -30,6 +30,49 @@ int PredictorApi::register_all() {
return 0;
}
int PredictorApi::create(const std::string & api_desc_str) {
VLOG(2) << api_desc_str;
if (register_all() != 0) {
LOG(ERROR) << "Failed do register all!";
return -1;
}
if (_config_manager.create(api_desc_str) != 0) {
LOG(ERROR) << "Failed create config manager from desc string :"
<< api_desc_str;
return -1;
}
const std::map<std::string, EndpointInfo>& map = _config_manager.config();
std::map<std::string, EndpointInfo>::const_iterator it;
for (it = map.begin(); it != map.end(); ++it) {
const EndpointInfo& ep_info = it->second;
Endpoint* ep = new (std::nothrow) Endpoint();
if (ep->initialize(ep_info) != 0) {
LOG(ERROR) << "Failed intialize endpoint:" << ep_info.endpoint_name;
return -1;
}
if (_endpoints.find(ep_info.endpoint_name) != _endpoints.end()) {
LOG(ERROR) << "Cannot insert duplicated endpoint:"
<< ep_info.endpoint_name;
return -1;
}
std::pair<std::map<std::string, Endpoint*>::iterator, bool> r =
_endpoints.insert(std::make_pair(ep_info.endpoint_name, ep));
if (!r.second) {
LOG(ERROR) << "Failed insert endpoint:" << ep_info.endpoint_name;
return -1;
}
LOG(INFO) << "Succ create endpoint instance with name: "
<< ep_info.endpoint_name;
}
return 0;
}
int PredictorApi::create(const char* path, const char* file) {
if (register_all() != 0) {
LOG(ERROR) << "Failed do register all!";
......
include(src/CMakeLists.txt)
add_library(utils ${util_srcs})
/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <stdlib.h>
namespace baidu {
namespace paddle_serving {
// A Standard Timer implementation for debugging
class Timer {
public:
// a timer class for profiling
// Reset() will be called during initialization
// all timing variables will be set 0 in Reset()
Timer() { Reset(); }
void Reset();
void Start();
void Pause();
// Resume will get current system time
void Resume();
int Count();
// return elapsed time in us
double ElapsedUS();
// return elapsed time in ms
double ElapsedMS();
// return elapsed time in sec
double ElapsedSec();
private:
struct timeval _start;
struct timeval _now;
int _count;
int64_t _elapsed;
bool _paused;
// get us difference between start and now
int64_t Tickus();
};
} // namespace paddle_serving
} // namespace baidu
FILE(GLOB srcs ${CMAKE_CURRENT_LIST_DIR}/*.cc)
LIST(APPEND util_srcs ${srcs})
/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <sys/time.h>
#include "core/util/include/timer.h"
namespace baidu {
namespace paddle_serving {
void Timer::Reset() {
_start.tv_sec = 0;
_start.tv_usec = 0;
_count = 0;
_elapsed = 0;
_paused = true;
}
void Timer::Start() {
Reset();
Resume();
}
void Timer::Pause() {
if (_paused) {
return;
}
_elapsed += Tickus();
++_count;
_paused = true;
}
void Timer::Resume() {
gettimeofday(&_start, NULL);
_paused = false;
}
int Timer::Count() { return _count; }
double Timer::ElapsedUS() { return static_cast<double>(_elapsed); }
double Timer::ElapsedMS() { return _elapsed / 1000.0; }
double Timer::ElapsedSec() { return _elapsed / 1000000.0; }
int64_t Timer::Tickus() {
gettimeofday(&_now, NULL);
return (_now.tv_sec - _start.tv_sec) * 1000 * 1000L +
(_now.tv_usec - _start.tv_usec);
}
} // namespace paddle_serving
} // namespace baidu
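For readers who want the Timer's semantics at a glance: elapsed time only accumulates when Pause() is called, Count() tracks how many Start/Resume-to-Pause intervals were measured, and Resume() simply re-reads the wall clock. A purely illustrative Python mirror (not part of this commit, and assuming time.time() resolution is adequate for microsecond-scale profiling) might look like:

import time

class PyTimer(object):
    """Illustrative mirror of core/util/include/timer.h."""
    def __init__(self):
        self.reset()

    def reset(self):
        self._start = 0.0
        self._elapsed_us = 0  # accumulated only in pause()
        self._count = 0       # number of measured intervals
        self._paused = True

    def start(self):
        self.reset()
        self.resume()

    def resume(self):
        self._start = time.time()
        self._paused = False

    def pause(self):
        if self._paused:
            return
        self._elapsed_us += int((time.time() - self._start) * 1e6)
        self._count += 1
        self._paused = True

    def elapsed_us(self):
        return float(self._elapsed_us)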
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from paddle_serving_client import Client
from paddle_serving_client.metric import auc
from paddle_serving_client.utils import MultiThreadRunner
import time
def predict(thr_id, resource):
    client = Client()
    client.load_client_config(resource["conf_file"])
    client.connect(resource["server_endpoint"])
    thread_num = resource["thread_num"]
    file_list = resource["filelist"]
    line_id = 0
    prob = []
    label_list = []
    dataset = []
    for fn in file_list:
        fin = open(fn)
        for line in fin:
            if line_id % thread_num == thr_id - 1:
                group = line.strip().split()
                words = [int(x) for x in group[1:int(group[0])]]
                label = [int(group[-1])]
                feed = {"words": words, "label": label}
                dataset.append(feed)
            line_id += 1
        fin.close()
    start = time.time()
    fetch = ["acc", "cost", "prediction"]
    infer_time_list = []
    for inst in dataset:
        fetch_map = client.predict(feed=inst, fetch=fetch, debug=True)
        prob.append(fetch_map["prediction"][1])
        label_list.append(inst["label"][0])
        infer_time_list.append(fetch_map["infer_time"])
    end = time.time()
    client.release()
    return [prob, label_list, [sum(infer_time_list)], [end - start]]

if __name__ == '__main__':
    conf_file = sys.argv[1]
    data_file = sys.argv[2]
    resource = {}
    resource["conf_file"] = conf_file
    resource["server_endpoint"] = ["127.0.0.1:9292"]
    resource["filelist"] = [data_file]
    resource["thread_num"] = int(sys.argv[3])
    thread_runner = MultiThreadRunner()
    result = thread_runner.run(predict, int(sys.argv[3]), resource)
    print("{}\t{}".format(sys.argv[3], sum(result[-1]) / len(result[-1])))
    print("{}\t{}".format(sys.argv[3], sum(result[2]) / 1000.0 / 1000.0 / len(result[2])))
......@@ -36,7 +36,7 @@ class SDKConfig(object):
predictor_desc.service_name = \
"baidu.paddle_serving.predictor.general_model.GeneralModelService"
predictor_desc.endpoint_router = "WeightedRandomRender"
predictor_desc.weighted_random_render_conf.variant_weight_list = "30"
predictor_desc.weighted_random_render_conf.variant_weight_list = "100"
variant_desc = sdk.VariantConf()
variant_desc.tag = "var1"
......@@ -105,12 +105,7 @@ class Client(object):
predictor_sdk.set_server_endpoints(endpoints)
sdk_desc = predictor_sdk.gen_desc()
timestamp = time.asctime(time.localtime(time.time()))
predictor_path = "/tmp/"
predictor_file = "%s_predictor.conf" % timestamp
with open(predictor_path + predictor_file, "w") as fout:
fout.write(sdk_desc)
self.client_handle_.set_predictor_conf(predictor_path, predictor_file)
self.client_handle_.create_predictor()
self.client_handle_.create_predictor_by_desc(sdk_desc)
def get_feed_names(self):
return self.feed_names_
......@@ -118,7 +113,7 @@ class Client(object):
def get_fetch_names(self):
return self.fetch_names_
def predict(self, feed={}, fetch=[]):
def predict(self, feed={}, fetch=[], debug=False):
int_slot = []
float_slot = []
int_feed_names = []
......@@ -148,6 +143,9 @@ class Client(object):
for i, name in enumerate(fetch_names):
result_map[name] = result[i]
if debug:
result_map["infer_time"] = result[-1][0]
return result_map
def batch_predict(self, feed_batch=[], fetch=[]):
......@@ -191,3 +189,6 @@ class Client(object):
result_map_batch.append(result_map)
return result_map_batch
def release(self):
self.client_handle_.destroy_predictor()
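Taken together, the client-side changes above give the following usage pattern; a minimal sketch, assuming a model whose feed variables are "words" and "label" as in the benchmark script (config path, endpoint, and feed values are illustrative):

from paddle_serving_client import Client

client = Client()
client.load_client_config("serving_client_conf.prototxt")  # illustrative path
client.connect(["127.0.0.1:9292"])  # gen_desc() output now goes straight to the C++ client, no temp conf file on disk

# debug=True exposes the extra trailing slot of the fetch result, i.e. the
# server-reported mean_infer_us, as result_map["infer_time"]
fetch_map = client.predict(feed={"words": [1, 2, 3], "label": [0]},
                           fetch=["acc", "cost", "prediction"],
                           debug=True)
print(fetch_map["prediction"], fetch_map["infer_time"])

# release() maps to destroy_predictor(), which finalizes the worker threads and destroys the SDK api object
client.release()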
......@@ -12,3 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from auc import auc
from acc import acc
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
def acc(prob, label, threshold):
    # prob[i] is the predicted probability that label[i] equals one
    assert len(prob) == len(label)
    total = len(prob)
    right = 0
    for i in range(len(prob)):
        if (prob[i] - threshold) * (label[i] - prob[i]) > 0:
            right += 1
    return float(right) / total
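A quick illustrative check (values are made up): with threshold 0.5, a prediction counts as right when it falls on the same side of the threshold as its label.

prob = [0.9, 0.2, 0.6]
label = [1, 0, 0]
print(acc(prob, label, 0.5))  # 0.666..., only the third prediction lands on the wrong side of 0.5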
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
def tied_rank(x):
"""
Computes the tied rank of elements in x.
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import subprocess
from multiprocessing import Pool
class MultiThreadRunner(object):
    def __init__(self):
        pass

    def run(self, thread_func, thread_num, global_resource):
        p = Pool(thread_num)
        result_list = []
        for i in range(thread_num):
            result_list.append(
                p.apply_async(thread_func, [i + 1, global_resource]))
        p.close()
        p.join()
        return_result = result_list[0].get()
        for i in range(1, thread_num, 1):
            tmp_result = result_list[i].get()
            for j, item in enumerate(tmp_result):
                return_result[j].extend(item)
        return return_result
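A minimal sketch of how MultiThreadRunner.run is driven (the worker function and resource dict are illustrative): each worker receives a 1-based id plus the shared resource dict and must return a list of lists, which run() concatenates element-wise across workers, exactly as the benchmark script does with its [prob, label_list, infer_time, wall_time] result.

def worker(thr_id, resource):
    return [[thr_id], [len(resource)]]

if __name__ == "__main__":
    runner = MultiThreadRunner()
    merged = runner.run(worker, 3, {"conf_file": "client.conf"})
    print(merged)  # [[1, 2, 3], [1, 1, 1]]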
......@@ -35,7 +35,8 @@ REQUIRED_PACKAGES = [
packages=['paddle_serving_client',
'paddle_serving_client.proto',
'paddle_serving_client.io',
'paddle_serving_client.metric']
'paddle_serving_client.metric',
'paddle_serving_client.utils']
package_data={'paddle_serving_client': ['serving_client.so']}
package_dir={'paddle_serving_client':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client',
......@@ -44,7 +45,9 @@ package_dir={'paddle_serving_client':
'paddle_serving_client.io':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/io',
'paddle_serving_client.metric':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/metric'}
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/metric',
'paddle_serving_client.utils':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_client/utils'}
setup(
name='paddle-serving-client',
......