You need to sign in or sign up before continuing.
提交 68f71563 编写于 作者: H Hongqin-Li 提交者: wangzelin.wzl

[CP] Fix snapshot releasing bug when building index

上级 1190e78c
...@@ -672,6 +672,8 @@ int ObRSBuildIndexTask::wait_trans_end(bool& is_end) ...@@ -672,6 +672,8 @@ int ObRSBuildIndexTask::wait_trans_end(bool& is_end)
} }
if (OB_SUCC(ret) && is_end) { if (OB_SUCC(ret) && is_end) {
// report wait trans status // report wait trans status
ObMySQLTransaction trans;
common::ObMySQLProxy &proxy = ddl_service_->get_sql_proxy();
const int64_t partition_id = -1; const int64_t partition_id = -1;
int64_t frozen_timestamp = 0; int64_t frozen_timestamp = 0;
ObIndexTransStatus report_status; ObIndexTransStatus report_status;
...@@ -681,12 +683,15 @@ int ObRSBuildIndexTask::wait_trans_end(bool& is_end) ...@@ -681,12 +683,15 @@ int ObRSBuildIndexTask::wait_trans_end(bool& is_end)
report_status.schema_version_ = index_schema->get_schema_version(); report_status.schema_version_ = index_schema->get_schema_version();
if (OB_FAIL(ddl_service_->get_zone_mgr().get_frozen_info(report_status.frozen_version_, frozen_timestamp))) { if (OB_FAIL(ddl_service_->get_zone_mgr().get_frozen_info(report_status.frozen_version_, frozen_timestamp))) {
LOG_WARN("fail to get frozen info", K(ret)); LOG_WARN("fail to get frozen info", K(ret));
} else if (OB_FAIL(trans.start(&proxy))) {
LOG_WARN("fail to start trans", K(ret));
} else if (OB_FAIL(acquire_snapshot(report_status.snapshot_version_, } else if (OB_FAIL(acquire_snapshot(report_status.snapshot_version_,
index_schema->get_data_table_id(), index_schema->get_data_table_id(),
report_status.schema_version_))) { report_status.schema_version_,
trans))) {
if (OB_SNAPSHOT_DISCARDED == ret) { if (OB_SNAPSHOT_DISCARDED == ret) {
STORAGE_LOG(WARN, "snapshot discard", K(ret), K(index_id_)); STORAGE_LOG(WARN, "snapshot discard", K(ret), K(index_id_));
if (OB_FAIL(ObIndexTransStatusReporter::delete_wait_trans_status(index_id_, ddl_service_->get_sql_proxy()))) { if (OB_FAIL(ObIndexTransStatusReporter::delete_wait_trans_status(index_id_, trans))) {
LOG_WARN("fail to delete wait trans status", K(ret)); LOG_WARN("fail to delete wait trans status", K(ret));
} else { } else {
report_status.snapshot_version_ = 0; report_status.snapshot_version_ = 0;
...@@ -694,7 +699,7 @@ int ObRSBuildIndexTask::wait_trans_end(bool& is_end) ...@@ -694,7 +699,7 @@ int ObRSBuildIndexTask::wait_trans_end(bool& is_end)
ObIndexTransStatusReporter::ROOT_SERVICE, ObIndexTransStatusReporter::ROOT_SERVICE,
partition_id, partition_id,
report_status, report_status,
ddl_service_->get_sql_proxy()))) { trans))) {
LOG_WARN("fail to report wait trans status", K(ret)); LOG_WARN("fail to report wait trans status", K(ret));
} }
} }
...@@ -705,10 +710,20 @@ int ObRSBuildIndexTask::wait_trans_end(bool& is_end) ...@@ -705,10 +710,20 @@ int ObRSBuildIndexTask::wait_trans_end(bool& is_end)
ObIndexTransStatusReporter::ROOT_SERVICE, ObIndexTransStatusReporter::ROOT_SERVICE,
partition_id, partition_id,
report_status, report_status,
ddl_service_->get_sql_proxy()))) { trans))) {
is_end = false; is_end = false;
LOG_WARN("fail to report wait trans status", K(ret)); LOG_WARN("fail to report wait trans status", K(ret));
} }
if (trans.is_started()) {
bool is_commit = (ret == OB_SUCCESS);
int tmp_ret = trans.end(is_commit);
if (OB_SUCCESS != tmp_ret) {
LOG_WARN("fail to end trans", K(ret), K(tmp_ret), K(is_commit));
if (OB_SUCC(ret)) {
ret = tmp_ret;
}
}
}
} }
} }
return ret; return ret;
...@@ -899,8 +914,8 @@ int ObRSBuildIndexTask::calc_snapshot_version(const int64_t max_commit_version, ...@@ -899,8 +914,8 @@ int ObRSBuildIndexTask::calc_snapshot_version(const int64_t max_commit_version,
return ret; return ret;
} }
int ObRSBuildIndexTask::acquire_snapshot( int ObRSBuildIndexTask::acquire_snapshot(const int64_t snapshot_version, const int64_t data_table_id,
const int64_t snapshot_version, const int64_t data_table_id, const int64_t schema_version) const int64_t schema_version, ObMySQLTransaction &trans)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) { if (OB_UNLIKELY(!is_inited_)) {
...@@ -916,23 +931,12 @@ int ObRSBuildIndexTask::acquire_snapshot( ...@@ -916,23 +931,12 @@ int ObRSBuildIndexTask::acquire_snapshot(
info.schema_version_ = schema_version; info.schema_version_ = schema_version;
info.tenant_id_ = extract_tenant_id(index_id_); info.tenant_id_ = extract_tenant_id(index_id_);
info.table_id_ = data_table_id; info.table_id_ = data_table_id;
ObMySQLTransaction trans; if (!info.is_valid()) {
common::ObMySQLProxy& proxy = ddl_service_->get_sql_proxy(); ret = OB_INVALID_ARGUMENT;
if (OB_FAIL(trans.start(&proxy))) { LOG_WARN("invalid argument", K(ret), K(info));
LOG_WARN("fail to start trans", K(ret));
} else if (OB_FAIL(ddl_service_->get_snapshot_mgr().acquire_snapshot_for_building_index(trans, info, index_id_))) { } else if (OB_FAIL(ddl_service_->get_snapshot_mgr().acquire_snapshot_for_building_index(trans, info, index_id_))) {
LOG_WARN("fail to acquire snapshot", K(ret), K(index_id_), K(data_table_id), K(info)); LOG_WARN("fail to acquire snapshot", K(ret), K(index_id_), K(data_table_id), K(info));
} }
if (trans.is_started()) {
bool is_commit = (ret == OB_SUCCESS);
int tmp_ret = trans.end(is_commit);
if (OB_SUCCESS != tmp_ret) {
LOG_WARN("fail to end trans", K(ret), K(is_commit));
if (OB_SUCC(ret)) {
ret = tmp_ret;
}
}
}
} }
return ret; return ret;
} }
......
...@@ -170,8 +170,9 @@ private: ...@@ -170,8 +170,9 @@ private:
int wait_trans_end(bool& is_end); int wait_trans_end(bool& is_end);
int wait_build_index_end(bool& is_end); int wait_build_index_end(bool& is_end);
bool need_print_log(); bool need_print_log();
int calc_snapshot_version(const int64_t max_commit_version, int64_t& snapshot_version); int calc_snapshot_version(const int64_t max_commit_version, int64_t &snapshot_version);
int acquire_snapshot(const int64_t snapshot_version, const int64_t data_table_id, const int64_t schema_version); int acquire_snapshot(const int64_t snapshot_version, const int64_t data_table_id, const int64_t schema_version,
common::ObMySQLTransaction &trans);
int release_snapshot(); int release_snapshot();
int remove_index_build_stat_record(); int remove_index_build_stat_record();
......
...@@ -26,7 +26,7 @@ ObIndexTransStatusReporter::~ObIndexTransStatusReporter() ...@@ -26,7 +26,7 @@ ObIndexTransStatusReporter::~ObIndexTransStatusReporter()
{} {}
int ObIndexTransStatusReporter::report_wait_trans_status(const uint64_t index_table_id, const int svr_type, int ObIndexTransStatusReporter::report_wait_trans_status(const uint64_t index_table_id, const int svr_type,
const int64_t partition_id, const ObIndexTransStatus& status, ObMySQLProxy& sql_proxy) const int64_t partition_id, const ObIndexTransStatus &status, ObISQLClient &sql_client)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
char ip[common::OB_MAX_SERVER_ADDR_SIZE] = ""; char ip[common::OB_MAX_SERVER_ADDR_SIZE] = "";
...@@ -48,7 +48,7 @@ int ObIndexTransStatusReporter::report_wait_trans_status(const uint64_t index_ta ...@@ -48,7 +48,7 @@ int ObIndexTransStatusReporter::report_wait_trans_status(const uint64_t index_ta
OB_FAIL(dml.add_column("schema_version", status.schema_version_))) { OB_FAIL(dml.add_column("schema_version", status.schema_version_))) {
LOG_WARN("fail to add column", K(ret)); LOG_WARN("fail to add column", K(ret));
} else { } else {
ObDMLExecHelper exec(sql_proxy, OB_SYS_TENANT_ID); ObDMLExecHelper exec(sql_client, OB_SYS_TENANT_ID);
int64_t affected_rows = 0; int64_t affected_rows = 0;
if (OB_FAIL(exec.exec_insert_update(OB_ALL_INDEX_WAIT_TRANSACTION_STATUS_TNAME, dml, affected_rows))) { if (OB_FAIL(exec.exec_insert_update(OB_ALL_INDEX_WAIT_TRANSACTION_STATUS_TNAME, dml, affected_rows))) {
LOG_WARN("fail to exeucte dml", K(ret)); LOG_WARN("fail to exeucte dml", K(ret));
...@@ -58,7 +58,7 @@ int ObIndexTransStatusReporter::report_wait_trans_status(const uint64_t index_ta ...@@ -58,7 +58,7 @@ int ObIndexTransStatusReporter::report_wait_trans_status(const uint64_t index_ta
return ret; return ret;
} }
int ObIndexTransStatusReporter::delete_wait_trans_status(const uint64_t index_table_id, ObMySQLProxy& sql_proxy) int ObIndexTransStatusReporter::delete_wait_trans_status(const uint64_t index_table_id, ObISQLClient &sql_client)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_INVALID_ID == index_table_id) { if (OB_INVALID_ID == index_table_id) {
...@@ -70,7 +70,7 @@ int ObIndexTransStatusReporter::delete_wait_trans_status(const uint64_t index_ta ...@@ -70,7 +70,7 @@ int ObIndexTransStatusReporter::delete_wait_trans_status(const uint64_t index_ta
OB_FAIL(dml.add_pk_column(K(index_table_id)))) { OB_FAIL(dml.add_pk_column(K(index_table_id)))) {
LOG_WARN("fail to add column", K(ret), K(index_table_id)); LOG_WARN("fail to add column", K(ret), K(index_table_id));
} else { } else {
ObDMLExecHelper exec(sql_proxy, OB_SYS_TENANT_ID); ObDMLExecHelper exec(sql_client, OB_SYS_TENANT_ID);
int64_t affected_rows = 0; int64_t affected_rows = 0;
if (OB_FAIL(exec.exec_delete(OB_ALL_INDEX_WAIT_TRANSACTION_STATUS_TNAME, dml, affected_rows))) { if (OB_FAIL(exec.exec_delete(OB_ALL_INDEX_WAIT_TRANSACTION_STATUS_TNAME, dml, affected_rows))) {
LOG_WARN("fail to exec delete", K(ret)); LOG_WARN("fail to exec delete", K(ret));
......
...@@ -52,10 +52,10 @@ public: ...@@ -52,10 +52,10 @@ public:
ObIndexTransStatusReporter(); ObIndexTransStatusReporter();
virtual ~ObIndexTransStatusReporter(); virtual ~ObIndexTransStatusReporter();
static int report_wait_trans_status(const uint64_t index_table_id, const int svr_type, const int64_t partition_id, static int report_wait_trans_status(const uint64_t index_table_id, const int svr_type, const int64_t partition_id,
const ObIndexTransStatus& status, common::ObMySQLProxy& mysql_proxy); const ObIndexTransStatus &status, common::ObISQLClient &mysql_client);
static int get_wait_trans_status(const uint64_t index_table_id, const int svr_type, const int64_t partition_id, static int get_wait_trans_status(const uint64_t index_table_id, const int svr_type, const int64_t partition_id,
common::ObMySQLProxy& sql_proxy, ObIndexTransStatus& status); common::ObMySQLProxy &sql_proxy, ObIndexTransStatus &status);
static int delete_wait_trans_status(const uint64_t index_table_id, common::ObMySQLProxy& mysql_proxy); static int delete_wait_trans_status(const uint64_t index_table_id, common::ObISQLClient &mysql_client);
}; };
} // end namespace share } // end namespace share
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册