diff --git a/paddle/fluid/pybind/CMakeLists.txt b/paddle/fluid/pybind/CMakeLists.txt
index 2491cd90a83ef58a257057de14d7386541744fec..19b953c44d8caf30dd513c720591db634538933e 100755
--- a/paddle/fluid/pybind/CMakeLists.txt
+++ b/paddle/fluid/pybind/CMakeLists.txt
@@ -41,6 +41,7 @@ if (WITH_ASCEND_CL)
 endif()
 
 if (WITH_CNCL)
+  set(PYBIND_DEPS ${PYBIND_DEPS} reducer)
   set(PYBIND_DEPS ${PYBIND_DEPS} cncl_context)
 endif()
 
diff --git a/paddle/fluid/pybind/imperative.cc b/paddle/fluid/pybind/imperative.cc
index 83acedc69bb6018d16a47351f272331e9232f5c1..d24c0355c2493b9a52c22fadb67510688e1b264d 100644
--- a/paddle/fluid/pybind/imperative.cc
+++ b/paddle/fluid/pybind/imperative.cc
@@ -2224,8 +2224,9 @@ void BindImperative(py::module *m_ptr) {
           },
           py::call_guard());
 
-#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
-    defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_GLOO)
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) ||     \
+    defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_GLOO) || \
+    defined(PADDLE_WITH_CNCL)
   py::class_>(m, "ParallelContext");
diff --git a/python/paddle/distributed/spawn.py b/python/paddle/distributed/spawn.py
index 4adb19830522bafb99939f42f62edd1bb27c189f..66545a8a249ba2e1ed15773055f01148baf3d46b 100644
--- a/python/paddle/distributed/spawn.py
+++ b/python/paddle/distributed/spawn.py
@@ -74,7 +74,7 @@ def _py_supported_check():
 def _options_valid_check(options):
     # `print_config` keeped as a debug options, not show to users
     supported_options = [
-        'start_method', 'ips', 'gpus', 'xpus', 'print_config', 'backend'
+        'start_method', 'ips', 'gpus', 'xpus', 'mlus', 'print_config', 'backend'
     ]
     deprecated_options = [
         'selected_devices', 'started_port', 'cluster_node_ips', 'node_ip',
@@ -99,6 +99,8 @@ def _get_default_nprocs():
         return core.get_cuda_device_count()
     elif 'xpu' in device:
         return core.get_xpu_device_count()
+    elif 'mlu' in device:
+        return core.get_mlu_device_count()
     elif 'cpu' in device:
         return multiprocessing.cpu_count()
     else:
@@ -113,6 +115,8 @@ def _get_default_backend():
         return 'nccl'
     elif 'xpu' in device:
         return 'bkcl'
+    elif 'mlu' in device:
+        return 'cncl'
     elif 'cpu' in device:
         return 'gloo'
     else:
@@ -232,6 +236,40 @@ def _get_subprocess_env_list(nprocs, options):
                     raise ValueError("The selected xpu card %s cannot found in "
                                      "XPU_VISIBLE_DEVICES (%s)." %
                                      (card_id, ",".join(env_devices_list)))
+    elif options['backend'] == 'cncl':
+        args.selected_devices = options.get('mlus', None)
+        if args.selected_devices is None:
+            args.selected_devices = options.get('selected_devices', None)
+        env_devices = os.getenv("MLU_VISIBLE_DEVICES", None)
+        if env_devices is None or env_devices == "":
+            env_devices_list = [
+                str(x) for x in six.moves.range(core.get_mlu_device_count())
+            ]
+        else:
+            env_devices_list = env_devices.split(',')
+        if args.selected_devices is None:
+            if len(env_devices_list) < nprocs:
+                raise RuntimeError(
+                    "the number of visible devices(%d) is less than the number "
+                    "of spawn processes(%d), please ensure that the correct "
+                    "`nprocs` argument is passed or the environment variable "
+                    "`MLU_VISIBLE_DEVICES` is correctly configured." %
+                    (len(env_devices_list), nprocs))
+            args.selected_devices = ",".join(
+                [str(env_devices_list[x]) for x in range(0, nprocs)])
+        else:
+            selected_device_list = args.selected_devices.split(',')
+            if len(selected_device_list) != nprocs:
+                raise ValueError(
+                    "The number of selected devices(%s) is not equal to "
+                    "the number of spawn processes(%d), please ensure that the "
+                    "correct `nprocs` and `mlus` arguments are passed." %
+                    (len(selected_device_list), nprocs))
+            for card_id in selected_device_list:
+                if card_id not in env_devices_list:
+                    raise ValueError("The selected mlu card %s cannot found in "
+                                     "MLU_VISIBLE_DEVICES (%s)." %
+                                     (card_id, ",".join(env_devices_list)))
     elif options['backend'] == 'gloo':
         # TODO check gpu / xpu flag must not exist
         warnings.warn(
@@ -303,6 +341,8 @@ def _set_trainer_env(env_dict, backend):
         set_flags({'FLAGS_selected_gpus': env_dict['FLAGS_selected_gpus']})
     elif backend == 'bkcl':
         set_flags({'FLAGS_selected_xpus': env_dict['FLAGS_selected_xpus']})
+    elif backend == 'cncl':
+        set_flags({'FLAGS_selected_mlus': env_dict['FLAGS_selected_mlus']})
     else:
         #NOTE(xiongkun) why not raise Error ?
         # So far, we added support for CPU parallel, and will be applied when paddle is not
@@ -396,9 +436,9 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options):
     Start multiple processes with ``spawn`` method for parallel training.
 
     .. note::
-        ``spawn`` now only supports GPU or XPU collective mode. The collective mode
-        of GPU and XPU cannot be started at the same time, so the option `gpus` and
-        `xpus` cannot be configured at the same time.
+        ``spawn`` now only supports GPU, XPU or MLU collective mode. The collective
+        modes of GPU, XPU and MLU cannot be started at the same time, so the options
+        `gpus`, `xpus` and `mlus` cannot be configured at the same time.
 
     Args:
         func (function): The target function is called by spawned process.
@@ -425,7 +465,9 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options):
             selected gpus, such as "0,1,2,3". Default: None;
             (3) xpus (string): The training process will run on the
             selected xpus, such as "0,1,2,3". Default: None;
-            (4) ips (string): Paddle cluster nodes ips, such as
+            (4) mlus (string): The training process will run on the
+            selected mlus, such as "0,1,2,3". Default: None;
+            (5) ips (string): Paddle cluster nodes ips, such as
             "192.168.0.16,192.168.0.17". Default: "127.0.0.1" .
 
     Returns:
@@ -457,7 +499,7 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options):
 
                 # 2. create data parallel layer & optimizer
                 layer = LinearNet()
-                dp_layer = paddle.DataParallel(layer, process_group=process_group)
+                dp_layer = paddle.DataParallel(layer, group=process_group)
 
                 loss_fn = nn.MSELoss()
                 adam = opt.Adam(
diff --git a/python/paddle/distributed/utils.py b/python/paddle/distributed/utils.py
index de7359bcd7337cf0392dc33d14971db328f6306c..30cd63ed80ea72bb4e7223d860f6bfd3f32c9527 100644
--- a/python/paddle/distributed/utils.py
+++ b/python/paddle/distributed/utils.py
@@ -686,6 +686,15 @@ def _prepare_trainer_env(cluster, trainer, backend=None):
             "PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(),
             "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints())
         }
+    elif backend == 'cncl':
+        proc_env = {
+            "FLAGS_selected_mlus":
+            "%s" % ",".join([str(g) for g in trainer.gpus]),
+            "PADDLE_TRAINER_ID": "%d" % trainer.rank,
+            "PADDLE_CURRENT_ENDPOINT": "%s" % trainer.endpoint,
+            "PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(),
+            "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints())
+        }
     elif backend == 'gloo':
         # NOTE (xiongkun) default fall back into cpu only
         proc_env = {
diff --git a/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt b/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt
index 1da2fb8b14f75f10e852ef1783866f1d9ceca834..229a2c1792c2561e889d1026fbf7758e8f9ef628 100644
--- a/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt
@@ -7,12 +7,14 @@ if (WITH_MLU)
     foreach(TEST_OP ${TEST_DIST_OPS})
         LIST(REMOVE_ITEM TEST_OPS ${TEST_OP})
     endforeach(TEST_OP)
+    LIST(REMOVE_ITEM TEST_OPS "test_spawn_mlu")
 
     foreach(TEST_OP ${TEST_OPS})
         py_test_modules(${TEST_OP} MODULES ${TEST_OP})
     endforeach(TEST_OP)
 
     if(WITH_CNCL)
+        LIST(APPEND TEST_DIST_OPS "test_spawn_mlu")
         foreach(TEST_OP ${TEST_DIST_OPS})
             py_test_modules(${TEST_OP} MODULES ${TEST_OP})
         endforeach(TEST_OP)
diff --git a/python/paddle/fluid/tests/unittests/mlu/test_spawn_mlu.py b/python/paddle/fluid/tests/unittests/mlu/test_spawn_mlu.py
new file mode 100644
index 0000000000000000000000000000000000000000..773063c7a8ac909580c0e30a349a0b28b9737673
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/mlu/test_spawn_mlu.py
@@ -0,0 +1,113 @@
+# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+import unittest
+import os
+
+import paddle
+import paddle.nn as nn
+import paddle.optimizer as opt
+import paddle.distributed as dist
+from paddle.distributed.spawn import _get_subprocess_env_list, _options_valid_check, _get_default_nprocs
+from paddle.fluid import core
+
+
+class LinearNet(nn.Layer):
+    def __init__(self):
+        super(LinearNet, self).__init__()
+        self._linear1 = nn.Linear(10, 10)
+        self._linear2 = nn.Linear(10, 1)
+
+    def forward(self, x):
+        return self._linear2(self._linear1(x))
+
+
+def train(print_result=False):
+    # 1. initialize parallel environment
+    dist.init_parallel_env()
+
+    # 2. create data parallel layer & optimizer
+    layer = LinearNet()
+    dp_layer = paddle.DataParallel(layer)
+
+    loss_fn = nn.MSELoss()
+    adam = opt.Adam(learning_rate=0.001, parameters=dp_layer.parameters())
+
+    # 3. run layer
+    inputs = paddle.randn([10, 10], 'float32')
+    outputs = dp_layer(inputs)
+    labels = paddle.randn([10, 1], 'float32')
+    loss = loss_fn(outputs, labels)
+
+    if print_result is True:
+        print("Rank:", int(os.getenv("PADDLE_TRAINER_ID")))
+
+    loss.backward()
+    adam.step()
+    adam.clear_grad()
+
+    return int(os.getenv("PADDLE_TRAINER_ID"))
+
+
+class TestSpawn(unittest.TestCase):
+    def test_nprocs_greater_than_device_num_error(self):
+        with self.assertRaises(RuntimeError):
+            _get_subprocess_env_list(nprocs=100, options=dict())
+
+    def test_selected_devices_error(self):
+        with self.assertRaises(ValueError):
+            options = dict()
+            options['selected_devices'] = "100,101"
+            _get_subprocess_env_list(nprocs=2, options=options)
+
+    def test_get_correct_env(self):
+        options = dict()
+        options['print_config'] = True
+        env_dict = _get_subprocess_env_list(nprocs=1, options=options)[0]
+        self.assertEqual(env_dict['PADDLE_TRAINER_ID'], '0')
+        self.assertEqual(env_dict['PADDLE_TRAINERS_NUM'], '1')
+
+    def test_nprocs_not_equal_to_selected_devices(self):
+        with self.assertRaises(ValueError):
+            options = dict()
+            options['selected_devices'] = "100,101,102"
+            _get_subprocess_env_list(nprocs=2, options=options)
+
+    def test_options_valid_check(self):
+        options = dict()
+        options['selected_devices'] = "100,101,102"
+        _options_valid_check(options)
+
+        with self.assertRaises(ValueError):
+            options['error'] = "error"
+            _options_valid_check(options)
+
+    def test_get_default_nprocs(self):
+        paddle.set_device('mlu')
+        nprocs = _get_default_nprocs()
+        self.assertEqual(nprocs, core.get_mlu_device_count())
+
+    def test_spawn(self):
+        num_devs = core.get_mlu_device_count()
+        context = dist.spawn(train, backend='cncl', nprocs=num_devs)
+        rank_list = []
+        for i in range(num_devs):
+            rank_list.append(context.return_queues[i].get())
+        rank_list.sort()
+        self.assertEqual(rank_list, list(range(num_devs)))
+
+
+if __name__ == '__main__':
+    unittest.main()