diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c
index f15a59cb626dcbe2d3be9348e0d95ca1414eee74..3e31eb9e8c069e95d763bf9e19700a2ac33a9326 100644
--- a/src/tsdb/src/tsdbRead.c
+++ b/src/tsdb/src/tsdbRead.c
@@ -119,8 +119,8 @@ typedef struct STsdbQueryHandle {
   SQueryFilePos cur; // current position
   int16_t order;
   int64_t offset; // limit offset
-  int64_t srows; // skip offset rows
-  int64_t frows; // forbid offset rows
+  int64_t srows; // rows of the blocks skipped so far while consuming the offset
+  int64_t frows; // rows of the blocks that could not be skipped as a whole
   STimeWindow window; // the primary query time window that applies to all queries
   SDataStatis* statis; // query level statistics, only one table block statistics info exists at any time
   int32_t numOfBlocks;
@@ -1071,64 +1071,247 @@ static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY s
   return midSlot;
 }
 
+// Compact blocks in place: keep only the blocks whose indexes are listed in pArray and pack
+// them at the front of the array. pArray holds int32_t block indexes in ascending order; a
+// trailing negative value -n stands for "and the n blocks right after the last listed index".
+// example: 1 2 3 5 7 -2 keeps blocks 1 2 3 5 7 8 9, blocks 4 and 6 are dropped.
+// returns the number of blocks kept.
+// NOTE(review): the DES branch of offsetSkipBlock pushes indexes in descending order, which
+// this ascending run detection does not recognise - confirm how that case must be handled.
+int32_t memMoveByArray(SBlock *blocks, SArray *pArray) {
+  size_t count = taosArrayGetSize(pArray);
+  int32_t *idxs = (int32_t*)TARRAY_GET_START(pArray);
+  size_t i = 0; // read position in idxs and write position in blocks (one listed index == one block)
+  assert(count > 0);
+  assert(idxs[0] >= 0); // index 0 is a valid entry, it is pushed when the first block is kept
+
+  // handle one run of consecutive indexes per iteration
+  while(i < count) {
+    size_t step = 1; // length of the run, idxs[i] itself counts as 1
+    size_t j = i;
+    bool end = false;
+    // extend the run while the next index is consecutive
+    while(j + 1 < count) {
+      // negative terminator: append that many following blocks to the run and finish
+      if(idxs[j + 1] < 0) {
+        int32_t num = idxs[j + 1] * -1;
+        step += num;
+        end = true;
+        break;
+      }
+      // next is consecutive, step ++
+      if(idxs[j] + 1 == idxs[j + 1]) {
+        j++;
+        step++;
+      } else {
+        // gap, the run ends here
+        break;
+      }
+    }
+
+    size_t src_pos = idxs[i];
+    if(i == src_pos) {
+      // run is already in place, no need to move
+      i += step;
+      continue;
+    }
+
+    // do memmove
+    memmove(blocks + i, blocks + src_pos, sizeof(SBlock) * step);
+    i += step;
+    if(end)
+      break;
+  }
+
+  return (int32_t)i; // i blocks were written, so the count is i and not i + 1
+}
+
+// Return true when no in-memory table (mem / imem) of the snapshot can hold a row inside
+// [pBlock->keyFirst, pBlock->keyLast], false otherwise. The test compares the key range of
+// each table with the block range, so "false" only means rows may exist, not that they do.
+bool blockNoItemInMem(STsdbQueryHandle* q, SBlock* pBlock) {
+  // NOTE(review): mem / imem are treated as optional (NULL when absent) - confirm against the snapshot code
+  // mem: the two key ranges overlap; this also covers a table range that encloses the whole block
+  if(q->pMemRef->snapshot.mem != NULL &&
+     q->pMemRef->snapshot.mem->keyFirst <= pBlock->keyLast &&
+     q->pMemRef->snapshot.mem->keyLast >= pBlock->keyFirst) {
+    return false;
+  }
+  // imem: same overlap test
+  if(q->pMemRef->snapshot.imem != NULL &&
+     q->pMemRef->snapshot.imem->keyFirst <= pBlock->keyLast &&
+     q->pMemRef->snapshot.imem->keyLast >= pBlock->keyFirst) {
+    return false;
+  }
+
+  return true;
+}
+
 // skip blocks . return value is skip blocks number, skip rows reduce from *pOffset
-static int32_t offsetSkipBlock(STsdbQueryHandle* q, SBlockInfo* pBlockInfo, uint32_t numBlocks, int64_t s, int64_t e) {
+// s / e are the lower / upper key bound of the scan. Blocks that still have to be read are
+// reported through *ppArray (int32_t block indexes, optionally ended by -n for the n blocks
+// that remain in scan direction); the caller owns that array and has to destroy it.
+// *ppArray is left untouched when no block index was recorded.
+static int32_t offsetSkipBlock(STsdbQueryHandle* q, SBlockInfo* pBlockInfo, uint32_t numBlocks,
+                               int64_t s, int64_t e, SArray** ppArray) {
   int32_t num = 0;
   SBlock* blocks = pBlockInfo->blocks;
+  SArray* pArray = NULL;
 
-  // asc
+  // ASC
   if(ASCENDING_TRAVERSE(q->order)) {
     for(int32_t i = 0; i < numBlocks; i++) {
+      bool skip = false;
       SBlock* pBlock = &blocks[i];
       if(i == 0 && pBlock->keyFirst != s) {
-        //first block only keyFirst == s can skip
-        q->frows += pBlock->numOfRows;
-        continue;
+        q->frows += pBlock->numOfRows; // first block does not start exactly at s (some rows time < s), it has to be read
       } else {
-        // skip to read
-        if(q->srows + q->frows + pBlock->numOfRows <= q->offset) {
-          q->srows += pBlock->numOfRows;
-          num ++;
+        // check whether the whole block can be skipped
+        if(q->srows + q->frows + pBlock->numOfRows <= q->offset) { // approximate, counts whole blocks only
+          if(blockNoItemInMem(q, pBlock)) {
+            // can skip
+            q->srows += pBlock->numOfRows;
+            skip = true;
+          } else {
+            q->frows += pBlock->numOfRows; // rows in memory may fall inside this block, it has to be read
+          }
         } else {
+          if(pArray == NULL)
+            pArray = taosArrayInit(2, sizeof(int32_t));
+          taosArrayPush(pArray, &i);
+          // the offset ends inside this block: keep it and every block after it
+          int32_t numRemain = numBlocks - (i + 1);
+          if(numRemain > 0) {
+            numRemain *= -1; // a negative last element is the number of remaining blocks
+            taosArrayPush(pArray, &numRemain);
+          }
           break;
         }
       }
+
+      if(skip) {
+        num ++;
+      } else {
+        // can't skip, append block index to pArray
+        if(pArray == NULL)
+          pArray = taosArrayInit(10, sizeof(int32_t));
+        taosArrayPush(pArray, &i);
+      }
     }
-  } else { // des
+  } else { // DES
     for(int32_t i = numBlocks - 1; i >= 0; i--) {
+      bool skip = false;
       SBlock* pBlock = &blocks[i];
       if(i == numBlocks - 1 && pBlock->keyLast != e) {
-        //first block only keyFirst == s can skipÆ’
-        q->frows += pBlock->numOfRows;
-        continue;
+        q->frows += pBlock->numOfRows; // last block does not end exactly at e (some rows time > e), it has to be read
       } else {
-        // skip to read
-        if(q->srows + q->frows + pBlock->numOfRows <= q->offset) {
-          q->srows += pBlock->numOfRows;
-          num ++;
+        // check whether the whole block can be skipped
+        if(q->srows + q->frows + pBlock->numOfRows <= q->offset) { // approximate, counts whole blocks only
+          if(blockNoItemInMem(q, pBlock)) {
+            // can skip
+            q->srows += pBlock->numOfRows;
+            skip = true;
+          } else {
+            q->frows += pBlock->numOfRows; // rows in memory may fall inside this block, it has to be read
+          }
         } else {
+          if(pArray == NULL)
+            pArray = taosArrayInit(2, sizeof(int32_t));
+          taosArrayPush(pArray, &i);
+          // the offset ends inside this block: keep it and every block before it
+          int32_t numRemain = i;
+          if(numRemain > 0) {
+            numRemain *= -1; // a negative last element is the number of remaining blocks
+            taosArrayPush(pArray, &numRemain);
+          }
           break;
         }
       }
+
+      if(skip) {
+        num ++;
+      } else {
+        // can't skip, append block index to pArray
+        if(pArray == NULL)
+          pArray = taosArrayInit(10, sizeof(int32_t));
+        taosArrayPush(pArray, &i);
+      }
     }
   }
 
+  if(ppArray && pArray) {
+    *ppArray = pArray;
+  }
+
   return num;
 }
 
+// Reduce pCheckInfo->pCompInfo->blocks to the blocks this query still has to read and store
+// their count in pCheckInfo->numOfBlocks: by the time window [s, e] when nothing is skipped,
+// otherwise by the index list that offsetSkipBlock produced for the offset.
+// NOTE(review): offsetSkipBlock walks all compIndex->numOfBlocks blocks, not only [start, end) - confirm.
+static void shrinkBlocksByQuery(STsdbQueryHandle *pQueryHandle, STableCheckInfo *pCheckInfo) {
+  SBlockInfo *pCompInfo = pCheckInfo->pCompInfo;
+  SBlockIdx *compIndex = pQueryHandle->rhelper.pBlkIdx;
+
+  if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
+    assert(pCheckInfo->lastKey <= pQueryHandle->window.ekey && pQueryHandle->window.skey <= pQueryHandle->window.ekey);
+  } else {
+    assert(pCheckInfo->lastKey >= pQueryHandle->window.ekey && pQueryHandle->window.skey >= pQueryHandle->window.ekey);
+  }
+
+  TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL;
+  s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey);
+  e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey);
+
+  // discard the unqualified data block based on the query time window
+  int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
+  if (s > pCompInfo->blocks[start].keyLast) {
+    return ;
+  }
+
+  int32_t end = start;
+  // todo speedup the procedure of located end block
+  while (end < (int32_t)compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) {
+    end += 1;
+  }
+
+  // calc offset can skip blocks number
+  int32_t nSkip = 0;
+  SArray *pArray = NULL;
+  if(pQueryHandle->offset > 0) {
+    nSkip = offsetSkipBlock(pQueryHandle, pCompInfo, compIndex->numOfBlocks, s, e, &pArray);
+  }
+
+  if(nSkip > 0) {  // have offset and can skip
+    // pArray stays NULL when every block was skipped, then no block is left to read
+    pCheckInfo->numOfBlocks = (pArray != NULL) ? memMoveByArray(pCompInfo->blocks, pArray) : 0;
+  } else { // nothing skipped, keep the blocks of the window [start, end)
+    pCheckInfo->numOfBlocks = end - start;
+    if(start > 0)
+      memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock));
+  }
+
+  if(pArray) {
+    taosArrayDestroy(pArray);
+  }
+}
+
+// load the block info of the table at tsd_index into pCheckInfo->pCompInfo, keep only the blocks
+// the query needs and add their count to *numOfBlocks. returns 0 or the error code of the failed step
 static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t tsd_index, int32_t* numOfBlocks) {
+  //
+  // PART ONE. load the info of all blocks of the table at tsd_index
+  //
   int32_t code = 0;
-
   STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, tsd_index);
   pCheckInfo->numOfBlocks = 0;
-
   if (tsdbSetReadTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj) != TSDB_CODE_SUCCESS) {
     code = terrno;
     return code;
   }
 
   SBlockIdx* compIndex = pQueryHandle->rhelper.pBlkIdx;
-
   // no data block in this file, try next file
   if (compIndex == NULL || compIndex->uid != pCheckInfo->tableId.uid) {
     return 0;  // no data blocks in the file belongs to pCheckInfo->pTable
@@ -1136,7 +1319,6 @@ static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t tsd_index,
 
   if (pCheckInfo->compSize < (int32_t)compIndex->len) {
     assert(compIndex->len > 0);
-
     char* t = realloc(pCheckInfo->pCompInfo, compIndex->len);
     if (t == NULL) {
       terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
@@ -1151,56 +1333,13 @@ static int32_t loadBlockInfo(STsdbQueryHandle * pQueryHandle, int32_t tsd_index,
   if (tsdbLoadBlockInfo(&(pQueryHandle->rhelper), (void*)(pCheckInfo->pCompInfo)) < 0) {
     return terrno;
   }
-  SBlockInfo* pCompInfo = pCheckInfo->pCompInfo;
-
-  TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL;
-
-  if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
-    assert(pCheckInfo->lastKey <= pQueryHandle->window.ekey && pQueryHandle->window.skey <= pQueryHandle->window.ekey);
-  } else {
-    assert(pCheckInfo->lastKey >= pQueryHandle->window.ekey && pQueryHandle->window.skey >= pQueryHandle->window.ekey);
-  }
-
-  s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey);
-  e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey);
-
-  // discard the unqualified data block based on the query time window
-  int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
-  if (s > pCompInfo->blocks[start].keyLast) {
-    return 0;
-  }
-
-  int32_t end = start;
-  // todo speedup the procedure of located end block
-  while (end < (int32_t)compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) {
-    end += 1;
-  }
-
-  // calc offset can skip blocks number
-  int32_t nSkip = 0;
-  if(pQueryHandle->offset > 0) {
-    nSkip = offsetSkipBlock(pQueryHandle, pCompInfo, compIndex->numOfBlocks, s, e);
-  }
-
-  pCheckInfo->numOfBlocks = (end - start) - nSkip;
-  if(pCheckInfo->numOfBlocks > 0) {
-    if(nSkip > 0) {
-      if(ASCENDING_TRAVERSE(pQueryHandle->order)) {
-        if(start > 0)
-          memmove(pCompInfo->blocks, &pCompInfo->blocks[start], 1 * sizeof(SBlock));
-        if(pCheckInfo->numOfBlocks > 1)
-          memmove(pCompInfo->blocks + 1, &pCompInfo->blocks[start + nSkip + 1], (pCheckInfo->numOfBlocks - 1) * sizeof(SBlock));
-      } else {
-        if(start > 0 && pCheckInfo->numOfBlocks > 1)
-          memmove(pCompInfo->blocks, &pCompInfo->blocks[start], (pCheckInfo->numOfBlocks - 1) * sizeof(SBlock));
-        memmove(pCompInfo->blocks + (pCheckInfo->numOfBlocks - 1), &pCompInfo->blocks[end - 1], 1 * sizeof(SBlock));
-      }
-    } else if(start > 0) {
-      memmove(pCompInfo->blocks, &pCompInfo->blocks[start], pCheckInfo->numOfBlocks * sizeof(SBlock));
-    }
-  }
+  //
+  // PART TWO. drop the blocks the query does not need
+  //
+  shrinkBlocksByQuery(pQueryHandle, pCheckInfo);
 
   (*numOfBlocks) += pCheckInfo->numOfBlocks;
+
   return 0;
 }
 