未验证 提交 d9d314d5 编写于 作者: C chalsliu 提交者: GitHub

Improve stability through the use of caching (#22922)

* Improved stability through the use of caching

test=develop

* fix style

test=develop

* Fix syntax error

test=develop

* Fix syntax error

test=develop
上级 4af491c2
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
"""Testcases for Downpour.""" """Test cases for Downpour."""
from __future__ import print_function from __future__ import print_function
...@@ -31,12 +31,16 @@ from google.protobuf import text_format ...@@ -31,12 +31,16 @@ from google.protobuf import text_format
import paddle.fluid.incubate.fleet.parameter_server.pslib.ps_pb2 as pslib import paddle.fluid.incubate.fleet.parameter_server.pslib.ps_pb2 as pslib
from paddle.fluid.trainer_factory import TrainerFactory from paddle.fluid.trainer_factory import TrainerFactory
cache_path = os.path.expanduser('~/.cache/paddle/dataset')
class TestListenAndServOp(unittest.TestCase): class TestListenAndServOp(unittest.TestCase):
"""TestListenAndServOp.""" """This class is Test Listen And ServOp."""
def setUp(self): def setUp(self):
pass """This function is set Up."""
if not os.path.exists(cache_path):
os.makedirs(cache_path)
def test_device_work_use_cvm(self): def test_device_work_use_cvm(self):
"""test device work use_cvm.""" """test device work use_cvm."""
...@@ -44,7 +48,10 @@ class TestListenAndServOp(unittest.TestCase): ...@@ -44,7 +48,10 @@ class TestListenAndServOp(unittest.TestCase):
pass pass
else: else:
print(sys.platform) print(sys.platform)
cmd = "wget --no-check-certificate https://pslib.bj.bcebos.com/fleet_desc.prototxt" if not os.path.exists('{}/{}'.format(cache_path,
'fleet_desc.prototxt')):
cmd = "wget --no-check-certificate https://pslib.bj.bcebos.com/fleet_desc.prototxt -P {}/".format(
cache_path)
os.system(cmd) os.system(cmd)
x = fluid.layers.data(name='x', shape=[1], dtype='int64') x = fluid.layers.data(name='x', shape=[1], dtype='int64')
x_emb = fluid.layers.embedding( x_emb = fluid.layers.embedding(
...@@ -55,7 +62,7 @@ class TestListenAndServOp(unittest.TestCase): ...@@ -55,7 +62,7 @@ class TestListenAndServOp(unittest.TestCase):
avg_cost = fluid.layers.mean(cost) avg_cost = fluid.layers.mean(cost)
ps_param = pslib.PSParameter() ps_param = pslib.PSParameter()
with open("fleet_desc.prototxt") as f: with open("{}/fleet_desc.prototxt".format(cache_path)) as f:
text_format.Merge(f.read(), ps_param) text_format.Merge(f.read(), ps_param)
fleet_desc = ps_param fleet_desc = ps_param
exe = fluid.Executor(fluid.CPUPlace()) exe = fluid.Executor(fluid.CPUPlace())
...@@ -91,16 +98,17 @@ class TestListenAndServOp(unittest.TestCase): ...@@ -91,16 +98,17 @@ class TestListenAndServOp(unittest.TestCase):
trainer = TrainerFactory()._create_trainer(main_program._fleet_opt) trainer = TrainerFactory()._create_trainer(main_program._fleet_opt)
trainer._set_program(main_program) trainer._set_program(main_program)
trainer._gen_trainer_desc() trainer._gen_trainer_desc()
cmd = "rm fleet_desc.prototxt*"
os.system(cmd)
def test_device_work(self): def test_device_work(self):
"""test devicve worker.""" """This function is test devicve worker."""
if sys.platform == 'win32' or sys.platform == 'sys.platform': if sys.platform == 'win32' or sys.platform == 'sys.platform':
pass pass
else: else:
print(sys.platform) print(sys.platform)
cmd = "wget --no-check-certificate https://pslib.bj.bcebos.com/fleet_desc.prototxt" if not os.path.exists('{}/{}'.format(cache_path,
'fleet_desc.prototxt')):
cmd = "wget --no-check-certificate https://pslib.bj.bcebos.com/fleet_desc.prototxt -P {}/".format(
cache_path)
os.system(cmd) os.system(cmd)
x = fluid.layers.data(name='x', shape=[1], dtype='int64') x = fluid.layers.data(name='x', shape=[1], dtype='int64')
x_emb = fluid.layers.embedding( x_emb = fluid.layers.embedding(
...@@ -111,7 +119,7 @@ class TestListenAndServOp(unittest.TestCase): ...@@ -111,7 +119,7 @@ class TestListenAndServOp(unittest.TestCase):
avg_cost = fluid.layers.mean(cost) avg_cost = fluid.layers.mean(cost)
ps_param = pslib.PSParameter() ps_param = pslib.PSParameter()
with open("fleet_desc.prototxt") as f: with open("{}/fleet_desc.prototxt".format(cache_path)) as f:
text_format.Merge(f.read(), ps_param) text_format.Merge(f.read(), ps_param)
fleet_desc = ps_param fleet_desc = ps_param
exe = fluid.Executor(fluid.CPUPlace()) exe = fluid.Executor(fluid.CPUPlace())
...@@ -147,16 +155,17 @@ class TestListenAndServOp(unittest.TestCase): ...@@ -147,16 +155,17 @@ class TestListenAndServOp(unittest.TestCase):
trainer = TrainerFactory()._create_trainer(main_program._fleet_opt) trainer = TrainerFactory()._create_trainer(main_program._fleet_opt)
trainer._set_program(main_program) trainer._set_program(main_program)
trainer._gen_trainer_desc() trainer._gen_trainer_desc()
cmd = "rm fleet_desc.prototxt*"
os.system(cmd)
def test_downpour_opt_work(self): def test_downpour_opt_work(self):
"""test devicve worker.""" """This function is test devicve worker."""
if sys.platform == 'win32' or sys.platform == 'sys.platform': if sys.platform == 'win32' or sys.platform == 'sys.platform':
pass pass
else: else:
print(sys.platform) print(sys.platform)
cmd = "wget --no-check-certificate https://pslib.bj.bcebos.com/fleet_desc.prototxt" if not os.path.exists('{}/{}'.format(cache_path,
'fleet_desc.prototxt')):
cmd = "wget --no-check-certificate https://pslib.bj.bcebos.com/fleet_desc.prototxt -P {}/".format(
cache_path)
os.system(cmd) os.system(cmd)
x = fluid.layers.data(name='x', shape=[1], dtype='int64') x = fluid.layers.data(name='x', shape=[1], dtype='int64')
x_emb = fluid.layers.embedding( x_emb = fluid.layers.embedding(
...@@ -167,7 +176,7 @@ class TestListenAndServOp(unittest.TestCase): ...@@ -167,7 +176,7 @@ class TestListenAndServOp(unittest.TestCase):
avg_cost = fluid.layers.mean(cost) avg_cost = fluid.layers.mean(cost)
ps_param = pslib.PSParameter() ps_param = pslib.PSParameter()
with open("fleet_desc.prototxt") as f: with open("{}/fleet_desc.prototxt".format(cache_path)) as f:
text_format.Merge(f.read(), ps_param) text_format.Merge(f.read(), ps_param)
fleet_desc = ps_param fleet_desc = ps_param
exe = fluid.Executor(fluid.CPUPlace()) exe = fluid.Executor(fluid.CPUPlace())
...@@ -203,8 +212,6 @@ class TestListenAndServOp(unittest.TestCase): ...@@ -203,8 +212,6 @@ class TestListenAndServOp(unittest.TestCase):
trainer = TrainerFactory()._create_trainer(main_program._fleet_opt) trainer = TrainerFactory()._create_trainer(main_program._fleet_opt)
trainer._set_program(main_program) trainer._set_program(main_program)
trainer._gen_trainer_desc() trainer._gen_trainer_desc()
cmd = "rm fleet_desc.prototxt*"
os.system(cmd)
if __name__ == "__main__": if __name__ == "__main__":
......
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
"""test f1 listen and serv_op."""
from __future__ import print_function from __future__ import print_function
...@@ -29,8 +30,23 @@ import urllib ...@@ -29,8 +30,23 @@ import urllib
import sys import sys
from dist_test_utils import * from dist_test_utils import *
cache_path = os.path.expanduser('~/.cache/paddle/dataset')
def run_trainer(use_cuda, sync_mode, ip, port, trainers, trainer_id): def run_trainer(use_cuda, sync_mode, ip, port, trainers, trainer_id):
'''
This function is run trainer.
Args:
use_cuda (bool): whether use cuda.
sync_mode (nouse): specify sync mode.
ip (string): the ip address.
port (string): the port for listening.
trainers (int): the count of trainer.
trainer_id (int): the id of trainer.
Returns:
None
'''
x = fluid.layers.data(name='x', shape=[1], dtype='float32') x = fluid.layers.data(name='x', shape=[1], dtype='float32')
y_predict = fluid.layers.fc(input=x, size=1, act=None) y_predict = fluid.layers.fc(input=x, size=1, act=None)
y = fluid.layers.data(name='y', shape=[1], dtype='float32') y = fluid.layers.data(name='y', shape=[1], dtype='float32')
...@@ -40,11 +56,11 @@ def run_trainer(use_cuda, sync_mode, ip, port, trainers, trainer_id): ...@@ -40,11 +56,11 @@ def run_trainer(use_cuda, sync_mode, ip, port, trainers, trainer_id):
# optimizer # optimizer
sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
sgd_optimizer.minimize(avg_cost) sgd_optimizer.minimize(avg_cost)
with open("trainer_recv_program.dms", "rb") as f: with open("{}/trainer_recv_program.dms".format(cache_path), "rb") as f:
trainer_recv_program_desc_str = f.read() trainer_recv_program_desc_str = f.read()
with open("trainer_main_program.dms", "rb") as f: with open("{}/trainer_main_program.dms".format(cache_path), "rb") as f:
trainer_main_program_desc_str = f.read() trainer_main_program_desc_str = f.read()
with open("trainer_send_program.dms", "rb") as f: with open("{}/trainer_send_program.dms".format(cache_path), "rb") as f:
trainer_send_program_desc_str = f.read() trainer_send_program_desc_str = f.read()
recv_program = Program.parse_from_string(trainer_recv_program_desc_str) recv_program = Program.parse_from_string(trainer_recv_program_desc_str)
main_program = Program.parse_from_string(trainer_main_program_desc_str) main_program = Program.parse_from_string(trainer_main_program_desc_str)
...@@ -66,6 +82,19 @@ def run_trainer(use_cuda, sync_mode, ip, port, trainers, trainer_id): ...@@ -66,6 +82,19 @@ def run_trainer(use_cuda, sync_mode, ip, port, trainers, trainer_id):
def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id): def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id):
'''
This function is run trainer.
Args:
use_cuda (bool): whether use cuda.
sync_mode (nouse): specify sync mode.
ip (string): the ip address.
port (string): the port for listening.
trainers (int): the count of trainer.
trainer_id (int): the id of trainer.
Returns:
None
'''
remove_ps_flag(os.getpid()) remove_ps_flag(os.getpid())
x = fluid.layers.data(name='x', shape=[1], dtype='float32') x = fluid.layers.data(name='x', shape=[1], dtype='float32')
y_predict = fluid.layers.fc(input=x, size=1, act=None) y_predict = fluid.layers.fc(input=x, size=1, act=None)
...@@ -76,9 +105,9 @@ def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id): ...@@ -76,9 +105,9 @@ def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id):
# optimizer # optimizer
sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
sgd_optimizer.minimize(avg_cost) sgd_optimizer.minimize(avg_cost)
with open("pserver_startup_program.dms", "rb") as f: with open("{}/pserver_startup_program.dms".format(cache_path), "rb") as f:
pserver_startup_program_desc_str = f.read() pserver_startup_program_desc_str = f.read()
with open("pserver_main_program.dms", "rb") as f: with open("{}/pserver_main_program.dms".format(cache_path), "rb") as f:
pserver_main_program_desc_str = f.read() pserver_main_program_desc_str = f.read()
startup_program = Program.parse_from_string( startup_program = Program.parse_from_string(
...@@ -92,7 +121,10 @@ def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id): ...@@ -92,7 +121,10 @@ def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id):
class TestFlListenAndServOp(unittest.TestCase): class TestFlListenAndServOp(unittest.TestCase):
"""This class is Test Fl Listen And ServOp."""
def setUp(self): def setUp(self):
"""This function si set Up."""
self.ps_timeout = 5 self.ps_timeout = 5
self.ip = "127.0.0.1" self.ip = "127.0.0.1"
self.port = "6000" self.port = "6000"
...@@ -100,6 +132,7 @@ class TestFlListenAndServOp(unittest.TestCase): ...@@ -100,6 +132,7 @@ class TestFlListenAndServOp(unittest.TestCase):
self.trainer_id = 0 self.trainer_id = 0
def _start_pserver(self, use_cuda, sync_mode, pserver_func): def _start_pserver(self, use_cuda, sync_mode, pserver_func):
"""This function is start pserver."""
p = Process( p = Process(
target=pserver_func, target=pserver_func,
args=(use_cuda, sync_mode, self.ip, self.port, self.trainers, args=(use_cuda, sync_mode, self.ip, self.port, self.trainers,
...@@ -109,6 +142,7 @@ class TestFlListenAndServOp(unittest.TestCase): ...@@ -109,6 +142,7 @@ class TestFlListenAndServOp(unittest.TestCase):
return p return p
def _start_trainer0(self, use_cuda, sync_mode, pserver_func): def _start_trainer0(self, use_cuda, sync_mode, pserver_func):
"""This function is start trainer0."""
p = Process( p = Process(
target=pserver_func, target=pserver_func,
args=(use_cuda, sync_mode, self.ip, self.port, self.trainers, 0)) args=(use_cuda, sync_mode, self.ip, self.port, self.trainers, 0))
...@@ -117,6 +151,7 @@ class TestFlListenAndServOp(unittest.TestCase): ...@@ -117,6 +151,7 @@ class TestFlListenAndServOp(unittest.TestCase):
return p return p
def _start_trainer1(self, use_cuda, sync_mode, pserver_func): def _start_trainer1(self, use_cuda, sync_mode, pserver_func):
"""This function is start trainer1."""
p = Process( p = Process(
target=pserver_func, target=pserver_func,
args=(use_cuda, sync_mode, self.ip, self.port, self.trainers, 1)) args=(use_cuda, sync_mode, self.ip, self.port, self.trainers, 1))
...@@ -125,6 +160,7 @@ class TestFlListenAndServOp(unittest.TestCase): ...@@ -125,6 +160,7 @@ class TestFlListenAndServOp(unittest.TestCase):
return p return p
def _wait_ps_ready(self, pid): def _wait_ps_ready(self, pid):
"""This function is wait ps ready."""
start_left_time = self.ps_timeout start_left_time = self.ps_timeout
sleep_time = 0.5 sleep_time = 0.5
while True: while True:
...@@ -137,24 +173,29 @@ class TestFlListenAndServOp(unittest.TestCase): ...@@ -137,24 +173,29 @@ class TestFlListenAndServOp(unittest.TestCase):
start_left_time -= sleep_time start_left_time -= sleep_time
def test_rpc_interfaces(self): def test_rpc_interfaces(self):
"""TODO(Yancey1989): need to make sure the rpc interface correctly."""
# TODO(Yancey1989): need to make sure the rpc interface correctly. # TODO(Yancey1989): need to make sure the rpc interface correctly.
pass pass
def test_handle_signal_in_serv_op(self): def test_handle_signal_in_serv_op(self):
"""run pserver on CPU in sync mode."""
# run pserver on CPU in sync mode # run pserver on CPU in sync mode
if sys.platform == 'win32' or sys.platform == 'sys.platform': if sys.platform == 'win32' or sys.platform == 'sys.platform':
pass pass
else: else:
print(sys.platform) print(sys.platform)
cmd = "wget --no-check-certificate https://paddlefl.bj.bcebos.com/test_fl_listen_and_serv/pserver_startup_program.dms" file_list = [
os.system(cmd) 'pserver_startup_program.dms', 'pserver_main_program.dms',
cmd = "wget --no-check-certificate https://paddlefl.bj.bcebos.com/test_fl_listen_and_serv/pserver_main_program.dms" 'trainer_recv_program.dms', 'trainer_main_program.dms',
os.system(cmd) 'trainer_send_program.dms'
cmd = "wget --no-check-certificate https://paddlefl.bj.bcebos.com/test_fl_listen_and_serv/trainer_recv_program.dms" ]
os.system(cmd) if not os.path.exists(cache_path):
cmd = "wget --no-check-certificate https://paddlefl.bj.bcebos.com/test_fl_listen_and_serv/trainer_main_program.dms" os.makedirs(cache_path)
os.system(cmd) prefix = 'wget --no-check-certificate https://paddlefl.bj.bcebos.com/test_fl_listen_and_serv/'
cmd = "wget --no-check-certificate https://paddlefl.bj.bcebos.com/test_fl_listen_and_serv/trainer_send_program.dms" for f in file_list:
if not os.path.exists('{}/{}'.format(cache_path, f)):
cmd = "wget --no-check-certificate https://paddlefl.bj.bcebos.com/test_fl_listen_and_serv/{} -P {}/".format(
f, cache_path)
os.system(cmd) os.system(cmd)
p1 = self._start_pserver(False, True, run_pserver) p1 = self._start_pserver(False, True, run_pserver)
self._wait_ps_ready(p1.pid) self._wait_ps_ready(p1.pid)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册