From d57b3f381ddefeba03cff1e6041b00ec25a88dce Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Mar 2021 10:28:40 +0800 Subject: [PATCH] [td-2895] refactor. --- src/query/inc/qExecutor.h | 21 ++---- src/query/src/qExecutor.c | 138 +++++++++----------------------------- 2 files changed, 38 insertions(+), 121 deletions(-) diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 0dedacf7a9..de92006877 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -12,8 +12,8 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#ifndef TDENGINE_QUERYEXECUTOR_H -#define TDENGINE_QUERYEXECUTOR_H +#ifndef TDENGINE_QEXECUTOR_H +#define TDENGINE_QEXECUTOR_H #include "os.h" @@ -45,21 +45,16 @@ enum { // when query starts to execute, this status will set QUERY_NOT_COMPLETED = 0x1u, - /* result output buffer is full, current query is paused. - * this status is only exist in group-by clause and diff/add/division/multiply/ query. - */ - QUERY_RESBUF_FULL = 0x2u, - /* query is over * 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc. * 2. when all data within queried time window, it is also denoted as query_completed */ - QUERY_COMPLETED = 0x4u, + QUERY_COMPLETED = 0x2u, /* when the result is not completed return to client, this status will be * usually used in case of interval query with interpolation option */ - QUERY_OVER = 0x8u, + QUERY_OVER = 0x4u, }; typedef struct SResultRowPool { @@ -197,7 +192,6 @@ typedef struct SQuery { int32_t interBufSize; // intermediate buffer sizse SOrderVal order; - int16_t numOfCols; int16_t numOfTags; @@ -242,7 +236,6 @@ typedef struct SQueryRuntimeEnv { uint32_t status; // query status void* qinfo; uint16_t scanFlag; // denotes reversed scan of data or not -// SFillInfo* pFillInfo; // todo move to operatorInfo void* pQueryHandle; int32_t prevGroupId; // previous executed group id @@ -353,8 +346,6 @@ typedef struct SQueryParam { } SQueryParam; typedef struct STableScanInfo { - SQueryRuntimeEnv *pRuntimeEnv; - void *pQueryHandle; int32_t numOfBlocks; int32_t numOfSkipped; @@ -372,7 +363,6 @@ typedef struct STableScanInfo { SExprInfo *pExpr; SSDataBlock block; bool loadExternalRows; // load external rows (prev & next rows) - bool externalLoaded; // external rows loaded int32_t numOfOutput; int64_t elapsedTime; @@ -445,7 +435,6 @@ bool doBuildResCheck(SQInfo* pQInfo); void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status); bool onlyQueryTags(SQuery* pQuery); -void tableQueryImpl(SQInfo *pQInfo); bool isValidQInfo(void *param); int32_t doDumpQueryResult(SQInfo *pQInfo, char *data); @@ -457,4 +446,4 @@ void freeQInfo(SQInfo *pQInfo); int32_t getMaximumIdleDurationSec(); -#endif // TDENGINE_QUERYEXECUTOR_H +#endif // TDENGINE_QEXECUTOR_H diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e5ca038d02..d2298fa6dc 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -49,6 +49,11 @@ enum { TS_JOIN_TAG_NOT_EQUALS = 2, }; +typedef enum SResultTsInterpType { + RESULT_ROW_START_INTERP = 1, + RESULT_ROW_END_INTERP = 2, +} SResultTsInterpType; + typedef struct { int32_t status; // query status TSKEY lastKey; // the lastKey value before query executed @@ -135,8 +140,8 @@ static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) { } tw->ekey -= 1; } -static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes); +static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes); static void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfCols, int32_t* rowCellInfoOffset); @@ -180,6 +185,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput); static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput); static void destroyArithOperatorInfo(void* param, int32_t numOfOutput); static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput); +static void destroyOperatorInfo(SOperatorInfo* pOperator); static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock); @@ -187,9 +193,8 @@ static int32_t getGroupbyColumnIndex(SSqlGroupbyExpr *pGroupbyExpr, SSDataBlock* static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, SQLFunctionCtx * pCtx, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex, int32_t* offset); -static void destroyOperatorInfo(SOperatorInfo* pOperator); -void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size); -void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); +static void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size); +static void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); static bool isPointInterpoQuery(SQuery *pQuery); // setup the output buffer for each operator @@ -234,14 +239,13 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int3 int64_t maxOutput = 0; for (int32_t j = 0; j < numOfOutput; ++j) { - int32_t functionId = pCtx[j].functionId; + int32_t id = pCtx[j].functionId; /* * ts, tag, tagprj function can not decide the output number of current query * the number of output result is decided by main output */ - if (hasMainFunction && - (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ)) { + if (hasMainFunction && (id == TSDB_FUNC_TS || id == TSDB_FUNC_TAG || id == TSDB_FUNC_TAGPRJ)) { continue; } @@ -255,14 +259,14 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int3 return maxOutput; } -static void setNumOfRes(SQLFunctionCtx* pCtx, int32_t numOfOutput) { +static void clearNumOfRes(SQLFunctionCtx* pCtx, int32_t numOfOutput) { for (int32_t j = 0; j < numOfOutput; ++j) { SResultRowCellInfo *pResInfo = GET_RES_INFO(&pCtx[j]); pResInfo->numOfRes = 0; } } -bool isGroupbyColumn(SSqlGroupbyExpr *pGroupbyExpr) { +static bool isGroupbyColumn(SSqlGroupbyExpr *pGroupbyExpr) { if (pGroupbyExpr == NULL || pGroupbyExpr->numOfGroupCols == 0) { return false; } @@ -282,7 +286,7 @@ bool isGroupbyColumn(SSqlGroupbyExpr *pGroupbyExpr) { return false; } -bool isStabledev(SQuery* pQuery) { +static bool isStabledev(SQuery* pQuery) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functId = pQuery->pExpr1[i].base.functionId; if (functId == TSDB_FUNC_STDDEV_DST) { @@ -293,30 +297,6 @@ bool isStabledev(SQuery* pQuery) { return false; } -int16_t getGroupbyColumnType(SQuery *pQuery, SSqlGroupbyExpr *pGroupbyExpr) { - assert(pGroupbyExpr != NULL); - - int32_t colId = -2; - int16_t type = TSDB_DATA_TYPE_NULL; - - for (int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) { - SColIndex *pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, i); - if (TSDB_COL_IS_NORMAL_COL(pColIndex->flag)) { - colId = pColIndex->colId; - break; - } - } - - for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - if (colId == pQuery->colList[i].colId) { - type = pQuery->colList[i].type; - break; - } - } - - return type; -} - static bool isSelectivityWithTagsQuery(SQLFunctionCtx *pCtx, int32_t numOfOutput) { bool hasTags = false; int32_t numOfSelectivity = 0; @@ -619,16 +599,6 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRow return TSDB_CODE_SUCCESS; } -static UNUSED_FUNC bool getResultRowStatus(SResultRowInfo *pWindowResInfo, int32_t slot) { - assert(slot >= 0 && slot < pWindowResInfo->size); - return pWindowResInfo->pResult[slot]->closed; -} - -typedef enum SResultTsInterpType { - RESULT_ROW_START_INTERP = 1, - RESULT_ROW_END_INTERP = 2, -} SResultTsInterpType; - static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) { assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP)); if (type == RESULT_ROW_START_INTERP) { @@ -4073,10 +4043,13 @@ static void doCloseAllTimeWindow(SQueryRuntimeEnv* pRuntimeEnv) { } } -static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) { +static SSDataBlock* doTableScanImpl(void* param) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + + STableScanInfo* pTableScanInfo = pOperator->info; SSDataBlock* pBlock = &pTableScanInfo->block; - SQuery* pQuery = pTableScanInfo->pRuntimeEnv->pQuery; - STableGroupInfo* pTableGroupInfo = &pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo; + SQuery* pQuery = pOperator->pRuntimeEnv->pQuery; + STableGroupInfo* pTableGroupInfo = &pOperator->pRuntimeEnv->tableqinfoGroupInfo; while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) { pTableScanInfo->numOfBlocks += 1; @@ -4097,9 +4070,9 @@ static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) { // this function never returns error? uint32_t status; - int32_t code = loadDataBlockOnDemand(pTableScanInfo->pRuntimeEnv, pTableScanInfo, pBlock, &status); + int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTableScanInfo->pRuntimeEnv->env, code); + longjmp(pOperator->pRuntimeEnv->env, code); } // current block is ignored according to filter result by block statistics data, continue load the next block @@ -4117,13 +4090,13 @@ static SSDataBlock* doTableScan(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; STableScanInfo *pTableScanInfo = pOperator->info; - SQueryRuntimeEnv *pRuntimeEnv = pTableScanInfo->pRuntimeEnv; + SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; SQuery* pQuery = pRuntimeEnv->pQuery; SResultRowInfo* pResultRowInfo = pTableScanInfo->pResultRowInfo; while (pTableScanInfo->current < pTableScanInfo->times) { - SSDataBlock* p = doTableScanImpl(pTableScanInfo); + SSDataBlock* p = doTableScanImpl(pOperator); if (p != NULL) { return p; } @@ -4176,7 +4149,7 @@ static SSDataBlock* doTableScan(void* param) { pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->size-1]->win.skey; } - SSDataBlock* p = doTableScanImpl(pTableScanInfo); + SSDataBlock* p = doTableScanImpl(pOperator); if (p != NULL) { return p; } @@ -4185,32 +4158,6 @@ static SSDataBlock* doTableScan(void* param) { return NULL; } -static SSDataBlock* doSeqTableBlocksScan(void* param) { - SOperatorInfo* pOperator = (SOperatorInfo*)param; - - STableScanInfo *pTableScanInfo = pOperator->info; -// SQueryRuntimeEnv *pRuntimeEnv = pTableScanInfo->pRuntimeEnv; - -// int32_t totalTables = pRuntimeEnv->tableqinfoGroupInfo.numOfTables; - -// while (1) { - return doTableScanImpl(pTableScanInfo); -// SSDataBlock* p = doTableScanImpl(pTableScanInfo); -// if (p != NULL) { -// return p; -// } - - // try the next table -// if (++pTableScanInfo->tableIndex >= totalTables) { -// return NULL; -// } -// -// setTableQueryHandle(pRuntimeEnv, pTableScanInfo->tableIndex); -// pTableScanInfo->pQueryHandle = pRuntimeEnv->pQueryHandle; -// pTableScanInfo->externalLoaded = false; -// } -} - static SSDataBlock* doBlockInfoScan(void* param) { SOperatorInfo *pOperator = (SOperatorInfo*)param; if (pOperator->status == OP_EXEC_DONE) { @@ -4220,7 +4167,7 @@ static SSDataBlock* doBlockInfoScan(void* param) { STableScanInfo *pTableScanInfo = pOperator->info; STableBlockDist tableBlockDist = {0}; - tableBlockDist.numOfTables = pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo.numOfTables; + tableBlockDist.numOfTables = pOperator->pRuntimeEnv->tableqinfoGroupInfo.numOfTables; tableBlockDist.dataBlockInfos = taosArrayInit(512, sizeof(SFileBlockInfo)); tsdbGetFileBlocksDistInfo(pTableScanInfo->pQueryHandle, &tableBlockDist); @@ -4258,7 +4205,6 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pInfo->reverseTimes = 0; pInfo->order = pRuntimeEnv->pQuery->order.order; pInfo->current = 0; - pInfo->pRuntimeEnv = pRuntimeEnv; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableScanOperator"; @@ -4266,6 +4212,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfCols; + pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->exec = doTableScan; return pOperator; @@ -4279,7 +4226,6 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE pInfo->reverseTimes = 0; pInfo->order = pRuntimeEnv->pQuery->order.order; pInfo->current = 0; - pInfo->pRuntimeEnv = pRuntimeEnv; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableSeqScanOperator"; @@ -4288,7 +4234,8 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfCols; - pOperator->exec = doSeqTableBlocksScan; + pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->exec = doTableScanImpl; return pOperator; } @@ -4297,7 +4244,6 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); pInfo->pQueryHandle = pTsdbQueryHandle; - pInfo->pRuntimeEnv = pRuntimeEnv; pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); SColumnInfoData infoData = {{0}}; @@ -4372,11 +4318,11 @@ static SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQuery pInfo->reverseTimes = reverseTime; pInfo->current = 0; pInfo->order = pRuntimeEnv->pQuery->order.order; - pInfo->pRuntimeEnv = pRuntimeEnv; SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); pOptr->name = "DataBlocksOptimizedScanOperator"; pOptr->operatorType = OP_DataBlocksOptScan; + pOptr->pRuntimeEnv = pRuntimeEnv; pOptr->blockingOptr = false; pOptr->info = pInfo; pOptr->exec = doTableScan; @@ -4519,12 +4465,12 @@ static SSDataBlock* doArithmeticOperation(void* param) { arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); pInfo->pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); - if (pInfo->pRes->info.rows >= 4096) { + if (pInfo->pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { break; } } - setNumOfRes(pInfo->pCtx, pOperator->numOfOutput); + clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput); return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; } @@ -5242,24 +5188,6 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf return pOperator; } -void tableQueryImpl(SQInfo *pQInfo) { - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; -// SQuery * pQuery = pRuntimeEnv->pQuery; - - // number of points returned during this query - int64_t st = taosGetTimestampUs(); - -// assert(pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1); - -// SArray* g = GET_TABLEGROUP(pRuntimeEnv, 0); -// pQuery->current = taosArrayGetP(g, 0); - - pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); - - // record the total elapsed time - pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st); -} - static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) { int32_t j = 0; @@ -6166,7 +6094,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr // todo refactor pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX); - + qDebug("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo); return pQInfo; -- GitLab