提交 79f3d778 编写于 作者: H hjxilinx

[td-171]fix bug in column values filter query

上级 47c2796b
......@@ -122,13 +122,13 @@ typedef struct SQuery {
int64_t slidingTime; // sliding time for sliding window query
char slidingTimeUnit; // interval data type, used for daytime revise
int8_t precision;
int16_t numOfOutputCols;
int16_t numOfOutput;
int16_t interpoType;
int16_t checkBuffer; // check if the buffer is full during scan each block
SLimitVal limit;
int32_t rowSize;
SSqlGroupbyExpr* pGroupbyExpr;
SSqlFunctionExpr* pSelectExpr;
SArithExprInfo* pSelectExpr;
SColumnInfo* colList;
int32_t numOfFilterCols;
int64_t* defaultVal;
......
......@@ -114,7 +114,7 @@ enum {
#define QUERY_IS_FREE_RESOURCE(type) (((type)&TSDB_QUERY_TYPE_FREE_RESOURCE) != 0)
typedef struct SArithmeticSupport {
SSqlFunctionExpr *pExpr;
SArithExprInfo *pArithExpr;
int32_t elemSize[TSDB_MAX_COLUMNS];
int32_t numOfCols;
int32_t offset;
......
......@@ -251,26 +251,12 @@ bool doFilterData(SQuery *pQuery, int32_t elemPos) {
return true;
}
bool vnodeFilterData(SQuery *pQuery, int32_t *numOfActualRead, int32_t index) {
(*numOfActualRead)++;
if (!doFilterData(pQuery, index)) {
return false;
}
if (pQuery->limit.offset > 0) {
pQuery->limit.offset--; // ignore this qualified row
return false;
}
return true;
}
int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
bool hasMainFunction = hasMainOutput(pQuery);
int64_t maxOutput = 0;
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pQuery->pSelectExpr[j].pBase.functionId;
/*
......@@ -347,7 +333,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) {
bool hasTags = false;
int32_t numOfSelectivity = 0;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functId = pQuery->pSelectExpr[i].pBase.functionId;
if (functId == TSDB_FUNC_TAG_DUMMY || functId == TSDB_FUNC_TS_DUMMY) {
hasTags = true;
......@@ -368,7 +354,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) {
bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].pBase.functionId == TSDB_FUNC_TS_COMP; }
bool limitResults(SQInfo *pQInfo) {
static bool limitResults(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
if ((pQuery->limit.limit > 0) && (pQuery->rec.total + pQuery->rec.rows > pQuery->limit.limit)) {
......@@ -383,7 +369,7 @@ bool limitResults(SQInfo *pQInfo) {
}
static bool isTopBottomQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
if (functionId == TSDB_FUNC_TS) {
continue;
......@@ -730,7 +716,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) {
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
pCtx[k].nStartQueryTimestamp = pWin->skey;
......@@ -754,7 +740,7 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) {
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
pCtx[k].nStartQueryTimestamp = pWin->skey;
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
......@@ -839,7 +825,7 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3
int32_t functionId = pQuery->pSelectExpr[col].pBase.functionId;
if (functionId == TSDB_FUNC_ARITHM) {
sas->pExpr = &pQuery->pSelectExpr[col];
sas->pArithExpr = &pQuery->pSelectExpr[col];
// set the start offset to be the lowest start position, no matter asc/desc query order
if (QUERY_IS_ASC_QUERY(pQuery)) {
......@@ -907,9 +893,9 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
primaryKeyCol = (TSKEY *)(pColInfo->pData);
}
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutputCols, sizeof(SArithmeticSupport));
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
SDataStatis *tpField = NULL;
......@@ -966,7 +952,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
* since the selectivity + tag_prj query needs all parameters been set done.
* tag_prj function are changed to be TSDB_FUNC_TAG_DUMMY
*/
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]);
......@@ -1098,7 +1084,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
TSKEY *primaryKeyCol = (TSKEY*) ((SColumnInfoData *)taosArrayGet(pDataBlock, 0))->pData;
bool groupbyStateValue = isGroupbyNormalCol(pQuery->pGroupbyExpr);
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutputCols, sizeof(SArithmeticSupport));
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
int16_t type = 0;
int16_t bytes = 0;
......@@ -1109,7 +1095,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
// groupbyColumnData = getGroupbyColumnData(pQuery, data, &type, &bytes);
}
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
SDataStatis *pColStatis = NULL;
......@@ -1218,7 +1204,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
// all startOffset are identical
offset -= pCtx[0].startOffset;
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset);
......@@ -1350,9 +1336,9 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) {
int16_t tagLen = 0;
SQLFunctionCtx **pTagCtx = calloc(pQuery->numOfOutputCols, POINTER_BYTES);
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SSqlFuncExprMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].pBase;
SQLFunctionCtx **pTagCtx = calloc(pQuery->numOfOutput, POINTER_BYTES);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].pBase;
if (pSqlFuncMsg->functionId == TSDB_FUNC_TAG_DUMMY || pSqlFuncMsg->functionId == TSDB_FUNC_TS_DUMMY) {
tagLen += pCtx[i].outputBytes;
pTagCtx[num++] = &pCtx[i];
......@@ -1374,7 +1360,7 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) {
}
static void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool isStableQuery) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
setResultInfoBuf(&pResultInfo[i], pQuery->pSelectExpr[i].interResBytes, isStableQuery);
}
}
......@@ -1383,16 +1369,16 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
qTrace("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv));
SQuery *pQuery = pRuntimeEnv->pQuery;
pRuntimeEnv->resultInfo = calloc(pQuery->numOfOutputCols, sizeof(SResultInfo));
pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutputCols, sizeof(SQLFunctionCtx));
pRuntimeEnv->resultInfo = calloc(pQuery->numOfOutput, sizeof(SResultInfo));
pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutput, sizeof(SQLFunctionCtx));
if (pRuntimeEnv->resultInfo == NULL || pRuntimeEnv->pCtx == NULL) {
goto _error_clean;
}
pRuntimeEnv->offset[0] = 0;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SSqlFuncExprMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].pBase;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].pBase;
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
pCtx->inputType = GET_COLUMN_TYPE(pQuery, i);
......@@ -1400,8 +1386,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
pCtx->ptsOutputBuf = NULL;
pCtx->outputBytes = pQuery->pSelectExpr[i].resBytes;
pCtx->outputType = pQuery->pSelectExpr[i].resType;
pCtx->outputBytes = pQuery->pSelectExpr[i].bytes;
pCtx->outputType = pQuery->pSelectExpr[i].type;
pCtx->order = pQuery->order.order;
pCtx->functionId = pSqlFuncMsg->functionId;
......@@ -1463,10 +1449,10 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
qTrace("QInfo:%p teardown runtime env", GET_QINFO_ADDR(pQuery));
cleanupTimeWindowInfo(&pRuntimeEnv->windowResInfo, pQuery->numOfOutputCols);
cleanupTimeWindowInfo(&pRuntimeEnv->windowResInfo, pQuery->numOfOutput);
if (pRuntimeEnv->pCtx != NULL) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
......@@ -1485,7 +1471,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
taosDestoryInterpoInfo(&pRuntimeEnv->interpoInfo);
if (pRuntimeEnv->pInterpoBuf != NULL) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
tfree(pRuntimeEnv->pInterpoBuf[i]);
}
......@@ -1528,8 +1514,8 @@ bool isFixedOutputQuery(SQuery *pQuery) {
return true;
}
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SSqlFuncExprMsg *pExprMsg = &pQuery->pSelectExpr[i].pBase;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg *pExprMsg = &pQuery->pSelectExpr[i].pBase;
// ignore the ts_comp function
if (i == 0 && pExprMsg->functionId == TSDB_FUNC_PRJ && pExprMsg->numOfParams == 1 &&
......@@ -1550,7 +1536,7 @@ bool isFixedOutputQuery(SQuery *pQuery) {
}
bool isPointInterpoQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionID = pQuery->pSelectExpr[i].pBase.functionId;
if (functionID == TSDB_FUNC_INTERP || functionID == TSDB_FUNC_LAST_ROW) {
return true;
......@@ -1562,7 +1548,7 @@ bool isPointInterpoQuery(SQuery *pQuery) {
// TODO REFACTOR:MERGE WITH CLIENT-SIDE FUNCTION
bool isSumAvgRateQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
if (functionId == TSDB_FUNC_TS) {
continue;
......@@ -1578,7 +1564,7 @@ bool isSumAvgRateQuery(SQuery *pQuery) {
}
bool isFirstLastRowQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionID = pQuery->pSelectExpr[i].pBase.functionId;
if (functionID == TSDB_FUNC_LAST_ROW) {
return true;
......@@ -1594,7 +1580,7 @@ bool notHasQueryTimeRange(SQuery *pQuery) {
}
static bool needReverseScan(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG) {
continue;
......@@ -1767,8 +1753,8 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) {
pQuery->checkBuffer = 0;
} else {
bool hasMultioutput = false;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SSqlFuncExprMsg *pExprMsg = &pQuery->pSelectExpr[i].pBase;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg *pExprMsg = &pQuery->pSelectExpr[i].pBase;
if (pExprMsg->functionId == TSDB_FUNC_TS || pExprMsg->functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
......@@ -1800,7 +1786,7 @@ bool vnodeParametersSafetyCheck(SQuery *pQuery) {
// todo ignore the avg/sum/min/max/count/stddev/top/bottom functions, of which
// the scan order is not matter
static bool onlyOneQueryType(SQuery *pQuery, int32_t functId, int32_t functIdDst) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_TAG ||
......@@ -1977,7 +1963,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI
if (key == pQuery->window.skey) {
// the queried timestamp has value, return it directly without interpolation
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
tVariantCreateFromBinary(&pRuntimeEnv->pCtx[i].param[3], (char *)&count, sizeof(count), TSDB_DATA_TYPE_INT);
pRuntimeEnv->pCtx[i].param[0].i64Key = key;
......@@ -1988,7 +1974,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI
count = 2;
if (pQuery->interpoType == TSDB_INTERPO_SET_VALUE) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
// only the function of interp needs the corresponding information
......@@ -2023,7 +2009,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI
TSKEY prevKey = *(TSKEY *)pPointInterpSupport->pPrevPoint[0];
TSKEY nextKey = *(TSKEY *)pPointInterpSupport->pNextPoint[0];
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
// tag column does not need the interp environment
......@@ -2107,11 +2093,11 @@ static UNUSED_FUNC void allocMemForInterpo(SQInfo *pQInfo, SQuery *pQuery, void
assert(isIntervalQuery(pQuery) || (pQuery->intervalTime == 0 && isPointInterpoQuery(pQuery)));
if (isIntervalQuery(pQuery)) {
pQInfo->runtimeEnv.pInterpoBuf = malloc(POINTER_BYTES * pQuery->numOfOutputCols);
pQInfo->runtimeEnv.pInterpoBuf = malloc(POINTER_BYTES * pQuery->numOfOutput);
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
pQInfo->runtimeEnv.pInterpoBuf[i] =
calloc(1, sizeof(tFilePage) + pQuery->pSelectExpr[i].resBytes * pMeterObj->pointsPerFileBlock);
calloc(1, sizeof(tFilePage) + pQuery->pSelectExpr[i].bytes * pMeterObj->pointsPerFileBlock);
}
}
}
......@@ -2162,7 +2148,7 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi
int32_t realRowId = pResult->pos.rowId * getRowParamForMultiRowsOutput(pQuery, pRuntimeEnv->stableQuery);
return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * numOfRows +
pQuery->pSelectExpr[columnIndex].resBytes * realRowId;
pQuery->pSelectExpr[columnIndex].bytes * realRowId;
}
/**
......@@ -2206,7 +2192,7 @@ UNUSED_FUNC void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
UNUSED_FUNC void setTimestampRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t stime, int64_t etime) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
if (functionId == TSDB_FUNC_SPREAD) {
......@@ -2265,7 +2251,7 @@ static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFun
}
// todo disable this opt code block temporarily
// for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
// int32_t functId = pQuery->pSelectExpr[i].pBase.functionId;
// if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) {
// return top_bot_datablock_filter(&pCtx[i], functId, (char *)&pField[i].min, (char *)&pField[i].max);
......@@ -2293,7 +2279,7 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl
if (pQuery->numOfFilterCols > 0) {
r = BLK_DATA_ALL_NEEDED;
} else {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
int32_t colId = pQuery->pSelectExpr[i].pBase.colInfo.colId;
r |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pQuery->window.skey, pQuery->window.ekey, colId);
......@@ -2446,8 +2432,8 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
int32_t remain = pRec->capacity - pRec->rows;
int32_t newSize = pRec->capacity + (blockInfo.rows - remain);
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
int32_t bytes = pQuery->pSelectExpr[i].resBytes;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t bytes = pQuery->pSelectExpr[i].bytes;
char *tmp = realloc(pQuery->sdata[i], bytes * newSize + sizeof(SData));
if (tmp == NULL) { // todo handle the oom
......@@ -2519,13 +2505,13 @@ static void doSetTagValueInParam(void *tsdb, STableId id, int32_t tagColId, tVar
void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId id, void *tsdb) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SSqlFuncExprMsg *pFuncMsg = &pQuery->pSelectExpr[0].pBase;
if (pQuery->numOfOutputCols == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) {
SSqlFuncMsg *pFuncMsg = &pQuery->pSelectExpr[0].pBase;
if (pQuery->numOfOutput == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) {
assert(pFuncMsg->numOfParams == 1);
doSetTagValueInParam(tsdb, id, pFuncMsg->arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag);
} else {
// set tag value, by which the results are aggregated.
for (int32_t idx = 0; idx < pQuery->numOfOutputCols; ++idx) {
for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) {
SColIndex *pCol = &pQuery->pSelectExpr[idx].pBase.colInfo;
// ts_comp column required the tag value for join filter
......@@ -2551,7 +2537,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes
SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
if (!mergeFlag) {
pCtx[i].aOutputBuf = pCtx[i].aOutputBuf + pCtx[i].outputBytes;
......@@ -2575,7 +2561,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes
}
}
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY) {
continue;
......@@ -2656,7 +2642,7 @@ static UNUSED_FUNC void printBinaryData(int32_t functionId, char *data, int32_t
void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOfRows) {
#if 0
int32_t numOfCols = pQuery->numOfOutputCols;
int32_t numOfCols = pQuery->numOfOutput;
printf("super table query intermediate result, total:%d\n", numOfRows);
SQInfo * pQInfo = (SQInfo *)(GET_QINFO_ADDR(pQuery));
......@@ -2664,32 +2650,32 @@ void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOf
for (int32_t j = 0; j < numOfRows; ++j) {
for (int32_t i = 0; i < numOfCols; ++i) {
switch (pQuery->pSelectExpr[i].resType) {
switch (pQuery->pSelectExpr[i].type) {
case TSDB_DATA_TYPE_BINARY: {
int32_t colIndex = pQuery->pSelectExpr[i].pBase.colInfo.colIndex;
int32_t type = 0;
if (TSDB_COL_IS_TAG(pQuery->pSelectExpr[i].pBase.colInfo.flag)) {
type = pQuery->pSelectExpr[i].resType;
type = pQuery->pSelectExpr[i].type;
} else {
type = pMeterObj->schema[colIndex].type;
}
printBinaryData(pQuery->pSelectExpr[i].pBase.functionId, pdata[i]->data + pQuery->pSelectExpr[i].resBytes * j,
printBinaryData(pQuery->pSelectExpr[i].pBase.functionId, pdata[i]->data + pQuery->pSelectExpr[i].bytes * j,
type);
break;
}
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT:
printf("%" PRId64 "\t", *(int64_t *)(pdata[i]->data + pQuery->pSelectExpr[i].resBytes * j));
printf("%" PRId64 "\t", *(int64_t *)(pdata[i]->data + pQuery->pSelectExpr[i].bytes * j));
break;
case TSDB_DATA_TYPE_INT:
printf("%d\t", *(int32_t *)(pdata[i]->data + pQuery->pSelectExpr[i].resBytes * j));
printf("%d\t", *(int32_t *)(pdata[i]->data + pQuery->pSelectExpr[i].bytes * j));
break;
case TSDB_DATA_TYPE_FLOAT:
printf("%f\t", *(float *)(pdata[i]->data + pQuery->pSelectExpr[i].resBytes * j));
printf("%f\t", *(float *)(pdata[i]->data + pQuery->pSelectExpr[i].bytes * j));
break;
case TSDB_DATA_TYPE_DOUBLE:
printf("%lf\t", *(double *)(pdata[i]->data + pQuery->pSelectExpr[i].resBytes * j));
printf("%lf\t", *(double *)(pdata[i]->data + pQuery->pSelectExpr[i].bytes * j));
break;
}
}
......@@ -2807,7 +2793,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
for (int32_t num = 0; num < list.size; ++num) {
tFilePage *pData = getResultBufferPageById(pResultBuf, list.pData[num]);
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
char * pDest = pQuery->sdata[i]->data;
......@@ -2828,7 +2814,7 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pW
SQuery *pQuery = pRuntimeEnv->pQuery;
int64_t maxOutput = 0;
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pQuery->pSelectExpr[j].pBase.functionId;
/*
......@@ -2885,7 +2871,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
SLoserTreeInfo *pTree = NULL;
tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn);
SResultInfo *pResultInfo = calloc(pQuery->numOfOutputCols, sizeof(SResultInfo));
SResultInfo *pResultInfo = calloc(pQuery->numOfOutput, sizeof(SResultInfo));
setWindowResultInfo(pResultInfo, pQuery, pRuntimeEnv->stableQuery);
resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo);
......@@ -2971,7 +2957,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
tfree(posList);
pQInfo->offset = 0;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
tfree(pResultInfo[i].interResultBuf);
}
......@@ -3002,7 +2988,7 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) {
tFilePage *buf = getNewDataBuf(pResultBuf, id, &pageId);
// pagewise copy to dest buffer
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
buf->numOfElems = r;
......@@ -3019,7 +3005,7 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) {
}
void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo) {
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
pCtx[k].aOutputBuf = pQuery->sdata[k]->data - pCtx[k].outputBytes;
pCtx[k].size = 1;
pCtx[k].startOffset = 0;
......@@ -3044,7 +3030,7 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo *
SWindowResult *buf = getWindowResult(pWindowResInfo, i);
// open/close the specified query for each group result
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functId = pQuery->pSelectExpr[j].pBase.functionId;
if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) ||
......@@ -3066,7 +3052,7 @@ void disableFuncInReverseScan(SQueryRuntimeEnv *pRuntimeEnv) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
} else { // for simple result of table query,
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functId = pQuery->pSelectExpr[j].pBase.functionId;
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[j];
......@@ -3085,7 +3071,7 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1u;
}
......@@ -3106,14 +3092,14 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) {
void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SWITCH_ORDER(pRuntimeEnv->pCtx[i]
.order); // = (pRuntimeEnv->pCtx[i].order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
}
}
void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo) {
int32_t numOfCols = pQuery->numOfOutputCols;
int32_t numOfCols = pQuery->numOfOutput;
pResultRow->resultInfo = calloc((size_t)numOfCols, sizeof(SResultInfo));
pResultRow->pos = *posInfo;
......@@ -3125,7 +3111,7 @@ void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTa
void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
pCtx->aOutputBuf = pQuery->sdata[i]->data;
......@@ -3142,7 +3128,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
}
memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * pQuery->rec.capacity);
memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].bytes * pQuery->rec.capacity);
}
initCtxOutputBuf(pRuntimeEnv);
......@@ -3152,7 +3138,7 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) {
SQuery *pQuery = pRuntimeEnv->pQuery;
// reset the execution contexts
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pQuery->pSelectExpr[j].pBase.functionId;
assert(functionId != TSDB_FUNC_DIFF);
......@@ -3179,7 +3165,7 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) {
void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pQuery->pSelectExpr[j].pBase.functionId;
pRuntimeEnv->pCtx[j].currentStage = 0;
......@@ -3205,7 +3191,7 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) {
int32_t numOfSkip = (int32_t) pQuery->limit.offset;
pQuery->rec.rows -= numOfSkip;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
......@@ -3247,7 +3233,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
setWindowResOutputBuf(pRuntimeEnv, pResult);
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int16_t functId = pQuery->pSelectExpr[j].pBase.functionId;
if (functId == TSDB_FUNC_TS) {
continue;
......@@ -3260,7 +3246,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
} else {
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int16_t functId = pQuery->pSelectExpr[j].pBase.functionId;
if (functId == TSDB_FUNC_TS) {
continue;
......@@ -3428,7 +3414,7 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
setWindowResOutputBuf(pRuntimeEnv, buf);
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
aAggs[pQuery->pSelectExpr[j].pBase.functionId].xFinalize(&pRuntimeEnv->pCtx[j]);
}
......@@ -3440,14 +3426,14 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
}
} else {
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
aAggs[pQuery->pSelectExpr[j].pBase.functionId].xFinalize(&pRuntimeEnv->pCtx[j]);
}
}
}
static bool hasMainOutput(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
if (functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_TAG && functionId != TSDB_FUNC_TAGPRJ) {
......@@ -3550,7 +3536,7 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
SQuery *pQuery = pRuntimeEnv->pQuery;
// Note: pResult->pos[i]->numOfElems == 0, there is only fixed number of results for each group
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
pCtx->aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pResult);
......@@ -3651,7 +3637,7 @@ void setIntervalQueryRange(STableQueryInfo *pTableQueryInfo, SQInfo *pQInfo, TSK
}
bool requireTimestamp(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; i++) {
for (int32_t i = 0; i < pQuery->numOfOutput; i++) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_NEED_TS) != 0) {
return true;
......@@ -3732,7 +3718,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
pQInfo->groupIndex += 1;
}
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t size = pRuntimeEnv->pCtx[j].outputBytes;
char *out = pQuery->sdata[j]->data + numOfResult * size;
......@@ -3855,18 +3841,18 @@ static UNUSED_FUNC int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, t
assert(pRuntimeEnv->pCtx[0].outputBytes == TSDB_KEYSIZE);
// build support structure for performing interpolation
SSchema *pSchema = calloc(1, sizeof(SSchema) * pQuery->numOfOutputCols);
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SSchema *pSchema = calloc(1, sizeof(SSchema) * pQuery->numOfOutput);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
pSchema[i].bytes = pRuntimeEnv->pCtx[i].outputBytes;
pSchema[i].type = pQuery->pSelectExpr[i].resType;
pSchema[i].type = pQuery->pSelectExpr[i].type;
}
// SColumnModel *pModel = createColumnModel(pSchema, pQuery->numOfOutputCols, pQuery->pointsToRead);
// SColumnModel *pModel = createColumnModel(pSchema, pQuery->numOfOutput, pQuery->pointsToRead);
char * srcData[TSDB_MAX_COLUMNS] = {0};
int32_t functions[TSDB_MAX_COLUMNS] = {0};
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
srcData[i] = pDataSrc[i]->data;
functions[i] = pQuery->pSelectExpr[i].pBase.functionId;
}
......@@ -3884,8 +3870,8 @@ static UNUSED_FUNC int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, t
static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) {
int32_t bytes = pQuery->pSelectExpr[col].resBytes;
for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
int32_t bytes = pQuery->pSelectExpr[col].bytes;
memmove(data, pQuery->sdata[col]->data, bytes * numOfRows);
data += bytes * numOfRows;
......@@ -3922,9 +3908,9 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage
ret -= pQuery->limit.offset;
// todo !!!!there exactly number of interpo is not valid.
// todo refactor move to the beginning of buffer
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
memmove(pDst[i]->data, pDst[i]->data + pQuery->pSelectExpr[i].resBytes * pQuery->limit.offset,
ret * pQuery->pSelectExpr[i].resBytes);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
memmove(pDst[i]->data, pDst[i]->data + pQuery->pSelectExpr[i].bytes * pQuery->limit.offset,
ret * pQuery->pSelectExpr[i].bytes);
}
pQuery->limit.offset = 0;
return ret;
......@@ -4302,7 +4288,7 @@ static UNUSED_FUNC bool doCheckWithPrevQueryRange(SQuery *pQuery, TSKEY nextKey)
static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SResultInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[i]);
if (pResInfo != NULL) {
pResInfo->complete = false;
......@@ -4672,7 +4658,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pStatus->closed = true; // enable return all results for group by normal columns
SWindowResult *pResult = &pWindowResInfo->pResult[i];
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
pResult->numOfRows = MAX(pResult->numOfRows, pResult->resultInfo[j].numOfRes);
}
}
......@@ -5018,8 +5004,8 @@ static void tableIntervalProcess(SQInfo *pQInfo) {
taosInterpoSetStartInfo(&pRuntimeEnv->interpoInfo, pQuery->rec.rows, pQuery->interpoType);
SData **pInterpoBuf = pRuntimeEnv->pInterpoBuf;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
memcpy(pInterpoBuf[i]->data, pQuery->sdata[i]->data, pQuery->rec.rows * pQuery->pSelectExpr[i].resBytes);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
memcpy(pInterpoBuf[i]->data, pQuery->sdata[i]->data, pQuery->rec.rows * pQuery->pSelectExpr[i].bytes);
}
numOfInterpo = 0;
......@@ -5159,7 +5145,7 @@ static void stableQueryImpl(SQInfo *pQInfo) {
sem_post(&pQInfo->dataReady);
}
static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncExprMsg *pExprMsg) {
static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg) {
int32_t j = 0;
while (j < pQueryMsg->numOfCols) {
......@@ -5173,7 +5159,7 @@ static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncExprMsg
return j;
}
bool vnodeValidateExprColumnInfo(SQueryTableMsg *pQueryMsg, SSqlFuncExprMsg *pExprMsg) {
bool vnodeValidateExprColumnInfo(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg) {
int32_t j = getColumnIndexInSource(pQueryMsg, pExprMsg);
return j < pQueryMsg->numOfCols;
}
......@@ -5199,8 +5185,8 @@ static int32_t validateQueryMsg(SQueryTableMsg *pQueryMsg) {
return -1;
}
if (pQueryMsg->numOfOutputCols > TSDB_MAX_COLUMNS || pQueryMsg->numOfOutputCols <= 0) {
qError("qmsg:%p illegal value of output columns %d", pQueryMsg, pQueryMsg->numOfOutputCols);
if (pQueryMsg->numOfOutput > TSDB_MAX_COLUMNS || pQueryMsg->numOfOutput <= 0) {
qError("qmsg:%p illegal value of output columns %d", pQueryMsg, pQueryMsg->numOfOutput);
return -1;
}
......@@ -5244,7 +5230,7 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p
* @param pExpr
* @return
*/
static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncExprMsg ***pExpr,
static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncMsg ***pExpr,
char **tagCond, SColIndex **groupbyCols) {
pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables);
......@@ -5260,7 +5246,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pQueryMsg->queryType = htons(pQueryMsg->queryType);
pQueryMsg->numOfCols = htons(pQueryMsg->numOfCols);
pQueryMsg->numOfOutputCols = htons(pQueryMsg->numOfOutputCols);
pQueryMsg->numOfOutput = htons(pQueryMsg->numOfOutput);
pQueryMsg->numOfGroupCols = htons(pQueryMsg->numOfGroupCols);
pQueryMsg->tagCondLen = htons(pQueryMsg->tagCondLen);
pQueryMsg->tsOffset = htonl(pQueryMsg->tsOffset);
......@@ -5316,10 +5302,10 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
bool hasArithmeticFunction = false;
*pExpr = calloc(pQueryMsg->numOfOutputCols, POINTER_BYTES);
SSqlFuncExprMsg *pExprMsg = (SSqlFuncExprMsg *)pMsg;
*pExpr = calloc(pQueryMsg->numOfOutput, POINTER_BYTES);
SSqlFuncMsg *pExprMsg = (SSqlFuncMsg *)pMsg;
for (int32_t i = 0; i < pQueryMsg->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) {
(*pExpr)[i] = pExprMsg;
pExprMsg->colInfo.colIndex = htons(pExprMsg->colInfo.colIndex);
......@@ -5328,7 +5314,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pExprMsg->functionId = htons(pExprMsg->functionId);
pExprMsg->numOfParams = htons(pExprMsg->numOfParams);
pMsg += sizeof(SSqlFuncExprMsg);
pMsg += sizeof(SSqlFuncMsg);
for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) {
pExprMsg->arg[j].argType = htons(pExprMsg->arg[j].argType);
......@@ -5355,7 +5341,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
}
}
pExprMsg = (SSqlFuncExprMsg *)pMsg;
pExprMsg = (SSqlFuncMsg *)pMsg;
}
pQueryMsg->colNameLen = htonl(pQueryMsg->colNameLen);
......@@ -5393,11 +5379,11 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pQueryMsg->defaultVal = (uint64_t)(pMsg);
int64_t *v = (int64_t *)pMsg;
for (int32_t i = 0; i < pQueryMsg->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) {
v[i] = htobe64(v[i]);
}
pMsg += sizeof(int64_t) * pQueryMsg->numOfOutputCols;
pMsg += sizeof(int64_t) * pQueryMsg->numOfOutput;
}
// the tag query condition expression string is located at the end of query msg
......@@ -5411,14 +5397,14 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
"outputCols:%d, numOfCols:%d, interval:%d" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64
", offset:%" PRId64,
pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols,
pQueryMsg->order, pQueryMsg->numOfOutputCols, pQueryMsg->numOfCols, pQueryMsg->intervalTime,
pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->intervalTime,
pQueryMsg->interpoType, pQueryMsg->tsLen, pQueryMsg->limit, pQueryMsg->offset);
return 0;
}
static int32_t buildAirthmeticExprFromMsg(SSqlFunctionExpr *pExpr, SQueryTableMsg *pQueryMsg) {
// SSqlBinaryExprInfo *pBinaryExprInfo = &pExpr->binExprInfo;
static int32_t buildAirthmeticExprFromMsg(SArithExprInfo *pExpr, SQueryTableMsg *pQueryMsg) {
// SExprInfo *pBinaryExprInfo = &pExpr->binExprInfo;
// SColumnInfo * pColMsg = pQueryMsg->colList;
#if 0
tExprNode* pBinExpr = NULL;
......@@ -5468,12 +5454,12 @@ static int32_t buildAirthmeticExprFromMsg(SSqlFunctionExpr *pExpr, SQueryTableMs
return TSDB_CODE_SUCCESS;
}
static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr **pSqlFuncExpr,
SSqlFuncExprMsg **pExprMsg) {
static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SArithExprInfo **pSqlFuncExpr,
SSqlFuncMsg **pExprMsg) {
*pSqlFuncExpr = NULL;
int32_t code = TSDB_CODE_SUCCESS;
SSqlFunctionExpr *pExprs = (SSqlFunctionExpr *)calloc(1, sizeof(SSqlFunctionExpr) * pQueryMsg->numOfOutputCols);
SArithExprInfo *pExprs = (SArithExprInfo *)calloc(1, sizeof(SArithExprInfo) * pQueryMsg->numOfOutput);
if (pExprs == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
......@@ -5481,9 +5467,9 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunct
bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType);
int16_t tagLen = 0;
for (int32_t i = 0; i < pQueryMsg->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) {
pExprs[i].pBase = *pExprMsg[i];
pExprs[i].resBytes = 0;
pExprs[i].bytes = 0;
int16_t type = 0;
int16_t bytes = 0;
......@@ -5509,22 +5495,22 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunct
}
int32_t param = pExprs[i].pBase.arg[0].argValue.i64;
if (getResultDataInfo(type, bytes, pExprs[i].pBase.functionId, param, &pExprs[i].resType, &pExprs[i].resBytes,
if (getResultDataInfo(type, bytes, pExprs[i].pBase.functionId, param, &pExprs[i].type, &pExprs[i].bytes,
&pExprs[i].interResBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) {
tfree(pExprs);
return TSDB_CODE_INVALID_QUERY_MSG;
}
if (pExprs[i].pBase.functionId == TSDB_FUNC_TAG_DUMMY || pExprs[i].pBase.functionId == TSDB_FUNC_TS_DUMMY) {
tagLen += pExprs[i].resBytes;
tagLen += pExprs[i].bytes;
}
assert(isValidDataType(pExprs[i].resType, pExprs[i].resBytes));
assert(isValidDataType(pExprs[i].type, pExprs[i].bytes));
}
// get the correct result size for top/bottom query, according to the number of tags columns in selection clause
// TODO refactor
for (int32_t i = 0; i < pQueryMsg->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) {
pExprs[i].pBase = *pExprMsg[i];
int16_t functId = pExprs[i].pBase.functionId;
if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) {
......@@ -5537,7 +5523,7 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunct
int32_t ret =
getResultDataInfo(type, bytes, pExprs[i].pBase.functionId, pExprs[i].pBase.arg[0].argValue.i64,
&pExprs[i].resType, &pExprs[i].resBytes, &pExprs[i].interResBytes, tagLen, isSuperTable);
&pExprs[i].type, &pExprs[i].bytes, &pExprs[i].interResBytes, tagLen, isSuperTable);
assert(ret == TSDB_CODE_SUCCESS);
}
}
......@@ -5655,23 +5641,9 @@ static int32_t createFilterInfo(void *pQInfo, SQuery *pQuery) {
static void doUpdateExprColumnIndex(SQuery *pQuery) {
assert(pQuery->pSelectExpr != NULL && pQuery != NULL);
// int32_t i = 0, j = 0;
// while (i < pQuery->numOfCols && j < pMeterObj->numOfColumns) {
// if (pQuery->colList[i].data.colId == pMeterObj->schema[j].colId) {
// pQuery->colList[i++].colIndex = (int16_t)j++;
// } else if (pQuery->colList[i].data.colId < pMeterObj->schema[j].colId) {
// pQuery->colList[i++].colIndex = -1;
// } else if (pQuery->colList[i].data.colId > pMeterObj->schema[j].colId) {
// j++;
// }
// }
// while (i < pQuery->numOfCols) {
// pQuery->colList[i++].colIndex = -1; // not such column in current meter
// }
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
SSqlFuncExprMsg *pSqlExprMsg = &pQuery->pSelectExpr[k].pBase;
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
SSqlFuncMsg *pSqlExprMsg = &pQuery->pSelectExpr[k].pBase;
if (pSqlExprMsg->functionId == TSDB_FUNC_ARITHM || pSqlExprMsg->colInfo.flag == TSDB_COL_TAG) {
continue;
}
......@@ -5686,7 +5658,7 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
}
}
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs,
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SArithExprInfo *pExprs,
STableGroupInfo *groupInfo) {
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
if (pQInfo == NULL) {
......@@ -5697,10 +5669,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQInfo->runtimeEnv.pQuery = pQuery;
int16_t numOfCols = pQueryMsg->numOfCols;
int16_t numOfOutputCols = pQueryMsg->numOfOutputCols;
int16_t numOfOutput = pQueryMsg->numOfOutput;
pQuery->numOfCols = numOfCols;
pQuery->numOfOutputCols = numOfOutputCols;
pQuery->numOfOutput = numOfOutput;
pQuery->limit.limit = pQueryMsg->limit;
pQuery->limit.offset = pQueryMsg->offset;
pQuery->order.order = pQueryMsg->order;
......@@ -5725,9 +5697,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
}
// calculate the result row size
for (int16_t col = 0; col < numOfOutputCols; ++col) {
assert(pExprs[col].resBytes > 0);
pQuery->rowSize += pExprs[col].resBytes;
for (int16_t col = 0; col < numOfOutput; ++col) {
assert(pExprs[col].bytes > 0);
pQuery->rowSize += pExprs[col].bytes;
}
doUpdateExprColumnIndex(pQuery);
......@@ -5738,7 +5710,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
}
// prepare the result buffer
pQuery->sdata = (SData **)calloc(pQuery->numOfOutputCols, POINTER_BYTES);
pQuery->sdata = (SData **)calloc(pQuery->numOfOutput, POINTER_BYTES);
if (pQuery->sdata == NULL) {
goto _cleanup;
}
......@@ -5747,11 +5719,11 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQuery->rec.capacity = 4096;
pQuery->rec.threshold = 4000;
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) {
assert(pExprs[col].interResBytes >= pExprs[col].resBytes);
for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
assert(pExprs[col].interResBytes >= pExprs[col].bytes);
// allocate additional memory for interResults that are usually larger then final results
size_t size = (pQuery->rec.capacity + 1) * pExprs[col].resBytes + pExprs[col].interResBytes + sizeof(SData);
size_t size = (pQuery->rec.capacity + 1) * pExprs[col].bytes + pExprs[col].interResBytes + sizeof(SData);
pQuery->sdata[col] = (SData *)calloc(1, size);
if (pQuery->sdata[col] == NULL) {
goto _cleanup;
......@@ -5759,13 +5731,13 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
}
if (pQuery->interpoType != TSDB_INTERPO_NONE) {
pQuery->defaultVal = malloc(sizeof(int64_t) * pQuery->numOfOutputCols);
pQuery->defaultVal = malloc(sizeof(int64_t) * pQuery->numOfOutput);
if (pQuery->defaultVal == NULL) {
goto _cleanup;
}
// the first column is the timestamp
memcpy(pQuery->defaultVal, (char *)pQueryMsg->defaultVal, pQuery->numOfOutputCols * sizeof(int64_t));
memcpy(pQuery->defaultVal, (char *)pQueryMsg->defaultVal, pQuery->numOfOutput * sizeof(int64_t));
}
// to make sure third party won't overwrite this structure
......@@ -5792,7 +5764,7 @@ _cleanup:
tfree(pQuery->defaultVal);
if (pQuery->sdata != NULL) {
for (int16_t col = 0; col < pQuery->numOfOutputCols; ++col) {
for (int16_t col = 0; col < pQuery->numOfOutput; ++col) {
tfree(pQuery->sdata[col]);
}
}
......@@ -5872,7 +5844,7 @@ static void freeQInfo(SQInfo *pQInfo) {
setQueryKilled(pQInfo);
qTrace("QInfo:%p start to free QInfo", pQInfo);
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) {
for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
tfree(pQuery->sdata[col]);
}
......@@ -5891,8 +5863,8 @@ static void freeQInfo(SQInfo *pQInfo) {
tfree(pQuery->sdata);
if (pQuery->pSelectExpr != NULL) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SSqlBinaryExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].binExprInfo;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].binExprInfo;
if (pBinExprInfo->numOfCols > 0) {
tfree(pBinExprInfo->pReqColumns);
......@@ -5988,7 +5960,7 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
char * tagCond = NULL;
SArray * pTableIdList = NULL;
SSqlFuncExprMsg **pExprMsg = NULL;
SSqlFuncMsg **pExprMsg = NULL;
SColIndex * pGroupColIndex = NULL;
if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &pGroupColIndex)) != TSDB_CODE_SUCCESS) {
......@@ -6007,7 +5979,7 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
goto _query_over;
}
SSqlFunctionExpr *pExprs = NULL;
SArithExprInfo *pExprs = NULL;
if ((code = createSqlFunctionExprFromMsg(pQueryMsg, &pExprs, pExprMsg)) != TSDB_CODE_SUCCESS) {
goto _query_over;
}
......
......@@ -217,11 +217,11 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
return;
}
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutput; ++i) {
SResultInfo *pResultInfo = &pWindowRes->resultInfo[i];
char * s = getPosInResultPage(pRuntimeEnv, i, pWindowRes);
size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].resBytes;
size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].bytes;
memset(s, 0, size);
resetResultInfo(pResultInfo);
......@@ -245,7 +245,7 @@ void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, con
dst->window = src->window;
dst->status = src->status;
int32_t nOutputCols = pRuntimeEnv->pQuery->numOfOutputCols;
int32_t nOutputCols = pRuntimeEnv->pQuery->numOfOutput;
for (int32_t i = 0; i < nOutputCols; ++i) {
SResultInfo *pDst = &dst->resultInfo[i];
......@@ -261,7 +261,7 @@ void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, con
// copy the output buffer data from src to dst, the position info keep unchanged
char * dstBuf = getPosInResultPage(pRuntimeEnv, i, dst);
char * srcBuf = getPosInResultPage(pRuntimeEnv, i, (SWindowResult *)src);
size_t s = pRuntimeEnv->pQuery->pSelectExpr[i].resBytes;
size_t s = pRuntimeEnv->pQuery->pSelectExpr[i].bytes;
memcpy(dstBuf, srcBuf, s);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册