Unverified commit 2533cac6 authored by zn, committed by GitHub

[MLU] Support launching processes on MLU (#39839)

Parent 64f1485a
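A minimal usage sketch of the new --mlus flag, with train.py as a placeholder script name:

    python -m paddle.distributed.fleet.launch --mlus=0,1,2,3 train.py

This launches four trainer processes, each bound to one MLU and handed its device id through the FLAGS_selected_mlus environment variable.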
@@ -156,6 +156,16 @@ see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/tra
)
base_group.add_argument("--selected_npus", dest="npus")
if fluid.core.is_compiled_with_mlu():
base_group.add_argument(
"--mlus",
type=str,
default=None,
help="It's for mlu training. For example: "
"--mlus=\"0,1,2,3\" will launch four training processes each bound to one mlu."
)
base_group.add_argument("--selected_mlus", dest="mlus")
base_group.add_argument(
"training_script",
type=str,
@@ -429,6 +439,8 @@ def infer_backend(args):
args.backend = 'unknown'
elif fluid.core.is_compiled_with_xpu():
args.backend = 'bkcl'
elif fluid.core.is_compiled_with_mlu():
args.backend = 'cncl'
else:
args.backend = 'gloo'
@@ -472,6 +484,8 @@ def which_distributed_mode(args):
accelerators = fluid.core.get_npu_device_count()
elif fluid.core.is_compiled_with_xpu():
accelerators = fluid.core.get_xpu_device_count()
elif fluid.core.is_compiled_with_mlu():
accelerators = fluid.core.get_mlu_device_count()
else:
accelerators = 0
@@ -490,17 +504,18 @@ def which_distributed_mode(args):
return DistributeMode.COLLECTIVE
else:
if not fluid.core.is_compiled_with_cuda(
) and not fluid.core.is_compiled_with_xpu():
) and not fluid.core.is_compiled_with_xpu(
) and not fluid.core.is_compiled_with_mlu():
if args.servers:
logger.warning(
"Not found distinct arguments and not compiled with cuda or xpu or npu. "
"Not found distinct arguments and not compiled with cuda or xpu or npu or mlu. "
"But found args.servers not empty, default use ps mode")
return DistributeMode.PS
else:
return DistributeMode.COLLECTIVE
else:
logger.warning(
"Not found distinct arguments and compiled with cuda or xpu or npu. "
"Not found distinct arguments and compiled with cuda or xpu or npu or mlu. "
"Default use collective mode")
return DistributeMode.COLLECTIVE
@@ -536,6 +551,10 @@ def launch():
- ``--selected_xpus``: xpus aliases, recommend to use ``--xpus``.
- ``--mlus``: It's for mlu training. e.g., ``--mlus=0,1,2,3`` will launch four training processes each bound to one mlu.
- ``--selected_mlus``: mlus aliases, recommend to use ``--mlus``.
- ``training_script``: The full path to the single GPU training program/script to be launched in parallel, followed by all the arguments for the training script. e.g., ``training.py``
- ``training_script_args``: The args of training_script. e.g., ``--lr=0.1``
@@ -688,7 +707,7 @@ def launch():
check_backend(args.backend)
distribute_mode = DistributeMode.COLLECTIVE
#assert args.backend in ['gloo', 'nccl', 'bkcl', 'heter', 'unknown']
#assert args.backend in ['gloo', 'nccl', 'bkcl', 'cncl', 'heter', 'unknown']
if args.backend == 'gloo':
logger.warning("launch start with CPUONLY mode")
@@ -57,6 +57,7 @@ class DeviceMode():
XPU = 2
ASCEND_NPU = 3
UNKNOWN = 3
MLU = 4
class Cluster(object):
@@ -287,7 +288,7 @@ def get_cluster(node_ips, node_ip, trainer_endpoints, device_mode,
), "current trainer_endpoints size should be greater equal than acclerators size."
for i in range(len(devices_per_proc)):
trainer = Trainer()
if device_mode == DeviceMode.GPU or device_mode == DeviceMode.ASCEND_NPU:
if device_mode == DeviceMode.GPU or device_mode == DeviceMode.ASCEND_NPU or device_mode == DeviceMode.MLU:
if isinstance(devices_per_proc[i], (list, tuple)):
trainer.accelerators.extend(devices_per_proc[i])
pod.accelerators.extend(devices_per_proc[i])
@@ -530,6 +531,9 @@ def start_local_trainers(cluster,
accelerators) > 0 and pod.device_mode == DeviceMode.ASCEND_NPU:
proc_env["FLAGS_selected_npus"] = "%s" % ",".join(
[str(g) for g in t.accelerators])
elif len(t.accelerators) > 0 and pod.device_mode == DeviceMode.MLU:
proc_env["FLAGS_selected_mlus"] = "%s" % ",".join(
[str(g) for g in t.accelerators])
if len(t.accelerators) > 0:
proc_env["FLAGS_selected_accelerators"] = "%s" % ",".join(
@@ -735,6 +739,35 @@ def get_npus(npus):
return res_npus
def get_mlus(mlus):
if mlus is None:
mlus_num = fluid.core.get_mlu_device_count()
res_mlus = [str(x) for x in range(0, mlus_num)]
else:
mlu_visible_devices = os.getenv("MLU_VISIBLE_DEVICES")
if mlu_visible_devices is None or mlu_visible_devices == "":
res_mlus = [x.strip() for x in mlus.split(',')]
else:
# change mlus into relative values
# e.g. MLU_VISIBLE_DEVICES=4,5,6,7; args.mlus=4,5,6,7;
# therefore mlus=0,1,2,3
mlu_visible_devices_list = mlu_visible_devices.split(',')
for x in mlus.split(','):
assert x in mlu_visible_devices_list, "Can't find "\
"your mlus %s in MLU_VISIBLE_DEVICES[%s]."\
% (x, mlu_visible_devices)
res_mlus = [
mlu_visible_devices_list.index(x.strip())
for x in mlus.split(',')
]
logger.info("Change selected_mlus into reletive values. --ips:{} "
"will change into relative_ips:{} according to your "
"MLU_VISIBLE_DEVICES:{}".format(
mlus, res_mlus, mlu_visible_devices_list))
return res_mlus
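A sketch of the relative-id mapping above, assuming a host whose visible devices are MLUs 4-7 and a placeholder script name train.py:

    MLU_VISIBLE_DEVICES=4,5,6,7 python -m paddle.distributed.fleet.launch --mlus=4,5 train.py

get_mlus() maps the requested ids 4 and 5 to their positions in MLU_VISIBLE_DEVICES, so res_mlus becomes [0, 1] and the two trainer processes are later started with FLAGS_selected_mlus=0 and FLAGS_selected_mlus=1 respectively.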
def get_device_mode(backend):
if backend == 'heter':
if fluid.core.is_compiled_with_cuda() and \
@@ -763,6 +796,10 @@ def get_device_mode(backend):
print("launch train in XPU mode")
return DeviceMode.XPU
if backend == 'cncl' and fluid.core.get_mlu_device_count() > 0:
print("launch train in MLU mode")
return DeviceMode.MLU
if backend == 'gloo':
print("launch train in CPU mode")
return DeviceMode.CPU
@@ -812,6 +849,18 @@ def get_device_proc_info(args):
]
else:
devices_per_proc = xpus
elif device_mode == DeviceMode.MLU:
mlus = get_mlus(args.mlus)
if args.nproc_per_node is not None:
assert (len(mlus) % int(args.nproc_per_node)) == 0, \
"mlus' number:{} mod args.nproc_per_node:{} must == 0".format(len(mlus), args.nproc_per_node)
n = int(len(mlus) / int(args.nproc_per_node))
devices_per_proc = [
mlus[i:i + n] for i in six.moves.range(0, len(mlus), n)
]
else:
devices_per_proc = mlus
elif device_mode == DeviceMode.CPU:
if hasattr(args, "paddle_cpuonly") and args.nproc_per_node is None:
#NOTE (xiongkun03) set it to cpu core number
@@ -1719,7 +1768,7 @@ class ParameterServerLauncher(object):
def check_backend(backend):
if backend not in ['nccl', 'gloo', 'bkcl', 'auto', 'hccl', 'heter']:
if backend not in ['nccl', 'gloo', 'bkcl', 'cncl', 'auto', 'hccl', 'heter']:
raise ValueError("paddle.distributed initialize error, "
"backend argument can only be one of "
"'nccl', 'gloo', 'bkcl', 'auto', 'hccl', 'heter' "
@@ -1743,6 +1792,12 @@ def check_backend(backend):
"your paddle is not compiled with npu but you assign 'hccl' as backend."
)
if backend == 'cncl' and not fluid.core.is_compiled_with_mlu():
raise ValueError(
"paddle.distributed initialize error, "
"your paddle is not compiled with mlu but you assign 'cncl' as backend."
)
def block_windows_and_macos(backend):
if backend != 'gloo': return
@@ -1766,4 +1821,7 @@ def get_backend_by_compile_flag():
if fluid.core.is_compiled_with_npu():
return 'hccl'
if fluid.core.is_compiled_with_mlu():
return 'cncl'
return 'gloo'
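A hedged sketch of the new cncl validation path, on the assumption that the launcher exposes a --backend argument whose value is routed into check_backend() (train.py is again a placeholder). On a Paddle build without MLU support this is expected to fail:

    python -m paddle.distributed.fleet.launch --backend=cncl train.py
    # expected error: paddle.distributed initialize error, your paddle is
    # not compiled with mlu but you assign 'cncl' as backend.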
file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py")
string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}")
file(GLOB TEST_DIST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_collective_*.py")
string(REPLACE ".py" "" TEST_DIST_OPS "${TEST_DIST_OPS}")
if (WITH_MLU)
foreach(TEST_OP ${TEST_DIST_OPS})
LIST(REMOVE_ITEM TEST_OPS ${TEST_OP})
endforeach(TEST_OP)
foreach(TEST_OP ${TEST_OPS})
py_test_modules(${TEST_OP} MODULES ${TEST_OP})
endforeach(TEST_OP)
if(WITH_CNCL)
foreach(TEST_OP ${TEST_DIST_OPS})
py_test_modules(${TEST_OP} MODULES ${TEST_OP})
endforeach(TEST_OP)
bash_test_modules(test_launch_async_mlu START_BASH test_launch_async_mlu.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
bash_test_modules(test_launch_cloud_mlu START_BASH test_launch_cloud_mlu.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
bash_test_modules(test_launch_nproc_mlu START_BASH test_launch_nproc_mlu.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
set_tests_properties(test_collective_broadcast PROPERTIES TIMEOUT 120)
set_tests_properties(test_collective_allreduce PROPERTIES TIMEOUT 120)
endif(WITH_CNCL)
endif()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import time
import paddle.fluid as fluid
def train(prefix):
selected_mlus = os.getenv("FLAGS_selected_mlus")
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
worker_endpoints_env = os.getenv("PADDLE_TRAINER_ENDPOINTS")
current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT")
worker_endpoints = worker_endpoints_env
trainers_num = len(worker_endpoints.split(','))
name = "selected_mlus:{} worker_endpoints:{} trainers_num:{} current_endpoint:{} trainer_id:{}"\
.format(selected_mlus, worker_endpoints, trainers_num, current_endpoint,trainer_id)
print(name)
with open("multi_process_{}.check_{}.log".format(prefix, trainer_id),
"w") as f:
f.write(name)
def train_abort(prefix):
selected_mlus = os.getenv("FLAGS_selected_mlus")
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
worker_endpoints_env = os.getenv("PADDLE_TRAINER_ENDPOINTS")
current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT")
worker_endpoints = worker_endpoints_env
trainers_num = len(worker_endpoints.split(','))
if trainer_id == 0:
try:
# train abort
exit(1)
except SystemExit:
name = "abort>>> selected_mlus:{} worker_endpoints:{} trainers_num:{} current_endpoint:{} trainer_id:{}"\
.format(selected_mlus, worker_endpoints, trainers_num, current_endpoint,trainer_id)
print(name)
with open(
"multi_process_{}.check_{}.log".format(prefix, trainer_id),
"w") as f:
f.write(name)
raise
else:
# sleep 30s to make sure paddle.distributed.launch will terminate this process
time.sleep(30)
name = "selected_mlus:{} worker_endpoints:{} trainers_num:{} current_endpoint:{} trainer_id:{}"\
.format(selected_mlus, worker_endpoints, trainers_num, current_endpoint,trainer_id)
print(name)
with open("multi_process_{}.check_{}.log".format(prefix, trainer_id),
"w") as f:
f.write(name)
if __name__ == '__main__':
if len(sys.argv) == 3 and sys.argv[2] == "abort":
prefix = sys.argv[1]
train_abort(prefix)
else:
prefix = sys.argv[1]
train(prefix)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import time
def train(prefix):
selected_mlus = os.getenv("FLAGS_selected_mlus")
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
worker_endpoints_env = os.getenv("PADDLE_TRAINER_ENDPOINTS")
current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT")
worker_endpoints = worker_endpoints_env
trainers_num = len(worker_endpoints.split(','))
name = "selected_mlus:{} worker_endpoints:{} trainers_num:{} current_endpoint:{} trainer_id:{}"\
.format(selected_mlus, worker_endpoints, trainers_num, current_endpoint,trainer_id)
print(name)
with open("{}.check_{}.log".format(prefix, trainer_id), "w") as f:
f.write(name)
if __name__ == '__main__':
prefix = sys.argv[1]
train(prefix)
#!/bin/bash
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
# test use DISTRIBUTED_TRAINER_ENDPOINTS env in paddlecloud
unset PADDLE_PORT
export DISTRIBUTED_TRAINER_ENDPOINTS=127.0.0.1:6170,127.0.0.1:6171,127.0.0.2:6170,127.0.0.2:6171
export cluster_node_ips="127.0.0.1,127.0.0.2"
export PADDLE_TRAINERS_NUM=2
export POD_IP=127.0.0.1
export PADDLE_TRAINERS=127.0.0.1,127.0.0.2
export PADDLE_TRAINER_ID=0
export TRAINER_PORTS_NUM=2
file_0="multi_process_fullpath_launch.check_0.log"
file_1="multi_process_fullpath_launch.check_1.log"
distributed_args="--ips=${cluster_node_ips} --mlus=0,1 --log_dir=testlog"
echo "paddle.distributed.fleet.launch async poll process test"
if ! MLU_VISIBLE_DEVICES=0,1 python -m paddle.distributed.fleet.launch ${distributed_args} multi_process_mlu.py fullpath_launch abort; then
echo "train abort as planned"
fi
abort_str1="abort>>> selected_mlus:0 worker_endpoints:127.0.0.1:6170,127.0.0.1:6171,127.0.0.2:6170,127.0.0.2:6171 trainers_num:4 current_endpoint:127.0.0.1:6170 trainer_id:0"
if grep -q "$abort_str1" "$file_0"; then
echo "trainer 0 abort as planned"
else
echo "trainer 0 not abort as planned"
exit -1
fi
if [ ! -f $file_1 ]; then
echo "trainer 1 terminate as planned"
else
echo "trainer 1 not terminate as planned"
rm $file_1
exit -1
fi
if [ -f $file_0 ]; then
rm $file_0
fi
#!/bin/bash
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
# use paddlecloud
echo "begin test use paddlecloud"
cluster_node_ips="127.0.0.1,127.0.0.2"
export PADDLE_TRAINERS_NUM=2
export POD_IP=127.0.0.1
export PADDLE_TRAINERS=127.0.0.1,127.0.0.2
export PADDLE_TRAINER_ID=0
export PADDLE_PORT=35789
export TRAINER_PORTS_NUM=2
distributed_args="--ips=${cluster_node_ips} --mlus=0,1 --log_dir=testlog"
MLU_VISIBLE_DEVICES=0,1 python -m paddle.distributed.fleet.launch ${distributed_args} multi_process_mlu.py fleetlaunchcloud
str1="selected_mlus:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790,127.0.0.2:35789,127.0.0.2:35790 trainers_num:4 current_endpoint:127.0.0.1:35789 trainer_id:0"
str2="selected_mlus:1 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790,127.0.0.2:35789,127.0.0.2:35790 trainers_num:4 current_endpoint:127.0.0.1:35790 trainer_id:1"
file_0="multi_process_fleetlaunchcloud.check_0.log"
file_1="multi_process_fleetlaunchcloud.check_1.log"
echo "paddlecloud params test"
if grep -q "$str1" "$file_0"; then
echo "find trainer 0"
else
echo "not find trainer 0"
exit -1
fi
if grep -q "$str2" "$file_1"; then
echo "find trainer 1"
else
echo "not find trainer 1"
exit -1
fi
# test async poll process
if [ -f $file_0 ]; then
rm $file_0
fi
if [ -f $file_1 ]; then
rm $file_1
fi
#!/bin/bash
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
export FLAGS_START_PORT=35789
export MLU_VISIBLE_DEVICES=0,1
function test_nproc_0(){
mlus=$1
file_0="fleet_nproc_0.check_0.log"
rm -f ${file_0}
distributed_args="--log_dir=testlog --nproc_per_node=1"
python -m paddle.distributed.launch ${distributed_args} nproc_process_mlu.py fleet_nproc_0
str0="selected_mlus:${mlus} worker_endpoints:127.0.0.1:35789 trainers_num:1 current_endpoint:127.0.0.1:35789 trainer_id:0"
if grep -q "$str0" "$file_0"; then
echo "find trainer 0"
else
echo "not find trainer 0"
exit -1
fi
if [ -f $file_0 ]; then
rm $file_0
fi
}
function test_nproc_1(){
file_0="fleet_nproc_1.check_0.log"
file_1="fleet_nproc_1.check_1.log"
rm -f ${file_0} ${file_1}
distributed_args="--log_dir=testlog --nproc_per_node=2"
python -m paddle.distributed.launch ${distributed_args} nproc_process_mlu.py fleet_nproc_1
str0="selected_mlus:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35789 trainer_id:0"
if grep -q "$str0" "$file_0"; then
echo "find trainer 0"
else
echo "not find trainer 0"
exit -1
fi
str1="selected_mlus:1 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35790 trainer_id:1"
if grep -q "$str1" "$file_1"; then
echo "find trainer 1"
else
echo "not find trainer 1"
exit -1
fi
if [ -f $file_0 ]; then
rm $file_0
fi
if [ -f $file_1 ]; then
rm $file_1
fi
}
test_nproc_0 "0,1"
test_nproc_1