Unverified commit b0e86d55, authored by tianshuo78520a and committed by GitHub

mv all unittests test (#53235)

* mv all unittests test

* fix error

* fix error

* fix

* fix

* del unittests

* fix paddle_build.sh

* fix

* fix test

* fix add test

* fix

* fix

* fix

* merge develop

* fix

* fix

* fix

* fix

* fix

* merge develop

* fix test_async_read_write

* fix test_async_read_write

* merge develop

* fix

* fix import legacy_test

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix bug

* fix

* fix coverage test bug

* fix

* fix

* fix

* fix

* fix

* fix code style

* fix code

* fix code

* fix

* fix

* fix

* del test_sequence_enumerate_op.py

* fix
Parent d06b9ba4

Too many changes to show. To preserve performance, only 1000 of 1000+ files are displayed.
......@@ -67,4 +67,3 @@ extra {
type: BOOLEAN
}
}
......@@ -79,4 +79,3 @@ extra {
type: BOOLEAN
}
}
......@@ -14,4 +14,3 @@ def {
type: BOOLEAN
}
}
......@@ -25,4 +25,3 @@ extra {
type: BOOLEAN
}
}
......@@ -53,4 +53,3 @@ extra {
type: STRING
}
}
......@@ -49,4 +49,3 @@ extra {
type: STRING
}
}
......@@ -89,4 +89,3 @@ extra {
type: STRING
}
}
......@@ -49,4 +49,3 @@ extra {
type: STRING
}
}
......@@ -41,4 +41,3 @@ extra {
type: STRING
}
}
......@@ -11,3 +11,4 @@ distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/fused/fused_transformer_op.h"
......@@ -757,9 +757,9 @@ function run_linux_cpu_test() {
if [ -d "${PADDLE_ROOT}/dist/" ]; then
pip install ${PADDLE_ROOT}/dist/*whl
fi
-cp ${PADDLE_ROOT}/build/python/paddle/fluid/tests/unittests/eager_op_test.py ${PADDLE_ROOT}/build/python
-cp ${PADDLE_ROOT}/build/python/paddle/fluid/tests/unittests/testsuite.py ${PADDLE_ROOT}/build/python
-cp -r ${PADDLE_ROOT}/build/python/paddle/fluid/tests/unittests/white_list ${PADDLE_ROOT}/build/python
+cp ${PADDLE_ROOT}/build/test/legacy_test/eager_op_test.py ${PADDLE_ROOT}/build/python
+cp ${PADDLE_ROOT}/build/test/legacy_test/testsuite.py ${PADDLE_ROOT}/build/python
+cp -r ${PADDLE_ROOT}/build/test/white_list ${PADDLE_ROOT}/build/python
ut_total_startTime_s=`date +%s`
if [ ${WITH_TESTING:-ON} == "ON" ] ; then
cat <<EOF
......@@ -2700,8 +2700,8 @@ function parallel_test() {
if ls ${PADDLE_ROOT}/dist/*whl >/dev/null 2>&1; then
pip install ${PADDLE_ROOT}/dist/*whl
fi
-cp ${PADDLE_ROOT}/build/python/paddle/fluid/tests/unittests/testsuite.py ${PADDLE_ROOT}/build/python
-cp -r ${PADDLE_ROOT}/build/python/paddle/fluid/tests/unittests/white_list ${PADDLE_ROOT}/build/python
+cp ${PADDLE_ROOT}/build/test/legacy_test/testsuite.py ${PADDLE_ROOT}/build/python
+cp -r ${PADDLE_ROOT}/build/test/white_list ${PADDLE_ROOT}/build/python
ut_total_startTime_s=`date +%s`
if [ "$WITH_CINN" == "ON" ];then
parallel_test_base_cinn
......
......@@ -7,5 +7,3 @@ string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}")
foreach(src ${TEST_OPS})
py_test(${src} SRCS ${src}.py)
endforeach()
-add_subdirectory(unittests)
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Note: On Windows, when importing from subdirectories such as dirA()->dirB(), the current
# directory will still be dirA(), but it should be dirB(), so a ModuleNotFoundError is raised.
# Please refer to https://stackoverflow.com/questions/8953844/import-module-from-subfolder
import os
import sys
dirname, filename = os.path.split(os.path.abspath(__file__))
sys.path.insert(0, dirname)
print(sys.path)
# This file is generated by ${PADDLE_ROOT}/tools/gen_ut_cmakelists.py.
# Please don't modify this file manually.
# If you need to change unittests in this file, please modify testslist.csv in the current directory
# and then run the command `python3 ${PADDLE_ROOT}/tools/gen_ut_cmakelists.py -f ${CURRENT_DIRECTORY}/testslist.csv`
set(LOCAL_ALL_ARCH ON)
set(LOCAL_ALL_PLAT ON)
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_allreduce MODULES test_allreduce ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_allreduce PROPERTIES TIMEOUT "120" LABELS
"RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_broadcast MODULES test_broadcast ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_broadcast PROPERTIES TIMEOUT "120" LABELS
"RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_c_concat MODULES test_c_concat ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_c_concat PROPERTIES TIMEOUT "120" LABELS
"RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_c_identity MODULES test_c_identity ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_c_identity PROPERTIES TIMEOUT "120" LABELS
"RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_c_split MODULES test_c_split ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_c_split PROPERTIES TIMEOUT "120" LABELS
"RUN_TYPE=DIST")
endif()
if((WITH_ROCM OR WITH_GPU) AND (LINUX))
bash_test_modules(
test_collective_split_embedding
START_BASH
../dist_test.sh
TIMEOUT
"300"
LABELS
"RUN_TYPE=DIST"
ENVS
"PADDLE_DIST_UT_PORT=21288;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy="
)
set_tests_properties(test_collective_split_embedding PROPERTIES TIMEOUT "300")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_allgather_api MODULES test_collective_allgather_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_allgather_api
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_allgather_object_api MODULES
test_collective_allgather_object_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_allgather_object_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_allreduce_api MODULES test_collective_allreduce_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_allreduce_api
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_alltoall_api MODULES test_collective_alltoall_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_alltoall_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
bash_test_modules(
test_collective_alltoall_single
START_BASH
../dist_test.sh
TIMEOUT
"350"
LABELS
"RUN_TYPE=DIST"
ENVS
"PADDLE_DIST_UT_PORT=21290;http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python"
)
set_tests_properties(test_collective_alltoall_single PROPERTIES TIMEOUT "350")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_alltoall_single_api MODULES
test_collective_alltoall_single_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_alltoall_single_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_barrier_api MODULES test_collective_barrier_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_barrier_api
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
bash_test_modules(
test_collective_batch_isend_irecv
START_BASH
../dist_test.sh
TIMEOUT
"350"
LABELS
"RUN_TYPE=DIST"
ENVS
"PADDLE_DIST_UT_PORT=21292;http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python"
)
set_tests_properties(test_collective_batch_isend_irecv PROPERTIES TIMEOUT
"350")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_broadcast_api MODULES test_collective_broadcast_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_broadcast_api
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_broadcast_object_list_api MODULES
test_collective_broadcast_object_list_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_broadcast_object_list_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_cpu_barrier_with_gloo MODULES
test_collective_cpu_barrier_with_gloo ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_cpu_barrier_with_gloo
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_global_gather MODULES test_collective_global_gather ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_global_gather
PROPERTIES TIMEOUT "200" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_global_scatter MODULES test_collective_global_scatter ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_global_scatter
PROPERTIES TIMEOUT "200" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_isend_irecv_api MODULES test_collective_isend_irecv_api
ENVS "http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_isend_irecv_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_optimizer MODULES test_collective_optimizer ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_optimizer
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
bash_test_modules(
test_collective_process_group
START_BASH
../dist_test.sh
TIMEOUT
"350"
LABELS
"RUN_TYPE=DIST"
ENVS
"PADDLE_DIST_UT_PORT=21294;http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python"
)
set_tests_properties(test_collective_process_group PROPERTIES TIMEOUT "350")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_reduce MODULES test_collective_reduce ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_reduce PROPERTIES TIMEOUT "300" LABELS
"RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_reduce_api MODULES test_collective_reduce_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_reduce_api
PROPERTIES TIMEOUT "500" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
bash_test_modules(
test_collective_reduce_scatter
START_BASH
../dist_test.sh
TIMEOUT
"350"
LABELS
"RUN_TYPE=DIST"
ENVS
"PADDLE_DIST_UT_PORT=21296;http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python"
)
set_tests_properties(test_collective_reduce_scatter PROPERTIES TIMEOUT "350")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_reduce_scatter_api MODULES
test_collective_reduce_scatter_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_reduce_scatter_api
PROPERTIES TIMEOUT "150" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_scatter MODULES test_collective_scatter ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_scatter PROPERTIES TIMEOUT "300" LABELS
"RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_scatter_api MODULES test_collective_scatter_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_scatter_api
PROPERTIES TIMEOUT "180" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_scatter_object_list_api MODULES
test_collective_scatter_object_list_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_scatter_object_list_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_gather_api MODULES test_collective_gather_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_gather_api
PROPERTIES TIMEOUT "180" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_sendrecv MODULES test_collective_sendrecv ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_sendrecv PROPERTIES TIMEOUT "300" LABELS
"RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_sendrecv_api MODULES test_collective_sendrecv_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_sendrecv_api
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_split_col_linear MODULES test_collective_split_col_linear
ENVS "http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_split_col_linear
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_split_embedding_none_divisible MODULES
test_collective_split_embedding_none_divisible ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_split_embedding_none_divisible
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_split_row_linear MODULES test_collective_split_row_linear
ENVS "http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_split_row_linear
PROPERTIES TIMEOUT "300" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_collective_wait MODULES test_collective_wait ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_collective_wait PROPERTIES TIMEOUT "300" LABELS
"RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_communication_stream_allgather_api MODULES
test_communication_stream_allgather_api ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_communication_stream_allgather_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_communication_stream_allreduce_api MODULES
test_communication_stream_allreduce_api ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_communication_stream_allreduce_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_communication_stream_alltoall_api MODULES
test_communication_stream_alltoall_api ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_communication_stream_alltoall_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_communication_stream_alltoall_single_api MODULES
test_communication_stream_alltoall_single_api ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_communication_stream_alltoall_single_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_communication_stream_broadcast_api MODULES
test_communication_stream_broadcast_api ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_communication_stream_broadcast_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_communication_stream_reduce_api MODULES
test_communication_stream_reduce_api ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_communication_stream_reduce_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_communication_stream_reduce_scatter_api MODULES
test_communication_stream_reduce_scatter_api ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_communication_stream_reduce_scatter_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_communication_stream_scatter_api MODULES
test_communication_stream_scatter_api ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_communication_stream_scatter_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_communication_stream_sendrecv_api MODULES
test_communication_stream_sendrecv_api ENVS
"PYTHONPATH=..:${PADDLE_BINARY_DIR}/python;http_proxy=;https_proxy=")
set_tests_properties(test_communication_stream_sendrecv_api
PROPERTIES TIMEOUT "120" LABELS "RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_eager_dist_api MODULES test_eager_dist_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_eager_dist_api PROPERTIES TIMEOUT "120" LABELS
"RUN_TYPE=DIST")
endif()
if((WITH_GPU OR WITH_ROCM) AND LOCAL_ALL_PLAT)
bash_test_modules(
test_gen_nccl_id_op
START_BASH
../dist_test.sh
LABELS
"RUN_TYPE=DIST"
ENVS
"PADDLE_DIST_UT_PORT=21298;http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python"
)
endif()
if((WITH_GPU OR WITH_ROCM) AND (LINUX))
py_test_modules(
test_new_group_api MODULES test_new_group_api ENVS
"http_proxy=;https_proxy=;PYTHONPATH=..:${PADDLE_BINARY_DIR}/python")
set_tests_properties(test_new_group_api PROPERTIES TIMEOUT "120" LABELS
"RUN_TYPE=DIST")
endif()
if((WITH_ROCM OR WITH_GPU) AND (LINUX))
bash_test_modules(
test_world_size_and_rank
START_BASH
test_world_size_and_rank.sh
TIMEOUT
"120"
LABELS
"RUN_TYPE=DIST"
ENVS
"PADDLE_DIST_UT_PORT=21532;http_proxy=;https_proxy=")
set_tests_properties(test_world_size_and_rank PROPERTIES TIMEOUT "120")
endif()
if(WITH_MPI)
if(LOCAL_ALL_ARCH AND (LINUX))
bash_test_modules(
test_mpi_comm
START_BASH
test_mpi_comm.sh
LABELS
"RUN_TYPE=DIST"
ENVS
"PADDLE_DIST_UT_PORT=21672;http_proxy=;https_proxy=")
endif()
endif()
if((WITH_ROCM OR WITH_GPU) AND (LINUX))
bash_test_modules(
test_strategy_group
START_BASH
test_strategy_group.sh
TIMEOUT
"120"
LABELS
"RUN_TYPE=DIST"
ENVS
"PADDLE_DIST_UT_PORT=21814;http_proxy=;https_proxy=")
set_tests_properties(test_strategy_group PROPERTIES TIMEOUT "120")
endif()
if((WITH_ROCM OR WITH_GPU) AND (LINUX))
bash_test_modules(
test_orthogonal_strategy
START_BASH
test_orthogonal_strategy.sh
TIMEOUT
"120"
LABELS
"RUN_TYPE=DIST"
ENVS
"PADDLE_DIST_UT_PORT=21958;http_proxy=;https_proxy=")
set_tests_properties(test_orthogonal_strategy PROPERTIES TIMEOUT "120")
endif()
add_subdirectory(fleet)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import test_collective_api_base as test_base
import paddle
from paddle import fluid
class TestCollectiveAllgatherObjectAPI(test_base.TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
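# all_gather_object collects one arbitrary picklable Python object from every rank into object_list.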
object_list = []
paddle.distributed.all_gather_object(object_list, indata)
return object_list
if __name__ == "__main__":
test_base.runtime_main(TestCollectiveAllgatherObjectAPI, "allgather_object")
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from test_collective_base import TestCollectiveRunnerBase, runtime_main
import paddle
from paddle import fluid
from paddle.fluid import core
paddle.enable_static()
class TestCollectiveAllreduce(TestCollectiveRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program):
ring_id = 0
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata", shape=[-1, 10, 1000], dtype='float32'
)
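# Disable executor-side feed checks so data with the -1 batch dimension can be fed without shape/dtype validation.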
tindata.desc.set_need_check_feed(False)
toutdata = main_prog.current_block().create_var(
name="outofallreduce",
dtype='float32',
type=core.VarDesc.VarType.LOD_TENSOR,
persistable=False,
stop_gradient=False,
)
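# Append the raw c_allreduce_sum op on ring 0, then c_sync_comm_stream so the result is ready before it is fetched.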
main_prog.global_block().append_op(
type="c_allreduce_sum",
inputs={'X': tindata},
attrs={'ring_id': ring_id},
outputs={'Out': toutdata},
)
main_prog.global_block().append_op(
type="c_sync_comm_stream",
inputs={'X': toutdata},
outputs={'Out': toutdata},
attrs={'ring_id': ring_id},
)
return toutdata
if __name__ == "__main__":
runtime_main(TestCollectiveAllreduce, "allreduce", 0)
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from test_collective_base import TestCollectiveRunnerBase, runtime_main
import paddle
from paddle import fluid
from paddle.fluid import core
paddle.enable_static()
class TestCollectiveAllreduce(TestCollectiveRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program):
ring_id = 0
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata", shape=[-1, 10, 1000], dtype='float32'
)
tindata.desc.set_need_check_feed(False)
toutdata = main_prog.current_block().create_var(
name="outofallreduce",
dtype='float32',
type=core.VarDesc.VarType.LOD_TENSOR,
persistable=False,
stop_gradient=False,
)
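# Pattern under test: compute on the default (calc) stream, c_wait_compute makes the comm stream wait for that
# computation, the allreduce then runs on the comm stream (use_calc_stream=False), and c_wait_comm makes the
# calc stream wait for the allreduce before the final elementwise ops.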
# tout = tin + tin - tin = tin
if True:
main_prog.global_block().append_op(
type="elementwise_add",
inputs={
'X': tindata,
'Y': tindata,
},
outputs={'Out': toutdata},
)
main_prog.global_block().append_op(
type="elementwise_sub",
inputs={
'X': toutdata,
'Y': tindata,
},
outputs={'Out': toutdata},
)
main_prog.global_block().append_op(
type='c_wait_compute',
inputs={'X': toutdata},
outputs={'Out': toutdata},
attrs={'ring_id': ring_id},
)
main_prog.global_block().append_op(
type="c_allreduce_sum",
inputs={'X': toutdata},
attrs={'ring_id': ring_id, 'use_calc_stream': False},
outputs={'Out': toutdata},
)
main_prog.global_block().append_op(
type="c_wait_comm",
inputs={'X': toutdata},
outputs={'Out': toutdata},
attrs={'ring_id': ring_id},
)
# tout = tin + tout - tin = tout
if True:
main_prog.global_block().append_op(
type="elementwise_add",
inputs={
'X': tindata,
'Y': toutdata,
},
outputs={'Out': toutdata},
)
main_prog.global_block().append_op(
type="elementwise_sub",
inputs={
'X': toutdata,
'Y': tindata,
},
outputs={'Out': toutdata},
)
return toutdata
if __name__ == "__main__":
runtime_main(TestCollectiveAllreduce, "allreduce", 0)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main
import paddle
from paddle import fluid
paddle.enable_static()
class TestCollectiveAllToAllAPI(TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata", shape=[-1, 10, 1000], dtype='float32'
)
tindata.desc.set_need_check_feed(False)
tindata = paddle.split(tindata, 2, axis=0)
tout_data = []
paddle.distributed.alltoall(tindata, tout_data)
return tout_data
if __name__ == "__main__":
runtime_main(TestCollectiveAllToAllAPI, "alltoall")
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import test_collective_api_base as test_base
import paddle
import paddle.distributed as dist
from paddle import fluid
class TestCollectiveAllToAllAPI(test_base.TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
toutdata = []
# NOTE: this is a hack relying on an undocumented behavior that `to_tensor` uses uint16 to replace bfloat16
if indata.dtype == "bfloat16":
tindata = paddle.to_tensor(indata, "float32").cast("uint16")
tindata = paddle.split(tindata, 2, axis=0)
dist.alltoall(tindata, toutdata)
return [data.cast("float32").numpy() for data in toutdata]
else:
tindata = paddle.to_tensor(indata)
tindata = paddle.split(tindata, 2, axis=0)
dist.alltoall(tindata, toutdata)
return [data.numpy() for data in toutdata]
if __name__ == "__main__":
test_base.runtime_main(TestCollectiveAllToAllAPI, "alltoall")
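The runner above (and several below) relies on the NOTE that `to_tensor` uses uint16 as the storage dtype for bfloat16. As an editorial aside rather than part of the original tests, the bit-level relationship can be sketched with plain NumPy: bfloat16 is just the upper 16 bits of an IEEE float32, so a uint16 buffer can carry the same values (truncation shown here; real casts may round).

import numpy as np

# Sketch only: pack float32 values into bfloat16-style uint16 storage and back.
x = np.array([1.5, -2.25, 3.140625], dtype=np.float32)
bf16_bits = (x.view(np.uint32) >> 16).astype(np.uint16)             # keep the top 16 bits
x_roundtrip = (bf16_bits.astype(np.uint32) << 16).view(np.float32)  # restore a float32 view
print(x, x_roundtrip)  # these particular values survive the round trip exactly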
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import test_collective_api_base as test_base
import paddle
import paddle.distributed as dist
from paddle import fluid
class TestCollectiveAllToAllSingleAPI(test_base.TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
# NOTE: this is a hack relying on an undocumented behavior that `to_tensor` uses uint16 to replace bfloat16
if indata.dtype == "bfloat16":
tindata = paddle.to_tensor(indata, "float32").cast("uint16")
toutdata = paddle.to_tensor(tindata, "float32").cast("uint16")
dist.alltoall_single(tindata, toutdata)
return [toutdata.cast("float32").numpy()]
else:
tindata = paddle.to_tensor(indata)
toutdata = paddle.to_tensor(indata)
dist.alltoall_single(tindata, toutdata)
return [toutdata.numpy()]
if __name__ == "__main__":
test_base.runtime_main(TestCollectiveAllToAllSingleAPI, "alltoall")
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main
import paddle
from paddle import fluid
paddle.enable_static()
class TestCollectiveBarrierAPI(TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank):
with fluid.program_guard(main_prog, startup_program):
paddle.distributed.barrier()
return []
if __name__ == "__main__":
runtime_main(TestCollectiveBarrierAPI, "barrier")
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main
import paddle
from paddle import fluid, framework
from paddle.fluid import data_feeder
paddle.enable_static()
def broadcast_new(tensor, src, group=None, sync_op=True):
op_type = 'broadcast'
data_feeder.check_variable_and_dtype(
tensor,
'tensor',
[
'float16',
'float32',
'float64',
'int32',
'int64',
'int8',
'uint8',
'bool',
'uint16',
],
op_type,
)
helper = framework.LayerHelper(op_type, **locals())
ring_id = 0 if group is None else group.id
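# Manually append the broadcast op: it overwrites `tensor` in place with rank `src`'s value on the selected ring.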
helper.append_op(
type=op_type,
inputs={'x': [tensor]},
outputs={'out': [tensor]},
attrs={
'root': src,
'ring_id': ring_id,
},
)
class TestCollectiveBroadcastAPI(TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank, dtype='float32'):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata", shape=[-1, 10, 1000], dtype=dtype
)
tindata.desc.set_need_check_feed(False)
paddle.distributed.broadcast(tindata, src=1)
return [tindata]
def get_model_new(
self, main_prog, startup_program, rank, dtype=None, reduce_type=None
):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata", shape=[-1, 10, 1000], dtype=dtype
)
tindata.desc.set_need_check_feed(False)
broadcast_new(tindata, src=1)
return [tindata]
if __name__ == "__main__":
runtime_main(TestCollectiveBroadcastAPI, "broadcast")
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import test_collective_api_base as test_base
import paddle
import paddle.distributed as dist
from paddle import fluid
class TestCollectiveBroadcastAPI(test_base.TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
# NOTE: this is a hack relying on an undocumented behavior that `to_tensor` uses uint16 to replace bfloat16
if indata.dtype == "bfloat16":
tindata = paddle.to_tensor(indata, "float32").cast("uint16")
dist.broadcast(tindata, src=1)
return [tindata.cast("float32").numpy()]
else:
tindata = paddle.to_tensor(indata)
dist.broadcast(tindata, src=1)
return [tindata.numpy()]
if __name__ == "__main__":
test_base.runtime_main(TestCollectiveBroadcastAPI, "broadcast")
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from test_collective_base import TestCollectiveRunnerBase, runtime_main
import paddle
from paddle import fluid
from paddle.fluid import core
paddle.enable_static()
class TestCollectiveBroadcast(TestCollectiveRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program):
ring_id = 0
rootid = 1
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata", shape=[-1, 10, 1000], dtype='float32'
)
tindata.desc.set_need_check_feed(False)
toutdata = main_prog.current_block().create_var(
name="outofbroadcast",
dtype='float32',
type=core.VarDesc.VarType.LOD_TENSOR,
persistable=False,
stop_gradient=False,
)
main_prog.global_block().append_op(
type="c_broadcast",
inputs={'X': tindata},
attrs={'ring_id': ring_id, 'root': rootid},
outputs={'Out': toutdata},
)
main_prog.global_block().append_op(
type="c_sync_comm_stream",
inputs={'X': toutdata},
outputs={'Out': toutdata},
attrs={'ring_id': ring_id},
)
return toutdata
if __name__ == "__main__":
runtime_main(TestCollectiveBroadcast, "broadcast", 0)
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from test_collective_base import TestCollectiveRunnerBase, runtime_main
import paddle
from paddle import fluid
from paddle.fluid import core
paddle.enable_static()
class TestCollectiveConcat(TestCollectiveRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program):
ring_id = 0
nranks = 2
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata", shape=[-1, 10, 1000], dtype='float32'
)
tindata.desc.set_need_check_feed(False)
toutdata = main_prog.current_block().create_var(
name="outofconcat",
dtype='float32',
type=core.VarDesc.VarType.LOD_TENSOR,
persistable=False,
stop_gradient=False,
)
main_prog.global_block().append_op(
type="c_concat",
inputs={'X': tindata},
attrs={'ring_id': ring_id, 'rank': self.rank, 'nranks': nranks},
outputs={'Out': toutdata},
)
return toutdata
if __name__ == "__main__":
runtime_main(TestCollectiveConcat, "concat", 0)
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import test_collective_api_base as test_base
import paddle
import paddle.distributed as dist
from paddle import fluid
class TestCollectiveGatherAPI(test_base.TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
gather_list = []
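# dist.gather fills gather_list with every rank's tensor on dst=0; other ranks leave it empty.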
# NOTE: this is a hack relying on an undocumented behavior that `to_tensor` uses uint16 to replace bfloat16
if indata.dtype == "bfloat16":
tindata = paddle.to_tensor(indata, "float32").cast("uint16")
dist.gather(tindata, gather_list, dst=0)
return [e.cast("float32").numpy() for e in gather_list]
else:
tindata = paddle.to_tensor(indata)
dist.gather(tindata, gather_list, dst=0)
return [e.numpy() for e in gather_list]
if __name__ == "__main__":
test_base.runtime_main(TestCollectiveGatherAPI, "gather")
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import pickle
import sys
import numpy as np
from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main
import paddle
from paddle import fluid
from paddle.distributed.utils import moe_utils
paddle.enable_static()
class TestCollectiveGlobalGatherAPI(TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
seed = os.getpid()
np.random.seed(seed)
in_feat = 2
n_expert = 2
world_size = 2
tot_expert = n_expert * world_size
local_input_buf = paddle.static.data(
name="local_input_buf", shape=[-1, in_feat], dtype="float32"
)
local_expert_count = paddle.static.data(
name="local_expert_count", shape=[tot_expert], dtype="int64"
)
global_expert_count = paddle.static.data(
name="global_expert_count", shape=[tot_expert], dtype="int64"
)
output = moe_utils.global_gather(
local_input_buf, local_expert_count, global_expert_count
)
return [output]
def run_trainer(self, args):
train_prog = fluid.Program()
startup_prog = fluid.Program()
endpoints = args["endpoints"].split(",")
rank = args["trainerid"]
current_endpoint = args["currentendpoint"]
paddle.distributed.init_parallel_env()
nranks = 2
if args['backend'] == 'nccl':
device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
place = fluid.CUDAPlace(
device_id
) # if args.use_gpu else fluid.CPUPlace()
elif args['backend'] == 'bkcl':
device_id = int(os.getenv("FLAGS_selected_xpus", "0"))
place = fluid.XPUPlace(device_id)
else:
place = fluid.CPUPlace()
in_feat = 2
n_expert = 2
world_size = 2
tot_expert = n_expert * world_size
tmp_main_prog = fluid.Program()
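# Build a throwaway program that exchanges per-rank expert counts via alltoall; its output is then
# fed into the real global_gather program built by get_model.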
with fluid.program_guard(tmp_main_prog, fluid.Program()):
local_expert_count = paddle.static.data(
name="local_expert_count", shape=[tot_expert], dtype="int64"
)
global_expert_count = []
paddle.distributed.alltoall(
paddle.split(local_expert_count, 2, axis=0), global_expert_count
)
global_expert_count = paddle.concat(global_expert_count, axis=0)
exe = fluid.Executor(place)
exe.run(startup_prog)
np.random.seed(os.getpid())
local_expert_count = np.random.randint(1, 4, size=tot_expert).astype(
"int64"
)
(global_expert_count,) = exe.run(
tmp_main_prog,
feed={"local_expert_count": local_expert_count},
fetch_list=[global_expert_count.name],
)
fwd_expert_count = sum(global_expert_count)
np.random.seed(os.getpid())
local_input_buf = np.random.rand(fwd_expert_count, in_feat).astype(
"float32"
)
if args['static_mode']:
result = self.get_model(train_prog, startup_prog, rank)
fetch_list = []
for elem in result:
fetch_list.append(elem.name)
out = exe.run(
train_prog,
feed={
'local_expert_count': local_expert_count,
'global_expert_count': global_expert_count,
'local_input_buf': local_input_buf,
},
fetch_list=fetch_list,
)
sys.stdout.buffer.write(pickle.dumps(out))
if __name__ == "__main__":
runtime_main(TestCollectiveGlobalGatherAPI, "global_gather")
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import numpy as np
from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main
import paddle
from paddle import fluid
from paddle.distributed.utils import moe_utils
class TestCollectiveGlobalGatherAPI(TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
seed = os.getpid()
np.random.seed(seed)
in_feat = 2
n_expert = 2
world_size = 2
tot_expert = n_expert * world_size
local_expert_count = np.random.randint(
1, 4, size=tot_expert
).astype("int")
local_expert_count = paddle.to_tensor(local_expert_count)
global_expert_count = []
paddle.distributed.alltoall(
paddle.split(local_expert_count, 2, axis=0), global_expert_count
)
global_expert_count = paddle.concat(global_expert_count, axis=0)
fwd_expert_count = sum(global_expert_count)
np.random.seed(seed)
local_input_buf = np.random.rand(fwd_expert_count, in_feat).astype(
"float32"
)
local_input_buf = paddle.to_tensor(local_input_buf)
local_input_buf.stop_gradient = False
output = moe_utils.global_gather(
local_input_buf, local_expert_count, global_expert_count
)
output.stop_gradient = False
c = output * output
c.stop_gradient = False
c.backward()
return [output.numpy(), local_input_buf.grad.numpy()]
if __name__ == "__main__":
runtime_main(TestCollectiveGlobalGatherAPI, "global_gather")
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import pickle
import sys
import numpy as np
from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main
import paddle
from paddle import fluid
from paddle.distributed.utils import moe_utils
paddle.enable_static()
class TestCollectiveGlobalScatterAPI(TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
seed = os.getpid()
np.random.seed(seed)
in_feat = 2
n_expert = 2
world_size = 2
tot_expert = n_expert * world_size
local_input_buf = paddle.static.data(
name="local_input_buf", shape=[-1, in_feat], dtype="float32"
)
local_expert_count = paddle.static.data(
name="local_expert_count", shape=[tot_expert], dtype="int64"
)
global_expert_count = []
paddle.distributed.alltoall(
paddle.split(local_expert_count, 2, axis=0), global_expert_count
)
global_expert_count = paddle.concat(global_expert_count, axis=0)
output = moe_utils.global_scatter(
local_input_buf, local_expert_count, global_expert_count
)
return [output]
def run_trainer(self, args):
train_prog = fluid.Program()
startup_prog = fluid.Program()
endpoints = args["endpoints"].split(",")
rank = args["trainerid"]
current_endpoint = args["currentendpoint"]
nranks = 2
paddle.distributed.init_parallel_env()
if args['backend'] == 'nccl':
device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
place = fluid.CUDAPlace(
device_id
) # if args.use_gpu else fluid.CPUPlace()
elif args['backend'] == 'bkcl':
device_id = int(os.getenv("FLAGS_selected_xpus", "0"))
place = fluid.XPUPlace(device_id)
else:
place = fluid.CPUPlace()
np.random.seed(os.getpid())
in_feat = 2
n_expert = 2
world_size = 2
tot_expert = n_expert * world_size
local_expert_count = np.random.randint(1, 4, size=tot_expert).astype(
"int64"
)
fwd_expert_count = sum(local_expert_count)
local_input_buf = np.random.rand(fwd_expert_count, in_feat).astype(
"float32"
)
if args['static_mode']:
result = self.get_model(train_prog, startup_prog, rank)
exe = fluid.Executor(place)
exe.run(startup_prog)
fetch_list = []
for elem in result:
fetch_list.append(elem.name)
out = exe.run(
train_prog,
feed={
'local_expert_count': local_expert_count,
'local_input_buf': local_input_buf,
},
fetch_list=fetch_list,
)
sys.stdout.buffer.write(pickle.dumps(out))
if __name__ == "__main__":
runtime_main(TestCollectiveGlobalScatterAPI, "global_scatter")
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import numpy as np
from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main
import paddle
from paddle import fluid
from paddle.distributed.utils import moe_utils
class TestCollectiveGlobalScatterAPI(TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
seed = os.getpid()
np.random.seed(seed)
in_feat = 2
n_expert = 2
world_size = 2
tot_expert = n_expert * world_size
local_expert_count = np.random.randint(
1, 4, size=tot_expert
).astype("int")
fwd_expert_count = sum(local_expert_count)
local_input_buf = np.random.rand(fwd_expert_count, in_feat).astype(
"float32"
)
local_expert_count = paddle.to_tensor(local_expert_count)
local_input_buf = paddle.to_tensor(local_input_buf)
global_expert_count = []
paddle.distributed.alltoall(
paddle.split(local_expert_count, 2, axis=0), global_expert_count
)
global_expert_count = paddle.concat(global_expert_count, axis=0)
local_input_buf.stop_gradient = False
output = moe_utils.global_scatter(
local_input_buf, local_expert_count, global_expert_count
)
output.stop_gradient = False
c = output * output
c.backward()
return [output.numpy(), local_input_buf.grad.numpy()]
if __name__ == "__main__":
runtime_main(TestCollectiveGlobalScatterAPI, "global_scatter")
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from test_collective_base import TestCollectiveRunnerBase, runtime_main
import paddle
from paddle import fluid
from paddle.fluid import core
paddle.enable_static()
class TestCollectiveIdentity(TestCollectiveRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program):
ring_id = 0
nranks = 2
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata", shape=[-1, 10, 1000], dtype='float32'
)
tindata.desc.set_need_check_feed(False)
toutdata = main_prog.current_block().create_var(
name="outofgather",
dtype='float32',
type=core.VarDesc.VarType.LOD_TENSOR,
persistable=False,
stop_gradient=False,
)
main_prog.global_block().append_op(
type="c_identity",
inputs={'X': tindata},
outputs={'Out': toutdata},
attrs={'ring_id': ring_id, 'nranks': nranks},
)
return toutdata
if __name__ == "__main__":
runtime_main(TestCollectiveIdentity, "identity", 0)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import test_collective_api_base as test_base
import paddle
import paddle.distributed as dist
from paddle import fluid
class TestCollectiveIsendIrecvAPI(test_base.TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
# NOTE: this is a hack relying on an undocumented behavior that `to_tensor` uses uint16 to replace bfloat16
if indata.dtype == "bfloat16":
tindata = paddle.to_tensor(indata, "float32").cast("uint16")
if rank == 0:
task = dist.isend(tindata, dst=1)
else:
task = dist.irecv(tindata, src=0)
task.wait()
return [tindata.cast("float32").numpy()]
else:
tindata = paddle.to_tensor(indata)
if rank == 0:
task = dist.isend(tindata, dst=1)
else:
task = dist.irecv(tindata, src=0)
task.wait()
return [tindata.numpy()]
if __name__ == "__main__":
test_base.runtime_main(TestCollectiveIsendIrecvAPI, "sendrecv")
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main
import paddle
import paddle.distributed as dist
from paddle import fluid, framework
from paddle.fluid import data_feeder
paddle.enable_static()
def reduce_new(tensor, dst, reduce_type=str(dist.ReduceOp.SUM), group=None):
op_type = "reduce"
data_feeder.check_variable_and_dtype(
tensor,
'tensor',
[
'float16',
'float32',
'float64',
'int32',
'int64',
'int8',
'uint8',
'bool',
'uint16',
],
op_type,
)
ring_id = 0 if group is None else group.id
helper = framework.LayerHelper(op_type, **locals())
if not reduce_type.isdigit():
raise ValueError("The type of 'reduce_type' for reduce should be int.")
helper.append_op(
type=op_type,
inputs={'x': [tensor]},
outputs={'out': [tensor]},
attrs={
'ring_id': ring_id,
'root_id': dst,
'reduce_type': int(reduce_type),
},
)
return None
class TestCollectiveReduceAPI(TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata", shape=[-1, 10, 1000], dtype='float32'
)
tindata.desc.set_need_check_feed(False)
paddle.distributed.reduce(tindata, dst=0)
return [tindata]
def get_model_new(
self,
main_prog,
startup_program,
rank,
dtype='float32',
reduce_type=str(dist.ReduceOp.SUM),
):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata", shape=[10, 1000], dtype=dtype
)
tindata.desc.set_need_check_feed(False)
reduce_new(tindata, dst=0, reduce_type=reduce_type)
return [tindata]
if __name__ == "__main__":
runtime_main(TestCollectiveReduceAPI, "reduce")
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import test_collective_api_base as test_base
import paddle
import paddle.distributed as dist
from paddle import fluid
class TestCollectiveReduceAPI(test_base.TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
# NOTE: this is a hack relying on an undocumented behavior that `to_tensor` uses uint16 to replace bfloat16
if indata.dtype == "bfloat16":
tindata = paddle.to_tensor(indata, "float32").cast("uint16")
dist.reduce(tindata, dst=0)
return [tindata.cast("float32").numpy()]
else:
tindata = paddle.to_tensor(indata)
dist.reduce(tindata, dst=0)
return [tindata.numpy()]
if __name__ == "__main__":
test_base.runtime_main(TestCollectiveReduceAPI, "reduce")
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from test_collective_base import TestCollectiveRunnerBase, runtime_main
import paddle
from paddle import fluid
from paddle.fluid import core
paddle.enable_static()
class TestCollectiveReduce(TestCollectiveRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program):
ring_id = 0
rootid = 1
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata", shape=[-1, 10, 1000], dtype='float32'
)
tindata.desc.set_need_check_feed(False)
toutdata = main_prog.current_block().create_var(
name="outofreduce",
dtype='float32',
type=core.VarDesc.VarType.LOD_TENSOR,
persistable=False,
stop_gradient=False,
)
main_prog.global_block().append_op(
type="c_reduce_sum",
inputs={'X': tindata},
attrs={'ring_id': ring_id, 'root_id': rootid},
outputs={'Out': toutdata},
)
main_prog.global_block().append_op(
type="c_sync_comm_stream",
inputs={'X': toutdata},
outputs={'Out': toutdata},
attrs={'ring_id': ring_id},
)
return toutdata
if __name__ == "__main__":
runtime_main(TestCollectiveReduce, "reduce", 0)
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from test_collective_base import TestCollectiveRunnerBase, runtime_main
import paddle
from paddle import fluid
from paddle.fluid import core
paddle.enable_static()
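# Variant of the preceding c_reduce_sum test; the only difference is that the
# reduce op here also sets use_calc_stream=True.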
class TestCollectiveReduce(TestCollectiveRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program):
ring_id = 0
rootid = 1
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata", shape=[-1, 10, 1000], dtype='float32'
)
tindata.desc.set_need_check_feed(False)
toutdata = main_prog.current_block().create_var(
name="outofreduce",
dtype='float32',
type=core.VarDesc.VarType.LOD_TENSOR,
persistable=False,
stop_gradient=False,
)
main_prog.global_block().append_op(
type="c_reduce_sum",
inputs={'X': tindata},
attrs={
'ring_id': ring_id,
'use_calc_stream': True,
'root_id': rootid,
},
outputs={'Out': toutdata},
)
main_prog.global_block().append_op(
type="c_sync_comm_stream",
inputs={'X': toutdata},
outputs={'Out': toutdata},
attrs={'ring_id': ring_id},
)
return toutdata
if __name__ == "__main__":
runtime_main(TestCollectiveReduce, "reduce", 0)
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main
import paddle
from paddle import fluid
paddle.enable_static()
class TestCollectiveReduceScatterAPI(TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank):
pass
def get_model_new(
self,
main_prog,
startup_program,
rank,
dtype='float32',
reduce_type=None,
):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata", shape=[10, 1000], dtype=dtype
)
tindata.desc.set_need_check_feed(False)
# toutdata = layers.fill_constant(shape=[5, 1000], dtype=dtype, value=1.0)
toutdata = paddle.static.data(
name="toutdata", shape=[5, 1000], dtype=dtype
)
paddle.distributed.reduce_scatter(toutdata, tindata)
return [toutdata]
if __name__ == "__main__":
runtime_main(TestCollectiveReduceScatterAPI, "reduce_scatter")
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import test_collective_api_base as test_base
import paddle
import paddle.distributed as dist
from paddle import fluid
class TestCollectiveReduceScatterAPI(test_base.TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
# NOTE: this is a hack relying on an undocumented behavior that `to_tensor` uses uint16 to replace bfloat16
if indata.dtype == "bfloat16":
tindata = paddle.to_tensor(indata, "float32").cast("uint16")
subdata1, subdata2 = paddle.split(tindata, 2, axis=0)
dist.reduce_scatter(subdata1, [subdata1, subdata2])
return [subdata1.cast("float32").numpy()]
else:
tindata = paddle.to_tensor(indata)
subdata1, subdata2 = paddle.split(tindata, 2, axis=0)
dist.reduce_scatter(subdata1, [subdata1, subdata2])
return [subdata1.numpy()]
if __name__ == "__main__":
test_base.runtime_main(TestCollectiveReduceScatterAPI, "reduce_scatter")
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main
import paddle
from paddle import fluid
paddle.enable_static()
class TestCollectiveScatterAPI(TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata",
shape=[10, 1000],
dtype='float32',
)
toutdata = paddle.tensor.fill_constant(
shape=[5, 1000], dtype='float32', value=1.0
)
tensor_list = None
if rank == 1:
tensor_list = paddle.split(tindata, 2, axis=0)
paddle.distributed.scatter(toutdata, tensor_list, src=1)
return [toutdata]
if __name__ == "__main__":
runtime_main(TestCollectiveScatterAPI, "scatter")
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import test_collective_api_base as test_base
import paddle
import paddle.distributed as dist
from paddle import fluid
class TestCollectiveScatterAPI(test_base.TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
# NOTE: this is a hack relying on an undocumented behavior that `to_tensor` uses uint16 to replace bfloat16
if indata.dtype == "bfloat16":
tindata = paddle.to_tensor(indata, "float32").cast("uint16")
subdata1, subdata2 = paddle.split(tindata, 2, axis=0)
if rank == 0:
dist.scatter(subdata1, src=1)
else:
dist.scatter(
subdata1, tensor_list=[subdata1, subdata2], src=1
)
return [subdata1.cast("float32").numpy()]
else:
tindata = paddle.to_tensor(indata)
subdata1, subdata2 = paddle.split(tindata, 2, axis=0)
if rank == 0:
dist.scatter(subdata1, src=1)
else:
dist.scatter(
subdata1, tensor_list=[subdata1, subdata2], src=1
)
return [subdata1.numpy()]
if __name__ == "__main__":
test_base.runtime_main(TestCollectiveScatterAPI, "scatter")
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import test_collective_api_base as test_base
import paddle.distributed as dist
from paddle import fluid
class TestCollectiveScatterObjectListAPI(test_base.TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
data_len = len(indata) // 2
in_object_list = [indata[:data_len], indata[data_len:]]
out_object_list = []
dist.scatter_object_list(out_object_list, in_object_list, src=1)
return out_object_list
if __name__ == "__main__":
test_base.runtime_main(
TestCollectiveScatterObjectListAPI, "scatter_object_list"
)
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from test_collective_base import TestCollectiveRunnerBase, runtime_main
import paddle
from paddle import fluid
from paddle.fluid import core
paddle.enable_static()
class TestCollectiveScatter(TestCollectiveRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program):
ring_id = 0
rootid = 1
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata", shape=[-1, 10, 1000], dtype='float32'
)
tindata.desc.set_need_check_feed(False)
toutdata = main_prog.current_block().create_var(
name="outofreduce",
dtype='float32',
type=core.VarDesc.VarType.LOD_TENSOR,
persistable=False,
stop_gradient=False,
)
main_prog.global_block().append_op(
type="c_scatter",
inputs={'X': tindata},
attrs={'ring_id': ring_id, 'root': rootid, 'nranks': 2},
outputs={'Out': toutdata},
)
main_prog.global_block().append_op(
type="c_sync_comm_stream",
inputs={'X': toutdata},
outputs={'Out': toutdata},
attrs={'ring_id': ring_id},
)
return toutdata
if __name__ == "__main__":
runtime_main(TestCollectiveScatter, "scatter", 0)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main
import paddle
from paddle import fluid, framework
from paddle.fluid import data_feeder
paddle.enable_static()
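# Helpers used by get_model_new below: they append the new-style p_send /
# p_recv ops directly (with dynamic_shape=True) so the static-graph runner
# can parametrize the dtype.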
def send_new(tensor, dst, group=None, sync_op=True):
op_type = 'p_send'
data_feeder.check_variable_and_dtype(
tensor,
'tensor',
[
'float16',
'float32',
'float64',
'int32',
'int64',
'int8',
'uint8',
'bool',
'uint16',
],
op_type,
)
ring_id = 0 if group is None else group.id
helper = framework.LayerHelper(op_type, **locals())
helper.append_op(
type=op_type,
inputs={'x': [tensor]},
attrs={
'ring_id': ring_id,
'peer': dst,
'dynamic_shape': True,
},
)
return None
def recv_new(tensor, src, group=None, sync_op=True, dtype='float32'):
op_type = 'p_recv'
data_feeder.check_variable_and_dtype(
tensor,
'tensor',
[
'float16',
'float32',
'float64',
'int32',
'int64',
'int8',
'uint8',
'bool',
'uint16',
],
op_type,
)
ring_id = 0 if group is None else group.id
helper = framework.LayerHelper(op_type, **locals())
helper.append_op(
type=op_type,
outputs={'out': [tensor]},
attrs={
'ring_id': ring_id,
'peer': src,
'dynamic_shape': True,
'out_shape': tensor.shape,
'dtype': fluid.framework.convert_np_dtype_to_dtype_(dtype),
},
)
return None
class TestCollectiveSendRecvAPI(TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata",
shape=[10, 1000],
dtype='float32',
)
if rank == 0:
paddle.distributed.send(tindata, dst=1)
else:
paddle.distributed.recv(tindata, src=0)
return [tindata]
def get_model_new(
self,
main_prog,
startup_program,
rank,
dtype='float32',
reduce_type=None,
):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata",
shape=[10, 1000],
dtype=dtype,
)
if rank == 0:
send_new(tindata, dst=1)
else:
recv_new(tindata, src=0, dtype=dtype)
return [tindata]
if __name__ == "__main__":
runtime_main(TestCollectiveSendRecvAPI, "sendrecv")
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import test_collective_api_base as test_base
import paddle
import paddle.distributed as dist
from paddle import fluid
class TestCollectiveSendRecvAPI(test_base.TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank, indata=None):
with fluid.program_guard(main_prog, startup_program):
# NOTE: this is a hack relying on an undocumented behavior that `to_tensor` uses uint16 to replace bfloat16
if indata.dtype == "bfloat16":
tindata = paddle.to_tensor(indata, "float32").cast("uint16")
if rank == 0:
dist.send(tindata, dst=1)
else:
dist.recv(tindata, src=0)
return [tindata.cast("float32").numpy()]
else:
tindata = paddle.to_tensor(indata)
if rank == 0:
dist.send(tindata, dst=1)
else:
dist.recv(tindata, src=0)
return [tindata.numpy()]
if __name__ == "__main__":
test_base.runtime_main(TestCollectiveSendRecvAPI, "sendrecv")
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from test_collective_base import TestCollectiveRunnerBase, runtime_main
import paddle
from paddle import fluid
paddle.enable_static()
class TestCollectiveSendRecv(TestCollectiveRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program):
ring_id = self.global_ring_id
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata",
shape=[10, 1000],
dtype='float64',
)
tindata.desc.set_need_check_feed(False)
if self.rank == 0:
main_prog.global_block().append_op(
type="send_v2",
inputs={'X': tindata},
attrs={
'ring_id': ring_id,
'peer': 1,
'use_calc_stream': True,
},
)
else:
main_prog.global_block().append_op(
type="recv_v2",
outputs={'Out': tindata},
attrs={
'peer': 0,
'ring_id': ring_id,
'dtype': tindata.dtype,
'out_shape': tindata.shape,
'use_calc_stream': True,
},
)
return tindata
if __name__ == "__main__":
runtime_main(TestCollectiveSendRecv, "sendrecv", 0)
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
from test_collective_base import TestCollectiveRunnerBase, runtime_main
import paddle
from paddle import fluid
paddle.enable_static()
class TestCollectiveSendRecv(TestCollectiveRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program):
ring_id = self.global_ring_id
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata",
shape=[10, 1000],
dtype='float64',
)
tindata.desc.set_need_check_feed(False)
if self.rank == 0:
data1 = paddle.assign(np.array([[0, 1, 2]], dtype='float32'))
data2 = paddle.assign(np.array([[3, 4, 5]], dtype='float32'))
elif self.rank == 1:
data1 = paddle.assign(np.array([[3, 4, 5]], dtype='float32'))
data2 = paddle.assign(np.array([[0, 1, 2]], dtype='float32'))
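            # Pack the two 1x3 tensors into a tensor array; rank 0 sends the
            # whole array with send_v2 and rank 1 receives it with recv_v2.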
tensor_array = paddle.tensor.create_array(dtype='float32')
i = paddle.tensor.fill_constant(shape=[1], dtype='int64', value=0)
paddle.tensor.array_write(data1, i, tensor_array)
paddle.tensor.array_write(data2, i + 1, tensor_array)
if self.rank == 0:
main_prog.global_block().append_op(
type="send_v2",
inputs={'X': tensor_array},
attrs={
'ring_id': ring_id,
'peer': 1,
'use_calc_stream': True,
},
)
else:
main_prog.global_block().append_op(
type="recv_v2",
outputs={'Out': tensor_array},
attrs={
'peer': 0,
'ring_id': ring_id,
'dtype': data1.dtype,
'out_shape': [1, 3],
'use_calc_stream': True,
},
)
return tensor_array
if __name__ == "__main__":
runtime_main(TestCollectiveSendRecv, "sendrecv_array", 0)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from test_collective_base import TestCollectiveRunnerBase, runtime_main
import paddle
from paddle import fluid
paddle.enable_static()
class TestCollectiveSendRecvDynamicShape(TestCollectiveRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program):
ring_id = self.global_ring_id
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata",
shape=[-1, 10, 1000],
dtype='float64',
)
tindata.desc.set_need_check_feed(False)
if self.rank == 0:
main_prog.global_block().append_op(
type="send_v2",
inputs={'X': tindata},
attrs={
'ring_id': ring_id,
'peer': 1,
'use_calc_stream': True,
'dynamic_shape': True,
},
)
else:
main_prog.global_block().append_op(
type="recv_v2",
outputs={'Out': tindata},
attrs={
'peer': 0,
'ring_id': ring_id,
'dtype': tindata.dtype,
'out_shape': tindata.shape,
'use_calc_stream': True,
'dynamic_shape': True,
},
)
return tindata
if __name__ == "__main__":
runtime_main(
TestCollectiveSendRecvDynamicShape, "sendrecv_dynamic_shape", 0
)
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from test_collective_base import TestCollectiveRunnerBase, runtime_main
import paddle
from paddle import fluid
from paddle.fluid import core
paddle.enable_static()
class TestCollectiveAllGather(TestCollectiveRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program):
ring_id = 0
nranks = 2
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata", shape=[-1, 10, 1000], dtype='float32'
)
tindata.desc.set_need_check_feed(False)
toutdata = main_prog.current_block().create_var(
name="outofsplit",
dtype='float32',
type=core.VarDesc.VarType.LOD_TENSOR,
persistable=False,
stop_gradient=False,
)
main_prog.global_block().append_op(
type="c_split",
inputs={'X': tindata},
attrs={'ring_id': ring_id, 'rank': self.rank, 'nranks': nranks},
outputs={'Out': toutdata},
)
return toutdata
if __name__ == "__main__":
runtime_main(TestCollectiveAllGather, "split", 0)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
from test_collective_api_base import TestCollectiveAPIRunnerBase, runtime_main
import paddle
from paddle import fluid
from paddle.distributed import fleet
paddle.enable_static()
class TestColumnParallelLinearAPI(TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank):
with fluid.program_guard(main_prog, startup_program):
fleet.init(is_collective=True)
np.random.seed(2020)
np_array = np.random.rand(1000, 16)
data = paddle.static.data(
name='tindata', shape=[10, 1000], dtype="float32"
)
paddle.distributed.broadcast(data, src=0)
if rank == 0:
param_attr = paddle.fluid.ParamAttr(
initializer=paddle.nn.initializer.Assign(np_array[:, 0:8]),
)
else:
param_attr = paddle.fluid.ParamAttr(
initializer=paddle.nn.initializer.Assign(np_array[:, 8:16]),
)
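            # Column-parallel split: each rank holds half of the 1000x16
            # weight (assigned above), giving two partitions along axis 1.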
linear_out = paddle.distributed.split(
data,
size=(1000, 16),
operation='linear',
axis=1,
num_partitions=2,
weight_attr=param_attr,
bias_attr=True,
)
return [linear_out]
if __name__ == "__main__":
runtime_main(TestColumnParallelLinearAPI, "column_parallel_linear")
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import numpy as np
import test_collective_api_base as test_collective_base
import paddle
import paddle.distributed as dist
class StreamAllgatherTestCase:
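    # Test configuration (sync_op, use_calc_stream, backend, shape, dtype,
    # seeds) is read from environment variables.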
def __init__(self):
self._sync_op = eval(os.getenv("sync_op"))
self._use_calc_stream = eval(os.getenv("use_calc_stream"))
self._backend = os.getenv("backend")
self._shape = eval(os.getenv("shape"))
self._dtype = os.getenv("dtype")
self._seeds = eval(os.getenv("seeds"))
if self._backend not in ["nccl", "gloo"]:
raise NotImplementedError(
"Only support nccl and gloo as the backend for now."
)
os.environ["PADDLE_DISTRI_BACKEND"] = self._backend
def run_test_case(self):
dist.init_parallel_env()
test_data_list = []
for seed in self._seeds:
test_data_list.append(
test_collective_base.create_test_data(
shape=self._shape, dtype=self._dtype, seed=seed
)
)
rank = dist.get_rank()
tensor = paddle.to_tensor(test_data_list[rank])
# case 1: pass an empty tensor list
empty_tensor_list = []
task = dist.stream.all_gather(
empty_tensor_list,
tensor,
sync_op=self._sync_op,
use_calc_stream=self._use_calc_stream,
)
if not self._sync_op:
task.wait()
np.testing.assert_allclose(
empty_tensor_list, test_data_list, rtol=1e-05, atol=1e-05
)
# case 2: pass a pre-sized tensor list
full_tensor_list = [paddle.empty_like(tensor) for _ in test_data_list]
task = dist.stream.all_gather(
full_tensor_list,
tensor,
sync_op=self._sync_op,
use_calc_stream=self._use_calc_stream,
)
if not self._sync_op:
task.wait()
np.testing.assert_allclose(
full_tensor_list, test_data_list, rtol=1e-05, atol=1e-05
)
# case 3: pass a pre-sized tensor
result_tensor = paddle.concat(
[paddle.to_tensor(data) for data in test_data_list]
)
out_tensor = paddle.empty_like(result_tensor)
task = dist.stream.all_gather(
out_tensor,
tensor,
sync_op=self._sync_op,
use_calc_stream=self._use_calc_stream,
)
if not self._sync_op:
task.wait()
np.testing.assert_allclose(
out_tensor, result_tensor, rtol=1e-05, atol=1e-05
)
if __name__ == "__main__":
StreamAllgatherTestCase().run_test_case()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import numpy as np
import test_collective_api_base as test_collective_base
import paddle
import paddle.distributed as dist
class StreamAllReduceTestCase:
def __init__(self):
self._sync_op = eval(os.getenv("sync_op"))
self._use_calc_stream = eval(os.getenv("use_calc_stream"))
self._backend = os.getenv("backend")
self._shape = eval(os.getenv("shape"))
self._dtype = os.getenv("dtype")
self._seeds = eval(os.getenv("seeds"))
if self._backend not in ["nccl", "gloo"]:
raise NotImplementedError(
"Only support nccl and gloo as the backend for now."
)
os.environ["PADDLE_DISTRI_BACKEND"] = self._backend
def run_test_case(self):
dist.init_parallel_env()
test_data_list = []
for seed in self._seeds:
test_data_list.append(
test_collective_base.create_test_data(
shape=self._shape, dtype=self._dtype, seed=seed
)
)
rank = dist.get_rank()
tensor = paddle.to_tensor(test_data_list[rank])
task = dist.stream.all_reduce(
tensor, sync_op=self._sync_op, use_calc_stream=self._use_calc_stream
)
if not self._sync_op:
task.wait()
result = test_data_list[0]
for i in range(1, len(test_data_list)):
result += test_data_list[i]
np.testing.assert_allclose(tensor, result, rtol=1e-05, atol=1e-05)
if __name__ == "__main__":
StreamAllReduceTestCase().run_test_case()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import numpy as np
import test_collective_api_base as test_collective_base
import paddle
import paddle.distributed as dist
class StreamAllToAllTestCase:
def __init__(self):
self._sync_op = eval(os.getenv("sync_op"))
self._use_calc_stream = eval(os.getenv("use_calc_stream"))
self._backend = os.getenv("backend")
self._shape = eval(os.getenv("shape"))
self._dtype = os.getenv("dtype")
self._seeds = eval(os.getenv("seeds"))
if self._backend not in ["nccl", "gloo"]:
raise NotImplementedError(
"Only support nccl and gloo as the backend for now."
)
os.environ["PADDLE_DISTRI_BACKEND"] = self._backend
def run_test_case(self):
dist.init_parallel_env()
test_data_list = []
for seed in self._seeds:
test_data_list.append(
test_collective_base.create_test_data(
shape=self._shape, dtype=self._dtype, seed=seed
)
)
nranks = len(test_data_list)
data1 = test_data_list[0]
data2 = test_data_list[1]
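        # Expected all-to-all results: rank 0 gathers the first half of each
        # rank's tensor, rank 1 gathers the second half.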
result1 = np.vstack(
[
data1[0 : data1.shape[0] // 2, :],
data2[0 : data2.shape[0] // 2, :],
]
)
result2 = np.vstack(
[data1[data1.shape[0] // 2 :, :], data2[data2.shape[0] // 2 :, :]]
)
rank = dist.get_rank()
tensor = paddle.to_tensor(test_data_list[rank])
t1, t2 = paddle.split(tensor, nranks, axis=0)
# case 1: pass an empty tensor list
empty_tensor_list = []
task = dist.stream.alltoall(
empty_tensor_list,
[t1, t2],
sync_op=self._sync_op,
use_calc_stream=self._use_calc_stream,
)
if not self._sync_op:
task.wait()
result_tensor_list = np.vstack(empty_tensor_list)
if rank == 0:
np.testing.assert_allclose(
result_tensor_list, result1, rtol=1e-05, atol=1e-05
)
else:
np.testing.assert_allclose(
result_tensor_list, result2, rtol=1e-05, atol=1e-05
)
# case 2: pass a pre-sized tensor list
full_tensor_list = [paddle.empty_like(t1) for _ in test_data_list]
task = dist.stream.alltoall(
full_tensor_list,
[t1, t2],
sync_op=self._sync_op,
use_calc_stream=self._use_calc_stream,
)
if not self._sync_op:
task.wait()
result_tensor_list = np.vstack(full_tensor_list)
if rank == 0:
np.testing.assert_allclose(
result_tensor_list, result1, rtol=1e-05, atol=1e-05
)
else:
np.testing.assert_allclose(
result_tensor_list, result2, rtol=1e-05, atol=1e-05
)
# case 3: pass a pre-sized tensor
out_tensor = paddle.empty_like(tensor)
task = dist.stream.alltoall(
out_tensor,
tensor,
sync_op=self._sync_op,
use_calc_stream=self._use_calc_stream,
)
if not self._sync_op:
task.wait()
if rank == 0:
np.testing.assert_allclose(
out_tensor, result1, rtol=1e-05, atol=1e-05
)
else:
np.testing.assert_allclose(
out_tensor, result2, rtol=1e-05, atol=1e-05
)
if __name__ == "__main__":
StreamAllToAllTestCase().run_test_case()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import numpy as np
import test_collective_api_base as test_collective_base
import paddle
import paddle.distributed as dist
class StreamAllToAllSingleTestCase:
def __init__(self):
self._sync_op = eval(os.getenv("sync_op"))
self._use_calc_stream = eval(os.getenv("use_calc_stream"))
self._backend = os.getenv("backend")
self._shape = eval(os.getenv("shape"))
self._dtype = os.getenv("dtype")
self._seeds = eval(os.getenv("seeds"))
if self._backend not in ["nccl", "gloo"]:
raise NotImplementedError(
"Only support nccl and gloo as the backend for now."
)
os.environ["PADDLE_DISTRI_BACKEND"] = self._backend
def run_test_case(self):
dist.init_parallel_env()
test_data_list = []
for seed in self._seeds:
test_data_list.append(
test_collective_base.create_test_data(
shape=self._shape, dtype=self._dtype, seed=seed
)
)
nranks = len(test_data_list)
data1 = paddle.to_tensor(test_data_list[0])
data2 = paddle.to_tensor(test_data_list[1])
result1 = np.vstack(
(
data1[0 : data1.shape[0] // 2, :],
data2[0 : data2.shape[0] // 2, :],
)
)
result2 = np.vstack(
(data1[data1.shape[0] // 2 :, :], data2[data2.shape[0] // 2 :, :])
)
rank = dist.get_rank()
tensor = paddle.to_tensor(test_data_list[rank])
out_tensor = paddle.empty_like(tensor)
task = dist.stream.alltoall_single(
out_tensor,
tensor,
sync_op=self._sync_op,
use_calc_stream=self._use_calc_stream,
)
if not self._sync_op:
task.wait()
if rank == 0:
np.testing.assert_allclose(
out_tensor, result1, rtol=1e-05, atol=1e-05
)
else:
np.testing.assert_allclose(
out_tensor, result2, rtol=1e-05, atol=1e-05
)
if __name__ == "__main__":
StreamAllToAllSingleTestCase().run_test_case()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import numpy as np
import test_collective_api_base as test_collective_base
import paddle
import paddle.distributed as dist
class StreamBroadcastTestCase:
def __init__(self):
self._sync_op = eval(os.getenv("sync_op"))
self._use_calc_stream = eval(os.getenv("use_calc_stream"))
self._backend = os.getenv("backend")
self._shape = eval(os.getenv("shape"))
self._dtype = os.getenv("dtype")
self._seeds = eval(os.getenv("seeds"))
if self._backend not in ["nccl", "gloo"]:
raise NotImplementedError(
"Only support nccl and gloo as the backend for now."
)
os.environ["PADDLE_DISTRI_BACKEND"] = self._backend
def run_test_case(self):
dist.init_parallel_env()
src_rank = 1
result = test_collective_base.create_test_data(
shape=self._shape, dtype=self._dtype, seed=self._seeds[src_rank]
)
tensor = paddle.to_tensor(result)
task = dist.stream.broadcast(
tensor,
src=src_rank,
sync_op=self._sync_op,
use_calc_stream=self._use_calc_stream,
)
if not self._sync_op:
task.wait()
np.testing.assert_allclose(tensor, result, rtol=1e-05, atol=1e-05)
if __name__ == "__main__":
StreamBroadcastTestCase().run_test_case()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import numpy as np
import test_collective_api_base as test_collective_base
import paddle
import paddle.distributed as dist
class StreamReduceTestCase:
def __init__(self):
self._sync_op = eval(os.getenv("sync_op"))
self._use_calc_stream = eval(os.getenv("use_calc_stream"))
self._backend = os.getenv("backend")
self._shape = eval(os.getenv("shape"))
self._dtype = os.getenv("dtype")
self._seeds = eval(os.getenv("seeds"))
if self._backend not in ["nccl", "gloo"]:
raise NotImplementedError(
"Only support nccl and gloo as the backend for now."
)
os.environ["PADDLE_DISTRI_BACKEND"] = self._backend
def run_test_case(self):
dist.init_parallel_env()
test_data_list = []
for seed in self._seeds:
test_data_list.append(
test_collective_base.create_test_data(
shape=self._shape, dtype=self._dtype, seed=seed
)
)
rank = dist.get_rank()
tensor = paddle.to_tensor(test_data_list[rank])
task = dist.stream.reduce(
tensor,
dst=1,
sync_op=self._sync_op,
use_calc_stream=self._use_calc_stream,
)
if not self._sync_op:
task.wait()
result = sum(test_data_list)
if rank == 1:
np.testing.assert_allclose(tensor, result, rtol=1e-05, atol=1e-05)
else:
np.testing.assert_allclose(
tensor, test_data_list[rank], rtol=1e-05, atol=1e-05
)
if __name__ == "__main__":
StreamReduceTestCase().run_test_case()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import numpy as np
import test_collective_api_base as test_collective_base
import paddle
import paddle.distributed as dist
from paddle.distributed.communication.stream.reduce_scatter import (
_reduce_scatter_base,
)
class StreamReduceScatterTestCase:
def __init__(self):
self._sync_op = eval(os.getenv("sync_op"))
self._use_calc_stream = eval(os.getenv("use_calc_stream"))
self._backend = os.getenv("backend")
self._shape = eval(os.getenv("shape"))
self._dtype = os.getenv("dtype")
self._seeds = eval(os.getenv("seeds"))
if self._backend not in ["nccl", "gloo"]:
raise NotImplementedError(
"Only support nccl and gloo as the backend for now."
)
os.environ["PADDLE_DISTRI_BACKEND"] = self._backend
def run_test_case(self):
dist.init_parallel_env()
test_data_list = []
for seed in self._seeds:
test_data_list.append(
test_collective_base.create_test_data(
shape=self._shape, dtype=self._dtype, seed=seed
)
)
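        # Expected reduce_scatter results: the element-wise sum over ranks,
        # with rank 0 keeping the first half and rank 1 the second half.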
reduce_result = sum(test_data_list)
result1 = reduce_result[0 : reduce_result.shape[0] // 2]
result2 = reduce_result[reduce_result.shape[0] // 2 :]
rank = dist.get_rank()
tensor = paddle.to_tensor(test_data_list[rank])
# case 1: pass a pre-sized tensor list
t1, t2 = paddle.split(tensor, 2, axis=0)
result_tensor = paddle.empty_like(t1)
task = dist.stream.reduce_scatter(
result_tensor,
[t1, t2],
sync_op=self._sync_op,
use_calc_stream=self._use_calc_stream,
)
if not self._sync_op:
task.wait()
if rank == 0:
np.testing.assert_allclose(
result_tensor, result1, rtol=1e-05, atol=1e-05
)
else:
np.testing.assert_allclose(
result_tensor, result2, rtol=1e-05, atol=1e-05
)
# case 2: pass a pre-sized tensor
result_tensor = paddle.empty_like(t1)
task = dist.stream.reduce_scatter(
result_tensor,
tensor,
sync_op=self._sync_op,
use_calc_stream=self._use_calc_stream,
)
if not self._sync_op:
task.wait()
if rank == 0:
np.testing.assert_allclose(
result_tensor, result1, rtol=1e-05, atol=1e-05
)
else:
np.testing.assert_allclose(
result_tensor, result2, rtol=1e-05, atol=1e-05
)
# case 3: test the legacy API
result_tensor = paddle.empty_like(t1)
task = _reduce_scatter_base(
result_tensor,
tensor,
sync_op=self._sync_op,
use_calc_stream=self._use_calc_stream,
)
if not self._sync_op:
task.wait()
if rank == 0:
np.testing.assert_allclose(
result_tensor, result1, rtol=1e-05, atol=1e-05
)
else:
np.testing.assert_allclose(
result_tensor, result2, rtol=1e-05, atol=1e-05
)
if __name__ == "__main__":
StreamReduceScatterTestCase().run_test_case()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import numpy as np
import test_collective_api_base as test_collective_base
import paddle
import paddle.distributed as dist
class StreamScatterTestCase:
def __init__(self):
self._sync_op = eval(os.getenv("sync_op"))
self._use_calc_stream = eval(os.getenv("use_calc_stream"))
self._backend = os.getenv("backend")
self._shape = eval(os.getenv("shape"))
self._dtype = os.getenv("dtype")
self._seeds = eval(os.getenv("seeds"))
if self._backend not in ["nccl", "gloo"]:
raise NotImplementedError(
"Only support nccl and gloo as the backend for now."
)
os.environ["PADDLE_DISTRI_BACKEND"] = self._backend
def run_test_case(self):
dist.init_parallel_env()
test_data_list = []
for seed in self._seeds:
test_data_list.append(
test_collective_base.create_test_data(
shape=self._shape, dtype=self._dtype, seed=seed
)
)
src_rank = 1
src_data = test_data_list[src_rank]
result1 = src_data[0 : src_data.shape[0] // 2]
result2 = src_data[src_data.shape[0] // 2 :]
rank = dist.get_rank()
# case 1: pass a pre-sized tensor list
tensor = paddle.to_tensor(test_data_list[rank])
t1, t2 = paddle.split(tensor, 2, axis=0)
task = dist.stream.scatter(
t1,
[t1, t2],
src=src_rank,
sync_op=self._sync_op,
use_calc_stream=self._use_calc_stream,
)
if not self._sync_op:
task.wait()
if rank == src_rank:
np.testing.assert_allclose(t1, result2, rtol=1e-05, atol=1e-05)
else:
np.testing.assert_allclose(t1, result1, rtol=1e-05, atol=1e-05)
# case 2: pass a pre-sized tensor
tensor = paddle.to_tensor(src_data)
t1 = paddle.empty_like(t1)
task = dist.stream.scatter(
t1,
tensor,
src=src_rank,
sync_op=self._sync_op,
use_calc_stream=self._use_calc_stream,
)
if not self._sync_op:
task.wait()
if rank == src_rank:
np.testing.assert_allclose(t1, result2, rtol=1e-05, atol=1e-05)
else:
np.testing.assert_allclose(t1, result1, rtol=1e-05, atol=1e-05)
if __name__ == "__main__":
StreamScatterTestCase().run_test_case()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import numpy as np
import test_collective_api_base as test_collective_base
import paddle
import paddle.distributed as dist
class StreamSendRecvTestCase:
def __init__(self):
self._sync_op = eval(os.getenv("sync_op"))
self._use_calc_stream = eval(os.getenv("use_calc_stream"))
self._backend = os.getenv("backend")
self._shape = eval(os.getenv("shape"))
self._dtype = os.getenv("dtype")
self._seeds = eval(os.getenv("seeds"))
if self._backend not in ["nccl", "gloo"]:
raise NotImplementedError(
"Only support nccl and gloo as the backend for now."
)
os.environ["PADDLE_DISTRI_BACKEND"] = self._backend
def run_test_case(self):
dist.init_parallel_env()
test_data_list = []
for seed in self._seeds:
test_data_list.append(
test_collective_base.create_test_data(
shape=self._shape, dtype=self._dtype, seed=seed
)
)
src_rank = 0
dst_rank = 1
rank = dist.get_rank()
tensor = paddle.to_tensor(test_data_list[rank])
if rank == 0:
task = dist.stream.send(
tensor,
dst=dst_rank,
sync_op=self._sync_op,
use_calc_stream=self._use_calc_stream,
)
else:
task = dist.stream.recv(
tensor,
src=src_rank,
sync_op=self._sync_op,
use_calc_stream=self._use_calc_stream,
)
if not self._sync_op:
task.wait()
result = test_data_list[src_rank]
np.testing.assert_allclose(tensor, result, rtol=1e-05, atol=1e-05)
if __name__ == "__main__":
StreamSendRecvTestCase().run_test_case()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dist_mnist import cnn_model
from test_dist_base import TestDistRunnerBase, _insert_comm_op, runtime_main
import paddle
from paddle import fluid
DTYPE = "float32"
paddle.dataset.mnist.fetch()
# Fix seed for test
fluid.default_startup_program().random_seed = 1
fluid.default_main_program().random_seed = 1
class TestDistMnist2x2(TestDistRunnerBase):
def get_model(self, batch_size=2, single_device=False):
# Input data
images = paddle.static.data(
name='pixel', shape=[-1, 1, 28, 28], dtype=DTYPE
)
label = paddle.static.data(name='label', shape=[-1, 1], dtype='int64')
# Train program
predict = cnn_model(images)
cost = paddle.nn.functional.cross_entropy(
input=predict, label=label, reduction='none', use_softmax=False
)
avg_cost = paddle.mean(x=cost)
# Evaluator
batch_size_tensor = paddle.tensor.create_tensor(dtype='int64')
batch_acc = paddle.static.accuracy(
input=predict, label=label, total=batch_size_tensor
)
inference_program = fluid.default_main_program().clone()
# Optimization
opt = fluid.optimizer.MomentumOptimizer(
learning_rate=0.001, momentum=0.9
)
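        # Wrap Momentum with gradient merge: gradients are accumulated for
        # 2 steps (k_steps=2) before each parameter update is applied.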
opt = fluid.optimizer.GradientMergeOptimizer(opt, 2)
if single_device:
opt.minimize(avg_cost)
else:
opt._learning_rate = 0.001
opt._learning_rate_map = {}
_insert_comm_op(opt, avg_cost)
# Reader
train_reader = paddle.batch(
paddle.dataset.mnist.test(), batch_size=batch_size
)
test_reader = paddle.batch(
paddle.dataset.mnist.test(), batch_size=batch_size
)
return (
inference_program,
avg_cost,
train_reader,
test_reader,
batch_acc,
predict,
)
if __name__ == "__main__":
runtime_main(TestDistMnist2x2)
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import numpy as np
from dist_mnist import cnn_model
from test_dist_base import TestDistRunnerBase, runtime_main
import paddle
from paddle import fluid, nn
from paddle.distributed import fleet
class TestDistMnistGradientMergeRawOptimizer(TestDistRunnerBase):
def get_model(self, batch_size=2, single_device=False):
paddle.enable_static()
paddle.seed(1)
np.random.seed(1)
assert fluid.core.globals()['FLAGS_apply_pass_to_program']
strategy = fleet.DistributedStrategy()
build_strategy = paddle.static.BuildStrategy()
settings = {
"fuse_relu_depthwise_conv": True,
"fuse_bn_act_ops": True,
"fuse_bn_add_act_ops": True,
"fuse_elewise_add_act_ops": True,
"fuse_all_optimizer_ops": True,
"enable_addto": True,
"enable_inplace": True,
}
for k, v in settings.items():
setattr(build_strategy, k, v)
strategy.build_strategy = build_strategy
strategy.gradient_merge = True
avg = os.environ['enable_gm_avg'] == "True"
strategy.gradient_merge_configs = {
"k_steps": 2,
"avg": avg,
}
strategy.without_graph_optimization = True
fleet.init(is_collective=True, strategy=strategy)
image = paddle.static.data(
name='image', shape=[None, 1, 28, 28], dtype="float32"
)
label = paddle.static.data(name='label', shape=[None, 1], dtype='int64')
predict = cnn_model(image)
acc = paddle.metric.accuracy(predict, label)
loss_fn = nn.CrossEntropyLoss(use_softmax=False)
cost = loss_fn(predict, label)
test_program = paddle.static.default_main_program().clone(for_test=True)
optimizer = paddle.optimizer.Adam(learning_rate=1e-3)
if single_device:
optimizer = fluid.optimizer.GradientMergeOptimizer(
optimizer,
k_steps=strategy.gradient_merge_configs["k_steps"],
avg=strategy.gradient_merge_configs["avg"],
)
world_size = 1
else:
optimizer = fleet.distributed_optimizer(optimizer)
world_size = fleet.world_size()
optimizer.minimize(cost)
if world_size > 1:
assert paddle.static.default_main_program().num_blocks == 2
gm_block = paddle.static.default_main_program().block(1)
start_allreduce_idx = None
for i, op in enumerate(gm_block.ops):
if op.type == "c_allreduce_sum":
start_allreduce_idx = i
break
            # the magic number 1 below accounts for the c_sync_calc_stream op
            # at index 0, which is skipped
if avg:
assert start_allreduce_idx > 1
else:
assert start_allreduce_idx == 1
train_reader = paddle.batch(
paddle.dataset.mnist.test(), batch_size=batch_size
)
test_reader = paddle.batch(
paddle.dataset.mnist.test(), batch_size=batch_size
)
return test_program, cost, train_reader, test_reader, acc, predict
if __name__ == "__main__":
runtime_main(TestDistMnistGradientMergeRawOptimizer)
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from hybrid_parallel_pp_alexnet import TestDistPPTraning
import paddle
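# These cases reuse the pipeline-parallel training test from
# hybrid_parallel_pp_alexnet and only swap in optimizers that use
# global-norm gradient clipping (plain parameter list vs. parameter groups).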
class TestPPClipGrad(TestDistPPTraning):
def build_optimizer(self, model):
grad_clip = paddle.nn.ClipGradByGlobalNorm(0.5)
scheduler = paddle.optimizer.lr.PiecewiseDecay(
boundaries=[2], values=[0.001, 0.002], verbose=True
)
optimizer = paddle.optimizer.SGD(
learning_rate=scheduler,
grad_clip=grad_clip,
parameters=model.parameters(),
)
return scheduler, optimizer
class TestPPClipGradParamGroup(TestDistPPTraning):
def build_optimizer(self, model):
grad_clip = paddle.nn.ClipGradByGlobalNorm(0.5)
scheduler = paddle.optimizer.lr.PiecewiseDecay(
boundaries=[2], values=[0.001, 0.002], verbose=True
)
optimizer = paddle.optimizer.Momentum(
learning_rate=scheduler,
grad_clip=grad_clip,
parameters=[{"params": model.parameters()}],
)
return scheduler, optimizer
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest
from test_auto_checkpoint import AutoCheckPointACLBase
import paddle
from paddle.fluid.tests.unittests.auto_checkpoint_utils import get_logger
paddle.enable_static()
logger = get_logger()
class AutoCheckpointTest1(AutoCheckPointACLBase):
def setUp(self):
get_logger()
logger.info("enter tests")
self._old_environ = dict(os.environ)
proc_env = {
"PADDLE_RUNNING_ENV": "PADDLE_EDL_AUTO_CHECKPOINT",
"PADDLE_TRAINER_ID": "0",
"PADDLE_RUNNING_PLATFORM": "PADDLE_CLOUD",
"PADDLE_JOB_ID": "test_job_auto_1",
"PADDLE_EDL_HDFS_HOME": "/usr/local/hadoop-2.7.7",
"PADDLE_EDL_HDFS_NAME": "",
"PADDLE_EDL_HDFS_UGI": "",
"PADDLE_EDL_HDFS_CHECKPOINT_PATH": "auto_checkpoint_1",
"PADDLE_EDL_ONLY_FOR_CE_TEST": "1",
"PADDLE_EDL_FS_CACHE": ".auto_checkpoint_test_1",
"PADDLE_EDL_SAVE_CHECKPOINT_INTER": "0",
}
os.environ.update(proc_env)
def test_corner_epoch_no(self):
self._test_corner_epoch_no(0)
if __name__ == '__main__':
unittest.main()