From 64545fb9a3d12dc20aec5394effa550f23c124a0 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Wed, 22 Apr 2020 17:14:39 +0800 Subject: [PATCH] [td-171] add filter support, refactor the client side parse functions --- src/client/inc/tscUtil.h | 18 +- src/client/inc/tsclient.h | 24 +- src/client/src/tscFunctionImpl.c | 2 +- src/client/src/tscSQLParser.c | 42 ++- src/client/src/tscServer.c | 51 +-- src/client/src/tscSubquery.c | 50 +-- src/client/src/tscUtil.c | 231 +++++-------- src/inc/taosmsg.h | 2 +- src/inc/tsdb.h | 2 +- src/query/inc/queryExecutor.h | 5 +- src/query/inc/queryUtil.h | 5 + src/query/src/queryExecutor.c | 262 +++++---------- src/query/src/queryFilterFunc.c | 558 +++++++++++++++++++++++++++++++ src/tsdb/src/tsdbRead.c | 12 +- src/util/inc/tarray.h | 4 +- src/util/src/tarray.c | 4 +- 16 files changed, 849 insertions(+), 423 deletions(-) create mode 100644 src/query/src/queryFilterFunc.c diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 57cf821eb1..70c2ed2c7a 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -57,7 +57,7 @@ typedef struct SJoinSubquerySupporter { int64_t interval; // interval time SLimitVal limit; // limit info uint64_t uid; // query meter uid - SColumnBaseInfo colList; // previous query information + SArray* colList; // previous query information SSqlExprInfo exprsInfo; SFieldInfo fieldsInfo; STagCond tagCond; @@ -106,7 +106,7 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex); bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo); bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex); -bool tscQueryOnMetric(SSqlCmd* pCmd); +bool tscQueryOnSTable(SSqlCmd* pCmd); bool tscQueryTags(SQueryInfo* pQueryInfo); bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd); @@ -159,16 +159,12 @@ void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t ui void* tscSqlExprDestroy(SSqlExpr* pExpr); void tscSqlExprInfoDestroy(SSqlExprInfo* pExprInfo); -SColumnBase* tscColumnBaseInfoInsert(SQueryInfo* pQueryInfo, SColumnIndex* colIndex); -void tscColumnFilterInfoCopy(SColumnFilterInfo* dst, const SColumnFilterInfo* src); -void tscColumnBaseCopy(SColumnBase* dst, const SColumnBase* src); +SColumn* tscColumnClone(const SColumn* src); +SColumn* tscColumnListInsert(SArray* pColList, SColumnIndex* colIndex); +void tscColumnListAssign(SArray* dst, const SArray* src, int16_t tableIndex); +void tscColumnListDestroy(SArray* pColList); -void tscColumnBaseInfoCopy(SColumnBaseInfo* dst, const SColumnBaseInfo* src, int16_t tableIndex); -SColumnBase* tscColumnBaseInfoGet(SColumnBaseInfo* pColumnBaseInfo, int32_t index); -void tscColumnBaseInfoUpdateTableIndex(SColumnBaseInfo* pColList, int16_t tableIndex); - -void tscColumnBaseInfoReserve(SColumnBaseInfo* pColumnBaseInfo, int32_t size); -void tscColumnBaseInfoDestroy(SColumnBaseInfo* pColumnBaseInfo); +SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters); int32_t tscValidateName(SSQLToken* pToken); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index f225d546f3..38344c9c41 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -30,9 +30,9 @@ extern "C" { #include "taosmsg.h" #include "tarray.h" #include "tglobal.h" -#include "trpc.h" -#include "tsqlfunction.h" #include "tutil.h" +#include "tsqlfunction.h" +#include "queryExecutor.h" #define TSC_GET_RESPTR_BASE(res, _queryinfo, col) (res->data + ((_queryinfo)->fieldsInfo.pSqlExpr[col]->offset) * res->numOfRows) @@ -41,14 +41,6 @@ struct SSqlInfo; typedef SCMSTableVgroupRspMsg SVgroupsInfo; -typedef struct SSqlGroupbyExpr { - int16_t tableIndex; - int16_t numOfGroupCols; - SColIndex columnInfo[TSDB_MAX_TAGS]; // group by columns information - int16_t orderIndex; // order by column index - int16_t orderType; // order by type: asc/desc -} SSqlGroupbyExpr; - typedef struct STableComInfo { uint8_t numOfTags; uint8_t precision; @@ -123,17 +115,11 @@ typedef struct SSqlExprInfo { SSqlExpr** pExprs; } SSqlExprInfo; -typedef struct SColumnBase { +typedef struct SColumn { SColumnIndex colIndex; int32_t numOfFilters; SColumnFilterInfo *filterInfo; -} SColumnBase; - -typedef struct SColumnBaseInfo { - int16_t numOfAlloc; - int16_t numOfCols; - SColumnBase *pColList; -} SColumnBaseInfo; +} SColumn; struct SLocalReducer; @@ -223,7 +209,7 @@ typedef struct SQueryInfo { int64_t slidingTime; // sliding window in mseconds SSqlGroupbyExpr groupbyExpr; // group by tags info - SColumnBaseInfo colList; + SArray* colList; SFieldInfo fieldsInfo; SSqlExprInfo exprsInfo; SLimitVal limit; diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 9a8bec4597..d3bf1c5206 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -2940,7 +2940,7 @@ static void col_project_function_f(SQLFunctionCtx *pCtx, int32_t index) { char *pData = GET_INPUT_CHAR_INDEX(pCtx, index); memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes); - pCtx->aOutputBuf += pCtx->inputBytes/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/; + pCtx->aOutputBuf += pCtx->inputBytes; } /** diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index c13acbba6a..cc9721b1fd 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1123,7 +1123,11 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel const char* msg5 = "invalid function name"; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex); - + + if (pQueryInfo->colList == NULL) { + pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES); + } + for (int32_t i = 0; i < pSelection->nExpr; ++i) { int32_t outputIndex = pQueryInfo->exprsInfo.numOfExprs; tSQLExprItem* pItem = &pSelection->a[i]; @@ -1278,7 +1282,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes, int8_t type, char* fieldName, SSqlExpr* pSqlExpr) { for (int32_t i = 0; i < pIdList->num; ++i) { - tscColumnBaseInfoInsert(pQueryInfo, &(pIdList->ids[i])); + tscColumnListInsert(pQueryInfo->colList, &(pIdList->ids[i])); } tscFieldInfoSetValue(&pQueryInfo->fieldsInfo, outputIndex, type, fieldName, bytes); @@ -1494,7 +1498,7 @@ static int32_t setExprInfoForFunctions(SQueryInfo* pQueryInfo, SSchema* pSchema, // for all querie, the timestamp column meeds to be loaded SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; - tscColumnBaseInfoInsert(pQueryInfo, &index); + tscColumnListInsert(pQueryInfo->colList, &index); SColumnList ids = getColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex); insertResultField(pQueryInfo, resColIdx, &ids, bytes, type, columnName, pExpr); @@ -1579,12 +1583,12 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr insertResultField(pQueryInfo, numOfOutput, &ids, sizeof(int64_t), TSDB_DATA_TYPE_BIGINT, pExpr->aliasName, pExpr); } else { for (int32_t i = 0; i < ids.num; ++i) { - tscColumnBaseInfoInsert(pQueryInfo, &(ids.ids[i])); + tscColumnListInsert(pQueryInfo->colList, &(ids.ids[i])); } } SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; - tscColumnBaseInfoInsert(pQueryInfo, &tsCol); + tscColumnListInsert(pQueryInfo->colList, &tsCol); return TSDB_CODE_SUCCESS; } @@ -1690,12 +1694,12 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr insertResultField(pQueryInfo, numOfOutput, &ids, pExpr->resBytes, pExpr->resType, pExpr->aliasName, pExpr); } else { for (int32_t i = 0; i < ids.num; ++i) { - tscColumnBaseInfoInsert(pQueryInfo, &(ids.ids[i])); + tscColumnListInsert(pQueryInfo->colList, &(ids.ids[i])); } } SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; - tscColumnBaseInfoInsert(pQueryInfo, &tsCol); + tscColumnListInsert(pQueryInfo->colList, &tsCol); return TSDB_CODE_SUCCESS; } @@ -1889,7 +1893,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr insertResultField(pQueryInfo, colIndex, &ids, resultSize, resultType, pExpr->aliasName, pExpr); } else { for (int32_t i = 0; i < ids.num; ++i) { - tscColumnBaseInfoInsert(pQueryInfo, &(ids.ids[i])); + tscColumnListInsert(pQueryInfo->colList, &(ids.ids[i])); } } @@ -2535,7 +2539,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* return invalidSqlErrMsg(pQueryInfo->msg, msg8); } - tscColumnBaseInfoInsert(pQueryInfo, &index); + tscColumnListInsert(pQueryInfo->colList, &index); pQueryInfo->groupbyExpr.columnInfo[i] = (SColIndex){.colIndex = index.columnIndex, .flag = TSDB_COL_NORMAL, .colId = pSchema->colId}; // relIndex; pQueryInfo->groupbyExpr.orderType = TSDB_ORDER_ASC; @@ -2559,7 +2563,7 @@ void setColumnOffsetValueInResultset(SQueryInfo* pQueryInfo) { } } -static SColumnFilterInfo* addColumnFilterInfo(SColumnBase* pColumn) { +static SColumnFilterInfo* addColumnFilterInfo(SColumn* pColumn) { if (pColumn == NULL) { return NULL; } @@ -2836,7 +2840,7 @@ static int32_t extractColumnFilterInfo(SQueryInfo* pQueryInfo, SColumnIndex* pIn const char* msg1 = "non binary column not support like operator"; const char* msg2 = "binary column not support this operator"; - SColumnBase* pColumn = tscColumnBaseInfoInsert(pQueryInfo, pIndex); + SColumn* pColumn = tscColumnListInsert(pQueryInfo->colList, pIndex); SColumnFilterInfo* pColFilter = NULL; /* @@ -2857,10 +2861,10 @@ static int32_t extractColumnFilterInfo(SQueryInfo* pQueryInfo, SColumnIndex* pIn return TSDB_CODE_INVALID_SQL; } - pColFilter->filterOnBinary = + pColFilter->filterstr = ((pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) ? 1 : 0); - if (pColFilter->filterOnBinary) { + if (pColFilter->filterstr) { if (pExpr->nSQLOptr != TK_EQ && pExpr->nSQLOptr != TK_NE && pExpr->nSQLOptr != TK_LIKE) { return invalidSqlErrMsg(pQueryInfo->msg, msg2); } @@ -3584,11 +3588,15 @@ static int32_t setTableCondForSTableQuery(SQueryInfo* pQueryInfo, const char* ac } static bool validateFilterExpr(SQueryInfo* pQueryInfo) { - for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) { - SColumnBase* pColBase = &pQueryInfo->colList.pColList[i]; + SArray* pColList = pQueryInfo->colList; + + size_t num = taosArrayGetSize(pColList); + + for (int32_t i = 0; i < num; ++i) { + SColumn* pCol = taosArrayGetP(pColList, i); - for (int32_t j = 0; j < pColBase->numOfFilters; ++j) { - SColumnFilterInfo* pColFilter = &pColBase->filterInfo[j]; + for (int32_t j = 0; j < pCol->numOfFilters; ++j) { + SColumnFilterInfo* pColFilter = &pCol->filterInfo[j]; int32_t lowerOptr = pColFilter->lowerRelOptr; int32_t upperOptr = pColFilter->upperRelOptr; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ad5f9cd0af..5c1a6dba0c 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -567,7 +567,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5; SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex); - int32_t srcColListSize = pQueryInfo->colList.numOfCols * sizeof(SColumnInfo); + int32_t srcColListSize = taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo); int32_t exprSize = sizeof(SSqlFuncExprMsg) * pQueryInfo->exprsInfo.numOfExprs; STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -624,7 +624,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; - if (pQueryInfo->colList.numOfCols <= 0) { + if (taosArrayGetSize(pQueryInfo->colList) <= 0) { tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta)); return -1; } @@ -700,7 +700,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->interpoType = htons(pQueryInfo->interpoType); pQueryMsg->limit = htobe64(pQueryInfo->limit.limit); pQueryMsg->offset = htobe64(pQueryInfo->limit.offset); - pQueryMsg->numOfCols = htons(pQueryInfo->colList.numOfCols); + pQueryMsg->numOfCols = htons(taosArrayGetSize(pQueryInfo->colList)); pQueryMsg->intervalTime = htobe64(pQueryInfo->intervalTime); pQueryMsg->slidingTime = htobe64(pQueryInfo->slidingTime); pQueryMsg->slidingTimeUnit = pQueryInfo->slidingTimeUnit; @@ -716,21 +716,22 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } // set column list ids - char *pMsg = (char *)(pQueryMsg->colList) + pQueryInfo->colList.numOfCols * sizeof(SColumnInfo); + size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); + char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo); SSchema *pSchema = tscGetTableSchema(pTableMeta); - for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) { - SColumnBase *pCol = tscColumnBaseInfoGet(&pQueryInfo->colList, i); - SSchema * pColSchema = &pSchema[pCol->colIndex.columnIndex]; + for (int32_t i = 0; i < numOfCols; ++i) { + SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i); + SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex]; -// if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL || -// pColSchema->type > TSDB_DATA_TYPE_NCHAR) { -// tscError("%p vid:%d sid:%d id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", pSql, -// htons(pQueryMsg->vnode), pTableMeta->sid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex, -// pColSchema->name); -// -// return -1; // 0 means build msg failed -// } + if (pCol->colIndex.columnIndex >= tscGetNumOfColumns(pTableMeta) || pColSchema->type < TSDB_DATA_TYPE_BOOL || + pColSchema->type > TSDB_DATA_TYPE_NCHAR) { + tscError("%p sid:%d uid:%" PRIu64" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s", + pSql, pTableMeta->sid, pTableMeta->uid, pTableMetaInfo->name, tscGetNumOfColumns(pTableMeta), pCol->colIndex, + pColSchema->name); + + return -1; // 0 means build msg failed + } pQueryMsg->colList[i].colId = htons(pColSchema->colId); pQueryMsg->colList[i].bytes = htons(pColSchema->bytes); @@ -742,11 +743,11 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SColumnFilterInfo *pColFilter = &pCol->filterInfo[f]; SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg; - pFilterMsg->filterOnBinary = htons(pColFilter->filterOnBinary); + pFilterMsg->filterstr = htons(pColFilter->filterstr); pMsg += sizeof(SColumnFilterInfo); - if (pColFilter->filterOnBinary) { + if (pColFilter->filterstr) { pFilterMsg->len = htobe64(pColFilter->len); memcpy(pMsg, (void *)pColFilter->pz, pColFilter->len + 1); pMsg += (pColFilter->len + 1); // append the additional filter binary info @@ -808,8 +809,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t len = 0; if (hasArithmeticFunction) { - SColumnBase *pColBase = pQueryInfo->colList.pColList; - for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) { + for (int32_t i = 0; i < numOfCols; ++i) { + SColumn* pColBase = taosArrayGetP(pQueryInfo->colList, i); + char * name = pSchema[pColBase[i].colIndex.columnIndex].name; int32_t lenx = strlen(name); memcpy(pMsg, name, lenx); @@ -2194,12 +2196,15 @@ int tscProcessShowRsp(SSqlObj *pSql) { pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols; SSchema *pTableSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); - tscColumnBaseInfoReserve(&pQueryInfo->colList, pMetaMsg->numOfColumns); + if (pQueryInfo->colList == NULL) { + pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES); + } + SColumnIndex index = {0}; - for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i) { index.columnIndex = i; - tscColumnBaseInfoInsert(pQueryInfo, &index); + tscColumnListInsert(pQueryInfo->colList, &index); + tscFieldInfoSetValFromSchema(&pQueryInfo->fieldsInfo, i, &pTableSchema[i]); pQueryInfo->fieldsInfo.pSqlExpr[i] = tscSqlExprInsert(pQueryInfo, i, TSDB_FUNC_TS_DUMMY, &index, @@ -2477,7 +2482,7 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) { * 1. only update the metermeta in force model metricmeta is not updated * 2. if get metermeta failed, still get the metermeta */ - if (pTableMetaInfo->pTableMeta == NULL || !tscQueryOnMetric(pCmd)) { + if (pTableMetaInfo->pTableMeta == NULL || !tscQueryOnSTable(pCmd)) { STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; if (pTableMetaInfo->pTableMeta) { tscTrace("%p update table meta, old: numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql, diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index e647772226..5b0bba4607 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -191,7 +191,7 @@ void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter) { } tscSqlExprInfoDestroy(&pSupporter->exprsInfo); - tscColumnBaseInfoDestroy(&pSupporter->colList); + tscColumnListDestroy(pSupporter->colList); tscClearFieldInfo(&pSupporter->fieldsInfo); @@ -211,8 +211,10 @@ void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter) { * */ bool needSecondaryQuery(SQueryInfo* pQueryInfo) { - for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) { - SColumnBase* pBase = tscColumnBaseInfoGet(&pQueryInfo->colList, i); + size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); + + for (int32_t i = 0; i < numOfCols; ++i) { + SColumn* pBase = taosArrayGet(pQueryInfo->colList, i); if (pBase->colIndex.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { return true; } @@ -299,7 +301,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { pQueryInfo->intervalTime = pSupporter->interval; pQueryInfo->groupbyExpr = pSupporter->groupbyExpr; - tscColumnBaseInfoCopy(&pQueryInfo->colList, &pSupporter->colList, 0); + tscColumnListAssign(pQueryInfo->colList, pSupporter->colList, 0); tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond); tscSqlExprCopy(&pQueryInfo->exprsInfo, &pSupporter->exprsInfo, pSupporter->uid, false); @@ -342,9 +344,10 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { tscPrintSelectClause(pNew, 0); + size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList); tscTrace("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, - pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, + pNewQueryInfo->exprsInfo.numOfExprs, numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name); } @@ -850,8 +853,14 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); assert(pNewQueryInfo != NULL); - tscColumnBaseInfoUpdateTableIndex(&pNewQueryInfo->colList, 0); - tscColumnBaseInfoCopy(&pSupporter->colList, &pNewQueryInfo->colList, 0); + // update the table index + size_t num = taosArrayGetSize(pNewQueryInfo->colList); + for (int32_t i = 0; i < num; ++i) { + SColumn* pCol = taosArrayGetP(pNewQueryInfo->colList, i); + pCol->colIndex.tableIndex = 0; + } + + tscColumnListAssign(pSupporter->colList, pNewQueryInfo->colList, 0); tscSqlExprCopy(&pSupporter->exprsInfo, &pNewQueryInfo->exprsInfo, pSupporter->uid, false); tscFieldInfoCopyAll(&pSupporter->fieldsInfo, &pNewQueryInfo->fieldsInfo); @@ -888,27 +897,26 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu pExpr->numOfParams = 1; // add the filter tag column - for (int32_t i = 0; i < pSupporter->colList.numOfCols; ++i) { - SColumnBase *pColBase = &pSupporter->colList.pColList[i]; - if (pColBase->numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered. - tscColumnBaseCopy(&pNewQueryInfo->colList.pColList[pNewQueryInfo->colList.numOfCols], pColBase); - pNewQueryInfo->colList.numOfCols++; + size_t s = taosArrayGetSize(pSupporter->colList); + + for (int32_t i = 0; i < s; ++i) { + SColumn *pCol = taosArrayGetP(pSupporter->colList, i); + + if (pCol->numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered. + SColumn* p = tscColumnClone(pCol); + taosArrayPush(pNewQueryInfo->colList, &p); } } - - tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " - "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", - pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, - pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, - pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name); - tscPrintSelectClause(pNew, 0); - + + size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList); + tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, - pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, + pNewQueryInfo->exprsInfo.numOfExprs, numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name); tscPrintSelectClause(pNew, 0); + } else { SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 3650ea1f30..cb5e8c0d66 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -130,7 +130,7 @@ void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBuffer* pBuf) { taosArrayPush(pTagCond->pCond, &cond); } -bool tscQueryOnMetric(SSqlCmd* pCmd) { +bool tscQueryOnSTable(SSqlCmd* pCmd) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); return ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_QUERY) == TSDB_QUERY_TYPE_STABLE_QUERY) && @@ -1289,7 +1289,6 @@ void tscSqlExprInfoDestroy(SSqlExprInfo* pExprInfo) { pExprInfo->numOfExprs = 0; } - void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t tableuid, bool deepcopy) { if (src == NULL || src->numOfExprs == 0) { return; @@ -1323,186 +1322,130 @@ void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t tableui } } } - -} - -static void clearVal(SColumnBase* pBase) { - memset(pBase, 0, sizeof(SColumnBase)); - - pBase->colIndex.tableIndex = -2; - pBase->colIndex.columnIndex = -2; -} - -static void _cf_ensureSpace(SColumnBaseInfo* pcolList, int32_t size) { - if (pcolList->numOfAlloc < size) { - int32_t oldSize = pcolList->numOfAlloc; - - int32_t newSize = (oldSize <= 0) ? 8 : (oldSize << 1); - while (newSize < size) { - newSize = (newSize << 1); - } - - if (newSize > TSDB_MAX_COLUMNS) { - newSize = TSDB_MAX_COLUMNS; - } - - int32_t inc = newSize - oldSize; - - pcolList->pColList = realloc(pcolList->pColList, newSize * sizeof(SColumnBase)); - memset(&pcolList->pColList[oldSize], 0, inc * sizeof(SColumnBase)); - - pcolList->numOfAlloc = newSize; - } } -static void _cf_evic(SColumnBaseInfo* pcolList, int32_t index) { - if (index < pcolList->numOfCols) { - memmove(&pcolList->pColList[index + 1], &pcolList->pColList[index], - sizeof(SColumnBase) * (pcolList->numOfCols - index)); - - clearVal(&pcolList->pColList[index]); - } -} - -SColumnBase* tscColumnBaseInfoGet(SColumnBaseInfo* pColumnBaseInfo, int32_t index) { - if (pColumnBaseInfo == NULL || pColumnBaseInfo->numOfCols < index) { - return NULL; - } - - return &pColumnBaseInfo->pColList[index]; -} - -void tscColumnBaseInfoUpdateTableIndex(SColumnBaseInfo* pColList, int16_t tableIndex) { - for (int32_t i = 0; i < pColList->numOfCols; ++i) { - pColList->pColList[i].colIndex.tableIndex = tableIndex; - } -} - -// todo refactor -SColumnBase* tscColumnBaseInfoInsert(SQueryInfo* pQueryInfo, SColumnIndex* pColIndex) { - SColumnBaseInfo* pcolList = &pQueryInfo->colList; - +SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) { // ignore the tbname column to be inserted into source list if (pColIndex->columnIndex < 0) { return NULL; } - + + size_t numOfCols = taosArrayGetSize(pColumnList); int16_t col = pColIndex->columnIndex; int32_t i = 0; - while (i < pcolList->numOfCols) { - if (pcolList->pColList[i].colIndex.columnIndex < col) { + while (i < numOfCols) { + SColumn* pCol = taosArrayGetP(pColumnList, i); + if (pCol->colIndex.columnIndex < col) { i++; - } else if (pcolList->pColList[i].colIndex.tableIndex < pColIndex->tableIndex) { + } else if (pCol->colIndex.tableIndex < pColIndex->tableIndex) { i++; } else { break; } } - SColumnIndex* pIndex = &pcolList->pColList[i].colIndex; - if ((i < pcolList->numOfCols && (pIndex->columnIndex > col || pIndex->tableIndex != pColIndex->tableIndex)) || - (i >= pcolList->numOfCols)) { - _cf_ensureSpace(pcolList, pcolList->numOfCols + 1); - _cf_evic(pcolList, i); - - pcolList->pColList[i].colIndex = *pColIndex; - pcolList->numOfCols++; + if (i >= numOfCols || numOfCols == 0) { + SColumn* b = calloc(1, sizeof(SColumn)); + b->colIndex = *pColIndex; + + taosArrayInsert(pColumnList, i, &b); + } else { + SColumn* pCol = taosArrayGetP(pColumnList, i); + + if (i < numOfCols && (pCol->colIndex.columnIndex > col || pCol->colIndex.tableIndex != pColIndex->tableIndex)) { + SColumn* b = calloc(1, sizeof(SColumn)); + b->colIndex = *pColIndex; + + taosArrayInsert(pColumnList, i, &b); + } } - return &pcolList->pColList[i]; + return taosArrayGetP(pColumnList, i); } -void tscColumnFilterInfoCopy(SColumnFilterInfo* dst, const SColumnFilterInfo* src) { - assert(src != NULL && dst != NULL); - - assert(src->filterOnBinary == 0 || src->filterOnBinary == 1); - if (src->lowerRelOptr == TSDB_RELATION_INVALID && src->upperRelOptr == TSDB_RELATION_INVALID) { - assert(0); +SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters) { + SColumnFilterInfo* pFilter = NULL; + if (numOfFilters > 0) { + pFilter = calloc(1, numOfFilters * sizeof(SColumnFilterInfo)); + } else { + assert(src == NULL); + return NULL; } - - *dst = *src; - if (dst->filterOnBinary) { - size_t len = (size_t)dst->len + 1; - char* pTmp = calloc(1, len); - dst->pz = (int64_t)pTmp; - memcpy((char*)dst->pz, (char*)src->pz, (size_t)len); + + memcpy(pFilter, src, sizeof(SColumnFilterInfo) * numOfFilters); + for (int32_t j = 0; j < numOfFilters; ++j) { + if (pFilter[j].filterstr) { + size_t len = (size_t) pFilter[j].len + 1; + + char* pTmp = calloc(1, len); + pFilter[j].pz = (int64_t) pTmp; + + memcpy((char*)pFilter[j].pz, (char*)src->pz, (size_t)len); + } } + + assert(src->filterstr == 0 || src->filterstr == 1); + assert(!(src->lowerRelOptr == TSDB_RELATION_INVALID && src->upperRelOptr == TSDB_RELATION_INVALID)); + + return pFilter; } -void tscColumnBaseCopy(SColumnBase* dst, const SColumnBase* src) { - assert(src != NULL && dst != NULL); - - *dst = *src; - - if (src->numOfFilters > 0) { - dst->filterInfo = calloc(1, src->numOfFilters * sizeof(SColumnFilterInfo)); - - for (int32_t j = 0; j < src->numOfFilters; ++j) { - tscColumnFilterInfoCopy(&dst->filterInfo[j], &src->filterInfo[j]); +static void destroyFilterInfo(SColumnFilterInfo* pFilterInfo, int32_t numOfFilters) { + for(int32_t i = 0; i < numOfFilters; ++i) { + if (pFilterInfo[i].filterstr) { + tfree(pFilterInfo[i].pz); } - } else { - assert(src->filterInfo == NULL); } + + tfree(pFilterInfo); } -void tscColumnBaseInfoCopy(SColumnBaseInfo* dst, const SColumnBaseInfo* src, int16_t tableIndex) { +SColumn* tscColumnClone(const SColumn* src) { + assert(src != NULL); + + SColumn* dst = calloc(1, sizeof(SColumn)); + + dst->colIndex = src->colIndex; + dst->numOfFilters = src->numOfFilters; + dst->filterInfo = tscFilterInfoClone(src->filterInfo, src->numOfFilters); + + return dst; +} + +static void tscColumnDestroy(SColumn* pCol) { + destroyFilterInfo(pCol->filterInfo, pCol->numOfFilters); + free(pCol); +} + +void tscColumnListAssign(SArray* dst, const SArray* src, int16_t tableIndex) { if (src == NULL) { return; } + + size_t num = taosArrayGetSize(src); + for (int32_t i = 0; i < num; ++i) { + SColumn* pCol = taosArrayGetP(src, i); - *dst = *src; - dst->pColList = calloc(1, sizeof(SColumnBase) * dst->numOfAlloc); - - int16_t num = 0; - for (int32_t i = 0; i < src->numOfCols; ++i) { - if (src->pColList[i].colIndex.tableIndex == tableIndex || tableIndex < 0) { - dst->pColList[num] = src->pColList[i]; - - if (dst->pColList[num].numOfFilters > 0) { - dst->pColList[num].filterInfo = calloc(1, dst->pColList[num].numOfFilters * sizeof(SColumnFilterInfo)); - - for (int32_t j = 0; j < dst->pColList[num].numOfFilters; ++j) { - tscColumnFilterInfoCopy(&dst->pColList[num].filterInfo[j], &src->pColList[i].filterInfo[j]); - } - } - - num += 1; + if (pCol->colIndex.tableIndex == tableIndex || tableIndex < 0) { + SColumn* p = tscColumnClone(pCol); + taosArrayPush(dst, &p); } } - - dst->numOfCols = num; } -void tscColumnBaseInfoDestroy(SColumnBaseInfo* pColumnBaseInfo) { +void tscColumnListDestroy(SArray* pColumnBaseInfo) { if (pColumnBaseInfo == NULL) { return; } - assert(pColumnBaseInfo->numOfCols <= TSDB_MAX_COLUMNS); - - for (int32_t i = 0; i < pColumnBaseInfo->numOfCols; ++i) { - SColumnBase* pColBase = &(pColumnBaseInfo->pColList[i]); - - if (pColBase->numOfFilters > 0) { - for (int32_t j = 0; j < pColBase->numOfFilters; ++j) { - assert(pColBase->filterInfo[j].filterOnBinary == 0 || pColBase->filterInfo[j].filterOnBinary == 1); - - if (pColBase->filterInfo[j].filterOnBinary) { - free((char*)pColBase->filterInfo[j].pz); - pColBase->filterInfo[j].pz = 0; - } - } - } - - tfree(pColBase->filterInfo); + size_t num = taosArrayGetSize(pColumnBaseInfo); + for (int32_t i = 0; i < num; ++i) { + SColumn* pCol = taosArrayGetP(pColumnBaseInfo, i); + tscColumnDestroy(pCol); } - tfree(pColumnBaseInfo->pColList); -} - -void tscColumnBaseInfoReserve(SColumnBaseInfo* pColumnBaseInfo, int32_t size) { - _cf_ensureSpace(pColumnBaseInfo, size); + taosArrayDestroy(pColumnBaseInfo); } /* @@ -1883,7 +1826,7 @@ static void doClearSubqueryInfo(SQueryInfo* pQueryInfo) { tscSqlExprInfoDestroy(&pQueryInfo->exprsInfo); memset(&pQueryInfo->exprsInfo, 0, sizeof(pQueryInfo->exprsInfo)); - tscColumnBaseInfoDestroy(&pQueryInfo->colList); + tscColumnListDestroy(pQueryInfo->colList); memset(&pQueryInfo->colList, 0, sizeof(pQueryInfo->colList)); pQueryInfo->tsBuf = tsBufDestory(pQueryInfo->tsBuf); @@ -2070,7 +2013,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void return NULL; } - tscColumnBaseInfoCopy(&pNewQueryInfo->colList, &pQueryInfo->colList, (int16_t)tableIndex); + tscColumnListAssign(pNewQueryInfo->colList, pQueryInfo->colList, (int16_t)tableIndex); // set the correct query type if (pPrevSql != NULL) { @@ -2149,11 +2092,13 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void } if (cmd == TSDB_SQL_SELECT) { + size_t size = taosArrayGetSize(pNewQueryInfo->colList); + tscTrace( "%p new subquery: %p, tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d," "fieldInfo:%d, name:%s, qrang:%" PRId64 " - %" PRId64 " order:%d, limit:%" PRId64, pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, - pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pFinalInfo->name, pNewQueryInfo->stime, + size, pNewQueryInfo->fieldsInfo.numOfOutputCols, pFinalInfo->name, pNewQueryInfo->stime, pNewQueryInfo->etime, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit); tscPrintSelectClause(pNew, 0); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index f292345e68..d460d100a0 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -413,7 +413,7 @@ typedef struct SSqlFunctionExpr { typedef struct SColumnFilterInfo { int16_t lowerRelOptr; int16_t upperRelOptr; - int16_t filterOnBinary; /* denote if current column is binary */ + int16_t filterstr; // denote if current column is char(binary/nchar) union { struct { diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 3e15257fcb..8d33e0bbf7 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -146,7 +146,7 @@ typedef struct STsdbQueryCond { STimeWindow twindow; int32_t order; // desc/asc order to iterate the data block int32_t numOfCols; - SColumnInfoData *colList; + SColumnInfo *colList; } STsdbQueryCond; typedef struct SBlockInfo { diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index c07897abf6..504d20f992 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -90,7 +90,7 @@ typedef struct SColumnFilterElem { } SColumnFilterElem; typedef struct SSingleColumnFilterInfo { - SColumnInfoData info; + SColumnInfo info; int32_t numOfFilters; SColumnFilterElem* pFilters; void* pData; @@ -129,14 +129,13 @@ typedef struct SQuery { int32_t rowSize; SSqlGroupbyExpr* pGroupbyExpr; SSqlFunctionExpr* pSelectExpr; - SColumnInfoData* colList; + SColumnInfo* colList; int32_t numOfFilterCols; int64_t* defaultVal; TSKEY lastKey; uint32_t status; // query status SResultRec rec; int32_t pos; - int64_t pointsOffset; // the number of points offset to save read data SData** sdata; SSingleColumnFilterInfo* pFilterInfo; } SQuery; diff --git a/src/query/inc/queryUtil.h b/src/query/inc/queryUtil.h index 677ddf6378..e21504b908 100644 --- a/src/query/inc/queryUtil.h +++ b/src/query/inc/queryUtil.h @@ -38,4 +38,9 @@ void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTa char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWindowResult *pResult); +__filter_func_t *getRangeFilterFuncArray(int32_t type); +__filter_func_t *getValueFilterFuncArray(int32_t type); + +bool supportPrefilter(int32_t type); + #endif // TDENGINE_QUERYUTIL_H diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index c1208dbc0b..aefe4a3b8b 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -27,6 +27,7 @@ #include "tscompression.h" #include "tsdbMain.h" //todo use TableId instead of STable object #include "ttime.h" +#include "tscUtil.h" // todo move the function to common module #define DEFAULT_INTERN_BUF_SIZE 16384L @@ -52,8 +53,8 @@ /* get the qinfo struct address from the query struct address */ #define GET_COLUMN_BYTES(query, colidx) \ - ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIndex].info.bytes) -#define GET_COLUMN_TYPE(query, colidx) ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIndex].info.type) + ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIndex].bytes) +#define GET_COLUMN_TYPE(query, colidx) ((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIndex].type) typedef struct SPointInterpoSupporter { int32_t numOfCols; @@ -223,19 +224,19 @@ bool getNeighborPoints(SQInfo *pQInfo, void *pMeterObj, SPointInterpoSupporter * return true; } -bool vnodeDoFilterData(SQuery *pQuery, int32_t elemPos) { +bool doFilterData(SQuery *pQuery, int32_t elemPos) { for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) { SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k]; - char * pElem = pFilterInfo->pData + pFilterInfo->info.info.bytes * elemPos; - - if (isNull(pElem, pFilterInfo->info.info.type)) { + + char *pElem = pFilterInfo->pData + pFilterInfo->info.bytes * elemPos; + if (isNull(pElem, pFilterInfo->info.type)) { return false; } - int32_t num = pFilterInfo->numOfFilters; - bool qualified = false; - for (int32_t j = 0; j < num; ++j) { + bool qualified = false; + for (int32_t j = 0; j < pFilterInfo->numOfFilters; ++j) { SColumnFilterElem *pFilterElem = &pFilterInfo->pFilters[j]; + if (pFilterElem->fp(pFilterElem, pElem, pElem)) { qualified = true; break; @@ -252,7 +253,7 @@ bool vnodeDoFilterData(SQuery *pQuery, int32_t elemPos) { bool vnodeFilterData(SQuery *pQuery, int32_t *numOfActualRead, int32_t index) { (*numOfActualRead)++; - if (!vnodeDoFilterData(pQuery, index)) { + if (!doFilterData(pQuery, index)) { return false; } @@ -333,8 +334,8 @@ int16_t getGroupbyColumnType(SQuery *pQuery, SSqlGroupbyExpr *pGroupbyExpr) { } for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - if (colId == pQuery->colList[i].info.colId) { - type = pQuery->colList[i].info.type; + if (colId == pQuery->colList[i].colId) { + type = pQuery->colList[i].type; break; } } @@ -367,7 +368,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) { bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].pBase.functionId == TSDB_FUNC_TS_COMP; } -bool doRevisedResultsByLimit(SQInfo *pQInfo) { +bool limitResults(SQInfo *pQInfo) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; if ((pQuery->limit.limit > 0) && (pQuery->rec.total + pQuery->rec.rows > pQuery->limit.limit)) { @@ -848,10 +849,9 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3 } for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - SColumnInfo *pColMsg = &pQuery->colList[i].info; + SColumnInfo *pColMsg = &pQuery->colList[i]; assert(0); - // char * pData = doGetDataBlocks(pQuery, pRuntimeEnv->colDataBuffer, pQuery->colList[i].colIdxInBuf); - +// char * pData = doGetDataBlocks(pQuery, pRuntimeEnv->colDataBuffer, pQuery->colList[i].colIdxInBuf); sas->elemSize[i] = pColMsg->bytes; // sas->data[i] = pData + pCtx->startOffset * sas->elemSize[i]; // start from the offset } @@ -860,7 +860,7 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3 sas->offset = 0; } else { // other type of query function SColIndex *pCol = &pQuery->pSelectExpr[col].pBase.colInfo; - if (TSDB_COL_IS_TAG(pCol->flag)) { + if (TSDB_COL_IS_TAG(pCol->flag) || pDataBlock == NULL) { dataBlock = NULL; } else { /* @@ -868,11 +868,8 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3 * stage, the remain meter may not have the required column in cache actually. So, the validation of required * column in cache with the corresponding meter schema is reinforced. */ - if (pDataBlock == NULL) { - return NULL; - } - int32_t numOfCols = taosArrayGetSize(pDataBlock); + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData *p = taosArrayGet(pDataBlock, i); if (pCol->colId == p->info.colId) { @@ -896,7 +893,7 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3 * @return the incremental number of output value, so it maybe 0 for fixed number of query, * such as count/min/max etc. */ -static void blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, +static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, SWindowResInfo *pWindowResInfo, __block_search_fn_t searchFn, SArray *pDataBlock) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; @@ -1021,7 +1018,7 @@ static UNUSED_FUNC char *getGroupbyColumnData(SQuery *pQuery, SData **data, int1 int32_t colId = pGroupbyExpr->columnInfo[k].colId; for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - if (pQuery->colList[i].info.colId == colId) { + if (pQuery->colList[i].colId == colId) { colIndex = i; break; } @@ -1029,8 +1026,8 @@ static UNUSED_FUNC char *getGroupbyColumnData(SQuery *pQuery, SData **data, int1 assert(colIndex >= 0 && colIndex < pQuery->numOfCols); - *type = pQuery->colList[colIndex].info.type; - *bytes = pQuery->colList[colIndex].info.bytes; + *type = pQuery->colList[colIndex].type; + *bytes = pQuery->colList[colIndex].bytes; // groupbyColumnData = doGetDataBlocks(pQuery, data, pQuery->colList[colIndex].inf); break; @@ -1093,22 +1090,14 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx return true; } -static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, - SDataBlockInfo *pDataBlockInfo, SWindowResInfo *pWindowResInfo, - SArray *pDataBlock) { +static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, + SWindowResInfo *pWindowResInfo, SArray *pDataBlock) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; - SQuery * pQuery = pRuntimeEnv->pQuery; - TSKEY * primaryKeyCol = (TSKEY *)taosArrayGet(pDataBlock, 0); - - // SData **data = pRuntimeEnv->colDataBuffer; - - int64_t prevNumOfRes = 0; - bool groupbyStateValue = isGroupbyNormalCol(pQuery->pGroupbyExpr); - - if (!groupbyStateValue) { - prevNumOfRes = getNumOfResult(pRuntimeEnv); - } + + SQuery *pQuery = pRuntimeEnv->pQuery; + TSKEY *primaryKeyCol = (TSKEY*) ((SColumnInfoData *)taosArrayGet(pDataBlock, 0))->pData; + bool groupbyStateValue = isGroupbyNormalCol(pQuery->pGroupbyExpr); SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutputCols, sizeof(SArithmeticSupport)); int16_t type = 0; @@ -1134,16 +1123,10 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat // set the input column data for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) { - // SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k]; - assert(0); - /* - * NOTE: here the tbname/tags column cannot reach here, since it will never be a filter column, - * so we do NOT check if is a tag or not - */ - // pFilterInfo->pData = doGetDataBlocks(pQuery, data, pFilterInfo->info.colIdxInBuf); + SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k]; + pFilterInfo->pData = getDataBlocks(pRuntimeEnv, &sasArray[k], pFilterInfo->info.colId, pDataBlockInfo->rows, pDataBlock); } - int32_t numOfRes = 0; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); // from top to bottom in desc @@ -1171,7 +1154,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat } } - if (pQuery->numOfFilterCols > 0 && (!vnodeDoFilterData(pQuery, offset))) { + if (pQuery->numOfFilterCols > 0 && (!doFilterData(pQuery, offset))) { continue; } @@ -1181,9 +1164,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat int64_t ts = primaryKeyCol[offset]; STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); - assert(0); - int32_t ret = 0; - // int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pTabObj->sid, &win); + int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->sid, &win); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code continue; } @@ -1197,8 +1178,6 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat lastKey = ts; STimeWindow nextWin = win; int32_t index = pWindowResInfo->curIndex; - assert(0); - int32_t sid = 0; // pRuntimeEnv->pTabObj->sid; while (1) { getNextTimeWindow(pQuery, &nextWin); @@ -1213,7 +1192,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat } // null data, failed to allocate more memory buffer - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, sid, &nextWin) != TSDB_CODE_SUCCESS) { + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->sid, &nextWin) != TSDB_CODE_SUCCESS) { break; } @@ -1254,54 +1233,10 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat break; } } - - /* - * pointsOffset is the maximum available space in result buffer update the actual forward step for query that - * requires checking buffer during loop - */ - if ((pQuery->checkBuffer == 1) && (++numOfRes) >= pQuery->pointsOffset) { - pQuery->lastKey = lastKey + step; - assert(0); - // *forwardStep = j + 1; - break; - } } - + + pQuery->lastKey = lastKey + step; free(sasArray); - - /* - * No need to calculate the number of output results for group-by normal columns, interval query - * because the results of group by normal column is put into intermediate buffer. - */ - int32_t num = 0; - if (!groupbyStateValue && !isIntervalQuery(pQuery)) { - num = getNumOfResult(pRuntimeEnv) - prevNumOfRes; - } - - return num; -} - -static UNUSED_FUNC int32_t reviseForwardSteps(SQueryRuntimeEnv *pRuntimeEnv, int32_t forwardStep) { - /* - * 1. If value filter exists, we try all data in current block, and do not set the QUERY_RESBUF_FULL flag. - * - * 2. In case of top/bottom/ts_comp query, the checkBuffer == 1 and pQuery->numOfFilterCols - * may be 0 or not. We do not check the capacity of output buffer, since the filter function will do it. - * - * 3. In handling the query of secondary query of join, tsBuf servers as a ts filter. - */ - SQuery *pQuery = pRuntimeEnv->pQuery; - - if (isTopBottomQuery(pQuery) || isTSCompQuery(pQuery) || pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) { - return forwardStep; - } - - // current buffer does not have enough space, try in the next loop - if ((pQuery->checkBuffer == 1) && (pQuery->pointsOffset <= forwardStep)) { - forwardStep = pQuery->pointsOffset; - } - - return forwardStep; } static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, @@ -1310,9 +1245,9 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl SQuery *pQuery = pRuntimeEnv->pQuery; if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { - /*numOfRes = */ rowwiseApplyAllFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock); + rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock); } else { - blockwiseApplyAllFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock); + blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock); } TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey; @@ -1854,7 +1789,7 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) { bool vnodeParametersSafetyCheck(SQuery *pQuery) { // load data column information is incorrect for (int32_t i = 0; i < pQuery->numOfCols - 1; ++i) { - if (pQuery->colList[i].info.colId == pQuery->colList[i + 1].info.colId) { + if (pQuery->colList[i].colId == pQuery->colList[i + 1].colId) { qError("QInfo:%p invalid data load column for query", GET_QINFO_ADDR(pQuery)); return false; } @@ -2133,7 +2068,7 @@ void pointInterpSupporterInit(SQuery *pQuery, SPointInterpoSupporter *pInterpoSu /* get appropriated size for one row data source*/ int32_t len = 0; for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - len += pQuery->colList[i].info.bytes; + len += pQuery->colList[i].bytes; } // assert(PRIMARY_TSCOL_LOADED(pQuery)); @@ -2147,7 +2082,7 @@ void pointInterpSupporterInit(SQuery *pQuery, SPointInterpoSupporter *pInterpoSu pInterpoSupport->pPrevPoint[i] = prev + offset; pInterpoSupport->pNextPoint[i] = next + offset; - offset += pQuery->colList[i].info.bytes; + offset += pQuery->colList[i].bytes; } } } @@ -3252,7 +3187,7 @@ void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { } } -void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) { +void skipResults(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; if (pQuery->rec.rows == 0 || pQuery->limit.offset == 0) { return; @@ -3260,23 +3195,21 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) { if (pQuery->rec.rows <= pQuery->limit.offset) { pQuery->limit.offset -= pQuery->rec.rows; - pQuery->rec.rows = 0; - // pQuery->pointsOffset = pQuery->rec.pointsToRead; // clear all data in result buffer resetCtxOutputBuf(pRuntimeEnv); // clear the buffer is full flag if exists pQuery->status &= (~QUERY_RESBUF_FULL); } else { - int32_t numOfSkip = (int32_t)pQuery->limit.offset; + int32_t numOfSkip = (int32_t) pQuery->limit.offset; pQuery->rec.rows -= numOfSkip; for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; - assert(0); - // memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->size * bytes); + + memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->rec.rows * bytes); pRuntimeEnv->pCtx[i].aOutputBuf += bytes * numOfSkip; if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { @@ -3864,9 +3797,9 @@ void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, STableDataInfo * SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo; if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) { - // numOfRes = rowwiseApplyAllFunctions(pRuntimeEnv, &forwardStep, pFields, pDataBlockInfo, pWindowResInfo); + rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock); } else { - blockwiseApplyAllFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock); + blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock); } updateWindowResNumOfRes(pRuntimeEnv, pTableDataInfo); @@ -4667,10 +4600,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) { scanAllDataBlocks(pRuntimeEnv); pQuery->rec.rows = getNumOfResult(pRuntimeEnv); - doSkipResults(pRuntimeEnv); + skipResults(pRuntimeEnv); // the limitation of output result is reached, set the query completed - if (doRevisedResultsByLimit(pQInfo)) { + if (limitResults(pQInfo)) { pQInfo->tableIndex = pQInfo->groupInfo.numOfTables; break; } @@ -4963,8 +4896,8 @@ static void tableFixedOutputProcess(SQInfo *pQInfo) { assert(isTopBottomQuery(pQuery)); } - doSkipResults(pRuntimeEnv); - doRevisedResultsByLimit(pQInfo); + skipResults(pRuntimeEnv); + limitResults(pQInfo); } static void tableMultiOutputProcess(SQInfo *pQInfo) { @@ -4993,7 +4926,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo) { pQuery->rec.rows = getNumOfResult(pRuntimeEnv); if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols > 0 && pQuery->rec.rows > 0) { - doSkipResults(pRuntimeEnv); + skipResults(pRuntimeEnv); } /* @@ -5010,7 +4943,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo) { resetCtxOutputBuf(pRuntimeEnv); } - doRevisedResultsByLimit(pQInfo); + limitResults(pQInfo); if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { qTrace("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo, pQuery->lastKey, pQuery->window.ekey); @@ -5079,7 +5012,7 @@ static void tableIntervalProcess(SQInfo *pQInfo) { // the offset is handled at prepare stage if no interpolation involved if (pQuery->interpoType == TSDB_INTERPO_NONE) { - doRevisedResultsByLimit(pQInfo); + limitResults(pQInfo); break; } else { taosInterpoSetStartInfo(&pRuntimeEnv->interpoInfo, pQuery->rec.rows, pQuery->interpoType); @@ -5095,7 +5028,7 @@ static void tableIntervalProcess(SQInfo *pQInfo) { qTrace("QInfo: %p interpo completed, final:%d", pQInfo, pQuery->rec.rows); if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - doRevisedResultsByLimit(pQInfo); + limitResults(pQInfo); break; } @@ -5129,7 +5062,7 @@ static void tableQueryImpl(SQInfo *pQInfo) { pQuery->rec.rows = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pRuntimeEnv->pInterpoBuf, remain, &numOfInterpo); - doRevisedResultsByLimit(pQInfo); + limitResults(pQInfo); pQInfo->pointsInterpo += numOfInterpo; qTrace("QInfo:%p current:%d returned, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total); @@ -5361,11 +5294,11 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SColumnFilterInfo *pFilterInfo = (SColumnFilterInfo *)pMsg; SColumnFilterInfo *pDestFilterInfo = &pColInfo->filters[f]; - pDestFilterInfo->filterOnBinary = htons(pFilterInfo->filterOnBinary); + pDestFilterInfo->filterstr = htons(pFilterInfo->filterstr); pMsg += sizeof(SColumnFilterInfo); - if (pDestFilterInfo->filterOnBinary) { + if (pDestFilterInfo->filterstr) { pDestFilterInfo->len = htobe64(pFilterInfo->len); pDestFilterInfo->pz = (int64_t)calloc(1, pDestFilterInfo->len + 1); @@ -5635,9 +5568,9 @@ static SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SCol return pGroupbyExpr; } -static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) { +static int32_t createFilterInfo(void *pQInfo, SQuery *pQuery) { for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - if (pQuery->colList[i].info.numOfFilters > 0) { + if (pQuery->colList[i].numOfFilters > 0) { pQuery->numOfFilterCols++; } } @@ -5649,18 +5582,18 @@ static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) { pQuery->pFilterInfo = calloc(1, sizeof(SSingleColumnFilterInfo) * pQuery->numOfFilterCols); for (int32_t i = 0, j = 0; i < pQuery->numOfCols; ++i) { - if (pQuery->colList[i].info.numOfFilters > 0) { + if (pQuery->colList[i].numOfFilters > 0) { SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[j]; memcpy(&pFilterInfo->info, &pQuery->colList[i], sizeof(SColumnInfoData)); - pFilterInfo->info.info.filters = NULL; - - pFilterInfo->numOfFilters = pQuery->colList[i].info.numOfFilters; + pFilterInfo->info = pQuery->colList[i]; + + pFilterInfo->numOfFilters = pQuery->colList[i].numOfFilters; pFilterInfo->pFilters = calloc(pFilterInfo->numOfFilters, sizeof(SColumnFilterElem)); for (int32_t f = 0; f < pFilterInfo->numOfFilters; ++f) { SColumnFilterElem *pSingleColFilter = &pFilterInfo->pFilters[f]; - pSingleColFilter->filterInfo = pQuery->colList[i].info.filters[f]; + pSingleColFilter->filterInfo = pQuery->colList[i].filters[f]; int32_t lower = pSingleColFilter->filterInfo.lowerRelOptr; int32_t upper = pSingleColFilter->filterInfo.upperRelOptr; @@ -5670,11 +5603,12 @@ static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) { return TSDB_CODE_INVALID_QUERY_MSG; } - int16_t type = pQuery->colList[i].info.type; - int16_t bytes = pQuery->colList[i].info.bytes; + int16_t type = pQuery->colList[i].type; + int16_t bytes = pQuery->colList[i].bytes; - __filter_func_t *rangeFilterArray = NULL; // vnodeGetRangeFilterFuncArray(type); - __filter_func_t *filterArray = NULL; // vnodeGetValueFilterFuncArray(type); + // todo refactor + __filter_func_t *rangeFilterArray = getRangeFilterFuncArray(type); + __filter_func_t *filterArray = getValueFilterFuncArray(type); if (rangeFilterArray == NULL && filterArray == NULL) { qError("QInfo:%p failed to get filter function, invalid data type:%d", pQInfo, type); @@ -5744,7 +5678,7 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) { SColIndex *pColIndexEx = &pSqlExprMsg->colInfo; for (int32_t f = 0; f < pQuery->numOfCols; ++f) { - if (pColIndexEx->colId == pQuery->colList[f].info.colId) { + if (pColIndexEx->colId == pQuery->colList[f].colId) { pColIndexEx->colIndex = f; break; } @@ -5765,44 +5699,29 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou int16_t numOfCols = pQueryMsg->numOfCols; int16_t numOfOutputCols = pQueryMsg->numOfOutputCols; - pQuery->numOfCols = numOfCols; + pQuery->numOfCols = numOfCols; pQuery->numOfOutputCols = numOfOutputCols; - - pQuery->limit.limit = pQueryMsg->limit; - pQuery->limit.offset = pQueryMsg->offset; - - pQuery->order.order = pQueryMsg->order; + pQuery->limit.limit = pQueryMsg->limit; + pQuery->limit.offset = pQueryMsg->offset; + pQuery->order.order = pQueryMsg->order; pQuery->order.orderColId = pQueryMsg->orderColId; - - pQuery->pSelectExpr = pExprs; - pQuery->pGroupbyExpr = pGroupbyExpr; - - pQuery->intervalTime = pQueryMsg->intervalTime; - - pQuery->slidingTime = pQueryMsg->slidingTime; + pQuery->pSelectExpr = pExprs; + pQuery->pGroupbyExpr = pGroupbyExpr; + pQuery->intervalTime = pQueryMsg->intervalTime; + pQuery->slidingTime = pQueryMsg->slidingTime; pQuery->slidingTimeUnit = pQueryMsg->slidingTimeUnit; - - pQuery->interpoType = pQueryMsg->interpoType; + pQuery->interpoType = pQueryMsg->interpoType; pQuery->colList = calloc(1, sizeof(SSingleColumnFilterInfo) * numOfCols); if (pQuery->colList == NULL) { - goto _clean_memory; + goto _cleanup; } for (int16_t i = 0; i < numOfCols; ++i) { - pQuery->colList[i].info = pQueryMsg->colList[i]; + pQuery->colList[i] = pQueryMsg->colList[i]; - SColumnInfo *pColInfo = &pQuery->colList[i].info; - pColInfo->filters = NULL; - // if (colList[i].numOfFilters > 0) { - // pColInfo->filters = calloc(1, colList[i].numOfFilters * sizeof(SColumnFilterInfo)); - // - // for (int32_t j = 0; j < colList[i].numOfFilters; ++j) { - // tscColumnFilterInfoCopy(&pColInfo->filters[j], &colList[i].filters[j]); - // } - // } else { - // pQuery->colList[i].data.filters = NULL; - // } + SColumnInfo *pColInfo = &pQuery->colList[i]; + pColInfo->filters = tscFilterInfoClone(pQueryMsg->colList[i].filters, pColInfo->numOfFilters); } // calculate the result row size @@ -5813,15 +5732,15 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou doUpdateExprColumnIndex(pQuery); - int32_t ret = vnodeCreateFilterInfo(pQInfo, pQuery); + int32_t ret = createFilterInfo(pQInfo, pQuery); if (ret != TSDB_CODE_SUCCESS) { - goto _clean_memory; + goto _cleanup; } // prepare the result buffer pQuery->sdata = (SData **)calloc(pQuery->numOfOutputCols, POINTER_BYTES); if (pQuery->sdata == NULL) { - goto _clean_memory; + goto _cleanup; } // set the output buffer capacity @@ -5835,14 +5754,14 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou size_t size = (pQuery->rec.capacity + 1) * pExprs[col].resBytes + pExprs[col].interResBytes + sizeof(SData); pQuery->sdata[col] = (SData *)calloc(1, size); if (pQuery->sdata[col] == NULL) { - goto _clean_memory; + goto _cleanup; } } if (pQuery->interpoType != TSDB_INTERPO_NONE) { pQuery->defaultVal = malloc(sizeof(int64_t) * pQuery->numOfOutputCols); if (pQuery->defaultVal == NULL) { - goto _clean_memory; + goto _cleanup; } // the first column is the timestamp @@ -5861,7 +5780,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou if (sem_init(&pQInfo->dataReady, 0, 0) != 0) { qError("QInfo:%p init dataReady sem failed, reason:%s", pQInfo, strerror(errno)); - goto _clean_memory; + goto _cleanup; } vnodeParametersSafetyCheck(pQuery); @@ -5869,7 +5788,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou qTrace("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo); return pQInfo; -_clean_memory: +_cleanup: tfree(pQuery->defaultVal); if (pQuery->sdata != NULL) { @@ -6082,7 +6001,6 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) goto _query_over; } - // todo check vnode status if (pTableIdList == NULL || taosArrayGetSize(pTableIdList) == 0) { qError("qmsg:%p, SQueryTableMsg wrong format", pQueryMsg); code = TSDB_CODE_INVALID_QUERY_MSG; diff --git a/src/query/src/queryFilterFunc.c b/src/query/src/queryFilterFunc.c new file mode 100644 index 0000000000..3218b26179 --- /dev/null +++ b/src/query/src/queryFilterFunc.c @@ -0,0 +1,558 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" + +#include "taosmsg.h" +#include "tsqlfunction.h" +#include "queryExecutor.h" +#include "tcompare.h" + +bool less_i8(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int8_t *)minval < pFilter->filterInfo.upperBndi); +} + +bool less_i16(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int16_t *)minval < pFilter->filterInfo.upperBndi); +} + +bool less_i32(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int32_t *)minval < pFilter->filterInfo.upperBndi); +} + +bool less_i64(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int64_t *)minval < pFilter->filterInfo.upperBndi); +} + +bool less_ds(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(float *)minval < pFilter->filterInfo.upperBndd); +} + +bool less_dd(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(double *)minval < pFilter->filterInfo.upperBndd); +} + +////////////////////////////////////////////////////////////////// +bool large_i8(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int8_t *)maxval > pFilter->filterInfo.lowerBndi); +} + +bool large_i16(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int16_t *)maxval > pFilter->filterInfo.lowerBndi); +} + +bool large_i32(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int32_t *)maxval > pFilter->filterInfo.lowerBndi); +} + +bool large_i64(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int64_t *)maxval > pFilter->filterInfo.lowerBndi); +} + +bool large_ds(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(float *)maxval > pFilter->filterInfo.lowerBndd); +} + +bool large_dd(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(double *)maxval > pFilter->filterInfo.lowerBndd); +} +///////////////////////////////////////////////////////////////////// + +bool lessEqual_i8(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int8_t *)minval <= pFilter->filterInfo.upperBndi); +} + +bool lessEqual_i16(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int16_t *)minval <= pFilter->filterInfo.upperBndi); +} + +bool lessEqual_i32(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int32_t *)minval <= pFilter->filterInfo.upperBndi); +} + +bool lessEqual_i64(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int64_t *)minval <= pFilter->filterInfo.upperBndi); +} + +bool lessEqual_ds(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(float *)minval <= pFilter->filterInfo.upperBndd); +} + +bool lessEqual_dd(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(double *)minval <= pFilter->filterInfo.upperBndd); +} + +////////////////////////////////////////////////////////////////////////// +bool largeEqual_i8(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int8_t *)maxval >= pFilter->filterInfo.lowerBndi); +} + +bool largeEqual_i16(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int16_t *)maxval >= pFilter->filterInfo.lowerBndi); +} + +bool largeEqual_i32(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int32_t *)maxval >= pFilter->filterInfo.lowerBndi); +} + +bool largeEqual_i64(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int64_t *)maxval >= pFilter->filterInfo.lowerBndi); +} + +bool largeEqual_ds(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(float *)maxval >= pFilter->filterInfo.lowerBndd); +} + +bool largeEqual_dd(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(double *)maxval >= pFilter->filterInfo.lowerBndd); +} + +//////////////////////////////////////////////////////////////////////// + +bool equal_i8(SColumnFilterElem *pFilter, char *minval, char *maxval) { + if (*(int8_t *)minval == *(int8_t *)maxval) { + return (*(int8_t *)minval == pFilter->filterInfo.lowerBndi); + } else { /* range filter */ + assert(*(int8_t *)minval < *(int8_t *)maxval); + + return *(int8_t *)minval <= pFilter->filterInfo.lowerBndi && *(int8_t *)maxval >= pFilter->filterInfo.lowerBndi; + } +} + +bool equal_i16(SColumnFilterElem *pFilter, char *minval, char *maxval) { + if (*(int16_t *)minval == *(int16_t *)maxval) { + return (*(int16_t *)minval == pFilter->filterInfo.lowerBndi); + } else { /* range filter */ + assert(*(int16_t *)minval < *(int16_t *)maxval); + + return *(int16_t *)minval <= pFilter->filterInfo.lowerBndi && *(int16_t *)maxval >= pFilter->filterInfo.lowerBndi; + } +} + +bool equal_i32(SColumnFilterElem *pFilter, char *minval, char *maxval) { + if (*(int32_t *)minval == *(int32_t *)maxval) { + return (*(int32_t *)minval == pFilter->filterInfo.lowerBndi); + } else { /* range filter */ + assert(*(int32_t *)minval < *(int32_t *)maxval); + + return *(int32_t *)minval <= pFilter->filterInfo.lowerBndi && *(int32_t *)maxval >= pFilter->filterInfo.lowerBndi; + } +} + +bool equal_i64(SColumnFilterElem *pFilter, char *minval, char *maxval) { + if (*(int64_t *)minval == *(int64_t *)maxval) { + return (*(int64_t *)minval == pFilter->filterInfo.lowerBndi); + } else { /* range filter */ + assert(*(int64_t *)minval < *(int64_t *)maxval); + + return *(int64_t *)minval <= pFilter->filterInfo.lowerBndi && *(int64_t *)maxval >= pFilter->filterInfo.lowerBndi; + } +} + +bool equal_ds(SColumnFilterElem *pFilter, char *minval, char *maxval) { + if (*(float *)minval == *(float *)maxval) { + return (fabs(*(float *)minval - pFilter->filterInfo.lowerBndd) <= FLT_EPSILON); + } else { /* range filter */ + assert(*(float *)minval < *(float *)maxval); + return *(float *)minval <= pFilter->filterInfo.lowerBndd && *(float *)maxval >= pFilter->filterInfo.lowerBndd; + } +} + +bool equal_dd(SColumnFilterElem *pFilter, char *minval, char *maxval) { + if (*(double *)minval == *(double *)maxval) { + return (*(double *)minval == pFilter->filterInfo.lowerBndd); + } else { /* range filter */ + assert(*(double *)minval < *(double *)maxval); + + return *(double *)minval <= pFilter->filterInfo.lowerBndi && *(double *)maxval >= pFilter->filterInfo.lowerBndi; + } +} + +bool equal_str(SColumnFilterElem *pFilter, char *minval, char *maxval) { + // query condition string is greater than the max length of string, not qualified data + if (pFilter->filterInfo.len > pFilter->bytes) { + return false; + } + + return strncmp((char *)pFilter->filterInfo.pz, minval, pFilter->bytes) == 0; +} + +bool equal_nchar(SColumnFilterElem *pFilter, char *minval, char *maxval) { + // query condition string is greater than the max length of string, not qualified data + if (pFilter->filterInfo.len > pFilter->bytes) { + return false; + } + + return wcsncmp((wchar_t *)pFilter->filterInfo.pz, (wchar_t*) minval, pFilter->bytes/TSDB_NCHAR_SIZE) == 0; +} + +//////////////////////////////////////////////////////////////// +bool like_str(SColumnFilterElem *pFilter, char *minval, char *maxval) { + SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; + + return patternMatch((char *)pFilter->filterInfo.pz, minval, pFilter->bytes, &info) == TSDB_PATTERN_MATCH; +} + +bool like_nchar(SColumnFilterElem* pFilter, char* minval, char *maxval) { + SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; + + return WCSPatternMatch((wchar_t*) pFilter->filterInfo.pz, (wchar_t*) minval, pFilter->bytes/TSDB_NCHAR_SIZE, &info) == TSDB_PATTERN_MATCH; +} + +//////////////////////////////////////////////////////////////// +/** + * If minval equals to maxval, it may serve as the one element filter, + * or all elements of an array are identical during pref-filter stage. + * Otherwise, it must be pre-filter of array list of elements. + * + * During pre-filter stage, if there is one element that locates in [minval, maxval], + * the filter function will return true. + */ +bool nequal_i8(SColumnFilterElem *pFilter, char *minval, char *maxval) { + if (*(int8_t *)minval == *(int8_t *)maxval) { + return (*(int8_t *)minval != pFilter->filterInfo.lowerBndi); + } + + return true; +} + +bool nequal_i16(SColumnFilterElem *pFilter, char *minval, char *maxval) { + if (*(int16_t *)minval == *(int16_t *)maxval) { + return (*(int16_t *)minval != pFilter->filterInfo.lowerBndi); + } + + return true; +} + +bool nequal_i32(SColumnFilterElem *pFilter, char *minval, char *maxval) { + if (*(int32_t *)minval == *(int32_t *)maxval) { + return (*(int32_t *)minval != pFilter->filterInfo.lowerBndi); + } + + return true; +} + +bool nequal_i64(SColumnFilterElem *pFilter, char *minval, char *maxval) { + if (*(int64_t *)minval == *(int64_t *)maxval) { + return (*(int64_t *)minval != pFilter->filterInfo.lowerBndi); + } + + return true; +} + +bool nequal_ds(SColumnFilterElem *pFilter, char *minval, char *maxval) { + if (*(float *)minval == *(float *)maxval) { + return (*(float *)minval != pFilter->filterInfo.lowerBndd); + } + + return true; +} + +bool nequal_dd(SColumnFilterElem *pFilter, char *minval, char *maxval) { + if (*(double *)minval == *(double *)maxval) { + return (*(double *)minval != pFilter->filterInfo.lowerBndd); + } + + return true; +} + +bool nequal_str(SColumnFilterElem *pFilter, char *minval, char *maxval) { + if (pFilter->filterInfo.len > pFilter->bytes) { + return true; + } + + return strncmp((char *)pFilter->filterInfo.pz, minval, pFilter->bytes) != 0; +} + +bool nequal_nchar(SColumnFilterElem *pFilter, char* minval, char *maxval) { + if (pFilter->filterInfo.len > pFilter->bytes) { + return true; + } + + return wcsncmp((wchar_t *)pFilter->filterInfo.pz, (wchar_t*)minval, pFilter->bytes/TSDB_NCHAR_SIZE) != 0; +} + +//////////////////////////////////////////////////////////////// + +bool rangeFilter_i32_ii(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int32_t *)minval <= pFilter->filterInfo.upperBndi && *(int32_t *)maxval >= pFilter->filterInfo.lowerBndi); +} + +bool rangeFilter_i32_ee(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int32_t *)minvalfilterInfo.upperBndi &&*(int32_t *)maxval> pFilter->filterInfo.lowerBndi); +} + +bool rangeFilter_i32_ie(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int32_t *)minval < pFilter->filterInfo.upperBndi && *(int32_t *)maxval >= pFilter->filterInfo.lowerBndi); +} + +bool rangeFilter_i32_ei(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int32_t *)minval <= pFilter->filterInfo.upperBndi && *(int32_t *)maxval > pFilter->filterInfo.lowerBndi); +} + +/////////////////////////////////////////////////////////////////////////////// +bool rangeFilter_i8_ii(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int8_t *)minval <= pFilter->filterInfo.upperBndi && *(int8_t *)maxval >= pFilter->filterInfo.lowerBndi); +} + +bool rangeFilter_i8_ee(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int8_t *)minvalfilterInfo.upperBndi &&*(int8_t *)maxval> pFilter->filterInfo.lowerBndi); +} + +bool rangeFilter_i8_ie(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int8_t *)minval < pFilter->filterInfo.upperBndi && *(int8_t *)maxval >= pFilter->filterInfo.lowerBndi); +} + +bool rangeFilter_i8_ei(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int8_t *)minval <= pFilter->filterInfo.upperBndi && *(int8_t *)maxval > pFilter->filterInfo.lowerBndi); +} + +///////////////////////////////////////////////////////////////////////////////////// +bool rangeFilter_i16_ii(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int16_t *)minval <= pFilter->filterInfo.upperBndi && *(int16_t *)maxval >= pFilter->filterInfo.lowerBndi); +} + +bool rangeFilter_i16_ee(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int16_t *)minvalfilterInfo.upperBndi &&*(int16_t *)maxval> pFilter->filterInfo.lowerBndi); +} + +bool rangeFilter_i16_ie(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int16_t *)minval < pFilter->filterInfo.upperBndi && *(int16_t *)maxval >= pFilter->filterInfo.lowerBndi); +} + +bool rangeFilter_i16_ei(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int16_t *)minval <= pFilter->filterInfo.upperBndi && *(int16_t *)maxval > pFilter->filterInfo.lowerBndi); +} + +//////////////////////////////////////////////////////////////////////// +bool rangeFilter_i64_ii(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int64_t *)minval <= pFilter->filterInfo.upperBndi && *(int64_t *)maxval >= pFilter->filterInfo.lowerBndi); +} + +bool rangeFilter_i64_ee(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int64_t *)minvalfilterInfo.upperBndi &&*(int64_t *)maxval> pFilter->filterInfo.lowerBndi); +} + +bool rangeFilter_i64_ie(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int64_t *)minval < pFilter->filterInfo.upperBndi && *(int64_t *)maxval >= pFilter->filterInfo.lowerBndi); +} + +bool rangeFilter_i64_ei(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(int64_t *)minval <= pFilter->filterInfo.upperBndi && *(int64_t *)maxval > pFilter->filterInfo.lowerBndi); +} + +//////////////////////////////////////////////////////////////////////// +bool rangeFilter_ds_ii(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(float *)minval <= pFilter->filterInfo.upperBndd && *(float *)maxval >= pFilter->filterInfo.lowerBndd); +} + +bool rangeFilter_ds_ee(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(float *)minvalfilterInfo.upperBndd &&*(float *)maxval> pFilter->filterInfo.lowerBndd); +} + +bool rangeFilter_ds_ie(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(float *)minval < pFilter->filterInfo.upperBndd && *(float *)maxval >= pFilter->filterInfo.lowerBndd); +} + +bool rangeFilter_ds_ei(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(float *)minval <= pFilter->filterInfo.upperBndd && *(float *)maxval > pFilter->filterInfo.lowerBndd); +} + +////////////////////////////////////////////////////////////////////////// +bool rangeFilter_dd_ii(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(double *)minval <= pFilter->filterInfo.upperBndd && *(double *)maxval >= pFilter->filterInfo.lowerBndd); +} + +bool rangeFilter_dd_ee(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(double *)minvalfilterInfo.upperBndd &&*(double *)maxval> pFilter->filterInfo.lowerBndd); +} + +bool rangeFilter_dd_ie(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(double *)minval < pFilter->filterInfo.upperBndd && *(double *)maxval >= pFilter->filterInfo.lowerBndd); +} + +bool rangeFilter_dd_ei(SColumnFilterElem *pFilter, char *minval, char *maxval) { + return (*(double *)minval <= pFilter->filterInfo.upperBndd && *(double *)maxval > pFilter->filterInfo.lowerBndd); +} + +//////////////////////////////////////////////////////////////////////////// +bool (*filterFunc_i8[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = { + NULL, + less_i8, + large_i8, + equal_i8, + lessEqual_i8, + largeEqual_i8, + nequal_i8, + NULL, +}; + +bool (*filterFunc_i16[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = { + NULL, + less_i16, + large_i16, + equal_i16, + lessEqual_i16, + largeEqual_i16, + nequal_i16, + NULL, +}; + +bool (*filterFunc_i32[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = { + NULL, + less_i32, + large_i32, + equal_i32, + lessEqual_i32, + largeEqual_i32, + nequal_i32, + NULL, +}; + +bool (*filterFunc_i64[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = { + NULL, + less_i64, + large_i64, + equal_i64, + lessEqual_i64, + largeEqual_i64, + nequal_i64, + NULL, +}; + +bool (*filterFunc_ds[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = { + NULL, + less_ds, + large_ds, + equal_ds, + lessEqual_ds, + largeEqual_ds, + nequal_ds, + NULL, +}; + +bool (*filterFunc_dd[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = { + NULL, + less_dd, + large_dd, + equal_dd, + lessEqual_dd, + largeEqual_dd, + nequal_dd, + NULL, +}; + +bool (*filterFunc_str[])(SColumnFilterElem* pFilter, char* minval, char *maxval) = { + NULL, + NULL, + NULL, + equal_str, + NULL, + NULL, + nequal_str, + like_str, +}; + +bool (*filterFunc_nchar[])(SColumnFilterElem* pFitler, char* minval, char* maxval) = { + NULL, + NULL, + NULL, + equal_nchar, + NULL, + NULL, + nequal_nchar, + like_nchar, +}; + +bool (*rangeFilterFunc_i8[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = { + NULL, + rangeFilter_i8_ee, + rangeFilter_i8_ie, + rangeFilter_i8_ei, + rangeFilter_i8_ii, +}; + +bool (*rangeFilterFunc_i16[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = { + NULL, + rangeFilter_i16_ee, + rangeFilter_i16_ie, + rangeFilter_i16_ei, + rangeFilter_i16_ii, +}; + +bool (*rangeFilterFunc_i32[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = { + NULL, + rangeFilter_i32_ee, + rangeFilter_i32_ie, + rangeFilter_i32_ei, + rangeFilter_i32_ii, +}; + +bool (*rangeFilterFunc_i64[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = { + NULL, + rangeFilter_i64_ee, + rangeFilter_i64_ie, + rangeFilter_i64_ei, + rangeFilter_i64_ii, +}; + +bool (*rangeFilterFunc_ds[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = { + NULL, + rangeFilter_ds_ee, + rangeFilter_ds_ie, + rangeFilter_ds_ei, + rangeFilter_ds_ii, +}; + +bool (*rangeFilterFunc_dd[])(SColumnFilterElem *pFilter, char *minval, char *maxval) = { + NULL, + rangeFilter_dd_ee, + rangeFilter_dd_ie, + rangeFilter_dd_ei, + rangeFilter_dd_ii, +}; + +__filter_func_t* getRangeFilterFuncArray(int32_t type) { + switch(type) { + case TSDB_DATA_TYPE_BOOL: return rangeFilterFunc_i8; + case TSDB_DATA_TYPE_TINYINT: return rangeFilterFunc_i8; + case TSDB_DATA_TYPE_SMALLINT: return rangeFilterFunc_i16; + case TSDB_DATA_TYPE_INT: return rangeFilterFunc_i32; + case TSDB_DATA_TYPE_TIMESTAMP: //timestamp uses bigint filter + case TSDB_DATA_TYPE_BIGINT: return rangeFilterFunc_i64; + case TSDB_DATA_TYPE_FLOAT: return rangeFilterFunc_ds; + case TSDB_DATA_TYPE_DOUBLE: return rangeFilterFunc_dd; + default:return NULL; + } +} + +__filter_func_t* getValueFilterFuncArray(int32_t type) { + switch(type) { + case TSDB_DATA_TYPE_BOOL: return filterFunc_i8; + case TSDB_DATA_TYPE_TINYINT: return filterFunc_i8; + case TSDB_DATA_TYPE_SMALLINT: return filterFunc_i16; + case TSDB_DATA_TYPE_INT: return filterFunc_i32; + case TSDB_DATA_TYPE_TIMESTAMP: //timestamp uses bigint filter + case TSDB_DATA_TYPE_BIGINT: return filterFunc_i64; + case TSDB_DATA_TYPE_FLOAT: return filterFunc_ds; + case TSDB_DATA_TYPE_DOUBLE: return filterFunc_dd; + case TSDB_DATA_TYPE_BINARY: return filterFunc_str; + case TSDB_DATA_TYPE_NCHAR: return filterFunc_nchar; + default: return NULL; + } +} + +bool supportPrefilter(int32_t type) { return type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR; } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 4ab6895863..2f644175a5 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -189,8 +189,8 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable for (int32_t i = 0; i < pCond->numOfCols; ++i) { SColumnInfoData pDest = {{0}, 0}; - pDest.info = pCond->colList[i].info; - pDest.pData = calloc(1, EXTRA_BYTES + bufferCapacity * pCond->colList[i].info.bytes); + pDest.info = pCond->colList[i]; + pDest.pData = calloc(1, EXTRA_BYTES + bufferCapacity * pCond->colList[i].bytes); taosArrayPush(pQueryHandle->pColumns, &pDest); } @@ -595,10 +595,8 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, j); if (pCol->info.colId == colId) { - // SDataCol* pDataCol = &pCols->cols[i]; -// pCol->pData = pQueryHandle->rhelper.pDataCols[0]->cols[i].pData + pCol->info.bytes * start; - memmove(pCol->pData, pQueryHandle->rhelper.pDataCols[0]->cols[i].pData + pCol->info.bytes * start, - pQueryHandle->realNumOfRows * pCol->info.bytes); + memmove(pCol->pData, pQueryHandle->rhelper.pDataCols[0]->cols[i].pData + pCol->info.bytes * start, + pQueryHandle->realNumOfRows * pCol->info.bytes); break; } } @@ -1082,7 +1080,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) { return pHandle->pColumns; } else { STableBlockInfo* pBlockInfoEx = &pHandle->pDataBlockInfo[pHandle->cur.slot]; - STableCheckInfo* pCheckInfo = pBlockInfoEx->pTableCheckInfo; + STableCheckInfo* pCheckInfo = pBlockInfoEx->pTableCheckInfo; SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlockInfoEx->pBlock.compBlock); assert(pHandle->realNumOfRows <= binfo.rows); diff --git a/src/util/inc/tarray.h b/src/util/inc/tarray.h index 28ebb18235..6fab24d51a 100644 --- a/src/util/inc/tarray.h +++ b/src/util/inc/tarray.h @@ -61,7 +61,7 @@ void taosArrayPop(SArray* pArray); * @param index * @return */ -void* taosArrayGet(SArray* pArray, size_t index); +void* taosArrayGet(const SArray* pArray, size_t index); /** * get the pointer data from the array @@ -69,7 +69,7 @@ void* taosArrayGet(SArray* pArray, size_t index); * @param index * @return */ -void* taosArrayGetP(SArray* pArray, size_t index); +void* taosArrayGetP(const SArray* pArray, size_t index); /** * return the size of array diff --git a/src/util/src/tarray.c b/src/util/src/tarray.c index d97b220a40..c025958438 100755 --- a/src/util/src/tarray.c +++ b/src/util/src/tarray.c @@ -84,12 +84,12 @@ void taosArrayPop(SArray* pArray) { pArray->size -= 1; } -void* taosArrayGet(SArray* pArray, size_t index) { +void* taosArrayGet(const SArray* pArray, size_t index) { assert(index < pArray->size); return TARRAY_GET_ELEM(pArray, index); } -void* taosArrayGetP(SArray* pArray, size_t index) { +void* taosArrayGetP(const SArray* pArray, size_t index) { void* ret = taosArrayGet(pArray, index); if (ret == NULL) { return NULL; -- GitLab