提交 f4adba14 编写于 作者: W wangguibao

Predictor compile pass

上级 e608c4a5
......@@ -6,7 +6,7 @@ add_library(predictor ${predictor_srcs})
set_source_files_properties(
${predictor_srcs}
PROPERTIES
COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
COMPILE_FLAGS "-Wno-strict-aliasing -Wno-unused-variable -Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
add_dependencies(predictor protobuf boost brpc)
target_include_directories(predictor PUBLIC
${CMAKE_CURRENT_LIST_DIR}/
......
......@@ -4,7 +4,6 @@
#include <vector>
#include <deque>
#include <butil/atomicops.h>
#include <comlog/comlog.h>
#include "common/inner_common.h"
#include "framework/infer_data.h"
#include "framework/memory.h"
......@@ -112,25 +111,23 @@ public:
const InArrayT& in, OutArrayT& out, bool align) {
if (align) {
if (out.count() <= 0 || out.size() <= 0) {
CFATAL_LOG("Out tensor is empty, when aligned");
LOG(FATAL) << "Out tensor is empty, when aligned";
return false;
}
if (out.size() != in.size()) {
CFATAL_LOG("In/Out tensor size not eq: %ld!=%ld",
out.size(), in.size());
LOG(FATAL) << "In/Out tensor size not eq: " << out.size() << "!=" << in.size();
return false;
}
for (size_t fi = 0, shape0 = 0; fi < out.count(); ++fi) {
if (!out[fi].valid()) {
CFATAL_LOG("Out[%ld] tensor not valid", fi);
LOG(FATAL) << "Out[" << fi << "] tensor not valid";
return false;
}
if (out.size() != out[fi].shape0()) {
CFATAL_LOG("Shape0 not consistency, %ld!=%ld, %ld",
out.size(), out[fi].shape0(), fi);
LOG(FATAL) << "Shape0 not consistency, " << out.size() << "!=" << out[fi].shape0() << ", " << fi;
return false;
}
}
......@@ -234,7 +231,7 @@ public:
void* data_buf
= MempoolWrapper::instance().malloc(tensor_byte);
if (!data_buf) {
CFATAL_LOG("Malloc failed, size: %ld", tensor_byte);
LOG(FATAL) << "Malloc failed, size: " << tensor_byte;
return ;
}
......@@ -243,25 +240,23 @@ public:
TaskMetaT& tm = _tasks[ti];
size_t acc_byte = ins_byte * (tm.end - tm.begin);
if (data_byte + acc_byte > tensor_byte) {
CFATAL_LOG("Invalid bytes: %ld + %ld >= %ld",
data_byte, acc_byte, tensor_byte);
LOG(FATAL) << "Invalid bytes: " << data_byte << " + " << acc_byte << " >= " << tensor_byte;
return ;
}
const Tensor& tensor = (*(tm.task->get(is_in)))[fi];
memcpy(data_buf + data_byte,
tensor.data.data() + tm.begin * ins_byte,
memcpy((char *)data_buf + data_byte,
(char *)(tensor.data.data()) + tm.begin * ins_byte,
acc_byte);
data_byte += acc_byte;
}
if (data_byte != tensor_byte) {
CFATAL_LOG("Invalid tensor byte: %ld != %ld",
data_byte, tensor_byte);
LOG(FATAL) << "Invalid tensor byte: " << data_byte << " != " << tensor_byte;
return ;
}
batch_tensor.data = DataBuf(data_buf, tensor_byte);
batch_tensor.data = DataBuf((char *)data_buf, tensor_byte);
if (is_in) {
_batch_in.push_back(batch_tensor);
} else {
......@@ -275,8 +270,7 @@ public:
void notify_tasks() {
if (_batch_out.size() != _batch_in.size()) {
CFATAL_LOG("batch size not consistency: %ld != %ld",
_batch_out.size(), _batch_in.size());
LOG(FATAL) << "batch size not consistency: " << _batch_out.size() << " != " << _batch_in.size();
return ;
}
......@@ -299,8 +293,8 @@ public:
if (_batch_align) { // merge all batchs
size_t offset_dst = ins_byte * _tasks[ti].begin;
void* ptr = const_cast<void*>((*dst)[fi].data.data());
memcpy(ptr + offset_dst,
_batch_out[fi].data.data() + offset_src, add_byte);
memcpy((char *)ptr + offset_dst,
(char *)(_batch_out[fi].data.data()) + offset_src, add_byte);
} else { // overwrite
if (dst->count() <= 0) {
dst->push_back(_batch_out[fi]);
......@@ -310,7 +304,7 @@ public:
(*dst)[fi].shape[0] = add;
(*dst)[fi].data = DataBuf(
_batch_out[fi].data.data() + offset_src, add_byte);
(char *)(_batch_out[fi].data.data()) + offset_src, add_byte);
}
}
}
......@@ -348,8 +342,8 @@ private:
std::vector<TaskMetaT> _tasks;
InArrayT _batch_in;
OutArrayT _batch_out;
size_t _rem_size;
size_t _batch_size;
size_t _rem_size;
bool _batch_align;
};
......
#pragma once
#include <boost/bind.hpp>
#include <base/atomicops.h>
#include <butil/atomicops.h>
#include <comlog/comlog.h>
#include "common/inner_common.h"
#include <sys/syscall.h>
......@@ -13,8 +12,6 @@ namespace bsf {
template<typename TaskT>
void* TaskExecutor<TaskT>::thread_entry(void* args) {
ComlogGuard logging_guard;
ThreadContext<TaskT>* context = static_cast<ThreadContext<TaskT>*>(args);
TaskExecutor<TaskT>* executor = static_cast<TaskExecutor<TaskT>*>(context->executor);
executor->work(context);
......@@ -26,12 +23,12 @@ template<typename TaskT>
int TaskExecutor<TaskT>::start(uint32_t thread_num, uint32_t init_timeout_sec) {
_stop = false;
if (!_thread_contexts.empty()) {
CWARNING_LOG("BSF has started");
LOG(WARNING) << "BSF has started";
return 0;
}
if (thread_num == 0) {
CFATAL_LOG("cannot init BSF with zero thread");
LOG(FATAL) << "cannot init BSF with zero thread";
return -1;
}
......@@ -45,8 +42,7 @@ int TaskExecutor<TaskT>::start(uint32_t thread_num, uint32_t init_timeout_sec) {
int rc = THREAD_CREATE(
&contexts[i].tid, NULL, &TaskExecutor::thread_entry, &contexts[i]);
if (rc != 0) {
CFATAL_LOG("failed to create BSF worker thread: index=%u, rc=%d, errno=%d:%m",
i, rc, errno);
LOG(FATAL) << "failed to create BSF worker thread: index=" << i << ", rc=" << rc << ", errno=" << errno << ":" << strerror(errno);
return -1;
}
......@@ -75,12 +71,12 @@ int TaskExecutor<TaskT>::start(uint32_t thread_num, uint32_t init_timeout_sec) {
}
if (has_error) {
CFATAL_LOG("BSF thread init error");
LOG(FATAL) << "BSF thread init error";
return -1;
}
if (done) {
CDEBUG_LOG("BSF thread init done");
LOG(INFO) << "BSF thread init done";
return 0;
}
......@@ -90,7 +86,7 @@ int TaskExecutor<TaskT>::start(uint32_t thread_num, uint32_t init_timeout_sec) {
init_timeout -= sleep_interval;
}
CFATAL_LOG("BSF thread init timed out");
LOG(FATAL) << "BSF thread init timed out";
return -1;
}
......@@ -110,7 +106,7 @@ void TaskExecutor<TaskT>::stop() {
template<typename TaskT>
TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
const InArrayT& in, OutArrayT& out) {
TaskT* task = base::get_object<TaskT>();
TaskT* task = butil::get_object<TaskT>();
if (!task) {
LOG(FATAL) << "Failed get TaskT from object pool";
return TaskHandler<TaskT>::valid_handle();
......@@ -124,7 +120,7 @@ TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
int fds[2];
int rc = pipe(fds);
if (rc != 0) {
CFATAL_LOG("call pipe() failed, errno=%d:%m", errno);
LOG(FATAL) << "call pipe() failed, errno=" << errno << ":" << strerror(errno);
return TaskHandler<TaskT>::valid_handle();
}
......@@ -136,7 +132,7 @@ TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
task->out = &out;
task->rem = in.size();
task->size = in.size();
task->index.store(0, base::memory_order_relaxed);
task->index.store(0, butil::memory_order_relaxed);
AutoMutex lock(_mut);
_task_queue.push_back(task);
......@@ -153,7 +149,7 @@ bool TaskExecutor<TaskT>::fetch_batch(BatchTasks<TaskT>& batch) {
}
if (_task_queue.empty()) {
CFATAL_LOG("invalid task queue!");
LOG(FATAL) << "invalid task queue!";
return false;
}
......@@ -173,11 +169,11 @@ template<typename TaskT>
int TaskExecutor<TaskT>::work(ThreadContext<TaskT>* context) {
if (_thread_init_fn != NULL) {
if (_thread_init_fn(context->user_thread_context) != 0) {
CFATAL_LOG("execute thread init thunk failed, BSF thread will exit");
LOG(FATAL) << "execute thread init thunk failed, BSF thread will exit";
context->init_status = -1;
return -1;
} else {
CDEBUG_LOG("execute thread init thunk succeed");
LOG(INFO) << "execute thread init thunk succeed";
}
}
......@@ -185,7 +181,7 @@ int TaskExecutor<TaskT>::work(ThreadContext<TaskT>* context) {
while (!_stop) {
if (_thread_reset_fn != NULL) {
if (_thread_reset_fn(context->user_thread_context) != 0) {
CFATAL_LOG("execute user thread reset failed");
LOG(FATAL) << "execute user thread reset failed";
}
}
......@@ -209,7 +205,7 @@ bool TaskManager<InItemT, OutItemT>::schedule(const InArrayT& in,
_task_owned = handler;
return true;
} else {
CFATAL_LOG("failed to schedule task");
LOG(FATAL) << "failed to schedule task";
return false;
}
}
......
......@@ -5,7 +5,6 @@
#include <vector>
#include <deque>
#include <butil/atomicops.h>
#include <comlog/comlog.h>
#include "common/inner_common.h"
#include <boost/function.hpp>
......@@ -122,8 +121,7 @@ public:
void notify_tasks() {
if (_batch_out.size() != _batch_in.size()) {
CFATAL_LOG("batch size not consistency: %ld != %ld",
_batch_out.size(), _batch_in.size());
LOG(FATAL) << "batch size not consistency: " << _batch_out.size() << " != " << _batch_in.size();
return ;
}
......@@ -135,8 +133,7 @@ public:
for (size_t oi = begin; oi < end; ++oi, ++bi) {
if (bi >= _batch_in.size()) {
CFATAL_LOG("batch index overflow: %d > %d",
bi, _batch_in.size());
LOG(FATAL) << "batch index overflow: " << bi << " > " <<_batch_in.size();
return ;
}
(*task->out)[oi] = _batch_out[bi];
......@@ -313,10 +310,10 @@ private:
std::vector<ThreadContext<TaskT>*> _thread_contexts;
friend class TaskManager<InType, OutType>;
boost::function<void(const InArrayT&, OutArrayT&)> _fn;
size_t _batch_size;
bool _batch_align;
boost::function<void(const InArrayT&, OutArrayT&)> _fn;
};
template<typename InItemT, typename OutItemT>
......@@ -350,16 +347,6 @@ private:
TaskHandler<TaskT> _task_owned;
}; // class TaskManager
struct ComlogGuard {
ComlogGuard() {
com_openlog_r();
}
~ComlogGuard() {
com_closelog_r();
}
};
class AutoMutex {
public:
AutoMutex(THREAD_MUTEX_T& mut)
......
......@@ -137,18 +137,18 @@ int Dag::init(const comcfg::Configure& conf, const std::string& name) {
}
if (FLAGS_el_log_level == 16) {
LOG(INFO) << "DAG: " << _dag_name << noflush;
LOG(INFO) << "DAG: " << _dag_name;
LOG(INFO) << ", Op Num: " << _index_nodes.size();
for (uint32_t nid = 0; nid < _index_nodes.size(); nid++) {
DagNode* node = _index_nodes[nid];
LOG(INFO)
<< ", OP-" << node->id << "-" << node->name << "-"
<< node->type << noflush;
LOG(INFO) << " depends: " << node->depends.size() << noflush;
<< node->type;
LOG(INFO) << " depends: " << node->depends.size();
boost::unordered_map<std::string, EdgeMode>::iterator it;
for (it = node->depends.begin(); it != node->depends.end(); it++) {
LOG(INFO) << " " << it->first << " " << it->second << noflush;
LOG(INFO) << " " << it->first << " " << it->second;
}
}
LOG(INFO) << "";
......
#include "framework/dag_view.h"
#include <baidu/rpc/traceprintf.h> // TRACEPRINTF
#include <brpc/traceprintf.h> // TRACEPRINTF
#include "common/inner_common.h"
#include "framework/op_repository.h"
......@@ -156,7 +156,7 @@ const Channel* DagView::get_response_channel() const {
// Caller obtains response channel from bus, and
// writes it to rpc response(protbuf/json)
if (_view.size() < 1) {
LOG(FATAL) << "invalid empty view stage!" << noflush;
LOG(FATAL) << "invalid empty view stage!";
return NULL;
}
......@@ -165,7 +165,7 @@ const Channel* DagView::get_response_channel() const {
|| last_stage->nodes[0] == NULL) {
LOG(FATAL) << "Invalid last stage, size["
<< last_stage->nodes.size()
<< "] != 1" << noflush;
<< "] != 1";
return NULL;
}
......
#ifndef BAIDU_PADDLE_SERVING_PREDICTOR_INFER_H
#define BAIDU_PADDLE_SERVING_PREDICTOR_INFER_H
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include "common/inner_common.h"
#include "framework/infer_data.h"
#include "framework/factory.h"
......@@ -886,7 +889,7 @@ public:
InferEngine* engine = default_engine();
if (!engine) {
LOG(WARNING) << "fail to get default engine";
return NULL;
return -1;
}
return engine->infer(in, out, batch_size);
}
......
......@@ -86,14 +86,14 @@ struct Tensor {
bool valid() const {
if (shape.empty()) {
if (data.data() || data.size()) {
CFATAL_LOG("data should be empty");
LOG(FATAL) << "data should be empty";
return false;
}
return true;
}
if (!data.data() || !data.size()) {
CFATAL_LOG("data cannot empty");
LOG(FATAL) << "data cannot empty";
return false;
}
......@@ -103,10 +103,11 @@ struct Tensor {
}
if (byte_size * ele_byte() != data.size()) {
CFATAL_LOG("wrong data size: %ld vs. %ld",
byte_size * ele_byte(), data.size());
LOG(FATAL) << "wrong data size: " << byte_size * ele_byte() << " vs. " << data.size();
return false;
}
return true;
}
size_t shape0() {
......
#ifndef BAIDU_PADDLE_SERVING_PREDICTOR_LOGGER_H
#define BAIDU_PADDLE_SERVING_PREDICTOR_LOGGER_H
#include <base/comlog_sink.h>
#include "common/inner_common.h"
#include <butil/comlog_sink.h>
namespace baidu {
namespace paddle_serving {
......
#include "mc_cache.h"
#include <bvar/bvar.h> // bvar
namespace baidu {
namespace paddle_serving {
namespace predictor {
::bvar::Adder<int> g_mc_cache_seek_error_count("mc_cache_seek_error_count"); // 失败查询次数
::bvar::Window<::bvar::Adder<int> > g_mc_cache_seek_error_window(
"mc_cache_seek_error_window", &g_mc_cache_seek_error_count,
::bvar::FLAGS_bvar_dump_interval);
::bvar::Adder<int> g_mc_cache_seek_count("mc_cache_seek_count"); // 总查询次数
::bvar::Window<::bvar::Adder<int> > g_mc_cache_seek_window(
"mc_cache_seek_window", &g_mc_cache_seek_count,
::bvar::FLAGS_bvar_dump_interval);
float get_mc_cache_seek_error_percent(void*) {
if (g_mc_cache_seek_window.get_value() <= 0) {
return 0;
}
return g_mc_cache_seek_error_window.get_value()
/ (float) g_mc_cache_seek_window.get_value();
}
::bvar::PassiveStatus<float> g_mc_cache_seek_error_percent("mc_cache_seek_error_percent",
get_mc_cache_seek_error_percent, NULL);
McCache::McCache() {
_pcache = NULL;
_cache_unitsize = 0;
}
int McCache::initialize(uint32_t cache_capacity, uint32_t unitsize) {
_pcache = mc_creat_cache(cache_capacity, unitsize);
if (_pcache == NULL) {
LOG(ERROR) << "create mc_cache capacity[" << cache_capacity
<< "], unitsize[" << unitsize << "] failed.";
return -1;
}
_cache_unitsize = unitsize;
return 0;
}
int McCache::finalize() {
// 销毁cache结构
if (mc_destroy_cache(_pcache) == RT_NOTICE_NONE_PROCESSED) {
LOG(ERROR) << "input pcache[" << _pcache << "] destroy failed";
return -1;
}
return 0;
}
int McCache::add_item(uint32_t* sign, void* pdata) {
int ret = 0;
{
BAIDU_SCOPED_LOCK(_mutex);
ret = mc_additem(_pcache, sign, pdata, _cache_unitsize);
}
return (ret - 1);
}
int McCache::add_item(uint32_t* sign, void* pdata, uint32_t unitsize) {
CHECK_GT(_cache_unitsize, unitsize) << "input unitsize should < _cache_unitsize";
int ret = 0;
{
BAIDU_SCOPED_LOCK(_mutex);
ret = mc_additem(_pcache, sign, pdata, unitsize);
}
return (ret - 1);
}
int McCache::seek_item(uint32_t* sign, void* pdata) const {
int ret = 0;
{
BAIDU_SCOPED_LOCK(_mutex);
ret = mc_seekitem(_pcache, sign, pdata, _cache_unitsize);
}
g_mc_cache_seek_count << 1;
if (ret != RT_NOTICE_PROCESSED) {
g_mc_cache_seek_error_count << 1;
}
return (ret - 1);
}
int McCache::remove_item(uint32_t* sign) {
int ret = 0;
{
BAIDU_SCOPED_LOCK(_mutex);
ret = mc_removeitem(_pcache, sign);
}
if (ret != RT_NOTICE_PROCESSED) {
LOG(WARNING) << "remove item from cache failed, errno[" << ret
<< "], sign[" << *sign << "].";
return -1;
}
return 0;
}
const uint32_t McCache::get_cache_unitsize() {
return _cache_unitsize;
}
} // namespace predictor
} // namespace paddle_serving
} // namespace baidu
#ifndef BAIDU_PADDLE_SERVING_PREDICTOR_FRAMEWORK_MC_CACHE_H
#define BAIDU_PADDLE_SERVING_PREDICTOR_FRAMEWORK_MC_CACHE_H
#include <stdint.h>
#include <mc_cache.h> // mc_creat_cache
#include <base/scoped_lock.h> // BAIDU_SCOPED_LOCK
namespace baidu {
namespace paddle_serving {
namespace predictor {
class McCache {
public:
McCache();
int initialize(uint32_t cache_capacity, uint32_t unitsize);
int finalize();
int add_item(uint32_t* sign, void* pdata);
int add_item(uint32_t* sign, void* pdata, uint32_t unitsize);
int seek_item(uint32_t* sign, void* pdata) const;
int remove_item(uint32_t* sign);
const uint32_t get_cache_unitsize();
private:
mc_cache* _pcache; // cacheָ
uint32_t _cache_unitsize; // cacheԪС
mutable base::Mutex _mutex;
};
} // namespace predictor
} // namespace paddle_serving
} // namespace baidu
#endif // BAIDU_PADDLE_SERVING_PREDICTOR_FRAMEWORK_MC_CACHE_H
#include "predictor_metric.h"
#include "base/memory/singleton.h"
#include "butil/memory/singleton.h"
namespace baidu {
namespace paddle_serving {
......
......@@ -39,24 +39,6 @@ int Resource::initialize(const std::string& path, const std::string& file) {
}
LOG(WARNING) << "Successfully proc initialized mempool wrapper";
if (FLAGS_enable_mc_cache) {
_mc_cache = new (std::nothrow) McCache();
CHECK(_mc_cache != nullptr) << "failed to new McCache";
uint32_t cache_capacity = 0;
conf["cache_capacity"].get_uint32(&cache_capacity, DEFAULT_CACHE_CAPACITY);
LOG(INFO) << "cache_capacity[" << cache_capacity << "].";
uint32_t cache_unitsize = 0;
conf["cache_unitsize"].get_uint32(&cache_unitsize, DEFAULT_CACHE_UNITSIZE);
LOG(INFO) << "cache_unitsize[" << cache_unitsize << "].";
if (_mc_cache->initialize(cache_capacity, cache_unitsize) != 0) {
LOG(ERROR) << "init mc cache failed";
return -1;
}
LOG(INFO) << "mc cache proc_init success.";
}
if (FLAGS_enable_model_toolkit) {
int err = 0;
std::string model_toolkit_path = conf["model_toolkit_path"].to_cstr(&err);
......@@ -166,18 +148,6 @@ int Resource::reload() {
}
int Resource::finalize() {
if (FLAGS_enable_mc_cache && _mc_cache != NULL) {
if (_mc_cache->finalize() != 0) {
LOG(ERROR) << "failed to finalize mc cache";
delete _mc_cache;
_mc_cache = NULL;
return -1;
}
delete _mc_cache;
_mc_cache = NULL;
LOG(INFO) << "mc_cache finalize success";
}
if (FLAGS_enable_model_toolkit && InferManager::instance().proc_finalize() != 0) {
LOG(FATAL) << "Failed proc finalize infer manager";
return -1;
......
......@@ -22,7 +22,7 @@ struct DynamicResource {
class Resource {
public:
Resource() : {}
Resource() {}
~Resource() { finalize(); }
......
#include <baidu/rpc/policy/itp.h> // ItpAdaptor
#include <baidu/rpc/policy/nova_pbrpc_protocol.h> // NovaServiceAdaptor
#include <baidu/rpc/policy/public_pbrpc_protocol.h> // PublicPbrpcServiceAdaptor
#include <baidu/rpc/policy/nshead_mcpack_protocol.h> // NsheadMcpackAdaptor
#include <brpc/policy/nova_pbrpc_protocol.h> // NovaServiceAdaptor
#include <brpc/policy/public_pbrpc_protocol.h> // PublicPbrpcServiceAdaptor
#include <brpc/policy/nshead_mcpack_protocol.h> // NsheadMcpackAdaptor
#include "common/inner_common.h"
#include "framework/server.h"
#include "framework/service_manager.h"
......@@ -68,7 +67,7 @@ int ServerManager::start_and_wait() {
boost::unordered_map<std::string, Service*>::iterator it;
for (it = _format_services.begin(); it != _format_services.end();
it++) {
if (_server.AddService(it->second, baidu::rpc::SERVER_DOESNT_OWN_SERVICE)
if (_server.AddService(it->second, brpc::SERVER_DOESNT_OWN_SERVICE)
!= 0) {
LOG(ERROR) << "Failed to add service of format:"
<< it->first << "!";
......@@ -93,14 +92,12 @@ int ServerManager::start_and_wait() {
void ServerManager::_set_server_option_by_protocol(
const ::butil::StringPiece& protocol_type) {
std::string enabled_protocols = FLAGS_enable_protocol_list;
if (_compare_string_piece_without_case(protocol_type, "itp")) {
_options.nshead_service = new ::baidu::rpc::policy::ItpAdaptor;
} else if (_compare_string_piece_without_case(protocol_type, "nova_pbrpc")) {
_options.nshead_service = new ::baidu::rpc::policy::NovaServiceAdaptor;;
if (_compare_string_piece_without_case(protocol_type, "nova_pbrpc")) {
_options.nshead_service = new ::brpc::policy::NovaServiceAdaptor;;
} else if (_compare_string_piece_without_case(protocol_type, "public_pbrpc")) {
_options.nshead_service = new ::baidu::rpc::policy::PublicPbrpcServiceAdaptor;
_options.nshead_service = new ::brpc::policy::PublicPbrpcServiceAdaptor;
} else if (_compare_string_piece_without_case(protocol_type, "nshead_mcpack")) {
_options.nshead_service = new ::baidu::rpc::policy::NsheadMcpackAdaptor;
_options.nshead_service = new ::brpc::policy::NsheadMcpackAdaptor;
} else {
LOG(ERROR) << "fail to set nshead protocol, protocol_type[" << protocol_type << "].";
return;
......
......@@ -41,8 +41,8 @@ private:
void _set_server_option_by_protocol(const ::butil::StringPiece& protocol_type);
baidu::rpc::ServerOptions _options;
baidu::rpc::Server _server;
brpc::ServerOptions _options;
brpc::Server _server;
boost::unordered_map<std::string, Service*> _format_services;
THREAD_T _reload_thread;
static volatile bool _s_reload_starting;
......
......@@ -54,7 +54,7 @@ using baidu::paddle_serving::predictor::format::DenseInstance;
using baidu::paddle_serving::predictor::format::DensePrediction;
void send_dense_format(BuiltinDenseFormatService_Stub& stub, int log_id) {
baidu::rpc::Controller cntl;
brpc::Controller cntl;
// We will receive response synchronously, safe to put variables
// on stack.
......@@ -83,7 +83,7 @@ void send_dense_format(BuiltinDenseFormatService_Stub& stub, int log_id) {
}
if (FLAGS_compress) {
cntl.set_request_compress_type(baidu::rpc::COMPRESS_TYPE_SNAPPY);
cntl.set_request_compress_type(brpc::COMPRESS_TYPE_SNAPPY);
}
timer.check("compress");
......@@ -98,13 +98,13 @@ void send_dense_format(BuiltinDenseFormatService_Stub& stub, int log_id) {
LOG(INFO) << "Received response from " << cntl.remote_side()
<< " to " << cntl.local_side()
<< ": " << dense_response.ShortDebugString()
<< " latency=" << cntl.latency_us() << "us" << noflush;
<< " latency=" << cntl.latency_us() << "us";
} else {
LOG(INFO) << "Received response from " << cntl.remote_side()
<< " to " << cntl.local_side()
<< ": " << dense_response.ShortDebugString()
<< " (attached=" << cntl.response_attachment() << ")"
<< " latency=" << cntl.latency_us() << "us " << noflush;
<< " latency=" << cntl.latency_us() << "us ";
}
} else {
LOG(WARNING) << cntl.ErrorText();
......@@ -124,7 +124,7 @@ using baidu::paddle_serving::predictor::format::SparseInstance;
using baidu::paddle_serving::predictor::format::SparsePrediction;
void send_sparse_format(BuiltinSparseFormatService_Stub& stub, int log_id) {
baidu::rpc::Controller cntl;
brpc::Controller cntl;
// We will receive response synchronously, safe to put variables
// on stack.
......@@ -177,7 +177,7 @@ void send_sparse_format(BuiltinSparseFormatService_Stub& stub, int log_id) {
}
if (FLAGS_compress) {
cntl.set_request_compress_type(baidu::rpc::COMPRESS_TYPE_SNAPPY);
cntl.set_request_compress_type(brpc::COMPRESS_TYPE_SNAPPY);
}
timer.check("compress");
......@@ -192,13 +192,13 @@ void send_sparse_format(BuiltinSparseFormatService_Stub& stub, int log_id) {
LOG(INFO) << "Received response from " << cntl.remote_side()
<< " to " << cntl.local_side()
<< ": " << sparse_response.ShortDebugString()
<< " latency=" << cntl.latency_us() << "us" << noflush;
<< " latency=" << cntl.latency_us() << "us";
} else {
LOG(INFO) << "Received response from " << cntl.remote_side()
<< " to " << cntl.local_side()
<< ": " << sparse_response.ShortDebugString()
<< " (attached=" << cntl.response_attachment() << ")"
<< " latency=" << cntl.latency_us() << "us" << noflush;
<< " latency=" << cntl.latency_us() << "us";
}
} else {
LOG(WARNING) << cntl.ErrorText();
......@@ -217,7 +217,7 @@ using baidu::paddle_serving::predictor::format::Int64TensorInstance;
using baidu::paddle_serving::predictor::format::Float32TensorPredictor;
void send_fluid_format(BuiltinFluidService_Stub& stub, int log_id) {
baidu::rpc::Controller cntl;
brpc::Controller cntl;
// We will receive response synchronously, safe to put variables
// on stack.
......@@ -250,7 +250,7 @@ void send_fluid_format(BuiltinFluidService_Stub& stub, int log_id) {
}
if (FLAGS_compress) {
cntl.set_request_compress_type(baidu::rpc::COMPRESS_TYPE_SNAPPY);
cntl.set_request_compress_type(brpc::COMPRESS_TYPE_SNAPPY);
}
timer.check("compress");
......@@ -265,13 +265,13 @@ void send_fluid_format(BuiltinFluidService_Stub& stub, int log_id) {
LOG(INFO) << "Received response from " << cntl.remote_side()
<< " to " << cntl.local_side()
<< ": " << fluid_response.ShortDebugString()
<< " latency=" << cntl.latency_us() << "us" << noflush;
<< " latency=" << cntl.latency_us() << "us";
} else {
LOG(INFO) << "Received response from " << cntl.remote_side()
<< " to " << cntl.local_side()
<< ": " << fluid_response.ShortDebugString()
<< " (attached=" << cntl.response_attachment() << ")"
<< " latency=" << cntl.latency_us() << "us " << noflush;
<< " latency=" << cntl.latency_us() << "us ";
}
} else {
LOG(WARNING) << cntl.ErrorText();
......@@ -294,7 +294,7 @@ using baidu::paddle_serving::predictor::format::XImageReqInstance;
using baidu::paddle_serving::predictor::format::XImageResInstance;
void send_ximage_format(ImageClassifyService_Stub& stub, int log_id) {
baidu::rpc::Controller cntl;
brpc::Controller cntl;
// We will receive response synchronously, safe to put variables
// on stack.
......@@ -333,7 +333,7 @@ void send_ximage_format(ImageClassifyService_Stub& stub, int log_id) {
}
if (FLAGS_compress) {
cntl.set_request_compress_type(baidu::rpc::COMPRESS_TYPE_SNAPPY);
cntl.set_request_compress_type(brpc::COMPRESS_TYPE_SNAPPY);
}
timer.check("compress");
......@@ -347,13 +347,13 @@ void send_ximage_format(ImageClassifyService_Stub& stub, int log_id) {
LOG(INFO) << "Received response from " << cntl.remote_side()
<< " to " << cntl.local_side()
<< ": " << ximage_response.ShortDebugString()
<< " latency=" << cntl.latency_us() << "us" << noflush;
<< " latency=" << cntl.latency_us() << "us";
} else {
LOG(INFO) << "Received response from " << cntl.remote_side()
<< " to " << cntl.local_side()
<< ": " << ximage_response.ShortDebugString()
<< " (attached=" << cntl.response_attachment() << ")"
<< " latency=" << cntl.latency_us() << "us " << noflush;
<< " latency=" << cntl.latency_us() << "us ";
}
} else {
LOG(WARNING) << cntl.ErrorText();
......@@ -381,22 +381,22 @@ int main(int argc, char* argv[]) {
// Login to get `CredentialGenerator' (see baas-lib-c/baas.h for more
// information) and then pass it to `GianoAuthenticator'.
std::unique_ptr<baidu::rpc::policy::GianoAuthenticator> auth;
std::unique_ptr<brpc::policy::GianoAuthenticator> auth;
if (FLAGS_auth) {
if (baas::BAAS_Init() != 0) {
LOG(ERROR) << "Fail to init BAAS";
return -1;
}
baas::CredentialGenerator gen = baas::ClientUtility::Login(FLAGS_auth_group);
auth.reset(new baidu::rpc::policy::GianoAuthenticator(&gen, NULL));
auth.reset(new brpc::policy::GianoAuthenticator(&gen, NULL));
}
// A Channel represents a communication line to a Server. Notice that
// Channel is thread-safe and can be shared by all threads in your program.
baidu::rpc::Channel channel;
brpc::Channel channel;
// Initialize the channel, NULL means using default options.
baidu::rpc::ChannelOptions options;
brpc::ChannelOptions options;
options.protocol = FLAGS_protocol;
options.connection_type = FLAGS_connection_type;
options.auth = auth.get();
......@@ -423,7 +423,7 @@ int main(int argc, char* argv[]) {
// Send a request and wait for the response every 1 second.
int log_id = 0;
while (!baidu::rpc::IsAskedToQuit()) {
while (!brpc::IsAskedToQuit()) {
// We will receive response synchronously, safe to put variables
// on stack.
log_id++;
......
......@@ -103,7 +103,7 @@ public:
printer.Print("#include \"framework/service_manager.h\"\n");
}
if (generate_stub) {
printer.Print("#include <baidu/rpc/parallel_channel.h>\n");
printer.Print("#include <brpc/parallel_channel.h>\n");
printer.Print("#include \"factory.h\"\n");
printer.Print("#include \"stub.h\"\n");
printer.Print("#include \"stub_impl.h\"\n");
......@@ -251,9 +251,9 @@ private:
"output_name", google::protobuf::dots_to_colons(m->output_type()->full_name()));
if (m->name() == "inference") {
printer->Print(
" baidu::rpc::ClosureGuard done_guard(done);\n"
" baidu::rpc::Controller* cntl = \n"
" static_cast<baidu::rpc::Controller*>(cntl_base);\n"
" brpc::ClosureGuard done_guard(done);\n"
" brpc::Controller* cntl = \n"
" static_cast<brpc::Controller*>(cntl_base);\n"
" ::baidu::paddle_serving::predictor::InferService* svr = \n"
" ::baidu::paddle_serving::predictor::InferServiceManager::instance().item(\"$service$\");\n"
" if (svr == NULL) {\n"
......@@ -261,10 +261,10 @@ private:
" cntl->SetFailed(404, \"Not found service: $service$\");\n"
" return ;\n"
" }\n"
" LOG(NOTICE) << \" remote_side=\[\" << cntl->remote_side() << \"\]\" << noflush;\n"
" LOG(NOTICE) << \" local_side=\[\" << cntl->local_side() << \"\]\" << noflush;\n"
" LOG(NOTICE) << \" service_name=\[\" << \"$name$\" << \"\]\" << noflush;\n"
" LOG(NOTICE) << \" log_id=\[\" << cntl->log_id() << \"\]\" << noflush;\n"
" LOG(NOTICE) << \" remote_side=\[\" << cntl->remote_side() << \"\]\";\n"
" LOG(NOTICE) << \" local_side=\[\" << cntl->local_side() << \"\]\";\n"
" LOG(NOTICE) << \" service_name=\[\" << \"$name$\" << \"\]\";\n"
" LOG(NOTICE) << \" log_id=\[\" << cntl->log_id() << \"\]\";\n"
" int err_code = svr->inference(request, response);\n"
" if (err_code != 0) {\n"
" LOG(WARNING)\n"
......@@ -280,9 +280,9 @@ private:
}
if (m->name() == "debug") {
printer->Print(
" baidu::rpc::ClosureGuard done_guard(done);\n"
" baidu::rpc::Controller* cntl = \n"
" static_cast<baidu::rpc::Controller*>(cntl_base);\n"
" brpc::ClosureGuard done_guard(done);\n"
" brpc::Controller* cntl = \n"
" static_cast<brpc::Controller*>(cntl_base);\n"
" ::baidu::paddle_serving::predictor::InferService* svr = \n"
" ::baidu::paddle_serving::predictor::InferServiceManager::instance().item(\"$service$\");\n"
" if (svr == NULL) {\n"
......@@ -290,11 +290,11 @@ private:
" cntl->SetFailed(404, \"Not found service: $service$\");\n"
" return ;\n"
" }\n"
" LOG(NOTICE) << \" remote_side=\[\" << cntl->remote_side() << \"\]\" << noflush;\n"
" LOG(NOTICE) << \" local_side=\[\" << cntl->local_side() << \"\]\" << noflush;\n"
" LOG(NOTICE) << \" service_name=\[\" << \"$name$\" << \"\]\" << noflush;\n"
" LOG(NOTICE) << \" log_id=\[\" << cntl->log_id() << \"\]\" << noflush;\n"
" base::IOBufBuilder debug_os;\n"
" LOG(NOTICE) << \" remote_side=\[\" << cntl->remote_side() << \"\]\";\n"
" LOG(NOTICE) << \" local_side=\[\" << cntl->local_side() << \"\]\";\n"
" LOG(NOTICE) << \" service_name=\[\" << \"$name$\" << \"\]\";\n"
" LOG(NOTICE) << \" log_id=\[\" << cntl->log_id() << \"\]\";\n"
" butil::IOBufBuilder debug_os;\n"
" int err_code = svr->inference(request, response, &debug_os);\n"
" if (err_code != 0) {\n"
" LOG(WARNING)\n"
......@@ -329,7 +329,7 @@ private:
const std::string& service_name,
const std::string& class_name) const {
printer->Print(
"class $name$_StubCallMapper : public baidu::rpc::CallMapper {\n"
"class $name$_StubCallMapper : public brpc::CallMapper {\n"
"private:\n"
" uint32_t _package_size;\n"
" baidu::paddle_serving::sdk_cpp::Stub* _stub_handler;\n"
......@@ -342,7 +342,7 @@ private:
"}\n", "name", class_name);
printer->Print(
"baidu::rpc::SubCall default_map(\n"
"brpc::SubCall default_map(\n"
" int channel_index,\n"
" const google::protobuf::MethodDescriptor* method,\n"
" const google::protobuf::Message* request,\n"
......@@ -361,7 +361,7 @@ private:
"}\n");
printer->Print(
"baidu::rpc::SubCall sub_package_map(\n"
"brpc::SubCall sub_package_map(\n"
" int channel_index,\n"
" const google::protobuf::MethodDescriptor* method,\n"
" const google::protobuf::Message* request,\n"
......@@ -404,7 +404,7 @@ private:
"}\n");
printer->Print(
"baidu::rpc::SubCall Map(\n"
"brpc::SubCall Map(\n"
" int channel_index,\n"
" const google::protobuf::MethodDescriptor* method,\n"
" const google::protobuf::Message* request,\n"
......@@ -418,15 +418,15 @@ private:
"return default_map(channel_index, method, request, response);\n");
} else {
printer->Print(
"base::Timer tt(base::Timer::STARTED);\n"
"baidu::rpc::SubCall ret;\n"
"butil::Timer tt(butil::Timer::STARTED);\n"
"brpc::SubCall ret;\n"
"if (_package_size == 0) {\n"
" ret = default_map(channel_index, method, request, response);\n"
"} else {\n"
" ret = sub_package_map(channel_index, method, request, response);\n"
"}\n"
"tt.stop();\n"
"if (ret.flags != baidu::rpc::SKIP_SUB_CHANNEL && ret.method != NULL) {\n"
"if (ret.flags != brpc::SKIP_SUB_CHANNEL && ret.method != NULL) {\n"
" _stub_handler->update_latency(tt.u_elapsed(), \"pack_map\");\n"
"}\n"
"return ret;\n");
......@@ -440,7 +440,7 @@ private:
////////////////////////////////////////////////////////////////
printer->Print(
"class $name$_StubResponseMerger : public baidu::rpc::ResponseMerger {\n"
"class $name$_StubResponseMerger : public brpc::ResponseMerger {\n"
"private:\n"
" uint32_t _package_size;\n"
" baidu::paddle_serving::sdk_cpp::Stub* _stub_handler;\n"
......@@ -453,7 +453,7 @@ private:
"}\n", "name", class_name);
printer->Print(
"baidu::rpc::ResponseMerger::Result default_merge(\n"
"brpc::ResponseMerger::Result default_merge(\n"
" google::protobuf::Message* response,\n"
" const google::protobuf::Message* sub_response) {\n"
" baidu::paddle_serving::sdk_cpp::TracePackScope scope(\"default_merge\");",
......@@ -468,7 +468,7 @@ private:
"}\n");
printer->Print(
"baidu::rpc::ResponseMerger::Result sub_package_merge(\n"
"brpc::ResponseMerger::Result sub_package_merge(\n"
" google::protobuf::Message* response,\n"
" const google::protobuf::Message* sub_response) {\n"
" baidu::paddle_serving::sdk_cpp::TracePackScope scope(\"sub_merge\");",
......@@ -483,21 +483,21 @@ private:
"}\n");
printer->Print(
"baidu::rpc::ResponseMerger::Result Merge(\n"
"brpc::ResponseMerger::Result Merge(\n"
" google::protobuf::Message* response,\n"
" const google::protobuf::Message* sub_response) {\n",
"name", class_name);
printer->Indent();
printer->Print(
"base::Timer tt(base::Timer::STARTED);\n"
"baidu::rpc::ResponseMerger::Result ret;"
"butil::Timer tt(butil::Timer::STARTED);\n"
"brpc::ResponseMerger::Result ret;"
"if (_package_size <= 0) {\n"
" ret = default_merge(response, sub_response);\n"
"} else {\n"
" ret = sub_package_merge(response, sub_response);\n"
"}\n"
"tt.stop();\n"
"if (ret != baidu::rpc::ResponseMerger::FAIL) {\n"
"if (ret != brpc::ResponseMerger::FAIL) {\n"
" _stub_handler->update_latency(tt.u_elapsed(), \"pack_merge\");\n"
"}\n"
"return ret;\n");
......@@ -516,7 +516,7 @@ private:
const std::string& class_name) const {
printer->Print(
"if (channel_index > 0) { \n"
" return baidu::rpc::SubCall::Skip();\n"
" return brpc::SubCall::Skip();\n"
"}\n");
printer->Print(
"google::protobuf::Message* cur_res = _stub_handler->fetch_response();\n"
......@@ -526,14 +526,14 @@ private:
" if (cur_res == NULL) {\n"
" LOG(FATAL) << \"Failed new response item!\";\n"
" _stub_handler->update_average(1, \"pack_fail\");\n"
" return baidu::rpc::SubCall::Bad();\n"
" return brpc::SubCall::Bad();\n"
" }\n"
" return baidu::rpc::SubCall(method, request, cur_res, baidu::rpc::DELETE_RESPONSE);\n"
" return brpc::SubCall(method, request, cur_res, brpc::DELETE_RESPONSE);\n"
"}\n");
"LOG(INFO) \n"
" << \"[default] Succ map, channel_index: \" << channel_index;\n";
printer->Print(
"return baidu::rpc::SubCall(method, request, cur_res, 0);\n"
"return brpc::SubCall(method, request, cur_res, 0);\n"
);
return true;
}
......@@ -546,11 +546,11 @@ private:
printer->Print(
"try {\n"
" response->MergeFrom(*sub_response);\n"
" return baidu::rpc::ResponseMerger::MERGED;\n"
" return brpc::ResponseMerger::MERGED;\n"
"} catch (const std::exception& e) {\n"
" LOG(FATAL) << \"Merge failed.\";\n"
" _stub_handler->update_average(1, \"pack_fail\");\n"
" return baidu::rpc::ResponseMerger::FAIL;\n"
" return brpc::ResponseMerger::FAIL;\n"
"}\n");
return true;
}
......@@ -593,7 +593,7 @@ private:
printer->Print(
"int start = _package_size * channel_index;\n"
"if (start >= total_size) {\n"
" return baidu::rpc::SubCall::Skip();\n"
" return brpc::SubCall::Skip();\n"
"}\n"
"int end = _package_size * (channel_index + 1);\n"
"if (end > total_size) {\n"
......@@ -605,7 +605,7 @@ private:
"if (sub_req == NULL) {\n"
" LOG(FATAL) << \"failed fetch sub_req from stub.\";\n"
" _stub_handler->update_average(1, \"pack_fail\");\n"
" return baidu::rpc::SubCall::Bad();\n"
" return brpc::SubCall::Bad();\n"
"}\n",
"name", class_name, "req_type", google::protobuf::dots_to_colons(
md->input_type()->full_name()));
......@@ -617,7 +617,7 @@ private:
" << total_size << \"!=\" << req->$field_name$_size()\n"
" << \", field: $field_name$.\";\n"
" _stub_handler->update_average(1, \"pack_fail\");\n"
" return baidu::rpc::SubCall::Bad();\n"
" return brpc::SubCall::Bad();\n"
"}\n", "field_name", field_name);
}
......@@ -645,7 +645,7 @@ private:
" if (!sub_req) {\n"
" LOG(FATAL) << \"failed fetch sub_req from stub handler.\";\n"
" _stub_handler->update_average(1, \"pack_fail\");\n"
" return baidu::rpc::SubCall::Bad();\n"
" return brpc::SubCall::Bad();\n"
" }\n"
"}\n", "req_type", google::protobuf::dots_to_colons(
md->input_type()->full_name()));
......@@ -683,9 +683,9 @@ private:
"if (sub_res == NULL) {\n"
" LOG(FATAL) << \"failed create sub_res from res.\";\n"
" _stub_handler->update_average(1, \"pack_fail\");\n"
" return baidu::rpc::SubCall::Bad();\n"
" return brpc::SubCall::Bad();\n"
"}\n"
"return baidu::rpc::SubCall(method, sub_req, sub_res, 0);\n");
"return brpc::SubCall(method, sub_req, sub_res, 0);\n");
return true;
}
bool generate_paddle_serving_stub_package_merger(
......
......@@ -18,7 +18,7 @@ namespace baidu {
namespace paddle_serving {
namespace unittest {
base::atomic<size_t> global_id;
butil::atomic<size_t> global_id;
void TestItem::auto_gen() {
id = global_id.fetch_add(1);
......@@ -37,7 +37,7 @@ void work(const std::vector<TestItem>& in, std::vector<TestItem>& out) {
TEST_F(TestBsf, test_single_thread) {
// initialize TaskExecutor
global_id.store(0, base::memory_order_relaxed);
global_id.store(0, butil::memory_order_relaxed);
im::bsf::TaskExecutor<im::bsf::Task<TestItem, TestItem> >::instance()->set_thread_callback_fn(
boost::bind(&work, _1, _2));
EXPECT_EQ((im::bsf::TaskExecutor<im::bsf::Task<TestItem, TestItem> >::instance()->start(1)), 0);
......@@ -67,7 +67,7 @@ TEST_F(TestBsf, test_single_thread) {
TEST_F(TestBsf, test_multi_thread) {
// initialize TaskExecutor
global_id.store(0, base::memory_order_relaxed);
global_id.store(0, butil::memory_order_relaxed);
im::bsf::TaskExecutor<im::bsf::Task<TestItem, TestItem> >::instance()->set_thread_callback_fn(
boost::bind(&work, _1, _2));
im::bsf::TaskExecutor<im::bsf::Task<TestItem, TestItem> >::instance()->set_batch_size(100);
......
#include <base/files/temp_file.h>
#include <butil/files/temp_file.h>
#include "framework/manager.h"
#include "framework/service.h"
#include "framework/dag.h"
......@@ -246,7 +246,7 @@ TEST_F(TestOP, test_op_with_channel_and_conf) {
Dag dag;
std::string op_name = "test_op";
std::string name_in_conf = "test_name_in_conf";
base::TempFile dag_conf;
butil::TempFile dag_conf;
dag_conf.save_format(
"[@Node]\n"
"name: %s\n"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册