未验证 提交 e279dc7a 编写于 作者: 羽飞's avatar 羽飞 提交者: GitHub

mysql communicator (#124)

tested by mariadb ( Ver 15.1 Distrib 5.5.65-MariaDB, for Linux (x86_64) using readline 5.1)
but failed with obclient
上级 28eee7d5
......@@ -55,7 +55,7 @@ SET(CMAKE_CXX_FLAGS ${CMAKE_COMMON_FLAGS})
SET(CMAKE_C_FLAGS ${CMAKE_COMMON_FLAGS})
MESSAGE("CMAKE_CXX_FLAGS is " ${CMAKE_CXX_FLAGS})
OPTION(ENABLE_ASAN OFF "Enable build with address sanitizer")
OPTION(ENABLE_ASAN "Enable build with address sanitizer" ON)
IF (ENABLE_ASAN)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fno-omit-frame-pointer -fsanitize=address")
......
......@@ -110,6 +110,16 @@ public:
return unix_socket_path_;
}
void set_protocol(const char *protocol)
{
protocol_ = protocol;
}
const std::string &get_protocol() const
{
return protocol_;
}
private:
std::string std_out_; // The output file
std::string std_err_; // The err output file
......@@ -119,6 +129,7 @@ private:
std::vector<std::string> args; // arguments
int server_port_ = -1; // server port(if valid, will overwrite the port in the config file)
std::string unix_socket_path_;
std::string protocol_;
};
ProcessParam *&the_process_param();
......
......@@ -13,23 +13,28 @@ See the Mulan PSL v2 for more details. */
//
#include "session_event.h"
#include "net/communicator.h"
SessionEvent::SessionEvent(ConnectionContext *client) : client_(client)
SessionEvent::SessionEvent(Communicator *comm) : communicator_(comm)
{
}
SessionEvent::~SessionEvent()
{
if (sql_result_) {
delete sql_result_;
sql_result_ = nullptr;
}
}
ConnectionContext *SessionEvent::get_client() const
Communicator *SessionEvent::get_communicator() const
{
return client_;
return communicator_;
}
Session *SessionEvent::session() const
{
return client_->session;
return communicator_->session();
}
const char *SessionEvent::get_response() const
......@@ -57,12 +62,7 @@ int SessionEvent::get_response_len() const
return response_.size();
}
char *SessionEvent::get_request_buf()
const char *SessionEvent::get_request_buf()
{
return client_->buf;
}
int SessionEvent::get_request_buf_len()
{
return SOCKET_BUFFER_SIZE;
return query_.c_str();
}
......@@ -12,37 +12,42 @@ See the Mulan PSL v2 for more details. */
// Created by Longda on 2021/4/13.
//
#ifndef __OBSERVER_SESSION_SESSIONEVENT_H__
#define __OBSERVER_SESSION_SESSIONEVENT_H__
#pragma once
#include <string.h>
#include <string>
#include "common/seda/stage_event.h"
#include "net/connection_context.h"
#include "sql/executor/sql_result.h"
class Session;
class Communicator;
class SqlResult;
class SessionEvent : public common::StageEvent {
public:
SessionEvent(ConnectionContext *client);
SessionEvent(Communicator *client);
virtual ~SessionEvent();
ConnectionContext *get_client() const;
Communicator *get_communicator() const;
Session *session() const;
void set_query(const std::string &query) { query_ = query; }
void set_sql_result(SqlResult *result) { sql_result_ = result; }
const std::string &query() const { return query_; }
SqlResult *sql_result() const { return sql_result_; }
const char *get_response() const;
void set_response(const char *response);
void set_response(const char *response, int len);
void set_response(std::string &&response);
int get_response_len() const;
char *get_request_buf();
int get_request_buf_len();
const char *get_request_buf(); // TODO remove me
private:
ConnectionContext *client_;
Communicator *communicator_ = nullptr;
SqlResult *sql_result_ = nullptr;
std::string query_;
std::string response_;
};
#endif //__OBSERVER_SESSION_SESSIONEVENT_H__
......@@ -12,8 +12,7 @@ See the Mulan PSL v2 for more details. */
// Created by Longda on 2021/4/14.
//
#ifndef __OBSERVER_SQL_EVENT_SQLEVENT_H__
#define __OBSERVER_SQL_EVENT_SQLEVENT_H__
#pragma once
#include <string>
#include "common/seda/stage_event.h"
......@@ -48,4 +47,3 @@ private:
Stmt *stmt_ = nullptr;
};
#endif //__SRC_OBSERVER_SQL_EVENT_SQLEVENT_H__
......@@ -20,6 +20,7 @@ See the Mulan PSL v2 for more details. */
#include <iostream>
#include "init.h"
#include "ini_setting.h"
#include "common/os/process.h"
#include "common/os/signal.h"
#include "net/server.h"
......@@ -37,6 +38,7 @@ void usage()
std::cout << "-p: server port. if not specified, the item in the config file will be used" << std::endl;
std::cout << "-f: path of config file." << std::endl;
std::cout << "-s: use unix socket and the argument is socket address" << std::endl;
std::cout << "-P: protocol. {plain, mysql}. plain is default." << std::endl;
exit(0);
}
......@@ -51,7 +53,7 @@ void parse_parameter(int argc, char **argv)
// Process args
int opt;
extern char *optarg;
while ((opt = getopt(argc, argv, "dp:s:f:o:e:h")) > 0) {
while ((opt = getopt(argc, argv, "dp:P:s:f:o:e:h")) > 0) {
switch (opt) {
case 's':
process_param->set_unix_socket_path(optarg);
......@@ -59,6 +61,9 @@ void parse_parameter(int argc, char **argv)
case 'p':
process_param->set_server_port(atoi(optarg));
break;
case 'P':
process_param->set_protocol(optarg);
break;
case 'f':
process_param->set_conf(optarg);
break;
......@@ -116,6 +121,11 @@ Server *init_server()
server_param.listen_addr = listen_addr;
server_param.max_connection_num = max_connection_num;
server_param.port = port;
if (0 == strcasecmp(process_param->get_protocol().c_str(), "mysql")) {
server_param.protocol = CommunicateProtocol::MYSQL;
} else {
server_param.protocol = CommunicateProtocol::PLAIN;
}
if (process_param->get_unix_socket_path().size() > 0) {
server_param.use_unix_socket = true;
......
/* Copyright (c) 2021 Xie Meiyi(xiemeiyi@hust.edu.cn) and OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by Wangyunlai on 2022/11/17.
//
#include "net/communicator.h"
#include "net/mysql_communicator.h"
#include "sql/expr/tuple.h"
#include "event/session_event.h"
#include "common/lang/mutex.h"
#include "common/io/io.h"
#include "session/session.h"
RC Communicator::init(int fd, Session *session, const std::string &addr)
{
fd_ = fd;
session_ = session;
addr_ = addr;
return RC::SUCCESS;
}
Communicator::~Communicator()
{
if (fd_ >= 0) {
close(fd_);
fd_ = -1;
}
if (session_ != nullptr) {
delete session_;
session_ = nullptr;
}
}
/////////////////////////////////////////////////////////////////////////////////
RC PlainCommunicator::read_event(SessionEvent *&event)
{
RC rc = RC::SUCCESS;
event = nullptr;
int data_len = 0;
int read_len = 0;
const int max_packet_size = 8192;
std::vector<char> buf(max_packet_size);
// 持续接收消息,直到遇到'\0'。将'\0'遇到的后续数据直接丢弃没有处理,因为目前仅支持一收一发的模式
while (true) {
read_len = ::read(fd_, buf.data() + data_len, max_packet_size - data_len);
if (read_len < 0) {
if (errno == EAGAIN) {
continue;
}
break;
}
if (read_len == 0) {
break;
}
if (read_len + data_len > max_packet_size) {
data_len += read_len;
break;
}
bool msg_end = false;
for (int i = 0; i < read_len; i++) {
if (buf[data_len + i] == 0) {
data_len += i + 1;
msg_end = true;
break;
}
}
if (msg_end) {
break;
}
data_len += read_len;
}
if (data_len > max_packet_size) {
LOG_WARN("The length of sql exceeds the limitation %d", max_packet_size);
return RC::IOERR;
}
if (read_len == 0) {
LOG_INFO("The peer has been closed %s\n", addr());
return RC::IOERR;
} else if (read_len < 0) {
LOG_ERROR("Failed to read socket of %s, %s\n", addr(), strerror(errno));
return RC::IOERR;
}
LOG_INFO("receive command(size=%d): %s", data_len, buf.data());
event = new SessionEvent(this);
event->set_query(std::string(buf.data()));
return rc;
}
RC PlainCommunicator::write_state(SessionEvent *event, bool &need_disconnect)
{
SqlResult *sql_result = event->sql_result();
const int buf_size = 2048;
char *buf = new char[buf_size];
const std::string &state_string = sql_result->state_string();
if (state_string.empty()) {
const char *result = RC::SUCCESS == sql_result->return_code() ? "SUCCESS" : "FAILURE";
snprintf(buf, buf_size, "%s\n", result);
} else {
snprintf(buf, buf_size, "%s > %s\n", strrc(sql_result->return_code()), state_string.c_str());
}
int ret = common::writen(fd_, buf, strlen(buf) + 1);
if (ret != 0) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
need_disconnect = true;
delete[] buf;
return RC::IOERR;
}
need_disconnect = false;
delete[] buf;
return RC::SUCCESS;
}
RC PlainCommunicator::write_result(SessionEvent *event, bool &need_disconnect)
{
need_disconnect = true;
const char message_terminate = '\0';
SqlResult *sql_result = event->sql_result();
if (nullptr == sql_result) {
const char *response = event->get_response();
int len = event->get_response_len();
int ret = common::writen(fd_, response, len);
if (ret < 0) {
LOG_ERROR("Failed to send data back to client. ret=%d, error=%s", ret, strerror(errno));
return RC::IOERR;
}
ret = common::writen(fd_, &message_terminate, sizeof(message_terminate));
if (ret < 0) {
LOG_ERROR("Failed to send data back to client. ret=%d, error=%s", ret, strerror(errno));
return RC::IOERR;
}
need_disconnect = false;
return RC::SUCCESS;
}
if (RC::SUCCESS != sql_result->return_code() || !sql_result->has_operator()) {
return write_state(event, need_disconnect);
}
RC rc = sql_result->open();
if (rc != RC::SUCCESS) {
sql_result->set_return_code(rc);
return write_state(event, need_disconnect);
}
const TupleSchema &schema = sql_result->tuple_schema();
const int cell_num = schema.cell_num();
for (int i = 0; i < cell_num; i++) {
const TupleCellSpec &spec = schema.cell_at(i);
const char *alias = spec.alias();
if (nullptr != alias || alias[0] != 0) {
if (0 != i) {
const char *delim = " | ";
int ret = common::writen(fd_, delim, strlen(delim));
if (ret != 0) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
return RC::IOERR;
}
}
int len = strlen(alias);
int ret = common::writen(fd_, alias, len);
if (ret != 0) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
return RC::IOERR;
}
}
}
if (cell_num > 0) {
char newline = '\n';
int ret = common::writen(fd_, &newline, 1);
if (ret != 0) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
return RC::IOERR;
}
}
rc = RC::SUCCESS;
Tuple *tuple = nullptr;
while (RC::SUCCESS == (rc = sql_result->next_tuple(tuple))) {
assert(tuple != nullptr);
int cell_num = tuple->cell_num();
for (int i = 0; i < cell_num; i++) {
if (i != 0) {
const char *delim = " | ";
int ret = common::writen(fd_, delim, strlen(delim));
if (ret != 0) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
return RC::IOERR;
}
}
TupleCell cell;
rc = tuple->cell_at(i, cell);
if (rc != RC::SUCCESS) {
return rc;
}
std::stringstream ss;
cell.to_string(ss);
std::string cell_str = ss.str();
int ret = common::writen(fd_, cell_str.data(), cell_str.size());
if (ret != RC::SUCCESS) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
return RC::IOERR;
}
}
char newline = '\n';
int ret = common::writen(fd_, &newline, 1);
if (ret != 0) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
return RC::IOERR;
}
}
if (rc != RC::RECORD_EOF) {
LOG_WARN("operator is done with error. error=%s", strrc(rc));
} else {
rc = RC::SUCCESS;
int ret = common::writen(fd_, &message_terminate, sizeof(message_terminate));
if (ret < 0) {
LOG_ERROR("Failed to send data back to client. ret=%d, error=%s", ret, strerror(errno));
return RC::IOERR;
}
need_disconnect = false;
}
return rc;
}
/////////////////////////////////////////////////////////////////////////////////
Communicator *CommunicatorFactory::create(CommunicateProtocol protocol)
{
switch (protocol) {
case CommunicateProtocol::PLAIN: {
return new PlainCommunicator;
} break;
case CommunicateProtocol::MYSQL: {
return new MysqlCommunicator;
} break;
default: {
return nullptr;
}
}
}
/* Copyright (c) 2021 Xie Meiyi(xiemeiyi@hust.edu.cn) and OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by Wangyunlai on 2022/11/17.
//
#pragma once
#include <string>
#include <event.h>
#include "rc.h"
struct ConnectionContext;
class SessionEvent;
class Session;
/**
* 负责与客户端通讯
*
* 在listener接收到一个新的连接(参考 server.cpp::accept), 就创建一个Communicator对象。
* 并调用init进行初始化。
* 在server中监听到某个连接有新的消息,就通过Communicator::read_event接收消息。
*/
class Communicator {
public:
virtual ~Communicator();
/**
* 接收到一个新的连接时,进行初始化
*/
virtual RC init(int fd, Session *session, const std::string &addr);
/**
* 监听到有新的数据到达,调用此函数进行接收消息
* 如果需要创建新的任务来处理,那么就创建一个SessionEvent 对象并通过event参数返回。
*/
virtual RC read_event(SessionEvent *&event) = 0;
/**
* 在任务处理完成后,通过此接口将结果返回给客户端
* @param event 任务数据,包括处理的结果
* @param need_disconnect 是否需要断开连接
* @return 处理结果。即使返回不是SUCCESS,也不能直接断开连接,需要通过need_disconnect来判断
* 是否需要断开连接
*/
virtual RC write_result(SessionEvent *event, bool &need_disconnect) = 0;
/**
* 关联的会话信息
*/
Session *session() const { return session_; }
/**
* libevent使用的数据,参考server.cpp
*/
struct event &read_event() { return read_event_; }
/**
* 对端地址
* 如果是unix socket,可能没有意义
*/
const char *addr() const { return addr_.c_str(); }
protected:
Session *session_ = nullptr;
int fd_ = -1;
struct event read_event_;
std::string addr_;
};
/**
* 与客户端进行通讯
* 使用简单的文本通讯协议,每个消息使用'\0'结尾
*/
class PlainCommunicator : public Communicator {
public:
RC read_event(SessionEvent *&event) override;
RC write_result(SessionEvent *event, bool &need_disconnect) override;
private:
RC write_state(SessionEvent *event, bool &need_disconnect);
};
/**
* 当前支持的通讯协议
*/
enum class CommunicateProtocol
{
PLAIN, //! 以'\0'结尾的协议
MYSQL, //! mysql通讯协议。具体实现参考 MysqlCommunicator
};
class CommunicatorFactory
{
public:
Communicator *create(CommunicateProtocol protocol);
};
此差异已折叠。
/* Copyright (c) 2021 Xie Meiyi(xiemeiyi@hust.edu.cn) and OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by Wangyunlai on 2022/11/22.
//
#pragma once
#include "net/communicator.h"
class SqlResult;
class BasePacket;
/**
* 与客户端通讯
* 实现MySQL通讯协议
* 可以参考 https://dev.mysql.com/doc/dev/mysql-server/latest/PAGE_PROTOCOL.html
* 或 mariadb文档 https://mariadb.com/kb/en/clientserver-protocol/
*/
class MysqlCommunicator : public Communicator {
public:
/**
* 连接刚开始建立时,进行一些初始化
* 参考MySQL或MariaDB的手册,服务端要首先向客户端发送一个握手包,等客户端回复后,
* 再回复一个OkPacket或ErrPacket
*/
virtual RC init(int fd, Session *session, const std::string &addr) override;
/**
* 有新的消息到达时,接收消息
* 因为MySQL协议的特殊性,收到数据后不一定需要向后流转,比如握手包
*/
virtual RC read_event(SessionEvent *&event) override;
/**
* 将处理结果返回给客户端
*/
virtual RC write_result(SessionEvent *event, bool &need_disconnect) override;
private:
RC send_packet(const BasePacket &packet);
RC write_state(SessionEvent *event, bool &need_disconnect);
/**
* 根据MySQL text protocol 描述,普通的结果分为列信息描述和行数据
* 这里就分为两个函数
*/
RC send_column_definition(SqlResult *sql_result, bool &need_disconnect);
RC send_result_rows(SqlResult *sql_result, bool &need_disconnect);
/**
* 根据实际测试,客户端在连接上来时,会发起一个 version_comment的查询
* 这里就针对这个查询返回一个结果
*/
RC handle_version_comment(bool &need_disconnect);
private:
//! 握手阶段(鉴权),需要做一些特殊处理,所以加个字段单独标记
bool authed_ = false;
//! 有时需要根据握手包中capabilities字段值的不同,而发送或接收不同的包
uint32_t client_capabilities_flag_ = 0;
//! 在一次通讯过程中(一个任务的请求与处理),每个包(packet)都有一个sequence id
//! 这个sequence id是递增的
int8_t sequence_id_ = 0;
};
......@@ -35,6 +35,7 @@ See the Mulan PSL v2 for more details. */
#include "event/session_event.h"
#include "session/session.h"
#include "ini_setting.h"
#include "net/communicator.h"
#include <common/metrics/metrics_registry.h>
using namespace common;
......@@ -85,7 +86,6 @@ void Server::init()
int Server::set_non_block(int fd)
{
int flags = fcntl(fd, F_GETFL);
if (flags == -1) {
LOG_INFO("Failed to get flags of fd :%d. ", fd);
......@@ -100,87 +100,34 @@ int Server::set_non_block(int fd)
return 0;
}
void Server::close_connection(ConnectionContext *client_context)
void Server::close_connection(Communicator *communicator)
{
LOG_INFO("Close connection of %s.", client_context->addr);
event_del(&client_context->read_event);
::close(client_context->fd);
delete client_context->session;
client_context->session = nullptr;
delete client_context;
LOG_INFO("Close connection of %s.", communicator->addr());
event_del(&communicator->read_event());
delete communicator;
}
void Server::recv(int fd, short ev, void *arg)
{
ConnectionContext *client = (ConnectionContext *)arg;
// Server::send(sev->getClient(), sev->getRequestBuf(), strlen(sev->getRequestBuf()));
int data_len = 0;
int read_len = 0;
int buf_size = sizeof(client->buf);
memset(client->buf, 0, buf_size);
TimerStat timer_stat(*read_socket_metric_);
MUTEX_LOCK(&client->mutex);
// 持续接收消息,直到遇到'\0'。将'\0'遇到的后续数据直接丢弃没有处理,因为目前仅支持一收一发的模式
while (true) {
read_len = ::read(client->fd, client->buf + data_len, buf_size - data_len);
if (read_len < 0) {
if (errno == EAGAIN) {
continue;
}
break;
}
if (read_len == 0) {
break;
}
if (read_len + data_len > buf_size) {
data_len += read_len;
break;
}
bool msg_end = false;
for (int i = 0; i < read_len; i++) {
if (client->buf[data_len + i] == 0) {
data_len += i + 1;
msg_end = true;
break;
}
}
if (msg_end) {
break;
}
data_len += read_len;
}
MUTEX_UNLOCK(&client->mutex);
timer_stat.end();
Communicator *comm = (Communicator *)arg;
if (data_len > buf_size) {
LOG_WARN("The length of sql exceeds the limitation %d\n", buf_size);
close_connection(client);
SessionEvent *event = nullptr;
RC rc = comm->read_event(event);
if (rc != RC::SUCCESS) {
close_connection(comm);
return;
}
if (read_len == 0) {
LOG_INFO("The peer has been closed %s\n", client->addr);
close_connection(client);
return;
} else if (read_len < 0) {
LOG_ERROR("Failed to read socket of %s, %s\n", client->addr, strerror(errno));
close_connection(client);
if (event == nullptr) {
LOG_WARN("event is null while read event return success");
return;
}
LOG_INFO("receive command(size=%d): %s", data_len, client->buf);
SessionEvent *sev = new SessionEvent(client);
session_stage_->add_event(sev);
session_stage_->add_event(event);
}
#if 0
// 这个函数仅负责发送数据,至于是否是一个完整的消息,由调用者控制
int Server::send(ConnectionContext *client, const char *buf, int data_len)
int Server::send( *client, const char *buf, int data_len)
{
if (buf == nullptr || data_len == 0) {
return 0;
......@@ -188,7 +135,6 @@ int Server::send(ConnectionContext *client, const char *buf, int data_len)
TimerStat writeStat(*write_socket_metric_);
MUTEX_LOCK(&client->mutex);
int ret = common::writen(client->fd, buf, data_len);
if (ret < 0) {
LOG_ERROR("Failed to send data back to client. ret=%d, error=%s", ret, strerror(errno));
......@@ -198,9 +144,9 @@ int Server::send(ConnectionContext *client, const char *buf, int data_len)
return -STATUS_FAILED_NETWORK;
}
MUTEX_UNLOCK(&client->mutex);
return 0;
}
#endif
void Server::accept(int fd, short ev, void *arg)
{
......@@ -244,33 +190,32 @@ void Server::accept(int fd, short ev, void *arg)
}
}
ConnectionContext *client_context = new ConnectionContext();
memset(client_context, 0, sizeof(ConnectionContext));
client_context->fd = client_fd;
snprintf(client_context->addr, sizeof(client_context->addr), "%s", addr_str.c_str());
pthread_mutex_init(&client_context->mutex, nullptr);
Communicator *communicator = instance->communicator_factory_.create(instance->server_param_.protocol);
RC rc = communicator->init(client_fd, new Session(Session::default_session()), addr_str);
if (rc != RC::SUCCESS) {
LOG_WARN("failed to init communicator. rc=%s", strrc(rc));
delete communicator;
return;
}
event_set(&client_context->read_event, client_context->fd, EV_READ | EV_PERSIST, recv, client_context);
event_set(&communicator->read_event(), client_fd, EV_READ | EV_PERSIST, recv, communicator);
ret = event_base_set(instance->event_base_, &client_context->read_event);
ret = event_base_set(instance->event_base_, &communicator->read_event());
if (ret < 0) {
LOG_ERROR(
"Failed to do event_base_set for read event of %s into libevent, %s", client_context->addr, strerror(errno));
delete client_context;
::close(instance->server_socket_);
LOG_ERROR("Failed to do event_base_set for read event of %s into libevent, %s",
communicator->addr(), strerror(errno));
delete communicator;
return;
}
ret = event_add(&client_context->read_event, nullptr);
ret = event_add(&communicator->read_event(), nullptr);
if (ret < 0) {
LOG_ERROR("Failed to event_add for read event of %s into libevent, %s", client_context->addr, strerror(errno));
delete client_context;
::close(instance->server_socket_);
LOG_ERROR("Failed to event_add for read event of %s into libevent, %s", communicator->addr(), strerror(errno));
delete communicator;
return;
}
client_context->session = new Session(Session::default_session());
LOG_INFO("Accepted connection from %s\n", client_context->addr);
LOG_INFO("Accepted connection from %s\n", communicator->addr());
}
int Server::start()
......
......@@ -12,15 +12,15 @@ See the Mulan PSL v2 for more details. */
// Created by Longda on 2021/4/1.
//
#ifndef __OBSERVER_NET_SERVER_H__
#define __OBSERVER_NET_SERVER_H__
#pragma once
#include "common/defs.h"
#include "common/metrics/metrics.h"
#include "common/seda/stage.h"
#include "net/connection_context.h"
#include "net/server_param.h"
class Communicator;
class Server {
public:
Server(ServerParam input_server_param);
......@@ -28,7 +28,8 @@ public:
public:
static void init();
static int send(ConnectionContext *client, const char *buf, int data_len);
// static int send(ConnectionContext *client, const char *buf, int data_len);
static void close_connection(Communicator *comm);
public:
int serve();
......@@ -37,7 +38,6 @@ public:
private:
static void accept(int fd, short ev, void *arg);
// close connection
static void close_connection(ConnectionContext *client_context);
static void recv(int fd, short ev, void *arg);
private:
......@@ -55,17 +55,9 @@ private:
ServerParam server_param_;
CommunicatorFactory communicator_factory_;
static common::Stage *session_stage_;
static common::SimpleTimer *read_socket_metric_;
static common::SimpleTimer *write_socket_metric_;
};
class Communicator {
public:
virtual ~Communicator() = default;
virtual int init(const ServerParam &server_param) = 0;
virtual int start() = 0;
virtual int stop() = 0;
};
#endif //__OBSERVER_NET_SERVER_H__
......@@ -12,10 +12,10 @@ See the Mulan PSL v2 for more details. */
// Created by Longda on 2021/4/13.
//
#ifndef __SRC_OBSERVER_NET_SERVER_PARAM_H__
#define __SRC_OBSERVER_NET_SERVER_PARAM_H__
#pragma once
#include <string>
#include "net/communicator.h"
class ServerParam {
public:
......@@ -36,6 +36,6 @@ public:
// 如果使用标准输入输出作为通信条件,就不再监听端口
bool use_unix_socket = false;
};
#endif //__SRC_OBSERVER_NET_SERVER_PARAM_H__
CommunicateProtocol protocol;
};
......@@ -27,6 +27,7 @@ See the Mulan PSL v2 for more details. */
#include "event/session_event.h"
#include "event/sql_event.h"
#include "net/server.h"
#include "net/communicator.h"
#include "session/session.h"
using namespace common;
......@@ -118,20 +119,14 @@ void SessionStage::callback_event(StageEvent *event, CallbackContext *context)
return;
}
const char *response = sev->get_response();
int len = sev->get_response_len();
if (len <= 0 || response == nullptr) {
response = "No data\n";
len = strlen(response) + 1;
}
Server::send(sev->get_client(), response, len);
if ('\0' != response[len - 1]) {
// 这里强制性的给发送一个消息终结符,如果需要发送多条消息,需要调整
char end = 0;
Server::send(sev->get_client(), &end, 1);
Communicator *communicator = sev->get_communicator();
bool need_disconnect = false;
RC rc = communicator->write_result(sev, need_disconnect);
LOG_INFO("write result return %s", strrc(rc));
if (need_disconnect) {
Server::close_connection(communicator);
}
// sev->done();
LOG_TRACE("Exit\n");
return;
}
......
......@@ -16,7 +16,6 @@ See the Mulan PSL v2 for more details. */
#define __OBSERVER_SESSION_SESSIONSTAGE_H__
#include "common/seda/stage.h"
#include "net/connection_context.h"
#include "common/metrics/metrics.h"
/**
......
......@@ -32,6 +32,7 @@ See the Mulan PSL v2 for more details. */
#include "sql/operator/predicate_operator.h"
#include "sql/operator/delete_operator.h"
#include "sql/operator/project_operator.h"
#include "sql/operator/string_list_operator.h"
#include "sql/stmt/stmt.h"
#include "sql/stmt/select_stmt.h"
#include "sql/stmt/update_stmt.h"
......@@ -200,18 +201,25 @@ void ExecuteStage::handle_request(common::StageEvent *event)
do_clog_sync(sql_event);
}
case SCF_ROLLBACK: {
Trx *trx = session_event->get_client()->session->current_trx();
Trx *trx = session_event->session()->current_trx();
RC rc = trx->rollback();
session->set_trx_multi_operation_mode(false);
session_event->set_response(strrc(rc));
SqlResult *sql_result = new SqlResult;
sql_result->set_return_code(rc);
session_event->set_sql_result(sql_result);
} break;
case SCF_EXIT: {
// do nothing
const char *response = "Unsupported\n";
session_event->set_response(response);
SqlResult *sql_result = new SqlResult;
sql_result->set_return_code(RC::SUCCESS);
session_event->set_sql_result(sql_result);
} break;
default: {
LOG_ERROR("Unsupported command=%d\n", sql->flag);
SqlResult *sql_result = new SqlResult;
sql_result->set_return_code(RC::UNIMPLENMENT);
sql_result->set_state_string("Unsupported command");
session_event->set_sql_result(sql_result);
}
}
}
......@@ -228,25 +236,6 @@ void end_trx_if_need(Session *session, Trx *trx, bool all_right)
}
}
void print_tuple_header(std::ostream &os, const ProjectOperator &oper)
{
const int cell_num = oper.tuple_cell_num();
const TupleCellSpec *cell_spec = nullptr;
for (int i = 0; i < cell_num; i++) {
oper.tuple_cell_spec_at(i, cell_spec);
if (i != 0) {
os << " | ";
}
if (cell_spec->alias()) {
os << cell_spec->alias();
}
}
if (cell_num > 0) {
os << '\n';
}
}
void tuple_to_string(std::ostream &os, const Tuple &tuple)
{
TupleCell cell;
......@@ -409,15 +398,21 @@ RC ExecuteStage::do_select(SQLStageEvent *sql_event)
scan_oper = new TableScanOperator(select_stmt->tables()[0]);
}
DEFER([&] () {delete scan_oper;});
PredicateOperator pred_oper(select_stmt->filter_stmt());
pred_oper.add_child(scan_oper);
ProjectOperator project_oper;
project_oper.add_child(&pred_oper);
SqlResult *sql_result = new SqlResult;
PredicateOperator *pred_oper = new PredicateOperator(select_stmt->filter_stmt());
pred_oper->add_child(scan_oper);
ProjectOperator *project_oper = new ProjectOperator;
project_oper->add_child(pred_oper);
TupleSchema schema;
for (const Field &field : select_stmt->query_fields()) {
project_oper.add_projection(field.table(), field.meta());
project_oper->add_projection(field.table(), field.meta());
schema.append_cell(field.field_name());
}
sql_result->set_tuple_schema(schema);
sql_result->set_operator(project_oper);
/*
rc = project_oper.open();
if (rc != RC::SUCCESS) {
LOG_WARN("failed to open operator");
......@@ -447,21 +442,31 @@ RC ExecuteStage::do_select(SQLStageEvent *sql_event)
rc = project_oper.close();
}
session_event->set_response(ss.str());
*/
session_event->set_sql_result(sql_result);
return rc;
}
RC ExecuteStage::do_help(SQLStageEvent *sql_event)
{
SessionEvent *session_event = sql_event->session_event();
const char *response = "show tables;\n"
"desc `table name`;\n"
"create table `table name` (`column name` `column type`, ...);\n"
"create index `index name` on `table` (`column`);\n"
"insert into `table` values(`value1`,`value2`);\n"
"update `table` set column=value [where `column`=`value`];\n"
"delete from `table` [where `column`=`value`];\n"
"select [ * | `columns` ] from `table`;\n";
session_event->set_response(response);
const char *strings[] = {
"show tables;",
"desc `table name`;",
"create table `table name` (`column name` `column type`, ...);",
"create index `index name` on `table` (`column`);",
"insert into `table` values(`value1`,`value2`);",
"update `table` set column=value [where `column`=`value`];",
"delete from `table` [where `column`=`value`];",
"select [ * | `columns` ] from `table`;"
};
StringListOperator *oper = new StringListOperator();
for (size_t i = 0; i < sizeof(strings)/sizeof(strings[0]); i++) {
oper->append(strings[i]);
}
SqlResult *sql_result = new SqlResult;
sql_result->set_operator(oper);
session_event->set_sql_result(sql_result);
return RC::SUCCESS;
}
......@@ -472,44 +477,45 @@ RC ExecuteStage::do_create_table(SQLStageEvent *sql_event)
Db *db = session_event->session()->get_current_db();
RC rc = db->create_table(create_table.relation_name,
create_table.attribute_count, create_table.attributes);
if (rc == RC::SUCCESS) {
session_event->set_response("SUCCESS\n");
} else {
session_event->set_response("FAILURE\n");
}
SqlResult *sql_result = new SqlResult;
sql_result->set_return_code(rc);
sql_event->session_event()->set_sql_result(sql_result);
return rc;
}
RC ExecuteStage::do_create_index(SQLStageEvent *sql_event)
{
SqlResult *sql_result = new SqlResult;
SessionEvent *session_event = sql_event->session_event();
session_event->set_sql_result(sql_result);
Db *db = session_event->session()->get_current_db();
const CreateIndex &create_index = sql_event->query()->sstr.create_index;
Table *table = db->find_table(create_index.relation_name);
if (nullptr == table) {
session_event->set_response("FAILURE\n");
sql_result->set_return_code(RC::SCHEMA_TABLE_NOT_EXIST);
return RC::SCHEMA_TABLE_NOT_EXIST;
}
RC rc = table->create_index(nullptr, create_index.index_name, create_index.attribute_name);
sql_event->session_event()->set_response(rc == RC::SUCCESS ? "SUCCESS\n" : "FAILURE\n");
sql_result->set_return_code(rc);
return rc;
}
RC ExecuteStage::do_show_tables(SQLStageEvent *sql_event)
{
SqlResult *sql_result = new SqlResult;
SessionEvent *session_event = sql_event->session_event();
session_event->set_sql_result(sql_result);
Db *db = session_event->session()->get_current_db();
std::vector<std::string> all_tables;
db->all_tables(all_tables);
if (all_tables.empty()) {
session_event->set_response("No table\n");
} else {
std::stringstream ss;
for (const auto &table : all_tables) {
ss << table << std::endl;
}
session_event->set_response(ss.str().c_str());
TupleSchema tuple_schema;
tuple_schema.append_cell(TupleCellSpec("", "Tables_in_SYS", "Tables_in_SYS"));
sql_result->set_tuple_schema(tuple_schema);
StringListOperator *oper = new StringListOperator;
for (const std::string &s : all_tables) {
oper->append(s);
}
sql_result->set_operator(oper);
return RC::SUCCESS;
}
......@@ -519,13 +525,27 @@ RC ExecuteStage::do_desc_table(SQLStageEvent *sql_event)
Db *db = sql_event->session_event()->session()->get_current_db();
const char *table_name = query->sstr.desc_table.relation_name;
Table *table = db->find_table(table_name);
std::stringstream ss;
SqlResult *sql_result = new SqlResult;
sql_event->session_event()->set_sql_result(sql_result);
if (table != nullptr) {
table->table_meta().desc(ss);
TupleSchema tuple_schema;
tuple_schema.append_cell(TupleCellSpec("", "Field", "Field"));
tuple_schema.append_cell(TupleCellSpec("", "Type", "Type"));
tuple_schema.append_cell(TupleCellSpec("", "Length", "Length"));
// TODO add Key
sql_result->set_tuple_schema(tuple_schema);
StringListOperator *oper = new StringListOperator;
const TableMeta &table_meta = table->table_meta();
for (int i = table_meta.sys_field_num(); i < table_meta.field_num(); i++) {
const FieldMeta *field_meta = table_meta.field(i);
oper->append({field_meta->name(), attr_type_to_string(field_meta->type()),
std::to_string(field_meta->len())});
}
sql_result->set_operator(oper);
} else {
ss << "No such table: " << table_name << std::endl;
sql_result->set_return_code(RC::SCHEMA_TABLE_NOT_EXIST);
sql_result->set_state_string("Table not exists");
}
sql_event->session_event()->set_response(ss.str().c_str());
return RC::SUCCESS;
}
......@@ -533,6 +553,8 @@ RC ExecuteStage::do_insert(SQLStageEvent *sql_event)
{
Stmt *stmt = sql_event->stmt();
SessionEvent *session_event = sql_event->session_event();
SqlResult *sql_result = new SqlResult;
session_event->set_sql_result(sql_result);
Session *session = session_event->session();
Db *db = session->get_current_db();
Trx *trx = session->current_trx();
......@@ -552,23 +574,26 @@ RC ExecuteStage::do_insert(SQLStageEvent *sql_event)
CLogRecord *clog_record = nullptr;
rc = clog_manager->clog_gen_record(CLogType::REDO_MTR_COMMIT, trx->get_current_id(), clog_record);
if (rc != RC::SUCCESS || clog_record == nullptr) {
session_event->set_response("FAILURE\n");
if (rc == RC::SUCCESS) {
rc = RC::INTERNAL;
}
sql_result->set_return_code(rc);
return rc;
}
rc = clog_manager->clog_append_record(clog_record);
if (rc != RC::SUCCESS) {
session_event->set_response("FAILURE\n");
sql_result->set_return_code(rc);
return rc;
}
trx->next_current_id();
session_event->set_response("SUCCESS\n");
sql_result->set_return_code(RC::SUCCESS);
} else {
session_event->set_response("SUCCESS\n");
sql_result->set_return_code(RC::SUCCESS);
}
} else {
session_event->set_response("FAILURE\n");
sql_result->set_return_code(rc);
}
return rc;
}
......@@ -624,6 +649,8 @@ RC ExecuteStage::do_begin(SQLStageEvent *sql_event)
{
RC rc = RC::SUCCESS;
SessionEvent *session_event = sql_event->session_event();
SqlResult *sql_result = new SqlResult;
session_event->set_sql_result(sql_result);
Session *session = session_event->session();
Db *db = session->get_current_db();
Trx *trx = session->current_trx();
......@@ -634,16 +661,12 @@ RC ExecuteStage::do_begin(SQLStageEvent *sql_event)
CLogRecord *clog_record = nullptr;
rc = clog_manager->clog_gen_record(CLogType::REDO_MTR_BEGIN, trx->get_current_id(), clog_record);
if (rc != RC::SUCCESS || clog_record == nullptr) {
session_event->set_response("FAILURE\n");
sql_result->set_return_code(rc);
return rc;
}
rc = clog_manager->clog_append_record(clog_record);
if (rc != RC::SUCCESS) {
session_event->set_response("FAILURE\n");
} else {
session_event->set_response("SUCCESS\n");
}
sql_result->set_return_code(rc);
return rc;
}
......@@ -652,6 +675,8 @@ RC ExecuteStage::do_commit(SQLStageEvent *sql_event)
{
RC rc = RC::SUCCESS;
SessionEvent *session_event = sql_event->session_event();
SqlResult *sql_result = new SqlResult;
session_event->set_sql_result(sql_result);
Session *session = session_event->session();
Db *db = session->get_current_db();
Trx *trx = session->current_trx();
......@@ -662,16 +687,12 @@ RC ExecuteStage::do_commit(SQLStageEvent *sql_event)
CLogRecord *clog_record = nullptr;
rc = clog_manager->clog_gen_record(CLogType::REDO_MTR_COMMIT, trx->get_current_id(), clog_record);
if (rc != RC::SUCCESS || clog_record == nullptr) {
session_event->set_response("FAILURE\n");
sql_result->set_return_code(rc);
return rc;
}
rc = clog_manager->clog_append_record(clog_record);
if (rc != RC::SUCCESS) {
session_event->set_response("FAILURE\n");
} else {
session_event->set_response("SUCCESS\n");
}
sql_result->set_return_code(rc);
trx->next_current_id();
......@@ -681,16 +702,14 @@ RC ExecuteStage::do_commit(SQLStageEvent *sql_event)
RC ExecuteStage::do_clog_sync(SQLStageEvent *sql_event)
{
RC rc = RC::SUCCESS;
SqlResult *sql_result = new SqlResult;
SessionEvent *session_event = sql_event->session_event();
session_event->set_sql_result(sql_result);
Db *db = session_event->session()->get_current_db();
CLogManager *clog_manager = db->get_clog_manager();
rc = clog_manager->clog_sync();
if (rc != RC::SUCCESS) {
session_event->set_response("FAILURE\n");
} else {
session_event->set_response("SUCCESS\n");
}
sql_result->set_return_code(rc);
return rc;
}
/* Copyright (c) 2021 Xie Meiyi(xiemeiyi@hust.edu.cn) and OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by WangYunlai on 2022/11/18.
//
#include "rc.h"
#include "sql/executor/sql_result.h"
void SqlResult::set_tuple_schema(const TupleSchema &schema)
{
tuple_schema_ = schema;
}
RC SqlResult::open()
{
if (nullptr == operator_) {
return RC::INVALID_ARGUMENT;
}
return operator_->open();
}
RC SqlResult::close()
{
if (nullptr == operator_) {
return RC::INVALID_ARGUMENT;
}
return operator_->close();
}
RC SqlResult::next_tuple(Tuple *&tuple)
{
RC rc = operator_->next();
if (rc != RC::SUCCESS) {
return rc;
}
tuple = operator_->current_tuple();
return rc;
}
/* Copyright (c) 2021 Xie Meiyi(xiemeiyi@hust.edu.cn) and OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by WangYunlai on 2022/11/17.
//
#pragma once
#include <string>
#include "sql/expr/tuple.h"
#include "sql/operator/operator.h"
class SqlResult {
public:
SqlResult() = default;
~SqlResult()
{
delete operator_;
operator_ = nullptr;
}
void set_tuple_schema(const TupleSchema &schema);
void set_return_code(RC rc) { return_code_ = rc; }
void set_state_string(const std::string &state_string) { state_string_ = state_string; }
void set_operator(Operator *oper) { operator_ = oper; }
bool has_operator() const { return operator_ != nullptr; }
const TupleSchema &tuple_schema() const { return tuple_schema_; }
RC return_code() const { return return_code_; }
const std::string &state_string() const { return state_string_; }
RC open();
RC close();
RC next_tuple(Tuple *&tuple);
private:
Operator *operator_ = nullptr;
TupleSchema tuple_schema_;
RC return_code_ = RC::SUCCESS;
std::string state_string_;
};
......@@ -17,7 +17,7 @@ See the Mulan PSL v2 for more details. */
RC FieldExpr::get_value(const Tuple &tuple, TupleCell &cell) const
{
return tuple.find_cell(field_, cell);
return tuple.find_cell(TupleCellSpec(table_name(), field_name()), cell);
}
RC ValueExpr::get_value(const Tuple &tuple, TupleCell & cell) const
......
......@@ -34,6 +34,7 @@ public:
virtual RC get_value(const Tuple &tuple, TupleCell &cell) const = 0;
virtual ExprType type() const = 0;
virtual AttrType value_type() const = 0;
};
class FieldExpr : public Expression
......@@ -49,6 +50,10 @@ public:
{
return ExprType::FIELD;
}
AttrType value_type() const override
{
return field_.attr_type();
}
Field &field()
{
......@@ -94,6 +99,11 @@ public:
return ExprType::VALUE;
}
AttrType value_type() const override
{
return tuple_cell_.attr_type();
}
void get_tuple_cell(TupleCell &cell) const {
cell = tuple_cell_;
}
......
......@@ -25,38 +25,17 @@ See the Mulan PSL v2 for more details. */
class Table;
class TupleCellSpec
class TupleSchema
{
public:
TupleCellSpec() = default;
TupleCellSpec(Expression *expr) : expression_(expr)
{}
~TupleCellSpec()
{
if (expression_) {
delete expression_;
expression_ = nullptr;
}
}
void set_alias(const char *alias)
{
this->alias_ = alias;
}
const char *alias() const
{
return alias_;
}
Expression *expression() const
{
return expression_;
}
public:
void append_cell(const TupleCellSpec &cell) { cells_.push_back(cell); }
void append_cell(const char *table, const char *field) { append_cell(TupleCellSpec(table, field)); }
void append_cell(const char *alias) { append_cell(TupleCellSpec(alias)); }
int cell_num() const { return static_cast<int>(cells_.size()); }
const TupleCellSpec &cell_at(int i) const { return cells_[i]; }
private:
const char *alias_ = nullptr;
Expression *expression_ = nullptr;
std::vector<TupleCellSpec> cells_;
};
class Tuple
......@@ -67,9 +46,7 @@ public:
virtual int cell_num() const = 0;
virtual RC cell_at(int index, TupleCell &cell) const = 0;
virtual RC find_cell(const Field &field, TupleCell &cell) const = 0;
virtual RC cell_spec_at(int index, const TupleCellSpec *&spec) const = 0;
virtual RC find_cell(const TupleCellSpec &spec, TupleCell &cell) const = 0;
};
class RowTuple : public Tuple
......@@ -78,7 +55,7 @@ public:
RowTuple() = default;
virtual ~RowTuple()
{
for (TupleCellSpec *spec : speces_) {
for (FieldExpr *spec : speces_) {
delete spec;
}
speces_.clear();
......@@ -94,7 +71,7 @@ public:
table_ = table;
this->speces_.reserve(fields->size());
for (const FieldMeta &field : *fields) {
speces_.push_back(new TupleCellSpec(new FieldExpr(table, &field)));
speces_.push_back(new FieldExpr(table, &field));
}
}
......@@ -110,8 +87,7 @@ public:
return RC::INVALID_ARGUMENT;
}
const TupleCellSpec *spec = speces_[index];
FieldExpr *field_expr = (FieldExpr *)spec->expression();
FieldExpr *field_expr = speces_[index];
const FieldMeta *field_meta = field_expr->field().meta();
cell.set_type(field_meta->type());
cell.set_data(this->record_->data() + field_meta->offset());
......@@ -119,16 +95,16 @@ public:
return RC::SUCCESS;
}
RC find_cell(const Field &field, TupleCell &cell) const override
RC find_cell(const TupleCellSpec &spec, TupleCell &cell) const override
{
const char *table_name = field.table_name();
const char *table_name = spec.table_name();
const char *field_name = spec.field_name();
if (0 != strcmp(table_name, table_->name())) {
return RC::NOTFOUND;
}
const char *field_name = field.field_name();
for (size_t i = 0; i < speces_.size(); ++i) {
const FieldExpr * field_expr = (const FieldExpr *)speces_[i]->expression();
const FieldExpr * field_expr = speces_[i];
const Field &field = field_expr->field();
if (0 == strcmp(field_name, field.field_name())) {
return cell_at(i, cell);
......@@ -137,6 +113,7 @@ public:
return RC::NOTFOUND;
}
#if 0
RC cell_spec_at(int index, const TupleCellSpec *&spec) const override
{
if (index < 0 || index >= static_cast<int>(speces_.size())) {
......@@ -146,6 +123,7 @@ public:
spec = speces_[index];
return RC::SUCCESS;
}
#endif
Record &record()
{
......@@ -159,20 +137,8 @@ public:
private:
Record *record_ = nullptr;
const Table *table_ = nullptr;
std::vector<TupleCellSpec *> speces_;
};
/*
class CompositeTuple : public Tuple
{
public:
int cell_num() const override;
RC cell_at(int index, TupleCell &cell) const = 0;
private:
int cell_num_ = 0;
std::vector<Tuple *> tuples_;
std::vector<FieldExpr *> speces_;
};
*/
class ProjectTuple : public Tuple
{
......@@ -210,13 +176,15 @@ public:
}
const TupleCellSpec *spec = speces_[index];
return spec->expression()->get_value(*tuple_, cell);
return tuple_->find_cell(*spec, cell);
}
RC find_cell(const Field &field, TupleCell &cell) const override
RC find_cell(const TupleCellSpec &spec, TupleCell &cell) const override
{
return tuple_->find_cell(field, cell);
return tuple_->find_cell(spec, cell);
}
#if 0
RC cell_spec_at(int index, const TupleCellSpec *&spec) const override
{
if (index < 0 || index >= static_cast<int>(speces_.size())) {
......@@ -225,7 +193,43 @@ public:
spec = speces_[index];
return RC::SUCCESS;
}
#endif
private:
std::vector<TupleCellSpec *> speces_;
Tuple *tuple_ = nullptr;
};
class ValueListTuple : public Tuple
{
public:
ValueListTuple() = default;
virtual ~ValueListTuple() = default;
void set_cells(const std::vector<TupleCell> &cells)
{
cells_ = cells;
}
virtual int cell_num() const override
{
return static_cast<int>(cells_.size());
}
virtual RC cell_at(int index, TupleCell &cell) const override
{
if (index < 0 || index >= cell_num()) {
return RC::NOTFOUND;
}
cell = cells_[index];
return RC::SUCCESS;
}
virtual RC find_cell(const TupleCellSpec &spec, TupleCell &cell) const override
{
return RC::INTERNAL;
}
private:
std::vector<TupleCell> cells_;
};
......@@ -18,6 +18,27 @@ See the Mulan PSL v2 for more details. */
#include "util/comparator.h"
#include "util/util.h"
TupleCellSpec::TupleCellSpec(const char *table_name, const char *field_name, const char *alias)
{
if (table_name) {
table_name_ = table_name;
}
if (field_name) {
field_name_ = field_name;
}
if (alias) {
alias_ = alias;
}
}
TupleCellSpec::TupleCellSpec(const char *alias)
{
if (alias) {
alias_ = alias;
}
}
////////////////////////////////////////////////////////////////////////////////
void TupleCell::to_string(std::ostream &os) const
{
switch (attr_type_) {
......
......@@ -18,6 +18,22 @@ See the Mulan PSL v2 for more details. */
#include "storage/common/table.h"
#include "storage/common/field_meta.h"
class TupleCellSpec
{
public:
TupleCellSpec(const char *table_name, const char *field_name, const char *alias = nullptr);
TupleCellSpec(const char *alias);
const char *table_name() const { return table_name_.c_str(); }
const char *field_name() const { return field_name_.c_str(); }
const char *alias() const { return alias_.c_str(); }
private:
std::string table_name_;
std::string field_name_;
std::string alias_;
};
class TupleCell
{
public:
......
......@@ -36,8 +36,6 @@ public:
Tuple * current_tuple() override {
return nullptr;
}
//int tuple_cell_num() const override
//RC tuple_cell_spec_at(int index, TupleCellSpec &spec) const override
private:
DeleteStmt *delete_stmt_ = nullptr;
Trx *trx_ = nullptr;
......
......@@ -9,24 +9,14 @@ MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by Longda on 2021/4/13.
// Created by WangYunlai on 2022/11/18.
//
#ifndef __SRC_OBSERVER_NET_CONNECTION_CONTEXT_H__
#define __SRC_OBSERVER_NET_CONNECTION_CONTEXT_H__
#include "sql/operator/operator.h"
#include <event.h>
#include <ini_setting.h>
class Session;
typedef struct _ConnectionContext {
Session *session;
int fd;
struct event read_event;
pthread_mutex_t mutex;
char addr[24];
char buf[SOCKET_BUFFER_SIZE];
} ConnectionContext;
#endif //__SRC_OBSERVER_NET_CONNECTION_CONTEXT_H__
Operator::~Operator()
{
for (Operator *oper : children_) {
delete oper;
}
}
......@@ -9,7 +9,7 @@ MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by WangYunlai on 2021/6/7.
// Created by WangYunlai on 2022/6/7.
//
#pragma once
......@@ -27,15 +27,13 @@ public:
Operator()
{}
virtual ~Operator() = default;
virtual ~Operator();
virtual RC open() = 0;
virtual RC next() = 0;
virtual RC close() = 0;
virtual Tuple * current_tuple() = 0;
//virtual int tuple_cell_num() const = 0;
//virtual RC tuple_cell_spec_at(int index, TupleCellSpec *&spec) const = 0;
void add_child(Operator *oper) {
children_.push_back(oper);
......
......@@ -105,12 +105,3 @@ bool PredicateOperator::do_predicate(RowTuple &tuple)
}
return true;
}
// int PredicateOperator::tuple_cell_num() const
// {
// return children_[0]->tuple_cell_num();
// }
// RC PredicateOperator::tuple_cell_spec_at(int index, TupleCellSpec &spec) const
// {
// return children_[0]->tuple_cell_spec_at(index, spec);
// }
......@@ -36,8 +36,6 @@ public:
RC close() override;
Tuple * current_tuple() override;
//int tuple_cell_num() const override;
//RC tuple_cell_spec_at(int index, TupleCellSpec &spec) const override;
private:
bool do_predicate(RowTuple &tuple);
private:
......
......@@ -54,12 +54,6 @@ void ProjectOperator::add_projection(const Table *table, const FieldMeta *field_
{
// 对单表来说,展示的(alias) 字段总是字段名称,
// 对多表查询来说,展示的alias 需要带表名字
TupleCellSpec *spec = new TupleCellSpec(new FieldExpr(table, field_meta));
spec->set_alias(field_meta->name());
TupleCellSpec *spec = new TupleCellSpec(table->name(), field_meta->name(), field_meta->name());
tuple_.add_cell_spec(spec);
}
RC ProjectOperator::tuple_cell_spec_at(int index, const TupleCellSpec *&spec) const
{
return tuple_.cell_spec_at(index, spec);
}
......@@ -36,8 +36,6 @@ public:
return tuple_.cell_num();
}
RC tuple_cell_spec_at(int index, const TupleCellSpec *&spec) const;
Tuple * current_tuple() override;
private:
ProjectTuple tuple_;
......
/* Copyright (c) 2021 Xie Meiyi(xiemeiyi@hust.edu.cn) and OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by WangYunlai on 2022/11/18.
//
#pragma once
#include <vector>
class StringListOperator : public Operator
{
public:
StringListOperator()
{}
virtual ~StringListOperator() = default;
template <typename InputIt>
void append(InputIt begin, InputIt end)
{
strings_.emplace_back(begin, end);
}
void append(std::initializer_list<std::string> init)
{
strings_.emplace_back(init);
}
template <typename T>
void append( const T &v)
{
strings_.emplace_back(1, v);
}
RC open() override
{
return RC::SUCCESS;
}
RC next() override
{
if (!started_) {
started_ = true;
iterator_ = strings_.begin();
} else if (iterator_ != strings_.end()) {
++iterator_;
}
return iterator_ == strings_.end() ? RC::RECORD_EOF : RC::SUCCESS;
}
virtual RC close() override
{
iterator_ = strings_.end();
return RC::SUCCESS;
}
virtual Tuple * current_tuple() override
{
if (iterator_ == strings_.end()) {
return nullptr;
}
const StringList &string_list = *iterator_;
std::vector<TupleCell> cells;
for (const std::string &s : string_list) {
TupleCell cell(CHARS, const_cast<char *>(s.data()));
cell.set_length(s.length());
cells.push_back(cell);
}
tuple_.set_cells(cells);
return &tuple_;
}
private:
using StringList = std::vector<std::string>;
using StringListList = std::vector<StringList>;
StringListList strings_;
StringListList::iterator iterator_;
bool started_ = false;
ValueListTuple tuple_;
};
......@@ -9,7 +9,7 @@ MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by WangYunlai on 2021/6/7.
// Created by WangYunlai on 2022/6/7.
//
#pragma once
......@@ -35,12 +35,6 @@ public:
Tuple * current_tuple() override;
// int tuple_cell_num() const override
// {
// return tuple_.cell_num();
// }
// RC tuple_cell_spec_at(int index, TupleCellSpec &spec) const override;
private:
Table *table_ = nullptr;
RecordFileScanner record_scanner_;
......
......@@ -19,14 +19,14 @@ See the Mulan PSL v2 for more details. */
RC parse(char *st, Query *sqln);
void relation_attr_init(RelAttr *relation_attr, const char *relation_name, const char *attribute_name)
void relation_attr_init(RelAttr *relation_attr, char *relation_name, char *attribute_name)
{
if (relation_name != nullptr) {
relation_attr->relation_name = strdup(relation_name);
relation_attr->relation_name = relation_name;
} else {
relation_attr->relation_name = nullptr;
}
relation_attr->attribute_name = strdup(attribute_name);
relation_attr->attribute_name = attribute_name;
}
void relation_attr_destroy(RelAttr *relation_attr)
......@@ -49,6 +49,11 @@ void value_init_float(Value *value, float v)
value->data = malloc(sizeof(v));
memcpy(value->data, &v, sizeof(v));
}
void value_init_string(Value *value, char *v)
{
value->type = CHARS;
value->data = v;
}
void value_init_string(Value *value, const char *v)
{
value->type = CHARS;
......@@ -93,9 +98,9 @@ void condition_destroy(Condition *condition)
}
}
void attr_info_init(AttrInfo *attr_info, const char *name, AttrType type, size_t length)
void attr_info_init(AttrInfo *attr_info, char *name, AttrType type, size_t length)
{
attr_info->name = strdup(name);
attr_info->name = name;
attr_info->type = type;
attr_info->length = length;
}
......@@ -110,9 +115,9 @@ void selects_append_attribute(Selects *selects, RelAttr *rel_attr)
{
selects->attributes[selects->attr_num++] = *rel_attr;
}
void selects_append_relation(Selects *selects, const char *relation_name)
void selects_append_relation(Selects *selects, char *relation_name)
{
selects->relations[selects->relation_num++] = strdup(relation_name);
selects->relations[selects->relation_num++] = relation_name;
}
void selects_append_conditions(Selects *selects, Condition conditions[], size_t condition_num)
......@@ -143,11 +148,11 @@ void selects_destroy(Selects *selects)
selects->condition_num = 0;
}
void inserts_init(Inserts *inserts, const char *relation_name, Value values[], size_t value_num)
void inserts_init(Inserts *inserts, char *relation_name, Value values[], size_t value_num)
{
assert(value_num <= sizeof(inserts->values) / sizeof(inserts->values[0]));
inserts->relation_name = strdup(relation_name);
inserts->relation_name = relation_name;
for (size_t i = 0; i < value_num; i++) {
inserts->values[i] = values[i];
}
......@@ -164,9 +169,9 @@ void inserts_destroy(Inserts *inserts)
inserts->value_num = 0;
}
void deletes_init_relation(Deletes *deletes, const char *relation_name)
void deletes_init_relation(Deletes *deletes, char *relation_name)
{
deletes->relation_name = strdup(relation_name);
deletes->relation_name = relation_name;
}
void deletes_set_conditions(Deletes *deletes, Condition conditions[], size_t condition_num)
......@@ -187,11 +192,11 @@ void deletes_destroy(Deletes *deletes)
deletes->relation_name = nullptr;
}
void updates_init(Updates *updates, const char *relation_name, const char *attribute_name, Value *value,
void updates_init(Updates *updates, char *relation_name, char *attribute_name, Value *value,
Condition conditions[], size_t condition_num)
{
updates->relation_name = strdup(relation_name);
updates->attribute_name = strdup(attribute_name);
updates->relation_name = relation_name;
updates->attribute_name = attribute_name;
updates->value = *value;
assert(condition_num <= sizeof(updates->conditions) / sizeof(updates->conditions[0]));
......@@ -221,9 +226,9 @@ void create_table_append_attribute(CreateTable *create_table, AttrInfo *attr_inf
create_table->attributes[create_table->attribute_count++] = *attr_info;
}
void create_table_init_name(CreateTable *create_table, const char *relation_name)
void create_table_init_name(CreateTable *create_table, char *relation_name)
{
create_table->relation_name = strdup(relation_name);
create_table->relation_name = relation_name;
}
void create_table_destroy(CreateTable *create_table)
......@@ -236,9 +241,9 @@ void create_table_destroy(CreateTable *create_table)
create_table->relation_name = nullptr;
}
void drop_table_init(DropTable *drop_table, const char *relation_name)
void drop_table_init(DropTable *drop_table, char *relation_name)
{
drop_table->relation_name = strdup(relation_name);
drop_table->relation_name = relation_name;
}
void drop_table_destroy(DropTable *drop_table)
......@@ -248,11 +253,11 @@ void drop_table_destroy(DropTable *drop_table)
}
void create_index_init(
CreateIndex *create_index, const char *index_name, const char *relation_name, const char *attr_name)
CreateIndex *create_index, char *index_name, char *relation_name, char *attr_name)
{
create_index->index_name = strdup(index_name);
create_index->relation_name = strdup(relation_name);
create_index->attribute_name = strdup(attr_name);
create_index->index_name = index_name;
create_index->relation_name = relation_name;
create_index->attribute_name = attr_name;
}
void create_index_destroy(CreateIndex *create_index)
......@@ -266,9 +271,9 @@ void create_index_destroy(CreateIndex *create_index)
create_index->attribute_name = nullptr;
}
void drop_index_init(DropIndex *drop_index, const char *index_name)
void drop_index_init(DropIndex *drop_index, char *index_name)
{
drop_index->index_name = strdup(index_name);
drop_index->index_name = index_name;
}
void drop_index_destroy(DropIndex *drop_index)
......@@ -277,9 +282,9 @@ void drop_index_destroy(DropIndex *drop_index)
drop_index->index_name = nullptr;
}
void desc_table_init(DescTable *desc_table, const char *relation_name)
void desc_table_init(DescTable *desc_table, char *relation_name)
{
desc_table->relation_name = strdup(relation_name);
desc_table->relation_name = relation_name;
}
void desc_table_destroy(DescTable *desc_table)
......@@ -288,14 +293,14 @@ void desc_table_destroy(DescTable *desc_table)
desc_table->relation_name = nullptr;
}
void load_data_init(LoadData *load_data, const char *relation_name, const char *file_name)
void load_data_init(LoadData *load_data, char *relation_name, char *file_name)
{
load_data->relation_name = strdup(relation_name);
load_data->relation_name = relation_name;
if (file_name[0] == '\'' || file_name[0] == '\"') {
file_name++;
}
char *dup_file_name = strdup(file_name);
char *dup_file_name = file_name;
int len = strlen(dup_file_name);
if (dup_file_name[len - 1] == '\'' || dup_file_name[len - 1] == '\"') {
dup_file_name[len - 1] = 0;
......@@ -386,6 +391,25 @@ void query_destroy(Query *query)
free(query);
}
const char *ATTR_TYPE_NAME[] = {"undefined", "chars", "ints", "floats"};
const char *attr_type_to_string(AttrType type)
{
if (type >= UNDEFINED && type <= FLOATS) {
return ATTR_TYPE_NAME[type];
}
return "unknown";
}
AttrType attr_type_from_string(const char *s)
{
for (unsigned int i = 0; i < sizeof(ATTR_TYPE_NAME) / sizeof(ATTR_TYPE_NAME[0]); i++) {
if (0 == strcmp(ATTR_TYPE_NAME[i], s)) {
return (AttrType)i;
}
}
return UNDEFINED;
}
////////////////////////////////////////////////////////////////////////////////
int sql_parse(const char *st, Query *sqls);
......
......@@ -12,8 +12,7 @@ See the Mulan PSL v2 for more details. */
// Created by Meiyi
//
#ifndef __OBSERVER_SQL_PARSER_PARSE_DEFS_H__
#define __OBSERVER_SQL_PARSER_PARSE_DEFS_H__
#pragma once
#include <stddef.h>
......@@ -180,11 +179,12 @@ typedef struct Query {
union Queries sstr;
} Query;
void relation_attr_init(RelAttr *relation_attr, const char *relation_name, const char *attribute_name);
void relation_attr_init(RelAttr *relation_attr, char *relation_name, char *attribute_name);
void relation_attr_destroy(RelAttr *relation_attr);
void value_init_integer(Value *value, int v);
void value_init_float(Value *value, float v);
void value_init_string(Value *value, char *v);
void value_init_string(Value *value, const char *v);
void value_destroy(Value *value);
......@@ -192,44 +192,44 @@ void condition_init(Condition *condition, CompOp comp, int left_is_attr, RelAttr
int right_is_attr, RelAttr *right_attr, Value *right_value);
void condition_destroy(Condition *condition);
void attr_info_init(AttrInfo *attr_info, const char *name, AttrType type, size_t length);
void attr_info_init(AttrInfo *attr_info, char *name, AttrType type, size_t length);
void attr_info_destroy(AttrInfo *attr_info);
void selects_init(Selects *selects, ...);
void selects_append_attribute(Selects *selects, RelAttr *rel_attr);
void selects_append_relation(Selects *selects, const char *relation_name);
void selects_append_relation(Selects *selects, char *relation_name);
void selects_append_conditions(Selects *selects, Condition conditions[], size_t condition_num);
void selects_destroy(Selects *selects);
void inserts_init(Inserts *inserts, const char *relation_name, Value values[], size_t value_num);
void inserts_init(Inserts *inserts, char *relation_name, Value values[], size_t value_num);
void inserts_destroy(Inserts *inserts);
void deletes_init_relation(Deletes *deletes, const char *relation_name);
void deletes_init_relation(Deletes *deletes, char *relation_name);
void deletes_set_conditions(Deletes *deletes, Condition conditions[], size_t condition_num);
void deletes_destroy(Deletes *deletes);
void updates_init(Updates *updates, const char *relation_name, const char *attribute_name, Value *value,
void updates_init(Updates *updates, char *relation_name, char *attribute_name, Value *value,
Condition conditions[], size_t condition_num);
void updates_destroy(Updates *updates);
void create_table_append_attribute(CreateTable *create_table, AttrInfo *attr_info);
void create_table_init_name(CreateTable *create_table, const char *relation_name);
void create_table_init_name(CreateTable *create_table, char *relation_name);
void create_table_destroy(CreateTable *create_table);
void drop_table_init(DropTable *drop_table, const char *relation_name);
void drop_table_init(DropTable *drop_table, char *relation_name);
void drop_table_destroy(DropTable *drop_table);
void create_index_init(
CreateIndex *create_index, const char *index_name, const char *relation_name, const char *attr_name);
CreateIndex *create_index, char *index_name, char *relation_name, char *attr_name);
void create_index_destroy(CreateIndex *create_index);
void drop_index_init(DropIndex *drop_index, const char *index_name);
void drop_index_init(DropIndex *drop_index, char *index_name);
void drop_index_destroy(DropIndex *drop_index);
void desc_table_init(DescTable *desc_table, const char *relation_name);
void desc_table_init(DescTable *desc_table, char *relation_name);
void desc_table_destroy(DescTable *desc_table);
void load_data_init(LoadData *load_data, const char *relation_name, const char *file_name);
void load_data_init(LoadData *load_data, char *relation_name, char *file_name);
void load_data_destroy(LoadData *load_data);
void query_init(Query *query);
......@@ -237,4 +237,5 @@ Query *query_create(); // create and init
void query_reset(Query *query);
void query_destroy(Query *query); // reset and delete
#endif // __OBSERVER_SQL_PARSER_PARSE_DEFS_H__
const char *attr_type_to_string(AttrType type);
AttrType attr_type_from_string(const char *s);
......@@ -129,10 +129,13 @@ RC ParseStage::handle_request(StageEvent *event)
return RC::INTERNAL;
}
SqlResult *sql_result = new SqlResult;
RC ret = parse(sql.c_str(), query_result);
if (ret != RC::SUCCESS) {
// set error information to event
sql_event->session_event()->set_response("Failed to parse sql\n");
sql_result->set_return_code(ret);
sql_result->set_state_string("Failed to parse sql");
sql_event->session_event()->set_sql_result(sql_result);
query_destroy(query_result);
return RC::INTERNAL;
}
......
......@@ -106,7 +106,9 @@ void ResolveStage::handle_event(StageEvent *event)
RC rc = Stmt::create_stmt(db, *query, stmt);
if (rc != RC::SUCCESS && rc != RC::UNIMPLENMENT) {
LOG_WARN("failed to create stmt. rc=%d:%s", rc, strrc(rc));
session_event->set_response("FAILURE\n");
SqlResult *sql_result = new SqlResult;
sql_result->set_return_code(rc);
session_event->set_sql_result(sql_result);
return;
}
......
......@@ -118,7 +118,7 @@ union YYSTYPE
char *string;
int number;
float floats;
char *position;
char *position;
#line 124 "yacc_sql.hpp"
......
......@@ -19,7 +19,7 @@ typedef struct ParserContext {
Value values[MAX_NUM];
Condition conditions[MAX_NUM];
CompOp comp;
char id[MAX_NUM];
char id[MAX_NUM];
} ParserContext;
//获取子串
......@@ -110,7 +110,7 @@ ParserContext *get_context(yyscan_t scanner)
char *string;
int number;
float floats;
char *position;
char *position;
}
%token <number> NUMBER
......@@ -129,32 +129,32 @@ ParserContext *get_context(yyscan_t scanner)
%%
commands: //commands or sqls. parser starts here.
commands: //commands or sqls. parser starts here.
/* empty */
| commands command
;
command:
select
| insert
| update
| delete
| create_table
| drop_table
| show_tables
| desc_table
| create_index
| drop_index
| sync
| begin
| commit
| rollback
| load_data
| help
| exit
select
| insert
| update
| delete
| create_table
| drop_table
| show_tables
| desc_table
| create_index
| drop_index
| sync
| begin
| commit
| rollback
| load_data
| help
| exit
;
exit:
exit:
EXIT SEMICOLON {
CONTEXT->ssql->flag=SCF_EXIT;//"exit";
};
......@@ -188,7 +188,7 @@ rollback:
}
;
drop_table: /*drop table 语句的语法解析树*/
drop_table: /*drop table 语句的语法解析树*/
DROP TABLE ID SEMICOLON {
CONTEXT->ssql->flag = SCF_DROP_TABLE;//"drop_table";
drop_table_init(&CONTEXT->ssql->sstr.drop_table, $3);
......@@ -207,29 +207,29 @@ desc_table:
}
;
create_index: /*create index 语句的语法解析树*/
create_index: /*create index 语句的语法解析树*/
CREATE INDEX ID ON ID LBRACE ID RBRACE SEMICOLON
{
CONTEXT->ssql->flag = SCF_CREATE_INDEX;//"create_index";
create_index_init(&CONTEXT->ssql->sstr.create_index, $3, $5, $7);
}
{
CONTEXT->ssql->flag = SCF_CREATE_INDEX;//"create_index";
create_index_init(&CONTEXT->ssql->sstr.create_index, $3, $5, $7);
}
;
drop_index: /*drop index 语句的语法解析树*/
drop_index: /*drop index 语句的语法解析树*/
DROP INDEX ID SEMICOLON
{
CONTEXT->ssql->flag=SCF_DROP_INDEX;//"drop_index";
drop_index_init(&CONTEXT->ssql->sstr.drop_index, $3);
}
{
CONTEXT->ssql->flag=SCF_DROP_INDEX;//"drop_index";
drop_index_init(&CONTEXT->ssql->sstr.drop_index, $3);
}
;
create_table: /*create table 语句的语法解析树*/
create_table: /*create table 语句的语法解析树*/
CREATE TABLE ID LBRACE attr_def attr_def_list RBRACE SEMICOLON
{
CONTEXT->ssql->flag=SCF_CREATE_TABLE;//"create_table";
create_table_init_name(&CONTEXT->ssql->sstr.create_table, $3);
//临时变量清零
CONTEXT->value_length = 0;
}
{
CONTEXT->ssql->flag=SCF_CREATE_TABLE;//"create_table";
create_table_init_name(&CONTEXT->ssql->sstr.create_table, $3);
//临时变量清零
CONTEXT->value_length = 0;
}
;
attr_def_list:
/* empty */
......@@ -237,43 +237,34 @@ attr_def_list:
;
attr_def:
ID_get type LBRACE number RBRACE
{
AttrInfo attribute;
attr_info_init(&attribute, CONTEXT->id, (AttrType)$2, $4);
create_table_append_attribute(&CONTEXT->ssql->sstr.create_table, &attribute);
CONTEXT->value_length++;
}
|ID_get type
{
AttrInfo attribute;
attr_info_init(&attribute, CONTEXT->id, (AttrType)$2, 4);
create_table_append_attribute(&CONTEXT->ssql->sstr.create_table, &attribute);
CONTEXT->value_length++;
}
ID type LBRACE number RBRACE
{
AttrInfo attribute;
attr_info_init(&attribute, $1, (AttrType)$2, $4);
create_table_append_attribute(&CONTEXT->ssql->sstr.create_table, &attribute);
CONTEXT->value_length++;
}
|ID type
{
AttrInfo attribute;
attr_info_init(&attribute, $1, (AttrType)$2, 4);
create_table_append_attribute(&CONTEXT->ssql->sstr.create_table, &attribute);
CONTEXT->value_length++;
}
;
number:
NUMBER {$$ = $1;}
;
NUMBER {$$ = $1;}
;
type:
INT_T { $$=INTS; }
INT_T { $$=INTS; }
| STRING_T { $$=CHARS; }
| FLOAT_T { $$=FLOATS; }
;
ID_get:
ID
{
char *temp=$1;
snprintf(CONTEXT->id, sizeof(CONTEXT->id), "%s", temp);
}
;
insert: /*insert 语句的语法解析树*/
insert: /*insert 语句的语法解析树*/
INSERT INTO ID VALUES LBRACE value value_list RBRACE SEMICOLON
{
CONTEXT->ssql->flag=SCF_INSERT;//"insert";
inserts_init(&CONTEXT->ssql->sstr.insertion, $3, CONTEXT->values, CONTEXT->value_length);
{
CONTEXT->ssql->flag=SCF_INSERT;//"insert";
inserts_init(&CONTEXT->ssql->sstr.insertion, $3, CONTEXT->values, CONTEXT->value_length);
//临时变量清零
CONTEXT->value_length=0;
......@@ -282,186 +273,187 @@ insert: /*insert 语句的语法解析树*/
value_list:
/* empty */
| COMMA value value_list {
// CONTEXT->values[CONTEXT->value_length++] = *$2;
}
// CONTEXT->values[CONTEXT->value_length++] = *$2;
}
;
value:
NUMBER{
value_init_integer(&CONTEXT->values[CONTEXT->value_length++], $1);
}
NUMBER{
value_init_integer(&CONTEXT->values[CONTEXT->value_length++], $1);
}
|FLOAT{
value_init_float(&CONTEXT->values[CONTEXT->value_length++], $1);
}
value_init_float(&CONTEXT->values[CONTEXT->value_length++], $1);
}
|SSS {
$1 = substr($1,1,strlen($1)-2);
value_init_string(&CONTEXT->values[CONTEXT->value_length++], $1);
}
char *tmp = substr($1,1,strlen($1)-2);
value_init_string(&CONTEXT->values[CONTEXT->value_length++], tmp);
free($1);
}
;
delete: /* delete 语句的语法解析树*/
delete: /* delete 语句的语法解析树*/
DELETE FROM ID where SEMICOLON
{
CONTEXT->ssql->flag = SCF_DELETE;//"delete";
deletes_init_relation(&CONTEXT->ssql->sstr.deletion, $3);
deletes_set_conditions(&CONTEXT->ssql->sstr.deletion,
CONTEXT->conditions, CONTEXT->condition_length);
CONTEXT->condition_length = 0;
{
CONTEXT->ssql->flag = SCF_DELETE;//"delete";
deletes_init_relation(&CONTEXT->ssql->sstr.deletion, $3);
deletes_set_conditions(&CONTEXT->ssql->sstr.deletion,
CONTEXT->conditions, CONTEXT->condition_length);
CONTEXT->condition_length = 0;
}
;
update: /* update 语句的语法解析树*/
update: /* update 语句的语法解析树*/
UPDATE ID SET ID EQ value where SEMICOLON
{
CONTEXT->ssql->flag = SCF_UPDATE;//"update";
Value *value = &CONTEXT->values[0];
updates_init(&CONTEXT->ssql->sstr.update, $2, $4, value,
CONTEXT->conditions, CONTEXT->condition_length);
CONTEXT->condition_length = 0;
}
{
CONTEXT->ssql->flag = SCF_UPDATE;//"update";
Value *value = &CONTEXT->values[0];
updates_init(&CONTEXT->ssql->sstr.update, $2, $4, value,
CONTEXT->conditions, CONTEXT->condition_length);
CONTEXT->condition_length = 0;
}
;
select: /* select 语句的语法解析树*/
select: /* select 语句的语法解析树*/
SELECT select_attr FROM ID rel_list where SEMICOLON
{
selects_append_relation(&CONTEXT->ssql->sstr.selection, $4);
{
selects_append_relation(&CONTEXT->ssql->sstr.selection, $4);
selects_append_conditions(&CONTEXT->ssql->sstr.selection, CONTEXT->conditions, CONTEXT->condition_length);
selects_append_conditions(&CONTEXT->ssql->sstr.selection, CONTEXT->conditions, CONTEXT->condition_length);
CONTEXT->ssql->flag=SCF_SELECT;//"select";
CONTEXT->ssql->flag=SCF_SELECT;//"select";
//临时变量清零
CONTEXT->condition_length=0;
CONTEXT->from_length=0;
CONTEXT->select_length=0;
CONTEXT->value_length = 0;
}
;
//临时变量清零
CONTEXT->condition_length=0;
CONTEXT->from_length=0;
CONTEXT->select_length=0;
CONTEXT->value_length = 0;
}
;
select_attr:
STAR {
RelAttr attr;
relation_attr_init(&attr, NULL, "*");
selects_append_attribute(&CONTEXT->ssql->sstr.selection, &attr);
}
RelAttr attr;
relation_attr_init(&attr, NULL, strdup("*"));
selects_append_attribute(&CONTEXT->ssql->sstr.selection, &attr);
}
| ID attr_list {
RelAttr attr;
relation_attr_init(&attr, NULL, $1);
selects_append_attribute(&CONTEXT->ssql->sstr.selection, &attr);
}
| ID DOT ID attr_list {
RelAttr attr;
relation_attr_init(&attr, $1, $3);
selects_append_attribute(&CONTEXT->ssql->sstr.selection, &attr);
}
RelAttr attr;
relation_attr_init(&attr, NULL, $1);
selects_append_attribute(&CONTEXT->ssql->sstr.selection, &attr);
}
| ID DOT ID attr_list {
RelAttr attr;
relation_attr_init(&attr, $1, $3);
selects_append_attribute(&CONTEXT->ssql->sstr.selection, &attr);
}
;
attr_list:
/* empty */
| COMMA ID attr_list {
RelAttr attr;
relation_attr_init(&attr, NULL, $2);
selects_append_attribute(&CONTEXT->ssql->sstr.selection, &attr);
RelAttr attr;
relation_attr_init(&attr, NULL, $2);
selects_append_attribute(&CONTEXT->ssql->sstr.selection, &attr);
}
| COMMA ID DOT ID attr_list {
RelAttr attr;
relation_attr_init(&attr, $2, $4);
selects_append_attribute(&CONTEXT->ssql->sstr.selection, &attr);
}
;
RelAttr attr;
relation_attr_init(&attr, $2, $4);
selects_append_attribute(&CONTEXT->ssql->sstr.selection, &attr);
}
;
rel_list:
/* empty */
| COMMA ID rel_list {
selects_append_relation(&CONTEXT->ssql->sstr.selection, $2);
}
| COMMA ID rel_list {
selects_append_relation(&CONTEXT->ssql->sstr.selection, $2);
}
;
where:
/* empty */
| WHERE condition condition_list {
// CONTEXT->conditions[CONTEXT->condition_length++]=*$2;
}
| WHERE condition condition_list {
// CONTEXT->conditions[CONTEXT->condition_length++]=*$2;
}
;
condition_list:
/* empty */
| AND condition condition_list {
// CONTEXT->conditions[CONTEXT->condition_length++]=*$2;
}
// CONTEXT->conditions[CONTEXT->condition_length++]=*$2;
}
;
condition:
ID comOp value
{
RelAttr left_attr;
relation_attr_init(&left_attr, NULL, $1);
Value *right_value = &CONTEXT->values[CONTEXT->value_length - 1];
Condition condition;
condition_init(&condition, CONTEXT->comp, 1, &left_attr, NULL, 0, NULL, right_value);
CONTEXT->conditions[CONTEXT->condition_length++] = condition;
}
|value comOp value
{
Value *left_value = &CONTEXT->values[CONTEXT->value_length - 2];
Value *right_value = &CONTEXT->values[CONTEXT->value_length - 1];
Condition condition;
condition_init(&condition, CONTEXT->comp, 0, NULL, left_value, 0, NULL, right_value);
CONTEXT->conditions[CONTEXT->condition_length++] = condition;
}
|ID comOp ID
{
RelAttr left_attr;
relation_attr_init(&left_attr, NULL, $1);
RelAttr right_attr;
relation_attr_init(&right_attr, NULL, $3);
Condition condition;
condition_init(&condition, CONTEXT->comp, 1, &left_attr, NULL, 1, &right_attr, NULL);
CONTEXT->conditions[CONTEXT->condition_length++] = condition;
}
{
RelAttr left_attr;
relation_attr_init(&left_attr, NULL, $1);
Value *right_value = &CONTEXT->values[CONTEXT->value_length - 1];
Condition condition;
condition_init(&condition, CONTEXT->comp, 1, &left_attr, NULL, 0, NULL, right_value);
CONTEXT->conditions[CONTEXT->condition_length++] = condition;
}
|value comOp value
{
Value *left_value = &CONTEXT->values[CONTEXT->value_length - 2];
Value *right_value = &CONTEXT->values[CONTEXT->value_length - 1];
Condition condition;
condition_init(&condition, CONTEXT->comp, 0, NULL, left_value, 0, NULL, right_value);
CONTEXT->conditions[CONTEXT->condition_length++] = condition;
}
|ID comOp ID
{
RelAttr left_attr;
relation_attr_init(&left_attr, NULL, $1);
RelAttr right_attr;
relation_attr_init(&right_attr, NULL, $3);
Condition condition;
condition_init(&condition, CONTEXT->comp, 1, &left_attr, NULL, 1, &right_attr, NULL);
CONTEXT->conditions[CONTEXT->condition_length++] = condition;
}
|value comOp ID
{
Value *left_value = &CONTEXT->values[CONTEXT->value_length - 1];
RelAttr right_attr;
relation_attr_init(&right_attr, NULL, $3);
Condition condition;
condition_init(&condition, CONTEXT->comp, 0, NULL, left_value, 1, &right_attr, NULL);
CONTEXT->conditions[CONTEXT->condition_length++] = condition;
}
{
Value *left_value = &CONTEXT->values[CONTEXT->value_length - 1];
RelAttr right_attr;
relation_attr_init(&right_attr, NULL, $3);
Condition condition;
condition_init(&condition, CONTEXT->comp, 0, NULL, left_value, 1, &right_attr, NULL);
CONTEXT->conditions[CONTEXT->condition_length++] = condition;
}
|ID DOT ID comOp value
{
RelAttr left_attr;
relation_attr_init(&left_attr, $1, $3);
Value *right_value = &CONTEXT->values[CONTEXT->value_length - 1];
Condition condition;
condition_init(&condition, CONTEXT->comp, 1, &left_attr, NULL, 0, NULL, right_value);
CONTEXT->conditions[CONTEXT->condition_length++] = condition;
{
RelAttr left_attr;
relation_attr_init(&left_attr, $1, $3);
Value *right_value = &CONTEXT->values[CONTEXT->value_length - 1];
Condition condition;
condition_init(&condition, CONTEXT->comp, 1, &left_attr, NULL, 0, NULL, right_value);
CONTEXT->conditions[CONTEXT->condition_length++] = condition;
}
|value comOp ID DOT ID
{
Value *left_value = &CONTEXT->values[CONTEXT->value_length - 1];
{
Value *left_value = &CONTEXT->values[CONTEXT->value_length - 1];
RelAttr right_attr;
relation_attr_init(&right_attr, $3, $5);
RelAttr right_attr;
relation_attr_init(&right_attr, $3, $5);
Condition condition;
condition_init(&condition, CONTEXT->comp, 0, NULL, left_value, 1, &right_attr, NULL);
CONTEXT->conditions[CONTEXT->condition_length++] = condition;
Condition condition;
condition_init(&condition, CONTEXT->comp, 0, NULL, left_value, 1, &right_attr, NULL);
CONTEXT->conditions[CONTEXT->condition_length++] = condition;
}
|ID DOT ID comOp ID DOT ID
{
RelAttr left_attr;
relation_attr_init(&left_attr, $1, $3);
RelAttr right_attr;
relation_attr_init(&right_attr, $5, $7);
Condition condition;
condition_init(&condition, CONTEXT->comp, 1, &left_attr, NULL, 1, &right_attr, NULL);
CONTEXT->conditions[CONTEXT->condition_length++] = condition;
{
RelAttr left_attr;
relation_attr_init(&left_attr, $1, $3);
RelAttr right_attr;
relation_attr_init(&right_attr, $5, $7);
Condition condition;
condition_init(&condition, CONTEXT->comp, 1, &left_attr, NULL, 1, &right_attr, NULL);
CONTEXT->conditions[CONTEXT->condition_length++] = condition;
}
;
comOp:
EQ { CONTEXT->comp = EQUAL_TO; }
EQ { CONTEXT->comp = EQUAL_TO; }
| LT { CONTEXT->comp = LESS_THAN; }
| GT { CONTEXT->comp = GREAT_THAN; }
| LE { CONTEXT->comp = LESS_EQUAL; }
......@@ -470,25 +462,25 @@ comOp:
;
load_data:
LOAD DATA INFILE SSS INTO TABLE ID SEMICOLON
{
CONTEXT->ssql->flag = SCF_LOAD_DATA;
load_data_init(&CONTEXT->ssql->sstr.load_data, $7, $4);
}
;
LOAD DATA INFILE SSS INTO TABLE ID SEMICOLON
{
CONTEXT->ssql->flag = SCF_LOAD_DATA;
load_data_init(&CONTEXT->ssql->sstr.load_data, $7, $4);
}
;
%%
//_____________________________________________________________________
extern void scan_string(const char *str, yyscan_t scanner);
int sql_parse(const char *s, Query *sqls){
ParserContext context;
memset(&context, 0, sizeof(context));
yyscan_t scanner;
yylex_init_extra(&context, &scanner);
context.ssql = sqls;
scan_string(s, scanner);
int result = yyparse(scanner);
yylex_destroy(scanner);
return result;
ParserContext context;
memset(&context, 0, sizeof(context));
yyscan_t scanner;
yylex_init_extra(&context, &scanner);
context.ssql = sqls;
scan_string(s, scanner);
int result = yyparse(scanner);
yylex_destroy(scanner);
return result;
}
......@@ -15,6 +15,7 @@ See the Mulan PSL v2 for more details. */
#include <common/lang/string.h>
#include "storage/common/field_meta.h"
#include "common/log/log.h"
#include "sql/parser/parse_defs.h"
#include "json/json.h"
......@@ -24,25 +25,6 @@ const static Json::StaticString FIELD_OFFSET("offset");
const static Json::StaticString FIELD_LEN("len");
const static Json::StaticString FIELD_VISIBLE("visible");
const char *ATTR_TYPE_NAME[] = {"undefined", "chars", "ints", "floats"};
const char *attr_type_to_string(AttrType type)
{
if (type >= UNDEFINED && type <= FLOATS) {
return ATTR_TYPE_NAME[type];
}
return "unknown";
}
AttrType attr_type_from_string(const char *s)
{
for (unsigned int i = 0; i < sizeof(ATTR_TYPE_NAME) / sizeof(ATTR_TYPE_NAME[0]); i++) {
if (0 == strcmp(ATTR_TYPE_NAME[i], s)) {
return (AttrType)i;
}
}
return UNDEFINED;
}
FieldMeta::FieldMeta() : attr_type_(AttrType::UNDEFINED), attr_offset_(-1), attr_len_(0), visible_(false)
{}
......
......@@ -12,8 +12,7 @@ See the Mulan PSL v2 for more details. */
// Created by Meiyi & Wangyunlai on 2021/5/12.
//
#ifndef __OBSERVER_STORAGE_COMMON_FIELD_META_H__
#define __OBSERVER_STORAGE_COMMON_FIELD_META_H__
#pragma once
#include <string>
......@@ -53,4 +52,3 @@ protected:
int attr_len_;
bool visible_;
};
#endif // __OBSERVER_STORAGE_COMMON_FIELD_META_H__
\ No newline at end of file
......@@ -12,8 +12,7 @@ See the Mulan PSL v2 for more details. */
// Created by Meiyi & Wangyunlai on 2021/5/12.
//
#ifndef __OBSERVER_STORAGE_COMMON_TABLE_H__
#define __OBSERVER_STORAGE_COMMON_TABLE_H__
#pragma once
#include "storage/common/table_meta.h"
......@@ -123,5 +122,3 @@ private:
RecordFileHandler *record_handler_ = nullptr; /// 记录操作
std::vector<Index *> indexes_;
};
#endif // __OBSERVER_STORAGE_COMMON_TABLE_H__
......@@ -12,8 +12,7 @@ See the Mulan PSL v2 for more details. */
// Created by Meiyi & Wangyunlai on 2021/5/12.
//
#ifndef __OBSERVER_STORAGE_COMMON_TABLE_META_H__
#define __OBSERVER_STORAGE_COMMON_TABLE_META_H__
#pragma once
#include <string>
#include <vector>
......@@ -73,5 +72,3 @@ protected:
//@@@ TODO why used static variable?
static std::vector<FieldMeta> sys_fields_;
};
#endif // __OBSERVER_STORAGE_COMMON_TABLE_META_H__
......@@ -148,7 +148,7 @@ void DefaultStorageStage::handle_event(StageEvent *event)
SessionEvent *session_event = sql_event->session_event();
Session *session = session_event->get_client()->session;
Session *session = session_event->session();
Db *db = session->get_current_db();
const char *dbname = db->name();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册