Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
80665854
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
80665854
编写于
12月 01, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
12月 01, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #18449 from taosdata/FIX/TD-19334-3.0
feat: implement pipelining of sync
上级
28399313
704885e2
变更
60
展开全部
隐藏空白更改
内联
并排
Showing
60 changed file
with
1972 addition
and
180 deletion
+1972
-180
include/libs/sync/sync.h
include/libs/sync/sync.h
+10
-8
include/libs/wal/wal.h
include/libs/wal/wal.h
+1
-1
include/os/osTime.h
include/os/osTime.h
+8
-0
include/util/tdef.h
include/util/tdef.h
+1
-0
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
+2
-0
source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
+2
-0
source/dnode/mgmt/mgmt_snode/src/smWorker.c
source/dnode/mgmt/mgmt_snode/src/smWorker.c
+1
-0
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+1
-1
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+11
-6
source/dnode/vnode/src/inc/vnd.h
source/dnode/vnode/src/inc/vnd.h
+1
-1
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+3
-3
source/dnode/vnode/src/tsdb/tsdbUtil.c
source/dnode/vnode/src/tsdb/tsdbUtil.c
+1
-2
source/dnode/vnode/src/vnd/vnodeBufPool.c
source/dnode/vnode/src/vnd/vnodeBufPool.c
+3
-0
source/dnode/vnode/src/vnd/vnodeCommit.c
source/dnode/vnode/src/vnd/vnodeCommit.c
+2
-2
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+3
-6
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+8
-0
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+17
-18
source/libs/sync/inc/syncCommit.h
source/libs/sync/inc/syncCommit.h
+5
-0
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+12
-2
source/libs/sync/inc/syncMessage.h
source/libs/sync/inc/syncMessage.h
+3
-1
source/libs/sync/inc/syncPipeline.h
source/libs/sync/inc/syncPipeline.h
+116
-0
source/libs/sync/inc/syncRaftEntry.h
source/libs/sync/inc/syncRaftEntry.h
+5
-1
source/libs/sync/inc/syncReplication.h
source/libs/sync/inc/syncReplication.h
+4
-0
source/libs/sync/inc/syncSnapshot.h
source/libs/sync/inc/syncSnapshot.h
+2
-0
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+122
-13
source/libs/sync/src/syncAppendEntriesReply.c
source/libs/sync/src/syncAppendEntriesReply.c
+53
-1
source/libs/sync/src/syncCommit.c
source/libs/sync/src/syncCommit.c
+40
-6
source/libs/sync/src/syncEnv.c
source/libs/sync/src/syncEnv.c
+3
-1
source/libs/sync/src/syncIndexMgr.c
source/libs/sync/src/syncIndexMgr.c
+9
-0
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+272
-19
source/libs/sync/src/syncMessage.c
source/libs/sync/src/syncMessage.c
+29
-0
source/libs/sync/src/syncPipeline.c
source/libs/sync/src/syncPipeline.c
+1090
-0
source/libs/sync/src/syncRaftCfg.c
source/libs/sync/src/syncRaftCfg.c
+0
-1
source/libs/sync/src/syncRaftEntry.c
source/libs/sync/src/syncRaftEntry.c
+5
-5
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+3
-7
source/libs/sync/src/syncReplication.c
source/libs/sync/src/syncReplication.c
+37
-6
source/libs/sync/src/syncRequestVote.c
source/libs/sync/src/syncRequestVote.c
+1
-1
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+15
-6
source/libs/sync/src/syncTimeout.c
source/libs/sync/src/syncTimeout.c
+1
-1
source/libs/sync/src/syncUtil.c
source/libs/sync/src/syncUtil.c
+17
-16
source/libs/sync/test/syncAppendEntriesBatchTest.cpp
source/libs/sync/test/syncAppendEntriesBatchTest.cpp
+1
-1
source/libs/sync/test/syncEnvTest.cpp
source/libs/sync/test/syncEnvTest.cpp
+2
-2
source/libs/sync/test/syncLocalCmdTest.cpp
source/libs/sync/test/syncLocalCmdTest.cpp
+6
-6
source/libs/sync/test/syncPreSnapshotReplyTest.cpp
source/libs/sync/test/syncPreSnapshotReplyTest.cpp
+6
-6
source/libs/sync/test/syncPreSnapshotTest.cpp
source/libs/sync/test/syncPreSnapshotTest.cpp
+6
-6
source/libs/sync/test/syncRespMgrTest.cpp
source/libs/sync/test/syncRespMgrTest.cpp
+1
-1
source/libs/sync/test/syncSnapshotSenderTest.cpp
source/libs/sync/test/syncSnapshotSenderTest.cpp
+3
-3
source/libs/sync/test/syncSnapshotTest.cpp
source/libs/sync/test/syncSnapshotTest.cpp
+0
-1
source/libs/sync/test/syncTest.cpp
source/libs/sync/test/syncTest.cpp
+1
-1
source/libs/sync/test/syncWriteTest.cpp
source/libs/sync/test/syncWriteTest.cpp
+0
-1
source/libs/sync/test/sync_test_lib/inc/syncTest.h
source/libs/sync/test/sync_test_lib/inc/syncTest.h
+7
-9
source/libs/sync/test/sync_test_lib/src/syncBatch.c
source/libs/sync/test/sync_test_lib/src/syncBatch.c
+1
-1
source/libs/sync/test/sync_test_lib/src/syncRaftStoreDebug.c
source/libs/sync/test/sync_test_lib/src/syncRaftStoreDebug.c
+1
-1
source/libs/tdb/src/db/tdbBtree.c
source/libs/tdb/src/db/tdbBtree.c
+2
-1
source/libs/transport/src/tmsgcb.c
source/libs/transport/src/tmsgcb.c
+2
-0
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+1
-0
source/libs/wal/src/walMeta.c
source/libs/wal/src/walMeta.c
+3
-0
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+8
-3
tests/pytest/util/cases.py
tests/pytest/util/cases.py
+1
-1
tests/system-test/0-others/taosdShell.py
tests/system-test/0-others/taosdShell.py
+1
-1
未找到文件。
include/libs/sync/sync.h
浏览文件 @
80665854
...
...
@@ -40,6 +40,8 @@ extern "C" {
#define SNAPSHOT_MAX_CLOCK_SKEW_MS 1000 * 10
#define SNAPSHOT_WAIT_MS 1000 * 30
#define SYNC_MAX_RETRY_BACKOFF 5
#define SYNC_LOG_REPL_RETRY_WAIT_MS 100
#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
#define SYNC_HEART_TIMEOUT_MS 1000 * 8
...
...
@@ -49,7 +51,7 @@ extern "C" {
#define SYNC_MAX_BATCH_SIZE 1
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1
#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF
#define SYNC_TERM_INVALID
-1 //
0xFFFFFFFFFFFFFFFF
typedef
enum
{
SYNC_STRATEGY_NO_SNAPSHOT
=
0
,
...
...
@@ -60,7 +62,7 @@ typedef enum {
typedef
uint64_t
SyncNodeId
;
typedef
int32_t
SyncGroupId
;
typedef
int64_t
SyncIndex
;
typedef
uint64_t
SyncTerm
;
typedef
int64_t
SyncTerm
;
typedef
struct
SSyncNode
SSyncNode
;
typedef
struct
SWal
SWal
;
...
...
@@ -136,13 +138,13 @@ typedef struct SSnapshotMeta {
typedef
struct
SSyncFSM
{
void
*
data
;
void
(
*
FpCommitCb
)(
const
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
);
void
(
*
FpPreCommitCb
)(
const
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
);
void
(
*
FpRollBackCb
)(
const
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
);
void
(
*
FpCommitCb
)(
const
struct
SSyncFSM
*
pFsm
,
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
);
void
(
*
FpPreCommitCb
)(
const
struct
SSyncFSM
*
pFsm
,
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
);
void
(
*
FpRollBackCb
)(
const
struct
SSyncFSM
*
pFsm
,
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
);
void
(
*
FpRestoreFinishCb
)(
const
struct
SSyncFSM
*
pFsm
);
void
(
*
FpReConfigCb
)(
const
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SReConfigCbMeta
*
pMeta
);
void
(
*
FpLeaderTransferCb
)(
const
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
);
void
(
*
FpReConfigCb
)(
const
struct
SSyncFSM
*
pFsm
,
SRpcMsg
*
pMsg
,
const
SReConfigCbMeta
*
pMeta
);
void
(
*
FpLeaderTransferCb
)(
const
struct
SSyncFSM
*
pFsm
,
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
);
bool
(
*
FpApplyQueueEmptyCb
)(
const
struct
SSyncFSM
*
pFsm
);
int32_t
(
*
FpApplyQueueItems
)(
const
struct
SSyncFSM
*
pFsm
);
...
...
@@ -224,7 +226,7 @@ typedef struct SSyncState {
int32_t
syncInit
();
void
syncCleanUp
();
int64_t
syncOpen
(
SSyncInfo
*
pSyncInfo
);
void
syncStart
(
int64_t
rid
);
int32_t
syncStart
(
int64_t
rid
);
void
syncStop
(
int64_t
rid
);
void
syncPreStop
(
int64_t
rid
);
int32_t
syncPropose
(
int64_t
rid
,
SRpcMsg
*
pMsg
,
bool
isWeak
);
...
...
include/libs/wal/wal.h
浏览文件 @
80665854
...
...
@@ -170,7 +170,7 @@ int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo
// Assign version automatically and return to caller,
// -1 will be returned for failed writes
int64_t
walAppendLog
(
SWal
*
,
tmsg_t
msgType
,
SWalSyncInfo
syncMeta
,
const
void
*
body
,
int32_t
bodyLen
);
int64_t
walAppendLog
(
SWal
*
,
int64_t
index
,
tmsg_t
msgType
,
SWalSyncInfo
syncMeta
,
const
void
*
body
,
int32_t
bodyLen
);
void
walFsync
(
SWal
*
,
bool
force
);
...
...
include/os/osTime.h
浏览文件 @
80665854
...
...
@@ -35,6 +35,7 @@ extern "C" {
#ifdef WINDOWS
#define CLOCK_REALTIME 0
#define CLOCK_MONOTONIC 0
#define MILLISECOND_PER_SECOND (1000i64)
#else
...
...
@@ -82,6 +83,13 @@ static FORCE_INLINE int64_t taosGetTimestampNs() {
return
(
int64_t
)
systemTime
.
tv_sec
*
1000000000LL
+
(
int64_t
)
systemTime
.
tv_nsec
;
}
//@return timestamp of monotonic clock in millisecond
static
FORCE_INLINE
int64_t
taosGetMonoTimestampMs
()
{
struct
timespec
systemTime
=
{
0
};
taosClockGetTime
(
CLOCK_MONOTONIC
,
&
systemTime
);
return
(
int64_t
)
systemTime
.
tv_sec
*
1000LL
+
(
int64_t
)
systemTime
.
tv_nsec
/
1000000
;
}
char
*
taosStrpTime
(
const
char
*
buf
,
const
char
*
fmt
,
struct
tm
*
tm
);
struct
tm
*
taosLocalTime
(
const
time_t
*
timep
,
struct
tm
*
result
);
struct
tm
*
taosLocalTimeNolock
(
struct
tm
*
result
,
const
time_t
*
timep
,
int
dst
);
...
...
include/util/tdef.h
浏览文件 @
80665854
...
...
@@ -281,6 +281,7 @@ typedef enum ELogicConditionType {
#define TSDB_DNODE_ROLE_VNODE 2
#define TSDB_MAX_REPLICA 5
#define TSDB_SYNC_LOG_BUFFER_SIZE 4096
#define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
...
...
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
浏览文件 @
80665854
...
...
@@ -162,11 +162,13 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
SRpcMsg
*
pMsg
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
),
RPC_QITEM
);
if
(
pMsg
==
NULL
)
return
-
1
;
memcpy
(
pMsg
,
pRpc
,
sizeof
(
SRpcMsg
));
pRpc
->
pCont
=
NULL
;
dTrace
(
"msg:%p, is created and will put into %s queue, type:%s"
,
pMsg
,
pWorker
->
name
,
TMSG_INFO
(
pRpc
->
msgType
));
int32_t
code
=
mmPutMsgToWorker
(
pMgmt
,
pWorker
,
pMsg
);
if
(
code
!=
0
)
{
dTrace
(
"msg:%p, is freed"
,
pMsg
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
return
code
;
...
...
source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
浏览文件 @
80665854
...
...
@@ -61,6 +61,7 @@ int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
SRpcMsg
*
pMsg
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
),
RPC_QITEM
);
if
(
pMsg
==
NULL
)
return
-
1
;
memcpy
(
pMsg
,
pRpc
,
sizeof
(
SRpcMsg
));
pRpc
->
pCont
=
NULL
;
switch
(
qtype
)
{
case
QUERY_QUEUE
:
...
...
@@ -74,6 +75,7 @@ int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
return
0
;
default:
terrno
=
TSDB_CODE_INVALID_PARA
;
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
return
-
1
;
}
...
...
source/dnode/mgmt/mgmt_snode/src/smWorker.c
浏览文件 @
80665854
...
...
@@ -151,6 +151,7 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
pHead
->
contLen
=
htonl
(
pHead
->
contLen
);
pHead
->
vgId
=
SNODE_HANDLE
;
memcpy
(
pMsg
,
pRpc
,
sizeof
(
SRpcMsg
));
pRpc
->
pCont
=
NULL
;
switch
(
qtype
)
{
case
STREAM_QUEUE
:
...
...
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
80665854
...
...
@@ -246,12 +246,12 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
pHead
->
contLen
=
htonl
(
pHead
->
contLen
);
pHead
->
vgId
=
htonl
(
pHead
->
vgId
);
memcpy
(
pMsg
,
pRpc
,
sizeof
(
SRpcMsg
));
pRpc
->
pCont
=
NULL
;
int32_t
code
=
vmPutMsgToQueue
(
pMgmt
,
pMsg
,
qtype
);
if
(
code
!=
0
)
{
dTrace
(
"msg:%p, is freed"
,
pMsg
);
rpcFreeCont
(
pMsg
->
pCont
);
pRpc
->
pCont
=
NULL
;
taosFreeQitem
(
pMsg
);
}
...
...
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
80665854
...
...
@@ -72,15 +72,11 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return
code
;
}
void
mnd
SyncCommitMsg
(
const
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
void
mnd
ProcessWriteMsg
(
const
SSyncFSM
*
pFsm
,
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
SMnode
*
pMnode
=
pFsm
->
data
;
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
SSdbRaw
*
pRaw
=
pMsg
->
pCont
;
// delete msg handle
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
info
=
pMsg
->
info
;
int32_t
transId
=
sdbGetIdFromRaw
(
pMnode
->
pSdb
,
pRaw
);
pMgmt
->
errCode
=
pMeta
->
code
;
mInfo
(
"trans:%d, is proposed, saved:%d code:0x%x, apply index:%"
PRId64
" term:%"
PRIu64
" config:%"
PRId64
...
...
@@ -120,6 +116,12 @@ void mndSyncCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMet
}
}
void
mndSyncCommitMsg
(
const
SSyncFSM
*
pFsm
,
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
mndProcessWriteMsg
(
pFsm
,
pMsg
,
pMeta
);
rpcFreeCont
(
pMsg
->
pCont
);
pMsg
->
pCont
=
NULL
;
}
int32_t
mndSyncGetSnapshot
(
const
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
,
void
*
pReaderParam
,
void
**
ppReader
)
{
mInfo
(
"start to read snapshot from sdb in atomic way"
);
SMnode
*
pMnode
=
pFsm
->
data
;
...
...
@@ -361,7 +363,10 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
void
mndSyncStart
(
SMnode
*
pMnode
)
{
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
syncStart
(
pMgmt
->
sync
);
if
(
syncStart
(
pMgmt
->
sync
)
<
0
)
{
mError
(
"vgId:1, failed to start sync, id:%"
PRId64
,
pMgmt
->
sync
);
return
;
}
mInfo
(
"vgId:1, sync started, id:%"
PRId64
,
pMgmt
->
sync
);
}
...
...
source/dnode/vnode/src/inc/vnd.h
浏览文件 @
80665854
...
...
@@ -97,7 +97,7 @@ bool vnodeShouldRollback(SVnode* pVnode);
// vnodeSync.c
int32_t
vnodeSyncOpen
(
SVnode
*
pVnode
,
char
*
path
);
void
vnodeSyncStart
(
SVnode
*
pVnode
);
int32_t
vnodeSyncStart
(
SVnode
*
pVnode
);
void
vnodeSyncPreClose
(
SVnode
*
pVnode
);
void
vnodeSyncClose
(
SVnode
*
pVnode
);
void
vnodeRedirectRpcMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
...
...
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
80665854
...
...
@@ -811,7 +811,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
int32_t
lino
=
0
;
memset
(
pCommitter
,
0
,
sizeof
(
*
pCommitter
));
ASSERT
(
pTsdb
->
mem
&&
pTsdb
->
imem
==
NULL
);
ASSERT
(
pTsdb
->
mem
&&
pTsdb
->
imem
==
NULL
&&
"last tsdb commit incomplete"
);
taosThreadRwlockWrlock
(
&
pTsdb
->
rwLock
);
pTsdb
->
imem
=
pTsdb
->
mem
;
...
...
@@ -1261,7 +1261,7 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk)
}
}
}
else
{
ASSERT
(
0
);
ASSERT
(
0
&&
"dup rows not allowed"
);
}
if
(
pBDataW
->
nRow
>=
pCommitter
->
maxRow
)
{
...
...
@@ -1682,4 +1682,4 @@ _exit:
tsdbInfo
(
"vgId:%d, tsdb rollback commit"
,
TD_VID
(
pTsdb
->
pVnode
));
}
return
code
;
}
\ No newline at end of file
}
source/dnode/vnode/src/tsdb/tsdbUtil.c
浏览文件 @
80665854
...
...
@@ -728,7 +728,7 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
taosArraySet
(
pMerger
->
pArray
,
iCol
,
pColVal
);
}
}
else
{
ASSERT
(
0
);
ASSERT
(
0
&&
"dup versions not allowed"
);
}
}
...
...
@@ -902,7 +902,6 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_clear
;
}
midx
=
(
sidx
+
eidx
)
/
2
;
code
=
tsdbBuildDeleteSkyline
(
aDelData
,
sidx
,
midx
,
aSkyline1
);
...
...
source/dnode/vnode/src/vnd/vnodeBufPool.c
浏览文件 @
80665854
...
...
@@ -176,6 +176,9 @@ void vnodeBufPoolRef(SVBufPool *pPool) {
}
void
vnodeBufPoolUnRef
(
SVBufPool
*
pPool
)
{
if
(
pPool
==
NULL
)
{
return
;
}
int32_t
nRef
=
atomic_sub_fetch_32
(
&
pPool
->
nRef
,
1
);
if
(
nRef
==
0
)
{
SVnode
*
pVnode
=
pPool
->
pVnode
;
...
...
source/dnode/vnode/src/vnd/vnodeCommit.c
浏览文件 @
80665854
...
...
@@ -209,8 +209,8 @@ int vnodeCommit(SVnode *pVnode) {
SVnodeInfo
info
=
{
0
};
char
dir
[
TSDB_FILENAME_LEN
];
vInfo
(
"vgId:%d, start to commit, commit ID:%"
PRId64
" version:%"
PRId64
,
TD_VID
(
pVnode
),
pVnode
->
state
.
commitID
,
pVnode
->
state
.
applied
);
vInfo
(
"vgId:%d, start to commit, commit ID:%"
PRId64
" version:%"
PRId64
" term: %"
PRId64
,
TD_VID
(
pVnode
)
,
pVnode
->
state
.
commitID
,
pVnode
->
state
.
applied
,
pVnode
->
state
.
applyTerm
);
// persist wal before starting
if
(
walPersist
(
pVnode
->
pWal
)
<
0
)
{
...
...
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
80665854
...
...
@@ -144,9 +144,9 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
pVnode
->
config
=
info
.
config
;
pVnode
->
state
.
committed
=
info
.
state
.
committed
;
pVnode
->
state
.
commitTerm
=
info
.
state
.
commitTerm
;
pVnode
->
state
.
applied
=
info
.
state
.
committed
;
pVnode
->
state
.
commitID
=
info
.
state
.
commitID
;
pVnode
->
state
.
commitTerm
=
info
.
state
.
commitTerm
;
pVnode
->
state
.
applied
=
info
.
state
.
committed
;
pVnode
->
state
.
applyTerm
=
info
.
state
.
commitTerm
;
pVnode
->
pTfs
=
pTfs
;
pVnode
->
msgCb
=
msgCb
;
taosThreadMutexInit
(
&
pVnode
->
lock
,
NULL
);
...
...
@@ -269,10 +269,7 @@ void vnodeClose(SVnode *pVnode) {
}
// start the sync timer after the queue is ready
int32_t
vnodeStart
(
SVnode
*
pVnode
)
{
vnodeSyncStart
(
pVnode
);
return
0
;
}
int32_t
vnodeStart
(
SVnode
*
pVnode
)
{
return
vnodeSyncStart
(
pVnode
);
}
void
vnodeStop
(
SVnode
*
pVnode
)
{}
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
80665854
...
...
@@ -178,8 +178,16 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
return
-
1
;
}
if
(
version
<=
pVnode
->
state
.
applied
)
{
vError
(
"vgId:%d, duplicate write request. version: %"
PRId64
", applied: %"
PRId64
""
,
TD_VID
(
pVnode
),
version
,
pVnode
->
state
.
applied
);
pRsp
->
info
.
handle
=
NULL
;
return
-
1
;
}
vDebug
(
"vgId:%d, start to process write request %s, index:%"
PRId64
,
TD_VID
(
pVnode
),
TMSG_INFO
(
pMsg
->
msgType
),
version
);
ASSERT
(
pVnode
->
state
.
applyTerm
<=
pMsg
->
info
.
conn
.
applyTerm
);
pVnode
->
state
.
applied
=
version
;
pVnode
->
state
.
applyTerm
=
pMsg
->
info
.
conn
.
applyTerm
;
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
80665854
...
...
@@ -295,36 +295,31 @@ static int32_t vnodeSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot)
return
0
;
}
static
void
vnodeSyncApplyMsg
(
const
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
static
void
vnodeSyncApplyMsg
(
const
SSyncFSM
*
pFsm
,
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
SVnode
*
pVnode
=
pFsm
->
data
;
SRpcMsg
rpcMsg
=
{.
msgType
=
pMsg
->
msgType
,
.
contLen
=
pMsg
->
contLen
};
rpcMsg
.
pCont
=
rpcMallocCont
(
rpcMsg
.
contLen
);
memcpy
(
rpcMsg
.
pCont
,
pMsg
->
pCont
,
pMsg
->
contLen
);
rpcMsg
.
info
=
pMsg
->
info
;
rpcMsg
.
info
.
conn
.
applyIndex
=
pMeta
->
index
;
rpcMsg
.
info
.
conn
.
applyTerm
=
pMeta
->
term
;
pMsg
->
info
.
conn
.
applyIndex
=
pMeta
->
index
;
pMsg
->
info
.
conn
.
applyTerm
=
pMeta
->
term
;
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
vGTrace
(
"vgId:%d, commit-cb is excuted, fsm:%p, index:%"
PRId64
", term:%"
PRIu64
", msg-index:%"
PRId64
", weak:%d, code:%d, state:%d %s, type:%s"
,
pVnode
->
config
.
vgId
,
pFsm
,
pMeta
->
index
,
pMeta
->
term
,
rpcMsg
.
info
.
conn
.
applyIndex
,
pMeta
->
isWeak
,
pMeta
->
code
,
pVnode
->
config
.
vgId
,
pFsm
,
pMeta
->
index
,
pMeta
->
term
,
pMsg
->
info
.
conn
.
applyIndex
,
pMeta
->
isWeak
,
pMeta
->
code
,
pMeta
->
state
,
syncStr
(
pMeta
->
state
),
TMSG_INFO
(
pMsg
->
msgType
));
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
APPLY_QUEUE
,
&
rpc
Msg
);
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
APPLY_QUEUE
,
p
Msg
);
}
static
void
vnodeSyncCommitMsg
(
const
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
static
void
vnodeSyncCommitMsg
(
const
SSyncFSM
*
pFsm
,
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
vnodeSyncApplyMsg
(
pFsm
,
pMsg
,
pMeta
);
}
static
void
vnodeSyncPreCommitMsg
(
const
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
static
void
vnodeSyncPreCommitMsg
(
const
SSyncFSM
*
pFsm
,
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
if
(
pMeta
->
isWeak
==
1
)
{
vnodeSyncApplyMsg
(
pFsm
,
pMsg
,
pMeta
);
}
}
static
void
vnodeSyncRollBackMsg
(
const
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
static
void
vnodeSyncRollBackMsg
(
const
SSyncFSM
*
pFsm
,
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
SVnode
*
pVnode
=
pFsm
->
data
;
vTrace
(
"vgId:%d, rollback-cb is excuted, fsm:%p, index:%"
PRId64
", weak:%d, code:%d, state:%d %s, type:%s"
,
pVnode
->
config
.
vgId
,
pFsm
,
pMeta
->
index
,
pMeta
->
isWeak
,
pMeta
->
code
,
pMeta
->
state
,
syncStr
(
pMeta
->
state
),
...
...
@@ -404,7 +399,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm) {
walApplyVer
(
pVnode
->
pWal
,
pVnode
->
state
.
applied
);
pVnode
->
restored
=
true
;
v
Debug
(
"vgId:%d, sync restore finished"
,
pVnode
->
config
.
vgId
);
v
Info
(
"vgId:%d, sync restore finished"
,
pVnode
->
config
.
vgId
);
}
static
void
vnodeBecomeFollower
(
const
SSyncFSM
*
pFsm
)
{
...
...
@@ -506,9 +501,13 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
return
0
;
}
void
vnodeSyncStart
(
SVnode
*
pVnode
)
{
int32_t
vnodeSyncStart
(
SVnode
*
pVnode
)
{
vInfo
(
"vgId:%d, start sync"
,
pVnode
->
config
.
vgId
);
syncStart
(
pVnode
->
sync
);
if
(
syncStart
(
pVnode
->
sync
)
<
0
)
{
vError
(
"vgId:%d, failed to start sync subsystem since %s"
,
pVnode
->
config
.
vgId
,
terrstr
());
return
-
1
;
}
return
0
;
}
void
vnodeSyncPreClose
(
SVnode
*
pVnode
)
{
...
...
@@ -553,12 +552,12 @@ bool vnodeIsLeader(SVnode *pVnode) {
}
else
{
terrno
=
TSDB_CODE_APP_NOT_READY
;
}
v
Debug
(
"vgId:%d, vnode not ready, state:%s, restore:%d"
,
pVnode
->
config
.
vgId
,
syncStr
(
state
.
state
),
state
.
restored
);
v
Info
(
"vgId:%d, vnode not ready, state:%s, restore:%d"
,
pVnode
->
config
.
vgId
,
syncStr
(
state
.
state
),
state
.
restored
);
return
false
;
}
if
(
!
pVnode
->
restored
)
{
v
Debug
(
"vgId:%d, vnode not restored"
,
pVnode
->
config
.
vgId
);
v
Info
(
"vgId:%d, vnode not restored"
,
pVnode
->
config
.
vgId
);
terrno
=
TSDB_CODE_APP_NOT_READY
;
return
false
;
}
...
...
source/libs/sync/inc/syncCommit.h
浏览文件 @
80665854
...
...
@@ -47,8 +47,13 @@ extern "C" {
//
void
syncOneReplicaAdvance
(
SSyncNode
*
pSyncNode
);
void
syncMaybeAdvanceCommitIndex
(
SSyncNode
*
pSyncNode
);
bool
syncAgreeIndex
(
SSyncNode
*
pSyncNode
,
SRaftId
*
pRaftId
,
SyncIndex
index
);
bool
syncAgree
(
SSyncNode
*
pSyncNode
,
SyncIndex
index
);
bool
syncNodeAgreedUpon
(
SSyncNode
*
pNode
,
SyncIndex
index
);
int64_t
syncNodeUpdateCommitIndex
(
SSyncNode
*
ths
,
SyncIndex
commitIndex
);
int64_t
syncNodeCheckCommitIndex
(
SSyncNode
*
ths
,
SyncIndex
indexLikely
);
#ifdef __cplusplus
}
...
...
source/libs/sync/inc/syncInt.h
浏览文件 @
80665854
...
...
@@ -50,6 +50,8 @@ typedef struct SyncPreSnapshotReply SyncPreSnapshotReply;
typedef
struct
SyncHeartbeatReply
SyncHeartbeatReply
;
typedef
struct
SyncHeartbeat
SyncHeartbeat
;
typedef
struct
SyncPreSnapshot
SyncPreSnapshot
;
typedef
struct
SSyncLogBuffer
SSyncLogBuffer
;
typedef
struct
SSyncLogReplMgr
SSyncLogReplMgr
;
typedef
struct
SRaftId
{
SyncNodeId
addr
;
...
...
@@ -97,6 +99,7 @@ typedef struct SSyncNode {
char
configPath
[
TSDB_FILENAME_LEN
*
2
];
// sync io
SSyncLogBuffer
*
pLogBuf
;
SWal
*
pWal
;
const
SMsgCb
*
msgcb
;
int32_t
(
*
syncSendMSg
)(
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
);
...
...
@@ -179,6 +182,9 @@ typedef struct SSyncNode {
SSyncSnapshotSender
*
senders
[
TSDB_MAX_REPLICA
];
SSyncSnapshotReceiver
*
pNewNodeReceiver
;
// log replication mgr
SSyncLogReplMgr
*
logReplMgrs
[
TSDB_MAX_REPLICA
];
SPeerState
peerStates
[
TSDB_MAX_REPLICA
];
// is config changing
...
...
@@ -205,11 +211,12 @@ typedef struct SSyncNode {
// open/close --------------
SSyncNode
*
syncNodeOpen
(
SSyncInfo
*
pSyncInfo
);
void
syncNodeStart
(
SSyncNode
*
pSyncNode
);
void
syncNodeStartStandBy
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeStart
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeStartStandBy
(
SSyncNode
*
pSyncNode
);
void
syncNodeClose
(
SSyncNode
*
pSyncNode
);
void
syncNodePreClose
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodePropose
(
SSyncNode
*
pSyncNode
,
SRpcMsg
*
pMsg
,
bool
isWeak
);
int32_t
syncNodeRestore
(
SSyncNode
*
pSyncNode
);
void
syncHbTimerDataFree
(
SSyncHbTimerData
*
pData
);
// on message ---------------------
...
...
@@ -261,6 +268,9 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode);
void
syncNodeVoteForTerm
(
SSyncNode
*
pSyncNode
,
SyncTerm
term
,
SRaftId
*
pRaftId
);
void
syncNodeVoteForSelf
(
SSyncNode
*
pSyncNode
);
// log replication
SSyncLogReplMgr
*
syncNodeGetLogReplMgr
(
SSyncNode
*
pNode
,
SRaftId
*
pDestId
);
// snapshot --------------
bool
syncNodeHasSnapshot
(
SSyncNode
*
pSyncNode
);
void
syncNodeMaybeUpdateCommitBySnapshot
(
SSyncNode
*
pSyncNode
);
...
...
source/libs/sync/inc/syncMessage.h
浏览文件 @
80665854
...
...
@@ -106,7 +106,7 @@ typedef struct SyncAppendEntriesReply {
SRaftId
destId
;
// private data
SyncTerm
term
;
SyncTerm
private
Term
;
SyncTerm
lastMatch
Term
;
bool
success
;
SyncIndex
matchIndex
;
SyncIndex
lastSendIndex
;
...
...
@@ -246,6 +246,8 @@ int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId);
int32_t
syncBuildRequestVoteReply
(
SRpcMsg
*
pMsg
,
int32_t
vgId
);
int32_t
syncBuildAppendEntries
(
SRpcMsg
*
pMsg
,
int32_t
dataLen
,
int32_t
vgId
);
int32_t
syncBuildAppendEntriesReply
(
SRpcMsg
*
pMsg
,
int32_t
vgId
);
int32_t
syncBuildAppendEntriesFromRaftLog
(
SSyncNode
*
pNode
,
SSyncRaftEntry
*
pEntry
,
SyncTerm
prevLogTerm
,
SRpcMsg
*
pRpcMsg
);
int32_t
syncBuildHeartbeat
(
SRpcMsg
*
pMsg
,
int32_t
vgId
);
int32_t
syncBuildHeartbeatReply
(
SRpcMsg
*
pMsg
,
int32_t
vgId
);
int32_t
syncBuildPreSnapshot
(
SRpcMsg
*
pMsg
,
int32_t
vgId
);
...
...
source/libs/sync/inc/syncPipeline.h
0 → 100644
浏览文件 @
80665854
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_PIPELINE_H
#define _TD_LIBS_SYNC_PIPELINE_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include "syncInt.h"
typedef
struct
SSyncReplInfo
{
bool
barrier
;
bool
acked
;
int64_t
timeMs
;
int64_t
term
;
}
SSyncReplInfo
;
typedef
struct
SSyncLogReplMgr
{
SSyncReplInfo
states
[
TSDB_SYNC_LOG_BUFFER_SIZE
];
int64_t
startIndex
;
int64_t
matchIndex
;
int64_t
endIndex
;
int64_t
size
;
bool
restored
;
int64_t
peerStartTime
;
int32_t
retryBackoff
;
int32_t
peerId
;
}
SSyncLogReplMgr
;
typedef
struct
SSyncLogBufEntry
{
SSyncRaftEntry
*
pItem
;
SyncIndex
prevLogIndex
;
SyncTerm
prevLogTerm
;
}
SSyncLogBufEntry
;
typedef
struct
SSyncLogBuffer
{
SSyncLogBufEntry
entries
[
TSDB_SYNC_LOG_BUFFER_SIZE
];
int64_t
startIndex
;
int64_t
commitIndex
;
int64_t
matchIndex
;
int64_t
endIndex
;
int64_t
size
;
TdThreadMutex
mutex
;
TdThreadMutexAttr
attr
;
}
SSyncLogBuffer
;
// SSyncLogRepMgr
SSyncLogReplMgr
*
syncLogReplMgrCreate
();
void
syncLogReplMgrDestroy
(
SSyncLogReplMgr
*
pMgr
);
int32_t
syncLogReplMgrReset
(
SSyncLogReplMgr
*
pMgr
);
int32_t
syncNodeLogReplMgrInit
(
SSyncNode
*
pNode
);
void
syncNodeLogReplMgrDestroy
(
SSyncNode
*
pNode
);
// access
static
FORCE_INLINE
int64_t
syncLogGetRetryBackoffTimeMs
(
SSyncLogReplMgr
*
pMgr
)
{
return
(
1
<<
pMgr
->
retryBackoff
)
*
SYNC_LOG_REPL_RETRY_WAIT_MS
;
}
static
FORCE_INLINE
int32_t
syncLogGetNextRetryBackoff
(
SSyncLogReplMgr
*
pMgr
)
{
return
TMIN
(
pMgr
->
retryBackoff
+
1
,
SYNC_MAX_RETRY_BACKOFF
);
}
SyncTerm
syncLogReplMgrGetPrevLogTerm
(
SSyncLogReplMgr
*
pMgr
,
SSyncNode
*
pNode
,
SyncIndex
index
);
int32_t
syncLogReplMgrReplicateOnce
(
SSyncLogReplMgr
*
pMgr
,
SSyncNode
*
pNode
);
int32_t
syncLogBufferReplicateOneTo
(
SSyncLogReplMgr
*
pMgr
,
SSyncNode
*
pNode
,
SyncIndex
index
,
SyncTerm
*
pTerm
,
SRaftId
*
pDestId
,
bool
*
pBarrier
);
int32_t
syncLogReplMgrReplicateAttemptedOnce
(
SSyncLogReplMgr
*
pMgr
,
SSyncNode
*
pNode
);
int32_t
syncLogReplMgrReplicateProbeOnce
(
SSyncLogReplMgr
*
pMgr
,
SSyncNode
*
pNode
,
SyncIndex
index
);
int32_t
syncLogReplMgrProcessReply
(
SSyncLogReplMgr
*
pMgr
,
SSyncNode
*
pNode
,
SyncAppendEntriesReply
*
pMsg
);
int32_t
syncLogReplMgrProcessReplyInRecoveryMode
(
SSyncLogReplMgr
*
pMgr
,
SSyncNode
*
pNode
,
SyncAppendEntriesReply
*
pMsg
);
int32_t
syncLogReplMgrProcessReplyInNormalMode
(
SSyncLogReplMgr
*
pMgr
,
SSyncNode
*
pNode
,
SyncAppendEntriesReply
*
pMsg
);
int32_t
syncLogReplMgrProcessHeartbeatReply
(
SSyncLogReplMgr
*
pMgr
,
SSyncNode
*
pNode
,
SyncHeartbeatReply
*
pMsg
);
int32_t
syncLogReplMgrRetryOnNeed
(
SSyncLogReplMgr
*
pMgr
,
SSyncNode
*
pNode
);
// SSyncLogBuffer
SSyncLogBuffer
*
syncLogBufferCreate
();
void
syncLogBufferDestroy
(
SSyncLogBuffer
*
pBuf
);
int32_t
syncLogBufferInit
(
SSyncLogBuffer
*
pBuf
,
SSyncNode
*
pNode
);
int32_t
syncLogBufferReInit
(
SSyncLogBuffer
*
pBuf
,
SSyncNode
*
pNode
);
// access
int64_t
syncLogBufferGetEndIndex
(
SSyncLogBuffer
*
pBuf
);
int32_t
syncLogBufferAppend
(
SSyncLogBuffer
*
pBuf
,
SSyncNode
*
pNode
,
SSyncRaftEntry
*
pEntry
);
int32_t
syncLogBufferAccept
(
SSyncLogBuffer
*
pBuf
,
SSyncNode
*
pNode
,
SSyncRaftEntry
*
pEntry
,
SyncTerm
prevTerm
);
int64_t
syncLogBufferProceed
(
SSyncLogBuffer
*
pBuf
,
SSyncNode
*
pNode
,
SyncTerm
*
pMatchTerm
);
int32_t
syncLogBufferCommit
(
SSyncLogBuffer
*
pBuf
,
SSyncNode
*
pNode
,
int64_t
commitIndex
);
int32_t
syncLogBufferReset
(
SSyncLogBuffer
*
pBuf
,
SSyncNode
*
pNode
);
// private
SSyncRaftEntry
*
syncLogBufferGetOneEntry
(
SSyncLogBuffer
*
pBuf
,
SSyncNode
*
pNode
,
SyncIndex
index
,
bool
*
pInBuf
);
int32_t
syncLogBufferValidate
(
SSyncLogBuffer
*
pBuf
);
int32_t
syncLogBufferRollback
(
SSyncLogBuffer
*
pBuf
,
SyncIndex
toIndex
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_LIBS_SYNC_PIPELINE_H*/
source/libs/sync/inc/syncRaftEntry.h
浏览文件 @
80665854
...
...
@@ -42,9 +42,13 @@ SSyncRaftEntry* syncEntryBuildFromClientRequest(const SyncClientRequest* pMsg, S
SSyncRaftEntry
*
syncEntryBuildFromRpcMsg
(
const
SRpcMsg
*
pMsg
,
SyncTerm
term
,
SyncIndex
index
);
SSyncRaftEntry
*
syncEntryBuildFromAppendEntries
(
const
SyncAppendEntries
*
pMsg
);
SSyncRaftEntry
*
syncEntryBuildNoop
(
SyncTerm
term
,
SyncIndex
index
,
int32_t
vgId
);
void
syncEntryDest
or
y
(
SSyncRaftEntry
*
pEntry
);
void
syncEntryDest
ro
y
(
SSyncRaftEntry
*
pEntry
);
void
syncEntry2OriginalRpc
(
const
SSyncRaftEntry
*
pEntry
,
SRpcMsg
*
pRpcMsg
);
// step 7
static
FORCE_INLINE
bool
syncLogIsReplicationBarrier
(
SSyncRaftEntry
*
pEntry
)
{
return
pEntry
->
originalRpcType
==
TDMT_SYNC_NOOP
;
}
typedef
struct
SRaftEntryHashCache
{
SHashObj
*
pEntryHash
;
int32_t
maxCount
;
...
...
source/libs/sync/inc/syncReplication.h
浏览文件 @
80665854
...
...
@@ -52,6 +52,10 @@ int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpc
int32_t
syncNodeReplicate
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeReplicateOne
(
SSyncNode
*
pSyncNode
,
SRaftId
*
pDestId
,
bool
snapshot
);
int32_t
syncNodeReplicateWithoutLock
(
SSyncNode
*
pNode
);
int32_t
syncNodeSendAppendEntries
(
SSyncNode
*
pNode
,
const
SRaftId
*
destRaftId
,
SRpcMsg
*
pRpcMsg
);
int32_t
syncNodeMaybeSendAppendEntries
(
SSyncNode
*
pNode
,
const
SRaftId
*
destRaftId
,
SRpcMsg
*
pRpcMsg
);
#ifdef __cplusplus
}
...
...
source/libs/sync/inc/syncSnapshot.h
浏览文件 @
80665854
...
...
@@ -87,6 +87,8 @@ void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceive
int32_t
syncNodeOnSnapshot
(
SSyncNode
*
ths
,
const
SRpcMsg
*
pMsg
);
int32_t
syncNodeOnSnapshotReply
(
SSyncNode
*
ths
,
const
SRpcMsg
*
pMsg
);
SyncIndex
syncNodeGetSnapshotConfigIndex
(
SSyncNode
*
pSyncNode
,
SyncIndex
snapshotLastApplyIndex
);
// start
#ifdef __cplusplus
...
...
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
80665854
...
...
@@ -15,10 +15,13 @@
#define _DEFAULT_SOURCE
#include "syncAppendEntries.h"
#include "syncPipeline.h"
#include "syncMessage.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncReplication.h"
#include "syncUtil.h"
#include "syncCommit.h"
// TLA+ Spec
// HandleAppendEntriesRequest(i, j, m) ==
...
...
@@ -124,9 +127,113 @@ int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) {
return
0
;
}
SSyncRaftEntry
*
syncLogAppendEntriesToRaftEntry
(
const
SyncAppendEntries
*
pMsg
)
{
SSyncRaftEntry
*
pEntry
=
taosMemoryMalloc
(
pMsg
->
dataLen
);
if
(
pEntry
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
(
void
)
memcpy
(
pEntry
,
pMsg
->
data
,
pMsg
->
dataLen
);
ASSERT
(
pEntry
->
bytes
==
pMsg
->
dataLen
);
return
pEntry
;
}
int32_t
syncNodeOnAppendEntries
(
SSyncNode
*
ths
,
const
SRpcMsg
*
pRpcMsg
)
{
SyncAppendEntries
*
pMsg
=
pRpcMsg
->
pCont
;
SRpcMsg
rpcRsp
=
{
0
};
bool
accepted
=
false
;
// if already drop replica, do not process
if
(
!
syncNodeInRaftGroup
(
ths
,
&
(
pMsg
->
srcId
)))
{
syncLogRecvAppendEntries
(
ths
,
pMsg
,
"not in my config"
);
goto
_IGNORE
;
}
int32_t
code
=
syncBuildAppendEntriesReply
(
&
rpcRsp
,
ths
->
vgId
);
if
(
code
!=
0
)
{
syncLogRecvAppendEntries
(
ths
,
pMsg
,
"build rsp error"
);
goto
_IGNORE
;
}
SyncAppendEntriesReply
*
pReply
=
rpcRsp
.
pCont
;
// prepare response msg
pReply
->
srcId
=
ths
->
myRaftId
;
pReply
->
destId
=
pMsg
->
srcId
;
pReply
->
term
=
ths
->
pRaftStore
->
currentTerm
;
pReply
->
success
=
false
;
pReply
->
matchIndex
=
SYNC_INDEX_INVALID
;
pReply
->
lastSendIndex
=
pMsg
->
prevLogIndex
+
1
;
pReply
->
startTime
=
ths
->
startTime
;
if
(
pMsg
->
term
<
ths
->
pRaftStore
->
currentTerm
)
{
goto
_SEND_RESPONSE
;
}
if
(
pMsg
->
term
>
ths
->
pRaftStore
->
currentTerm
)
{
pReply
->
term
=
pMsg
->
term
;
}
syncNodeStepDown
(
ths
,
pMsg
->
term
);
syncNodeResetElectTimer
(
ths
);
if
(
pMsg
->
dataLen
<
(
int32_t
)
sizeof
(
SSyncRaftEntry
))
{
sError
(
"vgId:%d, incomplete append entries received. prev index:%"
PRId64
", term:%"
PRId64
", datalen:%d"
,
ths
->
vgId
,
pMsg
->
prevLogIndex
,
pMsg
->
prevLogTerm
,
pMsg
->
dataLen
);
goto
_IGNORE
;
}
SSyncRaftEntry
*
pEntry
=
syncLogAppendEntriesToRaftEntry
(
pMsg
);
if
(
pEntry
==
NULL
)
{
sError
(
"vgId:%d, failed to get raft entry from append entries since %s"
,
ths
->
vgId
,
terrstr
());
goto
_IGNORE
;
}
if
(
pMsg
->
prevLogIndex
+
1
!=
pEntry
->
index
||
pEntry
->
term
<
0
)
{
sError
(
"vgId:%d, invalid previous log index in msg. index:%"
PRId64
", term:%"
PRId64
", prevLogIndex:%"
PRId64
", prevLogTerm:%"
PRId64
,
ths
->
vgId
,
pEntry
->
index
,
pEntry
->
term
,
pMsg
->
prevLogIndex
,
pMsg
->
prevLogTerm
);
goto
_IGNORE
;
}
sTrace
(
"vgId:%d, recv append entries msg. index:%"
PRId64
", term:%"
PRId64
", preLogIndex:%"
PRId64
", prevLogTerm:%"
PRId64
" commitIndex:%"
PRId64
""
,
pMsg
->
vgId
,
pMsg
->
prevLogIndex
+
1
,
pMsg
->
term
,
pMsg
->
prevLogIndex
,
pMsg
->
prevLogTerm
,
pMsg
->
commitIndex
);
// accept
if
(
syncLogBufferAccept
(
ths
->
pLogBuf
,
ths
,
pEntry
,
pMsg
->
prevLogTerm
)
<
0
)
{
goto
_SEND_RESPONSE
;
}
accepted
=
true
;
_SEND_RESPONSE:
pReply
->
matchIndex
=
syncLogBufferProceed
(
ths
->
pLogBuf
,
ths
,
&
pReply
->
lastMatchTerm
);
bool
matched
=
(
pReply
->
matchIndex
>=
pReply
->
lastSendIndex
);
if
(
accepted
&&
matched
)
{
pReply
->
success
=
true
;
// update commit index only after matching
(
void
)
syncNodeUpdateCommitIndex
(
ths
,
pMsg
->
commitIndex
);
}
// ack, i.e. send response
(
void
)
syncNodeSendMsgById
(
&
pReply
->
destId
,
ths
,
&
rpcRsp
);
// commit index, i.e. leader notice me
if
(
syncLogBufferCommit
(
ths
->
pLogBuf
,
ths
,
ths
->
commitIndex
)
<
0
)
{
sError
(
"vgId:%d, failed to commit raft fsm log since %s."
,
ths
->
vgId
,
terrstr
());
goto
_out
;
}
_out:
return
0
;
_IGNORE:
rpcFreeCont
(
rpcRsp
.
pCont
);
return
0
;
}
int32_t
syncNodeOnAppendEntriesOld
(
SSyncNode
*
ths
,
const
SRpcMsg
*
pRpcMsg
)
{
SyncAppendEntries
*
pMsg
=
pRpcMsg
->
pCont
;
SRpcMsg
rpcRsp
=
{
0
};
// if already drop replica, do not process
if
(
!
syncNodeInRaftGroup
(
ths
,
&
(
pMsg
->
srcId
)))
{
...
...
@@ -233,18 +340,20 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
if
(
hLocal
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
hLocal
,
false
);
}
else
{
syncEntryDest
or
y
(
pLocalEntry
);
syncEntryDest
ro
y
(
pLocalEntry
);
}
if
(
hAppend
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
hAppend
,
false
);
}
else
{
syncEntryDest
or
y
(
pAppendEntry
);
syncEntryDest
ro
y
(
pAppendEntry
);
}
goto
_IGNORE
;
}
ASSERT
(
pAppendEntry
->
index
==
appendIndex
);
// append
code
=
ths
->
pLogStore
->
syncLogAppendEntry
(
ths
->
pLogStore
,
pAppendEntry
);
if
(
code
!=
0
)
{
...
...
@@ -255,13 +364,13 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
if
(
hLocal
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
hLocal
,
false
);
}
else
{
syncEntryDest
or
y
(
pLocalEntry
);
syncEntryDest
ro
y
(
pLocalEntry
);
}
if
(
hAppend
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
hAppend
,
false
);
}
else
{
syncEntryDest
or
y
(
pAppendEntry
);
syncEntryDest
ro
y
(
pAppendEntry
);
}
goto
_IGNORE
;
...
...
@@ -281,8 +390,8 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"ignore, log not exist, truncate error, append-index:%"
PRId64
,
appendIndex
);
syncLogRecvAppendEntries
(
ths
,
pMsg
,
logBuf
);
syncEntryDest
or
y
(
pLocalEntry
);
syncEntryDest
or
y
(
pAppendEntry
);
syncEntryDest
ro
y
(
pLocalEntry
);
syncEntryDest
ro
y
(
pAppendEntry
);
goto
_IGNORE
;
}
...
...
@@ -296,13 +405,13 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
if
(
hLocal
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
hLocal
,
false
);
}
else
{
syncEntryDest
or
y
(
pLocalEntry
);
syncEntryDest
ro
y
(
pLocalEntry
);
}
if
(
hAppend
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
hAppend
,
false
);
}
else
{
syncEntryDest
or
y
(
pAppendEntry
);
syncEntryDest
ro
y
(
pAppendEntry
);
}
goto
_IGNORE
;
...
...
@@ -320,13 +429,13 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
if
(
hLocal
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
hLocal
,
false
);
}
else
{
syncEntryDest
or
y
(
pLocalEntry
);
syncEntryDest
ro
y
(
pLocalEntry
);
}
if
(
hAppend
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
hAppend
,
false
);
}
else
{
syncEntryDest
or
y
(
pAppendEntry
);
syncEntryDest
ro
y
(
pAppendEntry
);
}
goto
_IGNORE
;
...
...
@@ -339,13 +448,13 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
if
(
hLocal
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
hLocal
,
false
);
}
else
{
syncEntryDest
or
y
(
pLocalEntry
);
syncEntryDest
ro
y
(
pLocalEntry
);
}
if
(
hAppend
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
hAppend
,
false
);
}
else
{
syncEntryDest
or
y
(
pAppendEntry
);
syncEntryDest
ro
y
(
pAppendEntry
);
}
}
else
{
...
...
@@ -373,4 +482,4 @@ _SEND_RESPONSE:
// send response
syncNodeSendMsgById
(
&
pReply
->
destId
,
ths
,
&
rpcRsp
);
return
0
;
}
\ No newline at end of file
}
source/libs/sync/src/syncAppendEntriesReply.c
浏览文件 @
80665854
...
...
@@ -17,7 +17,9 @@
#include "syncAppendEntriesReply.h"
#include "syncCommit.h"
#include "syncIndexMgr.h"
#include "syncPipeline.h"
#include "syncMessage.h"
#include "syncRaftEntry.h"
#include "syncRaftStore.h"
#include "syncReplication.h"
#include "syncSnapshot.h"
...
...
@@ -38,8 +40,58 @@
//
int32_t
syncNodeOnAppendEntriesReply
(
SSyncNode
*
ths
,
const
SRpcMsg
*
pRpcMsg
)
{
int32_t
ret
=
0
;
SyncAppendEntriesReply
*
pMsg
=
pRpcMsg
->
pCont
;
int32_t
ret
=
0
;
// if already drop replica, do not process
if
(
!
syncNodeInRaftGroup
(
ths
,
&
(
pMsg
->
srcId
)))
{
syncLogRecvAppendEntriesReply
(
ths
,
pMsg
,
"not in my config"
);
return
0
;
}
// drop stale response
if
(
pMsg
->
term
<
ths
->
pRaftStore
->
currentTerm
)
{
syncLogRecvAppendEntriesReply
(
ths
,
pMsg
,
"drop stale response"
);
return
0
;
}
if
(
ths
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
if
(
pMsg
->
term
>
ths
->
pRaftStore
->
currentTerm
)
{
syncLogRecvAppendEntriesReply
(
ths
,
pMsg
,
"error term"
);
syncNodeStepDown
(
ths
,
pMsg
->
term
);
return
-
1
;
}
ASSERT
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
);
sTrace
(
"vgId:%d received append entries reply. srcId:0x%016"
PRIx64
", term:%"
PRId64
", matchIndex:%"
PRId64
""
,
pMsg
->
vgId
,
pMsg
->
srcId
.
addr
,
pMsg
->
term
,
pMsg
->
matchIndex
);
if
(
pMsg
->
success
)
{
SyncIndex
oldMatchIndex
=
syncIndexMgrGetIndex
(
ths
->
pMatchIndex
,
&
(
pMsg
->
srcId
));
if
(
pMsg
->
matchIndex
>
oldMatchIndex
)
{
syncIndexMgrSetIndex
(
ths
->
pMatchIndex
,
&
(
pMsg
->
srcId
),
pMsg
->
matchIndex
);
}
// commit if needed
SyncIndex
indexLikely
=
TMIN
(
pMsg
->
matchIndex
,
ths
->
pLogBuf
->
matchIndex
);
SyncIndex
commitIndex
=
syncNodeCheckCommitIndex
(
ths
,
indexLikely
);
(
void
)
syncLogBufferCommit
(
ths
->
pLogBuf
,
ths
,
commitIndex
);
}
// replicate log
SSyncLogReplMgr
*
pMgr
=
syncNodeGetLogReplMgr
(
ths
,
&
pMsg
->
srcId
);
if
(
pMgr
==
NULL
)
{
sError
(
"vgId:%d, failed to get log repl mgr for src addr: 0x%016"
PRIx64
,
ths
->
vgId
,
pMsg
->
srcId
.
addr
);
return
-
1
;
}
(
void
)
syncLogReplMgrProcessReply
(
pMgr
,
ths
,
pMsg
);
}
return
0
;
}
int32_t
syncNodeOnAppendEntriesReplyOld
(
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
)
{
int32_t
ret
=
0
;
// if already drop replica, do not process
if
(
!
syncNodeInRaftGroup
(
ths
,
&
(
pMsg
->
srcId
)))
{
...
...
source/libs/sync/src/syncCommit.c
浏览文件 @
80665854
...
...
@@ -84,6 +84,7 @@ void syncOneReplicaAdvance(SSyncNode* pSyncNode) {
}
void
syncMaybeAdvanceCommitIndex
(
SSyncNode
*
pSyncNode
)
{
ASSERT
(
false
&&
"deprecated"
);
if
(
pSyncNode
==
NULL
)
{
sError
(
"pSyncNode is NULL"
);
return
;
...
...
@@ -138,7 +139,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
if
(
h
)
{
taosLRUCacheRelease
(
pCache
,
h
,
false
);
}
else
{
syncEntryDest
or
y
(
pEntry
);
syncEntryDest
ro
y
(
pEntry
);
}
break
;
...
...
@@ -150,7 +151,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
if
(
h
)
{
taosLRUCacheRelease
(
pCache
,
h
,
false
);
}
else
{
syncEntryDest
or
y
(
pEntry
);
syncEntryDest
ro
y
(
pEntry
);
}
}
}
...
...
@@ -286,15 +287,48 @@ bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {
}
*/
bool
syncAgree
(
SSyncNode
*
pSyncNode
,
SyncIndex
index
)
{
bool
syncNodeAgreedUpon
(
SSyncNode
*
pNode
,
SyncIndex
index
)
{
int
count
=
0
;
SSyncIndexMgr
*
pMatches
=
pNode
->
pMatchIndex
;
ASSERT
(
pNode
->
replicaNum
==
pMatches
->
replicaNum
);
for
(
int
i
=
0
;
i
<
pNode
->
replicaNum
;
i
++
)
{
SyncIndex
matchIndex
=
pMatches
->
index
[
i
];
if
(
matchIndex
>=
index
)
{
count
++
;
}
}
return
count
>=
pNode
->
quorum
;
}
bool
syncAgree
(
SSyncNode
*
pNode
,
SyncIndex
index
)
{
int
agreeCount
=
0
;
for
(
int
i
=
0
;
i
<
p
Sync
Node
->
replicaNum
;
++
i
)
{
if
(
syncAgreeIndex
(
p
SyncNode
,
&
(
pSync
Node
->
replicasId
[
i
]),
index
))
{
for
(
int
i
=
0
;
i
<
pNode
->
replicaNum
;
++
i
)
{
if
(
syncAgreeIndex
(
p
Node
,
&
(
p
Node
->
replicasId
[
i
]),
index
))
{
++
agreeCount
;
}
if
(
agreeCount
>=
p
Sync
Node
->
quorum
)
{
if
(
agreeCount
>=
pNode
->
quorum
)
{
return
true
;
}
}
return
false
;
}
int64_t
syncNodeUpdateCommitIndex
(
SSyncNode
*
ths
,
SyncIndex
commitIndex
)
{
SyncIndex
lastVer
=
ths
->
pLogStore
->
syncLogLastIndex
(
ths
->
pLogStore
);
commitIndex
=
TMAX
(
commitIndex
,
ths
->
commitIndex
);
ths
->
commitIndex
=
TMIN
(
commitIndex
,
lastVer
);
ths
->
pLogStore
->
syncLogUpdateCommitIndex
(
ths
->
pLogStore
,
ths
->
commitIndex
);
return
ths
->
commitIndex
;
}
int64_t
syncNodeCheckCommitIndex
(
SSyncNode
*
ths
,
SyncIndex
indexLikely
)
{
if
(
indexLikely
>
ths
->
commitIndex
&&
syncNodeAgreedUpon
(
ths
,
indexLikely
))
{
SyncIndex
commitIndex
=
indexLikely
;
syncNodeUpdateCommitIndex
(
ths
,
commitIndex
);
sTrace
(
"vgId:%d, agreed upon. role:%d, term:%"
PRId64
", index: %"
PRId64
""
,
ths
->
vgId
,
ths
->
state
,
ths
->
pRaftStore
->
currentTerm
,
commitIndex
);
}
return
ths
->
commitIndex
;
}
source/libs/sync/src/syncEnv.c
浏览文件 @
80665854
...
...
@@ -100,7 +100,9 @@ SSyncNode *syncNodeAcquire(int64_t rid) {
return
pNode
;
}
void
syncNodeRelease
(
SSyncNode
*
pNode
)
{
taosReleaseRef
(
gNodeRefId
,
pNode
->
rid
);
}
void
syncNodeRelease
(
SSyncNode
*
pNode
)
{
if
(
pNode
)
taosReleaseRef
(
gNodeRefId
,
pNode
->
rid
);
}
int64_t
syncHbTimerDataAdd
(
SSyncHbTimerData
*
pData
)
{
pData
->
rid
=
taosAddRef
(
gHbDataRefId
,
pData
);
...
...
source/libs/sync/src/syncIndexMgr.c
浏览文件 @
80665854
...
...
@@ -81,6 +81,15 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId,
index
);
}
SSyncLogReplMgr
*
syncNodeGetLogReplMgr
(
SSyncNode
*
pNode
,
SRaftId
*
pDestId
)
{
for
(
int
i
=
0
;
i
<
pNode
->
replicaNum
;
i
++
)
{
if
(
syncUtilSameId
(
&
(
pNode
->
replicasId
[
i
]),
pDestId
))
{
return
pNode
->
logReplMgrs
[
i
];
}
}
return
NULL
;
}
SyncIndex
syncIndexMgrGetIndex
(
SSyncIndexMgr
*
pSyncIndexMgr
,
const
SRaftId
*
pRaftId
)
{
if
(
pSyncIndexMgr
==
NULL
)
{
return
SYNC_INDEX_INVALID
;
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
80665854
...
...
@@ -22,6 +22,7 @@
#include "syncEnv.h"
#include "syncIndexMgr.h"
#include "syncInt.h"
#include "syncPipeline.h"
#include "syncMessage.h"
#include "syncRaftCfg.h"
#include "syncRaftLog.h"
...
...
@@ -34,6 +35,7 @@
#include "syncTimeout.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
#include "tref.h"
static
void
syncNodeEqPingTimer
(
void
*
param
,
void
*
tmrId
);
static
void
syncNodeEqElectTimer
(
void
*
param
,
void
*
tmrId
);
...
...
@@ -56,7 +58,6 @@ static int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeade
static
int32_t
syncDoLeaderTransfer
(
SSyncNode
*
ths
,
SRpcMsg
*
pRpcMsg
,
SSyncRaftEntry
*
pEntry
);
static
ESyncStrategy
syncNodeStrategy
(
SSyncNode
*
pSyncNode
);
static
SyncIndex
syncNodeGetSnapshotConfigIndex
(
SSyncNode
*
pSyncNode
,
SyncIndex
snapshotLastApplyIndex
);
int64_t
syncOpen
(
SSyncInfo
*
pSyncInfo
)
{
SSyncNode
*
pSyncNode
=
syncNodeOpen
(
pSyncInfo
);
...
...
@@ -80,12 +81,29 @@ int64_t syncOpen(SSyncInfo* pSyncInfo) {
return
pSyncNode
->
rid
;
}
void
syncStart
(
int64_t
rid
)
{
int32_t
syncStart
(
int64_t
rid
)
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
!=
NULL
)
{
syncNodeStart
(
pSyncNode
);
syncNodeRelease
(
pSyncNode
);
if
(
pSyncNode
==
NULL
)
{
sError
(
"failed to acquire rid: %"
PRId64
" of tsNodeReftId for pSyncNode"
,
rid
);
return
-
1
;
}
if
(
syncNodeRestore
(
pSyncNode
)
<
0
)
{
sError
(
"vgId:%d, failed to restore sync log buffer since %s"
,
pSyncNode
->
vgId
,
terrstr
());
goto
_err
;
}
if
(
syncNodeStart
(
pSyncNode
)
<
0
)
{
sError
(
"vgId:%d, failed to start sync node since %s"
,
pSyncNode
->
vgId
,
terrstr
());
goto
_err
;
}
syncNodeRelease
(
pSyncNode
);
return
0
;
_err:
syncNodeRelease
(
pSyncNode
);
return
-
1
;
}
void
syncStop
(
int64_t
rid
)
{
...
...
@@ -426,7 +444,7 @@ bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
if
(
h
)
{
taosLRUCacheRelease
(
pCache
,
h
,
false
);
}
else
{
syncEntryDest
or
y
(
pEntry
);
syncEntryDest
ro
y
(
pEntry
);
}
}
}
...
...
@@ -554,7 +572,7 @@ int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapsho
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry);
if (code != 0) {
if (pEntry != NULL) {
syncEntryDest
or
y(pEntry);
syncEntryDest
ro
y(pEntry);
}
syncNodeRelease(pSyncNode);
return -1;
...
...
@@ -566,7 +584,7 @@ int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapsho
pSnapshot->lastApplyTerm = pEntry->term;
pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index);
syncEntryDest
or
y(pEntry);
syncEntryDest
ro
y(pEntry);
syncNodeRelease(pSyncNode);
return 0;
}
...
...
@@ -637,7 +655,7 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
tstrncpy
(
pEp
->
fqdn
,
pSyncNode
->
pRaftCfg
->
cfg
.
nodeInfo
[
i
].
nodeFqdn
,
TSDB_FQDN_LEN
);
pEp
->
port
=
(
pSyncNode
->
pRaftCfg
->
cfg
.
nodeInfo
)[
i
].
nodePort
;
pEpSet
->
numOfEps
++
;
s
Info
(
"vgId:%d, sync get retry epset, index:%d %s:%d"
,
pSyncNode
->
vgId
,
i
,
pEp
->
fqdn
,
pEp
->
port
);
s
Debug
(
"vgId:%d, sync get retry epset, index:%d %s:%d"
,
pSyncNode
->
vgId
,
i
,
pEp
->
fqdn
,
pEp
->
port
);
}
if
(
pEpSet
->
numOfEps
>
0
)
{
pEpSet
->
inUse
=
(
pSyncNode
->
pRaftCfg
->
cfg
.
myIndex
+
1
)
%
pEpSet
->
numOfEps
;
...
...
@@ -767,6 +785,29 @@ static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
return
ret
;
}
int32_t
syncNodeLogStoreRestoreOnNeed
(
SSyncNode
*
pNode
)
{
ASSERT
(
pNode
->
pLogStore
!=
NULL
&&
"log store not created"
);
ASSERT
(
pNode
->
pFsm
!=
NULL
&&
"pFsm not registered"
);
ASSERT
(
pNode
->
pFsm
->
FpGetSnapshotInfo
!=
NULL
&&
"FpGetSnapshotInfo not registered"
);
SSnapshot
snapshot
;
if
(
pNode
->
pFsm
->
FpGetSnapshotInfo
(
pNode
->
pFsm
,
&
snapshot
)
<
0
)
{
sError
(
"vgId:%d, failed to get snapshot info since %s"
,
pNode
->
vgId
,
terrstr
());
return
-
1
;
}
SyncIndex
commitIndex
=
snapshot
.
lastApplyIndex
;
SyncIndex
firstVer
=
pNode
->
pLogStore
->
syncLogBeginIndex
(
pNode
->
pLogStore
);
SyncIndex
lastVer
=
pNode
->
pLogStore
->
syncLogLastIndex
(
pNode
->
pLogStore
);
if
(
lastVer
<
commitIndex
||
firstVer
>
commitIndex
+
1
)
{
if
(
pNode
->
pLogStore
->
syncLogRestoreFromSnapshot
(
pNode
->
pLogStore
,
commitIndex
))
{
sError
(
"vgId:%d, failed to restore log store from snapshot since %s. lastVer: %"
PRId64
", snapshotVer: %"
PRId64
,
pNode
->
vgId
,
terrstr
(),
lastVer
,
commitIndex
);
return
-
1
;
}
}
return
0
;
}
// open/close --------------
SSyncNode
*
syncNodeOpen
(
SSyncInfo
*
pSyncInfo
)
{
SSyncNode
*
pSyncNode
=
taosMemoryCalloc
(
1
,
sizeof
(
SSyncNode
));
if
(
pSyncNode
==
NULL
)
{
...
...
@@ -842,6 +883,13 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
pSyncNode
->
syncEqMsg
=
pSyncInfo
->
syncEqMsg
;
pSyncNode
->
syncEqCtrlMsg
=
pSyncInfo
->
syncEqCtrlMsg
;
// create raft log ring buffer
pSyncNode
->
pLogBuf
=
syncLogBufferCreate
();
if
(
pSyncNode
->
pLogBuf
==
NULL
)
{
sError
(
"failed to init sync log buffer since %s. vgId:%d"
,
terrstr
(),
pSyncNode
->
vgId
);
goto
_error
;
}
// init raft config
pSyncNode
->
pRaftCfg
=
raftCfgOpen
(
pSyncNode
->
configPath
);
if
(
pSyncNode
->
pRaftCfg
==
NULL
)
{
...
...
@@ -967,6 +1015,9 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
}
pSyncNode
->
commitIndex
=
commitIndex
;
if
(
syncNodeLogStoreRestoreOnNeed
(
pSyncNode
)
<
0
)
{
goto
_error
;
}
// timer ms init
pSyncNode
->
pingBaseLine
=
PING_TIMER_MS
;
pSyncNode
->
electBaseLine
=
ELECT_TIMER_MS_MIN
;
...
...
@@ -1024,9 +1075,13 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
// is config changing
pSyncNode
->
changing
=
false
;
// replication mgr
syncNodeLogReplMgrInit
(
pSyncNode
);
// peer state
syncNodePeerStateInit
(
pSyncNode
);
//
// min match index
pSyncNode
->
minMatchIndex
=
SYNC_INDEX_INVALID
;
...
...
@@ -1042,6 +1097,12 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
// snapshotting
atomic_store_64
(
&
pSyncNode
->
snapshottingIndex
,
SYNC_INDEX_INVALID
);
// init log buffer
if
(
syncLogBufferInit
(
pSyncNode
->
pLogBuf
,
pSyncNode
)
<
0
)
{
sError
(
"vgId:%d, failed to init sync log buffer since %s"
,
pSyncNode
->
vgId
,
terrstr
());
goto
_error
;
}
pSyncNode
->
isStart
=
true
;
pSyncNode
->
electNum
=
0
;
pSyncNode
->
becomeLeaderNum
=
0
;
...
...
@@ -1075,7 +1136,49 @@ void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
}
}
void
syncNodeStart
(
SSyncNode
*
pSyncNode
)
{
int32_t
syncNodeRestore
(
SSyncNode
*
pSyncNode
)
{
ASSERT
(
pSyncNode
->
pLogStore
!=
NULL
&&
"log store not created"
);
ASSERT
(
pSyncNode
->
pLogBuf
!=
NULL
&&
"ring log buffer not created"
);
SyncIndex
lastVer
=
pSyncNode
->
pLogStore
->
syncLogLastIndex
(
pSyncNode
->
pLogStore
);
SyncIndex
commitIndex
=
pSyncNode
->
pLogStore
->
syncLogCommitIndex
(
pSyncNode
->
pLogStore
);
SyncIndex
endIndex
=
pSyncNode
->
pLogBuf
->
endIndex
;
if
(
lastVer
!=
-
1
&&
endIndex
!=
lastVer
+
1
)
{
terrno
=
TSDB_CODE_WAL_LOG_INCOMPLETE
;
sError
(
"vgId:%d, failed to restore sync node since %s. expected lastLogIndex: %"
PRId64
", lastVer: %"
PRId64
""
,
pSyncNode
->
vgId
,
terrstr
(),
endIndex
-
1
,
lastVer
);
return
-
1
;
}
ASSERT
(
endIndex
==
lastVer
+
1
);
commitIndex
=
TMAX
(
pSyncNode
->
commitIndex
,
commitIndex
);
if
(
syncLogBufferCommit
(
pSyncNode
->
pLogBuf
,
pSyncNode
,
commitIndex
)
<
0
)
{
return
-
1
;
}
return
0
;
}
int32_t
syncNodeStart
(
SSyncNode
*
pSyncNode
)
{
// start raft
if
(
pSyncNode
->
replicaNum
==
1
)
{
raftStoreNextTerm
(
pSyncNode
->
pRaftStore
);
syncNodeBecomeLeader
(
pSyncNode
,
"one replica start"
);
// Raft 3.6.2 Committing entries from previous terms
syncNodeAppendNoop
(
pSyncNode
);
}
else
{
syncNodeBecomeFollower
(
pSyncNode
,
"first start"
);
}
int32_t
ret
=
0
;
ret
=
syncNodeStartPingTimer
(
pSyncNode
);
ASSERT
(
ret
==
0
);
return
ret
;
}
void
syncNodeStartOld
(
SSyncNode
*
pSyncNode
)
{
// start raft
if
(
pSyncNode
->
replicaNum
==
1
)
{
raftStoreNextTerm
(
pSyncNode
->
pRaftStore
);
...
...
@@ -1094,7 +1197,7 @@ void syncNodeStart(SSyncNode* pSyncNode) {
ASSERT
(
ret
==
0
);
}
void
syncNodeStartStandBy
(
SSyncNode
*
pSyncNode
)
{
int32_t
syncNodeStartStandBy
(
SSyncNode
*
pSyncNode
)
{
// state change
pSyncNode
->
state
=
TAOS_SYNC_STATE_FOLLOWER
;
syncNodeStopHeartbeatTimer
(
pSyncNode
);
...
...
@@ -1107,6 +1210,7 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
ret
=
0
;
ret
=
syncNodeStartPingTimer
(
pSyncNode
);
ASSERT
(
ret
==
0
);
return
ret
;
}
void
syncNodePreClose
(
SSyncNode
*
pSyncNode
)
{
...
...
@@ -1127,6 +1231,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
ASSERT
(
ret
==
0
);
pSyncNode
->
pRaftStore
=
NULL
;
syncNodeLogReplMgrDestroy
(
pSyncNode
);
syncRespMgrDestroy
(
pSyncNode
->
pSyncRespMgr
);
pSyncNode
->
pSyncRespMgr
=
NULL
;
voteGrantedDestroy
(
pSyncNode
->
pVotesGranted
);
...
...
@@ -1139,6 +1244,8 @@ void syncNodeClose(SSyncNode* pSyncNode) {
pSyncNode
->
pMatchIndex
=
NULL
;
logStoreDestory
(
pSyncNode
->
pLogStore
);
pSyncNode
->
pLogStore
=
NULL
;
syncLogBufferDestroy
(
pSyncNode
->
pLogBuf
);
pSyncNode
->
pLogBuf
=
NULL
;
raftCfgClose
(
pSyncNode
->
pRaftCfg
);
pSyncNode
->
pRaftCfg
=
NULL
;
...
...
@@ -1540,7 +1647,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// Raft 3.6.2 Committing entries from previous terms
syncNodeAppendNoop
(
pSyncNode
);
syncMaybeAdvanceCommitIndex
(
pSyncNode
);
//
syncMaybeAdvanceCommitIndex(pSyncNode);
}
else
{
syncNodeBecomeFollower
(
pSyncNode
,
tmpbuf
);
...
...
@@ -1628,6 +1735,9 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// min match index
pSyncNode
->
minMatchIndex
=
SYNC_INDEX_INVALID
;
// reset log buffer
syncLogBufferReset
(
pSyncNode
->
pLogBuf
,
pSyncNode
);
// trace log
sNTrace
(
pSyncNode
,
"become follower %s"
,
debugStr
);
}
...
...
@@ -1723,6 +1833,9 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
// min match index
pSyncNode
->
minMatchIndex
=
SYNC_INDEX_INVALID
;
// reset log buffer
syncLogBufferReset
(
pSyncNode
->
pLogBuf
,
pSyncNode
);
// trace log
sNInfo
(
pSyncNode
,
"become leader %s"
,
debugStr
);
}
...
...
@@ -1734,6 +1847,22 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
sNTrace
(
pSyncNode
,
"state change syncNodeCandidate2Leader"
);
int32_t
ret
=
syncNodeAppendNoop
(
pSyncNode
);
if
(
ret
<
0
)
{
sError
(
"vgId:%d, failed to append noop entry since %s"
,
pSyncNode
->
vgId
,
terrstr
());
}
SyncIndex
lastIndex
=
pSyncNode
->
pLogStore
->
syncLogLastIndex
(
pSyncNode
->
pLogStore
);
ASSERT
(
lastIndex
>=
0
);
sInfo
(
"vgId:%d, become leader. term: %"
PRId64
", commit index: %"
PRId64
", last index: %"
PRId64
""
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
lastIndex
);
}
void
syncNodeCandidate2LeaderOld
(
SSyncNode
*
pSyncNode
)
{
ASSERT
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_CANDIDATE
);
ASSERT
(
voteGrantedMajority
(
pSyncNode
->
pVotesGranted
));
syncNodeBecomeLeader
(
pSyncNode
,
"candidate to leader"
);
// Raft 3.6.2 Committing entries from previous terms
syncNodeAppendNoop
(
pSyncNode
);
syncMaybeAdvanceCommitIndex
(
pSyncNode
);
...
...
@@ -1757,18 +1886,30 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
void
syncNodeFollower2Candidate
(
SSyncNode
*
pSyncNode
)
{
ASSERT
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_FOLLOWER
);
pSyncNode
->
state
=
TAOS_SYNC_STATE_CANDIDATE
;
SyncIndex
lastIndex
=
pSyncNode
->
pLogStore
->
syncLogLastIndex
(
pSyncNode
->
pLogStore
);
sInfo
(
"vgId:%d, become candidate from follower. term: %"
PRId64
", commit index: %"
PRId64
", last index: %"
PRId64
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
lastIndex
);
sNTrace
(
pSyncNode
,
"follower to candidate"
);
}
void
syncNodeLeader2Follower
(
SSyncNode
*
pSyncNode
)
{
ASSERT
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
);
syncNodeBecomeFollower
(
pSyncNode
,
"leader to follower"
);
SyncIndex
lastIndex
=
pSyncNode
->
pLogStore
->
syncLogLastIndex
(
pSyncNode
->
pLogStore
);
sInfo
(
"vgId:%d, become follower from leader. term: %"
PRId64
", commit index: %"
PRId64
", last index: %"
PRId64
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
lastIndex
);
sNTrace
(
pSyncNode
,
"leader to follower"
);
}
void
syncNodeCandidate2Follower
(
SSyncNode
*
pSyncNode
)
{
ASSERT
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_CANDIDATE
);
syncNodeBecomeFollower
(
pSyncNode
,
"candidate to follower"
);
SyncIndex
lastIndex
=
pSyncNode
->
pLogStore
->
syncLogLastIndex
(
pSyncNode
->
pLogStore
);
sInfo
(
"vgId:%d, become follower from candidate. term: %"
PRId64
", commit index: %"
PRId64
", last index: %"
PRId64
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
lastIndex
);
sNTrace
(
pSyncNode
,
"candidate to follower"
);
}
...
...
@@ -1922,7 +2063,7 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
if
(
h
)
{
taosLRUCacheRelease
(
pCache
,
h
,
false
);
}
else
{
syncEntryDest
or
y
(
pPreEntry
);
syncEntryDest
ro
y
(
pPreEntry
);
}
return
preTerm
;
...
...
@@ -2164,7 +2305,7 @@ static int32_t syncNodeEqNoop(SSyncNode* pNode) {
SRpcMsg
rpcMsg
=
{
0
};
int32_t
code
=
syncBuildClientRequestFromNoopEntry
(
&
rpcMsg
,
pEntry
,
pNode
->
vgId
);
syncEntryDest
or
y
(
pEntry
);
syncEntryDest
ro
y
(
pEntry
);
sNTrace
(
pNode
,
"propose msg, type:noop"
);
code
=
(
*
pNode
->
syncEqMsg
)(
pNode
->
msgcb
,
&
rpcMsg
);
...
...
@@ -2192,6 +2333,37 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand
return
code
;
}
int32_t
syncNodeAppend
(
SSyncNode
*
ths
,
SSyncRaftEntry
*
pEntry
)
{
// append to log buffer
if
(
syncLogBufferAppend
(
ths
->
pLogBuf
,
ths
,
pEntry
)
<
0
)
{
sError
(
"vgId:%d, failed to enqueue sync log buffer. index:%"
PRId64
""
,
ths
->
vgId
,
pEntry
->
index
);
return
-
1
;
}
// proceed match index, with replicating on needed
SyncIndex
matchIndex
=
syncLogBufferProceed
(
ths
->
pLogBuf
,
ths
,
NULL
);
sTrace
(
"vgId:%d, append raft entry. index: %"
PRId64
", term: %"
PRId64
" pBuf: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
ths
->
vgId
,
pEntry
->
index
,
pEntry
->
term
,
ths
->
pLogBuf
->
startIndex
,
ths
->
pLogBuf
->
commitIndex
,
ths
->
pLogBuf
->
matchIndex
,
ths
->
pLogBuf
->
endIndex
);
// multi replica
if
(
ths
->
replicaNum
>
1
)
{
return
0
;
}
// single replica
(
void
)
syncNodeUpdateCommitIndex
(
ths
,
matchIndex
);
if
(
syncLogBufferCommit
(
ths
->
pLogBuf
,
ths
,
ths
->
commitIndex
)
<
0
)
{
sError
(
"vgId:%d, failed to commit until commitIndex:%"
PRId64
""
,
ths
->
vgId
,
ths
->
commitIndex
);
return
-
1
;
}
return
0
;
}
bool
syncNodeHeartbeatReplyTimeout
(
SSyncNode
*
pSyncNode
)
{
if
(
pSyncNode
->
replicaNum
==
1
)
{
return
false
;
...
...
@@ -2235,6 +2407,20 @@ bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
}
static
int32_t
syncNodeAppendNoop
(
SSyncNode
*
ths
)
{
SyncIndex
index
=
syncLogBufferGetEndIndex
(
ths
->
pLogBuf
);
SyncTerm
term
=
ths
->
pRaftStore
->
currentTerm
;
SSyncRaftEntry
*
pEntry
=
syncEntryBuildNoop
(
term
,
index
,
ths
->
vgId
);
if
(
pEntry
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
int32_t
ret
=
syncNodeAppend
(
ths
,
pEntry
);
return
0
;
}
static
int32_t
syncNodeAppendNoopOld
(
SSyncNode
*
ths
)
{
int32_t
ret
=
0
;
SyncIndex
index
=
ths
->
pLogStore
->
syncLogWriteIndex
(
ths
->
pLogStore
);
...
...
@@ -2257,7 +2443,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
if
(
h
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
h
,
false
);
}
else
{
syncEntryDest
or
y
(
pEntry
);
syncEntryDest
ro
y
(
pEntry
);
}
return
ret
;
...
...
@@ -2282,6 +2468,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
pMsgReply
->
srcId
=
ths
->
myRaftId
;
pMsgReply
->
term
=
ths
->
pRaftStore
->
currentTerm
;
pMsgReply
->
privateTerm
=
8864
;
// magic number
pMsgReply
->
startTime
=
ths
->
startTime
;
pMsgReply
->
timeStamp
=
tsMs
;
if
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
&&
ths
->
state
!=
TAOS_SYNC_STATE_LEADER
)
{
...
...
@@ -2345,6 +2532,26 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
}
int32_t
syncNodeOnHeartbeatReply
(
SSyncNode
*
ths
,
const
SRpcMsg
*
pRpcMsg
)
{
const
STraceId
*
trace
=
&
pRpcMsg
->
info
.
traceId
;
char
tbuf
[
40
]
=
{
0
};
TRACE_TO_STR
(
trace
,
tbuf
);
SyncHeartbeatReply
*
pMsg
=
pRpcMsg
->
pCont
;
SSyncLogReplMgr
*
pMgr
=
syncNodeGetLogReplMgr
(
ths
,
&
pMsg
->
srcId
);
if
(
pMgr
==
NULL
)
{
sError
(
"vgId:%d, failed to get log repl mgr for the peer at addr 0x016%"
PRIx64
""
,
ths
->
vgId
,
pMsg
->
srcId
.
addr
);
return
-
1
;
}
int64_t
tsMs
=
taosGetTimestampMs
();
syncLogRecvHeartbeatReply
(
ths
,
pMsg
,
tsMs
-
pMsg
->
timeStamp
,
tbuf
);
syncIndexMgrSetRecvTime
(
ths
->
pMatchIndex
,
&
pMsg
->
srcId
,
tsMs
);
return
syncLogReplMgrProcessHeartbeatReply
(
pMgr
,
ths
,
pMsg
);
}
int32_t
syncNodeOnHeartbeatReplyOld
(
SSyncNode
*
ths
,
const
SRpcMsg
*
pRpcMsg
)
{
SyncHeartbeatReply
*
pMsg
=
pRpcMsg
->
pCont
;
const
STraceId
*
trace
=
&
pRpcMsg
->
info
.
traceId
;
...
...
@@ -2364,6 +2571,26 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncLocalCmd
*
pMsg
=
pRpcMsg
->
pCont
;
syncLogRecvLocalCmd
(
ths
,
pMsg
,
""
);
if
(
pMsg
->
cmd
==
SYNC_LOCAL_CMD_STEP_DOWN
)
{
syncNodeStepDown
(
ths
,
pMsg
->
sdNewTerm
);
}
else
if
(
pMsg
->
cmd
==
SYNC_LOCAL_CMD_FOLLOWER_CMT
)
{
(
void
)
syncNodeUpdateCommitIndex
(
ths
,
pMsg
->
fcIndex
);
if
(
syncLogBufferCommit
(
ths
->
pLogBuf
,
ths
,
ths
->
commitIndex
)
<
0
)
{
sError
(
"vgId:%d, failed to commit raft log since %s. commit index: %"
PRId64
""
,
ths
->
vgId
,
terrstr
(),
ths
->
commitIndex
);
}
}
else
{
sError
(
"error local cmd"
);
}
return
0
;
}
int32_t
syncNodeOnLocalCmdOld
(
SSyncNode
*
ths
,
const
SRpcMsg
*
pRpcMsg
)
{
SyncLocalCmd
*
pMsg
=
pRpcMsg
->
pCont
;
syncLogRecvLocalCmd
(
ths
,
pMsg
,
""
);
if
(
pMsg
->
cmd
==
SYNC_LOCAL_CMD_STEP_DOWN
)
{
syncNodeStepDown
(
ths
,
pMsg
->
sdNewTerm
);
...
...
@@ -2391,6 +2618,31 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int32_t
syncNodeOnClientRequest
(
SSyncNode
*
ths
,
SRpcMsg
*
pMsg
,
SyncIndex
*
pRetIndex
)
{
sNTrace
(
ths
,
"on client request"
);
int32_t
code
=
0
;
SyncIndex
index
=
syncLogBufferGetEndIndex
(
ths
->
pLogBuf
);
SyncTerm
term
=
ths
->
pRaftStore
->
currentTerm
;
SSyncRaftEntry
*
pEntry
=
NULL
;
if
(
pMsg
->
msgType
==
TDMT_SYNC_CLIENT_REQUEST
)
{
pEntry
=
syncEntryBuildFromClientRequest
(
pMsg
->
pCont
,
term
,
index
);
}
else
{
pEntry
=
syncEntryBuildFromRpcMsg
(
pMsg
,
term
,
index
);
}
if
(
ths
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
if
(
pRetIndex
)
{
(
*
pRetIndex
)
=
index
;
}
return
syncNodeAppend
(
ths
,
pEntry
);
}
return
-
1
;
}
int32_t
syncNodeOnClientRequestOld
(
SSyncNode
*
ths
,
SRpcMsg
*
pMsg
,
SyncIndex
*
pRetIndex
)
{
sNTrace
(
ths
,
"on client request"
);
int32_t
ret
=
0
;
int32_t
code
=
0
;
...
...
@@ -2414,7 +2666,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
if
(
h
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
h
,
false
);
}
else
{
syncEntryDest
or
y
(
pEntry
);
syncEntryDest
ro
y
(
pEntry
);
}
return
-
1
;
...
...
@@ -2437,7 +2689,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
if
(
h
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
h
,
false
);
}
else
{
syncEntryDest
or
y
(
pEntry
);
syncEntryDest
ro
y
(
pEntry
);
}
return
-
1
;
...
...
@@ -2472,7 +2724,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
if
(
h
)
{
taosLRUCacheRelease
(
ths
->
pLogStore
->
pCache
,
h
,
false
);
}
else
{
syncEntryDest
or
y
(
pEntry
);
syncEntryDest
ro
y
(
pEntry
);
}
return
ret
;
...
...
@@ -2582,6 +2834,7 @@ bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
}
int32_t
syncNodeDoCommit
(
SSyncNode
*
ths
,
SyncIndex
beginIndex
,
SyncIndex
endIndex
,
uint64_t
flag
)
{
ASSERT
(
false
);
if
(
beginIndex
>
endIndex
)
{
return
0
;
}
...
...
@@ -2695,7 +2948,7 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
if
(
h
)
{
taosLRUCacheRelease
(
pCache
,
h
,
false
);
}
else
{
syncEntryDest
or
y
(
pEntry
);
syncEntryDest
ro
y
(
pEntry
);
}
}
}
...
...
source/libs/sync/src/syncMessage.c
浏览文件 @
80665854
...
...
@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "syncMessage.h"
#include "syncRaftEntry.h"
#include "syncRaftStore.h"
int32_t
syncBuildTimeout
(
SRpcMsg
*
pMsg
,
ESyncTimeoutType
timeoutType
,
uint64_t
logicClock
,
int32_t
timerMS
,
SSyncNode
*
pNode
)
{
...
...
@@ -153,6 +154,34 @@ int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId) {
return
0
;
}
int32_t
syncBuildAppendEntriesFromRaftLog
(
SSyncNode
*
pNode
,
SSyncRaftEntry
*
pEntry
,
SyncTerm
prevLogTerm
,
SRpcMsg
*
pRpcMsg
)
{
uint32_t
dataLen
=
pEntry
->
bytes
;
uint32_t
bytes
=
sizeof
(
SyncAppendEntries
)
+
dataLen
;
pRpcMsg
->
contLen
=
bytes
;
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
if
(
pRpcMsg
->
pCont
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
SyncAppendEntries
*
pMsg
=
pRpcMsg
->
pCont
;
pMsg
->
bytes
=
pRpcMsg
->
contLen
;
pMsg
->
msgType
=
pRpcMsg
->
msgType
=
TDMT_SYNC_APPEND_ENTRIES
;
pMsg
->
dataLen
=
dataLen
;
(
void
)
memcpy
(
pMsg
->
data
,
pEntry
,
dataLen
);
pMsg
->
prevLogIndex
=
pEntry
->
index
-
1
;
pMsg
->
prevLogTerm
=
prevLogTerm
;
pMsg
->
vgId
=
pNode
->
vgId
;
pMsg
->
srcId
=
pNode
->
myRaftId
;
pMsg
->
term
=
pNode
->
pRaftStore
->
currentTerm
;
pMsg
->
commitIndex
=
pNode
->
commitIndex
;
pMsg
->
privateTerm
=
0
;
return
0
;
}
int32_t
syncBuildHeartbeat
(
SRpcMsg
*
pMsg
,
int32_t
vgId
)
{
int32_t
bytes
=
sizeof
(
SyncHeartbeat
);
pMsg
->
pCont
=
rpcMallocCont
(
bytes
);
...
...
source/libs/sync/src/syncPipeline.c
0 → 100644
浏览文件 @
80665854
此差异已折叠。
点击以展开。
source/libs/sync/src/syncRaftCfg.c
浏览文件 @
80665854
...
...
@@ -395,4 +395,3 @@ int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg) {
cJSON_Delete
(
pRoot
);
return
0
;
}
source/libs/sync/src/syncRaftEntry.c
浏览文件 @
80665854
...
...
@@ -20,7 +20,7 @@
SSyncRaftEntry
*
syncEntryBuild
(
int32_t
dataLen
)
{
int32_t
bytes
=
sizeof
(
SSyncRaftEntry
)
+
dataLen
;
SSyncRaftEntry
*
pEntry
=
taosMemory
Malloc
(
bytes
);
SSyncRaftEntry
*
pEntry
=
taosMemory
Calloc
(
1
,
bytes
);
if
(
pEntry
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
...
...
@@ -89,7 +89,7 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId)
return
pEntry
;
}
void
syncEntryDest
or
y
(
SSyncRaftEntry
*
pEntry
)
{
void
syncEntryDest
ro
y
(
SSyncRaftEntry
*
pEntry
)
{
if
(
pEntry
!=
NULL
)
{
sTrace
(
"free entry: %p"
,
pEntry
);
taosMemoryFree
(
pEntry
);
...
...
@@ -265,7 +265,7 @@ static int cmpFn(const void* p1, const void* p2) { return memcmp(p1, p2, sizeof(
static
void
freeRaftEntry
(
void
*
param
)
{
SSyncRaftEntry
*
pEntry
=
(
SSyncRaftEntry
*
)
param
;
syncEntryDest
or
y
(
pEntry
);
syncEntryDest
ro
y
(
pEntry
);
}
SRaftEntryCache
*
raftEntryCacheCreate
(
SSyncNode
*
pSyncNode
,
int32_t
maxCount
)
{
...
...
@@ -395,7 +395,7 @@ int32_t raftEntryCacheClear(struct SRaftEntryCache* pCache, int32_t count) {
SSkipListNode
*
pNode
=
tSkipListIterGet
(
pIter
);
ASSERT
(
pNode
!=
NULL
);
SSyncRaftEntry
*
pEntry
=
(
SSyncRaftEntry
*
)
SL_GET_NODE_DATA
(
pNode
);
syncEntryDest
or
y
(
pEntry
);
syncEntryDest
ro
y
(
pEntry
);
++
returnCnt
;
}
tSkipListDestroyIter
(
pIter
);
...
...
@@ -424,7 +424,7 @@ int32_t raftEntryCacheClear(struct SRaftEntryCache* pCache, int32_t count) {
++
returnCnt
;
SSyncRaftEntry
*
pEntry
=
(
SSyncRaftEntry
*
)
SL_GET_NODE_DATA
(
pNode
);
// syncEntryDest
or
y(pEntry);
// syncEntryDest
ro
y(pEntry);
taosRemoveRef
(
pCache
->
refMgr
,
pEntry
->
rid
);
}
tSkipListDestroyIter
(
pIter
);
...
...
source/libs/sync/src/syncRaftLog.c
浏览文件 @
80665854
...
...
@@ -201,9 +201,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
syncMeta
.
isWeek
=
pEntry
->
isWeak
;
syncMeta
.
seqNum
=
pEntry
->
seqNum
;
syncMeta
.
term
=
pEntry
->
term
;
int64_t
tsWriteBegin
=
taosGetTimestampNs
();
index
=
walAppendLog
(
pWal
,
pEntry
->
originalRpcType
,
syncMeta
,
pEntry
->
data
,
pEntry
->
dataLen
);
index
=
walAppendLog
(
pWal
,
pEntry
->
index
,
pEntry
->
originalRpcType
,
syncMeta
,
pEntry
->
data
,
pEntry
->
dataLen
);
int64_t
tsWriteEnd
=
taosGetTimestampNs
();
int64_t
tsElapsed
=
tsWriteEnd
-
tsWriteBegin
;
...
...
@@ -217,7 +216,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
pEntry
->
index
,
err
,
err
,
errStr
,
sysErr
,
sysErrStr
);
return
-
1
;
}
pEntry
->
index
=
index
;
ASSERT
(
pEntry
->
index
==
index
);
sNTrace
(
pData
->
pSyncNode
,
"write index:%"
PRId64
", type:%s, origin type:%s, elapsed:%"
PRId64
,
pEntry
->
index
,
TMSG_INFO
(
pEntry
->
msgType
),
TMSG_INFO
(
pEntry
->
originalRpcType
),
tsElapsed
);
...
...
@@ -344,8 +344,6 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
const
char
*
sysErrStr
=
strerror
(
errno
);
sError
(
"vgId:%d, wal truncate error, from-index:%"
PRId64
", err:%d %X, msg:%s, syserr:%d, sysmsg:%s"
,
pData
->
pSyncNode
->
vgId
,
fromIndex
,
err
,
err
,
errStr
,
sysErr
,
sysErrStr
);
// ASSERT(0);
}
// event log
...
...
@@ -397,8 +395,6 @@ int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
const
char
*
sysErrStr
=
strerror
(
errno
);
sError
(
"vgId:%d, wal update commit index error, index:%"
PRId64
", err:%d %X, msg:%s, syserr:%d, sysmsg:%s"
,
pData
->
pSyncNode
->
vgId
,
index
,
err
,
err
,
errStr
,
sysErr
,
sysErrStr
);
ASSERT
(
0
);
return
-
1
;
}
return
0
;
...
...
source/libs/sync/src/syncReplication.c
浏览文件 @
80665854
...
...
@@ -16,13 +16,11 @@
#define _DEFAULT_SOURCE
#include "syncReplication.h"
#include "syncIndexMgr.h"
#include "syncPipeline.h"
#include "syncRaftEntry.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
static
int32_t
syncNodeSendAppendEntries
(
SSyncNode
*
pNode
,
const
SRaftId
*
destRaftId
,
SRpcMsg
*
pRpcMsg
);
static
int32_t
syncNodeMaybeSendAppendEntries
(
SSyncNode
*
pNode
,
const
SRaftId
*
destRaftId
,
SRpcMsg
*
pRpcMsg
);
// TLA+ Spec
// AppendEntries(i, j) ==
// /\ i /= j
...
...
@@ -48,7 +46,10 @@ static int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pNode, const SRaftId* d
// mdest |-> j])
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
int32_t
syncNodeMaybeSendAppendEntries
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
SRpcMsg
*
pRpcMsg
);
int32_t
syncNodeReplicateOne
(
SSyncNode
*
pSyncNode
,
SRaftId
*
pDestId
,
bool
snapshot
)
{
ASSERT
(
false
&&
"deprecated"
);
// next index
SyncIndex
nextIndex
=
syncIndexMgrGetIndex
(
pSyncNode
->
pNextIndex
,
pDestId
);
...
...
@@ -117,7 +118,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh
if
(
h
)
{
taosLRUCacheRelease
(
pCache
,
h
,
false
);
}
else
{
syncEntryDest
or
y
(
pEntry
);
syncEntryDest
ro
y
(
pEntry
);
}
// prepare msg
...
...
@@ -136,7 +137,29 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh
return
0
;
}
int32_t
syncNodeReplicate
(
SSyncNode
*
pSyncNode
)
{
int32_t
syncNodeReplicate
(
SSyncNode
*
pNode
)
{
SSyncLogBuffer
*
pBuf
=
pNode
->
pLogBuf
;
taosThreadMutexLock
(
&
pBuf
->
mutex
);
int32_t
ret
=
syncNodeReplicateWithoutLock
(
pNode
);
taosThreadMutexUnlock
(
&
pBuf
->
mutex
);
return
ret
;
}
int32_t
syncNodeReplicateWithoutLock
(
SSyncNode
*
pNode
)
{
if
(
pNode
->
state
!=
TAOS_SYNC_STATE_LEADER
||
pNode
->
replicaNum
==
1
)
{
return
-
1
;
}
for
(
int32_t
i
=
0
;
i
<
pNode
->
replicaNum
;
i
++
)
{
if
(
syncUtilSameId
(
&
pNode
->
replicasId
[
i
],
&
pNode
->
myRaftId
))
{
continue
;
}
SSyncLogReplMgr
*
pMgr
=
pNode
->
logReplMgrs
[
i
];
(
void
)
syncLogReplMgrReplicateOnce
(
pMgr
,
pNode
);
}
return
0
;
}
int32_t
syncNodeReplicateOld
(
SSyncNode
*
pSyncNode
)
{
if
(
pSyncNode
->
state
!=
TAOS_SYNC_STATE_LEADER
)
{
return
-
1
;
}
...
...
@@ -159,6 +182,14 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
}
int32_t
syncNodeSendAppendEntries
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
SRpcMsg
*
pRpcMsg
)
{
SyncAppendEntries
*
pMsg
=
pRpcMsg
->
pCont
;
int32_t
ret
=
0
;
pMsg
->
destId
=
*
destRaftId
;
syncNodeSendMsgById
(
destRaftId
,
pSyncNode
,
pRpcMsg
);
return
0
;
}
int32_t
syncNodeSendAppendEntriesOld
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
SRpcMsg
*
pRpcMsg
)
{
int32_t
ret
=
0
;
SyncAppendEntries
*
pMsg
=
pRpcMsg
->
pCont
;
if
(
pMsg
==
NULL
)
{
...
...
@@ -237,4 +268,4 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
}
return
0
;
}
\ No newline at end of file
}
source/libs/sync/src/syncRequestVote.c
浏览文件 @
80665854
...
...
@@ -124,7 +124,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
// send msg
SRpcMsg
rpcMsg
=
{
0
};
ret
=
syncBuildRequestVoteReply
(
&
rpcMsg
,
ths
->
vgId
);
ASSERT
(
ret
==
0
);
ASSERT
(
ret
==
0
);
SyncRequestVoteReply
*
pReply
=
rpcMsg
.
pCont
;
pReply
->
srcId
=
ths
->
myRaftId
;
...
...
source/libs/sync/src/syncSnapshot.c
浏览文件 @
80665854
...
...
@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "syncSnapshot.h"
#include "syncIndexMgr.h"
#include "syncPipeline.h"
#include "syncRaftCfg.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
...
...
@@ -273,6 +274,11 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
return
1
;
}
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pDestId
->
addr
,
host
,
sizeof
(
host
),
&
port
);
sInfo
(
"vgId:%d, start snapshot for peer: %s:%d"
,
pSyncNode
->
vgId
,
host
,
port
);
code
=
snapshotSenderStart
(
pSender
);
if
(
code
!=
0
)
{
sNError
(
pSyncNode
,
"snapshot sender start error"
);
...
...
@@ -372,7 +378,10 @@ int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapsh
}
int32_t
snapshotReceiverStart
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncSnapshotSend
*
pPreMsg
)
{
ASSERT
(
!
snapshotReceiverIsStart
(
pReceiver
));
if
(
snapshotReceiverIsStart
(
pReceiver
))
{
sWarn
(
"vgId:%d, snapshot receiver has started."
,
pReceiver
->
pSyncNode
->
vgId
);
return
0
;
}
pReceiver
->
start
=
true
;
pReceiver
->
ack
=
SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT
;
...
...
@@ -742,6 +751,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
}
else
if
(
pMsg
->
seq
==
SYNC_SNAPSHOT_SEQ_END
)
{
syncNodeOnSnapshotEnd
(
pSyncNode
,
pMsg
);
(
void
)
syncLogBufferReInit
(
pSyncNode
->
pLogBuf
,
pSyncNode
);
}
else
if
(
pMsg
->
seq
==
SYNC_SNAPSHOT_SEQ_FORCE_CLOSE
)
{
// force close, no response
...
...
@@ -873,11 +883,10 @@ int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
// receive ack is finish, close sender
if
(
pMsg
->
ack
==
SYNC_SNAPSHOT_SEQ_END
)
{
snapshotSenderStop
(
pSender
,
true
);
// update next-index
syncIndexMgrSetIndex
(
pSyncNode
->
pNextIndex
,
&
(
pMsg
->
srcId
),
pMsg
->
lastIndex
+
1
);
syncNodeReplicateOne
(
pSyncNode
,
&
(
pMsg
->
srcId
),
false
);
SSyncLogReplMgr
*
pMgr
=
syncNodeGetLogReplMgr
(
pSyncNode
,
&
pMsg
->
srcId
);
if
(
pMgr
)
{
syncLogReplMgrReset
(
pMgr
);
}
return
0
;
}
...
...
source/libs/sync/src/syncTimeout.c
浏览文件 @
80665854
...
...
@@ -131,4 +131,4 @@ int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pRpc) {
}
return
ret
;
}
\ No newline at end of file
}
source/libs/sync/src/syncUtil.c
浏览文件 @
80665854
...
...
@@ -278,20 +278,21 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
terrno
=
errCode
;
if
(
pNode
!=
NULL
&&
pNode
->
pRaftCfg
!=
NULL
)
{
taosPrintLog
(
flags
,
level
,
dflag
,
"vgId:%d, sync %s "
"%s"
", tm:%"
PRIu64
", cmt:%"
PRId64
", fst:%"
PRId64
", lst:%"
PRId64
", min:%"
PRId64
", snap:%"
PRId64
", snap-tm:%"
PRIu64
", elt-num:%d, bl-num:%d, cc-num:%d, hit:%d, mis:%d, hb-slow:%d, hbr-slow:%d, aq:%d, snaping:%"
PRId64
", r-num:%d, lcfg:%"
PRId64
", chging:%d, rsto:%d, dquorum:%d, elt:%"
PRId64
", hb:%"
PRId64
", %s, %s, %s, %s"
,
pNode
->
vgId
,
syncStr
(
pNode
->
state
),
eventLog
,
currentTerm
,
pNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
pNode
->
minMatchIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
pNode
->
electNum
,
pNode
->
becomeLeaderNum
,
pNode
->
configChangeNum
,
cacheHit
,
cacheMiss
,
pNode
->
hbSlowNum
,
pNode
->
hbrSlowNum
,
aqItems
,
pNode
->
snapshottingIndex
,
pNode
->
replicaNum
,
pNode
->
pRaftCfg
->
lastConfigIndex
,
pNode
->
changing
,
pNode
->
restoreFinish
,
quorum
,
pNode
->
electTimerLogicClock
,
pNode
->
heartbeatTimerLogicClockUser
,
peerStr
,
cfgStr
,
hbTimeStr
,
hbrTimeStr
);
taosPrintLog
(
flags
,
level
,
dflag
,
"vgId:%d, sync %s "
"%s"
", term:%"
PRIu64
", commit-index:%"
PRId64
", first-ver:%"
PRId64
", last-ver:%"
PRId64
", min:%"
PRId64
", snap:%"
PRId64
", snap-term:%"
PRIu64
", elect-times:%d, as-leader-times:%d, cfg-ch-times:%d, hit:%d, mis:%d, hb-slow:%d, hbr-slow:%d, "
"aq-items:%d, snaping:%"
PRId64
", replicas:%d, last-cfg:%"
PRId64
", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%"
PRId64
", hb:%"
PRId64
", %s, %s, %s, %s"
,
pNode
->
vgId
,
syncStr
(
pNode
->
state
),
eventLog
,
currentTerm
,
pNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
pNode
->
minMatchIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
pNode
->
electNum
,
pNode
->
becomeLeaderNum
,
pNode
->
configChangeNum
,
cacheHit
,
cacheMiss
,
pNode
->
hbSlowNum
,
pNode
->
hbrSlowNum
,
aqItems
,
pNode
->
snapshottingIndex
,
pNode
->
replicaNum
,
pNode
->
pRaftCfg
->
lastConfigIndex
,
pNode
->
changing
,
pNode
->
restoreFinish
,
quorum
,
pNode
->
electTimerLogicClock
,
pNode
->
heartbeatTimerLogicClockUser
,
peerStr
,
cfgStr
,
hbTimeStr
,
hbrTimeStr
);
}
}
...
...
@@ -427,7 +428,7 @@ void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
sNTrace
(
pSyncNode
,
"send sync-append-entries-reply to %s:%d, {term:%"
PRId64
", pterm:%"
PRId64
", success:%d, lsend-index:%"
PRId64
", match:%"
PRId64
"}, %s"
,
host
,
port
,
pMsg
->
term
,
pMsg
->
private
Term
,
pMsg
->
success
,
pMsg
->
lastSendIndex
,
pMsg
->
matchIndex
,
s
);
host
,
port
,
pMsg
->
term
,
pMsg
->
lastMatch
Term
,
pMsg
->
success
,
pMsg
->
lastSendIndex
,
pMsg
->
matchIndex
,
s
);
}
void
syncLogRecvAppendEntriesReply
(
SSyncNode
*
pSyncNode
,
const
SyncAppendEntriesReply
*
pMsg
,
const
char
*
s
)
{
...
...
@@ -440,7 +441,7 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
sNTrace
(
pSyncNode
,
"recv sync-append-entries-reply from %s:%d {term:%"
PRId64
", pterm:%"
PRId64
", success:%d, lsend-index:%"
PRId64
", match:%"
PRId64
"}, %s"
,
host
,
port
,
pMsg
->
term
,
pMsg
->
private
Term
,
pMsg
->
success
,
pMsg
->
lastSendIndex
,
pMsg
->
matchIndex
,
s
);
host
,
port
,
pMsg
->
term
,
pMsg
->
lastMatch
Term
,
pMsg
->
success
,
pMsg
->
lastSendIndex
,
pMsg
->
matchIndex
,
s
);
}
void
syncLogSendHeartbeat
(
SSyncNode
*
pSyncNode
,
const
SyncHeartbeat
*
pMsg
,
bool
printX
,
int64_t
timerElapsed
,
...
...
source/libs/sync/test/syncAppendEntriesBatchTest.cpp
浏览文件 @
80665854
#include "syncTest.h"
#include "syncBatch.h"
#include "syncTest.h"
void
logTest
()
{
sTrace
(
"--- sync log test: trace"
);
...
...
source/libs/sync/test/syncEnvTest.cpp
浏览文件 @
80665854
...
...
@@ -21,12 +21,12 @@ int main() {
assert
(
ret
==
0
);
for
(
int
i
=
0
;
i
<
5
;
++
i
)
{
//ret = syncEnvStartTimer();
//
ret = syncEnvStartTimer();
assert
(
ret
==
0
);
taosMsleep
(
5000
);
//ret = syncEnvStopTimer();
//
ret = syncEnvStopTimer();
assert
(
ret
==
0
);
taosMsleep
(
5000
);
...
...
source/libs/sync/test/syncLocalCmdTest.cpp
浏览文件 @
80665854
...
...
@@ -31,8 +31,8 @@ void test1() {
void
test2
()
{
SyncLocalCmd
*
pMsg
=
createMsg
();
uint32_t
len
=
pMsg
->
bytes
;
char
*
serialized
=
(
char
*
)
taosMemoryMalloc
(
len
);
uint32_t
len
=
pMsg
->
bytes
;
char
*
serialized
=
(
char
*
)
taosMemoryMalloc
(
len
);
syncLocalCmdSerialize
(
pMsg
,
serialized
,
len
);
SyncLocalCmd
*
pMsg2
=
syncLocalCmdBuild
(
1000
);
syncLocalCmdDeserialize
(
serialized
,
len
,
pMsg2
);
...
...
@@ -45,8 +45,8 @@ void test2() {
void
test3
()
{
SyncLocalCmd
*
pMsg
=
createMsg
();
uint32_t
len
;
char
*
serialized
=
syncLocalCmdSerialize2
(
pMsg
,
&
len
);
uint32_t
len
;
char
*
serialized
=
syncLocalCmdSerialize2
(
pMsg
,
&
len
);
SyncLocalCmd
*
pMsg2
=
syncLocalCmdDeserialize2
(
serialized
,
len
);
syncLocalCmdLog2
((
char
*
)
"test3: syncLocalCmdSerialize3 -> syncLocalCmdDeserialize2 "
,
pMsg2
);
...
...
@@ -57,7 +57,7 @@ void test3() {
void
test4
()
{
SyncLocalCmd
*
pMsg
=
createMsg
();
SRpcMsg
rpcMsg
;
SRpcMsg
rpcMsg
;
syncLocalCmd2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncLocalCmd
*
pMsg2
=
(
SyncLocalCmd
*
)
taosMemoryMalloc
(
rpcMsg
.
contLen
);
syncLocalCmdFromRpcMsg
(
&
rpcMsg
,
pMsg2
);
...
...
@@ -70,7 +70,7 @@ void test4() {
void
test5
()
{
SyncLocalCmd
*
pMsg
=
createMsg
();
SRpcMsg
rpcMsg
;
SRpcMsg
rpcMsg
;
syncLocalCmd2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncLocalCmd
*
pMsg2
=
syncLocalCmdFromRpcMsg2
(
&
rpcMsg
);
syncLocalCmdLog2
((
char
*
)
"test5: syncLocalCmd2RpcMsg -> syncLocalCmdFromRpcMsg2 "
,
pMsg2
);
...
...
source/libs/sync/test/syncPreSnapshotReplyTest.cpp
浏览文件 @
80665854
...
...
@@ -30,8 +30,8 @@ void test1() {
void
test2
()
{
SyncPreSnapshotReply
*
pMsg
=
createMsg
();
uint32_t
len
=
pMsg
->
bytes
;
char
*
serialized
=
(
char
*
)
taosMemoryMalloc
(
len
);
uint32_t
len
=
pMsg
->
bytes
;
char
*
serialized
=
(
char
*
)
taosMemoryMalloc
(
len
);
syncPreSnapshotReplySerialize
(
pMsg
,
serialized
,
len
);
SyncPreSnapshotReply
*
pMsg2
=
syncPreSnapshotReplyBuild
(
789
);
syncPreSnapshotReplyDeserialize
(
serialized
,
len
,
pMsg2
);
...
...
@@ -44,8 +44,8 @@ void test2() {
void
test3
()
{
SyncPreSnapshotReply
*
pMsg
=
createMsg
();
uint32_t
len
;
char
*
serialized
=
syncPreSnapshotReplySerialize2
(
pMsg
,
&
len
);
uint32_t
len
;
char
*
serialized
=
syncPreSnapshotReplySerialize2
(
pMsg
,
&
len
);
SyncPreSnapshotReply
*
pMsg2
=
syncPreSnapshotReplyDeserialize2
(
serialized
,
len
);
syncPreSnapshotReplyLog2
((
char
*
)
"test3: syncPreSnapshotReplySerialize2 -> syncPreSnapshotReplyDeserialize2 "
,
pMsg2
);
...
...
@@ -56,7 +56,7 @@ void test3() {
void
test4
()
{
SyncPreSnapshotReply
*
pMsg
=
createMsg
();
SRpcMsg
rpcMsg
;
SRpcMsg
rpcMsg
;
syncPreSnapshotReply2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncPreSnapshotReply
*
pMsg2
=
(
SyncPreSnapshotReply
*
)
taosMemoryMalloc
(
rpcMsg
.
contLen
);
syncPreSnapshotReplyFromRpcMsg
(
&
rpcMsg
,
pMsg2
);
...
...
@@ -69,7 +69,7 @@ void test4() {
void
test5
()
{
SyncPreSnapshotReply
*
pMsg
=
createMsg
();
SRpcMsg
rpcMsg
;
SRpcMsg
rpcMsg
;
syncPreSnapshotReply2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncPreSnapshotReply
*
pMsg2
=
syncPreSnapshotReplyFromRpcMsg2
(
&
rpcMsg
);
syncPreSnapshotReplyLog2
((
char
*
)
"test5: syncPreSnapshotReply2RpcMsg -> syncPreSnapshotReplyFromRpcMsg2 "
,
pMsg2
);
...
...
source/libs/sync/test/syncPreSnapshotTest.cpp
浏览文件 @
80665854
...
...
@@ -28,8 +28,8 @@ void test1() {
void
test2
()
{
SyncPreSnapshot
*
pMsg
=
createMsg
();
uint32_t
len
=
pMsg
->
bytes
;
char
*
serialized
=
(
char
*
)
taosMemoryMalloc
(
len
);
uint32_t
len
=
pMsg
->
bytes
;
char
*
serialized
=
(
char
*
)
taosMemoryMalloc
(
len
);
syncPreSnapshotSerialize
(
pMsg
,
serialized
,
len
);
SyncPreSnapshot
*
pMsg2
=
syncPreSnapshotBuild
(
789
);
syncPreSnapshotDeserialize
(
serialized
,
len
,
pMsg2
);
...
...
@@ -42,8 +42,8 @@ void test2() {
void
test3
()
{
SyncPreSnapshot
*
pMsg
=
createMsg
();
uint32_t
len
;
char
*
serialized
=
syncPreSnapshotSerialize2
(
pMsg
,
&
len
);
uint32_t
len
;
char
*
serialized
=
syncPreSnapshotSerialize2
(
pMsg
,
&
len
);
SyncPreSnapshot
*
pMsg2
=
syncPreSnapshotDeserialize2
(
serialized
,
len
);
syncPreSnapshotLog2
((
char
*
)
"test3: syncPreSnapshotSerialize2 -> syncPreSnapshotDeserialize2 "
,
pMsg2
);
...
...
@@ -54,7 +54,7 @@ void test3() {
void
test4
()
{
SyncPreSnapshot
*
pMsg
=
createMsg
();
SRpcMsg
rpcMsg
;
SRpcMsg
rpcMsg
;
syncPreSnapshot2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncPreSnapshot
*
pMsg2
=
(
SyncPreSnapshot
*
)
taosMemoryMalloc
(
rpcMsg
.
contLen
);
syncPreSnapshotFromRpcMsg
(
&
rpcMsg
,
pMsg2
);
...
...
@@ -67,7 +67,7 @@ void test4() {
void
test5
()
{
SyncPreSnapshot
*
pMsg
=
createMsg
();
SRpcMsg
rpcMsg
;
SRpcMsg
rpcMsg
;
syncPreSnapshot2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncPreSnapshot
*
pMsg2
=
syncPreSnapshotFromRpcMsg2
(
&
rpcMsg
);
syncPreSnapshotLog2
((
char
*
)
"test5: syncPreSnapshot2RpcMsg -> syncPreSnapshotFromRpcMsg2 "
,
pMsg2
);
...
...
source/libs/sync/test/syncRespMgrTest.cpp
浏览文件 @
80665854
...
...
@@ -63,7 +63,7 @@ void syncRespMgrGetAndDelTest(uint64_t i) {
SRpcHandleInfo
stub
;
int32_t
ret
=
syncRespMgrGetAndDel
(
pMgr
,
i
,
&
stub
);
if
(
ret
==
1
)
{
//printStub(&stub);
//
printStub(&stub);
}
else
if
(
ret
==
0
)
{
printf
(
"%"
PRId64
" notFound
\n
"
,
i
);
}
...
...
source/libs/sync/test/syncSnapshotSenderTest.cpp
浏览文件 @
80665854
...
...
@@ -32,7 +32,7 @@ SSyncSnapshotSender* createSender() {
pSyncNode
->
pRaftStore
=
(
SRaftStore
*
)
taosMemoryMalloc
(
sizeof
(
*
(
pSyncNode
->
pRaftStore
)));
pSyncNode
->
pFsm
=
(
SSyncFSM
*
)
taosMemoryMalloc
(
sizeof
(
*
(
pSyncNode
->
pFsm
)));
#if 0
#if 0
pSyncNode->pFsm->FpSnapshotStartRead = SnapshotStartRead;
pSyncNode->pFsm->FpSnapshotStopRead = SnapshotStopRead;
pSyncNode->pFsm->FpSnapshotDoRead = SnapshotDoRead;
...
...
@@ -52,8 +52,8 @@ SSyncSnapshotSender* createSender() {
pSender
->
snapshot
.
lastApplyTerm
=
88
;
pSender
->
sendingMS
=
77
;
pSender
->
term
=
66
;
//pSender->privateTerm = 99;
//
pSender->privateTerm = 99;
return
pSender
;
}
...
...
source/libs/sync/test/syncSnapshotTest.cpp
浏览文件 @
80665854
...
...
@@ -76,7 +76,6 @@ void initFsm() {
pFsm->FpRollBackCb = RollBackCb;
pFsm->FpGetSnapshotInfo = GetSnapshotCb;
#endif
}
SSyncNode
*
syncNodeInit
()
{
...
...
source/libs/sync/test/syncTest.cpp
浏览文件 @
80665854
#include <gtest/gtest.h>
#include "syncTest.h"
#include <gtest/gtest.h>
/*
typedef enum {
...
...
source/libs/sync/test/syncWriteTest.cpp
浏览文件 @
80665854
...
...
@@ -54,7 +54,6 @@ void initFsm() {
pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb;
#endif
}
SSyncNode
*
syncNodeInit
()
{
...
...
source/libs/sync/test/sync_test_lib/inc/syncTest.h
浏览文件 @
80665854
...
...
@@ -25,7 +25,6 @@ extern "C" {
#include "tref.h"
#include "wal.h"
#include "tref.h"
#include "syncEnv.h"
#include "syncIO.h"
#include "syncIndexMgr.h"
...
...
@@ -39,6 +38,7 @@ extern "C" {
#include "syncSnapshot.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
#include "tref.h"
extern
void
addEpIntoEpSet
(
SEpSet
*
pEpSet
,
const
char
*
fqdn
,
uint16_t
port
);
...
...
@@ -125,12 +125,12 @@ char* snapshotSender2Str(SSyncSnapshotSender* pSender);
cJSON
*
snapshotReceiver2Json
(
SSyncSnapshotReceiver
*
pReceiver
);
char
*
snapshotReceiver2Str
(
SSyncSnapshotReceiver
*
pReceiver
);
cJSON
*
syncIndexMgr2Json
(
SSyncIndexMgr
*
pSyncIndexMgr
);
char
*
syncIndexMgr2Str
(
SSyncIndexMgr
*
pSyncIndexMgr
);
void
syncIndexMgrPrint
(
SSyncIndexMgr
*
pObj
);
void
syncIndexMgrPrint2
(
char
*
s
,
SSyncIndexMgr
*
pObj
);
void
syncIndexMgrLog
(
SSyncIndexMgr
*
pObj
);
void
syncIndexMgrLog2
(
char
*
s
,
SSyncIndexMgr
*
pObj
);
cJSON
*
syncIndexMgr2Json
(
SSyncIndexMgr
*
pSyncIndexMgr
);
char
*
syncIndexMgr2Str
(
SSyncIndexMgr
*
pSyncIndexMgr
);
void
syncIndexMgrPrint
(
SSyncIndexMgr
*
pObj
);
void
syncIndexMgrPrint2
(
char
*
s
,
SSyncIndexMgr
*
pObj
);
void
syncIndexMgrLog
(
SSyncIndexMgr
*
pObj
);
void
syncIndexMgrLog2
(
char
*
s
,
SSyncIndexMgr
*
pObj
);
cJSON
*
syncRpcMsg2Json
(
SRpcMsg
*
pRpcMsg
);
cJSON
*
syncRpcUnknownMsg2Json
();
...
...
@@ -140,7 +140,6 @@ void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg);
void
syncRpcMsgLog
(
SRpcMsg
*
pMsg
);
void
syncRpcMsgLog2
(
char
*
s
,
SRpcMsg
*
pMsg
);
// origin syncMessage
typedef
struct
SyncPing
{
uint32_t
bytes
;
...
...
@@ -153,7 +152,6 @@ typedef struct SyncPing {
char
data
[];
}
SyncPing
;
SyncPing
*
syncPingBuild
(
uint32_t
dataLen
);
SyncPing
*
syncPingBuild2
(
const
SRaftId
*
srcId
,
const
SRaftId
*
destId
,
int32_t
vgId
,
const
char
*
str
);
SyncPing
*
syncPingBuild3
(
const
SRaftId
*
srcId
,
const
SRaftId
*
destId
,
int32_t
vgId
);
...
...
source/libs/sync/test/sync_test_lib/src/syncBatch.c
浏览文件 @
80665854
...
...
@@ -14,8 +14,8 @@
*/
#define _DEFAULT_SOURCE
#include "syncTest.h"
#include "syncBatch.h"
#include "syncTest.h"
// ---- message process SyncClientRequestBatch----
...
...
source/libs/sync/test/sync_test_lib/src/syncRaftStoreDebug.c
浏览文件 @
80665854
...
...
@@ -14,8 +14,8 @@
*/
#define _DEFAULT_SOURCE
#include "syncTest.h"
#include "cJSON.h"
#include "syncTest.h"
int32_t
raftStoreFromJson
(
SRaftStore
*
pRaftStore
,
cJSON
*
pJson
)
{
return
0
;
}
...
...
source/libs/tdb/src/db/tdbBtree.c
浏览文件 @
80665854
...
...
@@ -195,7 +195,8 @@ int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, in
btc
.
idx
++
;
}
else
if
(
c
==
0
)
{
// dup key not allowed
ASSERT
(
0
);
tdbError
(
"unable to insert dup key. pKey: %p, kLen: %d, btc: %p, pTxn: %p"
,
pKey
,
kLen
,
&
btc
,
pTxn
);
// ASSERT(0);
return
-
1
;
}
}
...
...
source/libs/transport/src/tmsgcb.c
浏览文件 @
80665854
...
...
@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "tmsgcb.h"
#include "taoserror.h"
#include "transLog.h"
#include "trpc.h"
static
SMsgCb
defaultMsgCb
;
...
...
@@ -23,6 +24,7 @@ static SMsgCb defaultMsgCb;
void
tmsgSetDefault
(
const
SMsgCb
*
msgcb
)
{
defaultMsgCb
=
*
msgcb
;
}
int32_t
tmsgPutToQueue
(
const
SMsgCb
*
msgcb
,
EQueueType
qtype
,
SRpcMsg
*
pMsg
)
{
ASSERT
(
msgcb
!=
NULL
);
int32_t
code
=
(
*
msgcb
->
putToQueueFp
)(
msgcb
->
mgmt
,
qtype
,
pMsg
);
if
(
code
!=
0
)
{
rpcFreeCont
(
pMsg
->
pCont
);
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
80665854
...
...
@@ -1259,6 +1259,7 @@ _return2:
tDebug
(
"handle %p failed to send to release handle"
,
exh
);
return
-
1
;
}
int
transSendResponse
(
const
STransMsg
*
msg
)
{
if
(
msg
->
info
.
noResp
)
{
rpcFreeCont
(
msg
->
pCont
);
...
...
source/libs/wal/src/walMeta.c
浏览文件 @
80665854
...
...
@@ -285,6 +285,9 @@ void walAlignVersions(SWal* pWal) {
if
(
pWal
->
vers
.
lastVer
<
pWal
->
vers
.
snapshotVer
)
{
wWarn
(
"vgId:%d, lastVer:%"
PRId64
" is less than snapshotVer:%"
PRId64
". align with it."
,
pWal
->
cfg
.
vgId
,
pWal
->
vers
.
lastVer
,
pWal
->
vers
.
snapshotVer
);
if
(
pWal
->
vers
.
lastVer
<
pWal
->
vers
.
firstVer
)
{
pWal
->
vers
.
firstVer
=
pWal
->
vers
.
snapshotVer
+
1
;
}
pWal
->
vers
.
lastVer
=
pWal
->
vers
.
snapshotVer
;
}
if
(
pWal
->
vers
.
commitVer
<
pWal
->
vers
.
snapshotVer
)
{
...
...
source/libs/wal/src/walWrite.c
浏览文件 @
80665854
...
...
@@ -70,7 +70,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
pWal
->
lastRollSeq
=
-
1
;
taosArrayClear
(
pWal
->
fileInfoSet
);
pWal
->
vers
.
firstVer
=
-
1
;
pWal
->
vers
.
firstVer
=
ver
+
1
;
pWal
->
vers
.
lastVer
=
ver
;
pWal
->
vers
.
commitVer
=
ver
;
pWal
->
vers
.
snapshotVer
=
ver
;
...
...
@@ -554,10 +554,15 @@ END:
return
-
1
;
}
int64_t
walAppendLog
(
SWal
*
pWal
,
tmsg_t
msgType
,
SWalSyncInfo
syncMeta
,
const
void
*
body
,
int32_t
bodyLen
)
{
int64_t
walAppendLog
(
SWal
*
pWal
,
int64_t
index
,
tmsg_t
msgType
,
SWalSyncInfo
syncMeta
,
const
void
*
body
,
int32_t
bodyLen
)
{
taosThreadMutexLock
(
&
pWal
->
mutex
);
int64_t
index
=
pWal
->
vers
.
lastVer
+
1
;
if
(
index
!=
pWal
->
vers
.
lastVer
+
1
)
{
terrno
=
TSDB_CODE_WAL_INVALID_VER
;
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
-
1
;
}
if
(
walCheckAndRoll
(
pWal
)
<
0
)
{
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
...
...
tests/pytest/util/cases.py
浏览文件 @
80665854
...
...
@@ -63,7 +63,7 @@ class TDCases:
tdLog
.
info
(
"total %d Linux test case(s) executed"
%
(
runNum
))
def
runOneLinux
(
self
,
conn
,
fileName
,
replicaVar
):
def
runOneLinux
(
self
,
conn
,
fileName
,
replicaVar
=
1
):
testModule
=
self
.
__dynamicLoadModule
(
fileName
)
runNum
=
0
...
...
tests/system-test/0-others/taosdShell.py
浏览文件 @
80665854
...
...
@@ -166,7 +166,7 @@ class TDTestCase:
# keyDict['c'] = cfgPath
# keyDict['P'] = self.serverPort
tdDnodes
=
cluster
.
dnodes
for
i
in
range
(
5
):
for
i
in
range
(
len
(
tdDnodes
)
):
tdDnodes
[
i
].
stoptaosd
()
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录