未验证 提交 6b4ca0d7 编写于 作者: D danleifeng 提交者: GitHub

【paddle.fleet】distributed_optimizer supports dygraph (#26541)

paddle.distributed.fleet supports dynamic graph execution.
上级 c8cc0945
...@@ -50,3 +50,10 @@ distributed_optimizer = fleet.distributed_optimizer ...@@ -50,3 +50,10 @@ distributed_optimizer = fleet.distributed_optimizer
save_inference_model = fleet.save_inference_model save_inference_model = fleet.save_inference_model
save_persistables = fleet.save_persistables save_persistables = fleet.save_persistables
minimize = fleet.minimize minimize = fleet.minimize
distributed_model = fleet.distributed_model
step = fleet.step
clear_grad = fleet.clear_grad
set_lr = fleet.set_lr
get_lr = fleet.get_lr
state_dict = fleet.state_dict
set_state_dict = fleet.set_state_dict
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
from __future__ import print_function from __future__ import print_function
import warnings import warnings
import paddle import paddle
from paddle.fluid.framework import dygraph_only
from paddle.fluid import compiler from paddle.fluid import compiler
from .role_maker import UserDefinedRoleMaker, PaddleCloudRoleMaker, RoleMakerBase from .role_maker import UserDefinedRoleMaker, PaddleCloudRoleMaker, RoleMakerBase
from .strategy_compiler import StrategyCompiler from .strategy_compiler import StrategyCompiler
...@@ -23,6 +24,7 @@ from .meta_optimizer_factory import MetaOptimizerFactory ...@@ -23,6 +24,7 @@ from .meta_optimizer_factory import MetaOptimizerFactory
from .runtime_factory import RuntimeFactory from .runtime_factory import RuntimeFactory
from .util_factory import UtilFactory from .util_factory import UtilFactory
from paddle.fluid.wrapped_decorator import wrap_decorator from paddle.fluid.wrapped_decorator import wrap_decorator
from paddle.fluid.dygraph import parallel_helper
def _inited_runtime_handler_(func): def _inited_runtime_handler_(func):
...@@ -178,6 +180,12 @@ class Fleet(object): ...@@ -178,6 +180,12 @@ class Fleet(object):
"`role_maker` should be subclass of `RoleMakerBase`, but got {}". "`role_maker` should be subclass of `RoleMakerBase`, but got {}".
format(type(role_maker))) format(type(role_maker)))
self.strategy_compiler = StrategyCompiler() self.strategy_compiler = StrategyCompiler()
if paddle.fluid.framework.in_dygraph_mode():
if parallel_helper._is_parallel_ctx_initialized():
warnings.warn(
"The dygraph parallel environment has been initialized.")
else:
paddle.distributed.init_parallel_env()
return None return None
def is_first_worker(self): def is_first_worker(self):
...@@ -587,12 +595,344 @@ class Fleet(object): ...@@ -587,12 +595,344 @@ class Fleet(object):
""" """
self.user_defined_optimizer = optimizer self.user_defined_optimizer = optimizer
if paddle.fluid.framework.in_dygraph_mode():
return self
if strategy == None: if strategy == None:
strategy = DistributedStrategy() strategy = DistributedStrategy()
self.user_defined_strategy = strategy self.user_defined_strategy = strategy
self.valid_strategy = None self.valid_strategy = None
return self return self
@dygraph_only
def distributed_model(self, model):
"""
Return dygraph distributed data parallel model (Layer)
Only work in dygraph mode
Examples:
.. code-block:: python
import paddle
import paddle.nn as nn
from paddle.distributed import fleet
class LinearNet(nn.Layer):
def __init__(self):
super(LinearNet, self).__init__()
self._linear1 = nn.Linear(10, 10)
self._linear2 = nn.Linear(10, 1)
def forward(self, x):
return self._linear2(self._linear1(x))
def train():
# 1. enable dynamic mode
paddle.disable_static()
# 2. initialize fleet environment
fleet.init(is_collective=True)
# 3. create layer & optimizer
layer = LinearNet()
loss_fn = nn.MSELoss()
adam = paddle.optimizer.Adam(
learning_rate=0.001, parameters=layer.parameters())
# 4. get data_parallel model using fleet
adam = fleet.distributed_optimizer(adam)
dp_layer = fleet.distributed_model(layer)
# 5. run layer
inputs = paddle.randn([10, 10], 'float32')
outputs = dp_layer(inputs)
labels = paddle.randn([10, 1], 'float32')
loss = loss_fn(outputs, labels)
print("loss:", loss.numpy())
loss = dp_layer.scale_loss(loss)
loss.backward()
dp_layer.apply_collective_grads()
adam.step()
adam.clear_grad()
if __name__ == '__main__':
paddle.distributed.spawn(train)
"""
assert model is not None
self.model = paddle.DataParallel(model)
return self.model
@dygraph_only
def state_dict(self):
"""
Get state dict information from optimizer.
Only work in dygraph mode
Returns:
state_dict(dict) : dict contains all the Tensor used by optimizer
Examples:
.. code-block:: python
import numpy as np
import paddle
from paddle.distributed import fleet
paddle.disable_static()
fleet.init(is_collective=True)
value = np.arange(26).reshape(2, 13).astype("float32")
a = paddle.fluid.dygraph.to_variable(value)
layer = paddle.nn.Linear(13, 5)
adam = paddle.optimizer.Adam(learning_rate=0.01, parameters=layer.parameters())
adam = fleet.distributed_optimizer(adam)
dp_layer = fleet.distributed_model(layer)
state_dict = adam.state_dict()
"""
# imitate target optimizer retrieval
return self.user_defined_optimizer.state_dict()
@dygraph_only
def set_state_dict(self, state_dict):
"""
Load optimizer state dict.
Only work in dygraph mode
Args:
state_dict(dict) : Dict contains all the Tensor needed by optimizer
Returns: None
Examples:
.. code-block:: python
import numpy as np
import paddle
from paddle.distributed import fleet
paddle.disable_static()
fleet.init(is_collective=True)
value = np.arange(26).reshape(2, 13).astype("float32")
a = paddle.fluid.dygraph.to_variable(value)
layer = paddle.nn.Linear(13, 5)
adam = paddle.optimizer.Adam(learning_rate=0.01, parameters=layer.parameters())
adam = fleet.distributed_optimizer(adam)
dp_layer = fleet.distributed_model(layer)
state_dict = adam.state_dict()
paddle.framework.save(state_dict, "paddle_dy")
para_state_dict, opti_state_dict = paddle.framework.load( "paddle_dy")
adam.set_state_dict(opti_state_dict)
"""
# imitate target optimizer retrieval
return self.user_defined_optimizer.set_state_dict(state_dict)
@dygraph_only
def set_lr(self, value):
"""
Set the value of the learning rate manually in the optimizer.
Only work in dygraph mode
Args:
value (float|Tensor): the value of learning rate
Returns: None
Examples:
.. code-block:: python
import numpy as np
import paddle
from paddle.distributed import fleet
paddle.disable_static()
fleet.init(is_collective=True)
value = np.arange(26).reshape(2, 13).astype("float32")
a = paddle.fluid.dygraph.to_variable(value)
layer = paddle.nn.Linear(13, 5)
adam = paddle.optimizer.Adam(learning_rate=0.01, parameters=layer.parameters())
adam = fleet.distributed_optimizer(adam)
dp_layer = fleet.distributed_model(layer)
lr_list = [0.2, 0.3, 0.4, 0.5, 0.6]
for i in range(5):
adam.set_lr(lr_list[i])
lr = adam.get_lr()
print("current lr is {}".format(lr))
# Print:
# current lr is 0.2
# current lr is 0.3
# current lr is 0.4
# current lr is 0.5
# current lr is 0.6
"""
# imitate target optimizer retrieval
return self.user_defined_optimizer.set_lr(value)
@dygraph_only
def get_lr(self):
"""
Get current step learning rate.
Only work in dygraph mode
Returns:
float: The learning rate of the current step.
Examples:
.. code-block:: python
import numpy as np
import paddle
from paddle.distributed import fleet
paddle.disable_static()
fleet.init(is_collective=True)
value = np.arange(26).reshape(2, 13).astype("float32")
a = paddle.fluid.dygraph.to_variable(value)
layer = paddle.nn.Linear(13, 5)
adam = paddle.optimizer.Adam(learning_rate=0.01, parameters=layer.parameters())
adam = fleet.distributed_optimizer(adam)
dp_layer = fleet.distributed_model(layer)
lr = adam.get_lr()
print(lr) # 0.01
"""
# imitate target optimizer retrieval
return self.user_defined_optimizer.get_lr()
@dygraph_only
def step(self):
"""
Execute the optimizer once.
Only work in dygraph mode
Returns: None
Examples:
.. code-block:: python
import paddle
import paddle.nn as nn
from paddle.distributed import fleet
class LinearNet(nn.Layer):
def __init__(self):
super(LinearNet, self).__init__()
self._linear1 = nn.Linear(10, 10)
self._linear2 = nn.Linear(10, 1)
def forward(self, x):
return self._linear2(self._linear1(x))
def train():
# 1. enable dynamic mode
paddle.disable_static()
# 2. initialize fleet environment
fleet.init(is_collective=True)
# 3. create layer & optimizer
layer = LinearNet()
loss_fn = nn.MSELoss()
adam = paddle.optimizer.Adam(
learning_rate=0.001, parameters=layer.parameters())
# 4. get data_parallel model using fleet
adam = fleet.distributed_optimizer(adam)
dp_layer = fleet.distributed_model(layer)
# 5. run layer
inputs = paddle.randn([10, 10], 'float32')
outputs = dp_layer(inputs)
labels = paddle.randn([10, 1], 'float32')
loss = loss_fn(outputs, labels)
print("loss:", loss.numpy())
loss = dp_layer.scale_loss(loss)
loss.backward()
dp_layer.apply_collective_grads()
adam.step()
adam.clear_grad()
if __name__ == '__main__':
paddle.distributed.spawn(train)
"""
# imitate target optimizer retrieval
return self.user_defined_optimizer.step()
@dygraph_only
def clear_grad(self):
"""
Execute the optimizer once.
Only work in dygraph mode
Returns: None
Examples:
.. code-block:: python
import paddle
import paddle.nn as nn
from paddle.distributed import fleet
class LinearNet(nn.Layer):
def __init__(self):
super(LinearNet, self).__init__()
self._linear1 = nn.Linear(10, 10)
self._linear2 = nn.Linear(10, 1)
def forward(self, x):
return self._linear2(self._linear1(x))
def train():
# 1. enable dynamic mode
paddle.disable_static()
# 2. initialize fleet environment
fleet.init(is_collective=True)
# 3. create layer & optimizer
layer = LinearNet()
loss_fn = nn.MSELoss()
adam = paddle.optimizer.Adam(
learning_rate=0.001, parameters=layer.parameters())
# 4. get data_parallel model using fleet
adam = fleet.distributed_optimizer(adam)
dp_layer = fleet.distributed_model(layer)
# 5. run layer
inputs = paddle.randn([10, 10], 'float32')
outputs = dp_layer(inputs)
labels = paddle.randn([10, 1], 'float32')
loss = loss_fn(outputs, labels)
print("loss:", loss.numpy())
loss = dp_layer.scale_loss(loss)
loss.backward()
dp_layer.apply_collective_grads()
adam.step()
adam.clear_grad()
if __name__ == '__main__':
paddle.distributed.spawn(train)
"""
# imitate target optimizer retrieval
return self.user_defined_optimizer.clear_grad()
def minimize(self, def minimize(self,
loss, loss,
startup_program=None, startup_program=None,
...@@ -642,6 +982,11 @@ class Fleet(object): ...@@ -642,6 +982,11 @@ class Fleet(object):
# for more examples, please reference https://github.com/PaddlePaddle/FleetX # for more examples, please reference https://github.com/PaddlePaddle/FleetX
""" """
if paddle.fluid.framework.in_dygraph_mode():
# imitate target optimizer retrieval
target_opt = self.user_defined_optimizer
return target_opt.minimize(loss)
context = {} context = {}
# cache original feed forward program # cache original feed forward program
self.origin_main_program = loss.block.program self.origin_main_program = loss.block.program
......
...@@ -114,8 +114,8 @@ class TestMnist(TestParallelDyGraphRunnerBase): ...@@ -114,8 +114,8 @@ class TestMnist(TestParallelDyGraphRunnerBase):
model = MNIST() model = MNIST()
train_reader = paddle.batch( train_reader = paddle.batch(
paddle.dataset.mnist.train(), batch_size=2, drop_last=True) paddle.dataset.mnist.train(), batch_size=2, drop_last=True)
opt = fluid.optimizer.Adam( opt = paddle.optimizer.Adam(
learning_rate=1e-3, parameter_list=model.parameters()) learning_rate=1e-3, parameters=model.parameters())
return model, train_reader, opt return model, train_reader, opt
def run_one_loop(self, model, opt, data): def run_one_loop(self, model, opt, data):
......
...@@ -488,6 +488,50 @@ class TestParallelDyGraphRunnerBase(object): ...@@ -488,6 +488,50 @@ class TestParallelDyGraphRunnerBase(object):
model.clear_gradients() model.clear_gradients()
return out_losses return out_losses
def run_gpu_fleet_api_trainer(self, args):
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
# 1. enable dygraph
paddle.disable_static()
# 2. init seed
seed = 90
paddle.static.default_startup_program().random_seed = seed
paddle.static.default_main_program().random_seed = seed
np.random.seed(seed)
random.seed = seed
# get trainer id
args.trainer_id = paddle.distributed.get_rank()
# 3. init parallel env
if args.update_method == "nccl2":
fleet.init(is_collective=True)
# 4. train model
model, train_reader, opt = self.get_model()
if args.update_method == "nccl2":
opt = fleet.distributed_optimizer(opt)
model = fleet.distributed_model(model)
out_losses = []
for step_id, data in enumerate(train_reader()):
data = self._get_data(data, args)
if step_id == RUN_STEP:
break
loss = self.run_one_loop(model, opt, data)
out_losses.append(loss.numpy())
if args.update_method == "nccl2":
loss = model.scale_loss(loss)
loss.backward()
if args.update_method == "nccl2":
model.apply_collective_grads()
opt.step()
opt.clear_grad()
print_to_out(out_losses)
def runtime_main(test_class): def runtime_main(test_class):
parser = argparse.ArgumentParser(description='Run dist test.') parser = argparse.ArgumentParser(description='Run dist test.')
...@@ -687,7 +731,8 @@ class TestDistBase(unittest.TestCase): ...@@ -687,7 +731,8 @@ class TestDistBase(unittest.TestCase):
envs['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '') envs['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '')
cmd += " -m coverage run --branch -p" cmd += " -m coverage run --branch -p"
cmd += " %s --role trainer --lr %f" % (model, self._lr) cmd += " %s --role trainer --update_method local --lr %f" % (model,
self._lr)
if batch_size != DEFAULT_BATCH_SIZE: if batch_size != DEFAULT_BATCH_SIZE:
cmd += " --batch_size %d" % batch_size cmd += " --batch_size %d" % batch_size
...@@ -850,6 +895,7 @@ class TestDistBase(unittest.TestCase): ...@@ -850,6 +895,7 @@ class TestDistBase(unittest.TestCase):
if self.__use_cuda: if self.__use_cuda:
tr_cmd += " --use_cuda" tr_cmd += " --use_cuda"
env.update({ env.update({
"FLAGS_selected_gpus": "{}".format(0),
"CUDA_VISIBLE_DEVICES": "{}".format(trainer_id % 2), "CUDA_VISIBLE_DEVICES": "{}".format(trainer_id % 2),
"PADDLE_TRAINERS_NUM": "{}".format(trainer_num), "PADDLE_TRAINERS_NUM": "{}".format(trainer_num),
"PADDLE_TRAINER_ID": "{}".format(trainer_id), "PADDLE_TRAINER_ID": "{}".format(trainer_id),
......
...@@ -126,6 +126,32 @@ class TestFleetBase(unittest.TestCase): ...@@ -126,6 +126,32 @@ class TestFleetBase(unittest.TestCase):
self.assertRaises(Exception, fleet.init_worker) self.assertRaises(Exception, fleet.init_worker)
class TestFleetDygraph(unittest.TestCase):
def setUp(self):
os.environ[
"PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36213,127.0.0.1:36214"
os.environ["PADDLE_CURRENT_ENDPOINTS"] = "127.0.0.1:36213"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["PADDLE_TRAINER_ID"] = "0"
def test_dygraph_method(self):
paddle.disable_static()
value = np.arange(26).reshape(2, 13).astype("float32")
a = fluid.dygraph.to_variable(value)
layer = paddle.nn.Linear(13, 5)
adam = paddle.optimizer.Adam(
learning_rate=0.01, parameters=layer.parameters())
# remove init cause this UT cannot launch distributed task
adam = fleet.distributed_optimizer(adam)
dp_layer = fleet.distributed_model(layer)
lr = 0.001
adam.set_lr(lr)
cur_lr = adam.get_lr()
assert (lr == cur_lr)
state_dict = adam.state_dict()
adam.set_state_dict(state_dict)
class TestFleetBaseSingleRunCollective(unittest.TestCase): class TestFleetBaseSingleRunCollective(unittest.TestCase):
def setUp(self): def setUp(self):
os.environ.pop("PADDLE_TRAINER_ENDPOINTS") os.environ.pop("PADDLE_TRAINER_ENDPOINTS")
......
...@@ -47,5 +47,21 @@ class TestParallelDygraphMnistSpawn(TestDistSpawnRunner): ...@@ -47,5 +47,21 @@ class TestParallelDygraphMnistSpawn(TestDistSpawnRunner):
self.check_dist_result_with_spawn(test_class=TestMnist, delta=1e-5) self.check_dist_result_with_spawn(test_class=TestMnist, delta=1e-5)
class TestFleetDygraphMnist(TestDistBase):
def _setup_config(self):
self._sync_mode = False
self._nccl2_mode = True
self._dygraph = True
self._gpu_fleet_api = True
def test_mnist(self):
if fluid.core.is_compiled_with_cuda():
self.check_with_place(
"parallel_dygraph_mnist.py",
delta=1e-5,
check_error_log=True,
log_name=flag_name)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册