From 1b20096a529bb6ce80d066fc0805c9dd8a8b9364 Mon Sep 17 00:00:00 2001
From: typhoonzero
Date: Thu, 14 Dec 2017 20:25:20 +0800
Subject: [PATCH] done

---
 paddle/operators/recv_op.cc                    |  28 ++-
 paddle/operators/send_op.cc                    |   2 -
 python/paddle/v2/fluid/__init__.py             |   3 +-
 python/paddle/v2/fluid/distribute_planner.py   |  49 -----
 .../paddle/v2/fluid/distribute_transpiler.py   | 206 ++++++++++++++++++
 python/paddle/v2/fluid/executor.py             | 105 ---------
 ...y => notest_recognize_digits_conv_dist.py}  |  13 +-
 7 files changed, 238 insertions(+), 168 deletions(-)
 delete mode 100644 python/paddle/v2/fluid/distribute_planner.py
 create mode 100644 python/paddle/v2/fluid/distribute_transpiler.py
 rename python/paddle/v2/fluid/tests/book/{test_recognize_digits_conv_dist.py => notest_recognize_digits_conv_dist.py} (82%)

diff --git a/paddle/operators/recv_op.cc b/paddle/operators/recv_op.cc
index a0c25a25eb1..2ff6f42c94c 100644
--- a/paddle/operators/recv_op.cc
+++ b/paddle/operators/recv_op.cc
@@ -62,17 +62,29 @@ class RecvOp : public framework::OperatorBase {
     server_thread_->join();
   }

+  std::string GetGradVarNameForTrainer(const std::string &varname) const {
+    if (grads_counter_.find(varname) == grads_counter_.end()) {
+      grads_counter_[varname] = 0;
+    }
+    char ret[256];
+    snprintf(ret, sizeof(ret), "%s.trainer_%d", varname.c_str(),
+             grads_counter_[varname]++);
+    return std::string(ret);
+  }
+
   void Run(const framework::Scope &scope,
            const platform::DeviceContext &dev_ctx) const override {
     // FIXME(typhoonzero): no new scopes for every run.
     framework::Scope &recv_scope = scope.NewScope();
     auto param_list = Attr<std::vector<std::string>>("ParamList");
     auto grad_list = Attr<std::vector<std::string>>("GradList");
+    auto trainer_count = Attr<int>("Trainers");
     size_t param_count = param_list.size();
     // TODO(typhoonzero): change this to a while_op for every cluster-batch.
     while (true) {
-      // TODO(typhoonzero): get from multiple trainers.
-      for (size_t i = 0; i < param_count; ++i) {
+      // Get gradients from multiple trainers. We don't care about the order
+      // in which they arrive; just add suffixes 0~n, then average them.
+      for (size_t i = 0; i < param_count * trainer_count; ++i) {
         // blocking get one var from client.
         const detail::TensorWithName &v = rpc_service_->Get();
         auto grad_var_name = v.first;
@@ -83,6 +95,14 @@ class RecvOp : public framework::OperatorBase {
        }
        VLOG(10) << "recved grad: " << grad_var_name
                 << " updating param: " << param_var_name;
+       if (trainer_count > 1) {
+         auto *var = recv_scope.FindVar(grad_var_name);
+         if (var != nullptr) {
+           // must rename the var to different names so gradients from
+           // multiple trainers can be merged.
+           grad_var_name = this->GetGradVarNameForTrainer(grad_var_name);
+         }
+       }
+
        auto *var = recv_scope.Var(grad_var_name);
        auto *tensor = var->GetMutable<framework::LoDTensor>();
        // FIXME(typhoonzero): do not copy
@@ -119,6 +139,7 @@ class RecvOp : public framework::OperatorBase {
  // grpc send/recv service implement to register.
  std::shared_ptr rpc_service_;
  std::shared_ptr<std::thread> server_thread_;
+ mutable std::unordered_map<std::string, int> grads_counter_;
 };

 class RecvOpMaker : public framework::OpProtoAndCheckerMaker {
@@ -144,6 +165,9 @@ This operator will recv tensor from send_op
    AddAttr<std::vector<std::string>>(
        "GradList", "type list of string",
        "grad->param name mapping to find which param to optimize.");
+   AddAttr<int>("Trainers", "type int",
+                "Number of trainers in the current cluster job")
+       .SetDefault(1);
  }
 };
diff --git a/paddle/operators/send_op.cc b/paddle/operators/send_op.cc
index ab1ae5b31dd..3fcd2144f96 100644
--- a/paddle/operators/send_op.cc
+++ b/paddle/operators/send_op.cc
@@ -47,14 +47,12 @@ class SendOp : public framework::OperatorBase {
     // TODO(typhoonzero): currently it's non-blocking,
     // should block until server responds.
     for (auto in : ins) {
-      LOG(ERROR) << "sending grad: " << in;
       bool ret = client_->SendVariable(scope, in);
       if (!ret) {
         LOG(ERROR) << "send variable error";
       }
     }
     for (auto in : ins) {
-      LOG(ERROR) << "updating from server...";
       bool ret = client_->GetVariable(scope);
       if (!ret) {
         LOG(ERROR) << "GetVariable error";
diff --git a/python/paddle/v2/fluid/__init__.py b/python/paddle/v2/fluid/__init__.py
index 59986c9f0ca..a93f9363613 100644
--- a/python/paddle/v2/fluid/__init__.py
+++ b/python/paddle/v2/fluid/__init__.py
@@ -16,12 +16,13 @@ import regularizer
 from param_attr import ParamAttr
 from data_feeder import DataFeeder
 from core import LoDTensor, CPUPlace, GPUPlace
+from distribute_transpiler import DistributeTranspiler

 Tensor = LoDTensor

 __all__ = framework.__all__ + executor.__all__ + [
     'io', 'initializer', 'layers', 'nets', 'optimizer', 'backward',
     'regularizer', 'LoDTensor', 'CPUPlace', 'GPUPlace', 'Tensor', 'ParamAttr'
-    'DataFeeder'
+    'DataFeeder', 'DistributeTranspiler'
 ]
diff --git a/python/paddle/v2/fluid/distribute_planner.py b/python/paddle/v2/fluid/distribute_planner.py
deleted file mode 100644
index c3430b3b68a..00000000000
--- a/python/paddle/v2/fluid/distribute_planner.py
+++ /dev/null
@@ -1,49 +0,0 @@
-import framework
-from backward import append_backward_ops
-from regularizer import append_regularization_ops
-import optimizer
-from layer_helper import LayerHelper
-
-
-def hash_name_to_server(params_grads, pserver_endpoints):
-    """
-    :param param_grads:
-    :return: a map of pserver endpoint ->
-        params -> [param list]
-        grads -> [grad list]
-    """
-
-    def _hash_param(param_name, total):
-        return hash(param_name) % total
-
-    param_grad_map = dict()
-    for param, grad in params_grads:
-        if param.trainable is True and grad is not None:
-            server_id = _hash_param(param.name, len(pserver_endpoints))
-            server_for_param = pserver_endpoints[server_id]
-            if not param_grad_map.has_key(server_for_param):
-                param_grad_map[server_for_param] = {"params": [], "grads": []}
-            param_grad_map[server_for_param]["params"].append(param)
-            param_grad_map[server_for_param]["grads"].append(grad)
-
-    return param_grad_map
-
-
-def round_robin(params_grads, pserver_endpoints):
-    assert (len(params_grads) > len(pserver_endpoints))
-
-    param_grad_map = dict()
-    pserver_idx = 0
-    for param, grad in params_grads:
-        if param.trainable is True:
-            server_for_param = pserver_endpoints[pserver_idx]
-            if not param_grad_map.has_key(server_for_param):
-                param_grad_map[server_for_param] = {"params": [], "grads": []}
-
-            param_grad_map[server_for_param]["params"].append(param)
-            param_grad_map[server_for_param]["grads"].append(grad)
-
-            pserver_idx += 1
-            if pserver_idx >= len(pserver_endpoints):
-                pserver_idx = 0
-    return param_grad_map
diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py
new file mode 100644
index 00000000000..739b47cd281
--- /dev/null
+++ b/python/paddle/v2/fluid/distribute_transpiler.py
@@ -0,0 +1,206 @@
+import framework
+from framework import Program, default_main_program, Parameter, Variable
+import optimizer
+from layer_helper import LayerHelper
+
+
+def hash_name_to_server(params_grads, pserver_endpoints):
+    """
+    :param params_grads:
+    :return: a map of pserver endpoint ->
+        params -> [param list]
+        grads -> [grad list]
+    """
+
+    def _hash_param(param_name, total):
+        return hash(param_name) % total
+
+    param_grad_map = dict()
+    for param, grad in params_grads:
+        if param.trainable is True and grad is not None:
+            server_id = _hash_param(param.name, len(pserver_endpoints))
+            server_for_param = pserver_endpoints[server_id]
+            if not param_grad_map.has_key(server_for_param):
+                param_grad_map[server_for_param] = {"params": [], "grads": []}
+            param_grad_map[server_for_param]["params"].append(param)
+            param_grad_map[server_for_param]["grads"].append(grad)
+
+    return param_grad_map
+
+
+def round_robin(params_grads, pserver_endpoints):
+    assert (len(params_grads) > len(pserver_endpoints))
+
+    param_grad_map = dict()
+    pserver_idx = 0
+    for param, grad in params_grads:
+        if param.trainable is True:
+            server_for_param = pserver_endpoints[pserver_idx]
+            if not param_grad_map.has_key(server_for_param):
+                param_grad_map[server_for_param] = {"params": [], "grads": []}
+
+            param_grad_map[server_for_param]["params"].append(param)
+            param_grad_map[server_for_param]["grads"].append(grad)
+
+            pserver_idx += 1
+            if pserver_idx >= len(pserver_endpoints):
+                pserver_idx = 0
+    return param_grad_map
+
+
+class DistributeTranspiler:
+    def transpile(self,
+                  optimize_ops,
+                  params_grads,
+                  program=None,
+                  pservers="127.0.0.1:6174",
+                  trainers=1,
+                  split_method=round_robin):
+        """
+        Transpile the program into distributed data-parallelism programs.
+
+        The main_program will be transformed to use a remote parameter server
+        to do parameter optimization, and the optimization graph will be put
+        into a parameter server program.
+
+        Use different methods to split trainable variables across different
+        parameter servers.
+
+        :param optimize_ops: op list of optimization, should be the
+            return value of Optimizer.minimize
+        :type optimize_ops: list
+        :param program: program to optimize, default default_main_program
+        :param pservers: parameter server endpoints like "m1:6174,m2:6174"
+        :type pservers: string
+
+        :return: return a list of programs
+        """
+        if program is None:
+            program = default_main_program()
+        self.trainers = trainers
+        self._optimize_distributed(
+            optimize_ops,
+            program,
+            params_grads,
+            pservers=pservers,
+            trainers=trainers,
+            split_method=split_method)
+
+    def _clone_param(self, block, v):
+        assert isinstance(v, Parameter)
+        new_p = Parameter(
+            block=block,
+            shape=v.shape,
+            dtype=v.dtype,
+            type=v.type,
+            lod_level=v.lod_level,
+            stop_gradient=v.stop_gradient,
+            trainable=v.trainable,
+            optimize_attr=v.optimize_attr,
+            regularizer=v.regularizer,
+            name=v.name)
+        block.vars[new_p.name] = new_p
+
+    def _clone_var(self, block, var):
+        assert isinstance(var, Variable)
+        return block.create_var(
+            name=var.name,
+            shape=var.shape,
+            dtype=var.dtype,
+            type=var.type,
+            lod_level=var.lod_level,
+            persistable=var.persistable)
+
+    def _optimize_distributed(self, optimize_ops, program, params_and_grads,
+                              **kwargs):
+        # remove optimize ops and add a send op to main_program
+        # FIXME(typhoonzero): delete_op only removes the first occurrence;
+        # need to handle multiple identical optimize ops.
+        for op in optimize_ops:
+            program.global_block().delete_op(op)
+        if kwargs.has_key("split_method"):
+            split_method = kwargs["split_method"]
+        else:
+            split_method = round_robin
+
+        assert (callable(split_method))
+        pserver_endpoints = kwargs["pservers"].split(",")
+        self.param_grad_map = split_method(params_and_grads, pserver_endpoints)
+
+        for ep in pserver_endpoints:
+            # FIXME(typhoonzero): sends to different servers can run in parallel.
+            send_op = program.global_block().append_op(
+                type="send",
+                inputs={"X": self.param_grad_map[ep]["grads"]
+                        },  # inputs is a list of tensors to be sent
+                outputs={},
+                attrs={"endpoint": ep})
+
+    def _create_var_for_trainers(self, block, var, trainers):
+        var_list = []
+        for i in xrange(trainers):
+            var_each = block.create_var(
+                name="%s.trainer_%d" % (var.name, i),
+                persistable=var.persistable,
+                dtype=var.dtype,
+                shape=var.shape)
+            var_list.append(var_each)
+        return var_list
+
+    def get_pserver_program(self, endpoint, optimize_ops):
+        pserver_program = Program()
+        for v in self.param_grad_map[endpoint]["params"]:
+            self._clone_param(pserver_program.global_block(), v)
+
+        optimize_sub_program = Program()
+        grad_var_names = [
+            var.name for var in self.param_grad_map[endpoint]["grads"]
+        ]
+        for opt_op in optimize_ops:
+            for _, var in opt_op.inputs.iteritems():
+                # NOTE: append operators to merge gradients from multiple
+                # trainers. If trainers == 1, this is not needed.
+                if self.trainers > 1 and var.name in grad_var_names:
+                    vars2merge = self._create_var_for_trainers(
+                        optimize_sub_program.global_block(), var, self.trainers)
+                    merged_var = optimize_sub_program.global_block().create_var(
+                        name=var.name,
+                        persistable=var.persistable,
+                        dtype=var.dtype,
+                        shape=var.shape)
+                    optimize_sub_program.global_block().append_op(
+                        type="sum",
+                        inputs={"X": vars2merge},
+                        outputs={"Out": merged_var})
+                    optimize_sub_program.global_block().append_op(
+                        type="scale",
+                        inputs={"X": merged_var},
+                        outputs={"Out": merged_var},
+                        attrs={"scale": 1.0 / float(self.trainers)})
+                else:
+                    optimize_sub_program.global_block().create_var(
+                        name=var.name,
+                        persistable=var.persistable,
+                        dtype=var.dtype,
+                        shape=var.shape)
+            optimize_sub_program.global_block().append_op(
+                type=opt_op.type,
+                inputs=opt_op.inputs,
+                outputs=opt_op.outputs,
+                attrs=opt_op.attrs)
+        pserver_program.global_block().append_op(
+            type="recv",
+            inputs={"RX":
+                    self.param_grad_map[endpoint]["grads"]},  # grads to recv
+            outputs={},
+            attrs={
+                "OptimizeProgram": optimize_sub_program.desc,
+                "endpoint": endpoint,
+                "ParamList":
+                [p.name for p in self.param_grad_map[endpoint]["params"]],
+                "GradList":
+                [p.name for p in self.param_grad_map[endpoint]["grads"]],
+                "Trainers": self.trainers
+            })
+        pserver_program.sync_with_cpp()
+        return pserver_program
diff --git a/python/paddle/v2/fluid/executor.py b/python/paddle/v2/fluid/executor.py
index 4d245250e89..0d02422afd2 100644
--- a/python/paddle/v2/fluid/executor.py
+++ b/python/paddle/v2/fluid/executor.py
@@ -50,111 +50,6 @@ class Executor(object):
         self.executor = core.Executor(act_places)
         self.places = places

-    def optimize(self, optimize_ops, params_grads, program=None, **kwargs):
-        """
-        optimize the program for different runtime environment
-
-        :param optimize_ops: op list of optimization, should be the
-            return value of Optimizer.minimize
-        :type optimize_ops: list
-        :param program: program to optimize, default default_main_program
-        :param pservers: parameter server endpoints like "m1:6174,m2:6174"
-        :type pservers: string
-
-        :return: return a list of programs
-        """
-        if program is None:
-            program = default_main_program()
-
-        if kwargs.has_key("pservers"):
-            return self._optimize_distributed(optimize_ops, program,
-                                              params_grads, **kwargs)
-
-    def _clone_param(self, block, v):
-        assert isinstance(v, Parameter)
-        new_p = Parameter(
-            block=block,
-            shape=v.shape,
-            dtype=v.dtype,
-            type=v.type,
-            lod_level=v.lod_level,
-            stop_gradient=v.stop_gradient,
-            trainable=v.trainable,
-            optimize_attr=v.optimize_attr,
-            regularizer=v.regularizer,
-            name=v.name)
-        block.vars[new_p.name] = new_p
-
-    def _clone_var(self, block, var):
-        assert isinstance(var, Variable)
-        return block.create_var(
-            name=var.name,
-            shape=var.shape,
-            dtype=var.dtype,
-            type=var.type,
-            lod_level=var.lod_level,
-            persistable=var.persistable)
-
-    def _optimize_distributed(self, optimize_ops, program, params_and_grads,
-                              **kwargs):
-        # remove optimize ops and add a send op to main_program
-        # FIXME(typhoonzero): delete_op only remove the first accurence,
-        # need to consider about multiple same optimize op?
-        for op in optimize_ops:
-            program.global_block().delete_op(op)
-        if kwargs.has_key("split_method"):
-            split_method = kwargs["split_method"]
-        else:
-            split_method = distribute_planner.round_robin
-
-        assert (callable(split_method))
-        pserver_endpoints = kwargs["pservers"].split(",")
-        self.param_grad_map = split_method(params_and_grads, pserver_endpoints)
-
-        for ep in pserver_endpoints:
-            # FIXME(typhoonzero): send to different servers can run in parrallel.
-            send_op = program.global_block().append_op(
-                type="send",
-                inputs={"X": self.param_grad_map[ep]["grads"]
-                        },  # inputs is a list of tensors to be send
-                outputs={},
-                attrs={"endpoint": ep})
-
-    def get_pserver_program(self, endpoint, optimize_ops):
-        pserver_program = Program()
-        for v in self.param_grad_map[endpoint]["params"]:
-            self._clone_param(pserver_program.global_block(), v)
-
-        optimize_sub_program = Program()
-        for opt_op in optimize_ops:
-            for varname, var in opt_op.inputs.iteritems():
-                optimize_sub_program.global_block().create_var(
-                    name=var.name,
-                    persistable=var.persistable,
-                    dtype=var.dtype,
-                    shape=var.shape)
-            optimize_sub_program.global_block().append_op(
-                type=opt_op.type,
-                inputs=opt_op.inputs,
-                outputs=opt_op.outputs,
-                attrs=opt_op.attrs)
-
-        pserver_program.global_block().append_op(
-            type="recv",
-            inputs={"RX":
-                    self.param_grad_map[endpoint]["grads"]},  # grads to recv
-            outputs={},
-            attrs={
-                "OptimizeProgram": optimize_sub_program.desc,
-                "endpoint": endpoint,
-                "ParamList":
-                [p.name for p in self.param_grad_map[endpoint]["params"]],
-                "GradList":
-                [p.name for p in self.param_grad_map[endpoint]["grads"]]
-            })
-        pserver_program.sync_with_cpp()
-        return pserver_program
-
     def aslodtensor(self, data):
         def accumulate(data):
             if not isinstance(data, list):
diff --git a/python/paddle/v2/fluid/tests/book/test_recognize_digits_conv_dist.py b/python/paddle/v2/fluid/tests/book/notest_recognize_digits_conv_dist.py
similarity index 82%
rename from python/paddle/v2/fluid/tests/book/test_recognize_digits_conv_dist.py
rename to python/paddle/v2/fluid/tests/book/notest_recognize_digits_conv_dist.py
index 5178131ea77..c7f4f2212f3 100644
--- a/python/paddle/v2/fluid/tests/book/test_recognize_digits_conv_dist.py
+++ b/python/paddle/v2/fluid/tests/book/notest_recognize_digits_conv_dist.py
@@ -38,17 +38,14 @@ train_reader = paddle.batch(
 place = fluid.CPUPlace()
 exe = fluid.Executor(place)
-
-exe.optimize(optimize_ops, params_grads, pservers="127.0.0.1:6174", trainers=1)
+t = fluid.DistributeTranspiler()
+t.transpile(optimize_ops, params_grads, pservers="127.0.0.1:6174", trainers=1)

 pserver_endpoint = os.getenv("PSERVER")
 if pserver_endpoint:
-    pserver_prog = exe.get_pserver_program(pserver_endpoint, optimize_ops)
-    print("pserver startup: ", fluid.default_startup_program())
+    pserver_prog = t.get_pserver_program(pserver_endpoint, optimize_ops)
     exe.run(fluid.default_startup_program())
-    while True:
-        exe.run(pserver_prog)
-        print("Run pserver once end...")
+    exe.run(pserver_prog)
 else:
     feeder = fluid.DataFeeder(feed_list=[images, label], place=place)
     exe.run(fluid.default_startup_program())
@@ -60,8 +57,6 @@ else:
             feed=feeder.feed(data),
             fetch_list=[avg_cost] + accuracy.metrics)
         pass_acc = accuracy.eval(exe)
-        print("pass_id=" + str(pass_id) + " acc=" + str(acc) + " pass_acc="
-              + str(pass_acc))
         # print loss, acc
         if loss < 10.0 and pass_acc > 0.9:
             # if avg cost less than 10.0 and accuracy is larger than 0.9, we think our code is good.
-- 
GitLab
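
Usage note: the new DistributeTranspiler added by this patch is driven the same
way as the renamed notest_recognize_digits_conv_dist.py above. Below is a minimal
sketch, assuming `optimize_ops` and `params_grads` come from Optimizer.minimize()
on an already-built fluid program; the endpoint value is a placeholder and the
data-feeding/training loop on the trainer side is omitted.

    import os
    import paddle.v2.fluid as fluid

    # `optimize_ops` and `params_grads` are assumed to be the return value of
    # fluid.optimizer.*.minimize() on an existing fluid program (placeholders here).
    t = fluid.DistributeTranspiler()
    t.transpile(optimize_ops, params_grads,
                pservers="127.0.0.1:6174",  # comma-separated pserver endpoints
                trainers=1)                 # matches the "Trainers" attr of recv_op

    exe = fluid.Executor(fluid.CPUPlace())
    if os.getenv("PSERVER"):
        # pserver role: run the program that holds the (possibly merged) optimize ops.
        pserver_prog = t.get_pserver_program(os.getenv("PSERVER"), optimize_ops)
        exe.run(fluid.default_startup_program())
        exe.run(pserver_prog)
    else:
        # trainer role: the transpiled main program now sends gradients and
        # fetches updated parameters through send_op.
        # (data feeding and the per-batch training loop are omitted here)
        exe.run(fluid.default_startup_program())
        exe.run(fluid.default_main_program())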