diff --git a/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt b/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt index 820e2b357aaf5d66836fb7bcbc0a196fd477a433..367d1e6399032f77ac06618b9740ef93742f3779 100644 --- a/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt +++ b/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt @@ -1,6 +1,6 @@ if(WITH_CUSTOM_DEVICE AND NOT WITH_GPU) set(PLUGIN_URL https://github.com/PaddlePaddle/PaddleCustomDevice.git) - set(PLUGIN_TAG b9ae8452f31525d0524810461b17856838acd821) + set(PLUGIN_TAG 0698428ddba21e6baecb690579f37c48896f7d56) file( GLOB TEST_OPS @@ -8,10 +8,10 @@ if(WITH_CUSTOM_DEVICE AND NOT WITH_GPU) "test_*.py") string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}") - list(REMOVE_ITEM TEST_OPS test_collective_process_group_xccl) foreach(TEST_OP ${TEST_OPS}) - py_test(${TEST_OP} SRCS ${TEST_OP}.py ENVS PLUGIN_URL=${PLUGIN_URL} - PLUGIN_TAG=${PLUGIN_TAG}) + py_test(${TEST_OP} + SRCS ${TEST_OP}.py ENVS FLAGS_allocator_strategy=naive_best_fit + PLUGIN_URL=${PLUGIN_URL} PLUGIN_TAG=${PLUGIN_TAG}) endforeach() bash_test_modules( @@ -19,6 +19,7 @@ if(WITH_CUSTOM_DEVICE AND NOT WITH_GPU) START_BASH test_fleet_launch_custom_device.sh ENVS + FLAGS_allocator_strategy=naive_best_fit PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR} PLUGIN_URL=${PLUGIN_URL} PLUGIN_TAG=${PLUGIN_TAG}) @@ -26,4 +27,5 @@ if(WITH_CUSTOM_DEVICE AND NOT WITH_GPU) set_tests_properties(test_custom_cpu_plugin PROPERTIES TIMEOUT 120) set_tests_properties(test_custom_cpu_profiler_plugin PROPERTIES TIMEOUT 120) set_tests_properties(test_fleet_launch_custom_device PROPERTIES TIMEOUT 120) + set_tests_properties(test_custom_cpu_to_static PROPERTIES TIMEOUT 120) endif() diff --git a/python/paddle/fluid/tests/custom_runtime/test_collective_process_group_xccl.py b/python/paddle/fluid/tests/custom_runtime/test_collective_process_group_xccl.py index 36706310c68db7577d9fc089030300e7a54d5670..88b7bb8fd438059ead1ef435398064faa53c2d0e 100644 --- a/python/paddle/fluid/tests/custom_runtime/test_collective_process_group_xccl.py +++ b/python/paddle/fluid/tests/custom_runtime/test_collective_process_group_xccl.py @@ -18,6 +18,7 @@ import sys import copy import subprocess import time +import tempfile def start_local_trainers(cluster, @@ -134,18 +135,29 @@ class TestProcessGroup(TestMultipleCustomCPU): def setUp(self): # compile so and set to current path cur_dir = os.path.dirname(os.path.abspath(__file__)) - cmd = 'rm -rf PaddleCustomDevice \ + self.temp_dir = tempfile.TemporaryDirectory() + cmd = 'cd {} \ && git clone {} \ - && cd PaddleCustomDevice/backends/custom_cpu \ + && cd PaddleCustomDevice \ + && git fetch origin \ && git checkout {} -b dev \ + && cd backends/custom_cpu \ && mkdir build && cd build && cmake .. && make -j8'.format( - os.getenv('PLUGIN_URL'), os.getenv('PLUGIN_TAG')) + self.temp_dir.name, os.getenv('PLUGIN_URL'), + os.getenv('PLUGIN_TAG')) os.system(cmd) # set environment for loading and registering compiled custom kernels # only valid in current process os.environ['CUSTOM_DEVICE_ROOT'] = os.path.join( - cur_dir, 'PaddleCustomDevice/backends/custom_cpu/build') + cur_dir, '{}/PaddleCustomDevice/backends/custom_cpu/build'.format( + self.temp_dir.name)) + os.environ['FLAGS_selected_custom_cpus'] = '0,1' + os.environ['CUSTOM_CPU_VISIBLE_DEVICES'] = '0,1' + os.environ['PADDLE_XCCL_BACKEND'] = 'custom_cpu' + + def tearDown(self): + self.temp_dir.cleanup() def test_process_group_xccl(self): from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc diff --git a/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_plugin.py b/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_plugin.py index 371f0018a0f8d0403485e778a34c2763024efc8c..1bba2e73c0895d95b77b2eb04b5969ba8f5e2c26 100644 --- a/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_plugin.py +++ b/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_plugin.py @@ -17,6 +17,7 @@ import sys import site import unittest import numpy as np +import tempfile class TestCustomCPUPlugin(unittest.TestCase): @@ -24,18 +25,27 @@ class TestCustomCPUPlugin(unittest.TestCase): def setUp(self): # compile so and set to current path cur_dir = os.path.dirname(os.path.abspath(__file__)) - cmd = 'rm -rf PaddleCustomDevice \ + self.temp_dir = tempfile.TemporaryDirectory() + cmd = 'cd {} \ && git clone {} \ - && cd PaddleCustomDevice/backends/custom_cpu \ + && cd PaddleCustomDevice \ + && git fetch origin \ && git checkout {} -b dev \ + && cd backends/custom_cpu \ && mkdir build && cd build && cmake .. && make -j8'.format( - os.getenv('PLUGIN_URL'), os.getenv('PLUGIN_TAG')) + self.temp_dir.name, os.getenv('PLUGIN_URL'), + os.getenv('PLUGIN_TAG')) os.system(cmd) # set environment for loading and registering compiled custom kernels # only valid in current process os.environ['CUSTOM_DEVICE_ROOT'] = os.path.join( - cur_dir, 'PaddleCustomDevice/backends/custom_cpu/build') + cur_dir, '{}/PaddleCustomDevice/backends/custom_cpu/build'.format( + self.temp_dir.name)) + + def tearDown(self): + self.temp_dir.cleanup() + del os.environ['CUSTOM_DEVICE_ROOT'] def test_custom_device(self): import paddle @@ -183,9 +193,6 @@ class TestCustomCPUPlugin(unittest.TestCase): k_t = paddle.to_tensor([3], dtype="int32") value_1, indices_1 = paddle.topk(data_1, k=k_t) - def tearDown(self): - del os.environ['CUSTOM_DEVICE_ROOT'] - if __name__ == '__main__': if os.name == 'nt' or sys.platform.startswith('darwin'): diff --git a/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_profiler_plugin.py b/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_profiler_plugin.py index 34bdb067c67c5d2e3ebaf0b5168d8ab5684d924c..83c846f2ba092190fc2630e0f83ba78ce35e526b 100644 --- a/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_profiler_plugin.py +++ b/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_profiler_plugin.py @@ -17,6 +17,7 @@ import sys import site import unittest import numpy as np +import tempfile class TestCustomCPUProfilerPlugin(unittest.TestCase): @@ -24,18 +25,27 @@ class TestCustomCPUProfilerPlugin(unittest.TestCase): def setUp(self): # compile so and set to current path cur_dir = os.path.dirname(os.path.abspath(__file__)) - cmd = 'rm -rf PaddleCustomDevice \ + self.temp_dir = tempfile.TemporaryDirectory() + cmd = 'cd {} \ && git clone {} \ - && cd PaddleCustomDevice/backends/custom_cpu \ + && cd PaddleCustomDevice \ + && git fetch origin \ && git checkout {} -b dev \ + && cd backends/custom_cpu \ && mkdir build && cd build && cmake .. && make -j8'.format( - os.getenv('PLUGIN_URL'), os.getenv('PLUGIN_TAG')) + self.temp_dir.name, os.getenv('PLUGIN_URL'), + os.getenv('PLUGIN_TAG')) os.system(cmd) # set environment for loading and registering compiled custom kernels # only valid in current process os.environ['CUSTOM_DEVICE_ROOT'] = os.path.join( - cur_dir, 'PaddleCustomDevice/backends/custom_cpu/build') + cur_dir, '{}/PaddleCustomDevice/backends/custom_cpu/build'.format( + self.temp_dir.name)) + + def tearDown(self): + self.temp_dir.cleanup() + del os.environ['CUSTOM_DEVICE_ROOT'] def test_custom_device(self): import paddle @@ -59,9 +69,6 @@ class TestCustomCPUProfilerPlugin(unittest.TestCase): p.stop() p.summary() - def tearDown(self): - del os.environ['CUSTOM_DEVICE_ROOT'] - if __name__ == '__main__': if os.name == 'nt' or sys.platform.startswith('darwin'): diff --git a/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_to_static.py b/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_to_static.py new file mode 100644 index 0000000000000000000000000000000000000000..626b4a6c05fbae214347ac3a05368eba920d0d02 --- /dev/null +++ b/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_to_static.py @@ -0,0 +1,253 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +import time +import unittest +import numpy as np +import tempfile + +EPOCH_NUM = 1 +BATCH_SIZE = 1024 + + +def train_func_base(epoch_id, train_loader, model, cost, optimizer): + import paddle + + total_step = len(train_loader) + epoch_start = time.time() + for batch_id, (images, labels) in enumerate(train_loader()): + # forward + outputs = model(images) + loss = cost(outputs, labels) + # backward and optimize + loss.backward() + optimizer.step() + optimizer.clear_grad() + print("Epoch [{}/{}], Step [{}/{}], Loss: {}".format( + epoch_id + 1, EPOCH_NUM, batch_id + 1, total_step, loss.numpy())) + epoch_end = time.time() + print( + f"Epoch ID: {epoch_id+1}, FP32 train epoch time: {(epoch_end - epoch_start) * 1000} ms" + ) + + +def train_func_ampo1(epoch_id, train_loader, model, cost, optimizer, scaler): + import paddle + + total_step = len(train_loader) + epoch_start = time.time() + for batch_id, (images, labels) in enumerate(train_loader()): + # forward + with paddle.amp.auto_cast( + custom_black_list={"flatten_contiguous_range", "greater_than"}, + level='O1'): + outputs = model(images) + loss = cost(outputs, labels) + # backward and optimize + scaled = scaler.scale(loss) + scaled.backward() + scaler.minimize(optimizer, scaled) + optimizer.clear_grad() + print("Epoch [{}/{}], Step [{}/{}], Loss: {}".format( + epoch_id + 1, EPOCH_NUM, batch_id + 1, total_step, loss.numpy())) + epoch_end = time.time() + print( + f"Epoch ID: {epoch_id+1}, AMPO1 train epoch time: {(epoch_end - epoch_start) * 1000} ms" + ) + + +def test_func(epoch_id, test_loader, model, cost): + import paddle + + # evaluation every epoch finish + model.eval() + avg_acc = [[], []] + for batch_id, (images, labels) in enumerate(test_loader()): + # forward + outputs = model(images) + loss = cost(outputs, labels) + # accuracy + acc_top1 = paddle.metric.accuracy(input=outputs, label=labels, k=1) + acc_top5 = paddle.metric.accuracy(input=outputs, label=labels, k=5) + avg_acc[0].append(acc_top1.numpy()) + avg_acc[1].append(acc_top5.numpy()) + model.train() + print( + f"Epoch ID: {epoch_id+1}, Top1 accurary: {np.array(avg_acc[0]).mean()}, Top5 accurary: {np.array(avg_acc[1]).mean()}" + ) + + +class TestCustomCPUPlugin(unittest.TestCase): + + def setUp(self): + # compile so and set to current path + cur_dir = os.path.dirname(os.path.abspath(__file__)) + self.temp_dir = tempfile.TemporaryDirectory() + cmd = 'cd {} \ + && git clone {} \ + && cd PaddleCustomDevice \ + && git fetch origin \ + && git checkout {} -b dev \ + && cd backends/custom_cpu \ + && mkdir build && cd build && cmake .. && make -j8'.format( + self.temp_dir.name, os.getenv('PLUGIN_URL'), + os.getenv('PLUGIN_TAG')) + os.system(cmd) + + # set environment for loading and registering compiled custom kernels + # only valid in current process + os.environ['CUSTOM_DEVICE_ROOT'] = os.path.join( + cur_dir, '{}/PaddleCustomDevice/backends/custom_cpu/build'.format( + self.temp_dir.name)) + + def tearDown(self): + self.temp_dir.cleanup() + + def test_custom_cpu_plugin(self): + self._test_to_static() + self._test_amp_o1() + + def _test_to_static(self): + import paddle + + class LeNet5(paddle.nn.Layer): + + def __init__(self): + super(LeNet5, self).__init__() + self.fc = paddle.nn.Linear(in_features=1024, out_features=10) + self.relu = paddle.nn.ReLU() + self.fc1 = paddle.nn.Linear(in_features=10, out_features=10) + + def forward(self, x): + out = paddle.flatten(x, 1) + out = self.fc(out) + out = self.relu(out) + out = self.fc1(out) + return out + + # set device + paddle.set_device('custom_cpu') + + # model + model = LeNet5() + + # cost and optimizer + cost = paddle.nn.CrossEntropyLoss() + optimizer = paddle.optimizer.Adam(learning_rate=0.001, + parameters=model.parameters()) + + # convert to static model + build_strategy = paddle.static.BuildStrategy() + mnist = paddle.jit.to_static(model, build_strategy=build_strategy) + + # data loader + transform = paddle.vision.transforms.Compose([ + paddle.vision.transforms.Resize((32, 32)), + paddle.vision.transforms.ToTensor(), + paddle.vision.transforms.Normalize(mean=(0.1307, ), std=(0.3081, )) + ]) + train_dataset = paddle.vision.datasets.MNIST(mode='train', + transform=transform, + download=True) + test_dataset = paddle.vision.datasets.MNIST(mode='test', + transform=transform, + download=True) + train_loader = paddle.io.DataLoader(train_dataset, + batch_size=BATCH_SIZE, + shuffle=True, + drop_last=True, + num_workers=2) + test_loader = paddle.io.DataLoader(test_dataset, + batch_size=BATCH_SIZE, + shuffle=True, + drop_last=True, + num_workers=2) + + # train and eval + for epoch_id in range(EPOCH_NUM): + train_func_base(epoch_id, train_loader, model, cost, optimizer) + test_func(epoch_id, test_loader, model, cost) + + def _test_amp_o1(self): + import paddle + + class LeNet5(paddle.nn.Layer): + + def __init__(self): + super(LeNet5, self).__init__() + self.fc = paddle.nn.Linear(in_features=1024, out_features=10) + self.relu = paddle.nn.ReLU() + self.fc1 = paddle.nn.Linear(in_features=10, out_features=10) + + def forward(self, x): + out = paddle.flatten(x, 1) + out = self.fc(out) + out = self.relu(out) + out = self.fc1(out) + return out + + # set device + paddle.set_device('custom_cpu') + + # model + model = LeNet5() + + # cost and optimizer + cost = paddle.nn.CrossEntropyLoss() + optimizer = paddle.optimizer.Adam(learning_rate=0.001, + parameters=model.parameters()) + + # convert to static model + scaler = paddle.amp.GradScaler(init_loss_scaling=1024) + model, optimizer = paddle.amp.decorate(models=model, + optimizers=optimizer, + level='O1') + + # data loader + transform = paddle.vision.transforms.Compose([ + paddle.vision.transforms.Resize((32, 32)), + paddle.vision.transforms.ToTensor(), + paddle.vision.transforms.Normalize(mean=(0.1307, ), std=(0.3081, )) + ]) + train_dataset = paddle.vision.datasets.MNIST(mode='train', + transform=transform, + download=True) + test_dataset = paddle.vision.datasets.MNIST(mode='test', + transform=transform, + download=True) + train_loader = paddle.io.DataLoader(train_dataset, + batch_size=BATCH_SIZE, + shuffle=True, + drop_last=True, + num_workers=2) + test_loader = paddle.io.DataLoader(test_dataset, + batch_size=BATCH_SIZE, + shuffle=True, + drop_last=True, + num_workers=2) + + # train and eval + for epoch_id in range(EPOCH_NUM): + train_func_ampo1(epoch_id, train_loader, model, cost, optimizer, + scaler) + test_func(epoch_id, test_loader, model, cost) + + +if __name__ == '__main__': + if os.name == 'nt' or sys.platform.startswith('darwin'): + # only support Linux now + exit() + unittest.main() diff --git a/python/paddle/fluid/tests/custom_runtime/test_fleet_launch_custom_device.sh b/python/paddle/fluid/tests/custom_runtime/test_fleet_launch_custom_device.sh index 5570c629dd96545cedfa2dd52630c898c382c0ab..5269cd32120584b4b4ea7a63be7a888d36502c89 100644 --- a/python/paddle/fluid/tests/custom_runtime/test_fleet_launch_custom_device.sh +++ b/python/paddle/fluid/tests/custom_runtime/test_fleet_launch_custom_device.sh @@ -16,17 +16,20 @@ set -e -rm -rf PaddleCustomDevice && \ -git clone ${PLUGIN_URL} \ -&& pushd PaddleCustomDevice/backends/custom_cpu \ +temp_dir=$(mktemp --directory) +pushd ${temp_dir} \ +&& git clone ${PLUGIN_URL} \ +&& pushd PaddleCustomDevice/ \ +&& git fetch origin \ && git checkout ${PLUGIN_TAG} -b dev \ -&& mkdir build && pushd build && cmake .. && make -j8 && popd && popd +&& pushd backends/custom_cpu \ +&& mkdir build && pushd build && cmake .. && make -j8 && popd && popd && popd && popd echo "begin test use custom_cpu" export FLAGS_selected_custom_cpus=0,1 export CUSTOM_CPU_VISIBLE_DEVICES=0,1 -export CUSTOM_DEVICE_ROOT=PaddleCustomDevice/backends/custom_cpu/build +export CUSTOM_DEVICE_ROOT=${temp_dir}/PaddleCustomDevice/backends/custom_cpu/build distributed_args="--devices=0,1" python -m paddle.distributed.fleet.launch ${distributed_args} custom_device_multi_process_collective.py fleetlaunch_custom_cpu