diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h
index c278980adc93290287ccb89fb874e486aba821cb..34702df5291434d3e7a22e742bd84738bfec64bc 100644
--- a/paddle/fluid/framework/device_worker.h
+++ b/paddle/fluid/framework/device_worker.h
@@ -122,9 +122,11 @@ class DeviceWorker {
   virtual void SetReaderPlace(const paddle::platform::Place& place) {
     device_reader_->SetPlace(place);
   }
+  virtual Scope* GetThreadScope() { return thread_scope_; }

 protected:
   Scope* root_scope_ = nullptr;
+  Scope* thread_scope_;
   paddle::platform::Place place_;
   DataFeed* device_reader_ = nullptr;
   int64_t batch_num_;
@@ -156,15 +158,18 @@ class HogwildWorker : public CPUWorkerBase {
   virtual void PrintFetchVars();
   virtual void CreateDeviceResource(const ProgramDesc& main_prog);
   virtual void BindingDataFeedMemory();
+  template <typename T>
+  void SetZero(LoDTensor* tensor, LoDTensor* root_tensor, int tensor_dim);

 protected:
   void CreateThreadOperators(const ProgramDesc& program);
   void CreateThreadScope(const ProgramDesc& program);
   std::vector<std::string> op_names_;
   std::vector<OperatorBase*> ops_;
-  Scope* thread_scope_;
+  // Scope* thread_scope_;
   HogwildWorkerParameter param_;
   std::vector<std::string> skip_ops_;
+  std::map<std::string, int> stat_var_name_map_;
 };

 class DownpourWorker : public HogwildWorker {
diff --git a/paddle/fluid/framework/dist_multi_trainer.cc b/paddle/fluid/framework/dist_multi_trainer.cc
index 20be90e622346d66ebf30b323e35b8984fdcd31d..c4f13975b7e42f0d89f440381603a70164352aae 100644
--- a/paddle/fluid/framework/dist_multi_trainer.cc
+++ b/paddle/fluid/framework/dist_multi_trainer.cc
@@ -23,8 +23,8 @@ limitations under the License. */
 namespace paddle {
 namespace framework {

-void DistMultiTrainer::Initialize(const TrainerDesc& trainer_desc,
-                                  Dataset* dataset) {
+void DistMultiTrainer::Initialize(const TrainerDesc &trainer_desc,
+                                  Dataset *dataset) {
   thread_num_ = trainer_desc.thread_num();
   SetDataset(dataset);

@@ -35,17 +35,22 @@ void DistMultiTrainer::Initialize(const TrainerDesc& trainer_desc,
     need_dump_field_ = true;
   }
   if (need_dump_field_) {
-    auto& file_list = dataset->GetFileList();
+    auto &file_list = dataset->GetFileList();
     if (file_list.size() == 0) {
       need_dump_field_ = false;
     }
   }
   mpi_rank_ = trainer_desc.mpi_rank() / 2;
-  const std::vector<paddle::framework::DataFeed*> readers =
+  const std::vector<paddle::framework::DataFeed *> readers =
       dataset->GetReaders();
   thread_num_ = readers.size();
   workers_.resize(thread_num_);
+  for (int i = 0; i < trainer_desc.downpour_param().stat_var_names_size();
+       i++) {
+    need_merge_var_names_.push_back(
+        trainer_desc.downpour_param().stat_var_names(i));
+  }

   for (int i = 0; i < thread_num_; ++i) {
     workers_[i] = DeviceWorkerFactory::CreateDeviceWorker(
@@ -104,7 +109,7 @@ void DistMultiTrainer::FinalizeDumpEnv() {
   queue_.reset();
 }

-void DistMultiTrainer::InitOtherEnv(const ProgramDesc& main_program) {
+void DistMultiTrainer::InitOtherEnv(const ProgramDesc &main_program) {
   if (need_dump_field_) {
     InitDumpEnv();
   }
@@ -126,9 +131,33 @@ void DistMultiTrainer::Run() {
 }

 void DistMultiTrainer::Finalize() {
-  for (auto& th : threads_) {
+  for (auto &th : threads_) {
     th.join();
   }
+  for (int i = 0; i < need_merge_var_names_.size(); i++) {
+    Variable *root_var = root_scope_->FindVar(need_merge_var_names_[i]);
+    if (root_var == nullptr) {
+      continue;
+    }
+    LoDTensor *root_tensor = root_var->GetMutable<LoDTensor>();
+    for (int j = 1; j < thread_num_; j++) {
+      Scope *cur_thread_scope = workers_[j]->GetThreadScope();
+      Variable *thread_var =
+          cur_thread_scope->FindVar(need_merge_var_names_[i]);
+      LoDTensor *thread_tensor = thread_var->GetMutable<LoDTensor>();
+      if (root_tensor->numel() != thread_tensor->numel()) {
+        continue;
+      }
+#define MergeCallback(cpp_type, proto_type)                    \
+  do {                                                         \
+    if (root_tensor->type() == proto_type) {                   \
+      MergeToRootScope<cpp_type>(root_tensor, thread_tensor);  \
+    }                                                          \
+  } while (0)
+      _ForEachDataType_(MergeCallback);
+    }
+  }
+
   if (need_dump_field_) {
     FinalizeDumpEnv();
   }
@@ -136,5 +165,14 @@ void DistMultiTrainer::Finalize() {
   root_scope_->DropKids();
 }

+template <typename T>
+void DistMultiTrainer::MergeToRootScope(LoDTensor *root_tensor,
+                                        LoDTensor *tensor) {
+  T *root_data = root_tensor->data<T>();
+  T *data = tensor->data<T>();
+  for (int i = 0; i < tensor->numel(); i++) {
+    root_data[i] += data[i];
+  }
+}
 }  // end namespace framework
 }  // end namespace paddle
diff --git a/paddle/fluid/framework/downpour_worker.cc b/paddle/fluid/framework/downpour_worker.cc
index 113de8020379bd8e44a736a42a39267d4f0614c3..0a54ef4be51447bc3184eea737050a04226ee805 100644
--- a/paddle/fluid/framework/downpour_worker.cc
+++ b/paddle/fluid/framework/downpour_worker.cc
@@ -64,6 +64,10 @@ void DownpourWorker::Initialize(const TrainerDesc& desc) {
     skip_ops_[i] = param_.skip_ops(i);
   }

+  for (int i = 0; i < param_.stat_var_names_size(); ++i) {
+    stat_var_name_map_[param_.stat_var_names(i)] = 1;
+  }
+
   need_to_push_sparse_ = param_.push_sparse();
   need_to_push_dense_ = param_.push_dense();
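Note on the Finalize() change above: after the worker threads join, the trainer
sums every thread-local copy of each stat var back into the root scope. The
_ForEachDataType_ macro expands MergeCallback once per supported dtype, so
MergeToRootScope<T> runs with the C++ type that matches the tensor's runtime
type. A minimal numpy sketch of the intended semantics (illustrative names
only, not Paddle API):

    import numpy as np

    def merge_to_root_scope(root_tensor, thread_tensors):
        # Element-wise sum of per-thread stats into the root tensor,
        # mirroring root_data[i] += data[i] in MergeToRootScope.
        for t in thread_tensors:
            if t.size != root_tensor.size:  # the numel() guard above
                continue
            root_tensor += t
        return root_tensor

    root = np.array([10.0, 20.0, 30.0])        # stats held by thread 0
    rest = [np.array([1.0, 2.0, 3.0]),         # stats from threads 1..n-1
            np.array([4.0, 5.0, 6.0])]
    print(merge_to_root_scope(root, rest))     # [15. 27. 39.]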
diff --git a/paddle/fluid/framework/hogwild_worker.cc b/paddle/fluid/framework/hogwild_worker.cc
index a006a0fa174f7c0d611e95e3c36d11a8658f8582..4aaf2569eb4eed72fc521d3861077d0b3653e625 100644
--- a/paddle/fluid/framework/hogwild_worker.cc
+++ b/paddle/fluid/framework/hogwild_worker.cc
@@ -12,6 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License. */

+#include "paddle/fluid/framework/data_type.h"
 #include "paddle/fluid/framework/device_worker.h"
 #include "paddle/fluid/framework/device_worker_factory.h"
 #include "paddle/fluid/platform/cpu_helper.h"
@@ -20,7 +21,7 @@ limitations under the License. */
 namespace paddle {
 namespace framework {

-void HogwildWorker::Initialize(const TrainerDesc& desc) {
+void HogwildWorker::Initialize(const TrainerDesc &desc) {
   fetch_config_ = desc.fetch_config();
   param_ = desc.hogwild_param();
   skip_ops_.resize(param_.skip_ops_size());
@@ -30,45 +31,70 @@ void HogwildWorker::Initialize(const TrainerDesc& desc) {
   use_cvm_ = desc.use_cvm();
 }

-void HogwildWorker::CreateThreadOperators(const ProgramDesc& program) {
-  auto& block = program.Block(0);
+void HogwildWorker::CreateThreadOperators(const ProgramDesc &program) {
+  auto &block = program.Block(0);
   op_names_.clear();
-  for (auto& op_desc : block.AllOps()) {
+  for (auto &op_desc : block.AllOps()) {
     std::unique_ptr<OperatorBase> local_op = OpRegistry::CreateOp(*op_desc);
     op_names_.push_back(op_desc->Type());
-    OperatorBase* local_op_ptr = local_op.release();
+    OperatorBase *local_op_ptr = local_op.release();
     ops_.push_back(local_op_ptr);
     continue;
   }
 }

-void HogwildWorker::CreateThreadScope(const ProgramDesc& program) {
-  auto& block = program.Block(0);
+void HogwildWorker::CreateThreadScope(const ProgramDesc &program) {
+  auto &block = program.Block(0);
   PADDLE_ENFORCE_NOT_NULL(
       root_scope_, "root_scope should be set before creating thread scope");
   thread_scope_ = &root_scope_->NewScope();
-  for (auto& var : block.AllVars()) {
+
+  for (auto &var : block.AllVars()) {
     if (var->Persistable()) {
-      auto* ptr = root_scope_->Var(var->Name());
+      auto *ptr = root_scope_->Var(var->Name());
       InitializeVariable(ptr, var->GetType());
+      if (stat_var_name_map_.find(var->Name()) != stat_var_name_map_.end() &&
+          thread_id_ != 0) {
+        int tensor_dim =
+            root_scope_->FindVar(var->Name())->GetMutable<LoDTensor>()->numel();
+        auto *ptr1 = thread_scope_->Var(var->Name());
+        InitializeVariable(ptr1, var->GetType());
+        LoDTensor *thread_tensor = ptr1->GetMutable<LoDTensor>();
+        LoDTensor *root_tensor =
+            root_scope_->FindVar(var->Name())->GetMutable<LoDTensor>();
+#define MemsetCallback(cpp_type, proto_type)                     \
+  do {                                                           \
+    if (root_tensor->type() == proto_type) {                     \
+      SetZero<cpp_type>(thread_tensor, root_tensor, tensor_dim); \
+    }                                                            \
+  } while (0)
+        _ForEachDataType_(MemsetCallback);
+      }
     } else {
-      auto* ptr = thread_scope_->Var(var->Name());
+      auto *ptr = thread_scope_->Var(var->Name());
       InitializeVariable(ptr, var->GetType());
     }
   }
 }

+template <typename T>
+void HogwildWorker::SetZero(LoDTensor *tensor, LoDTensor *root_tensor,
+                            int tensor_dim) {
+  T *ptr = tensor->mutable_data<T>(root_tensor->dims(), platform::CPUPlace());
+  memset(ptr, 0, sizeof(T) * tensor_dim);
+}
+
 void HogwildWorker::BindingDataFeedMemory() {
-  const std::vector<std::string>& input_feed =
+  const std::vector<std::string> &input_feed =
       device_reader_->GetUseSlotAlias();
   for (auto name : input_feed) {
     device_reader_->AddFeedVar(thread_scope_->FindVar(name), name);
   }
 }

-void HogwildWorker::CreateDeviceResource(const ProgramDesc& main_prog) {
+void HogwildWorker::CreateDeviceResource(const ProgramDesc &main_prog) {
   CreateThreadScope(main_prog);
   CreateThreadOperators(main_prog);
 }
@@ -78,7 +104,7 @@ void HogwildWorker::TrainFilesWithProfiler() {
   device_reader_->Start();
   std::vector<double> op_total_time;
   std::vector<std::string> op_name;
-  for (auto& op : ops_) {
+  for (auto &op : ops_) {
     op_name.push_back(op->Type());
   }
   op_total_time.resize(ops_.size());
@@ -141,7 +167,7 @@ void HogwildWorker::TrainFiles() {
   device_reader_->Start();
   int cur_batch;
   while ((cur_batch = device_reader_->Next()) > 0) {
-    for (auto& op : ops_) {
+    for (auto &op : ops_) {
       bool need_skip = false;
       for (auto t = 0u; t < skip_ops_.size(); ++t) {
         if (op->Type().find(skip_ops_[t]) != std::string::npos) {
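Note on the CreateThreadScope() change above: stat vars are persistable, so
they used to live only in the root scope and were shared by every Hogwild
thread. Now each thread other than thread 0 gets a private copy, zero-filled
by SetZero<T> with the root tensor's shape and dtype, so threads accumulate
independently and the trainer merges the copies at shutdown. A small sketch of
the idea (illustrative names, not Paddle API):

    import numpy as np

    def create_thread_copy(root_tensor, thread_id):
        # Thread 0 keeps writing the root tensor directly; the others start
        # from a zeroed private buffer (the memset inside SetZero<T>).
        if thread_id == 0:
            return root_tensor
        return np.zeros_like(root_tensor)

    root = np.array([1.0, 2.0], dtype=np.float32)
    copies = [create_thread_copy(root, tid) for tid in range(3)]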
diff --git a/paddle/fluid/framework/multi_trainer.cc b/paddle/fluid/framework/multi_trainer.cc
index f81948c4daec181406cf3fcf0ce6789277ce10e3..be25672b4c7d29bc3bb7eca039a3c735994f0777 100644
--- a/paddle/fluid/framework/multi_trainer.cc
+++ b/paddle/fluid/framework/multi_trainer.cc
@@ -24,6 +24,11 @@ namespace framework {
 void MultiTrainer::Initialize(const TrainerDesc& trainer_desc,
                               Dataset* dataset) {
   thread_num_ = trainer_desc.thread_num();
+  for (int i = 0; i < trainer_desc.downpour_param().stat_var_names_size();
+       i++) {
+    need_merge_var_names_.push_back(
+        trainer_desc.downpour_param().stat_var_names(i));
+  }
   SetDataset(dataset);
   // get filelist from trainer_desc here
   const std::vector<paddle::framework::DataFeed*> readers =
diff --git a/paddle/fluid/framework/trainer.h b/paddle/fluid/framework/trainer.h
index a4913cb26a9b913c55e14185c11e6383ea9b4b63..170ceb50fda20501fe03de170568043de71af3cc 100644
--- a/paddle/fluid/framework/trainer.h
+++ b/paddle/fluid/framework/trainer.h
@@ -76,6 +76,7 @@ class MultiTrainer : public TrainerBase {
   std::vector<std::thread> threads_;
   std::vector<DataFeed*> readers_;
   std::vector<std::shared_ptr<DeviceWorker>> workers_;
+  std::vector<std::string> need_merge_var_names_;
 };

 class DistMultiTrainer : public MultiTrainer {
@@ -86,6 +87,8 @@ class DistMultiTrainer : public MultiTrainer {
   virtual void InitOtherEnv(const ProgramDesc& main_program);
   virtual void Run();
   virtual void Finalize();
+  template <typename T>
+  void MergeToRootScope(LoDTensor* root_tensor, LoDTensor* thread_tensor);
   virtual void FinalizeDumpEnv();
   virtual void InitDumpEnv();
   virtual void DumpWork();
diff --git a/paddle/fluid/framework/trainer_desc.proto b/paddle/fluid/framework/trainer_desc.proto
index e859ced328b0a7843b155af3668c2a49503a49c7..2724be65e2dedb57c3d96ea1863afb2e8cc3fbfa 100644
--- a/paddle/fluid/framework/trainer_desc.proto
+++ b/paddle/fluid/framework/trainer_desc.proto
@@ -60,6 +60,7 @@ message DownpourWorkerParameter {
   repeated ProgramConfig program_config = 4;
   optional bool push_sparse = 5 [ default = true ];
   optional bool push_dense = 6 [ default = true ];
+  repeated string stat_var_names = 7;
 }

 message SectionWorkerParameter {
diff --git a/python/paddle/fluid/device_worker.py b/python/paddle/fluid/device_worker.py
index 5eb5f3ec878064050d52020de1614e6ab7bf2d7d..c6ca201d56745c09d464227312b5b2f4d3c3ebc0 100644
--- a/python/paddle/fluid/device_worker.py
+++ b/python/paddle/fluid/device_worker.py
@@ -169,6 +169,9 @@ class DownpourSGD(DeviceWorker):
             sparse_table.fea_dim = sparse_table.emb_dim + 2
             # TODO(guru4elephant): hard code here, need to improve
             sparse_table.label_var_name = "click"
+        if opt_info["stat_var_names"]:
+            for i in opt_info["stat_var_names"]:
+                downpour.stat_var_names.extend([i])

         for i in self._fleet_desc.trainer_param.dense_table:
             if i.table_id in dense_table_set:
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py
index 2d25f466d51046ad4ef6be7b80d1708a97662e4e..57e8c31d5464c95ade159815007dd67bb01d0752 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py
@@ -246,6 +246,7 @@ class DistributedAdam(DistributedOptimizerImplBase):
         opt_info["fleet_desc"] = ps_param
         opt_info["worker_skipped_ops"] = worker_skipped_ops
         opt_info["use_cvm"] = strategy.get("use_cvm", False)
+        opt_info["stat_var_names"] = strategy.get("stat_var_names", [])
         opt_info["scale_datanorm"] = strategy.get("scale_datanorm", -1)
         opt_info["dump_slot"] = False
         opt_info["dump_converter"] = ""
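Note on the Python plumbing above: the list travels from the user's strategy
dict through opt_info["stat_var_names"] in DistributedAdam and into the new
repeated stat_var_names field of DownpourWorkerParameter, which is what
DownpourWorker::Initialize() reads into stat_var_name_map_. A hedged usage
sketch; the variable names are assumptions (with a data_norm layer they would
be its persistable stat vars, e.g. batch_size/batch_sum/batch_square_sum):

    strategy = {
        "use_cvm": True,
        "stat_var_names": ["batch_size", "batch_sum", "batch_square_sum"],
    }
    # optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
    # optimizer.minimize(loss)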
opt_info["dump_converter"] = "" diff --git a/python/paddle/fluid/incubate/fleet/utils/fleet_util.py b/python/paddle/fluid/incubate/fleet/utils/fleet_util.py index 88d874676e34daecfd0f5f303b3be57a7a3039d6..4f651ffcbee10427147d59cfcef340a9ce9ce599 100644 --- a/python/paddle/fluid/incubate/fleet/utils/fleet_util.py +++ b/python/paddle/fluid/incubate/fleet/utils/fleet_util.py @@ -1175,7 +1175,7 @@ class FleetUtil(object): local_pos_ins.name, local_total_ins.name) - # below is part of model + # below is part of example model label = fluid.layers.data(name="click", shape=[-1, 1],\ dtype="int64", lod_level=0, append_batch_size=False) emb = my_slot_net(slots, label) # emb can be fc layer of size 1 @@ -1264,12 +1264,12 @@ class FleetUtil(object): mae = global_abserr / total_ins_num rmse = math.sqrt(global_sqrerr / total_ins_num) - actual_ctr = pos_ins_num / total_ins_num + return_actual_ctr = pos_ins_num / total_ins_num predicted_ctr = global_prob / total_ins_num mean_predict_qvalue = global_q_value / total_ins_num copc = 0.0 if abs(predicted_ctr > 1e-6): - copc = actual_ctr / predicted_ctr + copc = return_actual_ctr / predicted_ctr # calculate bucket error last_ctr = -1.0 @@ -1316,8 +1316,8 @@ class FleetUtil(object): bucket_error = error_sum / error_count if error_count > 0 else 0.0 return [ - auc, bucket_error, mae, rmse, actual_ctr, predicted_ctr, copc, - mean_predict_qvalue, int(total_ins_num) + auc, bucket_error, mae, rmse, return_actual_ctr, predicted_ctr, + copc, mean_predict_qvalue, int(total_ins_num) ] def print_global_metrics(self, diff --git a/python/paddle/fluid/tests/unittests/test_downpoursgd.py b/python/paddle/fluid/tests/unittests/test_downpoursgd.py index 3ab6219c78d60e374a200c53216b230c338f9120..d1b54d5f22a3c322f874f8907bc0cf3aac13691a 100644 --- a/python/paddle/fluid/tests/unittests/test_downpoursgd.py +++ b/python/paddle/fluid/tests/unittests/test_downpoursgd.py @@ -76,6 +76,7 @@ class TestListenAndServOp(OpTest): opt_info["use_cvm"] = True opt_info["scale_datanorm"] = -1 opt_info["dump_slot"] = False + opt_info["stat_var_names"] = [] main_program._fleet_opt = opt_info trainer = DistMultiTrainer() @@ -131,6 +132,7 @@ class TestListenAndServOp(OpTest): opt_info["use_cvm"] = False opt_info["scale_datanorm"] = -1 opt_info["dump_slot"] = False + opt_info["stat_var_names"] = [] main_program._fleet_opt = opt_info trainer = DistMultiTrainer()