Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
bfd53e3c
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
bfd53e3c
编写于
3月 16, 2019
作者:
Y
Yuriy
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
draft
上级
a63cf1ae
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
803 addition
and
0 deletion
+803
-0
dbms/programs/server/CMakeLists.txt
dbms/programs/server/CMakeLists.txt
+1
-0
dbms/programs/server/MySQLHandler.cpp
dbms/programs/server/MySQLHandler.cpp
+279
-0
dbms/programs/server/MySQLHandler.h
dbms/programs/server/MySQLHandler.h
+58
-0
dbms/programs/server/MySQLHandlerFactory.h
dbms/programs/server/MySQLHandlerFactory.h
+33
-0
dbms/programs/server/Server.cpp
dbms/programs/server/Server.cpp
+16
-0
dbms/src/Core/MySQLProtocol.cpp
dbms/src/Core/MySQLProtocol.cpp
+50
-0
dbms/src/Core/MySQLProtocol.h
dbms/src/Core/MySQLProtocol.h
+366
-0
未找到文件。
dbms/programs/server/CMakeLists.txt
浏览文件 @
bfd53e3c
...
@@ -8,6 +8,7 @@ set(CLICKHOUSE_SERVER_SOURCES
...
@@ -8,6 +8,7 @@ set(CLICKHOUSE_SERVER_SOURCES
${
CMAKE_CURRENT_SOURCE_DIR
}
/RootRequestHandler.cpp
${
CMAKE_CURRENT_SOURCE_DIR
}
/RootRequestHandler.cpp
${
CMAKE_CURRENT_SOURCE_DIR
}
/Server.cpp
${
CMAKE_CURRENT_SOURCE_DIR
}
/Server.cpp
${
CMAKE_CURRENT_SOURCE_DIR
}
/TCPHandler.cpp
${
CMAKE_CURRENT_SOURCE_DIR
}
/TCPHandler.cpp
${
CMAKE_CURRENT_SOURCE_DIR
}
/MySQLHandler.cpp
)
)
set
(
CLICKHOUSE_SERVER_LINK PRIVATE clickhouse_dictionaries clickhouse_common_io PUBLIC daemon PRIVATE clickhouse_storages_system clickhouse_functions clickhouse_aggregate_functions clickhouse_table_functions
${
Poco_Net_LIBRARY
}
)
set
(
CLICKHOUSE_SERVER_LINK PRIVATE clickhouse_dictionaries clickhouse_common_io PUBLIC daemon PRIVATE clickhouse_storages_system clickhouse_functions clickhouse_aggregate_functions clickhouse_table_functions
${
Poco_Net_LIBRARY
}
)
...
...
dbms/programs/server/MySQLHandler.cpp
0 → 100644
浏览文件 @
bfd53e3c
#include <DataStreams/copyData.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <IO/WriteBufferFromString.h>
#include <IO/copyData.h>
#include <Interpreters/executeQuery.h>
#include <Storages/IStorage.h>
#include <Core/MySQLProtocol.h>
#include <Core/NamesAndTypes.h>
#include <Columns/ColumnVector.h>
#include <Common/config_version.h>
#include <Common/NetException.h>
#include "MySQLHandler.h"
#include <limits>
namespace
DB
{
using
namespace
MySQLProtocol
;
uint32_t
MySQLHandler
::
last_connection_id
=
0
;
String
MySQLHandler
::
readPayload
()
{
WriteBufferFromOwnString
buf
;
size_t
payload_length
=
0
;
uint8_t
packet_sequence_id
;
// packets which are larger than or equal to 16MB are splitted
do
{
LOG_TRACE
(
log
,
"Reading from buffer"
);
in
->
readStrict
(
reinterpret_cast
<
char
*>
(
&
payload_length
),
3
);
if
(
payload_length
>
MAX_PACKET_LENGTH
)
{
throw
ProtocolError
(
Poco
::
format
(
"Received packet with payload length greater than 2^24 - 1: %z."
,
payload_length
),
0
);
}
in
->
readStrict
(
reinterpret_cast
<
char
*>
(
&
packet_sequence_id
),
1
);
if
(
packet_sequence_id
!=
sequence_id
)
{
throw
ProtocolError
(
Poco
::
format
(
"Received packet with wrong sequence-id: %d. Expected: %d."
,
packet_sequence_id
,
sequence_id
),
0
);
}
sequence_id
++
;
LOG_TRACE
(
log
,
"Received packet. Sequence-id: "
<<
static_cast
<
int
>
(
packet_sequence_id
)
<<
", payload length: "
<<
payload_length
);
copyData
(
*
in
,
static_cast
<
WriteBuffer
&>
(
buf
),
payload_length
);
}
while
(
payload_length
==
MAX_PACKET_LENGTH
);
return
buf
.
str
();
}
/// Converts packet to text. Useful for debugging, since packets often consist of non-printing characters.
static
String
packetToText
(
std
::
string_view
payload
)
{
String
result
;
for
(
auto
c
:
payload
)
{
result
+=
' '
;
result
+=
std
::
to_string
(
static_cast
<
unsigned
char
>
(
c
));
}
return
result
;
}
void
MySQLHandler
::
writePayload
(
std
::
string_view
payload
)
{
size_t
pos
=
0
;
do
{
size_t
payload_length
=
std
::
min
(
payload
.
length
()
-
pos
,
MAX_PACKET_LENGTH
);
LOG_TRACE
(
log
,
"Writing packet of size "
<<
payload_length
<<
" with sequence-id "
<<
static_cast
<
int
>
(
sequence_id
));
LOG_TRACE
(
log
,
packetToText
(
payload
));
out
->
write
(
reinterpret_cast
<
const
char
*>
(
&
payload_length
),
3
);
out
->
write
(
reinterpret_cast
<
const
char
*>
(
&
sequence_id
),
1
);
out
->
write
(
payload
.
data
()
+
pos
,
payload_length
);
pos
+=
payload_length
;
sequence_id
++
;
}
while
(
pos
<
payload
.
length
());
out
->
next
();
LOG_TRACE
(
log
,
"Packet was sent."
);
}
void
MySQLHandler
::
run
()
{
sequence_id
=
0
;
connection_context
=
server
.
context
();
in
=
std
::
make_shared
<
ReadBufferFromPocoSocket
>
(
socket
());
out
=
std
::
make_shared
<
WriteBufferFromPocoSocket
>
(
socket
());
try
{
Handshake
handshake
(
connection_id
,
VERSION_FULL
);
auto
payload
=
handshake
.
getPayload
();
writePayload
(
payload
);
LOG_TRACE
(
log
,
"sent handshake"
);
HandshakeResponse
handshake_response
;
payload
=
readPayload
();
handshake_response
.
readPayload
(
payload
);
LOG_DEBUG
(
log
,
"capability_flags: "
<<
handshake_response
.
capability_flags
<<
"max_packet_size: %s"
<<
handshake_response
.
max_packet_size
<<
"character_set: %s"
<<
handshake_response
.
character_set
<<
"user: %s"
<<
handshake_response
.
username
<<
"auth_response length: %s"
<<
handshake_response
.
auth_response
.
length
()
<<
"auth_response: %s"
<<
handshake_response
.
auth_response
<<
"database: %s"
<<
handshake_response
.
database
<<
"auth_plugin_name: %s"
<<
handshake_response
.
auth_plugin_name
);
capabilities
=
handshake_response
.
capability_flags
;
if
(
!
(
capabilities
&
CLIENT_PROTOCOL_41
))
{
LOG_ERROR
(
log
,
"Clients without CLIENT_PROTOCOL_41 capability are not supported."
);
return
;
}
try
{
connection_context
.
setUser
(
handshake_response
.
username
,
""
,
socket
().
address
(),
""
);
connection_context
.
setCurrentDatabase
(
handshake_response
.
database
);
connection_context
.
setCurrentQueryId
(
""
);
}
catch
(
const
Exception
&
exc
)
{
log
->
log
(
exc
);
writePayload
(
ERR_Packet
(
exc
.
code
(),
"00000"
,
exc
.
message
()).
getPayload
());
return
;
// TODO Authentication method change
}
OK_Packet
ok_packet
(
0
,
handshake_response
.
capability_flags
,
0
,
0
,
0
,
0
,
""
);
payload
=
ok_packet
.
getPayload
();
writePayload
(
payload
);
LOG_INFO
(
log
,
"sent OK_Packet"
);
while
(
true
)
{
sequence_id
=
0
;
payload
=
readPayload
();
int
command
=
payload
[
0
];
LOG_DEBUG
(
log
,
"Received command: "
<<
std
::
to_string
(
command
)
<<
". Connection id: "
<<
connection_id
<<
"."
);
try
{
switch
(
command
)
{
case
COM_QUIT
:
return
;
case
COM_INIT_DB
:
comInitDB
(
payload
);
break
;
case
COM_QUERY
:
comQuery
(
payload
);
break
;
case
COM_FIELD_LIST
:
comFieldList
(
payload
);
break
;
case
COM_PING
:
comPing
();
break
;
default:
throw
Exception
(
Poco
::
format
(
"Command %d is not implemented."
,
command
),
ErrorCodes
::
NOT_IMPLEMENTED
);
}
}
catch
(
const
NetException
&
exc
)
{
log
->
log
(
exc
);
throw
;
}
catch
(
const
Exception
&
exc
)
{
log
->
log
(
exc
);
writePayload
(
ERR_Packet
(
exc
.
code
(),
"00000"
,
exc
.
message
()).
getPayload
());
}
}
}
catch
(
Poco
::
Exception
&
exc
)
{
log
->
log
(
exc
);
}
}
void
MySQLHandler
::
comInitDB
(
const
String
&
payload
)
{
String
database
=
payload
.
substr
(
1
);
LOG_DEBUG
(
log
,
"Setting current database to "
<<
database
);
connection_context
.
setCurrentDatabase
(
database
);
writePayload
(
OK_Packet
(
0
,
capabilities
,
0
,
0
,
0
,
1
,
""
).
getPayload
());
}
void
MySQLHandler
::
comFieldList
(
const
String
&
payload
)
{
ComFieldList
packet
;
packet
.
readPayload
(
payload
);
StoragePtr
tablePtr
=
connection_context
.
getTable
(
connection_context
.
getCurrentDatabase
(),
packet
.
table
);
for
(
const
NameAndTypePair
&
column
:
tablePtr
->
getColumns
().
getAll
())
{
ColumnDefinition
column_definition
(
"schema"
,
packet
.
table
,
packet
.
table
,
column
.
name
,
column
.
name
,
CharacterSet
::
UTF8
,
100
,
ColumnType
::
MYSQL_TYPE_STRING
,
0
,
0
);
writePayload
(
column_definition
.
getPayload
());
}
writePayload
(
OK_Packet
(
0xfe
,
capabilities
,
0
,
0
,
0
,
0
,
""
).
getPayload
());
}
void
MySQLHandler
::
comPing
()
{
writePayload
(
OK_Packet
(
0x0
,
capabilities
,
0
,
0
,
0
,
0
,
""
).
getPayload
());
}
void
MySQLHandler
::
comQuery
(
const
String
&
payload
)
{
BlockIO
res
=
executeQuery
(
payload
.
substr
(
1
),
connection_context
);
FormatSettings
format_settings
;
if
(
res
.
in
)
{
LOG_TRACE
(
log
,
"Executing query with output."
);
Block
header
=
res
.
in
->
getHeader
();
writePayload
(
writeLenenc
(
header
.
columns
()));
for
(
const
ColumnWithTypeAndName
&
column
:
header
.
getColumnsWithTypeAndName
())
{
writePayload
(
ColumnDefinition
(
""
,
/// database. Apparently, addition of these fields to ColumnWithTypeAndName and changes in interpreters are needed.
""
,
/// table name.
""
,
/// physical table name
column
.
name
,
/// virtual column name
""
,
/// physical column name
CharacterSet
::
UTF8
,
/// maximum column length which can be used for text outputting. Since query execution hasn't started, it is unknown.
std
::
numeric_limits
<
uint32_t
>::
max
(),
ColumnType
::
MYSQL_TYPE_STRING
,
/// TODO
0
,
0
).
getPayload
());
LOG_TRACE
(
log
,
"sent "
<<
column
.
name
<<
" column definition"
);
}
LOG_TRACE
(
log
,
"Sent columns definitions."
);
while
(
Block
block
=
res
.
in
->
read
())
{
size_t
rows
=
block
.
rows
();
size_t
columns
=
block
.
columns
();
for
(
size_t
i
=
0
;
i
<
rows
;
i
++
)
{
String
row_payload
;
for
(
size_t
j
=
0
;
j
<
columns
;
j
++
)
{
ColumnWithTypeAndName
&
column
=
block
.
getByPosition
(
j
);
column
.
column
=
column
.
column
->
convertToFullColumnIfConst
();
String
column_value
;
WriteBufferFromString
ostr
(
column_value
);
LOG_TRACE
(
log
,
"sending value of type "
<<
column
.
type
->
getName
()
<<
" of column "
<<
column
.
column
->
getName
());
column
.
type
->
serializeAsText
(
*
column
.
column
.
get
(),
i
,
ostr
,
format_settings
);
ostr
.
finish
();
writeLenencStr
(
row_payload
,
column_value
);
}
writePayload
(
row_payload
);
}
}
LOG_TRACE
(
log
,
"Sent rows."
);
}
if
(
capabilities
&
CLIENT_DEPRECATE_EOF
)
{
writePayload
(
OK_Packet
(
0xfe
,
capabilities
,
0
,
0
,
0
,
0
,
""
).
getPayload
());
}
else
{
writePayload
(
EOF_Packet
(
0
,
0
).
getPayload
());
}
}
}
dbms/programs/server/MySQLHandler.h
0 → 100644
浏览文件 @
bfd53e3c
#pragma once
#include <Poco/Net/TCPServerConnection.h>
#include <Common/getFQDNOrHostName.h>
#include "IServer.h"
namespace
DB
{
/// Handler for MySQL wire protocol connections. Allows to connect to ClickHouse using MySQL client.
class
MySQLHandler
:
public
Poco
::
Net
::
TCPServerConnection
{
public:
MySQLHandler
(
IServer
&
server_
,
const
Poco
::
Net
::
StreamSocket
&
socket_
)
:
Poco
::
Net
::
TCPServerConnection
(
socket_
)
,
server
(
server_
)
,
log
(
&
Poco
::
Logger
::
get
(
"MySQLHandler"
))
,
connection_context
(
server
.
context
())
,
connection_id
(
last_connection_id
++
)
{
}
void
run
()
final
;
/** Reads one packet, incrementing sequence-id, and returns its payload.
* Currently, whole payload is loaded into memory.
*/
String
readPayload
();
/// Writes packet payload, incrementing sequence-id.
void
writePayload
(
std
::
string_view
payload
);
void
comQuery
(
const
String
&
payload
);
void
comFieldList
(
const
String
&
payload
);
void
comPing
();
void
comInitDB
(
const
String
&
payload
);
private:
IServer
&
server
;
Poco
::
Logger
*
log
;
Context
connection_context
;
std
::
shared_ptr
<
ReadBuffer
>
in
;
std
::
shared_ptr
<
WriteBuffer
>
out
;
/// Packet sequence id
unsigned
char
sequence_id
=
0
;
uint32_t
connection_id
=
0
;
uint32_t
capabilities
;
static
uint32_t
last_connection_id
;
};
}
dbms/programs/server/MySQLHandlerFactory.h
0 → 100644
浏览文件 @
bfd53e3c
#pragma once
#include <Poco/Net/TCPServerConnectionFactory.h>
#include <common/logger_useful.h>
#include "IServer.h"
#include "MySQLHandler.h"
namespace
Poco
{
class
Logger
;
}
namespace
DB
{
class
MySQLHandlerFactory
:
public
Poco
::
Net
::
TCPServerConnectionFactory
{
private:
IServer
&
server
;
Poco
::
Logger
*
log
;
public:
explicit
MySQLHandlerFactory
(
IServer
&
server_
)
:
server
(
server_
)
,
log
(
&
Logger
::
get
(
"MySQLHandlerFactory"
))
{
}
Poco
::
Net
::
TCPServerConnection
*
createConnection
(
const
Poco
::
Net
::
StreamSocket
&
socket
)
override
{
LOG_TRACE
(
log
,
"MySQL connection. Address: "
<<
socket
.
peerAddress
().
toString
());
return
new
MySQLHandler
(
server
,
socket
);
}
};
}
dbms/programs/server/Server.cpp
浏览文件 @
bfd53e3c
...
@@ -49,6 +49,7 @@
...
@@ -49,6 +49,7 @@
#include <Common/StatusFile.h>
#include <Common/StatusFile.h>
#include "TCPHandlerFactory.h"
#include "TCPHandlerFactory.h"
#include "Common/config_version.h"
#include "Common/config_version.h"
#include "MySQLHandlerFactory.h"
#if defined(__linux__)
#if defined(__linux__)
#include <Common/hasLinuxCapability.h>
#include <Common/hasLinuxCapability.h>
...
@@ -725,6 +726,21 @@ int Server::main(const std::vector<std::string> & /*args*/)
...
@@ -725,6 +726,21 @@ int Server::main(const std::vector<std::string> & /*args*/)
ErrorCodes
::
SUPPORT_IS_DISABLED
};
ErrorCodes
::
SUPPORT_IS_DISABLED
};
#endif
#endif
}
}
if
(
config
().
has
(
"mysql_port"
))
{
Poco
::
Net
::
ServerSocket
socket
;
auto
address
=
socket_bind_listen
(
socket
,
listen_host
,
config
().
getInt
(
"mysql_port"
),
/* secure = */
true
);
socket
.
setReceiveTimeout
(
Poco
::
Timespan
());
socket
.
setSendTimeout
(
settings
.
send_timeout
);
servers
.
emplace_back
(
std
::
make_unique
<
Poco
::
Net
::
TCPServer
>
(
new
MySQLHandlerFactory
(
*
this
),
server_pool
,
socket
,
new
Poco
::
Net
::
TCPServerParams
));
LOG_INFO
(
log
,
"Listening mysql: "
+
address
.
toString
());
}
}
}
catch
(
const
Poco
::
Net
::
NetException
&
e
)
catch
(
const
Poco
::
Net
::
NetException
&
e
)
{
{
...
...
dbms/src/Core/MySQLProtocol.cpp
0 → 100644
浏览文件 @
bfd53e3c
#include <IO/WriteBuffer.h>
#include <random>
#include <sstream>
/// Implementation of MySQL wire protocol
namespace
DB
{
namespace
MySQLProtocol
{
uint64_t
readLenenc
(
std
::
istringstream
&
ss
)
{
char
c
;
uint64_t
buf
=
0
;
ss
.
get
(
c
);
auto
cc
=
static_cast
<
uint8_t
>
(
c
);
if
(
cc
<
0xfc
)
{
return
cc
;
}
else
if
(
cc
<
0xfd
)
{
ss
.
read
(
reinterpret_cast
<
char
*>
(
&
buf
),
2
);
}
else
if
(
cc
<
0xfe
)
{
ss
.
read
(
reinterpret_cast
<
char
*>
(
&
buf
),
3
);
}
else
{
ss
.
read
(
reinterpret_cast
<
char
*>
(
&
buf
),
8
);
}
return
buf
;
}
std
::
string
writeLenenc
(
uint64_t
x
)
{
std
::
string
result
;
if
(
x
<
251
)
{
result
.
append
(
1
,
static_cast
<
char
>
(
x
));
}
else
if
(
x
<
(
1
<<
16
))
{
result
.
append
(
1
,
0xfc
);
result
.
append
(
reinterpret_cast
<
char
*>
(
&
x
),
2
);
}
else
if
(
x
<
(
1
<<
24
))
{
result
.
append
(
1
,
0xfd
);
result
.
append
(
reinterpret_cast
<
char
*>
(
&
x
),
3
);
}
else
{
result
.
append
(
1
,
0xfe
);
result
.
append
(
reinterpret_cast
<
char
*>
(
&
x
),
8
);
}
return
result
;
}
void
writeLenencStr
(
std
::
string
&
payload
,
const
std
::
string
&
s
)
{
payload
.
append
(
writeLenenc
(
s
.
length
()));
payload
.
append
(
s
);
}
}
}
dbms/src/Core/MySQLProtocol.h
0 → 100644
浏览文件 @
bfd53e3c
#pragma once
#include <IO/WriteBuffer.h>
#include <random>
#include <sstream>
/// Implementation of MySQL wire protocol
namespace
DB
{
namespace
MySQLProtocol
{
const
size_t
MAX_PACKET_LENGTH
=
(
1
<<
24
)
-
1
;
// 16 mb
const
size_t
SCRAMBLE_LENGTH
=
20
;
const
size_t
AUTH_PLUGIN_DATA_PART_1_LENGTH
=
8
;
const
size_t
MYSQL_ERRMSG_SIZE
=
512
;
namespace
Authentication
{
const
std
::
string
Native41
=
"mysql_native_password"
;
}
enum
CharacterSet
{
UTF8
=
33
};
enum
StatusFlags
{
SERVER_SESSION_STATE_CHANGED
=
0x4000
};
enum
Capability
{
CLIENT_CONNECT_WITH_DB
=
0x00000008
,
CLIENT_PROTOCOL_41
=
0x00000200
,
CLIENT_TRANSACTIONS
=
0x00002000
,
// TODO
CLIENT_SESSION_TRACK
=
0x00800000
,
// TODO
CLIENT_SECURE_CONNECTION
=
0x00008000
,
CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA
=
0x00200000
,
CLIENT_PLUGIN_AUTH
=
0x00080000
,
CLIENT_DEPRECATE_EOF
=
0x01000000
,
};
enum
Command
{
COM_SLEEP
=
0x0
,
COM_QUIT
=
0x1
,
COM_INIT_DB
=
0x2
,
COM_QUERY
=
0x3
,
COM_FIELD_LIST
=
0x4
,
COM_CREATE_DB
=
0x5
,
COM_DROP_DB
=
0x6
,
COM_REFRESH
=
0x7
,
COM_SHUTDOWN
=
0x8
,
COM_STATISTICS
=
0x9
,
COM_PROCESS_INFO
=
0xa
,
COM_CONNECT
=
0xb
,
COM_PROCESS_KILL
=
0xc
,
COM_DEBUG
=
0xd
,
COM_PING
=
0xe
,
COM_TIME
=
0xf
,
COM_DELAYED_INSERT
=
0x10
,
COM_CHANGE_USER
=
0x11
,
COM_RESET_CONNECTION
=
0x1f
,
COM_DAEMON
=
0x1d
};
enum
ColumnType
{
MYSQL_TYPE_DECIMAL
=
0x00
,
MYSQL_TYPE_TINY
=
0x01
,
MYSQL_TYPE_SHORT
=
0x02
,
MYSQL_TYPE_LONG
=
0x03
,
MYSQL_TYPE_FLOAT
=
0x04
,
MYSQL_TYPE_DOUBLE
=
0x05
,
MYSQL_TYPE_NULL
=
0x06
,
MYSQL_TYPE_TIMESTAMP
=
0x07
,
MYSQL_TYPE_LONGLONG
=
0x08
,
MYSQL_TYPE_INT24
=
0x09
,
MYSQL_TYPE_DATE
=
0x0a
,
MYSQL_TYPE_TIME
=
0x0b
,
MYSQL_TYPE_DATETIME
=
0x0c
,
MYSQL_TYPE_YEAR
=
0x0d
,
MYSQL_TYPE_VARCHAR
=
0x0f
,
MYSQL_TYPE_BIT
=
0x10
,
MYSQL_TYPE_NEWDECIMAL
=
0xf6
,
MYSQL_TYPE_ENUM
=
0xf7
,
MYSQL_TYPE_SET
=
0xf8
,
MYSQL_TYPE_TINY_BLOB
=
0xf9
,
MYSQL_TYPE_MEDIUM_BLOB
=
0xfa
,
MYSQL_TYPE_LONG_BLOB
=
0xfb
,
MYSQL_TYPE_BLOB
=
0xfc
,
MYSQL_TYPE_VAR_STRING
=
0xfd
,
MYSQL_TYPE_STRING
=
0xfe
,
MYSQL_TYPE_GEOMETRY
=
0xff
};
uint64_t
readLenenc
(
std
::
istringstream
&
ss
);
std
::
string
writeLenenc
(
uint64_t
x
);
void
writeLenencStr
(
std
::
string
&
payload
,
const
std
::
string
&
s
);
class
ProtocolError
:
public
DB
::
Exception
{
public:
using
Exception
::
Exception
;
};
/// https://dev.mysql.com/doc/internals/en/connection-phase-packets.html
class
Handshake
{
int
protocol_version
=
0xa
;
std
::
string
server_version
;
uint32_t
connection_id
;
uint32_t
capability_flags
;
uint8_t
character_set
;
uint32_t
status_flags
;
std
::
string
auth_plugin_data
;
public:
explicit
Handshake
(
uint32_t
connection_id
,
std
::
string
server_version
)
:
protocol_version
(
0xa
)
,
server_version
(
std
::
move
(
server_version
))
,
connection_id
(
connection_id
)
,
capability_flags
(
CLIENT_PROTOCOL_41
|
CLIENT_SECURE_CONNECTION
|
CLIENT_PLUGIN_AUTH
|
CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA
|
CLIENT_CONNECT_WITH_DB
|
CLIENT_DEPRECATE_EOF
)
,
character_set
(
63
)
,
status_flags
(
0
)
{
auth_plugin_data
.
resize
(
SCRAMBLE_LENGTH
);
auto
seed
=
std
::
chrono
::
system_clock
::
now
().
time_since_epoch
().
count
();
std
::
default_random_engine
generator
(
static_cast
<
unsigned
int
>
(
seed
));
std
::
uniform_int_distribution
<
char
>
distribution
(
0
);
for
(
size_t
i
=
0
;
i
<
SCRAMBLE_LENGTH
;
i
++
)
{
auth_plugin_data
[
i
]
=
distribution
(
generator
);
}
}
std
::
string
getPayload
()
{
std
::
string
result
;
result
.
append
(
1
,
protocol_version
);
result
.
append
(
server_version
);
result
.
append
(
1
,
0x0
);
result
.
append
(
reinterpret_cast
<
const
char
*>
(
&
connection_id
),
4
);
result
.
append
(
auth_plugin_data
.
substr
(
0
,
AUTH_PLUGIN_DATA_PART_1_LENGTH
));
result
.
append
(
1
,
0x0
);
result
.
append
(
reinterpret_cast
<
const
char
*>
(
&
capability_flags
),
2
);
result
.
append
(
reinterpret_cast
<
const
char
*>
(
&
character_set
),
1
);
result
.
append
(
reinterpret_cast
<
const
char
*>
(
&
status_flags
),
2
);
result
.
append
((
reinterpret_cast
<
const
char
*>
(
&
capability_flags
))
+
2
,
2
);
result
.
append
(
1
,
SCRAMBLE_LENGTH
+
1
);
result
.
append
(
1
,
0x0
);
result
.
append
(
10
,
0x0
);
result
.
append
(
auth_plugin_data
.
substr
(
AUTH_PLUGIN_DATA_PART_1_LENGTH
,
SCRAMBLE_LENGTH
-
AUTH_PLUGIN_DATA_PART_1_LENGTH
));
result
.
append
(
Authentication
::
Native41
);
result
.
append
(
1
,
0x0
);
return
result
;
}
};
class
HandshakeResponse
{
public:
uint32_t
capability_flags
;
uint32_t
max_packet_size
;
uint8_t
character_set
;
std
::
string
username
;
std
::
string
auth_response
;
std
::
string
database
;
std
::
string
auth_plugin_name
;
void
readPayload
(
const
std
::
string
&
s
)
{
std
::
istringstream
ss
(
s
);
ss
.
readsome
(
reinterpret_cast
<
char
*>
(
&
capability_flags
),
4
);
ss
.
readsome
(
reinterpret_cast
<
char
*>
(
&
max_packet_size
),
4
);
ss
.
readsome
(
reinterpret_cast
<
char
*>
(
&
character_set
),
1
);
ss
.
ignore
(
23
);
std
::
getline
(
ss
,
username
,
static_cast
<
char
>
(
0x0
));
if
(
capability_flags
&
CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA
)
{
auto
len
=
readLenenc
(
ss
);
auth_response
.
resize
(
len
);
ss
.
read
(
auth_response
.
data
(),
static_cast
<
std
::
streamsize
>
(
len
));
}
else
if
(
capability_flags
&
CLIENT_SECURE_CONNECTION
)
{
uint8_t
len
;
ss
.
read
(
reinterpret_cast
<
char
*>
(
&
len
),
1
);
auth_response
.
resize
(
len
);
ss
.
read
(
auth_response
.
data
(),
len
);
}
else
{
std
::
getline
(
ss
,
auth_response
,
static_cast
<
char
>
(
0x0
));
}
if
(
capability_flags
&
CLIENT_CONNECT_WITH_DB
)
{
std
::
getline
(
ss
,
database
,
static_cast
<
char
>
(
0x0
));
}
if
(
capability_flags
&
CLIENT_PLUGIN_AUTH
)
{
std
::
getline
(
ss
,
auth_plugin_name
,
static_cast
<
char
>
(
0x0
));
}
}
};
class
OK_Packet
{
uint8_t
header
;
uint32_t
capabilities
;
uint64_t
affected_rows
;
uint64_t
last_insert_id
;
int16_t
warnings
=
0
;
uint32_t
status_flags
;
std
::
string
info
;
std
::
string
session_state_changes
;
public:
OK_Packet
(
uint8_t
header
,
uint32_t
capabilities
,
uint64_t
affected_rows
,
uint64_t
last_insert_id
,
uint32_t
status_flags
,
int16_t
warnings
,
std
::
string
session_state_changes
)
:
header
(
header
)
,
capabilities
(
capabilities
)
,
affected_rows
(
affected_rows
)
,
last_insert_id
(
last_insert_id
)
,
warnings
(
warnings
)
,
status_flags
(
status_flags
)
,
session_state_changes
(
std
::
move
(
session_state_changes
))
{
}
std
::
string
getPayload
()
{
std
::
string
result
;
result
.
append
(
1
,
header
);
result
.
append
(
writeLenenc
(
affected_rows
));
result
.
append
(
writeLenenc
(
last_insert_id
));
if
(
capabilities
&
CLIENT_PROTOCOL_41
)
{
result
.
append
(
reinterpret_cast
<
const
char
*>
(
&
status_flags
),
2
);
result
.
append
(
reinterpret_cast
<
const
char
*>
(
&
warnings
),
2
);
}
else
if
(
capabilities
&
CLIENT_TRANSACTIONS
)
{
result
.
append
(
reinterpret_cast
<
const
char
*>
(
&
status_flags
),
2
);
}
if
(
capabilities
&
CLIENT_SESSION_TRACK
)
{
result
.
append
(
writeLenenc
(
info
.
length
()));
result
.
append
(
info
);
if
(
status_flags
&
SERVER_SESSION_STATE_CHANGED
)
{
result
.
append
(
writeLenenc
(
session_state_changes
.
length
()));
result
.
append
(
session_state_changes
);
}
}
else
{
result
.
append
(
info
);
}
return
result
;
}
};
class
EOF_Packet
{
int
warnings
;
int
status_flags
;
public:
EOF_Packet
(
int
warnings
,
int
status_flags
)
:
warnings
(
warnings
),
status_flags
(
status_flags
)
{}
std
::
string
getPayload
()
{
std
::
string
result
;
result
.
append
(
1
,
0xfe
);
// EOF header
result
.
append
(
reinterpret_cast
<
const
char
*>
(
&
warnings
),
2
);
result
.
append
(
reinterpret_cast
<
const
char
*>
(
&
status_flags
),
2
);
return
result
;
}
};
class
ERR_Packet
{
int
error_code
;
std
::
string
sql_state
;
std
::
string
error_message
;
public:
ERR_Packet
(
int
error_code
,
std
::
string
sql_state
,
std
::
string
error_message
)
:
error_code
(
error_code
)
,
sql_state
(
std
::
move
(
sql_state
))
,
error_message
(
std
::
move
(
error_message
))
{
}
std
::
string
getPayload
()
{
std
::
string
result
;
result
.
append
(
1
,
0xff
);
result
.
append
(
reinterpret_cast
<
const
char
*>
(
&
error_code
),
2
);
result
.
append
(
"#"
,
1
);
result
.
append
(
sql_state
.
data
(),
sql_state
.
length
());
result
.
append
(
error_message
.
data
(),
std
::
min
(
error_message
.
length
(),
MYSQL_ERRMSG_SIZE
));
return
result
;
}
};
class
ColumnDefinition
{
std
::
string
schema
;
std
::
string
table
;
std
::
string
org_table
;
std
::
string
name
;
std
::
string
org_name
;
size_t
next_length
=
0x0c
;
uint16_t
character_set
;
uint32_t
column_length
;
ColumnType
column_type
;
uint16_t
flags
;
uint8_t
decimals
=
0x00
;
public:
explicit
ColumnDefinition
(
std
::
string
schema
,
std
::
string
table
,
std
::
string
org_table
,
std
::
string
name
,
std
::
string
org_name
,
uint16_t
character_set
,
uint32_t
column_length
,
ColumnType
column_type
,
uint16_t
flags
,
uint8_t
decimals
)
:
schema
(
std
::
move
(
schema
))
,
table
(
std
::
move
(
table
))
,
org_table
(
std
::
move
(
org_table
))
,
name
(
std
::
move
(
name
))
,
org_name
(
std
::
move
(
org_name
))
,
character_set
(
character_set
)
,
column_length
(
column_length
)
,
column_type
(
column_type
)
,
flags
(
flags
)
,
decimals
(
decimals
)
{
}
std
::
string
getPayload
()
{
std
::
string
result
;
writeLenencStr
(
result
,
"def"
);
// always "def"
writeLenencStr
(
result
,
schema
);
writeLenencStr
(
result
,
table
);
writeLenencStr
(
result
,
org_table
);
writeLenencStr
(
result
,
name
);
writeLenencStr
(
result
,
org_name
);
result
.
append
(
writeLenenc
(
next_length
));
result
.
append
(
reinterpret_cast
<
const
char
*>
(
&
character_set
),
2
);
result
.
append
(
reinterpret_cast
<
const
char
*>
(
&
column_length
),
4
);
result
.
append
(
reinterpret_cast
<
const
char
*>
(
&
column_type
),
1
);
result
.
append
(
reinterpret_cast
<
const
char
*>
(
&
flags
),
2
);
result
.
append
(
reinterpret_cast
<
const
char
*>
(
&
decimals
),
2
);
result
.
append
(
2
,
0x0
);
return
result
;
}
};
class
ComFieldList
{
public:
std
::
string
table
,
field_wildcard
;
void
readPayload
(
const
std
::
string
&
payload
)
{
std
::
istringstream
ss
(
payload
);
ss
.
ignore
(
1
);
// command byte
std
::
getline
(
ss
,
table
,
static_cast
<
char
>
(
0x0
));
field_wildcard
=
payload
.
substr
(
table
.
length
()
+
2
);
// rest of the packet
}
};
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录