未验证 提交 16596f64 编写于 作者: C Chengmo 提交者: GitHub

Fix Paddle Cloud role maker (#20860)

* fix PaddleCloud Role maker & add warning in distribute transpiler  & change rpc_retry_times
上级 59de8e12
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
// default to 3min to avoid temprary network failures. // default to 3min to avoid temprary network failures.
DEFINE_int32(rpc_deadline, 180000, "deadline timeouts for rpc"); DEFINE_int32(rpc_deadline, 180000, "deadline timeouts for rpc");
DEFINE_int32(rpc_retry_times, 0, "retry times for rpc"); DEFINE_int32(rpc_retry_times, 3, "retry times for rpc");
namespace paddle { namespace paddle {
namespace operators { namespace operators {
......
...@@ -335,18 +335,13 @@ class PaddleCloudRoleMaker(RoleMakerBase): ...@@ -335,18 +335,13 @@ class PaddleCloudRoleMaker(RoleMakerBase):
if not self._role_is_generated: if not self._role_is_generated:
if not self._is_collective: if not self._is_collective:
try: try:
port = os.environ["PADDLE_PORT"] # Environment variable PADDLE_PSERVERS_IP_PORT_LIST must be set
pserver_ips = os.environ["PADDLE_PSERVERS"].split(",") # format: string(ip:port), eg. 127.0.0.1:6001
if "," in port: eplist = os.environ["PADDLE_PSERVERS_IP_PORT_LIST"].split(
ports = port.split(",") ",")
else:
ports = [port] * len(pserver_ips)
eplist = []
# note that, we usually assign the same port to different ips # note that, we usually assign the same port to different ips
# if we run parameter server training in local mode # if we run parameter server training in local mode
# port should be different in environment variables # port should be different in environment variables
for i, ip in enumerate(pserver_ips):
eplist.append(':'.join([ip, ports[i]]))
trainers_num = int(os.environ["PADDLE_TRAINERS_NUM"]) trainers_num = int(os.environ["PADDLE_TRAINERS_NUM"])
training_role = os.environ["TRAINING_ROLE"] training_role = os.environ["TRAINING_ROLE"]
...@@ -361,9 +356,9 @@ class PaddleCloudRoleMaker(RoleMakerBase): ...@@ -361,9 +356,9 @@ class PaddleCloudRoleMaker(RoleMakerBase):
elif training_role == "PSERVER": elif training_role == "PSERVER":
role = Role.SERVER role = Role.SERVER
cur_ip = os.environ["POD_IP"] cur_ip = os.environ["POD_IP"]
cur_idx = pserver_ips.index(cur_ip) curr_port = os.environ["PADDLE_PORT"]
current_id = eplist.index(":".join( curr_endpoint = ":".join([cur_ip, curr_port])
[cur_ip, ports[cur_idx]])) current_id = eplist.index(curr_endpoint)
else: else:
raise ValueError( raise ValueError(
"TRAINING_ROLE must be PSERVER or TRAINER") "TRAINING_ROLE must be PSERVER or TRAINER")
......
...@@ -152,6 +152,10 @@ class DistributedTranspiler(Fleet): ...@@ -152,6 +152,10 @@ class DistributedTranspiler(Fleet):
if not isinstance(optimizer, Optimizer): if not isinstance(optimizer, Optimizer):
raise ValueError("optimizer must be an instance of Optimizer") raise ValueError("optimizer must be an instance of Optimizer")
if not fleet._is_initialized:
raise ValueError(
"use fleet.init(role) to initialize the role of current node before optimizer.minimize(loss)"
)
self._optimizer = TranspilerOptimizer(optimizer, strategy) self._optimizer = TranspilerOptimizer(optimizer, strategy)
return self._optimizer return self._optimizer
......
...@@ -20,9 +20,11 @@ from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerCo ...@@ -20,9 +20,11 @@ from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerCo
from paddle.fluid.incubate.fleet.base.role_maker import UserDefinedRoleMaker from paddle.fluid.incubate.fleet.base.role_maker import UserDefinedRoleMaker
from paddle.fluid.incubate.fleet.base.role_maker import UserDefinedCollectiveRoleMaker from paddle.fluid.incubate.fleet.base.role_maker import UserDefinedCollectiveRoleMaker
from paddle.fluid.incubate.fleet.base.role_maker import Role from paddle.fluid.incubate.fleet.base.role_maker import Role
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import TranspilerOptimizer from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import TranspilerOptimizer
from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer
from dist_simnet_bow import train_network
class DistributeTranspilerConfigTest(unittest.TestCase): class DistributeTranspilerConfigTest(unittest.TestCase):
...@@ -97,6 +99,30 @@ class FleetTest(unittest.TestCase): ...@@ -97,6 +99,30 @@ class FleetTest(unittest.TestCase):
main_program=compiled_prog) main_program=compiled_prog)
self.assertRaises(Exception, fleet._transpile, "config") self.assertRaises(Exception, fleet._transpile, "config")
def set_program(self, avg_cost, strategy):
optimizer = fluid.optimizer.SGD(0.1)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(avg_cost)
def test_init_role(self):
role = role_maker.UserDefinedRoleMaker(
current_id=0,
role=role_maker.Role.SERVER,
worker_num=2,
server_endpoints=["127.0.0.1:36011", "127.0.0.1:36012"])
# for test optimizer without init(role)
# fleet.init(role)
batch_size = 128
is_sparse = True
is_distribute = False
strategy = DistributeTranspilerConfig()
strategy.sync_mode = False
strategy.geo_sgd_mode = True
strategy.geo_sgd_need_push_nums = 5
avg_cost, _, _ = train_network(batch_size, is_distribute, is_sparse)
self.assertRaises(Exception, self.set_program, avg_cost, strategy)
class TranspilerOptimizerTest(unittest.TestCase): class TranspilerOptimizerTest(unittest.TestCase):
def testInvalidInputs(self): def testInvalidInputs(self):
...@@ -124,7 +150,7 @@ class UserDefinedRoleMakerTest(unittest.TestCase): ...@@ -124,7 +150,7 @@ class UserDefinedRoleMakerTest(unittest.TestCase):
def testRoleMaker(self): def testRoleMaker(self):
self.createRoleMaker() self.createRoleMaker()
## test all invalid server_endpoints # test all invalid server_endpoints
self.assertRaises( self.assertRaises(
Exception, self.createRoleMaker, Exception, self.createRoleMaker,
server_endpoints=None) # server_endpoints must be as list server_endpoints=None) # server_endpoints must be as list
...@@ -140,7 +166,7 @@ class UserDefinedRoleMakerTest(unittest.TestCase): ...@@ -140,7 +166,7 @@ class UserDefinedRoleMakerTest(unittest.TestCase):
self.createRoleMaker, self.createRoleMaker,
server_endpoints=["127.0.0.1:8080", "127.0.0.1:8080"] server_endpoints=["127.0.0.1:8080", "127.0.0.1:8080"]
) # element in server_endpoints can't be duplicate ) # element in server_endpoints can't be duplicate
## test all invalid current_id # test all invalid current_id
self.assertRaises( self.assertRaises(
Exception, self.createRoleMaker, Exception, self.createRoleMaker,
current_id="0") # current_id must be as int current_id="0") # current_id must be as int
...@@ -154,14 +180,14 @@ class UserDefinedRoleMakerTest(unittest.TestCase): ...@@ -154,14 +180,14 @@ class UserDefinedRoleMakerTest(unittest.TestCase):
role=Role.SERVER, role=Role.SERVER,
server_endpoints=["127.0.0.1:8080"] server_endpoints=["127.0.0.1:8080"]
) # if role is server, current_id must be less than len(server_endpoints) ) # if role is server, current_id must be less than len(server_endpoints)
## test all invalid worker_num # test all invalid worker_num
self.assertRaises( self.assertRaises(
Exception, self.createRoleMaker, Exception, self.createRoleMaker,
worker_num="1") # worker_num must be as int worker_num="1") # worker_num must be as int
self.assertRaises( self.assertRaises(
Exception, self.createRoleMaker, Exception, self.createRoleMaker,
worker_num=0) # worker_num must be greater than 0 worker_num=0) # worker_num must be greater than 0
## test all invalid role # test all invalid role
self.assertRaises( self.assertRaises(
Exception, self.createRoleMaker, Exception, self.createRoleMaker,
role=3) # role must be as Role(Role.WORKER=1, Role.SERVER=2) role=3) # role must be as Role(Role.WORKER=1, Role.SERVER=2)
...@@ -174,7 +200,7 @@ class UserDefinedCollectiveRoleMakerTest(unittest.TestCase): ...@@ -174,7 +200,7 @@ class UserDefinedCollectiveRoleMakerTest(unittest.TestCase):
def testRoleMaker(self): def testRoleMaker(self):
self.createRoleMaker() self.createRoleMaker()
## test all invalid worker_endpoints # test all invalid worker_endpoints
self.assertRaises( self.assertRaises(
Exception, self.createRoleMaker, Exception, self.createRoleMaker,
worker_endpoints=None) # worker_endpoints must be as list worker_endpoints=None) # worker_endpoints must be as list
...@@ -190,7 +216,7 @@ class UserDefinedCollectiveRoleMakerTest(unittest.TestCase): ...@@ -190,7 +216,7 @@ class UserDefinedCollectiveRoleMakerTest(unittest.TestCase):
self.createRoleMaker, self.createRoleMaker,
worker_endpoints=["127.0.0.1:8080", "127.0.0.1:8080"] worker_endpoints=["127.0.0.1:8080", "127.0.0.1:8080"]
) # element in worker_endpoints can't be duplicate ) # element in worker_endpoints can't be duplicate
## test all invalid current_id # test all invalid current_id
self.assertRaises( self.assertRaises(
Exception, self.createRoleMaker, Exception, self.createRoleMaker,
current_id="0") # current_id must be as int current_id="0") # current_id must be as int
......
...@@ -21,9 +21,9 @@ import paddle.fluid.incubate.fleet.base.role_maker as role_maker ...@@ -21,9 +21,9 @@ import paddle.fluid.incubate.fleet.base.role_maker as role_maker
class TestCloudRoleMaker(unittest.TestCase): class TestCloudRoleMaker(unittest.TestCase):
def setUp(self): def setUp(self):
os.environ["PADDLE_PORT"] = "36001"
os.environ["PADDLE_PSERVERS"] = "127.0.0.1,127.0.0.2"
os.environ["PADDLE_TRAINERS_NUM"] = "2" os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ[
"PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001"
def test_tr_rolemaker(self): def test_tr_rolemaker(self):
os.environ["TRAINING_ROLE"] = "TRAINER" os.environ["TRAINING_ROLE"] = "TRAINER"
...@@ -39,6 +39,7 @@ class TestCloudRoleMaker(unittest.TestCase): ...@@ -39,6 +39,7 @@ class TestCloudRoleMaker(unittest.TestCase):
def test_ps_rolemaker(self): def test_ps_rolemaker(self):
os.environ["TRAINING_ROLE"] = "PSERVER" os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["POD_IP"] = "127.0.0.1" os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
ro = role_maker.PaddleCloudRoleMaker(is_collective=False) ro = role_maker.PaddleCloudRoleMaker(is_collective=False)
ro.generate_role() ro.generate_role()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册