提交 5663550d 编写于 作者: H Haojun Liao

enh(query): opt query perf by allocate the SLDataIter when opening reader.

上级 7e943260
......@@ -162,7 +162,6 @@ int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType);
#endif
// tsdb
// typedef struct STsdb STsdb;
typedef struct STsdbReader STsdbReader;
#define TSDB_DEFAULT_STT_FILE 8
......
......@@ -705,7 +705,6 @@ typedef struct SSttBlockLoadInfo {
typedef struct SMergeTree {
int8_t backward;
SRBTree rbt;
SArray *pIterList;
SLDataIter *pIter;
bool destroyLoadInfo;
SSttBlockLoadInfo *pLoadInfo;
......@@ -751,9 +750,25 @@ struct SDiskDataBuilder {
SBlkInfo bi;
};
typedef struct SLDataIter {
SRBTreeNode node;
SSttBlk *pSttBlk;
SDataFReader *pReader;
int32_t iStt;
int8_t backward;
int32_t iSttBlk;
int32_t iRow;
SRowInfo rInfo;
uint64_t uid;
STimeWindow timeWindow;
SVersionRange verRange;
SSttBlockLoadInfo *pBlockLoadInfo;
bool ignoreEarlierTs;
} SLDataIter;
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
bool destroyLoadInfo, const char *idStr, bool strictTimeRange);
bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter* pLDataIter);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree);
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
......@@ -782,6 +797,7 @@ typedef struct SCacheRowsReader {
STableKeyInfo *pTableList; // table id list
int32_t numOfTables;
SSttBlockLoadInfo *pLoadInfo;
SLDataIter *pDataIter;
STsdbReadSnap *pReadSnap;
SDataFReader *pDataFReader;
SDataFReader *pDataFReaderLast;
......
......@@ -598,6 +598,7 @@ typedef struct {
SMergeTree mergeTree;
SMergeTree *pMergeTree;
SSttBlockLoadInfo *pLoadInfo;
SLDataIter* pDataIter;
int64_t lastTs;
} SFSLastNextRowIter;
......@@ -645,7 +646,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
}
tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid,
&(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX},
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true);
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true, state->pDataIter);
state->pMergeTree = &state->mergeTree;
state->state = SFSLASTNEXTROW_BLOCKROW;
}
......@@ -1211,7 +1212,7 @@ typedef struct {
} CacheNextRowIter;
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
SSttBlockLoadInfo *pLoadInfo, STsdbReadSnap *pReadSnap, SDataFReader **pDataFReader,
SSttBlockLoadInfo *pLoadInfo, SLDataIter* pLDataIter, STsdbReadSnap *pReadSnap, SDataFReader **pDataFReader,
SDataFReader **pDataFReaderLast, int64_t lastTs) {
int code = 0;
......@@ -1274,6 +1275,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
pIter->fsLastState.pLoadInfo = pLoadInfo;
pIter->fsLastState.pDataFReader = pDataFReaderLast;
pIter->fsLastState.lastTs = lastTs;
pIter->fsLastState.pDataIter = pLDataIter;
pIter->fsState.state = SFSNEXTROW_FS;
pIter->fsState.pTsdb = pTsdb;
......@@ -1465,7 +1467,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
TSKEY lastRowTs = TSKEY_MAX;
CacheNextRowIter iter = {0};
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader,
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pDataIter, pr->pReadSnap, &pr->pDataFReader,
&pr->pDataFReaderLast, pr->lastTs);
do {
......@@ -1622,7 +1624,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
TSKEY lastRowTs = TSKEY_MAX;
CacheNextRowIter iter = {0};
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader,
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pDataIter, pr->pReadSnap, &pr->pDataFReader,
&pr->pDataFReaderLast, pr->lastTs);
do {
......
......@@ -187,13 +187,21 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
}
}
int32_t numOfStt = ((SVnode*)pVnode)->config.sttTrigger;
SVnodeCfg* pCfg = &((SVnode*)pVnode)->config;
int32_t numOfStt = pCfg->sttTrigger;
p->pLoadInfo = tCreateLastBlockLoadInfo(p->pSchema, NULL, 0, numOfStt);
if (p->pLoadInfo == NULL) {
tsdbCacherowsReaderClose(p);
return TSDB_CODE_OUT_OF_MEMORY;
}
p->pDataIter = taosMemoryCalloc(pCfg->sttTrigger, sizeof(SLDataIter));
if (p->pDataIter == NULL) {
tsdbCacherowsReaderClose(p);
return TSDB_CODE_OUT_OF_MEMORY;
}
p->idstr = taosStrdup(idstr);
taosThreadMutexInit(&p->readerMutex, NULL);
......@@ -215,6 +223,7 @@ void* tsdbCacherowsReaderClose(void* pReader) {
taosMemoryFree(p->pSchema);
}
taosMemoryFreeClear(p->pDataIter);
taosMemoryFree(p->pCurrSchema);
destroyLastBlockLoadInfo(p->pLoadInfo);
......
......@@ -16,22 +16,6 @@
#include "tsdb.h"
// SLDataIter =================================================
struct SLDataIter {
SRBTreeNode node;
SSttBlk *pSttBlk;
SDataFReader *pReader;
int32_t iStt;
int8_t backward;
int32_t iSttBlk;
int32_t iRow;
SRowInfo rInfo;
uint64_t uid;
STimeWindow timeWindow;
SVersionRange verRange;
SSttBlockLoadInfo *pBlockLoadInfo;
bool ignoreEarlierTs;
};
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols,
int32_t numOfSttTrigger) {
SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(numOfSttTrigger, sizeof(SSttBlockLoadInfo));
......@@ -268,25 +252,19 @@ static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint
}
}
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
const char *idStr, bool strictTimeRange) {
int32_t code = TSDB_CODE_SUCCESS;
*pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
if (*pIter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
(*pIter)->uid = uid;
(*pIter)->pReader = pReader;
(*pIter)->iStt = iStt;
(*pIter)->backward = backward;
(*pIter)->verRange = *pRange;
(*pIter)->timeWindow = *pTimeWindow;
pIter->uid = uid;
pIter->pReader = pReader;
pIter->iStt = iStt;
pIter->backward = backward;
pIter->verRange = *pRange;
pIter->timeWindow = *pTimeWindow;
(*pIter)->pBlockLoadInfo = pBlockLoadInfo;
pIter->pBlockLoadInfo = pBlockLoadInfo;
if (!pBlockLoadInfo->sttBlockLoaded) {
int64_t st = taosGetTimestampUs();
......@@ -294,7 +272,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
code = tsdbReadSttBlk(pReader, iStt, pBlockLoadInfo->aSttBlk);
if (code) {
goto _exit;
return code;
}
// only apply to the child tables, ordinary tables will not incur this filter procedure.
......@@ -310,7 +288,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
// no qualified stt block existed
taosArrayClear(pBlockLoadInfo->aSttBlk);
(*pIter)->iSttBlk = -1;
pIter->iSttBlk = -1;
double el = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr);
return code;
......@@ -343,31 +321,27 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
// find the start block
(*pIter)->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward);
if ((*pIter)->iSttBlk != -1) {
(*pIter)->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, (*pIter)->iSttBlk);
(*pIter)->iRow = ((*pIter)->backward) ? (*pIter)->pSttBlk->nRow : -1;
pIter->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward);
if (pIter->iSttBlk != -1) {
pIter->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
pIter->iRow = (pIter->backward) ? pIter->pSttBlk->nRow : -1;
if ((!backward) && ((strictTimeRange && (*pIter)->pSttBlk->minKey >= (*pIter)->timeWindow.ekey) ||
(!strictTimeRange && (*pIter)->pSttBlk->minKey > (*pIter)->timeWindow.ekey))) {
(*pIter)->pSttBlk = NULL;
if ((!backward) && ((strictTimeRange && pIter->pSttBlk->minKey >= pIter->timeWindow.ekey) ||
(!strictTimeRange && pIter->pSttBlk->minKey > pIter->timeWindow.ekey))) {
pIter->pSttBlk = NULL;
}
if (backward && ((strictTimeRange && (*pIter)->pSttBlk->maxKey <= (*pIter)->timeWindow.skey) ||
(!strictTimeRange && (*pIter)->pSttBlk->maxKey < (*pIter)->timeWindow.skey))) {
(*pIter)->pSttBlk = NULL;
(*pIter)->ignoreEarlierTs = true;
if (backward && ((strictTimeRange && pIter->pSttBlk->maxKey <= pIter->timeWindow.skey) ||
(!strictTimeRange && pIter->pSttBlk->maxKey < pIter->timeWindow.skey))) {
pIter->pSttBlk = NULL;
pIter->ignoreEarlierTs = true;
}
}
return code;
_exit:
taosMemoryFree(*pIter);
return code;
}
void tLDataIterClose(SLDataIter *pIter) { taosMemoryFree(pIter); }
void tLDataIterClose(SLDataIter *pIter) { /*taosMemoryFree(pIter); */}
void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
int32_t step = pIter->backward ? -1 : 1;
......@@ -594,43 +568,38 @@ static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SR
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
bool destroyLoadInfo, const char *idStr, bool strictTimeRange) {
bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter* pLDataIter) {
int32_t code = TSDB_CODE_SUCCESS;
pMTree->backward = backward;
pMTree->pIter = NULL;
pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
if (pMTree->pIterList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pMTree->idStr = idStr;
if (!pMTree->backward) { // asc
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
} else { // desc
tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
}
int32_t code = TSDB_CODE_SUCCESS;
pMTree->pLoadInfo = pBlockLoadInfo;
pMTree->destroyLoadInfo = destroyLoadInfo;
pMTree->ignoreEarlierTs = false;
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
struct SLDataIter *pIter = NULL;
code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
memset(&pLDataIter[i], 0, sizeof(SLDataIter));
code = tLDataIterOpen(&pLDataIter[i], pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
&pMTree->pLoadInfo[i], pMTree->idStr, strictTimeRange);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr);
bool hasVal = tLDataIterNextRow(&pLDataIter[i], pMTree->idStr);
if (hasVal) {
taosArrayPush(pMTree->pIterList, &pIter);
tMergeTreeAddIter(pMTree, pIter);
tMergeTreeAddIter(pMTree, &pLDataIter[i]);
} else {
if (!pMTree->ignoreEarlierTs) {
pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs;
pMTree->ignoreEarlierTs = pLDataIter[i].ignoreEarlierTs;
}
tLDataIterClose(pIter);
}
}
......@@ -681,15 +650,7 @@ bool tMergeTreeNext(SMergeTree *pMTree) {
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree) { return pMTree->pIter->rInfo.row; }
void tMergeTreeClose(SMergeTree *pMTree) {
size_t size = taosArrayGetSize(pMTree->pIterList);
for (int32_t i = 0; i < size; ++i) {
SLDataIter *pIter = taosArrayGetP(pMTree->pIterList, i);
tLDataIterClose(pIter);
}
pMTree->pIterList = taosArrayDestroy(pMTree->pIterList);
pMTree->pIter = NULL;
if (pMTree->destroyLoadInfo) {
pMTree->pLoadInfo = destroyLastBlockLoadInfo(pMTree->pLoadInfo);
pMTree->destroyLoadInfo = false;
......
......@@ -156,6 +156,7 @@ typedef struct SReaderStatus {
SBlockData fileBlockData;
SFilesetIter fileIter;
SDataBlockIter blockIter;
SLDataIter* pLDataIter;
} SReaderStatus;
typedef struct SBlockInfoBuf {
......@@ -185,7 +186,6 @@ struct STsdbReader {
STsdbReadSnap* pReadSnap;
SIOCostSummary cost;
STSchema* pSchema; // the newest version schema
// STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times
SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema
SDataFReader* pFileReader; // the file reader
SDelFReader* pDelFReader; // the del file reader
......@@ -741,6 +741,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
pReader->type = pCond->type;
pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
pReader->status.pLDataIter = taosMemoryCalloc(pVnode->config.sttTrigger, sizeof(SLDataIter));
if (pReader->pResBlock == NULL) {
pReader->freeBlock = true;
......@@ -2547,7 +2548,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
pScanInfo->uid, pReader->idStr);
int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC),
pReader->pFileReader, pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange,
pLBlockReader->pInfo, false, pReader->idStr, false);
pLBlockReader->pInfo, false, pReader->idStr, false, pReader->status.pLDataIter);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册