Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
oceanbase
oceanbase
提交
d84bce42
O
oceanbase
项目概览
oceanbase
/
oceanbase
9 个月 前同步成功
通知
261
Star
6084
Fork
1301
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
O
oceanbase
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
d84bce42
编写于
4月 19, 2024
作者:
K
KyrielightWei
提交者:
ob-robot
4月 19, 2024
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Prohibit dup table local read on a election expired leader
上级
9289ce3b
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
496 addition
and
18 deletion
+496
-18
mittest/multi_replica/CMakeLists.txt
mittest/multi_replica/CMakeLists.txt
+1
-0
mittest/multi_replica/env/ob_multi_replica_test_base.cpp
mittest/multi_replica/env/ob_multi_replica_test_base.cpp
+14
-7
mittest/multi_replica/env/ob_multi_replica_test_base.h
mittest/multi_replica/env/ob_multi_replica_test_base.h
+1
-0
mittest/multi_replica/env/ob_multi_replica_util.h
mittest/multi_replica/env/ob_multi_replica_util.h
+9
-5
mittest/multi_replica/env/ob_simple_replica.cpp
mittest/multi_replica/env/ob_simple_replica.cpp
+1
-1
mittest/multi_replica/test_max_commit_ts_read_from_dup_table.cpp
.../multi_replica/test_max_commit_ts_read_from_dup_table.cpp
+432
-0
src/observer/omt/ob_worker_processor.cpp
src/observer/omt/ob_worker_processor.cpp
+5
-1
src/storage/tx/ob_dup_table_tablets.cpp
src/storage/tx/ob_dup_table_tablets.cpp
+3
-1
src/storage/tx/ob_dup_table_util.cpp
src/storage/tx/ob_dup_table_util.cpp
+20
-3
src/storage/tx/ob_dup_table_util.h
src/storage/tx/ob_dup_table_util.h
+2
-0
src/storage/tx/ob_trans_service_v4.cpp
src/storage/tx/ob_trans_service_v4.cpp
+8
-0
未找到文件。
mittest/multi_replica/CMakeLists.txt
浏览文件 @
d84bce42
...
...
@@ -26,4 +26,5 @@ ob_unittest_multi_replica(test_ob_dup_table_restart)
ob_unittest_multi_replica
(
test_ob_dup_table_leader_switch
)
ob_unittest_multi_replica
(
test_ob_dup_table_tablet_gc
)
ob_unittest_multi_replica
(
test_ob_standby_read_basic
)
ob_unittest_multi_replica
(
test_max_commit_ts_read_from_dup_table
)
ob_unittest_multi_replica
(
test_mds_replay_from_ctx_table
)
mittest/multi_replica/env/ob_multi_replica_test_base.cpp
浏览文件 @
d84bce42
...
...
@@ -582,7 +582,7 @@ int ObMultiReplicaTestBase::init_test_replica_(const int zone_id)
int
ObMultiReplicaTestBase
::
read_cur_json_document_
(
rapidjson
::
Document
&
json_doc
)
{
int
ret
=
OB_SUCCESS
;
FILE
*
fp
=
fopen
(
event_file_path_
.
c_str
(),
"r"
);
FILE
*
fp
=
fopen
(
event_file_path_
.
c_str
(),
"r
b
"
);
if
(
fp
==
NULL
)
{
if
(
json_doc
.
IsObject
())
{
fprintf
(
stdout
,
"Fail to open file! file_path = %s
\n
"
,
event_file_path_
.
c_str
());
...
...
@@ -591,11 +591,18 @@ int ObMultiReplicaTestBase::read_cur_json_document_(rapidjson::Document &json_do
return
ret
;
}
char
read_buffer
[
2
*
102
4
*
1024
];
char
read_buffer
[
4
*
1024
];
rapidjson
::
FileReadStream
rs
(
fp
,
read_buffer
,
sizeof
(
read_buffer
));
json_doc
.
ParseStream
(
rs
);
if
(
json_doc
.
HasParseError
())
{
ret
=
OB_ERR_UNEXPECTED
;
SERVER_LOG
(
WARN
,
"[ObMultiReplicaTestBase] Parse EVENT JSON ERROR"
,
K
(
ret
),
K
(
json_doc
.
GetParseError
()));
fprintf
(
stdout
,
"Parse Event Json Error
\n
"
);
}
fclose
(
fp
);
return
OB_SUCCESS
;
...
...
@@ -643,7 +650,7 @@ int ObMultiReplicaTestBase::wait_event_finish(const std::string &event_name,
ret
=
OB_TIMEOUT
;
break
;
}
else
{
ob_
usleep
(
retry_interval_ms
*
1000
);
usleep
(
retry_interval_ms
*
1000
);
}
}
else
{
break
;
...
...
@@ -670,7 +677,7 @@ int ObMultiReplicaTestBase::finish_event(const std::string &event_name,
if
(
OB_SUCC
(
ret
))
{
FILE
*
fp
=
fopen
(
event_file_path_
.
c_str
(),
"w"
);
char
write_buffer
[
2
*
102
4
*
1024
];
char
write_buffer
[
4
*
1024
];
rapidjson
::
FileWriteStream
file_w_stream
(
fp
,
write_buffer
,
sizeof
(
write_buffer
));
rapidjson
::
PrettyWriter
<
rapidjson
::
FileWriteStream
>
prettywriter
(
file_w_stream
);
json_doc
.
AddMember
(
rapidjson
::
StringRef
(
event_name
.
c_str
(),
event_name
.
size
()),
...
...
@@ -680,8 +687,8 @@ int ObMultiReplicaTestBase::finish_event(const std::string &event_name,
fclose
(
fp
);
}
fprintf
(
stdout
,
"[WAIT EVENT] write target event : EVENT_KEY = %s; EVENT_VAL = %s
\n
"
,
event_name
.
c_str
(),
event_content
.
c_str
());
fprintf
(
stdout
,
"[WAIT EVENT] write target event : EVENT_KEY = %s; EVENT_VAL = %s
\n
"
,
event_name
.
c_str
(),
event_content
.
c_str
());
SERVER_LOG
(
INFO
,
"[ObMultiReplicaTestBase] [WAIT EVENT] write target event"
,
K
(
event_name
.
c_str
()),
K
(
event_content
.
c_str
()));
return
ret
;
...
...
@@ -898,7 +905,7 @@ int ::oceanbase::omt::ObWorkerProcessor::process_err_test()
if
(
ATOMIC_LOAD
(
&::
oceanbase
::
unittest
::
ObMultiReplicaTestBase
::
block_msg_
))
{
ret
=
OB_EAGAIN
;
SERVER_LOG
(
INFO
,
"[
ObMultiReplicaTestBase
] block msg process"
,
K
(
ret
));
SERVER_LOG
(
INFO
,
"[
ERRSIM
] block msg process"
,
K
(
ret
));
}
return
ret
;
...
...
mittest/multi_replica/env/ob_multi_replica_test_base.h
浏览文件 @
d84bce42
...
...
@@ -72,6 +72,7 @@ public:
observer
::
ObSimpleServerReplica
&
get_curr_simple_server
()
{
return
*
replica_
;
}
static
int
read_cur_json_document_
(
rapidjson
::
Document
&
json_doc
);
static
int
wait_event_finish
(
const
std
::
string
&
event_name
,
std
::
string
&
event_content
,
int64_t
wait_timeout_ms
,
...
...
mittest/multi_replica/env/ob_multi_replica_util.h
浏览文件 @
d84bce42
...
...
@@ -234,23 +234,26 @@ namespace unittest
common::ObString trace_id; \
common::ObString query_sql; \
int64_t request_time = 0; \
int64_t snapshot = 0; \
int64_t ret_code = OB_SUCCESS; \
int64_t retry_cnt = 0; \
ASSERT_EQ(true, conn != nullptr); \
std::string sql_str = \
"select TX_ID, TRACE_ID, REQUEST_TIME, RET_CODE, RETRY_CNT, QUERY_SQL from " \
"oceanbase.V$OB_SQL_AUDIT where QUERY_SQL like " \
+ std::string(" \"") + std::string(sql) + std::string("\" order by REQUEST_TIME DESC"); \
std::string sql_str = "select TX_ID, SNAPSHOT_VERSION, TRACE_ID, REQUEST_TIME, RET_CODE, " \
"RETRY_CNT, QUERY_SQL from " \
"oceanbase.V$OB_SQL_AUDIT where QUERY_SQL like " \
+ std::string(" \"") + std::string(sql) \
+ std::string("\" order by REQUEST_TIME DESC"); \
READ_SQL_BY_CONN(conn, process_result, sql_str.c_str()); \
ASSERT_EQ(OB_SUCCESS, process_result->next()); \
ASSERT_EQ(OB_SUCCESS, process_result->get_int("TX_ID", tx_id)); \
ASSERT_EQ(OB_SUCCESS, process_result->get_int("SNAPSHOT_VERSION", snapshot)); \
ASSERT_EQ(OB_SUCCESS, process_result->get_varchar("TRACE_ID", trace_id)); \
ASSERT_EQ(OB_SUCCESS, process_result->get_int("REQUEST_TIME", request_time)); \
ASSERT_EQ(OB_SUCCESS, process_result->get_int("RET_CODE", ret_code)); \
ASSERT_EQ(OB_SUCCESS, process_result->get_int("RETRY_CNT", retry_cnt)); \
ASSERT_EQ(OB_SUCCESS, process_result->get_varchar("QUERY_SQL", query_sql)); \
SERVER_LOG(INFO, "[ObMultiReplicaTestBase] query sql_audit for tx_id", K(trace_id), K(tx_id), \
K(
request_time), K(ret_code), K(retry_cnt), K(query_sql));
\
K(
snapshot), K(request_time), K(ret_code), K(retry_cnt), K(query_sql));
\
}
#define PREPARE_CONN_ENV(conn) \
...
...
@@ -476,6 +479,7 @@ public:
}
};
}
// namespace unittest
}
// namespace oceanbase
...
...
mittest/multi_replica/env/ob_simple_replica.cpp
浏览文件 @
d84bce42
...
...
@@ -149,7 +149,7 @@ int ObSimpleServerReplica::simple_init()
+
",memory_limit="
+
std
::
string
(
memory_limit_
)
+
",cache_wash_threshold=1G,net_thread_count=4,cpu_count=16,schema_history_expire_time="
"1d,workers_per_cpu_quota=10,datafile_disk_percentage=2,__min_full_resource_pool_"
"memory=
1073741824
,system_memory=5G,trace_log_slow_query_watermark=100ms,datafile_"
"memory=
2147483648
,system_memory=5G,trace_log_slow_query_watermark=100ms,datafile_"
"size=10G,stack_size=512K"
;
opts
.
optstr_
=
optstr_
.
c_str
();
// opts.devname_ = "eth0";
...
...
mittest/multi_replica/test_max_commit_ts_read_from_dup_table.cpp
0 → 100644
浏览文件 @
d84bce42
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#include <thread>
#define USING_LOG_PREFIX SERVER
#define protected public
#define private public
#include "env/ob_fast_bootstrap.h"
#include "env/ob_multi_replica_util.h"
#include "lib/mysqlclient/ob_mysql_result.h"
#include "storage/tx/ob_dup_table_lease.h"
#include "storage/tx/ob_tx_loop_worker.h"
#include "storage/tx/ob_tx_replay_executor.h"
using
namespace
oceanbase
::
transaction
;
using
namespace
oceanbase
::
storage
;
#define CUR_TEST_CASE_NAME ObDupTableMaxCommitTsRead
DEFINE_MULTI_ZONE_TEST_CASE_CLASS
MULTI_REPLICA_TEST_MAIN_FUNCTION
(
test_max_commit_ts_read_from_dup_table
);
namespace
oceanbase
{
namespace
transaction
{
static
bool
STOP_TX_REPLAY
=
false
;
static
bool
BLOCK_DUP_TABLE_LEADER_REVOKE
=
false
;
static
bool
RETURN_NULL_GTS_CACHE
=
false
;
static
sqlclient
::
ObISQLConnection
*
static_conn
=
nullptr
;
static
sqlclient
::
ObMySQLResult
*
static_result
=
nullptr
;
static
int64_t
final_row_count
=
0
;
int
ObTxReplayExecutor
::
errsim_tx_replay_
()
{
int
ret
=
OB_SUCCESS
;
if
(
STOP_TX_REPLAY
)
{
ret
=
OB_EAGAIN
;
}
if
(
OB_FAIL
(
ret
))
{
TRANS_LOG
(
INFO
,
"[ERRSIM] errsim tx replay in test"
,
K
(
ret
));
}
return
ret
;
}
int
ObDupTableLSHandler
::
errsim_leader_revoke_
()
{
int
ret
=
OB_SUCCESS
;
while
(
BLOCK_DUP_TABLE_LEADER_REVOKE
)
{
usleep
(
1000
*
1000
);
TRANS_LOG
(
INFO
,
"[ERRSIM] errsim wait leader revoke"
,
K
(
ret
));
}
return
ret
;
}
}
// namespace transaction
namespace
unittest
{
using
namespace
oceanbase
::
transaction
;
using
namespace
oceanbase
::
storage
;
struct
TableBasicArg
{
uint64_t
tenant_id_
;
int64_t
dup_ls_id_num_
;
int64_t
dup_table_id_
;
ObSEArray
<
int64_t
,
10
>
dup_tablet_id_array_
;
int64_t
normal_ls_id_num_
;
int64_t
normal_table_id_
;
ObSEArray
<
int64_t
,
10
>
normal_tablet_id_array_
;
TO_STRING_KV
(
K
(
tenant_id_
),
K
(
dup_ls_id_num_
),
K
(
dup_table_id_
),
K
(
normal_ls_id_num_
),
K
(
normal_table_id_
),
K
(
dup_tablet_id_array_
),
K
(
normal_tablet_id_array_
));
OB_UNIS_VERSION
(
1
);
};
OB_SERIALIZE_MEMBER
(
TableBasicArg
,
tenant_id_
,
dup_ls_id_num_
,
dup_table_id_
,
dup_tablet_id_array_
,
normal_ls_id_num_
,
normal_table_id_
,
normal_tablet_id_array_
);
static
TableBasicArg
static_basic_arg_
;
const
std
::
string
test_dup_table_name
=
"test_dup_1"
;
const
std
::
string
test_normal_table_name
=
"test_normal_1"
;
const
int64_t
DEFAULT_LOAD_ROW_CNT
=
10
;
TEST_F
(
GET_ZONE_TEST_CLASS_NAME
(
1
),
create_test_env
)
{
int
ret
=
OB_SUCCESS
;
CREATE_TEST_TENANT
(
test_tenant_id
);
SERVER_LOG
(
INFO
,
"[ObMultiReplicaTestBase] create test tenant success"
,
K
(
test_tenant_id
));
common
::
ObMySQLProxy
&
test_tenant_sql_proxy
=
get_curr_simple_server
().
get_sql_proxy2
();
ACQUIRE_CONN_FROM_SQL_PROXY
(
sys_conn
,
get_curr_simple_server
().
get_sql_proxy
());
WRITE_SQL_BY_CONN
(
sys_conn
,
"alter system set _private_buffer_size = '1B';"
);
std
::
string
ls_id_str
=
std
::
to_string
(
1
);
std
::
string
target_ip
=
local_ip_
+
":"
+
std
::
to_string
(
rpc_ports_
[
1
]);
std
::
string
switch_leader_sql
=
"alter system switch replica leader ls="
+
ls_id_str
+
" server='"
+
target_ip
+
"' tenant='tt1';"
;
WRITE_SQL_BY_CONN
(
sys_conn
,
switch_leader_sql
.
c_str
());
ACQUIRE_CONN_FROM_SQL_PROXY
(
test_conn
,
test_tenant_sql_proxy
);
std
::
string
primary_zone_sql
=
"ALTER TENANT "
+
std
::
string
(
DEFAULT_TEST_TENANT_NAME
)
+
" set primary_zone='zone1; zone2; zone3';"
;
WRITE_SQL_BY_CONN
(
test_conn
,
primary_zone_sql
.
c_str
());
unittest
::
TestEnvTool
::
create_table_for_test_env
(
test_conn
,
test_dup_table_name
.
c_str
(),
10
,
true
/*is_dup_table*/
,
static_basic_arg_
.
dup_ls_id_num_
,
static_basic_arg_
.
dup_table_id_
,
static_basic_arg_
.
dup_tablet_id_array_
);
unittest
::
TestEnvTool
::
create_table_for_test_env
(
test_conn
,
test_normal_table_name
.
c_str
(),
10
,
false
/*is_dup_table*/
,
static_basic_arg_
.
normal_ls_id_num_
,
static_basic_arg_
.
normal_table_id_
,
static_basic_arg_
.
normal_tablet_id_array_
);
GET_LS
(
test_tenant_id
,
static_basic_arg_
.
dup_ls_id_num_
,
ls_handle
);
SERVER_LOG
(
INFO
,
"[ObMultiReplicaTestBase] -------- before wait dup tablet discover"
,
K
(
ret
),
K
(
static_basic_arg_
));
RETRY_UNTIL_TIMEOUT
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
get_dup_tablet_count
()
==
static_basic_arg_
.
dup_tablet_id_array_
.
count
(),
20
*
1000
*
1000
,
100
*
1000
);
RETRY_UNTIL_TIMEOUT
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
->
get_readable_tablet_set_count
()
>=
1
,
20
*
1000
*
1000
,
100
*
1000
);
RETRY_UNTIL_TIMEOUT
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
->
get_need_confirm_tablet_set_count
()
==
0
,
20
*
1000
*
1000
,
100
*
1000
);
SERVER_LOG
(
INFO
,
"[ObMultiReplicaTestBase] -------- after wait dup tablet discover"
,
K
(
ret
),
K
(
static_basic_arg_
),
K
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
get_dup_tablet_count
()));
ASSERT_EQ
(
OB_SUCCESS
,
ret
/*has_dup_tablet*/
);
WRITE_SQL_BY_CONN
(
test_conn
,
"set autocommit = false;"
);
WRITE_SQL_BY_CONN
(
test_conn
,
"begin;"
);
for
(
int
i
=
1
;
i
<=
DEFAULT_LOAD_ROW_CNT
;
i
++
)
{
std
::
string
insert_dup_sql_str
=
"INSERT INTO "
+
test_dup_table_name
+
" VALUES("
+
std
::
to_string
(
i
)
+
", 0 , 0)"
;
std
::
string
insert_normal_sql_str
=
"INSERT INTO "
+
test_normal_table_name
+
" VALUES("
+
std
::
to_string
(
i
)
+
", 0 , 0)"
;
WRITE_SQL_BY_CONN
(
test_conn
,
insert_dup_sql_str
.
c_str
());
WRITE_SQL_BY_CONN
(
test_conn
,
insert_normal_sql_str
.
c_str
());
}
WRITE_SQL_BY_CONN
(
test_conn
,
"commit;"
);
static_basic_arg_
.
tenant_id_
=
test_tenant_id
;
std
::
string
tmp_str
;
ASSERT_EQ
(
OB_SUCCESS
,
EventArgSerTool
<
TableBasicArg
>::
serialize_arg
(
static_basic_arg_
,
tmp_str
));
finish_event
(
"CREATE_TEST_TABLE"
,
tmp_str
);
}
TEST_F
(
GET_ZONE_TEST_CLASS_NAME
(
2
),
normal_follower_max_commit_ts_read
)
{
std
::
string
tmp_event_val
;
ASSERT_EQ
(
OB_SUCCESS
,
wait_event_finish
(
"CREATE_TEST_TABLE"
,
tmp_event_val
,
30
*
60
*
1000
));
ASSERT_EQ
(
OB_SUCCESS
,
EventArgSerTool
<
TableBasicArg
>::
deserialize_arg
(
static_basic_arg_
,
tmp_event_val
));
ASSERT_EQ
(
OB_SUCCESS
,
get_curr_simple_server
().
init_sql_proxy2
());
std
::
string
SELECT_SQL_ON_DUP_TABLE
=
"select count(*) from "
+
test_dup_table_name
;
common
::
ObMySQLProxy
&
test_tenant_sql_proxy
=
get_curr_simple_server
().
get_sql_proxy2
();
ACQUIRE_CONN_FROM_SQL_PROXY
(
test_conn
,
test_tenant_sql_proxy
);
READ_SQL_BY_CONN
(
test_conn
,
table_info_result
,
SELECT_SQL_ON_DUP_TABLE
.
c_str
());
ASSERT_EQ
(
OB_SUCCESS
,
table_info_result
->
next
());
int64_t
dup_table_row_count
=
0
;
const
int64_t
col_index
=
0
;
ASSERT_EQ
(
OB_SUCCESS
,
table_info_result
->
get_int
(
col_index
,
dup_table_row_count
));
ASSERT_EQ
(
dup_table_row_count
,
DEFAULT_LOAD_ROW_CNT
);
finish_event
(
"NORMAL_FOLLOWER_LOCAL_READ"
,
""
);
}
TEST_F
(
GET_ZONE_TEST_CLASS_NAME
(
1
),
switch_follwer_forcedly_to_zone2_and_stop_replay
)
{
int
ret
=
OB_SUCCESS
;
std
::
string
tmp_event_val
;
ASSERT_EQ
(
OB_SUCCESS
,
wait_event_finish
(
"NORMAL_FOLLOWER_LOCAL_READ"
,
tmp_event_val
,
30
*
60
*
1000
));
// refresh location cache
{
std
::
string
SELECT_SQL_ON_DUP_TABLE
=
"select count(*) from "
+
test_dup_table_name
;
common
::
ObMySQLProxy
&
test_tenant_sql_proxy
=
get_curr_simple_server
().
get_sql_proxy2
();
ACQUIRE_CONN_FROM_SQL_PROXY
(
test_conn
,
test_tenant_sql_proxy
);
READ_SQL_BY_CONN
(
test_conn
,
table_info_result
,
SELECT_SQL_ON_DUP_TABLE
.
c_str
());
ASSERT_EQ
(
OB_SUCCESS
,
table_info_result
->
next
());
}
ASSERT_EQ
(
OB_SUCCESS
,
wait_event_finish
(
"PREPARE_TO_UPDATE_ON_NEW_LEADER"
,
tmp_event_val
,
30
*
60
*
1000
));
STOP_TX_REPLAY
=
true
;
BLOCK_DUP_TABLE_LEADER_REVOKE
=
true
;
block_msg_
=
true
;
// usleep(6*1000*1000);
finish_event
(
"STOP_ZONE1"
,
""
);
}
TEST_F
(
GET_ZONE_TEST_CLASS_NAME
(
2
),
update_on_new_leader
)
{
int
ret
=
OB_SUCCESS
;
std
::
string
tmp_event_val
;
finish_event
(
"PREPARE_TO_UPDATE_ON_NEW_LEADER"
,
""
);
ASSERT_EQ
(
OB_SUCCESS
,
wait_event_finish
(
"STOP_ZONE1"
,
tmp_event_val
,
30
*
60
*
1000
));
{
std
::
string
SELECT_SQL_ON_DUP_TABLE
=
"select count(*) from "
+
test_dup_table_name
;
common
::
ObMySQLProxy
&
test_tenant_sql_proxy
=
get_curr_simple_server
().
get_sql_proxy2
();
ACQUIRE_CONN_FROM_SQL_PROXY
(
test_conn
,
test_tenant_sql_proxy
);
READ_SQL_BY_CONN
(
test_conn
,
table_info_result
,
SELECT_SQL_ON_DUP_TABLE
.
c_str
());
ASSERT_EQ
(
OB_SUCCESS
,
table_info_result
->
next
());
}
{
GET_LS
(
static_basic_arg_
.
tenant_id_
,
static_basic_arg_
.
dup_ls_id_num_
,
ls_handle
);
RETRY_UNTIL_TIMEOUT
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
is_master
(),
20
*
1000
*
1000
,
100
*
1000
);
ASSERT_EQ
(
OB_SUCCESS
,
ret
);
RETRY_UNTIL_TIMEOUT
(
ls_handle
.
get_ls
()
->
ls_tx_svr_
.
mgr_
->
in_leader_serving_state
(),
20
*
1000
*
1000
,
100
*
1000
);
ASSERT_EQ
(
OB_SUCCESS
,
ret
);
}
sleep
(
2
);
common
::
ObMySQLProxy
&
test_tenant_sql_proxy
=
get_curr_simple_server
().
get_sql_proxy2
();
{
ACQUIRE_CONN_FROM_SQL_PROXY
(
test_conn
,
test_tenant_sql_proxy
);
WRITE_SQL_BY_CONN
(
test_conn
,
"set autocommit = false;"
);
WRITE_SQL_BY_CONN
(
test_conn
,
"set ob_trx_timeout = 1000000000;"
)
WRITE_SQL_BY_CONN
(
test_conn
,
"begin;"
);
int64_t
tmp_tx_id
=
0
;
for
(
int
i
=
DEFAULT_LOAD_ROW_CNT
+
1
;
i
<=
DEFAULT_LOAD_ROW_CNT
+
3
;
i
++
)
{
std
::
string
insert_dup_sql_str
=
"INSERT INTO "
+
test_dup_table_name
+
" VALUES("
+
std
::
to_string
(
i
)
+
", 0 , 0)"
;
// std::string insert_normal_sql_str =
// "INSERT INTO " + test_normal_table_name + " VALUES(" + std::to_string(i) + ", 0 , 0)";
WRITE_SQL_BY_CONN
(
test_conn
,
insert_dup_sql_str
.
c_str
());
// GET_TX_ID_FROM_SQL_AUDIT(test_conn, insert_dup_sql_str, tmp_tx_id);
// WRITE_SQL_BY_CONN(test_conn, insert_normal_sql_str.c_str());
}
// GET_RUNNGING_TRX_ID(test_conn, tmp_tx_id)
WRITE_SQL_BY_CONN
(
test_conn
,
"commit;"
);
}
std
::
string
SELECT_SQL_ON_DUP_TABLE
=
"select count(*) from "
+
test_dup_table_name
;
{
ACQUIRE_CONN_FROM_SQL_PROXY
(
test_conn
,
test_tenant_sql_proxy
);
READ_SQL_BY_CONN
(
test_conn
,
table_info_result
,
SELECT_SQL_ON_DUP_TABLE
.
c_str
());
ASSERT_EQ
(
OB_SUCCESS
,
table_info_result
->
next
());
int64_t
dup_table_row_count
=
0
;
const
int64_t
col_index
=
0
;
ASSERT_EQ
(
OB_SUCCESS
,
table_info_result
->
get_int
(
col_index
,
dup_table_row_count
));
ASSERT_EQ
(
dup_table_row_count
,
DEFAULT_LOAD_ROW_CNT
+
3
);
}
finish_event
(
"UPDATE_ON_NEW_LEADER"
,
""
);
// ob_abort();
}
void
read_in_new_thread
()
{
int
ret
=
OB_SUCCESS
;
std
::
string
SELECT_SQL_ON_DUP_TABLE
=
"select count(*) from "
+
test_dup_table_name
+
" where id_x="
+
std
::
to_string
(
DEFAULT_LOAD_ROW_CNT
+
2
);
int64_t
tmp_tx_id
=
0
;
{
share
::
ObTenantSwitchGuard
tenant_guard
;
ObTsSourceInfoGuard
info_guard
;
ASSERT_EQ
(
OB_SUCCESS
,
tenant_guard
.
switch_to
(
static_basic_arg_
.
tenant_id_
));
MTL
(
ObTxLoopWorker
*
)
->
stop
();
usleep
(
1
*
1000
*
1000
);
// share::SCN max_commit_ts_5_min =
// share::SCN::minus(MTL(ObTransService *)->get_tx_version_mgr().get_max_commit_ts(false),
// 3 * 1000 * 1000 * 1000L);
// TRANS_LOG(INFO, "[ObMultiReplicaTestBase] print max commit ts", K(ret),
// K(MTL(ObTransService *)->get_tx_version_mgr().get_max_commit_ts(false)),
// K(max_commit_ts_5_min));
// MTL(ObTransService *)->get_tx_version_mgr().max_commit_ts_ = max_commit_ts_5_min;
// ((ObTsMgr*)(MTL(ObTransService
// *)->ts_mgr_))->get_ts_source_info_opt_(static_basic_arg_.tenant_id_, info_guard, true, true);
// int64_t tmp_gts = 0 ;
// ASSERT_EQ(OB_SUCCESS, max_commit_ts_5_min.convert_for_gts(tmp_gts));
// info_guard.get_ts_source_info()->get_gts_source()->gts_local_cache_.gts_ =
// max_commit_ts_5_min.get_val_for_gts();
// info_guard.get_ts_source_info()->get_gts_source()->gts_local_cache_.srr_ =
// MonotonicTs::current_time();
// info_guard.get_ts_source_info()->get_gts_source()->gts_local_cache_.srr_.mts_ +=
// 1*1000*1000;
RETURN_NULL_GTS_CACHE
=
true
;
const
int64_t
col_index
=
0
;
READ_SQL_BY_CONN
(
static_conn
,
table_info_result
,
SELECT_SQL_ON_DUP_TABLE
.
c_str
());
if
(
OB_FAIL
(
table_info_result
->
next
()))
{
TRANS_LOG
(
WARN
,
"[ObMultiReplicaTestBase] get next in new thread failed"
,
K
(
ret
),
K
(
final_row_count
));
final_row_count
=
-
1
;
}
else
if
(
OB_FAIL
(
table_info_result
->
get_int
(
col_index
,
final_row_count
)))
{
TRANS_LOG
(
WARN
,
"[ObMultiReplicaTestBase] get count(*) in new thread failed"
,
K
(
ret
),
K
(
final_row_count
));
final_row_count
=
-
1
;
}
}
TRANS_LOG
(
INFO
,
"[ObMultiReplicaTestBase] after read in new thread"
,
K
(
ret
),
K
(
final_row_count
),
K
(
SELECT_SQL_ON_DUP_TABLE
.
c_str
()));
GET_TX_ID_FROM_SQL_AUDIT
(
static_conn
,
SELECT_SQL_ON_DUP_TABLE
,
tmp_tx_id
);
}
TEST_F
(
GET_ZONE_TEST_CLASS_NAME
(
1
),
read_from_old_leader_zone1
)
{
int
ret
=
OB_SUCCESS
;
std
::
string
tmp_event_val
;
ASSERT_EQ
(
OB_SUCCESS
,
wait_event_finish
(
"UPDATE_ON_NEW_LEADER"
,
tmp_event_val
,
30
*
60
*
1000
,
5
*
1000
));
block_msg_
=
false
;
common
::
ObMySQLProxy
&
test_tenant_sql_proxy
=
get_curr_simple_server
().
get_sql_proxy2
();
// refresh location cache
{
std
::
string
SELECT_SQL_ON_DUP_TABLE
=
"select count(*) from "
+
test_dup_table_name
;
common
::
ObMySQLProxy
&
test_tenant_sql_proxy
=
get_curr_simple_server
().
get_sql_proxy2
();
ACQUIRE_CONN_FROM_SQL_PROXY
(
test_conn
,
test_tenant_sql_proxy
);
READ_SQL_BY_CONN
(
test_conn
,
table_info_result
,
SELECT_SQL_ON_DUP_TABLE
.
c_str
());
ASSERT_EQ
(
OB_SUCCESS
,
table_info_result
->
next
());
}
TRANS_LOG
(
INFO
,
"[ObMultiReplicaTestBase] 3 - Stop blocking msg during the read operation"
,
K
(
ret
),
K
(
final_row_count
));
ACQUIRE_CONN_FROM_SQL_PROXY
(
test_conn
,
test_tenant_sql_proxy
);
static_conn
=
test_conn
;
WRITE_SQL_BY_CONN
(
test_conn
,
"set ob_query_timeout=20000000;"
);
TRANS_LOG
(
INFO
,
"[ObMultiReplicaTestBase] 2 - Stop blocking msg during the read operation"
,
K
(
ret
),
K
(
final_row_count
));
std
::
thread
read_thread
(
read_in_new_thread
);
std
::
thread
::
id
tid
=
read_thread
.
get_id
();
uint64_t
read_thread_id
=
*
(
uint64_t
*
)(
&
(
tid
));
TRANS_LOG
(
INFO
,
"[ObMultiReplicaTestBase] 1 - Stop blocking msg during the read operation"
,
K
(
ret
),
K
(
read_thread_id
),
K
(
final_row_count
));
share
::
ObTenantSwitchGuard
tenant_guard
;
ObTsSourceInfoGuard
info_guard
;
ASSERT_EQ
(
OB_SUCCESS
,
tenant_guard
.
switch_to
(
static_basic_arg_
.
tenant_id_
));
int64_t
start_ts
=
ObTimeUtility
::
fast_current_time
();
while
(
ObTimeUtility
::
fast_current_time
()
-
start_ts
>=
3
*
1000
*
1000
)
{
share
::
SCN
max_commit_ts_5_min
=
share
::
SCN
::
minus
(
MTL
(
ObTransService
*
)
->
get_tx_version_mgr
().
get_max_commit_ts
(
false
),
3
*
1000
*
1000
*
1000L
);
MTL
(
ObTransService
*
)
->
get_tx_version_mgr
().
max_commit_ts_
=
max_commit_ts_5_min
;
info_guard
.
get_ts_source_info
()
->
get_gts_source
()
->
gts_local_cache_
.
gts_
=
max_commit_ts_5_min
.
get_val_for_gts
();
usleep
(
3
*
1000
);
}
TRANS_LOG
(
INFO
,
"[ObMultiReplicaTestBase] Stop blocking msg during the read operation"
,
K
(
ret
),
K
(
read_thread_id
),
K
(
final_row_count
));
read_thread
.
join
();
ASSERT_EQ
(
final_row_count
,
1
);
// usleep(1000 * 1000 * 1000);
}
}
// namespace unittest
}
// namespace oceanbase
src/observer/omt/ob_worker_processor.cpp
浏览文件 @
d84bce42
...
...
@@ -59,8 +59,12 @@ OB_NOINLINE int ObWorkerProcessor::process_err_test()
#ifdef ERRSIM
ret
=
EN_WORKER_PROCESS_REQUEST
;
LOG_DEBUG
(
"process err_test"
,
K
(
ret
));
#endif
if
(
OB_FAIL
(
ret
))
{
LOG_WARN
(
"process err_test"
,
K
(
ret
));
}
return
ret
;
}
...
...
src/storage/tx/ob_dup_table_tablets.cpp
浏览文件 @
d84bce42
...
...
@@ -1114,8 +1114,10 @@ OB_NOINLINE int ObLSDupTabletsMgr::process_prepare_ser_err_test_()
ret
=
EN_DUP_TABLE_LOG_PREPARE_SERIALIZE
;
#endif
if
(
OB_FAIL
(
ret
))
{
DUP_TABLE_LOG
(
INFO
,
"errsim prepare serialize err test"
,
K
(
ret
),
K
(
ls_id_
));
}
return
ret
;
}
...
...
src/storage/tx/ob_dup_table_util.cpp
浏览文件 @
d84bce42
...
...
@@ -1048,8 +1048,8 @@ bool ObDupTableLSHandler::is_dup_table_lease_valid()
if
(
!
is_inited
()
||
OB_ISNULL
(
lease_mgr_ptr_
))
{
is_dup_lease_ls
=
false
;
}
else
if
(
ls_state_helper_
.
is_leader
())
{
is_dup_lease_ls
=
tru
e
;
DUP_TABLE_LOG
(
INFO
,
"
the lease is always valid for a
dup ls leader"
,
K
(
is_dup_lease_ls
),
is_dup_lease_ls
=
fals
e
;
DUP_TABLE_LOG
(
INFO
,
"
None valid lease on
dup ls leader"
,
K
(
is_dup_lease_ls
),
KPC
(
this
));
}
else
{
is_dup_lease_ls
=
lease_mgr_ptr_
->
is_follower_lease_valid
();
...
...
@@ -1373,6 +1373,16 @@ int ObDupTableLSHandler::switch_to_leader()
return
ret
;
}
OB_NOINLINE
int
ObDupTableLSHandler
::
errsim_leader_revoke_
()
{
int
ret
=
OB_SUCCESS
;
if
(
OB_FAIL
(
ret
))
{
DUP_TABLE_LOG
(
WARN
,
"errsim leader revoke"
,
K
(
ret
),
K
(
ls_id_
));
}
return
ret
;
}
int
ObDupTableLSHandler
::
leader_revoke_
(
const
bool
is_forcedly
)
{
int
ret
=
OB_SUCCESS
;
...
...
@@ -1380,12 +1390,19 @@ int ObDupTableLSHandler::leader_revoke_(const bool is_forcedly)
bool
is_logging
=
false
;
if
(
OB_SUCC
(
ret
))
{
if
(
OB_FAIL
(
errsim_leader_revoke_
()))
{
DUP_TABLE_LOG
(
WARN
,
"errsim for dup table leader revoke"
,
K
(
ret
),
K
(
ls_id_
),
K
(
is_forcedly
));
}
}
if
(
is_inited_
)
{
if
(
OB_NOT_NULL
(
log_operator_
))
{
log_operator_
->
rlock_for_log
();
is_logging
=
log_operator_
->
check_is_busy_without_lock
();
}
if
(
OB_NOT_NULL
(
tablets_mgr_ptr_
)
&&
OB_TMP_FAIL
(
tablets_mgr_ptr_
->
leader_revoke
(
is_logging
)))
{
if
(
OB_SUCC
(
ret
)
&&
OB_NOT_NULL
(
tablets_mgr_ptr_
))
if
(
OB_TMP_FAIL
(
tablets_mgr_ptr_
->
leader_revoke
(
is_logging
)))
{
DUP_TABLE_LOG
(
WARN
,
"tablets_mgr switch to follower failed"
,
K
(
ret
),
K
(
tmp_ret
),
KPC
(
this
));
if
(
!
is_forcedly
)
{
ret
=
tmp_ret
;
...
...
src/storage/tx/ob_dup_table_util.h
浏览文件 @
d84bce42
...
...
@@ -237,6 +237,8 @@ private:
int
recover_ckpt_into_memory_
();
int
errsim_leader_revoke_
();
private:
share
::
ObLSID
ls_id_
;
...
...
src/storage/tx/ob_trans_service_v4.cpp
浏览文件 @
d84bce42
...
...
@@ -1051,6 +1051,14 @@ int ObTransService::get_read_store_ctx(const ObTxReadSnapshot &snapshot,
}
}
if
(
OB_SUCC
(
ret
))
{
if
(
snapshot
.
snapshot_ls_role_
==
common
::
ObRole
::
FOLLOWER
&&
snapshot
.
snapshot_acquire_addr_
!=
GCTX
.
self_addr
())
{
TRANS_LOG
(
INFO
,
"get read store_ctx by a follower's max_commit_ts"
,
K
(
ret
),
K
(
snapshot
),
K
(
ls_id
),
K
(
store_ctx
));
}
}
// setup tx_table_guard
ObTxTableGuard
tx_table_guard
;
if
(
OB_SUCC
(
ret
)
&&
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录