提交 11c8ac91 编写于 作者: M Minghao Li

sync refactor

上级 e5b0e146
......@@ -50,18 +50,20 @@ void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SyncIndex beginIndex = SYNC_INDEX_INVALID;
SyncIndex endIndex = SYNC_INDEX_INVALID;
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i);
assert(pEntry != NULL);
if (i != SYNC_INDEX_INVALID) {
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i);
assert(pEntry != NULL);
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (pSyncNode->pFsm->FpCommitCb != NULL) {
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
}
if (pSyncNode->pFsm->FpCommitCb != NULL) {
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
}
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry);
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry);
}
}
}
}
\ No newline at end of file
......@@ -29,6 +29,7 @@ add_executable(syncPingTimerTest2 "")
add_executable(syncPingSelfTest "")
add_executable(syncElectTest "")
add_executable(syncEncodeTest "")
add_executable(syncWriteTest "")
target_sources(syncTest
......@@ -155,6 +156,10 @@ target_sources(syncEncodeTest
PRIVATE
"syncEncodeTest.cpp"
)
target_sources(syncWriteTest
PRIVATE
"syncWriteTest.cpp"
)
target_include_directories(syncTest
......@@ -317,6 +322,11 @@ target_include_directories(syncEncodeTest
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncWriteTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(syncTest
......@@ -443,6 +453,10 @@ target_link_libraries(syncEncodeTest
sync
gtest_main
)
target_link_libraries(syncWriteTest
sync
gtest_main
)
enable_testing()
......
......@@ -8,7 +8,6 @@
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncRaftLog.h"
void logTest() {
sTrace("--- sync log test: trace");
......@@ -25,11 +24,11 @@ int32_t myIndex = 0;
SRaftId ids[TSDB_MAX_REPLICA];
SSyncInfo syncInfo;
SSyncFSM* pFsm;
SWal* pWal;
SSyncNode* pSyncNode;
SSyncFSM * pFsm;
SWal * pWal;
SSyncNode *pSyncNode;
SSyncNode* syncNodeInit() {
SSyncNode *syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
......@@ -54,7 +53,7 @@ SSyncNode* syncNodeInit() {
syncInfo.pWal = pWal;
SSyncCfg* pCfg = &syncInfo.syncCfg;
SSyncCfg *pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
pCfg->replicaNum = replicaNum;
......@@ -81,43 +80,12 @@ SSyncNode* syncNodeInit() {
return pSyncNode;
}
SSyncNode* syncInitTest() { return syncNodeInit(); }
void logStoreTest() {
logStorePrint2((char*)"logStoreTest", pSyncNode->pLogStore);
assert(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) == SYNC_INDEX_INVALID);
for (int i = 0; i < 5; ++i) {
int32_t dataLen = 10;
SSyncRaftEntry* pEntry = syncEntryBuild(dataLen);
assert(pEntry != NULL);
pEntry->msgType = 1;
pEntry->originalRpcType = 2;
pEntry->seqNum = 3;
pEntry->isWeak = true;
pEntry->term = 100;
pEntry->index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;
snprintf(pEntry->data, dataLen, "value%d", i);
// syncEntryPrint2((char*)"write entry:", pEntry);
pSyncNode->pLogStore->appendEntry(pSyncNode->pLogStore, pEntry);
syncEntryDestory(pEntry);
if (i == 0) {
assert(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) == SYNC_INDEX_BEGIN);
}
}
logStorePrint2((char*)"after appendEntry", pSyncNode->pLogStore);
pSyncNode->pLogStore->truncate(pSyncNode->pLogStore, 3);
logStorePrint2((char*)"after truncate 3", pSyncNode->pLogStore);
}
SSyncNode *syncInitTest() { return syncNodeInit(); }
void initRaftId(SSyncNode* pSyncNode) {
void initRaftId(SSyncNode *pSyncNode) {
for (int i = 0; i < replicaNum; ++i) {
ids[i] = pSyncNode->replicasId[i];
char* s = syncUtilRaftId2Str(&ids[i]);
char *s = syncUtilRaftId2Str(&ids[i]);
printf("raftId[%d] : %s\n", i, s);
free(s);
}
......@@ -181,7 +149,7 @@ int main(int argc, char **argv) {
myIndex = atoi(argv[1]);
}
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]);
assert(ret == 0);
ret = syncEnvStart();
......@@ -210,7 +178,7 @@ int main(int argc, char **argv) {
syncEntryPrint2((char *)"==step4==", pMsg4);
// log, relog
SSyncNode* pSyncNode = syncNodeInit();
SSyncNode *pSyncNode = syncNodeInit();
assert(pSyncNode != NULL);
SSyncRaftEntry *pEntry = pMsg4;
pSyncNode->pLogStore->appendEntry(pSyncNode->pLogStore, pEntry);
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaftEntry.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
int32_t replicaNum = 1;
int32_t myIndex = 0;
SRaftId ids[TSDB_MAX_REPLICA];
SSyncInfo syncInfo;
SSyncFSM * pFsm;
SWal * pWal;
SSyncNode *gSyncNode;
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) {
printf("==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code);
syncRpcMsgPrint2((char *)"==CommitCb==", (SRpcMsg *)pBuf);
}
void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) {
printf("==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code);
syncRpcMsgPrint2((char *)"==PreCommitCb==", (SRpcMsg *)pBuf);
}
void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) {
printf("==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code);
syncRpcMsgPrint2((char *)"==RollBackCb==", (SRpcMsg *)pBuf);
}
void initFsm() {
pFsm = (SSyncFSM *)malloc(sizeof(SSyncFSM));
pFsm->FpCommitCb = CommitCb;
pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb;
}
SSyncNode *syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./write_test");
int code = walInit();
assert(code == 0);
SWalCfg walCfg;
memset(&walCfg, 0, sizeof(SWalCfg));
walCfg.vgId = syncInfo.vgId;
walCfg.fsyncPeriod = 1000;
walCfg.retentionPeriod = 1000;
walCfg.rollPeriod = 1000;
walCfg.retentionSize = 1000;
walCfg.segSize = 1000;
walCfg.level = TAOS_WAL_FSYNC;
pWal = walOpen("./write_test_wal", &walCfg);
assert(pWal != NULL);
syncInfo.pWal = pWal;
SSyncCfg *pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
pCfg->replicaNum = replicaNum;
for (int i = 0; i < replicaNum; ++i) {
pCfg->nodeInfo[i].nodePort = ports[i];
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
}
SSyncNode *pSyncNode = syncNodeOpen(&syncInfo);
assert(pSyncNode != NULL);
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
gSyncIO->pSyncNode = pSyncNode;
return pSyncNode;
}
SSyncNode *syncInitTest() { return syncNodeInit(); }
void initRaftId(SSyncNode *pSyncNode) {
for (int i = 0; i < replicaNum; ++i) {
ids[i] = pSyncNode->replicasId[i];
char *s = syncUtilRaftId2Str(&ids[i]);
printf("raftId[%d] : %s\n", i, s);
free(s);
}
}
SRpcMsg *step0() {
SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg));
memset(pMsg, 0, sizeof(SRpcMsg));
pMsg->msgType = 9999;
pMsg->contLen = 32;
pMsg->pCont = malloc(pMsg->contLen);
snprintf((char *)(pMsg->pCont), pMsg->contLen, "hello, world");
return pMsg;
}
SyncClientRequest *step1(const SRpcMsg *pMsg) {
SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true);
return pRetMsg;
}
int main(int argc, char **argv) {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
void logTest();
myIndex = 0;
if (argc >= 2) {
myIndex = atoi(argv[1]);
}
int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]);
assert(ret == 0);
ret = syncEnvStart();
assert(ret == 0);
taosRemoveDir("./wal_test");
initFsm();
gSyncNode = syncInitTest();
assert(gSyncNode != NULL);
syncNodePrint2((char *)"", gSyncNode);
initRaftId(gSyncNode);
// step0
SRpcMsg *pMsg0 = step0();
syncRpcMsgPrint2((char *)"==step0==", pMsg0);
// step1
SyncClientRequest *pMsg1 = step1(pMsg0);
syncClientRequestPrint2((char *)"==step1==", pMsg1);
for (int i = 0; i < 5; ++i) {
SyncClientRequest *pSyncClientRequest = pMsg1;
SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg);
gSyncNode->FpEqMsg(gSyncNode->queue, &rpcMsg);
taosMsleep(1000);
}
while (1) {
sTrace("while 1 sleep");
taosMsleep(1000);
}
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册