diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h index d6d53a8858030734812587f6bbd03a108c5cf8ce..0b75e22986eba6f998d3c9e17d9851de79b71a66 100644 --- a/paddle/fluid/framework/device_worker.h +++ b/paddle/fluid/framework/device_worker.h @@ -349,6 +349,7 @@ class SectionWorker : public DeviceWorker { std::vector> ops_; static std::mutex thread_mutex; + static std::mutex cout_mutex; static std::condition_variable thread_condition; static bool threads_completed; std::shared_ptr program_; diff --git a/paddle/fluid/framework/pipeline_trainer.cc b/paddle/fluid/framework/pipeline_trainer.cc index 379892ecfd1161fd5e5003552bc48b1153b2c412..ee3780f1565099394852703d741602db6e39c1d0 100644 --- a/paddle/fluid/framework/pipeline_trainer.cc +++ b/paddle/fluid/framework/pipeline_trainer.cc @@ -126,6 +126,7 @@ void PipelineTrainer::CopyParameters(int section_id, int microbatch_id, for (auto& var : global_block.AllVars()) { int is_feed_var = std::count(feed_var_names_.begin(), feed_var_names_.end(), var->Name()); + VLOG(3) << "Var name: " << var->Name(); if ((var->Persistable() || is_feed_var) && microbatch_id == 0) { if (is_feed_var) { auto* new_ptr = minibatch_scopes_[section_id]->Var(var->Name()); diff --git a/paddle/fluid/framework/section_worker.cc b/paddle/fluid/framework/section_worker.cc index 03b7afbb8771fadbe07a352497fa69a299928cf7..81001d3240127b7aadd595614d3fe8f2e17e0b02 100644 --- a/paddle/fluid/framework/section_worker.cc +++ b/paddle/fluid/framework/section_worker.cc @@ -32,6 +32,7 @@ namespace framework { std::atomic SectionWorker::cpu_id_(0); std::mutex SectionWorker::thread_mutex; +std::mutex SectionWorker::cout_mutex; std::condition_variable SectionWorker::thread_condition; bool SectionWorker::threads_completed = false; uint64_t SectionWorker::batch_id_(0); @@ -103,9 +104,12 @@ void SectionWorker::TrainFiles() { } #endif + platform::Timer batch_timer; + if (thread_id_ == 0) { while (true) { // Start a minibatch. 
+ batch_timer.Start(); for (int i = 0; i < num_microbatches_; ++i) { try { for (auto& op : ops_) { @@ -146,6 +150,7 @@ void SectionWorker::TrainFiles() { thread_condition.notify_all(); } } + dev_ctx_->Wait(); // backward pass for (int i = 0; i < num_microbatches_; ++i) { for (auto& op : ops_) { @@ -163,6 +168,7 @@ void SectionWorker::TrainFiles() { } } } + dev_ctx_->Wait(); // update pass for (auto& op : ops_) { int op_role = op->Attr(std::string("op_role")); @@ -177,6 +183,8 @@ void SectionWorker::TrainFiles() { } } dev_ctx_->Wait(); + batch_timer.Pause(); + VLOG(0) << "batch time: " << batch_timer.ElapsedUS(); } } else { while (true) { @@ -227,6 +235,7 @@ void SectionWorker::TrainFiles() { } } } + dev_ctx_->Wait(); // backward pass for (int i = 0; i < num_microbatches_; ++i) { for (auto& op : ops_) { @@ -244,6 +253,7 @@ void SectionWorker::TrainFiles() { } } } + dev_ctx_->Wait(); // update pass for (auto& op : ops_) { int op_role = op->Attr(std::string("op_role")); @@ -307,14 +317,23 @@ void SectionWorker::TrainFilesWithProfiler() { #endif if (thread_id_ == 0) { + struct timeval start; + struct timeval end; + struct timeval micro_start; + struct timeval micro_end; while (true) { // Start a minibatch. // int batch_size = 0; + //cudaEvent_t cu_start, cu_stop; + //cudaEventCreate(&cu_start); + //cudaEventCreate(&cu_stop); batch_timer.Start(); for (int i = 0; i < num_microbatches_; ++i) { try { int op_idx = 0; + gettimeofday(&micro_start, NULL); for (auto& op : ops_) { + gettimeofday(&start, NULL); int op_role = op->Attr(std::string("op_role")); // We run op with op_role = kLRSched only for the first microbatch // to avoid increasing the @LR_DECAY_STEP@ multiple times. 
@@ -330,12 +349,20 @@ void SectionWorker::TrainFilesWithProfiler() { VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ << " for scope " << i; timeline.Start(); + //cudaEventRecord(cu_start); op->Run(*microbatch_scopes_[i], place_); if (gc) { DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), unused_vars_, gc.get()); } + cudaDeviceSynchronize(); + //cudaEventRecord(cu_stop); + //cudaEventSynchronize(cu_stop); + //float cuda_time; + //cudaEventElapsedTime(&cuda_time, cu_start, cu_stop); + //VLOG(0) << "op: " << op->Type() << "; time: " << cuda_time; timeline.Pause(); + gettimeofday(&end, NULL); auto time = timeline.ElapsedUS(); op_total_time[op_idx] += time; if (time > op_max_time[op_idx]) { @@ -346,9 +373,26 @@ void SectionWorker::TrainFilesWithProfiler() { } op_count[op_idx] += 1; op_total_time[op_idx] += time; + { + std::unique_lock lk(cout_mutex); + std::cout << std::fixed; + std::cout.precision(0); + std::cout << "::FWD:B[" << batch_id_ << "]:SEC[" << thread_id_ << "]:SCOPE[" << i + << "]:OP[" << op->Type() << "]:START[" << start.tv_sec * 1e6 + start.tv_usec + << "]:END[" << end.tv_sec * 1e6 + end.tv_usec << "]" << std::endl; + } } op_idx++; } + gettimeofday(&micro_end, NULL); + { + std::unique_lock lk(cout_mutex); + std::cout << std::fixed; + std::cout.precision(0); + std::cout << "!!FWD:B[" << batch_id_ << "]:SEC[" << thread_id_ + << "]:START[" << micro_start.tv_sec * 1e6 + micro_start.tv_usec + << "]:END[" << micro_end.tv_sec * 1e6 + micro_end.tv_usec << "]" << std::endl; + } } catch (platform::EOFException&) { std::unique_lock lk(thread_mutex); threads_completed = true; @@ -363,6 +407,8 @@ void SectionWorker::TrainFilesWithProfiler() { << ", mean_time: " << op_total_time[i] / op_count[i]; } VLOG(0) << "================================"; + //cudaEventDestroy(cu_start); + //cudaEventDestroy(cu_stop); return; } if (i == 0) { @@ -372,10 +418,13 @@ void SectionWorker::TrainFilesWithProfiler() { thread_condition.notify_all(); } } + dev_ctx_->Wait(); 
// backward pass for (int i = 0; i < num_microbatches_; ++i) { int op_idx = 0; + gettimeofday(&micro_start, NULL); for (auto& op : ops_) { + gettimeofday(&start, NULL); int op_role = op->Attr(std::string("op_role")); if (op_role == static_cast(OpRole::kBackward) || op_role == (static_cast(OpRole::kBackward) | @@ -383,11 +432,19 @@ void SectionWorker::TrainFilesWithProfiler() { VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ << " for scope " << i; timeline.Start(); + //cudaEventRecord(cu_start); op->Run(*microbatch_scopes_[i], place_); if (gc) { DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), unused_vars_, gc.get()); } + cudaDeviceSynchronize(); + //cudaEventRecord(cu_stop); + //cudaEventSynchronize(cu_stop); + //float cuda_time; + //cudaEventElapsedTime(&cuda_time, cu_start, cu_stop); + //VLOG(0) << "op: " << op->Type() << "; time: " << cuda_time; + gettimeofday(&end, NULL); timeline.Pause(); auto time = timeline.ElapsedUS(); op_total_time[op_idx] += time; @@ -399,23 +456,51 @@ void SectionWorker::TrainFilesWithProfiler() { } op_count[op_idx] += 1; op_total_time[op_idx] += time; + { + std::unique_lock lk(cout_mutex); + std::cout << std::fixed; + std::cout.precision(0); + std::cout << "::BWD:B[" << batch_id_ << "]:SEC[" << thread_id_ << "]:SCOPE[" << i + << "]:OP[" << op->Type() << "]:START[" << start.tv_sec * 1e6 + start.tv_usec + << "]:END[" << end.tv_sec * 1e6 + end.tv_usec << "]" << std::endl; + } } op_idx++; } + gettimeofday(&micro_end, NULL); + { + std::unique_lock lk(cout_mutex); + std::cout << std::fixed; + std::cout.precision(0); + std::cout << "!!BWD:B[" << batch_id_ << "]:SEC[" << thread_id_ + << "]:START[" << micro_start.tv_sec * 1e6 + micro_start.tv_usec + << "]:END[" << micro_end.tv_sec * 1e6 + micro_end.tv_usec << "]" << std::endl; + } } + dev_ctx_->Wait(); // update pass int op_idx = 0; + gettimeofday(&micro_start, NULL); for (auto& op : ops_) { + gettimeofday(&start, NULL); int op_role = op->Attr(std::string("op_role")); if (op_role == 
static_cast(OpRole::kOptimize)) { VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ << " for minibatch scope"; timeline.Start(); + //cudaEventRecord(cu_start); op->Run(*microbatch_scopes_[0], place_); if (gc) { DeleteUnusedTensors(*microbatch_scopes_[num_microbatches_ - 1], op.get(), unused_vars_, gc.get()); } + //cudaEventRecord(cu_stop); + //cudaEventSynchronize(cu_stop); + cudaDeviceSynchronize(); + //float cuda_time; + //cudaEventElapsedTime(&cuda_time, cu_start, cu_stop); + //VLOG(0) << "op: " << op->Type() << "; time: " << cuda_time; + gettimeofday(&end, NULL); timeline.Pause(); auto time = timeline.ElapsedUS(); op_total_time[op_idx] += time; @@ -427,14 +512,44 @@ void SectionWorker::TrainFilesWithProfiler() { } op_count[op_idx] += 1; op_total_time[op_idx] += time; + { + std::unique_lock lk(cout_mutex); + std::cout << std::fixed; + std::cout.precision(0); + //std::cout << "::UPD:B[" << batch_id_ << "]:SEC[" << thread_id_ + std::cout << "::UPD:B[" << batch_id_ << "]:SEC[" << thread_id_ << "]:SCOPE[" << num_microbatches_ + << "]:OP[" << op->Type() << "]:START[" << start.tv_sec * 1e6 + start.tv_usec + << "]:END[" << end.tv_sec * 1e6 + end.tv_usec << "]" << std::endl; + } } op_idx++; } + gettimeofday(&micro_end, NULL); + { + std::unique_lock lk(cout_mutex); + std::cout << std::fixed; + std::cout.precision(0); + std::cout << "!!UPD:B[" << batch_id_ << "]:SEC[" << thread_id_ + << "]:START[" << micro_start.tv_sec * 1e6 + micro_start.tv_usec + << "]:END[" << micro_end.tv_sec * 1e6 + micro_end.tv_usec << "]" << std::endl; + } + struct timeval wait_start; + struct timeval wait_end; + gettimeofday(&wait_start, NULL); dev_ctx_->Wait(); + gettimeofday(&wait_end, NULL); + VLOG(0) << "device wait: " << wait_end.tv_sec * 1e6 + wait_end.tv_usec - wait_start.tv_sec * 1e6 - wait_start.tv_usec; batch_timer.Pause(); VLOG(0) << "batch time: " << batch_timer.ElapsedUS(); } } else { + struct timeval start; + struct timeval end; + struct timeval micro_start; + struct 
timeval micro_end; + cudaEvent_t cu_start, cu_stop; + cudaEventCreate(&cu_start); + cudaEventCreate(&cu_stop); while (true) { { PADDLE_ENFORCE_LE( @@ -459,7 +574,9 @@ void SectionWorker::TrainFilesWithProfiler() { << ", mean_time: " << op_total_time[i] / op_count[i]; } VLOG(0) << "================================"; - threads_completed = false; + //threads_completed = false; + //cudaEventDestroy(cu_start); + //cudaEventDestroy(cu_stop); return; } lk.unlock(); @@ -468,7 +585,9 @@ void SectionWorker::TrainFilesWithProfiler() { // forward pass: for (int i = 0; i < num_microbatches_; ++i) { int op_idx = 0; + gettimeofday(&micro_start, NULL); for (auto& op : ops_) { + gettimeofday(&start, NULL); int op_role = op->Attr(std::string("op_role")); // We run op with op_role = kLRSched only for the first microbatch // to avoid increasing the @LR_DECAY_STEP@ multiple times. @@ -484,11 +603,19 @@ void SectionWorker::TrainFilesWithProfiler() { VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ << " for scope " << i; timeline.Start(); + //cudaEventRecord(cu_start); op->Run(*microbatch_scopes_[i], place_); if (gc) { DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), unused_vars_, gc.get()); } + //cudaEventRecord(cu_stop); + //cudaEventSynchronize(cu_stop); + cudaDeviceSynchronize(); + //float cuda_time; + //cudaEventElapsedTime(&cuda_time, cu_start, cu_stop); + //VLOG(0) << "op: " << op->Type() << "; time: " << cuda_time; + gettimeofday(&end, NULL); timeline.Pause(); auto time = timeline.ElapsedUS(); op_total_time[op_idx] += time; @@ -500,14 +627,34 @@ void SectionWorker::TrainFilesWithProfiler() { } op_count[op_idx] += 1; op_total_time[op_idx] += time; + { + std::unique_lock lk(cout_mutex); + std::cout << std::fixed; + std::cout.precision(0); + std::cout << "::FWD:B[" << local_batch_id_ << "]:SEC[" << thread_id_ << "]:SCOPE[" << i + << "]:OP[" << op->Type() << "]:START[" << start.tv_sec * 1e6 + start.tv_usec + << "]:END[" << end.tv_sec * 1e6 + end.tv_usec << "]" << 
std::endl; + } } op_idx++; } + gettimeofday(&micro_end, NULL); + { + std::unique_lock lk(cout_mutex); + std::cout << std::fixed; + std::cout.precision(0); + std::cout << "!!FWD:B[" << batch_id_ << "]:SEC[" << thread_id_ + << "]:START[" << micro_start.tv_sec * 1e6 + micro_start.tv_usec + << "]:END[" << micro_end.tv_sec * 1e6 + micro_end.tv_usec << "]" << std::endl; + } } + dev_ctx_->Wait(); // backward pass for (int i = 0; i < num_microbatches_; ++i) { int op_idx = 0; + gettimeofday(&micro_start, NULL); for (auto& op : ops_) { + gettimeofday(&start, NULL); int op_role = op->Attr(std::string("op_role")); if (op_role == static_cast(OpRole::kBackward) || op_role == (static_cast(OpRole::kBackward) | @@ -515,11 +662,19 @@ void SectionWorker::TrainFilesWithProfiler() { VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ << " for scope " << i; timeline.Start(); + //cudaEventRecord(cu_start); op->Run(*microbatch_scopes_[i], place_); if (gc) { DeleteUnusedTensors(*microbatch_scopes_[i], op.get(), unused_vars_, gc.get()); } + //cudaEventRecord(cu_stop); + //cudaEventSynchronize(cu_stop); + cudaDeviceSynchronize(); + //float cuda_time; + //cudaEventElapsedTime(&cuda_time, cu_start, cu_stop); + //VLOG(0) << "op: " << op->Type() << "; time: " << cuda_time; + gettimeofday(&end, NULL); timeline.Pause(); auto time = timeline.ElapsedUS(); op_total_time[op_idx] += time; @@ -531,23 +686,51 @@ void SectionWorker::TrainFilesWithProfiler() { } op_count[op_idx] += 1; op_total_time[op_idx] += time; + { + std::unique_lock lk(cout_mutex); + std::cout << std::fixed; + std::cout.precision(0); + std::cout << "::BWD:B[" << local_batch_id_ << "]:SEC[" << thread_id_ << "]:SCOPE[" << i + << "]:OP[" << op->Type() << "]:START[" << start.tv_sec * 1e6 + start.tv_usec + << "]:END[" << end.tv_sec * 1e6 + end.tv_usec << "]" << std::endl; + } } op_idx++; } + gettimeofday(&micro_end, NULL); + { + std::unique_lock lk(cout_mutex); + std::cout << std::fixed; + std::cout.precision(0); + std::cout << "!!BWD:B[" 
<< batch_id_ << "]:SEC[" << thread_id_ + << "]:START[" << micro_start.tv_sec * 1e6 + micro_start.tv_usec + << "]:END[" << micro_end.tv_sec * 1e6 + micro_end.tv_usec << "]" << std::endl; + } } + dev_ctx_->Wait(); // update pass int op_idx = 0; + gettimeofday(&micro_start, NULL); for (auto& op : ops_) { + gettimeofday(&start, NULL); int op_role = op->Attr(std::string("op_role")); if (op_role == static_cast(OpRole::kOptimize)) { VLOG(3) << "running an op " << op->Type() << " for " << thread_id_ << " for minibatch scope"; timeline.Start(); + //cudaEventRecord(cu_start); op->Run(*microbatch_scopes_[0], place_); if (gc) { DeleteUnusedTensors(*microbatch_scopes_[num_microbatches_ - 1], op.get(), unused_vars_, gc.get()); } + //cudaEventRecord(cu_stop); + //cudaEventSynchronize(cu_stop); + cudaDeviceSynchronize(); + //float cuda_time; + //cudaEventElapsedTime(&cuda_time, cu_start, cu_stop); + //VLOG(0) << "op: " << op->Type() << "; time: " << cuda_time; + gettimeofday(&end, NULL); timeline.Pause(); auto time = timeline.ElapsedUS(); op_total_time[op_idx] += time; @@ -559,9 +742,27 @@ void SectionWorker::TrainFilesWithProfiler() { } op_count[op_idx] += 1; op_total_time[op_idx] += time; + { + std::unique_lock lk(cout_mutex); + std::cout << std::fixed; + std::cout.precision(0); + //std::cout << "::UPD:B[" << local_batch_id_ << "]:SEC[" << thread_id_ + std::cout << "::UPD:B[" << batch_id_ << "]:SEC[" << thread_id_ << "]:SCOPE[" << num_microbatches_ + << "]:OP[" << op->Type() << "]:START[" << start.tv_sec * 1e6 + start.tv_usec + << "]:END[" << end.tv_sec * 1e6 + end.tv_usec << "]" << std::endl; + } } op_idx++; } + gettimeofday(&micro_end, NULL); + { + std::unique_lock lk(cout_mutex); + std::cout << std::fixed; + std::cout.precision(0); + std::cout << "!!UPD:B[" << batch_id_ << "]:SEC[" << thread_id_ + << "]:START[" << micro_start.tv_sec * 1e6 + micro_start.tv_usec + << "]:END[" << micro_end.tv_sec * 1e6 + micro_end.tv_usec << "]" << std::endl; + } dev_ctx_->Wait(); } }