diff --git a/src/rootserver/ob_index_builder.cpp b/src/rootserver/ob_index_builder.cpp index 4cb421f1fc0487771e481d3e4449b692470e1150..59166345bb98113bb0aa908bc91d2ba9fb124044 100644 --- a/src/rootserver/ob_index_builder.cpp +++ b/src/rootserver/ob_index_builder.cpp @@ -672,6 +672,8 @@ int ObRSBuildIndexTask::wait_trans_end(bool& is_end) } if (OB_SUCC(ret) && is_end) { // report wait trans status + ObMySQLTransaction trans; + common::ObMySQLProxy &proxy = ddl_service_->get_sql_proxy(); const int64_t partition_id = -1; int64_t frozen_timestamp = 0; ObIndexTransStatus report_status; @@ -681,12 +683,15 @@ int ObRSBuildIndexTask::wait_trans_end(bool& is_end) report_status.schema_version_ = index_schema->get_schema_version(); if (OB_FAIL(ddl_service_->get_zone_mgr().get_frozen_info(report_status.frozen_version_, frozen_timestamp))) { LOG_WARN("fail to get frozen info", K(ret)); + } else if (OB_FAIL(trans.start(&proxy))) { + LOG_WARN("fail to start trans", K(ret)); } else if (OB_FAIL(acquire_snapshot(report_status.snapshot_version_, index_schema->get_data_table_id(), - report_status.schema_version_))) { + report_status.schema_version_, + trans))) { if (OB_SNAPSHOT_DISCARDED == ret) { STORAGE_LOG(WARN, "snapshot discard", K(ret), K(index_id_)); - if (OB_FAIL(ObIndexTransStatusReporter::delete_wait_trans_status(index_id_, ddl_service_->get_sql_proxy()))) { + if (OB_FAIL(ObIndexTransStatusReporter::delete_wait_trans_status(index_id_, trans))) { LOG_WARN("fail to delete wait trans status", K(ret)); } else { report_status.snapshot_version_ = 0; @@ -694,7 +699,7 @@ int ObRSBuildIndexTask::wait_trans_end(bool& is_end) ObIndexTransStatusReporter::ROOT_SERVICE, partition_id, report_status, - ddl_service_->get_sql_proxy()))) { + trans))) { LOG_WARN("fail to report wait trans status", K(ret)); } } @@ -705,10 +710,20 @@ int ObRSBuildIndexTask::wait_trans_end(bool& is_end) ObIndexTransStatusReporter::ROOT_SERVICE, partition_id, report_status, - ddl_service_->get_sql_proxy()))) { + trans))) { is_end = false; LOG_WARN("fail to report wait trans status", K(ret)); } + if (trans.is_started()) { + bool is_commit = (ret == OB_SUCCESS); + int tmp_ret = trans.end(is_commit); + if (OB_SUCCESS != tmp_ret) { + LOG_WARN("fail to end trans", K(ret), K(tmp_ret), K(is_commit)); + if (OB_SUCC(ret)) { + ret = tmp_ret; + } + } + } } } return ret; @@ -899,8 +914,8 @@ int ObRSBuildIndexTask::calc_snapshot_version(const int64_t max_commit_version, return ret; } -int ObRSBuildIndexTask::acquire_snapshot( - const int64_t snapshot_version, const int64_t data_table_id, const int64_t schema_version) +int ObRSBuildIndexTask::acquire_snapshot(const int64_t snapshot_version, const int64_t data_table_id, + const int64_t schema_version, ObMySQLTransaction &trans) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { @@ -916,23 +931,12 @@ int ObRSBuildIndexTask::acquire_snapshot( info.schema_version_ = schema_version; info.tenant_id_ = extract_tenant_id(index_id_); info.table_id_ = data_table_id; - ObMySQLTransaction trans; - common::ObMySQLProxy& proxy = ddl_service_->get_sql_proxy(); - if (OB_FAIL(trans.start(&proxy))) { - LOG_WARN("fail to start trans", K(ret)); + if (!info.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(info)); } else if (OB_FAIL(ddl_service_->get_snapshot_mgr().acquire_snapshot_for_building_index(trans, info, index_id_))) { LOG_WARN("fail to acquire snapshot", K(ret), K(index_id_), K(data_table_id), K(info)); } - if (trans.is_started()) { - bool is_commit = (ret == OB_SUCCESS); - int tmp_ret = trans.end(is_commit); - if (OB_SUCCESS != tmp_ret) { - LOG_WARN("fail to end trans", K(ret), K(is_commit)); - if (OB_SUCC(ret)) { - ret = tmp_ret; - } - } - } } return ret; } diff --git a/src/rootserver/ob_index_builder.h b/src/rootserver/ob_index_builder.h index 6b439579fdc8035e0e09c468c5703e322ba9e0d3..75a0019a85c6702d722427e2b699087d600e28e8 100644 --- a/src/rootserver/ob_index_builder.h +++ b/src/rootserver/ob_index_builder.h @@ -170,8 +170,9 @@ private: int wait_trans_end(bool& is_end); int wait_build_index_end(bool& is_end); bool need_print_log(); - int calc_snapshot_version(const int64_t max_commit_version, int64_t& snapshot_version); - int acquire_snapshot(const int64_t snapshot_version, const int64_t data_table_id, const int64_t schema_version); + int calc_snapshot_version(const int64_t max_commit_version, int64_t &snapshot_version); + int acquire_snapshot(const int64_t snapshot_version, const int64_t data_table_id, const int64_t schema_version, + common::ObMySQLTransaction &trans); int release_snapshot(); int remove_index_build_stat_record(); diff --git a/src/share/ob_index_trans_status_reporter.cpp b/src/share/ob_index_trans_status_reporter.cpp index 2f81e2e4e807dcd288851632112cfcaf827ba9df..e417b5d09d21894ec86d380ddbf5a97951083c95 100644 --- a/src/share/ob_index_trans_status_reporter.cpp +++ b/src/share/ob_index_trans_status_reporter.cpp @@ -26,7 +26,7 @@ ObIndexTransStatusReporter::~ObIndexTransStatusReporter() {} int ObIndexTransStatusReporter::report_wait_trans_status(const uint64_t index_table_id, const int svr_type, - const int64_t partition_id, const ObIndexTransStatus& status, ObMySQLProxy& sql_proxy) + const int64_t partition_id, const ObIndexTransStatus &status, ObISQLClient &sql_client) { int ret = OB_SUCCESS; char ip[common::OB_MAX_SERVER_ADDR_SIZE] = ""; @@ -48,7 +48,7 @@ int ObIndexTransStatusReporter::report_wait_trans_status(const uint64_t index_ta OB_FAIL(dml.add_column("schema_version", status.schema_version_))) { LOG_WARN("fail to add column", K(ret)); } else { - ObDMLExecHelper exec(sql_proxy, OB_SYS_TENANT_ID); + ObDMLExecHelper exec(sql_client, OB_SYS_TENANT_ID); int64_t affected_rows = 0; if (OB_FAIL(exec.exec_insert_update(OB_ALL_INDEX_WAIT_TRANSACTION_STATUS_TNAME, dml, affected_rows))) { LOG_WARN("fail to exeucte dml", K(ret)); @@ -58,7 +58,7 @@ int ObIndexTransStatusReporter::report_wait_trans_status(const uint64_t index_ta return ret; } -int ObIndexTransStatusReporter::delete_wait_trans_status(const uint64_t index_table_id, ObMySQLProxy& sql_proxy) +int ObIndexTransStatusReporter::delete_wait_trans_status(const uint64_t index_table_id, ObISQLClient &sql_client) { int ret = OB_SUCCESS; if (OB_INVALID_ID == index_table_id) { @@ -70,7 +70,7 @@ int ObIndexTransStatusReporter::delete_wait_trans_status(const uint64_t index_ta OB_FAIL(dml.add_pk_column(K(index_table_id)))) { LOG_WARN("fail to add column", K(ret), K(index_table_id)); } else { - ObDMLExecHelper exec(sql_proxy, OB_SYS_TENANT_ID); + ObDMLExecHelper exec(sql_client, OB_SYS_TENANT_ID); int64_t affected_rows = 0; if (OB_FAIL(exec.exec_delete(OB_ALL_INDEX_WAIT_TRANSACTION_STATUS_TNAME, dml, affected_rows))) { LOG_WARN("fail to exec delete", K(ret)); diff --git a/src/share/ob_index_trans_status_reporter.h b/src/share/ob_index_trans_status_reporter.h index 8b3b9d34b0461bd266238700f683ce72a078d423..1d48ee8d64132bcc7c6e26608d132cd95a3ba519 100644 --- a/src/share/ob_index_trans_status_reporter.h +++ b/src/share/ob_index_trans_status_reporter.h @@ -52,10 +52,10 @@ public: ObIndexTransStatusReporter(); virtual ~ObIndexTransStatusReporter(); static int report_wait_trans_status(const uint64_t index_table_id, const int svr_type, const int64_t partition_id, - const ObIndexTransStatus& status, common::ObMySQLProxy& mysql_proxy); + const ObIndexTransStatus &status, common::ObISQLClient &mysql_client); static int get_wait_trans_status(const uint64_t index_table_id, const int svr_type, const int64_t partition_id, - common::ObMySQLProxy& sql_proxy, ObIndexTransStatus& status); - static int delete_wait_trans_status(const uint64_t index_table_id, common::ObMySQLProxy& mysql_proxy); + common::ObMySQLProxy &sql_proxy, ObIndexTransStatus &status); + static int delete_wait_trans_status(const uint64_t index_table_id, common::ObISQLClient &mysql_client); }; } // end namespace share