From 1336e08ad0ec5d157db2ec3256f360ce49c1fa39 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sun, 13 Mar 2022 16:27:14 +0800 Subject: [PATCH] sync io --- source/libs/sync/test/syncEnqTest.cpp | 83 ++++++++-------- source/libs/sync/test/syncIOSendMsgTest.cpp | 101 ++++++++++++++++---- 2 files changed, 126 insertions(+), 58 deletions(-) diff --git a/source/libs/sync/test/syncEnqTest.cpp b/source/libs/sync/test/syncEnqTest.cpp index e1706bb40b..57315f40ec 100644 --- a/source/libs/sync/test/syncEnqTest.cpp +++ b/source/libs/sync/test/syncEnqTest.cpp @@ -1,9 +1,10 @@ +#include #include #include "syncEnv.h" #include "syncIO.h" #include "syncInt.h" -#include "syncMessage.h" #include "syncRaftStore.h" +#include "syncUtil.h" void logTest() { sTrace("--- sync log test: trace"); @@ -14,64 +15,69 @@ void logTest() { sFatal("--- sync log test: fatal"); } -uint16_t ports[3] = {7010, 7110, 7210}; +uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; +int32_t replicaNum = 5; +int32_t myIndex = 0; -SSyncNode* doSync(int myIndex) { - SSyncFSM* pFsm; +SRaftId ids[TSDB_MAX_REPLICA]; +SSyncInfo syncInfo; +SSyncFSM* pFsm; - SSyncInfo syncInfo; - syncInfo.vgId = 1; +SSyncNode* syncNodeInit() { + syncInfo.vgId = 1234; syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; - snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping"); + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = myIndex; - pCfg->replicaNum = 3; + pCfg->replicaNum = replicaNum; - pCfg->nodeInfo[0].nodePort = ports[0]; - snprintf(pCfg->nodeInfo[0].nodeFqdn, sizeof(pCfg->nodeInfo[0].nodeFqdn), "%s", "127.0.0.1"); - // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); - - pCfg->nodeInfo[1].nodePort = ports[1]; - snprintf(pCfg->nodeInfo[1].nodeFqdn, sizeof(pCfg->nodeInfo[1].nodeFqdn), "%s", "127.0.0.1"); - // taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); - - pCfg->nodeInfo[2].nodePort = ports[2]; - snprintf(pCfg->nodeInfo[2].nodeFqdn, sizeof(pCfg->nodeInfo[2].nodeFqdn), "%s", "127.0.0.1"); - // taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); + for (int i = 0; i < replicaNum; ++i) { + pCfg->nodeInfo[i].nodePort = ports[i]; + snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + } SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); assert(pSyncNode != NULL); gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; gSyncIO->pSyncNode = pSyncNode; return pSyncNode; } -void timerPingAll(void* param, void* tmrId) { - SSyncNode* pSyncNode = (SSyncNode*)param; - syncNodePingAll(pSyncNode); +SSyncNode* syncInitTest() { return syncNodeInit(); } + +void initRaftId(SSyncNode* pSyncNode) { + for (int i = 0; i < replicaNum; ++i) { + ids[i] = pSyncNode->replicasId[i]; + char* s = syncUtilRaftId2Str(&ids[i]); + printf("raftId[%d] : %s\n", i, s); + free(s); + } } int main(int argc, char** argv) { - // taosInitLog((char*)"syncPingTest.log", 100000, 10); + // taosInitLog((char *)"syncTest.log", 100000, 10); tsAsyncLog = 0; sDebugFlag = 143 + 64; - logTest(); - - int myIndex = 0; + myIndex = 0; if (argc >= 2) { myIndex = atoi(argv[1]); - if (myIndex > 2 || myIndex < 0) { - fprintf(stderr, "myIndex:%d error. should be 0 - 2", myIndex); - return 1; - } } int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); @@ -80,21 +86,22 @@ int main(int argc, char** argv) { ret = syncEnvStart(); assert(ret == 0); - SSyncNode* pSyncNode = doSync(myIndex); - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + SSyncNode* pSyncNode = syncInitTest(); + assert(pSyncNode != NULL); + + syncNodePrint2((char*)"syncInitTest", pSyncNode); + + initRaftId(pSyncNode); + + //-------------------------------------------------------------- for (int i = 0; i < 10; ++i) { - SyncPingReply* pSyncMsg = syncPingReplyBuild3(&pSyncNode->myRaftId, &pSyncNode->myRaftId); + SyncPingReply* pSyncMsg = syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, "syncEnqTest"); SRpcMsg rpcMsg; syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); taosMsleep(1000); } - while (1) { - taosMsleep(1000); - } - return 0; } diff --git a/source/libs/sync/test/syncIOSendMsgTest.cpp b/source/libs/sync/test/syncIOSendMsgTest.cpp index ed88fbb03e..0fc3ebfe4c 100644 --- a/source/libs/sync/test/syncIOSendMsgTest.cpp +++ b/source/libs/sync/test/syncIOSendMsgTest.cpp @@ -1,8 +1,10 @@ #include #include +#include "syncEnv.h" #include "syncIO.h" #include "syncInt.h" #include "syncRaftStore.h" +#include "syncUtil.h" void logTest() { sTrace("--- sync log test: trace"); @@ -13,37 +15,96 @@ void logTest() { sFatal("--- sync log test: fatal"); } -int main() { +uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; +int32_t replicaNum = 5; +int32_t myIndex = 0; + +SRaftId ids[TSDB_MAX_REPLICA]; +SSyncInfo syncInfo; +SSyncFSM* pFsm; + +SSyncNode* syncNodeInit() { + syncInfo.vgId = 1234; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); + + SSyncCfg* pCfg = &syncInfo.syncCfg; + pCfg->myIndex = myIndex; + pCfg->replicaNum = replicaNum; + + for (int i = 0; i < replicaNum; ++i) { + pCfg->nodeInfo[i].nodePort = ports[i]; + snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + } + + SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); + assert(pSyncNode != NULL); + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + gSyncIO->pSyncNode = pSyncNode; + + return pSyncNode; +} + +SSyncNode* syncInitTest() { return syncNodeInit(); } + +void initRaftId(SSyncNode* pSyncNode) { + for (int i = 0; i < replicaNum; ++i) { + ids[i] = pSyncNode->replicasId[i]; + char* s = syncUtilRaftId2Str(&ids[i]); + printf("raftId[%d] : %s\n", i, s); + free(s); + } +} + +int main(int argc, char** argv) { // taosInitLog((char *)"syncTest.log", 100000, 10); tsAsyncLog = 0; sDebugFlag = 143 + 64; - logTest(); + myIndex = 0; + if (argc >= 2) { + myIndex = atoi(argv[1]); + } - int32_t ret; + int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); + assert(ret == 0); - ret = syncIOStart((char *)"127.0.0.1", 7010); + ret = syncEnvStart(); assert(ret == 0); + SSyncNode* pSyncNode = syncInitTest(); + assert(pSyncNode != NULL); + + syncNodePrint2((char*)"syncInitTest", pSyncNode); + + initRaftId(pSyncNode); + + //-------------------------------------------------------------- + for (int i = 0; i < 10; ++i) { + SyncPingReply* pSyncMsg = syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, "syncIOSendMsgTest"); + SRpcMsg rpcMsg; + syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); + SEpSet epSet; - epSet.inUse = 0; - epSet.numOfEps = 0; - addEpIntoEpSet(&epSet, "127.0.0.1", 7010); - - SRpcMsg rpcMsg; - rpcMsg.contLen = 64; - rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); - snprintf((char *)rpcMsg.pCont, rpcMsg.contLen, "%s", "syncIOSendMsgTest"); - rpcMsg.handle = NULL; - rpcMsg.msgType = 77; - - syncIOSendMsg(gSyncIO->clientRpc, &epSet, &rpcMsg); - taosSsleep(1); - } + syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet); + pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, &rpcMsg); - while (1) { - taosSsleep(1); + taosMsleep(1000); } return 0; -- GitLab