From 75984ed35272cbbd43e6db491a82e457e0b0c794 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 18 Feb 2021 15:16:24 +0800 Subject: [PATCH] [td-2895] refactor --- src/query/inc/qAggMain.h | 4 +- src/query/inc/qExecutor.h | 27 ++- src/query/src/qAggMain.c | 38 ++--- src/query/src/qExecutor.c | 347 +++++++++++++++++++++----------------- 4 files changed, 233 insertions(+), 183 deletions(-) diff --git a/src/query/inc/qAggMain.h b/src/query/inc/qAggMain.h index 7122f63593..2e9f50159d 100644 --- a/src/query/inc/qAggMain.h +++ b/src/query/inc/qAggMain.h @@ -214,7 +214,7 @@ typedef struct SAggFunctionInfo { void (*xFinalize)(SQLFunctionCtx *pCtx); void (*mergeFunc)(SQLFunctionCtx *pCtx); - int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId); + int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId); } SAggFunctionInfo; #define GET_RES_INFO(ctx) ((ctx)->resultInfo) @@ -247,7 +247,7 @@ extern struct SAggFunctionInfo aAggs[]; extern int32_t functionCompatList[]; // compatible check array list -bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const char *minval, const char *maxval); +bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const char *maxval); /** * the numOfRes should be kept, since it may be used later diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index a412bd123a..b2841a29c1 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -249,6 +249,7 @@ typedef struct SOperatorInfo { int32_t numOfOutput; __operator_fn_t exec; + __operator_fn_t cleanup; struct SOperatorInfo *upstream; } SOperatorInfo; @@ -292,11 +293,6 @@ typedef struct SQueryRuntimeEnv { SGroupResInfo groupResInfo; } SQueryRuntimeEnv; -typedef struct { - char* name; - void* info; -} SQEStage; - enum { QUERY_RESULT_NOT_READY = 1, QUERY_RESULT_READY = 2, @@ -345,11 +341,11 @@ typedef struct SQueryParam { typedef struct STableScanInfo { SQueryRuntimeEnv *pRuntimeEnv; + void *pQueryHandle; int32_t numOfBlocks; int32_t numOfSkipped; int32_t numOfBlockStatis; - int64_t numOfRows; int32_t order; // scan order @@ -359,11 +355,16 @@ typedef struct STableScanInfo { int32_t reverseTimes; // 0 by default SSDataBlock block; + + SQLFunctionCtx* pCtx; // next operator query context + SResultRowInfo* pResultRowInfo; + int32_t numOfOutput; + int64_t elapsedTime; } STableScanInfo; typedef struct SAggOperatorInfo { - SResultRowInfo *pResultRowInfo; + SResultRowInfo resultRowInfo; STableQueryInfo *pTableQueryInfo; SQueryRuntimeEnv *pRuntimeEnv; SQLFunctionCtx *pCtx; @@ -372,7 +373,9 @@ typedef struct SAggOperatorInfo { typedef struct SArithOperatorInfo { STableQueryInfo *pTableQueryInfo; SQueryRuntimeEnv *pRuntimeEnv; - SQLFunctionCtx* pCtx; + SQLFunctionCtx *pCtx; + SResultRowInfo resultRowInfo; + SSDataBlock *pOutput; } SArithOperatorInfo; typedef struct SLimitOperatorInfo { @@ -388,10 +391,10 @@ typedef struct SOffsetOperatorInfo { } SOffsetOperatorInfo; typedef struct SHashIntervalOperatorInfo { - SResultRowInfo *pResultRowInfo; STableQueryInfo *pTableQueryInfo; SQueryRuntimeEnv *pRuntimeEnv; SQLFunctionCtx *pCtx; + SResultRowInfo resultRowInfo; } SHashIntervalOperatorInfo; typedef struct SFillOperatorInfo { @@ -400,6 +403,12 @@ typedef struct SFillOperatorInfo { SQueryRuntimeEnv *pRuntimeEnv; } SFillOperatorInfo; +typedef struct SFilterOperatorInfo { + SResultRowInfo *pResultRowInfo; + STableQueryInfo *pTableQueryInfo; + SQueryRuntimeEnv *pRuntimeEnv; +} SFilterOperatorInfo; + void freeParam(SQueryParam *param); int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param); int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg, diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 82953203f3..b1ad96d21d 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -457,7 +457,7 @@ static void count_func_merge(SQLFunctionCtx *pCtx) { * @param filterCols * @return */ -int32_t count_load_data_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) { +int32_t countRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { if (colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { return BLK_DATA_NO_NEEDED; } else { @@ -465,7 +465,7 @@ int32_t count_load_data_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32 } } -int32_t no_data_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) { +int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { return BLK_DATA_NO_NEEDED; } @@ -667,16 +667,16 @@ static void sum_func_merge(SQLFunctionCtx *pCtx) { } } -static int32_t statisRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) { +static int32_t statisRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { return BLK_DATA_STATIS_NEEDED; } -static int32_t dataBlockRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) { +static int32_t dataBlockRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { return BLK_DATA_ALL_NEEDED; } -// todo: if column in current data block are null, opt for this case -static int32_t firstFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) { +// todo: if column in current data block are null, opt for this case +static int32_t firstFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { if (pCtx->order == TSDB_ORDER_DESC) { return BLK_DATA_NO_NEEDED; } @@ -689,7 +689,7 @@ static int32_t firstFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, i } } -static int32_t lastFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) { +static int32_t lastFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { if (pCtx->order != pCtx->param[0].i64) { return BLK_DATA_NO_NEEDED; } @@ -701,7 +701,7 @@ static int32_t lastFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, in } } -static int32_t firstDistFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) { +static int32_t firstDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { if (pCtx->order == TSDB_ORDER_DESC) { return BLK_DATA_NO_NEEDED; } @@ -717,11 +717,11 @@ static int32_t firstDistFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY en if (pInfo->hasResult != DATA_SET_FLAG) { return BLK_DATA_ALL_NEEDED; } else { // data in current block is not earlier than current result - return (pInfo->ts <= start) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED; + return (pInfo->ts <= w->skey) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED; } } -static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) { +static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { if (pCtx->order != pCtx->param[0].i64) { return BLK_DATA_NO_NEEDED; } @@ -737,7 +737,7 @@ static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end if (pInfo->hasResult != DATA_SET_FLAG) { return BLK_DATA_ALL_NEEDED; } else { - return (pInfo->ts > end) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED; + return (pInfo->ts > w->ekey) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED; } } @@ -2412,7 +2412,7 @@ static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) { } } -bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const char *minval, const char *maxval) { +bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const char *maxval) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); if (pResInfo == NULL) { return true; @@ -2427,7 +2427,7 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const cha tValuePair **pRes = (tValuePair**) pTopBotInfo->res; - if (functionId == TSDB_FUNC_TOP) { + if (pCtx->functionId == TSDB_FUNC_TOP) { switch (pCtx->inputType) { case TSDB_DATA_TYPE_TINYINT: return GET_INT8_VAL(maxval) > pRes[0]->v.i64; @@ -4549,7 +4549,7 @@ SAggFunctionInfo aAggs[] = {{ no_next_step, doFinalizer, count_func_merge, - count_load_data_info, + countRequired, }, { // 1 @@ -4734,7 +4734,7 @@ SAggFunctionInfo aAggs[] = {{ no_next_step, spread_function_finalizer, spread_func_merge, - count_load_data_info, + countRequired, }, { // 14 @@ -4776,7 +4776,7 @@ SAggFunctionInfo aAggs[] = {{ no_next_step, doFinalizer, copy_function, - no_data_info, + noDataRequired, }, { // 17 @@ -4804,7 +4804,7 @@ SAggFunctionInfo aAggs[] = {{ no_next_step, doFinalizer, copy_function, - no_data_info, + noDataRequired, }, { // 19 @@ -4832,7 +4832,7 @@ SAggFunctionInfo aAggs[] = {{ no_next_step, doFinalizer, copy_function, - no_data_info, + noDataRequired, }, { // 21, column project sql function @@ -4860,7 +4860,7 @@ SAggFunctionInfo aAggs[] = {{ no_next_step, doFinalizer, copy_function, - no_data_info, + noDataRequired, }, { // 23 diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 247fa8e012..ee9348963c 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -175,17 +175,20 @@ static STableIdInfo createTableIdInfo(SQuery* pQuery); static SOperatorInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); static SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime); +static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream); + static int32_t getNumOfScanTimes(SQuery* pQuery); static char *getArithemicInputSrc(void *param, const char *name, int32_t colId); -static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv); - -static SOperatorInfo* createAggOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, - SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr); -static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo, - SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr); -static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr); -static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr); -static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr); +static bool isFixedOutputQuery(SQuery* pQuery); + +static SOperatorInfo* createAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); +static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); +static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); +static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); +static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); +static SOperatorInfo* createFilterOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); +//static SOperatorInfo* createHashGroupbyAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); + static void destroyOperatorInfo(SOperatorInfo* pOperator); void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size); void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); @@ -2316,13 +2319,20 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf // group by normal column, sliding window query, interval query are handled by interval query processor if (!pQuery->stableQuery) { // interval (down sampling operation) - if (isFixedOutputQuery(pRuntimeEnv)) { - pRuntimeEnv->proot = createAggOperatorInfo(&pRuntimeEnv->resultRowInfo, pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); + if (isFixedOutputQuery(pQuery)) { + pRuntimeEnv->proot = createAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); + setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot); + if (pQuery->pExpr2 != NULL) { pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot); } } else if (QUERY_IS_INTERVAL_QUERY(pQuery)) { pRuntimeEnv->proot = createHashIntervalAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); + setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot); + + if (pQuery->pExpr2 != NULL) { + pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot); + } } else { // diff/add/multiply/subtract/division assert(pQuery->checkResultBuf == 1); pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); @@ -2457,8 +2467,7 @@ bool isQueryKilled(SQInfo *pQInfo) { void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;} -static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv) { - SQuery* pQuery = pRuntimeEnv->pQuery; +static bool isFixedOutputQuery(SQuery* pQuery) { if (QUERY_IS_INTERVAL_QUERY(pQuery)) { return false; } @@ -2802,9 +2811,9 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i #define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR) -static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx, - int32_t numOfRows) { +static bool doDataBlockStaticFilter(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx, int32_t numOfRows) { SQuery* pQuery = pRuntimeEnv->pQuery; + if (pDataStatis == NULL || (pQuery->numOfFilterCols == 0 && (!pQuery->topBotQuery))) { return true; } @@ -2864,15 +2873,6 @@ static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDat } } - if (pQuery->topBotQuery) { - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t functionId = pQuery->pExpr1[i].base.functionId; - if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { - return topbot_datablock_filter(&pCtx[i], functionId, (char *)&pDataStatis[i].min, (char *)&pDataStatis[i].max); - } - } - } - return false; } @@ -2963,7 +2963,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pW int32_t functionId = pSqlFunc->functionId; int32_t colId = pSqlFunc->colInfo.colId; - (*status) |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pBlockInfo->window.skey, pBlockInfo->window.ekey, colId); + (*status) |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], &pBlockInfo->window, colId); if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) { break; } @@ -2992,7 +2992,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pW pCost->loadBlockStatis += 1; tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis); - if (!needToLoadDataBlock(pRuntimeEnv, *pStatis, pRuntimeEnv->pCtx, pBlockInfo->rows)) { + if (!doDataBlockStaticFilter(pRuntimeEnv, *pStatis, pRuntimeEnv->pCtx, pBlockInfo->rows)) { // current block has been discard due to filter applied pCost->discardBlocks += 1; qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", GET_QINFO_ADDR(pRuntimeEnv), @@ -3011,11 +3011,12 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pW return TSDB_CODE_SUCCESS; } -int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pWindowResInfo, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock, uint32_t* status) { +int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SResultRowInfo * pWindowResInfo, + void* pQueryHandle, SSDataBlock* pBlock, uint32_t* status) { *status = BLK_DATA_NO_NEEDED; SQuery *pQuery = pRuntimeEnv->pQuery; -// int64_t groupId = pQuery->current->groupIndex; + int64_t groupId = pQuery->current->groupIndex; SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); SQueryCostInfo* pCost = &pQInfo->summary; @@ -3025,71 +3026,91 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * } else { // check if this data block is required to load // Calculate all time windows that are overlapping or contain current data block. // If current data block is contained by all possible time window, do not load current data block. -// if (QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, pBlockInfo)) { + if (QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, &pBlock->info)) { *status = BLK_DATA_ALL_NEEDED; -// } + } if ((*status) != BLK_DATA_ALL_NEEDED) { // the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet, // the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer -// if (QUERY_IS_INTERVAL_QUERY(pQuery)) { -// SResultRow* pResult = NULL; -// -// bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); -// -// TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlockInfo->window.skey:pBlockInfo->window.ekey; -// STimeWindow win = getActiveTimeWindow(pWindowResInfo, k, pQuery); -// if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId) != TSDB_CODE_SUCCESS) { -// // todo handle error in set result for timewindow -// } -// } -// -// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { -// SSqlFuncMsg* pSqlFunc = &pQuery->pExpr1[i].base; -// -// int32_t functionId = pSqlFunc->functionId; -// int32_t colId = pSqlFunc->colInfo.colId; -// (*status) |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pBlockInfo->window.skey, pBlockInfo->window.ekey, colId); -// if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) { -// break; -// } -// } + if (QUERY_IS_INTERVAL_QUERY(pQuery)) { + SResultRow* pResult = NULL; + + bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); + TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlock->info.window.skey:pBlock->info.window.ekey; + STimeWindow win = getActiveTimeWindow(pWindowResInfo, k, pQuery); + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId) != TSDB_CODE_SUCCESS) { + // todo handle error in set result for timewindow + } + } + + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { + SSqlFuncMsg* pSqlFunc = &pQuery->pExpr1[i].base; + + int32_t functionId = pSqlFunc->functionId; + int32_t colId = pSqlFunc->colInfo.colId; + (*status) |= aAggs[functionId].dataReqFunc(&pCtx[i], &pBlock->info.window, colId); + if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) { + break; + } + } } } + SDataBlockInfo* pBlockInfo = &pBlock->info; + if ((*status) == BLK_DATA_NO_NEEDED) { - qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", GET_QINFO_ADDR(pRuntimeEnv), - pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", pQInfo, pBlockInfo->window.skey, + pBlockInfo->window.ekey, pBlockInfo->rows); pCost->discardBlocks += 1; } else if ((*status) == BLK_DATA_STATIS_NEEDED) { // this function never returns error? - tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis); pCost->loadBlockStatis += 1; + tsdbRetrieveDataBlockStatisInfo(pQueryHandle, &pBlock->pBlockStatis); - if (*pStatis == NULL) { // data block statistics does not exist, load data block - *pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL); - pCost->totalCheckedRows += pBlockInfo->rows; + if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block + pBlock->pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL); + pCost->totalCheckedRows += pBlock->info.rows; } } else { assert((*status) == BLK_DATA_ALL_NEEDED); // load the data block statistics to perform further filter pCost->loadBlockStatis += 1; - tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis); + tsdbRetrieveDataBlockStatisInfo(pQueryHandle, &pBlock->pBlockStatis); - if (!needToLoadDataBlock(pRuntimeEnv, *pStatis, pRuntimeEnv->pCtx, pBlockInfo->rows)) { - // current block has been discard due to filter applied + if (pQuery->topBotQuery && pBlock->pBlockStatis != NULL) { + + bool load = false; + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { + int32_t functionId = pCtx[i].functionId; + if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { + load = topbot_datablock_filter(&pCtx[i], (char *)&(pBlock->pBlockStatis[i].min), (char *)&(pBlock->pBlockStatis[i].max)); + if (!load) { + // current block has been discard due to filter applied + pCost->discardBlocks += 1; + qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", pQInfo, pBlockInfo->window.skey, + pBlockInfo->window.ekey, pBlockInfo->rows); + (*status) = BLK_DATA_DISCARD; + return TSDB_CODE_SUCCESS; + } + } + } + } + + // current block has been discard due to filter applied + if (!doDataBlockStaticFilter(pRuntimeEnv, pBlock->pBlockStatis, pCtx, pBlockInfo->rows)) { pCost->discardBlocks += 1; - qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", GET_QINFO_ADDR(pRuntimeEnv), - pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", pQInfo, pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); (*status) = BLK_DATA_DISCARD; + return TSDB_CODE_SUCCESS; } pCost->totalCheckedRows += pBlockInfo->rows; pCost->loadBlocks += 1; - *pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL); - if (*pDataBlock == NULL) { + pBlock->pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL); + if (pBlock->pDataBlock == NULL) { return terrno; } } @@ -3186,7 +3207,7 @@ static void expandBuffer(SQueryRuntimeEnv* pRuntimeEnv, int32_t newSize, void* q static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, int32_t numOfRows) { // in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block SQuery* pQuery = pRuntimeEnv->pQuery; - if (!QUERY_IS_INTERVAL_QUERY(pQuery) && !pQuery->groupbyColumn && !isFixedOutputQuery(pRuntimeEnv) && !isTsCompQuery(pQuery)) { + if (!QUERY_IS_INTERVAL_QUERY(pQuery) && !pQuery->groupbyColumn && !isFixedOutputQuery(pQuery) && !isTsCompQuery(pQuery)) { SResultRec *pRec = &pQuery->rec; int32_t remain = (int32_t)(pRec->capacity - pRec->rows); @@ -3684,10 +3705,10 @@ void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx); } -void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SSDataBlock* pDataBlock) { +void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, SSDataBlock* pDataBlock) { int32_t tid = 0; int64_t uid = 0; - SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->resultRowInfo, (char *)&tid, sizeof(tid), true, uid); + SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&tid, sizeof(tid), true, uid); for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i); @@ -4417,7 +4438,7 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) { * In ascending query, the key is the first qualified timestamp. However, in the descending order query, additional * operations involve. */ - STimeWindow w = TSWINDOW_INITIALIZER; + STimeWindow w = TSWINDOW_INITIALIZER; TSKEY sk = MIN(win.skey, win.ekey); TSKEY ek = MAX(win.skey, win.ekey); @@ -5115,7 +5136,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) return TSDB_CODE_SUCCESS; } - if (isSTableQuery && (!QUERY_IS_INTERVAL_QUERY(pQuery)) && (!isFixedOutputQuery(pRuntimeEnv))) { + if (isSTableQuery && (!QUERY_IS_INTERVAL_QUERY(pQuery)) && (!isFixedOutputQuery(pQuery))) { return TSDB_CODE_SUCCESS; } @@ -5126,7 +5147,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) && (cond.order == TSDB_ORDER_ASC) && (!QUERY_IS_INTERVAL_QUERY(pQuery)) && (!isGroupbyColumn(pQuery->pGroupbyExpr)) - && (!isFixedOutputQuery(pRuntimeEnv)) + && (!isFixedOutputQuery(pQuery)) ) { SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0); STableQueryInfo* pCheckInfo = taosArrayGetP(pa, 0); @@ -6182,8 +6203,9 @@ static SSDataBlock* doScanTableImpl(STableScanInfo *pTableScanInfo) { // this function never returns error? uint32_t status; - int32_t code = loadDataBlockOnDemand_rv(pTableScanInfo->pRuntimeEnv, NULL, pTableScanInfo->pQueryHandle, &pBlock->info, &pBlock->pBlockStatis, - &pBlock->pDataBlock, &status); + int32_t code = + loadDataBlockOnDemand_rv(pTableScanInfo->pRuntimeEnv, pTableScanInfo->pCtx, pTableScanInfo->pResultRowInfo, + pTableScanInfo->pQueryHandle, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { longjmp(pTableScanInfo->pRuntimeEnv->env, code); } @@ -6260,7 +6282,7 @@ static SSDataBlock* doTableScan(void* param) { return NULL; } -static UNUSED_FUNC SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime) { +SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime) { assert(repeatTime > 0); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); @@ -6272,15 +6294,34 @@ static UNUSED_FUNC SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle pInfo->current = 0; pInfo->pRuntimeEnv = pRuntimeEnv; - SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); - pOptr->name = "SeqScanTableOp"; - pOptr->blockingOptr = false; - pOptr->optInfo = pInfo; - pOptr->completed = false; - pOptr->numOfOutput = pRuntimeEnv->pQuery->numOfCols; - pOptr->exec = doTableScan; + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + pOperator->name = "SeqScanTableOp"; + pOperator->blockingOptr = false; + pOperator->completed = false; + pOperator->optInfo = pInfo; + pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfCols; + pOperator->exec = doTableScan; - return pOptr; + return pOperator; +} + +void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream) { + assert(pTableScanInfo != NULL && pDownstream != NULL); + + char* name = pDownstream->name; + if (strcasecmp(name, "AggregationOp") == 0) { + SAggOperatorInfo* pAggInfo = pDownstream->optInfo; + + pTableScanInfo->pCtx = pAggInfo->pCtx; + pTableScanInfo->pResultRowInfo = &pAggInfo->resultRowInfo; + } else if (strcasecmp(name, "HashIntervalAggOp") == 0){ + SHashIntervalOperatorInfo* pIntervalInfo = pDownstream->optInfo; + + pTableScanInfo->pCtx = pIntervalInfo->pCtx; + pTableScanInfo->pResultRowInfo = &pIntervalInfo->resultRowInfo; + } else { + assert(0); + } } static SOperatorInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) { @@ -6324,13 +6365,13 @@ static SSDataBlock* doAggregation(void* param) { SQueryRuntimeEnv* pRuntimeEnv = pAggInfo->pRuntimeEnv; SQuery* pQuery = pRuntimeEnv->pQuery; + int32_t order = pQuery->order.order; - SQLFunctionCtx* pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId); + SQLFunctionCtx* pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, order, pQuery->vgId); SSDataBlock* pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); - setDefaultOutputBuf(pRuntimeEnv, pCtx, pRes); + setDefaultOutputBuf(pRuntimeEnv, pCtx, &pRuntimeEnv->resultRowInfo, pRes); - int32_t order = pQuery->order.order; SOperatorInfo* upstream = pOperator->upstream; pQuery->pos = 0; @@ -6368,14 +6409,17 @@ static SSDataBlock* doArithmeticOperation(void* param) { SQuery* pQuery = pRuntimeEnv->pQuery; - SSDataBlock* pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); if (pArithInfo->pCtx == NULL) { + pArithInfo->pOutput = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); pArithInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId); + initResultRowInfo(&pArithInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); } - setDefaultOutputBuf(pRuntimeEnv, pArithInfo->pCtx, pRes); + setDefaultOutputBuf(pRuntimeEnv, pArithInfo->pCtx, &pArithInfo->resultRowInfo, pArithInfo->pOutput); pRuntimeEnv->pQuery->pos = 0; + pArithInfo->pOutput->info.rows = 0; + while(1) { SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream); if (pBlock == NULL) { @@ -6387,7 +6431,7 @@ static SSDataBlock* doArithmeticOperation(void* param) { for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { pArithInfo->pCtx[i].size = pBlock->info.rows; if (pArithInfo->pCtx[i].functionId == TSDB_FUNC_ARITHM) { - setArithParams((SArithmeticSupport*) pArithInfo->pCtx[i].param[1].pz, pOperator->pExpr, pBlock); + setArithParams((SArithmeticSupport*) pArithInfo->pCtx[i].param[1].pz, &pOperator->pExpr[i], pBlock); } else { SColIndex *pCol = &pOperator->pExpr[i].base.colInfo; if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) { @@ -6405,13 +6449,13 @@ static SSDataBlock* doArithmeticOperation(void* param) { } arithmeticApplyFunctions(pRuntimeEnv, pArithInfo->pCtx, pOperator->pExpr, pOperator->numOfOutput); - pRes->info.rows += pBlock->info.rows; - if (pRes->info.rows > 4096) { + pArithInfo->pOutput->info.rows += pBlock->info.rows; + if (pArithInfo->pOutput->info.rows > 4096) { break; } } - return pRes; + return pArithInfo->pOutput; } static SSDataBlock* doLimit(void* param) { @@ -6483,11 +6527,10 @@ static SSDataBlock* doHashIntervalAgg(void* param) { SQueryRuntimeEnv* pRuntimeEnv = pIntervalInfo->pRuntimeEnv; SQuery* pQuery = pRuntimeEnv->pQuery; + int32_t order = pQuery->order.order; - SQLFunctionCtx* pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId); SSDataBlock* pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); - int32_t order = pQuery->order.order; SOperatorInfo* upstream = pOperator->upstream; pQuery->pos = 0; @@ -6503,8 +6546,8 @@ static SSDataBlock* doHashIntervalAgg(void* param) { } // the pDataBlock are always the same one, no need to call this again - setInputSDataBlock(pOperator, pCtx, pBlock, order); - hashIntervalAgg(pRuntimeEnv, pOperator, pCtx, pBlock); + setInputSDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, order); + hashIntervalAgg(pRuntimeEnv, pOperator, pIntervalInfo->pCtx, pBlock); } pOperator->completed = true; @@ -6513,8 +6556,6 @@ static SSDataBlock* doHashIntervalAgg(void* param) { setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); finalizeQueryResult(pRuntimeEnv); - destroySQLFunctionCtx(pCtx, pOperator->numOfOutput); - initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, pQuery->limit.offset); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pRes); @@ -6559,6 +6600,10 @@ static SSDataBlock* doFill(void* param) { return NULL; } +//SSDataBlock* doFilter(void* param) { +// +//} + // todo set the attribute of query scan count static int32_t getNumOfScanTimes(SQuery* pQuery) { for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { @@ -6576,33 +6621,41 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { return; } + if (pOperator->cleanup != NULL) { + pOperator->cleanup(pOperator->optInfo); + } + destroyOperatorInfo(pOperator->upstream); tfree(pOperator->optInfo); tfree(pOperator); } -static SOperatorInfo* createAggOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, - SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) { +static SOperatorInfo* createAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); - pInfo->pResultRowInfo = pResultRowInfo; pInfo->pTableQueryInfo = pTableQueryInfo; pInfo->pRuntimeEnv = pRuntimeEnv; + SQuery* pQuery = pRuntimeEnv->pQuery; + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "AggregationOp"; pOperator->blockingOptr = true; pOperator->completed = false; pOperator->optInfo = pInfo; - pOperator->upstream = inputOptr; + pOperator->upstream = upstream; pOperator->exec = doAggregation; pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1; pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput; + + pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId); + initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + return pOperator; } -static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) { +static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { SArithOperatorInfo* pInfo = calloc(1, sizeof(SArithOperatorInfo)); pInfo->pTableQueryInfo = pTableQueryInfo; @@ -6613,7 +6666,7 @@ static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo, pOperator->blockingOptr = false; pOperator->completed = false; pOperator->optInfo = pInfo; - pOperator->upstream = inputOptr; + pOperator->upstream = upstream; pOperator->exec = doArithmeticOperation; pOperator->pExpr = (pRuntimeEnv->pQuery->pExpr2 == NULL)? pRuntimeEnv->pQuery->pExpr1:pRuntimeEnv->pQuery->pExpr2; pOperator->numOfOutput = (pRuntimeEnv->pQuery->pExpr2 == NULL)? pRuntimeEnv->pQuery->numOfOutput:pRuntimeEnv->pQuery->numOfExpr2; @@ -6621,7 +6674,7 @@ static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo, return pOperator; } -static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) { +static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo)); pInfo->limit = pRuntimeEnv->pQuery->limit.limit; @@ -6632,7 +6685,7 @@ static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo, pOperator->name = "LimitOp"; pOperator->blockingOptr = false; pOperator->completed = false; - pOperator->upstream = inputOptr; + pOperator->upstream = upstream; pOperator->exec = doLimit; pOperator->pExpr = NULL; pOperator->numOfOutput = 0; @@ -6641,7 +6694,7 @@ static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo, return pOperator; } -static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) { +static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { SOffsetOperatorInfo* pInfo = calloc(1, sizeof(SOffsetOperatorInfo)); pInfo->offset = pRuntimeEnv->pQuery->limit.offset; @@ -6653,7 +6706,7 @@ static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, pOperator->name = "OffsetOp"; pOperator->blockingOptr = false; pOperator->completed = false; - pOperator->upstream = inputOptr; + pOperator->upstream = upstream; pOperator->exec = doOffset; pOperator->pExpr = NULL; pOperator->numOfOutput = 0; @@ -6662,19 +6715,43 @@ static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, return pOperator; } -static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) { +static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo)); pInfo->pRuntimeEnv = pRuntimeEnv; pInfo->pTableQueryInfo = pTableQueryInfo; + SQuery* pQuery = pRuntimeEnv->pQuery; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "HashIntervalAggOp"; pOperator->blockingOptr = true; pOperator->completed = false; - pOperator->upstream = inputOptr; + pOperator->upstream = upstream; pOperator->exec = doHashIntervalAgg; + pOperator->pExpr = pQuery->pExpr1; + pOperator->numOfOutput = pQuery->numOfOutput; + pOperator->optInfo = pInfo; + + pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId); + initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + + return pOperator; +} + +UNUSED_FUNC SOperatorInfo* createFilterOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { + SFilterOperatorInfo* pInfo = calloc(1, sizeof(SFilterOperatorInfo)); + + pInfo->pRuntimeEnv = pRuntimeEnv; + pInfo->pTableQueryInfo = pTableQueryInfo; + + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + + pOperator->name = "FilterOp"; + pOperator->blockingOptr = false; + pOperator->completed = false; + pOperator->upstream = upstream; + pOperator->exec = NULL; pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1; pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput; pOperator->optInfo = pInfo; @@ -6682,7 +6759,7 @@ static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQ return pOperator; } -static UNUSED_FUNC SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) { +static UNUSED_FUNC SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo)); pInfo->pRuntimeEnv = pRuntimeEnv; @@ -6693,7 +6770,7 @@ static UNUSED_FUNC SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTable pOperator->name = "FillOp"; pOperator->blockingOptr = false; pOperator->completed = false; - pOperator->upstream = inputOptr; + pOperator->upstream = upstream; pOperator->exec = doFill; pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1; pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput; @@ -6832,6 +6909,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); pQuery->rec.rows = pRuntimeEnv->outputBuf->info.rows; + #if 0 // scanOneTableDataBlocks(pRuntimeEnv, newStartKey); // finalizeQueryResult(pRuntimeEnv); @@ -6911,7 +6989,7 @@ void tableQueryImpl(SQInfo *pQInfo) { // group by normal column, sliding window query, interval query are handled by interval query processor if (QUERY_IS_INTERVAL_QUERY(pQuery) || pQuery->groupbyColumn) { // interval (down sampling operation) tableIntervalProcess(pQInfo, item); - } else if (isFixedOutputQuery(pRuntimeEnv)) { + } else if (isFixedOutputQuery(pQuery)) { tableAggregationProcess(pQInfo, item); } else { // diff/add/multiply/subtract/division assert(pQuery->checkResultBuf == 1); @@ -6984,7 +7062,7 @@ void stableQueryImpl(SQInfo *pQInfo) { int64_t st = taosGetTimestampUs(); if (QUERY_IS_INTERVAL_QUERY(pQuery) || - (isFixedOutputQuery(pRuntimeEnv) && (!isPointInterpoQuery(pQuery)) && (!pQuery->groupbyColumn))) { + (isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && (!pQuery->groupbyColumn))) { multiTableQueryProcess(pQInfo); } else { assert(pQuery->checkResultBuf == 1 || isPointInterpoQuery(pQuery) || pQuery->groupbyColumn); @@ -7553,7 +7631,7 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t nu return TSDB_CODE_QRY_OUT_OF_MEMORY; } - bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType); + bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType); for (int32_t i = 0; i < numOfOutput; ++i) { pExprs[i].base = *pExprMsg[i]; @@ -7573,49 +7651,12 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t nu type = TSDB_DATA_TYPE_DOUBLE; bytes = tDataTypes[type].bytes; -// } else if (pExprs[i].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX && pExprs[i].base.functionId == TSDB_FUNC_TAGPRJ) { // parse the normal column -// SSchema* s = tGetTbnameColumnSchema(); -// type = s->type; -// bytes = s->bytes; -// } else if (pExprs[i].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) { -// SSchema s = tGetBlockDistColumnSchema(); -// type = s.type; -// bytes = s.bytes; -// } else if (pExprs[i].base.colInfo.colId <= TSDB_UD_COLUMN_INDEX) { -// // it is a user-defined constant value column -// assert(pExprs[i].base.functionId == TSDB_FUNC_PRJ); -// -// type = pExprs[i].base.arg[1].argType; -// bytes = pExprs[i].base.arg[1].argBytes; -// -// if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { -// bytes += VARSTR_HEADER_SIZE; -// } } else { int32_t index = pExprs[i].base.colInfo.colIndex; assert(prevExpr[index].base.resColId == pExprs[i].base.colInfo.colId); -// int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols); -// if (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag)) { -// if (j < TSDB_BLOCK_DIST_COLUMN_INDEX || j >= pQueryMsg->numOfTags) { -// return TSDB_CODE_QRY_INVALID_MSG; -// } -// } else { -// if (j < PRIMARYKEY_TIMESTAMP_COL_INDEX || j >= pQueryMsg->numOfCols) { -// return TSDB_CODE_QRY_INVALID_MSG; -// } -// } -// -// if (pExprs[i].base.colInfo.colId != TSDB_TBNAME_COLUMN_INDEX && j >= 0) { -// SColumnInfo* pCol = (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag))? &pTagCols[j]:&pQueryMsg->colList[j]; -// type = pCol->type; -// bytes = pCol->bytes; -// } else { -// SSchema* s = tGetTbnameColumnSchema(); - - type = prevExpr[index].type; - bytes = prevExpr[index].bytes; -// } + type = prevExpr[index].type; + bytes = prevExpr[index].bytes; } int32_t param = (int32_t)pExprs[i].base.arg[0].argValue.i64; -- GitLab