提交 9bbbb315 编写于 作者: W wanhong.wwh 提交者: wangzelin.wzl

WRS: add weak read version verification

上级 60c37863
......@@ -193,6 +193,12 @@ OB_INLINE bool is_valid_membership_version(const int64_t membership_version)
return membership_version >= 0;
}
OB_INLINE bool is_valid_read_snapshot_version(const int64_t read_snapshot_version)
{
// read snapshot version should be greater than 0 and should not be INT64_MAX
return read_snapshot_version > 0 && INT64_MAX != read_snapshot_version;
}
inline bool is_schema_error(int err)
{
bool ret = false;
......
......@@ -663,13 +663,12 @@ int ObTenantWeakReadClusterService::update_version(int64_t& affected_rows)
} else if (OB_UNLIKELY(cur_leader_epoch != leader_epoch_)) {
ret = OB_NOT_MASTER;
CLUSTER_ISTAT("WRS leader changed when update CLUSTER version, need stop CLUSTER weak read service",
"tenant_id",
wrs_pkey_.get_tenant_id(),
K(cur_leader_epoch),
K(leader_epoch_));
} else if (!check_can_update_version_()) {
"tenant_id", wrs_pkey_.get_tenant_id(), K(cur_leader_epoch), K(leader_epoch_));
} else if (! check_can_update_version_()) {
// check if can update version or not
} else if (OB_UNLIKELY((new_version = compute_version_(skipped_server_count, need_print)) <= 0)) {
} else if (FALSE_IT(new_version = compute_version_(skipped_server_count, need_print))) {
// nothing to do
} else if (OB_UNLIKELY(! is_valid_read_snapshot_version(new_version))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid CLUSTER weak read version", K(new_version), KR(ret));
} else {
......
......@@ -15,6 +15,8 @@
#include "share/ob_errno.h"
#include "ob_tenant_weak_read_server_version_mgr.h"
#include "share/ob_define.h" // is_valid_read_snapshot_version
using namespace oceanbase::common;
namespace oceanbase {
namespace transaction {
......@@ -64,7 +66,8 @@ int ObTenantWeakReadServerVersionMgr::update_with_part_info(const uint64_t tenan
const bool need_skip, const bool is_user_part, const int64_t version)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(epoch_tstamp <= 0) || OB_UNLIKELY(!need_skip && version <= 0)) {
if (OB_UNLIKELY(epoch_tstamp <= 0)
|| OB_UNLIKELY(! need_skip && ! is_valid_read_snapshot_version(version))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument, epoch or version",
K(ret),
......@@ -84,8 +87,9 @@ int ObTenantWeakReadServerVersionMgr::generate_new_version(const uint64_t tenant
const int64_t base_version_when_no_valid_partition, const bool need_print_status)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(epoch_tstamp <= 0) || OB_UNLIKELY(OB_INVALID_ID == tenant_id) ||
OB_UNLIKELY(base_version_when_no_valid_partition <= 0)) {
if (OB_UNLIKELY(epoch_tstamp <= 0)
|| OB_UNLIKELY(OB_INVALID_ID == tenant_id)
|| OB_UNLIKELY(! is_valid_read_snapshot_version(base_version_when_no_valid_partition))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(epoch_tstamp), K(tenant_id), K(base_version_when_no_valid_partition));
}
......@@ -214,8 +218,9 @@ int ObTenantWeakReadServerVersionMgr::ServerVersionInner::amend(
int ObTenantWeakReadServerVersionMgr::ServerVersionInner::update(const ServerVersionInner& new_version)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(0 == new_version.epoch_tstamp_) || OB_UNLIKELY(epoch_tstamp_ >= new_version.epoch_tstamp_) ||
OB_UNLIKELY(new_version.version_ <= 0)) {
if (OB_UNLIKELY(0 == new_version.epoch_tstamp_)
|| OB_UNLIKELY(epoch_tstamp_ >= new_version.epoch_tstamp_)
|| OB_UNLIKELY(! is_valid_read_snapshot_version(new_version.version_))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid new server version", KR(ret), K(new_version), KPC(this));
} else {
......
......@@ -221,8 +221,9 @@ int ObTenantWeakReadService::get_cluster_version_by_rpc_(int64_t& version)
} else if (OB_SUCCESS != resp.err_code_) {
LOG_WARN("get weak read cluster version RPC return error", K(resp), K(tenant_id_), K(cluster_service_master));
ret = resp.err_code_;
} else if (OB_UNLIKELY(resp.version_ <= 0)) {
LOG_WARN("invalid weak read cluster version from RPC", K(resp), K(tenant_id_), K(cluster_service_master));
} else if (OB_UNLIKELY(! is_valid_read_snapshot_version(resp.version_))) {
LOG_WARN("invalid weak read cluster version from RPC", K(resp), K(tenant_id_),
K(cluster_service_master));
ret = OB_INVALID_ERROR;
} else {
version = resp.version_;
......
......@@ -99,11 +99,11 @@ int ObWeakReadService::get_server_version(const uint64_t tenant_id, int64_t& ver
LOG_WARN("change tenant context fail when get weak read service server version", KR(ret), K(tenant_id));
}
if (OB_SUCC(ret) && version <= 0) {
if (OB_SUCC(ret) && ! is_valid_read_snapshot_version(version)) {
int old_ret = ret;
ret = OB_ERR_UNEXPECTED;
LOG_ERROR(
"get server version succ, but version not bigger than zero", K(ret), K(old_ret), K(tenant_id), K(version));
LOG_ERROR("get server version succ, but version is not valid snapshot version", K(ret), K(old_ret),
K(tenant_id), K(version));
}
LOG_DEBUG("[WRS] get_server_version", K(ret), K(tenant_id), K(version));
......@@ -131,11 +131,11 @@ int ObWeakReadService::get_cluster_version(const uint64_t tenant_id, int64_t& ve
LOG_WARN("change tenant context fail when get weak read service cluster version", KR(ret), K(tenant_id));
}
if (OB_SUCC(ret) && version <= 0) {
if (OB_SUCC(ret) && ! is_valid_read_snapshot_version(version)) {
int old_ret = ret;
ret = OB_ERR_UNEXPECTED;
LOG_ERROR(
"get cluster version succ, but version not bigger than zero", K(ret), K(old_ret), K(tenant_id), K(version));
LOG_ERROR("get cluster version succ, but version is not valid snapshot version", K(ret), K(old_ret),
K(tenant_id), K(version));
}
LOG_DEBUG("[WRS] get_cluster_version", K(ret), K(tenant_id), K(version));
......@@ -328,15 +328,19 @@ int ObWeakReadService::handle_partition_(ObIPartitionGroup& part, bool& need_ski
ObTenantWeakReadService* twrs = NULL;
const ObPartitionKey& pkey = part.get_partition_key();
uint64_t tenant_id = pkey.get_tenant_id();
ObPartitionService& ps = ObPartitionService::get_instance();
ObPartitionService &ps = ObPartitionService::get_instance();
// tenant maybe not exist, pass second parameter to ignore WARN log
int64_t max_stale_time =
ObWeakReadUtil::max_stale_time_for_weak_consistency(tenant_id, ObWeakReadUtil::IGNORE_TENANT_EXIST_WARN);
int64_t max_stale_time = ObWeakReadUtil::max_stale_time_for_weak_consistency(tenant_id,
ObWeakReadUtil::IGNORE_TENANT_EXIST_WARN);
// generate partition level weak read snapshot version
if (OB_FAIL(
ps.generate_partition_weak_read_snapshot_version(part, need_skip, is_user_part, version, max_stale_time))) {
LOG_WARN("generate partition weak read snapshot version fail", K(ret), K(pkey));
if (OB_FAIL(ps.generate_partition_weak_read_snapshot_version(part, need_skip,
is_user_part, version, max_stale_time))) {
LOG_WARN("generate partition weak read snapshot version fail", KR(ret), K(pkey));
} else if (OB_UNLIKELY(!need_skip && ! is_valid_read_snapshot_version(version))) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexpected generated partitoin weak read snapshot version",
KR(ret), "pkey", part.get_partition_key(), K(is_user_part), K(version));
} else {
// switch tenant, get tenant weak read service
FETCH_ENTITY(TENANT_SPACE, tenant_id)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册