diff --git a/src/mnode/inc/mnodeMnode.h b/src/mnode/inc/mnodeMnode.h index 93f2fa11ea95f22ca4addf12496b79cdaddbf290..ffdec02eb6ca3fcb2b6cd230fb93ebb26dea26de 100644 --- a/src/mnode/inc/mnodeMnode.h +++ b/src/mnode/inc/mnodeMnode.h @@ -43,8 +43,8 @@ void mnodeIncMnodeRef(struct SMnodeObj *pMnode); void mnodeDecMnodeRef(struct SMnodeObj *pMnode); char * mnodeGetMnodeRoleStr(); -void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet); -void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet); +void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet, bool redirect); +void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet, bool redirect); char* mnodeGetMnodeMasterEp(); void mnodeGetMnodeInfos(void *mnodes); diff --git a/src/mnode/src/mnodeMnode.c b/src/mnode/src/mnodeMnode.c index 68acae7dec0e5505e72733a638abcc142eeeb407..ea5260c76d74ab371b004f8457d0ac265b4203be 100644 --- a/src/mnode/src/mnodeMnode.c +++ b/src/mnode/src/mnodeMnode.c @@ -273,14 +273,14 @@ void mnodeUpdateMnodeEpSet(SMInfos *pMinfos) { mnodeMnodeUnLock(); } -void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet) { +void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet, bool redirect) { mnodeMnodeRdLock(); *epSet = tsMEpForPeer; mnodeMnodeUnLock(); mTrace("vgId:1, mnodes epSet for peer is returned, num:%d inUse:%d", tsMEpForPeer.numOfEps, tsMEpForPeer.inUse); for (int32_t i = 0; i < epSet->numOfEps; ++i) { - if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) { + if (redirect && strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) { epSet->inUse = (i + 1) % epSet->numOfEps; mTrace("vgId:1, mnode:%d, for peer ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse); } else { @@ -289,14 +289,14 @@ void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet) { } } -void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet) { +void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet, bool redirect) { mnodeMnodeRdLock(); *epSet = tsMEpForShell; mnodeMnodeUnLock(); mTrace("vgId:1, mnodes epSet for shell is returned, num:%d inUse:%d", tsMEpForShell.numOfEps, tsMEpForShell.inUse); for (int32_t i = 0; i < epSet->numOfEps; ++i) { - if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) { + if (redirect && strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) { epSet->inUse = (i + 1) % epSet->numOfEps; mTrace("vgId:1, mnode:%d, for shell ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse); } else { diff --git a/src/mnode/src/mnodePeer.c b/src/mnode/src/mnodePeer.c index cfb7b7781b04c0145f66a58c6d3d2eaefa0913f3..aaf8b694279299215dbfe386755c43ed2200e555 100644 --- a/src/mnode/src/mnodePeer.c +++ b/src/mnode/src/mnodePeer.c @@ -54,7 +54,7 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { if (!sdbIsMaster()) { SMnodeRsp *rpcRsp = &pMsg->rpcRsp; SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); - mnodeGetMnodeEpSetForPeer(epSet); + mnodeGetMnodeEpSetForPeer(epSet, true); rpcRsp->rsp = epSet; rpcRsp->len = sizeof(SRpcEpSet); diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index 5d63ae9ff411cdddd82400d1f2853e6e882ba527..7c35829f88f4e7d1753a3ee00b13a9f87d03ae48 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -282,10 +282,11 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi // not thread safe, need optimized int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pHBMsg) { - pConn->numOfQueries = 0; + pConn->numOfQueries = 0; pConn->numOfStreams = 0; - int32_t numOfQueries = htonl(pHBMsg->numOfQueries); + int32_t numOfStreams = htonl(pHBMsg->numOfStreams); + if (numOfQueries > 0) { if (pConn->pQueries == NULL) { pConn->pQueries = calloc(sizeof(SQueryDesc), QUERY_STREAM_SAVE_SIZE); @@ -299,7 +300,6 @@ int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pHBMsg) { } } - int32_t numOfStreams = htonl(pHBMsg->numOfStreams); if (numOfStreams > 0) { if (pConn->pStreams == NULL) { pConn->pStreams = calloc(sizeof(SStreamDesc), QUERY_STREAM_SAVE_SIZE); @@ -309,7 +309,7 @@ int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pHBMsg) { int32_t saveSize = pConn->numOfStreams * sizeof(SStreamDesc); if (saveSize > 0 && pConn->pStreams != NULL) { - memcpy(pConn->pStreams, pHBMsg->pData + pConn->numOfQueries * sizeof(SQueryDesc), saveSize); + memcpy(pConn->pStreams, pHBMsg->pData + numOfQueries * sizeof(SQueryDesc), saveSize); } } diff --git a/src/mnode/src/mnodeRead.c b/src/mnode/src/mnodeRead.c index c2a70bc01d2194fff5b8095d06e40535b940dc25..200f589b7866d3fa6cf9865be397935657d7ba86 100644 --- a/src/mnode/src/mnodeRead.c +++ b/src/mnode/src/mnodeRead.c @@ -50,7 +50,7 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) { if (!sdbIsMaster()) { SMnodeRsp *rpcRsp = &pMsg->rpcRsp; SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); - mnodeGetMnodeEpSetForShell(epSet); + mnodeGetMnodeEpSetForShell(epSet, true); rpcRsp->rsp = epSet; rpcRsp->len = sizeof(SRpcEpSet); diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index 2da46d5b4bd01d47794b0e77011101ce7dc137cc..3c1c92226a26d0d0195020c05b579c0b6c326fa3 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -282,7 +282,7 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { pRsp->onlineDnodes = htonl(mnodeGetOnlineDnodesNum()); pRsp->totalDnodes = htonl(mnodeGetDnodesNum()); - mnodeGetMnodeEpSetForShell(&pRsp->epSet); + mnodeGetMnodeEpSetForShell(&pRsp->epSet, false); pMsg->rpcRsp.rsp = pRsp; pMsg->rpcRsp.len = sizeof(SHeartBeatRsp); @@ -349,7 +349,7 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) { pConnectRsp->writeAuth = pUser->writeAuth; pConnectRsp->superAuth = pUser->superAuth; - mnodeGetMnodeEpSetForShell(&pConnectRsp->epSet); + mnodeGetMnodeEpSetForShell(&pConnectRsp->epSet, false); connect_over: if (code != TSDB_CODE_SUCCESS) { diff --git a/src/mnode/src/mnodeWrite.c b/src/mnode/src/mnodeWrite.c index 53981238a76ee3976689f77315eabab118816e81..c0699b05b364927492b8c2656bead0e14d46ab5a 100644 --- a/src/mnode/src/mnodeWrite.c +++ b/src/mnode/src/mnodeWrite.c @@ -50,7 +50,7 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { if (!sdbIsMaster()) { SMnodeRsp *rpcRsp = &pMsg->rpcRsp; SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); - mnodeGetMnodeEpSetForShell(epSet); + mnodeGetMnodeEpSetForShell(epSet, true); rpcRsp->rsp = epSet; rpcRsp->len = sizeof(SRpcEpSet); diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index 6d0c52284f7f2429ccfa20389ccd66b2447610b9..2be25447c4a6b45863718ef2b6c402961fecb93f 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -62,12 +62,15 @@ typedef struct { typedef struct { SSyncHead syncHead; uint16_t port; + uint16_t tranId; char fqdn[TSDB_FQDN_LEN]; int32_t sourceId; // only for arbitrator } SFirstPkt; typedef struct { - int8_t sync; + int8_t sync; + int8_t reserved; + uint16_t tranId; } SFirstPktRsp; typedef struct { @@ -187,6 +190,7 @@ void syncRestartConnection(SSyncPeer *pPeer); void syncBroadcastStatus(SSyncNode *pNode); void syncAddPeerRef(SSyncPeer *pPeer); int32_t syncDecPeerRef(SSyncPeer *pPeer); +uint16_t syncGenTranId(); #ifdef __cplusplus } diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 3fa6323f4d073eaed32c726d51709bb54034aa68..c731f8bcaceab8c1c9686724885169520d7405f9 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -396,9 +396,7 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) { pFwdRsp->code = code; int32_t msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp); - int32_t retLen = taosWriteMsg(pPeer->peerFd, msg, msgLen); - - if (retLen == msgLen) { + if (taosWriteMsg(pPeer->peerFd, msg, msgLen) == msgLen) { sTrace("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version); } else { sDebug("%s, failed to send forward ack, restart", pPeer->id); @@ -873,6 +871,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { firstPkt.syncHead.type = TAOS_SMSG_SYNC_REQ; firstPkt.syncHead.vgId = pNode->vgId; firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead); + firstPkt.tranId = syncGenTranId(); tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); firstPkt.port = tsSyncPort; taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); @@ -881,7 +880,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { sError("%s, failed to send sync-req to peer", pPeer->id); } else { nodeSStatus = TAOS_SYNC_STATUS_START; - sInfo("%s, sync-req is sent to peer, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); + sInfo("%s, sync-req is sent to peer, tranId:%u, set sstatus:%s", pPeer->id, firstPkt.tranId, syncStatus[nodeSStatus]); } } @@ -1018,8 +1017,7 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type pPeersStatus->peersStatus[i].version = pNode->peerInfo[i]->version; } - int32_t retLen = taosWriteMsg(pPeer->peerFd, msg, statusMsgLen); - if (retLen == statusMsgLen) { + if (taosWriteMsg(pPeer->peerFd, msg, statusMsgLen) == statusMsgLen) { sDebug("%s, status is sent, self:%s:%s:%" PRIu64 ", peer:%s:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d", pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeer->role], syncStatus[pPeer->sstatus], pPeer->version, pPeersStatus->ack, pPeersStatus->tranId, @@ -1053,10 +1051,11 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { firstPkt.syncHead.type = TAOS_SMSG_STATUS; tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); firstPkt.port = tsSyncPort; + firstPkt.tranId = syncGenTranId(); firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId if (taosWriteMsg(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) { - sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d", pPeer->id, connFd, pPeer->syncFd); + sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, firstPkt.tranId); pPeer->peerFd = connFd; pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd); @@ -1123,6 +1122,8 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { return; } + sDebug("vgId:%d, firstPkt is received, tranId:%u", vgId, firstPkt.tranId); + SSyncNode *pNode = *ppNode; pthread_mutex_lock(&pNode->mutex); diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index 3d262d6e7fa6d092d760df87b7b0e76bbd2e1dba..589ff470f170f2ff94efe1974b1c755ea8e16bb1 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -64,8 +64,8 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { sinfo.index = 0; while (1) { // read file info - int32_t ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(minfo)); - if (ret < 0) { + int32_t ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(SFileInfo)); + if (ret != sizeof(SFileInfo)) { sError("%s, failed to read file info while restore file since %s", pPeer->id, strerror(errno)); break; } @@ -96,7 +96,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { // send file ack ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(fileAck)); - if (ret < 0) { + if (ret != sizeof(fileAck)) { sError("%s, failed to write file:%s ack while restore file since %s", pPeer->id, minfo.name, strerror(errno)); break; } @@ -154,7 +154,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) { while (1) { ret = taosReadMsg(pPeer->syncFd, pHead, sizeof(SWalHead)); - if (ret < 0) { + if (ret != sizeof(SWalHead)) { sError("%s, failed to read walhead while restore wal since %s", pPeer->id, strerror(errno)); break; } @@ -166,7 +166,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) { } // wal sync over ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len); - if (ret < 0) { + if (ret != pHead->len) { sError("%s, failed to read walcont, len:%d while restore wal since %s", pPeer->id, pHead->len, strerror(errno)); break; } @@ -286,11 +286,12 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { uint64_t fversion = 0; sInfo("%s, start to restore, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); - SFirstPktRsp firstPktRsp = {.sync = 1}; - if (taosWriteMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) < 0) { + SFirstPktRsp firstPktRsp = {.sync = 1, .tranId = syncGenTranId()}; + if (taosWriteMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) != sizeof(SFirstPktRsp)) { sError("%s, failed to send sync firstPkt rsp since %s", pPeer->id, strerror(errno)); return -1; } + sDebug("%s, send firstPktRsp to peer, tranId:%u", pPeer->id, firstPktRsp.tranId); sInfo("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); int32_t code = syncRestoreFile(pPeer, &fversion); diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 36b197dd46484c7ce676391aea5dc4efaf00b9d8..5e534a1e56ddd6df03e9ddec608ed56e7269c602 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -58,7 +58,7 @@ static int32_t syncGetFileVersion(SSyncNode *pNode, SSyncPeer *pPeer) { uint64_t fver, wver; int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver); if (code != 0) { - sDebug("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer); + sDebug("%s, vnode is commiting while get fver for retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer); return -1; } @@ -92,7 +92,10 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { int32_t code = -1; char name[TSDB_FILENAME_LEN * 2] = {0}; - if (syncGetFileVersion(pNode, pPeer) < 0) return -1; + if (syncGetFileVersion(pNode, pPeer) < 0) { + pPeer->fileChanged = 1; + return -1; + } while (1) { // retrieve file info @@ -100,12 +103,11 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { fileInfo.size = 0; fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, &fileInfo.size, &fileInfo.fversion); - // fileInfo.size = htonl(size); sDebug("%s, file:%s info is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size); // send the file info int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo)); - if (ret < 0) { + if (ret != sizeof(fileInfo)) { code = -1; sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); break; @@ -119,8 +121,8 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { } // wait for the ack from peer - ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(fileAck)); - if (ret < 0) { + ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(SFileAck)); + if (ret != sizeof(SFileAck)) { code = -1; sError("%s, failed to read file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); break; @@ -384,12 +386,15 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { } if (code == 0) { - pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; - sInfo("%s, wal retrieve is finished, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); - SWalHead walHead; memset(&walHead, 0, sizeof(walHead)); - taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead)); + if (taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead)) == sizeof(walHead)) { + pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; + sInfo("%s, wal retrieve is finished, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); + } else { + sError("%s, failed to send last wal record since %s", pPeer->id, strerror(errno)); + code = -1; + } } else { sError("%s, failed to send wal since %s, code:0x%x", pPeer->id, strerror(errno), code); } @@ -404,20 +409,23 @@ static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) { memset(&firstPkt, 0, sizeof(firstPkt)); firstPkt.syncHead.type = TAOS_SMSG_SYNC_DATA; firstPkt.syncHead.vgId = pNode->vgId; + firstPkt.tranId = syncGenTranId(); tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); firstPkt.port = tsSyncPort; - if (taosWriteMsg(pPeer->syncFd, &firstPkt, sizeof(firstPkt)) < 0) { - sError("%s, failed to send sync firstPkt since %s", pPeer->id, strerror(errno)); + if (taosWriteMsg(pPeer->syncFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { + sError("%s, failed to send sync firstPkt since %s, tranId:%u", pPeer->id, strerror(errno), firstPkt.tranId); return -1; } + sDebug("%s, send firstPkt to peer, tranId:%u", pPeer->id, firstPkt.tranId); SFirstPktRsp firstPktRsp; - if (taosReadMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) < 0) { - sError("%s, failed to read sync firstPkt rsp since %s", pPeer->id, strerror(errno)); + if (taosReadMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) != sizeof(SFirstPktRsp)) { + sError("%s, failed to read sync firstPkt rsp since %s, tranId:%u", pPeer->id, strerror(errno), firstPkt.tranId); return -1; } + sDebug("%s, recv firstPktRsp from peer, tranId:%u", pPeer->id, firstPkt.tranId); return 0; } diff --git a/tests/pytest/stream/stream2.py b/tests/pytest/stream/stream2.py index 44882f59721c4967ebb3fea12e356bb7e95d71ae..d71742048a45774fbe6909093840f265b829eb4c 100644 --- a/tests/pytest/stream/stream2.py +++ b/tests/pytest/stream/stream2.py @@ -87,6 +87,10 @@ class TDTestCase: tdSql.checkData(0, 3, rowNum) except Exception as e: tdLog.info(repr(e)) + + tdSql.query("show streams") + tdSql.checkRows(1) + tdSql.checkData(0, 2, 's0') tdLog.info("===== step8 =====") tdSql.query( @@ -142,6 +146,12 @@ class TDTestCase: except Exception as e: tdLog.info(repr(e)) + tdSql.query("show streams") + tdSql.checkRows(2) + tdSql.checkData(0, 2, 's1') + tdSql.checkData(1, 2, 's0') + + def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__)