提交 7da7390d 编写于 作者: A Alex Duan

[TS-445] offset think about memory data

上级 07ce9aa3
......@@ -1077,7 +1077,7 @@ int32_t memMoveByArray(SBlock *blocks, SArray *pArray) {
int32_t *idxs = (int32_t*)TARRAY_GET_START(pArray);
size_t i = 0;
assert(count > 0);
assert(idxs[0] > 0);
assert(idxs[0] >= 0);
// memmove while
while(i < count) {
......@@ -1117,48 +1117,65 @@ int32_t memMoveByArray(SBlock *blocks, SArray *pArray) {
break;
}
return i + 1;
return i;
}
// if block data in memory return false else true
bool blockNoItemInMem(STsdbQueryHandle* q, SBlock* pBlock) {
// mem->first
TSKEY key = q->pMemRef->snapshot.mem->keyFirst;
if(key >= pBlock->keyFirst && key <= pBlock->keyLast) {
TSKEY key;
if(q->pMemRef == NULL) {
return false;
}
// mem->last
key = q->pMemRef->snapshot.mem->keyLast;
if(key >= pBlock->keyFirst && key <= pBlock->keyLast) {
return false;
}
// imem->first
key = q->pMemRef->snapshot.imem->keyFirst;
if(key >= pBlock->keyFirst && key <= pBlock->keyLast) {
return false;
// mem
if(q->pMemRef->snapshot.mem) {
// first
key = q->pMemRef->snapshot.mem->keyFirst;
if(key >= pBlock->keyFirst && key <= pBlock->keyLast) {
return false;
}
// last
key = q->pMemRef->snapshot.mem->keyLast;
if(key >= pBlock->keyFirst && key <= pBlock->keyLast) {
return false;
}
}
// imem->last
key = q->pMemRef->snapshot.imem->keyLast;
if(key >= pBlock->keyFirst && key <= pBlock->keyLast) {
return false;
// imem
if(q->pMemRef->snapshot.imem) {
// first
key = q->pMemRef->snapshot.imem->keyFirst;
if(key >= pBlock->keyFirst && key <= pBlock->keyLast) {
return false;
}
// last
key = q->pMemRef->snapshot.imem->keyLast;
if(key >= pBlock->keyFirst && key <= pBlock->keyLast) {
return false;
}
}
return true;
}
// skip blocks . return value is skip blocks number, skip rows reduce from *pOffset
static int32_t offsetSkipBlock(STsdbQueryHandle* q, SBlockInfo* pBlockInfo, uint32_t numBlocks,
int64_t s, int64_t e, SArray** ppArray) {
static int32_t offsetSkipBlock(STsdbQueryHandle* q, SBlockInfo* pBlockInfo, int64_t skey, int64_t ekey,
int32_t sblock, int32_t eblock, SArray** ppArray) {
int32_t num = 0;
SBlock* blocks = pBlockInfo->blocks;
SArray* pArray = NULL;
// ASC
if(ASCENDING_TRAVERSE(q->order)) {
for(int32_t i = 0; i < numBlocks; i++) {
for(int32_t i = 0; i <= eblock; i++) {
// in block index range
if(i < sblock) {
continue;
}
bool skip = false;
SBlock* pBlock = &blocks[i];
if(i == 0 && pBlock->keyFirst != s) {
if(i == sblock && skey > pBlock->keyFirst) {
q->frows += pBlock->numOfRows; // some rows time < s
} else {
// check can skip
......@@ -1175,7 +1192,7 @@ static int32_t offsetSkipBlock(STsdbQueryHandle* q, SBlockInfo* pBlockInfo, uint
pArray = taosArrayInit(2, sizeof(int32_t));
taosArrayPush(pArray, &i);
//the remainder be put to pArray
int32_t numRemain = numBlocks - (i + 1);
int32_t numRemain = eblock - i;
if(numRemain > 0) {
numRemain *= -1; //if list end element is negative, that is number for the remainder
taosArrayPush(pArray, &numRemain);
......@@ -1194,11 +1211,16 @@ static int32_t offsetSkipBlock(STsdbQueryHandle* q, SBlockInfo* pBlockInfo, uint
}
}
} else { // DES
for(int32_t i = numBlocks - 1; i >= 0; i--) {
for(int32_t i = eblock; i >= 0; i--) {
// in block index range
if(i < sblock ) {
break;
}
bool skip = false;
SBlock* pBlock = &blocks[i];
if(i == numBlocks - 1 && pBlock->keyLast != e) {
q->frows += pBlock->numOfRows; // some rows time > e
if(i == eblock && ekey < pBlock->keyLast) {
q->frows += pBlock->numOfRows; // some rows time > e
} else {
// check can skip
if(q->srows + q->frows + pBlock->numOfRows <= q->offset) { // approximately calculate
......@@ -1214,7 +1236,7 @@ static int32_t offsetSkipBlock(STsdbQueryHandle* q, SBlockInfo* pBlockInfo, uint
pArray = taosArrayInit(2, sizeof(int32_t));
taosArrayPush(pArray, &i);
//the remainder be put to pArray
int32_t numRemain = i;
int32_t numRemain = i - sblock;
if(numRemain > 0) {
numRemain *= -1; //if list end element is negative, that is number for the remainder
taosArrayPush(pArray, &numRemain);
......@@ -1263,23 +1285,28 @@ static void shrinkBlocksByQuery(STsdbQueryHandle *pQueryHandle, STableCheckInfo
}
int32_t end = start;
// todo speedup the procedure of located end block
while (end < (int32_t)compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) {
end += 1;
// locate e index of blocks -> end
while (end < (int32_t)compIndex->numOfBlocks) {
if(pCompInfo->blocks[end].keyFirst <= e) {
end += 1;
} else {
end -= 1;
break;
}
}
// calc offset can skip blocks number
int32_t nSkip = 0;
SArray *pArray = NULL;
if(pQueryHandle->offset > 0) {
nSkip = offsetSkipBlock(pQueryHandle, pCompInfo, compIndex->numOfBlocks, s, e, &pArray);
nSkip = offsetSkipBlock(pQueryHandle, pCompInfo, s, e, start, end, &pArray);
}
if(nSkip > 0) { // have offset and can skip
assert(pArray);
pCheckInfo->numOfBlocks = memMoveByArray(pCompInfo->blocks, pArray);
} else { // no offset
pCheckInfo->numOfBlocks = end - start;
pCheckInfo->numOfBlocks = end - start + 1;
if(start > 0)
memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册