提交 a5933fc6 编写于 作者: H Haojun Liao

fix(query): optimize last block read performance.

上级 8f92ffcd
......@@ -643,20 +643,34 @@ typedef struct {
TSDBROW row;
} SRowInfo;
typedef struct SSttBlockLoadInfo {
int32_t sttFileIndex;
SBlockData blockData[2];
SArray *aSttBlk;
int32_t blockIndex[2]; // to denote the loaded block in the corresponding position.
int32_t currentLoadBlockIndex;
} SSttBlockLoadInfo;
typedef struct SMergeTree {
int8_t backward;
SRBTree rbt;
SArray *pIterList;
SLDataIter *pIter;
bool destroyLoadInfo;
SSttBlockLoadInfo* pLoadInfo;
} SMergeTree;
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange);
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pLoadInfo);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree);
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
void tMergeTreeClose(SMergeTree *pMTree);
SSttBlockLoadInfo* tCreateLastBlockLoadInfo();
void resetLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo);
void* destroyLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo);
// ========== inline functions ==========
static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
TSDBKEY *pKey1 = (TSDBKEY *)p1;
......
......@@ -457,7 +457,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid,
&(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX},
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX});
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, NULL);
bool hasVal = tMergeTreeNext(&state->mergeTree);
if (!hasVal) {
state->state = SFSLASTNEXTROW_FILESET;
......
......@@ -22,26 +22,105 @@ struct SLDataIter {
SDataFReader *pReader;
int32_t iStt;
int8_t backward;
SArray *aSttBlk;
int32_t iSttBlk;
SBlockData bData[2];
int32_t loadIndex;
int32_t iRow;
SRowInfo rInfo;
uint64_t uid;
STimeWindow timeWindow;
SVersionRange verRange;
SSttBlockLoadInfo* pBlockLoadInfo;
};
static SBlockData *getCurrentBlock(SLDataIter *pIter) { return &pIter->bData[pIter->loadIndex]; }
SSttBlockLoadInfo* tCreateLastBlockLoadInfo() {
SSttBlockLoadInfo* pLoadInfo = taosMemoryCalloc(TSDB_DEFAULT_STT_FILE, sizeof(SSttBlockLoadInfo));
if (pLoadInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
for(int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) {
pLoadInfo[i].blockIndex[0] = -1;
pLoadInfo[i].blockIndex[1] = -1;
pLoadInfo[i].currentLoadBlockIndex = 1;
int32_t code = tBlockDataCreate(&pLoadInfo[i].blockData[0]);
if (code) {
terrno = code;
}
code = tBlockDataCreate(&pLoadInfo[i].blockData[1]);
if (code) {
terrno = code;
}
pLoadInfo[i].aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
}
return pLoadInfo;
}
void resetLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) {
for(int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) {
pLoadInfo[i].currentLoadBlockIndex = 1;
pLoadInfo[i].blockIndex[0] = -1;
pLoadInfo[i].blockIndex[1] = -1;
static SBlockData *getNextBlock(SLDataIter *pIter) {
pIter->loadIndex ^= 1;
return getCurrentBlock(pIter);
taosArrayClear(pLoadInfo[i].aSttBlk);
}
}
void* destroyLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) {
for(int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) {
pLoadInfo[i].currentLoadBlockIndex = 1;
pLoadInfo[i].blockIndex[0] = -1;
pLoadInfo[i].blockIndex[1] = -1;
tBlockDataDestroy(&pLoadInfo[i].blockData[0], true);
tBlockDataDestroy(&pLoadInfo[i].blockData[1], true);
taosArrayDestroy(pLoadInfo[i].aSttBlk);
}
taosMemoryFree(pLoadInfo);
return NULL;
}
static SBlockData* loadBlockIfMissing(SLDataIter *pIter) {
int32_t code = 0;
SSttBlockLoadInfo* pInfo = pIter->pBlockLoadInfo;
if (pInfo->blockIndex[0] == pIter->iStt) {
return &pInfo->blockData[0];
}
if (pInfo->blockIndex[1] == pIter->iStt) {
return &pInfo->blockData[1];
}
pInfo->currentLoadBlockIndex ^= 1;
if (pIter->pSttBlk != NULL) { // current block not loaded yet
code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, &pInfo->blockData[pInfo->currentLoadBlockIndex]);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iStt;
pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1;
}
return &pInfo->blockData[pInfo->currentLoadBlockIndex];
_exit:
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
}
return NULL;
}
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange) {
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo* pBlockLoadInfo) {
int32_t code = 0;
*pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
if (*pIter == NULL) {
......@@ -55,34 +134,35 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
(*pIter)->backward = backward;
(*pIter)->verRange = *pRange;
(*pIter)->timeWindow = *pTimeWindow;
(*pIter)->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
if ((*pIter)->aSttBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
code = tBlockDataCreate(&(*pIter)->bData[0]);
if (code) {
goto _exit;
}
(*pIter)->pBlockLoadInfo = pBlockLoadInfo;
code = tBlockDataCreate(&(*pIter)->bData[1]);
if (code) {
goto _exit;
if (pBlockLoadInfo->aSttBlk == NULL) {
// loaded into the common shared objects
pBlockLoadInfo->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
if (pBlockLoadInfo->aSttBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
code = tsdbReadSttBlk(pReader, iStt, pBlockLoadInfo->aSttBlk);
if (code) {
goto _exit;
}
}
code = tsdbReadSttBlk(pReader, iStt, (*pIter)->aSttBlk);
code = tsdbReadSttBlk(pReader, iStt, pBlockLoadInfo->aSttBlk);
if (code) {
goto _exit;
}
size_t size = taosArrayGetSize((*pIter)->aSttBlk);
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
// find the start block
int32_t index = -1;
if (!backward) { // asc
for (int32_t i = 0; i < size; ++i) {
SSttBlk *p = taosArrayGet((*pIter)->aSttBlk, i);
SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
if (p->suid != suid) {
continue;
}
......@@ -94,7 +174,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
}
} else { // desc
for (int32_t i = size - 1; i >= 0; --i) {
SSttBlk *p = taosArrayGet((*pIter)->aSttBlk, i);
SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
if (p->suid != suid) {
continue;
}
......@@ -108,7 +188,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
(*pIter)->iSttBlk = index;
if (index != -1) {
(*pIter)->pSttBlk = taosArrayGet((*pIter)->aSttBlk, (*pIter)->iSttBlk);
(*pIter)->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, (*pIter)->iSttBlk);
}
_exit:
......@@ -116,9 +196,6 @@ _exit:
}
void tLDataIterClose(SLDataIter *pIter) {
tBlockDataDestroy(&pIter->bData[0], 1);
tBlockDataDestroy(&pIter->bData[1], 1);
taosArrayDestroy(pIter->aSttBlk);
taosMemoryFree(pIter);
}
......@@ -127,9 +204,9 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
pIter->iSttBlk += step;
int32_t index = -1;
size_t size = taosArrayGetSize(pIter->aSttBlk);
size_t size = taosArrayGetSize(pIter->pBlockLoadInfo->aSttBlk);
for (int32_t i = pIter->iSttBlk; i < size && i >= 0; i += step) {
SSttBlk *p = taosArrayGet(pIter->aSttBlk, i);
SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, i);
if ((!pIter->backward) && p->minUid > pIter->uid) {
break;
}
......@@ -169,7 +246,7 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
if (index == -1) {
pIter->pSttBlk = NULL;
} else {
pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);
pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
}
}
......@@ -178,7 +255,8 @@ static void findNextValidRow(SLDataIter *pIter) {
bool hasVal = false;
int32_t i = pIter->iRow;
SBlockData *pBlockData = getCurrentBlock(pIter);
SBlockData *pBlockData = loadBlockIfMissing(pIter);
for (; i < pBlockData->nRow && i >= 0; i += step) {
if (pBlockData->aUid != NULL) {
......@@ -238,18 +316,8 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
return false;
}
int32_t iBlockL = pIter->iSttBlk;
SBlockData *pBlockData = getCurrentBlock(pIter);
if (pBlockData->nRow == 0 && pIter->pSttBlk != NULL) { // current block not loaded yet
pBlockData = getNextBlock(pIter);
code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlockData);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
pIter->iRow = (pIter->backward) ? pBlockData->nRow : -1;
}
int32_t iBlockL = pIter->iSttBlk;
SBlockData *pBlockData = loadBlockIfMissing(pIter);
pIter->iRow += step;
......@@ -266,12 +334,7 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
}
if (iBlockL != pIter->iSttBlk) {
pBlockData = getNextBlock(pIter);
code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlockData);
if (code) {
goto _exit;
}
pIter->iRow = pIter->backward ? (pBlockData->nRow - 1) : 0;
pBlockData = loadBlockIfMissing(pIter);
}
}
......@@ -313,7 +376,7 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) {
}
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange) {
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pBlockLoadInfo) {
pMTree->backward = backward;
pMTree->pIter = NULL;
pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
......@@ -322,21 +385,33 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
}
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
int32_t code = TSDB_CODE_SUCCESS;
SSttBlockLoadInfo* pLoadInfo = NULL;
if (pBlockLoadInfo == NULL) {
if (pMTree->pLoadInfo == NULL) {
pMTree->destroyLoadInfo = true;
pMTree->pLoadInfo = tCreateLastBlockLoadInfo();
}
pLoadInfo = pMTree->pLoadInfo;
} else {
pLoadInfo = pBlockLoadInfo;
}
struct SLDataIter *pIterList[TSDB_DEFAULT_STT_FILE] = {0};
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
code = tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange);
struct SLDataIter* pIter = NULL;
code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, &pLoadInfo[i]);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
bool hasVal = tLDataIterNextRow(pIterList[i]);
bool hasVal = tLDataIterNextRow(pIter);
if (hasVal) {
taosArrayPush(pMTree->pIterList, &pIterList[i]);
tMergeTreeAddIter(pMTree, pIterList[i]);
taosArrayPush(pMTree->pIterList, &pIter);
tMergeTreeAddIter(pMTree, pIter);
} else {
tLDataIterClose(pIterList[i]);
tLDataIterClose(pIter);
}
}
......@@ -393,4 +468,9 @@ void tMergeTreeClose(SMergeTree *pMTree) {
pMTree->pIterList = taosArrayDestroy(pMTree->pIterList);
pMTree->pIter = NULL;
if (pMTree->destroyLoadInfo) {
pMTree->pLoadInfo = destroyLastBlockLoadInfo(pMTree->pLoadInfo);
pMTree->destroyLoadInfo = false;
}
}
......@@ -17,8 +17,6 @@
#include "tsdb.h"
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define ALL_ROWS_CHECKED_INDEX (INT16_MIN)
#define INITIAL_ROW_INDEX_VAL (-1)
typedef enum {
EXTERNAL_ROWS_PREV = 0x1,
......@@ -88,6 +86,7 @@ typedef struct SLastBlockReader {
int32_t order;
uint64_t uid;
SMergeTree mergeTree;
SSttBlockLoadInfo* pInfo;
} SLastBlockReader;
typedef struct SFilesetIter {
......@@ -226,13 +225,12 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
return NULL;
}
int32_t step = ASCENDING_TRAVERSE(pTsdbReader->order)? 1:-1;
for (int32_t j = 0; j < numOfTables; ++j) {
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
info.lastKey = pTsdbReader->window.skey - step;
info.lastKey = pTsdbReader->window.skey - 1;
} else {
info.lastKey = pTsdbReader->window.ekey - step;
info.lastKey = pTsdbReader->window.ekey + 1;
}
taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
......@@ -319,8 +317,7 @@ static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* cap
}
// init file iterator
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet,
STsdbReader* pReader /*int32_t order, const char* idstr*/) {
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
size_t numOfFileset = taosArrayGetSize(aDFileSet);
pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
......@@ -345,6 +342,12 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet,
pLReader->uid = 0;
tMergeTreeClose(&pLReader->mergeTree);
pLReader->pInfo = tCreateLastBlockLoadInfo();
if (pLReader->pInfo == NULL) {
tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
return terrno;
}
tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
return TSDB_CODE_SUCCESS;
}
......@@ -360,6 +363,7 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
pIter->pLastBlockReader->uid = 0;
tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
resetLastBlockLoadInfo(pIter->pLastBlockReader->pInfo);
// check file the time range of coverage
STimeWindow win = {0};
......@@ -1377,7 +1381,6 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
bool mergeBlockData) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
// SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
STSRow* pTSRow = NULL;
......@@ -1866,36 +1869,35 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
}
}
static bool initLastBlockReader(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
STsdbReader* pReader) {
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
// the last block reader has been initialized for this table.
if (pLastBlockReader->uid == pBlockScanInfo->uid) {
if (pLBlockReader->uid == pScanInfo->uid) {
return true;
}
if (pLastBlockReader->uid != 0) {
tMergeTreeClose(&pLastBlockReader->mergeTree);
if (pLBlockReader->uid != 0) {
tMergeTreeClose(&pLBlockReader->mergeTree);
}
initMemDataIterator(pBlockScanInfo, pReader);
pLastBlockReader->uid = pBlockScanInfo->uid;
initMemDataIterator(pScanInfo, pReader);
pLBlockReader->uid = pScanInfo->uid;
int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order)? 1:-1;
STimeWindow w = pLastBlockReader->window;
if (ASCENDING_TRAVERSE(pLastBlockReader->order)) {
w.skey = pBlockScanInfo->lastKey + step;
int32_t step = ASCENDING_TRAVERSE(pLBlockReader->order)? 1:-1;
STimeWindow w = pLBlockReader->window;
if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
w.skey = pScanInfo->lastKey + step;
} else {
w.ekey = pBlockScanInfo->lastKey + step;
w.ekey = pScanInfo->lastKey + step;
}
int32_t code =
tMergeTreeOpen(&pLastBlockReader->mergeTree, (pLastBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader,
pReader->suid, pBlockScanInfo->uid, &w, &pLastBlockReader->verRange);
tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader,
pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
return nextRowFromLastBlocks(pLastBlockReader, pBlockScanInfo);
return nextRowFromLastBlocks(pLBlockReader, pScanInfo);
}
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
......@@ -3305,6 +3307,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
SFilesetIter* pFilesetIter = &pReader->status.fileIter;
if (pFilesetIter->pLastBlockReader != NULL) {
tMergeTreeClose(&pFilesetIter->pLastBlockReader->mergeTree);
pFilesetIter->pLastBlockReader->pInfo = destroyLastBlockLoadInfo(pFilesetIter->pLastBlockReader->pInfo);
taosMemoryFree(pFilesetIter->pLastBlockReader);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册