From f17bf6d7954fd1f0e99b8f549d1101f1c652d511 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 23 Apr 2021 16:51:27 +0800 Subject: [PATCH] [td-3941]: refactor having operator. --- src/client/inc/tsclient.h | 7 +- src/client/src/tscLocalMerge.c | 25 +- src/client/src/tscSQLParser.c | 215 +++++++---------- src/client/src/tscServer.c | 65 +++--- src/client/src/tscSubquery.c | 8 +- src/client/src/tscUtil.c | 43 ++-- src/common/inc/tname.h | 3 +- src/common/src/tname.c | 1 + src/inc/taosmsg.h | 13 +- src/query/inc/qExecutor.h | 12 +- src/query/src/qExecutor.c | 410 +++++++++++++-------------------- src/query/src/qPlan.c | 15 ++ src/query/src/qSqlParser.c | 1 - src/query/src/queryMain.c | 2 +- 14 files changed, 341 insertions(+), 479 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index cbdfb4af14..7fa2242d8e 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -113,13 +113,13 @@ typedef struct SExprFilter { SExprInfo *pExprInfo; SArray *fp; SColumn *pFilters; //having filter info -}SExprFilter; +} SExprFilter; typedef struct SInternalField { TAOS_FIELD field; bool visible; SExprInfo *pExpr; - SExprFilter *pFieldFilters; +// SExprFilter *pFieldFilters; } SInternalField; typedef struct SFieldInfo { @@ -128,7 +128,6 @@ typedef struct SFieldInfo { SArray *internalField; // SArray } SFieldInfo; - typedef struct SCond { uint64_t uid; int32_t len; // length of tag query condition data @@ -246,7 +245,7 @@ typedef struct SQueryInfo { struct SQueryInfo *sibling; // sibling SArray *pUpstream; // SArray struct SQueryInfo *pDownstream; - int32_t havingFieldNum; + int32_t havingFieldNum; } SQueryInfo; typedef struct { diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index f6db1459f8..f701ca25f3 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -102,32 +102,8 @@ void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchem } pCtx[i].interBufBytes = pExpr->base.interBytes; -// pCtx[i].resultInfo = calloc(1, pCtx[i].interBufBytes + sizeof(SResultRowCellInfo)); pCtx[i].stableQuery = true; } - -// int16_t n = 0; -// int16_t tagLen = 0; -// SQLFunctionCtx **pTagCtx = calloc(pQueryInfo->fieldsInfo.numOfOutput, POINTER_BYTES); -// -// SQLFunctionCtx *pCtx1 = NULL; -// for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { -// SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i); -// if (pExpr->base.functionId == TSDB_FUNC_TAG_DUMMY || pExpr->base.functionId == TSDB_FUNC_TS_DUMMY) { -// tagLen += pExpr->base.resBytes; -// pTagCtx[n++] = &pCtx[i]; -// } else if ((aAggs[pExpr->base.functionId].status & TSDB_FUNCSTATE_SELECTIVITY) != 0) { -// pCtx1 = &pCtx[i]; -// } -// } -// -// if (n == 0 || pCtx == NULL) { -// free(pTagCtx); -// } else { -// pCtx1->tagInfo.pTagCtxList = pTagCtx; -// pCtx1->tagInfo.numOfTagCols = n; -// pCtx1->tagInfo.tagsLen = tagLen; -// } } void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc, @@ -892,6 +868,7 @@ bool needToMergeRv(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, if (size > 0) { ret = compare_aRv(pBlock, columnIndexList, size, index, buf, TSDB_ORDER_ASC); } + // if ret == 0, means the result belongs to the same group return (ret == 0); } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 66b6585e74..3b75b8f636 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1988,7 +1988,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col if ((pToken->z == NULL || pToken->n == 0) && (TK_INTEGER != sqlOptr)) /*select count(1) from table*/ { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); - } + } + if (sqlOptr == TK_ALL) { // select table.* // check if the table name is valid or not @@ -3067,23 +3068,19 @@ int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd } -static SColumnFilterInfo* addColumnFilterInfo(SColumn* pColumn) { - if (pColumn == NULL) { - return NULL; - } +static SColumnFilterInfo* addColumnFilterInfo(SColumnFilterList* filterList) { + int32_t size = (filterList->numOfFilters) + 1; - int32_t size = pColumn->info.numOfFilters + 1; - - char* tmp = (char*) realloc((void*)(pColumn->info.filterInfo), sizeof(SColumnFilterInfo) * (size)); + char* tmp = (char*) realloc((void*)(filterList->filterInfo), sizeof(SColumnFilterInfo) * (size)); if (tmp != NULL) { - pColumn->info.filterInfo = (SColumnFilterInfo*)tmp; + filterList->filterInfo = (SColumnFilterInfo*)tmp; } else { return NULL; } - pColumn->info.numOfFilters++; + filterList->numOfFilters = size; - SColumnFilterInfo* pColFilterInfo = &pColumn->info.filterInfo[pColumn->info.numOfFilters - 1]; + SColumnFilterInfo* pColFilterInfo = &(filterList->filterInfo[size - 1]); memset(pColFilterInfo, 0, sizeof(SColumnFilterInfo)); return pColFilterInfo; @@ -3254,10 +3251,10 @@ static int32_t extractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SC */ if (sqlOptr == TK_AND) { // this is a new filter condition on this column - if (pColumn->info.numOfFilters == 0) { - pColFilter = addColumnFilterInfo(pColumn); + if (pColumn->info.flist.numOfFilters == 0) { + pColFilter = addColumnFilterInfo(&pColumn->info.flist); } else { // update the existed column filter information, find the filter info here - pColFilter = &pColumn->info.filterInfo[0]; + pColFilter = &pColumn->info.flist.filterInfo[0]; } if (pColFilter == NULL) { @@ -3265,7 +3262,7 @@ static int32_t extractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SC } } else if (sqlOptr == TK_OR) { // TODO fixme: failed to invalid the filter expression: "col1 = 1 OR col2 = 2" - pColFilter = addColumnFilterInfo(pColumn); + pColFilter = addColumnFilterInfo(&pColumn->info.flist); if (pColFilter == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -4099,8 +4096,8 @@ static bool validateFilterExpr(SQueryInfo* pQueryInfo) { for (int32_t i = 0; i < num; ++i) { SColumn* pCol = taosArrayGetP(pColList, i); - for (int32_t j = 0; j < pCol->info.numOfFilters; ++j) { - SColumnFilterInfo* pColFilter = &pCol->info.filterInfo[j]; + for (int32_t j = 0; j < pCol->info.flist.numOfFilters; ++j) { + SColumnFilterInfo* pColFilter = &pCol->info.flist.filterInfo[j]; int32_t lowerOptr = pColFilter->lowerRelOptr; int32_t upperOptr = pColFilter->upperRelOptr; @@ -6794,142 +6791,98 @@ int32_t checkQueryRangeForFill(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { return TSDB_CODE_SUCCESS; } -int32_t tscInsertExprFields(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, SInternalField** interField) { - tSqlExprItem item = {.pNode = pExpr, .aliasName = NULL, .distinct = false}; - - int32_t outputIndex = (int32_t)tscSqlExprNumOfExprs(pQueryInfo); - - // ADD TRUE FOR TEST - if (addExprAndResultField(pCmd, pQueryInfo, outputIndex, &item, true) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_SQL; - } - - ++pQueryInfo->havingFieldNum; - - size_t n = tscSqlExprNumOfExprs(pQueryInfo); - SExprInfo* pExprInfo = tscSqlExprGet(pQueryInfo, (int32_t)n - 1); - - int32_t slot = tscNumOfFields(pQueryInfo) - 1; - SInternalField* pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, slot); - pInfo->visible = false; - - if (pInfo->pFieldFilters == NULL) { - SExprFilter* pFieldFilters = calloc(1, sizeof(SExprFilter)); - if (pFieldFilters == NULL) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - SColumn* pFilters = calloc(1, sizeof(SColumn)); - if (pFilters == NULL) { - tfree(pFieldFilters); +// TODO normalize the function expression and compare it +int32_t tscGetExprFilters(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pSelectNodeList, tSqlExpr* pSqlExpr, + SExprInfo** pExpr) { + *pExpr = NULL; - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } + size_t num = taosArrayGetSize(pSelectNodeList); + for(int32_t i = 0; i < num; ++i) { + tSqlExprItem* pItem = taosArrayGet(pSelectNodeList, i); + if (tSqlExprCompare(pItem->pNode, pSqlExpr) == 0) { // exists, not added it, - pFieldFilters->pFilters = pFilters; - pFieldFilters->pExprInfo = pExprInfo; - pExprInfo->base.pFilter = pFilters->info.filterInfo; - pInfo->pFieldFilters = pFieldFilters; - } + int32_t functionId = isValidFunction(pSqlExpr->operand.z, pSqlExpr->operand.n); + tSqlExprItem* pParamElem = taosArrayGet(pSqlExpr->pParam, 0); + SStrToken* pToken = &pParamElem->pNode->colInfo; - pInfo->pFieldFilters->pExpr = pExpr; + SColumnIndex index = COLUMN_INDEX_INITIALIZER; + getColumnIndexByName(pCmd, pToken, pQueryInfo, &index); - *interField = pInfo; + size_t numOfNodeInSel = tscSqlExprNumOfExprs(pQueryInfo); + for(int32_t k = 0; k < numOfNodeInSel; ++k) { + SExprInfo* pExpr1 = tscSqlExprGet(pQueryInfo, k); - return TSDB_CODE_SUCCESS; -} + if (pExpr1->base.functionId != functionId) { + continue; + } -int32_t tscGetExprFilters(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, SInternalField** pField) { - SInternalField* pInfo = NULL; + if (pExpr1->base.colInfo.colIndex != index.columnIndex) { + continue; + } - for (int32_t i = pQueryInfo->havingFieldNum - 1; i >= 0; --i) { - pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, pQueryInfo->fieldsInfo.numOfOutput - 1 - i); + ++pQueryInfo->havingFieldNum; + *pExpr = pExpr1; + break; + } - if (pInfo->pFieldFilters && 0 == tSqlExprCompare(pInfo->pFieldFilters->pExpr, pExpr)) { - *pField = pInfo; + assert(*pExpr != NULL); return TSDB_CODE_SUCCESS; } } - int32_t ret = tscInsertExprFields(pCmd, pQueryInfo, pExpr, &pInfo); - if (ret) { - return ret; - } - - *pField = pInfo; + tSqlExprItem item = {.pNode = pSqlExpr, .aliasName = NULL, .distinct = false}; - return TSDB_CODE_SUCCESS; -} + int32_t outputIndex = (int32_t)tscSqlExprNumOfExprs(pQueryInfo); -static int32_t genExprFilter(SExprFilter* exprFilter) { - exprFilter->fp = taosArrayInit(4, sizeof(__filter_func_t)); - if (exprFilter->fp == NULL) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; + // ADD TRUE FOR TEST + if (addExprAndResultField(pCmd, pQueryInfo, outputIndex, &item, true) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_TSC_INVALID_SQL; } - for (int32_t i = 0; i < exprFilter->pFilters->info.numOfFilters; ++i) { - SColumnFilterInfo *filterInfo = &exprFilter->pFilters->info.filterInfo[i]; - - int32_t lower = filterInfo->lowerRelOptr; - int32_t upper = filterInfo->upperRelOptr; - if (lower == TSDB_RELATION_INVALID && upper == TSDB_RELATION_INVALID) { - tscError("invalid rel optr"); - return TSDB_CODE_TSC_APP_ERROR; - } - - __filter_func_t ffp = getFilterOperator(lower, upper); - if (ffp == NULL) { - tscError("invalid filter info"); - return TSDB_CODE_TSC_APP_ERROR; - } - - taosArrayPush(exprFilter->fp, &ffp); - } + ++pQueryInfo->havingFieldNum; + size_t n = tscSqlExprNumOfExprs(pQueryInfo); + *pExpr = tscSqlExprGet(pQueryInfo, (int32_t)n - 1); return TSDB_CODE_SUCCESS; } -static int32_t handleExprInHavingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, int32_t sqlOptr) { +static int32_t handleExprInHavingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pSelectNodeList, tSqlExpr* pExpr, int32_t sqlOptr) { const char* msg1 = "non binary column not support like operator"; const char* msg2 = "invalid operator for binary column in having clause"; const char* msg3 = "invalid operator for bool column in having clause"; - SColumn* pColumn = NULL; SColumnFilterInfo* pColFilter = NULL; - SInternalField* pInfo = NULL; /* * in case of TK_AND filter condition, we first find the corresponding column and build the query condition together * the already existed condition. */ + SExprInfo *expr = NULL; if (sqlOptr == TK_AND) { - int32_t ret = tscGetExprFilters(pCmd, pQueryInfo, pExpr->pLeft, &pInfo); + int32_t ret = tscGetExprFilters(pCmd, pQueryInfo, pSelectNodeList, pExpr->pLeft, &expr); if (ret) { return ret; } - pColumn = pInfo->pFieldFilters->pFilters; - // this is a new filter condition on this column - if (pColumn->info.numOfFilters == 0) { - pColFilter = addColumnFilterInfo(pColumn); + if (expr->base.flist.numOfFilters == 0) { + pColFilter = addColumnFilterInfo(&expr->base.flist); } else { // update the existed column filter information, find the filter info here - pColFilter = &pColumn->info.filterInfo[0]; + pColFilter = &expr->base.flist.filterInfo[0]; } if (pColFilter == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } } else if (sqlOptr == TK_OR) { - int32_t ret = tscGetExprFilters(pCmd, pQueryInfo, pExpr->pLeft, &pInfo); + int32_t ret = tscGetExprFilters(pCmd, pQueryInfo, pSelectNodeList, pExpr->pLeft, &expr); if (ret) { return ret; } - pColumn = pInfo->pFieldFilters->pFilters; - // TODO fixme: failed to invalid the filter expression: "col1 = 1 OR col2 = 2" - pColFilter = addColumnFilterInfo(pColumn); + // TODO refactor + pColFilter = addColumnFilterInfo(&expr->base.flist); if (pColFilter == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -6938,7 +6891,7 @@ static int32_t handleExprInHavingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t } pColFilter->filterstr = - ((pInfo->field.type == TSDB_DATA_TYPE_BINARY || pInfo->field.type == TSDB_DATA_TYPE_NCHAR) ? 1 : 0); + ((expr->base.resType == TSDB_DATA_TYPE_BINARY || expr->base.resType == TSDB_DATA_TYPE_NCHAR) ? 1 : 0); if (pColFilter->filterstr) { if (pExpr->tokenId != TK_EQ @@ -6954,22 +6907,23 @@ static int32_t handleExprInHavingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - if (pInfo->field.type == TSDB_DATA_TYPE_BOOL) { + if (expr->base.resType == TSDB_DATA_TYPE_BOOL) { if (pExpr->tokenId != TK_EQ && pExpr->tokenId != TK_NE) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); } } } - int32_t ret = doExtractColumnFilterInfo(pCmd, pQueryInfo, pColFilter, pInfo->field.type, pExpr); + int32_t ret = doExtractColumnFilterInfo(pCmd, pQueryInfo, pColFilter, expr->base.resType, pExpr); if (ret) { return ret; } - return genExprFilter(pInfo->pFieldFilters); + return TSDB_CODE_SUCCESS; +// return genExprFilter(pInfo->pFieldFilters); } -int32_t getHavingExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, int32_t parentOptr) { +int32_t getHavingExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pSelectNodeList, tSqlExpr* pExpr, int32_t parentOptr) { if (pExpr == NULL) { return TSDB_CODE_SUCCESS; } @@ -6980,12 +6934,12 @@ int32_t getHavingExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, in tSqlExpr* pRight = pExpr->pRight; if (pExpr->tokenId == TK_AND || pExpr->tokenId == TK_OR) { - int32_t ret = getHavingExpr(pCmd, pQueryInfo, pExpr->pLeft, pExpr->tokenId); + int32_t ret = getHavingExpr(pCmd, pQueryInfo, pSelectNodeList, pExpr->pLeft, pExpr->tokenId); if (ret != TSDB_CODE_SUCCESS) { return ret; } - return getHavingExpr(pCmd, pQueryInfo, pExpr->pRight, pExpr->tokenId); + return getHavingExpr(pCmd, pQueryInfo, pSelectNodeList, pExpr->pRight, pExpr->tokenId); } if (pLeft == NULL || pRight == NULL) { @@ -6998,7 +6952,7 @@ int32_t getHavingExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, in exchangeExpr(pExpr); - pLeft = pExpr->pLeft; + pLeft = pExpr->pLeft; pRight = pExpr->pRight; @@ -7014,29 +6968,27 @@ int32_t getHavingExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, in return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - //if (pLeft->pParam == NULL || pLeft->pParam->nExpr < 1) { - // return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); - //} - if (pLeft->pParam) { size_t size = taosArrayGetSize(pLeft->pParam); for (int32_t i = 0; i < size; i++) { - tSqlExprItem* pParamElem = taosArrayGet(pLeft->pParam, i); - if (pParamElem->pNode->tokenId != TK_ALL && - pParamElem->pNode->tokenId != TK_ID && - pParamElem->pNode->tokenId != TK_STRING && - pParamElem->pNode->tokenId != TK_INTEGER && - pParamElem->pNode->tokenId != TK_FLOAT) { + tSqlExprItem* pParamItem = taosArrayGet(pLeft->pParam, i); + + tSqlExpr* pExpr1 = pParamItem->pNode; + if (pExpr1->tokenId != TK_ALL && + pExpr1->tokenId != TK_ID && + pExpr1->tokenId != TK_STRING && + pExpr1->tokenId != TK_INTEGER && + pExpr1->tokenId != TK_FLOAT) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - if (pParamElem->pNode->tokenId == TK_ID && (pParamElem->pNode->colInfo.z == NULL && pParamElem->pNode->colInfo.n == 0)) { + if (pExpr1->tokenId == TK_ID && (pExpr1->colInfo.z == NULL && pExpr1->colInfo.n == 0)) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - if (pParamElem->pNode->tokenId == TK_ID) { + if (pExpr1->tokenId == TK_ID) { SColumnIndex index = COLUMN_INDEX_INITIALIZER; - if ((getColumnIndexByName(pCmd, &pParamElem->pNode->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS)) { + if ((getColumnIndexByName(pCmd, &pExpr1->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS)) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } @@ -7056,10 +7008,11 @@ int32_t getHavingExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, in return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - return handleExprInHavingClause(pCmd, pQueryInfo, pExpr, parentOptr); + return handleExprInHavingClause(pCmd, pQueryInfo, pSelectNodeList, pExpr, parentOptr); } -int32_t validateHavingClause(SQueryInfo* pQueryInfo, tSqlExpr* pExpr, SSqlCmd* pCmd, bool isSTable, int32_t joinQuery, int32_t timeWindowQuery) { +int32_t validateHavingClause(SQueryInfo* pQueryInfo, tSqlExpr* pExpr, SSqlCmd* pCmd, SArray* pSelectNodeList, + int32_t joinQuery, int32_t timeWindowQuery) { const char* msg1 = "having only works with group by"; const char* msg2 = "functions or others can not be mixed up"; const char* msg3 = "invalid expression in having clause"; @@ -7082,7 +7035,7 @@ int32_t validateHavingClause(SQueryInfo* pQueryInfo, tSqlExpr* pExpr, SSqlCmd* p int32_t ret = 0; - if ((ret = getHavingExpr(pCmd, pQueryInfo, pExpr, TK_AND)) != TSDB_CODE_SUCCESS) { + if ((ret = getHavingExpr(pCmd, pQueryInfo, pSelectNodeList, pExpr, TK_AND)) != TSDB_CODE_SUCCESS) { return ret; } @@ -7328,7 +7281,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) { } // parse the having clause in the first place - if (validateHavingClause(pQueryInfo, pSqlNode->pHaving, pCmd, isSTable, joinQuery, timeWindowQuery) != + if (validateHavingClause(pQueryInfo, pSqlNode->pHaving, pCmd, pSqlNode->pSelNodeList, joinQuery, timeWindowQuery) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } @@ -7515,7 +7468,7 @@ bool hasNormalColumnFilter(SQueryInfo* pQueryInfo) { size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); for (int32_t i = 0; i < numOfCols; ++i) { SColumn* pCol = taosArrayGetP(pQueryInfo->colList, i); - if (pCol->info.numOfFilters > 0) { + if (pCol->info.flist.numOfFilters > 0) { return true; } } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index b124f8670f..56b5cc1741 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -728,6 +728,38 @@ static char *doSerializeTableInfo(SQueryTableMsg *pQueryMsg, SSqlObj *pSql, STab return pMsg; } +// TODO refactor +static int32_t serializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t numOfFilters, char** pMsg) { + // append the filter information after the basic column information + for (int32_t f = 0; f < numOfFilters; ++f) { + SColumnFilterInfo *pColFilter = &pColFilters[f]; + + SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)(*pMsg); + pFilterMsg->filterstr = htons(pColFilter->filterstr); + + (*pMsg) += sizeof(SColumnFilterInfo); + + if (pColFilter->filterstr) { + pFilterMsg->len = htobe64(pColFilter->len); + memcpy(*pMsg, (void *)pColFilter->pz, (size_t)(pColFilter->len + 1)); + (*pMsg) += (pColFilter->len + 1); // append the additional filter binary info + } else { + pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi); + pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi); + } + + pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr); + pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr); + + if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) { + tscError("invalid filter info"); + return TSDB_CODE_TSC_INVALID_SQL; + } + } + + return TSDB_CODE_SUCCESS; +} + static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo, char** pMsg, void* addr) { STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; @@ -760,6 +792,7 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo, pSqlExpr->functionId = htons(pExpr->functionId); pSqlExpr->numOfParams = htons(pExpr->numOfParams); pSqlExpr->resColId = htons(pExpr->resColId); + pSqlExpr->flist.numOfFilters = htons(pExpr->flist.numOfFilters); (*pMsg) += sizeof(SSqlExpr); for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log @@ -774,6 +807,8 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo, } } + serializeColFilterInfo(pExpr->flist.filterInfo, pExpr->flist.numOfFilters, pMsg); + return TSDB_CODE_SUCCESS; } @@ -864,34 +899,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->tableCols[i].colId = htons(pCol->colId); pQueryMsg->tableCols[i].bytes = htons(pCol->bytes); pQueryMsg->tableCols[i].type = htons(pCol->type); - pQueryMsg->tableCols[i].numOfFilters = htons(pCol->numOfFilters); + pQueryMsg->tableCols[i].flist.numOfFilters = htons(pCol->flist.numOfFilters); // append the filter information after the basic column information - for (int32_t f = 0; f < pCol->numOfFilters; ++f) { - SColumnFilterInfo *pColFilter = &pCol->filterInfo[f]; - - SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg; - pFilterMsg->filterstr = htons(pColFilter->filterstr); - - pMsg += sizeof(SColumnFilterInfo); - - if (pColFilter->filterstr) { - pFilterMsg->len = htobe64(pColFilter->len); - memcpy(pMsg, (void *)pColFilter->pz, (size_t)(pColFilter->len + 1)); - pMsg += (pColFilter->len + 1); // append the additional filter binary info - } else { - pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi); - pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi); - } - - pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr); - pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr); - - if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) { - tscError("invalid filter info"); - return TSDB_CODE_TSC_INVALID_SQL; - } - } + serializeColFilterInfo(pCol->flist.filterInfo, pCol->flist.numOfFilters, &pMsg); } for (int32_t i = 0; i < query.numOfOutput; ++i) { @@ -953,7 +964,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pTagCol->colId = htons(pTag->colId); pTagCol->bytes = htons(pTag->bytes); pTagCol->type = htons(pTag->type); - pTagCol->numOfFilters = 0; + pTagCol->flist.numOfFilters = 0; pMsg += sizeof(SColumnInfo); } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 01ed86e4be..ebfaa5b961 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -842,7 +842,7 @@ static void issueTsCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* for (int32_t i = 0; i < s; ++i) { SColumn *pCol = taosArrayGetP(pSupporter->colList, i); - if (pCol->info.numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered. + if (pCol->info.flist.numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered. SColumn *p = tscColumnClone(pCol); taosArrayPush(pQueryInfo->colList, &p); } @@ -1939,7 +1939,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter for (int32_t i = 0; i < s; ++i) { SColumn *pCol = taosArrayGetP(pSupporter->colList, i); - if (pCol->info.numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered. + if (pCol->info.flist.numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered. SColumn *p = tscColumnClone(pCol); taosArrayPush(pNewQueryInfo->colList, &p); } @@ -3490,8 +3490,8 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST // to make sure third party won't overwrite this structure pQInfo->signature = pQInfo; - SQueryAttr *pQueryAttr = &pQInfo->query; - SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; + SQueryAttr *pQueryAttr = &pQInfo->query; pRuntimeEnv->pQueryAttr = pQueryAttr; tscCreateQueryFromQueryInfo(pQueryInfo, pQueryAttr, NULL); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 7704ef06a7..387352f8cf 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1473,14 +1473,15 @@ int32_t tscGetResRowLength(SArray* pExprList) { return size; } -static void destroyFilterInfo(SColumnFilterInfo* pFilterInfo, int32_t numOfFilters) { - for(int32_t i = 0; i < numOfFilters; ++i) { - if (pFilterInfo[i].filterstr) { - tfree(pFilterInfo[i].pz); +static void destroyFilterInfo(SColumnFilterList* pFilterList) { + for(int32_t i = 0; i < pFilterList->numOfFilters; ++i) { + if (pFilterList->filterInfo[i].filterstr) { + tfree(pFilterList->filterInfo[i].pz); } } - tfree(pFilterInfo); + tfree(pFilterList->filterInfo); + pFilterList->numOfFilters = 0; } void tscFieldInfoClear(SFieldInfo* pFieldInfo) { @@ -1714,6 +1715,9 @@ void tscSqlExprAssign(SExprInfo* dst, const SExprInfo* src) { assert(dst != NULL && src != NULL); *dst = *src; + dst->base.flist.filterInfo = calloc(src->base.flist.numOfFilters, sizeof(SColumnFilterInfo)); + memcpy(dst->base.flist.filterInfo, src->base.flist.filterInfo, sizeof(SColumnFilterInfo) * src->base.flist.numOfFilters); + dst->pExpr = exprdup(src->pExpr); memset(dst->base.param, 0, sizeof(tVariant) * tListLen(dst->base.param)); @@ -1789,8 +1793,8 @@ SColumn* tscColumnClone(const SColumn* src) { dst->columnIndex = src->columnIndex; dst->tableUid = src->tableUid; - dst->info.numOfFilters = src->info.numOfFilters; - dst->info.filterInfo = tFilterInfoDup(src->info.filterInfo, src->info.numOfFilters); + dst->info.flist.numOfFilters = src->info.flist.numOfFilters; + dst->info.flist.filterInfo = tFilterInfoDup(src->info.flist.filterInfo, src->info.flist.numOfFilters); dst->info.type = src->info.type; dst->info.colId = src->info.colId; dst->info.bytes = src->info.bytes; @@ -1798,7 +1802,7 @@ SColumn* tscColumnClone(const SColumn* src) { } static void tscColumnDestroy(SColumn* pCol) { - destroyFilterInfo(pCol->info.filterInfo, pCol->info.numOfFilters); + destroyFilterInfo(&pCol->info.flist); free(pCol); } @@ -3427,28 +3431,12 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu SExprInfo* pExpr = &pQueryAttr->pExpr1[i]; SSqlExpr* pse = &pQueryAttr->pExpr3[i].base; - memcpy(pse->aliasName, pExpr->base.aliasName, tListLen(pse->aliasName)); - - pse->uid = pExpr->base.uid; - pse->functionId = pExpr->base.functionId; - pse->resType = pExpr->base.resType; - pse->resBytes = pExpr->base.resBytes; - pse->interBytes = pExpr->base.interBytes; - pse->resColId = pExpr->base.resColId; - pse->offset = pExpr->base.offset; - pse->numOfParams = pExpr->base.numOfParams; - - pse->colInfo = pExpr->base.colInfo; + tscSqlExprAssign(&pQueryAttr->pExpr3[i], pExpr); pse->colInfo.colId = pExpr->base.resColId; pse->colInfo.colIndex = i; pse->colType = pExpr->base.resType; pse->colBytes = pExpr->base.resBytes; - pse->colInfo.flag = pExpr->base.colInfo.flag; - - for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) { - tVariantAssign(&pse->param[j], &pExpr->base.param[j]); - } } { @@ -3515,7 +3503,7 @@ static int32_t createTagColumnInfo(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf pTagCol->colId = pColSchema->colId; pTagCol->bytes = pColSchema->bytes; pTagCol->type = pColSchema->type; - pTagCol->numOfFilters = 0; + pTagCol->flist.numOfFilters = 0; } return TSDB_CODE_SUCCESS; @@ -3546,6 +3534,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt pQueryAttr->order = pQueryInfo->order; pQueryAttr->fillType = pQueryInfo->fillType; pQueryAttr->groupbyColumn = tscGroupbyColumn(pQueryInfo); + pQueryAttr->havingNum = pQueryInfo->havingFieldNum; if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor pQueryAttr->window = pQueryInfo->window; @@ -3587,7 +3576,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt } pQueryAttr->tableCols[i] = pCol->info; - pQueryAttr->tableCols[i].filterInfo = tFilterInfoDup(pCol->info.filterInfo, pQueryAttr->tableCols[i].numOfFilters); + pQueryAttr->tableCols[i].flist.filterInfo = tFilterInfoDup(pCol->info.flist.filterInfo, pQueryAttr->tableCols[i].flist.numOfFilters); } // global aggregate query diff --git a/src/common/inc/tname.h b/src/common/inc/tname.h index 7ba681a36a..f37a4d9a36 100644 --- a/src/common/inc/tname.h +++ b/src/common/inc/tname.h @@ -62,8 +62,7 @@ typedef struct SSqlExpr { int32_t offset; // sub result column value of arithmetic expression. int16_t resColId; // result column id - int32_t filterNum; - SColumnFilterInfo *pFilter; + SColumnFilterList flist; } SSqlExpr; typedef struct SExprInfo { diff --git a/src/common/src/tname.c b/src/common/src/tname.c index 787aa1e95b..f1ddc60637 100644 --- a/src/common/src/tname.c +++ b/src/common/src/tname.c @@ -68,6 +68,7 @@ bool tscValidateTableNameLength(size_t len) { return len < TSDB_TABLE_NAME_LEN; } +// TODO refactor SColumnFilterInfo* tFilterInfoDup(const SColumnFilterInfo* src, int32_t numOfFilters) { if (numOfFilters == 0) { assert(src == NULL); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 0a4de16c28..3b7022fb88 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -420,6 +420,13 @@ typedef struct SColumnFilterInfo { }; } SColumnFilterInfo; +typedef struct SColumnFilterList { + int16_t numOfFilters; + union{ + int64_t placeholder; + SColumnFilterInfo *filterInfo; + }; +} SColumnFilterList; /* * for client side struct, we only need the column id, type, bytes are not necessary * But for data in vnode side, we need all the following information. @@ -428,11 +435,7 @@ typedef struct SColumnInfo { int16_t colId; int16_t type; int16_t bytes; - int16_t numOfFilters; - union{ - int64_t placeholder; - SColumnFilterInfo *filterInfo; - }; + SColumnFilterList flist; } SColumnInfo; typedef struct STableIdInfo { diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index fc924c661f..d25e1a991e 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -300,7 +300,7 @@ enum OPERATOR_TYPE_E { OP_DummyInput = 16, //TODO remove it after fully refactor. OP_MultiwaySort = 17, // multi-way data merge into one input stream. OP_GlobalAggregate = 18, // global merge for the multi-way data sources. - OP_Having = 19, + OP_Condition = 19, }; typedef struct SOperatorInfo { @@ -440,10 +440,10 @@ typedef struct SSLimitOperatorInfo { SArray *orderColumnList; } SSLimitOperatorInfo; -typedef struct SHavingOperatorInfo { - SArray* fp; -} SHavingOperatorInfo; - +typedef struct SConditionOperatorInfo { + SSingleColumnFilterInfo *pFilterInfo; + int32_t numOfFilterCols; +} SConditionOperatorInfo; typedef struct SFillOperatorInfo { SFillInfo *pFillInfo; @@ -507,7 +507,7 @@ SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SEx int32_t numOfRows, void* merger, bool groupMix); SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param); SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger); -SOperatorInfo* createHavingOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createConditionOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index d4a63f8dbb..657c5f535b 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -174,6 +174,9 @@ static STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* w static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo); static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream); +static int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, + SSingleColumnFilterInfo** pFilterInfo, uint64_t qId); +static void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols); static int32_t getNumOfScanTimes(SQueryAttr* pQueryAttr); @@ -1753,13 +1756,6 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf assert(pQueryAttr->pExpr2 != NULL); pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2); } - - { - -// if (pQueryAttr->limit.offset > 0) { -// pRuntimeEnv->proot = createOffsetOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); -// } - } break; } @@ -1768,9 +1764,12 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf break; } - case OP_Having: { - if (pQueryAttr->havingNum > 0) { - pRuntimeEnv->proot = createHavingOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + case OP_Condition: { // todo refactor + assert(pQueryAttr->havingNum > 0); + if (pQueryAttr->stableQuery) { + pRuntimeEnv->proot = createConditionOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, pQueryAttr->numOfExpr3); + } else { + pRuntimeEnv->proot = createConditionOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); } break; } @@ -2355,7 +2354,7 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key, bool asc } void filterRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, - SSDataBlock* pBlock, bool ascQuery) { + SSDataBlock* pBlock, bool ascQuery) { int32_t numOfRows = pBlock->info.rows; int8_t *p = calloc(numOfRows, sizeof(int8_t)); @@ -2504,18 +2503,18 @@ static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSData return status; } -static void doSetFilterColumnInfo(SQueryAttr* pQueryAttr, SSDataBlock* pBlock) { - if (pQueryAttr->numOfFilterCols > 0 && pQueryAttr->pFilterInfo[0].pData != NULL) { +static void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock) { + if (numOfFilterCols > 0 && pFilterInfo[0].pData != NULL) { return; } // set the initial static data value filter expression - for (int32_t i = 0; i < pQueryAttr->numOfFilterCols; ++i) { + for (int32_t i = 0; i < numOfFilterCols; ++i) { for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, j); - if (pQueryAttr->pFilterInfo[i].info.colId == pColInfo->info.colId) { - pQueryAttr->pFilterInfo[i].pData = pColInfo->pData; + if (pFilterInfo[i].info.colId == pColInfo->info.colId) { + pFilterInfo[i].pData = pColInfo->pData; break; } } @@ -2660,7 +2659,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa return terrno; } - doSetFilterColumnInfo(pQueryAttr, pBlock); + doSetFilterColumnInfo(pQueryAttr->pFilterInfo, pQueryAttr->numOfFilterCols, pBlock); if (pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL) { filterRowsInDataBlock(pRuntimeEnv, pQueryAttr->pFilterInfo, pQueryAttr->numOfFilterCols, pBlock, ascQuery); } @@ -4610,7 +4609,7 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx pInfo->prevRow[i] = (char*)pInfo->prevRow + offset; SColIndex* index = taosArrayGet(pInfo->orderColumnList, i); - offset += pExpr[index->colIndex].base.resBytes; + offset += pExpr[index->colIndex].base.colBytes; } } @@ -4900,89 +4899,34 @@ bool doFilterData(SColumnInfoData* p, int32_t rid, SColumnFilterElem *filterElem return false; } - -void doHavingImpl(SOperatorInfo *pOperator, SSDataBlock *pBlock) { - SHavingOperatorInfo* pInfo = pOperator->info; - int32_t f = 0; - int32_t allQualified = 1; - int32_t exprQualified = 0; - - for (int32_t r = 0; r < pBlock->info.rows; ++r) { - allQualified = 1; - - for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { - SExprInfo* pExprInfo = &(pOperator->pExpr[i]); - if (pExprInfo->base.pFilter == NULL) { - continue; - } - - SArray* es = taosArrayGetP(pInfo->fp, i); - assert(es); - - size_t fpNum = taosArrayGetSize(es); - - exprQualified = 0; - for (int32_t m = 0; m < fpNum; ++m) { - __filter_func_t fp = taosArrayGetP(es, m); - - assert(fp); - - //SColIndex* colIdx = &pExprInfo->base.colInfo; - SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, i); - - SColumnFilterElem filterElem = {.filterInfo = pExprInfo->base.pFilter[m]}; - - if (doFilterData(p, r, &filterElem, fp)) { - exprQualified = 1; - break; - } - } - - if (exprQualified == 0) { - allQualified = 0; - break; - } - } - - if (allQualified == 0) { - continue; - } - - for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - - int16_t bytes = pColInfoData->info.bytes; - memmove(pColInfoData->pData + f * bytes, pColInfoData->pData + bytes * r, bytes); - } - - ++f; - } - - pBlock->info.rows = f; -} - -static SSDataBlock* doHaving(void* param, bool* newgroup) { +static SSDataBlock* doFilter(void* param, bool* newgroup) { SOperatorInfo *pOperator = (SOperatorInfo *)param; if (pOperator->status == OP_EXEC_DONE) { return NULL; } + SConditionOperatorInfo* pCondInfo = pOperator->info; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; while (1) { SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup); if (pBlock == NULL) { - setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); - pOperator->status = OP_EXEC_DONE; - return NULL; + break; } - doHavingImpl(pOperator, pBlock); + doSetFilterColumnInfo(pCondInfo->pFilterInfo, pCondInfo->numOfFilterCols, pBlock); + assert(pRuntimeEnv->pTsBuf == NULL); + filterRowsInDataBlock(pRuntimeEnv, pCondInfo->pFilterInfo, pCondInfo->numOfFilterCols, pBlock, true); - return pBlock; + if (pBlock->info.rows > 0) { + return pBlock; + } } -} + setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); + pOperator->status = OP_EXEC_DONE; + return NULL; +} static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; @@ -5383,11 +5327,9 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { pInfo->pRes = destroyOutputBuf(pInfo->pRes); } -static void destroyHavingOperatorInfo(void* param, int32_t numOfOutput) { - SHavingOperatorInfo* pInfo = (SHavingOperatorInfo*) param; - if (pInfo->fp) { - taosArrayDestroy(pInfo->fp); - } +static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) { + SConditionOperatorInfo* pInfo = (SConditionOperatorInfo*) param; + doDestroyFilterInfo(pInfo->pFilterInfo, pInfo->numOfFilterCols); } SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { @@ -5446,83 +5388,56 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI return pOperator; } +SOperatorInfo* createConditionOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, + int32_t numOfOutput) { + SConditionOperatorInfo* pInfo = calloc(1, sizeof(SConditionOperatorInfo)); -int32_t initFilterFp(SExprInfo* pExpr, int32_t numOfOutput, SArray** fps) { - __filter_func_t fp = NULL; - - *fps = taosArrayInit(numOfOutput, sizeof(SArray*)); - if (*fps == NULL) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } + { + SColumnInfo* pCols = calloc(numOfOutput, sizeof(SColumnInfo)); - for (int32_t i = 0; i < numOfOutput; ++i) { - SExprInfo* pExprInfo = &(pExpr[i]); - SColIndex* colIdx = &pExprInfo->base.colInfo; + int32_t numOfFilter = 0; + for(int32_t i = 0; i < numOfOutput; ++i) { + if (pExpr[i].base.flist.numOfFilters > 0) { + numOfFilter += 1; + } - if (pExprInfo->base.pFilter == NULL || !TSDB_COL_IS_NORMAL_COL(colIdx->flag)) { - taosArrayPush(*fps, &fp); + pCols[i].type = pExpr[i].base.resType; + pCols[i].bytes = pExpr[i].base.resBytes; + pCols[i].colId = pExpr[i].base.resColId; - continue; + pCols[i].flist.numOfFilters = pExpr[i].base.flist.numOfFilters; + pCols[i].flist.filterInfo = calloc(pCols[i].flist.numOfFilters, sizeof(SColumnFilterInfo)); + memcpy(pCols[i].flist.filterInfo, pExpr[i].base.flist.filterInfo, pCols[i].flist.numOfFilters * sizeof(SColumnFilterInfo)); } - int32_t filterNum = pExprInfo->base.filterNum; - SColumnFilterInfo *filterInfo = pExprInfo->base.pFilter; - - SArray* es = taosArrayInit(filterNum, sizeof(__filter_func_t)); + assert(numOfFilter > 0); + doCreateFilterInfo(pCols, numOfOutput, numOfFilter, &pInfo->pFilterInfo, 0); + pInfo->numOfFilterCols = numOfFilter; - for (int32_t j = 0; j < filterNum; ++j) { - int32_t lower = filterInfo->lowerRelOptr; - int32_t upper = filterInfo->upperRelOptr; - if (lower == TSDB_RELATION_INVALID && upper == TSDB_RELATION_INVALID) { - qError("invalid rel optr"); - taosArrayDestroy(es); - return TSDB_CODE_QRY_APP_ERROR; - } - - __filter_func_t ffp = getFilterOperator(lower, upper); - if (ffp == NULL) { - qError("invalid filter info"); - taosArrayDestroy(es); - return TSDB_CODE_QRY_APP_ERROR; - } - - taosArrayPush(es, &ffp); - - filterInfo += 1; + for(int32_t i = 0; i < numOfOutput; ++i) { + tfree(pCols[i].flist.filterInfo); } - taosArrayPush(*fps, &es); + tfree(pCols); } - return TSDB_CODE_SUCCESS; -} - -SOperatorInfo* createHavingOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { - SHavingOperatorInfo* pInfo = calloc(1, sizeof(SHavingOperatorInfo)); - - initFilterFp(pExpr, numOfOutput, &pInfo->fp); - - assert(pInfo->fp); - SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "HavingOperator"; - pOperator->operatorType = OP_Having; + pOperator->name = "ConditionOperator"; + pOperator->operatorType = OP_Condition; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->numOfOutput = numOfOutput; pOperator->pExpr = pExpr; pOperator->upstream = upstream; - pOperator->exec = doHaving; + pOperator->exec = doFilter; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->cleanup = destroyHavingOperatorInfo; + pOperator->cleanup = destroyConditionOperatorInfo; return pOperator; } - - SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo)); pInfo->limit = pRuntimeEnv->pQueryAttr->limit.limit; @@ -5997,6 +5912,37 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p return pMsg; } +static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t numOfFilters, char** pMsg) { + for (int32_t f = 0; f < numOfFilters; ++f) { + SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)(*pMsg); + + SColumnFilterInfo *pColFilter = &pColFilters[f]; + pColFilter->filterstr = htons(pFilterMsg->filterstr); + + (*pMsg) += sizeof(SColumnFilterInfo); + + if (pColFilter->filterstr) { + pColFilter->len = htobe64(pFilterMsg->len); + + pColFilter->pz = (int64_t)calloc(1, (size_t)(pColFilter->len + 1 * TSDB_NCHAR_SIZE)); // note: null-terminator + if (pColFilter->pz == 0) { + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + memcpy((void *)pColFilter->pz, (*pMsg), (size_t)pColFilter->len); + (*pMsg) += (pColFilter->len + 1); + } else { + pColFilter->lowerBndi = htobe64(pFilterMsg->lowerBndi); + pColFilter->upperBndi = htobe64(pFilterMsg->upperBndi); + } + + pColFilter->lowerRelOptr = htons(pFilterMsg->lowerRelOptr); + pColFilter->upperRelOptr = htons(pFilterMsg->upperRelOptr); + } + + return TSDB_CODE_SUCCESS; +} + /** * pQueryMsg->head has been converted before this function is called. * @@ -6058,7 +6004,7 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { pColInfo->colId = htons(pColInfo->colId); pColInfo->type = htons(pColInfo->type); pColInfo->bytes = htons(pColInfo->bytes); - pColInfo->numOfFilters = htons(pColInfo->numOfFilters); + pColInfo->flist.numOfFilters = htons(pColInfo->flist.numOfFilters); if (!isValidDataType(pColInfo->type)) { qDebug("qmsg:%p, invalid data type in source column, index:%d, type:%d", pQueryMsg, col, pColInfo->type); @@ -6066,41 +6012,18 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { goto _cleanup; } - int32_t numOfFilters = pColInfo->numOfFilters; + int32_t numOfFilters = pColInfo->flist.numOfFilters; if (numOfFilters > 0) { - pColInfo->filterInfo = calloc(numOfFilters, sizeof(SColumnFilterInfo)); - if (pColInfo->filterInfo == NULL) { + pColInfo->flist.filterInfo = calloc(numOfFilters, sizeof(SColumnFilterInfo)); + if (pColInfo->flist.filterInfo == NULL) { code = TSDB_CODE_QRY_OUT_OF_MEMORY; goto _cleanup; } } - for (int32_t f = 0; f < numOfFilters; ++f) { - SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg; - - SColumnFilterInfo *pColFilter = &pColInfo->filterInfo[f]; - pColFilter->filterstr = htons(pFilterMsg->filterstr); - - pMsg += sizeof(SColumnFilterInfo); - - if (pColFilter->filterstr) { - pColFilter->len = htobe64(pFilterMsg->len); - - pColFilter->pz = (int64_t)calloc(1, (size_t)(pColFilter->len + 1 * TSDB_NCHAR_SIZE)); // note: null-terminator - if (pColFilter->pz == 0) { - code = TSDB_CODE_QRY_OUT_OF_MEMORY; - goto _cleanup; - } - - memcpy((void *)pColFilter->pz, pMsg, (size_t)pColFilter->len); - pMsg += (pColFilter->len + 1); - } else { - pColFilter->lowerBndi = htobe64(pFilterMsg->lowerBndi); - pColFilter->upperBndi = htobe64(pFilterMsg->upperBndi); - } - - pColFilter->lowerRelOptr = htons(pFilterMsg->lowerRelOptr); - pColFilter->upperRelOptr = htons(pFilterMsg->upperRelOptr); + code = deserializeColFilterInfo(pColInfo->flist.filterInfo, numOfFilters, &pMsg); + if (code != TSDB_CODE_SUCCESS) { + goto _cleanup; } } @@ -6128,35 +6051,9 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { pExprMsg->functionId = htons(pExprMsg->functionId); pExprMsg->numOfParams = htons(pExprMsg->numOfParams); pExprMsg->resColId = htons(pExprMsg->resColId); - pExprMsg->filterNum = htonl(pExprMsg->filterNum); - + pExprMsg->flist.numOfFilters = htons(pExprMsg->flist.numOfFilters); pMsg += sizeof(SSqlExpr); - SColumnFilterInfo* pExprFilterInfo = pExprMsg->pFilter; - - pMsg += sizeof(SColumnFilterInfo) * pExprMsg->filterNum; - - for (int32_t f = 0; f < pExprMsg->filterNum; ++f) { - SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pExprFilterInfo; - - pFilterMsg->filterstr = htons(pFilterMsg->filterstr); - - if (pFilterMsg->filterstr) { - pFilterMsg->len = htobe64(pFilterMsg->len); - - pFilterMsg->pz = (int64_t)pMsg; - pMsg += (pFilterMsg->len + 1); - } else { - pFilterMsg->lowerBndi = htobe64(pFilterMsg->lowerBndi); - pFilterMsg->upperBndi = htobe64(pFilterMsg->upperBndi); - } - - pFilterMsg->lowerRelOptr = htons(pFilterMsg->lowerRelOptr); - pFilterMsg->upperRelOptr = htons(pFilterMsg->upperRelOptr); - - pExprFilterInfo++; - } - for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) { pExprMsg->param[j].nType = htons(pExprMsg->param[j].nType); pExprMsg->param[j].nLen = htons(pExprMsg->param[j].nLen); @@ -6177,6 +6074,11 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { } } + if (pExprMsg->flist.numOfFilters > 0) { + pExprMsg->flist.filterInfo = calloc(pExprMsg->flist.numOfFilters, sizeof(SColumnFilterInfo)); + } + + deserializeColFilterInfo(pExprMsg->flist.filterInfo, pExprMsg->flist.numOfFilters, &pMsg); pExprMsg = (SSqlExpr *)pMsg; } @@ -6276,7 +6178,7 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { pTagCol->colId = htons(pTagCol->colId); pTagCol->bytes = htons(pTagCol->bytes); pTagCol->type = htons(pTagCol->type); - pTagCol->numOfFilters = 0; + pTagCol->flist.numOfFilters = 0; param->pTagColumnInfo[i] = *pTagCol; pMsg += sizeof(SColumnInfo); @@ -6500,9 +6402,9 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp bytes = s->bytes; } - if (pExprs[i].base.filterNum > 0) { - int32_t ret = cloneExprFilterInfo(&pExprs[i].base.pFilter, pExprMsg[i]->pFilter, - pExprMsg[i]->filterNum); + if (pExprs[i].base.flist.numOfFilters > 0) { + int32_t ret = cloneExprFilterInfo(&pExprs[i].base.flist.filterInfo, pExprMsg[i]->flist.filterInfo, + pExprMsg[i]->flist.numOfFilters); if (ret) { return ret; } @@ -6612,53 +6514,44 @@ SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex * return pGroupbyExpr; } -static int32_t createFilterInfo(SQueryAttr *pQueryAttr, uint64_t qId) { - for (int32_t i = 0; i < pQueryAttr->numOfCols; ++i) { - if (pQueryAttr->tableCols[i].numOfFilters > 0) { - pQueryAttr->numOfFilterCols++; - } - } - - if (pQueryAttr->numOfFilterCols == 0) { - return TSDB_CODE_SUCCESS; - } - - pQueryAttr->pFilterInfo = calloc(1, sizeof(SSingleColumnFilterInfo) * pQueryAttr->numOfFilterCols); - if (pQueryAttr->pFilterInfo == NULL) { +static int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, + SSingleColumnFilterInfo** pFilterInfo, uint64_t qId) { + *pFilterInfo = calloc(1, sizeof(SSingleColumnFilterInfo) * numOfFilterCols); + if (pFilterInfo == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } - for (int32_t i = 0, j = 0; i < pQueryAttr->numOfCols; ++i) { - if (pQueryAttr->tableCols[i].numOfFilters > 0) { - SSingleColumnFilterInfo *pFilterInfo = &pQueryAttr->pFilterInfo[j]; + for (int32_t i = 0, j = 0; i < numOfCols; ++i) { + if (pCols[i].flist.numOfFilters > 0) { + SSingleColumnFilterInfo* pFilter = &((*pFilterInfo)[j]); - memcpy(&pFilterInfo->info, &pQueryAttr->tableCols[i], sizeof(SColumnInfo)); - pFilterInfo->info = pQueryAttr->tableCols[i]; + memcpy(&pFilter->info, &pCols[i], sizeof(SColumnInfo)); + pFilter->info = pCols[i]; - pFilterInfo->numOfFilters = pQueryAttr->tableCols[i].numOfFilters; - pFilterInfo->pFilters = calloc(pFilterInfo->numOfFilters, sizeof(SColumnFilterElem)); - if (pFilterInfo->pFilters == NULL) { + pFilter->numOfFilters = pCols[i].flist.numOfFilters; + pFilter->pFilters = calloc(pFilter->numOfFilters, sizeof(SColumnFilterElem)); + if (pFilter->pFilters == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } - for (int32_t f = 0; f < pFilterInfo->numOfFilters; ++f) { - SColumnFilterElem *pSingleColFilter = &pFilterInfo->pFilters[f]; - pSingleColFilter->filterInfo = pQueryAttr->tableCols[i].filterInfo[f]; + for (int32_t f = 0; f < pFilter->numOfFilters; ++f) { + SColumnFilterElem* pSingleColFilter = &pFilter->pFilters[f]; + pSingleColFilter->filterInfo = pCols[i].flist.filterInfo[f]; int32_t lower = pSingleColFilter->filterInfo.lowerRelOptr; int32_t upper = pSingleColFilter->filterInfo.upperRelOptr; if (lower == TSDB_RELATION_INVALID && upper == TSDB_RELATION_INVALID) { - qError("QInfo:%"PRIu64" invalid filter info", qId); + qError("QInfo:%" PRIu64 " invalid filter info", qId); return TSDB_CODE_QRY_INVALID_MSG; } pSingleColFilter->fp = getFilterOperator(lower, upper); if (pSingleColFilter->fp == NULL) { - qError("QInfo:%"PRIu64" invalid filter info", qId); + qError("QInfo:%" PRIu64 " invalid filter info", qId); return TSDB_CODE_QRY_INVALID_MSG; } - pSingleColFilter->bytes = pQueryAttr->tableCols[i].bytes; + pSingleColFilter->bytes = pCols[i].bytes; } j++; @@ -6668,6 +6561,34 @@ static int32_t createFilterInfo(SQueryAttr *pQueryAttr, uint64_t qId) { return TSDB_CODE_SUCCESS; } +void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols) { + for (int32_t i = 0; i < numOfFilterCols; ++i) { + if (pFilterInfo[i].numOfFilters > 0) { + tfree(pFilterInfo[i].pFilters); + } + } + + tfree(pFilterInfo); + return NULL; +} + +static int32_t createFilterInfo(SQueryAttr* pQueryAttr, uint64_t qId) { + for (int32_t i = 0; i < pQueryAttr->numOfCols; ++i) { + if (pQueryAttr->tableCols[i].flist.numOfFilters > 0) { + pQueryAttr->numOfFilterCols++; + } + } + + if (pQueryAttr->numOfFilterCols == 0) { + return TSDB_CODE_SUCCESS; + } + + doCreateFilterInfo(pQueryAttr->tableCols, pQueryAttr->numOfCols, pQueryAttr->numOfFilterCols, + &pQueryAttr->pFilterInfo, qId); + + return TSDB_CODE_SUCCESS; +} + static void doUpdateExprColumnIndex(SQueryAttr *pQueryAttr) { assert(pQueryAttr->pExpr1 != NULL && pQueryAttr != NULL); @@ -6795,7 +6716,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr pQueryAttr->maxTableColumnWidth = 0; for (int16_t i = 0; i < numOfCols; ++i) { pQueryAttr->tableCols[i] = pQueryMsg->tableCols[i]; - pQueryAttr->tableCols[i].filterInfo = tFilterInfoDup(pQueryMsg->tableCols[i].filterInfo, pQueryAttr->tableCols[i].numOfFilters); + pQueryAttr->tableCols[i].flist.filterInfo = tFilterInfoDup(pQueryMsg->tableCols[i].flist.filterInfo, pQueryAttr->tableCols[i].flist.numOfFilters); pQueryAttr->srcRowSize += pQueryAttr->tableCols[i].bytes; if (pQueryAttr->maxTableColumnWidth < pQueryAttr->tableCols[i].bytes) { @@ -6813,7 +6734,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr pQueryAttr->tagLen += pExprs[col].base.resBytes; } - if (pExprs[col].base.pFilter) { + if (pExprs[col].base.flist.filterInfo) { ++pQueryAttr->havingNum; } } @@ -6916,8 +6837,8 @@ _cleanup_qinfo: pExprInfo->pExpr = NULL; } - if (pExprInfo->base.pFilter) { - freeColumnFilterInfo(pExprInfo->base.pFilter, pExprInfo->base.filterNum); + if (pExprInfo->base.flist.filterInfo) { + freeColumnFilterInfo(pExprInfo->base.flist.filterInfo, pExprInfo->base.flist.numOfFilters); } } @@ -7000,6 +6921,7 @@ _error: return code; } +//TODO refactor void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters) { if (pFilter == NULL || numOfFilters == 0) { return; @@ -7049,8 +6971,8 @@ static void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr) { tExprTreeDestroy(pExprInfo[i].pExpr, NULL); } - if (pExprInfo[i].base.pFilter) { - freeColumnFilterInfo(pExprInfo[i].base.pFilter, pExprInfo[i].base.filterNum); + if (pExprInfo[i].base.flist.filterInfo) { + freeColumnFilterInfo(pExprInfo[i].base.flist.filterInfo, pExprInfo[i].base.flist.numOfFilters); } } @@ -7078,24 +7000,18 @@ void freeQInfo(SQInfo *pQInfo) { tfree(pQueryAttr->fillVal); } - for (int32_t i = 0; i < pQueryAttr->numOfFilterCols; ++i) { - SSingleColumnFilterInfo *pColFilter = &pQueryAttr->pFilterInfo[i]; - if (pColFilter->numOfFilters > 0) { - tfree(pColFilter->pFilters); - } - } + pQueryAttr->pFilterInfo = doDestroyFilterInfo(pQueryAttr->pFilterInfo, pQueryAttr->numOfFilterCols); pQueryAttr->pExpr1 = destroyQueryFuncExpr(pQueryAttr->pExpr1, pQueryAttr->numOfOutput); pQueryAttr->pExpr2 = destroyQueryFuncExpr(pQueryAttr->pExpr2, pQueryAttr->numOfExpr2); pQueryAttr->pExpr3 = destroyQueryFuncExpr(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3); tfree(pQueryAttr->tagColList); - tfree(pQueryAttr->pFilterInfo); if (pQueryAttr->tableCols != NULL) { for (int32_t i = 0; i < pQueryAttr->numOfCols; i++) { SColumnInfo *column = pQueryAttr->tableCols + i; - freeColumnFilterInfo(column->filterInfo, column->numOfFilters); + freeColumnFilterInfo(column->flist.filterInfo, column->flist.numOfFilters); } tfree(pQueryAttr->tableCols); } @@ -7282,7 +7198,7 @@ void freeQueryAttr(SQueryAttr* pQueryAttr) { if (pQueryAttr->tableCols != NULL) { for (int32_t i = 0; i < pQueryAttr->numOfCols; i++) { SColumnInfo* column = pQueryAttr->tableCols + i; - freeColumnFilterInfo(column->filterInfo, column->numOfFilters); + freeColumnFilterInfo(column->flist.filterInfo, column->flist.numOfFilters); } tfree(pQueryAttr->tableCols); } diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c index 8af74ee8ce..1efaca7277 100644 --- a/src/query/src/qPlan.c +++ b/src/query/src/qPlan.c @@ -86,6 +86,11 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { op = OP_Groupby; taosArrayPush(plan, &op); + if (!pQueryAttr->stableQuery && pQueryAttr->havingNum > 0) { + op = OP_Condition; + taosArrayPush(plan, &op); + } + if (pQueryAttr->pExpr2 != NULL) { op = OP_Arithmetic; taosArrayPush(plan, &op); @@ -107,6 +112,11 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { taosArrayPush(plan, &op); + if (!pQueryAttr->stableQuery && pQueryAttr->havingNum > 0) { + op = OP_Condition; + taosArrayPush(plan, &op); + } + if (pQueryAttr->pExpr2 != NULL && !pQueryAttr->stableQuery) { op = OP_Arithmetic; taosArrayPush(plan, &op); @@ -139,6 +149,11 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) { op = OP_GlobalAggregate; taosArrayPush(plan, &op); + if (pQueryAttr->havingNum > 0) { + op = OP_Condition; + taosArrayPush(plan, &op); + } + if (pQueryAttr->pExpr2 != NULL) { op = OP_Arithmetic; taosArrayPush(plan, &op); diff --git a/src/query/src/qSqlParser.c b/src/query/src/qSqlParser.c index 841407382b..aa9921d93e 100644 --- a/src/query/src/qSqlParser.c +++ b/src/query/src/qSqlParser.c @@ -349,7 +349,6 @@ int32_t tSqlExprCompare(tSqlExpr *left, tSqlExpr *right) { return 1; } - if (right->pParam && left->pParam) { size_t size = taosArrayGetSize(right->pParam); if (left->pParam && taosArrayGetSize(left->pParam) != size) { diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index ddc62b8b91..3ac05499a9 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -186,7 +186,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi for (int32_t i = 0; i < pQueryMsg->numOfCols; i++) { SColumnInfo* column = pQueryMsg->tableCols + i; - freeColumnFilterInfo(column->filterInfo, column->numOfFilters); + freeColumnFilterInfo(column->flist.filterInfo, column->flist.numOfFilters); } //pQInfo already freed in initQInfo, but *pQInfo may not pointer to null; -- GitLab