From 79f3d77852125790606ac2fc85bf2b2cac95b1a4 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Thu, 23 Apr 2020 12:23:30 +0800 Subject: [PATCH] [td-171]fix bug in column values filter query --- src/query/inc/queryExecutor.h | 4 +- src/query/inc/tsqlfunction.h | 2 +- src/query/src/queryExecutor.c | 316 ++++++++++++++++------------------ src/query/src/queryUtil.c | 8 +- 4 files changed, 151 insertions(+), 179 deletions(-) diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index 504d20f992..a9a9424a7c 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -122,13 +122,13 @@ typedef struct SQuery { int64_t slidingTime; // sliding time for sliding window query char slidingTimeUnit; // interval data type, used for daytime revise int8_t precision; - int16_t numOfOutputCols; + int16_t numOfOutput; int16_t interpoType; int16_t checkBuffer; // check if the buffer is full during scan each block SLimitVal limit; int32_t rowSize; SSqlGroupbyExpr* pGroupbyExpr; - SSqlFunctionExpr* pSelectExpr; + SArithExprInfo* pSelectExpr; SColumnInfo* colList; int32_t numOfFilterCols; int64_t* defaultVal; diff --git a/src/query/inc/tsqlfunction.h b/src/query/inc/tsqlfunction.h index 3cfcf83268..54055e9d33 100644 --- a/src/query/inc/tsqlfunction.h +++ b/src/query/inc/tsqlfunction.h @@ -114,7 +114,7 @@ enum { #define QUERY_IS_FREE_RESOURCE(type) (((type)&TSDB_QUERY_TYPE_FREE_RESOURCE) != 0) typedef struct SArithmeticSupport { - SSqlFunctionExpr *pExpr; + SArithExprInfo *pArithExpr; int32_t elemSize[TSDB_MAX_COLUMNS]; int32_t numOfCols; int32_t offset; diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index aefe4a3b8b..02580e6e87 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -251,26 +251,12 @@ bool doFilterData(SQuery *pQuery, int32_t elemPos) { return true; } -bool vnodeFilterData(SQuery *pQuery, int32_t *numOfActualRead, int32_t index) { - (*numOfActualRead)++; - if (!doFilterData(pQuery, index)) { - return false; - } - - if (pQuery->limit.offset > 0) { - pQuery->limit.offset--; // ignore this qualified row - return false; - } - - return true; -} - int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; bool hasMainFunction = hasMainOutput(pQuery); int64_t maxOutput = 0; - for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int32_t functionId = pQuery->pSelectExpr[j].pBase.functionId; /* @@ -347,7 +333,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) { bool hasTags = false; int32_t numOfSelectivity = 0; - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functId = pQuery->pSelectExpr[i].pBase.functionId; if (functId == TSDB_FUNC_TAG_DUMMY || functId == TSDB_FUNC_TS_DUMMY) { hasTags = true; @@ -368,7 +354,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) { bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].pBase.functionId == TSDB_FUNC_TS_COMP; } -bool limitResults(SQInfo *pQInfo) { +static bool limitResults(SQInfo *pQInfo) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; if ((pQuery->limit.limit > 0) && (pQuery->rec.total + pQuery->rec.rows > pQuery->limit.limit)) { @@ -383,7 +369,7 @@ bool limitResults(SQInfo *pQInfo) { } static bool isTopBottomQuery(SQuery *pQuery) { - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; if (functionId == TSDB_FUNC_TS) { continue; @@ -730,7 +716,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) { - for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; pCtx[k].nStartQueryTimestamp = pWin->skey; @@ -754,7 +740,7 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) { - for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { pCtx[k].nStartQueryTimestamp = pWin->skey; int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; @@ -839,7 +825,7 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3 int32_t functionId = pQuery->pSelectExpr[col].pBase.functionId; if (functionId == TSDB_FUNC_ARITHM) { - sas->pExpr = &pQuery->pSelectExpr[col]; + sas->pArithExpr = &pQuery->pSelectExpr[col]; // set the start offset to be the lowest start position, no matter asc/desc query order if (QUERY_IS_ASC_QUERY(pQuery)) { @@ -907,9 +893,9 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * primaryKeyCol = (TSKEY *)(pColInfo->pData); } - SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutputCols, sizeof(SArithmeticSupport)); + SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport)); - for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; SDataStatis *tpField = NULL; @@ -966,7 +952,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * * since the selectivity + tag_prj query needs all parameters been set done. * tag_prj function are changed to be TSDB_FUNC_TAG_DUMMY */ - for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { aAggs[functionId].xFunction(&pCtx[k]); @@ -1098,7 +1084,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS TSKEY *primaryKeyCol = (TSKEY*) ((SColumnInfoData *)taosArrayGet(pDataBlock, 0))->pData; bool groupbyStateValue = isGroupbyNormalCol(pQuery->pGroupbyExpr); - SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutputCols, sizeof(SArithmeticSupport)); + SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport)); int16_t type = 0; int16_t bytes = 0; @@ -1109,7 +1095,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS // groupbyColumnData = getGroupbyColumnData(pQuery, data, &type, &bytes); } - for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; SDataStatis *pColStatis = NULL; @@ -1218,7 +1204,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS // all startOffset are identical offset -= pCtx[0].startOffset; - for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { aAggs[functionId].xFunctionF(&pCtx[k], offset); @@ -1350,9 +1336,9 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) { int16_t tagLen = 0; - SQLFunctionCtx **pTagCtx = calloc(pQuery->numOfOutputCols, POINTER_BYTES); - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - SSqlFuncExprMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].pBase; + SQLFunctionCtx **pTagCtx = calloc(pQuery->numOfOutput, POINTER_BYTES); + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { + SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].pBase; if (pSqlFuncMsg->functionId == TSDB_FUNC_TAG_DUMMY || pSqlFuncMsg->functionId == TSDB_FUNC_TS_DUMMY) { tagLen += pCtx[i].outputBytes; pTagCtx[num++] = &pCtx[i]; @@ -1374,7 +1360,7 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) { } static void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool isStableQuery) { - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { setResultInfoBuf(&pResultInfo[i], pQuery->pSelectExpr[i].interResBytes, isStableQuery); } } @@ -1383,16 +1369,16 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order qTrace("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv)); SQuery *pQuery = pRuntimeEnv->pQuery; - pRuntimeEnv->resultInfo = calloc(pQuery->numOfOutputCols, sizeof(SResultInfo)); - pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutputCols, sizeof(SQLFunctionCtx)); + pRuntimeEnv->resultInfo = calloc(pQuery->numOfOutput, sizeof(SResultInfo)); + pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutput, sizeof(SQLFunctionCtx)); if (pRuntimeEnv->resultInfo == NULL || pRuntimeEnv->pCtx == NULL) { goto _error_clean; } pRuntimeEnv->offset[0] = 0; - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - SSqlFuncExprMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].pBase; + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { + SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].pBase; SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; pCtx->inputType = GET_COLUMN_TYPE(pQuery, i); @@ -1400,8 +1386,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order pCtx->ptsOutputBuf = NULL; - pCtx->outputBytes = pQuery->pSelectExpr[i].resBytes; - pCtx->outputType = pQuery->pSelectExpr[i].resType; + pCtx->outputBytes = pQuery->pSelectExpr[i].bytes; + pCtx->outputType = pQuery->pSelectExpr[i].type; pCtx->order = pQuery->order.order; pCtx->functionId = pSqlFuncMsg->functionId; @@ -1463,10 +1449,10 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; qTrace("QInfo:%p teardown runtime env", GET_QINFO_ADDR(pQuery)); - cleanupTimeWindowInfo(&pRuntimeEnv->windowResInfo, pQuery->numOfOutputCols); + cleanupTimeWindowInfo(&pRuntimeEnv->windowResInfo, pQuery->numOfOutput); if (pRuntimeEnv->pCtx != NULL) { - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; for (int32_t j = 0; j < pCtx->numOfParams; ++j) { @@ -1485,7 +1471,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { taosDestoryInterpoInfo(&pRuntimeEnv->interpoInfo); if (pRuntimeEnv->pInterpoBuf != NULL) { - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { tfree(pRuntimeEnv->pInterpoBuf[i]); } @@ -1528,8 +1514,8 @@ bool isFixedOutputQuery(SQuery *pQuery) { return true; } - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - SSqlFuncExprMsg *pExprMsg = &pQuery->pSelectExpr[i].pBase; + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { + SSqlFuncMsg *pExprMsg = &pQuery->pSelectExpr[i].pBase; // ignore the ts_comp function if (i == 0 && pExprMsg->functionId == TSDB_FUNC_PRJ && pExprMsg->numOfParams == 1 && @@ -1550,7 +1536,7 @@ bool isFixedOutputQuery(SQuery *pQuery) { } bool isPointInterpoQuery(SQuery *pQuery) { - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionID = pQuery->pSelectExpr[i].pBase.functionId; if (functionID == TSDB_FUNC_INTERP || functionID == TSDB_FUNC_LAST_ROW) { return true; @@ -1562,7 +1548,7 @@ bool isPointInterpoQuery(SQuery *pQuery) { // TODO REFACTOR:MERGE WITH CLIENT-SIDE FUNCTION bool isSumAvgRateQuery(SQuery *pQuery) { - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; if (functionId == TSDB_FUNC_TS) { continue; @@ -1578,7 +1564,7 @@ bool isSumAvgRateQuery(SQuery *pQuery) { } bool isFirstLastRowQuery(SQuery *pQuery) { - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionID = pQuery->pSelectExpr[i].pBase.functionId; if (functionID == TSDB_FUNC_LAST_ROW) { return true; @@ -1594,7 +1580,7 @@ bool notHasQueryTimeRange(SQuery *pQuery) { } static bool needReverseScan(SQuery *pQuery) { - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG) { continue; @@ -1767,8 +1753,8 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) { pQuery->checkBuffer = 0; } else { bool hasMultioutput = false; - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - SSqlFuncExprMsg *pExprMsg = &pQuery->pSelectExpr[i].pBase; + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { + SSqlFuncMsg *pExprMsg = &pQuery->pSelectExpr[i].pBase; if (pExprMsg->functionId == TSDB_FUNC_TS || pExprMsg->functionId == TSDB_FUNC_TS_DUMMY) { continue; } @@ -1800,7 +1786,7 @@ bool vnodeParametersSafetyCheck(SQuery *pQuery) { // todo ignore the avg/sum/min/max/count/stddev/top/bottom functions, of which // the scan order is not matter static bool onlyOneQueryType(SQuery *pQuery, int32_t functId, int32_t functIdDst) { - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG || @@ -1977,7 +1963,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI if (key == pQuery->window.skey) { // the queried timestamp has value, return it directly without interpolation - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { tVariantCreateFromBinary(&pRuntimeEnv->pCtx[i].param[3], (char *)&count, sizeof(count), TSDB_DATA_TYPE_INT); pRuntimeEnv->pCtx[i].param[0].i64Key = key; @@ -1988,7 +1974,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI count = 2; if (pQuery->interpoType == TSDB_INTERPO_SET_VALUE) { - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; // only the function of interp needs the corresponding information @@ -2023,7 +2009,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI TSKEY prevKey = *(TSKEY *)pPointInterpSupport->pPrevPoint[0]; TSKEY nextKey = *(TSKEY *)pPointInterpSupport->pNextPoint[0]; - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; // tag column does not need the interp environment @@ -2107,11 +2093,11 @@ static UNUSED_FUNC void allocMemForInterpo(SQInfo *pQInfo, SQuery *pQuery, void assert(isIntervalQuery(pQuery) || (pQuery->intervalTime == 0 && isPointInterpoQuery(pQuery))); if (isIntervalQuery(pQuery)) { - pQInfo->runtimeEnv.pInterpoBuf = malloc(POINTER_BYTES * pQuery->numOfOutputCols); + pQInfo->runtimeEnv.pInterpoBuf = malloc(POINTER_BYTES * pQuery->numOfOutput); - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { pQInfo->runtimeEnv.pInterpoBuf[i] = - calloc(1, sizeof(tFilePage) + pQuery->pSelectExpr[i].resBytes * pMeterObj->pointsPerFileBlock); + calloc(1, sizeof(tFilePage) + pQuery->pSelectExpr[i].bytes * pMeterObj->pointsPerFileBlock); } } } @@ -2162,7 +2148,7 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi int32_t realRowId = pResult->pos.rowId * getRowParamForMultiRowsOutput(pQuery, pRuntimeEnv->stableQuery); return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * numOfRows + - pQuery->pSelectExpr[columnIndex].resBytes * realRowId; + pQuery->pSelectExpr[columnIndex].bytes * realRowId; } /** @@ -2206,7 +2192,7 @@ UNUSED_FUNC void vnodeDecMeterRefcnt(SQInfo *pQInfo) { UNUSED_FUNC void setTimestampRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t stime, int64_t etime) { SQuery *pQuery = pRuntimeEnv->pQuery; - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; if (functionId == TSDB_FUNC_SPREAD) { @@ -2265,7 +2251,7 @@ static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFun } // todo disable this opt code block temporarily - // for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + // for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { // int32_t functId = pQuery->pSelectExpr[i].pBase.functionId; // if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) { // return top_bot_datablock_filter(&pCtx[i], functId, (char *)&pField[i].min, (char *)&pField[i].max); @@ -2293,7 +2279,7 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl if (pQuery->numOfFilterCols > 0) { r = BLK_DATA_ALL_NEEDED; } else { - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; int32_t colId = pQuery->pSelectExpr[i].pBase.colInfo.colId; r |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pQuery->window.skey, pQuery->window.ekey, colId); @@ -2446,8 +2432,8 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { int32_t remain = pRec->capacity - pRec->rows; int32_t newSize = pRec->capacity + (blockInfo.rows - remain); - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - int32_t bytes = pQuery->pSelectExpr[i].resBytes; + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { + int32_t bytes = pQuery->pSelectExpr[i].bytes; char *tmp = realloc(pQuery->sdata[i], bytes * newSize + sizeof(SData)); if (tmp == NULL) { // todo handle the oom @@ -2519,13 +2505,13 @@ static void doSetTagValueInParam(void *tsdb, STableId id, int32_t tagColId, tVar void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId id, void *tsdb) { SQuery *pQuery = pRuntimeEnv->pQuery; - SSqlFuncExprMsg *pFuncMsg = &pQuery->pSelectExpr[0].pBase; - if (pQuery->numOfOutputCols == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) { + SSqlFuncMsg *pFuncMsg = &pQuery->pSelectExpr[0].pBase; + if (pQuery->numOfOutput == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) { assert(pFuncMsg->numOfParams == 1); doSetTagValueInParam(tsdb, id, pFuncMsg->arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag); } else { // set tag value, by which the results are aggregated. - for (int32_t idx = 0; idx < pQuery->numOfOutputCols; ++idx) { + for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) { SColIndex *pCol = &pQuery->pSelectExpr[idx].pBase.colInfo; // ts_comp column required the tag value for join filter @@ -2551,7 +2537,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes SQuery * pQuery = pRuntimeEnv->pQuery; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; if (!mergeFlag) { pCtx[i].aOutputBuf = pCtx[i].aOutputBuf + pCtx[i].outputBytes; @@ -2575,7 +2561,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes } } - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; if (functionId == TSDB_FUNC_TAG_DUMMY) { continue; @@ -2656,7 +2642,7 @@ static UNUSED_FUNC void printBinaryData(int32_t functionId, char *data, int32_t void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOfRows) { #if 0 - int32_t numOfCols = pQuery->numOfOutputCols; + int32_t numOfCols = pQuery->numOfOutput; printf("super table query intermediate result, total:%d\n", numOfRows); SQInfo * pQInfo = (SQInfo *)(GET_QINFO_ADDR(pQuery)); @@ -2664,32 +2650,32 @@ void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOf for (int32_t j = 0; j < numOfRows; ++j) { for (int32_t i = 0; i < numOfCols; ++i) { - switch (pQuery->pSelectExpr[i].resType) { + switch (pQuery->pSelectExpr[i].type) { case TSDB_DATA_TYPE_BINARY: { int32_t colIndex = pQuery->pSelectExpr[i].pBase.colInfo.colIndex; int32_t type = 0; if (TSDB_COL_IS_TAG(pQuery->pSelectExpr[i].pBase.colInfo.flag)) { - type = pQuery->pSelectExpr[i].resType; + type = pQuery->pSelectExpr[i].type; } else { type = pMeterObj->schema[colIndex].type; } - printBinaryData(pQuery->pSelectExpr[i].pBase.functionId, pdata[i]->data + pQuery->pSelectExpr[i].resBytes * j, + printBinaryData(pQuery->pSelectExpr[i].pBase.functionId, pdata[i]->data + pQuery->pSelectExpr[i].bytes * j, type); break; } case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_BIGINT: - printf("%" PRId64 "\t", *(int64_t *)(pdata[i]->data + pQuery->pSelectExpr[i].resBytes * j)); + printf("%" PRId64 "\t", *(int64_t *)(pdata[i]->data + pQuery->pSelectExpr[i].bytes * j)); break; case TSDB_DATA_TYPE_INT: - printf("%d\t", *(int32_t *)(pdata[i]->data + pQuery->pSelectExpr[i].resBytes * j)); + printf("%d\t", *(int32_t *)(pdata[i]->data + pQuery->pSelectExpr[i].bytes * j)); break; case TSDB_DATA_TYPE_FLOAT: - printf("%f\t", *(float *)(pdata[i]->data + pQuery->pSelectExpr[i].resBytes * j)); + printf("%f\t", *(float *)(pdata[i]->data + pQuery->pSelectExpr[i].bytes * j)); break; case TSDB_DATA_TYPE_DOUBLE: - printf("%lf\t", *(double *)(pdata[i]->data + pQuery->pSelectExpr[i].resBytes * j)); + printf("%lf\t", *(double *)(pdata[i]->data + pQuery->pSelectExpr[i].bytes * j)); break; } } @@ -2807,7 +2793,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { for (int32_t num = 0; num < list.size; ++num) { tFilePage *pData = getResultBufferPageById(pResultBuf, list.pData[num]); - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; char * pDest = pQuery->sdata[i]->data; @@ -2828,7 +2814,7 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pW SQuery *pQuery = pRuntimeEnv->pQuery; int64_t maxOutput = 0; - for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int32_t functionId = pQuery->pSelectExpr[j].pBase.functionId; /* @@ -2885,7 +2871,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { SLoserTreeInfo *pTree = NULL; tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn); - SResultInfo *pResultInfo = calloc(pQuery->numOfOutputCols, sizeof(SResultInfo)); + SResultInfo *pResultInfo = calloc(pQuery->numOfOutput, sizeof(SResultInfo)); setWindowResultInfo(pResultInfo, pQuery, pRuntimeEnv->stableQuery); resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo); @@ -2971,7 +2957,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { tfree(posList); pQInfo->offset = 0; - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { tfree(pResultInfo[i].interResultBuf); } @@ -3002,7 +2988,7 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) { tFilePage *buf = getNewDataBuf(pResultBuf, id, &pageId); // pagewise copy to dest buffer - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; buf->numOfElems = r; @@ -3019,7 +3005,7 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) { } void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo) { - for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { pCtx[k].aOutputBuf = pQuery->sdata[k]->data - pCtx[k].outputBytes; pCtx[k].size = 1; pCtx[k].startOffset = 0; @@ -3044,7 +3030,7 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo * SWindowResult *buf = getWindowResult(pWindowResInfo, i); // open/close the specified query for each group result - for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int32_t functId = pQuery->pSelectExpr[j].pBase.functionId; if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) || @@ -3066,7 +3052,7 @@ void disableFuncInReverseScan(SQueryRuntimeEnv *pRuntimeEnv) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order); } else { // for simple result of table query, - for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int32_t functId = pQuery->pSelectExpr[j].pBase.functionId; SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[j]; @@ -3085,7 +3071,7 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1u; } @@ -3106,14 +3092,14 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) { void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SWITCH_ORDER(pRuntimeEnv->pCtx[i] .order); // = (pRuntimeEnv->pCtx[i].order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; } } void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo) { - int32_t numOfCols = pQuery->numOfOutputCols; + int32_t numOfCols = pQuery->numOfOutput; pResultRow->resultInfo = calloc((size_t)numOfCols, sizeof(SResultInfo)); pResultRow->pos = *posInfo; @@ -3125,7 +3111,7 @@ void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTa void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; pCtx->aOutputBuf = pQuery->sdata[i]->data; @@ -3142,7 +3128,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf; } - memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * pQuery->rec.capacity); + memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].bytes * pQuery->rec.capacity); } initCtxOutputBuf(pRuntimeEnv); @@ -3152,7 +3138,7 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) { SQuery *pQuery = pRuntimeEnv->pQuery; // reset the execution contexts - for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int32_t functionId = pQuery->pSelectExpr[j].pBase.functionId; assert(functionId != TSDB_FUNC_DIFF); @@ -3179,7 +3165,7 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) { void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int32_t functionId = pQuery->pSelectExpr[j].pBase.functionId; pRuntimeEnv->pCtx[j].currentStage = 0; @@ -3205,7 +3191,7 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) { int32_t numOfSkip = (int32_t) pQuery->limit.offset; pQuery->rec.rows -= numOfSkip; - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; @@ -3247,7 +3233,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { setWindowResOutputBuf(pRuntimeEnv, pResult); - for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int16_t functId = pQuery->pSelectExpr[j].pBase.functionId; if (functId == TSDB_FUNC_TS) { continue; @@ -3260,7 +3246,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { } } } else { - for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int16_t functId = pQuery->pSelectExpr[j].pBase.functionId; if (functId == TSDB_FUNC_TS) { continue; @@ -3428,7 +3414,7 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { setWindowResOutputBuf(pRuntimeEnv, buf); - for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { aAggs[pQuery->pSelectExpr[j].pBase.functionId].xFinalize(&pRuntimeEnv->pCtx[j]); } @@ -3440,14 +3426,14 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { } } else { - for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { aAggs[pQuery->pSelectExpr[j].pBase.functionId].xFinalize(&pRuntimeEnv->pCtx[j]); } } } static bool hasMainOutput(SQuery *pQuery) { - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; if (functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_TAG && functionId != TSDB_FUNC_TAGPRJ) { @@ -3550,7 +3536,7 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult * SQuery *pQuery = pRuntimeEnv->pQuery; // Note: pResult->pos[i]->numOfElems == 0, there is only fixed number of results for each group - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; pCtx->aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pResult); @@ -3651,7 +3637,7 @@ void setIntervalQueryRange(STableQueryInfo *pTableQueryInfo, SQInfo *pQInfo, TSK } bool requireTimestamp(SQuery *pQuery) { - for (int32_t i = 0; i < pQuery->numOfOutputCols; i++) { + for (int32_t i = 0; i < pQuery->numOfOutput; i++) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_NEED_TS) != 0) { return true; @@ -3732,7 +3718,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde pQInfo->groupIndex += 1; } - for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int32_t size = pRuntimeEnv->pCtx[j].outputBytes; char *out = pQuery->sdata[j]->data + numOfResult * size; @@ -3855,18 +3841,18 @@ static UNUSED_FUNC int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, t assert(pRuntimeEnv->pCtx[0].outputBytes == TSDB_KEYSIZE); // build support structure for performing interpolation - SSchema *pSchema = calloc(1, sizeof(SSchema) * pQuery->numOfOutputCols); - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + SSchema *pSchema = calloc(1, sizeof(SSchema) * pQuery->numOfOutput); + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { pSchema[i].bytes = pRuntimeEnv->pCtx[i].outputBytes; - pSchema[i].type = pQuery->pSelectExpr[i].resType; + pSchema[i].type = pQuery->pSelectExpr[i].type; } -// SColumnModel *pModel = createColumnModel(pSchema, pQuery->numOfOutputCols, pQuery->pointsToRead); +// SColumnModel *pModel = createColumnModel(pSchema, pQuery->numOfOutput, pQuery->pointsToRead); char * srcData[TSDB_MAX_COLUMNS] = {0}; int32_t functions[TSDB_MAX_COLUMNS] = {0}; - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { srcData[i] = pDataSrc[i]->data; functions[i] = pQuery->pSelectExpr[i].pBase.functionId; } @@ -3884,8 +3870,8 @@ static UNUSED_FUNC int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, t static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { - int32_t bytes = pQuery->pSelectExpr[col].resBytes; + for (int32_t col = 0; col < pQuery->numOfOutput; ++col) { + int32_t bytes = pQuery->pSelectExpr[col].bytes; memmove(data, pQuery->sdata[col]->data, bytes * numOfRows); data += bytes * numOfRows; @@ -3922,9 +3908,9 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage ret -= pQuery->limit.offset; // todo !!!!there exactly number of interpo is not valid. // todo refactor move to the beginning of buffer - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - memmove(pDst[i]->data, pDst[i]->data + pQuery->pSelectExpr[i].resBytes * pQuery->limit.offset, - ret * pQuery->pSelectExpr[i].resBytes); + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { + memmove(pDst[i]->data, pDst[i]->data + pQuery->pSelectExpr[i].bytes * pQuery->limit.offset, + ret * pQuery->pSelectExpr[i].bytes); } pQuery->limit.offset = 0; return ret; @@ -4302,7 +4288,7 @@ static UNUSED_FUNC bool doCheckWithPrevQueryRange(SQuery *pQuery, TSKEY nextKey) static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SResultInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[i]); if (pResInfo != NULL) { pResInfo->complete = false; @@ -4672,7 +4658,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { pStatus->closed = true; // enable return all results for group by normal columns SWindowResult *pResult = &pWindowResInfo->pResult[i]; - for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { pResult->numOfRows = MAX(pResult->numOfRows, pResult->resultInfo[j].numOfRes); } } @@ -5018,8 +5004,8 @@ static void tableIntervalProcess(SQInfo *pQInfo) { taosInterpoSetStartInfo(&pRuntimeEnv->interpoInfo, pQuery->rec.rows, pQuery->interpoType); SData **pInterpoBuf = pRuntimeEnv->pInterpoBuf; - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - memcpy(pInterpoBuf[i]->data, pQuery->sdata[i]->data, pQuery->rec.rows * pQuery->pSelectExpr[i].resBytes); + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { + memcpy(pInterpoBuf[i]->data, pQuery->sdata[i]->data, pQuery->rec.rows * pQuery->pSelectExpr[i].bytes); } numOfInterpo = 0; @@ -5159,7 +5145,7 @@ static void stableQueryImpl(SQInfo *pQInfo) { sem_post(&pQInfo->dataReady); } -static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncExprMsg *pExprMsg) { +static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg) { int32_t j = 0; while (j < pQueryMsg->numOfCols) { @@ -5173,7 +5159,7 @@ static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncExprMsg return j; } -bool vnodeValidateExprColumnInfo(SQueryTableMsg *pQueryMsg, SSqlFuncExprMsg *pExprMsg) { +bool vnodeValidateExprColumnInfo(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg) { int32_t j = getColumnIndexInSource(pQueryMsg, pExprMsg); return j < pQueryMsg->numOfCols; } @@ -5199,8 +5185,8 @@ static int32_t validateQueryMsg(SQueryTableMsg *pQueryMsg) { return -1; } - if (pQueryMsg->numOfOutputCols > TSDB_MAX_COLUMNS || pQueryMsg->numOfOutputCols <= 0) { - qError("qmsg:%p illegal value of output columns %d", pQueryMsg, pQueryMsg->numOfOutputCols); + if (pQueryMsg->numOfOutput > TSDB_MAX_COLUMNS || pQueryMsg->numOfOutput <= 0) { + qError("qmsg:%p illegal value of output columns %d", pQueryMsg, pQueryMsg->numOfOutput); return -1; } @@ -5244,7 +5230,7 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p * @param pExpr * @return */ -static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncExprMsg ***pExpr, +static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncMsg ***pExpr, char **tagCond, SColIndex **groupbyCols) { pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables); @@ -5260,7 +5246,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, pQueryMsg->queryType = htons(pQueryMsg->queryType); pQueryMsg->numOfCols = htons(pQueryMsg->numOfCols); - pQueryMsg->numOfOutputCols = htons(pQueryMsg->numOfOutputCols); + pQueryMsg->numOfOutput = htons(pQueryMsg->numOfOutput); pQueryMsg->numOfGroupCols = htons(pQueryMsg->numOfGroupCols); pQueryMsg->tagCondLen = htons(pQueryMsg->tagCondLen); pQueryMsg->tsOffset = htonl(pQueryMsg->tsOffset); @@ -5316,10 +5302,10 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, bool hasArithmeticFunction = false; - *pExpr = calloc(pQueryMsg->numOfOutputCols, POINTER_BYTES); - SSqlFuncExprMsg *pExprMsg = (SSqlFuncExprMsg *)pMsg; + *pExpr = calloc(pQueryMsg->numOfOutput, POINTER_BYTES); + SSqlFuncMsg *pExprMsg = (SSqlFuncMsg *)pMsg; - for (int32_t i = 0; i < pQueryMsg->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { (*pExpr)[i] = pExprMsg; pExprMsg->colInfo.colIndex = htons(pExprMsg->colInfo.colIndex); @@ -5328,7 +5314,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, pExprMsg->functionId = htons(pExprMsg->functionId); pExprMsg->numOfParams = htons(pExprMsg->numOfParams); - pMsg += sizeof(SSqlFuncExprMsg); + pMsg += sizeof(SSqlFuncMsg); for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) { pExprMsg->arg[j].argType = htons(pExprMsg->arg[j].argType); @@ -5355,7 +5341,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, } } - pExprMsg = (SSqlFuncExprMsg *)pMsg; + pExprMsg = (SSqlFuncMsg *)pMsg; } pQueryMsg->colNameLen = htonl(pQueryMsg->colNameLen); @@ -5393,11 +5379,11 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, pQueryMsg->defaultVal = (uint64_t)(pMsg); int64_t *v = (int64_t *)pMsg; - for (int32_t i = 0; i < pQueryMsg->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { v[i] = htobe64(v[i]); } - pMsg += sizeof(int64_t) * pQueryMsg->numOfOutputCols; + pMsg += sizeof(int64_t) * pQueryMsg->numOfOutput; } // the tag query condition expression string is located at the end of query msg @@ -5411,14 +5397,14 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, "outputCols:%d, numOfCols:%d, interval:%d" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 ", offset:%" PRId64, pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols, - pQueryMsg->order, pQueryMsg->numOfOutputCols, pQueryMsg->numOfCols, pQueryMsg->intervalTime, + pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->intervalTime, pQueryMsg->interpoType, pQueryMsg->tsLen, pQueryMsg->limit, pQueryMsg->offset); return 0; } -static int32_t buildAirthmeticExprFromMsg(SSqlFunctionExpr *pExpr, SQueryTableMsg *pQueryMsg) { -// SSqlBinaryExprInfo *pBinaryExprInfo = &pExpr->binExprInfo; +static int32_t buildAirthmeticExprFromMsg(SArithExprInfo *pExpr, SQueryTableMsg *pQueryMsg) { +// SExprInfo *pBinaryExprInfo = &pExpr->binExprInfo; // SColumnInfo * pColMsg = pQueryMsg->colList; #if 0 tExprNode* pBinExpr = NULL; @@ -5468,12 +5454,12 @@ static int32_t buildAirthmeticExprFromMsg(SSqlFunctionExpr *pExpr, SQueryTableMs return TSDB_CODE_SUCCESS; } -static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr **pSqlFuncExpr, - SSqlFuncExprMsg **pExprMsg) { +static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SArithExprInfo **pSqlFuncExpr, + SSqlFuncMsg **pExprMsg) { *pSqlFuncExpr = NULL; int32_t code = TSDB_CODE_SUCCESS; - SSqlFunctionExpr *pExprs = (SSqlFunctionExpr *)calloc(1, sizeof(SSqlFunctionExpr) * pQueryMsg->numOfOutputCols); + SArithExprInfo *pExprs = (SArithExprInfo *)calloc(1, sizeof(SArithExprInfo) * pQueryMsg->numOfOutput); if (pExprs == NULL) { return TSDB_CODE_SERV_OUT_OF_MEMORY; } @@ -5481,9 +5467,9 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunct bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType); int16_t tagLen = 0; - for (int32_t i = 0; i < pQueryMsg->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { pExprs[i].pBase = *pExprMsg[i]; - pExprs[i].resBytes = 0; + pExprs[i].bytes = 0; int16_t type = 0; int16_t bytes = 0; @@ -5509,22 +5495,22 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunct } int32_t param = pExprs[i].pBase.arg[0].argValue.i64; - if (getResultDataInfo(type, bytes, pExprs[i].pBase.functionId, param, &pExprs[i].resType, &pExprs[i].resBytes, + if (getResultDataInfo(type, bytes, pExprs[i].pBase.functionId, param, &pExprs[i].type, &pExprs[i].bytes, &pExprs[i].interResBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) { tfree(pExprs); return TSDB_CODE_INVALID_QUERY_MSG; } if (pExprs[i].pBase.functionId == TSDB_FUNC_TAG_DUMMY || pExprs[i].pBase.functionId == TSDB_FUNC_TS_DUMMY) { - tagLen += pExprs[i].resBytes; + tagLen += pExprs[i].bytes; } - assert(isValidDataType(pExprs[i].resType, pExprs[i].resBytes)); + assert(isValidDataType(pExprs[i].type, pExprs[i].bytes)); } // get the correct result size for top/bottom query, according to the number of tags columns in selection clause // TODO refactor - for (int32_t i = 0; i < pQueryMsg->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { pExprs[i].pBase = *pExprMsg[i]; int16_t functId = pExprs[i].pBase.functionId; if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) { @@ -5537,7 +5523,7 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunct int32_t ret = getResultDataInfo(type, bytes, pExprs[i].pBase.functionId, pExprs[i].pBase.arg[0].argValue.i64, - &pExprs[i].resType, &pExprs[i].resBytes, &pExprs[i].interResBytes, tagLen, isSuperTable); + &pExprs[i].type, &pExprs[i].bytes, &pExprs[i].interResBytes, tagLen, isSuperTable); assert(ret == TSDB_CODE_SUCCESS); } } @@ -5655,23 +5641,9 @@ static int32_t createFilterInfo(void *pQInfo, SQuery *pQuery) { static void doUpdateExprColumnIndex(SQuery *pQuery) { assert(pQuery->pSelectExpr != NULL && pQuery != NULL); - // int32_t i = 0, j = 0; - // while (i < pQuery->numOfCols && j < pMeterObj->numOfColumns) { - // if (pQuery->colList[i].data.colId == pMeterObj->schema[j].colId) { - // pQuery->colList[i++].colIndex = (int16_t)j++; - // } else if (pQuery->colList[i].data.colId < pMeterObj->schema[j].colId) { - // pQuery->colList[i++].colIndex = -1; - // } else if (pQuery->colList[i].data.colId > pMeterObj->schema[j].colId) { - // j++; - // } - // } - - // while (i < pQuery->numOfCols) { - // pQuery->colList[i++].colIndex = -1; // not such column in current meter - // } - for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { - SSqlFuncExprMsg *pSqlExprMsg = &pQuery->pSelectExpr[k].pBase; + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { + SSqlFuncMsg *pSqlExprMsg = &pQuery->pSelectExpr[k].pBase; if (pSqlExprMsg->functionId == TSDB_FUNC_ARITHM || pSqlExprMsg->colInfo.flag == TSDB_COL_TAG) { continue; } @@ -5686,7 +5658,7 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) { } } -static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs, +static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SArithExprInfo *pExprs, STableGroupInfo *groupInfo) { SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); if (pQInfo == NULL) { @@ -5697,10 +5669,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou pQInfo->runtimeEnv.pQuery = pQuery; int16_t numOfCols = pQueryMsg->numOfCols; - int16_t numOfOutputCols = pQueryMsg->numOfOutputCols; + int16_t numOfOutput = pQueryMsg->numOfOutput; pQuery->numOfCols = numOfCols; - pQuery->numOfOutputCols = numOfOutputCols; + pQuery->numOfOutput = numOfOutput; pQuery->limit.limit = pQueryMsg->limit; pQuery->limit.offset = pQueryMsg->offset; pQuery->order.order = pQueryMsg->order; @@ -5725,9 +5697,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou } // calculate the result row size - for (int16_t col = 0; col < numOfOutputCols; ++col) { - assert(pExprs[col].resBytes > 0); - pQuery->rowSize += pExprs[col].resBytes; + for (int16_t col = 0; col < numOfOutput; ++col) { + assert(pExprs[col].bytes > 0); + pQuery->rowSize += pExprs[col].bytes; } doUpdateExprColumnIndex(pQuery); @@ -5738,7 +5710,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou } // prepare the result buffer - pQuery->sdata = (SData **)calloc(pQuery->numOfOutputCols, POINTER_BYTES); + pQuery->sdata = (SData **)calloc(pQuery->numOfOutput, POINTER_BYTES); if (pQuery->sdata == NULL) { goto _cleanup; } @@ -5747,11 +5719,11 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou pQuery->rec.capacity = 4096; pQuery->rec.threshold = 4000; - for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { - assert(pExprs[col].interResBytes >= pExprs[col].resBytes); + for (int32_t col = 0; col < pQuery->numOfOutput; ++col) { + assert(pExprs[col].interResBytes >= pExprs[col].bytes); // allocate additional memory for interResults that are usually larger then final results - size_t size = (pQuery->rec.capacity + 1) * pExprs[col].resBytes + pExprs[col].interResBytes + sizeof(SData); + size_t size = (pQuery->rec.capacity + 1) * pExprs[col].bytes + pExprs[col].interResBytes + sizeof(SData); pQuery->sdata[col] = (SData *)calloc(1, size); if (pQuery->sdata[col] == NULL) { goto _cleanup; @@ -5759,13 +5731,13 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou } if (pQuery->interpoType != TSDB_INTERPO_NONE) { - pQuery->defaultVal = malloc(sizeof(int64_t) * pQuery->numOfOutputCols); + pQuery->defaultVal = malloc(sizeof(int64_t) * pQuery->numOfOutput); if (pQuery->defaultVal == NULL) { goto _cleanup; } // the first column is the timestamp - memcpy(pQuery->defaultVal, (char *)pQueryMsg->defaultVal, pQuery->numOfOutputCols * sizeof(int64_t)); + memcpy(pQuery->defaultVal, (char *)pQueryMsg->defaultVal, pQuery->numOfOutput * sizeof(int64_t)); } // to make sure third party won't overwrite this structure @@ -5792,7 +5764,7 @@ _cleanup: tfree(pQuery->defaultVal); if (pQuery->sdata != NULL) { - for (int16_t col = 0; col < pQuery->numOfOutputCols; ++col) { + for (int16_t col = 0; col < pQuery->numOfOutput; ++col) { tfree(pQuery->sdata[col]); } } @@ -5872,7 +5844,7 @@ static void freeQInfo(SQInfo *pQInfo) { setQueryKilled(pQInfo); qTrace("QInfo:%p start to free QInfo", pQInfo); - for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { + for (int32_t col = 0; col < pQuery->numOfOutput; ++col) { tfree(pQuery->sdata[col]); } @@ -5891,8 +5863,8 @@ static void freeQInfo(SQInfo *pQInfo) { tfree(pQuery->sdata); if (pQuery->pSelectExpr != NULL) { - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - SSqlBinaryExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].binExprInfo; + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { + SExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].binExprInfo; if (pBinExprInfo->numOfCols > 0) { tfree(pBinExprInfo->pReqColumns); @@ -5988,7 +5960,7 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) char * tagCond = NULL; SArray * pTableIdList = NULL; - SSqlFuncExprMsg **pExprMsg = NULL; + SSqlFuncMsg **pExprMsg = NULL; SColIndex * pGroupColIndex = NULL; if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &pGroupColIndex)) != TSDB_CODE_SUCCESS) { @@ -6007,7 +5979,7 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) goto _query_over; } - SSqlFunctionExpr *pExprs = NULL; + SArithExprInfo *pExprs = NULL; if ((code = createSqlFunctionExprFromMsg(pQueryMsg, &pExprs, pExprMsg)) != TSDB_CODE_SUCCESS) { goto _query_over; } diff --git a/src/query/src/queryUtil.c b/src/query/src/queryUtil.c index 17410b2868..b4d8911284 100644 --- a/src/query/src/queryUtil.c +++ b/src/query/src/queryUtil.c @@ -217,11 +217,11 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow return; } - for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutput; ++i) { SResultInfo *pResultInfo = &pWindowRes->resultInfo[i]; char * s = getPosInResultPage(pRuntimeEnv, i, pWindowRes); - size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].resBytes; + size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].bytes; memset(s, 0, size); resetResultInfo(pResultInfo); @@ -245,7 +245,7 @@ void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, con dst->window = src->window; dst->status = src->status; - int32_t nOutputCols = pRuntimeEnv->pQuery->numOfOutputCols; + int32_t nOutputCols = pRuntimeEnv->pQuery->numOfOutput; for (int32_t i = 0; i < nOutputCols; ++i) { SResultInfo *pDst = &dst->resultInfo[i]; @@ -261,7 +261,7 @@ void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, con // copy the output buffer data from src to dst, the position info keep unchanged char * dstBuf = getPosInResultPage(pRuntimeEnv, i, dst); char * srcBuf = getPosInResultPage(pRuntimeEnv, i, (SWindowResult *)src); - size_t s = pRuntimeEnv->pQuery->pSelectExpr[i].resBytes; + size_t s = pRuntimeEnv->pQuery->pSelectExpr[i].bytes; memcpy(dstBuf, srcBuf, s); } -- GitLab