Unverified commit 7c42f056, authored by gongweibao, committed by GitHub

Finetune the performance of the unittests. (#26402)

Parent df7fe1fe
......@@ -95,10 +95,16 @@ endif()
LIST(REMOVE_ITEM TEST_OPS test_auto_checkpoint)
LIST(REMOVE_ITEM TEST_OPS test_auto_checkpoint1)
LIST(REMOVE_ITEM TEST_OPS test_auto_checkpoint2)
LIST(REMOVE_ITEM TEST_OPS test_auto_checkpoint3)
LIST(REMOVE_ITEM TEST_OPS test_auto_checkpoint_multiple)
LIST(REMOVE_ITEM TEST_OPS test_auto_checkpoint_dist_basic)
LIST(REMOVE_ITEM TEST_OPS test_hdfs1)
LIST(REMOVE_ITEM TEST_OPS test_hdfs2)
LIST(REMOVE_ITEM TEST_OPS test_hdfs3)
LIST(REMOVE_ITEM TEST_OPS test_checkpoint_saver)
if(APPLE OR WIN32)
LIST(REMOVE_ITEM TEST_OPS test_hdfs)
LIST(REMOVE_ITEM TEST_OPS test_fs_interface)
LIST(REMOVE_ITEM TEST_OPS test_fleet_metric)
endif()
......@@ -240,6 +246,51 @@ function(bash_test_modules TARGET_NAME)
endif()
endfunction()
function(parallel_bash_test_modules TARGET_NAME)
if(NOT WITH_TESTING)
return()
endif()
set(options SERIAL)
set(oneValueArgs TIMEOUT START_BASH)
set(multiValueArgs DEPS ENVS LABELS UnitTests)
cmake_parse_arguments(parallel_bash_test_modules "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN})
set(timeout 120)
if(${parallel_bash_test_modules_TIMEOUT})
set(timeout ${parallel_bash_test_modules_TIMEOUT})
endif()
list(JOIN parallel_bash_test_modules_UnitTests " " uts_string)
if(WITH_COVERAGE)
add_test(NAME ${TARGET_NAME}
COMMAND ${CMAKE_COMMAND} -E env PYTHONPATH=${PADDLE_BINARY_DIR}/python
TEST_TARGET_NAME=${TARGET_NAME} TEST_TIMEOUT=${timeout} ${parallel_bash_test_modules_ENVS} UnitTests=${uts_string}
WITH_COVERAGE=ON COVERAGE_FILE=${PADDLE_BINARY_DIR}/python-coverage.data
bash ${CMAKE_CURRENT_BINARY_DIR}/${parallel_bash_test_modules_START_BASH}
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
else()
add_test(NAME ${TARGET_NAME}
COMMAND ${CMAKE_COMMAND} -E env PYTHONPATH=${PADDLE_BINARY_DIR}/python
TEST_TARGET_NAME=${TARGET_NAME} TEST_TIMEOUT=${timeout} ${parallel_bash_test_modules_ENVS} UnitTests=${uts_string}
bash ${CMAKE_CURRENT_BINARY_DIR}/${parallel_bash_test_modules_START_BASH}
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
endif()
if (parallel_bash_test_modules_SERIAL)
set_property(TEST ${TARGET_NAME} PROPERTY RUN_SERIAL 1)
endif()
if(parallel_bash_test_modules_LABELS)
set_tests_properties(${TARGET_NAME} PROPERTIES TIMEOUT ${timeout} LABELS ${parallel_bash_test_modules_LABELS})
else()
set_tests_properties(${TARGET_NAME} PROPERTIES TIMEOUT ${timeout})
endif()
endfunction()
list(REMOVE_ITEM TEST_OPS test_warpctc_op)
list(REMOVE_ITEM TEST_OPS test_parallel_executor_crf)
list(REMOVE_ITEM TEST_OPS test_parallel_executor_profiler)
......@@ -469,9 +520,9 @@ if(NOT WIN32)
endif()
if(NOT APPLE AND NOT WIN32)
bash_test_modules(test_auto_checkpoint START_BASH dist_test.sh TIMEOUT 600)
bash_test_modules(test_auto_checkpoint2 START_BASH dist_test.sh TIMEOUT 600)
bash_test_modules(test_checkpoint_saver START_BASH dist_test.sh TIMEOUT 600)
parallel_bash_test_modules(test_acp START_BASH parallel_test.sh TIMEOUT 140 UnitTests test_auto_checkpoint test_auto_checkpoint1 test_auto_checkpoint2 test_auto_checkpoint3)
parallel_bash_test_modules(test_acp2 START_BASH parallel_test.sh TIMEOUT 140 UnitTests test_auto_checkpoint_multiple test_auto_checkpoint_dist_basic)
parallel_bash_test_modules(test_hdfs START_BASH parallel_test.sh TIMEOUT 120 UnitTests test_hdfs1 test_hdfs2 test_hdfs3)
endif()
add_subdirectory(sequence)
......
......@@ -30,11 +30,11 @@ from paddle.fluid import unique_name
import numpy as np
from paddle.io import Dataset, BatchSampler, DataLoader
BATCH_NUM = 20
BATCH_SIZE = 16
BATCH_NUM = 4
BATCH_SIZE = 1
#IMAGE_SIZE = 128
CLASS_NUM = 10
CLASS_NUM = 2
USE_GPU = False  # whether to use the GPU to run the model
places = fluid.cuda_places() if USE_GPU else fluid.cpu_places()
......@@ -59,7 +59,7 @@ def sample_list_generator_creator():
for _ in range(BATCH_NUM):
sample_list = []
for _ in range(BATCH_SIZE):
image, label = get_random_images_and_labels([16, 16], [1])
image, label = get_random_images_and_labels([4, 4], [1])
sample_list.append([image, label])
yield sample_list
......@@ -75,8 +75,7 @@ class AutoCheckpointBase(unittest.TestCase):
minimize=True,
iterable=True):
def simple_net():
image = fluid.data(
name='image', shape=[-1, 16, 16], dtype='float32')
image = fluid.data(name='image', shape=[-1, 4, 4], dtype='float32')
label = fluid.data(name='label', shape=[-1, 1], dtype='int64')
fc_tmp = fluid.layers.fc(image, size=CLASS_NUM)
......
......@@ -24,7 +24,7 @@ from paddle.distributed.fleet.utils import LocalFS, HDFSClient, FSTimeOut, FSFil
java_home = os.environ["JAVA_HOME"]
class FSTest(unittest.TestCase):
class FSTestBase(unittest.TestCase):
def _test_dirs(self, fs):
dir_path = os.path.abspath("./test_dir")
fs.delete(dir_path)
......@@ -188,106 +188,6 @@ class FSTest(unittest.TestCase):
except Exception as e:
pass
def test_exists(self):
fs = HDFSClient(
"/usr/local/hadoop-2.7.7/",
None,
time_out=15 * 1000,
sleep_inter=100)
self.assertFalse(fs.is_exist(os.path.abspath("./xxxx")))
self.assertFalse(fs.is_dir(os.path.abspath("./xxxx")))
self.assertTrue(fs.is_dir(os.path.abspath("./xxx/..")))
dirs, files = fs.ls_dir(os.path.abspath("./test_hdfs.py"))
self.assertTrue(dirs == [])
self.assertTrue(len(files) == 1)
dirs, files = fs.ls_dir(os.path.abspath("./xxx/.."))
def test_hdfs(self):
fs = HDFSClient(
"/usr/local/hadoop-2.7.7/",
None,
time_out=15 * 1000,
sleep_inter=100)
self._test_rm(fs)
self._test_touch(fs)
self._test_dirs(fs)
self._test_upload(fs)
self._test_download(fs)
self._test_mkdirs(fs)
self._test_list_dir(fs)
self._test_try_upload(fs)
self._test_try_download(fs)
def test_local(self):
fs = LocalFS()
self._test_rm(fs)
self._test_touch(fs)
self._test_dirs(fs)
self._test_touch_file(fs)
self._test_mkdirs(fs)
self._test_list_dir(fs)
self._test_try_upload(fs)
self._test_try_download(fs)
def test_timeout(self):
fs = HDFSClient(
"/usr/local/hadoop-2.7.7/",
None,
time_out=6 * 1000,
sleep_inter=100)
src = "hdfs_test_timeout"
dst = "new_hdfs_test_timeout"
fs.delete(dst)
fs.mkdirs(src)
fs.mkdirs(dst)
fs.mkdirs(dst + "/" + src)
output = ""
try:
fs.mv(src, dst, test_exists=False)
self.fail("mv {} to {} should have timed out, output:{}".format(src, dst, output))
except FSTimeOut as e:
print("execute mv {} to {} timeout".format(src, dst))
cmd = "{} -mv {} {}".format(fs._base_cmd, src, dst)
ret, output = fluid.core.shell_execute_cmd(cmd, 6 * 1000, 2 * 1000)
self.assertNotEqual(ret, 0)
print("second mv ret:{} output:{}".format(ret, output))
def test_is_dir(self):
fs = HDFSClient(
"/usr/local/hadoop-2.7.7/",
None,
time_out=15 * 1000,
sleep_inter=100)
self.assertFalse(fs.is_dir("./test_hdfs.py"))
s = """
java.io.IOException: Input/output error
responseErrorMsg : failed to getFileStatus, errorCode: 3, path: /user/PUBLIC_KM_Data/wangxi16/data/serving_model, lparam: d868f6bb6822c621, errorMessage: inner error
at org.apache.hadoop.util.FileSystemUtil.throwException(FileSystemUtil.java:164)
at org.apache.hadoop.util.FileSystemUtil.dealWithResponse(FileSystemUtil.java:118)
at org.apache.hadoop.lite.client.LiteClientImpl.getFileStatus(LiteClientImpl.java:696)
at org.apache.hadoop.fs.LibDFileSystemImpl.getFileStatus(LibDFileSystemImpl.java:297)
at org.apache.hadoop.fs.LiteFileSystem.getFileStatus(LiteFileSystem.java:514)
at org.apache.hadoop.fs.FsShell.test(FsShell.java:1092)
at org.apache.hadoop.fs.FsShell.run(FsShell.java:2285)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
at org.apache.hadoop.fs.FsShell.main(FsShell.java:2353)
"""
print("split lines:", s.splitlines())
self.assertTrue(fs._test_match(s.splitlines()) != None)
def test_config(self):
config = {"fs.default.name": "hdfs://xxx", "hadoop.job.ugi": "ugi"}
fs = HDFSClient(
"/usr/local/hadoop-2.7.7/",
config,
time_out=15 * 1000,
sleep_inter=100)
def _test_list_dir(self, fs):
fs = HDFSClient(
"/usr/local/hadoop-2.7.7/",
......
#!/bin/bash
unset https_proxy http_proxy
export FLAGS_rpc_disable_reuse_port=1
name=${TEST_TARGET_NAME}
UnitTests=${UnitTests}
TEST_TIMEOUT=${TEST_TIMEOUT}
if [[ ${name}"x" == "x" ]]; then
echo "can't find name, please set TEST_TARGET_NAME first"
exit 1
fi
if [[ ${UnitTests}"x" == "x" ]]; then
echo "can't find UnitTests, please set TEST_TARGET_NAME first"
exit 1
fi
if [[ ${TEST_TIMEOUT}"x" == "x" ]]; then
echo "can't find ${TEST_TIMEOUT}, please set ${TEST_TIMEOUT} first"
exit 1
fi
if [[ ${WITH_COVERAGE} == "ON" ]]; then
PYTHON_EXEC="python -u -m coverage run --branch -p "
else
PYTHON_EXEC="python -u "
fi
run_time=$(( $TEST_TIMEOUT - 10 ))
echo "run_time: ${run_time}"
for ut in ${UnitTests}; do
echo "start ${ut}"
timeout -s SIGKILL ${run_time} ${PYTHON_EXEC} ./${ut}.py > ${ut}_run.log 2>&1 &
done
FAIL=0
for job in `jobs -p`
do
echo "jobs -p result:" `jobs -p`
echo $job
wait $job || let FAIL=FAIL+1
done
echo "fail_num:" $FAIL
if [ "$FAIL" == "0" ];
then
exit 0
else
echo "FAIL! ($FAIL)"
for ut in ${UnitTests}; do
log=${ut}_run.log
echo "cat ${log}"
cat $log
done
exit 1
fi
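For context, a minimal Python sketch of the control flow that parallel_test.sh implements: start each unit test as its own process, wait for all of them, and print the per-test log when any of them fails. This is an illustration only, not part of this commit; the run_parallel helper name is hypothetical and the run_time of 110 seconds is an assumption mirroring TEST_TIMEOUT - 10.

import subprocess
import sys


def run_parallel(unit_tests, run_time):
    procs = []
    for ut in unit_tests:
        log = open("{}_run.log".format(ut), "w")
        # Equivalent of: timeout -s SIGKILL ${run_time} python -u ./${ut}.py > ${ut}_run.log 2>&1 &
        p = subprocess.Popen([sys.executable, "-u", "./{}.py".format(ut)],
                             stdout=log, stderr=subprocess.STDOUT)
        procs.append((ut, p, log))

    fail = 0  # mirrors the FAIL counter in the bash script
    for ut, p, log in procs:
        try:
            p.wait(timeout=run_time)
        except subprocess.TimeoutExpired:
            p.kill()
            p.wait()
        log.close()
        if p.returncode != 0:
            fail += 1
            print("cat {}_run.log".format(ut))
            with open("{}_run.log".format(ut)) as f:
                print(f.read())
    return fail


if __name__ == "__main__":
    # Example: the same three tests that the test_hdfs target groups together in CMakeLists.txt.
    failed = run_parallel(["test_hdfs1", "test_hdfs2", "test_hdfs3"], run_time=110)
    sys.exit(0 if failed == 0 else 1)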
......@@ -59,6 +59,10 @@ class AutoCheckPointACLBase(AutoCheckpointBase):
os.environ.clear()
os.environ.update(self._old_environ)
file_name = os.path.basename(__file__)
base_name = os.path.splitext(file_name)[0]
print("runnng name:", base_name)
def _run_normal(self):
exe, main_prog, startup_prog = self._generate()
......@@ -182,6 +186,20 @@ class AutoCheckPointACLBase(AutoCheckpointBase):
fs.delete(save_dir)
logger.info("begin _run_load_0")
def _test_corner_epoch_no(self, break_epoch_no):
logger.info("begin test_corener_epoch_no")
checker = acp._get_checker()
fs = HDFSClient(checker.hdfs_home, None)
fs.delete(checker.hdfs_checkpoint_path)
self._reset_generator()
self._run_save_0(break_epoch_no=break_epoch_no)
self._reset_generator()
self._run_load_0(break_epoch_no=break_epoch_no)
fs.delete(checker.hdfs_checkpoint_path)
logger.info("end test_corener_epoch_no")
class AutoCheckpointTest(AutoCheckPointACLBase):
def setUp(self):
......@@ -193,13 +211,13 @@ class AutoCheckpointTest(AutoCheckPointACLBase):
"PADDLE_RUNNING_ENV": "PADDLE_EDL_AUTO_CHECKPOINT",
"PADDLE_TRAINER_ID": "0",
"PADDLE_RUNNING_PLATFORM": "PADDLE_CLOUD",
"PADDLE_JOB_ID": "test_job_auto_1",
"PADDLE_JOB_ID": "test_job_auto_0",
"PADDLE_EDL_HDFS_HOME": "/usr/local/hadoop-2.7.7",
"PADDLE_EDL_HDFS_NAME": "",
"PADDLE_EDL_HDFS_UGI": "",
"PADDLE_EDL_HDFS_CHECKPOINT_PATH": "auto_checkpoint_1",
"PADDLE_EDL_HDFS_CHECKPOINT_PATH": "auto_checkpoint_0",
"PADDLE_EDL_ONLY_FOR_CE_TEST": "1",
"PADDLE_EDL_FS_CACHE": ".auto_checkpoint_test_1",
"PADDLE_EDL_FS_CACHE": ".auto_checkpoint_test_0",
"PADDLE_EDL_SAVE_CHECKPOINT_INTER": "0"
}
os.environ.update(proc_env)
......@@ -246,102 +264,6 @@ class AutoCheckpointTest(AutoCheckPointACLBase):
logger.info("end test_not_use")
def test_multiple(self):
checker = acp._get_checker()
fs = HDFSClient(checker.hdfs_home, None)
fs.delete(checker.hdfs_checkpoint_path)
self._reset_generator()
logger.info("begin test_multiple")
fs = LocalFS()
save_dir = "./run_save_0"
fs.delete(save_dir)
exe, main_prog1, startup_prog1 = self._generate()
_, main_prog2, startup_prog2 = self._generate()
compiled1, data_loader1, optimizer1, loss1, image1, label1 = \
self._init_env(exe, main_prog1, startup_prog1)
compiled2, data_loader2, optimizer2, loss2, image2, label2 = \
self._init_env(exe, main_prog2, startup_prog2)
o = None
epochs = []
for i in acp.train_epoch_range(3, 0):
for data in data_loader1():
fetch = exe.run(compiled1, feed=data, fetch_list=[loss1])
for data in data_loader2():
fetch = exe.run(compiled2, feed=data, fetch_list=[loss2])
o = acp._get_train_epoch_range()
self.assertEqual(len(o._exe_status), 2)
print(o._exe_status)
epochs.append(i)
o = acp._get_train_epoch_range()
self.assertTrue(o == None, "train epoch range must not exist now")
self.assertEqual(i, 2)
self.assertEqual(epochs, [0, 1, 2])
fs.delete(save_dir)
logger.info("end test_multiple")
def test_distributed_basic(self):
checker = acp._get_checker()
fs = HDFSClient(checker.hdfs_home, None)
fs.delete(checker.hdfs_checkpoint_path)
self._reset_generator()
logger.info("begin test_distributed_basic")
fs = LocalFS()
save_dir = "./run_save_0"
fs.delete(save_dir)
#basic
exe, main_prog, startup_prog = self._generate()
compiled, data_loader, optimizer, loss, image, label = \
self._init_env(exe, main_prog, startup_prog, minimize=False)
#fleet
os.environ["TRAINING_ROLE"] = "TRAINER"
os.environ["PADDLE_TRAINER_ID"] = "0"
os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:6070"
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
with fluid.program_guard(main_prog, startup_prog):
dist_optimizer = fleet.distributed_optimizer(optimizer)
dist_optimizer.minimize(loss)
exe.run(startup_prog)
o = None
i = 0
name = None
for i in acp.train_epoch_range(3, 0):
o = acp._get_train_epoch_range()
name = o.name
logger.info("_run_save_0 name:{} epoch_no:{}".format(o.name, i))
for data in data_loader():
fetch = exe.run(fleet.main_program,
feed=data,
fetch_list=[loss])
self.assertEqual(len(o._exe_status), 1)
o = acp._get_train_epoch_range()
assert o == None, "train epoch range must not exist now"
self.assertEqual(i, 2)
fs.delete(save_dir)
logger.info("end test_distributed_basic")
def test_checker(self):
os.environ.pop("PADDLE_JOB_ID", None)
try:
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet
import os
import sys
from paddle.distributed.fleet.utils.fs import LocalFS, HDFSClient
import paddle.fluid.incubate.checkpoint.auto_checkpoint as acp
from paddle.fluid.incubate.checkpoint.checkpoint_saver import PaddleModel
from paddle.fluid.framework import program_guard
from paddle.fluid import unique_name
import numpy as np
from paddle.io import Dataset, BatchSampler, DataLoader
from paddle.fluid.tests.unittests.auto_checkpoint_utils import AutoCheckpointBase, get_logger
from paddle.fluid.tests.unittests.test_auto_checkpoint import AutoCheckPointACLBase
logger = get_logger()
class AutoCheckpointTest1(AutoCheckPointACLBase):
def setUp(self):
get_logger()
logger.info("enter tests")
self._old_environ = dict(os.environ)
proc_env = {
"PADDLE_RUNNING_ENV": "PADDLE_EDL_AUTO_CHECKPOINT",
"PADDLE_TRAINER_ID": "0",
"PADDLE_RUNNING_PLATFORM": "PADDLE_CLOUD",
"PADDLE_JOB_ID": "test_job_auto_1",
"PADDLE_EDL_HDFS_HOME": "/usr/local/hadoop-2.7.7",
"PADDLE_EDL_HDFS_NAME": "",
"PADDLE_EDL_HDFS_UGI": "",
"PADDLE_EDL_HDFS_CHECKPOINT_PATH": "auto_checkpoint_1",
"PADDLE_EDL_ONLY_FOR_CE_TEST": "1",
"PADDLE_EDL_FS_CACHE": ".auto_checkpoint_test_1",
"PADDLE_EDL_SAVE_CHECKPOINT_INTER": "0"
}
os.environ.update(proc_env)
def test_corner_epoch_no(self):
self._test_corner_epoch_no(0)
if __name__ == '__main__':
unittest.main()
......@@ -57,19 +57,7 @@ class AutoCheckpointTest2(AutoCheckPointACLBase):
os.environ.update(proc_env)
def test_corner_epoch_no(self):
logger.info("begin test_corener_epoch_no")
checker = acp._get_checker()
fs = HDFSClient(checker.hdfs_home, None)
for i in range(3):
fs.delete(checker.hdfs_checkpoint_path)
self._reset_generator()
self._run_save_0(break_epoch_no=i)
self._reset_generator()
self._run_load_0(break_epoch_no=i)
fs.delete(checker.hdfs_checkpoint_path)
logger.info("end test_corener_epoch_no")
self._test_corner_epoch_no(1)
if __name__ == '__main__':
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet
import os
import sys
from paddle.distributed.fleet.utils.fs import LocalFS, HDFSClient
import paddle.fluid.incubate.checkpoint.auto_checkpoint as acp
from paddle.fluid.incubate.checkpoint.checkpoint_saver import PaddleModel
from paddle.fluid.framework import program_guard
from paddle.fluid import unique_name
import numpy as np
from paddle.io import Dataset, BatchSampler, DataLoader
from paddle.fluid.tests.unittests.auto_checkpoint_utils import AutoCheckpointBase, get_logger
from paddle.fluid.tests.unittests.test_auto_checkpoint import AutoCheckPointACLBase
logger = get_logger()
class AutoCheckpointTest3(AutoCheckPointACLBase):
def setUp(self):
get_logger()
logger.info("enter tests")
self._old_environ = dict(os.environ)
proc_env = {
"PADDLE_RUNNING_ENV": "PADDLE_EDL_AUTO_CHECKPOINT",
"PADDLE_TRAINER_ID": "0",
"PADDLE_RUNNING_PLATFORM": "PADDLE_CLOUD",
"PADDLE_JOB_ID": "test_job_auto_3",
"PADDLE_EDL_HDFS_HOME": "/usr/local/hadoop-2.7.7",
"PADDLE_EDL_HDFS_NAME": "",
"PADDLE_EDL_HDFS_UGI": "",
"PADDLE_EDL_HDFS_CHECKPOINT_PATH": "auto_checkpoint_3",
"PADDLE_EDL_ONLY_FOR_CE_TEST": "1",
"PADDLE_EDL_FS_CACHE": ".auto_checkpoint_test_3",
"PADDLE_EDL_SAVE_CHECKPOINT_INTER": "0"
}
os.environ.update(proc_env)
def test_corner_epoch_no(self):
self._test_corner_epoch_no(2)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet
import os
import sys
from paddle.distributed.fleet.utils.fs import LocalFS, HDFSClient
import paddle.fluid.incubate.checkpoint.auto_checkpoint as acp
from paddle.fluid.incubate.checkpoint.checkpoint_saver import PaddleModel
from paddle.fluid.framework import program_guard
from paddle.fluid import unique_name
import numpy as np
from paddle.io import Dataset, BatchSampler, DataLoader
from paddle.fluid.tests.unittests.auto_checkpoint_utils import AutoCheckpointBase, get_logger
from paddle.fluid.tests.unittests.test_auto_checkpoint import AutoCheckPointACLBase
logger = get_logger()
class AutoCheckpointTestDist(AutoCheckPointACLBase):
def setUp(self):
get_logger()
logger.info("enter tests")
self._old_environ = dict(os.environ)
proc_env = {
"PADDLE_RUNNING_ENV": "PADDLE_EDL_AUTO_CHECKPOINT",
"PADDLE_TRAINER_ID": "0",
"PADDLE_RUNNING_PLATFORM": "PADDLE_CLOUD",
"PADDLE_JOB_ID": "test_job_auto_dist_basic",
"PADDLE_EDL_HDFS_HOME": "/usr/local/hadoop-2.7.7",
"PADDLE_EDL_HDFS_NAME": "",
"PADDLE_EDL_HDFS_UGI": "",
"PADDLE_EDL_HDFS_CHECKPOINT_PATH": "auto_checkpoint_dist_basic",
"PADDLE_EDL_ONLY_FOR_CE_TEST": "1",
"PADDLE_EDL_FS_CACHE": ".auto_checkpoint_test_dist_basic",
"PADDLE_EDL_SAVE_CHECKPOINT_INTER": "0"
}
os.environ.update(proc_env)
def test_distributed_basic(self):
checker = acp._get_checker()
fs = HDFSClient(checker.hdfs_home, None)
fs.delete(checker.hdfs_checkpoint_path)
self._reset_generator()
logger.info("begin test_distributed_basic")
fs = LocalFS()
save_dir = "./run_save_0"
fs.delete(save_dir)
#basic
exe, main_prog, startup_prog = self._generate()
compiled, data_loader, optimizer, loss, image, label = \
self._init_env(exe, main_prog, startup_prog, minimize=False)
#fleet
os.environ["TRAINING_ROLE"] = "TRAINER"
os.environ["PADDLE_TRAINER_ID"] = "0"
os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:6070"
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
with fluid.program_guard(main_prog, startup_prog):
dist_optimizer = fleet.distributed_optimizer(optimizer)
dist_optimizer.minimize(loss)
exe.run(startup_prog)
o = None
i = 0
name = None
for i in acp.train_epoch_range(3, 0):
o = acp._get_train_epoch_range()
name = o.name
logger.info("_run_save_0 name:{} epoch_no:{}".format(o.name, i))
for data in data_loader():
fetch = exe.run(fleet.main_program,
feed=data,
fetch_list=[loss])
self.assertEqual(len(o._exe_status), 1)
o = acp._get_train_epoch_range()
assert o == None, "train epoch range must not exist now"
self.assertEqual(i, 2)
fs.delete(save_dir)
logger.info("end test_distributed_basic")
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet
import os
import sys
from paddle.distributed.fleet.utils.fs import LocalFS, HDFSClient
import paddle.fluid.incubate.checkpoint.auto_checkpoint as acp
from paddle.fluid.incubate.checkpoint.checkpoint_saver import PaddleModel
from paddle.fluid.framework import program_guard
from paddle.fluid import unique_name
import numpy as np
from paddle.io import Dataset, BatchSampler, DataLoader
from paddle.fluid.tests.unittests.auto_checkpoint_utils import AutoCheckpointBase, get_logger
from paddle.fluid.tests.unittests.test_auto_checkpoint import AutoCheckPointACLBase
logger = get_logger()
class AutoCheckpointTestMul(AutoCheckPointACLBase):
def setUp(self):
get_logger()
logger.info("enter tests")
self._old_environ = dict(os.environ)
proc_env = {
"PADDLE_RUNNING_ENV": "PADDLE_EDL_AUTO_CHECKPOINT",
"PADDLE_TRAINER_ID": "0",
"PADDLE_RUNNING_PLATFORM": "PADDLE_CLOUD",
"PADDLE_JOB_ID": "test_job_auto_dist_multiple",
"PADDLE_EDL_HDFS_HOME": "/usr/local/hadoop-2.7.7",
"PADDLE_EDL_HDFS_NAME": "",
"PADDLE_EDL_HDFS_UGI": "",
"PADDLE_EDL_HDFS_CHECKPOINT_PATH": "auto_checkpoint_dist_multiple",
"PADDLE_EDL_ONLY_FOR_CE_TEST": "1",
"PADDLE_EDL_FS_CACHE": ".auto_checkpoint_test_dist_multiple",
"PADDLE_EDL_SAVE_CHECKPOINT_INTER": "0"
}
os.environ.update(proc_env)
def test_multiple(self):
checker = acp._get_checker()
fs = HDFSClient(checker.hdfs_home, None)
fs.delete(checker.hdfs_checkpoint_path)
self._reset_generator()
logger.info("begin test_multiple")
fs = LocalFS()
save_dir = "./run_save_0"
fs.delete(save_dir)
exe, main_prog1, startup_prog1 = self._generate()
_, main_prog2, startup_prog2 = self._generate()
compiled1, data_loader1, optimizer1, loss1, image1, label1 = \
self._init_env(exe, main_prog1, startup_prog1)
compiled2, data_loader2, optimizer2, loss2, image2, label2 = \
self._init_env(exe, main_prog2, startup_prog2)
o = None
epochs = []
for i in acp.train_epoch_range(3, 0):
for data in data_loader1():
fetch = exe.run(compiled1, feed=data, fetch_list=[loss1])
for data in data_loader2():
fetch = exe.run(compiled2, feed=data, fetch_list=[loss2])
o = acp._get_train_epoch_range()
self.assertEqual(len(o._exe_status), 2)
print(o._exe_status)
epochs.append(i)
o = acp._get_train_epoch_range()
self.assertTrue(o == None, "train epoch range must not exist now")
self.assertEqual(i, 2)
self.assertEqual(epochs, [0, 1, 2])
fs.delete(save_dir)
logger.info("end test_multiple")
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet
import os
import sys
from paddle.distributed.fleet.utils import LocalFS, HDFSClient, FSTimeOut, FSFileExistsError, FSFileNotExistsError
java_home = os.environ["JAVA_HOME"]
from paddle.fluid.tests.unittests.hdfs_test_utils import FSTestBase
class FSTest1(FSTestBase):
def test_timeout(self):
fs = HDFSClient(
"/usr/local/hadoop-2.7.7/",
None,
time_out=6 * 1000,
sleep_inter=100)
src = "hdfs_test_timeout"
dst = "new_hdfs_test_timeout"
fs.delete(dst)
fs.mkdirs(src)
fs.mkdirs(dst)
fs.mkdirs(dst + "/" + src)
output = ""
try:
fs.mv(src, dst, test_exists=False)
self.fail("mv {} to {} should have timed out, output:{}".format(src, dst, output))
except FSTimeOut as e:
print("execute mv {} to {} timeout".format(src, dst))
cmd = "{} -mv {} {}".format(fs._base_cmd, src, dst)
ret, output = fluid.core.shell_execute_cmd(cmd, 6 * 1000, 2 * 1000)
self.assertNotEqual(ret, 0)
print("second mv ret:{} output:{}".format(ret, output))
def test_is_dir(self):
fs = HDFSClient(
"/usr/local/hadoop-2.7.7/",
None,
time_out=6 * 1000,
sleep_inter=100)
self.assertFalse(fs.is_dir("./test_hdfs.py"))
s = """
java.io.IOException: Input/output error
responseErrorMsg : failed to getFileStatus, errorCode: 3, path: /user/PUBLIC_KM_Data/wangxi16/data/serving_model, lparam: d868f6bb6822c621, errorMessage: inner error
at org.apache.hadoop.util.FileSystemUtil.throwException(FileSystemUtil.java:164)
at org.apache.hadoop.util.FileSystemUtil.dealWithResponse(FileSystemUtil.java:118)
at org.apache.hadoop.lite.client.LiteClientImpl.getFileStatus(LiteClientImpl.java:696)
at org.apache.hadoop.fs.LibDFileSystemImpl.getFileStatus(LibDFileSystemImpl.java:297)
at org.apache.hadoop.fs.LiteFileSystem.getFileStatus(LiteFileSystem.java:514)
at org.apache.hadoop.fs.FsShell.test(FsShell.java:1092)
at org.apache.hadoop.fs.FsShell.run(FsShell.java:2285)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
at org.apache.hadoop.fs.FsShell.main(FsShell.java:2353)
"""
print("split lines:", s.splitlines())
self.assertTrue(fs._test_match(s.splitlines()) != None)
def test_config(self):
config = {"fs.default.name": "hdfs://xxx", "hadoop.job.ugi": "ugi"}
fs = HDFSClient(
"/usr/local/hadoop-2.7.7/",
config,
time_out=6 * 1000,
sleep_inter=100)
def test_exists(self):
fs = HDFSClient(
"/usr/local/hadoop-2.7.7/",
None,
time_out=6 * 1000,
sleep_inter=100)
self.assertFalse(fs.is_exist(os.path.abspath("./xxxx")))
self.assertFalse(fs.is_dir(os.path.abspath("./xxxx")))
self.assertTrue(fs.is_dir(os.path.abspath("./xxx/..")))
dirs, files = fs.ls_dir(os.path.abspath("./test_hdfs1.py"))
self.assertTrue(dirs == [])
self.assertTrue(len(files) == 1)
dirs, files = fs.ls_dir(os.path.abspath("./xxx/.."))
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet
import os
import sys
from paddle.distributed.fleet.utils import LocalFS, HDFSClient, FSTimeOut, FSFileExistsError, FSFileNotExistsError
java_home = os.environ["JAVA_HOME"]
from paddle.fluid.tests.unittests.hdfs_test_utils import FSTestBase
class FSTest2(FSTestBase):
def test_hdfs(self):
fs = HDFSClient(
"/usr/local/hadoop-2.7.7/",
None,
time_out=5 * 1000,
sleep_inter=100)
self._test_rm(fs)
self._test_touch(fs)
self._test_dirs(fs)
def test_local(self):
fs = LocalFS()
self._test_rm(fs)
self._test_touch(fs)
self._test_dirs(fs)
self._test_touch_file(fs)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet
import os
import sys
from paddle.distributed.fleet.utils import LocalFS, HDFSClient, FSTimeOut, FSFileExistsError, FSFileNotExistsError
java_home = os.environ["JAVA_HOME"]
from paddle.fluid.tests.unittests.hdfs_test_utils import FSTestBase
class FSTest3(FSTestBase):
def test_hdfs(self):
fs = HDFSClient(
"/usr/local/hadoop-2.7.7/",
None,
time_out=5 * 1000,
sleep_inter=100)
self._test_mkdirs(fs)
self._test_list_dir(fs)
self._test_try_upload(fs)
self._test_try_download(fs)
self._test_upload(fs)
self._test_download(fs)
def test_local(self):
fs = LocalFS()
self._test_mkdirs(fs)
self._test_list_dir(fs)
self._test_try_upload(fs)
self._test_try_download(fs)
if __name__ == '__main__':
unittest.main()