diff --git a/dbms/programs/server/CMakeLists.txt b/dbms/programs/server/CMakeLists.txt index 5cb0801806538a3477d3ed52dd6a622f9d33d54a..5b00f5574c8f0f3007e8cbe727c4f301f133b46a 100644 --- a/dbms/programs/server/CMakeLists.txt +++ b/dbms/programs/server/CMakeLists.txt @@ -8,6 +8,7 @@ set(CLICKHOUSE_SERVER_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/RootRequestHandler.cpp ${CMAKE_CURRENT_SOURCE_DIR}/Server.cpp ${CMAKE_CURRENT_SOURCE_DIR}/TCPHandler.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/MySQLHandler.cpp ) set(CLICKHOUSE_SERVER_LINK PRIVATE clickhouse_dictionaries clickhouse_common_io PUBLIC daemon PRIVATE clickhouse_storages_system clickhouse_functions clickhouse_aggregate_functions clickhouse_table_functions ${Poco_Net_LIBRARY}) diff --git a/dbms/programs/server/MySQLHandler.cpp b/dbms/programs/server/MySQLHandler.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ad250c8ce7e7f7634eba3e4d9cf8d4049daa300b --- /dev/null +++ b/dbms/programs/server/MySQLHandler.cpp @@ -0,0 +1,279 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "MySQLHandler.h" +#include + + +namespace DB +{ +using namespace MySQLProtocol; + +uint32_t MySQLHandler::last_connection_id = 0; + +String MySQLHandler::readPayload() +{ + WriteBufferFromOwnString buf; + + size_t payload_length = 0; + uint8_t packet_sequence_id; + + // packets which are larger than or equal to 16MB are splitted + do { + LOG_TRACE(log, "Reading from buffer"); + + in->readStrict(reinterpret_cast(&payload_length), 3); + + if (payload_length > MAX_PACKET_LENGTH) { + throw ProtocolError(Poco::format("Received packet with payload length greater than 2^24 - 1: %z.", payload_length), 0); + } + + in->readStrict(reinterpret_cast(&packet_sequence_id), 1); + + if (packet_sequence_id != sequence_id) { + throw ProtocolError(Poco::format("Received packet with wrong sequence-id: %d. Expected: %d.", packet_sequence_id, sequence_id), 0); + } + sequence_id++; + + LOG_TRACE(log, "Received packet. Sequence-id: " << static_cast(packet_sequence_id) << ", payload length: " << payload_length); + + copyData(*in, static_cast(buf), payload_length); + } + while (payload_length == MAX_PACKET_LENGTH); + + return buf.str(); +} + +/// Converts packet to text. Useful for debugging, since packets often consist of non-printing characters. +static String packetToText(std::string_view payload) { + String result; + for (auto c : payload) { + result += ' '; + result += std::to_string(static_cast(c)); + } + return result; +} + +void MySQLHandler::writePayload(std::string_view payload) +{ + size_t pos = 0; + do + { + size_t payload_length = std::min(payload.length() - pos, MAX_PACKET_LENGTH); + + LOG_TRACE(log, "Writing packet of size " << payload_length << " with sequence-id " << static_cast(sequence_id)); + LOG_TRACE(log, packetToText(payload)); + + out->write(reinterpret_cast(&payload_length), 3); + out->write(reinterpret_cast(&sequence_id), 1); + out->write(payload.data() + pos, payload_length); + + pos += payload_length; + sequence_id++; + } + while (pos < payload.length()); + out->next(); + + LOG_TRACE(log, "Packet was sent."); +} + +void MySQLHandler::run() { + sequence_id = 0; + connection_context = server.context(); + + in = std::make_shared(socket()); + out = std::make_shared(socket()); + + try + { + Handshake handshake(connection_id, VERSION_FULL); + auto payload = handshake.getPayload(); + writePayload(payload); + + LOG_TRACE(log, "sent handshake"); + + HandshakeResponse handshake_response; + payload = readPayload(); + handshake_response.readPayload(payload); + + LOG_DEBUG(log, "capability_flags: " << handshake_response.capability_flags + << "max_packet_size: %s" + << handshake_response.max_packet_size + << "character_set: %s" + << handshake_response.character_set + << "user: %s" + << handshake_response.username + << "auth_response length: %s" + << handshake_response.auth_response.length() + << "auth_response: %s" + << handshake_response.auth_response + << "database: %s" + << handshake_response.database + << "auth_plugin_name: %s" + << handshake_response.auth_plugin_name); + + capabilities = handshake_response.capability_flags; + if (!(capabilities & CLIENT_PROTOCOL_41)) { + LOG_ERROR(log, "Clients without CLIENT_PROTOCOL_41 capability are not supported."); + return; + } + + try { + connection_context.setUser(handshake_response.username, "", socket().address(), ""); + connection_context.setCurrentDatabase(handshake_response.database); + connection_context.setCurrentQueryId(""); + } + catch (const Exception & exc) { + log->log(exc); + writePayload(ERR_Packet(exc.code(), "00000", exc.message()).getPayload()); + return; // TODO Authentication method change + } + OK_Packet ok_packet(0, handshake_response.capability_flags, 0, 0, 0, 0, ""); + payload = ok_packet.getPayload(); + writePayload(payload); + LOG_INFO(log, "sent OK_Packet"); + + while (true) { + sequence_id = 0; + payload = readPayload(); + int command = payload[0]; + LOG_DEBUG(log, "Received command: " << std::to_string(command) << ". Connection id: " << connection_id << "."); + try { + switch (command) { + case COM_QUIT: + return; + case COM_INIT_DB: + comInitDB(payload); + break; + case COM_QUERY: + comQuery(payload); + break; + case COM_FIELD_LIST: + comFieldList(payload); + break; + case COM_PING: + comPing(); + break; + default: + throw Exception(Poco::format("Command %d is not implemented.", command), ErrorCodes::NOT_IMPLEMENTED); + } + } + catch (const NetException & exc) { + log->log(exc); + throw; + } + catch (const Exception & exc) { + log->log(exc); + writePayload(ERR_Packet(exc.code(), "00000", exc.message()).getPayload()); + } + } + } + catch (Poco::Exception& exc) + { + log->log(exc); + } + +} + +void MySQLHandler::comInitDB(const String & payload) +{ + String database = payload.substr(1); + LOG_DEBUG(log, "Setting current database to " << database); + connection_context.setCurrentDatabase(database); + writePayload(OK_Packet(0, capabilities, 0, 0, 0, 1, "").getPayload()); +} + +void MySQLHandler::comFieldList(const String & payload) { + ComFieldList packet; + packet.readPayload(payload); + StoragePtr tablePtr = connection_context.getTable(connection_context.getCurrentDatabase(), packet.table); + for (const NameAndTypePair & column: tablePtr->getColumns().getAll()) + { + ColumnDefinition column_definition( + "schema", packet.table, packet.table, column.name, column.name, CharacterSet::UTF8, 100, ColumnType::MYSQL_TYPE_STRING, 0, 0 + ); + writePayload(column_definition.getPayload()); + } + writePayload(OK_Packet(0xfe, capabilities, 0, 0, 0, 0, "").getPayload()); +} + +void MySQLHandler::comPing() { + writePayload(OK_Packet(0x0, capabilities, 0, 0, 0, 0, "").getPayload()); +} + +void MySQLHandler::comQuery(const String & payload) { + BlockIO res = executeQuery(payload.substr(1), connection_context); + FormatSettings format_settings; + if (res.in) { + LOG_TRACE(log, "Executing query with output."); + + Block header = res.in->getHeader(); + writePayload(writeLenenc(header.columns())); + + for (const ColumnWithTypeAndName & column : header.getColumnsWithTypeAndName()) + { + writePayload(ColumnDefinition( + "", /// database. Apparently, addition of these fields to ColumnWithTypeAndName and changes in interpreters are needed. + "", /// table name. + "", /// physical table name + column.name, /// virtual column name + "", /// physical column name + CharacterSet::UTF8, + /// maximum column length which can be used for text outputting. Since query execution hasn't started, it is unknown. + std::numeric_limits::max(), + ColumnType::MYSQL_TYPE_STRING, /// TODO + 0, + 0 + ).getPayload()); + + LOG_TRACE(log, "sent " << column.name << " column definition"); + } + + LOG_TRACE(log, "Sent columns definitions."); + + while (Block block = res.in->read()) + { + size_t rows = block.rows(); + size_t columns = block.columns(); + + for (size_t i = 0; i < rows; i++) { + String row_payload; + for (size_t j = 0; j < columns; j++) { + ColumnWithTypeAndName & column = block.getByPosition(j); + column.column = column.column->convertToFullColumnIfConst(); + + String column_value; + WriteBufferFromString ostr(column_value); + + LOG_TRACE(log, "sending value of type " << column.type->getName() << " of column " << column.column->getName()); + + column.type->serializeAsText(*column.column.get(), i, ostr, format_settings); + ostr.finish(); + + writeLenencStr(row_payload, column_value); + } + writePayload(row_payload); + } + } + + LOG_TRACE(log, "Sent rows."); + } + + if (capabilities & CLIENT_DEPRECATE_EOF) { + writePayload(OK_Packet(0xfe, capabilities, 0, 0, 0, 0, "").getPayload()); + } else { + writePayload(EOF_Packet(0, 0).getPayload()); + } +} + +} diff --git a/dbms/programs/server/MySQLHandler.h b/dbms/programs/server/MySQLHandler.h new file mode 100644 index 0000000000000000000000000000000000000000..85b6ed78e819998db123e450e8f07ef31fae3f47 --- /dev/null +++ b/dbms/programs/server/MySQLHandler.h @@ -0,0 +1,58 @@ +#pragma once + +#include +#include +#include "IServer.h" + + +namespace DB +{ + +/// Handler for MySQL wire protocol connections. Allows to connect to ClickHouse using MySQL client. +class MySQLHandler : public Poco::Net::TCPServerConnection { +public: + MySQLHandler(IServer &server_, const Poco::Net::StreamSocket &socket_) + : Poco::Net::TCPServerConnection(socket_) + , server(server_) + , log(&Poco::Logger::get("MySQLHandler")) + , connection_context(server.context()) + , connection_id(last_connection_id++) + { + } + + void run() final; + + /** Reads one packet, incrementing sequence-id, and returns its payload. + * Currently, whole payload is loaded into memory. + */ + String readPayload(); + + /// Writes packet payload, incrementing sequence-id. + void writePayload(std::string_view payload); + + void comQuery(const String & payload); + + void comFieldList(const String & payload); + + void comPing(); + + void comInitDB(const String & payload); +private: + IServer & server; + Poco::Logger * log; + Context connection_context; + + std::shared_ptr in; + std::shared_ptr out; + + /// Packet sequence id + unsigned char sequence_id = 0; + + uint32_t connection_id = 0; + + uint32_t capabilities; + + static uint32_t last_connection_id; +}; + +} diff --git a/dbms/programs/server/MySQLHandlerFactory.h b/dbms/programs/server/MySQLHandlerFactory.h new file mode 100644 index 0000000000000000000000000000000000000000..5f08191577838aa28cbc0f3104c0597e31f2573f --- /dev/null +++ b/dbms/programs/server/MySQLHandlerFactory.h @@ -0,0 +1,33 @@ +#pragma once + +#include +#include +#include "IServer.h" +#include "MySQLHandler.h" + +namespace Poco { class Logger; } + +namespace DB +{ + + class MySQLHandlerFactory : public Poco::Net::TCPServerConnectionFactory + { + private: + IServer & server; + Poco::Logger * log; + + public: + explicit MySQLHandlerFactory(IServer & server_) + : server(server_) + , log(&Logger::get("MySQLHandlerFactory")) + { + } + + Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket) override + { + LOG_TRACE(log, "MySQL connection. Address: " << socket.peerAddress().toString()); + return new MySQLHandler(server, socket); + } + }; + +} diff --git a/dbms/programs/server/Server.cpp b/dbms/programs/server/Server.cpp index 2b10c9e3c98332e32945691a57c876c10718107c..5c7959f30924088eba4a6d3f84341531db0b39f2 100644 --- a/dbms/programs/server/Server.cpp +++ b/dbms/programs/server/Server.cpp @@ -49,6 +49,7 @@ #include #include "TCPHandlerFactory.h" #include "Common/config_version.h" +#include "MySQLHandlerFactory.h" #if defined(__linux__) #include @@ -725,6 +726,21 @@ int Server::main(const std::vector & /*args*/) ErrorCodes::SUPPORT_IS_DISABLED}; #endif } + + if (config().has("mysql_port")) + { + Poco::Net::ServerSocket socket; + auto address = socket_bind_listen(socket, listen_host, config().getInt("mysql_port"), /* secure = */ true); + socket.setReceiveTimeout(Poco::Timespan()); + socket.setSendTimeout(settings.send_timeout); + servers.emplace_back(std::make_unique( + new MySQLHandlerFactory(*this), + server_pool, + socket, + new Poco::Net::TCPServerParams)); + + LOG_INFO(log, "Listening mysql: " + address.toString()); + } } catch (const Poco::Net::NetException & e) { diff --git a/dbms/src/Core/MySQLProtocol.cpp b/dbms/src/Core/MySQLProtocol.cpp new file mode 100644 index 0000000000000000000000000000000000000000..baa6784fd642062db05dbba30e38993f94f43ca2 --- /dev/null +++ b/dbms/src/Core/MySQLProtocol.cpp @@ -0,0 +1,50 @@ +#include +#include +#include + +/// Implementation of MySQL wire protocol + +namespace DB { +namespace MySQLProtocol { + +uint64_t readLenenc(std::istringstream &ss) { + char c; + uint64_t buf = 0; + ss.get(c); + auto cc = static_cast(c); + if (cc < 0xfc) { + return cc; + } else if (cc < 0xfd) { + ss.read(reinterpret_cast(&buf), 2); + } else if (cc < 0xfe) { + ss.read(reinterpret_cast(&buf), 3); + } else { + ss.read(reinterpret_cast(&buf), 8); + } + return buf; +} + +std::string writeLenenc(uint64_t x) { + std::string result; + if (x < 251) { + result.append(1, static_cast(x)); + } else if (x < (1 << 16)) { + result.append(1, 0xfc); + result.append(reinterpret_cast(&x), 2); + } else if (x < (1 << 24)) { + result.append(1, 0xfd); + result.append(reinterpret_cast(&x), 3); + } else { + result.append(1, 0xfe); + result.append(reinterpret_cast(&x), 8); + } + return result; +} + +void writeLenencStr(std::string &payload, const std::string &s) { + payload.append(writeLenenc(s.length())); + payload.append(s); +} + +} +} diff --git a/dbms/src/Core/MySQLProtocol.h b/dbms/src/Core/MySQLProtocol.h new file mode 100644 index 0000000000000000000000000000000000000000..9157a32e96d2205d340718ba4b7393214a4b2b99 --- /dev/null +++ b/dbms/src/Core/MySQLProtocol.h @@ -0,0 +1,366 @@ +#pragma once + +#include +#include +#include + +/// Implementation of MySQL wire protocol + +namespace DB { +namespace MySQLProtocol { + +const size_t MAX_PACKET_LENGTH = (1 << 24) - 1; // 16 mb +const size_t SCRAMBLE_LENGTH = 20; +const size_t AUTH_PLUGIN_DATA_PART_1_LENGTH = 8; +const size_t MYSQL_ERRMSG_SIZE = 512; + +namespace Authentication { + const std::string Native41 = "mysql_native_password"; +} + +enum CharacterSet { + UTF8 = 33 +}; + +enum StatusFlags { + SERVER_SESSION_STATE_CHANGED = 0x4000 +}; + +enum Capability { + CLIENT_CONNECT_WITH_DB = 0x00000008, + CLIENT_PROTOCOL_41 = 0x00000200, + CLIENT_TRANSACTIONS = 0x00002000, // TODO + CLIENT_SESSION_TRACK = 0x00800000, // TODO + CLIENT_SECURE_CONNECTION = 0x00008000, + CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA = 0x00200000, + CLIENT_PLUGIN_AUTH = 0x00080000, + CLIENT_DEPRECATE_EOF = 0x01000000, +}; + +enum Command { + COM_SLEEP = 0x0, + COM_QUIT = 0x1, + COM_INIT_DB = 0x2, + COM_QUERY = 0x3, + COM_FIELD_LIST = 0x4, + COM_CREATE_DB = 0x5, + COM_DROP_DB = 0x6, + COM_REFRESH = 0x7, + COM_SHUTDOWN = 0x8, + COM_STATISTICS = 0x9, + COM_PROCESS_INFO = 0xa, + COM_CONNECT = 0xb, + COM_PROCESS_KILL = 0xc, + COM_DEBUG = 0xd, + COM_PING = 0xe, + COM_TIME = 0xf, + COM_DELAYED_INSERT = 0x10, + COM_CHANGE_USER = 0x11, + COM_RESET_CONNECTION = 0x1f, + COM_DAEMON = 0x1d +}; + +enum ColumnType { + MYSQL_TYPE_DECIMAL = 0x00, + MYSQL_TYPE_TINY = 0x01, + MYSQL_TYPE_SHORT = 0x02, + MYSQL_TYPE_LONG = 0x03, + MYSQL_TYPE_FLOAT = 0x04, + MYSQL_TYPE_DOUBLE = 0x05, + MYSQL_TYPE_NULL = 0x06, + MYSQL_TYPE_TIMESTAMP = 0x07, + MYSQL_TYPE_LONGLONG = 0x08, + MYSQL_TYPE_INT24 = 0x09, + MYSQL_TYPE_DATE = 0x0a, + MYSQL_TYPE_TIME = 0x0b, + MYSQL_TYPE_DATETIME = 0x0c, + MYSQL_TYPE_YEAR = 0x0d, + MYSQL_TYPE_VARCHAR = 0x0f, + MYSQL_TYPE_BIT = 0x10, + MYSQL_TYPE_NEWDECIMAL = 0xf6, + MYSQL_TYPE_ENUM = 0xf7, + MYSQL_TYPE_SET = 0xf8, + MYSQL_TYPE_TINY_BLOB = 0xf9, + MYSQL_TYPE_MEDIUM_BLOB = 0xfa, + MYSQL_TYPE_LONG_BLOB = 0xfb, + MYSQL_TYPE_BLOB = 0xfc, + MYSQL_TYPE_VAR_STRING = 0xfd, + MYSQL_TYPE_STRING = 0xfe, + MYSQL_TYPE_GEOMETRY = 0xff +}; + +uint64_t readLenenc(std::istringstream &ss); + +std::string writeLenenc(uint64_t x); + +void writeLenencStr(std::string &payload, const std::string &s); + + +class ProtocolError : public DB::Exception { +public: + using Exception::Exception; +}; + + +/// https://dev.mysql.com/doc/internals/en/connection-phase-packets.html + + +class Handshake { + int protocol_version = 0xa; + std::string server_version; + uint32_t connection_id; + uint32_t capability_flags; + uint8_t character_set; + uint32_t status_flags; + std::string auth_plugin_data; +public: + explicit Handshake(uint32_t connection_id, std::string server_version) + : protocol_version(0xa) + , server_version(std::move(server_version)) + , connection_id(connection_id) + , capability_flags( + CLIENT_PROTOCOL_41 + | CLIENT_SECURE_CONNECTION + | CLIENT_PLUGIN_AUTH + | CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA + | CLIENT_CONNECT_WITH_DB + | CLIENT_DEPRECATE_EOF) + , character_set(63) + , status_flags(0) { + auth_plugin_data.resize(SCRAMBLE_LENGTH); + + auto seed = std::chrono::system_clock::now().time_since_epoch().count(); + std::default_random_engine generator (static_cast(seed)); + + std::uniform_int_distribution distribution(0); + for (size_t i = 0; i < SCRAMBLE_LENGTH; i++) { + auth_plugin_data[i] = distribution(generator); + } + } + + std::string getPayload() { + std::string result; + result.append(1, protocol_version); + result.append(server_version); + result.append(1, 0x0); + result.append(reinterpret_cast(&connection_id), 4); + result.append(auth_plugin_data.substr(0, AUTH_PLUGIN_DATA_PART_1_LENGTH)); + result.append(1, 0x0); + result.append(reinterpret_cast(&capability_flags), 2); + result.append(reinterpret_cast(&character_set), 1); + result.append(reinterpret_cast(&status_flags), 2); + result.append((reinterpret_cast(&capability_flags)) + 2, 2); + result.append(1, SCRAMBLE_LENGTH + 1); + result.append(1, 0x0); + result.append(10, 0x0); + result.append(auth_plugin_data.substr(AUTH_PLUGIN_DATA_PART_1_LENGTH, SCRAMBLE_LENGTH - AUTH_PLUGIN_DATA_PART_1_LENGTH)); + result.append(Authentication::Native41); + result.append(1, 0x0); + return result; + } +}; + +class HandshakeResponse { +public: + uint32_t capability_flags; + uint32_t max_packet_size; + uint8_t character_set; + std::string username; + std::string auth_response; + std::string database; + std::string auth_plugin_name; + + void readPayload(const std::string & s) { + std::istringstream ss(s); + + ss.readsome(reinterpret_cast(&capability_flags), 4); + ss.readsome(reinterpret_cast(&max_packet_size), 4); + ss.readsome(reinterpret_cast(&character_set), 1); + ss.ignore(23); + + std::getline(ss, username, static_cast(0x0)); + + if (capability_flags & CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA) { + auto len = readLenenc(ss); + auth_response.resize(len); + ss.read(auth_response.data(), static_cast(len)); + } else if (capability_flags & CLIENT_SECURE_CONNECTION) { + uint8_t len; + ss.read(reinterpret_cast(&len), 1); + auth_response.resize(len); + ss.read(auth_response.data(), len); + } else { + std::getline(ss, auth_response, static_cast(0x0)); + } + + if (capability_flags & CLIENT_CONNECT_WITH_DB) { + std::getline(ss, database, static_cast(0x0)); + } + + if (capability_flags & CLIENT_PLUGIN_AUTH) { + std::getline(ss, auth_plugin_name, static_cast(0x0)); + } + } +}; + +class OK_Packet { + uint8_t header; + uint32_t capabilities; + uint64_t affected_rows; + uint64_t last_insert_id; + int16_t warnings = 0; + uint32_t status_flags; + std::string info; + std::string session_state_changes; +public: + OK_Packet(uint8_t header, uint32_t capabilities, uint64_t affected_rows, uint64_t last_insert_id, uint32_t status_flags, + int16_t warnings, std::string session_state_changes) + : header(header) + , capabilities(capabilities) + , affected_rows(affected_rows) + , last_insert_id(last_insert_id) + , warnings(warnings) + , status_flags(status_flags) + , session_state_changes(std::move(session_state_changes)) + { + } + + std::string getPayload() { + std::string result; + result.append(1, header); + result.append(writeLenenc(affected_rows)); + result.append(writeLenenc(last_insert_id)); + + if (capabilities & CLIENT_PROTOCOL_41) { + result.append(reinterpret_cast(&status_flags), 2); + result.append(reinterpret_cast(&warnings), 2); + } else if (capabilities & CLIENT_TRANSACTIONS) { + result.append(reinterpret_cast(&status_flags), 2); + } + + if (capabilities & CLIENT_SESSION_TRACK) { + result.append(writeLenenc(info.length())); + result.append(info); + if (status_flags & SERVER_SESSION_STATE_CHANGED) { + result.append(writeLenenc(session_state_changes.length())); + result.append(session_state_changes); + } + } else { + result.append(info); + } + return result; + } +}; + +class EOF_Packet { + int warnings; + int status_flags; +public: + EOF_Packet(int warnings, int status_flags): warnings(warnings), status_flags(status_flags) {} + + std::string getPayload() { + std::string result; + result.append(1, 0xfe); // EOF header + result.append(reinterpret_cast(&warnings), 2); + result.append(reinterpret_cast(&status_flags), 2); + return result; + } +}; + +class ERR_Packet { + int error_code; + std::string sql_state; + std::string error_message; +public: + ERR_Packet(int error_code, std::string sql_state, std::string error_message) + : error_code(error_code) + , sql_state(std::move(sql_state)) + , error_message(std::move(error_message)) + { + } + + std::string getPayload() + { + std::string result; + result.append(1, 0xff); + result.append(reinterpret_cast(&error_code), 2); + result.append("#", 1); + result.append(sql_state.data(), sql_state.length()); + result.append(error_message.data(), std::min(error_message.length(), MYSQL_ERRMSG_SIZE)); + return result; + } +}; + +class ColumnDefinition { + std::string schema; + std::string table; + std::string org_table; + std::string name; + std::string org_name; + size_t next_length = 0x0c; + uint16_t character_set; + uint32_t column_length; + ColumnType column_type; + uint16_t flags; + uint8_t decimals = 0x00; +public: + explicit ColumnDefinition( + std::string schema, + std::string table, + std::string org_table, + std::string name, + std::string org_name, + uint16_t character_set, + uint32_t column_length, + ColumnType column_type, + uint16_t flags, + uint8_t decimals) + + : schema(std::move(schema)) + , table(std::move(table)) + , org_table(std::move(org_table)) + , name(std::move(name)) + , org_name(std::move(org_name)) + , character_set(character_set) + , column_length(column_length) + , column_type(column_type) + , flags(flags) + , decimals(decimals) + { + } + + std::string getPayload() { + std::string result; + writeLenencStr(result, "def"); // always "def" + writeLenencStr(result, schema); + writeLenencStr(result, table); + writeLenencStr(result, org_table); + writeLenencStr(result, name); + writeLenencStr(result, org_name); + result.append(writeLenenc(next_length)); + result.append(reinterpret_cast(&character_set), 2); + result.append(reinterpret_cast(&column_length), 4); + result.append(reinterpret_cast(&column_type), 1); + result.append(reinterpret_cast(&flags), 2); + result.append(reinterpret_cast(&decimals), 2); + result.append(2, 0x0); + return result; + } +}; + +class ComFieldList { +public: + std::string table, field_wildcard; + + void readPayload(const std::string & payload) + { + std::istringstream ss(payload); + ss.ignore(1); // command byte + std::getline(ss, table, static_cast(0x0)); + field_wildcard = payload.substr(table.length() + 2); // rest of the packet + } +}; + + +} +}