From bc1f0c4d8d85a4c3164e0a3bf88d3d32f6a96434 Mon Sep 17 00:00:00 2001 From: obdev Date: Mon, 16 May 2022 20:37:47 +0800 Subject: [PATCH] [CP] [CP] [CP] support sql_id level cache evict --- src/observer/ob_rpc_processor_simple.cpp | 11 +- src/rootserver/ob_system_admin_util.cpp | 12 + src/share/ob_rpc_struct.cpp | 4 +- src/share/ob_rpc_struct.h | 34 ++- .../engine/cmd/ob_alter_system_executor.cpp | 25 +- src/sql/engine/ob_exec_context.h | 1 + src/sql/ob_sql.cpp | 10 + src/sql/parser/ob_item_type.h | 1 + src/sql/parser/sql_parser_mysql_mode.y | 28 ++- src/sql/plan_cache/ob_pcv_set.cpp | 23 +- src/sql/plan_cache/ob_pcv_set.h | 11 +- src/sql/plan_cache/ob_plan_cache.cpp | 123 +++++++++- src/sql/plan_cache/ob_plan_cache.h | 3 +- src/sql/plan_cache/ob_plan_cache_manager.cpp | 35 +++ src/sql/plan_cache/ob_plan_cache_manager.h | 1 + .../privilege_check/ob_privilege_check.cpp | 1 + .../resolver/cmd/ob_alter_system_resolver.cpp | 216 ++++++++++++++++-- 17 files changed, 490 insertions(+), 49 deletions(-) diff --git a/src/observer/ob_rpc_processor_simple.cpp b/src/observer/ob_rpc_processor_simple.cpp index af84a8046e..601633929f 100644 --- a/src/observer/ob_rpc_processor_simple.cpp +++ b/src/observer/ob_rpc_processor_simple.cpp @@ -996,7 +996,16 @@ int ObFlushCacheP::process() } else if (NULL == (pcm = gctx_.sql_engine_->get_plan_cache_manager())) { ret = OB_INVALID_ARGUMENT; LOG_ERROR("invalid argument", K(pcm), K(ret)); - } else if (arg_.is_all_tenant_) { // flush all tenant cache + } else if (arg_.is_fine_grained_) { // fine-grained plan cache evict + if (arg_.db_ids_.count() == 0) { + uint64_t db_id = OB_INVALID_ID; + ret = pcm->flush_plan_cache_by_sql_id(arg_.tenant_id_, db_id, arg_.sql_id_); + } else { + for (uint64_t i=0; OB_SUCC(ret) && iflush_plan_cache_by_sql_id(arg_.tenant_id_, arg_.db_ids_.at(i), arg_.sql_id_); + } + } + } else if (arg_.is_all_tenant_) { //flush all tenant cache ret = pcm->flush_all_plan_cache(); } else { // flush appointed tenant cache ret = pcm->flush_plan_cache(arg_.tenant_id_); diff --git a/src/rootserver/ob_system_admin_util.cpp b/src/rootserver/ob_system_admin_util.cpp index 8dd99f4417..efc9756be2 100755 --- a/src/rootserver/ob_system_admin_util.cpp +++ b/src/rootserver/ob_system_admin_util.cpp @@ -3137,6 +3137,8 @@ int ObAdminFlushCache::execute(const obrpc::ObAdminFlushCacheArg& arg) int64_t tenant_num = arg.tenant_ids_.count(); ObSEArray server_list; ObFlushCacheArg fc_arg; + // fine-grained plan evict only will pass this way. + // This because fine-grained plan evict must specify tenant // if tenant num is 0, flush all tenant, else, flush appointed tenant if (tenant_num != 0) { // flush appointed tenant for (int64_t i = 0; OB_SUCC(ret) && i < tenant_num; ++i) { @@ -3147,6 +3149,16 @@ int ObAdminFlushCache::execute(const obrpc::ObAdminFlushCacheArg& arg) // call tenant servers; fc_arg.is_all_tenant_ = false; fc_arg.cache_type_ = arg.cache_type_; + // fine-grained plan evict args + if (arg.is_fine_grained_) { + fc_arg.sql_id_ = arg.sql_id_; + fc_arg.is_fine_grained_ = arg.is_fine_grained_; + for(int64_t j=0; OB_SUCC(ret) && j CACHE_TYPE_INVALID && cache_type_ < CACHE_TYPE_MAX; } - int push_tenant(uint64_t tenant_id) + virtual ~ObAdminFlushCacheArg() {} + bool is_valid() const { - return tenant_ids_.push_back(tenant_id); + return cache_type_ > CACHE_TYPE_INVALID && cache_type_ < CACHE_TYPE_MAX; } - TO_STRING_KV(K_(tenant_ids), K_(cache_type)); + int push_tenant(uint64_t tenant_id) { return tenant_ids_.push_back(tenant_id); } + int push_database(uint64_t db_id) { return db_ids_.push_back(db_id); } + TO_STRING_KV(K_(tenant_ids), K_(cache_type), K_(db_ids), K_(sql_id), K_(is_fine_grained)); common::ObSEArray tenant_ids_; ObCacheType cache_type_; + common::ObSEArray db_ids_; + common::ObString sql_id_; + bool is_fine_grained_; }; struct ObAdminMigrateUnitArg { @@ -5456,19 +5459,26 @@ struct ObFlushCacheArg { OB_UNIS_VERSION(1); public: - ObFlushCacheArg() : is_all_tenant_(false), tenant_id_(common::OB_INVALID_TENANT_ID), cache_type_(CACHE_TYPE_INVALID) - {} + ObFlushCacheArg() : + is_all_tenant_(false), + tenant_id_(common::OB_INVALID_TENANT_ID), + cache_type_(CACHE_TYPE_INVALID), + is_fine_grained_(false){}; virtual ~ObFlushCacheArg() {} bool is_valid() const { return cache_type_ > CACHE_TYPE_INVALID && cache_type_ < CACHE_TYPE_MAX; } - TO_STRING_KV(K(is_all_tenant_), K_(tenant_id), K_(cache_type)); + int push_database(uint64_t db_id) { return db_ids_.push_back(db_id); } + TO_STRING_KV(K(is_all_tenant_), K_(tenant_id), K_(cache_type), K_(db_ids), K_(sql_id), K_(is_fine_grained)); bool is_all_tenant_; uint64_t tenant_id_; ObCacheType cache_type_; + common::ObSEArray db_ids_; + common::ObString sql_id_; + bool is_fine_grained_; }; struct ObGetAllSchemaArg { diff --git a/src/sql/engine/cmd/ob_alter_system_executor.cpp b/src/sql/engine/cmd/ob_alter_system_executor.cpp index 7584365dfe..2347adef6c 100644 --- a/src/sql/engine/cmd/ob_alter_system_executor.cpp +++ b/src/sql/engine/cmd/ob_alter_system_executor.cpp @@ -84,13 +84,34 @@ int ObFlushCacheExecutor::execute(ObExecContext& ctx, ObFlushCacheStmt& stmt) int ret = OB_SUCCESS; if (!stmt.is_global_) { // flush local int64_t tenant_num = stmt.flush_cache_arg_.tenant_ids_.count(); + int64_t db_num = stmt.flush_cache_arg_.db_ids_.count(); + common::ObString sql_id = stmt.flush_cache_arg_.sql_id_; switch (stmt.flush_cache_arg_.cache_type_) { case CACHE_TYPE_PLAN: { if (OB_ISNULL(ctx.get_plan_cache_manager())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("plan cache manager is null"); - } else if (0 == tenant_num) { - ret = ctx.get_plan_cache_manager()->flush_all_plan_cache(); + } else if (stmt.flush_cache_arg_.is_fine_grained_) { + // purge in sql_id level, aka. fine-grained plan evict + // we assume tenant_list must not be empty and this will be checked in resolve phase + if (0 == tenant_num) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected tenant_list in fine-grained plan evict", K(tenant_num)); + } else { + for (int64_t i = 0; i < tenant_num; i++) { // ignore ret + int64_t t_id = stmt.flush_cache_arg_.tenant_ids_.at(i); + if (db_num == 0) { // not specified db_name, evcit all dbs + ret = GCTX.sql_engine_->get_plan_cache_manager()->flush_plan_cache_by_sql_id(t_id, OB_INVALID_ID, sql_id); + } else { // evict db by db + for(int64_t j = 0; j < db_num; j++) { // ignore ret + ret = GCTX.sql_engine_->get_plan_cache_manager()->flush_plan_cache_by_sql_id( + t_id, stmt.flush_cache_arg_.db_ids_.at(j), sql_id); + } + } + } + } + } else if (0 == tenant_num) { // purge in tenant level, aka. coarse-grained plan evict + ret = GCTX.sql_engine_->get_plan_cache_manager()->flush_all_plan_cache(); } else { for (int64_t i = 0; i < tenant_num; ++i) { // ignore ret ret = ctx.get_plan_cache_manager()->flush_plan_cache(stmt.flush_cache_arg_.tenant_ids_.at(i)); diff --git a/src/sql/engine/ob_exec_context.h b/src/sql/engine/ob_exec_context.h index a40b7350d3..444e180477 100644 --- a/src/sql/engine/ob_exec_context.h +++ b/src/sql/engine/ob_exec_context.h @@ -82,6 +82,7 @@ class ObPxSqcHandler; class ObOpSpec; class ObOperator; class ObOpInput; +class ObSql; class ObEvalCtx; typedef common::ObArray*> ObRowIdListArray; // Physical operator kit: operator specification, operator, operator input diff --git a/src/sql/ob_sql.cpp b/src/sql/ob_sql.cpp index b82f5ce2ea..6aa6a3a158 100644 --- a/src/sql/ob_sql.cpp +++ b/src/sql/ob_sql.cpp @@ -2676,6 +2676,9 @@ int ObSql::pc_get_plan(ObPlanCacheCtx &pc_ctx, ObPhysicalPlan *&plan, int &get_p NG_TRACE(cache_get_plan_begin); ObPlanCache *plan_cache = NULL; ObSQLSessionInfo *session = pc_ctx.sql_ctx_.session_info_; + if (pc_ctx.sql_ctx_.is_remote_sql_) { + pc_ctx.fp_result_.pc_key_.key_id_ = 0; + } if (OB_ISNULL(session) || OB_ISNULL(plan_cache = session->get_plan_cache())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("Invalid plan cache", K(ret), K(session), K(plan_cache)); @@ -2684,6 +2687,10 @@ int ObSql::pc_get_plan(ObPlanCacheCtx &pc_ctx, ObPhysicalPlan *&plan, int &get_p OB_ERR_PROXY_REROUTE == ret || OB_BATCHED_MULTI_STMT_ROLLBACK == ret) { /*do nothing*/ } else if (!pc_ctx.is_ps_mode_ && OB_PC_LOCK_CONFLICT == ret && !session->is_inner()) { + if (pc_ctx.sql_ctx_.is_remote_sql_) { + get_plan_err = ret; + ret = OB_SUCCESS; + } } else { get_plan_err = ret; ret = OB_SUCCESS; @@ -2960,6 +2967,9 @@ int ObSql::pc_add_plan( LOG_WARN("Failed copy field to pplan", K(ret)); } else { phy_plan->set_outline_state(outline_state); + if (pc_ctx.sql_ctx_.is_remote_sql_) { + pc_ctx.fp_result_.pc_key_.key_id_ = 0; + } if (pc_ctx.is_ps_mode_) { // remote SQL enters the plan for the second time, and saves raw_sql as pc_key in the plan cache. // then use the ps interface to directly use the parameterized sql as the key to check the plan cache, diff --git a/src/sql/parser/ob_item_type.h b/src/sql/parser/ob_item_type.h index bd536d1c66..01c4bff17e 100644 --- a/src/sql/parser/ob_item_type.h +++ b/src/sql/parser/ob_item_type.h @@ -1105,6 +1105,7 @@ typedef enum ObItemType T_ALTER_DATABASE, T_USE_DATABASE, T_DATABASE_OPTION_LIST, + T_DATABASE_LIST, T_CREATE_TENANT, T_DROP_TENANT, T_MODIFY_TENANT, diff --git a/src/sql/parser/sql_parser_mysql_mode.y b/src/sql/parser/sql_parser_mysql_mode.y index da5ca0a0be..571a8c3f71 100644 --- a/src/sql/parser/sql_parser_mysql_mode.y +++ b/src/sql/parser/sql_parser_mysql_mode.y @@ -307,7 +307,7 @@ END_P SET_VAR DELIMITER %type create_table_stmt create_table_like_stmt opt_table_option_list table_option_list table_option table_option_list_space_seperated create_function_stmt drop_function_stmt parallel_option %type opt_force %type create_database_stmt drop_database_stmt alter_database_stmt use_database_stmt -%type opt_database_name database_option database_option_list opt_database_option_list database_factor +%type opt_database_name database_option database_option_list opt_database_option_list database_factor databases_expr opt_databases %type create_tenant_stmt opt_tenant_option_list alter_tenant_stmt drop_tenant_stmt %type create_restore_point_stmt drop_restore_point_stmt %type create_resource_stmt drop_resource_stmt alter_resource_stmt @@ -3634,6 +3634,24 @@ database_option } ; +databases_expr: +DATABASES opt_equal_mark STRING_VALUE +{ + (void)($2); + malloc_non_terminal_node($$, result->malloc_pool_, T_DATABASE_LIST, 1, $3); +}; + +opt_databases: +databases_expr +{ + $$ = $1; +} +| /*EMPTY*/ +{ + $$ = NULL; +} +; + charset_key: CHARSET { @@ -11446,14 +11464,16 @@ ALTER SYSTEM BOOTSTRAP server_info_list malloc_non_terminal_node($$, result->malloc_pool_, T_BOOTSTRAP, 3, server_list, NULL, NULL); } | -ALTER SYSTEM FLUSH cache_type CACHE opt_tenant_list flush_scope +ALTER SYSTEM FLUSH cache_type CACHE opt_sql_id opt_databases opt_tenant_list flush_scope { - malloc_non_terminal_node($$, result->malloc_pool_, T_FLUSH_CACHE, 3, $4, $6, $7); + // system tenant use only. + malloc_non_terminal_node($$, result->malloc_pool_, T_FLUSH_CACHE, 5, $4, $6, $7, $8, $9); } | +// this just is a Syntactic sugar, only used to be compatible to plan cache's Grammar ALTER SYSTEM FLUSH SQL cache_type opt_tenant_list flush_scope { - malloc_non_terminal_node($$, result->malloc_pool_, T_FLUSH_CACHE, 3, $5, $6, $7); + malloc_non_terminal_node($$, result->malloc_pool_, T_FLUSH_CACHE, 5, $5, NULL, NULL, $6, $7); } | ALTER SYSTEM FLUSH KVCACHE opt_tenant_name opt_cache_name diff --git a/src/sql/plan_cache/ob_pcv_set.cpp b/src/sql/plan_cache/ob_pcv_set.cpp index 2e10f54486..ed849b6749 100644 --- a/src/sql/plan_cache/ob_pcv_set.cpp +++ b/src/sql/plan_cache/ob_pcv_set.cpp @@ -68,15 +68,23 @@ void ObPCVSet::destroy() pcv = NULL; } } - // free key. if (NULL != pc_alloc_) { + //free key. pc_key_.destory(*pc_alloc_); pc_key_.reset(); if (NULL != sql_.ptr()) { pc_alloc_->free(sql_.ptr()); } + + // free sql_id list + for (int64_t i = 0; i < sql_ids_.count(); i++) { + if (NULL != sql_ids_[i].ptr()) { + pc_alloc_->free(sql_ids_[i].ptr()); + } + } } + sql_ids_.reset(); sql_.reset(); plan_cache_ = NULL; pc_alloc_ = NULL; @@ -226,6 +234,7 @@ int ObPCVSet::add_cache_obj(ObCacheObject* cache_obj, ObPlanCacheCtx& pc_ctx) LOG_WARN("fail to match pcv in pcv_set", K(ret)); } else if (is_same) { is_new = false; + LOG_INFO("has identical pcv", K(is_same), K(pcv)); if (OB_FAIL(pcv->add_plan(*cache_obj, schema_array, pc_ctx))) { if (OB_SQL_PC_PLAN_DUPLICATE == ret || is_not_supported_err(ret)) { LOG_DEBUG("fail to add plan to pcv", K(ret)); @@ -237,7 +246,6 @@ int ObPCVSet::add_cache_obj(ObCacheObject* cache_obj, ObPlanCacheCtx& pc_ctx) } } // for end } - if (OB_SUCC(ret) && is_new) { ObPlanCacheValue* pcv = NULL; if (OB_FAIL(create_pcv_and_add_plan(cache_obj, pc_ctx, schema_array, pcv))) { @@ -277,8 +285,11 @@ int ObPCVSet::create_pcv_and_add_plan(ObCacheObject* cache_obj, ObPlanCacheCtx& { int ret = OB_SUCCESS; new_pcv = nullptr; - // create pcv and init - if (OB_ISNULL(cache_obj) || OB_ISNULL(pc_ctx.sql_ctx_.schema_guard_)) { + common::ObString sql_id; + common::ObString sql_id_org(common::OB_MAX_SQL_ID_LENGTH, (const char*)&pc_ctx.sql_ctx_.sql_id_); + //create pcv and init + if (OB_ISNULL(cache_obj) || + OB_ISNULL(pc_ctx.sql_ctx_.schema_guard_)) { ret = OB_INVALID_ARGUMENT; SQL_PC_LOG(WARN, "invalid argument", K(ret)); } else if (OB_FAIL(create_new_pcv(new_pcv))) { @@ -288,6 +299,10 @@ int ObPCVSet::create_pcv_and_add_plan(ObCacheObject* cache_obj, ObPlanCacheCtx& LOG_WARN("unexpected null for new_pcv", K(new_pcv)); } else if (OB_FAIL(new_pcv->init(this, cache_obj, pc_ctx))) { SQL_PC_LOG(WARN, "failed to init plan cache value"); + } else if (OB_FAIL(ob_write_string(*pc_alloc_, sql_id_org, sql_id))) { + SQL_PC_LOG(WARN, "failed to deep copy sql_id_", K(sql_id_org), K(ret)); + } else if (OB_FAIL(push_sql_id(sql_id))) { + SQL_PC_LOG(WARN, "failed to push sql_id_", K(ret)); } else { // do nothing } diff --git a/src/sql/plan_cache/ob_pcv_set.h b/src/sql/plan_cache/ob_pcv_set.h index 2c102c820c..c723b37822 100644 --- a/src/sql/plan_cache/ob_pcv_set.h +++ b/src/sql/plan_cache/ob_pcv_set.h @@ -75,10 +75,12 @@ public: int init(const ObPlanCacheCtx& pc_ctx, const ObCacheObject* cache_obj); void destroy(); - // get plan from pcv_set - int get_plan(ObPlanCacheCtx& pc_ctx, ObCacheObject*& plan); - // add plan to pcv_set - int add_cache_obj(ObCacheObject* cache_obj, ObPlanCacheCtx& pc_ctx); + //get plan from pcv_set + int get_plan(ObPlanCacheCtx &pc_ctx, ObCacheObject *&plan); + //add plan to pcv_set + int add_cache_obj(ObCacheObject *cache_obj, ObPlanCacheCtx &pc_ctx); + common::ObIArray &get_sql_id() { return sql_ids_; } + int push_sql_id(common::ObString sql_id) { return sql_ids_.push_back(sql_id); } int64_t inc_ref_count(const CacheRefHandleID ref_handle); int64_t dec_ref_count(const CacheRefHandleID ref_handle); @@ -176,6 +178,7 @@ private: int64_t min_merged_version_; int64_t min_cluster_version_; int64_t plan_num_; + common::ObSEArray sql_ids_; bool need_check_gen_tbl_col_; common::ObFixedArray col_field_arr_; diff --git a/src/sql/plan_cache/ob_plan_cache.cpp b/src/sql/plan_cache/ob_plan_cache.cpp index f2bff32083..9369e50753 100644 --- a/src/sql/plan_cache/ob_plan_cache.cpp +++ b/src/sql/plan_cache/ob_plan_cache.cpp @@ -101,10 +101,92 @@ struct ObGetAllSqlIdOp { const CacheRefHandleID ref_handle_; }; -struct ObGetAllPLIdOp { - explicit ObGetAllPLIdOp(common::ObIArray *key_array, const CacheRefHandleID ref_handle) - : key_array_(key_array), ref_handle_(ref_handle) +struct ObGetPcvSetBySQLIDOp +{ + explicit ObGetPcvSetBySQLIDOp(uint64_t db_id, common::ObString sql_id, + common::ObIArray *key_array, const CacheRefHandleID ref_handle) + : db_id_(db_id), + sql_id_(sql_id), + key_array_(key_array), + ref_handle_(ref_handle) + { + } + + int operator()(common::hash::HashMapPair &entry) + { + int ret = common::OB_SUCCESS; + bool contains = false; + + if (OB_ISNULL(key_array_) || OB_ISNULL(entry.second)) { + ret = common::OB_INVALID_ARGUMENT; + SQL_PC_LOG(WARN, "invalid argument", K(key_array_), K(entry.second), K(ret)); + } else if ( NS_INVALID == entry.first.namespace_) { + // do nothing + } else if (db_id_ != common::OB_INVALID_ID && db_id_ != entry.first.db_id_) { + // skip entry that has non-matched db_id + } else if (!contain_sql_id(entry.second->get_sql_id())) { + // skip entry which not contains same sql_id + } else if (OB_FAIL(key_array_->push_back(ObPCKeyValue(entry.first, entry.second)))) { + SQL_PC_LOG(WARN, "fail to push back key", K(ret)); + } else { + entry.second->inc_ref_count(ref_handle_); + } + + return ret; + } + + bool contain_sql_id(common::ObIArray &sql_ids){ + bool contains = false; + for (int64_t i = 0; !contains && i < sql_ids.count(); i++) { + if (sql_ids.at(i) == sql_id_) { + contains = true; + } + } + return contains; + } + + uint64_t db_id_; + common::ObString sql_id_; + common::ObIArray *key_array_; + const CacheRefHandleID ref_handle_; +}; + +struct ObGetTableIdOp +{ + explicit ObGetTableIdOp(uint64_t table_id) + : table_id_(table_id) {} + + int operator()(common::hash::HashMapPair &entry) + { + int ret = common::OB_SUCCESS; + ObPhysicalPlan *plan = NULL; + int64_t version = -1; + if (OB_ISNULL(entry.second)) { + ret = common::OB_NOT_INIT; + SQL_PC_LOG(WARN, "invalid argument", K(ret)); + } else if (!entry.second->is_sql_crsr()) { + // not sql plan + // do nothing + } else if (OB_ISNULL(plan = dynamic_cast(entry.second))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null plan", K(ret), K(plan)); + } else if (entry.second->get_base_table_version(table_id_, version)) { + LOG_WARN("failed to get base table version", K(ret)); + } else if (version > 0) { + static_cast(entry.second)->set_is_expired(true); + } + return ret; + } + const uint64_t table_id_; +}; + + +struct ObGetAllPLIdOp +{ + explicit ObGetAllPLIdOp(common::ObIArray *key_array, + const CacheRefHandleID ref_handle) + : key_array_(key_array), ref_handle_(ref_handle) {} int operator()(common::hash::HashMapPair &entry) { int ret = OB_SUCCESS; @@ -515,9 +597,10 @@ int ObPlanCache::add_plan(ObPhysicalPlan *plan, ObPlanCacheCtx &pc_ctx) SQL_PC_LOG(WARN, "invalid physical plan", K(ret)); } else if (is_reach_memory_limit()) { ret = OB_REACH_MEMORY_LIMIT; - if (REACH_TIME_INTERVAL(1000000)) { - SQL_PC_LOG( - ERROR, "plan cache memory used reach limit", K_(tenant_id), K(get_mem_hold()), K(get_mem_limit()), K(ret)); + LOG_INFO("reach memory limit.", K(ret), K(get_mem_hold()), K(get_mem_limit())); + if (REACH_TIME_INTERVAL(1000000)) { //1s, when memory reach out of limit, set 1s as interval to print log + SQL_PC_LOG(ERROR, "plan cache memory used reach limit", + K_(tenant_id), K(get_mem_hold()), K(get_mem_limit()), K(ret)); } } else if (plan->get_mem_size() >= get_mem_high()) { // plan mem is too big, do not add plan @@ -889,6 +972,33 @@ int ObPlanCache::cache_evict_all_pl() return ret; } +// delete plan by sql_id +int ObPlanCache::cache_evict_plan_by_sql_id(uint64_t db_id, common::ObString sql_id) +{ + int ret = OB_SUCCESS; + ObGlobalReqTimeService::check_req_timeinfo(); + SQL_PC_LOG(DEBUG, "cache evict plan by sql id start"); + PCKeyValueArray to_evict_keys; + ObGetPcvSetBySQLIDOp get_ids_op(db_id, sql_id, &to_evict_keys, PCV_GET_PLAN_KEY_HANDLE); + + if (OB_FAIL(sql_pcvs_map_.foreach_refactored(get_ids_op))) { + SQL_PC_LOG(WARN, "fail to get all sql_ids in id2value_map", K(ret)); + } else if (OB_FAIL(remove_pcv_sets(to_evict_keys))) { + SQL_PC_LOG(WARN, "fail to remove pcv set", K(ret)); + } + + //decrease each pcv_set's reference count + int64_t N = to_evict_keys.count(); + for (int64_t i = 0; OB_SUCC(ret) && i < N; i++) { + if (NULL != to_evict_keys.at(i).pcv_set_) { + to_evict_keys.at(i).pcv_set_->dec_ref_count(PCV_GET_PLAN_KEY_HANDLE); + } + } + + SQL_PC_LOG(DEBUG, "cache evict plan by sql id end"); + return ret; +} +// rule for evict plan // 1. calc evict_num : (mem_used - mem_lwm) / (mem_used / cache_value_count) // 2. get evict_sql_id from calc_cache_evict_keys // 3. evict value @@ -1145,6 +1255,7 @@ int ObPlanCache::add_cache_obj_stat(ObPlanCacheCtx &pc_ctx, ObCacheObject *cache ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected null plan", K(ret), K(plan)); } else { + plan->stat_.bl_info_.key_.db_id_=pc_ctx.bl_key_.db_id_; plan->stat_.plan_id_ = plan->get_plan_id(); plan->stat_.bl_info_.plan_hash_value_ = plan->get_signature(); plan->stat_.gen_time_ = ObTimeUtility::current_time(); diff --git a/src/sql/plan_cache/ob_plan_cache.h b/src/sql/plan_cache/ob_plan_cache.h index 4e88c42746..e83a3edfe6 100644 --- a/src/sql/plan_cache/ob_plan_cache.h +++ b/src/sql/plan_cache/ob_plan_cache.h @@ -197,7 +197,8 @@ public: int cache_evict_all_pl(); // evict plan, adjust mem between hwm and lwm int cache_evict(); - // evict plan whose merged version is expired + int cache_evict_plan_by_sql_id(uint64_t db_id, common::ObString sql_id); + //evict plan whose merged version is expired int evict_expired_plan(); bool is_valid() { diff --git a/src/sql/plan_cache/ob_plan_cache_manager.cpp b/src/sql/plan_cache/ob_plan_cache_manager.cpp index eaccf1e91d..5fec0595e0 100644 --- a/src/sql/plan_cache/ob_plan_cache_manager.cpp +++ b/src/sql/plan_cache/ob_plan_cache_manager.cpp @@ -445,6 +445,41 @@ void ObPlanCacheManager::ObPlanCacheEliminationTask::run_free_cache_obj_task() } } +int ObPlanCacheManager::flush_plan_cache_by_sql_id(uint64_t tenant_id, + uint64_t db_id, + common::ObString sql_id) { + int ret = OB_SUCCESS; + observer::ObReqTimeGuard req_timeinfo_guard; + ObPlanCache *plan_cache = get_plan_cache(tenant_id); + if (NULL != plan_cache) { + if (OB_FAIL(plan_cache->cache_evict_plan_by_sql_id(db_id, sql_id))) { + SQL_PC_LOG(ERROR, "Plan cache evict failed, please check", K(ret)); + } + ObArray deleted_objs; + int64_t safe_timestamp = INT64_MAX; + if (OB_FAIL(ret)) { + // do nothing + } else if (OB_FAIL(observer::ObGlobalReqTimeService::get_instance() + .get_global_safe_timestamp(safe_timestamp))) { + SQL_PC_LOG(ERROR, "failed to get global safe timestamp", K(ret)); + } else if (OB_FAIL(plan_cache->dump_deleted_objs(deleted_objs, safe_timestamp))) { + SQL_PC_LOG(WARN, "failed to get deleted sql objs", K(ret)); + } else { + ObCacheObject *to_del_obj = NULL; + LOG_INFO("Deleted Cache Objs", K(deleted_objs)); + for (int64_t i = 0; i < deleted_objs.count(); i++) { // ignore error code and continue + if (OB_FAIL(ObCacheObjectFactory::destroy_cache_obj(true, + deleted_objs.at(i).obj_id_, + plan_cache))) { + LOG_WARN("failed to destroy cache obj", K(ret)); + } + } + } + plan_cache->dec_ref_count(); + } + return ret; +} + int ObPlanCacheManager::flush_all_plan_cache() { int ret = OB_SUCCESS; diff --git a/src/sql/plan_cache/ob_plan_cache_manager.h b/src/sql/plan_cache/ob_plan_cache_manager.h index c3db9078b0..2ffa3f36b9 100644 --- a/src/sql/plan_cache/ob_plan_cache_manager.h +++ b/src/sql/plan_cache/ob_plan_cache_manager.h @@ -87,6 +87,7 @@ public: int revert_ps_cache(const uint64_t& tenant_id); int flush_all_plan_cache(); int flush_plan_cache(const uint64_t tenant_id); + int flush_plan_cache_by_sql_id(uint64_t tenant_id, uint64_t db_id, common::ObString sql_id); int flush_all_pl_cache(); int flush_pl_cache(const uint64_t tenant_id); int flush_all_ps_cache(); diff --git a/src/sql/privilege_check/ob_privilege_check.cpp b/src/sql/privilege_check/ob_privilege_check.cpp index 5e2d948a5c..14a12d0aef 100644 --- a/src/sql/privilege_check/ob_privilege_check.cpp +++ b/src/sql/privilege_check/ob_privilege_check.cpp @@ -1374,6 +1374,7 @@ int get_sys_tenant_alter_system_priv( ret = OB_INVALID_ARGUMENT; LOG_WARN("Basic stmt should be not be NULL", K(ret)); } else if (OB_SYS_TENANT_ID != session_priv.tenant_id_ && + stmt::T_FLUSH_CACHE != basic_stmt->get_stmt_type() && stmt::T_ALTER_SYSTEM_SET_PARAMETER != basic_stmt->get_stmt_type()) { ret = OB_ERR_NO_PRIVILEGE; LOG_WARN("Only sys tenant can do this operation", K(ret), "stmt type", basic_stmt->get_stmt_type()); diff --git a/src/sql/resolver/cmd/ob_alter_system_resolver.cpp b/src/sql/resolver/cmd/ob_alter_system_resolver.cpp index 29994cb4d7..09c49fbe09 100644 --- a/src/sql/resolver/cmd/ob_alter_system_resolver.cpp +++ b/src/sql/resolver/cmd/ob_alter_system_resolver.cpp @@ -551,14 +551,29 @@ int ObFreezeResolver::resolve(const ParseNode& parse_tree) return ret; } -int ObFlushCacheResolver::resolve(const ParseNode& parse_tree) + // + // This node has five children_ and they are following: + // cache_type_: parse_tree.children_[0] + // opt_sql_id: parse_tree.children_[1] + // opt_databases: parse_tree.children_[2] + // opt_tenant_list: parse_tree.children_[3] + // flush_scope: parse_tree.children_[4] + // +int ObFlushCacheResolver::resolve(const ParseNode &parse_tree) { int ret = OB_SUCCESS; - ObFlushCacheStmt* stmt = NULL; - if (OB_UNLIKELY(T_FLUSH_CACHE != parse_tree.type_ || parse_tree.num_child_ != 3)) { + ObFlushCacheStmt *stmt = NULL; + ObSQLSessionInfo* sess = params_.session_info_; + if (OB_ISNULL(sess)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid argument", "type", get_type_name(parse_tree.type_), "child_num", parse_tree.num_child_); - } else if (NULL == parse_tree.children_[0] || NULL == parse_tree.children_[2]) { + SERVER_LOG(WARN, "invalid session"); + } else if (OB_UNLIKELY(T_FLUSH_CACHE != parse_tree.type_ || parse_tree.num_child_ != 5)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid argument", + "type", get_type_name(parse_tree.type_), + "child_num", parse_tree.num_child_); + } else if (NULL == parse_tree.children_[0] + || NULL == parse_tree.children_[4]) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid parse tree", K(ret)); } else if (NULL == (stmt = create_stmt())) { @@ -566,14 +581,53 @@ int ObFlushCacheResolver::resolve(const ParseNode& parse_tree) LOG_ERROR("create ObFlushCacheStmt failed"); } else { ObSchemaGetterGuard schema_guard; + + // first child: resolve cache type stmt->flush_cache_arg_.cache_type_ = (ObCacheType)parse_tree.children_[0]->value_; - stmt->is_global_ = parse_tree.children_[2]->value_; - ParseNode* t_node = parse_tree.children_[1]; - if (NULL == t_node) { // tenant list is empty + // second child: resolve sql_id + ParseNode *sql_id_node = parse_tree.children_[1]; + // for adds database id + // third child: resolve db_list + ParseNode *db_node = parse_tree.children_[2]; + // for adds tenant ids + // fourth child: resolve tenant list + ParseNode *t_node = parse_tree.children_[3]; + // fivth child: resolve application fields + stmt->is_global_ = parse_tree.children_[4]->value_; + // whether is coarse granularity plan cache evict. + // tenant level(true) / pcv_set level(false) + bool is_coarse_granularity = true; + ObSEArray db_name_list; + + // sql_id + if (OB_FAIL(ret)) { // do nothing - } else if (NULL == t_node->children_) { - ret = OB_ERR_UNEXPECTED; - SERVER_LOG(WARN, "invalid argument", K(ret)); + } else if (OB_ISNULL(sql_id_node)) { + // do nothing + // currently, only support plan cache's fine-grained cache evict + } else if (stmt->flush_cache_arg_.cache_type_ != CACHE_TYPE_PLAN) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("only support plan cache's fine-grained cache evict", K(stmt->flush_cache_arg_.cache_type_), K(ret)); + } else if (lib::is_oracle_mode()) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("not supported plan cache's fine-grained cache evict in oracle mode", K(ret)); + } else if (OB_ISNULL(sql_id_node->children_) + || OB_ISNULL(sql_id_node->children_[0]) + || T_SQL_ID != sql_id_node->type_) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret)); + } else if (sql_id_node->children_[0]->str_len_ > (OB_MAX_SQL_ID_LENGTH+1)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret)); + } else { + stmt->flush_cache_arg_.sql_id_.assign_ptr( + sql_id_node->children_[0]->str_value_, + static_cast(sql_id_node->children_[0]->str_len_)); + stmt->flush_cache_arg_.is_fine_grained_ = true; + } + + // retrive schema guard + if (OB_FAIL(ret)) { } else if (OB_ISNULL(GCTX.schema_service_)) { ret = OB_ERR_UNEXPECTED; SERVER_LOG(WARN, "invalid argument", K(GCTX.schema_service_)); @@ -583,9 +637,107 @@ int ObFlushCacheResolver::resolve(const ParseNode& parse_tree) } else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard( session_info_->get_effective_tenant_id(), schema_guard))) { SERVER_LOG(WARN, "get_schema_guard failed", K(ret)); + } else { + // do nothing + } + + // db names + if (OB_FAIL(ret)) { + // do nothing + } else if (!stmt->flush_cache_arg_.is_fine_grained_) { + if (OB_ISNULL(db_node)) { + // tenant level plan cache evict + // and not needs to specify db_name + } else { + ret = OB_NOT_SUPPORTED; + LOG_WARN("not supported flush cache in database level", K(ret)); + } + } else if (NULL == db_node) { // db list is empty + // empty db list means clear all db's in fine-grained cache evict + // do nothing + } else if (OB_ISNULL(db_node->children_) + || OB_ISNULL(db_node->children_[0]) + || T_DATABASE_LIST != db_node->type_) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret)); + } else { + uint64_t db_id = 0; + ObString db_names; + ObString db_name; + db_names.assign_ptr(db_node->children_[0]->str_value_, + static_cast(db_node->children_[0]->str_len_)); + while (OB_SUCC(ret) && !db_names.empty()) { + db_name = db_names.split_on(',').trim(); + if(db_name.empty() && NULL == db_names.find(',')) { + db_name = db_names; + db_names.reset(); + } + if(!db_name.empty()) { + if (OB_FAIL(db_name_list.push_back(db_name))) { + SERVER_LOG(WARN, "failed to add database name", K(ret)); + } + } + } // for database name end + } + + /* + * different database belongs to different tenant, + * and we will use following logics to retrive db_id: + * for (tenant list) { + * for (database_name_list) { + * // find db_id from schema + * args_.push_back(db_id); + * } + * } + * */ + // tenant list + if (OB_FAIL(ret)) { + } else if (NULL == t_node) { //tenant list is empty + if (!stmt->flush_cache_arg_.is_fine_grained_) { // coarse grained cache evict + // Notes: + // tenant level evict, and no tenant list specified means all tenant + // for system tenant: empty means flush all tenant's + // for normal tenant: this node has been set as NULL in parse phase, + // and already adds its tenant id to tenant list in above + // Therefore, do nothing + } else { // fine-grained cache evcit + // for fine-grained plan evict, we must specify tenant list + uint64_t t_id = OB_INVALID_ID; + t_id = sess->get_effective_tenant_id(); + if (t_id <= OB_MAX_RESERVED_TENANT_ID) {// system tenant will use this path. + // system tenant must specify tenant_list; + ret = OB_EMPTY_TENANT; + SERVER_LOG(WARN, "invalid argument, fine-grained plan evict must specify tenant_list", K(ret)); + } else { // normal tenant + if (OB_FAIL(stmt->flush_cache_arg_.push_tenant(sess->get_effective_tenant_id()))) { + LOG_WARN("failed to adds tenant for normal tenant", K(sess->get_effective_tenant_id()), K(ret)); + } else { + // normal tenant will use it's tenant_id when t_node is empty + for (uint64_t j=0; OB_SUCC(ret) && jflush_cache_arg_.push_database(db_id))) { + SERVER_LOG(WARN, "fail to push database id ",K(db_name_list.at(j).trim()), K(db_id), K(ret)); + } + } // for get db_id ends + LOG_INFO("normal tenant flush plan cache ends", K(t_id), K(db_name_list)); + } + } // normal tenant ends + } // fine-grained plan evcit ends + } else if (sess->get_effective_tenant_id() != OB_SYS_TENANT_ID) { + // tenant node is not null and current tenant is not sys tenant + // due to normal tenant cannot specify tenant, and only can purge + // their own plan cache + ret = OB_ERR_NO_PRIVILEGE; + LOG_WARN("Only sys tenant can do this operation", K(ret), K(sess->get_effective_tenant_id())); + } else if (NULL == t_node->children_) { + ret = OB_ERR_UNEXPECTED; + SERVER_LOG(WARN, "invalid argument", K(ret)); } else { uint64_t tenant_id = 0; ObString tenant_name; + // adds tenant_ids and get db_ids for (int64_t i = 0; OB_SUCC(ret) && i < t_node->num_child_; ++i) { if (OB_ISNULL(t_node->children_[i])) { ret = OB_ERR_UNEXPECTED; @@ -596,12 +748,50 @@ int ObFlushCacheResolver::resolve(const ParseNode& parse_tree) if (OB_FAIL(schema_guard.get_tenant_id(tenant_name, tenant_id))) { SERVER_LOG(WARN, "tenant not exist", K(tenant_name), K(ret)); } else if (OB_FAIL(stmt->flush_cache_arg_.push_tenant(tenant_id))) { - SERVER_LOG(WARN, "fail to push tenant id ", K(tenant_name), K(tenant_id), K(ret)); + SERVER_LOG(WARN, "fail to push tenant id ",K(tenant_name), K(tenant_id), K(ret)); + } else { + ObSchemaGetterGuard schema_guard_db; + if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(tenant_id, schema_guard_db))) { + SERVER_LOG(WARN, "get_schema_guard failed", K(ret), K(tenant_id)); + } else { + for (uint64_t j = 0; OB_SUCC(ret) && j < db_name_list.count(); j++) { + uint64_t db_id = 0; + if (OB_FAIL(schema_guard_db.get_database_id(tenant_id, db_name_list.at(j), db_id))) { + SERVER_LOG(WARN, "database not exist", K(db_name_list.at(j)), K(ret)); + } else if ((int64_t)db_id == OB_INVALID_ID) { + ret = OB_ERR_BAD_DATABASE; + SERVER_LOG(WARN, "database not exist", K(db_name_list.at(j)), K(ret)); + } else if (OB_FAIL(stmt->flush_cache_arg_.push_database(db_id))) { + SERVER_LOG(WARN, "fail to push database id ",K(db_name_list.at(j)), K(db_id), K(ret)); + } + } // for get db_id ends + } } - } + } // for get tenant_id ends tenant_name.reset(); } // for tenant end } + LOG_INFO("resolve flush command finished!", K(ret), K(stmt->is_global_), K(stmt->flush_cache_arg_.cache_type_), + K(stmt->flush_cache_arg_.sql_id_), K(stmt->flush_cache_arg_.is_fine_grained_), + K(stmt->flush_cache_arg_.tenant_ids_), K(stmt->flush_cache_arg_.db_ids_)); + } + if (OB_SUCC(ret) && ObSchemaChecker::is_ora_priv_check()) { + if (OB_ISNULL(schema_checker_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret)); + } else if (OB_FAIL(schema_checker_->check_ora_ddl_priv( + session_info_->get_effective_tenant_id(), + session_info_->get_priv_user_id(), + ObString(""), + // why use T_ALTER_SYSTEM_SET_PARAMETER? + // because T_ALTER_SYSTEM_SET_PARAMETER has following traits: + // T_ALTER_SYSTEM_SET_PARAMETER can allow dba to do an operation + // and prohibit other user to do this operation + // so we reuse this. + stmt::T_ALTER_SYSTEM_SET_PARAMETER, + session_info_->get_enable_role_array()))) { + LOG_WARN("failed to check privilege", K(session_info_->get_effective_tenant_id()), K(session_info_->get_user_id())); + } } return ret; } -- GitLab