未验证 提交 49326aa9 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16690 from taosdata/feature/3_liaohj

fix(query): optimize last block read performance.
...@@ -643,20 +643,33 @@ typedef struct { ...@@ -643,20 +643,33 @@ typedef struct {
TSDBROW row; TSDBROW row;
} SRowInfo; } SRowInfo;
typedef struct SSttBlockLoadInfo {
SBlockData blockData[2];
SArray *aSttBlk;
int32_t blockIndex[2]; // to denote the loaded block in the corresponding position.
int32_t currentLoadBlockIndex;
} SSttBlockLoadInfo;
typedef struct SMergeTree { typedef struct SMergeTree {
int8_t backward; int8_t backward;
SRBTree rbt; SRBTree rbt;
SArray *pIterList; SArray *pIterList;
SLDataIter *pIter; SLDataIter *pIter;
bool destroyLoadInfo;
SSttBlockLoadInfo* pLoadInfo;
} SMergeTree; } SMergeTree;
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange); STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pLoadInfo);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree); bool tMergeTreeNext(SMergeTree *pMTree);
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree); TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
void tMergeTreeClose(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree);
SSttBlockLoadInfo* tCreateLastBlockLoadInfo();
void resetLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo);
void* destroyLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo);
// ========== inline functions ========== // ========== inline functions ==========
static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
TSDBKEY *pKey1 = (TSDBKEY *)p1; TSDBKEY *pKey1 = (TSDBKEY *)p1;
......
...@@ -457,7 +457,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { ...@@ -457,7 +457,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid, tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid,
&(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX}, &(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX},
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}); &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, NULL);
bool hasVal = tMergeTreeNext(&state->mergeTree); bool hasVal = tMergeTreeNext(&state->mergeTree);
if (!hasVal) { if (!hasVal) {
state->state = SFSLASTNEXTROW_FILESET; state->state = SFSLASTNEXTROW_FILESET;
......
...@@ -22,26 +22,106 @@ struct SLDataIter { ...@@ -22,26 +22,106 @@ struct SLDataIter {
SDataFReader *pReader; SDataFReader *pReader;
int32_t iStt; int32_t iStt;
int8_t backward; int8_t backward;
SArray *aSttBlk;
int32_t iSttBlk; int32_t iSttBlk;
SBlockData bData[2];
int32_t loadIndex;
int32_t iRow; int32_t iRow;
SRowInfo rInfo; SRowInfo rInfo;
uint64_t uid; uint64_t uid;
STimeWindow timeWindow; STimeWindow timeWindow;
SVersionRange verRange; SVersionRange verRange;
SSttBlockLoadInfo* pBlockLoadInfo;
}; };
static SBlockData *getCurrentBlock(SLDataIter *pIter) { return &pIter->bData[pIter->loadIndex]; } SSttBlockLoadInfo* tCreateLastBlockLoadInfo() {
SSttBlockLoadInfo* pLoadInfo = taosMemoryCalloc(TSDB_DEFAULT_STT_FILE, sizeof(SSttBlockLoadInfo));
if (pLoadInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
for(int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) {
pLoadInfo[i].blockIndex[0] = -1;
pLoadInfo[i].blockIndex[1] = -1;
pLoadInfo[i].currentLoadBlockIndex = 1;
int32_t code = tBlockDataCreate(&pLoadInfo[i].blockData[0]);
if (code) {
terrno = code;
}
code = tBlockDataCreate(&pLoadInfo[i].blockData[1]);
if (code) {
terrno = code;
}
pLoadInfo[i].aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
}
return pLoadInfo;
}
void resetLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) {
for(int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) {
pLoadInfo[i].currentLoadBlockIndex = 1;
pLoadInfo[i].blockIndex[0] = -1;
pLoadInfo[i].blockIndex[1] = -1;
static SBlockData *getNextBlock(SLDataIter *pIter) { taosArrayClear(pLoadInfo[i].aSttBlk);
pIter->loadIndex ^= 1; }
return getCurrentBlock(pIter); }
void* destroyLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) {
for(int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) {
pLoadInfo[i].currentLoadBlockIndex = 1;
pLoadInfo[i].blockIndex[0] = -1;
pLoadInfo[i].blockIndex[1] = -1;
tBlockDataDestroy(&pLoadInfo[i].blockData[0], true);
tBlockDataDestroy(&pLoadInfo[i].blockData[1], true);
taosArrayDestroy(pLoadInfo[i].aSttBlk);
}
taosMemoryFree(pLoadInfo);
return NULL;
}
static SBlockData* loadBlockIfMissing(SLDataIter *pIter) {
int32_t code = 0;
SSttBlockLoadInfo* pInfo = pIter->pBlockLoadInfo;
if (pInfo->blockIndex[0] == pIter->iSttBlk) {
return &pInfo->blockData[0];
}
if (pInfo->blockIndex[1] == pIter->iSttBlk) {
return &pInfo->blockData[1];
}
pInfo->currentLoadBlockIndex ^= 1;
if (pIter->pSttBlk != NULL) { // current block not loaded yet
code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, &pInfo->blockData[pInfo->currentLoadBlockIndex]);
tsdbDebug("read last block, index:%d, last file index:%d", pIter->iSttBlk, pIter->iStt);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1;
}
return &pInfo->blockData[pInfo->currentLoadBlockIndex];
_exit:
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
}
return NULL;
} }
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid, int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange) { uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo* pBlockLoadInfo) {
int32_t code = 0; int32_t code = 0;
*pIter = taosMemoryCalloc(1, sizeof(SLDataIter)); *pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
if (*pIter == NULL) { if (*pIter == NULL) {
...@@ -55,34 +135,22 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t ...@@ -55,34 +135,22 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
(*pIter)->backward = backward; (*pIter)->backward = backward;
(*pIter)->verRange = *pRange; (*pIter)->verRange = *pRange;
(*pIter)->timeWindow = *pTimeWindow; (*pIter)->timeWindow = *pTimeWindow;
(*pIter)->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
if ((*pIter)->aSttBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
code = tBlockDataCreate(&(*pIter)->bData[0]);
if (code) {
goto _exit;
}
code = tBlockDataCreate(&(*pIter)->bData[1]); (*pIter)->pBlockLoadInfo = pBlockLoadInfo;
if (code) { if (taosArrayGetSize(pBlockLoadInfo->aSttBlk) == 0) {
goto _exit; code = tsdbReadSttBlk(pReader, iStt, pBlockLoadInfo->aSttBlk);
} if (code) {
goto _exit;
code = tsdbReadSttBlk(pReader, iStt, (*pIter)->aSttBlk); }
if (code) {
goto _exit;
} }
size_t size = taosArrayGetSize((*pIter)->aSttBlk); size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
// find the start block // find the start block
int32_t index = -1; int32_t index = -1;
if (!backward) { // asc if (!backward) { // asc
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SSttBlk *p = taosArrayGet((*pIter)->aSttBlk, i); SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
if (p->suid != suid) { if (p->suid != suid) {
continue; continue;
} }
...@@ -94,7 +162,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t ...@@ -94,7 +162,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
} }
} else { // desc } else { // desc
for (int32_t i = size - 1; i >= 0; --i) { for (int32_t i = size - 1; i >= 0; --i) {
SSttBlk *p = taosArrayGet((*pIter)->aSttBlk, i); SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
if (p->suid != suid) { if (p->suid != suid) {
continue; continue;
} }
...@@ -108,7 +176,8 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t ...@@ -108,7 +176,8 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
(*pIter)->iSttBlk = index; (*pIter)->iSttBlk = index;
if (index != -1) { if (index != -1) {
(*pIter)->pSttBlk = taosArrayGet((*pIter)->aSttBlk, (*pIter)->iSttBlk); (*pIter)->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, (*pIter)->iSttBlk);
(*pIter)->iRow = ((*pIter)->backward) ? (*pIter)->pSttBlk->nRow : -1;
} }
_exit: _exit:
...@@ -116,9 +185,6 @@ _exit: ...@@ -116,9 +185,6 @@ _exit:
} }
void tLDataIterClose(SLDataIter *pIter) { void tLDataIterClose(SLDataIter *pIter) {
tBlockDataDestroy(&pIter->bData[0], 1);
tBlockDataDestroy(&pIter->bData[1], 1);
taosArrayDestroy(pIter->aSttBlk);
taosMemoryFree(pIter); taosMemoryFree(pIter);
} }
...@@ -127,9 +193,9 @@ void tLDataIterNextBlock(SLDataIter *pIter) { ...@@ -127,9 +193,9 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
pIter->iSttBlk += step; pIter->iSttBlk += step;
int32_t index = -1; int32_t index = -1;
size_t size = taosArrayGetSize(pIter->aSttBlk); size_t size = taosArrayGetSize(pIter->pBlockLoadInfo->aSttBlk);
for (int32_t i = pIter->iSttBlk; i < size && i >= 0; i += step) { for (int32_t i = pIter->iSttBlk; i < size && i >= 0; i += step) {
SSttBlk *p = taosArrayGet(pIter->aSttBlk, i); SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, i);
if ((!pIter->backward) && p->minUid > pIter->uid) { if ((!pIter->backward) && p->minUid > pIter->uid) {
break; break;
} }
...@@ -169,7 +235,7 @@ void tLDataIterNextBlock(SLDataIter *pIter) { ...@@ -169,7 +235,7 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
if (index == -1) { if (index == -1) {
pIter->pSttBlk = NULL; pIter->pSttBlk = NULL;
} else { } else {
pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk); pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
} }
} }
...@@ -178,7 +244,8 @@ static void findNextValidRow(SLDataIter *pIter) { ...@@ -178,7 +244,8 @@ static void findNextValidRow(SLDataIter *pIter) {
bool hasVal = false; bool hasVal = false;
int32_t i = pIter->iRow; int32_t i = pIter->iRow;
SBlockData *pBlockData = getCurrentBlock(pIter);
SBlockData *pBlockData = loadBlockIfMissing(pIter);
for (; i < pBlockData->nRow && i >= 0; i += step) { for (; i < pBlockData->nRow && i >= 0; i += step) {
if (pBlockData->aUid != NULL) { if (pBlockData->aUid != NULL) {
...@@ -238,19 +305,8 @@ bool tLDataIterNextRow(SLDataIter *pIter) { ...@@ -238,19 +305,8 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
return false; return false;
} }
int32_t iBlockL = pIter->iSttBlk; int32_t iBlockL = pIter->iSttBlk;
SBlockData *pBlockData = getCurrentBlock(pIter); SBlockData *pBlockData = loadBlockIfMissing(pIter);
if (pBlockData->nRow == 0 && pIter->pSttBlk != NULL) { // current block not loaded yet
pBlockData = getNextBlock(pIter);
code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlockData);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
pIter->iRow = (pIter->backward) ? pBlockData->nRow : -1;
}
pIter->iRow += step; pIter->iRow += step;
while (1) { while (1) {
...@@ -266,12 +322,8 @@ bool tLDataIterNextRow(SLDataIter *pIter) { ...@@ -266,12 +322,8 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
} }
if (iBlockL != pIter->iSttBlk) { if (iBlockL != pIter->iSttBlk) {
pBlockData = getNextBlock(pIter); pBlockData = loadBlockIfMissing(pIter);
code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlockData); pIter->iRow += step;
if (code) {
goto _exit;
}
pIter->iRow = pIter->backward ? (pBlockData->nRow - 1) : 0;
} }
} }
...@@ -313,7 +365,7 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) { ...@@ -313,7 +365,7 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) {
} }
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange) { STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pBlockLoadInfo) {
pMTree->backward = backward; pMTree->backward = backward;
pMTree->pIter = NULL; pMTree->pIter = NULL;
pMTree->pIterList = taosArrayInit(4, POINTER_BYTES); pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
...@@ -322,21 +374,33 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead ...@@ -322,21 +374,33 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
} }
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn); tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
int32_t code = TSDB_CODE_OUT_OF_MEMORY; int32_t code = TSDB_CODE_SUCCESS;
SSttBlockLoadInfo* pLoadInfo = NULL;
if (pBlockLoadInfo == NULL) {
if (pMTree->pLoadInfo == NULL) {
pMTree->destroyLoadInfo = true;
pMTree->pLoadInfo = tCreateLastBlockLoadInfo();
}
pLoadInfo = pMTree->pLoadInfo;
} else {
pLoadInfo = pBlockLoadInfo;
}
struct SLDataIter *pIterList[TSDB_DEFAULT_STT_FILE] = {0};
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
code = tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange); struct SLDataIter* pIter = NULL;
code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, &pLoadInfo[i]);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _end; goto _end;
} }
bool hasVal = tLDataIterNextRow(pIterList[i]); bool hasVal = tLDataIterNextRow(pIter);
if (hasVal) { if (hasVal) {
taosArrayPush(pMTree->pIterList, &pIterList[i]); taosArrayPush(pMTree->pIterList, &pIter);
tMergeTreeAddIter(pMTree, pIterList[i]); tMergeTreeAddIter(pMTree, pIter);
} else { } else {
tLDataIterClose(pIterList[i]); tLDataIterClose(pIter);
} }
} }
...@@ -393,4 +457,9 @@ void tMergeTreeClose(SMergeTree *pMTree) { ...@@ -393,4 +457,9 @@ void tMergeTreeClose(SMergeTree *pMTree) {
pMTree->pIterList = taosArrayDestroy(pMTree->pIterList); pMTree->pIterList = taosArrayDestroy(pMTree->pIterList);
pMTree->pIter = NULL; pMTree->pIter = NULL;
if (pMTree->destroyLoadInfo) {
pMTree->pLoadInfo = destroyLastBlockLoadInfo(pMTree->pLoadInfo);
pMTree->destroyLoadInfo = false;
}
} }
...@@ -17,8 +17,6 @@ ...@@ -17,8 +17,6 @@
#include "tsdb.h" #include "tsdb.h"
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define ALL_ROWS_CHECKED_INDEX (INT16_MIN)
#define INITIAL_ROW_INDEX_VAL (-1)
typedef enum { typedef enum {
EXTERNAL_ROWS_PREV = 0x1, EXTERNAL_ROWS_PREV = 0x1,
...@@ -88,6 +86,7 @@ typedef struct SLastBlockReader { ...@@ -88,6 +86,7 @@ typedef struct SLastBlockReader {
int32_t order; int32_t order;
uint64_t uid; uint64_t uid;
SMergeTree mergeTree; SMergeTree mergeTree;
SSttBlockLoadInfo* pInfo;
} SLastBlockReader; } SLastBlockReader;
typedef struct SFilesetIter { typedef struct SFilesetIter {
...@@ -226,13 +225,14 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK ...@@ -226,13 +225,14 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
return NULL; return NULL;
} }
int32_t step = ASCENDING_TRAVERSE(pTsdbReader->order)? 1:-1;
for (int32_t j = 0; j < numOfTables; ++j) { for (int32_t j = 0; j < numOfTables; ++j) {
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid}; STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
if (ASCENDING_TRAVERSE(pTsdbReader->order)) { if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
info.lastKey = pTsdbReader->window.skey - step; int64_t skey = pTsdbReader->window.skey;
info.lastKey = (skey > INT64_MIN)? (skey - 1):skey;
} else { } else {
info.lastKey = pTsdbReader->window.ekey - step; int64_t ekey = pTsdbReader->window.ekey;
info.lastKey = (ekey < INT64_MAX)? (ekey + 1):ekey;
} }
taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
...@@ -319,8 +319,7 @@ static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* cap ...@@ -319,8 +319,7 @@ static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* cap
} }
// init file iterator // init file iterator
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
STsdbReader* pReader /*int32_t order, const char* idstr*/) {
size_t numOfFileset = taosArrayGetSize(aDFileSet); size_t numOfFileset = taosArrayGetSize(aDFileSet);
pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset; pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
...@@ -345,6 +344,14 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, ...@@ -345,6 +344,14 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet,
pLReader->uid = 0; pLReader->uid = 0;
tMergeTreeClose(&pLReader->mergeTree); tMergeTreeClose(&pLReader->mergeTree);
if (pLReader->pInfo == NULL) {
pLReader->pInfo = tCreateLastBlockLoadInfo();
if (pLReader->pInfo == NULL) {
tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
return terrno;
}
}
tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr); tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -360,6 +367,7 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) { ...@@ -360,6 +367,7 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
pIter->pLastBlockReader->uid = 0; pIter->pLastBlockReader->uid = 0;
tMergeTreeClose(&pIter->pLastBlockReader->mergeTree); tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);
// check file the time range of coverage // check file the time range of coverage
STimeWindow win = {0}; STimeWindow win = {0};
...@@ -1377,7 +1385,6 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, ...@@ -1377,7 +1385,6 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
bool mergeBlockData) { bool mergeBlockData) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
// SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
STSRow* pTSRow = NULL; STSRow* pTSRow = NULL;
...@@ -1866,36 +1873,35 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc ...@@ -1866,36 +1873,35 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
} }
} }
static bool initLastBlockReader(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo, static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
STsdbReader* pReader) {
// the last block reader has been initialized for this table. // the last block reader has been initialized for this table.
if (pLastBlockReader->uid == pBlockScanInfo->uid) { if (pLBlockReader->uid == pScanInfo->uid) {
return true; return true;
} }
if (pLastBlockReader->uid != 0) { if (pLBlockReader->uid != 0) {
tMergeTreeClose(&pLastBlockReader->mergeTree); tMergeTreeClose(&pLBlockReader->mergeTree);
} }
initMemDataIterator(pBlockScanInfo, pReader); initMemDataIterator(pScanInfo, pReader);
pLastBlockReader->uid = pBlockScanInfo->uid; pLBlockReader->uid = pScanInfo->uid;
int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order)? 1:-1; int32_t step = ASCENDING_TRAVERSE(pLBlockReader->order)? 1:-1;
STimeWindow w = pLastBlockReader->window; STimeWindow w = pLBlockReader->window;
if (ASCENDING_TRAVERSE(pLastBlockReader->order)) { if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
w.skey = pBlockScanInfo->lastKey + step; w.skey = pScanInfo->lastKey + step;
} else { } else {
w.ekey = pBlockScanInfo->lastKey + step; w.ekey = pScanInfo->lastKey + step;
} }
int32_t code = int32_t code =
tMergeTreeOpen(&pLastBlockReader->mergeTree, (pLastBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader, tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader,
pReader->suid, pBlockScanInfo->uid, &w, &pLastBlockReader->verRange); pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return false; return false;
} }
return nextRowFromLastBlocks(pLastBlockReader, pBlockScanInfo); return nextRowFromLastBlocks(pLBlockReader, pScanInfo);
} }
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) { static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
...@@ -3305,6 +3311,7 @@ void tsdbReaderClose(STsdbReader* pReader) { ...@@ -3305,6 +3311,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
SFilesetIter* pFilesetIter = &pReader->status.fileIter; SFilesetIter* pFilesetIter = &pReader->status.fileIter;
if (pFilesetIter->pLastBlockReader != NULL) { if (pFilesetIter->pLastBlockReader != NULL) {
tMergeTreeClose(&pFilesetIter->pLastBlockReader->mergeTree); tMergeTreeClose(&pFilesetIter->pLastBlockReader->mergeTree);
pFilesetIter->pLastBlockReader->pInfo = destroyLastBlockLoadInfo(pFilesetIter->pLastBlockReader->pInfo);
taosMemoryFree(pFilesetIter->pLastBlockReader); taosMemoryFree(pFilesetIter->pLastBlockReader);
} }
......
...@@ -97,6 +97,7 @@ while $loop <= $loops ...@@ -97,6 +97,7 @@ while $loop <= $loops
endw endw
sql select count(*) from $stb sql select count(*) from $stb
if $data00 != $totalNum then if $data00 != $totalNum then
print expect $totalNum , actual: $data00
return -1 return -1
endi endi
$loop = $loop + 1 $loop = $loop + 1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册