未验证 提交 08d233f9 编写于 作者: R ronnywang 提交者: GitHub

cherry pick pr46536 (#46901)

cherry pick pr46536 
上级 b051455f
if(WITH_CUSTOM_DEVICE AND NOT WITH_GPU)
set(PLUGIN_URL https://github.com/PaddlePaddle/PaddleCustomDevice.git)
set(PLUGIN_TAG b9ae8452f31525d0524810461b17856838acd821)
set(PLUGIN_TAG 0698428ddba21e6baecb690579f37c48896f7d56)
file(
GLOB TEST_OPS
......@@ -8,10 +8,10 @@ if(WITH_CUSTOM_DEVICE AND NOT WITH_GPU)
"test_*.py")
string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}")
list(REMOVE_ITEM TEST_OPS test_collective_process_group_xccl)
foreach(TEST_OP ${TEST_OPS})
py_test(${TEST_OP} SRCS ${TEST_OP}.py ENVS PLUGIN_URL=${PLUGIN_URL}
PLUGIN_TAG=${PLUGIN_TAG})
py_test(${TEST_OP}
SRCS ${TEST_OP}.py ENVS FLAGS_allocator_strategy=naive_best_fit
PLUGIN_URL=${PLUGIN_URL} PLUGIN_TAG=${PLUGIN_TAG})
endforeach()
bash_test_modules(
......@@ -19,6 +19,7 @@ if(WITH_CUSTOM_DEVICE AND NOT WITH_GPU)
START_BASH
test_fleet_launch_custom_device.sh
ENVS
FLAGS_allocator_strategy=naive_best_fit
PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}
PLUGIN_URL=${PLUGIN_URL}
PLUGIN_TAG=${PLUGIN_TAG})
......@@ -26,4 +27,5 @@ if(WITH_CUSTOM_DEVICE AND NOT WITH_GPU)
set_tests_properties(test_custom_cpu_plugin PROPERTIES TIMEOUT 120)
set_tests_properties(test_custom_cpu_profiler_plugin PROPERTIES TIMEOUT 120)
set_tests_properties(test_fleet_launch_custom_device PROPERTIES TIMEOUT 120)
set_tests_properties(test_custom_cpu_to_static PROPERTIES TIMEOUT 120)
endif()
......@@ -12,14 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import os
import sys
import copy
import subprocess
import time
import tempfile
def start_local_trainers(cluster,
......@@ -28,7 +26,7 @@ def start_local_trainers(cluster,
training_script_args,
eager_mode=True,
log_dir=None):
from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc # noqa: F401
current_env = copy.copy(os.environ.copy())
#paddle broadcast ncclUniqueId use socket, and
......@@ -84,7 +82,7 @@ def start_local_trainers(cluster,
def get_cluster_from_args(selected_gpus):
from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc # noqa: F401
cluster_node_ips = '127.0.0.1'
node_ip = '127.0.0.1'
......@@ -108,7 +106,7 @@ def get_cluster_from_args(selected_gpus):
class TestMultipleCustomCPU(unittest.TestCase):
def run_mnist_2custom_cpu(self, target_file_name, eager_mode=True):
from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc # noqa: F401
selected_devices = [0, 1]
cluster = None
......@@ -136,21 +134,32 @@ class TestProcessGroup(TestMultipleCustomCPU):
def setUp(self):
# compile so and set to current path
cur_dir = os.path.dirname(os.path.abspath(__file__))
cmd = 'rm -rf PaddleCustomDevice \
self.temp_dir = tempfile.TemporaryDirectory()
cmd = 'cd {} \
&& git clone {} \
&& cd PaddleCustomDevice/backends/custom_cpu \
&& cd PaddleCustomDevice \
&& git fetch origin \
&& git checkout {} -b dev \
&& cd backends/custom_cpu \
&& mkdir build && cd build && cmake .. && make -j8'.format(
os.getenv('PLUGIN_URL'), os.getenv('PLUGIN_TAG'))
self.temp_dir.name, os.getenv('PLUGIN_URL'),
os.getenv('PLUGIN_TAG'))
os.system(cmd)
# set environment for loading and registering compiled custom kernels
# only valid in current process
os.environ['CUSTOM_DEVICE_ROOT'] = os.path.join(
cur_dir, 'PaddleCustomDevice/backends/custom_cpu/build')
cur_dir, '{}/PaddleCustomDevice/backends/custom_cpu/build'.format(
self.temp_dir.name))
os.environ['FLAGS_selected_custom_cpus'] = '0,1'
os.environ['CUSTOM_CPU_VISIBLE_DEVICES'] = '0,1'
os.environ['PADDLE_XCCL_BACKEND'] = 'custom_cpu'
def tearDown(self):
self.temp_dir.cleanup()
def test_process_group_xccl(self):
from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc # noqa: F401
self.run_mnist_2custom_cpu('process_group_xccl.py')
......
......@@ -14,9 +14,9 @@
import os
import sys
import site
import unittest
import numpy as np
import tempfile
class TestCustomCPUPlugin(unittest.TestCase):
......@@ -24,18 +24,27 @@ class TestCustomCPUPlugin(unittest.TestCase):
def setUp(self):
# compile so and set to current path
cur_dir = os.path.dirname(os.path.abspath(__file__))
cmd = 'rm -rf PaddleCustomDevice \
self.temp_dir = tempfile.TemporaryDirectory()
cmd = 'cd {} \
&& git clone {} \
&& cd PaddleCustomDevice/backends/custom_cpu \
&& cd PaddleCustomDevice \
&& git fetch origin \
&& git checkout {} -b dev \
&& cd backends/custom_cpu \
&& mkdir build && cd build && cmake .. && make -j8'.format(
os.getenv('PLUGIN_URL'), os.getenv('PLUGIN_TAG'))
self.temp_dir.name, os.getenv('PLUGIN_URL'),
os.getenv('PLUGIN_TAG'))
os.system(cmd)
# set environment for loading and registering compiled custom kernels
# only valid in current process
os.environ['CUSTOM_DEVICE_ROOT'] = os.path.join(
cur_dir, 'PaddleCustomDevice/backends/custom_cpu/build')
cur_dir, '{}/PaddleCustomDevice/backends/custom_cpu/build'.format(
self.temp_dir.name))
def tearDown(self):
self.temp_dir.cleanup()
del os.environ['CUSTOM_DEVICE_ROOT']
def test_custom_device(self):
import paddle
......@@ -183,9 +192,6 @@ class TestCustomCPUPlugin(unittest.TestCase):
k_t = paddle.to_tensor([3], dtype="int32")
value_1, indices_1 = paddle.topk(data_1, k=k_t)
def tearDown(self):
del os.environ['CUSTOM_DEVICE_ROOT']
if __name__ == '__main__':
if os.name == 'nt' or sys.platform.startswith('darwin'):
......
......@@ -14,9 +14,8 @@
import os
import sys
import site
import unittest
import numpy as np
import tempfile
class TestCustomCPUProfilerPlugin(unittest.TestCase):
......@@ -24,18 +23,27 @@ class TestCustomCPUProfilerPlugin(unittest.TestCase):
def setUp(self):
# compile so and set to current path
cur_dir = os.path.dirname(os.path.abspath(__file__))
cmd = 'rm -rf PaddleCustomDevice \
self.temp_dir = tempfile.TemporaryDirectory()
cmd = 'cd {} \
&& git clone {} \
&& cd PaddleCustomDevice/backends/custom_cpu \
&& cd PaddleCustomDevice \
&& git fetch origin \
&& git checkout {} -b dev \
&& cd backends/custom_cpu \
&& mkdir build && cd build && cmake .. && make -j8'.format(
os.getenv('PLUGIN_URL'), os.getenv('PLUGIN_TAG'))
self.temp_dir.name, os.getenv('PLUGIN_URL'),
os.getenv('PLUGIN_TAG'))
os.system(cmd)
# set environment for loading and registering compiled custom kernels
# only valid in current process
os.environ['CUSTOM_DEVICE_ROOT'] = os.path.join(
cur_dir, 'PaddleCustomDevice/backends/custom_cpu/build')
cur_dir, '{}/PaddleCustomDevice/backends/custom_cpu/build'.format(
self.temp_dir.name))
def tearDown(self):
self.temp_dir.cleanup()
del os.environ['CUSTOM_DEVICE_ROOT']
def test_custom_device(self):
import paddle
......@@ -59,9 +67,6 @@ class TestCustomCPUProfilerPlugin(unittest.TestCase):
p.stop()
p.summary()
def tearDown(self):
del os.environ['CUSTOM_DEVICE_ROOT']
if __name__ == '__main__':
if os.name == 'nt' or sys.platform.startswith('darwin'):
......
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import time
import unittest
import numpy as np
import tempfile
EPOCH_NUM = 1
BATCH_SIZE = 1024
def train_func_base(epoch_id, train_loader, model, cost, optimizer):
total_step = len(train_loader)
epoch_start = time.time()
for batch_id, (images, labels) in enumerate(train_loader()):
# forward
outputs = model(images)
loss = cost(outputs, labels)
# backward and optimize
loss.backward()
optimizer.step()
optimizer.clear_grad()
print("Epoch [{}/{}], Step [{}/{}], Loss: {}".format(
epoch_id + 1, EPOCH_NUM, batch_id + 1, total_step, loss.numpy()))
epoch_end = time.time()
print(
f"Epoch ID: {epoch_id+1}, FP32 train epoch time: {(epoch_end - epoch_start) * 1000} ms"
)
def train_func_ampo1(epoch_id, train_loader, model, cost, optimizer, scaler):
import paddle
total_step = len(train_loader)
epoch_start = time.time()
for batch_id, (images, labels) in enumerate(train_loader()):
# forward
with paddle.amp.auto_cast(
custom_black_list={"flatten_contiguous_range", "greater_than"},
level='O1'):
outputs = model(images)
loss = cost(outputs, labels)
# backward and optimize
scaled = scaler.scale(loss)
scaled.backward()
scaler.minimize(optimizer, scaled)
optimizer.clear_grad()
print("Epoch [{}/{}], Step [{}/{}], Loss: {}".format(
epoch_id + 1, EPOCH_NUM, batch_id + 1, total_step, loss.numpy()))
epoch_end = time.time()
print(
f"Epoch ID: {epoch_id+1}, AMPO1 train epoch time: {(epoch_end - epoch_start) * 1000} ms"
)
def test_func(epoch_id, test_loader, model, cost):
import paddle
# evaluation every epoch finish
model.eval()
avg_acc = [[], []]
for batch_id, (images, labels) in enumerate(test_loader()):
# forward
outputs = model(images)
loss = cost(outputs, labels)
# accuracy
acc_top1 = paddle.metric.accuracy(input=outputs, label=labels, k=1)
acc_top5 = paddle.metric.accuracy(input=outputs, label=labels, k=5)
avg_acc[0].append(acc_top1.numpy())
avg_acc[1].append(acc_top5.numpy())
model.train()
print(
f"Epoch ID: {epoch_id+1}, Top1 accurary: {np.array(avg_acc[0]).mean()}, Top5 accurary: {np.array(avg_acc[1]).mean()}"
)
class TestCustomCPUPlugin(unittest.TestCase):
def setUp(self):
# compile so and set to current path
cur_dir = os.path.dirname(os.path.abspath(__file__))
self.temp_dir = tempfile.TemporaryDirectory()
cmd = 'cd {} \
&& git clone {} \
&& cd PaddleCustomDevice \
&& git fetch origin \
&& git checkout {} -b dev \
&& cd backends/custom_cpu \
&& mkdir build && cd build && cmake .. && make -j8'.format(
self.temp_dir.name, os.getenv('PLUGIN_URL'),
os.getenv('PLUGIN_TAG'))
os.system(cmd)
# set environment for loading and registering compiled custom kernels
# only valid in current process
os.environ['CUSTOM_DEVICE_ROOT'] = os.path.join(
cur_dir, '{}/PaddleCustomDevice/backends/custom_cpu/build'.format(
self.temp_dir.name))
def tearDown(self):
self.temp_dir.cleanup()
def test_custom_cpu_plugin(self):
self._test_to_static()
self._test_amp_o1()
def _test_to_static(self):
import paddle
class LeNet5(paddle.nn.Layer):
def __init__(self):
super(LeNet5, self).__init__()
self.fc = paddle.nn.Linear(in_features=1024, out_features=10)
self.relu = paddle.nn.ReLU()
self.fc1 = paddle.nn.Linear(in_features=10, out_features=10)
def forward(self, x):
out = paddle.flatten(x, 1)
out = self.fc(out)
out = self.relu(out)
out = self.fc1(out)
return out
# set device
paddle.set_device('custom_cpu')
# model
model = LeNet5()
# cost and optimizer
cost = paddle.nn.CrossEntropyLoss()
optimizer = paddle.optimizer.Adam(learning_rate=0.001,
parameters=model.parameters())
# convert to static model
build_strategy = paddle.static.BuildStrategy()
mnist = paddle.jit.to_static(model, build_strategy=build_strategy)
# data loader
transform = paddle.vision.transforms.Compose([
paddle.vision.transforms.Resize((32, 32)),
paddle.vision.transforms.ToTensor(),
paddle.vision.transforms.Normalize(mean=(0.1307, ), std=(0.3081, ))
])
train_dataset = paddle.vision.datasets.MNIST(mode='train',
transform=transform,
download=True)
test_dataset = paddle.vision.datasets.MNIST(mode='test',
transform=transform,
download=True)
train_loader = paddle.io.DataLoader(train_dataset,
batch_size=BATCH_SIZE,
shuffle=True,
drop_last=True,
num_workers=2)
test_loader = paddle.io.DataLoader(test_dataset,
batch_size=BATCH_SIZE,
shuffle=True,
drop_last=True,
num_workers=2)
# train and eval
for epoch_id in range(EPOCH_NUM):
train_func_base(epoch_id, train_loader, model, cost, optimizer)
test_func(epoch_id, test_loader, model, cost)
def _test_amp_o1(self):
import paddle
class LeNet5(paddle.nn.Layer):
def __init__(self):
super(LeNet5, self).__init__()
self.fc = paddle.nn.Linear(in_features=1024, out_features=10)
self.relu = paddle.nn.ReLU()
self.fc1 = paddle.nn.Linear(in_features=10, out_features=10)
def forward(self, x):
out = paddle.flatten(x, 1)
out = self.fc(out)
out = self.relu(out)
out = self.fc1(out)
return out
# set device
paddle.set_device('custom_cpu')
# model
model = LeNet5()
# cost and optimizer
cost = paddle.nn.CrossEntropyLoss()
optimizer = paddle.optimizer.Adam(learning_rate=0.001,
parameters=model.parameters())
# convert to static model
scaler = paddle.amp.GradScaler(init_loss_scaling=1024)
model, optimizer = paddle.amp.decorate(models=model,
optimizers=optimizer,
level='O1')
# data loader
transform = paddle.vision.transforms.Compose([
paddle.vision.transforms.Resize((32, 32)),
paddle.vision.transforms.ToTensor(),
paddle.vision.transforms.Normalize(mean=(0.1307, ), std=(0.3081, ))
])
train_dataset = paddle.vision.datasets.MNIST(mode='train',
transform=transform,
download=True)
test_dataset = paddle.vision.datasets.MNIST(mode='test',
transform=transform,
download=True)
train_loader = paddle.io.DataLoader(train_dataset,
batch_size=BATCH_SIZE,
shuffle=True,
drop_last=True,
num_workers=2)
test_loader = paddle.io.DataLoader(test_dataset,
batch_size=BATCH_SIZE,
shuffle=True,
drop_last=True,
num_workers=2)
# train and eval
for epoch_id in range(EPOCH_NUM):
train_func_ampo1(epoch_id, train_loader, model, cost, optimizer,
scaler)
test_func(epoch_id, test_loader, model, cost)
if __name__ == '__main__':
if os.name == 'nt' or sys.platform.startswith('darwin'):
# only support Linux now
exit()
unittest.main()
......@@ -16,17 +16,20 @@
set -e
rm -rf PaddleCustomDevice && \
git clone ${PLUGIN_URL} \
&& pushd PaddleCustomDevice/backends/custom_cpu \
temp_dir=$(mktemp --directory)
pushd ${temp_dir} \
&& git clone ${PLUGIN_URL} \
&& pushd PaddleCustomDevice/ \
&& git fetch origin \
&& git checkout ${PLUGIN_TAG} -b dev \
&& mkdir build && pushd build && cmake .. && make -j8 && popd && popd
&& pushd backends/custom_cpu \
&& mkdir build && pushd build && cmake .. && make -j8 && popd && popd && popd && popd
echo "begin test use custom_cpu"
export FLAGS_selected_custom_cpus=0,1
export CUSTOM_CPU_VISIBLE_DEVICES=0,1
export CUSTOM_DEVICE_ROOT=PaddleCustomDevice/backends/custom_cpu/build
export CUSTOM_DEVICE_ROOT=${temp_dir}/PaddleCustomDevice/backends/custom_cpu/build
distributed_args="--devices=0,1"
python -m paddle.distributed.fleet.launch ${distributed_args} custom_device_multi_process_collective.py fleetlaunch_custom_cpu
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册