From 9b224d9bbcda2fd36b0d08b69364d786d6cf191b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 12 Feb 2021 15:56:55 +0800 Subject: [PATCH] [td-225] refactor --- src/query/inc/qExecutor.h | 27 +++--- src/query/src/qExecutor.c | 175 +++++++++++++++++++++++++++++--------- 2 files changed, 151 insertions(+), 51 deletions(-) diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index fa2bfb5f29..5a1e317c49 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -237,6 +237,17 @@ typedef struct SQuery { STableGroupInfo tableGroupInfo; // table list SArray } SQuery; +typedef SSDataBlock* (*__operator_fn_t)(void* param); + +typedef struct SOperatorInfo { + char *name; + bool blockingOptr; + void *optInfo; + + __operator_fn_t exec; + struct SOperatorInfo *upstream; +} SOperatorInfo; + typedef struct SQueryRuntimeEnv { jmp_buf env; SQuery* pQuery; @@ -329,8 +340,6 @@ typedef struct SQueryParam { SSqlGroupbyExpr *pGroupbyExpr; } SQueryParam; -typedef SSDataBlock* (*__operator_fn_t)(void* param); - typedef struct STableScanInfo { SQueryRuntimeEnv *pRuntimeEnv; void *pQueryHandle; @@ -350,23 +359,19 @@ typedef struct STableScanInfo { int64_t elapsedTime; } STableScanInfo; -typedef struct SOperatorInfo { - char *name; - bool blockingOptr; - void *optInfo; - - __operator_fn_t exec; -} SOperatorInfo; - SOperatorInfo optrList[5]; typedef struct SAggOperatorInfo { SResultRowInfo *pResultRowInfo; STableQueryInfo *pTableQueryInfo; - SOperatorInfo *prevOptr; SQueryRuntimeEnv *pRuntimeEnv; } SAggOperatorInfo; +typedef struct SArithOperatorInfo { + STableQueryInfo *pTableQueryInfo; + SQueryRuntimeEnv *pRuntimeEnv; +} SArithOperatorInfo; + void freeParam(SQueryParam *param); int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param); int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg, diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 3251dd8ead..2915987bff 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -173,8 +173,8 @@ static void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArr static STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win); static STableIdInfo createTableIdInfo(SQuery* pQuery); -static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); -static STableScanInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime); +static SOperatorInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); +static SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime); static int32_t getNumOfScanTimes(SQuery* pQuery); static SSDataBlock* createOutputBuf(SQuery* pQuery) { @@ -1220,6 +1220,63 @@ static void aggApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pSData } } +static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pSDataBlock) { + SArithmeticSupport arithSup = {0}; + + SQuery *pQuery = pRuntimeEnv->pQuery; + + // create the output result buffer + tFilePage **data = calloc(pQuery->numOfExpr2, POINTER_BYTES); + for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) { + int32_t bytes = pQuery->pExpr2[i].bytes; + data[i] = (tFilePage *)malloc((size_t)(bytes * pQuery->rec.rows) + sizeof(tFilePage)); + } + + arithSup.numOfCols = (int32_t)pSDataBlock->info.numOfCols; + arithSup.exprList = pQuery->pExpr1; + arithSup.data = calloc(arithSup.numOfCols, POINTER_BYTES); + + // set the input column data + for (int32_t f = 0; f < arithSup.numOfCols; ++f) { + SColumnInfoData *pColumnInfoData = taosArrayGet(pSDataBlock->pDataBlock, f); + arithSup.data[f] = pColumnInfoData->pData; + } + + // output result number of columns + for (int32_t k = 0; k < pRuntimeEnv->outputBuf->info.numOfCols; ++k) { + for (int i = 0; i < pQuery->numOfExpr2; ++i) { + SExprInfo *pExpr = &pQuery->pExpr2[i]; + + // calculate the result from several other columns + SSqlFuncMsg *pSqlFunc = &pExpr->base; + if (pSqlFunc->functionId != TSDB_FUNC_ARITHM) { + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { + if (pSqlFunc->functionId == pQuery->pExpr1[j].base.functionId && + pSqlFunc->colInfo.colId == pQuery->pExpr1[j].base.colInfo.colId) { + memcpy(data[i]->data, pQuery->sdata[j]->data, (size_t)(pQuery->pExpr1[j].bytes * pSDataBlock->info.rows)); + break; + } + } + } else { + arithSup.pArithExpr = pExpr; + arithmeticTreeTraverse(arithSup.pArithExpr->pExpr, (int32_t)pQuery->rec.rows, data[i]->data, &arithSup, + TSDB_ORDER_ASC, getArithemicInputSrc); + } + } + + for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) { + memcpy(pQuery->sdata[i]->data, data[i]->data, (size_t)(pQuery->pExpr2[i].bytes * pQuery->rec.rows)); + } + + for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) { + tfree(data[i]); + } + + tfree(data); + tfree(arithSup.data); + } +} + /** * todo set the last value for pQueryTableInfo as in rowwiseapplyfunctions * @param pRuntimeEnv @@ -5890,17 +5947,16 @@ static UNUSED_FUNC SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle pOptr->name = "SeqScanTableOp"; pOptr->blockingOptr = false; pOptr->optInfo = pInfo; - pOptr->exec = doScanTable; + pOptr->exec = doTableScan; return pOptr; } -static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) { +static SOperatorInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) { assert(repeatTime > 0); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); pInfo->pQueryHandle = pTsdbQueryHandle; - pInfo->exec = doTableScan; pInfo->times = repeatTime; pInfo->reverseTimes = reverseTime; @@ -5908,7 +5964,14 @@ static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQ pInfo->order = pRuntimeEnv->pQuery->order.order; pInfo->pRuntimeEnv = pRuntimeEnv; - return pInfo; + + SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); + pOptr->name = "BidirectionSeqScanTableOp"; + pOptr->blockingOptr = false; + pOptr->optInfo = pInfo; + pOptr->exec = doTableScan; + + return pOptr; } static UNUSED_FUNC int32_t getTableScanId(STableScanInfo* pTableScanInfo) { @@ -5920,34 +5983,53 @@ static UNUSED_FUNC int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { } // this is a blocking operator -static SSDataBlock* doAggOperator(void* param) { - SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param; - SQueryRuntimeEnv* pRuntimeEnv = pInfo->pRuntimeEnv; - STableScanInfo* pTableScanInfo = pInfo->pTableScanInfo; +static SSDataBlock* doAggregation(void* param) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; - int32_t countId = 0; - int32_t order = getTableScanOrder(pInfo->pTableScanInfo); + SAggOperatorInfo* pAggInfo = pOperator->optInfo; + SQueryRuntimeEnv* pRuntimeEnv = pAggInfo->pRuntimeEnv; - resetDefaultResInfoOutputBuf_rv(pInfo->pRuntimeEnv); + SOperatorInfo* upstream = pOperator->upstream; + + resetDefaultResInfoOutputBuf_rv(pRuntimeEnv); pRuntimeEnv->pQuery->pos = 0; while(1) { - SSDataBlock* pBlock = pTableScanInfo->exec(pTableScanInfo); + SSDataBlock* pBlock = upstream->exec(upstream->optInfo); if (pBlock == NULL) { break; } - if (countId != getTableScanId(pTableScanInfo) && order == getTableScanOrder(pTableScanInfo)) { - prepareRepeatTableScan(pRuntimeEnv); - countId = getTableScanId(pTableScanInfo); - } + // the pDataBlock are always the same one, no need to call this again + setInputSDataBlock(pRuntimeEnv, pBlock); + aggApplyFunctions(pRuntimeEnv, pBlock); + } + + setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); + finalizeQueryResult(pRuntimeEnv); + + pRuntimeEnv->outputBuf->info.rows = getNumOfResult(pRuntimeEnv); + return pRuntimeEnv->outputBuf; +} + +static SSDataBlock* doArithmeticOperation(void* param) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + + SArithOperatorInfo* pArithInfo = pOperator->optInfo; + SQueryRuntimeEnv* pRuntimeEnv = pArithInfo->pRuntimeEnv; - // the order has changed - if (order != getTableScanOrder(pTableScanInfo)) { - order = getTableScanOrder(pTableScanInfo); + SOperatorInfo* upstream = pOperator->upstream; + + resetDefaultResInfoOutputBuf_rv(pRuntimeEnv); + pRuntimeEnv->pQuery->pos = 0; + + while(1) { + SSDataBlock* pBlock = upstream->exec(upstream->optInfo); + if (pBlock == NULL) { + break; } - // the pDataBlock are alway the same one, no need to call this again + // the pDataBlock are always the same one, no need to call this again setInputSDataBlock(pRuntimeEnv, pBlock); aggApplyFunctions(pRuntimeEnv, pBlock); } @@ -5971,29 +6053,43 @@ static int32_t getNumOfScanTimes(SQuery* pQuery) { return 1; } -//void createBasicOperatorInfo() { -// optrList[0].name = "SeqScanTableOp"; -// optrList[0].blockingOptr = false; -// optrList[0].exec = doTableScan; -// -// optrList[0].name = "SeqScanTableOp"; -// optrList[0].blockingOptr = false; -// optrList[0].exec = doTableScan; -//} - -static UNUSED_FUNC SAggOperatorInfo* createAggOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, - SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTableScanInfo) { +static UNUSED_FUNC SOperatorInfo* createAggOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, + SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); pInfo->pResultRowInfo = pResultRowInfo; pInfo->pTableQueryInfo = pTableQueryInfo; - pInfo->pTableScanInfo = pTableScanInfo; pInfo->pRuntimeEnv = pRuntimeEnv; - pInfo->apply = doAggOperator; - return pInfo; + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + pOperator->name = "AggregationOp"; + pOperator->blockingOptr = true; + pOperator->optInfo = pInfo; + pOperator->upstream = inputOptr; + pOperator->exec = doAggregation; + + return pOperator; } +static UNUSED_FUNC SOperatorInfo* createArithOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, + SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) { + SArithOperatorInfo* pInfo = calloc(1, sizeof(SArithOperatorInfo)); + + pInfo->pTableQueryInfo = pTableQueryInfo; + pInfo->pRuntimeEnv = pRuntimeEnv; + + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + pOperator->name = "ArithmeticOp"; + pOperator->blockingOptr = false; + pOperator->optInfo = pInfo; + pOperator->upstream = inputOptr; + pOperator->exec = doArithmeticOperation; + + return pOperator; +} + +static + /* * in each query, this function will be called only once, no retry for further result. * @@ -6008,10 +6104,9 @@ static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) return; } - SAggOperatorInfo* pAggInfo = createAggOperatorInfo(&pRuntimeEnv->resultRowInfo, pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); - SSDataBlock* pResBlock = pAggInfo->apply(pAggInfo); + SOperatorInfo* pAggInfo = createAggOperatorInfo(&pRuntimeEnv->resultRowInfo, pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); + SSDataBlock* pResBlock = pAggInfo->exec(pAggInfo->optInfo); - // since the numOfRows must be identical for all functions that are allowed to be executed simutaneously. pQuery->rec.rows = pResBlock->info.rows; // doSecondaryArithmeticProcess(pQuery); -- GitLab