diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index ef635e6efc1ca5f071c64dbe00920c3987837494..d93a23711b002713f7e8076be2824ec44d46f919 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -310,6 +310,7 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { int code = 0; if (pNode == NULL) return 0; + syncAddNodeRef(pNode); if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) { sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId, @@ -318,6 +319,7 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { pPeer = pNode->peerInfo[i]; syncRestartConnection(pPeer); } + syncDecNodeRef(pNode); return TSDB_CODE_SYN_INVALID_VERSION; } @@ -326,10 +328,16 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole], qtype, pWalHead->version); - if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; + if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) { + syncDecNodeRef(pNode); + return 0; + } // only pkt from RPC or CQ can be forwarded - if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; + if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) { + syncDecNodeRef(pNode); + return 0; + } // a hacker way to improve the performance pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead)); @@ -361,6 +369,7 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { pthread_mutex_unlock(&(pNode->mutex)); + syncDecNodeRef(pNode); return code; }