From e2d26db1c81baac89de67ffaf3f954da94808782 Mon Sep 17 00:00:00 2001 From: Wesley Wang Date: Sat, 20 Aug 2022 13:01:13 +0800 Subject: [PATCH] Feature: 1. Added the parameter location_expire_period_time to set the active expiration time of the location cache. The value range is [0s, 30d], the default is 0 2. show proxyroute supports printing location cache expiration time 3. Distinguish between business requests and internal, full-time requests, and business request parameter control 4. When the business request is enabled_cached_server = true, the Partition is not random Bugfix: 1. Fix core issue when obproxy::omt::ObResourceUnitTableProcessor::inc_conn 2. Fix the problem that the route cannot be routed when establishing a connection with a cluster id 3. Fix the timestamp accuracy modification caused the old version of the observer to be inaccurate routing 4. Fix the Sharding scenario, the select table name of the sub-database and sub-table is case-sensitive, resulting in an error report 5. Fix the Sharding scenario, the select of sub-database and sub-table will be intercepted and unsupported syntax problem 6. Fixed an issue where a single SQL was sent multiple times, causing the SQL to take more than 40ms 7. Fix the rslist method, specify 127.0.0.1 to start obproxy, in the server failure scenario, more than 3 times, the connection fails to be established 8. Fixed the issue that if the cluster scheduled task fails more than 10 times, the blacklist will be invalid 9. Fixed the problem that OBProxy still thinks the connection is successful when the server hangs up 10. Fixed the problem that the cluster scheduled task failed more than 10 times, causing it to remain in the blacklist 11. Fix the problem that COM_CHANGE_USER will be disconnected --- configure.ac | 3 +- rpm/obproxy-ce-VER.txt | 2 +- script/deploy/obproxyd.sh | 2 +- src/common/ob_accuracy.h | 24 ++++++- src/common/ob_obj_cast.cpp | 63 ++++++++-------- src/obproxy/cmd/ob_show_route_handler.cpp | 64 +++++++++++++++-- .../engine/ob_proxy_operator_table_scan.cpp | 2 +- .../iocore/net/ob_unix_net_vconnection.cpp | 7 +- src/obproxy/obutils/ob_congestion_manager.h | 1 - src/obproxy/obutils/ob_proxy_config.h | 11 ++- src/obproxy/obutils/ob_proxy_stmt.cpp | 72 ++++++++++--------- src/obproxy/obutils/ob_proxy_stmt.h | 2 + .../obutils/ob_resource_pool_processor.cpp | 2 - .../obutils/ob_server_state_processor.cpp | 3 - .../omt/ob_resource_unit_table_processor.cpp | 12 ++-- .../omt/ob_resource_unit_table_processor.h | 2 +- src/obproxy/omt/ob_vip_tenant_conn.h | 6 +- .../opsql/expr_parser/ob_expr_parse_result.h | 2 +- src/obproxy/proxy/mysql/ob_mysql_sm.cpp | 16 +++-- src/obproxy/proxy/mysql/ob_mysql_transact.cpp | 5 +- .../proxy/mysqllib/ob_mysql_resp_analyzer.cpp | 3 +- src/obproxy/proxy/route/ob_ldc_location.cpp | 2 +- src/obproxy/proxy/route/ob_mysql_route.cpp | 28 +++----- src/obproxy/proxy/route/ob_mysql_route.h | 5 +- src/obproxy/proxy/route/ob_partition_entry.h | 2 +- src/obproxy/proxy/route/ob_route_struct.cpp | 29 ++++++++ src/obproxy/proxy/route/ob_route_struct.h | 4 +- src/obproxy/proxy/route/ob_route_utils.cpp | 3 + src/obproxy/proxy/route/ob_server_route.h | 4 +- .../proxy/route/obproxy_expr_calculator.cpp | 19 +++-- .../proxy/route/obproxy_expr_calculator.h | 4 ++ .../proxy/shard/obproxy_shard_utils.cpp | 19 +++-- src/share/part/ob_part_desc_hash.cpp | 3 +- src/share/part/ob_part_desc_key.cpp | 2 +- src/share/part/ob_part_desc_list.cpp | 4 +- src/share/part/ob_part_desc_range.cpp | 2 +- 36 files changed, 283 insertions(+), 151 deletions(-) diff --git a/configure.ac b/configure.ac index 3687897..d236f69 100644 --- a/configure.ac +++ b/configure.ac @@ -106,7 +106,8 @@ [with perf (default is NO)]), [ if test "$withval" = "yes"; then - # 下面参数为使用gperftools的选项,目前未用tcmalloc + # The following parameters are options for using gperftools, + # currently not using tcmalloc # test_perf=yes # AM_CXXFLAGS="${AM_CXXFLAGS} -D__NEED_PERF__" # AM_LDFLAGS="${AM_LDFLAGS} -lprofiler" diff --git a/rpm/obproxy-ce-VER.txt b/rpm/obproxy-ce-VER.txt index b347b11..281b514 100644 --- a/rpm/obproxy-ce-VER.txt +++ b/rpm/obproxy-ce-VER.txt @@ -1 +1 @@ -3.2.3 +3.2.3.5 diff --git a/script/deploy/obproxyd.sh b/script/deploy/obproxyd.sh index f3b9156..9c7c47f 100755 --- a/script/deploy/obproxyd.sh +++ b/script/deploy/obproxyd.sh @@ -147,7 +147,7 @@ function check_opt() OBPROXY_OPT_LOCAL="${OBPROXY_OPT_LOCAL},$OBPROXY_EXTRA_OPT" fi - OBPROXY_OPT_LOCAL=",enable_cached_server=false,enable_get_rslist_remote=true,monitor_stat_dump_interval=1s,enable_qos=true,enable_standby=false,query_digest_time_threshold=2ms,monitor_cost_ms_unit=true,enable_strict_kernel_release=false,enable_proxy_scramble=true,work_thread_num=$WORK_THREAD_NUM,proxy_mem_limited='2G',log_dir_size_threshold=10G${OBPROXY_OPT_LOCAL}" + OBPROXY_OPT_LOCAL=",enable_cached_server=true,enable_get_rslist_remote=true,monitor_stat_dump_interval=1s,enable_qos=true,enable_standby=false,query_digest_time_threshold=2ms,monitor_cost_ms_unit=true,enable_strict_kernel_release=false,enable_proxy_scramble=true,work_thread_num=$WORK_THREAD_NUM,proxy_mem_limited='2G',log_dir_size_threshold=10G${OBPROXY_OPT_LOCAL}" } # change to the path where this script locates. diff --git a/src/common/ob_accuracy.h b/src/common/ob_accuracy.h index 1c724a1..762ce02 100644 --- a/src/common/ob_accuracy.h +++ b/src/common/ob_accuracy.h @@ -34,7 +34,10 @@ public: ObAccuracy(ObPrecision precision, ObScale scale) { set_precision(precision); set_scale(scale); } ObAccuracy(ObLength length, ObPrecision precision, ObScale scale) { set_length(length); set_precision(precision); set_scale(scale); } - ObAccuracy(const ObAccuracy &other) { accuracy_ = other.accuracy_; } + ObAccuracy(bool valid, ObLength length, ObPrecision precision, ObScale scale) : + valid_(valid), length_(length), precision_(precision), scale_(scale) {} + ObAccuracy(const ObAccuracy &other) { accuracy_ = other.accuracy_; valid_ = other.valid_; } + OB_INLINE void set_accuracy(const ObAccuracy &accuracy) { accuracy_ = accuracy.accuracy_; } OB_INLINE void set_accuracy(const int64_t &accuracy) { accuracy_ = accuracy; } OB_INLINE void set_length(ObLength length) { length_ = length; } @@ -47,12 +50,19 @@ public: OB_INLINE ObLength get_length() const { return length_; } OB_INLINE ObPrecision get_precision() const { return precision_; } OB_INLINE ObScale get_scale() const { return scale_; } - OB_INLINE void reset() { accuracy_ = -1; } + + /* + * the default length, precision, scale is different in each type + */ + OB_INLINE void reset() { valid_ = false; length_ = -1; precision_ = -1; scale_ = -1; } + OB_INLINE bool is_valid() const { return valid_; } + public: OB_INLINE ObAccuracy &operator =(const ObAccuracy &other) { if (this != &other) { accuracy_ = other.accuracy_; + valid_ = other.valid_; } return *this; } @@ -71,9 +81,17 @@ public: public: TO_STRING_KV(N_LENGTH, length_, N_PRECISION, precision_, - N_SCALE, scale_); + N_SCALE, scale_, + K_(valid)); NEED_SERIALIZE_AND_DESERIALIZE; +public: + /* + * whether we get the accuracy from server or not + * it is not recommend to judge the init status by value, the init value of different type is different + * the valid value of is as the same as the mysql/oracle document defined + */ + bool valid_; union { int64_t accuracy_; diff --git a/src/common/ob_obj_cast.cpp b/src/common/ob_obj_cast.cpp index 083b019..1c29b36 100644 --- a/src/common/ob_obj_cast.cpp +++ b/src/common/ob_obj_cast.cpp @@ -4126,39 +4126,36 @@ int obj_accuracy_check(ObCastCtx &cast_ctx, { int ret = OB_SUCCESS; - LOG_DEBUG("obj_accuracy_check before", K(obj), K(accuracy), K(cs_type)); - - switch (obj.get_type_class()) { - case ObFloatTC: { - ret = float_range_check(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_); - break; - } - case ObDoubleTC: { - ret = double_check_precision(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_); - break; - } - case ObNumberTC: { - ret = number_range_check(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_); - break; - } - case ObDateTimeTC: { - ret = datetime_scale_check(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_); - break; - } - case ObOTimestampTC: { - ret = otimestamp_scale_check(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_); - break; - } - case ObTimeTC: { - ret = time_scale_check(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_); - break; - } - case ObStringTC: { - ret = string_length_check(cast_ctx, accuracy, cs_type, obj, buf_obj, res_obj, cast_ctx.cast_mode_); - break; - } - default: { - break; + if (accuracy.is_valid()) { + LOG_DEBUG("obj_accuracy_check before", K(obj), K(accuracy), K(cs_type)); + switch (obj.get_type_class()) { + case ObFloatTC: { + ret = float_range_check(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_); + break; + } + case ObDoubleTC: { + ret = double_check_precision(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_); + break; + } + case ObNumberTC: { + ret = number_range_check(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_); + break; + } + case ObDateTimeTC: { + ret = datetime_scale_check(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_); + break; + } + case ObOTimestampTC: { + ret = otimestamp_scale_check(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_); + break; + } + case ObTimeTC: { + ret = time_scale_check(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_); + break; + } + default: { + break; + } } } diff --git a/src/obproxy/cmd/ob_show_route_handler.cpp b/src/obproxy/cmd/ob_show_route_handler.cpp index 1524d4f..7d177fb 100644 --- a/src/obproxy/cmd/ob_show_route_handler.cpp +++ b/src/obproxy/cmd/ob_show_route_handler.cpp @@ -48,6 +48,8 @@ enum OB_RC_LAST_VALID, OB_RC_LAST_ACCESS, OB_RC_LAST_UPDATE, + OB_RC_EXPIRE_TIME, + OB_RC_RELATIVE_EXPIRE_TIME, OB_RC_SERVER_ADDR, OB_RC_MAX_ROUTE_COLUMN_ID, }; @@ -69,6 +71,8 @@ const ObProxyColumnSchema ROUTE_COLUMN_ARRAY[OB_RC_MAX_ROUTE_COLUMN_ID] = { ObProxyColumnSchema::make_schema(OB_RC_LAST_VALID, "last_valid_time", OB_MYSQL_TYPE_VARCHAR), ObProxyColumnSchema::make_schema(OB_RC_LAST_ACCESS, "last_access_time", OB_MYSQL_TYPE_VARCHAR), ObProxyColumnSchema::make_schema(OB_RC_LAST_UPDATE, "last_update_time", OB_MYSQL_TYPE_VARCHAR), + ObProxyColumnSchema::make_schema(OB_RC_EXPIRE_TIME, "expire_time", OB_MYSQL_TYPE_VARCHAR), + ObProxyColumnSchema::make_schema(OB_RC_RELATIVE_EXPIRE_TIME, "relative_expire_time", OB_MYSQL_TYPE_VARCHAR), ObProxyColumnSchema::make_schema(OB_RC_SERVER_ADDR, "server addr", OB_MYSQL_TYPE_VARCHAR), }; @@ -84,6 +88,8 @@ enum OB_RPC_LAST_VALID, OB_RPC_LAST_ACCESS, OB_RPC_LAST_UPDATE, + OB_RPC_EXPIRE_TIME, + OB_RPC_RELATIVE_EXPIRE_TIME, OB_RPC_SERVER_ADDR, OB_RPC_MAX_ROUTE_COLUMN_ID, }; @@ -98,6 +104,8 @@ const ObProxyColumnSchema ROUTE_PARTITION_COLUMN_ARRAY[OB_RPC_MAX_ROUTE_COLUMN_I ObProxyColumnSchema::make_schema(OB_RPC_LAST_VALID, "last_valid_time", OB_MYSQL_TYPE_VARCHAR), ObProxyColumnSchema::make_schema(OB_RPC_LAST_ACCESS, "last_access_time", OB_MYSQL_TYPE_VARCHAR), ObProxyColumnSchema::make_schema(OB_RPC_LAST_UPDATE, "last_update_time", OB_MYSQL_TYPE_VARCHAR), + ObProxyColumnSchema::make_schema(OB_RPC_EXPIRE_TIME, "expire_time", OB_MYSQL_TYPE_VARCHAR), + ObProxyColumnSchema::make_schema(OB_RPC_RELATIVE_EXPIRE_TIME, "relative_expire_time", OB_MYSQL_TYPE_VARCHAR), ObProxyColumnSchema::make_schema(OB_RPC_SERVER_ADDR, "server addr", OB_MYSQL_TYPE_VARCHAR), }; @@ -119,6 +127,8 @@ enum OB_RRC_LAST_VALID, OB_RRC_LAST_ACCESS, OB_RRC_LAST_UPDATE, + OB_RRC_EXPIRE_TIME, + OB_RRC_RELATIVE_EXPIRE_TIME, OB_RRC_ROUTE_SQL, OB_RRC_MAX_ROUTE_COLUMN_ID, }; @@ -138,11 +148,14 @@ const ObProxyColumnSchema ROUTE_ROUTINE_COLUMN_ARRAY[OB_RRC_MAX_ROUTE_COLUMN_ID] ObProxyColumnSchema::make_schema(OB_RRC_LAST_VALID, "last_valid_time", OB_MYSQL_TYPE_VARCHAR), ObProxyColumnSchema::make_schema(OB_RRC_LAST_ACCESS, "last_access_time", OB_MYSQL_TYPE_VARCHAR), ObProxyColumnSchema::make_schema(OB_RRC_LAST_UPDATE, "last_update_time", OB_MYSQL_TYPE_VARCHAR), + ObProxyColumnSchema::make_schema(OB_RRC_EXPIRE_TIME, "expire_time", OB_MYSQL_TYPE_VARCHAR), + ObProxyColumnSchema::make_schema(OB_RRC_RELATIVE_EXPIRE_TIME, "relative_expire_time", OB_MYSQL_TYPE_VARCHAR), ObProxyColumnSchema::make_schema(OB_RRC_ROUTE_SQL, "route_sql", OB_MYSQL_TYPE_VARCHAR), }; int extract_entry_time(const ObRouteEntry &entry, char *create_timebuf, char *valid_timebuf, - char *access_timebuf, char *update_timebuf, const uint32_t buf_len); + char *access_timebuf, char *update_timebuf, char *expire_timebuf, + char *relative_expire_timebuf, const uint32_t buf_len); ObShowRouteHandler::ObShowRouteHandler(ObContinuation *cont, ObMIOBuffer *buf, const ObInternalCmdInfo &info) @@ -534,15 +547,19 @@ int ObShowRouteHandler::dump_table_item(const ObTableEntry &entry) char valid_timebuf[buf_len]; char access_timebuf[buf_len]; char update_timebuf[buf_len]; + char expire_timebuf[buf_len]; + char relative_expire_timebuf[buf_len]; if (OB_FAIL(extract_entry_time(entry, create_timebuf, valid_timebuf, access_timebuf, - update_timebuf, buf_len))) { + update_timebuf, expire_timebuf, relative_expire_timebuf, buf_len))) { WARN_ICMD("fail to extract_entry_time", K(entry), K(ret)); } else { cells[OB_RC_CREATE].set_varchar(create_timebuf); cells[OB_RC_LAST_VALID].set_varchar(valid_timebuf); cells[OB_RC_LAST_ACCESS].set_varchar(access_timebuf); cells[OB_RC_LAST_UPDATE].set_varchar(update_timebuf); + cells[OB_RC_EXPIRE_TIME].set_varchar(expire_timebuf); + cells[OB_RC_RELATIVE_EXPIRE_TIME].set_varchar(relative_expire_timebuf); row.cells_ = cells; row.count_ = OB_RC_MAX_ROUTE_COLUMN_ID; @@ -558,7 +575,8 @@ int ObShowRouteHandler::dump_table_item(const ObTableEntry &entry) } int extract_entry_time(const ObRouteEntry &entry, char *create_timebuf, char *valid_timebuf, - char *access_timebuf, char *update_timebuf, const uint32_t buf_len) + char *access_timebuf, char *update_timebuf, char *expire_timebuf, + char *relative_expire_time, const uint32_t buf_len) { int ret = OB_SUCCESS; struct tm struct_tm; @@ -621,6 +639,34 @@ int extract_entry_time(const ObRouteEntry &entry, char *create_timebuf, char *va } } } + if (OB_SUCC(ret)) { + time_us = usec_to_sec(entry.get_time_for_expired()); + if (OB_ISNULL(localtime_r(&time_us, &struct_tm))) { + ret = OB_ERR_UNEXPECTED; + WARN_ICMD("fail to converts the calendar time timep to broken-time representation", K(time_us), K(ret)); + } else { + strftime_len = strftime(expire_timebuf, buf_len, "%Y-%m-%d %H:%M:%S", &struct_tm); + if (OB_UNLIKELY(strftime_len <= 0) || OB_UNLIKELY(strftime_len >= buf_len)) { + ret = OB_BUF_NOT_ENOUGH; + WARN_ICMD("timebuf is not enough", K(strftime_len), "timebuf length", buf_len, + K(expire_timebuf), K(ret)); + } + } + } + if (OB_SUCC(ret)) { + time_us = usec_to_sec(get_global_table_cache().get_cache_expire_time_us()); + if (OB_ISNULL(localtime_r(&time_us, &struct_tm))) { + ret = OB_ERR_UNEXPECTED; + WARN_ICMD("fail to converts the calendar time timep to broken-time representation", K(time_us), K(ret)); + } else { + strftime_len = strftime(relative_expire_time, buf_len, "%Y-%m-%d %H:%M:%S", &struct_tm); + if (OB_UNLIKELY(strftime_len <= 0) || OB_UNLIKELY(strftime_len >= buf_len)) { + ret = OB_BUF_NOT_ENOUGH; + WARN_ICMD("timebuf is not enough", K(strftime_len), "timebuf length", buf_len, + K(expire_timebuf), K(ret)); + } + } + } return ret; } @@ -668,15 +714,19 @@ int ObShowRouteHandler::dump_partition_item(const ObPartitionEntry &entry) char valid_timebuf[buf_len]; char access_timebuf[buf_len]; char update_timebuf[buf_len]; + char expire_timebuf[buf_len]; + char relative_expire_timebuf[buf_len]; if (OB_FAIL(extract_entry_time(entry, create_timebuf, valid_timebuf, access_timebuf, - update_timebuf, buf_len))) { + update_timebuf, expire_timebuf, relative_expire_timebuf, buf_len))) { WARN_ICMD("fail to extract_entry_time", K(entry), K(ret)); } else { cells[OB_RPC_CREATE].set_varchar(create_timebuf); cells[OB_RPC_LAST_VALID].set_varchar(valid_timebuf); cells[OB_RPC_LAST_ACCESS].set_varchar(access_timebuf); cells[OB_RPC_LAST_UPDATE].set_varchar(update_timebuf); + cells[OB_RPC_EXPIRE_TIME].set_varchar(expire_timebuf); + cells[OB_RPC_RELATIVE_EXPIRE_TIME].set_varchar(relative_expire_timebuf); row.cells_ = cells; row.count_ = OB_RPC_MAX_ROUTE_COLUMN_ID; @@ -715,15 +765,19 @@ int ObShowRouteHandler::dump_routine_item(const ObRoutineEntry &entry) char valid_timebuf[buf_len]; char access_timebuf[buf_len]; char update_timebuf[buf_len]; + char expire_timebuf[buf_len]; + char relative_expire_timebuf[buf_len]; if (OB_FAIL(extract_entry_time(entry, create_timebuf, valid_timebuf, access_timebuf, - update_timebuf, buf_len))) { + update_timebuf, expire_timebuf, relative_expire_timebuf, buf_len))) { WARN_ICMD("fail to extract_entry_time", K(entry), K(ret)); } else { cells[OB_RRC_CREATE].set_varchar(create_timebuf); cells[OB_RRC_LAST_VALID].set_varchar(valid_timebuf); cells[OB_RRC_LAST_ACCESS].set_varchar(access_timebuf); cells[OB_RRC_LAST_UPDATE].set_varchar(update_timebuf); + cells[OB_RRC_EXPIRE_TIME].set_varchar(expire_timebuf); + cells[OB_RRC_RELATIVE_EXPIRE_TIME].set_varchar(relative_expire_timebuf); row.cells_ = cells; row.count_ = OB_RRC_MAX_ROUTE_COLUMN_ID; diff --git a/src/obproxy/engine/ob_proxy_operator_table_scan.cpp b/src/obproxy/engine/ob_proxy_operator_table_scan.cpp index f9c16c5..eb6fd78 100644 --- a/src/obproxy/engine/ob_proxy_operator_table_scan.cpp +++ b/src/obproxy/engine/ob_proxy_operator_table_scan.cpp @@ -361,7 +361,7 @@ int ObProxyTableScanOp::set_index_for_expr(ObProxyExpr *expr) } else { ObString &table_name = expr_column->get_table_name(); ObString &column_name = expr_column->get_column_name(); - if ((table_name.empty() || field.tname_.prefix_match(table_name)) + if ((table_name.empty() || field.tname_.prefix_case_match(table_name)) && 0 == column_name.case_compare(field.cname_)) { expr->set_index(i); expr->set_accuracy(field.accuracy_); diff --git a/src/obproxy/iocore/net/ob_unix_net_vconnection.cpp b/src/obproxy/iocore/net/ob_unix_net_vconnection.cpp index e2e7a83..fb93028 100644 --- a/src/obproxy/iocore/net/ob_unix_net_vconnection.cpp +++ b/src/obproxy/iocore/net/ob_unix_net_vconnection.cpp @@ -48,8 +48,7 @@ namespace obproxy namespace net { -// A block size is 8k, and 2 blocks can transmit 16k data, which meets the requirements -static const int64_t NET_MAX_IOV = 2; +static const int64_t NET_MAX_IOV = 16; static inline ObNetState &get_net_state_by_vio(ObVIO &vio) { @@ -1013,7 +1012,9 @@ ObVIO *ObUnixNetVConnection::do_io_write( // SSL_read maybe trigger write and SSL_write maybe trigger read // so reenable write if (nbytes > 0 && (!write_.enabled_ || using_ssl_)) { - write_.triggered_ = true; + if (using_ssl_) { + write_.triggered_ = true; + } write_.vio_.reenable(); } } else { diff --git a/src/obproxy/obutils/ob_congestion_manager.h b/src/obproxy/obutils/ob_congestion_manager.h index a85d868..dd7dce4 100644 --- a/src/obproxy/obutils/ob_congestion_manager.h +++ b/src/obproxy/obutils/ob_congestion_manager.h @@ -156,7 +156,6 @@ public: ObCongestionZoneState *get_zone_state(const common::ObString &zone_name); bool is_base_servers_added() const { return is_base_servers_added_; } void set_base_servers_added() { is_base_servers_added_ = true; } - void clear_base_servers_added() { is_base_servers_added_ = false; } bool is_congestion_avail(); int update_tc_congestion_map(ObCongestionEntry &entry); DECLARE_TO_STRING; diff --git a/src/obproxy/obutils/ob_proxy_config.h b/src/obproxy/obutils/ob_proxy_config.h index a27f28d..a873069 100644 --- a/src/obproxy/obutils/ob_proxy_config.h +++ b/src/obproxy/obutils/ob_proxy_config.h @@ -159,7 +159,6 @@ public: DEF_TIME(idc_list_refresh_interval, "2h", "[10s, 1d]", "the interval to refresh idc list for getting newest region-idc, [10s, 1d]", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER); DEF_TIME(stat_table_sync_interval, "60s", "[0s,1d]", "update sync statistic to ob_all_proxy_stat table interval, [0s, 1d], 0 means disable, if set a negative value, proxy treat it as 0", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER); DEF_TIME(stat_dump_interval, "6000s", "[0s,1d]", "dump statistic in log interval, [0s, 1d], 0 means disable, if set a negative value, proxy treat it as 0", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER); - DEF_INT(partition_location_expire_relative_time, "0", "[-36000000,36000000]", "the unit is ms, 0 means do not expire, others will expire partition location base on relative time", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS); DEF_INT(cluster_count_high_water_mark, "256", "[2, 102400]", "if cluster count is greater than this water mark, cluser will be kicked out by LRU", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER); DEF_TIME(cluster_expire_time, "1d", "[0,]", "cluster resource expire time, 0 means never expire,cluster will be deleted if it has not been accessed for more than the time,[0, ]", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER); @@ -341,8 +340,14 @@ public: DEF_BOOL(enable_extra_prometheus_metric, "false", "enable net and route prometheus merics or not", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER); DEF_BOOL(enable_causal_order_read, "true", "if enabled, proxy will choose server by priority and sync safe snapshot version if need", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER); - DEF_BOOL(enable_qa_mode, "false", "just for test, if enabled, proxy can forcibly expire all location cache", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS); - DEF_INT(location_expire_period, "0", "[0,36000000]", "the unit is ms, only work if qa_mode is set, it means location cache which has been created for more than this value will be expired", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS); + // The following four parameters are related to the location cache at the table level. + // Here, it is recommended to use partition_location_expire_relative_time in an emergency, + // and set the active expiration recommendation location_expire_period_time + // enable_qa_mode and location_expire_period will be discarded later + DEF_INT(partition_location_expire_relative_time, "0", "[-36000000,36000000]", "the unit is ms, 0 means do not expire, others will expire partition location base on relative time", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS); + DEF_BOOL(enable_qa_mode, "false", "just for test, not recommended, if enabled, proxy can forcibly expire all location cache", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS); + DEF_INT(location_expire_period, "0", "[0,36000000]", "just for test, not recommended, the unit is ms, only work if qa_mode is set, it means location cache which has been created for more than this value will be expired", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS); + DEF_TIME(location_expire_period_time, "0d", "[0s, 30d]", "time for location expire period, values in [0s, 30d], 0 means no expire", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS); // in public cloud, will assign a vip addr to proxy. qa_mode_mock_slb_vip is a vip addr for testing DEF_STR(qa_mode_mock_public_cloud_slb_addr, "127.0.0.1:33045", "mock public cloud slb addr", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER); diff --git a/src/obproxy/obutils/ob_proxy_stmt.cpp b/src/obproxy/obutils/ob_proxy_stmt.cpp index efd3f78..70277b7 100644 --- a/src/obproxy/obutils/ob_proxy_stmt.cpp +++ b/src/obproxy/obutils/ob_proxy_stmt.cpp @@ -40,7 +40,8 @@ int ObProxyDMLStmt::limit_to_sql_string(common::ObSqlString& sql_string) return ret; } ObProxySelectStmt::ObProxySelectStmt() : is_inited_(false), has_rollup_(false), - has_for_update_(false), from_token_off_(-1) + has_for_update_(false), has_unsupport_expr_type_(false), + from_token_off_(-1) { } @@ -100,7 +101,7 @@ int ObProxySelectStmt::handle_parse_result(const ParseResult &parse_result) if (NULL == tmp_node) { // do nothing } else if (i == PARSE_SELECT_HAVING) { - ret = OB_ERROR_UNSUPPORT_EXPR_TYPE; + has_unsupport_expr_type_ = true; LOG_WARN("having not support", K(ret), K(sql_string_)); } else { switch(tmp_node->type_) { @@ -146,7 +147,7 @@ int ObProxySelectStmt::handle_parse_result(const ParseResult &parse_result) break; case T_QEURY_EXPRESSION_LIST: //distinct not support now default: - ret = OB_ERROR_UNSUPPORT_EXPR_TYPE; + has_unsupport_expr_type_ = true; LOG_WARN("unsupport type", "node_type", get_type_name(tmp_node->type_), K(sql_string_), K(ret)); } } @@ -339,7 +340,7 @@ int ObProxySelectStmt::handle_limit_clause(ParseNode* node) } break; default: - ret = OB_ERROR_UNSUPPORT_EXPR_TYPE; + has_unsupport_expr_type_ = true; LOG_WARN("unsupport type", "node_type", get_type_name(tmp_node->type_), K(sql_string_)); } } @@ -367,7 +368,7 @@ int ObProxySelectStmt::handle_project_list(ParseNode* node) } break; default: - ret = OB_ERROR_UNSUPPORT_EXPR_TYPE; + has_unsupport_expr_type_ = true; LOG_WARN("unsupport type", "node_type", get_type_name(tmp_node->type_), K(sql_string_)); } @@ -532,7 +533,7 @@ int ObProxySelectStmt::get_expr_by_type(ObProxyExpr* &expr, ObProxyExprType type ALLOC_PROXY_EXPR_BY_TYPE(ObProxyGroupItem); break; default: - ret = OB_ERROR_UNSUPPORT_EXPR_TYPE; + has_unsupport_expr_type_ = true; LOG_WARN("unexpected type", K(type)); break; } @@ -562,25 +563,25 @@ int ObProxySelectStmt::handle_table_and_db_node(ParseNode* node, ObProxyExprTabl ret = OB_ERR_UNEXPECTED; LOG_WARN("dynamic_cast failed", K(ret)); } - } else if (OB_SUCCESS == table_exprs_map_.get_refactored(table_name, expr)) { - if (OB_ISNULL(expr_table = dynamic_cast(expr))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("dynamic_cast failed", K(ret)); - } else { - // Only the real table name needs to be rewritten, not the alias - ObProxyExprTablePos expr_table_pos; - if (NULL != db_node) { - expr_table_pos.set_database_pos(db_node->token_off_); - } - expr_table_pos.set_table_pos(table_node->token_off_); - expr_table_pos.set_table_expr(expr_table); - if (OB_FAIL(table_pos_array_.push_back(expr_table_pos))) { - LOG_WARN("fail to push expr table pos", K(ret)); + } else { + string_to_upper_case(table_name.ptr(), table_name.length()); // change all to upper to store + if (OB_SUCCESS == table_exprs_map_.get_refactored(table_name, expr)) { + if (OB_ISNULL(expr_table = dynamic_cast(expr))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("dynamic_cast failed", K(ret)); + } else { + // Only the real table name needs to be rewritten, not the alias + ObProxyExprTablePos expr_table_pos; + if (NULL != db_node) { + expr_table_pos.set_database_pos(db_node->token_off_); + } + expr_table_pos.set_table_pos(table_node->token_off_); + expr_table_pos.set_table_expr(expr_table); + if (OB_FAIL(table_pos_array_.push_back(expr_table_pos))) { + LOG_WARN("fail to push expr table pos", K(ret)); + } } } - } else { - ret = OB_ERR_BAD_FIELD_ERROR; - LOG_WARN("table name of column is not alias name or real table name", K(table_name), K(ret)); } } @@ -622,7 +623,7 @@ int ObProxySelectStmt::column_ref_to_expr(ParseNode* node, ObProxyExpr* &expr, O LOG_WARN("T_COLUMN_REF unexpected column entry", K(ret)); } else if (T_IDENT != column_node->type_ && T_STAR != column_node->type_) { // now column_ref child should be T_IDENT - ret = OB_ERROR_UNSUPPORT_EXPR_TYPE; + has_unsupport_expr_type_ = true; LOG_WARN("T_COLUMN_REF unexpected column entry", "node_type", get_type_name(column_node->type_), K(ret)); } else if (T_IDENT == column_node->type_) { ObProxyExprColumn* expr_column = NULL; @@ -759,7 +760,7 @@ int ObProxySelectStmt::check_node_has_agg(ParseNode* node) case T_FUN_MAX: case T_FUN_MIN: case T_FUN_AVG: - ret = OB_ERROR_UNSUPPORT_EXPR_TYPE; + has_unsupport_expr_type_ = true; LOG_WARN("node has agg", "node_type", get_type_name(tmp_node->type_), K(ret), K(sql_string_)); break; default: @@ -859,7 +860,7 @@ int ObProxySelectStmt::string_node_to_expr(ParseNode* node, ObProxyExpr* &expr, if (OB_FAIL(check_node_has_agg(node))) { LOG_WARN("unsupport type", "node_type", get_type_name(node->type_), K(node->str_value_)); } else if (string_node == NULL) { - ret = OB_ERROR_UNSUPPORT_EXPR_TYPE; + has_unsupport_expr_type_ = true; LOG_WARN("unsupport type", "node_type", get_type_name(node->type_), K(node->str_value_), K(sql_string_)); } else if (OB_FAIL(get_sharding_const_expr(string_node, expr))) { LOG_WARN("get_sharding_const_expr failed", K(ret)); @@ -914,6 +915,7 @@ int ObProxySelectStmt::get_table_and_db_expr(ParseNode* node, ObProxyExprTable* "token len", table_node->token_len_, K(ret)); } else { ObString table_name(table_node->token_len_, table_node->str_value_); + string_to_upper_case(table_name.ptr(), table_name.length()); // change all to upper to store if (OB_FAIL(table_exprs_map_.get_refactored(table_name, expr))) { /* same table keep last one. */ if (OB_HASH_NOT_EXIST == ret) { if (OB_FAIL(get_expr_by_type(expr, OB_PROXY_EXPR_TYPE_TABLE))) { @@ -923,6 +925,8 @@ int ObProxySelectStmt::get_table_and_db_expr(ParseNode* node, ObProxyExprTable* LOG_WARN("dynamic_cast failed", K(ret)); } else { if (NULL != db_node) { + ObString database_name(db_node->token_len_, db_node->str_value_); + string_to_upper_case(database_name.ptr(), database_name.length()); // change all to upper to store expr_table->set_database_name(db_node->str_value_, db_node->token_len_); } expr_table->set_table_name(table_node->str_value_, table_node->token_len_); @@ -939,6 +943,8 @@ int ObProxySelectStmt::get_table_and_db_expr(ParseNode* node, ObProxyExprTable* ret = OB_ERR_UNEXPECTED; LOG_WARN("dynamic_cast failed", K(ret)); } else if (NULL != db_node && expr_table->get_database_name().empty()) { + ObString database_name(db_node->token_len_, db_node->str_value_); + string_to_upper_case(database_name.ptr(), database_name.length()); // change all to upper to store expr_table->set_database_name(db_node->str_value_, db_node->token_len_); } } @@ -991,7 +997,7 @@ int ObProxySelectStmt::handle_table_node_to_expr(ParseNode* node) } break; default: - ret = OB_ERROR_UNSUPPORT_EXPR_TYPE; + has_unsupport_expr_type_ = true; LOG_WARN("unsupport type", "node_type", get_type_name(node->type_), K(node->str_value_)); } } @@ -1169,7 +1175,7 @@ int ObProxySelectStmt::handle_where_node(ParseNode* node) } break; default: - ret = OB_ERROR_UNSUPPORT_EXPR_TYPE; + has_unsupport_expr_type_ = true; LOG_WARN("unexpected expr type", "node_type", get_type_name(tmp_node->type_), K(sql_string_)); } } @@ -1334,13 +1340,13 @@ int ObProxySelectStmt::handle_sort_list_node(ParseNode* node, const SortListType } break; default: - ret = OB_ERR_UNEXPECTED; + has_unsupport_expr_type_ = true; LOG_WARN("invalid sort_list_type", K(sort_list_type), K(sql_string_), K(ret)); } } break; default: - ret = OB_ERROR_UNSUPPORT_EXPR_TYPE; + has_unsupport_expr_type_ = true; LOG_WARN("unsupport expr type", "node_type", get_type_name(tmp_node->type_), K(sql_string_), K(ret)); } } @@ -1370,7 +1376,7 @@ int ObProxySelectStmt::handle_with_rollup_in_groupby(ParseNode* node) } break; default: - ret = OB_ERROR_UNSUPPORT_EXPR_TYPE; + has_unsupport_expr_type_ = true; LOG_WARN("unsupport expr type", "node_type", get_type_name(tmp_node->type_), K(sql_string_)); } } @@ -1387,7 +1393,7 @@ int ObProxySelectStmt::handle_groupby_clause(ParseNode* node) if (NULL == tmp_node) { //do nothing } else if (T_WITH_ROLLUP_CLAUSE != tmp_node->type_) { - ret = OB_ERROR_UNSUPPORT_EXPR_TYPE; + has_unsupport_expr_type_ = true; LOG_WARN("unsupport expr type", "node_type", get_type_name(tmp_node->type_), K(sql_string_)); } else if (OB_FAIL(handle_with_rollup_in_groupby(tmp_node))) { LOG_WARN("handle_with_rollup_in_groupby failed", K(ret), K(i), K(sql_string_)); @@ -1406,7 +1412,7 @@ int ObProxySelectStmt::handle_orderby_clause(ParseNode* node) if (NULL == tmp_node) { //do nothing } else if (T_SORT_LIST != tmp_node->type_) { - ret = OB_ERROR_UNSUPPORT_EXPR_TYPE; + has_unsupport_expr_type_ = true; LOG_WARN("unsupport expr type", "node_type", get_type_name(tmp_node->type_), K(sql_string_)); } else if (OB_FAIL(handle_sort_list_node(tmp_node, SORT_LSIT_IN_ORDER_BY))) { LOG_WARN("handle_sort_list_node", K(ret), K(i), K(sql_string_)); diff --git a/src/obproxy/obutils/ob_proxy_stmt.h b/src/obproxy/obutils/ob_proxy_stmt.h index 5e8a181..68156c7 100644 --- a/src/obproxy/obutils/ob_proxy_stmt.h +++ b/src/obproxy/obutils/ob_proxy_stmt.h @@ -113,6 +113,7 @@ public: int handle_comment_list(const ParseResult &parse_result); virtual int to_sql_string(common::ObSqlString& sql_string); bool has_for_update() const { return has_for_update_; } + bool has_unsupport_expr_type() const { return has_unsupport_expr_type_; } void set_table_name(const common::ObString& table_name) { table_name_ = table_name; } int64_t get_from_token_off() { return from_token_off_; } @@ -175,6 +176,7 @@ private: bool is_inited_; bool has_rollup_; bool has_for_update_; + bool has_unsupport_expr_type_; int64_t from_token_off_; ExprMap table_exprs_map_; ExprMap alias_table_map_; diff --git a/src/obproxy/obutils/ob_resource_pool_processor.cpp b/src/obproxy/obutils/ob_resource_pool_processor.cpp index 3e75bbc..13e75ee 100644 --- a/src/obproxy/obutils/ob_resource_pool_processor.cpp +++ b/src/obproxy/obutils/ob_resource_pool_processor.cpp @@ -173,8 +173,6 @@ int ObRslistFetchCont::init_task() } else { fetch_result_ = true; } - } else { - fetch_result_ = true; } if (!fetch_result_) { diff --git a/src/obproxy/obutils/ob_server_state_processor.cpp b/src/obproxy/obutils/ob_server_state_processor.cpp index ba71c19..03d5a29 100644 --- a/src/obproxy/obutils/ob_server_state_processor.cpp +++ b/src/obproxy/obutils/ob_server_state_processor.cpp @@ -1193,9 +1193,6 @@ int ObServerStateRefreshCont::add_refresh_rslist_task(const bool need_update_dum cont->destroy(); cont = NULL; } - } else { - congestion_manager_->clear_base_servers_added(); - LOG_INFO("congestion manager's base servers has cleared", K_(cluster_name), K_(cluster_id)); } } else { LOG_DEBUG("refresh rslist task has been scheduled", K_(cluster_name), K_(cluster_id), diff --git a/src/obproxy/omt/ob_resource_unit_table_processor.cpp b/src/obproxy/omt/ob_resource_unit_table_processor.cpp index 4603fdb..35c5f4f 100644 --- a/src/obproxy/omt/ob_resource_unit_table_processor.cpp +++ b/src/obproxy/omt/ob_resource_unit_table_processor.cpp @@ -229,9 +229,8 @@ void ObResourceUnitTableProcessor::dec_conn( if (ATOMIC_FAA(&used_conn->max_used_connections_, -1) > 1) { } else { DRWLock::WRLockGuard guard(used_conn_rwlock_); - if (0 == used_conn->max_used_connections_) { - erase_used_conn(key_name); - used_conn->dec_ref(); + if (0 == used_conn->max_used_connections_ && used_conn->is_in_map_) { + erase_used_conn(key_name, used_conn); LOG_DEBUG("erase used conn", K(key_name)); } } @@ -544,6 +543,8 @@ int ObResourceUnitTableProcessor::create_used_conn(ObString& key_name, used_conn->inc_ref(); if (OB_FAIL(used_conn_cache_.set(used_conn))) { LOG_WARN("fail to set used conn map", K(key_name), KPC(used_conn), K(ret)); + } else { + used_conn->is_in_map_ = true; } } @@ -601,11 +602,14 @@ int ObResourceUnitTableProcessor::get_or_create_used_conn(ObString& key_name, return ret; } -int ObResourceUnitTableProcessor::erase_used_conn(ObString& key_name) +int ObResourceUnitTableProcessor::erase_used_conn(ObString& key_name, ObUsedConn* used_conn) { int ret = OB_SUCCESS; if (OB_FAIL(used_conn_cache_.erase(key_name))) { LOG_WARN("erase used conn failed", K(key_name)); + } else { + used_conn->is_in_map_ = false; + used_conn->dec_ref(); } return ret; diff --git a/src/obproxy/omt/ob_resource_unit_table_processor.h b/src/obproxy/omt/ob_resource_unit_table_processor.h index f5d49c6..fe188c3 100644 --- a/src/obproxy/omt/ob_resource_unit_table_processor.h +++ b/src/obproxy/omt/ob_resource_unit_table_processor.h @@ -97,7 +97,7 @@ public: int create_used_conn(common::ObString& key_name, ObUsedConn*& used_conn, int64_t& cur_used_connections); int get_used_conn(common::ObString& key_name, bool is_need_inc_used_connections, ObUsedConn*& used_conn, int64_t& cur_used_connections); - int erase_used_conn(common::ObString& key_name); + int erase_used_conn(common::ObString& key_name, ObUsedConn* used_conn); int get_or_create_used_conn(common::ObString& key_name, ObUsedConn*& used_conn, int64_t& cur_used_connections); TO_STRING_KV(K_(is_inited), K_(backup_status)); diff --git a/src/obproxy/omt/ob_vip_tenant_conn.h b/src/obproxy/omt/ob_vip_tenant_conn.h index fb5cd86..c5d9015 100644 --- a/src/obproxy/omt/ob_vip_tenant_conn.h +++ b/src/obproxy/omt/ob_vip_tenant_conn.h @@ -131,7 +131,7 @@ private: class ObUsedConn : public common::ObSharedRefCount { public: - ObUsedConn(common::ObString& full_name) : max_used_connections_(0) { + ObUsedConn(common::ObString& full_name) : max_used_connections_(0), is_in_map_(false) { if (full_name.length() < OB_PROXY_MAX_TENANT_CLUSTER_NAME_LENGTH + common::OB_IP_STR_BUFF) { MEMCPY(full_name_str_, full_name.ptr(), full_name.length()); full_name_.assign_ptr(full_name_str_, (int32_t)full_name.length()); @@ -146,13 +146,15 @@ public: void reset() { full_name_.reset(); max_used_connections_ = 0; + is_in_map_ = false; } - TO_STRING_KV(K_(full_name), K_(max_used_connections)); + TO_STRING_KV(K_(full_name), K_(max_used_connections), K_(is_in_map)); LINK(ObUsedConn, used_conn_link_); public: common::ObString full_name_; volatile int64_t max_used_connections_; + bool is_in_map_; private: char full_name_str_[OB_PROXY_MAX_TENANT_CLUSTER_NAME_LENGTH + common::OB_IP_STR_BUFF]; diff --git a/src/obproxy/opsql/expr_parser/ob_expr_parse_result.h b/src/obproxy/opsql/expr_parser/ob_expr_parse_result.h index a9a5cb8..9265606 100644 --- a/src/obproxy/opsql/expr_parser/ob_expr_parse_result.h +++ b/src/obproxy/opsql/expr_parser/ob_expr_parse_result.h @@ -137,9 +137,9 @@ typedef struct _ObProxyRelationExpr * table: oceanbase.__all_virtual_proxy_partition_info * -> column spare5: VARCHAR, format: "int32_t,int16_t,int16_t" * which means "length,precision/length_semantics,scale" - * init -1 to each elements for invalid status. */ typedef struct _ObProxyPartKeyAccuracy { + int8_t valid_; // default is 0, means not valid int32_t length_; int16_t precision_; // the same as length_semantics int16_t scale_; diff --git a/src/obproxy/proxy/mysql/ob_mysql_sm.cpp b/src/obproxy/proxy/mysql/ob_mysql_sm.cpp index d17df9f..11da280 100644 --- a/src/obproxy/proxy/mysql/ob_mysql_sm.cpp +++ b/src/obproxy/proxy/mysql/ob_mysql_sm.cpp @@ -5101,6 +5101,7 @@ void ObMysqlSM::do_partition_location_lookup() int ret = OB_SUCCESS; ObAction *pl_lookup_action_handle = NULL; bool find_entry = false; + int64_t tenant_version = 0; bool need_pl_route = (is_pl_route_supported() && (trans_state_.trans_info_.client_request_.get_parse_result().is_call_stmt() || trans_state_.trans_info_.client_request_.get_parse_result().is_text_ps_call_stmt())); // Get it from table_map first @@ -5117,10 +5118,16 @@ void ObMysqlSM::do_partition_location_lookup() cr_id = sm_cluster_resource_->get_cluster_id(); } ObTableEntryKey key(name, sm_cluster_resource_->version_, cr_id); + if (OB_UNLIKELY(get_global_proxy_config().check_tenant_locality_change)) { + tenant_version = sm_cluster_resource_->get_location_tenant_version( + client_session_->get_session_info().get_priv_info().tenant_name_); + } tmp_entry = table_map.get(key); if (NULL != tmp_entry && !tmp_entry->is_partition_table() && (tmp_entry->is_avail_state() || tmp_entry->is_updating_state()) && !(get_global_table_cache().is_table_entry_expired(*tmp_entry))) { + tmp_entry->check_and_set_expire_time(tenant_version, tmp_entry->is_dummy_entry()); + tmp_entry->renew_last_access_time(); ObMysqlRouteResult result; result.table_entry_ = tmp_entry; @@ -5128,7 +5135,8 @@ void ObMysqlSM::do_partition_location_lookup() result.has_dup_replica_ = tmp_entry->has_dup_replica(); tmp_entry->set_need_force_flush(false); find_entry = true; - LOG_DEBUG("get table entry from thread map", KPC(tmp_entry)); + bool is_table_entry_from_remote = false; + LOG_DEBUG("ObMysqlRoute get table entry succ", KPC(tmp_entry), K(is_table_entry_from_remote)); state_partition_location_lookup(TABLE_ENTRY_EVENT_LOOKUP_DONE, &result); } else if (NULL != tmp_entry) { tmp_entry->dec_ref(); @@ -5149,15 +5157,13 @@ void ObMysqlSM::do_partition_location_lookup() } else { param.cr_id_ = sm_cluster_resource_->get_cluster_id(); } - if (OB_UNLIKELY(get_global_proxy_config().check_tenant_locality_change)) { - param.tenant_version_ = sm_cluster_resource_->get_location_tenant_version( - client_session_->get_session_info().get_priv_info().tenant_name_); - } + param.tenant_version_ = tenant_version; param.timeout_us_ = hrtime_to_usec(trans_state_.mysql_config_params_->short_async_task_timeout_); param.is_partition_table_route_supported_ = is_partition_table_route_supported(); param.is_oracle_mode_ = client_session_->get_session_info().is_oracle_mode(); param.client_request_ = &trans_state_.trans_info_.client_request_; // priv parse result param.client_info_ = &client_session_->get_session_info(); + param.route_ = &trans_state_.pll_info_.route_; param.need_pl_route_ = need_pl_route; param.current_idc_name_ = client_session_->get_current_idc_name();//shallow copy if (trans_state_.pll_info_.is_cached_dummy_force_renew()) { diff --git a/src/obproxy/proxy/mysql/ob_mysql_transact.cpp b/src/obproxy/proxy/mysql/ob_mysql_transact.cpp index 3260763..4a196d8 100644 --- a/src/obproxy/proxy/mysql/ob_mysql_transact.cpp +++ b/src/obproxy/proxy/mysql/ob_mysql_transact.cpp @@ -292,6 +292,7 @@ void ObMysqlTransact::acquire_cached_server_session(ObTransState &s) LOG_DEBUG("[ObMysqlTransact::acquire_cached_server_session] last_insert_id_session is alive, pick it"); } else if (get_global_proxy_config().enable_cached_server + && !s.sm_->client_session_->is_proxy_mysql_client_ && NULL != last_session && OB_LIKELY(!s.mysql_config_params_->is_random_routing_mode())) { const int32_t ip = ops_ip4_addr_host_order(last_session->get_netvc()->get_remote_addr()); const int32_t port = static_cast(ops_ip_port_host_order(last_session->get_netvc()->get_remote_addr())); @@ -1271,7 +1272,9 @@ void ObMysqlTransact::handle_pl_update(ObTransState &s) inline bool ObMysqlTransact::ObPartitionLookupInfo::need_update_entry_by_partition_hit() { - return (need_update_entry() && !route_.is_dummy_table()); + return (need_update_entry() + && !route_.is_dummy_table() + && !route_.no_need_pl_update_); } int64_t ObMysqlTransact::ObPartitionLookupInfo::to_string(char *buf, const int64_t buf_len) const diff --git a/src/obproxy/proxy/mysqllib/ob_mysql_resp_analyzer.cpp b/src/obproxy/proxy/mysqllib/ob_mysql_resp_analyzer.cpp index b864fb1..7eaae25 100644 --- a/src/obproxy/proxy/mysqllib/ob_mysql_resp_analyzer.cpp +++ b/src/obproxy/proxy/mysqllib/ob_mysql_resp_analyzer.cpp @@ -234,7 +234,8 @@ int ObRespResult::is_resp_finished(bool &finished, ObMysqlRespEndingType &ending case OB_MYSQL_COM_REFRESH : case OB_MYSQL_COM_PROCESS_KILL : case OB_MYSQL_COM_LOGIN: - case OB_MYSQL_COM_INIT_DB : { + case OB_MYSQL_COM_INIT_DB : + case OB_MYSQL_COM_CHANGE_USER: { if (OB_UNLIKELY(is_mysql_mode())) { if (1 == pkt_cnt_[OK_PACKET_ENDING_TYPE]) { finished = true; diff --git a/src/obproxy/proxy/route/ob_ldc_location.cpp b/src/obproxy/proxy/route/ob_ldc_location.cpp index 4c2540b..b75e06e 100644 --- a/src/obproxy/proxy/route/ob_ldc_location.cpp +++ b/src/obproxy/proxy/route/ob_ldc_location.cpp @@ -227,7 +227,7 @@ int ObLDCLocation::assign(const ObTenantServer *ts, const ObIArrayget_tenant_version() - && entry->is_avail_state() - && !entry->is_dummy_entry() - && entry->get_time_for_expired() == 0) { - LOG_INFO("tenant locality change, set table entry time expired", - K_(param_.tenant_version), K(entry->get_tenant_version()), KPC(entry)); - entry->set_time_for_expired(ObRandomNumUtils::get_random_half_to_full(60*1000*1000) - + common::ObTimeUtility::current_time()); - } - } - if (NULL == table_entry_action_handle) { // cache hit handle_event(TABLE_ENTRY_EVENT_LOOKUP_DONE, &table_param.result_); table_param.result_.reset(); @@ -447,6 +434,10 @@ int ObMysqlRoute::state_table_entry_lookup(int event, void *data) if (false == param_.is_need_force_flush_) { param_.is_need_force_flush_ = result->is_need_force_flush_; } + if (NULL != table_entry_) { + table_entry_->check_and_set_expire_time(param_.tenant_version_, table_entry_->is_dummy_entry()); + } + result->target_entry_ = NULL; is_table_entry_from_remote_ = result->is_from_remote_; // use different log level only for debug @@ -603,6 +594,7 @@ inline void ObMysqlRoute::setup_partition_id_calc() *result, *param_.client_request_, *param_.client_info_, + *param_.route_, *part_info, part_id_))) { LOG_INFO("fail to calculate partition id, just use tenant server", K(tmp_ret)); @@ -713,13 +705,8 @@ int ObMysqlRoute::state_partition_entry_lookup(int event, void *data) result->target_entry_ = NULL; is_part_entry_from_remote_ = result->is_from_remote_; - if (NULL != part_entry_ && part_entry_->is_avail_state() - && param_.tenant_version_ != part_entry_->get_tenant_version() - && part_entry_->get_time_for_expired() == 0) { - LOG_DEBUG("tenant locality change, set partition entry time expired", - K(param_.tenant_version_), K(part_entry_->get_tenant_version()), KPC_(part_entry)); - part_entry_->set_time_for_expired(ObRandomNumUtils::get_random_half_to_full(60*1000*1000) - + common::ObTimeUtility::current_time()); + if (NULL != part_entry_) { + part_entry_->check_and_set_expire_time(param_.tenant_version_, false); } if (NULL != part_entry_ && (part_entry_->is_avail_state())) { @@ -951,6 +938,7 @@ inline int ObMysqlRoute::deep_copy_route_param(ObRouteParam ¶m) // !!Attention client_request should not be used in scheduled cont param_.client_request_ = param.client_request_; param_.client_info_ = param.client_info_; + param_.route_ = param.route_; // no need assign result_ and cr if (NULL != name_buf_ && name_buf_len_ > 0) { op_fixed_mem_free(name_buf_, name_buf_len_); diff --git a/src/obproxy/proxy/route/ob_mysql_route.h b/src/obproxy/proxy/route/ob_mysql_route.h index 59cfc0d..36e1f9b 100644 --- a/src/obproxy/proxy/route/ob_mysql_route.h +++ b/src/obproxy/proxy/route/ob_mysql_route.h @@ -34,6 +34,7 @@ class ObMysqlRoute; class ObPartitionEntry; class ObProxyMysqlRequest; class ObClientSessionInfo; +class ObServerRoute; typedef int (ObMysqlRoute::*MysqlRouteHandler)(int event, void *data); class ObMysqlRouteResult @@ -89,7 +90,7 @@ public: : cont_(NULL), name_(), force_renew_(false), use_lower_case_name_(false), is_partition_table_route_supported_(false), need_pl_route_(false), is_oracle_mode_(false), is_need_force_flush_(false), result_(), mysql_proxy_(NULL), client_request_(NULL), client_info_(), - cr_version_(-1), cr_id_(-1), tenant_version_(0), timeout_us_(-1), current_idc_name_(), + route_(NULL), cr_version_(-1), cr_id_(-1), tenant_version_(0), timeout_us_(-1), current_idc_name_(), cr_(NULL) {} ~ObRouteParam() { reset(); } @@ -113,6 +114,7 @@ public: ObMysqlProxy *mysql_proxy_; ObProxyMysqlRequest *client_request_; ObClientSessionInfo *client_info_; + ObServerRoute *route_; int64_t cr_version_; int64_t cr_id_; uint64_t tenant_version_; @@ -148,6 +150,7 @@ inline void ObRouteParam::reset() is_oracle_mode_ = false; is_need_force_flush_ = false; mysql_proxy_ = NULL; + route_ = NULL; cr_version_ = -1; cr_id_ = -1; tenant_version_ = 0; diff --git a/src/obproxy/proxy/route/ob_partition_entry.h b/src/obproxy/proxy/route/ob_partition_entry.h index 2e90b43..37d7a70 100644 --- a/src/obproxy/proxy/route/ob_partition_entry.h +++ b/src/obproxy/proxy/route/ob_partition_entry.h @@ -70,7 +70,7 @@ inline bool ObPartitionEntryKey::is_valid() const return (common::OB_INVALID_ID != table_id_ && common::OB_INVALID_ID != partition_id_ && (cr_version_ >= 0) - && (cr_id_ <= 0)); + && (cr_id_ >= 0)); } inline uint64_t ObPartitionEntryKey::hash(uint64_t seed) const diff --git a/src/obproxy/proxy/route/ob_route_struct.cpp b/src/obproxy/proxy/route/ob_route_struct.cpp index dcb2068..ef69a2d 100644 --- a/src/obproxy/proxy/route/ob_route_struct.cpp +++ b/src/obproxy/proxy/route/ob_route_struct.cpp @@ -14,9 +14,11 @@ #include "proxy/route/ob_route_struct.h" #include "iocore/eventsystem/ob_buf_allocator.h" +#include "utils/ob_proxy_utils.h" using namespace oceanbase::common; using namespace oceanbase::share; +using namespace oceanbase::obproxy; namespace oceanbase { @@ -313,6 +315,33 @@ int64_t ObTableEntryKey::to_string(char *buf, const int64_t buf_len) const return pos; } +void ObRouteEntry::check_and_set_expire_time(const uint64_t tenant_version, const bool is_dummy_entry) +{ + int64_t period_us = obutils::get_global_proxy_config().location_expire_period_time; + const int64_t TENANT_LOCALITY_CHANGE_TIME = 60 * 1000 * 1000; + const int64_t TENANT_LOCALITY_CHANGE_TIME_CONFIG = -1; + if (AVAIL == state_ && !is_dummy_entry) { + if (tenant_version != tenant_version_) { + tenant_version_ = tenant_version; + // -1 means the change comes from a locality change + current_expire_time_config_ = TENANT_LOCALITY_CHANGE_TIME_CONFIG; + period_us = TENANT_LOCALITY_CHANGE_TIME; + time_for_expired_ = ObRandomNumUtils::get_random_half_to_full(period_us) + common::ObTimeUtility::current_time(); + } else if (period_us != current_expire_time_config_ && TENANT_LOCALITY_CHANGE_TIME_CONFIG != current_expire_time_config_) { + current_expire_time_config_ = period_us; + if (period_us > 0) { + time_for_expired_ = ObRandomNumUtils::get_random_half_to_full(period_us) + common::ObTimeUtility::current_time(); + } else if (period_us == 0) { + time_for_expired_ = 0; + } else { + // period_us < 0, unexpected error + } + } else { + // do nothing + } + } +} + } // end of namespace proxy } // end of namespace obproxy diff --git a/src/obproxy/proxy/route/ob_route_struct.h b/src/obproxy/proxy/route/ob_route_struct.h index 4c016fb..0a0e5bb 100644 --- a/src/obproxy/proxy/route/ob_route_struct.h +++ b/src/obproxy/proxy/route/ob_route_struct.h @@ -383,7 +383,7 @@ public: ObRouteEntry() : common::ObSharedRefCount(), cr_version_(-1), cr_id_(common::OB_INVALID_CLUSTER_ID), schema_version_(0), create_time_us_(0), last_valid_time_us_(0), last_access_time_us_(0), last_update_time_us_(0), - state_(BORN), tenant_version_(0), time_for_expired_(0) {} + state_(BORN), tenant_version_(0), time_for_expired_(0), current_expire_time_config_(0) {} virtual ~ObRouteEntry() {} virtual void free() = 0; @@ -430,6 +430,7 @@ public: uint64_t get_tenant_version() const { return tenant_version_; } int64_t get_time_for_expired() const { return time_for_expired_; } void set_time_for_expired(int64_t expire_time) { time_for_expired_ = expire_time; } + void check_and_set_expire_time(const uint64_t tenant_version, const bool is_dummy_entry); protected: int64_t cr_version_; // one entry must belong to one cluster with the specfied version @@ -444,6 +445,7 @@ protected: ObRouteEntryState state_; uint64_t tenant_version_; int64_t time_for_expired_; + int64_t current_expire_time_config_; }; inline bool ObRouteEntry::is_need_update() const diff --git a/src/obproxy/proxy/route/ob_route_utils.cpp b/src/obproxy/proxy/route/ob_route_utils.cpp index 322fbdf..f627d48 100644 --- a/src/obproxy/proxy/route/ob_route_utils.cpp +++ b/src/obproxy/proxy/route/ob_route_utils.cpp @@ -635,9 +635,11 @@ void ObRouteUtils::parse_part_key_accuracy(ObProxyPartKey *part_key, || ob_is_time_tc(part_key_type)) { part_key->accuracy_.precision_ = precision; part_key->accuracy_.scale_ = scale; + part_key->accuracy_.valid_ = 1; } else if (ob_is_string_tc(part_key_type)) { part_key->accuracy_.length_ = length; part_key->accuracy_.precision_ = precision; + part_key->accuracy_.valid_ = 1; } // more obj type could be supported here. } @@ -716,6 +718,7 @@ inline int ObRouteUtils::fetch_part_key(ObResultSetFetcher &rs_fetcher, part_key->name_.str_ = buf; part_key->obj_type_ = part_key_type; part_key->idx_in_rowid_ = idx_in_rowid; + part_key->accuracy_.valid_ = 0; // not valid accuracy parse_part_key_accuracy(part_key, part_key_type, &allocator, part_key_accuracy); diff --git a/src/obproxy/proxy/route/ob_server_route.h b/src/obproxy/proxy/route/ob_server_route.h index 7b73815..413b504 100644 --- a/src/obproxy/proxy/route/ob_server_route.h +++ b/src/obproxy/proxy/route/ob_server_route.h @@ -33,7 +33,7 @@ public: ObServerRoute() : table_entry_(NULL), dummy_entry_(NULL), part_entry_(NULL), cur_chosen_pl_(NULL), is_table_entry_from_remote_(false), is_part_entry_from_remote_(false), - has_dup_replica_(false), need_use_dup_replica_(false), + has_dup_replica_(false), need_use_dup_replica_(false), no_need_pl_update_(false), consistency_level_(common::INVALID_CONSISTENCY), leader_item_(), ldc_route_(), valid_count_(0), cur_chosen_server_(), cur_chosen_route_type_(ROUTE_TYPE_MAX), skip_leader_item_(false) {} @@ -120,6 +120,7 @@ public: bool is_part_entry_from_remote_; bool has_dup_replica_; bool need_use_dup_replica_; + bool no_need_pl_update_; common::ObConsistencyLevel consistency_level_; ObLDCItem leader_item_; @@ -138,6 +139,7 @@ inline void ObServerRoute::reset() is_part_entry_from_remote_ = false; has_dup_replica_ = false; need_use_dup_replica_ = false; + no_need_pl_update_ = false; skip_leader_item_ = false; set_dummy_entry(NULL); set_table_entry(NULL); diff --git a/src/obproxy/proxy/route/obproxy_expr_calculator.cpp b/src/obproxy/proxy/route/obproxy_expr_calculator.cpp index 07338cb..0031c6d 100644 --- a/src/obproxy/proxy/route/obproxy_expr_calculator.cpp +++ b/src/obproxy/proxy/route/obproxy_expr_calculator.cpp @@ -25,6 +25,7 @@ #include "rpc/obmysql/ob_mysql_packet.h" #include "lib/timezone/ob_time_convert.h" #include "lib/timezone/ob_timezone_info.h" +#include "proxy/route/ob_server_route.h" using namespace oceanbase::common; @@ -40,6 +41,7 @@ int ObProxyExprCalculator::calculate_partition_id(common::ObArenaAllocator &allo const ObSqlParseResult &parse_result, ObProxyMysqlRequest &client_request, ObClientSessionInfo &client_info, + ObServerRoute &route, ObProxyPartInfo &part_info, int64_t &partition_id) { @@ -51,7 +53,8 @@ int ObProxyExprCalculator::calculate_partition_id(common::ObArenaAllocator &allo } } if (OB_INVALID_INDEX == partition_id && parse_result.has_simple_route_info()) { - if (OB_FAIL(calc_part_id_with_simple_route_info(allocator, parse_result, client_info, part_info, partition_id))) { + if (OB_FAIL(calc_part_id_with_simple_route_info(allocator, parse_result, client_info, + route, part_info, partition_id))) { LOG_WARN("fail to calc part id with simple part info, will do calc in normal path", K(ret)); } } @@ -89,7 +92,7 @@ int ObProxyExprCalculator::calculate_partition_id(common::ObArenaAllocator &allo LOG_INFO("fail to do expr resolve", K(print_sql), "expr_parse_result", ObExprParseResultPrintWrapper(expr_parse_result), K(part_info), KPC(ps_id_entry), KPC(text_ps_entry), K(resolve_result)); - } else if (OB_FAIL(do_partition_id_calc(resolve_result, client_info, part_info, parse_result, + } else if (OB_FAIL(do_partition_id_calc(resolve_result, client_info, route, part_info, parse_result, allocator, partition_id))) { if (OB_MYSQL_COM_STMT_PREPARE != cmd) { LOG_INFO("fail to do expr resolve", K(print_sql), K(resolve_result), K(part_info)); @@ -99,7 +102,8 @@ int ObProxyExprCalculator::calculate_partition_id(common::ObArenaAllocator &allo } } - if (OB_FAIL(ret)) { + if (OB_FAIL(ret) + && !get_global_proxy_config().enable_cached_server) { int64_t tmp_first_part_id = OB_INVALID_INDEX; int64_t tmp_sub_part_id = OB_INVALID_INDEX; if (OB_FAIL(calc_part_id_by_random_choose_from_exist(part_info, @@ -108,6 +112,7 @@ int ObProxyExprCalculator::calculate_partition_id(common::ObArenaAllocator &allo partition_id))) { LOG_WARN("fail to cal part id by random choose", K(tmp_first_part_id), K(tmp_sub_part_id), K(ret)); } else { + route.no_need_pl_update_ = true; LOG_DEBUG("succ to cal part id by random choose", K(tmp_first_part_id), K(tmp_sub_part_id), K(partition_id)); } } @@ -119,6 +124,7 @@ int ObProxyExprCalculator::calculate_partition_id(common::ObArenaAllocator &allo int ObProxyExprCalculator::calc_part_id_with_simple_route_info(ObArenaAllocator &allocator, const ObSqlParseResult &parse_result, ObClientSessionInfo &client_info, + ObServerRoute &route, ObProxyPartInfo &part_info, int64_t &part_id) { @@ -133,7 +139,7 @@ int ObProxyExprCalculator::calc_part_id_with_simple_route_info(ObArenaAllocator ObExprResolverResult resolve_result; if (OB_FAIL(do_resolve_with_part_key(parse_result, allocator, resolve_result))) { LOG_WARN("fail to do_resolve_with_part_key", K(ret)); - } else if (OB_FAIL(do_partition_id_calc(resolve_result, client_info, part_info, + } else if (OB_FAIL(do_partition_id_calc(resolve_result, client_info, route, part_info, parse_result, allocator, part_id))) { LOG_INFO("fail to do_partition_id_calc", K(resolve_result), K(part_info)); } @@ -260,6 +266,7 @@ int ObProxyExprCalculator::do_expr_resolve(ObExprParseResult &parse_result, int ObProxyExprCalculator::do_partition_id_calc(ObExprResolverResult &resolve_result, ObClientSessionInfo &session_info, + ObServerRoute &route, ObProxyPartInfo &part_info, const ObSqlParseResult &parse_result, ObIAllocator &allocator, @@ -306,10 +313,12 @@ int ObProxyExprCalculator::do_partition_id_calc(ObExprResolverResult &resolve_re if (OB_SUCC(ret)) { partition_id = generate_phy_part_id(first_part_id, sub_part_id, part_info.get_part_level()); LOG_DEBUG("succ to get part id", K(first_part_id), K(sub_part_id), K(partition_id)); - } else { + } else if (!get_global_proxy_config().enable_cached_server) { if (OB_FAIL(calc_part_id_by_random_choose_from_exist(part_info, first_part_id, sub_part_id, partition_id))) { LOG_WARN("fail to get part id at last", K(first_part_id), K(sub_part_id), K(ret)); } else { + // get part id by random, no need update pl + route.no_need_pl_update_ = true; LOG_DEBUG("succ to get part id by random", K(first_part_id), K(sub_part_id), K(partition_id)); } } diff --git a/src/obproxy/proxy/route/obproxy_expr_calculator.h b/src/obproxy/proxy/route/obproxy_expr_calculator.h index 4e962c6..dece8ca 100644 --- a/src/obproxy/proxy/route/obproxy_expr_calculator.h +++ b/src/obproxy/proxy/route/obproxy_expr_calculator.h @@ -43,6 +43,7 @@ class ObProxyPartInfo; class ObClientSessionInfo; class ObPsIdEntry; class ObTextPsEntry; +class ObServerRoute; class ObProxyExprCalculator { @@ -54,6 +55,7 @@ public: const obutils::ObSqlParseResult &parse_result, ObProxyMysqlRequest &client_request, ObClientSessionInfo &client_info, + ObServerRoute &route, ObProxyPartInfo &part_info, int64_t &partition_id); private: @@ -74,6 +76,7 @@ private: opsql::ObExprResolverResult &resolve_result); int do_partition_id_calc(opsql::ObExprResolverResult &resolve_result, ObClientSessionInfo &client_info, + ObServerRoute &route, ObProxyPartInfo &part_info, const obutils::ObSqlParseResult &parse_result, common::ObIAllocator &allocator, @@ -81,6 +84,7 @@ private: int calc_part_id_with_simple_route_info(common::ObArenaAllocator &allocator, const obutils::ObSqlParseResult &parse_result, ObClientSessionInfo &client_info, + ObServerRoute &route, ObProxyPartInfo &part_info, int64_t &part_id); int do_resolve_with_part_key(const obutils::ObSqlParseResult &parse_result, diff --git a/src/obproxy/proxy/shard/obproxy_shard_utils.cpp b/src/obproxy/proxy/shard/obproxy_shard_utils.cpp index d369e4a..877e9b2 100644 --- a/src/obproxy/proxy/shard/obproxy_shard_utils.cpp +++ b/src/obproxy/proxy/shard/obproxy_shard_utils.cpp @@ -1338,10 +1338,6 @@ int ObProxyShardUtils::handle_select_request(ObMysqlClientSession &client_sessio ObString sql = client_request.get_parse_sql(); if (OB_FAIL(sql_parser.parse_sql_by_obparser(sql, NORMAL_PARSE_MODE, parse_result, true))) { LOG_WARN("parse_sql_by_obparser failed", K(ret), K(sql)); - } else if (OB_FAIL(check_topology(parse_result, db_info))) { - if (OB_ERR_UNSUPPORT_DIFF_TOPOLOGY != ret) { - LOG_WARN("fail to check topology", K(ret)); - } } else if (FALSE_IT(is_scan_all = need_scan_all(parse_result))) { // impossible } else if (is_scan_all) { @@ -1466,7 +1462,7 @@ bool ObProxyShardUtils::need_scan_all(ObSqlParseResult &parse_result) { if (parse_result.is_select_stmt() && !parse_result.has_for_update() && (parse_result.get_dbp_route_info().scan_all_ - || (!parse_result.is_use_dbp_hint() && get_global_proxy_config().auto_scan_all))) { + || (!parse_result.has_shard_comment() && get_global_proxy_config().auto_scan_all))) { return true; } @@ -2006,6 +2002,13 @@ int ObProxyShardUtils::handle_scan_all_real_info(ObDbConfigLogicDb &logic_db_inf if (OB_ISNULL(select_stmt)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("select stmt is null, unexpected", K(ret)); + } else if (select_stmt->has_unsupport_expr_type()) { + ret = OB_ERROR_UNSUPPORT_EXPR_TYPE; + LOG_WARN("unsupport sql", K(ret)); + } else if (OB_FAIL(check_topology(parse_result, logic_db_info))) { + if (OB_ERR_UNSUPPORT_DIFF_TOPOLOGY != ret) { + LOG_WARN("fail to check topology", K(ret)); + } } else if (OB_FAIL(get_global_optimizer_processor().alloc_allocator(allocator))) { LOG_WARN("alloc allocator failed", K(ret)); } else if (OB_FAIL(handle_sharding_select_real_info(logic_db_info, client_session, trans_state, @@ -2109,8 +2112,6 @@ int ObProxyShardUtils::handle_select_real_info(ObDbConfigLogicDb &logic_db_info, ret = OB_EXPR_CALC_ERROR; LOG_WARN("shard connector info or prev shard connector info is null", KP(shard_conn), KP(prev_shard_conn), K(ret)); - } else if (OB_FAIL(add_table_name_to_map(allocator, table_name_map, table_name, real_table_name))) { - LOG_WARN("fail to add table name to map", K(table_name), K(real_table_name), K(ret)); } for (; OB_SUCC(ret) && iter != end; iter++) { @@ -2124,10 +2125,6 @@ int ObProxyShardUtils::handle_select_real_info(ObDbConfigLogicDb &logic_db_info, LOG_WARN("fail to cast to table expr", K(expr), K(ret)); } else { ObString &sql_table_name = table_expr->get_table_name(); - if (sql_table_name == table_name) { - continue; - } - if (OB_FAIL(logic_db_info.get_real_table_name(sql_table_name, sql_result, real_table_name, OB_MAX_TABLE_NAME_LENGTH, tb_index, hint_table, testload_type))) { diff --git a/src/share/part/ob_part_desc_hash.cpp b/src/share/part/ob_part_desc_hash.cpp index d8a6467..e0d994c 100644 --- a/src/share/part/ob_part_desc_hash.cpp +++ b/src/share/part/ob_part_desc_hash.cpp @@ -162,9 +162,10 @@ int ObPartDescHash::calc_value_for_oracle(ObObj &src_obj, uint64_t hash_val = 0; ObCastCtx cast_ctx(&allocator, &dtc_params, CM_NULL_ON_WARN, cs_type_); - ObAccuracy accuracy(accuracy_.length_, accuracy_.precision_, accuracy_.scale_); const ObObj *res_obj = &src_obj; + ObAccuracy accuracy(accuracy_.valid_, accuracy_.length_, accuracy_.precision_, accuracy_.scale_); + // use src_obj as buf_obj COMMON_LOG(DEBUG, "begin to cast value for hash oracle", K(src_obj), K(cs_type_)); if (OB_FAIL(ObObjCasterV2::to_type(obj_type_, cs_type_, cast_ctx, src_obj, src_obj))) { diff --git a/src/share/part/ob_part_desc_key.cpp b/src/share/part/ob_part_desc_key.cpp index 7743213..934cb23 100644 --- a/src/share/part/ob_part_desc_key.cpp +++ b/src/share/part/ob_part_desc_key.cpp @@ -53,9 +53,9 @@ int ObPartDescKey::get_part(ObNewRange &range, COMMON_LOG(WARN, "fail to build dtc params with ctx session", K(ret), K(obj_type_)); } else { ObCastCtx cast_ctx(&allocator, &dtc_params, CM_NULL_ON_WARN, cs_type_); - ObAccuracy accuracy(accuracy_.length_, accuracy_.precision_, accuracy_.scale_); ObObj &src_obj = const_cast(range.get_start_key().get_obj_ptr()[0]); const ObObj *res_obj = &src_obj; + ObAccuracy accuracy(accuracy_.valid_, accuracy_.length_, accuracy_.precision_, accuracy_.scale_); // use src_obj as buf_obj if (OB_FAIL(ObObjCasterV2::to_type(obj_type_, cs_type_, cast_ctx, src_obj, src_obj))) { diff --git a/src/share/part/ob_part_desc_list.cpp b/src/share/part/ob_part_desc_list.cpp index 68cb034..3d70163 100644 --- a/src/share/part/ob_part_desc_list.cpp +++ b/src/share/part/ob_part_desc_list.cpp @@ -125,9 +125,9 @@ inline int ObPartDescList::cast_obj(ObObj &src_obj, COMMON_LOG(WARN, "fail to build dtc params with ctx session", K(ret), K(obj_type)); } else { ObCastCtx cast_ctx(&allocator, &dtc_params, CM_NULL_ON_WARN, target_obj.get_collation_type()); - ObAccuracy accuracy(accuracy_.length_, accuracy_.precision_, accuracy_.scale_); const ObObj *res_obj = &src_obj; - + ObAccuracy accuracy(accuracy_.valid_, accuracy_.length_, accuracy_.precision_, accuracy_.scale_); + // use src_obj as buf_obj if (OB_FAIL(ObObjCasterV2::to_type(obj_type, target_obj.get_collation_type(), cast_ctx, src_obj, src_obj))) { COMMON_LOG(WARN, "failed to cast obj", K(ret), K(src_obj), K(target_obj)); diff --git a/src/share/part/ob_part_desc_range.cpp b/src/share/part/ob_part_desc_range.cpp index 313d5d6..750f6d7 100644 --- a/src/share/part/ob_part_desc_range.cpp +++ b/src/share/part/ob_part_desc_range.cpp @@ -204,8 +204,8 @@ inline int ObPartDescRange::cast_obj(ObObj &src_obj, COMMON_LOG(WARN, "fail to build dtc params with ctx session", K(ret), K(obj_type)); } else { ObCastCtx cast_ctx(&allocator, &dtc_params, CM_NULL_ON_WARN, target_obj.get_collation_type()); - ObAccuracy accuracy(accuracy_.length_, accuracy_.precision_, accuracy_.scale_); const ObObj *res_obj = &src_obj; + ObAccuracy accuracy(accuracy_.valid_, accuracy_.length_, accuracy_.precision_, accuracy_.scale_); // use src_obj as buf_obj if (OB_FAIL(ObObjCasterV2::to_type(obj_type, target_obj.get_collation_type(), cast_ctx, src_obj, src_obj))) { -- GitLab