提交 70016c6a 编写于 作者: Y ym0 提交者: LINGuanRen

Erase location cache

上级 d8961383
......@@ -43,6 +43,7 @@
#include "share/ob_server_blacklist.h"
#include "observer/ob_server_struct.h"
#include "rootserver/ob_rs_async_rpc_proxy.h"
#include "share/cache/ob_cache_name_define.h"
namespace oceanbase {
using namespace common;
......@@ -319,6 +320,26 @@ int ObLocationLeaderCache::set_strong_leader_info(
return ret;
}
// OB_INVALID_TENANT_ID means flush all tenant's leader cache
int ObLocationLeaderCache::flush_cache(const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < CACHE_NUM; i++) {
ObLocationLeaderInfo &info = buffer_[i];
SpinWLockGuard guard(info.get_lock());
ObLocationLeader *&value_ptr = info.get_value();
if (OB_NOT_NULL(value_ptr)) {
const ObLocationCacheKey &key = value_ptr->get_key();
if (OB_INVALID_TENANT_ID == tenant_id || tenant_id == extract_tenant_id(key.table_id_)) {
// reset cache
value_ptr->set_strong_leader_info(LocationInfo());
LOG_TRACE("flush user leader cache", KR(ret), K(key));
}
}
}
return ret;
}
///////////////////////////////////////////
bool ObILocationFetcher::treat_sql_as_timeout(const int error_code)
{
......@@ -512,10 +533,7 @@ int ObILocationFetcher::fill_location(ObPartitionInfo& partition_info, ObPartiti
location.set_renew_time(ObTimeUtility::current_time());
location.set_sql_renew_time(ObTimeUtility::current_time());
ObReplicaLocation leader;
if (location.size() <= 0) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("location is empty", K(ret), K(location));
} else if (OB_FAIL(location.get_leader_by_election(leader))) {
if (OB_FAIL(location.get_leader_by_election(leader))) {
if (OB_LOCATION_LEADER_NOT_EXIST == ret) {
ret = OB_SUCCESS;
LOG_DEBUG("location leader not exist", K(ret), K(location));
......@@ -3263,6 +3281,7 @@ int ObPartitionLocationCache::renew_location(const uint64_t table_id, const int6
bool refresh_by_rpc = false;
bool refresh_by_sql = false;
bool can_erase = false;
bool exist_in_cache = true;
// try to get from cache, maybe other thread has already fetched location
if (OB_SUCC(ret)) {
......@@ -3378,6 +3397,7 @@ int ObPartitionLocationCache::renew_location(const uint64_t table_id, const int6
}
}
} else {
can_erase = true;
EVENT_INC(LOCATION_CACHE_SQL_RENEW);
}
if (location.get_replica_locations().count() > 0) {
......@@ -3395,13 +3415,17 @@ int ObPartitionLocationCache::renew_location(const uint64_t table_id, const int6
}
if (OB_SUCC(ret)) {
if (OB_FAIL(update_location(table_id, partition_id, cluster_id, new_location))) {
LOG_WARN("update location in cache failed", K(ret), KT(table_id), K(partition_id), K(new_location));
if (OB_FAIL(update_location(table_id, partition_id, cluster_id, can_erase, new_location))) {
LOG_WARN(
"update location in cache failed", K(ret), KT(table_id), K(partition_id), K(can_erase), K(new_location));
} else if (result_filter_not_readable_replica &&
OB_FAIL(location.assign_with_only_readable_replica(new_location))) {
LOG_WARN("assign with only readable replica fail", K(ret), K(location), K(new_location));
} else if (!result_filter_not_readable_replica && OB_FAIL(location.assign(new_location))) {
LOG_WARN("assign location fail", K(ret), K(location), K(new_location));
} else if (location.size() <= 0) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("location is empty", KR(ret), K(location));
}
}
} else {
......@@ -3492,7 +3516,8 @@ int ObPartitionLocationCache::check_skip_rpc_renew_v2(const ObPartitionLocation&
}
int ObPartitionLocationCache::update_location(
const uint64_t table_id, const int64_t partition_id, const int64_t cluster_id, const ObPartitionLocation& location)
const uint64_t table_id, const int64_t partition_id, const int64_t cluster_id,
const bool can_erase, const ObPartitionLocation& location)
{
int ret = OB_SUCCESS;
if (!is_inited_) {
......@@ -3501,6 +3526,13 @@ int ObPartitionLocationCache::update_location(
} else if (!ObIPartitionTable::is_valid_key(table_id, partition_id) || !location.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), KT(table_id), K(partition_id), K(location));
} else if (location.size() <= 0) {
if (!can_erase) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("location is empty", KR(ret), K(table_id), K(partition_id), K(cluster_id), K(location));
} else if (OB_FAIL(erase_location(table_id, partition_id, cluster_id))) {
LOG_WARN("fail to erase location", KR(ret), K(table_id), K(partition_id), K(cluster_id));
}
} else {
ObLocationCacheKey cache_key(table_id, partition_id, cluster_id);
if (use_sys_cache(table_id)) {
......@@ -3565,6 +3597,45 @@ int ObPartitionLocationCache::update_location(
return ret;
}
int ObPartitionLocationCache::erase_location(
const uint64_t table_id, const int64_t partition_id, const int64_t cluster_id)
{
int ret = OB_SUCCESS;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (!ObIPartitionTable::is_valid_key(table_id, partition_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), KT(table_id), K(partition_id), K(cluster_id));
} else if (use_sys_cache(table_id)) {
// use_sys_cache() includes user_sys_leader_cache(), and sys cache shouldn't be erased.
} else {
ObLocationCacheKey cache_key(table_id, partition_id, cluster_id);
// try erase user leader cache
if (cluster_id_ == cluster_id) {
LocationInfo leader_info;
int tmp_ret = leader_cache_.get_strong_leader_info(cache_key, leader_info);
if (OB_SUCCESS == tmp_ret) {
leader_info.reset();
tmp_ret = leader_cache_.set_strong_leader_info(cache_key, leader_info, false /*force update*/);
LOG_TRACE("erase user leader cache", KR(tmp_ret), K(cache_key));
}
}
// try erase user location cache
if (OB_FAIL(user_cache_.erase(cache_key))) {
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
LOG_TRACE("user location cache not exist", K(cache_key));
} else {
LOG_WARN("fail to erase user location cache", KR(ret), K(cache_key));
}
} else {
LOG_TRACE("erase user location cache", K(cache_key));
}
}
return ret;
}
int ObPartitionLocationCache::clear_location(
const uint64_t table_id, const int64_t partition_id, const int64_t expire_renew_time, const int64_t cluster_id)
{
......@@ -4407,6 +4478,7 @@ int ObPartitionLocationCache::batch_renew_location(const common::ObIArray<ObLoca
// Step 3 : batch renew
if (OB_SUCC(ret) && locations.count() > 0) {
bool can_erase = false;
ObSEArray<ObPartitionLocation*, UNIQ_TASK_QUEUE_BATCH_EXECUTE_NUM> new_locations;
common::ObTimeoutCtx ctx;
if (OB_FAIL(set_batch_timeout_ctx(locations.count(), renew_type, ctx))) {
......@@ -4459,6 +4531,7 @@ int ObPartitionLocationCache::batch_renew_location(const common::ObIArray<ObLoca
LOG_WARN("fail to batch renew sys table location by rpc", K(tmp_ret), "key_cnt", renew_keys.count());
}
} else {
can_erase = true;
EVENT_INC(LOCATION_CACHE_SQL_RENEW);
}
} else {
......@@ -4482,8 +4555,11 @@ int ObPartitionLocationCache::batch_renew_location(const common::ObIArray<ObLoca
if (OB_ISNULL(new_location)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("location is null", K(ret), K(i));
} else if (OB_FAIL(update_location(
new_location->get_table_id(), new_location->get_partition_id(), cluster_id, *new_location))) {
} else if (OB_FAIL(update_location(new_location->get_table_id(),
new_location->get_partition_id(),
cluster_id,
can_erase,
*new_location))) {
LOG_WARN("fail to update location", K(ret), KPC(new_location));
}
if (OB_SUCC(ret)) {
......@@ -4731,5 +4807,123 @@ int64_t ObPartitionLocationCache::get_primary_cluster_id() const
return cluster_id;
}
int ObPartitionLocationCache::LeaderCacheKeyGetter::operator()(
common::hash::HashMapPair<ObLocationCacheKey, LocationInfo> &entry)
{
int ret = OB_SUCCESS;
const ObLocationCacheKey &key = entry.first;
const uint64_t tenant_id = extract_tenant_id(key.table_id_);
if (OB_INVALID_TENANT_ID == tenant_id_ || tenant_id_ == tenant_id) {
if (OB_FAIL(keys_.push_back(key))) {
LOG_WARN("fail to push back key", KR(ret), K(key));
}
}
return ret;
}
int ObPartitionLocationCache::LocationCacheKeyGetter::operator()(
common::hash::HashMapPair<ObLocationCacheKey, ObPartitionLocation> &entry)
{
int ret = OB_SUCCESS;
const ObLocationCacheKey &key = entry.first;
const uint64_t tenant_id = extract_tenant_id(key.table_id_);
if (OB_INVALID_TENANT_ID == tenant_id_ || tenant_id_ == tenant_id) {
if (OB_FAIL(keys_.push_back(key))) {
LOG_WARN("fail to push back key", KR(ret), K(key));
}
}
return ret;
}
// OB_INVALID_TENANT_ID means flush all tenant's location cache
int ObPartitionLocationCache::flush_cache(const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret), K(tenant_id));
} else {
// 1. flush sys cache, overwrite ret
int64_t start_time = ObTimeUtility::fast_current_time();
LOG_INFO("begin flush sys location cache", K(tenant_id));
LocationCacheKeyGetter location_key_getter(tenant_id);
if (OB_FAIL(sys_cache_.foreach_refactored(location_key_getter))) {
LOG_WARN("fail to get location cache key", KR(ret), K(tenant_id));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < location_key_getter.get_keys().count(); i++) {
const ObLocationCacheKey &key = location_key_getter.get_keys().at(i);
if (OB_FAIL(sys_cache_.erase_refactored(key))) {
if (OB_HASH_NOT_EXIST == ret) {
ret = OB_SUCCESS;
LOG_TRACE("sys cache not exist, just skip", KR(ret), K(key));
} else {
LOG_WARN("fail to erase sys cache", KR(ret), K(key));
}
} else {
LOG_TRACE("erase sys cache", KR(ret), K(key));
}
} // end for
}
LOG_INFO("finish flush sys location cache",
KR(ret),
K(tenant_id),
"cost_ts",
ObTimeUtility::fast_current_time() - start_time);
// 2. flush sys leader cache, overwrite ret
start_time = ObTimeUtility::fast_current_time();
LOG_INFO("begin flush sys leader cache", K(tenant_id));
LeaderCacheKeyGetter leader_key_getter(tenant_id);
if (OB_FAIL(sys_leader_cache_.foreach_refactored(leader_key_getter))) {
LOG_WARN("fail to get leader cache key", KR(ret), K(tenant_id));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < leader_key_getter.get_keys().count(); i++) {
const ObLocationCacheKey &key = leader_key_getter.get_keys().at(i);
if (OB_FAIL(sys_leader_cache_.erase_refactored(key))) {
if (OB_HASH_NOT_EXIST == ret) {
ret = OB_SUCCESS;
LOG_TRACE("sys leader cache not exist, just skip", KR(ret), K(key));
} else {
LOG_WARN("fail to erase sys leader cache", KR(ret), K(key));
}
} else {
LOG_TRACE("erase sys leader cache", KR(ret), K(key));
}
} // end for
}
LOG_INFO("finish flush sys leader cache",
KR(ret),
K(tenant_id),
"cost_ts",
ObTimeUtility::fast_current_time() - start_time);
// 3. flush user cache, overwrite ret
// user cache store in sys tenant, so we always flush all tenant's user location cache.
start_time = ObTimeUtility::fast_current_time();
LOG_INFO("begin flush user location cache", K(tenant_id));
if (OB_FAIL(common::ObKVGlobalCache::get_instance().erase_cache(OB_LOCATION_CACHE_NAME))) {
LOG_WARN("fail to flush user location cache", KR(ret), K(tenant_id));
}
LOG_INFO("finish flush user location cache",
KR(ret),
K(tenant_id),
"cost_ts",
ObTimeUtility::fast_current_time() - start_time);
// 4. flush user leader cache, overwrite ret
start_time = ObTimeUtility::fast_current_time();
LOG_INFO("begin flush user leader cache", K(tenant_id));
if (OB_FAIL(leader_cache_.flush_cache(tenant_id))) {
LOG_WARN("fail to flush user leader cache", KR(ret), K(tenant_id));
}
LOG_INFO("finish flush user leader cache",
KR(ret),
K(tenant_id),
"cost_ts",
ObTimeUtility::fast_current_time() - start_time);
}
return ret;
}
} // end namespace share
} // end namespace oceanbase
......@@ -81,6 +81,11 @@ public:
{}
public:
void reset()
{
server_.reset();
renew_ts_ = 0;
}
bool is_valid() const
{
return server_.is_valid() && renew_ts_ > 0;
......@@ -614,6 +619,7 @@ public:
{}
int get_strong_leader_info(const ObLocationCacheKey& key, LocationInfo& location_info);
int set_strong_leader_info(const ObLocationCacheKey& key, const LocationInfo& location_info, bool force_update);
int flush_cache(const uint64_t tenant_id);
private:
static const int64_t CACHE_NUM = 10000;
......@@ -694,6 +700,48 @@ public:
};
explicit ObPartitionLocationCache(ObILocationFetcher& location_fetcher);
class LeaderCacheKeyGetter {
public:
LeaderCacheKeyGetter() : tenant_id_(common::OB_INVALID_TENANT_ID), keys_()
{}
LeaderCacheKeyGetter(const uint64_t tenant_id) : tenant_id_(tenant_id), keys_()
{}
~LeaderCacheKeyGetter()
{}
int operator()(common::hash::HashMapPair<ObLocationCacheKey, LocationInfo> &entry);
const common::ObIArray<ObLocationCacheKey> &get_keys() const
{
return keys_;
}
private:
// OB_INVALID_TENANT_ID means get all tenant's location key
uint64_t tenant_id_;
common::ObArray<ObLocationCacheKey> keys_;
DISALLOW_COPY_AND_ASSIGN(LeaderCacheKeyGetter);
};
class LocationCacheKeyGetter {
public:
LocationCacheKeyGetter() : tenant_id_(common::OB_INVALID_TENANT_ID), keys_()
{}
LocationCacheKeyGetter(const uint64_t tenant_id) : tenant_id_(tenant_id), keys_()
{}
~LocationCacheKeyGetter()
{}
int operator()(common::hash::HashMapPair<ObLocationCacheKey, ObPartitionLocation> &entry);
const common::ObIArray<ObLocationCacheKey> &get_keys() const
{
return keys_;
}
private:
// OB_INVALID_TENANT_ID means get all tenant's location key
uint64_t tenant_id_;
common::ObArray<ObLocationCacheKey> keys_;
DISALLOW_COPY_AND_ASSIGN(LocationCacheKeyGetter);
};
virtual ~ObPartitionLocationCache();
int init(share::schema::ObMultiVersionSchemaService& schema_service, common::ObServerConfig& config,
......@@ -784,6 +832,8 @@ public:
const ObPartitionLocation& location, char* buf, const int64_t buf_size, ObLocationCacheValue& cache_value);
static const int64_t OB_MAX_LOCATION_SERIALIZATION_SIZE = common::OB_MALLOC_BIG_BLOCK_SIZE;
int flush_cache(const uint64_t tenant_id);
private:
int remote_get(const common::ObPartitionKey& pkey, ObPartitionLocation& location);
......@@ -799,6 +849,7 @@ private:
/*-----batch async renew location end -----*/
private:
static const int64_t OB_SYS_LOCATION_CACHE_BUCKET_NUM = 512;
// default mode is LatchReadWriteDefendMode
typedef common::hash::ObHashMap<ObLocationCacheKey, ObPartitionLocation> NoSwapCache;
typedef common::hash::ObHashMap<ObLocationCacheKey, LocationInfo> NoSwapLeaderCache;
typedef common::ObKVCache<ObLocationCacheKey, ObLocationCacheValue> KVCache;
......@@ -829,7 +880,8 @@ private:
// update location in cache
int update_location(const uint64_t table_id, const int64_t partition_id, const int64_t cluster_id,
const ObPartitionLocation& location);
const bool can_erase, const ObPartitionLocation& location);
int erase_location(const uint64_t table_id, const int64_t partition_id, const int64_t cluster_id);
// clear location in cache
int clear_location(
const uint64_t table_id, const int64_t partiton_id, const int64_t expire_renew_time, const int64_t cluster_id);
......
......@@ -177,13 +177,33 @@ int ObFlushCacheExecutor::execute(ObExecContext& ctx, ObFlushCacheStmt& stmt)
case CACHE_TYPE_BLOCK:
case CACHE_TYPE_ROW:
case CACHE_TYPE_BLOOM_FILTER:
case CACHE_TYPE_LOCATION:
case CACHE_TYPE_CLOG:
case CACHE_TYPE_ILOG:
case CACHE_TYPE_SCHEMA: {
ret = OB_NOT_SUPPORTED;
LOG_WARN("cache type not supported flush", "type", stmt.flush_cache_arg_.cache_type_, K(ret));
} break;
case CACHE_TYPE_LOCATION: {
share::ObPartitionLocationCache *location_cache = GCTX.location_cache_;
if (OB_ISNULL(location_cache)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("location cache ptr is null", KR(ret));
} else if (0 == tenant_num) {
if (OB_FAIL(location_cache->flush_cache(OB_INVALID_TENANT_ID))) {
LOG_WARN("fail to flush all location cache", KR(ret));
}
} else {
int64_t tenant_num = stmt.flush_cache_arg_.tenant_ids_.count();
for (int64_t i = 0; i < tenant_num; i++) { // ingore error
const uint64_t tenant_id = stmt.flush_cache_arg_.tenant_ids_.at(i);
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = location_cache->flush_cache(tenant_id))) {
LOG_WARN("fail to flush tenant's cache", KR(ret), K(tenant_id));
}
ret = OB_SUCC(ret) ? tmp_ret : ret;
} // end for
}
} break;
default: {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid cache type", "type", stmt.flush_cache_arg_.cache_type_);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册