diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 26a881d4773911e3bf2166ec64645bc130203b0c..36e1a1eb2b1b8316b2cb4eee1ad40fa93e9b06ac 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -211,7 +211,7 @@ void sdbUpdateMnodeRoles() { if (tsSdbMgmt.sync <= 0) return; SNodesRole roles = {0}; - syncGetNodesRole(tsSdbMgmt.sync, &roles); + if (syncGetNodesRole(tsSdbMgmt.sync, &roles) != 0) return; sdbInfo("vgId:1, update mnodes role, replica:%d", tsSdbMgmt.cfg.replica); for (int32_t i = 0; i < tsSdbMgmt.cfg.replica; ++i) { diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index 079013499720407508fcbae60164ff08394fa03b..aacd72e930d95b2721dfe28e92abba5e16654d38 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -159,14 +159,14 @@ typedef struct SSyncNode { SSyncFwds * pSyncFwds; // saved forward info if quorum >1 void * pFwdTimer; void * pRoleTimer; - FGetFileInfo getFileInfo; - FGetWalInfo getWalInfo; - FWriteToCache writeToCache; - FConfirmForward confirmForward; - FNotifyRole notifyRole; - FNotifyFlowCtrl notifyFlowCtrl; + FGetFileInfo getFileInfo; + FGetWalInfo getWalInfo; + FWriteToCache writeToCache; + FConfirmForward confirmForward; + FNotifyRole notifyRole; + FNotifyFlowCtrl notifyFlowCtrl; FNotifyFileSynced notifyFileSynced; - pthread_mutex_t mutex; + pthread_mutex_t mutex; } SSyncNode; // sync module global diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 41d68b3149f8992289233913f59f274b9ee0c566..dd759ed9d4d61e53dc4e0b53544e9c07bfa277bc 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -552,18 +552,16 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { } void syncBroadcastStatus(SSyncNode *pNode) { - SSyncPeer *pPeer; - - for (int32_t i = 0; i < pNode->replica; ++i) { - if (i == pNode->selfIndex) continue; - pPeer = pNode->peerInfo[i]; + for (int32_t index = 0; index < pNode->replica; ++index) { + if (index == pNode->selfIndex) continue; + SSyncPeer *pPeer = pNode->peerInfo[index]; syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_BROADCAST, syncGenTranId()); } } static void syncResetFlowCtrl(SSyncNode *pNode) { - for (int32_t i = 0; i < pNode->replica; ++i) { - pNode->peerInfo[i]->numOfRetrieves = 0; + for (int32_t index = 0; index < pNode->replica; ++index) { + pNode->peerInfo[index]->numOfRetrieves = 0; } if (pNode->notifyFlowCtrl) { @@ -1171,7 +1169,7 @@ static void syncProcessBrokenLink(void *param) { static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { SSyncFwds *pSyncFwds = pNode->pSyncFwds; - uint64_t time = taosGetTimestampMs(); + int64_t time = taosGetTimestampMs(); if (pSyncFwds->fwds >= tsMaxFwdInfo) { pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo; @@ -1289,7 +1287,6 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle int32_t fwdLen; int32_t code = 0; - if (pWalHead->version > nodeVersion + 1) { sError("vgId:%d, hver:%" PRIu64 ", inconsistent with sver:%" PRIu64, pNode->vgId, pWalHead->version, nodeVersion); if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { @@ -1305,14 +1302,15 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle // always update version nodeVersion = pWalHead->version; - sTrace("vgId:%d, forward to peer, replica:%d role:%s qtype:%s hver:%" PRIu64, pNode->vgId, pNode->replica, - syncRole[nodeRole], qtypeStr[qtype], pWalHead->version); if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; // only pkt from RPC or CQ can be forwarded if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; + sTrace("vgId:%d, forward to peer, replica:%d role:%s qtype:%s hver:%" PRIu64, pNode->vgId, pNode->replica, + syncRole[nodeRole], qtypeStr[qtype], pWalHead->version); + // a hacker way to improve the performance pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead)); pSyncHead->type = TAOS_SMSG_FORWARD; @@ -1332,7 +1330,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle code = 1; } - int32_t retLen = write(pPeer->peerFd, pSyncHead, fwdLen); + int32_t retLen = taosWriteMsg(pPeer->peerFd, pSyncHead, fwdLen); if (retLen == fwdLen) { sTrace("%s, forward is sent, hver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len); } else { diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 7c1286f419a2deb980dac5a79c9dd067295ff649..b6dacaa262f1e58b8406650003c5775efbaebf3f 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -438,7 +438,7 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); firstPkt.port = tsSyncPort; - if (write(pPeer->syncFd, (char *)&firstPkt, sizeof(firstPkt)) < 0) { + if (taosWriteMsg(pPeer->syncFd, (char *)&firstPkt, sizeof(firstPkt)) < 0) { sError("%s, failed to send syncCmd", pPeer->id); return -1; }