未验证 提交 efb4d5c2 编写于 作者: R ronnywang 提交者: GitHub

[CustomDevice] add process_group_xccl ut (#44632)

* [CustomDevice] add process_group_xccl ut

* update
上级 2a5437a2
......@@ -5,6 +5,7 @@ if(WITH_CUSTOM_DEVICE)
"test_*.py")
string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}")
list(REMOVE_ITEM TEST_OPS "test_collective_process_group_xccl")
foreach(TEST_OP ${TEST_OPS})
py_test(${TEST_OP} SRCS ${TEST_OP}.py)
endforeach()
......
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import time
def train(prefix):
selected_accelerators = os.getenv("FLAGS_selected_accelerators")
selected_custom_devices = os.getenv("FLAGS_selected_custom_cpus")
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
worker_endpoints_env = os.getenv("PADDLE_TRAINER_ENDPOINTS")
current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT")
worker_endpoints = worker_endpoints_env
trainers_num = len(worker_endpoints.split(','))
device_ids = os.getenv("PADDLE_WORLD_DEVICE_IDS")
current_device_id = os.getenv("PADDLE_LOCAL_DEVICE_IDS")
details = "selected_accelerators:{} selected_custom_devices:{} worker_endpoints:{} trainers_num:{} current_endpoint:{} trainer_id:{} device_ids:{} device_id:{}"\
.format(selected_accelerators, selected_custom_devices, worker_endpoints, trainers_num, current_endpoint,trainer_id,device_ids, current_device_id)
print(details)
with open("multi_process_{}.check_{}.log".format(prefix, trainer_id),
"w") as f:
f.write(details)
if __name__ == '__main__':
prefix = sys.argv[1]
train(prefix)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import random
import numpy as np
import os
import shutil
import paddle
from paddle.fluid import core
from datetime import timedelta
import paddle.fluid.core as core
from paddle.fluid.framework import _test_eager_guard
from paddle.fluid.dygraph.parallel import ParallelEnv
def init_process_group(strategy=None):
nranks = ParallelEnv().nranks
rank = ParallelEnv().local_rank
is_master = True if rank == 0 else False
store = paddle.fluid.core.TCPStore("127.0.0.1", 6173, is_master, nranks)
pg_group = core.ProcessGroupCustom(
store, rank, nranks,
paddle.CustomPlace(ParallelEnv().device_type,
ParallelEnv().device_id))
return pg_group
class TestProcessGroupFp32(unittest.TestCase):
def setUp(self):
paddle.seed(2022)
random.seed(2022)
np.random.seed(2022)
self.config()
def config(self):
self.dtype = "float32"
self.shape = (2, 10, 5)
def test_create_process_group_xccl(self):
with _test_eager_guard():
paddle.set_device('custom_cpu:%d' %
paddle.distributed.ParallelEnv().dev_id)
pg = init_process_group()
x = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
y = np.random.random(self.shape).astype(self.dtype)
tensor_y = paddle.to_tensor(y)
sum_result = tensor_x + tensor_y
if pg.rank() == 0:
task = pg.allreduce(tensor_x)
task.wait()
# assert np.array_equal(tensor_x, sum_result)
else:
task = pg.allreduce(tensor_y)
task.wait()
# assert np.array_equal(tensor_y, sum_result)
print("test allreduce sum api ok")
x = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
y = np.random.random(self.shape).astype(self.dtype)
tensor_y = paddle.to_tensor(y)
max_result = paddle.maximum(tensor_x, tensor_y)
if pg.rank() == 0:
task = pg.allreduce(tensor_x, core.ReduceOp.MAX)
task.wait()
# assert np.array_equal(tensor_x, max_result)
else:
task = pg.allreduce(tensor_y, core.ReduceOp.MAX)
task.wait()
# assert np.array_equal(tensor_y, max_result)
print("test allreduce max api ok")
# test broadcast
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
# rank 1
y = np.random.random(self.shape).astype(self.dtype)
tensor_y = paddle.to_tensor(y)
broadcast_result = paddle.assign(tensor_x)
if pg.rank() == 0:
task = pg.broadcast(tensor_x, 0)
task.synchronize()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
assert task.is_completed()
# assert np.array_equal(broadcast_result, tensor_x)
else:
task = pg.broadcast(tensor_y, 0)
task.synchronize()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
assert task.is_completed()
# assert np.array_equal(broadcast_result, tensor_y)
print("test broadcast api ok")
# test barrier
# rank 0
if pg.rank() == 0:
task = pg.barrier()
task.wait()
# rank 1
else:
task = pg.barrier()
task.wait()
print("test barrier api ok\n")
return
# test allgather
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
y = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
tensor_y = paddle.to_tensor(y)
out_shape = list(self.shape)
out_shape[0] *= 2
out = np.random.random(out_shape).astype(self.dtype)
tensor_out = paddle.to_tensor(out)
if pg.rank() == 0:
task = pg.all_gather(tensor_x, tensor_out)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
# rank 1
else:
task = pg.all_gather(tensor_y, tensor_out)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2])
out_2 = paddle.slice(tensor_out, [0], [out_shape[0] // 2],
[out_shape[0]])
# assert np.array_equal(tensor_x, out_1)
# assert np.array_equal(tensor_y, out_2)
print("test allgather api ok\n")
# test alltoall
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
y = np.random.random(self.shape).astype(self.dtype)
out1 = np.random.random(self.shape).astype(self.dtype)
out2 = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
tensor_y = paddle.to_tensor(y)
tensor_out1 = paddle.to_tensor(out1)
tensor_out2 = paddle.to_tensor(out2)
raw_tensor_x_2 = paddle.slice(tensor_x, [0], [self.shape[0] // 2],
[self.shape[0]])
raw_tensor_y_1 = paddle.slice(tensor_y, [0], [0],
[self.shape[0] // 2])
if pg.rank() == 0:
task = pg.alltoall(tensor_x, tensor_out1)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
# rank 1
else:
task = pg.alltoall(tensor_y, tensor_out2)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
out1_2 = paddle.slice(tensor_out1, [0], [self.shape[0] // 2],
[self.shape[0]])
out2_1 = paddle.slice(tensor_out2, [0], [0], [self.shape[0] // 2])
# if pg.rank() == 0:
# assert np.array_equal(out1_2.numpy(), raw_tensor_y_1.numpy())
# else:
# assert np.array_equal(out2_1, raw_tensor_x_2)
print("test alltoall api ok\n")
# test Reduce
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
y = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
tensor_y = paddle.to_tensor(y)
sum_result = tensor_x + tensor_y
if pg.rank() == 0:
task = pg.reduce(tensor_x, 0)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
# rank 1
else:
task = pg.reduce(tensor_y, 0)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
# if pg.rank() == 0:
# assert np.array_equal(tensor_x, sum_result)
print("test reduce sum api ok\n")
# test Scatter
# rank 0
in_shape = list(self.shape)
in_shape[0] *= 2
x = np.random.random(in_shape).astype(self.dtype)
y = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
tensor_y = paddle.to_tensor(y)
if pg.rank() == 0:
task = pg.scatter(tensor_x, tensor_y, 0)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
# rank 1
else:
task = pg.scatter(tensor_x, tensor_y, 0)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
out1 = paddle.slice(tensor_x, [0], [0], [self.shape[0]])
out2 = paddle.slice(tensor_x, [0], [self.shape[0]],
[self.shape[0] * 2])
# if pg.rank() == 0:
# assert np.array_equal(tensor_y, out1)
# else:
# assert np.array_equal(tensor_y, out2)
print("test scatter api ok\n")
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import os
import sys
import copy
import subprocess
import time
def start_local_trainers(cluster,
pod,
training_script,
training_script_args,
eager_mode=True,
log_dir=None):
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
current_env = copy.copy(os.environ.copy())
#paddle broadcast ncclUniqueId use socket, and
#proxy maybe make trainers unreachable, so delete them.
#if we set them to "", grpc will log error message "bad uri"
#so just delete them.
current_env.pop("http_proxy", None)
current_env.pop("https_proxy", None)
procs = []
os.system("rm -rf log && mkdir -p log")
for idx, t in enumerate(pod.trainers):
proc_env = {
"FLAGS_selected_custom_cpus":
"%s" % ",".join([str(g) for g in t.gpus]),
"PADDLE_TRAINER_ID": "%d" % t.rank,
"PADDLE_CURRENT_ENDPOINT": "%s" % t.endpoint,
"PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(),
"PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()),
"PADDLE_DISTRI_CUSTOM_DEVICE_TYPE": "custom_cpu",
}
if not eager_mode:
proc_env["FLAGS_enable_eager_mode"] = "%d" % 0
current_env.update(proc_env)
print("trainer proc env:{}".format(current_env))
if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
cmd = "python -m coverage run --branch -p " + training_script
else:
cmd = "python -u " + training_script
print("start trainer proc:{} env:{}".format(cmd, proc_env))
fn = open("workerlog.%d" % idx, "a")
proc = subprocess.Popen(cmd.split(" "),
env=current_env,
stdout=fn,
stderr=fn)
tp = TrainerProc()
tp.proc = proc
tp.rank = t.rank
tp.log_fn = fn
tp.cmd = cmd
procs.append(tp)
return procs
def get_cluster_from_args(selected_gpus):
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
cluster_node_ips = '127.0.0.1'
node_ip = '127.0.0.1'
node_ips = [x.strip() for x in cluster_node_ips.split(',')]
node_ips.index(node_ip)
free_ports = None
free_ports = find_free_ports(len(selected_gpus))
if free_ports is not None:
free_ports = list(free_ports)
trainer_endpoints = []
for ip in node_ips:
trainer_endpoints.append(["%s:%d" % (ip, port) for port in free_ports])
return get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus)
class TestMultipleCustomCPU(unittest.TestCase):
def run_mnist_2custom_cpu(self, target_file_name, eager_mode=True):
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
selected_devices = [0, 1]
cluster = None
pod = None
cluster, pod = get_cluster_from_args(selected_devices)
procs = start_local_trainers(cluster,
pod,
eager_mode=eager_mode,
training_script=target_file_name,
training_script_args=[])
while True:
alive = watch_local_trainers(procs, cluster.trainers_endpoints())
if not alive:
print("Local procs complete, POD info:{}".format(pod))
break
time.sleep(3)
class TestProcessGroup(TestMultipleCustomCPU):
def setUp(self):
# compile so and set to current path
cur_dir = os.path.dirname(os.path.abspath(__file__))
cmd = 'rm -rf PaddleCustomDevice && git clone https://github.com/PaddlePaddle/PaddleCustomDevice.git && cd PaddleCustomDevice/backends/custom_cpu && mkdir build && cd build && cmake .. && make -j8'
os.system(cmd)
# set environment for loading and registering compiled custom kernels
# only valid in current process
os.environ['CUSTOM_DEVICE_ROOT'] = os.path.join(
cur_dir, 'PaddleCustomDevice/backends/custom_cpu/build')
def test_process_group_xccl(self):
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
self.run_mnist_2custom_cpu('process_group_xccl.py')
if __name__ == "__main__":
unittest.main()
#!/bin/bash
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
rm -rf PaddleCustomDevice && git clone https://github.com/PaddlePaddle/PaddleCustomDevice.git && pushd PaddleCustomDevice/backends/custom_cpu && mkdir build && pushd build && cmake .. && make -j8 && popd && popd
echo "begin test use custom_cpu"
export FLAGS_selected_custom_cpus=0,1
distributed_args="--ips=127.0.0.1 --backend=xccl --custom_device_type=custom_cpu --custom_devices=0,1 --run_mode=collective --log_dir=testlog"
python -m paddle.distributed.fleet.launch ${distributed_args} custom_device_multi_process_collective.py fleetlaunch_custom_cpu
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册