diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 3328b4b9ed3c200a7434804eaf9a436a02c801e4..4407ed31a02a58eea790a9262d1188f738369205 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -951,14 +951,14 @@ static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutp // todo extract function int64_t actualETime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.ekey: pQueryInfo->window.skey; - tFilePage **pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput); + void** pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput); for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); - pResPages[i] = calloc(1, sizeof(tFilePage) + pField->bytes * pLocalMerge->resColModel->capacity); + pResPages[i] = calloc(1, pField->bytes * pLocalMerge->resColModel->capacity); } while (1) { - int64_t newRows = taosFillResultDataBlock(pFillInfo, (void**)pResPages, pLocalMerge->resColModel->capacity); + int64_t newRows = taosFillResultDataBlock(pFillInfo, pResPages, pLocalMerge->resColModel->capacity); if (pQueryInfo->limit.offset < newRows) { newRows -= pQueryInfo->limit.offset; @@ -966,7 +966,7 @@ static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutp if (pQueryInfo->limit.offset > 0) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); - memmove(pResPages[i]->data, pResPages[i]->data + pField->bytes * pQueryInfo->limit.offset, + memmove(pResPages[i], pResPages[i] + pField->bytes * pQueryInfo->limit.offset, (size_t)(newRows * pField->bytes)); } } @@ -1010,7 +1010,7 @@ static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutp int32_t offset = 0; for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); - memcpy(pRes->data + offset * pRes->numOfRows, pResPages[i]->data, (size_t)(pField->bytes * pRes->numOfRows)); + memcpy(pRes->data + offset * pRes->numOfRows, pResPages[i], (size_t)(pField->bytes * pRes->numOfRows)); offset += pField->bytes; } diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 0e21041704e296cff924d4db1122f7da8295807b..c52279f4ce26e82c4e4551391496c7abe7a995b1 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -241,27 +241,12 @@ typedef struct SQuery { typedef SSDataBlock* (*__operator_fn_t)(void* param); -typedef struct SOperatorInfo { - char *name; - bool blockingOptr; - bool completed; - void *optInfo; - SExprInfo *pExpr; - - int32_t* rowCellInfoOffset; - int32_t numOfOutput; - - __operator_fn_t exec; - __operator_fn_t cleanup; - struct SOperatorInfo *upstream; -} SOperatorInfo; +struct SOperatorInfo; typedef struct SQueryRuntimeEnv { jmp_buf env; SQuery* pQuery; void* qinfo; -// int32_t numOfRowsPerPage; -// uint16_t* offset; uint16_t scanFlag; // denotes reversed scan of data or not SFillInfo* pFillInfo; void* pQueryHandle; @@ -282,16 +267,31 @@ typedef struct SQueryRuntimeEnv { char* tagVal; // tag value of current data block SArithmeticSupport *sasArray; - SOperatorInfo* pi; + struct SOperatorInfo* pi; SSDataBlock *outputBuf; int32_t groupIndex; int32_t tableIndex; STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure - SOperatorInfo *proot; + struct SOperatorInfo *proot; SGroupResInfo groupResInfo; } SQueryRuntimeEnv; +typedef struct SOperatorInfo { + char *name; + bool blockingOptr; + bool completed; + void *info; + SExprInfo *pExpr; + int32_t numOfOutput; + + SQueryRuntimeEnv *pRuntimeEnv; + + __operator_fn_t exec; + __operator_fn_t cleanup; + struct SOperatorInfo *upstream; +} SOperatorInfo; + enum { QUERY_RESULT_NOT_READY = 1, QUERY_RESULT_READY = 2, @@ -363,23 +363,18 @@ typedef struct STableScanInfo { } STableScanInfo; typedef struct STagScanInfo { - SQueryRuntimeEnv *pRuntimeEnv; SColumnInfo* pCols; SSDataBlock* pRes; } STagScanInfo; typedef struct SAggOperatorInfo { SResultRowInfo resultRowInfo; - STableQueryInfo *pTableQueryInfo; - SQueryRuntimeEnv *pRuntimeEnv; SQLFunctionCtx *pCtx; int32_t *rowCellInfoOffset; SSDataBlock *pRes; } SAggOperatorInfo; typedef struct SArithOperatorInfo { - STableQueryInfo *pTableQueryInfo; - SQueryRuntimeEnv *pRuntimeEnv; SQLFunctionCtx *pCtx; int32_t *rowCellInfoOffset; SResultRowInfo resultRowInfo; @@ -390,18 +385,14 @@ typedef struct SArithOperatorInfo { typedef struct SLimitOperatorInfo { int64_t limit; int64_t total; - SQueryRuntimeEnv* pRuntimeEnv; } SLimitOperatorInfo; typedef struct SOffsetOperatorInfo { int64_t offset; int64_t currentOffset; - SQueryRuntimeEnv* pRuntimeEnv; } SOffsetOperatorInfo; typedef struct SHashIntervalOperatorInfo { - STableQueryInfo *pTableQueryInfo; - SQueryRuntimeEnv *pRuntimeEnv; SQLFunctionCtx *pCtx; int32_t *rowCellInfoOffset; SResultRowInfo resultRowInfo; @@ -409,13 +400,10 @@ typedef struct SHashIntervalOperatorInfo { } SHashIntervalOperatorInfo; typedef struct SFillOperatorInfo { - SQueryRuntimeEnv *pRuntimeEnv; SSDataBlock *pRes; } SFillOperatorInfo; typedef struct SHashGroupbyOperatorInfo { - STableQueryInfo *pTableQueryInfo; - SQueryRuntimeEnv *pRuntimeEnv; SQLFunctionCtx *pCtx; int32_t *rowCellInfoOffset; SResultRowInfo resultRowInfo; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 612416a69dde5296c97d263fbddb726408d46da7..942bf0df6332c79130fc4ca351e03bf4dac387a6 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -110,8 +110,6 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { #define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList) #define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0) -//static void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv); - int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; } @@ -184,15 +182,17 @@ static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOper static int32_t getNumOfScanTimes(SQuery* pQuery); static bool isFixedOutputQuery(SQuery* pQuery); -static SOperatorInfo* createAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); -static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); -static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); -static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); -static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); -static SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); -static SOperatorInfo* createHashGroupbyAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); -static SOperatorInfo* createStableIntervalOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); -static SOperatorInfo* createStableAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); +static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +static SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +static SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); +static SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); +static SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +static SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, + int32_t numOfOutput); +static SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, + int32_t numOfOutput); +static SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +static SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createTagScanOperator(SQueryRuntimeEnv* pRuntimeEnv); static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock); @@ -1126,15 +1126,15 @@ static void setInputSDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, S } } -static void aggApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) { - SQuery *pQuery = pRuntimeEnv->pQuery; +static void aggApplyFunctions(SOperatorInfo* pOperator, TSKEY startTs, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) { + SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { setBlockStatisInfo(&pCtx[k], pSDataBlock, &pOperator->pExpr[k].base.colInfo); int32_t functionId = pCtx[k].functionId; if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { - pCtx[k].startTs = pQuery->window.skey; + pCtx[k].startTs = startTs;// this can be set during create the struct aAggs[functionId].xFunction(&pCtx[k]); } } @@ -1168,9 +1168,11 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC } } -static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, SHashIntervalOperatorInfo* pInfo, - int32_t numOfOutput, SSDataBlock* pSDataBlock) { - SQuery *pQuery = pRuntimeEnv->pQuery; +static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, + SHashIntervalOperatorInfo* pInfo, SSDataBlock* pSDataBlock, int32_t groupId) { + SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; + int32_t numOfOutput = pOperatorInfo->numOfOutput; + SQuery* pQuery = pRuntimeEnv->pQuery; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); int32_t prevIndex = curTimeWindowIndex(pResultRowInfo); @@ -1179,63 +1181,68 @@ static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResu if (pSDataBlock->pDataBlock != NULL) { SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0); tsCols = pColDataInfo->pData; - assert(tsCols[0] == pSDataBlock->info.window.skey && tsCols[pSDataBlock->info.rows-1] == pSDataBlock->info.window.ekey); + assert(tsCols[0] == pSDataBlock->info.window.skey && + tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey); } - pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0:(pSDataBlock->info.rows-1); + pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : (pSDataBlock->info.rows - 1); int32_t startPos = pQuery->pos; TSKEY ts = getStartTsKey(pQuery, &pSDataBlock->info, tsCols, step); STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, pQuery); - bool masterScan = (pRuntimeEnv->scanFlag == MASTER_SCAN)? true:false; + bool masterScan = (pRuntimeEnv->scanFlag == MASTER_SCAN) ? true : false; - SResultRow *pResult = NULL; - int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, 0, - pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); + SResultRow* pResult = NULL; + int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, groupId, pInfo->pCtx, + numOfOutput, pInfo->rowCellInfoOffset); if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { // goto _end; } int32_t forwardStep = 0; - TSKEY ekey = reviseWindowEkey(pQuery, &win); - forwardStep = getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, pQuery->pos, ekey, binarySearchForKey, true); + TSKEY ekey = reviseWindowEkey(pQuery, &win); + forwardStep = + getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, pQuery->pos, ekey, binarySearchForKey, true); // prev time window not interpolation yet. int32_t curIndex = curTimeWindowIndex(pResultRowInfo); if (prevIndex != -1 && prevIndex < curIndex && pQuery->timeWindowInterpo) { for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already. - SResultRow *pRes = pResultRowInfo->pResult[j]; + SResultRow* pRes = pResultRowInfo->pResult[j]; if (pRes->closed) { - assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP)); + assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && + resultRowInterpolated(pRes, RESULT_ROW_END_INTERP)); continue; } STimeWindow w = pRes->win; - ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &w, masterScan, &pResult, 0, pInfo->pCtx, numOfOutput, - pInfo->rowCellInfoOffset); + ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &w, masterScan, &pResult, groupId, pInfo->pCtx, + numOfOutput, pInfo->rowCellInfoOffset); assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); -// int32_t p = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pSDataBlock->info.rows - 1; -// doRowwiseTimeWindowInterpolation(pRuntimeEnv, pSDataBlock->pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0], -1, tsCols[0], p, -// w.ekey, RESULT_ROW_END_INTERP); + // int32_t p = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pSDataBlock->info.rows - 1; + // doRowwiseTimeWindowInterpolation(pRuntimeEnv, pSDataBlock->pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0], + // -1, tsCols[0], p, + // w.ekey, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setNotInterpoWindowKey(pInfo->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); doBlockwiseApplyFunctions_rv(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, - numOfOutput); + numOfOutput); } // restore current time window - ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, 0, - pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); + ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, groupId, pInfo->pCtx, + numOfOutput, pInfo->rowCellInfoOffset); assert(ret == TSDB_CODE_SUCCESS); } // window start key interpolation - //doWindowBorderInterpolation(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, pResult, &win, pQuery->pos, forwardStep); + // doWindowBorderInterpolation(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, pResult, &win, pQuery->pos, + // forwardStep); doBlockwiseApplyFunctions_rv(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, - numOfOutput); + numOfOutput); STimeWindow nextWin = win; while (1) { @@ -1246,19 +1253,21 @@ static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResu } // null data, failed to allocate more memory buffer - int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &nextWin, masterScan, &pResult, 0, - pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); + int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &nextWin, masterScan, &pResult, groupId, + pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { break; } ekey = reviseWindowEkey(pQuery, &nextWin); - forwardStep = getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); + forwardStep = + getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); // window start(end) key interpolation -// doWindowBorderInterpolation(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, pResult, &nextWin, startPos, forwardStep); - doBlockwiseApplyFunctions_rv(pRuntimeEnv, pInfo->pCtx, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, - numOfOutput); + // doWindowBorderInterpolation(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, pResult, &nextWin, + // startPos, forwardStep); + doBlockwiseApplyFunctions_rv(pRuntimeEnv, pInfo->pCtx, &nextWin, startPos, forwardStep, tsCols, + pSDataBlock->info.rows, numOfOutput); } } @@ -2166,68 +2175,66 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf qDebug("QInfo:%p init runtime completed", GET_QINFO_ADDR(pRuntimeEnv)); - - // group by normal column, sliding window query, interval query are handled by interval query processor -// if (!pQuery->stableQuery) { // interval (down sampling operation) - if (QUERY_IS_INTERVAL_QUERY(pQuery)) { - if (pQuery->stableQuery) { - pRuntimeEnv->proot = createStableIntervalOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); - setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot); - } else { - pRuntimeEnv->proot = createHashIntervalAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); - setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot); + // interval (down sampling operation) + if (QUERY_IS_INTERVAL_QUERY(pQuery)) { + if (pQuery->stableQuery) { + pRuntimeEnv->proot = createStableIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput); + setTableScanFilterOperatorInfo(pRuntimeEnv->pi->info, pRuntimeEnv->proot); + } else { + pRuntimeEnv->proot = createIntervalAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput); + setTableScanFilterOperatorInfo(pRuntimeEnv->pi->info, pRuntimeEnv->proot); - if (pQuery->pExpr2 != NULL) { - pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot); - } + if (pQuery->pExpr2 != NULL) { + pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2); + } - if (pQuery->fillType != TSDB_FILL_NONE) { - pRuntimeEnv->proot = createFillOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot); - } + if (pQuery->fillType != TSDB_FILL_NONE) { + SOperatorInfo* pInfo = pRuntimeEnv->proot; + pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput); } + } } else if (pQuery->groupbyColumn) { - pRuntimeEnv->proot = createHashGroupbyAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); - setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot); + pRuntimeEnv->proot = createHashGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput); + setTableScanFilterOperatorInfo(pRuntimeEnv->pi->info, pRuntimeEnv->proot); if (pQuery->pExpr2 != NULL) { - pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot); + pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2); } } else if (isFixedOutputQuery(pQuery)) { if (!pQuery->stableQuery) { - pRuntimeEnv->proot = createAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); + pRuntimeEnv->proot = createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput); } else { - pRuntimeEnv->proot = createStableAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); + pRuntimeEnv->proot = createStableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput); } - setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot); + setTableScanFilterOperatorInfo(pRuntimeEnv->pi->info, pRuntimeEnv->proot); if (pQuery->pExpr2 != NULL) { - pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot); + pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2); } } else { // diff/add/multiply/subtract/division assert(pQuery->checkResultBuf == 1); if (!onlyQueryTags(pQuery)) { - pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); - setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot); + pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->pi, pQuery->pExpr1, pQuery->numOfOutput); + setTableScanFilterOperatorInfo(pRuntimeEnv->pi->info, pRuntimeEnv->proot); } } if (!pQuery->stableQuery) { // TODO this problem should be handed at the client side if (pQuery->limit.offset > 0) { - pRuntimeEnv->proot = createOffsetOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot); + pRuntimeEnv->proot = createOffsetOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); } if (pQuery->limit.limit > 0) { - pRuntimeEnv->proot = createLimitOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot); + pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); } } return TSDB_CODE_SUCCESS; _clean: -// tfree(pRuntimeEnv->rowCellInfoOffset); tfree(pRuntimeEnv->sasArray); tfree(pRuntimeEnv->pResultRowHashTable); tfree(pRuntimeEnv->keyBuf); @@ -3229,15 +3236,17 @@ static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t num } -void setTagVal_rv(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, SExprInfo* pExpr, SQLFunctionCtx* pCtx, int32_t numOfOutput) { - SQuery *pQuery = pRuntimeEnv->pQuery; - SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); +void setTagVal_rv(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pCtx, int32_t numOfOutput) { + SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; - SExprInfo *pExprInfo = &pQuery->pExpr1[0]; + SExprInfo *pExpr = pOperatorInfo->pExpr; + SQuery *pQuery = pRuntimeEnv->pQuery; + + SExprInfo* pExprInfo = &pExpr[0]; if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP && pQuery->stableQuery) { assert(pExprInfo->base.numOfParams == 1); - int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64; + int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64; SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId); doSetTagValueInParam(pTable, tagColId, &pCtx[0].tag, pColInfo->type, pColInfo->bytes); @@ -3254,7 +3263,8 @@ void setTagVal_rv(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, SExprInfo* pExpr, } // todo use tag column index to optimize performance - doSetTagValueInParam(pTable, pLocalExprInfo->base.colInfo.colId, &pCtx[idx].tag, pLocalExprInfo->type, pLocalExprInfo->bytes); + doSetTagValueInParam(pTable, pLocalExprInfo->base.colInfo.colId, &pCtx[idx].tag, pLocalExprInfo->type, + pLocalExprInfo->bytes); if (IS_NUMERIC_TYPE(pLocalExprInfo->type) || pLocalExprInfo->type == TSDB_DATA_TYPE_BOOL) { memcpy(pRuntimeEnv->tagVal + offset, &pCtx[idx].tag.i64, pLocalExprInfo->bytes); @@ -3266,22 +3276,22 @@ void setTagVal_rv(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, SExprInfo* pExpr, } // set the join tag for first column - SSqlFuncMsg *pFuncMsg = &pExprInfo->base; - if ((pFuncMsg->functionId == TSDB_FUNC_TS || pFuncMsg->functionId == TSDB_FUNC_PRJ) && pRuntimeEnv->pTsBuf != NULL && - pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + SSqlFuncMsg* pFuncMsg = &pExprInfo->base; + if ((pFuncMsg->functionId == TSDB_FUNC_TS || pFuncMsg->functionId == TSDB_FUNC_PRJ) && + pRuntimeEnv->pTsBuf != NULL && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) { assert(pFuncMsg->numOfParams == 1); int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64; - SColumnInfo *pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId); + SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId); doSetTagValueInParam(pTable, tagColId, &pCtx[0].tag, pColInfo->type, pColInfo->bytes); int16_t tagType = pCtx[0].tag.nType; if (tagType == TSDB_DATA_TYPE_BINARY || tagType == TSDB_DATA_TYPE_NCHAR) { - qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%s", pQInfo, + qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%s", pRuntimeEnv->qinfo, pExprInfo->base.arg->argValue.i64, pCtx[0].tag.pz); } else { - qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%" PRId64, pQInfo, + qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%" PRId64, pRuntimeEnv->qinfo, pExprInfo->base.arg->argValue.i64, pCtx[0].tag.i64); } } @@ -4050,9 +4060,11 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { #endif -void finalizeQueryResult_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, - SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { +void finalizeQueryResult_rv(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { + SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; SQuery *pQuery = pRuntimeEnv->pQuery; + + int32_t numOfOutput = pOperator->numOfOutput; if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) { // for each group result, call the finalize function for each column if (pQuery->groupbyColumn) { @@ -4500,7 +4512,7 @@ static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pG SQuery *pQuery = pRuntimeEnv->pQuery; int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); - int32_t numOfResult = 0;//pQuery->rec.rows; // there are already exists result rows + int32_t numOfResult = pBlock->info.rows; // there are already exists result rows int32_t start = 0; int32_t step = -1; @@ -4772,6 +4784,7 @@ int32_t doFillGapsInResults_rv(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock *pOutp } pOutput->info.rows = (int32_t)taosFillResultDataBlock(pFillInfo, p, (int32_t)pQuery->rec.capacity); + tfree(p); return pOutput->info.rows; } @@ -6107,7 +6120,7 @@ static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) { static SSDataBlock* doTableScan(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; - STableScanInfo *pTableScanInfo = pOperator->optInfo; + STableScanInfo *pTableScanInfo = pOperator->info; SQueryRuntimeEnv *pRuntimeEnv = pTableScanInfo->pRuntimeEnv; SQuery* pQuery = pRuntimeEnv->pQuery; @@ -6180,7 +6193,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pOperator->name = "SeqScanTableOp"; pOperator->blockingOptr = false; pOperator->completed = false; - pOperator->optInfo = pInfo; + pOperator->info = pInfo; pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfCols; pOperator->exec = doTableScan; @@ -6192,34 +6205,34 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf char* name = pDownstream->name; if ((strcasecmp(name, "AggregationOp") == 0) || (strcasecmp(name, "STableAggregate") == 0)) { - SAggOperatorInfo* pAggInfo = pDownstream->optInfo; + SAggOperatorInfo* pAggInfo = pDownstream->info; pTableScanInfo->pCtx = pAggInfo->pCtx; pTableScanInfo->pResultRowInfo = &pAggInfo->resultRowInfo; pTableScanInfo->rowCellInfoOffset = pAggInfo->rowCellInfoOffset; } else if (strcasecmp(name, "HashIntervalAggOp") == 0) { - SHashIntervalOperatorInfo *pIntervalInfo = pDownstream->optInfo; + SHashIntervalOperatorInfo *pIntervalInfo = pDownstream->info; pTableScanInfo->pCtx = pIntervalInfo->pCtx; pTableScanInfo->pResultRowInfo = &pIntervalInfo->resultRowInfo; pTableScanInfo->rowCellInfoOffset = pIntervalInfo->rowCellInfoOffset; } else if (strcasecmp(name, "HashGroupbyAggOp") == 0) { - SHashGroupbyOperatorInfo *pGroupbyInfo = pDownstream->optInfo; + SHashGroupbyOperatorInfo *pGroupbyInfo = pDownstream->info; pTableScanInfo->pCtx = pGroupbyInfo->pCtx; pTableScanInfo->pResultRowInfo = &pGroupbyInfo->resultRowInfo; pTableScanInfo->rowCellInfoOffset = pGroupbyInfo->rowCellInfoOffset; } else if (strcasecmp(name, "STableIntervalAggOp") == 0) { - SHashIntervalOperatorInfo *pInfo = pDownstream->optInfo; + SHashIntervalOperatorInfo *pInfo = pDownstream->info; pTableScanInfo->pCtx = pInfo->pCtx; pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo; pTableScanInfo->rowCellInfoOffset = pInfo->rowCellInfoOffset; } else if (strcasecmp(name, "ArithmeticOp") == 0) { - SArithOperatorInfo *pInfo = pDownstream->optInfo; + SArithOperatorInfo *pInfo = pDownstream->info; pTableScanInfo->pCtx = pInfo->pCtx; pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo; @@ -6246,7 +6259,7 @@ static SOperatorInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQu SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); pOptr->name = "BidirectionSeqScanTableOp"; pOptr->blockingOptr = false; - pOptr->optInfo = pInfo; + pOptr->info = pInfo; pOptr->exec = doTableScan; return pOptr; @@ -6261,14 +6274,14 @@ static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { } // this is a blocking operator -static SSDataBlock* doAggregation(void* param) { +static SSDataBlock* doAggregate(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->completed) { return NULL; } - SAggOperatorInfo* pAggInfo = pOperator->optInfo; - SQueryRuntimeEnv* pRuntimeEnv = pAggInfo->pRuntimeEnv; + SAggOperatorInfo* pAggInfo = pOperator->info; + SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SQuery* pQuery = pRuntimeEnv->pQuery; int32_t order = pQuery->order.order; @@ -6282,25 +6295,23 @@ static SSDataBlock* doAggregation(void* param) { break; } - setTagVal_rv(pRuntimeEnv, pQuery->current->pTable, pOperator->pExpr, pAggInfo->pCtx, pOperator->numOfOutput); + setTagVal_rv(pOperator, pQuery->current->pTable, pAggInfo->pCtx, pOperator->numOfOutput); // TODO opt perf if (strncasecmp(upstream->name, "BidirectionSeqScanTableOp", strlen("BidirectionSeqScanTableOp")) == 0) { - STableScanInfo* pScanInfo = upstream->optInfo; + STableScanInfo* pScanInfo = upstream->info; order = getTableScanOrder(pScanInfo); } // the pDataBlock are always the same one, no need to call this again setInputSDataBlock(pOperator, pAggInfo->pCtx, pBlock, order); - aggApplyFunctions(pRuntimeEnv, pOperator, pAggInfo->pCtx, pBlock); + aggApplyFunctions(pOperator, pQuery->window.skey, pAggInfo->pCtx, pBlock); } pOperator->completed = true; setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); - if (!pQuery->stableQuery) { - finalizeQueryResult_rv(pRuntimeEnv, pAggInfo->pCtx, pOperator->numOfOutput, &pAggInfo->resultRowInfo, pAggInfo->rowCellInfoOffset); - } + finalizeQueryResult_rv(pOperator, pAggInfo->pCtx, &pAggInfo->resultRowInfo, pAggInfo->rowCellInfoOffset); pAggInfo->pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pAggInfo->pCtx, pOperator->numOfOutput); destroySQLFunctionCtx(pAggInfo->pCtx, pAggInfo->pRes->info.numOfCols); @@ -6308,14 +6319,14 @@ static SSDataBlock* doAggregation(void* param) { return pAggInfo->pRes; } -static SSDataBlock* doSTableAggregation(void* param) { +static SSDataBlock* doSTableAggregate(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->completed) { return NULL; } - SAggOperatorInfo* pAggInfo = pOperator->optInfo; - SQueryRuntimeEnv* pRuntimeEnv = pAggInfo->pRuntimeEnv; + SAggOperatorInfo* pAggInfo = pOperator->info; + SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (hasRemainData(&pRuntimeEnv->groupResInfo)) { toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pAggInfo->pRes); @@ -6330,10 +6341,6 @@ static SSDataBlock* doSTableAggregation(void* param) { SQuery* pQuery = pRuntimeEnv->pQuery; int32_t order = pQuery->order.order; - if (pAggInfo->pRes == NULL) { - pAggInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); - } - SOperatorInfo* upstream = pOperator->upstream; pQuery->pos = 0; @@ -6343,11 +6350,11 @@ static SSDataBlock* doSTableAggregation(void* param) { break; } - setTagVal_rv(pRuntimeEnv, pRuntimeEnv->pQuery->current->pTable, pOperator->pExpr, pAggInfo->pCtx, pOperator->numOfOutput); + setTagVal_rv(pOperator, pRuntimeEnv->pQuery->current->pTable, pAggInfo->pCtx, pOperator->numOfOutput); // TODO opt perf if (strncasecmp(upstream->name, "BidirectionSeqScanTableOp", strlen("BidirectionSeqScanTableOp")) == 0) { - STableScanInfo* pScanInfo = upstream->optInfo; + STableScanInfo* pScanInfo = upstream->info; order = getTableScanOrder(pScanInfo); } @@ -6356,7 +6363,7 @@ static SSDataBlock* doSTableAggregation(void* param) { TSKEY k = (pQuery->order.order == TSDB_ORDER_ASC)? pBlock->info.window.ekey + 1:pBlock->info.window.skey-1; setExecutionContext_rv(pRuntimeEnv, pAggInfo, pOperator->numOfOutput, pQuery->current->groupIndex, k); - aggApplyFunctions(pRuntimeEnv, pOperator, pAggInfo->pCtx, pBlock); + aggApplyFunctions(pOperator, pQuery->window.skey, pAggInfo->pCtx, pBlock); } closeAllResultRows(&pAggInfo->resultRowInfo); @@ -6375,8 +6382,8 @@ static SSDataBlock* doSTableAggregation(void* param) { static SSDataBlock* doArithmeticOperation(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; - SArithOperatorInfo* pArithInfo = pOperator->optInfo; - SQueryRuntimeEnv* pRuntimeEnv = pArithInfo->pRuntimeEnv; + SArithOperatorInfo* pArithInfo = pOperator->info; + SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; setDefaultOutputBuf(pRuntimeEnv, pArithInfo->pCtx, &pArithInfo->resultRowInfo, pArithInfo->pOutput, pArithInfo->rowCellInfoOffset); @@ -6390,7 +6397,7 @@ static SSDataBlock* doArithmeticOperation(void* param) { break; } - setTagVal_rv(pRuntimeEnv, pRuntimeEnv->pQuery->current->pTable, pOperator->pExpr, pArithInfo->pCtx, pOperator->numOfOutput); + setTagVal_rv(pOperator, pRuntimeEnv->pQuery->current->pTable, pArithInfo->pCtx, pOperator->numOfOutput); // the pDataBlock are always the same one, no need to call this again for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { @@ -6431,11 +6438,11 @@ static SSDataBlock* doLimit(void* param) { return NULL; } - SLimitOperatorInfo* pInfo = pOperator->optInfo; + SLimitOperatorInfo* pInfo = pOperator->info; SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream); if (pBlock == NULL) { - setQueryStatus(pInfo->pRuntimeEnv->pQuery, QUERY_COMPLETED); + setQueryStatus(pOperator->pRuntimeEnv->pQuery, QUERY_COMPLETED); pOperator->completed = true; return NULL; } @@ -6443,7 +6450,7 @@ static SSDataBlock* doLimit(void* param) { if (pInfo->total + pBlock->info.rows >= pInfo->limit) { pBlock->info.rows = (pInfo->limit - pInfo->total); - setQueryStatus(pInfo->pRuntimeEnv->pQuery, QUERY_COMPLETED); + setQueryStatus(pOperator->pRuntimeEnv->pQuery, QUERY_COMPLETED); pOperator->completed = true; } @@ -6453,12 +6460,12 @@ static SSDataBlock* doLimit(void* param) { static SSDataBlock* doOffset(void* param) { SOperatorInfo *pOperator = (SOperatorInfo *)param; - SOffsetOperatorInfo *pInfo = pOperator->optInfo; + SOffsetOperatorInfo *pInfo = pOperator->info; while (1) { SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream); if (pBlock == NULL) { - setQueryStatus(pInfo->pRuntimeEnv->pQuery, QUERY_COMPLETED); + setQueryStatus(pOperator->pRuntimeEnv->pQuery, QUERY_COMPLETED); return NULL; } @@ -6489,9 +6496,9 @@ static SSDataBlock* doHashIntervalAgg(void* param) { return NULL; } - SHashIntervalOperatorInfo* pIntervalInfo = pOperator->optInfo; + SHashIntervalOperatorInfo* pIntervalInfo = pOperator->info; - SQueryRuntimeEnv* pRuntimeEnv = pIntervalInfo->pRuntimeEnv; + SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (hasRemainData(&pRuntimeEnv->groupResInfo)) { toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); @@ -6517,7 +6524,7 @@ static SSDataBlock* doHashIntervalAgg(void* param) { // the pDataBlock are always the same one, no need to call this again setInputSDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQuery->order.order); - hashIntervalAgg(pRuntimeEnv, &pIntervalInfo->resultRowInfo, pIntervalInfo, pOperator->numOfOutput, pBlock); + hashIntervalAgg(pOperator, &pIntervalInfo->resultRowInfo, pIntervalInfo, pBlock, 0); } // restore the value @@ -6526,7 +6533,7 @@ static SSDataBlock* doHashIntervalAgg(void* param) { closeAllResultRows(&pIntervalInfo->resultRowInfo); setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); - finalizeQueryResult_rv(pRuntimeEnv, pIntervalInfo->pCtx, pOperator->numOfOutput, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); + finalizeQueryResult_rv(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo, 0); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); @@ -6544,9 +6551,9 @@ static SSDataBlock* doSTableIntervalAgg(void* param) { return NULL; } - SHashIntervalOperatorInfo* pIntervalInfo = pOperator->optInfo; + SHashIntervalOperatorInfo* pIntervalInfo = pOperator->info; - SQueryRuntimeEnv* pRuntimeEnv = pIntervalInfo->pRuntimeEnv; + SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (hasRemainData(&pRuntimeEnv->groupResInfo)) { toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); @@ -6571,11 +6578,13 @@ static SSDataBlock* doSTableIntervalAgg(void* param) { } // the pDataBlock are always the same one, no need to call this again + STableQueryInfo* pTableQueryInfo = pRuntimeEnv->pQuery->current; + + setTagVal_rv(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); setInputSDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQuery->order.order); setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey); - hashIntervalAgg(pRuntimeEnv, &pRuntimeEnv->pQuery->current->resInfo, pIntervalInfo, pOperator->numOfOutput, - pBlock); + hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pIntervalInfo, pBlock, pTableQueryInfo->groupIndex); } pQuery->order.order = order; // TODO : restore the order @@ -6596,9 +6605,9 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { return NULL; } - SHashGroupbyOperatorInfo *pInfo = pOperator->optInfo; + SHashGroupbyOperatorInfo *pInfo = pOperator->info; - SQueryRuntimeEnv* pRuntimeEnv = pInfo->pRuntimeEnv; + SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (hasRemainData(&pRuntimeEnv->groupResInfo)) { toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes); if (pInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { @@ -6629,7 +6638,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); if (!pRuntimeEnv->pQuery->stableQuery) { - finalizeQueryResult_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); + finalizeQueryResult_rv(pOperator, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); } updateWindowResNumOfRes_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); @@ -6650,8 +6659,8 @@ static SSDataBlock* doFill(void* param) { return NULL; } - SFillOperatorInfo *pInfo = pOperator->optInfo; - SQueryRuntimeEnv* pRuntimeEnv = pInfo->pRuntimeEnv; + SFillOperatorInfo *pInfo = pOperator->info; + SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (taosFillHasMoreResults(pRuntimeEnv->pFillInfo)) { doFillGapsInResults_rv(pRuntimeEnv, pInfo->pRes); @@ -6693,91 +6702,83 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { } if (pOperator->cleanup != NULL) { - pOperator->cleanup(pOperator->optInfo); + pOperator->cleanup(pOperator->info); } destroyOperatorInfo(pOperator->upstream); - tfree(pOperator->optInfo); + tfree(pOperator->info); tfree(pOperator); } -static SOperatorInfo* createAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { +static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); - pInfo->pTableQueryInfo = pTableQueryInfo; - pInfo->pRuntimeEnv = pRuntimeEnv; - SQuery* pQuery = pRuntimeEnv->pQuery; + pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); + initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + + pInfo->pRes = createOutputBuf(pExpr, numOfOutput); + setDefaultOutputBuf(pRuntimeEnv, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->pRes, pInfo->rowCellInfoOffset); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "AggregationOp"; + pOperator->name = "Aggregate"; pOperator->blockingOptr = true; pOperator->completed = false; - pOperator->optInfo = pInfo; + pOperator->info = pInfo; pOperator->upstream = upstream; - pOperator->exec = doAggregation; - pOperator->pExpr = pQuery->pExpr1; - pOperator->numOfOutput = pQuery->numOfOutput; - - pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset); - initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); - - pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); - setDefaultOutputBuf(pRuntimeEnv, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->pRes, pInfo->rowCellInfoOffset); + pOperator->exec = doAggregate; + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfOutput; + pOperator->pRuntimeEnv = pRuntimeEnv; return pOperator; } -static SOperatorInfo* createStableAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { +static SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); - pInfo->pTableQueryInfo = pTableQueryInfo; - pInfo->pRuntimeEnv = pRuntimeEnv; + pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); + initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + + pInfo->pRes = createOutputBuf(pExpr, numOfOutput); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "STableAggregate"; + pOperator->name = "STableAggregate"; pOperator->blockingOptr = true; pOperator->completed = false; - pOperator->optInfo = pInfo; + pOperator->info = pInfo; pOperator->upstream = upstream; - pOperator->exec = doSTableAggregation; - pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1; - pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput; - - - pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset); - initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + pOperator->exec = doSTableAggregate; + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfOutput; + pOperator->pRuntimeEnv = pRuntimeEnv; return pOperator; } -static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { +static SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SArithOperatorInfo* pInfo = calloc(1, sizeof(SArithOperatorInfo)); - pInfo->pTableQueryInfo = pTableQueryInfo; - pInfo->pRuntimeEnv = pRuntimeEnv; + pInfo->pOutput = createOutputBuf(pExpr, numOfOutput); + pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); + initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "ArithmeticOp"; pOperator->blockingOptr = false; pOperator->completed = false; - pOperator->optInfo = pInfo; + pOperator->info = pInfo; pOperator->upstream = upstream; pOperator->exec = doArithmeticOperation; - pOperator->pExpr = (pRuntimeEnv->pQuery->pExpr2 == NULL)? pRuntimeEnv->pQuery->pExpr1:pRuntimeEnv->pQuery->pExpr2; - pOperator->numOfOutput = (pRuntimeEnv->pQuery->pExpr2 == NULL)? pRuntimeEnv->pQuery->numOfOutput:pRuntimeEnv->pQuery->numOfExpr2; - - pInfo->pOutput = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); - pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset); - initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfOutput; + pOperator->pRuntimeEnv = pRuntimeEnv; return pOperator; } -static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { +static SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo)); - pInfo->limit = pRuntimeEnv->pQuery->limit.limit; - pInfo->pRuntimeEnv = pRuntimeEnv; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -6786,19 +6787,17 @@ static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo, pOperator->completed = false; pOperator->upstream = upstream; pOperator->exec = doLimit; - pOperator->pExpr = NULL; - pOperator->numOfOutput = 0; - pOperator->optInfo = pInfo; + pOperator->info = pInfo; + pOperator->pRuntimeEnv = pRuntimeEnv; return pOperator; } -static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { +static SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { SOffsetOperatorInfo* pInfo = calloc(1, sizeof(SOffsetOperatorInfo)); pInfo->offset = pRuntimeEnv->pQuery->limit.offset; pInfo->currentOffset = pInfo->offset; - pInfo->pRuntimeEnv = pRuntimeEnv; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -6807,43 +6806,40 @@ static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, pOperator->completed = false; pOperator->upstream = upstream; pOperator->exec = doOffset; - pOperator->pExpr = NULL; - pOperator->numOfOutput = 0; - pOperator->optInfo = pInfo; + pOperator->info = pInfo; + pOperator->pRuntimeEnv = pRuntimeEnv; return pOperator; } -static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { +static SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo)); - pInfo->pRuntimeEnv = pRuntimeEnv; - pInfo->pTableQueryInfo = pTableQueryInfo; + pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); + pInfo->pRes = createOutputBuf(pExpr, numOfOutput); + initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); - SQuery* pQuery = pRuntimeEnv->pQuery; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "HashIntervalAggOp"; + pOperator->name = "HashIntervalAgg"; pOperator->blockingOptr = true; pOperator->completed = false; pOperator->upstream = upstream; pOperator->exec = doHashIntervalAgg; - pOperator->pExpr = pQuery->pExpr1; - pOperator->numOfOutput = pQuery->numOfOutput; - pOperator->optInfo = pInfo; - - pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset); - pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); - initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfOutput; + pOperator->info = pInfo; + pOperator->pRuntimeEnv = pRuntimeEnv; return pOperator; } -static SOperatorInfo* createStableIntervalOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { +static SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo)); - pInfo->pRuntimeEnv = pRuntimeEnv; - pInfo->pTableQueryInfo = pTableQueryInfo; + pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); + pInfo->pRes = createOutputBuf(pExpr, numOfOutput); + initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); SQuery* pQuery = pRuntimeEnv->pQuery; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -6855,44 +6851,39 @@ static SOperatorInfo* createStableIntervalOperatorInfo(STableQueryInfo* pTableQu pOperator->exec = doSTableIntervalAgg; pOperator->pExpr = pQuery->pExpr1; pOperator->numOfOutput = pQuery->numOfOutput; - pOperator->optInfo = pInfo; - - pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset); - pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); - initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + pOperator->info = pInfo; + pOperator->pRuntimeEnv = pRuntimeEnv; return pOperator; } -SOperatorInfo* createHashGroupbyAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { +SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SHashGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SHashGroupbyOperatorInfo)); + pInfo->colIndex = -1; // group by column index - pInfo->pRuntimeEnv = pRuntimeEnv; - pInfo->pTableQueryInfo = pTableQueryInfo; - pInfo->colIndex = -1; + pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); + pInfo->pRes = createOutputBuf(pExpr, numOfOutput); + initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); - SQuery* pQuery = pRuntimeEnv->pQuery; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "HashGroupbyAggOp"; + pOperator->name = "HashGroupby"; pOperator->blockingOptr = true; pOperator->completed = false; pOperator->upstream = upstream; pOperator->exec = doHashGroupbyAgg; - pOperator->pExpr = pQuery->pExpr1; - pOperator->numOfOutput = pQuery->numOfOutput; - pOperator->optInfo = pInfo; - - pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->pExpr, pOperator->numOfOutput, &pInfo->rowCellInfoOffset); - pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); - initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfOutput; + pOperator->info = pInfo; + pOperator->pRuntimeEnv = pRuntimeEnv; return pOperator; } -static SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { +static SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, + int32_t numOfOutput) { SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo)); - pInfo->pRuntimeEnv = pRuntimeEnv; + pInfo->pRes = createOutputBuf(pExpr, numOfOutput); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -6901,19 +6892,19 @@ static SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, S pOperator->completed = false; pOperator->upstream = upstream; pOperator->exec = doFill; - pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1; - pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput; - pOperator->optInfo = pInfo; + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfOutput; + pOperator->info = pInfo; + pOperator->pRuntimeEnv = pRuntimeEnv; - pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); return pOperator; } static SSDataBlock* doTagScan(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; - STagScanInfo *pTagScanInfo = pOperator->optInfo; - SQueryRuntimeEnv *pRuntimeEnv = pTagScanInfo->pRuntimeEnv; + STagScanInfo *pTagScanInfo = pOperator->info; + SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; SQuery* pQuery = pRuntimeEnv->pQuery; size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); @@ -7046,16 +7037,16 @@ static SSDataBlock* doTagScan(void* param) { static SOperatorInfo* createTagScanOperator(SQueryRuntimeEnv* pRuntimeEnv) { STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo)); - pInfo->pRuntimeEnv = pRuntimeEnv; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "SeqTagScanOp"; pOperator->blockingOptr = false; pOperator->completed = false; - pOperator->optInfo = pInfo; + pOperator->info = pInfo; + pOperator->exec = doTagScan; + pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1; pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput; - pOperator->exec = doTagScan; - pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1; + pOperator->pRuntimeEnv = pRuntimeEnv; return pOperator; } diff --git a/src/query/src/qFill.c b/src/query/src/qFill.c index 540b4e9dcce00d0ed97c919f0e5f2c92c245f924..5b5a7b66567eda8fbd9d729bbf3d5ed4927ee3d6 100644 --- a/src/query/src/qFill.c +++ b/src/query/src/qFill.c @@ -425,8 +425,6 @@ void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i); pFillInfo->pData[i] = pColData->pData; - -// memcpy(pFillInfo->pData[i], pInput[i]->data, pFillInfo->numOfRows * pFillInfo->pFillCol[i].col.bytes); } }