提交 e2d26db1 编写于 作者: W Wesley Wang 提交者: guangshu.wgs

Feature:

 1. Added the parameter location_expire_period_time to set the active
    expiration time of the location cache. The value range is [0s, 30d],
    the default is 0
 2. show proxyroute supports printing location cache expiration time
 3. Distinguish between business requests and internal, full-time
    requests, and business request parameter control
 4. When the business request is enabled_cached_server = true, the
    Partition is not random

Bugfix:
 1. Fix core issue when obproxy::omt::ObResourceUnitTableProcessor::inc_conn
 2. Fix the problem that the route cannot be routed when establishing
    a connection with a cluster id
 3. Fix the timestamp accuracy modification caused the old version of
    the observer to be inaccurate routing
 4. Fix the Sharding scenario, the select table name of the sub-database
    and sub-table is case-sensitive, resulting in an error report
 5. Fix the Sharding scenario, the select of sub-database and sub-table
    will be intercepted and unsupported syntax problem
 6. Fixed an issue where a single SQL was sent multiple times, causing
    the SQL to take more than 40ms
 7. Fix the rslist method, specify 127.0.0.1 to start obproxy, in the
    server failure scenario, more than 3 times, the connection fails to be
    established
 8. Fixed the issue that if the cluster scheduled task fails more than
    10 times, the blacklist will be invalid
 9. Fixed the problem that OBProxy still thinks the connection is
    successful when the server hangs up
 10. Fixed the problem that the cluster scheduled task failed more than
     10 times, causing it to remain in the blacklist
 11. Fix the problem that COM_CHANGE_USER will be disconnected
上级 99faebfc
......@@ -106,7 +106,8 @@
[with perf (default is NO)]),
[
if test "$withval" = "yes"; then
# 下面参数为使用gperftools的选项,目前未用tcmalloc
# The following parameters are options for using gperftools,
# currently not using tcmalloc
# test_perf=yes
# AM_CXXFLAGS="${AM_CXXFLAGS} -D__NEED_PERF__"
# AM_LDFLAGS="${AM_LDFLAGS} -lprofiler"
......
......@@ -147,7 +147,7 @@ function check_opt()
OBPROXY_OPT_LOCAL="${OBPROXY_OPT_LOCAL},$OBPROXY_EXTRA_OPT"
fi
OBPROXY_OPT_LOCAL=",enable_cached_server=false,enable_get_rslist_remote=true,monitor_stat_dump_interval=1s,enable_qos=true,enable_standby=false,query_digest_time_threshold=2ms,monitor_cost_ms_unit=true,enable_strict_kernel_release=false,enable_proxy_scramble=true,work_thread_num=$WORK_THREAD_NUM,proxy_mem_limited='2G',log_dir_size_threshold=10G${OBPROXY_OPT_LOCAL}"
OBPROXY_OPT_LOCAL=",enable_cached_server=true,enable_get_rslist_remote=true,monitor_stat_dump_interval=1s,enable_qos=true,enable_standby=false,query_digest_time_threshold=2ms,monitor_cost_ms_unit=true,enable_strict_kernel_release=false,enable_proxy_scramble=true,work_thread_num=$WORK_THREAD_NUM,proxy_mem_limited='2G',log_dir_size_threshold=10G${OBPROXY_OPT_LOCAL}"
}
# change to the path where this script locates.
......
......@@ -34,7 +34,10 @@ public:
ObAccuracy(ObPrecision precision, ObScale scale) { set_precision(precision); set_scale(scale); }
ObAccuracy(ObLength length, ObPrecision precision, ObScale scale)
{ set_length(length); set_precision(precision); set_scale(scale); }
ObAccuracy(const ObAccuracy &other) { accuracy_ = other.accuracy_; }
ObAccuracy(bool valid, ObLength length, ObPrecision precision, ObScale scale) :
valid_(valid), length_(length), precision_(precision), scale_(scale) {}
ObAccuracy(const ObAccuracy &other) { accuracy_ = other.accuracy_; valid_ = other.valid_; }
OB_INLINE void set_accuracy(const ObAccuracy &accuracy) { accuracy_ = accuracy.accuracy_; }
OB_INLINE void set_accuracy(const int64_t &accuracy) { accuracy_ = accuracy; }
OB_INLINE void set_length(ObLength length) { length_ = length; }
......@@ -47,12 +50,19 @@ public:
OB_INLINE ObLength get_length() const { return length_; }
OB_INLINE ObPrecision get_precision() const { return precision_; }
OB_INLINE ObScale get_scale() const { return scale_; }
OB_INLINE void reset() { accuracy_ = -1; }
/*
* the default length, precision, scale is different in each type
*/
OB_INLINE void reset() { valid_ = false; length_ = -1; precision_ = -1; scale_ = -1; }
OB_INLINE bool is_valid() const { return valid_; }
public:
OB_INLINE ObAccuracy &operator =(const ObAccuracy &other)
{
if (this != &other) {
accuracy_ = other.accuracy_;
valid_ = other.valid_;
}
return *this;
}
......@@ -71,9 +81,17 @@ public:
public:
TO_STRING_KV(N_LENGTH, length_,
N_PRECISION, precision_,
N_SCALE, scale_);
N_SCALE, scale_,
K_(valid));
NEED_SERIALIZE_AND_DESERIALIZE;
public:
/*
* whether we get the accuracy from server or not
* it is not recommend to judge the init status by value, the init value of different type is different
* the valid value of <len, pre, scale> is as the same as the mysql/oracle document defined
*/
bool valid_;
union
{
int64_t accuracy_;
......
......@@ -4126,39 +4126,36 @@ int obj_accuracy_check(ObCastCtx &cast_ctx,
{
int ret = OB_SUCCESS;
LOG_DEBUG("obj_accuracy_check before", K(obj), K(accuracy), K(cs_type));
switch (obj.get_type_class()) {
case ObFloatTC: {
ret = float_range_check(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_);
break;
}
case ObDoubleTC: {
ret = double_check_precision(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_);
break;
}
case ObNumberTC: {
ret = number_range_check(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_);
break;
}
case ObDateTimeTC: {
ret = datetime_scale_check(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_);
break;
}
case ObOTimestampTC: {
ret = otimestamp_scale_check(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_);
break;
}
case ObTimeTC: {
ret = time_scale_check(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_);
break;
}
case ObStringTC: {
ret = string_length_check(cast_ctx, accuracy, cs_type, obj, buf_obj, res_obj, cast_ctx.cast_mode_);
break;
}
default: {
break;
if (accuracy.is_valid()) {
LOG_DEBUG("obj_accuracy_check before", K(obj), K(accuracy), K(cs_type));
switch (obj.get_type_class()) {
case ObFloatTC: {
ret = float_range_check(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_);
break;
}
case ObDoubleTC: {
ret = double_check_precision(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_);
break;
}
case ObNumberTC: {
ret = number_range_check(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_);
break;
}
case ObDateTimeTC: {
ret = datetime_scale_check(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_);
break;
}
case ObOTimestampTC: {
ret = otimestamp_scale_check(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_);
break;
}
case ObTimeTC: {
ret = time_scale_check(cast_ctx, accuracy, obj, buf_obj, res_obj, cast_ctx.cast_mode_);
break;
}
default: {
break;
}
}
}
......
......@@ -48,6 +48,8 @@ enum
OB_RC_LAST_VALID,
OB_RC_LAST_ACCESS,
OB_RC_LAST_UPDATE,
OB_RC_EXPIRE_TIME,
OB_RC_RELATIVE_EXPIRE_TIME,
OB_RC_SERVER_ADDR,
OB_RC_MAX_ROUTE_COLUMN_ID,
};
......@@ -69,6 +71,8 @@ const ObProxyColumnSchema ROUTE_COLUMN_ARRAY[OB_RC_MAX_ROUTE_COLUMN_ID] = {
ObProxyColumnSchema::make_schema(OB_RC_LAST_VALID, "last_valid_time", OB_MYSQL_TYPE_VARCHAR),
ObProxyColumnSchema::make_schema(OB_RC_LAST_ACCESS, "last_access_time", OB_MYSQL_TYPE_VARCHAR),
ObProxyColumnSchema::make_schema(OB_RC_LAST_UPDATE, "last_update_time", OB_MYSQL_TYPE_VARCHAR),
ObProxyColumnSchema::make_schema(OB_RC_EXPIRE_TIME, "expire_time", OB_MYSQL_TYPE_VARCHAR),
ObProxyColumnSchema::make_schema(OB_RC_RELATIVE_EXPIRE_TIME, "relative_expire_time", OB_MYSQL_TYPE_VARCHAR),
ObProxyColumnSchema::make_schema(OB_RC_SERVER_ADDR, "server addr", OB_MYSQL_TYPE_VARCHAR),
};
......@@ -84,6 +88,8 @@ enum
OB_RPC_LAST_VALID,
OB_RPC_LAST_ACCESS,
OB_RPC_LAST_UPDATE,
OB_RPC_EXPIRE_TIME,
OB_RPC_RELATIVE_EXPIRE_TIME,
OB_RPC_SERVER_ADDR,
OB_RPC_MAX_ROUTE_COLUMN_ID,
};
......@@ -98,6 +104,8 @@ const ObProxyColumnSchema ROUTE_PARTITION_COLUMN_ARRAY[OB_RPC_MAX_ROUTE_COLUMN_I
ObProxyColumnSchema::make_schema(OB_RPC_LAST_VALID, "last_valid_time", OB_MYSQL_TYPE_VARCHAR),
ObProxyColumnSchema::make_schema(OB_RPC_LAST_ACCESS, "last_access_time", OB_MYSQL_TYPE_VARCHAR),
ObProxyColumnSchema::make_schema(OB_RPC_LAST_UPDATE, "last_update_time", OB_MYSQL_TYPE_VARCHAR),
ObProxyColumnSchema::make_schema(OB_RPC_EXPIRE_TIME, "expire_time", OB_MYSQL_TYPE_VARCHAR),
ObProxyColumnSchema::make_schema(OB_RPC_RELATIVE_EXPIRE_TIME, "relative_expire_time", OB_MYSQL_TYPE_VARCHAR),
ObProxyColumnSchema::make_schema(OB_RPC_SERVER_ADDR, "server addr", OB_MYSQL_TYPE_VARCHAR),
};
......@@ -119,6 +127,8 @@ enum
OB_RRC_LAST_VALID,
OB_RRC_LAST_ACCESS,
OB_RRC_LAST_UPDATE,
OB_RRC_EXPIRE_TIME,
OB_RRC_RELATIVE_EXPIRE_TIME,
OB_RRC_ROUTE_SQL,
OB_RRC_MAX_ROUTE_COLUMN_ID,
};
......@@ -138,11 +148,14 @@ const ObProxyColumnSchema ROUTE_ROUTINE_COLUMN_ARRAY[OB_RRC_MAX_ROUTE_COLUMN_ID]
ObProxyColumnSchema::make_schema(OB_RRC_LAST_VALID, "last_valid_time", OB_MYSQL_TYPE_VARCHAR),
ObProxyColumnSchema::make_schema(OB_RRC_LAST_ACCESS, "last_access_time", OB_MYSQL_TYPE_VARCHAR),
ObProxyColumnSchema::make_schema(OB_RRC_LAST_UPDATE, "last_update_time", OB_MYSQL_TYPE_VARCHAR),
ObProxyColumnSchema::make_schema(OB_RRC_EXPIRE_TIME, "expire_time", OB_MYSQL_TYPE_VARCHAR),
ObProxyColumnSchema::make_schema(OB_RRC_RELATIVE_EXPIRE_TIME, "relative_expire_time", OB_MYSQL_TYPE_VARCHAR),
ObProxyColumnSchema::make_schema(OB_RRC_ROUTE_SQL, "route_sql", OB_MYSQL_TYPE_VARCHAR),
};
int extract_entry_time(const ObRouteEntry &entry, char *create_timebuf, char *valid_timebuf,
char *access_timebuf, char *update_timebuf, const uint32_t buf_len);
char *access_timebuf, char *update_timebuf, char *expire_timebuf,
char *relative_expire_timebuf, const uint32_t buf_len);
ObShowRouteHandler::ObShowRouteHandler(ObContinuation *cont, ObMIOBuffer *buf,
const ObInternalCmdInfo &info)
......@@ -534,15 +547,19 @@ int ObShowRouteHandler::dump_table_item(const ObTableEntry &entry)
char valid_timebuf[buf_len];
char access_timebuf[buf_len];
char update_timebuf[buf_len];
char expire_timebuf[buf_len];
char relative_expire_timebuf[buf_len];
if (OB_FAIL(extract_entry_time(entry, create_timebuf, valid_timebuf, access_timebuf,
update_timebuf, buf_len))) {
update_timebuf, expire_timebuf, relative_expire_timebuf, buf_len))) {
WARN_ICMD("fail to extract_entry_time", K(entry), K(ret));
} else {
cells[OB_RC_CREATE].set_varchar(create_timebuf);
cells[OB_RC_LAST_VALID].set_varchar(valid_timebuf);
cells[OB_RC_LAST_ACCESS].set_varchar(access_timebuf);
cells[OB_RC_LAST_UPDATE].set_varchar(update_timebuf);
cells[OB_RC_EXPIRE_TIME].set_varchar(expire_timebuf);
cells[OB_RC_RELATIVE_EXPIRE_TIME].set_varchar(relative_expire_timebuf);
row.cells_ = cells;
row.count_ = OB_RC_MAX_ROUTE_COLUMN_ID;
......@@ -558,7 +575,8 @@ int ObShowRouteHandler::dump_table_item(const ObTableEntry &entry)
}
int extract_entry_time(const ObRouteEntry &entry, char *create_timebuf, char *valid_timebuf,
char *access_timebuf, char *update_timebuf, const uint32_t buf_len)
char *access_timebuf, char *update_timebuf, char *expire_timebuf,
char *relative_expire_time, const uint32_t buf_len)
{
int ret = OB_SUCCESS;
struct tm struct_tm;
......@@ -621,6 +639,34 @@ int extract_entry_time(const ObRouteEntry &entry, char *create_timebuf, char *va
}
}
}
if (OB_SUCC(ret)) {
time_us = usec_to_sec(entry.get_time_for_expired());
if (OB_ISNULL(localtime_r(&time_us, &struct_tm))) {
ret = OB_ERR_UNEXPECTED;
WARN_ICMD("fail to converts the calendar time timep to broken-time representation", K(time_us), K(ret));
} else {
strftime_len = strftime(expire_timebuf, buf_len, "%Y-%m-%d %H:%M:%S", &struct_tm);
if (OB_UNLIKELY(strftime_len <= 0) || OB_UNLIKELY(strftime_len >= buf_len)) {
ret = OB_BUF_NOT_ENOUGH;
WARN_ICMD("timebuf is not enough", K(strftime_len), "timebuf length", buf_len,
K(expire_timebuf), K(ret));
}
}
}
if (OB_SUCC(ret)) {
time_us = usec_to_sec(get_global_table_cache().get_cache_expire_time_us());
if (OB_ISNULL(localtime_r(&time_us, &struct_tm))) {
ret = OB_ERR_UNEXPECTED;
WARN_ICMD("fail to converts the calendar time timep to broken-time representation", K(time_us), K(ret));
} else {
strftime_len = strftime(relative_expire_time, buf_len, "%Y-%m-%d %H:%M:%S", &struct_tm);
if (OB_UNLIKELY(strftime_len <= 0) || OB_UNLIKELY(strftime_len >= buf_len)) {
ret = OB_BUF_NOT_ENOUGH;
WARN_ICMD("timebuf is not enough", K(strftime_len), "timebuf length", buf_len,
K(expire_timebuf), K(ret));
}
}
}
return ret;
}
......@@ -668,15 +714,19 @@ int ObShowRouteHandler::dump_partition_item(const ObPartitionEntry &entry)
char valid_timebuf[buf_len];
char access_timebuf[buf_len];
char update_timebuf[buf_len];
char expire_timebuf[buf_len];
char relative_expire_timebuf[buf_len];
if (OB_FAIL(extract_entry_time(entry, create_timebuf, valid_timebuf, access_timebuf,
update_timebuf, buf_len))) {
update_timebuf, expire_timebuf, relative_expire_timebuf, buf_len))) {
WARN_ICMD("fail to extract_entry_time", K(entry), K(ret));
} else {
cells[OB_RPC_CREATE].set_varchar(create_timebuf);
cells[OB_RPC_LAST_VALID].set_varchar(valid_timebuf);
cells[OB_RPC_LAST_ACCESS].set_varchar(access_timebuf);
cells[OB_RPC_LAST_UPDATE].set_varchar(update_timebuf);
cells[OB_RPC_EXPIRE_TIME].set_varchar(expire_timebuf);
cells[OB_RPC_RELATIVE_EXPIRE_TIME].set_varchar(relative_expire_timebuf);
row.cells_ = cells;
row.count_ = OB_RPC_MAX_ROUTE_COLUMN_ID;
......@@ -715,15 +765,19 @@ int ObShowRouteHandler::dump_routine_item(const ObRoutineEntry &entry)
char valid_timebuf[buf_len];
char access_timebuf[buf_len];
char update_timebuf[buf_len];
char expire_timebuf[buf_len];
char relative_expire_timebuf[buf_len];
if (OB_FAIL(extract_entry_time(entry, create_timebuf, valid_timebuf, access_timebuf,
update_timebuf, buf_len))) {
update_timebuf, expire_timebuf, relative_expire_timebuf, buf_len))) {
WARN_ICMD("fail to extract_entry_time", K(entry), K(ret));
} else {
cells[OB_RRC_CREATE].set_varchar(create_timebuf);
cells[OB_RRC_LAST_VALID].set_varchar(valid_timebuf);
cells[OB_RRC_LAST_ACCESS].set_varchar(access_timebuf);
cells[OB_RRC_LAST_UPDATE].set_varchar(update_timebuf);
cells[OB_RRC_EXPIRE_TIME].set_varchar(expire_timebuf);
cells[OB_RRC_RELATIVE_EXPIRE_TIME].set_varchar(relative_expire_timebuf);
row.cells_ = cells;
row.count_ = OB_RRC_MAX_ROUTE_COLUMN_ID;
......
......@@ -361,7 +361,7 @@ int ObProxyTableScanOp::set_index_for_expr(ObProxyExpr *expr)
} else {
ObString &table_name = expr_column->get_table_name();
ObString &column_name = expr_column->get_column_name();
if ((table_name.empty() || field.tname_.prefix_match(table_name))
if ((table_name.empty() || field.tname_.prefix_case_match(table_name))
&& 0 == column_name.case_compare(field.cname_)) {
expr->set_index(i);
expr->set_accuracy(field.accuracy_);
......
......@@ -48,8 +48,7 @@ namespace obproxy
namespace net
{
// A block size is 8k, and 2 blocks can transmit 16k data, which meets the requirements
static const int64_t NET_MAX_IOV = 2;
static const int64_t NET_MAX_IOV = 16;
static inline ObNetState &get_net_state_by_vio(ObVIO &vio)
{
......@@ -1013,7 +1012,9 @@ ObVIO *ObUnixNetVConnection::do_io_write(
// SSL_read maybe trigger write and SSL_write maybe trigger read
// so reenable write
if (nbytes > 0 && (!write_.enabled_ || using_ssl_)) {
write_.triggered_ = true;
if (using_ssl_) {
write_.triggered_ = true;
}
write_.vio_.reenable();
}
} else {
......
......@@ -156,7 +156,6 @@ public:
ObCongestionZoneState *get_zone_state(const common::ObString &zone_name);
bool is_base_servers_added() const { return is_base_servers_added_; }
void set_base_servers_added() { is_base_servers_added_ = true; }
void clear_base_servers_added() { is_base_servers_added_ = false; }
bool is_congestion_avail();
int update_tc_congestion_map(ObCongestionEntry &entry);
DECLARE_TO_STRING;
......
......@@ -159,7 +159,6 @@ public:
DEF_TIME(idc_list_refresh_interval, "2h", "[10s, 1d]", "the interval to refresh idc list for getting newest region-idc, [10s, 1d]", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER);
DEF_TIME(stat_table_sync_interval, "60s", "[0s,1d]", "update sync statistic to ob_all_proxy_stat table interval, [0s, 1d], 0 means disable, if set a negative value, proxy treat it as 0", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER);
DEF_TIME(stat_dump_interval, "6000s", "[0s,1d]", "dump statistic in log interval, [0s, 1d], 0 means disable, if set a negative value, proxy treat it as 0", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER);
DEF_INT(partition_location_expire_relative_time, "0", "[-36000000,36000000]", "the unit is ms, 0 means do not expire, others will expire partition location base on relative time", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS);
DEF_INT(cluster_count_high_water_mark, "256", "[2, 102400]", "if cluster count is greater than this water mark, cluser will be kicked out by LRU", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER);
DEF_TIME(cluster_expire_time, "1d", "[0,]", "cluster resource expire time, 0 means never expire,cluster will be deleted if it has not been accessed for more than the time,[0, ]", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER);
......@@ -341,8 +340,14 @@ public:
DEF_BOOL(enable_extra_prometheus_metric, "false", "enable net and route prometheus merics or not", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER);
DEF_BOOL(enable_causal_order_read, "true", "if enabled, proxy will choose server by priority and sync safe snapshot version if need", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER);
DEF_BOOL(enable_qa_mode, "false", "just for test, if enabled, proxy can forcibly expire all location cache", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS);
DEF_INT(location_expire_period, "0", "[0,36000000]", "the unit is ms, only work if qa_mode is set, it means location cache which has been created for more than this value will be expired", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS);
// The following four parameters are related to the location cache at the table level.
// Here, it is recommended to use partition_location_expire_relative_time in an emergency,
// and set the active expiration recommendation location_expire_period_time
// enable_qa_mode and location_expire_period will be discarded later
DEF_INT(partition_location_expire_relative_time, "0", "[-36000000,36000000]", "the unit is ms, 0 means do not expire, others will expire partition location base on relative time", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS);
DEF_BOOL(enable_qa_mode, "false", "just for test, not recommended, if enabled, proxy can forcibly expire all location cache", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS);
DEF_INT(location_expire_period, "0", "[0,36000000]", "just for test, not recommended, the unit is ms, only work if qa_mode is set, it means location cache which has been created for more than this value will be expired", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS);
DEF_TIME(location_expire_period_time, "0d", "[0s, 30d]", "time for location expire period, values in [0s, 30d], 0 means no expire", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_SYS);
// in public cloud, will assign a vip addr to proxy. qa_mode_mock_slb_vip is a vip addr for testing
DEF_STR(qa_mode_mock_public_cloud_slb_addr, "127.0.0.1:33045", "mock public cloud slb addr", CFG_NO_NEED_REBOOT, CFG_SECTION_OBPROXY, CFG_VISIBLE_LEVEL_USER);
......
......@@ -40,7 +40,8 @@ int ObProxyDMLStmt::limit_to_sql_string(common::ObSqlString& sql_string)
return ret;
}
ObProxySelectStmt::ObProxySelectStmt() : is_inited_(false), has_rollup_(false),
has_for_update_(false), from_token_off_(-1)
has_for_update_(false), has_unsupport_expr_type_(false),
from_token_off_(-1)
{
}
......@@ -100,7 +101,7 @@ int ObProxySelectStmt::handle_parse_result(const ParseResult &parse_result)
if (NULL == tmp_node) {
// do nothing
} else if (i == PARSE_SELECT_HAVING) {
ret = OB_ERROR_UNSUPPORT_EXPR_TYPE;
has_unsupport_expr_type_ = true;
LOG_WARN("having not support", K(ret), K(sql_string_));
} else {
switch(tmp_node->type_) {
......@@ -146,7 +147,7 @@ int ObProxySelectStmt::handle_parse_result(const ParseResult &parse_result)
break;
case T_QEURY_EXPRESSION_LIST: //distinct not support now
default:
ret = OB_ERROR_UNSUPPORT_EXPR_TYPE;
has_unsupport_expr_type_ = true;
LOG_WARN("unsupport type", "node_type", get_type_name(tmp_node->type_), K(sql_string_), K(ret));
}
}
......@@ -339,7 +340,7 @@ int ObProxySelectStmt::handle_limit_clause(ParseNode* node)
}
break;
default:
ret = OB_ERROR_UNSUPPORT_EXPR_TYPE;
has_unsupport_expr_type_ = true;
LOG_WARN("unsupport type", "node_type", get_type_name(tmp_node->type_), K(sql_string_));
}
}
......@@ -367,7 +368,7 @@ int ObProxySelectStmt::handle_project_list(ParseNode* node)
}
break;
default:
ret = OB_ERROR_UNSUPPORT_EXPR_TYPE;
has_unsupport_expr_type_ = true;
LOG_WARN("unsupport type", "node_type", get_type_name(tmp_node->type_), K(sql_string_));
}
......@@ -532,7 +533,7 @@ int ObProxySelectStmt::get_expr_by_type(ObProxyExpr* &expr, ObProxyExprType type
ALLOC_PROXY_EXPR_BY_TYPE(ObProxyGroupItem);
break;
default:
ret = OB_ERROR_UNSUPPORT_EXPR_TYPE;
has_unsupport_expr_type_ = true;
LOG_WARN("unexpected type", K(type));
break;
}
......@@ -562,25 +563,25 @@ int ObProxySelectStmt::handle_table_and_db_node(ParseNode* node, ObProxyExprTabl
ret = OB_ERR_UNEXPECTED;
LOG_WARN("dynamic_cast failed", K(ret));
}
} else if (OB_SUCCESS == table_exprs_map_.get_refactored(table_name, expr)) {
if (OB_ISNULL(expr_table = dynamic_cast<ObProxyExprTable*>(expr))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("dynamic_cast failed", K(ret));
} else {
// Only the real table name needs to be rewritten, not the alias
ObProxyExprTablePos expr_table_pos;
if (NULL != db_node) {
expr_table_pos.set_database_pos(db_node->token_off_);
}
expr_table_pos.set_table_pos(table_node->token_off_);
expr_table_pos.set_table_expr(expr_table);
if (OB_FAIL(table_pos_array_.push_back(expr_table_pos))) {
LOG_WARN("fail to push expr table pos", K(ret));
} else {
string_to_upper_case(table_name.ptr(), table_name.length()); // change all to upper to store
if (OB_SUCCESS == table_exprs_map_.get_refactored(table_name, expr)) {
if (OB_ISNULL(expr_table = dynamic_cast<ObProxyExprTable*>(expr))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("dynamic_cast failed", K(ret));
} else {
// Only the real table name needs to be rewritten, not the alias
ObProxyExprTablePos expr_table_pos;
if (NULL != db_node) {
expr_table_pos.set_database_pos(db_node->token_off_);
}
expr_table_pos.set_table_pos(table_node->token_off_);
expr_table_pos.set_table_expr(expr_table);
if (OB_FAIL(table_pos_array_.push_back(expr_table_pos))) {
LOG_WARN("fail to push expr table pos", K(ret));
}
}
}
} else {
ret = OB_ERR_BAD_FIELD_ERROR;
LOG_WARN("table name of column is not alias name or real table name", K(table_name), K(ret));
}
}
......@@ -622,7 +623,7 @@ int ObProxySelectStmt::column_ref_to_expr(ParseNode* node, ObProxyExpr* &expr, O
LOG_WARN("T_COLUMN_REF unexpected column entry", K(ret));
} else if (T_IDENT != column_node->type_ && T_STAR != column_node->type_) {
// now column_ref child should be T_IDENT
ret = OB_ERROR_UNSUPPORT_EXPR_TYPE;
has_unsupport_expr_type_ = true;
LOG_WARN("T_COLUMN_REF unexpected column entry", "node_type", get_type_name(column_node->type_), K(ret));
} else if (T_IDENT == column_node->type_) {
ObProxyExprColumn* expr_column = NULL;
......@@ -759,7 +760,7 @@ int ObProxySelectStmt::check_node_has_agg(ParseNode* node)
case T_FUN_MAX:
case T_FUN_MIN:
case T_FUN_AVG:
ret = OB_ERROR_UNSUPPORT_EXPR_TYPE;
has_unsupport_expr_type_ = true;
LOG_WARN("node has agg", "node_type", get_type_name(tmp_node->type_), K(ret), K(sql_string_));
break;
default:
......@@ -859,7 +860,7 @@ int ObProxySelectStmt::string_node_to_expr(ParseNode* node, ObProxyExpr* &expr,
if (OB_FAIL(check_node_has_agg(node))) {
LOG_WARN("unsupport type", "node_type", get_type_name(node->type_), K(node->str_value_));
} else if (string_node == NULL) {
ret = OB_ERROR_UNSUPPORT_EXPR_TYPE;
has_unsupport_expr_type_ = true;
LOG_WARN("unsupport type", "node_type", get_type_name(node->type_), K(node->str_value_), K(sql_string_));
} else if (OB_FAIL(get_sharding_const_expr(string_node, expr))) {
LOG_WARN("get_sharding_const_expr failed", K(ret));
......@@ -914,6 +915,7 @@ int ObProxySelectStmt::get_table_and_db_expr(ParseNode* node, ObProxyExprTable*
"token len", table_node->token_len_, K(ret));
} else {
ObString table_name(table_node->token_len_, table_node->str_value_);
string_to_upper_case(table_name.ptr(), table_name.length()); // change all to upper to store
if (OB_FAIL(table_exprs_map_.get_refactored(table_name, expr))) { /* same table keep last one. */
if (OB_HASH_NOT_EXIST == ret) {
if (OB_FAIL(get_expr_by_type(expr, OB_PROXY_EXPR_TYPE_TABLE))) {
......@@ -923,6 +925,8 @@ int ObProxySelectStmt::get_table_and_db_expr(ParseNode* node, ObProxyExprTable*
LOG_WARN("dynamic_cast failed", K(ret));
} else {
if (NULL != db_node) {
ObString database_name(db_node->token_len_, db_node->str_value_);
string_to_upper_case(database_name.ptr(), database_name.length()); // change all to upper to store
expr_table->set_database_name(db_node->str_value_, db_node->token_len_);
}
expr_table->set_table_name(table_node->str_value_, table_node->token_len_);
......@@ -939,6 +943,8 @@ int ObProxySelectStmt::get_table_and_db_expr(ParseNode* node, ObProxyExprTable*
ret = OB_ERR_UNEXPECTED;
LOG_WARN("dynamic_cast failed", K(ret));
} else if (NULL != db_node && expr_table->get_database_name().empty()) {
ObString database_name(db_node->token_len_, db_node->str_value_);
string_to_upper_case(database_name.ptr(), database_name.length()); // change all to upper to store
expr_table->set_database_name(db_node->str_value_, db_node->token_len_);
}
}
......@@ -991,7 +997,7 @@ int ObProxySelectStmt::handle_table_node_to_expr(ParseNode* node)
}
break;
default:
ret = OB_ERROR_UNSUPPORT_EXPR_TYPE;
has_unsupport_expr_type_ = true;
LOG_WARN("unsupport type", "node_type", get_type_name(node->type_), K(node->str_value_));
}
}
......@@ -1169,7 +1175,7 @@ int ObProxySelectStmt::handle_where_node(ParseNode* node)
}
break;
default:
ret = OB_ERROR_UNSUPPORT_EXPR_TYPE;
has_unsupport_expr_type_ = true;
LOG_WARN("unexpected expr type", "node_type", get_type_name(tmp_node->type_), K(sql_string_));
}
}
......@@ -1334,13 +1340,13 @@ int ObProxySelectStmt::handle_sort_list_node(ParseNode* node, const SortListType
}
break;
default:
ret = OB_ERR_UNEXPECTED;
has_unsupport_expr_type_ = true;
LOG_WARN("invalid sort_list_type", K(sort_list_type), K(sql_string_), K(ret));
}
}
break;
default:
ret = OB_ERROR_UNSUPPORT_EXPR_TYPE;
has_unsupport_expr_type_ = true;
LOG_WARN("unsupport expr type", "node_type", get_type_name(tmp_node->type_), K(sql_string_), K(ret));
}
}
......@@ -1370,7 +1376,7 @@ int ObProxySelectStmt::handle_with_rollup_in_groupby(ParseNode* node)
}
break;
default:
ret = OB_ERROR_UNSUPPORT_EXPR_TYPE;
has_unsupport_expr_type_ = true;
LOG_WARN("unsupport expr type", "node_type", get_type_name(tmp_node->type_), K(sql_string_));
}
}
......@@ -1387,7 +1393,7 @@ int ObProxySelectStmt::handle_groupby_clause(ParseNode* node)
if (NULL == tmp_node) {
//do nothing
} else if (T_WITH_ROLLUP_CLAUSE != tmp_node->type_) {
ret = OB_ERROR_UNSUPPORT_EXPR_TYPE;
has_unsupport_expr_type_ = true;
LOG_WARN("unsupport expr type", "node_type", get_type_name(tmp_node->type_), K(sql_string_));
} else if (OB_FAIL(handle_with_rollup_in_groupby(tmp_node))) {
LOG_WARN("handle_with_rollup_in_groupby failed", K(ret), K(i), K(sql_string_));
......@@ -1406,7 +1412,7 @@ int ObProxySelectStmt::handle_orderby_clause(ParseNode* node)
if (NULL == tmp_node) {
//do nothing
} else if (T_SORT_LIST != tmp_node->type_) {
ret = OB_ERROR_UNSUPPORT_EXPR_TYPE;
has_unsupport_expr_type_ = true;
LOG_WARN("unsupport expr type", "node_type", get_type_name(tmp_node->type_), K(sql_string_));
} else if (OB_FAIL(handle_sort_list_node(tmp_node, SORT_LSIT_IN_ORDER_BY))) {
LOG_WARN("handle_sort_list_node", K(ret), K(i), K(sql_string_));
......
......@@ -113,6 +113,7 @@ public:
int handle_comment_list(const ParseResult &parse_result);
virtual int to_sql_string(common::ObSqlString& sql_string);
bool has_for_update() const { return has_for_update_; }
bool has_unsupport_expr_type() const { return has_unsupport_expr_type_; }
void set_table_name(const common::ObString& table_name) { table_name_ = table_name; }
int64_t get_from_token_off() { return from_token_off_; }
......@@ -175,6 +176,7 @@ private:
bool is_inited_;
bool has_rollup_;
bool has_for_update_;
bool has_unsupport_expr_type_;
int64_t from_token_off_;
ExprMap table_exprs_map_;
ExprMap alias_table_map_;
......
......@@ -173,8 +173,6 @@ int ObRslistFetchCont::init_task()
} else {
fetch_result_ = true;
}
} else {
fetch_result_ = true;
}
if (!fetch_result_) {
......
......@@ -1193,9 +1193,6 @@ int ObServerStateRefreshCont::add_refresh_rslist_task(const bool need_update_dum
cont->destroy();
cont = NULL;
}
} else {
congestion_manager_->clear_base_servers_added();
LOG_INFO("congestion manager's base servers has cleared", K_(cluster_name), K_(cluster_id));
}
} else {
LOG_DEBUG("refresh rslist task has been scheduled", K_(cluster_name), K_(cluster_id),
......
......@@ -229,9 +229,8 @@ void ObResourceUnitTableProcessor::dec_conn(
if (ATOMIC_FAA(&used_conn->max_used_connections_, -1) > 1) {
} else {
DRWLock::WRLockGuard guard(used_conn_rwlock_);
if (0 == used_conn->max_used_connections_) {
erase_used_conn(key_name);
used_conn->dec_ref();
if (0 == used_conn->max_used_connections_ && used_conn->is_in_map_) {
erase_used_conn(key_name, used_conn);
LOG_DEBUG("erase used conn", K(key_name));
}
}
......@@ -544,6 +543,8 @@ int ObResourceUnitTableProcessor::create_used_conn(ObString& key_name,
used_conn->inc_ref();
if (OB_FAIL(used_conn_cache_.set(used_conn))) {
LOG_WARN("fail to set used conn map", K(key_name), KPC(used_conn), K(ret));
} else {
used_conn->is_in_map_ = true;
}
}
......@@ -601,11 +602,14 @@ int ObResourceUnitTableProcessor::get_or_create_used_conn(ObString& key_name,
return ret;
}
int ObResourceUnitTableProcessor::erase_used_conn(ObString& key_name)
int ObResourceUnitTableProcessor::erase_used_conn(ObString& key_name, ObUsedConn* used_conn)
{
int ret = OB_SUCCESS;
if (OB_FAIL(used_conn_cache_.erase(key_name))) {
LOG_WARN("erase used conn failed", K(key_name));
} else {
used_conn->is_in_map_ = false;
used_conn->dec_ref();
}
return ret;
......
......@@ -97,7 +97,7 @@ public:
int create_used_conn(common::ObString& key_name, ObUsedConn*& used_conn, int64_t& cur_used_connections);
int get_used_conn(common::ObString& key_name, bool is_need_inc_used_connections, ObUsedConn*& used_conn, int64_t& cur_used_connections);
int erase_used_conn(common::ObString& key_name);
int erase_used_conn(common::ObString& key_name, ObUsedConn* used_conn);
int get_or_create_used_conn(common::ObString& key_name, ObUsedConn*& used_conn, int64_t& cur_used_connections);
TO_STRING_KV(K_(is_inited), K_(backup_status));
......
......@@ -131,7 +131,7 @@ private:
class ObUsedConn : public common::ObSharedRefCount {
public:
ObUsedConn(common::ObString& full_name) : max_used_connections_(0) {
ObUsedConn(common::ObString& full_name) : max_used_connections_(0), is_in_map_(false) {
if (full_name.length() < OB_PROXY_MAX_TENANT_CLUSTER_NAME_LENGTH + common::OB_IP_STR_BUFF) {
MEMCPY(full_name_str_, full_name.ptr(), full_name.length());
full_name_.assign_ptr(full_name_str_, (int32_t)full_name.length());
......@@ -146,13 +146,15 @@ public:
void reset() {
full_name_.reset();
max_used_connections_ = 0;
is_in_map_ = false;
}
TO_STRING_KV(K_(full_name), K_(max_used_connections));
TO_STRING_KV(K_(full_name), K_(max_used_connections), K_(is_in_map));
LINK(ObUsedConn, used_conn_link_);
public:
common::ObString full_name_;
volatile int64_t max_used_connections_;
bool is_in_map_;
private:
char full_name_str_[OB_PROXY_MAX_TENANT_CLUSTER_NAME_LENGTH + common::OB_IP_STR_BUFF];
......
......@@ -137,9 +137,9 @@ typedef struct _ObProxyRelationExpr
* table: oceanbase.__all_virtual_proxy_partition_info
* -> column spare5: VARCHAR, format: "int32_t,int16_t,int16_t"
* which means "length,precision/length_semantics,scale"
* init -1 to each elements for invalid status.
*/
typedef struct _ObProxyPartKeyAccuracy {
int8_t valid_; // default is 0, means not valid
int32_t length_;
int16_t precision_; // the same as length_semantics
int16_t scale_;
......
......@@ -5101,6 +5101,7 @@ void ObMysqlSM::do_partition_location_lookup()
int ret = OB_SUCCESS;
ObAction *pl_lookup_action_handle = NULL;
bool find_entry = false;
int64_t tenant_version = 0;
bool need_pl_route = (is_pl_route_supported() && (trans_state_.trans_info_.client_request_.get_parse_result().is_call_stmt()
|| trans_state_.trans_info_.client_request_.get_parse_result().is_text_ps_call_stmt()));
// Get it from table_map first
......@@ -5117,10 +5118,16 @@ void ObMysqlSM::do_partition_location_lookup()
cr_id = sm_cluster_resource_->get_cluster_id();
}
ObTableEntryKey key(name, sm_cluster_resource_->version_, cr_id);
if (OB_UNLIKELY(get_global_proxy_config().check_tenant_locality_change)) {
tenant_version = sm_cluster_resource_->get_location_tenant_version(
client_session_->get_session_info().get_priv_info().tenant_name_);
}
tmp_entry = table_map.get(key);
if (NULL != tmp_entry && !tmp_entry->is_partition_table()
&& (tmp_entry->is_avail_state() || tmp_entry->is_updating_state())
&& !(get_global_table_cache().is_table_entry_expired(*tmp_entry))) {
tmp_entry->check_and_set_expire_time(tenant_version, tmp_entry->is_dummy_entry());
tmp_entry->renew_last_access_time();
ObMysqlRouteResult result;
result.table_entry_ = tmp_entry;
......@@ -5128,7 +5135,8 @@ void ObMysqlSM::do_partition_location_lookup()
result.has_dup_replica_ = tmp_entry->has_dup_replica();
tmp_entry->set_need_force_flush(false);
find_entry = true;
LOG_DEBUG("get table entry from thread map", KPC(tmp_entry));
bool is_table_entry_from_remote = false;
LOG_DEBUG("ObMysqlRoute get table entry succ", KPC(tmp_entry), K(is_table_entry_from_remote));
state_partition_location_lookup(TABLE_ENTRY_EVENT_LOOKUP_DONE, &result);
} else if (NULL != tmp_entry) {
tmp_entry->dec_ref();
......@@ -5149,15 +5157,13 @@ void ObMysqlSM::do_partition_location_lookup()
} else {
param.cr_id_ = sm_cluster_resource_->get_cluster_id();
}
if (OB_UNLIKELY(get_global_proxy_config().check_tenant_locality_change)) {
param.tenant_version_ = sm_cluster_resource_->get_location_tenant_version(
client_session_->get_session_info().get_priv_info().tenant_name_);
}
param.tenant_version_ = tenant_version;
param.timeout_us_ = hrtime_to_usec(trans_state_.mysql_config_params_->short_async_task_timeout_);
param.is_partition_table_route_supported_ = is_partition_table_route_supported();
param.is_oracle_mode_ = client_session_->get_session_info().is_oracle_mode();
param.client_request_ = &trans_state_.trans_info_.client_request_; // priv parse result
param.client_info_ = &client_session_->get_session_info();
param.route_ = &trans_state_.pll_info_.route_;
param.need_pl_route_ = need_pl_route;
param.current_idc_name_ = client_session_->get_current_idc_name();//shallow copy
if (trans_state_.pll_info_.is_cached_dummy_force_renew()) {
......
......@@ -292,6 +292,7 @@ void ObMysqlTransact::acquire_cached_server_session(ObTransState &s)
LOG_DEBUG("[ObMysqlTransact::acquire_cached_server_session] last_insert_id_session is alive, pick it");
} else if (get_global_proxy_config().enable_cached_server
&& !s.sm_->client_session_->is_proxy_mysql_client_
&& NULL != last_session && OB_LIKELY(!s.mysql_config_params_->is_random_routing_mode())) {
const int32_t ip = ops_ip4_addr_host_order(last_session->get_netvc()->get_remote_addr());
const int32_t port = static_cast<int32_t>(ops_ip_port_host_order(last_session->get_netvc()->get_remote_addr()));
......@@ -1271,7 +1272,9 @@ void ObMysqlTransact::handle_pl_update(ObTransState &s)
inline bool ObMysqlTransact::ObPartitionLookupInfo::need_update_entry_by_partition_hit()
{
return (need_update_entry() && !route_.is_dummy_table());
return (need_update_entry()
&& !route_.is_dummy_table()
&& !route_.no_need_pl_update_);
}
int64_t ObMysqlTransact::ObPartitionLookupInfo::to_string(char *buf, const int64_t buf_len) const
......
......@@ -234,7 +234,8 @@ int ObRespResult::is_resp_finished(bool &finished, ObMysqlRespEndingType &ending
case OB_MYSQL_COM_REFRESH :
case OB_MYSQL_COM_PROCESS_KILL :
case OB_MYSQL_COM_LOGIN:
case OB_MYSQL_COM_INIT_DB : {
case OB_MYSQL_COM_INIT_DB :
case OB_MYSQL_COM_CHANGE_USER: {
if (OB_UNLIKELY(is_mysql_mode())) {
if (1 == pkt_cnt_[OK_PACKET_ENDING_TYPE]) {
finished = true;
......
......@@ -227,7 +227,7 @@ int ObLDCLocation::assign(const ObTenantServer *ts, const ObIArray<ObServerState
}//end of found server
}//end of for ss_info
if (OB_SUCC(ret) && !found) {
if (is_base_servers_added) {
if (is_base_servers_added && !replica.server_.is_ip_loopback()) {
LOG_WARN("fail to find tenant server from server list, maybe has not updated, don not use it", K(replica));
} else {
// if relica has no IDC info, lower priority. Avoid choosing offline relica
......
......@@ -404,19 +404,6 @@ inline void ObMysqlRoute::setup_table_entry_lookup()
}
if (OB_SUCC(ret)) {
if (NULL != table_param.result_.target_entry_) {
ObTableEntry* entry = table_param.result_.target_entry_;
if (param_.tenant_version_ != entry->get_tenant_version()
&& entry->is_avail_state()
&& !entry->is_dummy_entry()
&& entry->get_time_for_expired() == 0) {
LOG_INFO("tenant locality change, set table entry time expired",
K_(param_.tenant_version), K(entry->get_tenant_version()), KPC(entry));
entry->set_time_for_expired(ObRandomNumUtils::get_random_half_to_full(60*1000*1000)
+ common::ObTimeUtility::current_time());
}
}
if (NULL == table_entry_action_handle) { // cache hit
handle_event(TABLE_ENTRY_EVENT_LOOKUP_DONE, &table_param.result_);
table_param.result_.reset();
......@@ -447,6 +434,10 @@ int ObMysqlRoute::state_table_entry_lookup(int event, void *data)
if (false == param_.is_need_force_flush_) {
param_.is_need_force_flush_ = result->is_need_force_flush_;
}
if (NULL != table_entry_) {
table_entry_->check_and_set_expire_time(param_.tenant_version_, table_entry_->is_dummy_entry());
}
result->target_entry_ = NULL;
is_table_entry_from_remote_ = result->is_from_remote_;
// use different log level only for debug
......@@ -603,6 +594,7 @@ inline void ObMysqlRoute::setup_partition_id_calc()
*result,
*param_.client_request_,
*param_.client_info_,
*param_.route_,
*part_info,
part_id_))) {
LOG_INFO("fail to calculate partition id, just use tenant server", K(tmp_ret));
......@@ -713,13 +705,8 @@ int ObMysqlRoute::state_partition_entry_lookup(int event, void *data)
result->target_entry_ = NULL;
is_part_entry_from_remote_ = result->is_from_remote_;
if (NULL != part_entry_ && part_entry_->is_avail_state()
&& param_.tenant_version_ != part_entry_->get_tenant_version()
&& part_entry_->get_time_for_expired() == 0) {
LOG_DEBUG("tenant locality change, set partition entry time expired",
K(param_.tenant_version_), K(part_entry_->get_tenant_version()), KPC_(part_entry));
part_entry_->set_time_for_expired(ObRandomNumUtils::get_random_half_to_full(60*1000*1000)
+ common::ObTimeUtility::current_time());
if (NULL != part_entry_) {
part_entry_->check_and_set_expire_time(param_.tenant_version_, false);
}
if (NULL != part_entry_ && (part_entry_->is_avail_state())) {
......@@ -951,6 +938,7 @@ inline int ObMysqlRoute::deep_copy_route_param(ObRouteParam &param)
// !!Attention client_request should not be used in scheduled cont
param_.client_request_ = param.client_request_;
param_.client_info_ = param.client_info_;
param_.route_ = param.route_;
// no need assign result_ and cr
if (NULL != name_buf_ && name_buf_len_ > 0) {
op_fixed_mem_free(name_buf_, name_buf_len_);
......
......@@ -34,6 +34,7 @@ class ObMysqlRoute;
class ObPartitionEntry;
class ObProxyMysqlRequest;
class ObClientSessionInfo;
class ObServerRoute;
typedef int (ObMysqlRoute::*MysqlRouteHandler)(int event, void *data);
class ObMysqlRouteResult
......@@ -89,7 +90,7 @@ public:
: cont_(NULL), name_(), force_renew_(false), use_lower_case_name_(false),
is_partition_table_route_supported_(false), need_pl_route_(false), is_oracle_mode_(false),
is_need_force_flush_(false), result_(), mysql_proxy_(NULL), client_request_(NULL), client_info_(),
cr_version_(-1), cr_id_(-1), tenant_version_(0), timeout_us_(-1), current_idc_name_(),
route_(NULL), cr_version_(-1), cr_id_(-1), tenant_version_(0), timeout_us_(-1), current_idc_name_(),
cr_(NULL) {}
~ObRouteParam() { reset(); }
......@@ -113,6 +114,7 @@ public:
ObMysqlProxy *mysql_proxy_;
ObProxyMysqlRequest *client_request_;
ObClientSessionInfo *client_info_;
ObServerRoute *route_;
int64_t cr_version_;
int64_t cr_id_;
uint64_t tenant_version_;
......@@ -148,6 +150,7 @@ inline void ObRouteParam::reset()
is_oracle_mode_ = false;
is_need_force_flush_ = false;
mysql_proxy_ = NULL;
route_ = NULL;
cr_version_ = -1;
cr_id_ = -1;
tenant_version_ = 0;
......
......@@ -70,7 +70,7 @@ inline bool ObPartitionEntryKey::is_valid() const
return (common::OB_INVALID_ID != table_id_
&& common::OB_INVALID_ID != partition_id_
&& (cr_version_ >= 0)
&& (cr_id_ <= 0));
&& (cr_id_ >= 0));
}
inline uint64_t ObPartitionEntryKey::hash(uint64_t seed) const
......
......@@ -14,9 +14,11 @@
#include "proxy/route/ob_route_struct.h"
#include "iocore/eventsystem/ob_buf_allocator.h"
#include "utils/ob_proxy_utils.h"
using namespace oceanbase::common;
using namespace oceanbase::share;
using namespace oceanbase::obproxy;
namespace oceanbase
{
......@@ -313,6 +315,33 @@ int64_t ObTableEntryKey::to_string(char *buf, const int64_t buf_len) const
return pos;
}
void ObRouteEntry::check_and_set_expire_time(const uint64_t tenant_version, const bool is_dummy_entry)
{
int64_t period_us = obutils::get_global_proxy_config().location_expire_period_time;
const int64_t TENANT_LOCALITY_CHANGE_TIME = 60 * 1000 * 1000;
const int64_t TENANT_LOCALITY_CHANGE_TIME_CONFIG = -1;
if (AVAIL == state_ && !is_dummy_entry) {
if (tenant_version != tenant_version_) {
tenant_version_ = tenant_version;
// -1 means the change comes from a locality change
current_expire_time_config_ = TENANT_LOCALITY_CHANGE_TIME_CONFIG;
period_us = TENANT_LOCALITY_CHANGE_TIME;
time_for_expired_ = ObRandomNumUtils::get_random_half_to_full(period_us) + common::ObTimeUtility::current_time();
} else if (period_us != current_expire_time_config_ && TENANT_LOCALITY_CHANGE_TIME_CONFIG != current_expire_time_config_) {
current_expire_time_config_ = period_us;
if (period_us > 0) {
time_for_expired_ = ObRandomNumUtils::get_random_half_to_full(period_us) + common::ObTimeUtility::current_time();
} else if (period_us == 0) {
time_for_expired_ = 0;
} else {
// period_us < 0, unexpected error
}
} else {
// do nothing
}
}
}
} // end of namespace proxy
} // end of namespace obproxy
......
......@@ -383,7 +383,7 @@ public:
ObRouteEntry()
: common::ObSharedRefCount(), cr_version_(-1), cr_id_(common::OB_INVALID_CLUSTER_ID), schema_version_(0), create_time_us_(0),
last_valid_time_us_(0), last_access_time_us_(0), last_update_time_us_(0),
state_(BORN), tenant_version_(0), time_for_expired_(0) {}
state_(BORN), tenant_version_(0), time_for_expired_(0), current_expire_time_config_(0) {}
virtual ~ObRouteEntry() {}
virtual void free() = 0;
......@@ -430,6 +430,7 @@ public:
uint64_t get_tenant_version() const { return tenant_version_; }
int64_t get_time_for_expired() const { return time_for_expired_; }
void set_time_for_expired(int64_t expire_time) { time_for_expired_ = expire_time; }
void check_and_set_expire_time(const uint64_t tenant_version, const bool is_dummy_entry);
protected:
int64_t cr_version_; // one entry must belong to one cluster with the specfied version
......@@ -444,6 +445,7 @@ protected:
ObRouteEntryState state_;
uint64_t tenant_version_;
int64_t time_for_expired_;
int64_t current_expire_time_config_;
};
inline bool ObRouteEntry::is_need_update() const
......
......@@ -635,9 +635,11 @@ void ObRouteUtils::parse_part_key_accuracy(ObProxyPartKey *part_key,
|| ob_is_time_tc(part_key_type)) {
part_key->accuracy_.precision_ = precision;
part_key->accuracy_.scale_ = scale;
part_key->accuracy_.valid_ = 1;
} else if (ob_is_string_tc(part_key_type)) {
part_key->accuracy_.length_ = length;
part_key->accuracy_.precision_ = precision;
part_key->accuracy_.valid_ = 1;
}
// more obj type could be supported here.
}
......@@ -716,6 +718,7 @@ inline int ObRouteUtils::fetch_part_key(ObResultSetFetcher &rs_fetcher,
part_key->name_.str_ = buf;
part_key->obj_type_ = part_key_type;
part_key->idx_in_rowid_ = idx_in_rowid;
part_key->accuracy_.valid_ = 0; // not valid accuracy
parse_part_key_accuracy(part_key, part_key_type, &allocator, part_key_accuracy);
......
......@@ -33,7 +33,7 @@ public:
ObServerRoute()
: table_entry_(NULL), dummy_entry_(NULL), part_entry_(NULL), cur_chosen_pl_(NULL),
is_table_entry_from_remote_(false), is_part_entry_from_remote_(false),
has_dup_replica_(false), need_use_dup_replica_(false),
has_dup_replica_(false), need_use_dup_replica_(false), no_need_pl_update_(false),
consistency_level_(common::INVALID_CONSISTENCY), leader_item_(),
ldc_route_(), valid_count_(0), cur_chosen_server_(),
cur_chosen_route_type_(ROUTE_TYPE_MAX), skip_leader_item_(false) {}
......@@ -120,6 +120,7 @@ public:
bool is_part_entry_from_remote_;
bool has_dup_replica_;
bool need_use_dup_replica_;
bool no_need_pl_update_;
common::ObConsistencyLevel consistency_level_;
ObLDCItem leader_item_;
......@@ -138,6 +139,7 @@ inline void ObServerRoute::reset()
is_part_entry_from_remote_ = false;
has_dup_replica_ = false;
need_use_dup_replica_ = false;
no_need_pl_update_ = false;
skip_leader_item_ = false;
set_dummy_entry(NULL);
set_table_entry(NULL);
......
......@@ -25,6 +25,7 @@
#include "rpc/obmysql/ob_mysql_packet.h"
#include "lib/timezone/ob_time_convert.h"
#include "lib/timezone/ob_timezone_info.h"
#include "proxy/route/ob_server_route.h"
using namespace oceanbase::common;
......@@ -40,6 +41,7 @@ int ObProxyExprCalculator::calculate_partition_id(common::ObArenaAllocator &allo
const ObSqlParseResult &parse_result,
ObProxyMysqlRequest &client_request,
ObClientSessionInfo &client_info,
ObServerRoute &route,
ObProxyPartInfo &part_info,
int64_t &partition_id)
{
......@@ -51,7 +53,8 @@ int ObProxyExprCalculator::calculate_partition_id(common::ObArenaAllocator &allo
}
}
if (OB_INVALID_INDEX == partition_id && parse_result.has_simple_route_info()) {
if (OB_FAIL(calc_part_id_with_simple_route_info(allocator, parse_result, client_info, part_info, partition_id))) {
if (OB_FAIL(calc_part_id_with_simple_route_info(allocator, parse_result, client_info,
route, part_info, partition_id))) {
LOG_WARN("fail to calc part id with simple part info, will do calc in normal path", K(ret));
}
}
......@@ -89,7 +92,7 @@ int ObProxyExprCalculator::calculate_partition_id(common::ObArenaAllocator &allo
LOG_INFO("fail to do expr resolve", K(print_sql), "expr_parse_result",
ObExprParseResultPrintWrapper(expr_parse_result),
K(part_info), KPC(ps_id_entry), KPC(text_ps_entry), K(resolve_result));
} else if (OB_FAIL(do_partition_id_calc(resolve_result, client_info, part_info, parse_result,
} else if (OB_FAIL(do_partition_id_calc(resolve_result, client_info, route, part_info, parse_result,
allocator, partition_id))) {
if (OB_MYSQL_COM_STMT_PREPARE != cmd) {
LOG_INFO("fail to do expr resolve", K(print_sql), K(resolve_result), K(part_info));
......@@ -99,7 +102,8 @@ int ObProxyExprCalculator::calculate_partition_id(common::ObArenaAllocator &allo
}
}
if (OB_FAIL(ret)) {
if (OB_FAIL(ret)
&& !get_global_proxy_config().enable_cached_server) {
int64_t tmp_first_part_id = OB_INVALID_INDEX;
int64_t tmp_sub_part_id = OB_INVALID_INDEX;
if (OB_FAIL(calc_part_id_by_random_choose_from_exist(part_info,
......@@ -108,6 +112,7 @@ int ObProxyExprCalculator::calculate_partition_id(common::ObArenaAllocator &allo
partition_id))) {
LOG_WARN("fail to cal part id by random choose", K(tmp_first_part_id), K(tmp_sub_part_id), K(ret));
} else {
route.no_need_pl_update_ = true;
LOG_DEBUG("succ to cal part id by random choose", K(tmp_first_part_id), K(tmp_sub_part_id), K(partition_id));
}
}
......@@ -119,6 +124,7 @@ int ObProxyExprCalculator::calculate_partition_id(common::ObArenaAllocator &allo
int ObProxyExprCalculator::calc_part_id_with_simple_route_info(ObArenaAllocator &allocator,
const ObSqlParseResult &parse_result,
ObClientSessionInfo &client_info,
ObServerRoute &route,
ObProxyPartInfo &part_info,
int64_t &part_id)
{
......@@ -133,7 +139,7 @@ int ObProxyExprCalculator::calc_part_id_with_simple_route_info(ObArenaAllocator
ObExprResolverResult resolve_result;
if (OB_FAIL(do_resolve_with_part_key(parse_result, allocator, resolve_result))) {
LOG_WARN("fail to do_resolve_with_part_key", K(ret));
} else if (OB_FAIL(do_partition_id_calc(resolve_result, client_info, part_info,
} else if (OB_FAIL(do_partition_id_calc(resolve_result, client_info, route, part_info,
parse_result, allocator, part_id))) {
LOG_INFO("fail to do_partition_id_calc", K(resolve_result), K(part_info));
}
......@@ -260,6 +266,7 @@ int ObProxyExprCalculator::do_expr_resolve(ObExprParseResult &parse_result,
int ObProxyExprCalculator::do_partition_id_calc(ObExprResolverResult &resolve_result,
ObClientSessionInfo &session_info,
ObServerRoute &route,
ObProxyPartInfo &part_info,
const ObSqlParseResult &parse_result,
ObIAllocator &allocator,
......@@ -306,10 +313,12 @@ int ObProxyExprCalculator::do_partition_id_calc(ObExprResolverResult &resolve_re
if (OB_SUCC(ret)) {
partition_id = generate_phy_part_id(first_part_id, sub_part_id, part_info.get_part_level());
LOG_DEBUG("succ to get part id", K(first_part_id), K(sub_part_id), K(partition_id));
} else {
} else if (!get_global_proxy_config().enable_cached_server) {
if (OB_FAIL(calc_part_id_by_random_choose_from_exist(part_info, first_part_id, sub_part_id, partition_id))) {
LOG_WARN("fail to get part id at last", K(first_part_id), K(sub_part_id), K(ret));
} else {
// get part id by random, no need update pl
route.no_need_pl_update_ = true;
LOG_DEBUG("succ to get part id by random", K(first_part_id), K(sub_part_id), K(partition_id));
}
}
......
......@@ -43,6 +43,7 @@ class ObProxyPartInfo;
class ObClientSessionInfo;
class ObPsIdEntry;
class ObTextPsEntry;
class ObServerRoute;
class ObProxyExprCalculator
{
......@@ -54,6 +55,7 @@ public:
const obutils::ObSqlParseResult &parse_result,
ObProxyMysqlRequest &client_request,
ObClientSessionInfo &client_info,
ObServerRoute &route,
ObProxyPartInfo &part_info,
int64_t &partition_id);
private:
......@@ -74,6 +76,7 @@ private:
opsql::ObExprResolverResult &resolve_result);
int do_partition_id_calc(opsql::ObExprResolverResult &resolve_result,
ObClientSessionInfo &client_info,
ObServerRoute &route,
ObProxyPartInfo &part_info,
const obutils::ObSqlParseResult &parse_result,
common::ObIAllocator &allocator,
......@@ -81,6 +84,7 @@ private:
int calc_part_id_with_simple_route_info(common::ObArenaAllocator &allocator,
const obutils::ObSqlParseResult &parse_result,
ObClientSessionInfo &client_info,
ObServerRoute &route,
ObProxyPartInfo &part_info,
int64_t &part_id);
int do_resolve_with_part_key(const obutils::ObSqlParseResult &parse_result,
......
......@@ -1338,10 +1338,6 @@ int ObProxyShardUtils::handle_select_request(ObMysqlClientSession &client_sessio
ObString sql = client_request.get_parse_sql();
if (OB_FAIL(sql_parser.parse_sql_by_obparser(sql, NORMAL_PARSE_MODE, parse_result, true))) {
LOG_WARN("parse_sql_by_obparser failed", K(ret), K(sql));
} else if (OB_FAIL(check_topology(parse_result, db_info))) {
if (OB_ERR_UNSUPPORT_DIFF_TOPOLOGY != ret) {
LOG_WARN("fail to check topology", K(ret));
}
} else if (FALSE_IT(is_scan_all = need_scan_all(parse_result))) {
// impossible
} else if (is_scan_all) {
......@@ -1466,7 +1462,7 @@ bool ObProxyShardUtils::need_scan_all(ObSqlParseResult &parse_result)
{
if (parse_result.is_select_stmt() && !parse_result.has_for_update()
&& (parse_result.get_dbp_route_info().scan_all_
|| (!parse_result.is_use_dbp_hint() && get_global_proxy_config().auto_scan_all))) {
|| (!parse_result.has_shard_comment() && get_global_proxy_config().auto_scan_all))) {
return true;
}
......@@ -2006,6 +2002,13 @@ int ObProxyShardUtils::handle_scan_all_real_info(ObDbConfigLogicDb &logic_db_inf
if (OB_ISNULL(select_stmt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("select stmt is null, unexpected", K(ret));
} else if (select_stmt->has_unsupport_expr_type()) {
ret = OB_ERROR_UNSUPPORT_EXPR_TYPE;
LOG_WARN("unsupport sql", K(ret));
} else if (OB_FAIL(check_topology(parse_result, logic_db_info))) {
if (OB_ERR_UNSUPPORT_DIFF_TOPOLOGY != ret) {
LOG_WARN("fail to check topology", K(ret));
}
} else if (OB_FAIL(get_global_optimizer_processor().alloc_allocator(allocator))) {
LOG_WARN("alloc allocator failed", K(ret));
} else if (OB_FAIL(handle_sharding_select_real_info(logic_db_info, client_session, trans_state,
......@@ -2109,8 +2112,6 @@ int ObProxyShardUtils::handle_select_real_info(ObDbConfigLogicDb &logic_db_info,
ret = OB_EXPR_CALC_ERROR;
LOG_WARN("shard connector info or prev shard connector info is null", KP(shard_conn),
KP(prev_shard_conn), K(ret));
} else if (OB_FAIL(add_table_name_to_map(allocator, table_name_map, table_name, real_table_name))) {
LOG_WARN("fail to add table name to map", K(table_name), K(real_table_name), K(ret));
}
for (; OB_SUCC(ret) && iter != end; iter++) {
......@@ -2124,10 +2125,6 @@ int ObProxyShardUtils::handle_select_real_info(ObDbConfigLogicDb &logic_db_info,
LOG_WARN("fail to cast to table expr", K(expr), K(ret));
} else {
ObString &sql_table_name = table_expr->get_table_name();
if (sql_table_name == table_name) {
continue;
}
if (OB_FAIL(logic_db_info.get_real_table_name(sql_table_name, sql_result,
real_table_name, OB_MAX_TABLE_NAME_LENGTH,
tb_index, hint_table, testload_type))) {
......
......@@ -162,9 +162,10 @@ int ObPartDescHash::calc_value_for_oracle(ObObj &src_obj,
uint64_t hash_val = 0;
ObCastCtx cast_ctx(&allocator, &dtc_params, CM_NULL_ON_WARN, cs_type_);
ObAccuracy accuracy(accuracy_.length_, accuracy_.precision_, accuracy_.scale_);
const ObObj *res_obj = &src_obj;
ObAccuracy accuracy(accuracy_.valid_, accuracy_.length_, accuracy_.precision_, accuracy_.scale_);
// use src_obj as buf_obj
COMMON_LOG(DEBUG, "begin to cast value for hash oracle", K(src_obj), K(cs_type_));
if (OB_FAIL(ObObjCasterV2::to_type(obj_type_, cs_type_, cast_ctx, src_obj, src_obj))) {
......
......@@ -53,9 +53,9 @@ int ObPartDescKey::get_part(ObNewRange &range,
COMMON_LOG(WARN, "fail to build dtc params with ctx session", K(ret), K(obj_type_));
} else {
ObCastCtx cast_ctx(&allocator, &dtc_params, CM_NULL_ON_WARN, cs_type_);
ObAccuracy accuracy(accuracy_.length_, accuracy_.precision_, accuracy_.scale_);
ObObj &src_obj = const_cast<ObObj &>(range.get_start_key().get_obj_ptr()[0]);
const ObObj *res_obj = &src_obj;
ObAccuracy accuracy(accuracy_.valid_, accuracy_.length_, accuracy_.precision_, accuracy_.scale_);
// use src_obj as buf_obj
if (OB_FAIL(ObObjCasterV2::to_type(obj_type_, cs_type_, cast_ctx, src_obj, src_obj))) {
......
......@@ -125,9 +125,9 @@ inline int ObPartDescList::cast_obj(ObObj &src_obj,
COMMON_LOG(WARN, "fail to build dtc params with ctx session", K(ret), K(obj_type));
} else {
ObCastCtx cast_ctx(&allocator, &dtc_params, CM_NULL_ON_WARN, target_obj.get_collation_type());
ObAccuracy accuracy(accuracy_.length_, accuracy_.precision_, accuracy_.scale_);
const ObObj *res_obj = &src_obj;
ObAccuracy accuracy(accuracy_.valid_, accuracy_.length_, accuracy_.precision_, accuracy_.scale_);
// use src_obj as buf_obj
if (OB_FAIL(ObObjCasterV2::to_type(obj_type, target_obj.get_collation_type(), cast_ctx, src_obj, src_obj))) {
COMMON_LOG(WARN, "failed to cast obj", K(ret), K(src_obj), K(target_obj));
......
......@@ -204,8 +204,8 @@ inline int ObPartDescRange::cast_obj(ObObj &src_obj,
COMMON_LOG(WARN, "fail to build dtc params with ctx session", K(ret), K(obj_type));
} else {
ObCastCtx cast_ctx(&allocator, &dtc_params, CM_NULL_ON_WARN, target_obj.get_collation_type());
ObAccuracy accuracy(accuracy_.length_, accuracy_.precision_, accuracy_.scale_);
const ObObj *res_obj = &src_obj;
ObAccuracy accuracy(accuracy_.valid_, accuracy_.length_, accuracy_.precision_, accuracy_.scale_);
// use src_obj as buf_obj
if (OB_FAIL(ObObjCasterV2::to_type(obj_type, target_obj.get_collation_type(), cast_ctx, src_obj, src_obj))) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册