From 08d233f91410d394cded8d1f46cddd3b9b510ef4 Mon Sep 17 00:00:00 2001 From: ronnywang Date: Wed, 12 Oct 2022 11:00:11 +0800 Subject: [PATCH] cherry pick pr46536 (#46901) cherry pick pr46536 --- .../fluid/tests/custom_runtime/CMakeLists.txt | 10 +- .../test_collective_process_group_xccl.py | 31 ++- .../custom_runtime/test_custom_cpu_plugin.py | 22 +- .../test_custom_cpu_profiler_plugin.py | 23 +- .../test_custom_cpu_to_static.py | 252 ++++++++++++++++++ .../test_fleet_launch_custom_device.sh | 13 +- 6 files changed, 314 insertions(+), 37 deletions(-) create mode 100644 python/paddle/fluid/tests/custom_runtime/test_custom_cpu_to_static.py diff --git a/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt b/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt index 820e2b357aa..367d1e63990 100644 --- a/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt +++ b/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt @@ -1,6 +1,6 @@ if(WITH_CUSTOM_DEVICE AND NOT WITH_GPU) set(PLUGIN_URL https://github.com/PaddlePaddle/PaddleCustomDevice.git) - set(PLUGIN_TAG b9ae8452f31525d0524810461b17856838acd821) + set(PLUGIN_TAG 0698428ddba21e6baecb690579f37c48896f7d56) file( GLOB TEST_OPS @@ -8,10 +8,10 @@ if(WITH_CUSTOM_DEVICE AND NOT WITH_GPU) "test_*.py") string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}") - list(REMOVE_ITEM TEST_OPS test_collective_process_group_xccl) foreach(TEST_OP ${TEST_OPS}) - py_test(${TEST_OP} SRCS ${TEST_OP}.py ENVS PLUGIN_URL=${PLUGIN_URL} - PLUGIN_TAG=${PLUGIN_TAG}) + py_test(${TEST_OP} + SRCS ${TEST_OP}.py ENVS FLAGS_allocator_strategy=naive_best_fit + PLUGIN_URL=${PLUGIN_URL} PLUGIN_TAG=${PLUGIN_TAG}) endforeach() bash_test_modules( @@ -19,6 +19,7 @@ if(WITH_CUSTOM_DEVICE AND NOT WITH_GPU) START_BASH test_fleet_launch_custom_device.sh ENVS + FLAGS_allocator_strategy=naive_best_fit PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR} PLUGIN_URL=${PLUGIN_URL} PLUGIN_TAG=${PLUGIN_TAG}) @@ -26,4 +27,5 @@ if(WITH_CUSTOM_DEVICE AND NOT WITH_GPU) set_tests_properties(test_custom_cpu_plugin PROPERTIES TIMEOUT 120) set_tests_properties(test_custom_cpu_profiler_plugin PROPERTIES TIMEOUT 120) set_tests_properties(test_fleet_launch_custom_device PROPERTIES TIMEOUT 120) + set_tests_properties(test_custom_cpu_to_static PROPERTIES TIMEOUT 120) endif() diff --git a/python/paddle/fluid/tests/custom_runtime/test_collective_process_group_xccl.py b/python/paddle/fluid/tests/custom_runtime/test_collective_process_group_xccl.py index 01f39a39144..1127352d85d 100644 --- a/python/paddle/fluid/tests/custom_runtime/test_collective_process_group_xccl.py +++ b/python/paddle/fluid/tests/custom_runtime/test_collective_process_group_xccl.py @@ -12,14 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from __future__ import print_function - import unittest import os -import sys import copy import subprocess import time +import tempfile def start_local_trainers(cluster, @@ -28,7 +26,7 @@ def start_local_trainers(cluster, training_script_args, eager_mode=True, log_dir=None): - from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc + from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc # noqa: F401 current_env = copy.copy(os.environ.copy()) #paddle broadcast ncclUniqueId use socket, and @@ -84,7 +82,7 @@ def start_local_trainers(cluster, def get_cluster_from_args(selected_gpus): - from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc + from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc # noqa: F401 cluster_node_ips = '127.0.0.1' node_ip = '127.0.0.1' @@ -108,7 +106,7 @@ def get_cluster_from_args(selected_gpus): class TestMultipleCustomCPU(unittest.TestCase): def run_mnist_2custom_cpu(self, target_file_name, eager_mode=True): - from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc + from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc # noqa: F401 selected_devices = [0, 1] cluster = None @@ -136,21 +134,32 @@ class TestProcessGroup(TestMultipleCustomCPU): def setUp(self): # compile so and set to current path cur_dir = os.path.dirname(os.path.abspath(__file__)) - cmd = 'rm -rf PaddleCustomDevice \ + self.temp_dir = tempfile.TemporaryDirectory() + cmd = 'cd {} \ && git clone {} \ - && cd PaddleCustomDevice/backends/custom_cpu \ + && cd PaddleCustomDevice \ + && git fetch origin \ && git checkout {} -b dev \ + && cd backends/custom_cpu \ && mkdir build && cd build && cmake .. 
&& make -j8'.format( - os.getenv('PLUGIN_URL'), os.getenv('PLUGIN_TAG')) + self.temp_dir.name, os.getenv('PLUGIN_URL'), + os.getenv('PLUGIN_TAG')) os.system(cmd) # set environment for loading and registering compiled custom kernels # only valid in current process os.environ['CUSTOM_DEVICE_ROOT'] = os.path.join( - cur_dir, 'PaddleCustomDevice/backends/custom_cpu/build') + cur_dir, '{}/PaddleCustomDevice/backends/custom_cpu/build'.format( + self.temp_dir.name)) + os.environ['FLAGS_selected_custom_cpus'] = '0,1' + os.environ['CUSTOM_CPU_VISIBLE_DEVICES'] = '0,1' + os.environ['PADDLE_XCCL_BACKEND'] = 'custom_cpu' + + def tearDown(self): + self.temp_dir.cleanup() def test_process_group_xccl(self): - from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc + from paddle.distributed.utils.launch_utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc # noqa: F401 self.run_mnist_2custom_cpu('process_group_xccl.py') diff --git a/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_plugin.py b/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_plugin.py index 371f0018a0f..79e3e506b90 100644 --- a/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_plugin.py +++ b/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_plugin.py @@ -14,9 +14,9 @@ import os import sys -import site import unittest import numpy as np +import tempfile class TestCustomCPUPlugin(unittest.TestCase): @@ -24,18 +24,27 @@ class TestCustomCPUPlugin(unittest.TestCase): def setUp(self): # compile so and set to current path cur_dir = os.path.dirname(os.path.abspath(__file__)) - cmd = 'rm -rf PaddleCustomDevice \ + self.temp_dir = tempfile.TemporaryDirectory() + cmd = 'cd {} \ && git clone {} \ - && cd PaddleCustomDevice/backends/custom_cpu \ + && cd PaddleCustomDevice \ + && git fetch origin \ && git checkout {} -b dev \ + && cd backends/custom_cpu \ && mkdir build && cd build && cmake .. 
&& make -j8'.format( - os.getenv('PLUGIN_URL'), os.getenv('PLUGIN_TAG')) + self.temp_dir.name, os.getenv('PLUGIN_URL'), + os.getenv('PLUGIN_TAG')) os.system(cmd) # set environment for loading and registering compiled custom kernels # only valid in current process os.environ['CUSTOM_DEVICE_ROOT'] = os.path.join( - cur_dir, 'PaddleCustomDevice/backends/custom_cpu/build') + cur_dir, '{}/PaddleCustomDevice/backends/custom_cpu/build'.format( + self.temp_dir.name)) + + def tearDown(self): + self.temp_dir.cleanup() + del os.environ['CUSTOM_DEVICE_ROOT'] def test_custom_device(self): import paddle @@ -183,9 +192,6 @@ class TestCustomCPUPlugin(unittest.TestCase): k_t = paddle.to_tensor([3], dtype="int32") value_1, indices_1 = paddle.topk(data_1, k=k_t) - def tearDown(self): - del os.environ['CUSTOM_DEVICE_ROOT'] - if __name__ == '__main__': if os.name == 'nt' or sys.platform.startswith('darwin'): diff --git a/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_profiler_plugin.py b/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_profiler_plugin.py index 34bdb067c67..2e307fbb826 100644 --- a/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_profiler_plugin.py +++ b/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_profiler_plugin.py @@ -14,9 +14,8 @@ import os import sys -import site import unittest -import numpy as np +import tempfile class TestCustomCPUProfilerPlugin(unittest.TestCase): @@ -24,18 +23,27 @@ class TestCustomCPUProfilerPlugin(unittest.TestCase): def setUp(self): # compile so and set to current path cur_dir = os.path.dirname(os.path.abspath(__file__)) - cmd = 'rm -rf PaddleCustomDevice \ + self.temp_dir = tempfile.TemporaryDirectory() + cmd = 'cd {} \ && git clone {} \ - && cd PaddleCustomDevice/backends/custom_cpu \ + && cd PaddleCustomDevice \ + && git fetch origin \ && git checkout {} -b dev \ + && cd backends/custom_cpu \ && mkdir build && cd build && cmake .. && make -j8'.format( - os.getenv('PLUGIN_URL'), os.getenv('PLUGIN_TAG')) + self.temp_dir.name, os.getenv('PLUGIN_URL'), + os.getenv('PLUGIN_TAG')) os.system(cmd) # set environment for loading and registering compiled custom kernels # only valid in current process os.environ['CUSTOM_DEVICE_ROOT'] = os.path.join( - cur_dir, 'PaddleCustomDevice/backends/custom_cpu/build') + cur_dir, '{}/PaddleCustomDevice/backends/custom_cpu/build'.format( + self.temp_dir.name)) + + def tearDown(self): + self.temp_dir.cleanup() + del os.environ['CUSTOM_DEVICE_ROOT'] def test_custom_device(self): import paddle @@ -59,9 +67,6 @@ class TestCustomCPUProfilerPlugin(unittest.TestCase): p.stop() p.summary() - def tearDown(self): - del os.environ['CUSTOM_DEVICE_ROOT'] - if __name__ == '__main__': if os.name == 'nt' or sys.platform.startswith('darwin'): diff --git a/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_to_static.py b/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_to_static.py new file mode 100644 index 00000000000..e5e9638a0b5 --- /dev/null +++ b/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_to_static.py @@ -0,0 +1,252 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+import time
+import unittest
+import numpy as np
+import tempfile
+
+EPOCH_NUM = 1
+BATCH_SIZE = 1024
+
+
+def train_func_base(epoch_id, train_loader, model, cost, optimizer):
+
+    total_step = len(train_loader)
+    epoch_start = time.time()
+    for batch_id, (images, labels) in enumerate(train_loader()):
+        # forward
+        outputs = model(images)
+        loss = cost(outputs, labels)
+        # backward and optimize
+        loss.backward()
+        optimizer.step()
+        optimizer.clear_grad()
+        print("Epoch [{}/{}], Step [{}/{}], Loss: {}".format(
+            epoch_id + 1, EPOCH_NUM, batch_id + 1, total_step, loss.numpy()))
+    epoch_end = time.time()
+    print(
+        f"Epoch ID: {epoch_id+1}, FP32 train epoch time: {(epoch_end - epoch_start) * 1000} ms"
+    )
+
+
+def train_func_ampo1(epoch_id, train_loader, model, cost, optimizer, scaler):
+    import paddle
+
+    total_step = len(train_loader)
+    epoch_start = time.time()
+    for batch_id, (images, labels) in enumerate(train_loader()):
+        # forward
+        with paddle.amp.auto_cast(
+                custom_black_list={"flatten_contiguous_range", "greater_than"},
+                level='O1'):
+            outputs = model(images)
+            loss = cost(outputs, labels)
+        # backward and optimize
+        scaled = scaler.scale(loss)
+        scaled.backward()
+        scaler.minimize(optimizer, scaled)
+        optimizer.clear_grad()
+        print("Epoch [{}/{}], Step [{}/{}], Loss: {}".format(
+            epoch_id + 1, EPOCH_NUM, batch_id + 1, total_step, loss.numpy()))
+    epoch_end = time.time()
+    print(
+        f"Epoch ID: {epoch_id+1}, AMP O1 train epoch time: {(epoch_end - epoch_start) * 1000} ms"
+    )
+
+
+def test_func(epoch_id, test_loader, model, cost):
+    import paddle
+
+    # evaluate after each training epoch
+    model.eval()
+    avg_acc = [[], []]
+    for batch_id, (images, labels) in enumerate(test_loader()):
+        # forward
+        outputs = model(images)
+        loss = cost(outputs, labels)
+        # accuracy
+        acc_top1 = paddle.metric.accuracy(input=outputs, label=labels, k=1)
+        acc_top5 = paddle.metric.accuracy(input=outputs, label=labels, k=5)
+        avg_acc[0].append(acc_top1.numpy())
+        avg_acc[1].append(acc_top5.numpy())
+    model.train()
+    print(
+        f"Epoch ID: {epoch_id+1}, Top1 accuracy: {np.array(avg_acc[0]).mean()}, Top5 accuracy: {np.array(avg_acc[1]).mean()}"
+    )
+
+
+class TestCustomCPUPlugin(unittest.TestCase):
+
+    def setUp(self):
+        # compile so and set to current path
+        cur_dir = os.path.dirname(os.path.abspath(__file__))
+        self.temp_dir = tempfile.TemporaryDirectory()
+        cmd = 'cd {} \
+            && git clone {} \
+            && cd PaddleCustomDevice \
+            && git fetch origin \
+            && git checkout {} -b dev \
+            && cd backends/custom_cpu \
+            && mkdir build && cd build && cmake .. && make -j8'.format(
+            self.temp_dir.name, os.getenv('PLUGIN_URL'),
+            os.getenv('PLUGIN_TAG'))
+        os.system(cmd)
+
+        # set environment for loading and registering compiled custom kernels
+        # only valid in current process
+        os.environ['CUSTOM_DEVICE_ROOT'] = os.path.join(
+            cur_dir, '{}/PaddleCustomDevice/backends/custom_cpu/build'.format(
+                self.temp_dir.name))
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
+
+    def test_custom_cpu_plugin(self):
+        self._test_to_static()
+        self._test_amp_o1()
+
+    def _test_to_static(self):
+        import paddle
+
+        class LeNet5(paddle.nn.Layer):
+
+            def __init__(self):
+                super(LeNet5, self).__init__()
+                self.fc = paddle.nn.Linear(in_features=1024, out_features=10)
+                self.relu = paddle.nn.ReLU()
+                self.fc1 = paddle.nn.Linear(in_features=10, out_features=10)
+
+            def forward(self, x):
+                out = paddle.flatten(x, 1)
+                out = self.fc(out)
+                out = self.relu(out)
+                out = self.fc1(out)
+                return out
+
+        # set device
+        paddle.set_device('custom_cpu')
+
+        # model
+        model = LeNet5()
+
+        # cost and optimizer
+        cost = paddle.nn.CrossEntropyLoss()
+        optimizer = paddle.optimizer.Adam(learning_rate=0.001,
+                                          parameters=model.parameters())
+
+        # convert to static model
+        build_strategy = paddle.static.BuildStrategy()
+        mnist = paddle.jit.to_static(model, build_strategy=build_strategy)
+
+        # data loader
+        transform = paddle.vision.transforms.Compose([
+            paddle.vision.transforms.Resize((32, 32)),
+            paddle.vision.transforms.ToTensor(),
+            paddle.vision.transforms.Normalize(mean=(0.1307, ), std=(0.3081, ))
+        ])
+        train_dataset = paddle.vision.datasets.MNIST(mode='train',
+                                                     transform=transform,
+                                                     download=True)
+        test_dataset = paddle.vision.datasets.MNIST(mode='test',
+                                                    transform=transform,
+                                                    download=True)
+        train_loader = paddle.io.DataLoader(train_dataset,
+                                            batch_size=BATCH_SIZE,
+                                            shuffle=True,
+                                            drop_last=True,
+                                            num_workers=2)
+        test_loader = paddle.io.DataLoader(test_dataset,
+                                           batch_size=BATCH_SIZE,
+                                           shuffle=True,
+                                           drop_last=True,
+                                           num_workers=2)
+
+        # train and eval the statically-converted model
+        for epoch_id in range(EPOCH_NUM):
+            train_func_base(epoch_id, train_loader, mnist, cost, optimizer)
+            test_func(epoch_id, test_loader, mnist, cost)
+
+    def _test_amp_o1(self):
+        import paddle
+
+        class LeNet5(paddle.nn.Layer):
+
+            def __init__(self):
+                super(LeNet5, self).__init__()
+                self.fc = paddle.nn.Linear(in_features=1024, out_features=10)
+                self.relu = paddle.nn.ReLU()
+                self.fc1 = paddle.nn.Linear(in_features=10, out_features=10)
+
+            def forward(self, x):
+                out = paddle.flatten(x, 1)
+                out = self.fc(out)
+                out = self.relu(out)
+                out = self.fc1(out)
+                return out
+
+        # set device
+        paddle.set_device('custom_cpu')
+
+        # model
+        model = LeNet5()
+
+        # cost and optimizer
+        cost = paddle.nn.CrossEntropyLoss()
+        optimizer = paddle.optimizer.Adam(learning_rate=0.001,
+                                          parameters=model.parameters())
+
+        # decorate model and optimizer for AMP O1 training
+        scaler = paddle.amp.GradScaler(init_loss_scaling=1024)
+        model, optimizer = paddle.amp.decorate(models=model,
+                                               optimizers=optimizer,
+                                               level='O1')
+
+        # data loader
+        transform = paddle.vision.transforms.Compose([
+            paddle.vision.transforms.Resize((32, 32)),
+            paddle.vision.transforms.ToTensor(),
+            paddle.vision.transforms.Normalize(mean=(0.1307, ), std=(0.3081, ))
+        ])
+        train_dataset = paddle.vision.datasets.MNIST(mode='train',
+                                                     transform=transform,
+                                                     download=True)
+        test_dataset = paddle.vision.datasets.MNIST(mode='test',
+                                                    transform=transform,
+                                                    download=True)
+        train_loader = paddle.io.DataLoader(train_dataset,
+                                            batch_size=BATCH_SIZE,
+                                            shuffle=True,
+                                            drop_last=True,
+                                            num_workers=2)
+        test_loader = paddle.io.DataLoader(test_dataset,
+                                           batch_size=BATCH_SIZE,
+                                           shuffle=True,
+                                           drop_last=True,
+                                           num_workers=2)
+
+        # train and eval
+        for epoch_id in range(EPOCH_NUM):
+            train_func_ampo1(epoch_id, train_loader, model, cost, optimizer,
+                             scaler)
+            test_func(epoch_id, test_loader, model, cost)
+
+
+if __name__ == '__main__':
+    if os.name == 'nt' or sys.platform.startswith('darwin'):
+        # only support Linux now
+        exit()
+    unittest.main()
diff --git a/python/paddle/fluid/tests/custom_runtime/test_fleet_launch_custom_device.sh b/python/paddle/fluid/tests/custom_runtime/test_fleet_launch_custom_device.sh
index 5570c629dd9..5269cd32120 100644
--- a/python/paddle/fluid/tests/custom_runtime/test_fleet_launch_custom_device.sh
+++ b/python/paddle/fluid/tests/custom_runtime/test_fleet_launch_custom_device.sh
@@ -16,17 +16,20 @@
 
 set -e
 
-rm -rf PaddleCustomDevice && \
-git clone ${PLUGIN_URL} \
-&& pushd PaddleCustomDevice/backends/custom_cpu \
+temp_dir=$(mktemp --directory)
+pushd ${temp_dir} \
+&& git clone ${PLUGIN_URL} \
+&& pushd PaddleCustomDevice/ \
+&& git fetch origin \
 && git checkout ${PLUGIN_TAG} -b dev \
-&& mkdir build && pushd build && cmake .. && make -j8 && popd && popd
+&& pushd backends/custom_cpu \
+&& mkdir build && pushd build && cmake .. && make -j8 && popd && popd && popd && popd
 
 echo "begin test use custom_cpu"
 
 export FLAGS_selected_custom_cpus=0,1
 export CUSTOM_CPU_VISIBLE_DEVICES=0,1
-export CUSTOM_DEVICE_ROOT=PaddleCustomDevice/backends/custom_cpu/build
+export CUSTOM_DEVICE_ROOT=${temp_dir}/PaddleCustomDevice/backends/custom_cpu/build
 
 distributed_args="--devices=0,1"
 python -m paddle.distributed.fleet.launch ${distributed_args} custom_device_multi_process_collective.py fleetlaunch_custom_cpu
-- 
GitLab
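
Note (not part of the patch): every test touched above relies on the same plugin-loading contract — CUSTOM_DEVICE_ROOT must point at the build directory containing the compiled plugin .so before `import paddle` runs, which is why the tests export the variable first (in setUp() or the shell script) and only import paddle inside the test bodies. A minimal standalone sketch of that contract, with a placeholder path standing in for the temporary build directory:

    import os

    # CUSTOM_DEVICE_ROOT must be set before paddle is imported; at import
    # time Paddle scans this directory, loads the plugin .so it finds, and
    # registers the 'custom_cpu' device type. The path is a placeholder.
    os.environ['CUSTOM_DEVICE_ROOT'] = '/tmp/plugin/PaddleCustomDevice/backends/custom_cpu/build'

    import paddle

    # Smoke test: run one op on the plugin device to confirm that kernels
    # dispatch through the plugin rather than the native CPU backend.
    paddle.set_device('custom_cpu')
    x = paddle.to_tensor([1.0, 2.0, 3.0])
    print((x + x).numpy())  # expected: [2. 4. 6.]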