Unverified commit 640f8cf0, authored by lilong12, committed by GitHub

[Cherry-pick] Disable gloo by default #29559 #29805 (#29601)

* update, test=develop (#29559)

* Disable gloo by default (#29805)

* update, test=develop

* update, test=develop
Parent 38f83788
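
The net effect of this cherry-pick is that the gloo CPU communication context is no longer brought up automatically: the launchers and init_parallel_env now read PADDLE_WITH_GLOO with a default of "0", and the role maker leaves the server/all gloo contexts uninitialized. A minimal sketch of opting back in, assuming a build that contains this patch (setting the variable in the training script itself is just one way to propagate it to every trainer):

    # Minimal sketch, assuming this patched Paddle build: gloo is now opt-in,
    # so export PADDLE_WITH_GLOO=1 before parallel initialization if the
    # CPU-side gloo context is still needed.
    import os
    os.environ["PADDLE_WITH_GLOO"] = "1"   # the new default is "0"

    import paddle.distributed as dist
    dist.init_parallel_env()               # starts the gloo HTTP store only when the flag is "1"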
@@ -272,8 +272,7 @@ void GlooWrapper::Init() {
   attr.iface = iface_;
   std::shared_ptr<gloo::rendezvous::HdfsStore> file_store = nullptr;
   std::shared_ptr<gloo::rendezvous::HTTPStore> http_store = nullptr;
-  auto context =
-      std::make_shared<gloo::rendezvous::ParallelConnectContext>(rank_, size_);
+  auto context = std::make_shared<gloo::rendezvous::Context>(rank_, size_);
   context->setTimeout(run_timeout_);
   auto dev = gloo::transport::tcp::CreateDevice(attr);
   switch (store_type_) {
@@ -295,6 +294,7 @@ void GlooWrapper::Init() {
       http_store->SetTimeoutSeconds(init_timeout_.count());
       context->connectFullMesh(*http_store, dev);
       http_store->Finalize();
+      VLOG(3) << "after calling http_store->Finalize.";
       break;
     }
     default:
@@ -304,6 +304,7 @@ void GlooWrapper::Init() {
   context_ = std::move(context);
 #endif
   is_initialized_ = true;
+  VLOG(3) << "gloo initialized done.";
 }
 template std::vector<int64_t> GlooWrapper::AllReduce<int64_t>(
...
@@ -220,15 +220,8 @@ class Gloo(object):
             rank, nodes = self._get_rank_nodes(Role.WORKER)
             gloo = init(rank, nodes, "WORKER")
             self._worker_comm = gloo
-        else:
-            rank, nodes = self._get_rank_nodes(Role.SERVER)
-            gloo = init(rank, nodes, "SERVER")
-            self._server_comm = gloo
-        if self._need_init_all:
-            rank, nodes = self._get_rank_nodes(Role.ALL)
-            gloo = init(rank, nodes, "ALL")
-            self._nodes_comm = gloo
+        # TODO (sandyhouse): initialize gloo for server and all

         if start_http_server:
             http_server_d["running"] = False
             http_server.join()
...
@@ -219,7 +219,7 @@ def launch_collective(args):
     global_envs = copy.copy(os.environ.copy())
     gloo_rendezvous_dir = tempfile.mkdtemp()
     # add gloo env
-    global_envs["PADDLE_WITH_GLOO"] = str(os.getenv("PADDLE_WITH_GLOO", "1"))
+    global_envs["PADDLE_WITH_GLOO"] = str(os.getenv("PADDLE_WITH_GLOO", "0"))
     global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3"
     global_envs["PADDLE_GLOO_FS_PATH"] = gloo_rendezvous_dir
...
@@ -955,7 +955,7 @@ class ParameterServerLauncher(object):
                 "TRAINING_ROLE": "PSERVER",
                 "PADDLE_TRAINERS_NUM": str(self.worker_num),
                 "POD_IP": cur_server.endpoint.split(":")[0],
-                "PADDLE_WITH_GLOO": str(os.getenv("PADDLE_WITH_GLOO", "1")),
+                "PADDLE_WITH_GLOO": str(os.getenv("PADDLE_WITH_GLOO", "0")),
                 "PADDLE_GLOO_RENDEZVOUS": "3",
                 "PADDLE_GLOO_FS_PATH": self.gloo_rendezvous_dir,
                 "PADDLE_GLOO_HTTP_ENDPOINT": self.http_port
@@ -1019,7 +1019,7 @@ class ParameterServerLauncher(object):
                 self.heter_worker_endpoints,
                 "TRAINING_ROLE": "TRAINER",
                 "PADDLE_TRAINER_ID": str(cur_worker.rank),
-                "PADDLE_WITH_GLOO": str(os.getenv("PADDLE_WITH_GLOO", "1")),
+                "PADDLE_WITH_GLOO": str(os.getenv("PADDLE_WITH_GLOO", "0")),
                 "PADDLE_GLOO_RENDEZVOUS": "3",
                 "PADDLE_GLOO_FS_PATH": self.gloo_rendezvous_dir,
                 "FLAGS_selected_gpus": "0",
@@ -1089,7 +1089,7 @@ class ParameterServerLauncher(object):
                 "TRAINING_ROLE": "HETER_TRAINER",
                 "PADDLE_TRAINERS_NUM": str(self.worker_num),
                 "POD_IP": cur_heter_worker.endpoint.split(":")[0],
-                "PADDLE_WITH_GLOO": str(os.getenv("PADDLE_WITH_GLOO", "1")),
+                "PADDLE_WITH_GLOO": str(os.getenv("PADDLE_WITH_GLOO", "0")),
                 "PADDLE_GLOO_RENDEZVOUS": "3",
                 "PADDLE_GLOO_FS_PATH": self.gloo_rendezvous_dir,
                 "FLAGS_selected_gpus": "0",
...
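All three launcher roles (PSERVER, TRAINER, HETER_TRAINER) now inherit PADDLE_WITH_GLOO from the launcher's own environment with a default of "0", so restoring the previous behaviour only requires exporting the variable before launching. A minimal sketch, assuming a fleet job started through paddle.distributed.launch (the training script name is a placeholder):

    # Minimal sketch, assuming the standard launch module; the launcher copies
    # PADDLE_WITH_GLOO from its own environment, so exporting "1" here restores
    # the pre-change behaviour for every spawned role.
    import os
    import subprocess

    env = dict(os.environ, PADDLE_WITH_GLOO="1")   # launcher default is now "0"
    subprocess.run(
        ["python", "-m", "paddle.distributed.launch", "train.py"],
        env=env,
        check=True)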
@@ -142,21 +142,23 @@ def init_parallel_env():
     _check_var_exists("PADDLE_TRAINER_ENDPOINTS")

     # 3: init gloo context (step 1: httpsever start)
-    ep_rank_0 = parallel_env.trainer_endpoints[0].split(":")
-    ep_rank = parallel_env.trainer_endpoints[parallel_env.rank].split(":")
-    manager = Manager()
-    # glboal dict to store status
-    http_server_d = manager.dict()
-    http_server_d["running"] = False
-    if parallel_env.rank == 0:
-        # The scope for worker used by http server is '_worker'
-        size = {'_worker': parallel_env.world_size}
-        http_server = Process(
-            target=_start_kv_server,
-            args=(int(ep_rank_0[1]), http_server_d, size))
-        http_server.daemon = True
-        http_server_d["running"] = True
-        http_server.start()
+    init_gloo = int(os.getenv("PADDLE_WITH_GLOO", "0"))
+    if init_gloo:
+        ep_rank_0 = parallel_env.trainer_endpoints[0].split(":")
+        ep_rank = parallel_env.trainer_endpoints[parallel_env.rank].split(":")
+        manager = Manager()
+        # glboal dict to store status
+        http_server_d = manager.dict()
+        http_server_d["running"] = False
+        if parallel_env.rank == 0:
+            # The scope for worker used by http server is '_worker'
+            size = {'_worker': parallel_env.world_size}
+            http_server = Process(
+                target=_start_kv_server,
+                args=(int(ep_rank_0[1]), http_server_d, size))
+            http_server.daemon = True
+            http_server_d["running"] = True
+            http_server.start()

     # 4. init NCCL ParallelStrategy
     strategy = ParallelStrategy()
@@ -185,22 +187,23 @@ def init_parallel_env():
     # dividing init_gloo into two part beacause nccl and gloo
     # are separately looking for free ports which sometimes
     # leads to port-conflict.
-    wait_server_ready([parallel_env.trainer_endpoints[0]])
-
-    gloo_strategy = core.GlooParallelStrategy()
-    gloo_strategy.rank = parallel_env.rank
-    gloo_strategy.rank_num = parallel_env.world_size
-    gloo_strategy.ip_address = ep_rank_0[0]
-    gloo_strategy.ip_port = int(ep_rank_0[1])
-    default_init_timeout_seconds = 3600
-    default_run_timeout_seconds = 9999999
-    gloo_strategy.init_seconds = default_init_timeout_seconds
-    gloo_strategy.run_seconds = default_run_timeout_seconds
-    gloo = core.GlooParallelContext(gloo_strategy)
-    gloo.init()
-    if parallel_env.rank == 0:
-        http_server_d["running"] = False
-        http_server.join()
+    if init_gloo:
+        wait_server_ready([parallel_env.trainer_endpoints[0]])
+
+        gloo_strategy = core.GlooParallelStrategy()
+        gloo_strategy.rank = parallel_env.rank
+        gloo_strategy.rank_num = parallel_env.world_size
+        gloo_strategy.ip_address = ep_rank_0[0]
+        gloo_strategy.ip_port = int(ep_rank_0[1])
+        default_init_timeout_seconds = 3600
+        default_run_timeout_seconds = 9999999
+        gloo_strategy.init_seconds = default_init_timeout_seconds
+        gloo_strategy.run_seconds = default_run_timeout_seconds
+        gloo = core.GlooParallelContext(gloo_strategy)
+        gloo.init()
+        if parallel_env.rank == 0:
+            http_server_d["running"] = False
+            http_server.join()

 def get_rank():
...
@@ -169,6 +169,7 @@ class TestDistBase(unittest.TestCase):
                    path_id="0",
                    check_error_log=False,
                    need_envs={}):
+        with_gloo = '0' if backend == "nccl" else '1'
         required_envs = {
             "FLAGS_fraction_of_gpu_memory_to_use": "0.15",
             "FLAGS_eager_delete_tensor_gb": "0.0",
@@ -178,6 +179,7 @@ class TestDistBase(unittest.TestCase):
             "LD_PRELOAD": os.getenv("LD_PRELOAD", ""),
             "GLOG_v": "0",
             "NCCL_P2P_DISABLE": "1",
+            "PADDLE_WITH_GLOO": with_gloo,
             "BACKEND": backend,
             "PATH_ID": path_id
         }
...
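The collective API tests keep gloo enabled only when the backend under test actually needs it: NCCL runs get PADDLE_WITH_GLOO="0", everything else (e.g. the gloo backend) keeps "1". A self-contained sketch of that toggle (the helper name build_required_envs is made up for illustration; the real test base builds a larger env dict):

    # Minimal sketch of the per-backend toggle added to the test base.
    def build_required_envs(backend, path_id="0"):
        with_gloo = '0' if backend == "nccl" else '1'
        return {
            "PADDLE_WITH_GLOO": with_gloo,
            "BACKEND": backend,
            "PATH_ID": path_id,
        }

    print(build_required_envs("nccl"))  # gloo stays off for NCCL runs
    print(build_required_envs("gloo"))  # gloo-backend tests still opt in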