Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
oceanbase
oceanbase
提交
78578299
O
oceanbase
项目概览
oceanbase
/
oceanbase
10 个月 前同步成功
通知
261
Star
6084
Fork
1301
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
O
oceanbase
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
78578299
编写于
4月 16, 2024
作者:
K
KyrielightWei
提交者:
ob-robot
4月 16, 2024
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refine dup table new gc code in leader_switch
上级
6cbab003
变更
11
展开全部
隐藏空白更改
内联
并排
Showing
11 changed file
with
1002 addition
and
414 deletion
+1002
-414
mittest/multi_replica/CMakeLists.txt
mittest/multi_replica/CMakeLists.txt
+1
-0
mittest/multi_replica/env/ob_multi_replica_util.h
mittest/multi_replica/env/ob_multi_replica_util.h
+32
-30
mittest/multi_replica/test_ob_dup_table_new_gc.cpp
mittest/multi_replica/test_ob_dup_table_new_gc.cpp
+450
-0
src/storage/tx/ob_dup_table_base.cpp
src/storage/tx/ob_dup_table_base.cpp
+1
-3
src/storage/tx/ob_dup_table_base.h
src/storage/tx/ob_dup_table_base.h
+6
-1
src/storage/tx/ob_dup_table_tablets.cpp
src/storage/tx/ob_dup_table_tablets.cpp
+292
-230
src/storage/tx/ob_dup_table_tablets.h
src/storage/tx/ob_dup_table_tablets.h
+17
-12
src/storage/tx/ob_dup_table_ts_sync.cpp
src/storage/tx/ob_dup_table_ts_sync.cpp
+14
-4
src/storage/tx/ob_dup_table_util.cpp
src/storage/tx/ob_dup_table_util.cpp
+157
-129
src/storage/tx/ob_dup_table_util.h
src/storage/tx/ob_dup_table_util.h
+31
-5
src/storage/tx/ob_trans_service.h
src/storage/tx/ob_trans_service.h
+1
-0
未找到文件。
mittest/multi_replica/CMakeLists.txt
浏览文件 @
78578299
...
...
@@ -34,5 +34,6 @@ ob_unittest_multi_replica(test_ob_dup_table_basic)
ob_unittest_multi_replica
(
test_ob_dup_table_restart
)
ob_unittest_multi_replica
(
test_ob_dup_table_leader_switch
)
ob_unittest_multi_replica
(
test_ob_dup_table_tablet_gc
)
ob_unittest_multi_replica
(
test_ob_dup_table_new_gc
)
ob_unittest_multi_replica
(
test_mds_replay_from_ctx_table
)
ob_unittest_multi_replica_longer_timeout
(
test_multi_transfer_tx
)
mittest/multi_replica/env/ob_multi_replica_util.h
浏览文件 @
78578299
...
...
@@ -259,38 +259,40 @@ namespace unittest
WRITE_SQL_BY_CONN(connection, "set ob_query_timeout = 3000000000"); \
WRITE_SQL_BY_CONN(connection, "set autocommit=0");
#define RETRY_UNTIL_TIMEOUT(condition, timeout_us, retry_interval_us) \
{ \
int64_t start_time = ObTimeUtility::fast_current_time(); \
while (OB_SUCC(ret) && !(condition)) { \
if (ObTimeUtility::fast_current_time() - start_time > timeout_us) { \
ret = OB_TIMEOUT; \
break; \
} \
SERVER_LOG(INFO, "retry one time until timeout", K(condition), K(start_time), \
K(timeout_us)); \
ob_usleep(retry_interval_us); \
} \
SERVER_LOG(INFO, "retry to wait one condition successfully", K(condition), K(start_time), \
K(timeout_us), K(ObTimeUtility::fast_current_time() - start_time)); \
#define RETRY_UNTIL_TIMEOUT(condition, timeout_us, retry_interval_us) \
{ \
ret = OB_SUCCESS; \
int64_t start_time = ObTimeUtility::fast_current_time(); \
while (OB_SUCC(ret) && !(condition)) { \
if (ObTimeUtility::fast_current_time() - start_time > timeout_us) { \
ret = OB_TIMEOUT; \
break; \
} \
SERVER_LOG(INFO, "retry one time until timeout", K(condition), K(start_time), \
K(timeout_us)); \
ob_usleep(retry_interval_us); \
} \
SERVER_LOG(INFO, "retry to wait one condition successfully", K(ret), K(condition), \
K(start_time), K(timeout_us), K(ObTimeUtility::fast_current_time() - start_time)); \
}
#define RETRY_OP_UNTIL_TIMEOUT(op, condition, timeout_us, retry_interval_us) \
{ \
int64_t start_time = ObTimeUtility::fast_current_time(); \
op; \
while (OB_SUCC(ret) && !(condition)) { \
if (ObTimeUtility::fast_current_time() - start_time > timeout_us) { \
ret = OB_TIMEOUT; \
break; \
} \
SERVER_LOG(INFO, "retry opertion until timeout", K(condition), K(start_time), \
K(timeout_us)); \
ob_usleep(retry_interval_us); \
op; \
} \
SERVER_LOG(INFO, "retry to opertion successfully", K(condition), K(start_time), K(timeout_us), \
K(ObTimeUtility::fast_current_time() - start_time)); \
#define RETRY_OP_UNTIL_TIMEOUT(op, condition, timeout_us, retry_interval_us) \
{ \
ret = OB_SUCCESS; \
int64_t start_time = ObTimeUtility::fast_current_time(); \
op; \
while (OB_SUCC(ret) && !(condition)) { \
if (ObTimeUtility::fast_current_time() - start_time > timeout_us) { \
ret = OB_TIMEOUT; \
break; \
} \
SERVER_LOG(INFO, "retry opertion until timeout", K(condition), K(start_time), \
K(timeout_us)); \
ob_usleep(retry_interval_us); \
op; \
} \
SERVER_LOG(INFO, "retry to opertion successfully", K(ret), K(condition), K(start_time), \
K(timeout_us), K(ObTimeUtility::fast_current_time() - start_time)); \
}
#define WAIT_START_SERVICE_SUCCC(timeout_us, retry_interval_us) \
...
...
mittest/multi_replica/test_ob_dup_table_new_gc.cpp
0 → 100644
浏览文件 @
78578299
/*
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#define USING_LOG_PREFIX SERVER
#define protected public
#define private public
#include "env/ob_fast_bootstrap.h"
#include "env/ob_multi_replica_util.h"
#include "lib/mysqlclient/ob_mysql_result.h"
#include "storage/tx/ob_dup_table_lease.h"
#include "storage/tx/ob_dup_table_util.h"
using
namespace
oceanbase
::
transaction
;
using
namespace
oceanbase
::
storage
;
#define CUR_TEST_CASE_NAME ObDupTableNewGCTest
DEFINE_MULTI_ZONE_TEST_CASE_CLASS
MULTI_REPLICA_TEST_MAIN_FUNCTION
(
test_dup_table_new_gc_
);
static
const
int64_t
MAX_DUP_TABLE_COUNT
=
10
;
static
bool
STOP_PREPARE_SERIALIZE_DUP_TABLET
=
false
;
namespace
oceanbase
{
namespace
transaction
{
OB_NOINLINE
int
ObLSDupTabletsMgr
::
process_prepare_ser_err_test_
()
{
int
ret
=
OB_SUCCESS
;
if
(
STOP_PREPARE_SERIALIZE_DUP_TABLET
)
{
ret
=
OB_EAGAIN
;
}
if
(
OB_FAIL
(
ret
))
{
DUP_TABLE_LOG
(
INFO
,
"errsim prepare serialize err test in mittest"
,
K
(
ret
),
K
(
ls_id_
));
}
return
ret
;
}
}
// namespace transaction
namespace
unittest
{
struct
DupTableBasicArg
{
uint64_t
tenant_id_
;
int64_t
ls_id_num_
;
ObSEArray
<
int64_t
,
10
>
table_id_
;
int64_t
tablet_count_
;
ObSEArray
<
int64_t
,
100
>
tablet_id_array_
;
DupTableBasicArg
()
{
reset
();
}
void
reset
()
{
tenant_id_
=
0
;
ls_id_num_
=
0
;
table_id_
.
reset
();
tablet_count_
=
0
;
tablet_id_array_
.
reset
();
}
TO_STRING_KV
(
K
(
tenant_id_
),
K
(
ls_id_num_
),
K
(
table_id_
),
K
(
tablet_count_
),
K
(
tablet_id_array_
));
OB_UNIS_VERSION
(
1
);
};
OB_SERIALIZE_MEMBER
(
DupTableBasicArg
,
tenant_id_
,
ls_id_num_
,
table_id_
,
tablet_count_
,
tablet_id_array_
);
static
DupTableBasicArg
dup_basic_arg_
;
static
const
int64_t
CUR_GC_INTERVAL
=
2
*
1000
*
1000
;
void
reduce_dup_table_gc_interval
(
int64_t
tenant_id
,
int64_t
cur_gc_interval
)
{
share
::
ObTenantSwitchGuard
tenant_guard
;
ASSERT_EQ
(
OB_SUCCESS
,
tenant_guard
.
switch_to
(
tenant_id
));
MTL
(
transaction
::
ObTransService
*
)
->
dup_tablet_scan_task_
.
scan_task_execute_interval_
=
cur_gc_interval
;
MTL
(
transaction
::
ObTransService
*
)
->
dup_tablet_scan_task_
.
max_execute_interval_
=
cur_gc_interval
;
oceanbase
::
transaction
::
ObLSDupTabletsMgr
::
MAX_READABLE_SET_SER_INTERVAL
=
CUR_GC_INTERVAL
;
}
TEST_F
(
GET_ZONE_TEST_CLASS_NAME
(
1
),
create_dup_table
)
{
int
ret
=
OB_SUCCESS
;
CREATE_TEST_TENANT
(
test_tenant_id
);
SERVER_LOG
(
INFO
,
"[ObMultiReplicaTestBase] create test tenant success"
,
K
(
test_tenant_id
));
reduce_dup_table_gc_interval
(
test_tenant_id
,
CUR_GC_INTERVAL
);
common
::
ObMySQLProxy
&
test_tenant_sql_proxy
=
get_curr_simple_server
().
get_sql_proxy2
();
ACQUIRE_CONN_FROM_SQL_PROXY
(
test_conn
,
test_tenant_sql_proxy
);
ACQUIRE_CONN_FROM_SQL_PROXY
(
sys_conn
,
get_curr_simple_server
().
get_sql_proxy
());
std
::
string
primary_zone_sql
=
"ALTER TENANT "
+
std
::
string
(
DEFAULT_TEST_TENANT_NAME
)
+
" set primary_zone='zone1; zone3; zone2';"
;
WRITE_SQL_BY_CONN
(
test_conn
,
primary_zone_sql
.
c_str
());
for
(
int
i
=
0
;
i
<
MAX_DUP_TABLE_COUNT
;
i
++
)
{
std
::
string
tname
=
"test_t"
+
std
::
to_string
(
i
);
std
::
string
create_table_sql
=
"CREATE TABLE "
+
tname
+
"( "
"id_x int, "
"id_y int, "
"id_z int, "
"PRIMARY KEY(id_x)"
") duplicate_scope='cluster' PARTITION BY hash(id_x) partitions 10;"
;
std
::
string
get_table_id_sql
=
"select table_id, duplicate_scope from "
"oceanbase.__all_table where table_name = '"
+
tname
+
"' "
;
WRITE_SQL_BY_CONN
(
test_conn
,
create_table_sql
.
c_str
());
READ_SQL_BY_CONN
(
test_conn
,
table_info_result
,
get_table_id_sql
.
c_str
());
ASSERT_EQ
(
OB_SUCCESS
,
table_info_result
->
next
());
int64_t
table_id
=
0
;
int64_t
dup_scope
=
0
;
ASSERT_EQ
(
OB_SUCCESS
,
table_info_result
->
get_int
(
"table_id"
,
table_id
));
ASSERT_EQ
(
OB_SUCCESS
,
table_info_result
->
get_int
(
"duplicate_scope"
,
dup_scope
));
ASSERT_EQ
(
true
,
table_id
>
0
);
ASSERT_EQ
(
true
,
dup_scope
!=
0
);
ASSERT_EQ
(
OB_SUCCESS
,
dup_basic_arg_
.
table_id_
.
push_back
(
table_id
));
std
::
string
tablet_count_sql
=
"select count(*), ls_id from oceanbase.__all_tablet_to_ls where table_id = "
+
std
::
to_string
(
table_id
)
+
" group by ls_id order by count(*)"
;
READ_SQL_BY_CONN
(
test_conn
,
tablet_count_result
,
tablet_count_sql
.
c_str
());
int64_t
tablet_count
=
0
;
int64_t
ls_id_num
=
0
;
ASSERT_EQ
(
OB_SUCCESS
,
tablet_count_result
->
next
());
ASSERT_EQ
(
OB_SUCCESS
,
tablet_count_result
->
get_int
(
"count(*)"
,
tablet_count
));
ASSERT_EQ
(
OB_SUCCESS
,
tablet_count_result
->
get_int
(
"ls_id"
,
ls_id_num
));
ASSERT_EQ
(
10
,
tablet_count
);
ASSERT_EQ
(
true
,
share
::
ObLSID
(
ls_id_num
).
is_valid
());
if
(
dup_basic_arg_
.
ls_id_num_
<=
0
)
{
dup_basic_arg_
.
ls_id_num_
=
ls_id_num
;
}
else
{
ASSERT_EQ
(
dup_basic_arg_
.
ls_id_num_
,
ls_id_num
);
}
dup_basic_arg_
.
tablet_count_
+=
tablet_count
;
std
::
string
tablet_id_sql
=
"select tablet_id from oceanbase.__all_tablet_to_ls where table_id = "
+
std
::
to_string
(
table_id
)
+
" and ls_id = "
+
std
::
to_string
(
ls_id_num
);
READ_SQL_BY_CONN
(
test_conn
,
tablet_id_reult
,
tablet_id_sql
.
c_str
());
while
(
OB_SUCC
(
tablet_id_reult
->
next
()))
{
int64_t
id
=
0
;
ASSERT_EQ
(
OB_SUCCESS
,
tablet_id_reult
->
get_int
(
"tablet_id"
,
id
));
ASSERT_EQ
(
true
,
ObTabletID
(
id
).
is_valid
());
ASSERT_EQ
(
OB_SUCCESS
,
dup_basic_arg_
.
tablet_id_array_
.
push_back
(
id
));
}
}
dup_basic_arg_
.
tenant_id_
=
test_tenant_id
;
std
::
string
tmp_str
;
ASSERT_EQ
(
OB_SUCCESS
,
EventArgSerTool
<
DupTableBasicArg
>::
serialize_arg
(
dup_basic_arg_
,
tmp_str
));
ASSERT_EQ
(
OB_SUCCESS
,
finish_event
(
"CREATE_DUP_TABLE"
,
tmp_str
));
}
TEST_F
(
GET_ZONE_TEST_CLASS_NAME
(
2
),
leader_switch_without_gc
)
{
int
ret
=
OB_SUCCESS
;
std
::
string
tmp_event_val
;
ASSERT_EQ
(
OB_SUCCESS
,
wait_event_finish
(
"CREATE_DUP_TABLE"
,
tmp_event_val
,
30
*
60
*
1000
));
ASSERT_EQ
(
OB_SUCCESS
,
EventArgSerTool
<
DupTableBasicArg
>::
deserialize_arg
(
dup_basic_arg_
,
tmp_event_val
));
reduce_dup_table_gc_interval
(
dup_basic_arg_
.
tenant_id_
,
CUR_GC_INTERVAL
);
ASSERT_EQ
(
OB_SUCCESS
,
get_curr_simple_server
().
init_sql_proxy2
());
common
::
ObMySQLProxy
&
sys_sql_proxy
=
get_curr_simple_server
().
get_sql_proxy
();
ACQUIRE_CONN_FROM_SQL_PROXY
(
sys_conn
,
sys_sql_proxy
);
GET_LS
(
dup_basic_arg_
.
tenant_id_
,
dup_basic_arg_
.
ls_id_num_
,
ls_handle
);
RETRY_UNTIL_TIMEOUT
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
is_inited
()
&&
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
->
get_readable_tablet_count
()
==
dup_basic_arg_
.
tablet_count_
,
30
*
1000
*
1000
,
1
*
100
*
1000
);
ASSERT_EQ
(
ret
,
OB_SUCCESS
);
RETRY_UNTIL_TIMEOUT
(
!
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
is_master
(),
20
*
1000
*
1000
,
100
*
1000
);
ASSERT_EQ
(
ret
,
OB_SUCCESS
);
usleep
(
ObDupTabletScanTask
::
DUP_TABLET_SCAN_INTERVAL
+
4
*
CUR_GC_INTERVAL
);
std
::
string
ls_id_str
=
std
::
to_string
(
dup_basic_arg_
.
ls_id_num_
);
std
::
string
zone2_ip
=
local_ip_
+
":"
+
std
::
to_string
(
rpc_ports_
[
1
]);
std
::
string
switch_leader_sql
=
"alter system switch replica leader ls="
+
ls_id_str
+
" server='"
+
zone2_ip
+
"' tenant='tt1';"
;
WRITE_SQL_BY_CONN
(
sys_conn
,
switch_leader_sql
.
c_str
());
RETRY_OP_UNTIL_TIMEOUT
(
ASSERT_EQ
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
->
get_readable_tablet_count
(),
dup_basic_arg_
.
tablet_count_
),
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
is_master
(),
20
*
1000
*
1000
,
50
*
1000
);
ASSERT_EQ
(
ret
,
OB_SUCCESS
);
RETRY_UNTIL_TIMEOUT
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
->
get_readable_tablet_count
()
<
dup_basic_arg_
.
tablet_count_
,
10
*
1000
*
1000
,
50
*
1000
);
EXPECT_EQ
(
ret
,
OB_TIMEOUT
);
RETRY_UNTIL_TIMEOUT
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
->
get_readable_tablet_count
()
==
dup_basic_arg_
.
tablet_count_
,
20
*
1000
*
1000
,
100
*
1000
);
ASSERT_EQ
(
ret
,
OB_SUCCESS
);
std
::
string
zone1_ip
=
local_ip_
+
":"
+
std
::
to_string
(
rpc_ports_
[
0
]);
std
::
string
switch_leader_to_zone1_sql
=
"alter system switch replica leader ls="
+
ls_id_str
+
" server='"
+
zone1_ip
+
"' tenant='tt1';"
;
WRITE_SQL_BY_CONN
(
sys_conn
,
switch_leader_to_zone1_sql
.
c_str
());
RETRY_UNTIL_TIMEOUT
(
!
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
is_master
(),
20
*
1000
*
1000
,
100
*
1000
);
ASSERT_EQ
(
ret
,
OB_SUCCESS
);
ASSERT_EQ
(
OB_SUCCESS
,
finish_event
(
"SWITCH_LEADER_TO_ZONE2_WITHOUT_GC"
,
""
));
}
TEST_F
(
GET_ZONE_TEST_CLASS_NAME
(
1
),
leader_revoke_in_gc
)
{
int
ret
=
OB_SUCCESS
;
std
::
string
tmp_event_val
;
ASSERT_EQ
(
OB_SUCCESS
,
wait_event_finish
(
"SWITCH_LEADER_TO_ZONE2_WITHOUT_GC"
,
tmp_event_val
,
30
*
60
*
1000
));
common
::
ObMySQLProxy
&
test_tenant_sql_proxy
=
get_curr_simple_server
().
get_sql_proxy2
();
ACQUIRE_CONN_FROM_SQL_PROXY
(
test_conn
,
test_tenant_sql_proxy
);
GET_LS
(
dup_basic_arg_
.
tenant_id_
,
dup_basic_arg_
.
ls_id_num_
,
ls_handle
);
RETRY_UNTIL_TIMEOUT
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
is_master
(),
20
*
1000
*
1000
,
100
*
1000
);
ASSERT_EQ
(
ret
,
OB_SUCCESS
);
RETRY_UNTIL_TIMEOUT
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
->
get_readable_tablet_count
()
==
dup_basic_arg_
.
tablet_count_
,
20
*
1000
*
1000
,
100
*
1000
);
ASSERT_EQ
(
ret
,
OB_SUCCESS
);
TRANS_LOG
(
INFO
,
"[ObMultiReplicaTestBase] print dup tablet stat info 1"
,
K
(
ret
),
KPC
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
));
{
std
::
string
tname
=
"test_t"
+
std
::
to_string
(
1
);
std
::
string
delete_table_sql
=
"DROP TABLE "
+
tname
;
WRITE_SQL_BY_CONN
(
test_conn
,
delete_table_sql
.
c_str
());
std
::
string
get_table_id_sql
=
"select table_id, duplicate_scope from "
"oceanbase.__all_table where table_name = '"
+
tname
+
"' "
;
READ_SQL_BY_CONN
(
test_conn
,
table_info_result
,
get_table_id_sql
.
c_str
());
ASSERT_EQ
(
OB_ITER_END
,
table_info_result
->
next
());
}
// gc t1 successfully
RETRY_UNTIL_TIMEOUT
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
->
get_readable_tablet_count
()
==
dup_basic_arg_
.
tablet_count_
-
10
,
20
*
1000
*
1000
,
100
*
1000
);
TRANS_LOG
(
INFO
,
"[ObMultiReplicaTestBase] print dup tablet stat info 1.1"
,
K
(
ret
),
KPC
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
));
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
->
print_tablet_diag_info_log
(
true
);
ASSERT_EQ
(
ret
,
OB_SUCCESS
);
RETRY_UNTIL_TIMEOUT
(
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
->
removing_old_set_
->
size
()
==
0
),
20
*
1000
*
1000
,
100
*
1000
);
ASSERT_EQ
(
ret
,
OB_SUCCESS
);
STOP_PREPARE_SERIALIZE_DUP_TABLET
=
true
;
{
std
::
string
tname
=
"test_t"
+
std
::
to_string
(
2
);
std
::
string
delete_table_sql
=
"DROP TABLE "
+
tname
;
WRITE_SQL_BY_CONN
(
test_conn
,
delete_table_sql
.
c_str
());
std
::
string
get_table_id_sql
=
"select table_id, duplicate_scope from "
"oceanbase.__all_table where table_name = '"
+
tname
+
"' "
;
READ_SQL_BY_CONN
(
test_conn
,
table_info_result
,
get_table_id_sql
.
c_str
());
ASSERT_EQ
(
OB_ITER_END
,
table_info_result
->
next
());
}
TRANS_LOG
(
INFO
,
"[ObMultiReplicaTestBase] print dup tablet stat info 2"
,
K
(
ret
),
KPC
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
));
RETRY_UNTIL_TIMEOUT
(
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
->
removing_old_set_
->
size
()
>
0
),
20
*
1000
*
1000
,
100
*
1000
);
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
->
print_tablet_diag_info_log
(
true
);
ASSERT_EQ
(
ret
,
OB_SUCCESS
);
TRANS_LOG
(
INFO
,
"[ObMultiReplicaTestBase] print dup tablet stat info 3"
,
K
(
ret
),
KPC
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
));
ASSERT_EQ
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
->
get_readable_tablet_count
(),
dup_basic_arg_
.
tablet_count_
-
10
/*readable 90 + removing 10*/
);
ASSERT_EQ
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
->
removing_old_set_
->
size
(),
10
);
ASSERT_EQ
(
OB_SUCCESS
,
finish_event
(
"DROP_TABLE_WITHOUT_LOG"
,
""
));
}
TEST_F
(
GET_ZONE_TEST_CLASS_NAME
(
2
),
continue_gc_in_new_leader
)
{
int
ret
=
OB_SUCCESS
;
std
::
string
tmp_event_val
;
ASSERT_EQ
(
OB_SUCCESS
,
wait_event_finish
(
"DROP_TABLE_WITHOUT_LOG"
,
tmp_event_val
,
30
*
60
*
1000
));
STOP_PREPARE_SERIALIZE_DUP_TABLET
=
true
;
common
::
ObMySQLProxy
&
test_tenant_sql_proxy
=
get_curr_simple_server
().
get_sql_proxy2
();
ACQUIRE_CONN_FROM_SQL_PROXY
(
test_conn
,
test_tenant_sql_proxy
);
ACQUIRE_CONN_FROM_SQL_PROXY
(
sys_conn
,
get_curr_simple_server
().
get_sql_proxy
());
GET_LS
(
dup_basic_arg_
.
tenant_id_
,
dup_basic_arg_
.
ls_id_num_
,
ls_handle
);
std
::
string
ls_id_str
=
std
::
to_string
(
dup_basic_arg_
.
ls_id_num_
);
std
::
string
zone2_ip
=
local_ip_
+
":"
+
std
::
to_string
(
rpc_ports_
[
1
]);
std
::
string
switch_leader_sql
=
"alter system switch replica leader ls="
+
ls_id_str
+
" server='"
+
zone2_ip
+
"' tenant='tt1';"
;
WRITE_SQL_BY_CONN
(
sys_conn
,
switch_leader_sql
.
c_str
());
RETRY_UNTIL_TIMEOUT
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
is_master
(),
20
*
1000
*
1000
,
100
*
1000
);
ASSERT_EQ
(
ret
,
OB_SUCCESS
);
RETRY_UNTIL_TIMEOUT
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
->
get_readable_tablet_count
()
<
dup_basic_arg_
.
tablet_count_
-
10
,
20
*
1000
*
1000
,
100
*
1000
);
TRANS_LOG
(
INFO
,
"[ObMultiReplicaTestBase] print dup tablet stat info 4"
,
K
(
ret
),
KPC
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
));
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
->
print_tablet_diag_info_log
(
true
);
ASSERT_EQ
(
ret
,
OB_TIMEOUT
);
RETRY_UNTIL_TIMEOUT
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
->
get_readable_tablet_count
()
==
dup_basic_arg_
.
tablet_count_
-
10
/*readable 90 + removing 10*/
,
20
*
1000
*
1000
,
100
*
1000
);
ASSERT_EQ
(
ret
,
OB_SUCCESS
);
ASSERT_EQ
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
->
removing_old_set_
->
size
(),
10
);
STOP_PREPARE_SERIALIZE_DUP_TABLET
=
false
;
RETRY_UNTIL_TIMEOUT
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
->
get_readable_tablet_count
()
==
dup_basic_arg_
.
tablet_count_
-
10
*
2
,
20
*
1000
*
1000
,
100
*
1000
);
ASSERT_EQ
(
ret
,
OB_SUCCESS
);
ASSERT_EQ
(
OB_SUCCESS
,
finish_event
(
"CONTINUE_GC_IN_ZONE2"
,
""
));
}
TEST_F
(
GET_ZONE_TEST_CLASS_NAME
(
1
),
check_gc_result_in_old_leader
)
{
int
ret
=
OB_SUCCESS
;
std
::
string
tmp_event_val
;
ASSERT_EQ
(
OB_SUCCESS
,
wait_event_finish
(
"CONTINUE_GC_IN_ZONE2"
,
tmp_event_val
,
30
*
60
*
1000
));
common
::
ObMySQLProxy
&
test_tenant_sql_proxy
=
get_curr_simple_server
().
get_sql_proxy2
();
ACQUIRE_CONN_FROM_SQL_PROXY
(
test_conn
,
test_tenant_sql_proxy
);
ACQUIRE_CONN_FROM_SQL_PROXY
(
sys_conn
,
get_curr_simple_server
().
get_sql_proxy
());
GET_LS
(
dup_basic_arg_
.
tenant_id_
,
dup_basic_arg_
.
ls_id_num_
,
ls_handle
);
RETRY_UNTIL_TIMEOUT
(
!
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
is_master
(),
20
*
1000
*
1000
,
100
*
1000
);
ASSERT_EQ
(
ret
,
OB_SUCCESS
);
RETRY_UNTIL_TIMEOUT
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
->
get_readable_tablet_count
()
==
dup_basic_arg_
.
tablet_count_
-
10
*
2
,
20
*
1000
*
1000
,
100
*
1000
);
ASSERT_EQ
(
ret
,
OB_SUCCESS
);
std
::
string
ls_id_str
=
std
::
to_string
(
dup_basic_arg_
.
ls_id_num_
);
std
::
string
zone1_ip
=
local_ip_
+
":"
+
std
::
to_string
(
rpc_ports_
[
0
]);
std
::
string
switch_leader_to_zone1_sql
=
"alter system switch replica leader ls="
+
ls_id_str
+
" server='"
+
zone1_ip
+
"' tenant='tt1';"
;
WRITE_SQL_BY_CONN
(
sys_conn
,
switch_leader_to_zone1_sql
.
c_str
());
RETRY_UNTIL_TIMEOUT
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
is_master
(),
20
*
1000
*
1000
,
100
*
1000
);
ASSERT_EQ
(
ret
,
OB_SUCCESS
);
RETRY_UNTIL_TIMEOUT
(
ls_handle
.
get_ls
()
->
dup_table_ls_handler_
.
tablets_mgr_ptr_
->
get_readable_tablet_count
()
==
dup_basic_arg_
.
tablet_count_
-
10
*
2
,
20
*
1000
*
1000
,
100
*
1000
);
ASSERT_EQ
(
ret
,
OB_SUCCESS
);
}
// TEST_F(GET_ZONE_TEST_CLASS_NAME(1), random_error)
// {
// ASSERT_EQ(ObTimeUtility::current_time() % 4 != 1, true);
// }
//
// TEST_F(GET_ZONE_TEST_CLASS_NAME(2), random_error)
// {
// ASSERT_EQ(ObTimeUtility::current_time() % 4 != 2, true);
// }
//
// TEST_F(GET_ZONE_TEST_CLASS_NAME(3), random_error)
// {
// ASSERT_EQ(ObTimeUtility::current_time() % 4 != 3, true);
// }
}
// namespace unittest
}
// namespace oceanbase
src/storage/tx/ob_dup_table_base.cpp
浏览文件 @
78578299
...
...
@@ -560,10 +560,8 @@ int ObDupTableLogOperator::submit_log_entry()
int64_t
max_ser_size
=
0
;
bool
submit_result
=
false
;
DupLogTypeArray
type_array
;
if
(
OB_SUCC
(
ret
))
{
if
(
OB_TMP_FAIL
(
tablet_mgr_ptr_
->
scan_readable_set_for_gc
()))
{
DUP_TABLE_LOG
(
WARN
,
"scan readable set failed"
,
K
(
tmp_ret
));
}
if
(
OB_FAIL
(
prepare_serialize_log_entry_
(
max_ser_size
,
type_array
)))
{
DUP_TABLE_LOG
(
WARN
,
"prepare serialize log entry failed"
,
K
(
ret
));
}
else
if
(
!
type_array
.
empty
())
{
...
...
src/storage/tx/ob_dup_table_base.h
浏览文件 @
78578299
...
...
@@ -82,6 +82,8 @@ struct DupTableInterfaceStat
int64_t
dup_table_lease_log_sync_total_time_
;
int64_t
dup_table_tablet_log_sync_total_time_
;
int64_t
dup_table_ls_leader_takeover_ts_
;
void
reset
()
{
dup_table_follower_read_succ_cnt_
=
0
;
...
...
@@ -101,6 +103,8 @@ struct DupTableInterfaceStat
dup_table_log_deser_total_time_
=
0
;
dup_table_lease_log_sync_total_time_
=
0
;
dup_table_tablet_log_sync_total_time_
=
0
;
dup_table_ls_leader_takeover_ts_
=
0
;
}
TO_STRING_KV
(
K
(
dup_table_follower_read_succ_cnt_
),
...
...
@@ -115,7 +119,8 @@ struct DupTableInterfaceStat
K
(
dup_table_log_replay_total_time_
),
K
(
dup_table_log_deser_total_time_
),
K
(
dup_table_lease_log_sync_total_time_
),
K
(
dup_table_tablet_log_sync_total_time_
));
K
(
dup_table_tablet_log_sync_total_time_
),
K
(
dup_table_ls_leader_takeover_ts_
));
};
#define DUP_LEASE_LIFE_PREFIX "[DupLeaseLife] "
...
...
src/storage/tx/ob_dup_table_tablets.cpp
浏览文件 @
78578299
此差异已折叠。
点击以展开。
src/storage/tx/ob_dup_table_tablets.h
浏览文件 @
78578299
...
...
@@ -419,10 +419,6 @@ public:
DUP_TABLE_LOG
(
INFO
,
"this readable set used for other operation, should skip gc"
,
KPC
(
this
));
bool_ret
=
false
;
}
else
{
bool_ret
=
true
;
}
if
(
bool_ret
)
{
if
(
last_gc_scan_ts_
<=
0
||
gc_start_time
>
last_gc_scan_ts_
)
{
bool_ret
=
true
;
...
...
@@ -431,6 +427,8 @@ public:
}
}
DUP_TABLE_LOG
(
INFO
,
"check need gc scan"
,
K
(
bool_ret
),
K
(
last_gc_scan_ts_
),
K
(
gc_start_time
),
KPC
(
this
));
return
bool_ret
;
}
void
set_last_gc_scan_ts
(
const
int64_t
gc_start_time
)
{
...
...
@@ -440,6 +438,8 @@ public:
}
else
{
last_gc_scan_ts_
=
gc_start_time
;
}
DUP_TABLE_LOG
(
INFO
,
"set last gc scn ts"
,
K
(
last_gc_scan_ts_
),
K
(
gc_start_time
),
KPC
(
this
));
}
int64_t
get_last_gc_scan_ts
()
{
return
last_gc_scan_ts_
;
}
...
...
@@ -629,7 +629,7 @@ public:
const
share
::
SCN
&
to_scn
);
int
gc_tmporary_dup_tablets
(
const
int64_t
gc_ts
,
const
int64_t
max_task_interval
);
// new gc methods
int
scan_readable_set_for_gc
();
int
scan_readable_set_for_gc
(
const
int64_t
leader_takeover_ts
);
int
refresh_dup_tablet
(
const
common
::
ObTabletID
&
tablet_id
,
bool
is_dup_table
,
...
...
@@ -662,6 +662,7 @@ public:
int
try_to_confirm_tablets
(
const
share
::
SCN
&
confirm_scn
);
// bool need_log_tablets();
int64_t
get_dup_tablet_count
();
int64_t
get_readable_tablet_count
();
bool
has_dup_tablet
();
int64_t
get_readable_tablet_set_count
();
int64_t
get_need_confirm_tablet_set_count
();
...
...
@@ -835,6 +836,13 @@ private:
const
bool
for_replay
);
bool
need_seralize_readable_set
()
{
return
true
;
}
int
prepare_serialize_readable_tablet_set_
(
int64_t
&
max_ser_size
,
DupTabletSetIDArray
&
unique_id_array
,
const
int64_t
max_log_buf_len
);
int
prepare_serialize_confirming_tablet_set_
(
int64_t
&
max_ser_size
,
DupTabletSetIDArray
&
unique_id_array
,
const
int64_t
max_log_buf_len
);
int
cal_single_set_max_ser_size_
(
DupTabletChangeMap
*
hash_map
,
int64_t
&
max_ser_size
,
const
int64_t
ser_size_limit
,
...
...
@@ -854,8 +862,9 @@ private:
int
remove_src_and_related_set_header_from_array_
(
DupTabletChangeMap
*
src_set
,
DupTabletChangeMap
*
related_set
,
DupTabletSetIDArray
&
unique_id_array
);
DupTabletChangeMap
*
get_need_gc_set_
(
bool
&
new_round
);
DupTabletChangeMap
*
get_need_gc_set_
();
int
remove_tablet_from_readable_set_
();
bool
is_busy_in_readable_change_
();
private:
//
static
int64_t
GC_DUP_TABLETS_TIME_INTERVAL
;
// 5 min
...
...
@@ -866,6 +875,7 @@ private:
const
static
int64_t
MAX_FREE_SET_COUNT
;
const
static
int64_t
MAX_GC_TABLET_COUNT
;
static
int64_t
MAX_READABLE_SET_SER_INTERVAL
;
public:
TO_STRING_KV
(
K
(
free_set_pool_
.
get_size
()),
...
...
@@ -873,7 +883,6 @@ public:
K
(
need_confirm_new_queue_
.
get_size
()),
K
(
readable_tablets_list_
.
get_size
()),
KPC
(
removing_old_set_
),
K
(
last_gc_succ_time_
),
K
(
last_no_free_set_time_
),
K
(
extra_free_set_alloc_count_
));
...
...
@@ -885,16 +894,12 @@ private:
bool
is_master_
;
bool
is_stopped_
;
// used for gc_handler
int64_t
tablet_gc_window_
;
// default is 2 * ObDupTabletScanTask::DUP_TABLET_SCAN_INTERVAL;
common
::
ObDList
<
DupTabletChangeMap
>
free_set_pool_
;
DupTabletChangeMap
*
changing_new_set_
;
common
::
ObDList
<
DupTabletChangeMap
>
need_confirm_new_queue_
;
common
::
ObDList
<
DupTabletChangeMap
>
readable_tablets_list_
;
DupTabletChangeMap
*
removing_old_set_
;
// gc_dup_table
int64_t
last_gc_succ_time_
;
/* 1. gc one round means iter all readable set
* 2. use readable_set_in_gc_ point to readable set not finish gc in one round
* 3. use gc_start_time_ mark gc one round start time one round
...
...
src/storage/tx/ob_dup_table_ts_sync.cpp
浏览文件 @
78578299
...
...
@@ -355,13 +355,23 @@ int ObDupTableLSTsSyncMgr::get_local_ts_info(DupTableTsInfo &ts_info)
int
ObDupTableLSTsSyncMgr
::
get_cache_ts_info
(
const
common
::
ObAddr
&
addr
,
DupTableTsInfo
&
ts_info
)
{
int
ret
=
OB_SUCCESS
;
int
tmp_ret
=
OB_SUCCESS
;
SpinRLockGuard
guard
(
ts_sync_lock_
);
{
SpinRLockGuard
guard
(
ts_sync_lock_
);
if
(
OB_FAIL
(
get_ts_info_cache_
(
addr
,
ts_info
)))
{
DUP_TABLE_LOG
(
WARN
,
"get ts info cache failed"
,
K
(
ret
));
}
DUP_TABLE_LOG
(
DEBUG
,
"get ts info cache"
,
K
(
ret
),
K
(
ret
),
K
(
ts_info
));
}
if
(
OB_FAIL
(
get_ts_info_cache_
(
addr
,
ts_info
)))
{
DUP_TABLE_LOG
(
WARN
,
"get ts info cache failed"
,
K
(
ret
));
if
(
OB_HASH_NOT_EXIST
==
ret
)
{
if
(
OB_TMP_FAIL
(
request_ts_info
(
addr
)))
{
DUP_TABLE_LOG
(
WARN
,
"request ts info failed"
,
K
(
tmp_ret
),
K
(
ts_info
));
}
}
DUP_TABLE_LOG
(
DEBUG
,
"get ts info cache"
,
K
(
ret
),
K
(
ret
),
K
(
ts_info
));
return
ret
;
}
...
...
src/storage/tx/ob_dup_table_util.cpp
浏览文件 @
78578299
...
...
@@ -38,8 +38,13 @@ void ObDupTabletScanTask::reset()
tenant_id_
=
0
;
dup_table_scan_timer_
=
nullptr
;
dup_loop_worker_
=
nullptr
;
last_execute_time_
=
0
;
max_execute_interval_
=
0
;
min_dup_ls_status_info_
.
reset
();
tenant_schema_dup_tablet_set_
.
destroy
();
scan_task_execute_interval_
=
ObDupTabletScanTask
::
DUP_TABLET_SCAN_INTERVAL
;
last_dup_ls_refresh_time_
=
0
;
last_dup_schema_refresh_time_
=
0
;
last_scan_task_succ_time_
=
0
;
max_execute_interval_
=
ObDupTabletScanTask
::
DUP_TABLET_SCAN_INTERVAL
;
}
int
ObDupTabletScanTask
::
make
(
const
int64_t
tenant_id
,
...
...
@@ -54,6 +59,9 @@ int ObDupTabletScanTask::make(const int64_t tenant_id,
tenant_id_
=
tenant_id
;
dup_table_scan_timer_
=
scan_timer
;
dup_loop_worker_
=
loop_worker
;
min_dup_ls_status_info_
.
reset
();
tenant_schema_dup_tablet_set_
.
reuse
();
scan_task_execute_interval_
=
ObDupTabletScanTask
::
DUP_TABLET_SCAN_INTERVAL
;
// ObTransTask::make(ObTransRetryTaskType::DUP_TABLET_SCAN_TASK);
// set_retry_interval_us(DUP_TABLET_SCAN_INTERVAL, DUP_TABLET_SCAN_INTERVAL);
}
...
...
@@ -74,83 +82,143 @@ void ObDupTabletScanTask::runTimerTask()
}
dup_table_scan_timer_
->
unregister_timeout_task
(
*
this
);
dup_table_scan_timer_
->
register_timeout_task
(
*
this
,
DUP_TABLET_SCAN_INTERVAL
);
dup_table_scan_timer_
->
register_timeout_task
(
*
this
,
scan_task_execute_interval_
);
}
}
int
ObDupTabletScanTask
::
refresh_dup_tablet_schema_
(
bool
need_refresh
,
ObTenantDupTabletSchemaHelper
::
TabletIDSet
&
tenant_dup_tablet_set
,
share
::
ObLSStatusInfo
&
dup_ls_status_info
)
int
ObDupTabletScanTask
::
refresh_dup_ls_
(
const
int64_t
cur_time
)
{
int
ret
=
OB_SUCCESS
;
bool
has_dup_ls
=
false
;
if
(
need_refresh
)
{
bool
need_refresh
=
true
;
if
(
min_dup_ls_status_info_
.
is_valid
()
&&
min_dup_ls_status_info_
.
is_duplicate_ls
())
{
if
(
cur_time
-
last_dup_ls_refresh_time_
<
MAX_DUP_LS_REFRESH_INTERVAL
)
{
need_refresh
=
false
;
}
else
{
need_refresh
=
true
;
}
}
else
{
need_refresh
=
true
;
}
if
(
need_refresh
&&
OB_SUCC
(
ret
))
{
share
::
ObLSStatusInfo
tmp_dup_ls_status_info
;
share
::
ObLSStatusOperator
ls_status_op
;
if
(
OB_FAIL
(
ls_status_op
.
get_duplicate_ls_status_info
(
MTL_ID
(),
*
GCTX
.
sql_proxy_
,
dup_ls_status_info
,
share
::
OBCG_STORAGE
)))
{
tmp_
dup_ls_status_info
,
share
::
OBCG_STORAGE
)))
{
if
(
OB_ENTRY_NOT_EXIST
==
ret
)
{
DUP_TABLE_LOG
(
DEBUG
,
"no duplicate ls"
,
K
(
dup_ls_status_info
));
DUP_TABLE_LOG
(
DEBUG
,
"no duplicate ls"
,
K
(
tmp_
dup_ls_status_info
));
ret
=
OB_SUCCESS
;
}
else
{
DUP_TABLE_LOG
(
WARN
,
"get duplicate ls status info failed"
,
K
(
ret
),
K
(
dup_ls_status_info
));
DUP_TABLE_LOG
(
WARN
,
"get duplicate ls status info failed"
,
K
(
ret
),
K
(
tmp_dup_ls_status_info
));
}
}
else
{
DUP_TABLE_LOG
(
INFO
,
"find a duplicate ls"
,
K
(
ret
),
K
(
dup_ls_status_info
));
}
DUP_TABLE_LOG
(
DEBUG
,
"find a duplicate ls"
,
K
(
ret
),
K
(
tmp_dup_ls_status_info
));
if
(
OB_SUCC
(
ret
)
&&
dup_ls_status_info
.
is_duplicate_ls
())
{
if
(
OB_FAIL
(
ret
))
{
// do nothing
}
else
if
(
!
tenant_dup_tablet_set
.
created
())
{
if
(
OB_FAIL
(
tenant_dup_tablet_set
.
create
(
512
)))
{
DUP_TABLE_LOG
(
WARN
,
"init dup tablet cache failed"
,
K
(
ret
));
if
(
!
tmp_dup_ls_status_info
.
is_valid
()
||
!
tmp_dup_ls_status_info
.
is_duplicate_ls
())
{
ret
=
OB_ERR_UNEXPECTED
;
DUP_TABLE_LOG
(
ERROR
,
"invalid tmp_dup_ls_status_info"
,
K
(
ret
),
K
(
tmp_dup_ls_status_info
));
}
else
{
if
(
min_dup_ls_status_info_
.
is_valid
()
&&
min_dup_ls_status_info_
.
is_duplicate_ls
()
&&
tmp_dup_ls_status_info
.
get_ls_id
()
!=
min_dup_ls_status_info_
.
get_ls_id
())
{
DUP_TABLE_LOG
(
ERROR
,
"The min_dup_ls has already changed"
,
K
(
ret
),
K
(
tmp_dup_ls_status_info
),
K
(
min_dup_ls_status_info_
));
}
if
(
OB_FAIL
(
min_dup_ls_status_info_
.
assign
(
tmp_dup_ls_status_info
)))
{
DUP_TABLE_LOG
(
WARN
,
"rewrite min_dup_ls_status_info_ failed"
,
K
(
ret
),
K
(
tmp_dup_ls_status_info
),
K
(
min_dup_ls_status_info_
));
}
else
{
last_dup_ls_refresh_time_
=
cur_time
;
}
}
}
}
if
(
OB_FAIL
(
ret
))
{
// do nothing
}
else
if
(
OB_FAIL
(
dup_schema_helper_
.
refresh_and_get_tablet_set
(
tenant_dup_tablet_set
)))
{
DUP_TABLE_LOG
(
WARN
,
"refresh dup tablet set failed"
,
K
(
ret
));
return
ret
;
}
int
ObDupTabletScanTask
::
refresh_dup_tablet_schema_
(
const
int64_t
cur_time
)
{
int
ret
=
OB_SUCCESS
;
if
(
OB_SUCC
(
ret
)
&&
min_dup_ls_status_info_
.
is_valid
()
&&
min_dup_ls_status_info_
.
is_duplicate_ls
())
{
if
(
OB_FAIL
(
ret
))
{
// do nothing
}
else
if
(
!
tenant_schema_dup_tablet_set_
.
created
())
{
if
(
OB_FAIL
(
tenant_schema_dup_tablet_set_
.
create
(
512
)))
{
DUP_TABLE_LOG
(
WARN
,
"init dup tablet cache failed"
,
K
(
ret
));
}
}
if
(
OB_FAIL
(
ret
))
{
// do nothing
}
else
if
(
cur_time
<=
last_dup_schema_refresh_time_
)
{
// do nothing
}
else
if
(
OB_FAIL
(
dup_schema_helper_
.
refresh_and_get_tablet_set
(
tenant_schema_dup_tablet_set_
)))
{
DUP_TABLE_LOG
(
WARN
,
"refresh dup tablet set failed"
,
K
(
ret
));
tenant_schema_dup_tablet_set_
.
clear
();
}
else
{
last_dup_schema_refresh_time_
=
cur_time
;
}
}
return
ret
;
}
bool
ObDupTabletScanTask
::
has_valid_dup_schema_
()
const
{
bool
dup_schema_is_valid
=
false
;
dup_schema_is_valid
=
min_dup_ls_status_info_
.
is_valid
()
&&
min_dup_ls_status_info_
.
is_duplicate_ls
()
&&
!
tenant_schema_dup_tablet_set_
.
empty
();
return
dup_schema_is_valid
;
}
int
ObDupTabletScanTask
::
execute_for_dup_ls_
()
{
int
ret
=
OB_SUCCESS
;
int
tmp_ret
=
OB_SUCCESS
;
TabletIDArray
tablet_id_array
;
ObTenantDupTabletSchemaHelper
::
TabletIDSet
tenant_dup_tablet_set
;
bool
need_refreh_dup_schema
=
true
;
ObLSHandle
ls_handle
;
share
::
ObLSStatusInfo
dup_ls_status_info
;
// compute scan task max execute interval
const
int64_t
cur_time
=
ObTimeUtility
::
fast_current_time
();
if
(
cur_time
-
last_execute_time_
>
0
)
{
if
(
0
!=
last_execute_time_
)
{
max_execute_interval_
=
max
(
max_execute_interval_
,
cur_time
-
last_execute_time_
);
last_execute_time_
=
cur_time
;
if
(
cur_time
-
last_scan_task_succ_time_
>
0
)
{
if
(
0
!=
last_scan_task_succ_time_
)
{
if
(
max_execute_interval_
/
2
>=
(
cur_time
-
last_scan_task_succ_time_
))
{
// Avoid residual excessive execution intervals in exceptional circumstances
ATOMIC_STORE
(
&
max_execute_interval_
,
cur_time
-
last_scan_task_succ_time_
);
}
else
{
ATOMIC_STORE
(
&
max_execute_interval_
,
max
(
max_execute_interval_
,
cur_time
-
last_scan_task_succ_time_
));
}
}
else
{
last_execute_time_
=
ObTimeUtility
::
fast_current_time
();
}
}
if
(
OB_TMP_FAIL
(
refresh_dup_ls_
(
cur_time
)))
{
DUP_TABLE_LOG
(
WARN
,
"refresh dup ls failed"
,
K
(
tmp_ret
),
KPC
(
this
));
}
else
if
(
OB_TMP_FAIL
(
refresh_dup_tablet_schema_
(
cur_time
)))
{
DUP_TABLE_LOG
(
WARN
,
"refresh dup schema failed"
,
K
(
tmp_ret
),
KPC
(
this
));
}
if
(
OB_ISNULL
(
MTL
(
ObLSService
*
))
||
OB_ISNULL
(
dup_loop_worker_
))
{
ret
=
OB_INVALID_ARGUMENT
;
DUP_TABLE_LOG
(
WARN
,
"invalid arguments"
,
K
(
ret
));
}
else
if
(
OB_FAIL
(
refresh_dup_tablet_schema_
(
need_refreh_dup_schema
,
tenant_dup_tablet_set
,
dup_ls_status_info
)))
{
DUP_TABLE_LOG
(
WARN
,
"refresh dup table schema failed"
,
K
(
ret
));
}
else
if
(
!
dup_ls_status_info
.
is_duplicate_ls
())
{
}
else
if
(
!
has_valid_dup_schema_
())
{
DUP_TABLE_LOG
(
DEBUG
,
"refresh dup table schema failed"
,
K
(
ret
),
KPC
(
this
));
// do nothing
}
else
if
(
OB_FAIL
(
MTL
(
ObLSService
*
)
->
get_ls
(
dup_ls_status_info
.
ls_id_
,
ls_handle
,
ObLSGetMod
::
TRANS_MOD
)))
{
DUP_TABLE_LOG
(
WARN
,
"get dup ls failed"
,
K
(
ret
),
K
(
dup_ls_status_info
));
}
else
if
(
OB_FAIL
(
MTL
(
ObLSService
*
)
->
get_ls
(
min_dup_ls_status_info_
.
ls_id_
,
ls_handle
,
ObLSGetMod
::
TRANS_MOD
)))
{
DUP_TABLE_LOG
(
WARN
,
"get dup ls failed"
,
K
(
ret
),
KPC
(
this
));
}
else
{
ObLS
*
cur_ls_ptr
=
ls_handle
.
get_ls
();
...
...
@@ -158,16 +226,9 @@ int ObDupTabletScanTask::execute_for_dup_ls_()
ret
=
OB_INVALID_ARGUMENT
;
DUP_TABLE_LOG
(
WARN
,
"invalid ls ptr"
,
K
(
ret
),
KP
(
cur_ls_ptr
));
}
else
if
(
!
cur_ls_ptr
->
get_dup_table_ls_handler
()
->
is_master
())
{
// #ifndef NDEBUG
DUP_TABLE_LOG
(
INFO
,
"ls not leader"
,
K
(
cur_ls_ptr
->
get_ls_id
()));
// #endif
}
else
if
(
OB_FAIL
(
refresh_dup_tablet_schema_
(
need_refreh_dup_schema
,
tenant_dup_tablet_set
,
dup_ls_status_info
)))
{
DUP_TABLE_LOG
(
INFO
,
"refresh dup table schema failed"
,
K
(
ret
));
}
else
if
(
OB_FALSE_IT
(
need_refreh_dup_schema
=
false
))
{
// do nothing
#ifndef NDEBUG
DUP_TABLE_LOG
(
INFO
,
"ls not leader"
,
K
(
cur_ls_ptr
->
get_ls_id
()));
#endif
}
else
{
storage
::
ObHALSTabletIDIterator
ls_tablet_id_iter
(
cur_ls_ptr
->
get_ls_id
(),
true
);
if
(
OB_FAIL
(
cur_ls_ptr
->
build_tablet_iter
(
ls_tablet_id_iter
)))
{
...
...
@@ -180,7 +241,7 @@ int ObDupTabletScanTask::execute_for_dup_ls_()
int64_t
refresh_time
=
ObTimeUtility
::
fast_current_time
();
while
(
OB_SUCC
(
ls_tablet_id_iter
.
get_next_tablet_id
(
tmp_tablet_id
)))
{
is_dup_tablet
=
false
;
ret
=
tenant_
dup_tablet_set
.
exist_refactored
(
tmp_tablet_id
);
ret
=
tenant_
schema_dup_tablet_set_
.
exist_refactored
(
tmp_tablet_id
);
if
(
OB_HASH_EXIST
==
ret
)
{
is_dup_tablet
=
true
;
ret
=
OB_SUCCESS
;
...
...
@@ -193,7 +254,9 @@ int ObDupTabletScanTask::execute_for_dup_ls_()
K
(
ret
),
K
(
cur_ls_ptr
->
get_ls_id
()),
K
(
tmp_tablet_id
));
}
if
(
!
cur_ls_ptr
->
get_dup_table_ls_handler
()
->
is_inited
()
&&
!
is_dup_tablet
)
{
if
(
OB_FAIL
(
ret
))
{
// do nothing
}
else
if
(
!
cur_ls_ptr
->
get_dup_table_ls_handler
()
->
is_inited
()
&&
!
is_dup_tablet
)
{
// do nothing
}
else
if
(
OB_FAIL
(
cur_ls_ptr
->
get_dup_table_ls_handler
()
->
init
(
is_dup_tablet
))
&&
OB_INIT_TWICE
!=
ret
)
{
...
...
@@ -210,31 +273,31 @@ int ObDupTabletScanTask::execute_for_dup_ls_()
}
if
(
OB_ITER_END
==
ret
)
{
// ret = OB_SUCCESS;
if
(
OB_FAIL
(
cur_ls_ptr
->
get_dup_table_ls_handler
()
->
gc_temporary_dup_tablets
(
refresh_time
,
max_execute_interval_
)))
{
DUP_TABLE_LOG
(
WARN
,
"ls gc dup_tablet failed"
,
KR
(
ret
),
K
(
refresh_time
),
K
(
max_execute_interval_
));
}
ret
=
OB_SUCCESS
;
}
}
}
// refresh dup_table_ls on leader and follower
if
(
!
cur_ls_ptr
->
get_dup_table_ls_handler
()
->
check_tablet_set_exist
())
{
// refresh dup_table_ls on leader and follower
if
(
!
cur_ls_ptr
->
get_dup_table_ls_handler
()
->
is_inited
()
||
!
cur_ls_ptr
->
get_dup_table_ls_handler
()
->
check_tablet_set_exist
())
{
// do nothing
}
else
if
(
OB_FAIL
(
dup_loop_worker_
->
append_dup_table_ls
(
cur_ls_ptr
->
get_ls_id
())))
{
}
else
if
(
OB_
TMP_
FAIL
(
dup_loop_worker_
->
append_dup_table_ls
(
cur_ls_ptr
->
get_ls_id
())))
{
DUP_TABLE_LOG
(
WARN
,
"refresh dup_table ls failed"
,
K
(
ret
));
}
}
if
(
tenant_dup_tablet_set
.
created
())
{
tenant_dup_tablet_set
.
destroy
();
}
if
(
OB_FAIL
(
ret
))
{
DUP_TABLE_LOG
(
WARN
,
"scan dup ls to find dup_tablet failed"
,
KR
(
ret
));
}
else
{
ATOMIC_STORE
(
&
last_scan_task_succ_time_
,
cur_time
);
}
#ifndef NDEBUG
DUP_TABLE_LOG
(
INFO
,
"execute dup table scan task"
,
KPC
(
this
));
#else
#endif
return
ret
;
}
...
...
@@ -453,15 +516,6 @@ int ObDupTableLSHandler::safe_to_destroy(bool &is_dup_table_handler_safe)
void
ObDupTableLSHandler
::
destroy
()
{
reset
();
}
// int ObDupTableLSHandler::offline()
// {
// int ret = OB_SUCCESS;
// }
// return ret;
// }
//
// int ObDupTableLSHandler::online() {}
void
ObDupTableLSHandler
::
reset
()
{
// ATOMIC_STORE(&is_inited_, false);
...
...
@@ -502,38 +556,6 @@ void ObDupTableLSHandler::reset()
}
}
// bool ObDupTableLSHandler::is_master()
// {
// bool sub_master = true;
// if (OB_NOT_NULL(ts_sync_mgr_ptr_)) {
// sub_master = sub_master && ts_sync_mgr_ptr_->is_master();
// }
// if (OB_NOT_NULL(lease_mgr_ptr_)) {
// sub_master = sub_master && lease_mgr_ptr_->is_master();
// }
// if (OB_NOT_NULL(tablets_mgr_ptr_)) {
// sub_master = sub_master && tablets_mgr_ptr_->is_master();
// }
//
// return (ATOMIC_LOAD(&is_master_)) && sub_master;
// }
// bool ObDupTableLSHandler::is_follower()
// {
// bool sub_not_master = true;
// if (OB_NOT_NULL(ts_sync_mgr_ptr_)) {
// sub_not_master = sub_not_master && !ts_sync_mgr_ptr_->is_master();
// }
// if (OB_NOT_NULL(lease_mgr_ptr_)) {
// sub_not_master = sub_not_master && !lease_mgr_ptr_->is_master();
// }
// if (OB_NOT_NULL(tablets_mgr_ptr_)) {
// sub_not_master = sub_not_master && !tablets_mgr_ptr_->is_master();
// }
//
// return (!ATOMIC_LOAD(&is_master_)) && sub_not_master;
// }
bool
ObDupTableLSHandler
::
is_inited
()
{
SpinRLockGuard
r_init_guard
(
init_rw_lock_
);
...
...
@@ -542,21 +564,6 @@ bool ObDupTableLSHandler::is_inited()
bool
ObDupTableLSHandler
::
is_master
()
{
return
ls_state_helper_
.
is_leader_serving
();
}
// bool ObDupTableLSHandler::is_online()
// {
// bool sub_online = true;
// if (OB_NOT_NULL(ts_sync_mgr_ptr_)) {
// sub_online = sub_online && !ts_sync_mgr_ptr_->is_master();
// }
// if (OB_NOT_NULL(lease_mgr_ptr_)) {
// sub_online = sub_online && !lease_mgr_ptr_->is_master();
// }
// if (OB_NOT_NULL(tablets_mgr_ptr_)) {
// sub_online = sub_online && !tablets_mgr_ptr_->is_master();
// }
//
// }
int
ObDupTableLSHandler
::
ls_loop_handle
()
{
int
ret
=
OB_SUCCESS
;
...
...
@@ -581,19 +588,27 @@ int ObDupTableLSHandler::ls_loop_handle()
if
(
OB_ISNULL
(
log_operator_
)
||
!
log_operator_
->
is_busy
())
{
// handle lease request and collect follower info
DupTableTsInfo
min_lease_ts_info
;
if
(
OB_FAIL
(
get_min_lease_ts_info_
(
min_lease_ts_info
)))
{
DUP_TABLE_LOG
(
WARN
,
"get min lease ts info failed"
,
K
(
ret
),
K
(
min_lease_ts_info
));
if
(
OB_
TMP_
FAIL
(
get_min_lease_ts_info_
(
min_lease_ts_info
)))
{
DUP_TABLE_LOG
(
WARN
,
"get min lease ts info failed"
,
K
(
tmp_
ret
),
K
(
min_lease_ts_info
));
// try confirm tablets and check tablet need log
}
else
if
(
OB_FAIL
(
try_to_confirm_tablets_
(
min_lease_ts_info
.
max_replayed_scn_
)))
{
DUP_TABLE_LOG
(
WARN
,
"try confirm tablets failed"
,
K
(
ret
),
K
(
min_lease_ts_info
));
}
else
{
// submit lease log
}
else
if
(
OB_TMP_FAIL
(
try_to_confirm_tablets_
(
min_lease_ts_info
.
max_replayed_scn_
)))
{
DUP_TABLE_LOG
(
WARN
,
"try confirm tablets failed"
,
K
(
tmp_ret
),
K
(
min_lease_ts_info
));
}
if
(
OB_TMP_FAIL
(
tablets_mgr_ptr_
->
scan_readable_set_for_gc
(
ATOMIC_LOAD
(
&
interface_stat_
.
dup_table_ls_leader_takeover_ts_
))))
{
DUP_TABLE_LOG
(
WARN
,
"scan readable set failed"
,
K
(
tmp_ret
));
}
// submit lease log
if
(
OB_FAIL
(
ret
))
{
}
else
if
(
OB_ISNULL
(
log_operator_
))
{
ret
=
OB_INVALID_ARGUMENT
;
DUP_TABLE_LOG
(
WARN
,
"invalid log operator ptr"
,
K
(
ret
),
KP
(
log_operator_
));
}
else
if
(
OB_FAIL
(
log_operator_
->
submit_log_entry
()))
{
DUP_TABLE_LOG
(
WARN
,
"submit dup table log entry failed"
,
K
(
ret
));
}
}
else
if
(
OB_FAIL
(
log_operator_
->
submit_log_entry
()))
{
DUP_TABLE_LOG
(
WARN
,
"submit dup table log entry failed"
,
K
(
ret
));
}
}
...
...
@@ -1359,6 +1374,9 @@ int ObDupTableLSHandler::switch_to_leader()
if
(
OB_FAIL
(
ls_state_helper_
.
state_change_succ
(
ObDupTableLSRoleState
::
LS_TAKEOVER_SUCC
,
restore_state_container
)))
{
DUP_TABLE_LOG
(
ERROR
,
"change ls role state error"
,
K
(
ret
),
KPC
(
this
));
}
else
{
ATOMIC_STORE
(
&
interface_stat_
.
dup_table_ls_leader_takeover_ts_
,
ObTimeUtility
::
fast_current_time
());
}
}
else
{
tmp_ret
=
OB_SUCCESS
;
...
...
@@ -1580,6 +1598,8 @@ int ObDupTableLSHandler::get_min_lease_ts_info_(DupTableTsInfo &min_ts_info)
{
int
ret
=
OB_SUCCESS
;
int
ts_info_err_ret
=
OB_SUCCESS
;
LeaseAddrArray
lease_valid_array
;
min_ts_info
.
reset
();
...
...
@@ -1599,6 +1619,10 @@ int ObDupTableLSHandler::get_min_lease_ts_info_(DupTableTsInfo &min_ts_info)
for
(
int64_t
i
=
0
;
OB_SUCC
(
ret
)
&&
i
<
lease_valid_array
.
count
();
i
++
)
{
if
(
OB_FAIL
(
ts_sync_mgr_ptr_
->
get_cache_ts_info
(
lease_valid_array
[
i
],
tmp_ts_info
)))
{
DUP_TABLE_LOG
(
WARN
,
"get cache ts info failed"
,
K
(
ret
),
K
(
lease_valid_array
[
i
]));
if
(
OB_HASH_NOT_EXIST
==
ret
)
{
ts_info_err_ret
=
ret
;
ret
=
OB_SUCCESS
;
}
}
else
{
min_ts_info
.
max_replayed_scn_
=
share
::
SCN
::
min
(
min_ts_info
.
max_replayed_scn_
,
tmp_ts_info
.
max_replayed_scn_
);
...
...
@@ -1610,8 +1634,12 @@ int ObDupTableLSHandler::get_min_lease_ts_info_(DupTableTsInfo &min_ts_info)
}
}
if
(
OB_SUCC
(
ret
)
&&
ts_info_err_ret
!=
OB_SUCCESS
)
{
ret
=
ts_info_err_ret
;
}
if
(
OB_FAIL
(
ret
))
{
DUP_TABLE_LOG
(
INFO
,
"get min lease ts info failed"
,
K
(
ret
),
K
(
min_ts_info
),
DUP_TABLE_LOG
(
WARN
,
"get min lease ts info failed"
,
K
(
ret
),
K
(
ts_info_err_
ret
),
K
(
min_ts_info
),
K
(
lease_valid_array
));
}
return
ret
;
...
...
src/storage/tx/ob_dup_table_util.h
浏览文件 @
78578299
...
...
@@ -17,6 +17,7 @@
#include "storage/tx/ob_dup_table_lease.h"
#include "storage/tx/ob_trans_define.h"
#include "storage/tx/ob_dup_table_stat.h"
#include "share/ls/ob_ls_status_operator.h"
namespace
oceanbase
{
...
...
@@ -57,7 +58,8 @@ public:
class
ObDupTabletScanTask
:
public
ObITimeoutTask
{
public:
static
const
int64_t
DUP_TABLET_SCAN_INTERVAL
=
10
*
1000
*
1000
;
// 10s
static
const
int64_t
DUP_TABLET_SCAN_INTERVAL
=
10
*
1000
*
1000
;
// 10s
static
const
int64_t
MAX_DUP_LS_REFRESH_INTERVAL
=
60
*
60
*
1000
*
1000L
;
// 60min
public:
ObDupTabletScanTask
()
{
reset
();
}
~
ObDupTabletScanTask
()
{
destroy
();
}
...
...
@@ -70,11 +72,26 @@ public:
void
runTimerTask
();
uint64_t
hash
()
const
{
return
tenant_id_
;
}
int64_t
get_max_exec_interval
()
const
{
return
ATOMIC_LOAD
(
&
max_execute_interval_
);
}
int64_t
get_last_scan_task_succ_ts
()
const
{
return
ATOMIC_LOAD
(
&
last_scan_task_succ_time_
);
}
TO_STRING_KV
(
K
(
tenant_id_
),
KP
(
dup_table_scan_timer_
),
KP
(
dup_loop_worker_
),
K
(
min_dup_ls_status_info_
),
K
(
tenant_schema_dup_tablet_set_
.
size
()),
K
(
scan_task_execute_interval_
),
K
(
last_dup_ls_refresh_time_
),
K
(
last_dup_schema_refresh_time_
),
K
(
last_scan_task_succ_time_
),
K
(
max_execute_interval_
));
private:
int
execute_for_dup_ls_
();
int
refresh_dup_tablet_schema_
(
bool
need_refresh
,
ObTenantDupTabletSchemaHelper
::
TabletIDSet
&
tenant_dup_tablet_set
,
share
::
ObLSStatusInfo
&
dup_ls_status_info
);
int
refresh_dup_ls_
(
const
int64_t
cur_time
);
int
refresh_dup_tablet_schema_
(
const
int64_t
cur_time
);
bool
has_valid_dup_schema_
()
const
;
private:
ObTenantDupTabletSchemaHelper
dup_schema_helper_
;
...
...
@@ -82,7 +99,16 @@ private:
int64_t
tenant_id_
;
ObDupTableLeaseTimer
*
dup_table_scan_timer_
;
ObDupTableLoopWorker
*
dup_loop_worker_
;
int64_t
last_execute_time_
;
share
::
ObLSStatusInfo
min_dup_ls_status_info_
;
// min_ls_id
ObTenantDupTabletSchemaHelper
::
TabletIDSet
tenant_schema_dup_tablet_set_
;
int64_t
scan_task_execute_interval_
;
int64_t
last_dup_ls_refresh_time_
;
int64_t
last_dup_schema_refresh_time_
;
int64_t
last_scan_task_succ_time_
;
int64_t
max_execute_interval_
;
};
...
...
src/storage/tx/ob_trans_service.h
浏览文件 @
78578299
...
...
@@ -230,6 +230,7 @@ public:
ObIDupTableRpc
*
get_dup_table_rpc
()
{
return
dup_table_rpc_
;
}
ObDupTableRpc
&
get_dup_table_rpc_impl
()
{
return
dup_table_rpc_impl_
;
}
ObDupTableLoopWorker
&
get_dup_table_loop_worker
()
{
return
dup_table_loop_worker_
;
}
const
ObDupTabletScanTask
&
get_dup_table_scan_task
()
{
return
dup_tablet_scan_task_
;
}
ObILocationAdapter
*
get_location_adapter
()
{
return
location_adapter_
;
}
common
::
ObMySQLProxy
*
get_mysql_proxy
()
{
return
GCTX
.
sql_proxy_
;
}
bool
is_running
()
const
{
return
is_running_
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录