From 16596f6498faf9be75070938458f04433449c7e6 Mon Sep 17 00:00:00 2001 From: Chengmo Date: Thu, 31 Oct 2019 20:01:50 +0800 Subject: [PATCH] Fix Paddle Cloud role maker (#20860) * fix PaddleCloud Role maker & add warning in distribute transpiler & change rpc_retry_times --- .../fluid/operators/distributed/rpc_client.cc | 2 +- .../fluid/incubate/fleet/base/role_maker.py | 19 ++++------ .../distribute_transpiler/__init__.py | 4 ++ .../tests/unittests/test_fleet_api_input.py | 38 ++++++++++++++++--- .../tests/unittests/test_fleet_rolemaker.py | 5 ++- 5 files changed, 47 insertions(+), 21 deletions(-) diff --git a/paddle/fluid/operators/distributed/rpc_client.cc b/paddle/fluid/operators/distributed/rpc_client.cc index ca48c22aa76..57ce54870de 100644 --- a/paddle/fluid/operators/distributed/rpc_client.cc +++ b/paddle/fluid/operators/distributed/rpc_client.cc @@ -17,7 +17,7 @@ // default to 3min to avoid temprary network failures. DEFINE_int32(rpc_deadline, 180000, "deadline timeouts for rpc"); -DEFINE_int32(rpc_retry_times, 0, "retry times for rpc"); +DEFINE_int32(rpc_retry_times, 3, "retry times for rpc"); namespace paddle { namespace operators { diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py index ab68a5248cf..03657ada029 100644 --- a/python/paddle/fluid/incubate/fleet/base/role_maker.py +++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py @@ -335,18 +335,13 @@ class PaddleCloudRoleMaker(RoleMakerBase): if not self._role_is_generated: if not self._is_collective: try: - port = os.environ["PADDLE_PORT"] - pserver_ips = os.environ["PADDLE_PSERVERS"].split(",") - if "," in port: - ports = port.split(",") - else: - ports = [port] * len(pserver_ips) - eplist = [] + # Environment variable PADDLE_PSERVERS_IP_PORT_LIST must be set + # format: string(ip:port), eg. 127.0.0.1:6001 + eplist = os.environ["PADDLE_PSERVERS_IP_PORT_LIST"].split( + ",") # note that, we usually assign the same port to different ips # if we run parameter server training in local mode # port should be different in environment variables - for i, ip in enumerate(pserver_ips): - eplist.append(':'.join([ip, ports[i]])) trainers_num = int(os.environ["PADDLE_TRAINERS_NUM"]) training_role = os.environ["TRAINING_ROLE"] @@ -361,9 +356,9 @@ class PaddleCloudRoleMaker(RoleMakerBase): elif training_role == "PSERVER": role = Role.SERVER cur_ip = os.environ["POD_IP"] - cur_idx = pserver_ips.index(cur_ip) - current_id = eplist.index(":".join( - [cur_ip, ports[cur_idx]])) + curr_port = os.environ["PADDLE_PORT"] + curr_endpoint = ":".join([cur_ip, curr_port]) + current_id = eplist.index(curr_endpoint) else: raise ValueError( "TRAINING_ROLE must be PSERVER or TRAINER") diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py index f2c67c65b01..81e6ecc1199 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py @@ -152,6 +152,10 @@ class DistributedTranspiler(Fleet): if not isinstance(optimizer, Optimizer): raise ValueError("optimizer must be an instance of Optimizer") + if not fleet._is_initialized: + raise ValueError( + "use fleet.init(role) to initialize the role of current node before optimizer.minimize(loss)" + ) self._optimizer = TranspilerOptimizer(optimizer, strategy) return self._optimizer diff --git a/python/paddle/fluid/tests/unittests/test_fleet_api_input.py b/python/paddle/fluid/tests/unittests/test_fleet_api_input.py index eb54470623c..b3911c3d51c 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_api_input.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_api_input.py @@ -20,9 +20,11 @@ from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerCo from paddle.fluid.incubate.fleet.base.role_maker import UserDefinedRoleMaker from paddle.fluid.incubate.fleet.base.role_maker import UserDefinedCollectiveRoleMaker from paddle.fluid.incubate.fleet.base.role_maker import Role +import paddle.fluid.incubate.fleet.base.role_maker as role_maker from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import TranspilerOptimizer from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer +from dist_simnet_bow import train_network class DistributeTranspilerConfigTest(unittest.TestCase): @@ -97,6 +99,30 @@ class FleetTest(unittest.TestCase): main_program=compiled_prog) self.assertRaises(Exception, fleet._transpile, "config") + def set_program(self, avg_cost, strategy): + optimizer = fluid.optimizer.SGD(0.1) + optimizer = fleet.distributed_optimizer(optimizer, strategy) + optimizer.minimize(avg_cost) + + def test_init_role(self): + role = role_maker.UserDefinedRoleMaker( + current_id=0, + role=role_maker.Role.SERVER, + worker_num=2, + server_endpoints=["127.0.0.1:36011", "127.0.0.1:36012"]) + # for test optimizer without init(role) + # fleet.init(role) + batch_size = 128 + is_sparse = True + is_distribute = False + strategy = DistributeTranspilerConfig() + strategy.sync_mode = False + strategy.geo_sgd_mode = True + strategy.geo_sgd_need_push_nums = 5 + avg_cost, _, _ = train_network(batch_size, is_distribute, is_sparse) + + self.assertRaises(Exception, self.set_program, avg_cost, strategy) + class TranspilerOptimizerTest(unittest.TestCase): def testInvalidInputs(self): @@ -124,7 +150,7 @@ class UserDefinedRoleMakerTest(unittest.TestCase): def testRoleMaker(self): self.createRoleMaker() - ## test all invalid server_endpoints + # test all invalid server_endpoints self.assertRaises( Exception, self.createRoleMaker, server_endpoints=None) # server_endpoints must be as list @@ -140,7 +166,7 @@ class UserDefinedRoleMakerTest(unittest.TestCase): self.createRoleMaker, server_endpoints=["127.0.0.1:8080", "127.0.0.1:8080"] ) # element in server_endpoints can't be duplicate - ## test all invalid current_id + # test all invalid current_id self.assertRaises( Exception, self.createRoleMaker, current_id="0") # current_id must be as int @@ -154,14 +180,14 @@ class UserDefinedRoleMakerTest(unittest.TestCase): role=Role.SERVER, server_endpoints=["127.0.0.1:8080"] ) # if role is server, current_id must be less than len(server_endpoints) - ## test all invalid worker_num + # test all invalid worker_num self.assertRaises( Exception, self.createRoleMaker, worker_num="1") # worker_num must be as int self.assertRaises( Exception, self.createRoleMaker, worker_num=0) # worker_num must be greater than 0 - ## test all invalid role + # test all invalid role self.assertRaises( Exception, self.createRoleMaker, role=3) # role must be as Role(Role.WORKER=1, Role.SERVER=2) @@ -174,7 +200,7 @@ class UserDefinedCollectiveRoleMakerTest(unittest.TestCase): def testRoleMaker(self): self.createRoleMaker() - ## test all invalid worker_endpoints + # test all invalid worker_endpoints self.assertRaises( Exception, self.createRoleMaker, worker_endpoints=None) # worker_endpoints must be as list @@ -190,7 +216,7 @@ class UserDefinedCollectiveRoleMakerTest(unittest.TestCase): self.createRoleMaker, worker_endpoints=["127.0.0.1:8080", "127.0.0.1:8080"] ) # element in worker_endpoints can't be duplicate - ## test all invalid current_id + # test all invalid current_id self.assertRaises( Exception, self.createRoleMaker, current_id="0") # current_id must be as int diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker.py index f8ee00a3188..298f7687093 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker.py @@ -21,9 +21,9 @@ import paddle.fluid.incubate.fleet.base.role_maker as role_maker class TestCloudRoleMaker(unittest.TestCase): def setUp(self): - os.environ["PADDLE_PORT"] = "36001" - os.environ["PADDLE_PSERVERS"] = "127.0.0.1,127.0.0.2" os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ[ + "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001" def test_tr_rolemaker(self): os.environ["TRAINING_ROLE"] = "TRAINER" @@ -39,6 +39,7 @@ class TestCloudRoleMaker(unittest.TestCase): def test_ps_rolemaker(self): os.environ["TRAINING_ROLE"] = "PSERVER" os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" ro = role_maker.PaddleCloudRoleMaker(is_collective=False) ro.generate_role() -- GitLab