diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 5e47f399bb7be0e95a6b112ac2d4f2c27acdc279..a9dc7f00f910c570830512df8b8b8563e3ecf600 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -274,19 +274,27 @@ typedef struct SQueryRuntimeEnv { STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure struct SOperatorInfo *proot; SGroupResInfo groupResInfo; + int64_t currentOffset; // dynamic offset value } SQueryRuntimeEnv; +enum { + OP_IN_EXECUTING = 1, + OP_RES_TO_RETURN = 2, + OP_EXEC_DONE = 3, +}; + typedef struct SOperatorInfo { - char *name; - bool blockingOptr; - bool completed; - void *info; - SExprInfo *pExpr; - int32_t numOfOutput; - - SQueryRuntimeEnv *pRuntimeEnv; - struct SOperatorInfo *upstream; + uint8_t operatorType; + bool blockingOptr; // block operator or not + uint8_t completed; // denote if current operator is completed + uint32_t seed; // operator seed + int32_t numOfOutput; // number of columns of the current operator results + char *name; // name, used to show the query execution plan + void *info; // extension attribution + SExprInfo *pExpr; + SQueryRuntimeEnv *pRuntimeEnv; + struct SOperatorInfo *upstream; __operator_fn_t exec; __optr_cleanup_fn_t cleanup; } SOperatorInfo; @@ -391,11 +399,11 @@ typedef struct SLimitOperatorInfo { typedef struct SOffsetOperatorInfo { int64_t offset; - int64_t currentOffset; } SOffsetOperatorInfo; typedef struct SFillOperatorInfo { - SSDataBlock *pRes; + SSDataBlock *pRes; + int64_t totalInputRows; } SFillOperatorInfo; typedef struct SHashGroupbyOperatorInfo { diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index 9ab2b492e318d319759962dbfeda4fda78c1fffb..ac9740e7ba5bf112bbaeaed433765c2d7e7c7c0a 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -84,7 +84,9 @@ void freeInterResult(void* param); void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo, int32_t offset); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); +bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo); bool hasRemainData(SGroupResInfo* pGroupResInfo); + bool incNextGroup(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 9fbd60639a60f8b3a89de9140d6ef31449945091..ef6da984c632b19676672b8100b2416026628612 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -1439,7 +1439,20 @@ static void stddev_function_f(SQLFunctionCtx *pCtx, int32_t index) { // the second stage to calculate standard deviation SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SStddevInfo *pStd = GET_ROWCELL_INTERBUF(pResInfo); - + + if (pCtx->currentStage == REPEAT_SCAN && pStd->stage == 0) { + pStd->stage++; + avg_finalizer(pCtx); + + pResInfo->initialized = true; // set it initialized to avoid re-initialization + + // save average value into tmpBuf, for second stage scan + SAvgInfo *pAvg = GET_ROWCELL_INTERBUF(pResInfo); + + pStd->avg = GET_DOUBLE_VAL(pCtx->pOutput); + assert((isnan(pAvg->sum) && pAvg->num == 0) || (pStd->num == pAvg->num && pStd->avg == pAvg->sum)); + } + /* the first stage is to calculate average value */ if (pStd->stage == 0) { avg_function_f(pCtx, index); @@ -2695,19 +2708,34 @@ static void percentile_function(SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); + if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) { + // all data are null, set it completed + if (pInfo->numOfElems == 0) { + pResInfo->complete = true; + } else { + pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval); + } + + pInfo->stage += 1; + } + // the first stage, only acquire the min/max value if (pInfo->stage == 0) { if (pCtx->preAggVals.isSet) { double tmin = 0.0, tmax = 0.0; - if (pCtx->inputType >= TSDB_DATA_TYPE_TINYINT && pCtx->inputType <= TSDB_DATA_TYPE_BIGINT) { + if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) { tmin = (double)GET_INT64_VAL(&pCtx->preAggVals.statis.min); tmax = (double)GET_INT64_VAL(&pCtx->preAggVals.statis.max); - } else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE || pCtx->inputType == TSDB_DATA_TYPE_FLOAT) { + } else if (IS_FLOAT_TYPE(pCtx->inputType)) { tmin = GET_DOUBLE_VAL(&pCtx->preAggVals.statis.min); tmax = GET_DOUBLE_VAL(&pCtx->preAggVals.statis.max); + } else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) { + tmin = (double)GET_UINT64_VAL(pCtx->preAggVals.statis.min); + tmax = (double)GET_UINT64_VAL(pCtx->preAggVals.statis.max); } else { assert(true); } + if (GET_DOUBLE_VAL(&pInfo->minval) > tmin) { SET_DOUBLE_VAL(&pInfo->minval, tmin); } @@ -2764,10 +2792,20 @@ static void percentile_function_f(SQLFunctionCtx *pCtx, int32_t index) { } SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - SPercentileInfo *pInfo = (SPercentileInfo *)GET_ROWCELL_INTERBUF(pResInfo); - if (pInfo->stage == 0) { + if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) { + // all data are null, set it completed + if (pInfo->numOfElems == 0) { + pResInfo->complete = true; + } else { + pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval); + } + + pInfo->stage += 1; + } + + if (pInfo->stage == 0) { double v = 0; GET_TYPED_DATA(v, double, pCtx->inputType, pData); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 5904f606b56ba77af67170006a369e72ecdd490a..e21852dd572f859d8998d541d357f6907c3169ca 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -189,31 +189,20 @@ static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput); static void destroyArithOperatorInfo(void* param, int32_t numOfOutput); static SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); - static SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); - static SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); - static SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); - -static SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, - int32_t numOfOutput); - -static SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, - int32_t numOfOutput); - +static SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +static SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); - static SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); - -static SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv); +static SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput); static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock); static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes, SArray* pDataBlock); static int32_t getGroupbyColumnData_rv(SSqlGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock); -//static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, char *pData, int16_t type, int16_t bytes, int32_t groupIndex); static int32_t setGroupResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, SQLFunctionCtx * pCtx, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex, int32_t* offset); static void destroyOperatorInfo(SOperatorInfo* pOperator); @@ -221,6 +210,7 @@ void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size); void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); // setup the output buffer +// TODO prepare the output buffer dynamically static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput) { SSDataBlock *res = calloc(1, sizeof(SSDataBlock)); res->info.numOfCols = numOfOutput; @@ -2163,7 +2153,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv)); pRuntimeEnv->prevRow = malloc(POINTER_BYTES * pQuery->numOfCols + pQuery->srcRowSize); pRuntimeEnv->tagVal = malloc(pQuery->tagLen); -// pRuntimeEnv->rowCellInfoOffset = calloc(pQuery->numOfOutput, sizeof(int32_t)); + pRuntimeEnv->currentOffset = pQuery->limit.offset; + pRuntimeEnv->sasArray = calloc(pQuery->numOfOutput, sizeof(SArithmeticSupport)); if (/*pRuntimeEnv->rowCellInfoOffset == NULL || */pRuntimeEnv->sasArray == NULL || @@ -2256,10 +2247,7 @@ static void doFreeQueryHandle(SQInfo* pQInfo) { SQuery* pQuery = pRuntimeEnv->pQuery; tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); -// tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); - pRuntimeEnv->pQueryHandle = NULL; -// pRuntimeEnv->pSecQueryHandle = NULL; SMemRef* pMemRef = &pQuery->memRef; assert(pMemRef->ref == 0 && pMemRef->imem == NULL && pMemRef->mem == NULL); @@ -3394,9 +3382,10 @@ void UNUSED_FUNC displayInterResult(tFilePage **pdata, SQueryRuntimeEnv* pRuntim void copyResToQueryResultBuf_rv(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBlock* pBlock, int32_t* offset) { SGroupResInfo* pGroupResInfo = &pRuntimeEnv->groupResInfo; + pBlock->info.rows = 0; int32_t code = TSDB_CODE_SUCCESS; - while(pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) { + while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) { // all results in current group have been returned to client, try next group if ((pGroupResInfo->pRows == NULL) || taosArrayGetSize(pGroupResInfo->pRows) == 0) { assert(pGroupResInfo->index == 0); @@ -3408,18 +3397,19 @@ void copyResToQueryResultBuf_rv(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold doCopyToSData_rv(pRuntimeEnv, pGroupResInfo, TSDB_ORDER_ASC, pBlock); // current data are all dumped to result buffer, clear it - if (!hasRemainData(pGroupResInfo)) { + if (!hasRemainDataInCurrentGroup(pGroupResInfo)) { cleanupGroupResInfo(pGroupResInfo); if (!incNextGroup(pGroupResInfo)) { SET_STABLE_QUERY_OVER(pRuntimeEnv); } } - // enough results in data buffer, return - if (pBlock->info.rows >= threshold) { - break; + // enough results in data buffer, return + if (pBlock->info.rows >= threshold) { + break; + } } - } + } static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *pTableQueryInfo) { @@ -3536,9 +3526,8 @@ int32_t initResultRow(SResultRow *pResultRow) { } void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, - SSDataBlock* pDataBlock, int32_t* rowCellInfoOffset) { + SSDataBlock* pDataBlock, int32_t* rowCellInfoOffset, int64_t uid) { int32_t tid = 0; - int64_t uid = 0; SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&tid, sizeof(tid), true, uid); for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { @@ -3718,8 +3707,6 @@ void prepareRepeatTableScan(SQueryRuntimeEnv* pRuntimeEnv) { void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size) { for (int32_t j = 0; j < size; ++j) { - pCtx[j].currentStage = 0; - SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]); if (pResInfo->initialized) { continue; @@ -4568,8 +4555,9 @@ static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pG static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock) { assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); + pBlock->info.rows = 0; - if (!hasRemainData(pGroupResInfo)) { + if (!hasRemainDataInCurrentGroup(pGroupResInfo)) { return; } @@ -4659,7 +4647,7 @@ bool hasNotReturnedResults(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupR int32_t numOfTotal = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, pQuery->window.ekey, (int32_t)pQuery->rec.capacity); return numOfTotal > 0; } else { // there are results waiting for returned to client. - if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED) && hasRemainData(pGroupResInfo) && + if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED) && hasRemainDataInCurrentGroup(pGroupResInfo) && (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery))) { return true; } @@ -5212,18 +5200,17 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts pQuery->tsdb = tsdb; pQuery->vgId = vgId; + pQuery->stableQuery = isSTableQuery; + pQuery->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr); + pRuntimeEnv->groupResInfo.totalGroup = isSTableQuery? GET_NUM_OF_TABLEGROUP(pRuntimeEnv):0; pRuntimeEnv->pQuery = pQuery; pRuntimeEnv->pTsBuf = pTsBuf; pRuntimeEnv->cur.vgroupIndex = -1; - pQuery->stableQuery = isSTableQuery; - - pQuery->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr); - if (onlyQueryTags(pQuery)) { - pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv); + pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQuery->pExpr1, pQuery->numOfOutput); } else if (needReverseScan(pQuery)) { pRuntimeEnv->pi = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1); } else { @@ -5810,7 +5797,7 @@ static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) { * If the subgroup index is larger than 0, results generated by group by tbname,k is existed. * we need to return it to client in the first place. */ - if (hasRemainData(&pRuntimeEnv->groupResInfo)) { + if (hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo); pQuery->rec.total += pQuery->rec.rows; @@ -6189,7 +6176,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "SeqScanTableOp"; pOperator->blockingOptr = false; - pOperator->completed = false; + pOperator->completed = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfCols; pOperator->exec = doTableScan; @@ -6268,14 +6255,10 @@ static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { return pTableScanInfo->order; } -//static int32_t getTableScanFlag(STableScanInfo* pTableScanInfo) { -// return pTableScanInfo-> -//} - // this is a blocking operator static SSDataBlock* doAggregate(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; - if (pOperator->completed) { + if (pOperator->completed == OP_EXEC_DONE) { return NULL; } @@ -6307,7 +6290,7 @@ static SSDataBlock* doAggregate(void* param) { doAggregateImpl(pOperator, pQuery->window.skey, pAggInfo->pCtx, pBlock); } - pOperator->completed = true; + pOperator->completed = OP_EXEC_DONE; setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); finalizeQueryResult_rv(pOperator, pAggInfo->pCtx, &pAggInfo->resultRowInfo, pAggInfo->rowCellInfoOffset); @@ -6318,18 +6301,18 @@ static SSDataBlock* doAggregate(void* param) { static SSDataBlock* doSTableAggregate(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; - if (pOperator->completed) { + if (pOperator->completed == OP_EXEC_DONE) { return NULL; } SAggOperatorInfo* pAggInfo = pOperator->info; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - if (hasRemainData(&pRuntimeEnv->groupResInfo)) { + if (pOperator->completed == OP_RES_TO_RETURN) { toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pAggInfo->pRes); - if (pAggInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { - pOperator->completed = true; + if (pAggInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { + pOperator->completed = OP_EXEC_DONE; } return pAggInfo->pRes; @@ -6363,13 +6346,15 @@ static SSDataBlock* doSTableAggregate(void* param) { doAggregateImpl(pOperator, pQuery->window.skey, pAggInfo->pCtx, pBlock); } + pOperator->completed = OP_RES_TO_RETURN; closeAllResultRows(&pAggInfo->resultRowInfo); + updateWindowResNumOfRes_rv(pRuntimeEnv, pAggInfo->pCtx, pOperator->numOfOutput, &pAggInfo->resultRowInfo, pAggInfo->rowCellInfoOffset); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pAggInfo->resultRowInfo, 0); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pAggInfo->pRes); - if (pAggInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { - pOperator->completed = true; + if (pAggInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { + pOperator->completed = OP_EXEC_DONE; } return pAggInfo->pRes; @@ -6415,7 +6400,7 @@ static SSDataBlock* doArithmeticOperation(void* param) { static SSDataBlock* doLimit(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; - if (pOperator->completed) { + if (pOperator->completed == OP_EXEC_DONE) { return NULL; } @@ -6424,7 +6409,7 @@ static SSDataBlock* doLimit(void* param) { SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream); if (pBlock == NULL) { setQueryStatus(pOperator->pRuntimeEnv->pQuery, QUERY_COMPLETED); - pOperator->completed = true; + pOperator->completed = OP_EXEC_DONE; return NULL; } @@ -6434,7 +6419,7 @@ static SSDataBlock* doLimit(void* param) { pInfo->total = pInfo->limit; setQueryStatus(pOperator->pRuntimeEnv->pQuery, QUERY_COMPLETED); - pOperator->completed = true; + pOperator->completed = OP_EXEC_DONE; } else { pInfo->total += pBlock->info.rows; } @@ -6442,34 +6427,39 @@ static SSDataBlock* doLimit(void* param) { return pBlock; } +// TODO add log static SSDataBlock* doOffset(void* param) { SOperatorInfo *pOperator = (SOperatorInfo *)param; + if (pOperator->completed == OP_EXEC_DONE) { + return NULL; + } - SOffsetOperatorInfo *pInfo = pOperator->info; + SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; while (1) { SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream); if (pBlock == NULL) { - setQueryStatus(pOperator->pRuntimeEnv->pQuery, QUERY_COMPLETED); + setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); + pOperator->completed = OP_EXEC_DONE; return NULL; } - if (pInfo->currentOffset == 0) { + if (pRuntimeEnv->currentOffset == 0) { return pBlock; - } else if (pInfo->currentOffset > pBlock->info.rows) { - pInfo->currentOffset -= pBlock->info.rows; + } else if (pRuntimeEnv->currentOffset > pBlock->info.rows) { + pRuntimeEnv->currentOffset -= pBlock->info.rows; } else { - int32_t remain = pBlock->info.rows - pInfo->currentOffset; + int32_t remain = pBlock->info.rows - pRuntimeEnv->currentOffset; pBlock->info.rows = remain; for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i); int16_t bytes = pColInfoData->info.bytes; - memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes); + memmove(pColInfoData->pData, pColInfoData->pData + bytes * pRuntimeEnv->currentOffset, remain * bytes); } - pInfo->currentOffset = 0; + pRuntimeEnv->currentOffset = 0; return pBlock; } } @@ -6477,18 +6467,18 @@ static SSDataBlock* doOffset(void* param) { static SSDataBlock* doHashIntervalAgg(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; - if (pOperator->completed) { + if (pOperator->completed == OP_EXEC_DONE) { return NULL; } SHashIntervalOperatorInfo* pIntervalInfo = pOperator->info; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - if (hasRemainData(&pRuntimeEnv->groupResInfo)) { + if (pOperator->completed == OP_RES_TO_RETURN) { toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); - if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { - pOperator->completed = true; + if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { + pOperator->completed = OP_EXEC_DONE; } return pIntervalInfo->pRes; @@ -6516,6 +6506,7 @@ static SSDataBlock* doHashIntervalAgg(void* param) { pQuery->order.order = order; pQuery->window = win; + pOperator->completed = OP_RES_TO_RETURN; closeAllResultRows(&pIntervalInfo->resultRowInfo); setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); finalizeQueryResult_rv(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); @@ -6523,27 +6514,26 @@ static SSDataBlock* doHashIntervalAgg(void* param) { initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo, 0); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); - if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { - pOperator->completed = true; + if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { + pOperator->completed = OP_EXEC_DONE; } - return pIntervalInfo->pRes; + return pIntervalInfo->pRes->info.rows == 0? NULL:pIntervalInfo->pRes; } static SSDataBlock* doSTableIntervalAgg(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; - if (pOperator->completed) { + if (pOperator->completed == OP_EXEC_DONE) { return NULL; } SHashIntervalOperatorInfo* pIntervalInfo = pOperator->info; - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - if (hasRemainData(&pRuntimeEnv->groupResInfo)) { - toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); + if (pOperator->completed == OP_RES_TO_RETURN) { + copyResToQueryResultBuf_rv(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { - pOperator->completed = true; + pOperator->completed = OP_EXEC_DONE; } return pIntervalInfo->pRes; @@ -6571,13 +6561,14 @@ static SSDataBlock* doSTableIntervalAgg(void* param) { hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pIntervalInfo, pBlock, pTableQueryInfo->groupIndex); } + pOperator->completed = OP_RES_TO_RETURN; pQuery->order.order = order; // TODO : restore the order doCloseAllTimeWindow(pRuntimeEnv); setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); copyResToQueryResultBuf_rv(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { - pOperator->completed = true; + pOperator->completed = OP_EXEC_DONE; } return pIntervalInfo->pRes; @@ -6585,18 +6576,20 @@ static SSDataBlock* doSTableIntervalAgg(void* param) { static SSDataBlock* doHashGroupbyAgg(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; - if (pOperator->completed) { + if (pOperator->completed == OP_EXEC_DONE) { return NULL; } SHashGroupbyOperatorInfo *pInfo = pOperator->info; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - if (hasRemainData(&pRuntimeEnv->groupResInfo)) { + if (pOperator->completed == OP_RES_TO_RETURN) { toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); - if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { - pOperator->completed = true; + + if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { + pOperator->completed = OP_EXEC_DONE; } + return pInfo->binfo.pRes; } @@ -6611,6 +6604,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pRuntimeEnv->pQuery->order.order); + setTagVal_rv(pOperator, pRuntimeEnv->pQuery->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); if (pInfo->colIndex == -1) { pInfo->colIndex = getGroupbyColumnData_rv(pRuntimeEnv->pQuery->pGroupbyExpr, pBlock); } @@ -6618,6 +6612,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { hashGroupbyAgg(pRuntimeEnv, pOperator, pInfo, pBlock); } + pOperator->completed = OP_RES_TO_RETURN; closeAllResultRows(&pInfo->binfo.resultRowInfo); setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); @@ -6630,8 +6625,8 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->binfo.resultRowInfo, 0); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); - if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { - pOperator->completed = true; + if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { + pOperator->completed = OP_EXEC_DONE; } return pInfo->binfo.pRes; @@ -6639,7 +6634,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { static SSDataBlock* doFill(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; - if (pOperator->completed) { + if (pOperator->completed == OP_EXEC_DONE) { return NULL; } @@ -6654,18 +6649,24 @@ static SSDataBlock* doFill(void* param) { while(1) { SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream); if (pBlock == NULL) { + if (pInfo->totalInputRows == 0) { + pOperator->completed = OP_EXEC_DONE; + return NULL; + } + taosFillSetStartInfo(pRuntimeEnv->pFillInfo, 0, pRuntimeEnv->pQuery->window.ekey); } else { + pInfo->totalInputRows += pBlock->info.rows; + int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->pQuery->status, QUERY_COMPLETED)?pRuntimeEnv->pQuery->window.ekey:pBlock->info.window.ekey; + taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pBlock->info.rows, ekey); taosFillSetInputDataBlock(pRuntimeEnv->pFillInfo, pBlock); } doFillGapsInResults_rv(pRuntimeEnv, pInfo->pRes); - return pInfo->pRes; + return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; } - - return pInfo->pRes; } // todo set the attribute of query scan count @@ -6697,21 +6698,24 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); + pInfo->pRes = createOutputBuf(pExpr, numOfOutput); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); + initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); - pInfo->pRes = createOutputBuf(pExpr, numOfOutput); - setDefaultOutputBuf(pRuntimeEnv, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->pRes, pInfo->rowCellInfoOffset); + int64_t seed = rand(); + setDefaultOutputBuf(pRuntimeEnv, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->pRes, pInfo->rowCellInfoOffset, seed); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableAggregate"; pOperator->blockingOptr = true; - pOperator->completed = false; + pOperator->completed = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->upstream = upstream; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->seed = seed; // TODO refactor: seed to move to pInfo?? pOperator->exec = doAggregate; pOperator->cleanup = destroyBasicOperatorInfo; @@ -6758,7 +6762,7 @@ SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "STableAggregate"; pOperator->blockingOptr = true; - pOperator->completed = false; + pOperator->completed = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->upstream = upstream; pOperator->pExpr = pExpr; @@ -6774,22 +6778,26 @@ SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SArithOperatorInfo* pInfo = calloc(1, sizeof(SArithOperatorInfo)); + int64_t seed = rand(); pInfo->bufCapacity = 4096; - pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput); - pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + SOptrBasicInfo* pBInfo = &pInfo->binfo; - initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); - setDefaultOutputBuf(pRuntimeEnv, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.pRes, pInfo->binfo.rowCellInfoOffset); + pBInfo->pRes = createOutputBuf(pExpr, numOfOutput); + pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset); + + initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + setDefaultOutputBuf(pRuntimeEnv, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->pRes, pBInfo->rowCellInfoOffset, seed); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "ArithmeticOp"; pOperator->blockingOptr = false; - pOperator->completed = false; + pOperator->completed = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->upstream = upstream; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->seed = seed; pOperator->exec = doArithmeticOperation; pOperator->cleanup = destroyArithOperatorInfo; @@ -6805,7 +6813,7 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI pOperator->name = "LimitOp"; pOperator->blockingOptr = false; - pOperator->completed = false; + pOperator->completed = OP_IN_EXECUTING; pOperator->upstream = upstream; pOperator->exec = doLimit; pOperator->info = pInfo; @@ -6818,13 +6826,11 @@ SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator SOffsetOperatorInfo* pInfo = calloc(1, sizeof(SOffsetOperatorInfo)); pInfo->offset = pRuntimeEnv->pQuery->limit.offset; - pInfo->currentOffset = pInfo->offset; - SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "OffsetOp"; pOperator->blockingOptr = false; - pOperator->completed = false; + pOperator->completed = OP_IN_EXECUTING; pOperator->upstream = upstream; pOperator->exec = doOffset; pOperator->info = pInfo; @@ -6844,7 +6850,7 @@ SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe pOperator->name = "HashIntervalAgg"; pOperator->blockingOptr = true; - pOperator->completed = false; + pOperator->completed = OP_IN_EXECUTING; pOperator->upstream = upstream; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; @@ -6867,7 +6873,7 @@ SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, S SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "STableIntervalAggOp"; pOperator->blockingOptr = true; - pOperator->completed = false; + pOperator->completed = OP_IN_EXECUTING; pOperator->upstream = upstream; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; @@ -6891,7 +6897,7 @@ SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "HashGroupbyAgg"; pOperator->blockingOptr = true; - pOperator->completed = false; + pOperator->completed = OP_IN_EXECUTING; pOperator->upstream = upstream; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; @@ -6913,7 +6919,7 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn pOperator->name = "FillOp"; pOperator->blockingOptr = false; - pOperator->completed = false; + pOperator->completed = OP_IN_EXECUTING; pOperator->upstream = upstream; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; @@ -6928,6 +6934,9 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn static SSDataBlock* doTagScan(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; + if (pOperator->completed == OP_EXEC_DONE) { + return NULL; + } STagScanInfo *pTagScanInfo = pOperator->info; SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; @@ -6935,7 +6944,6 @@ static SSDataBlock* doTagScan(void* param) { size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); assert(numOfGroup == 0 || numOfGroup == 1); - if (numOfGroup == 0) { return NULL; } @@ -6945,10 +6953,6 @@ static SSDataBlock* doTagScan(void* param) { size_t num = taosArrayGetSize(pa); assert(num == pRuntimeEnv->tableqinfoGroupInfo.numOfTables); - if (pTagScanInfo->pRes == NULL) { - pTagScanInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); - } - int32_t count = 0; // int32_t functionId = pOperator->pExpr[0].base.functionId; /*if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id @@ -7012,17 +7016,12 @@ static SSDataBlock* doTagScan(void* param) { qDebug("QInfo:%p create count(tbname) query, res:%d rows:1", pRuntimeEnv->qinfo, count); } else*/ { // return only the tags|table name etc. count = 0; - SSchema* tbnameSchema = tGetTbnameColumnSchema(); - int32_t maxNumOfTables = (int32_t)pQuery->rec.capacity; -// if (pQuery->limit.limit >= 0 && pQuery->limit.limit < pQuery->rec.capacity) { -// maxNumOfTables = (int32_t)pQuery->limit.limit; -// } + SExprInfo* pExprInfo = pOperator->pExpr; while(pRuntimeEnv->tableIndex < num && count < maxNumOfTables) { int32_t i = pRuntimeEnv->tableIndex++; - SExprInfo* pExprInfo = pOperator->pExpr; STableQueryInfo* item = taosArrayGetP(pa, i); char *data = NULL, *dst = NULL; @@ -7034,23 +7033,19 @@ static SSDataBlock* doTagScan(void* param) { } SColumnInfoData* pColInfo = taosArrayGet(pTagScanInfo->pRes->pDataBlock, j); + type = pExprInfo[j].type; + bytes = pExprInfo[j].bytes; if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { - bytes = tbnameSchema->bytes; - type = tbnameSchema->type; - data = tsdbGetTableName(item->pTable); - dst = pColInfo->pData + count * tbnameSchema->bytes; } else { - type = pExprInfo[j].type; - bytes = pExprInfo[j].bytes; - data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.colInfo.colId, type, bytes); - dst = pColInfo->pData + count * pExprInfo[j].bytes; } + dst = pColInfo->pData + count * pExprInfo[j].bytes; doSetTagValueToResultBuf(dst, data, type, bytes); } + count += 1; } @@ -7058,20 +7053,21 @@ static SSDataBlock* doTagScan(void* param) { qDebug("QInfo:%p create tag values results completed, rows:%d", pRuntimeEnv->qinfo, count); } - return pTagScanInfo->pRes; + return (pTagScanInfo->pRes->info.rows == 0)? NULL:pTagScanInfo->pRes; } -SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv) { +SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) { STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo)); + pInfo->pRes = createOutputBuf(pExpr, numOfOutput); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "SeqTagScanOp"; pOperator->blockingOptr = false; - pOperator->completed = false; + pOperator->completed = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->exec = doTagScan; - pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1; - pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput; + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfOutput; pOperator->pRuntimeEnv = pRuntimeEnv; return pOperator; @@ -8503,7 +8499,7 @@ void buildTagQueryResult(SQInfo* pQInfo) { } pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); - pQuery->rec.rows = pRuntimeEnv->outputBuf->info.rows; + pQuery->rec.rows = (pRuntimeEnv->outputBuf != NULL)? pRuntimeEnv->outputBuf->info.rows:0; return; SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0); diff --git a/src/query/src/qFill.c b/src/query/src/qFill.c index a3c69c7707a944dfcac5de0f578f26b26419fe65..52071bbae7512a5ae8544e989922ecf48378fac4 100644 --- a/src/query/src/qFill.c +++ b/src/query/src/qFill.c @@ -376,11 +376,12 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) { tfree(pFillInfo->prevValues); tfree(pFillInfo->nextValues); - tfree(pFillInfo->pTags); -// for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { -// tfree(pFillInfo->pData[i]); -// } + for(int32_t i = 0; i < pFillInfo->numOfTags; ++i) { + tfree(pFillInfo->pTags[i].tagVal); + } + + tfree(pFillInfo->pTags); tfree(pFillInfo->pData); tfree(pFillInfo->pFillCol); @@ -435,12 +436,16 @@ void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage* SFillColInfo* pCol = &pFillInfo->pFillCol[i]; const char* data = pInput->data + pCol->col.offset * pInput->num; - memcpy(pFillInfo->pData[i], data, (size_t)(pInput->num * pCol->col.bytes)); + if (pFillInfo->pData[i] == NULL) { + pFillInfo->pData[i] = calloc(4096, pCol->col.bytes); + } + memcpy(pFillInfo->pData[i], data, pCol->col.bytes * pInput->num); +// pFillInfo->pData[i] = (char*) data; if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex]; assert (pTag->col.colId == pCol->col.colId); - memcpy(pTag->tagVal, data, pCol->col.bytes); + memcpy(pTag->tagVal, data, pCol->col.bytes); // TODO not memcpy?? } } } diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 335d4d57504368f5256302fca330a50130f5263a..dd9d5d1958ff05b7be71b1724a773c80cd0643b2 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -355,7 +355,7 @@ void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo, assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); } -bool hasRemainData(SGroupResInfo* pGroupResInfo) { +bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo) { if (pGroupResInfo->pRows == NULL) { return false; } @@ -363,6 +363,14 @@ bool hasRemainData(SGroupResInfo* pGroupResInfo) { return pGroupResInfo->index < taosArrayGetSize(pGroupResInfo->pRows); } +bool hasRemainData(SGroupResInfo* pGroupResInfo) { + if (hasRemainDataInCurrentGroup(pGroupResInfo)) { + return true; + } + + return pGroupResInfo->currentGroup < pGroupResInfo->totalGroup; +} + bool incNextGroup(SGroupResInfo* pGroupResInfo) { return (++pGroupResInfo->currentGroup) < pGroupResInfo->totalGroup; } diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index 02e7fb9a8cf3390c75ab6c6a941fbb977e6cefc3..38c2ff83d5a06a73c5cf406d8548eaa8ffe33995 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -326,7 +326,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co (*pRsp)->numOfRows = htonl((int32_t)pQuery->rec.rows); if (pQInfo->code == TSDB_CODE_SUCCESS) { - (*pRsp)->offset = htobe64(pQuery->limit.offset); + (*pRsp)->offset = htobe64(pQInfo->runtimeEnv.currentOffset); (*pRsp)->useconds = htobe64(pQInfo->summary.elapsedTime); } else { (*pRsp)->offset = 0; diff --git a/tests/script/general/parser/limit1_stb.sim b/tests/script/general/parser/limit1_stb.sim index a0e3a45d2fc28002e2e64b195191b72659849a53..513e2fac026c0d1d617ba4126d22c01f82527ca0 100644 --- a/tests/script/general/parser/limit1_stb.sim +++ b/tests/script/general/parser/limit1_stb.sim @@ -538,6 +538,7 @@ $offset = $offset + 1 sql select max(c1), min(c2), avg(c3), count(c4), sum(c5), spread(c6), first(c7), last(c8), first(c9) from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 8 interval(5m) limit $offset offset $offset $val = $rowNum - $offset if $rows != $val then + print expect $val, actual:$rows return -1 endi if $data00 != @18-10-22 02:30:00.000@ then