From b6e7f8e9365b0c092f9790722d3896979c82b14a Mon Sep 17 00:00:00 2001
From: xiongkun
Date: Thu, 21 Oct 2021 14:07:13 +0800
Subject: [PATCH] User specified backend (#35745)

---
 paddle/fluid/framework/fleet/gloo_wrapper.h   |  18 ++
 paddle/fluid/imperative/gloo_context.cc       | 115 ++++++++++-
 paddle/fluid/imperative/gloo_context.h        |   8 +
 python/paddle/distributed/fleet/launch.py     |  51 ++++-
 .../paddle/distributed/fleet/launch_utils.py  |  63 +++++-
 python/paddle/distributed/parallel.py         |  27 +--
 python/paddle/distributed/spawn.py            |  88 +++++++--
 python/paddle/distributed/utils.py            |  22 ++-
 .../fluid/tests/unittests/CMakeLists.txt      |  18 ++
 .../parallel_dygraph_gradient_check.py        |   3 +-
 .../unittests/parallel_dygraph_se_resnext.py  |   1 +
 .../tests/unittests/test_cpuonly_launch.sh    |  42 ++++
 .../tests/unittests/test_cpuonly_spawn.py     |  72 +++++++
 .../fluid/tests/unittests/test_dist_base.py   | 179 +++++++++++++++++-
 .../test_parallel_dygraph_dataparallel.py     |  65 +++++++
 ..._parallel_dygraph_sparse_embedding_gloo.py |  59 ++++++
 ...graph_sparse_embedding_over_height_gloo.py |  44 +++++
 .../test_parallel_dygraph_transformer_gloo.py |  61 ++++++
 ..._parallel_dygraph_unused_variables_gloo.py |  72 +++++++
 .../test_spawn_and_init_parallel_env.py       |   5 +-
 20 files changed, 948 insertions(+), 65 deletions(-)
 create mode 100644 python/paddle/fluid/tests/unittests/test_cpuonly_launch.sh
 create mode 100644 python/paddle/fluid/tests/unittests/test_cpuonly_spawn.py
 create mode 100644 python/paddle/fluid/tests/unittests/test_parallel_dygraph_sparse_embedding_gloo.py
 create mode 100644 python/paddle/fluid/tests/unittests/test_parallel_dygraph_sparse_embedding_over_height_gloo.py
 create mode 100644 python/paddle/fluid/tests/unittests/test_parallel_dygraph_transformer_gloo.py
 create mode 100644 python/paddle/fluid/tests/unittests/test_parallel_dygraph_unused_variables_gloo.py

diff --git a/paddle/fluid/framework/fleet/gloo_wrapper.h b/paddle/fluid/framework/fleet/gloo_wrapper.h
index eafc991fbca..f1ec042dbd7 100644
--- a/paddle/fluid/framework/fleet/gloo_wrapper.h
+++ b/paddle/fluid/framework/fleet/gloo_wrapper.h
@@ -238,6 +238,24 @@ class GlooWrapper {
     return ret;
   }
 
+  // TODO(xiongkun03): support all gather array of
+  // numbers with different length
+  // can use AllgathervOptions, may be work in different
+  // occasion. Need some survey.
+ template + void AllGatherVector(T* input_ptr, T* output_ptr, + size_t element_num) { // NOLINT + CHECK_EQ(is_initialized_, true); +#ifdef PADDLE_WITH_GLOO + gloo::AllgatherOptions opts(context_); + opts.setInput(input_ptr, element_num); + opts.setOutput(output_ptr, element_num * size_); + gloo::allgather(opts); +#else + LOG(WARNING) << "AllGather does nothing when WITH_GLOO=OFF"; +#endif + } + protected: bool is_initialized_ = false; #ifdef PADDLE_WITH_GLOO diff --git a/paddle/fluid/imperative/gloo_context.cc b/paddle/fluid/imperative/gloo_context.cc index d7df6ec3c11..0d93cdf5793 100644 --- a/paddle/fluid/imperative/gloo_context.cc +++ b/paddle/fluid/imperative/gloo_context.cc @@ -18,6 +18,7 @@ #include "paddle/fluid/platform/device_context.h" #include "paddle/fluid/platform/place.h" #include "paddle/fluid/string/split.h" +#include "paddle/fluid/string/string_helper.h" namespace paddle { namespace framework { @@ -67,8 +68,36 @@ void GLOOParallelContext::AllReduceByStream(const framework::Variable &src, framework::Variable *dst, int ring_id, bool use_calc_stream) { // AllReduce(src, dst, strategy_, ring_id, use_calc_stream); - auto src_tensor = src.Get(); - auto *dst_tensor = dst->GetMutable(); + if (src.IsType()) { + if (!dst->IsType()) { + dst->Clear(); + } + AllReduce(src.Get(), + dst->GetMutable()); + } else if (src.IsType()) { + if (&src != dst) { + if (!dst->IsType()) { + dst->Clear(); + } + AllReduce(src.Get(), + dst->GetMutable()); + } else { + // SelectedRows cannot be allreduce in-place + framework::Variable tmp_dst; + AllReduce(src.Get(), + tmp_dst.GetMutable()); + *dst = std::move(tmp_dst); + } + } else { + PADDLE_THROW(platform::errors::InvalidArgument( + "Unsupported variable type %s for imperative allreduce, only " + "LoDTensor and SelectedRows are supported.", + platform::demangle(framework::ToTypeName(src.Type())))); + } +} + +void GLOOParallelContext::AllReduce(const framework::Tensor &src_tensor, + framework::Tensor *dst_tensor) { auto gloo_wrapper = framework::GlooWrapper::GetInstance(); dst_tensor->Resize(src_tensor.dims()); switch (src_tensor.type()) { @@ -84,6 +113,88 @@ void GLOOParallelContext::AllReduceByStream(const framework::Variable &src, gloo_wrapper->Barrier(); } +#define GLOO_ALL_GATHER_CASE(type, T, gw) \ + case type: { \ + const auto *src_tensor_ptr = src_tensor.data(); \ + gw->AllGatherVector(const_cast(src_tensor_ptr), \ + reinterpret_cast(dst_tensor_ptr), \ + value_sendcount); \ + break; \ + } + +void GLOOParallelContext::AllReduce(const framework::SelectedRows &src, + framework::SelectedRows *dst) { + // auto ; + // int local_rank = strategy_.local_rank_; + int nranks = strategy_.nranks_; + VLOG(3) << "SelectedRows AllReduce start"; + const auto &src_tensor = src.value(); + const auto &place = src_tensor.place(); + auto dtype = src_tensor.type(); + // 1. Gather rows number from all workers. 
Here use ncclAllGather to do this, + // but we can use other ways to implement is in the future + const auto &src_rows = src.rows(); + auto gloo_wrapper = framework::GlooWrapper::GetInstance(); + size_t local_row_num = src_rows.size(); + std::vector rows_num_vector = + gloo_wrapper->AllGather(local_row_num); + const auto *cpu_rows_num_ptr = rows_num_vector.data(); + auto rows_num = std::accumulate(cpu_rows_num_ptr, cpu_rows_num_ptr + nranks, + static_cast(0)); + dst->set_height(src.height()); + VLOG(3) << "Gather rows: " << string::join_strings(rows_num_vector, ',') + << ", total rows number: " << rows_num + << ", height: " << src.height(); + auto *dst_rows = dst->mutable_rows(); + dst_rows->resize(rows_num); + auto *dst_rows_ptr = dst_rows->MutableData(place); + const int64_t *src_rows_ptr = src_rows.Data(place); + + // VLOG(3) << "Selected Rows of src:" << string::join_strings(dst_rows, ',') + + auto *dst_tensor = dst->mutable_value(); + auto dims = src_tensor.dims(); + dims[0] = rows_num; + auto feature_size = framework::product(dims) / dims[0]; + dst_tensor->Resize(dims); + if (std::all_of(cpu_rows_num_ptr, cpu_rows_num_ptr + nranks, + [&](size_t row) { return row == cpu_rows_num_ptr[0]; })) { + // During sparse communication, the number of each card is same. + // Because gloo wrapper utility class currently don't support + // broadcast, so we only deal the-same case. + VLOG(3) << "Use the gloo all reduce to sync. SRC:" << src_tensor; + // framework::SerializeToStream(VLOG(4), src); + VLOG(3) << "allgather replaces broadcast to speed up in sparse allreduce"; + auto value_sendcount = cpu_rows_num_ptr[0] * feature_size; + auto *dst_tensor_ptr = dst_tensor->mutable_data(place, dtype); + + gloo_wrapper->AllGatherVector(const_cast(src_rows_ptr), + static_cast(dst_rows_ptr), + rows_num_vector[0]); + + switch (dtype) { + GLOO_ALL_GATHER_CASE(framework::proto::VarType::FP32, float, + gloo_wrapper); + GLOO_ALL_GATHER_CASE(framework::proto::VarType::FP64, double, + gloo_wrapper); + GLOO_ALL_GATHER_CASE(framework::proto::VarType::INT32, int, gloo_wrapper); + GLOO_ALL_GATHER_CASE(framework::proto::VarType::INT64, int64_t, + gloo_wrapper); + default: { + PADDLE_THROW(platform::errors::InvalidArgument( + "Invalid datatype for allreduce")); + } + } + VLOG(3) << "Selected Row DST:" << *dst_tensor; + VLOG(3) << "Selected Rows of DST:" + << string::join_strings(std::vector(*dst_rows), ','); + } else { + PADDLE_THROW(platform::errors::InvalidArgument( + "The number of each card is not the same, gloo only support the-same" + "batch division")); + } +} + paddle::platform::DeviceContext *GLOOParallelContext::GetDeviceContext( int ring_id) { // return the CPUDeviceContext diff --git a/paddle/fluid/imperative/gloo_context.h b/paddle/fluid/imperative/gloo_context.h index f54dc1a406a..305a75a8811 100644 --- a/paddle/fluid/imperative/gloo_context.h +++ b/paddle/fluid/imperative/gloo_context.h @@ -16,6 +16,9 @@ #include #include #include +#include "paddle/fluid/framework/scope.h" +#include "paddle/fluid/framework/selected_rows.h" +#include "paddle/fluid/framework/variable.h" #include "paddle/fluid/imperative/parallel_context.h" #include "paddle/fluid/platform/device_context.h" @@ -52,6 +55,11 @@ class GLOOParallelContext : public ParallelContext { void SynchronizeCompute() override; + private: + void AllReduce(const framework::Tensor& src, framework::Tensor* dst); + void AllReduce(const framework::SelectedRows& src, + framework::SelectedRows* dst); + private: std::unique_ptr device_; }; diff --git 
a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py index c0a1c359d17..16b39e0fc8e 100644 --- a/python/paddle/distributed/fleet/launch.py +++ b/python/paddle/distributed/fleet/launch.py @@ -103,7 +103,12 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra type=str, default="log", help="The path for each process's log. Default --log_dir=log/") - + base_group.add_argument( + "--backend", + type=str, + default="auto", + help="Specifize the backend, can be gloo|nccl|bkcl|auto. Default value is auto which perfers nccl or bkcl." + ) base_group.add_argument( "--nproc_per_node", type=int, @@ -230,8 +235,21 @@ def get_cluster_from_args(args, device_mode, devices_per_proc): devices_per_proc) +def cpuonly_check(args): + if args.ips and len(args.ips.split(',')) > 1: + raise RuntimeError( + "CPUONLY launch only support single trainer, that is len(ips)=1, but got %s." + % args.ips) + if args.run_mode: + assert args.run_mode == 'cpuonly', "CPUONLY launch only support run mode is CPUONLY" + if args.servers: + raise RuntimeError("CPUONLY launch can't have --servers as arguments.") + return True + + def launch_collective(args): # parse arguments, used for cloud-single-machine and local + if args.backend == 'gloo': cpuonly_check(args) (device_mode, devices_per_proc) = launch_utils.get_device_proc_info(args) trainers_num = cloud_utils.get_trainers_num() logger.debug("parsed from args trainerss_num:{} mode:{} devices:{}".format( @@ -265,6 +283,7 @@ def launch_collective(args): global_envs["PADDLE_WITH_GLOO"] = str(os.getenv("PADDLE_WITH_GLOO", "0")) global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3" global_envs["PADDLE_GLOO_FS_PATH"] = gloo_rendezvous_dir + global_envs["PADDLE_DISTRI_BACKEND"] = args.backend procs = start_local_trainers( cluster, @@ -349,9 +368,12 @@ def which_distributed_mode(args): if fluid.core.is_compiled_with_cuda(): accelerators = fluid.core.get_cuda_device_count() + args.backend = 'nccl' elif fluid.core.is_compiled_with_npu(): + args.backend = 'unknown' accelerators = fluid.core.get_npu_device_count() elif fluid.core.is_compiled_with_xpu(): + args.backend = 'bkcl' accelerators = fluid.core.get_xpu_device_count() else: accelerators = 0 @@ -372,10 +394,14 @@ def which_distributed_mode(args): else: if not fluid.core.is_compiled_with_cuda( ) and not fluid.core.is_compiled_with_xpu(): - logger.warning( - "Not found distinct arguments and not compiled with cuda or xpu. Default use ps mode" - ) - return DistributeMode.PS + if args.servers: + logger.warning( + "Not found distinct arguments and not compiled with cuda or xpu. \ +But found args.servers not empty, default use ps mode") + return DistributeMode.PS + else: + args.backend = "gloo" + return DistributeMode.COLLECTIVE else: logger.warning( "Not found distinct arguments and compiled with cuda or xpu. 
Default use collective mode" @@ -556,7 +582,20 @@ def launch(): logger = get_logger() _print_arguments(args) - distribute_mode = which_distributed_mode(args) + if args.backend == 'auto': + distribute_mode = which_distributed_mode(args) + assert args.backend in [ + 'gloo', 'nccl', 'bkcl', 'unknown' + ] # which_distributed_mode must modify args.backend + else: + assert args.run_mode == 'collective' or args.run_mode == None, "When backend is not 'auto', run mode must be collective" + check_backend(args.backend) + distribute_mode = DistributeMode.COLLECTIVE + + block_windows_and_macos( + args.backend) # raise error when using gloo on windows or macos + if args.backend == 'gloo': + logger.warning("launch start with CPUONLY mode") if enable_elastic(args, distribute_mode): launch_elastic(args, distribute_mode) diff --git a/python/paddle/distributed/fleet/launch_utils.py b/python/paddle/distributed/fleet/launch_utils.py index e114670440c..3aced0ab996 100644 --- a/python/paddle/distributed/fleet/launch_utils.py +++ b/python/paddle/distributed/fleet/launch_utils.py @@ -22,6 +22,7 @@ import subprocess import tempfile import shutil from contextlib import closing +import multiprocessing import socket import warnings import six @@ -30,6 +31,7 @@ import struct import paddle import paddle.fluid as fluid from distutils.util import strtobool +import paddle.utils.cpp_extension.extension_utils as utils logger = logging.getLogger("root") logger.propagate = False @@ -669,29 +671,31 @@ def get_xpus(xpus): return res_xpus -def get_device_mode(): +def get_device_mode(backend): if fluid.core.is_compiled_with_npu() and \ fluid.core.get_npu_device_count() > 0: print("launch train in ascend npu mode!") return DeviceMode.ASCEND_NPU - if fluid.core.is_compiled_with_cuda() and \ + if backend == 'nccl' and \ fluid.core.get_cuda_device_count() > 0: print("launch train in GPU mode!") return DeviceMode.GPU - if fluid.core.is_compiled_with_xpu() and fluid.core.get_xpu_device_count( - ) > 0: + if backend == 'bkcl' and fluid.core.get_xpu_device_count() > 0: print("launch train in XPU mode") return DeviceMode.XPU - print("launch train in CPU mode") - return DeviceMode.CPU + if backend == 'gloo': + print("launch train in CPU mode") + return DeviceMode.CPU + + raise RuntimeError("Don't supported devices") def get_device_proc_info(args): # device_mode - device_mode = get_device_mode() + device_mode = get_device_mode(args.backend) # devices devices_per_proc = [] @@ -722,6 +726,9 @@ def get_device_proc_info(args): else: devices_per_proc = xpus elif device_mode == DeviceMode.CPU: + if hasattr(args, "paddle_cpuonly") and args.nproc_per_node is None: + #NOTE (xiongkun03) set it to cpu core number + args.nproc_per_node = multiprocessing.cpu_count() if args.nproc_per_node is None: devices_per_proc = [0] else: @@ -1237,3 +1244,45 @@ class ParameterServerLauncher(object): tp.cmd = cmd self.procs["heter_worker"].append(tp) + + +def check_backend(backend): + if backend not in ['nccl', 'gloo', 'bkcl', 'auto']: + raise ValueError( + "paddle.distributed initialize error, " + "backend argument can only be one of 'nccl', 'gloo', 'bkcl', 'auto', but got %s" + % backend) + + if backend == 'nccl' and not fluid.core.is_compiled_with_cuda(): + raise ValueError( + "paddle.distributed initialize error, " + "your paddle is not compiled with cuda but you assign 'nccl' as backend." 
+ ) + + if backend == 'bkcl' and not fluid.core.is_compiled_with_xpu(): + raise ValueError( + "paddle.distributed initialize error, " + "your paddle is not compiled with xpu but you assign 'bkcl' as backend." + ) + + +def block_windows_and_macos(backend): + if backend != 'gloo': return + if utils.OS_NAME.startswith('darwin'): # MACOS , block + raise ValueError( + "You are going to using gloo on macos, but currently is not supported" + ) + if utils.IS_WINDOWS: # MACOS , block + raise ValueError( + "You are going to using gloo on windows, but currently is not supported" + ) + + +def get_backend_by_compile_flag(): + if fluid.core.is_compiled_with_cuda(): + return 'nccl' + + if fluid.core.is_compiled_with_xpu(): + return 'bkcl' + + return 'gloo' diff --git a/python/paddle/distributed/parallel.py b/python/paddle/distributed/parallel.py index 7789b17429c..34c74ad3067 100644 --- a/python/paddle/distributed/parallel.py +++ b/python/paddle/distributed/parallel.py @@ -26,6 +26,7 @@ from paddle import compat as cpt from paddle.fluid import core from paddle.fluid.framework import _set_expected_place from paddle.fluid.dygraph import parallel_helper +from paddle.distributed.fleet.launch_utils import check_backend from paddle.fluid.dygraph.parallel import ParallelEnv from paddle.distributed.fleet.base.private_helper_function import wait_server_ready # noqa: F401 @@ -55,25 +56,8 @@ def _start_kv_server(port, http_server_d, size): http_server.stop() -def _check_backend(backend): - if backend not in ['nccl', 'gloo', 'bkcl', 'auto']: - raise ValueError( - "paddle.distributed initialize error, " - "backend argument can only be one of 'nccl', 'gloo', 'bkcl', 'auto', but got %s" - % backend) - - if backend == 'nccl' and not core.is_compiled_with_cuda(): - raise ValueError( - "paddle.distributed initialize error, " - "your paddle is not compiled with cuda but you assign 'nccl' as backend." - ) - - if backend == 'bkcl' and not core.is_compiled_with_xpu(): - raise ValueError( - "paddle.distributed initialize error, " - "your paddle is not compiled with xpu but you assign 'bkcl' as backend." - ) - +def _is_cpuonly(backend): + check_backend(backend) if backend in ['auto', 'nccl', 'bkcl'] and (core.is_compiled_with_cuda() or core.is_compiled_with_xpu()): # passes 'auto' and can use cuda or xpu, use the default logics. so return False @@ -82,7 +66,7 @@ def _check_backend(backend): return True -def init_parallel_env(backend='auto'): +def init_parallel_env(): """ Initialize parallel training environment in dynamic graph mode. @@ -154,7 +138,8 @@ def init_parallel_env(backend='auto'): return # NOTE(xiongkun): support cpu gloo only, add this environment variable to # enable cpu only gloo prarllel training) - is_cpu_only = _check_backend(backend) + backend = os.environ.get('PADDLE_DISTRI_BACKEND', 'auto') + is_cpu_only = _is_cpuonly(backend) # 1. 
gpu xpu check, must be gpu or xpu, if not (is_cpu_only or core.is_compiled_with_cuda() or core.is_compiled_with_xpu()): diff --git a/python/paddle/distributed/spawn.py b/python/paddle/distributed/spawn.py index a60e4642e49..cea831d9d90 100644 --- a/python/paddle/distributed/spawn.py +++ b/python/paddle/distributed/spawn.py @@ -24,8 +24,10 @@ import warnings from paddle.distributed.utils import _print_arguments from paddle.distributed.utils import _prepare_trainer_env from paddle.distributed.utils import get_host_name_ip -from paddle.distributed.cloud_utils import get_cluster_and_pod +from paddle.distributed.cloud_utils import get_cluster_and_pod, _get_trainers_num +from paddle.distributed.fleet.launch import get_cluster_from_args from paddle.distributed.fleet.cloud_utils import use_paddlecloud +from paddle.distributed.fleet.launch_utils import DeviceMode, check_backend, block_windows_and_macos from paddle.device import get_device # deprecated module import @@ -71,7 +73,9 @@ def _py_supported_check(): def _options_valid_check(options): # `print_config` keeped as a debug options, not show to users - supported_options = ['start_method', 'ips', 'gpus', 'xpus', 'print_config'] + supported_options = [ + 'start_method', 'ips', 'gpus', 'xpus', 'print_config', 'backend' + ] deprecated_options = [ 'selected_devices', 'started_port', 'cluster_node_ips', 'node_ip', 'use_paddlecloud' @@ -95,6 +99,22 @@ def _get_default_nprocs(): return core.get_cuda_device_count() elif 'xpu' in device: return core.get_xpu_device_count() + elif 'cpu' in device: + return multiprocessing.cpu_count() + else: + raise RuntimeError( + "`paddle.distributed.spawn` does not support parallel training on device `{}` now.". + format(device)) + + +def _get_default_backend(): + device = get_device() + if 'gpu' in device: + return 'nccl' + elif 'xpu' in device: + return 'bkcl' + elif 'cpu' in device: + return 'gloo' else: raise RuntimeError( "`paddle.distributed.spawn` does not support parallel training on device `{}` now.". @@ -112,6 +132,16 @@ def _get_node_ip(ips): def _get_subprocess_env_list(nprocs, options): + # NOTE (xiongkun03) Why put backend deduction here ? + # Becase _get_subprocess_env_list is used by many testcases. + # So for campability, we put backend deduction here + + # logic for handle backend option + if 'backend' not in options or options['backend'] == 'auto': + options['backend'] = _get_default_backend() + check_backend(options['backend']) + block_windows_and_macos(options['backend']) + # contruct processes env list processes_env_list = [] @@ -133,7 +163,7 @@ def _get_subprocess_env_list(nprocs, options): # if we set FLAGS_selected_gpus or FLAGS_selected_xpus to be `0,1,2,3`, it may cause error # when using `ParallelEnv` # NOTE(chenweihang): use absolute gpu or xpu card id - if core.is_compiled_with_cuda(): + if options['backend'] == 'nccl': args.selected_devices = options.get('gpus', None) if args.selected_devices is None: args.selected_devices = options.get('selected_devices', None) @@ -168,7 +198,7 @@ def _get_subprocess_env_list(nprocs, options): "CUDA_VISIBLE_DEVICES (%s)." % (card_id, ",".join(env_devices_list))) - elif core.is_compiled_with_xpu(): + elif options['backend'] == 'bkcl': args.selected_devices = options.get('xpus', None) if args.selected_devices is None: args.selected_devices = options.get('selected_devices', None) @@ -202,6 +232,23 @@ def _get_subprocess_env_list(nprocs, options): raise ValueError("The selected xpu card %s cannot found in " "XPU_VISIBLE_DEVICES (%s)." 
% (card_id, ",".join(env_devices_list))) + elif options['backend'] == 'gloo': + # TODO check gpu / xpu flag must not exist + warnings.warn( + "Your model will be trained under CPUONLY mode by using GLOO," + "because CPUPlace is specified manually or your installed PaddlePaddle only support CPU Device." + ) + args.paddle_cpuonly = True + args.selected_devices = None + args.ips = args.cluster_node_ips + assert options.get( + 'use_paddlecloud', + None) is None, "CPUONLY spawn doesn't support use paddle cloud" + assert len( + args.cluster_node_ips.split(',') + ) <= 1, "CPUONLY spawn only support single trainer, that is len(ips)=1, but got %s." + assert _get_trainers_num( + ) == 1, "CPUONLY spawn doesn't support multi-trainer" # set other inner args args.node_ip = options.get('node_ip', None) @@ -215,11 +262,17 @@ def _get_subprocess_env_list(nprocs, options): args.use_paddlecloud = use_paddlecloud() # get cluster and pod config - cluster, pod = get_cluster_and_pod(args) + if options['backend'] == 'gloo': + devices_per_proc = [x for x in range(0, nprocs)] + cluster, pod = get_cluster_from_args(args, DeviceMode.CPU, + devices_per_proc) + else: + cluster, pod = get_cluster_and_pod(args) # prepare subprocess env list for trainer in pod.trainers: - processes_env_list.append(_prepare_trainer_env(cluster, trainer)) + processes_env_list.append( + _prepare_trainer_env(cluster, trainer, options['backend'])) # [Debug] print config args.print_config = options.get('print_config', False) @@ -236,27 +289,35 @@ def _remove_risky_env(): os.environ.pop("https_proxy", None) -def _set_trainer_env(env_dict): +def _set_trainer_env(env_dict, backend): # NOTE(chenweihang): [ Why need set FLAGS_selected_gpus or FLAGS_selected_xpus here? ] # When the child process starts, it will inherit the configuration of the # main process and set the FLAGS once, but the environment variable has # not been set at this time, which leads to the FLAGS_selected_gpus or FLAGS_selected_xpus # is keep same with mainprocess(usually empty), so manually update the flags here - if core.is_compiled_with_cuda(): + + # NOTE(xiongkun): why put backend here? because if gloo, we shouldn't set FLAGS_selectedXXX + # + + if backend == 'nccl': set_flags({'FLAGS_selected_gpus': env_dict['FLAGS_selected_gpus']}) - elif core.is_compiled_with_xpu(): + elif backend == 'bkcl': set_flags({'FLAGS_selected_xpus': env_dict['FLAGS_selected_xpus']}) else: - raise ValueError("PaddlePaddle should be compiled with XPU or CUDA.") + #NOTE(xiongkun) why not raise Error ? + # So far, we added support for CPU parallel, and will be applied when paddle is not + # compiled with cuda or xp. just do nothing. 
+ pass + for var_name in env_dict: os.environ[var_name] = env_dict[var_name] -def _func_wrapper(func, args, error_queue, return_queue, env_dict): +def _func_wrapper(func, args, error_queue, return_queue, env_dict, backend): try: # config subprocess environment variables _remove_risky_env() - _set_trainer_env(env_dict) + _set_trainer_env(env_dict, backend) # execute function result = func(*args) # record function return value @@ -487,7 +548,8 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options): return_queue = mp.SimpleQueue() process = mp.Process( target=_func_wrapper, - args=(func, args, error_queue, return_queue, procs_env_list[i])) + args=(func, args, error_queue, return_queue, procs_env_list[i], + options['backend'])) process.daemon = daemon process.start() error_queues.append(error_queue) diff --git a/python/paddle/distributed/utils.py b/python/paddle/distributed/utils.py index 31d5748ce39..1c27a0018fc 100644 --- a/python/paddle/distributed/utils.py +++ b/python/paddle/distributed/utils.py @@ -25,6 +25,7 @@ import subprocess from contextlib import closing import socket from paddle.fluid import core +from paddle.distributed.fleet.launch_utils import get_backend_by_compile_flag from distutils.util import strtobool from paddle.fluid.layer_helper import LayerHelper @@ -613,8 +614,10 @@ def find_free_ports(num): return None -def _prepare_trainer_env(cluster, trainer): - if core.is_compiled_with_xpu(): +def _prepare_trainer_env(cluster, trainer, backend=None): + if backend is None: + backend = get_backend_by_compile_flag() # for compatibility + if backend == 'bkcl': proc_env = { "FLAGS_selected_xpus": "%s" % ",".join([str(g) for g in trainer.gpus]), @@ -623,7 +626,7 @@ def _prepare_trainer_env(cluster, trainer): "PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(), "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()) } - elif core.is_compiled_with_cuda(): + elif backend == 'nccl': proc_env = { "FLAGS_selected_gpus": "%s" % ",".join([str(g) for g in trainer.gpus]), @@ -632,6 +635,19 @@ def _prepare_trainer_env(cluster, trainer): "PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(), "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()) } + elif backend == 'gloo': + # NOTE (xiongkun) default fall back into cpu only + proc_env = { + "PADDLE_TRAINER_ID": "%d" % trainer.rank, + "PADDLE_CURRENT_ENDPOINT": "%s" % trainer.endpoint, + "PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(), + "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()), + "PADDLE_DISTRI_BACKEND": + backend, # only add here, other will be auto + } + else: + raise ValueError("backend must be one of 'gloo, nccl, bkcl'") + return proc_env diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index ac7471f8edf..1c9ce2bef5e 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -200,8 +200,14 @@ endif() list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_hybrid_parallel) +LIST(REMOVE_ITEM TEST_OPS test_parallel_dygraph_transformer_gloo) # NOTE: @xiongkun03, cpu is too slow, fix it in next PR + if (NOT WITH_GLOO) LIST(REMOVE_ITEM TEST_OPS test_parallel_dygraph_dataparallel_cpuonly) + + LIST(REMOVE_ITEM TEST_OPS test_parallel_dygraph_unused_variables_gloo) + LIST(REMOVE_ITEM TEST_OPS test_parallel_dygraph_sparse_embedding_over_height_gloo) + LIST(REMOVE_ITEM TEST_OPS test_parallel_dygraph_sparse_embedding_gloo) endif() if ((NOT WITH_GPU) 
AND (NOT WITH_ROCM)) @@ -491,6 +497,10 @@ if (APPLE OR WIN32) list(REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_dataset) endif() +if (NOT WITH_GLOO) + LIST(REMOVE_ITEM TEST_OPS test_cpuonly_spawn) +endif() + if(NOT WITH_GPU OR WIN32 OR APPLE) list(REMOVE_ITEM TEST_OPS test_build_strategy_fusion_group_pass) endif() @@ -654,6 +664,9 @@ if(WITH_DISTRIBUTE) endforeach(TEST_OP) # solve it later. bash_test_modules(test_fleet_launch_ps START_BASH test_fleet_launch_ps.sh SERIAL LABELS "RUN_TYPE=EXCLUSIVE" ENVS "PADDLE_DIST_UT_PORT=${dist_ut_port}" PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR} ) + if (WITH_GLOO) + bash_test_modules(test_cpuonly_launch START_BASH test_cpuonly_launch.sh SERIAL LABELS "RUN_TYPE=EXCLUSIVE" ENVS "PADDLE_DIST_UT_PORT=${dist_ut_port}" PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR} ) + endif() bash_test_modules(test_new_group START_BASH test_new_group.sh SERIAL LABELS "RUN_TYPE=EXCLUSIVE" ENVS "PADDLE_DIST_UT_PORT=${dist_ut_port}+20" PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR} ) endif(NOT APPLE) endif() @@ -1070,3 +1083,8 @@ set_tests_properties(test_inplace_addto_strategy PROPERTIES TIMEOUT 120) set_tests_properties(test_eigvals_op PROPERTIES TIMEOUT 400) set_tests_properties(test_tensordot PROPERTIES TIMEOUT 1000) set_tests_properties(test_tensordot PROPERTIES LABELS "RUN_TYPE=NIGHTLY") +if (WITH_GLOO) + set_tests_properties(test_parallel_dygraph_unused_variables_gloo PROPERTIES TIMEOUT 120) + set_tests_properties(test_parallel_dygraph_sparse_embedding_gloo PROPERTIES TIMEOUT 120) + set_tests_properties(test_parallel_dygraph_sparse_embedding_over_height_gloo PROPERTIES TIMEOUT 120) +endif() diff --git a/python/paddle/fluid/tests/unittests/parallel_dygraph_gradient_check.py b/python/paddle/fluid/tests/unittests/parallel_dygraph_gradient_check.py index 048c9b399d8..781d606f33b 100644 --- a/python/paddle/fluid/tests/unittests/parallel_dygraph_gradient_check.py +++ b/python/paddle/fluid/tests/unittests/parallel_dygraph_gradient_check.py @@ -66,8 +66,7 @@ class SimpleNet(fluid.Layer): class TestDistTraning(unittest.TestCase): def test_multiple_gpus(self): - backend = os.environ.get('PADDLE_DISTRI_BACKEND', 'auto') - dist.init_parallel_env(backend) + dist.init_parallel_env() self.trainer_id = dist.get_rank() model_a = SimpleNet(self.trainer_id) diff --git a/python/paddle/fluid/tests/unittests/parallel_dygraph_se_resnext.py b/python/paddle/fluid/tests/unittests/parallel_dygraph_se_resnext.py index 4ce67676c3e..0387de32c91 100644 --- a/python/paddle/fluid/tests/unittests/parallel_dygraph_se_resnext.py +++ b/python/paddle/fluid/tests/unittests/parallel_dygraph_se_resnext.py @@ -324,6 +324,7 @@ class TestSeResNeXt(TestParallelDyGraphRunnerBase): bs = len(data) dy_x_data = np.array([x[0].reshape(3, 224, 224) for x in data]).astype('float32') + dy_x_data = dy_x_data / 255.0 y_data = np.array([x[1] for x in data]).astype('int64').reshape(bs, 1) img = to_variable(dy_x_data) label = to_variable(y_data) diff --git a/python/paddle/fluid/tests/unittests/test_cpuonly_launch.sh b/python/paddle/fluid/tests/unittests/test_cpuonly_launch.sh new file mode 100644 index 00000000000..1c35166cf44 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_cpuonly_launch.sh @@ -0,0 +1,42 @@ +#!/bin/bash + +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +function test_launch_cpuonly(){ + python -m paddle.distributed.launch --nproc_per_node=4 --backend=gloo \ + parallel_dygraph_gradient_check.py 2>ut.elog + if grep -q "ABORT" ut.elog; then + echo "test cpu only failed" + exit -1 + else + if grep -q "CPUONLY" ut.elog; then + echo "test_launch_cpuonly successfully" + else + echo "test_launch_cpuonly failed" + exit -1 + fi + fi +} +function test_launch_error_case1(){ + python -m paddle.distributed.launch --nproc_per_node=4 --backend=random_str \ + parallel_dygraph_gradient_check.py 2>ut.elog + if grep -q "ValueError" ut.elog; then + echo "test_launch_error_case1 successfully" + else + exit -1 + fi +} + +test_launch_cpuonly +test_launch_error_case1 diff --git a/python/paddle/fluid/tests/unittests/test_cpuonly_spawn.py b/python/paddle/fluid/tests/unittests/test_cpuonly_spawn.py new file mode 100644 index 00000000000..1def2ffd82a --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_cpuonly_spawn.py @@ -0,0 +1,72 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function +import unittest + +import paddle +import paddle.nn as nn +import paddle.optimizer as opt +import paddle.distributed as dist + + +class LinearNet(nn.Layer): + def __init__(self): + super(LinearNet, self).__init__() + self._linear1 = nn.Linear(10, 10) + self._linear2 = nn.Linear(10, 1) + + def forward(self, x): + return self._linear2(self._linear1(x)) + + +def train(print_result=False): + # 1. initialize parallel environment + dist.init_parallel_env() + + # 2. create data parallel layer & optimizer + layer = LinearNet() + dp_layer = paddle.DataParallel(layer) + + loss_fn = nn.MSELoss() + adam = opt.Adam(learning_rate=0.001, parameters=dp_layer.parameters()) + + # 3. 
run layer + inputs = paddle.randn([10, 10], 'float32') + outputs = dp_layer(inputs) + labels = paddle.randn([10, 1], 'float32') + loss = loss_fn(outputs, labels) + + if print_result is True: + print("loss:", loss.numpy()) + + loss.backward() + print("Grad is", layer._linear1.weight.grad) + adam.step() + adam.clear_grad() + + +class TestSpawn(unittest.TestCase): + def test_spawn(self): + dist.spawn(train, backend='gloo', nprocs=4) + + def test_wrong_backend(self): + try: + dist.spawn(train, backend='something', nprocs=4) + except ValueError as e: + self.assertEqual(type(e), ValueError) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py index eceb484a018..63985415c51 100755 --- a/python/paddle/fluid/tests/unittests/test_dist_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_base.py @@ -209,7 +209,11 @@ class TestDistRunnerBase(object): def get_data(): origin_batch = next(reader_generator) - if args.update_method != "local" and args.use_reader_alloc: + if paddle.distributed.get_world_size( + ) == 1 and args.update_method == 'gloo': # Gloo single mode + return origin_batch + + elif args.update_method != "local" and args.use_reader_alloc: new_batch = [] for offset, item in enumerate(origin_batch): if offset % 2 == args.trainer_id: @@ -506,7 +510,10 @@ class TestParallelDyGraphRunnerBase(object): "train_one_loop should be implemented by the child classes.") def _get_data(self, batch, args): - if args.update_method != "local": + if paddle.distributed.get_world_size( + ) == 1 and args.update_method == 'gloo': # Gloo single mode + return batch + elif args.update_method != "local": new_batch = [] for offset, item in enumerate(batch): if offset % 2 == args.trainer_id: @@ -518,14 +525,16 @@ class TestParallelDyGraphRunnerBase(object): def run_trainer(self, args): seed = 90 - if fluid.core.is_compiled_with_cuda(): + if args.update_method == 'gloo': + place = fluid.CPUPlace() + elif fluid.core.is_compiled_with_cuda(): device_id = int(os.getenv("FLAGS_selected_gpus", "0")) place = fluid.CUDAPlace(device_id) elif fluid.core.is_compiled_with_xpu(): device_id = int(os.getenv("FLAGS_selected_xpus", "0")) place = fluid.XPUPlace(device_id) else: - assert ("Only support CUDAPlace or XPUPlace for now.") + assert ("Only support CUDAPlace or XPUPlace or CPU(Gloo) for now.") with fluid.dygraph.guard(place): fluid.default_startup_program().random_seed = seed @@ -554,6 +563,16 @@ class TestParallelDyGraphRunnerBase(object): model = dygraph.parallel.DataParallel( model, strategy, find_unused_parameters=True) print_to_err(type(self).__name__, "model built in dygraph") + + elif args.update_method == "gloo": + paddle.distributed.init_parallel_env() + if not args.find_unused_parameters: + model = dygraph.parallel.DataParallel( + model, find_unused_parameters=False) + else: + model = dygraph.parallel.DataParallel( + model, find_unused_parameters=True) + out_losses = [] print_to_err(type(self).__name__, "begin to run dygraph training") for step_id, data in enumerate(train_reader()): @@ -588,12 +607,12 @@ class TestParallelDyGraphRunnerBase(object): args.trainer_id = paddle.distributed.get_rank() # 3. init parallel env - if args.update_method == "nccl2": + if args.update_method in ["nccl2", "gloo"]: paddle.distributed.init_parallel_env() # 4. 
train model model, train_reader, opt = self.get_model() - if args.update_method == "nccl2": + if args.update_method in ["nccl2", "gloo"]: if args.find_unused_parameters: model = paddle.DataParallel(model, find_unused_parameters=True) else: @@ -668,7 +687,9 @@ def runtime_main(test_class): '--update_method', type=str, default="local", - choices=["pserver", "nccl2", "bkcl", "local", "nccl2_reduce_layer"]) + choices=[ + "pserver", "nccl2", "bkcl", "local", "nccl2_reduce_layer", "gloo" + ]) parser.add_argument('--trainer_id', type=int, required=False, default=0) parser.add_argument('--trainers', type=int, required=False, default=1) parser.add_argument('--nccl_comm_num', type=int, required=False, default=1) @@ -685,6 +706,7 @@ def runtime_main(test_class): '--current_endpoint', type=str, required=False, default="") parser.add_argument('--sync_mode', action='store_true') parser.add_argument('--use_cuda', action='store_true') + parser.add_argument('--use_cpu', action='store_true') parser.add_argument('--use_xpu', action='store_true') parser.add_argument('--use_dgc', action='store_true') parser.add_argument('--accumulate_gradient', action='store_true') @@ -713,6 +735,9 @@ def runtime_main(test_class): args = parser.parse_args() + if args.update_method == 'gloo': + paddle.set_device("cpu") + model = test_class() if args.role == "pserver" and args.update_method == "pserver": model.run_pserver(args) @@ -770,6 +795,7 @@ class TestDistBase(unittest.TestCase): self._use_reader_alloc = True self._nccl2_mode = False self._bkcl_mode = False + self._gloo_mode = False # now, support gloo backend self._pipeline_mode = False self._mp_mode = False # FIXME(typhoonzero): I added this stupid argument to enable @@ -875,7 +901,7 @@ class TestDistBase(unittest.TestCase): batch_size=DEFAULT_BATCH_SIZE, batch_merge_repeat=1, log_name="", - devices="0"): + devices="1"): cmd = self._python_interp @@ -947,6 +973,21 @@ class TestDistBase(unittest.TestCase): return pickle.loads(local_out) + def _run_local_gloo(self, + model, + envs, + check_error_log=False, + batch_size=DEFAULT_BATCH_SIZE, + batch_merge_repeat=1, + log_name="", + devices="0"): + saved_endpoints = self._ps_endpoints + self._ps_endpoints = self._ps_endpoints.split(',')[0] + result = self._run_cluster_gloo(model, envs, 'gloo', check_error_log, + log_name) + self._ps_endpoints = saved_endpoints + return result + def _run_cluster(self, model, envs, check_error_log, log_name): # Run dist train to compare with local results ps0, ps1, ps0_pipe, ps1_pipe = self.start_pserver( @@ -1037,6 +1078,62 @@ class TestDistBase(unittest.TestCase): return pickle.loads(tr0_out), pickle.loads(tr1_out) + def _get_gloo_trainer_cmd(self, model, ep, update_method, trainer_id, + trainer_num): + env = {} + tr_cmd = "%s -u" + + if os.getenv('WITH_COVERAGE', 'OFF') == 'ON': + tr_cmd += " -m coverage run --branch -p" + + tr_cmd += " %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --update_method %s --lr %f" + + tr_cmd = tr_cmd % \ + (self._python_interp, model, self._ps_endpoints, + trainer_id, ep, update_method, self._lr) + + if self._use_reduce: + tr_cmd += " --use_reduce" + if self._use_reader_alloc: + tr_cmd += " --use_reader_alloc" + #assert self._use_reduce == False, "gloo not support _use_reduce" + #assert self._use_reader_alloc == False, "gloo not support _use_reduce" + if self._save_model: + tr_cmd += " --save_model" + self.__use_cuda = False + self.__use_xpu = False + assert self.__use_cuda == False, "gloo not support use cuda" + assert self.__use_xpu == 
False, "gloo not support use xpu" + tr_cmd += " --use_cpu" + env.update({ + "PADDLE_TRAINERS_NUM": "{}".format(trainer_num), + "PADDLE_TRAINER_ID": "{}".format(trainer_id), + "PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints, + "PADDLE_CURRENT_ENDPOINT": ep, + "PADDLE_CURRENT_ENDPOINT": ep, + "PADDLE_DISTRI_BACKEND": "gloo", + "GLOG_v": "2", + }) + + assert self._use_dgc == False, "gloo not support use dgc" + if self._accumulate_gradient: + tr_cmd += " --accumulate_gradient" + + if self._find_unused_parameters: + tr_cmd += " --find_unused_parameters" + + assert self._pipeline_mode == False, "gloo not support use pipeline" + + if self._enable_backward_deps: # build strategy, save it + tr_cmd += " --enable_backward_deps" + + if self._fuse_all_reduce is not None: + tr_cmd += " --fuse_all_reduce {}".format(self._fuse_all_reduce) + + assert self._use_fleet_api == False, "gloo not support use fleet api" + assert self._use_fleet_api_20 == False, "gloo not support use fleet api" + return tr_cmd, env + def _get_nccl2_trainer_cmd(self, model, ep, update_method, trainer_id, trainer_num): env = {} @@ -1123,6 +1220,57 @@ class TestDistBase(unittest.TestCase): return tr_cmd, env + def _run_cluster_gloo(self, model, envs, update_method, check_error_log, + log_name): + assert update_method == "gloo", "_run_cluster_gloo must have update_method: gloo, but get %s" % update_method + assert not self._use_hallreduce, "_run_cluster_gloo must have _use_hallreduce = false" + + worker_endpoints = self._ps_endpoints.split(",") + + trainer_num = len(worker_endpoints) + + procs = [] + pipes = [] + for i in range(0, trainer_num): + tr_cmd, tr_env = self._get_gloo_trainer_cmd( + model, worker_endpoints[i], update_method, i, trainer_num) + tr_env.update(envs) + tr_env["GLOG_vmodule"] = 'gloo_context=4' + tr_env["GLOG_v"] = '3' + print("use_hallreduce:{} tr_cmd:{}, env: {}".format( + self._use_hallreduce, tr_cmd, tr_env)) + + tr_pipe = open(log_name + "_tr{}_err.log".format(i), "wb") + + print_to_err( + type(self).__name__, + "going to start process {} with nccl2".format(i)) + tr_proc = subprocess.Popen( + tr_cmd.strip().split(" "), + stdout=subprocess.PIPE, + stderr=tr_pipe, + env=tr_env) + + procs.append(tr_proc) + pipes.append(tr_pipe) + + outs = [] + for i in range(0, trainer_num): + tr_out, tr_err = procs[i].communicate() + outs.append(tr_out) + pipes[i].close() + sys.stderr.write('trainer {} stderr: {}\n'.format(i, tr_err)) + + if trainer_num == 1: + if check_error_log: print("outs[0]:", outs[0]) + return pickle.loads(outs[0]) + + else: + if check_error_log: + print("outs[0]:", outs[0]) + print("outs[1]:", outs[1]) + return pickle.loads(outs[0]), pickle.loads(outs[1]) + def _run_cluster_nccl2(self, model, envs, update_method, check_error_log, log_name): if self._use_hallreduce: @@ -1262,7 +1410,12 @@ class TestDistBase(unittest.TestCase): required_envs = self._get_required_envs(check_error_log, need_envs) - local_losses \ + if self._gloo_mode: + local_losses \ + = self._run_local_gloo(model_file, required_envs, + check_error_log, log_name=log_name) + else: + local_losses \ = self._run_local(model_file, required_envs, check_error_log, log_name=log_name) @@ -1288,6 +1441,14 @@ class TestDistBase(unittest.TestCase): update_method='bkcl', check_error_log=check_error_log, log_name=log_name) + elif self._gloo_mode: + # gloo mode, cpu only parallel train @xiongkun03 + tr0_losses, tr1_losses = self._run_cluster_gloo( + model_file, + required_envs, + update_method='gloo', + check_error_log=check_error_log, + log_name=log_name) 
elif self._pipeline_mode: tr0_losses, tr1_losses = self._run_pipeline( diff --git a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel.py b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel.py index c97cd56e8a7..edf9aed04f5 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_dataparallel.py @@ -49,6 +49,51 @@ def get_gpus(selected_gpus): return selected_gpus +def start_local_trainers_cpu(trainer_endpoints, + training_script, + training_script_args, + log_dir=None): + current_env = copy.copy(os.environ.copy()) + current_env.pop("http_proxy", None) + current_env.pop("https_proxy", None) + + procs = [] + n_rank = len(trainer_endpoints) + print(trainer_endpoints) + for rank_id, endpoint in enumerate(trainer_endpoints): + proc_env = { + "PADDLE_DISTRI_BACKEND": "gloo", + "PADDLE_TRAINER_ID": "%d" % rank_id, + "PADDLE_CURRENT_ENDPOINT": "%s" % endpoint, + "PADDLE_TRAINERS_NUM": "%d" % n_rank, + "PADDLE_TRAINER_ENDPOINTS": ",".join(trainer_endpoints) + } + + current_env.update(proc_env) + + print("trainer proc env:{}".format(current_env)) + + assert os.getenv('WITH_COVERAGE', + 'OFF') == 'OFF', "Gloo don't support WITH_COVERAGE." + cmd = "python -u " + training_script + + print("start trainer proc:{} env:{}".format(cmd, proc_env)) + + fn = None + + proc = subprocess.Popen(cmd.split(" "), env=current_env) + + tp = TrainerProc() + tp.proc = proc + tp.rank = rank_id + tp.log_fn = fn + tp.cmd = cmd + + procs.append(tp) + + return procs + + def start_local_trainers(cluster, pod, training_script, @@ -116,6 +161,26 @@ class TestMultipleGpus(unittest.TestCase): training_script=target_file_name, training_script_args=[]) + while True: + alive = watch_local_trainers(procs, cluster.trainers_endpoints()) + + if not alive: + print("Local procs complete, POD info:{}".format(pod)) + break + time.sleep(3) + + +class TestMultipleWithGloo(unittest.TestCase): + def run_mnist_2cpu(self, target_file_name): + + cluster, pod = get_cluster_from_args( + [0, 1]) #tmp use. for getting trainer_nranks() + + procs = start_local_trainers_cpu( + cluster.trainers_endpoints(), + training_script=target_file_name, + training_script_args=[]) + while True: alive = watch_local_trainers(procs, cluster.trainers_nranks()) diff --git a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_sparse_embedding_gloo.py b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_sparse_embedding_gloo.py new file mode 100644 index 00000000000..56fcf806c47 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_sparse_embedding_gloo.py @@ -0,0 +1,59 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import print_function + +import os +import sys +import unittest + +import paddle.fluid as fluid +from test_dist_base import TestDistBase +from spawn_runner_base import TestDistSpawnRunner +from parallel_dygraph_sparse_embedding import TestSparseEmbedding +from parallel_dygraph_sparse_embedding_fp64 import TestSparseEmbeddingFP64 + +flag_name = os.path.splitext(__file__)[0] + + +class TestParallelDygraphSparseEmdedding_GLOO(TestDistBase): + def _setup_config(self): + self._sync_mode = False + self._gloo_mode = True + self._dygraph = True + + def test_sparse_embedding(self): + self.check_with_place( + "parallel_dygraph_sparse_embedding.py", + delta=1e-5, + check_error_log=True, + log_name=flag_name) + + +class TestParallelDygraphSparseEmdeddingFP64_GLOO(TestDistBase): + def _setup_config(self): + self._sync_mode = False + self._gloo_mode = True + self._dygraph = True + + def test_sparse_embedding_fp64(self): + self.check_with_place( + "parallel_dygraph_sparse_embedding_fp64.py", + delta=1e-5, + check_error_log=True, + log_name=flag_name) + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_sparse_embedding_over_height_gloo.py b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_sparse_embedding_over_height_gloo.py new file mode 100644 index 00000000000..ba43e26e23a --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_sparse_embedding_over_height_gloo.py @@ -0,0 +1,44 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import os +import sys +import unittest + +import paddle.fluid as fluid +from test_dist_base import TestDistBase +from spawn_runner_base import TestDistSpawnRunner +from parallel_dygraph_sparse_embedding_over_height import TestSparseEmbeddingOverHeight + +flag_name = os.path.splitext(__file__)[0] + + +class TestParallelDygraphSparseEmdeddingOverHeight_GLOO(TestDistBase): + def _setup_config(self): + self._sync_mode = False + self._gloo_mode = True + self._dygraph = True + + def test_sparse_embedding(self): + self.check_with_place( + "parallel_dygraph_sparse_embedding_over_height.py", + delta=1e-7, + check_error_log=True, + log_name=flag_name) + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_transformer_gloo.py b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_transformer_gloo.py new file mode 100644 index 00000000000..d3619cc1b9a --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_transformer_gloo.py @@ -0,0 +1,61 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import os +import sys +import unittest + +import paddle.fluid as fluid +from test_dist_base import TestDistBase +from spawn_runner_base import TestDistSpawnRunner +from parallel_dygraph_transformer import TestTransformer + +flag_name = os.path.splitext(__file__)[0] + + +class TestParallelDygraphTransformer_GLOO(TestDistBase): + def _setup_config(self): + self._sync_mode = False + self._gloo_mode = True + self._dygraph = True + + def test_transformer(self): + self.check_with_place( + "parallel_dygraph_transformer.py", + delta=1e-5, + check_error_log=True, + log_name=flag_name) + + +class TestParallelDygraphTransformerAccGrad_GLOO(TestDistBase): + def _setup_config(self): + self._sync_mode = False + self._gloo_mode = True + self._dygraph = True + self._accumulate_gradient = True + self._find_unused_parameters = False + + def test_transformer(self): + if fluid.core.is_compiled_with_cuda(): + self.check_with_place( + "parallel_dygraph_transformer.py", + delta=1e-5, + check_error_log=True, + log_name=flag_name) + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_unused_variables_gloo.py b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_unused_variables_gloo.py new file mode 100644 index 00000000000..89373fcb6ee --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_unused_variables_gloo.py @@ -0,0 +1,72 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import print_function + +import os +import sys +import unittest + +import paddle.fluid as fluid +from test_dist_base import TestDistBase +from spawn_runner_base import TestDistSpawnRunner +from parallel_dygraph_unused_variables import TestSparseEmbeddingUnusedVars + +flag_name = os.path.splitext(__file__)[0] + + +class TestParallelDygraphUnusedVar_GLOO(TestDistBase): + def _setup_config(self): + self._sync_mode = False + self._gloo_mode = True + self._dygraph = True + + def test_net(self): + self.check_with_place( + "parallel_dygraph_unused_variables.py", + delta=1e-5, + check_error_log=True, + log_name=flag_name) + + +class TestParallelDygraphNoVar_GLOO(TestDistBase): + def _setup_config(self): + self._sync_mode = False + self._gloo_mode = True + self._dygraph = True + + def test_net(self): + self.check_with_place( + "parallel_dygraph_none_var.py", + delta=1e-5, + check_error_log=True, + log_name=flag_name) + + +class TestParallelDygraphSharedUnusedVariables_GLOO(TestDistBase): + def _setup_config(self): + self._sync_mode = False + self._gloo_mode = True + self._dygraph = True + + def test_mnist(self): + self.check_with_place( + "parallel_dygraph_shared_unused_var.py", + delta=1e-5, + check_error_log=True, + log_name=flag_name) + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_spawn_and_init_parallel_env.py b/python/paddle/fluid/tests/unittests/test_spawn_and_init_parallel_env.py index 14547eca5ac..dccc117f6bc 100644 --- a/python/paddle/fluid/tests/unittests/test_spawn_and_init_parallel_env.py +++ b/python/paddle/fluid/tests/unittests/test_spawn_and_init_parallel_env.py @@ -24,6 +24,7 @@ from paddle.distributed.spawn import _get_subprocess_env_list, _options_valid_ch from paddle.fluid import core from paddle.fluid.dygraph import parallel_helper +import multiprocessing # NOTE(chenweihang): Coverage CI is currently not able to count python3 # unittest, so the unittests here covers some cases that will only be @@ -89,8 +90,8 @@ class TestSpawnAssistMethod(unittest.TestCase): def test_get_default_nprocs(self): paddle.set_device('cpu') - with self.assertRaises(RuntimeError): - nprocs = _get_default_nprocs() + nprocs = _get_default_nprocs() + self.assertEqual(nprocs, multiprocessing.cpu_count()) paddle.set_device('gpu') nprocs = _get_default_nprocs() -- GitLab
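
A minimal usage sketch of the user-specified backend this patch introduces. It is illustrative only and not part of the patch: it mirrors the new test_cpuonly_spawn.py and test_cpuonly_launch.sh tests above, assumes a PaddlePaddle build with WITH_GLOO=ON, and uses a placeholder model.

# Illustrative sketch (assumes a WITH_GLOO=ON build); mirrors the tests added above.
import paddle
import paddle.nn as nn
import paddle.distributed as dist


def train():
    # init_parallel_env() now reads PADDLE_DISTRI_BACKEND, which launch/spawn
    # set from the user-specified backend ('gloo' here -> CPU-only training).
    dist.init_parallel_env()
    layer = paddle.DataParallel(nn.Linear(10, 1))
    loss = layer(paddle.randn([4, 10], 'float32')).mean()
    loss.backward()


if __name__ == '__main__':
    # CPU-only data-parallel training over gloo; the launch equivalent is:
    #   python -m paddle.distributed.launch --nproc_per_node=2 --backend=gloo this_script.py
    dist.spawn(train, backend='gloo', nprocs=2)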