Commit c8911895 authored by chengduoZH

Update sparse gradient parameters with reduce and broadcast

Parent 5ff1ef36
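This commit removes the user-facing `use_nccl_allreduce` switch from `MultiDevSSAGraphBuilder`, `ParallelExecutor`, the pybind layer, and the Python API. Instead, the graph builder decides per gradient: a gradient stored as `SelectedRows` (sparse) is reduced onto a single device (device 0 in this commit), the parameter is updated there, and the updated parameter is broadcast back to the other devices; dense gradients keep using NCCL allreduce. Below is a minimal, self-contained C++ sketch of that dispatch, not framework code: the `Kind`/`Gradient` types and the example gradient names are invented for illustration, while `IsSparseGradient`, `update_sparse_gp_device_id`, and the `kGradVarSuffix` stripping mirror the diff that follows.

```cpp
#include <cstdio>
#include <cstring>
#include <string>
#include <vector>

// Invented stand-ins for the framework's dense-tensor vs. SelectedRows variable types.
enum class Kind { kDense, kSelectedRows };

struct Gradient {
  std::string name;  // e.g. "emb_w@GRAD"
  Kind kind;
};

const char kGradVarSuffix[] = "@GRAD";

// Mirrors MultiDevSSAGraphBuilder::IsSparseGradient from the diff:
// a gradient held in SelectedRows is treated as sparse.
bool IsSparseGradient(const Gradient &g) {
  return g.kind == Kind::kSelectedRows;
}

int main() {
  const size_t num_devices = 2;
  const size_t update_sparse_gp_device_id = 0;  // device that owns sparse updates

  std::vector<Gradient> grads = {{"fc_w@GRAD", Kind::kDense},
                                 {"emb_w@GRAD", Kind::kSelectedRows}};

  for (const auto &g : grads) {
    if (IsSparseGradient(g)) {
      // Sparse path: reduce the gradient onto one device, update the parameter
      // there, then broadcast the updated parameter. The parameter name is
      // recovered by stripping the gradient suffix, as in the diff.
      std::string param =
          g.name.substr(0, g.name.size() - std::strlen(kGradVarSuffix));
      std::printf("%s: reduce on device %zu, update %s, broadcast %s\n",
                  g.name.c_str(), update_sparse_gp_device_id, param.c_str(),
                  param.c_str());
    } else {
      // Dense path: aggregate the gradient across all devices with allreduce.
      std::printf("%s: ncclAllReduce across %zu devices\n", g.name.c_str(),
                  num_devices);
    }
  }
  return 0;
}
```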
......@@ -37,25 +37,20 @@ MultiDevSSAGraphBuilder::MultiDevSSAGraphBuilder(
const std::string &loss_var_name,
const std::unordered_set<std::string> &params,
const std::vector<Scope *> &local_scopes,
platform::NCCLContextMap *nccl_ctxs, bool use_default_grad_scale,
bool use_nccl_allreduce)
platform::NCCLContextMap *nccl_ctxs, bool use_default_grad_scale)
: loss_var_name_(loss_var_name),
places_(places),
local_scopes_(local_scopes),
nccl_ctxs_(nccl_ctxs),
use_nccl_allreduce_(use_nccl_allreduce) {
nccl_ctxs_(nccl_ctxs) {
#else
MultiDevSSAGraphBuilder::MultiDevSSAGraphBuilder(
const std::vector<platform::Place> &places,
const std::string &loss_var_name,
const std::unordered_set<std::string> &params,
const std::vector<Scope *> &local_scopes, bool use_default_grad_scale,
bool use_nccl_allreduce)
const std::vector<Scope *> &local_scopes, bool use_default_grad_scale)
: loss_var_name_(loss_var_name),
places_(places),
local_scopes_(local_scopes),
use_nccl_allreduce_(use_nccl_allreduce) {
local_scopes_(local_scopes) {
#endif
for (auto &p : params) {
grad_names_.insert(GradVarName(p));
......@@ -121,8 +116,8 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
std::unordered_map<std::string, std::vector<std::unique_ptr<VarHandle>>>>(
places_.size());
size_t cur_device_id = 0;
// size_t cur_device_id = 0;
size_t update_sparse_gp_device_id = 0;
std::vector<std::unordered_set<std::string>> var_name_on_devices;
std::vector<std::unordered_set<std::string>> bcast_var_name_set;
......@@ -162,14 +157,13 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
// broadcast, and each gradient is only broadcast once.
for (auto &og : op->OutputArgumentNames()) {
if (IsParameterGradientOnce(og, &og_has_been_broadcast)) {
if (use_nccl_allreduce_) {
InsertNCCLAllReduceOp(&result, og);
} else {
CreateReduceOp(&result, cur_device_id, og);
var_name_on_devices[cur_device_id].emplace(og);
bcast_var_name_set[cur_device_id].emplace(
if (IsSparseGradient(og)) {
CreateReduceOp(&result, update_sparse_gp_device_id, og);
var_name_on_devices[update_sparse_gp_device_id].emplace(og);
bcast_var_name_set[update_sparse_gp_device_id].emplace(
og.substr(0, og.size() - strlen(kGradVarSuffix)));
cur_device_id = (cur_device_id + 1) % places_.size();
} else {
InsertNCCLAllReduceOp(&result, og);
}
}
}
......@@ -205,13 +199,15 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
return std::unique_ptr<SSAGraph>(graph);
}
bool MultiDevSSAGraphBuilder::IsSparseGradient(const std::string &og) const {
auto og_var = local_scopes_[0]->FindVar(og);
PADDLE_ENFORCE_NOT_NULL(og_var);
return og_var->IsType<SelectedRows>();
}
int MultiDevSSAGraphBuilder::GetOpDeviceID(
const std::vector<std::unordered_set<std::string>> &var_name_on_devices,
const OpDesc &op) const {
if (use_nccl_allreduce_) {
return -1;
}
int var_dev_id = -1;
for (auto &var_name : op.InputArgumentNames()) {
if (var_dev_id != -1) break;
......
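The last hunk above also drops the `use_nccl_allreduce_` short-circuit from `GetOpDeviceID`. Since a sparse gradient is now reduced onto one specific device, any op that consumes it (typically the optimizer op) has to be scheduled on that same device, so the builder always looks an op's inputs up in the per-device variable sets that the reduce path fills in. Below is a minimal sketch of that lookup under stated assumptions: the diff only shows the start of the loop, so the inner per-device search is an assumed reconstruction, and the signature is simplified to take plain input names instead of an `OpDesc`.

```cpp
#include <cassert>
#include <string>
#include <unordered_set>
#include <vector>

// Simplified stand-in for MultiDevSSAGraphBuilder::GetOpDeviceID: returns the
// device that already owns one of the op's inputs (e.g. a reduced sparse
// gradient), or -1 if no input pins the op and it can run on every device.
// The inner search over devices is assumed, not copied from the diff.
int GetOpDeviceID(
    const std::vector<std::unordered_set<std::string>> &var_name_on_devices,
    const std::vector<std::string> &op_input_names) {
  int var_dev_id = -1;
  for (const auto &var_name : op_input_names) {
    if (var_dev_id != -1) break;
    for (size_t i = 0; i < var_name_on_devices.size(); ++i) {
      if (var_name_on_devices[i].count(var_name)) {
        var_dev_id = static_cast<int>(i);
        break;
      }
    }
  }
  return var_dev_id;
}

int main() {
  // Device 0 owns the reduced sparse gradient "emb_w@GRAD".
  std::vector<std::unordered_set<std::string>> var_name_on_devices = {
      {"emb_w@GRAD"}, {}};
  // An optimizer op reading that gradient is pinned to device 0 ...
  assert(GetOpDeviceID(var_name_on_devices, {"emb_w@GRAD", "emb_w"}) == 0);
  // ... while an op touching only dense variables is not pinned anywhere.
  assert(GetOpDeviceID(var_name_on_devices, {"fc_w@GRAD"}) == -1);
  return 0;
}
```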
......@@ -36,13 +36,13 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
const std::unordered_set<std::string> &params,
const std::vector<Scope *> &local_scopes,
platform::NCCLContextMap *nccl_ctxs,
bool use_default_grad_scale, bool use_nccl_allreduce);
bool use_default_grad_scale);
#else
MultiDevSSAGraphBuilder(const std::vector<platform::Place> &places,
const std::string &loss_var_name,
const std::unordered_set<std::string> &params,
const std::vector<Scope *> &local_scopes,
bool use_default_grad_scale, bool use_nccl_allreduce);
bool use_default_grad_scale);
#endif
std::unique_ptr<SSAGraph> Build(const ProgramDesc &program) const override;
......@@ -60,7 +60,6 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
#ifdef PADDLE_WITH_CUDA
platform::NCCLContextMap *nccl_ctxs_;
#endif
bool use_nccl_allreduce_;
bool use_default_grad_scale_;
bool IsScaleLossOp(const OpDesc &op) const;
......@@ -99,6 +98,8 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
* nullptr if not found.
*/
OpDesc *GetSendOpDesc(const ProgramDesc &program) const;
bool IsSparseGradient(const std::string &og) const;
};
} // namespace details
} // namespace framework
......
......@@ -58,7 +58,7 @@ ParallelExecutor::ParallelExecutor(
const std::unordered_set<std::string> &bcast_vars,
const ProgramDesc &main_program, const std::string &loss_var_name,
Scope *scope, const std::vector<Scope *> &local_scopes, bool allow_op_delay,
bool use_default_grad_scale, bool use_nccl_allreduce)
bool use_default_grad_scale)
: member_(new ParallelExecutorPrivate(places)) {
member_->global_scope_ = scope;
......@@ -93,11 +93,11 @@ ParallelExecutor::ParallelExecutor(
#ifdef PADDLE_WITH_CUDA
details::MultiDevSSAGraphBuilder builder(
member_->places_, loss_var_name, params, member_->local_scopes_,
member_->nccl_ctxs_.get(), use_default_grad_scale, use_nccl_allreduce);
member_->nccl_ctxs_.get(), use_default_grad_scale);
#else
details::MultiDevSSAGraphBuilder builder(
member_->places_, loss_var_name, params, member_->local_scopes_,
use_default_grad_scale, use_nccl_allreduce);
details::MultiDevSSAGraphBuilder builder(member_->places_, loss_var_name,
params, member_->local_scopes_,
use_default_grad_scale);
#endif
auto graph = builder.Build(main_program);
......
......@@ -40,8 +40,7 @@ class ParallelExecutor {
const ProgramDesc& main_program,
const std::string& loss_var_name, Scope* scope,
const std::vector<Scope*>& local_scopes,
bool allow_op_delay, bool use_default_grad_scale,
bool use_nccl_allreduce);
bool allow_op_delay, bool use_default_grad_scale);
~ParallelExecutor();
......
......@@ -502,12 +502,11 @@ All parameter, weight, gradient are variables in Paddle.
const std::unordered_set<std::string> &bcast_vars,
const ProgramDesc &main_program, const std::string &loss_var_name,
Scope *scope, std::vector<Scope *> &local_scopes,
bool allow_op_delay, bool use_default_grad_scale,
bool use_nccl_allreduce) {
bool allow_op_delay, bool use_default_grad_scale) {
new (&self) ParallelExecutor(
num_threads, use_event, places, params, bcast_vars,
main_program, loss_var_name, scope, local_scopes,
allow_op_delay, use_default_grad_scale, use_nccl_allreduce);
allow_op_delay, use_default_grad_scale);
})
.def("bcast_params", &ParallelExecutor::BCastParamsToGPUs)
// NOTE: even we return a vec<Scope*>* to Python use reference policy.
......
......@@ -30,8 +30,7 @@ class ParallelExecutor(object):
num_threads=None,
allow_op_delay=False,
share_vars_from=None,
use_default_grad_scale=True,
use_nccl_allreduce=True):
use_default_grad_scale=True):
"""
ParallelExecutor can run program in parallel.
......@@ -47,14 +46,6 @@ class ParallelExecutor(object):
improve performance in some cases, default False.
share_vars_from(ParallelExecutor, default None): If provided,
it will share variables from the specified ParallelExecutor.
use_nccl_allreduce(bool, default True): Whether to use nccl_allreduce
or not, if set True, the communication between different
devices by nccl allReduce, which doesn't support updating sparse
parameter, if set False, the communication between different
devices by reduce_op and broadcast_op, which will distribute all
the parameter gradients evenly to different device and updates
the parameters, and finally broadcast to other device, this method
support updating sparse parameter. Default True.
use_default_grad_scale(bool, default True): If set True, a default
scale value equal to `1./device_count` would be multiplied to
gradients of each device and scaled gradients would be
......@@ -138,8 +129,7 @@ class ParallelExecutor(object):
scope,
local_scopes,
allow_op_delay,
use_default_grad_scale,
use_nccl_allreduce)
use_default_grad_scale)
self.scope = scope
......
......@@ -205,8 +205,7 @@ class TestParallelExecutorBase(unittest.TestCase):
allow_op_delay=False,
feed_dict=None,
seed=None,
use_parallel_executor=True,
use_nccl_allreduce=True):
use_parallel_executor=True):
def run_executor(exe, feed, fetch_list, program=None):
if isinstance(exe, fluid.ParallelExecutor):
res = exe.run(fetch_list=fetch_list, feed=feed)
......@@ -235,10 +234,7 @@ class TestParallelExecutorBase(unittest.TestCase):
if use_parallel_executor:
exe = fluid.ParallelExecutor(
True,
loss_name=loss.name,
allow_op_delay=allow_op_delay,
use_nccl_allreduce=use_nccl_allreduce)
True, loss_name=loss.name, allow_op_delay=allow_op_delay)
else:
exe = fluid.Executor(place=place)
......@@ -284,25 +280,20 @@ class TestMNIST(TestParallelExecutorBase):
fluid.recordio_writer.convert_reader_to_recordio_file(
'./mnist.recordio', reader, feeder)
def check_simple_fc_convergence(self, use_nccl_allreduce=True):
def check_simple_fc_convergence(self):
self.check_network_convergence(simple_fc_net)
self.check_network_convergence(simple_fc_net, allow_op_delay=True)
img = numpy.zeros(shape=[32, 784], dtype='float32')
label = numpy.ones(shape=[32, 1], dtype='int64')
self.check_network_convergence(
simple_fc_net,
feed_dict={"image": img,
"label": label},
use_nccl_allreduce=use_nccl_allreduce)
simple_fc_net, feed_dict={"image": img,
"label": label})
def test_simple_fc_with_nccl_allreduce(self):
self.check_simple_fc_convergence(True)
def test_simple_fc(self):
self.check_simple_fc_convergence()
def test_simple_fc_with_reduce_op(self):
self.check_simple_fc_convergence(False)
def check_simple_fc_parallel_accuracy(self, use_nccl_allreduce=True):
def check_simple_fc_parallel_accuracy(self):
img = numpy.zeros(shape=[32, 784], dtype='float32')
label = numpy.ones(shape=[32, 1], dtype='int64')
single_first_loss, single_last_loss = self.check_network_convergence(
......@@ -316,35 +307,26 @@ class TestMNIST(TestParallelExecutorBase):
seed=1000,
feed_dict={"image": img,
"label": label},
use_parallel_executor=True,
use_nccl_allreduce=use_nccl_allreduce)
use_parallel_executor=True)
for p_f in parallel_first_loss:
self.assertAlmostEquals(p_f, single_first_loss[0], delta=1e-6)
for p_l in parallel_last_loss:
self.assertAlmostEquals(p_l, single_last_loss[0], delta=1e-6)
def test_simple_fc_parallel_accuracy_with_nccl_allreduce(self):
self.check_simple_fc_parallel_accuracy(True)
def test_simple_fc_parallel_accuracy_with_reduce_op(self):
self.check_simple_fc_parallel_accuracy(False)
def test_simple_fc_parallel_accuracy(self):
self.check_simple_fc_parallel_accuracy()
def check_batchnorm_fc_convergence(self, use_nccl_allreduce):
def check_batchnorm_fc_convergence(self):
self.check_network_convergence(fc_with_batchnorm)
img = numpy.zeros(shape=[32, 784], dtype='float32')
label = numpy.ones(shape=[32, 1], dtype='int64')
self.check_network_convergence(
fc_with_batchnorm,
feed_dict={"image": img,
"label": label},
use_nccl_allreduce=use_nccl_allreduce)
def test_batchnorm_fc_with_nccl_allreduce(self):
self.check_batchnorm_fc_convergence(True)
fc_with_batchnorm, feed_dict={"image": img,
"label": label})
def test_batchnorm_fc_with_reduce_op(self):
self.check_batchnorm_fc_convergence(False)
def test_batchnorm_fc(self):
self.check_batchnorm_fc_convergence()
class TestResnet(TestParallelExecutorBase):
......@@ -366,21 +348,17 @@ class TestResnet(TestParallelExecutorBase):
# fluid.recordio_writer.convert_reader_to_recordio_file(
# "./flowers.recordio", reader, feeder, compressor=fluid.core.RecordIOWriter.Compressor.NoCompress)
def check_resnet_convergence(self, use_nccl_allreduce):
def check_resnet_convergence(self):
import functools
batch_size = 2
self.check_network_convergence(
functools.partial(
SE_ResNeXt50Small, batch_size=batch_size),
iter=20,
batch_size=batch_size,
use_nccl_allreduce=use_nccl_allreduce)
batch_size=batch_size)
def test_resnet_with_nccl_allreduce(self):
self.check_resnet_convergence(True)
def test_resnet_with_reduce_op(self):
self.check_resnet_convergence(False)
def test_resnet(self):
self.check_resnet_convergence()
class ModelHyperParams(object):
......@@ -544,7 +522,7 @@ class TestTransformer(TestParallelExecutorBase):
class ParallelExecutorTestingDuringTraining(unittest.TestCase):
def check_network_convergence(self, use_nccl_allreduce):
def check_network_convergence(self):
main = fluid.Program()
startup = fluid.Program()
with fluid.program_guard(main, startup):
......@@ -565,16 +543,12 @@ class ParallelExecutorTestingDuringTraining(unittest.TestCase):
feed_dict = {'image': image, 'label': label}
train_exe = fluid.ParallelExecutor(
use_cuda=True,
loss_name=loss.name,
main_program=main,
use_nccl_allreduce=use_nccl_allreduce)
use_cuda=True, loss_name=loss.name, main_program=main)
test_exe = fluid.ParallelExecutor(
use_cuda=True,
main_program=test_program,
share_vars_from=train_exe,
use_nccl_allreduce=use_nccl_allreduce)
share_vars_from=train_exe)
for i in xrange(5):
test_loss, = test_exe.run([loss.name], feed=feed_dict)
......@@ -588,11 +562,8 @@ class ParallelExecutorTestingDuringTraining(unittest.TestCase):
"Train loss: " + str(train_loss) + "\n Test loss:" +
str(test_loss))
def test_parallel_testing_with_nccl_allreduce(self):
self.check_network_convergence(use_nccl_allreduce=True)
def test_parallel_testing_with_reduce_op(self):
self.check_network_convergence(use_nccl_allreduce=False)
def test_parallel(self):
self.check_network_convergence()
import paddle.dataset.conll05 as conll05
......@@ -612,7 +583,7 @@ embedding_name = 'emb'
def db_lstm(word, predicate, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, mark,
is_sparse, use_nccl_allreduce, **ignored):
is_sparse, **ignored):
# 8 features
predicate_embedding = fluid.layers.embedding(
input=predicate,
......@@ -681,7 +652,7 @@ def db_lstm(word, predicate, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, mark,
class TestCRFModel(unittest.TestCase):
def check_network_convergence(self, is_sparse, use_nccl_allreduce):
def check_network_convergence(self, is_sparse):
main = fluid.Program()
startup = fluid.Program()
with fluid.program_guard(main, startup):
......@@ -729,10 +700,7 @@ class TestCRFModel(unittest.TestCase):
exe = fluid.Executor(place)
exe.run(startup)
pe = fluid.ParallelExecutor(
use_cuda=True,
loss_name=avg_cost.name,
use_nccl_allreduce=use_nccl_allreduce)
pe = fluid.ParallelExecutor(use_cuda=True, loss_name=avg_cost.name)
feeder = fluid.DataFeeder(
feed_list=[
......@@ -749,11 +717,7 @@ class TestCRFModel(unittest.TestCase):
fetch_list=[avg_cost.name]))[0]
def test_update_sparse_parameter(self):
self.check_network_convergence(is_sparse=True, use_nccl_allreduce=False)
def test_update_dense_parameter_with_nccl_allreduce(self):
self.check_network_convergence(is_sparse=False, use_nccl_allreduce=True)
self.check_network_convergence(is_sparse=True)
def test_update_dense_parameter_with_reduce_op(self):
self.check_network_convergence(
is_sparse=False, use_nccl_allreduce=False)
def test_update_dense_parameter(self):
self.check_network_convergence(is_sparse=False)