提交 b66dcf06 编写于 作者: O obdev 提交者: ob-robot

fix dblink connection pool memory leak

上级 f419c12e
......@@ -348,6 +348,7 @@ template<typename T>
int ObLruConnectionAllocator<T>::free_session_conn_array(uint32_t sessid, int64_t &fail_recycled_conn_count, int64_t &succ_recycled_conn_count)
{
int ret = OB_SUCCESS;
_OB_LOG(DEBUG, "free session conn array sessid=%u, ret=%d", sessid, ret);
ObSpinLockGuard guard(ObIConnectionAllocator<T>::lock_);
ObArray<T *> *conn_array = NULL;
fail_recycled_conn_count = 0;
......@@ -358,7 +359,7 @@ int ObLruConnectionAllocator<T>::free_session_conn_array(uint32_t sessid, int64_
T *conn = NULL;
int64_t array_size = conn_array->size();
int64_t succ_count = 0;
_OB_LOG(DEBUG, "try to free conn_array, sessid=%u, count=%ld", sessid, array_size);
_OB_LOG(DEBUG, "try to free conn_array, conn_array=%p, count=%ld", conn_array, array_size);
while (OB_SUCC(ret) && OB_SUCCESS == conn_array->pop_back(conn)) {
conn->close(); //close immedately
if (OB_FAIL(free_conn_array_.push_back(conn))) {
......@@ -369,10 +370,12 @@ int ObLruConnectionAllocator<T>::free_session_conn_array(uint32_t sessid, int64_
++succ_count;
}
}
_OB_LOG(DEBUG, "free session conn array succ_count=%ld, ret=%d", succ_count, ret);
succ_recycled_conn_count = succ_count;
if (OB_FAIL(ret)) {
fail_recycled_conn_count = array_size - succ_count;
while (OB_SUCCESS == conn_array->pop_back(conn)) {
conn->close(); //close immedately
ObLruConnectionAllocator<T>::free(conn);
}
if (OB_SUCCESS != sessionid_to_conns_map_.erase_refactored(sessid)) {
......
......@@ -77,6 +77,7 @@ public:
is_init_remote_env_(false),
dblink_id_(OB_INVALID_ID),
dblink_driver_proto_(-1),
sessid_(-1),
has_reverse_link_credentials_(false),
usable_(true)
{}
......@@ -117,6 +118,8 @@ public:
virtual ObCommonServerConnectionPool *get_common_server_pool() = 0;
void set_dblink_id(uint64_t dblink_id) { dblink_id_ = dblink_id; }
uint64_t get_dblink_id() { return dblink_id_; }
void set_sessid(uint32_t sessid) { sessid_ = sessid; }
uint32_t get_sessid() { return sessid_; }
void set_dblink_driver_proto(int64_t dblink_driver_proto) { dblink_driver_proto_ = dblink_driver_proto; }
int64_t get_dblink_driver_proto() { return dblink_driver_proto_; }
......@@ -136,11 +139,13 @@ public:
bool get_reverse_link_creadentials() { return has_reverse_link_credentials_; }
void set_usable(bool flag) { usable_ = flag; }
bool usable() { return usable_; }
virtual int ping() { return OB_SUCCESS; }
protected:
bool oracle_mode_;
bool is_init_remote_env_; // for dblink, we have to init remote env with some sql
uint64_t dblink_id_; // for dblink, record dblink_id of a connection used by dblink
int64_t dblink_driver_proto_; //for dblink, record DblinkDriverProto of a connection used by dblink
uint32_t sessid_;
bool has_reverse_link_credentials_; // for dblink, mark if this link has credentials set
bool usable_; // usable_ = false: connection is unusable, should not execute query again.
};
......
......@@ -159,7 +159,7 @@ int ObMySQLConnection::connect(const char *user, const char *pass, const char *d
MYSQL *mysql = mysql_real_connect(&mysql_, host, user, pass, db, port, NULL, 0);
if (OB_ISNULL(mysql)) {
ret = -mysql_errno(&mysql_);
LOG_WARN("fail to connect to mysql server", KCSTRING(host), KCSTRING(user), K(port),
LOG_WARN("fail to connect to mysql server", K(get_sessid()), KCSTRING(host), KCSTRING(user), K(port),
"info", mysql_error(&mysql_), K(ret));
} else {
/*Note: mysql_real_connect() incorrectly reset the MYSQL_OPT_RECONNECT option
......@@ -229,7 +229,7 @@ int ObMySQLConnection::connect(const char *user, const char *pass, const char *d
MYSQL *mysql = mysql_real_connect(&mysql_, host, user, pass, db, port, NULL, 0);
if (OB_ISNULL(mysql)) {
ret = -mysql_errno(&mysql_);
LOG_WARN("fail to connect to mysql server", KCSTRING(host), KCSTRING(user), K(port),
LOG_WARN("fail to connect to mysql server", K(get_sessid()), KCSTRING(host), KCSTRING(user), K(port),
"info", mysql_error(&mysql_), K(ret));
} else {
/*Note: mysql_real_connect() incorrectly reset the MYSQL_OPT_RECONNECT option
......@@ -256,6 +256,7 @@ void ObMySQLConnection::close()
if (!closed_) {
mysql_close(&mysql_);
closed_ = true;
sessid_ = 0;
memset(&mysql_, 0, sizeof(MYSQL));
set_init_remote_env(false);
}
......
......@@ -92,7 +92,7 @@ public:
virtual int set_session_variable(const ObString &name, int64_t val) override;
int set_session_variable(const ObString &name, const ObString &val);
int ping();
virtual int ping() override;
int set_trace_id();
void set_timeout(const int64_t timeout);
virtual int set_timeout_variable(const int64_t query_timeout, const int64_t trx_timeout);
......
......@@ -880,6 +880,10 @@ int ObMySQLConnectionPool::try_connect_dblink(ObISQLConnection *dblink_conn, int
DEFAULT_TRANSACTION_TIMEOUT_US))) {
LOG_WARN("fail to set mysql timeout variablse", K(ret));
}
} else if (OB_SUCCESS != dblink_conn1->ping()) {
ret = OB_ERR_UNEXPECTED;
dblink_conn1->close();
LOG_WARN("connection status is invalid", K(sql_request_level), KP(dblink_conn1), K(ret));
}
return ret;
}
......
......@@ -103,7 +103,7 @@ int ObMySQLStatement::execute_update(int64_t &affected_rows)
if (is_need_disconnect_error(ret)) {
conn_->set_usable(false);
}
LOG_WARN("fail to query server","server", stmt_->host, "port", stmt_->port,
LOG_WARN("fail to query server", "sessid", conn_->get_sessid(), "server", stmt_->host, "port", stmt_->port,
"err_msg", mysql_error(stmt_), K(tmp_ret), K(ret), K(sql_str_));
if (OB_NOT_MASTER == tmp_ret) {
// conn -> server pool -> connection pool
......@@ -140,10 +140,10 @@ ObMySQLResult *ObMySQLStatement::execute_query()
}
const int ER_LOCK_WAIT_TIMEOUT = -1205;
if (ER_LOCK_WAIT_TIMEOUT == ret) {
LOG_INFO("fail to query server", "host", stmt_->host, "port", stmt_->port,
LOG_INFO("fail to query server", "sessid", conn_->get_sessid(), "host", stmt_->host, "port", stmt_->port,
"err_msg", mysql_error(stmt_), K(ret), K(sql_str_));
} else {
LOG_WARN("fail to query server", "host", stmt_->host, "port", stmt_->port,
LOG_WARN("fail to query server", "host", stmt_->host, "port", stmt_->port, K(conn_->get_sessid()),
"err_msg", mysql_error(stmt_), K(ret), K(STRLEN(sql_str_)), K(sql_str_));
}
if (OB_SUCCESS == ret) {
......
......@@ -85,6 +85,7 @@ int ObServerConnectionPool::acquire(ObMySQLConnection *&conn, uint32_t sessid)
}
if (OB_SUCC(ret)) {
conn = connection;
conn->set_sessid(sessid);
if (conn->connection_version() != connection_version_) {
conn->set_connection_version(connection_version_);
conn->close();
......
此差异已折叠。
......@@ -753,6 +753,7 @@ DEFINE_ERROR(OB_SQL_OPT_JOIN_ORDER_FAILED, -5121, -1, "HY000", "fail to generat
DEFINE_ERROR(OB_SQL_OPT_ERROR, -5122, -1, "HY000", "optimizer general error");
DEFINE_ORACLE_ERROR(OB_ERR_OCI_INIT_TIMEZONE, -5123, -1, "HY000", "failure to initialize timezone information", 1804, "failure to initialize timezone information");
DEFINE_ERROR(OB_ERR_ZLIB_DATA, -5124, ER_ZLIB_Z_DATA_ERROR, "HY000", "ZLIB: Input data corrupted");
DEFINE_ORACLE_ERROR(OB_ERR_DBLINK_SESSION_KILLED, -5125, -1, "HY000", "your session has been killed", 28, "your session has been killed");
DEFINE_ERROR(OB_SQL_RESOLVER_NO_MEMORY, -5130, -1, "HY000", "sql resolver no memory");
DEFINE_ERROR(OB_SQL_DML_ONLY, -5131, -1, "HY000", "plan cache support dml only");
DEFINE_ERROR(OB_ERR_NO_GRANT, -5133, -1, "42000", "No such grant defined");
......
......@@ -536,6 +536,7 @@ constexpr int OB_SQL_OPT_JOIN_ORDER_FAILED = -5121;
constexpr int OB_SQL_OPT_ERROR = -5122;
constexpr int OB_ERR_OCI_INIT_TIMEZONE = -5123;
constexpr int OB_ERR_ZLIB_DATA = -5124;
constexpr int OB_ERR_DBLINK_SESSION_KILLED = -5125;
constexpr int OB_SQL_RESOLVER_NO_MEMORY = -5130;
constexpr int OB_SQL_DML_ONLY = -5131;
constexpr int OB_ERR_NO_GRANT = -5133;
......@@ -2349,6 +2350,7 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219;
#define OB_SQL_OPT_ERROR__USER_ERROR_MSG "optimizer general error"
#define OB_ERR_OCI_INIT_TIMEZONE__USER_ERROR_MSG "failure to initialize timezone information"
#define OB_ERR_ZLIB_DATA__USER_ERROR_MSG "ZLIB: Input data corrupted"
#define OB_ERR_DBLINK_SESSION_KILLED__USER_ERROR_MSG "your session has been killed"
#define OB_SQL_RESOLVER_NO_MEMORY__USER_ERROR_MSG "sql resolver no memory"
#define OB_SQL_DML_ONLY__USER_ERROR_MSG "plan cache support dml only"
#define OB_ERR_NO_GRANT__USER_ERROR_MSG "No such grant defined"
......@@ -4380,6 +4382,7 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219;
#define OB_SQL_OPT_ERROR__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -5122, optimizer general error"
#define OB_ERR_OCI_INIT_TIMEZONE__ORA_USER_ERROR_MSG "ORA-01804: failure to initialize timezone information"
#define OB_ERR_ZLIB_DATA__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -5124, ZLIB: Input data corrupted"
#define OB_ERR_DBLINK_SESSION_KILLED__ORA_USER_ERROR_MSG "ORA-00028: your session has been killed"
#define OB_SQL_RESOLVER_NO_MEMORY__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -5130, sql resolver no memory"
#define OB_SQL_DML_ONLY__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -5131, plan cache support dml only"
#define OB_ERR_NO_GRANT__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -5133, No such grant defined"
......@@ -5760,7 +5763,7 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219;
#define OB_ERR_DATA_TOO_LONG_MSG_FMT_V2__ORA_USER_ERROR_MSG "ORA-12899: value too large for column %.*s (actual: %ld, maximum: %ld)"
#define OB_ERR_INVALID_DATE_MSG_FMT_V2__ORA_USER_ERROR_MSG "ORA-01861: Incorrect datetime value for column '%.*s' at row %ld"
extern int g_all_ob_errnos[2027];
extern int g_all_ob_errnos[2028];
const char *ob_error_name(const int oberr);
const char* ob_error_cause(const int oberr);
......
......@@ -8502,73 +8502,111 @@ int ObSchemaServiceSQLImpl::fetch_link_table_info(uint64_t tenant_id,
// dblink will convert the result to AL32UTF8 when pulling schema meta
LOG_WARN("failed to set expected charset id", K(ret));
} else {
T tmp_table_schema;
table_schema = NULL;
ObObjMeta type;
int64_t column_count = result->get_column_count();
tmp_table_schema.set_tenant_id(tenant_id);
tmp_table_schema.set_table_id(1); //no use
tmp_table_schema.set_dblink_id(dblink_id);
tmp_table_schema.set_collation_type(CS_TYPE_UTF8MB4_BIN);
tmp_table_schema.set_charset_type(ObCharset::charset_type_by_coll(tmp_table_schema.get_collation_type()));
if (OB_FAIL(tmp_table_schema.set_table_name(table_name))) {
LOG_WARN("set table name failed", K(ret), K(table_name));
} else if (OB_FAIL(tmp_table_schema.set_link_database_name(database_name))) {
LOG_WARN("set database name failed", K(ret), K(database_name));
}
for (int64_t i = 0; OB_SUCC(ret) && i < column_count; ++i) {
ObColumnSchemaV2 column_schema;
int16_t precision = 0;
int16_t scale = 0;
int32_t length = 0;
ObString column_name;
bool old_max_length = false;
if (OB_FAIL(result->get_col_meta(i, old_max_length, column_name, type, precision, scale, length))) {
LOG_WARN("failed to get column meta", K(i), K(old_max_length), K(ret));
} else if (OB_FAIL(column_schema.set_column_name(column_name))) {
LOG_WARN("failed to set column name", K(i), K(column_name), K(ret));
} else {
column_schema.set_table_id(tmp_table_schema.get_table_id());
column_schema.set_tenant_id(tenant_id);
column_schema.set_column_id(i + OB_END_RESERVED_COLUMN_ID_NUM);
column_schema.set_meta_type(type);
column_schema.set_charset_type(ObCharset::charset_type_by_coll(column_schema.get_collation_type()));
column_schema.set_data_precision(precision);
column_schema.set_data_scale(scale);
column_schema.set_data_length(length);
if (OB_SUCC(ret) &&
next_sql_req_level == 1 &&
(ObNCharType == column_schema.get_data_type() || ObNVarchar2Type == column_schema.get_data_type())) {
if (DBLINK_DRV_OB == link_type &&
sql::DblinkGetConnType::TEMP_CONN != conn_type &&
OB_FAIL(fetch_desc_table(dblink_id,
link_type,
database_name,
table_name,
param_ctx,
session_info,
alloctor,
i,
length))) {
LOG_WARN("failed to fetch desc table", K(ret));
} else {
column_schema.set_data_length(length);
const char * desc_sql_str_fmt = "/*$BEFPARSEdblink_req_level=1*/ desc \"%.*s\".\"%.*s\"";
ObSqlString desc_sql;
ObMySQLResult *desc_result = NULL;
bool need_desc = (next_sql_req_level == 1) && DBLINK_DRV_OB == link_type && sql::DblinkGetConnType::TEMP_CONN != conn_type;
int64_t desc_res_row_idx = -1;
SMART_VAR(ObMySQLProxy::MySQLResult, desc_res) {
T tmp_table_schema;
table_schema = NULL;
ObObjMeta type;
int64_t column_count = result->get_column_count();
tmp_table_schema.set_tenant_id(tenant_id);
tmp_table_schema.set_table_id(1); //no use
tmp_table_schema.set_dblink_id(dblink_id);
tmp_table_schema.set_collation_type(CS_TYPE_UTF8MB4_BIN);
tmp_table_schema.set_charset_type(ObCharset::charset_type_by_coll(tmp_table_schema.get_collation_type()));
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(tmp_table_schema.set_table_name(table_name))) {
LOG_WARN("set table name failed", K(ret), K(table_name));
} else if (OB_FAIL(tmp_table_schema.set_link_database_name(database_name))) {
LOG_WARN("set database name failed", K(ret), K(database_name));
}
for (int64_t i = 0; OB_SUCC(ret) && i < column_count; ++i) {
ObColumnSchemaV2 column_schema;
int16_t precision = 0;
int16_t scale = 0;
int32_t length = 0;
ObString column_name;
bool old_max_length = false;
if (OB_FAIL(result->get_col_meta(i, old_max_length, column_name, type, precision, scale, length))) {
LOG_WARN("failed to get column meta", K(i), K(old_max_length), K(ret));
} else if (OB_FAIL(column_schema.set_column_name(column_name))) {
LOG_WARN("failed to set column name", K(i), K(column_name), K(ret));
} else {
column_schema.set_table_id(tmp_table_schema.get_table_id());
column_schema.set_tenant_id(tenant_id);
column_schema.set_column_id(i + OB_END_RESERVED_COLUMN_ID_NUM);
column_schema.set_meta_type(type);
column_schema.set_charset_type(ObCharset::charset_type_by_coll(column_schema.get_collation_type()));
column_schema.set_data_precision(precision);
column_schema.set_data_scale(scale);
if (need_desc && OB_ISNULL(desc_result) &&
(ObNCharType == column_schema.get_data_type() || ObNVarchar2Type == column_schema.get_data_type())) {
if (OB_FAIL(desc_sql.append_fmt(desc_sql_str_fmt, database_name.length(), database_name.ptr(),
table_name.length(), table_name.ptr()))) {
LOG_WARN("append desc sql failed", K(ret));
} else if (OB_FAIL(dblink_proxy_->dblink_read(dblink_conn, desc_res, desc_sql.ptr()))) {
ObDblinkUtils::process_dblink_errno(link_type, dblink_conn, ret);
LOG_WARN("read link failed", K(ret), K(dblink_id), K(desc_sql.ptr()));
} else if (OB_ISNULL(desc_result = desc_res.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to get result", K(ret));
} else if (OB_FAIL(desc_result->set_expected_charset_id(static_cast<uint16_t>(common::ObNlsCharsetId::CHARSET_AL32UTF8_ID),
static_cast<uint16_t>(common::ObNlsCharsetId::CHARSET_AL32UTF8_ID)))) {
// dblink will convert the result to AL32UTF8 when pulling schema meta
LOG_WARN("failed to set expected charset id", K(ret));
}
}
if (OB_SUCC(ret) && OB_NOT_NULL(desc_result) && (ObNCharType == column_schema.get_data_type() || ObNVarchar2Type == column_schema.get_data_type())) {
while (OB_SUCC(ret) && desc_res_row_idx < i) {
if (OB_FAIL(desc_result->next())) {
LOG_WARN("failed to get next row", K(ret));
}
++desc_res_row_idx;
}
if (desc_res_row_idx == i && OB_SUCC(ret)) {
const ObTimeZoneInfo *tz_info = TZ_INFO(session_info);
ObObj value;
ObString string_value;
if (OB_ISNULL(tz_info)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tz info is NULL", K(ret));
} else if (OB_FAIL(desc_result->get_obj(1, value, tz_info, &alloctor))) {
LOG_WARN("failed to get obj", K(ret));
} else if (ObVarcharType != value.get_type()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("type is invalid", K(value.get_type()), K(ret));
} else if (OB_FAIL(value.get_varchar(string_value))) {
LOG_WARN("failed to get varchar value", K(ret));
} else if (OB_FAIL(ObDblinkService::get_length_from_type_text(string_value, length))) {
LOG_WARN("failed to get length", K(ret));
} else {
LOG_DEBUG("desc table type string", K(string_value), K(length));
}
}
}
column_schema.set_data_length(length);
}
LOG_DEBUG("dblink column schema", K(i), K(column_schema.get_data_precision()),
K(column_schema.get_data_scale()),
K(column_schema.get_data_length()),
K(column_schema.get_data_type()));
if (OB_SUCC(ret) && OB_FAIL(tmp_table_schema.add_column(column_schema))) {
LOG_WARN("fail to add link column schema. ", K(i), K(column_schema), K(ret));
}
}
LOG_DEBUG("dblink column schema", K(i), K(column_schema.get_data_precision()),
K(column_schema.get_data_scale()),
K(column_schema.get_data_length()),
K(column_schema.get_data_type()));
if (OB_FAIL(ret)) {
} else if (OB_FAIL(tmp_table_schema.add_column(column_schema))) {
LOG_WARN("fail to add link column schema. ", K(i), K(column_schema), K(ret));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(ObSchemaUtils::alloc_schema(alloctor, tmp_table_schema, table_schema))) {
if (OB_SUCC(ret) && OB_FAIL(ObSchemaUtils::alloc_schema(alloctor, tmp_table_schema, table_schema))) {
LOG_WARN("failed to alloc table_schema", K(ret));
}
int tmp_ret = OB_SUCCESS;
if (OB_NOT_NULL(desc_result) && OB_SUCCESS != (tmp_ret = desc_result->close())) {
LOG_WARN("failed to close desc result", K(tmp_ret));
} else {
desc_result = NULL;
}
}
}
if (OB_FAIL(ret)) {
......
......@@ -84,9 +84,8 @@ int ObDblinkService::get_length_from_type_text(ObString &type_text, int32_t &len
return ret;
}
ObReverseLink::ObReverseLink(common::ObIAllocator &alloc)
: allocator_(alloc),
user_(),
ObReverseLink::ObReverseLink()
: user_(),
tenant_(),
cluster_(),
passwd_(),
......@@ -100,6 +99,7 @@ ObReverseLink::ObReverseLink(common::ObIAllocator &alloc)
ObReverseLink::~ObReverseLink()
{
allocator_.reset();
}
OB_DEF_SERIALIZE(ObReverseLink)
......@@ -381,7 +381,7 @@ int ObDblinkCtxInSession::register_dblink_conn_pool(common::sqlclient::ObCommonS
return ret;
}
// When the session is about to be destroyed, the session will release all connections
// When the session is about to be reset, the session will release all connections
// from ObServerConnectionPool through this interface
int ObDblinkCtxInSession::free_dblink_conn_pool()
{
......@@ -400,9 +400,7 @@ int ObDblinkCtxInSession::free_dblink_conn_pool()
LOG_TRACE("free and close dblink connection in session", KP(this), K(session_info_->get_sessid()), K(i), K(dblink_conn_pool_array_.count()), K(dblink_conn_pool_array_), KP(dblink_conn_pool), K(lbt()));
}
}
if (OB_SUCC(ret)) {
dblink_conn_pool_array_.reset();
}
dblink_conn_pool_array_.reset();
return ret;
}
......@@ -425,6 +423,11 @@ int ObDblinkCtxInSession::get_dblink_conn(uint64_t dblink_id, common::sqlclient:
break;
}
}
if (OB_SUCC(ret) && OB_NOT_NULL(dblink_conn) &&
(OB_SUCCESS != dblink_conn->ping())) {
ret = OB_ERR_DBLINK_SESSION_KILLED;
LOG_WARN("connection is invalid", K(ret), K(dblink_conn->usable()), KP(dblink_conn));
}
return ret;
}
......@@ -473,9 +476,12 @@ int ObDblinkCtxInSession::clean_dblink_conn(const bool force_disconnect)
}
}
}
if (OB_SUCC(ret)) {
dblink_conn_holder_array_.reset();
}
dblink_conn_holder_array_.reset();
arena_alloc_.reset();
reverse_dblink_ = NULL;
reverse_dblink_buf_ = NULL;
sys_var_reverse_info_buf_ = NULL;
sys_var_reverse_info_buf_size_ = 0;
return ret;
}
......@@ -491,19 +497,23 @@ int ObDblinkCtxInSession::get_reverse_link(ObReverseLink *&reverse_dblink)
LOG_WARN("failed to get SYS_VAR_SET_REVERSE_DBLINK_INFOS", K(value), K(ret));
} else if (NULL == reverse_dblink_ || 0 != last_reverse_info_values_.compare(value)) {
if (!value.empty()){ // get a new valid REVERSE_DBLINK_INFOS, need create or update ObReverseLink
void *ptr = NULL;
void *last_new_value_ptr = NULL;
int64_t last_new_value_length = value.length();
if (OB_ISNULL(ptr = arena_alloc_.alloc(sizeof(ObReverseLink)))) {
int64_t sys_var_length = value.length();
if (OB_ISNULL(reverse_dblink_buf_) &&
OB_ISNULL(reverse_dblink_buf_ = arena_alloc_.alloc(sizeof(ObReverseLink)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc memory", K(ret), K(sizeof(ObReverseLink)));
} else if (OB_ISNULL(last_new_value_ptr = arena_alloc_.alloc(last_new_value_length))) {
} else if (sys_var_length > sys_var_reverse_info_buf_size_ &&
OB_ISNULL(sys_var_reverse_info_buf_ = arena_alloc_.alloc(2 * sys_var_length))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc memory", K(ret), K(last_new_value_length));
LOG_WARN("failed to alloc memory", K(ret), K(2 * sys_var_length));
} else {
MEMCPY(last_new_value_ptr, value.ptr(), last_new_value_length);
last_reverse_info_values_.assign((char *)last_new_value_ptr, last_new_value_length);
reverse_dblink_ = new(ptr) ObReverseLink(arena_alloc_);
sys_var_reverse_info_buf_size_ = 2 * sys_var_length;
MEMCPY(sys_var_reverse_info_buf_, value.ptr(), sys_var_length);
last_reverse_info_values_.assign((char *)sys_var_reverse_info_buf_, sys_var_length);
if (OB_NOT_NULL(reverse_dblink_)) {
reverse_dblink_->~ObReverseLink();
}
reverse_dblink_ = new(reverse_dblink_buf_) ObReverseLink();
char *new_buff = NULL;
int64_t new_size = 0;
int64_t pos = 0;
......
......@@ -44,8 +44,8 @@ class ObReverseLink
{
OB_UNIS_VERSION_V(1);
public:
explicit ObReverseLink(common::ObIAllocator &alloc);
~ObReverseLink();
explicit ObReverseLink();
virtual ~ObReverseLink();
inline void set_user(ObString name) { user_ = name; }
inline void set_tenant(ObString name) { tenant_ = name; }
inline void set_cluster(ObString name) { cluster_ = name; }
......@@ -81,7 +81,7 @@ public:
static const ObString SESSION_VARIABLE_STRING;
static const int64_t LONG_QUERY_TIMEOUT;
private:
common::ObIAllocator &allocator_;
common::ObArenaAllocator allocator_;
ObString user_;
ObString tenant_;
ObString cluster_;
......@@ -112,7 +112,10 @@ public:
explicit ObDblinkCtxInSession(ObSQLSessionInfo *session_info)
:
session_info_(session_info),
reverse_dblink_(NULL)
reverse_dblink_(NULL),
reverse_dblink_buf_(NULL),
sys_var_reverse_info_buf_(NULL),
sys_var_reverse_info_buf_size_(0)
{}
~ObDblinkCtxInSession()
{
......@@ -124,6 +127,7 @@ public:
const bool force_disconnect = true;
clean_dblink_conn(force_disconnect);
free_dblink_conn_pool();
// session_info_ = NULL; // do not need reset session_info_
reverse_dblink_ = NULL;
}
int register_dblink_conn_pool(common::sqlclient::ObCommonServerConnectionPool *dblink_conn_pool);
......@@ -136,6 +140,9 @@ public:
private:
ObSQLSessionInfo *session_info_;
ObReverseLink *reverse_dblink_;
void * reverse_dblink_buf_;
void * sys_var_reverse_info_buf_;
int64_t sys_var_reverse_info_buf_size_;
common::ObArenaAllocator arena_alloc_;
ObArray<common::sqlclient::ObCommonServerConnectionPool *> dblink_conn_pool_array_; //for dblink read to free connection when session drop.
ObArray<int64_t> dblink_conn_holder_array_; //for dblink write to hold connection during trasaction.
......
......@@ -68,7 +68,7 @@ int ObLinkDmlOp::send_reverse_link_info(transaction::ObTransID &tx_id)
LOG_WARN("reverse link has invalid credentials", K(ret), K(user_name), K(tenant_name), K(passwd.empty()), K(addr));
LOG_USER_ERROR(OB_ERR_UNEXPECTED, "check if the database link was created with local credentials");
} else {
ObReverseLink reverse_link_info(MY_SPEC.allocator_);
ObReverseLink reverse_link_info;
reverse_link_info.set_user(user_name);
reverse_link_info.set_tenant(tenant_name);
reverse_link_info.set_cluster(cluster_name);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册