diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index 7dbb1985d4cd6e68b7009004a563f66ab63a6697..05b7adc5f44b5853cd177443d062167c77681dce 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -28,19 +28,21 @@ extern "C" { #define sTrace(...) { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }} typedef enum { - TAOS_SMSG_SYNC_DATA = 1, - TAOS_SMSG_FORWARD = 2, - TAOS_SMSG_FORWARD_RSP = 3, - TAOS_SMSG_SYNC_REQ = 4, - TAOS_SMSG_SYNC_RSP = 5, - TAOS_SMSG_SYNC_MUST = 6, - TAOS_SMSG_STATUS = 7 + TAOS_SMSG_SYNC_DATA = 1, + TAOS_SMSG_FORWARD = 2, + TAOS_SMSG_FORWARD_RSP = 3, + TAOS_SMSG_SYNC_REQ = 4, + TAOS_SMSG_SYNC_RSP = 5, + TAOS_SMSG_SYNC_MUST = 6, + TAOS_SMSG_STATUS = 7, + TAOS_SMSG_SYNC_DATA_RSP = 8, } ESyncMsgType; #define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16) #define SYNC_RECV_BUFFER_SIZE (5*1024*1024) #define SYNC_FWD_TIMER 300 #define SYNC_ROLE_TIMER 10000 +#define SYNC_WAIT_AFTER_CHOOSE_MASTER 3 #define nodeRole pNode->peerInfo[pNode->selfIndex]->role #define nodeVersion pNode->peerInfo[pNode->selfIndex]->version @@ -64,6 +66,10 @@ typedef struct { int32_t sourceId; // only for arbitrator } SFirstPkt; +typedef struct { + int8_t sync; +} SFirstPktRsp; + typedef struct { int8_t role; uint64_t version; diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 8d3997a5d695916bf229715f3f68345568e80d05..bc3803b732d9f089452efa084c1050eb21a04628 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -626,17 +626,9 @@ static void syncChooseMaster(SSyncNode *pNode) { sInfo("vgId:%d, start to work as master", pNode->vgId); nodeRole = TAOS_SYNC_ROLE_MASTER; -#if 0 - for (int32_t i = 0; i < pNode->replica; ++i) { - if (i == index) continue; - pPeer = pNode->peerInfo[i]; - if (pPeer->version == nodeVersion) { - pPeer->role = TAOS_SYNC_ROLE_SLAVE; - pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; - sInfo("%s, it shall work as slave", pPeer->id); - } - } -#endif + // Wait for other nodes to receive status to avoid version inconsistency + taosMsleep(SYNC_WAIT_AFTER_CHOOSE_MASTER); + syncResetFlowCtrl(pNode); (*pNode->notifyRole)(pNode->vgId, nodeRole); } else { @@ -761,7 +753,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new sDebug("vgId:%d, choose master", pNode->vgId); syncChooseMaster(pNode); } else { - sDebug("vgId:%d, cannot choose master since roles inconsistent", pNode->vgId); + sDebug("vgId:%d, cannot choose master since roles inequality", pNode->vgId); } } @@ -1124,7 +1116,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { } int32_t vgId = firstPkt.syncHead.vgId; - SSyncNode **ppNode = (SSyncNode **)taosHashGet(tsVgIdHash, (const char *)&vgId, sizeof(int32_t)); + SSyncNode **ppNode = taosHashGet(tsVgIdHash, &vgId, sizeof(int32_t)); if (ppNode == NULL || *ppNode == NULL) { sError("vgId:%d, vgId could not be found", vgId); taosCloseSocket(connFd); @@ -1321,6 +1313,8 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle } // always update version + sTrace("vgId:%d, forward to peer, replica:%d role:%s qtype:%s hver:%" PRIu64, pNode->vgId, pNode->replica, + syncRole[nodeRole], qtypeStr[qtype], pWalHead->version); nodeVersion = pWalHead->version; if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; @@ -1328,10 +1322,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle // only pkt from RPC or CQ can be forwarded if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; - sTrace("vgId:%d, forward to peer, replica:%d role:%s qtype:%s hver:%" PRIu64, pNode->vgId, pNode->replica, - syncRole[nodeRole], qtypeStr[qtype], pWalHead->version); - - // a hacker way to improve the performance + // a hacker way to improve the performance pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead)); pSyncHead->type = TAOS_SMSG_FORWARD; pSyncHead->pversion = 0; @@ -1352,9 +1343,11 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle int32_t retLen = taosWriteMsg(pPeer->peerFd, pSyncHead, fwdLen); if (retLen == fwdLen) { - sTrace("%s, forward is sent, hver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len); + sTrace("%s, forward is sent, role:%s sstatus:%s hver:%" PRIu64 " contLen:%d", pPeer->id, syncRole[pPeer->role], + syncStatus[pPeer->sstatus], pWalHead->version, pWalHead->len); } else { - sError("%s, failed to forward, hver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen); + sError("%s, failed to forward, role:%s sstatus:%s hver:%" PRIu64 " retLen:%d", pPeer->id, syncRole[pPeer->role], + syncStatus[pPeer->sstatus], pWalHead->version, retLen); syncRestartConnection(pPeer); } } diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index 2ca4b5424ee3134d3742c29e3af2b21671637b19..ed6b63c92deaa1f077311599ff3e88469bd77593 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -147,12 +147,10 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { static int32_t syncRestoreWal(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; int32_t ret, code = -1; + uint64_t lastVer = 0; - void *buffer = calloc(SYNC_MAX_SIZE, 1); // size for one record - if (buffer == NULL) return -1; - - SWalHead *pHead = (SWalHead *)buffer; - uint64_t lastVer = 0; + SWalHead *pHead = calloc(SYNC_MAX_SIZE, 1); // size for one record + if (pHead == NULL) return -1; while (1) { ret = taosReadMsg(pPeer->syncFd, pHead, sizeof(SWalHead)); @@ -188,7 +186,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) { sError("%s, failed to restore wal from syncFd:%d since %s", pPeer->id, pPeer->syncFd, strerror(errno)); } - free(buffer); + free(pHead); return code; } @@ -233,10 +231,13 @@ static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) { int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) { SSyncNode * pNode = pPeer->pSyncNode; SRecvBuffer *pRecv = pNode->pRecv; - - if (pRecv == NULL) return -1; int32_t len = pHead->len + sizeof(SWalHead); + if (pRecv == NULL) { + sError("%s, recv buffer is not create yet", pPeer->id); + return -1; + } + if (pRecv->bufferSize - (pRecv->offset - pRecv->buffer) >= len) { memcpy(pRecv->offset, pHead, len); pRecv->offset += len; @@ -284,7 +285,14 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { nodeSStatus = TAOS_SYNC_STATUS_FILE; uint64_t fversion = 0; - sDebug("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); + sInfo("%s, start to restore, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); + SFirstPktRsp firstPktRsp = {.sync = 1}; + if (taosWriteMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) < 0) { + sError("%s, failed to send sync firstPkt rsp since %s", pPeer->id, strerror(errno)); + return -1; + } + + sInfo("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); int32_t code = syncRestoreFile(pPeer, &fversion); if (code < 0) { sError("%s, failed to restore file", pPeer->id); @@ -301,14 +309,14 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { nodeVersion = fversion; - sDebug("%s, start to restore wal", pPeer->id); + sInfo("%s, start to restore wal", pPeer->id); if (syncRestoreWal(pPeer) < 0) { sError("%s, failed to restore wal", pPeer->id); return -1; } nodeSStatus = TAOS_SYNC_STATUS_CACHE; - sDebug("%s, start to insert buffered points, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); + sInfo("%s, start to insert buffered points, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); if (syncProcessBufferedFwd(pPeer) < 0) { sError("%s, failed to insert buffered points", pPeer->id); return -1; diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 58d09d080ecca141cdc55ae8b0fac1e791f6725a..348f91820b5be6d35a8fbff1ce32ba5bd2bb1689 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -448,7 +448,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { return code; } -static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { +static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; SFirstPkt firstPkt; @@ -458,8 +458,24 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); firstPkt.port = tsSyncPort; - if (taosWriteMsg(pPeer->syncFd, (char *)&firstPkt, sizeof(firstPkt)) < 0) { - sError("%s, failed to send syncCmd", pPeer->id); + if (taosWriteMsg(pPeer->syncFd, &firstPkt, sizeof(firstPkt)) < 0) { + sError("%s, failed to send sync firstPkt since %s", pPeer->id, strerror(errno)); + return -1; + } + + SFirstPktRsp firstPktRsp; + if (taosReadMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) < 0) { + sError("%s, failed to read sync firstPkt rsp since %s", pPeer->id, strerror(errno)); + return -1; + } + + return 0; +} + +static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { + sInfo("%s, start to retrieve, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); + if (syncRetrieveFirstPkt(pPeer) < 0) { + sError("%s, failed to start retrieve", pPeer->id); return -1; } diff --git a/src/sync/src/taosTcpPool.c b/src/sync/src/taosTcpPool.c index d1d9815f4af2e787f251dd15e41c2d8eccfbebe9..eb05cf7c6f8d8d9a068b60c54f0ae1426e62c429 100644 --- a/src/sync/src/taosTcpPool.c +++ b/src/sync/src/taosTcpPool.c @@ -203,16 +203,19 @@ static void *taosProcessTcpData(void *param) { assert(pConn); if (events[i].events & EPOLLERR) { + sDebug("conn is broken since EPOLLERR"); taosProcessBrokenLink(pConn); continue; } if (events[i].events & EPOLLHUP) { + sDebug("conn is broken since EPOLLHUP"); taosProcessBrokenLink(pConn); continue; } if (events[i].events & EPOLLRDHUP) { + sDebug("conn is broken since EPOLLRDHUP"); taosProcessBrokenLink(pConn); continue; }