Commit d434fcba, authored by dongdaxiang

add TrainFilesWithTimer in async_executor

Parent cf6188a8
@@ -304,9 +304,14 @@ void AsyncExecutor::RunFromFile(const ProgramDesc& main_program,
   // start executing ops in multiple threads
   for (int thidx = 0; thidx < actual_thread_num; ++thidx) {
+    if (debug) {
+      threads.push_back(std::thread(&ExecutorThreadWorker::TrainFilesWithTimer,
+                                    workers[thidx].get()));
+    } else {
       threads.push_back(
           std::thread(&ExecutorThreadWorker::TrainFiles, workers[thidx].get()));
+    }
   }
   for (auto& th : threads) {
     th.join();
......
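For readers less familiar with the dispatch above: std::thread can bind a member-function pointer to an object pointer, which is how each worker thread is pointed at either the plain or the timed training loop. A minimal, self-contained sketch of the same pattern follows; Worker, debug, and the empty loop bodies are illustrative stand-ins, not Paddle API:

```cpp
#include <memory>
#include <thread>
#include <vector>

// Illustrative stand-in for ExecutorThreadWorker.
class Worker {
 public:
  void TrainFiles() { /* normal training loop */ }
  void TrainFilesWithTimer() { /* training loop with per-op timing */ }
};

int main() {
  const bool debug = true;  // in the diff, passed in by the caller of RunFromFile
  const int actual_thread_num = 4;
  std::vector<std::unique_ptr<Worker>> workers;
  for (int i = 0; i < actual_thread_num; ++i) {
    workers.push_back(std::make_unique<Worker>());
  }
  std::vector<std::thread> threads;
  for (int thidx = 0; thidx < actual_thread_num; ++thidx) {
    // std::thread binds the member function to the raw worker pointer,
    // mirroring workers[thidx].get() in the hunk above.
    if (debug) {
      threads.push_back(
          std::thread(&Worker::TrainFilesWithTimer, workers[thidx].get()));
    } else {
      threads.push_back(std::thread(&Worker::TrainFiles, workers[thidx].get()));
    }
  }
  for (auto& th : threads) {
    th.join();
  }
  return 0;
}
```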
@@ -180,6 +180,7 @@ void ExecutorThreadWorker::SetDevice() {
   return;
 #else
   static unsigned concurrency_cap = std::thread::hardware_concurrency();
+  LOG(WARNING) << "concurrency capacity " << concurrency_cap;
   int thread_id = this->thread_id_;
   if (static_cast<unsigned>(thread_id) < concurrency_cap) {
@@ -238,6 +239,49 @@ static void print_fetch_var(Scope* scope, const std::string& var_name) {
   VLOG(1) << "print_fetch_var: unrecognized data type:" << tensor.type();
 }
 
+void ExecutorThreadWorker::TrainFilesWithTimer() {
+  platform::SetNumThreads(1);
+  SetDevice();
+  thread_reader_->Start();
+  // Track the accumulated wall time of every op so mean cost can be logged.
+  std::vector<double> op_total_time;
+  std::vector<std::string> op_name;
+  for (auto& op : ops_) {
+    op_name.push_back(op->Type());
+  }
+  op_total_time.resize(ops_.size(), 0.0);
+  platform::Timer timeline;
+  double total_time = 0.0;
+  double read_time = 0.0;
+  int cur_batch;
+  int batch_cnt = 0;
+  timeline.Start();
+  while ((cur_batch = thread_reader_->Next()) > 0) {
+    // The segment since the last Start() covers reading the next batch.
+    timeline.Pause();
+    read_time += timeline.ElapsedSec();
+    total_time += timeline.ElapsedSec();
+    for (size_t i = 0; i < ops_.size(); ++i) {
+      timeline.Start();
+      ops_[i]->Run(*thread_scope_, place_);
+      timeline.Pause();
+      op_total_time[i] += timeline.ElapsedSec();
+      total_time += timeline.ElapsedSec();
+    }
+    ++batch_cnt;
+    thread_scope_->DropKids();
+    // Log the mean per-op time and mean read time every 1000 batches.
+    if (batch_cnt > 0 && batch_cnt % 1000 == 0) {
+      for (size_t i = 0; i < ops_.size(); ++i) {
+        fprintf(stderr, "op_name:[%zu][%s], op_mean_time:[%fs]\n", i,
+                op_name[i].c_str(), op_total_time[i] / batch_cnt);
+      }
+      fprintf(stderr, "mean read time: %fs\n", read_time / batch_cnt);
+    }
+    timeline.Start();
+  }
+}
 void ExecutorThreadWorker::TrainFiles() {
   platform::SetNumThreads(1);
......
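The loop above leans on platform::Timer's Start/Pause/ElapsedSec call pattern: each Start()/Pause() pair brackets one segment (a batch read or a single op run), whose length is then accumulated into read_time or op_total_time. Below is a self-contained sketch of that pattern with std::chrono; SegmentTimer is a hypothetical stand-in, and the single-segment ElapsedSec() semantics are an assumption inferred from the call sites, not a statement about the Paddle implementation:

```cpp
#include <chrono>
#include <cstdio>
#include <thread>

// Hypothetical stand-in for platform::Timer as the loop above uses it:
// Start() marks a point, Pause() marks the end, and ElapsedSec() returns
// the length of that one Start/Pause segment (assumed semantics).
class SegmentTimer {
 public:
  void Start() { begin_ = std::chrono::steady_clock::now(); }
  void Pause() { end_ = std::chrono::steady_clock::now(); }
  double ElapsedSec() const {
    return std::chrono::duration<double>(end_ - begin_).count();
  }

 private:
  std::chrono::steady_clock::time_point begin_, end_;
};

int main() {
  SegmentTimer t;
  double total = 0.0;
  for (int i = 0; i < 3; ++i) {
    t.Start();  // reset the segment, as the loop does before each op
    std::this_thread::sleep_for(std::chrono::milliseconds(10));  // stand-in work
    t.Pause();
    total += t.ElapsedSec();  // accumulate per-segment time
  }
  std::printf("mean segment time: %fs\n", total / 3);
  return 0;
}
```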
@@ -155,6 +155,8 @@ class ExecutorThreadWorker {
   void SetDataFeed(const std::shared_ptr<DataFeed>& datafeed);
   // A multi-thread training function
   virtual void TrainFiles();
+  // Same as TrainFiles, but logs per-op mean time every 1000 batches
+  virtual void TrainFilesWithTimer();
   // set fetch variable names from python interface assigned by users
   void SetFetchVarNames(const std::vector<std::string>& fetch_var_names);
 #ifdef PADDLE_WITH_PSLIB
......
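Since both training hooks are declared virtual, a specialized worker can override one and inherit the other. A hedged sketch of that extension point; ExecutorThreadWorkerBase and AsyncWorker are illustrative names, not classes from this commit:

```cpp
#include <cstdio>

// Illustrative base mirroring the two virtual hooks in the header above.
class ExecutorThreadWorkerBase {
 public:
  virtual ~ExecutorThreadWorkerBase() = default;
  virtual void TrainFiles() { std::puts("base training loop"); }
  virtual void TrainFilesWithTimer() { std::puts("base timed loop"); }
};

// Hypothetical subclass, e.g. a parameter-server-aware worker.
class AsyncWorker : public ExecutorThreadWorkerBase {
 public:
  void TrainFiles() override { std::puts("async training loop"); }
  // TrainFilesWithTimer() is inherited unchanged.
};

int main() {
  AsyncWorker w;
  ExecutorThreadWorkerBase* base = &w;
  base->TrainFiles();           // dispatches to AsyncWorker::TrainFiles
  base->TrainFilesWithTimer();  // falls back to the base timed loop
  return 0;
}
```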