diff --git a/paddle/fluid/framework/block_desc.cc b/paddle/fluid/framework/block_desc.cc
index 0dd37e7df067454ef828e0569ddf0cc99e7b73d6..fbedd6c825ba65386e3e31c3482375a7e6361278 100644
--- a/paddle/fluid/framework/block_desc.cc
+++ b/paddle/fluid/framework/block_desc.cc
@@ -42,6 +42,25 @@ bool BlockDesc::HasVar(const std::string &name) const {
   return vars_.find(name) != vars_.end();
 }
 
+VarDesc *BlockDesc::RenameVar(const std::string &old_name,
+                              const std::string &new_name) {
+  if (!this->HasVar(old_name)) {
+    return nullptr;
+  }
+  need_update_ = true;
+  auto *var = this->Var(old_name);
+  VarDesc *new_var = new VarDesc(*(var->Proto()));
+  new_var->SetName(new_name);
+  vars_[new_name].reset(new_var);
+  // rename inputs and outputs
+  for (const auto &op : ops_) {
+    auto *it = op.get();
+    it->Rename(old_name, new_name);
+  }
+  vars_.erase(old_name);
+  return new_var;
+}
+
 VarDesc *BlockDesc::FindVarRecursive(const std::string &name) const {
   if (name == kEmptyVarName) return nullptr;
 
diff --git a/paddle/fluid/framework/block_desc.h b/paddle/fluid/framework/block_desc.h
index 4e2b03e245f34eb26ad71d865a3fe99ba5f85dc9..b2375b53e3ac6bd8d82897f9a8a640178e6b7a39 100644
--- a/paddle/fluid/framework/block_desc.h
+++ b/paddle/fluid/framework/block_desc.h
@@ -55,6 +55,8 @@ class BlockDesc {
 
   bool HasVar(const std::string &var_name) const;
 
+  VarDesc *RenameVar(const std::string &old_name, const std::string &new_name);
+
   VarDesc *FindVarRecursive(const std::string &name_bytes) const;
 
   VarDesc &FindRecursiveOrCreateVar(const std::string &name_bytes);
 
diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc
index 6c0292ecb247a582370cf11da271847e0bd8bd3f..ee0e3533ce028992af3d4558e3fd198a09c4816b 100644
--- a/paddle/fluid/operators/listen_and_serv_op.cc
+++ b/paddle/fluid/operators/listen_and_serv_op.cc
@@ -75,13 +75,6 @@ class ListenAndServOp : public framework::OperatorBase {
     server_thread_->join();
   }
 
-  std::string GetGradVarNameForTrainer(const std::string &varname) const {
-    if (grads_counter_.find(varname) == grads_counter_.end()) {
-      grads_counter_[varname] = 0;
-    }
-    return string::Sprintf("%s.trainer_%d", varname, grads_counter_[varname]++);
-  }
-
   void RunImpl(const framework::Scope &scope,
                const platform::Place &dev_place) const override {
     platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
@@ -91,8 +84,7 @@ class ListenAndServOp : public framework::OperatorBase {
     // FIXME(Yancey1989): initialize rpc server with lazy mode.
     rpc_service_->SetScope(&recv_scope);
     rpc_service_->SetDevCtx(&dev_ctx);
-    auto param_list = Attr<std::vector<std::string>>("ParamList");
-    auto grad_list = Attr<std::vector<std::string>>("GradList");
+    auto ins = Inputs("X");
    auto fan_in = Attr<int>("Fanin");
    auto *block = Attr<framework::BlockDesc *>(kOptimizeBlock);
@@ -109,40 +101,24 @@ class ListenAndServOp : public framework::OperatorBase {
       // the gradients arrives, just add suffix 0~n and merge the gradient.
       rpc_service_->SetCond(0);
       size_t recv_var_cnt = 0;
-      size_t update_param_cnt = 0;
       int batch_barrier = 0;
       while (batch_barrier != fan_in) {
         const detail::MessageWithName &v = rpc_service_->Get();
-        auto grad_var_name = v.first;
-        if (grad_var_name == LISTEN_TERMINATE_MESSAGE) {
+        auto recv_var_name = v.first;
+        if (recv_var_name == LISTEN_TERMINATE_MESSAGE) {
           LOG(INFO) << "received terminate message and exit";
           exit_flag = true;
           break;
-        } else if (grad_var_name == BATCH_BARRIER_MESSAGE) {
+        } else if (recv_var_name == BATCH_BARRIER_MESSAGE) {
           VLOG(3) << "recv batch barrier message";
           batch_barrier++;
           continue;
         } else {
-          // receive a variable
+          VLOG(3) << "received grad: " << recv_var_name;
           recv_var_cnt++;
-          auto it =
-              std::find(grad_list.begin(), grad_list.end(), grad_var_name);
-          std::string param_var_name;
-          if (it != grad_list.end()) {
-            param_var_name = param_list[it - grad_list.begin()];
-            update_param_cnt++;
-            VLOG(3) << "received grad: " << grad_var_name
-                    << " updating param: " << param_var_name;
-          } else {
-            VLOG(3) << "received variable: " << grad_var_name
-                    << " no need to update param";
-          }
-          if (fan_in > 1 && !param_var_name.empty()) {
-            grad_var_name = this->GetGradVarNameForTrainer(grad_var_name);
-          }
-          auto *var = recv_scope.FindVar(grad_var_name);
+          auto *var = recv_scope.FindVar(recv_var_name);
           if (var == nullptr) {
-            LOG(ERROR) << "Can not find server side var: " << grad_var_name;
+            LOG(ERROR) << "Can not find server side var: " << recv_var_name;
             PADDLE_THROW("Can not find server side var");
           }
           detail::DeserializeFromMessage(v.second, dev_ctx, var);
@@ -151,29 +127,26 @@ class ListenAndServOp : public framework::OperatorBase {
           }
         }
       }
-      VLOG(3) << "recv " << recv_var_cnt << " parmeters for one barrier.";
       if (exit_flag) {
         rpc_service_->ShutDown();
       }
-      VLOG(3) << "run optimize graph...";
       try {
         executor.Run(*program, &recv_scope, block->ID(), /*global_block*/
                      false /*create_local_scope*/, false /*create_vars*/);
       } catch (std::exception &e) {
         LOG(ERROR) << "run sub program error " << e.what();
       }
-
       // Reset the received sparse variables, the sum operator would not
       // sum the input sparse variables which rows is empty at the next
       // mini-batch.
-      // TOOD(Yancey1989): move the reset action into an operator, we couldn't
+      // TODO(Yancey1989): move the reset action into an operator, we couldn't
       // have any hide logic in the operator.
       for (auto &var : sparse_vars) {
         var->GetMutable<framework::SelectedRows>()->mutable_rows()->clear();
       }
       rpc_service_->SetCond(1);
-      rpc_service_->WaitClientGet(update_param_cnt);
-      grads_counter_.clear();
+      // FIXME(typhoonzero): use another condition to sync wait clients get.
+      rpc_service_->WaitClientGet(ins.size());
       sparse_vars.clear();
     }  // while(true)
   }
 
@@ -181,13 +154,13 @@ class ListenAndServOp : public framework::OperatorBase {
 protected:
  std::shared_ptr<detail::AsyncGRPCServer> rpc_service_;
  std::shared_ptr<std::thread> server_thread_;
-  mutable std::unordered_map<std::string, int> grads_counter_;
 };
 
 class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker {
 public:
  ListenAndServOpMaker(OpProto *proto, OpAttrChecker *op_checker)
      : OpProtoAndCheckerMaker(proto, op_checker) {
+    AddInput("X", "(Tensor) Variables that server recv.").AsDuplicable();
    AddComment(R"DOC(
 ListenAndServ operator
 
@@ -201,16 +174,7 @@ from send_op and send back variables to recv_op.
         .AddCustomChecker([](const std::string &ip) { return !ip.empty(); });
     AddAttr<framework::BlockDesc *>(kOptimizeBlock,
                                     "BlockID to run on server side.");
-    AddAttr<std::vector<std::string>>(
-        "ParamList", "type list of string",
-        "grad->param name mapping to find which parameters to optimize.")
-        .SetDefault({});
-    AddAttr<std::vector<std::string>>(
-        "GradList", "type list of string",
-        "grad->param name mapping to find which parameters to optimize.")
-        .SetDefault({});
-    AddAttr<int>("Fanin", "type int",
-                 "Number of trainers in the current cluster job")
+    AddAttr<int>("Fanin", "How many clients send to this server.")
         .SetDefault(1);
   }
 };
diff --git a/paddle/fluid/pybind/protobuf.cc b/paddle/fluid/pybind/protobuf.cc
index 131971099ef3febc3cfaff30e918fa74cfc6cfe4..4e04151c6ad26f37bde6bd0058505c767ef2d7f1 100644
--- a/paddle/fluid/pybind/protobuf.cc
+++ b/paddle/fluid/pybind/protobuf.cc
@@ -170,6 +170,14 @@ void BindBlockDesc(py::module &m) {
            [](BlockDesc &self, py::bytes byte_name) {
              std::string name = byte_name;
              return self.HasVar(name);
+           },
+           py::return_value_policy::reference)
+      .def("rename_var",
+           [](BlockDesc &self, const py::bytes &byte_name,
+              const py::bytes &byte_name_new) {
+             std::string name = byte_name;
+             std::string new_name = byte_name_new;
+             self.RenameVar(name, new_name);
            })
       .def("has_var_recursive",
            [](BlockDesc &self, py::bytes byte_name) {
@@ -198,7 +206,7 @@ void BindVarDsec(py::module &m) {
   py::class_<VarDesc> var_desc(m, "VarDesc", "");
   var_desc
       .def("name",
-           [](const VarDesc &self) {
+           [](VarDesc &self) {
             py::bytes name = self.Name();
             return name;
           },
diff --git a/python/paddle/v2/dataset/common.py b/python/paddle/v2/dataset/common.py
index 9aba35a6481e3ad3ab37c8d4de0f998c9f0a1f07..c6ff09a1d1e3ca56877e986c3ed3ae9ecd0a7316 100644
--- a/python/paddle/v2/dataset/common.py
+++ b/python/paddle/v2/dataset/common.py
@@ -74,6 +74,8 @@ def download(url, module_name, md5sum, save_name=None):
     retry = 0
     retry_limit = 3
     while not (os.path.exists(filename) and md5file(filename) == md5sum):
+        if os.path.exists(filename):
+            print "file md5", md5file(filename), md5sum
         if retry < retry_limit:
             retry += 1
         else:
diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py
index 03a7478ae8f4c8b73fc35537bf4599349b3116f5..2fcf3753c5f1211d3b27f38fbdc8d097c437c79a 100644
--- a/python/paddle/v2/fluid/distribute_transpiler.py
+++ b/python/paddle/v2/fluid/distribute_transpiler.py
@@ -14,7 +14,7 @@ from __future__ import print_function
 
 import framework
-from framework import Program, default_main_program, Parameter, Variable
+from framework import Program, default_main_program, default_startup_program, Parameter, Variable
 import optimizer
 from layer_helper import LayerHelper
 from distributed_spliter import *
@@ -133,6 +133,7 @@ class DistributeTranspiler:
     def transpile(self,
                   optimize_ops,
                   params_grads,
+                  trainer_id,
                   program=None,
                   pservers="127.0.0.1:6174",
                   trainers=1,
@@ -146,13 +147,37 @@ class DistributeTranspiler:
         Use different methods to split trainable variables to different
         parameter servers.
 
+        Steps to transpile trainer:
+        1. split variable to multiple blocks, aligned by product(dim[1:]) (width).
+        2. rename splited grad variables to add trainer_id suffix ".trainer_%d".
+        3. modify trainer program add split_op to each grad variable.
+        4. append send_op to send splited variables to server and fetch
+           params(splited blocks or origin param) from server.
+        5. append concat_op to merge splited blocks to update local weights.
+
+        Steps to transpile pserver:
+        1. create new program for parameter server.
+        2. create params and grad variables that are assigned to current server instance.
+        3. create a sub-block in the server side program
+        4. append ops that should run on current server instance.
+        5. add listen_and_serv op
+
         :param optimize_ops: op list of optimization, should be the
-            return value of Optimizer.minimize
+                             return value of Optimizer.minimize
         :type optimize_ops: list
-        :param program: program to optimize, default is default_main_program
+        :param params_grads: list of tuple(weight, gradient)
+        :type params_grads: list
+        :param trainer_id: one unique id for each trainer in a job.
+        :type trainer_id: int
+        :param program: program to transpile, default is default_main_program
+        :type program: Program
         :param pservers: parameter server endpoints like "m1:6174,m2:6174"
         :type pservers: string
-        :return: return a list of programs
+        :param trainers: total number of workers/trainers in the job
+        :type trainers: int
+        :param split_method: A function to determine how to split variables
+                             to different servers equally.
+        :type split_method: function
         """
         assert (callable(split_method))
         if program is None:
@@ -160,25 +185,19 @@ class DistributeTranspiler:
         self.program = program
         self.trainers = trainers
         self.optimize_ops = optimize_ops
-        # steps to transpile:
-        # 1. split variable to multiple blocks, aligned by product(dim[1:]) (width).
-        # 2. modify trainer program add split_op to each Grad.
-        # 3. append send_op to trainer.
-        # 4. append concat_op to trainer to update local weights.
-        # 5. create new program for parameter server.
-        # 6. create parameter server program by split_method generated endpoint->VarBlock
-
+        # TODO(typhoonzero): currently trainer_id is fetched from cluster system
+        # like Kubernetes, we should port this to use etcd later when developing
+        # fluid distributed training with fault-tolerance.
+        self.trainer_id = trainer_id
         pserver_endpoints = pservers.split(",")
         # step1
         param_list = [pg[0] for pg in params_grads]
         grad_list = [pg[1] for pg in params_grads]
-        # TODO: add split selected rows support
         grad_blocks = split_dense_variable(grad_list, len(pserver_endpoints))
         param_blocks = split_dense_variable(param_list, len(pserver_endpoints))
         # step2
         grad_var_mapping = self._append_split_op(program, grad_blocks)
-        # step3
         send_inputs = []
         send_outputs = []
@@ -211,7 +230,7 @@ class DistributeTranspiler:
                 shape=[0])
         # create send_op
-        send_op = program.global_block().append_op(
+        program.global_block().append_op(
             type="send",
             inputs={"X": send_inputs},
             outputs={"Out": send_outputs,
@@ -223,14 +242,158 @@ class DistributeTranspiler:
             if len(splited_var) <= 1:
                 continue
             orig_param = program.global_block().vars[varname]
-            concat = program.global_block().append_op(
+            program.global_block().append_op(
                 type="concat",
                 inputs={"X": splited_var},
                 outputs={"Out": [orig_param]},
                 attrs={"axis": 0})
 
-    def _create_vars_from_blocklist(self, program, block_list):
-        # Create respective variables using the block_list
+    def get_trainer_program(self):
+        # remove optimize ops and add a send op to main_program
+        self.program.global_block().delete_ops(self.optimize_ops)
+        return self.program
+
+    def get_pserver_program(self, endpoint):
+        """
+        Get pserver side program using the endpoint.
+        NOTE: assume blocks of the same variable are not distributed
+        on the same pserver, only change param/grad varnames for
+        trainers to fetch.
+        """
+        # step1
+        pserver_program = Program()
+        # step2
+        recv_inputs = []
+        for v in self.param_grad_ep_mapping[endpoint]["params"]:
+            self._clone_var(pserver_program.global_block(), v)
+        for v in self.param_grad_ep_mapping[endpoint]["grads"]:
+            # create vars for each trainer in global scope, so
+            # we don't need to create them when grad arrives.
+            # change client side var name to origin name by
+            # removing ".trainer_%d" suffix
+            suff_idx = v.name.find(".trainer_")
+            if suff_idx >= 0:
+                orig_var_name = v.name[:suff_idx]
+            pserver_program.global_block().create_var(
+                name=orig_var_name,
+                persistable=True,
+                dtype=v.dtype,
+                shape=v.shape)
+            print("create origin var: ", orig_var_name)
+            for trainer_id in xrange(self.trainers):
+                var = pserver_program.global_block().create_var(
+                    name="%s.trainer_%d" % (orig_var_name, trainer_id),
+                    persistable=False,
+                    dtype=v.dtype,
+                    shape=v.shape)
+                recv_inputs.append(var)
+                print("create per trainer var: ", var.name)
+        # step3
+        optimize_block = pserver_program.create_block(0)
+        # step 4
+        # Create a union-find data struct from optimize ops,
+        # If two ops are connected, we could add these two ops
+        # into one set.
+        ufind = self._create_ufind(self.optimize_ops)
+        # step 4.2
+        # Iterate through the ops and append optimize op which
+        # located on current pserver
+        opt_op_on_pserver = []
+        for _, op in enumerate(self.optimize_ops):
+            if self._is_opt_op(op) and self._is_opt_op_on_pserver(endpoint, op):
+                opt_op_on_pserver.append(op)
+        # step 4.3
+        # Iterate through the ops, and if an op and the optimize ops
+        # which located on current pserver are in one set, then
+        # append it into the sub program.
+        for _, op in enumerate(self.optimize_ops):
+            for _, opt_op in enumerate(opt_op_on_pserver):
+                if ufind.is_connected(op, opt_op):
+                    if self._is_opt_op(op):
+                        self._append_pserver_ops(optimize_block, op, endpoint)
+                    else:
+                        self._append_pserver_non_opt_ops(optimize_block, op)
+                    break
+        # step5 append the listen_and_serv op
+        pserver_program.global_block().append_op(
+            type="listen_and_serv",
+            inputs={'X': recv_inputs},
+            outputs={},
+            attrs={
+                "OptimizeBlock": optimize_block,
+                "endpoint": endpoint,
+                "Fanin": self.trainers
+            })
+        pserver_program.sync_with_cpp()
+        return pserver_program
+
+    def get_startup_program(self, endpoint, pserver_program):
+        """
+        Get startup program for current parameter server.
+        Modify operator input variables if there are variables that
+        were split to several blocks.
+        """
+        s_prog = Program()
+        orig_s_prog = framework.default_startup_program()
+        params = self.param_grad_ep_mapping[endpoint]["params"]
+
+        def _get_splited_name_and_shape(varname):
+            for idx, splited_param in enumerate(params):
+                pname = splited_param.name
+                if same_or_split_var(pname, varname) and varname != pname:
+                    return pname, splited_param.shape
+            return "", []
+
+        # 1. create vars in pserver program to startup program
+        pserver_vars = pserver_program.global_block().vars
+        created_var_map = dict()
+        for _, var in pserver_vars.iteritems():
+            tmpvar = s_prog.global_block().create_var(
+                name=var.name,
+                persistable=var.persistable,
+                dtype=var.dtype,
+                shape=var.shape)
+            created_var_map[var.name] = tmpvar
+
+        # 2. rename op outputs
+        for op in orig_s_prog.global_block().ops:
+            new_inputs = dict()
+            new_outputs = dict()
+            # do not append startup op if var is not on this pserver
+            op_on_pserver = False
+            for key in op.output_names:
+                newname, _ = _get_splited_name_and_shape(op.output(key)[0])
+                if newname:
+                    op_on_pserver = True
+                    new_outputs[key] = created_var_map[newname]
+                elif op.output(key)[0] in pserver_vars:
+                    op_on_pserver = True
+                    new_outputs[key] = pserver_vars[op.output(key)[0]]
+
+            # most startup program ops have no inputs
+            new_inputs = self._get_input_map_from_op(pserver_vars, op)
+
+            if op_on_pserver:
+                if op.type in [
+                        "gaussian_random", "fill_constant", "uniform_random"
+                ]:
+                    op.attrs["shape"] = new_outputs["Out"].shape
+                s_prog.global_block().append_op(
+                    type=op.type,
+                    inputs=new_inputs,
+                    outputs=new_outputs,
+                    attrs=op.attrs)
+        return s_prog
+
+    # ====================== private transpiler functions =====================
+    def _create_vars_from_blocklist(self,
+                                    program,
+                                    block_list,
+                                    add_trainer_suffix=False):
+        """
+        NOTE: only grads need to be named for different trainers, use
+        add_trainer_suffix to rename the grad vars.
+        """
         block_map = dict()
         var_mapping = dict()
         for block_str in block_list:
@@ -239,11 +402,20 @@ class DistributeTranspiler:
                 block_map[varname] = []
             block_map[varname].append((long(offset), long(size)))
         for varname, splited in block_map.iteritems():
-            orig_var = program.global_block().vars[varname]
-            var_mapping[varname] = []
+            orig_var = program.global_block().var(varname)
             if len(splited) == 1:
-                var_mapping[varname] = [orig_var]
+                if add_trainer_suffix:
+                    new_var_name = "%s.trainer_%d" % \
+                        (orig_var.name, self.trainer_id)
+                    program.global_block().rename_var(varname, new_var_name)
+                    var_mapping[varname] = \
+                        [program.global_block().var(new_var_name)]
+                else:
+                    var_mapping[varname] = \
+                        [program.global_block().var(orig_var.name)]
                 continue
+
+            var_mapping[varname] = []
             orig_shape = orig_var.shape
             orig_dim1_flatten = 1
             if len(orig_shape) >= 2:
@@ -255,13 +427,21 @@ class DistributeTranspiler:
             splited_shape = [rows]
             if len(orig_shape) >= 2:
                 splited_shape.extend(orig_shape[1:])
+            new_var_name = ""
+            if add_trainer_suffix:
+                new_var_name = "%s.block%d.trainer_%d" % \
+                    (varname, i, self.trainer_id)
+            else:
+                new_var_name = "%s.block%d" % \
+                    (varname, i)
             var = program.global_block().create_var(
-                name="%s.block%d" % (varname, i),
+                name=new_var_name,
                 persistable=False,
                 dtype=orig_var.dtype,
                 type=orig_var.type,
                 shape=splited_shape)  # flattend splited var
             var_mapping[varname].append(var)
+        program.global_block().sync_with_cpp()
         return var_mapping
 
     def _clone_var(self, block, var):
@@ -272,13 +452,12 @@ class DistributeTranspiler:
             dtype=var.dtype,
             type=var.type,
             lod_level=var.lod_level,
-            # HACK: let all param in pserver be persistable so the child
-            # program in recv can get them
             persistable=True)
 
     def _append_split_op(self, program, gradblocks):
         # Split variables that need to be split and append respective ops
-        var_mapping = self._create_vars_from_blocklist(program, gradblocks)
+        var_mapping = self._create_vars_from_blocklist(
+            program, gradblocks, add_trainer_suffix=True)
         for varname, splited_vars in var_mapping.iteritems():
             # variable that don't need to split have empty splited_vars
             if len(splited_vars) <= 1:
                 continue
@@ -308,24 +487,6 @@ class DistributeTranspiler:
                       "[LOD_TENSOR, SELECTED_ROWS]")
         return var_mapping
 
-    def get_trainer_program(self):
-        # remove optimize ops and add a send op to main_program
-        self.program.global_block().delete_ops(self.optimize_ops)
-        return self.program
-
-    def _create_var_for_trainers(self, block, var, trainers):
-        # For each trainer, create the necessary variables
-        var_list = []
-        for i in xrange(trainers):
-            var_each = block.create_var(
-                name="%s.trainer_%d" % (var.name, i),
-                persistable=var.persistable,
-                dtype=var.dtype,
-                type=var.type,
-                shape=var.shape)
-            var_list.append(var_each)
-        return var_list
-
     def _get_optimizer_input_shape(self, op_type, varkey, orig_shape,
                                    param_shape):
         """
@@ -353,6 +514,13 @@ class DistributeTranspiler:
             pass
         return orig_shape
 
+    def _orig_varname(self, varname):
+        suff_idx = varname.find(".trainer_")
+        orig_var_name = ""
+        if suff_idx >= 0:
+            orig_var_name = varname[:suff_idx]
+        return orig_var_name
+
     def _append_pserver_ops(self, optimize_block, opt_op, endpoint):
         program = optimize_block.program
         pserver_block = program.global_block()
@@ -363,18 +531,23 @@ class DistributeTranspiler:
             if key == "Grad":
                 grad_block = None
                 for g in self.param_grad_ep_mapping[endpoint]["grads"]:
-                    if same_or_split_var(g.name, opt_op.input(key)[0]):
+                    if same_or_split_var(
+                            self._orig_varname(g.name), opt_op.input(key)[0]):
                         grad_block = g
                         break
                 if not grad_block:
                     # do not append this op if current endpoint
                     # is not dealing with this grad block
                     return
-                merged_var = pserver_block.vars[grad_block.name]
-                # append merging ops if trainers > 1
+                merged_var = \
+                    pserver_block.vars[self._orig_varname(grad_block.name)]
                 if self.trainers > 1:
-                    vars2merge = self._create_var_for_trainers(
-                        pserver_block, grad_block, self.trainers)
+                    vars2merge = []
+                    for i in xrange(self.trainers):
+                        per_trainer_name = "%s.trainer_%d" % \
+                            (self._orig_varname(grad_block.name), i)
+                        vars2merge.append(pserver_block.vars[per_trainer_name])
+
                     optimize_block.append_op(
                         type="sum",
                         inputs={"X": vars2merge},
@@ -517,77 +690,6 @@ class DistributeTranspiler:
                 return False
         return False
 
-    def get_pserver_program(self, endpoint):
-        """
-        Get pserver side program using the endpoint
-
-        NOTE: assume blocks of the same variable is not distributed
-        on the same pserver, only change param/grad varnames for
-        trainers to fetch. For each pserver endpoint, server side
-        program must be a sub-set of the original optimization program.
-        """
-        # step5
-        pserver_program = Program()
-        for v in self.param_grad_ep_mapping[endpoint]["params"]:
-            self._clone_var(pserver_program.global_block(), v)
-        for v in self.param_grad_ep_mapping[endpoint]["grads"]:
-            # create vars for each trainer in global scope, so
-            # we don't need to create them when grad arrives.
-            pserver_program.global_block().create_var(
-                name=v.name, persistable=True, dtype=v.dtype, shape=v.shape)
-            for trainer_id in xrange(self.trainers):
-                pserver_program.global_block().create_var(
-                    name="%s.trainer_%d" % (v.name, trainer_id),
-                    persistable=True,
-                    dtype=v.dtype,
-                    shape=v.shape)
-        # step6
-        optimize_block = pserver_program.create_block(0)
-        # step 6.1
-        # Create a union-find data struct by optimize ops,
-        # If two ops are connected, we could add these two ops
-        # into one set.
-        ufind = self._create_ufind(self.optimize_ops)
-        # step 6.2
-        # Iterate through the ops and append optimize op which
-        # located on current pserver
-        opt_op_on_pserver = []
-        for _, op in enumerate(self.optimize_ops):
-            if self._is_opt_op(op) and self._is_opt_op_on_pserver(endpoint, op):
-                opt_op_on_pserver.append(op)
-        # step 6.3
-        # Iterate through the ops, and if an op and the optimize ops
-        # which located on current pserver are in one set, then
-        # append it into the sub program.
-        for _, op in enumerate(self.optimize_ops):
-            for _, opt_op in enumerate(opt_op_on_pserver):
-                if ufind.is_connected(op, opt_op):
-                    if self._is_opt_op(op):
-                        self._append_pserver_ops(optimize_block, op, endpoint)
-                    else:
-                        self._append_pserver_non_opt_ops(optimize_block, op)
-                    break
-        # Append the listen_and_serv op
-        pserver_program.global_block().append_op(
-            type="listen_and_serv",
-            inputs={},
-            outputs={},
-            attrs={
-                "OptimizeBlock": optimize_block,
-                "endpoint": endpoint,
-                "ParamList": [
-                    p.name
-                    for p in self.param_grad_ep_mapping[endpoint]["params"]
-                ],
-                "GradList": [
-                    p.name
-                    for p in self.param_grad_ep_mapping[endpoint]["grads"]
-                ],
-                "Fanin": self.trainers
-            })
-        pserver_program.sync_with_cpp()
-        return pserver_program
-
     def _get_input_map_from_op(self, varmap, op):
         iomap = dict()
         for key in op.input_names:
@@ -611,61 +713,3 @@ class DistributeTranspiler:
             else:
                 iomap[key] = vars
         return iomap
-
-    def get_startup_program(self, endpoint, pserver_program):
-        """
-        Get startup program for current parameter server.
-        Modify operator input variables if there are variables that
-        were split to several blocks.
-        """
-        s_prog = Program()
-        orig_s_prog = framework.default_startup_program()
-        params = self.param_grad_ep_mapping[endpoint]["params"]
-
-        def _get_splited_name_and_shape(varname):
-            for idx, splited_param in enumerate(params):
-                pname = splited_param.name
-                if same_or_split_var(pname, varname) and varname != pname:
-                    return pname, splited_param.shape
-            return "", []
-
-        # 1. create vars in pserver program to startup program
-        pserver_vars = pserver_program.global_block().vars
-        created_var_map = dict()
-        for _, var in pserver_vars.iteritems():
-            tmpvar = s_prog.global_block().create_var(
-                name=var.name,
-                persistable=var.persistable,
-                dtype=var.dtype,
-                shape=var.shape)
-            created_var_map[var.name] = tmpvar
-
-        # 2. rename op outputs
-        for op in orig_s_prog.global_block().ops:
-            new_inputs = dict()
-            new_outputs = dict()
-            # do not append startup op if var is not on this pserver
-            op_on_pserver = False
-            for key in op.output_names:
-                newname, _ = _get_splited_name_and_shape(op.output(key)[0])
-                if newname:
-                    op_on_pserver = True
-                    new_outputs[key] = created_var_map[newname]
-                elif op.output(key)[0] in pserver_vars:
-                    op_on_pserver = True
-                    new_outputs[key] = pserver_vars[op.output(key)[0]]
-
-            # most startup program ops have no inputs
-            new_inputs = self._get_input_map_from_op(pserver_vars, op)
-
-            if op_on_pserver:
-                if op.type in [
-                        "gaussian_random", "fill_constant", "uniform_random"
-                ]:
-                    op.attrs["shape"] = new_outputs["Out"].shape
-                s_prog.global_block().append_op(
-                    type=op.type,
-                    inputs=new_inputs,
-                    outputs=new_outputs,
-                    attrs=op.attrs)
-        return s_prog
diff --git a/python/paddle/v2/fluid/framework.py b/python/paddle/v2/fluid/framework.py
index 0e11709296a4fc7121611c1f9928314810f35783..1cb06c52a43b3585a49d4b8bef031afef07e9b0d 100644
--- a/python/paddle/v2/fluid/framework.py
+++ b/python/paddle/v2/fluid/framework.py
@@ -286,6 +286,10 @@ class Variable(object):
     def name(self):
         return self.desc.name()
 
+    @name.setter
+    def name(self, new_name):
+        self.desc.set_name(new_name)
+
     @property
     def shape(self):
         # convert to tuple, make it as same as numpy API.
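Note for reviewers: the rename support spans three layers that this patch changes together, BlockDesc::RenameVar in C++, the rename_var binding in paddle/fluid/pybind/protobuf.cc, and Block.rename_var in the next hunk below. The following lines are a reference sketch only, not part of the patch; the variable names are made up.

# Reference sketch (not part of the patch): exercising Block.rename_var.
# The gradient variable name "w@GRAD" here is only an illustrative placeholder.
import paddle.v2.fluid as fluid

prog = fluid.Program()
block = prog.global_block()
block.create_var(name="w@GRAD", shape=[13, 1], dtype="float32")

# Block.rename_var (added below) rebuilds the Python-side Variable under the
# new name and delegates to BlockDesc::RenameVar, which also rewrites every
# op input/output in the block that referenced the old name.
block.rename_var("w@GRAD", "w@GRAD.trainer_0")
print(block.var("w@GRAD.trainer_0").shape)

This is the same mechanism _create_vars_from_blocklist relies on when add_trainer_suffix=True.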
@@ -531,6 +535,12 @@ class Operator(object):
         """
         return self.desc.input(name)
 
+    def rename_input(self, old_name, new_name):
+        self.desc.rename_input(old_name, new_name)
+
+    def rename_output(self, old_name, new_name):
+        self.desc.rename_output(old_name, new_name)
+
     @property
     def input_names(self):
         """
@@ -540,6 +550,14 @@ class Operator(object):
         """
         return self.desc.input_names()
 
+    @property
+    def input_arg_names(self):
+        return self.desc.input_arg_names()
+
+    @property
+    def output_arg_names(self):
+        return self.desc.output_arg_names()
+
     def output(self, name):
         """
         Get output arguments by the output parameter name
@@ -717,6 +735,60 @@ class Block(object):
     def has_var(self, name):
         return name in self.vars
 
+    def rename_var(self, name, new_name):
+        """
+        Rename variable in vars and ops' inputs and outputs
+        """
+        if not self.has_var(name):
+            raise ValueError("var %s is not in current block" % name)
+        v = self.var(name)
+        stop_gradient = None
+        trainable = None
+        optimize_attr = None
+        regularizer = None
+        gradient_clip_attr = None
+        error_clip = None
+        if type(v) == Parameter:
+            stop_gradient = v.stop_gradient
+            trainable = v.trainable
+            optimize_attr = v.optimize_attr
+            regularizer = v.regularizer
+            gradient_clip_attr = v.gradient_clip_attr
+            error_clip = v.error_clip
+        elif type(v) == Variable:
+            error_clip = v.error_clip
+            stop_gradient = v.stop_gradient
+        else:
+            raise ValueError("unsupported var type: %s" % type(v))
+
+        self.desc.rename_var(name, new_name)
+        d = self.desc.find_var(new_name)
+        var = None
+        if type(v) == Parameter:
+            var = Parameter(
+                self,
+                d.shape(),
+                d.dtype(),
+                name=new_name,
+                stop_gradient=stop_gradient,
+                trainable=trainable,
+                optimize_attr=optimize_attr,
+                regularizer=regularizer,
+                gradient_clip_attr=gradient_clip_attr,
+                error_clip=error_clip)
+        elif type(v) == Variable:
+            var = Variable(
+                self,
+                name=new_name,
+                error_clip=error_clip,
+                stop_gradient=stop_gradient)
+
+        # rename the python side, sync_with_cpp will only add
+        # new vars/ops to python side.
+        self.vars[new_name] = var
+        del self.vars[name]
+        self.sync_with_cpp()
+
     def create_parameter(self, *args, **kwargs):
         global_block = self.program.global_block()
         param = Parameter(global_block, *args, **kwargs)
diff --git a/python/paddle/v2/fluid/tests/book_distribute/notest_recognize_digits_conv_dist.py b/python/paddle/v2/fluid/tests/book_distribute/notest_recognize_digits_conv_dist.py
index 07815059c4f8ed0e660f3b67382aba6d7a60d524..1c1fffc5892aa6ade05a341efb7043cea538b03f 100644
--- a/python/paddle/v2/fluid/tests/book_distribute/notest_recognize_digits_conv_dist.py
+++ b/python/paddle/v2/fluid/tests/book_distribute/notest_recognize_digits_conv_dist.py
@@ -58,14 +58,19 @@ trainers = int(os.getenv("TRAINERS"))  # total trainer count
 current_endpoint = os.getenv("SERVER_ENDPOINT")  # current pserver endpoint
 training_role = os.getenv("TRAINING_ROLE", "TRAINER")  # get the training role: trainer/pserver
 
+if not current_endpoint:
+    print("need env SERVER_ENDPOINT")
+    exit(1)
+
 t = fluid.DistributeTranspiler()
 t.transpile(
-    optimize_ops, params_grads, pservers=pserver_endpoints, trainers=trainers)
+    optimize_ops,
+    params_grads,
+    0,
+    pservers=pserver_endpoints,
+    trainers=trainers)
 
 if training_role == "PSERVER":
-    if not current_endpoint:
-        print("need env SERVER_ENDPOINT")
-        exit(1)
     pserver_prog = t.get_pserver_program(current_endpoint)
     pserver_startup = t.get_startup_program(current_endpoint, pserver_prog)
     exe.run(pserver_startup)
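For reference, a minimal sketch of how the reworked transpiler API is driven end to end, mirroring the updated notest_recognize_digits_conv_dist.py above. It is not part of the patch: the endpoint string and trainer count are placeholders, and optimize_ops, params_grads (both returned by Optimizer.minimize) and an Executor `exe` are assumed to exist already.

# Reference sketch (not part of the patch). Assumes optimize_ops, params_grads
# and an Executor `exe` already exist; endpoints and counts are placeholders.
import os
import paddle.v2.fluid as fluid

t = fluid.DistributeTranspiler()
t.transpile(
    optimize_ops,
    params_grads,
    trainer_id=0,               # unique per trainer; the test above hard-codes 0
    pservers="127.0.0.1:6174",  # comma-separated pserver endpoints
    trainers=2)                 # total trainer count, becomes the Fanin attr

if os.getenv("TRAINING_ROLE", "TRAINER") == "PSERVER":
    # Each pserver builds its own program plus a matching startup program.
    pserver_prog = t.get_pserver_program("127.0.0.1:6174")
    pserver_startup = t.get_startup_program("127.0.0.1:6174", pserver_prog)
    exe.run(pserver_startup)
    exe.run(pserver_prog)
else:
    # Trainers drop the optimize ops; the split/send/concat ops were already
    # appended by transpile().
    trainer_prog = t.get_trainer_program()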