diff --git a/src/storage/blocksstable/ob_tmp_file.cpp b/src/storage/blocksstable/ob_tmp_file.cpp index fd783333388bcbac6328c8c4b9a29d46a1c10736..b7d8de59cf2143a16b4d56e868b2e8c9617bda86 100644 --- a/src/storage/blocksstable/ob_tmp_file.cpp +++ b/src/storage/blocksstable/ob_tmp_file.cpp @@ -40,14 +40,10 @@ bool ObTmpFileIOInfo::is_valid() const } ObTmpFileIOHandle::ObTmpFileIOHandle() - : tmp_file_(NULL), - io_handles_(), - page_cache_handles_(), - block_cache_handles_(), - buf_(NULL), - size_(0), - is_read_(false) -{} + : tmp_file_(NULL), io_handles_(), page_cache_handles_(), block_cache_handles_(), buf_(NULL), + size_(0), is_read_(false), has_wait_(false) +{ +} ObTmpFileIOHandle::~ObTmpFileIOHandle() { @@ -65,6 +61,7 @@ int ObTmpFileIOHandle::prepare_read(char* read_buf, ObTmpFile* file) size_ = 0; tmp_file_ = file; is_read_ = true; + has_wait_ = false; } return ret; } @@ -80,6 +77,7 @@ int ObTmpFileIOHandle::prepare_write(char* write_buf, const int64_t write_size, size_ = write_size; tmp_file_ = file; is_read_ = false; + has_wait_ = false; } return ret; } @@ -87,29 +85,35 @@ int ObTmpFileIOHandle::prepare_write(char* write_buf, const int64_t write_size, int ObTmpFileIOHandle::wait(const int64_t timeout_ms) { int ret = OB_SUCCESS; - for (int32_t i = 0; OB_SUCC(ret) && i < block_cache_handles_.count(); i++) { - ObBlockCacheHandle& tmp = block_cache_handles_.at(i); - MEMCPY(tmp.buf_, tmp.block_handle_.value_->get_buffer() + tmp.offset_, tmp.size_); - tmp.block_handle_.reset(); - } - for (int32_t i = 0; OB_SUCC(ret) && i < page_cache_handles_.count(); i++) { - ObPageCacheHandle& tmp = page_cache_handles_.at(i); - MEMCPY(tmp.buf_, tmp.page_handle_.value_->get_buffer() + tmp.offset_, tmp.size_); - tmp.page_handle_.reset(); - } - for (int32_t i = 0; OB_SUCC(ret) && i < io_handles_.count(); i++) { - ObIOReadHandle& tmp = io_handles_.at(i); - if (OB_FAIL(tmp.macro_handle_.wait(timeout_ms))) { - STORAGE_LOG(WARN, "fail to wait tmp read io", K(ret)); - } else { - MEMCPY(tmp.buf_, tmp.macro_handle_.get_buffer() + tmp.offset_, tmp.size_); - tmp.macro_handle_.reset(); + if (OB_UNLIKELY(has_wait_ && is_read_)) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(ERROR, "read wait() isn't reentrant interface, shouldn't call again", K(ret)); + } else { + for (int32_t i = 0; OB_SUCC(ret) && i < block_cache_handles_.count(); i++) { + ObBlockCacheHandle &tmp = block_cache_handles_.at(i); + MEMCPY(tmp.buf_, tmp.block_handle_.value_->get_buffer() + tmp.offset_, tmp.size_); + tmp.block_handle_.reset(); } - } - if (OB_SUCC(ret)) { - io_handles_.reset(); - page_cache_handles_.reset(); block_cache_handles_.reset(); + + for (int32_t i = 0; OB_SUCC(ret) && i < page_cache_handles_.count(); i++) { + ObPageCacheHandle &tmp = page_cache_handles_.at(i); + MEMCPY(tmp.buf_, tmp.page_handle_.value_->get_buffer() + tmp.offset_, tmp.size_); + tmp.page_handle_.reset(); + } + page_cache_handles_.reset(); + + for (int32_t i = 0; OB_SUCC(ret) && i < io_handles_.count(); i++) { + ObIOReadHandle &tmp = io_handles_.at(i); + if (OB_FAIL(tmp.macro_handle_.wait(timeout_ms))) { + STORAGE_LOG(WARN, "fail to wait tmp read io", K(ret)); + } else { + MEMCPY(tmp.buf_, tmp.macro_handle_.get_buffer() + tmp.offset_, tmp.size_); + tmp.macro_handle_.reset(); + } + } + io_handles_.reset(); + has_wait_ = true; } return ret; } @@ -132,6 +136,7 @@ void ObTmpFileIOHandle::reset() size_ = 0; tmp_file_ = NULL; is_read_ = false; + has_wait_ = false; } bool ObTmpFileIOHandle::is_valid() @@ -1000,6 +1005,7 @@ int ObTmpFileManager::aio_read(const ObTmpFileIOInfo& io_info, ObTmpFileIOHandle { int ret = OB_SUCCESS; ObTmpFileHandle file_handle; + handle.reset(); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; STORAGE_LOG(WARN, "ObTmpFileManager has not been inited", K(ret)); @@ -1020,6 +1026,7 @@ int ObTmpFileManager::aio_pread(const ObTmpFileIOInfo& io_info, const int64_t of { int ret = OB_SUCCESS; ObTmpFileHandle file_handle; + handle.reset(); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; STORAGE_LOG(WARN, "ObTmpFileManager has not been inited", K(ret)); @@ -1040,6 +1047,7 @@ int ObTmpFileManager::read(const ObTmpFileIOInfo& io_info, const int64_t timeout { int ret = OB_SUCCESS; ObTmpFileHandle file_handle; + handle.reset(); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; STORAGE_LOG(WARN, "ObTmpFileManager has not been inited", K(ret)); @@ -1061,6 +1069,7 @@ int ObTmpFileManager::pread( { int ret = OB_SUCCESS; ObTmpFileHandle file_handle; + handle.reset(); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; STORAGE_LOG(WARN, "ObTmpFileManager has not been inited", K(ret)); @@ -1081,6 +1090,7 @@ int ObTmpFileManager::aio_write(const ObTmpFileIOInfo& io_info, ObTmpFileIOHandl { int ret = OB_SUCCESS; ObTmpFileHandle file_handle; + handle.reset(); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; STORAGE_LOG(WARN, "ObTmpFileManager has not been inited", K(ret)); diff --git a/src/storage/blocksstable/ob_tmp_file.h b/src/storage/blocksstable/ob_tmp_file.h index 78158d7b08ef9a5cf105c4323b9cef09cf2303bd..0e16f1e94e4709eaad2144a6c762ab55cf70197e 100644 --- a/src/storage/blocksstable/ob_tmp_file.h +++ b/src/storage/blocksstable/ob_tmp_file.h @@ -126,6 +126,7 @@ private: char* buf_; int64_t size_; // has read or to write size. bool is_read_; + bool has_wait_; DISALLOW_COPY_AND_ASSIGN(ObTmpFileIOHandle); }; diff --git a/unittest/storage/blocksstable/test_tmp_file.cpp b/unittest/storage/blocksstable/test_tmp_file.cpp index 2b77f2a4d3f82a7d3cbea488b0687b660dfd3f80..a8d699778718590cb7c3f8a70abc719d53fef5ce 100644 --- a/unittest/storage/blocksstable/test_tmp_file.cpp +++ b/unittest/storage/blocksstable/test_tmp_file.cpp @@ -1423,6 +1423,59 @@ TEST_F(TestTmpFile, test_drop_tenant_file) ASSERT_EQ(0, ObTmpFileStore::get_instance().tenant_file_stores_.size()); } +TEST_F(TestTmpFile, test_handle_double_wait) +{ + int ret = OB_SUCCESS; + int64_t dir = -1; + int64_t fd = -1; + ObTmpFileIOInfo io_info; + ObTmpFileIOHandle handle; + ret = ObTmpFileManager::get_instance().alloc_dir(dir); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObTmpFileManager::get_instance().open(fd, dir); + ASSERT_EQ(OB_SUCCESS, ret); + char *write_buf = new char [256]; + for (int i = 0; i < 256; ++i) { + write_buf[i] = static_cast(i); + } + char *read_buf = new char [256]; + io_info.fd_ = fd; + io_info.tenant_id_ = 1; + io_info.io_desc_.category_ = USER_IO; + io_info.io_desc_.wait_event_no_ = 2; + io_info.buf_ = write_buf; + io_info.size_ = 256; + const int64_t timeout_ms = 5000; + int64_t write_time = ObTimeUtility::current_time(); + ret = ObTmpFileManager::get_instance().write(io_info, timeout_ms); + write_time = ObTimeUtility::current_time() - write_time; + ASSERT_EQ(OB_SUCCESS, ret); + io_info.buf_ = read_buf; + + + int64_t read_time = ObTimeUtility::current_time(); + ret = ObTmpFileManager::get_instance().pread(io_info, 0, timeout_ms, handle); + read_time = ObTimeUtility::current_time() - read_time; + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(256, handle.get_data_size()); + int cmp = memcmp(handle.get_buffer(), write_buf, 256); + ASSERT_EQ(0, cmp); + + ASSERT_EQ(OB_ERR_UNEXPECTED, handle.wait(timeout_ms)); + + STORAGE_LOG(INFO, "test_handle_double_wait"); + STORAGE_LOG(INFO, "io time", K(write_time), K(read_time)); + ObTmpTenantFileStore *store = NULL; + OB_TMP_FILE_STORE.get_store(1, store); + store->print_block_usage(); + ObMallocAllocator::get_instance()->print_tenant_memory_usage(1); + ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(1); + ObMallocAllocator::get_instance()->print_tenant_memory_usage(500); + ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(500); + + ObTmpFileManager::get_instance().remove(fd); +} + } // end namespace unittest } // end namespace oceanbase