diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index d31943289d7a153ccf4320cab877cfc5f541e7cc..b40cbdcc1b1bf73b70b24c59d2ca1825a74fef68 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -247,7 +247,8 @@ cc_library(parallel_executor SRCS parallel_executor.cc DEPS graph build_strategy collective_helper fast_threaded_ssa_graph_executor variable_helper) -cc_test(dist_multi_trainer_test SRCS dist_multi_trainer_test.cc DEPS executor) +cc_test(dist_multi_trainer_test SRCS dist_multi_trainer_test.cc DEPS + conditional_block_op executor) cc_library(prune SRCS prune.cc DEPS framework_proto boost) cc_test(prune_test SRCS prune_test.cc DEPS op_info prune recurrent_op device_context) cc_test(var_type_inference_test SRCS var_type_inference_test.cc DEPS op_registry diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h index f6f3098613ba194bea90a36efc3153cf63d2db5b..4951ada9bd55a7e19d343b6385d57becb90d79ae 100644 --- a/paddle/fluid/framework/device_worker.h +++ b/paddle/fluid/framework/device_worker.h @@ -19,6 +19,7 @@ limitations under the License. */ #include #include #include // NOLINT +#include #include #include // NOLINT #include // NOLINT @@ -313,6 +314,10 @@ class DownpourWorker : public HogwildWorker { std::map<uint64_t, std::vector<std::string>> dense_value_names_; std::map<uint64_t, uint64_t> table_dependency_; std::vector<std::pair<uint64_t, uint64_t>> copy_dense_tables_; + // multitask + std::map<int64_t, uint64_t> cond2table_map_; + std::set<uint64_t> condvalue_set_; + bool flag_partial_push_; private: // std::vector<std::string> dump_param_; diff --git a/paddle/fluid/framework/downpour_worker.cc b/paddle/fluid/framework/downpour_worker.cc index 00f721701a4a55980c8df3079d88cbeeba116f49..e2c85ab3905ffebdfacaf821951afd7e9f924f92 100644 --- a/paddle/fluid/framework/downpour_worker.cc +++ b/paddle/fluid/framework/downpour_worker.cc @@ -12,6 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
*/ +#include +#include #include "paddle/fluid/framework/device_worker.h" #include "paddle/fluid/platform/cpu_helper.h" @@ -65,6 +67,13 @@ void DownpourWorker::Initialize(const TrainerDesc& desc) { } } + flag_partial_push_ = false; + for (auto& m : param_.program_config(0).partial_pushdense_condtable_map()) { + cond2table_map_[m.key()] = m.value(); + condvalue_set_.insert(m.value()); + flag_partial_push_ = true; + } + skip_ops_.resize(param_.skip_ops_size()); for (int i = 0; i < param_.skip_ops_size(); ++i) { skip_ops_[i] = param_.skip_ops(i); @@ -876,14 +885,42 @@ void DownpourWorker::TrainFiles() { #endif if (need_to_push_dense_) { - for (int i = 0; i < param_.program_config(0).push_dense_table_id_size(); - ++i) { - uint64_t tid = static_cast<uint64_t>( - param_.program_config(0).push_dense_table_id(i)); - fleet_ptr_->PushDenseVarsAsync( - *thread_scope_, tid, dense_grad_names_[tid], &push_sparse_status_, - scale_datanorm_, cur_batch); + if (flag_partial_push_) { + Variable* var = (*thread_scope_).FindVar("cond_tag"); + LoDTensor* tensor = var->GetMutable<LoDTensor>(); + // check type in python code + int64_t* cond_value_batch = tensor->data<int64_t>(); + + for (int i = 0; i < param_.program_config(0).push_dense_table_id_size(); + ++i) { + uint64_t tid = static_cast<uint64_t>( + param_.program_config(0).push_dense_table_id(i)); + if (condvalue_set_.find(tid) != condvalue_set_.end()) { + // common dense table must push dense + if (cond2table_map_[cond_value_batch[0]] != tid) { + // can't push dense + continue; + } + } + + VLOG(3) << "push multitask dense gradient " << tid; + fleet_ptr_->PushDenseVarsAsync( + *thread_scope_, tid, dense_grad_names_[tid], &push_sparse_status_, + scale_datanorm_, cur_batch); + } + + } else { + for (int i = 0; i < param_.program_config(0).push_dense_table_id_size(); + ++i) { + uint64_t tid = static_cast<uint64_t>( + param_.program_config(0).push_dense_table_id(i)); + + fleet_ptr_->PushDenseVarsAsync( + *thread_scope_, tid, dense_grad_names_[tid], &push_sparse_status_, + scale_datanorm_, cur_batch); + } } + VLOG(3) << "push dense gradient done."; // the following code should be more precise and clean diff --git a/paddle/fluid/framework/hogwild_worker.cc b/paddle/fluid/framework/hogwild_worker.cc index 1117d676a5ece5b97a50b6290781f3bbc853cf7a..9aea9d4a83284d1f598c6536ddf1011c3c54e00a 100644 --- a/paddle/fluid/framework/hogwild_worker.cc +++ b/paddle/fluid/framework/hogwild_worker.cc @@ -15,6 +15,7 @@ limitations under the License.
*/ #include "paddle/fluid/framework/data_type.h" #include "paddle/fluid/framework/device_worker.h" #include "paddle/fluid/framework/device_worker_factory.h" +#include "paddle/fluid/operators/controlflow/conditional_block_op_helper.h" #include "paddle/fluid/operators/distributed/distributed.h" #include "paddle/fluid/platform/cpu_helper.h" #include "paddle/fluid/platform/lodtensor_printer.h" @@ -47,6 +48,8 @@ void HogwildWorker::CreateThreadOperators(const ProgramDesc &program) { ops_.push_back(local_op_ptr); continue; } + operators::PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp( + program, 0, ops_); } void HogwildWorker::CreateThreadScope(const ProgramDesc &program) { diff --git a/paddle/fluid/framework/trainer_desc.proto b/paddle/fluid/framework/trainer_desc.proto index 1985742fc4aa6a0fc67f552f2b69902840a00d0f..87de436617e11f468272760235c536276dd5c8c1 100644 --- a/paddle/fluid/framework/trainer_desc.proto +++ b/paddle/fluid/framework/trainer_desc.proto @@ -148,12 +148,17 @@ message CopyTableConfig { repeated TableDependencyMap table_denpendency_map = 12; } +message CondTableMap { + required int32 key = 1; + required int32 value = 2; +} message ProgramConfig { required string program_id = 1; repeated int32 push_sparse_table_id = 2; repeated int32 push_dense_table_id = 3; repeated int32 pull_sparse_table_id = 4; repeated int32 pull_dense_table_id = 5; + repeated CondTableMap partial_pushdense_condtable_map = 10; } message PullDenseWorkerParameter { diff --git a/paddle/fluid/operators/controlflow/CMakeLists.txt b/paddle/fluid/operators/controlflow/CMakeLists.txt index 680abc5ddffc3ab386769a1cfe21fcc21a2aff4b..4d409ed00a0b3ce8bdb547cdfabe5713d5ce2343 100644 --- a/paddle/fluid/operators/controlflow/CMakeLists.txt +++ b/paddle/fluid/operators/controlflow/CMakeLists.txt @@ -1,5 +1,7 @@ include(operators) -register_operators(DEPS naive_executor) +register_operators(EXCLUDES conditional_block_op DEPS naive_executor) + +cc_library(conditional_block_op SRCS conditional_block_op.cc DEPS executor) cc_library(op_variant SRCS op_variant.cc DEPS operator proto_desc) cc_library(conditional_block_op_helper SRCS conditional_block_op_helper.cc DEPS operator op_variant conditional_block_op) cc_library(recurrent_op_helper SRCS recurrent_op_helper.cc DEPS operator op_variant recurrent_op) diff --git a/paddle/fluid/operators/controlflow/conditional_block_op_helper.cc b/paddle/fluid/operators/controlflow/conditional_block_op_helper.cc index 00b86121c0ddab5d53bf4c417ce27885ae0deb88..500e1ccea92c7c6bfc53f8c160a2af576d344cdc 100644 --- a/paddle/fluid/operators/controlflow/conditional_block_op_helper.cc +++ b/paddle/fluid/operators/controlflow/conditional_block_op_helper.cc @@ -162,6 +162,32 @@ void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp( PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOpImpl( program, &fwd_ops, &bwd_ops); } +void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp( + const framework::ProgramDesc &program, int block_id, + const std::vector &all_ops) { + // If block_id is not 0, returns + // This is because all conditional_block_ops and conditional_block_grad_ops + // in the whole program would be processed when block_id is 0 (i.e. + // when Executor::Run() or ParallelExecutor constructs). + + // What's more, all conditional_block_ops and conditional_block_grad_ops + // must be processed when block_id is zero. 
If not, conditional_block_op // may run first and erase variables used in conditional_block_grad_op, // and at that moment, conditional_block_grad_ops may not have been constructed yet. + if (block_id != 0) return; + + std::vector<OpVariant> fwd_ops, bwd_ops; + for (auto *op : all_ops) { + if (op->Type() == "conditional_block") { + fwd_ops.emplace_back(op); + } else if (op->Type() == "conditional_block_grad") { + bwd_ops.emplace_back(op); + } + } + + PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOpImpl( + program, &fwd_ops, &bwd_ops); +} void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp( const framework::ProgramDesc &program, diff --git a/paddle/fluid/operators/controlflow/conditional_block_op_helper.h b/paddle/fluid/operators/controlflow/conditional_block_op_helper.h index abaaa8976065ca5636369036a0701d3d5bdfab00..22eb2ece4b05b8ad7fad3acdc545e3c98d211f31 100644 --- a/paddle/fluid/operators/controlflow/conditional_block_op_helper.h +++ b/paddle/fluid/operators/controlflow/conditional_block_op_helper.h @@ -33,6 +33,10 @@ void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp( const framework::ProgramDesc &program, int block_id, const std::vector<std::unique_ptr<framework::OperatorBase>> &all_ops); +void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp( + const framework::ProgramDesc &program, int block_id, + const std::vector<framework::OperatorBase *> &all_ops); + void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp( const framework::ProgramDesc &program, const std::vector<framework::OperatorBase *> &ifelse_ops, diff --git a/python/paddle/fluid/device_worker.py b/python/paddle/fluid/device_worker.py index 4796cd5ada420567fa126154cc1ac28badc0f2c0..ec91417a0f2eede673641c017ffee7ca63bba278 100644 --- a/python/paddle/fluid/device_worker.py +++ b/python/paddle/fluid/device_worker.py @@ -221,6 +221,13 @@ class DownpourSGD(DeviceWorker): for i in program_configs[program_id]["pull_dense"]: pc.pull_dense_table_id.extend([i]) dense_table_set.add(i) + # code for partial push of dense tables, such as multitask + if "cond2denseid" in program_configs[program_id]: + cond2denseid = program_configs[program_id]["cond2denseid"] + for key, value in cond2denseid.items(): + mc_map = pc.partial_pushdense_condtable_map.add() + mc_map.key = key + mc_map.value = value break trainer_desc.device_worker_name = opt_info.get("worker_class", diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py index 5cd1aa884a928db4980933091d951010ce347444..0189bc2bd7407dc8aed0976d08e8d8ad052332ba 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py @@ -167,6 +167,113 @@ class DistributedAdam(DistributedOptimizerImplBase): ret_list.append(x[0]) return ret_list + def _if_last_block(self, op, _equal_dict): + # for conditional_block op + cond_str = op.input('Cond')[0] + bool_test = False + if cond_str.startswith('equal'): + bool_test = True + vars_ = op.input('Input') + equal_keys = _equal_dict.keys() + for var_cond in vars_: + if var_cond in equal_keys: + if bool_test: + print("the conditional block is in error") + return False + return True + + def _generte_cond_para_map(self, op, _fill_value_dict, _equal_fill_dict, + _now_program, _all_params): + # generate the cond-value-to-parameter map recursively + cond_str = op.input('Cond')[0] + vars_ = op.input('Input') + + if self._if_last_block(op, _equal_fill_dict): + vars_ = op.input('Input') + cond_key = "" + if
cond_str.startswith('equal'): + cond_key = int(_fill_value_dict[_equal_fill_dict[cond_str]]) + else: + cond_key = -1 + p_list = [] + for var_cond in vars_: + if var_cond in _all_params: + p_list.append(var_cond) + + self._cond_params[cond_key] = p_list + self._other_params.extend(p_list) + else: + ops_cond = _now_program.block(int(op.attr('sub_block').id)).ops + for op in ops_cond: + if op.type == 'conditional_block': + self._generte_cond_para_map(op, _fill_value_dict, + _equal_fill_dict, _now_program, + _all_params) + + def _has_conditional_block(self, loss): + now_program = loss.block.program + root_block = now_program.block(0) + ops_ = root_block.ops + for op in ops_: + if op.type == 'conditional_block': + return True + return False + + def _check_params_grads(self, params, grads): + if len(params) != len(grads): + raise ValueError("params size != grads size, %s vs %s" % + (len(params), len(grads))) + + pname2grad = dict() + for i in range(len(params)): + pname = params[i].name + gname = grads[i].name + if pname != gname[:-5]: + raise ValueError("params != grads, %s vs %s" % (pname, gname)) + pname2grad[pname] = grads[i] + + return pname2grad + + def _generate_multi_dense_table(self, + params, + grads, + cond_params, + other_params, + sparse_table_names, + dense_table_id=0): + # generate multiple dense tables, keyed by cond value + pname2grad = self._check_params_grads(params, grads) + root_params_list = [] + root_grads_list = [] + dense_tables = [] + for i, p in enumerate(params): + if p.name not in other_params and p.name not in sparse_table_names: + root_params_list.append(p) + root_grads_list.append(grads[i]) + if len(root_params_list) > 0: + dense_tables.append(dense_table_id) + dense_table_id += 1 + lists_params = [[] for i in range(len(cond_params.keys()))] + lists_grads = [[] for i in range(len(cond_params.keys()))] + + key_id = 0 + name2key = dict() + cond2denseid = dict() + for key, value in cond_params.items(): + cond2denseid[key] = dense_table_id + dense_tables.append(dense_table_id) + dense_table_id += 1 + for v in value: + name2key[v] = key_id + key_id += 1 + + for p in params: + if p.name in other_params: + lists_params[name2key[p.name]].append(p) + lists_grads[name2key[p.name]].append(pname2grad[p.name]) + + return dense_tables, cond2denseid, lists_params, lists_grads, root_params_list, root_grads_list + def _minimize(self, losses, startup_program=None, @@ -215,6 +322,31 @@ class DistributedAdam(DistributedOptimizerImplBase): no_grad_set), key=lambda x: x[0].name) + # a conditional_block op in the program means multi-task + flag_multi_task = self._has_conditional_block(loss) + if flag_multi_task: + self._cond_params = dict() + self._other_params = [] + now_program = loss.block.program + root_block = now_program.block(0) + all_params = [] + for par in root_block.all_parameters(): + all_params.append(par.name) + + ops_ = root_block.ops + fill_value_dict = dict() + equal_fill_dict = dict() + for op in ops_: + # a conditional_block op must have fill_constant and equal ops + if op.type == 'fill_constant': + fill_value_dict[op.output('Out')[0]] = op.attr('value') + if op.type == 'equal': + equal_fill_dict[op.output('Out')[0]] = op.input('Y')[0] + if op.type == 'conditional_block': + self._generte_cond_para_map(op, fill_value_dict, + equal_fill_dict, + now_program, all_params) + if prog_id not in program_id_set: program_id_set.add(prog_id) sparse_table = self._find_multi_distributed_lookup_table([loss]) @@ -402,17 +534,65 @@ class DistributedAdam(DistributedOptimizerImplBase): data_norm_grads.append(i[1])
if not is_data_norm_data: grads.append(i[1]) + # for new dense table + multi_task_dense_tables_push = [] + multi_task_dense_tables_pull = [] + if flag_multi_task: + dense_tables, cond2denseid, lists_params, lists_grads, root_params_list, root_grads_list = self._generate_multi_dense_table( + params, grads, self._cond_params, + self._other_params, sparse_table_names, + dense_table_index) + program_configs[program_id][ + 'cond2denseid'] = cond2denseid + multi_task_dense_tables_push = dense_tables + multi_task_dense_tables_pull = dense_tables[:] if strategy.get('dense_table') is not None: - server.add_dense_table(dense_table_index, params, grads, - strategy['dense_table'], - sparse_table_names) + if flag_multi_task: + server_dense_table_index = dense_table_index + if len(root_params_list) > 0: + server.add_dense_table( + server_dense_table_index, root_params_list, + root_grads_list, strategy['dense_table'], + sparse_table_names) + server_dense_table_index += 1 + + for i in range(len(lists_params)): + server.add_dense_table( + server_dense_table_index, lists_params[i], + lists_grads[i], strategy['dense_table'], + sparse_table_names) + server_dense_table_index += 1 + else: + server.add_dense_table( + dense_table_index, params, grads, + strategy['dense_table'], sparse_table_names) + else: server.add_dense_table(dense_table_index, params, grads, None, sparse_table_names) - worker.add_dense_table( - dense_table_index, self._learning_rate, params, grads, - dense_start_table_id, sparse_table_names) + + if flag_multi_task: + + if len(root_params_list) > 0: + worker.add_dense_table( + dense_table_index, self._learning_rate, + root_params_list, root_grads_list, + dense_start_table_id, sparse_table_names) + dense_table_index += 1 + + for i in range(len(lists_params)): + worker.add_dense_table( + dense_table_index, self._learning_rate, + lists_params[i], lists_grads[i], + dense_start_table_id, sparse_table_names) + dense_table_index += 1 + + dense_table_index -= 1 + else: + worker.add_dense_table( + dense_table_index, self._learning_rate, params, + grads, dense_start_table_id, sparse_table_names) if FLEET_GLOBAL_DICT["enable"]: cur_prog = losses[loss_index].block.program @@ -430,15 +610,28 @@ class DistributedAdam(DistributedOptimizerImplBase): program_id] and "push_dense" in program_configs[ program_id] and len(program_configs[program_id][ "pull_dense"]) > 0: - program_configs[program_id]["pull_dense"].extend( - [dense_table_index]) - program_configs[program_id]["push_dense"].extend( - [dense_table_index]) + if flag_multi_task: + program_configs[program_id]["pull_dense"].extend( + multi_task_dense_tables_pull) + program_configs[program_id]["push_dense"].extend( + multi_task_dense_tables_push) + else: + program_configs[program_id]["pull_dense"].extend( + [dense_table_index]) + program_configs[program_id]["push_dense"].extend( + [dense_table_index]) else: - program_configs[program_id][ - "pull_dense"] = [dense_table_index] - program_configs[program_id][ - "push_dense"] = [dense_table_index] + if flag_multi_task: + program_configs[program_id][ + "pull_dense"] = multi_task_dense_tables_pull + program_configs[program_id][ + "push_dense"] = multi_task_dense_tables_push + else: + program_configs[program_id][ + "pull_dense"] = [dense_table_index] + program_configs[program_id][ + "push_dense"] = [dense_table_index] + if len(data_norm_params) != 0 and len(data_norm_grads) != 0: dense_table_index += 1 if strategy.get('datanorm_table') is not None: