From 8acad27e8d74ee17f7ded7c0d1f0dc0df1633637 Mon Sep 17 00:00:00 2001 From: typhoonzero Date: Tue, 13 Feb 2018 14:11:07 +0800 Subject: [PATCH] refine code --- paddle/fluid/operators/listen_and_serv_op.cc | 15 +- paddle/fluid/operators/recv_op.cc | 4 +- paddle/fluid/operators/send_op.cc | 4 +- .../paddle/v2/fluid/distribute_transpiler.py | 392 +++++++++--------- 4 files changed, 211 insertions(+), 204 deletions(-) diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 9c336678471..4409df4995b 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -75,8 +75,8 @@ class ListenAndServOp : public framework::OperatorBase { server_thread_->join(); } - void Run(const framework::Scope &scope, - const platform::Place &dev_place) const override { + void RunImpl(const framework::Scope &scope, + const platform::Place &dev_place) const override { platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); auto &dev_ctx = *pool.Get(dev_place); framework::Scope &recv_scope = scope.NewScope(); @@ -101,7 +101,6 @@ class ListenAndServOp : public framework::OperatorBase { // the gradients arrives, just add suffix 0~n and merge the gradient. rpc_service_->SetCond(0); size_t recv_var_cnt = 0; - size_t update_param_cnt = 0; int batch_barrier = 0; while (batch_barrier != fan_in) { const detail::MessageWithName &v = rpc_service_->Get(); @@ -128,29 +127,26 @@ class ListenAndServOp : public framework::OperatorBase { } } } - VLOG(3) << "recv " << recv_var_cnt << " parmeters for one barrier."; if (exit_flag) { rpc_service_->ShutDown(); } - VLOG(3) << "run optimize graph..."; try { executor.Run(*program, &recv_scope, block->ID(), /*global_block*/ false /*create_local_scope*/, false /*create_vars*/); } catch (std::exception &e) { LOG(ERROR) << "run sub program error " << e.what(); } - // Reset the received sparse variables, the sum operator would not // sum the input sparse variables which rows is empty at the next // mini-batch. - // TOOD(Yancey1989): move the reset action into an operator, we couldn't + // TODO(Yancey1989): move the reset action into an operator, we couldn't // have any hide logic in the operator. for (auto &var : sparse_vars) { var->GetMutable()->mutable_rows()->clear(); } rpc_service_->SetCond(1); - rpc_service_->WaitClientGet(update_param_cnt); - grads_counter_.clear(); + // FIXME(typhoonzero): use another condition to sync wait clients get. + rpc_service_->WaitClientGet(ins.size()); sparse_vars.clear(); } // while(true) } @@ -158,7 +154,6 @@ class ListenAndServOp : public framework::OperatorBase { protected: std::shared_ptr rpc_service_; std::shared_ptr server_thread_; - mutable std::unordered_map grads_counter_; }; class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker { diff --git a/paddle/fluid/operators/recv_op.cc b/paddle/fluid/operators/recv_op.cc index c093f60ceed..17b57b5d45a 100644 --- a/paddle/fluid/operators/recv_op.cc +++ b/paddle/fluid/operators/recv_op.cc @@ -32,8 +32,8 @@ class RecvOp : public framework::OperatorBase { const framework::AttributeMap& attrs) : OperatorBase(type, inputs, outputs, attrs) {} - void Run(const framework::Scope& scope, - const platform::Place& place) const override { + void RunImpl(const framework::Scope& scope, + const platform::Place& place) const override { auto outs = Outputs("Out"); std::vector epmap = Attr>("epmap"); diff --git a/paddle/fluid/operators/send_op.cc b/paddle/fluid/operators/send_op.cc index b241f738cbf..39b6c0e8c51 100644 --- a/paddle/fluid/operators/send_op.cc +++ b/paddle/fluid/operators/send_op.cc @@ -48,8 +48,8 @@ class SendOp : public framework::OperatorBase { const framework::AttributeMap& attrs) : OperatorBase(type, inputs, outputs, attrs) {} - void Run(const framework::Scope& scope, - const platform::Place& place) const override { + void RunImpl(const framework::Scope& scope, + const platform::Place& place) const override { auto ins = Inputs("X"); auto outs = Outputs("Out"); std::vector epmap = Attr>("epmap"); diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index 41630998cf5..81317df9834 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -147,6 +147,21 @@ class DistributeTranspiler: Use different methods to split trainable variables to different parameter servers. + Steps to transpile trainer: + 1. split variable to multiple blocks, aligned by product(dim[1:]) (width). + 2. rename splited grad variables to add trainer_id suffix ".trainer_%d". + 3. modify trainer program add split_op to each grad variable. + 4. append send_op to send splited variables to server and fetch + params(splited blocks or origin param) from server. + 5. append concat_op to merge splited blocks to update local weights. + + Steps to transpile pserver: + 1. create new program for parameter server. + 2. create params and grad variables that assigned to current server instance. + 3. create a sub-block in the server side program + 4. append ops that should run on current server instance. + 5. add listen_and_serv op + :param optimize_ops: op list of optimization, should be the return value of Optimizer.minimize :type optimize_ops: list @@ -154,7 +169,7 @@ class DistributeTranspiler: :type params_grads: list :param trainer_id: one unique id for each trainer in a job. :type trainer_id: int - :param program: program to optimize, default is default_main_program + :param program: program to transpile, default is default_main_program :type program: Program :param pservers: parameter server endpoints like "m1:6174,m2:6174" :type pservers: string @@ -174,27 +189,15 @@ class DistributeTranspiler: # like Kubernetes, we should port this to use etcd later when developing # fluid distributed training with fault-tolerance. self.trainer_id = trainer_id - - # steps to transpile: - # 1. split variable to multiple blocks, aligned by product(dim[1:]) (width). - # 2. modify trainer program add split_op to each Grad. - # 3. append send_op to trainer. - # 4. append concat_op to trainer to update local weights. - # 5. create new program for parameter server. - # 6. create parameter server program by split_method generated endpoint->VarBlock - # 7. update startup_program, rename variables to variables with trainer_id - pserver_endpoints = pservers.split(",") # step1 param_list = [pg[0] for pg in params_grads] grad_list = [pg[1] for pg in params_grads] - # TODO: add split selected rows support grad_blocks = split_dense_variable(grad_list, len(pserver_endpoints)) param_blocks = split_dense_variable(param_list, len(pserver_endpoints)) # step2 grad_var_mapping = self._append_split_op(program, grad_blocks) - # step3 send_inputs = [] send_outputs = [] @@ -222,12 +225,12 @@ class DistributeTranspiler: rpc_client_var = program.global_block().create_var( name="RPC_CLIENT_VAR", - psersistable=True, + persistable=True, dtype='float32', # dtype and shape is not used in fact shape=[0]) # create send_op - send_op = program.global_block().append_op( + program.global_block().append_op( type="send", inputs={"X": send_inputs}, outputs={"Out": send_outputs, @@ -239,23 +242,158 @@ class DistributeTranspiler: if len(splited_var) <= 1: continue orig_param = program.global_block().vars[varname] - concat = program.global_block().append_op( + program.global_block().append_op( type="concat", inputs={"X": splited_var}, outputs={"Out": [orig_param]}, attrs={"axis": 0}) - # step 7 - startup_prog = default_startup_program() - for varname in startup_prog.global_block().vars.keys(): - if varname in param_var_mapping and \ - len(param_var_mapping[varname]) == 1: - new_var_name = "%s.trainer_%d" % \ - (varname, self.trainer_id) - startup_prog.global_block().rename_var(varname, new_var_name) - - def _create_vars_from_blocklist(self, program, block_list): - # Create respective variables using the block_list + def get_trainer_program(self): + # remove optimize ops and add a send op to main_program + self.program.global_block().delete_ops(self.optimize_ops) + return self.program + + def get_pserver_program(self, endpoint): + """ + Get pserver side program using the endpoint. + NOTE: assume blocks of the same variable is not distributed + on the same pserver, only change param/grad varnames for + trainers to fetch. + """ + # step1 + pserver_program = Program() + # step2 + recv_inputs = [] + for v in self.param_grad_ep_mapping[endpoint]["params"]: + self._clone_var(pserver_program.global_block(), v) + for v in self.param_grad_ep_mapping[endpoint]["grads"]: + # create vars for each trainer in global scope, so + # we don't need to create them when grad arrives. + # change client side var name to origin name by + # removing ".trainer_%d" suffix + suff_idx = v.name.find(".trainer_") + if suff_idx >= 0: + orig_var_name = v.name[:suff_idx] + pserver_program.global_block().create_var( + name=orig_var_name, + persistable=True, + dtype=v.dtype, + shape=v.shape) + print("create origin var: ", orig_var_name) + for trainer_id in xrange(self.trainers): + var = pserver_program.global_block().create_var( + name="%s.trainer_%d" % (orig_var_name, trainer_id), + persistable=False, + dtype=v.dtype, + shape=v.shape) + recv_inputs.append(var) + print("create per trainer var: ", var.name) + # step3 + optimize_block = pserver_program.create_block(0) + # step 4 + # Create a union-find data struct from optimize ops, + # If two ops are connected, we could add these two ops + # into one set. + ufind = self._create_ufind(self.optimize_ops) + # step 4.2 + # Iterate through the ops and append optimize op which + # located on current pserver + opt_op_on_pserver = [] + for _, op in enumerate(self.optimize_ops): + if self._is_opt_op(op) and self._is_opt_op_on_pserver(endpoint, op): + opt_op_on_pserver.append(op) + # step 4.3 + # Iterate through the ops, and if an op and the optimize ops + # which located on current pserver are in one set, then + # append it into the sub program. + for _, op in enumerate(self.optimize_ops): + for _, opt_op in enumerate(opt_op_on_pserver): + if ufind.is_connected(op, opt_op): + if self._is_opt_op(op): + self._append_pserver_ops(optimize_block, op, endpoint) + else: + self._append_pserver_non_opt_ops(optimize_block, op) + break + # step5 append the listen_and_serv op + pserver_program.global_block().append_op( + type="listen_and_serv", + inputs={'X': recv_inputs}, + outputs={}, + attrs={ + "OptimizeBlock": optimize_block, + "endpoint": endpoint, + "Fanin": self.trainers + }) + pserver_program.sync_with_cpp() + return pserver_program + + def get_startup_program(self, endpoint, pserver_program): + """ + Get startup program for current parameter server. + Modify operator input variables if there are variables that + were split to several blocks. + """ + s_prog = Program() + orig_s_prog = framework.default_startup_program() + params = self.param_grad_ep_mapping[endpoint]["params"] + + def _get_splited_name_and_shape(varname): + for idx, splited_param in enumerate(params): + pname = splited_param.name + if same_or_split_var(pname, varname) and varname != pname: + return pname, splited_param.shape + return "", [] + + # 1. create vars in pserver program to startup program + pserver_vars = pserver_program.global_block().vars + created_var_map = dict() + for _, var in pserver_vars.iteritems(): + tmpvar = s_prog.global_block().create_var( + name=var.name, + persistable=var.persistable, + dtype=var.dtype, + shape=var.shape) + created_var_map[var.name] = tmpvar + + # 2. rename op outputs + for op in orig_s_prog.global_block().ops: + new_inputs = dict() + new_outputs = dict() + # do not append startup op if var is not on this pserver + op_on_pserver = False + for key in op.output_names: + newname, _ = _get_splited_name_and_shape(op.output(key)[0]) + if newname: + op_on_pserver = True + new_outputs[key] = created_var_map[newname] + elif op.output(key)[0] in pserver_vars: + op_on_pserver = True + new_outputs[key] = pserver_vars[op.output(key)[0]] + + # most startup program ops have no inputs + new_inputs = self._get_input_map_from_op(pserver_vars, op) + + if op_on_pserver: + if op.type in [ + "gaussian_random", "fill_constant", "uniform_random" + ]: + op.attrs["shape"] = new_outputs["Out"].shape + s_prog.global_block().append_op( + type=op.type, + inputs=new_inputs, + outputs=new_outputs, + attrs=op.attrs) + return s_prog + + # ====================== private transpiler functions ===================== + def _create_vars_from_blocklist(self, + program, + block_list, + add_trainer_suffix=False): + """ + NOTE: only grads need to be named for different trainers, use + add_trainer_suffix to rename the grad vars. + """ block_map = dict() var_mapping = dict() for block_str in block_list: @@ -266,12 +404,15 @@ class DistributeTranspiler: for varname, splited in block_map.iteritems(): orig_var = program.global_block().var(varname) if len(splited) == 1: - # rename var to the trainer_id var - new_var_name = "%s.trainer_%d" % \ - (orig_var.name, self.trainer_id) - program.global_block().rename_var(varname, new_var_name) - var_mapping[varname] = \ - [program.global_block().var(new_var_name)] + if add_trainer_suffix: + new_var_name = "%s.trainer_%d" % \ + (orig_var.name, self.trainer_id) + program.global_block().rename_var(varname, new_var_name) + var_mapping[varname] = \ + [program.global_block().var(new_var_name)] + else: + var_mapping[varname] = \ + [program.global_block().var(orig_var.name)] continue var_mapping[varname] = [] @@ -286,10 +427,16 @@ class DistributeTranspiler: splited_shape = [rows] if len(orig_shape) >= 2: splited_shape.extend(orig_shape[1:]) + new_var_name = "" + if add_trainer_suffix: + new_var_name = "%s.block%d.trainer_%d" % \ + (varname, i, self.trainer_id) + else: + new_var_name = "%s.block%d" % \ + (varname, i) var = program.global_block().create_var( - name="%s.block%d.trainer_%d" % - (varname, i, self.trainer_id), - psersistable=False, + name=new_var_name, + persistable=False, dtype=orig_var.dtype, type=orig_var.type, shape=splited_shape) # flattend splited var @@ -305,13 +452,12 @@ class DistributeTranspiler: dtype=var.dtype, type=var.type, lod_level=var.lod_level, - # HACK: let all param in pserver be persistable so the child - # program in recv can get them persistable=True) def _append_split_op(self, program, gradblocks): # Split variables that need to be split and append respective ops - var_mapping = self._create_vars_from_blocklist(program, gradblocks) + var_mapping = self._create_vars_from_blocklist( + program, gradblocks, add_trainer_suffix=True) for varname, splited_vars in var_mapping.iteritems(): # variable that don't need to split have empty splited_vars if len(splited_vars) <= 1: @@ -341,24 +487,6 @@ class DistributeTranspiler: "[LOD_TENSOR, SELECTED_ROWS]") return var_mapping - def get_trainer_program(self): - # remove optimize ops and add a send op to main_program - self.program.global_block().delete_ops(self.optimize_ops) - return self.program - - def _create_var_for_trainers(self, block, var, trainers): - # For each trainer, create the necessary variables - var_list = [] - for i in xrange(trainers): - var_each = block.create_var( - name="%s.trainer_%d" % (var.name, i), - psersistable=var.persistable, - dtype=var.dtype, - type=var.type, - shape=var.shape) - var_list.append(var_each) - return var_list - def _get_optimizer_input_shape(self, op_type, varkey, orig_shape, param_shape): """ @@ -386,6 +514,13 @@ class DistributeTranspiler: pass return orig_shape + def _orig_varname(self, varname): + suff_idx = varname.find(".trainer_") + orig_var_name = "" + if suff_idx >= 0: + orig_var_name = varname[:suff_idx] + return orig_var_name + def _append_pserver_ops(self, optimize_block, opt_op, endpoint): program = optimize_block.program pserver_block = program.global_block() @@ -396,18 +531,23 @@ class DistributeTranspiler: if key == "Grad": grad_block = None for g in self.param_grad_ep_mapping[endpoint]["grads"]: - if same_or_split_var(g.name, opt_op.input(key)[0]): + if same_or_split_var( + self._orig_varname(g.name), opt_op.input(key)[0]): grad_block = g break if not grad_block: # do not append this op if current endpoint # is not dealing with this grad block return - merged_var = pserver_block.vars[grad_block.name] - # append merging ops if trainers > 1 + merged_var = \ + pserver_block.vars[self._orig_varname(grad_block.name)] if self.trainers > 1: - vars2merge = self._create_var_for_trainers( - pserver_block, grad_block, self.trainers) + vars2merge = [] + for i in xrange(self.trainers): + per_trainer_name = "%s.trainer_%d" % \ + (self._orig_varname(grad_block.name), i) + vars2merge.append(pserver_block.vars[per_trainer_name]) + optimize_block.append_op( type="sum", inputs={"X": vars2merge}, @@ -550,76 +690,6 @@ class DistributeTranspiler: return False return False - def get_pserver_program(self, endpoint): - """ - Get pserver side program using the endpoint - - NOTE: assume blocks of the same variable is not distributed - on the same pserver, only change param/grad varnames for - trainers to fetch. For each pserver endpoint, server side - program must be a sub-set of the original optimization program. - """ - # step5 - pserver_program = Program() - recv_inputs = [] - for v in self.param_grad_ep_mapping[endpoint]["params"]: - self._clone_var(pserver_program.global_block(), v) - for v in self.param_grad_ep_mapping[endpoint]["grads"]: - # create vars for each trainer in global scope, so - # we don't need to create them when grad arrives. - pserver_program.global_block().create_var( - name=v.name, persistable=True, dtype=v.dtype, shape=v.shape) - for trainer_id in xrange(self.trainers): - # change client side var name to origin name by - # removing ".trainer_%d" suffix - suff_idx = v.name.find(".trainer_") - if suff_idx >= 0: - orig_var_name = v.name[:suff_idx] - var = pserver_program.global_block().create_var( - name="%s.trainer_%d" % (orig_var_name, trainer_id), - persistable=True, - dtype=v.dtype, - shape=v.shape) - recv_inputs.append(var) - # step6 - optimize_block = pserver_program.create_block(0) - # step 6.1 - # Create a union-find data struct by optimize ops, - # If two ops are connected, we could add these two ops - # into one set. - ufind = self._create_ufind(self.optimize_ops) - # step 6.2 - # Iterate through the ops and append optimize op which - # located on current pserver - opt_op_on_pserver = [] - for _, op in enumerate(self.optimize_ops): - if self._is_opt_op(op) and self._is_opt_op_on_pserver(endpoint, op): - opt_op_on_pserver.append(op) - # step 6.3 - # Iterate through the ops, and if an op and the optimize ops - # which located on current pserver are in one set, then - # append it into the sub program. - for _, op in enumerate(self.optimize_ops): - for _, opt_op in enumerate(opt_op_on_pserver): - if ufind.is_connected(op, opt_op): - if self._is_opt_op(op): - self._append_pserver_ops(optimize_block, op, endpoint) - else: - self._append_pserver_non_opt_ops(optimize_block, op) - break - # Append the listen_and_serv op - pserver_program.global_block().append_op( - type="listen_and_serv", - inputs={'X': recv_inputs}, - outputs={}, - attrs={ - "OptimizeBlock": optimize_block, - "endpoint": endpoint, - "Fanin": self.trainers - }) - pserver_program.sync_with_cpp() - return pserver_program - def _get_input_map_from_op(self, varmap, op): iomap = dict() for key in op.input_names: @@ -643,61 +713,3 @@ class DistributeTranspiler: else: iomap[key] = vars return iomap - - def get_startup_program(self, endpoint, pserver_program): - """ - Get startup program for current parameter server. - Modify operator input variables if there are variables that - were split to several blocks. - """ - s_prog = Program() - orig_s_prog = framework.default_startup_program() - params = self.param_grad_ep_mapping[endpoint]["params"] - - def _get_splited_name_and_shape(varname): - for idx, splited_param in enumerate(params): - pname = splited_param.name - if same_or_split_var(pname, varname) and varname != pname: - return pname, splited_param.shape - return "", [] - - # 1. create vars in pserver program to startup program - pserver_vars = pserver_program.global_block().vars - created_var_map = dict() - for _, var in pserver_vars.iteritems(): - tmpvar = s_prog.global_block().create_var( - name=var.name, - persistable=var.persistable, - dtype=var.dtype, - shape=var.shape) - created_var_map[var.name] = tmpvar - - # 2. rename op outputs - for op in orig_s_prog.global_block().ops: - new_inputs = dict() - new_outputs = dict() - # do not append startup op if var is not on this pserver - op_on_pserver = False - for key in op.output_names: - newname, _ = _get_splited_name_and_shape(op.output(key)[0]) - if newname: - op_on_pserver = True - new_outputs[key] = created_var_map[newname] - elif op.output(key)[0] in pserver_vars: - op_on_pserver = True - new_outputs[key] = pserver_vars[op.output(key)[0]] - - # most startup program ops have no inputs - new_inputs = self._get_input_map_from_op(pserver_vars, op) - - if op_on_pserver: - if op.type in [ - "gaussian_random", "fill_constant", "uniform_random" - ]: - op.attrs["shape"] = new_outputs["Out"].shape - s_prog.global_block().append_op( - type=op.type, - inputs=new_inputs, - outputs=new_outputs, - attrs=op.attrs) - return s_prog -- GitLab