提交 07ce9aa3 编写于 作者: A Alex Duan

[TS-445]<feature>: limit offset optimization

上级 711bc76e
...@@ -119,8 +119,8 @@ typedef struct STsdbQueryHandle { ...@@ -119,8 +119,8 @@ typedef struct STsdbQueryHandle {
SQueryFilePos cur; // current position SQueryFilePos cur; // current position
int16_t order; int16_t order;
int64_t offset; // limit offset int64_t offset; // limit offset
int64_t srows; // skip offset rows int64_t srows; // skip offset rows
int64_t frows; // forbid offset rows int64_t frows; // forbid skip offset rows
STimeWindow window; // the primary query time window that applies to all queries STimeWindow window; // the primary query time window that applies to all queries
SDataStatis* statis; // query level statistics, only one table block statistics info exists at any time SDataStatis* statis; // query level statistics, only one table block statistics info exists at any time
int32_t numOfBlocks; int32_t numOfBlocks;
...@@ -1071,64 +1071,238 @@ static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY s ...@@ -1071,64 +1071,238 @@ static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY s
return midSlot; return midSlot;
} }
// array :1 2 3 5 7 -2 (8 9) skip 4 and 6
int32_t memMoveByArray(SBlock *blocks, SArray *pArray) {
size_t count = taosArrayGetSize(pArray);
int32_t *idxs = (int32_t*)TARRAY_GET_START(pArray);
size_t i = 0;
assert(count > 0);
assert(idxs[0] > 0);
// memmove while
while(i < count) {
size_t step = 1; // self so is 1
size_t j = i;
bool end = false;
// calc consecutive while
while(j + 1 < count) {
// encounter number with negatived
if(idxs[j + 1] < 0) {
int32_t num = idxs[j + 1] * -1;
step += num;
end = true;
break;
}
// next is consecutive, step ++
if(idxs[j] + 1 == idxs[j + 1]) {
j++;
step++;
} else {
// can't consecutive
break;
}
}
size_t src_pos = idxs[i];
if(i == src_pos) {
// same so no need move
i += step;
continue;
}
// do memmove
memmove(blocks + i, blocks + src_pos, sizeof(SBlock) * step);
i += step;
if(end)
break;
}
return i + 1;
}
// if block data in memory return false else true
bool blockNoItemInMem(STsdbQueryHandle* q, SBlock* pBlock) {
// mem->first
TSKEY key = q->pMemRef->snapshot.mem->keyFirst;
if(key >= pBlock->keyFirst && key <= pBlock->keyLast) {
return false;
}
// mem->last
key = q->pMemRef->snapshot.mem->keyLast;
if(key >= pBlock->keyFirst && key <= pBlock->keyLast) {
return false;
}
// imem->first
key = q->pMemRef->snapshot.imem->keyFirst;
if(key >= pBlock->keyFirst && key <= pBlock->keyLast) {
return false;
}
// imem->last
key = q->pMemRef->snapshot.imem->keyLast;
if(key >= pBlock->keyFirst && key <= pBlock->keyLast) {
return false;
}
return true;
}
// skip blocks . return value is skip blocks number, skip rows reduce from *pOffset // skip blocks . return value is skip blocks number, skip rows reduce from *pOffset
static int32_t offsetSkipBlock(STsdbQueryHandle* q, SBlockInfo* pBlockInfo, uint32_t numBlocks, int64_t s, int64_t e) { static int32_t offsetSkipBlock(STsdbQueryHandle* q, SBlockInfo* pBlockInfo, uint32_t numBlocks,
int64_t s, int64_t e, SArray** ppArray) {
int32_t num = 0; int32_t num = 0;
SBlock* blocks = pBlockInfo->blocks; SBlock* blocks = pBlockInfo->blocks;
SArray* pArray = NULL;
// asc // ASC
if(ASCENDING_TRAVERSE(q->order)) { if(ASCENDING_TRAVERSE(q->order)) {
for(int32_t i = 0; i < numBlocks; i++) { for(int32_t i = 0; i < numBlocks; i++) {
bool skip = false;
SBlock* pBlock = &blocks[i]; SBlock* pBlock = &blocks[i];
if(i == 0 && pBlock->keyFirst != s) { if(i == 0 && pBlock->keyFirst != s) {
//first block only keyFirst == s can skip q->frows += pBlock->numOfRows; // some rows time < s
q->frows += pBlock->numOfRows;
continue;
} else { } else {
// skip to read // check can skip
if(q->srows + q->frows + pBlock->numOfRows <= q->offset) { if(q->srows + q->frows + pBlock->numOfRows <= q->offset) { // approximately calculate
q->srows += pBlock->numOfRows; if(blockNoItemInMem(q, pBlock)) {
num ++; // can skip
q->srows += pBlock->numOfRows;
skip = true;
} else {
q->frows += pBlock->numOfRows; // maybe have some row in memroy
}
} else { } else {
if(pArray == NULL)
pArray = taosArrayInit(2, sizeof(int32_t));
taosArrayPush(pArray, &i);
//the remainder be put to pArray
int32_t numRemain = numBlocks - (i + 1);
if(numRemain > 0) {
numRemain *= -1; //if list end element is negative, that is number for the remainder
taosArrayPush(pArray, &numRemain);
}
break; break;
} }
} }
if(skip) {
num ++;
} else {
// can't skip, append block index to pArray
if(pArray == NULL)
pArray = taosArrayInit(10, sizeof(int32_t));
taosArrayPush(pArray, &i);
}
} }
} else { // des } else { // DES
for(int32_t i = numBlocks - 1; i >= 0; i--) { for(int32_t i = numBlocks - 1; i >= 0; i--) {
bool skip = false;
SBlock* pBlock = &blocks[i]; SBlock* pBlock = &blocks[i];
if(i == numBlocks - 1 && pBlock->keyLast != e) { if(i == numBlocks - 1 && pBlock->keyLast != e) {
//first block only keyFirst == s can skipƒ q->frows += pBlock->numOfRows; // some rows time > e
q->frows += pBlock->numOfRows;
continue;
} else { } else {
// skip to read // check can skip
if(q->srows + q->frows + pBlock->numOfRows <= q->offset) { if(q->srows + q->frows + pBlock->numOfRows <= q->offset) { // approximately calculate
q->srows += pBlock->numOfRows; if(blockNoItemInMem(q, pBlock)) {
num ++; // can skip
q->srows += pBlock->numOfRows;
skip = true;
} else {
q->frows += pBlock->numOfRows; // maybe have some row in memroy
}
} else { } else {
if(pArray == NULL)
pArray = taosArrayInit(2, sizeof(int32_t));
taosArrayPush(pArray, &i);
//the remainder be put to pArray
int32_t numRemain = i;
if(numRemain > 0) {
numRemain *= -1; //if list end element is negative, that is number for the remainder
taosArrayPush(pArray, &numRemain);
}
break; break;
} }
} }
if(skip) {
num ++;
} else {
// can't skip, append block index to pArray
if(pArray == NULL)
pArray = taosArrayInit(10, sizeof(int32_t));
taosArrayPush(pArray, &i);
}
} }
} }
if(ppArray && pArray) {
*ppArray = pArray;
}
return num; return num;
} }
// shrink blocks by condition of query
static void shrinkBlocksByQuery(STsdbQueryHandle *pQueryHandle, STableCheckInfo *pCheckInfo) {
SBlockInfo *pCompInfo = pCheckInfo->pCompInfo;
SBlockIdx *compIndex = pQueryHandle->rhelper.pBlkIdx;
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
assert(pCheckInfo->lastKey <= pQueryHandle->window.ekey && pQueryHandle->window.skey <= pQueryHandle->window.ekey);
} else {
assert(pCheckInfo->lastKey >= pQueryHandle->window.ekey && pQueryHandle->window.skey >= pQueryHandle->window.ekey);
}
TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL;
s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey);
e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey);
// discard the unqualified data block based on the query time window
int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
if (s > pCompInfo->blocks[start].keyLast) {
return ;
}
int32_t end = start;
// todo speedup the procedure of located end block
while (end < (int32_t)compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) {
end += 1;
}
// calc offset can skip blocks number
int32_t nSkip = 0;
SArray *pArray = NULL;
if(pQueryHandle->offset > 0) {
nSkip = offsetSkipBlock(pQueryHandle, pCompInfo, compIndex->numOfBlocks, s, e, &pArray);
}
if(nSkip > 0) { // have offset and can skip
assert(pArray);
pCheckInfo->numOfBlocks = memMoveByArray(pCompInfo->blocks, pArray);
} else { // no offset
pCheckInfo->numOfBlocks = end - start;
if(start > 0)
memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock));
}
if(pArray) {
taosArrayDestroy(pArray);
}
}
// load one table (tsd_index point to) need load blocks info and put into pCheckInfo->pCompInfo->blocks
static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t tsd_index, int32_t* numOfBlocks) { static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t tsd_index, int32_t* numOfBlocks) {
//
// ONE PART. Load all blocks info from one table of tsd_index
//
int32_t code = 0; int32_t code = 0;
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, tsd_index); STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, tsd_index);
pCheckInfo->numOfBlocks = 0; pCheckInfo->numOfBlocks = 0;
if (tsdbSetReadTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj) != TSDB_CODE_SUCCESS) { if (tsdbSetReadTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj) != TSDB_CODE_SUCCESS) {
code = terrno; code = terrno;
return code; return code;
} }
SBlockIdx* compIndex = pQueryHandle->rhelper.pBlkIdx; SBlockIdx* compIndex = pQueryHandle->rhelper.pBlkIdx;
// no data block in this file, try next file // no data block in this file, try next file
if (compIndex == NULL || compIndex->uid != pCheckInfo->tableId.uid) { if (compIndex == NULL || compIndex->uid != pCheckInfo->tableId.uid) {
return 0; // no data blocks in the file belongs to pCheckInfo->pTable return 0; // no data blocks in the file belongs to pCheckInfo->pTable
...@@ -1136,7 +1310,6 @@ static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t tsd_index, ...@@ -1136,7 +1310,6 @@ static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t tsd_index,
if (pCheckInfo->compSize < (int32_t)compIndex->len) { if (pCheckInfo->compSize < (int32_t)compIndex->len) {
assert(compIndex->len > 0); assert(compIndex->len > 0);
char* t = realloc(pCheckInfo->pCompInfo, compIndex->len); char* t = realloc(pCheckInfo->pCompInfo, compIndex->len);
if (t == NULL) { if (t == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
...@@ -1151,56 +1324,13 @@ static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t tsd_index, ...@@ -1151,56 +1324,13 @@ static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t tsd_index,
if (tsdbLoadBlockInfo(&(pQueryHandle->rhelper), (void*)(pCheckInfo->pCompInfo)) < 0) { if (tsdbLoadBlockInfo(&(pQueryHandle->rhelper), (void*)(pCheckInfo->pCompInfo)) < 0) {
return terrno; return terrno;
} }
SBlockInfo* pCompInfo = pCheckInfo->pCompInfo;
TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL;
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
assert(pCheckInfo->lastKey <= pQueryHandle->window.ekey && pQueryHandle->window.skey <= pQueryHandle->window.ekey);
} else {
assert(pCheckInfo->lastKey >= pQueryHandle->window.ekey && pQueryHandle->window.skey >= pQueryHandle->window.ekey);
}
s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey);
e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey);
// discard the unqualified data block based on the query time window
int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
if (s > pCompInfo->blocks[start].keyLast) {
return 0;
}
int32_t end = start;
// todo speedup the procedure of located end block
while (end < (int32_t)compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) {
end += 1;
}
// calc offset can skip blocks number
int32_t nSkip = 0;
if(pQueryHandle->offset > 0) {
nSkip = offsetSkipBlock(pQueryHandle, pCompInfo, compIndex->numOfBlocks, s, e);
}
pCheckInfo->numOfBlocks = (end - start) - nSkip;
if(pCheckInfo->numOfBlocks > 0) {
if(nSkip > 0) {
if(ASCENDING_TRAVERSE(pQueryHandle->order)) {
if(start > 0)
memmove(pCompInfo->blocks, &pCompInfo->blocks[start], 1 * sizeof(SBlock));
if(pCheckInfo->numOfBlocks > 1)
memmove(pCompInfo->blocks + 1, &pCompInfo->blocks[start + nSkip + 1], (pCheckInfo->numOfBlocks - 1) * sizeof(SBlock));
} else {
if(start > 0 && pCheckInfo->numOfBlocks > 1)
memmove(pCompInfo->blocks, &pCompInfo->blocks[start], (pCheckInfo->numOfBlocks - 1) * sizeof(SBlock));
memmove(pCompInfo->blocks + (pCheckInfo->numOfBlocks - 1), &pCompInfo->blocks[end - 1], 1 * sizeof(SBlock));
}
} else if(start > 0) {
memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock));
}
}
//
// TWO PART. shrink no need blocks from all blocks by condition of query
//
shrinkBlocksByQuery(pQueryHandle, pCheckInfo);
(*numOfBlocks) += pCheckInfo->numOfBlocks; (*numOfBlocks) += pCheckInfo->numOfBlocks;
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册