From 0161b6456c0e184003388b61e9491f5c84ef7744 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 19 May 2022 19:44:01 +0800 Subject: [PATCH] refactor: adjust msgcb --- include/libs/sync/sync.h | 16 +++----- include/libs/sync/syncTools.h | 10 ++--- source/dnode/vnode/src/inc/vnd.h | 7 ++-- source/dnode/vnode/src/vnd/vnodeOpen.c | 3 +- source/dnode/vnode/src/vnd/vnodeSync.c | 33 ++++------------ source/libs/sync/inc/syncIO.h | 12 +++--- source/libs/sync/inc/syncInt.h | 9 ++--- source/libs/sync/src/syncIO.c | 8 ++-- source/libs/sync/src/syncMain.c | 39 +++++++------------ .../libs/sync/test/syncConfigChangeTest.cpp | 3 +- source/libs/sync/test/syncElectTest.cpp | 3 +- source/libs/sync/test/syncEncodeTest.cpp | 3 +- source/libs/sync/test/syncEnqTest.cpp | 4 +- source/libs/sync/test/syncIOClientTest.cpp | 2 +- source/libs/sync/test/syncIOSendMsgTest.cpp | 5 +-- source/libs/sync/test/syncIndexMgrTest.cpp | 3 +- source/libs/sync/test/syncInitTest.cpp | 3 +- source/libs/sync/test/syncPingSelfTest.cpp | 3 +- source/libs/sync/test/syncPingTimerTest.cpp | 3 +- source/libs/sync/test/syncPingTimerTest2.cpp | 3 +- source/libs/sync/test/syncReplicateTest.cpp | 3 +- source/libs/sync/test/syncSnapshotTest.cpp | 5 +-- .../libs/sync/test/syncVotesGrantedTest.cpp | 3 +- .../libs/sync/test/syncVotesRespondTest.cpp | 3 +- source/libs/sync/test/syncWriteTest.cpp | 5 +-- 25 files changed, 66 insertions(+), 125 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 551e0fc7b8..831063c606 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -20,14 +20,11 @@ extern "C" { #endif -#include -#include -//#include +#include "os.h" + #include "cJSON.h" #include "tdef.h" -//#include "taosdef.h" -//#include "trpc.h" -//#include "wal.h" +#include "tmsgcb.h" typedef uint64_t SyncNodeId; typedef int32_t SyncGroupId; @@ -132,11 +129,10 @@ typedef struct SSyncInfo { char path[TSDB_FILENAME_LEN]; SWal* pWal; SSyncFSM* pFsm; + SMsgCb* msgcb; - void* rpcClient; - int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); - void* queue; - int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg); + int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg); + int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); } SSyncInfo; diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 8de4c7cd10..01c25b93cc 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -20,13 +20,10 @@ extern "C" { #endif -#include -#include -//#include +#include "os.h" + #include "cJSON.h" -//#include "taosdef.h" #include "trpc.h" -//#include "wal.h" // ------------------ ds ------------------- typedef struct SRaftId { @@ -43,8 +40,7 @@ void syncNodeRelease(SSyncNode* pNode); int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg); int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg); -void syncSetQ(int64_t rid, void* queueHandle); -void syncSetRpc(int64_t rid, void* rpcHandle); +void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb); char* sync2SimpleStr(int64_t rid); // set timer ms diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 3f5435ee47..a034833a57 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -85,10 +85,9 @@ int vnodeAsyncCommit(SVnode* pVnode); int32_t vnodeSyncOpen(SVnode* pVnode, char* path); int32_t vnodeSyncStart(SVnode* pVnode); void vnodeSyncClose(SVnode* pVnode); -void vnodeSyncSetQ(SVnode* pVnode, void* qHandle); -void vnodeSyncSetRpc(SVnode* pVnode, void* rpcHandle); -int32_t vnodeSyncEqMsg(void* qHandle, SRpcMsg* pMsg); -int32_t vnodeSendMsg(void* rpcHandle, const SEpSet* pEpSet, SRpcMsg* pMsg); +void vnodeSyncSetMsgCb(SVnode* pVnode); +int32_t vnodeSyncEqMsg(const SMsgCb* msgcb, SRpcMsg* pMsg); +int32_t vnodeSyncSendMsg(const SEpSet* pEpSet, SRpcMsg* pMsg); void vnodeSyncCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void vnodeSyncPreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void vnodeSyncRollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 739f7f9fa3..db15c059e3 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -178,8 +178,7 @@ void vnodeClose(SVnode *pVnode) { // start the sync timer after the queue is ready int32_t vnodeStart(SVnode *pVnode) { - vnodeSyncSetQ(pVnode, NULL); - vnodeSyncSetRpc(pVnode, NULL); + vnodeSyncSetMsgCb(pVnode); vnodeSyncStart(pVnode); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index a93844c5ff..bcef95baff 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -27,9 +27,8 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { syncInfo.pWal = pVnode->pWal; syncInfo.pFsm = syncVnodeMakeFsm(pVnode); - syncInfo.rpcClient = NULL; - syncInfo.FpSendMsg = vnodeSendMsg; - syncInfo.queue = NULL; + syncInfo.msgcb = NULL; + syncInfo.FpSendMsg = vnodeSyncSendMsg; syncInfo.FpEqMsg = vnodeSyncEqMsg; pVnode->sync = syncOpen(&syncInfo); @@ -53,31 +52,13 @@ void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } -void vnodeSyncSetQ(SVnode *pVnode, void *qHandle) { syncSetQ(pVnode->sync, (void *)(&(pVnode->msgCb))); } +void vnodeSyncSetMsgCb(SVnode *pVnode) { syncSetMsgCb(pVnode->sync, &pVnode->msgCb); } -void vnodeSyncSetRpc(SVnode *pVnode, void *rpcHandle) { syncSetRpc(pVnode->sync, (void *)(&(pVnode->msgCb))); } +int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } -int32_t vnodeSyncEqMsg(void *qHandle, SRpcMsg *pMsg) { - int32_t ret = 0; - SMsgCb *pMsgCb = qHandle; - if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) { - tmsgPutToQueue(qHandle, SYNC_QUEUE, pMsg); - } else { - vError("vnodeSyncEqMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE); - } - return ret; -} - -int32_t vnodeSendMsg(void *rpcHandle, const SEpSet *pEpSet, SRpcMsg *pMsg) { - int32_t ret = 0; - SMsgCb *pMsgCb = rpcHandle; - if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) { - pMsg->info.noResp = 1; - tmsgSendReq(pEpSet, pMsg); - } else { - vError("vnodeSendMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE); - } - return ret; +int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { + pMsg->info.noResp = 1; + return tmsgSendReq(pEpSet, pMsg); } int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index 99f9deb99e..f65a317694 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -36,10 +36,10 @@ typedef struct SSyncIO { STaosQueue *pMsgQ; STaosQset * pQset; TdThread consumerTid; - - void * serverRpc; - void * clientRpc; - SEpSet myAddr; + void *serverRpc; + void *clientRpc; + SEpSet myAddr; + SMsgCb msgcb; tmr_h qTimer; int32_t qTimerMS; @@ -65,8 +65,8 @@ extern SSyncIO *gSyncIO; int32_t syncIOStart(char *host, uint16_t port); int32_t syncIOStop(); -int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg); -int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg); +int32_t syncIOSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg); +int32_t syncIOEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg); int32_t syncIOQTimerStart(); int32_t syncIOQTimerStop(); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 9b655fb0fa..36f22db05f 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -159,11 +159,10 @@ typedef struct SSyncNode { char configPath[TSDB_FILENAME_LEN * 2]; // sync io - SWal* pWal; - void* rpcClient; - int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); - void* queue; - int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg); + SWal* pWal; + const SMsgCb* msgcb; + int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg); + int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); // init internal SNodeInfo myNodeInfo; diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 203a8a1e62..1da1cd7e4d 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -66,7 +66,7 @@ int32_t syncIOStop() { return ret; } -int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { +int32_t syncIOSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { assert(pEpSet->inUse == 0); assert(pEpSet->numOfEps == 1); @@ -83,11 +83,11 @@ int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { pMsg->info.handle = NULL; pMsg->info.noResp = 1; - rpcSendRequest(clientRpc, pEpSet, pMsg, NULL); + rpcSendRequest(gSyncIO->clientRpc, pEpSet, pMsg, NULL); return ret; } -int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) { +int32_t syncIOEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { int32_t ret = 0; char logBuf[128]; syncRpcMsgLog2((char *)"==syncIOEqMsg==", pMsg); @@ -96,7 +96,7 @@ int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) { pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM); memcpy(pTemp, pMsg, sizeof(SRpcMsg)); - STaosQueue *pMsgQ = queue; + STaosQueue *pMsgQ = gSyncIO->pMsgQ; taosWriteQitem(pMsgQ, pTemp); return ret; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 562694bbbc..56389de88a 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -240,26 +240,14 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { return ret; } -void syncSetQ(int64_t rid, void* queue) { +void syncSetMsgCb(int64_t rid, const SMsgCb *msgcb) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { sTrace("syncSetQ get pSyncNode is NULL, rid:%ld", rid); return; } assert(rid == pSyncNode->rid); - pSyncNode->queue = queue; - - taosReleaseRef(tsNodeRefId, pSyncNode->rid); -} - -void syncSetRpc(int64_t rid, void* rpcHandle) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); - if (pSyncNode == NULL) { - sTrace("syncSetRpc get pSyncNode is NULL, rid:%ld", rid); - return; - } - assert(rid == pSyncNode->rid); - pSyncNode->rpcClient = rpcHandle; + pSyncNode->msgcb = msgcb; taosReleaseRef(tsNodeRefId, pSyncNode->rid); } @@ -332,7 +320,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { SRpcMsg rpcMsg; syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); if (pSyncNode->FpEqMsg != NULL) { - pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); } else { sTrace("syncPropose pSyncNode->FpEqMsg is NULL"); } @@ -375,9 +363,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path); pSyncNode->pWal = pSyncInfo->pWal; - pSyncNode->rpcClient = pSyncInfo->rpcClient; + pSyncNode->msgcb = pSyncInfo->msgcb; pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; - pSyncNode->queue = pSyncInfo->queue; pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg; // init raft config @@ -691,7 +678,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp // htonl syncUtilMsgHtoN(pMsg->pCont); - pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); + pSyncNode->FpSendMsg(&epSet, pMsg); } else { sTrace("syncNodeSendMsgById pSyncNode->FpSendMsg is NULL"); } @@ -706,7 +693,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S // htonl syncUtilMsgHtoN(pMsg->pCont); - pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); + pSyncNode->FpSendMsg(&epSet, pMsg); } else { sTrace("syncNodeSendMsgByInfo pSyncNode->FpSendMsg is NULL"); } @@ -728,12 +715,12 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pWal); cJSON_AddStringToObject(pRoot, "pWal", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->rpcClient); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb); cJSON_AddStringToObject(pRoot, "rpcClient", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpSendMsg); cJSON_AddStringToObject(pRoot, "FpSendMsg", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->queue); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb); cJSON_AddStringToObject(pRoot, "queue", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpEqMsg); cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf); @@ -1095,7 +1082,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg); if (pSyncNode->FpEqMsg != NULL) { - pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); } else { sTrace("syncNodeEqPingTimer pSyncNode->FpEqMsg is NULL"); } @@ -1118,7 +1105,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqElectTimer==", &rpcMsg); if (pSyncNode->FpEqMsg != NULL) { - pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); } else { sTrace("syncNodeEqElectTimer pSyncNode->FpEqMsg is NULL"); } @@ -1145,7 +1132,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg); if (pSyncNode->FpEqMsg != NULL) { - pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); } else { sTrace("syncNodeEqHeartbeatTimer pSyncNode->FpEqMsg is NULL"); } @@ -1175,10 +1162,10 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) { assert(pSyncMsg->dataLen == entryLen); memcpy(pSyncMsg->data, serialized, entryLen); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg = {0}; syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); if (ths->FpEqMsg != NULL) { - ths->FpEqMsg(ths->queue, &rpcMsg); + ths->FpEqMsg(ths->msgcb, &rpcMsg); } else { sTrace("syncNodeEqNoop pSyncNode->FpEqMsg is NULL"); } diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index 9a2d9a6b34..cff692239a 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -100,9 +100,8 @@ SWal* createWal(char* path, int32_t vgId) { int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path, bool isStandBy) { SSyncInfo syncInfo; syncInfo.vgId = vgId; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = createFsm(); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); diff --git a/source/libs/sync/test/syncElectTest.cpp b/source/libs/sync/test/syncElectTest.cpp index f58b6b670b..862f7bd0ba 100644 --- a/source/libs/sync/test/syncElectTest.cpp +++ b/source/libs/sync/test/syncElectTest.cpp @@ -44,9 +44,8 @@ SWal* createWal(char* path, int32_t vgId) { SSyncNode* createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path) { SSyncInfo syncInfo; syncInfo.vgId = vgId; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = NULL; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); diff --git a/source/libs/sync/test/syncEncodeTest.cpp b/source/libs/sync/test/syncEncodeTest.cpp index 09d20156f4..454c823c6a 100644 --- a/source/libs/sync/test/syncEncodeTest.cpp +++ b/source/libs/sync/test/syncEncodeTest.cpp @@ -31,9 +31,8 @@ SSyncNode *pSyncNode; SSyncNode *syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncEnqTest.cpp b/source/libs/sync/test/syncEnqTest.cpp index 6f83ede5a0..8461bfe9b7 100644 --- a/source/libs/sync/test/syncEnqTest.cpp +++ b/source/libs/sync/test/syncEnqTest.cpp @@ -25,9 +25,7 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); @@ -99,7 +97,7 @@ int main(int argc, char** argv) { SyncPingReply* pSyncMsg = syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, 1000, "syncEnqTest"); SRpcMsg rpcMsg; syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); - pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); taosMsleep(1000); } diff --git a/source/libs/sync/test/syncIOClientTest.cpp b/source/libs/sync/test/syncIOClientTest.cpp index 492b2e4349..bd0221114a 100644 --- a/source/libs/sync/test/syncIOClientTest.cpp +++ b/source/libs/sync/test/syncIOClientTest.cpp @@ -43,7 +43,7 @@ int main() { SRpcMsg rpcMsg; syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); - syncIOSendMsg(gSyncIO->clientRpc, &epSet, &rpcMsg); + syncIOSendMsg(&epSet, &rpcMsg); taosSsleep(1); } diff --git a/source/libs/sync/test/syncIOSendMsgTest.cpp b/source/libs/sync/test/syncIOSendMsgTest.cpp index 03d308ea28..b8a9bec108 100644 --- a/source/libs/sync/test/syncIOSendMsgTest.cpp +++ b/source/libs/sync/test/syncIOSendMsgTest.cpp @@ -25,9 +25,8 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); @@ -103,7 +102,7 @@ int main(int argc, char** argv) { SEpSet epSet; syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet); - pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, &rpcMsg); + pSyncNode->FpSendMsg(&epSet, &rpcMsg); taosMsleep(1000); } diff --git a/source/libs/sync/test/syncIndexMgrTest.cpp b/source/libs/sync/test/syncIndexMgrTest.cpp index ea5d5f6b6f..7fcce2bc4f 100644 --- a/source/libs/sync/test/syncIndexMgrTest.cpp +++ b/source/libs/sync/test/syncIndexMgrTest.cpp @@ -28,9 +28,8 @@ SSyncNode* pSyncNode; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncInitTest.cpp b/source/libs/sync/test/syncInitTest.cpp index ca0657c74d..d0843151f4 100644 --- a/source/libs/sync/test/syncInitTest.cpp +++ b/source/libs/sync/test/syncInitTest.cpp @@ -25,9 +25,8 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./sync_init_test"); diff --git a/source/libs/sync/test/syncPingSelfTest.cpp b/source/libs/sync/test/syncPingSelfTest.cpp index 641ff059be..99287bf7b0 100644 --- a/source/libs/sync/test/syncPingSelfTest.cpp +++ b/source/libs/sync/test/syncPingSelfTest.cpp @@ -25,9 +25,8 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncPingTimerTest.cpp b/source/libs/sync/test/syncPingTimerTest.cpp index 29e99435be..cd9440e3e2 100644 --- a/source/libs/sync/test/syncPingTimerTest.cpp +++ b/source/libs/sync/test/syncPingTimerTest.cpp @@ -25,9 +25,8 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncPingTimerTest2.cpp b/source/libs/sync/test/syncPingTimerTest2.cpp index 285828125d..fa09d04368 100644 --- a/source/libs/sync/test/syncPingTimerTest2.cpp +++ b/source/libs/sync/test/syncPingTimerTest2.cpp @@ -25,9 +25,8 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncReplicateTest.cpp b/source/libs/sync/test/syncReplicateTest.cpp index 0e94498a38..bf9e34fffb 100644 --- a/source/libs/sync/test/syncReplicateTest.cpp +++ b/source/libs/sync/test/syncReplicateTest.cpp @@ -97,9 +97,8 @@ SWal* createWal(char* path, int32_t vgId) { int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path) { SSyncInfo syncInfo; syncInfo.vgId = vgId; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = createFsm(); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); diff --git a/source/libs/sync/test/syncSnapshotTest.cpp b/source/libs/sync/test/syncSnapshotTest.cpp index 5dd9ea9fcf..62bda5b22e 100644 --- a/source/libs/sync/test/syncSnapshotTest.cpp +++ b/source/libs/sync/test/syncSnapshotTest.cpp @@ -83,9 +83,8 @@ void initFsm() { SSyncNode *syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir); @@ -200,7 +199,7 @@ int main(int argc, char **argv) { SyncClientRequest *pSyncClientRequest = pMsg1; SRpcMsg rpcMsg; syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg); - gSyncNode->FpEqMsg(gSyncNode->queue, &rpcMsg); + gSyncNode->FpEqMsg(gSyncNode->msgcb, &rpcMsg); taosMsleep(1000); } diff --git a/source/libs/sync/test/syncVotesGrantedTest.cpp b/source/libs/sync/test/syncVotesGrantedTest.cpp index 02a35b3d00..d4885d0316 100644 --- a/source/libs/sync/test/syncVotesGrantedTest.cpp +++ b/source/libs/sync/test/syncVotesGrantedTest.cpp @@ -27,9 +27,8 @@ SSyncNode* pSyncNode; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncVotesRespondTest.cpp b/source/libs/sync/test/syncVotesRespondTest.cpp index f276d34745..77262dfc65 100644 --- a/source/libs/sync/test/syncVotesRespondTest.cpp +++ b/source/libs/sync/test/syncVotesRespondTest.cpp @@ -27,9 +27,8 @@ SSyncNode* pSyncNode; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncWriteTest.cpp b/source/libs/sync/test/syncWriteTest.cpp index ef09d2a0a4..34c8eb0f56 100644 --- a/source/libs/sync/test/syncWriteTest.cpp +++ b/source/libs/sync/test/syncWriteTest.cpp @@ -62,9 +62,8 @@ void initFsm() { SSyncNode *syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir); @@ -178,7 +177,7 @@ int main(int argc, char **argv) { SyncClientRequest *pSyncClientRequest = pMsg1; SRpcMsg rpcMsg; syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg); - gSyncNode->FpEqMsg(gSyncNode->queue, &rpcMsg); + gSyncNode->FpEqMsg(gSyncNode->msgcb, &rpcMsg); taosMsleep(1000); } -- GitLab