Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
af070148
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
af070148
编写于
10月 18, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
10月 18, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #17394 from taosdata/enh/TD-19463_2
fix: crash while reconfig mnode will cause taosd reboot failure
上级
3ec553aa
0f6ba60d
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
129 addition
and
69 deletion
+129
-69
source/dnode/mnode/impl/src/mndMain.c
source/dnode/mnode/impl/src/mndMain.c
+12
-7
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+20
-12
source/dnode/mnode/sdb/inc/sdb.h
source/dnode/mnode/sdb/inc/sdb.h
+4
-0
source/dnode/mnode/sdb/src/sdb.c
source/dnode/mnode/sdb/src/sdb.c
+20
-0
source/dnode/mnode/sdb/src/sdbFile.c
source/dnode/mnode/sdb/src/sdbFile.c
+3
-4
source/dnode/mnode/sdb/src/sdbHash.c
source/dnode/mnode/sdb/src/sdbHash.c
+38
-45
source/libs/sync/src/syncElection.c
source/libs/sync/src/syncElection.c
+13
-0
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+19
-1
未找到文件。
source/dnode/mnode/impl/src/mndMain.c
浏览文件 @
af070148
...
...
@@ -603,22 +603,27 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
return
0
;
}
if
(
mndAcquireRpc
(
pMsg
->
info
.
node
)
==
0
)
return
0
;
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
const
char
*
role
=
syncGetMyRoleStr
(
pMnode
->
syncMgmt
.
sync
);
bool
restored
=
syncIsRestoreFinish
(
pMnode
->
syncMgmt
.
sync
);
if
(
pMsg
->
msgType
==
TDMT_MND_MQ_TIMER
||
pMsg
->
msgType
==
TDMT_MND_TELEM_TIMER
||
pMsg
->
msgType
==
TDMT_MND_TRANS_TIMER
||
pMsg
->
msgType
==
TDMT_MND_TTL_TIMER
||
pMsg
->
msgType
==
TDMT_MND_UPTIME_TIMER
)
{
mTrace
(
"timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s "
,
pMnode
->
restored
,
pMnode
->
stopped
,
restored
,
role
);
return
-
1
;
}
SEpSet
epSet
=
{
0
}
;
S
Mnode
*
pMnode
=
pMsg
->
info
.
node
;
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
S
EpSet
epSet
=
{
0
}
;
mndGetMnodeEpSet
(
pMnode
,
&
epSet
);
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
mDebug
(
"msg:%p,
failed to check mnode state since %s, mnode restored:%d stopped:%d, sync restored:%d role:%s type:%s
"
"numOfEps:%d inUse:%d"
,
pMsg
,
terrstr
(),
pMnode
->
restored
,
pMnode
->
stopped
,
syncIsRestoreFinish
(
pMnode
->
syncMgmt
.
sync
)
,
syncGetMyRoleStr
(
pMnode
->
syncMgmt
.
sync
),
TMSG_INFO
(
pMsg
->
msgType
),
epSet
.
numOfEps
,
epSet
.
inUse
);
"msg:%p,
type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d
"
"
role:%s, redirect
numOfEps:%d inUse:%d"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
),
terrstr
(),
pMnode
->
restored
,
pMnode
->
stopped
,
restored
,
role
,
epSet
.
numOfEps
,
epSet
.
inUse
);
if
(
epSet
.
numOfEps
>
0
)
{
for
(
int32_t
i
=
0
;
i
<
epSet
.
numOfEps
;
++
i
)
{
...
...
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
af070148
...
...
@@ -394,7 +394,6 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
if
(
mndSetCreateMnodeRedoActions
(
pMnode
,
pTrans
,
pDnode
,
&
mnodeObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateMnodeRedoLogs
(
pMnode
,
pTrans
,
&
mnodeObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateMnodeCommitLogs
(
pMnode
,
pTrans
,
&
mnodeObj
)
!=
0
)
goto
_OVER
;
if
(
mndTransAppendNullLog
(
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
code
=
0
;
...
...
@@ -478,7 +477,6 @@ static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeO
static
int32_t
mndSetDropMnodeRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDnodeObj
*
pDnode
,
SMnodeObj
*
pObj
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
int32_t
numOfReplicas
=
0
;
SDDropMnodeReq
dropReq
=
{
0
};
SEpSet
dropEpSet
=
{
0
};
...
...
@@ -505,9 +503,8 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
int32_t
mndSetDropMnodeInfoToTrans
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SMnodeObj
*
pObj
)
{
if
(
pObj
==
NULL
)
return
0
;
if
(
mndSetDropMnodeCommitLogs
(
pMnode
,
pTrans
,
pObj
)
!=
0
)
return
-
1
;
if
(
mndSetDropMnodeRedoActions
(
pMnode
,
pTrans
,
pObj
->
pDnode
,
pObj
)
!=
0
)
return
-
1
;
if
(
mnd
TransAppendNullLog
(
pTrans
)
!=
0
)
return
-
1
;
if
(
mnd
SetDropMnodeCommitLogs
(
pMnode
,
pTrans
,
pObj
)
!=
0
)
return
-
1
;
return
0
;
}
...
...
@@ -715,7 +712,8 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
SMnodeObj
*
pObj
=
NULL
;
ESdbStatus
objStatus
=
0
;
void
*
pIter
=
NULL
;
bool
hasUpdatingMnode
=
false
;
int32_t
updatingMnodes
=
0
;
int32_t
readyMnodes
=
0
;
SSyncCfg
cfg
=
{.
myIndex
=
-
1
};
while
(
1
)
{
...
...
@@ -723,7 +721,11 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
if
(
pIter
==
NULL
)
break
;
if
(
objStatus
==
SDB_STATUS_CREATING
||
objStatus
==
SDB_STATUS_DROPPING
)
{
mInfo
(
"vgId:1, has updating mnode:%d, status:%s"
,
pObj
->
id
,
sdbStatusName
(
objStatus
));
hasUpdatingMnode
=
true
;
updatingMnodes
++
;
}
if
(
objStatus
==
SDB_STATUS_READY
)
{
mInfo
(
"vgId:1, has ready mnode:%d, status:%s"
,
pObj
->
id
,
sdbStatusName
(
objStatus
));
readyMnodes
++
;
}
if
(
objStatus
==
SDB_STATUS_READY
||
objStatus
==
SDB_STATUS_CREATING
)
{
...
...
@@ -739,18 +741,24 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
sdbReleaseLock
(
pSdb
,
pObj
,
false
);
}
if
(
cfg
.
myIndex
==
-
1
)
{
mInfo
(
"vgId:1, mnode
not reload since selfIndex is -1"
);
if
(
readyMnodes
<=
0
||
updatingMnodes
<=
0
)
{
mInfo
(
"vgId:1, mnode
sync not reconfig since readyMnodes:%d updatingMnodes:%d"
,
readyMnodes
,
updatingMnodes
);
return
;
}
if
(
!
mndGetRestored
(
pMnode
))
{
mInfo
(
"vgId:1, mnode not reload since restore not finished"
);
if
(
cfg
.
myIndex
==
-
1
)
{
#if 1
mInfo
(
"vgId:1, mnode sync not reconfig since selfIndex is -1"
);
#else
// cannot reconfig because the leader may fail to elect after reboot
mInfo
(
"vgId:1, mnode sync not reconfig since selfIndex is -1, do sync stop oper"
);
syncStop
(
pMnode
->
syncMgmt
.
sync
);
#endif
return
;
}
if
(
hasUpdatingMnode
)
{
mInfo
(
"vgId:1,
start to reload mnode sync
, replica:%d myIndex:%d"
,
cfg
.
replicaNum
,
cfg
.
myIndex
);
if
(
updatingMnodes
>
0
)
{
mInfo
(
"vgId:1,
mnode sync reconfig
, replica:%d myIndex:%d"
,
cfg
.
replicaNum
,
cfg
.
myIndex
);
for
(
int32_t
i
=
0
;
i
<
cfg
.
replicaNum
;
++
i
)
{
SNodeInfo
*
pNode
=
&
cfg
.
nodeInfo
[
i
];
mInfo
(
"vgId:1, index:%d, fqdn:%s port:%d"
,
i
,
pNode
->
nodeFqdn
,
pNode
->
nodePort
);
...
...
source/dnode/mnode/sdb/inc/sdb.h
浏览文件 @
af070148
...
...
@@ -403,6 +403,10 @@ const char *sdbStatusName(ESdbStatus status);
void
sdbPrintOper
(
SSdb
*
pSdb
,
SSdbRow
*
pRow
,
const
char
*
oper
);
int32_t
sdbGetIdFromRaw
(
SSdb
*
pSdb
,
SSdbRaw
*
pRaw
);
void
sdbWriteLock
(
SSdb
*
pSdb
,
int32_t
type
);
void
sdbReadLock
(
SSdb
*
pSdb
,
int32_t
type
);
void
sdbUnLock
(
SSdb
*
pSdb
,
int32_t
type
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/mnode/sdb/src/sdb.c
浏览文件 @
af070148
...
...
@@ -181,3 +181,23 @@ void sdbGetCommitInfo(SSdb *pSdb, int64_t *index, int64_t *term, int64_t *config
pSdb
->
applyIndex
,
pSdb
->
applyTerm
,
pSdb
->
applyConfig
,
*
index
,
*
term
,
*
config
);
#endif
}
void
sdbWriteLock
(
SSdb
*
pSdb
,
int32_t
type
)
{
TdThreadRwlock
*
pLock
=
&
pSdb
->
locks
[
type
];
// mTrace("sdb table:%d start write lock:%p", type, pLock);
taosThreadRwlockWrlock
(
pLock
);
// mTrace("sdb table:%d stop write lock:%p", type, pLock);
}
void
sdbReadLock
(
SSdb
*
pSdb
,
int32_t
type
)
{
TdThreadRwlock
*
pLock
=
&
pSdb
->
locks
[
type
];
// mTrace("sdb table:%d start read lock:%p", type, pLock);
taosThreadRwlockRdlock
(
pLock
);
// mTrace("sdb table:%d stop read lock:%p", type, pLock);
}
void
sdbUnLock
(
SSdb
*
pSdb
,
int32_t
type
)
{
TdThreadRwlock
*
pLock
=
&
pSdb
->
locks
[
type
];
// mTrace("sdb table:%d unlock:%p", type, pLock);
taosThreadRwlockUnlock
(
pLock
);
}
source/dnode/mnode/sdb/src/sdbFile.c
浏览文件 @
af070148
...
...
@@ -363,9 +363,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
mInfo
(
"write %s to sdb file, total %d rows"
,
sdbTableName
(
i
),
sdbGetSize
(
pSdb
,
i
));
SHashObj
*
hash
=
pSdb
->
hashObjs
[
i
];
TdThreadRwlock
*
pLock
=
&
pSdb
->
locks
[
i
];
taosThreadRwlockWrlock
(
pLock
);
SHashObj
*
hash
=
pSdb
->
hashObjs
[
i
];
sdbWriteLock
(
pSdb
,
i
);
SSdbRow
**
ppRow
=
taosHashIterate
(
hash
,
NULL
);
while
(
ppRow
!=
NULL
)
{
...
...
@@ -410,7 +409,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
sdbFreeRaw
(
pRaw
);
ppRow
=
taosHashIterate
(
hash
,
ppRow
);
}
taosThreadRwlockUnlock
(
pLock
);
sdbUnLock
(
pSdb
,
i
);
}
if
(
code
==
0
)
{
...
...
source/dnode/mnode/sdb/src/sdbHash.c
浏览文件 @
af070148
...
...
@@ -133,12 +133,12 @@ static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, const void *pKey) {
}
static
int32_t
sdbInsertRow
(
SSdb
*
pSdb
,
SHashObj
*
hash
,
SSdbRaw
*
pRaw
,
SSdbRow
*
pRow
,
int32_t
keySize
)
{
TdThreadRwlock
*
pLock
=
&
pSdb
->
locks
[
pRow
->
type
]
;
taosThreadRwlockWrlock
(
pLock
);
int32_t
type
=
pRow
->
type
;
sdbWriteLock
(
pSdb
,
type
);
SSdbRow
*
pOldRow
=
taosHashGet
(
hash
,
pRow
->
pObj
,
keySize
);
if
(
pOldRow
!=
NULL
)
{
taosThreadRwlockUnlock
(
pLock
);
sdbUnLock
(
pSdb
,
type
);
sdbFreeRow
(
pSdb
,
pRow
,
false
);
terrno
=
TSDB_CODE_SDB_OBJ_ALREADY_THERE
;
return
terrno
;
...
...
@@ -149,7 +149,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
sdbPrintOper
(
pSdb
,
pRow
,
"insert"
);
if
(
taosHashPut
(
hash
,
pRow
->
pObj
,
keySize
,
&
pRow
,
sizeof
(
void
*
))
!=
0
)
{
taosThreadRwlockUnlock
(
pLock
);
sdbUnLock
(
pSdb
,
type
);
sdbFreeRow
(
pSdb
,
pRow
,
false
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
terrno
;
...
...
@@ -164,12 +164,12 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
taosHashRemove
(
hash
,
pRow
->
pObj
,
keySize
);
sdbFreeRow
(
pSdb
,
pRow
,
false
);
terrno
=
code
;
taosThreadRwlockUnlock
(
pLock
);
sdbUnLock
(
pSdb
,
type
);
return
terrno
;
}
}
taosThreadRwlockUnlock
(
pLock
);
sdbUnLock
(
pSdb
,
type
);
if
(
pSdb
->
keyTypes
[
pRow
->
type
]
==
SDB_KEY_INT32
)
{
pSdb
->
maxId
[
pRow
->
type
]
=
TMAX
(
pSdb
->
maxId
[
pRow
->
type
],
*
((
int32_t
*
)
pRow
->
pObj
));
...
...
@@ -183,26 +183,27 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
}
static
int32_t
sdbUpdateRow
(
SSdb
*
pSdb
,
SHashObj
*
hash
,
SSdbRaw
*
pRaw
,
SSdbRow
*
pNewRow
,
int32_t
keySize
)
{
TdThreadRwlock
*
pLock
=
&
pSdb
->
locks
[
pNewRow
->
type
]
;
taosThreadRwlockWrlock
(
pLock
);
int32_t
type
=
pNewRow
->
type
;
sdbWriteLock
(
pSdb
,
type
);
SSdbRow
**
ppOldRow
=
taosHashGet
(
hash
,
pNewRow
->
pObj
,
keySize
);
if
(
ppOldRow
==
NULL
||
*
ppOldRow
==
NULL
)
{
taosThreadRwlockUnlock
(
pLock
);
sdbUnLock
(
pSdb
,
type
);
return
sdbInsertRow
(
pSdb
,
hash
,
pRaw
,
pNewRow
,
keySize
);
}
SSdbRow
*
pOldRow
=
*
ppOldRow
;
pOldRow
->
status
=
pRaw
->
status
;
sdbPrintOper
(
pSdb
,
pOldRow
,
"update"
);
sdbUnLock
(
pSdb
,
type
);
int32_t
code
=
0
;
SdbUpdateFp
updateFp
=
pSdb
->
updateFps
[
pNewRow
->
type
];
SdbUpdateFp
updateFp
=
pSdb
->
updateFps
[
type
];
if
(
updateFp
!=
NULL
)
{
code
=
(
*
updateFp
)(
pSdb
,
pOldRow
->
pObj
,
pNewRow
->
pObj
);
}
taosThreadRwlockUnlock
(
pLock
);
// sdbUnLock(pSdb, type
);
sdbFreeRow
(
pSdb
,
pNewRow
,
false
);
pSdb
->
tableVer
[
pOldRow
->
type
]
++
;
...
...
@@ -210,12 +211,12 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
}
static
int32_t
sdbDeleteRow
(
SSdb
*
pSdb
,
SHashObj
*
hash
,
SSdbRaw
*
pRaw
,
SSdbRow
*
pRow
,
int32_t
keySize
)
{
TdThreadRwlock
*
pLock
=
&
pSdb
->
locks
[
pRow
->
type
]
;
taosThreadRwlockWrlock
(
pLock
);
int32_t
type
=
pRow
->
type
;
sdbWriteLock
(
pSdb
,
type
);
SSdbRow
**
ppOldRow
=
taosHashGet
(
hash
,
pRow
->
pObj
,
keySize
);
if
(
ppOldRow
==
NULL
||
*
ppOldRow
==
NULL
)
{
taosThreadRwlockUnlock
(
pLock
);
sdbUnLock
(
pSdb
,
type
);
sdbFreeRow
(
pSdb
,
pRow
,
false
);
terrno
=
TSDB_CODE_SDB_OBJ_NOT_THERE
;
return
terrno
;
...
...
@@ -228,7 +229,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
taosHashRemove
(
hash
,
pOldRow
->
pObj
,
keySize
);
pSdb
->
tableVer
[
pOldRow
->
type
]
++
;
taosThreadRwlockUnlock
(
pLock
);
sdbUnLock
(
pSdb
,
type
);
sdbFreeRow
(
pSdb
,
pRow
,
false
);
...
...
@@ -282,12 +283,11 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
void
*
pRet
=
NULL
;
int32_t
keySize
=
sdbGetkeySize
(
pSdb
,
type
,
pKey
);
TdThreadRwlock
*
pLock
=
&
pSdb
->
locks
[
type
];
taosThreadRwlockRdlock
(
pLock
);
sdbReadLock
(
pSdb
,
type
);
SSdbRow
**
ppRow
=
taosHashGet
(
hash
,
pKey
,
keySize
);
if
(
ppRow
==
NULL
||
*
ppRow
==
NULL
)
{
taosThreadRwlockUnlock
(
pLock
);
sdbUnLock
(
pSdb
,
type
);
terrno
=
TSDB_CODE_SDB_OBJ_NOT_THERE
;
return
NULL
;
}
...
...
@@ -310,13 +310,13 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
break
;
}
taosThreadRwlockUnlock
(
pLock
);
sdbUnLock
(
pSdb
,
type
);
return
pRet
;
}
static
void
sdbCheckRow
(
SSdb
*
pSdb
,
SSdbRow
*
pRow
)
{
TdThreadRwlock
*
pLock
=
&
pSdb
->
locks
[
pRow
->
type
]
;
taosThreadRwlockWrlock
(
pLock
);
int32_t
type
=
pRow
->
type
;
sdbWriteLock
(
pSdb
,
type
);
int32_t
ref
=
atomic_sub_fetch_32
(
&
pRow
->
refCount
,
1
);
sdbPrintOper
(
pSdb
,
pRow
,
"check"
);
...
...
@@ -324,7 +324,7 @@ static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) {
sdbFreeRow
(
pSdb
,
pRow
,
true
);
}
taosThreadRwlockUnlock
(
pLock
);
sdbUnLock
(
pSdb
,
type
);
}
void
sdbReleaseLock
(
SSdb
*
pSdb
,
void
*
pObj
,
bool
lock
)
{
...
...
@@ -333,9 +333,9 @@ void sdbReleaseLock(SSdb *pSdb, void *pObj, bool lock) {
SSdbRow
*
pRow
=
(
SSdbRow
*
)((
char
*
)
pObj
-
sizeof
(
SSdbRow
));
if
(
pRow
->
type
>=
SDB_MAX
)
return
;
TdThreadRwlock
*
pLock
=
&
pSdb
->
locks
[
pRow
->
type
]
;
int32_t
type
=
pRow
->
type
;
if
(
lock
)
{
taosThreadRwlockWrlock
(
pLock
);
sdbWriteLock
(
pSdb
,
type
);
}
int32_t
ref
=
atomic_sub_fetch_32
(
&
pRow
->
refCount
,
1
);
...
...
@@ -345,7 +345,7 @@ void sdbReleaseLock(SSdb *pSdb, void *pObj, bool lock) {
}
if
(
lock
)
{
taosThreadRwlockUnlock
(
pLock
);
sdbUnLock
(
pSdb
,
type
);
}
}
...
...
@@ -357,8 +357,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
SHashObj
*
hash
=
sdbGetHash
(
pSdb
,
type
);
if
(
hash
==
NULL
)
return
NULL
;
TdThreadRwlock
*
pLock
=
&
pSdb
->
locks
[
type
];
taosThreadRwlockRdlock
(
pLock
);
sdbReadLock
(
pSdb
,
type
);
SSdbRow
**
ppRow
=
taosHashIterate
(
hash
,
pIter
);
while
(
ppRow
!=
NULL
)
{
...
...
@@ -373,7 +372,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
*
ppObj
=
pRow
->
pObj
;
break
;
}
taosThreadRwlockUnlock
(
pLock
);
sdbUnLock
(
pSdb
,
type
);
return
ppRow
;
}
...
...
@@ -384,9 +383,8 @@ void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStat
SHashObj
*
hash
=
sdbGetHash
(
pSdb
,
type
);
if
(
hash
==
NULL
)
return
NULL
;
TdThreadRwlock
*
pLock
=
&
pSdb
->
locks
[
type
];
if
(
lock
)
{
taosThreadRwlockRdlock
(
pLock
);
sdbReadLock
(
pSdb
,
type
);
}
SSdbRow
**
ppRow
=
taosHashIterate
(
hash
,
pIter
);
...
...
@@ -404,7 +402,7 @@ void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStat
break
;
}
if
(
lock
)
{
taosThreadRwlockUnlock
(
pLock
);
sdbUnLock
(
pSdb
,
type
);
}
return
ppRow
;
...
...
@@ -416,18 +414,17 @@ void sdbCancelFetch(SSdb *pSdb, void *pIter) {
SHashObj
*
hash
=
sdbGetHash
(
pSdb
,
pRow
->
type
);
if
(
hash
==
NULL
)
return
;
TdThreadRwlock
*
pLock
=
&
pSdb
->
locks
[
pRow
->
type
]
;
taosThreadRwlockRdlock
(
pLock
);
int32_t
type
=
pRow
->
type
;
sdbReadLock
(
pSdb
,
type
);
taosHashCancelIterate
(
hash
,
pIter
);
taosThreadRwlockUnlock
(
pLock
);
sdbUnLock
(
pSdb
,
type
);
}
void
sdbTraverse
(
SSdb
*
pSdb
,
ESdbType
type
,
sdbTraverseFp
fp
,
void
*
p1
,
void
*
p2
,
void
*
p3
)
{
SHashObj
*
hash
=
sdbGetHash
(
pSdb
,
type
);
if
(
hash
==
NULL
)
return
;
TdThreadRwlock
*
pLock
=
&
pSdb
->
locks
[
type
];
taosThreadRwlockRdlock
(
pLock
);
sdbReadLock
(
pSdb
,
type
);
SSdbRow
**
ppRow
=
taosHashIterate
(
hash
,
NULL
);
while
(
ppRow
!=
NULL
)
{
...
...
@@ -443,17 +440,16 @@ void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2
ppRow
=
taosHashIterate
(
hash
,
ppRow
);
}
taosThreadRwlockUnlock
(
pLock
);
sdbUnLock
(
pSdb
,
type
);
}
int32_t
sdbGetSize
(
SSdb
*
pSdb
,
ESdbType
type
)
{
SHashObj
*
hash
=
sdbGetHash
(
pSdb
,
type
);
if
(
hash
==
NULL
)
return
0
;
TdThreadRwlock
*
pLock
=
&
pSdb
->
locks
[
type
];
taosThreadRwlockRdlock
(
pLock
);
sdbReadLock
(
pSdb
,
type
);
int32_t
size
=
taosHashGetSize
(
hash
);
taosThreadRwlockUnlock
(
pLock
);
sdbUnLock
(
pSdb
,
type
);
return
size
;
}
...
...
@@ -465,9 +461,7 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) {
if
(
pSdb
->
keyTypes
[
type
]
!=
SDB_KEY_INT32
)
return
-
1
;
int32_t
maxId
=
0
;
TdThreadRwlock
*
pLock
=
&
pSdb
->
locks
[
type
];
taosThreadRwlockRdlock
(
pLock
);
sdbReadLock
(
pSdb
,
type
);
SSdbRow
**
ppRow
=
taosHashIterate
(
hash
,
NULL
);
while
(
ppRow
!=
NULL
)
{
...
...
@@ -477,8 +471,7 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) {
ppRow
=
taosHashIterate
(
hash
,
ppRow
);
}
taosThreadRwlockUnlock
(
pLock
);
sdbUnLock
(
pSdb
,
type
);
maxId
=
TMAX
(
maxId
,
pSdb
->
maxId
[
type
]);
return
maxId
+
1
;
}
...
...
source/libs/sync/src/syncElection.c
浏览文件 @
af070148
...
...
@@ -96,6 +96,19 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
syncNodeCandidate2Leader
(
pSyncNode
);
pSyncNode
->
pVotesGranted
->
toLeader
=
true
;
return
ret
;
}
if
(
pSyncNode
->
replicaNum
==
1
)
{
// only myself, to leader
voteGrantedUpdate
(
pSyncNode
->
pVotesGranted
,
pSyncNode
);
votesRespondUpdate
(
pSyncNode
->
pVotesRespond
,
pSyncNode
);
pSyncNode
->
quorum
=
syncUtilQuorum
(
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
);
syncNodeCandidate2Leader
(
pSyncNode
);
pSyncNode
->
pVotesGranted
->
toLeader
=
true
;
return
ret
;
}
switch
(
pSyncNode
->
pRaftCfg
->
snapshotStrategy
)
{
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
af070148
...
...
@@ -1891,8 +1891,26 @@ inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
return
b1
;
}
static
bool
syncIsConfigChanged
(
const
SSyncCfg
*
pOldCfg
,
const
SSyncCfg
*
pNewCfg
)
{
if
(
pOldCfg
->
replicaNum
!=
pNewCfg
->
replicaNum
)
return
true
;
if
(
pOldCfg
->
myIndex
!=
pNewCfg
->
myIndex
)
return
true
;
for
(
int32_t
i
=
0
;
i
<
pOldCfg
->
replicaNum
;
++
i
)
{
const
SNodeInfo
*
pOldInfo
=
&
pOldCfg
->
nodeInfo
[
i
];
const
SNodeInfo
*
pNewInfo
=
&
pNewCfg
->
nodeInfo
[
i
];
if
(
strcmp
(
pOldInfo
->
nodeFqdn
,
pNewInfo
->
nodeFqdn
)
!=
0
)
return
true
;
if
(
pOldInfo
->
nodePort
!=
pNewInfo
->
nodePort
)
return
true
;
}
return
false
;
}
void
syncNodeDoConfigChange
(
SSyncNode
*
pSyncNode
,
SSyncCfg
*
pNewConfig
,
SyncIndex
lastConfigChangeIndex
)
{
SSyncCfg
oldConfig
=
pSyncNode
->
pRaftCfg
->
cfg
;
if
(
!
syncIsConfigChanged
(
&
oldConfig
,
pNewConfig
))
{
sInfo
(
"vgId:1, sync not reconfig since not changed"
);
return
;
}
pSyncNode
->
pRaftCfg
->
cfg
=
*
pNewConfig
;
pSyncNode
->
pRaftCfg
->
lastConfigIndex
=
lastConfigChangeIndex
;
...
...
@@ -2264,7 +2282,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
void
syncNodeCandidate2Leader
(
SSyncNode
*
pSyncNode
)
{
ASSERT
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_CANDIDATE
);
ASSERT
(
voteGrantedMajority
(
pSyncNode
->
pVotesGranted
));
//
ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));
syncNodeBecomeLeader
(
pSyncNode
,
"candidate to leader"
);
syncNodeLog2
(
"==state change syncNodeCandidate2Leader=="
,
pSyncNode
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录