Unverified commit cf515d90, authored by Chitsing KUI, committed by GitHub

[LAUNCH] no endpoints env in dynamic mode (#54636)

* no endpoints in dy mode

* fix fleet api inconsistent
Parent 855650ec
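The change removes the hard dependency on PADDLE_TRAINER_ENDPOINTS in dynamic (dygraph) mode: the launcher now exports the endpoint list only when it is reasonably small, and world-size queries fall back to the initialized global group instead of the endpoint-derived worker count. A minimal sketch of the user-facing flow this targets (standard paddle.distributed calls, not code from this commit):

```python
# Minimal sketch, assuming a dygraph run started with the Paddle launcher:
# the endpoints env var may be absent, so world size comes from the global group.
import paddle.distributed as dist

dist.init_parallel_env()                # no longer requires PADDLE_TRAINER_ENDPOINTS
world_size = dist.get_world_size()      # resolved from the initialized global group
rank = dist.get_rank()
print(f"rank {rank} of {world_size}")
```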
@@ -272,20 +272,7 @@ class Fleet:
         self.strategy_compiler = StrategyCompiler()
-        if self._role_maker._is_non_distributed() and self._is_collective:
-            if paddle.framework.core.is_compiled_with_cuda():
-                gpus_num = paddle.framework.core.get_cuda_device_count()
-                if gpus_num != 1:
-                    raise ValueError(
-                        "CUDA_VISIBLE_DEVICES shoule be set only 1 card if you use `python` to launch fleet program."
-                    )
         if in_dynamic_mode():
-            if self.worker_num() == 1:
-                # if worker_num is 1, should construct default topology & hcg
-                self._topology = tp.CommunicateTopology()
-                self._hcg = tp.HybridCommunicateGroup(self._topology)
-                return
             if parallel_helper._is_parallel_ctx_initialized():
                 logger.warning(
                     "The dygraph parallel environment has been initialized."
...
@@ -85,7 +85,7 @@ def distributed_model(model):
     fleet_env = fleet.fleet
     assert model is not None, "model should not be None"
-    if fleet_env.worker_num() <= 1:
+    if paddle.distributed.get_world_size() <= 1:
         return model
     amp_enable = False
...
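The single-process check in distributed_model now goes through paddle.distributed.get_world_size() rather than fleet_env.worker_num(), so both APIs agree on what "not distributed" means. A hedged sketch of that path (the Linear layer is only an illustrative model):

```python
# Sketch: with world size <= 1, fleet.distributed_model is expected to
# return the model unchanged instead of wrapping it for data parallelism.
import paddle
from paddle.distributed import fleet

fleet.init(is_collective=True)
model = paddle.nn.Linear(4, 4)
model = fleet.distributed_model(model)  # returned as-is in a single-process run
```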
@@ -120,12 +120,14 @@ class CollectiveController(Controller):
                 "PADDLE_LOCAL_RANK": f"{i}",
                 "PADDLE_NNODES": f"{len(ips)}",
                 # compatible env
-                "PADDLE_TRAINER_ENDPOINTS": ",".join(job_endpoints),
                 "PADDLE_CURRENT_ENDPOINT": job_endpoints[i + rank_offset],
                 "PADDLE_TRAINER_ID": f"{i + rank_offset}",
                 "PADDLE_TRAINERS_NUM": f"{len(job_endpoints)}",
                 "PADDLE_RANK_IN_NODE": str(i),
             }
+            if len(",".join(job_endpoints)) < 120 * 1024:
+                e.update({"PADDLE_TRAINER_ENDPOINTS": ",".join(job_endpoints)})
             if self._tuner_run_mode is not None:
                 e.update(
                     {
@@ -213,12 +215,14 @@ class CollectiveController(Controller):
                 "PADDLE_LOCAL_RANK": f"{i}",
                 "PADDLE_NNODES": f"{self.job.replicas}",
                 # compatible env
-                "PADDLE_TRAINER_ENDPOINTS": ",".join(job_endpoints),
                 "PADDLE_CURRENT_ENDPOINT": endpoints[i],
                 "PADDLE_TRAINER_ID": f"{i + rank_offset}",
                 "PADDLE_TRAINERS_NUM": f"{global_size}",
                 "PADDLE_RANK_IN_NODE": str(i),
             }
+            if len(",".join(job_endpoints)) < 120 * 1024:
+                e.update({"PADDLE_TRAINER_ENDPOINTS": ",".join(job_endpoints)})
             if self._tuner_run_mode is not None:
                 e.update(
                     {
...
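In both controller paths above, PADDLE_TRAINER_ENDPOINTS is now exported only when the comma-joined endpoint list stays under 120 * 1024 characters, presumably to keep very large jobs from hitting per-variable environment size limits. A stand-alone sketch of the same guard (the helper name is made up for illustration):

```python
# Hypothetical helper mirroring the guard added above: only export the
# endpoint list when the joined string is shorter than 120 KB.
def maybe_export_endpoints(env, job_endpoints):
    joined = ",".join(job_endpoints)
    if len(joined) < 120 * 1024:
        env["PADDLE_TRAINER_ENDPOINTS"] = joined
    return env

env = maybe_export_endpoints({}, [f"10.0.0.{i}:6170" for i in range(4)])
print(env)
```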
@@ -38,7 +38,11 @@ from paddle.distributed.collective import (
     _set_group_map_by_name,
     _valid_backend_list,
 )
-from paddle.distributed.communication.group import _add_new_group
+from paddle.distributed.communication.group import (
+    _add_new_group,
+    _get_global_group,
+    is_initialized,
+)
 from paddle.distributed.fleet.base.private_helper_function import (  # noqa: F401
     wait_server_ready,
 )
@@ -1017,7 +1021,6 @@ def init_parallel_env():
     _check_var_exists("PADDLE_TRAINER_ID")
     _check_var_exists("PADDLE_CURRENT_ENDPOINT")
     _check_var_exists("PADDLE_TRAINERS_NUM")
-    _check_var_exists("PADDLE_TRAINER_ENDPOINTS")
     # NOTE(chenweihang): [ why config global place here? ]
     # the dygraph mode will be set to default mode,
@@ -1242,6 +1245,10 @@ def get_world_size(group=None):
             print("The world_size is %d" % dist.get_world_size())
             # The world_size is 1
     """
+    if in_dynamic_mode() and (group is None):
+        if is_initialized():
+            group = _get_global_group()
+
     if in_dynamic_mode() and group:
         return group.world_size
...
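With the new block, get_world_size in dynamic mode resolves the group in this order: an explicitly passed group, otherwise the initialized global group, otherwise the legacy environment-based path. A short usage sketch:

```python
# Sketch: after init_parallel_env, the default get_world_size() call no longer
# depends on PADDLE_TRAINER_ENDPOINTS; it reads the global group instead.
import paddle.distributed as dist

dist.init_parallel_env()
assert dist.get_world_size() >= 1  # resolved from the global group, not env
```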
@@ -37,7 +37,7 @@ class TestInitParallelEnv(unittest.TestCase):
     def test_check_env_failed(self):
         os.environ['FLAGS_selected_gpus'] = '0'
         os.environ['PADDLE_TRAINER_ID'] = '0'
-        os.environ['PADDLE_CURRENT_ENDPOINT'] = '127.0.0.1:6170'
+        # os.environ['PADDLE_CURRENT_ENDPOINT'] = '127.0.0.1:6170'
         os.environ['PADDLE_TRAINERS_NUM'] = '2'
         with self.assertRaises(ValueError):
             dist.init_parallel_env()
...
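The test still expects a ValueError, but the variable left unset is now PADDLE_CURRENT_ENDPOINT rather than PADDLE_TRAINER_ENDPOINTS, which init_parallel_env no longer checks. A hedged reproduction outside the test harness:

```python
# Sketch: with PADDLE_CURRENT_ENDPOINT unset in a multi-trainer setup,
# init_parallel_env should still fail its environment check.
import os
import paddle.distributed as dist

os.environ["FLAGS_selected_gpus"] = "0"
os.environ["PADDLE_TRAINER_ID"] = "0"
os.environ.pop("PADDLE_CURRENT_ENDPOINT", None)
os.environ["PADDLE_TRAINERS_NUM"] = "2"

try:
    dist.init_parallel_env()
except ValueError as exc:
    print("missing env detected:", exc)
```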