Commit 83ba96bc authored by: S sandyhouse

update

Parent a5d1592f
......@@ -349,6 +349,7 @@ class SectionWorker : public DeviceWorker {
std::vector<std::unique_ptr<OperatorBase>> ops_;
static std::mutex thread_mutex;
static std::mutex cout_mutex;
static std::condition_variable thread_condition;
static bool threads_completed;
std::shared_ptr<framework::ProgramDesc> program_;
......
......@@ -126,6 +126,7 @@ void PipelineTrainer::CopyParameters(int section_id, int microbatch_id,
for (auto& var : global_block.AllVars()) {
int is_feed_var =
std::count(feed_var_names_.begin(), feed_var_names_.end(), var->Name());
VLOG(3) << "Var name: " << var->Name();
if ((var->Persistable() || is_feed_var) && microbatch_id == 0) {
if (is_feed_var) {
auto* new_ptr = minibatch_scopes_[section_id]->Var(var->Name());
......
......@@ -32,6 +32,7 @@ namespace framework {
std::atomic<int> SectionWorker::cpu_id_(0);
std::mutex SectionWorker::thread_mutex;
std::mutex SectionWorker::cout_mutex;
std::condition_variable SectionWorker::thread_condition;
bool SectionWorker::threads_completed = false;
uint64_t SectionWorker::batch_id_(0);
......@@ -103,9 +104,12 @@ void SectionWorker::TrainFiles() {
}
#endif
platform::Timer batch_timer;
if (thread_id_ == 0) {
while (true) {
// Start a minibatch.
batch_timer.Start();
for (int i = 0; i < num_microbatches_; ++i) {
try {
for (auto& op : ops_) {
......@@ -146,6 +150,7 @@ void SectionWorker::TrainFiles() {
thread_condition.notify_all();
}
}
dev_ctx_->Wait();
// backward pass
for (int i = 0; i < num_microbatches_; ++i) {
for (auto& op : ops_) {
......@@ -163,6 +168,7 @@ void SectionWorker::TrainFiles() {
}
}
}
dev_ctx_->Wait();
// update pass
for (auto& op : ops_) {
int op_role = op->Attr<int>(std::string("op_role"));
......@@ -177,6 +183,8 @@ void SectionWorker::TrainFiles() {
}
}
dev_ctx_->Wait();
batch_timer.Pause();
VLOG(0) << "batch time: " << batch_timer.ElapsedUS();
}
} else {
while (true) {
......@@ -227,6 +235,7 @@ void SectionWorker::TrainFiles() {
}
}
}
dev_ctx_->Wait();
// backward pass
for (int i = 0; i < num_microbatches_; ++i) {
for (auto& op : ops_) {
......@@ -244,6 +253,7 @@ void SectionWorker::TrainFiles() {
}
}
}
dev_ctx_->Wait();
// update pass
for (auto& op : ops_) {
int op_role = op->Attr<int>(std::string("op_role"));
......@@ -307,14 +317,23 @@ void SectionWorker::TrainFilesWithProfiler() {
#endif
if (thread_id_ == 0) {
struct timeval start;
struct timeval end;
struct timeval micro_start;
struct timeval micro_end;
while (true) {
// Start a minibatch.
// int batch_size = 0;
//cudaEvent_t cu_start, cu_stop;
//cudaEventCreate(&cu_start);
//cudaEventCreate(&cu_stop);
batch_timer.Start();
for (int i = 0; i < num_microbatches_; ++i) {
try {
int op_idx = 0;
gettimeofday(&micro_start, NULL);
for (auto& op : ops_) {
gettimeofday(&start, NULL);
int op_role = op->Attr<int>(std::string("op_role"));
// We run op with op_role = kLRSched only for the first microbatch
// to avoid increasing the @LR_DECAY_STEP@ multiple times.
......@@ -330,12 +349,20 @@ void SectionWorker::TrainFilesWithProfiler() {
VLOG(3) << "running an op " << op->Type() << " for " << thread_id_
<< " for scope " << i;
timeline.Start();
//cudaEventRecord(cu_start);
op->Run(*microbatch_scopes_[i], place_);
if (gc) {
DeleteUnusedTensors(*microbatch_scopes_[i], op.get(),
unused_vars_, gc.get());
}
cudaDeviceSynchronize();
//cudaEventRecord(cu_stop);
//cudaEventSynchronize(cu_stop);
//float cuda_time;
//cudaEventElapsedTime(&cuda_time, cu_start, cu_stop);
//VLOG(0) << "op: " << op->Type() << "; time: " << cuda_time;
timeline.Pause();
gettimeofday(&end, NULL);
auto time = timeline.ElapsedUS();
op_total_time[op_idx] += time;
if (time > op_max_time[op_idx]) {
......@@ -346,9 +373,26 @@ void SectionWorker::TrainFilesWithProfiler() {
}
op_count[op_idx] += 1;
op_total_time[op_idx] += time;
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
std::cout << "::FWD:B[" << batch_id_ << "]:SEC[" << thread_id_ << "]:SCOPE[" << i
<< "]:OP[" << op->Type() << "]:START[" << start.tv_sec * 1e6 + start.tv_usec
<< "]:END[" << end.tv_sec * 1e6 + end.tv_usec << "]" << std::endl;
}
}
op_idx++;
}
gettimeofday(&micro_end, NULL);
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
std::cout << "!!FWD:B[" << batch_id_ << "]:SEC[" << thread_id_
<< "]:START[" << micro_start.tv_sec * 1e6 + micro_start.tv_usec
<< "]:END[" << micro_end.tv_sec * 1e6 + micro_end.tv_usec << "]" << std::endl;
}
} catch (platform::EOFException&) {
std::unique_lock<std::mutex> lk(thread_mutex);
threads_completed = true;
......@@ -363,6 +407,8 @@ void SectionWorker::TrainFilesWithProfiler() {
<< ", mean_time: " << op_total_time[i] / op_count[i];
}
VLOG(0) << "================================";
//cudaEventDestroy(cu_start);
//cudaEventDestroy(cu_stop);
return;
}
if (i == 0) {
......@@ -372,10 +418,13 @@ void SectionWorker::TrainFilesWithProfiler() {
thread_condition.notify_all();
}
}
dev_ctx_->Wait();
// backward pass
for (int i = 0; i < num_microbatches_; ++i) {
int op_idx = 0;
gettimeofday(&micro_start, NULL);
for (auto& op : ops_) {
gettimeofday(&start, NULL);
int op_role = op->Attr<int>(std::string("op_role"));
if (op_role == static_cast<int>(OpRole::kBackward) ||
op_role == (static_cast<int>(OpRole::kBackward) |
......@@ -383,11 +432,19 @@ void SectionWorker::TrainFilesWithProfiler() {
VLOG(3) << "running an op " << op->Type() << " for " << thread_id_
<< " for scope " << i;
timeline.Start();
//cudaEventRecord(cu_start);
op->Run(*microbatch_scopes_[i], place_);
if (gc) {
DeleteUnusedTensors(*microbatch_scopes_[i], op.get(),
unused_vars_, gc.get());
}
cudaDeviceSynchronize();
//cudaEventRecord(cu_stop);
//cudaEventSynchronize(cu_stop);
//float cuda_time;
//cudaEventElapsedTime(&cuda_time, cu_start, cu_stop);
//VLOG(0) << "op: " << op->Type() << "; time: " << cuda_time;
gettimeofday(&end, NULL);
timeline.Pause();
auto time = timeline.ElapsedUS();
op_total_time[op_idx] += time;
......@@ -399,23 +456,51 @@ void SectionWorker::TrainFilesWithProfiler() {
}
op_count[op_idx] += 1;
op_total_time[op_idx] += time;
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
std::cout << "::BWD:B[" << batch_id_ << "]:SEC[" << thread_id_ << "]:SCOPE[" << i
<< "]:OP[" << op->Type() << "]:START[" << start.tv_sec * 1e6 + start.tv_usec
<< "]:END[" << end.tv_sec * 1e6 + end.tv_usec << "]" << std::endl;
}
}
op_idx++;
}
gettimeofday(&micro_end, NULL);
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
std::cout << "!!BWD:B[" << batch_id_ << "]:SEC[" << thread_id_
<< "]:START[" << micro_start.tv_sec * 1e6 + micro_start.tv_usec
<< "]:END[" << micro_end.tv_sec * 1e6 + micro_end.tv_usec << "]" << std::endl;
}
}
dev_ctx_->Wait();
// update pass
int op_idx = 0;
gettimeofday(&micro_start, NULL);
for (auto& op : ops_) {
gettimeofday(&start, NULL);
int op_role = op->Attr<int>(std::string("op_role"));
if (op_role == static_cast<int>(OpRole::kOptimize)) {
VLOG(3) << "running an op " << op->Type() << " for " << thread_id_
<< " for minibatch scope";
timeline.Start();
//cudaEventRecord(cu_start);
op->Run(*microbatch_scopes_[0], place_);
if (gc) {
DeleteUnusedTensors(*microbatch_scopes_[num_microbatches_ - 1],
op.get(), unused_vars_, gc.get());
}
//cudaEventRecord(cu_stop);
//cudaEventSynchronize(cu_stop);
cudaDeviceSynchronize();
//float cuda_time;
//cudaEventElapsedTime(&cuda_time, cu_start, cu_stop);
//VLOG(0) << "op: " << op->Type() << "; time: " << cuda_time;
gettimeofday(&end, NULL);
timeline.Pause();
auto time = timeline.ElapsedUS();
op_total_time[op_idx] += time;
......@@ -427,14 +512,44 @@ void SectionWorker::TrainFilesWithProfiler() {
}
op_count[op_idx] += 1;
op_total_time[op_idx] += time;
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
//std::cout << "::UPD:B[" << batch_id_ << "]:SEC[" << thread_id_
std::cout << "::UPD:B[" << batch_id_ << "]:SEC[" << thread_id_ << "]:SCOPE[" << num_microbatches_
<< "]:OP[" << op->Type() << "]:START[" << start.tv_sec * 1e6 + start.tv_usec
<< "]:END[" << end.tv_sec * 1e6 + end.tv_usec << "]" << std::endl;
}
}
op_idx++;
}
gettimeofday(&micro_end, NULL);
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
std::cout << "!!UPD:B[" << batch_id_ << "]:SEC[" << thread_id_
<< "]:START[" << micro_start.tv_sec * 1e6 + micro_start.tv_usec
<< "]:END[" << micro_end.tv_sec * 1e6 + micro_end.tv_usec << "]" << std::endl;
}
struct timeval wait_start;
struct timeval wait_end;
gettimeofday(&wait_start, NULL);
dev_ctx_->Wait();
gettimeofday(&wait_end, NULL);
VLOG(0) << "device wait: " << wait_end.tv_sec * 1e6 + wait_end.tv_usec - wait_start.tv_sec * 1e6 - wait_start.tv_usec;
batch_timer.Pause();
VLOG(0) << "batch time: " << batch_timer.ElapsedUS();
}
} else {
struct timeval start;
struct timeval end;
struct timeval micro_start;
struct timeval micro_end;
cudaEvent_t cu_start, cu_stop;
cudaEventCreate(&cu_start);
cudaEventCreate(&cu_stop);
while (true) {
{
PADDLE_ENFORCE_LE(
......@@ -459,7 +574,9 @@ void SectionWorker::TrainFilesWithProfiler() {
<< ", mean_time: " << op_total_time[i] / op_count[i];
}
VLOG(0) << "================================";
threads_completed = false;
//threads_completed = false;
//cudaEventDestroy(cu_start);
//cudaEventDestroy(cu_stop);
return;
}
lk.unlock();
......@@ -468,7 +585,9 @@ void SectionWorker::TrainFilesWithProfiler() {
// forward pass:
for (int i = 0; i < num_microbatches_; ++i) {
int op_idx = 0;
gettimeofday(&micro_start, NULL);
for (auto& op : ops_) {
gettimeofday(&start, NULL);
int op_role = op->Attr<int>(std::string("op_role"));
// We run op with op_role = kLRSched only for the first microbatch
// to avoid increasing the @LR_DECAY_STEP@ multiple times.
......@@ -484,11 +603,19 @@ void SectionWorker::TrainFilesWithProfiler() {
VLOG(3) << "running an op " << op->Type() << " for " << thread_id_
<< " for scope " << i;
timeline.Start();
//cudaEventRecord(cu_start);
op->Run(*microbatch_scopes_[i], place_);
if (gc) {
DeleteUnusedTensors(*microbatch_scopes_[i], op.get(),
unused_vars_, gc.get());
}
//cudaEventRecord(cu_stop);
//cudaEventSynchronize(cu_stop);
cudaDeviceSynchronize();
//float cuda_time;
//cudaEventElapsedTime(&cuda_time, cu_start, cu_stop);
//VLOG(0) << "op: " << op->Type() << "; time: " << cuda_time;
gettimeofday(&end, NULL);
timeline.Pause();
auto time = timeline.ElapsedUS();
op_total_time[op_idx] += time;
......@@ -500,14 +627,34 @@ void SectionWorker::TrainFilesWithProfiler() {
}
op_count[op_idx] += 1;
op_total_time[op_idx] += time;
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
std::cout << "::FWD:B[" << local_batch_id_ << "]:SEC[" << thread_id_ << "]:SCOPE[" << i
<< "]:OP[" << op->Type() << "]:START[" << start.tv_sec * 1e6 + start.tv_usec
<< "]:END[" << end.tv_sec * 1e6 + end.tv_usec << "]" << std::endl;
}
}
op_idx++;
}
gettimeofday(&micro_end, NULL);
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
std::cout << "!!FWD:B[" << batch_id_ << "]:SEC[" << thread_id_
<< "]:START[" << micro_start.tv_sec * 1e6 + micro_start.tv_usec
<< "]:END[" << micro_end.tv_sec * 1e6 + micro_end.tv_usec << "]" << std::endl;
}
}
dev_ctx_->Wait();
// backward pass
for (int i = 0; i < num_microbatches_; ++i) {
int op_idx = 0;
gettimeofday(&micro_start, NULL);
for (auto& op : ops_) {
gettimeofday(&start, NULL);
int op_role = op->Attr<int>(std::string("op_role"));
if (op_role == static_cast<int>(OpRole::kBackward) ||
op_role == (static_cast<int>(OpRole::kBackward) |
......@@ -515,11 +662,19 @@ void SectionWorker::TrainFilesWithProfiler() {
VLOG(3) << "running an op " << op->Type() << " for " << thread_id_
<< " for scope " << i;
timeline.Start();
//cudaEventRecord(cu_start);
op->Run(*microbatch_scopes_[i], place_);
if (gc) {
DeleteUnusedTensors(*microbatch_scopes_[i], op.get(),
unused_vars_, gc.get());
}
//cudaEventRecord(cu_stop);
//cudaEventSynchronize(cu_stop);
cudaDeviceSynchronize();
//float cuda_time;
//cudaEventElapsedTime(&cuda_time, cu_start, cu_stop);
//VLOG(0) << "op: " << op->Type() << "; time: " << cuda_time;
gettimeofday(&end, NULL);
timeline.Pause();
auto time = timeline.ElapsedUS();
op_total_time[op_idx] += time;
......@@ -531,23 +686,51 @@ void SectionWorker::TrainFilesWithProfiler() {
}
op_count[op_idx] += 1;
op_total_time[op_idx] += time;
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
std::cout << "::BWD:B[" << local_batch_id_ << "]:SEC[" << thread_id_ << "]:SCOPE[" << i
<< "]:OP[" << op->Type() << "]:START[" << start.tv_sec * 1e6 + start.tv_usec
<< "]:END[" << end.tv_sec * 1e6 + end.tv_usec << "]" << std::endl;
}
}
op_idx++;
}
gettimeofday(&micro_end, NULL);
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
std::cout << "!!BWD:B[" << batch_id_ << "]:SEC[" << thread_id_
<< "]:START[" << micro_start.tv_sec * 1e6 + micro_start.tv_usec
<< "]:END[" << micro_end.tv_sec * 1e6 + micro_end.tv_usec << "]" << std::endl;
}
}
dev_ctx_->Wait();
// update pass
int op_idx = 0;
gettimeofday(&micro_start, NULL);
for (auto& op : ops_) {
gettimeofday(&start, NULL);
int op_role = op->Attr<int>(std::string("op_role"));
if (op_role == static_cast<int>(OpRole::kOptimize)) {
VLOG(3) << "running an op " << op->Type() << " for " << thread_id_
<< " for minibatch scope";
timeline.Start();
//cudaEventRecord(cu_start);
op->Run(*microbatch_scopes_[0], place_);
if (gc) {
DeleteUnusedTensors(*microbatch_scopes_[num_microbatches_ - 1],
op.get(), unused_vars_, gc.get());
}
//cudaEventRecord(cu_stop);
//cudaEventSynchronize(cu_stop);
cudaDeviceSynchronize();
//float cuda_time;
//cudaEventElapsedTime(&cuda_time, cu_start, cu_stop);
//VLOG(0) << "op: " << op->Type() << "; time: " << cuda_time;
gettimeofday(&end, NULL);
timeline.Pause();
auto time = timeline.ElapsedUS();
op_total_time[op_idx] += time;
......@@ -559,9 +742,27 @@ void SectionWorker::TrainFilesWithProfiler() {
}
op_count[op_idx] += 1;
op_total_time[op_idx] += time;
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
//std::cout << "::UPD:B[" << local_batch_id_ << "]:SEC[" << thread_id_
std::cout << "::UPD:B[" << batch_id_ << "]:SEC[" << thread_id_ << "]:SCOPE[" << num_microbatches_
<< "]:OP[" << op->Type() << "]:START[" << start.tv_sec * 1e6 + start.tv_usec
<< "]:END[" << end.tv_sec * 1e6 + end.tv_usec << "]" << std::endl;
}
}
op_idx++;
}
gettimeofday(&micro_end, NULL);
{
std::unique_lock<std::mutex> lk(cout_mutex);
std::cout << std::fixed;
std::cout.precision(0);
std::cout << "!!UPD:B[" << batch_id_ << "]:SEC[" << thread_id_
<< "]:START[" << micro_start.tv_sec * 1e6 + micro_start.tv_usec
<< "]:END[" << micro_end.tv_sec * 1e6 + micro_end.tv_usec << "]" << std::endl;
}
dev_ctx_->Wait();
}
}
......
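The hunks above repeatedly wrap each `op->Run(...)` call with `gettimeofday` timestamps and print a trace line under a shared `cout_mutex`. The following is a minimal, self-contained sketch of that wall-clock logging pattern, not the framework code itself: `FakeOp`, `NowUs`, and the fixed batch/section/scope IDs are hypothetical stand-ins for `OperatorBase::Run`, the real timers, and the real pipeline state.

```cpp
#include <sys/time.h>

#include <iostream>
#include <mutex>
#include <vector>

// Hypothetical stand-in for an operator; the real code calls OperatorBase::Run.
struct FakeOp {
  const char* type;
  void Run() const { /* do some work */ }
};

static std::mutex cout_mutex;

// Current wall-clock time in microseconds, matching the diff's
// tv_sec * 1e6 + tv_usec expression.
static double NowUs() {
  struct timeval tv;
  gettimeofday(&tv, nullptr);
  return tv.tv_sec * 1e6 + tv.tv_usec;
}

int main() {
  std::vector<FakeOp> ops = {{"fill_constant"}, {"matmul"}, {"softmax"}};
  const int batch_id = 0, section_id = 0, scope_id = 0;

  for (const auto& op : ops) {
    const double start = NowUs();
    op.Run();
    const double end = NowUs();

    // Serialize the trace line so concurrent section threads do not interleave output.
    std::lock_guard<std::mutex> lk(cout_mutex);
    std::cout << std::fixed;
    std::cout.precision(0);
    std::cout << "::FWD:B[" << batch_id << "]:SEC[" << section_id
              << "]:SCOPE[" << scope_id << "]:OP[" << op.type
              << "]:START[" << start << "]:END[" << end << "]" << std::endl;
  }
  return 0;
}
```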