提交 d2970467 编写于 作者: O obdev 提交者: wangzelin.wzl

fix ObServerMetaTableChecker about tabelt replica checksum

上级 481c969b
......@@ -22,6 +22,8 @@
#include "share/schema/ob_multi_version_schema_service.h" // ObMultiVersionSchemaService
#include "observer/omt/ob_multi_tenant.h" // ObMultiTenant
#include "share/tablet/ob_tablet_info.h" // ObTabletInfo
#include "share/ob_tablet_replica_checksum_operator.h" // for ObTabletReplicaChecksumItem
#include "lib/mysqlclient/ob_mysql_transaction.h" // ObMySQLTransaction
namespace oceanbase
{
......@@ -197,7 +199,8 @@ int ObServerMetaTableChecker::check_meta_table(const ObMetaTableCheckType check_
} else {
ARRAY_FOREACH_NORET(nonlocal_tenant_ids, idx) { // ignore ret between each tenant
int64_t ls_residual_count = 0;
int64_t tablet_residual_count = 0;
int64_t meta_residual_count = 0;
int64_t checksum_residual_count = 0;
const uint64_t tenant_id = nonlocal_tenant_ids.at(idx);
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id)
|| is_virtual_tenant_id(tenant_id))) {
......@@ -213,11 +216,12 @@ int ObServerMetaTableChecker::check_meta_table(const ObMetaTableCheckType check_
KR(ret), K(tenant_id), K(ls_residual_count));
}
} else if (CHECK_TABLET_META_TABLE == check_type) {
if (OB_FAIL(check_tablet_table_(tenant_id, tablet_residual_count))) {
if (OB_FAIL(check_tablet_table_(tenant_id, meta_residual_count, checksum_residual_count))) {
LOG_WARN("fail to check tablet meta table", KR(ret), K(tenant_id));
} else if (tablet_residual_count != 0) {
LOG_INFO("ObServerMetaTableChecker found residual tablet and corrected tablet meta table for a tenant",
KR(ret), K(tenant_id), K(tablet_residual_count));
} else if ((0 != meta_residual_count) || (0 != checksum_residual_count)) {
LOG_INFO("ObServerMetaTableChecker found residual tablet, and corrected tablet"
" meta table and tablet replica checksum table for a tenant", KR(ret), K(tenant_id),
K(meta_residual_count), K(checksum_residual_count));
}
} else { // can't be here
ret = OB_INVALID_ARGUMENT;
......@@ -259,11 +263,15 @@ int ObServerMetaTableChecker::check_ls_table_(
int ObServerMetaTableChecker::check_tablet_table_(
const uint64_t tenant_id,
int64_t &residual_count)
int64_t &meta_residual_count,
int64_t &checksum_residual_count)
{
int ret = OB_SUCCESS;
residual_count = 0;
int64_t affected_rows = 0;
int trans_ret = OB_SUCCESS;
meta_residual_count = 0;
checksum_residual_count = 0;
int64_t affected_rows_meta = 0;
int64_t affected_rows_checksum = 0;
const int64_t limit = 1024;
if (OB_UNLIKELY(!inited_) || OB_ISNULL(tt_operator_)) {
ret = OB_NOT_INIT;
......@@ -271,25 +279,40 @@ int ObServerMetaTableChecker::check_tablet_table_(
} else if (OB_UNLIKELY(stopped_)) {
ret = OB_CANCELED;
LOG_WARN("ObServerMetaTableChecker is stopped", KR(ret), K_(tablet_tg_id));
} else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id)
|| is_virtual_tenant_id(tenant_id))) {
} else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant_id", KR(ret), K(tenant_id));
} else {
do {
if (OB_UNLIKELY(stopped_)) {
common::ObMySQLTransaction trans;
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id);
if (OB_FAIL(trans.start(GCTX.sql_proxy_, meta_tenant_id))) {
LOG_WARN("fail to start transaction", KR(ret), K(tenant_id), K(meta_tenant_id));
} else if (OB_UNLIKELY(stopped_)) {
ret = OB_CANCELED;
LOG_WARN("ObServerMetaTableChecker is stopped", KR(ret), K_(tablet_tg_id));
} else if (OB_FAIL(tt_operator_->remove_residual_tablet(
tenant_id,
GCONF.self_addr_,
limit,
affected_rows))) {
} else if (OB_FAIL(tt_operator_->remove_residual_tablet(trans, tenant_id, GCONF.self_addr_,
limit, affected_rows_meta))) {
LOG_WARN("fail to remove residual tablet by operator", KR(ret), K(tenant_id));
} else if (OB_FAIL(ObTabletReplicaChecksumOperator::remove_residual_checksum(trans,
tenant_id, GCONF.self_addr_, limit, affected_rows_checksum))) {
LOG_WARN("fail to remove residual checksum by operator", KR(ret), K(tenant_id));
} else {
residual_count += affected_rows;
meta_residual_count += affected_rows_meta;
checksum_residual_count += affected_rows_checksum;
}
if (OB_UNLIKELY(affected_rows_meta != affected_rows_checksum)) {
LOG_WARN("affected_rows_meta is not equal to affected_rows_checksum, may due to cluster"
"upgrade", K(tenant_id), K(affected_rows_meta), K(affected_rows_checksum));
}
if (trans.is_started()) {
trans_ret = trans.end(OB_SUCCESS == ret);
if (OB_UNLIKELY(OB_SUCCESS != trans_ret)) {
LOG_WARN("fail to end transaction", KR(trans_ret));
ret = ((OB_SUCCESS == ret) ? trans_ret : ret);
}
}
} while (OB_SUCC(ret) && (limit == affected_rows));
} while (OB_SUCC(ret) && ((limit == affected_rows_meta) || (limit == affected_rows_checksum)));
}
return ret;
}
......
......@@ -89,7 +89,8 @@ private:
int64_t &residual_count);
int check_tablet_table_(
const uint64_t tenant_id,
int64_t &residual_count);
int64_t &meta_residual_count,
int64_t &checksum_residual_count);
bool inited_;
bool stopped_;
......
......@@ -422,6 +422,38 @@ int ObTabletReplicaChecksumOperator::inner_batch_remove_by_sql_(
return ret;
}
int ObTabletReplicaChecksumOperator::remove_residual_checksum(
ObISQLClient &sql_client,
const uint64_t tenant_id,
const ObAddr &server,
const int64_t limit,
int64_t &affected_rows)
{
int ret = OB_SUCCESS;
affected_rows = 0;
char ip[OB_MAX_SERVER_ADDR_SIZE] = "";
ObSqlString sql;
const uint64_t sql_tenant_id = gen_meta_tenant_id(tenant_id);
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id)
|| is_virtual_tenant_id(tenant_id)
|| !server.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(server));
} else if (OB_UNLIKELY(!server.ip_to_string(ip, sizeof(ip)))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("convert server ip to string failed", KR(ret), K(server));
} else if (OB_FAIL(sql.assign_fmt("DELETE FROM %s WHERE tenant_id = %lu AND svr_ip = '%s' AND"
" svr_port = %d limit %ld", OB_ALL_TABLET_REPLICA_CHECKSUM_TNAME, tenant_id, ip,
server.get_port(), limit))) {
LOG_WARN("assign sql string failed", KR(ret), K(sql));
} else if (OB_FAIL(sql_client.write(sql_tenant_id, sql.ptr(), affected_rows))) {
LOG_WARN("execute sql failed", KR(ret), K(sql), K(sql_tenant_id));
} else if (affected_rows > 0) {
LOG_INFO("finish to remove residual checksum", KR(ret), K(tenant_id), K(affected_rows));
}
return ret;
}
int ObTabletReplicaChecksumOperator::batch_get(
const uint64_t tenant_id,
const ObTabletLSPair &start_pair,
......
......@@ -127,6 +127,12 @@ public:
common::ObMySQLTransaction &trans,
const uint64_t tenant_id,
const common::ObIArray<share::ObTabletReplica> &tablet_replicas);
static int remove_residual_checksum(
common::ObISQLClient &sql_client,
const uint64_t tenant_id,
const ObAddr &server,
const int64_t limit,
int64_t &affected_rows);
static int check_column_checksum(
const uint64_t tenant_id,
......
......@@ -678,6 +678,7 @@ int ObTabletTableOperator::fill_remove_dml_splicer_(
}
int ObTabletTableOperator::remove_residual_tablet(
ObISQLClient &sql_client,
const uint64_t tenant_id,
const ObAddr &server,
const int64_t limit,
......@@ -688,7 +689,7 @@ int ObTabletTableOperator::remove_residual_tablet(
char ip[OB_MAX_SERVER_ADDR_SIZE] = "";
ObSqlString sql;
const uint64_t sql_tenant_id = gen_meta_tenant_id(tenant_id);
if (OB_UNLIKELY(!inited_) || OB_ISNULL(sql_proxy_)) {
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
} else if (OB_UNLIKELY(
......@@ -708,7 +709,7 @@ int ObTabletTableOperator::remove_residual_tablet(
server.get_port(),
limit))) {
LOG_WARN("assign sql string failed", KR(ret), K(sql));
} else if (OB_FAIL(sql_proxy_->write(sql_tenant_id, sql.ptr(), affected_rows))) {
} else if (OB_FAIL(sql_client.write(sql_tenant_id, sql.ptr(), affected_rows))) {
LOG_WARN("execute sql failed", KR(ret), K(sql), K(sql_tenant_id));
} else if (affected_rows > 0) {
LOG_INFO("finish to remove residual tablet", KR(ret), K(tenant_id), K(affected_rows));
......
......@@ -124,11 +124,13 @@ public:
const ObIArray<ObTabletReplica> &replicas);
// remove residual tablet in __all_tablet_meta_table for ObServerMetaTableChecker
//
// @param [in] sql_client, client for executing query
// @param [in] tenant_id, tenant for query
// @param [in] server, target ObAddr
// @param [in] limit, limit number for delete sql
// @param [out] residual_count, count of residual tablets in table
int remove_residual_tablet(
ObISQLClient &sql_client,
const uint64_t tenant_id,
const ObAddr &server,
const int64_t limit,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册