未验证 提交 264ad205 编写于 作者: R Roc 提交者: GitHub

logger manager (#45909)

uniform logger manager in FleetAPI.
hidde API under distributed/utils which users don't need.
上级 8ac3d078
...@@ -65,7 +65,6 @@ from .entry_attr import ShowClickEntry # noqa: F401 ...@@ -65,7 +65,6 @@ from .entry_attr import ShowClickEntry # noqa: F401
from paddle.fluid.dygraph.parallel import ParallelEnv # noqa: F401 from paddle.fluid.dygraph.parallel import ParallelEnv # noqa: F401
from . import cloud_utils # noqa: F401 from . import cloud_utils # noqa: F401
from . import utils # noqa: F401
from .sharding import * # noqa: F401 from .sharding import * # noqa: F401
......
...@@ -16,7 +16,7 @@ import paddle ...@@ -16,7 +16,7 @@ import paddle
import warnings import warnings
import logging import logging
import numpy as np import numpy as np
from .utils import get_logger from ..utils.log_utils import get_logger
class Converter(object): class Converter(object):
......
...@@ -27,7 +27,7 @@ from paddle.fluid.framework import static_only ...@@ -27,7 +27,7 @@ from paddle.fluid.framework import static_only
from .utils import get_dist_attr from .utils import get_dist_attr
from .converter import Converter from .converter import Converter
from .process_group import _g_process_group_map from .process_group import _g_process_group_map
from .utils import get_logger from ..utils.log_utils import get_logger
def check_filename(re_exp, filename): def check_filename(re_exp, filename):
......
...@@ -24,7 +24,7 @@ import pickle ...@@ -24,7 +24,7 @@ import pickle
import time import time
import paddle import paddle
from paddle.fluid.backward import append_backward from paddle.fluid.backward import append_backward
from paddle.distributed.utils import get_logger from paddle.distributed.utils.log_utils import get_logger
from paddle.distributed.fleet import cloud_utils from paddle.distributed.fleet import cloud_utils
import paddle.fluid.core as core import paddle.fluid.core as core
from paddle.fluid import program_guard from paddle.fluid import program_guard
......
...@@ -14,10 +14,8 @@ ...@@ -14,10 +14,8 @@
import os import os
import paddle import paddle
from paddle.distributed.utils import get_cluster from paddle.distributed.utils.launch_utils import get_cluster, get_gpus, get_cluster_from_args
from paddle.distributed.utils import logger from paddle.distributed.utils.launch_utils import logger
from paddle.distributed.utils import get_gpus
from paddle.distributed.utils import get_cluster_from_args
__all__ = [] __all__ = []
......
...@@ -32,6 +32,7 @@ from .fleet import Fleet ...@@ -32,6 +32,7 @@ from .fleet import Fleet
from .model import distributed_model from .model import distributed_model
from .optimizer import distributed_optimizer from .optimizer import distributed_optimizer
from .scaler import distributed_scaler from .scaler import distributed_scaler
from .utils import log_util
__all__ = [ #noqa __all__ = [ #noqa
"CommunicateTopology", "UtilBase", "HybridCommunicateGroup", "CommunicateTopology", "UtilBase", "HybridCommunicateGroup",
...@@ -90,5 +91,7 @@ distributed_model = distributed_model ...@@ -90,5 +91,7 @@ distributed_model = distributed_model
shrink = fleet.shrink shrink = fleet.shrink
get_hybrid_communicate_group = fleet.get_hybrid_communicate_group get_hybrid_communicate_group = fleet.get_hybrid_communicate_group
distributed_scaler = distributed_scaler distributed_scaler = distributed_scaler
set_log_level = log_util.set_log_level
get_log_level_code = log_util.get_log_level_code
get_log_level_name = log_util.get_log_level_name
from .. import auto_parallel as auto from .. import auto_parallel as auto
...@@ -26,13 +26,9 @@ import subprocess ...@@ -26,13 +26,9 @@ import subprocess
from paddle.distributed.fleet import cloud_utils from paddle.distributed.fleet import cloud_utils
from paddle.distributed.fleet import launch_utils from paddle.distributed.fleet import launch_utils
logger = logging.getLogger("ELASTIC") from paddle.distributed.utils.log_utils import get_logger
logger.setLevel(logging.INFO)
formatter = logging.Formatter( logger = get_logger("INFO", "ELASTIC")
fmt='%(name)s %(levelname)s %(asctime)s %(message)s')
ch = logging.StreamHandler()
ch.setFormatter(formatter)
logger.addHandler(ch)
ELASTIC_EXIT_CODE = 101 ELASTIC_EXIT_CODE = 101
ELASTIC_AUTO_PARALLEL_EXIT_CODE = 102 ELASTIC_AUTO_PARALLEL_EXIT_CODE = 102
...@@ -354,7 +350,7 @@ class ElasticManager(object): ...@@ -354,7 +350,7 @@ class ElasticManager(object):
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
shell=True).communicate() shell=True).communicate()
if err: if err:
logger.warn("pre_hook exec failed") logger.warning("pre_hook exec failed")
else: else:
logger.info(f"pre_hook exec result: {out.decode('utf-8').strip()}") logger.info(f"pre_hook exec result: {out.decode('utf-8').strip()}")
......
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
# limitations under the License. # limitations under the License.
import copy import copy
import warnings
import paddle import paddle
import os import os
from types import MethodType from types import MethodType
...@@ -32,6 +31,8 @@ from .base import topology as tp ...@@ -32,6 +31,8 @@ from .base import topology as tp
from .meta_parallel import model_parallel_random_seed from .meta_parallel import model_parallel_random_seed
from paddle import _C_ops, _legacy_C_ops from paddle import _C_ops, _legacy_C_ops
from paddle.fluid import core from paddle.fluid import core
from .utils.log_util import logger, set_log_level
import logging
__all__ = [] __all__ = []
...@@ -54,7 +55,7 @@ def apply_ir_passes(main_program, startup_program, config): ...@@ -54,7 +55,7 @@ def apply_ir_passes(main_program, startup_program, config):
# RawProgramOptimizer also inserts coalesce_tensor # RawProgramOptimizer also inserts coalesce_tensor
# into program. These two procedures may conflict # into program. These two procedures may conflict
# in which vars are to be fused. # in which vars are to be fused.
warnings.warn( logger.warning(
'Currently, the fuse_all_optimizer_ops pass has conflict with fuse_all_reduce_ops pass. Disable the fuse_all_optimizer_ops pass temporarily.' 'Currently, the fuse_all_optimizer_ops pass has conflict with fuse_all_reduce_ops pass. Disable the fuse_all_optimizer_ops pass temporarily.'
) )
build_strategy.fuse_all_optimizer_ops = False build_strategy.fuse_all_optimizer_ops = False
...@@ -83,7 +84,7 @@ def _is_non_distributed_check_(func): ...@@ -83,7 +84,7 @@ def _is_non_distributed_check_(func):
if cls._role_maker is not None and cls._role_maker._is_non_distributed( if cls._role_maker is not None and cls._role_maker._is_non_distributed(
) is True: ) is True:
warnings.warn( logger.warning(
"%s() function doesn't work when use non_distributed fleet." % "%s() function doesn't work when use non_distributed fleet." %
(func.__name__)) (func.__name__))
return return
...@@ -165,7 +166,11 @@ class Fleet(object): ...@@ -165,7 +166,11 @@ class Fleet(object):
self._context = {} self._context = {}
self.user_defined_optimizer = paddle.optimizer.Optimizer(0.0) self.user_defined_optimizer = paddle.optimizer.Optimizer(0.0)
def init(self, role_maker=None, is_collective=False, strategy=None): def init(self,
role_maker=None,
is_collective=False,
strategy=None,
log_level="INFO"):
""" """
Initialize role_maker in Fleet. Initialize role_maker in Fleet.
...@@ -183,6 +188,8 @@ class Fleet(object): ...@@ -183,6 +188,8 @@ class Fleet(object):
is False. is False.
strategy (DistributedStrategy): Extra properties for distributed training. strategy (DistributedStrategy): Extra properties for distributed training.
For details, please refer to paddle.distributed.fleet.DistributedStrategy. Default: None. For details, please refer to paddle.distributed.fleet.DistributedStrategy. Default: None.
log_level (Integer, String, optional): A ``Integer`` or ``String`` Variable determining how hight
the logging level is. Default is "INFO".
Returns: Returns:
...@@ -218,7 +225,18 @@ class Fleet(object): ...@@ -218,7 +225,18 @@ class Fleet(object):
strategy = fleet.DistributedStrategy() strategy = fleet.DistributedStrategy()
fleet.init(strategy=strategy) fleet.init(strategy=strategy)
Examples5:
.. code-block:: python
import paddle.distributed.fleet as fleet
strategy = fleet.DistributedStrategy()
fleet.init(log_level = "DEBUG")
""" """
set_log_level(log_level)
if strategy is None: if strategy is None:
strategy = DistributedStrategy() strategy = DistributedStrategy()
self._user_defined_strategy = copy.deepcopy(strategy) self._user_defined_strategy = copy.deepcopy(strategy)
...@@ -262,12 +280,12 @@ class Fleet(object): ...@@ -262,12 +280,12 @@ class Fleet(object):
self._hcg = tp.HybridCommunicateGroup(self._topology) self._hcg = tp.HybridCommunicateGroup(self._topology)
return return
if parallel_helper._is_parallel_ctx_initialized(): if parallel_helper._is_parallel_ctx_initialized():
warnings.warn( logger.warning(
"The dygraph parallel environment has been initialized.") "The dygraph parallel environment has been initialized.")
else: else:
# FLAGS_nccl_nrings is used for dynamic graph multi-stream communication # FLAGS_nccl_nrings is used for dynamic graph multi-stream communication
if "FLAGS_nccl_nrings" in os.environ: if "FLAGS_nccl_nrings" in os.environ:
warnings.warn( logger.warning(
"You have set the environment variable FLAGS_nccl_nrings " "You have set the environment variable FLAGS_nccl_nrings "
"outside the program, so the nccl_comm_num in " "outside the program, so the nccl_comm_num in "
"DistributedStrategy will not take effect here.") "DistributedStrategy will not take effect here.")
...@@ -282,7 +300,7 @@ class Fleet(object): ...@@ -282,7 +300,7 @@ class Fleet(object):
if tp._HYBRID_PARALLEL_GROUP is None: if tp._HYBRID_PARALLEL_GROUP is None:
self._init_hybrid_parallel_env() self._init_hybrid_parallel_env()
else: else:
warnings.warn( logger.warning(
"The dygraph hybrid parallel environment has been initialized." "The dygraph hybrid parallel environment has been initialized."
) )
elif self._is_collective: elif self._is_collective:
...@@ -851,9 +869,6 @@ class Fleet(object): ...@@ -851,9 +869,6 @@ class Fleet(object):
fleet.init_server() fleet.init_server()
""" """
# warnings.warn(
# "'save_inference_model' is a deprecated, will be deleted after v2.2.0, Please use fleet.save instead."
# )
self._runtime_handle._save_inference_model(executor, dirname, self._runtime_handle._save_inference_model(executor, dirname,
feeded_var_names, feeded_var_names,
...@@ -903,10 +918,6 @@ class Fleet(object): ...@@ -903,10 +918,6 @@ class Fleet(object):
fleet.save_persistables(exe, "dirname", paddle.static.default_main_program()) fleet.save_persistables(exe, "dirname", paddle.static.default_main_program())
""" """
# warnings.warn(
# "'save_persistables' is a deprecated, will be deleted after v2.2.0, Please use fleet.save instead."
# )
self._runtime_handle._save_persistables(executor, dirname, main_program, self._runtime_handle._save_persistables(executor, dirname, main_program,
mode) mode)
...@@ -1016,7 +1027,7 @@ class Fleet(object): ...@@ -1016,7 +1027,7 @@ class Fleet(object):
if strategy is not None: if strategy is not None:
if self._is_collective: if self._is_collective:
warnings.warn( logger.warning(
"It is recommended to use DistributedStrategy " "It is recommended to use DistributedStrategy "
"in fleet.init(). The strategy here is only for compatibility. " "in fleet.init(). The strategy here is only for compatibility. "
"If the strategy in fleet.distributed_optimizer() is " "If the strategy in fleet.distributed_optimizer() is "
...@@ -1305,8 +1316,9 @@ class Fleet(object): ...@@ -1305,8 +1316,9 @@ class Fleet(object):
copy_user_defined_strategy, can_not_apply_optimizer_list) copy_user_defined_strategy, can_not_apply_optimizer_list)
context["valid_strategy"] = copy.deepcopy(valid_strategy) context["valid_strategy"] = copy.deepcopy(valid_strategy)
# print("valid_strategy:", context["valid_strategy"]) logger.debug("valid_strategy: " + str(context["valid_strategy"]))
# print("user_defined_strategy:", context["user_defined_strategy"]) logger.debug("user_defined_strategy: " +
str(context["user_defined_strategy"]))
applied_meta_list = self.strategy_compiler._get_applied_meta_list() applied_meta_list = self.strategy_compiler._get_applied_meta_list()
applied_graph_list = self.strategy_compiler._get_applied_graph_list() applied_graph_list = self.strategy_compiler._get_applied_graph_list()
...@@ -1336,17 +1348,19 @@ class Fleet(object): ...@@ -1336,17 +1348,19 @@ class Fleet(object):
no_grad_set=no_grad_set) no_grad_set=no_grad_set)
if meta_optimizer: if meta_optimizer:
# print("before minimize program id:", id(loss.block.program)) logger.debug("before minimize program id: " +
str(id(loss.block.program)))
optimize_ops, params_grads = meta_optimizer.minimize( optimize_ops, params_grads = meta_optimizer.minimize(
loss, startup_program, parameter_list, no_grad_set=no_grad_set) loss, startup_program, parameter_list, no_grad_set=no_grad_set)
# print("after minimize program id:", id(loss.block.program)) logger.debug("after minimize program id: " +
str(id(loss.block.program)))
default_program = paddle.static.default_main_program() default_program = paddle.static.default_main_program()
# print("default program id:", id(default_program)) logger.debug("default program id: " + str(id(default_program)))
if id(default_program) != id(loss.block.program): if id(default_program) != id(loss.block.program):
paddle.fluid.framework.switch_main_program(loss.block.program) paddle.fluid.framework.switch_main_program(loss.block.program)
# print("default program id after switch:", id(default_program)) logger.debug("default program id after switch: " +
str(id(default_program)))
else: else:
optimize_ops, params_grads = self.user_defined_optimizer.minimize( optimize_ops, params_grads = self.user_defined_optimizer.minimize(
...@@ -1356,7 +1370,8 @@ class Fleet(object): ...@@ -1356,7 +1370,8 @@ class Fleet(object):
context["program_params_grads"] = params_grads context["program_params_grads"] = params_grads
if graph_optimizer: if graph_optimizer:
# print("before graph minimize program id:", id(loss.block.program)) logger.debug("before graph minimize program id: " +
str(id(loss.block.program)))
optimize_ops, params_grads = graph_optimizer.minimize( optimize_ops, params_grads = graph_optimizer.minimize(
loss, startup_program, parameter_list, no_grad_set=no_grad_set) loss, startup_program, parameter_list, no_grad_set=no_grad_set)
# since we do not encourage users to use graph operations # since we do not encourage users to use graph operations
...@@ -1455,7 +1470,8 @@ class Fleet(object): ...@@ -1455,7 +1470,8 @@ class Fleet(object):
if v or k not in opt_info: if v or k not in opt_info:
opt_info[k] = v opt_info[k] = v
program._fleet_opt = opt_info program._fleet_opt = opt_info
# print("fleet base opt info:", id(program), program._fleet_opt) logger.debug("fleet base opt info: " + str(id(program)) +
str(program._fleet_opt))
if self._runtime_handle is None: if self._runtime_handle is None:
self._runtime_handle = RuntimeFactory()._create_runtime(context) self._runtime_handle = RuntimeFactory()._create_runtime(context)
......
...@@ -30,15 +30,8 @@ from .sharding.prune import ProgramDeps ...@@ -30,15 +30,8 @@ from .sharding.prune import ProgramDeps
from .sharding import utils from .sharding import utils
# FIXME: import * # FIXME: import *
from .sharding.utils import * from .sharding.utils import *
import logging import logging
from ..utils.log_util import logger
logger = logging.getLogger(__name__)
formatter = logging.Formatter(fmt='%(asctime)s %(levelname)-8s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
ch = logging.StreamHandler()
ch.setFormatter(formatter)
logger.addHandler(ch)
__all__ = [] __all__ = []
......
...@@ -33,7 +33,7 @@ from types import MethodType ...@@ -33,7 +33,7 @@ from types import MethodType
import paddle import paddle
from paddle import nn from paddle import nn
from paddle.distributed import collective from paddle.distributed import collective
from paddle.distributed.utils import get_logger from paddle.distributed.utils.log_utils import get_logger
from .group_sharded_storage import GradStorage from .group_sharded_storage import GradStorage
from .group_sharded_optimizer_stage2 import GroupShardedOptimizerStage2 from .group_sharded_optimizer_stage2 import GroupShardedOptimizerStage2
......
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
# limitations under the License. # limitations under the License.
import copy import copy
import warnings
import paddle import paddle
import os import os
import numpy as np import numpy as np
...@@ -22,6 +21,7 @@ from .base.distributed_strategy import DistributedStrategy ...@@ -22,6 +21,7 @@ from .base.distributed_strategy import DistributedStrategy
from .meta_optimizers import HybridParallelOptimizer, HeterParallelOptimizer from .meta_optimizers import HybridParallelOptimizer, HeterParallelOptimizer
from paddle.fluid import core from paddle.fluid import core
from paddle.distributed import fleet from paddle.distributed import fleet
from .utils.log_util import logger
def _dygraph_distributed_optimizer(optimizer, strategy=None): def _dygraph_distributed_optimizer(optimizer, strategy=None):
...@@ -52,7 +52,7 @@ def _dygraph_distributed_optimizer(optimizer, strategy=None): ...@@ -52,7 +52,7 @@ def _dygraph_distributed_optimizer(optimizer, strategy=None):
if strategy is not None: if strategy is not None:
if fleet_env._is_collective: if fleet_env._is_collective:
warnings.warn( logger.warning(
"It is recommended to use DistributedStrategy " "It is recommended to use DistributedStrategy "
"in fleet_env.init(). The strategy here is only for compatibility. " "in fleet_env.init(). The strategy here is only for compatibility. "
"If the strategy in fleet_env.distributed_optimizer() is " "If the strategy in fleet_env.distributed_optimizer() is "
......
...@@ -22,13 +22,7 @@ import contextlib ...@@ -22,13 +22,7 @@ import contextlib
from paddle.fluid.framework import in_dygraph_mode from paddle.fluid.framework import in_dygraph_mode
import logging import logging
from ..utils.log_util import logger
logger = logging.getLogger(__name__)
formatter = logging.Formatter(fmt='%(asctime)s %(levelname)-8s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
ch = logging.StreamHandler()
ch.setFormatter(formatter)
logger.addHandler(ch)
__all__ = [] __all__ = []
...@@ -49,7 +43,7 @@ def detach_variable(inputs): ...@@ -49,7 +43,7 @@ def detach_variable(inputs):
def check_recompute_necessary(inputs): def check_recompute_necessary(inputs):
if not any(input_.stop_gradient == False for input_ in inputs if not any(input_.stop_gradient == False for input_ in inputs
if isinstance(input_, (core.eager.Tensor, paddle.Tensor))): if isinstance(input_, (core.eager.Tensor, paddle.Tensor))):
logger.warn( logger.warning(
"[Recompute]: None of the inputs to current recompute block need grad, " "[Recompute]: None of the inputs to current recompute block need grad, "
"therefore there is NO need to recompute this block in backward !") "therefore there is NO need to recompute this block in backward !")
......
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
import os import os
import six import six
import numpy as np import numpy as np
import warnings
from paddle import framework from paddle import framework
import paddle import paddle
......
...@@ -15,30 +15,50 @@ ...@@ -15,30 +15,50 @@
import logging import logging
import sys import sys
__all__ = [] from paddle.distributed.utils.log_utils import get_logger
logger = get_logger("INFO", __name__)
class LoggerFactory:
@staticmethod def set_log_level(level):
def build_logger(name=None, level=logging.INFO): """
assert name is not None, "name for logger should not be None" Set log level
formatter = logging.Formatter( Args:
"%(asctime)s-%(levelname)s: " level (str|int): a specified level
"[%(filename)s:%(lineno)d:%(funcName)s] %(message)s")
_logger = logging.getLogger(name) Example 1:
_logger.setLevel(level) import paddle
_logger.propagate = False import paddle.distributed.fleet as fleet
handler = logging.StreamHandler(stream=sys.stderr) fleet.init()
handler.setFormatter(formatter) fleet.setLogLevel("DEBUG")
handler.setLevel(level)
_logger.addHandler(handler)
return _logger
Example 2:
import paddle
import paddle.distributed.fleet as fleet
fleet.init()
fleet.setLogLevel(1)
logger = LoggerFactory.build_logger(name="HybridParallel", level=logging.INFO) """
assert isinstance(level, (str, int)), "level's type must be str or int"
if isinstance(level, int):
logger.setLevel(level)
else:
logger.setLevel(level.upper())
def get_log_level_code():
"""
Return current log level code
"""
return logger.getEffectiveLevel()
def get_log_level_name():
"""
Return current log level name
"""
return logging.getLevelName(get_log_level_code())
def layer_to_str(base, *args, **kwargs): def layer_to_str(base, *args, **kwargs):
......
...@@ -16,7 +16,7 @@ import sys ...@@ -16,7 +16,7 @@ import sys
import yaml import yaml
import paddle.fluid as fluid import paddle.fluid as fluid
import logging import logging
from paddle.distributed.utils import get_logger from paddle.distributed.utils.log_utils import get_logger
__all__ = [] __all__ = []
logger = get_logger(logging.INFO, name="metrics") logger = get_logger(logging.INFO, name="metrics")
......
...@@ -19,7 +19,7 @@ from enum import Enum ...@@ -19,7 +19,7 @@ from enum import Enum
import paddle import paddle
from paddle.optimizer import Optimizer from paddle.optimizer import Optimizer
from paddle.distributed.utils import get_logger from paddle.distributed.utils.log_utils import get_logger
from paddle.fluid.framework import in_dygraph_mode from paddle.fluid.framework import in_dygraph_mode
# Old version # Old version
......
...@@ -21,9 +21,7 @@ import six ...@@ -21,9 +21,7 @@ import six
import sys import sys
import warnings import warnings
from paddle.distributed.utils import _print_arguments from paddle.distributed.utils.launch_utils import _print_arguments, _prepare_trainer_env, get_host_name_ip
from paddle.distributed.utils import _prepare_trainer_env
from paddle.distributed.utils import get_host_name_ip
from paddle.distributed.cloud_utils import get_cluster_and_pod, _get_trainers_num from paddle.distributed.cloud_utils import get_cluster_and_pod, _get_trainers_num
from paddle.distributed.fleet.launch import get_cluster_from_args from paddle.distributed.fleet.launch import get_cluster_from_args
from paddle.distributed.fleet.cloud_utils import use_paddlecloud from paddle.distributed.fleet.cloud_utils import use_paddlecloud
......
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__all__ = []
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. # Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
...@@ -12,287 +12,22 @@ ...@@ -12,287 +12,22 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import functools
import logging
import socket
import time import time
import os import os
import signal import signal
import copy import copy
import sys import sys
import six
import subprocess import subprocess
from contextlib import closing from contextlib import closing
import socket import socket
from paddle.fluid import core from paddle.fluid import core
from paddle.distributed.fleet.launch_utils import get_backend_by_compile_flag
from distutils.util import strtobool from distutils.util import strtobool
import six
from paddle.fluid.layer_helper import LayerHelper from paddle.distributed.fleet.launch_utils import get_backend_by_compile_flag
from paddle.fluid.framework import _non_static_mode from ..utils.log_utils import get_logger
from paddle.fluid.data_feeder import check_variable_and_dtype
from paddle import _C_ops, _legacy_C_ops
__all__ = [ #noqa
'get_host_name_ip',
'Trainer',
'get_cluster',
'start_local_trainers',
'watch_local_trainers',
'find_free_ports',
'JobServer',
'Cluster',
'Pod',
'Hdfs',
'add_arguments',
'terminate_local_procs',
'TrainerProc',
'get_logger',
'pull_worker_log',
'global_scatter',
'global_gather',
]
def global_scatter(x,
local_count,
global_count,
group=None,
use_calc_stream=True):
"""
The global_scatter operator distributes the data of x to n_expert * world_size experts according to local_count,
and then receives data according to global_count. The expert refers to a user-defined expert network,
n_expert refers to the number of expert networks owned by each card, and world_size refers to the number of graphics cards running the network.
As shown below, the value of the world size is 2, n_expert 2, the batch size of the x 4 and local_count is [2, 0, 2, 0].
The global_count of the rank 0 is [2, 0, , ], rank 1 is [2, 0, ,](Due to the limited space, only the data calculated on rank 0 is shown here).
In the global_scatter operator, local_count[i] represents sending local_count[i] data to the (i % n_expert)th expert of the (i // n_expert)th card,
global_count[i] represents receiving global_count[i] data from the (i // n_expert)th card to the (i % n_expert)th expert of this card. The rank in the
figure respresent the rank of the current card in all cards.
The process of global_scatter sending data is as follows:
local_count[0] represents taking out 2 batches from x and sending 2 batches to the 0th expert of the 0th card;
local_count[1] represents taking out 0 batches from x and sending 0 batches to the 1th expert of the 0th card;
local_count[2] represents taking out 2 batches from x and sending 2 batches to the 0th expert of the 1th card;
local_count[3] represents taking out 0 batches from x and sending 0 batches to the 1th expert of the 1th card;
Therefore, the global_count[0] of the 0th card is equal to 2, which means that 2 batches of data are received from the 0th card to the 0th expert;
the global_count[1] of the 0th card is equal to 0, which means that 0 batches of data are received from the 0th card to the 1th expert;
the global_count[0] of the 1th card is equal to 2, which means that 2 batches of data are received from the 0th card to the 0th expert;
the global_count[1] of the 1th card is equal to 0, which means that 0 batches of data are received from the 0th card to the 1th expert.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/global_scatter_gather.png
:width: 800
:alt: global_scatter_gather
:align: center
Args:
x (Tensor): Tensor. The tensor data type should be float16, float32, float64, int32 or int64.
local_count (Tensor): Tensor which have n_expert * world_size elements that indicates
how many data needed to be sent. The tensor data type should be int64.
global_count (Tensor): Tensor which have n_expert * world_size elements that indicates
how many data needed to be received. The tensor data type should be int64.
group (Group, optional): The group instance return by new_group or None for global default group. Default: None.
use_calc_stream (bool, optional): Wether to use calculation stream (True) or communication stream. Default: True.
Returns:
out (Tensor): The data received from all experts.
Examples:
.. code-block:: python
# required: distributed
import numpy as np
import paddle
from paddle.distributed import init_parallel_env
init_parallel_env()
n_expert = 2
world_size = 2
d_model = 2
in_feat = d_model
local_input_buf = np.array([[1, 2],[3, 4],[5, 6],[7, 8],[9, 10]], \
dtype=np.float32)
if paddle.distributed.ParallelEnv().local_rank == 0:
local_count = np.array([2, 1, 1, 1])
global_count = np.array([2, 1, 1, 1])
else:
local_count = np.array([1, 1, 2, 1])
global_count = np.array([1, 1, 2, 1])
local_input_buf = paddle.to_tensor(local_input_buf, dtype="float32", stop_gradient=False)
local_count = paddle.to_tensor(local_count, dtype="int64")
global_count = paddle.to_tensor(global_count, dtype="int64")
a = paddle.distributed.utils.global_scatter(local_input_buf, \
local_count, global_count)
a.stop_gradient = False
print(a)
# out for rank 0: [[1, 2], [3, 4], [1, 2], [5, 6], [3, 4]]
# out for rank 1: [[7, 8], [5, 6], [7, 8], [9, 10], [9, 10]]
# backward test
c = a * a
c.backward()
print("local_input_buf.grad: ", local_input_buf.grad)
# out for rank 0: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]]
# out for rank 1: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]]
"""
if group is not None and not group.is_member():
return
ring_id = 0 if group is None else group.id
if _non_static_mode():
return _legacy_C_ops.global_scatter(x, local_count, \
global_count, \
'use_calc_stream', use_calc_stream, \
'ring_id', ring_id)
else:
op_type = 'global_scatter'
check_variable_and_dtype(
x, 'x', ['float16', 'float32', 'float64', 'int32', 'int64'],
'global_scatter')
check_variable_and_dtype(local_count, 'local_count', ['int64'],
'global_scatter')
check_variable_and_dtype(global_count, 'global_count', ['int64'],
'global_scatter')
helper = LayerHelper(op_type, **locals())
out = helper.create_variable_for_type_inference(dtype=x.dtype)
helper.append_op(type=op_type,
inputs={
'X': [x],
'local_count': [local_count],
'global_count': [global_count],
},
outputs={'Out': [out]},
attrs={
'ring_id': ring_id,
'use_calc_stream': use_calc_stream
})
return out
def global_gather(x,
local_count,
global_count,
group=None,
use_calc_stream=True):
"""
The global_gather operator gathers the data of x into n_expert * world_size experts according to global_count, and then receives data according to local_count.
The expert refers to a user-defined expert network, n_expert refers to the number of expert networks owned by each card, and world_size refers to the number of graphics cards running the network.
As shown below, the value of the world size is 2, n_expert 2, the batch size of the x 4 and local_count is [2, 0, 2, 0].
The global_count of the rank 0 is [2, 0, , ], rank 1 is [2, 0, ,](Due to the limited space, only the data calculated on rank 0 is shown here).
In the global_gather operator, the meaning of the global_count and local_count is opposed to global_scatter, global_count[i] represents sending global_count[i] data to the (i % n_expert)th expert of the (i // n_expert)th card,
local_count[i] represents receiving local_count[i] data from the (i // n_expert)th card to the (i % n_expert)th expert of this card. The data sent will be arranged according to the experts of each card.
The rank in the figure respresent the rank of the current card in all cards.
The process of global_gather sending data is as follows:
The global_count[0] of the 0th card represents sending 2 data to the 0th expert of the 0th card;
The global_count[1] of the 0th card represents sending 0 data to the 1th expert of the 0th card;
The global_count[0] of the 1th card represents sending 2 data to the 0th expert of the 0th card;
The global_count[1] of the 1th card represents sending 0 data to the 1th expert of the 0th card.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/global_scatter_gather.png
:width: 800
:alt: global_scatter_gather
:align: center
Args:
x (Tensor): Tensor. Tensor whose data type should be float16, float32, float64, int32 or int64.
local_count (Tensor): Tensor which have n_expert * world_size elements that indicates
how many data needed to be received. Tensor data type should be int64.
global_count (Tensor): Tensor which have n_expert * world_size elements that indicates
how many data needed to be sent. Tensor data type should be int64.
group (Group, optional): The group instance return by new_group or None for global default group. Default: None.
use_calc_stream (bool, optional): Wether to use calculation stream (True) or communication stream. Default: True.
Returns:
out (Tensor): The data received from all experts.
Examples:
.. code-block:: python
# required: distributed
import numpy as np
import paddle
from paddle.distributed import init_parallel_env
init_parallel_env()
n_expert = 2
world_size = 2
d_model = 2
in_feat = d_model
local_input_buf = np.array([[1, 2],[3, 4],[5, 6],[7, 8],[9, 10]],\
dtype=np.float32)
if paddle.distributed.ParallelEnv().local_rank == 0:
local_count = np.array([2, 1, 1, 1])
global_count = np.array([2, 1, 1, 1])
else:
local_count = np.array([1, 1, 2, 1])
global_count = np.array([1, 1, 2, 1])
local_input_buf = paddle.to_tensor(local_input_buf, dtype="float32", stop_gradient=False)
local_count = paddle.to_tensor(local_count, dtype="int64")
global_count = paddle.to_tensor(global_count, dtype="int64")
a = paddle.distributed.utils.global_gather(local_input_buf, local_count, global_count)
print(a)
# out for rank 0: [[1, 2], [3, 4], [7, 8], [1, 2], [7, 8]]
# out for rank 1: [[5, 6], [9, 10], [3, 4], [5, 6], [9, 10]]
a.stop_gradient = False
c = a * a
c.backward()
print("local_input_buf.grad", local_input_buf.grad)
# out for rank 0: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]]
# out for rank 1: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]]
"""
if group is not None and not group.is_member():
return
ring_id = 0 if group is None else group.id
if _non_static_mode():
return _legacy_C_ops.global_gather(x, local_count, \
global_count, \
'use_calc_stream', use_calc_stream, \
'ring_id', ring_id)
else:
op_type = 'global_gather'
check_variable_and_dtype(
x, 'x', ['float16', 'float32', 'float64', 'int32', 'int64'],
'global_gather')
check_variable_and_dtype(local_count, 'local_count', ['int64'],
'global_gather')
check_variable_and_dtype(global_count, 'global_count', ['int64'],
'global_gather')
helper = LayerHelper(op_type, **locals())
out = helper.create_variable_for_type_inference(dtype=x.dtype)
helper.append_op(type=op_type,
inputs={
'X': [x],
'local_count': [local_count],
'global_count': [global_count]
},
outputs={'Out': [out]},
attrs={
'ring_id': group,
'use_calc_stream': use_calc_stream,
})
return out
logger = logging.getLogger("root") logger = get_logger("INFO", "root")
logger.propagate = False
def get_cluster_from_args(args, selected_gpus): def get_cluster_from_args(args, selected_gpus):
...@@ -354,13 +89,6 @@ def get_gpus(selected_gpus): ...@@ -354,13 +89,6 @@ def get_gpus(selected_gpus):
return gpus return gpus
def _print_arguments(args):
print("----------- Configuration Arguments -----------")
for arg, value in sorted(six.iteritems(vars(args))):
print("%s: %s" % (arg, value))
print("------------------------------------------------")
class Hdfs(object): class Hdfs(object):
def __init__(self): def __init__(self):
...@@ -549,21 +277,6 @@ class Pod(object): ...@@ -549,21 +277,6 @@ class Pod(object):
return r return r
def get_logger(log_level, name="root"):
logger = logging.getLogger(name)
# Avoid printing multiple logs
if not logger.handlers:
logger.setLevel(log_level)
log_handler = logging.StreamHandler()
log_format = logging.Formatter(
'%(levelname)s %(asctime)s %(filename)s:%(lineno)d] %(message)s')
log_handler.setFormatter(log_format)
logger.addHandler(log_handler)
return logger
def get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus): def get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus):
assert type(trainer_endpoints) is list, "trainer_endpoints must be list" assert type(trainer_endpoints) is list, "trainer_endpoints must be list"
cluster = Cluster(hdfs=None) cluster = Cluster(hdfs=None)
...@@ -826,3 +539,10 @@ def watch_local_trainers(procs, nranks): ...@@ -826,3 +539,10 @@ def watch_local_trainers(procs, nranks):
raise raise
return alive return alive
def _print_arguments(args):
print("----------- Configuration Arguments -----------")
for arg, value in sorted(six.iteritems(vars(args))):
print("%s: %s" % (arg, value))
print("------------------------------------------------")
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
def get_logger(log_level, name="root"):
logger = logging.getLogger(name)
# Avoid printing multiple logs
if not logger.handlers:
log_handler = logging.StreamHandler()
logger.setLevel(log_level)
log_format = logging.Formatter(
'[%(asctime)-15s] [%(levelname)8s] %(filename)s:%(lineno)s - %(message)s'
)
log_handler.setFormatter(log_format)
logger.addHandler(log_handler)
else:
logger.setLevel(log_level)
return logger
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle.fluid.layer_helper import LayerHelper
from paddle.fluid.framework import _non_static_mode
from paddle.fluid.data_feeder import check_variable_and_dtype
from paddle import _legacy_C_ops
def global_scatter(x,
local_count,
global_count,
group=None,
use_calc_stream=True):
"""
The global_scatter operator distributes the data of x to n_expert * world_size experts according to local_count,
and then receives data according to global_count. The expert refers to a user-defined expert network,
n_expert refers to the number of expert networks owned by each card, and world_size refers to the number of graphics cards running the network.
As shown below, the value of the world size is 2, n_expert 2, the batch size of the x 4 and local_count is [2, 0, 2, 0].
The global_count of the rank 0 is [2, 0, , ], rank 1 is [2, 0, ,](Due to the limited space, only the data calculated on rank 0 is shown here).
In the global_scatter operator, local_count[i] represents sending local_count[i] data to the (i % n_expert)th expert of the (i // n_expert)th card,
global_count[i] represents receiving global_count[i] data from the (i // n_expert)th card to the (i % n_expert)th expert of this card. The rank in the
figure respresent the rank of the current card in all cards.
The process of global_scatter sending data is as follows:
local_count[0] represents taking out 2 batches from x and sending 2 batches to the 0th expert of the 0th card;
local_count[1] represents taking out 0 batches from x and sending 0 batches to the 1th expert of the 0th card;
local_count[2] represents taking out 2 batches from x and sending 2 batches to the 0th expert of the 1th card;
local_count[3] represents taking out 0 batches from x and sending 0 batches to the 1th expert of the 1th card;
Therefore, the global_count[0] of the 0th card is equal to 2, which means that 2 batches of data are received from the 0th card to the 0th expert;
the global_count[1] of the 0th card is equal to 0, which means that 0 batches of data are received from the 0th card to the 1th expert;
the global_count[0] of the 1th card is equal to 2, which means that 2 batches of data are received from the 0th card to the 0th expert;
the global_count[1] of the 1th card is equal to 0, which means that 0 batches of data are received from the 0th card to the 1th expert.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/global_scatter_gather.png
:width: 800
:alt: global_scatter_gather
:align: center
Args:
x (Tensor): Tensor. The tensor data type should be float16, float32, float64, int32 or int64.
local_count (Tensor): Tensor which have n_expert * world_size elements that indicates
how many data needed to be sent. The tensor data type should be int64.
global_count (Tensor): Tensor which have n_expert * world_size elements that indicates
how many data needed to be received. The tensor data type should be int64.
group (Group, optional): The group instance return by new_group or None for global default group. Default: None.
use_calc_stream (bool, optional): Wether to use calculation stream (True) or communication stream. Default: True.
Returns:
out (Tensor): The data received from all experts.
Examples:
.. code-block:: python
# required: distributed
import numpy as np
import paddle
from paddle.distributed import init_parallel_env
init_parallel_env()
n_expert = 2
world_size = 2
d_model = 2
in_feat = d_model
local_input_buf = np.array([[1, 2],[3, 4],[5, 6],[7, 8],[9, 10]], \
dtype=np.float32)
if paddle.distributed.ParallelEnv().local_rank == 0:
local_count = np.array([2, 1, 1, 1])
global_count = np.array([2, 1, 1, 1])
else:
local_count = np.array([1, 1, 2, 1])
global_count = np.array([1, 1, 2, 1])
local_input_buf = paddle.to_tensor(local_input_buf, dtype="float32", stop_gradient=False)
local_count = paddle.to_tensor(local_count, dtype="int64")
global_count = paddle.to_tensor(global_count, dtype="int64")
a = paddle.distributed.utils.global_scatter(local_input_buf, \
local_count, global_count)
a.stop_gradient = False
print(a)
# out for rank 0: [[1, 2], [3, 4], [1, 2], [5, 6], [3, 4]]
# out for rank 1: [[7, 8], [5, 6], [7, 8], [9, 10], [9, 10]]
# backward test
c = a * a
c.backward()
print("local_input_buf.grad: ", local_input_buf.grad)
# out for rank 0: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]]
# out for rank 1: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]]
"""
if group is not None and not group.is_member():
return
ring_id = 0 if group is None else group.id
if _non_static_mode():
return _legacy_C_ops.global_scatter(x, local_count, \
global_count, \
'use_calc_stream', use_calc_stream, \
'ring_id', ring_id)
else:
op_type = 'global_scatter'
check_variable_and_dtype(
x, 'x', ['float16', 'float32', 'float64', 'int32', 'int64'],
'global_scatter')
check_variable_and_dtype(local_count, 'local_count', ['int64'],
'global_scatter')
check_variable_and_dtype(global_count, 'global_count', ['int64'],
'global_scatter')
helper = LayerHelper(op_type, **locals())
out = helper.create_variable_for_type_inference(dtype=x.dtype)
helper.append_op(type=op_type,
inputs={
'X': [x],
'local_count': [local_count],
'global_count': [global_count],
},
outputs={'Out': [out]},
attrs={
'ring_id': ring_id,
'use_calc_stream': use_calc_stream
})
return out
def global_gather(x,
local_count,
global_count,
group=None,
use_calc_stream=True):
"""
The global_gather operator gathers the data of x into n_expert * world_size experts according to global_count, and then receives data according to local_count.
The expert refers to a user-defined expert network, n_expert refers to the number of expert networks owned by each card, and world_size refers to the number of graphics cards running the network.
As shown below, the value of the world size is 2, n_expert 2, the batch size of the x 4 and local_count is [2, 0, 2, 0].
The global_count of the rank 0 is [2, 0, , ], rank 1 is [2, 0, ,](Due to the limited space, only the data calculated on rank 0 is shown here).
In the global_gather operator, the meaning of the global_count and local_count is opposed to global_scatter, global_count[i] represents sending global_count[i] data to the (i % n_expert)th expert of the (i // n_expert)th card,
local_count[i] represents receiving local_count[i] data from the (i // n_expert)th card to the (i % n_expert)th expert of this card. The data sent will be arranged according to the experts of each card.
The rank in the figure respresent the rank of the current card in all cards.
The process of global_gather sending data is as follows:
The global_count[0] of the 0th card represents sending 2 data to the 0th expert of the 0th card;
The global_count[1] of the 0th card represents sending 0 data to the 1th expert of the 0th card;
The global_count[0] of the 1th card represents sending 2 data to the 0th expert of the 0th card;
The global_count[1] of the 1th card represents sending 0 data to the 1th expert of the 0th card.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/global_scatter_gather.png
:width: 800
:alt: global_scatter_gather
:align: center
Args:
x (Tensor): Tensor. Tensor whose data type should be float16, float32, float64, int32 or int64.
local_count (Tensor): Tensor which have n_expert * world_size elements that indicates
how many data needed to be received. Tensor data type should be int64.
global_count (Tensor): Tensor which have n_expert * world_size elements that indicates
how many data needed to be sent. Tensor data type should be int64.
group (Group, optional): The group instance return by new_group or None for global default group. Default: None.
use_calc_stream (bool, optional): Wether to use calculation stream (True) or communication stream. Default: True.
Returns:
out (Tensor): The data received from all experts.
Examples:
.. code-block:: python
# required: distributed
import numpy as np
import paddle
from paddle.distributed import init_parallel_env
init_parallel_env()
n_expert = 2
world_size = 2
d_model = 2
in_feat = d_model
local_input_buf = np.array([[1, 2],[3, 4],[5, 6],[7, 8],[9, 10]],\
dtype=np.float32)
if paddle.distributed.ParallelEnv().local_rank == 0:
local_count = np.array([2, 1, 1, 1])
global_count = np.array([2, 1, 1, 1])
else:
local_count = np.array([1, 1, 2, 1])
global_count = np.array([1, 1, 2, 1])
local_input_buf = paddle.to_tensor(local_input_buf, dtype="float32", stop_gradient=False)
local_count = paddle.to_tensor(local_count, dtype="int64")
global_count = paddle.to_tensor(global_count, dtype="int64")
a = paddle.distributed.utils.global_gather(local_input_buf, local_count, global_count)
print(a)
# out for rank 0: [[1, 2], [3, 4], [7, 8], [1, 2], [7, 8]]
# out for rank 1: [[5, 6], [9, 10], [3, 4], [5, 6], [9, 10]]
a.stop_gradient = False
c = a * a
c.backward()
print("local_input_buf.grad", local_input_buf.grad)
# out for rank 0: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]]
# out for rank 1: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]]
"""
if group is not None and not group.is_member():
return
ring_id = 0 if group is None else group.id
if _non_static_mode():
return _legacy_C_ops.global_gather(x, local_count, \
global_count, \
'use_calc_stream', use_calc_stream, \
'ring_id', ring_id)
else:
op_type = 'global_gather'
check_variable_and_dtype(
x, 'x', ['float16', 'float32', 'float64', 'int32', 'int64'],
'global_gather')
check_variable_and_dtype(local_count, 'local_count', ['int64'],
'global_gather')
check_variable_and_dtype(global_count, 'global_count', ['int64'],
'global_gather')
helper = LayerHelper(op_type, **locals())
out = helper.create_variable_for_type_inference(dtype=x.dtype)
helper.append_op(type=op_type,
inputs={
'X': [x],
'local_count': [local_count],
'global_count': [global_count]
},
outputs={'Out': [out]},
attrs={
'ring_id': group,
'use_calc_stream': use_calc_stream,
})
return out
...@@ -28,7 +28,7 @@ def start_local_trainers(cluster, ...@@ -28,7 +28,7 @@ def start_local_trainers(cluster,
training_script_args, training_script_args,
eager_mode=True, eager_mode=True,
log_dir=None): log_dir=None):
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
current_env = copy.copy(os.environ.copy()) current_env = copy.copy(os.environ.copy())
#paddle broadcast ncclUniqueId use socket, and #paddle broadcast ncclUniqueId use socket, and
...@@ -84,7 +84,7 @@ def start_local_trainers(cluster, ...@@ -84,7 +84,7 @@ def start_local_trainers(cluster,
def get_cluster_from_args(selected_gpus): def get_cluster_from_args(selected_gpus):
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
cluster_node_ips = '127.0.0.1' cluster_node_ips = '127.0.0.1'
node_ip = '127.0.0.1' node_ip = '127.0.0.1'
...@@ -108,7 +108,7 @@ def get_cluster_from_args(selected_gpus): ...@@ -108,7 +108,7 @@ def get_cluster_from_args(selected_gpus):
class TestMultipleCustomCPU(unittest.TestCase): class TestMultipleCustomCPU(unittest.TestCase):
def run_mnist_2custom_cpu(self, target_file_name, eager_mode=True): def run_mnist_2custom_cpu(self, target_file_name, eager_mode=True):
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
selected_devices = [0, 1] selected_devices = [0, 1]
cluster = None cluster = None
...@@ -150,7 +150,7 @@ class TestProcessGroup(TestMultipleCustomCPU): ...@@ -150,7 +150,7 @@ class TestProcessGroup(TestMultipleCustomCPU):
cur_dir, 'PaddleCustomDevice/backends/custom_cpu/build') cur_dir, 'PaddleCustomDevice/backends/custom_cpu/build')
def test_process_group_xccl(self): def test_process_group_xccl(self):
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
self.run_mnist_2custom_cpu('process_group_xccl.py') self.run_mnist_2custom_cpu('process_group_xccl.py')
......
...@@ -24,6 +24,7 @@ import paddle.fluid.layers as layers ...@@ -24,6 +24,7 @@ import paddle.fluid.layers as layers
from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main
import pickle import pickle
from paddle.fluid.framework import _enable_legacy_dygraph from paddle.fluid.framework import _enable_legacy_dygraph
import paddle.distributed.utils.moe_utils as moe_utils
paddle.enable_static() paddle.enable_static()
...@@ -51,8 +52,9 @@ class TestCollectiveGlobalGatherAPI(TestCollectiveAPIRunnerBase): ...@@ -51,8 +52,9 @@ class TestCollectiveGlobalGatherAPI(TestCollectiveAPIRunnerBase):
shape=[tot_expert], shape=[tot_expert],
dtype="int64") dtype="int64")
output = paddle.distributed.utils.global_gather( output = moe_utils.global_gather(local_input_buf,
local_input_buf, local_expert_count, global_expert_count) local_expert_count,
global_expert_count)
return [output] return [output]
......
...@@ -22,6 +22,7 @@ import paddle.fluid as fluid ...@@ -22,6 +22,7 @@ import paddle.fluid as fluid
import unittest import unittest
import paddle.fluid.layers as layers import paddle.fluid.layers as layers
from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main
import paddle.distributed.utils.moe_utils as moe_utils
class TestCollectiveGlobalGatherAPI(TestCollectiveAPIRunnerBase): class TestCollectiveGlobalGatherAPI(TestCollectiveAPIRunnerBase):
...@@ -51,8 +52,9 @@ class TestCollectiveGlobalGatherAPI(TestCollectiveAPIRunnerBase): ...@@ -51,8 +52,9 @@ class TestCollectiveGlobalGatherAPI(TestCollectiveAPIRunnerBase):
in_feat).astype("float32") in_feat).astype("float32")
local_input_buf = paddle.to_tensor(local_input_buf) local_input_buf = paddle.to_tensor(local_input_buf)
local_input_buf.stop_gradient = False local_input_buf.stop_gradient = False
output = paddle.distributed.utils.global_gather( output = moe_utils.global_gather(local_input_buf,
local_input_buf, local_expert_count, global_expert_count) local_expert_count,
global_expert_count)
output.stop_gradient = False output.stop_gradient = False
c = output * output c = output * output
c.stop_gradient = False c.stop_gradient = False
......
...@@ -23,6 +23,7 @@ import unittest ...@@ -23,6 +23,7 @@ import unittest
import paddle.fluid.layers as layers import paddle.fluid.layers as layers
from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main
import pickle import pickle
import paddle.distributed.utils.moe_utils as moe_utils
paddle.enable_static() paddle.enable_static()
...@@ -51,8 +52,9 @@ class TestCollectiveGlobalScatterAPI(TestCollectiveAPIRunnerBase): ...@@ -51,8 +52,9 @@ class TestCollectiveGlobalScatterAPI(TestCollectiveAPIRunnerBase):
paddle.split(local_expert_count, 2, axis=0), paddle.split(local_expert_count, 2, axis=0),
global_expert_count) global_expert_count)
global_expert_count = paddle.concat(global_expert_count, axis=0) global_expert_count = paddle.concat(global_expert_count, axis=0)
output = paddle.distributed.utils.global_scatter( output = moe_utils.global_scatter(local_input_buf,
local_input_buf, local_expert_count, global_expert_count) local_expert_count,
global_expert_count)
return [output] return [output]
def run_trainer(self, args): def run_trainer(self, args):
......
...@@ -22,6 +22,7 @@ import paddle.fluid as fluid ...@@ -22,6 +22,7 @@ import paddle.fluid as fluid
import unittest import unittest
import paddle.fluid.layers as layers import paddle.fluid.layers as layers
from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main
import paddle.distributed.utils.moe_utils as moe_utils
class TestCollectiveGlobalScatterAPI(TestCollectiveAPIRunnerBase): class TestCollectiveGlobalScatterAPI(TestCollectiveAPIRunnerBase):
...@@ -50,8 +51,9 @@ class TestCollectiveGlobalScatterAPI(TestCollectiveAPIRunnerBase): ...@@ -50,8 +51,9 @@ class TestCollectiveGlobalScatterAPI(TestCollectiveAPIRunnerBase):
global_expert_count) global_expert_count)
global_expert_count = paddle.concat(global_expert_count, axis=0) global_expert_count = paddle.concat(global_expert_count, axis=0)
local_input_buf.stop_gradient = False local_input_buf.stop_gradient = False
output = paddle.distributed.utils.global_scatter( output = moe_utils.global_scatter(local_input_buf,
local_input_buf, local_expert_count, global_expert_count) local_expert_count,
global_expert_count)
output.stop_gradient = False output.stop_gradient = False
c = output * output c = output * output
c.backward() c.backward()
......
...@@ -938,3 +938,8 @@ if((WITH_GPU OR WITH_ROCM) AND (LINUX)) ...@@ -938,3 +938,8 @@ if((WITH_GPU OR WITH_ROCM) AND (LINUX))
test_fleet_checkpoint PROPERTIES TIMEOUT "200" LABELS test_fleet_checkpoint PROPERTIES TIMEOUT "200" LABELS
"RUN_TYPE=EXCLUSIVE:NIGHTLY") "RUN_TYPE=EXCLUSIVE:NIGHTLY")
endif() endif()
if(LOCAL_ALL_ARCH AND LOCAL_ALL_PLAT)
py_test_modules(
test_fleet_log MODULES test_fleet_log ENVS
"http_proxy=;https_proxy=;PYTHONPATH=../..:${PADDLE_BINARY_DIR}/python")
endif()
...@@ -26,7 +26,7 @@ from paddle.io import DataLoader, Dataset ...@@ -26,7 +26,7 @@ from paddle.io import DataLoader, Dataset
import unittest import unittest
import paddle.nn as nn import paddle.nn as nn
from paddle.fluid.contrib.slim.quantization import ImperativeQuantAware from paddle.fluid.contrib.slim.quantization import ImperativeQuantAware
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
def set_random_seed(seed, dp_id, rank_id): def set_random_seed(seed, dp_id, rank_id):
......
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
from paddle.distributed import fleet
from paddle.distributed.fleet.utils.log_util import logger
import logging
import unittest
class TestFleetLog(unittest.TestCase):
def setUp(self):
fleet.init(log_level="DEBUG")
def test_log_level(self):
# check correctly initialized
assert fleet.get_log_level_code() == logging._nameToLevel["DEBUG"]
assert logger.getEffectiveLevel() == logging._nameToLevel["DEBUG"]
# test set name
fleet.set_log_level("WARNING")
debug1 = fleet.get_log_level_code()
debug2 = logging._nameToLevel["WARNING"]
assert debug1 == debug2
# test set int
fleet.set_log_level(debug2)
# check the logger is changed
assert logger.getEffectiveLevel() == logging._nameToLevel["WARNING"]
...@@ -22,7 +22,7 @@ import copy ...@@ -22,7 +22,7 @@ import copy
import os import os
import subprocess import subprocess
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
def get_cluster_from_args(selected_gpus): def get_cluster_from_args(selected_gpus):
......
...@@ -82,3 +82,4 @@ test_hdfs1,LINUX,,200,EXCLUSIVE:NIGHTLY,../../dist_test.sh,2,,http_proxy=;https_ ...@@ -82,3 +82,4 @@ test_hdfs1,LINUX,,200,EXCLUSIVE:NIGHTLY,../../dist_test.sh,2,,http_proxy=;https_
test_hdfs2,LINUX,,200,EXCLUSIVE:NIGHTLY,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../.., test_hdfs2,LINUX,,200,EXCLUSIVE:NIGHTLY,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../..,
test_hdfs3,LINUX,,200,EXCLUSIVE:NIGHTLY,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../.., test_hdfs3,LINUX,,200,EXCLUSIVE:NIGHTLY,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../..,
test_fleet_checkpoint,LINUX,GPU;ROCM,200,EXCLUSIVE:NIGHTLY,test_runner.py,,,http_proxy=;https_proxy=;PYTHONPATH=../.., test_fleet_checkpoint,LINUX,GPU;ROCM,200,EXCLUSIVE:NIGHTLY,test_runner.py,,,http_proxy=;https_proxy=;PYTHONPATH=../..,
test_fleet_log,,,,DIST,test_runner.py,,,http_proxy=;https_proxy=;PYTHONPATH=../..,
...@@ -28,7 +28,7 @@ import paddle.fluid as fluid ...@@ -28,7 +28,7 @@ import paddle.fluid as fluid
import paddle.distributed.fleet.base.role_maker as role_maker import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.distributed.fleet as fleet import paddle.distributed.fleet as fleet
from paddle.distributed.utils import find_free_ports from paddle.distributed.utils.launch_utils import find_free_ports
paddle.enable_static() paddle.enable_static()
......
...@@ -23,7 +23,7 @@ import unittest ...@@ -23,7 +23,7 @@ import unittest
import paddle.fluid as fluid import paddle.fluid as fluid
from argparse import ArgumentParser, REMAINDER from argparse import ArgumentParser, REMAINDER
from paddle.distributed.utils import _print_arguments, get_gpus, get_cluster_from_args from paddle.distributed.utils.launch_utils import _print_arguments, get_gpus, get_cluster_from_args
from paddle.distributed.fleet.launch_utils import find_free_ports from paddle.distributed.fleet.launch_utils import find_free_ports
......
...@@ -22,7 +22,7 @@ import copy ...@@ -22,7 +22,7 @@ import copy
import os import os
import subprocess import subprocess
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
from paddle.fluid.framework import _test_eager_guard from paddle.fluid.framework import _test_eager_guard
......
...@@ -22,7 +22,7 @@ import copy ...@@ -22,7 +22,7 @@ import copy
import os import os
import subprocess import subprocess
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
def get_cluster_from_args(selected_gpus): def get_cluster_from_args(selected_gpus):
......
...@@ -26,7 +26,7 @@ import numpy as np ...@@ -26,7 +26,7 @@ import numpy as np
import paddle import paddle
import paddle.nn as nn import paddle.nn as nn
import paddle.nn.functional as F import paddle.nn.functional as F
from paddle.distributed.utils import global_scatter, global_gather from paddle.distributed.utils.moe_utils import global_scatter, global_gather
from paddle.distributed import alltoall, all_gather from paddle.distributed import alltoall, all_gather
from paddle.distributed.fleet.meta_parallel import get_rng_state_tracker from paddle.distributed.fleet.meta_parallel import get_rng_state_tracker
......
...@@ -21,7 +21,7 @@ import copy ...@@ -21,7 +21,7 @@ import copy
import subprocess import subprocess
import paddle.fluid as fluid import paddle.fluid as fluid
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
def get_cluster_from_args(selected_gpus): def get_cluster_from_args(selected_gpus):
......
...@@ -287,6 +287,7 @@ packages=['paddle', ...@@ -287,6 +287,7 @@ packages=['paddle',
'paddle.incubate.asp', 'paddle.incubate.asp',
'paddle.incubate.passes', 'paddle.incubate.passes',
'paddle.distribution', 'paddle.distribution',
'paddle.distributed.utils',
'paddle.distributed.sharding', 'paddle.distributed.sharding',
'paddle.distributed.fleet', 'paddle.distributed.fleet',
'paddle.distributed.launch', 'paddle.distributed.launch',
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册