未验证 提交 380fea38 编写于 作者: 羽飞's avatar 羽飞 提交者: GitHub

observer可以直接在控制台输入命令 (#199)

### What problem were solved in this pull request?

Issue Number: close #162 

Problem:
当前的observer启动流程和调试方法比较繁琐,必须使用客户端服务端的方式,先启动服务端程序,再使用客户端启动调试

### What is changed and how it works?
observer可以直接启动,不监听tcp或unix socket,直接通过终端/控制台输入命令并执行,极大的方便了调试

### Other information
新的启动方法:
./bin/observer -P cli -f ../etc/observer.ini
上级 39042c92
......@@ -64,4 +64,5 @@ jobs:
shell: bash
run: |
sudo bash build.sh init
echo "begin test..."
python3 test/case/miniob_test.py --test-cases=basic | tail -1 | grep "basic is success"
# include 另外一个cmake 配置
#INCLUDE(file1 [OPTIONAL])
cmake_minimum_required(VERSION 3.10)
set(CMAKE_CXX_STANDARD 20)
#SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
project(minidb)
MESSAGE(STATUS "This is SOURCE dir " ${test_SOURCE_DIR})
MESSAGE(STATUS "This is BINARY dir " ${test_BINARY_DIR})
MESSAGE(STATUS "This is Project source dir " ${PROJECT_SOURCE_DIR})
MESSAGE(STATUS "This is PROJECT_BINARY_DIR dir " ${PROJECT_BINARY_DIR})
SET(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/bin)
#SET(LIBRARY_OUTPUT_PATH <路径>)
OPTION(ENABLE_ASAN "Enable build with address sanitizer" OFF)
SET(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} ${CMAKE_SOURCE_DIR}/cmake)
OPTION(ENABLE_ASAN "Enable build with address sanitizer" ON)
OPTION(WITH_UNIT_TESTS "Compile miniob with unit tests" ON)
OPTION(CONCURRENCY "Support concurrency operations" OFF)
......@@ -77,6 +72,21 @@ ELSE()
ENDIF()
MESSAGE("Install target dir is " ${CMAKE_INSTALL_PREFIX})
IF (DEFINED ENV{LD_LIBRARY_PATH})
SET(LD_LIBRARY_PATH_STR $ENV{LD_LIBRARY_PATH})
string(REPLACE ":" ";" LD_LIBRARY_PATH_LIST ${LD_LIBRARY_PATH_STR})
MESSAGE(" Add LD_LIBRARY_PATH to -L flags " ${LD_LIBRARY_PATH_LIST})
LINK_DIRECTORIES(${LD_LIBRARY_PATH_LIST})
ENDIF ()
IF (EXISTS /usr/local/lib)
LINK_DIRECTORIES (/usr/local/lib)
ENDIF ()
IF (EXISTS /usr/local/lib64)
LINK_DIRECTORIES (/usr/local/lib64)
ENDIF ()
INCLUDE_DIRECTORIES(. ${PROJECT_SOURCE_DIR}/deps /usr/local/include)
# ADD_SUBDIRECTORY(src bin) bin 为目标目录, 可以省略
ADD_SUBDIRECTORY(deps)
......@@ -91,16 +101,6 @@ IF(WITH_UNIT_TESTS)
ADD_SUBDIRECTORY(unittest)
ENDIF()
# install 准备安装的目录是cmakefile 的当前目录, 不是build 后生成的目录
# Files 默认权限OWNER_WRITE, OWNER_READ, GROUP_READ,和WORLD_READ,即644权限
# INSTALL(FILES docs/README DESTINATION ./ )
# INSTALL(DIRECTORY docs DESTINATION ./
# PATTERN "README" EXCLUDE)
# PERMISSIONS 可以直接替换
#INSTALL(DIRECTORY bin DESTINATION ./
# FILE_PERMISSIONS OWNER_EXECUTE OWNER_WRITE OWNER_READ GROUP_EXECUTE GROUP_READ WORLD_READ
# DIRECTORY_PERMISSIONS OWNER_EXECUTE OWNER_WRITE OWNER_READ GROUP_EXECUTE GROUP_READ WORLD_READ WORLD_EXECUTE)
INSTALL(DIRECTORY etc DESTINATION .
FILE_PERMISSIONS OWNER_WRITE OWNER_READ GROUP_READ WORLD_READ)
......@@ -113,11 +113,6 @@ INSTALL(CODE "MESSAGE(\"Sample install message.\")")
# [ARGS <arguments to executable>]
# [OUTPUT_VARIABLE <var>]
# ADD_TEST与ENABLE_TESTING 参考书籍
#EXEC_PROGRAM(Executable [directory in which to run]
# [ARGS <arguments to executable>]
# [OUTPUT_VARIABLE <var>]
......
......@@ -8,12 +8,13 @@ MiniOB 设计的目标是面向在校学生、数据库从业者、爱好者,
为了帮助开发者更好的上手并学习 miniob, 建议阅读:
1. [miniob 框架介绍](docs/miniob-introduction.md)
2. [如何编译 MiniOB 源码](docs/how_to_build.md)
3. [使用 GitPod 开发 MiniOB](docs/dev_by_gitpod.md)
4. [开发环境搭建(本地调试, 适用 Linux 和 Mac)](docs/how_to_dev_miniob_by_vscode.md)
5. [开发环境搭建(远程调试, 适用于 Window, Linux 和 Mac)](docs/how_to_dev_in_docker_container_by_vscode.md)
6. [MiniOB 词法语法解析开发与测试](docs/miniob-sql-parser.md)
1. [miniob 框架介绍](docs/src/miniob-introduction.md)
2. [如何编译 MiniOB 源码](docs/src/how_to_build.md)
3. [如何运行 MiniOB](docs/src/how_to_run.md)
3. [使用 GitPod 开发 MiniOB](docs/src/dev-env/dev_by_gitpod.md)
4. [开发环境搭建(本地调试, 适用 Linux 和 Mac)](docs/src/dev-env/how_to_dev_miniob_by_vscode.md)
5. [开发环境搭建(远程调试, 适用于 Window, Linux 和 Mac)](docs/src/dev-env/how_to_dev_in_docker_container_by_vscode.md)
6. [MiniOB 各模块文档](docs/src/design/introduction.md)
7. [doxygen 代码文档](docs/doxy/html/index.html)
或者直接看 [MiniOB GitHub Pages](https://oceanbase.github.io/miniob/).
......
PROJECT(benchmark)
MESSAGE("Begin to build " ${PROJECT_NAME})
MESSAGE(STATUS "This is PROJECT_BINARY_DIR dir " ${PROJECT_BINARY_DIR})
MESSAGE(STATUS "This is PROJECT_SOURCE_DIR dir " ${PROJECT_SOURCE_DIR})
# 可以获取父cmake的变量
MESSAGE("${CMAKE_COMMON_FLAGS}")
#INCLUDE_DIRECTORIES([AFTER|BEFORE] [SYSTEM] dir1 dir2 ...)
INCLUDE_DIRECTORIES(. ${PROJECT_SOURCE_DIR}/../deps ${PROJECT_SOURCE_DIR}/../src/observer /usr/local/include SYSTEM)
# 父cmake 设置的include_directories 和link_directories并不传导到子cmake里面
#INCLUDE_DIRECTORIES(BEFORE ${CMAKE_INSTALL_PREFIX}/include)
LINK_DIRECTORIES(/usr/local/lib /usr/local/lib64 ${PROJECT_BINARY_DIR}/../lib)
IF (DEFINED ENV{LD_LIBRARY_PATH})
SET(LD_LIBRARY_PATH_STR $ENV{LD_LIBRARY_PATH})
#separate_arguments(LD_LIBRARY_PATH_STR) #只能处理空行
string(REPLACE ":" ";" LD_LIBRARY_PATH_LIST ${LD_LIBRARY_PATH_STR})
MESSAGE(" Add LD_LIBRARY_PATH to -L flags " ${LD_LIBRARY_PATH_LIST})
LINK_DIRECTORIES(${LD_LIBRARY_PATH_LIST})
ELSE ()
LINK_DIRECTORIES(/usr/local/lib)
ENDIF ()
find_package(benchmark CONFIG REQUIRED)
INCLUDE_DIRECTORIES(${PROJECT_SOURCE_DIR}/src/observer)
FILE(GLOB_RECURSE ALL_SRC *.cpp)
# AUX_SOURCE_DIRECTORY 类似功能
FOREACH (F ${ALL_SRC})
......@@ -32,4 +10,3 @@ FOREACH (F ${ALL_SRC})
ADD_EXECUTABLE(${prjName} ${F})
TARGET_LINK_LIBRARIES(${prjName} common pthread dl benchmark observer_static)
ENDFOREACH (F)
MACRO (MINIOB_FIND_READLINE)
FIND_PATH(READLINE_INCLUDE_DIR readline.h PATH_SUFFIXES readline)
FIND_LIBRARY(READLINE_LIBRARY NAMES readline)
IF (READLINE_INCLUDE_DIR AND READLINE_LIBRARY)
SET(HAVE_READLINE 1)
ELSE ()
MESSAGE("cannot find readline")
ENDIF()
ENDMACRO (MINIOB_FIND_READLINE)
MESSAGE(STATUS "This is PROJECT_BINARY_DIR dir " ${PROJECT_BINARY_DIR})
MESSAGE(STATUS "This is PROJECT_SOURCE_DIR dir " ${PROJECT_SOURCE_DIR})
INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR} ${PROJECT_SOURCE_DIR}/deps)
INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR})
FILE(GLOB_RECURSE ALL_SRC *.cpp)
FOREACH(F ${ALL_SRC})
......
......@@ -205,7 +205,7 @@ public:
/**
* Perform Stage-specific processing for an event
* Processing one event without swtich thread.
* Processing one event without switch thread.
* Handle the event according to requirements of specific stage. Pure
* virtual member function.
*
......@@ -222,7 +222,8 @@ public:
*
* @param[in] event Pointer to event that initiated the callback.
*/
virtual void callback_event(StageEvent *event, CallbackContext *context) = 0;
virtual void callback_event(StageEvent *event, CallbackContext *context)
{}
/**
* Perform Stage-specific callback processing for a timed out event
......
......@@ -2,6 +2,7 @@
- [MiniOB 简介](./miniob-introduction.md)
- [如何编译](./how_to_build.md)
- [如何运行](./how_to_run.md)
- [开发环境搭建](./dev-env/introduction.md)
- [使用 GitPod 开发 MiniOB](./dev-env/dev_by_gitpod.md)
- [开发环境搭建(本地调试, 适用 Linux 和 Mac)](./dev-env/how_to_dev_miniob_by_vscode.md)
......
# 如何运行
编译完成后,可以在build目录(可能是build_debug或build_release)下找到bin/observer,就是我们的服务端程序,bin/obclient是自带的客户端程序。
当前服务端程序启动已经支持了多种模式,可以以TCP、unix socket方式启动,这时需要启动客户端以发起命令。observer还支持直接执行命令的模式,这时不需要启动客户端,直接在命令行输入命令即可。
**以直接执行命令的方式启动服务端程序**
```bash
./bin/observer -f ../etc/observer.ini -P cli
```
这会以直接执行命令的方式启动服务端程序,可以直接输入命令,不需要客户端。所有的请求都会以单线程的方式运行,配置项中的线程数不再有实际意义。
**以监听TCP端口的方式启动服务端程序**
```bash
./bin/observer -f ../etc/observer.ini -p 6789
```
这会以监听6789端口的方式启动服务端程序。
启动客户端程序:
```bash
./bin/obclient -p 6789
```
这会连接到服务端的6789端口。
**以监听unix socket的方式启动服务端程序**
```bash
./bin/observer -f ../etc/observer.ini -s miniob.sock
```
这会以监听unix socket的方式启动服务端程序。
启动客户端程序:
```bash
./bin/obclient -s miniob.sock
```
这会连接到服务端的miniob.sock文件。
**更多**
observer还提供了一些其它参数,可以通过./bin/observer -h查看。
PROJECT(miniob)
MESSAGE("Begin to build " ${PROJECT_NAME})
MESSAGE(STATUS "This is PROJECT_BINARY_DIR dir " ${PROJECT_BINARY_DIR})
MESSAGE(STATUS "This is PROJECT_SOURCE_DIR dir " ${PROJECT_SOURCE_DIR})
ADD_SUBDIRECTORY(obclient)
ADD_SUBDIRECTORY(observer)
ADD_EXECUTABLE(obclient)
MESSAGE("Begin to build " obclient)
INCLUDE(CheckIncludeFiles)
INCLUDE(readline)
MINIOB_FIND_READLINE()
#INCLUDE_DIRECTORIES([AFTER|BEFORE] [SYSTEM] dir1 dir2 ...)
TARGET_INCLUDE_DIRECTORIES(obclient PRIVATE . ${PROJECT_SOURCE_DIR}/deps /usr/local/include /usr/include)
# 父cmake 设置的include_directories 和link_directories并不传导到子cmake里面
#INCLUDE_DIRECTORIES(BEFORE ${CMAKE_INSTALL_PREFIX}/include)
# stdio.h 必须放在readline/readline.h 前面,因为readline头文件不能直接单独编译
CHECK_INCLUDE_FILES("stdio.h;readline/readline.h" HAVE_READLINE_HEADER)
FIND_LIBRARY(READLINE_LIBRARY readline)
IF (HAVE_READLINE_HEADER AND READLINE_LIBRARY)
IF (HAVE_READLINE)
TARGET_LINK_LIBRARIES(obclient ${READLINE_LIBRARY})
TARGET_INCLUDE_DIRECTORIES(obclient PRIVATE ${READLINE_INCLUDE_DIR})
ADD_DEFINITIONS(-DUSE_READLINE)
MESSAGE ("obclient use readline")
ELSE ()
MESSAGE ("readline is not found")
ENDIF()
......@@ -27,13 +21,9 @@ FOREACH (F ${ALL_SRC})
ENDFOREACH (F)
# 指定目标文件位置
SET(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/bin)
MESSAGE("Binary directory:" ${EXECUTABLE_OUTPUT_PATH})
TARGET_SOURCES(obclient PRIVATE ${PRJ_SRC})
TARGET_LINK_LIBRARIES(obclient common pthread dl)
# Target 必须在定义 ADD_EXECUTABLE 之后, programs 不受这个限制
# TARGETS和PROGRAMS 的默认权限是OWNER_EXECUTE, GROUP_EXECUTE, 和WORLD_EXECUTE,即755权限, programs 都是处理脚步类
# 类型分为RUNTIME/LIBRARY/ARCHIVE, prog
......
......@@ -75,7 +75,7 @@ char *my_readline(const char *prompt)
fprintf(stderr, "failed to alloc line buffer");
return nullptr;
}
fprintf(stdout, prompt);
fprintf(stdout, "%s", prompt);
char *s = fgets(buffer, MAX_MEM_BUFFER_SIZE, stdin);
if (nullptr == s) {
fprintf(stderr, "failed to read message from console");
......
MESSAGE(STATUS "This is CMAKE_CURRENT_SOURCE_DIR dir " ${CMAKE_CURRENT_SOURCE_DIR})
INCLUDE_DIRECTORIES(. ${CMAKE_CURRENT_SOURCE_DIR} ${PROJECT_SOURCE_DIR}/deps /usr/local/include)
LINK_DIRECTORIES(/usr/local/lib)
INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR})
FILE(GLOB_RECURSE ALL_SRC *.cpp *.c)
SET(MAIN_SRC main.cpp)
......@@ -33,6 +32,17 @@ ADD_EXECUTABLE(observer ${MAIN_SRC})
TARGET_LINK_LIBRARIES(observer observer_static)
ADD_LIBRARY(observer_static STATIC ${LIB_SRC})
INCLUDE (readline)
MINIOB_FIND_READLINE()
IF (HAVE_READLINE)
TARGET_LINK_LIBRARIES(observer_static ${READLINE_LIBRARY})
TARGET_INCLUDE_DIRECTORIES(observer_static PRIVATE ${READLINE_INCLUDE_DIR})
ADD_DEFINITIONS(-DUSE_READLINE)
MESSAGE ("observer_static use readline")
ELSE ()
MESSAGE ("readline is not found")
ENDIF()
SET_TARGET_PROPERTIES(observer_static PROPERTIES OUTPUT_NAME observer)
TARGET_LINK_LIBRARIES(observer_static ${LIBRARIES})
......
......@@ -16,6 +16,7 @@ See the Mulan PSL v2 for more details. */
/**
* @brief 这个文件定义函数返回码/错误码(Return Code)
* @enum RC
*/
#define DEFINE_RCS \
......
......@@ -38,7 +38,7 @@ void usage()
std::cout << "-p: server port. if not specified, the item in the config file will be used" << std::endl;
std::cout << "-f: path of config file." << std::endl;
std::cout << "-s: use unix socket and the argument is socket address" << std::endl;
std::cout << "-P: protocol. {plain(default), mysql}." << std::endl;
std::cout << "-P: protocol. {plain(default), mysql, cli}." << std::endl;
std::cout << "-t: transaction model. {vacuous(default), mvcc}." << std::endl;
std::cout << "-n: buffer pool memory size in byte" << std::endl;
exit(0);
......@@ -75,9 +75,6 @@ void parse_parameter(int argc, char **argv)
case 'e':
process_param->set_std_err(optarg);
break;
case 'd':
process_param->set_demon(true);
break;
case 't':
process_param->set_trx_kit_name(optarg);
break;
......@@ -131,11 +128,14 @@ Server *init_server()
server_param.port = port;
if (0 == strcasecmp(process_param->get_protocol().c_str(), "mysql")) {
server_param.protocol = CommunicateProtocol::MYSQL;
} else if (0 == strcasecmp(process_param->get_protocol().c_str(), "cli")) {
server_param.use_std_io = true;
server_param.protocol = CommunicateProtocol::CLI;
} else {
server_param.protocol = CommunicateProtocol::PLAIN;
}
if (process_param->get_unix_socket_path().size() > 0) {
if (process_param->get_unix_socket_path().size() > 0 && !server_param.use_std_io) {
server_param.use_unix_socket = true;
server_param.unix_socket_path = process_param->get_unix_socket_path();
}
......@@ -166,13 +166,14 @@ void quit_signal_handle(int signum)
int main(int argc, char **argv)
{
int rc = STATUS_SUCCESS;
setSignalHandler(quit_signal_handle);
parse_parameter(argc, argv);
int rc = STATUS_SUCCESS;
rc = init(the_process_param());
if (rc) {
if (rc != STATUS_SUCCESS) {
std::cerr << "Shutdown due to failed to init!" << std::endl;
cleanup();
return rc;
......
......@@ -43,10 +43,7 @@ RC BufferedWriter::close()
return rc;
}
if (fd_ >= 0) {
::close(fd_);
fd_ = -1;
}
fd_ = -1;
return RC::SUCCESS;
}
......
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by Wangyunlai on 2023/06/25.
//
#include "net/cli_communicator.h"
#include "net/buffered_writer.h"
#include "common/log/log.h"
#include "common/lang/string.h"
#include "event/session_event.h"
#ifdef USE_READLINE
#include "readline/readline.h"
#include "readline/history.h"
#endif
#define MAX_MEM_BUFFER_SIZE 8192
#define PORT_DEFAULT 6789
using namespace common;
#ifdef USE_READLINE
const std::string HISTORY_FILE = std::string(getenv("HOME")) + "/.miniob.history";
time_t last_history_write_time = 0;
char *my_readline(const char *prompt)
{
int size = history_length;
if (size == 0) {
read_history(HISTORY_FILE.c_str());
FILE *fp = fopen(HISTORY_FILE.c_str(), "a");
if (fp != nullptr) {
fclose(fp);
}
}
char *line = readline(prompt);
if (line != nullptr && line[0] != 0) {
add_history(line);
if (time(NULL) - last_history_write_time > 5) {
write_history(HISTORY_FILE.c_str());
}
// append_history doesn't work on some readlines
// append_history(1, HISTORY_FILE.c_str());
}
return line;
}
#else // USE_READLINE
char *my_readline(const char *prompt)
{
char *buffer = (char *)malloc(MAX_MEM_BUFFER_SIZE);
if (nullptr == buffer) {
LOG_WARN("failed to alloc line buffer");
return nullptr;
}
fprintf(stdout, "%s", prompt);
char *s = fgets(buffer, MAX_MEM_BUFFER_SIZE, stdin);
if (nullptr == s) {
free(buffer);
if (ferror(stdin) || feof(stdin)) {
LOG_WARN("failed to read line: %s", strerror(errno));
}
return nullptr;
}
return buffer;
}
#endif // USE_READLINE
/* this function config a exit-cmd list, strncasecmp func truncate the command from terminal according to the number,
'strncasecmp("exit", cmd, 4)' means that obclient read command string from terminal, truncate it to 4 chars from
the beginning, then compare the result with 'exit', if they match, exit the obclient.
*/
bool is_exit_command(const char *cmd) {
return 0 == strncasecmp("exit", cmd, 4) ||
0 == strncasecmp("bye", cmd, 3) ||
0 == strncasecmp("\\q", cmd, 2) ;
}
char *read_command()
{
const char *prompt_str = "miniob > ";
char *input_command = nullptr;
for (input_command = my_readline(prompt_str);
common::is_blank(input_command);
input_command = my_readline(prompt_str)) {
free(input_command);
input_command = nullptr;
}
return input_command;
}
RC CliCommunicator::init(int fd, Session *session, const std::string &addr)
{
RC rc = PlainCommunicator::init(fd, session, addr);
if (OB_FAIL(rc)) {
LOG_WARN("fail to init communicator", strrc(rc));
return rc;
}
if (fd == STDIN_FILENO) {
write_fd_ = STDOUT_FILENO;
delete writer_;
writer_ = new BufferedWriter(write_fd_);
const char delimiter = '\n';
send_message_delimiter_.assign(1, delimiter);
fd_ = -1; // 防止被父类析构函数关闭
} else {
rc = RC::INVALID_ARGUMENT;
LOG_WARN("only stdin supported");
}
return rc;
}
RC CliCommunicator::read_event(SessionEvent *&event)
{
event = nullptr;
char *command = read_command();
if (is_exit_command(command)) {
free(command);
event = nullptr;
return RC::SUCCESS;
}
event = new SessionEvent(this);
event->set_query(std::string(command));
free(command);
return RC::SUCCESS;
}
RC CliCommunicator::write_result(SessionEvent *event, bool &need_disconnect)
{
RC rc = PlainCommunicator::write_result(event, need_disconnect);
need_disconnect = false;
return rc;
}
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by Wangyunlai on 2023/06/25.
//
#pragma once
#include "net/plain_communicator.h"
/**
* @brief 用于命令行模式的通讯器
* @ingroup Communicator
* @details 直接通过终端/标准输入输出进行通讯。从这里的实现来看,是不需要libevent的一些实现的,
* 因此communicator需要重构,或者需要重构server中的各个通讯启动模式。
*/
class CliCommunicator : public PlainCommunicator
{
public:
CliCommunicator() = default;
virtual ~CliCommunicator() = default;
RC init(int fd, Session *session, const std::string &addr) override;
RC read_event(SessionEvent *&event) override;
RC write_result(SessionEvent *event, bool &need_disconnect) override;
private:
int write_fd_ = -1; ///< 与使用远程通讯模式不同,如果读数据使用标准输入,那么输出应该是标准输出
};
......@@ -14,13 +14,13 @@ See the Mulan PSL v2 for more details. */
#include "net/communicator.h"
#include "net/mysql_communicator.h"
#include "net/plain_communicator.h"
#include "net/cli_communicator.h"
#include "net/buffered_writer.h"
#include "sql/expr/tuple.h"
#include "event/session_event.h"
#include "common/lang/mutex.h"
#include "common/io/io.h"
#include "session/session.h"
#include "common/lang/mutex.h"
RC Communicator::init(int fd, Session *session, const std::string &addr)
{
fd_ = fd;
......@@ -47,254 +47,6 @@ Communicator::~Communicator()
}
}
/////////////////////////////////////////////////////////////////////////////////
RC PlainCommunicator::read_event(SessionEvent *&event)
{
RC rc = RC::SUCCESS;
event = nullptr;
int data_len = 0;
int read_len = 0;
const int max_packet_size = 8192;
std::vector<char> buf(max_packet_size);
// 持续接收消息,直到遇到'\0'。将'\0'遇到的后续数据直接丢弃没有处理,因为目前仅支持一收一发的模式
while (true) {
read_len = ::read(fd_, buf.data() + data_len, max_packet_size - data_len);
if (read_len < 0) {
if (errno == EAGAIN) {
continue;
}
break;
}