Unverified commit 5a83496c authored by zhang wenhui, committed by GitHub

Multi task (#26002)

* add multitask

* add multitask, test=develop

* fix code style, test=develop

* add partial push dense, test=develop

* fix has_key in py3, test=develop

* fix, test=develop

* fix, test=develop

* fix, test=develop
Parent 7a58431c
......@@ -247,7 +247,8 @@ cc_library(parallel_executor SRCS parallel_executor.cc DEPS
graph build_strategy collective_helper
fast_threaded_ssa_graph_executor variable_helper)
cc_test(dist_multi_trainer_test SRCS dist_multi_trainer_test.cc DEPS executor)
cc_test(dist_multi_trainer_test SRCS dist_multi_trainer_test.cc DEPS
conditional_block_op executor)
cc_library(prune SRCS prune.cc DEPS framework_proto boost)
cc_test(prune_test SRCS prune_test.cc DEPS op_info prune recurrent_op device_context)
cc_test(var_type_inference_test SRCS var_type_inference_test.cc DEPS op_registry
......
......@@ -19,6 +19,7 @@ limitations under the License. */
#include <map>
#include <memory>
#include <mutex> // NOLINT
#include <set>
#include <string>
#include <thread> // NOLINT
#include <unordered_map> // NOLINT
......@@ -313,6 +314,10 @@ class DownpourWorker : public HogwildWorker {
std::map<uint64_t, std::vector<std::string>> dense_value_names_;
std::map<uint64_t, uint64_t> table_dependency_;
std::vector<std::pair<uint64_t, uint64_t>> copy_dense_tables_;
// multitask
std::map<int32_t, uint64_t> cond2table_map_;
std::set<uint64_t> condvalue_set_;
bool flag_partial_push_;
private:
// std::vector<std::string> dump_param_;
......
......@@ -12,6 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <cstdlib>
#include <ctime>
#include "paddle/fluid/framework/device_worker.h"
#include "paddle/fluid/platform/cpu_helper.h"
......@@ -65,6 +67,13 @@ void DownpourWorker::Initialize(const TrainerDesc& desc) {
}
}
flag_partial_push_ = false;
for (auto& m : param_.program_config(0).partial_pushdense_condtable_map()) {
cond2table_map_[m.key()] = m.value();
condvalue_set_.insert(m.value());
flag_partial_push_ = true;
}
skip_ops_.resize(param_.skip_ops_size());
for (int i = 0; i < param_.skip_ops_size(); ++i) {
skip_ops_[i] = param_.skip_ops(i);
......@@ -876,14 +885,42 @@ void DownpourWorker::TrainFiles() {
#endif
if (need_to_push_dense_) {
for (int i = 0; i < param_.program_config(0).push_dense_table_id_size();
++i) {
uint64_t tid = static_cast<uint64_t>(
param_.program_config(0).push_dense_table_id(i));
fleet_ptr_->PushDenseVarsAsync(
*thread_scope_, tid, dense_grad_names_[tid], &push_sparse_status_,
scale_datanorm_, cur_batch);
if (flag_partial_push_) {
Variable* var = (*thread_scope_).FindVar("cond_tag");
LoDTensor* tensor = var->GetMutable<LoDTensor>();
// check type in python code
int64_t* cond_value_batch = tensor->data<int64_t>();
for (int i = 0; i < param_.program_config(0).push_dense_table_id_size();
++i) {
uint64_t tid = static_cast<uint64_t>(
param_.program_config(0).push_dense_table_id(i));
if (condvalue_set_.find(tid) != condvalue_set_.end()) {
// common dense table must push dense
if (cond2table_map_[cond_value_batch[0]] != tid) {
// can't push dense
continue;
}
}
VLOG(3) << "push multitask dense gradient " << tid;
fleet_ptr_->PushDenseVarsAsync(
*thread_scope_, tid, dense_grad_names_[tid], &push_sparse_status_,
scale_datanorm_, cur_batch);
}
} else {
for (int i = 0; i < param_.program_config(0).push_dense_table_id_size();
++i) {
uint64_t tid = static_cast<uint64_t>(
param_.program_config(0).push_dense_table_id(i));
fleet_ptr_->PushDenseVarsAsync(
*thread_scope_, tid, dense_grad_names_[tid], &push_sparse_status_,
scale_datanorm_, cur_batch);
}
}
VLOG(3) << "push dense gradient done.";
// the following code should be more precise and clean
......
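The new branch above decides, per batch, which dense tables to push: a table that is not governed by any cond value (a shared table) is always pushed, while a cond-specific table is pushed only when the batch's cond_tag selects it. A minimal Python sketch of that decision, assuming cond2table_map and condvalue_set mirror the worker members built in Initialize and cond_value is the first element of the cond_tag tensor (names and ids are illustrative only, not Paddle API):

# Illustrative sketch of the per-batch push decision implemented by the C++ loop above.
def tables_to_push(push_dense_table_ids, cond2table_map, condvalue_set, cond_value):
    selected = []
    for tid in push_dense_table_ids:
        if tid in condvalue_set and cond2table_map.get(cond_value) != tid:
            # a cond-specific table that this batch's cond_tag did not select
            continue
        selected.append(tid)
    return selected

# e.g. table 0 is shared, tables 1 and 2 belong to cond values 0 and 1:
# tables_to_push([0, 1, 2], {0: 1, 1: 2}, {1, 2}, cond_value=0) -> [0, 1]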
......@@ -15,6 +15,7 @@ limitations under the License. */
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/device_worker.h"
#include "paddle/fluid/framework/device_worker_factory.h"
#include "paddle/fluid/operators/controlflow/conditional_block_op_helper.h"
#include "paddle/fluid/operators/distributed/distributed.h"
#include "paddle/fluid/platform/cpu_helper.h"
#include "paddle/fluid/platform/lodtensor_printer.h"
......@@ -47,6 +48,8 @@ void HogwildWorker::CreateThreadOperators(const ProgramDesc &program) {
ops_.push_back(local_op_ptr);
continue;
}
operators::PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp(
program, 0, ops_);
}
void HogwildWorker::CreateThreadScope(const ProgramDesc &program) {
......
......@@ -148,12 +148,17 @@ message CopyTableConfig {
repeated TableDependencyMap table_denpendency_map = 12;
}
message CondTableMap {
required int32 key = 1;
required int32 value = 2;
}
message ProgramConfig {
required string program_id = 1;
repeated int32 push_sparse_table_id = 2;
repeated int32 push_dense_table_id = 3;
repeated int32 pull_sparse_table_id = 4;
repeated int32 pull_dense_table_id = 5;
repeated CondTableMap partial_pushdense_condtable_map = 10;
}
message PullDenseWorkerParameter {
......
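The new repeated partial_pushdense_condtable_map field is read by DownpourWorker::Initialize above and populated from the DownpourSGD device worker shown further below. A hedged sketch of filling it directly with the generated protobuf classes; the module path and the ids are assumptions for illustration only:

# Sketch under assumptions: the generated module path and the table ids are illustrative.
from paddle.fluid.proto import trainer_desc_pb2

pc = trainer_desc_pb2.ProgramConfig()
pc.program_id = "0"                      # required field, hypothetical value
pc.push_dense_table_id.extend([1, 2])
for cond_value, dense_table_id in {0: 1, 1: 2}.items():
    entry = pc.partial_pushdense_condtable_map.add()
    entry.key = cond_value               # cond value produced by the equal/fill_constant pair
    entry.value = dense_table_id         # dense table that this cond value selects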
include(operators)
register_operators(DEPS naive_executor)
register_operators(EXCLUDES conditional_block_op DEPS naive_executor)
cc_library(conditional_block_op SRCS conditional_block_op.cc DEPS executor)
cc_library(op_variant SRCS op_variant.cc DEPS operator proto_desc)
cc_library(conditional_block_op_helper SRCS conditional_block_op_helper.cc DEPS operator op_variant conditional_block_op)
cc_library(recurrent_op_helper SRCS recurrent_op_helper.cc DEPS operator op_variant recurrent_op)
......
......@@ -162,6 +162,32 @@ void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp(
PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOpImpl(
program, &fwd_ops, &bwd_ops);
}
void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp(
const framework::ProgramDesc &program, int block_id,
const std::vector<framework::OperatorBase *> &all_ops) {
// If block_id is not 0, return directly.
// This is because all conditional_block_ops and conditional_block_grad_ops
// in the whole program are processed when block_id is 0 (i.e.,
// when Executor::Run() runs or the ParallelExecutor is constructed).
// Moreover, all conditional_block_ops and conditional_block_grad_ops
// must be processed when block_id is zero; otherwise a conditional_block_op
// may run first and erase variables used by its conditional_block_grad_op,
// while at that moment the conditional_block_grad_ops may not be constructed yet.
if (block_id != 0) return;
std::vector<OpVariant> fwd_ops, bwd_ops;
for (auto *op : all_ops) {
if (op->Type() == "conditional_block") {
fwd_ops.emplace_back(op);
} else if (op->Type() == "conditional_block_grad") {
bwd_ops.emplace_back(op);
}
}
PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOpImpl(
program, &fwd_ops, &bwd_ops);
}
void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp(
const framework::ProgramDesc &program,
......
......@@ -33,6 +33,10 @@ void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp(
const framework::ProgramDesc &program, int block_id,
const std::vector<std::unique_ptr<framework::OperatorBase>> &all_ops);
void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp(
const framework::ProgramDesc &program, int block_id,
const std::vector<framework::OperatorBase *> &all_ops);
void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp(
const framework::ProgramDesc &program,
const std::vector<framework::OperatorBase *> &ifelse_ops,
......
......@@ -221,6 +221,13 @@ class DownpourSGD(DeviceWorker):
for i in program_configs[program_id]["pull_dense"]:
pc.pull_dense_table_id.extend([i])
dense_table_set.add(i)
# configure partial push of dense tables, e.g. for multitask
if "cond2denseid" in program_configs[program_id]:
cond2denseid = program_configs[program_id]["cond2denseid"]
for key, value in cond2denseid.items():
mc_map = pc.partial_pushdense_condtable_map.add()
mc_map.key = key
mc_map.value = value
break
trainer_desc.device_worker_name = opt_info.get("worker_class",
......
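The loop above consumes program_configs[program_id]["cond2denseid"], which _generate_multi_dense_table in the optimizer below produces. An illustrative shape of one program_configs entry after multitask splitting (all ids are hypothetical; only keys that appear in this diff are shown):

# Hypothetical example of a program_configs entry with multitask dense tables.
program_id = "0"
program_configs = {
    program_id: {
        "pull_dense": [1, 2, 3],        # 1 = shared (root) table, 2/3 = per-cond tables
        "push_dense": [1, 2, 3],
        "cond2denseid": {0: 2, 1: 3},   # cond value -> dense table id
    },
}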
......@@ -167,6 +167,113 @@ class DistributedAdam(DistributedOptimizerImplBase):
ret_list.append(x[0])
return ret_list
def _if_last_block(self, op, _equal_dict):
# for conditional_block op
cond_str = op.input('Cond')[0]
bool_test = False
if cond_str.startswith('equal'):
bool_test = True
vars_ = op.input('Input')
equal_keys = _equal_dict.keys()
for var_cond in vars_:
if var_cond in equal_keys:
if bool_test:
print("the conditional block is error")
return False
return True
def _generte_cond_para_map(self, op, _fill_value_dict, _equal_fill_dict,
_now_program, _all_params):
# generate cond value to parameter map recursively
cond_str = op.input('Cond')[0]
vars_ = op.input('Input')
if self._if_last_block(op, _equal_fill_dict):
vars_ = op.input('Input')
cond_key = ""
if cond_str.startswith('equal'):
cond_key = int(_fill_value_dict[_equal_fill_dict[cond_str]])
else:
cond_key = -1
p_list = []
for var_cond in vars_:
if var_cond in _all_params:
p_list.append(var_cond)
self._cond_params[cond_key] = p_list
self._other_params.extend(p_list)
else:
ops_cond = _now_program.block(int(op.attr('sub_block').id)).ops
for op in ops_cond:
if op.type == 'conditional_block':
self._generte_cond_para_map(op, _fill_value_dict,
_equal_fill_dict, _now_program,
_all_params)
def _has_conditional_block(self, loss):
now_program = loss.block.program
root_block = now_program.block(0)
ops_ = root_block.ops
for op in ops_:
if op.type == 'conditional_block':
return True
return False
def _check_params_grads(self, params, grads):
if len(params) != len(grads):
raise ValueError("params size != grads size, %s vs %s" %
(len(params), len(grads)))
pname2grad = dict()
for i in range(len(params)):
pname = params[i].name
gname = grads[i].name
if pname != gname[:-5]:
raise ValueError(" params != grads , %s vs %s" % (pname, gname))
pname2grad[pname] = grads[i]
return pname2grad
def _generate_multi_dense_table(self,
params,
grads,
cond_params,
other_params,
sparse_table_names,
dense_table_id=0):
# generate multi dense table by cond value
pname2grad = self._check_params_grads(params, grads)
root_params_list = []
root_grads_list = []
dense_tables = []
for i, p in enumerate(params):
if p.name not in other_params and p.name not in sparse_table_names:
root_params_list.append(p)
root_grads_list.append(grads[i])
if len(root_params_list) > 0:
dense_tables.append(dense_table_id)
dense_table_id += 1
lists_params = [[] for i in range(len(cond_params.keys()))]
lists_grads = [[] for i in range(len(cond_params.keys()))]
key_id = 0
name2key = dict()
cond2denseid = dict()
for key, value in cond_params.items():
cond2denseid[key] = dense_table_id
dense_tables.append(dense_table_id)
dense_table_id += 1
for v in value:
name2key[v] = key_id
key_id += 1
for p in params:
if p.name in other_params:
lists_params[name2key[p.name]].append(p)
lists_grads[name2key[p.name]].append(pname2grad[p.name])
return dense_tables, cond2denseid, lists_params, lists_grads, root_params_list, root_grads_list
def _minimize(self,
losses,
startup_program=None,
......@@ -215,6 +322,31 @@ class DistributedAdam(DistributedOptimizerImplBase):
no_grad_set),
key=lambda x: x[0].name)
# the presence of a conditional_block op means multi-task
flag_multi_task = self._has_conditional_block(loss)
if flag_multi_task:
self._cond_params = dict()
self._other_params = []
now_program = loss.block.program
root_block = now_program.block(0)
all_params = []
for par in root_block.all_parameters():
all_params.append(par.name)
ops_ = root_block.ops
fill_value_dict = dict()
equal_fill_dict = dict()
for op in ops_:
# a conditional_block op must have matching fill_constant and equal ops
if op.type == 'fill_constant':
fill_value_dict[op.output('Out')[0]] = op.attr('value')
if op.type == 'equal':
equal_fill_dict[op.output('Out')[0]] = op.input('Y')[0]
if op.type == 'conditional_block':
self._generte_cond_para_map(op, fill_value_dict,
equal_fill_dict,
now_program, all_params)
if prog_id not in program_id_set:
program_id_set.add(prog_id)
sparse_table = self._find_multi_distributed_lookup_table([loss])
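The detection loop above assumes each task branch is expressed as fill_constant -> equal -> conditional_block: fill_value_dict maps a fill_constant output name to its constant value, equal_fill_dict maps an equal output (the block's Cond input) back to that fill_constant, and _generte_cond_para_map then resolves each conditional_block to an integer cond key via int(fill_value_dict[equal_fill_dict[cond_str]]). Illustrative contents of those dicts, with made-up variable names:

# Hypothetical op-output names for two task branches (cond values 0 and 1).
fill_value_dict = {"fill_constant_0.tmp_0": 0.0, "fill_constant_1.tmp_0": 1.0}
equal_fill_dict = {"equal_0.tmp_0": "fill_constant_0.tmp_0",
                   "equal_1.tmp_0": "fill_constant_1.tmp_0"}
# For a conditional_block whose Cond input is "equal_1.tmp_0":
cond_key = int(fill_value_dict[equal_fill_dict["equal_1.tmp_0"]])   # -> 1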
......@@ -402,17 +534,65 @@ class DistributedAdam(DistributedOptimizerImplBase):
data_norm_grads.append(i[1])
if not is_data_norm_data:
grads.append(i[1])
# for new dense table
multi_task_dense_tables_push = []
multi_task_dense_tables_pull = []
if flag_multi_task:
dense_tables, cond2denseid, lists_params, lists_grads, root_params_list, root_grads_list = self._generate_multi_dense_table(
params, grads, self._cond_params,
self._other_params, sparse_table_names,
dense_table_index)
program_configs[program_id][
'cond2denseid'] = cond2denseid
multi_task_dense_tables_push = dense_tables
multi_task_dense_tables_pull = dense_tables[:]
if strategy.get('dense_table') is not None:
server.add_dense_table(dense_table_index, params, grads,
strategy['dense_table'],
sparse_table_names)
if flag_multi_task:
server_dense_table_index = dense_table_index
if len(root_params_list) > 0:
server.add_dense_table(
server_dense_table_index, root_params_list,
root_grads_list, strategy['dense_table'],
sparse_table_names)
server_dense_table_index += 1
for i in range(len(lists_params)):
server.add_dense_table(
server_dense_table_index, lists_params[i],
lists_grads[i], strategy['dense_table'],
sparse_table_names)
server_dense_table_index += 1
else:
server.add_dense_table(
dense_table_index, params, grads,
strategy['dense_table'], sparse_table_names)
else:
server.add_dense_table(dense_table_index, params, grads,
None, sparse_table_names)
worker.add_dense_table(
dense_table_index, self._learning_rate, params, grads,
dense_start_table_id, sparse_table_names)
if flag_multi_task:
if len(root_params_list) > 0:
worker.add_dense_table(
dense_table_index, self._learning_rate,
root_params_list, root_grads_list,
dense_start_table_id, sparse_table_names)
dense_table_index += 1
for i in range(len(lists_params)):
worker.add_dense_table(
dense_table_index, self._learning_rate,
lists_params[i], lists_grads[i],
dense_start_table_id, sparse_table_names)
dense_table_index += 1
dense_table_index -= 1
else:
worker.add_dense_table(
dense_table_index, self._learning_rate, params,
grads, dense_start_table_id, sparse_table_names)
if FLEET_GLOBAL_DICT["enable"]:
cur_prog = losses[loss_index].block.program
......@@ -430,15 +610,28 @@ class DistributedAdam(DistributedOptimizerImplBase):
program_id] and "push_dense" in program_configs[
program_id] and len(program_configs[program_id][
"pull_dense"]) > 0:
program_configs[program_id]["pull_dense"].extend(
[dense_table_index])
program_configs[program_id]["push_dense"].extend(
[dense_table_index])
if flag_multi_task:
program_configs[program_id]["pull_dense"].extend(
multi_task_dense_tables_pull)
program_configs[program_id]["push_dense"].extend(
multi_task_dense_tables_push)
else:
program_configs[program_id]["pull_dense"].extend(
[dense_table_index])
program_configs[program_id]["push_dense"].extend(
[dense_table_index])
else:
program_configs[program_id][
"pull_dense"] = [dense_table_index]
program_configs[program_id][
"push_dense"] = [dense_table_index]
if flag_multi_task:
program_configs[program_id][
"pull_dense"] = multi_task_dense_tables_pull
program_configs[program_id][
"push_dense"] = multi_task_dense_tables_push
else:
program_configs[program_id][
"pull_dense"] = [dense_table_index]
program_configs[program_id][
"push_dense"] = [dense_table_index]
if len(data_norm_params) != 0 and len(data_norm_grads) != 0:
dense_table_index += 1
if strategy.get('datanorm_table') is not None:
......
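Taken together, _generate_multi_dense_table splits the dense parameters into one "root" dense table for parameters shared by all tasks plus one dense table per cond value, and cond2denseid is what later tells the worker which of those tables a batch is allowed to push. A pure-Python sketch of that grouping on hypothetical parameter names (no Paddle API involved; the starting table id is arbitrary):

# Hypothetical inputs: parameter names grouped by the cond branch that owns them.
cond_params = {0: ["task0_fc.w_0", "task0_fc.b_0"], 1: ["task1_fc.w_0"]}
other_params = [p for plist in cond_params.values() for p in plist]
all_params = ["shared_emb.w_0"] + other_params

dense_table_id = 1
dense_tables, cond2denseid = [], {}
root_params = [p for p in all_params if p not in other_params]   # shared table members
if root_params:
    dense_tables.append(dense_table_id)
    dense_table_id += 1
for cond_value in cond_params:                                   # one table per cond value
    cond2denseid[cond_value] = dense_table_id
    dense_tables.append(dense_table_id)
    dense_table_id += 1
# dense_tables -> [1, 2, 3]; cond2denseid -> {0: 2, 1: 3}; root_params -> ["shared_emb.w_0"]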