提交 15d6cb55 编写于 作者: S Shengliang Guan

Merge branch 'main' into fix/TD-21424_TD-21389_TD-21420

...@@ -190,7 +190,7 @@ static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STabl ...@@ -190,7 +190,7 @@ static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STabl
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger, static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
STsdbReader* pReader); STsdbReader* pReader);
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow,
STableBlockScanInfo* pInfo); STableBlockScanInfo* pScanInfo);
static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
int32_t rowIndex); int32_t rowIndex);
static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
...@@ -2482,6 +2482,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { ...@@ -2482,6 +2482,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
int32_t nextIndex = -1; int32_t nextIndex = -1;
SBlockIndex bIndex = {0}; SBlockIndex bIndex = {0};
pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); // NOTE: get the new block info
bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &bIndex); bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &bIndex);
if (!hasNeighbor) { // do nothing if (!hasNeighbor) { // do nothing
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
......
...@@ -1294,10 +1294,6 @@ void syncNodeClose(SSyncNode* pSyncNode) { ...@@ -1294,10 +1294,6 @@ void syncNodeClose(SSyncNode* pSyncNode) {
syncNodeStopElectTimer(pSyncNode); syncNodeStopElectTimer(pSyncNode);
syncNodeStopHeartbeatTimer(pSyncNode); syncNodeStopHeartbeatTimer(pSyncNode);
if (pSyncNode->pFsm != NULL) {
taosMemoryFree(pSyncNode->pFsm);
}
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
if ((pSyncNode->senders)[i] != NULL) { if ((pSyncNode->senders)[i] != NULL) {
sSTrace((pSyncNode->senders)[i], "snapshot sender destroy while close, data:%p", (pSyncNode->senders)[i]); sSTrace((pSyncNode->senders)[i], "snapshot sender destroy while close, data:%p", (pSyncNode->senders)[i]);
...@@ -1320,6 +1316,10 @@ void syncNodeClose(SSyncNode* pSyncNode) { ...@@ -1320,6 +1316,10 @@ void syncNodeClose(SSyncNode* pSyncNode) {
pSyncNode->pNewNodeReceiver = NULL; pSyncNode->pNewNodeReceiver = NULL;
} }
if (pSyncNode->pFsm != NULL) {
taosMemoryFree(pSyncNode->pFsm);
}
taosMemoryFree(pSyncNode); taosMemoryFree(pSyncNode);
} }
......
...@@ -122,15 +122,16 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { ...@@ -122,15 +122,16 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
pMsg->startTime = pSender->startTime; pMsg->startTime = pSender->startTime;
pMsg->seq = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT; pMsg->seq = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT;
// event log
sSDebug(pSender, "snapshot sender start");
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender start");
// send msg // send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender send msg failed since %s", terrstr()); sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
return -1; return -1;
} }
// event log
sSDebug(pSender, "snapshot sender start");
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender start");
return 0; return 0;
} }
...@@ -208,14 +209,6 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { ...@@ -208,14 +209,6 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
} }
// send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
return -1;
}
pSender->lastSendTime = taosGetTimestampMs();
// event log // event log
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) { if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
sSDebug(pSender, "snapshot sender finish, seq:%d", pSender->seq); sSDebug(pSender, "snapshot sender finish, seq:%d", pSender->seq);
...@@ -224,6 +217,14 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { ...@@ -224,6 +217,14 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
sSDebug(pSender, "snapshot sender sending, seq:%d", pSender->seq); sSDebug(pSender, "snapshot sender sending, seq:%d", pSender->seq);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender sending"); syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender sending");
} }
// send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
return -1;
}
pSender->lastSendTime = taosGetTimestampMs();
return 0; return 0;
} }
...@@ -252,6 +253,10 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { ...@@ -252,6 +253,10 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
} }
// event log
sSDebug(pSender, "snapshot sender resend, seq:%d", pSender->seq);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender resend");
// send msg // send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender resend msg failed since %s", terrstr()); sSError(pSender, "snapshot sender resend msg failed since %s", terrstr());
...@@ -259,10 +264,6 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { ...@@ -259,10 +264,6 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
} }
pSender->lastSendTime = taosGetTimestampMs(); pSender->lastSendTime = taosGetTimestampMs();
// event log
sSDebug(pSender, "snapshot sender resend, seq:%d", pSender->seq);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender resend");
return 0; return 0;
} }
...@@ -748,6 +749,7 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS ...@@ -748,6 +749,7 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS
sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr()); sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr());
return -1; return -1;
} }
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册