Unverified commit 7eb046c7 authored by R Roc and committed by GitHub

logger manager (#45909) (#46087)

Unify the logger manager in FleetAPI.
Hide the APIs under distributed/utils that users don't need.
Parent 372505be
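The change routes Fleet logging through one shared helper and exposes user-facing level controls on the fleet namespace. A minimal usage sketch, assuming the APIs added in the hunks below (the log_level argument of fleet.init, fleet.set_log_level / get_log_level_code / get_log_level_name, and paddle.distributed.utils.log_utils.get_logger) behave as shown:

import logging

import paddle.distributed.fleet as fleet
from paddle.distributed.utils.log_utils import get_logger

fleet.init(log_level="DEBUG")            # pick the Fleet log level at init time
fleet.set_log_level("WARNING")           # change it later by name ...
fleet.set_log_level(logging.WARNING)     # ... or by numeric code
print(fleet.get_log_level_code())        # 30
print(fleet.get_log_level_name())        # "WARNING"

# Internal modules share one logger factory instead of building ad-hoc handlers.
logger = get_logger("INFO", "my_module")
logger.info("unified fleet logger in action")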
......@@ -65,7 +65,6 @@ from .entry_attr import ShowClickEntry # noqa: F401
from paddle.fluid.dygraph.parallel import ParallelEnv # noqa: F401
from . import cloud_utils # noqa: F401
from . import utils # noqa: F401
from .sharding import * # noqa: F401
......
......@@ -16,7 +16,7 @@ import paddle
import warnings
import logging
import numpy as np
from .utils import get_logger
from ..utils.log_utils import get_logger
class Converter(object):
......
......@@ -27,7 +27,7 @@ from paddle.fluid.framework import static_only
from .utils import get_dist_attr
from .converter import Converter
from .process_group import _g_process_group_map
from .utils import get_logger
from ..utils.log_utils import get_logger
def check_filename(re_exp, filename):
......
......@@ -24,7 +24,7 @@ import pickle
import time
import paddle
from paddle.fluid.backward import append_backward
from paddle.distributed.utils import get_logger
from paddle.distributed.utils.log_utils import get_logger
from paddle.distributed.fleet import cloud_utils
import paddle.fluid.core as core
from paddle.fluid import program_guard
......
......@@ -14,10 +14,8 @@
import os
import paddle
from paddle.distributed.utils import get_cluster
from paddle.distributed.utils import logger
from paddle.distributed.utils import get_gpus
from paddle.distributed.utils import get_cluster_from_args
from paddle.distributed.utils.launch_utils import get_cluster, get_gpus, get_cluster_from_args
from paddle.distributed.utils.launch_utils import logger
__all__ = []
......
......@@ -32,6 +32,7 @@ from .fleet import Fleet
from .model import distributed_model
from .optimizer import distributed_optimizer
from .scaler import distributed_scaler
from .utils import log_util
__all__ = [ #noqa
"CommunicateTopology", "UtilBase", "HybridCommunicateGroup",
......@@ -90,5 +91,7 @@ distributed_model = distributed_model
shrink = fleet.shrink
get_hybrid_communicate_group = fleet.get_hybrid_communicate_group
distributed_scaler = distributed_scaler
set_log_level = log_util.set_log_level
get_log_level_code = log_util.get_log_level_code
get_log_level_name = log_util.get_log_level_name
from .. import auto_parallel as auto
......@@ -26,13 +26,9 @@ import subprocess
from paddle.distributed.fleet import cloud_utils
from paddle.distributed.fleet import launch_utils
logger = logging.getLogger("ELASTIC")
logger.setLevel(logging.INFO)
formatter = logging.Formatter(
fmt='%(name)s %(levelname)s %(asctime)s %(message)s')
ch = logging.StreamHandler()
ch.setFormatter(formatter)
logger.addHandler(ch)
from paddle.distributed.utils.log_utils import get_logger
logger = get_logger("INFO", "ELASTIC")
ELASTIC_EXIT_CODE = 101
ELASTIC_AUTO_PARALLEL_EXIT_CODE = 102
......@@ -354,7 +350,7 @@ class ElasticManager(object):
stderr=subprocess.PIPE,
shell=True).communicate()
if err:
logger.warn("pre_hook exec failed")
logger.warning("pre_hook exec failed")
else:
logger.info(f"pre_hook exec result: {out.decode('utf-8').strip()}")
......
......@@ -13,7 +13,6 @@
# limitations under the License.
import copy
import warnings
import paddle
import os
from types import MethodType
......@@ -32,6 +31,8 @@ from .base import topology as tp
from .meta_parallel import model_parallel_random_seed
from paddle import _C_ops, _legacy_C_ops
from paddle.fluid import core
from .utils.log_util import logger, set_log_level
import logging
__all__ = []
......@@ -54,7 +55,7 @@ def apply_ir_passes(main_program, startup_program, config):
# RawProgramOptimizer also inserts coalesce_tensor
# into program. These two procedures may conflict
# in which vars are to be fused.
warnings.warn(
logger.warning(
'Currently, the fuse_all_optimizer_ops pass has conflict with fuse_all_reduce_ops pass. Disable the fuse_all_optimizer_ops pass temporarily.'
)
build_strategy.fuse_all_optimizer_ops = False
......@@ -83,7 +84,7 @@ def _is_non_distributed_check_(func):
if cls._role_maker is not None and cls._role_maker._is_non_distributed(
) is True:
warnings.warn(
logger.warning(
"%s() function doesn't work when use non_distributed fleet." %
(func.__name__))
return
......@@ -165,7 +166,11 @@ class Fleet(object):
self._context = {}
self.user_defined_optimizer = paddle.optimizer.Optimizer(0.0)
def init(self, role_maker=None, is_collective=False, strategy=None):
def init(self,
role_maker=None,
is_collective=False,
strategy=None,
log_level="INFO"):
"""
Initialize role_maker in Fleet.
......@@ -182,6 +187,8 @@ class Fleet(object):
GPU. The default value is False.
strategy (DistributedStrategy): Extra properties for distributed training.
For details, please refer to paddle.distributed.fleet.DistributedStrategy. Default: None.
log_level (int|str, optional): An ``int`` or ``str`` value that sets the
logging level. Default is "INFO".
Returns:
......@@ -217,7 +224,18 @@ class Fleet(object):
strategy = fleet.DistributedStrategy()
fleet.init(strategy=strategy)
Examples5:
.. code-block:: python
import paddle.distributed.fleet as fleet
strategy = fleet.DistributedStrategy()
fleet.init(log_level = "DEBUG")
"""
set_log_level(log_level)
if strategy is None:
strategy = DistributedStrategy()
self._user_defined_strategy = copy.deepcopy(strategy)
......@@ -261,12 +279,12 @@ class Fleet(object):
self._hcg = tp.HybridCommunicateGroup(self._topology)
return
if parallel_helper._is_parallel_ctx_initialized():
warnings.warn(
logger.warning(
"The dygraph parallel environment has been initialized.")
else:
# FLAGS_nccl_nrings is used for dynamic graph multi-stream communication
if "FLAGS_nccl_nrings" in os.environ:
warnings.warn(
logger.warning(
"You have set the environment variable FLAGS_nccl_nrings "
"outside the program, so the nccl_comm_num in "
"DistributedStrategy will not take effect here.")
......@@ -281,7 +299,7 @@ class Fleet(object):
if tp._HYBRID_PARALLEL_GROUP is None:
self._init_hybrid_parallel_env()
else:
warnings.warn(
logger.warning(
"The dygraph hybrid parallel environment has been initialized."
)
elif self._is_collective:
......@@ -850,9 +868,6 @@ class Fleet(object):
fleet.init_server()
"""
# warnings.warn(
# "'save_inference_model' is a deprecated, will be deleted after v2.2.0, Please use fleet.save instead."
# )
self._runtime_handle._save_inference_model(executor, dirname,
feeded_var_names,
......@@ -902,10 +917,6 @@ class Fleet(object):
fleet.save_persistables(exe, "dirname", paddle.static.default_main_program())
"""
# warnings.warn(
# "'save_persistables' is a deprecated, will be deleted after v2.2.0, Please use fleet.save instead."
# )
self._runtime_handle._save_persistables(executor, dirname, main_program,
mode)
......@@ -1015,7 +1026,7 @@ class Fleet(object):
if strategy is not None:
if self._is_collective:
warnings.warn(
logger.warning(
"It is recommended to use DistributedStrategy "
"in fleet.init(). The strategy here is only for compatibility. "
"If the strategy in fleet.distributed_optimizer() is "
......@@ -1304,8 +1315,9 @@ class Fleet(object):
copy_user_defined_strategy, can_not_apply_optimizer_list)
context["valid_strategy"] = copy.deepcopy(valid_strategy)
# print("valid_strategy:", context["valid_strategy"])
# print("user_defined_strategy:", context["user_defined_strategy"])
logger.debug("valid_strategy: " + str(context["valid_strategy"]))
logger.debug("user_defined_strategy: " +
str(context["user_defined_strategy"]))
applied_meta_list = self.strategy_compiler._get_applied_meta_list()
applied_graph_list = self.strategy_compiler._get_applied_graph_list()
......@@ -1335,17 +1347,19 @@ class Fleet(object):
no_grad_set=no_grad_set)
if meta_optimizer:
# print("before minimize program id:", id(loss.block.program))
logger.debug("before minimize program id: " +
str(id(loss.block.program)))
optimize_ops, params_grads = meta_optimizer.minimize(
loss, startup_program, parameter_list, no_grad_set=no_grad_set)
# print("after minimize program id:", id(loss.block.program))
logger.debug("after minimize program id: " +
str(id(loss.block.program)))
default_program = paddle.static.default_main_program()
# print("default program id:", id(default_program))
logger.debug("default program id: " + str(id(default_program)))
if id(default_program) != id(loss.block.program):
paddle.fluid.framework.switch_main_program(loss.block.program)
# print("default program id after switch:", id(default_program))
logger.debug("default program id after switch: " +
str(id(default_program)))
else:
optimize_ops, params_grads = self.user_defined_optimizer.minimize(
......@@ -1355,7 +1369,8 @@ class Fleet(object):
context["program_params_grads"] = params_grads
if graph_optimizer:
# print("before graph minimize program id:", id(loss.block.program))
logger.debug("before graph minimize program id: " +
str(id(loss.block.program)))
optimize_ops, params_grads = graph_optimizer.minimize(
loss, startup_program, parameter_list, no_grad_set=no_grad_set)
# since we do not encourage users to use graph operations
......@@ -1454,7 +1469,8 @@ class Fleet(object):
if v or k not in opt_info:
opt_info[k] = v
program._fleet_opt = opt_info
# print("fleet base opt info:", id(program), program._fleet_opt)
logger.debug("fleet base opt info: " + str(id(program)) +
str(program._fleet_opt))
if self._runtime_handle is None:
self._runtime_handle = RuntimeFactory()._create_runtime(context)
......
......@@ -30,15 +30,8 @@ from .sharding.prune import ProgramDeps
from .sharding import utils
# FIXME: import *
from .sharding.utils import *
import logging
logger = logging.getLogger(__name__)
formatter = logging.Formatter(fmt='%(asctime)s %(levelname)-8s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
ch = logging.StreamHandler()
ch.setFormatter(formatter)
logger.addHandler(ch)
from ..utils.log_util import logger
__all__ = []
......
......@@ -33,7 +33,7 @@ from types import MethodType
import paddle
from paddle import nn
from paddle.distributed import collective
from paddle.distributed.utils import get_logger
from paddle.distributed.utils.log_utils import get_logger
from .group_sharded_storage import GradStorage
from .group_sharded_optimizer_stage2 import GroupShardedOptimizerStage2
......
......@@ -13,7 +13,6 @@
# limitations under the License.
import copy
import warnings
import paddle
import os
import numpy as np
......@@ -22,6 +21,7 @@ from .base.distributed_strategy import DistributedStrategy
from .meta_optimizers import HybridParallelOptimizer, HeterParallelOptimizer
from paddle.fluid import core
from paddle.distributed import fleet
from .utils.log_util import logger
def _dygraph_distributed_optimizer(optimizer, strategy=None):
......@@ -52,7 +52,7 @@ def _dygraph_distributed_optimizer(optimizer, strategy=None):
if strategy is not None:
if fleet_env._is_collective:
warnings.warn(
logger.warning(
"It is recommended to use DistributedStrategy "
"in fleet_env.init(). The strategy here is only for compatibility. "
"If the strategy in fleet_env.distributed_optimizer() is "
......
......@@ -22,13 +22,7 @@ import contextlib
from paddle.fluid.framework import in_dygraph_mode
import logging
logger = logging.getLogger(__name__)
formatter = logging.Formatter(fmt='%(asctime)s %(levelname)-8s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
ch = logging.StreamHandler()
ch.setFormatter(formatter)
logger.addHandler(ch)
from ..utils.log_util import logger
__all__ = []
......@@ -49,7 +43,7 @@ def detach_variable(inputs):
def check_recompute_necessary(inputs):
if not any(input_.stop_gradient == False for input_ in inputs
if isinstance(input_, (core.eager.Tensor, paddle.Tensor))):
logger.warn(
logger.warning(
"[Recompute]: None of the inputs to current recompute block need grad, "
"therefore there is NO need to recompute this block in backward !")
......
......@@ -14,7 +14,6 @@
import os
import six
import numpy as np
import warnings
from paddle import framework
import paddle
......
......@@ -15,30 +15,50 @@
import logging
import sys
__all__ = []
from paddle.distributed.utils.log_utils import get_logger
logger = get_logger("INFO", __name__)
class LoggerFactory:
@staticmethod
def build_logger(name=None, level=logging.INFO):
assert name is not None, "name for logger should not be None"
def set_log_level(level):
"""
Set log level
formatter = logging.Formatter(
"%(asctime)s-%(levelname)s: "
"[%(filename)s:%(lineno)d:%(funcName)s] %(message)s")
Args:
level (str|int): the logging level to set, given either as a level name or a numeric code
_logger = logging.getLogger(name)
_logger.setLevel(level)
_logger.propagate = False
handler = logging.StreamHandler(stream=sys.stderr)
handler.setFormatter(formatter)
handler.setLevel(level)
_logger.addHandler(handler)
return _logger
Example 1:
import paddle
import paddle.distributed.fleet as fleet
fleet.init()
fleet.set_log_level("DEBUG")
Example 2:
import paddle
import paddle.distributed.fleet as fleet
fleet.init()
fleet.set_log_level(1)
logger = LoggerFactory.build_logger(name="HybridParallel", level=logging.INFO)
"""
assert isinstance(level, (str, int)), "level's type must be str or int"
if isinstance(level, int):
logger.setLevel(level)
else:
logger.setLevel(level.upper())
def get_log_level_code():
"""
Return current log level code
"""
return logger.getEffectiveLevel()
def get_log_level_name():
"""
Return current log level name
"""
return logging.getLevelName(get_log_level_code())
def layer_to_str(base, *args, **kwargs):
......
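The level helpers above are also re-exported on the fleet package (see the fleet/__init__.py hunk earlier). A small sketch that drives them directly from log_util, assuming the module path paddle.distributed.fleet.utils.log_util used by the new unit test at the end of this diff:

import logging
from paddle.distributed.fleet.utils.log_util import (logger, set_log_level,
                                                     get_log_level_code,
                                                     get_log_level_name)

set_log_level("DEBUG")                    # by name
assert get_log_level_code() == logging.DEBUG
assert logger.getEffectiveLevel() == logging.DEBUG

set_log_level(logging.WARNING)            # numeric levels are accepted as well
assert get_log_level_name() == "WARNING"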
......@@ -16,7 +16,7 @@ import sys
import yaml
import paddle.fluid as fluid
import logging
from paddle.distributed.utils import get_logger
from paddle.distributed.utils.log_utils import get_logger
__all__ = []
logger = get_logger(logging.INFO, name="metrics")
......
......@@ -19,7 +19,7 @@ from enum import Enum
import paddle
from paddle.optimizer import Optimizer
from paddle.distributed.utils import get_logger
from paddle.distributed.utils.log_utils import get_logger
from paddle.fluid.framework import in_dygraph_mode
# Old version
......
......@@ -21,9 +21,7 @@ import six
import sys
import warnings
from paddle.distributed.utils import _print_arguments
from paddle.distributed.utils import _prepare_trainer_env
from paddle.distributed.utils import get_host_name_ip
from paddle.distributed.utils.launch_utils import _print_arguments, _prepare_trainer_env, get_host_name_ip
from paddle.distributed.cloud_utils import get_cluster_and_pod, _get_trainers_num
from paddle.distributed.fleet.launch import get_cluster_from_args
from paddle.distributed.fleet.cloud_utils import use_paddlecloud
......
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__all__ = []
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
......@@ -12,287 +12,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import logging
import socket
import time
import os
import signal
import copy
import sys
import six
import subprocess
from contextlib import closing
import socket
from paddle.fluid import core
from paddle.distributed.fleet.launch_utils import get_backend_by_compile_flag
from distutils.util import strtobool
import six
from paddle.fluid.layer_helper import LayerHelper
from paddle.fluid.framework import _non_static_mode
from paddle.fluid.data_feeder import check_variable_and_dtype
from paddle import _C_ops, _legacy_C_ops
__all__ = [ #noqa
'get_host_name_ip',
'Trainer',
'get_cluster',
'start_local_trainers',
'watch_local_trainers',
'find_free_ports',
'JobServer',
'Cluster',
'Pod',
'Hdfs',
'add_arguments',
'terminate_local_procs',
'TrainerProc',
'get_logger',
'pull_worker_log',
'global_scatter',
'global_gather',
]
def global_scatter(x,
local_count,
global_count,
group=None,
use_calc_stream=True):
"""
The global_scatter operator distributes the data of x to n_expert * world_size experts according to local_count,
and then receives data according to global_count. The expert refers to a user-defined expert network,
n_expert refers to the number of expert networks owned by each card, and world_size refers to the number of graphics cards running the network.
As shown below, the value of the world size is 2, n_expert 2, the batch size of the x 4 and local_count is [2, 0, 2, 0].
The global_count of the rank 0 is [2, 0, , ], rank 1 is [2, 0, ,](Due to the limited space, only the data calculated on rank 0 is shown here).
In the global_scatter operator, local_count[i] represents sending local_count[i] data to the (i % n_expert)th expert of the (i // n_expert)th card,
global_count[i] represents receiving global_count[i] data from the (i // n_expert)th card to the (i % n_expert)th expert of this card. The rank in the
figure represents the rank of the current card among all cards.
The process of global_scatter sending data is as follows:
local_count[0] represents taking out 2 batches from x and sending 2 batches to the 0th expert of the 0th card;
local_count[1] represents taking out 0 batches from x and sending 0 batches to the 1th expert of the 0th card;
local_count[2] represents taking out 2 batches from x and sending 2 batches to the 0th expert of the 1th card;
local_count[3] represents taking out 0 batches from x and sending 0 batches to the 1th expert of the 1th card;
Therefore, the global_count[0] of the 0th card is equal to 2, which means that 2 batches of data are received from the 0th card to the 0th expert;
the global_count[1] of the 0th card is equal to 0, which means that 0 batches of data are received from the 0th card to the 1th expert;
the global_count[0] of the 1th card is equal to 2, which means that 2 batches of data are received from the 0th card to the 0th expert;
the global_count[1] of the 1th card is equal to 0, which means that 0 batches of data are received from the 0th card to the 1th expert.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/global_scatter_gather.png
:width: 800
:alt: global_scatter_gather
:align: center
Args:
x (Tensor): Tensor. The tensor data type should be float16, float32, float64, int32 or int64.
local_count (Tensor): Tensor with n_expert * world_size elements that indicates
how much data needs to be sent. The tensor data type should be int64.
global_count (Tensor): Tensor with n_expert * world_size elements that indicates
how much data needs to be received. The tensor data type should be int64.
group (Group, optional): The group instance returned by new_group, or None for the global default group. Default: None.
use_calc_stream (bool, optional): Whether to use the calculation stream (True) or the communication stream (False). Default: True.
Returns:
out (Tensor): The data received from all experts.
Examples:
.. code-block:: python
# required: distributed
import numpy as np
import paddle
from paddle.distributed import init_parallel_env
init_parallel_env()
n_expert = 2
world_size = 2
d_model = 2
in_feat = d_model
local_input_buf = np.array([[1, 2],[3, 4],[5, 6],[7, 8],[9, 10]], \
dtype=np.float32)
if paddle.distributed.ParallelEnv().local_rank == 0:
local_count = np.array([2, 1, 1, 1])
global_count = np.array([2, 1, 1, 1])
else:
local_count = np.array([1, 1, 2, 1])
global_count = np.array([1, 1, 2, 1])
local_input_buf = paddle.to_tensor(local_input_buf, dtype="float32", stop_gradient=False)
local_count = paddle.to_tensor(local_count, dtype="int64")
global_count = paddle.to_tensor(global_count, dtype="int64")
a = paddle.distributed.utils.global_scatter(local_input_buf, \
local_count, global_count)
a.stop_gradient = False
print(a)
# out for rank 0: [[1, 2], [3, 4], [1, 2], [5, 6], [3, 4]]
# out for rank 1: [[7, 8], [5, 6], [7, 8], [9, 10], [9, 10]]
# backward test
c = a * a
c.backward()
print("local_input_buf.grad: ", local_input_buf.grad)
# out for rank 0: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]]
# out for rank 1: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]]
"""
if group is not None and not group.is_member():
return
ring_id = 0 if group is None else group.id
if _non_static_mode():
return _legacy_C_ops.global_scatter(x, local_count, \
global_count, \
'use_calc_stream', use_calc_stream, \
'ring_id', ring_id)
else:
op_type = 'global_scatter'
check_variable_and_dtype(
x, 'x', ['float16', 'float32', 'float64', 'int32', 'int64'],
'global_scatter')
check_variable_and_dtype(local_count, 'local_count', ['int64'],
'global_scatter')
check_variable_and_dtype(global_count, 'global_count', ['int64'],
'global_scatter')
helper = LayerHelper(op_type, **locals())
out = helper.create_variable_for_type_inference(dtype=x.dtype)
helper.append_op(type=op_type,
inputs={
'X': [x],
'local_count': [local_count],
'global_count': [global_count],
},
outputs={'Out': [out]},
attrs={
'ring_id': ring_id,
'use_calc_stream': use_calc_stream
})
return out
def global_gather(x,
local_count,
global_count,
group=None,
use_calc_stream=True):
"""
The global_gather operator gathers the data of x into n_expert * world_size experts according to global_count, and then receives data according to local_count.
The expert refers to a user-defined expert network, n_expert refers to the number of expert networks owned by each card, and world_size refers to the number of graphics cards running the network.
As shown below, the value of the world size is 2, n_expert 2, the batch size of the x 4 and local_count is [2, 0, 2, 0].
The global_count of the rank 0 is [2, 0, , ], rank 1 is [2, 0, ,](Due to the limited space, only the data calculated on rank 0 is shown here).
In the global_gather operator, the meanings of global_count and local_count are the opposite of those in global_scatter: global_count[i] represents sending global_count[i] data to the (i % n_expert)th expert of the (i // n_expert)th card,
local_count[i] represents receiving local_count[i] data from the (i // n_expert)th card to the (i % n_expert)th expert of this card. The data sent will be arranged according to the experts of each card.
The rank in the figure represents the rank of the current card among all cards.
The process of global_gather sending data is as follows:
The global_count[0] of the 0th card represents sending 2 batches of data to the 0th expert of the 0th card;
The global_count[1] of the 0th card represents sending 0 batches of data to the 1th expert of the 0th card;
The global_count[0] of the 1th card represents sending 2 batches of data to the 0th expert of the 0th card;
The global_count[1] of the 1th card represents sending 0 batches of data to the 1th expert of the 0th card.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/global_scatter_gather.png
:width: 800
:alt: global_scatter_gather
:align: center
Args:
x (Tensor): Tensor. Tensor whose data type should be float16, float32, float64, int32 or int64.
local_count (Tensor): Tensor with n_expert * world_size elements that indicates
how much data needs to be received. The tensor data type should be int64.
global_count (Tensor): Tensor with n_expert * world_size elements that indicates
how much data needs to be sent. The tensor data type should be int64.
group (Group, optional): The group instance returned by new_group, or None for the global default group. Default: None.
use_calc_stream (bool, optional): Whether to use the calculation stream (True) or the communication stream (False). Default: True.
Returns:
out (Tensor): The data received from all experts.
Examples:
.. code-block:: python
# required: distributed
import numpy as np
import paddle
from paddle.distributed import init_parallel_env
init_parallel_env()
n_expert = 2
world_size = 2
d_model = 2
in_feat = d_model
local_input_buf = np.array([[1, 2],[3, 4],[5, 6],[7, 8],[9, 10]],\
dtype=np.float32)
if paddle.distributed.ParallelEnv().local_rank == 0:
local_count = np.array([2, 1, 1, 1])
global_count = np.array([2, 1, 1, 1])
else:
local_count = np.array([1, 1, 2, 1])
global_count = np.array([1, 1, 2, 1])
local_input_buf = paddle.to_tensor(local_input_buf, dtype="float32", stop_gradient=False)
local_count = paddle.to_tensor(local_count, dtype="int64")
global_count = paddle.to_tensor(global_count, dtype="int64")
a = paddle.distributed.utils.global_gather(local_input_buf, local_count, global_count)
print(a)
# out for rank 0: [[1, 2], [3, 4], [7, 8], [1, 2], [7, 8]]
# out for rank 1: [[5, 6], [9, 10], [3, 4], [5, 6], [9, 10]]
a.stop_gradient = False
c = a * a
c.backward()
print("local_input_buf.grad", local_input_buf.grad)
# out for rank 0: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]]
# out for rank 1: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]]
"""
if group is not None and not group.is_member():
return
ring_id = 0 if group is None else group.id
if _non_static_mode():
return _legacy_C_ops.global_gather(x, local_count, \
global_count, \
'use_calc_stream', use_calc_stream, \
'ring_id', ring_id)
else:
op_type = 'global_gather'
check_variable_and_dtype(
x, 'x', ['float16', 'float32', 'float64', 'int32', 'int64'],
'global_gather')
check_variable_and_dtype(local_count, 'local_count', ['int64'],
'global_gather')
check_variable_and_dtype(global_count, 'global_count', ['int64'],
'global_gather')
helper = LayerHelper(op_type, **locals())
out = helper.create_variable_for_type_inference(dtype=x.dtype)
helper.append_op(type=op_type,
inputs={
'X': [x],
'local_count': [local_count],
'global_count': [global_count]
},
outputs={'Out': [out]},
attrs={
'ring_id': group,
'use_calc_stream': use_calc_stream,
})
return out
from paddle.distributed.fleet.launch_utils import get_backend_by_compile_flag
from ..utils.log_utils import get_logger
logger = logging.getLogger("root")
logger.propagate = False
logger = get_logger("INFO", "root")
def get_cluster_from_args(args, selected_gpus):
......@@ -354,13 +89,6 @@ def get_gpus(selected_gpus):
return gpus
def _print_arguments(args):
print("----------- Configuration Arguments -----------")
for arg, value in sorted(six.iteritems(vars(args))):
print("%s: %s" % (arg, value))
print("------------------------------------------------")
class Hdfs(object):
def __init__(self):
......@@ -549,21 +277,6 @@ class Pod(object):
return r
def get_logger(log_level, name="root"):
logger = logging.getLogger(name)
# Avoid printing multiple logs
if not logger.handlers:
logger.setLevel(log_level)
log_handler = logging.StreamHandler()
log_format = logging.Formatter(
'%(levelname)s %(asctime)s %(filename)s:%(lineno)d] %(message)s')
log_handler.setFormatter(log_format)
logger.addHandler(log_handler)
return logger
def get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus):
assert type(trainer_endpoints) is list, "trainer_endpoints must be list"
cluster = Cluster(hdfs=None)
......@@ -826,3 +539,10 @@ def watch_local_trainers(procs, nranks):
raise
return alive
def _print_arguments(args):
print("----------- Configuration Arguments -----------")
for arg, value in sorted(six.iteritems(vars(args))):
print("%s: %s" % (arg, value))
print("------------------------------------------------")
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
def get_logger(log_level, name="root"):
logger = logging.getLogger(name)
# Avoid printing multiple logs
if not logger.handlers:
log_handler = logging.StreamHandler()
logger.setLevel(log_level)
log_format = logging.Formatter(
'[%(asctime)-15s] [%(levelname)8s] %(filename)s:%(lineno)s - %(message)s'
)
log_handler.setFormatter(log_format)
logger.addHandler(log_handler)
else:
logger.setLevel(log_level)
return logger
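A short usage sketch for the get_logger helper above; the level may be given as a name or a numeric constant, and a repeated call reuses the existing handler and only updates the level (the "demo" logger name is illustrative):

import logging
from paddle.distributed.utils.log_utils import get_logger

log = get_logger("DEBUG", name="demo")        # level given by name
log.debug("goes to stderr through the StreamHandler attached above")

same = get_logger(logging.INFO, name="demo")  # second call hits the else branch:
assert same is log                            # same logger object, level updated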
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle.fluid.layer_helper import LayerHelper
from paddle.fluid.framework import _non_static_mode
from paddle.fluid.data_feeder import check_variable_and_dtype
from paddle import _legacy_C_ops
def global_scatter(x,
local_count,
global_count,
group=None,
use_calc_stream=True):
"""
The global_scatter operator distributes the data of x to n_expert * world_size experts according to local_count,
and then receives data according to global_count. The expert refers to a user-defined expert network,
n_expert refers to the number of expert networks owned by each card, and world_size refers to the number of graphics cards running the network.
As shown below, the value of the world size is 2, n_expert 2, the batch size of the x 4 and local_count is [2, 0, 2, 0].
The global_count of the rank 0 is [2, 0, , ], rank 1 is [2, 0, ,](Due to the limited space, only the data calculated on rank 0 is shown here).
In the global_scatter operator, local_count[i] represents sending local_count[i] data to the (i % n_expert)th expert of the (i // n_expert)th card,
global_count[i] represents receiving global_count[i] data from the (i // n_expert)th card to the (i % n_expert)th expert of this card. The rank in the
figure represents the rank of the current card among all cards.
The process of global_scatter sending data is as follows:
local_count[0] represents taking out 2 batches from x and sending 2 batches to the 0th expert of the 0th card;
local_count[1] represents taking out 0 batches from x and sending 0 batches to the 1th expert of the 0th card;
local_count[2] represents taking out 2 batches from x and sending 2 batches to the 0th expert of the 1th card;
local_count[3] represents taking out 0 batches from x and sending 0 batches to the 1th expert of the 1th card;
Therefore, the global_count[0] of the 0th card is equal to 2, which means that 2 batches of data are received from the 0th card to the 0th expert;
the global_count[1] of the 0th card is equal to 0, which means that 0 batches of data are received from the 0th card to the 1th expert;
the global_count[0] of the 1th card is equal to 2, which means that 2 batches of data are received from the 0th card to the 0th expert;
the global_count[1] of the 1th card is equal to 0, which means that 0 batches of data are received from the 0th card to the 1th expert.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/global_scatter_gather.png
:width: 800
:alt: global_scatter_gather
:align: center
Args:
x (Tensor): Tensor. The tensor data type should be float16, float32, float64, int32 or int64.
local_count (Tensor): Tensor with n_expert * world_size elements that indicates
how much data needs to be sent. The tensor data type should be int64.
global_count (Tensor): Tensor with n_expert * world_size elements that indicates
how much data needs to be received. The tensor data type should be int64.
group (Group, optional): The group instance returned by new_group, or None for the global default group. Default: None.
use_calc_stream (bool, optional): Whether to use the calculation stream (True) or the communication stream (False). Default: True.
Returns:
out (Tensor): The data received from all experts.
Examples:
.. code-block:: python
# required: distributed
import numpy as np
import paddle
from paddle.distributed import init_parallel_env
init_parallel_env()
n_expert = 2
world_size = 2
d_model = 2
in_feat = d_model
local_input_buf = np.array([[1, 2],[3, 4],[5, 6],[7, 8],[9, 10]], \
dtype=np.float32)
if paddle.distributed.ParallelEnv().local_rank == 0:
local_count = np.array([2, 1, 1, 1])
global_count = np.array([2, 1, 1, 1])
else:
local_count = np.array([1, 1, 2, 1])
global_count = np.array([1, 1, 2, 1])
local_input_buf = paddle.to_tensor(local_input_buf, dtype="float32", stop_gradient=False)
local_count = paddle.to_tensor(local_count, dtype="int64")
global_count = paddle.to_tensor(global_count, dtype="int64")
a = paddle.distributed.utils.global_scatter(local_input_buf, \
local_count, global_count)
a.stop_gradient = False
print(a)
# out for rank 0: [[1, 2], [3, 4], [1, 2], [5, 6], [3, 4]]
# out for rank 1: [[7, 8], [5, 6], [7, 8], [9, 10], [9, 10]]
# backward test
c = a * a
c.backward()
print("local_input_buf.grad: ", local_input_buf.grad)
# out for rank 0: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]]
# out for rank 1: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]]
"""
if group is not None and not group.is_member():
return
ring_id = 0 if group is None else group.id
if _non_static_mode():
return _legacy_C_ops.global_scatter(x, local_count, \
global_count, \
'use_calc_stream', use_calc_stream, \
'ring_id', ring_id)
else:
op_type = 'global_scatter'
check_variable_and_dtype(
x, 'x', ['float16', 'float32', 'float64', 'int32', 'int64'],
'global_scatter')
check_variable_and_dtype(local_count, 'local_count', ['int64'],
'global_scatter')
check_variable_and_dtype(global_count, 'global_count', ['int64'],
'global_scatter')
helper = LayerHelper(op_type, **locals())
out = helper.create_variable_for_type_inference(dtype=x.dtype)
helper.append_op(type=op_type,
inputs={
'X': [x],
'local_count': [local_count],
'global_count': [global_count],
},
outputs={'Out': [out]},
attrs={
'ring_id': ring_id,
'use_calc_stream': use_calc_stream
})
return out
def global_gather(x,
local_count,
global_count,
group=None,
use_calc_stream=True):
"""
The global_gather operator gathers the data of x into n_expert * world_size experts according to global_count, and then receives data according to local_count.
The expert refers to a user-defined expert network, n_expert refers to the number of expert networks owned by each card, and world_size refers to the number of graphics cards running the network.
As shown below, the value of the world size is 2, n_expert 2, the batch size of the x 4 and local_count is [2, 0, 2, 0].
The global_count of the rank 0 is [2, 0, , ], rank 1 is [2, 0, ,](Due to the limited space, only the data calculated on rank 0 is shown here).
In the global_gather operator, the meanings of global_count and local_count are the opposite of those in global_scatter: global_count[i] represents sending global_count[i] data to the (i % n_expert)th expert of the (i // n_expert)th card,
local_count[i] represents receiving local_count[i] data from the (i // n_expert)th card to the (i % n_expert)th expert of this card. The data sent will be arranged according to the experts of each card.
The rank in the figure represents the rank of the current card among all cards.
The process of global_gather sending data is as follows:
The global_count[0] of the 0th card represents sending 2 batches of data to the 0th expert of the 0th card;
The global_count[1] of the 0th card represents sending 0 batches of data to the 1th expert of the 0th card;
The global_count[0] of the 1th card represents sending 2 batches of data to the 0th expert of the 0th card;
The global_count[1] of the 1th card represents sending 0 batches of data to the 1th expert of the 0th card.
.. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/global_scatter_gather.png
:width: 800
:alt: global_scatter_gather
:align: center
Args:
x (Tensor): Tensor. Tensor whose data type should be float16, float32, float64, int32 or int64.
local_count (Tensor): Tensor with n_expert * world_size elements that indicates
how much data needs to be received. The tensor data type should be int64.
global_count (Tensor): Tensor with n_expert * world_size elements that indicates
how much data needs to be sent. The tensor data type should be int64.
group (Group, optional): The group instance returned by new_group, or None for the global default group. Default: None.
use_calc_stream (bool, optional): Whether to use the calculation stream (True) or the communication stream (False). Default: True.
Returns:
out (Tensor): The data received from all experts.
Examples:
.. code-block:: python
# required: distributed
import numpy as np
import paddle
from paddle.distributed import init_parallel_env
init_parallel_env()
n_expert = 2
world_size = 2
d_model = 2
in_feat = d_model
local_input_buf = np.array([[1, 2],[3, 4],[5, 6],[7, 8],[9, 10]],\
dtype=np.float32)
if paddle.distributed.ParallelEnv().local_rank == 0:
local_count = np.array([2, 1, 1, 1])
global_count = np.array([2, 1, 1, 1])
else:
local_count = np.array([1, 1, 2, 1])
global_count = np.array([1, 1, 2, 1])
local_input_buf = paddle.to_tensor(local_input_buf, dtype="float32", stop_gradient=False)
local_count = paddle.to_tensor(local_count, dtype="int64")
global_count = paddle.to_tensor(global_count, dtype="int64")
a = paddle.distributed.utils.global_gather(local_input_buf, local_count, global_count)
print(a)
# out for rank 0: [[1, 2], [3, 4], [7, 8], [1, 2], [7, 8]]
# out for rank 1: [[5, 6], [9, 10], [3, 4], [5, 6], [9, 10]]
a.stop_gradient = False
c = a * a
c.backward()
print("local_input_buf.grad", local_input_buf.grad)
# out for rank 0: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]]
# out for rank 1: [[2, 4], [6, 8], [10, 12], [14, 16], [18, 20]]
"""
if group is not None and not group.is_member():
return
ring_id = 0 if group is None else group.id
if _non_static_mode():
return _legacy_C_ops.global_gather(x, local_count, \
global_count, \
'use_calc_stream', use_calc_stream, \
'ring_id', ring_id)
else:
op_type = 'global_gather'
check_variable_and_dtype(
x, 'x', ['float16', 'float32', 'float64', 'int32', 'int64'],
'global_gather')
check_variable_and_dtype(local_count, 'local_count', ['int64'],
'global_gather')
check_variable_and_dtype(global_count, 'global_count', ['int64'],
'global_gather')
helper = LayerHelper(op_type, **locals())
out = helper.create_variable_for_type_inference(dtype=x.dtype)
helper.append_op(type=op_type,
inputs={
'X': [x],
'local_count': [local_count],
'global_count': [global_count]
},
outputs={'Out': [out]},
attrs={
'ring_id': ring_id,
'use_calc_stream': use_calc_stream,
})
return out
......@@ -28,7 +28,7 @@ def start_local_trainers(cluster,
training_script_args,
eager_mode=True,
log_dir=None):
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
current_env = copy.copy(os.environ.copy())
#paddle broadcast ncclUniqueId use socket, and
......@@ -84,7 +84,7 @@ def start_local_trainers(cluster,
def get_cluster_from_args(selected_gpus):
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
cluster_node_ips = '127.0.0.1'
node_ip = '127.0.0.1'
......@@ -108,7 +108,7 @@ def get_cluster_from_args(selected_gpus):
class TestMultipleCustomCPU(unittest.TestCase):
def run_mnist_2custom_cpu(self, target_file_name, eager_mode=True):
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
selected_devices = [0, 1]
cluster = None
......@@ -150,7 +150,7 @@ class TestProcessGroup(TestMultipleCustomCPU):
cur_dir, 'PaddleCustomDevice/backends/custom_cpu/build')
def test_process_group_xccl(self):
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
self.run_mnist_2custom_cpu('process_group_xccl.py')
......
......@@ -24,6 +24,7 @@ import paddle.fluid.layers as layers
from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main
import pickle
from paddle.fluid.framework import _enable_legacy_dygraph
import paddle.distributed.utils.moe_utils as moe_utils
paddle.enable_static()
......@@ -51,8 +52,9 @@ class TestCollectiveGlobalGatherAPI(TestCollectiveAPIRunnerBase):
shape=[tot_expert],
dtype="int64")
output = paddle.distributed.utils.global_gather(
local_input_buf, local_expert_count, global_expert_count)
output = moe_utils.global_gather(local_input_buf,
local_expert_count,
global_expert_count)
return [output]
......
......@@ -22,6 +22,7 @@ import paddle.fluid as fluid
import unittest
import paddle.fluid.layers as layers
from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main
import paddle.distributed.utils.moe_utils as moe_utils
class TestCollectiveGlobalGatherAPI(TestCollectiveAPIRunnerBase):
......@@ -51,8 +52,9 @@ class TestCollectiveGlobalGatherAPI(TestCollectiveAPIRunnerBase):
in_feat).astype("float32")
local_input_buf = paddle.to_tensor(local_input_buf)
local_input_buf.stop_gradient = False
output = paddle.distributed.utils.global_gather(
local_input_buf, local_expert_count, global_expert_count)
output = moe_utils.global_gather(local_input_buf,
local_expert_count,
global_expert_count)
output.stop_gradient = False
c = output * output
c.stop_gradient = False
......
......@@ -23,6 +23,7 @@ import unittest
import paddle.fluid.layers as layers
from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main
import pickle
import paddle.distributed.utils.moe_utils as moe_utils
paddle.enable_static()
......@@ -51,8 +52,9 @@ class TestCollectiveGlobalScatterAPI(TestCollectiveAPIRunnerBase):
paddle.split(local_expert_count, 2, axis=0),
global_expert_count)
global_expert_count = paddle.concat(global_expert_count, axis=0)
output = paddle.distributed.utils.global_scatter(
local_input_buf, local_expert_count, global_expert_count)
output = moe_utils.global_scatter(local_input_buf,
local_expert_count,
global_expert_count)
return [output]
def run_trainer(self, args):
......
......@@ -22,6 +22,7 @@ import paddle.fluid as fluid
import unittest
import paddle.fluid.layers as layers
from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main
import paddle.distributed.utils.moe_utils as moe_utils
class TestCollectiveGlobalScatterAPI(TestCollectiveAPIRunnerBase):
......@@ -50,8 +51,9 @@ class TestCollectiveGlobalScatterAPI(TestCollectiveAPIRunnerBase):
global_expert_count)
global_expert_count = paddle.concat(global_expert_count, axis=0)
local_input_buf.stop_gradient = False
output = paddle.distributed.utils.global_scatter(
local_input_buf, local_expert_count, global_expert_count)
output = moe_utils.global_scatter(local_input_buf,
local_expert_count,
global_expert_count)
output.stop_gradient = False
c = output * output
c.backward()
......
......@@ -938,3 +938,8 @@ if((WITH_GPU OR WITH_ROCM) AND (LINUX))
test_fleet_checkpoint PROPERTIES TIMEOUT "200" LABELS
"RUN_TYPE=EXCLUSIVE:NIGHTLY")
endif()
if(LOCAL_ALL_ARCH AND LOCAL_ALL_PLAT)
py_test_modules(
test_fleet_log MODULES test_fleet_log ENVS
"http_proxy=;https_proxy=;PYTHONPATH=../..:${PADDLE_BINARY_DIR}/python")
endif()
......@@ -26,7 +26,7 @@ from paddle.io import DataLoader, Dataset
import unittest
import paddle.nn as nn
from paddle.fluid.contrib.slim.quantization import ImperativeQuantAware
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
def set_random_seed(seed, dp_id, rank_id):
......
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
from paddle.distributed import fleet
from paddle.distributed.fleet.utils.log_util import logger
import logging
import unittest
class TestFleetLog(unittest.TestCase):
def setUp(self):
fleet.init(log_level="DEBUG")
def test_log_level(self):
# check correctly initialized
assert fleet.get_log_level_code() == logging._nameToLevel["DEBUG"]
assert logger.getEffectiveLevel() == logging._nameToLevel["DEBUG"]
# test set name
fleet.set_log_level("WARNING")
debug1 = fleet.get_log_level_code()
debug2 = logging._nameToLevel["WARNING"]
assert debug1 == debug2
# test set int
fleet.set_log_level(debug2)
# check the logger is changed
assert logger.getEffectiveLevel() == logging._nameToLevel["WARNING"]
......@@ -22,7 +22,7 @@ import copy
import os
import subprocess
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
def get_cluster_from_args(selected_gpus):
......
......@@ -82,3 +82,4 @@ test_hdfs1,LINUX,,200,EXCLUSIVE:NIGHTLY,../../dist_test.sh,2,,http_proxy=;https_
test_hdfs2,LINUX,,200,EXCLUSIVE:NIGHTLY,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../..,
test_hdfs3,LINUX,,200,EXCLUSIVE:NIGHTLY,../../dist_test.sh,2,,http_proxy=;https_proxy=;PYTHONPATH=../..,
test_fleet_checkpoint,LINUX,GPU;ROCM,200,EXCLUSIVE:NIGHTLY,test_runner.py,,,http_proxy=;https_proxy=;PYTHONPATH=../..,
test_fleet_log,,,,DIST,test_runner.py,,,http_proxy=;https_proxy=;PYTHONPATH=../..,
......@@ -28,7 +28,7 @@ import paddle.fluid as fluid
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.distributed.fleet as fleet
from paddle.distributed.utils import find_free_ports
from paddle.distributed.utils.launch_utils import find_free_ports
paddle.enable_static()
......
......@@ -23,7 +23,7 @@ import unittest
import paddle.fluid as fluid
from argparse import ArgumentParser, REMAINDER
from paddle.distributed.utils import _print_arguments, get_gpus, get_cluster_from_args
from paddle.distributed.utils.launch_utils import _print_arguments, get_gpus, get_cluster_from_args
from paddle.distributed.fleet.launch_utils import find_free_ports
......
......@@ -22,7 +22,7 @@ import copy
import os
import subprocess
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
from paddle.fluid.framework import _test_eager_guard
......
......@@ -22,7 +22,7 @@ import copy
import os
import subprocess
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
def get_cluster_from_args(selected_gpus):
......
......@@ -26,7 +26,7 @@ import numpy as np
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
from paddle.distributed.utils import global_scatter, global_gather
from paddle.distributed.utils.moe_utils import global_scatter, global_gather
from paddle.distributed import alltoall, all_gather
from paddle.distributed.fleet.meta_parallel import get_rng_state_tracker
......
......@@ -21,7 +21,7 @@ import copy
import subprocess
import paddle.fluid as fluid
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
def get_cluster_from_args(selected_gpus):
......
......@@ -287,6 +287,7 @@ packages=['paddle',
'paddle.incubate.asp',
'paddle.incubate.passes',
'paddle.distribution',
'paddle.distributed.utils',
'paddle.distributed.sharding',
'paddle.distributed.fleet',
'paddle.distributed.launch',
......