提交 93fd1a9e 编写于 作者: X xy0 提交者: LINGuanRen

[asan] 3.1 open observer无法启动

上级 ac5edad6
...@@ -28,15 +28,15 @@ using namespace obmysql; ...@@ -28,15 +28,15 @@ using namespace obmysql;
using namespace share; using namespace share;
namespace observer { namespace observer {
ObSyncCmdDriver::ObSyncCmdDriver(const ObGlobalContext& gctx, const ObSqlCtx& ctx, sql::ObSQLSessionInfo& session, ObSyncCmdDriver::ObSyncCmdDriver(const ObGlobalContext &gctx, const ObSqlCtx &ctx, sql::ObSQLSessionInfo &session,
ObQueryRetryCtrl& retry_ctrl, ObIMPPacketSender& sender) ObQueryRetryCtrl &retry_ctrl, ObIMPPacketSender &sender)
: ObQueryDriver(gctx, ctx, session, retry_ctrl, sender) : ObQueryDriver(gctx, ctx, session, retry_ctrl, sender)
{} {}
ObSyncCmdDriver::~ObSyncCmdDriver() ObSyncCmdDriver::~ObSyncCmdDriver()
{} {}
int ObSyncCmdDriver::response_result(ObMySQLResultSet& result) int ObSyncCmdDriver::response_result(ObMySQLResultSet &result)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
bool process_ok = false; bool process_ok = false;
...@@ -77,7 +77,7 @@ int ObSyncCmdDriver::response_result(ObMySQLResultSet& result) ...@@ -77,7 +77,7 @@ int ObSyncCmdDriver::response_result(ObMySQLResultSet& result)
} }
} else { } else {
OMPKEOF eofp; OMPKEOF eofp;
const ObWarningBuffer* warnings_buf = common::ob_get_tsi_warning_buffer(); const ObWarningBuffer *warnings_buf = common::ob_get_tsi_warning_buffer();
uint16_t warning_count = 0; uint16_t warning_count = 0;
if (OB_ISNULL(warnings_buf)) { if (OB_ISNULL(warnings_buf)) {
LOG_WARN("can not get thread warnings buffer"); LOG_WARN("can not get thread warnings buffer");
...@@ -119,10 +119,10 @@ int ObSyncCmdDriver::response_result(ObMySQLResultSet& result) ...@@ -119,10 +119,10 @@ int ObSyncCmdDriver::response_result(ObMySQLResultSet& result)
} else if (!result.is_with_rows() || (sender_.need_send_extra_ok_packet() && !result.has_more_result())) { } else if (!result.is_with_rows() || (sender_.need_send_extra_ok_packet() && !result.has_more_result())) {
process_ok = true; process_ok = true;
ObOKPParam ok_param; ObOKPParam ok_param;
ok_param.message_ = const_cast<char*>(result.get_message()); ok_param.message_ = const_cast<char *>(result.get_message());
ok_param.affected_rows_ = result.get_affected_rows(); ok_param.affected_rows_ = result.get_affected_rows();
ok_param.lii_ = result.get_last_insert_id_to_client(); ok_param.lii_ = result.get_last_insert_id_to_client();
const ObWarningBuffer* warnings_buf = common::ob_get_tsi_warning_buffer(); const ObWarningBuffer *warnings_buf = common::ob_get_tsi_warning_buffer();
if (OB_ISNULL(warnings_buf)) { if (OB_ISNULL(warnings_buf)) {
LOG_WARN("can not get thread warnings buffer"); LOG_WARN("can not get thread warnings buffer");
} else { } else {
...@@ -152,7 +152,7 @@ int ObSyncCmdDriver::response_result(ObMySQLResultSet& result) ...@@ -152,7 +152,7 @@ int ObSyncCmdDriver::response_result(ObMySQLResultSet& result)
// two aspects: // two aspects:
// - set session last_schema_version to proxy for part DDL // - set session last_schema_version to proxy for part DDL
// - promote local schema up to target version if last_schema_version is set // - promote local schema up to target version if last_schema_version is set
int ObSyncCmdDriver::process_schema_version_changes(const ObMySQLResultSet& result) int ObSyncCmdDriver::process_schema_version_changes(const ObMySQLResultSet &result)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
...@@ -171,11 +171,11 @@ int ObSyncCmdDriver::process_schema_version_changes(const ObMySQLResultSet& resu ...@@ -171,11 +171,11 @@ int ObSyncCmdDriver::process_schema_version_changes(const ObMySQLResultSet& resu
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
// - promote local schema up to target version if last_schema_version is set // - promote local schema up to target version if last_schema_version is set
if (result.get_stmt_type() == stmt::T_VARIABLE_SET) { if (result.get_stmt_type() == stmt::T_VARIABLE_SET) {
const ObVariableSetStmt* set_stmt = static_cast<const ObVariableSetStmt*>(result.get_cmd()); const ObVariableSetStmt *set_stmt = static_cast<const ObVariableSetStmt *>(result.get_cmd());
if (NULL != set_stmt) { if (NULL != set_stmt) {
ObVariableSetStmt::VariableNamesSetNode tmp_node; // just for init node ObVariableSetStmt::VariableNamesSetNode tmp_node; // just for init node
for (int64_t i = 0; OB_SUCC(ret) && i < set_stmt->get_variables_size(); ++i) { for (int64_t i = 0; OB_SUCC(ret) && i < set_stmt->get_variables_size(); ++i) {
ObVariableSetStmt::VariableNamesSetNode& var_node = tmp_node; ObVariableSetStmt::VariableNamesSetNode &var_node = tmp_node;
ObString set_var_name(OB_SV_LAST_SCHEMA_VERSION); ObString set_var_name(OB_SV_LAST_SCHEMA_VERSION);
if (OB_FAIL(set_stmt->get_variable_node(i, var_node))) { if (OB_FAIL(set_stmt->get_variable_node(i, var_node))) {
LOG_WARN("fail to get_variable_node", K(i), K(ret)); LOG_WARN("fail to get_variable_node", K(i), K(ret));
...@@ -218,12 +218,12 @@ int ObSyncCmdDriver::check_and_refresh_schema(uint64_t tenant_id) ...@@ -218,12 +218,12 @@ int ObSyncCmdDriver::check_and_refresh_schema(uint64_t tenant_id)
return ret; return ret;
} }
int ObSyncCmdDriver::response_query_result(ObMySQLResultSet& result) int ObSyncCmdDriver::response_query_result(ObMySQLResultSet &result)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
session_.get_trans_desc().consistency_wait(); session_.get_trans_desc().consistency_wait();
const common::ObNewRow* row = NULL; const common::ObNewRow *row = NULL;
if (OB_FAIL(result.next_row(row))) { if (OB_FAIL(result.next_row(row))) {
LOG_WARN("fail to get next row", K(ret)); LOG_WARN("fail to get next row", K(ret));
} else if (OB_FAIL(response_query_header(result, result.has_more_result(), true))) { } else if (OB_FAIL(response_query_header(result, result.has_more_result(), true))) {
...@@ -232,9 +232,9 @@ int ObSyncCmdDriver::response_query_result(ObMySQLResultSet& result) ...@@ -232,9 +232,9 @@ int ObSyncCmdDriver::response_query_result(ObMySQLResultSet& result)
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("session info is null", K(ret)); LOG_WARN("session info is null", K(ret));
} else { } else {
ObNewRow* tmp_row = const_cast<ObNewRow*>(row); ObNewRow *tmp_row = const_cast<ObNewRow *>(row);
for (int64_t i = 0; OB_SUCC(ret) && i < tmp_row->get_count(); i++) { for (int64_t i = 0; OB_SUCC(ret) && i < tmp_row->get_count(); i++) {
ObObj& value = tmp_row->get_cell(i); ObObj &value = tmp_row->get_cell(i);
if (ob_is_string_type(value.get_type()) && CS_TYPE_INVALID != value.get_collation_type()) { if (ob_is_string_type(value.get_type()) && CS_TYPE_INVALID != value.get_collation_type()) {
OZ(convert_string_value_charset(value, result)); OZ(convert_string_value_charset(value, result));
} else if (value.is_clob_locator() && OB_FAIL(convert_lob_value_charset(value, result))) { } else if (value.is_clob_locator() && OB_FAIL(convert_lob_value_charset(value, result))) {
...@@ -247,14 +247,15 @@ int ObSyncCmdDriver::response_query_result(ObMySQLResultSet& result) ...@@ -247,14 +247,15 @@ int ObSyncCmdDriver::response_query_result(ObMySQLResultSet& result)
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
MYSQL_PROTOCOL_TYPE protocol_type = result.is_ps_protocol() ? BINARY : TEXT; MYSQL_PROTOCOL_TYPE protocol_type = result.is_ps_protocol() ? BINARY : TEXT;
const ObSQLSessionInfo* tmp_session = result.get_exec_context().get_my_session(); const ObSQLSessionInfo *tmp_session = result.get_exec_context().get_my_session();
const ObDataTypeCastParams dtc_params = ObBasicSessionInfo::create_dtc_params(tmp_session); const ObDataTypeCastParams dtc_params = ObBasicSessionInfo::create_dtc_params(tmp_session);
OMPKRow rp(ObSMRow(protocol_type, ObSMRow sm_row(protocol_type,
*row, *row,
dtc_params, dtc_params,
result.get_field_columns(), result.get_field_columns(),
ctx_.schema_guard_, ctx_.schema_guard_,
tmp_session->get_effective_tenant_id())); tmp_session->get_effective_tenant_id());
OMPKRow rp(sm_row);
if (OB_FAIL(sender_.response_packet(rp))) { if (OB_FAIL(sender_.response_packet(rp))) {
LOG_WARN("response packet fail", K(ret), KP(row)); LOG_WARN("response packet fail", K(ret), KP(row));
} }
......
...@@ -30,22 +30,22 @@ using namespace sql; ...@@ -30,22 +30,22 @@ using namespace sql;
using namespace obmysql; using namespace obmysql;
namespace observer { namespace observer {
ObSyncPlanDriver::ObSyncPlanDriver(const ObGlobalContext& gctx, const ObSqlCtx& ctx, sql::ObSQLSessionInfo& session, ObSyncPlanDriver::ObSyncPlanDriver(const ObGlobalContext &gctx, const ObSqlCtx &ctx, sql::ObSQLSessionInfo &session,
ObQueryRetryCtrl& retry_ctrl, ObIMPPacketSender& sender) ObQueryRetryCtrl &retry_ctrl, ObIMPPacketSender &sender)
: ObQueryDriver(gctx, ctx, session, retry_ctrl, sender) : ObQueryDriver(gctx, ctx, session, retry_ctrl, sender)
{} {}
ObSyncPlanDriver::~ObSyncPlanDriver() ObSyncPlanDriver::~ObSyncPlanDriver()
{} {}
int ObSyncPlanDriver::response_result(ObMySQLResultSet& result) int ObSyncPlanDriver::response_result(ObMySQLResultSet &result)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
bool process_ok = false; bool process_ok = false;
// for select SQL // for select SQL
bool ac = true; bool ac = true;
bool admission_fail_and_need_retry = false; bool admission_fail_and_need_retry = false;
const ObNewRow* not_used_row = NULL; const ObNewRow *not_used_row = NULL;
if (OB_ISNULL(result.get_physical_plan())) { if (OB_ISNULL(result.get_physical_plan())) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
LOG_WARN("should have set plan to result set", K(ret)); LOG_WARN("should have set plan to result set", K(ret));
...@@ -98,7 +98,7 @@ int ObSyncPlanDriver::response_result(ObMySQLResultSet& result) ...@@ -98,7 +98,7 @@ int ObSyncPlanDriver::response_result(ObMySQLResultSet& result)
process_ok = true; process_ok = true;
OMPKEOF eofp; OMPKEOF eofp;
const ObWarningBuffer* warnings_buf = common::ob_get_tsi_warning_buffer(); const ObWarningBuffer *warnings_buf = common::ob_get_tsi_warning_buffer();
uint16_t warning_count = 0; uint16_t warning_count = 0;
if (OB_ISNULL(warnings_buf)) { if (OB_ISNULL(warnings_buf)) {
LOG_WARN("can not get thread warnings buffer"); LOG_WARN("can not get thread warnings buffer");
...@@ -149,10 +149,10 @@ int ObSyncPlanDriver::response_result(ObMySQLResultSet& result) ...@@ -149,10 +149,10 @@ int ObSyncPlanDriver::response_result(ObMySQLResultSet& result)
if (!result.has_implicit_cursor()) { if (!result.has_implicit_cursor()) {
// no implicit cursor, send one ok packet to client // no implicit cursor, send one ok packet to client
ObOKPParam ok_param; ObOKPParam ok_param;
ok_param.message_ = const_cast<char*>(result.get_message()); ok_param.message_ = const_cast<char *>(result.get_message());
ok_param.affected_rows_ = result.get_affected_rows(); ok_param.affected_rows_ = result.get_affected_rows();
ok_param.lii_ = result.get_last_insert_id_to_client(); ok_param.lii_ = result.get_last_insert_id_to_client();
const ObWarningBuffer* warnings_buf = common::ob_get_tsi_warning_buffer(); const ObWarningBuffer *warnings_buf = common::ob_get_tsi_warning_buffer();
if (OB_ISNULL(warnings_buf)) { if (OB_ISNULL(warnings_buf)) {
LOG_WARN("can not get thread warnings buffer"); LOG_WARN("can not get thread warnings buffer");
} else { } else {
...@@ -169,7 +169,7 @@ int ObSyncPlanDriver::response_result(ObMySQLResultSet& result) ...@@ -169,7 +169,7 @@ int ObSyncPlanDriver::response_result(ObMySQLResultSet& result)
result.reset_implicit_cursor_idx(); result.reset_implicit_cursor_idx();
while (OB_SUCC(ret) && OB_SUCC(result.switch_implicit_cursor())) { while (OB_SUCC(ret) && OB_SUCC(result.switch_implicit_cursor())) {
ObOKPParam ok_param; ObOKPParam ok_param;
ok_param.message_ = const_cast<char*>(result.get_message()); ok_param.message_ = const_cast<char *>(result.get_message());
ok_param.affected_rows_ = result.get_affected_rows(); ok_param.affected_rows_ = result.get_affected_rows();
ok_param.is_partition_hit_ = session_.partition_hit().get_bool(); ok_param.is_partition_hit_ = session_.partition_hit().get_bool();
ok_param.has_more_result_ = !result.is_cursor_end(); ok_param.has_more_result_ = !result.is_cursor_end();
...@@ -201,12 +201,12 @@ int ObSyncPlanDriver::response_result(ObMySQLResultSet& result) ...@@ -201,12 +201,12 @@ int ObSyncPlanDriver::response_result(ObMySQLResultSet& result)
} }
int ObSyncPlanDriver::response_query_result( int ObSyncPlanDriver::response_query_result(
ObResultSet& result, bool has_more_result, bool& can_retry, int64_t fetch_limit) ObResultSet &result, bool has_more_result, bool &can_retry, int64_t fetch_limit)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
can_retry = true; can_retry = true;
bool is_first_row = true; bool is_first_row = true;
const ObNewRow* result_row = NULL; const ObNewRow *result_row = NULL;
bool has_top_limit = result.get_has_top_limit(); bool has_top_limit = result.get_has_top_limit();
bool is_cac_found_rows = result.is_calc_found_rows(); bool is_cac_found_rows = result.is_calc_found_rows();
int64_t limit_count = OB_INVALID_COUNT == fetch_limit ? INT64_MAX : fetch_limit; int64_t limit_count = OB_INVALID_COUNT == fetch_limit ? INT64_MAX : fetch_limit;
...@@ -219,7 +219,7 @@ int ObSyncPlanDriver::response_query_result( ...@@ -219,7 +219,7 @@ int ObSyncPlanDriver::response_query_result(
} }
session_.get_trans_desc().consistency_wait(); session_.get_trans_desc().consistency_wait();
MYSQL_PROTOCOL_TYPE protocol_type = result.is_ps_protocol() ? BINARY : TEXT; MYSQL_PROTOCOL_TYPE protocol_type = result.is_ps_protocol() ? BINARY : TEXT;
const common::ColumnsFieldIArray* fields = NULL; const common::ColumnsFieldIArray *fields = NULL;
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
fields = result.get_field_columns(); fields = result.get_field_columns();
if (OB_ISNULL(fields)) { if (OB_ISNULL(fields)) {
...@@ -228,7 +228,7 @@ int ObSyncPlanDriver::response_query_result( ...@@ -228,7 +228,7 @@ int ObSyncPlanDriver::response_query_result(
} }
} }
while (OB_SUCC(ret) && row_num < limit_count && !OB_FAIL(result.get_next_row(result_row))) { while (OB_SUCC(ret) && row_num < limit_count && !OB_FAIL(result.get_next_row(result_row))) {
ObNewRow* row = const_cast<ObNewRow*>(result_row); ObNewRow *row = const_cast<ObNewRow *>(result_row);
// If it is the first line, first reply to the client with field and other information // If it is the first line, first reply to the client with field and other information
if (is_first_row) { if (is_first_row) {
is_first_row = false; is_first_row = false;
...@@ -238,7 +238,7 @@ int ObSyncPlanDriver::response_query_result( ...@@ -238,7 +238,7 @@ int ObSyncPlanDriver::response_query_result(
} }
} }
for (int64_t i = 0; OB_SUCC(ret) && i < row->get_count(); i++) { for (int64_t i = 0; OB_SUCC(ret) && i < row->get_count(); i++) {
ObObj& value = row->get_cell(i); ObObj &value = row->get_cell(i);
if (result.is_ps_protocol()) { if (result.is_ps_protocol()) {
if (value.get_type() != fields->at(i).type_.get_type()) { if (value.get_type() != fields->at(i).type_.get_type()) {
ObCastCtx cast_ctx(&result.get_mem_pool(), NULL, CM_WARN_ON_FAIL, CS_TYPE_INVALID); ObCastCtx cast_ctx(&result.get_mem_pool(), NULL, CM_WARN_ON_FAIL, CS_TYPE_INVALID);
...@@ -259,12 +259,13 @@ int ObSyncPlanDriver::response_query_result( ...@@ -259,12 +259,13 @@ int ObSyncPlanDriver::response_query_result(
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
const ObDataTypeCastParams dtc_params = ObBasicSessionInfo::create_dtc_params(&session_); const ObDataTypeCastParams dtc_params = ObBasicSessionInfo::create_dtc_params(&session_);
OMPKRow rp(ObSMRow(protocol_type, ObSMRow sm_row(protocol_type,
*row, *row,
dtc_params, dtc_params,
result.get_field_columns(), result.get_field_columns(),
ctx_.schema_guard_, ctx_.schema_guard_,
session_.get_effective_tenant_id())); session_.get_effective_tenant_id());
OMPKRow rp(sm_row);
if (OB_FAIL(sender_.response_packet(rp))) { if (OB_FAIL(sender_.response_packet(rp))) {
LOG_WARN("response packet fail", K(ret), KP(row), K(row_num), K(can_retry)); LOG_WARN("response packet fail", K(ret), KP(row), K(row_num), K(can_retry));
// break; // break;
...@@ -297,12 +298,12 @@ int ObSyncPlanDriver::response_query_result( ...@@ -297,12 +298,12 @@ int ObSyncPlanDriver::response_query_result(
return ret; return ret;
} }
ObRemotePlanDriver::ObRemotePlanDriver(const ObGlobalContext& gctx, const ObSqlCtx& ctx, sql::ObSQLSessionInfo& session, ObRemotePlanDriver::ObRemotePlanDriver(const ObGlobalContext &gctx, const ObSqlCtx &ctx, sql::ObSQLSessionInfo &session,
ObQueryRetryCtrl& retry_ctrl, ObIMPPacketSender& sender) ObQueryRetryCtrl &retry_ctrl, ObIMPPacketSender &sender)
: ObSyncPlanDriver(gctx, ctx, session, retry_ctrl, sender) : ObSyncPlanDriver(gctx, ctx, session, retry_ctrl, sender)
{} {}
int ObRemotePlanDriver::response_result(ObMySQLResultSet& result) int ObRemotePlanDriver::response_result(ObMySQLResultSet &result)
{ {
int ret = result.get_errcode(); int ret = result.get_errcode();
bool process_ok = false; bool process_ok = false;
...@@ -347,7 +348,7 @@ int ObRemotePlanDriver::response_result(ObMySQLResultSet& result) ...@@ -347,7 +348,7 @@ int ObRemotePlanDriver::response_result(ObMySQLResultSet& result)
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
process_ok = true; process_ok = true;
OMPKEOF eofp; OMPKEOF eofp;
const ObWarningBuffer* warnings_buf = common::ob_get_tsi_warning_buffer(); const ObWarningBuffer *warnings_buf = common::ob_get_tsi_warning_buffer();
uint16_t warning_count = 0; uint16_t warning_count = 0;
if (OB_ISNULL(warnings_buf)) { if (OB_ISNULL(warnings_buf)) {
LOG_WARN("can not get thread warnings buffer"); LOG_WARN("can not get thread warnings buffer");
...@@ -392,10 +393,10 @@ int ObRemotePlanDriver::response_result(ObMySQLResultSet& result) ...@@ -392,10 +393,10 @@ int ObRemotePlanDriver::response_result(ObMySQLResultSet& result)
} else if (!result.has_implicit_cursor()) { } else if (!result.has_implicit_cursor()) {
// no implicit cursor, send one ok packet to client // no implicit cursor, send one ok packet to client
ObOKPParam ok_param; ObOKPParam ok_param;
ok_param.message_ = const_cast<char*>(result.get_message()); ok_param.message_ = const_cast<char *>(result.get_message());
ok_param.affected_rows_ = result.get_affected_rows(); ok_param.affected_rows_ = result.get_affected_rows();
ok_param.lii_ = result.get_last_insert_id_to_client(); ok_param.lii_ = result.get_last_insert_id_to_client();
const ObWarningBuffer* warnings_buf = common::ob_get_tsi_warning_buffer(); const ObWarningBuffer *warnings_buf = common::ob_get_tsi_warning_buffer();
if (OB_ISNULL(warnings_buf)) { if (OB_ISNULL(warnings_buf)) {
LOG_WARN("can not get thread warnings buffer"); LOG_WARN("can not get thread warnings buffer");
} else { } else {
...@@ -412,7 +413,7 @@ int ObRemotePlanDriver::response_result(ObMySQLResultSet& result) ...@@ -412,7 +413,7 @@ int ObRemotePlanDriver::response_result(ObMySQLResultSet& result)
result.reset_implicit_cursor_idx(); result.reset_implicit_cursor_idx();
while (OB_SUCC(ret) && OB_SUCC(result.switch_implicit_cursor())) { while (OB_SUCC(ret) && OB_SUCC(result.switch_implicit_cursor())) {
ObOKPParam ok_param; ObOKPParam ok_param;
ok_param.message_ = const_cast<char*>(result.get_message()); ok_param.message_ = const_cast<char *>(result.get_message());
ok_param.affected_rows_ = result.get_affected_rows(); ok_param.affected_rows_ = result.get_affected_rows();
ok_param.is_partition_hit_ = session_.partition_hit().get_bool(); ok_param.is_partition_hit_ = session_.partition_hit().get_bool();
ok_param.has_more_result_ = !result.is_cursor_end(); ok_param.has_more_result_ = !result.is_cursor_end();
......
...@@ -23,7 +23,7 @@ namespace oceanbase { ...@@ -23,7 +23,7 @@ namespace oceanbase {
using namespace common; using namespace common;
namespace sql { namespace sql {
int ObExecuteExecutor::execute(ObExecContext& ctx, ObExecuteStmt& stmt) int ObExecuteExecutor::execute(ObExecContext &ctx, ObExecuteStmt &stmt)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_ISNULL(ctx.get_sql_ctx()) || OB_ISNULL(ctx.get_my_session())) { if (OB_ISNULL(ctx.get_sql_ctx()) || OB_ISNULL(ctx.get_my_session())) {
...@@ -51,81 +51,83 @@ int ObExecuteExecutor::execute(ObExecContext& ctx, ObExecuteStmt& stmt) ...@@ -51,81 +51,83 @@ int ObExecuteExecutor::execute(ObExecContext& ctx, ObExecuteStmt& stmt)
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
SMART_VAR(ObResultSet, result_set, *ctx.get_my_session()) observer::ObVirtualTableIteratorFactory vt_iter_factory(*GCTX.vt_iter_creator_);
SMART_VAR(ObSqlCtx, sql_ctx)
{ {
result_set.init_partition_location_cache( SMART_VAR(ObResultSet, result_set, *ctx.get_my_session())
GCTX.location_cache_, GCTX.self_addr_, ctx.get_sql_ctx()->schema_guard_); {
result_set.set_ps_protocol(); result_set.init_partition_location_cache(
ObTaskExecutorCtx* task_ctx = result_set.get_exec_context().get_task_executor_ctx(); GCTX.location_cache_, GCTX.self_addr_, ctx.get_sql_ctx()->schema_guard_);
int64_t tenant_version = 0; result_set.set_ps_protocol();
int64_t sys_version = 0; ObTaskExecutorCtx *task_ctx = result_set.get_exec_context().get_task_executor_ctx();
if (OB_ISNULL(task_ctx)) { int64_t tenant_version = 0;
ret = OB_ERR_UNEXPECTED; int64_t sys_version = 0;
LOG_ERROR("task executor ctx can not be NULL", K(task_ctx), K(ret)); if (OB_ISNULL(task_ctx)) {
} else if (OB_FAIL(GCTX.schema_service_->get_tenant_received_broadcast_version( ret = OB_ERR_UNEXPECTED;
ctx.get_my_session()->get_effective_tenant_id(), tenant_version))) { LOG_ERROR("task executor ctx can not be NULL", K(task_ctx), K(ret));
LOG_WARN("fail get tenant schema version", K(ret)); } else if (OB_FAIL(GCTX.schema_service_->get_tenant_received_broadcast_version(
} else if (OB_FAIL( ctx.get_my_session()->get_effective_tenant_id(), tenant_version))) {
GCTX.schema_service_->get_tenant_received_broadcast_version(OB_SYS_TENANT_ID, sys_version))) { LOG_WARN("fail get tenant schema version", K(ret));
LOG_WARN("fail get sys schema version", K(ret)); } else if (OB_FAIL(
} else { GCTX.schema_service_->get_tenant_received_broadcast_version(OB_SYS_TENANT_ID, sys_version))) {
ObSqlCtx sql_ctx; LOG_WARN("fail get sys schema version", K(ret));
observer::ObVirtualTableIteratorFactory vt_iter_factory(*GCTX.vt_iter_creator_);
sql_ctx.retry_times_ = 0;
sql_ctx.merged_version_ = ctx.get_sql_ctx()->merged_version_;
sql_ctx.vt_iter_factory_ = &vt_iter_factory;
sql_ctx.session_info_ = ctx.get_my_session();
sql_ctx.sql_proxy_ = ctx.get_sql_ctx()->sql_proxy_;
sql_ctx.use_plan_cache_ = true;
sql_ctx.disable_privilege_check_ = true;
sql_ctx.partition_table_operator_ = ctx.get_sql_ctx()->partition_table_operator_;
sql_ctx.partition_location_cache_ = &(result_set.get_partition_location_cache());
sql_ctx.part_mgr_ = ctx.get_sql_ctx()->part_mgr_;
sql_ctx.is_prepare_protocol_ = ctx.get_sql_ctx()->is_prepare_protocol_;
sql_ctx.is_prepare_stage_ = ctx.get_sql_ctx()->is_prepare_stage_;
sql_ctx.schema_guard_ = ctx.get_sql_ctx()->schema_guard_;
task_ctx->schema_service_ = GCTX.schema_service_;
task_ctx->set_query_tenant_begin_schema_version(tenant_version);
task_ctx->set_query_sys_begin_schema_version(sys_version);
task_ctx->set_min_cluster_version(GET_MIN_CLUSTER_VERSION());
if (OB_FAIL(result_set.init())) {
LOG_WARN("result set init failed", K(ret));
} else if (OB_FAIL(GCTX.sql_engine_->stmt_execute(stmt.get_prepare_id(),
stmt.get_prepare_type(),
params_array,
sql_ctx,
result_set,
false /* is_inner_sql */))) {
LOG_WARN("failed to prepare stmt", K(stmt.get_prepare_id()), K(stmt.get_prepare_type()), K(ret));
} else { } else {
if (OB_ISNULL(ctx.get_sql_ctx()->schema_guard_)) { sql_ctx.retry_times_ = 0;
ret = OB_ERR_UNEXPECTED; sql_ctx.merged_version_ = ctx.get_sql_ctx()->merged_version_;
LOG_WARN("schema guard is null"); sql_ctx.vt_iter_factory_ = &vt_iter_factory;
} else if (OB_FAIL(ctx.get_my_session()->update_query_sensitive_system_variable( sql_ctx.session_info_ = ctx.get_my_session();
*(ctx.get_sql_ctx()->schema_guard_)))) { sql_ctx.sql_proxy_ = ctx.get_sql_ctx()->sql_proxy_;
LOG_WARN("update query affacted system variable failed", K(ret)); sql_ctx.use_plan_cache_ = true;
} else if (OB_FAIL(result_set.sync_open())) { sql_ctx.disable_privilege_check_ = true;
LOG_WARN("result set open failed", K(result_set.get_statement_id()), K(ret)); sql_ctx.partition_table_operator_ = ctx.get_sql_ctx()->partition_table_operator_;
} sql_ctx.partition_location_cache_ = &(result_set.get_partition_location_cache());
if (OB_SUCC(ret)) { sql_ctx.part_mgr_ = ctx.get_sql_ctx()->part_mgr_;
if (result_set.is_with_rows()) { sql_ctx.is_prepare_protocol_ = ctx.get_sql_ctx()->is_prepare_protocol_;
while (OB_SUCC(ret)) { sql_ctx.is_prepare_stage_ = ctx.get_sql_ctx()->is_prepare_stage_;
const common::ObNewRow* row = NULL; sql_ctx.schema_guard_ = ctx.get_sql_ctx()->schema_guard_;
if (OB_FAIL(result_set.get_next_row(row))) { task_ctx->schema_service_ = GCTX.schema_service_;
if (OB_ITER_END == ret) { task_ctx->set_query_tenant_begin_schema_version(tenant_version);
ret = OB_SUCCESS; task_ctx->set_query_sys_begin_schema_version(sys_version);
} else { task_ctx->set_min_cluster_version(GET_MIN_CLUSTER_VERSION());
LOG_WARN("get next row error", K(ret)); if (OB_FAIL(result_set.init())) {
LOG_WARN("result set init failed", K(ret));
} else if (OB_FAIL(GCTX.sql_engine_->stmt_execute(stmt.get_prepare_id(),
stmt.get_prepare_type(),
params_array,
sql_ctx,
result_set,
false /* is_inner_sql */))) {
LOG_WARN("failed to prepare stmt", K(stmt.get_prepare_id()), K(stmt.get_prepare_type()), K(ret));
} else {
if (OB_ISNULL(ctx.get_sql_ctx()->schema_guard_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("schema guard is null");
} else if (OB_FAIL(ctx.get_my_session()->update_query_sensitive_system_variable(
*(ctx.get_sql_ctx()->schema_guard_)))) {
LOG_WARN("update query affacted system variable failed", K(ret));
} else if (OB_FAIL(result_set.sync_open())) {
LOG_WARN("result set open failed", K(result_set.get_statement_id()), K(ret));
}
if (OB_SUCC(ret)) {
if (result_set.is_with_rows()) {
while (OB_SUCC(ret)) {
const common::ObNewRow *row = NULL;
if (OB_FAIL(result_set.get_next_row(row))) {
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("get next row error", K(ret));
}
break;
} }
break;
} }
} }
} }
} int tmp_ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS; if ((tmp_ret = result_set.close()) != OB_SUCCESS) {
if ((tmp_ret = result_set.close()) != OB_SUCCESS) { LOG_WARN("result set open failed", K(result_set.get_statement_id()), K(ret));
LOG_WARN("result set open failed", K(result_set.get_statement_id()), K(ret)); ret = OB_SUCCESS == ret ? tmp_ret : ret;
ret = OB_SUCCESS == ret ? tmp_ret : ret; }
} }
} }
} }
......
...@@ -9004,14 +9004,19 @@ table_reference inner_join_type opt_full_table_factor %prec LOWER_ON ...@@ -9004,14 +9004,19 @@ table_reference inner_join_type opt_full_table_factor %prec LOWER_ON
if ($1->type_ == T_ORG) { if ($1->type_ == T_ORG) {
ParseNode *name_node = NULL; ParseNode *name_node = NULL;
make_name_node(name_node, result->malloc_pool_, "full"); make_name_node(name_node, result->malloc_pool_, "full");
malloc_non_terminal_node($$, result->malloc_pool_, T_ALIAS, $1->num_child_ + 1); $$ = new_node(result->malloc_pool_, T_ALIAS, $1->num_child_ + 1);
for (int i = 0; i <= $1->num_child_; ++i) { if (OB_UNLIKELY($$ == NULL)) {
if (i == 0) { yyerror(NULL, result, "No more space for malloc\n");
$$->children_[i] = $1->children_[i]; YYABORT_NO_MEMORY;
} else if (i == 1) { } else {
$$->children_[i] = name_node; for (int i = 0; i <= $1->num_child_; ++i) {
} else { if (i == 0) {
$$->children_[i] = $1->children_[i - 1]; $$->children_[i] = $1->children_[i];
} else if (i == 1) {
$$->children_[i] = name_node;
} else {
$$->children_[i] = $1->children_[i - 1];
}
} }
} }
} else if ($1->type_ == T_ALIAS && $1->children_[1] != NULL && } else if ($1->type_ == T_ALIAS && $1->children_[1] != NULL &&
...@@ -9046,14 +9051,19 @@ table_factor %prec LOWER_COMMA ...@@ -9046,14 +9051,19 @@ table_factor %prec LOWER_COMMA
if ($1->type_ == T_ORG) { if ($1->type_ == T_ORG) {
ParseNode *name_node = NULL; ParseNode *name_node = NULL;
make_name_node(name_node, result->malloc_pool_, "full"); make_name_node(name_node, result->malloc_pool_, "full");
malloc_non_terminal_node($$, result->malloc_pool_, T_ALIAS, $1->num_child_ + 1); $$ = new_node(result->malloc_pool_, T_ALIAS, $1->num_child_ + 1);
for (int i = 0; i <= $1->num_child_; ++i) { if (OB_UNLIKELY($$ == NULL)) {
if (i == 0) { yyerror(NULL, result, "No more space for malloc\n");
$$->children_[i] = $1->children_[i]; YYABORT_NO_MEMORY;
} else if (i == 1) { } else {
$$->children_[i] = name_node; for (int i = 0; i <= $1->num_child_; ++i) {
} else { if (i == 0) {
$$->children_[i] = $1->children_[i - 1]; $$->children_[i] = $1->children_[i];
} else if (i == 1) {
$$->children_[i] = name_node;
} else {
$$->children_[i] = $1->children_[i - 1];
}
} }
} }
} else if ($1->type_ == T_ALIAS && $1->children_[1] != NULL && } else if ($1->type_ == T_ALIAS && $1->children_[1] != NULL &&
......
...@@ -306,7 +306,7 @@ int ObSSTableMultiVersionRowMultiGetter::inner_open( ...@@ -306,7 +306,7 @@ int ObSSTableMultiVersionRowMultiGetter::inner_open(
} }
} }
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
} else if (OB_FAIL(new_iterator<ObSSTableRowMultiScanner>(*access_ctx.allocator_))) { } else if (OB_FAIL(new_iterator<ObSSTableRowMultiScanner>(*access_ctx.stmt_allocator_))) {
LOG_WARN("failed to new iterator", K(ret)); LOG_WARN("failed to new iterator", K(ret));
} else if (OB_FAIL(iter_->init(iter_param, access_ctx, table, &multi_version_ranges_))) { } else if (OB_FAIL(iter_->init(iter_param, access_ctx, table, &multi_version_ranges_))) {
LOG_WARN("failed to open multi scanner", K(ret)); LOG_WARN("failed to open multi scanner", K(ret));
...@@ -431,7 +431,7 @@ int ObSSTableMultiVersionRowMultiScanner::inner_open( ...@@ -431,7 +431,7 @@ int ObSSTableMultiVersionRowMultiScanner::inner_open(
} }
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
} else if (OB_FAIL(new_iterator<ObSSTableRowMultiScanner>(*access_ctx.allocator_))) { } else if (OB_FAIL(new_iterator<ObSSTableRowMultiScanner>(*access_ctx.stmt_allocator_))) {
LOG_WARN("failed to new iterator", K(ret)); LOG_WARN("failed to new iterator", K(ret));
} else if (OB_FAIL(iter_->init(iter_param, access_ctx, table, &multi_version_ranges_))) { } else if (OB_FAIL(iter_->init(iter_param, access_ctx, table, &multi_version_ranges_))) {
LOG_WARN("failed to open scanner", K(ret)); LOG_WARN("failed to open scanner", K(ret));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册