From d9d314d56ae367e4c5f144231075d5eea2aeddbf Mon Sep 17 00:00:00 2001 From: chalsliu <45041955+chalsliu@users.noreply.github.com> Date: Tue, 17 Mar 2020 10:55:15 +0800 Subject: [PATCH] Improve stability through the use of caching (#22922) * Improved stability through the use of caching test=develop * fix style test=develop * Fix syntax error test=develop * Fix syntax error test=develop --- .../fluid/tests/unittests/test_downpoursgd.py | 47 ++++++------ .../unittests/test_fl_listen_and_serv_op.py | 71 +++++++++++++++---- 2 files changed, 83 insertions(+), 35 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/test_downpoursgd.py b/python/paddle/fluid/tests/unittests/test_downpoursgd.py index 582c7eef5a3..79e705dfdb0 100644 --- a/python/paddle/fluid/tests/unittests/test_downpoursgd.py +++ b/python/paddle/fluid/tests/unittests/test_downpoursgd.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""Testcases for Downpour.""" +"""Test cases for Downpour.""" from __future__ import print_function @@ -31,12 +31,16 @@ from google.protobuf import text_format import paddle.fluid.incubate.fleet.parameter_server.pslib.ps_pb2 as pslib from paddle.fluid.trainer_factory import TrainerFactory +cache_path = os.path.expanduser('~/.cache/paddle/dataset') + class TestListenAndServOp(unittest.TestCase): - """TestListenAndServOp.""" + """This class is Test Listen And ServOp.""" def setUp(self): - pass + """This function is set Up.""" + if not os.path.exists(cache_path): + os.makedirs(cache_path) def test_device_work_use_cvm(self): """test device work use_cvm.""" @@ -44,8 +48,11 @@ class TestListenAndServOp(unittest.TestCase): pass else: print(sys.platform) - cmd = "wget --no-check-certificate https://pslib.bj.bcebos.com/fleet_desc.prototxt" - os.system(cmd) + if not os.path.exists('{}/{}'.format(cache_path, + 'fleet_desc.prototxt')): + cmd = "wget --no-check-certificate https://pslib.bj.bcebos.com/fleet_desc.prototxt -P {}/".format( + cache_path) + os.system(cmd) x = fluid.layers.data(name='x', shape=[1], dtype='int64') x_emb = fluid.layers.embedding( input=x, size=[1, 2], is_distributed=True) @@ -55,7 +62,7 @@ class TestListenAndServOp(unittest.TestCase): avg_cost = fluid.layers.mean(cost) ps_param = pslib.PSParameter() - with open("fleet_desc.prototxt") as f: + with open("{}/fleet_desc.prototxt".format(cache_path)) as f: text_format.Merge(f.read(), ps_param) fleet_desc = ps_param exe = fluid.Executor(fluid.CPUPlace()) @@ -91,17 +98,18 @@ class TestListenAndServOp(unittest.TestCase): trainer = TrainerFactory()._create_trainer(main_program._fleet_opt) trainer._set_program(main_program) trainer._gen_trainer_desc() - cmd = "rm fleet_desc.prototxt*" - os.system(cmd) def test_device_work(self): - """test devicve worker.""" + """This function is test devicve worker.""" if sys.platform == 'win32' or sys.platform == 'sys.platform': pass else: print(sys.platform) - cmd = "wget --no-check-certificate https://pslib.bj.bcebos.com/fleet_desc.prototxt" - os.system(cmd) + if not os.path.exists('{}/{}'.format(cache_path, + 'fleet_desc.prototxt')): + cmd = "wget --no-check-certificate https://pslib.bj.bcebos.com/fleet_desc.prototxt -P {}/".format( + cache_path) + os.system(cmd) x = fluid.layers.data(name='x', shape=[1], dtype='int64') x_emb = fluid.layers.embedding( input=x, size=[1, 2], is_distributed=True) @@ -111,7 +119,7 @@ class TestListenAndServOp(unittest.TestCase): avg_cost = fluid.layers.mean(cost) ps_param = pslib.PSParameter() - with open("fleet_desc.prototxt") as f: + with open("{}/fleet_desc.prototxt".format(cache_path)) as f: text_format.Merge(f.read(), ps_param) fleet_desc = ps_param exe = fluid.Executor(fluid.CPUPlace()) @@ -147,17 +155,18 @@ class TestListenAndServOp(unittest.TestCase): trainer = TrainerFactory()._create_trainer(main_program._fleet_opt) trainer._set_program(main_program) trainer._gen_trainer_desc() - cmd = "rm fleet_desc.prototxt*" - os.system(cmd) def test_downpour_opt_work(self): - """test devicve worker.""" + """This function is test devicve worker.""" if sys.platform == 'win32' or sys.platform == 'sys.platform': pass else: print(sys.platform) - cmd = "wget --no-check-certificate https://pslib.bj.bcebos.com/fleet_desc.prototxt" - os.system(cmd) + if not os.path.exists('{}/{}'.format(cache_path, + 'fleet_desc.prototxt')): + cmd = "wget --no-check-certificate https://pslib.bj.bcebos.com/fleet_desc.prototxt -P {}/".format( + cache_path) + os.system(cmd) x = fluid.layers.data(name='x', shape=[1], dtype='int64') x_emb = fluid.layers.embedding( input=x, size=[1, 2], is_distributed=True) @@ -167,7 +176,7 @@ class TestListenAndServOp(unittest.TestCase): avg_cost = fluid.layers.mean(cost) ps_param = pslib.PSParameter() - with open("fleet_desc.prototxt") as f: + with open("{}/fleet_desc.prototxt".format(cache_path)) as f: text_format.Merge(f.read(), ps_param) fleet_desc = ps_param exe = fluid.Executor(fluid.CPUPlace()) @@ -203,8 +212,6 @@ class TestListenAndServOp(unittest.TestCase): trainer = TrainerFactory()._create_trainer(main_program._fleet_opt) trainer._set_program(main_program) trainer._gen_trainer_desc() - cmd = "rm fleet_desc.prototxt*" - os.system(cmd) if __name__ == "__main__": diff --git a/python/paddle/fluid/tests/unittests/test_fl_listen_and_serv_op.py b/python/paddle/fluid/tests/unittests/test_fl_listen_and_serv_op.py index e9855115268..de6b48e2cec 100644 --- a/python/paddle/fluid/tests/unittests/test_fl_listen_and_serv_op.py +++ b/python/paddle/fluid/tests/unittests/test_fl_listen_and_serv_op.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +"""test f1 listen and serv_op.""" from __future__ import print_function @@ -29,8 +30,23 @@ import urllib import sys from dist_test_utils import * +cache_path = os.path.expanduser('~/.cache/paddle/dataset') + def run_trainer(use_cuda, sync_mode, ip, port, trainers, trainer_id): + ''' + This function is run trainer. + Args: + use_cuda (bool): whether use cuda. + sync_mode (nouse): specify sync mode. + ip (string): the ip address. + port (string): the port for listening. + trainers (int): the count of trainer. + trainer_id (int): the id of trainer. + + Returns: + None + ''' x = fluid.layers.data(name='x', shape=[1], dtype='float32') y_predict = fluid.layers.fc(input=x, size=1, act=None) y = fluid.layers.data(name='y', shape=[1], dtype='float32') @@ -40,11 +56,11 @@ def run_trainer(use_cuda, sync_mode, ip, port, trainers, trainer_id): # optimizer sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) sgd_optimizer.minimize(avg_cost) - with open("trainer_recv_program.dms", "rb") as f: + with open("{}/trainer_recv_program.dms".format(cache_path), "rb") as f: trainer_recv_program_desc_str = f.read() - with open("trainer_main_program.dms", "rb") as f: + with open("{}/trainer_main_program.dms".format(cache_path), "rb") as f: trainer_main_program_desc_str = f.read() - with open("trainer_send_program.dms", "rb") as f: + with open("{}/trainer_send_program.dms".format(cache_path), "rb") as f: trainer_send_program_desc_str = f.read() recv_program = Program.parse_from_string(trainer_recv_program_desc_str) main_program = Program.parse_from_string(trainer_main_program_desc_str) @@ -66,6 +82,19 @@ def run_trainer(use_cuda, sync_mode, ip, port, trainers, trainer_id): def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id): + ''' + This function is run trainer. + Args: + use_cuda (bool): whether use cuda. + sync_mode (nouse): specify sync mode. + ip (string): the ip address. + port (string): the port for listening. + trainers (int): the count of trainer. + trainer_id (int): the id of trainer. + + Returns: + None + ''' remove_ps_flag(os.getpid()) x = fluid.layers.data(name='x', shape=[1], dtype='float32') y_predict = fluid.layers.fc(input=x, size=1, act=None) @@ -76,9 +105,9 @@ def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id): # optimizer sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) sgd_optimizer.minimize(avg_cost) - with open("pserver_startup_program.dms", "rb") as f: + with open("{}/pserver_startup_program.dms".format(cache_path), "rb") as f: pserver_startup_program_desc_str = f.read() - with open("pserver_main_program.dms", "rb") as f: + with open("{}/pserver_main_program.dms".format(cache_path), "rb") as f: pserver_main_program_desc_str = f.read() startup_program = Program.parse_from_string( @@ -92,7 +121,10 @@ def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id): class TestFlListenAndServOp(unittest.TestCase): + """This class is Test Fl Listen And ServOp.""" + def setUp(self): + """This function si set Up.""" self.ps_timeout = 5 self.ip = "127.0.0.1" self.port = "6000" @@ -100,6 +132,7 @@ class TestFlListenAndServOp(unittest.TestCase): self.trainer_id = 0 def _start_pserver(self, use_cuda, sync_mode, pserver_func): + """This function is start pserver.""" p = Process( target=pserver_func, args=(use_cuda, sync_mode, self.ip, self.port, self.trainers, @@ -109,6 +142,7 @@ class TestFlListenAndServOp(unittest.TestCase): return p def _start_trainer0(self, use_cuda, sync_mode, pserver_func): + """This function is start trainer0.""" p = Process( target=pserver_func, args=(use_cuda, sync_mode, self.ip, self.port, self.trainers, 0)) @@ -117,6 +151,7 @@ class TestFlListenAndServOp(unittest.TestCase): return p def _start_trainer1(self, use_cuda, sync_mode, pserver_func): + """This function is start trainer1.""" p = Process( target=pserver_func, args=(use_cuda, sync_mode, self.ip, self.port, self.trainers, 1)) @@ -125,6 +160,7 @@ class TestFlListenAndServOp(unittest.TestCase): return p def _wait_ps_ready(self, pid): + """This function is wait ps ready.""" start_left_time = self.ps_timeout sleep_time = 0.5 while True: @@ -137,25 +173,30 @@ class TestFlListenAndServOp(unittest.TestCase): start_left_time -= sleep_time def test_rpc_interfaces(self): + """TODO(Yancey1989): need to make sure the rpc interface correctly.""" # TODO(Yancey1989): need to make sure the rpc interface correctly. pass def test_handle_signal_in_serv_op(self): + """run pserver on CPU in sync mode.""" # run pserver on CPU in sync mode if sys.platform == 'win32' or sys.platform == 'sys.platform': pass else: print(sys.platform) - cmd = "wget --no-check-certificate https://paddlefl.bj.bcebos.com/test_fl_listen_and_serv/pserver_startup_program.dms" - os.system(cmd) - cmd = "wget --no-check-certificate https://paddlefl.bj.bcebos.com/test_fl_listen_and_serv/pserver_main_program.dms" - os.system(cmd) - cmd = "wget --no-check-certificate https://paddlefl.bj.bcebos.com/test_fl_listen_and_serv/trainer_recv_program.dms" - os.system(cmd) - cmd = "wget --no-check-certificate https://paddlefl.bj.bcebos.com/test_fl_listen_and_serv/trainer_main_program.dms" - os.system(cmd) - cmd = "wget --no-check-certificate https://paddlefl.bj.bcebos.com/test_fl_listen_and_serv/trainer_send_program.dms" - os.system(cmd) + file_list = [ + 'pserver_startup_program.dms', 'pserver_main_program.dms', + 'trainer_recv_program.dms', 'trainer_main_program.dms', + 'trainer_send_program.dms' + ] + if not os.path.exists(cache_path): + os.makedirs(cache_path) + prefix = 'wget --no-check-certificate https://paddlefl.bj.bcebos.com/test_fl_listen_and_serv/' + for f in file_list: + if not os.path.exists('{}/{}'.format(cache_path, f)): + cmd = "wget --no-check-certificate https://paddlefl.bj.bcebos.com/test_fl_listen_and_serv/{} -P {}/".format( + f, cache_path) + os.system(cmd) p1 = self._start_pserver(False, True, run_pserver) self._wait_ps_ready(p1.pid) time.sleep(5) -- GitLab