未验证 提交 5d1bbecb 编写于 作者: Z zn 提交者: GitHub

[MLU]support to spawn processes on mlu (#41787)

上级 2caee61f
...@@ -41,6 +41,7 @@ if (WITH_ASCEND_CL) ...@@ -41,6 +41,7 @@ if (WITH_ASCEND_CL)
endif() endif()
if (WITH_CNCL) if (WITH_CNCL)
set(PYBIND_DEPS ${PYBIND_DEPS} reducer)
set(PYBIND_DEPS ${PYBIND_DEPS} cncl_context) set(PYBIND_DEPS ${PYBIND_DEPS} cncl_context)
endif() endif()
......
...@@ -2225,7 +2225,8 @@ void BindImperative(py::module *m_ptr) { ...@@ -2225,7 +2225,8 @@ void BindImperative(py::module *m_ptr) {
py::call_guard<py::gil_scoped_release>()); py::call_guard<py::gil_scoped_release>());
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \ #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_GLOO) defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_GLOO) || \
defined(PADDLE_WITH_CNCL)
py::class_<imperative::ParallelContext, py::class_<imperative::ParallelContext,
std::shared_ptr<imperative::ParallelContext>>(m, std::shared_ptr<imperative::ParallelContext>>(m,
"ParallelContext"); "ParallelContext");
......
...@@ -74,7 +74,7 @@ def _py_supported_check(): ...@@ -74,7 +74,7 @@ def _py_supported_check():
def _options_valid_check(options): def _options_valid_check(options):
# `print_config` keeped as a debug options, not show to users # `print_config` keeped as a debug options, not show to users
supported_options = [ supported_options = [
'start_method', 'ips', 'gpus', 'xpus', 'print_config', 'backend' 'start_method', 'ips', 'gpus', 'xpus', 'mlus', 'print_config', 'backend'
] ]
deprecated_options = [ deprecated_options = [
'selected_devices', 'started_port', 'cluster_node_ips', 'node_ip', 'selected_devices', 'started_port', 'cluster_node_ips', 'node_ip',
...@@ -99,6 +99,8 @@ def _get_default_nprocs(): ...@@ -99,6 +99,8 @@ def _get_default_nprocs():
return core.get_cuda_device_count() return core.get_cuda_device_count()
elif 'xpu' in device: elif 'xpu' in device:
return core.get_xpu_device_count() return core.get_xpu_device_count()
elif 'mlu' in device:
return core.get_mlu_device_count()
elif 'cpu' in device: elif 'cpu' in device:
return multiprocessing.cpu_count() return multiprocessing.cpu_count()
else: else:
...@@ -113,6 +115,8 @@ def _get_default_backend(): ...@@ -113,6 +115,8 @@ def _get_default_backend():
return 'nccl' return 'nccl'
elif 'xpu' in device: elif 'xpu' in device:
return 'bkcl' return 'bkcl'
elif 'mlu' in device:
return 'cncl'
elif 'cpu' in device: elif 'cpu' in device:
return 'gloo' return 'gloo'
else: else:
...@@ -232,6 +236,40 @@ def _get_subprocess_env_list(nprocs, options): ...@@ -232,6 +236,40 @@ def _get_subprocess_env_list(nprocs, options):
raise ValueError("The selected xpu card %s cannot found in " raise ValueError("The selected xpu card %s cannot found in "
"XPU_VISIBLE_DEVICES (%s)." % "XPU_VISIBLE_DEVICES (%s)." %
(card_id, ",".join(env_devices_list))) (card_id, ",".join(env_devices_list)))
elif options['backend'] == 'cncl':
args.selected_devices = options.get('mlus', None)
if args.selected_devices is None:
args.selected_devices = options.get('selected_devices', None)
env_devices = os.getenv("MLU_VISIBLE_DEVICES", None)
if env_devices is None or env_devices == "":
env_devices_list = [
str(x) for x in six.moves.range(core.get_mlu_device_count())
]
else:
env_devices_list = env_devices.split(',')
if args.selected_devices is None:
if len(env_devices_list) < nprocs:
raise RuntimeError(
"the number of visible devices(%d) is less than the number "
"of spawn processes(%d), please ensure that the correct "
"`nprocs` argument is passed or the environment variable "
"`MLU_VISIBLE_DEVICES` is correctly configured." %
(len(env_devices_list), nprocs))
args.selected_devices = ",".join(
[str(env_devices_list[x]) for x in range(0, nprocs)])
else:
selected_device_list = args.selected_devices.split(',')
if len(selected_device_list) != nprocs:
raise ValueError(
"The number of selected devices(%s) is not equal to "
"the number of spawn processes(%d), please ensure that the "
"correct `nprocs` and `mlus` arguments are passed." %
(len(selected_device_list), nprocs))
for card_id in selected_device_list:
if card_id not in env_devices_list:
raise ValueError("The selected mlu card %s cannot found in "
"MLU_VISIBLE_DEVICES (%s)." %
(card_id, ",".join(env_devices_list)))
elif options['backend'] == 'gloo': elif options['backend'] == 'gloo':
# TODO check gpu / xpu flag must not exist # TODO check gpu / xpu flag must not exist
warnings.warn( warnings.warn(
...@@ -303,6 +341,8 @@ def _set_trainer_env(env_dict, backend): ...@@ -303,6 +341,8 @@ def _set_trainer_env(env_dict, backend):
set_flags({'FLAGS_selected_gpus': env_dict['FLAGS_selected_gpus']}) set_flags({'FLAGS_selected_gpus': env_dict['FLAGS_selected_gpus']})
elif backend == 'bkcl': elif backend == 'bkcl':
set_flags({'FLAGS_selected_xpus': env_dict['FLAGS_selected_xpus']}) set_flags({'FLAGS_selected_xpus': env_dict['FLAGS_selected_xpus']})
elif backend == 'cncl':
set_flags({'FLAGS_selected_mlus': env_dict['FLAGS_selected_mlus']})
else: else:
#NOTE(xiongkun) why not raise Error ? #NOTE(xiongkun) why not raise Error ?
# So far, we added support for CPU parallel, and will be applied when paddle is not # So far, we added support for CPU parallel, and will be applied when paddle is not
...@@ -396,9 +436,9 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options): ...@@ -396,9 +436,9 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options):
Start multiple processes with ``spawn`` method for parallel training. Start multiple processes with ``spawn`` method for parallel training.
.. note:: .. note::
``spawn`` now only supports GPU or XPU collective mode. The collective mode ``spawn`` now only supports GPU or XPU or MLU collective mode. The collective mode
of GPU and XPU cannot be started at the same time, so the option `gpus` and of GPU and XPU and MLU cannot be started at the same time, so the option `gpus` and
`xpus` cannot be configured at the same time. `xpus` and 'mlus' cannot be configured at the same time.
Args: Args:
func (function): The target function is called by spawned process. func (function): The target function is called by spawned process.
...@@ -425,7 +465,9 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options): ...@@ -425,7 +465,9 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options):
selected gpus, such as "0,1,2,3". Default: None; selected gpus, such as "0,1,2,3". Default: None;
(3) xpus (string): The training process will run on the (3) xpus (string): The training process will run on the
selected xpus, such as "0,1,2,3". Default: None; selected xpus, such as "0,1,2,3". Default: None;
(4) ips (string): Paddle cluster nodes ips, such as (4) mlus (string): The training process will run on the
selected mlus, such as "0,1,2,3". Default: None;
(5) ips (string): Paddle cluster nodes ips, such as
"192.168.0.16,192.168.0.17". Default: "127.0.0.1" . "192.168.0.16,192.168.0.17". Default: "127.0.0.1" .
Returns: Returns:
...@@ -457,7 +499,7 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options): ...@@ -457,7 +499,7 @@ def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options):
# 2. create data parallel layer & optimizer # 2. create data parallel layer & optimizer
layer = LinearNet() layer = LinearNet()
dp_layer = paddle.DataParallel(layer, process_group=process_group) dp_layer = paddle.DataParallel(layer, group = process_group)
loss_fn = nn.MSELoss() loss_fn = nn.MSELoss()
adam = opt.Adam( adam = opt.Adam(
......
...@@ -686,6 +686,15 @@ def _prepare_trainer_env(cluster, trainer, backend=None): ...@@ -686,6 +686,15 @@ def _prepare_trainer_env(cluster, trainer, backend=None):
"PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(), "PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(),
"PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()) "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints())
} }
elif backend == 'cncl':
proc_env = {
"FLAGS_selected_mlus":
"%s" % ",".join([str(g) for g in trainer.gpus]),
"PADDLE_TRAINER_ID": "%d" % trainer.rank,
"PADDLE_CURRENT_ENDPOINT": "%s" % trainer.endpoint,
"PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(),
"PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints())
}
elif backend == 'gloo': elif backend == 'gloo':
# NOTE (xiongkun) default fall back into cpu only # NOTE (xiongkun) default fall back into cpu only
proc_env = { proc_env = {
......
...@@ -7,12 +7,14 @@ if (WITH_MLU) ...@@ -7,12 +7,14 @@ if (WITH_MLU)
foreach(TEST_OP ${TEST_DIST_OPS}) foreach(TEST_OP ${TEST_DIST_OPS})
LIST(REMOVE_ITEM TEST_OPS ${TEST_OP}) LIST(REMOVE_ITEM TEST_OPS ${TEST_OP})
endforeach(TEST_OP) endforeach(TEST_OP)
LIST(REMOVE_ITEM TEST_OPS "test_spawn_mlu")
foreach(TEST_OP ${TEST_OPS}) foreach(TEST_OP ${TEST_OPS})
py_test_modules(${TEST_OP} MODULES ${TEST_OP}) py_test_modules(${TEST_OP} MODULES ${TEST_OP})
endforeach(TEST_OP) endforeach(TEST_OP)
if(WITH_CNCL) if(WITH_CNCL)
LIST(APPEND TEST_DIST_OPS "test_spawn_mlu")
foreach(TEST_OP ${TEST_DIST_OPS}) foreach(TEST_OP ${TEST_DIST_OPS})
py_test_modules(${TEST_OP} MODULES ${TEST_OP}) py_test_modules(${TEST_OP} MODULES ${TEST_OP})
endforeach(TEST_OP) endforeach(TEST_OP)
......
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import os
import paddle
import paddle.nn as nn
import paddle.optimizer as opt
import paddle.distributed as dist
from paddle.distributed.spawn import _get_subprocess_env_list, _options_valid_check, _get_default_nprocs
from paddle.fluid import core
class LinearNet(nn.Layer):
def __init__(self):
super(LinearNet, self).__init__()
self._linear1 = nn.Linear(10, 10)
self._linear2 = nn.Linear(10, 1)
def forward(self, x):
return self._linear2(self._linear1(x))
def train(print_result=False):
# 1. initialize parallel environment
dist.init_parallel_env()
# 2. create data parallel layer & optimizer
layer = LinearNet()
dp_layer = paddle.DataParallel(layer)
loss_fn = nn.MSELoss()
adam = opt.Adam(learning_rate=0.001, parameters=dp_layer.parameters())
# 3. run layer
inputs = paddle.randn([10, 10], 'float32')
outputs = dp_layer(inputs)
labels = paddle.randn([10, 1], 'float32')
loss = loss_fn(outputs, labels)
if print_result is True:
print("Rank:", int(os.getenv("PADDLE_TRAINER_ID")))
loss.backward()
adam.step()
adam.clear_grad()
return int(os.getenv("PADDLE_TRAINER_ID"))
class TestSpawn(unittest.TestCase):
def test_nprocs_greater_than_device_num_error(self):
with self.assertRaises(RuntimeError):
_get_subprocess_env_list(nprocs=100, options=dict())
def test_selected_devices_error(self):
with self.assertRaises(ValueError):
options = dict()
options['selected_devices'] = "100,101"
_get_subprocess_env_list(nprocs=2, options=options)
def test_get_correct_env(self):
options = dict()
options['print_config'] = True
env_dict = _get_subprocess_env_list(nprocs=1, options=options)[0]
self.assertEqual(env_dict['PADDLE_TRAINER_ID'], '0')
self.assertEqual(env_dict['PADDLE_TRAINERS_NUM'], '1')
def test_nprocs_not_equal_to_selected_devices(self):
with self.assertRaises(ValueError):
options = dict()
options['selected_devices'] = "100,101,102"
_get_subprocess_env_list(nprocs=2, options=options)
def test_options_valid_check(self):
options = dict()
options['selected_devices'] = "100,101,102"
_options_valid_check(options)
with self.assertRaises(ValueError):
options['error'] = "error"
_options_valid_check(options)
def test_get_default_nprocs(self):
paddle.set_device('mlu')
nprocs = _get_default_nprocs()
self.assertEqual(nprocs, core.get_mlu_device_count())
def test_spawn(self):
context = dist.spawn(train, backend='cncl', nprocs=4)
rank_list = []
for i in range(4):
rank_list.append(context.return_queues[i].get())
rank_list.sort()
self.assertEqual(rank_list, list(range(4)))
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册