From 602d2d1cc6f81e5005bb6c4368d9817a0aec57ac Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 7 Feb 2021 14:03:09 +0800 Subject: [PATCH] [td-225] refactor --- src/query/inc/qExecutor.h | 13 ++-- src/query/src/qExecutor.c | 142 ++++++++++++++++++++++---------------- 2 files changed, 89 insertions(+), 66 deletions(-) diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index fce798d929..f63ba8604a 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -179,6 +179,12 @@ typedef struct { SArray* pResult; // SArray } SInterResult; +typedef struct SSDataBlock { + SDataStatis *pBlockStatis; + SArray *pDataBlock; + SDataBlockInfo info; +} SSDataBlock; + typedef struct SQuery { int16_t numOfCols; int16_t numOfTags; @@ -214,6 +220,7 @@ typedef struct SQuery { SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query. SSingleColumnFilterInfo* pFilterInfo; + SSDataBlock *ouptputBuf; } SQuery; typedef struct SQueryRuntimeEnv { @@ -307,12 +314,6 @@ typedef struct SQueryParam { SSqlGroupbyExpr *pGroupbyExpr; } SQueryParam; -typedef struct SSDataBlock { - SDataStatis *pBlockStatis; - SArray *pDataBlock; - SDataBlockInfo info; -} SSDataBlock; - typedef struct STableScanInfo { SQInfo* pQInfo; void *pQueryHandle; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index df787626a4..e12d1fba1a 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -176,6 +176,22 @@ static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQ static STableScanInfo* createTableScanInfo(void* pTsdbQueryHandle, SQInfo* pQInfo, int32_t repeatTime); static int32_t getNumOfScanTimes(SQuery* pQuery); +static SSDataBlock* createOutputBuf(SQuery* pQuery) { + // setup the output buffer + SSDataBlock *res = calloc(1, sizeof(SSDataBlock)); + res->info.numOfCols = pQuery->numOfOutput; + + res->pDataBlock = taosArrayInit(pQuery->numOfOutput, sizeof(SColumnInfoData)); + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { + SColumnInfoData idata = {0}; + idata.info.type = pQuery->pExpr1[i].type; + idata.info.bytes = pQuery->pExpr1[i].bytes; + idata.info.colId = pQuery->pExpr1[i].base.resColId; + idata.pData = calloc(4096, idata.info.bytes); + taosArrayPush(res->pDataBlock, &idata); + } +} + bool doFilterData(SQuery *pQuery, int32_t elemPos) { for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) { SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k]; @@ -1150,6 +1166,36 @@ static void doWindowBorderInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc } } +static void aggApplyFunctions_rv(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, + SArray *pDataBlock) { + SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; + SQuery * pQuery = pRuntimeEnv->pQuery; + + TSKEY *tsCols = NULL; + if (pDataBlock != NULL) { + SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, 0); + tsCols = (TSKEY *)(pColInfo->pData); + } + + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { + char *dataBlock = getDataBlock(pRuntimeEnv, &pRuntimeEnv->sasArray[k], k, pDataBlockInfo->rows, pDataBlock); + setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &pQuery->pExpr1[k]); + } + + /* + * the sqlfunctionCtx parameters should be set done before all functions are invoked, + * since the selectivity + tag_prj query needs all parameters been set done. + * tag_prj function are changed to be TSDB_FUNC_TAG_DUMMY + */ + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { + int32_t functionId = pQuery->pExpr1[k].base.functionId; + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { + pCtx[k].startTs = pQuery->window.skey; + aAggs[functionId].xFunction(&pCtx[k]); + } + } +} + /** * todo set the last value for pQueryTableInfo as in rowwiseapplyfunctions * @param pRuntimeEnv @@ -5634,37 +5680,46 @@ static void doSecondaryArithmeticProcess(SQuery* pQuery) { tfree(arithSup.data); } -static SSDataBlock* doTableScan(void* param) { - STableScanInfo * pTableScanInfo = (STableScanInfo *)param; - SQueryRuntimeEnv *pRuntimeEnv = &pTableScanInfo->pQInfo->runtimeEnv; - +static SSDataBlock* doScanImpl(STableScanInfo *pTableScanInfo) { SSDataBlock *pBlock = &pTableScanInfo->block; - while (pTableScanInfo->current < pTableScanInfo->times) { - while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) { - pTableScanInfo->numOfBlocks += 1; - // todo check for query cancel + while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) { + pTableScanInfo->numOfBlocks += 1; - tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info); + // todo check for query cancel - SDataStatis *pStatis = pBlock->pBlockStatis; + tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info); - // this function never returns error? - tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pStatis); - pTableScanInfo->numOfBlockStatis += 1; + SDataStatis *pStatis = pBlock->pBlockStatis; - if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block - pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL); - pTableScanInfo->numOfRows += pBlock->info.rows; - } + // this function never returns error? + tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pStatis); + pTableScanInfo->numOfBlockStatis += 1; - return pBlock; + if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block + pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL); + pTableScanInfo->numOfRows += pBlock->info.rows; + } + + return pBlock; + } +} + +static SSDataBlock* doTableScan(void* param) { + STableScanInfo * pTableScanInfo = (STableScanInfo *)param; + SQueryRuntimeEnv *pRuntimeEnv = &pTableScanInfo->pQInfo->runtimeEnv; + + while (pTableScanInfo->current < pTableScanInfo->times) { + SSDataBlock* p = doScanImpl(pTableScanInfo); + if (p != NULL) { + return p; } if (++pTableScanInfo->current >= pTableScanInfo->times) { return NULL; } + // do prepare for the next round table scan operation tsdbCleanupQueryHandle(pTableScanInfo->pQueryHandle); STsdbQueryCond cond = createTsdbQueryCond(pRuntimeEnv->pQuery, &pRuntimeEnv->pQuery->window); pTableScanInfo->pQueryHandle = @@ -5697,29 +5752,16 @@ static SSDataBlock* doTableScan(void* param) { tsdbQueryTables(pTableScanInfo->pQInfo->tsdb, &cond, &pTableScanInfo->pQInfo->tableGroupInfo, pTableScanInfo->pQInfo, &pTableScanInfo->pQInfo->memRef); - while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) { - pTableScanInfo->numOfBlocks += 1; - - // todo check for query cancel - tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info); - - SDataStatis *pStatis = pBlock->pBlockStatis; - - // this function never returns error? - tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pStatis); - pTableScanInfo->numOfBlockStatis += 1; + qDebug("QInfo:%p start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, + pTableScanInfo->pQInfo, cond.twindow.skey, cond.twindow.ekey); - if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block - pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL); - pTableScanInfo->numOfRows += pBlock->info.rows; - } + pTableScanInfo->times = 1; + pTableScanInfo->current = 0; - return pBlock; + SSDataBlock* p = doScanImpl(pTableScanInfo); + if (p != NULL) { + return p; } - - - qDebug("QInfo:%p start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, - pTableScanInfo->pQInfo, cond.twindow.skey, cond.twindow.ekey); } return NULL; @@ -5761,27 +5803,6 @@ static UNUSED_FUNC int32_t getTableScanTime(STableScanInfo* pTableScanInfo) { static SSDataBlock* doAggOperator(void* param) { SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param; - // setup the output buffer - SSDataBlock* res = calloc(1, sizeof(SSDataBlock)); - - SQuery* pQuery = pInfo->pRuntimeEnv->pQuery; - res->info.numOfCols = pQuery->numOfOutput; - - res->pDataBlock = taosArrayInit(pQuery->numOfOutput, sizeof(SColumnInfoData)); - for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { - - SColumnInfoData idata = {0}; - idata.info.type = pQuery->pExpr1[i].type; - idata.info.bytes = pQuery->pExpr1[i].bytes; - idata.info.colId = pQuery->pExpr1[i].base.resColId; - idata.pData = calloc(4096, idata.info.bytes); - taosArrayPush(res->pDataBlock, &idata); - - pInfo->pRuntimeEnv->pCtx[i].pOutput = idata.pData; - } - - pQuery->pos = 0; - int32_t countId = 0; while(1) { SSDataBlock* pBlock = pInfo->pTableScanInfo->apply(pInfo->pTableScanInfo); @@ -5793,7 +5814,7 @@ static SSDataBlock* doAggOperator(void* param) { needRepeatScan(pInfo->pRuntimeEnv); } - blockwiseApplyFunctions(pInfo->pRuntimeEnv, pBlock->pBlockStatis, &pBlock->info, pInfo->pResultRowInfo, binarySearchForKey, pBlock->pDataBlock); + aggApplyFunctions_rv(pInfo->pRuntimeEnv, pBlock->pBlockStatis, &pBlock->info, pBlock->pDataBlock); } setQueryStatus(pQuery, QUERY_COMPLETED); @@ -6872,6 +6893,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr } doUpdateExprColumnIndex(pQuery); + pQuery->ouptputBuf = createOutputBuf(pQuery); int32_t ret = createFilterInfo(pQInfo, pQuery); if (ret != TSDB_CODE_SUCCESS) { -- GitLab