提交 d1b7d077 编写于 作者: H Haojun Liao

refactor:do some internal refactor.

上级 b2cf2818
......@@ -307,12 +307,6 @@ size_t tsdbCacheGetCapacity(SVnode *pVnode);
int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema);
struct SLDataIter;
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iLast, int8_t backward, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pRange);
void tLDataIterClose(struct SLDataIter *pIter);
bool tLDataIterNextRow(struct SLDataIter *pIter);
// structs =======================
struct STsdbFS {
SDelFile *pDelFile;
......@@ -640,16 +634,16 @@ typedef struct {
typedef struct SMergeTree {
int8_t backward;
SRBTreeNode *pNode;
SRBTree rbt;
SArray *pIterList;
struct SLDataIter *pIter;
SDataFReader* pLFileReader;
} SMergeTree;
void tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader* pFReader, uint64_t uid, STimeWindow* pTimeWindow, SVersionRange* pVerRange);
void tMergeTreeAddIter(SMergeTree *pMTree, struct SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree* pMTree);
TSDBROW tMergeTreeGetRow(SMergeTree* pMTree);
void tMergeTreeClose(SMergeTree* pMTree);
// ========== inline functions ==========
static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
......
......@@ -93,6 +93,7 @@ _exit:
void tLDataIterClose(SLDataIter *pIter) {
tBlockDataDestroy(&pIter->bData, 1);
taosArrayDestroy(pIter->aBlockL);
taosMemoryFree(pIter);
}
extern int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, SBlockData *pBlockData);
......@@ -272,8 +273,9 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) {
void tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader* pFReader, uint64_t uid, STimeWindow* pTimeWindow, SVersionRange* pVerRange) {
pMTree->backward = backward;
pMTree->pNode = NULL;
pMTree->pIter = NULL;
pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
struct SLDataIter* pIterList[TSDB_DEFAULT_LAST_FILE] = {0};
......@@ -281,7 +283,10 @@ void tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader* pFReader,
/*int32_t code = */tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, uid, pTimeWindow, pVerRange);
bool hasVal = tLDataIterNextRow(pIterList[i]);
if (hasVal) {
taosArrayPush(pMTree->pIterList, &pIterList[i]);
tMergeTreeAddIter(pMTree, pIterList[i]);
} else {
tLDataIterClose(pIterList[i]);
}
}
}
......@@ -326,5 +331,12 @@ TSDBROW tMergeTreeGetRow(SMergeTree* pMTree) {
}
void tMergeTreeClose(SMergeTree* pMTree) {
size_t size = taosArrayGetSize(pMTree->pIterList);
for(int32_t i = 0; i < size; ++i) {
SLDataIter* pIter = taosArrayGetP(pMTree->pIterList, i);
tLDataIterClose(pIter);
}
pMTree->pIterList = taosArrayDestroy(pMTree->pIterList);
pMTree->pIter = NULL;
}
......@@ -48,7 +48,6 @@ typedef struct STableBlockScanInfo {
int32_t fileDelIndex; // file block delete index
int32_t lastBlockDelIndex; // delete index for last block
bool iterInit; // whether to initialize the in-memory skip list iterator or not
int16_t indexInBlockL; // row position in last block
} STableBlockScanInfo;
typedef struct SBlockOrderWrapper {
......@@ -228,7 +227,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
}
for (int32_t j = 0; j < numOfTables; ++j) {
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid, .indexInBlockL = INITIAL_ROW_INDEX_VAL};
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReader->window.skey) {
info.lastKey = pTsdbReader->window.skey;
......@@ -355,6 +354,7 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
pIter->index += step;
pIter->pLastBlockReader->uid = 0;
tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
return false;
}
......@@ -567,7 +567,6 @@ static void cleanupTableScanInfo(SHashObj* pTableMap) {
}
// reset the index in last block when handing a new file
px->indexInBlockL = INITIAL_ROW_INDEX_VAL;
tMapDataClear(&px->mapData);
taosArrayClear(px->pBlockList);
}
......@@ -1764,17 +1763,20 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
static bool initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, int16_t* startPos, SDataFReader* pFReader) {
static bool initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, SDataFReader* pFReader) {
// the last block reader has been initialized for this table.
if (pLastBlockReader->uid == uid) {
return true;
}
if (pLastBlockReader->uid != 0) {
tMergeTreeClose(&pLastBlockReader->mergeTree);
}
pLastBlockReader->uid = uid;
/*int32_t code = */ tMergeTreeOpen(&pLastBlockReader->mergeTree, (pLastBlockReader->order == TSDB_ORDER_DESC),
pFReader, uid, &pLastBlockReader->window, &pLastBlockReader->verRange);
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
return hasVal;
return tMergeTreeNext(&pLastBlockReader->mergeTree);
}
static bool nextRowInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) {
......@@ -2274,8 +2276,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
while (1) {
// load the last data block of current table
STableBlockScanInfo* pScanInfo = pStatus->pTableIter;
bool hasVal =
initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL, pReader->pFileReader);
bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo->uid, pReader->pFileReader);
if (!hasVal) {
bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
if (!hasNexTable) {
......@@ -2284,26 +2285,6 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
continue;
}
// int32_t index = pScanInfo->indexInBlockL;
// if (index == INITIAL_ROW_INDEX_VAL || index == pLastBlockReader->lastBlockData.nRow) {
// bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo);
// if (!hasData) { // current table does not have rows in last block, try next table
// bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
// if (!hasNexTable) {
// return TSDB_CODE_SUCCESS;
// }
// continue;
// }
// }
// } else { // no data in last block, try next table
// bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
// if (!hasNexTable) {
// return TSDB_CODE_SUCCESS;
// }
// continue;
// }
code = doBuildDataBlock(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -3144,7 +3125,7 @@ int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) {
ASSERT(pReader != NULL);
taosHashClear(pReader->status.pTableMap);
STableBlockScanInfo info = {.lastKey = 0, .uid = uid, .indexInBlockL = INITIAL_ROW_INDEX_VAL};
STableBlockScanInfo info = {.lastKey = 0, .uid = uid};
taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
return TDB_CODE_SUCCESS;
}
......@@ -3287,7 +3268,6 @@ void tsdbReaderClose(STsdbReader* pReader) {
}
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap);
taosMemoryFreeClear(pSupInfo->plist);
taosMemoryFree(pSupInfo->colIds);
......@@ -3312,10 +3292,13 @@ void tsdbReaderClose(STsdbReader* pReader) {
tsdbDataFReaderClose(&pReader->pFileReader);
}
tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap);
taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
SFilesetIter* pFilesetIter = &pReader->status.fileIter;
if (pFilesetIter->pLastBlockReader != NULL) {
tMergeTreeClose(&pFilesetIter->pLastBlockReader->mergeTree);
taosMemoryFree(pFilesetIter->pLastBlockReader);
}
......
......@@ -695,6 +695,7 @@ static void destroyTableScanOperatorInfo(void* param) {
cleanupQueryTableDataCond(&pTableScanInfo->cond);
tsdbReaderClose(pTableScanInfo->dataReader);
pTableScanInfo->dataReader = NULL;
if (pTableScanInfo->pColMatchInfo != NULL) {
taosArrayDestroy(pTableScanInfo->pColMatchInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册