提交 0dfcc0da 编写于 作者: O obdev 提交者: ob-robot

[CP] sql nio support ssl and open by default on master

上级 09028517
......@@ -1655,6 +1655,7 @@ static int easy_ssl_certificate_for_mysql_memory(easy_ssl_ctx_t *ssl, const char
easy_error_log("PEM_read_bio_RSAPrivateKey(\"%s\") failed", key);
return EASY_ERROR;
} else if (SSL_CTX_use_RSAPrivateKey(ssl->ctx, rsa) <= 0) {
RSA_free(rsa);
easy_error_log("SSL_CTX_use_RSAPrivateKey(\"%s\") failed", key);
return EASY_ERROR;
}
......
......@@ -187,6 +187,10 @@ ob_set_subtarget(oblib_lib ash
ash/ob_active_session_guard.cpp
)
ob_set_subtarget(oblib_lib ssl
ssl/ob_ssl_config.cpp
)
ob_lib_add_target(oblib_lib)
ob_set_subtarget(ob_malloc_object_list common_alloc
......
此差异已折叠。
#ifndef OB_SSL_CONFIG_H_
#define OB_SSL_CONFIG_H_
#include <stdint.h>
#include <openssl/ssl.h>
namespace oceanbase {
namespace common {
typedef struct ObSSLConfig {
ObSSLConfig(int is_from_file, int is_sm, const char* ca_cert, const char* sign_cert,
const char* sign_private_key, const char* enc_cert, const char* enc_private_key):
is_from_file_(is_from_file), is_sm_(is_sm), ca_cert_(ca_cert), sign_cert_(sign_cert),
sign_private_key_(sign_private_key), enc_cert_(enc_cert), enc_private_key_(enc_cert) {}
int is_from_file_;
int is_sm_;
const char* ca_cert_;
const char* sign_cert_;
const char* sign_private_key_;
const char* enc_cert_;
const char* enc_private_key_;
} ObSSLConfig;
enum OB_SSL_CTX_ID {
OB_SSL_CTX_ID_SQL_NIO,
OB_SSL_CTX_ID_MAX
};
enum OB_SSL_ROLE {
OB_SSL_ROLE_CLIENT,
OB_SSL_ROLE_SERVER,
OB_SSL_ROLE_MAX
};
int ob_ssl_load_config(int ctx_id, const ObSSLConfig& ssl_config);
int ob_fd_enable_ssl_for_server(int fd, int ctx_id);
int ob_fd_enable_ssl_for_client(int fd, int ctx_id);
void ob_fd_disable_ssl(int fd);
SSL* ob_fd_get_ssl_st(int fd);
ssize_t ob_read_regard_ssl(int fd, void *buf, size_t nbytes);
ssize_t ob_write_regard_ssl(int fd, const void *buf, size_t nbytes);
}
}
#endif
\ No newline at end of file
......@@ -18,6 +18,8 @@
#include "rpc/frame/ob_req_transport.h"
#include "rpc/obrpc/ob_listener.h"
#include "rpc/obrpc/ob_net_keepalive.h"
#include "lib/ssl/ob_ssl_config.h"
using namespace oceanbase::obrpc;
namespace oceanbase
{
......
......@@ -63,6 +63,19 @@ int ObMysqlProtocolProcessor::do_decode(ObSMConnection& conn, ObICSMemPool& pool
} else {
ObMySQLCapabilityFlags capability;
capability.capability_ = uint2korr(start);
/*
connection_phase_ state transition
(1)use ssl:
1.when tcp connection establised, the state is initialized as CPE_CONNECTED
2.if client decide to open ssl, after decode the first incomplete
login request packet, the state is changed to CPE_SSL_CONNECT, and the packet will be droped (not processed by processor)
3.after ssl handshake finished, after decode the complete login request
packet, the state changed to CPE_CONNECTED and deliver the packet to processor(ObMPConnect)
4.when complete the authentication operations, the state is changed to CPE_AUTHED
CPE_CONNECTED -> CPE_SSL_CONNECT -> CPE_CONNECTED -> CPE_AUTHED
(2)do not use ssl
CPE_CONNECTED -> CPE_AUTHED
*/
if (conn.is_in_connected_phase()) {
if (1 == capability.cap_flags_.OB_CLIENT_SSL) {
if (OB_FAIL(decode_sslr_body(pool, start, pktlen, pktseq, pkt))) {
......
......@@ -29,8 +29,8 @@ void* ObPocSqlRequestOperator::get_sql_session(ObRequest* req)
SSL* ObPocSqlRequestOperator::get_sql_ssl_st(ObRequest* req)
{
UNUSED(req);
SSL *ssl_st = NULL;
ObSqlSockSession* sess = (ObSqlSockSession*)req->get_server_handle_context();
SSL *ssl_st = sess->get_ssl_st();
return ssl_st;
}
......
......@@ -220,9 +220,22 @@ private:
}
int do_read_fd(int64_t sz) {
int ret = OB_SUCCESS;
const int MAX_SSL_REQ_PKT_SIZE = 36;
while(remain() < sz && OB_SUCCESS == ret) {
int64_t rbytes = 0;
if ((rbytes = read(fd_, data_end_, buf_end_ - data_end_)) > 0) {
size_t read_size = 0;
if (OB_UNLIKELY(0 == consume_sz_)) {
/*
set read size for ssl, when client want to open ssl, it will send a 36 bytes
incomplete Login Request packet and then do SSL_connect, the data flow will be
like this |Login Request (36 bytes)|SSL handshake message|.To avoid read the SSL
handshake message by us, we read 36 bytes for the first packet.
*/
read_size = MAX_SSL_REQ_PKT_SIZE;
} else {
read_size = buf_end_ - data_end_;
}
if ((rbytes = ob_read_regard_ssl(fd_, data_end_, read_size)) > 0) {
data_end_ += rbytes;
} else if (0 == rbytes) {
LOG_INFO("read fd return EOF", K_(fd));
......@@ -292,7 +305,7 @@ private:
int64_t pos = 0;
while(pos < sz && OB_SUCCESS == ret) {
int64_t wbytes = 0;
if ((wbytes = write(fd, buf + pos, sz - pos)) >= 0) {
if ((wbytes = ob_write_regard_ssl(fd, buf + pos, sz - pos)) >= 0) {
pos += wbytes;
} else if (EAGAIN == errno || EWOULDBLOCK == errno) {
LOG_INFO("write return EAGAIN");
......@@ -331,6 +344,7 @@ public:
void do_close() {
if (fd_ >= 0) {
ob_fd_disable_ssl(fd_);
close(fd_);
read_buffer_.set_fd(-1);
fd_ = -1;
......@@ -368,7 +382,7 @@ public:
int64_t pos = 0;
while(pos < sz && OB_SUCCESS == ret) {
int64_t wbytes = 0;
if ((wbytes = write(fd_, buf + pos, sz - pos)) >= 0) {
if ((wbytes = ob_write_regard_ssl(fd_, buf + pos, sz - pos)) >= 0) {
pos += wbytes;
LOG_DEBUG("write fd", K(wbytes));
} else if (EAGAIN == errno || EWOULDBLOCK == errno) {
......@@ -384,6 +398,7 @@ public:
last_write_time_ = ObTimeUtility::current_time();
return ret;
}
const rpc::TraceId* get_trace_id() const {
ObSqlSockSession* sess = (ObSqlSockSession *)sess_;
return &(sess->sql_req_.get_trace_id());
......@@ -416,6 +431,8 @@ public:
void set_shutdown() { ATOMIC_STORE(&need_shutdown_, true); }
bool need_shutdown() const { return ATOMIC_LOAD(&need_shutdown_); }
void shutdown() { ::shutdown(fd_, SHUT_RD); }
int set_ssl_enabled();
SSL* get_ssl_st();
public:
ObDLink dlink_;
ObDLink all_list_link_;
......@@ -439,6 +456,20 @@ public:
char sess_[3000] __attribute__((aligned(16)));
};
int ObSqlSock::set_ssl_enabled()
{
int ret = OB_SUCCESS;
if (OB_FAIL(ob_fd_enable_ssl_for_server(fd_, OB_SSL_CTX_ID_SQL_NIO))) {
LOG_WARN("sqlnio enable ssl for server failed", K(ret), K(fd_));
}
return ret;
}
SSL* ObSqlSock::get_ssl_st()
{
return ob_fd_get_ssl_st(fd_);
}
static struct epoll_event *__make_epoll_event(struct epoll_event *event, uint32_t event_flag, void* val) {
event->events = event_flag;
event->data.ptr = val;
......@@ -942,5 +973,16 @@ void ObSqlNio::async_write_data(void* sess, const char* buf, int64_t sz)
sock->get_nio_impl().push_write_req(sock);
}
int ObSqlNio::set_ssl_enabled(void* sess)
{
ObSqlSock* sock = sess2sock(sess);
return sock->set_ssl_enabled();
}
SSL* ObSqlNio::get_ssl_st(void* sess)
{
ObSqlSock* sock = sess2sock(sess);
return sock->get_ssl_st();
}
}; // end namespace obmysql
}; // end namespace oceanbase
......@@ -15,6 +15,7 @@
#include <pthread.h>
#include <stdint.h>
#include "lib/thread/threads.h"
#include "lib/ssl/ob_ssl_config.h"
namespace oceanbase
{
......@@ -43,6 +44,8 @@ public:
void set_sql_session_info(void* sess, void* sql_session);
void set_shutdown(void* sess);
void shutdown(void* sess);
int set_ssl_enabled(void* sess);
SSL* get_ssl_st(void* sess);
private:
void run(int64_t idx);
private:
......
......@@ -11,10 +11,9 @@
*/
#define USING_LOG_PREFIX RPC_OBMYSQL
#include "rpc/frame/ob_req_deliver.h"
#include "rpc/obmysql/ob_sql_sock_handler.h"
#include "rpc/frame/ob_req_deliver.h"
#include "rpc/obmysql/ob_sql_sock_processor.h"
#include "rpc/obmysql/ob_sql_sock_session.h"
namespace oceanbase
{
......@@ -149,6 +148,5 @@ int ObSqlSockHandler::on_readable(void* udata)
return ret;
}
}; // end namespace obmysql
}; // end namespace oceanbase
......@@ -14,7 +14,7 @@
#define OCEANBASE_OBMYSQL_OB_SQL_SOCK_HANDLER_H_
#include "rpc/obmysql/ob_i_sm_conn_callback.h"
#include "rpc/obmysql/ob_i_sql_sock_handler.h"
#include "rpc/obmysql/ob_sql_sock_processor.h"
#include "rpc/obmysql/ob_sql_sock_session.h"
namespace oceanbase
{
......@@ -23,6 +23,7 @@ namespace frame { class ObReqDeliver;};
};
namespace obmysql
{
class ObSqlSockProcessor;
class ObSqlNio;
class ObSqlSockHandler: public ObISqlSockHandler
{
......
......@@ -60,6 +60,13 @@ int ObSqlSockProcessor::decode_sql_packet(ObSqlSockSession& sess, rpc::ObPacket*
LOG_WARN("do_decode fail", K(ret));
} else if (NULL == pkt) {
// try read more
} else if (conn.is_in_ssl_connect_phase()) {
ret_pkt = NULL;
sess.set_last_pkt_sz(consume_sz);
if (OB_FAIL(sess.set_ssl_enabled())) {
LOG_WARN("sql nio enable ssl for server failed", K(ret));
}
break;
} else if (!conn.is_in_authed_phase()) {
ret_pkt = pkt;
sess.set_last_pkt_sz(consume_sz);
......
......@@ -161,5 +161,15 @@ void ObSqlSockSession::set_sql_session_info(void* sess)
nio_.set_sql_session_info((void *)this, sess);
}
int ObSqlSockSession::set_ssl_enabled()
{
return nio_.set_ssl_enabled((void *)this);
}
SSL* ObSqlSockSession::get_ssl_st()
{
return nio_.get_ssl_st((void *)this);
}
}; // end namespace obmysql
}; // end namespace oceanbase
......@@ -59,6 +59,8 @@ public:
int on_disconnect();
void clear_sql_session_info();
void set_sql_session_info(void* sess);
int set_ssl_enabled();
SSL* get_ssl_st();
ObSqlNio& nio_;
ObISMConnectionCallback& sm_conn_cb_;
rpc::ObRequest sql_req_;
......
......@@ -92,14 +92,15 @@ static int sm_conn_build_handshake(ObSMConnection& conn, obmysql::OMPKHandshake&
int ret = OB_SUCCESS;
RLOCAL(common::ObMysqlRandom, thread_scramble_rand);
hsp.set_thread_id(conn.sessid_);
hsp.set_ssl_cap(false);
const bool support_ssl = GCONF.ssl_client_authentication;
hsp.set_ssl_cap(support_ssl);
const int64_t BUF_LEN = sizeof(conn.scramble_buf_);
if (OB_FAIL(create_scramble_string(conn.scramble_buf_, BUF_LEN, thread_scramble_rand))) {
LOG_WARN("create scramble string failed", K(ret));
} else if (OB_FAIL(hsp.set_scramble(conn.scramble_buf_, BUF_LEN))) {
LOG_WARN("set scramble failed", K(ret));
} else {
LOG_INFO("new mysql sessid created", K(conn.sessid_));
LOG_INFO("new mysql sessid created", K(conn.sessid_), K(support_ssl));
}
return ret;
}
......
......@@ -21,6 +21,7 @@
#include "observer/ob_server_struct.h"
#include "observer/ob_rpc_intrusion_detect.h"
#include "storage/ob_locality_manager.h"
#include "lib/ssl/ob_ssl_config.h"
#include <sys/types.h>
#include <sys/stat.h>
#include "storage/ob_locality_manager.h"
......@@ -366,6 +367,16 @@ int ObSrvNetworkFrame::reload_ssl_config()
last_ssl_info_hash_ = new_hash_value;
LOG_INFO("finish reload_ssl_config", K(use_bkmi), K(use_bkmi), K(use_sm),
"ssl_key_expired_time", GCTX.ssl_key_expired_time_, K(new_hash_value));
if (OB_SUCC(ret)) {
if (enable_new_sql_nio()) {
common::ObSSLConfig ssl_config(!use_bkmi, use_sm, ca_cert, public_cert, private_key, NULL, NULL);
if (OB_FAIL(ob_ssl_load_config(OB_SSL_CTX_ID_SQL_NIO, ssl_config))) {
LOG_WARN("create ssl ctx failed!", K(ret));
} else {
LOG_INFO("create ssl ctx success!", K(use_bkmi), K(use_sm));
}
}
}
}
}
}
......
......@@ -1253,7 +1253,7 @@ DEF_TIME(ob_query_switch_leader_retry_timeout, OB_TENANT_PARAMETER, "0ms", "[0ms
DEF_BOOL(default_enable_extended_rowid, OB_TENANT_PARAMETER, "false",
"specifies whether to create table as extended rowid mode or not",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_BOOL(_enable_new_sql_nio, OB_CLUSTER_PARAMETER, "false",
DEF_BOOL(_enable_new_sql_nio, OB_CLUSTER_PARAMETER, "true",
"specifies whether SQL serial network is turned on. Turned on to support mysql_send_long_data"
"The default value is FALSE. Value: TRUE: turned on FALSE: turned off",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::STATIC_EFFECTIVE));
......
......@@ -37,6 +37,7 @@
#include "observer/omt/ob_tenant.h" //ObTenant
#include "rootserver/freeze/ob_major_freeze_helper.h" //ObMajorFreezeHelper
#include "share/ob_primary_standby_service.h" // ObPrimaryStandbyService
#include "rpc/obmysql/ob_sql_sock_session.h"
namespace oceanbase
{
using namespace common;
......@@ -1573,13 +1574,8 @@ int ObChangeTenantExecutor::execute(ObExecContext &ctx, ObChangeTenantStmt &stmt
} else {
// switch connection
if (OB_SUCC(ret)) {
rpc::ObSqlSockDesc& sock_desc = session_info->get_sock_desc();
easy_connection_t* easy_conn = nullptr;
observer::ObSMConnection* conn = nullptr;
if (OB_ISNULL((easy_conn = static_cast<easy_connection_t*>(sock_desc.sock_desc_)))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sock_desc is null", KR(ret), KPC(session_info));
} else if (OB_ISNULL(conn = static_cast<observer::ObSMConnection*>(easy_conn->user_data))) {
if (OB_ISNULL(conn = session_info->get_sm_connection())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("connection is null", KR(ret), KPC(session_info));
} else {
......
......@@ -36,6 +36,7 @@
#include "share/rc/ob_tenant_base.h"
#include "pl/sys_package/ob_dbms_sql.h"
#include "pl/ob_pl_package_state.h"
#include "rpc/obmysql/ob_sql_sock_session.h"
using namespace oceanbase::common;
using namespace oceanbase::share;
......@@ -5816,5 +5817,29 @@ void ObBasicSessionInfo::on_revert_session()
K(sessid_), "backtrace", lbt());
}
observer::ObSMConnection *ObBasicSessionInfo::get_sm_connection()
{
observer::ObSMConnection *conn = nullptr;
rpc::ObSqlSockDesc &sock_desc = thread_data_.sock_desc_;
if (rpc::ObRequest::TRANSPORT_PROTO_EASY == sock_desc.type_) {
easy_connection_t* easy_conn = nullptr;
if (OB_ISNULL((easy_conn = static_cast<easy_connection_t *>(sock_desc.sock_desc_)))) {
LOG_ERROR("easy sock_desc is null");
} else {
conn = static_cast<observer::ObSMConnection*>(easy_conn->user_data);
}
} else if (rpc::ObRequest::TRANSPORT_PROTO_POC == sock_desc.type_) {
obmysql::ObSqlSockSession *sess = nullptr;
if (OB_ISNULL(sess = static_cast<obmysql::ObSqlSockSession *>(sock_desc.sock_desc_))) {
LOG_ERROR("sql nio sock_desc is null");
} else {
conn = &sess->conn_;
}
}
else {
LOG_ERROR("invalid sock_desc type", K(sock_desc.type_));
}
return conn;
}
}//end of namespace sql
}//end of namespace oceanbase
......@@ -47,6 +47,9 @@
namespace oceanbase
{
namespace observer {
class ObSMConnection;
}
using sql::FLTControlInfo;
namespace sql
{
......@@ -683,7 +686,7 @@ public:
const common::ObString &get_user_at_host() const { return thread_data_.user_at_host_name_;}
const common::ObString &get_user_at_client_ip() const { return thread_data_.user_at_client_ip_;}
rpc::ObSqlSockDesc& get_sock_desc() { return thread_data_.sock_desc_;}
observer::ObSMConnection *get_sm_connection();
void set_peer_addr(common::ObAddr peer_addr)
{
LockGuard lock_guard(thread_data_mutex_);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册