Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4c7b24f1
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
4c7b24f1
编写于
6月 15, 2022
作者:
L
Li Minghao
提交者:
GitHub
6月 15, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13861 from taosdata/feature/3.0_mhli
enh(sync): adjust errno
上级
f445c59a
13eaf5fc
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
128 addition
and
88 deletion
+128
-88
include/libs/sync/sync.h
include/libs/sync/sync.h
+1
-9
include/util/taoserror.h
include/util/taoserror.h
+2
-0
source/dnode/mnode/impl/src/mndMain.c
source/dnode/mnode/impl/src/mndMain.c
+10
-5
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+9
-9
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+6
-3
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+8
-19
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+86
-39
source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
+1
-1
source/libs/sync/test/syncConfigChangeTest.cpp
source/libs/sync/test/syncConfigChangeTest.cpp
+1
-1
source/libs/sync/test/syncReplicateTest.cpp
source/libs/sync/test/syncReplicateTest.cpp
+1
-1
source/libs/sync/test/syncTestTool.cpp
source/libs/sync/test/syncTestTool.cpp
+1
-1
source/util/src/terror.c
source/util/src/terror.c
+2
-0
未找到文件。
include/libs/sync/sync.h
浏览文件 @
4c7b24f1
...
...
@@ -24,7 +24,7 @@ extern "C" {
#include "tdef.h"
#include "tmsgcb.h"
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_BEGIN
0
#define SYNC_INDEX_INVALID -1
typedef
uint64_t
SyncNodeId
;
...
...
@@ -44,14 +44,6 @@ typedef enum {
TAOS_SYNC_STATE_ERROR
=
103
,
}
ESyncState
;
typedef
enum
{
TAOS_SYNC_PROPOSE_SUCCESS
=
0
,
TAOS_SYNC_PROPOSE_NOT_LEADER
=
1
,
TAOS_SYNC_ONLY_ONE_REPLICA
=
2
,
TAOS_SYNC_NOT_IN_NEW_CONFIG
=
3
,
TAOS_SYNC_OTHER_ERROR
=
100
,
}
ESyncProposeCode
;
typedef
enum
{
TAOS_SYNC_FSM_CB_SUCCESS
=
0
,
TAOS_SYNC_FSM_CB_OTHER_ERROR
=
1
,
...
...
include/util/taoserror.h
浏览文件 @
4c7b24f1
...
...
@@ -411,6 +411,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A)
#define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x0910)
#define TSDB_CODE_SYN_ONE_REPLICA TAOS_DEF_ERROR_CODE(0, 0x0911)
#define TSDB_CODE_SYN_NOT_IN_NEW_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0912)
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
// tq
...
...
source/dnode/mnode/impl/src/mndMain.c
浏览文件 @
4c7b24f1
...
...
@@ -380,17 +380,19 @@ void mndStop(SMnode *pMnode) {
int32_t
mndProcessSyncMsg
(
SRpcMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
int32_t
code
=
TAOS_SYNC_OTHER_ERROR
;
int32_t
code
=
0
;
if
(
!
syncEnvIsStart
())
{
mError
(
"failed to process sync msg:%p type:%s since syncEnv stop"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
return
TAOS_SYNC_OTHER_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
pMgmt
->
sync
);
if
(
pSyncNode
==
NULL
)
{
mError
(
"failed to process sync msg:%p type:%s since syncNode is null"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
return
TAOS_SYNC_OTHER_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
char
logBuf
[
512
]
=
{
0
};
...
...
@@ -451,7 +453,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
tmsgSendRsp
(
&
rsp
);
}
else
{
mError
(
"failed to process msg:%p since invalid type:%s"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
code
=
TAOS_SYNC_OTHER_ERROR
;
code
=
-
1
;
}
}
else
{
if
(
pMsg
->
msgType
==
TDMT_SYNC_TIMEOUT
)
{
...
...
@@ -492,10 +494,13 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
tmsgSendRsp
(
&
rsp
);
}
else
{
mError
(
"failed to process msg:%p since invalid type:%s"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
code
=
TAOS_SYNC_OTHER_ERROR
;
code
=
-
1
;
}
}
if
(
code
!=
0
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
}
return
code
;
}
...
...
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
4c7b24f1
...
...
@@ -234,9 +234,9 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
int32_t
code
=
syncPropose
(
pMgmt
->
sync
,
&
rsp
,
isWeak
);
if
(
code
==
0
)
{
tsem_wait
(
&
pMgmt
->
syncSem
);
}
else
if
(
code
==
TAOS_SYNC_PROPOSE
_NOT_LEADER
)
{
}
else
if
(
code
==
-
1
&&
terrno
==
TSDB_CODE_SYN
_NOT_LEADER
)
{
terrno
=
TSDB_CODE_APP_NOT_READY
;
}
else
if
(
code
==
TAOS_SYNC_OTHER
_ERROR
)
{
}
else
if
(
code
==
-
1
&&
terrno
==
TSDB_CODE_SYN_INTERNAL
_ERROR
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
}
else
{
terrno
=
TSDB_CODE_APP_ERROR
;
...
...
@@ -257,13 +257,13 @@ void mndSyncStart(SMnode *pMnode) {
syncStart
(
pMgmt
->
sync
);
mDebug
(
"mnode sync started, id:%"
PRId64
" standby:%d"
,
pMgmt
->
sync
,
pMgmt
->
standby
);
/*
if (pMgmt->standby) {
syncStartStandBy(pMgmt->sync);
} else {
syncStart(pMgmt->sync);
}
*/
/*
if (pMgmt->standby) {
syncStartStandBy(pMgmt->sync);
} else {
syncStart(pMgmt->sync);
}
*/
}
void
mndSyncStop
(
SMnode
*
pMnode
)
{}
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
4c7b24f1
...
...
@@ -296,7 +296,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
}
int32_t
vnodeProcessSyncReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
int32_t
ret
=
TAOS_SYNC_OTHER_ERROR
;
int32_t
ret
=
0
;
if
(
syncEnvIsStart
())
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
pVnode
->
sync
);
...
...
@@ -381,15 +381,18 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
tmsgSendRsp
(
&
rsp
);
}
else
{
vError
(
"==vnodeProcessSyncReq== error msg type:%d"
,
pRpcMsg
->
msgType
);
ret
=
TAOS_SYNC_OTHER_ERROR
;
ret
=
-
1
;
}
syncNodeRelease
(
pSyncNode
);
}
else
{
vError
(
"==vnodeProcessSyncReq== error syncEnv stop"
);
ret
=
TAOS_SYNC_OTHER_ERROR
;
ret
=
-
1
;
}
if
(
ret
!=
0
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
}
return
ret
;
}
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
4c7b24f1
...
...
@@ -98,7 +98,8 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
if
(
code
==
0
)
{
vnodeAccumBlockMsg
(
pVnode
,
pMsg
->
msgType
);
}
else
if
(
code
==
TAOS_SYNC_PROPOSE_NOT_LEADER
)
{
}
else
if
(
code
==
-
1
&&
terrno
==
TSDB_CODE_SYN_NOT_LEADER
)
{
SEpSet
newEpSet
=
{
0
};
syncGetEpSet
(
pVnode
->
sync
,
&
newEpSet
);
SEp
*
pEp
=
&
newEpSet
.
eps
[
newEpSet
.
inUse
];
...
...
@@ -247,29 +248,17 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
int32_t
vnodeSnapshotStartRead
(
struct
SSyncFSM
*
pFsm
,
void
**
ppReader
)
{
return
0
;
}
int32_t
vnodeSnapshotStartRead
(
struct
SSyncFSM
*
pFsm
,
void
**
ppReader
)
{
return
0
;
}
int32_t
vnodeSnapshotStopRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pReader
)
{
return
0
;
}
int32_t
vnodeSnapshotStopRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pReader
)
{
return
0
;
}
int32_t
vnodeSnapshotDoRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pReader
,
void
**
ppBuf
,
int32_t
*
len
)
{
return
0
;
}
int32_t
vnodeSnapshotDoRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pReader
,
void
**
ppBuf
,
int32_t
*
len
)
{
return
0
;
}
int32_t
vnodeSnapshotStartWrite
(
struct
SSyncFSM
*
pFsm
,
void
**
ppWriter
)
{
return
0
;
}
int32_t
vnodeSnapshotStartWrite
(
struct
SSyncFSM
*
pFsm
,
void
**
ppWriter
)
{
return
0
;
}
int32_t
vnodeSnapshotStopWrite
(
struct
SSyncFSM
*
pFsm
,
void
*
pWriter
,
bool
isApply
)
{
return
0
;
}
int32_t
vnodeSnapshotStopWrite
(
struct
SSyncFSM
*
pFsm
,
void
*
pWriter
,
bool
isApply
)
{
return
0
;
}
int32_t
vnodeSnapshotDoWrite
(
struct
SSyncFSM
*
pFsm
,
void
*
pWriter
,
void
*
pBuf
,
int32_t
len
)
{
return
0
;
}
int32_t
vnodeSnapshotDoWrite
(
struct
SSyncFSM
*
pFsm
,
void
*
pWriter
,
void
*
pBuf
,
int32_t
len
)
{
return
0
;
}
static
SSyncFSM
*
vnodeSyncMakeFsm
(
SVnode
*
pVnode
)
{
SSyncFSM
*
pFsm
=
taosMemoryCalloc
(
1
,
sizeof
(
SSyncFSM
));
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
4c7b24f1
...
...
@@ -149,12 +149,14 @@ void syncStop(int64_t rid) {
int32_t
syncSetStandby
(
int64_t
rid
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosAcquireRef
(
tsNodeRefId
,
rid
);
if
(
pSyncNode
==
NULL
)
{
return
TAOS_SYNC_OTHER_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
return
TAOS_SYNC_OTHER_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
// state change
...
...
@@ -177,7 +179,8 @@ int32_t syncSetStandby(int64_t rid) {
int32_t
syncReconfigBuild
(
int64_t
rid
,
const
SSyncCfg
*
pNewCfg
,
SRpcMsg
*
pRpcMsg
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosAcquireRef
(
tsNodeRefId
,
rid
);
if
(
pSyncNode
==
NULL
)
{
return
TAOS_SYNC_OTHER_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
...
...
@@ -201,7 +204,8 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
if
(
!
IamInNew
)
{
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
return
TAOS_SYNC_NOT_IN_NEW_CONFIG
;
terrno
=
TSDB_CODE_SYN_NOT_IN_NEW_CONFIG
;
return
-
1
;
}
char
*
newconfig
=
syncCfg2Str
((
SSyncCfg
*
)
pNewCfg
);
...
...
@@ -219,7 +223,8 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
int32_t
syncReconfig
(
int64_t
rid
,
const
SSyncCfg
*
pNewCfg
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosAcquireRef
(
tsNodeRefId
,
rid
);
if
(
pSyncNode
==
NULL
)
{
return
TAOS_SYNC_OTHER_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
...
...
@@ -246,7 +251,8 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
if
(
!
IamInNew
)
{
sError
(
"sync reconfig error, not in new config"
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
return
TAOS_SYNC_NOT_IN_NEW_CONFIG
;
terrno
=
TSDB_CODE_SYN_NOT_IN_NEW_CONFIG
;
return
-
1
;
}
char
*
newconfig
=
syncCfg2Str
((
SSyncCfg
*
)
pNewCfg
);
...
...
@@ -272,13 +278,15 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
int32_t
syncLeaderTransfer
(
int64_t
rid
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosAcquireRef
(
tsNodeRefId
,
rid
);
if
(
pSyncNode
==
NULL
)
{
return
TAOS_SYNC_OTHER_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
if
(
pSyncNode
->
peersNum
==
0
)
{
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
return
TAOS_SYNC_OTHER_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
SNodeInfo
newLeader
=
(
pSyncNode
->
peersNodeInfo
)[
0
];
...
...
@@ -291,7 +299,8 @@ int32_t syncLeaderTransfer(int64_t rid) {
int32_t
syncLeaderTransferTo
(
int64_t
rid
,
SNodeInfo
newLeader
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosAcquireRef
(
tsNodeRefId
,
rid
);
if
(
pSyncNode
==
NULL
)
{
return
TAOS_SYNC_OTHER_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
int32_t
ret
=
0
;
...
...
@@ -299,7 +308,8 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
if
(
pSyncNode
->
replicaNum
==
1
)
{
sError
(
"only one replica, cannot drop leader"
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
return
TAOS_SYNC_ONLY_ONE_REPLICA
;
terrno
=
TSDB_CODE_SYN_ONE_REPLICA
;
return
-
1
;
}
SyncLeaderTransfer
*
pMsg
=
syncLeaderTransferBuild
(
pSyncNode
->
vgId
);
...
...
@@ -538,11 +548,12 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
}
int32_t
syncPropose
(
int64_t
rid
,
const
SRpcMsg
*
pMsg
,
bool
isWeak
)
{
int32_t
ret
=
TAOS_SYNC_PROPOSE_SUCCESS
;
int32_t
ret
=
0
;
SSyncNode
*
pSyncNode
=
taosAcquireRef
(
tsNodeRefId
,
rid
);
if
(
pSyncNode
==
NULL
)
{
return
TAOS_SYNC_OTHER_ERROR
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
assert
(
rid
==
pSyncNode
->
rid
);
sDebug
(
"vgId:%d sync event propose msgType:%s"
,
pSyncNode
->
vgId
,
TMSG_INFO
(
pMsg
->
msgType
));
...
...
@@ -553,7 +564,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
}
int32_t
syncNodePropose
(
SSyncNode
*
pSyncNode
,
const
SRpcMsg
*
pMsg
,
bool
isWeak
)
{
int32_t
ret
=
TAOS_SYNC_PROPOSE_SUCCESS
;
int32_t
ret
=
0
;
sDebug
(
"vgId:%d sync event propose msgType:%s"
,
pSyncNode
->
vgId
,
TMSG_INFO
(
pMsg
->
msgType
));
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
...
...
@@ -567,14 +578,17 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak)
syncClientRequest2RpcMsg
(
pSyncMsg
,
&
rpcMsg
);
if
(
pSyncNode
->
FpEqMsg
!=
NULL
&&
(
*
pSyncNode
->
FpEqMsg
)(
pSyncNode
->
msgcb
,
&
rpcMsg
)
==
0
)
{
ret
=
TAOS_SYNC_PROPOSE_SUCCESS
;
ret
=
0
;
}
else
{
ret
=
-
1
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
sError
(
"syncPropose pSyncNode->FpEqMsg is NULL"
);
}
syncClientRequestDestroy
(
pSyncMsg
);
}
else
{
ret
=
-
1
;
terrno
=
TSDB_CODE_SYN_NOT_LEADER
;
sError
(
"syncPropose not leader, %s"
,
syncUtilState2String
(
pSyncNode
->
state
));
ret
=
TAOS_SYNC_PROPOSE_NOT_LEADER
;
}
return
ret
;
...
...
@@ -945,9 +959,13 @@ int32_t syncNodePingAll(SSyncNode* pSyncNode) {
// timer control --------------
int32_t
syncNodeStartPingTimer
(
SSyncNode
*
pSyncNode
)
{
int32_t
ret
=
0
;
taosTmrReset
(
pSyncNode
->
FpPingTimerCB
,
pSyncNode
->
pingTimerMS
,
pSyncNode
,
gSyncEnv
->
pTimerManager
,
&
pSyncNode
->
pPingTimer
);
atomic_store_64
(
&
pSyncNode
->
pingTimerLogicClock
,
pSyncNode
->
pingTimerLogicClockUser
);
if
(
syncEnvIsStart
())
{
taosTmrReset
(
pSyncNode
->
FpPingTimerCB
,
pSyncNode
->
pingTimerMS
,
pSyncNode
,
gSyncEnv
->
pTimerManager
,
&
pSyncNode
->
pPingTimer
);
atomic_store_64
(
&
pSyncNode
->
pingTimerLogicClock
,
pSyncNode
->
pingTimerLogicClockUser
);
}
else
{
sError
(
"sync env is stop, syncNodeStartPingTimer"
);
}
return
ret
;
}
...
...
@@ -961,10 +979,14 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
int32_t
syncNodeStartElectTimer
(
SSyncNode
*
pSyncNode
,
int32_t
ms
)
{
int32_t
ret
=
0
;
pSyncNode
->
electTimerMS
=
ms
;
taosTmrReset
(
pSyncNode
->
FpElectTimerCB
,
pSyncNode
->
electTimerMS
,
pSyncNode
,
gSyncEnv
->
pTimerManager
,
&
pSyncNode
->
pElectTimer
);
atomic_store_64
(
&
pSyncNode
->
electTimerLogicClock
,
pSyncNode
->
electTimerLogicClockUser
);
if
(
syncEnvIsStart
())
{
pSyncNode
->
electTimerMS
=
ms
;
taosTmrReset
(
pSyncNode
->
FpElectTimerCB
,
pSyncNode
->
electTimerMS
,
pSyncNode
,
gSyncEnv
->
pTimerManager
,
&
pSyncNode
->
pElectTimer
);
atomic_store_64
(
&
pSyncNode
->
electTimerLogicClock
,
pSyncNode
->
electTimerLogicClockUser
);
}
else
{
sError
(
"sync env is stop, syncNodeStartElectTimer"
);
}
return
ret
;
}
...
...
@@ -998,9 +1020,13 @@ int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
int32_t
syncNodeStartHeartbeatTimer
(
SSyncNode
*
pSyncNode
)
{
int32_t
ret
=
0
;
taosTmrReset
(
pSyncNode
->
FpHeartbeatTimerCB
,
pSyncNode
->
heartbeatTimerMS
,
pSyncNode
,
gSyncEnv
->
pTimerManager
,
&
pSyncNode
->
pHeartbeatTimer
);
atomic_store_64
(
&
pSyncNode
->
heartbeatTimerLogicClock
,
pSyncNode
->
heartbeatTimerLogicClockUser
);
if
(
syncEnvIsStart
())
{
taosTmrReset
(
pSyncNode
->
FpHeartbeatTimerCB
,
pSyncNode
->
heartbeatTimerMS
,
pSyncNode
,
gSyncEnv
->
pTimerManager
,
&
pSyncNode
->
pHeartbeatTimer
);
atomic_store_64
(
&
pSyncNode
->
heartbeatTimerLogicClock
,
pSyncNode
->
heartbeatTimerLogicClockUser
);
}
else
{
sError
(
"sync env is stop, syncNodeStartHeartbeatTimer"
);
}
return
ret
;
}
...
...
@@ -1720,14 +1746,25 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
syncTimeout2RpcMsg
(
pSyncMsg
,
&
rpcMsg
);
syncRpcMsgLog2
((
char
*
)
"==syncNodeEqPingTimer=="
,
&
rpcMsg
);
if
(
pSyncNode
->
FpEqMsg
!=
NULL
)
{
pSyncNode
->
FpEqMsg
(
pSyncNode
->
msgcb
,
&
rpcMsg
);
int32_t
code
=
pSyncNode
->
FpEqMsg
(
pSyncNode
->
msgcb
,
&
rpcMsg
);
if
(
code
!=
0
)
{
sError
(
"vgId:%d sync enqueue ping msg error, code:%d"
,
pSyncNode
->
vgId
,
code
);
rpcFreeCont
(
rpcMsg
.
pCont
);
syncTimeoutDestroy
(
pSyncMsg
);
return
;
}
}
else
{
sTrace
(
"syncNodeEqPingTimer pSyncNode->FpEqMsg is NULL"
);
}
syncTimeoutDestroy
(
pSyncMsg
);
taosTmrReset
(
syncNodeEqPingTimer
,
pSyncNode
->
pingTimerMS
,
pSyncNode
,
gSyncEnv
->
pTimerManager
,
&
pSyncNode
->
pPingTimer
);
if
(
syncEnvIsStart
())
{
taosTmrReset
(
syncNodeEqPingTimer
,
pSyncNode
->
pingTimerMS
,
pSyncNode
,
gSyncEnv
->
pTimerManager
,
&
pSyncNode
->
pPingTimer
);
}
else
{
sError
(
"sync env is stop, syncNodeEqPingTimer"
);
}
}
else
{
sTrace
(
"==syncNodeEqPingTimer== pingTimerLogicClock:%"
PRIu64
", pingTimerLogicClockUser:%"
PRIu64
""
,
pSyncNode
->
pingTimerLogicClock
,
pSyncNode
->
pingTimerLogicClockUser
);
...
...
@@ -1743,16 +1780,26 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
syncTimeout2RpcMsg
(
pSyncMsg
,
&
rpcMsg
);
syncRpcMsgLog2
((
char
*
)
"==syncNodeEqElectTimer=="
,
&
rpcMsg
);
if
(
pSyncNode
->
FpEqMsg
!=
NULL
)
{
pSyncNode
->
FpEqMsg
(
pSyncNode
->
msgcb
,
&
rpcMsg
);
int32_t
code
=
pSyncNode
->
FpEqMsg
(
pSyncNode
->
msgcb
,
&
rpcMsg
);
if
(
code
!=
0
)
{
sError
(
"vgId:%d sync enqueue elect msg error, code:%d"
,
pSyncNode
->
vgId
,
code
);
rpcFreeCont
(
rpcMsg
.
pCont
);
syncTimeoutDestroy
(
pSyncMsg
);
return
;
}
}
else
{
sTrace
(
"syncNodeEqElectTimer
pSyncNode->
FpEqMsg is NULL"
);
sTrace
(
"syncNodeEqElectTimer FpEqMsg is NULL"
);
}
syncTimeoutDestroy
(
pSyncMsg
);
// reset timer ms
pSyncNode
->
electTimerMS
=
syncUtilElectRandomMS
(
pSyncNode
->
electBaseLine
,
2
*
pSyncNode
->
electBaseLine
);
taosTmrReset
(
syncNodeEqElectTimer
,
pSyncNode
->
electTimerMS
,
pSyncNode
,
gSyncEnv
->
pTimerManager
,
&
pSyncNode
->
pElectTimer
);
if
(
syncEnvIsStart
())
{
pSyncNode
->
electTimerMS
=
syncUtilElectRandomMS
(
pSyncNode
->
electBaseLine
,
2
*
pSyncNode
->
electBaseLine
);
taosTmrReset
(
syncNodeEqElectTimer
,
pSyncNode
->
electTimerMS
,
pSyncNode
,
gSyncEnv
->
pTimerManager
,
&
pSyncNode
->
pElectTimer
);
}
else
{
sError
(
"sync env is stop, syncNodeEqElectTimer"
);
}
}
else
{
sTrace
(
"==syncNodeEqElectTimer== electTimerLogicClock:%"
PRIu64
", electTimerLogicClockUser:%"
PRIu64
""
,
pSyncNode
->
electTimerLogicClock
,
pSyncNode
->
electTimerLogicClockUser
);
...
...
@@ -1774,19 +1821,19 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
if
(
code
!=
0
)
{
sError
(
"vgId:%d sync enqueue timer msg error, code:%d"
,
pSyncNode
->
vgId
,
code
);
rpcFreeCont
(
rpcMsg
.
pCont
);
syncTimeoutDestroy
(
pSyncMsg
);
return
;
}
}
else
{
s
Trace
(
"syncNodeEqHeartbeatTimer pSyncNode->
FpEqMsg is NULL"
);
s
Error
(
"syncNodeEqHeartbeatTimer
FpEqMsg is NULL"
);
}
syncTimeoutDestroy
(
pSyncMsg
);
if
(
gSyncEnv
!=
NULL
)
{
taosTmrReset
(
syncNodeEqHeartbeatTimer
,
pSyncNode
->
heartbeatTimerMS
,
pSyncNode
,
gSyncEnv
->
pTimerManager
,
&
pSyncNode
->
pHeartbeatTimer
);
if
(
syncEnvIsStart
()
)
{
taosTmrReset
(
syncNodeEqHeartbeatTimer
,
pSyncNode
->
heartbeatTimerMS
,
pSyncNode
,
gSyncEnv
->
pTimerManager
,
&
pSyncNode
->
pHeartbeatTimer
);
}
else
{
sError
(
"sync env is
already stop
"
);
sError
(
"sync env is
stop, syncNodeEqHeartbeatTimer
"
);
}
}
else
{
sTrace
(
"==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%"
PRIu64
", heartbeatTimerLogicClockUser:%"
PRIu64
...
...
source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
浏览文件 @
4c7b24f1
...
...
@@ -338,7 +338,7 @@ int main(int argc, char** argv) {
if
(
alreadySend
<
writeRecordNum
)
{
SRpcMsg
*
pRpcMsg
=
createRpcMsg
(
alreadySend
,
writeRecordNum
,
myIndex
);
int32_t
ret
=
syncPropose
(
rid
,
pRpcMsg
,
false
);
if
(
ret
==
TAOS_SYNC_PROPOSE
_NOT_LEADER
)
{
if
(
ret
==
-
1
&&
terrno
==
TSDB_CODE_SYN
_NOT_LEADER
)
{
sTrace
(
"%s value%d write not leader"
,
s
,
alreadySend
);
}
else
{
assert
(
ret
==
0
);
...
...
source/libs/sync/test/syncConfigChangeTest.cpp
浏览文件 @
4c7b24f1
...
...
@@ -251,7 +251,7 @@ int main(int argc, char** argv) {
if
(
alreadySend
<
writeRecordNum
)
{
SRpcMsg
*
pRpcMsg
=
createRpcMsg
(
alreadySend
,
writeRecordNum
,
myIndex
);
int32_t
ret
=
syncPropose
(
rid
,
pRpcMsg
,
false
);
if
(
ret
==
TAOS_SYNC_PROPOSE
_NOT_LEADER
)
{
if
(
ret
==
-
1
&&
terrno
==
TSDB_CODE_SYN
_NOT_LEADER
)
{
sTrace
(
"%s value%d write not leader"
,
s
,
alreadySend
);
}
else
{
assert
(
ret
==
0
);
...
...
source/libs/sync/test/syncReplicateTest.cpp
浏览文件 @
4c7b24f1
...
...
@@ -188,7 +188,7 @@ int main(int argc, char** argv) {
if
(
alreadySend
<
writeRecordNum
)
{
SRpcMsg
*
pRpcMsg
=
createRpcMsg
(
alreadySend
,
writeRecordNum
,
myIndex
);
int32_t
ret
=
syncPropose
(
rid
,
pRpcMsg
,
false
);
if
(
ret
==
TAOS_SYNC_PROPOSE
_NOT_LEADER
)
{
if
(
ret
==
-
1
&&
terrno
==
TSDB_CODE_SYN
_NOT_LEADER
)
{
sTrace
(
"%s value%d write not leader"
,
s
,
alreadySend
);
}
else
{
assert
(
ret
==
0
);
...
...
source/libs/sync/test/syncTestTool.cpp
浏览文件 @
4c7b24f1
...
...
@@ -391,7 +391,7 @@ int main(int argc, char** argv) {
if
(
alreadySend
<
writeRecordNum
)
{
SRpcMsg
*
pRpcMsg
=
createRpcMsg
(
alreadySend
,
writeRecordNum
,
myIndex
);
int32_t
ret
=
syncPropose
(
rid
,
pRpcMsg
,
false
);
if
(
ret
==
TAOS_SYNC_PROPOSE
_NOT_LEADER
)
{
if
(
ret
==
-
1
&&
terrno
==
TSDB_CODE_SYN
_NOT_LEADER
)
{
sTrace
(
"%s value%d write not leader, leaderTransferWait:%d"
,
simpleStr
,
alreadySend
,
leaderTransferWait
);
}
else
{
assert
(
ret
==
0
);
...
...
source/util/src/terror.c
浏览文件 @
4c7b24f1
...
...
@@ -413,6 +413,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, "Invalid msg length")
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_INVALID_MSGTYPE
,
"Invalid msg type"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_NOT_LEADER
,
"Sync not leader"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_ONE_REPLICA
,
"Sync one replica"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_NOT_IN_NEW_CONFIG
,
"Sync not in new config"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_INTERNAL_ERROR
,
"Sync internal error"
)
// wal
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录