diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.cc b/paddle/fluid/distributed/fleet_executor/message_bus.cc index 23e1b2a31d88b14e391ee15f9121c671304295c9..4a41f69411836e1154df23fa3c1d33dd0327ab24 100644 --- a/paddle/fluid/distributed/fleet_executor/message_bus.cc +++ b/paddle/fluid/distributed/fleet_executor/message_bus.cc @@ -111,8 +111,7 @@ void MessageBus::ListenPort() { #if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ !defined(PADDLE_WITH_ASCEND_CL) // function keep listen the port and handle the message - InterceptorMessageServiceImpl interceptor_message_service; - PADDLE_ENFORCE_EQ(server_.AddService(&interceptor_message_service, + PADDLE_ENFORCE_EQ(server_.AddService(&interceptor_message_service_, brpc::SERVER_DOESNT_OWN_SERVICE), 0, platform::errors::Unavailable( "Message bus: init brpc service error.")); diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.h b/paddle/fluid/distributed/fleet_executor/message_bus.h index 9212a93df425fe7e279b7ef83a6eb4ed96b773ca..03bb7ed81a0c78b1645529a88def18cf4922c1ed 100644 --- a/paddle/fluid/distributed/fleet_executor/message_bus.h +++ b/paddle/fluid/distributed/fleet_executor/message_bus.h @@ -89,6 +89,7 @@ class MessageBus final { #if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ !defined(PADDLE_WITH_ASCEND_CL) + InterceptorMessageServiceImpl interceptor_message_service_; // brpc server brpc::Server server_; #endif diff --git a/paddle/fluid/distributed/fleet_executor/runtime_graph.cc b/paddle/fluid/distributed/fleet_executor/runtime_graph.cc index c9455ffef492d5d25e9179832e7f452e7cfe6c2c..3a76bd43f9d55be8e5ac6dc6caa1d3008e3687e4 100644 --- a/paddle/fluid/distributed/fleet_executor/runtime_graph.cc +++ b/paddle/fluid/distributed/fleet_executor/runtime_graph.cc @@ -142,7 +142,7 @@ void RuntimeGraph::SplitProgramBasedFunctionality(const ProgramDesc& program) { int pipeline_stage = coord.pp_idx; int64_t num_pipeline_stages = exe_desc_.pp_degree(); // TODO(fleet_executor dev): start up steps should be a config `num_slots` - int64_t start_up_steps = num_pipeline_stages - pipeline_stage - 1; + int64_t start_up_steps = num_pipeline_stages - pipeline_stage; int64_t num_micro_batches = exe_desc_.num_micro_batches(); int64_t task_id = cur_rank * functionality_order.size(); for (std::size_t i = 0; i < functionality_order.size(); ++i) {