未验证 提交 c7ecaace 编写于 作者: 羽飞's avatar 羽飞 提交者: GitHub

2022 oceanbase miniob competition (#104)

2022 oceanbase competition
上级 d63a39f5
# Introduction
miniob 是 OceanBase与华中科技大学联合开发的、面向"零"基础数据库内核知识同学的一门数据库实现入门教程实践工具。
miniob设计的目标是让不熟悉数据库设计和实现的同学能够快速的了解与深入学习数据库内核,期望通过相关训练之后,能够对各个数据库内核模块的功能与它们之间的关联有所了解,并能够在
使用数据库时,设计出高效的SQL。面向的对象主要是在校学生,并且诸多模块做了简化,比如不考虑并发操作。
注意:此代码仅供学习使用,不考虑任何安全特性。
# 介绍
miniob 是 OceanBase 与华中科技大学联合开发的、面向"零"基础同学的数据库入门实践工具。
miniob 设计的目标是让同学们快速了解数据库并深入学习数据库内核,期望通过相关训练之后,能够对数据库内核各个模块的功能及其关联有所了解,并能够在
使用数据库时,设计出高效的 SQL 。miniob 面向的对象主要是在校学生,并且诸多模块都做了简化,比如不考虑并发操作。
[GitHub 首页](https://github.com/oceanbase/miniob)
# 如何开发
## 搭建开发环境
有多种方式搭建开发环境,可以直接在本地安装一些三方依赖,或者使用Docker。如果使用的是Windows,我们建议使用Docker来开发。
### 搭建本地开发环境
直接在本地搭建开发环境,可以参考 [how_to_build](docs/how_to_build.md)
### 使用Docker开发
请参考 [如何使用Docker开发MiniOB](docs/how-to-dev-using-docker.md)
(注意:此代码仅供学习使用,不考虑任何安全特性。)
### Windows上开发MiniOB
[如何在Windows上使用Docker开发miniob](docs/how_to_dev_miniob_by_docker_on_windows.md)
## 词法语法解析开发环境
如果已经在处理一些SQL词法语法解析相关的问题,请参考 [MiniOB 词法语法解析开发与测试](docs/miniob-sql-parser.md)
Docker 环境已经预安装了相关的组件。
# 数据库管理系统实现基础讲义
由华中科技大学谢美意和左琼老师联合编撰数据库管理系统实现教材。参考 [数据库管理系统实现基础讲义](docs/lectures/index.md)
[GitHub 首页](https://github.com/oceanbase/miniob)
# miniob 介绍
[miniob代码架构框架设计和说明](docs/miniob-introduction.md)
# 1. 题目说明
[miniob 题目描述](docs/miniob_topics.md)
# miniob 训练
我们为MiniOB设计了配套的训练题目,大家可以在 [MiniOB 训练营](https://open.oceanbase.com/train?questionId=200001) 上进行提交测试。
# 2. 开发指南
## 搭建开发环境
1. [本地配置gcc环境](docs/how_to_build.md)
2. [使用Docker开发](docs/how-to-dev-using-docker.md)
3. [在Windows上使用Docker](docs/how_to_dev_miniob_by_docker_on_windows.md)
[miniob 题目描述](docs/miniob_topics.md)
## 词法、语法解析
请参考 [miniob 词法语法解析开发与测试](docs/miniob-sql-parser.md)
为了满足训练营或比赛测试要求,代码的输出需要满足一定要求,请参考 [MiniOB 输出约定](docs/miniob-output-convention.md)。一般情况下,不需要专门来看这篇文档,但是如果你的测试总是不正确,建议对照一下输出约定。
# 3. 提交测试
题目完成并通过自测后,大家可以在 [miniob 训练营](https://open.oceanbase.com/train?questionId=200001) 上提交代码进行测试。
# miniob 实现解析
客户端输出需要满足一定要求,如果你的测试结果不符合预期,请参考 [miniob 输出约定](docs/miniob-output-convention.md)
[miniob-date 实现解析](https://oceanbase-partner.github.io/lectures-on-dbms-implementation/miniob-date-implementation.html)
# 4. 数据库管理系统实现基础讲义
由华中科技大学谢美意和左琼老师联合编撰的数据库管理系统实现教材:[《数据库管理系统实现基础讲义》](docs/lectures/index.md)
[miniob drop-table 实现解析](https://oceanbase-partner.github.io/lectures-on-dbms-implementation/miniob-drop-table-implementation.html)
# 5. miniob 介绍
[miniob 源码解析视频](https://open.oceanbase.com/activities/4921877)
[miniob select-tables 实现解析](https://oceanbase-partner.github.io/lectures-on-dbms-implementation/miniob-select-tables-implementation.html)
[miniob 源码解析文档](https://www.oceanbase.com/docs/community-developer-quickstart-10000000000627363)
[miniob 调试篇](https://oceanbase-partner.github.io/lectures-on-dbms-implementation/miniob-how-to-debug.html)
(资料持续整理中,请大家自行查阅标题为“MiniOB...”的视频或文档)
......@@ -37,5 +37,8 @@ private:
} // namespace common
#define DERFER_NAME(suffix) defer_helper_##suffix
#define DEFER(callback) common::DeferHelper DERFER_NAME(__LINE__)(callback)
#define AA(B, C) B##C
#define BB(B, C) AA(B,C)
#define DEFER(callback) common::DeferHelper BB(defer_helper_, __LINE__)(callback)
......@@ -53,6 +53,7 @@ const char *strrc(RC rc)
RC_CASE_STRING(FORMAT);
RC_CASE_STRING(RANGE);
RC_CASE_STRING(NOTADB);
RC_CASE_STRING(LOGBUF);
RC_CASE_STRING(NOTICE);
RC_CASE_STRING(BUFFERPOOL_EXIST);
......@@ -174,8 +175,23 @@ const char *strrc(RC rc)
RC_CASE_STRING(NOTICE_RECOVER_ROLLBACK);
RC_CASE_STRING(NOTICE_AUTOINDEX);
RC_CASE_STRING(FILE_EXIST);
RC_CASE_STRING(FILE_NOT_EXIST);
RC_CASE_STRING(FILE_NAME);
RC_CASE_STRING(FILE_BOUND);
RC_CASE_STRING(FILE_CREATE);
RC_CASE_STRING(FILE_OPEN);
RC_CASE_STRING(FILE_NOT_OPENED);
RC_CASE_STRING(FILE_CLOSE);
RC_CASE_STRING(FILE_REMOVE);
RC_CASE_STRING(FILE_SEEK);
RC_CASE_STRING(FILE_READ);
RC_CASE_STRING(FILE_WRITE);
RC_CASE_STRING(AUTH_USER);
RC_CASE_STRING(LOGBUF_FULL);
RC_CASE_STRING(LOGBUF_EMPTY);
default: {
return "UNKNOWN";
}
......
......@@ -166,6 +166,26 @@ enum RCAuth {
USER = 1,
};
enum RCFILE {
F_EXIST = 1,
F_NOT_EXIST,
F_NAME,
F_BOUND,
F_CREATE,
F_OPEN,
F_NOT_OPENED,
F_CLOSE,
F_REMOVE,
F_SEEK,
F_READ,
F_WRITE,
};
enum RCLOGBUF {
LB_FULL = 1,
LB_EMPTY,
};
enum RC {
SUCCESS = 0, /* Successful result */
......@@ -201,6 +221,8 @@ enum RC {
FORMAT, /* Not used */
RANGE, /* 2nd parameter to bind out of range */
NOTADB, /* File opened that is not a database file */
FILE_ERROR, /* File error */
LOGBUF, /* clog buffer error */
NOTICE = 100, /* Notifications from log() */
/* buffer pool part */
......@@ -337,8 +359,26 @@ enum RC {
NOTICE_RECOVER_ROLLBACK = (NOTICE | (RCNotice::RECOVER_ROLLBACK << 8)),
NOTICE_AUTOINDEX = (NOTICE | (RCNotice::AUTOINDEX << 8)),
/* file part */
FILE_EXIST = ( FILE_ERROR | (RCFILE::F_EXIST << 8)),
FILE_NOT_EXIST = ( FILE_ERROR | (RCFILE::F_NOT_EXIST << 8)),
FILE_NAME = ( FILE_ERROR | (RCFILE::F_NAME << 8)),
FILE_BOUND = ( FILE_ERROR | (RCFILE::F_BOUND << 8)),
FILE_CREATE = ( FILE_ERROR | (RCFILE::F_CREATE << 8)),
FILE_OPEN = ( FILE_ERROR | (RCFILE::F_OPEN << 8)),
FILE_NOT_OPENED = ( FILE_ERROR | (RCFILE::F_NOT_OPENED << 8)),
FILE_CLOSE = ( FILE_ERROR | (RCFILE::F_CLOSE << 8)),
FILE_REMOVE = ( FILE_ERROR | (RCFILE::F_REMOVE << 8)),
FILE_SEEK = ( FILE_ERROR | (RCFILE::F_SEEK << 8)),
FILE_READ = ( FILE_ERROR | (RCFILE::F_READ << 8)),
FILE_WRITE = ( FILE_ERROR | (RCFILE::F_WRITE << 8)),
/* auth part*/
AUTH_USER = (AUTH | (RCAuth::USER << 8)),
/* clog buffer part */
LOGBUF_FULL = (LOGBUF | (RCLOGBUF::LB_FULL << 8)),
LOGBUF_EMPTY = (LOGBUF | (RCLOGBUF::LB_EMPTY << 8)),
};
extern const char *strrc(RC rc);
......
......@@ -44,6 +44,7 @@ See the Mulan PSL v2 for more details. */
#include "storage/default/default_handler.h"
#include "storage/common/condition_filter.h"
#include "storage/trx/trx.h"
#include "storage/clog/clog.h"
using namespace common;
......@@ -175,18 +176,29 @@ void ExecuteStage::handle_request(common::StageEvent *event)
default_storage_stage_->handle_event(event);
} break;
case SCF_SYNC: {
/*
RC rc = DefaultHandler::get_default().sync();
session_event->set_response(strrc(rc));
*/
} break;
case SCF_BEGIN: {
do_begin(sql_event);
/*
session_event->set_response("SUCCESS\n");
*/
} break;
case SCF_COMMIT: {
do_commit(sql_event);
/*
Trx *trx = session->current_trx();
RC rc = trx->commit();
session->set_trx_multi_operation_mode(false);
session_event->set_response(strrc(rc));
*/
} break;
case SCF_CLOG_SYNC: {
do_clog_sync(sql_event);
}
case SCF_ROLLBACK: {
Trx *trx = session_event->get_client()->session->current_trx();
RC rc = trx->rollback();
......@@ -519,6 +531,10 @@ RC ExecuteStage::do_insert(SQLStageEvent *sql_event)
{
Stmt *stmt = sql_event->stmt();
SessionEvent *session_event = sql_event->session_event();
Session *session = session_event->session();
Db *db = session->get_current_db();
Trx *trx = session->current_trx();
CLogManager *clog_manager = db->get_clog_manager();
if (stmt == nullptr) {
LOG_WARN("cannot find statement");
......@@ -526,11 +542,29 @@ RC ExecuteStage::do_insert(SQLStageEvent *sql_event)
}
InsertStmt *insert_stmt = (InsertStmt *)stmt;
Table *table = insert_stmt->table();
RC rc = table->insert_record(nullptr, insert_stmt->value_amount(), insert_stmt->values());
RC rc = table->insert_record(trx, insert_stmt->value_amount(), insert_stmt->values());
if (rc == RC::SUCCESS) {
session_event->set_response("SUCCESS\n");
if (!session->is_trx_multi_operation_mode()) {
CLogRecord *clog_record = nullptr;
rc = clog_manager->clog_gen_record(CLogType::REDO_MTR_COMMIT, trx->get_current_id(), clog_record);
if (rc != RC::SUCCESS || clog_record == nullptr) {
session_event->set_response("FAILURE\n");
return rc;
}
rc = clog_manager->clog_append_record(clog_record);
if (rc != RC::SUCCESS) {
session_event->set_response("FAILURE\n");
return rc;
}
trx->next_current_id();
session_event->set_response("SUCCESS\n");
} else {
session_event->set_response("SUCCESS\n");
}
} else {
session_event->set_response("FAILURE\n");
}
......@@ -541,6 +575,10 @@ RC ExecuteStage::do_delete(SQLStageEvent *sql_event)
{
Stmt *stmt = sql_event->stmt();
SessionEvent *session_event = sql_event->session_event();
Session *session = session_event->session();
Db *db = session->get_current_db();
Trx *trx = session->current_trx();
CLogManager *clog_manager = db->get_clog_manager();
if (stmt == nullptr) {
LOG_WARN("cannot find statement");
......@@ -551,14 +589,106 @@ RC ExecuteStage::do_delete(SQLStageEvent *sql_event)
TableScanOperator scan_oper(delete_stmt->table());
PredicateOperator pred_oper(delete_stmt->filter_stmt());
pred_oper.add_child(&scan_oper);
DeleteOperator delete_oper(delete_stmt);
DeleteOperator delete_oper(delete_stmt, trx);
delete_oper.add_child(&pred_oper);
RC rc = delete_oper.open();
if (rc != RC::SUCCESS) {
session_event->set_response("FAILURE\n");
} else {
session_event->set_response("SUCCESS\n");
if (!session->is_trx_multi_operation_mode()) {
CLogRecord *clog_record = nullptr;
rc = clog_manager->clog_gen_record(CLogType::REDO_MTR_COMMIT, trx->get_current_id(), clog_record);
if (rc != RC::SUCCESS || clog_record == nullptr) {
session_event->set_response("FAILURE\n");
return rc;
}
rc = clog_manager->clog_append_record(clog_record);
if (rc != RC::SUCCESS) {
session_event->set_response("FAILURE\n");
return rc;
}
trx->next_current_id();
session_event->set_response("SUCCESS\n");
}
}
return rc;
}
RC ExecuteStage::do_begin(SQLStageEvent *sql_event)
{
RC rc = RC::SUCCESS;
SessionEvent *session_event = sql_event->session_event();
Session *session = session_event->session();
Db *db = session->get_current_db();
Trx *trx = session->current_trx();
CLogManager *clog_manager = db->get_clog_manager();
session->set_trx_multi_operation_mode(true);
CLogRecord *clog_record = nullptr;
rc = clog_manager->clog_gen_record(CLogType::REDO_MTR_BEGIN, trx->get_current_id(), clog_record);
if (rc != RC::SUCCESS || clog_record == nullptr) {
session_event->set_response("FAILURE\n");
return rc;
}
rc = clog_manager->clog_append_record(clog_record);
if (rc != RC::SUCCESS) {
session_event->set_response("FAILURE\n");
} else {
session_event->set_response("SUCCESS\n");
}
return rc;
}
RC ExecuteStage::do_commit(SQLStageEvent *sql_event)
{
RC rc = RC::SUCCESS;
SessionEvent *session_event = sql_event->session_event();
Session *session = session_event->session();
Db *db = session->get_current_db();
Trx *trx = session->current_trx();
CLogManager *clog_manager = db->get_clog_manager();
session->set_trx_multi_operation_mode(false);
CLogRecord *clog_record = nullptr;
rc = clog_manager->clog_gen_record(CLogType::REDO_MTR_COMMIT, trx->get_current_id(), clog_record);
if (rc != RC::SUCCESS || clog_record == nullptr) {
session_event->set_response("FAILURE\n");
return rc;
}
rc = clog_manager->clog_append_record(clog_record);
if (rc != RC::SUCCESS) {
session_event->set_response("FAILURE\n");
} else {
session_event->set_response("SUCCESS\n");
}
trx->next_current_id();
return rc;
}
RC ExecuteStage::do_clog_sync(SQLStageEvent *sql_event)
{
RC rc = RC::SUCCESS;
SessionEvent *session_event = sql_event->session_event();
Db *db = session_event->session()->get_current_db();
CLogManager *clog_manager = db->get_clog_manager();
rc = clog_manager->clog_sync();
if (rc != RC::SUCCESS) {
session_event->set_response("FAILURE\n");
} else {
session_event->set_response("SUCCESS\n");
}
return rc;
}
......@@ -47,6 +47,9 @@ protected:
RC do_select(SQLStageEvent *sql_event);
RC do_insert(SQLStageEvent *sql_event);
RC do_delete(SQLStageEvent *sql_event);
RC do_begin(SQLStageEvent *sql_event);
RC do_commit(SQLStageEvent *sql_event);
RC do_clog_sync(SQLStageEvent *sql_event);
protected:
private:
......
......@@ -16,6 +16,7 @@ See the Mulan PSL v2 for more details. */
#include "sql/operator/delete_operator.h"
#include "storage/record/record.h"
#include "storage/common/table.h"
#include "storage/trx/trx.h"
#include "sql/stmt/delete_stmt.h"
RC DeleteOperator::open()
......@@ -42,7 +43,7 @@ RC DeleteOperator::open()
RowTuple *row_tuple = static_cast<RowTuple *>(tuple);
Record &record = row_tuple->record();
rc = table->delete_record(nullptr, &record);
rc = table->delete_record(trx_, &record);
if (rc != RC::SUCCESS) {
LOG_WARN("failed to delete record: %s", strrc(rc));
return rc;
......
......@@ -17,13 +17,14 @@ See the Mulan PSL v2 for more details. */
#include "sql/operator/operator.h"
#include "rc.h"
class Trx;
class DeleteStmt;
class DeleteOperator : public Operator
{
public:
DeleteOperator(DeleteStmt *delete_stmt)
: delete_stmt_(delete_stmt)
DeleteOperator(DeleteStmt *delete_stmt, Trx *trx)
: delete_stmt_(delete_stmt), trx_(trx)
{}
virtual ~DeleteOperator() = default;
......@@ -39,4 +40,5 @@ public:
//RC tuple_cell_spec_at(int index, TupleCellSpec &spec) const override
private:
DeleteStmt *delete_stmt_ = nullptr;
Trx *trx_ = nullptr;
};
......@@ -372,6 +372,7 @@ void query_reset(Query *query)
case SCF_LOAD_DATA: {
load_data_destroy(&query->sstr.load_data);
} break;
case SCF_CLOG_SYNC:
case SCF_BEGIN:
case SCF_COMMIT:
case SCF_ROLLBACK:
......
......@@ -168,6 +168,7 @@ enum SqlCommandFlag {
SCF_DESC_TABLE,
SCF_BEGIN,
SCF_COMMIT,
SCF_CLOG_SYNC,
SCF_ROLLBACK,
SCF_LOAD_DATA,
SCF_HELP,
......
/* Copyright (c) 2021-2022 Xie Meiyi(xiemeiyi@hust.edu.cn),
Huazhong University of Science and Technology
and OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by huhaosheng.hhs on 2022
//
#include "common/log/log.h"
#include "clog.h"
#define CLOG_INS_REC_NODATA_SIZE (sizeof(CLogInsertRecord) - sizeof(char *))
const char *CLOG_FILE_NAME = "clog";
int _align8(int size)
{
return size / 8 * 8 + ((size % 8 == 0) ? 0 : 8);
}
CLogRecord::CLogRecord(CLogType flag, int32_t trx_id, const char *table_name /* = nullptr */, int data_len /* = 0 */,
Record *rec /* = nullptr */)
{
flag_ = flag;
switch (flag) {
case REDO_MTR_BEGIN:
case REDO_MTR_COMMIT: {
log_record_.mtr.hdr_.trx_id_ = trx_id;
log_record_.mtr.hdr_.type_ = flag;
log_record_.mtr.hdr_.logrec_len_ = sizeof(CLogMTRRecord);
log_record_.mtr.hdr_.lsn_ = CLogManager::get_next_lsn(log_record_.mtr.hdr_.logrec_len_);
} break;
case REDO_INSERT: {
if (!rec || !rec->data()) {
LOG_ERROR("Record is null");
} else {
log_record_.ins.hdr_.trx_id_ = trx_id;
log_record_.ins.hdr_.type_ = flag;
strcpy(log_record_.ins.table_name_, table_name);
log_record_.ins.rid_ = rec->rid();
log_record_.ins.data_len_ = data_len;
log_record_.ins.hdr_.logrec_len_ = _align8(CLOG_INS_REC_NODATA_SIZE + data_len);
log_record_.ins.data_ = new char[log_record_.ins.hdr_.logrec_len_ - CLOG_INS_REC_NODATA_SIZE];
memcpy(log_record_.ins.data_, rec->data(), data_len);
log_record_.ins.hdr_.lsn_ = CLogManager::get_next_lsn(log_record_.ins.hdr_.logrec_len_);
}
} break;
case REDO_DELETE: {
if (!rec) {
LOG_ERROR("Record is null");
} else {
log_record_.del.hdr_.trx_id_ = trx_id;
log_record_.del.hdr_.type_ = flag;
log_record_.del.hdr_.logrec_len_ = sizeof(CLogDeleteRecord);
strcpy(log_record_.ins.table_name_, table_name);
log_record_.del.rid_ = rec->rid();
log_record_.del.hdr_.lsn_ = CLogManager::get_next_lsn(log_record_.del.hdr_.logrec_len_);
}
} break;
default:
LOG_ERROR("flag is error");
break;
}
}
CLogRecord::CLogRecord(char *data)
{
CLogRecordHeader *hdr = (CLogRecordHeader *)data;
flag_ = (CLogType)hdr->type_;
switch (flag_) {
case REDO_MTR_BEGIN:
case REDO_MTR_COMMIT: {
log_record_.mtr.hdr_ = *hdr;
} break;
case REDO_INSERT: {
log_record_.ins.hdr_ = *hdr;
data += sizeof(CLogRecordHeader);
strcpy(log_record_.ins.table_name_, data);
data += TABLE_NAME_MAX_LEN;
log_record_.ins.rid_ = *(RID *)data;
data += sizeof(RID);
log_record_.ins.data_len_ = *(int *)data;
data += sizeof(int);
log_record_.ins.data_ = new char[log_record_.ins.hdr_.logrec_len_ - CLOG_INS_REC_NODATA_SIZE];
memcpy(log_record_.ins.data_, data, log_record_.ins.data_len_);
} break;
case REDO_DELETE: {
log_record_.del.hdr_ = *hdr;
data += sizeof(CLogRecordHeader);
strcpy(log_record_.del.table_name_, data);
data += TABLE_NAME_MAX_LEN;
log_record_.del.rid_ = *(RID *)data;
} break;
default:
LOG_ERROR("flag is error");
break;
}
}
CLogRecord::~CLogRecord()
{
if (REDO_INSERT == flag_) {
delete[] log_record_.ins.data_;
}
}
RC CLogRecord::copy_record(void *dest, int start_off, int copy_len)
{
CLogRecords *log_rec = &log_record_;
if (start_off + copy_len > get_logrec_len()) {
return RC::GENERIC_ERROR;
} else if (flag_ != REDO_INSERT) {
memcpy(dest, (char *)log_rec + start_off, copy_len);
} else {
if (start_off > CLOG_INS_REC_NODATA_SIZE) {
memcpy(dest, log_rec->ins.data_ + start_off - CLOG_INS_REC_NODATA_SIZE, copy_len);
} else if (start_off + copy_len <= CLOG_INS_REC_NODATA_SIZE) {
memcpy(dest, (char *)log_rec + start_off, copy_len);
} else {
memcpy(dest, (char *)log_rec + start_off, CLOG_INS_REC_NODATA_SIZE - start_off);
memcpy((char *)dest + CLOG_INS_REC_NODATA_SIZE - start_off,
log_rec->ins.data_,
copy_len - (CLOG_INS_REC_NODATA_SIZE - start_off));
}
}
return RC::SUCCESS;
}
// for unitest // 1 = "="// 0 = "!="
int CLogRecord::cmp_eq(CLogRecord *other)
{
CLogRecords *other_logrec = other->get_record();
if (flag_ == other->flag_) {
switch (flag_) {
case REDO_MTR_BEGIN:
case REDO_MTR_COMMIT:
return log_record_.mtr == other_logrec->mtr;
case REDO_INSERT:
return log_record_.ins == other_logrec->ins;
case REDO_DELETE:
return log_record_.del == other_logrec->del;
default:
LOG_ERROR("log_record is error");
break;
}
}
return 0;
}
//
CLogBuffer::CLogBuffer()
{
current_block_no_ = 0 * CLOG_BLOCK_SIZE; // 第一个块是文件头块
write_block_offset_ = 0;
write_offset_ = 0;
memset(buffer_, 0, CLOG_BUFFER_SIZE);
}
CLogBuffer::~CLogBuffer()
{}
RC CLogBuffer::append_log_record(CLogRecord *log_rec, int &start_off)
{
if (!log_rec) {
return RC::GENERIC_ERROR;
}
if (write_offset_ == CLOG_BUFFER_SIZE) {
return RC::LOGBUF_FULL;
}
RC rc = RC::SUCCESS;
int32_t logrec_left_len = log_rec->get_logrec_len() - start_off;
CLogBlock *log_block = (CLogBlock *)&buffer_[write_block_offset_];
if (write_offset_ == 0 && write_block_offset_ == 0) {
memset(log_block, 0, CLOG_BLOCK_SIZE);
current_block_no_ += CLOG_BLOCK_SIZE;
log_block->log_block_hdr_.log_block_no = current_block_no_;
write_offset_ += CLOG_BLOCK_HDR_SIZE;
}
if (log_block->log_block_hdr_.log_data_len_ == CLOG_BLOCK_DATA_SIZE) { // 当前block已写满
// 新分配一个block
write_block_offset_ += CLOG_BLOCK_SIZE;
current_block_no_ += CLOG_BLOCK_SIZE;
log_block = (CLogBlock *)&buffer_[write_block_offset_];
memset(log_block, 0, CLOG_BLOCK_SIZE);
log_block->log_block_hdr_.log_block_no = current_block_no_;
write_offset_ += CLOG_BLOCK_HDR_SIZE;
return append_log_record(log_rec, start_off);
} else {
if (logrec_left_len <= (CLOG_BLOCK_DATA_SIZE - log_block->log_block_hdr_.log_data_len_)) { //不需要再跨block存放
if (log_block->log_block_hdr_.log_data_len_ == 0) { //当前为新block
if (start_off == 0) {
log_block->log_block_hdr_.first_rec_offset_ = CLOG_BLOCK_HDR_SIZE;
} else {
log_block->log_block_hdr_.first_rec_offset_ = CLOG_BLOCK_HDR_SIZE + logrec_left_len;
}
}
log_rec->copy_record(&(buffer_[write_offset_]), start_off, logrec_left_len);
write_offset_ += logrec_left_len;
log_block->log_block_hdr_.log_data_len_ += logrec_left_len;
start_off += logrec_left_len;
} else { //需要跨block
if (log_block->log_block_hdr_.log_data_len_ == 0) { //当前为新block
log_block->log_block_hdr_.first_rec_offset_ = CLOG_BLOCK_SIZE;
}
int32_t block_left_len = CLOG_BLOCK_DATA_SIZE - log_block->log_block_hdr_.log_data_len_;
log_rec->copy_record(&(buffer_[write_offset_]), start_off, block_left_len);
write_offset_ += block_left_len;
log_block->log_block_hdr_.log_data_len_ += block_left_len;
start_off += block_left_len;
return append_log_record(log_rec, start_off);
}
}
return rc;
}
RC CLogBuffer::flush_buffer(CLogFile *log_file)
{
if (write_offset_ == CLOG_BUFFER_SIZE) { //如果是buffer满触发的下刷
CLogBlock *log_block = (CLogBlock *)buffer_;
log_file->write(log_block->log_block_hdr_.log_block_no, CLOG_BUFFER_SIZE, buffer_);
write_block_offset_ = 0;
write_offset_ = 0;
memset(buffer_, 0, CLOG_BUFFER_SIZE);
} else {
CLogBlock *log_block = (CLogBlock *)buffer_;
log_file->write(log_block->log_block_hdr_.log_block_no, write_block_offset_ + CLOG_BLOCK_SIZE, buffer_);
log_block = (CLogBlock *)&buffer_[write_block_offset_];
if (log_block->log_block_hdr_.log_data_len_ == CLOG_BLOCK_DATA_SIZE) { // 最后一个block已写满
write_block_offset_ = 0;
write_offset_ = 0;
memset(buffer_, 0, CLOG_BUFFER_SIZE);
} else if (write_block_offset_ != 0) {
// 将最后一个未写满的block迁移到buffer起始位置
write_offset_ = log_block->log_block_hdr_.log_data_len_ + CLOG_BLOCK_HDR_SIZE;
memcpy(buffer_, &buffer_[write_block_offset_], CLOG_BLOCK_SIZE);
write_block_offset_ = 0;
memset(&buffer_[CLOG_BLOCK_SIZE], 0, CLOG_BUFFER_SIZE - CLOG_BLOCK_SIZE);
}
}
return RC::SUCCESS;
}
RC CLogBuffer::block_copy(int32_t offset, CLogBlock *log_block)
{
memcpy(&buffer_[offset], (char *)log_block, CLOG_BLOCK_SIZE);
return RC::SUCCESS;
}
//
CLogFile::CLogFile(const char *path)
{
log_file_ = new PersistHandler();
RC rc = RC::SUCCESS;
std::string clog_file_path = std::string(path) + common::FILE_PATH_SPLIT_STR + CLOG_FILE_NAME;
rc = log_file_->create_file(clog_file_path.c_str());
if (rc == RC::SUCCESS) {
log_file_->open_file();
update_log_fhd(0);
} else if (rc == RC::FILE_EXIST) {
log_file_->open_file(clog_file_path.c_str());
log_file_->read_at(0, CLOG_BLOCK_SIZE, (char *)&log_fhd_);
}
}
CLogFile::~CLogFile()
{
if (log_file_) {
log_file_->close_file();
delete log_file_;
}
}
RC CLogFile::update_log_fhd(int32_t current_file_lsn)
{
log_fhd_.hdr_.current_file_lsn_ = current_file_lsn;
log_fhd_.hdr_.current_file_real_offset_ = CLOG_FILE_HDR_SIZE;
RC rc = log_file_->write_at(0, CLOG_BLOCK_SIZE, (char *)&log_fhd_);
return rc;
}
RC CLogFile::append(int data_len, char *data)
{
RC rc = log_file_->append(data_len, data);
return rc;
}
RC CLogFile::write(uint64_t offset, int data_len, char *data)
{
RC rc = log_file_->write_at(offset, data_len, data);
return rc;
}
RC CLogFile::recover(CLogMTRManager *mtr_mgr, CLogBuffer *log_buffer)
{
char redo_buffer[CLOG_REDO_BUFFER_SIZE];
CLogRecordBuf logrec_buf;
memset(&logrec_buf, 0, sizeof(CLogRecordBuf));
CLogRecord *log_rec = nullptr;
uint64_t offset = CLOG_BLOCK_SIZE; // 第一个block为文件头
int64_t read_size = 0;
log_file_->read_at(offset, CLOG_REDO_BUFFER_SIZE, redo_buffer, &read_size);
while (read_size != 0) {
int32_t buffer_offset = 0;
while (buffer_offset < CLOG_REDO_BUFFER_SIZE) {
CLogBlock *log_block = (CLogBlock *)&redo_buffer[buffer_offset];
log_buffer->set_current_block_no(log_block->log_block_hdr_.log_block_no);
int16_t rec_offset = CLOG_BLOCK_HDR_SIZE;
while (rec_offset < CLOG_BLOCK_HDR_SIZE + log_block->log_block_hdr_.log_data_len_) {
block_recover(log_block, rec_offset, &logrec_buf, log_rec);
if (log_rec != nullptr) {
CLogManager::gloabl_lsn_ = log_rec->get_lsn() + log_rec->get_logrec_len();
mtr_mgr->log_record_manage(log_rec);
log_rec = nullptr;
}
}
if (log_block->log_block_hdr_.log_data_len_ < CLOG_BLOCK_DATA_SIZE) { //最后一个block
log_buffer->block_copy(0, log_block);
log_buffer->set_write_block_offset(0);
log_buffer->set_write_offset(log_block->log_block_hdr_.log_data_len_ + CLOG_BLOCK_HDR_SIZE);
goto done;
}
buffer_offset += CLOG_BLOCK_SIZE;
}
offset += read_size;
log_file_->read_at(offset, CLOG_REDO_BUFFER_SIZE, redo_buffer, &read_size);
}
done:
if (logrec_buf.write_offset_ != 0) {
log_rec = new CLogRecord((char *)logrec_buf.buffer_);
mtr_mgr->log_record_manage(log_rec);
}
return RC::SUCCESS;
}
RC CLogFile::block_recover(CLogBlock *block, int16_t &offset, CLogRecordBuf *logrec_buf, CLogRecord *&log_rec)
{
if (offset == CLOG_BLOCK_HDR_SIZE &&
block->log_block_hdr_.first_rec_offset_ != CLOG_BLOCK_HDR_SIZE) { //跨block中的某部分(非第一部分)
// 追加到logrec_buf
memcpy(&logrec_buf->buffer_[logrec_buf->write_offset_],
(char *)block + (int)offset,
block->log_block_hdr_.first_rec_offset_ - CLOG_BLOCK_HDR_SIZE);
logrec_buf->write_offset_ += block->log_block_hdr_.first_rec_offset_ - CLOG_BLOCK_HDR_SIZE;
offset += block->log_block_hdr_.first_rec_offset_ - CLOG_BLOCK_HDR_SIZE;
} else {
if (CLOG_BLOCK_SIZE - offset < sizeof(CLogRecordHeader)) { // 一定是跨block的第一部分
// 此时无法确定log record的长度
// 开始写入logrec_buf
memcpy(&logrec_buf->buffer_[logrec_buf->write_offset_], (char *)block + (int)offset, CLOG_BLOCK_SIZE - offset);
logrec_buf->write_offset_ += CLOG_BLOCK_SIZE - offset;
offset = CLOG_BLOCK_SIZE;
} else {
if (logrec_buf->write_offset_ != 0) {
log_rec = new CLogRecord((char *)logrec_buf->buffer_);
memset(logrec_buf, 0, sizeof(CLogRecordBuf));
} else {
CLogRecordHeader *logrec_hdr = (CLogRecordHeader *)((char *)block + (int)offset);
if (logrec_hdr->logrec_len_ <= CLOG_BLOCK_SIZE - offset) {
log_rec = new CLogRecord((char *)block + (int)offset);
offset += logrec_hdr->logrec_len_;
} else { //此时为跨block的第一部分
// 开始写入logrec_buf
memcpy(
&logrec_buf->buffer_[logrec_buf->write_offset_], (char *)block + (int)offset, CLOG_BLOCK_SIZE - offset);
logrec_buf->write_offset_ += CLOG_BLOCK_SIZE - offset;
offset = CLOG_BLOCK_SIZE;
}
}
}
}
return RC::SUCCESS;
}
//
void CLogMTRManager::log_record_manage(CLogRecord *log_rec)
{
if (log_rec->get_log_type() == REDO_MTR_COMMIT) {
trx_commited[log_rec->get_trx_id()] = true;
delete log_rec;
} else if (log_rec->get_log_type() == REDO_MTR_BEGIN) {
trx_commited.insert({log_rec->get_trx_id(), false});
delete log_rec;
} else {
log_redo_list.push_back(log_rec);
}
}
////////////////////
std::atomic<int32_t> CLogManager::gloabl_lsn_(0);
CLogManager::CLogManager(const char *path)
{
log_buffer_ = new CLogBuffer();
log_file_ = new CLogFile(path);
log_mtr_mgr_ = new CLogMTRManager();
}
CLogManager::~CLogManager()
{
if (log_buffer_) {
delete log_buffer_;
}
}
RC CLogManager::clog_gen_record(CLogType flag, int32_t trx_id, CLogRecord *&log_rec,
const char *table_name /* = nullptr */, int data_len /* = 0 */, Record *rec /* = nullptr*/)
{
CLogRecord *log_record = new CLogRecord(flag, trx_id, table_name, data_len, rec);
if (log_record) {
log_rec = log_record;
} else {
LOG_ERROR("new CLogRecord failed");
return RC::NOMEM;
}
return RC::SUCCESS;
}
RC CLogManager::clog_append_record(CLogRecord *log_rec)
{
RC rc = RC::SUCCESS;
int start_offset = 0;
rc = log_buffer_->append_log_record(log_rec, start_offset);
if (rc == RC::LOGBUF_FULL || log_rec->get_log_type() == REDO_MTR_COMMIT) {
clog_sync();
if (start_offset != log_rec->get_logrec_len()) { // 当前日志记录还没写完
log_buffer_->append_log_record(log_rec, start_offset);
}
}
delete log_rec; // NOTE: 单元测试需要注释该行
return rc;
}
RC CLogManager::clog_sync()
{
RC rc = RC::SUCCESS;
rc = log_buffer_->flush_buffer(log_file_);
return rc;
}
RC CLogManager::recover()
{
log_file_->recover(log_mtr_mgr_, log_buffer_);
return RC::SUCCESS;
}
CLogMTRManager *CLogManager::get_mtr_manager()
{
return log_mtr_mgr_;
}
int32_t CLogManager::get_next_lsn(int32_t rec_len)
{
int32_t res_lsn = CLogManager::gloabl_lsn_;
CLogManager::gloabl_lsn_ += rec_len; // 当前不考虑溢出
return res_lsn;
}
/* Copyright (c) 2021-2022 Xie Meiyi(xiemeiyi@hust.edu.cn),
Huazhong University of Science and Technology
and OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by huhaosheng.hhs on 2022
//
#ifndef __OBSERVER_STORAGE_REDO_REDOLOG_H_
#define __OBSERVER_STORAGE_REDO_REDOLOG_H_
#include <stddef.h>
#include <stdint.h>
#include <list>
#include <atomic>
#include <unordered_map>
#include "storage/record/record.h"
#include "storage/persist/persist.h"
#include "rc.h"
//固定文件大小 TODO: 循环文件组
#define CLOG_FILE_SIZE 48 * 1024 * 1024
#define CLOG_BUFFER_SIZE 4 * 1024 * 1024
#define TABLE_NAME_MAX_LEN 20 // TODO: 表名不要超过20字节
class CLogManager;
class CLogBuffer;
class CLogFile;
class PersistHandler;
struct CLogRecordHeader;
struct CLogFileHeader;
struct CLogBlockHeader;
struct CLogBlock;
struct CLogMTRManager;
enum CLogType { REDO_ERROR = 0, REDO_MTR_BEGIN, REDO_MTR_COMMIT, REDO_INSERT, REDO_DELETE };
struct CLogRecordHeader {
int32_t lsn_;
int32_t trx_id_;
int type_;
int logrec_len_;
bool operator==(const CLogRecordHeader &other) const
{
return lsn_ == other.lsn_ && trx_id_ == other.trx_id_ && type_ == other.type_ && logrec_len_ == other.logrec_len_;
}
};
struct CLogInsertRecord {
CLogRecordHeader hdr_;
char table_name_[TABLE_NAME_MAX_LEN];
RID rid_;
int data_len_;
char *data_;
bool operator==(const CLogInsertRecord &other) const
{
return hdr_ == other.hdr_ && (strcmp(table_name_, other.table_name_) == 0) && (rid_ == other.rid_) &&
(data_len_ == other.data_len_) && (memcmp(data_, other.data_, data_len_) == 0);
}
};
struct CLogDeleteRecord {
CLogRecordHeader hdr_;
char table_name_[TABLE_NAME_MAX_LEN];
RID rid_;
bool operator==(const CLogDeleteRecord &other) const
{
return hdr_ == other.hdr_ && strcmp(table_name_, other.table_name_) == 0 && rid_ == other.rid_;
}
};
struct CLogMTRRecord {
CLogRecordHeader hdr_;
bool operator==(const CLogMTRRecord &other) const
{
return hdr_ == other.hdr_;
}
};
union CLogRecords {
CLogInsertRecord ins;
CLogDeleteRecord del;
CLogMTRRecord mtr;
char *errors;
};
class CLogRecord {
friend class Db;
public:
// TODO: lsn当前在内部分配
// 对齐在内部处理
CLogRecord(CLogType flag, int32_t trx_id, const char *table_name = nullptr, int data_len = 0, Record *rec = nullptr);
// 从外存恢复log record
CLogRecord(char *data);
~CLogRecord();
CLogType get_log_type()
{
return flag_;
}
int32_t get_trx_id()
{
return log_record_.mtr.hdr_.trx_id_;
}
int32_t get_logrec_len()
{
return log_record_.mtr.hdr_.logrec_len_;
}
int32_t get_lsn()
{
return log_record_.mtr.hdr_.lsn_;
}
RC copy_record(void *dest, int start_off, int copy_len);
/// for unitest
int cmp_eq(CLogRecord *other);
CLogRecords *get_record()
{
return &log_record_;
}
///
protected:
CLogType flag_;
CLogRecords log_record_;
};
// TODO: 当前为简单实现,无循环
class CLogBuffer {
public:
CLogBuffer();
~CLogBuffer();
RC append_log_record(CLogRecord *log_rec, int &start_off);
// 将buffer中的数据下刷到log_file
RC flush_buffer(CLogFile *log_file);
void set_current_block_no(const int32_t block_no)
{
current_block_no_ = block_no;
}
void set_write_block_offset(const int32_t write_block_offset)
{
write_block_offset_ = write_block_offset;
};
void set_write_offset(const int32_t write_offset)
{
write_offset_ = write_offset;
};
RC block_copy(int32_t offset, CLogBlock *log_block);
protected:
int32_t current_block_no_;
int32_t write_block_offset_;
int32_t write_offset_;
char buffer_[CLOG_BUFFER_SIZE];
};
//
#define CLOG_FILE_HDR_SIZE (sizeof(CLogFileHeader))
#define CLOG_BLOCK_SIZE (1 << 9)
#define CLOG_BLOCK_DATA_SIZE (CLOG_BLOCK_SIZE - sizeof(CLogBlockHeader))
#define CLOG_BLOCK_HDR_SIZE (sizeof(CLogBlockHeader))
#define CLOG_REDO_BUFFER_SIZE 8 * CLOG_BLOCK_SIZE
struct CLogRecordBuf {
int32_t write_offset_;
// TODO: 当前假定log record大小不会超过CLOG_REDO_BUFFER_SIZE
char buffer_[CLOG_REDO_BUFFER_SIZE];
};
struct CLogFileHeader {
int32_t current_file_real_offset_;
// TODO: 用于文件组,当前没用
int32_t current_file_lsn_;
};
struct CLogFHDBlock {
CLogFileHeader hdr_;
char pad[CLOG_BLOCK_SIZE - CLOG_FILE_HDR_SIZE];
};
struct CLogBlockHeader {
int32_t log_block_no; // 在文件中的offset no=n*CLOG_BLOCK_SIZE
int16_t log_data_len_;
int16_t first_rec_offset_;
};
struct CLogBlock {
CLogBlockHeader log_block_hdr_;
char data[CLOG_BLOCK_DATA_SIZE];
};
class CLogFile {
public:
CLogFile(const char *path);
~CLogFile();
RC update_log_fhd(int32_t current_file_lsn);
RC append(int data_len, char *data);
RC write(uint64_t offset, int data_len, char *data);
RC recover(CLogMTRManager *mtr_mgr, CLogBuffer *log_buffer);
RC block_recover(CLogBlock *block, int16_t &offset, CLogRecordBuf *logrec_buf, CLogRecord *&log_rec);
protected:
CLogFHDBlock log_fhd_;
PersistHandler *log_file_;
};
// TODO: 当前简单管理mtr
struct CLogMTRManager {
std::list<CLogRecord *> log_redo_list;
std::unordered_map<int32_t, bool> trx_commited; // <trx_id, commited>
void log_record_manage(CLogRecord *log_rec);
};
//
class CLogManager {
public:
CLogManager(const char *path);
~CLogManager();
RC init();
RC clog_gen_record(CLogType flag, int32_t trx_id, CLogRecord *&log_rec, const char *table_name = nullptr,
int data_len = 0, Record *rec = nullptr);
//追加写到log_buffer
RC clog_append_record(CLogRecord *log_rec);
// 通常不需要在外部调用
RC clog_sync();
// TODO: 优化回放过程,对同一位置的修改可以用哈希聚合
RC recover();
CLogMTRManager *get_mtr_manager();
static int32_t get_next_lsn(int32_t rec_len);
static std::atomic<int32_t> gloabl_lsn_;
protected:
CLogBuffer *log_buffer_;
CLogFile *log_file_;
CLogMTRManager *log_mtr_mgr_;
};
#endif // __OBSERVER_STORAGE_REDO_REDOLOG_H_
......@@ -13,6 +13,7 @@ See the Mulan PSL v2 for more details. */
//
#include <stddef.h>
#include <math.h>
#include "condition_filter.h"
#include "storage/record/record_manager.h"
#include "common/log/log.h"
......@@ -156,7 +157,8 @@ bool DefaultConditionFilter::filter(const Record &rec) const
case FLOATS: {
float left = *(float *)left_value;
float right = *(float *)right_value;
cmp_result = (int)(left - right);
float result = left - right;
cmp_result = result >= 0 ? ceil(result) : floor(result);
} break;
default: {
}
......
......@@ -24,6 +24,8 @@ See the Mulan PSL v2 for more details. */
#include "storage/common/table_meta.h"
#include "storage/common/table.h"
#include "storage/common/meta_util.h"
#include "storage/trx/trx.h"
#include "storage/clog/clog.h"
Db::~Db()
{
......@@ -46,6 +48,12 @@ RC Db::init(const char *name, const char *dbpath)
return RC::GENERIC_ERROR;
}
clog_manager_ = new CLogManager(dbpath);
if (clog_manager_ == nullptr) {
LOG_ERROR("Failed to init CLogManager.");
return RC::GENERIC_ERROR;
}
name_ = name;
path_ = dbpath;
......@@ -64,7 +72,7 @@ RC Db::create_table(const char *table_name, int attribute_count, const AttrInfo
// 文件路径可以移到Table模块
std::string table_file_path = table_meta_file(path_.c_str(), table_name);
Table *table = new Table();
rc = table->create(table_file_path.c_str(), table_name, path_.c_str(), attribute_count, attributes);
rc = table->create(table_file_path.c_str(), table_name, path_.c_str(), attribute_count, attributes, get_clog_manager());
if (rc != RC::SUCCESS) {
LOG_ERROR("Failed to create table %s.", table_name);
delete table;
......@@ -97,7 +105,7 @@ RC Db::open_all_tables()
RC rc = RC::SUCCESS;
for (const std::string &filename : table_meta_files) {
Table *table = new Table();
rc = table->open(filename.c_str(), path_.c_str());
rc = table->open(filename.c_str(), path_.c_str(), clog_manager_);
if (rc != RC::SUCCESS) {
delete table;
LOG_ERROR("Failed to open table. filename=%s", filename.c_str());
......@@ -146,4 +154,76 @@ RC Db::sync()
}
LOG_INFO("Successfully sync db. db=%s", name_.c_str());
return rc;
}
RC Db::recover()
{
RC rc = RC::SUCCESS;
if ((rc = clog_manager_->recover()) == RC::SUCCESS) {
uint32_t max_trx_id = 0;
CLogMTRManager *mtr_manager = clog_manager_->get_mtr_manager();
for (auto it = mtr_manager->log_redo_list.begin(); it != mtr_manager->log_redo_list.end(); it++) {
CLogRecord *clog_record = *it;
if (clog_record->get_log_type() != CLogType::REDO_INSERT && clog_record->get_log_type() != CLogType::REDO_DELETE) {
delete clog_record;
continue;
}
auto find_iter = mtr_manager->trx_commited.find(clog_record->get_trx_id());
if (find_iter == mtr_manager->trx_commited.end()) {
LOG_ERROR("CLog record without commit message! "); // unexpected error
delete clog_record;
return RC::GENERIC_ERROR;
} else if (find_iter->second == false ) {
delete clog_record;
continue;
}
Table *table = find_table(clog_record->log_record_.ins.table_name_);
if (table == nullptr) {
delete clog_record;
continue;
}
switch(clog_record->get_log_type()) {
case CLogType::REDO_INSERT: {
char *record_data = new char[clog_record->log_record_.ins.data_len_];
memcpy(record_data, clog_record->log_record_.ins.data_, clog_record->log_record_.ins.data_len_);
Record record;
record.set_data(record_data);
record.set_rid(clog_record->log_record_.ins.rid_);
rc = table->recover_insert_record(&record);
delete[] record_data;
} break;
case CLogType::REDO_DELETE: {
Record record;
record.set_rid(clog_record->log_record_.del.rid_);
rc = table->recover_delete_record(&record);
} break;
default: {
rc = RC::SUCCESS;
}
}
if (rc != RC::SUCCESS) {
LOG_ERROR("Failed to recover. rc=%d:%s", rc, strrc(rc));
break;
}
if (max_trx_id < clog_record->get_trx_id()) {
max_trx_id = clog_record->get_trx_id();
}
delete clog_record;
}
if (rc == RC::SUCCESS && max_trx_id > 0) {
Trx::set_trx_id(max_trx_id);
}
}
return rc;
}
CLogManager *Db::get_clog_manager() {
return clog_manager_;
}
\ No newline at end of file
......@@ -23,6 +23,7 @@ See the Mulan PSL v2 for more details. */
#include "sql/parser/parse_defs.h"
class Table;
class CLogManager;
class Db {
public:
......@@ -41,6 +42,10 @@ public:
RC sync();
RC recover();
CLogManager *get_clog_manager();
private:
RC open_all_tables();
......@@ -48,6 +53,7 @@ private:
std::string name_;
std::string path_;
std::unordered_map<std::string, Table *> opened_tables_;
CLogManager *clog_manager_ = nullptr;
};
#endif // __OBSERVER_STORAGE_COMMON_DB_H__
\ No newline at end of file
......@@ -28,6 +28,7 @@ See the Mulan PSL v2 for more details. */
#include "storage/index/index.h"
#include "storage/index/bplus_tree_index.h"
#include "storage/trx/trx.h"
#include "storage/clog/clog.h"
Table::~Table()
{
......@@ -51,7 +52,7 @@ Table::~Table()
}
RC Table::create(
const char *path, const char *name, const char *base_dir, int attribute_count, const AttrInfo attributes[])
const char *path, const char *name, const char *base_dir, int attribute_count, const AttrInfo attributes[], CLogManager *clog_manager)
{
if (common::is_blank(name)) {
......@@ -114,11 +115,12 @@ RC Table::create(
}
base_dir_ = base_dir;
clog_manager_ = clog_manager;
LOG_INFO("Successfully create table %s:%s", base_dir, name);
return rc;
}
RC Table::open(const char *meta_file, const char *base_dir)
RC Table::open(const char *meta_file, const char *base_dir, CLogManager *clog_manager)
{
// 加载元数据文件
std::fstream fs;
......@@ -176,6 +178,10 @@ RC Table::open(const char *meta_file, const char *base_dir)
}
indexes_.push_back(index);
}
if (clog_manager_ == nullptr) {
clog_manager_ = clog_manager;
}
return rc;
}
......@@ -263,8 +269,36 @@ RC Table::insert_record(Trx *trx, Record *record)
}
return rc;
}
if (trx != nullptr) {
// append clog record
CLogRecord *clog_record = nullptr;
rc = clog_manager_->clog_gen_record(CLogType::REDO_INSERT, trx->get_current_id(), clog_record, name(), table_meta_.record_size(), record);
if (rc != RC::SUCCESS) {
LOG_ERROR("Failed to create a clog record. rc=%d:%s", rc, strrc(rc));
return rc;
}
rc = clog_manager_->clog_append_record(clog_record);
if (rc != RC::SUCCESS) {
return rc;
}
}
return rc;
}
RC Table::recover_insert_record(Record *record)
{
RC rc = RC::SUCCESS;
rc = record_handler_->recover_insert_record(record->data(), table_meta_.record_size(), &record->rid());
if (rc != RC::SUCCESS) {
LOG_ERROR("Insert record failed. table name=%s, rc=%d:%s", table_meta_.name(), rc, strrc(rc));
return rc;
}
return rc;
}
RC Table::insert_record(Trx *trx, int value_num, const Value *values)
{
if (value_num <= 0 || nullptr == values) {
......@@ -281,7 +315,6 @@ RC Table::insert_record(Trx *trx, int value_num, const Value *values)
Record record;
record.set_data(record_data);
// record.valid = true;
rc = insert_record(trx, &record);
delete[] record_data;
return rc;
......@@ -656,17 +689,44 @@ RC Table::delete_record(Trx *trx, ConditionFilter *filter, int *deleted_count)
RC Table::delete_record(Trx *trx, Record *record)
{
RC rc = RC::SUCCESS;
rc = delete_entry_of_indexes(record->data(), record->rid(), false); // 重复代码 refer to commit_delete
if (rc != RC::SUCCESS) {
LOG_ERROR("Failed to delete indexes of record (rid=%d.%d). rc=%d:%s",
record->rid().page_num, record->rid().slot_num, rc, strrc(rc));
return rc;
}
rc = record_handler_->delete_record(&record->rid());
if (rc != RC::SUCCESS) {
LOG_ERROR("Failed to delete record (rid=%d.%d). rc=%d:%s",
record->rid().page_num, record->rid().slot_num, rc, strrc(rc));
return rc;
}
if (trx != nullptr) {
rc = trx->delete_record(this, record);
} else {
rc = delete_entry_of_indexes(record->data(), record->rid(), false); // 重复代码 refer to commit_delete
CLogRecord *clog_record = nullptr;
rc = clog_manager_->clog_gen_record(CLogType::REDO_DELETE, trx->get_current_id(), clog_record, name(), 0, record);
if (rc != RC::SUCCESS) {
LOG_ERROR("Failed to delete indexes of record (rid=%d.%d). rc=%d:%s",
record->rid().page_num, record->rid().slot_num, rc, strrc(rc));
} else {
rc = record_handler_->delete_record(&record->rid());
LOG_ERROR("Failed to create a clog record. rc=%d:%s", rc, strrc(rc));
return rc;
}
rc = clog_manager_->clog_append_record(clog_record);
if (rc != RC::SUCCESS) {
return rc;
}
}
return rc;
}
RC Table::recover_delete_record(Record *record)
{
RC rc = RC::SUCCESS;
rc = record_handler_->delete_record(&record->rid());
return rc;
}
......@@ -853,12 +913,7 @@ IndexScanner *Table::find_index_for_scan(const ConditionFilter *filter)
RC Table::sync()
{
RC rc = data_buffer_pool_->flush_all_pages();
if (rc != RC::SUCCESS) {
LOG_ERROR("Failed to flush table's data pages. table=%s, rc=%d:%s", name(), rc, strrc(rc));
return rc;
}
RC rc = RC::SUCCESS;
for (Index *index : indexes_) {
rc = index->sync();
if (rc != RC::SUCCESS) {
......
......@@ -28,6 +28,7 @@ class Index;
class IndexScanner;
class RecordDeleter;
class Trx;
class CLogManager;
// TODO remove the routines with condition
class Table {
......@@ -42,21 +43,25 @@ public:
* @param base_dir 表数据存放的路径
* @param attribute_count 字段个数
* @param attributes 字段
* @param clog_manager clog管理器,用于维护redo log
*/
RC create(const char *path, const char *name, const char *base_dir, int attribute_count, const AttrInfo attributes[]);
RC create(const char *path, const char *name, const char *base_dir, int attribute_count, const AttrInfo attributes[],
CLogManager *clog_manager);
/**
* 打开一个表
* @param meta_file 保存表元数据的文件完整路径
* @param base_dir 表所在的文件夹,表记录数据文件、索引数据文件存放位置
* @param clog_manager clog管理器
*/
RC open(const char *meta_file, const char *base_dir);
RC open(const char *meta_file, const char *base_dir, CLogManager *clog_manager);
RC insert_record(Trx *trx, int value_num, const Value *values);
RC update_record(Trx *trx, const char *attribute_name, const Value *value, int condition_num,
const Condition conditions[], int *updated_count);
RC delete_record(Trx *trx, ConditionFilter *filter, int *deleted_count);
RC delete_record(Trx *trx, Record *record);
RC recover_delete_record(Record *record);
RC scan_record(Trx *trx, ConditionFilter *filter, int limit, void *context,
void (*record_reader)(const char *data, void *context));
......@@ -90,9 +95,11 @@ private:
RC (*record_reader)(Record *record, void *context));
IndexScanner *find_index_for_scan(const ConditionFilter *filter);
IndexScanner *find_index_for_scan(const DefaultConditionFilter &filter);
RC insert_record(Trx *trx, Record *record);
public:
RC recover_insert_record(Record *record);
private:
friend class RecordUpdater;
friend class RecordDeleter;
......@@ -110,8 +117,9 @@ public:
private:
std::string base_dir_;
CLogManager *clog_manager_;
TableMeta table_meta_;
DiskBufferPool *data_buffer_pool_ = nullptr; /// 数据文件关联的buffer pool
DiskBufferPool *data_buffer_pool_ = nullptr; /// 数据文件关联的buffer pool
RecordFileHandler *record_handler_ = nullptr; /// 记录操作
std::vector<Index *> indexes_;
};
......
......@@ -123,6 +123,10 @@ RC DefaultHandler::open_db(const char *dbname)
if ((ret = db->init(dbname, dbpath.c_str())) != RC::SUCCESS) {
LOG_ERROR("Failed to open db: %s. error=%d", dbname, ret);
}
if ((ret = db->recover()) != RC::SUCCESS) {
LOG_ERROR("Failed to recover db: %s. error=%d", dbname, ret);
}
opened_dbs_[dbname] = db;
return RC::SUCCESS;
}
......
......@@ -226,7 +226,8 @@ RC DiskBufferPool::close_file()
}
hdr_frame_->pin_count_--;
if ((rc = purge_all_pages()) != RC::SUCCESS) {
// TODO: 理论上是在回放时回滚未提交事务,但目前没有undo log,因此不下刷数据page,只通过redo log回放
if ((rc = purge_page(0)) != RC::SUCCESS) {
hdr_frame_->pin_count_++;
LOG_ERROR("Failed to close %s, due to failed to purge all pages.", file_name_.c_str());
return rc;
......@@ -254,6 +255,7 @@ RC DiskBufferPool::get_this_page(PageNum page_num, Frame **frame)
used_match_frame->pin_count_++;
used_match_frame->acc_time_ = current_time();
*frame = used_match_frame;
return RC::SUCCESS;
}
......@@ -342,7 +344,6 @@ RC DiskBufferPool::allocate_page(Frame **frame)
RC DiskBufferPool::unpin_page(Frame *frame)
{
assert(frame->pin_count_ >= 1);
if (--frame->pin_count_ == 0) {
PageNum page_num = frame->page_num();
auto pages_it = disposed_pages.find(page_num);
......@@ -488,6 +489,21 @@ RC DiskBufferPool::flush_all_pages()
return RC::SUCCESS;
}
RC DiskBufferPool::recover_page(PageNum page_num)
{
int byte = 0, bit = 0;
byte = page_num / 8;
bit = page_num % 8;
if (!(file_header_->bitmap[byte] & (1 << bit))) {
file_header_->bitmap[byte] |= (1 << bit);
file_header_->allocated_pages++;
file_header_->page_count++;
hdr_frame_->mark_dirty();
}
return RC::SUCCESS;
}
RC DiskBufferPool::allocate_frame(PageNum page_num, Frame **buffer)
{
while (true) {
......
......@@ -284,6 +284,11 @@ public:
*/
RC flush_all_pages();
/**
* 回放日志时处理page0中已被认定为不存在的page
*/
RC recover_page(PageNum page_num);
protected:
protected:
RC allocate_frame(PageNum page_num, Frame **buf);
......
......@@ -1736,12 +1736,12 @@ RC BplusTreeScanner::open(const char *left_user_key, int left_len, bool left_inc
bool should_inclusive_after_fix = false;
rc = fix_user_key(left_user_key, left_len, true/*greater*/, &fixed_left_key, &should_inclusive_after_fix);
if (rc != RC::SUCCESS) {
LOG_WARN("failed to fix left user key. rc=%s", strrc(rc));
return rc;
LOG_WARN("failed to fix left user key. rc=%s", strrc(rc));
return rc;
}
if (should_inclusive_after_fix) {
left_inclusive = true;
left_inclusive = true;
}
}
......@@ -1769,14 +1769,14 @@ RC BplusTreeScanner::open(const char *left_user_key, int left_len, bool left_inc
if (left_index >= left_node.size()) { // 超出了当前页,就需要向后移动一个位置
const PageNum next_page_num = left_node.next_page();
if (next_page_num == BP_INVALID_PAGE_NUM) { // 这里已经是最后一页,说明当前扫描,没有数据
return RC::SUCCESS;
return RC::SUCCESS;
}
tree_handler_.disk_buffer_pool_->unpin_page(left_frame_);
rc = tree_handler_.disk_buffer_pool_->get_this_page(next_page_num, &left_frame_);
if (rc != RC::SUCCESS) {
LOG_WARN("failed to fetch next page. page num=%d, rc=%d:%s", next_page_num, rc, strrc(rc));
return rc;
LOG_WARN("failed to fetch next page. page num=%d, rc=%d:%s", next_page_num, rc, strrc(rc));
return rc;
}
left_index = 0;
......@@ -1802,12 +1802,12 @@ RC BplusTreeScanner::open(const char *left_user_key, int left_len, bool left_inc
if (tree_handler_.file_header_.attr_type == CHARS) {
rc = fix_user_key(right_user_key, right_len, false/*want_greater*/, &fixed_right_key, &should_include_after_fix);
if (rc != RC::SUCCESS) {
LOG_WARN("failed to fix right user key. rc=%s", strrc(rc));
return rc;
LOG_WARN("failed to fix right user key. rc=%s", strrc(rc));
return rc;
}
if (should_include_after_fix) {
right_inclusive = true;
right_inclusive = true;
}
}
if (right_inclusive) {
......@@ -1840,15 +1840,15 @@ RC BplusTreeScanner::open(const char *left_user_key, int left_len, bool left_inc
// 其它的叶子节点都不可能返回0,所以这段逻辑其实是可以简化的
const PageNum prev_page_num = right_node.prev_page();
if (prev_page_num == BP_INVALID_PAGE_NUM) {
end_index_ = -1;
return RC::SUCCESS;
end_index_ = -1;
return RC::SUCCESS;
}
tree_handler_.disk_buffer_pool_->unpin_page(right_frame_);
rc = tree_handler_.disk_buffer_pool_->get_this_page(prev_page_num, &right_frame_);
if (rc != RC::SUCCESS) {
LOG_WARN("failed to fetch prev page num. page num=%d, rc=%d:%s", prev_page_num, rc, strrc(rc));
return rc;
LOG_WARN("failed to fetch prev page num. page num=%d, rc=%d:%s", prev_page_num, rc, strrc(rc));
return rc;
}
LeafIndexNodeHandler tmp_node(tree_handler_.file_header_, right_frame_);
......@@ -1904,7 +1904,7 @@ RC BplusTreeScanner::next_entry(RID *rid)
} else {
rc = tree_handler_.disk_buffer_pool_->get_this_page(page_num, &left_frame_);
if (rc != RC::SUCCESS) {
left_frame_ = nullptr;
left_frame_ = nullptr;
LOG_WARN("failed to fetch next page. page num=%d, rc=%d:%s", page_num, rc, strrc(rc));
return rc;
}
......
/* Copyright (c) 2021 Xie Meiyi(xiemeiyi@hust.edu.cn) and OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by qiling on 2021/4/13.
//
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include "persist.h"
#include "common/log/log.h"
PersistHandler::PersistHandler()
{}
PersistHandler::~PersistHandler()
{
close_file();
}
RC PersistHandler::create_file(const char *file_name)
{
RC rc = RC::SUCCESS;
if (file_name == nullptr) {
LOG_ERROR("Failed to create file, because file name is null.");
rc = RC::FILE_NAME;
} else if (!file_name_.empty()) {
LOG_ERROR("Failed to create %s, because a file is already bound.", file_name);
rc = RC::FILE_BOUND;
} else if (access(file_name, F_OK) != -1){
LOG_WARN("Failed to create %s, because file already exist.", file_name);
rc = RC::FILE_EXIST;
} else {
int fd;
fd = open(file_name, O_RDWR | O_CREAT | O_EXCL, S_IREAD | S_IWRITE);
if (fd < 0) {
LOG_ERROR("Failed to create %s, due to %s.", file_name, strerror(errno));
rc = RC::FILE_CREATE;
} else {
file_name_ = file_name;
close(fd);
LOG_INFO("Successfully create %s.", file_name);
}
}
return rc;
}
RC PersistHandler::open_file(const char *file_name)
{
int fd;
RC rc = RC::SUCCESS;
if (file_name == nullptr) {
if (file_name_.empty()) {
LOG_ERROR("Failed to open file, because no file name.");
rc = RC::FILE_NAME;
} else {
if ((fd = open(file_name_.c_str(), O_RDWR)) < 0) {
LOG_ERROR("Failed to open file %s, because %s.", file_name_.c_str(), strerror(errno));
rc = RC::FILE_OPEN;
} else {
file_desc_ = fd;
LOG_INFO("Successfully open file %s.", file_name_.c_str());
}
}
} else {
if (!file_name_.empty()) {
LOG_ERROR("Failed to open file, because a file is already bound.");
rc = RC::FILE_BOUND;
} else {
if ((fd = open(file_name, O_RDWR)) < 0) {
LOG_ERROR("Failed to open file %s, because %s.", file_name, strerror(errno));
return RC::FILE_OPEN;
} else {
file_name_ = file_name;
file_desc_ = fd;
LOG_INFO("Successfully open file %s.", file_name);
}
}
}
return rc;
}
RC PersistHandler::close_file()
{
RC rc = RC::SUCCESS;
if (file_desc_ >= 0) {
if (close(file_desc_) < 0) {
LOG_ERROR("Failed to close file %d:%s, error:%s", file_desc_, file_name_.c_str(), strerror(errno));
rc = RC::FILE_CLOSE;
} else {
file_desc_ = -1;
LOG_INFO("Successfully close file %d:%s.", file_desc_, file_name_.c_str());
}
}
return rc;
}
RC PersistHandler::remove_file(const char *file_name)
{
RC rc = RC::SUCCESS;
if (file_name != nullptr) {
if (remove(file_name) == 0) {
LOG_INFO("Successfully remove file %s.", file_name);
} else {
LOG_ERROR("Failed to remove file %s, error:%s", file_name, strerror(errno));
rc = RC::FILE_REMOVE;
}
} else if (!file_name_.empty()) {
if (file_desc_ < 0 || (rc = close_file()) == RC::SUCCESS) {
if (remove(file_name_.c_str()) == 0) {
LOG_INFO("Successfully remove file %s.", file_name_.c_str());
} else {
LOG_ERROR("Failed to remove file %s, error:%s", file_name_.c_str(), strerror(errno));
rc = RC::FILE_REMOVE;
}
}
}
return rc;
}
RC PersistHandler::write_file(int size, const char *data, int64_t *out_size)
{
RC rc = RC::SUCCESS;
if (file_name_.empty()) {
LOG_ERROR("Failed to write, because file is not exist.");
rc = RC::FILE_NOT_EXIST;
} else if (file_desc_ < 0) {
LOG_ERROR("Failed to write, because file is not opened.");
rc = RC::FILE_NOT_OPENED;
} else {
int64_t write_size = 0;
if ((write_size = write(file_desc_, data, size)) != size) {
LOG_ERROR("Failed to write %d:%s due to %s. Write size: %lld", file_desc_, file_name_.c_str(), strerror(errno), write_size);
rc = RC::FILE_WRITE;
}
if (out_size != nullptr) {
*out_size = write_size;
}
}
return rc;
}
RC PersistHandler::write_at(uint64_t offset, int size, const char *data, int64_t *out_size)
{
RC rc = RC::SUCCESS;
if (file_name_.empty()) {
LOG_ERROR("Failed to write, because file is not exist.");
rc = RC::FILE_NOT_EXIST;
} else if (file_desc_ < 0) {
LOG_ERROR("Failed to write, because file is not opened.");
rc = RC::FILE_NOT_OPENED;
} else {
if (lseek(file_desc_, offset, SEEK_SET) == off_t(-1)) {
LOG_ERROR("Failed to write %lld of %d:%s due to failed to seek %s.", offset, file_desc_, file_name_.c_str(), strerror(errno));
rc = RC::FILE_SEEK;
} else {
int64_t write_size = 0;
if ((write_size = write(file_desc_, data, size)) != size) {
LOG_ERROR("Failed to write %llu of %d:%s due to %s. Write size: %lld", offset, file_desc_, file_name_.c_str(), strerror(errno), write_size);
rc = RC::FILE_WRITE;
}
if (out_size != nullptr) {
*out_size = write_size;
}
}
}
return rc;
}
RC PersistHandler::append(int size, const char *data, int64_t *out_size)
{
RC rc = RC::SUCCESS;
if (file_name_.empty()) {
LOG_ERROR("Failed to write, because file is not exist.");
rc = RC::FILE_NOT_EXIST;
} else if (file_desc_ < 0) {
LOG_ERROR("Failed to append, because file is not opened.");
rc = RC::FILE_NOT_OPENED;
} else {
if (lseek(file_desc_, 0, SEEK_END) == off_t(-1)) {
LOG_ERROR("Failed to append file %d:%s due to failed to seek: %s.", file_desc_, file_name_.c_str(), strerror(errno));
rc = RC::FILE_SEEK;
} else {
int64_t write_size = 0;
if ((write_size = write(file_desc_, data, size)) != size) {
LOG_ERROR("Failed to append file %d:%s due to %s. Write size: %lld", file_desc_, file_name_.c_str(), strerror(errno), write_size);
rc = RC::FILE_WRITE;
}
if (out_size != nullptr) {
*out_size = write_size;
}
}
}
return rc;
}
RC PersistHandler::read_file(int size, char *data, int64_t *out_size)
{
RC rc = RC::SUCCESS;
if (file_name_.empty()) {
LOG_ERROR("Failed to read, because file is not exist.");
rc = RC::FILE_NOT_EXIST;
} else if (file_desc_ < 0) {
LOG_ERROR("Failed to read, because file is not opened.");
rc = RC::FILE_NOT_OPENED;
} else {
int64_t read_size = 0;
if ((read_size = read(file_desc_, data, size)) != size) {
LOG_ERROR("Failed to read file %d:%s due to %s.", file_desc_, file_name_.c_str(), strerror(errno));
rc = RC::FILE_READ;
}
if (out_size != nullptr) {
*out_size = read_size;
}
}
return rc;
}
RC PersistHandler::read_at(uint64_t offset, int size, char *data, int64_t *out_size)
{
RC rc = RC::SUCCESS;
if (file_name_.empty()) {
LOG_ERROR("Failed to read, because file is not exist.");
rc = RC::FILE_NOT_EXIST;
} else if (file_desc_ < 0) {
LOG_ERROR("Failed to read, because file is not opened.");
rc = RC::FILE_NOT_OPENED;
} else {
if (lseek(file_desc_, offset, SEEK_SET) == off_t(-1)) {
LOG_ERROR("Failed to read %llu of %d:%s due to failed to seek %s.", offset, file_desc_, file_name_.c_str(), strerror(errno));
return RC::FILE_SEEK;
} else {
int64_t read_size = 0;
if ((read_size = read(file_desc_, data, size)) != size) {
LOG_WARN("Failed to read %lld of %d:%s due to %s.", offset, file_desc_, file_name_.c_str(), strerror(errno));
rc = RC::FILE_READ;
}
if (out_size != nullptr) {
*out_size = read_size;
}
}
}
return rc;
}
RC PersistHandler::seek(uint64_t offset)
{
RC rc = RC::SUCCESS;
if (file_name_.empty()) {
LOG_ERROR("Failed to seek, because file is not exist.");
rc = RC::FILE_NOT_EXIST;
} else if (file_desc_ < 0) {
LOG_ERROR("Failed to seek, because file is not opened.");
rc = RC::FILE_NOT_OPENED;
} else if (lseek(file_desc_, offset, SEEK_SET) == off_t(-1)) {
LOG_ERROR("Failed to seek %llu of %d:%s due to %s.", offset, file_desc_, file_name_.c_str(), strerror(errno));
rc = RC::FILE_SEEK;
}
return rc;
}
/* Copyright (c) 2021 Xie Meiyi(xiemeiyi@hust.edu.cn) and OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by qiling on 2021/4/13.
//
#ifndef __OBSERVER_STORAGE_PERSIST_HANDLER_H_
#define __OBSERVER_STORAGE_PERSIST_HANDLER_H_
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <string>
#include "rc.h"
class PersistHandler
{
public:
PersistHandler();
~PersistHandler();
/** 创建一个名称为指定文件名的文件,并将该文件绑定到当前对象 */
RC create_file(const char *file_name);
/** 根据文件名打开一个文件并绑定到当前对象,若文件名为空则打开当前文件 */
RC open_file(const char *file_name = nullptr);
/** 关闭当前文件 */
RC close_file();
/** 删除指定文件,或删除当前文件 */
RC remove_file(const char *file_name = nullptr);
/** 在当前文件描述符的位置写入一段数据,并返回实际写入的数据大小out_size */
RC write_file(int size, const char *data, int64_t *out_size = nullptr);
/** 在指定位置写入一段数据,并返回实际写入的数据大小out_size */
RC write_at(uint64_t offset, int size, const char *data, int64_t *out_size = nullptr);
/** 在文件末尾写入一段数据,并返回实际写入的数据大小out_size */
RC append(int size, const char *data, int64_t *out_size = nullptr);
/** 在当前文件描述符的位置读取一段数据,并返回实际读取的数据大小out_size */
RC read_file(int size, char *data, int64_t *out_size = nullptr);
/** 在指定位置读取一段数据,并返回实际读取的数据大小out_size */
RC read_at(uint64_t offset, int size, char *data, int64_t *out_size = nullptr);
/** 将文件描述符移动到指定位置 */
RC seek(uint64_t offset);
private:
std::string file_name_;
int file_desc_ = -1;
};
#endif //__OBSERVER_STORAGE_PERSIST_HANDLER_H_
......@@ -106,6 +106,32 @@ RC RecordPageHandler::init(DiskBufferPool &buffer_pool, PageNum page_num)
return ret;
}
RC RecordPageHandler::recover_init(DiskBufferPool &buffer_pool, PageNum page_num)
{
if (disk_buffer_pool_ != nullptr) {
LOG_WARN("Disk buffer pool has been opened for page_num %d.", page_num);
return RC::RECORD_OPENNED;
}
RC ret = RC::SUCCESS;
if ((ret = buffer_pool.get_this_page(page_num, &frame_)) != RC::SUCCESS) {
LOG_ERROR("Failed to get page handle from disk buffer pool. ret=%d:%s", ret, strrc(ret));
return ret;
}
char *data = frame_->data();
disk_buffer_pool_ = &buffer_pool;
page_header_ = (PageHeader *)(data);
bitmap_ = data + page_fix_size();
buffer_pool.recover_page(page_num);
LOG_TRACE("Successfully init page_num %d.", page_num);
return ret;
}
RC RecordPageHandler::init_empty_page(DiskBufferPool &buffer_pool, PageNum page_num, int record_size)
{
RC ret = init(buffer_pool, page_num);
......@@ -124,7 +150,11 @@ RC RecordPageHandler::init_empty_page(DiskBufferPool &buffer_pool, PageNum page_
bitmap_ = frame_->data() + page_fix_size();
memset(bitmap_, 0, page_bitmap_size(page_header_->record_capacity));
frame_->mark_dirty();
if ((ret = buffer_pool.flush_page(*frame_)) != RC::SUCCESS) {
LOG_ERROR("Failed to flush page header %d:%d.", page_num);
return ret;
}
return RC::SUCCESS;
}
......@@ -167,6 +197,31 @@ RC RecordPageHandler::insert_record(const char *data, RID *rid)
return RC::SUCCESS;
}
RC RecordPageHandler::recover_insert_record(const char *data, RID *rid)
{
if (page_header_->record_num == page_header_->record_capacity) {
LOG_WARN("Page is full, page_num %d:%d.", frame_->page_num());
return RC::RECORD_NOMEM;
}
if (rid->slot_num >= page_header_->record_capacity) {
LOG_WARN("slot_num illegal, slot_num(%d) > record_capacity(%d).", rid->slot_num, page_header_->record_capacity);
return RC::RECORD_NOMEM;
}
// 更新位图
Bitmap bitmap(bitmap_, page_header_->record_capacity);
bitmap.set_bit(rid->slot_num);
page_header_->record_num++;
// 恢复数据
char *record_data = get_record_data(rid->slot_num);
memcpy(record_data, data, page_header_->record_real_size);
frame_->mark_dirty();
return RC::SUCCESS;
}
RC RecordPageHandler::update_record(const Record *rec)
{
if (rec->rid().slot_num >= page_header_->record_capacity) {
......@@ -351,6 +406,20 @@ RC RecordFileHandler::insert_record(const char *data, int record_size, RID *rid)
return record_page_handler.insert_record(data, rid);
}
RC RecordFileHandler::recover_insert_record(const char *data, int record_size, RID *rid)
{
RC ret = RC::SUCCESS;
RecordPageHandler record_page_handler;
ret = record_page_handler.recover_init(*disk_buffer_pool_, rid->page_num);
if (ret != RC::SUCCESS) {
LOG_WARN("failed to init record page handler. page num=%d, rc=%d:%s", rid->page_num, ret, strrc(ret));
return ret;
}
return record_page_handler.recover_insert_record(data, rid);
}
RC RecordFileHandler::update_record(const Record *rec)
{
RC ret;
......
......@@ -56,10 +56,12 @@ public:
RecordPageHandler() = default;
~RecordPageHandler();
RC init(DiskBufferPool &buffer_pool, PageNum page_num);
RC recover_init(DiskBufferPool &buffer_pool, PageNum page_num);
RC init_empty_page(DiskBufferPool &buffer_pool, PageNum page_num, int record_size);
RC cleanup();
RC insert_record(const char *data, RID *rid);
RC recover_insert_record(const char *data, RID *rid);
RC update_record(const Record *rec);
template <class RecordUpdater>
......@@ -120,6 +122,7 @@ public:
* 插入一个新的记录到指定文件中,pData为指向新纪录内容的指针,返回该记录的标识符rid
*/
RC insert_record(const char *data, int record_size, RID *rid);
RC recover_insert_record(const char *data, int record_size, RID *rid);
/**
* 获取指定文件中标识符为rid的记录内容到rec指向的记录结构中
......
......@@ -22,6 +22,7 @@ See the Mulan PSL v2 for more details. */
static const uint32_t DELETED_FLAG_BIT_MASK = 0x80000000;
static const uint32_t TRX_ID_BIT_MASK = 0x7FFFFFFF;
std::atomic<int32_t> Trx::trx_id(0);
int32_t Trx::default_trx_id()
{
......@@ -30,10 +31,25 @@ int32_t Trx::default_trx_id()
int32_t Trx::next_trx_id()
{
static std::atomic<int32_t> trx_id;
return ++trx_id;
}
void Trx::set_trx_id(int32_t id)
{
trx_id = id;
}
void Trx::next_current_id()
{
Trx::next_trx_id();
trx_id_ = trx_id;
}
int32_t Trx::get_current_id()
{
return trx_id_;
}
const char *Trx::trx_field_name()
{
return "__trx";
......@@ -50,7 +66,9 @@ int Trx::trx_field_len()
}
Trx::Trx()
{}
{
start_if_not_started();
}
Trx::~Trx()
{}
......@@ -61,13 +79,15 @@ RC Trx::insert_record(Table *table, Record *record)
// 先校验是否以前是否存在过(应该不会存在)
Operation *old_oper = find_operation(table, record->rid());
if (old_oper != nullptr) {
return RC::GENERIC_ERROR; // error code
if (old_oper->type() == Operation::Type::DELETE) {
delete_operation(table, record->rid());
} else {
return RC::GENERIC_ERROR;
}
}
start_if_not_started();
// 设置record中trx_field为当前的事务号
// set_record_trx_id(table, record, trx_id_, false);
// start_if_not_started();
// 记录到operations中
insert_operation(table, Operation::Type::INSERT, record->rid());
return rc;
......
......@@ -79,8 +79,12 @@ public:
*/
class Trx {
public:
static std::atomic<int32_t> trx_id;
static int32_t default_trx_id();
static int32_t next_trx_id();
static void set_trx_id(int32_t id);
static const char *trx_field_name();
static AttrType trx_field_type();
static int trx_field_len();
......@@ -103,6 +107,10 @@ public:
void init_trx_info(Table *table, Record &record);
void next_current_id();
int32_t get_current_id();
private:
void set_record_trx_id(Table *table, Record &record, int32_t trx_id, bool deleted) const;
static void get_record_trx_id(Table *table, const Record &record, int32_t &trx_id, bool &deleted);
......
/* Copyright (c) 2021-2022 Xie Meiyi(xiemeiyi@hust.edu.cn),
Huazhong University of Science and Technology
and OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by huhaosheng.hhs on 2022
//
#include <string.h>
#include "storage/clog/clog.h"
#include "gtest/gtest.h"
Record *gen_ins_record(int32_t page_num, int32_t slot_num, int data_len)
{
Record *rec = new Record();
char *data = new char[data_len];
rec->set_rid(page_num, slot_num);
memset(data, data_len, data_len);
rec->set_data(data);
return rec;
}
Record *gen_del_record(int32_t page_num, int32_t slot_num)
{
Record *rec = new Record();
rec->set_rid(page_num, slot_num);
return rec;
}
TEST(test_clog, test_clog)
{
CLogManager *log_mgr = new CLogManager("/home/huhaosheng.hhs/Alibaba/miniob");
CLogRecord *log_rec[6];
CLogRecord *log_mtr_rec = nullptr;
Record *rec = nullptr;
//
log_mgr->clog_gen_record(REDO_MTR_BEGIN, 1, log_mtr_rec);
log_mgr->clog_append_record(log_mtr_rec); // NOTE: 需要保留log_rec
delete log_mtr_rec;
rec = gen_ins_record(1, 1, 100);
log_mgr->clog_gen_record(REDO_INSERT, 1, log_rec[0], "table1", 100, rec);
log_mgr->clog_append_record(log_rec[0]);
delete[] rec->data();
delete rec;
rec = gen_ins_record(1, 1, 120);
log_mgr->clog_gen_record(REDO_INSERT, 1, log_rec[1], "table2", 120, rec);
log_mgr->clog_append_record(log_rec[1]);
delete[] rec->data();
delete rec;
log_mgr->clog_gen_record(REDO_MTR_BEGIN, 2, log_mtr_rec);
log_mgr->clog_append_record(log_mtr_rec);
delete log_mtr_rec;
rec = gen_ins_record(1, 1, 200);
log_mgr->clog_gen_record(REDO_INSERT, 1, log_rec[2], "table3", 200, rec);
log_mgr->clog_append_record(log_rec[2]);
delete[] rec->data();
delete rec;
rec = gen_ins_record(1, 2, 120);
log_mgr->clog_gen_record(REDO_INSERT, 2, log_rec[3], "table2", 120, rec);
log_mgr->clog_append_record(log_rec[3]);
delete[] rec->data();
delete rec;
rec = gen_ins_record(1, 2, 100);
log_mgr->clog_gen_record(REDO_INSERT, 2, log_rec[4], "table1", 100, rec);
log_mgr->clog_append_record(log_rec[4]);
delete[] rec->data();
delete rec;
log_mgr->clog_gen_record(REDO_MTR_COMMIT, 2, log_mtr_rec);
log_mgr->clog_append_record(log_mtr_rec);
delete log_mtr_rec;
rec = gen_del_record(1, 1);
log_mgr->clog_gen_record(REDO_DELETE, 1, log_rec[5], "table1", 0, rec);
log_mgr->clog_append_record(log_rec[5]);
delete rec;
log_mgr->clog_gen_record(REDO_MTR_COMMIT, 1, log_mtr_rec);
log_mgr->clog_append_record(log_mtr_rec);
delete log_mtr_rec;
log_mgr->recover();
CLogMTRManager *log_mtr_mgr = log_mgr->get_mtr_manager();
ASSERT_EQ(true, log_mtr_mgr->trx_commited[1]);
ASSERT_EQ(true, log_mtr_mgr->trx_commited[2]);
ASSERT_EQ(6, log_mtr_mgr->log_redo_list.size());
int i = 0;
for (auto iter = log_mtr_mgr->log_redo_list.begin(); iter != log_mtr_mgr->log_redo_list.end();
iter++) {
CLogRecord *tmp = *iter;
ASSERT_EQ(1, log_rec[i]->cmp_eq(tmp));
delete tmp;
i++;
}
for (i = 0; i < 6; i++) {
delete log_rec[i];
}
}
int main(int argc, char **argv)
{
// 分析gtest程序的命令行参数
testing::InitGoogleTest(&argc, argv);
// 调用RUN_ALL_TESTS()运行所有测试用例
// main函数返回RUN_ALL_TESTS()的运行结果
return RUN_ALL_TESTS();
}
\ No newline at end of file
/* Copyright (c) 2021 Xie Meiyi(xiemeiyi@hust.edu.cn) and OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by qiling on 2022
//
#include <string.h>
#include "gtest/gtest.h"
#include "storage/persist/persist.h"
#include "rc.h"
const int MAX_LEN = 50;
TEST(test_persist, test_persist_file_io)
{
std::string file_name_1 = "test_persist_file_io-file_name_1";
std::string file_name_2 = "test_persist_file_io-file_name_2";
PersistHandler persist_handler;
PersistHandler persist_handler_2;
RC rc;
// prepare
remove(file_name_1.c_str());
remove(file_name_2.c_str());
// create
rc = persist_handler.create_file(nullptr);
ASSERT_EQ(rc, RC::FILE_NAME);
rc = persist_handler.create_file(file_name_1.c_str());
ASSERT_EQ(rc, RC::SUCCESS);
ASSERT_NE(access(file_name_1.c_str(), F_OK), -1);
rc = persist_handler.create_file(file_name_2.c_str());
ASSERT_EQ(rc, RC::FILE_BOUND);
rc = persist_handler_2.create_file(file_name_1.c_str());
ASSERT_EQ(rc, RC::FILE_EXIST);
rc = persist_handler_2.create_file("");
ASSERT_EQ(rc, RC::FILE_CREATE);
// open
rc = persist_handler_2.open_file();
ASSERT_EQ(rc, RC::FILE_NAME);
rc = persist_handler_2.open_file("//**");
ASSERT_EQ(rc, RC::FILE_OPEN);
rc = persist_handler.open_file(file_name_2.c_str());
ASSERT_EQ(rc, RC::FILE_BOUND);
rc = persist_handler.open_file();
ASSERT_EQ(rc, RC::SUCCESS);
// write
std::string str_1 = "this is a string 001. ";
std::string str_2 = "this is a string 002002. ";
std::string str_3 = "THIS IS A STRING 003. ";
int64_t write_size = 0;
rc = persist_handler.write_file(str_1.size(), str_1.c_str(), &write_size);
ASSERT_EQ(rc, RC::SUCCESS);
ASSERT_EQ(write_size, str_1.size());
rc = persist_handler.append(str_2.size(), str_2.c_str(), &write_size);
ASSERT_EQ(rc, RC::SUCCESS);
ASSERT_EQ(write_size, str_2.size());
rc = persist_handler.write_at(str_1.size() + str_2.size(), str_3.size(), str_3.c_str(), &write_size);
ASSERT_EQ(rc, RC::SUCCESS);
ASSERT_EQ(write_size, str_3.size());
rc = persist_handler.close_file();
ASSERT_EQ(rc, RC::SUCCESS);
rc = persist_handler.append(str_3.size(), str_3.c_str(), &write_size);
ASSERT_EQ(rc, RC::FILE_NOT_OPENED);
rc = persist_handler.open_file();
ASSERT_EQ(rc, RC::SUCCESS);
rc = persist_handler_2.write_at(0, str_3.size(), str_3.c_str(), &write_size);
ASSERT_EQ(rc, RC::FILE_NOT_EXIST);
// read & seek
int64_t read_size = 0;
char buf[MAX_LEN] = {0};
rc = persist_handler.read_at(str_1.size(), str_2.size(), buf, &read_size);
ASSERT_EQ(rc, RC::SUCCESS);
ASSERT_EQ(read_size, str_2.size());
ASSERT_EQ(strnlen(buf, MAX_LEN), str_2.size());
ASSERT_EQ(strncmp(buf, str_2.c_str(), MAX_LEN), 0);
rc = persist_handler.seek(0);
ASSERT_EQ(rc, RC::SUCCESS);
rc = persist_handler.write_file(str_3.size(), str_3.c_str(), &write_size);
ASSERT_EQ(rc, RC::SUCCESS);
ASSERT_EQ(write_size, str_3.size());
memset(buf, 0, MAX_LEN);
rc = persist_handler.seek(0);
ASSERT_EQ(rc, RC::SUCCESS);
rc = persist_handler.read_file(str_3.size(), buf, &read_size);
ASSERT_EQ(rc, RC::SUCCESS);
ASSERT_EQ(read_size, str_3.size());
ASSERT_EQ(strnlen(buf, MAX_LEN), str_3.size());
ASSERT_EQ(strncmp(buf, str_3.c_str(), MAX_LEN), 0);
rc = persist_handler_2.read_at(0, str_3.size(), buf, &read_size);
ASSERT_EQ(rc, RC::FILE_NOT_EXIST);
// close
rc = persist_handler.close_file();
ASSERT_EQ(rc, RC::SUCCESS);
rc = persist_handler_2.close_file();
ASSERT_EQ(rc, RC::SUCCESS);
// remove
rc = persist_handler.remove_file();
ASSERT_EQ(rc, RC::SUCCESS);
ASSERT_EQ(access(file_name_1.c_str(), F_OK), -1);
}
int main(int argc, char **argv)
{
// 分析gtest程序的命令行参数
testing::InitGoogleTest(&argc, argv);
// 调用RUN_ALL_TESTS()运行所有测试用例
// main函数返回RUN_ALL_TESTS()的运行结果
return RUN_ALL_TESTS();
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册