diff --git a/core/predictor/framework/dag_view.cpp b/core/predictor/framework/dag_view.cpp index 29a4e97378c20d6f9caae8a97de7dc5f714960e9..64383514f604688a085097a7ec0043c91f25a9b4 100644 --- a/core/predictor/framework/dag_view.cpp +++ b/core/predictor/framework/dag_view.cpp @@ -21,6 +21,15 @@ #include #include "core/predictor/common/inner_common.h" #include "core/predictor/framework/op_repository.h" +#ifdef BCLOUD +#include +#else +#include +#endif +#include + +#include "core/predictor/framework/resource.h" +using baidu::paddle_serving::predictor::Resource; namespace baidu { namespace paddle_serving { @@ -238,6 +247,77 @@ const Channel* DagView::get_response_channel(const uint64_t log_id) const { return last_op->mutable_channel(); } +void* call_back(void* ori_args) { + Resource::instance().thread_initialize(); + Args* args = (Args*)ori_args; + Op* op = static_cast(args->_op); + uint64_t log_id = static_cast(args->_log_id); + bool debug = static_cast(args->_debug); + args->errcode = op->process(log_id, debug); + return nullptr; +} + +int ParallelDagView::execute_one_stage(ViewStage* vstage, + const uint64_t log_id, + butil::IOBufBuilder* debug_os) { + butil::Timer stage_time(butil::Timer::STARTED); + uint32_t node_size = vstage->nodes.size(); + std::vector tids(node_size); + Args* args = new Args[node_size]; + VLOG(2) << "(logid=" << log_id << ") vstage->nodes.size(): " << node_size; + for (uint32_t ni = 0; ni < node_size; ni++) { + ViewNode* vnode = vstage->nodes[ni]; + DagNode* conf = vnode->conf; + Op* op = vnode->op; + TRACEPRINTF( + "(logid=%" PRIu64 ") start to execute op[%s]", log_id, op->name()); + + args[ni]._op = op; + args[ni]._log_id = log_id; + args[ni]._debug = (debug_os != NULL); + int rc = THREAD_CREATE(&tids[ni], NULL, call_back, (void*)(args + ni)); + if (rc != 0) { + LOG(ERROR) << "failed to create ParallelDagView worker thread: index=" + << ni << ", rc=" << rc << ", errno=" << errno << ":" + << strerror(errno); + delete[] args; + return -1; + } + } + for (uint32_t ni = 0; ni < node_size; ni++) { + THREAD_JOIN(tids[ni], NULL); + int errcode = args[ni].errcode; + Op* op = args[ni]._op; + TRACEPRINTF( + "(logid=%" PRIu64 ") finish to execute op[%s]", log_id, op->name()); + if (errcode < 0) { + LOG(ERROR) << "(logid=" << log_id + << ") Execute failed, Op:" << op->debug_string(); + delete[] args; + return errcode; + } + + if (errcode > 0) { + LOG(INFO) << "(logid=" << log_id + << ") Execute ignore, Op:" << op->debug_string(); + continue; + } + + if (debug_os) { + (*debug_os) << "(logid=" << log_id << ") {\"op_name\": \"" << op->name() + << "\", \"debug_str:\": \"" << op->debug_string() + << "\", \"time_info\": \"" << op->time_info() << "\"}"; + } + + // LOG(DEBUG) << "Execute succ, Op:" << op->debug_string(); + } + stage_time.stop(); + PredictorMetric::GetInstance()->update_latency_metric( + STAGE_METRIC_PREFIX + vstage->full_name, stage_time.u_elapsed()); + delete[] args; + return ERR_OK; +} + } // namespace predictor } // namespace paddle_serving } // namespace baidu diff --git a/core/predictor/framework/dag_view.h b/core/predictor/framework/dag_view.h index 8ba9d224c577b475d0a52b79e92f72bd1abaa187..5b12238396e359069fff2ce288eba3724ca0a764 100644 --- a/core/predictor/framework/dag_view.h +++ b/core/predictor/framework/dag_view.h @@ -24,7 +24,7 @@ namespace baidu { namespace paddle_serving { namespace predictor { -class Op; +// class Op; struct ViewNode { Op* op; // op->full_name == service_workflow_stageindex_opname @@ -75,11 +75,20 @@ class DagView { Bus* _bus; }; +struct Args { + Op* _op; + uint64_t _log_id; + bool _debug; + int errcode; +}; + // The derived DagView supports parallel execution // strategy, by implments the execute_one_stage(). class ParallelDagView : public DagView { public: - int execute_one_stage(ViewStage* vstage, butil::IOBufBuilder*) { return 0; } + virtual int execute_one_stage(ViewStage* vstage, + const uint64_t log_id, + butil::IOBufBuilder* debug_os); }; } // namespace predictor diff --git a/core/predictor/framework/service.cpp b/core/predictor/framework/service.cpp old mode 100644 new mode 100755 diff --git a/core/predictor/framework/service.h b/core/predictor/framework/service.h old mode 100644 new mode 100755