未验证 提交 7bff6c8b 编写于 作者: J JugendTraum 提交者: GitHub

Fix problem that record may overflow the page (#232)

### What problem were solved in this pull request?

Issue Number: close #226

Problem:
记录管理部分中page_header_中record_capacity初始化有误,需要进行修正
由于record_capacity的计算并没有考虑8字节对齐,计算的是不对齐时页面下容纳的最大记录量。
但第一条记录偏移量first_record_offset需要8字节对齐,使得最后一条记录的数据可能会溢出页面。

### What is changed and how it works?
如果溢出页面,则减少record_capacity数值
同时去掉了部分冗余或含义不清的函数,修改了上取整的计算表达式
完善了记录管理模块成员函数的注释
上级 f7b7c932
......@@ -19,12 +19,15 @@ See the Mulan PSL v2 for more details. */
using namespace common;
int align8(int size) { return size / 8 * 8 + ((size % 8 == 0) ? 0 : 8); }
static constexpr int PAGE_HEADER_SIZE = (sizeof(PageHeader));
/**
* @brief 一个页面有一个页头和bitmap加上N个记录组成。这个函数返回一个页面一定会占用的固定大小
* @brief 8字节对齐
* 注: ceiling(a / b) = floor((a + b - 1) / b)
*
* @param size 待对齐的字节数
*/
int page_fix_size() { return sizeof(PageHeader); }
int align8(int size) { return (size + 7) / 8 * 8; }
/**
* @brief 计算指定大小的页面,可以容纳多少个记录
......@@ -36,26 +39,17 @@ int page_record_capacity(int page_size, int record_size)
{
// (record_capacity * record_size) + record_capacity/8 + 1 <= (page_size - fix_size)
// ==> record_capacity = ((page_size - fix_size) - 1) / (record_size + 0.125)
return (int)((page_size - page_fix_size() - 1) / (record_size + 0.125));
return (int)((page_size - PAGE_HEADER_SIZE - 1) / (record_size + 0.125));
}
/**
* @brief bitmap 记录了某个位置是否有有效的记录数据,这里给定记录个数时需要多少字节来存放bitmap数据
* 注: ceiling(a / b) = floor((a + b - 1) / b)
*
* @param record_capacity 想要存放多少记录
*/
int page_bitmap_size(int record_capacity) { return record_capacity / 8 + ((record_capacity % 8 == 0) ? 0 : 1); }
int page_bitmap_size(int record_capacity) { return (record_capacity + 7) / 8; }
/**
* @brief 页面头固定信息加上bitmap需要的字节
*
* @param record_capacity 想要存放多少记录
*/
int page_header_size(int record_capacity)
{
const int bitmap_size = page_bitmap_size(record_capacity);
return align8(page_fix_size() + bitmap_size);
}
////////////////////////////////////////////////////////////////////////////////
RecordPageIterator::RecordPageIterator() {}
RecordPageIterator::~RecordPageIterator() {}
......@@ -98,18 +92,18 @@ RC RecordPageHandler::init(DiskBufferPool &buffer_pool, PageNum page_num, bool r
return ret;
}
char *data = frame_->data();
if (readonly) {
frame_->read_latch();
} else {
frame_->write_latch();
}
readonly_ = readonly;
char *data = frame_->data();
disk_buffer_pool_ = &buffer_pool;
page_header_ = (PageHeader *)(data);
bitmap_ = data + page_fix_size();
readonly_ = readonly;
page_header_ = (PageHeader *)(data);
bitmap_ = data + PAGE_HEADER_SIZE;
LOG_TRACE("Successfully init page_num %d.", page_num);
return ret;
}
......@@ -127,15 +121,13 @@ RC RecordPageHandler::recover_init(DiskBufferPool &buffer_pool, PageNum page_num
return ret;
}
frame_->write_latch();
readonly_ = false;
char *data = frame_->data();
frame_->write_latch();
disk_buffer_pool_ = &buffer_pool;
page_header_ = (PageHeader *)(data);
bitmap_ = data + page_fix_size();
readonly_ = false;
page_header_ = (PageHeader *)(data);
bitmap_ = data + PAGE_HEADER_SIZE;
buffer_pool.recover_page(page_num);
......@@ -151,15 +143,16 @@ RC RecordPageHandler::init_empty_page(DiskBufferPool &buffer_pool, PageNum page_
return ret;
}
int page_size = BP_PAGE_DATA_SIZE;
int record_phy_size = align8(record_size);
page_header_->record_num = 0;
page_header_->record_capacity = page_record_capacity(page_size, record_phy_size);
page_header_->record_real_size = record_size;
page_header_->record_size = record_phy_size;
page_header_->first_record_offset = page_header_size(page_header_->record_capacity);
bitmap_ = frame_->data() + page_fix_size();
page_header_->record_size = align8(record_size);
page_header_->record_capacity = page_record_capacity(BP_PAGE_DATA_SIZE, page_header_->record_size);
page_header_->first_record_offset = align8(PAGE_HEADER_SIZE + page_bitmap_size(page_header_->record_capacity));
this->fix_record_capacity();
ASSERT(page_header_->first_record_offset +
page_header_->record_capacity * page_header_->record_size <= BP_PAGE_DATA_SIZE, "Record overflow the page size");
bitmap_ = frame_->data() + PAGE_HEADER_SIZE;
memset(bitmap_, 0, page_bitmap_size(page_header_->record_capacity));
if ((ret = buffer_pool.flush_page(*frame_)) != RC::SUCCESS) {
......
/* Copyright (c) 2021 Xie Meiyi(xiemeiyi@hust.edu.cn) and OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
......@@ -62,9 +63,9 @@ class Table;
struct PageHeader
{
int32_t record_num; ///< 当前页面记录的个数
int32_t record_capacity; ///< 最大记录个数
int32_t record_real_size; ///< 每条记录的实际大小
int32_t record_size; ///< 每条记录占用实际空间大小(可能对齐)
int32_t record_capacity; ///< 最大记录个数
int32_t first_record_offset; ///< 第一条记录的偏移量
};
......@@ -86,9 +87,21 @@ public:
*/
void init(RecordPageHandler &record_page_handler, SlotNum start_slot_num = 0);
/**
* @brief 判断是否有下一个记录
*/
bool has_next();
/**
* @brief 读取下一个记录到record中包括RID和数据,并更新下一个记录位置next_slot_num_
*
* @param record 返回的下一个记录
*/
RC next(Record &record);
/**
* 该迭代器是否有效
*/
bool is_valid() const { return record_page_handler_ != nullptr; }
private:
......@@ -132,7 +145,7 @@ public:
RC recover_init(DiskBufferPool &buffer_pool, PageNum page_num);
/**
* @brief 对一个新的页面做初始化
* @brief 对一个新的页面做初始化,初始化关于该页面记录信息的页头PageHeader
*
* @param buffer_pool 关联某个文件时,都通过buffer pool来做读写文件
* @param page_num 当前处理哪个页面
......@@ -176,6 +189,9 @@ public:
*/
RC get_record(const RID *rid, Record *rec);
/**
* @brief 返回该记录页的页号
*/
PageNum get_page_num() const;
/**
......@@ -184,6 +200,26 @@ public:
bool is_full() const;
protected:
/**
* @details
* 前面在计算record_capacity时并没有考虑对齐,但第一个record需要8字节对齐
* 因此按此前计算的record_capacity,最后一个记录的部分数据可能会被挤出页面
* 所以需要对record_capacity进行修正,保证记录不会溢出
*/
void fix_record_capacity() {
int32_t last_record_offset = page_header_->first_record_offset +
page_header_->record_capacity * page_header_->record_size;
while(last_record_offset > BP_PAGE_DATA_SIZE) {
page_header_->record_capacity -= 1;
last_record_offset -= page_header_->record_size;
}
}
/**
* @brief 获取指定槽位的记录数据
*
* @param 指定的记录槽位
*/
char *get_record_data(SlotNum slot_num)
{
return frame_->data() + page_header_->first_record_offset + (page_header_->record_size * slot_num);
......@@ -191,10 +227,10 @@ protected:
protected:
DiskBufferPool *disk_buffer_pool_ = nullptr; ///< 当前操作的buffer pool(文件)
Frame *frame_ = nullptr; ///< 当前操作页面关联的frame(frame的更多概念可以参考buffer pool和frame)
bool readonly_ = false; ///< 当前的操作是否都是只读的
Frame *frame_ = nullptr; ///< 当前操作页面关联的frame(frame的更多概念可以参考buffer pool和frame)
PageHeader *page_header_ = nullptr; ///< 当前页面上页面头
char *bitmap_ = nullptr; ///< 当前页面上record分配状态信息bitmap内存起始位置
PageHeader *page_header_ = nullptr; ///< 当前页面上页面头
char *bitmap_ = nullptr; ///< 当前页面上record分配状态信息bitmap内存起始位置
private:
friend class RecordPageIterator;
......@@ -224,14 +260,28 @@ public:
void close();
/**
* @brief 从指定文件中删除标识符为rid的记录
* @brief 从指定文件中删除指定槽位的记录
*
* @param rid 待删除记录的标识符
*/
RC delete_record(const RID *rid);
/**
* @brief 插入一个新的记录到指定文件中,data为指向新纪录内容的指针,返回该记录的标识符rid
* @brief 插入一个新的记录到指定文件中,并返回该记录的标识符
*
* @param data 纪录内容
* @param record_size 记录大小
* @param rid 返回该记录的标识符
*/
RC insert_record(const char *data, int record_size, RID *rid);
/**
* @brief 数据库恢复时,在指定文件指定位置插入数据
*
* @param data 记录内容
* @param record_size 记录大小
* @param rid 要插入记录的指定标识符
*/
RC recover_insert_record(const char *data, int record_size, const RID &rid);
/**
......@@ -257,14 +307,14 @@ public:
private:
/**
* @brief 初始化记录当前没有填满记录的页面
* @brief 初始化当前没有填满记录的页面,初始化free_pages_成员
*/
RC init_free_pages();
private:
DiskBufferPool *disk_buffer_pool_ = nullptr;
std::unordered_set<PageNum> free_pages_; ///< 没有填充满的页面集合
common::Mutex lock_; ///< 当编译时增加-DCONCURRENCY=ON 选项时,才会真正的支持并发
common::Mutex lock_; ///< 当编译时增加-DCONCURRENCY=ON 选项时,才会真正的支持并发
};
/**
......@@ -281,10 +331,10 @@ public:
/**
* @brief 打开一个文件扫描。
* @details 如果条件不为空,则要对每条记录进行条件比较,只有满足所有条件的记录才被返回
* @param table 遍历的哪张表
* @param buffer_pool 访问的文件
* @param readonly 当前是否只读操作。访问数据时,需要对页面加锁。比如
* 删除时也需要遍历找到数据,然后删除,这时就需要加写锁
* @param table 遍历的哪张表
* @param buffer_pool 访问的文件
* @param readonly 当前是否只读操作。访问数据时,需要对页面加锁。比如
* 删除时也需要遍历找到数据,然后删除,这时就需要加写锁
* @param condition_filter 做一些初步过滤操作
*/
RC open_scan(Table *table, DiskBufferPool &buffer_pool, Trx *trx, bool readonly, ConditionFilter *condition_filter);
......@@ -294,7 +344,7 @@ public:
*/
RC close_scan();
/**
/**
* @brief 判断是否还有数据
* @details 判断完成后调用next获取下一条数据
*/
......@@ -302,13 +352,16 @@ public:
/**
* @brief 获取下一条记录
*
* @param record 返回的下一条记录
*
* @details 获取下一条记录之前先调用has_next()判断是否还有数据
*/
RC next(Record &record);
private:
/**
* @brief 获取下一条记录
* @brief 获取该文件中的下一条记录
*/
RC fetch_next_record();
......@@ -319,14 +372,14 @@ private:
private:
// TODO 对于一个纯粹的record遍历器来说,不应该关心表和事务
Table *table_ = nullptr; ///< 当前遍历的是哪张表。这个字段仅供事务函数使用,如果设计合适,可以去掉
DiskBufferPool *disk_buffer_pool_ = nullptr; ///< 当前访问的文件
Trx *trx_ = nullptr; ///< 当前是哪个事务在遍历
bool readonly_ = false; ///< 遍历出来的数据,是否可能对它做修改
Table *table_ = nullptr; ///< 当前遍历的是哪张表。这个字段仅供事务函数使用,如果设计合适,可以去掉
DiskBufferPool *disk_buffer_pool_ = nullptr; ///< 当前访问的文件
Trx *trx_ = nullptr; ///< 当前是哪个事务在遍历
bool readonly_ = false; ///< 遍历出来的数据,是否可能对它做修改
BufferPoolIterator bp_iterator_; ///< 遍历buffer pool的所有页面
ConditionFilter *condition_filter_ = nullptr; ///< 过滤record
RecordPageHandler record_page_handler_;
RecordPageHandler record_page_handler_; ///< 处理文件某页面的记录
RecordPageIterator record_page_iterator_; ///< 遍历某个页面上的所有record
Record next_record_; ///< 获取的记录放在这里缓存起来
};
};
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册