diff --git a/paddle/fluid/framework/framework.proto b/paddle/fluid/framework/framework.proto index 68fcc104d48b2b39929ed2198a2dd2eabae10e94..8f73b3d478e6da65dfa575cfed862dc4e542efa5 100644 --- a/paddle/fluid/framework/framework.proto +++ b/paddle/fluid/framework/framework.proto @@ -46,6 +46,7 @@ message OpDesc { repeated bool bools = 11; optional int32 block_idx = 12; optional int64 l = 13; + repeated int32 blocks_idx = 14; }; message Var { diff --git a/paddle/fluid/framework/op_desc.cc b/paddle/fluid/framework/op_desc.cc index f92769192c218eb7cdc2350ff6e4721b45005806..a190199f1cb1361f67f20c755b8e7ef52c284adc 100644 --- a/paddle/fluid/framework/op_desc.cc +++ b/paddle/fluid/framework/op_desc.cc @@ -211,6 +211,12 @@ void OpDesc::SetBlockAttr(const std::string &name, BlockDesc *block) { need_update_ = true; } +void OpDesc::SetBlocksAttr(const std::string &name, + std::vector blocks) { + this->attrs_[name] = blocks; + need_update_ = true; +} + void OpDesc::SetAttrMap( const std::unordered_map &attr_map) { attrs_ = attr_map; @@ -305,6 +311,13 @@ struct SetAttrDescVisitor : public boost::static_visitor { void operator()(const std::vector &v) const { VectorToRepeated(v, attr_->mutable_bools()); } + void operator()(const std::vector &v) const { + std::vector blocks_idx; + for (auto blk : v) { + blocks_idx.push_back(blk->ID()); + } + VectorToRepeated(blocks_idx, attr_->mutable_blocks_idx()); + } void operator()(BlockDesc *desc) const { attr_->set_block_idx(desc->ID()); } void operator()(int64_t v) const { attr_->set_l(v); } void operator()(boost::blank) const { PADDLE_THROW("Unexpected branch"); } diff --git a/paddle/fluid/framework/op_desc.h b/paddle/fluid/framework/op_desc.h index a02d3e269129596f65a2fb346e76c1af7fbead95..74dd8ec002005dd080424b48b5db1a2574a6974f 100644 --- a/paddle/fluid/framework/op_desc.h +++ b/paddle/fluid/framework/op_desc.h @@ -77,6 +77,8 @@ class OpDesc { void SetBlockAttr(const std::string &name, BlockDesc *block); + void SetBlocksAttr(const std::string &name, std::vector blocks); + Attribute GetAttr(const std::string &name) const; Attribute GetNullableAttr(const std::string &name) const; diff --git a/paddle/fluid/framework/type_defs.h b/paddle/fluid/framework/type_defs.h index 4879209ece9fdfea91e484a4118c00a2a2a2b4f7..e099e40f121ff13657e563eb608feecbca0551be 100644 --- a/paddle/fluid/framework/type_defs.h +++ b/paddle/fluid/framework/type_defs.h @@ -35,7 +35,8 @@ using VariableNameMap = std::map>; using Attribute = boost::variant, std::vector, std::vector, bool, - std::vector, BlockDesc*, int64_t>; + std::vector, BlockDesc*, int64_t, + std::vector>; using AttributeMap = std::unordered_map; diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 0f2863cc5991224136175813ef93b9bea4bd0712..3fc5ae6f2cf285d001a0c59e215d738c63b994f7 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -101,14 +101,11 @@ void ListenAndServOp::RunSyncLoop( framework::Scope *recv_scope, const std::vector &prefetch_block_id_list) const { size_t num_blocks = program->Size(); - auto skip_sub_blks = Attr>("skip_sub_blks"); + auto optimize_blocks = + Attr>(kOptimizeBlocks); PADDLE_ENFORCE_GE(num_blocks, 2, "server program should have at least 2 blocks"); - std::vector optimize_block_id_list; - for (auto *block : optimize_blocks) { - optimize_block_id_list.push_back(block->ID()); - } auto optimize_prepared = executor->Prepare(*program, optimize_block_id_list); // Insert placeholder for block0 which holds current op itself. optimize_prepared.insert( @@ -136,10 +133,10 @@ void ListenAndServOp::RunSyncLoop( std::vector parallel_blkids; parallel_blkids.push_back(optimize_blocks[0]->ID()); double ts = GetTimestamp(); - for (size_t i = 1; i < optimize_block_id_list.size(); ++i) { + for (size_t i = 1; i < optimize_blocks.size(); ++i) { // skip the first optimize block because it is already in the // parallel_blkids. - int blkid = optimize_block_id_list[i]; + int blkid = optimize_blocks[i]->ID(); if (program->Block(blkid).Parent() != last_parent_blkid) { ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared, program, recv_scope); @@ -263,7 +260,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, Attr>(kOptimizeBlocks); PADDLE_ENFORCE(optimize_blocks.size() > 1, "optimize blocks should be 1 at least on the pserver side."); - auto *program = optimize_block[0]->Program(); + auto *program = optimize_blocks[0]->Program(); framework::Executor executor(dev_place); // prepare for prefetch @@ -340,8 +337,8 @@ class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker { "a map from grad name to it's optimize block id") .SetDefault({}); AddAttr("sync_mode", "if works at sync_mode or not").SetDefault(true); - AddAttr(kOptimizeBlocks, - "Optimize blocks to run on server side."); + AddAttr>( + kOptimizeBlocks, "Optimize blocks to run on server side."); AddAttr>(kPrefetchVarNameToBlockId, "prefetch blocks to run on server side.") .SetDefault({}); diff --git a/paddle/fluid/pybind/protobuf.cc b/paddle/fluid/pybind/protobuf.cc index bcf6d4dd3087060c016e53722cde80704ef2e834..2d44e1f63cbd77dca32c53af5264ba9971f9c805 100644 --- a/paddle/fluid/pybind/protobuf.cc +++ b/paddle/fluid/pybind/protobuf.cc @@ -293,6 +293,7 @@ void BindOpDesc(pybind11::module *m) { .def("set_attr", &pd::OpDesc::SetAttr) .def("attr", &pd::OpDesc::GetAttr) .def("set_block_attr", &pd::OpDesc::SetBlockAttr) + .def("set_blocks_attr", &pd::OpDesc::SetBlocksAttr) .def("set_serialized_attr", [](pd::OpDesc &self, const std::string &name, const pybind11::bytes &seriralized) { diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py index db21b1f3c03c40d79084b0dbb57d22f6d41fa270..1843072662541a86a53b33a229ea15e1e25c528c 100644 --- a/python/paddle/fluid/framework.py +++ b/python/paddle/fluid/framework.py @@ -561,6 +561,10 @@ class Operator(object): if isinstance(self.attrs[attr_name], Block): self.desc.set_block_attr(attr_name, self.attrs[attr_name].desc) + elif isinstance(self.attrs[attr_name], list) and \ + all(isinstance(v, Block) for v in self.attrs[attr_name]): + self.desc.set_blocks_attr( + attr_name, [v.desc for v in self.attrs[attr_name]]) elif isinstance(self.attrs[attr_name], core.BlockDesc) or \ isinstance(self.attrs[attr_name], core.ProgramDesc): self.desc.set_serialized_attr( @@ -715,6 +719,8 @@ class Operator(object): self.attrs[name] = val if isinstance(val, Block): self.desc.set_block_attr(name, val.desc) + elif isinstance(val, list) and all(isinstance(v, Block) for v in val): + self.desc.set_blocks_attr(name, [v.desc for v in val]) elif isinstance(val, core.BlockDesc) or \ isinstance(val, core.ProgramDesc): self.desc.set_serialized_attr(name, val.serialize_to_string()) diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index 391dddcf3e9c4f4e087cac703eacbf56a8dbf80f..676079144ebde419088752d1754d808a4e76f63b 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -396,7 +396,7 @@ class DistributeTranspiler(object): return varname return "" - def __clone_lr_op_sub_block__(op, program, new_block, skip_sub_blks): + def __clone_lr_op_sub_block__(op, program, new_block): if not op.has_attr('sub_block'): return @@ -406,7 +406,6 @@ class DistributeTranspiler(object): # we put the new sub block to new block to follow the block # hierarchy of the original blocks new_sub_block = program.create_block(new_block.idx) - skip_sub_blks.append(new_sub_block.idx) # clone vars for var in origin_block.vars: @@ -416,8 +415,7 @@ class DistributeTranspiler(object): for op in origin_block.ops: self._clone_lr_op(program, new_sub_block, op) # clone sub_block of op - __clone_lr_op_sub_block__(op, program, new_sub_block, - skip_sub_blks) + __clone_lr_op_sub_block__(op, program, new_sub_block) # reset the block of op op.set_attr('sub_block', new_sub_block) @@ -433,8 +431,7 @@ class DistributeTranspiler(object): for _, op in enumerate(lr_ops): self._append_pserver_non_opt_ops(lr_decay_block, op) # append sub blocks to pserver_program in lr_decay_op - __clone_lr_op_sub_block__(op, pserver_program, lr_decay_block, - skip_sub_blks) + __clone_lr_op_sub_block__(op, pserver_program, lr_decay_block) # append op to the current block grad_to_block_id = []