From 9171aaa076e45e4d7843073fc7c65cb5aac7d8a2 Mon Sep 17 00:00:00 2001 From: WangXi Date: Wed, 29 Dec 2021 09:56:10 +0800 Subject: [PATCH] [fleet_executor] remove SetCreatingFlag (#38539) --- paddle/fluid/distributed/fleet_executor/carrier.h | 1 - paddle/fluid/distributed/fleet_executor/task_loop.h | 3 +++ paddle/fluid/distributed/fleet_executor/task_loop_thread.h | 4 ++++ .../fluid/distributed/fleet_executor/task_loop_thread_pool.h | 4 ++++ .../fleet_executor/test/compute_interceptor_run_op_test.cc | 2 -- .../fleet_executor/test/compute_interceptor_test.cc | 2 -- .../fleet_executor/test/interceptor_ping_pong_test.cc | 1 - .../test/interceptor_ping_pong_with_brpc_test.cc | 2 -- .../test/interceptor_pipeline_long_path_test.cc | 2 -- .../test/interceptor_pipeline_short_path_test.cc | 2 -- 10 files changed, 11 insertions(+), 12 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/carrier.h b/paddle/fluid/distributed/fleet_executor/carrier.h index 81643a7455..5b7275416f 100644 --- a/paddle/fluid/distributed/fleet_executor/carrier.h +++ b/paddle/fluid/distributed/fleet_executor/carrier.h @@ -73,7 +73,6 @@ class Carrier final { Interceptor* SetInterceptor(int64_t interceptor_id, std::unique_ptr); - void SetCreatingFlag(bool flag) {} void SetMsgBus(const std::shared_ptr& msg_bus) { msg_bus_ = msg_bus; } diff --git a/paddle/fluid/distributed/fleet_executor/task_loop.h b/paddle/fluid/distributed/fleet_executor/task_loop.h index 91425304e5..ddf8d292d4 100644 --- a/paddle/fluid/distributed/fleet_executor/task_loop.h +++ b/paddle/fluid/distributed/fleet_executor/task_loop.h @@ -21,6 +21,7 @@ #include #include "paddle/fluid/framework/blocking_queue.h" +#include "paddle/fluid/platform/macros.h" namespace paddle { namespace distributed { @@ -66,6 +67,8 @@ class TaskLoop { } private: + DISABLE_COPY_AND_ASSIGN(TaskLoop); + void AbortNotInLoopThread(); static thread_local TaskLoop* thread_local_loop_; diff --git a/paddle/fluid/distributed/fleet_executor/task_loop_thread.h b/paddle/fluid/distributed/fleet_executor/task_loop_thread.h index 07952abdc2..ad5e99a5de 100644 --- a/paddle/fluid/distributed/fleet_executor/task_loop_thread.h +++ b/paddle/fluid/distributed/fleet_executor/task_loop_thread.h @@ -18,6 +18,8 @@ #include #include +#include "paddle/fluid/platform/macros.h" + namespace paddle { namespace distributed { @@ -31,6 +33,8 @@ class TaskLoopThread { TaskLoop* StartLoop(); private: + DISABLE_COPY_AND_ASSIGN(TaskLoopThread); + void Loop(); bool start_; diff --git a/paddle/fluid/distributed/fleet_executor/task_loop_thread_pool.h b/paddle/fluid/distributed/fleet_executor/task_loop_thread_pool.h index ffc9588f4e..559a83ef5a 100644 --- a/paddle/fluid/distributed/fleet_executor/task_loop_thread_pool.h +++ b/paddle/fluid/distributed/fleet_executor/task_loop_thread_pool.h @@ -17,6 +17,8 @@ #include #include +#include "paddle/fluid/platform/macros.h" + namespace paddle { namespace distributed { @@ -37,6 +39,8 @@ class TaskLoopThreadPool { std::vector GetAllLoops(); private: + DISABLE_COPY_AND_ASSIGN(TaskLoopThreadPool); + bool start_; int thread_num_; std::vector> threads_; diff --git a/paddle/fluid/distributed/fleet_executor/test/compute_interceptor_run_op_test.cc b/paddle/fluid/distributed/fleet_executor/test/compute_interceptor_run_op_test.cc index 2e0a12b424..b14ca5fc46 100644 --- a/paddle/fluid/distributed/fleet_executor/test/compute_interceptor_run_op_test.cc +++ b/paddle/fluid/distributed/fleet_executor/test/compute_interceptor_run_op_test.cc @@ -84,8 +84,6 @@ TEST(ComputeInterceptor, Compute) { a->SetPlace(place); a->SetMicroBatchScope(scopes); - carrier.SetCreatingFlag(false); - // start InterceptorMessage msg; msg.set_message_type(DATA_IS_READY); diff --git a/paddle/fluid/distributed/fleet_executor/test/compute_interceptor_test.cc b/paddle/fluid/distributed/fleet_executor/test/compute_interceptor_test.cc index 47f4cf0c04..5b1c0de6f9 100644 --- a/paddle/fluid/distributed/fleet_executor/test/compute_interceptor_test.cc +++ b/paddle/fluid/distributed/fleet_executor/test/compute_interceptor_test.cc @@ -69,8 +69,6 @@ TEST(ComputeInterceptor, Compute) { carrier.SetInterceptor(1, InterceptorFactory::Create("Compute", 1, node_b)); carrier.SetInterceptor(2, InterceptorFactory::Create("Compute", 2, node_c)); - carrier.SetCreatingFlag(false); - InterceptorMessage msg; msg.set_message_type(DATA_IS_READY); // test run three times diff --git a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc index 639e16a94a..37f13dabb0 100644 --- a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc +++ b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_test.cc @@ -68,7 +68,6 @@ TEST(InterceptorTest, PingPong) { 0, InterceptorFactory::Create("PingPong", 0, nullptr)); carrier.SetInterceptor(1, std::make_unique(1, nullptr)); - carrier.SetCreatingFlag(false); InterceptorMessage msg; a->Send(1, msg); diff --git a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_with_brpc_test.cc b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_with_brpc_test.cc index 262e5caa8c..16e40de774 100644 --- a/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_with_brpc_test.cc +++ b/paddle/fluid/distributed/fleet_executor/test/interceptor_ping_pong_with_brpc_test.cc @@ -113,7 +113,6 @@ TEST(InterceptorTest, PingPong) { if (pid == 0) { Carrier* carrier = FleetExecutor::CreateCarrier(0, interceptor_id_to_rank); - carrier->SetCreatingFlag(false); auto msg_bus = std::make_shared(); carrier->SetMsgBus(msg_bus); // NOTE: need Init msg_bus after carrier SetMsgBus @@ -128,7 +127,6 @@ TEST(InterceptorTest, PingPong) { } else { Carrier* carrier = FleetExecutor::CreateCarrier(1, interceptor_id_to_rank); - carrier->SetCreatingFlag(false); auto msg_bus = std::make_shared(); carrier->SetMsgBus(msg_bus); msg_bus->Init(1, {{0, ip0}, {1, ip1}}, ip1); diff --git a/paddle/fluid/distributed/fleet_executor/test/interceptor_pipeline_long_path_test.cc b/paddle/fluid/distributed/fleet_executor/test/interceptor_pipeline_long_path_test.cc index b203617738..0e902f3d74 100644 --- a/paddle/fluid/distributed/fleet_executor/test/interceptor_pipeline_long_path_test.cc +++ b/paddle/fluid/distributed/fleet_executor/test/interceptor_pipeline_long_path_test.cc @@ -80,8 +80,6 @@ TEST(AmplifierInterceptor, Amplifier) { carrier.SetInterceptor(4, InterceptorFactory::Create("Amplifier", 4, node_e)); carrier.SetInterceptor(5, InterceptorFactory::Create("Compute", 5, node_f)); - carrier.SetCreatingFlag(false); - // start InterceptorMessage msg; msg.set_message_type(DATA_IS_READY); diff --git a/paddle/fluid/distributed/fleet_executor/test/interceptor_pipeline_short_path_test.cc b/paddle/fluid/distributed/fleet_executor/test/interceptor_pipeline_short_path_test.cc index 68ee054e76..d84b909eec 100644 --- a/paddle/fluid/distributed/fleet_executor/test/interceptor_pipeline_short_path_test.cc +++ b/paddle/fluid/distributed/fleet_executor/test/interceptor_pipeline_short_path_test.cc @@ -96,8 +96,6 @@ TEST(AmplifierInterceptor, Amplifier) { carrier.SetInterceptor(2, InterceptorFactory::Create("Compute", 2, node_c)); carrier.SetInterceptor(3, InterceptorFactory::Create("Amplifier", 3, node_d)); - carrier.SetCreatingFlag(false); - // start InterceptorMessage msg; msg.set_message_type(DATA_IS_READY); -- GitLab