提交 0bef64a3 编写于 作者: A Alex Duan

offset skip block is ok except think about mem delay data

上级 347c9cee
...@@ -172,6 +172,7 @@ typedef void *TsdbQueryHandleT; // Use void to hide implementation details ...@@ -172,6 +172,7 @@ typedef void *TsdbQueryHandleT; // Use void to hide implementation details
typedef struct STsdbQueryCond { typedef struct STsdbQueryCond {
STimeWindow twindow; STimeWindow twindow;
int32_t order; // desc|asc order to iterate the data block int32_t order; // desc|asc order to iterate the data block
int64_t offset; // skip offset put down to tsdb
int32_t numOfCols; int32_t numOfCols;
SColumnInfo *colList; SColumnInfo *colList;
bool loadExternalRows; // load external rows or not bool loadExternalRows; // load external rows or not
...@@ -391,6 +392,9 @@ void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCon ...@@ -391,6 +392,9 @@ void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCon
int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist* pTableBlockInfo); int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist* pTableBlockInfo);
// obtain queryHandle attribute
int64_t tsdbSkipOffset(TsdbQueryHandleT queryHandle);
/** /**
* get the statistics of repo usage * get the statistics of repo usage
* @param repo. point to the tsdbrepo * @param repo. point to the tsdbrepo
......
...@@ -228,6 +228,7 @@ typedef struct SQueryAttr { ...@@ -228,6 +228,7 @@ typedef struct SQueryAttr {
bool createFilterOperator; // if filter operator is needed bool createFilterOperator; // if filter operator is needed
bool multigroupResult; // multigroup result can exist in one SSDataBlock bool multigroupResult; // multigroup result can exist in one SSDataBlock
bool needSort; // need sort rowRes bool needSort; // need sort rowRes
bool skipOffset; // can skip offset if true
int32_t interBufSize; // intermediate buffer sizse int32_t interBufSize; // intermediate buffer sizse
int32_t havingNum; // having expr number int32_t havingNum; // having expr number
......
...@@ -4922,6 +4922,7 @@ static void doTableQueryInfoTimeWindowCheck(SQueryAttr* pQueryAttr, STableQueryI ...@@ -4922,6 +4922,7 @@ static void doTableQueryInfoTimeWindowCheck(SQueryAttr* pQueryAttr, STableQueryI
} }
} }
STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) { STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) {
STsdbQueryCond cond = { STsdbQueryCond cond = {
.colList = pQueryAttr->tableCols, .colList = pQueryAttr->tableCols,
...@@ -4931,6 +4932,12 @@ STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) { ...@@ -4931,6 +4932,12 @@ STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) {
.loadExternalRows = false, .loadExternalRows = false,
}; };
// set offset with
if(pQueryAttr->skipOffset) {
cond.offset = pQueryAttr->limit.offset;
}
TIME_WINDOW_COPY(cond.twindow, *win); TIME_WINDOW_COPY(cond.twindow, *win);
return cond; return cond;
} }
...@@ -5851,19 +5858,38 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) { ...@@ -5851,19 +5858,38 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
return NULL; return NULL;
} }
bool move = false;
int32_t skip = 0;
int32_t remain = 0;
int64_t srows = tsdbSkipOffset(pRuntimeEnv->pQueryHandle);
if (pRuntimeEnv->currentOffset == 0) { if (pRuntimeEnv->currentOffset == 0) {
break; break;
}
else if(srows > 0) {
if(pRuntimeEnv->currentOffset - srows >= pBlock->info.rows) {
pRuntimeEnv->currentOffset -= pBlock->info.rows;
} else {
move = true;
skip = (int32_t)(pRuntimeEnv->currentOffset - srows);
remain = (int32_t)(pBlock->info.rows - skip);
}
} else if (pRuntimeEnv->currentOffset >= pBlock->info.rows) { } else if (pRuntimeEnv->currentOffset >= pBlock->info.rows) {
pRuntimeEnv->currentOffset -= pBlock->info.rows; pRuntimeEnv->currentOffset -= pBlock->info.rows;
} else { } else {
int32_t remain = (int32_t)(pBlock->info.rows - pRuntimeEnv->currentOffset); move = true;
skip = (int32_t)pRuntimeEnv->currentOffset;
remain = (int32_t)(pBlock->info.rows - pRuntimeEnv->currentOffset);
}
// need move
if(move) {
pBlock->info.rows = remain; pBlock->info.rows = remain;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColInfoData->info.bytes; int16_t bytes = pColInfoData->info.bytes;
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pRuntimeEnv->currentOffset, remain * bytes); memmove(pColInfoData->pData, pColInfoData->pData + skip * bytes, remain * bytes);
} }
pRuntimeEnv->currentOffset = 0; pRuntimeEnv->currentOffset = 0;
...@@ -8470,6 +8496,18 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S ...@@ -8470,6 +8496,18 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
goto _cleanup; goto _cleanup;
} }
// calc skipOffset
if(pQueryMsg->offset > 0 && TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_PROJECTION_QUERY)) {
pQueryAttr->skipOffset = true;
for (int32_t i = 0; i < pQueryAttr->numOfCols; ++i) {
if (pQueryAttr->tableCols[i].flist.numOfFilters > 0
&& pQueryAttr->tableCols[i].colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
pQueryAttr->skipOffset = false;
break;
}
}
}
if (pSecExprs != NULL) { if (pSecExprs != NULL) {
int32_t resultRowSize = 0; int32_t resultRowSize = 0;
......
...@@ -37,6 +37,9 @@ ...@@ -37,6 +37,9 @@
.tid = (_checkInfo)->tableId.tid, \ .tid = (_checkInfo)->tableId.tid, \
.uid = (_checkInfo)->tableId.uid}) .uid = (_checkInfo)->tableId.uid})
// limit offset start optimization for rows read over this value
#define OFFSET_SKIP_THRESHOLD 5000
enum { enum {
TSDB_QUERY_TYPE_ALL = 1, TSDB_QUERY_TYPE_ALL = 1,
TSDB_QUERY_TYPE_LAST = 2, TSDB_QUERY_TYPE_LAST = 2,
...@@ -115,6 +118,9 @@ typedef struct STsdbQueryHandle { ...@@ -115,6 +118,9 @@ typedef struct STsdbQueryHandle {
STsdbRepo* pTsdb; STsdbRepo* pTsdb;
SQueryFilePos cur; // current position SQueryFilePos cur; // current position
int16_t order; int16_t order;
int64_t offset; // limit offset
int64_t srows; // skip offset rows
int64_t frows; // forbid offset rows
STimeWindow window; // the primary query time window that applies to all queries STimeWindow window; // the primary query time window that applies to all queries
SDataStatis* statis; // query level statistics, only one table block statistics info exists at any time SDataStatis* statis; // query level statistics, only one table block statistics info exists at any time
int32_t numOfBlocks; int32_t numOfBlocks;
...@@ -409,6 +415,9 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC ...@@ -409,6 +415,9 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
} }
pQueryHandle->order = pCond->order; pQueryHandle->order = pCond->order;
pQueryHandle->offset = pCond->offset;
pQueryHandle->srows = 0;
pQueryHandle->frows = 0;
pQueryHandle->pTsdb = tsdb; pQueryHandle->pTsdb = tsdb;
pQueryHandle->type = TSDB_QUERY_TYPE_ALL; pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pQueryHandle->cur.fid = INT32_MIN; pQueryHandle->cur.fid = INT32_MIN;
...@@ -525,6 +534,9 @@ void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond) { ...@@ -525,6 +534,9 @@ void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond) {
} }
pQueryHandle->order = pCond->order; pQueryHandle->order = pCond->order;
pQueryHandle->offset = pCond->offset;
pQueryHandle->srows = 0;
pQueryHandle->frows = 0;
pQueryHandle->window = pCond->twindow; pQueryHandle->window = pCond->twindow;
pQueryHandle->type = TSDB_QUERY_TYPE_ALL; pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pQueryHandle->cur.fid = -1; pQueryHandle->cur.fid = -1;
...@@ -1033,6 +1045,51 @@ static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY s ...@@ -1033,6 +1045,51 @@ static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY s
return midSlot; return midSlot;
} }
// skip blocks . return value is skip blocks number, skip rows reduce from *pOffset
static int32_t offsetSkipBlock(STsdbQueryHandle* q, SBlockInfo* pBlockInfo, uint32_t numBlocks, int64_t s, int64_t e) {
int32_t num = 0;
SBlock* blocks = pBlockInfo->blocks;
// asc
if(ASCENDING_TRAVERSE(q->order)) {
for(int32_t i = 0; i < numBlocks; i++) {
SBlock* pBlock = &blocks[i];
if(i == 0 && pBlock->keyFirst != s) {
//first block only keyFirst == s can skip
q->frows += pBlock->numOfRows;
continue;
} else {
// skip to read
if(q->srows + q->frows + pBlock->numOfRows <= q->offset) {
q->srows += pBlock->numOfRows;
num ++;
} else {
break;
}
}
}
} else { // des
for(int32_t i = numBlocks - 1; i >= 0; i--) {
SBlock* pBlock = &blocks[i];
if(i == numBlocks - 1 && pBlock->keyLast != e) {
//first block only keyFirst == s can skipƒ
q->frows += pBlock->numOfRows;
continue;
} else {
// skip to read
if(q->srows + q->frows + pBlock->numOfRows <= q->offset) {
q->srows += pBlock->numOfRows;
num ++;
} else {
break;
}
}
}
}
return num;
}
static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t index, int32_t* numOfBlocks) { static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t index, int32_t* numOfBlocks) {
int32_t code = 0; int32_t code = 0;
...@@ -1083,21 +1140,38 @@ static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t index, int ...@@ -1083,21 +1140,38 @@ static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t index, int
// discard the unqualified data block based on the query time window // discard the unqualified data block based on the query time window
int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC); int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
int32_t end = start;
if (s > pCompInfo->blocks[start].keyLast) { if (s > pCompInfo->blocks[start].keyLast) {
return 0; return 0;
} }
int32_t end = start;
// todo speedup the procedure of located end block // todo speedup the procedure of located end block
while (end < (int32_t)compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) { while (end < (int32_t)compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) {
end += 1; end += 1;
} }
pCheckInfo->numOfBlocks = (end - start); // calc offset can skip blocks number
int32_t nSkip = 0;
if(pQueryHandle->offset > 0) {
nSkip = offsetSkipBlock(pQueryHandle, pCompInfo, compIndex->numOfBlocks, s, e);
}
if (start > 0) { pCheckInfo->numOfBlocks = (end - start) - nSkip;
memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock)); if(pCheckInfo->numOfBlocks > 0) {
if(nSkip > 0) {
if(ASCENDING_TRAVERSE(pQueryHandle->order)) {
if(start > 0)
memmove(pCompInfo->blocks, &pCompInfo->blocks[start], 1 * sizeof(SBlock));
if(pCheckInfo->numOfBlocks > 1)
memmove(pCompInfo->blocks + 1, &pCompInfo->blocks[start + nSkip + 1], (pCheckInfo->numOfBlocks - 1) * sizeof(SBlock));
} else {
if(start > 0 && pCheckInfo->numOfBlocks > 1)
memmove(pCompInfo->blocks, &pCompInfo->blocks[start], (pCheckInfo->numOfBlocks - 1) * sizeof(SBlock));
memmove(pCompInfo->blocks + (pCheckInfo->numOfBlocks - 1), &pCompInfo->blocks[end - 1], 1 * sizeof(SBlock));
}
} else if(start > 0) {
memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock));
}
} }
(*numOfBlocks) += pCheckInfo->numOfBlocks; (*numOfBlocks) += pCheckInfo->numOfBlocks;
...@@ -4208,3 +4282,12 @@ void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *re ...@@ -4208,3 +4282,12 @@ void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *re
//apply the hierarchical filter expression to every node in skiplist to find the qualified nodes //apply the hierarchical filter expression to every node in skiplist to find the qualified nodes
applyFilterToSkipListNode(pSkipList, pExpr, result, param); applyFilterToSkipListNode(pSkipList, pExpr, result, param);
} }
// obtain queryHandle attribute
int64_t tsdbSkipOffset(TsdbQueryHandleT queryHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*)queryHandle;
if (pQueryHandle) {
return pQueryHandle->srows;
}
return 0;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册