diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index a937e68b72ce6f189012d44acebe748708f667de..d7d2f6d6782619833ff4f448ef5c3d09f03369bb 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -240,6 +240,7 @@ typedef struct SQuery { typedef struct SQueryRuntimeEnv { jmp_buf env; SQuery* pQuery; + void* qinfo; SQLFunctionCtx* pCtx; int32_t numOfRowsPerPage; @@ -267,14 +268,18 @@ typedef struct SQueryRuntimeEnv { SArithmeticSupport *sasArray; struct STableScanInfo* pi; - SSDataBlock *ouptputBuf; + SSDataBlock *outputBuf; int32_t groupIndex; int32_t tableIndex; STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure - } SQueryRuntimeEnv; +typedef struct { + char* name; + void* info; +} SQEStage; + enum { QUERY_RESULT_NOT_READY = 1, QUERY_RESULT_READY = 2, @@ -325,7 +330,7 @@ typedef struct SQueryParam { } SQueryParam; typedef struct STableScanInfo { - SQInfo* pQInfo; + SQueryRuntimeEnv* pRuntimeEnv; void *pQueryHandle; int32_t numOfBlocks; int32_t numOfSkipped; @@ -341,7 +346,7 @@ typedef struct STableScanInfo { SSDataBlock block; int64_t elapsedTime; - SSDataBlock* (*apply)(void* param); + SSDataBlock* (*exec)(void* param); } STableScanInfo; typedef struct SAggOperatorInfo { diff --git a/src/query/inc/qSqlparser.h b/src/query/inc/qSqlparser.h index 77647813d688183edc343c349969c15c26edfb4d..70b6fbe0ba1e9b8b84b5eed6ddbb786c74e14269 100644 --- a/src/query/inc/qSqlparser.h +++ b/src/query/inc/qSqlparser.h @@ -45,7 +45,7 @@ typedef struct SLimitVal { typedef struct SOrderVal { uint32_t order; - int32_t orderColId; + int32_t orderColId; } SOrderVal; typedef struct tVariantListItem { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 79ddb857390f0948717f56bc8668e5072047c46e..af9daeac394fa9d0bb8bb45e47e387a656d87c0c 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -159,8 +159,9 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo, SDataStatis *pStatis, SExprInfo* pExprInfo); +static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, TSKEY *tsCol, SExprInfo* pExprInfo); -static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); + static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo); static void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static bool hasMainOutput(SQuery *pQuery); @@ -172,8 +173,8 @@ static void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArr static STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win); static STableIdInfo createTableIdInfo(SQuery* pQuery); -static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQInfo* pQInfo, int32_t repeatTime, int32_t reverseTime); -static STableScanInfo* createTableScanInfo(void* pTsdbQueryHandle, SQInfo* pQInfo, int32_t repeatTime); +static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); +static STableScanInfo* createTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime); static int32_t getNumOfScanTimes(SQuery* pQuery); static SSDataBlock* createOutputBuf(SQuery* pQuery) { @@ -975,6 +976,7 @@ static void* getDataBlockImpl(SArray* pDataBlock, int32_t colId) { return NULL; } + // todo refactor static char *getDataBlock(SQuery* pQuery, SArithmeticSupport *sas, int32_t col, int32_t size, SArray *pDataBlock) { if (pDataBlock == NULL) { @@ -1164,24 +1166,40 @@ static void doWindowBorderInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc } } -static void aggApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, - SArray *pDataBlock) { + +static void setInputSDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pSDataBlock) { + SQuery* pQuery = pRuntimeEnv->pQuery; + + if (pRuntimeEnv->pCtx[0].pInput == NULL && pSDataBlock->pDataBlock != NULL) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { + SColIndex *pCol = &pQuery->pExpr1[i].base.colInfo; + if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) { + SColIndex* pColIndex = &pQuery->pExpr1[i].base.colInfo; + SColumnInfoData *p = taosArrayGet(pSDataBlock->pDataBlock, pColIndex->colIndex); + assert(p->info.colId == pColIndex->colId); + + pRuntimeEnv->pCtx[i].pInput = p->pData; + } + } + } +} + +static void aggApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pSDataBlock) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQuery * pQuery = pRuntimeEnv->pQuery; TSKEY *tsCols = NULL; - if (pDataBlock != NULL) { - SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, 0); + if (pSDataBlock->pDataBlock != NULL) { + SColumnInfoData *pColInfo = taosArrayGet(pSDataBlock->pDataBlock, 0); tsCols = (TSKEY *)(pColInfo->pData); } for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - char *dataBlock = getDataBlock(pQuery, &pRuntimeEnv->sasArray[k], k, pDataBlockInfo->rows, pDataBlock); - setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &pQuery->pExpr1[k]); + setBlockStatisInfo(&pCtx[k], pSDataBlock, tsCols, &pQuery->pExpr1[k]); } - + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - int32_t functionId = pQuery->pExpr1[k].base.functionId; + int32_t functionId = pCtx[k].functionId; if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { pCtx[k].startTs = pQuery->window.skey; aAggs[functionId].xFunction(&pCtx[k]); @@ -1817,6 +1835,37 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl return numOfRes; } + +void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, TSKEY *tsCol, SExprInfo* pExprInfo) { + SDataStatis *tpField = NULL; + pCtx->hasNull = hasNullValue(&pExprInfo->base.colInfo, pSDataBlock->pBlockStatis, &tpField); + + if (tpField != NULL) { + pCtx->preAggVals.isSet = true; + pCtx->preAggVals.statis = *tpField; + assert(pCtx->preAggVals.statis.numOfNull <= pSDataBlock->info.rows); + } else { + pCtx->preAggVals.isSet = false; + } + + pCtx->preAggVals.dataBlockLoaded = (pSDataBlock->pDataBlock != NULL); + + // limit/offset query will affect this value + pCtx->size = pSDataBlock->info.rows; + + uint32_t status = aAggs[pCtx->functionId].status; + if (((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) && (tsCol != NULL)) { + pCtx->ptsList = tsCol; + } + + // set the statistics data for primary time stamp column +// if (pCtx->functionId == TSDB_FUNC_SPREAD &&colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { +// pCtx->preAggVals.isSet = true; +// pCtx->preAggVals.statis.min = pBlockInfo->window.skey; +// pCtx->preAggVals.statis.max = pBlockInfo->window.ekey; +// } +} + void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo, SDataStatis *pStatis, SExprInfo* pExprInfo) { @@ -3248,9 +3297,8 @@ void disableFuncInReverseScan(SQueryRuntimeEnv* pRuntimeEnv) { } } -static void setupQueryRangeForReverseScan(SQInfo* pQInfo) { - SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery* pQuery = pQInfo->runtimeEnv.pQuery; +static void setupQueryRangeForReverseScan(SQueryRuntimeEnv* pRuntimeEnv) { + SQuery* pQuery = pRuntimeEnv->pQuery; int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv)); for(int32_t i = 0; i < numOfGroups; ++i) { @@ -3317,6 +3365,37 @@ void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { initCtxOutputBuf(pRuntimeEnv); } +void resetDefaultResInfoOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv) { + SQuery *pQuery = pRuntimeEnv->pQuery; + + int32_t tid = 0; + int64_t uid = 0; + SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->resultRowInfo, (char *)&tid, sizeof(tid), true, uid); + + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { + SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; + SColumnInfoData* pData = taosArrayGet(pRuntimeEnv->outputBuf->pDataBlock, i); + + /* + * set the output buffer information and intermediate buffer + * not all queries require the interResultBuf, such as COUNT/TAGPRJ/PRJ/TAG etc. + */ + SResultRowCellInfo* pCellInfo = getResultCell(pRuntimeEnv, pRow, i); + RESET_RESULT_INFO(pCellInfo); + + pCtx->resultInfo = pCellInfo; + pCtx->pOutput = pData->pData; + + // set the timestamp output buffer for top/bottom/diff query + int32_t functionId = pCtx->functionId; + if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { + pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].pOutput; + } + } + + initCtxOutputBuf(pRuntimeEnv); +} + void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) { SQuery *pQuery = pRuntimeEnv->pQuery; @@ -3349,7 +3428,7 @@ void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - int32_t functionId = pQuery->pExpr1[j].base.functionId; + int32_t functionId = pRuntimeEnv->pCtx[j].functionId; pRuntimeEnv->pCtx[j].currentStage = 0; SResultRowCellInfo* pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]); @@ -3412,6 +3491,38 @@ void setQueryStatus(SQuery *pQuery, int8_t status) { } } +void prepareRepeatTableScan(SQueryRuntimeEnv* pRuntimeEnv) { + SQuery *pQuery = pRuntimeEnv->pQuery; + + if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) { + // for each group result, call the finalize function for each column + SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo; + + for (int32_t i = 0; i < pWindowResInfo->size; ++i) { + SResultRow *pResult = getResultRow(pWindowResInfo, i); + + setResultOutputBuf(pRuntimeEnv, pResult); + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { + SQLFunctionCtx* pCtx = &pRuntimeEnv->pCtx[j]; + if (pCtx->functionId == TSDB_FUNC_TS) { // ignore more table + continue; + } + + aAggs[pCtx->functionId].xNextStep(pCtx); + } + } + } else { + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { + SQLFunctionCtx* pCtx = &pRuntimeEnv->pCtx[j]; + if (pCtx->functionId == TSDB_FUNC_TS) { + continue; + } + + aAggs[pCtx->functionId].xNextStep(pCtx); + } + } +} + bool needRepeatScan(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; @@ -3497,7 +3608,7 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI setQueryStatus(pQuery, QUERY_NOT_COMPLETED); switchCtxOrder(pRuntimeEnv); disableFuncInReverseScan(pRuntimeEnv); - setupQueryRangeForReverseScan(pQInfo); + setupQueryRangeForReverseScan(pRuntimeEnv); // clean unused handle if (pRuntimeEnv->pSecQueryHandle != NULL) { @@ -3511,7 +3622,6 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI } static void setEnvBeforeReverseScan_rv(SQueryRuntimeEnv *pRuntimeEnv) { - SQInfo *pQInfo = GET_QINFO_ADDR(pRuntimeEnv); SQuery *pQuery = pRuntimeEnv->pQuery; if (pRuntimeEnv->pTsBuf) { @@ -3524,17 +3634,11 @@ static void setEnvBeforeReverseScan_rv(SQueryRuntimeEnv *pRuntimeEnv) { SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); SWITCH_ORDER(pQuery->order.order); - if (QUERY_IS_ASC_QUERY(pQuery)) { - assert(pQuery->window.skey <= pQuery->window.ekey); - } else { - assert(pQuery->window.skey >= pQuery->window.ekey); - } - SET_REVERSE_SCAN_FLAG(pRuntimeEnv); setQueryStatus(pQuery, QUERY_NOT_COMPLETED); switchCtxOrder(pRuntimeEnv); disableFuncInReverseScan(pRuntimeEnv); - setupQueryRangeForReverseScan(pQInfo); + setupQueryRangeForReverseScan(pRuntimeEnv); } static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusInfo *pStatus) { @@ -4188,14 +4292,16 @@ static int16_t getNumOfFinalResCol(SQuery* pQuery) { } static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) { - SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + SQuery *pQuery = pRuntimeEnv->pQuery; if (pQuery->pExpr2 == NULL) { - for (int32_t col = 0; col < pQuery->numOfOutput; ++col) { - int32_t bytes = pQuery->pExpr1[col].bytes; + SSDataBlock* pRes = pRuntimeEnv->outputBuf; - memmove(data, pQuery->sdata[col]->data, bytes * numOfRows); - data += bytes * numOfRows; + for (int32_t col = 0; col < pQuery->numOfOutput; ++col) { + SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, col); + memmove(data, pColRes->pData, pColRes->info.bytes * pRes->info.rows); + data += pColRes->info.bytes * pRes->info.rows; } } else { for (int32_t col = 0; col < pQuery->numOfExpr2; ++col) { @@ -4720,9 +4826,9 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts pQuery->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr); if (needReverseScan(pQuery)) { - pRuntimeEnv->pi = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pQInfo, getNumOfScanTimes(pQuery), 1); + pRuntimeEnv->pi = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1); } else { - pRuntimeEnv->pi = createTableScanInfo(pRuntimeEnv->pQueryHandle, pQInfo, getNumOfScanTimes(pQuery)); + pRuntimeEnv->pi = createTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery)); } if (pTsBuf != NULL) { @@ -4735,7 +4841,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize); int32_t TENMB = 1024*1024*10; - if (isSTableQuery && !onlyQueryTags(pRuntimeEnv->pQuery)) { + if (isSTableQuery && !onlyQueryTags(pQuery)) { code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TENMB, pQInfo); if (code != TSDB_CODE_SUCCESS) { return code; @@ -5495,7 +5601,7 @@ static int32_t doSaveContext(SQInfo *pQInfo) { setQueryStatus(pQuery, QUERY_NOT_COMPLETED); switchCtxOrder(pRuntimeEnv); disableFuncInReverseScan(pRuntimeEnv); - setupQueryRangeForReverseScan(pQInfo); + setupQueryRangeForReverseScan(pRuntimeEnv); pRuntimeEnv->prevGroupId = INT32_MIN; pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo, pQInfo, &pQuery->memRef); @@ -5677,20 +5783,26 @@ static SSDataBlock* doScanTableImpl(STableScanInfo *pTableScanInfo) { pTableScanInfo->numOfBlocks += 1; // todo check for query cancel - tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info); - SDataStatis *pStatis = pBlock->pBlockStatis; - // this function never returns error? - tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pStatis); - pTableScanInfo->numOfBlockStatis += 1; + uint32_t status; + int32_t code = loadDataBlockOnDemand(pTableScanInfo->pRuntimeEnv, NULL, pTableScanInfo->pQueryHandle, &pBlock->info, &pBlock->pBlockStatis, + &pBlock->pDataBlock, &status); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTableScanInfo->pRuntimeEnv->env, code); + } - if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block - pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL); - pTableScanInfo->numOfRows += pBlock->info.rows; + // current block is ignored according to filter result by block statistics data, continue load the next block + if (status == BLK_DATA_DISCARD) { + continue; } +// if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block +// pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL); +// pTableScanInfo->numOfRows += pBlock->info.rows; +// } + return pBlock; } @@ -5699,7 +5811,7 @@ static SSDataBlock* doScanTableImpl(STableScanInfo *pTableScanInfo) { static SSDataBlock* doTableScan(void* param) { STableScanInfo * pTableScanInfo = (STableScanInfo *)param; - SQueryRuntimeEnv *pRuntimeEnv = &pTableScanInfo->pQInfo->runtimeEnv; + SQueryRuntimeEnv *pRuntimeEnv = pTableScanInfo->pRuntimeEnv; SQuery* pQuery = pRuntimeEnv->pQuery; while (pTableScanInfo->current < pTableScanInfo->times) { @@ -5716,8 +5828,7 @@ static SSDataBlock* doTableScan(void* param) { tsdbCleanupQueryHandle(pTableScanInfo->pQueryHandle); STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); pTableScanInfo->pQueryHandle = - tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo, - pTableScanInfo->pQInfo, &pQuery->memRef); + tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo, pRuntimeEnv->qinfo, &pQuery->memRef); if (pTableScanInfo->pQueryHandle == NULL) { longjmp(pRuntimeEnv->env, terrno); } @@ -5732,7 +5843,7 @@ static SSDataBlock* doTableScan(void* param) { } qDebug("QInfo:%p start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, - pTableScanInfo->pQInfo, cond.twindow.skey, cond.twindow.ekey); + pRuntimeEnv->qinfo, cond.twindow.skey, cond.twindow.ekey); } if (pTableScanInfo->reverseTimes > 0) { @@ -5742,11 +5853,10 @@ static SSDataBlock* doTableScan(void* param) { STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window); pTableScanInfo->pQueryHandle = - tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo, - pTableScanInfo->pQInfo, &pTableScanInfo->pQInfo->query.memRef); + tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo, pRuntimeEnv->qinfo, &pQuery->memRef); qDebug("QInfo:%p start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, - pTableScanInfo->pQInfo, cond.twindow.skey, cond.twindow.ekey); + pRuntimeEnv->qinfo, cond.twindow.skey, cond.twindow.ekey); pTableScanInfo->times = 1; pTableScanInfo->current = 0; @@ -5760,39 +5870,39 @@ static SSDataBlock* doTableScan(void* param) { return NULL; } -static UNUSED_FUNC STableScanInfo* createTableScanInfo(void* pTsdbQueryHandle, SQInfo* pQInfo, int32_t repeatTime) { +static UNUSED_FUNC STableScanInfo* createTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime) { assert(repeatTime > 0); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); pInfo->pQueryHandle = pTsdbQueryHandle; - pInfo->apply = doTableScan; + pInfo->exec = doTableScan; pInfo->times = repeatTime; pInfo->reverseTimes = 0; pInfo->current = 0; - pInfo->pQInfo = pQInfo; + pInfo->pRuntimeEnv = pRuntimeEnv; return pInfo; } -static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQInfo* pQInfo, int32_t repeatTime, int32_t reverseTime) { +static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) { assert(repeatTime > 0); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); pInfo->pQueryHandle = pTsdbQueryHandle; - pInfo->apply = doTableScan; + pInfo->exec = doTableScan; pInfo->times = repeatTime; pInfo->reverseTimes = reverseTime; pInfo->current = 0; - pInfo->pQInfo = pQInfo; + pInfo->pRuntimeEnv = pRuntimeEnv; return pInfo; } -static UNUSED_FUNC int32_t getTableScanTime(STableScanInfo* pTableScanInfo) { +static UNUSED_FUNC int32_t getTableScanId(STableScanInfo* pTableScanInfo) { return pTableScanInfo->current; } -static UNUSED_FUNC int32_t getScanOrder(STableScanInfo* pTableScanInfo) { +static UNUSED_FUNC int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { return pTableScanInfo->order; } @@ -5800,34 +5910,40 @@ static UNUSED_FUNC int32_t getScanOrder(STableScanInfo* pTableScanInfo) { static SSDataBlock* doAggOperator(void* param) { SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param; SQueryRuntimeEnv* pRuntimeEnv = pInfo->pRuntimeEnv; + STableScanInfo* pTableScanInfo = pInfo->pTableScanInfo; int32_t countId = 0; - int32_t order = getScanOrder(pInfo->pTableScanInfo); + int32_t order = getTableScanOrder(pInfo->pTableScanInfo); + + resetDefaultResInfoOutputBuf_rv(pInfo->pRuntimeEnv); + pRuntimeEnv->pQuery->pos = 0; while(1) { - SSDataBlock* pBlock = pInfo->pTableScanInfo->apply(pInfo->pTableScanInfo); + SSDataBlock* pBlock = pTableScanInfo->exec(pTableScanInfo); if (pBlock == NULL) { break; } - if (countId != getTableScanTime(pInfo->pTableScanInfo)) { - needRepeatScan(pRuntimeEnv); - countId = getTableScanTime(pInfo->pTableScanInfo); + if (countId != getTableScanId(pTableScanInfo)) { + prepareRepeatTableScan(pRuntimeEnv); + countId = getTableScanId(pTableScanInfo); } - if (order != getScanOrder(pInfo->pTableScanInfo)) { - setEnvBeforeReverseScan_rv(pRuntimeEnv); - order = getScanOrder(pInfo->pTableScanInfo); + // the order has changed + if (order != getTableScanOrder(pTableScanInfo)) { + order = getTableScanOrder(pTableScanInfo); } - aggApplyFunctions(pRuntimeEnv, pBlock->pBlockStatis, &pBlock->info, pBlock->pDataBlock); + // the pDataBlock are alway the same one, no need to call this again + setInputSDataBlock(pRuntimeEnv, pBlock); + aggApplyFunctions(pRuntimeEnv, pBlock); } setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); finalizeQueryResult(pRuntimeEnv); - pRuntimeEnv->ouptputBuf->info.rows = getNumOfResult(pRuntimeEnv); - return pRuntimeEnv->ouptputBuf; + pRuntimeEnv->outputBuf->info.rows = getNumOfResult(pRuntimeEnv); + return pRuntimeEnv->outputBuf; } // todo set the attribute of query scan count @@ -5877,8 +5993,8 @@ static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) // doSecondaryArithmeticProcess(pQuery); // TODO limit/offset refactor to be one operator - skipResults(pRuntimeEnv); - limitOperator(pQuery, pQInfo); +// skipResults(pRuntimeEnv); +// limitOperator(pQuery, pQInfo); } static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { @@ -6891,7 +7007,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr } doUpdateExprColumnIndex(pQuery); - pQInfo->runtimeEnv.ouptputBuf = createOutputBuf(pQuery); + pQInfo->runtimeEnv.outputBuf = createOutputBuf(pQuery); int32_t ret = createFilterInfo(pQInfo, pQuery); if (ret != TSDB_CODE_SUCCESS) {