Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Metz
oceanbase
提交
ef6137fe
O
oceanbase
项目概览
Metz
/
oceanbase
与 Fork 源项目一致
Fork自
oceanbase / oceanbase
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
O
oceanbase
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
ef6137fe
编写于
8月 25, 2021
作者:
L
lx0
提交者:
wangzelin.wzl
8月 25, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix dag str related bug
上级
23556dbc
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
187 addition
and
74 deletion
+187
-74
src/observer/virtual_table/ob_all_virtual_dag_warning_history.cpp
...rver/virtual_table/ob_all_virtual_dag_warning_history.cpp
+3
-4
src/share/scheduler/ob_dag_scheduler.cpp
src/share/scheduler/ob_dag_scheduler.cpp
+80
-5
src/share/scheduler/ob_dag_scheduler.h
src/share/scheduler/ob_dag_scheduler.h
+70
-29
src/storage/ob_dag_warning_history_mgr.cpp
src/storage/ob_dag_warning_history_mgr.cpp
+18
-3
src/storage/ob_dag_warning_history_mgr.h
src/storage/ob_dag_warning_history_mgr.h
+10
-27
unittest/storage/test_dag_warning_history.cpp
unittest/storage/test_dag_warning_history.cpp
+6
-6
未找到文件。
src/observer/virtual_table/ob_all_virtual_dag_warning_history.cpp
浏览文件 @
ef6137fe
...
...
@@ -96,12 +96,12 @@ int ObAllVirtualDagWarningHistory::fill_cells(ObDagWarningInfo& dag_warning_info
break
;
case
MODULE
:
// module
cells
[
i
].
set_varchar
(
ObDagModuleStr
[
dag_warning_info
.
dag_type_
]
);
cells
[
i
].
set_varchar
(
share
::
ObIDag
::
get_dag_module_str
(
dag_warning_info
.
dag_type_
)
);
cells
[
i
].
set_collation_type
(
ObCharset
::
get_default_collation
(
ObCharset
::
get_default_charset
()));
break
;
case
TYPE
:
{
// dag_type
cells
[
i
].
set_varchar
(
share
::
ObIDag
::
ObIDagTypeStr
[
dag_warning_info
.
dag_type_
]
);
cells
[
i
].
set_varchar
(
share
::
ObIDag
::
get_dag_type_str
(
dag_warning_info
.
dag_type_
)
);
cells
[
i
].
set_collation_type
(
ObCharset
::
get_default_collation
(
ObCharset
::
get_default_charset
()));
break
;
}
...
...
@@ -113,8 +113,7 @@ int ObAllVirtualDagWarningHistory::fill_cells(ObDagWarningInfo& dag_warning_info
}
case
STATUS
:
// dag_status
// dag_ret
cells
[
i
].
set_varchar
(
ObDagStatusStr
[
dag_warning_info
.
dag_status_
]);
cells
[
i
].
set_varchar
(
ObDagWarningInfo
::
get_dag_status_str
(
dag_warning_info
.
dag_status_
));
cells
[
i
].
set_collation_type
(
ObCharset
::
get_default_collation
(
ObCharset
::
get_default_charset
()));
break
;
case
GMT_CREATE
:
...
...
src/share/scheduler/ob_dag_scheduler.cpp
浏览文件 @
ef6137fe
...
...
@@ -181,7 +181,8 @@ void ObITask::prepare_check_cycle()
/********************************************ObIDag impl******************************************/
const
common
::
ObString
ObIDag
::
ObIDagPriorityStr
[
ObIDag
::
DAG_PRIO_MAX
]
=
{
"DAG_PRIO_TRANS_TABLE_MERGE"
,
const
char
*
ObIDag
::
ObIDagPriorityStr
[
ObIDag
::
DAG_PRIO_MAX
]
=
{
"DAG_PRIO_TRANS_TABLE_MERGE"
,
"DAG_PRIO_SSTABLE_MINI_MERGE"
,
"DAG_PRIO_SSTABLE_MINOR_MERGE"
,
"DAG_PRIO_GROUP_MIGRATE"
,
...
...
@@ -192,9 +193,10 @@ const common::ObString ObIDag::ObIDagPriorityStr[ObIDag::DAG_PRIO_MAX] = {"DAG_P
"DAG_PRIO_MIGRATE_LOW"
,
"DAG_PRIO_CREATE_INDEX"
,
"DAG_PRIO_SSTABLE_SPLIT"
,
"DAG_PRIO_VALIDATE"
};
"DAG_PRIO_VALIDATE"
,
};
const
c
ommon
::
ObString
ObIDag
::
ObIDagUpLimitTypeStr
[
ObIDag
::
DAG_ULT_MAX
]
=
{
const
c
har
*
ObIDag
::
ObIDagUpLimitTypeStr
[
ObIDag
::
DAG_ULT_MAX
]
=
{
"DAG_ULT_MINI_MERGE"
,
"DAG_ULT_MINOR_MERGE"
,
"DAG_ULT_GROUP_MIGRATE"
,
...
...
@@ -224,6 +226,27 @@ const char* ObIDag::ObIDagTypeStr[ObIDag::DAG_TYPE_MAX] = {"DAG_UT",
"DAG_FAST_RECOVERY"
"DAG_TYPE_VALIDATE"
};
const
char
*
ObIDag
::
ObIDagModuleStr
[
share
::
ObIDag
::
DAG_TYPE_MAX
]
=
{
"EMPTY"
,
"COMPACTION"
,
"COMPACTION"
,
"INDEX"
,
"SPLIT"
,
"OTHER"
,
"MIGRATE"
,
"COMPACTION"
,
"MIGRATE"
,
"INDEX"
,
"COMPACTION"
,
"TRANS_TABLE_MERGE"
,
"FAST_RECOVERY"
,
"FAST_RECOVERY"
,
"BACKUP"
,
"OTHER"
,
"OTHER"
,
"OTHER"
,
};
ObIDag
::
ObIDag
(
ObIDagType
type
,
ObIDagPriority
priority
)
:
dag_ret_
(
OB_SUCCESS
),
is_inited_
(
false
),
...
...
@@ -473,7 +496,15 @@ int64_t ObIDag::to_string(char* buf, const int64_t buf_len) const
}
else
{
const
int64_t
tenant_id
=
get_tenant_id
();
J_OBJ_START
();
J_KV
(
KP
(
this
),
K_
(
type
),
K_
(
id
),
K_
(
dag_ret
),
K_
(
dag_status
),
K_
(
start_time
),
K
(
tenant_id
));
J_KV
(
KP
(
this
),
K_
(
type
),
"name"
,
get_dag_type_str
(
type_
),
K_
(
id
),
K_
(
dag_ret
),
K_
(
dag_status
),
K_
(
start_time
),
K
(
tenant_id
));
J_OBJ_END
();
}
return
pos
;
...
...
@@ -494,6 +525,50 @@ void ObIDag::gene_warning_info(storage::ObDagWarningInfo& info)
fill_comment
(
info
.
warning_info_
,
OB_DAG_WARNING_INFO_LENGTH
);
}
const
char
*
ObIDag
::
get_dag_type_str
(
enum
ObIDagType
type
)
{
const
char
*
str
=
""
;
if
(
type
>=
DAG_TYPE_MAX
||
type
<
DAG_TYPE_UT
)
{
str
=
"invalid_type"
;
}
else
{
str
=
ObIDagTypeStr
[
type
];
}
return
str
;
}
const
char
*
ObIDag
::
get_dag_module_str
(
enum
ObIDagType
type
)
{
const
char
*
str
=
""
;
if
(
type
>=
DAG_TYPE_MAX
||
type
<
DAG_TYPE_UT
)
{
str
=
"invalid_type"
;
}
else
{
str
=
ObIDagModuleStr
[
type
];
}
return
str
;
}
const
char
*
ObIDag
::
get_dag_prio_str
(
enum
ObIDagPriority
prio
)
{
const
char
*
str
=
""
;
if
(
prio
>=
DAG_PRIO_MAX
||
prio
<
DAG_PRIO_TRANS_TABLE_MERGE
)
{
str
=
"invalid_type"
;
}
else
{
str
=
ObIDagPriorityStr
[
prio
];
}
return
str
;
}
const
char
*
ObIDag
::
get_dag_uplimit_type_str
(
enum
ObIDagUpLimitType
uplimit_type
)
{
const
char
*
str
=
""
;
if
(
uplimit_type
>=
DAG_ULT_MAX
||
uplimit_type
<
DAG_ULT_MINI_MERGE
)
{
str
=
"invalid_type"
;
}
else
{
str
=
ObIDagUpLimitTypeStr
[
uplimit_type
];
}
return
str
;
}
/*************************************ObDagWorker***********************************/
__thread
ObDagWorker
*
ObDagWorker
::
self_
=
NULL
;
...
...
@@ -578,7 +653,7 @@ void ObDagWorker::run1()
COMMON_LOG
(
WARN
,
"dag is null"
,
K
(
ret
),
K
(
task_
));
}
else
{
ObCurTraceId
::
set
(
dag
->
get_dag_id
());
lib
::
set_thread_name
(
dag
->
get_
name
(
));
lib
::
set_thread_name
(
dag
->
get_
dag_type_str
(
dag
->
get_type
()
));
if
(
OB_UNLIKELY
(
ObWorker
::
CompatMode
::
INVALID
==
(
compat_mode
=
static_cast
<
ObWorker
::
CompatMode
>
(
dag
->
get_compat_mode
()))))
{
ret
=
OB_ERR_UNEXPECTED
;
...
...
src/share/scheduler/ob_dag_scheduler.h
浏览文件 @
ef6137fe
...
...
@@ -211,22 +211,23 @@ public:
DAG_PRIO_CREATE_INDEX
,
DAG_PRIO_SSTABLE_SPLIT
,
DAG_PRIO_VALIDATE
,
/* add new item in ObIDagPriorityStr */
DAG_PRIO_MAX
,
};
const
static
c
ommon
::
ObString
ObIDagPriorityStr
[
DAG_PRIO_MAX
];
/* = {
"DAG_PRIO_TRANS_TABLE_MERGE",
"DAG_PRIO_SSTABLE_MINI_MERGE",
"DAG_PRIO_SSTABLE_MINOR_MERGE",
"DAG_PRIO_GROUP_MIGRATE",
"DAG_PRIO_MIGRATE_HIGH",
"DAG_PRIO_MIGRATE_MID",
"DAG_PRIO_SSTABLE_MAJOR_MERGE",
"DAG_PRIO_BACKUP",
"DAG_PRIO_MIGRATE_LOW",
"DAG_PRIO_CREATE_INDEX",
"DAG_PRIO_SSTABLE_SPLIT",
"DAG_PRIO_VALIDATE"
};*/
const
static
c
har
*
ObIDagPriorityStr
[
DAG_PRIO_MAX
];
/* = {
"DAG_PRIO_TRANS_TABLE_MERGE",
"DAG_PRIO_SSTABLE_MINI_MERGE",
"DAG_PRIO_SSTABLE_MINOR_MERGE",
"DAG_PRIO_GROUP_MIGRATE",
"DAG_PRIO_MIGRATE_HIGH",
"DAG_PRIO_MIGRATE_MID",
"DAG_PRIO_SSTABLE_MAJOR_MERGE",
"DAG_PRIO_BACKUP",
"DAG_PRIO_MIGRATE_LOW",
"DAG_PRIO_CREATE_INDEX",
"DAG_PRIO_SSTABLE_SPLIT",
"DAG_PRIO_VALIDATE",
};*/
// We limit the max concurrency of tasks by UpLimitType (ult for short)
// why not simply use Priority? since several priorities may share one UpLimitType
...
...
@@ -243,18 +244,19 @@ public:
DAG_ULT_CREATE_INDEX
=
5
,
DAG_ULT_SPLIT
=
6
,
DAG_ULT_BACKUP
=
7
,
/* add new item in ObIDagUpLimitTypeStr */
DAG_ULT_MAX
,
};
const
static
c
ommon
::
ObString
ObIDagUpLimitTypeStr
[
DAG_ULT_MAX
];
/* = {
"DAG_ULT_MINI_MERGE",
"DAG_ULT_MINOR_MERGE",
"DAG_ULT_GROUP_MIGRATE",
"DAG_ULT_MIGRATE",
"DAG_ULT_MAJOR_MERGE",
"DAG_ULT_CREATE_INDEX",
"DAG_ULT_SPLIT",
"DAG_ULT_BACKUP"
};*/
const
static
c
har
*
ObIDagUpLimitTypeStr
[
DAG_ULT_MAX
];
/* = {
"DAG_ULT_MINI_MERGE",
"DAG_ULT_MINOR_MERGE",
"DAG_ULT_GROUP_MIGRATE",
"DAG_ULT_MIGRATE",
"DAG_ULT_MAJOR_MERGE",
"DAG_ULT_CREATE_INDEX",
"DAG_ULT_SPLIT",
"DAG_ULT_BACKUP",};*/
enum
ObIDagType
{
DAG_TYPE_UT
=
0
,
DAG_TYPE_SSTABLE_MINOR_MERGE
=
1
,
...
...
@@ -274,10 +276,49 @@ public:
DAG_TYPE_SERVER_PREPROCESS
=
15
,
DAG_TYPE_FAST_RECOVERY
=
16
,
DAG_TYPE_VALIDATE
=
17
,
/* add new item in ObIDagTypeStr and ObIDagModuleStr*/
DAG_TYPE_MAX
,
};
const
static
char
*
ObIDagTypeStr
[
DAG_TYPE_MAX
];
const
static
char
*
ObIDagTypeStr
[
DAG_TYPE_MAX
];
/* = {
"DAG_UT",
"DAG_MINOR_MERGE",
"DAG_MAJOR_MERGE",
"DAG_CREATE_INDEX",
"DAG_SSTABLE_SPLIT",
"DAG_UNIQUE_CHECKING",
"DAG_MIGRATE",
"DAG_MAJOR_FINISH",
"DAG_GROUP_MIGRATE",
"DAG_BUILD_INDEX",
"DAG_MINI_MERGE",
"DAG_TRANS_MERGE",
"DAG_RECOVERY_SPLIT",
"DAG_RECOVERY_RECOVER",
"DAG_TYPE_BACKUP",
"DAG_SERVER_PREPROCESS",
"DAG_FAST_RECOVERY",
"DAG_TYPE_VALIDATE",} */
const
static
char
*
ObIDagModuleStr
[
share
::
ObIDag
::
DAG_TYPE_MAX
];
/* = {
"EMPTY",
"COMPACTION",
"COMPACTION",
"INDEX",
"SPLIT",
"OTHER",
"MIGRATE",
"COMPACTION",
"MIGRATE",
"INDEX",
"COMPACTION",
"TRANS_TABLE_MERGE",
"FAST_RECOVERY",
"FAST_RECOVERY",
"BACKUP",
"OTHER",
"OTHER",
"OTHER",};*/
enum
ObDagStatus
{
DAG_STATUS_INITING
=
0
,
...
...
@@ -316,10 +357,10 @@ public:
{
return
type_
;
}
const
char
*
get_name
()
const
{
return
ObIDagTypeStr
[
type_
]
;
}
static
const
char
*
get_dag_type_str
(
enum
ObIDagType
type
);
static
const
char
*
get_dag_prio_str
(
enum
ObIDagPriority
prio
);
static
const
char
*
get_dag_module_str
(
enum
ObIDagType
type
)
;
static
const
char
*
get_dag_uplimit_type_str
(
enum
ObIDagUpLimitType
uplimit_type
);
bool
has_set_stop
()
{
return
is_stop_
;
...
...
src/storage/ob_dag_warning_history_mgr.cpp
浏览文件 @
ef6137fe
...
...
@@ -22,6 +22,19 @@ namespace storage {
* ObDagWarningInfo Func
* */
const
char
*
ObDagWarningInfo
::
ObDagStatusStr
[
ODS_MAX
]
=
{
"WARNING"
,
"RETRYED"
};
const
char
*
ObDagWarningInfo
::
get_dag_status_str
(
enum
ObDagStatus
status
)
{
const
char
*
str
=
""
;
if
(
status
>=
ODS_MAX
||
status
<
ODS_WARNING
)
{
str
=
"invalid_type"
;
}
else
{
str
=
ObDagStatusStr
[
status
];
}
return
str
;
}
ObDagWarningInfo
::
ObDagWarningInfo
()
:
tenant_id_
(
0
),
task_id_
(),
...
...
@@ -31,7 +44,9 @@ ObDagWarningInfo::ObDagWarningInfo()
gmt_create_
(
0
),
gmt_modified_
(
0
),
warning_info_
()
{}
{
MEMSET
(
warning_info_
,
'\0'
,
common
::
OB_DAG_WARNING_INFO_LENGTH
);
}
ObDagWarningInfo
::~
ObDagWarningInfo
()
{
...
...
@@ -86,14 +101,14 @@ int ObDagWarningHistoryManager::add_dag_warning_info(share::ObIDag* dag)
}
else
{
dag
->
gene_warning_info
(
*
info
);
dag
->
gene_basic_warning_info
(
*
info
);
// only once
info
->
dag_status_
=
ODS_WARNING
;
info
->
dag_status_
=
O
bDagWarningInfo
::
O
DS_WARNING
;
}
}
else
{
STORAGE_LOG
(
WARN
,
"failed to get dag warning info"
,
K
(
ret
),
K
(
key
),
K
(
info
));
}
}
else
{
// update
dag
->
gene_warning_info
(
*
info
);
info
->
dag_status_
=
ODS_RETRYED
;
info
->
dag_status_
=
O
bDagWarningInfo
::
O
DS_RETRYED
;
}
}
return
ret
;
...
...
src/storage/ob_dag_warning_history_mgr.h
浏览文件 @
ef6137fe
...
...
@@ -77,35 +77,18 @@ private:
InfoMap
map_
;
};
enum
ObDagStatus
{
ODS_WARNING
=
0
,
ODS_RETRYED
,
ODS_MAX
,
};
static
common
::
ObString
ObDagStatusStr
[
ODS_MAX
]
=
{
"WARNING"
,
"RETRYED"
};
static
common
::
ObString
ObDagModuleStr
[
share
::
ObIDag
::
DAG_TYPE_MAX
]
=
{
"EMPTY"
,
"COMPACTION"
,
"COMPACTION"
,
"INDEX"
,
"SPLIT"
,
"OTHER"
,
"MIGRATE"
,
"COMPACTION"
,
"MIGRATE"
,
"INDEX"
,
"COMPACTION"
,
"TRANS_TABLE_MERGE"
,
"FAST_RECOVERY"
,
"FAST_RECOVERY"
,
"BACKUP"
,
"OTHER"
,
"OTHER"
,
"OTHER"
};
struct
ObDagWarningInfo
{
public:
enum
ObDagStatus
{
ODS_WARNING
=
0
,
ODS_RETRYED
,
ODS_MAX
,
};
static
const
char
*
ObDagStatusStr
[
ODS_MAX
];
static
const
char
*
get_dag_status_str
(
enum
ObDagStatus
status
);
ObDagWarningInfo
();
~
ObDagWarningInfo
();
void
reset
();
...
...
unittest/storage/test_dag_warning_history.cpp
浏览文件 @
ef6137fe
...
...
@@ -47,7 +47,7 @@ TEST_F(TestDagWarningHistory, simple_add)
ASSERT_EQ
(
OB_SUCCESS
,
ret
);
info
->
dag_ret_
=
-
4016
;
info
->
dag_status_
=
ODS_WARNING
;
info
->
dag_status_
=
O
bDagWarningInfo
::
O
DS_WARNING
;
info
->
dag_type_
=
share
::
ObIDag
::
DAG_TYPE_SSTABLE_MINOR_MERGE
;
strcpy
(
info
->
warning_info_
,
"table_id=1101710651081571, partition_id=66, mini merge error"
);
...
...
@@ -76,7 +76,7 @@ TEST_F(TestDagWarningHistory, simple_del)
const
int64_t
key
=
8888
;
ASSERT_EQ
(
OB_SUCCESS
,
manager
.
alloc_and_add
(
key
,
info
));
info
->
dag_ret_
=
-
4016
;
info
->
dag_status_
=
ODS_WARNING
;
info
->
dag_status_
=
O
bDagWarningInfo
::
O
DS_WARNING
;
info
->
dag_type_
=
share
::
ObIDag
::
DAG_TYPE_SSTABLE_MINOR_MERGE
;
strcpy
(
info
->
warning_info_
,
"table_id=1101710651081571, partition_id=66, mini merge error"
);
...
...
@@ -97,7 +97,7 @@ TEST_F(TestDagWarningHistory, simple_loop_get)
ObDagWarningInfo
basic_info
;
basic_info
.
dag_ret_
=
-
4016
;
basic_info
.
dag_status_
=
ODS_WARNING
;
basic_info
.
dag_status_
=
O
bDagWarningInfo
::
O
DS_WARNING
;
basic_info
.
dag_type_
=
share
::
ObIDag
::
DAG_TYPE_SSTABLE_MINOR_MERGE
;
strcpy
(
basic_info
.
warning_info_
,
"table_id=1101710651081571, partition_id=66, mini merge error"
);
...
...
@@ -108,7 +108,7 @@ TEST_F(TestDagWarningHistory, simple_loop_get)
key
=
8888
+
i
;
ASSERT_EQ
(
OB_SUCCESS
,
manager
.
alloc_and_add
(
key
,
info
));
info
->
dag_ret_
=
-
4016
+
i
;
info
->
dag_status_
=
ODS_WARNING
;
info
->
dag_status_
=
O
bDagWarningInfo
::
O
DS_WARNING
;
info
->
dag_type_
=
share
::
ObIDag
::
DAG_TYPE_SSTABLE_MINOR_MERGE
;
strcpy
(
info
->
warning_info_
,
"table_id=1101710651081571, partition_id=66, mini merge error"
);
}
...
...
@@ -157,7 +157,7 @@ TEST_F(TestDagWarningHistory, test_rebuild)
const
int64_t
max_cnt
=
20
;
ObDagWarningInfo
basic_info
;
basic_info
.
dag_ret_
=
-
4016
;
basic_info
.
dag_status_
=
ODS_WARNING
;
basic_info
.
dag_status_
=
O
bDagWarningInfo
::
O
DS_WARNING
;
basic_info
.
dag_type_
=
share
::
ObIDag
::
DAG_TYPE_SSTABLE_MINOR_MERGE
;
strcpy
(
basic_info
.
warning_info_
,
"table_id=1101710651081571, partition_id=66, mini merge error"
);
...
...
@@ -167,7 +167,7 @@ TEST_F(TestDagWarningHistory, test_rebuild)
key
=
8888
+
i
;
ASSERT_EQ
(
OB_SUCCESS
,
manager
.
alloc_and_add
(
key
,
info
));
info
->
dag_ret_
=
-
4016
+
i
;
info
->
dag_status_
=
ODS_WARNING
;
info
->
dag_status_
=
O
bDagWarningInfo
::
O
DS_WARNING
;
info
->
dag_type_
=
share
::
ObIDag
::
DAG_TYPE_SSTABLE_MINOR_MERGE
;
strcpy
(
info
->
warning_info_
,
"table_id=1101710651081571, partition_id=66, mini merge error"
);
STORAGE_LOG
(
DEBUG
,
"print info"
,
K
(
ret
),
K
(
i
),
K
(
key
),
KPC
(
info
));
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录