From 2854d6105e45492ac4f4f5b8cc9f7b8a83e7060e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 16 Mar 2021 01:39:08 +0000 Subject: [PATCH] TD-3308 --- src/dnode/src/dnodeVWrite.c | 2 +- src/inc/tsync.h | 4 ++-- src/inc/vnode.h | 2 +- src/mnode/src/mnodeSdb.c | 6 +++--- src/sync/src/syncMain.c | 14 +++++++------- src/vnode/inc/vnodeSync.h | 2 +- src/vnode/src/vnodeSync.c | 6 +++--- src/vnode/src/vnodeWrite.c | 3 ++- 8 files changed, 20 insertions(+), 19 deletions(-) diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index a3ff459396..87b31e4604 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -222,7 +222,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) { dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code); } else { if (qtype == TAOS_QTYPE_FWD) { - vnodeConfirmForward(pVnode, pWrite->pHead.version, 0); + vnodeConfirmForward(pVnode, pWrite->pHead.version, 0, pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT); } if (pWrite->rspRet.rsp) { rpcFreeCont(pWrite->rspRet.rsp); diff --git a/src/inc/tsync.h b/src/inc/tsync.h index e5aa07fe7b..99dfd3a6a3 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -112,8 +112,8 @@ void syncCleanUp(); int64_t syncStart(const SSyncInfo *); void syncStop(int64_t rid); int32_t syncReconfig(int64_t rid, const SSyncCfg *); -int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int32_t qtype); -void syncConfirmForward(int64_t rid, uint64_t version, int32_t code); +int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int32_t qtype, bool force); +void syncConfirmForward(int64_t rid, uint64_t version, int32_t code, bool force); void syncRecover(int64_t rid); // recover from other nodes: int32_t syncGetNodesRole(int64_t rid, SNodesRole *); diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 7c9ebd8a0b..dddec83da8 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -78,7 +78,7 @@ void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite); int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet); // vnodeSync -void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code); +void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code, bool force); // vnodeRead int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qtype, void *rparam); diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index fe1f70cb50..319b16e62a 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -680,7 +680,7 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void * if (pRow != NULL) { // forward to peers pRow->processedCount = 0; - int32_t syncCode = syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC); + int32_t syncCode = syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC, false); if (syncCode <= 0) pRow->processedCount = 1; if (syncCode < 0) { @@ -700,7 +700,7 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void * actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version); // even it is WAL/FWD, it shall be called to update version in sync - syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC); + syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC, false); // from wal or forward msg, row not created, should add into hash if (action == SDB_ACTION_INSERT) { @@ -1119,7 +1119,7 @@ static void *sdbWorkerFp(void *pWorker) { sdbConfirmForward(1, pRow, pRow->code); } else { if (qtype == TAOS_QTYPE_FWD) { - syncConfirmForward(tsSdbMgmt.sync, pRow->pHead.version, pRow->code); + syncConfirmForward(tsSdbMgmt.sync, pRow->pHead.version, pRow->code, false); } sdbFreeFromQueue(pRow); } diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index f018f00848..264bbf6b92 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -56,7 +56,7 @@ static void syncMonitorNodeRole(void *param, void *tmrId); static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code); static int32_t syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle); static void syncRestartPeer(SSyncPeer *pPeer); -static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtyp); +static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtype, bool force); static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo); static void syncStartCheckPeerConn(SSyncPeer *pPeer); @@ -378,24 +378,24 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { return 0; } -int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int32_t qtype) { +int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int32_t qtype, bool force) { if (rid <= 0) return 0; SSyncNode *pNode = syncAcquireNode(rid); if (pNode == NULL) return 0; - int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype); + int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype, force); syncReleaseNode(pNode); return code; } -void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) { +void syncConfirmForward(int64_t rid, uint64_t version, int32_t code, bool force) { SSyncNode *pNode = syncAcquireNode(rid); if (pNode == NULL) return; SSyncPeer *pPeer = pNode->pMaster; - if (pPeer && pNode->quorum > 1) { + if (pPeer && (pNode->quorum > 1 || force)) { SFwdRsp rsp; syncBuildSyncFwdRsp(&rsp, pNode->vgId, version, code); @@ -1414,7 +1414,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { syncReleaseNode(pNode); } -static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtype) { +static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtype, bool force) { SSyncPeer *pPeer; SSyncHead *pSyncHead; SWalHead * pWalHead = data; @@ -1458,7 +1458,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle if (pPeer == NULL || pPeer->peerFd < 0) continue; if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue; - if (pNode->quorum > 1 && code == 0) { + if ((pNode->quorum > 1 || force) && code == 0) { code = syncSaveFwdInfo(pNode, pWalHead->version, mhandle); if (code >= 0) code = 1; } diff --git a/src/vnode/inc/vnodeSync.h b/src/vnode/inc/vnodeSync.h index 9a7da626f8..75d7ffbabd 100644 --- a/src/vnode/inc/vnodeSync.h +++ b/src/vnode/inc/vnodeSync.h @@ -32,7 +32,7 @@ int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rpar int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver); int32_t vnodeResetVersion(int32_t vgId, uint64_t fver); -void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code); +void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code, bool force); #ifdef __cplusplus } diff --git a/src/vnode/src/vnodeSync.c b/src/vnode/src/vnodeSync.c index 1f9c6c7216..a45eae9b16 100644 --- a/src/vnode/src/vnodeSync.c +++ b/src/vnode/src/vnodeSync.c @@ -174,7 +174,7 @@ int32_t vnodeResetVersion(int32_t vgId, uint64_t fver) { return 0; } -void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code) { +void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code, bool force) { SVnodeObj *pVnode = vparam; - syncConfirmForward(pVnode->sync, version, code); -} + syncConfirmForward(pVnode->sync, version, code, force); +} \ No newline at end of file diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 99b7e7b628..0551ff8d1f 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -88,7 +88,8 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara // forward to peers, even it is WAL/FWD, it shall be called to update version in sync int32_t syncCode = 0; - syncCode = syncForwardToPeer(pVnode->sync, pHead, pWrite, qtype); + bool force = (pWrite == NULL ? false : pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT); + syncCode = syncForwardToPeer(pVnode->sync, pHead, pWrite, qtype, force); if (syncCode < 0) return syncCode; // write into WAL -- GitLab