From c722ee690dd75389bf000cd5435f5f4519c4b7a2 Mon Sep 17 00:00:00 2001 From: maxhuiy <1508399706@qq.com> Date: Tue, 8 Mar 2022 10:35:27 +0800 Subject: [PATCH] [MLU] add fleet init api and collective api pytest for mlu (#40010) * [MLU] add fleet init api and collective api pytest for mlu * fix no value for argument 'data_type' in method call --- python/paddle/distributed/collective.py | 4 + python/paddle/distributed/parallel.py | 14 +- python/paddle/fluid/dygraph/parallel.py | 3 + .../fluid/tests/unittests/mlu/CMakeLists.txt | 12 +- .../tests/unittests/mlu/c_comm_init_op_mlu.py | 71 ++++++ .../unittests/mlu/collective_allreduce_api.py | 54 +++++ .../unittests/mlu/collective_broadcast_api.py | 54 +++++ .../unittests/mlu/test_c_comm_init_op_mlu.sh | 21 ++ .../mlu/test_collective_allreduce_api_mlu.py | 43 ++++ .../mlu/test_collective_api_base_mlu.py | 223 ++++++++++++++++++ .../mlu/test_collective_broadcast_api_mlu.py | 43 ++++ 11 files changed, 535 insertions(+), 7 deletions(-) create mode 100644 python/paddle/fluid/tests/unittests/mlu/c_comm_init_op_mlu.py create mode 100644 python/paddle/fluid/tests/unittests/mlu/collective_allreduce_api.py create mode 100644 python/paddle/fluid/tests/unittests/mlu/collective_broadcast_api.py create mode 100644 python/paddle/fluid/tests/unittests/mlu/test_c_comm_init_op_mlu.sh create mode 100644 python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_api_mlu.py create mode 100644 python/paddle/fluid/tests/unittests/mlu/test_collective_api_base_mlu.py create mode 100644 python/paddle/fluid/tests/unittests/mlu/test_collective_broadcast_api_mlu.py diff --git a/python/paddle/distributed/collective.py b/python/paddle/distributed/collective.py index 8042aced6b..bf6556d21e 100644 --- a/python/paddle/distributed/collective.py +++ b/python/paddle/distributed/collective.py @@ -267,6 +267,10 @@ def new_group(ranks=None, backend=None): place = core.NPUPlace(genv.device_id) core.HCCLParallelContext(strategy, place).init_with_ring_id(ring_id) + elif core.is_compiled_with_mlu(): + place = core.MLUPlace(genv.device_id) + core.CNCLParallelContext(strategy, + place).init_with_ring_id(ring_id) else: assert False, ("no cuda device found") else: diff --git a/python/paddle/distributed/parallel.py b/python/paddle/distributed/parallel.py index 177e19194a..16ed528b64 100644 --- a/python/paddle/distributed/parallel.py +++ b/python/paddle/distributed/parallel.py @@ -58,9 +58,9 @@ def _start_kv_server(port, http_server_d, size): def _is_cpuonly(backend): check_backend(backend) - if backend in ['auto', 'nccl', 'bkcl', 'hccl', 'heter'] and ( + if backend in ['auto', 'nccl', 'bkcl', 'hccl', 'heter', 'cncl'] and ( core.is_compiled_with_cuda() or core.is_compiled_with_xpu() or - core.is_compiled_with_npu()): + core.is_compiled_with_npu() or core.is_compiled_with_mlu()): # passes 'auto' and can use cuda or xpu, use the default logics. so return False return False @@ -152,7 +152,8 @@ def init_parallel_env(): is_cpu_only = _is_cpuonly(backend) # 1. 
gpu xpu check, must be gpu or xpu, if not (is_cpu_only or core.is_compiled_with_cuda() or - core.is_compiled_with_xpu() or core.is_compiled_with_npu()): + core.is_compiled_with_xpu() or core.is_compiled_with_npu() or + core.is_compiled_with_mlu()): raise NotImplementedError( "If you want to use CPU-only version, please use 'gloo' as backend") @@ -162,6 +163,8 @@ def init_parallel_env(): _check_var_exists('FLAGS_selected_xpus') elif not is_cpu_only and core.is_compiled_with_npu(): _check_var_exists('FLAGS_selected_npus') + elif not is_cpu_only and core.is_compiled_with_mlu(): + _check_var_exists('FLAGS_selected_mlus') _check_var_exists("PADDLE_TRAINER_ID") _check_var_exists("PADDLE_CURRENT_ENDPOINT") @@ -213,6 +216,8 @@ def init_parallel_env(): place = core.XPUPlace(parallel_env.device_id) elif core.is_compiled_with_npu(): place = core.NPUPlace(parallel_env.device_id) + elif core.is_compiled_with_mlu(): + place = core.MLUPlace(parallel_env.device_id) _set_expected_place(place) # init nccl or hccl or bkcl or heter context @@ -231,6 +236,9 @@ def init_parallel_env(): elif core.is_compiled_with_npu(): parallel_helper._set_parallel_ctx( core.HCCLParallelContext(strategy, place)) + elif core.is_compiled_with_mlu(): + parallel_helper._set_parallel_ctx( + core.CNCLParallelContext(strategy, place)) if backend != "heter": other_endpoints = strategy.trainer_endpoints[:] diff --git a/python/paddle/fluid/dygraph/parallel.py b/python/paddle/fluid/dygraph/parallel.py index 0049f387b7..652916491e 100644 --- a/python/paddle/fluid/dygraph/parallel.py +++ b/python/paddle/fluid/dygraph/parallel.py @@ -128,6 +128,9 @@ class ParallelEnv(object): elif core.is_compiled_with_npu(): selected_npus = os.getenv("FLAGS_selected_npus", "0").split(",") self._device_id = int(selected_npus[0]) + elif core.is_compiled_with_mlu(): + selected_mlus = os.getenv("FLAGS_selected_mlus", "0").split(",") + self._device_id = int(selected_mlus[0]) self._trainer_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS", "").split(",") diff --git a/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt b/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt index c17790bd32..17f5509bdb 100644 --- a/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt @@ -13,13 +13,17 @@ if (WITH_MLU) endforeach(TEST_OP) if(WITH_CNCL) - foreach(TEST_OP ${TEST_DIST_OPS}) + foreach(TEST_OP ${TEST_DIST_OPS}) py_test_modules(${TEST_OP} MODULES ${TEST_OP}) endforeach(TEST_OP) bash_test_modules(test_launch_async_mlu START_BASH test_launch_async_mlu.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) - bash_test_modules(test_launch_cloud_mlu START_BASH test_launch_cloud_mlu.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) - bash_test_modules(test_launch_nproc_mlu START_BASH test_launch_nproc_mlu.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) + bash_test_modules(test_launch_cloud_mlu START_BASH test_launch_cloud_mlu.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) + bash_test_modules(test_launch_nproc_mlu START_BASH test_launch_nproc_mlu.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) + bash_test_modules(test_c_comm_init_op_mlu START_BASH test_c_comm_init_op_mlu.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR}) set_tests_properties(test_collective_broadcast PROPERTIES TIMEOUT 120) - set_tests_properties(test_collective_allreduce PROPERTIES TIMEOUT 120) + set_tests_properties(test_collective_allreduce PROPERTIES TIMEOUT 120) + set_tests_properties(test_collective_broadcast_api_mlu PROPERTIES TIMEOUT 120) + 
set_tests_properties(test_collective_allreduce_api_mlu PROPERTIES TIMEOUT 120) + set_tests_properties(test_c_comm_init_op_mlu PROPERTIES TIMEOUT 120) endif(WITH_CNCL) endif() diff --git a/python/paddle/fluid/tests/unittests/mlu/c_comm_init_op_mlu.py b/python/paddle/fluid/tests/unittests/mlu/c_comm_init_op_mlu.py new file mode 100644 index 0000000000..e91f28e3b1 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/mlu/c_comm_init_op_mlu.py @@ -0,0 +1,71 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import unittest +import os +import paddle.fluid.core as core +import paddle.fluid as fluid +from paddle.distributed.fleet.base.private_helper_function import wait_server_ready +import paddle + +paddle.enable_static() + + +class TestCCommInitOp(unittest.TestCase): + def setUp(self): + self.endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS").split(',') + self.current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT") + self.nranks = len(self.endpoints) + self.rank = self.endpoints.index(self.current_endpoint) + self.mlu_id = int(os.getenv("FLAGS_selected_mlus")) + self.place = fluid.MLUPlace(self.mlu_id) + self.exe = fluid.Executor(self.place) + self.endpoints.remove(self.current_endpoint) + self.other_endpoints = self.endpoints + if self.rank == 0: + wait_server_ready(self.other_endpoints) + + def test_specifying_devices(self): + program = fluid.Program() + block = program.global_block() + cncl_id_var = block.create_var( + name=fluid.unique_name.generate('cncl_id'), + persistable=True, + type=fluid.core.VarDesc.VarType.RAW) + block.append_op( + type='c_gen_cncl_id', + inputs={}, + outputs={'Out': cncl_id_var}, + attrs={ + 'rank': self.rank, + 'endpoint': self.current_endpoint, + 'other_endpoints': self.other_endpoints + }) + block.append_op( + type='c_comm_init', + inputs={'X': cncl_id_var}, + outputs={}, + attrs={ + 'nranks': self.nranks, + 'rank': self.rank, + 'ring_id': 0, + 'device_id': self.mlu_id + }) + self.exe.run(program) + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/mlu/collective_allreduce_api.py b/python/paddle/fluid/tests/unittests/mlu/collective_allreduce_api.py new file mode 100644 index 0000000000..ebe4e71d22 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/mlu/collective_allreduce_api.py @@ -0,0 +1,54 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import print_function + +import numpy as np +import argparse +import os +import sys +import signal +import time +import socket +from contextlib import closing +from six import string_types +import math +import paddle +import paddle.fluid as fluid +import paddle.fluid.profiler as profiler +import paddle.fluid.unique_name as nameGen +from paddle.fluid import core +import unittest +from multiprocessing import Process +import paddle.fluid.layers as layers +from functools import reduce +from test_collective_api_base_mlu import TestCollectiveAPIRunnerBase, runtime_main + +paddle.enable_static() + + +class TestCollectiveAllreduceAPI(TestCollectiveAPIRunnerBase): + def __init__(self): + self.global_ring_id = 0 + + def get_model(self, main_prog, startup_program, rank): + with fluid.program_guard(main_prog, startup_program): + tindata = layers.data( + name="tindata", shape=[10, 1000], dtype='float32') + paddle.distributed.all_reduce(tindata) + return [tindata] + + +if __name__ == "__main__": + runtime_main(TestCollectiveAllreduceAPI, "allreduce") diff --git a/python/paddle/fluid/tests/unittests/mlu/collective_broadcast_api.py b/python/paddle/fluid/tests/unittests/mlu/collective_broadcast_api.py new file mode 100644 index 0000000000..2002909ea2 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/mlu/collective_broadcast_api.py @@ -0,0 +1,54 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import numpy as np +import argparse +import os +import sys +import signal +import time +import socket +from contextlib import closing +from six import string_types +import math +import paddle +import paddle.fluid as fluid +import paddle.fluid.profiler as profiler +import paddle.fluid.unique_name as nameGen +from paddle.fluid import core +import unittest +from multiprocessing import Process +import paddle.fluid.layers as layers +from functools import reduce +from test_collective_api_base_mlu import TestCollectiveAPIRunnerBase, runtime_main + +paddle.enable_static() + + +class TestCollectiveBroadcastAPI(TestCollectiveAPIRunnerBase): + def __init__(self): + self.global_ring_id = 0 + + def get_model(self, main_prog, startup_program, rank): + with fluid.program_guard(main_prog, startup_program): + tindata = layers.data( + name="tindata", shape=[10, 1000], dtype="float32") + paddle.distributed.broadcast(tindata, src=1) + return [tindata] + + +if __name__ == "__main__": + runtime_main(TestCollectiveBroadcastAPI, "broadcast") diff --git a/python/paddle/fluid/tests/unittests/mlu/test_c_comm_init_op_mlu.sh b/python/paddle/fluid/tests/unittests/mlu/test_c_comm_init_op_mlu.sh new file mode 100644 index 0000000000..97f21798c1 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/mlu/test_c_comm_init_op_mlu.sh @@ -0,0 +1,21 @@ +#!/bin/bash + +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e +# use default values +# FIXME: random fails on Unknown command lines -c (or -m). +launch_py=${PADDLE_BINARY_DIR}/python/paddle/distributed/launch.py +MLU_VISIBLE_DEVICES=0,1 python ${launch_py} c_comm_init_op_mlu.py diff --git a/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_api_mlu.py b/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_api_mlu.py new file mode 100644 index 0000000000..447498b902 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_api_mlu.py @@ -0,0 +1,43 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function +import unittest +import numpy as np +import paddle + +from test_collective_api_base_mlu import TestDistBase + +paddle.enable_static() + + +class TestCollectiveAllreduceAPI(TestDistBase): + def _setup_config(self): + pass + + def test_allreduce_cncl_fp16(self): + self.check_with_place("collective_allreduce_api.py", "allreduce", + "float16") + + def test_allreduce_cncl_fp32(self): + self.check_with_place("collective_allreduce_api.py", "allreduce", + "float32") + + def test_allreduce_cncl_int32(self): + self.check_with_place("collective_allreduce_api.py", "allreduce", + "int32") + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/mlu/test_collective_api_base_mlu.py b/python/paddle/fluid/tests/unittests/mlu/test_collective_api_base_mlu.py new file mode 100644 index 0000000000..556fc6fcbb --- /dev/null +++ b/python/paddle/fluid/tests/unittests/mlu/test_collective_api_base_mlu.py @@ -0,0 +1,223 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import print_function +import numpy as np +import unittest +import os +import sys +import subprocess +import pickle +from contextlib import closing +import paddle +import paddle.fluid as fluid +from paddle.fluid import core + + +def DataTypeCast(date_type): + np_data_type = None + + if date_type == "float16": + np_data_type = np.float16 + elif date_type == "float32": + np_data_type = np.float32 + elif date_type == "int32": + np_data_type = np.int32 + else: + raise ValueError("This data type is not support!") + + return np_data_type + + +class TestCollectiveAPIRunnerBase(object): + def get_model(self, train_prog, startup_prog, rank, indata=None): + raise NotImplementedError( + "get model should be implemented by child class.") + + def run_trainer(self, args): + train_prog = fluid.Program() + startup_prog = fluid.Program() + endpoints = args["endpoints"].split(",") + rank = args["trainerid"] + current_endpoint = args["currentendpoint"] + nranks = 2 + paddle.distributed.init_parallel_env() + device_id = int(os.getenv("FLAGS_selected_mlus", "0")) + place = fluid.MLUPlace(device_id) + np.random.seed(os.getpid()) + np_data_type = DataTypeCast(args["data_type"]) + indata = np.random.random((10, 1000)).astype(np_data_type) + if args['static_mode']: + result = self.get_model(train_prog, startup_prog, rank) + exe = fluid.Executor(place) + exe.run(startup_prog) + fetch_list = [] + for elem in result: + fetch_list.append(elem.name) + out = exe.run(train_prog, + feed={'tindata': indata}, + fetch_list=fetch_list) + else: + out = self.get_model(train_prog, startup_prog, rank, indata) + #print(out, sys.stderr) + sys.stdout.buffer.write(pickle.dumps(out)) + + +def runtime_main(test_class, col_type): + args = {} + model = test_class() + args["trainerid"] = int(os.getenv("PADDLE_TRAINER_ID")) + args["trainernum"] = int(os.getenv("PADDLE_TRAINERS_NUM")) + args["endpoints"] = os.getenv('PADDLE_TRAINER_ENDPOINTS') + args["currentendpoint"] = os.getenv("PADDLE_CURRENT_ENDPOINT") + args["col_type"] = col_type + args["backend"] = os.getenv("BACKEND") + args["path_id"] = int(os.getenv("PATH_ID")) + args["static_mode"] = int(os.getenv("STATIC_MODE")) + args["data_type"] = os.getenv("DATA_TYPE") + model.run_trainer(args) + + +import paddle.compat as cpt +import socket +from contextlib import closing + + +class TestDistBase(unittest.TestCase): + def setUp(self): + self._port_set = set() + self._trainers = 2 + self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % ( + self._find_free_port(), self._find_free_port()) + self._python_interp = sys.executable + + def _find_free_port(self): + def __free_port(): + with closing(socket.socket(socket.AF_INET, + socket.SOCK_STREAM)) as s: + s.bind(('', 0)) + return s.getsockname()[1] + + while True: + port = __free_port() + if port not in self._port_set: + self._port_set.add(port) + return port + + def _run_cluster(self, model_file, envs): + worker_endpoints = self._ps_endpoints.split(",") + w0_ep, w1_ep = worker_endpoints + #print("w0_ep:",w0_ep," w1_ep:",w1_ep) + env0 = { + "FLAGS_selected_mlus": "0", + "PADDLE_TRAINER_ID": "0", + "PADDLE_TRAINERS_NUM": "2", + "PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints, + "PADDLE_CURRENT_ENDPOINT": w0_ep + } + + env1 = { + "FLAGS_selected_mlus": "1", + "PADDLE_TRAINER_ID": "1", + "PADDLE_TRAINERS_NUM": "2", + "PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints, + "PADDLE_CURRENT_ENDPOINT": w1_ep + } + #update environment + env0.update(envs) + env1.update(envs) + if os.getenv('WITH_COVERAGE', 'OFF') == 'ON': + tr_cmd = "%s -m 
coverage run --branch -p %s" + else: + tr_cmd = "%s %s" + tr0_cmd = tr_cmd % (self._python_interp, model_file) + tr1_cmd = tr_cmd % (self._python_interp, model_file) + tr0_pipe = open("/tmp/tr0_err_%d.log" % os.getpid(), "w") + tr1_pipe = open("/tmp/tr1_err_%d.log" % os.getpid(), "w") + #print(tr0_cmd) + tr0_proc = subprocess.Popen( + tr0_cmd.strip().split(), + stdout=subprocess.PIPE, + stderr=tr0_pipe, + env=env0) + + tr1_proc = subprocess.Popen( + tr0_cmd.strip().split(), + stdout=subprocess.PIPE, + stderr=tr1_pipe, + env=env1) + + tr0_out, tr0_err = tr0_proc.communicate() + tr1_out, tr1_err = tr1_proc.communicate() + sys.stderr.write('trainer 0 stderr: %s\n' % tr0_err) + sys.stderr.write('trainer 1 stderr: %s\n' % tr1_err) + # close trainer file + tr0_pipe.close() + tr1_pipe.close() + with open("/tmp/tr0_err_%d.log" % os.getpid(), "r") as f: + sys.stderr.write('trainer 0 stderr file: %s\n' % f.read()) + with open("/tmp/tr1_err_%d.log" % os.getpid(), "r") as f: + sys.stderr.write('trainer 1 stderr file: %s\n' % f.read()) + return pickle.loads(tr0_out), pickle.loads( + tr1_out), tr0_proc.pid, tr1_proc.pid + + def check_with_place(self, + model_file, + col_type, + data_type, + path_id="0", + static_mode="1", + check_error_log=False, + need_envs={}): + required_envs = { + "FLAGS_fraction_of_gpu_memory_to_use": "0.15", + "FLAGS_eager_delete_tensor_gb": "0.0", + "PATH": os.getenv("PATH"), + "PYTHONPATH": os.getenv("PYTHONPATH", ""), + "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""), + "LD_PRELOAD": os.getenv("LD_PRELOAD", ""), + "FLAGS_call_stack_level": "2", + "GLOG_v": "3", + "STATIC_MODE": static_mode, + "PADDLE_WITH_GLOO": '0', + "BACKEND": "cncl", + "PATH_ID": path_id, + "DATA_TYPE": data_type + } + required_envs.update(need_envs) + if check_error_log: + required_envs["GLOG_v"] = "3" + required_envs["GLOG_logtostderr"] = "1" + required_envs["GLOO_LOG_LEVEL"] = "TRACE" + tr0_out, tr1_out, pid0, pid1 = self._run_cluster(model_file, + required_envs) + np_data_type = DataTypeCast(data_type) + np.random.seed(pid0) + input1 = np.random.random((10, 1000)).astype(np_data_type) + np.random.seed(pid1) + input2 = np.random.random((10, 1000)).astype(np_data_type) + if col_type == "broadcast": + need_result = input2 + self.assertTrue(np.allclose(tr0_out, need_result)) + self.assertTrue(np.allclose(tr1_out, need_result)) + elif col_type == "allreduce": + need_result = input1 + input2 + self.assertTrue( + np.allclose( + tr0_out, need_result, rtol=1e-05, atol=1e-05)) + self.assertTrue( + np.allclose( + tr1_out, need_result, rtol=1e-05, atol=1e-05)) + else: + pass diff --git a/python/paddle/fluid/tests/unittests/mlu/test_collective_broadcast_api_mlu.py b/python/paddle/fluid/tests/unittests/mlu/test_collective_broadcast_api_mlu.py new file mode 100644 index 0000000000..95919f3332 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/mlu/test_collective_broadcast_api_mlu.py @@ -0,0 +1,43 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import print_function +import unittest +import numpy as np +import paddle + +from test_collective_api_base_mlu import TestDistBase + +paddle.enable_static() + + +class TestCollectiveBroadcastAPI(TestDistBase): + def _setup_config(self): + pass + + def test_broadcast_cncl_fp16(self): + self.check_with_place("collective_broadcast_api.py", "broadcast", + "float16") + + def test_broadcast_cncl_fp32(self): + self.check_with_place("collective_broadcast_api.py", "broadcast", + "float32") + + def test_broadcast_cncl_int32(self): + self.check_with_place("collective_broadcast_api.py", "broadcast", + "int32") + + +if __name__ == '__main__': + unittest.main() -- GitLab
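
For reference, a minimal dygraph sketch of the collective path this patch enables. This is an illustrative example, not part of the patch: it assumes a Paddle build compiled with MLU/CNCL support, two MLU cards, and a launch mirroring the new test script, e.g. `MLU_VISIBLE_DEVICES=0,1 python -m paddle.distributed.launch demo_allreduce_mlu.py` (the script name is hypothetical).

import numpy as np
import paddle
import paddle.distributed as dist

# On an MLU build, init_parallel_env() reads FLAGS_selected_mlus set by the
# launcher, picks MLUPlace as the expected place, and initializes the
# CNCLParallelContext added by this patch in paddle/distributed/parallel.py.
dist.init_parallel_env()

# Each trainer contributes one tensor; all_reduce sums it across the two cards.
data = paddle.to_tensor(np.ones((10, 1000), dtype='float32'))
dist.all_reduce(data)
print(data.numpy()[0][0])  # expected value: 2.0 with two trainers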