未验证 提交 aea25b25 编写于 作者: 羽飞's avatar 羽飞 提交者: GitHub

合并之前的持久化代码到mvcc (#194)

### What problem were solved in this pull request?

close issue: #186 

Problem:
实现MVCC代码后没有将持久化代码合并进来

### What is changed and how it works?
支持并发模式下落日志,并在异常停机后恢复。

持久化是事务模块中最复杂的功能(没有之一),当前miniob实现的持久化简化了非常多,因此不能依赖持久化能力,仅仅作为一个学习测试的工具,在实际测试时也有一些限制。

### Other information
上级 1a2cf476
......@@ -3,6 +3,9 @@
./deps/googletest
./deps/jsoncpp
./deps/benchmark
.vscode
!.vscode/tasks.json
!.vscode/launch.json
./docs/doxy/
build/*
build_*
......
{
"files.associations": {
"typeinfo": "cpp"
}
}
\ No newline at end of file
......@@ -12,11 +12,10 @@ MESSAGE(STATUS "This is BINARY dir " ${test_BINARY_DIR})
MESSAGE(STATUS "This is Project source dir " ${PROJECT_SOURCE_DIR})
MESSAGE(STATUS "This is PROJECT_BINARY_DIR dir " ${PROJECT_BINARY_DIR})
SET(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/bin)
#SET(LIBRARY_OUTPUT_PATH <路径>)
OPTION(ENABLE_ASAN "Enable build with address sanitizer" ON)
OPTION(ENABLE_ASAN "Enable build with address sanitizer" OFF)
OPTION(WITH_UNIT_TESTS "Compile miniob with unit tests" ON)
OPTION(CONCURRENCY "Support concurrency operations" OFF)
......@@ -81,9 +80,11 @@ MESSAGE("Install target dir is " ${CMAKE_INSTALL_PREFIX})
# ADD_SUBDIRECTORY(src bin) bin 为目标目录, 可以省略
ADD_SUBDIRECTORY(deps)
ADD_SUBDIRECTORY(src)
ADD_SUBDIRECTORY(src/obclient)
ADD_SUBDIRECTORY(src/observer)
ADD_SUBDIRECTORY(test/perf)
ADD_SUBDIRECTORY(benchmark)
ADD_SUBDIRECTORY(tools)
IF(WITH_UNIT_TESTS)
enable_testing()
......
......@@ -66,6 +66,15 @@ int getFileSize(const char *filePath, u64_t &fileLen);
* @return int 0 表示成功,否则返回errno
*/
int writen(int fd, const void *buf, int size);
/**
* @brief 一次性读取指定长度的数据
*
* @param fd 读取的描述符
* @param buf 读取到这里
* @param size 读取的数据长度
* @return int 返回0表示成功。-1 表示读取到文件尾,并且没有读到size大小数据,其它表示errno
*/
int readn(int fd, void *buf, int size);
} // namespace common
......@@ -262,12 +262,14 @@ void DebugMutex::lock()
{
#ifdef DEBUG
lock_.lock();
LOG_DEBUG("debug lock %p, lbt=%s", &lock_, lbt());
#endif
}
/**
 * @brief Release the debug mutex.
 * @details The body is compiled only when DEBUG is defined; otherwise this
 * function does nothing.
 */
void DebugMutex::unlock()
{
#ifdef DEBUG
// Log the mutex address and the string returned by lbt() before releasing the lock.
LOG_DEBUG("debug unlock %p, lbt=%s", &lock_, lbt());
lock_.unlock();
#endif
}
......@@ -277,13 +279,18 @@ void Mutex::lock()
{
#ifdef CONCURRENCY
lock_.lock();
LOG_DEBUG("lock %p, lbt=%s", &lock_, lbt());
#endif
}
bool Mutex::try_lock()
{
#ifdef CONCURRENCY
return lock_.try_lock();
bool result = lock_.try_lock();
if (result) {
LOG_DEBUG("try lock success %p, lbt=%s", &lock_, lbt());
}
return result;
#else
return true;
#endif
......@@ -292,6 +299,7 @@ bool Mutex::try_lock()
/**
 * @brief Release the mutex.
 * @details The body is compiled only when CONCURRENCY is defined; otherwise
 * this function does nothing.
 */
void Mutex::unlock()
{
#ifdef CONCURRENCY
// Log the mutex address and the string returned by lbt() before releasing the lock.
LOG_DEBUG("unlock %p, lbt=%s", &lock_, lbt());
lock_.unlock();
#endif
}
......@@ -302,26 +310,38 @@ void Mutex::unlock()
/**
 * @brief Acquire the lock through lock_.lock() (the counterpart of unlock()).
 * @details The debug message is written only after the lock has been acquired.
 */
void SharedMutex::lock()
{
lock_.lock();
LOG_DEBUG("shared lock %p, lbt=%s", &lock_, lbt());
}
bool SharedMutex::try_lock()
{
return lock_.try_lock();
bool result = lock_.try_lock();
if (result) {
LOG_DEBUG("try shared lock : %p, lbt=%s", &lock_, lbt());
}
return result;
}
/**
 * @brief Release the exclusive lock.
 * @details The debug message is written before the lock is released.
 */
void SharedMutex::unlock() // unlock exclusive
{
LOG_DEBUG("shared lock unlock %p, lbt=%s", &lock_, lbt());
lock_.unlock();
}
/**
 * @brief Acquire the lock in shared mode through lock_.lock_shared().
 * @details The debug message is written only after the lock has been acquired.
 */
void SharedMutex::lock_shared()
{
lock_.lock_shared();
LOG_DEBUG("shared lock shared: %p lbt=%s", &lock_, lbt());
}
bool SharedMutex::try_lock_shared()
{
return lock_.try_lock_shared();
bool result = lock_.try_lock_shared();
if (result) {
LOG_DEBUG("shared lock try lock shared: %p, lbt=%s", &lock_, lbt());
}
return result;
}
/**
 * @brief Release a lock held in shared mode through lock_.unlock_shared().
 * @details The debug message is written before the lock is released.
 */
void SharedMutex::unlock_shared()
{
LOG_DEBUG("shared lock unlock shared %p lbt=%s", &lock_, lbt());
lock_.unlock_shared();
}
......
......@@ -44,6 +44,8 @@ Log::Log(const std::string &log_file_name, const LOG_LEVEL log_level, const LOG_
rotate_type_ = LOG_ROTATE_BYDAY;
check_param_valid();
context_getter_ = []() { return 0; };
}
Log::~Log(void)
......@@ -311,6 +313,20 @@ int Log::rotate(const int year, const int month, const int day)
return result;
}
/**
 * @brief Install the callback used to obtain the context id.
 * @details A non-empty callback replaces the current one. An empty callback
 * never clears an existing getter; it only causes the constant-zero fallback
 * to be installed when no getter is currently set.
 * @param context_getter callback returning the context id; may be empty.
 */
void Log::set_context_getter(std::function<intptr_t()> context_getter)
{
  // A usable callback always wins.
  if (context_getter) {
    context_getter_ = context_getter;
    return;
  }

  // Empty callback supplied: keep whatever getter is already installed.
  if (context_getter_) {
    return;
  }

  // Nothing installed at all: fall back to a getter that always yields 0.
  context_getter_ = []() { return 0; };
}
/**
 * @brief Invoke the installed context getter and return its result.
 * @return intptr_t the value produced by context_getter_.
 */
intptr_t Log::context_id()
{
return context_getter_();
}
LoggerFactory::LoggerFactory()
{
// Auto-generated constructor stub
......
......@@ -25,6 +25,7 @@ See the Mulan PSL v2 for more details. */
#include <map>
#include <set>
#include <string>
#include <functional>
#include "common/defs.h"
......@@ -105,6 +106,9 @@ public:
int rotate(const int year = 0, const int month = 0, const int day = 0);
void set_context_getter(std::function<intptr_t()> context_getter);
intptr_t context_id();
private:
void check_param_valid();
......@@ -137,6 +141,8 @@ private:
typedef std::set<std::string> DefaultSet;
DefaultSet default_set_;
std::function<intptr_t()> context_getter_;
};
class LoggerFactory {
......@@ -157,7 +163,7 @@ extern Log *g_log;
#define __FILE_NAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
#endif
#define LOG_HEAD_SIZE 64
#define LOG_HEAD_SIZE 128
#define LOG_HEAD(prefix, level) \
if (common::g_log) { \
......@@ -168,7 +174,7 @@ extern Log *g_log;
if (p) { \
int usec = (int)tv.tv_usec; \
snprintf(sz_head, LOG_HEAD_SIZE, \
"%04d-%02d-%02d %02d:%02d:%02u.%06d pid:%u tid:%llx ", \
"%04d-%02d-%02d %02d:%02d:%02u.%06d pid:%u tid:%llx ctx:%lx", \
p->tm_year + 1900, \
p->tm_mon + 1, \
p->tm_mday, \
......@@ -177,17 +183,18 @@ extern Log *g_log;
p->tm_sec, \
usec, \
(u32_t)getpid(), \
gettid()); \
gettid(), \
common::g_log->context_id()); \
common::g_log->rotate(p->tm_year + 1900, p->tm_mon + 1, p->tm_mday); \
} \
snprintf(prefix, \
sizeof(prefix), \
"[%s %s %s:%u %s] >> ", \
"[%s %s %s@%s:%u] >> ", \
sz_head, \
(common::g_log)->prefix_msg(level), \
__FUNCTION__, \
__FILE_NAME__, \
(u32_t)__LINE__, \
__FUNCTION__ \
(u32_t)__LINE__ \
); \
}
......@@ -302,9 +309,7 @@ int Log::out(const LOG_LEVEL console_level, const LOG_LEVEL log_level, T &msg)
#else // DEBUG
#define ASSERT(expression, description, ...) \
do { \
if (!(expression)) { \
LOG_ERROR(description, ##__VA_ARGS__); \
} \
(void)(expression); \
} while (0)
#endif // DEBUG
......@@ -318,5 +323,13 @@ int Log::out(const LOG_LEVEL console_level, const LOG_LEVEL log_level, T &msg)
*/
const char *lbt();
} // namespace common
/**
* @brief 设置一个在日志中打印当前上下文信息的回调函数
* @details 比如设置一个获取当前session标识的函数,那么每次在打印日志时都会输出session信息。
* 这个回调函数返回了一个intptr_t类型的数据,可能返回字符串更好,但是现在够用了。
*/
void set_log_context_getter(std::function<intptr_t()> context_getter);
extern std::function<intptr_t()> g_context_getter;
} // namespace common
......@@ -12,8 +12,7 @@ See the Mulan PSL v2 for more details. */
// Created by Longda on 2010
//
#ifndef __COMMON_OS_PROCESS_PARAM_H__
#define __COMMON_OS_PROCESS_PARAM_H__
#pragma once
#include <string>
#include <vector>
......@@ -131,6 +130,16 @@ public:
return trx_kit_name_;
}
/// Store the configured buffer pool memory size.
/// @param bytes the size, in bytes
void set_buffer_pool_memory_size(int bytes)
{
buffer_pool_memory_size_ = bytes;
}
/// Return the stored buffer pool memory size.
/// NOTE(review): the member defaults to -1, presumably meaning "not configured" — confirm with callers.
int buffer_pool_memory_size() const
{
return buffer_pool_memory_size_;
}
private:
std::string std_out_; // The output file
std::string std_err_; // The err output file
......@@ -142,9 +151,9 @@ private:
std::string unix_socket_path_;
std::string protocol_;
std::string trx_kit_name_;
int buffer_pool_memory_size_ = -1;
};
ProcessParam *&the_process_param();
} // namespace common
#endif //__COMMON_OS_PROCESS_PARAM_H__
......@@ -76,30 +76,22 @@ bool MetricsStage::set_properties()
// Initialize stage params and validate outputs
bool MetricsStage::initialize()
{
LOG_TRACE("Enter");
std::list<Stage *>::iterator stgp = next_stage_list_.begin();
timer_stage_ = *(stgp++);
MetricsReportEvent *report_event = new MetricsReportEvent();
add_event(report_event);
LOG_TRACE("Exit");
return true;
}
// Cleanup after disconnection
void MetricsStage::cleanup()
{
LOG_TRACE("Enter");
LOG_TRACE("Exit");
}
void MetricsStage::handle_event(StageEvent *event)
{
LOG_TRACE("Enter\n");
CompletionCallback *cb = new CompletionCallback(this, NULL);
if (cb == NULL) {
LOG_ERROR("Failed to new callback");
......@@ -123,14 +115,11 @@ void MetricsStage::handle_event(StageEvent *event)
event->push_callback(cb);
timer_stage_->add_event(tm_event);
LOG_TRACE("Exit\n");
return;
}
void MetricsStage::callback_event(StageEvent *event, CallbackContext *context)
{
LOG_TRACE("Enter\n");
MetricsRegistry &metrics_registry = get_metrics_registry();
metrics_registry.snapshot();
......@@ -139,6 +128,5 @@ void MetricsStage::callback_event(StageEvent *event, CallbackContext *context)
// do it again.
add_event(event);
LOG_TRACE("Exit\n");
return;
}
......@@ -36,14 +36,12 @@ namespace common {
*/
Stage::Stage(const char *tag) : next_stage_list_(), event_list_(), connected_(false), event_ref_(0)
{
LOG_TRACE("%s", "enter");
assert(tag != NULL);
MUTEX_INIT(&list_mutex_, NULL);
COND_INIT(&disconnect_cond_, NULL);
stage_name_ = new char[strlen(tag) + 1];
snprintf(stage_name_, strlen(tag) + 1, "%s", tag);
LOG_TRACE("%s", "exit");
}
/**
......@@ -53,7 +51,6 @@ Stage::Stage(const char *tag) : next_stage_list_(), event_list_(), connected_(fa
*/
Stage::~Stage()
{
LOG_TRACE("%s", "enter");
assert(!connected_);
MUTEX_LOCK(&list_mutex_);
while (event_list_.size() > 0) {
......@@ -66,7 +63,6 @@ Stage::~Stage()
MUTEX_DESTROY(&list_mutex_);
COND_DESTROY(&disconnect_cond_);
delete[] stage_name_;
LOG_TRACE("%s", "exit");
}
/**
......@@ -236,4 +232,4 @@ void Stage::release_event()
MUTEX_UNLOCK(&list_mutex_);
}
} // namespace common
\ No newline at end of file
} // namespace common
......@@ -255,7 +255,7 @@ void *Threadpool::run_thread(void *pool_ptr)
// this is not portable, but is easier to map to LWP
s64_t threadid = gettid();
LOG_INFO("threadid = %llx, threadname = %s\n", threadid, pool->get_name().c_str());
LOG_INFO("threadid = %llx, threadname = %s", threadid, pool->get_name().c_str());
#ifdef __APPLE__
pthread_setname_np(pool->get_name().c_str());
#else
......@@ -342,4 +342,4 @@ const Threadpool *Threadpool::get_thread_pool_ptr()
return (const Threadpool *)pthread_getspecific(pool_ptr_key_);
}
} // namespace common
\ No newline at end of file
} // namespace common
本文介绍 [MiniOB](https://github.com/oceanbase/miniob) 中的 clog 模块是如何工作的。
# 背景
持久化(Durability) 是事务中非常重要的一个模块,也是最复杂的一个模块,实现持久化才能保证数据不丢失。而持久化同时还要保证事务的原子性与数据完整性。如果对事务的一些概念不太了解,建议先学习了解事务的基本概念,比如学习[事务处理](lectures/lecture-6.md)章节,或者在网上搜索更多资料。
# MiniOB 中的持久化
## 实现简介
MiniOB 是一个学习使用的数据库,当前也在持续演进,持久化的功能是极不完善的,可以说实现了真正持久化功能的0.0001%。MiniOB 本身使用堆表保存数据,另外还有B+树当做索引,与传统数据库类似,会使用 buffer pool manager 管理堆表与索引数据在内存与磁盘中的存放。Buffer pool manager 会按照页来组织数据,在页上的修改,就会使用WAL(write-ahead logging,预写日志)记录日志,这里叫做clog。但是clog的实现是非常简化的,可以处理的异常场景也比较有限。希望大家通过这么简化的模块,了解一下数据库的日志与恢复的基本流程。
## 如何运行与测试
以mvcc模式启动miniob:
```bash
./bin/observer -f ../etc/observer.ini -s miniob.sock -t mvcc
```
客户端连接做操作,就可以看到 miniob/db/sys/clog 文件在增长。
如何测试日志恢复流程?
observer运行过程中产生了一些日志,这时执行 kill -9 `pidof observer` 将服务端进行强制杀死,然后再使用上面的启动命令将服务端启动起来即可。启动时,就会进入到恢复流程。
## CLog
CLog 的命名取自 [OceanBase](https://github.com/oceanbase/oceanbase) 中的日志模块,全称是 commit log。
当MiniOB启动时开启了mvcc模式,在运行时,如果有事务数据产生,就会生成日志,每一次操作对应一条日志(CLogRecord)。日志记录了当前操作的内容,比如插入一条数据、删除一条数据。
运行时生成的日志会先记录在内存中(CLogBuffer),当事务提交时,会将当前的事务以及之前的事务日志刷新到磁盘中。
刷新(写入)日志到磁盘时,并没有开启单独的线程,而是直接在调用刷盘(`CLogManager::sync`)的线程中直接写数据的。由于事务是并发运行的,会存在于多个线程中,因此`CLogBuffer::flush_buffer`做了简单粗暴的加锁控制,一次只有一个线程在刷日志。
对数据库比较了解的同学都知道,事务日志有逻辑日志、物理日志,或者混合类型的日志。那MiniOB的日志是什么?
日志中除了事务操作(提交、回滚)相关的日志,只有插入记录、删除记录两种日志,并且记录了操作的具体页面和槽位,因此算是混合日志。
**如何恢复的?**
在进程启动时,会初始化db对象,db对象会尝试加载日志(当然也是CLog模块干的),然后遍历这些日志调用事务模块的redo接口,将数据恢复出来。恢复的代码可以参考 `CLogManager::recover`
**当前的诸多缺陷**
当前CLog仅仅记录redo日志并没有undo日志,也没有记录每个页面的对应的日志编号,因此在使用相关的功能时会有很多限制。
在恢复时,默认buffer pool是以**初始状态**加载起来的,然后再从日志中恢复数据。当然,redo日志中那些没有提交的事务是可以回滚的,最终redo完成后,数据也没有问题。但是如果buffer pool中已经有了一部分数据,特别是有了未提交事务的数据,那么这部分数据就无法回滚了。因为每个页面中没有记录日志编号,无法知道页面上的数据对应的是哪个版本。
另外,日志记录时也没有处理各种异常情况,比如日志写一半失败了、磁盘满了,恢复时日志没有办法读取出来。
对于buffer pool中的数据,也没有办法保证一个页面是原子写入的,即一个页面要么都写入成功,要么都写入失败,文件系统没有这个保证,需要从应用层考虑解决这个问题。
**如何实现更完善的日志模块?**
日志不仅要考虑数据页面中数据的恢复,还要考虑一些元数据的操作,以及索引相关的操作。
元数据相关的操作包括buffer pool的管理,比如页面的分配、回收。索引当前使用的是B+树,那就需要考虑B+树页面的分配、回收,以及B+树的分裂、合并等操作。这些操作都需要记录日志,以便在恢复时能够恢复出来。这些操作不能完全对应着某一个事务,需要做一些特殊的处理。
**日志模块的性能瓶颈**
当前的日志实现非常低效,可以认为是单线程串行写日志,而且是每个事务完成都要刷新日志到磁盘,这个过程非常耗时。另外,生成日志的过程也是非常低效的,事务每增加一个操作,就会生成一条日志,并追加到日志队列中。
**日志之外?**
日志系统是为了配合数据库做恢复,除了日志模块,还有一些需要做的事情,比如数据库的checkpoint,这个是为了减少恢复时的日志量,以及加快恢复速度。
**工具**
为了帮助定位与调试问题,写了一个简单的日志解析工具,可以在miniob编译后找到二进制文件clog_reader,使用方法如下:
```bash
clog_reader miniob/db/sys/
```
> 注意给的参数不是日志文件,而是日志文件所在的目录,工具会自动找到日志文件并解析。由于CLogFile设计缺陷,这里也不能指定文件名。
# 扩展
- [ARIES: A Transaction Recovery Method Supporting Fine-Granularity Locking and Partial Rollbacks Using Write-Ahead Logging](https://github.com/tpn/pdfs/blob/master/ARIES%20-%20A%20Transaction%20Recovery%20Method%20Supporting%20Fine-Granularity%20Locking%20and%20Partial%20Rollbacks%20Using%20Write-Ahead%20Logging%20(1992).pdf) 该论文提出了ARIES算法,这是一种基于日志记录的恢复算法,支持细粒度锁和部分回滚,可以在数据库崩溃时恢复事务。如果想要对steal/no-force的概念有比较详细的了解,可以直接阅读该论文相关部分。
- [Shadow paging](https://www.geeksforgeeks.org/shadow-paging-dbms/) 除了日志做恢复,Shadow paging 是另外一种做恢复的方法。
- [Aether: A Scalable Approach to Logging](http://www.pandis.net/resources/vldb10aether.pdf) 介绍可扩展日志系统的。
- [InnoDB之REDO LOG](http://catkang.github.io/2020/02/27/mysql-redo.html) 非常详细而且深入的介绍了一下InnoDB的redo日志。
- [B+树恢复](http://catkang.github.io/2022/10/05/btree-crash-recovery.html) 介绍如何恢复B+树。
- [CMU 15445 Logging](https://15445.courses.cs.cmu.edu/spring2023/slides/19-logging.pdf)
......@@ -183,12 +183,8 @@ MVCC很好的处理了只读事务与写事务的并发,只读事务可以在
- [A Critique of ANSI SQL Isolation Levels](https://www.microsoft.com/en-us/research/wp-content/uploads/2016/02/tr-95-51.pdf) 该论文针对ANSI SQL标准中隔离级别的定义进行了深入的分析,提出了一些改进的建议。天天看到RC/RR名词的,可以看看这篇论文,了解更详细一点。
- [ARIES: A Transaction Recovery Method Supporting Fine-Granularity Locking and Partial Rollbacks Using Write-Ahead Logging](https://github.com/tpn/pdfs/blob/master/ARIES%20-%20A%20Transaction%20Recovery%20Method%20Supporting%20Fine-Granularity%20Locking%20and%20Partial%20Rollbacks%20Using%20Write-Ahead%20Logging%20(1992).pdf) 该论文提出了ARIES算法,这是一种基于日志记录的恢复算法,支持细粒度锁和部分回滚,可以在数据库崩溃时恢复事务。如果想要对steal/no-force的概念有比较详细的了解,可以直接阅读该论文相关部分。
- [Granularity of Locks and Degrees of Consistency in a Shared Data Base](https://web.stanford.edu/class/cs245/readings/granularity-of-locks.pdf) 这也是一个又老又香的论文,提出了基于锁的并发控制。
- [Aether: A Scalable Approach to Logging](http://www.pandis.net/resources/vldb10aether.pdf) 介绍可扩展日志系统的。
- [An Empirical Evaluation of InMemory Multi-Version Concurrency Control](https://www.vldb.org/pvldb/vol10/p781-Wu.pdf) 介绍MVCC可扩展性的。通过这篇论文可以对MVCC有非常清晰的认识。
- [Scalable Garbage Collection for In-Memory MVCC Systems](http://www.vldb.org/pvldb/vol13/p128-bottcher.pdf) 这里对各种垃圾回收算法做了说明,并且有些创新算法。
......
......@@ -4,7 +4,7 @@ MESSAGE("Begin to build " obclient)
INCLUDE(CheckIncludeFiles)
#INCLUDE_DIRECTORIES([AFTER|BEFORE] [SYSTEM] dir1 dir2 ...)
TARGET_INCLUDE_DIRECTORIES(obclient PRIVATE . ${PROJECT_SOURCE_DIR}/../deps /usr/local/include /usr/include)
TARGET_INCLUDE_DIRECTORIES(obclient PRIVATE . ${PROJECT_SOURCE_DIR}/deps /usr/local/include /usr/include)
# 父cmake 设置的include_directories 和link_directories并不传导到子cmake里面
#INCLUDE_DIRECTORIES(BEFORE ${CMAKE_INSTALL_PREFIX}/include)
......@@ -28,7 +28,7 @@ FOREACH (F ${ALL_SRC})
ENDFOREACH (F)
# 指定目标文件位置
SET(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/../bin)
SET(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/bin)
MESSAGE("Binary directory:" ${EXECUTABLE_OUTPUT_PATH})
TARGET_SOURCES(obclient PRIVATE ${PRJ_SRC})
TARGET_LINK_LIBRARIES(obclient common pthread dl)
......
......@@ -24,7 +24,7 @@ FOREACH (F ${ALL_SRC})
ENDFOREACH (F)
SET(LIBRARIES common pthread dl event_pthreads event libjsoncpp.a)
SET(LIBRARIES common pthread dl libevent_pthreads.a libevent.a libjsoncpp.a)
# 指定目标文件位置
SET(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/../../bin)
......@@ -38,7 +38,6 @@ ADD_LIBRARY(${PROJECT_NAME}_static STATIC ${LIB_SRC})
SET_TARGET_PROPERTIES(${PROJECT_NAME}_static PROPERTIES OUTPUT_NAME ${PROJECT_NAME})
TARGET_LINK_LIBRARIES(${PROJECT_NAME}_static ${LIBRARIES})
# Target 必须在定义 ADD_EXECUTABLE 之后, programs 不受这个限制
# TARGETS和PROGRAMS 的默认权限是OWNER_EXECUTE, GROUP_EXECUTE, 和WORLD_EXECUTE,即755权限, programs 都是处理脚本类
# 类型分为RUNTIME/LIBRARY/ARCHIVE, prog
......
......@@ -14,5 +14,15 @@ See the Mulan PSL v2 for more details. */
#pragma once
/// Disk files, including data files and index (B+-Tree) files, are organized in pages.
/// Every page has a number, called PageNum.
using PageNum = int32_t;
/// Within a data file, each page stores some rows (also called records).
/// Every row/record occupies one slot; slots are numbered, and that number is called SlotNum.
using SlotNum = int32_t;
/// LSN for log sequence number
using LSN = int32_t;
......@@ -23,7 +23,8 @@ class SessionEvent;
class Stmt;
class Command;
class SQLStageEvent : public common::StageEvent {
class SQLStageEvent : public common::StageEvent
{
public:
SQLStageEvent(SessionEvent *event, const std::string &sql);
virtual ~SQLStageEvent() noexcept;
......
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by Wangyunlai on 2023/5/29.
//
#include "global_context.h"
static GlobalContext global_context;
/// Return a reference to the file-scope `global_context` object defined in this file.
GlobalContext &GlobalContext::instance()
{
return global_context;
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by Wangyunlai on 2023/5/29.
//
#pragma once
// Forward declarations only: this header stores pointers and needs no full definitions.
class BufferPoolManager;
class DefaultHandler;
class TrxKit;

/**
 * @brief Holder of pointers to the main global objects of the process.
 * @details All pointers start out as nullptr. Access the shared object
 * through GlobalContext::instance().
 */
struct GlobalContext
{
  BufferPoolManager *buffer_pool_manager_{nullptr};  ///< buffer pool manager, nullptr until assigned
  DefaultHandler    *handler_{nullptr};              ///< default handler, nullptr until assigned
  TrxKit            *trx_kit_{nullptr};              ///< transaction kit, nullptr until assigned

  /// Accessor for the shared GlobalContext object (defined out of line).
  static GlobalContext &instance();
};
#define GCTX GlobalContext::instance()
\ No newline at end of file
......@@ -27,6 +27,7 @@ See the Mulan PSL v2 for more details. */
#include "common/metrics/log_reporter.h"
#include "common/metrics/metrics_registry.h"
#include "session/session.h"
#include "session/session_stage.h"
#include "sql/executor/execute_stage.h"
#include "sql/optimizer/optimize_stage.h"
......@@ -39,6 +40,7 @@ See the Mulan PSL v2 for more details. */
#include "storage/default/disk_buffer_pool.h"
#include "storage/default/default_handler.h"
#include "storage/trx/trx.h"
#include "global_context.h"
using namespace common;
......@@ -79,6 +81,8 @@ int init_log(ProcessParam *process_cfg, Ini &properties)
return 0;
}
auto log_context_getter = []() { return reinterpret_cast<intptr_t>(Session::current_session()); };
const std::string log_section_name = "LOG";
std::map<std::string, std::string> log_section = properties.get(log_section_name);
......@@ -115,6 +119,7 @@ int init_log(ProcessParam *process_cfg, Ini &properties)
}
LoggerFactory::init_default(log_file_name, log_level, console_level);
g_log->set_context_getter(log_context_getter);
key = ("DefaultLogModules");
it = log_section.find(key);
......@@ -159,13 +164,13 @@ int prepare_init_seda()
return 0;
}
int init_global_objects(ProcessParam *process_param)
int init_global_objects(ProcessParam *process_param, Ini &properties)
{
BufferPoolManager *bpm = new BufferPoolManager();
BufferPoolManager::set_instance(bpm);
GCTX.buffer_pool_manager_ = new BufferPoolManager();
BufferPoolManager::set_instance(GCTX.buffer_pool_manager_);
DefaultHandler *handler = new DefaultHandler();
DefaultHandler::set_default(handler);
GCTX.handler_ = new DefaultHandler();
DefaultHandler::set_default(GCTX.handler_);
int ret = 0;
RC rc = TrxKit::init_global(process_param->trx_kit_name().c_str());
......@@ -173,11 +178,13 @@ int init_global_objects(ProcessParam *process_param)
LOG_ERROR("failed to init trx kit. rc=%s", strrc(rc));
ret = -1;
}
GCTX.trx_kit_ = TrxKit::instance();
return ret;
}
int uninit_global_objects()
{
// TODO use global context
DefaultHandler *default_handler = &DefaultHandler::get_default();
if (default_handler != nullptr) {
DefaultHandler::set_default(nullptr);
......@@ -194,7 +201,6 @@ int uninit_global_objects()
int init(ProcessParam *process_param)
{
if (get_init()) {
return 0;
......@@ -236,7 +242,7 @@ int init(ProcessParam *process_param)
get_properties()->to_string(conf_data);
LOG_INFO("Output configuration \n%s", conf_data.c_str());
rc = init_global_objects(process_param);
rc = init_global_objects(process_param, *get_properties());
if (rc != 0) {
LOG_ERROR("failed to init global objects");
return rc;
......
......@@ -40,6 +40,7 @@ void usage()
std::cout << "-s: use unix socket and the argument is socket address" << std::endl;
std::cout << "-P: protocol. {plain(default), mysql}." << std::endl;
std::cout << "-t: transaction model. {vacuous(default), mvcc}." << std::endl;
std::cout << "-n: buffer pool memory size in byte" << std::endl;
exit(0);
}
......@@ -54,7 +55,7 @@ void parse_parameter(int argc, char **argv)
// Process args
int opt;
extern char *optarg;
while ((opt = getopt(argc, argv, "dp:P:s:t:f:o:e:h")) > 0) {
while ((opt = getopt(argc, argv, "dp:P:s:t:f:o:e:hn:")) > 0) {
switch (opt) {
case 's':
process_param->set_unix_socket_path(optarg);
......@@ -80,6 +81,9 @@ void parse_parameter(int argc, char **argv)
case 't':
process_param->set_trx_kit_name(optarg);
break;
case 'n':
process_param->set_buffer_pool_memory_size(atoi(optarg));
break;
case 'h':
default:
usage();
......
......@@ -164,7 +164,8 @@ RC PlainCommunicator::write_result(SessionEvent *event, bool &need_disconnect)
}
RC rc = sql_result->open();
if (rc != RC::SUCCESS) {
if (OB_FAIL(rc)) {
sql_result->close();
sql_result->set_return_code(rc);
return write_state(event, need_disconnect);
}
......@@ -276,7 +277,7 @@ RC PlainCommunicator::write_result(SessionEvent *event, bool &need_disconnect)
}
RC rc_close = sql_result->close();
if (rc == RC::SUCCESS) {
if (OB_SUCC(rc)) {
rc = rc_close;
}
return rc;
......
......@@ -54,6 +54,7 @@ See the Mulan PSL v2 for more details. */
DEFINE_RC(IOERR_CLOSE) \
DEFINE_RC(IOERR_SEEK) \
DEFINE_RC(IOERR_TOO_LONG) \
DEFINE_RC(IOERR_SYNC) \
DEFINE_RC(LOCKED_UNLOCK) \
DEFINE_RC(LOCKED_NEED_WAIT) \
DEFINE_RC(LOCKED_CONCURRENCY_CONFLICT) \
......
......@@ -16,6 +16,7 @@ See the Mulan PSL v2 for more details. */
#include "storage/trx/trx.h"
#include "storage/common/db.h"
#include "storage/default/default_handler.h"
#include "global_context.h"
Session &Session::default_session()
{
......@@ -28,8 +29,10 @@ Session::Session(const Session &other) : db_(other.db_)
Session::~Session()
{
delete trx_;
trx_ = nullptr;
if (nullptr != trx_) {
GCTX.trx_kit_->destroy_trx(trx_);
trx_ = nullptr;
}
}
const char *Session::get_current_db_name() const
......@@ -71,7 +74,19 @@ bool Session::is_trx_multi_operation_mode() const
Trx *Session::current_trx()
{
if (trx_ == nullptr) {
trx_ = TrxKit::instance()->create_trx();
trx_ = GCTX.trx_kit_->create_trx(db_->clog_manager());
}
return trx_;
}
// Per-thread pointer to the session associated with the calling thread; nullptr until one is set.
thread_local Session *thread_session = nullptr;
/**
 * @brief Record `session` as the current session of the calling thread.
 * @param session the session to record; passing nullptr clears the association.
 */
void Session::set_current_session(Session *session)
{
thread_session = session;
}
/**
 * @brief Return the session recorded for the calling thread.
 * @return the pointer last passed to set_current_session() on this thread, or nullptr if none was set.
 */
Session *Session::current_session()
{
return thread_session;
}
......@@ -42,6 +42,9 @@ public:
Trx *current_trx();
static void set_current_session(Session *session);
static Session *current_session();
private:
Db *db_ = nullptr;
Trx *trx_ = nullptr;
......
......@@ -71,48 +71,36 @@ bool SessionStage::set_properties()
// Initialize stage params and validate outputs
bool SessionStage::initialize()
{
LOG_TRACE("Enter");
std::list<Stage *>::iterator stgp = next_stage_list_.begin();
query_cache_stage_ = *(stgp++);
MetricsRegistry &metricsRegistry = get_metrics_registry();
sql_metric_ = new SimpleTimer();
metricsRegistry.register_metric(SQL_METRIC_TAG, sql_metric_);
LOG_TRACE("Exit");
return true;
}
// Cleanup after disconnection
void SessionStage::cleanup()
{
LOG_TRACE("Enter");
MetricsRegistry &metricsRegistry = get_metrics_registry();
if (sql_metric_ != nullptr) {
metricsRegistry.unregister(SQL_METRIC_TAG);
delete sql_metric_;
sql_metric_ = nullptr;
}
LOG_TRACE("Exit");
}
void SessionStage::handle_event(StageEvent *event)
{
LOG_TRACE("Enter\n");
// right now, we just support only one event.
handle_request(event);
LOG_TRACE("Exit\n");
return;
}
void SessionStage::callback_event(StageEvent *event, CallbackContext *context)
{
LOG_TRACE("Enter\n");
SessionEvent *sev = dynamic_cast<SessionEvent *>(event);
if (nullptr == sev) {
LOG_ERROR("Cannot cat event to sessionEvent");
......@@ -126,8 +114,8 @@ void SessionStage::callback_event(StageEvent *event, CallbackContext *context)
if (need_disconnect) {
Server::close_connection(communicator);
}
Session::set_current_session(nullptr);
LOG_TRACE("Exit\n");
return;
}
......@@ -150,13 +138,13 @@ void SessionStage::handle_request(StageEvent *event)
CompletionCallback *cb = new (std::nothrow) CompletionCallback(this, nullptr);
if (cb == nullptr) {
LOG_ERROR("Failed to new callback for SessionEvent");
sev->done_immediate();
return;
}
sev->push_callback(cb);
Session::set_current_session(sev->session());
SQLStageEvent *sql_event = new SQLStageEvent(sev, sql);
query_cache_stage_->handle_event(sql_event);
}
......@@ -88,41 +88,29 @@ bool ExecuteStage::set_properties()
//! Initialize stage params and validate outputs
bool ExecuteStage::initialize()
{
LOG_TRACE("Enter");
std::list<Stage *>::iterator stgp = next_stage_list_.begin();
default_storage_stage_ = *(stgp++);
mem_storage_stage_ = *(stgp++);
LOG_TRACE("Exit");
return true;
}
//! Cleanup after disconnection
void ExecuteStage::cleanup()
{
LOG_TRACE("Enter");
LOG_TRACE("Exit");
}
void ExecuteStage::handle_event(StageEvent *event)
{
LOG_TRACE("Enter");
handle_request(event);
LOG_TRACE("Exit");
return;
}
void ExecuteStage::callback_event(StageEvent *event, CallbackContext *context)
{
LOG_TRACE("Enter");
// here finish read all data from disk or network, but do nothing here.
LOG_TRACE("Exit");
return;
}
......
......@@ -16,6 +16,7 @@ See the Mulan PSL v2 for more details. */
#include "sql/executor/sql_result.h"
#include "session/session.h"
#include "storage/trx/trx.h"
#include "common/log/log.h"
SqlResult::SqlResult(Session *session) : session_(session)
{}
......@@ -71,3 +72,9 @@ RC SqlResult::next_tuple(Tuple *&tuple)
tuple = operator_->current_tuple();
return rc;
}
/**
 * @brief Store the physical operator for this result.
 * @param oper the operator to store; it is moved into operator_.
 */
void SqlResult::set_operator(std::unique_ptr<PhysicalOperator> oper)
{
// operator_ is expected to be empty here; a non-null value suggests the previous result was not closed.
ASSERT(operator_ == nullptr, "current operator is not null. Result is not closed?");
operator_ = std::move(oper);
}
......@@ -39,10 +39,8 @@ public:
state_string_ = state_string;
}
void set_operator(std::unique_ptr<PhysicalOperator> oper)
{
operator_ = std::move(oper);
}
void set_operator(std::unique_ptr<PhysicalOperator> oper);
bool has_operator() const
{
return operator_ != nullptr;
......
......@@ -82,26 +82,19 @@ bool OptimizeStage::set_properties()
//! Initialize stage params and validate outputs
bool OptimizeStage::initialize()
{
LOG_TRACE("Enter");
std::list<Stage *>::iterator stgp = next_stage_list_.begin();
execute_stage_ = *(stgp++);
LOG_TRACE("Exit");
return true;
}
//! Cleanup after disconnection
void OptimizeStage::cleanup()
{
LOG_TRACE("Enter");
LOG_TRACE("Exit");
}
void OptimizeStage::handle_event(StageEvent *event)
{
LOG_TRACE("Enter");
SQLStageEvent *sql_event = static_cast<SQLStageEvent *>(event);
SqlResult *sql_result = sql_event->session_event()->sql_result();
......@@ -111,7 +104,6 @@ void OptimizeStage::handle_event(StageEvent *event)
} else {
execute_stage_->handle_event(event);
}
LOG_TRACE("Exit");
}
RC OptimizeStage::handle_request(SQLStageEvent *sql_event)
{
......@@ -167,8 +159,6 @@ RC OptimizeStage::generate_physical_plan(
void OptimizeStage::callback_event(StageEvent *event, CallbackContext *context)
{
LOG_TRACE("Enter");
LOG_TRACE("Exit");
return;
}
......
......@@ -65,13 +65,10 @@ bool ParseStage::set_properties()
//! Initialize stage params and validate outputs
bool ParseStage::initialize()
{
LOG_TRACE("Enter");
std::list<Stage *>::iterator stgp = next_stage_list_.begin();
// optimize_stage_ = *(stgp++);
resolve_stage_ = *(stgp++);
LOG_TRACE("Exit");
return true;
}
......@@ -85,8 +82,6 @@ void ParseStage::cleanup()
void ParseStage::handle_event(StageEvent *event)
{
LOG_TRACE("Enter\n");
RC rc = handle_request(event);
if (RC::SUCCESS != rc) {
callback_event(event, nullptr);
......@@ -104,17 +99,14 @@ void ParseStage::handle_event(StageEvent *event)
resolve_stage_->handle_event(event);
event->done_immediate();
LOG_TRACE("Exit\n");
return;
}
void ParseStage::callback_event(StageEvent *event, CallbackContext *context)
{
LOG_TRACE("Enter\n");
SQLStageEvent *sql_event = static_cast<SQLStageEvent *>(event);
sql_event->session_event()->done_immediate();
sql_event->done_immediate();
LOG_TRACE("Exit\n");
return;
}
......
......@@ -66,27 +66,19 @@ bool ResolveStage::set_properties()
//! Initialize stage params and validate outputs
bool ResolveStage::initialize()
{
LOG_TRACE("Enter");
std::list<Stage *>::iterator stgp = next_stage_list_.begin();
plan_cache_stage_ = *(stgp++);
LOG_TRACE("Exit");
return true;
}
//! Cleanup after disconnection
void ResolveStage::cleanup()
{
LOG_TRACE("Enter");
LOG_TRACE("Exit");
}
void ResolveStage::handle_event(StageEvent *event)
{
LOG_TRACE("Enter\n");
SQLStageEvent *sql_event = static_cast<SQLStageEvent *>(event);
if (nullptr == sql_event) {
LOG_WARN("failed to get sql stage event");
......@@ -115,14 +107,10 @@ void ResolveStage::handle_event(StageEvent *event)
plan_cache_stage_->handle_event(sql_event);
LOG_TRACE("Exit\n");
return;
}
void ResolveStage::callback_event(StageEvent *event, CallbackContext *context)
{
LOG_TRACE("Enter\n");
LOG_TRACE("Exit\n");
return;
}
......@@ -62,28 +62,20 @@ bool PlanCacheStage::set_properties()
//! Initialize stage params and validate outputs
bool PlanCacheStage::initialize()
{
LOG_TRACE("Enter");
std::list<Stage *>::iterator stgp = next_stage_list_.begin();
// execute_stage = *(stgp++);
optimizer_stage_ = *(stgp++);
LOG_TRACE("Exit");
return true;
}
//! Cleanup after disconnection
void PlanCacheStage::cleanup()
{
LOG_TRACE("Enter");
LOG_TRACE("Exit");
}
void PlanCacheStage::handle_event(StageEvent *event)
{
LOG_TRACE("Enter\n");
// Add callback to update plan cache
/*
CompletionCallback *cb = new (std::nothrow) CompletionCallback(this, nullptr);
......@@ -98,17 +90,13 @@ void PlanCacheStage::handle_event(StageEvent *event)
// do nothing here, pass the event to the next stage
optimizer_stage_->handle_event(event);
LOG_TRACE("Exit\n");
return;
}
void PlanCacheStage::callback_event(StageEvent *event, CallbackContext *context)
{
LOG_TRACE("Enter\n");
// update execute plan here
// event->done_immediate();
LOG_TRACE("Exit\n");
return;
}
......@@ -62,36 +62,25 @@ bool QueryCacheStage::set_properties()
//! Initialize stage params and validate outputs
bool QueryCacheStage::initialize()
{
LOG_TRACE("Enter");
std::list<Stage *>::iterator stgp = next_stage_list_.begin();
parser_stage_ = *(stgp++);
LOG_TRACE("Exit");
return true;
}
//! Cleanup after disconnection
void QueryCacheStage::cleanup()
{
LOG_TRACE("Enter");
LOG_TRACE("Exit");
}
void QueryCacheStage::handle_event(StageEvent *event)
{
LOG_TRACE("Enter\n");
parser_stage_->handle_event(event);
LOG_TRACE("Exit\n");
return;
}
void QueryCacheStage::callback_event(StageEvent *event, CallbackContext *context)
{
LOG_TRACE("Enter\n");
LOG_TRACE("Exit\n");
return;
}
......@@ -14,6 +14,7 @@ See the Mulan PSL v2 for more details. */
#include "storage/buffer/frame.h"
#include "session/thread_data.h"
#include "session/session.h"
using namespace std;
......@@ -54,6 +55,7 @@ string to_string(const FrameId &frame_id)
////////////////////////////////////////////////////////////////////////////////
intptr_t get_default_debug_xid()
{
#if 0
ThreadData *thd = ThreadData::current();
intptr_t xid = (thd == nullptr) ?
// pthread_self的返回值类型是pthread_t,pthread_t在linux和mac上不同
......@@ -61,7 +63,13 @@ intptr_t get_default_debug_xid()
// 就将pthread_self返回值转换两次
reinterpret_cast<intptr_t>(reinterpret_cast<void*>(pthread_self())) :
reinterpret_cast<intptr_t>(thd);
return xid;
#endif
Session *session = Session::current_session();
if (session == nullptr) {
return reinterpret_cast<intptr_t>(reinterpret_cast<void*>(pthread_self()));
} else {
return reinterpret_cast<intptr_t>(session);
}
}
void Frame::write_latch()
......@@ -101,6 +109,7 @@ void Frame::write_unlatch()
void Frame::write_unlatch(intptr_t xid)
{
// 因为当前已经加着写锁,而且写锁只有一个,所以不再加debug_lock来做校验
debug_lock_.lock();
ASSERT(pin_count_.load() > 0,
"frame lock. write unlock failed while pin count is invalid."
......@@ -118,6 +127,8 @@ void Frame::write_unlatch(intptr_t xid)
if (--write_recursive_count_ == 0) {
write_locker_ = 0;
}
debug_lock_.unlock();
lock_.unlock();
}
......
......@@ -24,6 +24,7 @@ See the Mulan PSL v2 for more details. */
#include "storage/buffer/page.h"
#include "common/log/log.h"
#include "common/lang/mutex.h"
#include "defs.h"
class FrameId
{
......@@ -46,7 +47,7 @@ class Frame
public:
~Frame()
{
LOG_INFO("deallocate frame. this=%p, lbt=%s", this, common::lbt());
// LOG_DEBUG("deallocate frame. this=%p, lbt=%s", this, common::lbt());
}
/**
......
......@@ -15,20 +15,9 @@ See the Mulan PSL v2 for more details. */
#pragma once
#include <stdint.h>
#include "defs.h"
using TrxID = int32_t;
using SpaceID = int32_t;
/// 磁盘文件,包括存放数据的文件和索引(B+-Tree)文件,都按照页来组织
/// 每一页都有一个编号,称为PageNum
using PageNum = int32_t;
/// 数据文件中按照页来组织,每一页会存放一些行数据(row),或称为记录(record)
/// 每一行(row/record),都占用一个槽位(slot),这些槽有一个编号,称为SlotNum
using SlotNum = int32_t;
/// LSN for log sequence number
using LSN = int32_t;
static constexpr int BP_INVALID_PAGE_NUM = -1;
static constexpr int INVALID_TRX_ID = -1;
......
此差异已折叠。
......@@ -12,252 +12,402 @@ See the Mulan PSL v2 for more details. */
// Created by huhaosheng.hhs on 2022
//
#ifndef __OBSERVER_STORAGE_REDO_REDOLOG_H_
#define __OBSERVER_STORAGE_REDO_REDOLOG_H_
#pragma once
#include <stddef.h>
#include <stdint.h>
#include <list>
#include <atomic>
#include <unordered_map>
#include <deque>
#include <memory>
#include <string>
#include "storage/record/record.h"
#include "storage/persist/persist.h"
#include "common/lang/mutex.h"
#include "rc.h"
// 固定文件大小 TODO: 循环文件组
#define CLOG_FILE_SIZE 48 * 1024 * 1024
#define CLOG_BUFFER_SIZE 4 * 1024 * 1024
#define TABLE_NAME_MAX_LEN 20 // TODO: 表名不要超过20字节
class CLogManager;
class CLogBuffer;
class CLogFile;
class PersistHandler;
struct CLogRecordHeader;
struct CLogFileHeader;
struct CLogBlockHeader;
struct CLogBlock;
struct CLogMTRManager;
enum CLogType { REDO_ERROR = 0, REDO_MTR_BEGIN, REDO_MTR_COMMIT, REDO_INSERT, REDO_DELETE };
class Db;
/**
* @defgroup CLog
* @file clog.h
* @brief CLog 就是 commit log
* @details 这个模块想要实现数据库事务中的D(durability),也就是持久化。
* 持久化是事务四大特性(ACID)中最复杂的模块,这里的实现简化了99.999%,仅在一些特定场景下才能
* 恢复数据库。
*/
/**
* @enum CLogType
* @ingroup CLog
* @brief 定义clog的几种类型
* @details 除了事务操作相关的类型,比如MTR_BEGIN/MTR_COMMIT等,都是需要事务自己去处理的。
* 也就是说,像INSERT、DELETE等是事务自己处理的,其实这种类型的日志不需要在这里定义,而是在各个
* 事务模型中定义,由各个事务模型自行处理。
*/
#define DEFINE_CLOG_TYPE_ENUM \
DEFINE_CLOG_TYPE(ERROR) \
DEFINE_CLOG_TYPE(MTR_BEGIN) \
DEFINE_CLOG_TYPE(MTR_COMMIT) \
DEFINE_CLOG_TYPE(MTR_ROLLBACK) \
DEFINE_CLOG_TYPE(INSERT) \
DEFINE_CLOG_TYPE(DELETE)
enum class CLogType
{
#define DEFINE_CLOG_TYPE(name) name,
DEFINE_CLOG_TYPE_ENUM
#undef DEFINE_CLOG_TYPE
};
struct CLogRecordHeader {
int32_t lsn_;
int32_t trx_id_;
int type_;
int logrec_len_;
/**
* @brief clog type 转换成字符串
*/
const char *clog_type_name(CLogType type);
/**
* @brief clog type 转换成数字
*/
int32_t clog_type_to_integer(CLogType type);
/**
* @brief 数字转换成clog type
*/
CLogType clog_type_from_integer(int32_t value);
/**
* @brief CLog的记录头。每个日志都带有这个信息
*/
struct CLogRecordHeader
{
int32_t lsn_ = -1; /// log sequence number。当前没有使用
int32_t trx_id_ = -1; /// 日志所属事务的编号
int32_t type_ = clog_type_to_integer(CLogType::ERROR); /// 日志类型
int32_t logrec_len_ = 0; /// record的长度,不包含header长度
bool operator==(const CLogRecordHeader &other) const
{
return lsn_ == other.lsn_ && trx_id_ == other.trx_id_ && type_ == other.type_ && logrec_len_ == other.logrec_len_;
}
};
struct CLogInsertRecord {
CLogRecordHeader hdr_;
char table_name_[TABLE_NAME_MAX_LEN];
RID rid_;
int data_len_;
char *data_;
bool operator==(const CLogInsertRecord &other) const
{
return hdr_ == other.hdr_ && (strcmp(table_name_, other.table_name_) == 0) && (rid_ == other.rid_) &&
(data_len_ == other.data_len_) && (memcmp(data_, other.data_, data_len_) == 0);
}
};
struct CLogDeleteRecord {
CLogRecordHeader hdr_;
char table_name_[TABLE_NAME_MAX_LEN];
RID rid_;
bool operator==(const CLogDeleteRecord &other) const
{
return hdr_ == other.hdr_ && strcmp(table_name_, other.table_name_) == 0 && rid_ == other.rid_;
}
std::string to_string() const;
};
struct CLogMTRRecord {
CLogRecordHeader hdr_;
/**
* @brief MTR_COMMIT 日志的数据
* 其它的类型的MTR日志都没有数据,只有COMMIT有。
*/
struct CLogRecordCommitData
{
int32_t commit_xid_ = -1; /// 事务提交的事务号
bool operator==(const CLogMTRRecord &other) const
bool operator == (const CLogRecordCommitData &other) const
{
return hdr_ == other.hdr_;
return this->commit_xid_ == other.commit_xid_;
}
};
union CLogRecords {
CLogInsertRecord ins;
CLogDeleteRecord del;
CLogMTRRecord mtr;
char *errors;
std::string to_string() const;
};
class CLogRecord {
friend class Db;
public:
// TODO: lsn当前在内部分配
// 对齐在内部处理
CLogRecord(CLogType flag, int32_t trx_id, const char *table_name = nullptr, int data_len = 0, Record *rec = nullptr);
// 从外存恢复log record
CLogRecord(char *data);
~CLogRecord();
CLogType get_log_type()
{
return flag_;
}
int32_t get_trx_id()
{
return log_record_.mtr.hdr_.trx_id_;
}
int32_t get_logrec_len()
{
return log_record_.mtr.hdr_.logrec_len_;
}
int32_t get_lsn()
/**
* @brief 有具体数据修改的事务日志数据
* @details 这里记录的都是操作的记录,比如插入、删除一条数据。
*/
struct CLogRecordData
{
int32_t table_id_ = -1; /// 操作的表
RID rid_; /// 操作的哪条记录
int32_t data_len_ = 0; /// 记录的数据长度(因为header中也包含长度信息,这个长度可以不要)
int32_t data_offset_ = 0; /// 操作的数据在完整记录中的偏移量
char * data_ = nullptr; /// 具体的数据,可能没有任何数据
~CLogRecordData();
bool operator==(const CLogRecordData &other) const
{
return log_record_.mtr.hdr_.lsn_;
return table_id_ == other.table_id_ &&
rid_ == other.rid_ &&
data_len_ == other.data_len_ &&
data_offset_ == other.data_offset_ &&
0 == memcmp(data_, other.data_, data_len_);
}
RC copy_record(void *dest, int start_off, int copy_len);
/// for unit test
int cmp_eq(CLogRecord *other);
CLogRecords *get_record()
{
return &log_record_;
}
///
std::string to_string() const;
protected:
CLogType flag_;
CLogRecords log_record_;
const static int32_t HEADER_SIZE; /// 指RecordData的头长度,即不包含data_的长度
};
// TODO: 当前为简单实现,无循环
class CLogBuffer {
/**
* @brief 表示一条日志记录
* @details 一条日志记录由一个日志头和具体的数据构成。
* 具体的数据根据日志类型不同,也是不同的类型。
*/
class CLogRecord
{
public:
CLogBuffer();
~CLogBuffer();
/**
* @brief 默认构造函数。
* @details 通常不需要直接调用这个函数来创建一条日志,而是调用 `build_xxx`创建对象。
*/
CLogRecord() = default;
RC append_log_record(CLogRecord *log_rec, int &start_off);
// 将buffer中的数据下刷到log_file
RC flush_buffer(CLogFile *log_file);
void set_current_block_no(const int32_t block_no)
{
current_block_no_ = block_no;
}
void set_write_block_offset(const int32_t write_block_offset)
{
write_block_offset_ = write_block_offset;
};
void set_write_offset(const int32_t write_offset)
{
write_offset_ = write_offset;
};
~CLogRecord();
RC block_copy(int32_t offset, CLogBlock *log_block);
/**
* @brief 创建一个事务相关的日志对象
* @details 除了MTR_COMMIT的日志(请参考 build_commit_record)。
* @param type 日志类型
* @param trx_id 事务编号
*/
static CLogRecord *build_mtr_record(CLogType type, int32_t trx_id);
/**
* @brief 创建一个表示提交事务的日志对象
*
* @param trx_id 事务编号
* @param commit_xid 事务提交时使用的编号
*/
static CLogRecord *build_commit_record(int32_t trx_id, int32_t commit_xid);
/**
* @brief 创建一个表示数据操作的日志对象
*
* @param type 类型
* @param trx_id 事务编号
* @param table_id 操作的表
* @param rid 操作的哪条记录
* @param data_len 数据的长度
* @param data_offset 偏移量,参考 CLogRecordData::data_offset_
* @param data 具体的数据
*/
static CLogRecord *build_data_record(CLogType type,
int32_t trx_id,
int32_t table_id,
const RID &rid,
int32_t data_len,
int32_t data_offset,
const char *data);
/**
* @brief 根据二进制数据创建日志对象
* @details 通常是从日志文件中读取数据,然后调用此函数创建日志对象
* @param header 日志头信息
* @param data 读取的剩余数据信息,长度是header.logrec_len_
*/
static CLogRecord *build(const CLogRecordHeader &header, char *data);
CLogType log_type() const { return clog_type_from_integer(header_.type_); }
int32_t trx_id() const { return header_.trx_id_; }
int32_t logrec_len() const { return header_.logrec_len_; }
CLogRecordHeader &header() { return header_; }
CLogRecordCommitData &commit_record() { return commit_record_; }
CLogRecordData &data_record() { return data_record_; }
const CLogRecordHeader &header() const { return header_; }
const CLogRecordCommitData &commit_record() const { return commit_record_; }
const CLogRecordData &data_record() const { return data_record_; }
std::string to_string() const;
protected:
int32_t current_block_no_;
int32_t write_block_offset_;
int32_t write_offset_;
char buffer_[CLOG_BUFFER_SIZE];
};
//
#define CLOG_FILE_HDR_SIZE (sizeof(CLogFileHeader))
#define CLOG_BLOCK_SIZE (1 << 9)
#define CLOG_BLOCK_DATA_SIZE (CLOG_BLOCK_SIZE - sizeof(CLogBlockHeader))
#define CLOG_BLOCK_HDR_SIZE (sizeof(CLogBlockHeader))
#define CLOG_REDO_BUFFER_SIZE 8 * CLOG_BLOCK_SIZE
struct CLogRecordBuf {
int32_t write_offset_;
// TODO: 当前假定log record大小不会超过CLOG_REDO_BUFFER_SIZE
char buffer_[CLOG_REDO_BUFFER_SIZE];
};
CLogRecordHeader header_; /// 日志头信息
struct CLogFileHeader {
int32_t current_file_real_offset_;
// TODO: 用于文件组,当前没用
int32_t current_file_lsn_;
CLogRecordData data_record_; /// 如果日志操作的是数据,此结构生效
CLogRecordCommitData commit_record_; /// 如果是事务提交日志,此结构生效
};
struct CLogFHDBlock {
CLogFileHeader hdr_;
char pad[CLOG_BLOCK_SIZE - CLOG_FILE_HDR_SIZE];
};
struct CLogBlockHeader {
int32_t log_block_no; // 在文件中的offset no=n*CLOG_BLOCK_SIZE
int16_t log_data_len_;
int16_t first_rec_offset_;
};
/**
* @brief 缓存运行时产生的日志对象
* @details 当前的实现非常简单,没有采用其它数据库中常用的将日志序列化到二进制buffer,
* 管理二进制buffer的方法。这里仅仅把日志记录下来,放到链表中。如果达到一定量的日志,
* 或者日志数量超过某个阈值,就会调用flush_buffer将日志刷新到磁盘中。
*/
class CLogBuffer
{
public:
CLogBuffer();
~CLogBuffer();
struct CLogBlock {
CLogBlockHeader log_block_hdr_;
char data[CLOG_BLOCK_DATA_SIZE];
/**
* @brief 增加一条日志
* @details 如果当前的日志达到一定量,就会刷新数据
*/
RC append_log_record(CLogRecord *log_record);
/**
* @brief 将当前的日志都刷新到日志文件中
* @details 因为多线程访问与日志管理的问题,只能有一个线程调用此函数
* @param log_file 日志文件
*/
RC flush_buffer(CLogFile &log_file);
private:
/**
* @brief 将日志记录写入到日志文件中
*
* @param log_file 日志文件,概念上来讲不一定是某个特定的文件
* @param log_record 要写入的日志记录
*/
RC write_log_record(CLogFile &log_file, CLogRecord *log_record);
private:
common::Mutex lock_; /// 加锁支持多线程并发写入
std::deque<std::unique_ptr<CLogRecord>> log_records_; /// 当前等待刷数据的日志记录
std::atomic_int32_t total_size_; /// 当前缓存中的日志记录的总大小
};
class CLogFile {
/**
* @brief 读写日志文件
* @details 这里的名字不太贴切,因为这个类希望管理所有日志文件,而不是特定的某个文件。不过当前
* 只有一个文件,并且文件名是固定的。
*/
class CLogFile
{
public:
CLogFile(const char *path);
CLogFile() = default;
~CLogFile();
RC update_log_fhd(int32_t current_file_lsn);
RC append(int data_len, char *data);
RC write(uint64_t offset, int data_len, char *data);
RC recover(CLogMTRManager *mtr_mgr, CLogBuffer *log_buffer);
RC block_recover(CLogBlock *block, int16_t &offset, CLogRecordBuf *logrec_buf, CLogRecord *&log_rec);
/**
* @brief 初始化
*
* @param path 日志文件存放的路径。会打开这个目录下叫做 clog 的文件。
*/
RC init(const char *path);
/**
* @brief 写入指定数据,全部写入成功返回成功,否则返回失败
* @details 作为日志文件读写的类,实现一个write_log_record可能更合适。
* @note 如果日志文件写入一半失败了,应该做特殊处理,但是这里什么都没管。
* @param data 写入的数据
* @param len 数据的长度
*/
RC write(const char *data, int len);
/**
* @brief 读取指定长度的数据。全部读取成功返回成功,否则返回失败
* @details 与 write 有类似的问题。如果读取到了文件尾,会标记eof,可以通过eof()函数来判断。
* @param data 数据读出来放这里
* @param len 读取的长度
*/
RC read(char *data, int len);
/**
* @brief 将当前写的文件执行sync同步数据到磁盘
*/
RC sync();
/**
* @brief 获取当前读取的文件位置
*/
RC offset(int64_t &off) const;
/**
* @brief 当前是否已经读取到文件尾
*/
bool eof() const { return eof_; }
protected:
CLogFHDBlock log_fhd_;
PersistHandler *log_file_;
std::string filename_; /// 日志文件名。总是init函数参数path路径下的clog文件
int fd_ = -1; /// 操作的文件描述符
bool eof_ = false; /// 是否已经读取到文件尾
};
// TODO: 当前简单管理mtr
struct CLogMTRManager {
std::list<CLogRecord *> log_redo_list;
std::unordered_map<int32_t, bool> trx_commited; // <trx_id, commited>
/**
* @brief 日志记录遍历器
* @details 使用时先执行初始化(init),然后多次调用next,直到valid返回false。
*/
class CLogRecordIterator
{
public:
CLogRecordIterator() = default;
~CLogRecordIterator() = default;
void log_record_manage(CLogRecord *log_rec);
RC init(CLogFile &log_file);
~CLogMTRManager();
bool valid() const;
RC next();
const CLogRecord &log_record();
private:
CLogFile *log_file_ = nullptr;
CLogRecord *log_record_ = nullptr;
};
//
class CLogManager {
/**
* @brief 日志管理器
* @details 一个日志管理器属于某一个DB(当前仅有一个DB sys)。
* 管理器负责写日志(运行时)、读日志与恢复(启动时)
*/
class CLogManager
{
public:
CLogManager(const char *path);
CLogManager() = default;
~CLogManager();
RC init();
RC clog_gen_record(CLogType flag, int32_t trx_id, CLogRecord *&log_rec, const char *table_name = nullptr,
int data_len = 0, Record *rec = nullptr);
// 追加写到log_buffer
RC clog_append_record(CLogRecord *log_rec);
// 通常不需要在外部调用
RC clog_sync();
// TODO: 优化回放过程,对同一位置的修改可以用哈希聚合
RC recover();
CLogMTRManager *get_mtr_manager();
static int32_t get_next_lsn(int32_t rec_len);
static std::atomic<int32_t> gloabl_lsn_;
protected:
CLogBuffer *log_buffer_;
CLogFile *log_file_;
CLogMTRManager *log_mtr_mgr_;
/**
* @brief 初始化日志管理器
*
* @param path 日志都放在这个目录下。当前就是数据库的目录
*/
RC init(const char *path);
/**
* @brief 新增一条数据更新的日志
*/
RC append_log(CLogType type,
int32_t trx_id,
int32_t table_id,
const RID &rid,
int32_t data_len,
int32_t data_offset,
const char *data);
/**
* @brief 开启一个事务
*
* @param trx_id 事务编号
*/
RC begin_trx(int32_t trx_id);
/**
* @brief 提交一个事务
*
* @param trx_id 事务编号
* @param commit_xid 事务提交时使用的编号
*/
RC commit_trx(int32_t trx_id, int32_t commit_xid);
/**
* @brief 回滚一个事务
*
* @param trx_id 事务编号
*/
RC rollback_trx(int32_t trx_id);
/**
* @brief 也可以调用这个函数直接增加一条日志
*/
RC append_log(CLogRecord *log_record);
/**
* @brief 刷新日志到磁盘
*/
RC sync();
/**
* @brief 重做
* @details 当前会重做所有日志。也就是说,所有buffer pool页面都不会写入到磁盘中,
* 否则可能无法恢复成功。
*/
RC recover(Db *db);
private:
CLogBuffer *log_buffer_ = nullptr; /// 日志缓存。新增日志时先放到内存,也就是这个buffer中
CLogFile * log_file_ = nullptr; /// 管理日志,比如读写日志
};
#endif // __OBSERVER_STORAGE_REDO_REDOLOG_H_
......@@ -37,7 +37,6 @@ Db::~Db()
RC Db::init(const char *name, const char *dbpath)
{
if (common::is_blank(name)) {
LOG_ERROR("Failed to init DB, name cannot be empty");
return RC::INVALID_ARGUMENT;
......@@ -48,16 +47,33 @@ RC Db::init(const char *name, const char *dbpath)
return RC::INTERNAL;
}
clog_manager_ = new CLogManager(dbpath);
clog_manager_.reset(new CLogManager());
if (clog_manager_ == nullptr) {
LOG_ERROR("Failed to init CLogManager.");
return RC::INTERNAL;
return RC::NOMEM;
}
RC rc = clog_manager_->init(dbpath);
if (OB_FAIL(rc)) {
LOG_WARN("failed to init clog manager. dbpath=%s, rc=%s", dbpath, strrc(rc));
return rc;
}
name_ = name;
path_ = dbpath;
return open_all_tables();
rc = open_all_tables();
if (OB_FAIL(rc)) {
LOG_WARN("failed to open all tables. dbpath=%s, rc=%s", dbpath, strrc(rc));
return rc;
}
rc = recover();
if (OB_FAIL(rc)) {
LOG_WARN("failed to recover db. dbpath=%s, rc=%s", dbpath, strrc(rc));
return rc;
}
return rc;
}
RC Db::create_table(const char *table_name, int attribute_count, const AttrInfo *attributes)
......@@ -72,8 +88,7 @@ RC Db::create_table(const char *table_name, int attribute_count, const AttrInfo
// 文件路径可以移到Table模块
std::string table_file_path = table_meta_file(path_.c_str(), table_name);
Table *table = new Table();
rc = table->create(
table_file_path.c_str(), table_name, path_.c_str(), attribute_count, attributes);
rc = table->create(next_table_id_++, table_file_path.c_str(), table_name, path_.c_str(), attribute_count, attributes);
if (rc != RC::SUCCESS) {
LOG_ERROR("Failed to create table %s.", table_name);
delete table;
......@@ -94,6 +109,16 @@ Table *Db::find_table(const char *table_name) const
return nullptr;
}
/**
 * @brief Find an opened table by its numeric table id.
 * @details opened_tables_ is keyed by table name, so this is a linear scan
 * over every opened table.
 * @param table_id id to look for (compared against Table::table_id()).
 * @return the matching table, or nullptr if no opened table has this id.
 */
Table *Db::find_table(int32_t table_id) const
{
  // Bind by const reference: iterating by value would copy the std::string
  // key of every map entry on each step.
  for (const auto &name_and_table : opened_tables_) {
    Table *table = name_and_table.second;
    if (table->table_id() == table_id) {
      return table;
    }
  }
  return nullptr;
}
RC Db::open_all_tables()
{
std::vector<std::string> table_meta_files;
......@@ -121,6 +146,9 @@ RC Db::open_all_tables()
return RC::INTERNAL;
}
// create_table() hands out `next_table_id_++`, i.e. the current value is given
// to the next new table. The counter must therefore end up strictly greater
// than every id already in use, otherwise the first table created after a
// restart would reuse the largest existing id.
if (table->table_id() >= next_table_id_) {
  next_table_id_ = table->table_id() + 1;
}
opened_tables_[table->name()] = table;
LOG_INFO("Open table: %s, file: %s", table->name(), filename.c_str());
}
......@@ -157,78 +185,12 @@ RC Db::sync()
return rc;
}
#if 0
RC Db::recover()
{
RC rc = RC::SUCCESS;
if ((rc = clog_manager_->recover()) == RC::SUCCESS) {
int32_t max_trx_id = 0;
CLogMTRManager *mtr_manager = clog_manager_->get_mtr_manager();
for (auto it = mtr_manager->log_redo_list.begin(); it != mtr_manager->log_redo_list.end(); it++) {
CLogRecord *clog_record = *it;
if (clog_record->get_log_type() != CLogType::REDO_INSERT &&
clog_record->get_log_type() != CLogType::REDO_DELETE) {
delete clog_record;
continue;
}
auto find_iter = mtr_manager->trx_commited.find(clog_record->get_trx_id());
if (find_iter == mtr_manager->trx_commited.end()) {
LOG_ERROR("CLog record without commit message! "); // unexpected error
delete clog_record;
return RC::INTERNAL;
} else if (find_iter->second == false) {
delete clog_record;
continue;
}
Table *table = find_table(clog_record->log_record_.ins.table_name_);
if (table == nullptr) {
delete clog_record;
continue;
}
switch (clog_record->get_log_type()) {
case CLogType::REDO_INSERT: {
char *record_data = new char[clog_record->log_record_.ins.data_len_];
memcpy(record_data, clog_record->log_record_.ins.data_, clog_record->log_record_.ins.data_len_);
Record record;
record.set_data(record_data);
record.set_rid(clog_record->log_record_.ins.rid_);
rc = table->recover_insert_record(&record);
delete[] record_data;
} break;
case CLogType::REDO_DELETE: {
Record record;
record.set_rid(clog_record->log_record_.del.rid_);
rc = table->recover_delete_record(&record);
} break;
default: {
rc = RC::SUCCESS;
}
}
if (rc != RC::SUCCESS) {
LOG_ERROR("Failed to recover. rc=%d:%s", rc, strrc(rc));
break;
}
if (max_trx_id < clog_record->get_trx_id()) {
max_trx_id = clog_record->get_trx_id();
}
delete clog_record;
}
if (rc == RC::SUCCESS && max_trx_id > 0) {
Trx::set_trx_id(max_trx_id);
}
}
return rc;
return clog_manager_->recover(this);
}
#endif
CLogManager *Db::get_clog_manager()
CLogManager *Db::clog_manager()
{
return clog_manager_;
return clog_manager_.get();
}
\ No newline at end of file
......@@ -17,6 +17,7 @@ See the Mulan PSL v2 for more details. */
#include <vector>
#include <string>
#include <unordered_map>
#include <memory>
#include "rc.h"
#include "sql/parser/parse_defs.h"
......@@ -24,16 +25,30 @@ See the Mulan PSL v2 for more details. */
class Table;
class CLogManager;
class Db {
/**
* @brief 一个DB实例负责管理一批表
* @details 当前DB的存储模式很简单,一个DB对应一个目录,所有的表和数据都放置在这个目录下。
* 启动时,从指定的目录下加载所有的表和元数据。
*/
class Db
{
public:
Db() = default;
~Db();
/**
* @brief 初始化一个数据库实例
* @details 从指定的目录下加载指定名称的数据库。这里就会加载dbpath目录下的数据。
* @param name 数据库名称
* @param dbpath 当前数据库放在哪个目录下
* @note 数据库不是放在dbpath/name下,是直接使用dbpath目录
*/
RC init(const char *name, const char *dbpath);
RC create_table(const char *table_name, int attribute_count, const AttrInfo *attributes);
Table *find_table(const char *table_name) const;
Table *find_table(int32_t table_id) const;
const char *name() const;
......@@ -41,9 +56,9 @@ public:
RC sync();
// RC recover();
RC recover();
CLogManager *get_clog_manager();
CLogManager *clog_manager();
private:
RC open_all_tables();
......@@ -52,5 +67,8 @@ private:
std::string name_;
std::string path_;
std::unordered_map<std::string, Table *> opened_tables_;
CLogManager *clog_manager_ = nullptr;
std::unique_ptr<CLogManager> clog_manager_;
/// 给每个table都分配一个ID,用来记录日志。这里假设所有的DDL都不会并发操作,所以相关的数据都不上锁
int32_t next_table_id_ = 0;
};
......@@ -31,3 +31,8 @@ int Field::get_int(const Record &record)
TupleCell cell(field_, const_cast<char *>(record.data() + field_->offset()), field_->len());
return cell.get_int();
}
/**
 * @brief Pointer to this field's bytes inside a record buffer.
 * @param record record whose data buffer is addressed.
 * @return record.data() advanced by the field's offset; no copy is made.
 */
const char *Field::get_data(const Record &record)
{
  const char *record_start = record.data();
  const auto field_offset = field_->offset();
  return record_start + field_offset;
}
\ No newline at end of file
......@@ -60,6 +60,8 @@ public:
void set_int(Record &record, int value);
int get_int(const Record &record);
const char *get_data(const Record &record);
private:
const Table *table_ = nullptr;
const FieldMeta *field_ = nullptr;
......
......@@ -50,7 +50,8 @@ Table::~Table()
LOG_INFO("Table has been closed: %s", name());
}
RC Table::create(const char *path,
RC Table::create(int32_t table_id,
const char *path,
const char *name,
const char *base_dir,
int attribute_count,
......@@ -85,7 +86,7 @@ RC Table::create(const char *path,
close(fd);
// 创建文件
if ((rc = table_meta_.init(name, attribute_count, attributes)) != RC::SUCCESS) {
if ((rc = table_meta_.init(table_id, name, attribute_count, attributes)) != RC::SUCCESS) {
LOG_ERROR("Failed to init table meta. name:%s, ret:%d", name, rc);
return rc; // delete table file
}
......@@ -228,6 +229,31 @@ RC Table::get_record(const RID &rid, Record &record)
return rc;
}
/**
 * @brief Re-apply an insert: put the record at its recorded RID and add its index entries.
 * @details If adding index entries fails, the index entries and the record
 * itself are removed again (best effort) and the index error is returned.
 * @param record carries both the data and the RID to restore.
 * @return RC::SUCCESS, or the error from the record handler / index insertion.
 */
RC Table::recover_insert_record(Record &record)
{
  RC rc = record_handler_->recover_insert_record(record.data(), table_meta_.record_size(), record.rid());
  if (rc != RC::SUCCESS) {
    LOG_ERROR("Insert record failed. table name=%s, rc=%s", table_meta_.name(), strrc(rc));
    return rc;
  }

  rc = insert_entry_of_indexes(record.data(), record.rid());
  if (rc == RC::SUCCESS) {
    return rc;
  }

  // Index insertion failed (a duplicate key is one possible cause): undo the
  // index entries first, then the record. Failures while undoing are logged
  // only; the original index error is what gets returned.
  const RC index_undo_rc = delete_entry_of_indexes(record.data(), record.rid(), false/*error_on_not_exists*/);
  if (index_undo_rc != RC::SUCCESS) {
    LOG_ERROR("Failed to rollback index data when insert index entries failed. table name=%s, rc=%d:%s",
        name(), index_undo_rc, strrc(index_undo_rc));
  }

  const RC record_undo_rc = record_handler_->delete_record(&record.rid());
  if (record_undo_rc != RC::SUCCESS) {
    LOG_PANIC("Failed to rollback record data when insert index entries failed. table name=%s, rc=%d:%s",
        name(), record_undo_rc, strrc(record_undo_rc));
  }

  return rc;
}
// Returns the table name as reported by this table's metadata (table_meta_).
const char *Table::name() const
{
return table_meta_.name();
......
......@@ -44,7 +44,12 @@ public:
* @param attribute_count 字段个数
* @param attributes 字段
*/
RC create(const char *path, const char *name, const char *base_dir, int attribute_count, const AttrInfo attributes[]);
RC create(int32_t table_id,
const char *path,
const char *name,
const char *base_dir,
int attribute_count,
const AttrInfo attributes[]);
/**
* 打开一个表
......@@ -53,12 +58,27 @@ public:
*/
RC open(const char *meta_file, const char *base_dir);
/**
* @brief 根据给定的字段生成一个记录/行
* @details 通常是由用户传过来的字段,按照schema信息组装成一个record。
* @param value_num 字段的个数
* @param values 每个字段的值
* @param record 生成的记录数据
*/
RC make_record(int value_num, const Value *values, Record &record);
/**
* @brief 在当前的表中插入一条记录
* @details 在表文件和索引中插入关联数据。这里只管在表中插入数据,不关心事务相关操作。
* @param record[in/out] 传入的数据包含具体的数据,插入成功会通过此字段返回RID
*/
RC insert_record(Record &record);
RC delete_record(const Record &record);
RC visit_record(const RID &rid, bool readonly, std::function<void(Record &)> visitor);
RC get_record(const RID &rid, Record &record);
RC recover_insert_record(Record &record);
// TODO refactor
RC create_index(Trx *trx, const FieldMeta *field_meta, const char *index_name);
......@@ -70,7 +90,7 @@ public:
}
public:
int32_t table_id() const { return table_id_; }
int32_t table_id() const { return table_meta_.table_id(); }
const char *name() const;
const TableMeta &table_meta() const;
......@@ -89,9 +109,8 @@ public:
Index *find_index_by_field(const char *field_name) const;
private:
int32_t table_id_ = -1;
std::string base_dir_;
TableMeta table_meta_;
TableMeta table_meta_;
DiskBufferPool *data_buffer_pool_ = nullptr; /// 数据文件关联的buffer pool
RecordFileHandler *record_handler_ = nullptr; /// 记录操作
std::vector<Index *> indexes_;
......
......@@ -22,6 +22,7 @@ See the Mulan PSL v2 for more details. */
using namespace std;
static const Json::StaticString FIELD_TABLE_ID("table_id");
static const Json::StaticString FIELD_TABLE_NAME("table_name");
static const Json::StaticString FIELD_FIELDS("fields");
static const Json::StaticString FIELD_INDEXES("indexes");
......@@ -38,7 +39,7 @@ void TableMeta::swap(TableMeta &other) noexcept
std::swap(record_size_, other.record_size_);
}
RC TableMeta::init(const char *name, int field_num, const AttrInfo attributes[])
RC TableMeta::init(int32_t table_id, const char *name, int field_num, const AttrInfo attributes[])
{
if (common::is_blank(name)) {
LOG_ERROR("Name cannot be empty");
......@@ -83,8 +84,9 @@ RC TableMeta::init(const char *name, int field_num, const AttrInfo attributes[])
record_size_ = field_offset;
name_ = name;
LOG_INFO("Sussessfully initialized table meta. table name=%s", name);
table_id_ = table_id;
name_ = name;
LOG_INFO("Sussessfully initialized table meta. table id=%d, name=%s", table_id, name);
return RC::SUCCESS;
}
......@@ -188,6 +190,7 @@ int TableMeta::serialize(std::ostream &ss) const
{
Json::Value table_value;
table_value[FIELD_TABLE_ID] = table_id_;
table_value[FIELD_TABLE_NAME] = name_;
Json::Value fields_value;
......@@ -230,6 +233,14 @@ int TableMeta::deserialize(std::istream &is)
return -1;
}
const Json::Value &table_id_value = table_value[FIELD_TABLE_ID];
if (!table_id_value.isInt()) {
LOG_ERROR("Invalid table id. json value=%s", table_id_value.toStyledString().c_str());
return -1;
}
int32_t table_id = table_id_value.asInt();
const Json::Value &table_name_value = table_value[FIELD_TABLE_NAME];
if (!table_name_value.isString()) {
LOG_ERROR("Invalid table name. json value=%s", table_name_value.toStyledString().c_str());
......@@ -261,6 +272,7 @@ int TableMeta::deserialize(std::istream &is)
auto comparator = [](const FieldMeta &f1, const FieldMeta &f2) { return f1.offset() < f2.offset(); };
std::sort(fields.begin(), fields.end(), comparator);
table_id_ = table_id;
name_.swap(table_name);
fields_.swap(fields);
record_size_ = fields_.back().offset() + fields_.back().len() - fields_.begin()->offset();
......
......@@ -32,11 +32,12 @@ public:
void swap(TableMeta &other) noexcept;
RC init(const char *name, int field_num, const AttrInfo attributes[]);
RC init(int32_t table_id, const char *name, int field_num, const AttrInfo attributes[]);
RC add_index(const IndexMeta &index);
public:
int32_t table_id() const { return table_id_; }
const char *name() const;
const FieldMeta *trx_field() const;
const FieldMeta *field(int index) const;
......@@ -66,6 +67,7 @@ public:
void desc(std::ostream &os) const;
protected:
int32_t table_id_ = -1;
std::string name_;
std::vector<FieldMeta> fields_; // 包含sys_fields
std::vector<IndexMeta> indexes_;
......
......@@ -23,6 +23,7 @@ See the Mulan PSL v2 for more details. */
#include "storage/index/bplus_tree.h"
#include "storage/common/table.h"
#include "storage/common/condition_filter.h"
#include "storage/clog/clog.h"
static DefaultHandler *default_handler = nullptr;
......@@ -121,11 +122,12 @@ RC DefaultHandler::open_db(const char *dbname)
Db *db = new Db();
RC ret = RC::SUCCESS;
if ((ret = db->init(dbname, dbpath.c_str())) != RC::SUCCESS) {
LOG_ERROR("Failed to open db: %s. error=%d", dbname, ret);
LOG_ERROR("Failed to open db: %s. error=%s", dbname, strrc(ret));
delete db;
} else {
opened_dbs_[dbname] = db;
}
opened_dbs_[dbname] = db;
return RC::SUCCESS;
return ret;
}
RC DefaultHandler::close_db(const char *dbname)
......
......@@ -101,7 +101,7 @@ bool DefaultStorageStage::set_properties()
ret = handler_->open_db(sys_db);
if (ret != RC::SUCCESS) {
LOG_ERROR("Failed to open system db");
LOG_ERROR("Failed to open system db. rc=%s", strrc(ret));
return false;
}
......@@ -115,31 +115,24 @@ bool DefaultStorageStage::set_properties()
//! Initialize stage params and validate outputs
bool DefaultStorageStage::initialize()
{
LOG_TRACE("Enter");
MetricsRegistry &metricsRegistry = get_metrics_registry();
query_metric_ = new SimpleTimer();
metricsRegistry.register_metric(QUERY_METRIC_TAG, query_metric_);
LOG_TRACE("Exit");
return true;
}
//! Cleanup after disconnection
void DefaultStorageStage::cleanup()
{
  LOG_TRACE("Enter");

  // Destroy the handler once and clear the pointer so a repeated cleanup is a no-op.
  if (handler_ != nullptr) {
    handler_->destroy();
    handler_ = nullptr;
  }

  LOG_TRACE("Exit");
}
void DefaultStorageStage::handle_event(StageEvent *event)
{
LOG_TRACE("Enter\n");
TimerStat timerStat(*query_metric_);
SQLStageEvent *sql_event = static_cast<SQLStageEvent *>(event);
......@@ -177,16 +170,12 @@ void DefaultStorageStage::handle_event(StageEvent *event)
LOG_ERROR("Failed to commit trx. rc=%d:%s", rc, strrc(rc));
}
}
LOG_TRACE("Exit\n");
}
void DefaultStorageStage::callback_event(StageEvent *event, CallbackContext *context)
{
  LOG_TRACE("Enter\n");

  // The event is treated as a StorageEvent (unchecked cast); completing its
  // SQL event is all that happens here. `context` is unused.
  static_cast<StorageEvent *>(event)->sql_event()->done_immediate();

  LOG_TRACE("Exit\n");
}
......
......@@ -132,6 +132,7 @@ Frame *BPFrameManager::alloc(int file_desc, PageNum page_num)
if (frame != nullptr) {
ASSERT(frame->pin_count() == 0, "got an invalid frame that pin count is not 0. frame=%s",
to_string(*frame).c_str());
frame->set_page_num(page_num);
frame->pin();
frames_.put(frame_id, frame);
}
......@@ -568,6 +569,7 @@ RC DiskBufferPool::allocate_frame(PageNum page_num, Frame **buffer)
return RC::SUCCESS;
}
LOG_TRACE("frames are all allocated, so we should purge some frames to get one free frame");
(void)frame_manager_.purge_frames(1/*count*/, purger);
}
return RC::BUFFERPOOL_NOBUF;
......@@ -605,23 +607,20 @@ RC DiskBufferPool::load_page(PageNum page_num, Frame *frame)
return RC::SUCCESS;
}
// Copies the allocated-page counter from the file header into *page_count
// and always returns RC::SUCCESS.
// NOTE: page_count is dereferenced without a null check.
RC DiskBufferPool::get_page_count(int *page_count)
{
*page_count = file_header_->allocated_pages;
return RC::SUCCESS;
}
// Returns the value of file_desc_ (presumably the descriptor of the file
// backing this buffer pool; confirm against where file_desc_ is assigned).
int DiskBufferPool::file_desc() const
{
return file_desc_;
}
////////////////////////////////////////////////////////////////////////////////
BufferPoolManager::BufferPoolManager(int page_num /* = 0 */)
BufferPoolManager::BufferPoolManager(int memory_size /* = 0 */)
{
if (page_num <= 0) {
page_num = MEM_POOL_ITEM_NUM * DEFAULT_ITEM_NUM_PER_POOL;
if (memory_size <= 0) {
memory_size = MEM_POOL_ITEM_NUM * DEFAULT_ITEM_NUM_PER_POOL * BP_PAGE_SIZE;
}
const int pool_num = std::max(page_num / DEFAULT_ITEM_NUM_PER_POOL, 1);
const int pool_num = std::max(memory_size / BP_PAGE_SIZE / DEFAULT_ITEM_NUM_PER_POOL, 1);
frame_manager_.init(pool_num);
LOG_INFO("buffer pool manager init with memory size %d, page num: %d, pool num: %d",
memory_size, pool_num * DEFAULT_ITEM_NUM_PER_POOL, pool_num);
}
BufferPoolManager::~BufferPoolManager()
......
......@@ -123,7 +123,7 @@ private:
FrameAllocator allocator_;
};
class BufferPoolIterator
class BufferPoolIterator
{
public:
BufferPoolIterator();
......@@ -172,27 +172,29 @@ public:
*/
RC allocate_page(Frame **frame);
/**
* @brief 释放某个页面,将此页面设置为未分配状态
*
* @param page_num 待释放的页面
*/
RC dispose_page(PageNum page_num);
/**
* 释放指定文件关联的页的内存, 如果已经脏, 则刷到磁盘,除了pinned page
* @brief 释放指定文件关联的页的内存
* 如果已经脏, 则刷到磁盘,除了pinned page
*/
RC purge_page(PageNum page_num);
RC purge_all_pages();
/**
* 此函数用于解除pageHandle对应页面的驻留缓冲区限制。
* @brief 用于解除pageHandle对应页面的驻留缓冲区限制
*
* 在调用GetThisPage或AllocatePage函数将一个页面读入缓冲区后,
* 该页面被设置为驻留缓冲区状态,以防止其在处理过程中被置换出去,
* 因此在该页面使用完之后应调用此函数解除该限制,使得该页面此后可以正常地被淘汰出缓冲区
*/
RC unpin_page(Frame *frame);
/**
* 获取文件的总页数
*/
RC get_page_count(int *page_count);
/**
* 检查是否所有页面都是pin count == 0状态(除了第1个页面)
* 调试使用
......@@ -253,7 +255,7 @@ private:
class BufferPoolManager
{
public:
BufferPoolManager(int page_num = 0);
BufferPoolManager(int memory_size = 0);
~BufferPoolManager();
RC create_file(const char *file_name);
......
......@@ -63,35 +63,24 @@ bool MemStorageStage::set_properties()
//! Initialize stage params and validate outputs
bool MemStorageStage::initialize()
{
// No initialization work is done: only trace logs are emitted and success is reported.
LOG_TRACE("Enter");
LOG_TRACE("Exit");
return true;
}
}
//! Cleanup after disconnection
void MemStorageStage::cleanup()
{
// Nothing is released here; the body only emits the enter/exit trace logs.
LOG_TRACE("Enter");
LOG_TRACE("Exit");
}
}
void MemStorageStage::handle_event(StageEvent *event)
{
LOG_TRACE("Enter\n");
TimerStat timerStat(*queryMetric);
event->done_immediate();
LOG_TRACE("Exit\n");
return;
}
void MemStorageStage::callback_event(StageEvent *event, CallbackContext *context)
{
LOG_TRACE("Enter\n");
LOG_TRACE("Exit\n");
return;
}
......@@ -143,6 +143,7 @@ public:
char *data() { return this->data_; }
const char *data() const { return this->data_; }
int len() const { return this->len_; }
void set_rid(const RID &rid) { this->rid_ = rid; }
void set_rid(const PageNum page_num, const SlotNum slot_num)
......
......@@ -128,6 +128,9 @@ RC RecordPageHandler::recover_init(DiskBufferPool &buffer_pool, PageNum page_num
return ret;
}
frame_->write_latch();
readonly_ = false;
char *data = frame_->data();
disk_buffer_pool_ = &buffer_pool;
......@@ -213,24 +216,22 @@ RC RecordPageHandler::insert_record(const char *data, RID *rid)
return RC::SUCCESS;
}
RC RecordPageHandler::recover_insert_record(const char *data, RID *rid)
RC RecordPageHandler::recover_insert_record(const char *data, const RID &rid)
{
if (page_header_->record_num == page_header_->record_capacity) {
LOG_WARN("Page is full, page_num %d:%d.", frame_->page_num());
return RC::RECORD_NOMEM;
}
if (rid->slot_num >= page_header_->record_capacity) {
LOG_WARN("slot_num illegal, slot_num(%d) > record_capacity(%d).", rid->slot_num, page_header_->record_capacity);
return RC::RECORD_NOMEM;
if (rid.slot_num >= page_header_->record_capacity) {
LOG_WARN("slot_num illegal, slot_num(%d) > record_capacity(%d).", rid.slot_num, page_header_->record_capacity);
return RC::RECORD_INVALID_RID;
}
// 更新位图
Bitmap bitmap(bitmap_, page_header_->record_capacity);
bitmap.set_bit(rid->slot_num);
page_header_->record_num++;
if (!bitmap.get_bit(rid.slot_num)) {
bitmap.set_bit(rid.slot_num);
page_header_->record_num++;
}
// 恢复数据
char *record_data = get_record_data(rid->slot_num);
char *record_data = get_record_data(rid.slot_num);
memcpy(record_data, data, page_header_->record_real_size);
frame_->mark_dirty();
......@@ -278,7 +279,7 @@ RC RecordPageHandler::get_record(const RID *rid, Record *rec)
}
rec->set_rid(*rid);
rec->set_data(get_record_data(rid->slot_num));
rec->set_data(get_record_data(rid->slot_num), page_header_->record_real_size);
return RC::SUCCESS;
}
......@@ -415,15 +416,15 @@ RC RecordFileHandler::insert_record(const char *data, int record_size, RID *rid)
return record_page_handler.insert_record(data, rid);
}
RC RecordFileHandler::recover_insert_record(const char *data, int record_size, RID *rid)
RC RecordFileHandler::recover_insert_record(const char *data, int record_size, const RID &rid)
{
RC ret = RC::SUCCESS;
RecordPageHandler record_page_handler;
ret = record_page_handler.recover_init(*disk_buffer_pool_, rid->page_num);
ret = record_page_handler.recover_init(*disk_buffer_pool_, rid.page_num);
if (ret != RC::SUCCESS) {
LOG_WARN("failed to init record page handler. page num=%d, rc=%d:%s", rid->page_num, ret, strrc(ret));
LOG_WARN("failed to init record page handler. page num=%d, rc=%s", rid.page_num, strrc(ret));
return ret;
}
......@@ -563,13 +564,12 @@ RC RecordFileScanner::fetch_next_record()
// 所有的页面都遍历完了,没有数据了
next_record_.rid().slot_num = -1;
record_page_handler_.cleanup();
return RC::RECORD_EOF;
}
/**
* @brief 遍历当前页面,尝试找到一条有效的记录
*
* @return RC
*/
RC RecordFileScanner::fetch_next_record_in_page()
{
......
......@@ -26,7 +26,10 @@ class Trx;
class Table;
/**
* 这里负责管理在一个文件上表记录(行)的组织/管理
* @defgroup RecordManager
* @file record_manager.h
*
* @brief 这里负责管理在一个文件上表记录(行)的组织/管理
*
* 表记录管理的内容包括如何在文件上存放、读取、检索。也就是记录的增删改查。
* 这里的文件都会被拆分成页面,每个页面都有一样的大小。更详细的信息可以参考BufferPool。
......@@ -59,15 +62,17 @@ class Table;
*/
struct PageHeader
{
int32_t record_num; // 当前页面记录的个数
int32_t record_capacity; // 最大记录个数
int32_t record_num; /// 当前页面记录的个数
int32_t record_capacity; /// 最大记录个数
int32_t record_real_size; // 每条记录的实际大小
int32_t record_size; // 每条记录占用实际空间大小(可能对齐)
int32_t first_record_offset; // 第一条记录的偏移量
};
/**
* 遍历一个页面中每条记录的iterator
* @ingroup RecordManager
*
* @brief 遍历一个页面中每条记录的iterator
*/
class RecordPageIterator
{
......@@ -96,14 +101,12 @@ private:
};
/**
* 负责处理一个页面中各种操作,比如插入记录、删除记录或者查找记录
* @brief 负责处理一个页面中各种操作,比如插入记录、删除记录或者查找记录
*
* 当前定长记录模式下每个页面的组织大概是这样的:
* |-------------------------------------|
* | PageHeader | record allocate bitmap |
* |-------------------------------------|
* |------------|------------------------|
* | record1 | record2 | ..... | recordN |
* |-------------------------------------|
*/
class RecordPageHandler
{
......@@ -120,7 +123,12 @@ public:
*/
RC init(DiskBufferPool &buffer_pool, PageNum page_num, bool readonly);
/// 当前所有的recover函数都没有使用,可以忽略
/**
* @brief 数据库恢复时,与普通的运行场景有所不同,不做任何并发操作,也不需要加锁
*
* @param buffer_pool 关联某个文件时,都通过buffer pool来做读写文件
* @param page_num 操作的页面编号
*/
RC recover_init(DiskBufferPool &buffer_pool, PageNum page_num);
/**
......@@ -144,7 +152,14 @@ public:
* @param rid 如果插入成功,通过这个参数返回插入的位置
*/
RC insert_record(const char *data, RID *rid);
RC recover_insert_record(const char *data, RID *rid);
/**
* @brief 数据库恢复时,在指定位置插入数据
*
* @param data 要插入的数据行
* @param rid 插入的位置
*/
RC recover_insert_record(const char *data, const RID &rid);
/**
* @brief 删除指定的记录
......@@ -175,11 +190,11 @@ protected:
}
protected:
DiskBufferPool *disk_buffer_pool_ = nullptr; // 当前操作的buffer pool(文件)
bool readonly_ = false; // 当前的操作是否都是只读的
Frame *frame_ = nullptr; // 当前操作页面关联的frame(frame的更多概念可以参考buffer pool和frame)
PageHeader *page_header_ = nullptr; // 当前页面上页面头
char *bitmap_ = nullptr; // 当前页面上record分配状态信息bitmap内存起始位置
DiskBufferPool *disk_buffer_pool_ = nullptr; /// 当前操作的buffer pool(文件)
bool readonly_ = false; /// 当前的操作是否都是只读的
Frame *frame_ = nullptr; /// 当前操作页面关联的frame(frame的更多概念可以参考buffer pool和frame)
PageHeader *page_header_ = nullptr; /// 当前页面上页面头
char *bitmap_ = nullptr; /// 当前页面上record分配状态信息bitmap内存起始位置
private:
friend class RecordPageIterator;
......@@ -216,7 +231,7 @@ public:
* 插入一个新的记录到指定文件中,data为指向新纪录内容的指针,返回该记录的标识符rid
*/
RC insert_record(const char *data, int record_size, RID *rid);
RC recover_insert_record(const char *data, int record_size, RID *rid);
RC recover_insert_record(const char *data, int record_size, const RID &rid);
/**
* 获取指定文件中标识符为rid的记录内容到rec指向的记录结构中
......@@ -247,8 +262,8 @@ private:
private:
DiskBufferPool *disk_buffer_pool_ = nullptr;
std::unordered_set<PageNum> free_pages_; // 没有填充满的页面集合
common::Mutex lock_; // 当编译时增加-DCONCURRENCY=ON 选项时,才会真正的支持并发
std::unordered_set<PageNum> free_pages_; /// 没有填充满的页面集合
common::Mutex lock_; /// 当编译时增加-DCONCURRENCY=ON 选项时,才会真正的支持并发
};
/**
......@@ -285,14 +300,15 @@ private:
RC fetch_next_record_in_page();
private:
Table *table_ = nullptr; // 当前遍历的是哪张表。这个字段仅供事务函数使用,如果设计合适,可以去掉
DiskBufferPool *disk_buffer_pool_ = nullptr; // 当前访问的文件
Trx *trx_ = nullptr; // 当前是哪个事务在遍历
bool readonly_ = false; // 遍历出来的数据,是否可能对它做修改
BufferPoolIterator bp_iterator_; // 遍历buffer pool的所有页面
ConditionFilter *condition_filter_ = nullptr; // 过滤record
// TODO 对于一个纯粹的record遍历器来说,不应该关心表和事务
Table *table_ = nullptr; /// 当前遍历的是哪张表。这个字段仅供事务函数使用,如果设计合适,可以去掉
DiskBufferPool *disk_buffer_pool_ = nullptr; /// 当前访问的文件
Trx *trx_ = nullptr; /// 当前是哪个事务在遍历
bool readonly_ = false; /// 遍历出来的数据,是否可能对它做修改
BufferPoolIterator bp_iterator_; /// 遍历buffer pool的所有页面
ConditionFilter *condition_filter_ = nullptr; /// 过滤record
RecordPageHandler record_page_handler_;
RecordPageIterator record_page_iterator_; // 遍历某个页面上的所有record
RecordPageIterator record_page_iterator_; /// 遍历某个页面上的所有record
Record next_record_;
};
......@@ -15,9 +15,22 @@ See the Mulan PSL v2 for more details. */
#include <limits>
#include "storage/trx/mvcc_trx.h"
#include "storage/common/field.h"
#include "storage/clog/clog.h"
#include "storage/common/db.h"
#include "storage/clog/clog.h"
using namespace std;
/**
 * @brief Deletes every transaction object that is still registered in trxes_.
 * @details The registry is swapped into a local vector first, so trxes_ is already
 *          empty while the individual transaction objects are being deleted.
 */
MvccTrxKit::~MvccTrxKit()
{
  vector<Trx *> pending;
  pending.swap(trxes_);

  for (auto it = pending.begin(); it != pending.end(); ++it) {
    delete *it;
  }
}
RC MvccTrxKit::init()
{
fields_ = vector<FieldMeta>{
......@@ -44,16 +57,76 @@ int32_t MvccTrxKit::max_trx_id() const
return numeric_limits<int32_t>::max();
}
Trx *MvccTrxKit::create_trx()
/**
 * @brief Creates a new MVCC transaction bound to the given log manager and registers it.
 *
 * @param log_manager forwarded to the MvccTrx constructor
 * @return the newly created transaction; it has also been appended to trxes_ under lock_
 */
Trx *MvccTrxKit::create_trx(CLogManager *log_manager)
{
  Trx *created = new MvccTrx(*this, log_manager);
  if (nullptr == created) {
    return created;
  }

  // The registry is shared between callers, so it is only modified while holding lock_.
  lock_.lock();
  trxes_.push_back(created);
  lock_.unlock();
  return created;
}
/**
 * @brief Creates a transaction object with a predetermined transaction id and registers it.
 * @details Uses the MvccTrx constructor that takes a trx id (marked "used for recover" in the
 *          class declaration). current_trx_id_ is raised to trx_id when it is smaller.
 *
 * @param trx_id id the new transaction object carries
 * @return the newly created transaction
 */
Trx *MvccTrxKit::create_trx(int32_t trx_id)
{
  Trx *recovered = new MvccTrx(*this, trx_id);
  if (nullptr == recovered) {
    return recovered;
  }

  lock_.lock();
  trxes_.push_back(recovered);
  // Keep the id counter at least as large as any id that has been handed in explicitly.
  if (trx_id > current_trx_id_) {
    current_trx_id_ = trx_id;
  }
  lock_.unlock();
  return recovered;
}
/**
 * @brief Unregisters a transaction from this kit and deletes it.
 *
 * @param trx transaction to remove from trxes_ and delete
 */
void MvccTrxKit::destroy_trx(Trx *trx)
{
lock_.lock();
// Unqualified call; presumably resolves to std::erase for std::vector (C++20) via `using namespace std` -- TODO confirm.
erase(trxes_, trx);
lock_.unlock();
// The object is deleted after lock_ has been released.
delete trx;
}
Trx *MvccTrxKit::find_trx(int32_t trx_id)
{
return new MvccTrx(*this);
lock_.lock();
for (Trx *trx : trxes_) {
if (trx->id() == trx_id) {
lock_.unlock();
return trx;
}
}
lock_.unlock();
return nullptr;
}
/**
 * @brief Copies the list of currently registered transactions.
 *
 * @param trxes output; overwritten with a copy of trxes_ taken while holding lock_
 */
void MvccTrxKit::all_trxes(std::vector<Trx *> &trxes)
{
lock_.lock();
trxes = trxes_;
lock_.unlock();
}
////////////////////////////////////////////////////////////////////////////////
MvccTrx::MvccTrx(MvccTrxKit &kit) : trx_kit_(kit)
/**
 * @brief Constructs a transaction that keeps a reference to its kit and a pointer to the log manager.
 *
 * @param kit         the MvccTrxKit this transaction belongs to
 * @param log_manager stored in log_manager_
 */
MvccTrx::MvccTrx(MvccTrxKit &kit, CLogManager *log_manager) : trx_kit_(kit), log_manager_(log_manager)
{}
/**
 * @brief Constructs a transaction with a known id that is already started and in recovering mode.
 *
 * @param kit    the MvccTrxKit this transaction belongs to
 * @param trx_id id assigned to this transaction
 */
MvccTrx::MvccTrx(MvccTrxKit &kit, int32_t trx_id)
    : trx_kit_(kit), trx_id_(trx_id), started_(true), recovering_(true)
{}
/// Destructor; the body is empty, so no explicit cleanup is performed here.
MvccTrx::~MvccTrx()
{
}
RC MvccTrx::insert_record(Table *table, Record &record)
{
Field begin_field;
......@@ -69,6 +142,10 @@ RC MvccTrx::insert_record(Table *table, Record &record)
return rc;
}
rc = log_manager_->append_log(CLogType::INSERT, trx_id_, table->table_id(), record.rid(), record.len(), 0/*offset*/, record.data());
ASSERT(rc == RC::SUCCESS, "failed to append insert record log. trx id=%d, table id=%d, rid=%s, record len=%d, rc=%s",
trx_id_, table->table_id(), record.rid().to_string().c_str(), record.len(), strrc(rc));
pair<OperationSet::iterator, bool> ret =
operations_.insert(Operation(Operation::Type::INSERT, table, record.rid()));
if (!ret.second) {
......@@ -86,8 +163,17 @@ RC MvccTrx::delete_record(Table * table, Record &record)
[[maybe_unused]] int32_t end_xid = end_field.get_int(record);
/// 在删除之前,第一次获取record时,就已经对record做了对应的检查,并且保证不会有其它的事务来访问这条数据
ASSERT(end_xid == trx_kit_.max_trx_id(), "cannot delete an old version record. end_xid=%d", end_xid);
ASSERT(end_xid > 0, "concurrency conflit: other transaction is updating this record. end_xid=%d, current trx id=%d, rid=%s",
end_xid, trx_id_, record.rid().to_string().c_str());
if (end_xid != trx_kit_.max_trx_id()) {
// 当前不是多版本数据中的最新记录,不需要删除
return RC::SUCCESS;
}
end_field.set_int(record, -trx_id_);
RC rc = log_manager_->append_log(CLogType::DELETE, trx_id_, table->table_id(), record.rid(), 0, 0, nullptr);
ASSERT(rc == RC::SUCCESS, "failed to append delete record log. trx id=%d, table id=%d, rid=%s, record len=%d, rc=%s",
trx_id_, table->table_id(), record.rid().to_string().c_str(), record.len(), strrc(rc));
operations_.insert(Operation(Operation::Type::DELETE, table, record.rid()));
......@@ -152,26 +238,33 @@ RC MvccTrx::start_if_need()
if (!started_) {
ASSERT(operations_.empty(), "try to start a new trx while operations is not empty");
trx_id_ = trx_kit_.next_trx_id();
LOG_DEBUG("current thread change to new trx with %d", trx_id_);
RC rc = log_manager_->begin_trx(trx_id_);
ASSERT(rc == RC::SUCCESS, "failed to append log to clog. rc=%s", strrc(rc));
started_ = true;
}
return RC::SUCCESS;
}
/**
 * @brief Commits this transaction using a commit id freshly obtained from the kit.
 *
 * @return the result of commit_with_trx_id()
 */
RC MvccTrx::commit()
{
  return commit_with_trx_id(trx_kit_.next_trx_id());
}
RC MvccTrx::commit_with_trx_id(int32_t commit_xid)
{
// TODO 这里存在一个很大的问题,不能让其他事务一次性看到当前事务更新到的数据或同时看不到
RC rc = RC::SUCCESS;
started_ = false;
int32_t commit_xid = trx_kit_.next_trx_id();
for (const Operation &operation : operations_) {
switch (operation.type()) {
case Operation::Type::INSERT: {
Record record;
RID rid(operation.page_num(), operation.slot_num());
Table *table = operation.table();
Field begin_xid_field, end_xid_field;
trx_fields(operation.table(), begin_xid_field, end_xid_field);
trx_fields(table, begin_xid_field, end_xid_field);
auto record_updater = [ this, &begin_xid_field, commit_xid](Record &record) {
(void)this;
......@@ -188,11 +281,11 @@ RC MvccTrx::commit()
} break;
case Operation::Type::DELETE: {
Record record;
Table *table = operation.table();
RID rid(operation.page_num(), operation.slot_num());
Field begin_xid_field, end_xid_field;
trx_fields(operation.table(), begin_xid_field, end_xid_field);
trx_fields(table, begin_xid_field, end_xid_field);
auto record_updater = [this, &end_xid_field, commit_xid](Record &record) {
(void)this;
......@@ -215,6 +308,11 @@ RC MvccTrx::commit()
}
operations_.clear();
if (!recovering_) {
rc = log_manager_->commit_trx(trx_id_, commit_xid);
}
LOG_TRACE("append trx commit log. trx id=%d, commit_xid=%d, rc=%s", trx_id_, commit_xid, strrc(rc));
return rc;
}
......@@ -229,7 +327,8 @@ RC MvccTrx::rollback()
RID rid(operation.page_num(), operation.slot_num());
Record record;
Table *table = operation.table();
// TODO 这里虽然调用get_record好像多次一举,而且看起来放在table的实现中更好,但是实际上trx应该记录下来自己曾经插入过的数据
// TODO 这里虽然调用get_record好像多此一举,而且看起来放在table的实现中更好,
// 而且实际上trx应该记录下来自己曾经插入过的数据
// 也就是不需要从table中获取这条数据,可以直接从当前内存中获取
// 这里也可以不删除,仅仅给数据加个标识位,等垃圾回收器来收割也行
rc = table->get_record(rid, record);
......@@ -241,23 +340,25 @@ RC MvccTrx::rollback()
} break;
case Operation::Type::DELETE: {
Record record;
Table *table = operation.table();
RID rid(operation.page_num(), operation.slot_num());
ASSERT(rc == RC::SUCCESS, "failed to get record while rollback. rid=%s, rc=%s",
rid.to_string().c_str(), strrc(rc));
Field begin_xid_field, end_xid_field;
trx_fields(operation.table(), begin_xid_field, end_xid_field);
trx_fields(table, begin_xid_field, end_xid_field);
auto record_updater = [this, &end_xid_field](Record &record) {
ASSERT(end_xid_field.get_int(record) == -trx_id_,
"got an invalid record while rollback. end xid=%d, this trx id=%d",
end_xid_field.get_int(record), trx_id_);
"got an invalid record while rollback. end xid=%d, this trx id=%d",
end_xid_field.get_int(record), trx_id_);
end_xid_field.set_int(record, trx_kit_.max_trx_id());
};
rc = operation.table()->visit_record(rid, false/*readonly*/, record_updater);
rc = table->visit_record(rid, false/*readonly*/, record_updater);
ASSERT(rc == RC::SUCCESS, "failed to get record while committing. rid=%s, rc=%s",
rid.to_string().c_str(), strrc(rc));
} break;
default: {
......@@ -267,5 +368,93 @@ RC MvccTrx::rollback()
}
operations_.clear();
if (!recovering_) {
rc = log_manager_->rollback_trx(trx_id_);
}
LOG_TRACE("append trx rollback log. trx id=%d, rc=%s", trx_id_, strrc(rc));
return rc;
}
/**
 * @brief Looks up the table referenced by an INSERT or DELETE log record.
 * @details For every other log type nothing is looked up: `table` is left untouched and
 *          RC::SUCCESS is returned.
 *
 * @param db         database used to resolve the table id
 * @param log_record log record being examined
 * @param table      output; set to the result of db->find_table() for INSERT/DELETE records
 * @return RC::SUCCESS, or RC::SCHEMA_TABLE_NOT_EXIST when the table id cannot be resolved
 */
RC find_table(Db *db, const CLogRecord &log_record, Table *&table)
{
  const auto type = clog_type_from_integer(log_record.header().type_);
  if (type != CLogType::INSERT && type != CLogType::DELETE) {
    // Only data records carry a table id.
    return RC::SUCCESS;
  }

  const CLogRecordData &data_record = log_record.data_record();
  table = db->find_table(data_record.table_id_);
  if (table != nullptr) {
    return RC::SUCCESS;
  }

  LOG_WARN("no such table to redo. table id=%d, log record=%s",
      data_record.table_id_, log_record.to_string().c_str());
  return RC::SCHEMA_TABLE_NOT_EXIST;
}
/**
 * @brief Replays a single log record on this transaction.
 * @details
 *  - INSERT: re-inserts the logged row at the logged RID and records an INSERT operation.
 *  - DELETE: sets the record's end-xid field to -trx_id_ and records a DELETE operation.
 *  - MTR_COMMIT: commits with the commit xid stored in the log record.
 *  - MTR_ROLLBACK: rolls the transaction back.
 *  Any failure of an individual step is returned to the caller instead of being dropped,
 *  so the caller can decide how to proceed.
 *
 * @param db         database used to resolve the table a data record refers to
 * @param log_record the log record to replay
 * @return RC::SUCCESS on success, RC::INTERNAL for an unsupported log type, otherwise the
 *         error code of the step that failed
 */
RC MvccTrx::redo(Db *db, const CLogRecord &log_record)
{
  Table *table = nullptr;
  RC rc = find_table(db, log_record, table);
  if (OB_FAIL(rc)) {
    return rc;
  }

  switch (log_record.log_type()) {
    case CLogType::INSERT: {
      const CLogRecordData &data_record = log_record.data_record();

      // The record only borrows the buffer owned by the log record; nothing is copied here.
      Record record;
      record.set_data(const_cast<char *>(data_record.data_), data_record.data_len_);
      record.set_rid(data_record.rid_);

      rc = table->recover_insert_record(record);
      if (OB_FAIL(rc)) {
        LOG_WARN("failed to recover insert. table=%s, log record=%s, rc=%s",
            table->name(), log_record.to_string().c_str(), strrc(rc));
        return rc;
      }
      operations_.insert(Operation(Operation::Type::INSERT, table, record.rid()));
    } break;

    case CLogType::DELETE: {
      const CLogRecordData &data_record = log_record.data_record();

      Field begin_field;
      Field end_field;
      trx_fields(table, begin_field, end_field);

      auto record_updater = [this, &end_field](Record &record) {
        (void)this;
        ASSERT(end_field.get_int(record) == trx_kit_.max_trx_id(),
            "got an invalid record while committing. end xid=%d, this trx id=%d",
            end_field.get_int(record), trx_id_);
        end_field.set_int(record, -trx_id_);
      };

      rc = table->visit_record(data_record.rid_, false/*readonly*/, record_updater);
      ASSERT(rc == RC::SUCCESS, "failed to get record while committing. rid=%s, rc=%s",
          data_record.rid_.to_string().c_str(), strrc(rc));
      // The assertion above may be compiled out, so also stop here explicitly: a delete
      // that was not applied must not be recorded as an operation of this transaction.
      if (OB_FAIL(rc)) {
        LOG_WARN("failed to recover delete. table=%s, log record=%s, rc=%s",
            table->name(), log_record.to_string().c_str(), strrc(rc));
        return rc;
      }
      operations_.insert(Operation(Operation::Type::DELETE, table, data_record.rid_));
    } break;

    case CLogType::MTR_COMMIT: {
      const CLogRecordCommitData &commit_record = log_record.commit_record();
      rc = commit_with_trx_id(commit_record.commit_xid_);
      if (OB_FAIL(rc)) {
        LOG_WARN("failed to redo commit. log record=%s, rc=%s", log_record.to_string().c_str(), strrc(rc));
        return rc;
      }
    } break;

    case CLogType::MTR_ROLLBACK: {
      rc = rollback();
      if (OB_FAIL(rc)) {
        LOG_WARN("failed to redo rollback. log record=%s, rc=%s", log_record.to_string().c_str(), strrc(rc));
        return rc;
      }
    } break;

    default: {
      ASSERT(false, "unsupported redo log. log_record=%s", log_record.to_string().c_str());
      return RC::INTERNAL;
    } break;
  }

  return RC::SUCCESS;
}
......@@ -14,18 +14,32 @@ See the Mulan PSL v2 for more details. */
#pragma once
#include <vector>
#include "storage/trx/trx.h"
class CLogManager;
class MvccTrxKit : public TrxKit
{
public:
MvccTrxKit() = default;
virtual ~MvccTrxKit() = default;
virtual ~MvccTrxKit();
RC init() override;
const std::vector<FieldMeta> *trx_fields() const override;
Trx *create_trx() override;
Trx *create_trx(CLogManager *log_manager) override;
Trx *create_trx(int32_t trx_id) override;
void destroy_trx(Trx *trx) override;
/**
* @brief 找到对应事务号的事务
* @details 当前仅在recover场景下使用
*/
Trx *find_trx(int32_t trx_id) override;
void all_trxes(std::vector<Trx *> &trxes) override;
public:
int32_t next_trx_id();
public:
......@@ -35,6 +49,9 @@ private:
std::vector<FieldMeta> fields_; // 存储事务数据需要用到的字段元数据,所有表结构都需要带的
std::atomic<int32_t> current_trx_id_{0};
common::Mutex lock_;
std::vector<Trx *> trxes_;
};
/**
......@@ -44,8 +61,9 @@ private:
class MvccTrx : public Trx
{
public:
MvccTrx(MvccTrxKit &trx_kit);
virtual ~MvccTrx() = default;
MvccTrx(MvccTrxKit &trx_kit, CLogManager *log_manager);
MvccTrx(MvccTrxKit &trx_kit, int32_t trx_id); // used for recover
virtual ~MvccTrx();
RC insert_record(Table *table, Record &record) override;
RC delete_record(Table *table, Record &record) override;
......@@ -66,7 +84,12 @@ public:
RC commit() override;
RC rollback() override;
RC redo(Db *db, const CLogRecord &log_record) override;
int32_t id() const override { return trx_id_; }
private:
RC commit_with_trx_id(int32_t commit_id);
void trx_fields(Table *table, Field &begin_xid_field, Field &end_xid_field) const;
private:
......@@ -74,8 +97,10 @@ private:
private:
using OperationSet = std::unordered_set<Operation, OperationHasher, OperationEqualer>;
MvccTrxKit &trx_kit_;
int32_t trx_id_;
bool started_ = false;
MvccTrxKit & trx_kit_;
CLogManager *log_manager_ = nullptr;
int32_t trx_id_ = -1;
bool started_ = false;
bool recovering_ = false;
OperationSet operations_;
};
......@@ -53,4 +53,9 @@ RC TrxKit::init_global(const char *name)
TrxKit *TrxKit::instance()
{
return global_trxkit;
}
\ No newline at end of file
}
/**
 * @brief Default redo implementation: ignores both arguments and reports that redo is not implemented.
 *
 * @return RC::UNIMPLENMENT, always
 */
RC Trx::redo(Db *db, const CLogRecord &)
{
return RC::UNIMPLENMENT;
}
......@@ -25,6 +25,20 @@ See the Mulan PSL v2 for more details. */
#include "storage/common/table.h"
#include "rc.h"
/**
* @defgroup Transaction 事务模块
* @brief 描述事务相关的代码
*/
class Db;
class CLogManager;
class CLogRecord;
class Trx;
/**
* @brief 描述一个操作,比如插入、删除行等
* @details 通常包含一个操作的类型,以及操作的对象和具体的数据
*/
class Operation
{
public:
......@@ -52,6 +66,7 @@ public:
private:
Type type_;
/// 操作的哪张表。这里直接使用表其实并不准确,因为表中的索引也可能有日志
Table * table_ = nullptr;
PageNum page_num_; // TODO use RID instead of page num and slot num
SlotNum slot_num_;
......@@ -91,7 +106,12 @@ public:
virtual RC init() = 0;
virtual const std::vector<FieldMeta> *trx_fields() const = 0;
virtual Trx *create_trx() = 0;
virtual Trx *create_trx(CLogManager *log_manager) = 0;
virtual Trx *create_trx(int32_t trx_id) = 0;
virtual Trx *find_trx(int32_t trx_id) = 0;
virtual void all_trxes(std::vector<Trx *> &trxes) = 0;
virtual void destroy_trx(Trx *trx) = 0;
public:
static TrxKit *create(const char *name);
......@@ -112,4 +132,8 @@ public:
virtual RC start_if_need() = 0;
virtual RC commit() = 0;
virtual RC rollback() = 0;
virtual RC redo(Db *db, const CLogRecord &log_record);
virtual int32_t id() const = 0;
};
......@@ -26,11 +26,29 @@ const vector<FieldMeta> *VacuousTrxKit::trx_fields() const
return nullptr;
}
Trx *VacuousTrxKit::create_trx()
/// Creates a VacuousTrx on the heap; the log manager argument is ignored.
Trx *VacuousTrxKit::create_trx(CLogManager *)
{
return new VacuousTrx;
}
/// Creating a transaction from an explicit trx id is not supported by this kit: always returns nullptr.
Trx *VacuousTrxKit::create_trx(int32_t /*trx_id*/)
{
return nullptr;
}
/// Does nothing; the transaction object passed in is not deleted here.
/// NOTE(review): create_trx(CLogManager *) allocates with `new` -- confirm that callers release the object elsewhere.
void VacuousTrxKit::destroy_trx(Trx *)
{}
/// This kit keeps no registry of transactions: always returns nullptr.
Trx *VacuousTrxKit::find_trx(int32_t /* trx_id */)
{
return nullptr;
}
/// Leaves the output vector untouched; nothing is written to `trxes`.
void VacuousTrxKit::all_trxes(std::vector<Trx *> &trxes)
{
return;
}
////////////////////////////////////////////////////////////////////////////////
RC VacuousTrx::insert_record(Table *table, Record &record)
......
......@@ -27,7 +27,12 @@ public:
RC init() override;
const std::vector<FieldMeta> *trx_fields() const override;
Trx *create_trx() override;
Trx *create_trx(CLogManager *log_manager) override;
Trx *create_trx(int32_t trx_id) override;
Trx *find_trx(int32_t trx_id) override;
void all_trxes(std::vector<Trx *> &trxes) override;
void destroy_trx(Trx *trx) override;
};
class VacuousTrx : public Trx
......@@ -42,4 +47,6 @@ public:
RC start_if_need() override;
RC commit() override;
RC rollback() override;
int32_t id() const override { return 0; }
};
# Build script for the clog_reader executable (single source file: clog_reader_cmd.cpp),
# linked against observer_static.
ADD_EXECUTABLE(clog_reader)
MESSAGE("Begin to build clog_reader")
SET(CLOG_READER_SRC clog_reader_cmd.cpp)
# Location of the generated executable
SET(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/bin)
MESSAGE("Binary directory:" ${EXECUTABLE_OUTPUT_PATH})
TARGET_SOURCES(clog_reader PRIVATE ${CLOG_READER_SRC})
TARGET_LINK_LIBRARIES(clog_reader observer_static)
TARGET_INCLUDE_DIRECTORIES(clog_reader PRIVATE . ${PROJECT_SOURCE_DIR}/src/observer/ ${PROJECT_SOURCE_DIR}/deps)
TARGET_LINK_DIRECTORIES(clog_reader PUBLIC /usr/local/lib)
# INSTALL(TARGETS ...) must come after the ADD_EXECUTABLE that defines the target; PROGRAMS has no such restriction.
# TARGETS and PROGRAMS default to OWNER_EXECUTE, GROUP_EXECUTE and WORLD_EXECUTE permissions (i.e. 755); PROGRAMS is meant for scripts.
# Install target kinds are RUNTIME / LIBRARY / ARCHIVE.
INSTALL(TARGETS clog_reader RUNTIME DESTINATION bin)
/* Copyright (c) 2021-2022 OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by wangyunlai.wyl on 2023/06/07
//
#include <inttypes.h>
#include "storage/clog/clog.h"
using namespace std;
void dump(const char *filename)
{
CLogFile file;
RC rc = file.init(filename);
if (OB_FAIL(rc)) {
printf("failed to open file: '%s'. syserr=%s, rc=%s\n", filename, strerror(errno), strrc(rc));
return;
}
CLogRecordIterator iterator;
rc = iterator.init(file);
if (OB_FAIL(rc)) {
printf("failed to init iterator. rc=%s\n", strrc(rc));
return;
}
int64_t offset = 0;
int index = 0;
for (index++, rc = iterator.next(); OB_SUCC(rc) && iterator.valid(); rc = iterator.next(), ++index) {
const CLogRecord &log_record = iterator.log_record();
printf("index:%d, file_offset:%" PRId64 ", %s\n", index, offset, log_record.to_string().c_str());
(void)file.offset(offset);
}
if (rc != RC::RECORD_EOF) {
printf("something error. error=%s\n", strrc(rc));
}
}
/**
 * @brief Entry point: dumps the clog file named by the first command-line argument.
 *
 * @return 0 after dumping, 1 when no file name was given
 */
int main(int argc, char *argv[])
{
  if (argc >= 2) {
    dump(argv[1]);
    return 0;
  }

  printf("please give me a clog file name\n");
  return 1;
}
PROJECT(unittest)
MESSAGE("Begin to build " ${PROJECT_NAME})
MESSAGE(STATUS "This is PROJECT_BINARY_DIR dir " ${PROJECT_BINARY_DIR})
MESSAGE(STATUS "This is PROJECT_SOURCE_DIR dir " ${PROJECT_SOURCE_DIR})
# 可以获取父cmake的变量
MESSAGE("${CMAKE_COMMON_FLAGS}")
#INCLUDE_DIRECTORIES([AFTER|BEFORE] [SYSTEM] dir1 dir2 ...)
INCLUDE_DIRECTORIES(. ${PROJECT_SOURCE_DIR}/../deps ${PROJECT_SOURCE_DIR}/../src/observer /usr/local/include SYSTEM)
INCLUDE_DIRECTORIES(. ${PROJECT_SOURCE_DIR}/deps ${PROJECT_SOURCE_DIR}/src/observer /usr/local/include)
# 父cmake 设置的include_directories 和link_directories并不传导到子cmake里面
#INCLUDE_DIRECTORIES(BEFORE ${CMAKE_INSTALL_PREFIX}/include)
LINK_DIRECTORIES(/usr/local/lib /usr/local/lib64 ${PROJECT_BINARY_DIR}/../lib)
LINK_DIRECTORIES(/usr/local/lib /usr/local/lib64 ${PROJECT_BINARY_DIR}/lib)
IF (DEFINED ENV{LD_LIBRARY_PATH})
......
......@@ -20,93 +20,17 @@ See the Mulan PSL v2 for more details. */
using namespace common;
Record *gen_ins_record(int32_t page_num, int32_t slot_num, int data_len)
{
Record *rec = new Record();
char *data = new char[data_len];
rec->set_rid(page_num, slot_num);
memset(data, data_len, data_len);
rec->set_data(data);
return rec;
}
Record *gen_del_record(int32_t page_num, int32_t slot_num)
{
Record *rec = new Record();
rec->set_rid(page_num, slot_num);
return rec;
}
TEST(test_clog, test_clog)
{
const char *path = ".";
const char *clog_file = "./clog";
remove(clog_file);
CLogManager *log_mgr = new CLogManager("./");
CLogRecord *log_rec[6];
CLogRecord *log_mtr_rec = nullptr;
Record *rec = nullptr;
//
log_mgr->clog_gen_record(REDO_MTR_BEGIN, 1, log_mtr_rec);
log_mgr->clog_append_record(log_mtr_rec); // NOTE: 需要保留log_rec
// delete log_mtr_rec;
rec = gen_ins_record(1, 1, 100);
log_mgr->clog_gen_record(REDO_INSERT, 1, log_rec[0], "table1", 100, rec);
log_mgr->clog_append_record(log_rec[0]);
delete[] rec->data();
delete rec;
rec = gen_ins_record(1, 1, 120);
log_mgr->clog_gen_record(REDO_INSERT, 1, log_rec[1], "table2", 120, rec);
log_mgr->clog_append_record(log_rec[1]);
delete[] rec->data();
delete rec;
log_mgr->clog_gen_record(REDO_MTR_BEGIN, 2, log_mtr_rec);
log_mgr->clog_append_record(log_mtr_rec);
// delete log_mtr_rec;
rec = gen_ins_record(1, 1, 200);
log_mgr->clog_gen_record(REDO_INSERT, 1, log_rec[2], "table3", 200, rec);
log_mgr->clog_append_record(log_rec[2]);
delete[] rec->data();
delete rec;
rec = gen_ins_record(1, 2, 120);
log_mgr->clog_gen_record(REDO_INSERT, 2, log_rec[3], "table2", 120, rec);
log_mgr->clog_append_record(log_rec[3]);
delete[] rec->data();
delete rec;
rec = gen_ins_record(1, 2, 100);
log_mgr->clog_gen_record(REDO_INSERT, 2, log_rec[4], "table1", 100, rec);
log_mgr->clog_append_record(log_rec[4]);
delete[] rec->data();
delete rec;
log_mgr->clog_gen_record(REDO_MTR_COMMIT, 2, log_mtr_rec);
log_mgr->clog_append_record(log_mtr_rec);
// delete log_mtr_rec;
rec = gen_del_record(1, 1);
log_mgr->clog_gen_record(REDO_DELETE, 1, log_rec[5], "table1", 0, rec);
log_mgr->clog_append_record(log_rec[5]);
delete rec;
log_mgr->clog_gen_record(REDO_MTR_COMMIT, 1, log_mtr_rec);
log_mgr->clog_append_record(log_mtr_rec);
// delete log_mtr_rec;
log_mgr->recover();
CLogMTRManager *log_mtr_mgr = log_mgr->get_mtr_manager();
ASSERT_EQ(true, log_mtr_mgr->trx_commited[1]);
ASSERT_EQ(true, log_mtr_mgr->trx_commited[2]);
ASSERT_EQ(6, log_mtr_mgr->log_redo_list.size());
CLogManager log_mgr;
RC rc = log_mgr.init(path);
ASSERT_EQ(rc, RC::SUCCESS);
// TODO test
/*
// record 已经被删掉了,不能再访问
......@@ -119,7 +43,6 @@ TEST(test_clog, test_clog)
i++;
}
*/
delete log_mgr;
}
int main(int argc, char **argv)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册