Commit e3937271 — Author: obdev, Committer: wangzelin.wzl

Fix callback memory leak and limit memory by tenant id.

Parent: ccf6ae5c
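
This commit makes two related changes, both visible in the diff below. First, ObTmpMultiPageIOCallback no longer stores a heap-allocated ObSEArray pointer carved out of the cache-global allocator (it was freed only on the success path, so error paths and extra callback copies leaked it); the callback now owns a common::ObArray member and inner_deep_copy() copies its contents with assign(). Second, the 4 GB ObConcurrentFIFOAllocator owned by ObTmpPageCache is removed; both prefetch() overloads take a common::ObIAllocator& from the caller, and each ObTmpTenantFileStore passes in its own io_allocator_, initialized with the tenant id and a per-tenant IO_LIMIT. A minimal stand-alone sketch of the ownership pattern the callback moves to (the types below are hypothetical stand-ins, not the OceanBase classes):

```cpp
// Sketch only: PageIOInfo and MultiPageCallback model the idea of
// ObTmpPageIOInfo / ObTmpMultiPageIOCallback; they are not the real API.
#include <cassert>
#include <cstdint>
#include <new>
#include <vector>

struct PageIOInfo { int page_id = 0; };

struct MultiPageCallback {
  // Owned by value, as in the patch; before the fix this was a raw pointer
  // to an array allocated from a shared allocator, and copies of the
  // callback shared (and on error paths never freed) that allocation.
  std::vector<PageIOInfo> page_io_infos;

  // Models inner_deep_copy(): placement-new a copy into a caller buffer and
  // copy the container element-wise instead of sharing a pointer.
  MultiPageCallback *deep_copy(char *buf, int64_t buf_len) const {
    if (buf == nullptr || buf_len < static_cast<int64_t>(sizeof(*this))) {
      return nullptr;
    }
    MultiPageCallback *copy = new (buf) MultiPageCallback();
    copy->page_io_infos = page_io_infos;  // the analogue of assign()
    return copy;
  }
};

int main() {
  MultiPageCallback cb;
  cb.page_io_infos.push_back(PageIOInfo{1});
  alignas(MultiPageCallback) char buf[sizeof(MultiPageCallback)];
  MultiPageCallback *copy = cb.deep_copy(buf, sizeof(buf));
  assert(copy != nullptr && copy->page_io_infos.size() == 1);
  copy->~MultiPageCallback();  // placement new requires an explicit destructor call
  return 0;
}
```

Because every copy owns its own container, its memory is released in that copy's destructor, regardless of which error path runs.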
@@ -122,7 +122,8 @@ int ObTmpPageCacheValue::deep_copy(char *buf, const int64_t buf_len, ObIKVCacheV
int ObTmpPageCache::prefetch(
const ObTmpPageCacheKey &key,
const ObTmpBlockIOInfo &info,
ObMacroBlockHandle &mb_handle)
ObMacroBlockHandle &mb_handle,
common::ObIAllocator &allocator)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!key.is_valid() )) {
@@ -134,7 +135,7 @@ int ObTmpPageCache::prefetch(
callback.cache_ = this;
callback.offset_ = info.offset_;
callback.buf_size_ = info.size_;
callback.allocator_ = &allocator_;
callback.allocator_ = &allocator;
callback.key_ = key;
if (OB_FAIL(read_io(info, callback, mb_handle))) {
if (mb_handle.get_io_handle().is_empty()) {
@@ -156,7 +157,8 @@ int ObTmpPageCache::prefetch(
int ObTmpPageCache::prefetch(
const ObTmpBlockIOInfo &info,
const common::ObIArray<ObTmpPageIOInfo> &page_io_infos,
ObMacroBlockHandle &mb_handle)
ObMacroBlockHandle &mb_handle,
common::ObIAllocator &allocator)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(page_io_infos.count() <= 0)) {
@@ -167,31 +169,22 @@ int ObTmpPageCache::prefetch(
callback.cache_ = this;
callback.offset_ = info.offset_;
callback.buf_size_ = info.size_;
callback.allocator_ = &allocator_;
void *buf = allocator_.alloc(sizeof(common::ObSEArray<ObTmpPageIOInfo, ObTmpFilePageBuddy::MAX_PAGE_NUMS>));
if (NULL == buf) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "fail to alloc a buf", K(ret), K(info));
} else {
callback.page_io_infos_ = new (buf) common::ObSEArray<ObTmpPageIOInfo, ObTmpFilePageBuddy::MAX_PAGE_NUMS>();
callback.page_io_infos_->assign(page_io_infos);
if (OB_FAIL(read_io(info, callback, mb_handle))) {
if (mb_handle.get_io_handle().is_empty()) {
// TODO: After the continuous IO has been optimized, this should
// not happen.
if (OB_FAIL(mb_handle.wait(DEFAULT_IO_WAIT_TIME_MS))) {
STORAGE_LOG(WARN, "fail to wait tmp page io", K(ret));
} else if (OB_FAIL(read_io(info, callback, mb_handle))) {
STORAGE_LOG(WARN, "fail to read tmp page from io", K(ret));
}
} else {
callback.allocator_ = &allocator;
if (OB_FAIL(callback.page_io_infos_.assign(page_io_infos))) {
STORAGE_LOG(WARN, "fail to assign page io infos", K(ret), K(page_io_infos.count()), K(info));
} else if (OB_FAIL(read_io(info, callback, mb_handle))) {
if (mb_handle.get_io_handle().is_empty()) {
// TODO: After the continuous IO has been optimized, this should
// not happen.
if (OB_FAIL(mb_handle.wait(DEFAULT_IO_WAIT_TIME_MS))) {
STORAGE_LOG(WARN, "fail to wait tmp page io", K(ret));
} else if (OB_FAIL(read_io(info, callback, mb_handle))) {
STORAGE_LOG(WARN, "fail to read tmp page from io", K(ret));
}
} else {
STORAGE_LOG(WARN, "fail to read tmp page from io", K(ret));
}
}
if (OB_FAIL(ret) && OB_NOT_NULL(buf)) {
allocator_.free(buf);
}
}
return ret;
}
@@ -340,6 +333,8 @@ ObTmpPageCache::ObTmpMultiPageIOCallback::ObTmpMultiPageIOCallback()
ObTmpPageCache::ObTmpMultiPageIOCallback::~ObTmpMultiPageIOCallback()
{
page_io_infos_.reset();
page_io_infos_.~ObIArray<ObTmpPageIOInfo>();
}
int ObTmpPageCache::ObTmpMultiPageIOCallback::inner_process(const bool is_success)
@@ -350,17 +345,16 @@ int ObTmpPageCache::ObTmpMultiPageIOCallback::inner_process(const bool is_succes
STORAGE_LOG(WARN, "Invalid tmp page cache callback, ", KP_(cache), K(ret));
} else if (is_success) {
char *buf = const_cast<char *>(get_data());
for (int32_t i = 0; OB_SUCC(ret) && i < page_io_infos_->count(); i++) {
int64_t offset = page_io_infos_->at(i).key_.get_page_id()
for (int32_t i = 0; OB_SUCC(ret) && i < page_io_infos_.count(); i++) {
int64_t offset = page_io_infos_.at(i).key_.get_page_id()
* ObTmpMacroBlock::get_default_page_size() - offset_;
offset += ObTmpMacroBlock::get_header_padding();
ObTmpPageCacheValue value(buf + offset);
if (OB_FAIL(process_page(page_io_infos_->at(i).key_, value))) {
if (OB_FAIL(process_page(page_io_infos_.at(i).key_, value))) {
STORAGE_LOG(WARN, "fail to process tmp page cache in callback", K(ret));
}
}
page_io_infos_->reset();
allocator_->free(page_io_infos_);
page_io_infos_.reset();
}
if (OB_FAIL(ret) && NULL != allocator_ && NULL != io_buf_) {
allocator_->free(io_buf_);
@@ -388,7 +382,11 @@ int ObTmpPageCache::ObTmpMultiPageIOCallback::inner_deep_copy(char *buf,
} else {
ObTmpMultiPageIOCallback *pcallback = new (buf) ObTmpMultiPageIOCallback();
*pcallback = *this;
callback = pcallback;
if (OB_FAIL(pcallback->page_io_infos_.assign(page_io_infos_))) {
STORAGE_LOG(WARN, "The tmp page io assign failed", K(ret));
} else {
callback = pcallback;
}
}
return ret;
}
@@ -432,16 +430,9 @@ ObTmpPageCache::~ObTmpPageCache()
int ObTmpPageCache::init(const char *cache_name, const int64_t priority)
{
int ret = OB_SUCCESS;
const int64_t mem_limit = 4 * 1024 * 1024 * 1024LL;
if (OB_FAIL((common::ObKVCache<ObTmpPageCacheKey, ObTmpPageCacheValue>::init(
cache_name, priority)))) {
STORAGE_LOG(WARN, "Fail to init kv cache, ", K(ret));
} else if (OB_FAIL(allocator_.init(mem_limit,
OB_MALLOC_BIG_BLOCK_SIZE,
OB_MALLOC_BIG_BLOCK_SIZE))) {
STORAGE_LOG(WARN, "Fail to init io allocator, ", K(ret));
} else {
allocator_.set_label(ObModIds::OB_TMP_PAGE_CACHE);
}
return ret;
}
@@ -449,7 +440,6 @@ int ObTmpPageCache::init(const char *cache_name, const int64_t priority)
void ObTmpPageCache::destroy()
{
common::ObKVCache<ObTmpPageCacheKey, ObTmpPageCacheValue>::destroy();
allocator_.destroy();
}
int ObTmpPageCache::put_page(const ObTmpPageCacheKey &key, const ObTmpPageCacheValue &value)
......
@@ -106,12 +106,14 @@ public:
int prefetch(
const ObTmpPageCacheKey &key,
const ObTmpBlockIOInfo &info,
ObMacroBlockHandle &mb_handle);
ObMacroBlockHandle &mb_handle,
common::ObIAllocator &allocator);
// multi page prefetch
int prefetch(
const ObTmpBlockIOInfo &info,
const common::ObIArray<ObTmpPageIOInfo> &page_io_infos,
ObMacroBlockHandle &mb_handle);
ObMacroBlockHandle &mb_handle,
common::ObIAllocator &allocator);
int get_cache_page(const ObTmpPageCacheKey &key, ObTmpPageValueHandle &handle);
int get_page(const ObTmpPageCacheKey &key, ObTmpPageValueHandle &handle);
int put_page(const ObTmpPageCacheKey &key, const ObTmpPageCacheValue &value);
@@ -163,7 +165,7 @@ public:
TO_STRING_KV(KP_(data_buf));
private:
friend class ObTmpPageCache;
common::ObIArray<ObTmpPageIOInfo> *page_io_infos_;
common::ObArray<ObTmpPageIOInfo> page_io_infos_;
};
private:
ObTmpPageCache();
@@ -172,7 +174,6 @@ private:
ObMacroBlockHandle &handle);
private:
common::ObConcurrentFIFOAllocator allocator_;
DISALLOW_COPY_AND_ASSIGN(ObTmpPageCache);
};
......
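
The header changes above show the API shift: both prefetch() overloads now take a common::ObIAllocator&, page_io_infos_ becomes a common::ObArray held by value, and the cache-level ObConcurrentFIFOAllocator member disappears. A hedged sketch of the dependency-injection idea (simplified, hypothetical types rather than the real ObIAllocator / ObTmpPageCache interfaces):

```cpp
// Sketch only: Allocator stands in for common::ObIAllocator and prefetch()
// for ObTmpPageCache::prefetch(); the point is that the caller decides
// which memory pool the IO buffer is charged to.
#include <cstdlib>
#include <iostream>

struct Allocator {
  const char *label;   // e.g. which tenant's io_allocator_ this is
  size_t used = 0;
  void *alloc(size_t size) { used += size; return std::malloc(size); }
  void free(void *ptr) { std::free(ptr); }
};

// After the patch the cache has no allocator of its own; the buffer for the
// asynchronous read comes from whatever allocator the caller passes in.
void *prefetch(Allocator &allocator, size_t io_size) {
  return allocator.alloc(io_size);
}

int main() {
  Allocator tenant_io{"tenant 1001 io_allocator_"};  // hypothetical tenant
  void *buf = prefetch(tenant_io, 2u << 20);          // a 2 MB prefetch
  std::cout << tenant_io.label << " now accounts for "
            << tenant_io.used << " bytes\n";
  tenant_io.free(buf);
  return 0;
}
```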
@@ -814,6 +814,7 @@ ObTmpTenantFileStore::ObTmpTenantFileStore()
: page_cache_(NULL),
tmp_block_manager_(),
allocator_(),
io_allocator_(),
tmp_mem_block_manager_(),
is_inited_(false),
page_cache_num_(0),
@@ -850,8 +851,10 @@ int ObTmpTenantFileStore::init(const uint64_t tenant_id)
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
STORAGE_LOG(WARN, "ObTmpTenantFileStore has not been inited", K(ret));
} else if (OB_FAIL(allocator_.init(TOTAL_LIMIT, HOLD_LIMIT, BLOCK_SIZE))) {
} else if (OB_FAIL(allocator_.init(BLOCK_SIZE, ObModIds::OB_TMP_BLOCK_MANAGER, tenant_id, TOTAL_LIMIT))) {
STORAGE_LOG(WARN, "fail to init allocator", K(ret));
} else if (OB_FAIL(io_allocator_.init(OB_MALLOC_BIG_BLOCK_SIZE, ObModIds::OB_TMP_PAGE_CACHE, tenant_id, IO_LIMIT))) {
STORAGE_LOG(WARN, "Fail to init io allocator, ", K(ret));
} else if (OB_ISNULL(page_cache_ = &ObTmpPageCache::get_instance())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "fail to get the page cache", K(ret));
@@ -860,7 +863,6 @@ int ObTmpTenantFileStore::init(const uint64_t tenant_id)
} else if (OB_FAIL(tmp_mem_block_manager_.init(tenant_id, allocator_))) {
STORAGE_LOG(WARN, "fail to init memory block manager", K(ret));
} else {
allocator_.set_label(ObModIds::OB_TMP_BLOCK_MANAGER);
is_inited_ = true;
}
if (!is_inited_) {
@@ -877,6 +879,7 @@ void ObTmpTenantFileStore::destroy()
page_cache_ = NULL;
}
allocator_.destroy();
io_allocator_.destroy();
is_inited_ = false;
STORAGE_LOG(INFO, "cache num when destroy",
K(ATOMIC_LOAD(&page_cache_num_)), K(ATOMIC_LOAD(&block_cache_num_)));
@@ -1250,7 +1253,7 @@ int ObTmpTenantFileStore::read_page(ObTmpMacroBlock *block, ObTmpBlockIOInfo &io
info.offset_ = p_offset + ObTmpMacroBlock::get_header_padding();
info.size_ = page_nums * ObTmpMacroBlock::get_default_page_size();
info.macro_block_id_ = block->get_macro_block_id();
if (OB_FAIL(page_cache_->prefetch(info, *page_io_infos, mb_handle))) {
if (OB_FAIL(page_cache_->prefetch(info, *page_io_infos, mb_handle, io_allocator_))) {
STORAGE_LOG(WARN, "fail to prefetch multi tmp page", K(ret));
} else {
ObTmpFileIOHandle::ObIOReadHandle read_handle(mb_handle, io_info.buf_,
@@ -1269,7 +1272,7 @@ int ObTmpTenantFileStore::read_page(ObTmpMacroBlock *block, ObTmpBlockIOInfo &io
info.offset_ += ObTmpMacroBlock::get_header_padding();
info.size_ = ObTmpMacroBlock::get_default_page_size();
info.macro_block_id_ = block->get_macro_block_id();
if (OB_FAIL(page_cache_->prefetch(page_io_infos->at(i).key_, info, mb_handle))) {
if (OB_FAIL(page_cache_->prefetch(page_io_infos->at(i).key_, info, mb_handle, io_allocator_))) {
STORAGE_LOG(WARN, "fail to prefetch tmp page", K(ret));
} else {
char *buf = io_info.buf_ + ObTmpMacroBlock::calculate_offset(
......
@@ -262,6 +262,7 @@ private:
int wait_write_io_finish_if_need();
private:
static const uint64_t IO_LIMIT = 4 * 1024L * 1024L * 1024L;
static const uint64_t TOTAL_LIMIT = 15 * 1024L * 1024L * 1024L;
static const uint64_t HOLD_LIMIT = 8 * 1024L * 1024L;
static const uint64_t BLOCK_SIZE = common::OB_MALLOC_MIDDLE_BLOCK_SIZE;
@@ -270,6 +271,7 @@ private:
ObTmpPageCache *page_cache_;
ObTmpTenantMacroBlockManager tmp_block_manager_;
common::ObConcurrentFIFOAllocator allocator_;
common::ObConcurrentFIFOAllocator io_allocator_;
ObTmpTenantMemBlockManager tmp_mem_block_manager_;
common::SpinRWLock lock_;
bool is_inited_;
......
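
On the file-store side, every ObTmpTenantFileStore now owns an io_allocator_ (a common::ObConcurrentFIFOAllocator) initialized with the tenant id, the OB_TMP_PAGE_CACHE label and a 4 GB IO_LIMIT, and read_page() hands that allocator to prefetch(). Temp-page IO memory is therefore capped per tenant instead of drawn from one shared 4 GB pool. A toy model of that cap (LimitedAllocator below is illustrative, not the ObConcurrentFIFOAllocator API):

```cpp
// Sketch only: a per-tenant allocator that refuses to exceed its limit,
// mirroring what initializing io_allocator_ with the tenant id and
// IO_LIMIT is meant to guarantee.
#include <cstdint>
#include <cstdlib>
#include <iostream>

class LimitedAllocator {
public:
  LimitedAllocator(uint64_t tenant_id, int64_t limit)
      : tenant_id_(tenant_id), limit_(limit), used_(0) {}

  void *alloc(int64_t size) {
    if (size < 0 || used_ + size > limit_) {
      return nullptr;  // the moral equivalent of OB_ALLOCATE_MEMORY_FAILED
    }
    used_ += size;
    return std::malloc(static_cast<size_t>(size));
  }

  uint64_t tenant_id() const { return tenant_id_; }

private:
  uint64_t tenant_id_;
  int64_t limit_;
  int64_t used_;
};

int main() {
  const int64_t IO_LIMIT = 4 * 1024L * 1024L * 1024L;  // same constant as the patch
  LimitedAllocator io_allocator(1001 /* hypothetical tenant id */, IO_LIMIT);
  void *ok = io_allocator.alloc(2L << 20);        // well under the cap
  void *rejected = io_allocator.alloc(IO_LIMIT);  // would push usage over the cap
  std::cout << "tenant " << io_allocator.tenant_id()
            << ": ok=" << (ok != nullptr)
            << " rejected=" << (rejected == nullptr) << "\n";
  std::free(ok);
  return 0;
}
```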