From 7eb046c731f5191c660615dce47a973dfd0be727 Mon Sep 17 00:00:00 2001
From: Roc <30228238+sljlp@users.noreply.github.com>
Date: Thu, 22 Sep 2022 21:34:26 +0800
Subject: [PATCH] logger manager (#45909) (#46087)

Unify the logger manager in the Fleet API.
Hide the APIs under distributed/utils that users don't need.
---
 python/paddle/distributed/__init__.py         |   1 -
 .../distributed/auto_parallel/converter.py    |   2 +-
 .../distributed/auto_parallel/dist_saver.py   |   2 +-
 .../distributed/auto_parallel/parallelizer.py |   2 +-
 python/paddle/distributed/cloud_utils.py      |   6 +-
 python/paddle/distributed/fleet/__init__.py   |   5 +-
 .../distributed/fleet/elastic/manager.py      |  12 +-
 python/paddle/distributed/fleet/fleet.py      |  64 ++--
 .../meta_optimizers/sharding_optimizer.py     |   9 +-
 .../sharding/group_sharded_stage2.py          |   2 +-
 python/paddle/distributed/fleet/optimizer.py  |   4 +-
 .../distributed/fleet/recompute/recompute.py  |  10 +-
 .../fleet/utils/hybrid_parallel_util.py       |   1 -
 .../distributed/fleet/utils/log_util.py       |  54 +++-
 python/paddle/distributed/metric/metrics.py   |   2 +-
 .../distributed/sharding/group_sharded.py     |   2 +-
 python/paddle/distributed/spawn.py            |   4 +-
 python/paddle/distributed/utils/__init__.py   |  15 +
 .../{utils.py => utils/launch_utils.py}       | 304 +-----------------
 python/paddle/distributed/utils/log_utils.py  |  32 ++
 python/paddle/distributed/utils/moe_utils.py  | 255 +++++++++++++++
 .../test_collective_process_group_xccl.py     |   8 +-
 .../collective/collective_global_gather.py    |   6 +-
 .../collective_global_gather_dygraph.py       |   6 +-
 .../collective/collective_global_scatter.py   |   6 +-
 .../collective_global_scatter_dygraph.py      |   6 +-
 .../unittests/collective/fleet/CMakeLists.txt |   5 +
 .../collective/fleet/hybrid_parallel_qat.py   |   2 +-
 .../collective/fleet/test_fleet_log.py        |  43 +++
 .../fleet/test_parallel_dygraph_qat.py        |   2 +-
 .../unittests/collective/fleet/testslist.csv  |   1 +
 .../tests/unittests/test_communicator_geo.py  |   2 +-
 .../tests/unittests/test_launch_coverage.py   |   2 +-
 .../test_parallel_dygraph_dataparallel.py     |   2 +-
 ...t_parallel_dygraph_dataparallel_cpuonly.py |   2 +-
 .../distributed/models/moe/moe_layer.py       |   2 +-
 python/paddle/tests/test_dist_hapi_model.py   |   2 +-
 python/setup.py.in                            |   1 +
 38 files changed, 491 insertions(+), 395 deletions(-)
 create mode 100644 python/paddle/distributed/utils/__init__.py
 rename python/paddle/distributed/{utils.py => utils/launch_utils.py} (55%)
 create mode 100644 python/paddle/distributed/utils/log_utils.py
 create mode 100644 python/paddle/distributed/utils/moe_utils.py
 create mode 100644 python/paddle/fluid/tests/unittests/collective/fleet/test_fleet_log.py

diff --git a/python/paddle/distributed/__init__.py b/python/paddle/distributed/__init__.py
index bf9773ad940..6c7b2fa7329 100644
--- a/python/paddle/distributed/__init__.py
+++ b/python/paddle/distributed/__init__.py
@@ -65,7 +65,6 @@ from .entry_attr import ShowClickEntry  # noqa: F401
 from paddle.fluid.dygraph.parallel import ParallelEnv  # noqa: F401
 from . import cloud_utils  # noqa: F401
-from . 
import utils # noqa: F401 from .sharding import * # noqa: F401 diff --git a/python/paddle/distributed/auto_parallel/converter.py b/python/paddle/distributed/auto_parallel/converter.py index d48d56eeafb..35674be05b0 100644 --- a/python/paddle/distributed/auto_parallel/converter.py +++ b/python/paddle/distributed/auto_parallel/converter.py @@ -16,7 +16,7 @@ import paddle import warnings import logging import numpy as np -from .utils import get_logger +from ..utils.log_utils import get_logger class Converter(object): diff --git a/python/paddle/distributed/auto_parallel/dist_saver.py b/python/paddle/distributed/auto_parallel/dist_saver.py index aef2dcc6b7e..350e5ac44e7 100644 --- a/python/paddle/distributed/auto_parallel/dist_saver.py +++ b/python/paddle/distributed/auto_parallel/dist_saver.py @@ -27,7 +27,7 @@ from paddle.fluid.framework import static_only from .utils import get_dist_attr from .converter import Converter from .process_group import _g_process_group_map -from .utils import get_logger +from ..utils.log_utils import get_logger def check_filename(re_exp, filename): diff --git a/python/paddle/distributed/auto_parallel/parallelizer.py b/python/paddle/distributed/auto_parallel/parallelizer.py index 45aac40db8b..3d739421614 100644 --- a/python/paddle/distributed/auto_parallel/parallelizer.py +++ b/python/paddle/distributed/auto_parallel/parallelizer.py @@ -24,7 +24,7 @@ import pickle import time import paddle from paddle.fluid.backward import append_backward -from paddle.distributed.utils import get_logger +from paddle.distributed.utils.log_utils import get_logger from paddle.distributed.fleet import cloud_utils import paddle.fluid.core as core from paddle.fluid import program_guard diff --git a/python/paddle/distributed/cloud_utils.py b/python/paddle/distributed/cloud_utils.py index a8eedb96a3e..651298d6d76 100644 --- a/python/paddle/distributed/cloud_utils.py +++ b/python/paddle/distributed/cloud_utils.py @@ -14,10 +14,8 @@ import os import paddle -from paddle.distributed.utils import get_cluster -from paddle.distributed.utils import logger -from paddle.distributed.utils import get_gpus -from paddle.distributed.utils import get_cluster_from_args +from paddle.distributed.utils.launch_utils import get_cluster, get_gpus, get_cluster_from_args +from paddle.distributed.utils.launch_utils import logger __all__ = [] diff --git a/python/paddle/distributed/fleet/__init__.py b/python/paddle/distributed/fleet/__init__.py index 987cb3d4d7f..b75d84edf29 100755 --- a/python/paddle/distributed/fleet/__init__.py +++ b/python/paddle/distributed/fleet/__init__.py @@ -32,6 +32,7 @@ from .fleet import Fleet from .model import distributed_model from .optimizer import distributed_optimizer from .scaler import distributed_scaler +from .utils import log_util __all__ = [ #noqa "CommunicateTopology", "UtilBase", "HybridCommunicateGroup", @@ -90,5 +91,7 @@ distributed_model = distributed_model shrink = fleet.shrink get_hybrid_communicate_group = fleet.get_hybrid_communicate_group distributed_scaler = distributed_scaler - +set_log_level = log_util.set_log_level +get_log_level_code = log_util.get_log_level_code +get_log_level_name = log_util.get_log_level_name from .. 
import auto_parallel as auto diff --git a/python/paddle/distributed/fleet/elastic/manager.py b/python/paddle/distributed/fleet/elastic/manager.py index e0a6bd81c8e..451ed76741c 100644 --- a/python/paddle/distributed/fleet/elastic/manager.py +++ b/python/paddle/distributed/fleet/elastic/manager.py @@ -26,13 +26,9 @@ import subprocess from paddle.distributed.fleet import cloud_utils from paddle.distributed.fleet import launch_utils -logger = logging.getLogger("ELASTIC") -logger.setLevel(logging.INFO) -formatter = logging.Formatter( - fmt='%(name)s %(levelname)s %(asctime)s %(message)s') -ch = logging.StreamHandler() -ch.setFormatter(formatter) -logger.addHandler(ch) +from paddle.distributed.utils.log_utils import get_logger + +logger = get_logger("INFO", "ELASTIC") ELASTIC_EXIT_CODE = 101 ELASTIC_AUTO_PARALLEL_EXIT_CODE = 102 @@ -354,7 +350,7 @@ class ElasticManager(object): stderr=subprocess.PIPE, shell=True).communicate() if err: - logger.warn("pre_hook exec failed") + logger.warning("pre_hook exec failed") else: logger.info(f"pre_hook exec result: {out.decode('utf-8').strip()}") diff --git a/python/paddle/distributed/fleet/fleet.py b/python/paddle/distributed/fleet/fleet.py index 1c1bd57f408..dcf427c3954 100644 --- a/python/paddle/distributed/fleet/fleet.py +++ b/python/paddle/distributed/fleet/fleet.py @@ -13,7 +13,6 @@ # limitations under the License. import copy -import warnings import paddle import os from types import MethodType @@ -32,6 +31,8 @@ from .base import topology as tp from .meta_parallel import model_parallel_random_seed from paddle import _C_ops, _legacy_C_ops from paddle.fluid import core +from .utils.log_util import logger, set_log_level +import logging __all__ = [] @@ -54,7 +55,7 @@ def apply_ir_passes(main_program, startup_program, config): # RawProgramOptimizer also inserts coalesce_tensor # into program. These two procedures may conflict # in which vars are to be fused. - warnings.warn( + logger.warning( 'Currently, the fuse_all_optimizer_ops pass has conflict with fuse_all_reduce_ops pass. Disable the fuse_all_optimizer_ops pass temporarily.' ) build_strategy.fuse_all_optimizer_ops = False @@ -83,7 +84,7 @@ def _is_non_distributed_check_(func): if cls._role_maker is not None and cls._role_maker._is_non_distributed( ) is True: - warnings.warn( + logger.warning( "%s() function doesn't work when use non_distributed fleet." % (func.__name__)) return @@ -165,7 +166,11 @@ class Fleet(object): self._context = {} self.user_defined_optimizer = paddle.optimizer.Optimizer(0.0) - def init(self, role_maker=None, is_collective=False, strategy=None): + def init(self, + role_maker=None, + is_collective=False, + strategy=None, + log_level="INFO"): """ Initialize role_maker in Fleet. @@ -182,6 +187,8 @@ class Fleet(object): GPU.The default value is False.The default value is False. strategy (DistributedStrategy): Extra properties for distributed training. For details, please refer to paddle.distributed.fleet.DistributedStrategy. Default: None. + log_level (Integer, String, optional): A ``Integer`` or ``String`` Variable determining how hight + the logging level is. Default is "INFO". Returns: @@ -217,7 +224,18 @@ class Fleet(object): strategy = fleet.DistributedStrategy() fleet.init(strategy=strategy) + Examples5: + + .. 
code-block:: python + + import paddle.distributed.fleet as fleet + strategy = fleet.DistributedStrategy() + fleet.init(log_level = "DEBUG") + """ + + set_log_level(log_level) + if strategy is None: strategy = DistributedStrategy() self._user_defined_strategy = copy.deepcopy(strategy) @@ -261,12 +279,12 @@ class Fleet(object): self._hcg = tp.HybridCommunicateGroup(self._topology) return if parallel_helper._is_parallel_ctx_initialized(): - warnings.warn( + logger.warning( "The dygraph parallel environment has been initialized.") else: # FLAGS_nccl_nrings is used for dynamic graph multi-stream communication if "FLAGS_nccl_nrings" in os.environ: - warnings.warn( + logger.warning( "You have set the environment variable FLAGS_nccl_nrings " "outside the program, so the nccl_comm_num in " "DistributedStrategy will not take effect here.") @@ -281,7 +299,7 @@ class Fleet(object): if tp._HYBRID_PARALLEL_GROUP is None: self._init_hybrid_parallel_env() else: - warnings.warn( + logger.warning( "The dygraph hybrid parallel environment has been initialized." ) elif self._is_collective: @@ -850,9 +868,6 @@ class Fleet(object): fleet.init_server() """ - # warnings.warn( - # "'save_inference_model' is a deprecated, will be deleted after v2.2.0, Please use fleet.save instead." - # ) self._runtime_handle._save_inference_model(executor, dirname, feeded_var_names, @@ -902,10 +917,6 @@ class Fleet(object): fleet.save_persistables(exe, "dirname", paddle.static.default_main_program()) """ - # warnings.warn( - # "'save_persistables' is a deprecated, will be deleted after v2.2.0, Please use fleet.save instead." - # ) - self._runtime_handle._save_persistables(executor, dirname, main_program, mode) @@ -1015,7 +1026,7 @@ class Fleet(object): if strategy is not None: if self._is_collective: - warnings.warn( + logger.warning( "It is recommended to use DistributedStrategy " "in fleet.init(). The strategy here is only for compatibility. 
" "If the strategy in fleet.distributed_optimizer() is " @@ -1304,8 +1315,9 @@ class Fleet(object): copy_user_defined_strategy, can_not_apply_optimizer_list) context["valid_strategy"] = copy.deepcopy(valid_strategy) - # print("valid_strategy:", context["valid_strategy"]) - # print("user_defined_strategy:", context["user_defined_strategy"]) + logger.debug("valid_strategy: " + str(context["valid_strategy"])) + logger.debug("user_defined_strategy: " + + str(context["user_defined_strategy"])) applied_meta_list = self.strategy_compiler._get_applied_meta_list() applied_graph_list = self.strategy_compiler._get_applied_graph_list() @@ -1335,17 +1347,19 @@ class Fleet(object): no_grad_set=no_grad_set) if meta_optimizer: - # print("before minimize program id:", id(loss.block.program)) + logger.debug("before minimize program id: " + + str(id(loss.block.program))) optimize_ops, params_grads = meta_optimizer.minimize( loss, startup_program, parameter_list, no_grad_set=no_grad_set) - # print("after minimize program id:", id(loss.block.program)) - + logger.debug("after minimize program id: " + + str(id(loss.block.program))) default_program = paddle.static.default_main_program() - # print("default program id:", id(default_program)) + logger.debug("default program id: " + str(id(default_program))) if id(default_program) != id(loss.block.program): paddle.fluid.framework.switch_main_program(loss.block.program) - # print("default program id after switch:", id(default_program)) + logger.debug("default program id after switch: " + + str(id(default_program))) else: optimize_ops, params_grads = self.user_defined_optimizer.minimize( @@ -1355,7 +1369,8 @@ class Fleet(object): context["program_params_grads"] = params_grads if graph_optimizer: - # print("before graph minimize program id:", id(loss.block.program)) + logger.debug("before graph minimize program id: " + + str(id(loss.block.program))) optimize_ops, params_grads = graph_optimizer.minimize( loss, startup_program, parameter_list, no_grad_set=no_grad_set) # since we do not encourage users to use graph operations @@ -1454,7 +1469,8 @@ class Fleet(object): if v or k not in opt_info: opt_info[k] = v program._fleet_opt = opt_info - # print("fleet base opt info:", id(program), program._fleet_opt) + logger.debug("fleet base opt info: " + str(id(program)) + + str(program._fleet_opt)) if self._runtime_handle is None: self._runtime_handle = RuntimeFactory()._create_runtime(context) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py index fcecc3a9a67..ccac803e721 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py @@ -30,15 +30,8 @@ from .sharding.prune import ProgramDeps from .sharding import utils # FIXME: import * from .sharding.utils import * - import logging - -logger = logging.getLogger(__name__) -formatter = logging.Formatter(fmt='%(asctime)s %(levelname)-8s %(message)s', - datefmt='%Y-%m-%d %H:%M:%S') -ch = logging.StreamHandler() -ch.setFormatter(formatter) -logger.addHandler(ch) +from ..utils.log_util import logger __all__ = [] diff --git a/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage2.py b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage2.py index 905af0487ba..cf1ca83d5f6 100644 --- a/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage2.py +++ 
b/python/paddle/distributed/fleet/meta_parallel/sharding/group_sharded_stage2.py @@ -33,7 +33,7 @@ from types import MethodType import paddle from paddle import nn from paddle.distributed import collective -from paddle.distributed.utils import get_logger +from paddle.distributed.utils.log_utils import get_logger from .group_sharded_storage import GradStorage from .group_sharded_optimizer_stage2 import GroupShardedOptimizerStage2 diff --git a/python/paddle/distributed/fleet/optimizer.py b/python/paddle/distributed/fleet/optimizer.py index bfc3d737f99..ddad6511a0a 100644 --- a/python/paddle/distributed/fleet/optimizer.py +++ b/python/paddle/distributed/fleet/optimizer.py @@ -13,7 +13,6 @@ # limitations under the License. import copy -import warnings import paddle import os import numpy as np @@ -22,6 +21,7 @@ from .base.distributed_strategy import DistributedStrategy from .meta_optimizers import HybridParallelOptimizer, HeterParallelOptimizer from paddle.fluid import core from paddle.distributed import fleet +from .utils.log_util import logger def _dygraph_distributed_optimizer(optimizer, strategy=None): @@ -52,7 +52,7 @@ def _dygraph_distributed_optimizer(optimizer, strategy=None): if strategy is not None: if fleet_env._is_collective: - warnings.warn( + logger.warning( "It is recommended to use DistributedStrategy " "in fleet_env.init(). The strategy here is only for compatibility. " "If the strategy in fleet_env.distributed_optimizer() is " diff --git a/python/paddle/distributed/fleet/recompute/recompute.py b/python/paddle/distributed/fleet/recompute/recompute.py index 28ded25a0e6..6929ca52cb0 100755 --- a/python/paddle/distributed/fleet/recompute/recompute.py +++ b/python/paddle/distributed/fleet/recompute/recompute.py @@ -22,13 +22,7 @@ import contextlib from paddle.fluid.framework import in_dygraph_mode import logging - -logger = logging.getLogger(__name__) -formatter = logging.Formatter(fmt='%(asctime)s %(levelname)-8s %(message)s', - datefmt='%Y-%m-%d %H:%M:%S') -ch = logging.StreamHandler() -ch.setFormatter(formatter) -logger.addHandler(ch) +from ..utils.log_util import logger __all__ = [] @@ -49,7 +43,7 @@ def detach_variable(inputs): def check_recompute_necessary(inputs): if not any(input_.stop_gradient == False for input_ in inputs if isinstance(input_, (core.eager.Tensor, paddle.Tensor))): - logger.warn( + logger.warning( "[Recompute]: None of the inputs to current recompute block need grad, " "therefore there is NO need to recompute this block in backward !") diff --git a/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py b/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py index 6079c49a75a..b7b5bc8608b 100644 --- a/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py +++ b/python/paddle/distributed/fleet/utils/hybrid_parallel_util.py @@ -14,7 +14,6 @@ import os import six import numpy as np -import warnings from paddle import framework import paddle diff --git a/python/paddle/distributed/fleet/utils/log_util.py b/python/paddle/distributed/fleet/utils/log_util.py index cf90527c07f..6118d026447 100644 --- a/python/paddle/distributed/fleet/utils/log_util.py +++ b/python/paddle/distributed/fleet/utils/log_util.py @@ -15,30 +15,50 @@ import logging import sys -__all__ = [] +from paddle.distributed.utils.log_utils import get_logger +logger = get_logger("INFO", __name__) -class LoggerFactory: - @staticmethod - def build_logger(name=None, level=logging.INFO): - assert name is not None, "name for logger should not be None" +def set_log_level(level): + """ 
+ Set log level - formatter = logging.Formatter( - "%(asctime)s-%(levelname)s: " - "[%(filename)s:%(lineno)d:%(funcName)s] %(message)s") + Args: + level (str|int): a specified level - _logger = logging.getLogger(name) - _logger.setLevel(level) - _logger.propagate = False - handler = logging.StreamHandler(stream=sys.stderr) - handler.setFormatter(formatter) - handler.setLevel(level) - _logger.addHandler(handler) - return _logger + Example 1: + import paddle + import paddle.distributed.fleet as fleet + fleet.init() + fleet.setLogLevel("DEBUG") + Example 2: + import paddle + import paddle.distributed.fleet as fleet + fleet.init() + fleet.setLogLevel(1) -logger = LoggerFactory.build_logger(name="HybridParallel", level=logging.INFO) + """ + assert isinstance(level, (str, int)), "level's type must be str or int" + if isinstance(level, int): + logger.setLevel(level) + else: + logger.setLevel(level.upper()) + + +def get_log_level_code(): + """ + Return current log level code + """ + return logger.getEffectiveLevel() + + +def get_log_level_name(): + """ + Return current log level name + """ + return logging.getLevelName(get_log_level_code()) def layer_to_str(base, *args, **kwargs): diff --git a/python/paddle/distributed/metric/metrics.py b/python/paddle/distributed/metric/metrics.py index 08d185efd97..4029734545f 100644 --- a/python/paddle/distributed/metric/metrics.py +++ b/python/paddle/distributed/metric/metrics.py @@ -16,7 +16,7 @@ import sys import yaml import paddle.fluid as fluid import logging -from paddle.distributed.utils import get_logger +from paddle.distributed.utils.log_utils import get_logger __all__ = [] logger = get_logger(logging.INFO, name="metrics") diff --git a/python/paddle/distributed/sharding/group_sharded.py b/python/paddle/distributed/sharding/group_sharded.py index 9ebe7fd6031..1d67989c065 100644 --- a/python/paddle/distributed/sharding/group_sharded.py +++ b/python/paddle/distributed/sharding/group_sharded.py @@ -19,7 +19,7 @@ from enum import Enum import paddle from paddle.optimizer import Optimizer -from paddle.distributed.utils import get_logger +from paddle.distributed.utils.log_utils import get_logger from paddle.fluid.framework import in_dygraph_mode # Old version diff --git a/python/paddle/distributed/spawn.py b/python/paddle/distributed/spawn.py index c0ff2bc273d..b7908213c9b 100644 --- a/python/paddle/distributed/spawn.py +++ b/python/paddle/distributed/spawn.py @@ -21,9 +21,7 @@ import six import sys import warnings -from paddle.distributed.utils import _print_arguments -from paddle.distributed.utils import _prepare_trainer_env -from paddle.distributed.utils import get_host_name_ip +from paddle.distributed.utils.launch_utils import _print_arguments, _prepare_trainer_env, get_host_name_ip from paddle.distributed.cloud_utils import get_cluster_and_pod, _get_trainers_num from paddle.distributed.fleet.launch import get_cluster_from_args from paddle.distributed.fleet.cloud_utils import use_paddlecloud diff --git a/python/paddle/distributed/utils/__init__.py b/python/paddle/distributed/utils/__init__.py new file mode 100644 index 00000000000..4ce89fa36b0 --- /dev/null +++ b/python/paddle/distributed/utils/__init__.py @@ -0,0 +1,15 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__all__ = [] diff --git a/python/paddle/distributed/utils.py b/python/paddle/distributed/utils/launch_utils.py similarity index 55% rename from python/paddle/distributed/utils.py rename to python/paddle/distributed/utils/launch_utils.py index 6d8454a6e9e..3282b5f58bc 100644 --- a/python/paddle/distributed/utils.py +++ b/python/paddle/distributed/utils/launch_utils.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,287 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. -import functools -import logging -import socket import time import os import signal import copy import sys -import six import subprocess from contextlib import closing import socket from paddle.fluid import core -from paddle.distributed.fleet.launch_utils import get_backend_by_compile_flag from distutils.util import strtobool +import six -from paddle.fluid.layer_helper import LayerHelper -from paddle.fluid.framework import _non_static_mode -from paddle.fluid.data_feeder import check_variable_and_dtype -from paddle import _C_ops, _legacy_C_ops - -__all__ = [ #noqa - 'get_host_name_ip', - 'Trainer', - 'get_cluster', - 'start_local_trainers', - 'watch_local_trainers', - 'find_free_ports', - 'JobServer', - 'Cluster', - 'Pod', - 'Hdfs', - 'add_arguments', - 'terminate_local_procs', - 'TrainerProc', - 'get_logger', - 'pull_worker_log', - 'global_scatter', - 'global_gather', -] - - -def global_scatter(x, - local_count, - global_count, - group=None, - use_calc_stream=True): - """ - The global_scatter operator distributes the data of x to n_expert * world_size experts according to local_count, - and then receives data according to global_count. The expert refers to a user-defined expert network, - n_expert refers to the number of expert networks owned by each card, and world_size refers to the number of graphics cards running the network. - - As shown below, the value of the world size is 2, n_expert 2, the batch size of the x 4 and local_count is [2, 0, 2, 0]. - The global_count of the rank 0 is [2, 0, , ], rank 1 is [2, 0, ,](Due to the limited space, only the data calculated on rank 0 is shown here). - In the global_scatter operator, local_count[i] represents sending local_count[i] data to the (i % n_expert)th expert of the (i // n_expert)th card, - global_count[i] represents receiving global_count[i] data from the (i // n_expert)th card to the (i % n_expert)th expert of this card. The rank in the - figure respresent the rank of the current card in all cards. 
- - The process of global_scatter sending data is as follows: - - local_count[0] represents taking out 2 batches from x and sending 2 batches to the 0th expert of the 0th card; - - local_count[1] represents taking out 0 batches from x and sending 0 batches to the 1th expert of the 0th card; - - local_count[2] represents taking out 2 batches from x and sending 2 batches to the 0th expert of the 1th card; - - local_count[3] represents taking out 0 batches from x and sending 0 batches to the 1th expert of the 1th card; - - Therefore, the global_count[0] of the 0th card is equal to 2, which means that 2 batches of data are received from the 0th card to the 0th expert; - - the global_count[1] of the 0th card is equal to 0, which means that 0 batches of data are received from the 0th card to the 1th expert; - - the global_count[0] of the 1th card is equal to 2, which means that 2 batches of data are received from the 0th card to the 0th expert; - - the global_count[1] of the 1th card is equal to 0, which means that 0 batches of data are received from the 0th card to the 1th expert. - - .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/global_scatter_gather.png - :width: 800 - :alt: global_scatter_gather - :align: center - - Args: - x (Tensor): Tensor. The tensor data type should be float16, float32, float64, int32 or int64. - local_count (Tensor): Tensor which have n_expert * world_size elements that indicates - how many data needed to be sent. The tensor data type should be int64. - global_count (Tensor): Tensor which have n_expert * world_size elements that indicates - how many data needed to be received. The tensor data type should be int64. - group (Group, optional): The group instance return by new_group or None for global default group. Default: None. - use_calc_stream (bool, optional): Wether to use calculation stream (True) or communication stream. Default: True. - - Returns: - out (Tensor): The data received from all experts. - - Examples: - .. 
code-block:: python - - # required: distributed - import numpy as np - import paddle - from paddle.distributed import init_parallel_env - init_parallel_env() - n_expert = 2 - world_size = 2 - d_model = 2 - in_feat = d_model - local_input_buf = np.array([[1, 2],[3, 4],[5, 6],[7, 8],[9, 10]], \ - dtype=np.float32) - if paddle.distributed.ParallelEnv().local_rank == 0: - local_count = np.array([2, 1, 1, 1]) - global_count = np.array([2, 1, 1, 1]) - else: - local_count = np.array([1, 1, 2, 1]) - global_count = np.array([1, 1, 2, 1]) - local_input_buf = paddle.to_tensor(local_input_buf, dtype="float32", stop_gradient=False) - local_count = paddle.to_tensor(local_count, dtype="int64") - global_count = paddle.to_tensor(global_count, dtype="int64") - a = paddle.distributed.utils.global_scatter(local_input_buf, \ - local_count, global_count) - a.stop_gradient = False - print(a) - # out for rank 0: [[1, 2], [3, 4], [1, 2], [5, 6], [3, 4]] - # out for rank 1: [[7, 8], [5, 6], [7, 8], [9, 10], [9, 10]] - # backward test - c = a * a - c.backward() - print("local_input_buf.grad: ", local_input_buf.grad) - # out for rank 0: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]] - # out for rank 1: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]] - """ - if group is not None and not group.is_member(): - return - - ring_id = 0 if group is None else group.id - if _non_static_mode(): - return _legacy_C_ops.global_scatter(x, local_count, \ - global_count, \ - 'use_calc_stream', use_calc_stream, \ - 'ring_id', ring_id) - else: - op_type = 'global_scatter' - check_variable_and_dtype( - x, 'x', ['float16', 'float32', 'float64', 'int32', 'int64'], - 'global_scatter') - check_variable_and_dtype(local_count, 'local_count', ['int64'], - 'global_scatter') - check_variable_and_dtype(global_count, 'global_count', ['int64'], - 'global_scatter') - - helper = LayerHelper(op_type, **locals()) - out = helper.create_variable_for_type_inference(dtype=x.dtype) - - helper.append_op(type=op_type, - inputs={ - 'X': [x], - 'local_count': [local_count], - 'global_count': [global_count], - }, - outputs={'Out': [out]}, - attrs={ - 'ring_id': ring_id, - 'use_calc_stream': use_calc_stream - }) - return out - - -def global_gather(x, - local_count, - global_count, - group=None, - use_calc_stream=True): - """ - The global_gather operator gathers the data of x into n_expert * world_size experts according to global_count, and then receives data according to local_count. - The expert refers to a user-defined expert network, n_expert refers to the number of expert networks owned by each card, and world_size refers to the number of graphics cards running the network. - - As shown below, the value of the world size is 2, n_expert 2, the batch size of the x 4 and local_count is [2, 0, 2, 0]. - The global_count of the rank 0 is [2, 0, , ], rank 1 is [2, 0, ,](Due to the limited space, only the data calculated on rank 0 is shown here). - In the global_gather operator, the meaning of the global_count and local_count is opposed to global_scatter, global_count[i] represents sending global_count[i] data to the (i % n_expert)th expert of the (i // n_expert)th card, - local_count[i] represents receiving local_count[i] data from the (i // n_expert)th card to the (i % n_expert)th expert of this card. The data sent will be arranged according to the experts of each card. - The rank in the figure respresent the rank of the current card in all cards. 
- - The process of global_gather sending data is as follows: - - The global_count[0] of the 0th card represents sending 2 data to the 0th expert of the 0th card; - - The global_count[1] of the 0th card represents sending 0 data to the 1th expert of the 0th card; - - The global_count[0] of the 1th card represents sending 2 data to the 0th expert of the 0th card; - - The global_count[1] of the 1th card represents sending 0 data to the 1th expert of the 0th card. - - .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/global_scatter_gather.png - :width: 800 - :alt: global_scatter_gather - :align: center - - - Args: - x (Tensor): Tensor. Tensor whose data type should be float16, float32, float64, int32 or int64. - local_count (Tensor): Tensor which have n_expert * world_size elements that indicates - how many data needed to be received. Tensor data type should be int64. - global_count (Tensor): Tensor which have n_expert * world_size elements that indicates - how many data needed to be sent. Tensor data type should be int64. - group (Group, optional): The group instance return by new_group or None for global default group. Default: None. - use_calc_stream (bool, optional): Wether to use calculation stream (True) or communication stream. Default: True. - - Returns: - out (Tensor): The data received from all experts. - - Examples: - .. code-block:: python - - # required: distributed - import numpy as np - import paddle - from paddle.distributed import init_parallel_env - init_parallel_env() - n_expert = 2 - world_size = 2 - d_model = 2 - in_feat = d_model - local_input_buf = np.array([[1, 2],[3, 4],[5, 6],[7, 8],[9, 10]],\ - dtype=np.float32) - if paddle.distributed.ParallelEnv().local_rank == 0: - local_count = np.array([2, 1, 1, 1]) - global_count = np.array([2, 1, 1, 1]) - else: - local_count = np.array([1, 1, 2, 1]) - global_count = np.array([1, 1, 2, 1]) - local_input_buf = paddle.to_tensor(local_input_buf, dtype="float32", stop_gradient=False) - local_count = paddle.to_tensor(local_count, dtype="int64") - global_count = paddle.to_tensor(global_count, dtype="int64") - a = paddle.distributed.utils.global_gather(local_input_buf, local_count, global_count) - print(a) - # out for rank 0: [[1, 2], [3, 4], [7, 8], [1, 2], [7, 8]] - # out for rank 1: [[5, 6], [9, 10], [3, 4], [5, 6], [9, 10]] - a.stop_gradient = False - c = a * a - c.backward() - print("local_input_buf.grad", local_input_buf.grad) - # out for rank 0: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]] - # out for rank 1: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]] - """ - if group is not None and not group.is_member(): - return - - ring_id = 0 if group is None else group.id - if _non_static_mode(): - return _legacy_C_ops.global_gather(x, local_count, \ - global_count, \ - 'use_calc_stream', use_calc_stream, \ - 'ring_id', ring_id) - else: - op_type = 'global_gather' - check_variable_and_dtype( - x, 'x', ['float16', 'float32', 'float64', 'int32', 'int64'], - 'global_gather') - - check_variable_and_dtype(local_count, 'local_count', ['int64'], - 'global_gather') - - check_variable_and_dtype(global_count, 'global_count', ['int64'], - 'global_gather') - helper = LayerHelper(op_type, **locals()) - out = helper.create_variable_for_type_inference(dtype=x.dtype) - - helper.append_op(type=op_type, - inputs={ - 'X': [x], - 'local_count': [local_count], - 'global_count': [global_count] - }, - outputs={'Out': [out]}, - attrs={ - 'ring_id': group, - 'use_calc_stream': use_calc_stream, - }) - 
return out - +from paddle.distributed.fleet.launch_utils import get_backend_by_compile_flag +from ..utils.log_utils import get_logger -logger = logging.getLogger("root") -logger.propagate = False +logger = get_logger("INFO", "root") def get_cluster_from_args(args, selected_gpus): @@ -354,13 +89,6 @@ def get_gpus(selected_gpus): return gpus -def _print_arguments(args): - print("----------- Configuration Arguments -----------") - for arg, value in sorted(six.iteritems(vars(args))): - print("%s: %s" % (arg, value)) - print("------------------------------------------------") - - class Hdfs(object): def __init__(self): @@ -549,21 +277,6 @@ class Pod(object): return r -def get_logger(log_level, name="root"): - logger = logging.getLogger(name) - # Avoid printing multiple logs - if not logger.handlers: - logger.setLevel(log_level) - - log_handler = logging.StreamHandler() - log_format = logging.Formatter( - '%(levelname)s %(asctime)s %(filename)s:%(lineno)d] %(message)s') - log_handler.setFormatter(log_format) - logger.addHandler(log_handler) - - return logger - - def get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus): assert type(trainer_endpoints) is list, "trainer_endpoints must be list" cluster = Cluster(hdfs=None) @@ -826,3 +539,10 @@ def watch_local_trainers(procs, nranks): raise return alive + + +def _print_arguments(args): + print("----------- Configuration Arguments -----------") + for arg, value in sorted(six.iteritems(vars(args))): + print("%s: %s" % (arg, value)) + print("------------------------------------------------") diff --git a/python/paddle/distributed/utils/log_utils.py b/python/paddle/distributed/utils/log_utils.py new file mode 100644 index 00000000000..01687fc28d1 --- /dev/null +++ b/python/paddle/distributed/utils/log_utils.py @@ -0,0 +1,32 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + + +def get_logger(log_level, name="root"): + + logger = logging.getLogger(name) + # Avoid printing multiple logs + if not logger.handlers: + log_handler = logging.StreamHandler() + logger.setLevel(log_level) + log_format = logging.Formatter( + '[%(asctime)-15s] [%(levelname)8s] %(filename)s:%(lineno)s - %(message)s' + ) + log_handler.setFormatter(log_format) + logger.addHandler(log_handler) + else: + logger.setLevel(log_level) + return logger diff --git a/python/paddle/distributed/utils/moe_utils.py b/python/paddle/distributed/utils/moe_utils.py new file mode 100644 index 00000000000..d6dbfdfab58 --- /dev/null +++ b/python/paddle/distributed/utils/moe_utils.py @@ -0,0 +1,255 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from paddle.fluid.layer_helper import LayerHelper +from paddle.fluid.framework import _non_static_mode +from paddle.fluid.data_feeder import check_variable_and_dtype +from paddle import _legacy_C_ops + + +def global_scatter(x, + local_count, + global_count, + group=None, + use_calc_stream=True): + """ + The global_scatter operator distributes the data of x to n_expert * world_size experts according to local_count, + and then receives data according to global_count. The expert refers to a user-defined expert network, + n_expert refers to the number of expert networks owned by each card, and world_size refers to the number of graphics cards running the network. + + As shown below, the value of the world size is 2, n_expert 2, the batch size of the x 4 and local_count is [2, 0, 2, 0]. + The global_count of the rank 0 is [2, 0, , ], rank 1 is [2, 0, ,](Due to the limited space, only the data calculated on rank 0 is shown here). + In the global_scatter operator, local_count[i] represents sending local_count[i] data to the (i % n_expert)th expert of the (i // n_expert)th card, + global_count[i] represents receiving global_count[i] data from the (i // n_expert)th card to the (i % n_expert)th expert of this card. The rank in the + figure respresent the rank of the current card in all cards. + + The process of global_scatter sending data is as follows: + + local_count[0] represents taking out 2 batches from x and sending 2 batches to the 0th expert of the 0th card; + + local_count[1] represents taking out 0 batches from x and sending 0 batches to the 1th expert of the 0th card; + + local_count[2] represents taking out 2 batches from x and sending 2 batches to the 0th expert of the 1th card; + + local_count[3] represents taking out 0 batches from x and sending 0 batches to the 1th expert of the 1th card; + + Therefore, the global_count[0] of the 0th card is equal to 2, which means that 2 batches of data are received from the 0th card to the 0th expert; + + the global_count[1] of the 0th card is equal to 0, which means that 0 batches of data are received from the 0th card to the 1th expert; + + the global_count[0] of the 1th card is equal to 2, which means that 2 batches of data are received from the 0th card to the 0th expert; + + the global_count[1] of the 1th card is equal to 0, which means that 0 batches of data are received from the 0th card to the 1th expert. + + .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/global_scatter_gather.png + :width: 800 + :alt: global_scatter_gather + :align: center + + Args: + x (Tensor): Tensor. The tensor data type should be float16, float32, float64, int32 or int64. + local_count (Tensor): Tensor which have n_expert * world_size elements that indicates + how many data needed to be sent. The tensor data type should be int64. + global_count (Tensor): Tensor which have n_expert * world_size elements that indicates + how many data needed to be received. The tensor data type should be int64. + group (Group, optional): The group instance return by new_group or None for global default group. 
Default: None. + use_calc_stream (bool, optional): Wether to use calculation stream (True) or communication stream. Default: True. + + Returns: + out (Tensor): The data received from all experts. + + Examples: + .. code-block:: python + + # required: distributed + import numpy as np + import paddle + from paddle.distributed import init_parallel_env + init_parallel_env() + n_expert = 2 + world_size = 2 + d_model = 2 + in_feat = d_model + local_input_buf = np.array([[1, 2],[3, 4],[5, 6],[7, 8],[9, 10]], \ + dtype=np.float32) + if paddle.distributed.ParallelEnv().local_rank == 0: + local_count = np.array([2, 1, 1, 1]) + global_count = np.array([2, 1, 1, 1]) + else: + local_count = np.array([1, 1, 2, 1]) + global_count = np.array([1, 1, 2, 1]) + local_input_buf = paddle.to_tensor(local_input_buf, dtype="float32", stop_gradient=False) + local_count = paddle.to_tensor(local_count, dtype="int64") + global_count = paddle.to_tensor(global_count, dtype="int64") + a = paddle.distributed.utils.global_scatter(local_input_buf, \ + local_count, global_count) + a.stop_gradient = False + print(a) + # out for rank 0: [[1, 2], [3, 4], [1, 2], [5, 6], [3, 4]] + # out for rank 1: [[7, 8], [5, 6], [7, 8], [9, 10], [9, 10]] + # backward test + c = a * a + c.backward() + print("local_input_buf.grad: ", local_input_buf.grad) + # out for rank 0: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]] + # out for rank 1: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]] + """ + if group is not None and not group.is_member(): + return + + ring_id = 0 if group is None else group.id + if _non_static_mode(): + return _legacy_C_ops.global_scatter(x, local_count, \ + global_count, \ + 'use_calc_stream', use_calc_stream, \ + 'ring_id', ring_id) + else: + op_type = 'global_scatter' + check_variable_and_dtype( + x, 'x', ['float16', 'float32', 'float64', 'int32', 'int64'], + 'global_scatter') + check_variable_and_dtype(local_count, 'local_count', ['int64'], + 'global_scatter') + check_variable_and_dtype(global_count, 'global_count', ['int64'], + 'global_scatter') + + helper = LayerHelper(op_type, **locals()) + out = helper.create_variable_for_type_inference(dtype=x.dtype) + + helper.append_op(type=op_type, + inputs={ + 'X': [x], + 'local_count': [local_count], + 'global_count': [global_count], + }, + outputs={'Out': [out]}, + attrs={ + 'ring_id': ring_id, + 'use_calc_stream': use_calc_stream + }) + return out + + +def global_gather(x, + local_count, + global_count, + group=None, + use_calc_stream=True): + """ + The global_gather operator gathers the data of x into n_expert * world_size experts according to global_count, and then receives data according to local_count. + The expert refers to a user-defined expert network, n_expert refers to the number of expert networks owned by each card, and world_size refers to the number of graphics cards running the network. + + As shown below, the value of the world size is 2, n_expert 2, the batch size of the x 4 and local_count is [2, 0, 2, 0]. + The global_count of the rank 0 is [2, 0, , ], rank 1 is [2, 0, ,](Due to the limited space, only the data calculated on rank 0 is shown here). + In the global_gather operator, the meaning of the global_count and local_count is opposed to global_scatter, global_count[i] represents sending global_count[i] data to the (i % n_expert)th expert of the (i // n_expert)th card, + local_count[i] represents receiving local_count[i] data from the (i // n_expert)th card to the (i % n_expert)th expert of this card. 
The data sent will be arranged according to the experts of each card. + The rank in the figure respresent the rank of the current card in all cards. + + The process of global_gather sending data is as follows: + + The global_count[0] of the 0th card represents sending 2 data to the 0th expert of the 0th card; + + The global_count[1] of the 0th card represents sending 0 data to the 1th expert of the 0th card; + + The global_count[0] of the 1th card represents sending 2 data to the 0th expert of the 0th card; + + The global_count[1] of the 1th card represents sending 0 data to the 1th expert of the 0th card. + + .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/global_scatter_gather.png + :width: 800 + :alt: global_scatter_gather + :align: center + + + Args: + x (Tensor): Tensor. Tensor whose data type should be float16, float32, float64, int32 or int64. + local_count (Tensor): Tensor which have n_expert * world_size elements that indicates + how many data needed to be received. Tensor data type should be int64. + global_count (Tensor): Tensor which have n_expert * world_size elements that indicates + how many data needed to be sent. Tensor data type should be int64. + group (Group, optional): The group instance return by new_group or None for global default group. Default: None. + use_calc_stream (bool, optional): Wether to use calculation stream (True) or communication stream. Default: True. + + Returns: + out (Tensor): The data received from all experts. + + Examples: + .. code-block:: python + + # required: distributed + import numpy as np + import paddle + from paddle.distributed import init_parallel_env + init_parallel_env() + n_expert = 2 + world_size = 2 + d_model = 2 + in_feat = d_model + local_input_buf = np.array([[1, 2],[3, 4],[5, 6],[7, 8],[9, 10]],\ + dtype=np.float32) + if paddle.distributed.ParallelEnv().local_rank == 0: + local_count = np.array([2, 1, 1, 1]) + global_count = np.array([2, 1, 1, 1]) + else: + local_count = np.array([1, 1, 2, 1]) + global_count = np.array([1, 1, 2, 1]) + local_input_buf = paddle.to_tensor(local_input_buf, dtype="float32", stop_gradient=False) + local_count = paddle.to_tensor(local_count, dtype="int64") + global_count = paddle.to_tensor(global_count, dtype="int64") + a = paddle.distributed.utils.global_gather(local_input_buf, local_count, global_count) + print(a) + # out for rank 0: [[1, 2], [3, 4], [7, 8], [1, 2], [7, 8]] + # out for rank 1: [[5, 6], [9, 10], [3, 4], [5, 6], [9, 10]] + a.stop_gradient = False + c = a * a + c.backward() + print("local_input_buf.grad", local_input_buf.grad) + # out for rank 0: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]] + # out for rank 1: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]] + """ + if group is not None and not group.is_member(): + return + + ring_id = 0 if group is None else group.id + if _non_static_mode(): + return _legacy_C_ops.global_gather(x, local_count, \ + global_count, \ + 'use_calc_stream', use_calc_stream, \ + 'ring_id', ring_id) + else: + op_type = 'global_gather' + check_variable_and_dtype( + x, 'x', ['float16', 'float32', 'float64', 'int32', 'int64'], + 'global_gather') + + check_variable_and_dtype(local_count, 'local_count', ['int64'], + 'global_gather') + + check_variable_and_dtype(global_count, 'global_count', ['int64'], + 'global_gather') + helper = LayerHelper(op_type, **locals()) + out = helper.create_variable_for_type_inference(dtype=x.dtype) + + helper.append_op(type=op_type, + inputs={ + 'X': [x], + 'local_count': 
[local_count], + 'global_count': [global_count] + }, + outputs={'Out': [out]}, + attrs={ + 'ring_id': group, + 'use_calc_stream': use_calc_stream, + }) + return out diff --git a/python/paddle/fluid/tests/custom_runtime/test_collective_process_group_xccl.py b/python/paddle/fluid/tests/custom_runtime/test_collective_process_group_xccl.py index db2510d2beb..01f39a39144 100644 --- a/python/paddle/fluid/tests/custom_runtime/test_collective_process_group_xccl.py +++ b/python/paddle/fluid/tests/custom_runtime/test_collective_process_group_xccl.py @@ -28,7 +28,7 @@ def start_local_trainers(cluster, training_script_args, eager_mode=True, log_dir=None): - from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc + from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc current_env = copy.copy(os.environ.copy()) #paddle broadcast ncclUniqueId use socket, and @@ -84,7 +84,7 @@ def start_local_trainers(cluster, def get_cluster_from_args(selected_gpus): - from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc + from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc cluster_node_ips = '127.0.0.1' node_ip = '127.0.0.1' @@ -108,7 +108,7 @@ def get_cluster_from_args(selected_gpus): class TestMultipleCustomCPU(unittest.TestCase): def run_mnist_2custom_cpu(self, target_file_name, eager_mode=True): - from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc + from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc selected_devices = [0, 1] cluster = None @@ -150,7 +150,7 @@ class TestProcessGroup(TestMultipleCustomCPU): cur_dir, 'PaddleCustomDevice/backends/custom_cpu/build') def test_process_group_xccl(self): - from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc + from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc self.run_mnist_2custom_cpu('process_group_xccl.py') diff --git a/python/paddle/fluid/tests/unittests/collective/collective_global_gather.py b/python/paddle/fluid/tests/unittests/collective/collective_global_gather.py index 60909f63211..fd6e8106da7 100644 --- a/python/paddle/fluid/tests/unittests/collective/collective_global_gather.py +++ b/python/paddle/fluid/tests/unittests/collective/collective_global_gather.py @@ -24,6 +24,7 @@ import paddle.fluid.layers as layers from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main import pickle from paddle.fluid.framework import _enable_legacy_dygraph +import paddle.distributed.utils.moe_utils as moe_utils paddle.enable_static() @@ -51,8 +52,9 @@ class TestCollectiveGlobalGatherAPI(TestCollectiveAPIRunnerBase): shape=[tot_expert], dtype="int64") - output = paddle.distributed.utils.global_gather( - local_input_buf, local_expert_count, global_expert_count) + output = moe_utils.global_gather(local_input_buf, + local_expert_count, + global_expert_count) return [output] diff --git a/python/paddle/fluid/tests/unittests/collective/collective_global_gather_dygraph.py b/python/paddle/fluid/tests/unittests/collective/collective_global_gather_dygraph.py index 0b264f5ba89..39749b81277 100644 --- a/python/paddle/fluid/tests/unittests/collective/collective_global_gather_dygraph.py +++ 
b/python/paddle/fluid/tests/unittests/collective/collective_global_gather_dygraph.py @@ -22,6 +22,7 @@ import paddle.fluid as fluid import unittest import paddle.fluid.layers as layers from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main +import paddle.distributed.utils.moe_utils as moe_utils class TestCollectiveGlobalGatherAPI(TestCollectiveAPIRunnerBase): @@ -51,8 +52,9 @@ class TestCollectiveGlobalGatherAPI(TestCollectiveAPIRunnerBase): in_feat).astype("float32") local_input_buf = paddle.to_tensor(local_input_buf) local_input_buf.stop_gradient = False - output = paddle.distributed.utils.global_gather( - local_input_buf, local_expert_count, global_expert_count) + output = moe_utils.global_gather(local_input_buf, + local_expert_count, + global_expert_count) output.stop_gradient = False c = output * output c.stop_gradient = False diff --git a/python/paddle/fluid/tests/unittests/collective/collective_global_scatter.py b/python/paddle/fluid/tests/unittests/collective/collective_global_scatter.py index c4950025877..dd6245df2ac 100644 --- a/python/paddle/fluid/tests/unittests/collective/collective_global_scatter.py +++ b/python/paddle/fluid/tests/unittests/collective/collective_global_scatter.py @@ -23,6 +23,7 @@ import unittest import paddle.fluid.layers as layers from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main import pickle +import paddle.distributed.utils.moe_utils as moe_utils paddle.enable_static() @@ -51,8 +52,9 @@ class TestCollectiveGlobalScatterAPI(TestCollectiveAPIRunnerBase): paddle.split(local_expert_count, 2, axis=0), global_expert_count) global_expert_count = paddle.concat(global_expert_count, axis=0) - output = paddle.distributed.utils.global_scatter( - local_input_buf, local_expert_count, global_expert_count) + output = moe_utils.global_scatter(local_input_buf, + local_expert_count, + global_expert_count) return [output] def run_trainer(self, args): diff --git a/python/paddle/fluid/tests/unittests/collective/collective_global_scatter_dygraph.py b/python/paddle/fluid/tests/unittests/collective/collective_global_scatter_dygraph.py index 82816c899e2..e775bf50eb9 100644 --- a/python/paddle/fluid/tests/unittests/collective/collective_global_scatter_dygraph.py +++ b/python/paddle/fluid/tests/unittests/collective/collective_global_scatter_dygraph.py @@ -22,6 +22,7 @@ import paddle.fluid as fluid import unittest import paddle.fluid.layers as layers from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main +import paddle.distributed.utils.moe_utils as moe_utils class TestCollectiveGlobalScatterAPI(TestCollectiveAPIRunnerBase): @@ -50,8 +51,9 @@ class TestCollectiveGlobalScatterAPI(TestCollectiveAPIRunnerBase): global_expert_count) global_expert_count = paddle.concat(global_expert_count, axis=0) local_input_buf.stop_gradient = False - output = paddle.distributed.utils.global_scatter( - local_input_buf, local_expert_count, global_expert_count) + output = moe_utils.global_scatter(local_input_buf, + local_expert_count, + global_expert_count) output.stop_gradient = False c = output * output c.backward() diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/CMakeLists.txt b/python/paddle/fluid/tests/unittests/collective/fleet/CMakeLists.txt index d2cc96fd3e1..b47e4b5b530 100644 --- a/python/paddle/fluid/tests/unittests/collective/fleet/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/collective/fleet/CMakeLists.txt @@ -938,3 +938,8 @@ if((WITH_GPU OR WITH_ROCM) AND (LINUX)) test_fleet_checkpoint 
PROPERTIES TIMEOUT "200" LABELS "RUN_TYPE=EXCLUSIVE:NIGHTLY") endif() +if(LOCAL_ALL_ARCH AND LOCAL_ALL_PLAT) + py_test_modules( + test_fleet_log MODULES test_fleet_log ENVS + "http_proxy=;https_proxy=;PYTHONPATH=../..:${PADDLE_BINARY_DIR}/python") +endif() diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_qat.py b/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_qat.py index aefe03b2610..69bcdf56f6d 100644 --- a/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_qat.py +++ b/python/paddle/fluid/tests/unittests/collective/fleet/hybrid_parallel_qat.py @@ -26,7 +26,7 @@ from paddle.io import DataLoader, Dataset import unittest import paddle.nn as nn from paddle.fluid.contrib.slim.quantization import ImperativeQuantAware -from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc +from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc def set_random_seed(seed, dp_id, rank_id): diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/test_fleet_log.py b/python/paddle/fluid/tests/unittests/collective/fleet/test_fleet_log.py new file mode 100644 index 00000000000..03cb281cf37 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/collective/fleet/test_fleet_log.py @@ -0,0 +1,43 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import paddle +from paddle.distributed import fleet +from paddle.distributed.fleet.utils.log_util import logger +import logging +import unittest + + +class TestFleetLog(unittest.TestCase): + + def setUp(self): + fleet.init(log_level="DEBUG") + + def test_log_level(self): + + # check correctly initialized + assert fleet.get_log_level_code() == logging._nameToLevel["DEBUG"] + assert logger.getEffectiveLevel() == logging._nameToLevel["DEBUG"] + + # test set name + fleet.set_log_level("WARNING") + debug1 = fleet.get_log_level_code() + debug2 = logging._nameToLevel["WARNING"] + assert debug1 == debug2 + + # test set int + fleet.set_log_level(debug2) + + # check the logger is changed + assert logger.getEffectiveLevel() == logging._nameToLevel["WARNING"] diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/test_parallel_dygraph_qat.py b/python/paddle/fluid/tests/unittests/collective/fleet/test_parallel_dygraph_qat.py index a5b2da46740..b0e981babb6 100644 --- a/python/paddle/fluid/tests/unittests/collective/fleet/test_parallel_dygraph_qat.py +++ b/python/paddle/fluid/tests/unittests/collective/fleet/test_parallel_dygraph_qat.py @@ -22,7 +22,7 @@ import copy import os import subprocess -from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc +from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc def get_cluster_from_args(selected_gpus): diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/testslist.csv b/python/paddle/fluid/tests/unittests/collective/fleet/testslist.csv index cdc856a9ada..c7fa5463225 100644 --- a/python/paddle/fluid/tests/unittests/collective/fleet/testslist.csv +++ b/python/paddle/fluid/tests/unittests/collective/fleet/testslist.csv @@ -82,3 +82,4 @@ test_hdfs1,LINUX,,200,EXCLUSIVE:NIGHTLY,../../dist_test.sh,2,,http_proxy=;https_ test_hdfs2,LINUX,,200,EXCLUSIVE:NIGHTLY,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../.., test_hdfs3,LINUX,,200,EXCLUSIVE:NIGHTLY,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../.., test_fleet_checkpoint,LINUX,GPU;ROCM,200,EXCLUSIVE:NIGHTLY,test_runner.py,,,http_proxy=;https_proxy=;PYTHONPATH=../.., +test_fleet_log,,,,DIST,test_runner.py,,,http_proxy=;https_proxy=;PYTHONPATH=../.., diff --git a/python/paddle/fluid/tests/unittests/test_communicator_geo.py b/python/paddle/fluid/tests/unittests/test_communicator_geo.py index f7593f8bb31..df7d04be319 100644 --- a/python/paddle/fluid/tests/unittests/test_communicator_geo.py +++ b/python/paddle/fluid/tests/unittests/test_communicator_geo.py @@ -28,7 +28,7 @@ import paddle.fluid as fluid import paddle.distributed.fleet.base.role_maker as role_maker import paddle.distributed.fleet as fleet -from paddle.distributed.utils import find_free_ports +from paddle.distributed.utils.launch_utils import find_free_ports paddle.enable_static() diff --git a/python/paddle/fluid/tests/unittests/test_launch_coverage.py b/python/paddle/fluid/tests/unittests/test_launch_coverage.py index e4c35a63471..125b56ec3e6 100644 --- a/python/paddle/fluid/tests/unittests/test_launch_coverage.py +++ b/python/paddle/fluid/tests/unittests/test_launch_coverage.py @@ -23,7 +23,7 @@ import unittest import paddle.fluid as fluid from argparse import ArgumentParser, REMAINDER -from paddle.distributed.utils import _print_arguments, get_gpus, get_cluster_from_args +from paddle.distributed.utils.launch_utils import _print_arguments, get_gpus, get_cluster_from_args from 
paddle.distributed.fleet.launch_utils import find_free_ports diff --git a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel.py b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel.py index 1e8aae7226a..29b0b16de38 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel.py @@ -22,7 +22,7 @@ import copy import os import subprocess -from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc +from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc from paddle.fluid.framework import _test_eager_guard diff --git a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel_cpuonly.py b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel_cpuonly.py index 725d5249f59..4713f6619b9 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel_cpuonly.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel_cpuonly.py @@ -22,7 +22,7 @@ import copy import os import subprocess -from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc +from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc def get_cluster_from_args(selected_gpus): diff --git a/python/paddle/incubate/distributed/models/moe/moe_layer.py b/python/paddle/incubate/distributed/models/moe/moe_layer.py index 086ae2d57d2..e74a712f096 100644 --- a/python/paddle/incubate/distributed/models/moe/moe_layer.py +++ b/python/paddle/incubate/distributed/models/moe/moe_layer.py @@ -26,7 +26,7 @@ import numpy as np import paddle import paddle.nn as nn import paddle.nn.functional as F -from paddle.distributed.utils import global_scatter, global_gather +from paddle.distributed.utils.moe_utils import global_scatter, global_gather from paddle.distributed import alltoall, all_gather from paddle.distributed.fleet.meta_parallel import get_rng_state_tracker diff --git a/python/paddle/tests/test_dist_hapi_model.py b/python/paddle/tests/test_dist_hapi_model.py index 895d2bc0c47..b286e3906a9 100644 --- a/python/paddle/tests/test_dist_hapi_model.py +++ b/python/paddle/tests/test_dist_hapi_model.py @@ -21,7 +21,7 @@ import copy import subprocess import paddle.fluid as fluid -from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc +from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc def get_cluster_from_args(selected_gpus): diff --git a/python/setup.py.in b/python/setup.py.in index 084fd5ac042..b101c5e1d6c 100755 --- a/python/setup.py.in +++ b/python/setup.py.in @@ -287,6 +287,7 @@ packages=['paddle', 'paddle.incubate.asp', 'paddle.incubate.passes', 'paddle.distribution', + 'paddle.distributed.utils', 'paddle.distributed.sharding', 'paddle.distributed.fleet', 'paddle.distributed.launch', -- GitLab
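
A minimal usage sketch of the unified Fleet logging surface introduced by this patch (assuming a Paddle build that contains it; the printed values are illustrative, and fleet.init() is run here in plain single-process mode as in the docstring examples above):

    import logging

    import paddle.distributed.fleet as fleet

    # fleet.init() now accepts a log level, either a level name or a logging level code.
    fleet.init(log_level="DEBUG")

    # Adjust and inspect the shared Fleet logger at runtime.
    fleet.set_log_level("WARNING")
    print(fleet.get_log_level_code())   # 30 (logging.WARNING)
    print(fleet.get_log_level_name())   # "WARNING"

    # An integer level is accepted as well.
    fleet.set_log_level(logging.INFO)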
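
The helpers that used to live directly in paddle.distributed.utils are split into submodules by this patch; a sketch of how a call site is updated (the names below are just the ones this patch itself migrates, and the logger name "launch_demo" is illustrative):

    # Before: from paddle.distributed.utils import get_cluster, find_free_ports, get_logger
    from paddle.distributed.utils.launch_utils import (
        TrainerProc, find_free_ports, get_cluster, watch_local_trainers)
    from paddle.distributed.utils.log_utils import get_logger

    # get_logger builds an ordinary logging.Logger with a single StreamHandler.
    logger = get_logger("INFO", name="launch_demo")
    logger.info("launch helpers imported: %s", find_free_ports.__name__)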
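
global_scatter and global_gather keep their signatures and now live in paddle.distributed.utils.moe_utils. A two-card sketch adapted from the docstring example above; the script name moe_demo.py and the launch command are illustrative, and it must run under a distributed launcher:

    # python -m paddle.distributed.launch --gpus 0,1 moe_demo.py
    import numpy as np
    import paddle
    from paddle.distributed import init_parallel_env
    from paddle.distributed.utils.moe_utils import global_gather, global_scatter

    init_parallel_env()
    local_input_buf = paddle.to_tensor(
        np.array([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]], dtype=np.float32))
    if paddle.distributed.ParallelEnv().local_rank == 0:
        counts = np.array([2, 1, 1, 1])
    else:
        counts = np.array([1, 1, 2, 1])
    local_count = paddle.to_tensor(counts, dtype="int64")
    global_count = paddle.to_tensor(counts, dtype="int64")

    # Distribute rows to the experts on each card ...
    scattered = global_scatter(local_input_buf, local_count, global_count)
    # ... then route them back with the inverse communication pattern,
    # as the MoE layer does after the expert forward pass.
    gathered = global_gather(scattered, local_count, global_count)
    print(gathered)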