Unverified commit 0b429a22, authored by Chengmo, committed by GitHub

[Cherry-pick]Cherry pick paddle cloud role maker (#20947)

* Fix Paddle Cloud role maker (#20860)
Parent: ad867398
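This cherry-pick changes how PaddleCloudRoleMaker discovers parameter-server endpoints: the PADDLE_PORT / PADDLE_PSERVERS pair is replaced by a single PADDLE_PSERVERS_IP_PORT_LIST variable holding explicit ip:port entries. A minimal sketch of the two launch environments, using values taken from the updated unit tests (the concrete values are illustrative, not required):

import os

# Old environment (removed by this commit): one shared port plus a list of IPs.
# os.environ["PADDLE_PORT"] = "36001"
# os.environ["PADDLE_PSERVERS"] = "127.0.0.1,127.0.0.2"

# New environment: explicit ip:port endpoints in a single variable.
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["TRAINING_ROLE"] = "TRAINER"  # or "PSERVER" on a server pod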
@@ -17,7 +17,7 @@
 // default to 3min to avoid temprary network failures.
 DEFINE_int32(rpc_deadline, 180000, "deadline timeouts for rpc");
-DEFINE_int32(rpc_retry_times, 0, "retry times for rpc");
+DEFINE_int32(rpc_retry_times, 3, "retry times for rpc");
 namespace paddle {
 namespace operators {
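This hunk raises the default RPC retry count from 0 to 3, so a transient network failure is retried instead of immediately failing the call. The sketch below only illustrates the retry semantics; it is a generic Python illustration, not Paddle's RPC client, and the function names are hypothetical:

import time

def call_with_retry(send_rpc, retry_times=3, backoff_sec=1.0):
    # Try the call once, then up to retry_times more attempts on failure.
    last_error = None
    for attempt in range(retry_times + 1):
        try:
            return send_rpc()
        except ConnectionError as err:  # stand-in for a transient RPC failure
            last_error = err
            time.sleep(backoff_sec)
    raise last_error

# With retry_times=0 (the old default) a single transient failure is fatal;
# with retry_times=3 the call survives up to three such failures.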
@@ -335,18 +335,13 @@ class PaddleCloudRoleMaker(RoleMakerBase):
         if not self._role_is_generated:
             if not self._is_collective:
                 try:
-                    port = os.environ["PADDLE_PORT"]
-                    pserver_ips = os.environ["PADDLE_PSERVERS"].split(",")
-                    if "," in port:
-                        ports = port.split(",")
-                    else:
-                        ports = [port] * len(pserver_ips)
-                    eplist = []
+                    # Environment variable PADDLE_PSERVERS_IP_PORT_LIST must be set
+                    # format: string(ip:port), eg. 127.0.0.1:6001
+                    eplist = os.environ["PADDLE_PSERVERS_IP_PORT_LIST"].split(
+                        ",")
-                    # note that, we usually assign the same port to different ips
-                    # if we run parameter server training in local mode
-                    # port should be different in environment variables
-                    for i, ip in enumerate(pserver_ips):
-                        eplist.append(':'.join([ip, ports[i]]))
                     trainers_num = int(os.environ["PADDLE_TRAINERS_NUM"])
                     training_role = os.environ["TRAINING_ROLE"]
@@ -361,9 +356,9 @@ class PaddleCloudRoleMaker(RoleMakerBase):
                     elif training_role == "PSERVER":
                         role = Role.SERVER
                         cur_ip = os.environ["POD_IP"]
-                        cur_idx = pserver_ips.index(cur_ip)
-                        current_id = eplist.index(":".join(
-                            [cur_ip, ports[cur_idx]]))
+                        curr_port = os.environ["PADDLE_PORT"]
+                        curr_endpoint = ":".join([cur_ip, curr_port])
+                        current_id = eplist.index(curr_endpoint)
                     else:
                         raise ValueError(
                             "TRAINING_ROLE must be PSERVER or TRAINER")
@@ -152,6 +152,10 @@ class DistributedTranspiler(Fleet):
         if not isinstance(optimizer, Optimizer):
             raise ValueError("optimizer must be an instance of Optimizer")
+        if not fleet._is_initialized:
+            raise ValueError(
+                "use fleet.init(role) to initialize the role before use fleet.distributed_optimizer()"
+            )
         self._optimizer = TranspilerOptimizer(optimizer, strategy)
         return self._optimizer
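The new guard makes fleet.init(role) a hard prerequisite of fleet.distributed_optimizer(). A sketch of the intended call order, reusing the imports and the UserDefinedRoleMaker arguments that appear in the test changes below (the SGD learning rate is illustrative):

import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig

role = role_maker.UserDefinedRoleMaker(
    current_id=0,
    role=role_maker.Role.SERVER,
    worker_num=2,
    server_endpoints=["127.0.0.1:36011", "127.0.0.1:36012"])
fleet.init(role)  # skipping this now raises the ValueError added above

strategy = DistributeTranspilerConfig()
optimizer = fluid.optimizer.SGD(0.1)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
# optimizer.minimize(avg_cost) would follow once a loss variable is built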
@@ -20,9 +20,11 @@ from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
 from paddle.fluid.incubate.fleet.base.role_maker import UserDefinedRoleMaker
 from paddle.fluid.incubate.fleet.base.role_maker import UserDefinedCollectiveRoleMaker
 from paddle.fluid.incubate.fleet.base.role_maker import Role
+import paddle.fluid.incubate.fleet.base.role_maker as role_maker
 from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
 from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import TranspilerOptimizer
 from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer
+from dist_simnet_bow import train_network
 class DistributeTranspilerConfigTest(unittest.TestCase):
@@ -97,6 +99,30 @@ class FleetTest(unittest.TestCase):
             main_program=compiled_prog)
         self.assertRaises(Exception, fleet._transpile, "config")
+    def set_program(self, avg_cost, strategy):
+        optimizer = fluid.optimizer.SGD(0.1)
+        optimizer = fleet.distributed_optimizer(optimizer, strategy)
+        optimizer.minimize(avg_cost)
+    def test_init_role(self):
+        role = role_maker.UserDefinedRoleMaker(
+            current_id=0,
+            role=role_maker.Role.SERVER,
+            worker_num=2,
+            server_endpoints=["127.0.0.1:36011", "127.0.0.1:36012"])
+        # for test optimizer without init(role)
+        # fleet.init(role)
+        batch_size = 128
+        is_sparse = True
+        is_distribute = False
+        strategy = DistributeTranspilerConfig()
+        strategy.sync_mode = False
+        strategy.geo_sgd_mode = True
+        strategy.geo_sgd_need_push_nums = 5
+        avg_cost, _, _ = train_network(batch_size, is_distribute, is_sparse)
+        self.assertRaises(Exception, self.set_program, avg_cost, strategy)
 class TranspilerOptimizerTest(unittest.TestCase):
     def testInvalidInputs(self):
@@ -124,7 +150,7 @@ class UserDefinedRoleMakerTest(unittest.TestCase):
     def testRoleMaker(self):
         self.createRoleMaker()
-        ## test all invalid server_endpoints
+        # test all invalid server_endpoints
         self.assertRaises(
             Exception, self.createRoleMaker,
             server_endpoints=None)  # server_endpoints must be as list
@@ -140,7 +166,7 @@ class UserDefinedRoleMakerTest(unittest.TestCase):
             self.createRoleMaker,
             server_endpoints=["127.0.0.1:8080", "127.0.0.1:8080"]
         )  # element in server_endpoints can't be duplicate
-        ## test all invalid current_id
+        # test all invalid current_id
         self.assertRaises(
             Exception, self.createRoleMaker,
             current_id="0")  # current_id must be as int
@@ -154,14 +180,14 @@ class UserDefinedRoleMakerTest(unittest.TestCase):
             role=Role.SERVER,
             server_endpoints=["127.0.0.1:8080"]
         )  # if role is server, current_id must be less than len(server_endpoints)
-        ## test all invalid worker_num
+        # test all invalid worker_num
         self.assertRaises(
             Exception, self.createRoleMaker,
             worker_num="1")  # worker_num must be as int
         self.assertRaises(
             Exception, self.createRoleMaker,
             worker_num=0)  # worker_num must be greater than 0
-        ## test all invalid role
+        # test all invalid role
        self.assertRaises(
             Exception, self.createRoleMaker,
             role=3)  # role must be as Role(Role.WORKER=1, Role.SERVER=2)
@@ -174,7 +200,7 @@ class UserDefinedCollectiveRoleMakerTest(unittest.TestCase):
     def testRoleMaker(self):
         self.createRoleMaker()
-        ## test all invalid worker_endpoints
+        # test all invalid worker_endpoints
         self.assertRaises(
             Exception, self.createRoleMaker,
             worker_endpoints=None)  # worker_endpoints must be as list
@@ -190,7 +216,7 @@ class UserDefinedCollectiveRoleMakerTest(unittest.TestCase):
             self.createRoleMaker,
             worker_endpoints=["127.0.0.1:8080", "127.0.0.1:8080"]
         )  # element in worker_endpoints can't be duplicate
-        ## test all invalid current_id
+        # test all invalid current_id
         self.assertRaises(
             Exception, self.createRoleMaker,
             current_id="0")  # current_id must be as int
@@ -21,9 +21,9 @@ import paddle.fluid.incubate.fleet.base.role_maker as role_maker
 class TestCloudRoleMaker(unittest.TestCase):
     def setUp(self):
-        os.environ["PADDLE_PORT"] = "36001"
-        os.environ["PADDLE_PSERVERS"] = "127.0.0.1,127.0.0.2"
         os.environ["PADDLE_TRAINERS_NUM"] = "2"
+        os.environ[
+            "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001"
     def test_tr_rolemaker(self):
         os.environ["TRAINING_ROLE"] = "TRAINER"
@@ -39,6 +39,7 @@ class TestCloudRoleMaker(unittest.TestCase):
     def test_ps_rolemaker(self):
         os.environ["TRAINING_ROLE"] = "PSERVER"
         os.environ["POD_IP"] = "127.0.0.1"
+        os.environ["PADDLE_PORT"] = "36001"
         ro = role_maker.PaddleCloudRoleMaker(is_collective=False)
         ro.generate_role()
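For reference, the PSERVER path of the new role maker can be exercised end to end with the same environment variables as the test; the accessor names in the last two lines (is_server, worker_num) are assumed from RoleMakerBase and may differ across Paddle versions:

import os
import paddle.fluid.incubate.fleet.base.role_maker as role_maker

os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"

ro = role_maker.PaddleCloudRoleMaker(is_collective=False)
ro.generate_role()
print(ro.is_server())   # expected: True
print(ro.worker_num())  # expected: 2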