Unverified commit b09f4d7f, authored by Haohongxiang, committed by GitHub

Add no_sync in data parallel for dynamic graph (#34740)

* Add no_sync in data parallel for dynamic graph

* modify UT of no_sync

* delete test_parallel_dygraph_dataparallel_no_sync.py

* add test_parallel_dygraph_no_sync.py

* modify run_trainer_with_spawn in UTs

* Add UT of complex control flow in no_sync

* add specific descriptions and notes for no_sync

* check code style

* modify UT's TIMEOUT in CMakeLists.txt
Parent a332352a
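
For orientation before the diff: the new no_sync() API enables the gradient-accumulation pattern exercised by the unit tests below, i.e. skip all-reduce for a few steps and synchronize on the last one before the optimizer update. A minimal sketch of that usage under the patched paddle.DataParallel (the model, step counts, and data here are illustrative placeholders, not part of this change):

# launch with e.g.: python -m paddle.distributed.launch --gpus 0,1 this_script.py
import paddle
import paddle.nn as nn
import paddle.distributed as dist

dist.init_parallel_env()
model = paddle.DataParallel(nn.Linear(10, 1))
opt = paddle.optimizer.SGD(learning_rate=0.001, parameters=model.parameters())

accumulate_steps = 4  # hypothetical accumulation window
for step in range(20):
    x = paddle.randn([8, 10], 'float32')
    if (step + 1) % accumulate_steps != 0:
        # gradients are accumulated locally; no all-reduce is launched
        with model.no_sync():
            model(x).sum().backward()
    else:
        # this backward re-arms the reducer and all-reduces the accumulated grads
        model(x).sum().backward()
        opt.step()
        model.clear_gradients()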
@@ -527,6 +527,7 @@ void Reducer::TraverseBackwardGraph(
 void Reducer::PrepareForBackward(
     const std::vector<std::shared_ptr<imperative::VarBase>> &outputs) {
   VLOG(3) << "after forward, then reset count for backward.";
+  grad_need_hooks_ = true;
   next_group_ = 0;
   std::for_each(groups_.begin(), groups_.end(), [](Group &group) {
     group.pending_ = group.variable_indices_.size();
@@ -599,6 +600,11 @@ void Reducer::AddDistHook(size_t var_index) {
                         "than %d, but it is %d",
                         variable_locators_.size(), var_index));

+  // gradient synchronization is not required when grad_need_hooks_ is false.
+  if (!grad_need_hooks_) {
+    return;
+  }
+
   VLOG(3) << "Var[" << var_index << "] ["
           << vars_[var_index]->GradVarBase()->Name()
           << "] arrived and triggered disthook";
@@ -692,8 +698,8 @@ void Reducer::MarkVarReady(const size_t var_index, const bool is_used_var) {
       auto var_base = vars_[var_index]->GradVarBase();
       auto tensor =
           var_base->MutableVar()->GetMutable<framework::LoDTensor>();
-      TensorCopy(*tensor, place_, *dev_ctx, &group_tensor);
-      group_tensor.Resize({static_cast<int64_t>(length)});
+      group_tensor.ShareDataWith(*tensor).Resize(
+          {static_cast<int64_t>(length)});
     } else {
       group_tensor.Resize({static_cast<int64_t>(length)});
       operators::math::set_constant(*dev_ctx, &group_tensor, 0.0);
@@ -907,6 +913,10 @@ void Reducer::ProcessUnusedDenseVars() {
     // 3. create grad var base or get grad var base
     auto grad_var_base_tmp = dest_var_base->MutableGradVarBase();
+    // NOTE(haohongxiang): Calling SetIsEmpty here is to make sure that
+    // gradient accumulation can continue normally after clear_gradients(),
+    // especially in cases including complex control flow.
+    grad_var_base_tmp->SharedVar()->SetIsEmpty(false);

     // 4. set grad tensor
     auto *dest_grad_tensor =
@@ -942,6 +952,7 @@ bool Reducer::HasGrad(size_t var_index) {
 void Reducer::FinalizeBackward() {
   groups_need_finalize_ = false;
+  grad_need_hooks_ = false;
 #ifdef PADDLE_WITH_XPU_BKCL
   {
     std::unique_lock<std::mutex> lock(mutex_);
......
@@ -209,6 +209,12 @@ class Reducer {
   std::condition_variable cv_;
 #endif

+  // grad_need_hooks_ is used to mark whether gradient synchronization is
+  // required across processes. The default value is false. When backward()
+  // is called, grad_need_hooks_ is set to true while preparing for backward
+  // and reverted to false when backward is finalized.
+  bool grad_need_hooks_{false};
+
   // it just for checking hook, each parameter can only trigger one hook
   std::vector<bool> vars_marked_ready_;
......
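
Read together, the reducer changes form a simple gate: PrepareForBackward() arms the per-parameter gradient hooks only when DataParallel actually wants synchronization, AddDistHook() returns early while the gate is off, and FinalizeBackward() disarms it again. A rough Python-style sketch of that control flow, with hypothetical names standing in for the C++ members (illustrative only, not the real implementation):

class ReducerSketch:
    """Illustrative stand-in for the reducer's grad_need_hooks_ gating."""

    def __init__(self):
        self.grad_need_hooks = False  # default: dist hooks are inert

    def prepare_for_backward(self, outputs):
        # Called from DataParallel.forward() only when grad_need_sync is True,
        # i.e. outside of a no_sync() block.
        self.grad_need_hooks = True

    def add_dist_hook(self, var_index):
        if not self.grad_need_hooks:
            # Inside no_sync(): gradients stay local and just accumulate.
            return
        self.mark_var_ready(var_index)

    def mark_var_ready(self, var_index):
        # Placeholder for the real bookkeeping: fuse ready grads into groups
        # and launch the all-reduce once a group is complete.
        pass

    def finalize_backward(self):
        # Disarm the hooks until the next synchronized forward pass.
        self.grad_need_hooks = False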
@@ -19,6 +19,7 @@ import warnings
 from collections import OrderedDict
 import itertools
 import warnings
+from contextlib import contextmanager

 import paddle
 from paddle.fluid import core
@@ -483,6 +484,7 @@ class DataParallel(layers.Layer):
         self._layers = layers
         self.find_unused_parameters = find_unused_parameters
+        self.grad_need_sync = True

         # NOTE(chenweihang): The ParallelStrategy here is not strictly a strategy.
         # It just stores some environment variables, which can be constructed by
@@ -576,9 +578,55 @@ class DataParallel(layers.Layer):
                 return itertools.chain(*map(self._find_varbase, obj.values()))
         return []

+    @contextmanager
+    def no_sync(self):
+        """
+        A context manager to stop gradient synchronization. Within no_sync(),
+        gradients of parameters will only be accumulated on the model and not
+        synchronized until the first forward-backward pass outside this context.
+
+        Examples:
+            .. code-block:: python
+
+                # required: distributed
+                import paddle
+                import paddle.nn as nn
+                import paddle.distributed as dist
+
+                class SimpleNet(nn.Layer):
+                    def __init__(self):
+                        super(SimpleNet, self).__init__()
+                        self._linear = nn.Linear(10, 1)
+
+                    def forward(self, x):
+                        return self._linear(x)
+
+                dist.init_parallel_env()
+                model = SimpleNet()
+                dp_model = paddle.DataParallel(model)
+
+                inputs_1 = paddle.randn([10, 10], 'float32')
+                inputs_2 = paddle.ones([10, 10], 'float32')
+
+                with dp_model.no_sync():
+                    # gradients will not be synchronized
+                    dp_model(inputs_1).backward()
+
+                # synchronization happens here
+                dp_model(inputs_2).backward()
+        """
+        tmp_grad_need_sync = self.grad_need_sync
+        self.grad_need_sync = False
+        try:
+            yield
+        finally:
+            self.grad_need_sync = tmp_grad_need_sync
+
     def forward(self, *inputs, **kwargs):
         outputs = self._layers(*inputs, **kwargs)
-        if self._strategy.nranks > 1 and framework._dygraph_tracer()._has_grad:
+        if self._strategy.nranks > 1 and framework._dygraph_tracer(
+        )._has_grad and self.grad_need_sync:
             self._reducer.prepare_for_backward(
                 list(self._find_varbase(outputs)))
         return outputs
......
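
One detail worth noting in the Python change: no_sync() saves grad_need_sync and restores it in a finally block, so synchronization is re-enabled even if the wrapped code raises. A small illustration of that behavior in a launched distributed process (the simulated failure is just for demonstration):

import paddle
import paddle.nn as nn
import paddle.distributed as dist

dist.init_parallel_env()
dp_model = paddle.DataParallel(nn.Linear(10, 1))

try:
    with dp_model.no_sync():
        raise RuntimeError("simulated failure inside no_sync()")
except RuntimeError:
    pass

# The finally block restored the flag, so later steps synchronize as usual.
assert dp_model.grad_need_sync is True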
@@ -24,6 +24,8 @@ list(APPEND DIST_TEST_OPS test_fleet_graph_execution_meta_optimizer)
 list(APPEND DIST_TEST_OPS test_gen_nccl_id_op)
 list(APPEND DIST_TEST_OPS test_parallel_dygraph_unused_variables)
 list(APPEND DIST_TEST_OPS test_parallel_dygraph_control_flow)
+list(APPEND DIST_TEST_OPS test_parallel_dygraph_no_sync)
+list(APPEND DIST_TEST_OPS test_parallel_dygraph_no_sync_gradient_check)
 list(APPEND DIST_TEST_OPS test_parallel_dygraph_dataparallel)
 list(APPEND DIST_TEST_OPS test_parallel_dygraph_pipeline_parallel)
 list(APPEND DIST_TEST_OPS test_parallel_dygraph_tensor_parallel)
@@ -191,6 +193,8 @@ if ((NOT WITH_GPU) AND (NOT WITH_ROCM))
     LIST(REMOVE_ITEM TEST_OPS test_parallel_dygraph_transformer)
     LIST(REMOVE_ITEM TEST_OPS test_parallel_dygraph_sync_batch_norm)
     list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_control_flow)
+    list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_no_sync)
+    list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_no_sync_gradient_check)
     list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_dataparallel)
     list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_pipeline_parallel)
     list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_tensor_parallel)
@@ -910,6 +914,8 @@ if(WITH_DISTRIBUTE AND WITH_GPU AND WITH_NCCL)
     set_tests_properties(test_parallel_dygraph_dataparallel PROPERTIES TIMEOUT 120)
     set_tests_properties(test_parallel_dygraph_unused_variables PROPERTIES TIMEOUT 120)
     set_tests_properties(test_parallel_dygraph_control_flow PROPERTIES TIMEOUT 120)
+    set_tests_properties(test_parallel_dygraph_no_sync PROPERTIES TIMEOUT 120)
+    set_tests_properties(test_parallel_dygraph_no_sync_gradient_check PROPERTIES TIMEOUT 30)
     set_tests_properties(test_parallel_dygraph_pipeline_parallel PROPERTIES TIMEOUT 120)
     set_tests_properties(test_parallel_dygraph_tensor_parallel PROPERTIES TIMEOUT 200)
     set_tests_properties(test_parallel_dygraph_sharding_parallel PROPERTIES TIMEOUT 120)
......
# ---- new file: parallel_dygraph_no_sync.py ----
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import contextlib
import unittest
import numpy as np
import six
import pickle
import random
import paddle
import paddle.fluid as fluid
import paddle.distributed as dist
import paddle.fluid.dygraph as dygraph
from paddle.fluid import core
from paddle.fluid.dygraph.nn import Linear
from test_dist_base import print_to_err, print_to_out, runtime_main, TestParallelDyGraphRunnerBase
seed = 90
RUN_STEP = 20
batch_size = 4
batch_num = 1000
class SimpleNet(fluid.Layer):
def __init__(self):
super(SimpleNet, self).__init__()
self.net_a = Linear(input_dim=10, output_dim=20)
self.net_b = Linear(input_dim=20, output_dim=5)
self.net_c = Linear(input_dim=5, output_dim=10)
def forward(self, x):
x = self.net_a(x)
x = self.net_b(x)
x = self.net_c(x)
return x
class TestNoSync(TestParallelDyGraphRunnerBase):
def get_model(self):
model = SimpleNet()
train_reader = paddle.batch(
fake_sample_reader(), batch_size=batch_size, drop_last=True)
optimizer = paddle.optimizer.SGD(learning_rate=0.001,
parameters=model.parameters())
return model, train_reader, optimizer
def run_one_loop(self, model, optimizer, batch):
x_data = np.array([x for x in batch])
x_data = x_data.reshape((-1, 10))
x = paddle.to_tensor(x_data)
out = model(x)
loss = out.sum() / len(batch)
return loss
def run_trainer(self, args):
if fluid.core.is_compiled_with_cuda():
device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
place = fluid.CUDAPlace(device_id)
else:
assert ("Only support CUDAPlace for now.")
with fluid.dygraph.guard(place):
fluid.default_startup_program().random_seed = seed
fluid.default_main_program().random_seed = seed
np.random.seed(seed)
random.seed(seed)
model, train_reader, opt = self.get_model()
if args.update_method == "nccl2":
dist.init_parallel_env()
print_to_err(
type(self).__name__,
"begin to prepare context in dygraph with nccl2")
if not args.find_unused_parameters:
model = paddle.DataParallel(
model, find_unused_parameters=False)
else:
model = paddle.DataParallel(
model, find_unused_parameters=True)
print_to_err(type(self).__name__, "model built in dygraph")
out_losses = []
print_to_err(type(self).__name__, "begin to run dygraph training")
for step_id, data in enumerate(train_reader()):
data = self._get_data(data, args)
if step_id == RUN_STEP:
break
if step_id % 3 != 0:
if args.update_method == "nccl2":
with model.no_sync():
loss = self.run_one_loop(model, opt, data)
loss.backward()
else:
loss = self.run_one_loop(model, opt, data)
loss.backward()
else:
loss = self.run_one_loop(model, opt, data)
loss.backward()
opt.minimize(loss)
print_to_err(
type(self).__name__,
"loss at step %d: %f" % (step_id, loss.numpy()))
out_losses.append(loss.numpy())
if not args.accumulate_gradient:
model.clear_gradients()
print_to_out(out_losses)
def run_trainer_with_spawn(self, args):
fluid.default_startup_program().random_seed = seed
fluid.default_main_program().random_seed = seed
np.random.seed(seed)
random.seed(seed)
args.trainer_id = dist.get_rank()
if args.update_method == "nccl2":
dist.init_parallel_env()
model, train_reader, opt = self.get_model()
if args.update_method == "nccl2":
if args.find_unused_parameters:
model = paddle.DataParallel(model, find_unused_parameters=True)
else:
model = paddle.DataParallel(model, find_unused_parameters=False)
out_losses = []
for step_id, data in enumerate(train_reader()):
data = self._get_data(data, args)
if step_id == RUN_STEP:
break
if step_id % 3 != 0:
if args.update_method == "nccl2":
with model.no_sync():
loss = self.run_one_loop(model, opt, data)
loss.backward()
else:
loss = self.run_one_loop(model, opt, data)
loss.backward()
else:
loss = self.run_one_loop(model, opt, data)
loss.backward()
opt.minimize(loss)
print_to_err(
type(self).__name__,
"loss at step %d: %f" % (step_id, loss.numpy()))
out_losses.append(loss.numpy())
model.clear_gradients()
print_to_out(out_losses)
return out_losses
def fake_sample_reader():
def __reader__():
for i in range(batch_num):
x_data = np.random.random_sample((10, )).astype('float32')
yield x_data
return __reader__
if __name__ == "__main__":
runtime_main(TestNoSync)
# ---- new file: parallel_dygraph_no_sync_control_flow.py ----
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import contextlib
import unittest
import numpy as np
import six
import pickle
import random
import paddle
import paddle.fluid as fluid
import paddle.distributed as dist
import paddle.fluid.dygraph as dygraph
from paddle.fluid import core
from paddle.fluid.dygraph.nn import Linear
from test_dist_base import print_to_err, print_to_out, runtime_main, TestParallelDyGraphRunnerBase
seed = 90
RUN_STEP = 20
batch_size = 4
batch_num = 1000
class SimpleNetControlFlow(fluid.Layer):
def __init__(self):
super(SimpleNetControlFlow, self).__init__()
self.net_a = Linear(input_dim=10, output_dim=20)
self.net_b = Linear(input_dim=20, output_dim=5)
self.net_c = Linear(input_dim=5, output_dim=10)
self.step = 0
def forward(self, x):
self.step = self.step + 1
x = self.net_a(x)
if self.step > 10:
x.stop_gradient = True
x = self.net_b(x)
x = self.net_c(x)
return x
class TestNoSyncControlFlow(TestParallelDyGraphRunnerBase):
def get_model(self):
model = SimpleNetControlFlow()
train_reader = paddle.batch(
fake_sample_reader(), batch_size=batch_size, drop_last=True)
optimizer = paddle.optimizer.SGD(learning_rate=0.001,
parameters=model.parameters())
return model, train_reader, optimizer
def run_one_loop(self, model, optimizer, batch):
x_data = np.array([x for x in batch])
x_data = x_data.reshape((-1, 10))
x = paddle.to_tensor(x_data)
out = model(x)
loss = out.sum() / len(batch)
return loss
def run_trainer(self, args):
if fluid.core.is_compiled_with_cuda():
device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
place = fluid.CUDAPlace(device_id)
else:
assert ("Only support CUDAPlace for now.")
with fluid.dygraph.guard(place):
fluid.default_startup_program().random_seed = seed
fluid.default_main_program().random_seed = seed
np.random.seed(seed)
random.seed(seed)
model, train_reader, opt = self.get_model()
if args.update_method == "nccl2":
dist.init_parallel_env()
print_to_err(
type(self).__name__,
"begin to prepare context in dygraph with nccl2")
if not args.find_unused_parameters:
model = paddle.DataParallel(
model, find_unused_parameters=False)
else:
model = paddle.DataParallel(
model, find_unused_parameters=True)
print_to_err(type(self).__name__, "model built in dygraph")
out_losses = []
print_to_err(type(self).__name__, "begin to run dygraph training")
for step_id, data in enumerate(train_reader()):
data = self._get_data(data, args)
if step_id == RUN_STEP:
break
if step_id % 3 != 0:
if args.update_method == "nccl2":
with model.no_sync():
loss = self.run_one_loop(model, opt, data)
loss.backward()
else:
loss = self.run_one_loop(model, opt, data)
loss.backward()
else:
loss = self.run_one_loop(model, opt, data)
loss.backward()
opt.minimize(loss)
print_to_err(
type(self).__name__,
"loss at step %d: %f" % (step_id, loss.numpy()))
out_losses.append(loss.numpy())
if not args.accumulate_gradient:
model.clear_gradients()
print_to_out(out_losses)
def run_trainer_with_spawn(self, args):
fluid.default_startup_program().random_seed = seed
fluid.default_main_program().random_seed = seed
np.random.seed(seed)
random.seed(seed)
args.trainer_id = dist.get_rank()
if args.update_method == "nccl2":
dist.init_parallel_env()
model, train_reader, opt = self.get_model()
if args.update_method == "nccl2":
model = paddle.DataParallel(model, find_unused_parameters=True)
out_losses = []
for step_id, data in enumerate(train_reader()):
data = self._get_data(data, args)
if step_id == RUN_STEP:
break
if step_id % 3 != 0:
if args.update_method == "nccl2":
with model.no_sync():
loss = self.run_one_loop(model, opt, data)
loss.backward()
else:
loss = self.run_one_loop(model, opt, data)
loss.backward()
else:
loss = self.run_one_loop(model, opt, data)
loss.backward()
opt.minimize(loss)
print_to_err(
type(self).__name__,
"loss at step %d: %f" % (step_id, loss.numpy()))
out_losses.append(loss.numpy())
model.clear_gradients()
print_to_out(out_losses)
return out_losses
def fake_sample_reader():
def __reader__():
for i in range(batch_num):
x_data = np.random.random_sample((10, )).astype('float32')
yield x_data
return __reader__
if __name__ == "__main__":
runtime_main(TestNoSyncControlFlow)
# ---- new file: parallel_dygraph_no_sync_gradient_check.py ----
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
from __future__ import print_function
import unittest
import paddle
import numpy as np
import paddle.distributed as dist
import paddle.fluid as fluid
from paddle.fluid.dygraph.nn import Linear
paddle.seed(1024)
np.random.seed(2021)
batch = 1
in_dim = 10
out_dim = 20
class SimpleNet(fluid.Layer):
def __init__(self, train_id):
super(SimpleNet, self).__init__()
self.w1 = self.create_parameter(
shape=[in_dim, out_dim], dtype="float32")
self.w2 = self.create_parameter(
shape=[in_dim, out_dim], dtype="float32")
self.share_net = Linear(out_dim, 1)
self.unused_param = self.create_parameter(
shape=[out_dim, in_dim], dtype="float32")
# just for test sync_params_buffers
self.register_buffer("queue", paddle.randn([10, 5]))
self.queue = paddle.nn.functional.normalize(self.queue, axis=0)
self.register_buffer("queue_ptr", paddle.zeros([1], 'int64'))
self.trainer_id = train_id
def forward(self, x):
is_use = (paddle.equal_all(
x, paddle.ones(shape=(batch, in_dim))).numpy()[0] and
self.trainer_id == 1)
if is_use:
tmp = paddle.matmul(x, self.w1)
else:
tmp = paddle.matmul(x, self.w2)
return self.share_net(tmp)
class TestDistTraning(unittest.TestCase):
def test_multiple_gpus(self):
self.trainer_id = dist.get_rank()
dist.init_parallel_env()
model_a = SimpleNet(self.trainer_id)
model_b = SimpleNet(self.trainer_id)
state_dict = model_a.state_dict()
model_b.set_state_dict(state_dict)
model_a = paddle.DataParallel(model_a, find_unused_parameters=True)
model_b = paddle.DataParallel(model_b, find_unused_parameters=True)
ones_input = paddle.ones(shape=(batch, in_dim))
ones_input.stop_gradient = True
for step_id in range(1, 31):
random_input = paddle.rand(shape=(batch, in_dim))
random_input.stop_gradient = True
if step_id % 5 != 0:
with model_a.no_sync():
self.dp_layer(step_id, model_a, model_b, random_input,
ones_input)
else:
self.dp_layer(step_id, model_a, model_b, random_input,
ones_input)
self.check_gradient(model_a.parameters())
self.check_gradient(model_b.parameters())
self.check_acc(model_a._layers.w1.grad, model_b._layers.w1.grad)
self.check_acc(model_a._layers.w2.grad, model_b._layers.w2.grad)
model_a.clear_gradients()
model_b.clear_gradients()
def dp_layer(self, step_id, model_a, model_b, random_input, ones_input):
if step_id % 2 == 0:
out_a = model_a(random_input)
out_b = model_b(random_input)
else:
out_a = model_a(ones_input)
out_b = model_b(ones_input)
out_a.sum().backward()
out_b.sum().backward()
def check_acc(self, grad, acc_grad):
grad = grad.numpy() if grad is not None else None
acc_grad = acc_grad.numpy() if acc_grad is not None else None
return np.testing.assert_allclose(grad, acc_grad, rtol=1e-6)
def print_trainer_0(self, *args):
if self.trainer_id == 0:
print(*args)
def broadcast_param(self, param, root):
paddle.distributed.broadcast(param, root)
return param
def check_gradient(self, params):
other_param = []
for param in params:
if param.trainable and (param._grad_ivar() is not None):
grad = param._grad_ivar()
other_grad = self.broadcast_param(grad.clone(), root=1)
if self.trainer_id == 0:
np.testing.assert_allclose(other_grad.numpy(), grad.numpy())
if __name__ == '__main__':
unittest.main()
# ---- new file: parallel_dygraph_no_sync_unused_params.py ----
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import contextlib
import unittest
import numpy as np
import six
import pickle
import random
import paddle
import paddle.fluid as fluid
import paddle.distributed as dist
import paddle.fluid.dygraph as dygraph
from paddle.fluid import core
from paddle.fluid.dygraph.nn import Linear
from test_dist_base import print_to_err, print_to_out, runtime_main, TestParallelDyGraphRunnerBase
seed = 90
RUN_STEP = 20
batch_size = 4
batch_num = 1000
class SimpleNetUnusedParam(fluid.Layer):
def __init__(self):
super(SimpleNetUnusedParam, self).__init__()
self.net_a = Linear(input_dim=10, output_dim=20)
self.net_b = Linear(input_dim=20, output_dim=5)
self.net_c = Linear(input_dim=5, output_dim=10)
self.net_d = Linear(input_dim=20, output_dim=10)
def forward(self, x):
x = self.net_a(x)
x.stop_gradient = True
x = self.net_b(x)
x = self.net_c(x)
return x
class TestNoSyncUnusedParam(TestParallelDyGraphRunnerBase):
def get_model(self):
model = SimpleNetUnusedParam()
train_reader = paddle.batch(
fake_sample_reader(), batch_size=batch_size, drop_last=True)
optimizer = paddle.optimizer.SGD(learning_rate=0.001,
parameters=model.parameters())
return model, train_reader, optimizer
def run_one_loop(self, model, optimizer, batch):
x_data = np.array([x for x in batch])
x_data = x_data.reshape((-1, 10))
x = paddle.to_tensor(x_data)
out = model(x)
loss = out.sum() / len(batch)
return loss
def run_trainer(self, args):
if fluid.core.is_compiled_with_cuda():
device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
place = fluid.CUDAPlace(device_id)
else:
assert ("Only support CUDAPlace for now.")
with fluid.dygraph.guard(place):
fluid.default_startup_program().random_seed = seed
fluid.default_main_program().random_seed = seed
np.random.seed(seed)
random.seed(seed)
model, train_reader, opt = self.get_model()
if args.update_method == "nccl2":
dist.init_parallel_env()
print_to_err(
type(self).__name__,
"begin to prepare context in dygraph with nccl2")
if not args.find_unused_parameters:
model = paddle.DataParallel(
model, find_unused_parameters=False)
else:
model = paddle.DataParallel(
model, find_unused_parameters=True)
print_to_err(type(self).__name__, "model built in dygraph")
out_losses = []
print_to_err(type(self).__name__, "begin to run dygraph training")
for step_id, data in enumerate(train_reader()):
data = self._get_data(data, args)
if step_id == RUN_STEP:
break
if step_id % 3 != 0:
if args.update_method == "nccl2":
with model.no_sync():
loss = self.run_one_loop(model, opt, data)
loss.backward()
else:
loss = self.run_one_loop(model, opt, data)
loss.backward()
else:
loss = self.run_one_loop(model, opt, data)
loss.backward()
opt.minimize(loss)
print_to_err(
type(self).__name__,
"loss at step %d: %f" % (step_id, loss.numpy()))
out_losses.append(loss.numpy())
if not args.accumulate_gradient:
model.clear_gradients()
print_to_out(out_losses)
def run_trainer_with_spawn(self, args):
paddle.disable_static()
fluid.default_startup_program().random_seed = seed
fluid.default_main_program().random_seed = seed
np.random.seed(seed)
random.seed(seed)
args.trainer_id = dist.get_rank()
if args.update_method == "nccl2":
dist.init_parallel_env()
model, train_reader, opt = self.get_model()
if args.update_method == "nccl2":
if args.find_unused_parameters:
model = paddle.DataParallel(model, find_unused_parameters=True)
else:
model = paddle.DataParallel(model, find_unused_parameters=False)
out_losses = []
for step_id, data in enumerate(train_reader()):
data = self._get_data(data, args)
if step_id == RUN_STEP:
break
if step_id % 3 != 0:
if args.update_method == "nccl2":
with model.no_sync():
loss = self.run_one_loop(model, opt, data)
loss.backward()
else:
loss = self.run_one_loop(model, opt, data)
loss.backward()
else:
loss = self.run_one_loop(model, opt, data)
loss.backward()
opt.minimize(loss)
print_to_err(
type(self).__name__,
"loss at step %d: %f" % (step_id, loss.numpy()))
out_losses.append(loss.numpy())
model.clear_gradients()
print_to_out(out_losses)
return out_losses
def fake_sample_reader():
def __reader__():
for i in range(batch_num):
x_data = np.random.random_sample((10, )).astype('float32')
yield x_data
return __reader__
if __name__ == "__main__":
runtime_main(TestNoSyncUnusedParam)
# ---- new file: test_parallel_dygraph_no_sync.py ----
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import sys
import unittest
import paddle.fluid as fluid
from test_dist_base import TestDistBase
from spawn_runner_base import TestDistSpawnRunner
from parallel_dygraph_no_sync import TestNoSync
from parallel_dygraph_no_sync_unused_params import TestNoSyncUnusedParam
from parallel_dygraph_no_sync_control_flow import TestNoSyncControlFlow
flag_name = os.path.splitext(__file__)[0]
class TestParallelDygraphNoSync(TestDistBase):
def _setup_config(self):
self._sync_mode = False
self._nccl2_mode = True
self._dygraph = True
self._find_unused_parameters = False
def test_no_sync(self):
if fluid.core.is_compiled_with_cuda():
self.check_with_place(
"parallel_dygraph_no_sync.py",
delta=1e-5,
check_error_log=True,
log_name=flag_name)
class TestParallelDygraphNoSyncUnusedParam(TestDistBase):
def _setup_config(self):
self._sync_mode = False
self._nccl2_mode = True
self._dygraph = True
self._find_unused_parameters = True
def test_no_sync_ununsed_param(self):
if fluid.core.is_compiled_with_cuda():
self.check_with_place(
"parallel_dygraph_no_sync_unused_params.py",
delta=1e-5,
check_error_log=True,
log_name=flag_name)
class TestParallelDygraphNoSyncControlFlow(TestDistBase):
def _setup_config(self):
self._sync_mode = False
self._nccl2_mode = True
self._dygraph = True
self._find_unused_parameters = True
def test_no_sync_control_flow(self):
if fluid.core.is_compiled_with_cuda():
self.check_with_place(
"parallel_dygraph_no_sync_control_flow.py",
delta=1e-5,
check_error_log=True,
log_name=flag_name)
class TestParallelDygraphNoSyncSpawn(TestDistSpawnRunner):
def test_no_sync_with_spawn(self):
if fluid.core.is_compiled_with_cuda() and sys.version_info >= (3, 4):
self.check_dist_result_with_spawn(test_class=TestNoSync, delta=1e-5)
class TestParallelDygraphNoSyncUnusedParamSpawn(TestDistSpawnRunner):
def test_no_sync_with_spawn(self):
if fluid.core.is_compiled_with_cuda() and sys.version_info >= (3, 4):
self.check_dist_result_with_spawn(
test_class=TestNoSyncUnusedParam, delta=1e-5)
class TestParallelDygraphNoSyncControlFlowSpawn(TestDistSpawnRunner):
def test_no_sync_with_spawn(self):
if fluid.core.is_compiled_with_cuda() and sys.version_info >= (3, 4):
self.check_dist_result_with_spawn(
test_class=TestNoSyncControlFlow, delta=1e-5)
if __name__ == "__main__":
unittest.main()
# ---- new file: test_parallel_dygraph_no_sync_gradient_check.py ----
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import paddle.fluid as fluid
from test_parallel_dygraph_dataparallel import TestMultipleGpus
class TestModelParallelLayer(TestMultipleGpus):
def test_parallel_dygraph_dataparallel_no_sync(self):
self.run_mnist_2gpu('parallel_dygraph_no_sync_gradient_check.py')
if __name__ == "__main__":
unittest.main()