未验证 提交 25845a28 编写于 作者: T TeslaZhao 提交者: GitHub

Merge pull request #49 from PaddlePaddle/develop

Develop
...@@ -21,6 +21,15 @@ ...@@ -21,6 +21,15 @@
#include <string> #include <string>
#include "core/predictor/common/inner_common.h" #include "core/predictor/common/inner_common.h"
#include "core/predictor/framework/op_repository.h" #include "core/predictor/framework/op_repository.h"
#ifdef BCLOUD
#include <base/atomicops.h>
#else
#include <butil/atomicops.h>
#endif
#include <errno.h>
#include "core/predictor/framework/resource.h"
using baidu::paddle_serving::predictor::Resource;
namespace baidu { namespace baidu {
namespace paddle_serving { namespace paddle_serving {
...@@ -238,6 +247,77 @@ const Channel* DagView::get_response_channel(const uint64_t log_id) const { ...@@ -238,6 +247,77 @@ const Channel* DagView::get_response_channel(const uint64_t log_id) const {
return last_op->mutable_channel(); return last_op->mutable_channel();
} }
void* call_back(void* ori_args) {
Resource::instance().thread_initialize();
Args* args = (Args*)ori_args;
Op* op = static_cast<Op*>(args->_op);
uint64_t log_id = static_cast<uint64_t>(args->_log_id);
bool debug = static_cast<bool>(args->_debug);
args->errcode = op->process(log_id, debug);
return nullptr;
}
int ParallelDagView::execute_one_stage(ViewStage* vstage,
const uint64_t log_id,
butil::IOBufBuilder* debug_os) {
butil::Timer stage_time(butil::Timer::STARTED);
uint32_t node_size = vstage->nodes.size();
std::vector<THREAD_T> tids(node_size);
Args* args = new Args[node_size];
VLOG(2) << "(logid=" << log_id << ") vstage->nodes.size(): " << node_size;
for (uint32_t ni = 0; ni < node_size; ni++) {
ViewNode* vnode = vstage->nodes[ni];
DagNode* conf = vnode->conf;
Op* op = vnode->op;
TRACEPRINTF(
"(logid=%" PRIu64 ") start to execute op[%s]", log_id, op->name());
args[ni]._op = op;
args[ni]._log_id = log_id;
args[ni]._debug = (debug_os != NULL);
int rc = THREAD_CREATE(&tids[ni], NULL, call_back, (void*)(args + ni));
if (rc != 0) {
LOG(ERROR) << "failed to create ParallelDagView worker thread: index="
<< ni << ", rc=" << rc << ", errno=" << errno << ":"
<< strerror(errno);
delete[] args;
return -1;
}
}
for (uint32_t ni = 0; ni < node_size; ni++) {
THREAD_JOIN(tids[ni], NULL);
int errcode = args[ni].errcode;
Op* op = args[ni]._op;
TRACEPRINTF(
"(logid=%" PRIu64 ") finish to execute op[%s]", log_id, op->name());
if (errcode < 0) {
LOG(ERROR) << "(logid=" << log_id
<< ") Execute failed, Op:" << op->debug_string();
delete[] args;
return errcode;
}
if (errcode > 0) {
LOG(INFO) << "(logid=" << log_id
<< ") Execute ignore, Op:" << op->debug_string();
continue;
}
if (debug_os) {
(*debug_os) << "(logid=" << log_id << ") {\"op_name\": \"" << op->name()
<< "\", \"debug_str:\": \"" << op->debug_string()
<< "\", \"time_info\": \"" << op->time_info() << "\"}";
}
// LOG(DEBUG) << "Execute succ, Op:" << op->debug_string();
}
stage_time.stop();
PredictorMetric::GetInstance()->update_latency_metric(
STAGE_METRIC_PREFIX + vstage->full_name, stage_time.u_elapsed());
delete[] args;
return ERR_OK;
}
} // namespace predictor } // namespace predictor
} // namespace paddle_serving } // namespace paddle_serving
} // namespace baidu } // namespace baidu
...@@ -24,7 +24,7 @@ namespace baidu { ...@@ -24,7 +24,7 @@ namespace baidu {
namespace paddle_serving { namespace paddle_serving {
namespace predictor { namespace predictor {
class Op; // class Op;
struct ViewNode { struct ViewNode {
Op* op; // op->full_name == service_workflow_stageindex_opname Op* op; // op->full_name == service_workflow_stageindex_opname
...@@ -75,11 +75,20 @@ class DagView { ...@@ -75,11 +75,20 @@ class DagView {
Bus* _bus; Bus* _bus;
}; };
struct Args {
Op* _op;
uint64_t _log_id;
bool _debug;
int errcode;
};
// The derived DagView supports parallel execution // The derived DagView supports parallel execution
// strategy, by implments the execute_one_stage(). // strategy, by implments the execute_one_stage().
class ParallelDagView : public DagView { class ParallelDagView : public DagView {
public: public:
int execute_one_stage(ViewStage* vstage, butil::IOBufBuilder*) { return 0; } virtual int execute_one_stage(ViewStage* vstage,
const uint64_t log_id,
butil::IOBufBuilder* debug_os);
}; };
} // namespace predictor } // namespace predictor
......
...@@ -909,7 +909,7 @@ function ocr_c++_service() { ...@@ -909,7 +909,7 @@ function ocr_c++_service() {
cp -r ocr_det_client/ ./ocr_det_client_cp cp -r ocr_det_client/ ./ocr_det_client_cp
rm -rf ocr_det_client rm -rf ocr_det_client
mv ocr_det_client_cp ocr_det_client mv ocr_det_client_cp ocr_det_client
sed -i "s/feed_type: 1/feed_type: 3/g" ocr_det_client/serving_client_conf.prototxt sed -i "s/feed_type: 1/feed_type: 20/g" ocr_det_client/serving_client_conf.prototxt
sed -i "s/shape: 3/shape: 1/g" ocr_det_client/serving_client_conf.prototxt sed -i "s/shape: 3/shape: 1/g" ocr_det_client/serving_client_conf.prototxt
sed -i '7,8d' ocr_det_client/serving_client_conf.prototxt sed -i '7,8d' ocr_det_client/serving_client_conf.prototxt
echo -e "${GREEN_COLOR}OCR_C++_Service_GPU_RPC server started${RES}" echo -e "${GREEN_COLOR}OCR_C++_Service_GPU_RPC server started${RES}"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册