diff --git a/python/paddle/distributed/__init__.py b/python/paddle/distributed/__init__.py index bf9773ad9409fea0176884d9f0bc7422c39ef07c..6c7b2fa732969cac7813c8cb5cde36a32853fa1e 100644 --- a/python/paddle/distributed/__init__.py +++ b/python/paddle/distributed/__init__.py @@ -65,7 +65,6 @@ from .entry_attr import ShowClickEntry # noqa: F401 from paddle.fluid.dygraph.parallel import ParallelEnv # noqa: F401 from . import cloud_utils # noqa: F401 -from . import utils # noqa: F401 from .sharding import * # noqa: F401 diff --git a/python/paddle/distributed/auto_parallel/converter.py b/python/paddle/distributed/auto_parallel/converter.py index 95f8bad828b098daa0030288ce3984d698bce6a7..21cc2a22368f515d2f30192464f311377df933ab 100644 --- a/python/paddle/distributed/auto_parallel/converter.py +++ b/python/paddle/distributed/auto_parallel/converter.py @@ -16,7 +16,7 @@ import paddle import warnings import logging import numpy as np -from .utils import get_logger +from ..utils.log_utils import get_logger class Converter(object): diff --git a/python/paddle/distributed/auto_parallel/dist_saver.py b/python/paddle/distributed/auto_parallel/dist_saver.py index aef2dcc6b7ee73b3a149ba28bc26496d3d2587c3..350e5ac44e724d3051b74eaa3c784e19361ad669 100644 --- a/python/paddle/distributed/auto_parallel/dist_saver.py +++ b/python/paddle/distributed/auto_parallel/dist_saver.py @@ -27,7 +27,7 @@ from paddle.fluid.framework import static_only from .utils import get_dist_attr from .converter import Converter from .process_group import _g_process_group_map -from .utils import get_logger +from ..utils.log_utils import get_logger def check_filename(re_exp, filename): diff --git a/python/paddle/distributed/auto_parallel/parallelizer.py b/python/paddle/distributed/auto_parallel/parallelizer.py index 2449a1a2c5e5c0aa0fadfe30ccc66e4d709e7ac6..63fc4660c7f043cc1a424b78f12ab6a001390f6a 100644 --- a/python/paddle/distributed/auto_parallel/parallelizer.py +++ b/python/paddle/distributed/auto_parallel/parallelizer.py @@ -24,7 +24,7 @@ import pickle import time import paddle from paddle.fluid.backward import append_backward -from paddle.distributed.utils import get_logger +from paddle.distributed.utils.log_utils import get_logger from paddle.distributed.fleet import cloud_utils import paddle.fluid.core as core from paddle.fluid import program_guard diff --git a/python/paddle/distributed/cloud_utils.py b/python/paddle/distributed/cloud_utils.py index a8eedb96a3ecda44dde9be907a40497e576e6542..651298d6d766f62ac729f253c20c5fb537953e74 100644 --- a/python/paddle/distributed/cloud_utils.py +++ b/python/paddle/distributed/cloud_utils.py @@ -14,10 +14,8 @@ import os import paddle -from paddle.distributed.utils import get_cluster -from paddle.distributed.utils import logger -from paddle.distributed.utils import get_gpus -from paddle.distributed.utils import get_cluster_from_args +from paddle.distributed.utils.launch_utils import get_cluster, get_gpus, get_cluster_from_args +from paddle.distributed.utils.launch_utils import logger __all__ = [] diff --git a/python/paddle/distributed/fleet/__init__.py b/python/paddle/distributed/fleet/__init__.py index 987cb3d4d7fe4e74f1aae1e6c9fa40eb89507099..b75d84edf29b289be02c31a71def6d6b6652eabe 100755 --- a/python/paddle/distributed/fleet/__init__.py +++ b/python/paddle/distributed/fleet/__init__.py @@ -32,6 +32,7 @@ from .fleet import Fleet from .model import distributed_model from .optimizer import distributed_optimizer from .scaler import distributed_scaler +from .utils import log_util __all__ = [ 
#noqa "CommunicateTopology", "UtilBase", "HybridCommunicateGroup", @@ -90,5 +91,7 @@ distributed_model = distributed_model shrink = fleet.shrink get_hybrid_communicate_group = fleet.get_hybrid_communicate_group distributed_scaler = distributed_scaler - +set_log_level = log_util.set_log_level +get_log_level_code = log_util.get_log_level_code +get_log_level_name = log_util.get_log_level_name from .. import auto_parallel as auto diff --git a/python/paddle/distributed/fleet/elastic/manager.py b/python/paddle/distributed/fleet/elastic/manager.py index 473fcc131beb794f060e1790d1632b29dd64efe2..c2cef68f73e82d239655444ea2e2e0ef9ee38c4c 100644 --- a/python/paddle/distributed/fleet/elastic/manager.py +++ b/python/paddle/distributed/fleet/elastic/manager.py @@ -26,13 +26,9 @@ import subprocess from paddle.distributed.fleet import cloud_utils from paddle.distributed.fleet import launch_utils -logger = logging.getLogger("ELASTIC") -logger.setLevel(logging.INFO) -formatter = logging.Formatter( - fmt='%(name)s %(levelname)s %(asctime)s %(message)s') -ch = logging.StreamHandler() -ch.setFormatter(formatter) -logger.addHandler(ch) +from paddle.distributed.utils.log_utils import get_logger + +logger = get_logger("INFO", "ELASTIC") ELASTIC_EXIT_CODE = 101 ELASTIC_AUTO_PARALLEL_EXIT_CODE = 102 @@ -354,7 +350,7 @@ class ElasticManager(object): stderr=subprocess.PIPE, shell=True).communicate() if err: - logger.warn("pre_hook exec failed") + logger.warning("pre_hook exec failed") else: logger.info(f"pre_hook exec result: {out.decode('utf-8').strip()}") diff --git a/python/paddle/distributed/fleet/fleet.py b/python/paddle/distributed/fleet/fleet.py index 085ca6ee785b09e8e0da5ffc01d79e3ae901c7bd..9cc73cb3a9b4464e56be2b7e57482e406c3c053f 100644 --- a/python/paddle/distributed/fleet/fleet.py +++ b/python/paddle/distributed/fleet/fleet.py @@ -13,7 +13,6 @@ # limitations under the License. import copy -import warnings import paddle import os from types import MethodType @@ -32,6 +31,8 @@ from .base import topology as tp from .meta_parallel import model_parallel_random_seed from paddle import _C_ops, _legacy_C_ops from paddle.fluid import core +from .utils.log_util import logger, set_log_level +import logging __all__ = [] @@ -54,7 +55,7 @@ def apply_ir_passes(main_program, startup_program, config): # RawProgramOptimizer also inserts coalesce_tensor # into program. These two procedures may conflict # in which vars are to be fused. - warnings.warn( + logger.warning( 'Currently, the fuse_all_optimizer_ops pass has conflict with fuse_all_reduce_ops pass. Disable the fuse_all_optimizer_ops pass temporarily.' ) build_strategy.fuse_all_optimizer_ops = False @@ -83,7 +84,7 @@ def _is_non_distributed_check_(func): if cls._role_maker is not None and cls._role_maker._is_non_distributed( ) is True: - warnings.warn( + logger.warning( "%s() function doesn't work when use non_distributed fleet." % (func.__name__)) return @@ -165,7 +166,11 @@ class Fleet(object): self._context = {} self.user_defined_optimizer = paddle.optimizer.Optimizer(0.0) - def init(self, role_maker=None, is_collective=False, strategy=None): + def init(self, + role_maker=None, + is_collective=False, + strategy=None, + log_level="INFO"): """ Initialize role_maker in Fleet. @@ -183,6 +188,8 @@ class Fleet(object): is False. strategy (DistributedStrategy): Extra properties for distributed training. For details, please refer to paddle.distributed.fleet.DistributedStrategy. Default: None. 
+ log_level (Integer, String, optional): An ``Integer`` or ``String`` value determining how verbose + the logging output is. Default is "INFO". Returns: @@ -218,7 +225,18 @@ class Fleet(object): strategy = fleet.DistributedStrategy() fleet.init(strategy=strategy) + Examples5: + + .. code-block:: python + + import paddle.distributed.fleet as fleet + strategy = fleet.DistributedStrategy() + fleet.init(log_level="DEBUG") + """ + + set_log_level(log_level) + if strategy is None: strategy = DistributedStrategy() self._user_defined_strategy = copy.deepcopy(strategy) @@ -262,12 +280,12 @@ class Fleet(object): self._hcg = tp.HybridCommunicateGroup(self._topology) return if parallel_helper._is_parallel_ctx_initialized(): - warnings.warn( + logger.warning( "The dygraph parallel environment has been initialized.") else: # FLAGS_nccl_nrings is used for dynamic graph multi-stream communication if "FLAGS_nccl_nrings" in os.environ: - warnings.warn( + logger.warning( "You have set the environment variable FLAGS_nccl_nrings " "outside the program, so the nccl_comm_num in " "DistributedStrategy will not take effect here.") @@ -282,7 +300,7 @@ class Fleet(object): if tp._HYBRID_PARALLEL_GROUP is None: self._init_hybrid_parallel_env() else: - warnings.warn( + logger.warning( "The dygraph hybrid parallel environment has been initialized." ) elif self._is_collective: @@ -851,9 +869,6 @@ class Fleet(object): fleet.init_server() """ - # warnings.warn( - # "'save_inference_model' is a deprecated, will be deleted after v2.2.0, Please use fleet.save instead." - # ) self._runtime_handle._save_inference_model(executor, dirname, feeded_var_names, @@ -903,10 +918,6 @@ class Fleet(object): fleet.save_persistables(exe, "dirname", paddle.static.default_main_program()) """ - # warnings.warn( - # "'save_persistables' is a deprecated, will be deleted after v2.2.0, Please use fleet.save instead." - # ) - self._runtime_handle._save_persistables(executor, dirname, main_program, mode) @@ -1016,7 +1027,7 @@ class Fleet(object): if strategy is not None: if self._is_collective: - warnings.warn( + logger.warning( "It is recommended to use DistributedStrategy " "in fleet.init(). The strategy here is only for compatibility. 
" "If the strategy in fleet.distributed_optimizer() is " @@ -1305,8 +1316,9 @@ class Fleet(object): copy_user_defined_strategy, can_not_apply_optimizer_list) context["valid_strategy"] = copy.deepcopy(valid_strategy) - # print("valid_strategy:", context["valid_strategy"]) - # print("user_defined_strategy:", context["user_defined_strategy"]) + logger.debug("valid_strategy: " + str(context["valid_strategy"])) + logger.debug("user_defined_strategy: " + + str(context["user_defined_strategy"])) applied_meta_list = self.strategy_compiler._get_applied_meta_list() applied_graph_list = self.strategy_compiler._get_applied_graph_list() @@ -1336,17 +1348,19 @@ class Fleet(object): no_grad_set=no_grad_set) if meta_optimizer: - # print("before minimize program id:", id(loss.block.program)) + logger.debug("before minimize program id: " + + str(id(loss.block.program))) optimize_ops, params_grads = meta_optimizer.minimize( loss, startup_program, parameter_list, no_grad_set=no_grad_set) - # print("after minimize program id:", id(loss.block.program)) - + logger.debug("after minimize program id: " + + str(id(loss.block.program))) default_program = paddle.static.default_main_program() - # print("default program id:", id(default_program)) + logger.debug("default program id: " + str(id(default_program))) if id(default_program) != id(loss.block.program): paddle.fluid.framework.switch_main_program(loss.block.program) - # print("default program id after switch:", id(default_program)) + logger.debug("default program id after switch: " + + str(id(default_program))) else: optimize_ops, params_grads = self.user_defined_optimizer.minimize( @@ -1356,7 +1370,8 @@ class Fleet(object): context["program_params_grads"] = params_grads if graph_optimizer: - # print("before graph minimize program id:", id(loss.block.program)) + logger.debug("before graph minimize program id: " + + str(id(loss.block.program))) optimize_ops, params_grads = graph_optimizer.minimize( loss, startup_program, parameter_list, no_grad_set=no_grad_set) # since we do not encourage users to use graph operations @@ -1455,7 +1470,8 @@ class Fleet(object): if v or k not in opt_info: opt_info[k] = v program._fleet_opt = opt_info - # print("fleet base opt info:", id(program), program._fleet_opt) + logger.debug("fleet base opt info: " + str(id(program)) + + str(program._fleet_opt)) if self._runtime_handle is None: self._runtime_handle = RuntimeFactory()._create_runtime(context) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py index 68b2021b6ead27ae9396ec06fdb4c7697e47c75b..1fde04b2d2a2c961f396a3f9514d75f5dd4321db 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py @@ -30,15 +30,8 @@ from .sharding.prune import ProgramDeps from .sharding import utils # FIXME: import * from .sharding.utils import * - import logging - -logger = logging.getLogger(__name__) -formatter = logging.Formatter(fmt='%(asctime)s %(levelname)-8s %(message)s', - datefmt='%Y-%m-%d %H:%M:%S') -ch = logging.StreamHandler() -ch.setFormatter(formatter) -logger.addHandler(ch) +from ..utils.log_util import logger __all__ = [] diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage2.py b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage2.py index 6d9ce68e493d8d8dff1bfda4b39332d72243c0ae..29ac651895bd68cab107a3eea046a9ba842b4e2b 100644 --- 
a/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage2.py +++ b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage2.py @@ -33,7 +33,7 @@ from types import MethodType import paddle from paddle import nn from paddle.distributed import collective -from paddle.distributed.utils import get_logger +from paddle.distributed.utils.log_utils import get_logger from .group_sharded_storage import GradStorage from .group_sharded_optimizer_stage2 import GroupShardedOptimizerStage2 diff --git a/python/paddle/distributed/fleet/optimizer.py b/python/paddle/distributed/fleet/optimizer.py index bfc3d737f9934115d99edda9a3a8f25f347d2a14..ddad6511a0a645f88b4760f2281262beecc41124 100644 --- a/python/paddle/distributed/fleet/optimizer.py +++ b/python/paddle/distributed/fleet/optimizer.py @@ -13,7 +13,6 @@ # limitations under the License. import copy -import warnings import paddle import os import numpy as np @@ -22,6 +21,7 @@ from .base.distributed_strategy import DistributedStrategy from .meta_optimizers import HybridParallelOptimizer, HeterParallelOptimizer from paddle.fluid import core from paddle.distributed import fleet +from .utils.log_util import logger def _dygraph_distributed_optimizer(optimizer, strategy=None): @@ -52,7 +52,7 @@ def _dygraph_distributed_optimizer(optimizer, strategy=None): if strategy is not None: if fleet_env._is_collective: - warnings.warn( + logger.warning( "It is recommended to use DistributedStrategy " "in fleet_env.init(). The strategy here is only for compatibility. " "If the strategy in fleet_env.distributed_optimizer() is " diff --git a/python/paddle/distributed/fleet/recompute/recompute.py b/python/paddle/distributed/fleet/recompute/recompute.py index 28ded25a0e6e0f96d97f396923c3160fc1b007d1..6929ca52cb013077837c85ccd03d7e4bcf22bd95 100755 --- a/python/paddle/distributed/fleet/recompute/recompute.py +++ b/python/paddle/distributed/fleet/recompute/recompute.py @@ -22,13 +22,7 @@ import contextlib from paddle.fluid.framework import in_dygraph_mode import logging - -logger = logging.getLogger(__name__) -formatter = logging.Formatter(fmt='%(asctime)s %(levelname)-8s %(message)s', - datefmt='%Y-%m-%d %H:%M:%S') -ch = logging.StreamHandler() -ch.setFormatter(formatter) -logger.addHandler(ch) +from ..utils.log_util import logger __all__ = [] @@ -49,7 +43,7 @@ def detach_variable(inputs): def check_recompute_necessary(inputs): if not any(input_.stop_gradient == False for input_ in inputs if isinstance(input_, (core.eager.Tensor, paddle.Tensor))): - logger.warn( + logger.warning( "[Recompute]: None of the inputs to current recompute block need grad, " "therefore there is NO need to recompute this block in backward !") diff --git a/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py b/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py index c3b0693d7ebd0b8f8ce4340fb4bbd7f2bb21c1cf..63951c5dc0050407fe420629a3014cea1558d3dd 100644 --- a/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py +++ b/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py @@ -14,7 +14,6 @@ import os import six import numpy as np -import warnings from paddle import framework import paddle diff --git a/python/paddle/distributed/fleet/utils/log_util.py b/python/paddle/distributed/fleet/utils/log_util.py index cf90527c07fe4911a990471f4f948780feeca26d..6118d0264478b1d21a084334267b2bd31b08e3e5 100644 --- a/python/paddle/distributed/fleet/utils/log_util.py +++ b/python/paddle/distributed/fleet/utils/log_util.py @@ -15,30 +15,50 @@ import 
logging import sys -__all__ = [] +from paddle.distributed.utils.log_utils import get_logger +logger = get_logger("INFO", __name__) -class LoggerFactory: - @staticmethod - def build_logger(name=None, level=logging.INFO): - assert name is not None, "name for logger should not be None" +def set_log_level(level): + """ + Set log level - formatter = logging.Formatter( - "%(asctime)s-%(levelname)s: " - "[%(filename)s:%(lineno)d:%(funcName)s] %(message)s") + Args: + level (str|int): the log level to set, by name (e.g. "DEBUG") or by logging code - _logger = logging.getLogger(name) - _logger.setLevel(level) - _logger.propagate = False - handler = logging.StreamHandler(stream=sys.stderr) - handler.setFormatter(formatter) - handler.setLevel(level) - _logger.addHandler(handler) - return _logger + Example 1: + import paddle + import paddle.distributed.fleet as fleet + fleet.init() + fleet.set_log_level("DEBUG") + Example 2: + import paddle + import paddle.distributed.fleet as fleet + fleet.init() + fleet.set_log_level(1) -logger = LoggerFactory.build_logger(name="HybridParallel", level=logging.INFO) + """ + assert isinstance(level, (str, int)), "level's type must be str or int" + if isinstance(level, int): + logger.setLevel(level) + else: + logger.setLevel(level.upper()) + + +def get_log_level_code(): + """ + Return current log level code + """ + return logger.getEffectiveLevel() + + +def get_log_level_name(): + """ + Return current log level name + """ + return logging.getLevelName(get_log_level_code()) def layer_to_str(base, *args, **kwargs): diff --git a/python/paddle/distributed/metric/metrics.py b/python/paddle/distributed/metric/metrics.py index 08d185efd971aa5b93baeb3a4118dfe654f9f414..4029734545f9d43a6c5d941133d73516cd6ac626 100644 --- a/python/paddle/distributed/metric/metrics.py +++ b/python/paddle/distributed/metric/metrics.py @@ -16,7 +16,7 @@ import sys import yaml import paddle.fluid as fluid import logging -from paddle.distributed.utils import get_logger +from paddle.distributed.utils.log_utils import get_logger __all__ = [] logger = get_logger(logging.INFO, name="metrics") diff --git a/python/paddle/distributed/sharding/group_sharded.py b/python/paddle/distributed/sharding/group_sharded.py index 9cb17949acb4709536c74e5a9f5044080fb464c7..0cd67b84a4acb74721f711bcf48a0e4981c581b8 100644 --- a/python/paddle/distributed/sharding/group_sharded.py +++ b/python/paddle/distributed/sharding/group_sharded.py @@ -19,7 +19,7 @@ from enum import Enum import paddle from paddle.optimizer import Optimizer -from paddle.distributed.utils import get_logger +from paddle.distributed.utils.log_utils import get_logger from paddle.fluid.framework import in_dygraph_mode # Old version diff --git a/python/paddle/distributed/spawn.py b/python/paddle/distributed/spawn.py index c0ff2bc273dc57e031fce0ed8ad0fba59d9f9f50..b7908213c9b51ea98ec9d006ae00bccfb0eddfbd 100644 --- a/python/paddle/distributed/spawn.py +++ b/python/paddle/distributed/spawn.py @@ -21,9 +21,7 @@ import six import sys import warnings -from paddle.distributed.utils import _print_arguments -from paddle.distributed.utils import _prepare_trainer_env -from paddle.distributed.utils import get_host_name_ip +from paddle.distributed.utils.launch_utils import _print_arguments, _prepare_trainer_env, get_host_name_ip from paddle.distributed.cloud_utils import get_cluster_and_pod, _get_trainers_num from paddle.distributed.fleet.launch import get_cluster_from_args from paddle.distributed.fleet.cloud_utils import use_paddlecloud diff --git a/python/paddle/distributed/utils/__init__.py 
b/python/paddle/distributed/utils/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..4ce89fa36b06b25cfc7409c093ea227393ae3111 --- /dev/null +++ b/python/paddle/distributed/utils/__init__.py @@ -0,0 +1,15 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__all__ = [] diff --git a/python/paddle/distributed/utils.py b/python/paddle/distributed/utils/launch_utils.py similarity index 55% rename from python/paddle/distributed/utils.py rename to python/paddle/distributed/utils/launch_utils.py index 8177c827c454b320df475998741dc896a3a326f4..3282b5f58bc1a63ab4337147e31cd4b5b90405c0 100644 --- a/python/paddle/distributed/utils.py +++ b/python/paddle/distributed/utils/launch_utils.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,287 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. -import functools -import logging -import socket import time import os import signal import copy import sys -import six import subprocess from contextlib import closing import socket from paddle.fluid import core -from paddle.distributed.fleet.launch_utils import get_backend_by_compile_flag from distutils.util import strtobool +import six -from paddle.fluid.layer_helper import LayerHelper -from paddle.fluid.framework import _non_static_mode -from paddle.fluid.data_feeder import check_variable_and_dtype -from paddle import _C_ops, _legacy_C_ops - -__all__ = [ #noqa - 'get_host_name_ip', - 'Trainer', - 'get_cluster', - 'start_local_trainers', - 'watch_local_trainers', - 'find_free_ports', - 'JobServer', - 'Cluster', - 'Pod', - 'Hdfs', - 'add_arguments', - 'terminate_local_procs', - 'TrainerProc', - 'get_logger', - 'pull_worker_log', - 'global_scatter', - 'global_gather', -] - - -def global_scatter(x, - local_count, - global_count, - group=None, - use_calc_stream=True): - """ - The global_scatter operator distributes the data of x to n_expert * world_size experts according to local_count, - and then receives data according to global_count. The expert refers to a user-defined expert network, - n_expert refers to the number of expert networks owned by each card, and world_size refers to the number of graphics cards running the network. - - As shown below, the value of the world size is 2, n_expert 2, the batch size of the x 4 and local_count is [2, 0, 2, 0]. - The global_count of the rank 0 is [2, 0, , ], rank 1 is [2, 0, ,](Due to the limited space, only the data calculated on rank 0 is shown here). 
- In the global_scatter operator, local_count[i] represents sending local_count[i] data to the (i % n_expert)th expert of the (i // n_expert)th card, - global_count[i] represents receiving global_count[i] data from the (i // n_expert)th card to the (i % n_expert)th expert of this card. The rank in the - figure respresent the rank of the current card in all cards. - - The process of global_scatter sending data is as follows: - - local_count[0] represents taking out 2 batches from x and sending 2 batches to the 0th expert of the 0th card; - - local_count[1] represents taking out 0 batches from x and sending 0 batches to the 1th expert of the 0th card; - - local_count[2] represents taking out 2 batches from x and sending 2 batches to the 0th expert of the 1th card; - - local_count[3] represents taking out 0 batches from x and sending 0 batches to the 1th expert of the 1th card; - - Therefore, the global_count[0] of the 0th card is equal to 2, which means that 2 batches of data are received from the 0th card to the 0th expert; - - the global_count[1] of the 0th card is equal to 0, which means that 0 batches of data are received from the 0th card to the 1th expert; - - the global_count[0] of the 1th card is equal to 2, which means that 2 batches of data are received from the 0th card to the 0th expert; - - the global_count[1] of the 1th card is equal to 0, which means that 0 batches of data are received from the 0th card to the 1th expert. - - .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/global_scatter_gather.png - :width: 800 - :alt: global_scatter_gather - :align: center - - Args: - x (Tensor): Tensor. The tensor data type should be float16, float32, float64, int32 or int64. - local_count (Tensor): Tensor which have n_expert * world_size elements that indicates - how many data needed to be sent. The tensor data type should be int64. - global_count (Tensor): Tensor which have n_expert * world_size elements that indicates - how many data needed to be received. The tensor data type should be int64. - group (Group, optional): The group instance return by new_group or None for global default group. Default: None. - use_calc_stream (bool, optional): Wether to use calculation stream (True) or communication stream. Default: True. - - Returns: - out (Tensor): The data received from all experts. - - Examples: - .. 
code-block:: python - - # required: distributed - import numpy as np - import paddle - from paddle.distributed import init_parallel_env - init_parallel_env() - n_expert = 2 - world_size = 2 - d_model = 2 - in_feat = d_model - local_input_buf = np.array([[1, 2],[3, 4],[5, 6],[7, 8],[9, 10]], \ - dtype=np.float32) - if paddle.distributed.ParallelEnv().local_rank == 0: - local_count = np.array([2, 1, 1, 1]) - global_count = np.array([2, 1, 1, 1]) - else: - local_count = np.array([1, 1, 2, 1]) - global_count = np.array([1, 1, 2, 1]) - local_input_buf = paddle.to_tensor(local_input_buf, dtype="float32", stop_gradient=False) - local_count = paddle.to_tensor(local_count, dtype="int64") - global_count = paddle.to_tensor(global_count, dtype="int64") - a = paddle.distributed.utils.global_scatter(local_input_buf, \ - local_count, global_count) - a.stop_gradient = False - print(a) - # out for rank 0: [[1, 2], [3, 4], [1, 2], [5, 6], [3, 4]] - # out for rank 1: [[7, 8], [5, 6], [7, 8], [9, 10], [9, 10]] - # backward test - c = a * a - c.backward() - print("local_input_buf.grad: ", local_input_buf.grad) - # out for rank 0: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]] - # out for rank 1: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]] - """ - if group is not None and not group.is_member(): - return - - ring_id = 0 if group is None else group.id - if _non_static_mode(): - return _legacy_C_ops.global_scatter(x, local_count, \ - global_count, \ - 'use_calc_stream', use_calc_stream, \ - 'ring_id', ring_id) - else: - op_type = 'global_scatter' - check_variable_and_dtype( - x, 'x', ['float16', 'float32', 'float64', 'int32', 'int64'], - 'global_scatter') - check_variable_and_dtype(local_count, 'local_count', ['int64'], - 'global_scatter') - check_variable_and_dtype(global_count, 'global_count', ['int64'], - 'global_scatter') - - helper = LayerHelper(op_type, **locals()) - out = helper.create_variable_for_type_inference(dtype=x.dtype) - - helper.append_op(type=op_type, - inputs={ - 'X': [x], - 'local_count': [local_count], - 'global_count': [global_count], - }, - outputs={'Out': [out]}, - attrs={ - 'ring_id': ring_id, - 'use_calc_stream': use_calc_stream - }) - return out - - -def global_gather(x, - local_count, - global_count, - group=None, - use_calc_stream=True): - """ - The global_gather operator gathers the data of x into n_expert * world_size experts according to global_count, and then receives data according to local_count. - The expert refers to a user-defined expert network, n_expert refers to the number of expert networks owned by each card, and world_size refers to the number of graphics cards running the network. - - As shown below, the value of the world size is 2, n_expert 2, the batch size of the x 4 and local_count is [2, 0, 2, 0]. - The global_count of the rank 0 is [2, 0, , ], rank 1 is [2, 0, ,](Due to the limited space, only the data calculated on rank 0 is shown here). - In the global_gather operator, the meaning of the global_count and local_count is opposed to global_scatter, global_count[i] represents sending global_count[i] data to the (i % n_expert)th expert of the (i // n_expert)th card, - local_count[i] represents receiving local_count[i] data from the (i // n_expert)th card to the (i % n_expert)th expert of this card. The data sent will be arranged according to the experts of each card. - The rank in the figure respresent the rank of the current card in all cards. 
- - The process of global_gather sending data is as follows: - - The global_count[0] of the 0th card represents sending 2 data to the 0th expert of the 0th card; - - The global_count[1] of the 0th card represents sending 0 data to the 1th expert of the 0th card; - - The global_count[0] of the 1th card represents sending 2 data to the 0th expert of the 0th card; - - The global_count[1] of the 1th card represents sending 0 data to the 1th expert of the 0th card. - - .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/global_scatter_gather.png - :width: 800 - :alt: global_scatter_gather - :align: center - - - Args: - x (Tensor): Tensor. Tensor whose data type should be float16, float32, float64, int32 or int64. - local_count (Tensor): Tensor which have n_expert * world_size elements that indicates - how many data needed to be received. Tensor data type should be int64. - global_count (Tensor): Tensor which have n_expert * world_size elements that indicates - how many data needed to be sent. Tensor data type should be int64. - group (Group, optional): The group instance return by new_group or None for global default group. Default: None. - use_calc_stream (bool, optional): Wether to use calculation stream (True) or communication stream. Default: True. - - Returns: - out (Tensor): The data received from all experts. - - Examples: - .. code-block:: python - - # required: distributed - import numpy as np - import paddle - from paddle.distributed import init_parallel_env - init_parallel_env() - n_expert = 2 - world_size = 2 - d_model = 2 - in_feat = d_model - local_input_buf = np.array([[1, 2],[3, 4],[5, 6],[7, 8],[9, 10]],\ - dtype=np.float32) - if paddle.distributed.ParallelEnv().local_rank == 0: - local_count = np.array([2, 1, 1, 1]) - global_count = np.array([2, 1, 1, 1]) - else: - local_count = np.array([1, 1, 2, 1]) - global_count = np.array([1, 1, 2, 1]) - local_input_buf = paddle.to_tensor(local_input_buf, dtype="float32", stop_gradient=False) - local_count = paddle.to_tensor(local_count, dtype="int64") - global_count = paddle.to_tensor(global_count, dtype="int64") - a = paddle.distributed.utils.global_gather(local_input_buf, local_count, global_count) - print(a) - # out for rank 0: [[1, 2], [3, 4], [7, 8], [1, 2], [7, 8]] - # out for rank 1: [[5, 6], [9, 10], [3, 4], [5, 6], [9, 10]] - a.stop_gradient = False - c = a * a - c.backward() - print("local_input_buf.grad", local_input_buf.grad) - # out for rank 0: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]] - # out for rank 1: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]] - """ - if group is not None and not group.is_member(): - return - - ring_id = 0 if group is None else group.id - if _non_static_mode(): - return _legacy_C_ops.global_gather(x, local_count, \ - global_count, \ - 'use_calc_stream', use_calc_stream, \ - 'ring_id', ring_id) - else: - op_type = 'global_gather' - check_variable_and_dtype( - x, 'x', ['float16', 'float32', 'float64', 'int32', 'int64'], - 'global_gather') - - check_variable_and_dtype(local_count, 'local_count', ['int64'], - 'global_gather') - - check_variable_and_dtype(global_count, 'global_count', ['int64'], - 'global_gather') - helper = LayerHelper(op_type, **locals()) - out = helper.create_variable_for_type_inference(dtype=x.dtype) - - helper.append_op(type=op_type, - inputs={ - 'X': [x], - 'local_count': [local_count], - 'global_count': [global_count] - }, - outputs={'Out': [out]}, - attrs={ - 'ring_id': group, - 'use_calc_stream': use_calc_stream, - }) - 
return out - +from paddle.distributed.fleet.launch_utils import get_backend_by_compile_flag +from ..utils.log_utils import get_logger -logger = logging.getLogger("root") -logger.propagate = False +logger = get_logger("INFO", "root") def get_cluster_from_args(args, selected_gpus): @@ -354,13 +89,6 @@ def get_gpus(selected_gpus): return gpus -def _print_arguments(args): - print("----------- Configuration Arguments -----------") - for arg, value in sorted(six.iteritems(vars(args))): - print("%s: %s" % (arg, value)) - print("------------------------------------------------") - - class Hdfs(object): def __init__(self): @@ -549,21 +277,6 @@ class Pod(object): return r -def get_logger(log_level, name="root"): - logger = logging.getLogger(name) - # Avoid printing multiple logs - if not logger.handlers: - logger.setLevel(log_level) - - log_handler = logging.StreamHandler() - log_format = logging.Formatter( - '%(levelname)s %(asctime)s %(filename)s:%(lineno)d] %(message)s') - log_handler.setFormatter(log_format) - logger.addHandler(log_handler) - - return logger - - def get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus): assert type(trainer_endpoints) is list, "trainer_endpoints must be list" cluster = Cluster(hdfs=None) @@ -826,3 +539,10 @@ def watch_local_trainers(procs, nranks): raise return alive + + +def _print_arguments(args): + print("----------- Configuration Arguments -----------") + for arg, value in sorted(six.iteritems(vars(args))): + print("%s: %s" % (arg, value)) + print("------------------------------------------------") diff --git a/python/paddle/distributed/utils/log_utils.py b/python/paddle/distributed/utils/log_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..01687fc28d18f4ab6614e6836739cb59998ba7f0 --- /dev/null +++ b/python/paddle/distributed/utils/log_utils.py @@ -0,0 +1,32 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + + +def get_logger(log_level, name="root"): + + logger = logging.getLogger(name) + # Avoid printing multiple logs + if not logger.handlers: + log_handler = logging.StreamHandler() + logger.setLevel(log_level) + log_format = logging.Formatter( + '[%(asctime)-15s] [%(levelname)8s] %(filename)s:%(lineno)s - %(message)s' + ) + log_handler.setFormatter(log_format) + logger.addHandler(log_handler) + else: + logger.setLevel(log_level) + return logger diff --git a/python/paddle/distributed/utils/moe_utils.py b/python/paddle/distributed/utils/moe_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..d6dbfdfab58c0205882077de68a056ed509930ea --- /dev/null +++ b/python/paddle/distributed/utils/moe_utils.py @@ -0,0 +1,255 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from paddle.fluid.layer_helper import LayerHelper +from paddle.fluid.framework import _non_static_mode +from paddle.fluid.data_feeder import check_variable_and_dtype +from paddle import _legacy_C_ops + + +def global_scatter(x, + local_count, + global_count, + group=None, + use_calc_stream=True): + """ + The global_scatter operator distributes the data of x to n_expert * world_size experts according to local_count, + and then receives data according to global_count. The expert refers to a user-defined expert network, + n_expert refers to the number of expert networks owned by each card, and world_size refers to the number of graphics cards running the network. + + As shown below, the value of the world size is 2, n_expert 2, the batch size of the x 4 and local_count is [2, 0, 2, 0]. + The global_count of the rank 0 is [2, 0, , ], rank 1 is [2, 0, ,](Due to the limited space, only the data calculated on rank 0 is shown here). + In the global_scatter operator, local_count[i] represents sending local_count[i] data to the (i % n_expert)th expert of the (i // n_expert)th card, + global_count[i] represents receiving global_count[i] data from the (i // n_expert)th card to the (i % n_expert)th expert of this card. The rank in the + figure respresent the rank of the current card in all cards. + + The process of global_scatter sending data is as follows: + + local_count[0] represents taking out 2 batches from x and sending 2 batches to the 0th expert of the 0th card; + + local_count[1] represents taking out 0 batches from x and sending 0 batches to the 1th expert of the 0th card; + + local_count[2] represents taking out 2 batches from x and sending 2 batches to the 0th expert of the 1th card; + + local_count[3] represents taking out 0 batches from x and sending 0 batches to the 1th expert of the 1th card; + + Therefore, the global_count[0] of the 0th card is equal to 2, which means that 2 batches of data are received from the 0th card to the 0th expert; + + the global_count[1] of the 0th card is equal to 0, which means that 0 batches of data are received from the 0th card to the 1th expert; + + the global_count[0] of the 1th card is equal to 2, which means that 2 batches of data are received from the 0th card to the 0th expert; + + the global_count[1] of the 1th card is equal to 0, which means that 0 batches of data are received from the 0th card to the 1th expert. + + .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/global_scatter_gather.png + :width: 800 + :alt: global_scatter_gather + :align: center + + Args: + x (Tensor): Tensor. The tensor data type should be float16, float32, float64, int32 or int64. + local_count (Tensor): Tensor which have n_expert * world_size elements that indicates + how many data needed to be sent. The tensor data type should be int64. + global_count (Tensor): Tensor which have n_expert * world_size elements that indicates + how many data needed to be received. The tensor data type should be int64. + group (Group, optional): The group instance return by new_group or None for global default group. 
Default: None. + use_calc_stream (bool, optional): Wether to use calculation stream (True) or communication stream. Default: True. + + Returns: + out (Tensor): The data received from all experts. + + Examples: + .. code-block:: python + + # required: distributed + import numpy as np + import paddle + from paddle.distributed import init_parallel_env + init_parallel_env() + n_expert = 2 + world_size = 2 + d_model = 2 + in_feat = d_model + local_input_buf = np.array([[1, 2],[3, 4],[5, 6],[7, 8],[9, 10]], \ + dtype=np.float32) + if paddle.distributed.ParallelEnv().local_rank == 0: + local_count = np.array([2, 1, 1, 1]) + global_count = np.array([2, 1, 1, 1]) + else: + local_count = np.array([1, 1, 2, 1]) + global_count = np.array([1, 1, 2, 1]) + local_input_buf = paddle.to_tensor(local_input_buf, dtype="float32", stop_gradient=False) + local_count = paddle.to_tensor(local_count, dtype="int64") + global_count = paddle.to_tensor(global_count, dtype="int64") + a = paddle.distributed.utils.global_scatter(local_input_buf, \ + local_count, global_count) + a.stop_gradient = False + print(a) + # out for rank 0: [[1, 2], [3, 4], [1, 2], [5, 6], [3, 4]] + # out for rank 1: [[7, 8], [5, 6], [7, 8], [9, 10], [9, 10]] + # backward test + c = a * a + c.backward() + print("local_input_buf.grad: ", local_input_buf.grad) + # out for rank 0: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]] + # out for rank 1: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]] + """ + if group is not None and not group.is_member(): + return + + ring_id = 0 if group is None else group.id + if _non_static_mode(): + return _legacy_C_ops.global_scatter(x, local_count, \ + global_count, \ + 'use_calc_stream', use_calc_stream, \ + 'ring_id', ring_id) + else: + op_type = 'global_scatter' + check_variable_and_dtype( + x, 'x', ['float16', 'float32', 'float64', 'int32', 'int64'], + 'global_scatter') + check_variable_and_dtype(local_count, 'local_count', ['int64'], + 'global_scatter') + check_variable_and_dtype(global_count, 'global_count', ['int64'], + 'global_scatter') + + helper = LayerHelper(op_type, **locals()) + out = helper.create_variable_for_type_inference(dtype=x.dtype) + + helper.append_op(type=op_type, + inputs={ + 'X': [x], + 'local_count': [local_count], + 'global_count': [global_count], + }, + outputs={'Out': [out]}, + attrs={ + 'ring_id': ring_id, + 'use_calc_stream': use_calc_stream + }) + return out + + +def global_gather(x, + local_count, + global_count, + group=None, + use_calc_stream=True): + """ + The global_gather operator gathers the data of x into n_expert * world_size experts according to global_count, and then receives data according to local_count. + The expert refers to a user-defined expert network, n_expert refers to the number of expert networks owned by each card, and world_size refers to the number of graphics cards running the network. + + As shown below, the value of the world size is 2, n_expert 2, the batch size of the x 4 and local_count is [2, 0, 2, 0]. + The global_count of the rank 0 is [2, 0, , ], rank 1 is [2, 0, ,](Due to the limited space, only the data calculated on rank 0 is shown here). + In the global_gather operator, the meaning of the global_count and local_count is opposed to global_scatter, global_count[i] represents sending global_count[i] data to the (i % n_expert)th expert of the (i // n_expert)th card, + local_count[i] represents receiving local_count[i] data from the (i // n_expert)th card to the (i % n_expert)th expert of this card. 
The data sent will be arranged according to the experts of each card. + The rank in the figure respresent the rank of the current card in all cards. + + The process of global_gather sending data is as follows: + + The global_count[0] of the 0th card represents sending 2 data to the 0th expert of the 0th card; + + The global_count[1] of the 0th card represents sending 0 data to the 1th expert of the 0th card; + + The global_count[0] of the 1th card represents sending 2 data to the 0th expert of the 0th card; + + The global_count[1] of the 1th card represents sending 0 data to the 1th expert of the 0th card. + + .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/global_scatter_gather.png + :width: 800 + :alt: global_scatter_gather + :align: center + + + Args: + x (Tensor): Tensor. Tensor whose data type should be float16, float32, float64, int32 or int64. + local_count (Tensor): Tensor which have n_expert * world_size elements that indicates + how many data needed to be received. Tensor data type should be int64. + global_count (Tensor): Tensor which have n_expert * world_size elements that indicates + how many data needed to be sent. Tensor data type should be int64. + group (Group, optional): The group instance return by new_group or None for global default group. Default: None. + use_calc_stream (bool, optional): Wether to use calculation stream (True) or communication stream. Default: True. + + Returns: + out (Tensor): The data received from all experts. + + Examples: + .. code-block:: python + + # required: distributed + import numpy as np + import paddle + from paddle.distributed import init_parallel_env + init_parallel_env() + n_expert = 2 + world_size = 2 + d_model = 2 + in_feat = d_model + local_input_buf = np.array([[1, 2],[3, 4],[5, 6],[7, 8],[9, 10]],\ + dtype=np.float32) + if paddle.distributed.ParallelEnv().local_rank == 0: + local_count = np.array([2, 1, 1, 1]) + global_count = np.array([2, 1, 1, 1]) + else: + local_count = np.array([1, 1, 2, 1]) + global_count = np.array([1, 1, 2, 1]) + local_input_buf = paddle.to_tensor(local_input_buf, dtype="float32", stop_gradient=False) + local_count = paddle.to_tensor(local_count, dtype="int64") + global_count = paddle.to_tensor(global_count, dtype="int64") + a = paddle.distributed.utils.global_gather(local_input_buf, local_count, global_count) + print(a) + # out for rank 0: [[1, 2], [3, 4], [7, 8], [1, 2], [7, 8]] + # out for rank 1: [[5, 6], [9, 10], [3, 4], [5, 6], [9, 10]] + a.stop_gradient = False + c = a * a + c.backward() + print("local_input_buf.grad", local_input_buf.grad) + # out for rank 0: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]] + # out for rank 1: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]] + """ + if group is not None and not group.is_member(): + return + + ring_id = 0 if group is None else group.id + if _non_static_mode(): + return _legacy_C_ops.global_gather(x, local_count, \ + global_count, \ + 'use_calc_stream', use_calc_stream, \ + 'ring_id', ring_id) + else: + op_type = 'global_gather' + check_variable_and_dtype( + x, 'x', ['float16', 'float32', 'float64', 'int32', 'int64'], + 'global_gather') + + check_variable_and_dtype(local_count, 'local_count', ['int64'], + 'global_gather') + + check_variable_and_dtype(global_count, 'global_count', ['int64'], + 'global_gather') + helper = LayerHelper(op_type, **locals()) + out = helper.create_variable_for_type_inference(dtype=x.dtype) + + helper.append_op(type=op_type, + inputs={ + 'X': [x], + 'local_count': 
[local_count], + 'global_count': [global_count] + }, + outputs={'Out': [out]}, + attrs={ + 'ring_id': group, + 'use_calc_stream': use_calc_stream, + }) + return out diff --git a/python/paddle/fluid/tests/custom_runtime/test_collective_process_group_xccl.py b/python/paddle/fluid/tests/custom_runtime/test_collective_process_group_xccl.py index db2510d2beb377c981b391da78af4f0fdb619629..01f39a39144fe6116d80b2f707d5a52a05455aca 100644 --- a/python/paddle/fluid/tests/custom_runtime/test_collective_process_group_xccl.py +++ b/python/paddle/fluid/tests/custom_runtime/test_collective_process_group_xccl.py @@ -28,7 +28,7 @@ def start_local_trainers(cluster, training_script_args, eager_mode=True, log_dir=None): - from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc + from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc current_env = copy.copy(os.environ.copy()) #paddle broadcast ncclUniqueId use socket, and @@ -84,7 +84,7 @@ def start_local_trainers(cluster, def get_cluster_from_args(selected_gpus): - from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc + from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc cluster_node_ips = '127.0.0.1' node_ip = '127.0.0.1' @@ -108,7 +108,7 @@ def get_cluster_from_args(selected_gpus): class TestMultipleCustomCPU(unittest.TestCase): def run_mnist_2custom_cpu(self, target_file_name, eager_mode=True): - from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc + from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc selected_devices = [0, 1] cluster = None @@ -150,7 +150,7 @@ class TestProcessGroup(TestMultipleCustomCPU): cur_dir, 'PaddleCustomDevice/backends/custom_cpu/build') def test_process_group_xccl(self): - from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc + from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc self.run_mnist_2custom_cpu('process_group_xccl.py') diff --git a/python/paddle/fluid/tests/unittests/collective/collective_global_gather.py b/python/paddle/fluid/tests/unittests/collective/collective_global_gather.py index 60909f63211de9160250786909db069d86a3e7ba..fd6e8106da70f43d655307a61909e3def65a2e22 100644 --- a/python/paddle/fluid/tests/unittests/collective/collective_global_gather.py +++ b/python/paddle/fluid/tests/unittests/collective/collective_global_gather.py @@ -24,6 +24,7 @@ import paddle.fluid.layers as layers from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main import pickle from paddle.fluid.framework import _enable_legacy_dygraph +import paddle.distributed.utils.moe_utils as moe_utils paddle.enable_static() @@ -51,8 +52,9 @@ class TestCollectiveGlobalGatherAPI(TestCollectiveAPIRunnerBase): shape=[tot_expert], dtype="int64") - output = paddle.distributed.utils.global_gather( - local_input_buf, local_expert_count, global_expert_count) + output = moe_utils.global_gather(local_input_buf, + local_expert_count, + global_expert_count) return [output] diff --git a/python/paddle/fluid/tests/unittests/collective/collective_global_gather_dygraph.py b/python/paddle/fluid/tests/unittests/collective/collective_global_gather_dygraph.py index 
0b264f5ba89669089c5c80ba6026cb2cca2240a4..39749b812779480e08b5483bb280cf5c17d6378c 100644 --- a/python/paddle/fluid/tests/unittests/collective/collective_global_gather_dygraph.py +++ b/python/paddle/fluid/tests/unittests/collective/collective_global_gather_dygraph.py @@ -22,6 +22,7 @@ import paddle.fluid as fluid import unittest import paddle.fluid.layers as layers from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main +import paddle.distributed.utils.moe_utils as moe_utils class TestCollectiveGlobalGatherAPI(TestCollectiveAPIRunnerBase): @@ -51,8 +52,9 @@ class TestCollectiveGlobalGatherAPI(TestCollectiveAPIRunnerBase): in_feat).astype("float32") local_input_buf = paddle.to_tensor(local_input_buf) local_input_buf.stop_gradient = False - output = paddle.distributed.utils.global_gather( - local_input_buf, local_expert_count, global_expert_count) + output = moe_utils.global_gather(local_input_buf, + local_expert_count, + global_expert_count) output.stop_gradient = False c = output * output c.stop_gradient = False diff --git a/python/paddle/fluid/tests/unittests/collective/collective_global_scatter.py b/python/paddle/fluid/tests/unittests/collective/collective_global_scatter.py index c4950025877df183f98c58e053c69ba980bc0a44..dd6245df2aceab800ec82ac4c4549523d7e9e310 100644 --- a/python/paddle/fluid/tests/unittests/collective/collective_global_scatter.py +++ b/python/paddle/fluid/tests/unittests/collective/collective_global_scatter.py @@ -23,6 +23,7 @@ import unittest import paddle.fluid.layers as layers from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main import pickle +import paddle.distributed.utils.moe_utils as moe_utils paddle.enable_static() @@ -51,8 +52,9 @@ class TestCollectiveGlobalScatterAPI(TestCollectiveAPIRunnerBase): paddle.split(local_expert_count, 2, axis=0), global_expert_count) global_expert_count = paddle.concat(global_expert_count, axis=0) - output = paddle.distributed.utils.global_scatter( - local_input_buf, local_expert_count, global_expert_count) + output = moe_utils.global_scatter(local_input_buf, + local_expert_count, + global_expert_count) return [output] def run_trainer(self, args): diff --git a/python/paddle/fluid/tests/unittests/collective/collective_global_scatter_dygraph.py b/python/paddle/fluid/tests/unittests/collective/collective_global_scatter_dygraph.py index 82816c899e2cb4d60a9e835fb8249bd32488e302..e775bf50eb9e72578225fc8b7fbeb97d1e7a63a6 100644 --- a/python/paddle/fluid/tests/unittests/collective/collective_global_scatter_dygraph.py +++ b/python/paddle/fluid/tests/unittests/collective/collective_global_scatter_dygraph.py @@ -22,6 +22,7 @@ import paddle.fluid as fluid import unittest import paddle.fluid.layers as layers from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main +import paddle.distributed.utils.moe_utils as moe_utils class TestCollectiveGlobalScatterAPI(TestCollectiveAPIRunnerBase): @@ -50,8 +51,9 @@ class TestCollectiveGlobalScatterAPI(TestCollectiveAPIRunnerBase): global_expert_count) global_expert_count = paddle.concat(global_expert_count, axis=0) local_input_buf.stop_gradient = False - output = paddle.distributed.utils.global_scatter( - local_input_buf, local_expert_count, global_expert_count) + output = moe_utils.global_scatter(local_input_buf, + local_expert_count, + global_expert_count) output.stop_gradient = False c = output * output c.backward() diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/CMakeLists.txt 
b/python/paddle/fluid/tests/unittests/collective/fleet/CMakeLists.txt index d2cc96fd3e17746fa5a65b0902c838df93117ddb..b47e4b5b530f9784d56b80320237c715b928526e 100644 --- a/python/paddle/fluid/tests/unittests/collective/fleet/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/collective/fleet/CMakeLists.txt @@ -938,3 +938,8 @@ if((WITH_GPU OR WITH_ROCM) AND (LINUX)) test_fleet_checkpoint PROPERTIES TIMEOUT "200" LABELS "RUN_TYPE=EXCLUSIVE:NIGHTLY") endif() +if(LOCAL_ALL_ARCH AND LOCAL_ALL_PLAT) + py_test_modules( + test_fleet_log MODULES test_fleet_log ENVS + "http_proxy=;https_proxy=;PYTHONPATH=../..:${PADDLE_BINARY_DIR}/python") +endif() diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_qat.py b/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_qat.py index aefe03b26108059c617f1d590848ab452a6a62db..69bcdf56f6ddb316dddf3b86660f02de7edb0980 100644 --- a/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_qat.py +++ b/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_qat.py @@ -26,7 +26,7 @@ from paddle.io import DataLoader, Dataset import unittest import paddle.nn as nn from paddle.fluid.contrib.slim.quantization import ImperativeQuantAware -from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc +from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc def set_random_seed(seed, dp_id, rank_id): diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/test_fleet_log.py b/python/paddle/fluid/tests/unittests/collective/fleet/test_fleet_log.py new file mode 100644 index 0000000000000000000000000000000000000000..03cb281cf37c930840d02ca22f3e13f204ff2e29 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/collective/fleet/test_fleet_log.py @@ -0,0 +1,43 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import paddle +from paddle.distributed import fleet +from paddle.distributed.fleet.utils.log_util import logger +import logging +import unittest + + +class TestFleetLog(unittest.TestCase): + + def setUp(self): + fleet.init(log_level="DEBUG") + + def test_log_level(self): + + # check correctly initialized + assert fleet.get_log_level_code() == logging._nameToLevel["DEBUG"] + assert logger.getEffectiveLevel() == logging._nameToLevel["DEBUG"] + + # test set name + fleet.set_log_level("WARNING") + debug1 = fleet.get_log_level_code() + debug2 = logging._nameToLevel["WARNING"] + assert debug1 == debug2 + + # test set int + fleet.set_log_level(debug2) + + # check the logger is changed + assert logger.getEffectiveLevel() == logging._nameToLevel["WARNING"] diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/test_parallel_dygraph_qat.py b/python/paddle/fluid/tests/unittests/collective/fleet/test_parallel_dygraph_qat.py index a5b2da46740ddfd98678877c491664e389b4ae5c..b0e981babb665261f3bc5c7d7381bd72ce8b76c9 100644 --- a/python/paddle/fluid/tests/unittests/collective/fleet/test_parallel_dygraph_qat.py +++ b/python/paddle/fluid/tests/unittests/collective/fleet/test_parallel_dygraph_qat.py @@ -22,7 +22,7 @@ import copy import os import subprocess -from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc +from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc def get_cluster_from_args(selected_gpus): diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/testslist.csv b/python/paddle/fluid/tests/unittests/collective/fleet/testslist.csv index cdc856a9adaf43380c1b13281fc0c77ab2ab422e..c7fa5463225738e623c0577221ef244386c0a27e 100644 --- a/python/paddle/fluid/tests/unittests/collective/fleet/testslist.csv +++ b/python/paddle/fluid/tests/unittests/collective/fleet/testslist.csv @@ -82,3 +82,4 @@ test_hdfs1,LINUX,,200,EXCLUSIVE:NIGHTLY,../../dist_test.sh,2,,http_proxy=;https_ test_hdfs2,LINUX,,200,EXCLUSIVE:NIGHTLY,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../.., test_hdfs3,LINUX,,200,EXCLUSIVE:NIGHTLY,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../.., test_fleet_checkpoint,LINUX,GPU;ROCM,200,EXCLUSIVE:NIGHTLY,test_runner.py,,,http_proxy=;https_proxy=;PYTHONPATH=../.., +test_fleet_log,,,,DIST,test_runner.py,,,http_proxy=;https_proxy=;PYTHONPATH=../.., diff --git a/python/paddle/fluid/tests/unittests/test_communicator_geo.py b/python/paddle/fluid/tests/unittests/test_communicator_geo.py index f7593f8bb31fe47db9ee50f352717291d19c5df3..df7d04be3198d6aca4c5bfa580f2c776d4a11f0e 100644 --- a/python/paddle/fluid/tests/unittests/test_communicator_geo.py +++ b/python/paddle/fluid/tests/unittests/test_communicator_geo.py @@ -28,7 +28,7 @@ import paddle.fluid as fluid import paddle.distributed.fleet.base.role_maker as role_maker import paddle.distributed.fleet as fleet -from paddle.distributed.utils import find_free_ports +from paddle.distributed.utils.launch_utils import find_free_ports paddle.enable_static() diff --git a/python/paddle/fluid/tests/unittests/test_launch_coverage.py b/python/paddle/fluid/tests/unittests/test_launch_coverage.py index 62826c0858b4617770f8a120286d313958aff2e4..12eb3bbc9ddeaa72a1895166e33a967e7e76a1b2 100644 --- a/python/paddle/fluid/tests/unittests/test_launch_coverage.py +++ b/python/paddle/fluid/tests/unittests/test_launch_coverage.py @@ -23,7 +23,7 @@ import unittest import paddle.fluid as fluid from argparse import 
ArgumentParser, REMAINDER -from paddle.distributed.utils import _print_arguments, get_gpus, get_cluster_from_args +from paddle.distributed.utils.launch_utils import _print_arguments, get_gpus, get_cluster_from_args from paddle.distributed.fleet.launch_utils import find_free_ports diff --git a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel.py b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel.py index 1e8aae7226a7e864c1e27834cc2d56276a696b8d..29b0b16de38c0b285bf0f2ca282d389156cbea3d 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel.py @@ -22,7 +22,7 @@ import copy import os import subprocess -from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc +from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc from paddle.fluid.framework import _test_eager_guard diff --git a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel_cpuonly.py b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel_cpuonly.py index 725d5249f594bf9a9b5684954ebcd6a54d91cab4..4713f6619b93d71eaf74d3a2fea7290adcbbecd4 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel_cpuonly.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel_cpuonly.py @@ -22,7 +22,7 @@ import copy import os import subprocess -from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc +from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc def get_cluster_from_args(selected_gpus): diff --git a/python/paddle/incubate/distributed/models/moe/moe_layer.py b/python/paddle/incubate/distributed/models/moe/moe_layer.py index 65a7eec9973001d4ade517dddbf1a3bfd32e3b8d..58b026a3b2a306ac40eb6f05b11a71f33b546712 100644 --- a/python/paddle/incubate/distributed/models/moe/moe_layer.py +++ b/python/paddle/incubate/distributed/models/moe/moe_layer.py @@ -26,7 +26,7 @@ import numpy as np import paddle import paddle.nn as nn import paddle.nn.functional as F -from paddle.distributed.utils import global_scatter, global_gather +from paddle.distributed.utils.moe_utils import global_scatter, global_gather from paddle.distributed import alltoall, all_gather from paddle.distributed.fleet.meta_parallel import get_rng_state_tracker diff --git a/python/paddle/tests/test_dist_hapi_model.py b/python/paddle/tests/test_dist_hapi_model.py index 895d2bc0c478a31b0c36f50494259ea4ca02c0aa..b286e3906a9e11c780383d6a82d3a5a8085c6421 100644 --- a/python/paddle/tests/test_dist_hapi_model.py +++ b/python/paddle/tests/test_dist_hapi_model.py @@ -21,7 +21,7 @@ import copy import subprocess import paddle.fluid as fluid -from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc +from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc def get_cluster_from_args(selected_gpus): diff --git a/python/setup.py.in b/python/setup.py.in index 084fd5ac042c60f801c07abb1daea1ed95a50a69..b101c5e1d6c0d3518b626af793a160e2e38489f3 100755 --- a/python/setup.py.in +++ b/python/setup.py.in @@ -287,6 +287,7 @@ packages=['paddle', 'paddle.incubate.asp', 'paddle.incubate.passes', 'paddle.distribution', + 'paddle.distributed.utils', 'paddle.distributed.sharding', 
'paddle.distributed.fleet', 'paddle.distributed.launch',
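
Taken together, the fleet changes above expose a small logging-control surface: fleet.init(log_level=...), fleet.set_log_level, fleet.get_log_level_code and fleet.get_log_level_name. The following is a minimal sketch of how it can be exercised, assuming a single-process run in which fleet.init() falls back to a non-distributed role maker and that level names follow Python's standard logging module:

    import logging
    import paddle.distributed.fleet as fleet

    # Initialize fleet and set the logger verbosity in one call.
    fleet.init(log_level="DEBUG")
    assert fleet.get_log_level_code() == logging.DEBUG
    assert fleet.get_log_level_name() == "DEBUG"

    # The level can later be changed with either a level name or a logging code.
    fleet.set_log_level("WARNING")
    fleet.set_log_level(logging.INFO)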
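
Because paddle.distributed.utils is now a package rather than a flat module, call sites that previously imported its helpers pick them up from the split submodules introduced above. A sketch of the new import paths, mirroring the updates in this patch (the module name my_module is only illustrative):

    # Launch/cluster helpers now live in launch_utils.
    from paddle.distributed.utils.launch_utils import (
        get_cluster, get_gpus, find_free_ports, watch_local_trainers, TrainerProc)

    # The shared logger factory now lives in log_utils.
    from paddle.distributed.utils.log_utils import get_logger
    logger = get_logger("INFO", name="my_module")

    # The MoE communication ops now live in moe_utils.
    from paddle.distributed.utils.moe_utils import global_scatter, global_gather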