未验证 提交 39042c92 编写于 作者: 羽飞's avatar 羽飞 提交者: GitHub

Network buffer (#198)

### What problem were solved in this pull request?

Issue Number: close #151 

Problem:
当前在发送消息到客户端时,每个数据都会刷新socket buffer,效率很低

### What is changed and how it works?
实现BufferedWriter,通过buffered
writer将消息缓存在内存中,结果写完时或者buffer满时,才将结果真正的发送到客户端。

### Other information
上级 fe061d05
# OceanBase大赛miniob代码架构框架设计和说明
# miniob代码架构框架设计和说明
# miniob代码结构说明
......@@ -123,9 +123,6 @@ miniob采用TCP通信,纯文本模式,使用'\0'作为每个消息的终结
注意:测试程序也使用这种方法,***请不要修改协议,后台测试程序依赖这个协议***
注意:返回的普通数据结果中不要包含'\0',也不支持转义处理。
### 参赛/训练营建议
在做miniob的题目时,不要做一个题目再看下一个题目,团队中多个同学分别做自己的题目时,也不要一直单独作战,因为完成课题时,需要修改的模块会有非常多的重叠,因此建议团队尽量统筹规划,避免代码冲突以及“越走越难”。
# 参考
- 《数据库系统实现》
- 《数据库系统概念》
......@@ -139,16 +136,6 @@ miniob采用TCP通信,纯文本模式,使用'\0'作为每个消息的终结
- [OceanBase开源网站](https://github.com/oceanbase/oceanbase)
# 附录-编译安装测试
## 编译环境
miniob使用cmake管理,要求cmake版本至少3.10,编译的C++标准是C++20,所以使用的编译器需要支持C++20。
编译器推荐使用gcc或clang,使用Windows操作系统的同学,建议使用Linux虚拟机或docker编译,程序会最终在Linux操作系统上测试。
使用MacOS的同学,注意默认编译器是clang,即使使用命令gcc,实际编译器可能也是clang。
miniob 的开发环境依赖的组件比较多,搭建开发环境可能需要比较多的时间,同学们可以直接使用Docker来开发,具体参考 [MiniOB Docker 开发](https://hub.docker.com/r/oceanbase/miniob)
NOTE:clang编译器有些表现与gcc不一致,官方测试后台使用gcc,如果后续测试出现编译错误,可以更换为gcc测试。
## 编译
参考源码中 [如何构建MiniOB](./how_to_build.md) 文件。
......@@ -189,16 +176,3 @@ obclient -s miniob.sock
其中 -s 使用指定unix socket 文件连接observer,如果启动observer时也指定了unix socket。
![running-the-client](images/miniob-introduction-running-the-client.png)
# FAQ
- 命令没有加分号解析失败,比如 `help`命令
miniob 是功能非常简单的数据库,语法解析也做得非常简单。所有的命令都需要加上分号`;`
- YYYY-m-d是否是正确的日期
比如 2021-1-2 是否正确日期?是正确的。
> 输出的时候要求一定要是 YYYY-mm-dd输出(2021-01-02),但是输入的时候,没有那么严格要求。
- 增加CPP/H 文件是否需要更新CMakeLists.txt
不需要。Cmake文件中使用自动探测的方式。加了新的源代码,再执行一遍 cmake .. 就可以自动探测到新加的源文件。
\ No newline at end of file
......@@ -72,7 +72,7 @@ See the Mulan PSL v2 for more details. */
DEFINE_RC(FILE_WRITE) \
DEFINE_RC(LOGBUF_FULL)
enum RC
enum class RC
{
#define DEFINE_RC(name) name,
DEFINE_RCS
......
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by Wangyunlai on 2023/06/16.
//
#include <unistd.h>
#include <algorithm>
#include "net/buffered_writer.h"
using namespace std;
BufferedWriter::BufferedWriter(int fd)
: fd_(fd), buffer_()
{}
BufferedWriter::BufferedWriter(int fd, int32_t size)
: fd_(fd), buffer_(size)
{}
BufferedWriter::~BufferedWriter()
{
close();
}
RC BufferedWriter::close()
{
if (fd_ < 0) {
return RC::SUCCESS;
}
RC rc = flush();
if (OB_FAIL(rc)) {
return rc;
}
if (fd_ >= 0) {
::close(fd_);
fd_ = -1;
}
return RC::SUCCESS;
}
RC BufferedWriter::write(const char *data, int32_t size, int32_t &write_size)
{
if (fd_ < 0) {
return RC::INVALID_ARGUMENT;
}
if (buffer_.remain() == 0) {
RC rc = flush_internal(size);
if (OB_FAIL(rc)) {
return rc;
}
}
return buffer_.write(data, size, write_size);
}
RC BufferedWriter::writen(const char *data, int32_t size)
{
if (fd_ < 0) {
return RC::INVALID_ARGUMENT;
}
int32_t write_size = 0;
while (write_size < size) {
int32_t tmp_write_size = 0;
RC rc = write(data + write_size, size - write_size, tmp_write_size);
if (OB_FAIL(rc)) {
return rc;
}
write_size += tmp_write_size;
}
return RC::SUCCESS;
}
RC BufferedWriter::flush()
{
if (fd_ < 0) {
return RC::INVALID_ARGUMENT;
}
RC rc = RC::SUCCESS;
while (OB_SUCC(rc) && buffer_.size() > 0) {
rc = flush_internal(buffer_.size());
}
return rc;
}
RC BufferedWriter::flush_internal(int32_t size)
{
if (fd_ < 0) {
return RC::INVALID_ARGUMENT;
}
RC rc = RC::SUCCESS;
int32_t write_size = 0;
while (OB_SUCC(rc) && buffer_.size() > 0 && size > write_size) {
const char *buf = nullptr;
int32_t read_size = 0;
rc = buffer_.buffer(buf, read_size);
if (OB_FAIL(rc)) {
return rc;
}
ssize_t tmp_write_size = 0;
while (tmp_write_size == 0) {
tmp_write_size = ::write(fd_, buf, read_size);
if (tmp_write_size < 0) {
if (errno == EAGAIN || errno == EINTR) {
tmp_write_size = 0;
continue;
} else {
return RC::IOERR_WRITE;
}
}
}
write_size += tmp_write_size;
buffer_.forward(tmp_write_size);
}
return rc;
}
\ No newline at end of file
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by Wangyunlai on 2023/06/16.
//
#pragma once
#include "net/ring_buffer.h"
/**
* @brief 支持以缓存模式写入数据到文件/socket
* @details 缓存使用ring buffer实现,当缓存满时会自动刷新缓存。
* 看起来直接使用fdopen也可以实现缓存写,不过fdopen会在close时直接关闭fd。
* @note 在执行close时,描述符fd并不会被关闭
*/
class BufferedWriter
{
public:
BufferedWriter(int fd);
BufferedWriter(int fd, int32_t size);
~BufferedWriter();
/**
* @brief 关闭缓存
*/
RC close();
/**
* @brief 写数据到文件/socket
* @details 缓存满会自动刷新缓存
* @param data 要写入的数据
* @param size 要写入的数据大小
* @param write_size 实际写入的数据大小
*/
RC write(const char *data, int32_t size, int32_t &write_size);
/**
* @brief 写数据到文件/socket,全部写入成功返回成功
* @details 与write的区别就是会尝试一直写直到写成成功或者有不可恢复的错误
* @param data 要写入的数据
* @param size 要写入的数据大小
*/
RC writen(const char *data, int32_t size);
/**
* @brief 刷新缓存
* @details 将缓存中的数据全部写入文件/socket
*/
RC flush();
private:
/**
* @brief 刷新缓存
* @details 期望缓存可以刷新size大小的数据,实际刷新的数据量可能小于size也可能大于size。
* 通常是在缓存满的时候,希望刷新掉一部分数据,然后继续写入。
* @param size 期望刷新的数据大小
*/
RC flush_internal(int32_t size);
private:
int fd_ = -1;
RingBuffer buffer_;
};
\ No newline at end of file
......@@ -14,6 +14,7 @@ See the Mulan PSL v2 for more details. */
#include "net/communicator.h"
#include "net/mysql_communicator.h"
#include "net/buffered_writer.h"
#include "sql/expr/tuple.h"
#include "event/session_event.h"
#include "common/lang/mutex.h"
......@@ -25,6 +26,7 @@ RC Communicator::init(int fd, Session *session, const std::string &addr)
fd_ = fd;
session_ = session;
addr_ = addr;
writer_ = new BufferedWriter(fd_);
return RC::SUCCESS;
}
......@@ -38,6 +40,11 @@ Communicator::~Communicator()
delete session_;
session_ = nullptr;
}
if (writer_ != nullptr) {
delete writer_;
writer_ = nullptr;
}
}
/////////////////////////////////////////////////////////////////////////////////
......@@ -118,8 +125,8 @@ RC PlainCommunicator::write_state(SessionEvent *event, bool &need_disconnect)
snprintf(buf, buf_size, "%s > %s\n", strrc(sql_result->return_code()), state_string.c_str());
}
int ret = common::writen(fd_, buf, strlen(buf) + 1);
if (ret != 0) {
RC rc = writer_->writen(buf, strlen(buf) + 1);
if (OB_FAIL(rc)) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
need_disconnect = true;
delete[] buf;
......@@ -128,6 +135,8 @@ RC PlainCommunicator::write_state(SessionEvent *event, bool &need_disconnect)
need_disconnect = false;
delete[] buf;
writer_->flush();
return RC::SUCCESS;
}
......@@ -143,16 +152,17 @@ RC PlainCommunicator::write_result(SessionEvent *event, bool &need_disconnect)
const char *response = "Unexpected error: no result";
int len = strlen(response);
int ret = common::writen(fd_, response, len);
if (ret < 0) {
LOG_ERROR("Failed to send data back to client. ret=%d, error=%s", ret, strerror(errno));
RC rc = writer_->writen(response, len);
if (OB_FAIL(rc)) {
LOG_ERROR("Failed to send data back to client. ret=%s, error=%s", strrc(rc), strerror(errno));
return RC::IOERR_WRITE;
return rc;
}
ret = common::writen(fd_, &message_terminate, sizeof(message_terminate));
if (ret < 0) {
LOG_ERROR("Failed to send data back to client. ret=%d, error=%s", ret, strerror(errno));
return RC::IOERR_WRITE;
rc = writer_->writen(&message_terminate, sizeof(message_terminate));
if (OB_FAIL(rc)) {
LOG_ERROR("Failed to send data back to client. ret=%s, error=%s", strrc(rc), strerror(errno));
return rc;
}
need_disconnect = false;
......@@ -179,30 +189,30 @@ RC PlainCommunicator::write_result(SessionEvent *event, bool &need_disconnect)
if (nullptr != alias || alias[0] != 0) {
if (0 != i) {
const char *delim = " | ";
int ret = common::writen(fd_, delim, strlen(delim));
if (ret != 0) {
rc = writer_->writen(delim, strlen(delim));
if (OB_FAIL(rc)) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
return RC::IOERR_WRITE;
return rc;
}
}
int len = strlen(alias);
int ret = common::writen(fd_, alias, len);
if (ret != 0) {
rc = writer_->writen(alias, len);
if (OB_FAIL(rc)) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
sql_result->close();
return RC::IOERR_WRITE;
return rc;
}
}
}
if (cell_num > 0) {
char newline = '\n';
int ret = common::writen(fd_, &newline, 1);
if (ret != 0) {
rc = writer_->writen(&newline, 1);
if (OB_FAIL(rc)) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
sql_result->close();
return RC::IOERR_WRITE;
return rc;
}
}
......@@ -215,11 +225,11 @@ RC PlainCommunicator::write_result(SessionEvent *event, bool &need_disconnect)
for (int i = 0; i < cell_num; i++) {
if (i != 0) {
const char *delim = " | ";
int ret = common::writen(fd_, delim, strlen(delim));
if (ret != 0) {
rc = writer_->writen(delim, strlen(delim));
if (OB_FAIL(rc)) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
sql_result->close();
return RC::IOERR_WRITE;
return rc;
}
}
......@@ -233,20 +243,20 @@ RC PlainCommunicator::write_result(SessionEvent *event, bool &need_disconnect)
std::stringstream ss;
cell.to_string(ss);
std::string cell_str = ss.str();
int ret = common::writen(fd_, cell_str.data(), cell_str.size());
if (ret != RC::SUCCESS) {
rc = writer_->writen(cell_str.data(), cell_str.size());
if (OB_FAIL(rc)) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
sql_result->close();
return RC::IOERR_WRITE;
return rc;
}
}
char newline = '\n';
int ret = common::writen(fd_, &newline, 1);
if (ret != 0) {
rc = writer_->writen(&newline, 1);
if (OB_FAIL(rc)) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
sql_result->close();
return RC::IOERR_WRITE;
return rc;
}
}
......@@ -266,11 +276,11 @@ RC PlainCommunicator::write_result(SessionEvent *event, bool &need_disconnect)
return write_state(event, need_disconnect);
} else {
int ret = common::writen(fd_, &message_terminate, sizeof(message_terminate));
if (ret < 0) {
LOG_ERROR("Failed to send data back to client. ret=%d, error=%s", ret, strerror(errno));
rc = writer_->writen(&message_terminate, sizeof(message_terminate));
if (OB_FAIL(rc)) {
LOG_ERROR("Failed to send data back to client. ret=%s, error=%s", strrc(rc), strerror(errno));
sql_result->close();
return RC::IOERR_WRITE;
return rc;
}
need_disconnect = false;
......@@ -280,6 +290,8 @@ RC PlainCommunicator::write_result(SessionEvent *event, bool &need_disconnect)
if (OB_SUCC(rc)) {
rc = rc_close;
}
writer_->flush(); // TODO handle error
return rc;
}
......
......@@ -21,32 +21,41 @@ See the Mulan PSL v2 for more details. */
struct ConnectionContext;
class SessionEvent;
class Session;
class BufferedWriter;
/**
* 负责与客户端通讯
* @defgroup Communicator
* @brief 负责处理与客户端的通讯
* @details 当前有两种通讯协议,一种是普通的文本协议,以'\0'作为结尾,一种是mysql协议。
*/
/**
* @brief 负责与客户端通讯
* @ingroup Communicator
*
* 在listener接收到一个新的连接(参考 server.cpp::accept), 就创建一个Communicator对象。
* @details 在listener接收到一个新的连接(参考 server.cpp::accept), 就创建一个Communicator对象。
* 并调用init进行初始化。
* 在server中监听到某个连接有新的消息,就通过Communicator::read_event接收消息。
*/
class Communicator {
class Communicator
{
public:
virtual ~Communicator();
/**
* 接收到一个新的连接时,进行初始化
* @brief 接收到一个新的连接时,进行初始化
*/
virtual RC init(int fd, Session *session, const std::string &addr);
/**
* 监听到有新的数据到达,调用此函数进行接收消息
* @brief 监听到有新的数据到达,调用此函数进行接收消息
* 如果需要创建新的任务来处理,那么就创建一个SessionEvent 对象并通过event参数返回。
*/
virtual RC read_event(SessionEvent *&event) = 0;
/**
* 在任务处理完成后,通过此接口将结果返回给客户端
* @brief 在任务处理完成后,通过此接口将结果返回给客户端
* @param event 任务数据,包括处理的结果
* @param need_disconnect 是否需要断开连接
* @return 处理结果。即使返回不是SUCCESS,也不能直接断开连接,需要通过need_disconnect来判断
......@@ -55,7 +64,7 @@ public:
virtual RC write_result(SessionEvent *event, bool &need_disconnect) = 0;
/**
* 关联的会话信息
* @brief 关联的会话信息
*/
Session *session() const
{
......@@ -63,7 +72,7 @@ public:
}
/**
* libevent使用的数据,参考server.cpp
* @brief libevent使用的数据,参考server.cpp
*/
struct event &read_event()
{
......@@ -71,7 +80,7 @@ public:
}
/**
* 对端地址
* @brief 对端地址
* 如果是unix socket,可能没有意义
*/
const char *addr() const
......@@ -81,16 +90,19 @@ public:
protected:
Session *session_ = nullptr;
int fd_ = -1;
struct event read_event_;
std::string addr_;
BufferedWriter *writer_ = nullptr;
int fd_ = -1;
};
/**
* 与客户端进行通讯
* 使用简单的文本通讯协议,每个消息使用'\0'结尾
* @brief 与客户端进行通讯
* @ingroup Communicator
* @details 使用简单的文本通讯协议,每个消息使用'\0'结尾
*/
class PlainCommunicator : public Communicator {
class PlainCommunicator : public Communicator
{
public:
RC read_event(SessionEvent *&event) override;
RC write_result(SessionEvent *event, bool &need_disconnect) override;
......@@ -100,14 +112,21 @@ private:
};
/**
* 当前支持的通讯协议
* @brief 当前支持的通讯协议
* @ingroup Communicator
*/
enum class CommunicateProtocol {
enum class CommunicateProtocol
{
PLAIN, //! 以'\0'结尾的协议
MYSQL, //! mysql通讯协议。具体实现参考 MysqlCommunicator
};
class CommunicatorFactory {
/**
* @brief 通讯协议工厂
* @ingroup Communicator
*/
class CommunicatorFactory
{
public:
Communicator *create(CommunicateProtocol protocol);
};
......@@ -18,9 +18,15 @@ See the Mulan PSL v2 for more details. */
#include "common/log/log.h"
#include "common/io/io.h"
#include "net/mysql_communicator.h"
#include "net/buffered_writer.h"
#include "event/session_event.h"
#include "sql/operator/string_list_physical_operator.h"
/**
* @brief MySQL协议相关实现
* @defgroup MySQLProtocol
*/
// https://dev.mysql.com/doc/dev/mysql-server/latest/group__group__cs__capabilities__flags.html
// the flags below are negotiate by handshake packet
const uint32_t CLIENT_PROTOCOL_41 = 512;
......@@ -42,15 +48,24 @@ const uint32_t CLIENT_OPTIONAL_RESULTSET_METADATA =
// const uint32_t NUM_FLAG = 32768; // Field is num (for clients)
// const uint32_t PART_KEY_FLAG = 16384; // Intern; Part of some key.
enum ResultSetMetaData {
/**
* @brief Resultset metadata
* @details 这些枚举值都是从MySQL的协议中抄过来的
* @ingroup MySQLProtocol
*/
enum ResultSetMetaData
{
RESULTSET_METADATA_NONE = 0,
RESULTSET_METADATA_FULL = 1,
};
/**
Column types for MySQL
*/
enum enum_field_types {
* @brief Column types for MySQL
* @details 枚举值类型是从MySQL的协议中抄过来的
* @ingroup MySQLProtocol
*/
enum enum_field_types
{
MYSQL_TYPE_DECIMAL,
MYSQL_TYPE_TINY,
MYSQL_TYPE_SHORT,
......@@ -87,44 +102,111 @@ enum enum_field_types {
MYSQL_TYPE_GEOMETRY = 255
};
// little endian
// We suppose our platform is little endian too
/**
* @brief 根据MySQL协议的描述实现的数据写入函数
* @defgroup MySQLProtocolStore
* @note 当前仅考虑小端模式,所以当前的代码仅能运行在小端模式的机器上,比如Intel。
*/
/**
* @brief 将数据写入到缓存中
*
* @param buf 数据缓存
* @param value 要写入的值
* @return int 写入的字节数
* @ingroup MySQLProtocolStore
* @ingroup MySQLProtocol
*/
int store_int1(char *buf, int8_t value)
{
*buf = value;
return 1;
}
/**
* @brief 将数据写入到缓存中
*
* @param buf 数据缓存
* @param value 要写入的值
* @return int 写入的字节数