未验证 提交 092839d6 编写于 作者: D danleifeng 提交者: GitHub

[psgpu]add checknan print and fix trainer device (#38131)

* trainer_device fix and checknan tool for psgpu;test=develop

* disable show_one_table;test=develop
上级 25c1b623
......@@ -454,9 +454,9 @@ void PSGPUWrapper::BuildGPUTask(std::shared_ptr<HeterContext> gpu_task) {
this->HeterPs_->build_ps(i, gpu_task->device_keys_[i].data(),
gpu_task->device_values_[i].data(),
feature_keys_count[i], 500000, 2);
if (feature_keys_count[i] > 0) {
HeterPs_->show_one_table(i);
}
// if (feature_keys_count[i] > 0) {
// HeterPs_->show_one_table(i);
// }
};
for (size_t i = 0; i < threads.size(); i++) {
threads[i] = std::thread(build_func, i);
......
......@@ -75,6 +75,8 @@ void PSGPUTrainer::Initialize(const TrainerDesc& trainer_desc,
workers_[i]->SetDumpParamVector(dump_param_);
workers_[i]->InitRandomDumpConfig(trainer_desc);
workers_[i]->SetDataFeed(readers[i]);
workers_[i]->SetPlace(places_[i]);
workers_[i]->SetReaderPlace(places_[i]);
workers_[i]->Initialize(trainer_desc);
workers_[i]->SetWorkerNum(place_num);
}
......@@ -102,8 +104,6 @@ void PSGPUTrainer::RegisterHeterCallback() {
void PSGPUTrainer::InitTrainerEnv(const ProgramDesc& main_program,
const platform::Place& place) {
for (size_t i = 0; i < places_.size(); ++i) {
workers_[i]->SetPlace(places_[i]);
workers_[i]->SetReaderPlace(places_[i]);
workers_[i]->SetRootScope(root_scope_);
workers_[i]->CreateDeviceResource(main_program); // Program
workers_[i]->BindingDataFeedMemory();
......@@ -216,7 +216,9 @@ void PSGPUTrainer::Finalize() {
continue;
}
LoDTensor* root_tensor = root_var->GetMutable<LoDTensor>();
if (root_tensor == nullptr || !root_tensor->IsInitialized()) {
continue;
}
for (size_t j = 0; j < places_.size(); j++) {
Scope* cur_thread_scope = workers_[j]->GetThreadScope();
Variable* thread_var =
......@@ -225,6 +227,9 @@ void PSGPUTrainer::Finalize() {
continue;
}
LoDTensor* thread_tensor = thread_var->GetMutable<LoDTensor>();
if (thread_tensor == nullptr || !thread_tensor->IsInitialized()) {
continue;
}
#define MergeCallback(cpp_type, proto_type) \
do { \
if (root_tensor->type() == proto_type) { \
......
......@@ -15,6 +15,7 @@ limitations under the License. */
#include "paddle/fluid/framework/device_worker.h"
#include "paddle/fluid/framework/device_worker_factory.h"
#include "paddle/fluid/platform/cpu_helper.h"
#include "paddle/fluid/platform/lodtensor_printer.h"
#include "paddle/fluid/string/string_helper.h"
#if (defined PADDLE_WITH_NCCL || defined PADDLE_WITH_RCCL) && \
......@@ -149,6 +150,38 @@ void PSGPUWorker::TrainFiles() {
DumpParam(*thread_scope_, batch_cnt);
}
for (std::string& var_name : check_nan_var_names_) {
Variable* var = thread_scope_->FindVar(var_name);
if (var == nullptr) {
continue;
}
LoDTensor* tensor = var->GetMutable<LoDTensor>();
if (tensor == nullptr || !tensor->IsInitialized()) {
continue;
}
if (framework::TensorContainsInf(*tensor) ||
framework::TensorContainsNAN(*tensor)) {
static std::mutex mutex;
{
std::lock_guard<std::mutex> lock(mutex);
VLOG(0) << "worker " << thread_id_ << ": " << var_name
<< " cantains inf or nan";
auto all_vars = thread_scope_->LocalVarNames();
std::stringstream ss;
ss << "====== worker " << thread_id_ << "======\n";
for (auto& local_var : all_vars) {
platform::PrintVar(thread_scope_, local_var, local_var, &ss);
ss << "\n";
}
std::cout << ss.str() << std::endl;
VLOG(0) << "worker " << thread_id_ << "print nan var done....";
}
sleep(600);
exit(-1);
}
}
dev_ctx_->Wait();
PrintFetchVars();
thread_scope_->DropKids();
++batch_cnt;
......
......@@ -35,6 +35,10 @@ class TensorFormatter {
const std::string& tensor_name = "",
const std::string& message = "");
template <typename T>
void FormatData(const framework::LoDTensor& print_tensor,
std::stringstream& log_stream);
void Print(const framework::LoDTensor& print_tensor,
const std::string& tensor_name = "",
const std::string& message = "");
......@@ -46,10 +50,6 @@ class TensorFormatter {
void SetSummarize(int64_t summarize);
private:
template <typename T>
void FormatData(const framework::LoDTensor& print_tensor,
std::stringstream& log_stream);
int64_t summarize_ = -1;
bool print_tensor_type_ = true;
bool print_tensor_shape_ = true;
......
......@@ -39,23 +39,37 @@ void PrintVar(framework::Scope* scope, const std::string& var_name,
<< " does not exist in your scope";
return;
}
if (!tensor->IsInitialized()) {
VLOG(0) << "tensor of variable " << var_name
<< " does not initialized in your scope";
return;
}
*sstream << print_info << ": ";
*sstream << print_info;
#define PrintTensorCallback(cpp_type, proto_type) \
do { \
if (tensor->type() == proto_type) { \
*sstream << "["; \
auto* data = tensor->data<cpp_type>(); \
auto element_num = tensor->numel(); \
if (element_num > 0) { \
*sstream << data[0]; \
for (int j = 1; j < element_num; ++j) { \
*sstream << " " << data[j]; \
} \
} \
*sstream << "]"; \
} \
#define PrintTensorCallback(cpp_type, proto_type) \
do { \
if (tensor->type() == proto_type) { \
*sstream << "["; \
const cpp_type* data = nullptr; \
framework::LoDTensor cpu_tensor; \
if (is_cpu_place(tensor->place())) { \
data = tensor->data<cpp_type>(); \
} else { \
platform::CPUPlace cpu_place; \
TensorCopy(*tensor, cpu_place, &cpu_tensor); \
data = cpu_tensor.data<cpp_type>(); \
} \
auto element_num = tensor->numel(); \
*sstream << element_num << "]:["; \
if (element_num > 0) { \
*sstream << data[0]; \
for (int j = 1; j < element_num; ++j) { \
*sstream << " " << data[j]; \
} \
} \
*sstream << "]"; \
} \
} while (0)
_ForEachDataType_(PrintTensorCallback);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册