Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8f55933b
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
8f55933b
编写于
12月 30, 2021
作者:
S
Shengliang Guan
提交者:
GitHub
12月 30, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #9513 from taosdata/feature/dnode3
Improve SDB traversal efficiency
上级
164a80e9
60b56c66
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
115 addition
and
72 deletion
+115
-72
include/dnode/mnode/sdb/sdb.h
include/dnode/mnode/sdb/sdb.h
+14
-2
include/util/tlog.h
include/util/tlog.h
+0
-1
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+1
-1
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+2
-2
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+3
-2
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+1
-0
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+5
-5
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+65
-59
source/dnode/mnode/sdb/src/sdbHash.c
source/dnode/mnode/sdb/src/sdbHash.c
+24
-0
未找到文件。
include/dnode/mnode/sdb/sdb.h
浏览文件 @
8f55933b
...
...
@@ -181,6 +181,7 @@ typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj);
typedef
int32_t
(
*
SdbDeployFp
)(
SMnode
*
pMnode
);
typedef
SSdbRow
*
(
*
SdbDecodeFp
)(
SSdbRaw
*
pRaw
);
typedef
SSdbRaw
*
(
*
SdbEncodeFp
)(
void
*
pObj
);
typedef
bool
(
*
sdbTraverseFp
)(
SMnode
*
pMnode
,
void
*
pObj
,
void
*
p1
,
void
*
p2
,
void
*
p3
);
typedef
struct
{
ESdbType
sdbType
;
...
...
@@ -279,7 +280,7 @@ void sdbRelease(SSdb *pSdb, void *pObj);
*
* @param pSdb The sdb object.
* @param type The type of the table.
* @param
type
The initial iterator of the table.
* @param
pIter
The initial iterator of the table.
* @param pObj The object of the row just fetched.
* @return void* The next iterator of the table.
*/
...
...
@@ -289,11 +290,22 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj);
* @brief Cancel a traversal
*
* @param pSdb The sdb object.
* @param pIter The iterator of the table.
* @param type The initial iterator of table.
*/
void
sdbCancelFetch
(
SSdb
*
pSdb
,
void
*
pIter
);
/**
* @brief Traverse a sdb
*
* @param pSdb The sdb object.
* @param type The initial iterator of table.
* @param fp The function pointer.
* @param p1 The callback param.
* @param p2 The callback param.
* @param p3 The callback param.
*/
void
sdbTraverse
(
SSdb
*
pSdb
,
ESdbType
type
,
sdbTraverseFp
fp
,
void
*
p1
,
void
*
p2
,
void
*
p3
);
/**
* @brief Get the number of rows in the table
*
...
...
include/util/tlog.h
浏览文件 @
8f55933b
...
...
@@ -37,7 +37,6 @@ extern int32_t mqttDebugFlag;
extern
int32_t
monDebugFlag
;
extern
int32_t
uDebugFlag
;
extern
int32_t
rpcDebugFlag
;
extern
int32_t
odbcDebugFlag
;
extern
int32_t
qDebugFlag
;
extern
int32_t
wDebugFlag
;
extern
int32_t
sDebugFlag
;
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
8f55933b
...
...
@@ -117,7 +117,7 @@ typedef struct {
int64_t
rebootTime
;
int64_t
lastAccessTime
;
int32_t
accessTimes
;
int
16
_t
numOfVnodes
;
int
32
_t
numOfVnodes
;
int32_t
numOfSupportVnodes
;
int32_t
numOfCores
;
EDndReason
offlineReason
;
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
8f55933b
...
...
@@ -549,7 +549,7 @@ static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pO
return
0
;
}
static
int32_t
mndSetUpdateDbCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pOldDb
,
SDbObj
*
pNewDb
)
{
static
int32_t
mndSetUpdateDbCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pOldDb
,
SDbObj
*
pNewDb
)
{
SSdbRaw
*
pCommitRaw
=
mndDbActionEncode
(
pNewDb
);
if
(
pCommitRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
return
-
1
;
...
...
@@ -725,7 +725,7 @@ static int32_t mndSetDropDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD
static
int32_t
mndBuildDropVgroupAction
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
)
{
for
(
int32_t
vn
=
0
;
vn
<
pVgroup
->
replica
;
++
vn
)
{
STransAction
action
=
{
0
};
SVnodeGid
*
pVgid
=
pVgroup
->
vnodeGid
+
vn
;
SVnodeGid
*
pVgid
=
pVgroup
->
vnodeGid
+
vn
;
SDnodeObj
*
pDnode
=
mndAcquireDnode
(
pMnode
,
pVgid
->
dnodeId
);
if
(
pDnode
==
NULL
)
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
8f55933b
...
...
@@ -196,6 +196,8 @@ static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
sdbCancelFetch
(
pSdb
,
pIter
);
return
pDnode
;
}
sdbRelease
(
pSdb
,
pDnode
);
}
return
NULL
;
...
...
@@ -419,10 +421,9 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *
static
int32_t
mndProcessCreateDnodeMsg
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SCreateDnodeMsg
*
pCreate
=
pMsg
->
rpcMsg
.
pCont
;
pCreate
->
port
=
htonl
(
pCreate
->
port
);
mDebug
(
"dnode:%s:%d, start to create"
,
pCreate
->
fqdn
,
pCreate
->
port
);
pCreate
->
port
=
htonl
(
pCreate
->
port
);
if
(
pCreate
->
fqdn
[
0
]
==
0
||
pCreate
->
port
<=
0
||
pCreate
->
port
>
UINT16_MAX
)
{
terrno
=
TSDB_CODE_MND_INVALID_DNODE_EP
;
mError
(
"dnode:%s:%d, failed to create since %s"
,
pCreate
->
fqdn
,
pCreate
->
port
,
terrstr
());
...
...
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
8f55933b
...
...
@@ -219,6 +219,7 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
}
pEpSet
->
numOfEps
++
;
sdbRelease
(
pSdb
,
pObj
);
}
}
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
8f55933b
...
...
@@ -79,17 +79,17 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
for
(
int32_t
i
=
0
;
i
<
redoLogNum
;
++
i
)
{
SSdbRaw
*
pTmp
=
taosArrayGetP
(
pTrans
->
redoLogs
,
i
);
rawDataLen
+=
sdbGetRawTotalSize
(
pTmp
);
rawDataLen
+=
(
sdbGetRawTotalSize
(
pTmp
)
+
4
);
}
for
(
int32_t
i
=
0
;
i
<
undoLogNum
;
++
i
)
{
SSdbRaw
*
pTmp
=
taosArrayGetP
(
pTrans
->
undoLogs
,
i
);
rawDataLen
+=
sdbGetRawTotalSize
(
pTmp
);
rawDataLen
+=
(
sdbGetRawTotalSize
(
pTmp
)
+
4
);
}
for
(
int32_t
i
=
0
;
i
<
commitLogNum
;
++
i
)
{
SSdbRaw
*
pTmp
=
taosArrayGetP
(
pTrans
->
commitLogs
,
i
);
rawDataLen
+=
sdbGetRawTotalSize
(
pTmp
);
rawDataLen
+=
(
sdbGetRawTotalSize
(
pTmp
)
+
4
);
}
for
(
int32_t
i
=
0
;
i
<
redoActionNum
;
++
i
)
{
...
...
@@ -437,7 +437,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) {
static
int32_t
mndTransSync
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
SSdbRaw
*
pRaw
=
mndTransActionEncode
(
pTrans
);
if
(
pRaw
==
NULL
)
{
mError
(
"trans:%d, failed to
decode
trans since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to
encode while sync
trans since %s"
,
pTrans
->
id
,
terrstr
());
return
-
1
;
}
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
...
...
@@ -835,7 +835,7 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) {
SSdbRaw
*
pRaw
=
mndTransActionEncode
(
pTrans
);
if
(
pRaw
==
NULL
)
{
mError
(
"trans:%d, failed to
decode
trans since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to
encode while finish
trans since %s"
,
pTrans
->
id
,
terrstr
());
}
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_DROPPED
);
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
8f55933b
...
...
@@ -238,39 +238,48 @@ SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *p
return
pDrop
;
}
static
bool
mndResetDnodesArrayFp
(
SMnode
*
pMnode
,
void
*
pObj
,
void
*
p1
,
void
*
p2
,
void
*
p3
)
{
SDnodeObj
*
pDnode
=
pObj
;
pDnode
->
numOfVnodes
=
0
;
return
true
;
}
static
bool
mndBuildDnodesArrayFp
(
SMnode
*
pMnode
,
void
*
pObj
,
void
*
p1
,
void
*
p2
,
void
*
p3
)
{
SDnodeObj
*
pDnode
=
pObj
;
SArray
*
pArray
=
p1
;
pDnode
->
numOfVnodes
=
mndGetVnodesNum
(
pMnode
,
pDnode
->
id
);
int64_t
curMs
=
taosGetTimestampMs
();
bool
online
=
mndIsDnodeOnline
(
pMnode
,
pDnode
,
curMs
);
if
(
online
&&
pDnode
->
numOfSupportVnodes
>
0
)
{
taosArrayPush
(
pArray
,
pDnode
);
}
bool
isMnode
=
mndIsMnode
(
pMnode
,
pDnode
->
id
);
mDebug
(
"dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d"
,
pDnode
->
id
,
pDnode
->
numOfVnodes
,
pDnode
->
numOfSupportVnodes
,
isMnode
,
online
);
if
(
isMnode
)
{
pDnode
->
numOfVnodes
++
;
}
return
true
;
}
static
SArray
*
mndBuildDnodesArray
(
SMnode
*
pMnode
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
numOfDnodes
=
mndGetDnodeSize
(
pMnode
);
SArray
*
pArray
=
taosArrayInit
(
numOfDnodes
,
sizeof
(
SDnodeObj
));
if
(
pArray
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
void
*
pIter
=
NULL
;
while
(
1
)
{
SDnodeObj
*
pDnode
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_DNODE
,
pIter
,
(
void
**
)
&
pDnode
);
if
(
pIter
==
NULL
)
break
;
int32_t
numOfVnodes
=
mndGetVnodesNum
(
pMnode
,
pDnode
->
id
);
bool
isMnode
=
mndIsMnode
(
pMnode
,
pDnode
->
id
);
if
(
isMnode
)
{
pDnode
->
numOfVnodes
++
;
}
int64_t
curMs
=
taosGetTimestampMs
();
bool
online
=
mndIsDnodeOnline
(
pMnode
,
pDnode
,
curMs
);
if
(
online
)
{
taosArrayPush
(
pArray
,
pDnode
);
}
mDebug
(
"dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d"
,
pDnode
->
id
,
numOfVnodes
,
pDnode
->
numOfSupportVnodes
,
isMnode
,
online
);
sdbRelease
(
pSdb
,
pDnode
);
}
sdbTraverse
(
pSdb
,
SDB_DNODE
,
mndResetDnodesArrayFp
,
NULL
,
NULL
,
NULL
);
sdbTraverse
(
pSdb
,
SDB_DNODE
,
mndBuildDnodesArrayFp
,
pArray
,
NULL
,
NULL
);
return
pArray
;
}
...
...
@@ -302,7 +311,7 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pAr
pVgid
->
role
=
TAOS_SYNC_STATE_FOLLOWER
;
}
mDebug
(
"db:%s, vgId:%d, v
index:%d dnodeId
:%d is alloced"
,
pVgroup
->
dbName
,
pVgroup
->
vgId
,
v
,
pVgid
->
dnodeId
);
mDebug
(
"db:%s, vgId:%d, v
n:%d dnode
:%d is alloced"
,
pVgroup
->
dbName
,
pVgroup
->
vgId
,
v
,
pVgid
->
dnodeId
);
pDnode
->
numOfVnodes
++
;
}
...
...
@@ -412,6 +421,20 @@ static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static
int32_t
mndProcessCompactVnodeRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
bool
mndGetVgroupMaxReplicaFp
(
SMnode
*
pMnode
,
void
*
pObj
,
void
*
p1
,
void
*
p2
,
void
*
p3
)
{
SVgObj
*
pVgroup
=
pObj
;
int64_t
uid
=
*
(
int64_t
*
)
p1
;
int8_t
*
pReplica
=
p2
;
int32_t
*
pNumOfVgroups
=
p3
;
if
(
pVgroup
->
dbUid
==
uid
)
{
*
pReplica
=
MAX
(
*
pReplica
,
pVgroup
->
replica
);
(
*
pNumOfVgroups
)
++
;
}
return
true
;
}
static
int32_t
mndGetVgroupMaxReplica
(
SMnode
*
pMnode
,
char
*
dbName
,
int8_t
*
pReplica
,
int32_t
*
pNumOfVgroups
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
dbName
);
...
...
@@ -420,25 +443,10 @@ static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pRep
return
-
1
;
}
int8_t
replica
=
1
;
int32_t
numOfVgroups
=
0
;
void
*
pIter
=
NULL
;
while
(
1
)
{
SVgObj
*
pVgroup
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
break
;
if
(
pVgroup
->
dbUid
==
pDb
->
uid
)
{
replica
=
MAX
(
replica
,
pVgroup
->
replica
);
numOfVgroups
++
;
}
sdbRelease
(
pSdb
,
pVgroup
);
}
*
pReplica
=
replica
;
*
pNumOfVgroups
=
numOfVgroups
;
*
pReplica
=
1
;
*
pNumOfVgroups
=
0
;
sdbTraverse
(
pSdb
,
SDB_VGROUP
,
mndGetVgroupMaxReplicaFp
,
&
pDb
->
uid
,
pReplica
,
pNumOfVgroups
);
mndReleaseDb
(
pMnode
,
pDb
);
return
0
;
}
...
...
@@ -540,25 +548,23 @@ static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) {
sdbCancelFetch
(
pSdb
,
pIter
);
}
int32_t
mndGetVnodesNum
(
SMnode
*
pMnode
,
int32_t
dnodeId
)
{
S
Sdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
numOfVnodes
=
0
;
void
*
pIter
=
NULL
;
static
bool
mndGetVnodesNumFp
(
SMnode
*
pMnode
,
void
*
pObj
,
void
*
p1
,
void
*
p2
,
void
*
p3
)
{
S
VgObj
*
pVgroup
=
pObj
;
int32_t
dnodeId
=
*
(
int32_t
*
)
p1
;
int32_t
*
pNumOfVnodes
=
(
int32_t
*
)
p2
;
while
(
1
)
{
SVgObj
*
pVgroup
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
break
;
for
(
int32_t
v
=
0
;
v
<
pVgroup
->
replica
;
++
v
)
{
if
(
pVgroup
->
vnodeGid
[
v
].
dnodeId
==
dnodeId
)
{
numOfVnodes
++
;
}
for
(
int32_t
v
=
0
;
v
<
pVgroup
->
replica
;
++
v
)
{
if
(
pVgroup
->
vnodeGid
[
v
].
dnodeId
==
dnodeId
)
{
(
*
pNumOfVnodes
)
++
;
}
sdbRelease
(
pSdb
,
pVgroup
);
}
return
true
;
}
int32_t
mndGetVnodesNum
(
SMnode
*
pMnode
,
int32_t
dnodeId
)
{
int32_t
numOfVnodes
=
0
;
sdbTraverse
(
pMnode
->
pSdb
,
SDB_VGROUP
,
mndGetVnodesNumFp
,
&
dnodeId
,
&
numOfVnodes
,
NULL
);
return
numOfVnodes
;
}
...
...
source/dnode/mnode/sdb/src/sdbHash.c
浏览文件 @
8f55933b
...
...
@@ -336,6 +336,30 @@ void sdbCancelFetch(SSdb *pSdb, void *pIter) {
taosRUnLockLatch
(
pLock
);
}
void
sdbTraverse
(
SSdb
*
pSdb
,
ESdbType
type
,
sdbTraverseFp
fp
,
void
*
p1
,
void
*
p2
,
void
*
p3
)
{
SHashObj
*
hash
=
sdbGetHash
(
pSdb
,
type
);
if
(
hash
==
NULL
)
return
;
SRWLatch
*
pLock
=
&
pSdb
->
locks
[
type
];
taosRLockLatch
(
pLock
);
SSdbRow
**
ppRow
=
taosHashIterate
(
hash
,
NULL
);
while
(
ppRow
!=
NULL
)
{
SSdbRow
*
pRow
=
*
ppRow
;
if
(
pRow
->
status
==
SDB_STATUS_READY
)
{
bool
isContinue
=
(
*
fp
)(
pSdb
->
pMnode
,
pRow
->
pObj
,
p1
,
p2
,
p3
);
if
(
!
isContinue
)
{
taosHashCancelIterate
(
hash
,
ppRow
);
break
;
}
}
ppRow
=
taosHashIterate
(
hash
,
ppRow
);
}
taosRUnLockLatch
(
pLock
);
}
int32_t
sdbGetSize
(
SSdb
*
pSdb
,
ESdbType
type
)
{
SHashObj
*
hash
=
sdbGetHash
(
pSdb
,
type
);
if
(
hash
==
NULL
)
return
0
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录