diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 551e0fc7b84b5847755ff2ca1d3edeab2cb87d2d..831063c606e3c65ac9caba11802a9b92332fe184 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -20,14 +20,11 @@ extern "C" { #endif -#include -#include -//#include +#include "os.h" + #include "cJSON.h" #include "tdef.h" -//#include "taosdef.h" -//#include "trpc.h" -//#include "wal.h" +#include "tmsgcb.h" typedef uint64_t SyncNodeId; typedef int32_t SyncGroupId; @@ -132,11 +129,10 @@ typedef struct SSyncInfo { char path[TSDB_FILENAME_LEN]; SWal* pWal; SSyncFSM* pFsm; + SMsgCb* msgcb; - void* rpcClient; - int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); - void* queue; - int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg); + int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg); + int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); } SSyncInfo; diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 8de4c7cd103a4bfd7a918be44a12acf276ef5af6..01c25b93cc7e3264f05280d8eb87fa0b732a7bd8 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -20,13 +20,10 @@ extern "C" { #endif -#include -#include -//#include +#include "os.h" + #include "cJSON.h" -//#include "taosdef.h" #include "trpc.h" -//#include "wal.h" // ------------------ ds ------------------- typedef struct SRaftId { @@ -43,8 +40,7 @@ void syncNodeRelease(SSyncNode* pNode); int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg); int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg); -void syncSetQ(int64_t rid, void* queueHandle); -void syncSetRpc(int64_t rid, void* rpcHandle); +void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb); char* sync2SimpleStr(int64_t rid); // set timer ms diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 3f5435ee47c43c9170042d65ba077fa14e7edd4a..a034833a57a172d2aecc10d69a6e2e138a5422f0 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -85,10 +85,9 @@ int vnodeAsyncCommit(SVnode* pVnode); int32_t vnodeSyncOpen(SVnode* pVnode, char* path); int32_t vnodeSyncStart(SVnode* pVnode); void vnodeSyncClose(SVnode* pVnode); -void vnodeSyncSetQ(SVnode* pVnode, void* qHandle); -void vnodeSyncSetRpc(SVnode* pVnode, void* rpcHandle); -int32_t vnodeSyncEqMsg(void* qHandle, SRpcMsg* pMsg); -int32_t vnodeSendMsg(void* rpcHandle, const SEpSet* pEpSet, SRpcMsg* pMsg); +void vnodeSyncSetMsgCb(SVnode* pVnode); +int32_t vnodeSyncEqMsg(const SMsgCb* msgcb, SRpcMsg* pMsg); +int32_t vnodeSyncSendMsg(const SEpSet* pEpSet, SRpcMsg* pMsg); void vnodeSyncCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void vnodeSyncPreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void vnodeSyncRollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 739f7f9fa3578dd2f4c13a714f0e717d88c0c4f4..db15c059e3d53a1ef8420126a54ad63148995a95 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -178,8 +178,7 @@ void vnodeClose(SVnode *pVnode) { // start the sync timer after the queue is ready int32_t vnodeStart(SVnode *pVnode) { - vnodeSyncSetQ(pVnode, NULL); - vnodeSyncSetRpc(pVnode, NULL); + vnodeSyncSetMsgCb(pVnode); vnodeSyncStart(pVnode); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index a93844c5ffeb46db1319cc4dda7b7ea762c59151..bcef95baff6417684a4a39063648814c35149221 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -27,9 +27,8 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { syncInfo.pWal = pVnode->pWal; syncInfo.pFsm = syncVnodeMakeFsm(pVnode); - syncInfo.rpcClient = NULL; - syncInfo.FpSendMsg = vnodeSendMsg; - syncInfo.queue = NULL; + syncInfo.msgcb = NULL; + syncInfo.FpSendMsg = vnodeSyncSendMsg; syncInfo.FpEqMsg = vnodeSyncEqMsg; pVnode->sync = syncOpen(&syncInfo); @@ -53,31 +52,13 @@ void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } -void vnodeSyncSetQ(SVnode *pVnode, void *qHandle) { syncSetQ(pVnode->sync, (void *)(&(pVnode->msgCb))); } +void vnodeSyncSetMsgCb(SVnode *pVnode) { syncSetMsgCb(pVnode->sync, &pVnode->msgCb); } -void vnodeSyncSetRpc(SVnode *pVnode, void *rpcHandle) { syncSetRpc(pVnode->sync, (void *)(&(pVnode->msgCb))); } +int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } -int32_t vnodeSyncEqMsg(void *qHandle, SRpcMsg *pMsg) { - int32_t ret = 0; - SMsgCb *pMsgCb = qHandle; - if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) { - tmsgPutToQueue(qHandle, SYNC_QUEUE, pMsg); - } else { - vError("vnodeSyncEqMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE); - } - return ret; -} - -int32_t vnodeSendMsg(void *rpcHandle, const SEpSet *pEpSet, SRpcMsg *pMsg) { - int32_t ret = 0; - SMsgCb *pMsgCb = rpcHandle; - if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) { - pMsg->info.noResp = 1; - tmsgSendReq(pEpSet, pMsg); - } else { - vError("vnodeSendMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE); - } - return ret; +int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { + pMsg->info.noResp = 1; + return tmsgSendReq(pEpSet, pMsg); } int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index 99f9deb99e8da9e484d07e3566995dd1f3e00daf..f65a31769420d6cf584d2079f1b147e510f3bdb6 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -36,10 +36,10 @@ typedef struct SSyncIO { STaosQueue *pMsgQ; STaosQset * pQset; TdThread consumerTid; - - void * serverRpc; - void * clientRpc; - SEpSet myAddr; + void *serverRpc; + void *clientRpc; + SEpSet myAddr; + SMsgCb msgcb; tmr_h qTimer; int32_t qTimerMS; @@ -65,8 +65,8 @@ extern SSyncIO *gSyncIO; int32_t syncIOStart(char *host, uint16_t port); int32_t syncIOStop(); -int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg); -int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg); +int32_t syncIOSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg); +int32_t syncIOEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg); int32_t syncIOQTimerStart(); int32_t syncIOQTimerStop(); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 9b655fb0fae7f699aeccca4bc4047f9ce8008e56..36f22db05f3921be2a71cae15c9f33c8098306c6 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -159,11 +159,10 @@ typedef struct SSyncNode { char configPath[TSDB_FILENAME_LEN * 2]; // sync io - SWal* pWal; - void* rpcClient; - int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); - void* queue; - int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg); + SWal* pWal; + const SMsgCb* msgcb; + int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg); + int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); // init internal SNodeInfo myNodeInfo; diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 203a8a1e625176f2d037915dea945f26999f218f..1da1cd7e4dbccd5aa30512757a120aa32dbc6ed9 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -66,7 +66,7 @@ int32_t syncIOStop() { return ret; } -int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { +int32_t syncIOSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { assert(pEpSet->inUse == 0); assert(pEpSet->numOfEps == 1); @@ -83,11 +83,11 @@ int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { pMsg->info.handle = NULL; pMsg->info.noResp = 1; - rpcSendRequest(clientRpc, pEpSet, pMsg, NULL); + rpcSendRequest(gSyncIO->clientRpc, pEpSet, pMsg, NULL); return ret; } -int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) { +int32_t syncIOEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { int32_t ret = 0; char logBuf[128]; syncRpcMsgLog2((char *)"==syncIOEqMsg==", pMsg); @@ -96,7 +96,7 @@ int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) { pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM); memcpy(pTemp, pMsg, sizeof(SRpcMsg)); - STaosQueue *pMsgQ = queue; + STaosQueue *pMsgQ = gSyncIO->pMsgQ; taosWriteQitem(pMsgQ, pTemp); return ret; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 562694bbbcec668e6ebaeda8124299d5e33ec7cd..56389de88a4137fe460e5e4c4e484d3f6ce7c304 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -240,26 +240,14 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { return ret; } -void syncSetQ(int64_t rid, void* queue) { +void syncSetMsgCb(int64_t rid, const SMsgCb *msgcb) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { sTrace("syncSetQ get pSyncNode is NULL, rid:%ld", rid); return; } assert(rid == pSyncNode->rid); - pSyncNode->queue = queue; - - taosReleaseRef(tsNodeRefId, pSyncNode->rid); -} - -void syncSetRpc(int64_t rid, void* rpcHandle) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); - if (pSyncNode == NULL) { - sTrace("syncSetRpc get pSyncNode is NULL, rid:%ld", rid); - return; - } - assert(rid == pSyncNode->rid); - pSyncNode->rpcClient = rpcHandle; + pSyncNode->msgcb = msgcb; taosReleaseRef(tsNodeRefId, pSyncNode->rid); } @@ -332,7 +320,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { SRpcMsg rpcMsg; syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); if (pSyncNode->FpEqMsg != NULL) { - pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); } else { sTrace("syncPropose pSyncNode->FpEqMsg is NULL"); } @@ -375,9 +363,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path); pSyncNode->pWal = pSyncInfo->pWal; - pSyncNode->rpcClient = pSyncInfo->rpcClient; + pSyncNode->msgcb = pSyncInfo->msgcb; pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; - pSyncNode->queue = pSyncInfo->queue; pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg; // init raft config @@ -691,7 +678,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp // htonl syncUtilMsgHtoN(pMsg->pCont); - pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); + pSyncNode->FpSendMsg(&epSet, pMsg); } else { sTrace("syncNodeSendMsgById pSyncNode->FpSendMsg is NULL"); } @@ -706,7 +693,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S // htonl syncUtilMsgHtoN(pMsg->pCont); - pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); + pSyncNode->FpSendMsg(&epSet, pMsg); } else { sTrace("syncNodeSendMsgByInfo pSyncNode->FpSendMsg is NULL"); } @@ -728,12 +715,12 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pWal); cJSON_AddStringToObject(pRoot, "pWal", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->rpcClient); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb); cJSON_AddStringToObject(pRoot, "rpcClient", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpSendMsg); cJSON_AddStringToObject(pRoot, "FpSendMsg", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->queue); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb); cJSON_AddStringToObject(pRoot, "queue", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpEqMsg); cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf); @@ -1095,7 +1082,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg); if (pSyncNode->FpEqMsg != NULL) { - pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); } else { sTrace("syncNodeEqPingTimer pSyncNode->FpEqMsg is NULL"); } @@ -1118,7 +1105,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqElectTimer==", &rpcMsg); if (pSyncNode->FpEqMsg != NULL) { - pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); } else { sTrace("syncNodeEqElectTimer pSyncNode->FpEqMsg is NULL"); } @@ -1145,7 +1132,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg); if (pSyncNode->FpEqMsg != NULL) { - pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); } else { sTrace("syncNodeEqHeartbeatTimer pSyncNode->FpEqMsg is NULL"); } @@ -1175,10 +1162,10 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) { assert(pSyncMsg->dataLen == entryLen); memcpy(pSyncMsg->data, serialized, entryLen); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg = {0}; syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); if (ths->FpEqMsg != NULL) { - ths->FpEqMsg(ths->queue, &rpcMsg); + ths->FpEqMsg(ths->msgcb, &rpcMsg); } else { sTrace("syncNodeEqNoop pSyncNode->FpEqMsg is NULL"); } diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index 9a2d9a6b3461f46c6af744b2fa503ca9ce46a6b8..cff692239a756081cf35191cf0787be5bd878326 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -100,9 +100,8 @@ SWal* createWal(char* path, int32_t vgId) { int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path, bool isStandBy) { SSyncInfo syncInfo; syncInfo.vgId = vgId; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = createFsm(); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); diff --git a/source/libs/sync/test/syncElectTest.cpp b/source/libs/sync/test/syncElectTest.cpp index f58b6b670bba73476b177fca179445d3872aa2ec..862f7bd0baebed56d2e103c56ae1a88e9074c8ea 100644 --- a/source/libs/sync/test/syncElectTest.cpp +++ b/source/libs/sync/test/syncElectTest.cpp @@ -44,9 +44,8 @@ SWal* createWal(char* path, int32_t vgId) { SSyncNode* createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path) { SSyncInfo syncInfo; syncInfo.vgId = vgId; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = NULL; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); diff --git a/source/libs/sync/test/syncEncodeTest.cpp b/source/libs/sync/test/syncEncodeTest.cpp index 09d20156f476a93b1de432ac9b87972fb691ad1f..454c823c6a501ac0cb2532892159957056d7e1fa 100644 --- a/source/libs/sync/test/syncEncodeTest.cpp +++ b/source/libs/sync/test/syncEncodeTest.cpp @@ -31,9 +31,8 @@ SSyncNode *pSyncNode; SSyncNode *syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncEnqTest.cpp b/source/libs/sync/test/syncEnqTest.cpp index 6f83ede5a0feec8ddfe930e36c311100ca5a66be..8461bfe9b7403c70d45a79f59474ad5d0ca43b2a 100644 --- a/source/libs/sync/test/syncEnqTest.cpp +++ b/source/libs/sync/test/syncEnqTest.cpp @@ -25,9 +25,7 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); @@ -99,7 +97,7 @@ int main(int argc, char** argv) { SyncPingReply* pSyncMsg = syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, 1000, "syncEnqTest"); SRpcMsg rpcMsg; syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); - pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); taosMsleep(1000); } diff --git a/source/libs/sync/test/syncIOClientTest.cpp b/source/libs/sync/test/syncIOClientTest.cpp index 492b2e4349cc5a19473f6af936b1bfd45bbb553c..bd0221114ad618fb030ee5ec476a0a01634bcdd7 100644 --- a/source/libs/sync/test/syncIOClientTest.cpp +++ b/source/libs/sync/test/syncIOClientTest.cpp @@ -43,7 +43,7 @@ int main() { SRpcMsg rpcMsg; syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); - syncIOSendMsg(gSyncIO->clientRpc, &epSet, &rpcMsg); + syncIOSendMsg(&epSet, &rpcMsg); taosSsleep(1); } diff --git a/source/libs/sync/test/syncIOSendMsgTest.cpp b/source/libs/sync/test/syncIOSendMsgTest.cpp index 03d308ea285f97c59427efcc92266eb248ee3168..b8a9bec1087b41e60f94b11ea2ec1b0a651a98b9 100644 --- a/source/libs/sync/test/syncIOSendMsgTest.cpp +++ b/source/libs/sync/test/syncIOSendMsgTest.cpp @@ -25,9 +25,8 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); @@ -103,7 +102,7 @@ int main(int argc, char** argv) { SEpSet epSet; syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet); - pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, &rpcMsg); + pSyncNode->FpSendMsg(&epSet, &rpcMsg); taosMsleep(1000); } diff --git a/source/libs/sync/test/syncIndexMgrTest.cpp b/source/libs/sync/test/syncIndexMgrTest.cpp index ea5d5f6b6fff0539d329bfb52ce86202ff65c13b..7fcce2bc4f06b404cc4565ca2e94ea884d9603aa 100644 --- a/source/libs/sync/test/syncIndexMgrTest.cpp +++ b/source/libs/sync/test/syncIndexMgrTest.cpp @@ -28,9 +28,8 @@ SSyncNode* pSyncNode; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncInitTest.cpp b/source/libs/sync/test/syncInitTest.cpp index ca0657c74def18c8c8ed30495f5ca976ee55c402..d0843151f4b3b04c2c67d44b4cdce3a48e78d6d8 100644 --- a/source/libs/sync/test/syncInitTest.cpp +++ b/source/libs/sync/test/syncInitTest.cpp @@ -25,9 +25,8 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./sync_init_test"); diff --git a/source/libs/sync/test/syncPingSelfTest.cpp b/source/libs/sync/test/syncPingSelfTest.cpp index 641ff059be3843928cc5e68074b7c131472ffcc2..99287bf7b0fd23c350e7d5f8503b17efee641b7e 100644 --- a/source/libs/sync/test/syncPingSelfTest.cpp +++ b/source/libs/sync/test/syncPingSelfTest.cpp @@ -25,9 +25,8 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncPingTimerTest.cpp b/source/libs/sync/test/syncPingTimerTest.cpp index 29e99435bef6f82e20ffaf07de52631e00cfe20b..cd9440e3e280b71820ca5db8ed40bcc8b2c37d26 100644 --- a/source/libs/sync/test/syncPingTimerTest.cpp +++ b/source/libs/sync/test/syncPingTimerTest.cpp @@ -25,9 +25,8 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncPingTimerTest2.cpp b/source/libs/sync/test/syncPingTimerTest2.cpp index 285828125d61216362b24281e3ce446826f906c6..fa09d04368178e7b7ab6c94cd53c7672174bdf7b 100644 --- a/source/libs/sync/test/syncPingTimerTest2.cpp +++ b/source/libs/sync/test/syncPingTimerTest2.cpp @@ -25,9 +25,8 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncReplicateTest.cpp b/source/libs/sync/test/syncReplicateTest.cpp index 0e94498a382bb61928e4023e84f9e2c97d7acfda..bf9e34fffbc52121001ac0d9010d7a0b18869f88 100644 --- a/source/libs/sync/test/syncReplicateTest.cpp +++ b/source/libs/sync/test/syncReplicateTest.cpp @@ -97,9 +97,8 @@ SWal* createWal(char* path, int32_t vgId) { int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path) { SSyncInfo syncInfo; syncInfo.vgId = vgId; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = createFsm(); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); diff --git a/source/libs/sync/test/syncSnapshotTest.cpp b/source/libs/sync/test/syncSnapshotTest.cpp index 5dd9ea9fcff94be6cb4083a8bf62bbc789e7b340..62bda5b22ec8633f1cb6ba2ff2cfbe224ead8c94 100644 --- a/source/libs/sync/test/syncSnapshotTest.cpp +++ b/source/libs/sync/test/syncSnapshotTest.cpp @@ -83,9 +83,8 @@ void initFsm() { SSyncNode *syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir); @@ -200,7 +199,7 @@ int main(int argc, char **argv) { SyncClientRequest *pSyncClientRequest = pMsg1; SRpcMsg rpcMsg; syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg); - gSyncNode->FpEqMsg(gSyncNode->queue, &rpcMsg); + gSyncNode->FpEqMsg(gSyncNode->msgcb, &rpcMsg); taosMsleep(1000); } diff --git a/source/libs/sync/test/syncVotesGrantedTest.cpp b/source/libs/sync/test/syncVotesGrantedTest.cpp index 02a35b3d00fd730da651d02c660abe18cbe801cb..d4885d0316b3a099f1b1e1c0bca45b4f1f471849 100644 --- a/source/libs/sync/test/syncVotesGrantedTest.cpp +++ b/source/libs/sync/test/syncVotesGrantedTest.cpp @@ -27,9 +27,8 @@ SSyncNode* pSyncNode; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncVotesRespondTest.cpp b/source/libs/sync/test/syncVotesRespondTest.cpp index f276d34745b1b52cda42b9dee4385b871c32b2f0..77262dfc65e0f134af75e8a11be674c40a1437ab 100644 --- a/source/libs/sync/test/syncVotesRespondTest.cpp +++ b/source/libs/sync/test/syncVotesRespondTest.cpp @@ -27,9 +27,8 @@ SSyncNode* pSyncNode; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncWriteTest.cpp b/source/libs/sync/test/syncWriteTest.cpp index ef09d2a0a4e66b52f942ae6525a0dd9814a4da03..34c8eb0f56c5036977a6d22878f508ef6418c508 100644 --- a/source/libs/sync/test/syncWriteTest.cpp +++ b/source/libs/sync/test/syncWriteTest.cpp @@ -62,9 +62,8 @@ void initFsm() { SSyncNode *syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir); @@ -178,7 +177,7 @@ int main(int argc, char **argv) { SyncClientRequest *pSyncClientRequest = pMsg1; SRpcMsg rpcMsg; syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg); - gSyncNode->FpEqMsg(gSyncNode->queue, &rpcMsg); + gSyncNode->FpEqMsg(gSyncNode->msgcb, &rpcMsg); taosMsleep(1000); }