From 39042c92ba75652f967a7a946cba453a4feca2bb Mon Sep 17 00:00:00 2001 From: wangyunlai Date: Sun, 25 Jun 2023 17:48:31 +0800 Subject: [PATCH] Network buffer (#198) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What problem were solved in this pull request? Issue Number: close #151 Problem: 当前在发送消息到客户端时,每个数据都会刷新socket buffer,效率很低 ### What is changed and how it works? 实现BufferedWriter,通过buffered writer将消息缓存在内存中,结果写完时或者buffer满时,才将结果真正的发送到客户端。 ### Other information --- docs/src/miniob-introduction.md | 28 +-- src/observer/common/rc.h | 2 +- src/observer/net/buffered_writer.cpp | 137 ++++++++++++ src/observer/net/buffered_writer.h | 72 ++++++ src/observer/net/communicator.cpp | 76 ++++--- src/observer/net/communicator.h | 51 +++-- src/observer/net/mysql_communicator.cpp | 258 ++++++++++++++++++---- src/observer/net/mysql_communicator.h | 53 +++-- src/observer/net/ring_buffer.cpp | 114 ++++++++++ src/observer/net/ring_buffer.h | 94 ++++++++ src/observer/storage/index/bplus_tree.cpp | 2 +- src/observer/storage/trx/trx.cpp | 8 +- unittest/ring_buffer_test.cpp | 127 +++++++++++ 13 files changed, 890 insertions(+), 132 deletions(-) create mode 100644 src/observer/net/buffered_writer.cpp create mode 100644 src/observer/net/buffered_writer.h create mode 100644 src/observer/net/ring_buffer.cpp create mode 100644 src/observer/net/ring_buffer.h create mode 100644 unittest/ring_buffer_test.cpp diff --git a/docs/src/miniob-introduction.md b/docs/src/miniob-introduction.md index 16239ab..4cb8c7e 100644 --- a/docs/src/miniob-introduction.md +++ b/docs/src/miniob-introduction.md @@ -1,4 +1,4 @@ -# OceanBase大赛miniob代码架构框架设计和说明 +# miniob代码架构框架设计和说明 # miniob代码结构说明 @@ -123,9 +123,6 @@ miniob采用TCP通信,纯文本模式,使用'\0'作为每个消息的终结 注意:测试程序也使用这种方法,***请不要修改协议,后台测试程序依赖这个协议***。 注意:返回的普通数据结果中不要包含'\0',也不支持转义处理。 -### 参赛/训练营建议 -在做miniob的题目时,不要做一个题目再看下一个题目,团队中多个同学分别做自己的题目时,也不要一直单独作战,因为完成课题时,需要修改的模块会有非常多的重叠,因此建议团队尽量统筹规划,避免代码冲突以及“越走越难”。 - # 参考 - 《数据库系统实现》 - 《数据库系统概念》 @@ -139,16 +136,6 @@ miniob采用TCP通信,纯文本模式,使用'\0'作为每个消息的终结 - [OceanBase开源网站](https://github.com/oceanbase/oceanbase) # 附录-编译安装测试 -## 编译环境 -miniob使用cmake管理,要求cmake版本至少3.10,编译的C++标准是C++20,所以使用的编译器需要支持C++20。 - -编译器推荐使用gcc或clang,使用Windows操作系统的同学,建议使用Linux虚拟机或docker编译,程序会最终在Linux操作系统上测试。 - -使用MacOS的同学,注意默认编译器是clang,即使使用命令gcc,实际编译器可能也是clang。 - -miniob 的开发环境依赖的组件比较多,搭建开发环境可能需要比较多的时间,同学们可以直接使用Docker来开发,具体参考 [MiniOB Docker 开发](https://hub.docker.com/r/oceanbase/miniob) - -NOTE:clang编译器有些表现与gcc不一致,官方测试后台使用gcc,如果后续测试出现编译错误,可以更换为gcc测试。 ## 编译 参考源码中 [如何构建MiniOB](./how_to_build.md) 文件。 @@ -189,16 +176,3 @@ obclient -s miniob.sock 其中 -s 使用指定unix socket 文件连接observer,如果启动observer时也指定了unix socket。 ![running-the-client](images/miniob-introduction-running-the-client.png) - -# FAQ - -- 命令没有加分号解析失败,比如 `help`命令 - -miniob 是功能非常简单的数据库,语法解析也做得非常简单。所有的命令都需要加上分号`;`。 - -- YYYY-m-d是否是正确的日期 -比如 2021-1-2 是否正确日期?是正确的。 -> 输出的时候要求一定要是 YYYY-mm-dd输出(2021-01-02),但是输入的时候,没有那么严格要求。 - -- 增加CPP/H 文件是否需要更新CMakeLists.txt -不需要。Cmake文件中使用自动探测的方式。加了新的源代码,再执行一遍 cmake .. 就可以自动探测到新加的源文件。 \ No newline at end of file diff --git a/src/observer/common/rc.h b/src/observer/common/rc.h index e7ea466..bd0ddc2 100644 --- a/src/observer/common/rc.h +++ b/src/observer/common/rc.h @@ -72,7 +72,7 @@ See the Mulan PSL v2 for more details. */ DEFINE_RC(FILE_WRITE) \ DEFINE_RC(LOGBUF_FULL) -enum RC +enum class RC { #define DEFINE_RC(name) name, DEFINE_RCS diff --git a/src/observer/net/buffered_writer.cpp b/src/observer/net/buffered_writer.cpp new file mode 100644 index 0000000..7232544 --- /dev/null +++ b/src/observer/net/buffered_writer.cpp @@ -0,0 +1,137 @@ +/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. +miniob is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. +You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. */ + +// +// Created by Wangyunlai on 2023/06/16. +// + +#include +#include + +#include "net/buffered_writer.h" + +using namespace std; + +BufferedWriter::BufferedWriter(int fd) + : fd_(fd), buffer_() +{} + +BufferedWriter::BufferedWriter(int fd, int32_t size) + : fd_(fd), buffer_(size) +{} + +BufferedWriter::~BufferedWriter() +{ + close(); +} + +RC BufferedWriter::close() +{ + if (fd_ < 0) { + return RC::SUCCESS; + } + + RC rc = flush(); + if (OB_FAIL(rc)) { + return rc; + } + + if (fd_ >= 0) { + ::close(fd_); + fd_ = -1; + } + + return RC::SUCCESS; +} + +RC BufferedWriter::write(const char *data, int32_t size, int32_t &write_size) +{ + if (fd_ < 0) { + return RC::INVALID_ARGUMENT; + } + + if (buffer_.remain() == 0) { + RC rc = flush_internal(size); + if (OB_FAIL(rc)) { + return rc; + } + } + + return buffer_.write(data, size, write_size); +} + +RC BufferedWriter::writen(const char *data, int32_t size) +{ + if (fd_ < 0) { + return RC::INVALID_ARGUMENT; + } + + int32_t write_size = 0; + while (write_size < size) { + int32_t tmp_write_size = 0; + RC rc = write(data + write_size, size - write_size, tmp_write_size); + if (OB_FAIL(rc)) { + return rc; + } + + write_size += tmp_write_size; + } + + return RC::SUCCESS; +} + +RC BufferedWriter::flush() +{ + if (fd_ < 0) { + return RC::INVALID_ARGUMENT; + } + + RC rc = RC::SUCCESS; + while (OB_SUCC(rc) && buffer_.size() > 0) { + rc = flush_internal(buffer_.size()); + } + return rc; +} + +RC BufferedWriter::flush_internal(int32_t size) +{ + if (fd_ < 0) { + return RC::INVALID_ARGUMENT; + } + + RC rc = RC::SUCCESS; + int32_t write_size = 0; + while (OB_SUCC(rc) && buffer_.size() > 0 && size > write_size) { + const char *buf = nullptr; + int32_t read_size = 0; + rc = buffer_.buffer(buf, read_size); + if (OB_FAIL(rc)) { + return rc; + } + + ssize_t tmp_write_size = 0; + while (tmp_write_size == 0) { + tmp_write_size = ::write(fd_, buf, read_size); + if (tmp_write_size < 0) { + if (errno == EAGAIN || errno == EINTR) { + tmp_write_size = 0; + continue; + } else { + return RC::IOERR_WRITE; + } + } + } + + write_size += tmp_write_size; + buffer_.forward(tmp_write_size); + } + + return rc; +} \ No newline at end of file diff --git a/src/observer/net/buffered_writer.h b/src/observer/net/buffered_writer.h new file mode 100644 index 0000000..7d0e9ad --- /dev/null +++ b/src/observer/net/buffered_writer.h @@ -0,0 +1,72 @@ +/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. +miniob is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. +You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. */ + +// +// Created by Wangyunlai on 2023/06/16. +// + +#pragma once + +#include "net/ring_buffer.h" + +/** + * @brief 支持以缓存模式写入数据到文件/socket + * @details 缓存使用ring buffer实现,当缓存满时会自动刷新缓存。 + * 看起来直接使用fdopen也可以实现缓存写,不过fdopen会在close时直接关闭fd。 + * @note 在执行close时,描述符fd并不会被关闭 + */ +class BufferedWriter +{ +public: + BufferedWriter(int fd); + BufferedWriter(int fd, int32_t size); + ~BufferedWriter(); + + /** + * @brief 关闭缓存 + */ + RC close(); + + /** + * @brief 写数据到文件/socket + * @details 缓存满会自动刷新缓存 + * @param data 要写入的数据 + * @param size 要写入的数据大小 + * @param write_size 实际写入的数据大小 + */ + RC write(const char *data, int32_t size, int32_t &write_size); + + /** + * @brief 写数据到文件/socket,全部写入成功返回成功 + * @details 与write的区别就是会尝试一直写直到写成成功或者有不可恢复的错误 + * @param data 要写入的数据 + * @param size 要写入的数据大小 + */ + RC writen(const char *data, int32_t size); + + /** + * @brief 刷新缓存 + * @details 将缓存中的数据全部写入文件/socket + */ + RC flush(); + +private: + /** + * @brief 刷新缓存 + * @details 期望缓存可以刷新size大小的数据,实际刷新的数据量可能小于size也可能大于size。 + * 通常是在缓存满的时候,希望刷新掉一部分数据,然后继续写入。 + * @param size 期望刷新的数据大小 + */ + RC flush_internal(int32_t size); + +private: + int fd_ = -1; + RingBuffer buffer_; +}; \ No newline at end of file diff --git a/src/observer/net/communicator.cpp b/src/observer/net/communicator.cpp index cbfac91..9ec0a1e 100644 --- a/src/observer/net/communicator.cpp +++ b/src/observer/net/communicator.cpp @@ -14,6 +14,7 @@ See the Mulan PSL v2 for more details. */ #include "net/communicator.h" #include "net/mysql_communicator.h" +#include "net/buffered_writer.h" #include "sql/expr/tuple.h" #include "event/session_event.h" #include "common/lang/mutex.h" @@ -25,6 +26,7 @@ RC Communicator::init(int fd, Session *session, const std::string &addr) fd_ = fd; session_ = session; addr_ = addr; + writer_ = new BufferedWriter(fd_); return RC::SUCCESS; } @@ -38,6 +40,11 @@ Communicator::~Communicator() delete session_; session_ = nullptr; } + + if (writer_ != nullptr) { + delete writer_; + writer_ = nullptr; + } } ///////////////////////////////////////////////////////////////////////////////// @@ -118,8 +125,8 @@ RC PlainCommunicator::write_state(SessionEvent *event, bool &need_disconnect) snprintf(buf, buf_size, "%s > %s\n", strrc(sql_result->return_code()), state_string.c_str()); } - int ret = common::writen(fd_, buf, strlen(buf) + 1); - if (ret != 0) { + RC rc = writer_->writen(buf, strlen(buf) + 1); + if (OB_FAIL(rc)) { LOG_WARN("failed to send data to client. err=%s", strerror(errno)); need_disconnect = true; delete[] buf; @@ -128,6 +135,8 @@ RC PlainCommunicator::write_state(SessionEvent *event, bool &need_disconnect) need_disconnect = false; delete[] buf; + + writer_->flush(); return RC::SUCCESS; } @@ -143,16 +152,17 @@ RC PlainCommunicator::write_result(SessionEvent *event, bool &need_disconnect) const char *response = "Unexpected error: no result"; int len = strlen(response); - int ret = common::writen(fd_, response, len); - if (ret < 0) { - LOG_ERROR("Failed to send data back to client. ret=%d, error=%s", ret, strerror(errno)); + RC rc = writer_->writen(response, len); + if (OB_FAIL(rc)) { + LOG_ERROR("Failed to send data back to client. ret=%s, error=%s", strrc(rc), strerror(errno)); - return RC::IOERR_WRITE; + return rc; } - ret = common::writen(fd_, &message_terminate, sizeof(message_terminate)); - if (ret < 0) { - LOG_ERROR("Failed to send data back to client. ret=%d, error=%s", ret, strerror(errno)); - return RC::IOERR_WRITE; + + rc = writer_->writen(&message_terminate, sizeof(message_terminate)); + if (OB_FAIL(rc)) { + LOG_ERROR("Failed to send data back to client. ret=%s, error=%s", strrc(rc), strerror(errno)); + return rc; } need_disconnect = false; @@ -179,30 +189,30 @@ RC PlainCommunicator::write_result(SessionEvent *event, bool &need_disconnect) if (nullptr != alias || alias[0] != 0) { if (0 != i) { const char *delim = " | "; - int ret = common::writen(fd_, delim, strlen(delim)); - if (ret != 0) { + rc = writer_->writen(delim, strlen(delim)); + if (OB_FAIL(rc)) { LOG_WARN("failed to send data to client. err=%s", strerror(errno)); - return RC::IOERR_WRITE; + return rc; } } int len = strlen(alias); - int ret = common::writen(fd_, alias, len); - if (ret != 0) { + rc = writer_->writen(alias, len); + if (OB_FAIL(rc)) { LOG_WARN("failed to send data to client. err=%s", strerror(errno)); sql_result->close(); - return RC::IOERR_WRITE; + return rc; } } } if (cell_num > 0) { char newline = '\n'; - int ret = common::writen(fd_, &newline, 1); - if (ret != 0) { + rc = writer_->writen(&newline, 1); + if (OB_FAIL(rc)) { LOG_WARN("failed to send data to client. err=%s", strerror(errno)); sql_result->close(); - return RC::IOERR_WRITE; + return rc; } } @@ -215,11 +225,11 @@ RC PlainCommunicator::write_result(SessionEvent *event, bool &need_disconnect) for (int i = 0; i < cell_num; i++) { if (i != 0) { const char *delim = " | "; - int ret = common::writen(fd_, delim, strlen(delim)); - if (ret != 0) { + rc = writer_->writen(delim, strlen(delim)); + if (OB_FAIL(rc)) { LOG_WARN("failed to send data to client. err=%s", strerror(errno)); sql_result->close(); - return RC::IOERR_WRITE; + return rc; } } @@ -233,20 +243,20 @@ RC PlainCommunicator::write_result(SessionEvent *event, bool &need_disconnect) std::stringstream ss; cell.to_string(ss); std::string cell_str = ss.str(); - int ret = common::writen(fd_, cell_str.data(), cell_str.size()); - if (ret != RC::SUCCESS) { + rc = writer_->writen(cell_str.data(), cell_str.size()); + if (OB_FAIL(rc)) { LOG_WARN("failed to send data to client. err=%s", strerror(errno)); sql_result->close(); - return RC::IOERR_WRITE; + return rc; } } char newline = '\n'; - int ret = common::writen(fd_, &newline, 1); - if (ret != 0) { + rc = writer_->writen(&newline, 1); + if (OB_FAIL(rc)) { LOG_WARN("failed to send data to client. err=%s", strerror(errno)); sql_result->close(); - return RC::IOERR_WRITE; + return rc; } } @@ -266,11 +276,11 @@ RC PlainCommunicator::write_result(SessionEvent *event, bool &need_disconnect) return write_state(event, need_disconnect); } else { - int ret = common::writen(fd_, &message_terminate, sizeof(message_terminate)); - if (ret < 0) { - LOG_ERROR("Failed to send data back to client. ret=%d, error=%s", ret, strerror(errno)); + rc = writer_->writen(&message_terminate, sizeof(message_terminate)); + if (OB_FAIL(rc)) { + LOG_ERROR("Failed to send data back to client. ret=%s, error=%s", strrc(rc), strerror(errno)); sql_result->close(); - return RC::IOERR_WRITE; + return rc; } need_disconnect = false; @@ -280,6 +290,8 @@ RC PlainCommunicator::write_result(SessionEvent *event, bool &need_disconnect) if (OB_SUCC(rc)) { rc = rc_close; } + + writer_->flush(); // TODO handle error return rc; } diff --git a/src/observer/net/communicator.h b/src/observer/net/communicator.h index 774a424..9752373 100644 --- a/src/observer/net/communicator.h +++ b/src/observer/net/communicator.h @@ -21,32 +21,41 @@ See the Mulan PSL v2 for more details. */ struct ConnectionContext; class SessionEvent; class Session; +class BufferedWriter; /** - * 负责与客户端通讯 + * @defgroup Communicator + * @brief 负责处理与客户端的通讯 + * @details 当前有两种通讯协议,一种是普通的文本协议,以'\0'作为结尾,一种是mysql协议。 + */ + +/** + * @brief 负责与客户端通讯 + * @ingroup Communicator * - * 在listener接收到一个新的连接(参考 server.cpp::accept), 就创建一个Communicator对象。 + * @details 在listener接收到一个新的连接(参考 server.cpp::accept), 就创建一个Communicator对象。 * 并调用init进行初始化。 * 在server中监听到某个连接有新的消息,就通过Communicator::read_event接收消息。 */ -class Communicator { +class Communicator +{ public: virtual ~Communicator(); /** - * 接收到一个新的连接时,进行初始化 + * @brief 接收到一个新的连接时,进行初始化 */ virtual RC init(int fd, Session *session, const std::string &addr); /** - * 监听到有新的数据到达,调用此函数进行接收消息 + * @brief 监听到有新的数据到达,调用此函数进行接收消息 * 如果需要创建新的任务来处理,那么就创建一个SessionEvent 对象并通过event参数返回。 */ virtual RC read_event(SessionEvent *&event) = 0; /** - * 在任务处理完成后,通过此接口将结果返回给客户端 + * @brief 在任务处理完成后,通过此接口将结果返回给客户端 * @param event 任务数据,包括处理的结果 * @param need_disconnect 是否需要断开连接 * @return 处理结果。即使返回不是SUCCESS,也不能直接断开连接,需要通过need_disconnect来判断 @@ -55,7 +64,7 @@ public: virtual RC write_result(SessionEvent *event, bool &need_disconnect) = 0; /** - * 关联的会话信息 + * @brief 关联的会话信息 */ Session *session() const { @@ -63,7 +72,7 @@ public: } /** - * libevent使用的数据,参考server.cpp + * @brief libevent使用的数据,参考server.cpp */ struct event &read_event() { @@ -71,7 +80,7 @@ public: } /** - * 对端地址 + * @brief 对端地址 * 如果是unix socket,可能没有意义 */ const char *addr() const @@ -81,16 +90,19 @@ public: protected: Session *session_ = nullptr; - int fd_ = -1; struct event read_event_; std::string addr_; + BufferedWriter *writer_ = nullptr; + int fd_ = -1; }; /** - * 与客户端进行通讯 - * 使用简单的文本通讯协议,每个消息使用'\0'结尾 + * @brief 与客户端进行通讯 + * @ingroup Communicator + * @details 使用简单的文本通讯协议,每个消息使用'\0'结尾 */ -class PlainCommunicator : public Communicator { +class PlainCommunicator : public Communicator +{ public: RC read_event(SessionEvent *&event) override; RC write_result(SessionEvent *event, bool &need_disconnect) override; @@ -100,14 +112,21 @@ private: }; /** - * 当前支持的通讯协议 + * @brief 当前支持的通讯协议 + * @ingroup Communicator */ -enum class CommunicateProtocol { +enum class CommunicateProtocol +{ PLAIN, //! 以'\0'结尾的协议 MYSQL, //! mysql通讯协议。具体实现参考 MysqlCommunicator }; -class CommunicatorFactory { +/** + * @brief 通讯协议工厂 + * @ingroup Communicator + */ +class CommunicatorFactory +{ public: Communicator *create(CommunicateProtocol protocol); }; diff --git a/src/observer/net/mysql_communicator.cpp b/src/observer/net/mysql_communicator.cpp index e5c7ec2..b24b41f 100644 --- a/src/observer/net/mysql_communicator.cpp +++ b/src/observer/net/mysql_communicator.cpp @@ -18,9 +18,15 @@ See the Mulan PSL v2 for more details. */ #include "common/log/log.h" #include "common/io/io.h" #include "net/mysql_communicator.h" +#include "net/buffered_writer.h" #include "event/session_event.h" #include "sql/operator/string_list_physical_operator.h" +/** + * @brief MySQL协议相关实现 + * @defgroup MySQLProtocol + */ + // https://dev.mysql.com/doc/dev/mysql-server/latest/group__group__cs__capabilities__flags.html // the flags below are negotiate by handshake packet const uint32_t CLIENT_PROTOCOL_41 = 512; @@ -42,15 +48,24 @@ const uint32_t CLIENT_OPTIONAL_RESULTSET_METADATA = // const uint32_t NUM_FLAG = 32768; // Field is num (for clients) // const uint32_t PART_KEY_FLAG = 16384; // Intern; Part of some key. -enum ResultSetMetaData { +/** + * @brief Resultset metadata + * @details 这些枚举值都是从MySQL的协议中抄过来的 + * @ingroup MySQLProtocol + */ +enum ResultSetMetaData +{ RESULTSET_METADATA_NONE = 0, RESULTSET_METADATA_FULL = 1, }; /** - Column types for MySQL -*/ -enum enum_field_types { + * @brief Column types for MySQL + * @details 枚举值类型是从MySQL的协议中抄过来的 + * @ingroup MySQLProtocol + */ +enum enum_field_types +{ MYSQL_TYPE_DECIMAL, MYSQL_TYPE_TINY, MYSQL_TYPE_SHORT, @@ -87,44 +102,111 @@ enum enum_field_types { MYSQL_TYPE_GEOMETRY = 255 }; -// little endian -// We suppose our platform is little endian too +/** + * @brief 根据MySQL协议的描述实现的数据写入函数 + * @defgroup MySQLProtocolStore + * @note 当前仅考虑小端模式,所以当前的代码仅能运行在小端模式的机器上,比如Intel。 + */ + +/** + * @brief 将数据写入到缓存中 + * + * @param buf 数据缓存 + * @param value 要写入的值 + * @return int 写入的字节数 + * @ingroup MySQLProtocolStore + * @ingroup MySQLProtocol + */ int store_int1(char *buf, int8_t value) { *buf = value; return 1; } +/** + * @brief 将数据写入到缓存中 + * + * @param buf 数据缓存 + * @param value 要写入的值 + * @return int 写入的字节数 + * @ingroup MySQLProtocolStore + * @ingroup MySQLProtocol + */ int store_int2(char *buf, int16_t value) { memcpy(buf, &value, sizeof(value)); return 2; } +/** + * @brief 将数据写入到缓存中 + * + * @param buf 数据缓存 + * @param value 要写入的值 + * @return int 写入的字节数 + * @ingroup MySQLProtocolStore + * @ingroup MySQLProtocol + */ int store_int3(char *buf, int32_t value) { memcpy(buf, &value, 3); return 3; } +/** + * @brief 将数据写入到缓存中 + * + * @param buf 数据缓存 + * @param value 要写入的值 + * @return int 写入的字节数 + * @ingroup MySQLProtocolStore + * @ingroup MySQLProtocol + */ int store_int4(char *buf, int32_t value) { memcpy(buf, &value, 4); return 4; } +/** + * @brief 将数据写入到缓存中 + * + * @param buf 数据缓存 + * @param value 要写入的值 + * @return int 写入的字节数 + * @ingroup MySQLProtocolStore + * @ingroup MySQLProtocol + */ int store_int6(char *buf, int64_t value) { memcpy(buf, &value, 6); return 6; } +/** + * @brief 将数据写入到缓存中 + * + * @param buf 数据缓存 + * @param value 要写入的值 + * @return int 写入的字节数 + * @ingroup MySQLProtocolStore + * @ingroup MySQLProtocol + */ int store_int8(char *buf, int64_t value) { memcpy(buf, &value, 8); return 8; } +/** + * @brief 将数据写入到缓存中 + * @details 按照MySQL协议的描述,这是一个变长编码的整数,最大可以编码8个字节的整数。不同大小的数字,第一个字节的值不同。 + * @param buf 数据缓存 + * @param value 要写入的值 + * @return int 写入的字节数 + * @ingroup MySQLProtocolStore + * @ingroup MySQLProtocol + */ int store_lenenc_int(char *buf, uint64_t value) { if (value < 251) { @@ -149,6 +231,15 @@ int store_lenenc_int(char *buf, uint64_t value) return 9; } +/** + * @brief 将以'\0'结尾的字符串写入到缓存中 + * + * @param buf 数据缓存 + * @param s 要写入的字符串 + * @return int 写入的字节数 + * @ingroup MySQLProtocolStore + * @ingroup MySQLProtocol + */ int store_null_terminated_string(char *buf, const char *s) { if (nullptr == s || s[0] == 0) { @@ -160,6 +251,16 @@ int store_null_terminated_string(char *buf, const char *s) return len; } +/** + * @brief 将指定长度的字符串写入到缓存中 + * + * @param buf 数据缓存 + * @param s 要写入的字符串 + * @param len 字符串的长度 + * @return int 写入的字节数 + * @ingroup MySQLProtocolStore + * @ingroup MySQLProtocol + */ int store_fix_length_string(char *buf, const char *s, int len) { if (len == 0) { @@ -170,6 +271,15 @@ int store_fix_length_string(char *buf, const char *s, int len) return len; } +/** + * @brief 按照带有长度标识的字符串写入到缓存,长度标识以变长整数编码 + * + * @param buf 数据缓存 + * @param s 要写入的字符串 + * @return int 写入的字节数 + * @ingroup MySQLProtocolStore + * @ingroup MySQLProtocol + */ int store_lenenc_string(char *buf, const char *s) { int len = strlen(s); @@ -179,16 +289,24 @@ int store_lenenc_string(char *buf, const char *s) } /** - * 每个包都有一个包头 + * @brief 每个包都有一个包头 * https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_basic_packets.html * https://mariadb.com/kb/en/0-packet/ + * @ingroup MySQLProtocol */ -struct PacketHeader { +struct PacketHeader +{ int32_t payload_length : 24; //! 当前packet的除掉头的长度 int8_t sequence_id = 0; //! 当前packet在当前处理过程中是第几个包 }; -class BasePacket { +/** + * @brief 所有的包都继承自BasePacket + * @details 所有的包都有一个包头,所以BasePacket中包含了一个PacketHeader + * @ingroup MySQLProtocol + */ +class BasePacket +{ public: PacketHeader packet_header; @@ -198,16 +316,25 @@ public: } virtual ~BasePacket() = default; + + /** + * @brief 将当前包编码成网络包 + * + * @param capabilities MySQL协议中的capability标志 + * @param net_packet 编码后的网络包 + */ virtual RC encode(uint32_t capabilities, std::vector &net_packet) const = 0; }; /** - * 握手包 - * 先由服务端发送到客户端 - * 这个包会交互capability与用户名密码 + * @brief 握手包 + * @ingroup MySQLProtocol + * @details 先由服务端发送到客户端。 + * 这个包会交互capability与用户名密码。 * https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_connection_phase_packets_protocol_handshake_v10.html */ -struct HandshakeV10 : public BasePacket { +struct HandshakeV10 : public BasePacket +{ int8_t protocol = 10; char server_version[7] = "5.7.25"; int32_t thread_id = 21501807; // conn id @@ -259,7 +386,12 @@ struct HandshakeV10 : public BasePacket { } }; -struct OkPacket : public BasePacket { +/** + * @brief 响应包,在很多场景中都会使用 + * @ingroup MySQLProtocol + */ +struct OkPacket : public BasePacket +{ int8_t header = 0; // 0x00 for ok and 0xFE for EOF int32_t affected_rows = 0; int32_t last_insert_id = 0; @@ -307,7 +439,13 @@ struct OkPacket : public BasePacket { } }; -struct EofPacket : public BasePacket { +/** + * @brief EOF包 + * @ingroup MySQLProtocol + * @details [basic_err_packet](https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_basic_err_packet.html) + */ +struct EofPacket : public BasePacket +{ int8_t header = 0xFE; int16_t warnings = 0; int16_t status_flags = 0x22; @@ -316,9 +454,6 @@ struct EofPacket : public BasePacket { {} virtual ~EofPacket() = default; - /** - * https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_basic_err_packet.html - */ virtual RC encode(uint32_t capabilities, std::vector &net_packet) const override { net_packet.resize(10); @@ -346,7 +481,13 @@ struct EofPacket : public BasePacket { } }; -struct ErrPacket : public BasePacket { +/** + * @brief ERR包,出现错误时返回 + * @details [eof_packet](https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_basic_eof_packet.html) + * @ingroup MySQLProtocol + */ +struct ErrPacket : public BasePacket +{ int8_t header = 0xFF; int16_t error_code = 0; char sql_state_marker[1] = {'#'}; @@ -357,9 +498,6 @@ struct ErrPacket : public BasePacket { {} virtual ~ErrPacket() = default; - /** - * https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_basic_eof_packet.html - */ virtual RC encode(uint32_t capabilities, std::vector &net_packet) const override { net_packet.resize(1000); @@ -388,18 +526,23 @@ struct ErrPacket : public BasePacket { } }; -// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_command_phase.html -// https://mariadb.com/kb/en/2-text-protocol/ -struct QueryPacket { +/** + * @brief MySQL客户端发过来的请求包 + * @ingroup MySQLProtocol + * @details [MySQL Protocol Command Phase](https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_command_phase.html) + * [MariaDB Text Protocol](https://mariadb.com/kb/en/2-text-protocol/) + */ +struct QueryPacket +{ PacketHeader packet_header; int8_t command; // 0x03: COM_QUERY std::string query; // the text of the SQL query to execute }; /** - * decode query packet - * packet_header is not included in net_packet - * https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_query.html + * @brief decode query packet + * @details packet_header is not included in net_packet + * [MySQL Protocol COM_QUERY](https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_query.html) */ RC decode_query_packet(std::vector &net_packet, QueryPacket &query_packet) { @@ -409,6 +552,11 @@ RC decode_query_packet(std::vector &net_packet, QueryPacket &query_packet) return RC::SUCCESS; } +/** + * @brief MySQL客户端连接时会发起一个"select @@version_comment"的查询,这里对这个查询进行特殊处理 + * @param[out] sql_result 生成的结果 + * @ingroup MySQLProtocol + */ RC create_version_comment_sql_result(SqlResult *sql_result) { TupleSchema tuple_schema; @@ -426,6 +574,13 @@ RC create_version_comment_sql_result(SqlResult *sql_result) return RC::SUCCESS; } +/** + * @brief MySQL链接做初始化,需要进行握手和一些预处理 + * @ingroup MySQLProtocol + * @param fd 连接描述符 + * @param session 当前的会话 + * @param addr 对端地址 + */ RC MysqlCommunicator::init(int fd, Session *session, const std::string &addr) { // https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_connection_phase.html @@ -443,9 +598,16 @@ RC MysqlCommunicator::init(int fd, Session *session, const std::string &addr) return rc; } + writer_->flush(); + return rc; } +/** + * @brief MySQL客户端连接时会发起一个"select @@version_comment"的查询,这里对这个查询进行特殊处理 + * + * @param[out] need_disconnect 连接上如果出现异常,通过这个标识来判断是否需要断开连接 + */ RC MysqlCommunicator::handle_version_comment(bool &need_disconnect) { SessionEvent session_event(this); @@ -459,9 +621,16 @@ RC MysqlCommunicator::handle_version_comment(bool &need_disconnect) return rc; } +/** + * @brief 读取客户端发过来的请求 + * + * @param[out] event 如果有新的请求,就会生成一个SessionEvent + */ RC MysqlCommunicator::read_event(SessionEvent *&event) { RC rc = RC::SUCCESS; + + /// 读取一个完整的数据包 PacketHeader packet_header; int ret = common::readn(fd_, &packet_header, sizeof(packet_header)); if (ret != 0) { @@ -485,6 +654,7 @@ RC MysqlCommunicator::read_event(SessionEvent *&event) event = nullptr; if (!authed_) { + /// 还没有做过认证,就先需要完成握手阶段 uint32_t client_flag = *(uint32_t *)buf.data(); // TODO should use decode (little endian as default) LOG_INFO("client handshake response with capabilities flag=%d", client_flag); client_capabilities_flag_ = client_flag; @@ -495,6 +665,7 @@ RC MysqlCommunicator::read_event(SessionEvent *&event) if (rc != RC::SUCCESS) { LOG_WARN("failed to send ok packet while auth"); } + writer_->flush(); authed_ = true; LOG_INFO("client authed. addr=%s. rc=%s", addr_.c_str(), strrc(rc)); return rc; @@ -503,7 +674,8 @@ RC MysqlCommunicator::read_event(SessionEvent *&event) int8_t command_type = buf[0]; LOG_TRACE("recv command from client =%d", command_type); - if (command_type == 0x03) { // COM_QUERY + /// 已经做过握手,接收普通的消息包 + if (command_type == 0x03) { // COM_QUERY,这是一个普通的文本请求 QueryPacket query_packet; rc = decode_query_packet(buf, query_packet); if (rc != RC::SUCCESS) { @@ -520,12 +692,14 @@ RC MysqlCommunicator::read_event(SessionEvent *&event) event = new SessionEvent(this); event->set_query(query_packet.query); } else { + /// 其它的非文本请求,暂时不支持 OkPacket ok_packet(sequence_id_); rc = send_packet(ok_packet); if (rc != RC::SUCCESS) { LOG_WARN("failed to send ok packet. command=%d, addr=%s, error=%s", command_type, addr(), strrc(rc)); return rc; } + writer_->flush(); } return rc; } @@ -566,6 +740,7 @@ RC MysqlCommunicator::write_state(SessionEvent *event, bool &need_disconnect) } delete[] buf; + writer_->flush(); return rc; } @@ -622,6 +797,7 @@ RC MysqlCommunicator::write_result(SessionEvent *event, bool &need_disconnect) if (rc == RC::SUCCESS) { rc = close_rc; } + writer_->flush(); return rc; } @@ -634,10 +810,10 @@ RC MysqlCommunicator::send_packet(const BasePacket &packet) return rc; } - int ret = common::writen(fd_, net_packet.data(), net_packet.size()); - if (ret != 0) { + rc = writer_->writen(net_packet.data(), net_packet.size()); + if (OB_FAIL(rc)) { LOG_WARN("failed to send packet to client. addr=%s, error=%s", addr(), strerror(errno)); - return RC::IOERR_WRITE; + return rc; } LOG_TRACE("send ok packet success. packet length=%d", net_packet.size()); @@ -685,11 +861,11 @@ RC MysqlCommunicator::send_column_definition(SqlResult *sql_result, bool &need_d store_int3(buf, payload_length); net_packet.resize(pos); - int ret = common::writen(fd_, net_packet.data(), net_packet.size()); - if (ret != 0) { + rc = writer_->writen(net_packet.data(), net_packet.size()); + if (OB_FAIL(rc)) { LOG_WARN("failed to send column count to client. addr=%s, error=%s", addr(), strerror(errno)); need_disconnect = true; - return RC::IOERR_WRITE; + return rc; } for (int i = 0; i < cell_num; i++) { @@ -740,11 +916,11 @@ RC MysqlCommunicator::send_column_definition(SqlResult *sql_result, bool &need_d store_int3(buf, payload_length); net_packet.resize(pos); - ret = common::writen(fd_, net_packet.data(), net_packet.size()); - if (ret != 0) { + rc = writer_->writen(net_packet.data(), net_packet.size()); + if (OB_FAIL(rc)) { LOG_WARN("failed to write column definition to client. addr=%s, error=%s", addr(), strerror(errno)); need_disconnect = true; - return RC::IOERR_WRITE; + return rc; } } @@ -814,11 +990,11 @@ RC MysqlCommunicator::send_result_rows(SqlResult *sql_result, bool no_column_def int payload_length = pos - 4; store_int3(buf, payload_length); - int ret = common::writen(fd_, buf, pos); - if (ret != 0) { + rc = writer_->writen(buf, pos); + if (OB_FAIL(rc)) { LOG_WARN("failed to send row packet to client. addr=%s, error=%s", addr(), strerror(errno)); need_disconnect = true; - return RC::IOERR_WRITE; + return rc; } } diff --git a/src/observer/net/mysql_communicator.h b/src/observer/net/mysql_communicator.h index 59d0c47..3bdf026 100644 --- a/src/observer/net/mysql_communicator.h +++ b/src/observer/net/mysql_communicator.h @@ -20,45 +20,72 @@ class SqlResult; class BasePacket; /** - * 与客户端通讯 - * 实现MySQL通讯协议 - * 可以参考 https://dev.mysql.com/doc/dev/mysql-server/latest/PAGE_PROTOCOL.html - * 或 mariadb文档 https://mariadb.com/kb/en/clientserver-protocol/ + * @brief 与客户端通讯 + * @ingroup Communicator + * @ingroup MySQLProtol + * @details 实现MySQL通讯协议 + * 可以参考 [MySQL Page Protocol](https://dev.mysql.com/doc/dev/mysql-server/latest/PAGE_PROTOCOL.html) + * 或 [MariaDB Protocol](https://mariadb.com/kb/en/clientserver-protocol/) */ -class MysqlCommunicator : public Communicator { +class MysqlCommunicator : public Communicator +{ public: /** - * 连接刚开始建立时,进行一些初始化 - * 参考MySQL或MariaDB的手册,服务端要首先向客户端发送一个握手包,等客户端回复后, + * @brief 连接刚开始建立时,进行一些初始化 + * @details 参考MySQL或MariaDB的手册,服务端要首先向客户端发送一个握手包,等客户端回复后, * 再回复一个OkPacket或ErrPacket */ virtual RC init(int fd, Session *session, const std::string &addr) override; /** - * 有新的消息到达时,接收消息 - * 因为MySQL协议的特殊性,收到数据后不一定需要向后流转,比如握手包 + * @brief 有新的消息到达时,接收消息 + * @details 因为MySQL协议的特殊性,收到数据后不一定需要向后流转,比如握手包 */ virtual RC read_event(SessionEvent *&event) override; /** - * 将处理结果返回给客户端 + * @brief 将处理结果返回给客户端 + * @param[in] event 任务数据,包括处理的结果 + * @param[out] need_disconnect 是否需要断开连接 */ virtual RC write_result(SessionEvent *event, bool &need_disconnect) override; private: + /** + * @brief 发送数据包到客户端 + * + * @param[in] packet 要发送的数据包 + */ RC send_packet(const BasePacket &packet); + + /** + * @brief 有些情况下不需要给客户端返回一行行的记录结果,而是返回执行是否成功即可,比如create table等 + * + * @param[in] event 处理的结果 + * @param[out] need_disconnect 是否需要断开连接 + */ RC write_state(SessionEvent *event, bool &need_disconnect); /** - * 根据MySQL text protocol 描述,普通的结果分为列信息描述和行数据 + * @brief 返回客户端列描述信息 + * @details 根据MySQL text protocol 描述,普通的结果分为列信息描述和行数据。 * 这里就分为两个函数 */ RC send_column_definition(SqlResult *sql_result, bool &need_disconnect); + + /** + * @brief 返回客户端行数据 + * + * @param[in] sql_result 返回的结果 + * @param no_column_def 是否没有列描述信息 + * @param[out] need_disconnect 是否需要断开连接 + * @return RC + */ RC send_result_rows(SqlResult *sql_result, bool no_column_def, bool &need_disconnect); /** - * 根据实际测试,客户端在连接上来时,会发起一个 version_comment的查询 - * 这里就针对这个查询返回一个结果 + * @brief 根据实际测试,客户端在连接上来时,会发起一个 version_comment的查询 + * @details 这里就针对这个查询返回一个结果 */ RC handle_version_comment(bool &need_disconnect); diff --git a/src/observer/net/ring_buffer.cpp b/src/observer/net/ring_buffer.cpp new file mode 100644 index 0000000..28c16cb --- /dev/null +++ b/src/observer/net/ring_buffer.cpp @@ -0,0 +1,114 @@ +/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. +miniob is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. +You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. */ + +// +// Created by Wangyunlai on 2023/06/16. +// + +#include + +#include "net/ring_buffer.h" +#include "common/log/log.h" + +using namespace std; + +const int32_t DEFAULT_BUFFER_SIZE = 16 * 1024; + +RingBuffer::RingBuffer() + : RingBuffer(DEFAULT_BUFFER_SIZE) +{} + +RingBuffer::RingBuffer(int32_t size) + : buffer_(size) +{} + +RingBuffer::~RingBuffer() +{} + +RC RingBuffer::read(char *buf, int32_t size, int32_t &read_size) +{ + if (size < 0) { + return RC::INVALID_ARGUMENT; + } + + RC rc = RC::SUCCESS; + read_size = 0; + while (OB_SUCC(rc) && read_size < size && this->size() > 0) { + const char *tmp_buf = nullptr; + int32_t tmp_size = 0; + rc = buffer(tmp_buf, tmp_size); + if (OB_SUCC(rc)) { + int32_t copy_size = min(size - read_size, tmp_size); + memcpy(buf + read_size, tmp_buf, copy_size); + read_size += copy_size; + + rc = forward(copy_size); + } + } + + return rc; +} + +RC RingBuffer::buffer(const char *&buf, int32_t &read_size) +{ + const int32_t size = this->size(); + if (size == 0) { + buf = buffer_.data(); + read_size = 0; + return RC::SUCCESS; + } + + const int32_t read_pos = this->read_pos(); + if (read_pos < write_pos_) { + read_size = write_pos_ - read_pos; + } else { + read_size = capacity() - read_pos; + } + buf = buffer_.data() + read_pos; + return RC::SUCCESS; +} + +RC RingBuffer::forward(int32_t size) +{ + if (size <= 0) { + return RC::INVALID_ARGUMENT; + } + + if (size > this->size()) { + LOG_DEBUG("forward size is too large.size=%d, size=%d", size, this->size()); + return RC::INVALID_ARGUMENT; + } + + data_size_ -= size; + return RC::SUCCESS; +} + +RC RingBuffer::write(const char *data, int32_t size, int32_t &write_size) +{ + if (size < 0) { + return RC::INVALID_ARGUMENT; + } + + RC rc = RC::SUCCESS; + write_size = 0; + while (OB_SUCC(rc) && write_size < size && this->remain() > 0) { + + const int32_t read_pos = this->read_pos(); + const int32_t tmp_buf_size = (read_pos <= write_pos_) ? (capacity() - write_pos_) : (read_pos - write_pos_); + + const int32_t copy_size = min(size - write_size, tmp_buf_size); + memcpy(buffer_.data() + write_pos_, data + write_size, copy_size); + write_size += copy_size; + write_pos_ = (write_pos_ + copy_size) % capacity(); + data_size_ += copy_size; + } + + return rc; +} diff --git a/src/observer/net/ring_buffer.h b/src/observer/net/ring_buffer.h new file mode 100644 index 0000000..d25b3fe --- /dev/null +++ b/src/observer/net/ring_buffer.h @@ -0,0 +1,94 @@ +/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. +miniob is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. +You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. */ + +// +// Created by Wangyunlai on 2023/06/16. +// + +#pragma once + +#include + +#include "common/rc.h" + +/** + * @brief 环形缓存,当前用于通讯写入数据时的缓存 + * @ingroup Communicator + */ +class RingBuffer +{ +public: + /** + * @brief 使用默认缓存大小的构造函数,默认大小16K + */ + RingBuffer(); + + /** + * @brief 指定初始化大小的构造函数 + * + */ + explicit RingBuffer(int32_t size); + + virtual ~RingBuffer(); + + /** + * @brief 从缓存中读取数据 + * @param buf 读取数据的缓存 + * @param size 读取数据的大小 + * @param read_size 实际读取的数据大小 + */ + RC read(char *buf, int32_t size, int32_t &read_size); + + /** + * @brief 从缓存中读取数据,不会移动读指针 + * @details 读取数据时直接返回缓存中的指针,不会移动读指针。读取完成后执行forward函数移动读指针。 + * @param buf 读取的数据 + * @param read_size 数据大小 + */ + RC buffer(const char *&buf, int32_t &read_size); + + /** + * @brief 将读指针向前移动size个字节 + * @details 通常在buffer函数读取数据后,调用forward函数移动读指针 + * @param size 移动的字节数 + */ + RC forward(int32_t size); + + /** + * @brief 将数据写入缓存 + * @param buf 待写入的数据 + * @param size 待写入的数据大小 + * @param write_size 实际写入的数据大小 + */ + RC write(const char *buf, int32_t size, int32_t &write_size); + + /** + * @brief 缓存的总容量 + */ + int32_t capacity() const { return static_cast(buffer_.size()); } + + /** + * @brief 缓存中剩余的可写入数据的空间 + */ + int32_t remain() const { return capacity() - size(); } + + /** + * @brief 缓存中已经写入数据的空间大小 + */ + int32_t size() const { return data_size_; } + +private: + int32_t read_pos() const { return (write_pos_ - this->size() + capacity()) % capacity(); } + +private: + std::vector buffer_; ///< 缓存使用的内存,使用vector方便管理 + int32_t data_size_ = 0; ///< 已经写入的数据量 + int32_t write_pos_ = 0; ///< 当前写指针的位置,范围不会超出[0, capacity) +}; \ No newline at end of file diff --git a/src/observer/storage/index/bplus_tree.cpp b/src/observer/storage/index/bplus_tree.cpp index 4bc5f6c..9070cae 100644 --- a/src/observer/storage/index/bplus_tree.cpp +++ b/src/observer/storage/index/bplus_tree.cpp @@ -1042,7 +1042,7 @@ bool BplusTreeHandler::validate_tree() RC rc = latch_memo.get_page(file_header_.root_page, frame); // 这里仅仅调试使用,不加root锁 if (rc != RC::SUCCESS) { LOG_WARN("failed to fetch root page. page id=%d, rc=%d:%s", file_header_.root_page, rc, strrc(rc)); - return rc; + return false; } if (!validate_node_recursive(latch_memo, frame) || !validate_leaf_link(latch_memo)) { diff --git a/src/observer/storage/trx/trx.cpp b/src/observer/storage/trx/trx.cpp index de87f07..05f69b6 100644 --- a/src/observer/storage/trx/trx.cpp +++ b/src/observer/storage/trx/trx.cpp @@ -19,6 +19,7 @@ See the Mulan PSL v2 for more details. */ #include "storage/record/record_manager.h" #include "storage/field/field_meta.h" #include "common/log/log.h" +#include "common/lang/string.h" #include "storage/field/field.h" #include "storage/trx/mvcc_trx.h" #include "storage/trx/vacuous_trx.h" @@ -27,11 +28,16 @@ static TrxKit *global_trxkit = nullptr; TrxKit *TrxKit::create(const char *name) { + if (common::is_blank(name) || 0 == strcasecmp(name, "vacuous")) { + return new VacuousTrxKit(); + } + if (0 == strcasecmp(name, "mvcc")) { return new MvccTrxKit(); } - return new VacuousTrxKit(); + LOG_ERROR("unknown trx kit name. name=%s", name); + return nullptr; } RC TrxKit::init_global(const char *name) diff --git a/unittest/ring_buffer_test.cpp b/unittest/ring_buffer_test.cpp new file mode 100644 index 0000000..332759b --- /dev/null +++ b/unittest/ring_buffer_test.cpp @@ -0,0 +1,127 @@ +/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. +miniob is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. +You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. */ + +// +// Created by Wangyunlai on 2023/06/16. +// + +#include "gtest/gtest.h" + +#include "net/ring_buffer.h" + +TEST(ring_buffer, test_init) +{ + const int buf_size = 10; + RingBuffer buffer(buf_size); + EXPECT_EQ(buffer.capacity(), buf_size); + EXPECT_EQ(buffer.remain(), buf_size); + EXPECT_EQ(buffer.size(), 0); + EXPECT_EQ(buffer.remain(), 10); +} + +TEST(ring_buffer, test_write) +{ + const int buf_size = 25; + RingBuffer buffer(buf_size); + + const char *data = "0123456789"; + int32_t size = strlen(data); + int32_t write_size = 0; + EXPECT_EQ(buffer.write(data, size, write_size), RC::SUCCESS); + EXPECT_EQ(write_size, size); + EXPECT_EQ(buffer.size(), size); + EXPECT_EQ(buffer.remain(), buf_size - size); + + EXPECT_EQ(buffer.write(data, size, write_size), RC::SUCCESS); + EXPECT_EQ(write_size, size); + EXPECT_EQ(buffer.size(), size * 2); + EXPECT_EQ(buffer.remain(), buf_size - size * 2); + + EXPECT_EQ(buffer.write(data, size, write_size), RC::SUCCESS); + EXPECT_EQ(write_size, 5); + EXPECT_EQ(buffer.size(), buffer.capacity()); + EXPECT_EQ(buffer.remain(), 0); +} + +TEST(ring_buffer, test_read) +{ + const int buf_size = 100; + RingBuffer buffer(buf_size); + + const char *data = "0123456789"; + int32_t size = strlen(data); + int32_t write_size = 0; + EXPECT_EQ(buffer.write(data, size, write_size), RC::SUCCESS); + EXPECT_EQ(write_size, size); + EXPECT_EQ(buffer.size(), size); + EXPECT_EQ(buffer.remain(), buf_size - size); + + char read_buf[buf_size]; + int32_t read_size = 0; + const int32_t test_read_size = 5; + EXPECT_EQ(buffer.read(read_buf, test_read_size, read_size), RC::SUCCESS); + EXPECT_EQ(read_size, test_read_size); + EXPECT_EQ(buffer.size(), test_read_size); + EXPECT_EQ(buffer.remain(), buf_size - test_read_size); + + EXPECT_EQ(buffer.read(read_buf, test_read_size, read_size), RC::SUCCESS); + EXPECT_EQ(read_size, test_read_size); + EXPECT_EQ(buffer.size(), 0); + EXPECT_EQ(buffer.remain(), buf_size); + + EXPECT_EQ(buffer.read(read_buf, test_read_size, read_size), RC::SUCCESS); + EXPECT_EQ(read_size, 0); + EXPECT_EQ(buffer.size(), 0); + EXPECT_EQ(buffer.remain(), buf_size); +} + +TEST(ring_buffer, test_buffer) +{ + const int buf_size = 15; + RingBuffer buffer(buf_size); + + const char *data = "0123456789"; + int32_t size = strlen(data); + int32_t write_size = 0; + EXPECT_EQ(buffer.write(data, size, write_size), RC::SUCCESS); + EXPECT_EQ(write_size, size); + EXPECT_EQ(buffer.size(), size); + EXPECT_EQ(buffer.remain(), buf_size - size); + + const char *tmp_buffer = nullptr; + int32_t buffer_size = 0; + EXPECT_EQ(buffer.buffer(tmp_buffer, buffer_size), RC::SUCCESS); + EXPECT_EQ(buffer_size, size); + EXPECT_EQ(buffer.forward(buffer_size), RC::SUCCESS); + + EXPECT_EQ(buffer.buffer(tmp_buffer, buffer_size), RC::SUCCESS); + EXPECT_EQ(buffer_size, 0); + + EXPECT_EQ(buffer.write(data, size, write_size), RC::SUCCESS); + EXPECT_EQ(write_size, size); + + EXPECT_EQ(buffer.buffer(tmp_buffer, buffer_size), RC::SUCCESS); + EXPECT_LT(buffer_size, size); + EXPECT_EQ(buffer.forward(buffer_size), RC::SUCCESS); + + EXPECT_EQ(buffer.buffer(tmp_buffer, buffer_size), RC::SUCCESS); + EXPECT_LT(buffer_size, size); + EXPECT_EQ(buffer.forward(buffer_size), RC::SUCCESS); +} + +int main(int argc, char **argv) +{ + // 分析gtest程序的命令行参数 + testing::InitGoogleTest(&argc, argv); + + // 调用RUN_ALL_TESTS()运行所有测试用例 + // main函数返回RUN_ALL_TESTS()的运行结果 + return RUN_ALL_TESTS(); +} \ No newline at end of file -- GitLab