diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 33d9326681d09eb53f3ecaf234bb00f1a3f7138b..e7b395e2915a71031a1da8f789e3b0e6e5e3b94b 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -95,10 +95,16 @@ endif() LIST(REMOVE_ITEM TEST_OPS test_auto_checkpoint) +LIST(REMOVE_ITEM TEST_OPS test_auto_checkpoint1) LIST(REMOVE_ITEM TEST_OPS test_auto_checkpoint2) +LIST(REMOVE_ITEM TEST_OPS test_auto_checkpoint3) +LIST(REMOVE_ITEM TEST_OPS test_auto_checkpoint_multiple) +LIST(REMOVE_ITEM TEST_OPS test_auto_checkpoint_dist_basic) +LIST(REMOVE_ITEM TEST_OPS test_hdfs1) +LIST(REMOVE_ITEM TEST_OPS test_hdfs2) +LIST(REMOVE_ITEM TEST_OPS test_hdfs3) LIST(REMOVE_ITEM TEST_OPS test_checkpoint_saver) if(APPLE OR WIN32) - LIST(REMOVE_ITEM TEST_OPS test_hdfs) LIST(REMOVE_ITEM TEST_OPS test_fs_interface) LIST(REMOVE_ITEM TEST_OPS test_fleet_metric) endif() @@ -240,6 +246,51 @@ function(bash_test_modules TARGET_NAME) endif() endfunction() +function(parallel_bash_test_modules TARGET_NAME) + if(NOT WITH_TESTING) + return() + endif() + + set(options SERIAL) + set(oneValueArgs TIMEOUT START_BASH) + set(multiValueArgs DEPS ENVS LABELS UnitTests) + cmake_parse_arguments(parallel_bash_test_modules "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN}) + + + set(timeout 120) + if(${parallel_bash_test_modules_TIMEOUT}) + set(timeout ${parallel_bash_test_modules_TIMEOUT}) + endif() + + list(JOIN parallel_bash_test_modules_UnitTests " " uts_string) + + if(WITH_COVERAGE) + add_test(NAME ${TARGET_NAME} + COMMAND ${CMAKE_COMMAND} -E env PYTHONPATH=${PADDLE_BINARY_DIR}/python + TEST_TARGET_NAME=${TARGET_NAME} TEST_TIMEOUT=${timeout} ${parallel_bash_test_modules_ENVS} UnitTests=${uts_string} + WITH_COVERAGE=ON COVERAGE_FILE=${PADDLE_BINARY_DIR}/python-coverage.data + bash ${CMAKE_CURRENT_BINARY_DIR}/${parallel_bash_test_modules_START_BASH} + WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) + else() + add_test(NAME ${TARGET_NAME} + COMMAND ${CMAKE_COMMAND} -E env PYTHONPATH=${PADDLE_BINARY_DIR}/python + TEST_TARGET_NAME=${TARGET_NAME} TEST_TIMEOUT=${timeout} ${parallel_bash_test_modules_ENVS} UnitTests=${uts_string} + bash ${CMAKE_CURRENT_BINARY_DIR}/${parallel_bash_test_modules_START_BASH} + WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) + endif() + + if (parallel_bash_test_modules_SERIAL) + set_property(TEST ${TARGET_NAME} PROPERTY RUN_SERIAL 1) + endif() + + if(parallel_bash_test_modules_LABELS) + set_tests_properties(${TARGET_NAME} PROPERTIES TIMEOUT ${timeout} LABELS ${parallel_bash_test_modules_LABELS}) + else() + set_tests_properties(${TARGET_NAME} PROPERTIES TIMEOUT ${timeout}) + endif() +endfunction() + + list(REMOVE_ITEM TEST_OPS test_warpctc_op) list(REMOVE_ITEM TEST_OPS test_parallel_executor_crf) list(REMOVE_ITEM TEST_OPS test_parallel_executor_profiler) @@ -469,9 +520,9 @@ if(NOT WIN32) endif() if(NOT APPLE AND NOT WIN32) - bash_test_modules(test_auto_checkpoint START_BASH dist_test.sh TIMEOUT 600) - bash_test_modules(test_auto_checkpoint2 START_BASH dist_test.sh TIMEOUT 600) - bash_test_modules(test_checkpoint_saver START_BASH dist_test.sh TIMEOUT 600) + parallel_bash_test_modules(test_acp START_BASH parallel_test.sh TIMEOUT 140 UnitTests test_auto_checkpoint test_auto_checkpoint1 test_auto_checkpoint2 test_auto_checkpoint3) + parallel_bash_test_modules(test_acp2 START_BASH parallel_test.sh TIMEOUT 140 UnitTests test_auto_checkpoint_multiple test_auto_checkpoint_dist_basic) + parallel_bash_test_modules(test_hdfs START_BASH parallel_test.sh TIMEOUT 120 UnitTests test_hdfs1 test_hdfs2 test_hdfs3) endif() add_subdirectory(sequence) diff --git a/python/paddle/fluid/tests/unittests/auto_checkpoint_utils.py b/python/paddle/fluid/tests/unittests/auto_checkpoint_utils.py index 812730e9523f8d24ade68474b858e04b41fc6895..529ff4ec45d1fdc6d1d8e765e38cff53d36aade7 100644 --- a/python/paddle/fluid/tests/unittests/auto_checkpoint_utils.py +++ b/python/paddle/fluid/tests/unittests/auto_checkpoint_utils.py @@ -30,11 +30,11 @@ from paddle.fluid import unique_name import numpy as np from paddle.io import Dataset, BatchSampler, DataLoader -BATCH_NUM = 20 -BATCH_SIZE = 16 +BATCH_NUM = 4 +BATCH_SIZE = 1 #IMAGE_SIZE = 128 -CLASS_NUM = 10 +CLASS_NUM = 2 USE_GPU = False # whether use GPU to run model places = fluid.cuda_places() if USE_GPU else fluid.cpu_places() @@ -59,7 +59,7 @@ def sample_list_generator_creator(): for _ in range(BATCH_NUM): sample_list = [] for _ in range(BATCH_SIZE): - image, label = get_random_images_and_labels([16, 16], [1]) + image, label = get_random_images_and_labels([4, 4], [1]) sample_list.append([image, label]) yield sample_list @@ -75,8 +75,7 @@ class AutoCheckpointBase(unittest.TestCase): minimize=True, iterable=True): def simple_net(): - image = fluid.data( - name='image', shape=[-1, 16, 16], dtype='float32') + image = fluid.data(name='image', shape=[-1, 4, 4], dtype='float32') label = fluid.data(name='label', shape=[-1, 1], dtype='int64') fc_tmp = fluid.layers.fc(image, size=CLASS_NUM) diff --git a/python/paddle/fluid/tests/unittests/test_hdfs.py b/python/paddle/fluid/tests/unittests/hdfs_test_utils.py similarity index 62% rename from python/paddle/fluid/tests/unittests/test_hdfs.py rename to python/paddle/fluid/tests/unittests/hdfs_test_utils.py index 75e2f5d679204c33f922ea8ee6be71a900c83cd6..6a752bc3053d7d0672bd0002250252c3bbbfa1e1 100644 --- a/python/paddle/fluid/tests/unittests/test_hdfs.py +++ b/python/paddle/fluid/tests/unittests/hdfs_test_utils.py @@ -24,7 +24,7 @@ from paddle.distributed.fleet.utils import LocalFS, HDFSClient, FSTimeOut, FSFil java_home = os.environ["JAVA_HOME"] -class FSTest(unittest.TestCase): +class FSTestBase(unittest.TestCase): def _test_dirs(self, fs): dir_path = os.path.abspath("./test_dir") fs.delete(dir_path) @@ -188,106 +188,6 @@ class FSTest(unittest.TestCase): except Exception as e: pass - def test_exists(self): - fs = HDFSClient( - "/usr/local/hadoop-2.7.7/", - None, - time_out=15 * 1000, - sleep_inter=100) - self.assertFalse(fs.is_exist(os.path.abspath("./xxxx"))) - self.assertFalse(fs.is_dir(os.path.abspath("./xxxx"))) - self.assertTrue(fs.is_dir(os.path.abspath("./xxx/.."))) - dirs, files = fs.ls_dir(os.path.abspath("./test_hdfs.py")) - self.assertTrue(dirs == []) - self.assertTrue(len(files) == 1) - dirs, files = fs.ls_dir(os.path.abspath("./xxx/..")) - - def test_hdfs(self): - fs = HDFSClient( - "/usr/local/hadoop-2.7.7/", - None, - time_out=15 * 1000, - sleep_inter=100) - self._test_rm(fs) - self._test_touch(fs) - self._test_dirs(fs) - self._test_upload(fs) - - self._test_download(fs) - self._test_mkdirs(fs) - self._test_list_dir(fs) - self._test_try_upload(fs) - self._test_try_download(fs) - - def test_local(self): - fs = LocalFS() - self._test_rm(fs) - self._test_touch(fs) - self._test_dirs(fs) - self._test_touch_file(fs) - self._test_mkdirs(fs) - self._test_list_dir(fs) - self._test_try_upload(fs) - self._test_try_download(fs) - - def test_timeout(self): - fs = HDFSClient( - "/usr/local/hadoop-2.7.7/", - None, - time_out=6 * 1000, - sleep_inter=100) - src = "hdfs_test_timeout" - dst = "new_hdfs_test_timeout" - fs.delete(dst) - fs.mkdirs(src) - fs.mkdirs(dst) - fs.mkdirs(dst + "/" + src) - output = "" - try: - fs.mv(src, dst, test_exists=False) - self.assertFalse(1, "can't execute cmd:{} output:{}".format(cmd, - output)) - except FSTimeOut as e: - print("execute mv {} to {} timeout".format(src, dst)) - - cmd = "{} -mv {} {}".format(fs._base_cmd, src, dst) - ret, output = fluid.core.shell_execute_cmd(cmd, 6 * 1000, 2 * 1000) - self.assertNotEqual(ret, 0) - print("second mv ret:{} output:{}".format(ret, output)) - - def test_is_dir(self): - fs = HDFSClient( - "/usr/local/hadoop-2.7.7/", - None, - time_out=15 * 1000, - sleep_inter=100) - self.assertFalse(fs.is_dir("./test_hdfs.py")) - s = """ -java.io.IOException: Input/output error - responseErrorMsg : failed to getFileStatus, errorCode: 3, path: /user/PUBLIC_KM_Data/wangxi16/data/serving_model, lparam: d868f6bb6822c621, errorMessage: inner error - at org.apache.hadoop.util.FileSystemUtil.throwException(FileSystemUtil.java:164) - at org.apache.hadoop.util.FileSystemUtil.dealWithResponse(FileSystemUtil.java:118) - at org.apache.hadoop.lite.client.LiteClientImpl.getFileStatus(LiteClientImpl.java:696) - at org.apache.hadoop.fs.LibDFileSystemImpl.getFileStatus(LibDFileSystemImpl.java:297) - at org.apache.hadoop.fs.LiteFileSystem.getFileStatus(LiteFileSystem.java:514) - at org.apache.hadoop.fs.FsShell.test(FsShell.java:1092) - at org.apache.hadoop.fs.FsShell.run(FsShell.java:2285) - at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65) - at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79) - at org.apache.hadoop.fs.FsShell.main(FsShell.java:2353) - """ - - print("split lines:", s.splitlines()) - self.assertTrue(fs._test_match(s.splitlines()) != None) - - def test_config(self): - config = {"fs.default.name": "hdfs://xxx", "hadoop.job.ugi": "ugi"} - fs = HDFSClient( - "/usr/local/hadoop-2.7.7/", - config, - time_out=15 * 1000, - sleep_inter=100) - def _test_list_dir(self, fs): fs = HDFSClient( "/usr/local/hadoop-2.7.7/", diff --git a/python/paddle/fluid/tests/unittests/parallel_test.sh b/python/paddle/fluid/tests/unittests/parallel_test.sh new file mode 100644 index 0000000000000000000000000000000000000000..9da4f035345d7f04b69a1c9483cba7022ad10baa --- /dev/null +++ b/python/paddle/fluid/tests/unittests/parallel_test.sh @@ -0,0 +1,60 @@ +#!/bin/bash +unset https_proxy http_proxy +export FLAGS_rpc_disable_reuse_port=1 + +name=${TEST_TARGET_NAME} +UnitTests=${UnitTests} +TEST_TIMEOUT=${TEST_TIMEOUT} + +if [[ ${name}"x" == "x" ]]; then + echo "can't find name, please set TEST_TARGET_NAME first" + exit 1 +fi + +if [[ ${UnitTests}"x" == "x" ]]; then + echo "can't find UnitTests, please set TEST_TARGET_NAME first" + exit 1 +fi + +if [[ ${TEST_TIMEOUT}"x" == "x" ]]; then + echo "can't find ${TEST_TIMEOUT}, please set ${TEST_TIMEOUT} first" + exit 1 +fi + +if [[ ${WITH_COVERAGE} == "ON" ]]; then + PYTHON_EXEC="python -u -m coverage run --branch -p " +else + PYTHON_EXEC="python -u " +fi + +run_time=$(( $TEST_TIMEOUT - 10 )) +echo "run_time: ${run_time}" +for ut in ${UnitTests}; do + echo "start ${ut}" + timeout -s SIGKILL ${run_time} ${PYTHON_EXEC} ./${ut}.py > ${ut}_run.log 2>&1 & +done + +FAIL=0 +for job in `jobs -p` +do + echo "jobs -p result:" `jobs -p` + echo $job + wait $job || let FAIL=FAIL+1 +done + +echo "fail_num:" $FAIL + +if [ "$FAIL" == "0" ]; +then + exit 0 +else + echo "FAIL! ($FAIL)" + + for ut in ${UnitTests}; do + log=${ut}_run.log + echo "cat ${log}" + cat $log + done + + exit 1 +fi diff --git a/python/paddle/fluid/tests/unittests/test_auto_checkpoint.py b/python/paddle/fluid/tests/unittests/test_auto_checkpoint.py index 729fe20c8f87ed1cd07b7f5c2784a79acd1fa54b..fd009db5fd00133c5bad7c8c52662002ebd03fa8 100644 --- a/python/paddle/fluid/tests/unittests/test_auto_checkpoint.py +++ b/python/paddle/fluid/tests/unittests/test_auto_checkpoint.py @@ -59,6 +59,10 @@ class AutoCheckPointACLBase(AutoCheckpointBase): os.environ.clear() os.environ.update(self._old_environ) + file_name = os.path.basename(__file__) + base_name = os.path.splitext(file_name)[0] + print("runnng name:", base_name) + def _run_normal(self): exe, main_prog, startup_prog = self._generate() @@ -182,6 +186,20 @@ class AutoCheckPointACLBase(AutoCheckpointBase): fs.delete(save_dir) logger.info("begin _run_load_0") + def _test_corner_epoch_no(self, break_epoch_no): + logger.info("begin test_corener_epoch_no") + checker = acp._get_checker() + fs = HDFSClient(checker.hdfs_home, None) + + fs.delete(checker.hdfs_checkpoint_path) + self._reset_generator() + self._run_save_0(break_epoch_no=break_epoch_no) + self._reset_generator() + self._run_load_0(break_epoch_no=break_epoch_no) + + fs.delete(checker.hdfs_checkpoint_path) + logger.info("end test_corener_epoch_no") + class AutoCheckpointTest(AutoCheckPointACLBase): def setUp(self): @@ -193,13 +211,13 @@ class AutoCheckpointTest(AutoCheckPointACLBase): "PADDLE_RUNNING_ENV": "PADDLE_EDL_AUTO_CHECKPOINT", "PADDLE_TRAINER_ID": "0", "PADDLE_RUNNING_PLATFORM": "PADDLE_CLOUD", - "PADDLE_JOB_ID": "test_job_auto_1", + "PADDLE_JOB_ID": "test_job_auto_0", "PADDLE_EDL_HDFS_HOME": "/usr/local/hadoop-2.7.7", "PADDLE_EDL_HDFS_NAME": "", "PADDLE_EDL_HDFS_UGI": "", - "PADDLE_EDL_HDFS_CHECKPOINT_PATH": "auto_checkpoint_1", + "PADDLE_EDL_HDFS_CHECKPOINT_PATH": "auto_checkpoint_0", "PADDLE_EDL_ONLY_FOR_CE_TEST": "1", - "PADDLE_EDL_FS_CACHE": ".auto_checkpoint_test_1", + "PADDLE_EDL_FS_CACHE": ".auto_checkpoint_test_0", "PADDLE_EDL_SAVE_CHECKPOINT_INTER": "0" } os.environ.update(proc_env) @@ -246,102 +264,6 @@ class AutoCheckpointTest(AutoCheckPointACLBase): logger.info("end test_not_use") - def test_multiple(self): - checker = acp._get_checker() - fs = HDFSClient(checker.hdfs_home, None) - fs.delete(checker.hdfs_checkpoint_path) - self._reset_generator() - - logger.info("begin test_multiple") - fs = LocalFS() - save_dir = "./run_save_0" - fs.delete(save_dir) - - exe, main_prog1, startup_prog1 = self._generate() - _, main_prog2, startup_prog2 = self._generate() - - compiled1, data_loader1, optimizer1, loss1, image1, label1 = \ - self._init_env(exe, main_prog1, startup_prog1) - - compiled2, data_loader2, optimizer2, loss2, image2, label2 = \ - self._init_env(exe, main_prog2, startup_prog2) - - o = None - epochs = [] - for i in acp.train_epoch_range(3, 0): - for data in data_loader1(): - fetch = exe.run(compiled1, feed=data, fetch_list=[loss1]) - - for data in data_loader2(): - fetch = exe.run(compiled2, feed=data, fetch_list=[loss2]) - - o = acp._get_train_epoch_range() - self.assertEqual(len(o._exe_status), 2) - print(o._exe_status) - epochs.append(i) - - o = acp._get_train_epoch_range() - self.assertTrue(o == None, "now train epoch must not exits now") - self.assertEqual(i, 2) - self.assertEqual(epochs, [0, 1, 2]) - - fs.delete(save_dir) - logger.info("end test_multiple") - - def test_distributed_basic(self): - checker = acp._get_checker() - fs = HDFSClient(checker.hdfs_home, None) - fs.delete(checker.hdfs_checkpoint_path) - self._reset_generator() - - logger.info("begin test_distributed_basic") - fs = LocalFS() - save_dir = "./run_save_0" - fs.delete(save_dir) - - #basic - exe, main_prog, startup_prog = self._generate() - - compiled, data_loader, optimizer, loss, image, label = \ - self._init_env(exe, main_prog, startup_prog, minimize=False) - - #fleet - os.environ["TRAINING_ROLE"] = "TRAINER" - os.environ["PADDLE_TRAINER_ID"] = "0" - os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:6070" - - role = role_maker.PaddleCloudRoleMaker(is_collective=True) - fleet.init(role) - - with fluid.program_guard(main_prog, startup_prog): - dist_optimizer = fleet.distributed_optimizer(optimizer) - dist_optimizer.minimize(loss) - - exe.run(startup_prog) - - o = None - i = 0 - name = None - for i in acp.train_epoch_range(3, 0): - o = acp._get_train_epoch_range() - name = o.name - logger.info("_run_save_0 name:{} epoch_no:{}".format(o.name, i)) - - for data in data_loader(): - fetch = exe.run(fleet.main_program, - feed=data, - fetch_list=[loss]) - - self.assertEqual(len(o._exe_status), 1) - - o = acp._get_train_epoch_range() - assert o == None, "now train epoch must not exits now" - self.assertEqual(i, 2) - - fs.delete(save_dir) - - logger.info("end test_distributed_basic") - def test_checker(self): os.environ.pop("PADDLE_JOB_ID", None) try: diff --git a/python/paddle/fluid/tests/unittests/test_auto_checkpoint1.py b/python/paddle/fluid/tests/unittests/test_auto_checkpoint1.py new file mode 100644 index 0000000000000000000000000000000000000000..55173325f621f7333a7c3ca32a9c55becee72e5a --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_auto_checkpoint1.py @@ -0,0 +1,64 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import paddle +import paddle.fluid as fluid +import paddle.fluid.incubate.fleet.base.role_maker as role_maker +from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet +import os +import sys + +from paddle.distributed.fleet.utils.fs import LocalFS, HDFSClient +import paddle.fluid.incubate.checkpoint.auto_checkpoint as acp +from paddle.fluid.incubate.checkpoint.checkpoint_saver import PaddleModel +from paddle.fluid.framework import program_guard +from paddle.fluid import unique_name + +import numpy as np +from paddle.io import Dataset, BatchSampler, DataLoader + +from paddle.fluid.tests.unittests.auto_checkpoint_utils import AutoCheckpointBase, get_logger +from paddle.fluid.tests.unittests.test_auto_checkpoint import AutoCheckPointACLBase + +logger = get_logger() + + +class AutoCheckpointTest1(AutoCheckPointACLBase): + def setUp(self): + get_logger() + logger.info("enter tests") + + self._old_environ = dict(os.environ) + proc_env = { + "PADDLE_RUNNING_ENV": "PADDLE_EDL_AUTO_CHECKPOINT", + "PADDLE_TRAINER_ID": "0", + "PADDLE_RUNNING_PLATFORM": "PADDLE_CLOUD", + "PADDLE_JOB_ID": "test_job_auto_1", + "PADDLE_EDL_HDFS_HOME": "/usr/local/hadoop-2.7.7", + "PADDLE_EDL_HDFS_NAME": "", + "PADDLE_EDL_HDFS_UGI": "", + "PADDLE_EDL_HDFS_CHECKPOINT_PATH": "auto_checkpoint_1", + "PADDLE_EDL_ONLY_FOR_CE_TEST": "1", + "PADDLE_EDL_FS_CACHE": ".auto_checkpoint_test_1", + "PADDLE_EDL_SAVE_CHECKPOINT_INTER": "0" + } + os.environ.update(proc_env) + + def test_corner_epoch_no(self): + self._test_corner_epoch_no(0) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_auto_checkpoint2.py b/python/paddle/fluid/tests/unittests/test_auto_checkpoint2.py index 30a743510537e1fa0e2aeedb18de25c7e1fd120c..5d72fa01008af55a83d7b9a19747a8d96fb74b2b 100644 --- a/python/paddle/fluid/tests/unittests/test_auto_checkpoint2.py +++ b/python/paddle/fluid/tests/unittests/test_auto_checkpoint2.py @@ -57,19 +57,7 @@ class AutoCheckpointTest2(AutoCheckPointACLBase): os.environ.update(proc_env) def test_corner_epoch_no(self): - logger.info("begin test_corener_epoch_no") - checker = acp._get_checker() - fs = HDFSClient(checker.hdfs_home, None) - - for i in range(3): - fs.delete(checker.hdfs_checkpoint_path) - self._reset_generator() - self._run_save_0(break_epoch_no=i) - self._reset_generator() - self._run_load_0(break_epoch_no=i) - - fs.delete(checker.hdfs_checkpoint_path) - logger.info("end test_corener_epoch_no") + self._test_corner_epoch_no(1) if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/unittests/test_auto_checkpoint3.py b/python/paddle/fluid/tests/unittests/test_auto_checkpoint3.py new file mode 100644 index 0000000000000000000000000000000000000000..5382f7e328ed1afa2d7516cd0d8db2db659aadd7 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_auto_checkpoint3.py @@ -0,0 +1,64 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import paddle +import paddle.fluid as fluid +import paddle.fluid.incubate.fleet.base.role_maker as role_maker +from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet +import os +import sys + +from paddle.distributed.fleet.utils.fs import LocalFS, HDFSClient +import paddle.fluid.incubate.checkpoint.auto_checkpoint as acp +from paddle.fluid.incubate.checkpoint.checkpoint_saver import PaddleModel +from paddle.fluid.framework import program_guard +from paddle.fluid import unique_name + +import numpy as np +from paddle.io import Dataset, BatchSampler, DataLoader + +from paddle.fluid.tests.unittests.auto_checkpoint_utils import AutoCheckpointBase, get_logger +from paddle.fluid.tests.unittests.test_auto_checkpoint import AutoCheckPointACLBase + +logger = get_logger() + + +class AutoCheckpointTest3(AutoCheckPointACLBase): + def setUp(self): + get_logger() + logger.info("enter tests") + + self._old_environ = dict(os.environ) + proc_env = { + "PADDLE_RUNNING_ENV": "PADDLE_EDL_AUTO_CHECKPOINT", + "PADDLE_TRAINER_ID": "0", + "PADDLE_RUNNING_PLATFORM": "PADDLE_CLOUD", + "PADDLE_JOB_ID": "test_job_auto_3", + "PADDLE_EDL_HDFS_HOME": "/usr/local/hadoop-2.7.7", + "PADDLE_EDL_HDFS_NAME": "", + "PADDLE_EDL_HDFS_UGI": "", + "PADDLE_EDL_HDFS_CHECKPOINT_PATH": "auto_checkpoint_3", + "PADDLE_EDL_ONLY_FOR_CE_TEST": "1", + "PADDLE_EDL_FS_CACHE": ".auto_checkpoint_test_3", + "PADDLE_EDL_SAVE_CHECKPOINT_INTER": "0" + } + os.environ.update(proc_env) + + def test_corner_epoch_no(self): + self._test_corner_epoch_no(2) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_auto_checkpoint_dist_basic.py b/python/paddle/fluid/tests/unittests/test_auto_checkpoint_dist_basic.py new file mode 100644 index 0000000000000000000000000000000000000000..90db9595d92ef602c03fa7dd104484a4f6101a87 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_auto_checkpoint_dist_basic.py @@ -0,0 +1,115 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import paddle +import paddle.fluid as fluid +import paddle.fluid.incubate.fleet.base.role_maker as role_maker +from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet +import os +import sys + +from paddle.distributed.fleet.utils.fs import LocalFS, HDFSClient +import paddle.fluid.incubate.checkpoint.auto_checkpoint as acp +from paddle.fluid.incubate.checkpoint.checkpoint_saver import PaddleModel +from paddle.fluid.framework import program_guard +from paddle.fluid import unique_name + +import numpy as np +from paddle.io import Dataset, BatchSampler, DataLoader + +from paddle.fluid.tests.unittests.auto_checkpoint_utils import AutoCheckpointBase, get_logger +from paddle.fluid.tests.unittests.test_auto_checkpoint import AutoCheckPointACLBase + +logger = get_logger() + + +class AutoCheckpointTestDist(AutoCheckPointACLBase): + def setUp(self): + get_logger() + logger.info("enter tests") + + self._old_environ = dict(os.environ) + proc_env = { + "PADDLE_RUNNING_ENV": "PADDLE_EDL_AUTO_CHECKPOINT", + "PADDLE_TRAINER_ID": "0", + "PADDLE_RUNNING_PLATFORM": "PADDLE_CLOUD", + "PADDLE_JOB_ID": "test_job_auto_dist_basic", + "PADDLE_EDL_HDFS_HOME": "/usr/local/hadoop-2.7.7", + "PADDLE_EDL_HDFS_NAME": "", + "PADDLE_EDL_HDFS_UGI": "", + "PADDLE_EDL_HDFS_CHECKPOINT_PATH": "auto_checkpoint_dist_basic", + "PADDLE_EDL_ONLY_FOR_CE_TEST": "1", + "PADDLE_EDL_FS_CACHE": ".auto_checkpoint_test_dist_basic", + "PADDLE_EDL_SAVE_CHECKPOINT_INTER": "0" + } + os.environ.update(proc_env) + + def test_distributed_basic(self): + checker = acp._get_checker() + fs = HDFSClient(checker.hdfs_home, None) + fs.delete(checker.hdfs_checkpoint_path) + self._reset_generator() + + logger.info("begin test_distributed_basic") + fs = LocalFS() + save_dir = "./run_save_0" + fs.delete(save_dir) + + #basic + exe, main_prog, startup_prog = self._generate() + + compiled, data_loader, optimizer, loss, image, label = \ + self._init_env(exe, main_prog, startup_prog, minimize=False) + + #fleet + os.environ["TRAINING_ROLE"] = "TRAINER" + os.environ["PADDLE_TRAINER_ID"] = "0" + os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:6070" + + role = role_maker.PaddleCloudRoleMaker(is_collective=True) + fleet.init(role) + + with fluid.program_guard(main_prog, startup_prog): + dist_optimizer = fleet.distributed_optimizer(optimizer) + dist_optimizer.minimize(loss) + + exe.run(startup_prog) + + o = None + i = 0 + name = None + for i in acp.train_epoch_range(3, 0): + o = acp._get_train_epoch_range() + name = o.name + logger.info("_run_save_0 name:{} epoch_no:{}".format(o.name, i)) + + for data in data_loader(): + fetch = exe.run(fleet.main_program, + feed=data, + fetch_list=[loss]) + + self.assertEqual(len(o._exe_status), 1) + + o = acp._get_train_epoch_range() + assert o == None, "now train epoch must not exits now" + self.assertEqual(i, 2) + + fs.delete(save_dir) + + logger.info("end test_distributed_basic") + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_auto_checkpoint_multiple.py b/python/paddle/fluid/tests/unittests/test_auto_checkpoint_multiple.py new file mode 100644 index 0000000000000000000000000000000000000000..8c10cd0e9922859bf3bad2015587fc0a6b2ba5da --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_auto_checkpoint_multiple.py @@ -0,0 +1,103 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import paddle +import paddle.fluid as fluid +import paddle.fluid.incubate.fleet.base.role_maker as role_maker +from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet +import os +import sys + +from paddle.distributed.fleet.utils.fs import LocalFS, HDFSClient +import paddle.fluid.incubate.checkpoint.auto_checkpoint as acp +from paddle.fluid.incubate.checkpoint.checkpoint_saver import PaddleModel +from paddle.fluid.framework import program_guard +from paddle.fluid import unique_name + +import numpy as np +from paddle.io import Dataset, BatchSampler, DataLoader + +from paddle.fluid.tests.unittests.auto_checkpoint_utils import AutoCheckpointBase, get_logger +from paddle.fluid.tests.unittests.test_auto_checkpoint import AutoCheckPointACLBase + +logger = get_logger() + + +class AutoCheckpointTestMul(AutoCheckPointACLBase): + def setUp(self): + get_logger() + logger.info("enter tests") + + self._old_environ = dict(os.environ) + proc_env = { + "PADDLE_RUNNING_ENV": "PADDLE_EDL_AUTO_CHECKPOINT", + "PADDLE_TRAINER_ID": "0", + "PADDLE_RUNNING_PLATFORM": "PADDLE_CLOUD", + "PADDLE_JOB_ID": "test_job_auto_dist_multiple", + "PADDLE_EDL_HDFS_HOME": "/usr/local/hadoop-2.7.7", + "PADDLE_EDL_HDFS_NAME": "", + "PADDLE_EDL_HDFS_UGI": "", + "PADDLE_EDL_HDFS_CHECKPOINT_PATH": "auto_checkpoint_dist_multiple", + "PADDLE_EDL_ONLY_FOR_CE_TEST": "1", + "PADDLE_EDL_FS_CACHE": ".auto_checkpoint_test_dist_multiple", + "PADDLE_EDL_SAVE_CHECKPOINT_INTER": "0" + } + os.environ.update(proc_env) + + def test_multiple(self): + checker = acp._get_checker() + fs = HDFSClient(checker.hdfs_home, None) + fs.delete(checker.hdfs_checkpoint_path) + self._reset_generator() + + logger.info("begin test_multiple") + fs = LocalFS() + save_dir = "./run_save_0" + fs.delete(save_dir) + + exe, main_prog1, startup_prog1 = self._generate() + _, main_prog2, startup_prog2 = self._generate() + + compiled1, data_loader1, optimizer1, loss1, image1, label1 = \ + self._init_env(exe, main_prog1, startup_prog1) + + compiled2, data_loader2, optimizer2, loss2, image2, label2 = \ + self._init_env(exe, main_prog2, startup_prog2) + + o = None + epochs = [] + for i in acp.train_epoch_range(3, 0): + for data in data_loader1(): + fetch = exe.run(compiled1, feed=data, fetch_list=[loss1]) + + for data in data_loader2(): + fetch = exe.run(compiled2, feed=data, fetch_list=[loss2]) + + o = acp._get_train_epoch_range() + self.assertEqual(len(o._exe_status), 2) + print(o._exe_status) + epochs.append(i) + + o = acp._get_train_epoch_range() + self.assertTrue(o == None, "now train epoch must not exits now") + self.assertEqual(i, 2) + self.assertEqual(epochs, [0, 1, 2]) + + fs.delete(save_dir) + logger.info("end test_multiple") + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_hdfs1.py b/python/paddle/fluid/tests/unittests/test_hdfs1.py new file mode 100644 index 0000000000000000000000000000000000000000..430ed1abe860869d791f0eac17accc8416db1eca --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_hdfs1.py @@ -0,0 +1,104 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import paddle.fluid as fluid +import paddle.fluid.incubate.fleet.base.role_maker as role_maker +from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet +import os +import sys + +from paddle.distributed.fleet.utils import LocalFS, HDFSClient, FSTimeOut, FSFileExistsError, FSFileNotExistsError + +java_home = os.environ["JAVA_HOME"] + +from paddle.fluid.tests.unittests.hdfs_test_utils import FSTestBase + + +class FSTest1(FSTestBase): + def test_timeout(self): + fs = HDFSClient( + "/usr/local/hadoop-2.7.7/", + None, + time_out=6 * 1000, + sleep_inter=100) + src = "hdfs_test_timeout" + dst = "new_hdfs_test_timeout" + fs.delete(dst) + fs.mkdirs(src) + fs.mkdirs(dst) + fs.mkdirs(dst + "/" + src) + output = "" + try: + fs.mv(src, dst, test_exists=False) + self.assertFalse(1, "can't execute cmd:{} output:{}".format(cmd, + output)) + except FSTimeOut as e: + print("execute mv {} to {} timeout".format(src, dst)) + + cmd = "{} -mv {} {}".format(fs._base_cmd, src, dst) + ret, output = fluid.core.shell_execute_cmd(cmd, 6 * 1000, 2 * 1000) + self.assertNotEqual(ret, 0) + print("second mv ret:{} output:{}".format(ret, output)) + + def test_is_dir(self): + fs = HDFSClient( + "/usr/local/hadoop-2.7.7/", + None, + time_out=6 * 1000, + sleep_inter=100) + self.assertFalse(fs.is_dir("./test_hdfs.py")) + s = """ +java.io.IOException: Input/output error + responseErrorMsg : failed to getFileStatus, errorCode: 3, path: /user/PUBLIC_KM_Data/wangxi16/data/serving_model, lparam: d868f6bb6822c621, errorMessage: inner error + at org.apache.hadoop.util.FileSystemUtil.throwException(FileSystemUtil.java:164) + at org.apache.hadoop.util.FileSystemUtil.dealWithResponse(FileSystemUtil.java:118) + at org.apache.hadoop.lite.client.LiteClientImpl.getFileStatus(LiteClientImpl.java:696) + at org.apache.hadoop.fs.LibDFileSystemImpl.getFileStatus(LibDFileSystemImpl.java:297) + at org.apache.hadoop.fs.LiteFileSystem.getFileStatus(LiteFileSystem.java:514) + at org.apache.hadoop.fs.FsShell.test(FsShell.java:1092) + at org.apache.hadoop.fs.FsShell.run(FsShell.java:2285) + at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65) + at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79) + at org.apache.hadoop.fs.FsShell.main(FsShell.java:2353) + """ + + print("split lines:", s.splitlines()) + self.assertTrue(fs._test_match(s.splitlines()) != None) + + def test_config(self): + config = {"fs.default.name": "hdfs://xxx", "hadoop.job.ugi": "ugi"} + fs = HDFSClient( + "/usr/local/hadoop-2.7.7/", + config, + time_out=6 * 1000, + sleep_inter=100) + + def test_exists(self): + fs = HDFSClient( + "/usr/local/hadoop-2.7.7/", + None, + time_out=6 * 1000, + sleep_inter=100) + self.assertFalse(fs.is_exist(os.path.abspath("./xxxx"))) + self.assertFalse(fs.is_dir(os.path.abspath("./xxxx"))) + self.assertTrue(fs.is_dir(os.path.abspath("./xxx/.."))) + dirs, files = fs.ls_dir(os.path.abspath("./test_hdfs1.py")) + self.assertTrue(dirs == []) + self.assertTrue(len(files) == 1) + dirs, files = fs.ls_dir(os.path.abspath("./xxx/..")) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_hdfs2.py b/python/paddle/fluid/tests/unittests/test_hdfs2.py new file mode 100644 index 0000000000000000000000000000000000000000..7754f89e3c901ac14cb102881e8d338442038559 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_hdfs2.py @@ -0,0 +1,50 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import paddle.fluid as fluid +import paddle.fluid.incubate.fleet.base.role_maker as role_maker +from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet +import os +import sys + +from paddle.distributed.fleet.utils import LocalFS, HDFSClient, FSTimeOut, FSFileExistsError, FSFileNotExistsError + +java_home = os.environ["JAVA_HOME"] + +from paddle.fluid.tests.unittests.hdfs_test_utils import FSTestBase + + +class FSTest2(FSTestBase): + def test_hdfs(self): + fs = HDFSClient( + "/usr/local/hadoop-2.7.7/", + None, + time_out=5 * 1000, + sleep_inter=100) + self._test_rm(fs) + self._test_touch(fs) + self._test_dirs(fs) + + def test_local(self): + fs = LocalFS() + self._test_rm(fs) + self._test_touch(fs) + self._test_dirs(fs) + + self._test_touch_file(fs) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_hdfs3.py b/python/paddle/fluid/tests/unittests/test_hdfs3.py new file mode 100644 index 0000000000000000000000000000000000000000..1a045f4b17fc9b8b68ccf81a23cb953db58a9db7 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_hdfs3.py @@ -0,0 +1,53 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import paddle.fluid as fluid +import paddle.fluid.incubate.fleet.base.role_maker as role_maker +from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet +import os +import sys + +from paddle.distributed.fleet.utils import LocalFS, HDFSClient, FSTimeOut, FSFileExistsError, FSFileNotExistsError + +java_home = os.environ["JAVA_HOME"] + +from paddle.fluid.tests.unittests.hdfs_test_utils import FSTestBase + + +class FSTest3(FSTestBase): + def test_hdfs(self): + fs = HDFSClient( + "/usr/local/hadoop-2.7.7/", + None, + time_out=5 * 1000, + sleep_inter=100) + self._test_mkdirs(fs) + self._test_list_dir(fs) + self._test_try_upload(fs) + self._test_try_download(fs) + + self._test_upload(fs) + self._test_download(fs) + + def test_local(self): + fs = LocalFS() + self._test_mkdirs(fs) + self._test_list_dir(fs) + self._test_try_upload(fs) + self._test_try_download(fs) + + +if __name__ == '__main__': + unittest.main()