Unverified commit cd48bdad, authored by mapingshuo, committed by GitHub

add feature to fleet2.0 role_maker, distribute_strategy, test=develop (#26267)

* add feature to fleet2.0 role_maker, distribute_strategy, test=develop
Parent 4ec51e02
......@@ -81,6 +81,8 @@ class DistributedJobInfo(object):
class DistributedStrategy(object):
__lock_attr = False
def __init__(self):
"""
DistributedStrategy is the main configuration entry for distributed training of Paddle.
......@@ -95,6 +97,13 @@ class DistributedStrategy(object):
"""
self.strategy = distributed_strategy_pb2.DistributedStrategy()
self.__lock_attr = True
def __setattr__(self, key, value):
if self.__lock_attr and not hasattr(self, key):
raise TypeError("%s is not a attribute of %s" %
(key, self.__class__.__name__))
object.__setattr__(self, key, value)
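With the attribute lock in place, assigning a key that is not already a field of DistributedStrategy raises TypeError instead of silently creating a new attribute (see the new test_unknown_strategy test below). A minimal usage sketch, assuming the paddle.distributed.fleet package from this change; nccl_comm_num stands in for any existing strategy field:

```python
import paddle.distributed.fleet as fleet

strategy = fleet.DistributedStrategy()
strategy.nccl_comm_num = 2        # existing field: assignment is accepted
try:
    strategy.unknown_key = 'UNK'  # unknown field: __setattr__ rejects it
except TypeError as err:
    print(err)                    # "unknown_key is not an attribute of DistributedStrategy"
```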
def save_to_prototxt(self, output):
"""
......
......@@ -110,6 +110,14 @@ class RoleMakerBase(object):
"""
raise NotImplementedError("Please implement this method in child class")
def node_num(self):
"""
Get the training node number
Returns:
int: node num
"""
raise NotImplementedError("Please implement this method in child class")
def get_trainer_endpoints(self):
"""
return trainer endpoints
......@@ -286,6 +294,14 @@ class PaddleCloudRoleMaker(RoleMakerBase):
self.generate_role()
return self._trainers_num
def node_num(self):
"""
return the training node number
"""
if not self._role_is_generated:
self.generate_role()
return self._node_num
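Callers query the new value through the role maker. A minimal sketch of the collective path; the environment values are illustrative and mirror the unit tests below (a real launcher, e.g. paddle.distributed.launch, exports them for each trainer process):

```python
import os
import paddle.distributed.fleet.base.role_maker as role_maker

# illustrative environment; normally exported by the launcher
os.environ["PADDLE_TRAINER_ID"] = "0"
os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001,127.0.0.2:36001"

role = role_maker.PaddleCloudRoleMaker(is_collective=True)
print(role.worker_num())  # 2 trainer processes
print(role.node_num())    # 2 distinct hosts
```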
def get_trainer_endpoints(self):
"""
get endpoint of all trainers
......@@ -353,6 +369,8 @@ class PaddleCloudRoleMaker(RoleMakerBase):
self._trainers_num = trainers_num
self._role = role
self._current_id = current_id
self._node_num = len(
set([x.split(':')[0] for x in self._worker_endpoints]))
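The node count is simply the number of distinct host parts among the worker endpoints; the same expression works standalone:

```python
# standalone illustration of the node-count expression used above
worker_endpoints = ["127.0.0.1:36001", "127.0.0.1:36002", "127.0.0.2:36001"]
node_num = len(set(ep.split(':')[0] for ep in worker_endpoints))
print(node_num)  # 2 -> three workers spread over two hosts
```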
def _collective_env(self):
self._current_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
......@@ -363,6 +381,8 @@ class PaddleCloudRoleMaker(RoleMakerBase):
assert self._worker_endpoints is not None, "can't find PADDLE_TRAINER_ENDPOINTS"
self._worker_endpoints = self._worker_endpoints.split(",")
self._trainers_num = len(self._worker_endpoints)
self._node_num = len(
set([x.split(':')[0] for x in self._worker_endpoints]))
def _init_gloo_env(self):
def init_gloo_instance(role="trainer"):
......@@ -513,12 +533,16 @@ class UserDefinedRoleMaker(PaddleCloudRoleMaker):
self._cur_endpoint = self._worker_endpoints[self._current_id]
elif self._role == Role.SERVER:
self._cur_endpoint = self._server_endpoints[self._current_id]
self._node_num = len(
set([x.split(':')[0] for x in self._worker_endpoints]))
def _user_defined_collective_env(self):
self._worker_endpoints = self._kwargs.get("worker_endpoints")
self._current_id = self._kwargs.get("current_id")
self._trainers_num = len(self._worker_endpoints)
self._training_role = Role.Worker
self._node_num = len(
set([x.split(':')[0] for x in self._worker_endpoints]))
def generate_role(self):
"""
......
......@@ -119,18 +119,26 @@ class GraphExecutionOptimizer(MetaOptimizerBase):
local_build_strategy.nccl_comm_num = \
dist_strategy.nccl_comm_num
if self.user_defined_strategy.recompute == True:
logging.warn(
"set enable_sequential_execution=True since you have enable the recompute strategy"
)
local_build_strategy.enable_sequential_execution = True
exe_strategy = self.user_defined_strategy.execution_strategy
node_num = self.role_maker.worker_num()
worker_num = self.role_maker.worker_num()
node_num = self.role_maker.node_num()
if self.role_maker._is_collective:
assert node_num >= 1, "nccl2 node_num must >= 1, now:{}" % node_num
assert worker_num >= 1, "nccl2 worker_num must >= 1, now:{}".format(worker_num)
if node_num <= 1:
if worker_num <= 1:
# local mode
if local_build_strategy.nccl_comm_num > 1:
logging.warn("set nccl_comm_num=1 since you only have 1 node.")
local_build_strategy.nccl_comm_num = 1
if node_num <= 1:
if local_build_strategy.use_hierarchical_allreduce:
logging.warn(
"set hierachical_allreduce=False since you only have 1 node."
......
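The change above decouples the two checks: a multi-process, single-machine run keeps worker_num > 1 while node_num == 1, so only the cross-node options are downgraded. A condensed sketch of the resulting logic (not the full optimizer; names follow the diff):

```python
def tune_collective_options(build_strategy, worker_num, node_num):
    # single trainer process: extra NCCL communicators bring no benefit
    if worker_num <= 1 and build_strategy.nccl_comm_num > 1:
        build_strategy.nccl_comm_num = 1
    # single machine: hierarchical allreduce needs at least two nodes
    if node_num <= 1 and build_strategy.use_hierarchical_allreduce:
        build_strategy.use_hierarchical_allreduce = False
    return build_strategy
```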
......@@ -30,7 +30,8 @@ class RecomputeOptimizer(MetaOptimizerBase):
user_defined_strategy):
super(RecomputeOptimizer, self)._set_basic_info(
loss, role_maker, user_defined_optimizer, user_defined_strategy)
self.wrapped_opt._set_checkpoints([])
self.wrapped_opt._set_checkpoints(
list(user_defined_strategy.recompute_configs["checkpoints"]))
def _can_apply(self):
if self.user_defined_strategy.recompute == True:
......
......@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
import unittest
import paddle
import os
......@@ -23,8 +25,6 @@ class TestFleetAMPOptimizer(unittest.TestCase):
os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001"
def test_amp_optimizer(self):
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
input_x = paddle.fluid.layers.data(
......
......@@ -14,6 +14,8 @@
import unittest
import paddle
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
import os
......@@ -26,67 +28,49 @@ class TestFleetBase(unittest.TestCase):
"127.0.0.1:36001,127.0.0.2:36001"
def test_init(self):
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
def test_is_first_worker(self):
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
if fleet.is_first_worker():
print("test fleet first worker done.")
def test_worker_index(self):
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
print(fleet.worker_index())
def test_worker_num(self):
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
print(fleet.worker_num())
def test_is_worker(self):
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
if fleet.is_worker():
print("test fleet is worker")
def test_worker_endpoints(self):
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
print(fleet.worker_endpoints(to_string=True))
def test_server_num(self):
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
if fleet.is_server():
print("fleet server num: {}".format(fleet.server_num()))
def test_server_index(self):
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
if fleet.is_server():
print("fleet server index: {}".format(fleet.server_index()))
def test_server_endpoints(self):
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
if fleet.is_server():
......@@ -94,55 +78,41 @@ class TestFleetBase(unittest.TestCase):
fleet.server_endpoints(to_string=True)))
def test_is_server(self):
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
if fleet.is_server():
print("test fleet is server")
def test_util(self):
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
self.assertEqual(fleet.util, None)
def test_barrier_worker(self):
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
if fleet.is_worker():
fleet.barrier_worker()
def test_init_worker(self):
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
if fleet.is_worker():
fleet.init_worker()
def test_run_server(self):
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
if fleet.is_worker():
fleet.run_worker()
def test_stop_worker(self):
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
if fleet.is_worker():
fleet.stop_worker()
def test_distributed_optimizer(self):
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
......@@ -150,10 +120,6 @@ class TestFleetBase(unittest.TestCase):
optimizer = fleet.distributed_optimizer(optimizer)
def test_minimize(self):
import paddle
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
input_x = paddle.fluid.layers.data(
name="x", shape=[32], dtype='float32')
input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64')
......
......@@ -17,7 +17,7 @@ import paddle
from paddle import fluid
import os
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import paddle.distributed.fleet.base.role_maker as role_maker
class TestFleetDGCOptimizer(unittest.TestCase):
......
......@@ -289,6 +289,11 @@ class TestStrategyConfig(unittest.TestCase):
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.execution_strategy = exe_strategy
def test_unknown_strategy(self):
strategy = paddle.distributed.fleet.DistributedStrategy()
with self.assertRaises(TypeError):
strategy.unknown_key = 'UNK'
if __name__ == '__main__':
unittest.main()
......@@ -16,7 +16,7 @@ import unittest
import paddle
import os
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import paddle.distributed.fleet.base.role_maker as role_maker
class TestFleetGradientMergeMetaOptimizer(unittest.TestCase):
......
......@@ -14,6 +14,8 @@
import unittest
import paddle
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
import os
from launch_function_helper import launch_func
......@@ -39,8 +41,6 @@ class TestFleetGraphExecutionMetaOptimizer(unittest.TestCase):
}
def node_func():
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
input_x = paddle.fluid.layers.data(
......
......@@ -17,7 +17,7 @@ import paddle
from paddle import fluid
import os
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import paddle.distributed.fleet.base.role_maker as role_maker
class TestFleetLambMetaOptimizer(unittest.TestCase):
......
......@@ -17,7 +17,7 @@ import paddle
from paddle import fluid
import os
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import paddle.distributed.fleet.base.role_maker as role_maker
class TestFleetLarsMetaOptimizer(unittest.TestCase):
......
......@@ -17,7 +17,7 @@ import paddle
import os
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import paddle.distributed.fleet.base.role_maker as role_maker
class TestFleetLocalSGDMetaOptimizer(unittest.TestCase):
......
......@@ -25,7 +25,7 @@ class TestFleetMetaOptimizer(unittest.TestCase):
def test_pipeline_optimizer(self):
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import paddle.distributed.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
with paddle.fluid.device_guard("cpu"):
......
......@@ -27,7 +27,7 @@ class TestFleetRecomputeMetaOptimizer(unittest.TestCase):
def test_recompute_optimizer(self):
import paddle.distributed.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import paddle.distributed.fleet.base.role_maker as role_maker
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
input_x = paddle.fluid.layers.data(
......@@ -43,7 +43,7 @@ class TestFleetRecomputeMetaOptimizer(unittest.TestCase):
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.recompute = True
strategy.recompute_configs = {"checkpoints": ["fc2"]}
strategy.recompute_configs = {"checkpoints": ["fc_1.tmp_0"]}
optimizer = paddle.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
......
......@@ -34,6 +34,7 @@ class TestRoleMakerBase(unittest.TestCase):
self.assertRaises(Exception, role.worker_index)
self.assertRaises(Exception, role.server_index)
self.assertRaises(Exception, role.role_id)
self.assertRaises(Exception, role.node_num)
trainer_endpoints = role.get_trainer_endpoints()
self.assertTrue(len(trainer_endpoints) == 0)
......@@ -80,10 +81,12 @@ class TestCloudRoleMaker(unittest.TestCase):
worker_endpoints = ro.get_trainer_endpoints()
self.assertEqual(worker_endpoints[0], '127.0.0.1:36001')
self.assertEqual(ro.role_id(), 0)
self.assertEqual(ro.node_num(), 2)
def test_tr_rolemaker_collective(self):
ro = role_maker.PaddleCloudRoleMaker(is_collective=True)
self.assertEqual(ro.worker_num(), 2)
self.assertEqual(ro.node_num(), 2)
def test_ps_rolemaker(self):
"""Test ps rolemaker."""
......