Unverified commit 9dd1f4bf, authored by zqw_1997 and committed by GitHub

remove paddle.fluid.dygraph.parallel.ParallelEnv (#50157)

* remove dygraph.parallel.ParallelEnv

* fix logger.py error: AttributeError: module 'paddle' has no attribute 'distributed'

* move the implementation to the root folder

* logger.py: import ParallelEnv from paddle.parallel to avoid a circular import

* add a comment explaining why ParallelEnv is imported from paddle.parallel in logger.py, and remove the API interface from paddle/parallel.py

* remove the outdated Env alias and its note

* decouple logger.py from ParallelEnv

* remove another reference to parallel in __init__.py
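For callers that still use the old import path, the change is mechanical. Below is a minimal usage sketch of the relocated API (not part of this diff), assuming a CUDA build and a job launched with ``paddle.distributed.launch`` or ``paddle.distributed.spawn``:

```python
import paddle
import paddle.distributed as dist

# Old (removed by this PR):
#   from paddle.fluid.dygraph.parallel import ParallelEnv

# New: ParallelEnv is re-exported from paddle.distributed.
env = dist.ParallelEnv()                 # reads PADDLE_TRAINER_ID, PADDLE_TRAINERS_NUM, ...
print(env.rank, env.world_size)          # same values as dist.get_rank() / dist.get_world_size()
place = paddle.CUDAPlace(env.device_id)  # device_id (alias: dev_id) comes from FLAGS_selected_gpus
```

For rank and world size alone, ``dist.get_rank()`` and ``dist.get_world_size()`` remain the recommended calls, as noted in the class docstring below.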
Parent 17318c1a
@@ -19,7 +19,6 @@ import ctypes
import paddle
from paddle.fluid import core
from paddle.fluid import framework
-from paddle.fluid.dygraph.parallel import ParallelEnv
from paddle.fluid.framework import is_compiled_with_cinn  # noqa: F401
from paddle.fluid.framework import is_compiled_with_cuda  # noqa: F401
from paddle.fluid.framework import is_compiled_with_rocm  # noqa: F401
@@ -238,7 +237,7 @@ def _convert_to_place(device):
                "The device should not be 'gpu', "
                "since PaddlePaddle is not compiled with CUDA"
            )
-        place = core.CUDAPlace(ParallelEnv().dev_id)
+        place = core.CUDAPlace(paddle.distributed.ParallelEnv().dev_id)
    elif lower_device == 'xpu':
        if not core.is_compiled_with_xpu():
            raise ValueError(
......
@@ -19,6 +19,7 @@ from .launch.main import launch  # noqa: F401
from .parallel import init_parallel_env  # noqa: F401
from .parallel import get_rank  # noqa: F401
from .parallel import get_world_size  # noqa: F401
+from .parallel import ParallelEnv  # noqa: F401
from .parallel_with_gloo import gloo_init_parallel_env
from .parallel_with_gloo import gloo_barrier
@@ -69,11 +70,6 @@ from .entry_attr import ProbabilityEntry  # noqa: F401
from .entry_attr import CountFilterEntry  # noqa: F401
from .entry_attr import ShowClickEntry  # noqa: F401
-# (TODO: GhostScreaming) It needs migration of ParallelEnv. However,
-# it's hard to migrate APIs in paddle.fluid.dygraph.parallel completely.
-# It will be replaced later.
-from paddle.fluid.dygraph.parallel import ParallelEnv  # noqa: F401
from . import cloud_utils  # noqa: F401
from .sharding import group_sharded_parallel  # noqa: F401
......
@@ -26,7 +26,6 @@ import paddle.distributed.auto_parallel.utils as auto_utils
import paddle.utils as utils
from paddle import static
from paddle.distributed import fleet
-from paddle.fluid.dygraph.parallel import ParallelEnv
from paddle.fluid.executor import _to_name_str
from paddle.fluid.layers.utils import flatten
from paddle.framework import IrGraph
@@ -771,7 +770,9 @@ class Engine:
        self._place = _get_device()
        if isinstance(self._place, paddle.framework.CUDAPlace):
-            self._place = paddle.framework.CUDAPlace(ParallelEnv().dev_id)
+            self._place = paddle.framework.CUDAPlace(
+                paddle.distributed.ParallelEnv().dev_id
+            )
        if self._strategy.seed:
            paddle.seed(self._strategy.seed + self._dp_ranks[0])
......
@@ -41,9 +41,6 @@ from paddle.distributed.fleet.launch_utils import check_backend
# (TODO: GhostScreaming) It will be removed later.
from paddle.fluid import core
-# (TODO: GhostScreaming) It will be removed later.
-from paddle.fluid.dygraph.parallel import ParallelEnv
# (TODO: GhostScreaming) It will be removed later.
from paddle.framework import (
    _set_expected_place,
@@ -60,6 +57,219 @@ ParallelStrategy = core.ParallelStrategy
_global_parallel_env = None
class ParallelEnv:
    """
    .. note::
        This API is not recommended, if you need to get rank and world_size,
        it is recommended to use ``paddle.distributed.get_rank()`` and
        ``paddle.distributed.get_world_size()`` .

    This class is used to obtain the environment variables required for
    the parallel execution of ``paddle.nn.Layer`` in dynamic mode.

    The parallel execution in dynamic mode needs to be started using ``paddle.distributed.launch``
    or ``paddle.distributed.spawn`` .

    Examples:
        .. code-block:: python

            import paddle
            import paddle.distributed as dist

            def train():
                # 1. initialize parallel environment
                dist.init_parallel_env()

                # 2. get current ParallelEnv
                parallel_env = dist.ParallelEnv()
                print("rank: ", parallel_env.rank)
                print("world_size: ", parallel_env.world_size)

                # print result in process 1:
                # rank: 1
                # world_size: 2
                # print result in process 2:
                # rank: 2
                # world_size: 2

            if __name__ == '__main__':
                # 1. start by ``paddle.distributed.spawn`` (default)
                dist.spawn(train, nprocs=2)
                # 2. start by ``paddle.distributed.launch``
                # train()
    """

    def __init__(self):
        self._rank = int(os.getenv("PADDLE_TRAINER_ID", "0"))
        self._world_size = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
        self._device_type = str(os.getenv("PADDLE_XCCL_BACKEND", ""))

        # imperative only support one gpu or xpu
        if self._device_type != "":
            FLAGS_selected_custom_devices = 'FLAGS_selected_{}s'.format(
                self._device_type
            )
            selected_custom_devices = os.getenv(
                FLAGS_selected_custom_devices, "0"
            ).split(",")
            self._device_id = int(selected_custom_devices[0])
        else:
            if core.is_compiled_with_cuda():
                selected_gpus = os.getenv("FLAGS_selected_gpus", "0").split(",")
                self._device_id = int(selected_gpus[0])
            elif core.is_compiled_with_xpu():
                selected_xpus = os.getenv("FLAGS_selected_xpus", "0").split(",")
                self._device_id = int(selected_xpus[0])
            elif core.is_compiled_with_npu():
                selected_npus = os.getenv("FLAGS_selected_npus", "0").split(",")
                self._device_id = int(selected_npus[0])
            elif core.is_compiled_with_mlu():
                selected_mlus = os.getenv("FLAGS_selected_mlus", "0").split(",")
                self._device_id = int(selected_mlus[0])

        self._trainer_endpoints = os.getenv(
            "PADDLE_TRAINER_ENDPOINTS", ""
        ).split(",")
        self._current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT", "")
        self._nrings = int(os.getenv("FLAGS_nccl_nrings", "1"))
        assert (
            self._nrings > 0
        ), "nccl_nrings must be an integer greater than 0."
        assert (
            self._nrings < 9
        ), "nccl_nrings should be less than 9, which is enough in most scenarios."

    @property
    def rank(self):
        """
        Rank of current trainer.

        Its value is equal to the value of the environment variable ``PADDLE_TRAINER_ID`` . The default value is 0.

        Examples:
            .. code-block:: python

                # execute this command in terminal: export PADDLE_TRAINER_ID=0
                import paddle.distributed as dist

                env = dist.ParallelEnv()
                print("The rank is %d" % env.rank)
                # The rank is 0
        """
        return self._rank

    @property
    def world_size(self):
        """
        The number of trainers (number of processes participating in current job).

        Its value is equal to the value of the environment variable ``PADDLE_TRAINERS_NUM`` . The default value is 1.

        Examples:
            .. code-block:: python

                # execute this command in terminal: export PADDLE_TRAINERS_NUM=4
                import paddle.distributed as dist

                env = dist.ParallelEnv()
                print("The world_size is %d" % env.world_size)
                # The world_size is 4
        """
        return self._world_size

    @property
    def device_id(self):
        """
        The ID of selected GPU card for parallel training.

        Its value is equal to the value of the environment variable ``FLAGS_selected_gpus`` . The default value is 0.

        Examples:
            .. code-block:: python

                # execute this command in terminal: export FLAGS_selected_gpus=1
                import paddle.distributed as dist

                env = dist.ParallelEnv()
                print("The device id are %d" % env.device_id)
                # The device id are 1
        """
        return self._device_id

    @property
    def device_type(self):
        """
        The type of custom device for parallel training.

        Its value is equal to the value of the environment variable ``PADDLE_XCCL_BACKEND`` . The default value is None.
        """
        return self._device_type

    @property
    def current_endpoint(self):
        """
        The endpoint of current trainer, it is in the form of (node IP + port).

        Its value is equal to the value of the environment variable ``PADDLE_CURRENT_ENDPOINT`` . The default value is "".

        Examples:
            .. code-block:: python

                # execute this command in terminal: export PADDLE_CURRENT_ENDPOINT=127.0.0.1:6170
                import paddle.distributed as dist

                env = dist.ParallelEnv()
                print("The current endpoint are %s" % env.current_endpoint)
                # The current endpoint are 127.0.0.1:6170
        """
        return self._current_endpoint

    @property
    def trainer_endpoints(self):
        """
        The endpoints of all trainer nodes in the task,
        which are used to broadcast the NCCL ID when NCCL2 is initialized.

        Its value is equal to the value of the environment variable ``PADDLE_TRAINER_ENDPOINTS`` . The default value is "".

        Examples:
            .. code-block:: python

                # execute this command in terminal: export PADDLE_TRAINER_ENDPOINTS=127.0.0.1:6170,127.0.0.1:6171
                import paddle.distributed as dist

                env = dist.ParallelEnv()
                print("The trainer endpoints are %s" % env.trainer_endpoints)
                # The trainer endpoints are ['127.0.0.1:6170', '127.0.0.1:6171']
        """
        return self._trainer_endpoints

    @property
    def nrings(self):
        """
        Nrings of current trainer.

        Its value is equal to the value of the environment variable ``FLAGS_nccl_nrings`` . The default value is 1.

        Examples:
            .. code-block:: python

                # execute this command in terminal: export FLAGS_nccl_nrings=1
                import paddle.distributed as dist

                env = dist.ParallelEnv()
                print("The nrings is %d" % env.nrings)
                # the number of ring is 1
        """
        return self._nrings

    # [aliases] Compatible with old method names
    local_rank = rank
    nranks = world_size
    dev_id = device_id
def _get_global_parallel_env():
    global _global_parallel_env
    if _global_parallel_env is None:
......
@@ -17,7 +17,6 @@ import logging
import numpy as np
import paddle
-from paddle.fluid.dygraph.parallel import ParallelEnv
from paddle.framework import IrGraph, core
from paddle.static.quantization import (
    AddQuantDequantForInferencePass,
@@ -72,7 +71,9 @@ class QuantizationPass(PassBase):
        # TODO: scope and place will be removed,
        # cause params should be initialized by engine module.
        scope = paddle.static.global_scope()
-        place = paddle.framework.CUDAPlace(ParallelEnv().dev_id)
+        place = paddle.framework.CUDAPlace(
+            paddle.distributed.ParallelEnv().dev_id
+        )
        # 0. record the relation among blocks
        parent_idx_dict = dict()
......
@@ -250,7 +250,7 @@ class DistributedBatchSampler(BatchSampler):
            drop_last, bool
        ), "drop_last should be a boolean number"
-        from paddle.fluid.dygraph.parallel import ParallelEnv
+        from paddle.distributed import ParallelEnv
        if num_replicas is not None:
            assert (
......
@@ -35,236 +35,21 @@ from paddle.fluid.framework import (
    in_dygraph_mode,
)
-__all__ = ["ParallelEnv", "DataParallel"]
+__all__ = ["DataParallel"]
ParallelStrategy = core.ParallelStrategy
class ParallelEnv:
    """
    .. note::
        This API is not recommended, if you need to get rank and world_size,
        it is recommended to use ``paddle.distributed.get_rank()`` and
        ``paddle.distributed.get_world_size()`` .

    This class is used to obtain the environment variables required for
    the parallel execution of ``paddle.nn.Layer`` in dynamic mode.

    The parallel execution in dynamic mode needs to be started using ``paddle.distributed.launch``
    or ``paddle.distributed.spawn`` .

    Examples:
        .. code-block:: python

            import paddle
            import paddle.distributed as dist

            def train():
                # 1. initialize parallel environment
                dist.init_parallel_env()

                # 2. get current ParallelEnv
                parallel_env = dist.ParallelEnv()
                print("rank: ", parallel_env.rank)
                print("world_size: ", parallel_env.world_size)

                # print result in process 1:
                # rank: 1
                # world_size: 2
                # print result in process 2:
                # rank: 2
                # world_size: 2

            if __name__ == '__main__':
                # 1. start by ``paddle.distributed.spawn`` (default)
                dist.spawn(train, nprocs=2)
                # 2. start by ``paddle.distributed.launch``
                # train()
    """

    def __init__(self):
        self._rank = int(os.getenv("PADDLE_TRAINER_ID", "0"))
        self._world_size = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
        self._device_type = str(os.getenv("PADDLE_XCCL_BACKEND", ""))

        # imperative only support one gpu or xpu
        if self._device_type != "":
            FLAGS_selected_custom_devices = 'FLAGS_selected_{}s'.format(
                self._device_type
            )
            selected_custom_devices = os.getenv(
                FLAGS_selected_custom_devices, "0"
            ).split(",")
            self._device_id = int(selected_custom_devices[0])
        else:
            if core.is_compiled_with_cuda():
                selected_gpus = os.getenv("FLAGS_selected_gpus", "0").split(",")
                self._device_id = int(selected_gpus[0])
            elif core.is_compiled_with_xpu():
                selected_xpus = os.getenv("FLAGS_selected_xpus", "0").split(",")
                self._device_id = int(selected_xpus[0])
            elif core.is_compiled_with_npu():
                selected_npus = os.getenv("FLAGS_selected_npus", "0").split(",")
                self._device_id = int(selected_npus[0])
            elif core.is_compiled_with_mlu():
                selected_mlus = os.getenv("FLAGS_selected_mlus", "0").split(",")
                self._device_id = int(selected_mlus[0])

        self._trainer_endpoints = os.getenv(
            "PADDLE_TRAINER_ENDPOINTS", ""
        ).split(",")
        self._current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT", "")
        self._nrings = int(os.getenv("FLAGS_nccl_nrings", "1"))
        assert (
            self._nrings > 0
        ), "nccl_nrings must be an integer greater than 0."
        assert (
            self._nrings < 9
        ), "nccl_nrings should be less than 9, which is enough in most scenarios."

    @property
    def rank(self):
        """
        Rank of current trainer.

        Its value is equal to the value of the environment variable ``PADDLE_TRAINER_ID`` . The default value is 0.

        Examples:
            .. code-block:: python

                # execute this command in terminal: export PADDLE_TRAINER_ID=0
                import paddle.distributed as dist

                env = dist.ParallelEnv()
                print("The rank is %d" % env.rank)
                # The rank is 0
        """
        return self._rank

    @property
    def world_size(self):
        """
        The number of trainers (number of processes participating in current job).

        Its value is equal to the value of the environment variable ``PADDLE_TRAINERS_NUM`` . The default value is 1.

        Examples:
            .. code-block:: python

                # execute this command in terminal: export PADDLE_TRAINERS_NUM=4
                import paddle.distributed as dist

                env = dist.ParallelEnv()
                print("The world_size is %d" % env.world_size)
                # The world_size is 4
        """
        return self._world_size

    @property
    def device_id(self):
        """
        The ID of selected GPU card for parallel training.

        Its value is equal to the value of the environment variable ``FLAGS_selected_gpus`` . The default value is 0.

        Examples:
            .. code-block:: python

                # execute this command in terminal: export FLAGS_selected_gpus=1
                import paddle.distributed as dist

                env = dist.ParallelEnv()
                print("The device id are %d" % env.device_id)
                # The device id are 1
        """
        return self._device_id

    @property
    def device_type(self):
        """
        The type of custom device for parallel training.

        Its value is equal to the value of the environment variable ``PADDLE_XCCL_BACKEND`` . The default value is None.
        """
        return self._device_type

    @property
    def current_endpoint(self):
        """
        The endpoint of current trainer, it is in the form of (node IP + port).

        Its value is equal to the value of the environment variable ``PADDLE_CURRENT_ENDPOINT`` . The default value is "".

        Examples:
            .. code-block:: python

                # execute this command in terminal: export PADDLE_CURRENT_ENDPOINT=127.0.0.1:6170
                import paddle.distributed as dist

                env = dist.ParallelEnv()
                print("The current endpoint are %s" % env.current_endpoint)
                # The current endpoint are 127.0.0.1:6170
        """
        return self._current_endpoint

    @property
    def trainer_endpoints(self):
        """
        The endpoints of all trainer nodes in the task,
        which are used to broadcast the NCCL ID when NCCL2 is initialized.

        Its value is equal to the value of the environment variable ``PADDLE_TRAINER_ENDPOINTS`` . The default value is "".

        Examples:
            .. code-block:: python

                # execute this command in terminal: export PADDLE_TRAINER_ENDPOINTS=127.0.0.1:6170,127.0.0.1:6171
                import paddle.distributed as dist

                env = dist.ParallelEnv()
                print("The trainer endpoints are %s" % env.trainer_endpoints)
                # The trainer endpoints are ['127.0.0.1:6170', '127.0.0.1:6171']
        """
        return self._trainer_endpoints

    @property
    def nrings(self):
        """
        Nrings of current trainer.

        Its value is equal to the value of the environment variable ``FLAGS_nccl_nrings`` . The default value is 1.

        Examples:
            .. code-block:: python

                # execute this command in terminal: export FLAGS_nccl_nrings=1
                import paddle.distributed as dist

                env = dist.ParallelEnv()
                print("The nrings is %d" % env.nrings)
                # the number of ring is 1
        """
        return self._nrings

    # [aliases] Compatible with old method names
    local_rank = rank
    nranks = world_size
    dev_id = device_id


# NOTE: [ Compatible ] Originally this class name is `Env`. The semantics of the old class names
# are inaccurate and may confuse users, so replace it with `ParallelEnv`, but to be compatible
# with the old examples, here still need to keep this name.
Env = ParallelEnv
def _build_default_parallel_strategy():
    strategy = ParallelStrategy()
-    strategy.nranks = ParallelEnv().nranks
-    strategy.local_rank = ParallelEnv().local_rank
-    strategy.trainer_endpoints = ParallelEnv().trainer_endpoints
-    strategy.current_endpoint = ParallelEnv().current_endpoint
+    strategy.nranks = paddle.distributed.ParallelEnv().nranks
+    strategy.local_rank = paddle.distributed.ParallelEnv().local_rank
+    strategy.trainer_endpoints = (
+        paddle.distributed.ParallelEnv().trainer_endpoints
+    )
+    strategy.current_endpoint = (
+        paddle.distributed.ParallelEnv().current_endpoint
+    )
    return strategy
@@ -318,11 +103,13 @@ def _split_tensors(coalesced_grads_and_grad_vars):
def scale_loss(loss):
    # TODO(liuyuhui) Currently only for xpu. Will be removed in the future.
-    if not ParallelEnv().world_size > 1:
+    if not paddle.distributed.ParallelEnv().world_size > 1:
        return loss
    loss_scale = to_variable(
-        np.array([ParallelEnv().world_size]).astype("float32")
+        np.array([paddle.distributed.ParallelEnv().world_size]).astype(
+            "float32"
+        )
    )
    loss_scale.stop_gradient = True
    scaled_loss = loss / loss_scale
......
@@ -19,17 +19,16 @@ import numpy as np
import paddle
from paddle.fluid import core
-from paddle.fluid.dygraph.parallel import ParallelEnv
def init_process_group(strategy=None):
-    nranks = ParallelEnv().nranks
-    rank = ParallelEnv().local_rank
+    nranks = paddle.distributed.ParallelEnv().nranks
+    rank = paddle.distributed.ParallelEnv().local_rank
    is_master = True if rank == 0 else False
    store = paddle.fluid.core.TCPStore("127.0.0.1", 6173, is_master, nranks)
    pg_group = core.ProcessGroupCustom.create(
        store,
-        ParallelEnv().device_type,
+        paddle.distributed.ParallelEnv().device_type,
        rank,
        nranks,
    )
......
@@ -20,7 +20,6 @@ from get_gpt_model import FakeDataset, generate_model
import paddle
from paddle.distributed.fleet import auto
-from paddle.fluid.dygraph.parallel import ParallelEnv
def apply_pass(use_amp=False, level=None):
@@ -62,7 +61,7 @@ class TestAMPPass(unittest.TestCase):
        paddle.seed(2021)
        np.random.seed(2021)
        random.seed(2021)
-        place = paddle.fluid.CUDAPlace(ParallelEnv().dev_id)
+        place = paddle.fluid.CUDAPlace(paddle.distributed.ParallelEnv().dev_id)
        engine._executor = paddle.static.Executor(place)
    def get_engine(self, use_amp=False, level=None):
......
@@ -20,7 +20,6 @@ from get_gpt_model import FakeDataset, generate_model
import paddle
from paddle.distributed.fleet import auto
-from paddle.fluid.dygraph.parallel import ParallelEnv
paddle.enable_static()
@@ -73,7 +72,7 @@ class TestGradientClipByGlobalNorm(unittest.TestCase):
        paddle.seed(2022)
        np.random.seed(2022)
        random.seed(2022)
-        place = paddle.fluid.CUDAPlace(ParallelEnv().dev_id)
+        place = paddle.fluid.CUDAPlace(paddle.distributed.ParallelEnv().dev_id)
        engine._executor = paddle.static.Executor(place)
    def get_engine(self, use_sharding=False):
......
@@ -20,7 +20,6 @@ from get_gpt_model import FakeDataset, generate_model
import paddle
from paddle.distributed.fleet import auto
-from paddle.fluid.dygraph.parallel import ParallelEnv
paddle.enable_static()
@@ -56,7 +55,7 @@ class TestGradientMergePass(unittest.TestCase):
        paddle.seed(2021)
        np.random.seed(2021)
        random.seed(2021)
-        place = paddle.fluid.CUDAPlace(ParallelEnv().dev_id)
+        place = paddle.fluid.CUDAPlace(paddle.distributed.ParallelEnv().dev_id)
        engine._executor = paddle.static.Executor(place)
    def get_engine(self, use_gradient_merge=False):
......
@@ -20,7 +20,6 @@ from get_gpt_model import FakeDataset, generate_model
import paddle
from paddle.distributed.fleet import auto
-from paddle.fluid.dygraph.parallel import ParallelEnv
def apply_pass(use_recompute=False, no_recompute_segments=[]):
@@ -52,7 +51,7 @@ class TestRecomputePass(unittest.TestCase):
        paddle.seed(2022)
        np.random.seed(2022)
        random.seed(2022)
-        place = paddle.fluid.CUDAPlace(ParallelEnv().dev_id)
+        place = paddle.fluid.CUDAPlace(paddle.distributed.ParallelEnv().dev_id)
        engine._executor = paddle.static.Executor(place)
    def get_engine(self, use_recompute=False, no_recompute_segments=[]):
......
@@ -20,7 +20,6 @@ from get_gpt_model import FakeDataset, generate_model
import paddle
from paddle.distributed.fleet import auto
-from paddle.fluid.dygraph.parallel import ParallelEnv
paddle.enable_static()
@@ -83,7 +82,7 @@ class TestShardingStage2WithNewEXE(unittest.TestCase):
        paddle.seed(2022)
        np.random.seed(2022)
        random.seed(2022)
-        place = paddle.fluid.CUDAPlace(ParallelEnv().dev_id)
+        place = paddle.fluid.CUDAPlace(paddle.distributed.ParallelEnv().dev_id)
        engine._executor = paddle.static.Executor(place)
    def get_engine(
......
@@ -20,7 +20,6 @@ from get_gpt_model import FakeDataset, generate_model
import paddle
from paddle.distributed.fleet import auto
-from paddle.fluid.dygraph.parallel import ParallelEnv
paddle.enable_static()
@@ -56,7 +55,7 @@ class TestShardingPass(unittest.TestCase):
        paddle.seed(2022)
        np.random.seed(2022)
        random.seed(2022)
-        place = paddle.fluid.CUDAPlace(ParallelEnv().dev_id)
+        place = paddle.fluid.CUDAPlace(paddle.distributed.ParallelEnv().dev_id)
        engine._executor = paddle.static.Executor(place)
    def get_engine(self, use_sharding=False, stage=None):
......
@@ -21,7 +21,6 @@ from get_gpt_model import FakeDataset, generate_model
import paddle
from paddle.distributed.fleet import auto
-from paddle.fluid.dygraph.parallel import ParallelEnv
sys.path.append("..")
from test_sparse_addmm_op import get_cuda_version
@@ -55,7 +54,7 @@ class TestFusedLinearPass(unittest.TestCase):
        paddle.seed(2021)
        np.random.seed(2021)
        random.seed(2021)
-        place = paddle.fluid.CUDAPlace(ParallelEnv().dev_id)
+        place = paddle.fluid.CUDAPlace(paddle.distributed.ParallelEnv().dev_id)
        engine._executor = paddle.static.Executor(place)
    def get_engine(self, use_fused_passes=False, fused_passes_list=[]):
......
@@ -21,7 +21,6 @@ import paddle
import paddle.fluid.core as core
import paddle.nn as nn
from paddle.distributed.fleet import auto
-from paddle.fluid.dygraph.parallel import ParallelEnv
from paddle.static import InputSpec
from paddle.static.amp.bf16.amp_utils import _valid_types
from paddle.static.amp.fp16_utils import find_true_prev_op
@@ -90,7 +89,7 @@ class TestBF16Pass(unittest.TestCase):
        paddle.seed(2021)
        np.random.seed(2021)
        random.seed(2021)
-        place = paddle.fluid.CUDAPlace(ParallelEnv().dev_id)
+        place = paddle.fluid.CUDAPlace(paddle.distributed.ParallelEnv().dev_id)
        engine._executor = paddle.static.Executor(place)
    def get_engine(self, use_bf16=False):
......
@@ -21,7 +21,6 @@ from get_gpt_model import FakeDataset
import paddle
from paddle.distributed.fleet import auto
-from paddle.fluid.dygraph.parallel import ParallelEnv
sys.path.append("..")
import auto_parallel_gpt_model as modeling
@@ -92,7 +91,7 @@ class TestRecomputePassWithRecomputeAPI(unittest.TestCase):
        paddle.seed(2022)
        np.random.seed(2022)
        random.seed(2022)
-        place = paddle.fluid.CUDAPlace(ParallelEnv().dev_id)
+        place = paddle.fluid.CUDAPlace(paddle.distributed.ParallelEnv().dev_id)
        engine._executor = paddle.static.Executor(place)
    def get_engine(
......
@@ -19,7 +19,6 @@ import numpy as np
import paddle
from paddle.fluid import core
-from paddle.fluid.dygraph.parallel import ParallelEnv
class TestProcessGroupFp32(unittest.TestCase):
@@ -34,8 +33,8 @@ class TestProcessGroupFp32(unittest.TestCase):
        self.shape = (2, 10, 5)
    def test_create_process_group_gloo(self):
-        nranks = ParallelEnv().nranks
-        rank = ParallelEnv().local_rank
+        nranks = paddle.distributed.ParallelEnv().nranks
+        rank = paddle.distributed.ParallelEnv().local_rank
        is_master = True if rank == 0 else False
        store = paddle.fluid.core.TCPStore(
            "127.0.0.1", 6272, is_master, nranks, 30
......
@@ -19,12 +19,11 @@ import numpy as np
import paddle
import paddle.distributed as dist
-from paddle.fluid.dygraph.parallel import ParallelEnv
def init_process_group(strategy=None):
-    nranks = ParallelEnv().nranks
-    rank = ParallelEnv().local_rank
+    nranks = paddle.distributed.ParallelEnv().nranks
+    rank = dist.ParallelEnv().local_rank
    is_master = True if rank == 0 else False
    pg_group = dist.init_parallel_env()
......
@@ -20,12 +20,11 @@ import numpy as np
import paddle
import paddle.distributed as dist
-from paddle.fluid.dygraph.parallel import ParallelEnv
def init_process_group(strategy=None):
-    nranks = ParallelEnv().nranks
-    rank = ParallelEnv().local_rank
+    nranks = paddle.distributed.ParallelEnv().nranks
+    rank = dist.ParallelEnv().local_rank
    is_master = True if rank == 0 else False
    pg_group = dist.init_parallel_env()
......
@@ -20,7 +20,6 @@ import warnings
import numpy as np
import paddle
-from paddle.fluid.dygraph.parallel import ParallelEnv
from paddle.utils import try_import
from .progressbar import ProgressBar
@@ -350,7 +349,7 @@ class ProgBarLogger(Callback):
        self.log_freq = log_freq
    def _is_print(self):
-        return self.verbose and ParallelEnv().local_rank == 0
+        return self.verbose and paddle.distributed.ParallelEnv().local_rank == 0
    def on_train_begin(self, logs=None):
        self.epochs = self.params['epochs']
@@ -598,7 +597,11 @@ class ModelCheckpoint(Callback):
        self.epoch = epoch
    def _is_save(self):
-        return self.model and self.save_dir and ParallelEnv().local_rank == 0
+        return (
+            self.model
+            and self.save_dir
+            and paddle.distributed.ParallelEnv().local_rank == 0
+        )
    def on_epoch_end(self, epoch, logs=None):
        if self._is_save() and self.epoch % self.save_freq == 0:
@@ -922,7 +925,7 @@ class VisualDL(Callback):
        self.epoch = 0
    def _is_write(self):
-        return ParallelEnv().local_rank == 0
+        return paddle.distributed.ParallelEnv().local_rank == 0
    def on_train_begin(self, logs=None):
        self.epochs = self.params['epochs']
@@ -1074,7 +1077,7 @@ class WandbCallback(Callback):
        _ = self.run
    def _is_write(self):
-        return ParallelEnv().local_rank == 0
+        return paddle.distributed.ParallelEnv().local_rank == 0
    @property
    def run(self):
@@ -1333,7 +1336,10 @@ class ReduceLROnPlateau(Callback):
                new_lr = old_lr * self.factor
                new_lr = max(new_lr, self.min_lr)
                self.model._optimizer._learning_rate = new_lr
-                if self.verbose > 0 and ParallelEnv().local_rank == 0:
+                if (
+                    self.verbose > 0
+                    and paddle.distributed.ParallelEnv().local_rank == 0
+                ):
                    print(
                        '\nEpoch %d: ReduceLROnPlateau reducing learning '
                        'rate to %s.' % (self.epoch + 1, new_lr)
......
@@ -16,8 +16,6 @@ import logging
import os
import sys
-from paddle.fluid.dygraph.parallel import ParallelEnv
__all__ = []
@@ -40,7 +38,7 @@ def setup_logger(output=None, name="hapi", log_level=logging.INFO):
    format_str = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    # stdout logging: only local rank==0
-    local_rank = ParallelEnv().local_rank
+    local_rank = int(os.getenv("PADDLE_TRAINER_ID", "0"))
    if local_rank == 0 and len(logger.handlers) == 0:
        ch = logging.StreamHandler(stream=sys.stdout)
        ch.setLevel(log_level)
......
@@ -30,7 +30,6 @@ from paddle.autograd import no_grad
from paddle.distributed.fleet.base import role_maker
from paddle.fluid import core
from paddle.fluid.dygraph.base import to_variable
-from paddle.fluid.dygraph.parallel import ParallelEnv
from paddle.fluid.executor import global_scope
from paddle.fluid.framework import Variable
from paddle.fluid.framework import _current_expected_place as _get_device
@@ -190,17 +189,21 @@ def init_communicator(
def prepare_distributed_context(place=None):
    if place is None:
        place = (
-            fluid.CUDAPlace(ParallelEnv().dev_id)
-            if ParallelEnv().nranks > 1
+            fluid.CUDAPlace(paddle.distributed.ParallelEnv().dev_id)
+            if paddle.distributed.ParallelEnv().nranks > 1
            else fluid.CUDAPlace(0)
        )
    place = _get_paddle_place(place)
    strategy = fluid.dygraph.parallel.ParallelStrategy()
-    strategy.nranks = ParallelEnv().nranks
-    strategy.local_rank = ParallelEnv().local_rank
-    strategy.trainer_endpoints = ParallelEnv().trainer_endpoints
-    strategy.current_endpoint = ParallelEnv().current_endpoint
+    strategy.nranks = paddle.distributed.ParallelEnv().nranks
+    strategy.local_rank = paddle.distributed.ParallelEnv().local_rank
+    strategy.trainer_endpoints = (
+        paddle.distributed.ParallelEnv().trainer_endpoints
+    )
+    strategy.current_endpoint = (
+        paddle.distributed.ParallelEnv().current_endpoint
+    )
    if strategy.nranks < 2:
        return
@@ -282,8 +285,8 @@ class StaticGraphAdapter:
            'test_batch': 0,
        }
-        self._nranks = ParallelEnv().nranks
-        self._local_rank = ParallelEnv().local_rank
+        self._nranks = paddle.distributed.ParallelEnv().nranks
+        self._local_rank = paddle.distributed.ParallelEnv().local_rank
        self._amp_level = "O0"
        self._amp_configs = {}
@@ -733,8 +736,8 @@ class DynamicGraphAdapter:
    def __init__(self, model):
        super().__init__()
        self.model = model
-        self._nranks = ParallelEnv().nranks
-        self._local_rank = ParallelEnv().local_rank
+        self._nranks = paddle.distributed.ParallelEnv().nranks
+        self._local_rank = paddle.distributed.ParallelEnv().local_rank
        self._merge_count = {
            'eval_total': 0,
            'test_total': 0,
@@ -751,10 +754,14 @@ class DynamicGraphAdapter:
        if self._nranks > 1:
            dist.init_parallel_env()
            stradegy = fluid.dygraph.parallel.ParallelStrategy()
-            stradegy.nranks = ParallelEnv().nranks
-            stradegy.local_rank = ParallelEnv().local_rank
-            stradegy.trainer_endpoints = ParallelEnv().trainer_endpoints
-            stradegy.current_endpoint = ParallelEnv().current_endpoint
+            stradegy.nranks = paddle.distributed.ParallelEnv().nranks
+            stradegy.local_rank = paddle.distributed.ParallelEnv().local_rank
+            stradegy.trainer_endpoints = (
+                paddle.distributed.ParallelEnv().trainer_endpoints
+            )
+            stradegy.current_endpoint = (
+                paddle.distributed.ParallelEnv().current_endpoint
+            )
            self.ddp_model = fluid.dygraph.parallel.DataParallel(
                self.model.network, stradegy
            )
@@ -1373,7 +1380,7 @@ class Model:
        """
-        if ParallelEnv().local_rank == 0:
+        if paddle.distributed.ParallelEnv().local_rank == 0:
            if not training:
                self._save_inference_model(path)
            else:
@@ -1657,7 +1664,10 @@ class Model:
        self._place = _get_device()
        if isinstance(self._place, fluid.CUDAPlace):
            global _parallel_context_initialized
-            if ParallelEnv().nranks > 1 and not _parallel_context_initialized:
+            if (
+                paddle.distributed.ParallelEnv().nranks > 1
+                and not _parallel_context_initialized
+            ):
                if fluid._non_static_mode():
                    main_prog_seed = fluid.default_main_program().random_seed
                    startup_prog_seed = (
@@ -2307,7 +2317,9 @@ class Model:
            mode == 'train'
            or self._adapter._merge_count.get(mode + '_batch', 0) <= 0
        ):
-            logs['batch_size'] = batch_size * ParallelEnv().nranks
+            logs['batch_size'] = (
+                batch_size * paddle.distributed.ParallelEnv().nranks
+            )
        else:
            logs['batch_size'] = self._adapter._merge_count[mode + '_batch']
......
@@ -136,7 +136,7 @@ def get_path_from_url(
        str: a local path to save downloaded models & weights & datasets.
    """
-    from paddle.fluid.dygraph.parallel import ParallelEnv
+    from paddle.distributed import ParallelEnv
    assert is_url(url), "downloading from {} not a url".format(url)
    # parse path after download to decompress under root_dir
......