未验证 提交 17e655f8 编写于 作者: Z Zhang Yulong 提交者: GitHub

Merge branch 'develop' into ci-test

...@@ -181,7 +181,7 @@ int GeneralDistKVInferOp::inference() { ...@@ -181,7 +181,7 @@ int GeneralDistKVInferOp::inference() {
continue; continue;
} }
VLOG(2) << "(logid=" << log_id << ") key: " << keys[cube_val_idx] << " , cube value len:" << cur_val->buff.size(); VLOG(2) << "(logid=" << log_id << ") key: " << keys[cube_val_idx] << " , cube value len:" << cur_val->buff.size();
memcpy(data_ptr, cur_val->buff.data(), cur_val->buff.size()); memcpy(data_ptr, cur_val->buff.data() + 10, sizeof(float) * EMBEDDING_SIZE);
//VLOG(3) << keys[cube_val_idx] << ":" << data_ptr[0] << ", " << data_ptr[1] << ", " <<data_ptr[2] << ", " <<data_ptr[3] << ", " <<data_ptr[4] << ", " <<data_ptr[5] << ", " <<data_ptr[6] << ", " <<data_ptr[7] << ", " <<data_ptr[8]; //VLOG(3) << keys[cube_val_idx] << ":" << data_ptr[0] << ", " << data_ptr[1] << ", " <<data_ptr[2] << ", " <<data_ptr[3] << ", " <<data_ptr[4] << ", " <<data_ptr[5] << ", " <<data_ptr[6] << ", " <<data_ptr[7] << ", " <<data_ptr[8];
++cube_key_found; ++cube_key_found;
++cube_val_idx; ++cube_val_idx;
......
...@@ -21,6 +21,15 @@ ...@@ -21,6 +21,15 @@
#include <string> #include <string>
#include "core/predictor/common/inner_common.h" #include "core/predictor/common/inner_common.h"
#include "core/predictor/framework/op_repository.h" #include "core/predictor/framework/op_repository.h"
#ifdef BCLOUD
#include <base/atomicops.h>
#else
#include <butil/atomicops.h>
#endif
#include <errno.h>
#include "core/predictor/framework/resource.h"
using baidu::paddle_serving::predictor::Resource;
namespace baidu { namespace baidu {
namespace paddle_serving { namespace paddle_serving {
...@@ -238,6 +247,77 @@ const Channel* DagView::get_response_channel(const uint64_t log_id) const { ...@@ -238,6 +247,77 @@ const Channel* DagView::get_response_channel(const uint64_t log_id) const {
return last_op->mutable_channel(); return last_op->mutable_channel();
} }
void* call_back(void* ori_args) {
Resource::instance().thread_initialize();
Args* args = (Args*)ori_args;
Op* op = static_cast<Op*>(args->_op);
uint64_t log_id = static_cast<uint64_t>(args->_log_id);
bool debug = static_cast<bool>(args->_debug);
args->errcode = op->process(log_id, debug);
return nullptr;
}
int ParallelDagView::execute_one_stage(ViewStage* vstage,
const uint64_t log_id,
butil::IOBufBuilder* debug_os) {
butil::Timer stage_time(butil::Timer::STARTED);
uint32_t node_size = vstage->nodes.size();
std::vector<THREAD_T> tids(node_size);
Args* args = new Args[node_size];
VLOG(2) << "(logid=" << log_id << ") vstage->nodes.size(): " << node_size;
for (uint32_t ni = 0; ni < node_size; ni++) {
ViewNode* vnode = vstage->nodes[ni];
DagNode* conf = vnode->conf;
Op* op = vnode->op;
TRACEPRINTF(
"(logid=%" PRIu64 ") start to execute op[%s]", log_id, op->name());
args[ni]._op = op;
args[ni]._log_id = log_id;
args[ni]._debug = (debug_os != NULL);
int rc = THREAD_CREATE(&tids[ni], NULL, call_back, (void*)(args + ni));
if (rc != 0) {
LOG(ERROR) << "failed to create ParallelDagView worker thread: index="
<< ni << ", rc=" << rc << ", errno=" << errno << ":"
<< strerror(errno);
delete[] args;
return -1;
}
}
for (uint32_t ni = 0; ni < node_size; ni++) {
THREAD_JOIN(tids[ni], NULL);
int errcode = args[ni].errcode;
Op* op = args[ni]._op;
TRACEPRINTF(
"(logid=%" PRIu64 ") finish to execute op[%s]", log_id, op->name());
if (errcode < 0) {
LOG(ERROR) << "(logid=" << log_id
<< ") Execute failed, Op:" << op->debug_string();
delete[] args;
return errcode;
}
if (errcode > 0) {
LOG(INFO) << "(logid=" << log_id
<< ") Execute ignore, Op:" << op->debug_string();
continue;
}
if (debug_os) {
(*debug_os) << "(logid=" << log_id << ") {\"op_name\": \"" << op->name()
<< "\", \"debug_str:\": \"" << op->debug_string()
<< "\", \"time_info\": \"" << op->time_info() << "\"}";
}
// LOG(DEBUG) << "Execute succ, Op:" << op->debug_string();
}
stage_time.stop();
PredictorMetric::GetInstance()->update_latency_metric(
STAGE_METRIC_PREFIX + vstage->full_name, stage_time.u_elapsed());
delete[] args;
return ERR_OK;
}
} // namespace predictor } // namespace predictor
} // namespace paddle_serving } // namespace paddle_serving
} // namespace baidu } // namespace baidu
...@@ -24,7 +24,7 @@ namespace baidu { ...@@ -24,7 +24,7 @@ namespace baidu {
namespace paddle_serving { namespace paddle_serving {
namespace predictor { namespace predictor {
class Op; // class Op;
struct ViewNode { struct ViewNode {
Op* op; // op->full_name == service_workflow_stageindex_opname Op* op; // op->full_name == service_workflow_stageindex_opname
...@@ -75,11 +75,20 @@ class DagView { ...@@ -75,11 +75,20 @@ class DagView {
Bus* _bus; Bus* _bus;
}; };
struct Args {
Op* _op;
uint64_t _log_id;
bool _debug;
int errcode;
};
// The derived DagView supports parallel execution // The derived DagView supports parallel execution
// strategy, by implments the execute_one_stage(). // strategy, by implments the execute_one_stage().
class ParallelDagView : public DagView { class ParallelDagView : public DagView {
public: public:
int execute_one_stage(ViewStage* vstage, butil::IOBufBuilder*) { return 0; } virtual int execute_one_stage(ViewStage* vstage,
const uint64_t log_id,
butil::IOBufBuilder* debug_os);
}; };
} // namespace predictor } // namespace predictor
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册