diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index e2fec9b3dd11714647e101e5e8bbddcc0e957b2e..f68caf1a6063d9dcacfa29ecdb067ea2369d7b43 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -108,7 +108,9 @@ void tsdbClearTableCfg(STableCfg *config); void* tsdbGetTableTagVal(const void* pTable, int32_t colId, int16_t type, int16_t bytes); char* tsdbGetTableName(void *pTable); -STableId tsdbGetTableId(void *pTable); + +#define TSDB_TABLEID(_table) ((STableId*) (_table)) + STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg); int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg); diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 3aa1b60be576e9b0d3559d0f011c171a9af33a16..73124b8fc602b1d800e3c56ef08a714fdac676fb 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -154,6 +154,7 @@ typedef struct SQuery { } SQuery; typedef struct SQueryRuntimeEnv { + jmp_buf env; SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo SQuery* pQuery; SQLFunctionCtx* pCtx; @@ -169,6 +170,8 @@ typedef struct SQueryRuntimeEnv { void* pSecQueryHandle; // another thread for bool stableQuery; // super table query or not bool topBotQuery; // false + bool groupbyNormalCol; // denote if this is a groupby normal column query + bool hasTagResults; // if there are tag values in final result or not int32_t prevGroupId; // previous executed group id SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file } SQueryRuntimeEnv; @@ -197,8 +200,10 @@ typedef struct SQInfo { */ int32_t tableIndex; int32_t numOfGroupResultPages; - _qinfo_free_fn_t freeFn; - jmp_buf env; + _qinfo_free_fn_t freeFn; //todo remove it + + void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables; + } SQInfo; #endif // TDENGINE_QUERYEXECUTOR_H diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 1882aa1850754d8da3c84dd0b50fe68569d91d93..ad56de9eb1fbcd448b98fe5ab1db0b3e7e663e55 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -99,6 +99,16 @@ static UNUSED_FUNC void *u_malloc (size_t __size) { } } +static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) { + uint32_t v = rand(); + if (v % 5 <= 1) { + return NULL; + } else { + return calloc(num, __size); + } +} + +#define calloc u_calloc #define malloc u_malloc #endif @@ -108,6 +118,7 @@ static UNUSED_FUNC void *u_malloc (size_t __size) { static void setQueryStatus(SQuery *pQuery, int8_t status); +#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->intervalTime > 0) static bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; } // todo move to utility @@ -313,6 +324,24 @@ static bool isTopBottomQuery(SQuery *pQuery) { return false; } +static bool hasTagValOutput(SQuery* pQuery) { + SExprInfo *pExprInfo = &pQuery->pSelectExpr[0]; + if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) { + return true; + } else { // set tag value, by which the results are aggregated. + for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) { + SExprInfo *pLocalExprInfo = &pQuery->pSelectExpr[idx]; + + // ts_comp column required the tag value for join filter + if (TSDB_COL_IS_TAG(pLocalExprInfo->base.colInfo.flag)) { + return true; + } + } + } + + return false; +} + static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, int32_t numOfCols, int32_t index) { // for a tag column, no corresponding field info SColIndex *pColIndex = &pQuery->pSelectExpr[index].base.colInfo; @@ -786,8 +815,8 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas return NULL; } char *dataBlock = NULL; - SQuery *pQuery = pRuntimeEnv->pQuery; + SQuery *pQuery = pRuntimeEnv->pQuery; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; int32_t functionId = pQuery->pSelectExpr[col].base.functionId; @@ -806,6 +835,10 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas sas->numOfCols = pQuery->numOfCols; sas->data = calloc(pQuery->numOfCols, POINTER_BYTES); + if (sas->data == NULL) { + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + // here the pQuery->colList and sas->colList are identical int32_t numOfCols = taosArrayGetSize(pDataBlock); for (int32_t i = 0; i < pQuery->numOfCols; ++i) { @@ -859,6 +892,9 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * } SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport)); + if (sasArray == NULL) { + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); @@ -866,7 +902,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * } int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - if (isIntervalQuery(pQuery) && tsCols != NULL) { + if (QUERY_IS_INTERVAL_QUERY(pQuery) && tsCols != NULL) { int32_t offset = GET_COL_DATA_POS(pQuery, 0, step); TSKEY ts = tsCols[offset]; @@ -1083,8 +1119,12 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS SColumnInfoData* pColumnInfoData = (SColumnInfoData *)taosArrayGet(pDataBlock, 0); TSKEY *tsCols = (pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (TSKEY*) pColumnInfoData->pData:NULL; - bool groupbyColumnValue = isGroupbyNormalCol(pQuery->pGroupbyExpr); + bool groupbyColumnValue = pRuntimeEnv->groupbyNormalCol; + SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport)); + if (sasArray == NULL) { + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } int16_t type = 0; int16_t bytes = 0; @@ -1230,7 +1270,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl STableQueryInfo* pTableQInfo = pQuery->current; SWindowResInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo; - if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { + if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || pRuntimeEnv->groupbyNormalCol) { rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock); } else { blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock); @@ -1351,14 +1391,16 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY } // set the output buffer for the selectivity + tag query -static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) { +static void setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx) { + SQuery* pQuery = pRuntimeEnv->pQuery; + if (isSelectivityWithTagsQuery(pQuery)) { int32_t num = 0; int16_t tagLen = 0; SQLFunctionCtx *p = NULL; SQLFunctionCtx **pTagCtx = calloc(pQuery->numOfOutput, POINTER_BYTES); - + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].base; @@ -1385,7 +1427,7 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) { } } -static void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool isStableQuery) { +static FORCE_INLINE void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool isStableQuery) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { assert(pQuery->pSelectExpr[i].interBytes <= DEFAULT_INTERN_BUF_PAGE_SIZE); @@ -1476,7 +1518,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order resetCtxOutputBuf(pRuntimeEnv); } - setCtxTagColumnInfo(pQuery, pRuntimeEnv->pCtx); + setCtxTagColumnInfo(pRuntimeEnv, pRuntimeEnv->pCtx); return TSDB_CODE_SUCCESS; _clean: @@ -2116,7 +2158,7 @@ static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capa static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) { // in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block SQuery* pQuery = pRuntimeEnv->pQuery; - if (!isIntervalQuery(pQuery) && !isGroupbyNormalCol(pQuery->pGroupbyExpr) && !isFixedOutputQuery(pQuery)) { + if (!QUERY_IS_INTERVAL_QUERY(pQuery) && !pRuntimeEnv->groupbyNormalCol && !isFixedOutputQuery(pQuery)) { SResultRec *pRec = &pQuery->rec; if (pQuery->rec.capacity - pQuery->rec.rows < pBlockInfo->rows) { @@ -2170,7 +2212,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle); // todo extract methods - if (isIntervalQuery(pQuery) && pRuntimeEnv->windowResInfo.prevSKey == TSKEY_INITIAL_VAL) { + if (QUERY_IS_INTERVAL_QUERY(pQuery) && pRuntimeEnv->windowResInfo.prevSKey == TSKEY_INITIAL_VAL) { STimeWindow realWin = TSWINDOW_INITIALIZER, w = TSWINDOW_INITIALIZER; SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; @@ -2216,7 +2258,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { setQueryStatus(pQuery, QUERY_COMPLETED); } - if (isIntervalQuery(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) { + if (QUERY_IS_INTERVAL_QUERY(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { int32_t step = QUERY_IS_ASC_QUERY(pQuery) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP; @@ -2637,7 +2679,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { tfree(pTableList); qError("QInfo:%p failed alloc memory", pQInfo); - longjmp(pQInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } // todo opt for the case of one table per group @@ -2645,7 +2687,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { for (int32_t i = 0; i < size; ++i) { STableQueryInfo *item = taosArrayGetP(pGroup, i); - SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, tsdbGetTableId(item->pTable).tid); + SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, TSDB_TABLEID(item->pTable)->tid); if (list.size > 0 && item->windowResInfo.size > 0) { pTableList[numOfTables] = item; numOfTables += 1; @@ -2668,6 +2710,10 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn); SResultInfo *pResultInfo = calloc(pQuery->numOfOutput, sizeof(SResultInfo)); + if (pResultInfo == NULL) { + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + setWindowResultInfo(pResultInfo, pQuery, pRuntimeEnv->stableQuery); resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo); @@ -2868,7 +2914,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) { // group by normal columns and interval query on normal table SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; - if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { + if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) { disableFuncInReverseScanImpl(pQInfo, pWindowResInfo, order); } else { // for simple result of table query, for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { // todo refactor @@ -3043,7 +3089,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; bool toContinue = false; - if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { + if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) { // for each group result, call the finalize function for each column SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; @@ -3235,10 +3281,10 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { + if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) { // for each group result, call the finalize function for each column SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; - if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { + if (pRuntimeEnv->groupbyNormalCol) { closeAllTimeWindow(pWindowResInfo); } @@ -3280,10 +3326,10 @@ static bool hasMainOutput(SQuery *pQuery) { return false; } -static STableQueryInfo *createTableQueryInfo( SQueryRuntimeEnv *pRuntimeEnv, void* pTable, STimeWindow win) { +static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void* pTable, STimeWindow win, void* buf) { SQuery* pQuery = pRuntimeEnv->pQuery; - STableQueryInfo *pTableQueryInfo = calloc(1, sizeof(STableQueryInfo)); + STableQueryInfo *pTableQueryInfo = buf;//calloc(1, sizeof(STableQueryInfo)); pTableQueryInfo->win = win; pTableQueryInfo->lastKey = win.skey; @@ -3294,7 +3340,8 @@ static STableQueryInfo *createTableQueryInfo( SQueryRuntimeEnv *pRuntimeEnv, voi int32_t initialSize = 1; int32_t initialThreshold = 1; - if (isIntervalQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { + // set more initial size of interval/groupby query + if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) { initialSize = 20; initialThreshold = 100; } @@ -3309,7 +3356,6 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols) } cleanupTimeWindowInfo(&pTableQueryInfo->windowResInfo, numOfCols); - free(pTableQueryInfo); } #define SET_CURRENT_QUERY_TABLE_INFO(_runtime, _tableInfo) \ @@ -3322,7 +3368,6 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols) /** * set output buffer for different group - * TODO opt performance if current group is identical to previous group * @param pRuntimeEnv * @param pDataBlockInfo */ @@ -3333,7 +3378,10 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) { // lastKey needs to be updated pTableQueryInfo->lastKey = nextKey; - setAdditionalInfo(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo); + + if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTSBuf != NULL) { + setAdditionalInfo(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo); + } if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == groupIndex) { return; @@ -3522,7 +3570,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; int32_t totalSubset = 0; - if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) { + if (pQInfo->runtimeEnv.groupbyNormalCol || (isIntervalQuery(pQuery))) { totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo); } else { totalSubset = GET_NUM_OF_TABLEGROUP(pQInfo); @@ -3653,7 +3701,7 @@ void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo * SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo; pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1; - if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { + if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || pRuntimeEnv->groupbyNormalCol) { rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock); } else { blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock); @@ -3695,7 +3743,7 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) { } else { // there are results waiting for returned to client. if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED) && - (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) && + (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) && (pRuntimeEnv->windowResInfo.size > 0)) { return true; } @@ -4100,6 +4148,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo pRuntimeEnv->cur.vgroupIndex = -1; pRuntimeEnv->stableQuery = isSTableQuery; pRuntimeEnv->prevGroupId = INT32_MIN; + pRuntimeEnv->groupbyNormalCol = isGroupbyNormalCol(pQuery->pGroupbyExpr); if (pTsBuf != NULL) { int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; @@ -4124,7 +4173,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo if (pQuery->intervalTime == 0) { int16_t type = TSDB_DATA_TYPE_NULL; - if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags; + if (pRuntimeEnv->groupbyNormalCol) { // group by columns not tags; type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); } else { type = TSDB_DATA_TYPE_INT; // group id @@ -4133,7 +4182,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type); } - } else if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { + } else if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) { int32_t rows = getInitialPageNum(pQInfo); code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize, pQInfo); if (code != TSDB_CODE_SUCCESS) { @@ -4141,7 +4190,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo } int16_t type = TSDB_DATA_TYPE_NULL; - if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { + if (pRuntimeEnv->groupbyNormalCol) { type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); } else { type = TSDB_DATA_TYPE_TIMESTAMP; @@ -4159,8 +4208,9 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo // todo refactor pRuntimeEnv->topBotQuery = isTopBottomQuery(pQuery); - setQueryStatus(pQuery, QUERY_NOT_COMPLETED); + pRuntimeEnv->hasTagResults = hasTagValOutput(pQuery); + setQueryStatus(pQuery, QUERY_NOT_COMPLETED); return TSDB_CODE_SUCCESS; } @@ -4201,7 +4251,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { SDataStatis *pStatis = NULL; SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis); - if (!isGroupbyNormalCol(pQuery->pGroupbyExpr)) { + if (!pRuntimeEnv->groupbyNormalCol) { if (!isIntervalQuery(pQuery)) { int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1; setExecutionContext(pQInfo, (*pTableQueryInfo)->groupIndex, blockInfo.window.ekey + step); @@ -4233,9 +4283,9 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { setTagVal(pRuntimeEnv, pCheckInfo->pTable, pQInfo->tsdb); - STableId id = tsdbGetTableId(pCheckInfo->pTable); + STableId* id = TSDB_TABLEID(pCheckInfo->pTable); qDebug("QInfo:%p query on (%d): uid:%" PRIu64 ", tid:%d, qrange:%" PRId64 "-%" PRId64, pQInfo, index, - id.uid, id.tid, pCheckInfo->lastKey, pCheckInfo->win.ekey); + id->uid, id->tid, pCheckInfo->lastKey, pCheckInfo->win.ekey); STsdbQueryCond cond = { .twindow = {pCheckInfo->lastKey, pCheckInfo->win.ekey}, @@ -4364,7 +4414,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { break; } } - } else if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group-by on normal columns query + } else if (pRuntimeEnv->groupbyNormalCol) { // group-by on normal columns query while (pQInfo->groupIndex < numOfGroups) { SArray* group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex); @@ -4502,11 +4552,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) { */ pQInfo->tableIndex++; - STableIdInfo tidInfo; - STableId id = tsdbGetTableId(pQuery->current->pTable); + STableIdInfo tidInfo = {0}; - tidInfo.uid = id.uid; - tidInfo.tid = id.tid; + STableId* id = TSDB_TABLEID(pQuery->current->pTable); + tidInfo.uid = id->uid; + tidInfo.tid = id->tid; tidInfo.key = pQuery->current->lastKey; taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo); @@ -4680,7 +4730,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { return; } - if (isIntervalQuery(pQuery) || isSumAvgRateQuery(pQuery)) { + if (QUERY_IS_INTERVAL_QUERY(pQuery) || isSumAvgRateQuery(pQuery)) { if (mergeIntoGroupResult(pQInfo) == TSDB_CODE_SUCCESS) { copyResToQueryResultBuf(pQInfo, pQuery); @@ -4777,10 +4827,10 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) pQuery->current->lastKey, pQuery->window.ekey); } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { STableIdInfo tidInfo; - STableId id = tsdbGetTableId(pQuery->current); + STableId* id = TSDB_TABLEID(pQuery->current); - tidInfo.uid = id.uid; - tidInfo.tid = id.tid; + tidInfo.uid = id->uid; + tidInfo.tid = id->tid; tidInfo.key = pQuery->current->lastKey; taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo); } @@ -4870,7 +4920,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { } // all data scanned, the group by normal column can return - if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // todo refactor with merge interval time result + if (pRuntimeEnv->groupbyNormalCol) { // todo refactor with merge interval time result pQInfo->groupIndex = 0; pQuery->rec.rows = 0; copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); @@ -4931,7 +4981,7 @@ static void tableQueryImpl(SQInfo *pQInfo) { STableQueryInfo* item = taosArrayGetP(g, 0); // group by normal column, sliding window query, interval query are handled by interval query processor - if (isIntervalQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation) + if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) { // interval (down sampling operation) tableIntervalProcess(pQInfo, item); } else if (isFixedOutputQuery(pQuery)) { tableFixedOutputProcess(pQInfo, item); @@ -4946,18 +4996,19 @@ static void tableQueryImpl(SQInfo *pQInfo) { } static void stableQueryImpl(SQInfo *pQInfo) { - SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + SQuery *pQuery = pRuntimeEnv->pQuery; pQuery->rec.rows = 0; int64_t st = taosGetTimestampUs(); - if (isIntervalQuery(pQuery) || - (isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && !isGroupbyNormalCol(pQuery->pGroupbyExpr) && + if (QUERY_IS_INTERVAL_QUERY(pQuery) || + (isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && !pRuntimeEnv->groupbyNormalCol && !isFirstLastRowQuery(pQuery))) { multiTableQueryProcess(pQInfo); } else { assert((pQuery->checkBuffer == 1 && pQuery->intervalTime == 0) || isPointInterpoQuery(pQuery) || - isFirstLastRowQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)); + isFirstLastRowQuery(pQuery) || pRuntimeEnv->groupbyNormalCol); sequentialTableProcess(pQInfo); } @@ -5652,28 +5703,33 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, STimeWindow window = pQueryMsg->window; taosArraySort(pTableIdList, compareTableIdInfo); + // TODO optimize the STableQueryInfo malloc strategy + pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); + int32_t index = 0; + for(int32_t i = 0; i < numOfGroups; ++i) { SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, i); - size_t s = taosArrayGetSize(pa); + size_t s = taosArrayGetSize(pa); SArray* p1 = taosArrayInit(s, POINTER_BYTES); for(int32_t j = 0; j < s; ++j) { void* pTable = taosArrayGetP(pa, j); + STableId* id = TSDB_TABLEID(pTable); - // NOTE: compare STableIdInfo with STableId - STableId id = tsdbGetTableId(pTable); - STableIdInfo* pTableId = taosArraySearch(pTableIdList, &id, compareTableIdInfo); + STableIdInfo* pTableId = taosArraySearch(pTableIdList, id, compareTableIdInfo); if (pTableId != NULL ) { window.skey = pTableId->key; } else { window.skey = pQueryMsg->window.skey; } - STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, pTable, window); + void* buf = pQInfo->pBuf + index * sizeof(STableQueryInfo); + STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, pTable, window, buf); item->groupIndex = i; taosArrayPush(p1, &item); - taosHashPut(pQInfo->tableqinfoGroupInfo.map, &id.tid, sizeof(id.tid), &item, POINTER_BYTES); + taosHashPut(pQInfo->tableqinfoGroupInfo.map, &id->tid, sizeof(id->tid), &item, POINTER_BYTES); + index += 1; } taosArrayPush(pQInfo->tableqinfoGroupInfo.pGroupList, &p1); @@ -5817,6 +5873,7 @@ static void freeQInfo(SQInfo *pQInfo) { taosArrayDestroy(p); } + tfree(pQInfo->pBuf); taosArrayDestroy(pQInfo->tableqinfoGroupInfo.pGroupList); taosHashCleanup(pQInfo->tableqinfoGroupInfo.map); tsdbDestoryTableGroup(&pQInfo->tableGroupInfo); @@ -6093,7 +6150,8 @@ void qTableQuery(qinfo_t qinfo) { return; } - int32_t ret = setjmp(pQInfo->env); + int32_t ret = setjmp(pQInfo->runtimeEnv.env); + // error occurs, record the error code and return to client if (ret != TSDB_CODE_SUCCESS) { pQInfo->code = ret; @@ -6276,13 +6334,13 @@ static void buildTagQueryResult(SQInfo* pQInfo) { varDataSetLen(output, rsize - VARSTR_HEADER_SIZE); output = varDataVal(output); - STableId id = tsdbGetTableId(item->pTable); + STableId* id = TSDB_TABLEID(item->pTable); - *(int64_t *)output = id.uid; // memory align problem, todo serialize - output += sizeof(id.uid); + *(int64_t *)output = id->uid; // memory align problem, todo serialize + output += sizeof(id->uid); - *(int32_t *)output = id.tid; - output += sizeof(id.tid); + *(int32_t *)output = id->tid; + output += sizeof(id->tid); *(int32_t *)output = pQInfo->vgId; output += sizeof(pQInfo->vgId); diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index d7725f561d5accc556983bac1061d0c66d9bbca0..9541f283f0bf65abeedc4abf3e2c42d25cb83c5e 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -32,7 +32,6 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun pWindowResInfo->threshold = threshold; pWindowResInfo->type = type; - _hash_fn_t fn = taosGetDefaultHashFunction(type); pWindowResInfo->hashList = taosHashInit(threshold, fn, false); @@ -54,7 +53,8 @@ void destroyTimeWindowRes(SWindowResult *pWindowRes, int32_t nOutputCols) { if (pWindowRes == NULL) { return; } - + + // TODO opt malloc strategy for (int32_t i = 0; i < nOutputCols; ++i) { free(pWindowRes->resultInfo[i].interResultBuf); } diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 3ae88c3141eac390b5e8b65a7bdc8afc0c167a41..e1bed0125518e038f6e328c50462486babd38b8a 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -47,9 +47,9 @@ extern int tsdbDebugFlag; // Definitions // ------------------ tsdbMeta.c typedef struct STable { + STableId tableId; ETableType type; tstr* name; // NOTE: there a flexible string here - STableId tableId; uint64_t suid; struct STable* pSuper; // super table pointer uint8_t numOfSchemas; @@ -294,14 +294,36 @@ typedef struct { #define TABLE_SUID(t) (t)->suid #define TABLE_LASTKEY(t) (t)->lastKey +static FORCE_INLINE STSchema *tsdbGetTableSchema(STable *pTable) { + if (pTable->type == TSDB_CHILD_TABLE) { // check child table first + STable *pSuper = pTable->pSuper; + if (pSuper == NULL) return NULL; + return pSuper->schema[pSuper->numOfSchemas - 1]; + } else if (pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_STREAM_TABLE) { + return pTable->schema[pTable->numOfSchemas - 1]; + } else { + return NULL; + } +} + +static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) { + if (pTable->type == TSDB_CHILD_TABLE) { // check child table first + STable *pSuper = pTable->pSuper; + if (pSuper == NULL) return NULL; + return pSuper->tagSchema; + } else if (pTable->type == TSDB_SUPER_TABLE) { + return pTable->tagSchema; + } else { + return NULL; + } +} + STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg); void tsdbFreeMeta(STsdbMeta* pMeta); int tsdbOpenMeta(STsdbRepo* pRepo); int tsdbCloseMeta(STsdbRepo* pRepo); -STSchema* tsdbGetTableSchema(STable* pTable); STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid); STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t version); -STSchema* tsdbGetTableTagSchema(STable* pTable); int tsdbUpdateTable(STsdbRepo* pRepo, STable* pTable, STableCfg* pCfg); int tsdbWLockRepoMeta(STsdbRepo* pRepo); int tsdbRLockRepoMeta(STsdbRepo* pRepo); diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index cbbf51d862686c1b36ed36b91af5528016276ea6..ad25b7f675a3cdf0e3fce0a5b33e6a4bc941e3d1 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -449,18 +449,6 @@ int tsdbCloseMeta(STsdbRepo *pRepo) { return 0; } -STSchema *tsdbGetTableSchema(STable *pTable) { - if (pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_STREAM_TABLE) { - return pTable->schema[pTable->numOfSchemas - 1]; - } else if (pTable->type == TSDB_CHILD_TABLE) { - STable *pSuper = pTable->pSuper; - if (pSuper == NULL) return NULL; - return pSuper->schema[pSuper->numOfSchemas - 1]; - } else { - return NULL; - } -} - STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid) { void *ptr = taosHashGet(pMeta->uidMap, (char *)(&uid), sizeof(uid)); @@ -480,18 +468,6 @@ STSchema *tsdbGetTableSchemaByVersion(STable *pTable, int16_t version) { return *(STSchema **)ptr; } -STSchema *tsdbGetTableTagSchema(STable *pTable) { - if (pTable->type == TSDB_SUPER_TABLE) { - return pTable->tagSchema; - } else if (pTable->type == TSDB_CHILD_TABLE) { - STable *pSuper = pTable->pSuper; - if (pSuper == NULL) return NULL; - return pSuper->tagSchema; - } else { - return NULL; - } -} - int tsdbUpdateTable(STsdbRepo *pRepo, STable *pTable, STableCfg *pCfg) { // TODO: this function can only be called when there is no query and commit on this table ASSERT(TABLE_TYPE(pTable) != TSDB_CHILD_TABLE);