提交 4f1f1b93 编写于 作者: R raywill 提交者: LINGuanRen

fix query __all_virtual_processlist bugs

上级 97b3bfab
......@@ -124,7 +124,8 @@ int ObMPQuery::process()
THIS_WORKER.set_session(sess);
ObSQLSessionInfo& session = *sess;
ObSQLSessionInfo::LockGuard lock_guard(session.get_query_lock());
sess->set_use_static_typing_engine(GCONF.enable_static_engine_for_query());
session.set_use_static_typing_engine(GCONF.enable_static_engine_for_query());
session.set_current_trace_id(ObCurTraceId::get_trace_id());
int64_t val = 0;
const bool check_throttle = extract_pure_id(sess->get_user_id()) != OB_SYS_USER_ID;
if (check_throttle && !sess->is_inner() && sess->get_raw_audit_record().try_cnt_ == 0 &&
......
......@@ -1003,6 +1003,7 @@ int ObMPStmtExecute::process()
int64_t sys_version = 0;
ObSQLSessionInfo::LockGuard lock_guard(session.get_query_lock());
session.set_use_static_typing_engine(false);
session.set_current_trace_id(ObCurTraceId::get_trace_id());
session.set_thread_id(GETTID());
const ObMySQLRawPacket& pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
int64_t packet_len = pkt.get_clen();
......
......@@ -116,6 +116,7 @@ int ObMPStmtPrepare::process()
ObSQLSessionInfo& session = *sess;
ObSQLSessionInfo::LockGuard lock_guard(session.get_query_lock());
session.set_use_static_typing_engine(false);
session.set_current_trace_id(ObCurTraceId::get_trace_id());
session.set_proxy_version(get_proxy_version());
int64_t tenant_version = 0;
int64_t sys_version = 0;
......@@ -445,10 +446,6 @@ int ObMPStmtPrepare::do_process(
if (!OB_SUCC(ret) && !async_resp_used && need_response_error && conn_valid_ && !THIS_WORKER.need_retry()) {
LOG_WARN("query failed", K(ret), K(retry_ctrl_.need_retry()), K_(sql));
// 当need_retry=false时,可能给客户端回过包了,可能还没有回过任何包。
// 不过,可以确定:这个请求出错了,还没处理完。如果不是已经交给异步EndTrans收尾,
// 则需要在下面回复一个error_packet作为收尾。否则后面没人帮忙发错误包给客户端了,
// 可能会导致客户端挂起等回包。
bool is_partition_hit = session.get_err_final_partition_hit(ret);
int err = send_error_packet(ret, NULL, is_partition_hit);
if (OB_SUCCESS != err) {
......
......@@ -85,7 +85,7 @@ int ObMPStmtSendLongData::process()
int ret = OB_SUCCESS;
ObSQLSessionInfo *sess = NULL;
bool need_response_error = true;
bool async_resp_used = false; // 由事务提交线程异步回复客户端
bool async_resp_used = false;
int64_t query_timeout = 0;
ObSMConnection *conn = get_conn();
......@@ -112,6 +112,7 @@ int ObMPStmtSendLongData::process()
ObSQLSessionInfo &session = *sess;
ObSQLSessionInfo::LockGuard lock_guard(session.get_query_lock());
session.set_use_static_typing_engine(false);
session.set_current_trace_id(ObCurTraceId::get_trace_id());
int64_t tenant_version = 0;
int64_t sys_version = 0;
const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket &>(req_->get_packet());
......@@ -183,9 +184,7 @@ int ObMPStmtSendLongData::process_send_long_data_stmt(ObSQLSessionInfo &session)
ObThreadLogLevelUtils::clear();
}
//对于tracelog的处理,不影响正常逻辑,错误码无须赋值给ret
int tmp_ret = OB_SUCCESS;
//清空WARNING BUFFER
tmp_ret = do_after_process(session, use_sess_trace, ctx_, false);
UNUSED(tmp_ret);
return ret;
......@@ -218,7 +217,6 @@ int ObMPStmtSendLongData::do_process(ObSQLSessionInfo &session)
} else if (OB_FAIL(store_piece(session))) {
exec_start_timestamp_ = ObTimeUtility::current_time();
} else {
//监控项统计开始
if (enable_perf_event) {
exec_start_timestamp_ = ObTimeUtility::current_time();
}
......@@ -244,7 +242,7 @@ int ObMPStmtSendLongData::do_process(ObSQLSessionInfo &session)
if (OB_SUCC(ret) && is_diagnostics_stmt) {
// if diagnostic stmt execute successfully, it dosen't clear the warning message
} else {
session.set_show_warnings_buf(ret); // TODO: 挪个地方性能会更好,减少部分wb拷贝
session.set_show_warnings_buf(ret);
}
// set read_only
......@@ -253,7 +251,7 @@ int ObMPStmtSendLongData::do_process(ObSQLSessionInfo &session)
} else {
bool is_partition_hit = session.partition_hit().get_bool();
int err = send_error_packet(ret, NULL, is_partition_hit);
if (OB_SUCCESS != err) { // 发送error包
if (OB_SUCCESS != err) {
LOG_WARN("send error packet failed", K(ret), K(err));
}
}
......@@ -266,7 +264,6 @@ int ObMPStmtSendLongData::do_process(ObSQLSessionInfo &session)
audit_record.exec_record_.wait_count_end_ = total_wait_desc.total_waits_;
audit_record.ps_stmt_id_ = stmt_id_;
audit_record.update_stage_stat();
// TODO: 可以这么做么?
// ObSQLUtils::handle_audit_record(false, EXECUTE_PS_EXECUTE,
// session, ctx_);
}
......
......@@ -118,6 +118,7 @@ bool ObShowProcesslist::FillScanner::operator()(sql::ObSQLSessionMgr::Key key, O
uint64_t cell_idx = 0;
char ip_buf[common::OB_IP_STR_BUFF];
char peer_buf[common::OB_IP_STR_BUFF];
char sql_id[common::OB_MAX_SQL_ID_LENGTH + 1];
// If you are in system tenant, you can see all thread.
// Otherwise, you can show only the threads at the same Tenant with you.
// If you have the PROCESS privilege, you can show all threads at your Tenant.
......@@ -180,8 +181,13 @@ bool ObShowProcesslist::FillScanner::operator()(sql::ObSQLSessionMgr::Key key, O
break;
}
case SQL_ID: {
const char* sql_id =
OB_NOT_NULL(sess_info->get_cur_phy_plan()) ? sess_info->get_cur_phy_plan()->stat_.sql_id_ : "";
if (obmysql::OB_MYSQL_COM_QUERY == sess_info->get_mysql_cmd() ||
obmysql::OB_MYSQL_COM_STMT_EXECUTE == sess_info->get_mysql_cmd() ||
obmysql::OB_MYSQL_COM_STMT_PREPARE == sess_info->get_mysql_cmd()) {
sess_info->get_cur_sql_id(sql_id, OB_MAX_SQL_ID_LENGTH + 1);
} else {
sql_id[0] = '\0';
}
cur_row_->cells_[cell_idx].set_varchar(ObString::make_string(sql_id));
cur_row_->cells_[cell_idx].set_collation_type(default_collation);
break;
......@@ -281,7 +287,7 @@ bool ObShowProcesslist::FillScanner::operator()(sql::ObSQLSessionMgr::Key key, O
if (obmysql::OB_MYSQL_COM_QUERY == sess_info->get_mysql_cmd() ||
obmysql::OB_MYSQL_COM_STMT_EXECUTE == sess_info->get_mysql_cmd() ||
obmysql::OB_MYSQL_COM_STMT_PREPARE == sess_info->get_mysql_cmd()) {
int len = sess_info->get_last_trace_id().to_string(trace_id_, sizeof(trace_id_));
int len = sess_info->get_current_trace_id().to_string(trace_id_, sizeof(trace_id_));
cur_row_->cells_[cell_idx].set_varchar(trace_id_, len);
cur_row_->cells_[cell_idx].set_collation_type(default_collation);
} else {
......
......@@ -351,6 +351,7 @@ void ObBasicSessionInfo::reset(bool skip_sys_var)
// magic_num_ = 0x86427531;
current_execution_id_ = -1;
last_trace_id_.reset();
curr_trace_id_.reset();
app_trace_id_.reset();
database_id_ = OB_INVALID_ID;
retry_info_.reset();
......@@ -1640,6 +1641,7 @@ int ObBasicSessionInfo::set_cur_phy_plan(ObPhysicalPlan* cur_phy_plan)
LOG_WARN("current physical plan is NULL", K(lbt()), K(ret));
} else {
cur_phy_plan_ = cur_phy_plan;
MEMCPY(sql_id_, cur_phy_plan->stat_.sql_id_, common::OB_MAX_SQL_ID_LENGTH + 1);
}
return ret;
}
......@@ -1649,6 +1651,15 @@ void ObBasicSessionInfo::reset_cur_phy_plan_to_null()
cur_phy_plan_ = NULL;
}
void ObBasicSessionInfo::get_cur_sql_id(char *sql_id_buf, int64_t sql_id_buf_size) const
{
if (common::OB_MAX_SQL_ID_LENGTH + 1 <= sql_id_buf_size) {
MEMCPY(sql_id_buf, sql_id_, common::OB_MAX_SQL_ID_LENGTH + 1);
} else {
sql_id_buf[0] = '\0';
}
}
ObObjType ObBasicSessionInfo::get_sys_variable_type(const ObString& var_name) const
{
int ret = OB_SUCCESS;
......
......@@ -1032,7 +1032,8 @@ public:
// current executing physical plan
ObPhysicalPlan* get_cur_phy_plan() const;
int set_cur_phy_plan(ObPhysicalPlan* cur_phy_plan);
void get_cur_sql_id(char *sql_id_buf, int64_t sql_id_buf_size) const;
int set_cur_phy_plan(ObPhysicalPlan *cur_phy_plan);
void reset_cur_phy_plan_to_null();
common::ObObjType get_sys_variable_type(const common::ObString& var_name) const;
......@@ -1295,16 +1296,17 @@ public:
uint32_t get_version() const {return version_;}
uint32_t get_magic_num() {return magic_num_;}
int64_t get_current_execution_id() const { return current_execution_id_; }
const common::ObCurTraceId::TraceId &get_last_trace_id() const { return last_trace_id_; }
void set_current_execution_id(int64_t execution_id) { current_execution_id_ = execution_id; }
const common::ObCurTraceId::TraceId &get_last_trace_id() const { return last_trace_id_; }
void set_last_trace_id(common::ObCurTraceId::TraceId *trace_id)
{
if (OB_ISNULL(trace_id)) {
} else {
last_trace_id_ = *trace_id;
}
if (OB_NOT_NULL(trace_id)) { last_trace_id_ = *trace_id; }
}
const common::ObCurTraceId::TraceId &get_current_trace_id() const { return curr_trace_id_; }
void set_current_trace_id(common::ObCurTraceId::TraceId *trace_id)
{
if (OB_NOT_NULL(trace_id)) { curr_trace_id_ = *trace_id; }
}
const ObString& get_app_trace_id() const
{
return app_trace_id_;
......@@ -2145,6 +2147,8 @@ private:
// used for calculating which system variables need serialization,
// set NULL after query is done.
ObPhysicalPlan* cur_phy_plan_;
// sql_id of cur_phy_plan_ sql
char sql_id_[common::OB_MAX_SQL_ID_LENGTH + 1];
//=======================ObProxy && OCJ related============================
obmysql::ObMySQLCapabilityFlags capability_;
......@@ -2166,6 +2170,7 @@ private:
uint32_t magic_num_;
int64_t current_execution_id_;
common::ObCurTraceId::TraceId last_trace_id_;
common::ObCurTraceId::TraceId curr_trace_id_;
common::ObString app_trace_id_;
uint64_t database_id_;
ObQueryRetryInfo retry_info_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册