From b9867fab6a99f278f9d1ab6e918d21bbed83e9a8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 25 Feb 2021 10:48:34 +0800 Subject: [PATCH] [td-2895] refactor. --- src/query/inc/qExecutor.h | 4 +- src/query/src/qAggMain.c | 16 +++++++- src/query/src/qExecutor.c | 77 +++++++++++++++++++++------------------ 3 files changed, 59 insertions(+), 38 deletions(-) diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 07ae0b27f8..5e47f399bb 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -356,9 +356,11 @@ typedef struct STableScanInfo { SQLFunctionCtx *pCtx; // next operator query context SResultRowInfo *pResultRowInfo; + int32_t *rowCellInfoOffset; + SExprInfo *pExpr; int32_t numOfOutput; - int32_t *rowCellInfoOffset; + int64_t elapsedTime; } STableScanInfo; diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index b191c33048..2b69eb57a8 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -1352,7 +1352,21 @@ static void min_function_f(SQLFunctionCtx *pCtx, int32_t index) { } static void stddev_function(SQLFunctionCtx *pCtx) { - SStddevInfo *pStd = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + SStddevInfo *pStd = GET_ROWCELL_INTERBUF(pResInfo); + + if (pCtx->currentStage == REPEAT_SCAN && pStd->stage == 0) { + pStd->stage++; + avg_finalizer(pCtx); + + pResInfo->initialized = true; // set it initialized to avoid re-initialization + + // save average value into tmpBuf, for second stage scan + SAvgInfo *pAvg = GET_ROWCELL_INTERBUF(pResInfo); + + pStd->avg = GET_DOUBLE_VAL(pCtx->pOutput); + assert((isnan(pAvg->sum) && pAvg->num == 0) || (pStd->num == pAvg->num && pStd->avg == pAvg->sum)); + } if (pStd->stage == 0) { // the first stage is to calculate average value diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 37a4bbc274..df753d6f18 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1126,6 +1126,7 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SQLFunctionCtx* pC for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { pCtx[i].size = pBlock->info.rows; pCtx[i].order = order; + pCtx[i].currentStage = pOperator->pRuntimeEnv->scanFlag; setBlockStatisInfo(&pCtx[i], pBlock, &pOperator->pExpr[i].base.colInfo); } @@ -1152,6 +1153,7 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { pCtx[i].size = pBlock->info.rows; pCtx[i].order = order; + pCtx[i].currentStage = pOperator->pRuntimeEnv->scanFlag; setBlockStatisInfo(&pCtx[i], pBlock, &pOperator->pExpr[i].base.colInfo); @@ -1181,7 +1183,7 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { - setBlockStatisInfo(&pCtx[k], pSDataBlock, &pOperator->pExpr[k].base.colInfo); +// setBlockStatisInfo(&pCtx[k], pSDataBlock, &pOperator->pExpr[k].base.colInfo); int32_t functionId = pCtx[k].functionId; if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { @@ -1925,15 +1927,13 @@ void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColInde pCtx->hasNull = hasNullRv(pColIndex, pStatis); - // limit/offset query will affect this value - pCtx->size = pSDataBlock->info.rows; #if 0 // set the statistics data for primary time stamp column -// if (pCtx->functionId == TSDB_FUNC_SPREAD &&colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { -// pCtx->preAggVals.isSet = true; -// pCtx->preAggVals.statis.min = pBlockInfo->window.skey; -// pCtx->preAggVals.statis.max = pBlockInfo->window.ekey; -// } + if (pCtx->functionId == TSDB_FUNC_SPREAD &&colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + pCtx->preAggVals.isSet = true; + pCtx->preAggVals.statis.min = pBlockInfo->window.skey; + pCtx->preAggVals.statis.max = pBlockInfo->window.ekey; + } #endif } @@ -2222,7 +2222,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } } - if (!pQuery->stableQuery) { // TODO this problem should be handed at the client side + if (!pQuery->stableQuery || isProjQuery(pQuery)) { // TODO this problem should be handed at the client side if (pQuery->limit.offset > 0) { pRuntimeEnv->proot = createOffsetOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); } @@ -2654,10 +2654,10 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i #define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR) -static bool doDataBlockStaticFilter(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx, int32_t numOfRows) { +static bool doFilterOnBlockStatistics(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx, int32_t numOfRows) { SQuery* pQuery = pRuntimeEnv->pQuery; - if (pDataStatis == NULL || (pQuery->numOfFilterCols == 0 && (!pQuery->topBotQuery))) { + if (pDataStatis == NULL || pQuery->numOfFilterCols == 0) { return true; } @@ -2783,7 +2783,7 @@ void filterDataBlock_rv(SSingleColumnFilterInfo *pFilterInfo, int32_t numOfFilte qualified = false; for (int32_t j = 0; j < pFilterInfo[k].numOfFilters; ++j) { - SColumnFilterElem *pFilterElem = &pFilterInfo->pFilters[j]; + SColumnFilterElem *pFilterElem = &pFilterInfo[k].pFilters[j]; bool isnull = isNull(pElem, pFilterInfo[k].info.type); if (isnull) { @@ -2858,12 +2858,12 @@ void filterDataBlock_rv(SSingleColumnFilterInfo *pFilterInfo, int32_t numOfFilte pBlock->pBlockStatis = NULL; // clean the block statistics info if (start > 0) { - SColumnInfoData *pColumnInfoData = taosArrayGet(pBlock->pDataBlock, 0); + SColumnInfoData* pColumnInfoData = taosArrayGet(pBlock->pDataBlock, 0); assert(pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP && pColumnInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX); - pBlock->info.window.skey = *(int64_t *)pColumnInfoData->pData; - pBlock->info.window.ekey = *(int64_t *)(pColumnInfoData->pData + TSDB_KEYSIZE * (start - 1)); + pBlock->info.window.skey = *(int64_t*)pColumnInfoData->pData; + pBlock->info.window.ekey = *(int64_t*)(pColumnInfoData->pData + TSDB_KEYSIZE * (start - 1)); } } @@ -2905,11 +2905,12 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo* } } - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - SSqlFuncMsg* pSqlFunc = &pQuery->pExpr1[i].base; + int32_t numOfOutput = pTableScanInfo->numOfOutput; + SQLFunctionCtx* pCtx = pTableScanInfo->pCtx; - int32_t functionId = pSqlFunc->functionId; - int32_t colId = pSqlFunc->colInfo.colId; + for (int32_t i = 0; i < numOfOutput; ++i) { + int32_t functionId = pCtx[i].functionId; + int32_t colId = pTableScanInfo->pExpr[i].base.colInfo.colId; // group by + first/last should not apply the first/last block filter if (!pQuery->groupbyColumn && (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_LAST_DST)) { @@ -2968,7 +2969,7 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo* } // current block has been discard due to filter applied - if (!doDataBlockStaticFilter(pRuntimeEnv, pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) { + if (!doFilterOnBlockStatistics(pRuntimeEnv, pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) { pCost->discardBlocks += 1; qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", pQInfo, pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); (*status) = BLK_DATA_DISCARD; @@ -2983,6 +2984,7 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo* } if (pQuery->numOfFilterCols > 0) { + // set the initial static data value filter expression if (pQuery->pFilterInfo[0].pData == NULL) { for(int32_t i = 0; i < pQuery->numOfFilterCols; ++i) { for(int32_t j = 0; j < pBlock->info.numOfCols; ++j) { @@ -3547,7 +3549,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SR assert(pCtx[i].pOutput != NULL); // set the timestamp output buffer for top/bottom/diff query - int32_t functionId = pCtx->functionId; + int32_t functionId = pCtx[i].functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { pCtx[i].ptsOutputBuf = pCtx[0].pOutput; } @@ -4198,7 +4200,6 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe } pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv->pQuery, bufPage, pResult->offset, offset); - pCtx[i].currentStage = 0; offset += pCtx[i].outputBytes; int32_t functionId = pCtx[i].functionId; @@ -5975,8 +5976,6 @@ static void doCloseAllTimeWindow(SQueryRuntimeEnv* pRuntimeEnv) { // } } - - #if 0 static UNUSED_FUNC void multiTableQueryProcess(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; @@ -6062,6 +6061,7 @@ static char *getArithemicInputSrc(void *param, const char *name, int32_t colId) static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) { SSDataBlock *pBlock = &pTableScanInfo->block; SQuery* pQuery = pTableScanInfo->pRuntimeEnv->pQuery; + STableGroupInfo* pTableGroupInfo = &pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo; while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) { pTableScanInfo->numOfBlocks += 1; @@ -6069,18 +6069,14 @@ static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) { // todo check for query cancel tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info); - if (pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo.numOfTables > 1 || - (pQuery->current == NULL && pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1)) { - STableQueryInfo **pTableQueryInfo = (STableQueryInfo **)taosHashGet( - pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo.map, &pBlock->info.tid, sizeof(pBlock->info.tid)); + if (pTableGroupInfo->numOfTables > 1 || (pQuery->current == NULL && pTableGroupInfo->numOfTables == 1)) { + STableQueryInfo **pTableQueryInfo = (STableQueryInfo **)taosHashGet( pTableGroupInfo->map, &pBlock->info.tid, sizeof(pBlock->info.tid)); if (pTableQueryInfo == NULL) { break; } pQuery->current = *pTableQueryInfo; doTableQueryInfoTimeWindowCheck(pQuery, *pTableQueryInfo); - } else if (pTableScanInfo->pRuntimeEnv->pQuery->current == NULL) { - } // this function never returns error? @@ -6091,7 +6087,7 @@ static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) { } // current block is ignored according to filter result by block statistics data, continue load the next block - if (status == BLK_DATA_DISCARD) { + if (status == BLK_DATA_DISCARD || pBlock->info.rows == 0) { continue; } @@ -6147,6 +6143,8 @@ static SSDataBlock* doTableScan(void* param) { qDebug("QInfo:%p start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, pRuntimeEnv->qinfo, cond.twindow.skey, cond.twindow.ekey); + pRuntimeEnv->scanFlag = REVERSE_SCAN; + pTableScanInfo->times = 1; pTableScanInfo->current = 0; pTableScanInfo->reverseTimes = 0; @@ -6187,6 +6185,9 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream) { assert(pTableScanInfo != NULL && pDownstream != NULL); + pTableScanInfo->pExpr = pDownstream->pExpr; // TODO refactor to use colId instead of pExpr + pTableScanInfo->numOfOutput = pDownstream->numOfOutput; + char* name = pDownstream->name; if ((strcasecmp(name, "TableAggregate") == 0) || (strcasecmp(name, "STableAggregate") == 0)) { SAggOperatorInfo* pAggInfo = pDownstream->info; @@ -6248,14 +6249,14 @@ static SOperatorInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQu return pOptr; } -static UNUSED_FUNC int32_t getTableScanId(STableScanInfo* pTableScanInfo) { - return pTableScanInfo->current; -} - static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { return pTableScanInfo->order; } +//static int32_t getTableScanFlag(STableScanInfo* pTableScanInfo) { +// return pTableScanInfo-> +//} + // this is a blocking operator static SSDataBlock* doAggregate(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; @@ -6388,7 +6389,11 @@ static SSDataBlock* doArithmeticOperation(void* param) { } } - return pArithInfo->binfo.pRes; + if (pArithInfo->binfo.pRes->info.rows > 0) { + return pArithInfo->binfo.pRes; + } else { + return NULL; + } } static SSDataBlock* doLimit(void* param) { -- GitLab