diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 4d377f8a2d068a0fab13ddaad609476ad93e4882..6fb8df24441907fde789748bfeeaa6a6eb67fac3 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -1691,10 +1691,7 @@ static void last_function(SQLFunctionCtx *pCtx) { } static void last_function_f(SQLFunctionCtx *pCtx, int32_t index) { - if (pCtx->order == TSDB_ORDER_ASC) { - return; - } - + assert(pCtx->order != TSDB_ORDER_ASC); void *pData = GET_INPUT_CHAR_INDEX(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; @@ -2912,7 +2909,7 @@ static void leastsquares_finalizer(SQLFunctionCtx *pCtx) { } static void date_col_output_function(SQLFunctionCtx *pCtx) { - if (pCtx->scanFlag == SUPPLEMENTARY_SCAN) { + if (pCtx->scanFlag == REVERSE_SCAN) { return; } diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index 906dadb31798b2a48b0ebb142c613511a49dee65..2088e5a49ddd4d249effdc915234c70efff961cc 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -110,6 +110,14 @@ typedef struct STableQueryInfo { // todo merge with the STableQueryInfo struct SWindowResInfo windowResInfo; } STableQueryInfo; +typedef struct SQueryCostSummary { +} SQueryCostSummary; + +typedef struct SGroupItem { + STableId id; + STableQueryInfo* info; +} SGroupItem; + typedef struct SQuery { int16_t numOfCols; int16_t numOfTags; @@ -131,17 +139,15 @@ typedef struct SQuery { SColumnInfo* tagColList; int32_t numOfFilterCols; int64_t* defaultVal; - TSKEY lastKey; +// TSKEY lastKey; uint32_t status; // query status SResultRec rec; int32_t pos; SData** sdata; + STableQueryInfo* current; SSingleColumnFilterInfo* pFilterInfo; } SQuery; -typedef struct SQueryCostSummary { -} SQueryCostSummary; - typedef struct SQueryRuntimeEnv { SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo SQuery* pQuery; diff --git a/src/query/inc/tsqlfunction.h b/src/query/inc/tsqlfunction.h index a92f401323ad4d63cf9141d9fb7c0cdb569f6628..a11c03f2c0da8db902d6f24ba70fea56b37385f2 100644 --- a/src/query/inc/tsqlfunction.h +++ b/src/query/inc/tsqlfunction.h @@ -104,7 +104,7 @@ extern "C" { enum { MASTER_SCAN = 0x0u, - SUPPLEMENTARY_SCAN = 0x1u, + REVERSE_SCAN = 0x1u, REPEAT_SCAN = 0x2u, //repeat scan belongs to the master scan FIRST_STAGE_MERGE = 0x10u, SECONDARY_STAGE_MERGE = 0x20u, diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 71b1b0fc68895358b7f411d730cebde3f15f42a5..8a31b02ddafcae9c1cd78126ee96706b0c263259 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -40,9 +40,9 @@ #define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP) #define IS_MASTER_SCAN(runtime) ((runtime)->scanFlag == MASTER_SCAN) -#define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == SUPPLEMENTARY_SCAN) +#define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN) #define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN) -#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = SUPPLEMENTARY_SCAN) +#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN) #define GET_QINFO_ADDR(x) ((void *)((char *)(x)-offsetof(SQInfo, runtimeEnv))) @@ -96,11 +96,6 @@ typedef struct { STSCursor cur; } SQueryStatusInfo; -typedef struct SGroupItem { - STableId id; - STableQueryInfo* info; -} SGroupItem; - static void setQueryStatus(SQuery *pQuery, int8_t status); static bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; } @@ -121,7 +116,7 @@ static bool hasMainOutput(SQuery *pQuery); static void createTableQueryInfo(SQInfo *pQInfo); static void buildTagQueryResult(SQInfo *pQInfo); -static int32_t setAdditionalInfo(SQInfo *pQInfo, STableId *pTaleId, STableQueryInfo *pTableQueryInfo); +static int32_t setAdditionalInfo(SQInfo *pQInfo, STableId *pTableId, STableQueryInfo *pTableQueryInfo); static int32_t flushFromResultBuf(SQInfo *pQInfo); bool getNeighborPoints(SQInfo *pQInfo, void *pMeterObj, SPointInterpoSupporter *pPointInterpSupporter) { @@ -621,8 +616,8 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, } // query completed - if ((lastKey >= pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) || - (lastKey <= pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery))) { + if ((lastKey >= pQuery->current->win.ekey && QUERY_IS_ASC_QUERY(pQuery)) || + (lastKey <= pQuery->current->win.ekey && !QUERY_IS_ASC_QUERY(pQuery))) { closeAllTimeWindow(pWindowResInfo); pWindowResInfo->curIndex = pWindowResInfo->size - 1; @@ -662,22 +657,22 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, setQueryStatus(pQuery, QUERY_RESBUF_FULL); } - qTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pQuery), pWindowResInfo->size, n); + qTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pRuntimeEnv), pWindowResInfo->size, n); } assert(pWindowResInfo->prevSKey != 0); } static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo, TSKEY *pPrimaryColumn, - int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, - bool updateLastKey) { + int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, bool updateLastKey) { assert(startPos >= 0 && startPos < pDataBlockInfo->rows); int32_t num = -1; int32_t order = pQuery->order.order; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(order); + STableQueryInfo* item = pQuery->current; + if (QUERY_IS_ASC_QUERY(pQuery)) { if (ekey < pDataBlockInfo->window.ekey) { num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn); @@ -685,13 +680,13 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo assert(ekey < pPrimaryColumn[startPos]); } else { if (updateLastKey) { - pQuery->lastKey = pPrimaryColumn[startPos + (num - 1)] + step; + item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step; } } } else { num = pDataBlockInfo->rows - startPos; if (updateLastKey) { - pQuery->lastKey = pDataBlockInfo->window.ekey + step; + item->lastKey = pDataBlockInfo->window.ekey + step; } } } else { // desc @@ -701,13 +696,13 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo assert(ekey > pPrimaryColumn[startPos]); } else { if (updateLastKey) { - pQuery->lastKey = pPrimaryColumn[startPos - (num - 1)] + step; + item->lastKey = pPrimaryColumn[startPos - (num - 1)] + step; } } } else { num = startPos + 1; if (updateLastKey) { - pQuery->lastKey = pDataBlockInfo->window.skey + step; + item->lastKey = pDataBlockInfo->window.skey + step; } } } @@ -906,8 +901,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * SQuery * pQuery = pRuntimeEnv->pQuery; SColumnInfoData *pColInfo = NULL; - TSKEY * primaryKeyCol = NULL; - + + TSKEY *primaryKeyCol = NULL; if (pDataBlock != NULL) { pColInfo = taosArrayGet(pDataBlock, 0); primaryKeyCol = (TSKEY *)(pColInfo->pData); @@ -1097,15 +1092,20 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) { static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId) { SResultInfo *pResInfo = GET_RES_INFO(pCtx); - + SQuery* pQuery = pRuntimeEnv->pQuery; + if (pResInfo->complete || functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { return false; } + if (functionId == TSDB_FUNC_LAST_DST || functionId == TSDB_FUNC_LAST) { + return !QUERY_IS_ASC_QUERY(pQuery); + } else if (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_FIRST) { + return QUERY_IS_ASC_QUERY(pQuery); + } + // in the supplementary scan, only the following functions need to be executed - if (IS_REVERSE_SCAN(pRuntimeEnv) && - !(functionId == TSDB_FUNC_LAST_DST || functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_FIRST || - functionId == TSDB_FUNC_LAST || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS)) { + if (IS_REVERSE_SCAN(pRuntimeEnv)) {// && (functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS)) { return false; } @@ -1117,9 +1117,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQuery *pQuery = pRuntimeEnv->pQuery; + STableQueryInfo* item = pQuery->current; + TSKEY *primaryKeyCol = (TSKEY*) ((SColumnInfoData *)taosArrayGet(pDataBlock, 0))->pData; - - bool groupbyStateValue = isGroupbyNormalCol(pQuery->pGroupbyExpr); + bool groupbyStateValue = isGroupbyNormalCol(pQuery->pGroupbyExpr); SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport)); int16_t type = 0; @@ -1253,7 +1254,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } } - pQuery->lastKey = primaryKeyCol[offset] + step; + item->lastKey = primaryKeyCol[offset] + step; // todo refactor: extract method for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { @@ -1270,7 +1271,9 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis, __block_search_fn_t searchFn, SArray *pDataBlock) { SQuery *pQuery = pRuntimeEnv->pQuery; - SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; + + STableQueryInfo* pTableQInfo = pQuery->current; + SWindowResInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo; if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock); @@ -1279,7 +1282,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl } TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey; - pQuery->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + pTableQInfo->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); @@ -1745,73 +1748,6 @@ static UNUSED_FUNC bool doSetDataInfo(SQInfo *pQInfo, SPointInterpoSupporter *pP } } -// TODO refactor code, the best way to implement the last_row is utilizing the iterator -bool normalizeUnBoundLastRowQuery(SQInfo *pQInfo, SPointInterpoSupporter *pPointInterpSupporter) { -#if 0 - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - - SQuery * pQuery = pRuntimeEnv->pQuery; - SMeterObj *pMeterObj = pRuntimeEnv->pTabObj; - - assert(!QUERY_IS_ASC_QUERY(pQuery) && notHasQueryTimeRange(pQuery)); - __block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm]; - - TSKEY lastKey = -1; - - pQuery->fileId = -1; - vnodeFreeFieldsEx(pRuntimeEnv); - - // keep in-memory cache status in local variables in case that it may be changed by write operation - getBasicCacheInfoSnapshot(pQuery, pMeterObj->pCache, pMeterObj->vnode); - - SCacheInfo *pCacheInfo = (SCacheInfo *)pMeterObj->pCache; - if (pCacheInfo != NULL && pCacheInfo->cacheBlocks != NULL && pQuery->numOfBlocks > 0) { - pQuery->fileId = -1; - TSKEY key = pMeterObj->lastKey; - - pQuery->window.skey = key; - pQuery->window.ekey = key; - pQuery->lastKey = pQuery->window.skey; - - /* - * cache block may have been flushed to disk, and no data in cache anymore. - * So, copy cache block to local buffer is required. - */ - lastKey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false); - if (lastKey < 0) { // data has been flushed to disk, try again search in file - lastKey = getQueryPositionForCacheInvalid(pRuntimeEnv, searchFn); - - if (Q_STATUS_EQUAL(pQuery->status, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) { - return false; - } - } - } else { // no data in cache, try file - TSKEY key = pMeterObj->lastKeyOnFile; - - pQuery->window.skey = key; - pQuery->window.ekey = key; - pQuery->lastKey = pQuery->window.skey; - - bool ret = getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_LESS_EQUAL, searchFn); - if (!ret) { // no data in file, return false; - return false; - } - - lastKey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos); - } - - assert(lastKey <= pQuery->window.skey); - - pQuery->window.skey = lastKey; - pQuery->window.ekey = lastKey; - pQuery->lastKey = pQuery->window.skey; - - return getNeighborPoints(pQInfo, pMeterObj, pPointInterpSupporter); -#endif - - return true; -} - static void setScanLimitationByResultBuffer(SQuery *pQuery) { if (isTopBottomQuery(pQuery)) { pQuery->checkBuffer = 0; @@ -2446,9 +2382,11 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; + STableQueryInfo* pTableQueryInfo = pQuery->current; qTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d", - GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order); + GET_QINFO_ADDR(pRuntimeEnv), pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, pTableQueryInfo->lastKey, + pQuery->order.order); TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle; while (tsdbNextDataBlock(pQueryHandle)) { @@ -2529,7 +2467,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { int32_t step = QUERY_IS_ASC_QUERY(pQuery) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP; closeAllTimeWindow(&pRuntimeEnv->windowResInfo); - removeRedundantWindow(&pRuntimeEnv->windowResInfo, pQuery->lastKey - step, step); + removeRedundantWindow(&pRuntimeEnv->windowResInfo, pTableQueryInfo->lastKey - step, step); pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; // point to the last time window } else { assert(Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)); @@ -2539,8 +2477,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { return 0; } -static void updatelastkey(SQuery *pQuery, STableQueryInfo *pTableQInfo) { pTableQInfo->lastKey = pQuery->lastKey; } - /* * set tag value in SQLFunctionCtx * e.g.,tag information into input buffer @@ -3338,13 +3274,14 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { static SQueryStatusInfo getQueryStatusInfo(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - + STableQueryInfo* pTableQueryInfo = pQuery->current; + SQueryStatusInfo info = { .status = pQuery->status, .windowIndex = pRuntimeEnv->windowResInfo.curIndex, - .lastKey = pQuery->lastKey, + .lastKey = pTableQueryInfo->lastKey, .w = pQuery->window, - .curWindow = {.skey = pQuery->lastKey, .ekey = pQuery->window.ekey}, + .curWindow = {.skey = pTableQueryInfo->lastKey, .ekey = pQuery->window.ekey}, }; return info; @@ -3388,6 +3325,7 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusInfo *pStatus) { SQuery *pQuery = pRuntimeEnv->pQuery; + STableQueryInfo* pTableQueryInfo = pQuery->current; SWITCH_ORDER(pQuery->order.order); switchCtxOrder(pRuntimeEnv); @@ -3401,17 +3339,19 @@ static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus // update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query // during reverse scan - pQuery->lastKey = pStatus->lastKey; + pTableQueryInfo->lastKey = pStatus->lastKey; pQuery->status = pStatus->status; - pQuery->window = pStatus->w; + pTableQueryInfo->win = pStatus->w; } void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { + SQInfo *pQInfo = (SQInfo *) GET_QINFO_ADDR(pRuntimeEnv); SQuery *pQuery = pRuntimeEnv->pQuery; + STableQueryInfo *pTableQueryInfo = pQuery->current; + setQueryStatus(pQuery, QUERY_NOT_COMPLETED); // store the start query position - SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pRuntimeEnv); SQueryStatusInfo qstatus = getQueryStatusInfo(pRuntimeEnv); SET_MASTER_SCAN_FLAG(pRuntimeEnv); @@ -3422,7 +3362,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { if (pRuntimeEnv->scanFlag == MASTER_SCAN) { qstatus.status = pQuery->status; - qstatus.curWindow.ekey = pQuery->lastKey - step; + qstatus.curWindow.ekey = pTableQueryInfo->lastKey - step; } if (!needScanDataBlocksAgain(pRuntimeEnv)) { @@ -3457,7 +3397,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { } } - if (pRuntimeEnv->stableQuery || !needReverseScan(pQuery)) { + if (!needReverseScan(pQuery)) { return; } @@ -3542,12 +3482,10 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols) void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) { SQuery *pQuery = pRuntimeEnv->pQuery; - - pQuery->window = pTableQueryInfo->win; - pQuery->lastKey = pTableQueryInfo->lastKey; - - assert(((pQuery->lastKey >= pQuery->window.skey) && QUERY_IS_ASC_QUERY(pQuery)) || - ((pQuery->lastKey <= pQuery->window.skey) && !QUERY_IS_ASC_QUERY(pQuery))); + pQuery->current = pTableQueryInfo; + + assert(((pTableQueryInfo->lastKey >= pTableQueryInfo->win.skey) && QUERY_IS_ASC_QUERY(pQuery)) || + ((pTableQueryInfo->lastKey <= pTableQueryInfo->win.skey) && !QUERY_IS_ASC_QUERY(pQuery))); } /** @@ -3555,8 +3493,10 @@ void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *p * @param pRuntimeEnv * @param pDataBlockInfo */ -void setExecutionContext(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo, STableId* pTableId, int32_t groupIdx, TSKEY nextKey) { +void setExecutionContext(SQInfo *pQInfo, STableId* pTableId, int32_t groupIdx, TSKEY nextKey) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; + STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current; + SWindowResInfo * pWindowResInfo = &pRuntimeEnv->windowResInfo; int32_t GROUPRESULTID = 1; @@ -3640,12 +3580,12 @@ int32_t setAdditionalInfo(SQInfo *pQInfo, STableId* pTableId, STableQueryInfo *p * merged during merge stage. In this case, we need the pTableQueryInfo->lastResRows to decide if there * is a previous result generated or not. */ -void setIntervalQueryRange(STableQueryInfo *pTableQueryInfo, SQInfo *pQInfo, TSKEY key) { +void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; - + STableQueryInfo *pTableQueryInfo = pQuery->current; + if (pTableQueryInfo->queryRangeSet) { - pQuery->lastKey = key; pTableQueryInfo->lastKey = key; } else { pQuery->window.skey = key; @@ -3682,8 +3622,6 @@ void setIntervalQueryRange(STableQueryInfo *pTableQueryInfo, SQInfo *pQInfo, TSK pTableQueryInfo->queryRangeSet = 1; pTableQueryInfo->lastKey = pQuery->window.skey; pTableQueryInfo->win.skey = pQuery->window.skey; - - pQuery->lastKey = pQuery->window.skey; } } @@ -3703,7 +3641,9 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) { * 2. if there are top/bottom, first_dst/last_dst functions, we need to load timestamp column in any cases; */ STimeWindow *w = &pDataBlockInfo->window; - bool loadPrimaryTS = (pQuery->lastKey >= w->skey && pQuery->lastKey <= w->ekey) || + STableQueryInfo* pTableQueryInfo = pQuery->current; + + bool loadPrimaryTS = (pTableQueryInfo->lastKey >= w->skey && pTableQueryInfo->lastKey <= w->ekey) || (pQuery->window.ekey >= w->skey && pQuery->window.ekey <= w->ekey) || requireTimestamp(pQuery); return loadPrimaryTS; @@ -3840,7 +3780,6 @@ void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo } updateWindowResNumOfRes(pRuntimeEnv, pTableQueryInfo); - updatelastkey(pQuery, pTableQueryInfo); } bool vnodeHasRemainResults(void *handle) { @@ -4034,10 +3973,12 @@ void vnodePrintQueryStatistics(SQInfo *pQInfo) { static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) { SQuery *pQuery = pRuntimeEnv->pQuery; + STableQueryInfo* pTableQueryInfo = pQuery->current; + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); if (pQuery->limit.offset == pBlockInfo->rows) { // current block will ignore completed - pQuery->lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->window.ekey + step : pBlockInfo->window.skey + step; + pTableQueryInfo->lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->window.ekey + step : pBlockInfo->window.skey + step; pQuery->limit.offset = 0; return; } @@ -4057,7 +3998,7 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBloc TSKEY *keys = (TSKEY *)pColInfoData->pData; // update the offset value - pQuery->lastKey = keys[pQuery->pos]; + pTableQueryInfo->lastKey = keys[pQuery->pos]; pQuery->limit.offset = 0; int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock); @@ -4076,6 +4017,7 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { pQuery->pos = 0; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + STableQueryInfo* pTableQueryInfo = pQuery->current; TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle; while (tsdbNextDataBlock(pQueryHandle)) { @@ -4087,8 +4029,8 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { if (pQuery->limit.offset > blockInfo.rows) { pQuery->limit.offset -= blockInfo.rows; - pQuery->lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.window.ekey : blockInfo.window.skey; - pQuery->lastKey += step; + pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.window.ekey : blockInfo.window.skey; + pTableQueryInfo->lastKey += step; qTrace("QInfo:%p skip rows:%d, offset:%" PRId64 "", GET_QINFO_ADDR(pRuntimeEnv), blockInfo.rows, pQuery->limit.offset); @@ -4117,6 +4059,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv) { TSKEY skey1, ekey1; STimeWindow w = {0}; SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; + STableQueryInfo *pTableQueryInfo = pQuery->current; while (tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) { SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle); @@ -4162,7 +4105,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv) { // set the abort info pQuery->pos = startPos; - pQuery->lastKey = ((TSKEY *)pColInfoData->pData)[startPos]; + pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos]; pWindowResInfo->prevSKey = tw.skey; int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, NULL, binarySearchForKey, pDataBlock); @@ -4190,7 +4133,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv) { // set the abort info pQuery->pos = startPos; - pQuery->lastKey = ((TSKEY *)pColInfoData->pData)[startPos]; + pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos]; pWindowResInfo->prevSKey = tw.skey; win = tw; } else { @@ -4211,12 +4154,9 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool setScanLimitationByResultBuffer(pQuery); changeExecuteScanOrder(pQuery, false); - // dataInCache requires lastKey value - pQuery->lastKey = pQuery->window.skey; - STsdbQueryCond cond = { .twindow = pQuery->window, - .order = pQuery->order.order, + .order = pQuery->order.order, .colList = pQuery->colList, .numOfCols = pQuery->numOfCols, }; @@ -4229,6 +4169,9 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool } else if (!isSTableQuery || isIntervalQuery(pQuery) || isFixedOutputQuery(pQuery)) { pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo); } + + // create the table query support structures + createTableQueryInfo(pQInfo); } pQInfo->tsdb = tsdb; @@ -4322,7 +4265,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool // } // the pQuery->window.skey is changed during normalizedFirstQueryRange, so set the newest lastkey value - pQuery->lastKey = pQuery->window.skey; return TSDB_CODE_SUCCESS; } @@ -4352,7 +4294,6 @@ static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) { } } -static int32_t xx = 0; static int64_t queryOnDataBlocks(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; @@ -4392,31 +4333,18 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) { } } - if (pTableQueryInfo->id.tid == 5) { - int32_t k = 1; - printf("%d\n", k); - } - - - if (!QUERY_IS_ASC_QUERY(pQuery)) { - printf("-------------%d\n", xx++); - } - - if (pTableQueryInfo->id.tid == 5 && !QUERY_IS_ASC_QUERY(pQuery)) { - int32_t k = 1; - printf("%d\n", k); - } assert(pTableQueryInfo != NULL); restoreIntervalQueryRange(pRuntimeEnv, pTableQueryInfo); SDataStatis *pStatis = NULL; - SArray * pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis); + + SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis); TSKEY nextKey = blockInfo.window.skey; if (!isIntervalQuery(pQuery)) { - setExecutionContext(pQInfo, pTableQueryInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIdx, nextKey); + setExecutionContext(pQInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIdx, nextKey); } else { // interval query - setIntervalQueryRange(pTableQueryInfo, pQInfo, nextKey); + setIntervalQueryRange(pQInfo, nextKey); int32_t ret = setAdditionalInfo(pQInfo, &pTableQueryInfo->id, pTableQueryInfo); if (ret != TSDB_CODE_SUCCESS) { @@ -4587,8 +4515,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } } } else { - createTableQueryInfo(pQInfo); - /* * 1. super table projection query, 2. group-by on normal columns query, 3. ts-comp query * if the subgroup index is larger than 0, results generated by group by tbname,k is existed. @@ -4621,8 +4547,8 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } SGroupItem *item = taosArrayGet(group, pQInfo->tableIndex); + pQuery->current = item->info; - STableQueryInfo *pInfo = item->info; if (!multiTableMultioutputHelper(pQInfo, pQInfo->tableIndex)) { pQInfo->tableIndex++; continue; @@ -4662,8 +4588,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) { * to ensure that, we can reset the query range once query on a meter is completed. */ pQInfo->tableIndex++; - pInfo->lastKey = pQuery->lastKey; - // if the buffer is full or group by each table, we need to jump out of the loop if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL) /*|| isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)*/) { @@ -4671,7 +4595,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } } else { // forward query range - pQuery->window.skey = pQuery->lastKey; + pQuery->window.skey = pQuery->current->lastKey; // all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter if (pQuery->rec.rows == 0) { @@ -4856,12 +4780,9 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { qTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", order:%d, forward scan start", pQInfo, pQuery->window.skey, pQuery->window.ekey, pQuery->order.order); - // create the query support structures - createTableQueryInfo(pQInfo); - // do check all qualified data blocks int64_t el = queryOnDataBlocks(pQInfo); - qTrace("QInfo:%p forward scan completed, elapsed time: %lldms, reversed scan start", pQInfo, el); + qTrace("QInfo:%p master scan completed, elapsed time: %lldms, reverse scan start", pQInfo, el); // query error occurred or query is killed, abort current execution if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) { @@ -4912,10 +4833,12 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { * select count(*)/top(field,k)/avg(field name) from table_name [where ts>now-1a]; * select count(*) from table_name group by status_column; */ -static void tableFixedOutputProcess(SQInfo *pQInfo) { +static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery * pQuery = pRuntimeEnv->pQuery; - + + SQuery *pQuery = pRuntimeEnv->pQuery; + pQuery->current = pTableInfo; // set current query table info + scanAllDataBlocks(pRuntimeEnv); finalizeQueryResult(pRuntimeEnv); @@ -4935,10 +4858,12 @@ static void tableFixedOutputProcess(SQInfo *pQInfo) { limitResults(pQInfo); } -static void tableMultiOutputProcess(SQInfo *pQInfo) { +static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery * pQuery = pRuntimeEnv->pQuery; - + + SQuery *pQuery = pRuntimeEnv->pQuery; + pQuery->current = pTableInfo; + // for ts_comp query, re-initialized is not allowed if (!isTSCompQuery(pQuery)) { resetCtxOutputBuf(pRuntimeEnv); @@ -4973,15 +4898,15 @@ static void tableMultiOutputProcess(SQInfo *pQInfo) { } qTrace("QInfo:%p vid:%d sid:%d id:%s, skip current result, offset:%" PRId64 ", next qrange:%" PRId64 "-%" PRId64, - pQInfo, pQuery->limit.offset, pQuery->lastKey); + pQInfo, pQuery->limit.offset, pQuery->current->lastKey); resetCtxOutputBuf(pRuntimeEnv); } limitResults(pQInfo); if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { - qTrace("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo, pQuery->lastKey, - pQuery->window.ekey); + qTrace("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo, + pQuery->current->lastKey, pQuery->window.ekey); } if (!isTSCompQuery(pQuery)) { @@ -5021,11 +4946,12 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv) { } // handle time interval query on table -static void tableIntervalProcess(SQInfo *pQInfo) { +static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->runtimeEnv); - SQuery * pQuery = pRuntimeEnv->pQuery; int32_t numOfInterpo = 0; + SQuery *pQuery = pRuntimeEnv->pQuery; + pQuery->current = pTableInfo; // skip blocks without load the actual data block from file if no filter condition present skipTimeInterval(pRuntimeEnv); @@ -5134,15 +5060,19 @@ static void tableQueryImpl(SQInfo *pQInfo) { // number of points returned during this query pQuery->rec.rows = 0; int64_t st = taosGetTimestampUs(); - + + assert(pQInfo->groupInfo.numOfTables == 1); + SArray* g = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0); + SGroupItem* item = taosArrayGet(g, 0); + // group by normal column, sliding window query, interval query are handled by interval query processor if (isIntervalQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation) - tableIntervalProcess(pQInfo); + tableIntervalProcess(pQInfo, item->info); } else if (isFixedOutputQuery(pQuery)) { - tableFixedOutputProcess(pQInfo); + tableFixedOutputProcess(pQInfo, item->info); } else { // diff/add/multiply/subtract/division assert(pQuery->checkBuffer == 1); - tableMultiOutputProcess(pQInfo); + tableMultiOutputProcess(pQInfo, item->info); } // record the total elapsed time @@ -5836,7 +5766,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou pQuery->pos = -1; pQuery->window = pQueryMsg->window; - pQuery->lastKey = pQuery->window.skey; if (sem_init(&pQInfo->dataReady, 0, 0) != 0) { qError("QInfo:%p init dataReady sem failed, reason:%s", pQInfo, strerror(errno)); @@ -5913,9 +5842,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ if ((code = doInitQInfo(pQInfo, pTSBuf, tsdb, vgId, isSTable)) != TSDB_CODE_SUCCESS) { goto _error; } - - // qTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo, - // pQInfo->refCount); + return code; _error: @@ -6088,34 +6015,35 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi if (pQueryMsg->numOfTables <= 0) { qError("Invalid number of tables to query, numOfTables:%d", pQueryMsg->numOfTables); code = TSDB_CODE_INVALID_QUERY_MSG; - goto _query_over; + goto _over; } if (pTableIdList == NULL || taosArrayGetSize(pTableIdList) == 0) { qError("qmsg:%p, SQueryTableMsg wrong format", pQueryMsg); code = TSDB_CODE_INVALID_QUERY_MSG; - goto _query_over; + goto _over; } SExprInfo *pExprs = NULL; if ((code = createSqlFunctionExprFromMsg(pQueryMsg, &pExprs, pExprMsg, pTagColumnInfo)) != TSDB_CODE_SUCCESS) { - goto _query_over; + goto _over; } SSqlGroupbyExpr *pGroupbyExpr = createGroupbyExprFromMsg(pQueryMsg, pGroupColIndex, &code); if ((pGroupbyExpr == NULL && pQueryMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) { - goto _query_over; + goto _over; } bool isSTableQuery = false; STableGroupInfo groupInfo = {0}; + //todo multitable_query?? if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_TABLE_QUERY)) { isSTableQuery = TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY); STableId *id = taosArrayGet(pTableIdList, 0); if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) { - goto _query_over; + goto _over; } } else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_STABLE_QUERY)) { isSTableQuery = true; @@ -6132,7 +6060,7 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi numOfGroupByCols); if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query code = TSDB_CODE_SUCCESS; - goto _query_over; + goto _over; } } else { assert(0); @@ -6141,11 +6069,12 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, &groupInfo, pTagColumnInfo); if ((*pQInfo) == NULL) { code = TSDB_CODE_SERV_OUT_OF_MEMORY; + goto _over; } code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, isSTableQuery); -_query_over: +_over: tfree(tagCond); tfree(tbnameCond); taosArrayDestroy(pTableIdList); diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index d41f4fddb3b3ef1b40a59a8cf94e0dcd5fe97f4b..bbc72bd1c7522d0b65834246c705f8c4d8097302 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -85,7 +85,6 @@ STable *tsdbDecodeTable(void *cont, int contLen) { varDataSetLen(pTable->name, len); memcpy(pTable->name->data, ptr, len); - printf("%s\n", pTable->name->data); ptr = (char *)ptr + len; T_READ_MEMBER(ptr, int64_t, pTable->tableId.uid);