From 7488757485fdab87ef0317f39113a2b28c529ca3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 4 Mar 2021 14:43:51 +0800 Subject: [PATCH] [td-2895] refactor. --- src/query/inc/qExecutor.h | 43 ++-- src/query/inc/qFill.h | 2 - src/query/src/qExecutor.c | 425 ++++++++++++++++---------------------- src/query/src/qFill.c | 36 +++- src/query/src/qUtil.c | 6 +- 5 files changed, 237 insertions(+), 275 deletions(-) diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 5f101e9d5f..4db86c25aa 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -37,9 +37,6 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int #define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u) #define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP) -#define SET_STABLE_QUERY_OVER(_q) ((_q)->tableIndex = (int32_t)((_q)->tableqinfoGroupInfo.numOfTables)) -#define IS_STASBLE_QUERY_OVER(_q) ((_q)->tableIndex >= (int32_t)((_q)->tableqinfoGroupInfo.numOfTables)) - #define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index))) #define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL? 0:((_r)->outputBuf)->info.rows) @@ -241,10 +238,10 @@ struct SOperatorInfo; typedef struct SQueryRuntimeEnv { jmp_buf env; SQuery* pQuery; - uint32_t status; // query status + uint32_t status; // query status void* qinfo; uint16_t scanFlag; // denotes reversed scan of data or not - SFillInfo* pFillInfo; // todo move to operatorInfo +// SFillInfo* pFillInfo; // todo move to operatorInfo void* pQueryHandle; int32_t prevGroupId; // previous executed group id @@ -262,14 +259,13 @@ typedef struct SQueryRuntimeEnv { SArithmeticSupport *sasArray; SSDataBlock *outputBuf; - int32_t tableIndex; //TODO remove it STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure struct SOperatorInfo *proot; struct SOperatorInfo *pTableScanner; // table scan operator SGroupResInfo groupResInfo; int64_t currentOffset; // dynamic offset value - SRspResultInfo resultInfo; + SRspResultInfo resultInfo; } SQueryRuntimeEnv; enum { @@ -278,15 +274,31 @@ enum { OP_EXEC_DONE = 3, }; +enum OPERATOR_TYPE_E { + OP_TableScan = 1, + OP_DataBlocksOptScan = 2, + OP_TableSeqScan = 3, + OP_TagScan = 4, + OP_Aggregate = 5, + OP_Arithmetic = 6, + OP_Groupby = 7, + OP_Limit = 8, + OP_Offset = 9, + OP_TimeInterval = 10, + OP_Fill = 11, + OP_MultiTableAggregate = 12, + OP_MultiTableTimeInterval = 13, +}; + typedef struct SOperatorInfo { - uint8_t operatorType; - bool blockingOptr; // block operator or not - uint8_t status; // denote if current operator is completed - int32_t numOfOutput; // number of columns of the current operator results - char *name; // name, used to show the query execution plan - void *info; // extension attribution - SExprInfo *pExpr; - SQueryRuntimeEnv *pRuntimeEnv; + uint8_t operatorType; + bool blockingOptr; // block operator or not + uint8_t status; // denote if current operator is completed + int32_t numOfOutput; // number of columns of the current operator results + char *name; // name, used to show the query execution plan + void *info; // extension attribution + SExprInfo *pExpr; + SQueryRuntimeEnv *pRuntimeEnv; struct SOperatorInfo *upstream; __operator_fn_t exec; @@ -402,6 +414,7 @@ typedef struct SOffsetOperatorInfo { } SOffsetOperatorInfo; typedef struct SFillOperatorInfo { + SFillInfo *pFillInfo; SSDataBlock *pRes; int64_t totalInputRows; } SFillOperatorInfo; diff --git a/src/query/inc/qFill.h b/src/query/inc/qFill.h index dc08dcce4e..00ac86caf4 100644 --- a/src/query/inc/qFill.h +++ b/src/query/inc/qFill.h @@ -80,8 +80,6 @@ void* taosDestroyFillInfo(SFillInfo *pFillInfo); void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey); -void taosFillSetDataBlockFromFilePage(SFillInfo* pFillInfo, const tFilePage** pInput); - void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const struct SSDataBlock* pInput); void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage* pInput); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 80229f3986..86bbfb32d7 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -27,8 +27,6 @@ #include "tlosertree.h" #include "ttype.h" -#define MAX_ROWS_PER_RESBUF_PAGE ((1u<<12) - 1) - /** * check if the primary column is load by default, otherwise, the program will * forced to load primary column explicitly. @@ -150,7 +148,6 @@ static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) { } static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes); -//static void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult); static void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfCols, int32_t* rowCellInfoOffset); @@ -168,31 +165,31 @@ static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order); static STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win); static STableIdInfo createTableIdInfo(SQuery* pQuery); -static SOperatorInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); -static SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime); -static SOperatorInfo* createSeqTableBlockScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, bool loadExternalRows); - static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream); static int32_t getNumOfScanTimes(SQuery* pQuery); static bool isFixedOutputQuery(SQuery* pQuery); -static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); -static void destroySFillOperatorInfo(void* param, int32_t numOfOutput); -static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput); -static void destroyArithOperatorInfo(void* param, int32_t numOfOutput); +static SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); +static SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime); +static SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, bool loadExternalRows); +static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); static SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); -static SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +static SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -static SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -static SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -static SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +static SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +static SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +static SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput); +static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); +static void destroySFillOperatorInfo(void* param, int32_t numOfOutput); +static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput); +static void destroyArithOperatorInfo(void* param, int32_t numOfOutput); + static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock); static int32_t getGroupbyColumnData_rv(SSqlGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock); @@ -1723,10 +1720,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf // interval (down sampling operation) if (QUERY_IS_INTERVAL_QUERY(pQuery)) { if (pQuery->stableQuery) { - pRuntimeEnv->proot = createStableIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); + pRuntimeEnv->proot = createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); } else { - pRuntimeEnv->proot = createIntervalAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); + pRuntimeEnv->proot = createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); if (pQuery->pExpr2 != NULL) { @@ -1740,7 +1737,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } } else if (pQuery->groupbyColumn) { - pRuntimeEnv->proot = createHashGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); + pRuntimeEnv->proot = createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); if (pQuery->pExpr2 != NULL) { @@ -1748,7 +1745,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } } else if (isFixedOutputQuery(pQuery)) { if (pQuery->stableQuery && !isTsCompQuery(pQuery)) { - pRuntimeEnv->proot = createStableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); + pRuntimeEnv->proot = createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); } else { pRuntimeEnv->proot = createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); } @@ -1811,8 +1808,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { tfree(pRuntimeEnv->sasArray); } - pRuntimeEnv->pFillInfo = taosDestroyFillInfo(pRuntimeEnv->pFillInfo); - destroyResultBuf(pRuntimeEnv->pResultBuf); doFreeQueryHandle(pRuntimeEnv); @@ -2936,7 +2931,7 @@ void copyResToQueryResultBuf_rv(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold if (!hasRemainDataInCurrentGroup(pGroupResInfo)) { cleanupGroupResInfo(pGroupResInfo); if (!incNextGroup(pGroupResInfo)) { - SET_STABLE_QUERY_OVER(pRuntimeEnv); + break; } } @@ -3408,20 +3403,6 @@ bool requireTimestamp(SQuery *pQuery) { return false; } -bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) { - /* - * 1. if skey or ekey locates in this block, we need to load the timestamp column to decide the precise position - * 2. if there are top/bottom, first_dst/last_dst functions, we need to load timestamp column in any cases; - */ - STimeWindow *w = &pDataBlockInfo->window; - STableQueryInfo* pTableQueryInfo = pQuery->current; - - bool loadPrimaryTS = (pTableQueryInfo->lastKey >= w->skey && pTableQueryInfo->lastKey <= w->ekey) || - (pQuery->window.ekey >= w->skey && pQuery->window.ekey <= w->ekey) || requireTimestamp(pQuery); - - return loadPrimaryTS; -} - //static int32_t doCopyToSData(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType) { // void* qinfo = pRuntimeEnv->qinfo; // SQuery *pQuery = pRuntimeEnv->pQuery; @@ -3592,48 +3573,44 @@ static void updateWindowResNumOfRes_rv(SQueryRuntimeEnv *pRuntimeEnv, } } -bool hasNotReturnedResults(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo) { - SQuery *pQuery = pRuntimeEnv->pQuery; - SFillInfo *pFillInfo = pRuntimeEnv->pFillInfo; - - if (!Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)) { - return false; - } - - if (pQuery->limit.limit > 0 && pRuntimeEnv->resultInfo.total >= pQuery->limit.limit) { - return false; - } - - if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) { - // There are results not returned to client yet, so filling applied to the remain result is required firstly. - if (taosFillHasMoreResults(pFillInfo)) { - return true; - } - - /* - * While the code reaches here, there are no results remains now. - * If query is not completed yet, the gaps between two results blocks need to be handled after next data block - * is retrieved from TSDB. - * - * NOTE: If the result set is not the first block, the gap in front of the result set will be filled. If the result - * set is the FIRST result block, the gap between the start time of query time window and the timestamp of the - * first result row in the actual result set will fill nothing. - */ - int32_t numOfTotal = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, pQuery->window.ekey, (int32_t)pRuntimeEnv->resultInfo.capacity); - return numOfTotal > 0; - } else { // there are results waiting for returned to client. - if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) && hasRemainDataInCurrentGroup(pGroupResInfo) && - (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery))) { - return true; - } - } - - return false; -} - -static int16_t getNumOfFinalResCol(SQuery* pQuery) { - return pQuery->pExpr2 == NULL? pQuery->numOfOutput:pQuery->numOfExpr2; -} +//bool hasNotReturnedResults(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo) { +// SQuery *pQuery = pRuntimeEnv->pQuery; +//// SFillInfo *pFillInfo = pRuntimeEnv->pFillInfo; +// +// if (!Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)) { +// return false; +// } +// +// if (pQuery->limit.limit > 0 && pRuntimeEnv->resultInfo.total >= pQuery->limit.limit) { +// return false; +// } +// +// if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) { +// // There are results not returned to client yet, so filling applied to the remain result is required firstly. +// if (taosFillHasMoreResults(pFillInfo)) { +// return true; +// } +// +// /* +// * While the code reaches here, there are no results remains now. +// * If query is not completed yet, the gaps between two results blocks need to be handled after next data block +// * is retrieved from TSDB. +// * +// * NOTE: If the result set is not the first block, the gap in front of the result set will be filled. If the result +// * set is the FIRST result block, the gap between the start time of query time window and the timestamp of the +// * first result row in the actual result set will fill nothing. +// */ +// int32_t numOfTotal = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, pQuery->window.ekey, (int32_t)pRuntimeEnv->resultInfo.capacity); +// return numOfTotal > 0; +// } else { // there are results waiting for returned to client. +// if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) && hasRemainDataInCurrentGroup(pGroupResInfo) && +// (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery))) { +// return true; +// } +// } +// +// return false; +//} static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) { SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; @@ -3676,31 +3653,20 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data } qDebug("QInfo:%p set %d subscribe info", pQInfo, total); - // Check if query is completed or not for stable query or normal table query respectively. - if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)) { - if (pQInfo->query.stableQuery) { - if (IS_STASBLE_QUERY_OVER(&pQInfo->runtimeEnv)) { - setQueryStatus(pRuntimeEnv, QUERY_OVER); - } - } else { - if (!hasNotReturnedResults(&pQInfo->runtimeEnv, &pRuntimeEnv->groupResInfo)) { - setQueryStatus(pRuntimeEnv, QUERY_OVER); - } - } + if (Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) && pRuntimeEnv->proot->status == OP_EXEC_DONE) { + setQueryStatus(pRuntimeEnv, QUERY_OVER); } } -int32_t doFillGapsInResults_rv(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock *pOutput) { - SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo; - +int32_t doFillGapsInResults_rv(SFillInfo* pFillInfo, SSDataBlock *pOutput, int32_t capacity) { void** p = calloc(pFillInfo->numOfCols, POINTER_BYTES); for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i); p[i] = pColInfoData->pData; } - pOutput->info.rows = (int32_t)taosFillResultDataBlock(pFillInfo, p, (int32_t)pRuntimeEnv->resultInfo.capacity); + pOutput->info.rows = (int32_t)taosFillResultDataBlock(pFillInfo, p, capacity); tfree(p); return pOutput->info.rows; } @@ -3787,6 +3753,7 @@ static int32_t getPercentileFromSortedArray(const SArray* pArray, double rate) { int idx = (int32_t)((len - 1) * rate); return ((SDataBlockInfo *)(taosArrayGet(pArray, idx)))->rows; } + static int compareBlockInfo(const void *pLeft, const void *pRight) { int32_t left = ((SDataBlockInfo *)pLeft)->rows; int32_t right = ((SDataBlockInfo *)pRight)->rows; @@ -4126,18 +4093,16 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) return terrno; } -static SFillColInfo* createFillColInfo(SQuery* pQuery) { - int32_t numOfCols = getNumOfFinalResCol(pQuery); +static SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, int64_t* fillVal) { int32_t offset = 0; - SFillColInfo* pFillCol = calloc(numOfCols, sizeof(SFillColInfo)); + SFillColInfo* pFillCol = calloc(numOfOutput, sizeof(SFillColInfo)); if (pFillCol == NULL) { return NULL; } - // TODO refactor - for(int32_t i = 0; i < numOfCols; ++i) { - SExprInfo* pExprInfo = (pQuery->pExpr2 == NULL)? &pQuery->pExpr1[i]:&pQuery->pExpr2[i]; + for(int32_t i = 0; i < numOfOutput; ++i) { + SExprInfo* pExprInfo = &pExpr[i]; pFillCol[i].col.bytes = pExprInfo->bytes; pFillCol[i].col.type = (int8_t)pExprInfo->type; @@ -4145,7 +4110,7 @@ static SFillColInfo* createFillColInfo(SQuery* pQuery) { pFillCol[i].tagIndex = -2; pFillCol[i].flag = TSDB_COL_NORMAL; // always be ta normal column for table query pFillCol[i].functionId = pExprInfo->base.functionId; - pFillCol[i].fillVal.i = pQuery->fillVal[i]; + pFillCol[i].fillVal.i = fillVal[i]; offset += pExprInfo->bytes; } @@ -4190,9 +4155,9 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts pRuntimeEnv->resultInfo.capacity = 4096; pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQuery->pExpr1, pQuery->numOfOutput); } else if (isTsCompQuery(pQuery) || isPointInterpoQuery(pQuery)) { - pRuntimeEnv->pTableScanner = createSeqTableBlockScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, isPointInterpoQuery(pQuery)); + pRuntimeEnv->pTableScanner = createTableSeqScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, isPointInterpoQuery(pQuery)); } else if (needReverseScan(pQuery)) { - pRuntimeEnv->pTableScanner = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1); + pRuntimeEnv->pTableScanner = createDataBlocksOptScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1); } else { pRuntimeEnv->pTableScanner = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery)); } @@ -4228,20 +4193,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts return code; } - if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) { - SFillColInfo* pColInfo = createFillColInfo(pQuery); - STimeWindow w = TSWINDOW_INITIALIZER; - - TSKEY sk = MIN(pQuery->window.skey, pQuery->window.ekey); - TSKEY ek = MAX(pQuery->window.skey, pQuery->window.ekey); - getAlignQueryTimeWindow(pQuery, pQuery->window.skey, sk, ek, &w); - - int32_t numOfCols = getNumOfFinalResCol(pQuery); - pRuntimeEnv->pFillInfo = taosCreateFillInfo(pQuery->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfCols, - pQuery->interval.sliding, pQuery->interval.slidingUnit, (int8_t)pQuery->precision, - pQuery->fillType, pColInfo, pQInfo); - } - setQueryStatus(pRuntimeEnv, QUERY_NOT_COMPLETED); return TSDB_CODE_SUCCESS; } @@ -4469,38 +4420,37 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); pInfo->pQueryHandle = pTsdbQueryHandle; - pInfo->times = repeatTime; + pInfo->times = repeatTime; pInfo->reverseTimes = 0; - pInfo->order = pRuntimeEnv->pQuery->order.order; - - pInfo->current = 0; - pInfo->pRuntimeEnv = pRuntimeEnv; + pInfo->order = pRuntimeEnv->pQuery->order.order; + pInfo->current = 0; + pInfo->pRuntimeEnv = pRuntimeEnv; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "SeqScanTableOp"; + pOperator->name = "TableScanOperator"; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; - pOperator->info = pInfo; + pOperator->status = OP_IN_EXECUTING; + pOperator->info = pInfo; pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfCols; - pOperator->exec = doTableScan; + pOperator->exec = doTableScan; return pOperator; } -SOperatorInfo* createSeqTableBlockScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, bool loadExternalRows) { +SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, bool loadExternalRows) { STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); - pInfo->pQueryHandle = pTsdbQueryHandle; - pInfo->times = 1; - pInfo->reverseTimes = 0; - pInfo->order = pRuntimeEnv->pQuery->order.order; - - pInfo->current = 0; - pInfo->pRuntimeEnv = pRuntimeEnv; + pInfo->pQueryHandle = pTsdbQueryHandle; + pInfo->times = 1; + pInfo->reverseTimes = 0; + pInfo->order = pRuntimeEnv->pQuery->order.order; + pInfo->current = 0; + pInfo->pRuntimeEnv = pRuntimeEnv; pInfo->loadExternalRows = loadExternalRows; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "TableBlockSeqScan"; + pOperator->name = "TableSeqScanOperator"; + pOperator->operatorType = OP_TableSeqScan; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; @@ -4516,35 +4466,34 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf pTableScanInfo->pExpr = pDownstream->pExpr; // TODO refactor to use colId instead of pExpr pTableScanInfo->numOfOutput = pDownstream->numOfOutput; - char* name = pDownstream->name; - if ((strcasecmp(name, "TableAggregate") == 0) || (strcasecmp(name, "STableAggregate") == 0)) { + if (pDownstream->operatorType == OP_Aggregate || pDownstream->operatorType == OP_MultiTableAggregate) { SAggOperatorInfo* pAggInfo = pDownstream->info; pTableScanInfo->pCtx = pAggInfo->binfo.pCtx; pTableScanInfo->pResultRowInfo = &pAggInfo->binfo.resultRowInfo; pTableScanInfo->rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset; - } else if (strcasecmp(name, "HashIntervalAgg") == 0) { + } else if (pDownstream->operatorType == OP_TimeInterval) { STableIntervalOperatorInfo *pIntervalInfo = pDownstream->info; pTableScanInfo->pCtx = pIntervalInfo->pCtx; pTableScanInfo->pResultRowInfo = &pIntervalInfo->resultRowInfo; pTableScanInfo->rowCellInfoOffset = pIntervalInfo->rowCellInfoOffset; - } else if (strcasecmp(name, "HashGroupbyAgg") == 0) { + } else if (pDownstream->operatorType == OP_Groupby) { SGroupbyOperatorInfo *pGroupbyInfo = pDownstream->info; pTableScanInfo->pCtx = pGroupbyInfo->binfo.pCtx; pTableScanInfo->pResultRowInfo = &pGroupbyInfo->binfo.resultRowInfo; pTableScanInfo->rowCellInfoOffset = pGroupbyInfo->binfo.rowCellInfoOffset; - } else if (strcasecmp(name, "STableIntervalAggOp") == 0) { + } else if (pDownstream->operatorType == OP_MultiTableTimeInterval) { STableIntervalOperatorInfo *pInfo = pDownstream->info; pTableScanInfo->pCtx = pInfo->pCtx; pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo; pTableScanInfo->rowCellInfoOffset = pInfo->rowCellInfoOffset; - } else if (strcasecmp(name, "ArithmeticOp") == 0) { + } else if (pDownstream->operatorType == OP_Arithmetic) { SArithOperatorInfo *pInfo = pDownstream->info; pTableScanInfo->pCtx = pInfo->binfo.pCtx; @@ -4555,24 +4504,23 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf } } -static SOperatorInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) { +static SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) { assert(repeatTime > 0); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); pInfo->pQueryHandle = pTsdbQueryHandle; - pInfo->times = repeatTime; + pInfo->times = repeatTime; pInfo->reverseTimes = reverseTime; - - pInfo->current = 0; - pInfo->order = pRuntimeEnv->pQuery->order.order; - - pInfo->pRuntimeEnv = pRuntimeEnv; + pInfo->current = 0; + pInfo->order = pRuntimeEnv->pQuery->order.order; + pInfo->pRuntimeEnv = pRuntimeEnv; SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); - pOptr->name = "BidirectionSeqScanTableOp"; - pOptr->blockingOptr = false; - pOptr->info = pInfo; - pOptr->exec = doTableScan; + pOptr->name = "DataBlocksOptimizedScanOperator"; + pOptr->operatorType = OP_DataBlocksOptScan; + pOptr->blockingOptr = false; + pOptr->info = pInfo; + pOptr->exec = doTableScan; return pOptr; } @@ -4606,8 +4554,7 @@ static SSDataBlock* doAggregate(void* param) { setTagVal_rv(pOperator, pQuery->current->pTable, pInfo->pCtx, pOperator->numOfOutput); - // TODO opt perf - if (strncasecmp(upstream->name, "BidirectionSeqScanTableOp", strlen("BidirectionSeqScanTableOp")) == 0) { + if (upstream->operatorType == OP_DataBlocksOptScan) { STableScanInfo* pScanInfo = upstream->info; order = getTableScanOrder(pScanInfo); } @@ -4660,8 +4607,7 @@ static SSDataBlock* doSTableAggregate(void* param) { setTagVal_rv(pOperator, pRuntimeEnv->pQuery->current->pTable, pInfo->pCtx, pOperator->numOfOutput); - // TODO opt perf - if (strncasecmp(upstream->name, "BidirectionSeqScanTableOp", strlen("BidirectionSeqScanTableOp")) == 0) { + if (upstream->operatorType == OP_DataBlocksOptScan) { STableScanInfo* pScanInfo = upstream->info; order = getTableScanOrder(pScanInfo); } @@ -4967,8 +4913,8 @@ static SSDataBlock* doFill(void* param) { SFillOperatorInfo *pInfo = pOperator->info; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - if (taosFillHasMoreResults(pRuntimeEnv->pFillInfo)) { - doFillGapsInResults_rv(pRuntimeEnv, pInfo->pRes); + if (taosFillHasMoreResults(pInfo->pFillInfo)) { + doFillGapsInResults_rv(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); return pInfo->pRes; } @@ -4980,17 +4926,17 @@ static SSDataBlock* doFill(void* param) { return NULL; } - taosFillSetStartInfo(pRuntimeEnv->pFillInfo, 0, pRuntimeEnv->pQuery->window.ekey); + taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQuery->window.ekey); } else { pInfo->totalInputRows += pBlock->info.rows; int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQuery->window.ekey:pBlock->info.window.ekey; - taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pBlock->info.rows, ekey); - taosFillSetInputDataBlock(pRuntimeEnv->pFillInfo, pBlock); + taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, ekey); + taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock); } - doFillGapsInResults_rv(pRuntimeEnv, pInfo->pRes); + doFillGapsInResults_rv(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; } } @@ -5037,6 +4983,7 @@ static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableAggregate"; + pOperator->operatorType = OP_Aggregate; pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; @@ -5067,6 +5014,7 @@ static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param; + pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); pInfo->pRes = destroyOutputBuf(pInfo->pRes); } @@ -5080,7 +5028,7 @@ static void destroyArithOperatorInfo(void* param, int32_t numOfOutput) { doDestroyBasicInfo(&pInfo->binfo, numOfOutput); } -SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); size_t tableGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); @@ -5089,7 +5037,8 @@ SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera initResultRowInfo(&pInfo->binfo.resultRowInfo, tableGroup, TSDB_DATA_TYPE_INT); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "STableAggregate"; + pOperator->name = "MultiTableAggregate"; + pOperator->operatorType = OP_MultiTableAggregate; pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; @@ -5118,7 +5067,8 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "ArithmeticOp"; + pOperator->name = "ArithmeticOperator"; + pOperator->operatorType = OP_Arithmetic; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; @@ -5139,7 +5089,8 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "LimitOp"; + pOperator->name = "LimitOperator"; + pOperator->operatorType = OP_Limit; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->upstream = upstream; @@ -5156,9 +5107,10 @@ SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator pInfo->offset = pRuntimeEnv->pQuery->limit.offset; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "OffsetOp"; + pOperator->name = "OffsetOperator"; + pOperator->operatorType = OP_Offset; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_IN_EXECUTING; pOperator->upstream = upstream; pOperator->exec = doOffset; pOperator->info = pInfo; @@ -5167,7 +5119,7 @@ SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator return pOperator; } -SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); @@ -5176,22 +5128,22 @@ SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "HashIntervalAgg"; + pOperator->name = "TimeIntervalAggOperator"; + pOperator->operatorType = OP_TimeInterval; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_IN_EXECUTING; pOperator->upstream = upstream; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->exec = doIntervalAgg; pOperator->cleanup = destroyBasicOperatorInfo; return pOperator; } -SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); @@ -5199,9 +5151,10 @@ SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, S initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "STableIntervalAggOp"; + pOperator->name = "MultiTableTimeIntervalOperator"; + pOperator->operatorType = OP_MultiTableTimeInterval; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_IN_EXECUTING; pOperator->upstream = upstream; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; @@ -5214,7 +5167,7 @@ SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, S return pOperator; } -SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo)); pInfo->colIndex = -1; // group by column index @@ -5223,15 +5176,15 @@ SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "HashGroupbyAgg"; + pOperator->name = "GroupbyAggOperator"; pOperator->blockingOptr = true; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_IN_EXECUTING; + pOperator->operatorType = OP_Groupby; pOperator->upstream = upstream; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->exec = doHashGroupbyAgg; pOperator->cleanup = destroyGroupbyOperatorInfo; @@ -5241,13 +5194,29 @@ SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo)); - pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + + { + SQuery* pQuery = pRuntimeEnv->pQuery; + SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfOutput, pQuery->fillVal); + STimeWindow w = TSWINDOW_INITIALIZER; + + TSKEY sk = MIN(pQuery->window.skey, pQuery->window.ekey); + TSKEY ek = MAX(pQuery->window.skey, pQuery->window.ekey); + getAlignQueryTimeWindow(pQuery, pQuery->window.skey, sk, ek, &w); + + pInfo->pFillInfo = taosCreateFillInfo(pQuery->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput, + pQuery->interval.sliding, pQuery->interval.slidingUnit, (int8_t)pQuery->precision, + pQuery->fillType, pColInfo, pRuntimeEnv->qinfo); + } SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "FillOp"; + pOperator->name = "FillOperator"; pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; + pOperator->status = OP_IN_EXECUTING; + pOperator->operatorType = OP_Fill; + pOperator->upstream = upstream; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; @@ -5392,6 +5361,7 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "SeqTableTagScan"; + pOperator->operatorType = OP_TagScan; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; @@ -5927,8 +5897,29 @@ static int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTable return TSDB_CODE_SUCCESS; } -int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg, - SColumnInfo* pTagCols) { +static UNUSED_FUNC int32_t ddx(SQueryTableMsg* pQueryMsg, SSqlFuncMsg** pExprMsg, SColumnInfo* pTagCols, SExprInfo* pExprs, + int32_t numOfOutput, int32_t tagLen, bool superTable) { + for (int32_t i = 0; i < numOfOutput; ++i) { + int16_t functId = pExprs[i].base.functionId; + + if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) { + int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols); + if (j < 0 || j >= pQueryMsg->numOfCols) { + return TSDB_CODE_QRY_INVALID_MSG; + } else { + SColumnInfo* pCol = &pQueryMsg->colList[j]; + int32_t ret = getResultDataInfo(pCol->type, pCol->bytes, functId, (int32_t)pExprs[i].base.arg[0].argValue.i64, + &pExprs[i].type, &pExprs[i].bytes, &pExprs[i].interBytes, tagLen, superTable); + assert(ret == TSDB_CODE_SUCCESS); + } + } + } + + return TSDB_CODE_SUCCESS; +} + +int32_t createQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutput, SExprInfo** pExprInfo, + SSqlFuncMsg** pExprMsg, SColumnInfo* pTagCols) { *pExprInfo = NULL; int32_t code = TSDB_CODE_SUCCESS; @@ -6001,6 +5992,11 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutpu } int32_t param = (int32_t)pExprs[i].base.arg[0].argValue.i64; + if (type != pExprs[i].base.colType || bytes != pExprs[i].base.colBytes) { + tfree(pExprs); + return TSDB_CODE_QRY_INVALID_MSG; + } + if (getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].type, &pExprs[i].bytes, &pExprs[i].interBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) { tfree(pExprs); @@ -6014,24 +6010,7 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutpu assert(isValidDataType(pExprs[i].type)); } - // TODO refactor - for (int32_t i = 0; i < numOfOutput; ++i) { - pExprs[i].base = *pExprMsg[i]; - int16_t functId = pExprs[i].base.functionId; - - if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) { - int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols); - if (j < 0 || j >= pQueryMsg->numOfCols) { - return TSDB_CODE_QRY_INVALID_MSG; - } else { - SColumnInfo *pCol = &pQueryMsg->colList[j]; - int32_t ret = - getResultDataInfo(pCol->type, pCol->bytes, functId, (int32_t)pExprs[i].base.arg[0].argValue.i64, - &pExprs[i].type, &pExprs[i].bytes, &pExprs[i].interBytes, tagLen, isSuperTable); - assert(ret == TSDB_CODE_SUCCESS); - } - } - } +// ddx(pQueryMsg, pExprMsg, pTagCols, pExprs, numOfOutput, tagLen, isSuperTable); *pExprInfo = pExprs; return TSDB_CODE_SUCCESS; @@ -6085,25 +6064,6 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t nu assert(isValidDataType(pExprs[i].type)); } -// // TODO refactor -// for (int32_t i = 0; i < numOfOutput; ++i) { -// pExprs[i].base = *pExprMsg[i]; -// int16_t functId = pExprs[i].base.functionId; -// -// if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) { -// int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols); -// if (j < 0 || j >= pQueryMsg->numOfCols) { -// return TSDB_CODE_QRY_INVALID_MSG; -// } else { -// SColumnInfo *pCol = &pQueryMsg->colList[j]; -// int32_t ret = -// getResultDataInfo(pCol->type, pCol->bytes, functId, (int32_t)pExprs[i].base.arg[0].argValue.i64, -// &pExprs[i].type, &pExprs[i].bytes, &pExprs[i].interBytes, tagLen, isSuperTable); -// assert(ret == TSDB_CODE_SUCCESS); -// } -// } -// } - *pExprInfo = pExprs; return TSDB_CODE_SUCCESS; } @@ -6312,29 +6272,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr goto _cleanup; } - // prepare the result buffer -// pQuery->sdata = (tFilePage **)calloc(pQuery->numOfOutput, POINTER_BYTES); -// if (pQuery->sdata == NULL) { -// goto _cleanup; -// } - -// for (int32_t col = 0; col < pQuery->numOfOutput; ++col) { -// // allocate additional memory for interResults that are usually larger then final results -// // TODO refactor -// int16_t bytes = 0; -// if (pQuery->pExpr2 == NULL || col >= pQuery->numOfExpr2) { -// bytes = pExprs[col].bytes; -// } else { -// bytes = MAX(pQuery->pExpr2[col].bytes, pExprs[col].bytes); -// } -// -// size_t size = (size_t)((pRuntimeEnv->resultInfo.capacity + 1) * bytes + pExprs[col].interBytes + sizeof(tFilePage)); -// pQuery->sdata[col] = (tFilePage *)calloc(1, size); -// if (pQuery->sdata[col] == NULL) { -// goto _cleanup; -// } -// } - if (pQuery->fillType != TSDB_FILL_NONE) { pQuery->fillVal = malloc(sizeof(int64_t) * pQuery->numOfOutput); if (pQuery->fillVal == NULL) { diff --git a/src/query/src/qFill.c b/src/query/src/qFill.c index 52071bbae7..17bd7f6b55 100644 --- a/src/query/src/qFill.c +++ b/src/query/src/qFill.c @@ -357,6 +357,10 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3 pFillInfo->rowSize = setTagColumnInfo(pFillInfo, pFillInfo->numOfCols, pFillInfo->alloc); assert(pFillInfo->rowSize > 0); + for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { + pFillInfo->pData[i] = malloc(pFillInfo->pFillCol[i].col.bytes * pFillInfo->alloc); + } + return pFillInfo; } @@ -381,6 +385,10 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) { tfree(pFillInfo->pTags[i].tagVal); } + for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { + tfree(pFillInfo->pData[i]); + } + tfree(pFillInfo->pTags); tfree(pFillInfo->pData); @@ -415,17 +423,19 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) } } -// copy the data into source data buffer -void taosFillSetDataBlockFromFilePage(SFillInfo* pFillInfo, const tFilePage** pInput) { - for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { - memcpy(pFillInfo->pData[i], pInput[i]->data, pFillInfo->numOfRows * pFillInfo->pFillCol[i].col.bytes); - } -} - void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) { for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i); - pFillInfo->pData[i] = pColData->pData; +// pFillInfo->pData[i] = pColData->pData; + if (pInput->info.rows > pFillInfo->alloc) { + char* t = realloc(pFillInfo->pData[i], pColData->info.bytes * pInput->info.rows); + assert(t != NULL); + + pFillInfo->pData[i] = t; + pFillInfo->alloc = pInput->info.rows; + } + + memcpy(pFillInfo->pData[i], pColData->pData, pColData->info.bytes * pInput->info.rows); } } @@ -436,11 +446,15 @@ void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage* SFillColInfo* pCol = &pFillInfo->pFillCol[i]; const char* data = pInput->data + pCol->col.offset * pInput->num; - if (pFillInfo->pData[i] == NULL) { - pFillInfo->pData[i] = calloc(4096, pCol->col.bytes); + if (pInput->num > pFillInfo->alloc) { + char* t = realloc(pFillInfo->pData[i], pCol->col.bytes * pInput->num); + assert(t != NULL); + + pFillInfo->pData[i] = t; + pFillInfo->alloc = pInput->num; } + memcpy(pFillInfo->pData[i], data, pCol->col.bytes * pInput->num); -// pFillInfo->pData[i] = (char*) data; if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex]; diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index dd9d5d1958..dc8f6d56c0 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -572,9 +572,9 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRu incNextGroup(pGroupResInfo); } - if (pGroupResInfo->currentGroup >= pGroupResInfo->totalGroup && !hasRemainData(pGroupResInfo)) { - SET_STABLE_QUERY_OVER(pRuntimeEnv); - } +// if (pGroupResInfo->currentGroup >= pGroupResInfo->totalGroup && !hasRemainData(pGroupResInfo)) { +// SET_STABLE_QUERY_OVER(pRuntimeEnv); +// } int64_t elapsedTime = taosGetTimestampUs() - st; qDebug("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", pRuntimeEnv->qinfo, -- GitLab