Unverified commit 10ca3f96 authored by yaoxuefeng, committed by GitHub

add thread scope stat accurate metrics test=develop (#19480)

* add thread scope stat accurate metrics test=develop

* fix style

* fix style

* fix style

* fix style test=develop

* fix style test=develop

* fix style test=develop

* fix style test=develop

* fix style test=develop

* fix style test=develop

* fix style test=develop

* fix conflict

* fix style

* fix style test=develop

* fix error test=develop

* fix error test=develop
Parent d6cb1a41
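
For context, the new `stat_var_names` option flows from the Python training strategy into `DownpourWorkerParameter`; every worker thread then keeps a zero-initialized private copy of each listed persistable variable, and `DistMultiTrainer::Finalize()` sums those copies back into the root scope so that accumulated statistics stay exact under multi-threaded training. A minimal sketch of setting the option, assuming the pslib fleet API of this Paddle release; the optimizer and variable names are placeholders, only the `stat_var_names` strategy key itself appears in this diff:

```python
# Hedged usage sketch: only the "stat_var_names" strategy key is taken from
# this diff; the import path, optimizer, and variable names are assumptions.
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

optimizer = fluid.optimizer.Adam(learning_rate=1e-4)
strategy = {
    "use_cvm": True,
    # persistable variables whose per-thread copies should be summed back
    # into the root scope when the trainer finalizes
    "stat_var_names": ["pos_ins_num", "total_ins_num"],
}
optimizer = fleet.distributed_optimizer(optimizer, strategy)
```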
......@@ -122,9 +122,11 @@ class DeviceWorker {
virtual void SetReaderPlace(const paddle::platform::Place& place) {
device_reader_->SetPlace(place);
}
virtual Scope* GetThreadScope() { return thread_scope_; }
protected:
Scope* root_scope_ = nullptr;
Scope* thread_scope_;
paddle::platform::Place place_;
DataFeed* device_reader_ = nullptr;
int64_t batch_num_;
......@@ -156,15 +158,18 @@ class HogwildWorker : public CPUWorkerBase {
virtual void PrintFetchVars();
virtual void CreateDeviceResource(const ProgramDesc& main_prog);
virtual void BindingDataFeedMemory();
template <typename T>
void SetZero(LoDTensor* tensor, LoDTensor* root_tensor, int tensor_dim);
protected:
void CreateThreadOperators(const ProgramDesc& program);
void CreateThreadScope(const ProgramDesc& program);
std::vector<std::string> op_names_;
std::vector<OperatorBase*> ops_;
Scope* thread_scope_;
// Scope* thread_scope_;
HogwildWorkerParameter param_;
std::vector<std::string> skip_ops_;
std::map<std::string, int> stat_var_name_map_;
};
class DownpourWorker : public HogwildWorker {
......
......@@ -23,8 +23,8 @@ limitations under the License. */
namespace paddle {
namespace framework {
void DistMultiTrainer::Initialize(const TrainerDesc& trainer_desc,
Dataset* dataset) {
void DistMultiTrainer::Initialize(const TrainerDesc &trainer_desc,
Dataset *dataset) {
thread_num_ = trainer_desc.thread_num();
SetDataset(dataset);
......@@ -35,17 +35,22 @@ void DistMultiTrainer::Initialize(const TrainerDesc& trainer_desc,
need_dump_field_ = true;
}
if (need_dump_field_) {
auto& file_list = dataset->GetFileList();
auto &file_list = dataset->GetFileList();
if (file_list.size() == 0) {
need_dump_field_ = false;
}
}
mpi_rank_ = trainer_desc.mpi_rank() / 2;
const std::vector<paddle::framework::DataFeed*> readers =
const std::vector<paddle::framework::DataFeed *> readers =
dataset->GetReaders();
thread_num_ = readers.size();
workers_.resize(thread_num_);
for (int i = 0; i < trainer_desc.downpour_param().stat_var_names_size();
i++) {
need_merge_var_names_.push_back(
trainer_desc.downpour_param().stat_var_names(i));
}
for (int i = 0; i < thread_num_; ++i) {
workers_[i] = DeviceWorkerFactory::CreateDeviceWorker(
......@@ -104,7 +109,7 @@ void DistMultiTrainer::FinalizeDumpEnv() {
queue_.reset();
}
void DistMultiTrainer::InitOtherEnv(const ProgramDesc& main_program) {
void DistMultiTrainer::InitOtherEnv(const ProgramDesc &main_program) {
if (need_dump_field_) {
InitDumpEnv();
}
......@@ -126,9 +131,33 @@ void DistMultiTrainer::Run() {
}
void DistMultiTrainer::Finalize() {
for (auto& th : threads_) {
for (auto &th : threads_) {
th.join();
}
for (int i = 0; i < need_merge_var_names_.size(); i++) {
Variable *root_var = root_scope_->FindVar(need_merge_var_names_[i]);
if (root_var == nullptr) {
continue;
}
LoDTensor *root_tensor = root_var->GetMutable<LoDTensor>();
for (int j = 1; j < thread_num_; j++) {
Scope *cur_thread_scope = workers_[j]->GetThreadScope();
Variable *thread_var =
cur_thread_scope->FindVar(need_merge_var_names_[i]);
LoDTensor *thread_tensor = thread_var->GetMutable<LoDTensor>();
if (root_tensor->numel() != thread_tensor->numel()) {
continue;
}
#define MergeCallback(cpp_type, proto_type) \
do { \
if (root_tensor->type() == proto_type) { \
MergeToRootScope<cpp_type>(root_tensor, thread_tensor); \
} \
} while (0)
_ForEachDataType_(MergeCallback);
}
}
if (need_dump_field_) {
FinalizeDumpEnv();
}
......@@ -136,5 +165,14 @@ void DistMultiTrainer::Finalize() {
root_scope_->DropKids();
}
template <typename T>
void DistMultiTrainer::MergeToRootScope(LoDTensor *root_tensor,
LoDTensor *tensor) {
T *root_data = root_tensor->data<T>();
T *data = tensor->data<T>();
for (int i = 0; i < tensor->numel(); i++) {
root_data[i] += data[i];
}
}
} // end namespace framework
} // end namespace paddle
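
The merge added to `Finalize()` is an element-wise accumulation of every non-zero thread's private tensor into the root-scope tensor, guarded by a size check and dispatched over the element type via `_ForEachDataType_`. A rough NumPy analogue of just the accumulation step (toy data, not the Paddle API):

```python
import numpy as np

# Toy analogue of DistMultiTrainer::Finalize(): thread 0 updates the
# root-scope tensor directly, threads 1..N-1 hold private copies that are
# summed into it once all threads have joined.
root = np.array([3.0, 1.0, 0.0, 2.0])             # stands in for the root LoDTensor
thread_copies = [np.array([1.0, 2.0, 3.0, 4.0]),  # thread 1's private copy
                 np.array([0.0, 1.0, 0.0, 1.0])]  # thread 2's private copy
for thread_tensor in thread_copies:
    if thread_tensor.size != root.size:           # mirrors the numel() guard
        continue
    root += thread_tensor                         # MergeToRootScope<T>
print(root)                                       # merged statistics
```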
......@@ -64,6 +64,10 @@ void DownpourWorker::Initialize(const TrainerDesc& desc) {
skip_ops_[i] = param_.skip_ops(i);
}
for (int i = 0; i < param_.stat_var_names_size(); ++i) {
stat_var_name_map_[param_.stat_var_names(i)] = 1;
}
need_to_push_sparse_ = param_.push_sparse();
need_to_push_dense_ = param_.push_dense();
......
......@@ -12,6 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/device_worker.h"
#include "paddle/fluid/framework/device_worker_factory.h"
#include "paddle/fluid/platform/cpu_helper.h"
......@@ -20,7 +21,7 @@ limitations under the License. */
namespace paddle {
namespace framework {
void HogwildWorker::Initialize(const TrainerDesc& desc) {
void HogwildWorker::Initialize(const TrainerDesc &desc) {
fetch_config_ = desc.fetch_config();
param_ = desc.hogwild_param();
skip_ops_.resize(param_.skip_ops_size());
......@@ -30,45 +31,70 @@ void HogwildWorker::Initialize(const TrainerDesc& desc) {
use_cvm_ = desc.use_cvm();
}
void HogwildWorker::CreateThreadOperators(const ProgramDesc& program) {
auto& block = program.Block(0);
void HogwildWorker::CreateThreadOperators(const ProgramDesc &program) {
auto &block = program.Block(0);
op_names_.clear();
for (auto& op_desc : block.AllOps()) {
for (auto &op_desc : block.AllOps()) {
std::unique_ptr<OperatorBase> local_op = OpRegistry::CreateOp(*op_desc);
op_names_.push_back(op_desc->Type());
OperatorBase* local_op_ptr = local_op.release();
OperatorBase *local_op_ptr = local_op.release();
ops_.push_back(local_op_ptr);
continue;
}
}
void HogwildWorker::CreateThreadScope(const ProgramDesc& program) {
auto& block = program.Block(0);
void HogwildWorker::CreateThreadScope(const ProgramDesc &program) {
auto &block = program.Block(0);
PADDLE_ENFORCE_NOT_NULL(
root_scope_, "root_scope should be set before creating thread scope");
thread_scope_ = &root_scope_->NewScope();
for (auto& var : block.AllVars()) {
for (auto &var : block.AllVars()) {
if (var->Persistable()) {
auto* ptr = root_scope_->Var(var->Name());
auto *ptr = root_scope_->Var(var->Name());
InitializeVariable(ptr, var->GetType());
if (stat_var_name_map_.find(var->Name()) != stat_var_name_map_.end() &&
thread_id_ != 0) {
int tensor_dim =
root_scope_->FindVar(var->Name())->GetMutable<LoDTensor>()->numel();
auto *ptr1 = thread_scope_->Var(var->Name());
InitializeVariable(ptr1, var->GetType());
LoDTensor *thread_tensor = ptr1->GetMutable<LoDTensor>();
LoDTensor *root_tensor =
root_scope_->FindVar(var->Name())->GetMutable<LoDTensor>();
#define MemsetCallback(cpp_type, proto_type) \
do { \
if (root_tensor->type() == proto_type) { \
SetZero<cpp_type>(thread_tensor, root_tensor, tensor_dim); \
} \
} while (0)
_ForEachDataType_(MemsetCallback);
}
} else {
auto* ptr = thread_scope_->Var(var->Name());
auto *ptr = thread_scope_->Var(var->Name());
InitializeVariable(ptr, var->GetType());
}
}
}
template <typename T>
void HogwildWorker::SetZero(LoDTensor *tensor, LoDTensor *root_tensor,
int tensor_dim) {
T *ptr = tensor->mutable_data<T>(root_tensor->dims(), platform::CPUPlace());
memset(ptr, 0, sizeof(T) * tensor_dim);
}
void HogwildWorker::BindingDataFeedMemory() {
const std::vector<std::string>& input_feed =
const std::vector<std::string> &input_feed =
device_reader_->GetUseSlotAlias();
for (auto name : input_feed) {
device_reader_->AddFeedVar(thread_scope_->FindVar(name), name);
}
}
void HogwildWorker::CreateDeviceResource(const ProgramDesc& main_prog) {
void HogwildWorker::CreateDeviceResource(const ProgramDesc &main_prog) {
CreateThreadScope(main_prog);
CreateThreadOperators(main_prog);
}
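
The zero-initialization above is what makes the merged statistics exact: thread 0 keeps using the variable that lives in the root scope, while every other thread gets a same-shaped copy in its own scope that starts at zero, so nothing is counted twice once the copies are summed back. A small plain-Python illustration of that accounting, with a hypothetical counter variable:

```python
# Hypothetical accounting check: each thread adds the instances it processed
# to a counter; thread 0's counter is the root-scope variable itself, the
# others start from zero (as SetZero does) and are merged afterwards.
instances_per_thread = [40, 35, 25]                        # toy workload, 3 threads

root_counter = 0                                           # root-scope stat var (thread 0)
thread_counters = [0] * (len(instances_per_thread) - 1)    # zeroed per-thread copies

root_counter += instances_per_thread[0]                    # thread 0 writes to the root var
for j, n in enumerate(instances_per_thread[1:]):
    thread_counters[j] += n                                # threads 1..N-1 write to their copies

root_counter += sum(thread_counters)                       # Finalize(): merge copies into root
assert root_counter == sum(instances_per_thread)           # every instance counted exactly once
```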
......@@ -78,7 +104,7 @@ void HogwildWorker::TrainFilesWithProfiler() {
device_reader_->Start();
std::vector<double> op_total_time;
std::vector<std::string> op_name;
for (auto& op : ops_) {
for (auto &op : ops_) {
op_name.push_back(op->Type());
}
op_total_time.resize(ops_.size());
......@@ -141,7 +167,7 @@ void HogwildWorker::TrainFiles() {
device_reader_->Start();
int cur_batch;
while ((cur_batch = device_reader_->Next()) > 0) {
for (auto& op : ops_) {
for (auto &op : ops_) {
bool need_skip = false;
for (auto t = 0u; t < skip_ops_.size(); ++t) {
if (op->Type().find(skip_ops_[t]) != std::string::npos) {
......
......@@ -24,6 +24,11 @@ namespace framework {
void MultiTrainer::Initialize(const TrainerDesc& trainer_desc,
Dataset* dataset) {
thread_num_ = trainer_desc.thread_num();
for (int i = 0; i < trainer_desc.downpour_param().stat_var_names_size();
i++) {
need_merge_var_names_.push_back(
trainer_desc.downpour_param().stat_var_names(i));
}
SetDataset(dataset);
// get filelist from trainer_desc here
const std::vector<paddle::framework::DataFeed*> readers =
......
......@@ -76,6 +76,7 @@ class MultiTrainer : public TrainerBase {
std::vector<std::thread> threads_;
std::vector<DataFeed*> readers_;
std::vector<std::shared_ptr<DeviceWorker>> workers_;
std::vector<std::string> need_merge_var_names_;
};
class DistMultiTrainer : public MultiTrainer {
......@@ -86,6 +87,8 @@ class DistMultiTrainer : public MultiTrainer {
virtual void InitOtherEnv(const ProgramDesc& main_program);
virtual void Run();
virtual void Finalize();
template <typename T>
void MergeToRootScope(LoDTensor* root_tensor, LoDTensor* thread_tensor);
virtual void FinalizeDumpEnv();
virtual void InitDumpEnv();
virtual void DumpWork();
......
......@@ -60,6 +60,7 @@ message DownpourWorkerParameter {
repeated ProgramConfig program_config = 4;
optional bool push_sparse = 5 [ default = true ];
optional bool push_dense = 6 [ default = true ];
repeated string stat_var_names = 7;
}
message SectionWorkerParameter {
......
......@@ -169,6 +169,9 @@ class DownpourSGD(DeviceWorker):
sparse_table.fea_dim = sparse_table.emb_dim + 2
# TODO(guru4elephant): hard code here, need to improve
sparse_table.label_var_name = "click"
if opt_info["stat_var_names"]:
for i in opt_info["stat_var_names"]:
downpour.stat_var_names.extend([i])
for i in self._fleet_desc.trainer_param.dense_table:
if i.table_id in dense_table_set:
......
......@@ -246,6 +246,7 @@ class DistributedAdam(DistributedOptimizerImplBase):
opt_info["fleet_desc"] = ps_param
opt_info["worker_skipped_ops"] = worker_skipped_ops
opt_info["use_cvm"] = strategy.get("use_cvm", False)
opt_info["stat_var_names"] = strategy.get("stat_var_names", [])
opt_info["scale_datanorm"] = strategy.get("scale_datanorm", -1)
opt_info["dump_slot"] = False
opt_info["dump_converter"] = ""
......
......@@ -1175,7 +1175,7 @@ class FleetUtil(object):
local_pos_ins.name,
local_total_ins.name)
# below is part of model
# below is part of example model
label = fluid.layers.data(name="click", shape=[-1, 1],\
dtype="int64", lod_level=0, append_batch_size=False)
emb = my_slot_net(slots, label) # emb can be fc layer of size 1
......@@ -1264,12 +1264,12 @@ class FleetUtil(object):
mae = global_abserr / total_ins_num
rmse = math.sqrt(global_sqrerr / total_ins_num)
actual_ctr = pos_ins_num / total_ins_num
return_actual_ctr = pos_ins_num / total_ins_num
predicted_ctr = global_prob / total_ins_num
mean_predict_qvalue = global_q_value / total_ins_num
copc = 0.0
if abs(predicted_ctr > 1e-6):
copc = actual_ctr / predicted_ctr
copc = return_actual_ctr / predicted_ctr
# calculate bucket error
last_ctr = -1.0
......@@ -1316,8 +1316,8 @@ class FleetUtil(object):
bucket_error = error_sum / error_count if error_count > 0 else 0.0
return [
auc, bucket_error, mae, rmse, actual_ctr, predicted_ctr, copc,
mean_predict_qvalue, int(total_ins_num)
auc, bucket_error, mae, rmse, return_actual_ctr, predicted_ctr,
copc, mean_predict_qvalue, int(total_ins_num)
]
def print_global_metrics(self,
......
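
For reference, the global metrics assembled here follow the usual definitions (MAE, RMSE, empirical vs. predicted CTR, and COPC as their ratio). A self-contained numeric sketch with made-up totals, mirroring the formulas above:

```python
import math

# Toy totals standing in for the aggregated sums gathered by FleetUtil.
total_ins_num = 100.0     # number of instances
pos_ins_num = 12.0        # number of positive instances (clicks)
global_abserr = 20.0      # sum of |label - prediction|
global_sqrerr = 8.0       # sum of (label - prediction)^2
global_prob = 11.0        # sum of predicted probabilities
global_q_value = 50.0     # sum of q values

mae = global_abserr / total_ins_num                   # 0.20
rmse = math.sqrt(global_sqrerr / total_ins_num)       # ~0.283
return_actual_ctr = pos_ins_num / total_ins_num       # 0.12, empirical CTR
predicted_ctr = global_prob / total_ins_num           # 0.11, mean predicted CTR
mean_predict_qvalue = global_q_value / total_ins_num  # 0.50
copc = return_actual_ctr / predicted_ctr if abs(predicted_ctr) > 1e-6 else 0.0
print(mae, rmse, return_actual_ctr, predicted_ctr, copc)   # copc ~ 1.09
```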
......@@ -76,6 +76,7 @@ class TestListenAndServOp(OpTest):
opt_info["use_cvm"] = True
opt_info["scale_datanorm"] = -1
opt_info["dump_slot"] = False
opt_info["stat_var_names"] = []
main_program._fleet_opt = opt_info
trainer = DistMultiTrainer()
......@@ -131,6 +132,7 @@ class TestListenAndServOp(OpTest):
opt_info["use_cvm"] = False
opt_info["scale_datanorm"] = -1
opt_info["dump_slot"] = False
opt_info["stat_var_names"] = []
main_program._fleet_opt = opt_info
trainer = DistMultiTrainer()
......