diff --git a/python/paddle/device/__init__.py b/python/paddle/device/__init__.py index d3bcd56db7415ffbdade9b4815c89e7bd2a8f1bd..fac5d76b2bd4d696285ab3cbb0b19396bb024678 100644 --- a/python/paddle/device/__init__.py +++ b/python/paddle/device/__init__.py @@ -19,7 +19,6 @@ import ctypes import paddle from paddle.fluid import core from paddle.fluid import framework -from paddle.fluid.dygraph.parallel import ParallelEnv from paddle.fluid.framework import is_compiled_with_cinn # noqa: F401 from paddle.fluid.framework import is_compiled_with_cuda # noqa: F401 from paddle.fluid.framework import is_compiled_with_rocm # noqa: F401 @@ -238,7 +237,7 @@ def _convert_to_place(device): "The device should not be 'gpu', " "since PaddlePaddle is not compiled with CUDA" ) - place = core.CUDAPlace(ParallelEnv().dev_id) + place = core.CUDAPlace(paddle.distributed.ParallelEnv().dev_id) elif lower_device == 'xpu': if not core.is_compiled_with_xpu(): raise ValueError( diff --git a/python/paddle/distributed/__init__.py b/python/paddle/distributed/__init__.py index 900cdacb1154662ecd61ce6e0754cd8f6f2d70a9..9be44effbcf46bc841bedacfde13be595c684a7a 100644 --- a/python/paddle/distributed/__init__.py +++ b/python/paddle/distributed/__init__.py @@ -19,6 +19,7 @@ from .launch.main import launch # noqa: F401 from .parallel import init_parallel_env # noqa: F401 from .parallel import get_rank # noqa: F401 from .parallel import get_world_size # noqa: F401 +from .parallel import ParallelEnv # noqa: F401 from .parallel_with_gloo import gloo_init_parallel_env from .parallel_with_gloo import gloo_barrier @@ -69,11 +70,6 @@ from .entry_attr import ProbabilityEntry # noqa: F401 from .entry_attr import CountFilterEntry # noqa: F401 from .entry_attr import ShowClickEntry # noqa: F401 -# (TODO: GhostScreaming) It needs migration of ParallelEnv. However, -# it's hard to migrate APIs in paddle.fluid.dygraph.parallel completely. -# It will be replaced later. -from paddle.fluid.dygraph.parallel import ParallelEnv # noqa: F401 - from . import cloud_utils # noqa: F401 from .sharding import group_sharded_parallel # noqa: F401 diff --git a/python/paddle/distributed/auto_parallel/engine.py b/python/paddle/distributed/auto_parallel/engine.py index a270d754597fa1200ffb7d6f4e8df761e2c46e6e..f098564a3082dbcbc6f429a3c664904d0da487d4 100644 --- a/python/paddle/distributed/auto_parallel/engine.py +++ b/python/paddle/distributed/auto_parallel/engine.py @@ -26,7 +26,6 @@ import paddle.distributed.auto_parallel.utils as auto_utils import paddle.utils as utils from paddle import static from paddle.distributed import fleet -from paddle.fluid.dygraph.parallel import ParallelEnv from paddle.fluid.executor import _to_name_str from paddle.fluid.layers.utils import flatten from paddle.framework import IrGraph @@ -771,7 +770,9 @@ class Engine: self._place = _get_device() if isinstance(self._place, paddle.framework.CUDAPlace): - self._place = paddle.framework.CUDAPlace(ParallelEnv().dev_id) + self._place = paddle.framework.CUDAPlace( + paddle.distributed.ParallelEnv().dev_id + ) if self._strategy.seed: paddle.seed(self._strategy.seed + self._dp_ranks[0]) diff --git a/python/paddle/distributed/parallel.py b/python/paddle/distributed/parallel.py index 7dad9831e744d7237cd6ce95416be3c9646f7dad..6cc7bfa581cd94dc2a40ca40bfc079283013228c 100644 --- a/python/paddle/distributed/parallel.py +++ b/python/paddle/distributed/parallel.py @@ -41,9 +41,6 @@ from paddle.distributed.fleet.launch_utils import check_backend # (TODO: GhostScreaming) It will be removed later. from paddle.fluid import core -# (TODO: GhostScreaming) It will be removed later. -from paddle.fluid.dygraph.parallel import ParallelEnv - # (TODO: GhostScreaming) It will be removed later. from paddle.framework import ( _set_expected_place, @@ -60,6 +57,219 @@ ParallelStrategy = core.ParallelStrategy _global_parallel_env = None +class ParallelEnv: + """ + .. note:: + This API is not recommended, if you need to get rank and world_size, + it is recommended to use ``paddle.distributed.get_rank()`` and + ``paddle.distributed.get_world_size()`` . + + This class is used to obtain the environment variables required for + the parallel execution of ``paddle.nn.Layer`` in dynamic mode. + + The parallel execution in dynamic mode needs to be started using ``paddle.distributed.launch`` + or ``paddle.distributed.spawn`` . + + Examples: + .. code-block:: python + + import paddle + import paddle.distributed as dist + + def train(): + # 1. initialize parallel environment + dist.init_parallel_env() + + # 2. get current ParallelEnv + parallel_env = dist.ParallelEnv() + print("rank: ", parallel_env.rank) + print("world_size: ", parallel_env.world_size) + + # print result in process 1: + # rank: 1 + # world_size: 2 + # print result in process 2: + # rank: 2 + # world_size: 2 + + if __name__ == '__main__': + # 1. start by ``paddle.distributed.spawn`` (default) + dist.spawn(train, nprocs=2) + # 2. start by ``paddle.distributed.launch`` + # train() + """ + + def __init__(self): + self._rank = int(os.getenv("PADDLE_TRAINER_ID", "0")) + self._world_size = int(os.getenv("PADDLE_TRAINERS_NUM", "1")) + self._device_type = str(os.getenv("PADDLE_XCCL_BACKEND", "")) + + # imperative only support one gpu or xpu + if self._device_type != "": + FLAGS_selected_custom_devices = 'FLAGS_selected_{}s'.format( + self._device_type + ) + selected_custom_devices = os.getenv( + FLAGS_selected_custom_devices, "0" + ).split(",") + self._device_id = int(selected_custom_devices[0]) + else: + if core.is_compiled_with_cuda(): + selected_gpus = os.getenv("FLAGS_selected_gpus", "0").split(",") + self._device_id = int(selected_gpus[0]) + elif core.is_compiled_with_xpu(): + selected_xpus = os.getenv("FLAGS_selected_xpus", "0").split(",") + self._device_id = int(selected_xpus[0]) + elif core.is_compiled_with_npu(): + selected_npus = os.getenv("FLAGS_selected_npus", "0").split(",") + self._device_id = int(selected_npus[0]) + elif core.is_compiled_with_mlu(): + selected_mlus = os.getenv("FLAGS_selected_mlus", "0").split(",") + self._device_id = int(selected_mlus[0]) + + self._trainer_endpoints = os.getenv( + "PADDLE_TRAINER_ENDPOINTS", "" + ).split(",") + self._current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT", "") + self._nrings = int(os.getenv("FLAGS_nccl_nrings", "1")) + assert ( + self._nrings > 0 + ), "nccl_nrings must be an integer greater than 0." + assert ( + self._nrings < 9 + ), "nccl_nrings should be less than 9, which is enough in most scenarios." + + @property + def rank(self): + """ + Rank of current trainer. + + Its value is equal to the value of the environment variable ``PADDLE_TRAINER_ID`` . The default value is 0. + + Examples: + .. code-block:: python + + # execute this command in terminal: export PADDLE_TRAINER_ID=0 + import paddle.distributed as dist + + env = dist.ParallelEnv() + print("The rank is %d" % env.rank) + # The rank is 0 + """ + return self._rank + + @property + def world_size(self): + """ + The number of trainers (number of processes participating in current job). + + Its value is equal to the value of the environment variable ``PADDLE_TRAINERS_NUM`` . The default value is 1. + + Examples: + .. code-block:: python + + # execute this command in terminal: export PADDLE_TRAINERS_NUM=4 + import paddle.distributed as dist + + env = dist.ParallelEnv() + print("The world_size is %d" % env.world_size) + # The world_size is 4 + """ + return self._world_size + + @property + def device_id(self): + """ + The ID of selected GPU card for parallel training. + + Its value is equal to the value of the environment variable ``FLAGS_selected_gpus`` . The default value is 0. + + Examples: + .. code-block:: python + + # execute this command in terminal: export FLAGS_selected_gpus=1 + import paddle.distributed as dist + + env = dist.ParallelEnv() + print("The device id are %d" % env.device_id) + # The device id are 1 + """ + return self._device_id + + @property + def device_type(self): + """ + The type of custom device for parallel training. + + Its value is equal to the value of the environment variable ``PADDLE_XCCL_BACKEND`` . The default value is None. + + """ + return self._device_type + + @property + def current_endpoint(self): + """ + The endpoint of current trainer, it is in the form of (node IP + port). + + Its value is equal to the value of the environment variable ``PADDLE_CURRENT_ENDPOINT`` . The default value is "". + + Examples: + .. code-block:: python + + # execute this command in terminal: export PADDLE_CURRENT_ENDPOINT=127.0.0.1:6170 + import paddle.distributed as dist + + env = dist.ParallelEnv() + print("The current endpoint are %s" % env.current_endpoint) + # The current endpoint are 127.0.0.1:6170 + """ + return self._current_endpoint + + @property + def trainer_endpoints(self): + """ + The endpoints of all trainer nodes in the task, + which are used to broadcast the NCCL ID when NCCL2 is initialized. + + Its value is equal to the value of the environment variable ``PADDLE_TRAINER_ENDPOINTS`` . The default value is "". + + Examples: + .. code-block:: python + + # execute this command in terminal: export PADDLE_TRAINER_ENDPOINTS=127.0.0.1:6170,127.0.0.1:6171 + import paddle.distributed as dist + + env = dist.ParallelEnv() + print("The trainer endpoints are %s" % env.trainer_endpoints) + # The trainer endpoints are ['127.0.0.1:6170', '127.0.0.1:6171'] + """ + return self._trainer_endpoints + + @property + def nrings(self): + """ + Nrings of current trainer. + + Its value is equal to the value of the environment variable ``FLAGS_nccl_nrings`` . The default value is 1. + + Examples: + .. code-block:: python + + # execute this command in terminal: export FLAGS_nccl_nrings=1 + import paddle.distributed as dist + + env = dist.ParallelEnv() + print("The nrings is %d" % env.nrings) + # the number of ring is 1 + """ + return self._nrings + + # [aliases] Compatible with old method names + local_rank = rank + nranks = world_size + dev_id = device_id + + def _get_global_parallel_env(): global _global_parallel_env if _global_parallel_env is None: diff --git a/python/paddle/distributed/passes/auto_parallel_quantization.py b/python/paddle/distributed/passes/auto_parallel_quantization.py index ea4357fdcc568780cc69c56feb5eed0a6fb3d01e..3638b8c4cb861544c6c7d6d7ec7e05d14f8821d4 100644 --- a/python/paddle/distributed/passes/auto_parallel_quantization.py +++ b/python/paddle/distributed/passes/auto_parallel_quantization.py @@ -17,7 +17,6 @@ import logging import numpy as np import paddle -from paddle.fluid.dygraph.parallel import ParallelEnv from paddle.framework import IrGraph, core from paddle.static.quantization import ( AddQuantDequantForInferencePass, @@ -72,7 +71,9 @@ class QuantizationPass(PassBase): # TODO: scope and place will be removed, # cause params should be initialized by engine module. scope = paddle.static.global_scope() - place = paddle.framework.CUDAPlace(ParallelEnv().dev_id) + place = paddle.framework.CUDAPlace( + paddle.distributed.ParallelEnv().dev_id + ) # 0. record the relation among blocks parent_idx_dict = dict() diff --git a/python/paddle/fluid/dataloader/batch_sampler.py b/python/paddle/fluid/dataloader/batch_sampler.py index ff749271e56bb601700246120da5fddb99a1ab79..3e0449719c4cdd20fc94bb85b2a324abf940a143 100644 --- a/python/paddle/fluid/dataloader/batch_sampler.py +++ b/python/paddle/fluid/dataloader/batch_sampler.py @@ -250,7 +250,7 @@ class DistributedBatchSampler(BatchSampler): drop_last, bool ), "drop_last should be a boolean number" - from paddle.fluid.dygraph.parallel import ParallelEnv + from paddle.distributed import ParallelEnv if num_replicas is not None: assert ( diff --git a/python/paddle/fluid/dygraph/parallel.py b/python/paddle/fluid/dygraph/parallel.py index 90c71abbaaa8ed7e5fedaf46d79af42315343cac..525e90f7a0ff96c28d1e1aef0ad72c0b90e1567b 100644 --- a/python/paddle/fluid/dygraph/parallel.py +++ b/python/paddle/fluid/dygraph/parallel.py @@ -35,236 +35,21 @@ from paddle.fluid.framework import ( in_dygraph_mode, ) -__all__ = ["ParallelEnv", "DataParallel"] +__all__ = ["DataParallel"] ParallelStrategy = core.ParallelStrategy -class ParallelEnv: - """ - .. note:: - This API is not recommended, if you need to get rank and world_size, - it is recommended to use ``paddle.distributed.get_rank()`` and - ``paddle.distributed.get_world_size()`` . - - This class is used to obtain the environment variables required for - the parallel execution of ``paddle.nn.Layer`` in dynamic mode. - - The parallel execution in dynamic mode needs to be started using ``paddle.distributed.launch`` - or ``paddle.distributed.spawn`` . - - Examples: - .. code-block:: python - - import paddle - import paddle.distributed as dist - - def train(): - # 1. initialize parallel environment - dist.init_parallel_env() - - # 2. get current ParallelEnv - parallel_env = dist.ParallelEnv() - print("rank: ", parallel_env.rank) - print("world_size: ", parallel_env.world_size) - - # print result in process 1: - # rank: 1 - # world_size: 2 - # print result in process 2: - # rank: 2 - # world_size: 2 - - if __name__ == '__main__': - # 1. start by ``paddle.distributed.spawn`` (default) - dist.spawn(train, nprocs=2) - # 2. start by ``paddle.distributed.launch`` - # train() - """ - - def __init__(self): - self._rank = int(os.getenv("PADDLE_TRAINER_ID", "0")) - self._world_size = int(os.getenv("PADDLE_TRAINERS_NUM", "1")) - self._device_type = str(os.getenv("PADDLE_XCCL_BACKEND", "")) - - # imperative only support one gpu or xpu - if self._device_type != "": - FLAGS_selected_custom_devices = 'FLAGS_selected_{}s'.format( - self._device_type - ) - selected_custom_devices = os.getenv( - FLAGS_selected_custom_devices, "0" - ).split(",") - self._device_id = int(selected_custom_devices[0]) - else: - if core.is_compiled_with_cuda(): - selected_gpus = os.getenv("FLAGS_selected_gpus", "0").split(",") - self._device_id = int(selected_gpus[0]) - elif core.is_compiled_with_xpu(): - selected_xpus = os.getenv("FLAGS_selected_xpus", "0").split(",") - self._device_id = int(selected_xpus[0]) - elif core.is_compiled_with_npu(): - selected_npus = os.getenv("FLAGS_selected_npus", "0").split(",") - self._device_id = int(selected_npus[0]) - elif core.is_compiled_with_mlu(): - selected_mlus = os.getenv("FLAGS_selected_mlus", "0").split(",") - self._device_id = int(selected_mlus[0]) - - self._trainer_endpoints = os.getenv( - "PADDLE_TRAINER_ENDPOINTS", "" - ).split(",") - self._current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT", "") - self._nrings = int(os.getenv("FLAGS_nccl_nrings", "1")) - assert ( - self._nrings > 0 - ), "nccl_nrings must be an integer greater than 0." - assert ( - self._nrings < 9 - ), "nccl_nrings should be less than 9, which is enough in most scenarios." - - @property - def rank(self): - """ - Rank of current trainer. - - Its value is equal to the value of the environment variable ``PADDLE_TRAINER_ID`` . The default value is 0. - - Examples: - .. code-block:: python - - # execute this command in terminal: export PADDLE_TRAINER_ID=0 - import paddle.distributed as dist - - env = dist.ParallelEnv() - print("The rank is %d" % env.rank) - # The rank is 0 - """ - return self._rank - - @property - def world_size(self): - """ - The number of trainers (number of processes participating in current job). - - Its value is equal to the value of the environment variable ``PADDLE_TRAINERS_NUM`` . The default value is 1. - - Examples: - .. code-block:: python - - # execute this command in terminal: export PADDLE_TRAINERS_NUM=4 - import paddle.distributed as dist - - env = dist.ParallelEnv() - print("The world_size is %d" % env.world_size) - # The world_size is 4 - """ - return self._world_size - - @property - def device_id(self): - """ - The ID of selected GPU card for parallel training. - - Its value is equal to the value of the environment variable ``FLAGS_selected_gpus`` . The default value is 0. - - Examples: - .. code-block:: python - - # execute this command in terminal: export FLAGS_selected_gpus=1 - import paddle.distributed as dist - - env = dist.ParallelEnv() - print("The device id are %d" % env.device_id) - # The device id are 1 - """ - return self._device_id - - @property - def device_type(self): - """ - The type of custom device for parallel training. - - Its value is equal to the value of the environment variable ``PADDLE_XCCL_BACKEND`` . The default value is None. - - """ - return self._device_type - - @property - def current_endpoint(self): - """ - The endpoint of current trainer, it is in the form of (node IP + port). - - Its value is equal to the value of the environment variable ``PADDLE_CURRENT_ENDPOINT`` . The default value is "". - - Examples: - .. code-block:: python - - # execute this command in terminal: export PADDLE_CURRENT_ENDPOINT=127.0.0.1:6170 - import paddle.distributed as dist - - env = dist.ParallelEnv() - print("The current endpoint are %s" % env.current_endpoint) - # The current endpoint are 127.0.0.1:6170 - """ - return self._current_endpoint - - @property - def trainer_endpoints(self): - """ - The endpoints of all trainer nodes in the task, - which are used to broadcast the NCCL ID when NCCL2 is initialized. - - Its value is equal to the value of the environment variable ``PADDLE_TRAINER_ENDPOINTS`` . The default value is "". - - Examples: - .. code-block:: python - - # execute this command in terminal: export PADDLE_TRAINER_ENDPOINTS=127.0.0.1:6170,127.0.0.1:6171 - import paddle.distributed as dist - - env = dist.ParallelEnv() - print("The trainer endpoints are %s" % env.trainer_endpoints) - # The trainer endpoints are ['127.0.0.1:6170', '127.0.0.1:6171'] - """ - return self._trainer_endpoints - - @property - def nrings(self): - """ - Nrings of current trainer. - - Its value is equal to the value of the environment variable ``FLAGS_nccl_nrings`` . The default value is 1. - - Examples: - .. code-block:: python - - # execute this command in terminal: export FLAGS_nccl_nrings=1 - import paddle.distributed as dist - - env = dist.ParallelEnv() - print("The nrings is %d" % env.nrings) - # the number of ring is 1 - """ - return self._nrings - - # [aliases] Compatible with old method names - local_rank = rank - nranks = world_size - dev_id = device_id - - -# NOTE: [ Compatible ] Originally this class name is `Env`. The semantics of the old class names -# are inaccurate and may confuse users, so replace it with `ParallelEnv`, but to be compatible -# with the old examples, here still need to keep this name. -Env = ParallelEnv - - def _build_default_parallel_strategy(): strategy = ParallelStrategy() - strategy.nranks = ParallelEnv().nranks - strategy.local_rank = ParallelEnv().local_rank - strategy.trainer_endpoints = ParallelEnv().trainer_endpoints - strategy.current_endpoint = ParallelEnv().current_endpoint + strategy.nranks = paddle.distributed.ParallelEnv().nranks + strategy.local_rank = paddle.distributed.ParallelEnv().local_rank + strategy.trainer_endpoints = ( + paddle.distributed.ParallelEnv().trainer_endpoints + ) + strategy.current_endpoint = ( + paddle.distributed.ParallelEnv().current_endpoint + ) return strategy @@ -318,11 +103,13 @@ def _split_tensors(coalesced_grads_and_grad_vars): def scale_loss(loss): # TODO(liuyuhui) Currently only for xpu. Will be removed in the future. - if not ParallelEnv().world_size > 1: + if not paddle.distributed.ParallelEnv().world_size > 1: return loss loss_scale = to_variable( - np.array([ParallelEnv().world_size]).astype("float32") + np.array([paddle.distributed.ParallelEnv().world_size]).astype( + "float32" + ) ) loss_scale.stop_gradient = True scaled_loss = loss / loss_scale diff --git a/python/paddle/fluid/tests/custom_runtime/process_group_xccl.py b/python/paddle/fluid/tests/custom_runtime/process_group_xccl.py index 383713e0f5ff52525ff07d42ed27a07721fb943c..0e4181ba04ad3b33cb85727360c4edc90d83871c 100644 --- a/python/paddle/fluid/tests/custom_runtime/process_group_xccl.py +++ b/python/paddle/fluid/tests/custom_runtime/process_group_xccl.py @@ -19,17 +19,16 @@ import numpy as np import paddle from paddle.fluid import core -from paddle.fluid.dygraph.parallel import ParallelEnv def init_process_group(strategy=None): - nranks = ParallelEnv().nranks - rank = ParallelEnv().local_rank + nranks = paddle.distributed.ParallelEnv().nranks + rank = paddle.distributed.ParallelEnv().local_rank is_master = True if rank == 0 else False store = paddle.fluid.core.TCPStore("127.0.0.1", 6173, is_master, nranks) pg_group = core.ProcessGroupCustom.create( store, - ParallelEnv().device_type, + paddle.distributed.ParallelEnv().device_type, rank, nranks, ) diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/amp_pass_unittest.py b/python/paddle/fluid/tests/unittests/auto_parallel/amp_pass_unittest.py index 1f90f90b2fb72b36b6f60a8be6588f4529b60927..388ab592e99330a532f0d0504e9c35bab7729b93 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/amp_pass_unittest.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/amp_pass_unittest.py @@ -20,7 +20,6 @@ from get_gpt_model import FakeDataset, generate_model import paddle from paddle.distributed.fleet import auto -from paddle.fluid.dygraph.parallel import ParallelEnv def apply_pass(use_amp=False, level=None): @@ -62,7 +61,7 @@ class TestAMPPass(unittest.TestCase): paddle.seed(2021) np.random.seed(2021) random.seed(2021) - place = paddle.fluid.CUDAPlace(ParallelEnv().dev_id) + place = paddle.fluid.CUDAPlace(paddle.distributed.ParallelEnv().dev_id) engine._executor = paddle.static.Executor(place) def get_engine(self, use_amp=False, level=None): diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/clip_grad_by_global_norm.py b/python/paddle/fluid/tests/unittests/auto_parallel/clip_grad_by_global_norm.py index baae57b84af996561c760ab1ca0cd4bb7e56c176..11fe954b7b51ad8a68defde7d14e2a7c4e23c819 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/clip_grad_by_global_norm.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/clip_grad_by_global_norm.py @@ -20,7 +20,6 @@ from get_gpt_model import FakeDataset, generate_model import paddle from paddle.distributed.fleet import auto -from paddle.fluid.dygraph.parallel import ParallelEnv paddle.enable_static() @@ -73,7 +72,7 @@ class TestGradientClipByGlobalNorm(unittest.TestCase): paddle.seed(2022) np.random.seed(2022) random.seed(2022) - place = paddle.fluid.CUDAPlace(ParallelEnv().dev_id) + place = paddle.fluid.CUDAPlace(paddle.distributed.ParallelEnv().dev_id) engine._executor = paddle.static.Executor(place) def get_engine(self, use_sharding=False): diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/gradient_merge_pass_unittest.py b/python/paddle/fluid/tests/unittests/auto_parallel/gradient_merge_pass_unittest.py index 2a6d61d96167c71364400208a1261d25476e89b4..adf40a236a852dde0c7e0f358ec0a42ddc6def31 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/gradient_merge_pass_unittest.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/gradient_merge_pass_unittest.py @@ -20,7 +20,6 @@ from get_gpt_model import FakeDataset, generate_model import paddle from paddle.distributed.fleet import auto -from paddle.fluid.dygraph.parallel import ParallelEnv paddle.enable_static() @@ -56,7 +55,7 @@ class TestGradientMergePass(unittest.TestCase): paddle.seed(2021) np.random.seed(2021) random.seed(2021) - place = paddle.fluid.CUDAPlace(ParallelEnv().dev_id) + place = paddle.fluid.CUDAPlace(paddle.distributed.ParallelEnv().dev_id) engine._executor = paddle.static.Executor(place) def get_engine(self, use_gradient_merge=False): diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/recompute_pass_unittest.py b/python/paddle/fluid/tests/unittests/auto_parallel/recompute_pass_unittest.py index ae15ec02d6dcd18a9b1c253514e8ad7322533894..c698ea3d7084f2a379db0b0cc506b72b3db931b6 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/recompute_pass_unittest.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/recompute_pass_unittest.py @@ -20,7 +20,6 @@ from get_gpt_model import FakeDataset, generate_model import paddle from paddle.distributed.fleet import auto -from paddle.fluid.dygraph.parallel import ParallelEnv def apply_pass(use_recompute=False, no_recompute_segments=[]): @@ -52,7 +51,7 @@ class TestRecomputePass(unittest.TestCase): paddle.seed(2022) np.random.seed(2022) random.seed(2022) - place = paddle.fluid.CUDAPlace(ParallelEnv().dev_id) + place = paddle.fluid.CUDAPlace(paddle.distributed.ParallelEnv().dev_id) engine._executor = paddle.static.Executor(place) def get_engine(self, use_recompute=False, no_recompute_segments=[]): diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/sharding_newexe.py b/python/paddle/fluid/tests/unittests/auto_parallel/sharding_newexe.py index ca76daada5704396fd8c632042ffcc1fd1f26a11..48690f585cbebfd04a3ed74f0dd3b6846c6443c6 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/sharding_newexe.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/sharding_newexe.py @@ -20,7 +20,6 @@ from get_gpt_model import FakeDataset, generate_model import paddle from paddle.distributed.fleet import auto -from paddle.fluid.dygraph.parallel import ParallelEnv paddle.enable_static() @@ -83,7 +82,7 @@ class TestShardingStage2WithNewEXE(unittest.TestCase): paddle.seed(2022) np.random.seed(2022) random.seed(2022) - place = paddle.fluid.CUDAPlace(ParallelEnv().dev_id) + place = paddle.fluid.CUDAPlace(paddle.distributed.ParallelEnv().dev_id) engine._executor = paddle.static.Executor(place) def get_engine( diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/sharding_pass_unittest.py b/python/paddle/fluid/tests/unittests/auto_parallel/sharding_pass_unittest.py index a77837ec209b8ba2c4722e53ad865fbca9e90847..4ecc551124db72c498f8d53326be4095e3870a03 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/sharding_pass_unittest.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/sharding_pass_unittest.py @@ -20,7 +20,6 @@ from get_gpt_model import FakeDataset, generate_model import paddle from paddle.distributed.fleet import auto -from paddle.fluid.dygraph.parallel import ParallelEnv paddle.enable_static() @@ -56,7 +55,7 @@ class TestShardingPass(unittest.TestCase): paddle.seed(2022) np.random.seed(2022) random.seed(2022) - place = paddle.fluid.CUDAPlace(ParallelEnv().dev_id) + place = paddle.fluid.CUDAPlace(paddle.distributed.ParallelEnv().dev_id) engine._executor = paddle.static.Executor(place) def get_engine(self, use_sharding=False, stage=None): diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_fused_linear_pass.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_fused_linear_pass.py index aad5922a767a56652a666d53632b921d20b47642..d2da582ef15b0d0c091cd6464b3108218167c85c 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/test_fused_linear_pass.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_fused_linear_pass.py @@ -21,7 +21,6 @@ from get_gpt_model import FakeDataset, generate_model import paddle from paddle.distributed.fleet import auto -from paddle.fluid.dygraph.parallel import ParallelEnv sys.path.append("..") from test_sparse_addmm_op import get_cuda_version @@ -55,7 +54,7 @@ class TestFusedLinearPass(unittest.TestCase): paddle.seed(2021) np.random.seed(2021) random.seed(2021) - place = paddle.fluid.CUDAPlace(ParallelEnv().dev_id) + place = paddle.fluid.CUDAPlace(paddle.distributed.ParallelEnv().dev_id) engine._executor = paddle.static.Executor(place) def get_engine(self, use_fused_passes=False, fused_passes_list=[]): diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_pass_bf16.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_pass_bf16.py index 2c72ccd9386c1546f208c2ec3be90c41d8853395..b9d744ff0287796e7796f54c0d3f3170f23d410e 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/test_pass_bf16.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_pass_bf16.py @@ -21,7 +21,6 @@ import paddle import paddle.fluid.core as core import paddle.nn as nn from paddle.distributed.fleet import auto -from paddle.fluid.dygraph.parallel import ParallelEnv from paddle.static import InputSpec from paddle.static.amp.bf16.amp_utils import _valid_types from paddle.static.amp.fp16_utils import find_true_prev_op @@ -90,7 +89,7 @@ class TestBF16Pass(unittest.TestCase): paddle.seed(2021) np.random.seed(2021) random.seed(2021) - place = paddle.fluid.CUDAPlace(ParallelEnv().dev_id) + place = paddle.fluid.CUDAPlace(paddle.distributed.ParallelEnv().dev_id) engine._executor = paddle.static.Executor(place) def get_engine(self, use_bf16=False): diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_selective_recompute.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_selective_recompute.py index 64563314ac2cadea58adc69ee6c1be3659ae3223..73d58c7a527e7219ae5958c2aa179286fb6b971d 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/test_selective_recompute.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_selective_recompute.py @@ -21,7 +21,6 @@ from get_gpt_model import FakeDataset import paddle from paddle.distributed.fleet import auto -from paddle.fluid.dygraph.parallel import ParallelEnv sys.path.append("..") import auto_parallel_gpt_model as modeling @@ -92,7 +91,7 @@ class TestRecomputePassWithRecomputeAPI(unittest.TestCase): paddle.seed(2022) np.random.seed(2022) random.seed(2022) - place = paddle.fluid.CUDAPlace(ParallelEnv().dev_id) + place = paddle.fluid.CUDAPlace(paddle.distributed.ParallelEnv().dev_id) engine._executor = paddle.static.Executor(place) def get_engine( diff --git a/python/paddle/fluid/tests/unittests/collective/process_group_gloo.py b/python/paddle/fluid/tests/unittests/collective/process_group_gloo.py index c54101baec69212ec707eeb59b11ad12752aa7aa..c657088f3374488058a4047aab948fed1c82c1ea 100644 --- a/python/paddle/fluid/tests/unittests/collective/process_group_gloo.py +++ b/python/paddle/fluid/tests/unittests/collective/process_group_gloo.py @@ -19,7 +19,6 @@ import numpy as np import paddle from paddle.fluid import core -from paddle.fluid.dygraph.parallel import ParallelEnv class TestProcessGroupFp32(unittest.TestCase): @@ -34,8 +33,8 @@ class TestProcessGroupFp32(unittest.TestCase): self.shape = (2, 10, 5) def test_create_process_group_gloo(self): - nranks = ParallelEnv().nranks - rank = ParallelEnv().local_rank + nranks = paddle.distributed.ParallelEnv().nranks + rank = paddle.distributed.ParallelEnv().local_rank is_master = True if rank == 0 else False store = paddle.fluid.core.TCPStore( "127.0.0.1", 6272, is_master, nranks, 30 diff --git a/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py b/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py index 3be3cfecf16d6ef3e19ef989b1065d592529eb90..713e0a01b4abbf5be4c5306c5c3603376333a933 100644 --- a/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py +++ b/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py @@ -19,12 +19,11 @@ import numpy as np import paddle import paddle.distributed as dist -from paddle.fluid.dygraph.parallel import ParallelEnv def init_process_group(strategy=None): - nranks = ParallelEnv().nranks - rank = ParallelEnv().local_rank + nranks = paddle.distributed.ParallelEnv().nranks + rank = dist.ParallelEnv().local_rank is_master = True if rank == 0 else False pg_group = dist.init_parallel_env() diff --git a/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py b/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py index f7833199180ecc356adcd238b7d600785a0edf20..49fe7c97d0a1043cdbc7219f8b39b07314ec1d9f 100644 --- a/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py +++ b/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py @@ -20,12 +20,11 @@ import numpy as np import paddle import paddle.distributed as dist -from paddle.fluid.dygraph.parallel import ParallelEnv def init_process_group(strategy=None): - nranks = ParallelEnv().nranks - rank = ParallelEnv().local_rank + nranks = paddle.distributed.ParallelEnv().nranks + rank = dist.ParallelEnv().local_rank is_master = True if rank == 0 else False pg_group = dist.init_parallel_env() diff --git a/python/paddle/hapi/callbacks.py b/python/paddle/hapi/callbacks.py index 2d069a39e5f4d8ae7fe35317ce83908d6d2b67c4..1cd1d224f4ed2843ed86b5a8d6b64f4345acd4ad 100644 --- a/python/paddle/hapi/callbacks.py +++ b/python/paddle/hapi/callbacks.py @@ -20,7 +20,6 @@ import warnings import numpy as np import paddle -from paddle.fluid.dygraph.parallel import ParallelEnv from paddle.utils import try_import from .progressbar import ProgressBar @@ -350,7 +349,7 @@ class ProgBarLogger(Callback): self.log_freq = log_freq def _is_print(self): - return self.verbose and ParallelEnv().local_rank == 0 + return self.verbose and paddle.distributed.ParallelEnv().local_rank == 0 def on_train_begin(self, logs=None): self.epochs = self.params['epochs'] @@ -598,7 +597,11 @@ class ModelCheckpoint(Callback): self.epoch = epoch def _is_save(self): - return self.model and self.save_dir and ParallelEnv().local_rank == 0 + return ( + self.model + and self.save_dir + and paddle.distributed.ParallelEnv().local_rank == 0 + ) def on_epoch_end(self, epoch, logs=None): if self._is_save() and self.epoch % self.save_freq == 0: @@ -922,7 +925,7 @@ class VisualDL(Callback): self.epoch = 0 def _is_write(self): - return ParallelEnv().local_rank == 0 + return paddle.distributed.ParallelEnv().local_rank == 0 def on_train_begin(self, logs=None): self.epochs = self.params['epochs'] @@ -1074,7 +1077,7 @@ class WandbCallback(Callback): _ = self.run def _is_write(self): - return ParallelEnv().local_rank == 0 + return paddle.distributed.ParallelEnv().local_rank == 0 @property def run(self): @@ -1333,7 +1336,10 @@ class ReduceLROnPlateau(Callback): new_lr = old_lr * self.factor new_lr = max(new_lr, self.min_lr) self.model._optimizer._learning_rate = new_lr - if self.verbose > 0 and ParallelEnv().local_rank == 0: + if ( + self.verbose > 0 + and paddle.distributed.ParallelEnv().local_rank == 0 + ): print( '\nEpoch %d: ReduceLROnPlateau reducing learning ' 'rate to %s.' % (self.epoch + 1, new_lr) diff --git a/python/paddle/hapi/logger.py b/python/paddle/hapi/logger.py index ac6f29b338d43e58d25849de963417cf8d3b715a..25a6bbcbf1008998bc5a1f3a107445c82f6418d8 100644 --- a/python/paddle/hapi/logger.py +++ b/python/paddle/hapi/logger.py @@ -16,8 +16,6 @@ import logging import os import sys -from paddle.fluid.dygraph.parallel import ParallelEnv - __all__ = [] @@ -40,7 +38,7 @@ def setup_logger(output=None, name="hapi", log_level=logging.INFO): format_str = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' # stdout logging: only local rank==0 - local_rank = ParallelEnv().local_rank + local_rank = int(os.getenv("PADDLE_TRAINER_ID", "0")) if local_rank == 0 and len(logger.handlers) == 0: ch = logging.StreamHandler(stream=sys.stdout) ch.setLevel(log_level) diff --git a/python/paddle/hapi/model.py b/python/paddle/hapi/model.py index 9222090d1ae932588b7e6c4350515cda342c9ba2..d5bad6a977639daebd11eda9b667a3d1478f951a 100644 --- a/python/paddle/hapi/model.py +++ b/python/paddle/hapi/model.py @@ -30,7 +30,6 @@ from paddle.autograd import no_grad from paddle.distributed.fleet.base import role_maker from paddle.fluid import core from paddle.fluid.dygraph.base import to_variable -from paddle.fluid.dygraph.parallel import ParallelEnv from paddle.fluid.executor import global_scope from paddle.fluid.framework import Variable from paddle.fluid.framework import _current_expected_place as _get_device @@ -190,17 +189,21 @@ def init_communicator( def prepare_distributed_context(place=None): if place is None: place = ( - fluid.CUDAPlace(ParallelEnv().dev_id) - if ParallelEnv().nranks > 1 + fluid.CUDAPlace(paddle.distributed.ParallelEnv().dev_id) + if paddle.distributed.ParallelEnv().nranks > 1 else fluid.CUDAPlace(0) ) place = _get_paddle_place(place) strategy = fluid.dygraph.parallel.ParallelStrategy() - strategy.nranks = ParallelEnv().nranks - strategy.local_rank = ParallelEnv().local_rank - strategy.trainer_endpoints = ParallelEnv().trainer_endpoints - strategy.current_endpoint = ParallelEnv().current_endpoint + strategy.nranks = paddle.distributed.ParallelEnv().nranks + strategy.local_rank = paddle.distributed.ParallelEnv().local_rank + strategy.trainer_endpoints = ( + paddle.distributed.ParallelEnv().trainer_endpoints + ) + strategy.current_endpoint = ( + paddle.distributed.ParallelEnv().current_endpoint + ) if strategy.nranks < 2: return @@ -282,8 +285,8 @@ class StaticGraphAdapter: 'test_batch': 0, } - self._nranks = ParallelEnv().nranks - self._local_rank = ParallelEnv().local_rank + self._nranks = paddle.distributed.ParallelEnv().nranks + self._local_rank = paddle.distributed.ParallelEnv().local_rank self._amp_level = "O0" self._amp_configs = {} @@ -733,8 +736,8 @@ class DynamicGraphAdapter: def __init__(self, model): super().__init__() self.model = model - self._nranks = ParallelEnv().nranks - self._local_rank = ParallelEnv().local_rank + self._nranks = paddle.distributed.ParallelEnv().nranks + self._local_rank = paddle.distributed.ParallelEnv().local_rank self._merge_count = { 'eval_total': 0, 'test_total': 0, @@ -751,10 +754,14 @@ class DynamicGraphAdapter: if self._nranks > 1: dist.init_parallel_env() stradegy = fluid.dygraph.parallel.ParallelStrategy() - stradegy.nranks = ParallelEnv().nranks - stradegy.local_rank = ParallelEnv().local_rank - stradegy.trainer_endpoints = ParallelEnv().trainer_endpoints - stradegy.current_endpoint = ParallelEnv().current_endpoint + stradegy.nranks = paddle.distributed.ParallelEnv().nranks + stradegy.local_rank = paddle.distributed.ParallelEnv().local_rank + stradegy.trainer_endpoints = ( + paddle.distributed.ParallelEnv().trainer_endpoints + ) + stradegy.current_endpoint = ( + paddle.distributed.ParallelEnv().current_endpoint + ) self.ddp_model = fluid.dygraph.parallel.DataParallel( self.model.network, stradegy ) @@ -1373,7 +1380,7 @@ class Model: """ - if ParallelEnv().local_rank == 0: + if paddle.distributed.ParallelEnv().local_rank == 0: if not training: self._save_inference_model(path) else: @@ -1657,7 +1664,10 @@ class Model: self._place = _get_device() if isinstance(self._place, fluid.CUDAPlace): global _parallel_context_initialized - if ParallelEnv().nranks > 1 and not _parallel_context_initialized: + if ( + paddle.distributed.ParallelEnv().nranks > 1 + and not _parallel_context_initialized + ): if fluid._non_static_mode(): main_prog_seed = fluid.default_main_program().random_seed startup_prog_seed = ( @@ -2307,7 +2317,9 @@ class Model: mode == 'train' or self._adapter._merge_count.get(mode + '_batch', 0) <= 0 ): - logs['batch_size'] = batch_size * ParallelEnv().nranks + logs['batch_size'] = ( + batch_size * paddle.distributed.ParallelEnv().nranks + ) else: logs['batch_size'] = self._adapter._merge_count[mode + '_batch'] diff --git a/python/paddle/utils/download.py b/python/paddle/utils/download.py index a7f6883c9749854bfd25d6116f7f439a6f886e59..9c82531565f24a317f1bf3d08303ab3e2a2707db 100644 --- a/python/paddle/utils/download.py +++ b/python/paddle/utils/download.py @@ -136,7 +136,7 @@ def get_path_from_url( str: a local path to save downloaded models & weights & datasets. """ - from paddle.fluid.dygraph.parallel import ParallelEnv + from paddle.distributed import ParallelEnv assert is_url(url), "downloading from {} not a url".format(url) # parse path after download to decompress under root_dir