提交 3e4083ed 编写于 作者: F fengjiayi

Make exception handling of threaded_ssa_graph_executor an independent class

上级 ec4c6e1f
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "paddle/fluid/platform/enforce.h"
namespace paddle {
namespace framework {
namespace details {
class ExceptionHolder {
public:
void Catch(const platform::EnforceNotMet& exp) {
std::lock_guard<std::mutex> lock(mu_);
exception_.reset(new platform::EnforceNotMet(exp));
type_ = kEnforceNotMet;
}
void Catch(const platform::EOFException& exp) {
std::lock_guard<std::mutex> lock(mu_);
// EOFException will not cover up existing EnforceNotMet.
if (exception_.get() == nullptr) {
exception_.reset(new platform::EOFException(exp));
type_ = kEOF;
}
}
bool ExceptionCatched() const {
std::lock_guard<std::mutex> lock(mu_);
return exception_.get() != nullptr;
}
void Throw() {
std::lock_guard<std::mutex> lock(mu_);
switch (type_) {
case kNone:
break;
case kEnforceNotMet: {
auto e = *static_cast<platform::EnforceNotMet*>(exception_.get());
throw e;
break;
}
case kEOF: {
auto e = *static_cast<platform::EOFException*>(exception_.get());
throw e;
break;
}
default:
LOG(FATAL) << "Unknown exception.";
}
exception_.reset();
type_ = kNone;
}
void Clear() {
std::lock_guard<std::mutex> lock(mu_);
exception_.reset();
type_ = kNone;
}
private:
enum ExceptionType { kNone, kEnforceNotMet, kEOF };
ExceptionType type_{kNone};
std::unique_ptr<std::exception> exception_;
mutable std::mutex mu_;
};
} // namespace details
} // namespace framework
} // namespace paddle
...@@ -83,7 +83,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( ...@@ -83,7 +83,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
// Clean run context // Clean run context
run_op_futures_.clear(); run_op_futures_.clear();
exception_.reset(); exception_holder_.Clear();
// Step 3. Execution // Step 3. Execution
while (!pending_vars.empty()) { while (!pending_vars.empty()) {
...@@ -103,23 +103,11 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( ...@@ -103,23 +103,11 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
auto cur_ready_vars = ready_vars.PopAll(1, &timeout); auto cur_ready_vars = ready_vars.PopAll(1, &timeout);
if (timeout) { if (timeout) {
std::unique_lock<std::mutex> l(exception_mu_); if (exception_holder_.ExceptionCatched()) {
if (exception_) {
l.unlock();
for (auto &run_op_future : run_op_futures_) { for (auto &run_op_future : run_op_futures_) {
run_op_future.wait(); run_op_future.wait();
} }
l.lock(); exception_holder_.Throw();
std::exception *exp = exception_.get();
if (dynamic_cast<platform::EOFException *>(exp)) {
auto e = *static_cast<platform::EOFException *>(exp);
throw e;
} else if (dynamic_cast<platform::EnforceNotMet *>(exp)) {
auto e = *static_cast<platform::EnforceNotMet *>(exp);
throw e;
} else {
LOG(FATAL) << "Unknown exception.";
}
} else { } else {
continue; continue;
} }
...@@ -229,14 +217,9 @@ void ThreadedSSAGraphExecutor::RunOp( ...@@ -229,14 +217,9 @@ void ThreadedSSAGraphExecutor::RunOp(
ready_var_q->Extend(op->Outputs()); ready_var_q->Extend(op->Outputs());
VLOG(10) << op << " " << op->Name() << "Signal posted"; VLOG(10) << op << " " << op->Name() << "Signal posted";
} catch (platform::EOFException ex) { } catch (platform::EOFException ex) {
std::lock_guard<std::mutex> l(exception_mu_); exception_holder_.Catch(ex);
// EOFException will not cover up existing EnforceNotMet.
if (exception_.get() == nullptr) {
exception_.reset(new platform::EOFException(ex));
}
} catch (platform::EnforceNotMet ex) { } catch (platform::EnforceNotMet ex) {
std::lock_guard<std::mutex> l(exception_mu_); exception_holder_.Catch(ex);
exception_.reset(new platform::EnforceNotMet(ex));
} catch (...) { } catch (...) {
LOG(FATAL) << "Unknown exception catched"; LOG(FATAL) << "Unknown exception catched";
} }
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include <functional> #include <functional>
#include "ThreadPool.h" // ThreadPool in thrird party #include "ThreadPool.h" // ThreadPool in thrird party
#include "paddle/fluid/framework/blocking_queue.h" #include "paddle/fluid/framework/blocking_queue.h"
#include "paddle/fluid/framework/details/exception_holder.h"
#include "paddle/fluid/framework/details/execution_strategy.h" #include "paddle/fluid/framework/details/execution_strategy.h"
#include "paddle/fluid/framework/details/fetch_op_handle.h" #include "paddle/fluid/framework/details/fetch_op_handle.h"
#include "paddle/fluid/framework/details/ssa_graph_executor.h" #include "paddle/fluid/framework/details/ssa_graph_executor.h"
...@@ -58,8 +59,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { ...@@ -58,8 +59,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
std::vector<Scope *> local_scopes_; std::vector<Scope *> local_scopes_;
std::vector<platform::Place> places_; std::vector<platform::Place> places_;
platform::DeviceContextPool fetch_ctxs_; platform::DeviceContextPool fetch_ctxs_;
std::mutex exception_mu_; ExceptionHolder exception_holder_;
std::unique_ptr<std::exception> exception_;
std::atomic<int> running_ops_; std::atomic<int> running_ops_;
void InsertPendingOp(std::unordered_map<OpHandleBase *, size_t> *pending_ops, void InsertPendingOp(std::unordered_map<OpHandleBase *, size_t> *pending_ops,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册