diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 130379a1f3e1ac283f20b56c1b17b1e1d434514a..84da7783005b054620408af120c3c36477d43261 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -17,7 +17,13 @@ #include "mndSync.h" #include "mndTrans.h" -int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } +int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { + int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); + if (code != 0) { + rpcFreeCont(pMsg->pCont); + } + return code; +} int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 538d3f7f872f5f2afc874385560ace908ca75006..882ee912cde37414bc219efe75c113d0868c1810 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -56,7 +56,13 @@ void vnodeSyncStart(SVnode *pVnode) { void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } -int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } +int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { + int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); + if (code != 0) { + rpcFreeCont(pMsg->pCont); + } + return code; +} int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index d3345620e9a8cb95b031ef2c042651aa09627d30..9246041b815e401f1c7638e5cba07160048a36f4 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -149,7 +149,7 @@ typedef struct SSyncNode { // restore state bool restoreFinish; - sem_t restoreSem; + //sem_t restoreSem; SSnapshot* pSnapshot; } SSyncNode; diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index ef66d580fb51d391b263849f319f31056123762e..fa735e71c029e22d67e7b2681ff1fc7144527061 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -366,8 +366,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ths->pFsm->FpRestoreFinish(ths->pFsm); } ths->restoreFinish = true; + sInfo("==syncNodeOnAppendEntriesCb== restoreFinish set true %p vgId:%d", ths, ths->vgId); + /* tsem_post(&ths->restoreSem); + sInfo("==syncNodeOnAppendEntriesCb== RestoreFinish tsem_post %p", ths); + */ } } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index aa4bcb4fae4630d3f0018ce6b2aa0491741b54af..18c6f8930ac73f2bdc5d9e3d860f8b2f8dec0188 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -143,8 +143,12 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm); } pSyncNode->restoreFinish = true; + sInfo("==syncMaybeAdvanceCommitIndex== restoreFinish set true %p vgId:%d", pSyncNode, pSyncNode->vgId); + /* tsem_post(&pSyncNode->restoreSem); + sInfo("==syncMaybeAdvanceCommitIndex== RestoreFinish tsem_post %p", pSyncNode); + */ } } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index bffebc1693674fb6c7b43ee6f2bac03529c4a025..821ed364e840f268f1c0da76f95536a6b2342676 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -499,7 +499,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot)); pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot); } - tsem_init(&(pSyncNode->restoreSem), 0, 0); + //tsem_init(&(pSyncNode->restoreSem), 0, 0); // start in syncNodeStart // start raft @@ -521,7 +521,19 @@ void syncNodeStart(SSyncNode* pSyncNode) { syncNodeAppendNoop(pSyncNode); syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica + /* + sInfo("==syncNodeStart== RestoreFinish begin 1 replica tsem_wait %p", pSyncNode); tsem_wait(&pSyncNode->restoreSem); + sInfo("==syncNodeStart== RestoreFinish end 1 replica tsem_wait %p", pSyncNode); + */ + + /* + while (pSyncNode->restoreFinish != true) { + taosMsleep(10); + } + */ + + sInfo("==syncNodeStart== restoreFinish ok 1 replica %p vgId:%d", pSyncNode, pSyncNode->vgId); return; } @@ -532,7 +544,18 @@ void syncNodeStart(SSyncNode* pSyncNode) { // ret = syncNodeStartPingTimer(pSyncNode); assert(ret == 0); + /* + sInfo("==syncNodeStart== RestoreFinish begin multi replica tsem_wait %p", pSyncNode); tsem_wait(&pSyncNode->restoreSem); + sInfo("==syncNodeStart== RestoreFinish end multi replica tsem_wait %p", pSyncNode); + */ + + /* + while (pSyncNode->restoreFinish != true) { + taosMsleep(10); + } + */ + sInfo("==syncNodeStart== restoreFinish ok multi replica %p vgId:%d", pSyncNode, pSyncNode->vgId); } void syncNodeStartStandBy(SSyncNode* pSyncNode) { @@ -573,7 +596,7 @@ void syncNodeClose(SSyncNode* pSyncNode) { taosMemoryFree(pSyncNode->pSnapshot); } - tsem_destroy(&pSyncNode->restoreSem); + //tsem_destroy(&pSyncNode->restoreSem); // free memory in syncFreeNode // taosMemoryFree(pSyncNode); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index e71e19edceedc32fa196fbdba37fb1d38ac62525..30f799f39ec046b4819a35f9adaec06ff8f6b81f 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -94,7 +94,7 @@ typedef void* queue[2]; /* Return the structure holding the given element. */ #define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) -#define TRANS_RETRY_COUNT_LIMIT 20 // retry count limit +#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit #define TRANS_RETRY_INTERVAL 15 // ms retry interval #define TRANS_CONN_TIMEOUT 3 // connect timeout