diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 11b7692e7ea65169dfb50ac66cf1b1d85e9abab9..8e83cb4d4a0b4130acb10916ed91704005b533c1 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -97,7 +97,9 @@ typedef struct { STSCursor cur; } SQueryStatusInfo; +#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) static void setQueryStatus(SQuery *pQuery, int8_t status); + static bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; } // todo move to utility @@ -278,6 +280,20 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) { return maxOutput; } +/* + * the value of number of result needs to be update due to offset value upated. + */ +void updateNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfRes) { + SQuery *pQuery = pRuntimeEnv->pQuery; + + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { + SResultInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]); + assert(pResInfo->numOfRes > numOfRes); + + pResInfo->numOfRes = numOfRes; + } +} + static int32_t getGroupResultId(int32_t groupIndex) { int32_t base = 200000; return base + (groupIndex * 10000); @@ -354,9 +370,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) { bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].base.functionId == TSDB_FUNC_TS_COMP; } -static bool limitResults(SQInfo *pQInfo) { - SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - +static bool limitResults(SQuery *pQuery) { if ((pQuery->limit.limit > 0) && (pQuery->rec.total + pQuery->rec.rows > pQuery->limit.limit)) { pQuery->rec.rows = pQuery->limit.limit - pQuery->rec.total; assert(pQuery->rec.rows > 0); @@ -626,6 +640,7 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, int32_t i = 0; int64_t skey = TSKEY_INITIAL_VAL; + // TODO opt performance: get the closed time window here for (i = 0; i < pWindowResInfo->size; ++i) { SWindowResult *pResult = &pWindowResInfo->pResult[i]; if (pResult->status.closed) { @@ -2408,6 +2423,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { char *tmp = realloc(pQuery->sdata[i], bytes * newSize + sizeof(tFilePage)); if (tmp == NULL) { // todo handle the oom + assert(0); } else { pQuery->sdata[i] = (tFilePage *)tmp; } @@ -2421,7 +2437,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { } } - qTrace("QInfo: %p realloc output buffer, new size: %d rows, old:%d, remain:%d", GET_QINFO_ADDR(pRuntimeEnv), + qTrace("QInfo:%p realloc output buffer, new size: %d rows, old:%d, remain:%d", GET_QINFO_ADDR(pRuntimeEnv), newSize, pRec->capacity, newSize - pRec->rows); pRec->capacity = newSize; @@ -2434,11 +2450,11 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1; int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock); - qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d", GET_QINFO_ADDR(pRuntimeEnv), + qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, numOfRes:%d", GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes); - // save last access position - if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { + // while the output buffer is full or limit/offset is applied, query may be paused here + if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL | QUERY_COMPLETED)) { break; } } @@ -3173,30 +3189,37 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) { } if (pQuery->rec.rows <= pQuery->limit.offset) { + qTrace("QInfo:%p skip rows:%d, new offset:%" PRIu64, GET_QINFO_ADDR(pRuntimeEnv), pQuery->rec.rows, + pQuery->limit.offset - pQuery->rec.rows); + pQuery->limit.offset -= pQuery->rec.rows; pQuery->rec.rows = 0; resetCtxOutputBuf(pRuntimeEnv); // clear the buffer is full flag if exists - pQuery->status &= (~QUERY_RESBUF_FULL); + CLEAR_QUERY_STATUS(pQuery, QUERY_RESBUF_FULL); } else { - int32_t numOfSkip = (int32_t) pQuery->limit.offset; + int64_t numOfSkip = pQuery->limit.offset; pQuery->rec.rows -= numOfSkip; - + pQuery->limit.offset = 0; + + qTrace("QInfo:%p skip row:%"PRId64", new offset:%d, numOfRows remain:%" PRIu64, GET_QINFO_ADDR(pRuntimeEnv), numOfSkip, + 0, pQuery->rec.rows); + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionId = pQuery->pSelectExpr[i].base.functionId; int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->rec.rows * bytes); - pRuntimeEnv->pCtx[i].aOutputBuf += bytes * numOfSkip; + pRuntimeEnv->pCtx[i].aOutputBuf = pQuery->sdata[i]->data + pQuery->rec.rows; if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { - pRuntimeEnv->pCtx[i].ptsOutputBuf += TSDB_KEYSIZE * numOfSkip; + pRuntimeEnv->pCtx[i].ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf; } } - - pQuery->limit.offset = 0; + + updateNumOfResult(pRuntimeEnv, pQuery->rec.rows); } } @@ -3205,7 +3228,7 @@ void setQueryStatus(SQuery *pQuery, int8_t status) { pQuery->status = status; } else { // QUERY_NOT_COMPLETED is not compatible with any other status, so clear its position first - pQuery->status &= (~QUERY_NOT_COMPLETED); + CLEAR_QUERY_STATUS(pQuery, QUERY_NOT_COMPLETED); pQuery->status |= status; } } @@ -3957,7 +3980,7 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBloc int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock); - qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d", GET_QINFO_ADDR(pRuntimeEnv), + qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, numOfRes:%d", GET_QINFO_ADDR(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes); } @@ -4075,7 +4098,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, NULL, binarySearchForKey, pDataBlock); pRuntimeEnv->windowResInfo.curIndex = index; // restore the window index - qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d", + qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, numOfRes:%d", GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes); return true; } else { // do nothing @@ -4532,8 +4555,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) { continue; } - // SPointInterpoSupporter pointInterpSupporter = {0}; - // TODO handle the limit offset problem if (pQuery->numOfFilterCols == 0 && pQuery->limit.offset > 0) { // skipBlocks(pRuntimeEnv); @@ -4544,12 +4565,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } scanAllDataBlocks(pRuntimeEnv, pQuery->current->lastKey); - - pQuery->rec.rows = getNumOfResult(pRuntimeEnv); skipResults(pRuntimeEnv); // the limitation of output result is reached, set the query completed - if (limitResults(pQInfo)) { + if (limitResults(pQuery)) { pQInfo->tableIndex = pQInfo->groupInfo.numOfTables; break; } @@ -4578,18 +4597,15 @@ static void sequentialTableProcess(SQInfo *pQInfo) { break; } - } else { // forward query range - pQuery->window.skey = pQuery->current->lastKey; - + } else { // all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter if (pQuery->rec.rows == 0) { assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)); continue; } else { - // pQInfo->pTableQuerySupporter->pMeterSidExtInfo[k]->key = pQuery->lastKey; - // // buffer is full, wait for the next round to retrieve data from current meter - // assert(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)); - // break; + // buffer is full, wait for the next round to retrieve data from current meter + assert(Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)); + break; } } } @@ -4809,7 +4825,7 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) pQuery->rec.rows = getNumOfResult(pRuntimeEnv); skipResults(pRuntimeEnv); - limitResults(pQInfo); + limitResults(pQuery); } static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { @@ -4857,7 +4873,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) resetCtxOutputBuf(pRuntimeEnv); } - limitResults(pQInfo); + limitResults(pQuery); if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { qTrace("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo, pQuery->current->lastKey, pQuery->window.ekey); @@ -4935,7 +4951,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { // the offset is handled at prepare stage if no interpolation involved if (pQuery->fillType == TSDB_FILL_NONE || pQuery->rec.rows == 0) { - limitResults(pQInfo); + limitResults(pQuery); break; } else { TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->slidingTime, @@ -4947,7 +4963,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { qTrace("QInfo: %p fill results completed, final:%d", pQInfo, pQuery->rec.rows); if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - limitResults(pQInfo); + limitResults(pQuery); break; } @@ -4982,7 +4998,7 @@ static void tableQueryImpl(SQInfo *pQInfo) { qTrace("QInfo: %p fill results completed, final:%d", pQInfo, pQuery->rec.rows); if (pQuery->rec.rows > 0) { - limitResults(pQInfo); + limitResults(pQuery); } qTrace("QInfo:%p current:%d returned, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total); diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index ad3da226f6d5e5baaa04864f92103fbfd82f856a..9a7c7b88adde0a209b7c215520043dc239ea0edf 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -395,6 +395,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo SCompIdx* compIndex = &pQueryHandle->rhelper.pCompIdx[pCheckInfo->tableId.tid]; if (compIndex->len == 0 || compIndex->numOfBlocks == 0) { // no data block in this file, try next file + pCheckInfo->numOfBlocks = 0; continue;//no data blocks in the file belongs to pCheckInfo->pTable } else { if (pCheckInfo->compSize < compIndex->len) {