diff --git a/paddle/fluid/distributed/fleet_executor/carrier.h b/paddle/fluid/distributed/fleet_executor/carrier.h index 0c54201c94034f4ceaca1ba720dce22a81fe417d..f9411aa73fad49daa11cf0573c44664b6b01933b 100644 --- a/paddle/fluid/distributed/fleet_executor/carrier.h +++ b/paddle/fluid/distributed/fleet_executor/carrier.h @@ -75,6 +75,11 @@ class Carrier final { bool IsInit() const; + // NOTE: This mutex will be used in interceptor's RunOps function. + // This mutex is used for avoiding forward ops and backward ops run + // simultaneously, which will lead to a random hang for some sync ops. + std::mutex run; + DISABLE_COPY_AND_ASSIGN(Carrier); private: diff --git a/paddle/fluid/distributed/fleet_executor/compute_interceptor.cc b/paddle/fluid/distributed/fleet_executor/compute_interceptor.cc index 35905125a0a430e4f30d32ac367cb268722f9c85..98583de84e7ea9c0fef595b097e7fe9116142c81 100644 --- a/paddle/fluid/distributed/fleet_executor/compute_interceptor.cc +++ b/paddle/fluid/distributed/fleet_executor/compute_interceptor.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "paddle/fluid/distributed/fleet_executor/compute_interceptor.h" +#include "paddle/fluid/distributed/fleet_executor/carrier.h" #include "paddle/fluid/distributed/fleet_executor/task_node.h" #include "paddle/fluid/framework/executor_gc_helper.h" @@ -169,6 +170,8 @@ void ComputeInterceptor::ReplyCompletedToUpStream() { } void ComputeInterceptor::RunOps() { + Carrier& carrier_instance = Carrier::Instance(); + std::unique_lock lock(carrier_instance.run); VLOG(3) << "ComputeInterceptor " << interceptor_id_ << " running ops for the " << step_ + 1 << " time."; for (auto op : node_->ops()) {