未验证 提交 10d8d6b6 编写于 作者: Y Yuang Liu 提交者: GitHub

[fleet_executor] fix message bus bug (#37507)

上级 7de99d8c
...@@ -111,8 +111,7 @@ void MessageBus::ListenPort() { ...@@ -111,8 +111,7 @@ void MessageBus::ListenPort() {
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ #if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL) !defined(PADDLE_WITH_ASCEND_CL)
// function keep listen the port and handle the message // function keep listen the port and handle the message
InterceptorMessageServiceImpl interceptor_message_service; PADDLE_ENFORCE_EQ(server_.AddService(&interceptor_message_service_,
PADDLE_ENFORCE_EQ(server_.AddService(&interceptor_message_service,
brpc::SERVER_DOESNT_OWN_SERVICE), brpc::SERVER_DOESNT_OWN_SERVICE),
0, platform::errors::Unavailable( 0, platform::errors::Unavailable(
"Message bus: init brpc service error.")); "Message bus: init brpc service error."));
......
...@@ -89,6 +89,7 @@ class MessageBus final { ...@@ -89,6 +89,7 @@ class MessageBus final {
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ #if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL) !defined(PADDLE_WITH_ASCEND_CL)
InterceptorMessageServiceImpl interceptor_message_service_;
// brpc server // brpc server
brpc::Server server_; brpc::Server server_;
#endif #endif
......
...@@ -142,7 +142,7 @@ void RuntimeGraph::SplitProgramBasedFunctionality(const ProgramDesc& program) { ...@@ -142,7 +142,7 @@ void RuntimeGraph::SplitProgramBasedFunctionality(const ProgramDesc& program) {
int pipeline_stage = coord.pp_idx; int pipeline_stage = coord.pp_idx;
int64_t num_pipeline_stages = exe_desc_.pp_degree(); int64_t num_pipeline_stages = exe_desc_.pp_degree();
// TODO(fleet_executor dev): start up steps should be a config `num_slots` // TODO(fleet_executor dev): start up steps should be a config `num_slots`
int64_t start_up_steps = num_pipeline_stages - pipeline_stage - 1; int64_t start_up_steps = num_pipeline_stages - pipeline_stage;
int64_t num_micro_batches = exe_desc_.num_micro_batches(); int64_t num_micro_batches = exe_desc_.num_micro_batches();
int64_t task_id = cur_rank * functionality_order.size(); int64_t task_id = cur_rank * functionality_order.size();
for (std::size_t i = 0; i < functionality_order.size(); ++i) { for (std::size_t i = 0; i < functionality_order.size(); ++i) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册