未验证 提交 10c78caf 编写于 作者: 羽飞's avatar 羽飞 提交者: GitHub

重构buffer pool (#33)

重构disk buffer pool:
新增BufferPoolManager,管理所有的DiskBufferPool;
原有DiskBufferPool只处理一个文件,取消其中file_id的概念;
删除BPPageHandle,直接使用Frame;
BPManager更名BPFrameManager;
BPFileSubHeader改为BPFileHeader并直接放在DiskBufferPool中
上级 7ad72565
......@@ -36,6 +36,8 @@ See the Mulan PSL v2 for more details. */
#include "sql/query_cache/query_cache_stage.h"
#include "storage/default/default_storage_stage.h"
#include "storage/mem/mem_storage_stage.h"
#include "storage/default/disk_buffer_pool.h"
#include "storage/default/default_handler.h"
using namespace common;
......@@ -156,6 +158,33 @@ int prepare_init_seda()
return 0;
}
int init_global_objects()
{
BufferPoolManager *bpm = new BufferPoolManager();
BufferPoolManager::set_instance(bpm);
DefaultHandler *handler = new DefaultHandler();
DefaultHandler::set_default(handler);
return 0;
}
int uninit_global_objects()
{
DefaultHandler *default_handler = &DefaultHandler::get_default();
if (default_handler != nullptr) {
DefaultHandler::set_default(nullptr);
delete default_handler;
}
BufferPoolManager *bpm = &BufferPoolManager::instance();
if (bpm != nullptr) {
BufferPoolManager::set_instance(nullptr);
delete bpm;
}
return 0;
}
int init(ProcessParam *process_param)
{
......@@ -200,6 +229,12 @@ int init(ProcessParam *process_param)
get_properties()->to_string(conf_data);
LOG_INFO("Output configuration \n%s", conf_data.c_str());
rc = init_global_objects();
if (rc != 0) {
LOG_ERROR("failed to init global objects");
return rc;
}
// seda is used for backend async event handler
// the latency of seda is slow, it isn't used for critical latency
// environment.
......@@ -229,6 +264,7 @@ int init(ProcessParam *process_param)
void cleanup_util()
{
uninit_global_objects();
if (nullptr != get_properties()) {
delete get_properties();
......@@ -245,4 +281,6 @@ void cleanup_util()
}
void cleanup()
{}
{
cleanup_util();
}
......@@ -16,10 +16,10 @@ See the Mulan PSL v2 for more details. */
#include <string>
static const char *TABLE_META_SUFFIX = ".table";
static const char *TABLE_META_FILE_PATTERN = ".*\\.table$";
static const char *TABLE_DATA_SUFFIX = ".data";
static const char *TABLE_INDEX_SUFFIX = ".index";
static constexpr const char *TABLE_META_SUFFIX = ".table";
static constexpr const char *TABLE_META_FILE_PATTERN = ".*\\.table$";
static constexpr const char *TABLE_DATA_SUFFIX = ".data";
static constexpr const char *TABLE_INDEX_SUFFIX = ".index";
std::string table_meta_file(const char *base_dir, const char *table_name);
std::string table_data_file(const char *base_dir, const char *table_name);
......
......@@ -47,66 +47,54 @@ int page_header_size(int record_capacity)
return align8(page_fix_size() + bitmap_size);
}
////////////////////////////////////////////////////////////////////////////////
RecordPageHandler::RecordPageHandler()
: disk_buffer_pool_(nullptr), file_id_(-1), page_header_(nullptr), bitmap_(nullptr)
{
page_handle_.open = false;
page_handle_.frame = nullptr;
}
RecordPageHandler::~RecordPageHandler()
{
cleanup();
}
RC RecordPageHandler::init(DiskBufferPool &buffer_pool, int file_id, PageNum page_num)
RC RecordPageHandler::init(DiskBufferPool &buffer_pool, PageNum page_num)
{
if (disk_buffer_pool_ != nullptr) {
LOG_WARN("Disk buffer pool has been opened for file_id:page_num %d:%d.", file_id, page_num);
LOG_WARN("Disk buffer pool has been opened for page_num %d.", page_num);
return RC::RECORD_OPENNED;
}
RC ret = RC::SUCCESS;
if ((ret = buffer_pool.get_this_page(file_id, page_num, &page_handle_)) != RC::SUCCESS) {
LOG_ERROR("Failed to get page handle from disk buffer pool. file_id:%d, ret=%d:%s", file_id, ret, strrc(ret));
if ((ret = buffer_pool.get_this_page(page_num, &frame_)) != RC::SUCCESS) {
LOG_ERROR("Failed to get page handle from disk buffer pool. ret=%d:%s", ret, strrc(ret));
return ret;
}
char *data;
ret = buffer_pool.get_data(&page_handle_, &data);
if (ret != RC::SUCCESS) {
LOG_ERROR("Failed to get page data. ret=%d:%s", ret, strrc(ret));
return ret;
}
char *data = frame_->data();
disk_buffer_pool_ = &buffer_pool;
file_id_ = file_id;
page_header_ = (PageHeader *)(data);
bitmap_ = data + page_fix_size();
LOG_TRACE("Successfully init file_id:page_num %d:%d.", file_id, page_num);
LOG_TRACE("Successfully init page_num %d.", page_num);
return ret;
}
RC RecordPageHandler::init_empty_page(DiskBufferPool &buffer_pool, int file_id, PageNum page_num, int record_size)
RC RecordPageHandler::init_empty_page(DiskBufferPool &buffer_pool, PageNum page_num, int record_size)
{
RC ret = init(buffer_pool, file_id, page_num);
RC ret = init(buffer_pool, page_num);
if (ret != RC::SUCCESS) {
LOG_ERROR("Failed to init empty page file_id:page_num:record_size %d:%d:%d.", file_id, page_num, record_size);
LOG_ERROR("Failed to init empty page page_num:record_size %d:%d.", page_num, record_size);
return ret;
}
int page_size = sizeof(page_handle_.frame->page.data);
int page_size = BP_PAGE_DATA_SIZE;
int record_phy_size = align8(record_size);
page_header_->record_num = 0;
page_header_->record_capacity = page_record_capacity(page_size, record_phy_size);
page_header_->record_real_size = record_size;
page_header_->record_size = record_phy_size;
page_header_->first_record_offset = page_header_size(page_header_->record_capacity);
bitmap_ = page_handle_.frame->page.data + page_fix_size();
bitmap_ = frame_->data() + page_fix_size();
memset(bitmap_, 0, page_bitmap_size(page_header_->record_capacity));
disk_buffer_pool_->mark_dirty(&page_handle_);
frame_->mark_dirty();
return RC::SUCCESS;
}
......@@ -114,10 +102,8 @@ RC RecordPageHandler::init_empty_page(DiskBufferPool &buffer_pool, int file_id,
RC RecordPageHandler::cleanup()
{
if (disk_buffer_pool_ != nullptr) {
disk_buffer_pool_->unpin_page(&page_handle_);
disk_buffer_pool_->unpin_page(frame_);
disk_buffer_pool_ = nullptr;
// skip purge page,
// skip reset page_header & bitmap
}
return RC::SUCCESS;
......@@ -125,9 +111,8 @@ RC RecordPageHandler::cleanup()
RC RecordPageHandler::insert_record(const char *data, RID *rid)
{
if (page_header_->record_num == page_header_->record_capacity) {
LOG_WARN("Page is full, file_id:page_num %d:%d.", file_id_, page_handle_.frame->page.page_num);
LOG_WARN("Page is full, page_num %d:%d.", frame_->page_num());
return RC::RECORD_NOMEM;
}
......@@ -141,7 +126,7 @@ RC RecordPageHandler::insert_record(const char *data, RID *rid)
char *record_data = get_record_data(index);
memcpy(record_data, data, page_header_->record_real_size);
disk_buffer_pool_->mark_dirty(&page_handle_);
frame_->mark_dirty();
if (rid) {
rid->page_num = get_page_num();
......@@ -155,25 +140,21 @@ RC RecordPageHandler::insert_record(const char *data, RID *rid)
RC RecordPageHandler::update_record(const Record *rec)
{
if (rec->rid.slot_num >= page_header_->record_capacity) {
LOG_ERROR("Invalid slot_num %d, exceed page's record capacity, file_id:page_num %d:%d.",
rec->rid.slot_num,
file_id_,
page_handle_.frame->page.page_num);
LOG_ERROR("Invalid slot_num %d, exceed page's record capacity, page_num %d.",
rec->rid.slot_num, frame_->page_num());
return RC::INVALID_ARGUMENT;
}
Bitmap bitmap(bitmap_, page_header_->record_capacity);
if (!bitmap.get_bit(rec->rid.slot_num)) {
LOG_ERROR("Invalid slot_num %d, slot is empty, file_id:page_num %d:%d.",
rec->rid.slot_num,
file_id_,
page_handle_.frame->page.page_num);
LOG_ERROR("Invalid slot_num %d, slot is empty, page_num %d.",
rec->rid.slot_num, frame_->page_num());
return RC::RECORD_RECORD_NOT_EXIST;
} else {
char *record_data = get_record_data(rec->rid.slot_num);
memcpy(record_data, rec->data, page_header_->record_real_size);
bitmap.set_bit(rec->rid.slot_num);
disk_buffer_pool_->mark_dirty(&page_handle_);
frame_->mark_dirty();
// LOG_TRACE("Update record. file_id=%d, page num=%d,slot=%d", file_id_, rec->rid.page_num, rec->rid.slot_num);
return RC::SUCCESS;
}
......@@ -182,10 +163,8 @@ RC RecordPageHandler::update_record(const Record *rec)
RC RecordPageHandler::delete_record(const RID *rid)
{
if (rid->slot_num >= page_header_->record_capacity) {
LOG_ERROR("Invalid slot_num %d, exceed page's record capacity, file_id:page_num %d:%d.",
rid->slot_num,
file_id_,
page_handle_.frame->page.page_num);
LOG_ERROR("Invalid slot_num %d, exceed page's record capacity, page_num %d.",
rid->slot_num, frame_->page_num());
return RC::INVALID_ARGUMENT;
}
......@@ -193,21 +172,18 @@ RC RecordPageHandler::delete_record(const RID *rid)
if (bitmap.get_bit(rid->slot_num)) {
bitmap.clear_bit(rid->slot_num);
page_header_->record_num--;
disk_buffer_pool_->mark_dirty(&page_handle_);
frame_->mark_dirty();
if (page_header_->record_num == 0) {
DiskBufferPool *disk_buffer_pool = disk_buffer_pool_;
int file_id = file_id_;
PageNum page_num = get_page_num();
cleanup();
disk_buffer_pool->dispose_page(file_id, page_num);
disk_buffer_pool->dispose_page(page_num);
}
return RC::SUCCESS;
} else {
LOG_ERROR("Invalid slot_num %d, slot is empty, file_id:page_num %d:%d.",
rid->slot_num,
file_id_,
page_handle_.frame->page.page_num);
LOG_ERROR("Invalid slot_num %d, slot is empty, page_num %d.",
rid->slot_num, frame_->page_num());
return RC::RECORD_RECORD_NOT_EXIST;
}
}
......@@ -215,19 +191,15 @@ RC RecordPageHandler::delete_record(const RID *rid)
RC RecordPageHandler::get_record(const RID *rid, Record *rec)
{
if (rid->slot_num >= page_header_->record_capacity) {
LOG_ERROR("Invalid slot_num:%d, exceed page's record capacity, file_id:page_num %d:%d.",
rid->slot_num,
file_id_,
page_handle_.frame->page.page_num);
LOG_ERROR("Invalid slot_num:%d, exceed page's record capacity, page_num %d.",
rid->slot_num, frame_->page_num());
return RC::RECORD_INVALIDRID;
}
Bitmap bitmap(bitmap_, page_header_->record_capacity);
if (!bitmap.get_bit(rid->slot_num)) {
LOG_ERROR("Invalid slot_num:%d, slot is empty, file_id:page_num %d:%d.",
rid->slot_num,
file_id_,
page_handle_.frame->page.page_num);
LOG_ERROR("Invalid slot_num:%d, slot is empty, page_num %d.",
rid->slot_num, frame_->page_num());
return RC::RECORD_RECORD_NOT_EXIST;
}
......@@ -245,10 +217,8 @@ RC RecordPageHandler::get_first_record(Record *rec)
RC RecordPageHandler::get_next_record(Record *rec)
{
if (rec->rid.slot_num >= page_header_->record_capacity - 1) {
LOG_ERROR("Invalid slot_num:%d, exceed page's record capacity, file_id:page_num %d:%d.",
rec->rid.slot_num,
file_id_,
page_handle_.frame->page.page_num);
LOG_ERROR("Invalid slot_num:%d, exceed page's record capacity, page_num %d.",
rec->rid.slot_num, frame_->page_num());
return RC::RECORD_EOF;
}
......@@ -256,7 +226,7 @@ RC RecordPageHandler::get_next_record(Record *rec)
int index = bitmap.next_setted_bit(rec->rid.slot_num + 1);
if (index < 0) {
LOG_WARN("There is no empty slot on page -- file_id:%d, page_num:%d.", file_id_, page_handle_.frame->page.page_num);
LOG_WARN("There is no empty slot on page -- page_num:%d.", frame_->page_num());
return RC::RECORD_EOF;
}
......@@ -271,7 +241,7 @@ PageNum RecordPageHandler::get_page_num() const
if (nullptr == page_header_) {
return (PageNum)(-1);
}
return page_handle_.frame->page.page_num;
return frame_->page_num();
}
bool RecordPageHandler::is_full() const
......@@ -281,21 +251,16 @@ bool RecordPageHandler::is_full() const
////////////////////////////////////////////////////////////////////////////////
RecordFileHandler::RecordFileHandler() : disk_buffer_pool_(nullptr), file_id_(-1)
{}
RC RecordFileHandler::init(DiskBufferPool *buffer_pool, int file_id)
RC RecordFileHandler::init(DiskBufferPool *buffer_pool)
{
if (disk_buffer_pool_ != nullptr) {
LOG_ERROR("%d has been openned.", file_id);
LOG_ERROR("record file handler has been openned.");
return RC::RECORD_OPENNED;
}
disk_buffer_pool_ = buffer_pool;
file_id_ = file_id;
LOG_INFO("Successfully open %d.", file_id);
LOG_INFO("Successfully open record file handle");
return RC::SUCCESS;
}
......@@ -311,8 +276,8 @@ RC RecordFileHandler::insert_record(const char *data, int record_size, RID *rid)
RC ret = RC::SUCCESS;
// 找到没有填满的页面
int page_count = 0;
if ((ret = disk_buffer_pool_->get_page_count(file_id_, &page_count)) != RC::SUCCESS) {
LOG_ERROR("Failed to get page count while inserting record, file_id:%d", this->file_id_);
if ((ret = disk_buffer_pool_->get_page_count(&page_count)) != RC::SUCCESS) {
LOG_ERROR("Failed to get page count while inserting record");
return ret;
}
......@@ -321,8 +286,8 @@ RC RecordFileHandler::insert_record(const char *data, int record_size, RID *rid)
if (page_count >= 2) {
// 当前buffer pool 有页面时才尝试加载第一页
// 参考diskBufferPool,有效page的pageNum从1开始
if ((ret = record_page_handler_.init(*disk_buffer_pool_, file_id_, 1)) != RC::SUCCESS) {
LOG_ERROR("Failed to init record page handler, file_id:%d, ret=%d", file_id_, ret);
if ((ret = record_page_handler_.init(*disk_buffer_pool_, 1)) != RC::SUCCESS) {
LOG_ERROR("Failed to init record page handler, ret=%d", ret);
return ret;
}
current_page_num = record_page_handler_.get_page_num();
......@@ -339,10 +304,10 @@ RC RecordFileHandler::insert_record(const char *data, int record_size, RID *rid)
}
if (current_page_num != record_page_handler_.get_page_num()) {
record_page_handler_.cleanup();
ret = record_page_handler_.init(*disk_buffer_pool_, file_id_, current_page_num);
ret = record_page_handler_.init(*disk_buffer_pool_, current_page_num);
if (ret != RC::SUCCESS && ret != RC::BUFFERPOOL_INVALID_PAGE_NUM) {
LOG_ERROR(
"Failed to init record page handler. page number is %d. ret=%d:%s", current_page_num, ret, strrc(ret));
LOG_ERROR("Failed to init record page handler. page number is %d. ret=%d:%s",
current_page_num, ret, strrc(ret));
return ret;
}
}
......@@ -355,27 +320,24 @@ RC RecordFileHandler::insert_record(const char *data, int record_size, RID *rid)
// 找不到就分配一个新的页面
if (!page_found) {
BPPageHandle page_handle;
if ((ret = disk_buffer_pool_->allocate_page(file_id_, &page_handle)) != RC::SUCCESS) {
LOG_ERROR("Failed to allocate page while inserting record. file_it:%d, ret:%d", file_id_, ret);
Frame *frame = nullptr;
if ((ret = disk_buffer_pool_->allocate_page(&frame)) != RC::SUCCESS) {
LOG_ERROR("Failed to allocate page while inserting record. ret:%d", ret);
return ret;
}
current_page_num = page_handle.frame->page.page_num;
current_page_num = frame->page_num();
record_page_handler_.cleanup();
ret = record_page_handler_.init_empty_page(*disk_buffer_pool_, file_id_, current_page_num, record_size);
ret = record_page_handler_.init_empty_page(*disk_buffer_pool_, current_page_num, record_size);
if (ret != RC::SUCCESS) {
LOG_ERROR("Failed to init empty page. file_id:%d, ret:%d", file_id_, ret);
if (RC::SUCCESS != disk_buffer_pool_->unpin_page(&page_handle)) {
LOG_ERROR("Failed to unpin page. file_id:%d", file_id_);
LOG_ERROR("Failed to init empty page. ret:%d", ret);
if (RC::SUCCESS != disk_buffer_pool_->unpin_page(frame)) {
LOG_ERROR("Failed to unpin page. ");
}
return ret;
}
//@@@ TODO, remove unpin page here
// if (RC::SUCCESS != disk_buffer_pool_->unpin_page(&page_handle)) {
// LOG_ERROR("Failed to unpin page. file_id:%d", file_id_);
// }
disk_buffer_pool_->unpin_page(frame);
}
// 找到空闲位置
......@@ -390,8 +352,8 @@ RC RecordFileHandler::update_record(const Record *rec)
}
RC ret;
RecordPageHandler page_handler;
if ((ret = page_handler.init(*disk_buffer_pool_, file_id_, rec->rid.page_num)) != RC::SUCCESS) {
LOG_ERROR("Failed to init record page handler.page number=%d, file_id=%d", rec->rid.page_num, file_id_);
if ((ret = page_handler.init(*disk_buffer_pool_, rec->rid.page_num)) != RC::SUCCESS) {
LOG_ERROR("Failed to init record page handler.page number=%d", rec->rid.page_num);
return ret;
}
......@@ -406,8 +368,8 @@ RC RecordFileHandler::delete_record(const RID *rid)
RC ret = RC::SUCCESS;
RecordPageHandler page_handler;
if ((ret != page_handler.init(*disk_buffer_pool_, file_id_, rid->page_num)) != RC::SUCCESS) {
LOG_ERROR("Failed to init record page handler.page number=%d, file_id:%d", rid->page_num, file_id_);
if ((ret != page_handler.init(*disk_buffer_pool_, rid->page_num)) != RC::SUCCESS) {
LOG_ERROR("Failed to init record page handler.page number=%d", rid->page_num);
return ret;
}
return page_handler.delete_record(rid);
......@@ -427,8 +389,8 @@ RC RecordFileHandler::get_record(const RID *rid, Record *rec)
}
RecordPageHandler page_handler;
if ((ret != page_handler.init(*disk_buffer_pool_, file_id_, rid->page_num)) != RC::SUCCESS) {
LOG_ERROR("Failed to init record page handler.page number=%d, file_id:%d", rid->page_num, file_id_);
if ((ret != page_handler.init(*disk_buffer_pool_, rid->page_num)) != RC::SUCCESS) {
LOG_ERROR("Failed to init record page handler.page number=%d", rid->page_num);
return ret;
}
......@@ -437,15 +399,11 @@ RC RecordFileHandler::get_record(const RID *rid, Record *rec)
////////////////////////////////////////////////////////////////////////////////
RecordFileScanner::RecordFileScanner() : disk_buffer_pool_(nullptr), file_id_(-1), condition_filter_(nullptr)
{}
RC RecordFileScanner::open_scan(DiskBufferPool &buffer_pool, int file_id, ConditionFilter *condition_filter)
RC RecordFileScanner::open_scan(DiskBufferPool &buffer_pool, ConditionFilter *condition_filter)
{
close_scan();
disk_buffer_pool_ = &buffer_pool;
file_id_ = file_id;
condition_filter_ = condition_filter;
return RC::SUCCESS;
......@@ -483,8 +441,8 @@ RC RecordFileScanner::get_next_record(Record *rec)
Record current_record = *rec;
int page_count = 0;
if ((ret = disk_buffer_pool_->get_page_count(file_id_, &page_count)) != RC::SUCCESS) {
LOG_ERROR("Failed to get page count while getting next record. file id=%d", file_id_);
if ((ret = disk_buffer_pool_->get_page_count(&page_count)) != RC::SUCCESS) {
LOG_ERROR("Failed to get page count while getting next record.");
return RC::RECORD_EOF;
}
......@@ -496,7 +454,7 @@ RC RecordFileScanner::get_next_record(Record *rec)
if (current_record.rid.page_num != record_page_handler_.get_page_num()) {
record_page_handler_.cleanup();
ret = record_page_handler_.init(*disk_buffer_pool_, file_id_, current_record.rid.page_num);
ret = record_page_handler_.init(*disk_buffer_pool_, current_record.rid.page_num);
if (ret != RC::SUCCESS && ret != RC::BUFFERPOOL_INVALID_PAGE_NUM) {
LOG_ERROR("Failed to init record page handler. page num=%d", current_record.rid.page_num);
return ret;
......
......@@ -15,6 +15,7 @@ See the Mulan PSL v2 for more details. */
#define __OBSERVER_STORAGE_COMMON_RECORD_MANAGER_H_
#include <sstream>
#include <limits>
#include "storage/default/disk_buffer_pool.h"
typedef int32_t SlotNum;
......@@ -96,10 +97,10 @@ struct Record {
class RecordPageHandler {
public:
RecordPageHandler();
RecordPageHandler() = default;
~RecordPageHandler();
RC init(DiskBufferPool &buffer_pool, int file_id, PageNum page_num);
RC init_empty_page(DiskBufferPool &buffer_pool, int file_id, PageNum page_num, int record_size);
RC init(DiskBufferPool &buffer_pool, PageNum page_num);
RC init_empty_page(DiskBufferPool &buffer_pool, PageNum page_num, int record_size);
RC cleanup();
RC insert_record(const char *data, RID *rid);
......@@ -114,7 +115,7 @@ public:
return rc;
}
rc = updater(record);
disk_buffer_pool_->mark_dirty(&page_handle_);
frame_->mark_dirty();
return rc;
}
......@@ -131,51 +132,40 @@ public:
protected:
char *get_record_data(SlotNum slot_num)
{
return page_handle_.frame->page.data + page_header_->first_record_offset + (page_header_->record_size * slot_num);
return frame_->data() + page_header_->first_record_offset + (page_header_->record_size * slot_num);
}
protected:
DiskBufferPool *disk_buffer_pool_;
int file_id_;
BPPageHandle page_handle_;
PageHeader *page_header_;
char *bitmap_;
DiskBufferPool *disk_buffer_pool_ = nullptr;
Frame *frame_ = nullptr;
PageHeader *page_header_ = nullptr;
char *bitmap_ = nullptr;
};
class RecordFileHandler {
public:
RecordFileHandler();
RC init(DiskBufferPool *buffer_pool, int file_id);
RecordFileHandler() = default;
RC init(DiskBufferPool *buffer_pool);
void close();
/**
* 更新指定文件中的记录,rec指向的记录结构中的rid字段为要更新的记录的标识符,
* pData字段指向新的记录内容
* @param rec
* @return
*/
RC update_record(const Record *rec);
/**
* 从指定文件中删除标识符为rid的记录
* @param rid
* @return
*/
RC delete_record(const RID *rid);
/**
* 插入一个新的记录到指定文件中,pData为指向新纪录内容的指针,返回该记录的标识符rid
* @param data
* @param rid
* @return
*/
RC insert_record(const char *data, int record_size, RID *rid);
/**
* 获取指定文件中标识符为rid的记录内容到rec指向的记录结构中
* @param rid
* @param rec
* @return
*/
RC get_record(const RID *rid, Record *rec);
......@@ -185,7 +175,7 @@ public:
RC rc = RC::SUCCESS;
RecordPageHandler page_handler;
if ((rc != page_handler.init(*disk_buffer_pool_, file_id_, rid->page_num)) != RC::SUCCESS) {
if ((rc != page_handler.init(*disk_buffer_pool_, rid->page_num)) != RC::SUCCESS) {
return rc;
}
......@@ -193,15 +183,13 @@ public:
}
private:
DiskBufferPool *disk_buffer_pool_;
int file_id_; // 参考DiskBufferPool中的fileId
DiskBufferPool *disk_buffer_pool_ = nullptr;
RecordPageHandler record_page_handler_; // 目前只有insert record使用
};
class RecordFileScanner {
public:
RecordFileScanner();
RecordFileScanner() = default;
/**
* 打开一个文件扫描。
......@@ -210,13 +198,8 @@ public:
* 然后再调用GetNextRec函数来逐个返回文件中满足条件的记录。
* 如果条件数量conNum为0,则意味着检索文件中的所有记录。
* 如果条件不为空,则要对每条记录进行条件比较,只有满足所有条件的记录才被返回
* @param buffer_pool
* @param file_id
* @param condition_num
* @param conditions
* @return
*/
RC open_scan(DiskBufferPool &buffer_pool, int file_id, ConditionFilter *condition_filter);
RC open_scan(DiskBufferPool &buffer_pool, ConditionFilter *condition_filter);
/**
* 关闭一个文件扫描,释放相应的资源
......@@ -236,10 +219,9 @@ public:
RC get_next_record(Record *rec);
private:
DiskBufferPool *disk_buffer_pool_;
int file_id_; // 参考DiskBufferPool中的fileId
DiskBufferPool *disk_buffer_pool_ = nullptr;
ConditionFilter *condition_filter_;
ConditionFilter *condition_filter_ = nullptr;
RecordPageHandler record_page_handler_;
};
......
......@@ -29,9 +29,6 @@ See the Mulan PSL v2 for more details. */
#include "storage/index/bplus_tree_index.h"
#include "storage/trx/trx.h"
Table::Table() : data_buffer_pool_(nullptr), file_id_(-1), record_handler_(nullptr)
{}
Table::~Table()
{
if (record_handler_ != nullptr) {
......@@ -39,8 +36,8 @@ Table::~Table()
record_handler_ = nullptr;
}
if (data_buffer_pool_ != nullptr && file_id_ >= 0) {
data_buffer_pool_->close_file(file_id_);
if (data_buffer_pool_ != nullptr) {
data_buffer_pool_->close_file();
data_buffer_pool_ = nullptr;
}
......@@ -102,8 +99,8 @@ RC Table::create(
fs.close();
std::string data_file = table_data_file(base_dir, name);
data_buffer_pool_ = theGlobalDiskBufferPool();
rc = data_buffer_pool_->create_file(data_file.c_str());
BufferPoolManager &bpm = BufferPoolManager::instance();
rc = bpm.create_file(data_file.c_str());
if (rc != RC::SUCCESS) {
LOG_ERROR("Failed to create disk buffer pool of data file. file name=%s", data_file.c_str());
return rc;
......@@ -339,28 +336,24 @@ RC Table::make_record(int value_num, const Value *values, char *&record_out)
RC Table::init_record_handler(const char *base_dir)
{
std::string data_file = table_data_file(base_dir, table_meta_.name());
if (nullptr == data_buffer_pool_) {
data_buffer_pool_ = theGlobalDiskBufferPool();
}
int data_buffer_pool_file_id;
RC rc = data_buffer_pool_->open_file(data_file.c_str(), &data_buffer_pool_file_id);
RC rc = BufferPoolManager::instance().open_file(data_file.c_str(), data_buffer_pool_);
if (rc != RC::SUCCESS) {
LOG_ERROR("Failed to open disk buffer pool for file:%s. rc=%d:%s", data_file.c_str(), rc, strrc(rc));
return rc;
}
record_handler_ = new RecordFileHandler();
rc = record_handler_->init(data_buffer_pool_, data_buffer_pool_file_id);
rc = record_handler_->init(data_buffer_pool_);
if (rc != RC::SUCCESS) {
LOG_ERROR("Failed to init record handler. rc=%d:%s", rc, strrc(rc));
data_buffer_pool_->close_file(data_buffer_pool_file_id);
data_buffer_pool_->close_file();
data_buffer_pool_ = nullptr;
delete record_handler_;
record_handler_ = nullptr;
return rc;
}
file_id_ = data_buffer_pool_file_id;
return rc;
}
......@@ -419,9 +412,9 @@ RC Table::scan_record(
RC rc = RC::SUCCESS;
RecordFileScanner scanner;
rc = scanner.open_scan(*data_buffer_pool_, file_id_, filter);
rc = scanner.open_scan(*data_buffer_pool_, filter);
if (rc != RC::SUCCESS) {
LOG_ERROR("failed to open scanner. file id=%d. rc=%d:%s", file_id_, rc, strrc(rc));
LOG_ERROR("failed to open scanner. rc=%d:%s", rc, strrc(rc));
return rc;
}
......@@ -441,7 +434,7 @@ RC Table::scan_record(
if (RC::RECORD_EOF == rc) {
rc = RC::SUCCESS;
} else {
LOG_ERROR("failed to scan record. file id=%d, rc=%d:%s", file_id_, rc, strrc(rc));
LOG_ERROR("failed to scan record. rc=%d:%s", rc, strrc(rc));
}
scanner.close_scan();
return rc;
......@@ -834,7 +827,7 @@ IndexScanner *Table::find_index_for_scan(const ConditionFilter *filter)
RC Table::sync()
{
RC rc = data_buffer_pool_->purge_all_pages(file_id_);
RC rc = data_buffer_pool_->purge_all_pages();
if (rc != RC::SUCCESS) {
LOG_ERROR("Failed to flush table's data pages. table=%s, rc=%d:%s", name(), rc, strrc(rc));
return rc;
......
......@@ -30,7 +30,7 @@ class Trx;
class Table {
public:
Table();
Table() = default;
~Table();
/**
......@@ -101,9 +101,8 @@ private:
private:
std::string base_dir_;
TableMeta table_meta_;
DiskBufferPool *data_buffer_pool_; /// 数据文件关联的buffer pool
int file_id_;
RecordFileHandler *record_handler_; /// 记录操作
DiskBufferPool *data_buffer_pool_ = nullptr; /// 数据文件关联的buffer pool
RecordFileHandler *record_handler_ = nullptr; /// 记录操作
std::vector<Index *> indexes_;
};
......
......@@ -24,10 +24,20 @@ See the Mulan PSL v2 for more details. */
#include "storage/common/table.h"
#include "storage/common/condition_filter.h"
static DefaultHandler *default_handler = nullptr;
void DefaultHandler::set_default(DefaultHandler *handler)
{
if (default_handler != nullptr && handler != nullptr) {
LOG_ERROR("default handler is setted");
abort();
}
default_handler = handler;
}
DefaultHandler &DefaultHandler::get_default()
{
static DefaultHandler handler;
return handler;
return *default_handler;
}
DefaultHandler::DefaultHandler()
......
......@@ -159,6 +159,7 @@ public:
RC sync();
public:
static void set_default(DefaultHandler *handler);
static DefaultHandler &get_default();
private:
......
......@@ -14,19 +14,23 @@ See the Mulan PSL v2 for more details. */
#ifndef __OBSERVER_STORAGE_COMMON_PAGE_MANAGER_H_
#define __OBSERVER_STORAGE_COMMON_PAGE_MANAGER_H_
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <sys/types.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <time.h>
#include <string>
#include <unordered_map>
#include "rc.h"
#include "common/mm/mem_pool.h"
typedef int PageNum;
typedef int32_t PageNum;
class BufferPoolManager;
class DiskBufferPool;
//
#define BP_INVALID_PAGE_NUM (-1)
......@@ -34,73 +38,82 @@ typedef int PageNum;
#define BP_PAGE_DATA_SIZE (BP_PAGE_SIZE - sizeof(PageNum))
#define BP_FILE_SUB_HDR_SIZE (sizeof(BPFileSubHeader))
#define BP_BUFFER_SIZE 256
#define MAX_OPEN_FILE 1024
typedef struct {
struct Page {
PageNum page_num;
char data[BP_PAGE_DATA_SIZE];
} Page;
};
// sizeof(Page) should be equal to BP_PAGE_SIZE
typedef struct {
PageNum page_count;
int allocated_pages;
} BPFileSubHeader;
typedef struct {
int file_id;
std::set<PageNum> pages;
} BPDisposedPages;
typedef struct Frame_ {
bool dirty;
unsigned int pin_count;
unsigned long acc_time;
int file_desc;
Page page;
/**
* BufferPool的文件第一个页面,存放一些元数据信息,包括了后面每页的分配信息。
* TODO 1. 当前的做法,只能分配比较少的页面,你可以扩展一下,支持更多的页面或无限多的页面吗?
* 可以参考Linux ext(n)和Windows NTFS等文件系统
* 2. 当前使用bitmap存放页面分配情况,但是这种方法在页面非常多的时候,查找空闲页面的
* 效率非常低,你有办法优化吗?
*/
struct BPFileHeader {
int32_t page_count; //! 当前文件一共有多少个页面
int32_t allocated_pages; //! 已经分配了多少个页面
char bitmap[0]; //! 页面分配位图, 第0个页面(就是当前页面),总是1
};
bool can_purge()
class Frame
{
public:
void clear_page()
{
return pin_count <= 0;
memset(&page_, 0, sizeof(page_));
}
} Frame;
typedef struct BPPageHandle {
BPPageHandle() : open(false), frame(nullptr)
{}
PageNum page_num() const
{
return page_.page_num;
}
PageNum page_num() const {
return frame->page.page_num;
void set_page_num(PageNum page_num)
{
page_.page_num = page_num;
}
/**
* 标记指定页面为“脏”页。如果修改了页面的内容,则应调用此函数,
* 以便该页面被淘汰出缓冲区时系统将新的页面数据写入磁盘文件
*/
void mark_dirty() {
this->frame->dirty = true;
dirty_ = true;
}
char *data() {
return this->frame->page.data;
return page_.data;
}
bool open;
Frame *frame;
} BPPageHandle;
class BPFileHandle {
public:
BPFileHandle();
~BPFileHandle();
int file_desc() const
{
return file_desc_;
}
public:
bool bopen;
const char *file_name;
int file_desc;
Frame *hdr_frame;
Page *hdr_page;
char *bitmap;
BPFileSubHeader *file_sub_header;
void set_file_desc(int fd)
{
file_desc_ = fd;
}
bool can_purge()
{
return pin_count_ <= 0;
}
private:
friend class DiskBufferPool;
bool dirty_ = false;
unsigned int pin_count_ = 0;
unsigned long acc_time_ = 0;
int file_desc_ = -1;
Page page_;
};
class BPManager : public common::MemPoolSimple<Frame> {
class BPFrameManager : public common::MemPoolSimple<Frame> {
public:
BPManager(const char *tag);
BPFrameManager(const char *tag);
Frame *get(int file_desc, PageNum page_num);
......@@ -111,26 +124,7 @@ public:
class DiskBufferPool {
public:
static DiskBufferPool *mk_instance()
{
return new DiskBufferPool();
}
static void set_pool_num(int pool_num)
{
if (pool_num > 0) {
POOL_NUM = pool_num;
LOG_INFO("Successfully set POOL_NUM as %d", pool_num);
} else {
LOG_INFO("Invalid input argument pool_num:%d", pool_num);
}
}
static const int get_pool_num()
{
return POOL_NUM;
}
DiskBufferPool(BufferPoolManager &bp_manager, BPFrameManager &frame_manager);
~DiskBufferPool();
/**
......@@ -142,53 +136,36 @@ public:
* 根据文件名打开一个分页文件,返回文件ID
* @return
*/
RC open_file(const char *file_name, int *file_id);
RC open_file(const char *file_name);
/**
* 关闭fileID对应的分页文件
* 关闭分页文件
*/
RC close_file(int file_id);
RC close_file();
/**
* 根据文件ID和页号获取指定页面到缓冲区,返回页面句柄指针。
* @return
*/
RC get_this_page(int file_id, PageNum page_num, BPPageHandle *page_handle);
RC get_this_page(PageNum page_num, Frame **frame);
/**
* 在指定文件中分配一个新的页面,并将其放入缓冲区,返回页面句柄指针。
* 分配页面时,如果文件中有空闲页,就直接分配一个空闲页;
* 如果文件中没有空闲页,则扩展文件规模来增加新的空闲页。
*/
RC allocate_page(int file_id, BPPageHandle *page_handle);
/**
* 根据页面句柄指针返回对应的页面号
*/
RC get_page_num(BPPageHandle *page_handle, PageNum *page_num);
/**
* 根据页面句柄指针返回对应的数据区指针
*/
RC get_data(BPPageHandle *page_handle, char **data);
RC allocate_page(Frame **frame);
/**
* 比purge_page多一个动作, 在磁盘上将对应的页数据删掉。
*/
RC dispose_page(int file_id, PageNum page_num);
RC dispose_page(PageNum page_num);
/**
* 释放指定文件关联的页的内存, 如果已经脏, 则刷到磁盘,除了pinned page
* @param file_handle
* @param page_num 如果不指定page_num 将刷新所有页
*/
RC purge_page(int file_id, PageNum page_num);
/**
* 标记指定页面为“脏”页。如果修改了页面的内容,则应调用此函数,
* 以便该页面被淘汰出缓冲区时系统将新的页面数据写入磁盘文件
*/
RC mark_dirty(BPPageHandle *page_handle);
RC purge_page(PageNum page_num);
RC purge_all_pages();
/**
* 此函数用于解除pageHandle对应页面的驻留缓冲区限制。
......@@ -196,44 +173,66 @@ public:
* 该页面被设置为驻留缓冲区状态,以防止其在处理过程中被置换出去,
* 因此在该页面使用完之后应调用此函数解除该限制,使得该页面此后可以正常地被淘汰出缓冲区
*/
RC unpin_page(BPPageHandle *page_handle);
RC unpin_page(Frame *frame);
/**
* 获取文件的总页数
*/
RC get_page_count(int file_id, int *page_count);
RC get_page_count(int *page_count);
RC purge_all_pages(int file_id);
RC check_all_pages_unpinned();
RC check_all_pages_unpinned(int file_id);
int file_desc() const;
/**
* 如果页面是脏的,就将数据刷新到磁盘
*/
RC flush_page(Frame &frame);
protected:
RC allocate_page(Frame **buf);
RC allocate_frame(Frame **buf);
/**
* 刷新指定文件关联的所有脏页到磁盘,除了pinned page
* @param file_handle
* @param page_num 如果不指定page_num 将刷新所有页
* 刷新指定页面到磁盘(flush),并且释放关联的Frame
*/
RC purge_page(BPFileHandle *file_handle, PageNum page_num);
RC purge_page(Frame *used_frame);
RC purge_all_pages(BPFileHandle *file_handle);
RC check_file_id(int file_id);
RC check_page_num(PageNum page_num, BPFileHandle *file_handle);
RC load_page(PageNum page_num, BPFileHandle *file_handle, Frame *frame);
RC flush_page(Frame *frame);
RC purge_frame(Frame *used_frame);
RC check_page_num(PageNum page_num);
private:
DiskBufferPool();
/**
* 加载指定页面的数据到内存中
*/
RC load_page(PageNum page_num, Frame *frame);
private:
BPManager bp_manager_;
BPFileHandle *open_list_[MAX_OPEN_FILE] = {nullptr};
std::map<int, BPDisposedPages> disposed_pages;
static int POOL_NUM;
BufferPoolManager &bp_manager_;
BPFrameManager & frame_manager_;
std::string file_name_;
int file_desc_ = -1;
Frame * hdr_frame_ = nullptr;
BPFileHeader * file_header_ = nullptr;
std::set<PageNum> disposed_pages;
};
DiskBufferPool *theGlobalDiskBufferPool();
class BufferPoolManager
{
public:
BufferPoolManager();
~BufferPoolManager();
RC create_file(const char *file_name);
RC open_file(const char *file_name, DiskBufferPool *&bp);
RC close_file(const char *file_name);
RC flush_page(Frame &frame);
public:
static void set_instance(BufferPoolManager *bpm);
static BufferPoolManager &instance();
private:
BPFrameManager frame_manager_{"BufPool"};
std::unordered_map<std::string, DiskBufferPool *> buffer_pools_;
std::unordered_map<int, DiskBufferPool *> fd_buffer_pools_;
};
#endif //__OBSERVER_STORAGE_COMMON_PAGE_MANAGER_H_
......@@ -19,6 +19,7 @@ See the Mulan PSL v2 for more details. */
#include <string.h>
#include <sstream>
#include <functional>
#include "storage/common/record_manager.h"
#include "storage/default/disk_buffer_pool.h"
......@@ -244,7 +245,7 @@ struct InternalIndexNode : public IndexNode {
class IndexNodeHandler {
public:
IndexNodeHandler(const IndexFileHeader &header, BPPageHandle &page_handle);
IndexNodeHandler(const IndexFileHeader &header, Frame *frame);
void init_empty(bool leaf);
......@@ -272,7 +273,7 @@ protected:
class LeafIndexNodeHandler : public IndexNodeHandler {
public:
LeafIndexNodeHandler(const IndexFileHeader &header, BPPageHandle &page_handle);
LeafIndexNodeHandler(const IndexFileHeader &header, Frame *frame);
void init_empty();
void set_next_page(PageNum page_num);
......@@ -293,18 +294,18 @@ public:
void insert(int index, const char *key, const char *value);
void remove(int index);
int remove(const char *key, const KeyComparator &comparator);
RC move_half_to(LeafIndexNodeHandler &other, DiskBufferPool *bp, int file_id);
RC move_first_to_end(LeafIndexNodeHandler &other, DiskBufferPool *disk_buffer_pool, int file_id);
RC move_last_to_front(LeafIndexNodeHandler &other, DiskBufferPool *bp, int file_id);
RC move_half_to(LeafIndexNodeHandler &other, DiskBufferPool *bp);
RC move_first_to_end(LeafIndexNodeHandler &other, DiskBufferPool *disk_buffer_pool);
RC move_last_to_front(LeafIndexNodeHandler &other, DiskBufferPool *bp);
/**
* move all items to left page
*/
RC move_to(LeafIndexNodeHandler &other, DiskBufferPool *bp, int file_id);
RC move_to(LeafIndexNodeHandler &other, DiskBufferPool *bp);
int max_size() const;
int min_size() const;
bool validate(const KeyComparator &comparator, DiskBufferPool *bp, int file_id) const;
bool validate(const KeyComparator &comparator, DiskBufferPool *bp) const;
friend std::string to_string(const LeafIndexNodeHandler &handler, const KeyPrinter &printer);
private:
......@@ -321,13 +322,13 @@ private:
class InternalIndexNodeHandler : public IndexNodeHandler {
public:
InternalIndexNodeHandler(const IndexFileHeader &header, BPPageHandle &page_handle);
InternalIndexNodeHandler(const IndexFileHeader &header, Frame *frame);
void init_empty();
void create_new_root(PageNum first_page_num, const char *key, PageNum page_num);
void insert(const char *key, PageNum page_num, const KeyComparator &comparator);
RC move_half_to(LeafIndexNodeHandler &other, DiskBufferPool *bp, int file_id);
RC move_half_to(LeafIndexNodeHandler &other, DiskBufferPool *bp);
char *key_at(int index);
PageNum value_at(int index);
......@@ -349,18 +350,18 @@ public:
int max_size() const;
int min_size() const;
RC move_to(InternalIndexNodeHandler &other, DiskBufferPool *disk_buffer_pool, int file_id);
RC move_first_to_end(InternalIndexNodeHandler &other, DiskBufferPool *disk_buffer_pool, int file_id);
RC move_last_to_front(InternalIndexNodeHandler &other, DiskBufferPool *bp, int file_id);
RC move_half_to(InternalIndexNodeHandler &other, DiskBufferPool *bp, int file_id);
RC move_to(InternalIndexNodeHandler &other, DiskBufferPool *disk_buffer_pool);
RC move_first_to_end(InternalIndexNodeHandler &other, DiskBufferPool *disk_buffer_pool);
RC move_last_to_front(InternalIndexNodeHandler &other, DiskBufferPool *bp);
RC move_half_to(InternalIndexNodeHandler &other, DiskBufferPool *bp);
bool validate(const KeyComparator &comparator, DiskBufferPool *bp, int file_id) const;
bool validate(const KeyComparator &comparator, DiskBufferPool *bp) const;
friend std::string to_string(const InternalIndexNodeHandler &handler, const KeyPrinter &printer);
private:
RC copy_from(const char *items, int num, DiskBufferPool *disk_buffer_pool, int file_id);
RC append(const char *item, DiskBufferPool *bp, int file_id);
RC preappend(const char *item, DiskBufferPool *bp, int file_id);
RC copy_from(const char *items, int num, DiskBufferPool *disk_buffer_pool);
RC append(const char *item, DiskBufferPool *bp);
RC preappend(const char *item, DiskBufferPool *bp);
private:
char *__item_at(int index) const;
......@@ -418,11 +419,6 @@ public:
RC sync();
const int get_file_id()
{
return file_id_;
}
/**
* Check whether current B+ tree is invalid or not.
* return true means current tree is valid, return false means current tree is invalid.
......@@ -435,52 +431,48 @@ public:
RC print_leafs();
private:
RC print_leaf(BPPageHandle &page_handle);
RC print_internal_node_recursive(BPPageHandle &page_handle);
RC print_leaf(Frame *frame);
RC print_internal_node_recursive(Frame *frame);
bool validate_node(IndexNode *node);
bool validate_leaf_link();
bool validate_node_recursive(BPPageHandle &page_handle);
bool validate_node_recursive(Frame *frame);
protected:
RC find_leaf(const char *key, BPPageHandle &page_handle);
RC left_most_page(BPPageHandle &page_handle);
RC right_most_page(BPPageHandle &page_handle);
RC find_leaf(const char *key, Frame *&frame);
RC left_most_page(Frame *&frame);
RC right_most_page(Frame *&frame);
RC find_leaf_internal(const std::function<PageNum(InternalIndexNodeHandler &)> &child_page_getter,
BPPageHandle &page_handle);
Frame *&frame);
RC insert_into_parent(
PageNum parent_page, BPPageHandle &left_page_handle, const char *pkey, BPPageHandle &right_page_handle);
RC split_leaf(BPPageHandle &leaf_page_handle);
PageNum parent_page, Frame *left_frame, const char *pkey, Frame &right_frame);
RC delete_entry_internal(BPPageHandle &leaf_page_handle, const char *key);
RC delete_entry_internal(Frame *leaf_frame, const char *key);
RC insert_into_new_root(BPPageHandle &left_page_handle, const char *pkey, BPPageHandle &right_page_handle);
RC insert_into_new_root(Frame *left_frame, const char *pkey, Frame &right_frame);
template <typename IndexNodeHandlerType>
RC split(BPPageHandle &page_handle, BPPageHandle &new_page_handle);
RC split(Frame *frame, Frame *&new_frame);
template <typename IndexNodeHandlerType>
RC coalesce_or_redistribute(BPPageHandle &page_handle);
RC coalesce_or_redistribute(Frame *frame);
template <typename IndexNodeHandlerType>
RC coalesce(BPPageHandle &neighbor_page_handle, BPPageHandle &page_handle,
BPPageHandle &parent_page_handle, int index);
RC coalesce(Frame *neighbor_frame, Frame *frame, Frame *parent_frame, int index);
template <typename IndexNodeHandlerType>
RC redistribute(BPPageHandle &neighbor_page_handle, BPPageHandle &page_handle,
BPPageHandle &parent_page_handle, int index);
RC redistribute(Frame *neighbor_frame, Frame *frame, Frame *parent_frame, int index);
RC insert_entry_into_parent(BPPageHandle &page_handle, BPPageHandle &new_page_handle, const char *key);
RC insert_entry_into_leaf_node(BPPageHandle &page_handle, const char *pkey, const RID *rid);
RC insert_entry_into_parent(Frame *frame, Frame *new_frame, const char *key);
RC insert_entry_into_leaf_node(Frame *frame, const char *pkey, const RID *rid);
RC update_root_page_num();
RC create_new_tree(const char *key, const RID *rid);
RC adjust_root(BPPageHandle &root_page_handle);
RC adjust_root(Frame *root_frame);
private:
char *make_key(const char *user_key, const RID &rid);
void free_key(char *key);
protected:
DiskBufferPool *disk_buffer_pool_ = nullptr;
int file_id_ = -1;
bool header_dirty_ = false;
IndexFileHeader file_header_;
......@@ -519,8 +511,8 @@ private:
/// 使用左右叶子节点和位置来表示扫描的起始位置和终止位置
/// 起始位置和终止位置都是有效的数据
BPPageHandle left_page_handle_;
BPPageHandle right_page_handle_;
Frame * left_frame_ = nullptr;
Frame * right_frame_ = nullptr;
int iter_index_ = -1;
int end_index_ = -1; // use -1 for end of scan
};
......
......@@ -79,10 +79,8 @@ RC BplusTreeIndex::open(const char *file_name, const IndexMeta &index_meta, cons
RC BplusTreeIndex::close()
{
if (inited_) {
LOG_INFO("Begin to close index, file_id:%d, index:%s, field:%s",
index_handler_.get_file_id(),
index_meta_.name(),
index_meta_.field());
LOG_INFO("Begin to close index, index:%s, field:%s",
index_meta_.name(), index_meta_.field());
index_handler_.close();
inited_ = false;
}
......
......@@ -15,68 +15,68 @@ See the Mulan PSL v2 for more details. */
#include "storage/default/disk_buffer_pool.h"
#include "gtest/gtest.h"
void test_get(BPManager &bp_manager)
void test_get(BPFrameManager &frame_manager)
{
Frame *frame1 = bp_manager.alloc();
Frame *frame1 = frame_manager.alloc();
ASSERT_NE(frame1, nullptr);
frame1->file_desc = 0;
frame1->page.page_num = 1;
frame1->set_file_desc(0);
frame1->set_page_num(1);
ASSERT_EQ(frame1, bp_manager.get(0, 1));
ASSERT_EQ(frame1, frame_manager.get(0, 1));
Frame *frame2 = bp_manager.alloc();
Frame *frame2 = frame_manager.alloc();
ASSERT_NE(frame2, nullptr);
frame2->file_desc = 0;
frame2->page.page_num = 2;
frame2->set_file_desc(0);
frame2->set_page_num(2);
ASSERT_EQ(frame1, bp_manager.get(0, 1));
ASSERT_EQ(frame1, frame_manager.get(0, 1));
Frame *frame3 = bp_manager.alloc();
Frame *frame3 = frame_manager.alloc();
ASSERT_NE(frame3, nullptr);
frame3->file_desc = 0;
frame3->page.page_num = 3;
frame3->set_file_desc(0);
frame3->set_page_num(3);
frame2 = bp_manager.get(0, 2);
frame2 = frame_manager.get(0, 2);
ASSERT_NE(frame2, nullptr);
Frame *frame4 = bp_manager.alloc();
frame4->file_desc = 0;
frame4->page.page_num = 4;
Frame *frame4 = frame_manager.alloc();
frame4->set_file_desc(0);
frame4->set_page_num(4);
bp_manager.free(frame1);
frame1 = bp_manager.get(0, 1);
frame_manager.free(frame1);
frame1 = frame_manager.get(0, 1);
ASSERT_EQ(frame1, nullptr);
ASSERT_EQ(frame3, bp_manager.get(0, 3));
ASSERT_EQ(frame3, frame_manager.get(0, 3));
ASSERT_EQ(frame4, bp_manager.get(0, 4));
ASSERT_EQ(frame4, frame_manager.get(0, 4));
bp_manager.free(frame2);
bp_manager.free(frame3);
bp_manager.free(frame4);
frame_manager.free(frame2);
frame_manager.free(frame3);
frame_manager.free(frame4);
ASSERT_EQ(nullptr, bp_manager.get(0, 2));
ASSERT_EQ(nullptr, bp_manager.get(0, 3));
ASSERT_EQ(nullptr, bp_manager.get(0, 4));
ASSERT_EQ(nullptr, frame_manager.get(0, 2));
ASSERT_EQ(nullptr, frame_manager.get(0, 3));
ASSERT_EQ(nullptr, frame_manager.get(0, 4));
}
void test_alloc(BPManager &bp_manager)
void test_alloc(BPFrameManager &frame_manager)
{
int size = bp_manager.get_size();
int size = frame_manager.get_size();
std::list<Frame *> used_list;
for (int i = 0; i < size; i++) {
Frame *item = bp_manager.alloc();
Frame *item = frame_manager.alloc();
ASSERT_NE(item, nullptr);
used_list.push_back(item);
}
ASSERT_EQ(used_list.size(), bp_manager.get_used_num());
ASSERT_EQ(used_list.size(), frame_manager.get_used_num());
for (int i = 0; i < size; i++) {
Frame *item = bp_manager.alloc();
Frame *item = frame_manager.alloc();
ASSERT_EQ(item, nullptr);
}
......@@ -86,26 +86,26 @@ void test_alloc(BPManager &bp_manager)
Frame *item = used_list.front();
used_list.pop_front();
bp_manager.free(item);
frame_manager.free(item);
} else {
Frame *item = bp_manager.alloc();
Frame *item = frame_manager.alloc();
used_list.push_back(item);
}
ASSERT_EQ(used_list.size(), bp_manager.get_used_num());
ASSERT_EQ(used_list.size(), frame_manager.get_used_num());
}
}
TEST(test_bp_manager, test_bp_manager_simple_lru)
TEST(test_frame_manager, test_frame_manager_simple_lru)
{
BPManager bp_manager("Test");
bp_manager.init(false, 2);
BPFrameManager frame_manager("Test");
frame_manager.init(false, 2);
test_get(bp_manager);
test_get(frame_manager);
test_alloc(bp_manager);
test_alloc(frame_manager);
bp_manager.cleanup();
frame_manager.cleanup();
}
int main(int argc, char **argv)
......
......@@ -321,20 +321,11 @@ TEST(test_bplus_tree, test_leaf_index_node_handle)
index_file_header.attr_type = INTS;
Frame frame;
frame.dirty = false;
frame.pin_count = 0;
frame.acc_time = 0;
frame.file_desc = 0;
frame.page.page_num = 100;
BPPageHandle page_handle;
page_handle.open = true;
page_handle.frame = &frame;
KeyComparator key_comparator;
key_comparator.init(INTS, 4);
LeafIndexNodeHandler leaf_node(index_file_header, page_handle);
LeafIndexNodeHandler leaf_node(index_file_header, &frame);
leaf_node.init_empty();
ASSERT_EQ(0, leaf_node.size());
......@@ -389,20 +380,11 @@ TEST(test_bplus_tree, test_internal_index_node_handle)
index_file_header.attr_type = INTS;
Frame frame;
frame.dirty = false;
frame.pin_count = 0;
frame.acc_time = 0;
frame.file_desc = 0;
frame.page.page_num = 100;
BPPageHandle page_handle;
page_handle.open = true;
page_handle.frame = &frame;
KeyComparator key_comparator;
key_comparator.init(INTS, 4);
InternalIndexNodeHandler internal_node(index_file_header, page_handle);
InternalIndexNodeHandler internal_node(index_file_header, &frame);
internal_node.init_empty();
ASSERT_EQ(0, internal_node.size());
......@@ -480,8 +462,6 @@ TEST(test_bplus_tree, test_scanner)
{
LoggerFactory::init_default("test.log");
DiskBufferPool::set_pool_num(POOL_NUM);
const char *index_name = "scanner.btree";
::remove(index_name);
handler = new BplusTreeHandler();
......@@ -689,10 +669,7 @@ TEST(test_bplus_tree, test_scanner)
TEST(test_bplus_tree, test_bplus_tree_insert)
{
LoggerFactory::init_default("test.log");
// set the disk buffer pool's number to make it is easy to test
DiskBufferPool::set_pool_num(POOL_NUM);
::remove(index_name);
handler = new BplusTreeHandler();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册