提交 fb6201e9 编写于 作者: S sneaxiy

test=develop

上级 5f2e8378
...@@ -64,7 +64,8 @@ class OpHandleBase { ...@@ -64,7 +64,8 @@ class OpHandleBase {
virtual bool IsMultiDeviceTransfer() { return false; } virtual bool IsMultiDeviceTransfer() { return false; }
const platform::DeviceContext *DeviceContext(platform::Place place) { const platform::DeviceContext *DeviceContext(platform::Place place) {
return dev_ctxes_[place]; auto it = dev_ctxes_.find(place);
return it != dev_ctxes_.end() ? it->second : nullptr;
} }
void SetDeviceContext(platform::Place place, platform::DeviceContext *ctx_) { void SetDeviceContext(platform::Place place, platform::DeviceContext *ctx_) {
......
...@@ -46,6 +46,41 @@ ExecutorPrepareContext::~ExecutorPrepareContext() { ...@@ -46,6 +46,41 @@ ExecutorPrepareContext::~ExecutorPrepareContext() {
VLOG(5) << "destroy ExecutorPrepareContext"; VLOG(5) << "destroy ExecutorPrepareContext";
} }
template <typename RefCntMap>
static void DeleteUnusedTensors(const Scope& scope, const OperatorBase* op,
GarbageCollector<Tensor>* gc,
RefCntMap* ref_cnts) {
std::unordered_set<Tensor*> erase_tensors;
auto handler = [&](const VariableNameMap& name_map) {
for (auto& name_pair : name_map) {
for (auto& name : name_pair.second) {
auto it = ref_cnts->find(name);
if (it == ref_cnts->end()) continue;
if ((it->second)-- == 1) {
auto* var = scope.FindVar(name);
if (var != nullptr) {
VLOG(10) << "Erase tensor \'" << name << "\'";
if (var->IsType<LoDTensor>()) {
erase_tensors.insert(var->GetMutable<LoDTensor>());
} else if (var->IsType<SelectedRows>()) {
erase_tensors.insert(
var->GetMutable<SelectedRows>()->mutable_value());
}
}
}
}
}
};
handler(op->Inputs());
handler(op->Outputs());
if (!erase_tensors.empty()) {
gc->Add(erase_tensors);
}
}
Executor::Executor(const platform::Place& place) : place_(place) {} Executor::Executor(const platform::Place& place) : place_(place) {}
void Executor::Close() { void Executor::Close() {
...@@ -331,9 +366,13 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, ...@@ -331,9 +366,13 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
} }
int64_t max_memory_size = GetEagerDeletionThreshold(); int64_t max_memory_size = GetEagerDeletionThreshold();
std::unique_ptr<GarbageCollector<Tensor>> gc; std::unique_ptr<GarbageCollector<Tensor>> gc;
if (max_memory_size >= 0) { // WhileOp would set keep_kids to false
// WhileGradOp would need the scopes created in WhileOp
// Perhaps, we should not perform eager deletion in WhileOp
// The scopes and variables created by WhileOp would be deleted
// in WhileGradOp.
if (max_memory_size >= 0 && !keep_kids) {
ctx->ResetReferenceCount(); ctx->ResetReferenceCount();
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
if (platform::is_gpu_place(place_)) { if (platform::is_gpu_place(place_)) {
...@@ -352,45 +391,8 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, ...@@ -352,45 +391,8 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
op->Run(*local_scope, place_); op->Run(*local_scope, place_);
if (gc != nullptr) { if (gc != nullptr) {
std::vector<std::string> erase_vars; DeleteUnusedTensors(*local_scope, op.get(), gc.get(),
for (auto& input : op->Inputs()) { &(ctx->cur_ref_cnts_));
for (auto& input_name : input.second) {
auto it = ctx->cur_ref_cnts_.find(input_name);
if (it == ctx->cur_ref_cnts_.end()) continue;
if (it->second == 1) { // should delete it
erase_vars.emplace_back(input_name);
ctx->cur_ref_cnts_.erase(input_name);
} else {
--(it->second);
}
}
}
for (auto& output : op->Outputs()) {
for (auto& output_name : output.second) {
auto it = ctx->cur_ref_cnts_.find(output_name);
if (it == ctx->cur_ref_cnts_.end()) continue;
if (it->second == 1) {
erase_vars.emplace_back(output_name);
ctx->cur_ref_cnts_.erase(output_name);
} else {
--(it->second);
}
}
}
if (!erase_vars.empty()) {
std::vector<framework::LoDTensor*> erase_tensors;
for (auto& name : erase_vars) {
auto* var = local_scope->FindVar(name);
if (var == nullptr) continue;
if (var->IsType<framework::LoDTensor>()) {
auto* tensor = var->GetMutable<framework::LoDTensor>();
erase_tensors.push_back(tensor);
}
}
if (!erase_tensors.empty()) gc->Add(erase_tensors);
}
} }
if (FLAGS_benchmark) { if (FLAGS_benchmark) {
......
...@@ -32,38 +32,32 @@ template <typename T> ...@@ -32,38 +32,32 @@ template <typename T>
std::unordered_map<std::string, T> GetNonPersistableReferenceCount( std::unordered_map<std::string, T> GetNonPersistableReferenceCount(
const ProgramDesc& prog, size_t block_id) { const ProgramDesc& prog, size_t block_id) {
auto& block = prog.Block(block_id); auto& block = prog.Block(block_id);
std::unordered_set<std::string> ignored_vars;
std::unordered_map<std::string, T> ref_cnts; std::unordered_map<std::string, T> ref_cnts;
for (auto var_desc : block.AllVars()) { auto update_ref_cnts = [&](OpDesc* op_desc, const VariableNameMap& name_map) {
for (auto& name_pair : name_map) {
for (auto& name : name_pair.second) {
auto* var_desc = block.FindVar(name);
if (var_desc == nullptr || var_desc->Persistable()) continue;
auto type = var_desc->Proto()->type().type(); auto type = var_desc->Proto()->type().type();
if (type != proto::VarType::LOD_TENSOR || var_desc->Persistable()) { if (type != proto::VarType::LOD_TENSOR &&
ignored_vars.insert(var_desc->Name()); // ignore persistable vars type != proto::VarType::SELECTED_ROWS) {
} continue;
} }
for (auto op_desc : block.AllOps()) { auto it = ref_cnts.find(name);
for (auto& input : op_desc->Inputs()) { if (it != ref_cnts.end()) {
for (auto& input_name : input.second) { ++it->second;
if (!ignored_vars.count(input_name)) { } else {
if (ref_cnts.count(input_name)) ref_cnts[name] = 1;
++ref_cnts[input_name];
else
ref_cnts[input_name] = 1;
} }
} }
} }
};
for (auto& output : op_desc->Outputs()) { for (auto op_desc : block.AllOps()) {
for (auto output_name : output.second) { update_ref_cnts(op_desc, op_desc->Inputs());
if (!ignored_vars.count(output_name)) { update_ref_cnts(op_desc, op_desc->Outputs());
if (ref_cnts.count(output_name))
++ref_cnts[output_name];
else
ref_cnts[output_name] = 1;
}
}
}
} }
return ref_cnts; return ref_cnts;
} }
......
...@@ -64,6 +64,8 @@ ParallelExecutor::ParallelExecutor( ...@@ -64,6 +64,8 @@ ParallelExecutor::ParallelExecutor(
const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy, const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy,
size_t num_trainers, size_t trainer_id) size_t num_trainers, size_t trainer_id)
: member_(new ParallelExecutorPrivate(places)) { : member_(new ParallelExecutorPrivate(places)) {
is_alive_.test_and_set();
member_->global_scope_ = scope; member_->global_scope_ = scope;
member_->use_cuda_ = exec_strategy.use_cuda_; member_->use_cuda_ = exec_strategy.use_cuda_;
member_->use_all_reduce_ = member_->use_all_reduce_ =
...@@ -246,6 +248,15 @@ void ParallelExecutor::BCastParamsToDevices( ...@@ -246,6 +248,15 @@ void ParallelExecutor::BCastParamsToDevices(
void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors, void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
const std::string &fetched_var_name) { const std::string &fetched_var_name) {
// If ParallelExecutor has been destructed
// just return
if (!is_alive_.test_and_set()) return;
// If ParallelExecutor is running
if (is_running_.test_and_set()) {
PADDLE_THROW("The previous ParallelExecutor::Run() has not stopped");
}
platform::RecordBlock b(0); platform::RecordBlock b(0);
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
if (!gcs_.empty()) { if (!gcs_.empty()) {
...@@ -259,9 +270,17 @@ void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors, ...@@ -259,9 +270,17 @@ void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
} }
} }
#endif #endif
try {
auto fetch_data = member_->executor_->Run(fetch_tensors); auto fetch_data = member_->executor_->Run(fetch_tensors);
*member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() = *member_->global_scope_->Var(fetched_var_name)
fetch_data; ->GetMutable<FeedFetchList>() = fetch_data;
is_running_.clear();
} catch (...) {
is_running_.clear();
if (is_alive_.test_and_set()) {
std::rethrow_exception(std::current_exception());
}
}
} }
void ParallelExecutor::FeedTensorsIntoLocalScopes( void ParallelExecutor::FeedTensorsIntoLocalScopes(
...@@ -299,6 +318,7 @@ void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes( ...@@ -299,6 +318,7 @@ void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
} }
ParallelExecutor::~ParallelExecutor() { ParallelExecutor::~ParallelExecutor() {
is_alive_.clear();
if (member_->own_local_scope_) { if (member_->own_local_scope_) {
for (size_t i = 1; i < member_->local_scopes_.size(); ++i) { for (size_t i = 1; i < member_->local_scopes_.size(); ++i) {
Scope *local_scope = member_->local_scopes_[i]; Scope *local_scope = member_->local_scopes_[i];
...@@ -307,6 +327,12 @@ ParallelExecutor::~ParallelExecutor() { ...@@ -307,6 +327,12 @@ ParallelExecutor::~ParallelExecutor() {
} }
} }
} }
while (is_running_.test_and_set()) {
// wait unitl all threads have been stopped
}
member_.reset();
} }
} // namespace framework } // namespace framework
......
...@@ -75,7 +75,20 @@ class ParallelExecutor { ...@@ -75,7 +75,20 @@ class ParallelExecutor {
private: private:
void BCastParamsToDevices(const std::unordered_set<std::string> &vars) const; void BCastParamsToDevices(const std::unordered_set<std::string> &vars) const;
ParallelExecutorPrivate *member_; std::unique_ptr<ParallelExecutorPrivate> member_;
// FIXME(zjl): HOT-FIX
// A flag to indicate whether ParallelExecutor is destructed.
// In Python side, when users interrupt the process manually, such as
// keyboard interrupt, ParallelExecutor may be destructed before Run() ends.
// Thus, disturbing exception messages would occur when interrupted.
// If is_alive_ is false, we would discard the last exception thrown by Run().
// Since std::atomic_flag is always lock-free and faster than
// std::atomic<bool>, we choose std::atomic_flag to be the flag here.
std::atomic_flag is_alive_ = ATOMIC_FLAG_INIT;
// A flag to indicate whether ParallelExecutor is running.
std::atomic_flag is_running_ = ATOMIC_FLAG_INIT;
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
// ref_cnts_ is only initialized when ParallelExecutor constructs, and then // ref_cnts_ is only initialized when ParallelExecutor constructs, and then
......
...@@ -49,18 +49,18 @@ int64_t GetEagerDeletionThreshold() { ...@@ -49,18 +49,18 @@ int64_t GetEagerDeletionThreshold() {
Scope::~Scope() { DropKids(); } Scope::~Scope() { DropKids(); }
Scope& Scope::NewScope() const { Scope& Scope::NewScope() const {
std::unique_lock<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
kids_.push_back(new Scope(this)); kids_.push_back(new Scope(this));
return *kids_.back(); return *kids_.back();
} }
Variable* Scope::Var(const std::string& name) { Variable* Scope::Var(const std::string& name) {
std::unique_lock<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
return VarInternal(name); return VarInternal(name);
} }
Variable* Scope::Var(std::string* name) { Variable* Scope::Var(std::string* name) {
std::unique_lock<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
auto new_name = string::Sprintf("%p.%d", this, vars_.size()); auto new_name = string::Sprintf("%p.%d", this, vars_.size());
if (name != nullptr) { if (name != nullptr) {
*name = new_name; *name = new_name;
...@@ -69,29 +69,34 @@ Variable* Scope::Var(std::string* name) { ...@@ -69,29 +69,34 @@ Variable* Scope::Var(std::string* name) {
} }
Variable* Scope::FindVar(const std::string& name) const { Variable* Scope::FindVar(const std::string& name) const {
std::unique_lock<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
return FindVarInternal(name); return FindVarInternal(name);
} }
Variable* Scope::FindLocalVar(const std::string& name) const {
std::lock_guard<std::mutex> lock(mutex_);
return FindVarLocally(name);
}
const Scope* Scope::FindScope(const Variable* var) const { const Scope* Scope::FindScope(const Variable* var) const {
std::unique_lock<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
return FindScopeInternal(var); return FindScopeInternal(var);
} }
void Scope::DropKids() { void Scope::DropKids() {
std::unique_lock<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
for (Scope* s : kids_) delete s; for (Scope* s : kids_) delete s;
kids_.clear(); kids_.clear();
} }
bool Scope::HasKid(const Scope* scope) const { bool Scope::HasKid(const Scope* scope) const {
std::unique_lock<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
auto it = std::find(this->kids_.begin(), this->kids_.end(), scope); auto it = std::find(this->kids_.begin(), this->kids_.end(), scope);
return it != this->kids_.end(); return it != this->kids_.end();
} }
std::vector<std::string> Scope::LocalVarNames() const { std::vector<std::string> Scope::LocalVarNames() const {
std::unique_lock<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
std::vector<std::string> known_vars; std::vector<std::string> known_vars;
known_vars.reserve(this->vars_.size()); known_vars.reserve(this->vars_.size());
for (auto& p : vars_) { for (auto& p : vars_) {
...@@ -101,7 +106,7 @@ std::vector<std::string> Scope::LocalVarNames() const { ...@@ -101,7 +106,7 @@ std::vector<std::string> Scope::LocalVarNames() const {
} }
void Scope::DeleteScope(Scope* scope) const { void Scope::DeleteScope(Scope* scope) const {
std::unique_lock<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
auto it = std::find(this->kids_.begin(), this->kids_.end(), scope); auto it = std::find(this->kids_.begin(), this->kids_.end(), scope);
PADDLE_ENFORCE(it != this->kids_.end(), "Cannot find %p as kid scope", scope); PADDLE_ENFORCE(it != this->kids_.end(), "Cannot find %p as kid scope", scope);
this->kids_.erase(it); this->kids_.erase(it);
...@@ -114,7 +119,7 @@ void Scope::DeleteScope(Scope* scope) const { ...@@ -114,7 +119,7 @@ void Scope::DeleteScope(Scope* scope) const {
} }
void Scope::EraseVars(const std::vector<std::string>& var_names) { void Scope::EraseVars(const std::vector<std::string>& var_names) {
std::unique_lock<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
std::set<std::string> var_set(var_names.begin(), var_names.end()); std::set<std::string> var_set(var_names.begin(), var_names.end());
for (auto it = vars_.begin(); it != vars_.end();) { for (auto it = vars_.begin(); it != vars_.end();) {
if (var_set.find(it->first) != var_set.end()) { if (var_set.find(it->first) != var_set.end()) {
...@@ -127,12 +132,12 @@ void Scope::EraseVars(const std::vector<std::string>& var_names) { ...@@ -127,12 +132,12 @@ void Scope::EraseVars(const std::vector<std::string>& var_names) {
void Scope::Rename(const std::string& origin_name, void Scope::Rename(const std::string& origin_name,
const std::string& new_name) const { const std::string& new_name) const {
std::unique_lock<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
RenameInternal(origin_name, new_name); RenameInternal(origin_name, new_name);
} }
std::string Scope::Rename(const std::string& origin_name) const { std::string Scope::Rename(const std::string& origin_name) const {
std::unique_lock<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
auto new_name = string::Sprintf("%p.%d", this, vars_.size()); auto new_name = string::Sprintf("%p.%d", this, vars_.size());
RenameInternal(origin_name, new_name); RenameInternal(origin_name, new_name);
return new_name; return new_name;
......
...@@ -63,6 +63,11 @@ class Scope { ...@@ -63,6 +63,11 @@ class Scope {
/// Caller doesn't own the returned Variable. /// Caller doesn't own the returned Variable.
Variable* FindVar(const std::string& name) const; Variable* FindVar(const std::string& name) const;
/// Find a variable in the current scope.
/// Return nullptr if cannot find.
/// Caller doesn't own the returned Variable.
Variable* FindLocalVar(const std::string& name) const;
const Scope* parent() const { return parent_; } const Scope* parent() const { return parent_; }
/// Find the scope or an ancestor scope that contains the given variable. /// Find the scope or an ancestor scope that contains the given variable.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册