未验证 提交 ce16b40b 编写于 作者: F fengjiayi 提交者: GitHub

Merge pull request #11891 from JiayiFeng/dev_eof_exp

Add EOFException to represent EOF in C++ reader
...@@ -62,7 +62,7 @@ std::vector<std::array<int, 3>> DataBalanceOpHandle::GetBalancePlan( ...@@ -62,7 +62,7 @@ std::vector<std::array<int, 3>> DataBalanceOpHandle::GetBalancePlan(
} }
if (total_size < device_num) { if (total_size < device_num) {
// No enough data. // No enough data.
PADDLE_THROW("There is no next data."); PADDLE_THROW_EOF();
} }
std::sort(size_device_vec.begin(), size_device_vec.end(), std::sort(size_device_vec.begin(), size_device_vec.end(),
[](const std::array<int, 2> &a, const std::array<int, 2> &b) { [](const std::array<int, 2> &a, const std::array<int, 2> &b) {
......
...@@ -98,9 +98,18 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( ...@@ -98,9 +98,18 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
if (timeout) { if (timeout) {
std::lock_guard<std::mutex> l(exception_mu_); std::lock_guard<std::mutex> l(exception_mu_);
if (exception_) { if (exception_) {
auto exp = *exception_; std::exception *exp = exception_.get();
exception_.reset(); if (dynamic_cast<platform::EOFException *>(exp)) {
throw exp; auto e = *static_cast<platform::EOFException *>(exp);
exception_.reset();
throw e;
} else if (dynamic_cast<platform::EnforceNotMet *>(exp)) {
auto e = *static_cast<platform::EnforceNotMet *>(exp);
exception_.reset();
throw e;
} else {
LOG(FATAL) << "Unknown exception.";
}
} else { } else {
continue; continue;
} }
...@@ -199,6 +208,12 @@ void ThreadedSSAGraphExecutor::RunOp( ...@@ -199,6 +208,12 @@ void ThreadedSSAGraphExecutor::RunOp(
running_ops_--; running_ops_--;
ready_var_q->Extend(op->Outputs()); ready_var_q->Extend(op->Outputs());
VLOG(10) << op << " " << op->Name() << "Signal posted"; VLOG(10) << op << " " << op->Name() << "Signal posted";
} catch (platform::EOFException ex) {
std::lock_guard<std::mutex> l(exception_mu_);
// EOFException will not cover up existing EnforceNotMet.
if (exception_.get() == nullptr) {
exception_.reset(new platform::EOFException(ex));
}
} catch (platform::EnforceNotMet ex) { } catch (platform::EnforceNotMet ex) {
std::lock_guard<std::mutex> l(exception_mu_); std::lock_guard<std::mutex> l(exception_mu_);
exception_.reset(new platform::EnforceNotMet(ex)); exception_.reset(new platform::EnforceNotMet(ex));
......
...@@ -57,7 +57,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { ...@@ -57,7 +57,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
std::vector<platform::Place> places_; std::vector<platform::Place> places_;
platform::DeviceContextPool fetch_ctxs_; platform::DeviceContextPool fetch_ctxs_;
std::mutex exception_mu_; std::mutex exception_mu_;
std::unique_ptr<platform::EnforceNotMet> exception_; std::unique_ptr<std::exception> exception_;
std::atomic<int> running_ops_; std::atomic<int> running_ops_;
void InsertPendingOp(std::unordered_map<OpHandleBase *, size_t> *pending_ops, void InsertPendingOp(std::unordered_map<OpHandleBase *, size_t> *pending_ops,
......
...@@ -68,7 +68,7 @@ class ReadOp : public framework::OperatorBase { ...@@ -68,7 +68,7 @@ class ReadOp : public framework::OperatorBase {
reader->ReadNext(&ins); reader->ReadNext(&ins);
if (ins.empty()) { if (ins.empty()) {
if (Attr<bool>("throw_eof_exp")) { if (Attr<bool>("throw_eof_exp")) {
PADDLE_THROW("There is no next data."); PADDLE_THROW_EOF();
} else { } else {
ins.resize(out_arg_names.size()); ins.resize(out_arg_names.size());
for (auto& tensor : ins) { for (auto& tensor : ins) {
......
...@@ -102,6 +102,15 @@ struct EnforceNotMet : public std::exception { ...@@ -102,6 +102,15 @@ struct EnforceNotMet : public std::exception {
const char* what() const noexcept { return err_str_.c_str(); } const char* what() const noexcept { return err_str_.c_str(); }
}; };
struct EOFException : public std::exception {
std::string err_str_;
EOFException(const char* err_msg, const char* f, int l) {
err_str_ = string::Sprintf("%s at [%s:%d]", err_msg, f, l);
}
const char* what() const noexcept { return err_str_.c_str(); }
};
// Because most enforce conditions would evaluate to true, we can use // Because most enforce conditions would evaluate to true, we can use
// __builtin_expect to instruct the C++ compiler to generate code that // __builtin_expect to instruct the C++ compiler to generate code that
// always forces branch prediction of true. // always forces branch prediction of true.
...@@ -242,6 +251,11 @@ inline void throw_on_error(T e) { ...@@ -242,6 +251,11 @@ inline void throw_on_error(T e) {
#define PADDLE_ENFORCE(...) ::paddle::platform::throw_on_error(__VA_ARGS__); #define PADDLE_ENFORCE(...) ::paddle::platform::throw_on_error(__VA_ARGS__);
#endif #endif
#define PADDLE_THROW_EOF() \
do { \
throw ::paddle::platform::EOFException("There is no next data.", __FILE__, \
__LINE__); \
} while (false)
/* /*
* Some enforce helpers here, usage: * Some enforce helpers here, usage:
* int a = 1; * int a = 1;
......
...@@ -210,3 +210,14 @@ TEST(ENFORCE_USER_DEFINED_CLASS, NE) { ...@@ -210,3 +210,14 @@ TEST(ENFORCE_USER_DEFINED_CLASS, NE) {
Dims a{{1, 2, 3, 4}}, b{{5, 6, 7, 8}}; Dims a{{1, 2, 3, 4}}, b{{5, 6, 7, 8}};
ASSERT_THROW(PADDLE_ENFORCE_EQ(a, b), paddle::platform::EnforceNotMet); ASSERT_THROW(PADDLE_ENFORCE_EQ(a, b), paddle::platform::EnforceNotMet);
} }
TEST(EOF_EXCEPTION, THROW_EOF) {
bool caught_eof = false;
try {
PADDLE_THROW_EOF();
} catch (paddle::platform::EOFException error) {
caught_eof = true;
EXPECT_TRUE(HasPrefix(StringPiece(error.what()), "There is no next data."));
}
EXPECT_TRUE(caught_eof);
}
...@@ -18,10 +18,13 @@ namespace paddle { ...@@ -18,10 +18,13 @@ namespace paddle {
namespace pybind { namespace pybind {
void BindException(pybind11::module* m) { void BindException(pybind11::module* m) {
static pybind11::exception<platform::EOFException> eof(*m, "EOFException");
static pybind11::exception<platform::EnforceNotMet> exc(*m, "EnforceNotMet"); static pybind11::exception<platform::EnforceNotMet> exc(*m, "EnforceNotMet");
pybind11::register_exception_translator([](std::exception_ptr p) { pybind11::register_exception_translator([](std::exception_ptr p) {
try { try {
if (p) std::rethrow_exception(p); if (p) std::rethrow_exception(p);
} catch (const platform::EOFException& e) {
eof(e.what());
} catch (const platform::EnforceNotMet& e) { } catch (const platform::EnforceNotMet& e) {
exc(e.what()); exc(e.what());
} }
......
...@@ -118,8 +118,7 @@ class TestDataBalance(unittest.TestCase): ...@@ -118,8 +118,7 @@ class TestDataBalance(unittest.TestCase):
try: try:
image_val, label_val = parallel_exe.run(fetch_list, image_val, label_val = parallel_exe.run(fetch_list,
return_numpy=True) return_numpy=True)
except fluid.core.EnforceNotMet as ex: except fluid.core.EOFException:
self.assertIn("There is no next data.", ex.message)
break break
ins_num = image_val.shape[0] ins_num = image_val.shape[0]
broadcasted_label = np.ones( broadcasted_label = np.ones(
...@@ -162,8 +161,7 @@ class TestDataBalance(unittest.TestCase): ...@@ -162,8 +161,7 @@ class TestDataBalance(unittest.TestCase):
try: try:
ins_tensor, label_tensor = parallel_exe.run( ins_tensor, label_tensor = parallel_exe.run(
fetch_list, return_numpy=False) fetch_list, return_numpy=False)
except fluid.core.EnforceNotMet as ex: except fluid.core.EOFException:
self.assertIn("There is no next data.", ex.message)
break break
ins_val = np.array(ins_tensor) ins_val = np.array(ins_tensor)
......
...@@ -64,8 +64,7 @@ class TestMultipleReader(unittest.TestCase): ...@@ -64,8 +64,7 @@ class TestMultipleReader(unittest.TestCase):
while True: while True:
try: try:
img_val, = exe.run(fetch_list=[img]) img_val, = exe.run(fetch_list=[img])
except fluid.core.EnforceNotMet as ex: except fluid.core.EOFException:
self.assertIn("There is no next data.", ex.message)
break break
batch_count += 1 batch_count += 1
self.assertLessEqual(img_val.shape[0], self.batch_size) self.assertLessEqual(img_val.shape[0], self.batch_size)
......
...@@ -59,8 +59,7 @@ class TestMultipleReader(unittest.TestCase): ...@@ -59,8 +59,7 @@ class TestMultipleReader(unittest.TestCase):
while True: while True:
try: try:
img_val, = exe.run(fetch_list=[img]) img_val, = exe.run(fetch_list=[img])
except fluid.core.EnforceNotMet as ex: except fluid.core.EOFException:
self.assertIn("There is no next data.", ex.message)
break break
batch_count += 1 batch_count += 1
self.assertLessEqual(img_val.shape[0], self.batch_size) self.assertLessEqual(img_val.shape[0], self.batch_size)
......
...@@ -68,8 +68,7 @@ class TestRecordIO(unittest.TestCase): ...@@ -68,8 +68,7 @@ class TestRecordIO(unittest.TestCase):
while True: while True:
try: try:
tmp, = exe.run(fetch_list=[avg_loss]) tmp, = exe.run(fetch_list=[avg_loss])
except fluid.core.EnforceNotMet as ex: except fluid.core.EOFException:
self.assertIn("There is no next data.", ex.message)
break break
avg_loss_np.append(tmp) avg_loss_np.append(tmp)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册