diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index 6d0c52284f7f2429ccfa20389ccd66b2447610b9..2be25447c4a6b45863718ef2b6c402961fecb93f 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -62,12 +62,15 @@ typedef struct { typedef struct { SSyncHead syncHead; uint16_t port; + uint16_t tranId; char fqdn[TSDB_FQDN_LEN]; int32_t sourceId; // only for arbitrator } SFirstPkt; typedef struct { - int8_t sync; + int8_t sync; + int8_t reserved; + uint16_t tranId; } SFirstPktRsp; typedef struct { @@ -187,6 +190,7 @@ void syncRestartConnection(SSyncPeer *pPeer); void syncBroadcastStatus(SSyncNode *pNode); void syncAddPeerRef(SSyncPeer *pPeer); int32_t syncDecPeerRef(SSyncPeer *pPeer); +uint16_t syncGenTranId(); #ifdef __cplusplus } diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 3fa6323f4d073eaed32c726d51709bb54034aa68..c731f8bcaceab8c1c9686724885169520d7405f9 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -396,9 +396,7 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) { pFwdRsp->code = code; int32_t msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp); - int32_t retLen = taosWriteMsg(pPeer->peerFd, msg, msgLen); - - if (retLen == msgLen) { + if (taosWriteMsg(pPeer->peerFd, msg, msgLen) == msgLen) { sTrace("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version); } else { sDebug("%s, failed to send forward ack, restart", pPeer->id); @@ -873,6 +871,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { firstPkt.syncHead.type = TAOS_SMSG_SYNC_REQ; firstPkt.syncHead.vgId = pNode->vgId; firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead); + firstPkt.tranId = syncGenTranId(); tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); firstPkt.port = tsSyncPort; taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); @@ -881,7 +880,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { sError("%s, failed to send sync-req to peer", pPeer->id); } else { nodeSStatus = TAOS_SYNC_STATUS_START; - sInfo("%s, sync-req is sent to peer, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); + sInfo("%s, sync-req is sent to peer, tranId:%u, set sstatus:%s", pPeer->id, firstPkt.tranId, syncStatus[nodeSStatus]); } } @@ -1018,8 +1017,7 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type pPeersStatus->peersStatus[i].version = pNode->peerInfo[i]->version; } - int32_t retLen = taosWriteMsg(pPeer->peerFd, msg, statusMsgLen); - if (retLen == statusMsgLen) { + if (taosWriteMsg(pPeer->peerFd, msg, statusMsgLen) == statusMsgLen) { sDebug("%s, status is sent, self:%s:%s:%" PRIu64 ", peer:%s:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d", pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeer->role], syncStatus[pPeer->sstatus], pPeer->version, pPeersStatus->ack, pPeersStatus->tranId, @@ -1053,10 +1051,11 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { firstPkt.syncHead.type = TAOS_SMSG_STATUS; tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); firstPkt.port = tsSyncPort; + firstPkt.tranId = syncGenTranId(); firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId if (taosWriteMsg(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) { - sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d", pPeer->id, connFd, pPeer->syncFd); + sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, firstPkt.tranId); pPeer->peerFd = connFd; pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd); @@ -1123,6 +1122,8 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { return; } + sDebug("vgId:%d, firstPkt is received, tranId:%u", vgId, firstPkt.tranId); + SSyncNode *pNode = *ppNode; pthread_mutex_lock(&pNode->mutex); diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index 3d262d6e7fa6d092d760df87b7b0e76bbd2e1dba..589ff470f170f2ff94efe1974b1c755ea8e16bb1 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -64,8 +64,8 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { sinfo.index = 0; while (1) { // read file info - int32_t ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(minfo)); - if (ret < 0) { + int32_t ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(SFileInfo)); + if (ret != sizeof(SFileInfo)) { sError("%s, failed to read file info while restore file since %s", pPeer->id, strerror(errno)); break; } @@ -96,7 +96,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { // send file ack ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(fileAck)); - if (ret < 0) { + if (ret != sizeof(fileAck)) { sError("%s, failed to write file:%s ack while restore file since %s", pPeer->id, minfo.name, strerror(errno)); break; } @@ -154,7 +154,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) { while (1) { ret = taosReadMsg(pPeer->syncFd, pHead, sizeof(SWalHead)); - if (ret < 0) { + if (ret != sizeof(SWalHead)) { sError("%s, failed to read walhead while restore wal since %s", pPeer->id, strerror(errno)); break; } @@ -166,7 +166,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) { } // wal sync over ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len); - if (ret < 0) { + if (ret != pHead->len) { sError("%s, failed to read walcont, len:%d while restore wal since %s", pPeer->id, pHead->len, strerror(errno)); break; } @@ -286,11 +286,12 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { uint64_t fversion = 0; sInfo("%s, start to restore, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); - SFirstPktRsp firstPktRsp = {.sync = 1}; - if (taosWriteMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) < 0) { + SFirstPktRsp firstPktRsp = {.sync = 1, .tranId = syncGenTranId()}; + if (taosWriteMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) != sizeof(SFirstPktRsp)) { sError("%s, failed to send sync firstPkt rsp since %s", pPeer->id, strerror(errno)); return -1; } + sDebug("%s, send firstPktRsp to peer, tranId:%u", pPeer->id, firstPktRsp.tranId); sInfo("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); int32_t code = syncRestoreFile(pPeer, &fversion); diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 36b197dd46484c7ce676391aea5dc4efaf00b9d8..5e534a1e56ddd6df03e9ddec608ed56e7269c602 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -58,7 +58,7 @@ static int32_t syncGetFileVersion(SSyncNode *pNode, SSyncPeer *pPeer) { uint64_t fver, wver; int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver); if (code != 0) { - sDebug("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer); + sDebug("%s, vnode is commiting while get fver for retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer); return -1; } @@ -92,7 +92,10 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { int32_t code = -1; char name[TSDB_FILENAME_LEN * 2] = {0}; - if (syncGetFileVersion(pNode, pPeer) < 0) return -1; + if (syncGetFileVersion(pNode, pPeer) < 0) { + pPeer->fileChanged = 1; + return -1; + } while (1) { // retrieve file info @@ -100,12 +103,11 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { fileInfo.size = 0; fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, &fileInfo.size, &fileInfo.fversion); - // fileInfo.size = htonl(size); sDebug("%s, file:%s info is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size); // send the file info int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo)); - if (ret < 0) { + if (ret != sizeof(fileInfo)) { code = -1; sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); break; @@ -119,8 +121,8 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { } // wait for the ack from peer - ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(fileAck)); - if (ret < 0) { + ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(SFileAck)); + if (ret != sizeof(SFileAck)) { code = -1; sError("%s, failed to read file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); break; @@ -384,12 +386,15 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { } if (code == 0) { - pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; - sInfo("%s, wal retrieve is finished, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); - SWalHead walHead; memset(&walHead, 0, sizeof(walHead)); - taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead)); + if (taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead)) == sizeof(walHead)) { + pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; + sInfo("%s, wal retrieve is finished, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); + } else { + sError("%s, failed to send last wal record since %s", pPeer->id, strerror(errno)); + code = -1; + } } else { sError("%s, failed to send wal since %s, code:0x%x", pPeer->id, strerror(errno), code); } @@ -404,20 +409,23 @@ static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) { memset(&firstPkt, 0, sizeof(firstPkt)); firstPkt.syncHead.type = TAOS_SMSG_SYNC_DATA; firstPkt.syncHead.vgId = pNode->vgId; + firstPkt.tranId = syncGenTranId(); tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); firstPkt.port = tsSyncPort; - if (taosWriteMsg(pPeer->syncFd, &firstPkt, sizeof(firstPkt)) < 0) { - sError("%s, failed to send sync firstPkt since %s", pPeer->id, strerror(errno)); + if (taosWriteMsg(pPeer->syncFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { + sError("%s, failed to send sync firstPkt since %s, tranId:%u", pPeer->id, strerror(errno), firstPkt.tranId); return -1; } + sDebug("%s, send firstPkt to peer, tranId:%u", pPeer->id, firstPkt.tranId); SFirstPktRsp firstPktRsp; - if (taosReadMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) < 0) { - sError("%s, failed to read sync firstPkt rsp since %s", pPeer->id, strerror(errno)); + if (taosReadMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) != sizeof(SFirstPktRsp)) { + sError("%s, failed to read sync firstPkt rsp since %s, tranId:%u", pPeer->id, strerror(errno), firstPkt.tranId); return -1; } + sDebug("%s, recv firstPktRsp from peer, tranId:%u", pPeer->id, firstPkt.tranId); return 0; }