提交 f6e3ad27 编写于 作者: H Haojun Liao

fix(tsdb): fix memory leak.

上级 a92d16a5
......@@ -800,7 +800,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb* pTsdb, uint6
bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter* pLDataIter, void* pCurrentFileSet);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree);
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
void tMergeTreeClose(SMergeTree *pMTree);
......@@ -809,6 +809,7 @@ SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList,
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el);
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
void destroySttBlockReader(SLDataIter* pLDataIter, int32_t numOfIter);
// tsdbCache ==============================================================================================
typedef struct SCacheRowsReader {
......
......@@ -17,6 +17,8 @@
#include "tsdbFSet2.h"
#include "tsdbSttFileRW.h"
static void tLDataIterClose2(SLDataIter *pIter);
// SLDataIter =================================================
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols,
int32_t numOfSttTrigger) {
......@@ -93,6 +95,12 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
return NULL;
}
void destroySttBlockReader(SLDataIter* pLDataIter, int32_t numOfIter) {
for(int32_t i = 0; i < numOfIter; ++i) {
tLDataIterClose2(&pLDataIter[i]);
}
}
static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
int32_t code = 0;
......@@ -357,6 +365,7 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pReader, int32
void tLDataIterClose2(SLDataIter *pIter) {
tsdbSttFileReaderClose(&pIter->pReader);
pIter->pReader = NULL;
}
void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
......@@ -623,9 +632,10 @@ _end:
return code;
}
int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb* pTsdb, uint64_t suid, uint64_t uid,
int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter* pLDataIter, void* pCurrentFileSet) {
bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter,
void *pCurrentFileSet) {
int32_t code = TSDB_CODE_SUCCESS;
pMTree->backward = backward;
......@@ -643,24 +653,29 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb* pTsdb, uint6
pMTree->ignoreEarlierTs = false;
// todo handle other level of stt files, here only deal with the first level stt
int32_t size = ((STFileSet*)pCurrentFileSet)->lvlArr[0].size;
int32_t size = ((STFileSet *)pCurrentFileSet)->lvlArr[0].size;
if (size == 0) {
goto _end;
}
SSttLvl* pSttLevel = ((STFileSet*)pCurrentFileSet)->lvlArr[0].data[0];
SSttLvl *pSttLevel = ((STFileSet *)pCurrentFileSet)->lvlArr[0].data[0];
ASSERT(pSttLevel->level == 0);
for (int32_t i = 0; i < pSttLevel->fobjArr[0].size; ++i) { // open all last file
SSttFileReader* pSttFileReader = pLDataIter[i].pReader;
memset(&pLDataIter[i], 0, sizeof(SLDataIter));
if (pSttFileReader == NULL) {
SSttFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.szPage};
conf.file[0] = *pSttLevel->fobjArr[0].data[i]->f;
SSttFileReader *pReader = NULL;
code = tsdbSttFileReaderOpen(pSttLevel->fobjArr[0].data[i]->fname, &conf, &pReader);
code = tsdbSttFileReaderOpen(pSttLevel->fobjArr[0].data[i]->fname, &conf, &pSttFileReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
code = tLDataIterOpen2(&pLDataIter[i], pReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
code = tLDataIterOpen2(&pLDataIter[i], pSttFileReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
&pMTree->pLoadInfo[i], pMTree->idStr, strictTimeRange);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
......@@ -678,7 +693,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb* pTsdb, uint6
return code;
_end:
_end:
tMergeTreeClose(pMTree);
return code;
}
......
......@@ -910,20 +910,13 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
ASSERT(pBrinBlk->minTbid.suid <= pReader->suid && pBrinBlk->maxTbid.suid >= pReader->suid);
if (pBrinBlk->minTbid.uid < pList->tableUidList[j]) {
if (pBrinBlk->maxTbid.uid < pList->tableUidList[j]) {
i += 1;
continue;
}
if (pBrinBlk->minTbid.uid > pList->tableUidList[j]) {
j += 1;
continue;
}
// todo maxTbid.uid == xxx?
if (pBrinBlk->minTbid.uid == pList->tableUidList[j]) {
// this block belongs to a table that is not queried.
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBrinBlk->minTbid.uid, pReader->idStr);
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pList->tableUidList[j], pReader->idStr);
if (pScanInfo == NULL) {
// tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
return terrno;
......@@ -934,11 +927,8 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
}
taosArrayPush(pIndexList, pBrinBlk);
i += 1;
j += 1;
}
}
int64_t et2 = taosGetTimestampUs();
tsdbDebug("load block index for %d/%d tables completed, elapsed time:%.2f ms, set BrinBlk:%.2f ms, size:%.2f Kb %s",
......@@ -1020,6 +1010,14 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
SBrinRecord record = {0};
for (int32_t j = 0; j < TARRAY2_SIZE(block.numRow); ++j) {
tBrinBlockGet(&block, j, &record);
if (record.suid < pReader->suid) {
continue;
}
if (record.suid > pReader->suid) {
break;
}
{
while (record.uid > uid) {
k += 1;
......@@ -1539,9 +1537,9 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBrinRecord* pRecord = &pBlockInfo->record;
// SDataBlk* pBlock = getCurrentBlock(pBlockIter);
code = tsdbDataFileReadBlockData(pReader->pFileReader, &pBlockInfo->record, pBlockData);
code = tsdbDataFileReadBlockData(pReader->pFileReader, pRecord, pBlockData);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, code:%s %s",
......@@ -1552,7 +1550,6 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
SBrinRecord* pRecord = &pBlockInfo->record;
tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey, pRecord->lastKey, pRecord->numRow,
......@@ -1983,7 +1980,7 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlock
return loadDataBlock;
}
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo,
STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
SDataBlockToLoadInfo info = {0};
getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader, pReader);
......@@ -2870,7 +2867,6 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock
// SBlockIndex nxtBIndex = {0};
*loadNeighbor = false;
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
SBrinRecord rec = {0};
bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &rec);
......@@ -2928,13 +2924,14 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
int64_t st = taosGetTimestampUs();
int32_t step = asc ? 1 : -1;
double el = 0;
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
SBrinRecord* pRecord = &pBlockInfo->record;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
STableBlockScanInfo* pBlockScanInfo = NULL;
if (pBlockInfo != NULL) {
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) {
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
setBlockAllDumped(pDumpInfo, pRecord->lastKey, pReader->order);
return code;
}
......@@ -2943,11 +2940,12 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
goto _end;
}
pRecord = &pBlockInfo->record;
TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);
// it is a clean block, load it directly
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
pBlock->nRow <= pReader->resBlockInfo.capacity) {
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
(pRecord->numRow <= pReader->resBlockInfo.capacity)) {
if (asc || (!hasDataInLastBlock(pLastBlockReader))) {
code = copyBlockDataToSDataBlock(pReader);
if (code) {
......@@ -2955,14 +2953,15 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
}
// record the last key value
pBlockScanInfo->lastKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
pBlockScanInfo->lastKey = asc ? pRecord->lastKey : pRecord->firstKey;
goto _end;
}
}
} else { // file blocks not exist
ASSERT(0);
pBlockScanInfo = *pReader->status.pTableIter;
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockScanInfo->uid, sizeof(pBlockScanInfo->uid))) {
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
// setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
return code;
}
}
......@@ -2972,8 +2971,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
while (1) {
bool hasBlockData = false;
{
while (pBlockData->nRow > 0 &&
pBlockData->uid == pBlockScanInfo->uid) { // find the first qualified row in data block
while (pBlockData->nRow > 0 && pBlockData->uid == pBlockScanInfo->uid) {
// find the first qualified row in data block
if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
hasBlockData = true;
break;
......@@ -2981,8 +2980,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
pDumpInfo->rowIndex += step;
pBlock = getCurrentBlock(&pReader->status.blockIter);
if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
if (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0) {
pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); // NOTE: get the new block info
// continue check for the next file block if the last ts in the current block
......@@ -2990,7 +2988,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
bool loadNeighbor = false;
code = loadNeighborIfOverlap(pBlockInfo, pBlockScanInfo, pReader, &loadNeighbor);
if ((!loadNeighbor) || (code != 0)) {
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
setBlockAllDumped(pDumpInfo, pRecord->lastKey, pReader->order);
break;
}
}
......@@ -3009,8 +3007,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
// currently loaded file data block is consumed
if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
pBlock = getCurrentBlock(&pReader->status.blockIter);
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
// pBlock = getCurrentBlock(&pReader->status.blockIter);
setBlockAllDumped(pDumpInfo, pRecord->lastKey, pReader->order);
break;
}
......@@ -3290,7 +3288,6 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
// reset the index in last block when handing a new file
doCleanupTableScanInfo(pScanInfo);
// pStatus->mapDataCleaned = true;
bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
if (!hasDataInLastFile) {
......@@ -3625,21 +3622,24 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
// set the correct start position in case of the first/last file block, according to the query time window
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
int64_t lastKey = ASCENDING_TRAVERSE(pReader->order) ? INT64_MIN : INT64_MAX;
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
SReaderStatus* pStatus = &pReader->status;
SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
if (pBlockInfo) {
STableBlockScanInfo* pScanInfo = tSimpleHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
if (pScanInfo) {
lastKey = pScanInfo->lastKey;
}
}
SReaderStatus* pStatus = &pReader->status;
SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
pDumpInfo->totalRows = pBlockInfo->record.numRow;
pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlockInfo->record.numRow - 1;
} else {
pDumpInfo->totalRows = 0;
pDumpInfo->rowIndex = 0;
}
pDumpInfo->totalRows = pBlock->nRow;
pDumpInfo->allDumped = false;
pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
pDumpInfo->lastKey = lastKey;
}
......@@ -4041,7 +4041,7 @@ typedef enum {
CHECK_FILEBLOCK_QUIT = 0x2,
} CHECK_FILEBLOCK_STATE;
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock,
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo,
SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
CHECK_FILEBLOCK_STATE* state) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
......@@ -4085,13 +4085,13 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
CHECK_FILEBLOCK_STATE st;
SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
SDataBlk* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);
// SDataBlk* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);
if (pFileBlockInfo == NULL) {
st = CHECK_FILEBLOCK_QUIT;
break;
}
checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
checkForNeighborFileBlock(pReader, pScanInfo, pFileBlockInfo, pMerger, key, &st);
if (st == CHECK_FILEBLOCK_QUIT) {
break;
}
......@@ -4767,13 +4767,9 @@ void tsdbReaderClose2(STsdbReader* pReader) {
pReader->pReadSnap = NULL;
tsdbReleaseReader(pReader);
tsdbUninitReaderLock(pReader);
taosMemoryFreeClear(pReader->status.pLDataIter);
taosMemoryFreeClear(pReader->status.uidList.tableUidList);
SIOCostSummary* pCost = &pReader->cost;
SFilesetIter* pFilesetIter = &pReader->status.fileIter;
if (pFilesetIter->pLastBlockReader != NULL) {
SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
......@@ -4782,9 +4778,15 @@ void tsdbReaderClose2(STsdbReader* pReader) {
getLastBlockLoadInfo(pLReader->pInfo, &pCost->lastBlockLoad, &pCost->lastBlockLoadTime);
pLReader->pInfo = destroyLastBlockLoadInfo(pLReader->pInfo);
// todo dynamic allocate the number of stt data iter
destroySttBlockReader(pReader->status.pLDataIter, pReader->pTsdb->pVnode->config.sttTrigger);
taosMemoryFree(pLReader);
}
taosMemoryFreeClear(pReader->status.pLDataIter);
taosMemoryFreeClear(pReader->status.uidList.tableUidList);
tsdbDebug(
"%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
" SMA-time:%.2f ms, fileBlocks:%" PRId64
......@@ -5210,9 +5212,10 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
}
int64_t st = taosGetTimestampUs();
ASSERT(0);
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
if (tDataBlkHasSma(pBlock)) {
// SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
if (/*tDataBlkHasSma(pBlock)*/1) {
// code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
if (code != TSDB_CODE_SUCCESS) {
tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
......@@ -5250,7 +5253,7 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
}
// do fill all null column value SMA info
if (doFillNullColSMA(pSup, pBlock->nRow, numOfCols, pTsAgg)) {
if (doFillNullColSMA(pSup, pFBlock->record.numRow, numOfCols, pTsAgg)) {
*hasNullSMA = true;
return TSDB_CODE_SUCCESS;
}
......@@ -5470,11 +5473,13 @@ int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pT
pTableBlockInfo->numOfTables = numOfTables;
bool hasNext = (pBlockIter->numOfBlocks > 0);
ASSERT(0);
while (true) {
if (hasNext) {
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
// SDataBlk* pBlock = getCurrentBlock(pBlockIter);
int32_t numOfRows = pBlock->nRow;
int32_t numOfRows = 0;//pFB->nRow;
pTableBlockInfo->totalRows += numOfRows;
if (numOfRows > pTableBlockInfo->maxRows) {
......@@ -5489,7 +5494,7 @@ int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pT
pTableBlockInfo->numOfSmallBlocks += 1;
}
pTableBlockInfo->totalSize += pBlock->aSubBlock[0].szBlock;
pTableBlockInfo->totalSize += 0;//pBlock->aSubBlock[0].szBlock;
int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows, numOfBuckets);
pTableBlockInfo->blockRowsHisto[bucketIndex]++;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册