未验证 提交 681226e9 编写于 作者: Q Qiao Longfei 提交者: GitHub

Merge pull request #13864 from jacquesqiao/py-reader-add-test-mode

reader block queue add test mode
...@@ -31,8 +31,8 @@ class BlockingQueue { ...@@ -31,8 +31,8 @@ class BlockingQueue {
// is a workaround and a simplified version of framework::Channel as it // is a workaround and a simplified version of framework::Channel as it
// doesn't support GPU and it implements on buffered blocking queue. // doesn't support GPU and it implements on buffered blocking queue.
public: public:
explicit BlockingQueue(size_t capacity) explicit BlockingQueue(size_t capacity, bool speed_test_mode = false)
: capacity_(capacity), closed_(false) { : capacity_(capacity), speed_test_mode_(speed_test_mode), closed_(false) {
PADDLE_ENFORCE_GT( PADDLE_ENFORCE_GT(
capacity_, 0, capacity_, 0,
"The capacity of a reader::BlockingQueue must be greater than 0."); "The capacity of a reader::BlockingQueue must be greater than 0.");
...@@ -72,7 +72,9 @@ class BlockingQueue { ...@@ -72,7 +72,9 @@ class BlockingQueue {
if (!queue_.empty()) { if (!queue_.empty()) {
PADDLE_ENFORCE_NOT_NULL(elem); PADDLE_ENFORCE_NOT_NULL(elem);
*elem = queue_.front(); *elem = queue_.front();
if (LIKELY(!speed_test_mode_)) {
queue_.pop_front(); queue_.pop_front();
}
send_cv_.notify_one(); send_cv_.notify_one();
return true; return true;
} else { } else {
...@@ -114,6 +116,7 @@ class BlockingQueue { ...@@ -114,6 +116,7 @@ class BlockingQueue {
private: private:
size_t capacity_; size_t capacity_;
bool speed_test_mode_;
bool closed_; bool closed_;
std::deque<T> queue_; std::deque<T> queue_;
......
...@@ -33,8 +33,9 @@ class LoDTensorBlockingQueue { ...@@ -33,8 +33,9 @@ class LoDTensorBlockingQueue {
private: private:
LoDTensorBlockingQueue(size_t capacity, LoDTensorBlockingQueue(size_t capacity,
const std::vector<framework::DDim>& dims) const std::vector<framework::DDim>& dims,
: queue_(capacity), dims_(dims) {} bool speed_test_mode = false)
: queue_(capacity, speed_test_mode), dims_(dims) {}
public: public:
bool Push(const std::vector<framework::LoDTensor>& lod_tensor_vec) { bool Push(const std::vector<framework::LoDTensor>& lod_tensor_vec) {
...@@ -69,11 +70,12 @@ class LoDTensorBlockingQueue { ...@@ -69,11 +70,12 @@ class LoDTensorBlockingQueue {
class LoDTensorBlockingQueueHolder { class LoDTensorBlockingQueueHolder {
public: public:
void InitOnce(size_t capacity, const std::vector<framework::DDim>& dims) { void InitOnce(size_t capacity, const std::vector<framework::DDim>& dims,
bool speed_test_mode = false) {
PADDLE_ENFORCE( PADDLE_ENFORCE(
queue_ == nullptr, queue_ == nullptr,
"LoDTensorBlockingQueueHolder::InitOnce() can only be called once"); "LoDTensorBlockingQueueHolder::InitOnce() can only be called once");
queue_.reset(new LoDTensorBlockingQueue(capacity, dims)); queue_.reset(new LoDTensorBlockingQueue(capacity, dims, speed_test_mode));
} }
inline const std::shared_ptr<LoDTensorBlockingQueue>& GetQueue() const { inline const std::shared_ptr<LoDTensorBlockingQueue>& GetQueue() const {
......
...@@ -217,3 +217,27 @@ TEST(BlockingQueue, MyClassTest) { ...@@ -217,3 +217,27 @@ TEST(BlockingQueue, MyClassTest) {
q.Receive(&b); q.Receive(&b);
EXPECT_EQ(a.val_, b.val_); EXPECT_EQ(a.val_, b.val_);
} }
// Exercises both Receive() behaviours of BlockingQueue:
//  - normal mode (speed_test_mode == false): every Receive() pops the front
//    element, so values come out in FIFO order and the queue ends up empty;
//  - speed-test mode (speed_test_mode == true): Receive() copies the front
//    element without popping it, so every call yields the first value sent
//    and the queue size never shrinks.
TEST(BlockingQueue, speed_test_mode) {
  const size_t queue_size = 10;

  // Normal mode: elements are consumed.
  BlockingQueue<size_t> q1(queue_size, false);
  for (size_t i = 0; i < queue_size; ++i) {
    q1.Send(i);
  }
  // Initialized so a failed Receive() can never be compared against garbage.
  size_t b = 0;
  for (size_t i = 0; i < queue_size; ++i) {
    EXPECT_TRUE(q1.Receive(&b));
    EXPECT_EQ(b, i);
  }
  // Unsigned literal avoids a signed/unsigned comparison inside EXPECT_EQ.
  EXPECT_EQ(q1.Size(), size_t{0});

  // Speed-test mode: the front element is read repeatedly and never removed.
  BlockingQueue<size_t> q2(queue_size, true);
  for (size_t i = 0; i < queue_size; ++i) {
    q2.Send(i);
  }
  for (size_t i = 0; i < queue_size; ++i) {
    EXPECT_TRUE(q2.Receive(&b));
    EXPECT_EQ(b, size_t{0});
  }
  EXPECT_EQ(q2.Size(), queue_size);
}
...@@ -130,6 +130,13 @@ struct EOFException : public std::exception { ...@@ -130,6 +130,13 @@ struct EOFException : public std::exception {
#define UNLIKELY(condition) (condition == 0) #define UNLIKELY(condition) (condition == 0)
#endif #endif
// LIKELY(condition) evaluates to the truth value of `condition`. On
// non-Windows builds the value is additionally routed through
// __builtin_expect with an expected outcome of 1 (true).
#if !defined(_WIN32)
#define LIKELY(condition) __builtin_expect(static_cast<bool>(condition), 1)
#else
// There is no equivalent intrinsic in MSVC, so only the truth test is kept.
// The argument is parenthesized so that operators with lower precedence than
// `!=` inside it (e.g. `a & b`) are evaluated before the comparison.
#define LIKELY(condition) ((condition) != 0)
#endif
template <typename... Args> template <typename... Args>
inline typename std::enable_if<sizeof...(Args) != 0, void>::type throw_on_error( inline typename std::enable_if<sizeof...(Args) != 0, void>::type throw_on_error(
bool stat, const Args&... args) { bool stat, const Args&... args) {
......
...@@ -57,6 +57,10 @@ limitations under the License. */ ...@@ -57,6 +57,10 @@ limitations under the License. */
#include "pybind11/stl.h" #include "pybind11/stl.h"
// Boolean flag, off by default. Its value is handed to
// LoDTensorBlockingQueueHolder::InitOnce() as the `speed_test_mode` argument
// when the reader queue is created; in that mode a receive copies the front
// element of the queue without popping it.
DEFINE_bool(reader_queue_speed_test_mode, false,
"If set true, the queue.pop will only get data from queue but not "
"remove the data from queue for speed testing");
// disable auto conversion to list in Python // disable auto conversion to list in Python
PYBIND11_MAKE_OPAQUE(paddle::framework::LoDTensorArray); PYBIND11_MAKE_OPAQUE(paddle::framework::LoDTensorArray);
...@@ -380,7 +384,8 @@ All parameter, weight, gradient are variables in Paddle. ...@@ -380,7 +384,8 @@ All parameter, weight, gradient are variables in Paddle.
return make_ddim(shape); return make_ddim(shape);
}); });
auto *holder = var.GetMutable<LoDTensorBlockingQueueHolder>(); auto *holder = var.GetMutable<LoDTensorBlockingQueueHolder>();
holder->InitOnce(capacity, dims); holder->InitOnce(capacity, dims,
FLAGS_reader_queue_speed_test_mode);
return holder->GetQueue(); return holder->GetQueue();
}, },
py::return_value_policy::copy); py::return_value_policy::copy);
......
...@@ -113,7 +113,8 @@ def __bootstrap__(): ...@@ -113,7 +113,8 @@ def __bootstrap__():
'use_pinned_memory', 'check_nan_inf', 'benchmark', 'warpctc_dir', 'use_pinned_memory', 'check_nan_inf', 'benchmark', 'warpctc_dir',
'eager_delete_scope', 'use_mkldnn', 'initial_cpu_memory_in_mb', 'eager_delete_scope', 'use_mkldnn', 'initial_cpu_memory_in_mb',
'init_allocated_mem', 'free_idle_memory', 'paddle_num_threads', 'init_allocated_mem', 'free_idle_memory', 'paddle_num_threads',
"dist_threadpool_size", 'cpu_deterministic', 'eager_delete_tensor_gb' 'dist_threadpool_size', 'cpu_deterministic', 'eager_delete_tensor_gb',
'reader_queue_speed_test_mode'
] ]
if core.is_compiled_with_dist(): if core.is_compiled_with_dist():
read_env_flags.append('rpc_deadline') read_env_flags.append('rpc_deadline')
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册