提交 bc1f0c4d 编写于 作者: O obdev 提交者: wangzelin.wzl

[CP] [CP] [CP] support sql_id level cache evict

上级 92f008a9
......@@ -996,7 +996,16 @@ int ObFlushCacheP::process()
} else if (NULL == (pcm = gctx_.sql_engine_->get_plan_cache_manager())) {
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("invalid argument", K(pcm), K(ret));
} else if (arg_.is_all_tenant_) { // flush all tenant cache
} else if (arg_.is_fine_grained_) { // fine-grained plan cache evict
if (arg_.db_ids_.count() == 0) {
uint64_t db_id = OB_INVALID_ID;
ret = pcm->flush_plan_cache_by_sql_id(arg_.tenant_id_, db_id, arg_.sql_id_);
} else {
for (uint64_t i=0; OB_SUCC(ret) && i<arg_.db_ids_.count(); i++) {
ret = pcm->flush_plan_cache_by_sql_id(arg_.tenant_id_, arg_.db_ids_.at(i), arg_.sql_id_);
}
}
} else if (arg_.is_all_tenant_) { //flush all tenant cache
ret = pcm->flush_all_plan_cache();
} else { // flush appointed tenant cache
ret = pcm->flush_plan_cache(arg_.tenant_id_);
......
......@@ -3137,6 +3137,8 @@ int ObAdminFlushCache::execute(const obrpc::ObAdminFlushCacheArg& arg)
int64_t tenant_num = arg.tenant_ids_.count();
ObSEArray<ObAddr, 8> server_list;
ObFlushCacheArg fc_arg;
// fine-grained plan evict only will pass this way.
// This because fine-grained plan evict must specify tenant
// if tenant num is 0, flush all tenant, else, flush appointed tenant
if (tenant_num != 0) { // flush appointed tenant
for (int64_t i = 0; OB_SUCC(ret) && i < tenant_num; ++i) {
......@@ -3147,6 +3149,16 @@ int ObAdminFlushCache::execute(const obrpc::ObAdminFlushCacheArg& arg)
// call tenant servers;
fc_arg.is_all_tenant_ = false;
fc_arg.cache_type_ = arg.cache_type_;
// fine-grained plan evict args
if (arg.is_fine_grained_) {
fc_arg.sql_id_ = arg.sql_id_;
fc_arg.is_fine_grained_ = arg.is_fine_grained_;
for(int64_t j=0; OB_SUCC(ret) && j<arg.db_ids_.count(); j++) {
if (OB_FAIL(fc_arg.push_database(arg.db_ids_.at(j)))) {
LOG_WARN("fail to add db ids", K(ret));
}
}
}
for (int64_t j = 0; OB_SUCC(ret) && j < server_list.count(); ++j) {
fc_arg.tenant_id_ = arg.tenant_ids_.at(i);
LOG_INFO("flush server cache", K(fc_arg), K(server_list.at(j)));
......
......@@ -3349,9 +3349,9 @@ int ObUpgradeJobArg::assign(const ObUpgradeJobArg& other)
}
OB_SERIALIZE_MEMBER(ObUpgradeJobArg, action_, version_);
OB_SERIALIZE_MEMBER(ObAdminFlushCacheArg, tenant_ids_, cache_type_);
OB_SERIALIZE_MEMBER(ObAdminFlushCacheArg, tenant_ids_, cache_type_, db_ids_, sql_id_, is_fine_grained_);
OB_SERIALIZE_MEMBER(ObFlushCacheArg, is_all_tenant_, tenant_id_, cache_type_);
OB_SERIALIZE_MEMBER(ObFlushCacheArg, is_all_tenant_, tenant_id_, cache_type_, db_ids_, sql_id_, is_fine_grained_);
OB_SERIALIZE_MEMBER(ObAdminLoadBaselineArg, tenant_ids_, sql_id_, plan_hash_value_, fixed_, enabled_);
......
......@@ -4720,22 +4720,25 @@ struct ObAdminFlushCacheArg {
OB_UNIS_VERSION(1);
public:
ObAdminFlushCacheArg() : cache_type_(CACHE_TYPE_INVALID)
{}
virtual ~ObAdminFlushCacheArg()
{}
bool is_valid() const
ObAdminFlushCacheArg() :
cache_type_(CACHE_TYPE_INVALID),
is_fine_grained_(false)
{
return cache_type_ > CACHE_TYPE_INVALID && cache_type_ < CACHE_TYPE_MAX;
}
int push_tenant(uint64_t tenant_id)
virtual ~ObAdminFlushCacheArg() {}
bool is_valid() const
{
return tenant_ids_.push_back(tenant_id);
return cache_type_ > CACHE_TYPE_INVALID && cache_type_ < CACHE_TYPE_MAX;
}
TO_STRING_KV(K_(tenant_ids), K_(cache_type));
int push_tenant(uint64_t tenant_id) { return tenant_ids_.push_back(tenant_id); }
int push_database(uint64_t db_id) { return db_ids_.push_back(db_id); }
TO_STRING_KV(K_(tenant_ids), K_(cache_type), K_(db_ids), K_(sql_id), K_(is_fine_grained));
common::ObSEArray<uint64_t, 8> tenant_ids_;
ObCacheType cache_type_;
common::ObSEArray<uint64_t, 8> db_ids_;
common::ObString sql_id_;
bool is_fine_grained_;
};
struct ObAdminMigrateUnitArg {
......@@ -5456,19 +5459,26 @@ struct ObFlushCacheArg {
OB_UNIS_VERSION(1);
public:
ObFlushCacheArg() : is_all_tenant_(false), tenant_id_(common::OB_INVALID_TENANT_ID), cache_type_(CACHE_TYPE_INVALID)
{}
ObFlushCacheArg() :
is_all_tenant_(false),
tenant_id_(common::OB_INVALID_TENANT_ID),
cache_type_(CACHE_TYPE_INVALID),
is_fine_grained_(false){};
virtual ~ObFlushCacheArg()
{}
bool is_valid() const
{
return cache_type_ > CACHE_TYPE_INVALID && cache_type_ < CACHE_TYPE_MAX;
}
TO_STRING_KV(K(is_all_tenant_), K_(tenant_id), K_(cache_type));
int push_database(uint64_t db_id) { return db_ids_.push_back(db_id); }
TO_STRING_KV(K(is_all_tenant_), K_(tenant_id), K_(cache_type), K_(db_ids), K_(sql_id), K_(is_fine_grained));
bool is_all_tenant_;
uint64_t tenant_id_;
ObCacheType cache_type_;
common::ObSEArray<uint64_t, 8> db_ids_;
common::ObString sql_id_;
bool is_fine_grained_;
};
struct ObGetAllSchemaArg {
......
......@@ -84,13 +84,34 @@ int ObFlushCacheExecutor::execute(ObExecContext& ctx, ObFlushCacheStmt& stmt)
int ret = OB_SUCCESS;
if (!stmt.is_global_) { // flush local
int64_t tenant_num = stmt.flush_cache_arg_.tenant_ids_.count();
int64_t db_num = stmt.flush_cache_arg_.db_ids_.count();
common::ObString sql_id = stmt.flush_cache_arg_.sql_id_;
switch (stmt.flush_cache_arg_.cache_type_) {
case CACHE_TYPE_PLAN: {
if (OB_ISNULL(ctx.get_plan_cache_manager())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("plan cache manager is null");
} else if (0 == tenant_num) {
ret = ctx.get_plan_cache_manager()->flush_all_plan_cache();
} else if (stmt.flush_cache_arg_.is_fine_grained_) {
// purge in sql_id level, aka. fine-grained plan evict
// we assume tenant_list must not be empty and this will be checked in resolve phase
if (0 == tenant_num) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected tenant_list in fine-grained plan evict", K(tenant_num));
} else {
for (int64_t i = 0; i < tenant_num; i++) { // ignore ret
int64_t t_id = stmt.flush_cache_arg_.tenant_ids_.at(i);
if (db_num == 0) { // not specified db_name, evcit all dbs
ret = GCTX.sql_engine_->get_plan_cache_manager()->flush_plan_cache_by_sql_id(t_id, OB_INVALID_ID, sql_id);
} else { // evict db by db
for(int64_t j = 0; j < db_num; j++) { // ignore ret
ret = GCTX.sql_engine_->get_plan_cache_manager()->flush_plan_cache_by_sql_id(
t_id, stmt.flush_cache_arg_.db_ids_.at(j), sql_id);
}
}
}
}
} else if (0 == tenant_num) { // purge in tenant level, aka. coarse-grained plan evict
ret = GCTX.sql_engine_->get_plan_cache_manager()->flush_all_plan_cache();
} else {
for (int64_t i = 0; i < tenant_num; ++i) { // ignore ret
ret = ctx.get_plan_cache_manager()->flush_plan_cache(stmt.flush_cache_arg_.tenant_ids_.at(i));
......
......@@ -82,6 +82,7 @@ class ObPxSqcHandler;
class ObOpSpec;
class ObOperator;
class ObOpInput;
class ObSql;
class ObEvalCtx;
typedef common::ObArray<const common::ObIArray<int64_t>*> ObRowIdListArray;
// Physical operator kit: operator specification, operator, operator input
......
......@@ -2676,6 +2676,9 @@ int ObSql::pc_get_plan(ObPlanCacheCtx &pc_ctx, ObPhysicalPlan *&plan, int &get_p
NG_TRACE(cache_get_plan_begin);
ObPlanCache *plan_cache = NULL;
ObSQLSessionInfo *session = pc_ctx.sql_ctx_.session_info_;
if (pc_ctx.sql_ctx_.is_remote_sql_) {
pc_ctx.fp_result_.pc_key_.key_id_ = 0;
}
if (OB_ISNULL(session) || OB_ISNULL(plan_cache = session->get_plan_cache())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Invalid plan cache", K(ret), K(session), K(plan_cache));
......@@ -2684,6 +2687,10 @@ int ObSql::pc_get_plan(ObPlanCacheCtx &pc_ctx, ObPhysicalPlan *&plan, int &get_p
OB_ERR_PROXY_REROUTE == ret || OB_BATCHED_MULTI_STMT_ROLLBACK == ret) {
/*do nothing*/
} else if (!pc_ctx.is_ps_mode_ && OB_PC_LOCK_CONFLICT == ret && !session->is_inner()) {
if (pc_ctx.sql_ctx_.is_remote_sql_) {
get_plan_err = ret;
ret = OB_SUCCESS;
}
} else {
get_plan_err = ret;
ret = OB_SUCCESS;
......@@ -2960,6 +2967,9 @@ int ObSql::pc_add_plan(
LOG_WARN("Failed copy field to pplan", K(ret));
} else {
phy_plan->set_outline_state(outline_state);
if (pc_ctx.sql_ctx_.is_remote_sql_) {
pc_ctx.fp_result_.pc_key_.key_id_ = 0;
}
if (pc_ctx.is_ps_mode_) {
// remote SQL enters the plan for the second time, and saves raw_sql as pc_key in the plan cache.
// then use the ps interface to directly use the parameterized sql as the key to check the plan cache,
......
......@@ -1105,6 +1105,7 @@ typedef enum ObItemType
T_ALTER_DATABASE,
T_USE_DATABASE,
T_DATABASE_OPTION_LIST,
T_DATABASE_LIST,
T_CREATE_TENANT,
T_DROP_TENANT,
T_MODIFY_TENANT,
......
......@@ -307,7 +307,7 @@ END_P SET_VAR DELIMITER
%type <node> create_table_stmt create_table_like_stmt opt_table_option_list table_option_list table_option table_option_list_space_seperated create_function_stmt drop_function_stmt parallel_option
%type <node> opt_force
%type <node> create_database_stmt drop_database_stmt alter_database_stmt use_database_stmt
%type <node> opt_database_name database_option database_option_list opt_database_option_list database_factor
%type <node> opt_database_name database_option database_option_list opt_database_option_list database_factor databases_expr opt_databases
%type <node> create_tenant_stmt opt_tenant_option_list alter_tenant_stmt drop_tenant_stmt
%type <node> create_restore_point_stmt drop_restore_point_stmt
%type <node> create_resource_stmt drop_resource_stmt alter_resource_stmt
......@@ -3634,6 +3634,24 @@ database_option
}
;
databases_expr:
DATABASES opt_equal_mark STRING_VALUE
{
(void)($2);
malloc_non_terminal_node($$, result->malloc_pool_, T_DATABASE_LIST, 1, $3);
};
opt_databases:
databases_expr
{
$$ = $1;
}
| /*EMPTY*/
{
$$ = NULL;
}
;
charset_key:
CHARSET
{
......@@ -11446,14 +11464,16 @@ ALTER SYSTEM BOOTSTRAP server_info_list
malloc_non_terminal_node($$, result->malloc_pool_, T_BOOTSTRAP, 3, server_list, NULL, NULL);
}
|
ALTER SYSTEM FLUSH cache_type CACHE opt_tenant_list flush_scope
ALTER SYSTEM FLUSH cache_type CACHE opt_sql_id opt_databases opt_tenant_list flush_scope
{
malloc_non_terminal_node($$, result->malloc_pool_, T_FLUSH_CACHE, 3, $4, $6, $7);
// system tenant use only.
malloc_non_terminal_node($$, result->malloc_pool_, T_FLUSH_CACHE, 5, $4, $6, $7, $8, $9);
}
|
// this just is a Syntactic sugar, only used to be compatible to plan cache's Grammar
ALTER SYSTEM FLUSH SQL cache_type opt_tenant_list flush_scope
{
malloc_non_terminal_node($$, result->malloc_pool_, T_FLUSH_CACHE, 3, $5, $6, $7);
malloc_non_terminal_node($$, result->malloc_pool_, T_FLUSH_CACHE, 5, $5, NULL, NULL, $6, $7);
}
|
ALTER SYSTEM FLUSH KVCACHE opt_tenant_name opt_cache_name
......
......@@ -68,15 +68,23 @@ void ObPCVSet::destroy()
pcv = NULL;
}
}
// free key.
if (NULL != pc_alloc_) {
//free key.
pc_key_.destory(*pc_alloc_);
pc_key_.reset();
if (NULL != sql_.ptr()) {
pc_alloc_->free(sql_.ptr());
}
// free sql_id list
for (int64_t i = 0; i < sql_ids_.count(); i++) {
if (NULL != sql_ids_[i].ptr()) {
pc_alloc_->free(sql_ids_[i].ptr());
}
}
}
sql_ids_.reset();
sql_.reset();
plan_cache_ = NULL;
pc_alloc_ = NULL;
......@@ -226,6 +234,7 @@ int ObPCVSet::add_cache_obj(ObCacheObject* cache_obj, ObPlanCacheCtx& pc_ctx)
LOG_WARN("fail to match pcv in pcv_set", K(ret));
} else if (is_same) {
is_new = false;
LOG_INFO("has identical pcv", K(is_same), K(pcv));
if (OB_FAIL(pcv->add_plan(*cache_obj, schema_array, pc_ctx))) {
if (OB_SQL_PC_PLAN_DUPLICATE == ret || is_not_supported_err(ret)) {
LOG_DEBUG("fail to add plan to pcv", K(ret));
......@@ -237,7 +246,6 @@ int ObPCVSet::add_cache_obj(ObCacheObject* cache_obj, ObPlanCacheCtx& pc_ctx)
}
} // for end
}
if (OB_SUCC(ret) && is_new) {
ObPlanCacheValue* pcv = NULL;
if (OB_FAIL(create_pcv_and_add_plan(cache_obj, pc_ctx, schema_array, pcv))) {
......@@ -277,8 +285,11 @@ int ObPCVSet::create_pcv_and_add_plan(ObCacheObject* cache_obj, ObPlanCacheCtx&
{
int ret = OB_SUCCESS;
new_pcv = nullptr;
// create pcv and init
if (OB_ISNULL(cache_obj) || OB_ISNULL(pc_ctx.sql_ctx_.schema_guard_)) {
common::ObString sql_id;
common::ObString sql_id_org(common::OB_MAX_SQL_ID_LENGTH, (const char*)&pc_ctx.sql_ctx_.sql_id_);
//create pcv and init
if (OB_ISNULL(cache_obj) ||
OB_ISNULL(pc_ctx.sql_ctx_.schema_guard_)) {
ret = OB_INVALID_ARGUMENT;
SQL_PC_LOG(WARN, "invalid argument", K(ret));
} else if (OB_FAIL(create_new_pcv(new_pcv))) {
......@@ -288,6 +299,10 @@ int ObPCVSet::create_pcv_and_add_plan(ObCacheObject* cache_obj, ObPlanCacheCtx&
LOG_WARN("unexpected null for new_pcv", K(new_pcv));
} else if (OB_FAIL(new_pcv->init(this, cache_obj, pc_ctx))) {
SQL_PC_LOG(WARN, "failed to init plan cache value");
} else if (OB_FAIL(ob_write_string(*pc_alloc_, sql_id_org, sql_id))) {
SQL_PC_LOG(WARN, "failed to deep copy sql_id_", K(sql_id_org), K(ret));
} else if (OB_FAIL(push_sql_id(sql_id))) {
SQL_PC_LOG(WARN, "failed to push sql_id_", K(ret));
} else {
// do nothing
}
......
......@@ -75,10 +75,12 @@ public:
int init(const ObPlanCacheCtx& pc_ctx, const ObCacheObject* cache_obj);
void destroy();
// get plan from pcv_set
int get_plan(ObPlanCacheCtx& pc_ctx, ObCacheObject*& plan);
// add plan to pcv_set
int add_cache_obj(ObCacheObject* cache_obj, ObPlanCacheCtx& pc_ctx);
//get plan from pcv_set
int get_plan(ObPlanCacheCtx &pc_ctx, ObCacheObject *&plan);
//add plan to pcv_set
int add_cache_obj(ObCacheObject *cache_obj, ObPlanCacheCtx &pc_ctx);
common::ObIArray<common::ObString> &get_sql_id() { return sql_ids_; }
int push_sql_id(common::ObString sql_id) { return sql_ids_.push_back(sql_id); }
int64_t inc_ref_count(const CacheRefHandleID ref_handle);
int64_t dec_ref_count(const CacheRefHandleID ref_handle);
......@@ -176,6 +178,7 @@ private:
int64_t min_merged_version_;
int64_t min_cluster_version_;
int64_t plan_num_;
common::ObSEArray<common::ObString, 4> sql_ids_;
bool need_check_gen_tbl_col_;
common::ObFixedArray<PCColStruct, common::ObIAllocator> col_field_arr_;
......
......@@ -101,10 +101,92 @@ struct ObGetAllSqlIdOp {
const CacheRefHandleID ref_handle_;
};
struct ObGetAllPLIdOp {
explicit ObGetAllPLIdOp(common::ObIArray<PCKeyValue> *key_array, const CacheRefHandleID ref_handle)
: key_array_(key_array), ref_handle_(ref_handle)
struct ObGetPcvSetBySQLIDOp
{
explicit ObGetPcvSetBySQLIDOp(uint64_t db_id, common::ObString sql_id,
common::ObIArray<PCKeyValue> *key_array, const CacheRefHandleID ref_handle)
: db_id_(db_id),
sql_id_(sql_id),
key_array_(key_array),
ref_handle_(ref_handle)
{
}
int operator()(common::hash::HashMapPair<ObPlanCacheKey, ObPCVSet *> &entry)
{
int ret = common::OB_SUCCESS;
bool contains = false;
if (OB_ISNULL(key_array_) || OB_ISNULL(entry.second)) {
ret = common::OB_INVALID_ARGUMENT;
SQL_PC_LOG(WARN, "invalid argument", K(key_array_), K(entry.second), K(ret));
} else if ( NS_INVALID == entry.first.namespace_) {
// do nothing
} else if (db_id_ != common::OB_INVALID_ID && db_id_ != entry.first.db_id_) {
// skip entry that has non-matched db_id
} else if (!contain_sql_id(entry.second->get_sql_id())) {
// skip entry which not contains same sql_id
} else if (OB_FAIL(key_array_->push_back(ObPCKeyValue(entry.first, entry.second)))) {
SQL_PC_LOG(WARN, "fail to push back key", K(ret));
} else {
entry.second->inc_ref_count(ref_handle_);
}
return ret;
}
bool contain_sql_id(common::ObIArray<common::ObString> &sql_ids){
bool contains = false;
for (int64_t i = 0; !contains && i < sql_ids.count(); i++) {
if (sql_ids.at(i) == sql_id_) {
contains = true;
}
}
return contains;
}
uint64_t db_id_;
common::ObString sql_id_;
common::ObIArray<PCKeyValue> *key_array_;
const CacheRefHandleID ref_handle_;
};
struct ObGetTableIdOp
{
explicit ObGetTableIdOp(uint64_t table_id)
: table_id_(table_id)
{}
int operator()(common::hash::HashMapPair<ObCacheObjID, ObCacheObject *> &entry)
{
int ret = common::OB_SUCCESS;
ObPhysicalPlan *plan = NULL;
int64_t version = -1;
if (OB_ISNULL(entry.second)) {
ret = common::OB_NOT_INIT;
SQL_PC_LOG(WARN, "invalid argument", K(ret));
} else if (!entry.second->is_sql_crsr()) {
// not sql plan
// do nothing
} else if (OB_ISNULL(plan = dynamic_cast<ObPhysicalPlan *>(entry.second))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null plan", K(ret), K(plan));
} else if (entry.second->get_base_table_version(table_id_, version)) {
LOG_WARN("failed to get base table version", K(ret));
} else if (version > 0) {
static_cast<ObPhysicalPlan *>(entry.second)->set_is_expired(true);
}
return ret;
}
const uint64_t table_id_;
};
struct ObGetAllPLIdOp
{
explicit ObGetAllPLIdOp(common::ObIArray<PCKeyValue> *key_array,
const CacheRefHandleID ref_handle)
: key_array_(key_array), ref_handle_(ref_handle) {}
int operator()(common::hash::HashMapPair<ObPlanCacheKey, ObPCVSet *> &entry)
{
int ret = OB_SUCCESS;
......@@ -515,9 +597,10 @@ int ObPlanCache::add_plan(ObPhysicalPlan *plan, ObPlanCacheCtx &pc_ctx)
SQL_PC_LOG(WARN, "invalid physical plan", K(ret));
} else if (is_reach_memory_limit()) {
ret = OB_REACH_MEMORY_LIMIT;
if (REACH_TIME_INTERVAL(1000000)) {
SQL_PC_LOG(
ERROR, "plan cache memory used reach limit", K_(tenant_id), K(get_mem_hold()), K(get_mem_limit()), K(ret));
LOG_INFO("reach memory limit.", K(ret), K(get_mem_hold()), K(get_mem_limit()));
if (REACH_TIME_INTERVAL(1000000)) { //1s, when memory reach out of limit, set 1s as interval to print log
SQL_PC_LOG(ERROR, "plan cache memory used reach limit",
K_(tenant_id), K(get_mem_hold()), K(get_mem_limit()), K(ret));
}
} else if (plan->get_mem_size() >= get_mem_high()) {
// plan mem is too big, do not add plan
......@@ -889,6 +972,33 @@ int ObPlanCache::cache_evict_all_pl()
return ret;
}
// delete plan by sql_id
int ObPlanCache::cache_evict_plan_by_sql_id(uint64_t db_id, common::ObString sql_id)
{
int ret = OB_SUCCESS;
ObGlobalReqTimeService::check_req_timeinfo();
SQL_PC_LOG(DEBUG, "cache evict plan by sql id start");
PCKeyValueArray to_evict_keys;
ObGetPcvSetBySQLIDOp get_ids_op(db_id, sql_id, &to_evict_keys, PCV_GET_PLAN_KEY_HANDLE);
if (OB_FAIL(sql_pcvs_map_.foreach_refactored(get_ids_op))) {
SQL_PC_LOG(WARN, "fail to get all sql_ids in id2value_map", K(ret));
} else if (OB_FAIL(remove_pcv_sets(to_evict_keys))) {
SQL_PC_LOG(WARN, "fail to remove pcv set", K(ret));
}
//decrease each pcv_set's reference count
int64_t N = to_evict_keys.count();
for (int64_t i = 0; OB_SUCC(ret) && i < N; i++) {
if (NULL != to_evict_keys.at(i).pcv_set_) {
to_evict_keys.at(i).pcv_set_->dec_ref_count(PCV_GET_PLAN_KEY_HANDLE);
}
}
SQL_PC_LOG(DEBUG, "cache evict plan by sql id end");
return ret;
}
// rule for evict plan
// 1. calc evict_num : (mem_used - mem_lwm) / (mem_used / cache_value_count)
// 2. get evict_sql_id from calc_cache_evict_keys
// 3. evict value
......@@ -1145,6 +1255,7 @@ int ObPlanCache::add_cache_obj_stat(ObPlanCacheCtx &pc_ctx, ObCacheObject *cache
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null plan", K(ret), K(plan));
} else {
plan->stat_.bl_info_.key_.db_id_=pc_ctx.bl_key_.db_id_;
plan->stat_.plan_id_ = plan->get_plan_id();
plan->stat_.bl_info_.plan_hash_value_ = plan->get_signature();
plan->stat_.gen_time_ = ObTimeUtility::current_time();
......
......@@ -197,7 +197,8 @@ public:
int cache_evict_all_pl();
// evict plan, adjust mem between hwm and lwm
int cache_evict();
// evict plan whose merged version is expired
int cache_evict_plan_by_sql_id(uint64_t db_id, common::ObString sql_id);
//evict plan whose merged version is expired
int evict_expired_plan();
bool is_valid()
{
......
......@@ -445,6 +445,41 @@ void ObPlanCacheManager::ObPlanCacheEliminationTask::run_free_cache_obj_task()
}
}
int ObPlanCacheManager::flush_plan_cache_by_sql_id(uint64_t tenant_id,
uint64_t db_id,
common::ObString sql_id) {
int ret = OB_SUCCESS;
observer::ObReqTimeGuard req_timeinfo_guard;
ObPlanCache *plan_cache = get_plan_cache(tenant_id);
if (NULL != plan_cache) {
if (OB_FAIL(plan_cache->cache_evict_plan_by_sql_id(db_id, sql_id))) {
SQL_PC_LOG(ERROR, "Plan cache evict failed, please check", K(ret));
}
ObArray<DeletedCacheObjInfo> deleted_objs;
int64_t safe_timestamp = INT64_MAX;
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(observer::ObGlobalReqTimeService::get_instance()
.get_global_safe_timestamp(safe_timestamp))) {
SQL_PC_LOG(ERROR, "failed to get global safe timestamp", K(ret));
} else if (OB_FAIL(plan_cache->dump_deleted_objs<DUMP_SQL>(deleted_objs, safe_timestamp))) {
SQL_PC_LOG(WARN, "failed to get deleted sql objs", K(ret));
} else {
ObCacheObject *to_del_obj = NULL;
LOG_INFO("Deleted Cache Objs", K(deleted_objs));
for (int64_t i = 0; i < deleted_objs.count(); i++) { // ignore error code and continue
if (OB_FAIL(ObCacheObjectFactory::destroy_cache_obj(true,
deleted_objs.at(i).obj_id_,
plan_cache))) {
LOG_WARN("failed to destroy cache obj", K(ret));
}
}
}
plan_cache->dec_ref_count();
}
return ret;
}
int ObPlanCacheManager::flush_all_plan_cache()
{
int ret = OB_SUCCESS;
......
......@@ -87,6 +87,7 @@ public:
int revert_ps_cache(const uint64_t& tenant_id);
int flush_all_plan_cache();
int flush_plan_cache(const uint64_t tenant_id);
int flush_plan_cache_by_sql_id(uint64_t tenant_id, uint64_t db_id, common::ObString sql_id);
int flush_all_pl_cache();
int flush_pl_cache(const uint64_t tenant_id);
int flush_all_ps_cache();
......
......@@ -1374,6 +1374,7 @@ int get_sys_tenant_alter_system_priv(
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Basic stmt should be not be NULL", K(ret));
} else if (OB_SYS_TENANT_ID != session_priv.tenant_id_ &&
stmt::T_FLUSH_CACHE != basic_stmt->get_stmt_type() &&
stmt::T_ALTER_SYSTEM_SET_PARAMETER != basic_stmt->get_stmt_type()) {
ret = OB_ERR_NO_PRIVILEGE;
LOG_WARN("Only sys tenant can do this operation", K(ret), "stmt type", basic_stmt->get_stmt_type());
......
......@@ -551,14 +551,29 @@ int ObFreezeResolver::resolve(const ParseNode& parse_tree)
return ret;
}
int ObFlushCacheResolver::resolve(const ParseNode& parse_tree)
//
// This node has five children_ and they are following:
// cache_type_: parse_tree.children_[0]
// opt_sql_id: parse_tree.children_[1]
// opt_databases: parse_tree.children_[2]
// opt_tenant_list: parse_tree.children_[3]
// flush_scope: parse_tree.children_[4]
//
int ObFlushCacheResolver::resolve(const ParseNode &parse_tree)
{
int ret = OB_SUCCESS;
ObFlushCacheStmt* stmt = NULL;
if (OB_UNLIKELY(T_FLUSH_CACHE != parse_tree.type_ || parse_tree.num_child_ != 3)) {
ObFlushCacheStmt *stmt = NULL;
ObSQLSessionInfo* sess = params_.session_info_;
if (OB_ISNULL(sess)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid argument", "type", get_type_name(parse_tree.type_), "child_num", parse_tree.num_child_);
} else if (NULL == parse_tree.children_[0] || NULL == parse_tree.children_[2]) {
SERVER_LOG(WARN, "invalid session");
} else if (OB_UNLIKELY(T_FLUSH_CACHE != parse_tree.type_ || parse_tree.num_child_ != 5)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid argument",
"type", get_type_name(parse_tree.type_),
"child_num", parse_tree.num_child_);
} else if (NULL == parse_tree.children_[0]
|| NULL == parse_tree.children_[4]) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid parse tree", K(ret));
} else if (NULL == (stmt = create_stmt<ObFlushCacheStmt>())) {
......@@ -566,14 +581,53 @@ int ObFlushCacheResolver::resolve(const ParseNode& parse_tree)
LOG_ERROR("create ObFlushCacheStmt failed");
} else {
ObSchemaGetterGuard schema_guard;
// first child: resolve cache type
stmt->flush_cache_arg_.cache_type_ = (ObCacheType)parse_tree.children_[0]->value_;
stmt->is_global_ = parse_tree.children_[2]->value_;
ParseNode* t_node = parse_tree.children_[1];
if (NULL == t_node) { // tenant list is empty
// second child: resolve sql_id
ParseNode *sql_id_node = parse_tree.children_[1];
// for adds database id
// third child: resolve db_list
ParseNode *db_node = parse_tree.children_[2];
// for adds tenant ids
// fourth child: resolve tenant list
ParseNode *t_node = parse_tree.children_[3];
// fivth child: resolve application fields
stmt->is_global_ = parse_tree.children_[4]->value_;
// whether is coarse granularity plan cache evict.
// tenant level(true) / pcv_set level(false)
bool is_coarse_granularity = true;
ObSEArray<common::ObString, 8> db_name_list;
// sql_id
if (OB_FAIL(ret)) {
// do nothing
} else if (NULL == t_node->children_) {
ret = OB_ERR_UNEXPECTED;
SERVER_LOG(WARN, "invalid argument", K(ret));
} else if (OB_ISNULL(sql_id_node)) {
// do nothing
// currently, only support plan cache's fine-grained cache evict
} else if (stmt->flush_cache_arg_.cache_type_ != CACHE_TYPE_PLAN) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("only support plan cache's fine-grained cache evict", K(stmt->flush_cache_arg_.cache_type_), K(ret));
} else if (lib::is_oracle_mode()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not supported plan cache's fine-grained cache evict in oracle mode", K(ret));
} else if (OB_ISNULL(sql_id_node->children_)
|| OB_ISNULL(sql_id_node->children_[0])
|| T_SQL_ID != sql_id_node->type_) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret));
} else if (sql_id_node->children_[0]->str_len_ > (OB_MAX_SQL_ID_LENGTH+1)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret));
} else {
stmt->flush_cache_arg_.sql_id_.assign_ptr(
sql_id_node->children_[0]->str_value_,
static_cast<ObString::obstr_size_t>(sql_id_node->children_[0]->str_len_));
stmt->flush_cache_arg_.is_fine_grained_ = true;
}
// retrive schema guard
if (OB_FAIL(ret)) {
} else if (OB_ISNULL(GCTX.schema_service_)) {
ret = OB_ERR_UNEXPECTED;
SERVER_LOG(WARN, "invalid argument", K(GCTX.schema_service_));
......@@ -583,9 +637,107 @@ int ObFlushCacheResolver::resolve(const ParseNode& parse_tree)
} else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(
session_info_->get_effective_tenant_id(), schema_guard))) {
SERVER_LOG(WARN, "get_schema_guard failed", K(ret));
} else {
// do nothing
}
// db names
if (OB_FAIL(ret)) {
// do nothing
} else if (!stmt->flush_cache_arg_.is_fine_grained_) {
if (OB_ISNULL(db_node)) {
// tenant level plan cache evict
// and not needs to specify db_name
} else {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not supported flush cache in database level", K(ret));
}
} else if (NULL == db_node) { // db list is empty
// empty db list means clear all db's in fine-grained cache evict
// do nothing
} else if (OB_ISNULL(db_node->children_)
|| OB_ISNULL(db_node->children_[0])
|| T_DATABASE_LIST != db_node->type_) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret));
} else {
uint64_t db_id = 0;
ObString db_names;
ObString db_name;
db_names.assign_ptr(db_node->children_[0]->str_value_,
static_cast<ObString::obstr_size_t>(db_node->children_[0]->str_len_));
while (OB_SUCC(ret) && !db_names.empty()) {
db_name = db_names.split_on(',').trim();
if(db_name.empty() && NULL == db_names.find(',')) {
db_name = db_names;
db_names.reset();
}
if(!db_name.empty()) {
if (OB_FAIL(db_name_list.push_back(db_name))) {
SERVER_LOG(WARN, "failed to add database name", K(ret));
}
}
} // for database name end
}
/*
* different database belongs to different tenant,
* and we will use following logics to retrive db_id:
* for (tenant list) {
* for (database_name_list) {
* // find db_id from schema
* args_.push_back(db_id);
* }
* }
* */
// tenant list
if (OB_FAIL(ret)) {
} else if (NULL == t_node) { //tenant list is empty
if (!stmt->flush_cache_arg_.is_fine_grained_) { // coarse grained cache evict
// Notes:
// tenant level evict, and no tenant list specified means all tenant
// for system tenant: empty means flush all tenant's
// for normal tenant: this node has been set as NULL in parse phase,
// and already adds its tenant id to tenant list in above
// Therefore, do nothing
} else { // fine-grained cache evcit
// for fine-grained plan evict, we must specify tenant list
uint64_t t_id = OB_INVALID_ID;
t_id = sess->get_effective_tenant_id();
if (t_id <= OB_MAX_RESERVED_TENANT_ID) {// system tenant will use this path.
// system tenant must specify tenant_list;
ret = OB_EMPTY_TENANT;
SERVER_LOG(WARN, "invalid argument, fine-grained plan evict must specify tenant_list", K(ret));
} else { // normal tenant
if (OB_FAIL(stmt->flush_cache_arg_.push_tenant(sess->get_effective_tenant_id()))) {
LOG_WARN("failed to adds tenant for normal tenant", K(sess->get_effective_tenant_id()), K(ret));
} else {
// normal tenant will use it's tenant_id when t_node is empty
for (uint64_t j=0; OB_SUCC(ret) && j<db_name_list.count(); j++) {
uint64_t db_id = 0;
if (OB_FAIL(schema_guard.get_database_id(t_id, db_name_list.at(j).trim(), db_id))) {
SERVER_LOG(WARN, "database not exist", K(db_name_list.at(j).trim()), K(ret));
} else if (OB_FAIL(stmt->flush_cache_arg_.push_database(db_id))) {
SERVER_LOG(WARN, "fail to push database id ",K(db_name_list.at(j).trim()), K(db_id), K(ret));
}
} // for get db_id ends
LOG_INFO("normal tenant flush plan cache ends", K(t_id), K(db_name_list));
}
} // normal tenant ends
} // fine-grained plan evcit ends
} else if (sess->get_effective_tenant_id() != OB_SYS_TENANT_ID) {
// tenant node is not null and current tenant is not sys tenant
// due to normal tenant cannot specify tenant, and only can purge
// their own plan cache
ret = OB_ERR_NO_PRIVILEGE;
LOG_WARN("Only sys tenant can do this operation", K(ret), K(sess->get_effective_tenant_id()));
} else if (NULL == t_node->children_) {
ret = OB_ERR_UNEXPECTED;
SERVER_LOG(WARN, "invalid argument", K(ret));
} else {
uint64_t tenant_id = 0;
ObString tenant_name;
// adds tenant_ids and get db_ids
for (int64_t i = 0; OB_SUCC(ret) && i < t_node->num_child_; ++i) {
if (OB_ISNULL(t_node->children_[i])) {
ret = OB_ERR_UNEXPECTED;
......@@ -596,12 +748,50 @@ int ObFlushCacheResolver::resolve(const ParseNode& parse_tree)
if (OB_FAIL(schema_guard.get_tenant_id(tenant_name, tenant_id))) {
SERVER_LOG(WARN, "tenant not exist", K(tenant_name), K(ret));
} else if (OB_FAIL(stmt->flush_cache_arg_.push_tenant(tenant_id))) {
SERVER_LOG(WARN, "fail to push tenant id ", K(tenant_name), K(tenant_id), K(ret));
SERVER_LOG(WARN, "fail to push tenant id ",K(tenant_name), K(tenant_id), K(ret));
} else {
ObSchemaGetterGuard schema_guard_db;
if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(tenant_id, schema_guard_db))) {
SERVER_LOG(WARN, "get_schema_guard failed", K(ret), K(tenant_id));
} else {
for (uint64_t j = 0; OB_SUCC(ret) && j < db_name_list.count(); j++) {
uint64_t db_id = 0;
if (OB_FAIL(schema_guard_db.get_database_id(tenant_id, db_name_list.at(j), db_id))) {
SERVER_LOG(WARN, "database not exist", K(db_name_list.at(j)), K(ret));
} else if ((int64_t)db_id == OB_INVALID_ID) {
ret = OB_ERR_BAD_DATABASE;
SERVER_LOG(WARN, "database not exist", K(db_name_list.at(j)), K(ret));
} else if (OB_FAIL(stmt->flush_cache_arg_.push_database(db_id))) {
SERVER_LOG(WARN, "fail to push database id ",K(db_name_list.at(j)), K(db_id), K(ret));
}
} // for get db_id ends
}
}
}
} // for get tenant_id ends
tenant_name.reset();
} // for tenant end
}
LOG_INFO("resolve flush command finished!", K(ret), K(stmt->is_global_), K(stmt->flush_cache_arg_.cache_type_),
K(stmt->flush_cache_arg_.sql_id_), K(stmt->flush_cache_arg_.is_fine_grained_),
K(stmt->flush_cache_arg_.tenant_ids_), K(stmt->flush_cache_arg_.db_ids_));
}
if (OB_SUCC(ret) && ObSchemaChecker::is_ora_priv_check()) {
if (OB_ISNULL(schema_checker_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret));
} else if (OB_FAIL(schema_checker_->check_ora_ddl_priv(
session_info_->get_effective_tenant_id(),
session_info_->get_priv_user_id(),
ObString(""),
// why use T_ALTER_SYSTEM_SET_PARAMETER?
// because T_ALTER_SYSTEM_SET_PARAMETER has following traits:
// T_ALTER_SYSTEM_SET_PARAMETER can allow dba to do an operation
// and prohibit other user to do this operation
// so we reuse this.
stmt::T_ALTER_SYSTEM_SET_PARAMETER,
session_info_->get_enable_role_array()))) {
LOG_WARN("failed to check privilege", K(session_info_->get_effective_tenant_id()), K(session_info_->get_user_id()));
}
}
return ret;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册