From d6647dd60c44fbbc7052ca86576db64c01ec26bc Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Mon, 27 Apr 2020 14:01:54 +0800 Subject: [PATCH] [td-186] add select on tags support --- src/client/inc/tsclient.h | 2 +- src/client/src/tscSQLParser.c | 2 +- src/client/src/tscSql.c | 2 +- src/inc/taosmsg.h | 4 +- src/inc/tsdb.h | 17 +- src/query/inc/queryExecutor.h | 27 ++- src/query/inc/tsqlfunction.h | 2 +- src/query/src/queryExecutor.c | 331 +++++++++++++++++++--------------- src/tsdb/src/tsdbMeta.c | 8 +- src/tsdb/src/tsdbRead.c | 55 ++---- src/util/inc/tutil.h | 5 - 11 files changed, 231 insertions(+), 224 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index fead73d2a4..e4a3f7ec21 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -102,7 +102,7 @@ typedef struct SColumnIndex { typedef struct SFieldSupInfo { bool visible; - SArithExprInfo *pArithExprInfo; + SExprInfo *pArithExprInfo; SSqlExpr * pSqlExpr; } SFieldSupInfo; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index ccc0f85d67..586f6b8795 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1207,7 +1207,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, slot); if (pInfo->pSqlExpr == NULL) { - SArithExprInfo* pFuncExpr = calloc(1, sizeof(SArithExprInfo)); + SExprInfo* pFuncExpr = calloc(1, sizeof(SExprInfo)); pInfo->pArithExprInfo = pFuncExpr; // arithmetic expression always return result in the format of double float diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 12e90713ce..5d2c32344f 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -437,7 +437,7 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) { static UNUSED_FUNC char *getArithemicInputSrc(void *param, const char *name, int32_t colId) { // SArithmeticSupport *pSupport = (SArithmeticSupport *)param; -// SArithExprInfo * pExpr = pSupport->pArithExpr; +// SExprInfo * pExpr = pSupport->pArithExpr; // int32_t index = -1; // for (int32_t i = 0; i < pExpr->numOfCols; ++i) { diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 4573e3a6e8..e9842cc922 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -380,13 +380,13 @@ typedef struct SSqlFuncMsg { } arg[3]; } SSqlFuncMsg; -typedef struct SArithExprInfo { +typedef struct SExprInfo { SSqlFuncMsg base; struct tExprNode* pExpr; int16_t bytes; int16_t type; int16_t interResBytes; -} SArithExprInfo; +} SExprInfo; typedef struct SColumnFilterInfo { int16_t lowerRelOptr; diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index e5c884c819..2ba3d3dd12 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -97,8 +97,8 @@ int tsdbTableSetName(STableCfg *config, char *name, bool dup); int tsdbTableSetSName(STableCfg *config, char *sname, bool dup); void tsdbClearTableCfg(STableCfg *config); -int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId id, int32_t col, int16_t *type, int16_t *bytes, char **val); -int32_t tsdbTableGetName(TsdbRepoT *repo, STableId id, char** name); +int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId* id, int32_t col, int16_t *type, int16_t *bytes, char **val); +int32_t tsdbTableGetName(TsdbRepoT *repo, STableId* id, char** name); int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg); int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId); @@ -150,21 +150,12 @@ typedef struct STsdbQueryCond { SColumnInfo *colList; } STsdbQueryCond; -typedef struct SBlockInfo { - STimeWindow window; - - int32_t numOfRows; - int32_t numOfCols; - - STableId tableId; -} SBlockInfo; - typedef struct SDataBlockInfo { STimeWindow window; int32_t rows; int32_t numOfCols; int64_t uid; - int32_t sid; + int32_t tid; } SDataBlockInfo; typedef struct { @@ -279,7 +270,7 @@ SArray *tsdbGetTableList(TsdbQueryHandleT *pQueryHandle); * @param pTagCond. tag query condition * */ -int32_t tsdbQueryByTagsCond( +int32_t tsdbQuerySTableByTagCond( TsdbRepoT *tsdb, int64_t uid, const char *pTagCond, diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index 13b0195893..cfce3109b4 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -96,24 +96,20 @@ typedef struct SSingleColumnFilterInfo { void* pData; } SSingleColumnFilterInfo; -typedef struct STableQueryInfo { - int64_t lastKey; - STimeWindow win; +typedef struct STableQueryInfo { // todo merge with the STableQueryInfo struct + int32_t tableIndex; + int32_t groupIdx; // group id in table list + TSKEY lastKey; int32_t numOfRes; int16_t queryRangeSet; // denote if the query range is set, only available for interval query int64_t tag; + STimeWindow win; STSCursor cur; - int32_t tid; // for retrieve the page id list - + STableId id; // for retrieve the page id list + SWindowResInfo windowResInfo; } STableQueryInfo; -typedef struct STableDataInfo { // todo merge with the STableQueryInfo struct - int32_t tableIndex; - int32_t groupIdx; // group id in table list - STableQueryInfo* pTableQInfo; -} STableDataInfo; - typedef struct SQuery { int16_t numOfCols; int16_t numOfTags; @@ -130,7 +126,7 @@ typedef struct SQuery { SLimitVal limit; int32_t rowSize; SSqlGroupbyExpr* pGroupbyExpr; - SArithExprInfo* pSelectExpr; + SExprInfo* pSelectExpr; SColumnInfo* colList; SColumnInfo* tagColList; int32_t numOfFilterCols; @@ -170,14 +166,15 @@ typedef struct SQInfo { TSKEY startTime; TSKEY elapsedTime; int32_t pointsInterpo; - int32_t code; // error code to returned to client + int32_t code; // error code to returned to client sem_t dataReady; void* tsdb; - STableGroupInfo groupInfo; // table id list + STableGroupInfo tableIdGroupInfo; // table id list < only includes the STableId list> + STableGroupInfo groupInfo; // SQueryRuntimeEnv runtimeEnv; int32_t groupIndex; - int32_t offset; /* offset in group result set of subgroup */ + int32_t offset; // offset in group result set of subgroup, todo refactor T_REF_DECLARE() /* diff --git a/src/query/inc/tsqlfunction.h b/src/query/inc/tsqlfunction.h index 7ac0609a55..3f6376bb94 100644 --- a/src/query/inc/tsqlfunction.h +++ b/src/query/inc/tsqlfunction.h @@ -114,7 +114,7 @@ enum { #define QUERY_IS_FREE_RESOURCE(type) (((type)&TSDB_QUERY_TYPE_FREE_RESOURCE) != 0) typedef struct SArithmeticSupport { - SArithExprInfo *pArithExpr; + SExprInfo *pArithExpr; int32_t numOfCols; SColumnInfo* colList; int32_t offset; diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 04188b45cc..216758a0fb 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -25,7 +25,6 @@ #include "taosmsg.h" #include "tlosertree.h" #include "tscompression.h" -#include "tsdbMain.h" //todo use TableId instead of STable object #include "ttime.h" #include "tscUtil.h" // todo move the function to common module @@ -35,8 +34,6 @@ * check if the primary column is load by default, otherwise, the program will * forced to load primary column explicitly. */ -#define PRIMARY_TSCOL_LOADED(query) ((query)->colList[0].data.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) - #define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0) #define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0) #define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP) @@ -98,12 +95,18 @@ typedef struct { STSCursor cur; } SQueryStatusInfo; +typedef struct SGroupItem { + STableId id; + STableQueryInfo* info; +} SGroupItem; + static void setQueryStatus(SQuery *pQuery, int8_t status); -bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; } +static bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; } +// todo move to utility static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *group); -static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult); +static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult); static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo); static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow); @@ -114,9 +117,10 @@ static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols); static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static bool hasMainOutput(SQuery *pQuery); -static void createTableDataInfo(SQInfo *pQInfo); +static void createTableQueryInfo(SQInfo *pQInfo); +static void buildTagQueryResult(SQInfo *pQInfo); -static int32_t setAdditionalInfo(SQInfo *pQInfo, STable *pTable, STableQueryInfo *pTableQueryInfo); +static int32_t setAdditionalInfo(SQInfo *pQInfo, STableId *pTaleId, STableQueryInfo *pTableQueryInfo); static int32_t flushFromResultBuf(SQInfo *pQInfo); bool getNeighborPoints(SQInfo *pQInfo, void *pMeterObj, SPointInterpoSupporter *pPointInterpSupporter) { @@ -926,7 +930,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * TSKEY ts = primaryKeyCol[offset]; STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->sid, &win) != TSDB_CODE_SUCCESS) { + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win) != TSDB_CODE_SUCCESS) { return; } @@ -947,7 +951,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * } // null data, failed to allocate more memory buffer - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->sid, &nextWin) != TSDB_CODE_SUCCESS) { + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin) != TSDB_CODE_SUCCESS) { break; } @@ -1178,7 +1182,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS int64_t ts = primaryKeyCol[offset]; STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); - int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->sid, &win); + int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code continue; } @@ -1206,7 +1210,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } // null data, failed to allocate more memory buffer - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->sid, &nextWin) != TSDB_CODE_SUCCESS) { + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin) != TSDB_CODE_SUCCESS) { break; } @@ -1553,7 +1557,7 @@ static bool isQueryKilled(SQInfo *pQInfo) { static void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_QUERY_CANCELLED; } -bool isFixedOutputQuery(SQuery *pQuery) { +static bool isFixedOutputQuery(SQuery *pQuery) { if (pQuery->intervalTime != 0) { return false; } @@ -1584,7 +1588,7 @@ bool isFixedOutputQuery(SQuery *pQuery) { return false; } -bool isPointInterpoQuery(SQuery *pQuery) { +static bool isPointInterpoQuery(SQuery *pQuery) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionID = pQuery->pSelectExpr[i].base.functionId; if (functionID == TSDB_FUNC_INTERP || functionID == TSDB_FUNC_LAST_ROW) { @@ -1596,7 +1600,7 @@ bool isPointInterpoQuery(SQuery *pQuery) { } // TODO REFACTOR:MERGE WITH CLIENT-SIDE FUNCTION -bool isSumAvgRateQuery(SQuery *pQuery) { +static bool isSumAvgRateQuery(SQuery *pQuery) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionId = pQuery->pSelectExpr[i].base.functionId; if (functionId == TSDB_FUNC_TS) { @@ -1612,7 +1616,7 @@ bool isSumAvgRateQuery(SQuery *pQuery) { return false; } -bool isFirstLastRowQuery(SQuery *pQuery) { +static bool isFirstLastRowQuery(SQuery *pQuery) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionID = pQuery->pSelectExpr[i].base.functionId; if (functionID == TSDB_FUNC_LAST_ROW) { @@ -1623,7 +1627,7 @@ bool isFirstLastRowQuery(SQuery *pQuery) { return false; } -bool notHasQueryTimeRange(SQuery *pQuery) { +static bool notHasQueryTimeRange(SQuery *pQuery) { return (pQuery->window.skey == 0 && pQuery->window.ekey == INT64_MAX && QUERY_IS_ASC_QUERY(pQuery)) || (pQuery->window.skey == INT64_MAX && pQuery->window.ekey == 0 && (!QUERY_IS_ASC_QUERY(pQuery))); } @@ -1643,6 +1647,19 @@ static bool needReverseScan(SQuery *pQuery) { return false; } + +static bool onlyQueryTags(SQuery* pQuery) { + for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { + int32_t functionId = pQuery->pSelectExpr[i].base.functionId; + + if (functionId != TSDB_FUNC_TAG) { + return false; + } + } + + return true; +} + ///////////////////////////////////////////////////////////////////////////////////////////// void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, int64_t *realSkey, @@ -2238,22 +2255,6 @@ UNUSED_FUNC void vnodeDecMeterRefcnt(SQInfo *pQInfo) { #endif } -UNUSED_FUNC void setTimestampRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t stime, int64_t etime) { - SQuery *pQuery = pRuntimeEnv->pQuery; - - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t functionId = pQuery->pSelectExpr[i].base.functionId; - - if (functionId == TSDB_FUNC_SPREAD) { - pRuntimeEnv->pCtx[i].param[1].dKey = stime; - pRuntimeEnv->pCtx[i].param[2].dKey = etime; - - pRuntimeEnv->pCtx[i].param[1].nType = TSDB_DATA_TYPE_DOUBLE; - pRuntimeEnv->pCtx[i].param[2].nType = TSDB_DATA_TYPE_DOUBLE; - } - } -} - static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx, int32_t numOfTotalPoints) { if (pDataStatis == NULL) { @@ -2539,7 +2540,7 @@ static void updatelastkey(SQuery *pQuery, STableQueryInfo *pTableQInfo) { pTable * set tag value in SQLFunctionCtx * e.g.,tag information into input buffer */ -static void doSetTagValueInParam(void *tsdb, STableId id, int32_t tagColId, tVariant *param) { +static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColId, tVariant *param) { tVariantDestroy(param); char * val = NULL; @@ -2547,11 +2548,11 @@ static void doSetTagValueInParam(void *tsdb, STableId id, int32_t tagColId, tVar int16_t type = 0; if (tagColId == TSDB_TBNAME_COLUMN_INDEX) { - tsdbTableGetName(tsdb, id, &val); + tsdbTableGetName(tsdb, pTableId, &val); bytes = TSDB_TABLE_NAME_LEN; type = TSDB_DATA_TYPE_BINARY; } else { - tsdbGetTableTagVal(tsdb, id, tagColId, &type, &bytes, &val); + tsdbGetTableTagVal(tsdb, pTableId, tagColId, &type, &bytes, &val); } tVariantCreateFromBinary(param, val, bytes, type); @@ -2561,13 +2562,13 @@ static void doSetTagValueInParam(void *tsdb, STableId id, int32_t tagColId, tVar } } -void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId id, void *tsdb) { +void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId* pTableId, void *tsdb) { SQuery *pQuery = pRuntimeEnv->pQuery; SSqlFuncMsg *pFuncMsg = &pQuery->pSelectExpr[0].base; if (pQuery->numOfOutput == 1 && pFuncMsg->functionId == TSDB_FUNC_TS_COMP) { assert(pFuncMsg->numOfParams == 1); - doSetTagValueInParam(tsdb, id, pFuncMsg->arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag); + doSetTagValueInParam(tsdb, pTableId, pFuncMsg->arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag); } else { // set tag value, by which the results are aggregated. for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) { @@ -2579,7 +2580,7 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId id, void *tsdb) { } // todo use tag column index to optimize performance - doSetTagValueInParam(tsdb, id, pCol->colId, &pRuntimeEnv->pCtx[idx].tag); + doSetTagValueInParam(tsdb, pTableId, pCol->colId, &pRuntimeEnv->pCtx[idx].tag); } // set the join tag for first column @@ -2744,9 +2745,9 @@ void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOf } typedef struct SCompSupporter { - STableDataInfo **pTableDataInfo; - int32_t * position; - SQInfo * pQInfo; + STableQueryInfo **pTableQueryInfo; + int32_t * position; + SQInfo * pQInfo; } SCompSupporter; int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) { @@ -2769,13 +2770,13 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) return -1; } - SWindowResInfo *pWindowResInfo1 = &supporter->pTableDataInfo[left]->pTableQInfo->windowResInfo; + SWindowResInfo *pWindowResInfo1 = &supporter->pTableQueryInfo[left]->windowResInfo; SWindowResult * pWindowRes1 = getWindowResult(pWindowResInfo1, leftPos); char *b1 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes1); TSKEY leftTimestamp = GET_INT64_VAL(b1); - SWindowResInfo *pWindowResInfo2 = &supporter->pTableDataInfo[right]->pTableQInfo->windowResInfo; + SWindowResInfo *pWindowResInfo2 = &supporter->pTableQueryInfo[right]->windowResInfo; SWindowResult * pWindowRes2 = getWindowResult(pWindowResInfo2, rightPos); char *b2 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes2); @@ -2902,16 +2903,16 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { tFilePage **buffer = (tFilePage **)pQuery->sdata; int32_t * posList = calloc(size, sizeof(int32_t)); - STableDataInfo **pTableList = malloc(POINTER_BYTES * size); + STableQueryInfo **pTableList = malloc(POINTER_BYTES * size); // todo opt for the case of one table per group int32_t numOfTables = 0; for (int32_t i = 0; i < size; ++i) { - SPair * p = taosArrayGet(pGroup, i); - STableDataInfo *pInfo = p->sec; + SGroupItem *item = taosArrayGet(pGroup, i); + STableQueryInfo *pInfo = item->info; - SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, pInfo->pTableQInfo->tid); - if (list.size > 0 && pInfo->pTableQInfo->windowResInfo.size > 0) { + SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, pInfo->id.tid); + if (list.size > 0 && pInfo->windowResInfo.size > 0) { pTableList[numOfTables] = pInfo; numOfTables += 1; } @@ -2940,7 +2941,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { while (1) { int32_t pos = pTree->pNode[0].index; - SWindowResInfo *pWindowResInfo = &pTableList[pos]->pTableQInfo->windowResInfo; + SWindowResInfo *pWindowResInfo = &pTableList[pos]->windowResInfo; SWindowResult * pWindowRes = getWindowResult(pWindowResInfo, cs.position[pos]); char *b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes); @@ -3074,9 +3075,9 @@ void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pRes } } -void setTableDataInfo(STableDataInfo *pTableDataInfo, int32_t tableIndex, int32_t groupId) { - pTableDataInfo->groupIdx = groupId; - pTableDataInfo->tableIndex = tableIndex; +void setTableDataInfo(STableQueryInfo *pTableQueryInfo, int32_t tableIndex, int32_t groupId) { + pTableQueryInfo->groupIdx = groupId; + pTableQueryInfo->tableIndex = tableIndex; } static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo *pWindowResInfo, int32_t order) { @@ -3136,7 +3137,7 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) { if (isIntervalQuery(pQuery)) { // for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) { - // STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo; + // STableQueryInfo *pTableQueryInfo = pQInfo->pTableQueryInfo[i].pTableQInfo; // SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo; // // doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order); @@ -3364,7 +3365,7 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); } - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo); + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableIdGroupInfo); setQueryStatus(pQuery, QUERY_NOT_COMPLETED); switchCtxOrder(pRuntimeEnv); @@ -3430,7 +3431,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); } - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo); + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableIdGroupInfo); pRuntimeEnv->windowResInfo.curIndex = qstatus.windowIndex; setQueryStatus(pQuery, QUERY_NOT_COMPLETED); @@ -3503,13 +3504,13 @@ static bool hasMainOutput(SQuery *pQuery) { return false; } -STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, int32_t tid, STimeWindow win) { +STableQueryInfo *createTableQueryInfoImpl(SQueryRuntimeEnv *pRuntimeEnv, STableId tableId, STimeWindow win) { STableQueryInfo *pTableQueryInfo = calloc(1, sizeof(STableQueryInfo)); pTableQueryInfo->win = win; pTableQueryInfo->lastKey = win.skey; - pTableQueryInfo->tid = tid; + pTableQueryInfo->id = tableId; pTableQueryInfo->cur.vnodeIndex = -1; initWindowResInfo(&pTableQueryInfo->windowResInfo, pRuntimeEnv, 100, 100, TSDB_DATA_TYPE_INT); @@ -3562,8 +3563,7 @@ void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *p * @param pRuntimeEnv * @param pDataBlockInfo */ -void setExecutionContext(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo, STable *pTable, int32_t groupIdx, - TSKEY nextKey) { +void setExecutionContext(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo, STableId* pTableId, int32_t groupIdx, TSKEY nextKey) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SWindowResInfo * pWindowResInfo = &pRuntimeEnv->windowResInfo; int32_t GROUPRESULTID = 1; @@ -3588,7 +3588,7 @@ void setExecutionContext(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo, STabl initCtxOutputBuf(pRuntimeEnv); pTableQueryInfo->lastKey = nextKey; - setAdditionalInfo(pQInfo, pTable, pTableQueryInfo); + setAdditionalInfo(pQInfo, pTableId, pTableQueryInfo); } static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult) { @@ -3616,11 +3616,11 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult * } } -int32_t setAdditionalInfo(SQInfo *pQInfo, STable *pTable, STableQueryInfo *pTableQueryInfo) { +int32_t setAdditionalInfo(SQInfo *pQInfo, STableId* pTableId, STableQueryInfo *pTableQueryInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; assert(pTableQueryInfo->lastKey >= 0); - setTagVal(pRuntimeEnv, pTable->tableId, pQInfo->tsdb); + setTagVal(pRuntimeEnv, pTableId, pQInfo->tsdb); // both the master and supplement scan needs to set the correct ts comp start position if (pRuntimeEnv->pTSBuf != NULL) { @@ -3819,12 +3819,12 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) { assert(pQuery->rec.rows <= pQuery->rec.capacity); } -static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableDataInfo *pTableDataInfo) { +static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) { SQuery *pQuery = pRuntimeEnv->pQuery; // update the number of result for each, only update the number of rows for the corresponding window result. if (pQuery->intervalTime == 0) { - int32_t g = pTableDataInfo->groupIdx; + int32_t g = pTableQueryInfo->groupIdx; assert(pRuntimeEnv->windowResInfo.size > 0); SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&g, sizeof(g)); @@ -3834,11 +3834,10 @@ static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableDataInf } } -void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, STableDataInfo *pTableDataInfo, +void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo, SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis, SArray *pDataBlock, __block_search_fn_t searchFn) { SQuery * pQuery = pRuntimeEnv->pQuery; - STableQueryInfo *pTableQueryInfo = pTableDataInfo->pTableQInfo; SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo; pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1; @@ -3848,7 +3847,7 @@ void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, STableDataInfo * blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock); } - updateWindowResNumOfRes(pRuntimeEnv, pTableDataInfo); + updateWindowResNumOfRes(pRuntimeEnv, pTableQueryInfo); updatelastkey(pQuery, pTableQueryInfo); } @@ -4226,7 +4225,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, bool isSTableQuery) }; if (!isSTableQuery || isIntervalQuery(pQuery) || isFixedOutputQuery(pQuery)) { - pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->groupInfo); + pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo); } pQInfo->tsdb = tsdb; @@ -4362,8 +4361,7 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) { } SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle); - STableDataInfo *pTableDataInfo = NULL; - STable * pTable = NULL; + STableQueryInfo *pTableQueryInfo = NULL; // todo opt performance using hash table size_t numOfGroup = taosArrayGetSize(pQInfo->groupInfo.pGroupList); @@ -4372,20 +4370,19 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) { size_t num = taosArrayGetSize(group); for (int32_t j = 0; j < num; ++j) { - SPair * p = taosArrayGet(group, j); - STableDataInfo *pInfo = p->sec; + SGroupItem *item = taosArrayGet(group, j); + STableQueryInfo *pInfo = item->info; - if (pInfo->pTableQInfo->tid == blockInfo.sid) { - pTableDataInfo = p->sec; - pTable = p->first; + if (pInfo->id.tid == blockInfo.tid) { + assert(pInfo->id.uid == blockInfo.uid); + pTableQueryInfo = item->info; + break; } } } - assert(pTableDataInfo != NULL && pTableDataInfo->pTableQInfo != NULL); - STableQueryInfo *pTableQueryInfo = pTableDataInfo->pTableQInfo; - + assert(pTableQueryInfo != NULL && pTableQueryInfo != NULL); restoreIntervalQueryRange(pRuntimeEnv, pTableQueryInfo); SDataStatis *pStatis = NULL; @@ -4393,10 +4390,10 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) { TSKEY nextKey = blockInfo.window.skey; if (!isIntervalQuery(pQuery)) { - setExecutionContext(pQInfo, pTableQueryInfo, pTable, pTableDataInfo->groupIdx, nextKey); + setExecutionContext(pQInfo, pTableQueryInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIdx, nextKey); } else { // interval query setIntervalQueryRange(pTableQueryInfo, pQInfo, nextKey); - int32_t ret = setAdditionalInfo(pQInfo, pTable, pTableQueryInfo); + int32_t ret = setAdditionalInfo(pQInfo, &pTableQueryInfo->id, pTableQueryInfo); if (ret != TSDB_CODE_SUCCESS) { pQInfo->code = ret; @@ -4404,7 +4401,7 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) { } } - stableApplyFunctionsOnBlock(pRuntimeEnv, pTableDataInfo, &blockInfo, pStatis, pDataBlock, binarySearchForKey); + stableApplyFunctionsOnBlock(pRuntimeEnv, pTableQueryInfo, &blockInfo, pStatis, pDataBlock, binarySearchForKey); } int64_t et = taosGetTimestampMs(); @@ -4417,27 +4414,24 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { setQueryStatus(pQuery, QUERY_NOT_COMPLETED); SArray *group = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0); - SPair * p = taosArrayGet(group, index); - - STable * pTable = p->first; - STableDataInfo *pInfo = p->sec; + SGroupItem* item = taosArrayGet(group, index); - setTagVal(pRuntimeEnv, pTable->tableId, pQInfo->tsdb); + setTagVal(pRuntimeEnv, &item->id, pQInfo->tsdb); qTrace("QInfo:%p query on (%d): uid:%" PRIu64 ", tid:%d, qrange:%" PRId64 "-%" PRId64, pQInfo, index, - pTable->tableId.uid, pInfo->pTableQInfo->lastKey, pInfo->pTableQInfo->win.ekey); + item->id.uid, item->info->lastKey, item->info->win.ekey); STsdbQueryCond cond = { - .twindow = {pInfo->pTableQInfo->lastKey, pInfo->pTableQInfo->win.ekey}, - .order = pQuery->order.order, - .colList = pQuery->colList, + .twindow = {item->info->lastKey, item->info->win.ekey}, + .order = pQuery->order.order, + .colList = pQuery->colList, .numOfCols = pQuery->numOfCols, }; SArray *g1 = taosArrayInit(1, POINTER_BYTES); - SArray *tx = taosArrayInit(1, sizeof(SPair)); + SArray *tx = taosArrayInit(1, sizeof(STableId)); - taosArrayPush(tx, p); + taosArrayPush(tx, &item->info->id); taosArrayPush(g1, &tx); STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1}; @@ -4585,7 +4579,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { #endif } else { - createTableDataInfo(pQInfo); + createTableQueryInfo(pQInfo); /* * 1. super table projection query, 2. group-by on normal columns query, 3. ts-comp query @@ -4618,12 +4612,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) { return; } - SPair * p = taosArrayGet(group, pQInfo->tableIndex); - STableDataInfo *pInfo = p->sec; - - TSKEY skey = pInfo->pTableQInfo->lastKey; - if (skey > 0) { - pQuery->window.skey = skey; + SGroupItem *item = taosArrayGet(group, pQInfo->tableIndex); + + STableQueryInfo *pInfo = item->info; + if (pInfo->lastKey > 0) { + pQuery->window.skey = pInfo->lastKey; } if (!multiTableMultioutputHelper(pQInfo, pQInfo->tableIndex)) { @@ -4665,7 +4658,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { * to ensure that, we can reset the query range once query on a meter is completed. */ pQInfo->tableIndex++; - pInfo->pTableQInfo->lastKey = pQuery->lastKey; + pInfo->lastKey = pQuery->lastKey; // if the buffer is full or group by each table, we need to jump out of the loop if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL) /*|| @@ -4737,7 +4730,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { pQuery->limit.offset); } -static void createTableDataInfo(SQInfo *pQInfo) { +static void createTableQueryInfo(SQInfo *pQInfo) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; // todo make sure the table are added the reference count to gauranteed that all involved tables are valid @@ -4749,21 +4742,18 @@ static void createTableDataInfo(SQInfo *pQInfo) { size_t s = taosArrayGetSize(group); for (int32_t j = 0; j < s; ++j) { - SPair *p = (SPair *)taosArrayGet(group, j); + SGroupItem* item = (SGroupItem *)taosArrayGet(group, j); - // STableDataInfo has been created for each table - if (p->sec != NULL) { // todo refactor + // STableQueryInfo has been created for each table + if (item->info != NULL) { return; } - STableDataInfo *pInfo = calloc(1, sizeof(STableDataInfo)); - - setTableDataInfo(pInfo, index, i); - pInfo->pTableQInfo = - createTableQueryInfo(&pQInfo->runtimeEnv, ((STable *)(p->first))->tableId.tid, pQuery->window); - - p->sec = pInfo; - + STableQueryInfo* pInfo = createTableQueryInfoImpl(&pQInfo->runtimeEnv, item->id, pQuery->window); + pInfo->groupIdx = i; + pInfo->tableIndex = index; + + item->info = pInfo; index += 1; } } @@ -4773,7 +4763,7 @@ static void prepareQueryInfoForReverseScan(SQInfo *pQInfo) { // SQuery *pQuery = pQInfo->runtimeEnv.pQuery; // for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) { - // STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo; + // STableQueryInfo *pTableQueryInfo = pQInfo->pTableQueryInfo[i].pTableQInfo; // changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo); // } } @@ -4811,20 +4801,14 @@ static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; if (isIntervalQuery(pQuery)) { - // for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) { - // STableQueryInfo *pTableQueryInfo = pQInfo->pTableDataInfo[i].pTableQInfo; - // closeAllTimeWindow(&pTableQueryInfo->windowResInfo); - // } size_t numOfGroup = taosArrayGetSize(pQInfo->groupInfo.pGroupList); for (int32_t i = 0; i < numOfGroup; ++i) { SArray *group = taosArrayGetP(pQInfo->groupInfo.pGroupList, i); size_t num = taosArrayGetSize(group); for (int32_t j = 0; j < num; ++j) { - SPair * p = taosArrayGet(group, j); - STableDataInfo *pInfo = p->sec; - - closeAllTimeWindow(&pInfo->pTableQInfo->windowResInfo); + SGroupItem* item = taosArrayGet(group, j); + closeAllTimeWindow(&item->info->windowResInfo); } } } else { // close results for group result @@ -4865,7 +4849,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { pQuery->window.skey, pQuery->window.ekey, pQuery->order.order); // create the query support structures - createTableDataInfo(pQInfo); + createTableQueryInfo(pQInfo); // do check all qualified data blocks int64_t el = queryOnDataBlocks(pQInfo); @@ -5481,7 +5465,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, return 0; } -static int32_t buildAirthmeticExprFromMsg(SArithExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) { +static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) { qTrace("qmsg:%p create arithmetic expr from binary string", pQueryMsg, pArithExprInfo->base.arg[0].argValue.pz); tExprNode* pExprNode = exprTreeFromBinary(pArithExprInfo->base.arg[0].argValue.pz, pArithExprInfo->base.arg[0].argBytes); @@ -5494,12 +5478,12 @@ static int32_t buildAirthmeticExprFromMsg(SArithExprInfo *pArithExprInfo, SQuery return TSDB_CODE_SUCCESS; } -static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SArithExprInfo **pSqlFuncExpr, +static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo **pSqlFuncExpr, SSqlFuncMsg **pExprMsg, SColumnInfo* pTagCols) { *pSqlFuncExpr = NULL; int32_t code = TSDB_CODE_SUCCESS; - SArithExprInfo *pExprs = (SArithExprInfo *)calloc(1, sizeof(SArithExprInfo) * pQueryMsg->numOfOutput); + SExprInfo *pExprs = (SExprInfo *)calloc(1, sizeof(SExprInfo) * pQueryMsg->numOfOutput); if (pExprs == NULL) { return TSDB_CODE_SERV_OUT_OF_MEMORY; } @@ -5713,7 +5697,7 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) { } } -static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SArithExprInfo *pExprs, +static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, STableGroupInfo *groupInfo, SColumnInfo* pTagCols) { SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); if (pQInfo == NULL) { @@ -5727,7 +5711,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou int16_t numOfOutput = pQueryMsg->numOfOutput; pQuery->numOfCols = numOfCols; - pQuery->numOfOutput = numOfOutput; + pQuery->numOfOutput = numOfOutput; pQuery->limit.limit = pQueryMsg->limit; pQuery->limit.offset = pQueryMsg->offset; pQuery->order.order = pQueryMsg->order; @@ -5801,7 +5785,26 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou // to make sure third party won't overwrite this structure pQInfo->signature = pQInfo; - pQInfo->groupInfo = *groupInfo; + + pQInfo->tableIdGroupInfo = *groupInfo; + size_t numOfGroups = taosArrayGetSize(groupInfo->pGroupList); + + pQInfo->groupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES); + pQInfo->groupInfo.numOfTables = groupInfo->numOfTables; + + for(int32_t i = 0; i < numOfGroups; ++i) { + SArray* pa = taosArrayGetP(groupInfo->pGroupList, i); + size_t s = taosArrayGetSize(pa); + + SArray* p1 = taosArrayInit(s, sizeof(SGroupItem)); + + for(int32_t j = 0; j < s; ++j) { + SGroupItem item = { .id = *(STableId*) taosArrayGet(pa, j), .info = NULL, }; + taosArrayPush(p1, &item); + } + + taosArrayPush(pQInfo->groupInfo.pGroupList, &p1); + } pQuery->pos = -1; @@ -5853,7 +5856,8 @@ static bool isValidQInfo(void *param) { return (sig == (uint64_t)pQInfo); } -static void freeQInfo(SQInfo *pQInfo); +static void freeQInfo(SQInfo *pQInfo); + static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, SQInfo *pQInfo, bool isSTable) { int32_t code = TSDB_CODE_SUCCESS; SQuery *pQuery = pQInfo->runtimeEnv.pQuery; @@ -5918,12 +5922,11 @@ static void freeQInfo(SQInfo *pQInfo) { if (pQuery->pSelectExpr != NULL) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { -// SExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].arithExprInfo; + SExprInfo* pExprInfo = &pQuery->pSelectExpr[i]; -// if (pBinExprInfo->numOfCols > 0) { -// tfree(pBinExprInfo->pReqColumns); -// tExprTreeDestroy(&pBinExprInfo->pBinExpr, NULL); -// } + if (pExprInfo->pExpr != NULL) { + tExprTreeDestroy(&pExprInfo->pExpr, NULL); + } } tfree(pQuery->pSelectExpr); @@ -5940,15 +5943,24 @@ static void freeQInfo(SQInfo *pQInfo) { size_t num = taosArrayGetSize(p); for(int32_t j = 0; j < num; ++j) { - SPair* pair = taosArrayGet(p, j); - if (pair->sec != NULL) { - destroyTableQueryInfo(((STableDataInfo*)pair->sec)->pTableQInfo, pQuery->numOfOutput); - tfree(pair->sec); + SGroupItem* item = taosArrayGet(p, j); + if (item->info != NULL) { + destroyTableQueryInfo(item->info, pQuery->numOfOutput); } } + taosArrayDestroy(p); } + taosArrayDestroy(pQInfo->groupInfo.pGroupList); + + for(int32_t i = 0; i < numOfGroups; ++i) { + SArray* p = taosArrayGetP(pQInfo->tableIdGroupInfo.pGroupList, i); + taosArrayDestroy(p); + } + + taosArrayDestroy(pQInfo->tableIdGroupInfo.pGroupList); + if (pQuery->pGroupbyExpr != NULL) { taosArrayDestroy(pQuery->pGroupbyExpr->columnInfo); tfree(pQuery->pGroupbyExpr); @@ -5959,7 +5971,6 @@ static void freeQInfo(SQInfo *pQInfo) { tfree(pQuery->colList); tfree(pQuery->sdata); - taosArrayDestroy(pQInfo->groupInfo.pGroupList); tfree(pQuery); qTrace("QInfo:%p QInfo is freed", pQInfo); @@ -6059,7 +6070,7 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) goto _query_over; } - SArithExprInfo *pExprs = NULL; + SExprInfo *pExprs = NULL; if ((code = createSqlFunctionExprFromMsg(pQueryMsg, &pExprs, pExprMsg, pTagColumnInfo)) != TSDB_CODE_SUCCESS) { goto _query_over; } @@ -6069,7 +6080,7 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) goto _query_over; } - bool isSTableQuery = false; + bool isSTableQuery = false; STableGroupInfo groupInfo = {0}; if ((pQueryMsg->queryType & TSDB_QUERY_TYPE_STABLE_QUERY) != 0) { @@ -6085,7 +6096,7 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) } // todo handle the error - /*int32_t ret =*/tsdbQueryByTagsCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &groupInfo, pGroupColIndex, + /*int32_t ret =*/tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &groupInfo, pGroupColIndex, numOfGroupByCols); if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query code = TSDB_CODE_SUCCESS; @@ -6136,8 +6147,10 @@ void qTableQuery(qinfo_t qinfo) { } qTrace("QInfo:%p query task is launched", pQInfo); - - if (pQInfo->runtimeEnv.stableQuery) { + + if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) { + buildTagQueryResult(pQInfo); + } else if (pQInfo->runtimeEnv.stableQuery) { stableQueryImpl(pQInfo); } else { tableQueryImpl(pQInfo); @@ -6229,3 +6242,33 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co // pObj->qhandle = NULL; // } } + +static void buildTagQueryResult(SQInfo* pQInfo) { + SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; + SQuery * pQuery = pRuntimeEnv->pQuery; + + size_t num = taosArrayGetSize(pQInfo->groupInfo.pGroupList); + assert(num == 1); // only one group + + SArray* pa = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0); + + num = taosArrayGetSize(pa); + assert(num == pQInfo->groupInfo.numOfTables); + + int16_t type, bytes; + + for(int32_t i = 0; i < num; ++i) { + SExprInfo* pExprInfo = pQuery->pSelectExpr; + char* data = NULL; + + SGroupItem* item = taosArrayGet(pa, i); + + for(int32_t j = 0; j < pQuery->numOfOutput; ++j) { + tsdbGetTableTagVal(pQInfo->tsdb, &item->id, j, &type, &bytes, &data); + assert(bytes == pExprInfo[j].bytes && type == pExprInfo[j].type); + + memcpy(pQuery->sdata[j]->data + num * bytes, data, bytes); + } + } +} + diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 04863d1ef2..39a06fb8b9 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -225,9 +225,9 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) { } } -int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId id, int32_t colId, int16_t* type, int16_t* bytes, char** val) { +int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId* id, int32_t colId, int16_t* type, int16_t* bytes, char** val) { STsdbMeta* pMeta = tsdbGetMeta(repo); - STable* pTable = tsdbGetTableByUid(pMeta, id.uid); + STable* pTable = tsdbGetTableByUid(pMeta, id->uid); STSchema* pSchema = tsdbGetTableTagSchema(pMeta, pTable); @@ -251,9 +251,9 @@ int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId id, int32_t colId, int16_t* return 0; } -int32_t tsdbTableGetName(TsdbRepoT *repo, STableId id, char** name) { +int32_t tsdbTableGetName(TsdbRepoT *repo, STableId* id, char** name) { STsdbMeta* pMeta = tsdbGetMeta(repo); - STable* pTable = tsdbGetTableByUid(pMeta, id.uid); + STable* pTable = tsdbGetTableByUid(pMeta, id->uid); *name = strndup(pTable->name, TSDB_TABLE_NAME_LEN); if (*name == NULL) { diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index eb35be5383..e66c6dc348 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -62,13 +62,9 @@ typedef struct STableCheckInfo { STableId tableId; TSKEY lastKey; STable* pTableObj; - int64_t offsetInHeaderFile; int32_t start; - bool checkFirstFileBlock; - SCompInfo* pCompInfo; int32_t compSize; - int32_t numOfBlocks; // number of qualified data blocks not the original blocks SDataCols* pDataCols; @@ -159,15 +155,15 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable assert(gsize > 0); for (int32_t j = 0; j < gsize; ++j) { - SPair* d = (SPair*) taosArrayGet(group, j); - assert(d->first != NULL); + STableId* id = (STableId*) taosArrayGet(group, j); STableCheckInfo info = { .lastKey = pQueryHandle->window.skey, - .tableId = ((STable*) d->first)->tableId, - .pTableObj = d->first, + .tableId = *id, + .pTableObj = tsdbGetTableByUid(tsdbGetMeta(tsdb), id->uid), }; + assert(info.pTableObj != NULL); taosArrayPush(pQueryHandle->pTableCheckInfo, &info); } } @@ -357,7 +353,7 @@ static SDataBlockInfo getTrueDataBlockInfo(STableCheckInfo* pCheckInfo, SCompBlo .window = {.skey = pBlock->keyFirst, .ekey = pBlock->keyLast}, .numOfCols = pBlock->numOfCols, .rows = pBlock->numOfPoints, - .sid = pCheckInfo->tableId.tid, + .tid = pCheckInfo->tableId.tid, .uid = pCheckInfo->tableId.uid, }; @@ -1058,7 +1054,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) { SDataBlockInfo blockInfo = { .uid = pTable->tableId.uid, - .sid = pTable->tableId.tid, + .tid = pTable->tableId.tid, .rows = rows, .window = {.skey = MIN(skey, ekey), .ekey = MAX(skey, ekey)} }; @@ -1293,24 +1289,19 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) { } void createTableGroupImpl(SArray* pGroups, STable** pTables, size_t numOfTables, STableGroupSupporter* pSupp, __ext_compar_fn_t compareFn) { - SArray* g = taosArrayInit(16, sizeof(SPair)); - - SPair p = {.first = pTables[0]}; - taosArrayPush(g, &p); + SArray* g = taosArrayInit(16, sizeof(STableId)); + taosArrayPush(g, &pTables[0]->tableId); for (int32_t i = 1; i < numOfTables; ++i) { int32_t ret = compareFn(&pTables[i - 1], &pTables[i], pSupp); assert(ret == 0 || ret == -1); if (ret == 0) { - SPair p1 = {.first = pTables[i]}; - taosArrayPush(g, &p1); + taosArrayPush(g, &pTables[i]->tableId); } else { taosArrayPush(pGroups, &g); // current group is ended, start a new group g = taosArrayInit(16, POINTER_BYTES); - - SPair p1 = {.first = pTables[i]}; - taosArrayPush(g, &p1); + taosArrayPush(g, &pTables[i]->tableId); } } @@ -1329,11 +1320,10 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC } if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table - SArray* sa = taosArrayInit(size, sizeof(SPair)); + SArray* sa = taosArrayInit(size, sizeof(STableId)); for(int32_t i = 0; i < size; ++i) { STable* pTable = taosArrayGetP(pTableList, i); - SPair p = {.first = pTable}; - taosArrayPush(sa, &p); + taosArrayPush(sa, &pTable->tableId); } taosArrayPush(pTableGroup, &sa); @@ -1441,24 +1431,15 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) } -int32_t tsdbQueryByTagsCond( - TsdbRepoT *tsdb, - int64_t uid, - const char *pTagCond, - size_t len, - int16_t tagNameRelType, - const char* tbnameCond, - STableGroupInfo *pGroupInfo, - SColIndex *pColIndex, - int32_t numOfCols -) { +int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, int64_t uid, const char *pTagCond, size_t len, int16_t tagNameRelType, + const char* tbnameCond, STableGroupInfo *pGroupInfo, SColIndex *pColIndex, int32_t numOfCols) { STable* pSTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); if (pSTable == NULL) { uError("failed to get stable, uid:%" PRIu64, uid); return TSDB_CODE_INVALID_TABLE_ID; } - SArray* res = taosArrayInit(8, POINTER_BYTES); + SArray* res = taosArrayInit(8, sizeof(STableId)); STSchema* pTagSchema = tsdbGetTableTagSchema(tsdbGetMeta(tsdb), pSTable); // no tags and tbname condition, all child tables of this stable are involved @@ -1472,7 +1453,7 @@ int32_t tsdbQueryByTagsCond( return ret; } - int32_t ret = TSDB_CODE_SUCCESS; + int32_t ret = TSDB_CODE_SUCCESS; tExprNode* expr = exprTreeFromTableName(tbnameCond); tExprNode* tagExpr = exprTreeFromBinary(pTagCond, len); @@ -1507,9 +1488,9 @@ int32_t tsdbGetOneTableGroup(TsdbRepoT* tsdb, int64_t uid, STableGroupInfo* pGro pGroupInfo->numOfTables = 1; pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES); - SArray* group = taosArrayInit(1, POINTER_BYTES); + SArray* group = taosArrayInit(1, sizeof(STableId)); - taosArrayPush(group, &pTable); + taosArrayPush(group, &pTable->tableId); taosArrayPush(pGroupInfo->pGroupList, &group); return TSDB_CODE_SUCCESS; diff --git a/src/util/inc/tutil.h b/src/util/inc/tutil.h index cdcc639151..68d4b80bc0 100644 --- a/src/util/inc/tutil.h +++ b/src/util/inc/tutil.h @@ -107,11 +107,6 @@ extern "C" { #define POW2(x) ((x) * (x)) -typedef struct SPair { - void* first; - void* sec; -} SPair; - int32_t strdequote(char *src); void strtrim(char *src); -- GitLab