From 0bef64a34c14bc57ca01edb0c156a5d49a934a42 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Thu, 28 Oct 2021 10:04:56 +0800 Subject: [PATCH] offset skip block is ok except think about mem delay data --- src/inc/tsdb.h | 4 ++ src/query/inc/qExecutor.h | 1 + src/query/src/qExecutor.c | 44 ++++++++++++++++-- src/tsdb/src/tsdbRead.c | 93 ++++++++++++++++++++++++++++++++++++--- 4 files changed, 134 insertions(+), 8 deletions(-) diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 7abe3e99c7..6b68b467a3 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -172,6 +172,7 @@ typedef void *TsdbQueryHandleT; // Use void to hide implementation details typedef struct STsdbQueryCond { STimeWindow twindow; int32_t order; // desc|asc order to iterate the data block + int64_t offset; // limit offset pushed down to tsdb int32_t numOfCols; SColumnInfo *colList; bool loadExternalRows; // load external rows or not @@ -391,6 +392,9 @@ void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCon int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist* pTableBlockInfo); +// return the number of rows the query handle has already skipped at block level for the limit offset +int64_t tsdbSkipOffset(TsdbQueryHandleT queryHandle); + /** * get the statistics of repo usage * @param repo. 
point to the tsdbrepo diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index acd70b6688..1f3b6e8465 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -228,6 +228,7 @@ typedef struct SQueryAttr { bool createFilterOperator; // if filter operator is needed bool multigroupResult; // multigroup result can exist in one SSDataBlock bool needSort; // need sort rowRes + bool skipOffset; // can skip offset if true int32_t interBufSize; // intermediate buffer sizse int32_t havingNum; // having expr number diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 1d90fcf1ac..101cc3197d 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4922,6 +4922,7 @@ static void doTableQueryInfoTimeWindowCheck(SQueryAttr* pQueryAttr, STableQueryI } } + STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) { STsdbQueryCond cond = { .colList = pQueryAttr->tableCols, @@ -4931,6 +4932,12 @@ STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) { .loadExternalRows = false, }; + // push the limit offset down to tsdb only when offset skipping is allowed + if(pQueryAttr->skipOffset) { + cond.offset = pQueryAttr->limit.offset; + } + + TIME_WINDOW_COPY(cond.twindow, *win); return cond; } @@ -5851,19 +5858,38 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) { return NULL; } + bool move = false; + int32_t skip = 0; + int32_t remain = 0; + int64_t srows = tsdbSkipOffset(pRuntimeEnv->pQueryHandle); + if (pRuntimeEnv->currentOffset == 0) { break; + } + else if(srows > 0) { + if(pRuntimeEnv->currentOffset - srows >= pBlock->info.rows) { + pRuntimeEnv->currentOffset -= pBlock->info.rows; + } else { + move = true; + skip = (int32_t)(pRuntimeEnv->currentOffset - srows); + remain = (int32_t)(pBlock->info.rows - skip); + } } else if (pRuntimeEnv->currentOffset >= pBlock->info.rows) { pRuntimeEnv->currentOffset -= pBlock->info.rows; } else { - int32_t remain = (int32_t)(pBlock->info.rows - pRuntimeEnv->currentOffset); + move = 
true; + skip = (int32_t)pRuntimeEnv->currentOffset; + remain = (int32_t)(pBlock->info.rows - pRuntimeEnv->currentOffset); + } + + // need move + if(move) { pBlock->info.rows = remain; - for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); int16_t bytes = pColInfoData->info.bytes; - memmove(pColInfoData->pData, pColInfoData->pData + bytes * pRuntimeEnv->currentOffset, remain * bytes); + memmove(pColInfoData->pData, pColInfoData->pData + skip * bytes, remain * bytes); } pRuntimeEnv->currentOffset = 0; @@ -8470,6 +8496,18 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S goto _cleanup; } + // calc skipOffset + if(pQueryMsg->offset > 0 && TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_PROJECTION_QUERY)) { + pQueryAttr->skipOffset = true; + for (int32_t i = 0; i < pQueryAttr->numOfCols; ++i) { + if (pQueryAttr->tableCols[i].flist.numOfFilters > 0 + && pQueryAttr->tableCols[i].colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) { + pQueryAttr->skipOffset = false; + break; + } + } + } + if (pSecExprs != NULL) { int32_t resultRowSize = 0; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 705ede0cf1..e6cf07ce7f 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -37,6 +37,9 @@ .tid = (_checkInfo)->tableId.tid, \ .uid = (_checkInfo)->tableId.uid}) +// limit offset start optimization for rows read over this value +#define OFFSET_SKIP_THRESHOLD 5000 + enum { TSDB_QUERY_TYPE_ALL = 1, TSDB_QUERY_TYPE_LAST = 2, @@ -115,6 +118,9 @@ typedef struct STsdbQueryHandle { STsdbRepo* pTsdb; SQueryFilePos cur; // current position int16_t order; + int64_t offset; // limit offset + int64_t srows; // skip offset rows + int64_t frows; // forbid offset rows STimeWindow window; // the primary query time window that applies to all queries SDataStatis* statis; // query level statistics, only one table block statistics info exists at any time int32_t 
numOfBlocks; @@ -409,6 +415,9 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC } pQueryHandle->order = pCond->order; + pQueryHandle->offset = pCond->offset; + pQueryHandle->srows = 0; + pQueryHandle->frows = 0; pQueryHandle->pTsdb = tsdb; pQueryHandle->type = TSDB_QUERY_TYPE_ALL; pQueryHandle->cur.fid = INT32_MIN; @@ -525,6 +534,9 @@ void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond) { } pQueryHandle->order = pCond->order; + pQueryHandle->offset = pCond->offset; + pQueryHandle->srows = 0; + pQueryHandle->frows = 0; pQueryHandle->window = pCond->twindow; pQueryHandle->type = TSDB_QUERY_TYPE_ALL; pQueryHandle->cur.fid = -1; @@ -1033,6 +1045,51 @@ static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY s return midSlot; } +// skip whole blocks covered by the offset. returns the number of skipped blocks; skipped rows are added to q->srows, rows of a boundary block that cannot be skipped are added to q->frows +static int32_t offsetSkipBlock(STsdbQueryHandle* q, SBlockInfo* pBlockInfo, uint32_t numBlocks, int64_t s, int64_t e) { + int32_t num = 0; + SBlock* blocks = pBlockInfo->blocks; + + // asc + if(ASCENDING_TRAVERSE(q->order)) { + for(int32_t i = 0; i < numBlocks; i++) { + SBlock* pBlock = &blocks[i]; + if(i == 0 && pBlock->keyFirst != s) { + //first block only keyFirst == s can skip + q->frows += pBlock->numOfRows; + continue; + } else { + // skip to read + if(q->srows + q->frows + pBlock->numOfRows <= q->offset) { + q->srows += pBlock->numOfRows; + num ++; + } else { + break; + } + } + } + } else { // des + for(int32_t i = numBlocks - 1; i >= 0; i--) { + SBlock* pBlock = &blocks[i]; + if(i == numBlocks - 1 && pBlock->keyLast != e) { + //last block can be skipped only if keyLast == e + q->frows += pBlock->numOfRows; + continue; + } else { + // skip to read + if(q->srows + q->frows + pBlock->numOfRows <= q->offset) { + q->srows += pBlock->numOfRows; + num ++; + } else { + break; + } + } + } + } + + return num; +} + static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t 
index, int32_t* numOfBlocks) { int32_t code = 0; @@ -1083,21 +1140,38 @@ static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t index, int // discard the unqualified data block based on the query time window int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC); - int32_t end = start; - if (s > pCompInfo->blocks[start].keyLast) { return 0; } + int32_t end = start; // todo speedup the procedure of located end block while (end < (int32_t)compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) { end += 1; } - pCheckInfo->numOfBlocks = (end - start); + // calc offset can skip blocks number + int32_t nSkip = 0; + if(pQueryHandle->offset > 0) { + nSkip = offsetSkipBlock(pQueryHandle, pCompInfo, compIndex->numOfBlocks, s, e); + } - if (start > 0) { - memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock)); + pCheckInfo->numOfBlocks = (end - start) - nSkip; + if(pCheckInfo->numOfBlocks > 0) { + if(nSkip > 0) { + if(ASCENDING_TRAVERSE(pQueryHandle->order)) { + if(start > 0) + memmove(pCompInfo->blocks, &pCompInfo->blocks[start], 1 * sizeof(SBlock)); + if(pCheckInfo->numOfBlocks > 1) + memmove(pCompInfo->blocks + 1, &pCompInfo->blocks[start + nSkip + 1], (pCheckInfo->numOfBlocks - 1) * sizeof(SBlock)); + } else { + if(start > 0 && pCheckInfo->numOfBlocks > 1) + memmove(pCompInfo->blocks, &pCompInfo->blocks[start], (pCheckInfo->numOfBlocks - 1) * sizeof(SBlock)); + memmove(pCompInfo->blocks + (pCheckInfo->numOfBlocks - 1), &pCompInfo->blocks[end - 1], 1 * sizeof(SBlock)); + } + } else if(start > 0) { + memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock)); + } } (*numOfBlocks) += pCheckInfo->numOfBlocks; @@ -4208,3 +4282,12 @@ void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *re //apply the hierarchical filter expression to every node in skiplist to find the qualified nodes 
applyFilterToSkipListNode(pSkipList, pExpr, result, param); } + +// return the rows already skipped by block-level offset skipping (srows); 0 for a NULL handle +int64_t tsdbSkipOffset(TsdbQueryHandleT queryHandle) { + STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*)queryHandle; + if (pQueryHandle) { + return pQueryHandle->srows; + } + return 0; +} \ No newline at end of file -- GitLab