Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
996886e7
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1191
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
996886e7
编写于
3月 23, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
sync refactor
上级
e3a3c138
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
436 addition
and
31 deletion
+436
-31
source/libs/sync/inc/syncRaftLog.h
source/libs/sync/inc/syncRaftLog.h
+7
-0
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+39
-31
source/libs/sync/src/syncAppendEntriesReply.c
source/libs/sync/src/syncAppendEntriesReply.c
+6
-0
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+55
-0
source/libs/sync/src/syncReplication.c
source/libs/sync/src/syncReplication.c
+6
-0
source/libs/sync/test/CMakeLists.txt
source/libs/sync/test/CMakeLists.txt
+28
-0
source/libs/sync/test/syncLogStoreCheck.cpp
source/libs/sync/test/syncLogStoreCheck.cpp
+107
-0
source/libs/sync/test/syncReplicateLoadTest.cpp
source/libs/sync/test/syncReplicateLoadTest.cpp
+188
-0
未找到文件。
source/libs/sync/inc/syncRaftLog.h
浏览文件 @
996886e7
...
...
@@ -47,6 +47,8 @@ SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore);
SSyncRaftEntry
*
logStoreGetLastEntry
(
SSyncLogStore
*
pLogStore
);
cJSON
*
logStore2Json
(
SSyncLogStore
*
pLogStore
);
char
*
logStore2Str
(
SSyncLogStore
*
pLogStore
);
cJSON
*
logStoreSimple2Json
(
SSyncLogStore
*
pLogStore
);
char
*
logStoreSimple2Str
(
SSyncLogStore
*
pLogStore
);
// for debug
void
logStorePrint
(
SSyncLogStore
*
pLogStore
);
...
...
@@ -54,6 +56,11 @@ void logStorePrint2(char* s, SSyncLogStore* pLogStore);
void
logStoreLog
(
SSyncLogStore
*
pLogStore
);
void
logStoreLog2
(
char
*
s
,
SSyncLogStore
*
pLogStore
);
void
logStoreSimplePrint
(
SSyncLogStore
*
pLogStore
);
void
logStoreSimplePrint2
(
char
*
s
,
SSyncLogStore
*
pLogStore
);
void
logStoreSimpleLog
(
SSyncLogStore
*
pLogStore
);
void
logStoreSimpleLog2
(
char
*
s
,
SSyncLogStore
*
pLogStore
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
996886e7
...
...
@@ -155,30 +155,35 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// accept request
if
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
&&
ths
->
state
==
TAOS_SYNC_STATE_FOLLOWER
&&
logOK
)
{
bool
preMatch
=
false
;
if
(
pMsg
->
prevLogIndex
==
SYNC_INDEX_INVALID
&&
ths
->
pLogStore
->
getLastIndex
(
ths
->
pLogStore
)
==
SYNC_INDEX_INVALID
)
{
preMatch
=
true
;
}
if
(
pMsg
->
prevLogIndex
>=
SYNC_INDEX_BEGIN
&&
pMsg
->
prevLogIndex
<=
ths
->
pLogStore
->
getLastIndex
(
ths
->
pLogStore
))
{
SSyncRaftEntry
*
pPreEntry
=
logStoreGetEntry
(
ths
->
pLogStore
,
pMsg
->
prevLogIndex
);
assert
(
pPreEntry
!=
NULL
);
if
(
pMsg
->
prevLogTerm
==
pPreEntry
->
term
)
{
preMatch
=
true
;
}
syncEntryDestory
(
pPreEntry
);
}
/*
bool preMatch = false;
if (pMsg->prevLogIndex == SYNC_INDEX_INVALID &&
ths->pLogStore->getLastIndex(ths->pLogStore) == SYNC_INDEX_INVALID) {
preMatch = true;
}
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <=
ths->pLogStore->getLastIndex(ths->pLogStore)) { SSyncRaftEntry* pPreEntry = logStoreGetEntry(ths->pLogStore,
pMsg->prevLogIndex); assert(pPreEntry != NULL); if (pMsg->prevLogTerm == pPreEntry->term) { preMatch = true;
}
syncEntryDestory(pPreEntry);
}
sTrace
(
"syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
"ths->state:%d, logOK:%d, preMatch:%d"
,
pMsg
->
term
,
ths
->
pRaftStore
->
currentTerm
,
ths
->
state
,
logOK
,
preMatch
);
sTrace(
"syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
"ths->state:%d, logOK:%d, preMatch:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, preMatch);
if (preMatch) {
*/
if
(
preMatch
)
{
//
must has preIndex
in local log
{
//
preIndex = -1, or has preIndex entry
in local log
assert
(
pMsg
->
prevLogIndex
<=
ths
->
pLogStore
->
getLastIndex
(
ths
->
pLogStore
));
// has extra entries (> preIndex) in local log
bool
hasExtraEntries
=
pMsg
->
prevLogIndex
<
ths
->
pLogStore
->
getLastIndex
(
ths
->
pLogStore
);
// has entries in SyncAppendEntries msg
bool
hasAppendEntries
=
pMsg
->
dataLen
>
0
;
if
(
hasExtraEntries
&&
hasAppendEntries
)
{
...
...
@@ -287,20 +292,23 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
syncNodeSendMsgById
(
&
pReply
->
destId
,
ths
,
&
rpcMsg
);
syncAppendEntriesReplyDestroy
(
pReply
);
}
}
else
{
SyncAppendEntriesReply
*
pReply
=
syncAppendEntriesReplyBuild
();
pReply
->
srcId
=
ths
->
myRaftId
;
pReply
->
destId
=
pMsg
->
srcId
;
pReply
->
term
=
ths
->
pRaftStore
->
currentTerm
;
pReply
->
success
=
false
;
pReply
->
matchIndex
=
SYNC_INDEX_INVALID
;
/*
else {
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->success = false;
pReply->matchIndex = SYNC_INDEX_INVALID;
SRpcMsg
rpcMsg
;
syncAppendEntriesReply2RpcMsg
(
pReply
,
&
rpcMsg
);
syncNodeSendMsgById
(
&
pReply
->
destId
,
ths
,
&
rpcMsg
);
syncAppendEntriesReplyDestroy
(
pReply
);
}
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply);
}
*/
// maybe update commit index from leader
if
(
pMsg
->
commitIndex
>
ths
->
commitIndex
)
{
...
...
source/libs/sync/src/syncAppendEntriesReply.c
浏览文件 @
996886e7
...
...
@@ -48,6 +48,9 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
return
ret
;
}
syncIndexMgrLog2
(
"==syncNodeOnAppendEntriesReplyCb== before pNextIndex"
,
ths
->
pNextIndex
);
syncIndexMgrLog2
(
"==syncNodeOnAppendEntriesReplyCb== before pMatchIndex"
,
ths
->
pMatchIndex
);
// no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) {
// syncNodeUpdateTerm(ths, pMsg->term);
...
...
@@ -77,5 +80,8 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
syncIndexMgrSetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
),
nextIndex
);
}
syncIndexMgrLog2
(
"==syncNodeOnAppendEntriesReplyCb== after pNextIndex"
,
ths
->
pNextIndex
);
syncIndexMgrLog2
(
"==syncNodeOnAppendEntriesReplyCb== after pMatchIndex"
,
ths
->
pMatchIndex
);
return
ret
;
}
source/libs/sync/src/syncRaftLog.c
浏览文件 @
996886e7
...
...
@@ -165,6 +165,34 @@ char* logStore2Str(SSyncLogStore* pLogStore) {
return
serialized
;
}
cJSON
*
logStoreSimple2Json
(
SSyncLogStore
*
pLogStore
)
{
char
u64buf
[
128
];
SSyncLogStoreData
*
pData
=
(
SSyncLogStoreData
*
)
pLogStore
->
data
;
cJSON
*
pRoot
=
cJSON_CreateObject
();
if
(
pData
!=
NULL
&&
pData
->
pWal
!=
NULL
)
{
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%p"
,
pData
->
pSyncNode
);
cJSON_AddStringToObject
(
pRoot
,
"pSyncNode"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%p"
,
pData
->
pWal
);
cJSON_AddStringToObject
(
pRoot
,
"pWal"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%ld"
,
logStoreLastIndex
(
pLogStore
));
cJSON_AddStringToObject
(
pRoot
,
"LastIndex"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
logStoreLastTerm
(
pLogStore
));
cJSON_AddStringToObject
(
pRoot
,
"LastTerm"
,
u64buf
);
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SSyncLogStoreSimple"
,
pRoot
);
return
pJson
;
}
char
*
logStoreSimple2Str
(
SSyncLogStore
*
pLogStore
)
{
cJSON
*
pJson
=
logStoreSimple2Json
(
pLogStore
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
// for debug -----------------
void
logStorePrint
(
SSyncLogStore
*
pLogStore
)
{
char
*
serialized
=
logStore2Str
(
pLogStore
);
...
...
@@ -191,3 +219,30 @@ void logStoreLog2(char* s, SSyncLogStore* pLogStore) {
sTrace
(
"logStorePrint | len:%lu | %s | %s"
,
strlen
(
serialized
),
s
,
serialized
);
free
(
serialized
);
}
// for debug -----------------
void
logStoreSimplePrint
(
SSyncLogStore
*
pLogStore
)
{
char
*
serialized
=
logStoreSimple2Str
(
pLogStore
);
printf
(
"logStoreSimplePrint | len:%lu | %s
\n
"
,
strlen
(
serialized
),
serialized
);
fflush
(
NULL
);
free
(
serialized
);
}
void
logStoreSimplePrint2
(
char
*
s
,
SSyncLogStore
*
pLogStore
)
{
char
*
serialized
=
logStoreSimple2Str
(
pLogStore
);
printf
(
"logStoreSimplePrint2 | len:%lu | %s | %s
\n
"
,
strlen
(
serialized
),
s
,
serialized
);
fflush
(
NULL
);
free
(
serialized
);
}
void
logStoreSimpleLog
(
SSyncLogStore
*
pLogStore
)
{
char
*
serialized
=
logStoreSimple2Str
(
pLogStore
);
sTrace
(
"logStoreSimpleLog | len:%lu | %s"
,
strlen
(
serialized
),
serialized
);
free
(
serialized
);
}
void
logStoreSimpleLog2
(
char
*
s
,
SSyncLogStore
*
pLogStore
)
{
char
*
serialized
=
logStoreSimple2Str
(
pLogStore
);
sTrace
(
"logStoreSimpleLog2 | len:%lu | %s | %s"
,
strlen
(
serialized
),
s
,
serialized
);
free
(
serialized
);
}
\ No newline at end of file
source/libs/sync/src/syncReplication.c
浏览文件 @
996886e7
...
...
@@ -49,6 +49,10 @@
int32_t
syncNodeAppendEntriesPeers
(
SSyncNode
*
pSyncNode
)
{
assert
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
);
syncIndexMgrLog2
(
"==syncNodeAppendEntriesPeers== pNextIndex"
,
pSyncNode
->
pNextIndex
);
syncIndexMgrLog2
(
"==syncNodeAppendEntriesPeers== pMatchIndex"
,
pSyncNode
->
pMatchIndex
);
logStoreSimpleLog2
(
"==syncNodeAppendEntriesPeers=="
,
pSyncNode
->
pLogStore
);
int32_t
ret
=
0
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
SRaftId
*
pDestId
=
&
(
pSyncNode
->
peersId
[
i
]);
...
...
@@ -99,6 +103,8 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
pMsg
->
prevLogTerm
=
preLogTerm
;
pMsg
->
commitIndex
=
pSyncNode
->
commitIndex
;
syncAppendEntriesLog2
(
"==syncNodeAppendEntriesPeers=="
,
pMsg
);
// send AppendEntries
syncNodeAppendEntries
(
pSyncNode
,
pDestId
,
pMsg
);
syncAppendEntriesDestroy
(
pMsg
);
...
...
source/libs/sync/test/CMakeLists.txt
浏览文件 @
996886e7
...
...
@@ -34,7 +34,9 @@ add_executable(syncEncodeTest "")
add_executable
(
syncWriteTest
""
)
add_executable
(
syncReplicateTest
""
)
add_executable
(
syncReplicateTest2
""
)
add_executable
(
syncReplicateLoadTest
""
)
add_executable
(
syncRefTest
""
)
add_executable
(
syncLogStoreCheck
""
)
target_sources
(
syncTest
...
...
@@ -181,10 +183,18 @@ target_sources(syncReplicateTest2
PRIVATE
"syncReplicateTest2.cpp"
)
target_sources
(
syncReplicateLoadTest
PRIVATE
"syncReplicateLoadTest.cpp"
)
target_sources
(
syncRefTest
PRIVATE
"syncRefTest.cpp"
)
target_sources
(
syncLogStoreCheck
PRIVATE
"syncLogStoreCheck.cpp"
)
target_include_directories
(
syncTest
...
...
@@ -367,11 +377,21 @@ target_include_directories(syncReplicateTest2
"
${
CMAKE_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
syncReplicateLoadTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
syncRefTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
syncLogStoreCheck
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_link_libraries
(
syncTest
...
...
@@ -518,10 +538,18 @@ target_link_libraries(syncReplicateTest2
sync
gtest_main
)
target_link_libraries
(
syncReplicateLoadTest
sync
gtest_main
)
target_link_libraries
(
syncRefTest
sync
gtest_main
)
target_link_libraries
(
syncLogStoreCheck
sync
gtest_main
)
enable_testing
()
...
...
source/libs/sync/test/syncLogStoreCheck.cpp
0 → 100644
浏览文件 @
996886e7
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
void
logTest
()
{
sTrace
(
"--- sync log test: trace"
);
sDebug
(
"--- sync log test: debug"
);
sInfo
(
"--- sync log test: info"
);
sWarn
(
"--- sync log test: warn"
);
sError
(
"--- sync log test: error"
);
sFatal
(
"--- sync log test: fatal"
);
}
uint16_t
ports
[]
=
{
7010
,
7110
,
7210
,
7310
,
7410
};
int32_t
replicaNum
=
1
;
int32_t
myIndex
=
0
;
SRaftId
ids
[
TSDB_MAX_REPLICA
];
SSyncInfo
syncInfo
;
SSyncFSM
*
pFsm
;
SWal
*
pWal
;
SSyncNode
*
pSyncNode
;
SSyncNode
*
syncNodeInit
(
const
char
*
path
)
{
syncInfo
.
vgId
=
1234
;
syncInfo
.
rpcClient
=
gSyncIO
->
clientRpc
;
syncInfo
.
FpSendMsg
=
syncIOSendMsg
;
syncInfo
.
queue
=
gSyncIO
->
pMsgQ
;
syncInfo
.
FpEqMsg
=
syncIOEqMsg
;
syncInfo
.
pFsm
=
pFsm
;
snprintf
(
syncInfo
.
path
,
sizeof
(
syncInfo
.
path
),
"%s"
,
"./log_check"
);
int
code
=
walInit
();
assert
(
code
==
0
);
SWalCfg
walCfg
;
memset
(
&
walCfg
,
0
,
sizeof
(
SWalCfg
));
walCfg
.
vgId
=
syncInfo
.
vgId
;
walCfg
.
fsyncPeriod
=
1000
;
walCfg
.
retentionPeriod
=
1000
;
walCfg
.
rollPeriod
=
1000
;
walCfg
.
retentionSize
=
1000
;
walCfg
.
segSize
=
1000
;
walCfg
.
level
=
TAOS_WAL_FSYNC
;
pWal
=
walOpen
(
path
,
&
walCfg
);
assert
(
pWal
!=
NULL
);
syncInfo
.
pWal
=
pWal
;
SSyncCfg
*
pCfg
=
&
syncInfo
.
syncCfg
;
pCfg
->
myIndex
=
myIndex
;
pCfg
->
replicaNum
=
replicaNum
;
for
(
int
i
=
0
;
i
<
replicaNum
;
++
i
)
{
pCfg
->
nodeInfo
[
i
].
nodePort
=
ports
[
i
];
snprintf
(
pCfg
->
nodeInfo
[
i
].
nodeFqdn
,
sizeof
(
pCfg
->
nodeInfo
[
i
].
nodeFqdn
),
"%s"
,
"127.0.0.1"
);
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
}
pSyncNode
=
syncNodeOpen
(
&
syncInfo
);
assert
(
pSyncNode
!=
NULL
);
gSyncIO
->
FpOnSyncPing
=
pSyncNode
->
FpOnPing
;
gSyncIO
->
FpOnSyncPingReply
=
pSyncNode
->
FpOnPingReply
;
gSyncIO
->
FpOnSyncRequestVote
=
pSyncNode
->
FpOnRequestVote
;
gSyncIO
->
FpOnSyncRequestVoteReply
=
pSyncNode
->
FpOnRequestVoteReply
;
gSyncIO
->
FpOnSyncAppendEntries
=
pSyncNode
->
FpOnAppendEntries
;
gSyncIO
->
FpOnSyncAppendEntriesReply
=
pSyncNode
->
FpOnAppendEntriesReply
;
gSyncIO
->
FpOnSyncPing
=
pSyncNode
->
FpOnPing
;
gSyncIO
->
FpOnSyncPingReply
=
pSyncNode
->
FpOnPingReply
;
gSyncIO
->
FpOnSyncTimeout
=
pSyncNode
->
FpOnTimeout
;
gSyncIO
->
pSyncNode
=
pSyncNode
;
return
pSyncNode
;
}
SSyncNode
*
logStoreCheck
(
const
char
*
path
)
{
return
syncNodeInit
(
path
);
}
int
main
(
int
argc
,
char
**
argv
)
{
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog
=
0
;
sDebugFlag
=
143
+
64
;
myIndex
=
0
;
if
(
argc
>=
2
)
{
myIndex
=
atoi
(
argv
[
1
]);
}
int32_t
ret
=
syncIOStart
((
char
*
)
"127.0.0.1"
,
ports
[
myIndex
]);
assert
(
ret
==
0
);
ret
=
syncEnvStart
();
assert
(
ret
==
0
);
pSyncNode
=
logStoreCheck
(
argv
[
1
]);
assert
(
pSyncNode
!=
NULL
);
logStorePrint2
((
char
*
)
"logStoreCheck"
,
pSyncNode
->
pLogStore
);
return
0
;
}
source/libs/sync/test/syncReplicateLoadTest.cpp
0 → 100644
浏览文件 @
996886e7
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaftEntry.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
void
logTest
()
{
sTrace
(
"--- sync log test: trace"
);
sDebug
(
"--- sync log test: debug"
);
sInfo
(
"--- sync log test: info"
);
sWarn
(
"--- sync log test: warn"
);
sError
(
"--- sync log test: error"
);
sFatal
(
"--- sync log test: fatal"
);
}
uint16_t
ports
[]
=
{
7010
,
7110
,
7210
,
7310
,
7410
};
int32_t
replicaNum
=
3
;
int32_t
myIndex
=
0
;
SRaftId
ids
[
TSDB_MAX_REPLICA
];
SSyncInfo
syncInfo
;
SSyncFSM
*
pFsm
;
SWal
*
pWal
;
void
CommitCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SyncIndex
index
,
bool
isWeak
,
int32_t
code
,
ESyncState
state
)
{
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s
\n
"
,
pFsm
,
index
,
isWeak
,
code
,
state
,
syncUtilState2String
(
state
));
syncRpcMsgPrint2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
void
PreCommitCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SyncIndex
index
,
bool
isWeak
,
int32_t
code
,
ESyncState
state
)
{
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s
\n
"
,
pFsm
,
index
,
isWeak
,
code
,
state
,
syncUtilState2String
(
state
));
syncRpcMsgPrint2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
void
RollBackCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SyncIndex
index
,
bool
isWeak
,
int32_t
code
,
ESyncState
state
)
{
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s
\n
"
,
pFsm
,
index
,
isWeak
,
code
,
state
,
syncUtilState2String
(
state
));
syncRpcMsgPrint2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
void
initFsm
()
{
pFsm
=
(
SSyncFSM
*
)
malloc
(
sizeof
(
SSyncFSM
));
pFsm
->
FpCommitCb
=
CommitCb
;
pFsm
->
FpPreCommitCb
=
PreCommitCb
;
pFsm
->
FpRollBackCb
=
RollBackCb
;
}
int64_t
syncNodeInit
()
{
syncInfo
.
vgId
=
1234
;
syncInfo
.
rpcClient
=
gSyncIO
->
clientRpc
;
syncInfo
.
FpSendMsg
=
syncIOSendMsg
;
syncInfo
.
queue
=
gSyncIO
->
pMsgQ
;
syncInfo
.
FpEqMsg
=
syncIOEqMsg
;
syncInfo
.
pFsm
=
pFsm
;
snprintf
(
syncInfo
.
path
,
sizeof
(
syncInfo
.
path
),
"./replicate2_test_%d"
,
myIndex
);
int
code
=
walInit
();
assert
(
code
==
0
);
SWalCfg
walCfg
;
memset
(
&
walCfg
,
0
,
sizeof
(
SWalCfg
));
walCfg
.
vgId
=
syncInfo
.
vgId
;
walCfg
.
fsyncPeriod
=
1000
;
walCfg
.
retentionPeriod
=
1000
;
walCfg
.
rollPeriod
=
1000
;
walCfg
.
retentionSize
=
1000
;
walCfg
.
segSize
=
1000
;
walCfg
.
level
=
TAOS_WAL_FSYNC
;
char
tmpdir
[
128
];
snprintf
(
tmpdir
,
sizeof
(
tmpdir
),
"./replicate2_test_wal_%d"
,
myIndex
);
pWal
=
walOpen
(
tmpdir
,
&
walCfg
);
assert
(
pWal
!=
NULL
);
syncInfo
.
pWal
=
pWal
;
SSyncCfg
*
pCfg
=
&
syncInfo
.
syncCfg
;
pCfg
->
myIndex
=
myIndex
;
pCfg
->
replicaNum
=
replicaNum
;
for
(
int
i
=
0
;
i
<
replicaNum
;
++
i
)
{
pCfg
->
nodeInfo
[
i
].
nodePort
=
ports
[
i
];
snprintf
(
pCfg
->
nodeInfo
[
i
].
nodeFqdn
,
sizeof
(
pCfg
->
nodeInfo
[
i
].
nodeFqdn
),
"%s"
,
"127.0.0.1"
);
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
}
int64_t
rid
=
syncStart
(
&
syncInfo
);
assert
(
rid
>
0
);
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
syncNodeAcquire
(
rid
);
assert
(
pSyncNode
!=
NULL
);
// pSyncNode->hbBaseLine = 500;
// pSyncNode->electBaseLine = 1500;
gSyncIO
->
FpOnSyncPing
=
pSyncNode
->
FpOnPing
;
gSyncIO
->
FpOnSyncPingReply
=
pSyncNode
->
FpOnPingReply
;
gSyncIO
->
FpOnSyncRequestVote
=
pSyncNode
->
FpOnRequestVote
;
gSyncIO
->
FpOnSyncRequestVoteReply
=
pSyncNode
->
FpOnRequestVoteReply
;
gSyncIO
->
FpOnSyncAppendEntries
=
pSyncNode
->
FpOnAppendEntries
;
gSyncIO
->
FpOnSyncAppendEntriesReply
=
pSyncNode
->
FpOnAppendEntriesReply
;
gSyncIO
->
FpOnSyncTimeout
=
pSyncNode
->
FpOnTimeout
;
gSyncIO
->
FpOnSyncClientRequest
=
pSyncNode
->
FpOnClientRequest
;
gSyncIO
->
pSyncNode
=
pSyncNode
;
syncNodeRelease
(
pSyncNode
);
return
rid
;
}
void
initRaftId
(
SSyncNode
*
pSyncNode
)
{
for
(
int
i
=
0
;
i
<
replicaNum
;
++
i
)
{
ids
[
i
]
=
pSyncNode
->
replicasId
[
i
];
char
*
s
=
syncUtilRaftId2Str
(
&
ids
[
i
]);
printf
(
"raftId[%d] : %s
\n
"
,
i
,
s
);
free
(
s
);
}
}
SRpcMsg
*
step0
(
int
i
)
{
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
malloc
(
sizeof
(
SRpcMsg
));
memset
(
pMsg
,
0
,
sizeof
(
SRpcMsg
));
pMsg
->
msgType
=
9999
;
pMsg
->
contLen
=
128
;
pMsg
->
pCont
=
malloc
(
pMsg
->
contLen
);
snprintf
((
char
*
)(
pMsg
->
pCont
),
pMsg
->
contLen
,
"value-%u-%d"
,
ports
[
myIndex
],
i
);
return
pMsg
;
}
SyncClientRequest
*
step1
(
const
SRpcMsg
*
pMsg
)
{
SyncClientRequest
*
pRetMsg
=
syncClientRequestBuild2
(
pMsg
,
123
,
true
);
return
pRetMsg
;
}
int
main
(
int
argc
,
char
**
argv
)
{
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog
=
0
;
sDebugFlag
=
143
+
64
;
void
logTest
();
myIndex
=
0
;
if
(
argc
>=
2
)
{
myIndex
=
atoi
(
argv
[
1
]);
}
int32_t
ret
=
syncIOStart
((
char
*
)
"127.0.0.1"
,
ports
[
myIndex
]);
assert
(
ret
==
0
);
initFsm
();
ret
=
syncInit
();
assert
(
ret
==
0
);
int64_t
rid
=
syncNodeInit
();
assert
(
rid
>
0
);
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
syncNodeAcquire
(
rid
);
assert
(
pSyncNode
!=
NULL
);
syncNodePrint2
((
char
*
)
""
,
pSyncNode
);
initRaftId
(
pSyncNode
);
// only load ...
while
(
1
)
{
sTrace
(
"replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, "
"electTimerMS:%d"
,
pSyncNode
->
state
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
electTimerLogicClock
,
pSyncNode
->
electTimerLogicClockUser
,
pSyncNode
->
electTimerMS
);
taosMsleep
(
1000
);
}
return
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录