提交 5b354c18 编写于 作者: A Alex Duan

[TS-445]<fix>(query):Limit offset optimition of performance

上级 e1e9b623
......@@ -173,6 +173,7 @@ typedef void *TsdbQueryHandleT; // Use void to hide implementation details
typedef struct STsdbQueryCond {
STimeWindow twindow;
int32_t order; // desc|asc order to iterate the data block
int64_t offset; // skip offset put down to tsdb
int32_t numOfCols;
SColumnInfo *colList;
bool loadExternalRows; // load external rows or not
......@@ -391,6 +392,9 @@ void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCon
int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist* pTableBlockInfo);
// obtain queryHandle attribute
int64_t tsdbSkipOffset(TsdbQueryHandleT queryHandle);
* get the statistics of repo usage
* @param repo. point to the tsdbrepo
......@@ -4902,6 +4902,11 @@ STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) {
.loadExternalRows = false,
// set offset with
if(pQueryAttr->skipOffset) {
cond.offset = pQueryAttr->limit.offset;
TIME_WINDOW_COPY(cond.twindow, *win);
return cond;
......@@ -5855,19 +5860,38 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
return NULL;
bool move = false;
int32_t skip = 0;
int32_t remain = 0;
int64_t srows = tsdbSkipOffset(pRuntimeEnv->pQueryHandle);
if (pRuntimeEnv->currentOffset == 0) {
else if(srows > 0) {
if(pRuntimeEnv->currentOffset - srows >= pBlock->info.rows) {
pRuntimeEnv->currentOffset -= pBlock->info.rows;
} else {
move = true;
skip = (int32_t)(pRuntimeEnv->currentOffset - srows);
remain = (int32_t)(pBlock->info.rows - skip);
} else if (pRuntimeEnv->currentOffset >= pBlock->info.rows) {
pRuntimeEnv->currentOffset -= pBlock->info.rows;
} else {
int32_t remain = (int32_t)(pBlock->info.rows - pRuntimeEnv->currentOffset);
move = true;
skip = (int32_t)pRuntimeEnv->currentOffset;
remain = (int32_t)(pBlock->info.rows - pRuntimeEnv->currentOffset);
// need move
if(move) {
pBlock->info.rows = remain;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColInfoData->info.bytes;
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pRuntimeEnv->currentOffset, remain * bytes);
memmove(pColInfoData->pData, pColInfoData->pData + skip * bytes, remain * bytes);
pRuntimeEnv->currentOffset = 0;
......@@ -39,6 +39,9 @@
.tid = (_checkInfo)->tableId.tid, \
.uid = (_checkInfo)->tableId.uid})
// limit offset start optimization for rows read over this value
enum {
......@@ -117,6 +120,9 @@ typedef struct STsdbQueryHandle {
STsdbRepo* pTsdb;
SQueryFilePos cur; // current position
int16_t order;
int64_t offset; // limit offset
int64_t srows; // skip offset rows
int64_t frows; // forbid skip offset rows
STimeWindow window; // the primary query time window that applies to all queries
SDataStatis* statis; // query level statistics, only one table block statistics info exists at any time
int32_t numOfBlocks;
......@@ -155,6 +161,11 @@ typedef struct STableGroupSupporter {
STSchema* pTagSchema;
} STableGroupSupporter;
typedef struct SRange {
int32_t from;
int32_t to;
} SRange;
static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList);
static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList);
static int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle);
......@@ -413,6 +424,9 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
pQueryHandle->order = pCond->order;
pQueryHandle->offset = pCond->offset;
pQueryHandle->srows = 0;
pQueryHandle->frows = 0;
pQueryHandle->pTsdb = tsdb;
pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pQueryHandle->cur.fid = INT32_MIN;
......@@ -529,6 +543,9 @@ void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond) {
pQueryHandle->order = pCond->order;
pQueryHandle->offset = pCond->offset;
pQueryHandle->srows = 0;
pQueryHandle->frows = 0;
pQueryHandle->window = pCond->twindow;
pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pQueryHandle->cur.fid = -1;
......@@ -1073,63 +1090,302 @@ static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY s
return midSlot;
static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t index, int32_t* numOfBlocks) {
int32_t code = 0;
// array :1 2 3 5 7 -2 (8 9) skip 4 and 6
int32_t memMoveByArray(SBlock *blocks, SArray *pArray) {
// pArray is NULL or size is zero , no need block to move
if(pArray == NULL)
return 0;
size_t count = taosArrayGetSize(pArray);
if(count == 0)
return 0;
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, index);
pCheckInfo->numOfBlocks = 0;
// memmove
int32_t num = 0;
SRange* ranges = (SRange*)TARRAY_GET_START(pArray);
for(size_t i = 0; i < count; i++) {
int32_t step = ranges[i].to - ranges[i].from + 1;
memmove(blocks + num, blocks + ranges[i].from, sizeof(SBlock) * step);
num += step;
if (tsdbSetReadTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj) != TSDB_CODE_SUCCESS) {
code = terrno;
return code;
return num;
// if block data in memory return false else true
bool blockNoItemInMem(STsdbQueryHandle* q, SBlock* pBlock) {
if(q->pMemRef == NULL) {
return false;
SBlockIdx* compIndex = pQueryHandle->rhelper.pBlkIdx;
// mem
if(q->pMemRef->snapshot.mem) {
SMemTable* mem = q->pMemRef->snapshot.mem;
if(timeIntersect(mem->keyFirst, mem->keyLast, pBlock->keyFirst, pBlock->keyLast))
return false;
// imem
if(q->pMemRef->snapshot.imem) {
SMemTable* imem = q->pMemRef->snapshot.imem;
if(timeIntersect(imem->keyFirst, imem->keyLast, pBlock->keyFirst, pBlock->keyLast))
return false;
// no data block in this file, try next file
if (compIndex == NULL || compIndex->uid != pCheckInfo->tableId.uid) {
return 0; // no data blocks in the file belongs to pCheckInfo->pTable
return true;
#define MAYBE_IN_MEMORY_ROWS 4000 // approximately the capacity of one block
// skip blocks . return value is skip blocks number, skip rows reduce from *pOffset
static int32_t offsetSkipBlock(STsdbQueryHandle* q, SBlockInfo* pBlockInfo, int64_t skey, int64_t ekey,
int32_t sblock, int32_t eblock, SArray** ppArray, bool order) {
int32_t num = 0;
SBlock* blocks = pBlockInfo->blocks;
SArray* pArray = NULL;
SRange range;
range.from = -1;
// ASC
if(order) {
for(int32_t i = sblock; i < eblock; i++) {
bool skip = false;
SBlock* pBlock = &blocks[i];
if(i == sblock && skey > pBlock->keyFirst) {
q->frows += pBlock->numOfRows; // some rows time < s
} else {
// check can skip
if(q->srows + q->frows + pBlock->numOfRows + MAYBE_IN_MEMORY_ROWS < q->offset) { // approximately calculate
if(blockNoItemInMem(q, pBlock)) {
// can skip
q->srows += pBlock->numOfRows;
skip = true;
} else {
q->frows += pBlock->numOfRows; // maybe have some row in memroy
} else {
// the remainder be put to pArray
if(pArray == NULL)
pArray = taosArrayInit(1, sizeof(SRange));
if(range.from == -1) {
range.from = i;
} else {
if(range.to + 1 != i) {
// add the previous
taosArrayPush(pArray, &range);
range.from = i;
range.to = eblock - 1;
taosArrayPush(pArray, &range);
range.from = -1;
if(skip) {
num ++;
} else {
// can't skip, append block index to pArray
if(pArray == NULL)
pArray = taosArrayInit(10, sizeof(SRange));
if(range.from == -1) {
range.from = i;
} else {
if(range.to + 1 != i) {
// add the previous
taosArrayPush(pArray, &range);
range.from = i;
range.to = i;
// end append
if(range.from != -1) {
if(pArray == NULL)
pArray = taosArrayInit(1, sizeof(SRange));
taosArrayPush(pArray, &range);
// ASC return
*ppArray = pArray;
return num;
// DES
for(int32_t i = eblock - 1; i >= sblock; i--) {
bool skip = false;
SBlock* pBlock = &blocks[i];
if(i == eblock - 1 && ekey < pBlock->keyLast) {
q->frows += pBlock->numOfRows; // some rows time > e
} else {
// check can skip
if(q->srows + q->frows + pBlock->numOfRows + MAYBE_IN_MEMORY_ROWS < q->offset) { // approximately calculate
if(blockNoItemInMem(q, pBlock)) {
// can skip
q->srows += pBlock->numOfRows;
skip = true;
} else {
q->frows += pBlock->numOfRows; // maybe have some row in memroy
} else {
// the remainder be put to pArray
if(pArray == NULL)
pArray = taosArrayInit(1, sizeof(SRange));
if(range.from == -1) {
range.from = i;
} else {
if(range.to - 1 != i) {
// add the previous
taosArrayPush(pArray, &range);
range.from = i;
range.to = 0;
taosArrayPush(pArray, &range);
range.from = -1;
if(skip) {
num ++;
} else {
// can't skip, append block index to pArray
if(pArray == NULL)
pArray = taosArrayInit(10, sizeof(SRange));
if(range.from == -1) {
range.from = i;
} else {
if(range.to + 1 != i) {
// add the previous
taosArrayPush(pArray, &range);
range.from = i;
range.to = i;
assert(compIndex->len > 0);
// end append
if(range.from != -1) {
if(pArray == NULL)
pArray = taosArrayInit(1, sizeof(SRange));
taosArrayPush(pArray, &range);
if(pArray == NULL)
return num;
if (tsdbLoadBlockInfo(&(pQueryHandle->rhelper), (void**)(&pCheckInfo->pCompInfo),
(uint32_t*)(&pCheckInfo->compSize)) < 0) {
return terrno;
// reverse array
size_t count = taosArrayGetSize(pArray);
SRange* ranges = TARRAY_GET_START(pArray);
SArray* pArray1 = taosArrayInit(count, sizeof(SRange));
size_t i = count - 1;
while(i >= 0) {
range.from = ranges[i].to;
range.to = ranges[i].from;
taosArrayPush(pArray1, &range);
if(i == 0)
i --;
SBlockInfo* pCompInfo = pCheckInfo->pCompInfo;
*ppArray = pArray1;
return num;
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
// shrink blocks by condition of query
static void shrinkBlocksByQuery(STsdbQueryHandle *pQueryHandle, STableCheckInfo *pCheckInfo) {
SBlockInfo *pCompInfo = pCheckInfo->pCompInfo;
SBlockIdx *compIndex = pQueryHandle->rhelper.pBlkIdx;
bool order = ASCENDING_TRAVERSE(pQueryHandle->order);
if (order) {
assert(pCheckInfo->lastKey <= pQueryHandle->window.ekey && pQueryHandle->window.skey <= pQueryHandle->window.ekey);
} else {
assert(pCheckInfo->lastKey >= pQueryHandle->window.ekey && pQueryHandle->window.skey >= pQueryHandle->window.ekey);
s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey);
e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey);
// discard the unqualified data block based on the query time window
int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
int32_t end = start;
if (s > pCompInfo->blocks[start].keyLast) {
return 0;
return ;
// todo speedup the procedure of located end block
int32_t end = start;
// locate e index of blocks -> end
while (end < (int32_t)compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) {
end += 1;
pCheckInfo->numOfBlocks = (end - start);
// calc offset can skip blocks number
int32_t nSkip = 0;
SArray *pArray = NULL;
if(pQueryHandle->offset > 0) {
nSkip = offsetSkipBlock(pQueryHandle, pCompInfo, s, e, start, end, &pArray, order);
if(nSkip > 0) { // have offset and can skip
pCheckInfo->numOfBlocks = memMoveByArray(pCompInfo->blocks, pArray);
} else { // no offset
pCheckInfo->numOfBlocks = end - start;
if(start > 0)
memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock));
// load one table (tsd_index point to) need load blocks info and put into pCheckInfo->pCompInfo->blocks
static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t tsd_index, int32_t* numOfBlocks) {
// ONE PART. Load all blocks info from one table of tsd_index
int32_t code = 0;
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, tsd_index);
pCheckInfo->numOfBlocks = 0;
if (tsdbSetReadTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj) != TSDB_CODE_SUCCESS) {
code = terrno;
return code;
SBlockIdx* compIndex = pQueryHandle->rhelper.pBlkIdx;
// no data block in this file, try next file
if (compIndex == NULL || compIndex->uid != pCheckInfo->tableId.uid) {
return 0; // no data blocks in the file belongs to pCheckInfo->pTable
if (pCheckInfo->compSize < (int32_t)compIndex->len) {
assert(compIndex->len > 0);
char* t = realloc(pCheckInfo->pCompInfo, compIndex->len);
if (t == NULL) {
return code;
if (start > 0) {
memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock));
pCheckInfo->pCompInfo = (SBlockInfo*)t;
pCheckInfo->compSize = compIndex->len;
if (tsdbLoadBlockInfo(&(pQueryHandle->rhelper), (void**)(&pCheckInfo->pCompInfo),
(uint32_t*)(&pCheckInfo->compSize)) < 0) {
return terrno;
// TWO PART. shrink no need blocks from all blocks by condition of query
shrinkBlocksByQuery(pQueryHandle, pCheckInfo);
(*numOfBlocks) += pCheckInfo->numOfBlocks;
return 0;
......@@ -58,6 +58,13 @@ static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *tar
memcpy(target, context.digest, TSDB_KEY_LEN);
// TSKEY util
// if time area(s1,e1) intersect with time area(s2,e2) then return true else return false
bool timeIntersect(TSKEY s1, TSKEY e1, TSKEY s2, TSKEY e2);
#ifdef __cplusplus
......@@ -549,3 +549,16 @@ FORCE_INLINE double taos_align_get_double(const char* pBuf) {
memcpy(&dv, pBuf, sizeof(dv)); // in ARM, return *((const double*)(pBuf)) may cause problem
return dv;
// TSKEY util
// if time area(s1,e1) intersect with time area(s2,e2) then return true else return false
bool timeIntersect(TSKEY s1, TSKEY e1, TSKEY s2, TSKEY e2) {
// s1,e1 and s2,e2 have 7 scenarios, 5 is intersection, 2 is no intersection, so we pick up 2.
if(e2 < s1 || s2 > e1)
return false;
return true;
\ No newline at end of file
......@@ -286,6 +286,7 @@ python3 ./test.py -f query/queryCnameDisplay.py
python3 test.py -f query/nestedQuery/queryWithSpread.py
python3 ./test.py -f query/bug6586.py
# python3 ./test.py -f query/bug5903.py
python3 ./test.py -f query/queryLimit.py
python3 ./test.py -f stream/metric_1.py
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
# -*- coding: utf-8 -*-
import sys
from numpy.lib.function_base import insert
import taos
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
# constant define
WAITS = 5 # wait seconds
class TDTestCase:
# --------------- main frame -------------------
def caseDescription(self):
limit and offset keyword function test cases;
case1: limit offset base function test
case2: limit offset advance test
# init
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
self.ts = 1500000000000
# run case
def run(self):
# insert data
self.insert_data("t1", self.ts, 300*10000, 30000);
# test base case
tdLog.debug(" LIMIT test_case1 ............ [OK]")
# test advance case
tdLog.debug(" LIMIT test_case2 ............ [OK]")
# stop
def stop(self):
tdLog.success("%s successfully executed" % __file__)
# --------------- case -------------------
# create table
def create_tables(self):
# super table
tdSql.execute("create table st(ts timestamp, i1 int) tags(area int)");
# child table
tdSql.execute("create table t1 using st tags(1)");
tdSql.execute("create table t2 using st tags(2)");
tdSql.execute("create table t3 using st tags(3)");
# insert data1
def insert_data(self, tbname, ts_start, count, batch_num):
pre_insert = "insert into %s values"%tbname
sql = pre_insert
tdLog.debug("doing insert table %s rows=%d ..."%(tbname, count))
for i in range(count):
sql += " (%d,%d)"%(ts_start + i*1000, i)
if i >0 and i%batch_num == 0:
sql = pre_insert
# end sql
if sql != pre_insert:
tdLog.debug("INSERT TABLE DATA ............ [OK]")
# test case1 base
def test_case1(self):
# limit base function
# base no where
sql = "select * from t1 limit 10"
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 0)
tdSql.checkData(9, 1, 9)
sql = "select * from t1 order by ts desc limit 10" # desc
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 2999999)
tdSql.checkData(9, 1, 2999990)
# have where
sql = "select * from t1 where ts>='2017-07-14 10:40:01' and ts<'2017-07-14 10:40:06' limit 10"
tdSql.waitedQuery(sql, 5, WAITS)
tdSql.checkData(0, 1, 1)
tdSql.checkData(4, 1, 5)
sql = "select * from t1 where ts>='2017-08-18 03:59:52' and ts<'2017-08-18 03:59:57' order by ts desc limit 10" # desc
tdSql.waitedQuery(sql, 5, WAITS)
tdSql.checkData(0, 1, 2999996)
tdSql.checkData(4, 1, 2999992)
# offset base function
# no where
sql = "select * from t1 limit 10 offset 5"
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 5)
tdSql.checkData(9, 1, 14)
sql = "select * from t1 order by ts desc limit 10 offset 5" # desc
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 2999994)
tdSql.checkData(9, 1, 2999985)
# have where only ts
sql = "select * from t1 where ts>='2017-07-14 10:40:10' and ts<'2017-07-14 10:40:20' limit 10 offset 5"
tdSql.waitedQuery(sql, 5, WAITS)
tdSql.checkData(0, 1, 15)
tdSql.checkData(4, 1, 19)
sql = "select * from t1 where ts>='2017-08-18 03:59:52' and ts<'2017-08-18 03:59:57' order by ts desc limit 10 offset 4" # desc
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 2999992)
# have where with other column condition
sql = "select * from t1 where i1>=1 and i1<11 limit 10 offset 5"
tdSql.waitedQuery(sql, 5, WAITS)
tdSql.checkData(0, 1, 6)
tdSql.checkData(4, 1, 10)
sql = "select * from t1 where i1>=300000 and i1<=500000 order by ts desc limit 10 offset 100000" # desc
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 400000)
tdSql.checkData(9, 1, 399991)
# have where with ts and other column condition
sql = "select * from t1 where ts>='2017-07-14 10:40:10' and ts<'2017-07-14 10:40:50' and i1>=20 and i1<=25 limit 10 offset 5"
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 25)
# test advance
def test_case2(self):
# OFFSET merge file data with memory data
# offset
sql = "select * from t1 limit 10 offset 72000"
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 72000)
# each insert one row into NO.0 NO.2 NO.7 blocks
sql = "insert into t1 values (%d, 0) (%d, 2) (%d, 7)"%(self.ts+1, self.ts + 2*3300*1000+1, self.ts + 7*3300*1000+1)
# query result
sql = "select * from t1 limit 10 offset 72000"
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 72000 - 3)
# have where
sql = "select * from t1 where ts>='2017-07-14 10:40:10' and ts<'2017-07-22 18:40:10' limit 10 offset 72000"
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 72000 - 3 + 10 + 1)
# have where desc
sql = "select * from t1 where ts<'2017-07-14 20:40:00' order by ts desc limit 15 offset 36000"
tdSql.waitedQuery(sql, 3, WAITS)
tdSql.checkData(0, 1, 1)
# add case with filename
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册