diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index cb56efe0d3a32c80cfa0fff6631d3e5d74f5333c..33fec4382c0379d3ab0b491de37372fcc58c412a 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -64,4 +64,5 @@ jobs: shell: bash run: | sudo bash build.sh init + echo "begin test..." python3 test/case/miniob_test.py --test-cases=basic | tail -1 | grep "basic is success" diff --git a/CMakeLists.txt b/CMakeLists.txt index 84d5850516a4812ddb5bd957291176ddfc463029..cbd57026bcb50376c2f4a49ccab1f9eeaaaf34be 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,21 +1,16 @@ -# include 另外一个cmake 配置 -#INCLUDE(file1 [OPTIONAL]) - cmake_minimum_required(VERSION 3.10) set(CMAKE_CXX_STANDARD 20) -#SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") project(minidb) -MESSAGE(STATUS "This is SOURCE dir " ${test_SOURCE_DIR}) -MESSAGE(STATUS "This is BINARY dir " ${test_BINARY_DIR}) MESSAGE(STATUS "This is Project source dir " ${PROJECT_SOURCE_DIR}) MESSAGE(STATUS "This is PROJECT_BINARY_DIR dir " ${PROJECT_BINARY_DIR}) SET(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/bin) -#SET(LIBRARY_OUTPUT_PATH <路径>) -OPTION(ENABLE_ASAN "Enable build with address sanitizer" OFF) +SET(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} ${CMAKE_SOURCE_DIR}/cmake) + +OPTION(ENABLE_ASAN "Enable build with address sanitizer" ON) OPTION(WITH_UNIT_TESTS "Compile miniob with unit tests" ON) OPTION(CONCURRENCY "Support concurrency operations" OFF) @@ -77,6 +72,21 @@ ELSE() ENDIF() MESSAGE("Install target dir is " ${CMAKE_INSTALL_PREFIX}) +IF (DEFINED ENV{LD_LIBRARY_PATH}) + SET(LD_LIBRARY_PATH_STR $ENV{LD_LIBRARY_PATH}) + string(REPLACE ":" ";" LD_LIBRARY_PATH_LIST ${LD_LIBRARY_PATH_STR}) + MESSAGE(" Add LD_LIBRARY_PATH to -L flags " ${LD_LIBRARY_PATH_LIST}) + LINK_DIRECTORIES(${LD_LIBRARY_PATH_LIST}) +ENDIF () + +IF (EXISTS /usr/local/lib) + LINK_DIRECTORIES (/usr/local/lib) +ENDIF () +IF (EXISTS /usr/local/lib64) + LINK_DIRECTORIES (/usr/local/lib64) +ENDIF () + +INCLUDE_DIRECTORIES(. ${PROJECT_SOURCE_DIR}/deps /usr/local/include) # ADD_SUBDIRECTORY(src bin) bin 为目标目录, 可以省略 ADD_SUBDIRECTORY(deps) @@ -91,16 +101,6 @@ IF(WITH_UNIT_TESTS) ADD_SUBDIRECTORY(unittest) ENDIF() -# install 准备安装的目录是cmakefile 的当前目录, 不是build 后生成的目录 -# Files 默认权限OWNER_WRITE, OWNER_READ, GROUP_READ,和WORLD_READ,即644权限 -# INSTALL(FILES docs/README DESTINATION ./ ) -# INSTALL(DIRECTORY docs DESTINATION ./ -# PATTERN "README" EXCLUDE) -# PERMISSIONS 可以直接替换 -#INSTALL(DIRECTORY bin DESTINATION ./ -# FILE_PERMISSIONS OWNER_EXECUTE OWNER_WRITE OWNER_READ GROUP_EXECUTE GROUP_READ WORLD_READ -# DIRECTORY_PERMISSIONS OWNER_EXECUTE OWNER_WRITE OWNER_READ GROUP_EXECUTE GROUP_READ WORLD_READ WORLD_EXECUTE) - INSTALL(DIRECTORY etc DESTINATION . FILE_PERMISSIONS OWNER_WRITE OWNER_READ GROUP_READ WORLD_READ) @@ -113,11 +113,6 @@ INSTALL(CODE "MESSAGE(\"Sample install message.\")") # [ARGS ] # [OUTPUT_VARIABLE ] - - - -# ADD_TEST与ENABLE_TESTING 参考书籍 - #EXEC_PROGRAM(Executable [directory in which to run] # [ARGS ] # [OUTPUT_VARIABLE ] diff --git a/README.md b/README.md index 1f4fae1c3e237143ed20376095ef2321f404ef80..97783bbddd714024b1aa6f827355b2b933c2c782 100644 --- a/README.md +++ b/README.md @@ -8,12 +8,13 @@ MiniOB 设计的目标是面向在校学生、数据库从业者、爱好者, 为了帮助开发者更好的上手并学习 miniob, 建议阅读: -1. [miniob 框架介绍](docs/miniob-introduction.md) -2. [如何编译 MiniOB 源码](docs/how_to_build.md) -3. [使用 GitPod 开发 MiniOB](docs/dev_by_gitpod.md) -4. [开发环境搭建(本地调试, 适用 Linux 和 Mac)](docs/how_to_dev_miniob_by_vscode.md) -5. [开发环境搭建(远程调试, 适用于 Window, Linux 和 Mac)](docs/how_to_dev_in_docker_container_by_vscode.md) -6. [MiniOB 词法语法解析开发与测试](docs/miniob-sql-parser.md) +1. [miniob 框架介绍](docs/src/miniob-introduction.md) +2. [如何编译 MiniOB 源码](docs/src/how_to_build.md) +3. [如何运行 MiniOB](docs/src/how_to_run.md) +3. [使用 GitPod 开发 MiniOB](docs/src/dev-env/dev_by_gitpod.md) +4. [开发环境搭建(本地调试, 适用 Linux 和 Mac)](docs/src/dev-env/how_to_dev_miniob_by_vscode.md) +5. [开发环境搭建(远程调试, 适用于 Window, Linux 和 Mac)](docs/src/dev-env/how_to_dev_in_docker_container_by_vscode.md) +6. [MiniOB 各模块文档](docs/src/design/introduction.md) 7. [doxygen 代码文档](docs/doxy/html/index.html) 或者直接看 [MiniOB GitHub Pages](https://oceanbase.github.io/miniob/). diff --git a/benchmark/CMakeLists.txt b/benchmark/CMakeLists.txt index 1ed94c8449c8837b61c3a81c5148a6bf5942ab63..0422562d8c8559691ff0a0c5ae0f40855739a798 100644 --- a/benchmark/CMakeLists.txt +++ b/benchmark/CMakeLists.txt @@ -1,29 +1,7 @@ -PROJECT(benchmark) -MESSAGE("Begin to build " ${PROJECT_NAME}) -MESSAGE(STATUS "This is PROJECT_BINARY_DIR dir " ${PROJECT_BINARY_DIR}) -MESSAGE(STATUS "This is PROJECT_SOURCE_DIR dir " ${PROJECT_SOURCE_DIR}) - -# 可以获取父cmake的变量 -MESSAGE("${CMAKE_COMMON_FLAGS}") - -#INCLUDE_DIRECTORIES([AFTER|BEFORE] [SYSTEM] dir1 dir2 ...) -INCLUDE_DIRECTORIES(. ${PROJECT_SOURCE_DIR}/../deps ${PROJECT_SOURCE_DIR}/../src/observer /usr/local/include SYSTEM) -# 父cmake 设置的include_directories 和link_directories并不传导到子cmake里面 -#INCLUDE_DIRECTORIES(BEFORE ${CMAKE_INSTALL_PREFIX}/include) -LINK_DIRECTORIES(/usr/local/lib /usr/local/lib64 ${PROJECT_BINARY_DIR}/../lib) - -IF (DEFINED ENV{LD_LIBRARY_PATH}) - SET(LD_LIBRARY_PATH_STR $ENV{LD_LIBRARY_PATH}) - #separate_arguments(LD_LIBRARY_PATH_STR) #只能处理空行 - string(REPLACE ":" ";" LD_LIBRARY_PATH_LIST ${LD_LIBRARY_PATH_STR}) - MESSAGE(" Add LD_LIBRARY_PATH to -L flags " ${LD_LIBRARY_PATH_LIST}) - LINK_DIRECTORIES(${LD_LIBRARY_PATH_LIST}) -ELSE () - LINK_DIRECTORIES(/usr/local/lib) -ENDIF () - find_package(benchmark CONFIG REQUIRED) +INCLUDE_DIRECTORIES(${PROJECT_SOURCE_DIR}/src/observer) + FILE(GLOB_RECURSE ALL_SRC *.cpp) # AUX_SOURCE_DIRECTORY 类似功能 FOREACH (F ${ALL_SRC}) @@ -32,4 +10,3 @@ FOREACH (F ${ALL_SRC}) ADD_EXECUTABLE(${prjName} ${F}) TARGET_LINK_LIBRARIES(${prjName} common pthread dl benchmark observer_static) ENDFOREACH (F) - diff --git a/cmake/readline.cmake b/cmake/readline.cmake new file mode 100644 index 0000000000000000000000000000000000000000..07350e1740324f89921541bdf9d025f4008b3d8b --- /dev/null +++ b/cmake/readline.cmake @@ -0,0 +1,12 @@ + +MACRO (MINIOB_FIND_READLINE) + + FIND_PATH(READLINE_INCLUDE_DIR readline.h PATH_SUFFIXES readline) + FIND_LIBRARY(READLINE_LIBRARY NAMES readline) + IF (READLINE_INCLUDE_DIR AND READLINE_LIBRARY) + SET(HAVE_READLINE 1) + ELSE () + MESSAGE("cannot find readline") + ENDIF() + +ENDMACRO (MINIOB_FIND_READLINE) diff --git a/deps/CMakeLists.txt b/deps/CMakeLists.txt index 35379735ae39348e4d31b9f1c4eda2cc721a0fe1..697f2be7c5d914355ce3f0b1d33eff0d1f01a171 100644 --- a/deps/CMakeLists.txt +++ b/deps/CMakeLists.txt @@ -1,3 +1,2 @@ - ADD_SUBDIRECTORY(common) diff --git a/deps/common/CMakeLists.txt b/deps/common/CMakeLists.txt index 61b570e85225dfd985e1efe8ab967a2a95fdc794..85463bc08ed519bbdbee80aa26a1a521d36dba0e 100644 --- a/deps/common/CMakeLists.txt +++ b/deps/common/CMakeLists.txt @@ -1,7 +1,7 @@ MESSAGE(STATUS "This is PROJECT_BINARY_DIR dir " ${PROJECT_BINARY_DIR}) MESSAGE(STATUS "This is PROJECT_SOURCE_DIR dir " ${PROJECT_SOURCE_DIR}) -INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR} ${PROJECT_SOURCE_DIR}/deps) +INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR}) FILE(GLOB_RECURSE ALL_SRC *.cpp) FOREACH(F ${ALL_SRC}) diff --git a/deps/common/seda/stage.h b/deps/common/seda/stage.h index 77f9b095362e9e6b74437ea46e6230c39274addf..e9f7a6f142abcc17f456f8dde536ce70a4b2e1dd 100644 --- a/deps/common/seda/stage.h +++ b/deps/common/seda/stage.h @@ -205,7 +205,7 @@ public: /** * Perform Stage-specific processing for an event - * Processing one event without swtich thread. + * Processing one event without switch thread. * Handle the event according to requirements of specific stage. Pure * virtual member function. * @@ -222,7 +222,8 @@ public: * * @param[in] event Pointer to event that initiated the callback. */ - virtual void callback_event(StageEvent *event, CallbackContext *context) = 0; + virtual void callback_event(StageEvent *event, CallbackContext *context) + {} /** * Perform Stage-specific callback processing for a timed out event diff --git a/docs/src/SUMMARY.md b/docs/src/SUMMARY.md index cdf60d948979f2e6064fdb5f399330489970431a..63656faf2db0e5dac992227131a85d745f8c8d52 100644 --- a/docs/src/SUMMARY.md +++ b/docs/src/SUMMARY.md @@ -2,6 +2,7 @@ - [MiniOB 简介](./miniob-introduction.md) - [如何编译](./how_to_build.md) +- [如何运行](./how_to_run.md) - [开发环境搭建](./dev-env/introduction.md) - [使用 GitPod 开发 MiniOB](./dev-env/dev_by_gitpod.md) - [开发环境搭建(本地调试, 适用 Linux 和 Mac)](./dev-env/how_to_dev_miniob_by_vscode.md) diff --git a/docs/src/how_to_run.md b/docs/src/how_to_run.md new file mode 100644 index 0000000000000000000000000000000000000000..e2467de3b99e85abff9c558a0a081cec50399119 --- /dev/null +++ b/docs/src/how_to_run.md @@ -0,0 +1,36 @@ +# 如何运行 + +编译完成后,可以在build目录(可能是build_debug或build_release)下找到bin/observer,就是我们的服务端程序,bin/obclient是自带的客户端程序。 +当前服务端程序启动已经支持了多种模式,可以以TCP、unix socket方式启动,这时需要启动客户端以发起命令。observer还支持直接执行命令的模式,这时不需要启动客户端,直接在命令行输入命令即可。 + +**以直接执行命令的方式启动服务端程序** +```bash +./bin/observer -f ../etc/observer.ini -P cli +``` +这会以直接执行命令的方式启动服务端程序,可以直接输入命令,不需要客户端。所有的请求都会以单线程的方式运行,配置项中的线程数不再有实际意义。 + +**以监听TCP端口的方式启动服务端程序** + +```bash +./bin/observer -f ../etc/observer.ini -p 6789 +``` +这会以监听6789端口的方式启动服务端程序。 +启动客户端程序: +```bash +./bin/obclient -p 6789 +``` +这会连接到服务端的6789端口。 + +**以监听unix socket的方式启动服务端程序** +```bash +./bin/observer -f ../etc/observer.ini -s miniob.sock +``` +这会以监听unix socket的方式启动服务端程序。 +启动客户端程序: +```bash +./bin/obclient -s miniob.sock +``` +这会连接到服务端的miniob.sock文件。 + +**更多** +observer还提供了一些其它参数,可以通过./bin/observer -h查看。 diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt deleted file mode 100644 index 0542e58b2c4a391a4c425b4a014d4e83383f68cf..0000000000000000000000000000000000000000 --- a/src/CMakeLists.txt +++ /dev/null @@ -1,13 +0,0 @@ -PROJECT(miniob) -MESSAGE("Begin to build " ${PROJECT_NAME}) -MESSAGE(STATUS "This is PROJECT_BINARY_DIR dir " ${PROJECT_BINARY_DIR}) -MESSAGE(STATUS "This is PROJECT_SOURCE_DIR dir " ${PROJECT_SOURCE_DIR}) - - -ADD_SUBDIRECTORY(obclient) -ADD_SUBDIRECTORY(observer) - - - - - diff --git a/src/obclient/CMakeLists.txt b/src/obclient/CMakeLists.txt index 6d6192d573392e6cfdacbde805aaafb41de643ed..09a31859b0ee63d12f50cc065f313945acb5c207 100644 --- a/src/obclient/CMakeLists.txt +++ b/src/obclient/CMakeLists.txt @@ -1,20 +1,14 @@ ADD_EXECUTABLE(obclient) MESSAGE("Begin to build " obclient) -INCLUDE(CheckIncludeFiles) +INCLUDE(readline) +MINIOB_FIND_READLINE() -#INCLUDE_DIRECTORIES([AFTER|BEFORE] [SYSTEM] dir1 dir2 ...) -TARGET_INCLUDE_DIRECTORIES(obclient PRIVATE . ${PROJECT_SOURCE_DIR}/deps /usr/local/include /usr/include) -# 父cmake 设置的include_directories 和link_directories并不传导到子cmake里面 -#INCLUDE_DIRECTORIES(BEFORE ${CMAKE_INSTALL_PREFIX}/include) - -# stdio.h 必须放在readline/readline.h 前面,因为readline头文件不能直接单独编译 -CHECK_INCLUDE_FILES("stdio.h;readline/readline.h" HAVE_READLINE_HEADER) - -FIND_LIBRARY(READLINE_LIBRARY readline) -IF (HAVE_READLINE_HEADER AND READLINE_LIBRARY) +IF (HAVE_READLINE) TARGET_LINK_LIBRARIES(obclient ${READLINE_LIBRARY}) + TARGET_INCLUDE_DIRECTORIES(obclient PRIVATE ${READLINE_INCLUDE_DIR}) ADD_DEFINITIONS(-DUSE_READLINE) + MESSAGE ("obclient use readline") ELSE () MESSAGE ("readline is not found") ENDIF() @@ -27,13 +21,9 @@ FOREACH (F ${ALL_SRC}) ENDFOREACH (F) -# 指定目标文件位置 -SET(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/bin) -MESSAGE("Binary directory:" ${EXECUTABLE_OUTPUT_PATH}) TARGET_SOURCES(obclient PRIVATE ${PRJ_SRC}) TARGET_LINK_LIBRARIES(obclient common pthread dl) - # Target 必须在定义 ADD_EXECUTABLE 之后, programs 不受这个限制 # TARGETS和PROGRAMS 的默认权限是OWNER_EXECUTE, GROUP_EXECUTE, 和WORLD_EXECUTE,即755权限, programs 都是处理脚步类 # 类型分为RUNTIME/LIBRARY/ARCHIVE, prog diff --git a/src/obclient/client.cpp b/src/obclient/client.cpp index 1eade65a27f04ebe837851d446db8bc653fa87e0..8ecefc216da20e35272ae60611d0443904565b2a 100644 --- a/src/obclient/client.cpp +++ b/src/obclient/client.cpp @@ -75,7 +75,7 @@ char *my_readline(const char *prompt) fprintf(stderr, "failed to alloc line buffer"); return nullptr; } - fprintf(stdout, prompt); + fprintf(stdout, "%s", prompt); char *s = fgets(buffer, MAX_MEM_BUFFER_SIZE, stdin); if (nullptr == s) { fprintf(stderr, "failed to read message from console"); diff --git a/src/observer/CMakeLists.txt b/src/observer/CMakeLists.txt index 393ba7d137a577692057b212eec8b1c0882f9dc2..c62ac3c369a59984c4ca82439f90636397d0ccb4 100644 --- a/src/observer/CMakeLists.txt +++ b/src/observer/CMakeLists.txt @@ -1,7 +1,6 @@ MESSAGE(STATUS "This is CMAKE_CURRENT_SOURCE_DIR dir " ${CMAKE_CURRENT_SOURCE_DIR}) -INCLUDE_DIRECTORIES(. ${CMAKE_CURRENT_SOURCE_DIR} ${PROJECT_SOURCE_DIR}/deps /usr/local/include) -LINK_DIRECTORIES(/usr/local/lib) +INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR}) FILE(GLOB_RECURSE ALL_SRC *.cpp *.c) SET(MAIN_SRC main.cpp) @@ -33,6 +32,17 @@ ADD_EXECUTABLE(observer ${MAIN_SRC}) TARGET_LINK_LIBRARIES(observer observer_static) ADD_LIBRARY(observer_static STATIC ${LIB_SRC}) +INCLUDE (readline) +MINIOB_FIND_READLINE() +IF (HAVE_READLINE) + TARGET_LINK_LIBRARIES(observer_static ${READLINE_LIBRARY}) + TARGET_INCLUDE_DIRECTORIES(observer_static PRIVATE ${READLINE_INCLUDE_DIR}) + ADD_DEFINITIONS(-DUSE_READLINE) + MESSAGE ("observer_static use readline") +ELSE () + MESSAGE ("readline is not found") +ENDIF() + SET_TARGET_PROPERTIES(observer_static PROPERTIES OUTPUT_NAME observer) TARGET_LINK_LIBRARIES(observer_static ${LIBRARIES}) diff --git a/src/observer/common/rc.h b/src/observer/common/rc.h index bd0ddc2f7ec2c0c31608ed930c6f369bc25a07e2..c429714346ca2bc69abf80b6673de31ae427e60f 100644 --- a/src/observer/common/rc.h +++ b/src/observer/common/rc.h @@ -16,6 +16,7 @@ See the Mulan PSL v2 for more details. */ /** * @brief 这个文件定义函数返回码/错误码(Return Code) + * @enum RC */ #define DEFINE_RCS \ diff --git a/src/observer/main.cpp b/src/observer/main.cpp index 601d3604467ee339bb038e3b380167235bfde230..2fdcdd138886f4697d0bf16277ffcec7f95c77e1 100644 --- a/src/observer/main.cpp +++ b/src/observer/main.cpp @@ -38,7 +38,7 @@ void usage() std::cout << "-p: server port. if not specified, the item in the config file will be used" << std::endl; std::cout << "-f: path of config file." << std::endl; std::cout << "-s: use unix socket and the argument is socket address" << std::endl; - std::cout << "-P: protocol. {plain(default), mysql}." << std::endl; + std::cout << "-P: protocol. {plain(default), mysql, cli}." << std::endl; std::cout << "-t: transaction model. {vacuous(default), mvcc}." << std::endl; std::cout << "-n: buffer pool memory size in byte" << std::endl; exit(0); @@ -75,9 +75,6 @@ void parse_parameter(int argc, char **argv) case 'e': process_param->set_std_err(optarg); break; - case 'd': - process_param->set_demon(true); - break; case 't': process_param->set_trx_kit_name(optarg); break; @@ -131,11 +128,14 @@ Server *init_server() server_param.port = port; if (0 == strcasecmp(process_param->get_protocol().c_str(), "mysql")) { server_param.protocol = CommunicateProtocol::MYSQL; + } else if (0 == strcasecmp(process_param->get_protocol().c_str(), "cli")) { + server_param.use_std_io = true; + server_param.protocol = CommunicateProtocol::CLI; } else { server_param.protocol = CommunicateProtocol::PLAIN; } - if (process_param->get_unix_socket_path().size() > 0) { + if (process_param->get_unix_socket_path().size() > 0 && !server_param.use_std_io) { server_param.use_unix_socket = true; server_param.unix_socket_path = process_param->get_unix_socket_path(); } @@ -166,13 +166,14 @@ void quit_signal_handle(int signum) int main(int argc, char **argv) { + int rc = STATUS_SUCCESS; + setSignalHandler(quit_signal_handle); parse_parameter(argc, argv); - int rc = STATUS_SUCCESS; rc = init(the_process_param()); - if (rc) { + if (rc != STATUS_SUCCESS) { std::cerr << "Shutdown due to failed to init!" << std::endl; cleanup(); return rc; diff --git a/src/observer/net/buffered_writer.cpp b/src/observer/net/buffered_writer.cpp index 72325444fede7e3a80050e87225517387e9cc7dc..69271ca769c6ebd00211dcee9a45a7c68ec75732 100644 --- a/src/observer/net/buffered_writer.cpp +++ b/src/observer/net/buffered_writer.cpp @@ -43,10 +43,7 @@ RC BufferedWriter::close() return rc; } - if (fd_ >= 0) { - ::close(fd_); - fd_ = -1; - } + fd_ = -1; return RC::SUCCESS; } diff --git a/src/observer/net/cli_communicator.cpp b/src/observer/net/cli_communicator.cpp new file mode 100644 index 0000000000000000000000000000000000000000..7e34e9f9a186a9881bddbeba489e798b42b361b9 --- /dev/null +++ b/src/observer/net/cli_communicator.cpp @@ -0,0 +1,148 @@ +/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. +miniob is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. +You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. */ + +// +// Created by Wangyunlai on 2023/06/25. +// + +#include "net/cli_communicator.h" +#include "net/buffered_writer.h" +#include "common/log/log.h" +#include "common/lang/string.h" +#include "event/session_event.h" + +#ifdef USE_READLINE +#include "readline/readline.h" +#include "readline/history.h" +#endif + +#define MAX_MEM_BUFFER_SIZE 8192 +#define PORT_DEFAULT 6789 + +using namespace common; + +#ifdef USE_READLINE +const std::string HISTORY_FILE = std::string(getenv("HOME")) + "/.miniob.history"; +time_t last_history_write_time = 0; + +char *my_readline(const char *prompt) +{ + int size = history_length; + if (size == 0) { + read_history(HISTORY_FILE.c_str()); + + FILE *fp = fopen(HISTORY_FILE.c_str(), "a"); + if (fp != nullptr) { + fclose(fp); + } + } + + char *line = readline(prompt); + if (line != nullptr && line[0] != 0) { + add_history(line); + if (time(NULL) - last_history_write_time > 5) { + write_history(HISTORY_FILE.c_str()); + } + // append_history doesn't work on some readlines + // append_history(1, HISTORY_FILE.c_str()); + } + return line; +} +#else // USE_READLINE +char *my_readline(const char *prompt) +{ + char *buffer = (char *)malloc(MAX_MEM_BUFFER_SIZE); + if (nullptr == buffer) { + LOG_WARN("failed to alloc line buffer"); + return nullptr; + } + fprintf(stdout, "%s", prompt); + char *s = fgets(buffer, MAX_MEM_BUFFER_SIZE, stdin); + if (nullptr == s) { + free(buffer); + if (ferror(stdin) || feof(stdin)) { + LOG_WARN("failed to read line: %s", strerror(errno)); + } + return nullptr; + } + return buffer; +} +#endif // USE_READLINE + +/* this function config a exit-cmd list, strncasecmp func truncate the command from terminal according to the number, + 'strncasecmp("exit", cmd, 4)' means that obclient read command string from terminal, truncate it to 4 chars from + the beginning, then compare the result with 'exit', if they match, exit the obclient. +*/ +bool is_exit_command(const char *cmd) { + return 0 == strncasecmp("exit", cmd, 4) || + 0 == strncasecmp("bye", cmd, 3) || + 0 == strncasecmp("\\q", cmd, 2) ; +} + +char *read_command() +{ + const char *prompt_str = "miniob > "; + char *input_command = nullptr; + for (input_command = my_readline(prompt_str); + common::is_blank(input_command); + input_command = my_readline(prompt_str)) { + free(input_command); + input_command = nullptr; + } + return input_command; +} + +RC CliCommunicator::init(int fd, Session *session, const std::string &addr) +{ + RC rc = PlainCommunicator::init(fd, session, addr); + if (OB_FAIL(rc)) { + LOG_WARN("fail to init communicator", strrc(rc)); + return rc; + } + + if (fd == STDIN_FILENO) { + write_fd_ = STDOUT_FILENO; + delete writer_; + writer_ = new BufferedWriter(write_fd_); + + const char delimiter = '\n'; + send_message_delimiter_.assign(1, delimiter); + + fd_ = -1; // 防止被父类析构函数关闭 + } else { + rc = RC::INVALID_ARGUMENT; + LOG_WARN("only stdin supported"); + } + return rc; +} + +RC CliCommunicator::read_event(SessionEvent *&event) +{ + event = nullptr; + char *command = read_command(); + + if (is_exit_command(command)) { + free(command); + event = nullptr; + return RC::SUCCESS; + } + + event = new SessionEvent(this); + event->set_query(std::string(command)); + free(command); + return RC::SUCCESS; +} + +RC CliCommunicator::write_result(SessionEvent *event, bool &need_disconnect) +{ + RC rc = PlainCommunicator::write_result(event, need_disconnect); + need_disconnect = false; + return rc; +} diff --git a/src/observer/net/cli_communicator.h b/src/observer/net/cli_communicator.h new file mode 100644 index 0000000000000000000000000000000000000000..fcaeb79d56032c48b13cecd1cd826304937243c2 --- /dev/null +++ b/src/observer/net/cli_communicator.h @@ -0,0 +1,37 @@ +/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. +miniob is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. +You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. */ + +// +// Created by Wangyunlai on 2023/06/25. +// + +#pragma once + +#include "net/plain_communicator.h" + +/** + * @brief 用于命令行模式的通讯器 + * @ingroup Communicator + * @details 直接通过终端/标准输入输出进行通讯。从这里的实现来看,是不需要libevent的一些实现的, + * 因此communicator需要重构,或者需要重构server中的各个通讯启动模式。 + */ +class CliCommunicator : public PlainCommunicator +{ +public: + CliCommunicator() = default; + virtual ~CliCommunicator() = default; + + RC init(int fd, Session *session, const std::string &addr) override; + RC read_event(SessionEvent *&event) override; + RC write_result(SessionEvent *event, bool &need_disconnect) override; + +private: + int write_fd_ = -1; ///< 与使用远程通讯模式不同,如果读数据使用标准输入,那么输出应该是标准输出 +}; diff --git a/src/observer/net/communicator.cpp b/src/observer/net/communicator.cpp index 9ec0a1ee587be811db824a8282fa751712547c60..e02442475e8e790585b5498c32b8e1b93741f02f 100644 --- a/src/observer/net/communicator.cpp +++ b/src/observer/net/communicator.cpp @@ -14,13 +14,13 @@ See the Mulan PSL v2 for more details. */ #include "net/communicator.h" #include "net/mysql_communicator.h" +#include "net/plain_communicator.h" +#include "net/cli_communicator.h" #include "net/buffered_writer.h" -#include "sql/expr/tuple.h" -#include "event/session_event.h" -#include "common/lang/mutex.h" -#include "common/io/io.h" #include "session/session.h" +#include "common/lang/mutex.h" + RC Communicator::init(int fd, Session *session, const std::string &addr) { fd_ = fd; @@ -47,254 +47,6 @@ Communicator::~Communicator() } } -///////////////////////////////////////////////////////////////////////////////// -RC PlainCommunicator::read_event(SessionEvent *&event) -{ - RC rc = RC::SUCCESS; - - event = nullptr; - - int data_len = 0; - int read_len = 0; - - const int max_packet_size = 8192; - std::vector buf(max_packet_size); - - // 持续接收消息,直到遇到'\0'。将'\0'遇到的后续数据直接丢弃没有处理,因为目前仅支持一收一发的模式 - while (true) { - read_len = ::read(fd_, buf.data() + data_len, max_packet_size - data_len); - if (read_len < 0) { - if (errno == EAGAIN) { - continue; - } - break; - } - if (read_len == 0) { - break; - } - - if (read_len + data_len > max_packet_size) { - data_len += read_len; - break; - } - - bool msg_end = false; - for (int i = 0; i < read_len; i++) { - if (buf[data_len + i] == 0) { - data_len += i + 1; - msg_end = true; - break; - } - } - - if (msg_end) { - break; - } - - data_len += read_len; - } - - if (data_len > max_packet_size) { - LOG_WARN("The length of sql exceeds the limitation %d", max_packet_size); - return RC::IOERR_TOO_LONG; - } - if (read_len == 0) { - LOG_INFO("The peer has been closed %s\n", addr()); - return RC::IOERR_CLOSE; - } else if (read_len < 0) { - LOG_ERROR("Failed to read socket of %s, %s\n", addr(), strerror(errno)); - return RC::IOERR_READ; - } - - LOG_INFO("receive command(size=%d): %s", data_len, buf.data()); - event = new SessionEvent(this); - event->set_query(std::string(buf.data())); - return rc; -} - -RC PlainCommunicator::write_state(SessionEvent *event, bool &need_disconnect) -{ - SqlResult *sql_result = event->sql_result(); - const int buf_size = 2048; - char *buf = new char[buf_size]; - const std::string &state_string = sql_result->state_string(); - if (state_string.empty()) { - const char *result = RC::SUCCESS == sql_result->return_code() ? "SUCCESS" : "FAILURE"; - snprintf(buf, buf_size, "%s\n", result); - } else { - snprintf(buf, buf_size, "%s > %s\n", strrc(sql_result->return_code()), state_string.c_str()); - } - - RC rc = writer_->writen(buf, strlen(buf) + 1); - if (OB_FAIL(rc)) { - LOG_WARN("failed to send data to client. err=%s", strerror(errno)); - need_disconnect = true; - delete[] buf; - return RC::IOERR_WRITE; - } - - need_disconnect = false; - delete[] buf; - - writer_->flush(); - return RC::SUCCESS; -} - -RC PlainCommunicator::write_result(SessionEvent *event, bool &need_disconnect) -{ - need_disconnect = true; - - const char message_terminate = '\0'; - - SqlResult *sql_result = event->sql_result(); - if (nullptr == sql_result) { - - const char *response = "Unexpected error: no result"; - int len = strlen(response); - - RC rc = writer_->writen(response, len); - if (OB_FAIL(rc)) { - LOG_ERROR("Failed to send data back to client. ret=%s, error=%s", strrc(rc), strerror(errno)); - - return rc; - } - - rc = writer_->writen(&message_terminate, sizeof(message_terminate)); - if (OB_FAIL(rc)) { - LOG_ERROR("Failed to send data back to client. ret=%s, error=%s", strrc(rc), strerror(errno)); - return rc; - } - - need_disconnect = false; - return RC::SUCCESS; - } - - if (RC::SUCCESS != sql_result->return_code() || !sql_result->has_operator()) { - return write_state(event, need_disconnect); - } - - RC rc = sql_result->open(); - if (OB_FAIL(rc)) { - sql_result->close(); - sql_result->set_return_code(rc); - return write_state(event, need_disconnect); - } - - const TupleSchema &schema = sql_result->tuple_schema(); - const int cell_num = schema.cell_num(); - - for (int i = 0; i < cell_num; i++) { - const TupleCellSpec &spec = schema.cell_at(i); - const char *alias = spec.alias(); - if (nullptr != alias || alias[0] != 0) { - if (0 != i) { - const char *delim = " | "; - rc = writer_->writen(delim, strlen(delim)); - if (OB_FAIL(rc)) { - LOG_WARN("failed to send data to client. err=%s", strerror(errno)); - return rc; - } - } - - int len = strlen(alias); - rc = writer_->writen(alias, len); - if (OB_FAIL(rc)) { - LOG_WARN("failed to send data to client. err=%s", strerror(errno)); - sql_result->close(); - return rc; - } - } - } - - if (cell_num > 0) { - char newline = '\n'; - rc = writer_->writen(&newline, 1); - if (OB_FAIL(rc)) { - LOG_WARN("failed to send data to client. err=%s", strerror(errno)); - sql_result->close(); - return rc; - } - } - - rc = RC::SUCCESS; - Tuple *tuple = nullptr; - while (RC::SUCCESS == (rc = sql_result->next_tuple(tuple))) { - assert(tuple != nullptr); - - int cell_num = tuple->cell_num(); - for (int i = 0; i < cell_num; i++) { - if (i != 0) { - const char *delim = " | "; - rc = writer_->writen(delim, strlen(delim)); - if (OB_FAIL(rc)) { - LOG_WARN("failed to send data to client. err=%s", strerror(errno)); - sql_result->close(); - return rc; - } - } - - TupleCell cell; - rc = tuple->cell_at(i, cell); - if (rc != RC::SUCCESS) { - sql_result->close(); - return rc; - } - - std::stringstream ss; - cell.to_string(ss); - std::string cell_str = ss.str(); - rc = writer_->writen(cell_str.data(), cell_str.size()); - if (OB_FAIL(rc)) { - LOG_WARN("failed to send data to client. err=%s", strerror(errno)); - sql_result->close(); - return rc; - } - } - - char newline = '\n'; - rc = writer_->writen(&newline, 1); - if (OB_FAIL(rc)) { - LOG_WARN("failed to send data to client. err=%s", strerror(errno)); - sql_result->close(); - return rc; - } - } - - if (rc == RC::RECORD_EOF) { - rc = RC::SUCCESS; - } - - if (cell_num == 0) { - // 除了select之外,其它的消息通常不会通过operator来返回结果,表头和行数据都是空的 - // 这里针对这种情况做特殊处理,当表头和行数据都是空的时候,就返回处理的结果 - // 可能是insert/delete等操作,不直接返回给客户端数据,这里把处理结果返回给客户端 - RC rc_close = sql_result->close(); - if (rc == RC::SUCCESS) { - rc = rc_close; - } - sql_result->set_return_code(rc); - return write_state(event, need_disconnect); - } else { - - rc = writer_->writen(&message_terminate, sizeof(message_terminate)); - if (OB_FAIL(rc)) { - LOG_ERROR("Failed to send data back to client. ret=%s, error=%s", strrc(rc), strerror(errno)); - sql_result->close(); - return rc; - } - - need_disconnect = false; - } - - RC rc_close = sql_result->close(); - if (OB_SUCC(rc)) { - rc = rc_close; - } - - writer_->flush(); // TODO handle error - return rc; -} - ///////////////////////////////////////////////////////////////////////////////// Communicator *CommunicatorFactory::create(CommunicateProtocol protocol) @@ -303,6 +55,9 @@ Communicator *CommunicatorFactory::create(CommunicateProtocol protocol) case CommunicateProtocol::PLAIN: { return new PlainCommunicator; } break; + case CommunicateProtocol::CLI: { + return new CliCommunicator; + } break; case CommunicateProtocol::MYSQL: { return new MysqlCommunicator; } break; diff --git a/src/observer/net/communicator.h b/src/observer/net/communicator.h index 97523731960f362a247801687ee519e2f0d92b61..8fcae32737e72c8897228ced95a4cd840d22bca5 100644 --- a/src/observer/net/communicator.h +++ b/src/observer/net/communicator.h @@ -96,29 +96,15 @@ protected: int fd_ = -1; }; -/** - * @brief 与客户端进行通讯 - * @ingroup Communicator - * @details 使用简单的文本通讯协议,每个消息使用'\0'结尾 - */ -class PlainCommunicator : public Communicator -{ -public: - RC read_event(SessionEvent *&event) override; - RC write_result(SessionEvent *event, bool &need_disconnect) override; - -private: - RC write_state(SessionEvent *event, bool &need_disconnect); -}; - /** * @brief 当前支持的通讯协议 * @ingroup Communicator */ enum class CommunicateProtocol { - PLAIN, //! 以'\0'结尾的协议 - MYSQL, //! mysql通讯协议。具体实现参考 MysqlCommunicator + PLAIN, ///< 以'\0'结尾的协议 + CLI, ///< 与客户端进行交互的协议 + MYSQL, ///< mysql通讯协议。具体实现参考 MysqlCommunicator }; /** diff --git a/src/observer/net/plain_communicator.cpp b/src/observer/net/plain_communicator.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d3a21ef771f3a4e37b5a410d3d7f5f7733b1bb41 --- /dev/null +++ b/src/observer/net/plain_communicator.cpp @@ -0,0 +1,271 @@ +/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. +miniob is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. +You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. */ + +// +// Created by Wangyunlai on 2023/06/25. +// + +#include "net/plain_communicator.h" +#include "net/buffered_writer.h" +#include "sql/expr/tuple.h" +#include "event/session_event.h" +#include "session/session.h" +#include "common/io/io.h" +#include "common/log/log.h" + +PlainCommunicator::PlainCommunicator() +{ + send_message_delimiter_.assign(1, '\0'); +} + +RC PlainCommunicator::read_event(SessionEvent *&event) +{ + RC rc = RC::SUCCESS; + + event = nullptr; + + int data_len = 0; + int read_len = 0; + + const int max_packet_size = 8192; + std::vector buf(max_packet_size); + + // 持续接收消息,直到遇到'\0'。将'\0'遇到的后续数据直接丢弃没有处理,因为目前仅支持一收一发的模式 + while (true) { + read_len = ::read(fd_, buf.data() + data_len, max_packet_size - data_len); + if (read_len < 0) { + if (errno == EAGAIN) { + continue; + } + break; + } + if (read_len == 0) { + break; + } + + if (read_len + data_len > max_packet_size) { + data_len += read_len; + break; + } + + bool msg_end = false; + for (int i = 0; i < read_len; i++) { + if (buf[data_len + i] == 0) { + data_len += i + 1; + msg_end = true; + break; + } + } + + if (msg_end) { + break; + } + + data_len += read_len; + } + + if (data_len > max_packet_size) { + LOG_WARN("The length of sql exceeds the limitation %d", max_packet_size); + return RC::IOERR_TOO_LONG; + } + if (read_len == 0) { + LOG_INFO("The peer has been closed %s", addr()); + return RC::IOERR_CLOSE; + } else if (read_len < 0) { + LOG_ERROR("Failed to read socket of %s, %s", addr(), strerror(errno)); + return RC::IOERR_READ; + } + + LOG_INFO("receive command(size=%d): %s", data_len, buf.data()); + event = new SessionEvent(this); + event->set_query(std::string(buf.data())); + return rc; +} + +RC PlainCommunicator::write_state(SessionEvent *event, bool &need_disconnect) +{ + SqlResult *sql_result = event->sql_result(); + const int buf_size = 2048; + char *buf = new char[buf_size]; + const std::string &state_string = sql_result->state_string(); + if (state_string.empty()) { + const char *result = RC::SUCCESS == sql_result->return_code() ? "SUCCESS" : "FAILURE"; + snprintf(buf, buf_size, "%s\n", result); + } else { + snprintf(buf, buf_size, "%s > %s\n", strrc(sql_result->return_code()), state_string.c_str()); + } + + RC rc = writer_->writen(buf, strlen(buf) + 1); + if (OB_FAIL(rc)) { + LOG_WARN("failed to send data to client. err=%s", strerror(errno)); + need_disconnect = true; + delete[] buf; + return RC::IOERR_WRITE; + } + + need_disconnect = false; + delete[] buf; + + writer_->flush(); + return RC::SUCCESS; +} + +RC PlainCommunicator::write_result(SessionEvent *event, bool &need_disconnect) +{ + need_disconnect = true; + + SqlResult *sql_result = event->sql_result(); + if (nullptr == sql_result) { + + const char *response = "Unexpected error: no result"; + int len = strlen(response); + + RC rc = writer_->writen(response, len); + if (OB_FAIL(rc)) { + LOG_ERROR("Failed to send data back to client. ret=%s, error=%s", strrc(rc), strerror(errno)); + + return rc; + } + + rc = writer_->writen(send_message_delimiter_.data(), send_message_delimiter_.size()); + if (OB_FAIL(rc)) { + LOG_ERROR("Failed to send data back to client. ret=%s, error=%s", strrc(rc), strerror(errno)); + return rc; + } + + need_disconnect = false; + return RC::SUCCESS; + } + + if (RC::SUCCESS != sql_result->return_code() || !sql_result->has_operator()) { + return write_state(event, need_disconnect); + } + + RC rc = sql_result->open(); + if (OB_FAIL(rc)) { + sql_result->close(); + sql_result->set_return_code(rc); + return write_state(event, need_disconnect); + } + + const TupleSchema &schema = sql_result->tuple_schema(); + const int cell_num = schema.cell_num(); + + for (int i = 0; i < cell_num; i++) { + const TupleCellSpec &spec = schema.cell_at(i); + const char *alias = spec.alias(); + if (nullptr != alias || alias[0] != 0) { + if (0 != i) { + const char *delim = " | "; + rc = writer_->writen(delim, strlen(delim)); + if (OB_FAIL(rc)) { + LOG_WARN("failed to send data to client. err=%s", strerror(errno)); + return rc; + } + } + + int len = strlen(alias); + rc = writer_->writen(alias, len); + if (OB_FAIL(rc)) { + LOG_WARN("failed to send data to client. err=%s", strerror(errno)); + sql_result->close(); + return rc; + } + } + } + + if (cell_num > 0) { + char newline = '\n'; + rc = writer_->writen(&newline, 1); + if (OB_FAIL(rc)) { + LOG_WARN("failed to send data to client. err=%s", strerror(errno)); + sql_result->close(); + return rc; + } + } + + rc = RC::SUCCESS; + Tuple *tuple = nullptr; + while (RC::SUCCESS == (rc = sql_result->next_tuple(tuple))) { + assert(tuple != nullptr); + + int cell_num = tuple->cell_num(); + for (int i = 0; i < cell_num; i++) { + if (i != 0) { + const char *delim = " | "; + rc = writer_->writen(delim, strlen(delim)); + if (OB_FAIL(rc)) { + LOG_WARN("failed to send data to client. err=%s", strerror(errno)); + sql_result->close(); + return rc; + } + } + + TupleCell cell; + rc = tuple->cell_at(i, cell); + if (rc != RC::SUCCESS) { + sql_result->close(); + return rc; + } + + std::stringstream ss; + cell.to_string(ss); + std::string cell_str = ss.str(); + rc = writer_->writen(cell_str.data(), cell_str.size()); + if (OB_FAIL(rc)) { + LOG_WARN("failed to send data to client. err=%s", strerror(errno)); + sql_result->close(); + return rc; + } + } + + char newline = '\n'; + rc = writer_->writen(&newline, 1); + if (OB_FAIL(rc)) { + LOG_WARN("failed to send data to client. err=%s", strerror(errno)); + sql_result->close(); + return rc; + } + } + + if (rc == RC::RECORD_EOF) { + rc = RC::SUCCESS; + } + + if (cell_num == 0) { + // 除了select之外,其它的消息通常不会通过operator来返回结果,表头和行数据都是空的 + // 这里针对这种情况做特殊处理,当表头和行数据都是空的时候,就返回处理的结果 + // 可能是insert/delete等操作,不直接返回给客户端数据,这里把处理结果返回给客户端 + RC rc_close = sql_result->close(); + if (rc == RC::SUCCESS) { + rc = rc_close; + } + sql_result->set_return_code(rc); + return write_state(event, need_disconnect); + } else { + + rc = writer_->writen(send_message_delimiter_.data(), send_message_delimiter_.size()); + if (OB_FAIL(rc)) { + LOG_ERROR("Failed to send data back to client. ret=%s, error=%s", strrc(rc), strerror(errno)); + sql_result->close(); + return rc; + } + + need_disconnect = false; + } + + RC rc_close = sql_result->close(); + if (OB_SUCC(rc)) { + rc = rc_close; + } + + writer_->flush(); // TODO handle error + return rc; +} \ No newline at end of file diff --git a/src/observer/net/plain_communicator.h b/src/observer/net/plain_communicator.h new file mode 100644 index 0000000000000000000000000000000000000000..276823fda40902c698f2ff91cb7a31513c12292c --- /dev/null +++ b/src/observer/net/plain_communicator.h @@ -0,0 +1,40 @@ +/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. +miniob is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. +You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. */ + +// +// Created by Wangyunlai on 2023/06/25. +// + +#pragma once + +#include + +#include "net/communicator.h" + +/** + * @brief 与客户端进行通讯 + * @ingroup Communicator + * @details 使用简单的文本通讯协议,每个消息使用'\0'结尾 + */ +class PlainCommunicator : public Communicator +{ +public: + PlainCommunicator(); + virtual ~PlainCommunicator() = default; + + RC read_event(SessionEvent *&event) override; + RC write_result(SessionEvent *event, bool &need_disconnect) override; + +private: + RC write_state(SessionEvent *event, bool &need_disconnect); + +protected: + std::vector send_message_delimiter_; ///< 发送消息分隔符 +}; diff --git a/src/observer/net/server.cpp b/src/observer/net/server.cpp index aaf990f7894709261056f0cd6c161c5accf0c814..c8abcfe51359919abdc8b0156e86365f9f1a66ac 100644 --- a/src/observer/net/server.cpp +++ b/src/observer/net/server.cpp @@ -39,12 +39,8 @@ See the Mulan PSL v2 for more details. */ #include using namespace common; -static const std::string READ_SOCKET_METRIC_TAG = "SessionStage.readsocket"; -static const std::string WRITE_SOCKET_METRIC_TAG = "SessionStage.writesocket"; Stage *Server::session_stage_ = nullptr; -common::SimpleTimer *Server::read_socket_metric_ = nullptr; -common::SimpleTimer *Server::write_socket_metric_ = nullptr; ServerParam::ServerParam() { @@ -55,10 +51,6 @@ ServerParam::ServerParam() Server::Server(ServerParam input_server_param) : server_param_(input_server_param) { - started_ = false; - server_socket_ = 0; - event_base_ = nullptr; - listen_ev_ = nullptr; } Server::~Server() @@ -71,17 +63,6 @@ Server::~Server() void Server::init() { session_stage_ = get_seda_config()->get_stage(SESSION_STAGE_NAME); - - MetricsRegistry &metricsRegistry = get_metrics_registry(); - if (Server::read_socket_metric_ == nullptr) { - Server::read_socket_metric_ = new SimpleTimer(); - metricsRegistry.register_metric(READ_SOCKET_METRIC_TAG, Server::read_socket_metric_); - } - - if (Server::write_socket_metric_ == nullptr) { - Server::write_socket_metric_ = new SimpleTimer(); - metricsRegistry.register_metric(WRITE_SOCKET_METRIC_TAG, Server::write_socket_metric_); - } } int Server::set_non_block(int fd) @@ -125,29 +106,6 @@ void Server::recv(int fd, short ev, void *arg) session_stage_->add_event(event); } -#if 0 -// 这个函数仅负责发送数据,至于是否是一个完整的消息,由调用者控制 -int Server::send( *client, const char *buf, int data_len) -{ - if (buf == nullptr || data_len == 0) { - return 0; - } - - TimerStat writeStat(*write_socket_metric_); - - int ret = common::writen(client->fd, buf, data_len); - if (ret < 0) { - LOG_ERROR("Failed to send data back to client. ret=%d, error=%s", ret, strerror(errno)); - MUTEX_UNLOCK(&client->mutex); - - close_connection(client); - return -STATUS_FAILED_NETWORK; - } - - return 0; -} -#endif - void Server::accept(int fd, short ev, void *arg) { Server *instance = (Server *)arg; @@ -202,8 +160,8 @@ void Server::accept(int fd, short ev, void *arg) ret = event_base_set(instance->event_base_, &communicator->read_event()); if (ret < 0) { - LOG_ERROR( - "Failed to do event_base_set for read event of %s into libevent, %s", communicator->addr(), strerror(errno)); + LOG_ERROR("Failed to do event_base_set for read event of %s into libevent, %s", + communicator->addr(), strerror(errno)); delete communicator; return; } @@ -220,12 +178,15 @@ void Server::accept(int fd, short ev, void *arg) int Server::start() { - if (server_param_.use_unix_socket) { + if (server_param_.use_std_io) { + return start_stdin_server(); + } else if (server_param_.use_unix_socket) { return start_unix_socket_server(); } else { return start_tcp_server(); } } + int Server::start_tcp_server() { int ret = 0; @@ -293,7 +254,6 @@ int Server::start_tcp_server() int Server::start_unix_socket_server() { - int ret = 0; server_socket_ = socket(PF_UNIX, SOCK_STREAM, 0); if (server_socket_ < 0) { @@ -308,7 +268,7 @@ int Server::start_unix_socket_server() return -1; } - unlink(server_param_.unix_socket_path.c_str()); + unlink(server_param_.unix_socket_path.c_str()); /// 如果不删除源文件,可能会导致bind失败 struct sockaddr_un sockaddr; memset(&sockaddr, 0, sizeof(sockaddr)); @@ -349,6 +309,38 @@ int Server::start_unix_socket_server() return 0; } +int Server::start_stdin_server() +{ + Communicator *communicator = communicator_factory_.create(server_param_.protocol); + RC rc = communicator->init(STDIN_FILENO, new Session(Session::default_session()), "stdin"); + if (OB_FAIL(rc)) { + LOG_WARN("failed to init cli communicator. rc=%s", strrc(rc)); + return -1; + } + + started_ = true; + + while (started_) { + SessionEvent *event = nullptr; + rc = communicator->read_event(event); + if (OB_FAIL(rc)) { + LOG_WARN("failed to read event. rc=%s", strrc(rc)); + return -1; + } + + if (event == nullptr) { + break; + } + + /// 在当前线程立即处理对应的事件 + session_stage_->handle_event(event); + } + + delete communicator; + communicator = nullptr; + return 0; +} + int Server::serve() { evthread_use_pthreads(); @@ -364,7 +356,9 @@ int Server::serve() exit(-1); } - event_base_dispatch(event_base_); + if (!server_param_.use_std_io) { + event_base_dispatch(event_base_); + } if (listen_ev_ != nullptr) { event_del(listen_ev_); diff --git a/src/observer/net/server.h b/src/observer/net/server.h index e4f3f691e210d26812b18f8d7987aa4e675a5696..6e78106ac2fa6830daa807b917303e2b1ba93a10 100644 --- a/src/observer/net/server.h +++ b/src/observer/net/server.h @@ -21,14 +21,20 @@ See the Mulan PSL v2 for more details. */ class Communicator; -class Server { +/** + * @brief 负责接收客户端消息并创建任务 + * @ingroup Communicator + * @details 当前支持网络连接,有TCP和Unix Socket两种方式。通过命令行参数来指定使用哪种方式。 + * 启动后监听端口或unix socket,使用libevent来监听事件,当有新的连接到达时,创建一个Communicator对象进行处理。 + */ +class Server +{ public: Server(ServerParam input_server_param); ~Server(); public: static void init(); - // static int send(ConnectionContext *client, const char *buf, int data_len); static void close_connection(Communicator *comm); public: @@ -36,28 +42,55 @@ public: void shutdown(); private: + /** + * @brief 接收到新的连接时,调用此函数创建Communicator对象 + * @details 此函数作为libevent中监听套接字对应的回调函数 + * @param fd libevent回调函数传入的参数,即监听套接字 + * @param ev 本次触发的事件,通常是EV_READ + * @param arg 在注册libevent回调函数时,传入的参数,即Server对象 + */ static void accept(int fd, short ev, void *arg); - // close connection + /** + * @brief 接收到客户端消息时,调用此函数创建任务 + * @details 此函数作为libevent中客户端套接字对应的回调函数。当有新的消息到达时,调用此函数创建任务。 + * @param fd libevent回调函数传入的参数,即客户端套接字 + * @param ev 本次触发的事件,通常是EV_READ + * @param arg 在注册libevent回调函数时,传入的参数,即Communicator对象 + */ static void recv(int fd, short ev, void *arg); private: + /** + * @brief 将socket描述符设置为非阻塞模式 + * + * @param fd 指定的描述符 + */ int set_non_block(int fd); + int start(); + + /** + * @brief 启动TCP服务 + */ int start_tcp_server(); + + /** + * @brief 启动Unix Socket服务 + */ int start_unix_socket_server(); + int start_stdin_server(); + private: - bool started_; + volatile bool started_ = false; - int server_socket_; - struct event_base *event_base_; - struct event *listen_ev_; + int server_socket_ = -1; ///< 监听套接字,是一个描述符 + struct event_base *event_base_ = nullptr; ///< libevent对象 + struct event *listen_ev_ = nullptr; ///< libevent监听套接字事件 - ServerParam server_param_; + ServerParam server_param_; ///< 服务启动参数 - CommunicatorFactory communicator_factory_; + CommunicatorFactory communicator_factory_; ///< 通过这个对象创建新的Communicator对象 - static common::Stage *session_stage_; - static common::SimpleTimer *read_socket_metric_; - static common::SimpleTimer *write_socket_metric_; + static common::Stage *session_stage_; ///< 通过这个对象创建新的请求任务 }; diff --git a/src/observer/net/server_param.h b/src/observer/net/server_param.h index 28331dfe1590e88f2e620ffbff887253dc12d84d..d2031990df5417a1941fecc9a6d058512e325544 100644 --- a/src/observer/net/server_param.h +++ b/src/observer/net/server_param.h @@ -17,6 +17,10 @@ See the Mulan PSL v2 for more details. */ #include #include "net/communicator.h" +/** + * @brief 服务端启动参数 + * @ingroup Communicator + */ class ServerParam { public: @@ -29,14 +33,17 @@ public: // accpet client's address, default is INADDR_ANY, means accept every address long listen_addr; - int max_connection_num; - // server listing port - int port; + int max_connection_num; ///< 最大连接数 - std::string unix_socket_path; + int port; ///< 监听的端口号 - // 如果使用标准输入输出作为通信条件,就不再监听端口 + std::string unix_socket_path; ///< unix socket的路径 + + bool use_std_io = false; ///< 是否使用标准输入输出作为通信条件 + + ///< 如果使用标准输入输出作为通信条件,就不再监听端口 + ///< 后面如果改成支持多种通讯方式,就不需要这个参数了 bool use_unix_socket = false; - CommunicateProtocol protocol; + CommunicateProtocol protocol; ///< 通讯协议,目前支持文本协议和mysql协议 }; diff --git a/src/observer/session/session_stage.cpp b/src/observer/session/session_stage.cpp index 82a94d6e9e6926154ed098d440fa263842be631a..d79108f5ce400b55af38dd3c641f4e28ab32ef28 100644 --- a/src/observer/session/session_stage.cpp +++ b/src/observer/session/session_stage.cpp @@ -96,26 +96,7 @@ void SessionStage::handle_event(StageEvent *event) // right now, we just support only one event. handle_request(event); - return; -} - -void SessionStage::callback_event(StageEvent *event, CallbackContext *context) -{ - SessionEvent *sev = dynamic_cast(event); - if (nullptr == sev) { - LOG_ERROR("Cannot cat event to sessionEvent"); - return; - } - - Communicator *communicator = sev->get_communicator(); - bool need_disconnect = false; - RC rc = communicator->write_result(sev, need_disconnect); - LOG_INFO("write result return %s", strrc(rc)); - if (need_disconnect) { - Server::close_connection(communicator); - } - Session::set_current_session(nullptr); - + event->done_immediate(); return; } @@ -131,20 +112,19 @@ void SessionStage::handle_request(StageEvent *event) std::string sql = sev->query(); if (common::is_blank(sql.c_str())) { - sev->done_immediate(); - return; - } - - CompletionCallback *cb = new (std::nothrow) CompletionCallback(this, nullptr); - if (cb == nullptr) { - LOG_ERROR("Failed to new callback for SessionEvent"); - sev->done_immediate(); return; } - sev->push_callback(cb); - Session::set_current_session(sev->session()); SQLStageEvent *sql_event = new SQLStageEvent(sev, sql); query_cache_stage_->handle_event(sql_event); + + Communicator *communicator = sev->get_communicator(); + bool need_disconnect = false; + RC rc = communicator->write_result(sev, need_disconnect); + LOG_INFO("write result return %s", strrc(rc)); + if (need_disconnect) { + Server::close_connection(communicator); + } + Session::set_current_session(nullptr); } diff --git a/src/observer/session/session_stage.h b/src/observer/session/session_stage.h index 6f94a83b1efceb547f8dfdeda2f8a76cd1dfea81..6a1204f157b6b0df813690084d4e63ec7116a56c 100644 --- a/src/observer/session/session_stage.h +++ b/src/observer/session/session_stage.h @@ -18,16 +18,25 @@ See the Mulan PSL v2 for more details. */ #include "common/metrics/metrics.h" /** + * @brief SEDA处理的stage + * @defgroup SQLStage + * @details 收到的客户端请求会放在SEDA框架中处理,每个stage都是一个处理阶段。 + * 当前的处理流程可以通过observer.ini配置文件查看。 * seda::stage使用说明: * 这里利用seda的线程池与调度。stage是一个事件处理的几个阶段。 * 目前包括session,parse,execution和storage * 每个stage使用handleEvent函数处理任务,并且使用StageEvent::pushCallback注册回调函数。 - * 这时当调用StageEvent::done(Immediate)时,就会调用该事件注册的回调函数。 + * 这时当调用StageEvent::done(Immediate)时,就会调用该事件注册的回调函数,如果没有回调函数,就会释放自己。 + */ + +/** + * @brief SQL处理的session阶段,也是第一个阶段 + * @ingroup SQLStage */ class SessionStage : public common::Stage { public: - ~SessionStage(); + virtual ~SessionStage(); static Stage *make_stage(const std::string &tag); protected: @@ -38,11 +47,8 @@ protected: bool initialize() override; void cleanup() override; void handle_event(common::StageEvent *event) override; - void callback_event(common::StageEvent *event, common::CallbackContext *context) override; protected: - void handle_input(common::StageEvent *event); - void handle_request(common::StageEvent *event); private: diff --git a/src/observer/sql/executor/execute_stage.cpp b/src/observer/sql/executor/execute_stage.cpp index 5638775111a3476595bba140cd71449cef0b2d7d..0c5ff80c1971c1805b785859249de2dc6a02a07a 100644 --- a/src/observer/sql/executor/execute_stage.cpp +++ b/src/observer/sql/executor/execute_stage.cpp @@ -86,13 +86,6 @@ void ExecuteStage::handle_event(StageEvent *event) return; } -void ExecuteStage::callback_event(StageEvent *event, CallbackContext *context) -{ - // here finish read all data from disk or network, but do nothing here. - - return; -} - RC ExecuteStage::handle_request(common::StageEvent *event) { SQLStageEvent *sql_event = static_cast(event); diff --git a/src/observer/sql/executor/execute_stage.h b/src/observer/sql/executor/execute_stage.h index b7826e211af7478b8c0507818e3e596e1bd0116c..d3c87a40abac735e2d98c3be69a59a33029ff21b 100644 --- a/src/observer/sql/executor/execute_stage.h +++ b/src/observer/sql/executor/execute_stage.h @@ -24,6 +24,7 @@ class SelectStmt; /** * @brief 执行SQL语句的Stage,包括DML和DDL + * @ingroup SQLStage * @details 根据前面阶段生成的结果,有些语句会生成执行计划,有些不会。 * 整体上分为两类,带执行计划的,或者 @class CommandExecutor 可以直接执行的。 */ @@ -41,7 +42,6 @@ protected: bool initialize() override; void cleanup() override; void handle_event(common::StageEvent *event) override; - void callback_event(common::StageEvent *event, common::CallbackContext *context) override; RC handle_request(common::StageEvent *event); RC handle_request_with_physical_operator(SQLStageEvent *sql_event); diff --git a/src/observer/sql/operator/delete_logical_operator.h b/src/observer/sql/operator/delete_logical_operator.h index bc967aa1c10901e58c9b9e18372621cbd0dfec7a..c9f704bce098e894a03fbab9fc9242d76ea1a61f 100644 --- a/src/observer/sql/operator/delete_logical_operator.h +++ b/src/observer/sql/operator/delete_logical_operator.h @@ -16,6 +16,10 @@ See the Mulan PSL v2 for more details. */ #include "sql/operator/logical_operator.h" +/** + * @brief 逻辑算子,用于执行delete语句 + * @ingroup LogicalOperator + */ class DeleteLogicalOperator : public LogicalOperator { public: diff --git a/src/observer/sql/operator/delete_physical_operator.h b/src/observer/sql/operator/delete_physical_operator.h index 56d05a145468ff3ed7a96b554d7f81c44a1c4d93..5859c107ddd809d48326b6f152153b9fac3ec53c 100644 --- a/src/observer/sql/operator/delete_physical_operator.h +++ b/src/observer/sql/operator/delete_physical_operator.h @@ -19,6 +19,10 @@ See the Mulan PSL v2 for more details. */ class Trx; class DeleteStmt; +/** + * @brief 物理算子,删除 + * @ingroup PhysicalOperator + */ class DeletePhysicalOperator : public PhysicalOperator { public: diff --git a/src/observer/sql/operator/explain_logical_operator.h b/src/observer/sql/operator/explain_logical_operator.h index fd47b253dd163f59c5ea6810edf340f125cd4d41..4920e79880e0edcd3aa397337f5b4842a769dadb 100644 --- a/src/observer/sql/operator/explain_logical_operator.h +++ b/src/observer/sql/operator/explain_logical_operator.h @@ -16,7 +16,12 @@ See the Mulan PSL v2 for more details. */ #include "sql/operator/logical_operator.h" -class ExplainLogicalOperator : public LogicalOperator { +/** + * @brief Explain逻辑算子 + * @ingroup LogicalOperator + */ +class ExplainLogicalOperator : public LogicalOperator +{ public: ExplainLogicalOperator() = default; virtual ~ExplainLogicalOperator() = default; diff --git a/src/observer/sql/operator/explain_physical_operator.h b/src/observer/sql/operator/explain_physical_operator.h index c47005a2d4fddc49b27dacc67b03a60e5f7c7d67..1832efebdf3574dbb6237e5cc2da5bb101848947 100644 --- a/src/observer/sql/operator/explain_physical_operator.h +++ b/src/observer/sql/operator/explain_physical_operator.h @@ -16,6 +16,10 @@ See the Mulan PSL v2 for more details. */ #include "sql/operator/physical_operator.h" +/** + * @brief Explain物理算子 + * @ingroup PhysicalOperator + */ class ExplainPhysicalOperator : public PhysicalOperator { public: diff --git a/src/observer/sql/operator/index_scan_physical_operator.h b/src/observer/sql/operator/index_scan_physical_operator.h index 8b4e2da503f281916860b94c7d8494516c2951bb..9b7a51eaf83232e527be538080f50684e337567c 100644 --- a/src/observer/sql/operator/index_scan_physical_operator.h +++ b/src/observer/sql/operator/index_scan_physical_operator.h @@ -18,6 +18,10 @@ See the Mulan PSL v2 for more details. */ #include "sql/expr/tuple.h" #include "storage/record/record_manager.h" +/** + * @brief 索引扫描物理算子 + * @ingroup PhysicalOperator + */ class IndexScanPhysicalOperator : public PhysicalOperator { public: diff --git a/src/observer/sql/operator/insert_logical_operator.h b/src/observer/sql/operator/insert_logical_operator.h index e54bf1ec6d1b338f101d72266c67628920cc6f06..ffaf1772c652e4179b7bebac8f3f504949ea6cbe 100644 --- a/src/observer/sql/operator/insert_logical_operator.h +++ b/src/observer/sql/operator/insert_logical_operator.h @@ -19,6 +19,10 @@ See the Mulan PSL v2 for more details. */ #include "sql/operator/logical_operator.h" #include "sql/parser/parse_defs.h" +/** + * @brief 插入逻辑算子 + * @ingroup LogicalOperator + */ class InsertLogicalOperator : public LogicalOperator { public: diff --git a/src/observer/sql/operator/insert_physical_operator.h b/src/observer/sql/operator/insert_physical_operator.h index 62151a55472ccaef19650de67b3957d4d78197f2..d5ce2c9f86cc1107133cd8511ba3285691edd21a 100644 --- a/src/observer/sql/operator/insert_physical_operator.h +++ b/src/observer/sql/operator/insert_physical_operator.h @@ -20,6 +20,10 @@ See the Mulan PSL v2 for more details. */ class InsertStmt; +/** + * @brief 插入物理算子 + * @ingroup PhysicalOperator + */ class InsertPhysicalOperator : public PhysicalOperator { public: diff --git a/src/observer/sql/operator/join_logical_operator.h b/src/observer/sql/operator/join_logical_operator.h index 42c4a0a252c11ecc727b32fa4631474de088629e..e13909e642c06eb5af55ec78f918518eae54fa80 100644 --- a/src/observer/sql/operator/join_logical_operator.h +++ b/src/observer/sql/operator/join_logical_operator.h @@ -16,7 +16,13 @@ See the Mulan PSL v2 for more details. */ #include "sql/operator/logical_operator.h" -class JoinLogicalOperator : public LogicalOperator { +/** + * @brief 连接算子 + * @ingroup LogicalOperator + * @details 连接算子,用于连接两个表。对应的物理算子或者实现,可能有NestedLoopJoin,HashJoin等等。 + */ +class JoinLogicalOperator : public LogicalOperator +{ public: JoinLogicalOperator() = default; virtual ~JoinLogicalOperator() = default; diff --git a/src/observer/sql/operator/join_physical_operator.h b/src/observer/sql/operator/join_physical_operator.h index 8f507ad4637003b6f7da17fededf6bccb1e98a69..d5adb17a494d4529d91cffe646846656ea7252b1 100644 --- a/src/observer/sql/operator/join_physical_operator.h +++ b/src/observer/sql/operator/join_physical_operator.h @@ -18,8 +18,9 @@ See the Mulan PSL v2 for more details. */ #include "sql/operator/physical_operator.h" /** - * 最简单的两表(称为左表、右表)join算子 - * 依次遍历左表的每一行,然后关联右表的每一行 + * @brief 最简单的两表(称为左表、右表)join算子 + * @details 依次遍历左表的每一行,然后关联右表的每一行 + * @ingroup PhysicalOperator */ class NestedLoopJoinPhysicalOperator : public PhysicalOperator { diff --git a/src/observer/sql/operator/logical_operator.h b/src/observer/sql/operator/logical_operator.h index e66a5c86706ba244e140d2478ac8d7123e152eb6..ac038fb96533d5b34929b78fdd265955898846c5 100644 --- a/src/observer/sql/operator/logical_operator.h +++ b/src/observer/sql/operator/logical_operator.h @@ -19,20 +19,31 @@ See the Mulan PSL v2 for more details. */ #include "sql/expr/expression.h" +/** + * @brief 逻辑算子 + * @defgroup LogicalOperator + * @details 逻辑算子描述当前执行计划要做什么,比如从表中获取数据,过滤,投影,连接等等。 + * 物理算子会描述怎么做某件事情,这是与其不同的地方。 + */ + +/** + * @brief 逻辑算子类型 + * + */ enum class LogicalOperatorType { - TABLE_GET, - PREDICATE, - PROJECTION, - JOIN, - INSERT, - DELETE, - EXPLAIN, + TABLE_GET, ///< 从表中获取数据 + PREDICATE, ///< 过滤,就是谓词 + PROJECTION, ///< 投影,就是select + JOIN, ///< 连接 + INSERT, ///< 插入 + DELETE, ///< 删除,删除可能会有子查询 + EXPLAIN, ///< 查看执行计划 }; /** - * 逻辑算子描述当前执行计划要做什么 - * 可以看OptimizeStage中相关的代码 + * @brief 逻辑算子描述当前执行计划要做什么 + * @details 可以看OptimizeStage中相关的代码 */ class LogicalOperator { @@ -53,6 +64,9 @@ public: } protected: - std::vector> children_; - std::vector> expressions_; + std::vector> children_; ///< 子算子 + + ///< 表达式,比如select中的列,where中的谓词等等,都可以使用表达式来表示 + ///< 表达式能是一个常量,也可以是一个函数,也可以是一个列,也可以是一个子查询等等 + std::vector> expressions_; }; diff --git a/src/observer/sql/operator/physical_operator.h b/src/observer/sql/operator/physical_operator.h index 55d19dc4041e19788d80bdbcb591ac6c71a73911..629284314ea6d054febd71d64c9d34651b516cf8 100644 --- a/src/observer/sql/operator/physical_operator.h +++ b/src/observer/sql/operator/physical_operator.h @@ -25,6 +25,16 @@ class Record; class TupleCellSpec; class Trx; +/** + * @brief 物理算子 + * @defgroup PhysicalOperator + * @details 物理算子描述执行计划将如何执行,比如从表中怎么获取数据,如何做投影,怎么做连接等 + */ + +/** + * @brief 物理算子类型 + * @ingroup PhysicalOperator + */ enum class PhysicalOperatorType { TABLE_SCAN, @@ -39,7 +49,8 @@ enum class PhysicalOperatorType }; /** - * 与LogicalOperator对应,物理算子描述执行计划将如何执行 + * @brief 与LogicalOperator对应,物理算子描述执行计划将如何执行 + * @ingroup PhysicalOperator */ class PhysicalOperator { diff --git a/src/observer/sql/operator/predicate_logical_operator.h b/src/observer/sql/operator/predicate_logical_operator.h index f92dcb02d8f9685c79f01a296cdffc090de23136..1d3480a14e46b6b4a15bd7b4d6cb1b415517f74e 100644 --- a/src/observer/sql/operator/predicate_logical_operator.h +++ b/src/observer/sql/operator/predicate_logical_operator.h @@ -17,6 +17,10 @@ See the Mulan PSL v2 for more details. */ #include "sql/operator/logical_operator.h" #include "sql/expr/expression.h" +/** + * @brief 谓词/过滤逻辑算子 + * @ingroup LogicalOperator + */ class PredicateLogicalOperator : public LogicalOperator { public: diff --git a/src/observer/sql/operator/predicate_physical_operator.h b/src/observer/sql/operator/predicate_physical_operator.h index 8ecad52d384342b00fa3206cb23d7fffe70935d4..362cbb5e72d986526a2dbd7dcf2dab0c3e63a24b 100644 --- a/src/observer/sql/operator/predicate_physical_operator.h +++ b/src/observer/sql/operator/predicate_physical_operator.h @@ -20,6 +20,10 @@ See the Mulan PSL v2 for more details. */ class FilterStmt; +/** + * @brief 过滤/谓词物理算子 + * @ingroup PhysicalOperator + */ class PredicatePhysicalOperator : public PhysicalOperator { public: diff --git a/src/observer/sql/operator/project_logical_operator.h b/src/observer/sql/operator/project_logical_operator.h index ecde59cc08fd97e4a5fc75b5510bae9b39563a75..ba02737c6eb8881c91c73f9931d77bf9942d42a0 100644 --- a/src/observer/sql/operator/project_logical_operator.h +++ b/src/observer/sql/operator/project_logical_operator.h @@ -22,7 +22,9 @@ See the Mulan PSL v2 for more details. */ #include "storage/field/field.h" /** - * project 表示投影运算 + * @brief project 表示投影运算 + * @ingroup LogicalOperator + * @details 从表中获取数据后,可能需要过滤,投影,连接等等。 */ class ProjectLogicalOperator : public LogicalOperator { diff --git a/src/observer/sql/operator/project_physical_operator.h b/src/observer/sql/operator/project_physical_operator.h index 15926b02494bf1986b4a0fcf4c0fe06ac9dbb168..6d624bd14dfebd0f3f4e91d6fbb0cd0b8966487a 100644 --- a/src/observer/sql/operator/project_physical_operator.h +++ b/src/observer/sql/operator/project_physical_operator.h @@ -16,6 +16,10 @@ See the Mulan PSL v2 for more details. */ #include "sql/operator/physical_operator.h" +/** + * @brief 选择/投影物理算子 + * @ingroup PhysicalOperator + */ class ProjectPhysicalOperator : public PhysicalOperator { public: diff --git a/src/observer/sql/operator/string_list_physical_operator.h b/src/observer/sql/operator/string_list_physical_operator.h index 1957fa96467bf08bf295051b4844e8179d5be7b4..b8e0017f74c800061e711344507f5f6b729c5b5c 100644 --- a/src/observer/sql/operator/string_list_physical_operator.h +++ b/src/observer/sql/operator/string_list_physical_operator.h @@ -17,6 +17,11 @@ See the Mulan PSL v2 for more details. */ #include #include "sql/operator/physical_operator.h" +/** + * @brief 字符串列表物理算子 + * @ingroup PhysicalOperator + * @details 用于将字符串列表转换为物理算子,为了方便实现的接口,比如help命令 + */ class StringListPhysicalOperator : public PhysicalOperator { public: diff --git a/src/observer/sql/operator/table_get_logical_operator.h b/src/observer/sql/operator/table_get_logical_operator.h index 0c15360134f20020a242aa8c63b4783cf28ed85e..1871f0aea52f0a8ceab547bd50010e9585bfc3b7 100644 --- a/src/observer/sql/operator/table_get_logical_operator.h +++ b/src/observer/sql/operator/table_get_logical_operator.h @@ -17,8 +17,9 @@ See the Mulan PSL v2 for more details. */ #include "storage/field/field.h" /** - * 表示从表中获取数据的算子 - * 比如使用全表扫描、通过索引获取数据等 + * @brief 表示从表中获取数据的算子 + * @details 比如使用全表扫描、通过索引获取数据等 + * @ingroup LogicalOperator */ class TableGetLogicalOperator : public LogicalOperator { diff --git a/src/observer/sql/operator/table_scan_physical_operator.h b/src/observer/sql/operator/table_scan_physical_operator.h index 7933fb1b1fa49a710c93a821d431bbd37735cfca..5f10fbbdf58aacc5d20a5c076c654079afce2e27 100644 --- a/src/observer/sql/operator/table_scan_physical_operator.h +++ b/src/observer/sql/operator/table_scan_physical_operator.h @@ -20,6 +20,10 @@ See the Mulan PSL v2 for more details. */ class Table; +/** + * @brief 表扫描物理算子 + * @ingroup PhysicalOperator + */ class TableScanPhysicalOperator : public PhysicalOperator { public: diff --git a/src/observer/sql/optimizer/optimize_stage.cpp b/src/observer/sql/optimizer/optimize_stage.cpp index 25adbbf1757921ffda0c5ba1f1f62dd108abcd7f..8a6898c56c84f6b071266c82b0133a0be976b678 100644 --- a/src/observer/sql/optimizer/optimize_stage.cpp +++ b/src/observer/sql/optimizer/optimize_stage.cpp @@ -157,11 +157,6 @@ RC OptimizeStage::generate_physical_plan( return rc; } -void OptimizeStage::callback_event(StageEvent *event, CallbackContext *context) -{ - return; -} - RC OptimizeStage::rewrite(unique_ptr &logical_operator) { RC rc = RC::SUCCESS; diff --git a/src/observer/sql/optimizer/optimize_stage.h b/src/observer/sql/optimizer/optimize_stage.h index 5a10cb3b5959ea4983f438c9ae0447b7386c59c9..5272e734dacf705ef5950ba5616d27bd734adfb2 100644 --- a/src/observer/sql/optimizer/optimize_stage.h +++ b/src/observer/sql/optimizer/optimize_stage.h @@ -34,6 +34,7 @@ class ExplainStmt; /** * @brief 将解析后的Statement转换成执行计划,并进行优化 + * @ingroup SQLStage * @details 优化分为两种,一个是根据规则重写,一个是根据代价模型优化。 * 在这里,先将Statement转换成逻辑计划,然后进行重写(rewrite),最后生成物理计划。 * 不过并不是所有的语句都需要生成计划,有些可以直接执行,比如create table、create index等。 @@ -53,7 +54,6 @@ protected: bool initialize(); void cleanup(); void handle_event(common::StageEvent *event); - void callback_event(common::StageEvent *event, common::CallbackContext *context); private: RC handle_request(SQLStageEvent *event); diff --git a/src/observer/sql/parser/parse_stage.cpp b/src/observer/sql/parser/parse_stage.cpp index 2e6a07a8b2c9e01d356c1098902f652030d2b6d0..73fbfa97dc9c52aa00deca93e7892ba2bb1d4227 100644 --- a/src/observer/sql/parser/parse_stage.cpp +++ b/src/observer/sql/parser/parse_stage.cpp @@ -84,28 +84,10 @@ void ParseStage::handle_event(StageEvent *event) { RC rc = handle_request(event); if (RC::SUCCESS != rc) { - callback_event(event, nullptr); - return; + LOG_ERROR("Failed to handle request"); } - CompletionCallback *cb = new (std::nothrow) CompletionCallback(this, nullptr); - if (cb == nullptr) { - LOG_ERROR("Failed to new callback for SQLStageEvent"); - callback_event(event, nullptr); - return; - } - - event->push_callback(cb); - resolve_stage_->handle_event(event); - event->done_immediate(); - - return; -} - -void ParseStage::callback_event(StageEvent *event, CallbackContext *context) -{ SQLStageEvent *sql_event = static_cast(event); - sql_event->session_event()->done_immediate(); sql_event->done_immediate(); return; } @@ -138,5 +120,7 @@ RC ParseStage::handle_request(StageEvent *event) } sql_event->set_command(std::move(cmd)); + + resolve_stage_->handle_event(event); return RC::SUCCESS; } diff --git a/src/observer/sql/parser/parse_stage.h b/src/observer/sql/parser/parse_stage.h index 50400aa3a5b780d73fe446a4c727b0bf399d8022..4ab30af7e280338328917cfa614f53e5b96c17a2 100644 --- a/src/observer/sql/parser/parse_stage.h +++ b/src/observer/sql/parser/parse_stage.h @@ -19,7 +19,7 @@ See the Mulan PSL v2 for more details. */ /** * @brief 解析SQL语句,解析后的结果可以参考parse_defs.h - * + * @ingroup SQLStage */ class ParseStage : public common::Stage { @@ -30,12 +30,11 @@ public: protected: // common function ParseStage(const char *tag); - bool set_properties(); + bool set_properties() override; - bool initialize(); - void cleanup(); - void handle_event(common::StageEvent *event); - void callback_event(common::StageEvent *event, common::CallbackContext *context); + bool initialize() override; + void cleanup() override; + void handle_event(common::StageEvent *event) override; protected: RC handle_request(common::StageEvent *event); diff --git a/src/observer/sql/parser/resolve_stage.cpp b/src/observer/sql/parser/resolve_stage.cpp index 026a8a9be03515e723bd3ee2d7974f773d8535bb..bd818c1926da8100cbd0c18fb901360d56336175 100644 --- a/src/observer/sql/parser/resolve_stage.cpp +++ b/src/observer/sql/parser/resolve_stage.cpp @@ -109,8 +109,3 @@ void ResolveStage::handle_event(StageEvent *event) return; } - -void ResolveStage::callback_event(StageEvent *event, CallbackContext *context) -{ - return; -} diff --git a/src/observer/sql/parser/resolve_stage.h b/src/observer/sql/parser/resolve_stage.h index 4c13b0881bb4c3b0ce74b3a5eabb08f3716b7039..118389165fd9a75da8418b0c6e149c82c0a785e2 100644 --- a/src/observer/sql/parser/resolve_stage.h +++ b/src/observer/sql/parser/resolve_stage.h @@ -17,8 +17,8 @@ See the Mulan PSL v2 for more details. */ #include "common/seda/stage.h" /** - * @brief 执行Resolve,将解析后的SQL语句,转换成各种Stmt(Statement) - * + * @brief 执行Resolve,将解析后的SQL语句,转换成各种Stmt(Statement), 同时会做错误检查 + * @ingroup SQLStage */ class ResolveStage : public common::Stage { @@ -34,7 +34,6 @@ protected: bool initialize(); void cleanup(); void handle_event(common::StageEvent *event); - void callback_event(common::StageEvent *event, common::CallbackContext *context); protected: private: diff --git a/src/observer/sql/plan_cache/plan_cache_stage.cpp b/src/observer/sql/plan_cache/plan_cache_stage.cpp index 4c415b156fcf980b7bceb837b4cd2572e56c2454..14f7e118e3e46f0aa6a1a303e6d82ba8e08bdcf2 100644 --- a/src/observer/sql/plan_cache/plan_cache_stage.cpp +++ b/src/observer/sql/plan_cache/plan_cache_stage.cpp @@ -92,11 +92,3 @@ void PlanCacheStage::handle_event(StageEvent *event) return; } - -void PlanCacheStage::callback_event(StageEvent *event, CallbackContext *context) -{ - // update execute plan here - // event->done_immediate(); - - return; -} diff --git a/src/observer/sql/plan_cache/plan_cache_stage.h b/src/observer/sql/plan_cache/plan_cache_stage.h index e48e0ad35281065e38f026eb96eae029377eec78..8d4d8f981ca47d39cf4b1378d793b687f682b4d1 100644 --- a/src/observer/sql/plan_cache/plan_cache_stage.h +++ b/src/observer/sql/plan_cache/plan_cache_stage.h @@ -18,24 +18,24 @@ See the Mulan PSL v2 for more details. */ /** * @brief 尝试从Plan的缓存中获取Plan,如果没有命中,则执行Optimizer + * @ingroup SQLStage * @details 实际上现在什么都没做。不过PlanCache对数据库的优化提升明显,是一个非常有趣的功能, * 感兴趣的同学可以参考OceanBase的实现 */ class PlanCacheStage : public common::Stage { public: - ~PlanCacheStage(); + virtual ~PlanCacheStage(); static Stage *make_stage(const std::string &tag); protected: // common function PlanCacheStage(const char *tag); - bool set_properties(); + bool set_properties() override; - bool initialize(); - void cleanup(); - void handle_event(common::StageEvent *event); - void callback_event(common::StageEvent *event, common::CallbackContext *context); + bool initialize() override; + void cleanup() override; + void handle_event(common::StageEvent *event) override; protected: private: diff --git a/src/observer/sql/query_cache/query_cache_stage.cpp b/src/observer/sql/query_cache/query_cache_stage.cpp index 4265e5d49d37541dc50934b1adf7c9c31b8177a1..7844ca0c8f8b8318f51e7df80c3f608255e0a080 100644 --- a/src/observer/sql/query_cache/query_cache_stage.cpp +++ b/src/observer/sql/query_cache/query_cache_stage.cpp @@ -79,8 +79,3 @@ void QueryCacheStage::handle_event(StageEvent *event) return; } - -void QueryCacheStage::callback_event(StageEvent *event, CallbackContext *context) -{ - return; -} diff --git a/src/observer/sql/query_cache/query_cache_stage.h b/src/observer/sql/query_cache/query_cache_stage.h index 834b1f33f388a83f35f3e67ac847ca1f783a3474..3b8e010662962433eca1a9f939c42901ed990b78 100644 --- a/src/observer/sql/query_cache/query_cache_stage.h +++ b/src/observer/sql/query_cache/query_cache_stage.h @@ -16,20 +16,25 @@ See the Mulan PSL v2 for more details. */ #include "common/seda/stage.h" -class QueryCacheStage : public common::Stage { +/** + * @brief 查询缓存处理 + * @ingroup SQLStage + * @details 当前什么都没做 + */ +class QueryCacheStage : public common::Stage +{ public: - ~QueryCacheStage(); + virtual ~QueryCacheStage(); static Stage *make_stage(const std::string &tag); protected: // common function QueryCacheStage(const char *tag); - bool set_properties(); + bool set_properties() override; - bool initialize(); - void cleanup(); - void handle_event(common::StageEvent *event); - void callback_event(common::StageEvent *event, common::CallbackContext *context); + bool initialize() override; + void cleanup() override; + void handle_event(common::StageEvent *event) override; protected: private: diff --git a/src/observer/storage/default/default_storage_stage.h b/src/observer/storage/default/default_storage_stage.h index 9af92f86fd25673a1e3da73ac8b7e7639368b49f..8149d01789f0603dc9a7e9838304f4d01b5d2637 100644 --- a/src/observer/storage/default/default_storage_stage.h +++ b/src/observer/storage/default/default_storage_stage.h @@ -20,7 +20,12 @@ See the Mulan PSL v2 for more details. */ class DefaultHandler; class SqlResult; -class DefaultStorageStage : public common::Stage { +/** + * @brief 当前仅实现了load data,是准备删除的一个SQL stage + * @ingroup SQLStage + */ +class DefaultStorageStage : public common::Stage +{ public: ~DefaultStorageStage(); static Stage *make_stage(const std::string &tag); diff --git a/test/perf/CMakeLists.txt b/test/perf/CMakeLists.txt index 80a7b7fc7f4805c710526f63ffab66348735876a..b377d35b299af9e542f14c8959c68775d730f1be 100644 --- a/test/perf/CMakeLists.txt +++ b/test/perf/CMakeLists.txt @@ -1,31 +1,3 @@ -PROJECT(test) -MESSAGE("Begin to build " ${PROJECT_NAME}) -MESSAGE(STATUS "This is PROJECT_BINARY_DIR dir " ${PROJECT_BINARY_DIR}) -MESSAGE(STATUS "This is PROJECT_SOURCE_DIR dir " ${PROJECT_SOURCE_DIR}) - - -# 可以获取父cmake的变量 -MESSAGE("${CMAKE_COMMON_FLAGS}") - - -#INCLUDE_DIRECTORIES([AFTER|BEFORE] [SYSTEM] dir1 dir2 ...) -INCLUDE_DIRECTORIES(. ${PROJECT_SOURCE_DIR}/../../deps /usr/local/include SYSTEM) -# 父cmake 设置的include_directories 和link_directories并不传导到子cmake里面 -#INCLUDE_DIRECTORIES(BEFORE ${CMAKE_INSTALL_PREFIX}/include) -LINK_DIRECTORIES(/usr/local/lib ${PROJECT_BINARY_DIR}/../../lib) - - -IF (DEFINED ENV{LD_LIBRARY_PATH}) - SET(LD_LIBRARY_PATH_STR $ENV{LD_LIBRARY_PATH}) - #separate_arguments(LD_LIBRARY_PATH_STR) #只能处理空行 - string(REPLACE ":" ";" LD_LIBRARY_PATH_LIST ${LD_LIBRARY_PATH_STR}) - MESSAGE(" Add LD_LIBRARY_PATH to -L flags " ${LD_LIBRARY_PATH_LIST}) - LINK_DIRECTORIES(${LD_LIBRARY_PATH_LIST}) -ELSE () - LINK_DIRECTORIES(/usr/local/lib) -ENDIF () - - #get_filename_component( FileName # PATH|ABSOLUTE|NAME|EXT|NAME_WE|REALPATH diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index 6da3f4b4d7bed3bb30b278f9e472e123ae2b4746..453702d40ca2c58caefb8390a5b678618f6c1c07 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -3,13 +3,9 @@ MESSAGE("Begin to build clog_reader") SET(CLOG_READER_SRC clog_reader_cmd.cpp) -# 指定目标文件位置 -SET(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/bin) -MESSAGE("Binary directory:" ${EXECUTABLE_OUTPUT_PATH}) TARGET_SOURCES(clog_reader PRIVATE ${CLOG_READER_SRC}) TARGET_LINK_LIBRARIES(clog_reader observer_static) -TARGET_INCLUDE_DIRECTORIES(clog_reader PRIVATE . ${PROJECT_SOURCE_DIR}/src/observer/ ${PROJECT_SOURCE_DIR}/deps) -TARGET_LINK_DIRECTORIES(clog_reader PUBLIC /usr/local/lib) +TARGET_INCLUDE_DIRECTORIES(clog_reader PRIVATE ${PROJECT_SOURCE_DIR}/src/observer/) # Target 必须在定义 ADD_EXECUTABLE 之后, programs 不受这个限制 # TARGETS和PROGRAMS 的默认权限是OWNER_EXECUTE, GROUP_EXECUTE, 和WORLD_EXECUTE,即755权限, programs 都是处理脚步类 diff --git a/unittest/CMakeLists.txt b/unittest/CMakeLists.txt index 3d2f928ceb5fbb14114205f6eb3b25031e6c578a..af11bea98951fcffdbf19de8668f929880f356ad 100644 --- a/unittest/CMakeLists.txt +++ b/unittest/CMakeLists.txt @@ -1,23 +1,7 @@ # 可以获取父cmake的变量 MESSAGE("${CMAKE_COMMON_FLAGS}") - -#INCLUDE_DIRECTORIES([AFTER|BEFORE] [SYSTEM] dir1 dir2 ...) -INCLUDE_DIRECTORIES(. ${PROJECT_SOURCE_DIR}/deps ${PROJECT_SOURCE_DIR}/src/observer /usr/local/include) -# 父cmake 设置的include_directories 和link_directories并不传导到子cmake里面 -#INCLUDE_DIRECTORIES(BEFORE ${CMAKE_INSTALL_PREFIX}/include) -LINK_DIRECTORIES(/usr/local/lib /usr/local/lib64 ${PROJECT_BINARY_DIR}/lib) - - -IF (DEFINED ENV{LD_LIBRARY_PATH}) - SET(LD_LIBRARY_PATH_STR $ENV{LD_LIBRARY_PATH}) - #separate_arguments(LD_LIBRARY_PATH_STR) #只能处理空行 - string(REPLACE ":" ";" LD_LIBRARY_PATH_LIST ${LD_LIBRARY_PATH_STR}) - MESSAGE(" Add LD_LIBRARY_PATH to -L flags " ${LD_LIBRARY_PATH_LIST}) - LINK_DIRECTORIES(${LD_LIBRARY_PATH_LIST}) -ELSE () - LINK_DIRECTORIES(/usr/local/lib) -ENDIF () +INCLUDE_DIRECTORIES(${PROJECT_SOURCE_DIR}/src/observer) find_package(GTest CONFIG REQUIRED) @@ -35,4 +19,3 @@ FOREACH (F ${ALL_SRC}) TARGET_LINK_LIBRARIES(${prjName} common pthread dl gtest gtest_main observer_static) gtest_discover_tests(${prjName}) ENDFOREACH (F) -