提交 e76ae4ff 编写于 作者: H Haojun Liao

fix(tsdb) : fix memory leak.

上级 84bd2fb7
......@@ -967,14 +967,53 @@ static void cleanupTableScanInfo(SReaderStatus* pStatus) {
doCleanupTableScanInfo(*px);
}
}
// pStatus->mapDataCleaned = true;
typedef struct SBrinRecordIter {
SArray* pBrinBlockList;
SBrinBlk* pCurrentBlk;
int32_t blockIndex;
int32_t recordIndex;
SDataFileReader* pReader;
SBrinBlock block;
SBrinRecord record;
} SBrinRecordIter;
void initBrinRecordIter(SBrinRecordIter* pIter, SDataFileReader* pReader, SArray* pList) {
memset(&pIter->block, 0, sizeof(SBrinBlock));
memset(&pIter->record, 0, sizeof(SBrinRecord));
pIter->blockIndex = -1;
pIter->recordIndex = -1;
pIter->pReader = pReader;
pIter->pBrinBlockList = pList;
}
SBrinRecord* getNextBrinRecord(SBrinRecordIter* pIter) {
if (pIter->blockIndex == -1 || (pIter->recordIndex + 1) >= TARRAY2_SIZE(pIter->block.numRow)) {
pIter->blockIndex += 1;
if (pIter->blockIndex >= taosArrayGetSize(pIter->pBrinBlockList)) {
return NULL;
}
pIter->pCurrentBlk = taosArrayGet(pIter->pBrinBlockList, pIter->blockIndex);
tBrinBlockClear(&pIter->block);
tsdbDataFileReadBrinBlock(pIter->pReader, pIter->pCurrentBlk, &pIter->block);
pIter->recordIndex = -1;
}
pIter->recordIndex += 1;
tBrinBlockGet(&pIter->block, pIter->recordIndex, &pIter->record);
return &pIter->record;
}
void clearBrinBlockIter(SBrinRecordIter* pIter) {
tBrinBlockDestroy(&pIter->block);
}
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum, SArray* pTableScanInfoList) {
size_t sizeInDisk = 0;
size_t numOfBlocks = taosArrayGetSize(pIndexList);
int64_t st = taosGetTimestampUs();
cleanupTableScanInfo(&pReader->status);
......@@ -982,95 +1021,79 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
int32_t i = 0, k = 0;
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
while(i < numOfBlocks && k < numOfTables) {
SBrinBlk* pBlk = taosArrayGet(pIndexList, i);
uint64_t uid = pReader->status.uidList.tableUidList[k];
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
STimeWindow w = pReader->window;
SBrinBlock block = {0};
tsdbDataFileReadBrinBlock(pReader->pFileReader, pBlk, &block);
SBrinRecordIter iter = {0};
initBrinRecordIter(&iter, pReader->pFileReader, pIndexList);
SBrinRecord* pRecord = NULL;
// tMapDataReset(&pScanInfo->mapData);
// tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
// taosArrayEnsureCap(pScanInfo->pBlockList, pScanInfo->mapData.nItem);
while (k < numOfTables) {
pRecord = getNextBrinRecord(&iter);
// todo set the correct size
sizeInDisk += 0;//pScanInfo->mapData.nData;
uint64_t uid = pReader->status.uidList.tableUidList[k];
if (pRecord == NULL || pRecord->suid > pReader->suid) {
break;
}
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
STimeWindow w = pReader->window;
if (isEmptyQueryTimeWindow(&w)) {
if (pRecord->suid < pReader->suid) {
continue;
}
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
if (ASCENDING_TRAVERSE(pReader->order)) {
w.skey = pScanInfo->lastKey + step;
} else {
w.ekey = pScanInfo->lastKey + step;
ASSERT(pRecord->suid == pReader->suid);
if (pRecord->uid < uid) {
continue;
}
SBrinRecord record = {0};
for (int32_t j = 0; j < TARRAY2_SIZE(block.numRow); ++j) {
tBrinBlockGet(&block, j, &record);
if (record.suid < pReader->suid) {
continue;
while (pRecord->uid > uid && k < numOfTables) {
k += 1;
}
if (record.suid > pReader->suid) {
if (k >= numOfTables) {
break;
}
{
while (record.uid > uid && (k + 1) < numOfTables) {
k += 1;
uid = pReader->status.uidList.tableUidList[k];
pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
ASSERT(pRecord->suid == pReader->suid && uid == pRecord->uid);
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
if (ASCENDING_TRAVERSE(pReader->order)) {
w.skey = pScanInfo->lastKey + step;
} else {
w.ekey = pScanInfo->lastKey + step;
}
}
if (k >= numOfTables) {
break;
}
if (record.uid < uid) {
if (isEmptyQueryTimeWindow(&w)) {
k += 1;
continue;
}
}
ASSERT(record.suid == pReader->suid);
// 1. time range check
if (record.firstKey > w.ekey || record.lastKey < w.skey) {
if (pRecord->firstKey > w.ekey || pRecord->lastKey < w.skey) {
continue;
}
// 2. version range check
if (record.firstKeyVer > pReader->verRange.maxVer || record.lastKeyVer < pReader->verRange.minVer) {
if (pRecord->firstKeyVer > pReader->verRange.maxVer || pRecord->lastKeyVer < pReader->verRange.minVer) {
continue;
}
// SBlockIndex bIndex = {.ordinalIndex = j, .inFileOffset = record.blockOffset};
// bIndex.window = (STimeWindow){.skey = record.firstKey, .ekey = record.lastKey};
void* p1 = taosArrayPush(pScanInfo->pBlockList, &record);
void* p1 = taosArrayPush(pScanInfo->pBlockList, pRecord);
if (p1 == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pBlockNum->numOfBlocks += 1;
}
i += 1;
if (taosArrayGetSize(pTableScanInfoList) == 0) {
taosArrayPush(pTableScanInfoList, &pScanInfo);
} else {
STableBlockScanInfo** p = taosArrayGetLast(pTableScanInfoList);
if ((p == NULL || (*p)->uid != uid) && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
if ((*p)->uid != uid) {
taosArrayPush(pTableScanInfoList, &pScanInfo);
}
}
}
clearBrinBlockIter(&iter);
pBlockNum->numOfLastFiles = pReader->status.pCurrentFileset->lvlArr->size;
int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;
......@@ -1336,7 +1359,6 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
SBlockData* pBlockData = &pStatus->fileBlockData;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
// SDataBlk* pBlock = getCurrentBlock(pBlockIter);
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
int32_t numOfOutputCols = pSupInfo->numOfCols;
int32_t code = TSDB_CODE_SUCCESS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册