未验证 提交 3a3ceedf 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #9554 from taosdata/fix/TS-445-V24

[TS-445]<fix>(query):Limit offset optimition of performance
...@@ -173,6 +173,7 @@ typedef void *TsdbQueryHandleT; // Use void to hide implementation details ...@@ -173,6 +173,7 @@ typedef void *TsdbQueryHandleT; // Use void to hide implementation details
typedef struct STsdbQueryCond { typedef struct STsdbQueryCond {
STimeWindow twindow; STimeWindow twindow;
int32_t order; // desc|asc order to iterate the data block int32_t order; // desc|asc order to iterate the data block
int64_t offset; // skip offset put down to tsdb
int32_t numOfCols; int32_t numOfCols;
SColumnInfo *colList; SColumnInfo *colList;
bool loadExternalRows; // load external rows or not bool loadExternalRows; // load external rows or not
...@@ -391,6 +392,9 @@ void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCon ...@@ -391,6 +392,9 @@ void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCon
int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist* pTableBlockInfo); int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist* pTableBlockInfo);
// obtain queryHandle attribute
int64_t tsdbSkipOffset(TsdbQueryHandleT queryHandle);
/** /**
* get the statistics of repo usage * get the statistics of repo usage
* @param repo. point to the tsdbrepo * @param repo. point to the tsdbrepo
......
...@@ -237,6 +237,7 @@ typedef struct SQueryAttr { ...@@ -237,6 +237,7 @@ typedef struct SQueryAttr {
bool createFilterOperator; // if filter operator is needed bool createFilterOperator; // if filter operator is needed
bool multigroupResult; // multigroup result can exist in one SSDataBlock bool multigroupResult; // multigroup result can exist in one SSDataBlock
bool needSort; // need sort rowRes bool needSort; // need sort rowRes
bool skipOffset; // can skip offset if true
int32_t interBufSize; // intermediate buffer sizse int32_t interBufSize; // intermediate buffer sizse
int32_t havingNum; // having expr number int32_t havingNum; // having expr number
......
...@@ -4905,6 +4905,11 @@ STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) { ...@@ -4905,6 +4905,11 @@ STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) {
.loadExternalRows = false, .loadExternalRows = false,
}; };
// set offset with
if(pQueryAttr->skipOffset) {
cond.offset = pQueryAttr->limit.offset;
}
TIME_WINDOW_COPY(cond.twindow, *win); TIME_WINDOW_COPY(cond.twindow, *win);
return cond; return cond;
} }
...@@ -5858,19 +5863,38 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) { ...@@ -5858,19 +5863,38 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
return NULL; return NULL;
} }
bool move = false;
int32_t skip = 0;
int32_t remain = 0;
int64_t srows = tsdbSkipOffset(pRuntimeEnv->pQueryHandle);
if (pRuntimeEnv->currentOffset == 0) { if (pRuntimeEnv->currentOffset == 0) {
break; break;
}
else if(srows > 0) {
if(pRuntimeEnv->currentOffset - srows >= pBlock->info.rows) {
pRuntimeEnv->currentOffset -= pBlock->info.rows;
} else {
move = true;
skip = (int32_t)(pRuntimeEnv->currentOffset - srows);
remain = (int32_t)(pBlock->info.rows - skip);
}
} else if (pRuntimeEnv->currentOffset >= pBlock->info.rows) { } else if (pRuntimeEnv->currentOffset >= pBlock->info.rows) {
pRuntimeEnv->currentOffset -= pBlock->info.rows; pRuntimeEnv->currentOffset -= pBlock->info.rows;
} else { } else {
int32_t remain = (int32_t)(pBlock->info.rows - pRuntimeEnv->currentOffset); move = true;
skip = (int32_t)pRuntimeEnv->currentOffset;
remain = (int32_t)(pBlock->info.rows - pRuntimeEnv->currentOffset);
}
// need move
if(move) {
pBlock->info.rows = remain; pBlock->info.rows = remain;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColInfoData->info.bytes; int16_t bytes = pColInfoData->info.bytes;
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pRuntimeEnv->currentOffset, remain * bytes); memmove(pColInfoData->pData, pColInfoData->pData + skip * bytes, remain * bytes);
} }
pRuntimeEnv->currentOffset = 0; pRuntimeEnv->currentOffset = 0;
......
...@@ -39,6 +39,9 @@ ...@@ -39,6 +39,9 @@
.tid = (_checkInfo)->tableId.tid, \ .tid = (_checkInfo)->tableId.tid, \
.uid = (_checkInfo)->tableId.uid}) .uid = (_checkInfo)->tableId.uid})
// limit offset start optimization for rows read over this value
#define OFFSET_SKIP_THRESHOLD 5000
enum { enum {
TSDB_QUERY_TYPE_ALL = 1, TSDB_QUERY_TYPE_ALL = 1,
TSDB_QUERY_TYPE_LAST = 2, TSDB_QUERY_TYPE_LAST = 2,
...@@ -117,6 +120,9 @@ typedef struct STsdbQueryHandle { ...@@ -117,6 +120,9 @@ typedef struct STsdbQueryHandle {
STsdbRepo* pTsdb; STsdbRepo* pTsdb;
SQueryFilePos cur; // current position SQueryFilePos cur; // current position
int16_t order; int16_t order;
int64_t offset; // limit offset
int64_t srows; // skip offset rows
int64_t frows; // forbid skip offset rows
STimeWindow window; // the primary query time window that applies to all queries STimeWindow window; // the primary query time window that applies to all queries
SDataStatis* statis; // query level statistics, only one table block statistics info exists at any time SDataStatis* statis; // query level statistics, only one table block statistics info exists at any time
int32_t numOfBlocks; int32_t numOfBlocks;
...@@ -155,6 +161,11 @@ typedef struct STableGroupSupporter { ...@@ -155,6 +161,11 @@ typedef struct STableGroupSupporter {
STSchema* pTagSchema; STSchema* pTagSchema;
} STableGroupSupporter; } STableGroupSupporter;
typedef struct SRange {
int32_t from;
int32_t to;
} SRange;
static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList); static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList);
static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList); static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList);
static int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle); static int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle);
...@@ -413,6 +424,9 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC ...@@ -413,6 +424,9 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
} }
pQueryHandle->order = pCond->order; pQueryHandle->order = pCond->order;
pQueryHandle->offset = pCond->offset;
pQueryHandle->srows = 0;
pQueryHandle->frows = 0;
pQueryHandle->pTsdb = tsdb; pQueryHandle->pTsdb = tsdb;
pQueryHandle->type = TSDB_QUERY_TYPE_ALL; pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pQueryHandle->cur.fid = INT32_MIN; pQueryHandle->cur.fid = INT32_MIN;
...@@ -529,6 +543,9 @@ void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond) { ...@@ -529,6 +543,9 @@ void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond) {
} }
pQueryHandle->order = pCond->order; pQueryHandle->order = pCond->order;
pQueryHandle->offset = pCond->offset;
pQueryHandle->srows = 0;
pQueryHandle->frows = 0;
pQueryHandle->window = pCond->twindow; pQueryHandle->window = pCond->twindow;
pQueryHandle->type = TSDB_QUERY_TYPE_ALL; pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pQueryHandle->cur.fid = -1; pQueryHandle->cur.fid = -1;
...@@ -1073,63 +1090,302 @@ static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY s ...@@ -1073,63 +1090,302 @@ static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY s
return midSlot; return midSlot;
} }
static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t index, int32_t* numOfBlocks) { // array :1 2 3 5 7 -2 (8 9) skip 4 and 6
int32_t code = 0; int32_t memMoveByArray(SBlock *blocks, SArray *pArray) {
// pArray is NULL or size is zero , no need block to move
if(pArray == NULL)
return 0;
size_t count = taosArrayGetSize(pArray);
if(count == 0)
return 0;
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, index); // memmove
pCheckInfo->numOfBlocks = 0; int32_t num = 0;
SRange* ranges = (SRange*)TARRAY_GET_START(pArray);
for(size_t i = 0; i < count; i++) {
int32_t step = ranges[i].to - ranges[i].from + 1;
memmove(blocks + num, blocks + ranges[i].from, sizeof(SBlock) * step);
num += step;
}
if (tsdbSetReadTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj) != TSDB_CODE_SUCCESS) { return num;
code = terrno; }
return code;
// if block data in memory return false else true
bool blockNoItemInMem(STsdbQueryHandle* q, SBlock* pBlock) {
if(q->pMemRef == NULL) {
return false;
} }
SBlockIdx* compIndex = pQueryHandle->rhelper.pBlkIdx; // mem
if(q->pMemRef->snapshot.mem) {
SMemTable* mem = q->pMemRef->snapshot.mem;
if(timeIntersect(mem->keyFirst, mem->keyLast, pBlock->keyFirst, pBlock->keyLast))
return false;
}
// imem
if(q->pMemRef->snapshot.imem) {
SMemTable* imem = q->pMemRef->snapshot.imem;
if(timeIntersect(imem->keyFirst, imem->keyLast, pBlock->keyFirst, pBlock->keyLast))
return false;
}
// no data block in this file, try next file return true;
if (compIndex == NULL || compIndex->uid != pCheckInfo->tableId.uid) { }
return 0; // no data blocks in the file belongs to pCheckInfo->pTable
#define MAYBE_IN_MEMORY_ROWS 4000 // approximately the capacity of one block
// skip blocks . return value is skip blocks number, skip rows reduce from *pOffset
static int32_t offsetSkipBlock(STsdbQueryHandle* q, SBlockInfo* pBlockInfo, int64_t skey, int64_t ekey,
int32_t sblock, int32_t eblock, SArray** ppArray, bool order) {
int32_t num = 0;
SBlock* blocks = pBlockInfo->blocks;
SArray* pArray = NULL;
SRange range;
range.from = -1;
//
// ASC
//
if(order) {
for(int32_t i = sblock; i < eblock; i++) {
bool skip = false;
SBlock* pBlock = &blocks[i];
if(i == sblock && skey > pBlock->keyFirst) {
q->frows += pBlock->numOfRows; // some rows time < s
} else {
// check can skip
if(q->srows + q->frows + pBlock->numOfRows + MAYBE_IN_MEMORY_ROWS < q->offset) { // approximately calculate
if(blockNoItemInMem(q, pBlock)) {
// can skip
q->srows += pBlock->numOfRows;
skip = true;
} else {
q->frows += pBlock->numOfRows; // maybe have some row in memroy
}
} else {
// the remainder be put to pArray
if(pArray == NULL)
pArray = taosArrayInit(1, sizeof(SRange));
if(range.from == -1) {
range.from = i;
} else {
if(range.to + 1 != i) {
// add the previous
taosArrayPush(pArray, &range);
range.from = i;
}
}
range.to = eblock - 1;
taosArrayPush(pArray, &range);
range.from = -1;
break;
}
}
if(skip) {
num ++;
} else {
// can't skip, append block index to pArray
if(pArray == NULL)
pArray = taosArrayInit(10, sizeof(SRange));
if(range.from == -1) {
range.from = i;
} else {
if(range.to + 1 != i) {
// add the previous
taosArrayPush(pArray, &range);
range.from = i;
}
}
range.to = i;
}
}
// end append
if(range.from != -1) {
if(pArray == NULL)
pArray = taosArrayInit(1, sizeof(SRange));
taosArrayPush(pArray, &range);
}
// ASC return
*ppArray = pArray;
return num;
} }
// DES
for(int32_t i = eblock - 1; i >= sblock; i--) {
bool skip = false;
SBlock* pBlock = &blocks[i];
if(i == eblock - 1 && ekey < pBlock->keyLast) {
q->frows += pBlock->numOfRows; // some rows time > e
} else {
// check can skip
if(q->srows + q->frows + pBlock->numOfRows + MAYBE_IN_MEMORY_ROWS < q->offset) { // approximately calculate
if(blockNoItemInMem(q, pBlock)) {
// can skip
q->srows += pBlock->numOfRows;
skip = true;
} else {
q->frows += pBlock->numOfRows; // maybe have some row in memroy
}
} else {
// the remainder be put to pArray
if(pArray == NULL)
pArray = taosArrayInit(1, sizeof(SRange));
if(range.from == -1) {
range.from = i;
} else {
if(range.to - 1 != i) {
// add the previous
taosArrayPush(pArray, &range);
range.from = i;
}
}
range.to = 0;
taosArrayPush(pArray, &range);
range.from = -1;
break;
}
}
assert(compIndex->len > 0); if(skip) {
num ++;
} else {
// can't skip, append block index to pArray
if(pArray == NULL)
pArray = taosArrayInit(10, sizeof(SRange));
if(range.from == -1) {
range.from = i;
} else {
if(range.to + 1 != i) {
// add the previous
taosArrayPush(pArray, &range);
range.from = i;
}
}
range.to = i;
}
}
if (tsdbLoadBlockInfo(&(pQueryHandle->rhelper), (void**)(&pCheckInfo->pCompInfo), // end append
(uint32_t*)(&pCheckInfo->compSize)) < 0) { if(range.from != -1) {
return terrno; if(pArray == NULL)
pArray = taosArrayInit(1, sizeof(SRange));
taosArrayPush(pArray, &range);
} }
SBlockInfo* pCompInfo = pCheckInfo->pCompInfo; if(pArray == NULL)
return num;
TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL; // reverse array
size_t count = taosArrayGetSize(pArray);
SRange* ranges = TARRAY_GET_START(pArray);
SArray* pArray1 = taosArrayInit(count, sizeof(SRange));
if (ASCENDING_TRAVERSE(pQueryHandle->order)) { size_t i = count - 1;
while(i >= 0) {
range.from = ranges[i].to;
range.to = ranges[i].from;
taosArrayPush(pArray1, &range);
if(i == 0)
break;
i --;
}
*ppArray = pArray1;
taosArrayDestroy(&pArray);
return num;
}
// shrink blocks by condition of query
static void shrinkBlocksByQuery(STsdbQueryHandle *pQueryHandle, STableCheckInfo *pCheckInfo) {
SBlockInfo *pCompInfo = pCheckInfo->pCompInfo;
SBlockIdx *compIndex = pQueryHandle->rhelper.pBlkIdx;
bool order = ASCENDING_TRAVERSE(pQueryHandle->order);
if (order) {
assert(pCheckInfo->lastKey <= pQueryHandle->window.ekey && pQueryHandle->window.skey <= pQueryHandle->window.ekey); assert(pCheckInfo->lastKey <= pQueryHandle->window.ekey && pQueryHandle->window.skey <= pQueryHandle->window.ekey);
} else { } else {
assert(pCheckInfo->lastKey >= pQueryHandle->window.ekey && pQueryHandle->window.skey >= pQueryHandle->window.ekey); assert(pCheckInfo->lastKey >= pQueryHandle->window.ekey && pQueryHandle->window.skey >= pQueryHandle->window.ekey);
} }
TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL;
s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey); s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey);
e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey); e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey);
// discard the unqualified data block based on the query time window // discard the unqualified data block based on the query time window
int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC); int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
int32_t end = start;
if (s > pCompInfo->blocks[start].keyLast) { if (s > pCompInfo->blocks[start].keyLast) {
return 0; return ;
} }
// todo speedup the procedure of located end block int32_t end = start;
// locate e index of blocks -> end
while (end < (int32_t)compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) { while (end < (int32_t)compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) {
end += 1; end += 1;
} }
pCheckInfo->numOfBlocks = (end - start); // calc offset can skip blocks number
int32_t nSkip = 0;
SArray *pArray = NULL;
if(pQueryHandle->offset > 0) {
nSkip = offsetSkipBlock(pQueryHandle, pCompInfo, s, e, start, end, &pArray, order);
}
if(nSkip > 0) { // have offset and can skip
pCheckInfo->numOfBlocks = memMoveByArray(pCompInfo->blocks, pArray);
} else { // no offset
pCheckInfo->numOfBlocks = end - start;
if(start > 0)
memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock));
}
if(pArray)
taosArrayDestroy(&pArray);
}
// load one table (tsd_index point to) need load blocks info and put into pCheckInfo->pCompInfo->blocks
static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t tsd_index, int32_t* numOfBlocks) {
//
// ONE PART. Load all blocks info from one table of tsd_index
//
int32_t code = 0;
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, tsd_index);
pCheckInfo->numOfBlocks = 0;
if (tsdbSetReadTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj) != TSDB_CODE_SUCCESS) {
code = terrno;
return code;
}
SBlockIdx* compIndex = pQueryHandle->rhelper.pBlkIdx;
// no data block in this file, try next file
if (compIndex == NULL || compIndex->uid != pCheckInfo->tableId.uid) {
return 0; // no data blocks in the file belongs to pCheckInfo->pTable
}
if (start > 0) { if (pCheckInfo->compSize < (int32_t)compIndex->len) {
memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock)); assert(compIndex->len > 0);
char* t = realloc(pCheckInfo->pCompInfo, compIndex->len);
if (t == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
code = TSDB_CODE_TDB_OUT_OF_MEMORY;
return code;
}
pCheckInfo->pCompInfo = (SBlockInfo*)t;
pCheckInfo->compSize = compIndex->len;
}
if (tsdbLoadBlockInfo(&(pQueryHandle->rhelper), (void**)(&pCheckInfo->pCompInfo),
(uint32_t*)(&pCheckInfo->compSize)) < 0) {
return terrno;
} }
//
// TWO PART. shrink no need blocks from all blocks by condition of query
//
shrinkBlocksByQuery(pQueryHandle, pCheckInfo);
(*numOfBlocks) += pCheckInfo->numOfBlocks; (*numOfBlocks) += pCheckInfo->numOfBlocks;
return 0; return 0;
} }
...@@ -4312,4 +4568,11 @@ end: ...@@ -4312,4 +4568,11 @@ end:
return string; return string;
} }
// obtain queryHandle attribute
int64_t tsdbSkipOffset(TsdbQueryHandleT queryHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*)queryHandle;
if (pQueryHandle) {
return pQueryHandle->srows;
}
return 0;
}
...@@ -58,6 +58,13 @@ static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *tar ...@@ -58,6 +58,13 @@ static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *tar
memcpy(target, context.digest, TSDB_KEY_LEN); memcpy(target, context.digest, TSDB_KEY_LEN);
} }
//
// TSKEY util
//
// if time area(s1,e1) intersect with time area(s2,e2) then return true else return false
bool timeIntersect(TSKEY s1, TSKEY e1, TSKEY s2, TSKEY e2);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -549,3 +549,16 @@ FORCE_INLINE double taos_align_get_double(const char* pBuf) { ...@@ -549,3 +549,16 @@ FORCE_INLINE double taos_align_get_double(const char* pBuf) {
memcpy(&dv, pBuf, sizeof(dv)); // in ARM, return *((const double*)(pBuf)) may cause problem memcpy(&dv, pBuf, sizeof(dv)); // in ARM, return *((const double*)(pBuf)) may cause problem
return dv; return dv;
} }
//
// TSKEY util
//
// if time area(s1,e1) intersect with time area(s2,e2) then return true else return false
bool timeIntersect(TSKEY s1, TSKEY e1, TSKEY s2, TSKEY e2) {
// s1,e1 and s2,e2 have 7 scenarios, 5 is intersection, 2 is no intersection, so we pick up 2.
if(e2 < s1 || s2 > e1)
return false;
else
return true;
}
\ No newline at end of file
...@@ -286,6 +286,7 @@ python3 ./test.py -f query/queryCnameDisplay.py ...@@ -286,6 +286,7 @@ python3 ./test.py -f query/queryCnameDisplay.py
python3 test.py -f query/nestedQuery/queryWithSpread.py python3 test.py -f query/nestedQuery/queryWithSpread.py
python3 ./test.py -f query/bug6586.py python3 ./test.py -f query/bug6586.py
# python3 ./test.py -f query/bug5903.py # python3 ./test.py -f query/bug5903.py
python3 ./test.py -f query/queryLimit.py
#stream #stream
python3 ./test.py -f stream/metric_1.py python3 ./test.py -f stream/metric_1.py
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from numpy.lib.function_base import insert
import taos
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
# constant define
WAITS = 5 # wait seconds
class TDTestCase:
#
# --------------- main frame -------------------
#
def caseDescription(self):
'''
limit and offset keyword function test cases;
case1: limit offset base function test
case2: limit offset advance test
'''
return
# init
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
tdSql.prepare()
self.create_tables();
self.ts = 1500000000000
# run case
def run(self):
# insert data
self.insert_data("t1", self.ts, 300*10000, 30000);
# test base case
self.test_case1()
tdLog.debug(" LIMIT test_case1 ............ [OK]")
# test advance case
self.test_case2()
tdLog.debug(" LIMIT test_case2 ............ [OK]")
# stop
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
#
# --------------- case -------------------
#
# create table
def create_tables(self):
# super table
tdSql.execute("create table st(ts timestamp, i1 int) tags(area int)");
# child table
tdSql.execute("create table t1 using st tags(1)");
tdSql.execute("create table t2 using st tags(2)");
tdSql.execute("create table t3 using st tags(3)");
return
# insert data1
def insert_data(self, tbname, ts_start, count, batch_num):
pre_insert = "insert into %s values"%tbname
sql = pre_insert
tdLog.debug("doing insert table %s rows=%d ..."%(tbname, count))
for i in range(count):
sql += " (%d,%d)"%(ts_start + i*1000, i)
if i >0 and i%batch_num == 0:
tdSql.execute(sql)
sql = pre_insert
# end sql
if sql != pre_insert:
tdSql.execute(sql)
tdLog.debug("INSERT TABLE DATA ............ [OK]")
return
# test case1 base
def test_case1(self):
#
# limit base function
#
# base no where
sql = "select * from t1 limit 10"
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 0)
tdSql.checkData(9, 1, 9)
sql = "select * from t1 order by ts desc limit 10" # desc
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 2999999)
tdSql.checkData(9, 1, 2999990)
# have where
sql = "select * from t1 where ts>='2017-07-14 10:40:01' and ts<'2017-07-14 10:40:06' limit 10"
tdSql.waitedQuery(sql, 5, WAITS)
tdSql.checkData(0, 1, 1)
tdSql.checkData(4, 1, 5)
sql = "select * from t1 where ts>='2017-08-18 03:59:52' and ts<'2017-08-18 03:59:57' order by ts desc limit 10" # desc
tdSql.waitedQuery(sql, 5, WAITS)
tdSql.checkData(0, 1, 2999996)
tdSql.checkData(4, 1, 2999992)
#
# offset base function
#
# no where
sql = "select * from t1 limit 10 offset 5"
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 5)
tdSql.checkData(9, 1, 14)
sql = "select * from t1 order by ts desc limit 10 offset 5" # desc
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 2999994)
tdSql.checkData(9, 1, 2999985)
# have where only ts
sql = "select * from t1 where ts>='2017-07-14 10:40:10' and ts<'2017-07-14 10:40:20' limit 10 offset 5"
tdSql.waitedQuery(sql, 5, WAITS)
tdSql.checkData(0, 1, 15)
tdSql.checkData(4, 1, 19)
sql = "select * from t1 where ts>='2017-08-18 03:59:52' and ts<'2017-08-18 03:59:57' order by ts desc limit 10 offset 4" # desc
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 2999992)
# have where with other column condition
sql = "select * from t1 where i1>=1 and i1<11 limit 10 offset 5"
tdSql.waitedQuery(sql, 5, WAITS)
tdSql.checkData(0, 1, 6)
tdSql.checkData(4, 1, 10)
sql = "select * from t1 where i1>=300000 and i1<=500000 order by ts desc limit 10 offset 100000" # desc
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 400000)
tdSql.checkData(9, 1, 399991)
# have where with ts and other column condition
sql = "select * from t1 where ts>='2017-07-14 10:40:10' and ts<'2017-07-14 10:40:50' and i1>=20 and i1<=25 limit 10 offset 5"
tdSql.waitedQuery(sql, 1, WAITS)
tdSql.checkData(0, 1, 25)
return
# test advance
def test_case2(self):
#
# OFFSET merge file data with memory data
#
# offset
sql = "select * from t1 limit 10 offset 72000"
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 72000)
# each insert one row into NO.0 NO.2 NO.7 blocks
sql = "insert into t1 values (%d, 0) (%d, 2) (%d, 7)"%(self.ts+1, self.ts + 2*3300*1000+1, self.ts + 7*3300*1000+1)
tdSql.execute(sql)
# query result
sql = "select * from t1 limit 10 offset 72000"
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 72000 - 3)
# have where
sql = "select * from t1 where ts>='2017-07-14 10:40:10' and ts<'2017-07-22 18:40:10' limit 10 offset 72000"
tdSql.waitedQuery(sql, 10, WAITS)
tdSql.checkData(0, 1, 72000 - 3 + 10 + 1)
# have where desc
sql = "select * from t1 where ts<'2017-07-14 20:40:00' order by ts desc limit 15 offset 36000"
tdSql.waitedQuery(sql, 3, WAITS)
tdSql.checkData(0, 1, 1)
#
# add case with filename
#
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册