From c6b0d6b0f9069d580e64a4b771e28fc94217729d Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Wed, 29 Jan 2020 15:29:54 +0800 Subject: [PATCH] support the sliding query [tbase-266] --- src/client/src/tscSQLParser.c | 57 +- src/system/detail/inc/vnode.h | 1 + src/system/detail/inc/vnodeQueryImpl.h | 10 +- src/system/detail/inc/vnodeRead.h | 35 +- src/system/detail/src/vnodeQueryImpl.c | 1437 ++++++++++++++------- src/system/detail/src/vnodeQueryProcess.c | 63 +- src/system/detail/src/vnodeRead.c | 3 + src/util/src/hash.c | 10 +- src/util/src/thashutil.c | 1 + src/util/src/tinterpolation.c | 26 +- 10 files changed, 1060 insertions(+), 583 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index f5b1b76607..3407d2fe4b 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -62,7 +62,7 @@ static int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQ static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes, int8_t type, char* fieldName); static int32_t changeFunctionID(int32_t optr, int16_t* functionId); -static int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isMetric); +static int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable); static bool validateIpAddress(const char* ip, size_t size); static bool hasUnsupportFunctionsForSTableQuery(SQueryInfo* pQueryInfo); @@ -93,6 +93,8 @@ static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo); static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField); static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo); +static bool hasDefaultQueryTimeRange(SQueryInfo *pQueryInfo); + static void updateTagColumnIndex(SQueryInfo* pQueryInfo, int32_t tableIndex); static int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t index, SQuerySQL* pQuerySql, SSqlObj* pSql); @@ -4432,8 +4434,6 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* const char* msg1 = "slimit/soffset only available for STable query"; const char* msg2 = "function not supported on table"; const char* msg3 = "slimit/soffset can not apply to projection query"; - const char* msg4 = "projection on super table requires order by clause along with limitation"; - const char* msg5 = "ordered projection result too large"; // handle the limit offset value, validate the limit pQueryInfo->limit = pQuerySql->limit; @@ -5542,28 +5542,6 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { } } - // set sliding value - SSQLToken* pSliding = &pQuerySql->sliding; - if (pSliding->n != 0) { - if (!tscEmbedded && pCmd->inStream == 0) { // sliding only allowed in stream - const char* msg = "not support sliding in query"; - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); - } - - getTimestampInUsFromStr(pSliding->z, pSliding->n, &pQueryInfo->nSlidingTime); - if (pMeterMetaInfo->pMeterMeta->precision == TSDB_TIME_PRECISION_MILLI) { - pQueryInfo->nSlidingTime /= 1000; - } - - if (pQueryInfo->nSlidingTime < tsMinSlidingTime) { - return invalidSqlErrMsg(pQueryInfo->msg, msg3); - } - - if (pQueryInfo->nSlidingTime > pQueryInfo->nAggTimeInterval) { - return invalidSqlErrMsg(pQueryInfo->msg, msg4); - } - } - // set order by info if (parseOrderbyClause(pQueryInfo, pQuerySql, tsGetSchema(pMeterMetaInfo->pMeterMeta)) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; @@ -5602,6 +5580,30 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { if (!hasTimestampForPointInterpQuery(pQueryInfo)) { return invalidSqlErrMsg(pQueryInfo->msg, msg2); } + + // set sliding value, the query time range needs to be decide in the first place + SSQLToken* pSliding = &pQuerySql->sliding; + if (pSliding->n != 0) { + if (!tscEmbedded && pCmd->inStream == 0 && hasDefaultQueryTimeRange(pQueryInfo)) { // sliding only allowed in stream + const char* msg = "time range expected for sliding window query"; + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); + } + + getTimestampInUsFromStr(pSliding->z, pSliding->n, &pQueryInfo->nSlidingTime); + if (pMeterMetaInfo->pMeterMeta->precision == TSDB_TIME_PRECISION_MILLI) { + pQueryInfo->nSlidingTime /= 1000; + } + + if (pQueryInfo->nSlidingTime < tsMinSlidingTime) { + return invalidSqlErrMsg(pQueryInfo->msg, msg3); + } + + if (pQueryInfo->nSlidingTime > pQueryInfo->nAggTimeInterval) { + return invalidSqlErrMsg(pQueryInfo->msg, msg4); + } + } else { + pQueryInfo->nSlidingTime = -1; + } // in case of join query, time range is required. if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { @@ -5651,3 +5653,8 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { return TSDB_CODE_SUCCESS; // Does not build query message here } + +bool hasDefaultQueryTimeRange(SQueryInfo *pQueryInfo) { + return (pQueryInfo->stime == 0 && pQueryInfo->etime == INT64_MAX) || + (pQueryInfo->stime == INT64_MAX && pQueryInfo->etime == 0); +} \ No newline at end of file diff --git a/src/system/detail/inc/vnode.h b/src/system/detail/inc/vnode.h index 75c0e8cd61..60449de9f5 100644 --- a/src/system/detail/inc/vnode.h +++ b/src/system/detail/inc/vnode.h @@ -260,6 +260,7 @@ typedef struct SQuery { TSKEY skey; TSKEY ekey; int64_t nAggTimeInterval; + int64_t slidingTime; // sliding time for sliding window query char intervalTimeUnit; // interval data type, used for daytime revise int8_t precision; int16_t numOfOutputCols; diff --git a/src/system/detail/inc/vnodeQueryImpl.h b/src/system/detail/inc/vnodeQueryImpl.h index 7870c0a9ed..3f46d4fb54 100644 --- a/src/system/detail/inc/vnodeQueryImpl.h +++ b/src/system/detail/inc/vnodeQueryImpl.h @@ -64,7 +64,7 @@ typedef enum { * the next query. * * this status is only exist in group-by clause and - * diff/add/division/mulitply/ query. + * diff/add/division/multiply/ query. */ QUERY_RESBUF_FULL = 0x2, @@ -149,7 +149,6 @@ void vnodeScanAllData(SQueryRuntimeEnv* pRuntimeEnv); int32_t vnodeQueryResultInterpolate(SQInfo* pQInfo, tFilePage** pDst, tFilePage** pDataSrc, int32_t numOfRows, int32_t* numOfInterpo); void copyResToQueryResultBuf(SMeterQuerySupportObj* pSupporter, SQuery* pQuery); -void moveDescOrderResultsToFront(SQueryRuntimeEnv* pRuntimeEnv); void doSkipResults(SQueryRuntimeEnv* pRuntimeEnv); void doFinalizeResult(SQueryRuntimeEnv* pRuntimeEnv); @@ -159,7 +158,7 @@ void forwardIntervalQueryRange(SMeterQuerySupportObj* pSupporter, SQueryRuntimeE void forwardQueryStartPosition(SQueryRuntimeEnv* pRuntimeEnv); bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, SMeterQuerySupportObj* pSupporter, - SPointInterpoSupporter* pPointInterpSupporter); + SPointInterpoSupporter* pPointInterpSupporter, int64_t* key); void pointInterpSupporterInit(SQuery* pQuery, SPointInterpoSupporter* pInterpoSupport); void pointInterpSupporterDestroy(SPointInterpoSupporter* pPointInterpSupport); @@ -278,6 +277,11 @@ void displayInterResult(SData** pdata, SQuery* pQuery, int32_t numOfRows); void vnodePrintQueryStatistics(SMeterQuerySupportObj* pSupporter); void clearGroupResultBuf(SOutputRes* pOneOutputRes, int32_t nOutputCols); +void copyGroupResultBuf(SOutputRes* dst, const SOutputRes* src, int32_t nOutputCols); + +void resetResWindowInfo(SSlidingWindowResInfo* pWindowResInfo, int32_t numOfCols); +void clearCompletedResWindows(SSlidingWindowResInfo* pWindowResInfo, int32_t numOfCols); +int32_t numOfResFromResWindowInfo(SSlidingWindowResInfo* pWindowResInfo); #ifdef __cplusplus } diff --git a/src/system/detail/inc/vnodeRead.h b/src/system/detail/inc/vnodeRead.h index 7673f770db..eb8fb09bd3 100644 --- a/src/system/detail/inc/vnodeRead.h +++ b/src/system/detail/inc/vnodeRead.h @@ -112,7 +112,31 @@ typedef struct SQueryFilesInfo { char dbFilePathPrefix[PATH_MAX]; } SQueryFilesInfo; -typedef struct RuntimeEnvironment { +typedef struct STimeWindow { + TSKEY skey; + TSKEY ekey; +} STimeWindow; + +typedef struct SWindowStatus { + STimeWindow window; + bool closed; +} SWindowStatus; + +typedef struct SSlidingWindowResInfo { + SOutputRes* pResult; // reference to SQuerySupporter->pResult + SWindowStatus* pStatus; // current query window closed or not? + void* hashList; // hash list for quick access + int16_t type; // data type for hash key + int32_t capacity; // max capacity + int32_t curIndex; // current start active index + int32_t size; + + int64_t startTime; // start time of the first time window for sliding query + int64_t prevSKey; // previous (not completed) sliding window start key + int64_t threshold; // threshold for return completed results. +} SSlidingWindowResInfo; + +typedef struct SQueryRuntimeEnv { SPositionInfo startPos; /* the start position, used for secondary/third iteration */ SPositionInfo endPos; /* the last access position in query, served as the start pos of reversed order query */ SPositionInfo nextPos; /* start position of the next scan */ @@ -134,13 +158,16 @@ typedef struct RuntimeEnvironment { int16_t scanFlag; // denotes reversed scan of data or not SInterpolationInfo interpoInfo; SData** pInterpoBuf; - SOutputRes* pResult; // reference to SQuerySupporter->pResult - void* hashList; - int32_t usedIndex; // assigned SOutputRes in list + + SSlidingWindowResInfo swindowResInfo; + STSBuf* pTSBuf; STSCursor cur; SQueryCostSummary summary; + TSKEY intervalSKey; // skey of the complete time window, not affected by the actual data distribution + TSKEY intervalEKey; // ekey of the complete time window + /* * Temporarily hold the in-memory cache block info during scan cache blocks * Here we do not use the cacheblock info from pMeterObj, simple because it may change anytime diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 9c4f442bbc..4b405bdf6b 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -13,12 +13,12 @@ * along with this program. If not, see . */ +#include "hash.h" +#include "hashutil.h" #include "os.h" #include "taosmsg.h" #include "textbuffer.h" #include "ttime.h" -#include "hash.h" -#include "hashutil.h" #include "tinterpolation.h" #include "tscJoinProcess.h" @@ -69,15 +69,15 @@ static int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj static void setGroupOutputBuffer(SQueryRuntimeEnv *pRuntimeEnv, SOutputRes *pResult); -static void getAlignedIntervalQueryRange(SQuery *pQuery, TSKEY keyInData, TSKEY skey, TSKEY ekey); +static void getAlignedIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY keyInData, TSKEY skey, TSKEY ekey); static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pInfo, SBlockInfo *pBlockInfo, int64_t *pPrimaryCol, SField *pFields, __block_search_fn_t searchFn); static int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult); -static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, - int64_t *pPrimaryData, SBlockInfo *pBlockInfo, int32_t blockStatus, - SField *pFields, __block_search_fn_t searchFn); +static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, int64_t *pPrimaryData, + SBlockInfo *pBlockInfo, int32_t blockStatus, SField *pFields, + __block_search_fn_t searchFn); static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx); static int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, @@ -86,6 +86,9 @@ static void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEn static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid); static TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn); static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); +static void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t pKey, int64_t keyFirst, int64_t keyLast, + int64_t *actualSkey, int64_t *actualEkey, int64_t *skey, int64_t *ekey); +static void getNextLogicalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t *skey, int64_t *ekey); // check the offset value integrity static FORCE_INLINE int32_t validateHeaderOffsetSegment(SQInfo *pQInfo, char *filePath, int32_t vid, char *data, @@ -108,8 +111,9 @@ static FORCE_INLINE int32_t getCompHeaderStartPosition(SVnodeCfg *pCfg) { static FORCE_INLINE int32_t validateCompBlockOffset(SQInfo *pQInfo, SMeterObj *pMeterObj, SCompHeader *pCompHeader, SQueryFilesInfo *pQueryFileInfo, int32_t headerSize) { if (pCompHeader->compInfoOffset < headerSize || pCompHeader->compInfoOffset > pQueryFileInfo->headerFileSize) { - dError("QInfo:%p vid:%d sid:%d id:%s, compInfoOffset:%" PRId64 " is not valid, size:%" PRId64, pQInfo, pMeterObj->vnode, - pMeterObj->sid, pMeterObj->meterId, pCompHeader->compInfoOffset, pQueryFileInfo->headerFileSize); + dError("QInfo:%p vid:%d sid:%d id:%s, compInfoOffset:%" PRId64 " is not valid, size:%" PRId64, pQInfo, + pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pCompHeader->compInfoOffset, + pQueryFileInfo->headerFileSize); return -1; } @@ -121,8 +125,8 @@ static FORCE_INLINE int32_t validateCompBlockOffset(SQInfo *pQInfo, SMeterObj *p static FORCE_INLINE int32_t validateCompBlockInfoSegment(SQInfo *pQInfo, const char *filePath, int32_t vid, SCompInfo *compInfo, int64_t offset) { if (!taosCheckChecksumWhole((uint8_t *)compInfo, sizeof(SCompInfo))) { - dLError("QInfo:%p vid:%d, failed to read header file:%s, file compInfo broken, offset:%" PRId64, pQInfo, vid, filePath, - offset); + dLError("QInfo:%p vid:%d, failed to read header file:%s, file compInfo broken, offset:%" PRId64, pQInfo, vid, + filePath, offset); return -1; } return 0; @@ -163,27 +167,27 @@ bool isGroupbyNormalCol(SSqlGroupbyExpr *pGroupbyExpr) { return false; } -int16_t getGroupbyColumnType(SQuery* pQuery, SSqlGroupbyExpr *pGroupbyExpr) { +int16_t getGroupbyColumnType(SQuery *pQuery, SSqlGroupbyExpr *pGroupbyExpr) { assert(pGroupbyExpr != NULL); - + int32_t colId = -2; int16_t type = TSDB_DATA_TYPE_NULL; - - for(int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) { + + for (int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) { SColIndexEx *pColIndex = &pGroupbyExpr->columnInfo[i]; if (pColIndex->flag == TSDB_COL_NORMAL) { colId = pColIndex->colId; break; } } - - for(int32_t i = 0; i < pQuery->numOfCols; ++i) { + + for (int32_t i = 0; i < pQuery->numOfCols; ++i) { if (colId == pQuery->colList[i].data.colId) { type = pQuery->colList[i].data.type; break; } } - + return type; } @@ -236,7 +240,7 @@ static bool vnodeIsCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj static void vnodeSetCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex, int32_t sid) { SLoadCompBlockInfo *pCompBlockLoadInfo = &pRuntimeEnv->loadCompBlockInfo; - + pCompBlockLoadInfo->sid = sid; pCompBlockLoadInfo->fileListIndex = fileIndex; pCompBlockLoadInfo->fileId = pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex].fileID; @@ -250,7 +254,7 @@ static void vnodeInitLoadCompBlockInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) { static int32_t vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex, bool loadPrimaryTS) { - SQuery * pQuery = pRuntimeEnv->pQuery; + SQuery * pQuery = pRuntimeEnv->pQuery; SLoadDataBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo; /* this block has been loaded into memory, return directly */ @@ -269,7 +273,7 @@ static int32_t vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj * static void vnodeSetDataBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex, bool tsLoaded) { - SQuery * pQuery = pRuntimeEnv->pQuery; + SQuery * pQuery = pRuntimeEnv->pQuery; SLoadDataBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo; pLoadInfo->fileId = pQuery->fileId; @@ -344,7 +348,7 @@ static void doCloseQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) { tclose(pVnodeFilesInfo->headerFd); tclose(pVnodeFilesInfo->dataFd); tclose(pVnodeFilesInfo->lastFd); - + pVnodeFilesInfo->current = -1; pVnodeFilesInfo->headerFileSize = -1; } @@ -363,7 +367,7 @@ static void doInitQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) { */ static int32_t doOpenQueryFile(SQInfo *pQInfo, SQueryFilesInfo *pVnodeFileInfo) { SHeaderFileInfo *pHeaderFileInfo = &pVnodeFileInfo->pFileInfo[pVnodeFileInfo->current]; - + /* * current header file is empty or broken, return directly. * @@ -373,10 +377,10 @@ static int32_t doOpenQueryFile(SQInfo *pQInfo, SQueryFilesInfo *pVnodeFileInfo) if (checkIsHeaderFileEmpty(pVnodeFileInfo)) { qTrace("QInfo:%p vid:%d, fileId:%d, index:%d, size:%d, ignore file, empty or broken", pQInfo, pVnodeFileInfo->vnodeId, pHeaderFileInfo->fileID, pVnodeFileInfo->current, pVnodeFileInfo->headerFileSize); - + return -1; } - + pVnodeFileInfo->headerFd = open(pVnodeFileInfo->headerFilePath, O_RDONLY); if (!FD_VALID(pVnodeFileInfo->headerFd)) { dError("QInfo:%p failed open head file:%s reason:%s", pQInfo, pVnodeFileInfo->headerFilePath, strerror(errno)); @@ -403,7 +407,7 @@ static void doCloseQueryFiles(SQueryFilesInfo *pVnodeFileInfo) { assert(pVnodeFileInfo->current < pVnodeFileInfo->numOfFiles && pVnodeFileInfo->current >= 0); pVnodeFileInfo->headerFileSize = -1; - + doCloseQueryFileInfoFD(pVnodeFileInfo); } @@ -478,38 +482,38 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim if (ret != TSDB_CODE_SUCCESS) { return -1; // failed to load the header file data into memory } - - char* buf = calloc(1, getCompHeaderSegSize(pCfg)); + + char * buf = calloc(1, getCompHeaderSegSize(pCfg)); SQueryFilesInfo *pVnodeFileInfo = &pRuntimeEnv->vnodeFileInfo; - + lseek(pVnodeFileInfo->headerFd, TSDB_FILE_HEADER_LEN, SEEK_SET); read(pVnodeFileInfo->headerFd, buf, getCompHeaderSegSize(pCfg)); - + // check the offset value integrity - if (validateHeaderOffsetSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, pMeterObj->vnode, buf - TSDB_FILE_HEADER_LEN, - getCompHeaderSegSize(pCfg)) < 0) { + if (validateHeaderOffsetSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, pMeterObj->vnode, + buf - TSDB_FILE_HEADER_LEN, getCompHeaderSegSize(pCfg)) < 0) { free(buf); return -1; } SCompHeader *compHeader = (SCompHeader *)(buf + sizeof(SCompHeader) * pMeterObj->sid); - + // no data in this file for specified meter, abort if (compHeader->compInfoOffset == 0) { free(buf); return 0; } - + // corrupted file may cause the invalid compInfoOffset, check needs if (validateCompBlockOffset(pQInfo, pMeterObj, compHeader, &pRuntimeEnv->vnodeFileInfo, getCompHeaderStartPosition(pCfg)) < 0) { free(buf); return -1; } - + lseek(pVnodeFileInfo->headerFd, compHeader->compInfoOffset, SEEK_SET); - - SCompInfo compInfo = {0}; + + SCompInfo compInfo = {0}; read(pVnodeFileInfo->headerFd, &compInfo, sizeof(SCompInfo)); // check compblock info integrity @@ -539,14 +543,14 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim // prepare buffer to hold compblock data if (pQuery->blockBufferSize != bufferSize) { pQuery->pBlock = realloc(pQuery->pBlock, bufferSize); - pQuery->blockBufferSize = (int32_t) bufferSize; + pQuery->blockBufferSize = (int32_t)bufferSize; } memset(pQuery->pBlock, 0, bufferSize); - + // read data: comp block + checksum read(pVnodeFileInfo->headerFd, pQuery->pBlock, compBlockSize + sizeof(TSCKSUM)); - TSCKSUM checksum = *(TSCKSUM*)((char*)pQuery->pBlock + compBlockSize); + TSCKSUM checksum = *(TSCKSUM *)((char *)pQuery->pBlock + compBlockSize); // check comp block integrity if (validateCompBlockSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, &compInfo, (char *)pQuery->pBlock, @@ -565,7 +569,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim pSummary->totalCompInfoSize += compBlockSize; pSummary->loadCompInfoUs += (et - st); - + free(buf); return pQuery->numOfBlocks; } @@ -816,8 +820,8 @@ static int32_t loadDataBlockFieldsInfo(SQueryRuntimeEnv *pRuntimeEnv, SCompBlock // check fields integrity if (!taosCheckChecksumWhole((uint8_t *)(*pField), size)) { - dLError("QInfo:%p vid:%d sid:%d id:%s, slot:%d, failed to read sfields, file:%s, sfields area broken:%" PRId64, pQInfo, - pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pVnodeFilesInfo->dataFilePath, + dLError("QInfo:%p vid:%d sid:%d id:%s, slot:%d, failed to read sfields, file:%s, sfields area broken:%" PRId64, + pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pVnodeFilesInfo->dataFilePath, pBlock->offset); return -1; } @@ -870,20 +874,21 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR int32_t status = vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, fileIdx, loadPrimaryCol); if (status == DISK_BLOCK_NO_NEED_TO_LOAD) { - dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, data block has been loaded, no need to load again, ts:%d, slot:%d," + dTrace( + "QInfo:%p vid:%d sid:%d id:%s, fileId:%d, data block has been loaded, no need to load again, ts:%d, slot:%d," " brange:%lld-%lld, rows:%d", GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, loadPrimaryCol, pQuery->slot, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfPoints); - + if (loadSField && (pQuery->pFields == NULL || pQuery->pFields[pQuery->slot] == NULL)) { loadDataBlockFieldsInfo(pRuntimeEnv, pBlock, &pQuery->pFields[pQuery->slot]); } - + return TSDB_CODE_SUCCESS; } else if (status == DISK_BLOCK_LOAD_TS) { dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, data block has been loaded, incrementally load ts", GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId); - + assert(PRIMARY_TSCOL_LOADED(pQuery) == false && loadSField == true); if (pQuery->pFields == NULL || pQuery->pFields[pQuery->slot] == NULL) { loadDataBlockFieldsInfo(pRuntimeEnv, pBlock, &pQuery->pFields[pQuery->slot]); @@ -891,7 +896,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR // load primary timestamp int32_t ret = loadPrimaryTSColumn(pRuntimeEnv, pBlock, pField, &columnBytes); - + vnodeSetDataBlockInfoLoaded(pRuntimeEnv, pMeterObj, fileIdx, loadPrimaryCol); return ret; } @@ -988,7 +993,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR } // todo ignore the blockType, pass the pQuery into this function -SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv* pRuntimeEnv, void *pBlock, int32_t blockType) { +SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv *pRuntimeEnv, void *pBlock, int32_t blockType) { SBlockInfo blockInfo = {0}; if (IS_FILE_BLOCK(blockType)) { SCompBlock *pDiskBlock = (SCompBlock *)pBlock; @@ -1034,7 +1039,7 @@ static bool checkQueryRangeAgainstNextBlock(SBlockInfo *pBlockInfo, SQueryRuntim */ static bool queryCompleteInBlock(SQuery *pQuery, SBlockInfo *pBlockInfo, int32_t forwardStep) { if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { - assert(pQuery->checkBufferInLoop == 1 && pQuery->over == QUERY_RESBUF_FULL && pQuery->pointsOffset == 0); + // assert(pQuery->checkBufferInLoop == 1 && pQuery->over == QUERY_RESBUF_FULL && pQuery->pointsOffset == 0); assert((QUERY_IS_ASC_QUERY(pQuery) && forwardStep + pQuery->pos <= pBlockInfo->size) || (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->pos - forwardStep + 1 >= 0)); @@ -1071,20 +1076,20 @@ void savePointPosition(SPositionInfo *position, int32_t fileId, int32_t slot, in position->pos = pos; } -bool isCacheBlockValid(SQuery* pQuery, SCacheBlock* pBlock, SMeterObj* pMeterObj) { +bool isCacheBlockValid(SQuery *pQuery, SCacheBlock *pBlock, SMeterObj *pMeterObj) { if (pMeterObj != pBlock->pMeterObj || pBlock->blockId > pQuery->blockId) { - - SMeterObj* pNewMeterObj = pBlock->pMeterObj; - char* id = (pNewMeterObj != NULL)? pNewMeterObj->meterId:NULL; - - dWarn("QInfo:%p vid:%d sid:%d id:%s, cache block is overwritten, slot:%d blockId:%d qBlockId:%d, meterObj:%p, " - "blockMeterObj:%p, blockMeter id:%s, first:%d, last:%d, numOfBlocks:%d", - GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pBlock->blockId, - pQuery->blockId, pMeterObj, pNewMeterObj, id, pQuery->firstSlot, pQuery->currentSlot, pQuery->numOfBlocks); - + SMeterObj *pNewMeterObj = pBlock->pMeterObj; + char * id = (pNewMeterObj != NULL) ? pNewMeterObj->meterId : NULL; + + dWarn( + "QInfo:%p vid:%d sid:%d id:%s, cache block is overwritten, slot:%d blockId:%d qBlockId:%d, meterObj:%p, " + "blockMeterObj:%p, blockMeter id:%s, first:%d, last:%d, numOfBlocks:%d", + GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pBlock->blockId, + pQuery->blockId, pMeterObj, pNewMeterObj, id, pQuery->firstSlot, pQuery->currentSlot, pQuery->numOfBlocks); + return false; } - + /* * The check for empty block: * pBlock->numOfPoints == 0. There is a empty block, which is caused by allocate-and-write data into cache @@ -1092,36 +1097,40 @@ bool isCacheBlockValid(SQuery* pQuery, SCacheBlock* pBlock, SMeterObj* pMeterObj * block(newly allocated block), abort query. Otherwise, skip it and go on. */ if (pBlock->numOfPoints == 0) { - dWarn("QInfo:%p vid:%d sid:%d id:%s, cache block is empty. slot:%d first:%d, last:%d, numOfBlocks:%d," - "allocated but not write data yet.", GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, - pMeterObj->meterId, pQuery->slot, pQuery->firstSlot, pQuery->currentSlot, pQuery->numOfBlocks); - + dWarn( + "QInfo:%p vid:%d sid:%d id:%s, cache block is empty. slot:%d first:%d, last:%d, numOfBlocks:%d," + "allocated but not write data yet.", + GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->slot, pQuery->firstSlot, + pQuery->currentSlot, pQuery->numOfBlocks); + return false; } - + return true; } // todo all functions that call this function should check the returned data blocks status -SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv* pRuntimeEnv, int32_t slot) { - SQuery* pQuery = pRuntimeEnv->pQuery; - +SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntimeEnv, int32_t slot) { + SQuery *pQuery = pRuntimeEnv->pQuery; + SCacheInfo *pCacheInfo = (SCacheInfo *)pMeterObj->pCache; if (pCacheInfo == NULL || pCacheInfo->cacheBlocks == NULL || slot < 0 || slot >= pCacheInfo->maxBlocks) { return NULL; } getBasicCacheInfoSnapshot(pQuery, pCacheInfo, pMeterObj->vnode); - + SCacheBlock *pBlock = pCacheInfo->cacheBlocks[slot]; - if (pBlock == NULL) { // the cache info snapshot must be existed. + if (pBlock == NULL) { // the cache info snapshot must be existed. int32_t curNumOfBlocks = pCacheInfo->numOfBlocks; int32_t curSlot = pCacheInfo->currentSlot; - - dError("QInfo:%p NULL Block In Cache, snapshot (available blocks:%d, last block:%d), current (available blocks:%d, " - "last block:%d), accessed null block:%d, pBlockId:%d", GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, - pQuery->currentSlot, curNumOfBlocks, curSlot, slot, pQuery->blockId); - + + dError( + "QInfo:%p NULL Block In Cache, snapshot (available blocks:%d, last block:%d), current (available blocks:%d, " + "last block:%d), accessed null block:%d, pBlockId:%d", + GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, pQuery->currentSlot, curNumOfBlocks, curSlot, slot, + pQuery->blockId); + return NULL; } @@ -1129,20 +1138,21 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv* pRuntimeE if (!isCacheBlockValid(pQuery, pBlock, pMeterObj)) { return NULL; } - - //the accessed cache block has been loaded already, return directly + + // the accessed cache block has been loaded already, return directly if (vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD) { TSKEY skey = getTimestampInCacheBlock(pRuntimeEnv, pBlock, 0); TSKEY ekey = getTimestampInCacheBlock(pRuntimeEnv, pBlock, pBlock->numOfPoints - 1); - - dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, cache block has been loaded, no need to load again, ts:%d, " - "slot:%d, brange:%lld-%lld, rows:%d", - GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, 1, - pQuery->slot, skey, ekey, pBlock->numOfPoints); - + + dTrace( + "QInfo:%p vid:%d sid:%d id:%s, fileId:%d, cache block has been loaded, no need to load again, ts:%d, " + "slot:%d, brange:%lld-%lld, rows:%d", + GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, 1, pQuery->slot, + skey, ekey, pBlock->numOfPoints); + return &pRuntimeEnv->cacheBlock; } - + // keep the structure as well as the block data into local buffer memcpy(&pRuntimeEnv->cacheBlock, pBlock, sizeof(SCacheBlock)); @@ -1153,78 +1163,81 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv* pRuntimeE int32_t numOfPoints = pNewBlock->numOfPoints; if (pQuery->firstSlot == pQuery->commitSlot) { assert(pQuery->commitPoint >= 0 && pQuery->commitPoint <= pNewBlock->numOfPoints); - + offset = pQuery->commitPoint; numOfPoints = pNewBlock->numOfPoints - offset; - + if (offset != 0) { - dTrace("%p ignore the data in cache block that are commit already, numOfblock:%d slot:%d ignore points:%d. " - "first:%d last:%d", GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, pQuery->slot, pQuery->commitPoint, - pQuery->firstSlot, pQuery->currentSlot); + dTrace( + "%p ignore the data in cache block that are commit already, numOfblock:%d slot:%d ignore points:%d. " + "first:%d last:%d", + GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, pQuery->slot, pQuery->commitPoint, pQuery->firstSlot, + pQuery->currentSlot); } - + pNewBlock->numOfPoints = numOfPoints; - + // current block are all commit already, ignore it if (pNewBlock->numOfPoints == 0) { - dTrace("%p ignore current in cache block that are all commit already, numOfblock:%d slot:%d" - "first:%d last:%d", GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, pQuery->slot, - pQuery->firstSlot, pQuery->currentSlot); + dTrace( + "%p ignore current in cache block that are all commit already, numOfblock:%d slot:%d" + "first:%d last:%d", + GET_QINFO_ADDR(pQuery), pQuery->numOfBlocks, pQuery->slot, pQuery->firstSlot, pQuery->currentSlot); return NULL; } } - + // keep the data from in cache into the temporarily allocated buffer - for(int32_t i = 0; i < pQuery->numOfCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfCols; ++i) { SColumnInfoEx *pColumnInfoEx = &pQuery->colList[i]; - + int16_t columnIndex = pColumnInfoEx->colIdx; int16_t columnIndexInBuf = pColumnInfoEx->colIdxInBuf; - - SColumn* pCol = &pMeterObj->schema[columnIndex]; - + + SColumn *pCol = &pMeterObj->schema[columnIndex]; + int16_t bytes = pCol->bytes; int16_t type = pCol->type; - - char* dst = pRuntimeEnv->colDataBuffer[columnIndexInBuf]->data; - + + char *dst = pRuntimeEnv->colDataBuffer[columnIndexInBuf]->data; + if (pQuery->colList[i].colIdx != -1) { assert(pCol->colId == pQuery->colList[i].data.colId && bytes == pColumnInfoEx->data.bytes && - type == pColumnInfoEx->data.type); - + type == pColumnInfoEx->data.type); + memcpy(dst, pBlock->offset[columnIndex] + offset * bytes, numOfPoints * bytes); } else { setNullN(dst, type, bytes, numOfPoints); } } - assert(numOfPoints == pNewBlock->numOfPoints); + assert(numOfPoints == pNewBlock->numOfPoints); // if the primary timestamp are not loaded by default, always load it here into buffer - if(!PRIMARY_TSCOL_LOADED(pQuery)) { - memcpy(pRuntimeEnv->primaryColBuffer->data, pBlock->offset[0] + offset * TSDB_KEYSIZE, TSDB_KEYSIZE*numOfPoints); + if (!PRIMARY_TSCOL_LOADED(pQuery)) { + memcpy(pRuntimeEnv->primaryColBuffer->data, pBlock->offset[0] + offset * TSDB_KEYSIZE, TSDB_KEYSIZE * numOfPoints); } - + pQuery->fileId = -1; pQuery->slot = slot; - + if (!isCacheBlockValid(pQuery, pNewBlock, pMeterObj)) { return NULL; } - + /* * the accessed cache block still belongs to current meterObj, go on * update the load data block info */ vnodeSetDataBlockInfoLoaded(pRuntimeEnv, pMeterObj, -1, true); - + TSKEY skey = getTimestampInCacheBlock(pRuntimeEnv, pNewBlock, 0); TSKEY ekey = getTimestampInCacheBlock(pRuntimeEnv, pNewBlock, numOfPoints - 1); - + dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, load cache block, ts:%d, slot:%d, brange:%lld-%lld, rows:%d", - GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, 1, - pQuery->slot, skey, ekey, numOfPoints); - + GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, 1, pQuery->slot, + skey, ekey, numOfPoints); + return pNewBlock; } @@ -1234,8 +1247,8 @@ static SCompBlock *getDiskDataBlock(SQuery *pQuery, int32_t slot) { } static void *getGenericDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntimeEnv, int32_t slot) { - SQuery* pQuery = pRuntimeEnv->pQuery; - + SQuery *pQuery = pRuntimeEnv->pQuery; + if (IS_DISK_DATA_BLOCK(pQuery)) { return getDiskDataBlock(pQuery, slot); } else { @@ -1378,9 +1391,9 @@ static bool hasNullVal(SQuery *pQuery, int32_t col, SBlockInfo *pBlockInfo, SFie return ret; } -static char *doGetDataBlocks(SQuery* pQuery, SData** data, int32_t colIdx) { +static char *doGetDataBlocks(SQuery *pQuery, SData **data, int32_t colIdx) { assert(colIdx >= 0 && colIdx < pQuery->numOfCols); - char* pData = data[colIdx]->data; + char *pData = data[colIdx]->data; return pData; } @@ -1409,7 +1422,7 @@ static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sa sas->elemSize[i] = pColMsg->bytes; sas->data[i] = pData + pCtx->startOffset * sas->elemSize[i]; // start from the offset } - + sas->numOfCols = pQuery->numOfCols; sas->offset = 0; } else { // other type of query function @@ -1443,8 +1456,8 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t SField *pFields, SBlockInfo *pBlockInfo) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQuery * pQuery = pRuntimeEnv->pQuery; - - bool isDiskFileBlock = IS_FILE_BLOCK(pRuntimeEnv->blockStatus); + + bool isDiskFileBlock = IS_FILE_BLOCK(pRuntimeEnv->blockStatus); int64_t prevNumOfRes = getNumOfResult(pRuntimeEnv); SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutputCols, sizeof(SArithmeticSupport)); @@ -1456,7 +1469,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t bool hasNull = hasNullVal(pQuery, k, pBlockInfo, pFields, isDiskFileBlock); char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, forwardStep); - + SField *tpField = NULL; if (pFields != NULL) { @@ -1471,12 +1484,12 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t } } - TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->skey : pQuery->ekey; + TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pRuntimeEnv->intervalSKey : pRuntimeEnv->intervalEKey; - int64_t alignedTimestamp = - taosGetIntervalStartTimestamp(ts, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit, pQuery->precision); - setExecParams(pQuery, &pCtx[k], alignedTimestamp, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, - tpField, hasNull, pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag); + // int64_t alignedTimestamp = + // taosGetIntervalStartTimestamp(ts, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit, pQuery->precision); + setExecParams(pQuery, &pCtx[k], ts, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, tpField, hasNull, + pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag); } /* @@ -1529,7 +1542,7 @@ static bool needToLoadDataBlock(SQuery *pQuery, SField *pField, SQLFunctionCtx * if (!vnodeSupportPrefilter(pFilterInfo->info.data.type)) { continue; } - + // all points in current column are NULL, no need to check its boundary value if (pField[colIndex].numOfNullPoints == numOfTotalPoints) { continue; @@ -1564,35 +1577,201 @@ static bool needToLoadDataBlock(SQuery *pQuery, SField *pField, SQLFunctionCtx * return true; } -static int32_t setGroupResultForKey(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes) { - SOutputRes *pOutputRes = NULL; +static SOutputRes *getResWindow(SSlidingWindowResInfo *pWindowResInfo, char *pData, int16_t bytes, + SWindowStatus **pStatus) { + int32_t p = -1; + + int32_t *p1 = (int32_t *)taosGetDataFromHash(pWindowResInfo->hashList, pData, bytes); + if (p1 != NULL) { + p = *p1; + + pWindowResInfo->curIndex = p; + if (pStatus != NULL) { + *pStatus = &pWindowResInfo->pStatus[p]; + } + } else { // more than the capacity, reallocate the resources + if (pWindowResInfo->size >= pWindowResInfo->capacity) { + int64_t newCap = pWindowResInfo->capacity * 2; + + char *t = realloc(pWindowResInfo->pStatus, newCap * sizeof(SWindowStatus)); + if (t != NULL) { + pWindowResInfo->pStatus = (SWindowStatus *)t; + memset(&pWindowResInfo->pStatus[pWindowResInfo->capacity], 0, sizeof(SWindowStatus) * pWindowResInfo->capacity); + } else { + // todo + } + + pWindowResInfo->capacity = newCap; + } + + // add a new result set for a new group + if (pStatus != NULL) { + *pStatus = &pWindowResInfo->pStatus[pWindowResInfo->size]; + } + + p = pWindowResInfo->size; + pWindowResInfo->curIndex = pWindowResInfo->size; + + pWindowResInfo->size += 1; + taosAddToHashTable(pWindowResInfo->hashList, pData, bytes, (char *)&pWindowResInfo->curIndex, sizeof(int32_t)); + } + + return &pWindowResInfo->pResult[p]; +} + +static int32_t initResWindowInfo(SSlidingWindowResInfo *pWindowResInfo, int32_t threshold, int16_t type, + SOutputRes *pRes) { + pWindowResInfo->capacity = threshold; + pWindowResInfo->threshold = threshold; + + pWindowResInfo->type = type; - // ignore the null value - if (isNull(pData, type)) { + _hash_fn_t fn = taosGetDefaultHashFunction(type); + pWindowResInfo->hashList = taosInitHashTable(threshold, fn, false); + + pWindowResInfo->curIndex = -1; + pWindowResInfo->size = 0; + pWindowResInfo->pResult = pRes; + pWindowResInfo->pStatus = calloc(threshold, sizeof(SWindowStatus)); + + if (pWindowResInfo->pStatus == NULL || pWindowResInfo->hashList == NULL) { return -1; } - SOutputRes **p1 = (SOutputRes **)taosGetDataFromHash(pRuntimeEnv->hashList, pData, bytes); - if (p1 != NULL) { - pOutputRes = *p1; - } else { // more than the threshold number, discard data that are not belong to current groups - if (pRuntimeEnv->usedIndex >= 10000) { - return -1; + return TSDB_CODE_SUCCESS; +} + +static void destroyResWindowInfo(SSlidingWindowResInfo *pWindowResInfo) { + if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) { + assert(pWindowResInfo->hashList == NULL && pWindowResInfo->pResult == NULL); + return; + } + + taosCleanUpHashTable(pWindowResInfo->hashList); + tfree(pWindowResInfo->pStatus); +} + +void resetResWindowInfo(SSlidingWindowResInfo *pWindowResInfo, int32_t numOfCols) { + if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) { + return; + } + + for (int32_t i = 0; i < pWindowResInfo->size; ++i) { + SOutputRes *pOneRes = &pWindowResInfo->pResult[i]; + clearGroupResultBuf(pOneRes, numOfCols); + } + + memset(pWindowResInfo->pStatus, 0, sizeof(SWindowStatus) * pWindowResInfo->capacity); + + pWindowResInfo->curIndex = -1; + taosCleanUpHashTable(pWindowResInfo->hashList); + pWindowResInfo->size = 0; + + _hash_fn_t fn = taosGetDefaultHashFunction(pWindowResInfo->type); + pWindowResInfo->hashList = taosInitHashTable(pWindowResInfo->capacity, fn, false); + + pWindowResInfo->startTime = 0; + pWindowResInfo->prevSKey = 0; +} + +void clearCompletedResWindows(SSlidingWindowResInfo *pWindowResInfo, int32_t numOfCols) { + if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0) { + return; + } + + int32_t i = 0; + for (i = 0; i < pWindowResInfo->size; ++i) { + SWindowStatus *pStatus = &pWindowResInfo->pStatus[i]; + if (pStatus->closed) { // remove the window slot from hash table + taosDeleteFromHashTable(pWindowResInfo->hashList, (const char *)&pStatus->window.skey, TSDB_KEYSIZE); + } else { + break; } + } - // add a new result set for a new group - pOutputRes = &pRuntimeEnv->pResult[pRuntimeEnv->usedIndex++]; - taosAddToHashTable(pRuntimeEnv->hashList, pData, bytes, (char *)&pOutputRes, POINTER_BYTES); + if (i == 0) { + return; + } + + int32_t remain = pWindowResInfo->size - i; + //clear remain list + memmove(pWindowResInfo->pStatus, &pWindowResInfo->pStatus[i], remain * sizeof(SWindowStatus)); + memset(&pWindowResInfo->pStatus[remain], 0, (pWindowResInfo->capacity - remain) * sizeof(SWindowStatus)); + + for(int32_t k = 0; k < remain; ++k) { + copyGroupResultBuf(&pWindowResInfo->pResult[k], &pWindowResInfo->pResult[i + k], numOfCols); + } + + for(int32_t k = remain; k < pWindowResInfo->size; ++k) { + SOutputRes *pOneRes = &pWindowResInfo->pResult[k]; + clearGroupResultBuf(pOneRes, numOfCols); + } + + pWindowResInfo->size = remain; + + for(int32_t k = 0; k < pWindowResInfo->size; ++k) { + SWindowStatus* pStatus = &pWindowResInfo->pStatus[k]; + int32_t *p = (int32_t*) taosGetDataFromHash(pWindowResInfo->hashList, (const char*)&pStatus->window.skey, TSDB_KEYSIZE); + int32_t v = *p; + v = (v - i); + + taosDeleteFromHashTable(pWindowResInfo->hashList, (const char *)&pStatus->window.skey, TSDB_KEYSIZE); + + taosAddToHashTable(pWindowResInfo->hashList, (const char*)&pStatus->window.skey, TSDB_KEYSIZE, + (char *)&v, sizeof(int32_t)); + } + + pWindowResInfo->curIndex = -1; +} + +int32_t numOfResFromResWindowInfo(SSlidingWindowResInfo *pWindowResInfo) { + for (int32_t i = 0; i < pWindowResInfo->size; ++i) { + SWindowStatus *pStatus = &pWindowResInfo->pStatus[i]; + if (pStatus->closed == false) { + return i; + } + } +} + +static SWindowStatus* getCurrentSWindow(SSlidingWindowResInfo *pWindowResInfo) { + return &pWindowResInfo->pStatus[pWindowResInfo->curIndex]; +} + +static int32_t setGroupResultFromKey(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes) { + if (isNull(pData, type)) { // ignore the null value + return -1; + } + + SOutputRes *pOutputRes = getResWindow(&pRuntimeEnv->swindowResInfo, pData, bytes, NULL); + if (pOutputRes == NULL) { + return -1; + } + + setGroupOutputBuffer(pRuntimeEnv, pOutputRes); + initCtxOutputBuf(pRuntimeEnv); + + return TSDB_CODE_SUCCESS; +} + +static int32_t setSlidingWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, int64_t skey, int64_t ekey) { + int64_t st = skey; + + SWindowStatus *pStatus = NULL; + SOutputRes * pOutputRes = getResWindow(&pRuntimeEnv->swindowResInfo, (char *)&st, TSDB_KEYSIZE, &pStatus); + if (pOutputRes == NULL) { + return -1; } + pStatus->window = (STimeWindow){.skey = skey, .ekey = ekey}; + setGroupOutputBuffer(pRuntimeEnv, pOutputRes); initCtxOutputBuf(pRuntimeEnv); return TSDB_CODE_SUCCESS; } -static char *getGroupbyColumnData(SQuery* pQuery, SData** data, int16_t* type, int16_t* bytes) { - char * groupbyColumnData = NULL; +static char *getGroupbyColumnData(SQuery *pQuery, SData **data, int16_t *type, int16_t *bytes) { + char *groupbyColumnData = NULL; SSqlGroupbyExpr *pGroupbyExpr = pQuery->pGroupbyExpr; @@ -1600,22 +1779,22 @@ static char *getGroupbyColumnData(SQuery* pQuery, SData** data, int16_t* type, i if (pGroupbyExpr->columnInfo[k].flag == TSDB_COL_TAG) { continue; } - + int16_t colIndex = -1; int32_t colId = pGroupbyExpr->columnInfo[k].colId; - - for(int32_t i = 0; i < pQuery->numOfCols; ++i) { + + for (int32_t i = 0; i < pQuery->numOfCols; ++i) { if (pQuery->colList[i].data.colId == colId) { colIndex = i; break; } } - + assert(colIndex >= 0 && colIndex < pQuery->numOfCols); - + *type = pQuery->colList[colIndex].data.type; *bytes = pQuery->colList[colIndex].data.bytes; - + groupbyColumnData = doGetDataBlocks(pQuery, data, pQuery->colList[colIndex].colIdxInBuf); break; } @@ -1637,7 +1816,8 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) { TSKEY key = *(TSKEY *)(pCtx[0].aInputElemBuf + TSDB_KEYSIZE * offset); #if defined(_DEBUG_VIEW) - printf("elem in comp ts file:%" PRId64 ", key:%" PRId64 ", tag:%d, id:%s, query order:%d, ts order:%d, traverse:%d, index:%d\n", + printf("elem in comp ts file:%" PRId64 ", key:%" PRId64 + ", tag:%d, id:%s, query order:%d, ts order:%d, traverse:%d, index:%d\n", elem.ts, key, elem.tag, pRuntimeEnv->pMeterObj->meterId, pQuery->order.order, pRuntimeEnv->pTSBuf->tsOrder, pRuntimeEnv->pTSBuf->cur.order, pRuntimeEnv->pTSBuf->cur.tsIndex); #endif @@ -1666,6 +1846,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx return false; } + // in the supplementary scan, only the following functions need to be executed if (!IS_MASTER_SCAN(pRuntimeEnv) && !(functionId == TSDB_FUNC_LAST_DST || functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_FIRST || functionId == TSDB_FUNC_LAST || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS)) { @@ -1679,8 +1860,8 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * SField *pFields, SBlockInfo *pBlockInfo) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; SQuery * pQuery = pRuntimeEnv->pQuery; - - bool isDiskFileBlock = IS_FILE_BLOCK(pRuntimeEnv->blockStatus); + + bool isDiskFileBlock = IS_FILE_BLOCK(pRuntimeEnv->blockStatus); SData **data = pRuntimeEnv->colDataBuffer; int64_t prevNumOfRes = 0; @@ -1705,13 +1886,10 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * bool hasNull = hasNullVal(pQuery, k, pBlockInfo, pFields, isDiskFileBlock); char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, *forwardStep); - - TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->skey : pQuery->ekey; - int64_t alignedTimestamp = - taosGetIntervalStartTimestamp(ts, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit, pQuery->precision); - setExecParams(pQuery, &pCtx[k], alignedTimestamp, dataBlock, (char *)primaryKeyCol, (*forwardStep), functionId, - pFields, hasNull, pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag); + TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pRuntimeEnv->intervalSKey : pRuntimeEnv->intervalEKey; + setExecParams(pQuery, &pCtx[k], ts, dataBlock, (char *)primaryKeyCol, (*forwardStep), functionId, pFields, hasNull, + pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag); } // set the input column data @@ -1735,7 +1913,10 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * pQuery->order.order, pRuntimeEnv->pTSBuf->cur.order); } - for (int32_t j = 0; j < (*forwardStep); ++j) { + int32_t j = 0; + int64_t lastKey = 0; + + for (j = 0; j < (*forwardStep); ++j) { int32_t offset = GET_COL_DATA_POS(pQuery, j, step); if (pRuntimeEnv->pTSBuf != NULL) { @@ -1754,23 +1935,115 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * continue; } - // decide which group this rows belongs to according to current state value - if (groupbyStateValue) { - char *stateVal = groupbyColumnData + bytes * offset; + // sliding window query + if (pQuery->slidingTime > 0 && pQuery->nAggTimeInterval > 0) { + // decide the time window according to the primary timestamp + int64_t ts = primaryKeyCol[offset]; + int64_t wskey = 0; + int64_t wekey = 0; + + if (pRuntimeEnv->swindowResInfo.curIndex == -1) { + wskey = pRuntimeEnv->swindowResInfo.prevSKey; + wekey = wskey + pQuery->nAggTimeInterval - 1; + } else { + SWindowStatus *pStatus = &pRuntimeEnv->swindowResInfo.pStatus[pRuntimeEnv->swindowResInfo.curIndex]; + if (pStatus->window.skey <= ts && pStatus->window.ekey >= ts) { + wskey = pStatus->window.skey; + wekey = pStatus->window.ekey; + } else { + int64_t st = pStatus->window.skey; + + while (st > ts) { + st -= pQuery->slidingTime; + } + + while ((st + pQuery->nAggTimeInterval - 1) < ts) { + st += pQuery->slidingTime; + } + + wskey = st; + wekey = wskey + pQuery->nAggTimeInterval - 1; + } + } - int32_t ret = setGroupResultForKey(pRuntimeEnv, stateVal, type, bytes); + int32_t ret = setSlidingWindowFromKey(pRuntimeEnv, wskey, wekey); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code continue; } - } - // all startOffset are identical - offset -= pCtx[0].startOffset; + // all startOffset are identical + offset -= pCtx[0].startOffset; + + for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { + int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; + pCtx[k].nStartQueryTimestamp = wskey; + + SWindowStatus* pStatus = getCurrentSWindow(&pRuntimeEnv->swindowResInfo); + if (!IS_MASTER_SCAN(pRuntimeEnv) && !pStatus->closed) { + qTrace("not completed in supplementary scan, ignore\n"); + continue; + } + + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { + aAggs[functionId].xFunctionF(&pCtx[k], offset); + } + } + + lastKey = ts; + int32_t index = pRuntimeEnv->swindowResInfo.curIndex; + while (1) { + getNextLogicalQueryRange(pRuntimeEnv, &wskey, &wekey); + if (pRuntimeEnv->swindowResInfo.startTime > wskey || (wskey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || + (wskey > pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery))) { + pRuntimeEnv->swindowResInfo.curIndex = index; + break; + } + + if (ts >= wskey && ts <= wekey) { + // null data, failed to allocate more memory buffer + if (setSlidingWindowFromKey(pRuntimeEnv, wskey, wekey) != TSDB_CODE_SUCCESS) { + pRuntimeEnv->swindowResInfo.curIndex = index; + break; + } + + for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { + int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; + pCtx[k].nStartQueryTimestamp = wskey; + + SWindowStatus* pStatus = getCurrentSWindow(&pRuntimeEnv->swindowResInfo); + if (!IS_MASTER_SCAN(pRuntimeEnv) && !pStatus->closed) { + qTrace("not completed in supplementary scan, ignore"); + continue; + } + + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { + aAggs[functionId].xFunctionF(&pCtx[k], offset); + } + } + } else { + pRuntimeEnv->swindowResInfo.curIndex = index; + break; + } + } + } else { // other queries + // decide which group this rows belongs to according to current state value + if (groupbyStateValue) { + char *stateVal = groupbyColumnData + bytes * offset; + + int32_t ret = setGroupResultFromKey(pRuntimeEnv, stateVal, type, bytes); + if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + continue; + } + } - for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { - int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; - if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { - aAggs[functionId].xFunctionF(&pCtx[k], offset); + // all startOffset are identical + offset -= pCtx[0].startOffset; + + for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { + int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; + if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { + aAggs[functionId].xFunctionF(&pCtx[k], offset); + } } } @@ -1795,6 +2068,43 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * free(sasArray); + if (pQuery->slidingTime > 0 && pQuery->nAggTimeInterval > 0) { + SSlidingWindowResInfo *pWindowResInfo = &pRuntimeEnv->swindowResInfo; + + // query completed + if (lastKey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery) || + (lastKey <= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { + for (int32_t i = 0; i < pWindowResInfo->size; ++i) { + SWindowStatus *pStatus = &pWindowResInfo->pStatus[i]; + pStatus->closed = true; + } + + pWindowResInfo->curIndex = pWindowResInfo->size - 1; + setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL); + } else { + int32_t i = 0; + int64_t skey = 0; + + for (i = 0; i < pWindowResInfo->size; ++i) { + SWindowStatus *pStatus = &pWindowResInfo->pStatus[i]; + if ((pStatus->window.ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) || + (pStatus->window.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) { + pStatus->closed = true; + } else { + skey = pStatus->window.skey; + break; + } + } + + pWindowResInfo->prevSKey = skey; + + // the number of completed slots are larger than the threshold, dump to client immediately. + if (numOfResFromResWindowInfo(pWindowResInfo) > pWindowResInfo->threshold) { + setQueryStatus(pQuery, QUERY_RESBUF_FULL); + } + } + } + /* * No need to calculate the number of output results for groupby normal columns * because the results of group by normal column is put into intermediate buffer. @@ -1923,9 +2233,9 @@ static int32_t applyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo * pQuery->lastKey = pPrimaryColumn[pQuery->pos + (newForwardStep - 1) * step] + step; } - if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { - *numOfRes = - rowwiseApplyAllFunctions(pRuntimeEnv, &newForwardStep, pPrimaryColumn, pFields, pBlockInfo); + if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr) || + (pQuery->slidingTime != -1 && pQuery->nAggTimeInterval > 0)) { + *numOfRes = rowwiseApplyAllFunctions(pRuntimeEnv, &newForwardStep, pPrimaryColumn, pFields, pBlockInfo); } else { *numOfRes = blockwiseApplyAllFunctions(pRuntimeEnv, newForwardStep, pPrimaryColumn, pFields, pBlockInfo); } @@ -2192,7 +2502,7 @@ static int32_t setupQueryRuntimeEnv(SMeterObj *pMeterObj, SQuery *pQuery, SQuery if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { tVariantCreateFromBinary(&pCtx->param[j], pSqlFuncMsg->arg->argValue.pz, bytes, type); } else { - tVariantCreateFromBinary(&pCtx->param[j], (char*) &pSqlFuncMsg->arg[j].argValue.i64, bytes, type); + tVariantCreateFromBinary(&pCtx->param[j], (char *)&pSqlFuncMsg->arg[j].argValue.i64, bytes, type); } } @@ -2249,8 +2559,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { } tfree(pRuntimeEnv->secondaryUnzipBuffer); - - taosCleanUpHashTable(pRuntimeEnv->hashList); + destroyResWindowInfo(&pRuntimeEnv->swindowResInfo); if (pRuntimeEnv->pCtx != NULL) { for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) { @@ -2281,9 +2590,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { pRuntimeEnv->vnodeFileInfo.numOfFiles = 0; free(pRuntimeEnv->vnodeFileInfo.pFileInfo); } - + taosDestoryInterpoInfo(&pRuntimeEnv->interpoInfo); - + if (pRuntimeEnv->pInterpoBuf != NULL) { for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) { tfree(pRuntimeEnv->pInterpoBuf[i]); @@ -2393,7 +2702,7 @@ bool isFirstLastRowQuery(SQuery *pQuery) { bool notHasQueryTimeRange(SQuery *pQuery) { return (pQuery->skey == 0 && pQuery->ekey == INT64_MAX && QUERY_IS_ASC_QUERY(pQuery)) || - (pQuery->skey == INT64_MAX && pQuery->ekey == 0 && (!QUERY_IS_ASC_QUERY(pQuery))); + (pQuery->skey == INT64_MAX && pQuery->ekey == 0 && (!QUERY_IS_ASC_QUERY(pQuery))); } bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].pBase.functionId == TSDB_FUNC_TS_COMP; } @@ -2470,13 +2779,13 @@ static int32_t getFirstCacheSlot(int32_t numOfBlocks, int32_t lastSlot, SCacheIn return (lastSlot - numOfBlocks + 1 + pCacheInfo->maxBlocks) % pCacheInfo->maxBlocks; } -static bool cacheBoundaryCheck(SQueryRuntimeEnv* pRuntimeEnv, SMeterObj *pMeterObj) { +static bool cacheBoundaryCheck(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj) { /* * here we get the first slot from the meter cache, not from the cache snapshot from pQuery, since the * snapshot value in pQuery may have been expired now. */ - SQuery* pQuery = pRuntimeEnv->pQuery; - + SQuery *pQuery = pRuntimeEnv->pQuery; + SCacheInfo * pCacheInfo = (SCacheInfo *)pMeterObj->pCache; SCacheBlock *pBlock = NULL; @@ -2550,7 +2859,7 @@ void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t v pQuery->firstSlot = getFirstCacheSlot(numOfBlocks, lastSlot, pCacheInfo); pQuery->commitSlot = commitSlot; pQuery->commitPoint = commitPoint; - + /* * Note: the block id is continuous increasing, never becomes smaller. * @@ -2599,14 +2908,14 @@ int64_t getQueryStartPositionInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t *slo __block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm]; pQuery->slot = *slot; - + // cache block has been flushed to disk, no required data block in cache. - SCacheBlock* pBlock = getCacheDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); + SCacheBlock *pBlock = getCacheDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); if (pBlock == NULL) { - pQuery->skey = rawskey; // restore the skey + pQuery->skey = rawskey; // restore the skey return -1; } - + (*pos) = searchFn(pRuntimeEnv->primaryColBuffer->data, pBlock->numOfPoints, pQuery->skey, pQuery->order.order); // restore skey before return @@ -2685,57 +2994,73 @@ void vnodeCheckIfDataExists(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, setQueryStatus(pQuery, QUERY_NOT_COMPLETED); } -static void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t qualifiedKey, int64_t keyFirst, int64_t keyLast, - int64_t *skey, int64_t *ekey) { - assert(qualifiedKey >= keyFirst && qualifiedKey <= keyLast); +static void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t pKey, int64_t keyFirst, int64_t keyLast, + int64_t *actualSkey, int64_t *actualEkey, int64_t *skey, int64_t *ekey) { + assert(pKey >= keyFirst && pKey <= keyLast); + *skey = taosGetIntervalStartTimestamp(pKey, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit, pQuery->precision); if (keyFirst > (INT64_MAX - pQuery->nAggTimeInterval)) { /* - * if the skey > INT64_MAX - pQuery->nAggTimeInterval, the query duration between - * skey and ekey must be less than one interval.Therefore, no need to adjust the query ranges. + * if the actualSkey > INT64_MAX - pQuery->nAggTimeInterval, the query duration between + * actualSkey and actualEkey must be less than one interval.Therefore, no need to adjust the query ranges. */ assert(keyLast - keyFirst < pQuery->nAggTimeInterval); - *skey = keyFirst; - *ekey = keyLast; + *actualSkey = keyFirst; + *actualEkey = keyLast; + + *ekey = INT64_MAX; return; } - *skey = taosGetIntervalStartTimestamp(qualifiedKey, pQuery->nAggTimeInterval, pQuery->intervalTimeUnit, - pQuery->precision); - int64_t endKey = *skey + pQuery->nAggTimeInterval - 1; + *ekey = *skey + pQuery->nAggTimeInterval - 1; if (*skey < keyFirst) { - *skey = keyFirst; + *actualSkey = keyFirst; + } else { + *actualSkey = *skey; } - if (endKey < keyLast) { - *ekey = endKey; + if (*ekey < keyLast) { + *actualEkey = *ekey; } else { - *ekey = keyLast; + *actualEkey = keyLast; } } -static void getAlignedIntervalQueryRange(SQuery *pQuery, TSKEY key, TSKEY skey, TSKEY ekey) { - if (pQuery->nAggTimeInterval == 0) { +static void getAlignedIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key, TSKEY skey, TSKEY ekey) { + SQuery *pQuery = pRuntimeEnv->pQuery; + if (pQuery->nAggTimeInterval == 0 || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) { return; } - - TSKEY skey1, ekey1; - TSKEY skey2 = (skey < ekey) ? skey : ekey; - TSKEY ekey2 = (skey < ekey) ? ekey : skey; + TSKEY skey2 = MIN(skey, ekey); + TSKEY ekey2 = MAX(skey, ekey); + + // the actual first query range in skey1 and ekey1 + TSKEY skey1, ekey1; - doGetAlignedIntervalQueryRangeImpl(pQuery, key, skey2, ekey2, &skey1, &ekey1); + TSKEY windowSKey = 0, windowEKey = 0; + doGetAlignedIntervalQueryRangeImpl(pQuery, key, skey2, ekey2, &skey1, &ekey1, &windowSKey, &windowEKey); if (QUERY_IS_ASC_QUERY(pQuery)) { pQuery->skey = skey1; pQuery->ekey = ekey1; - assert(pQuery->skey <= pQuery->ekey); + + pRuntimeEnv->intervalSKey = windowSKey; + pRuntimeEnv->intervalEKey = windowEKey; + + assert(pQuery->skey <= pQuery->ekey && + pRuntimeEnv->intervalSKey + (pQuery->nAggTimeInterval - 1) == pRuntimeEnv->intervalEKey); } else { pQuery->skey = ekey1; pQuery->ekey = skey1; - assert(pQuery->skey >= pQuery->ekey); + + pRuntimeEnv->intervalSKey = windowEKey; + pRuntimeEnv->intervalEKey = windowSKey; + + assert(pQuery->skey >= pQuery->ekey && + pRuntimeEnv->intervalSKey - (pQuery->nAggTimeInterval - 1) == pRuntimeEnv->intervalEKey); } pQuery->lastKey = pQuery->skey; @@ -2819,7 +3144,8 @@ static bool getNeighborPoints(SMeterQuerySupportObj *pSupporter, SMeterObj *pMet * reset the status and load the data block that contains the qualified point */ if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) { - dTrace("QInfo:%p no previous data block, start fileId:%d, slot:%d, pos:%d, qrange:%" PRId64 "-%" PRId64 ", out of range", + dTrace("QInfo:%p no previous data block, start fileId:%d, slot:%d, pos:%d, qrange:%" PRId64 "-%" PRId64 + ", out of range", GET_QINFO_ADDR(pQuery), pRuntimeEnv->startPos.fileId, pRuntimeEnv->startPos.slot, pRuntimeEnv->startPos.pos, pQuery->skey, pQuery->ekey); @@ -2837,7 +3163,7 @@ static bool getNeighborPoints(SMeterQuerySupportObj *pSupporter, SMeterObj *pMet // moveToNextBlock make sure there is a available cache block, if exists assert(vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD); pBlock = &pRuntimeEnv->cacheBlock; - + pQuery->pos = pBlock->numOfPoints - 1; getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, pQuery->pos); @@ -2864,7 +3190,7 @@ static bool doGetQueryPos(TSKEY key, SMeterQuerySupportObj *pSupporter, SPointIn if (isPointInterpoQuery(pQuery)) { /* no qualified data in this query range */ return getNeighborPoints(pSupporter, pMeterObj, pPointInterpSupporter); } else { - getAlignedIntervalQueryRange(pQuery, key, pQuery->skey, pQuery->ekey); + getAlignedIntervalQueryRange(pRuntimeEnv, key, pQuery->skey, pQuery->ekey); return true; } } else { // key > pQuery->ekey, abort for normal query, continue for interp query @@ -2877,10 +3203,10 @@ static bool doGetQueryPos(TSKEY key, SMeterQuerySupportObj *pSupporter, SPointIn } static bool doSetDataInfo(SMeterQuerySupportObj *pSupporter, SPointInterpoSupporter *pPointInterpSupporter, - SMeterObj *pMeterObj,TSKEY nextKey) { + SMeterObj *pMeterObj, TSKEY nextKey) { SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; - + if (isFirstLastRowQuery(pQuery)) { /* * if the pQuery->skey != pQuery->ekey for last_row query, @@ -2888,31 +3214,31 @@ static bool doSetDataInfo(SMeterQuerySupportObj *pSupporter, SPointInterpoSuppor */ if (pQuery->skey != pQuery->ekey) { assert(pQuery->skey >= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery) && nextKey >= pQuery->ekey && - nextKey <= pQuery->skey); - + nextKey <= pQuery->skey); + pQuery->skey = nextKey; pQuery->ekey = nextKey; } - + return getNeighborPoints(pSupporter, pMeterObj, pPointInterpSupporter); } else { - getAlignedIntervalQueryRange(pQuery, nextKey, pQuery->skey, pQuery->ekey); + getAlignedIntervalQueryRange(pRuntimeEnv, nextKey, pQuery->skey, pQuery->ekey); return true; } } -//TODO refactor code, the best way to implement the last_row is utilizing the iterator +// TODO refactor code, the best way to implement the last_row is utilizing the iterator bool normalizeUnBoundLastRowQuery(SMeterQuerySupportObj *pSupporter, SPointInterpoSupporter *pPointInterpSupporter) { - SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv; - - SQuery * pQuery = pRuntimeEnv->pQuery; - SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj; - + SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; + + SQuery * pQuery = pRuntimeEnv->pQuery; + SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj; + assert(!QUERY_IS_ASC_QUERY(pQuery) && notHasQueryTimeRange(pQuery)); __block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm]; - + TSKEY lastKey = -1; - + pQuery->fileId = -1; vnodeFreeFieldsEx(pRuntimeEnv); @@ -2923,44 +3249,44 @@ bool normalizeUnBoundLastRowQuery(SMeterQuerySupportObj *pSupporter, SPointInter if (pCacheInfo != NULL && pCacheInfo->cacheBlocks != NULL && pQuery->numOfBlocks > 0) { pQuery->fileId = -1; TSKEY key = pMeterObj->lastKey; - + pQuery->skey = key; pQuery->ekey = key; pQuery->lastKey = pQuery->skey; - + /* * cache block may have been flushed to disk, and no data in cache anymore. * So, copy cache block to local buffer is required. */ lastKey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false); - if (lastKey < 0) { // data has been flushed to disk, try again search in file + if (lastKey < 0) { // data has been flushed to disk, try again search in file lastKey = getQueryPositionForCacheInvalid(pRuntimeEnv, searchFn); - - if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK|QUERY_COMPLETED)) { + + if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) { return false; } } - } else { // no data in cache, try file + } else { // no data in cache, try file TSKEY key = pMeterObj->lastKeyOnFile; - + pQuery->skey = key; pQuery->ekey = key; pQuery->lastKey = pQuery->skey; - + bool ret = getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_LESS_EQUAL, searchFn); if (!ret) { // no data in file, return false; return false; } - + lastKey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos); } - + assert(lastKey <= pQuery->skey); - + pQuery->skey = lastKey; pQuery->ekey = lastKey; pQuery->lastKey = pQuery->skey; - + return getNeighborPoints(pSupporter, pMeterObj, pPointInterpSupporter); } @@ -2970,7 +3296,7 @@ bool normalizeUnBoundLastRowQuery(SMeterQuerySupportObj *pSupporter, SPointInter * 10ms, which is guaranteed by parser at client-side */ bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, SMeterQuerySupportObj *pSupporter, - SPointInterpoSupporter *pPointInterpSupporter) { + SPointInterpoSupporter *pPointInterpSupporter, int64_t *key) { SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj; @@ -2979,10 +3305,14 @@ bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, SMeterQuerySup if (QUERY_IS_ASC_QUERY(pQuery)) { // todo: the action return as the getQueryStartPositionInCache function if (dataInDisk && getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_GREATER_EQUAL, searchFn)) { - TSKEY key = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos); - assert(key >= pQuery->skey); - - return doGetQueryPos(key, pSupporter, pPointInterpSupporter); + TSKEY nextKey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos); + assert(nextKey >= pQuery->skey); + + if (key != NULL) { + *key = nextKey; + } + + return doGetQueryPos(nextKey, pSupporter, pPointInterpSupporter); } // set no data in file @@ -2995,14 +3325,23 @@ bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, SMeterQuerySup } TSKEY nextKey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false); + + if (key != NULL) { + *key = nextKey; + } + return doGetQueryPos(nextKey, pSupporter, pPointInterpSupporter); - - } else { // descending order + + } else { // descending order if (dataInCache) { // todo handle error TSKEY nextKey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false); assert(nextKey == -1 || nextKey <= pQuery->skey); - if (nextKey != -1) { // find qualified data in cache + if (key != NULL) { + *key = nextKey; + } + + if (nextKey != -1) { // find qualified data in cache if (nextKey >= pQuery->ekey) { return doSetDataInfo(pSupporter, pPointInterpSupporter, pMeterObj, nextKey); } else { @@ -3020,13 +3359,17 @@ bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, SMeterQuerySup } if (dataInDisk && getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_LESS_EQUAL, searchFn)) { - TSKEY key = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos); - assert(key <= pQuery->skey); + TSKEY nextKey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos); + assert(nextKey <= pQuery->skey); + + if (key != NULL) { + *key = nextKey; + } // key in query range. If not, no qualified in disk file - if (key >= pQuery->ekey) { - return doSetDataInfo(pSupporter, pPointInterpSupporter, pMeterObj, key); - } else { //In case of all queries, the value of false will be returned if key < pQuery->ekey + if (nextKey >= pQuery->ekey) { + return doSetDataInfo(pSupporter, pPointInterpSupporter, pMeterObj, nextKey); + } else { // In case of all queries, the value of false will be returned if key < pQuery->ekey return false; } } @@ -3313,7 +3656,8 @@ static bool onlyLastQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSDB static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) { // in case of point-interpolation query, use asc order scan char msg[] = - "QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%" PRId64 "-%" PRId64 ", " + "QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%" PRId64 "-%" PRId64 + ", " "new qrange:%" PRId64 "-%" PRId64; // descending order query for last_row query @@ -3322,13 +3666,13 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) { pQuery->order.order, TSQL_SO_DESC); pQuery->order.order = TSQL_SO_DESC; - + int64_t skey = MIN(pQuery->skey, pQuery->ekey); int64_t ekey = MAX(pQuery->skey, pQuery->ekey); - + pQuery->skey = ekey; pQuery->ekey = skey; - + return; } @@ -3448,7 +3792,7 @@ void forwardQueryStartPosition(SQueryRuntimeEnv *pRuntimeEnv) { } else { pQuery->limit.offset -= maxReads; // update the lastkey, since the following skip operation may traverse to another media. update the lastkey first. - pQuery->lastKey = (QUERY_IS_ASC_QUERY(pQuery))? blockInfo.keyLast+1:blockInfo.keyFirst-1; + pQuery->lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.keyLast + 1 : blockInfo.keyFirst - 1; doSkipDataBlock(pRuntimeEnv); } } @@ -3501,7 +3845,7 @@ static bool forwardQueryStartPosIfNeeded(SQInfo *pQInfo, SMeterQuerySupportObj * pQuery->lastKey = pQuery->skey; // todo opt performance - if (normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, NULL) == false) { + if (normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, NULL, NULL) == false) { sem_post(&pQInfo->dataReady); // hack for next read for empty return pQInfo->over = 1; return false; @@ -3746,7 +4090,7 @@ static void allocMemForInterpo(SMeterQuerySupportObj *pSupporter, SQuery *pQuery static int32_t allocateOutputBufForGroup(SMeterQuerySupportObj *pSupporter, SQuery *pQuery, bool isMetricQuery) { int32_t slot = 0; - if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { + if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) { slot = 10000; } else { slot = pSupporter->pSidSet->numOfSubSet; @@ -3760,27 +4104,27 @@ static int32_t allocateOutputBufForGroup(SMeterQuerySupportObj *pSupporter, SQue for (int32_t k = 0; k < slot; ++k) { SOutputRes *pOneRes = &pSupporter->pResult[k]; pOneRes->nAlloc = 1; - + /* * for single table top/bottom query, the output for group by normal column, the output rows is * equals to the maximum rows, instead of 1. */ if (!isMetricQuery && isTopBottomQuery(pQuery)) { assert(pQuery->numOfOutputCols > 1); - + SSqlFunctionExpr *pExpr = &pQuery->pSelectExpr[1]; pOneRes->nAlloc = pExpr->pBase.arg[0].argValue.i64; } - + createGroupResultBuf(pQuery, pOneRes, isMetricQuery); } return TSDB_CODE_SUCCESS; } -static int32_t allocateRuntimeEnvBuf(SQueryRuntimeEnv* pRuntimeEnv, SMeterObj* pMeterObj) { - SQuery* pQuery = pRuntimeEnv->pQuery; - +static int32_t allocateRuntimeEnvBuf(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj) { + SQuery *pQuery = pRuntimeEnv->pQuery; + // To make sure the start position of each buffer is aligned to 4bytes in 32-bit ARM system. for (int32_t i = 0; i < pQuery->numOfCols; ++i) { int32_t bytes = pQuery->colList[i].data.bytes; @@ -3789,7 +4133,7 @@ static int32_t allocateRuntimeEnvBuf(SQueryRuntimeEnv* pRuntimeEnv, SMeterObj* p goto _error_clean; } } - + // record the maximum column width among columns of this meter/metric int32_t maxColWidth = pQuery->colList[0].data.bytes; for (int32_t i = 1; i < pQuery->numOfCols; ++i) { @@ -3798,39 +4142,39 @@ static int32_t allocateRuntimeEnvBuf(SQueryRuntimeEnv* pRuntimeEnv, SMeterObj* p maxColWidth = bytes; } } - + pRuntimeEnv->primaryColBuffer = NULL; if (PRIMARY_TSCOL_LOADED(pQuery)) { pRuntimeEnv->primaryColBuffer = pRuntimeEnv->colDataBuffer[0]; } else { pRuntimeEnv->primaryColBuffer = - (SData *) malloc(pMeterObj->pointsPerFileBlock * TSDB_KEYSIZE + sizeof(SData) + EXTRA_BYTES); + (SData *)malloc(pMeterObj->pointsPerFileBlock * TSDB_KEYSIZE + sizeof(SData) + EXTRA_BYTES); } - + pRuntimeEnv->unzipBufSize = (size_t)(maxColWidth * pMeterObj->pointsPerFileBlock + EXTRA_BYTES); // plus extra_bytes - + pRuntimeEnv->unzipBuffer = (char *)calloc(1, pRuntimeEnv->unzipBufSize); pRuntimeEnv->secondaryUnzipBuffer = (char *)calloc(1, pRuntimeEnv->unzipBufSize); - + if (pRuntimeEnv->unzipBuffer == NULL || pRuntimeEnv->secondaryUnzipBuffer == NULL || pRuntimeEnv->primaryColBuffer == NULL) { goto _error_clean; } - + return TSDB_CODE_SUCCESS; - - _error_clean: + +_error_clean: for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfCols; ++i) { tfree(pRuntimeEnv->colDataBuffer[i]); } - + tfree(pRuntimeEnv->unzipBuffer); tfree(pRuntimeEnv->secondaryUnzipBuffer); - + if (!PRIMARY_TSCOL_LOADED(pQuery)) { tfree(pRuntimeEnv->primaryColBuffer); } - + return TSDB_CODE_SERV_OUT_OF_MEMORY; } @@ -3838,7 +4182,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete void *param) { SQuery *pQuery = &pQInfo->query; int32_t code = TSDB_CODE_SUCCESS; - + /* * only the successful complete requries the sem_post/over = 1 operations. */ @@ -3870,11 +4214,11 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete // check data in file or cache bool dataInCache = true; bool dataInDisk = true; - - SQueryRuntimeEnv* pRuntimeEnv = &pSupporter->runtimeEnv; + + SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; pRuntimeEnv->pQuery = pQuery; pRuntimeEnv->pMeterObj = pMeterObj; - + if ((code = allocateRuntimeEnvBuf(pRuntimeEnv, pMeterObj)) != TSDB_CODE_SUCCESS) { return code; } @@ -3904,55 +4248,106 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete vnodeRecordAllFiles(pQInfo, pMeterObj->vnode); - if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { + if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) { if ((code = allocateOutputBufForGroup(pSupporter, pQuery, false)) != TSDB_CODE_SUCCESS) { return code; } - int16_t type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); - _hash_fn_t fn = taosGetDefaultHashFunction(type); - - pRuntimeEnv->hashList = taosInitHashTable(10039, fn, false); - - pRuntimeEnv->usedIndex = 0; - pRuntimeEnv->pResult = pSupporter->pResult; + int16_t type = TSDB_DATA_TYPE_NULL; + if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { + type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); + } else { + type = TSDB_DATA_TYPE_TIMESTAMP; + } + + // todo bug! + initResWindowInfo(&pRuntimeEnv->swindowResInfo, 3, type, pSupporter->pResult); } - + pSupporter->rawSKey = pQuery->skey; pSupporter->rawEKey = pQuery->ekey; - + /* query on single table */ pSupporter->numOfMeters = 1; setQueryStatus(pQuery, QUERY_NOT_COMPLETED); - + SPointInterpoSupporter interpInfo = {0}; pointInterpSupporterInit(pQuery, &interpInfo); - + /* * in case of last_row query without query range, we set the query timestamp to * pMeterObj->lastKey. Otherwise, keep the initial query time range unchanged. */ - + if (isFirstLastRowQuery(pQuery) && notHasQueryTimeRange(pQuery)) { if (!normalizeUnBoundLastRowQuery(pSupporter, &interpInfo)) { sem_post(&pQInfo->dataReady); pQInfo->over = 1; - + pointInterpSupporterDestroy(&interpInfo); return TSDB_CODE_SUCCESS; } } else { - if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo) == false) || - (isFixedOutputQuery(pQuery) && !isTopBottomQuery(pQuery) && (pQuery->limit.offset > 0)) || - (isTopBottomQuery(pQuery) && pQuery->limit.offset >= pQuery->pSelectExpr[1].pBase.arg[0].argValue.i64)) { - sem_post(&pQInfo->dataReady); - pQInfo->over = 1; - - pointInterpSupporterDestroy(&interpInfo); - return TSDB_CODE_SUCCESS; + // find the skey and ekey in case of sliding query + if (pQuery->slidingTime > 0 && pQuery->nAggTimeInterval > 0) { + int64_t skey = 0; + + SWAP(pQuery->skey, pQuery->ekey, int64_t); + pQuery->order.order ^= 1; + pQuery->lastKey = pQuery->skey; + + if (normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo, &skey) == false) { + sem_post(&pQInfo->dataReady); + pQInfo->over = 1; + + pointInterpSupporterDestroy(&interpInfo); + return TSDB_CODE_SUCCESS; + } + + pQuery->skey = skey; + + pQuery->order.order ^= 1; + SWAP(pQuery->skey, pQuery->ekey, int64_t); + + int64_t ekey = 0; + pQuery->lastKey = pQuery->skey; + if (normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo, &ekey) == false) { + // + } + + pQuery->skey = ekey; + + TSKEY skey1, ekey1; + TSKEY windowSKey = 0, windowEKey = 0; + + TSKEY minKey = MIN(pQuery->skey, pQuery->ekey); + TSKEY maxKey = MAX(pQuery->skey, pQuery->ekey); + + doGetAlignedIntervalQueryRangeImpl(pQuery, minKey, minKey, maxKey, &skey1, &ekey1, &windowSKey, &windowEKey); + pRuntimeEnv->swindowResInfo.startTime = windowSKey; + + pSupporter->rawSKey = pQuery->skey; + pSupporter->rawEKey = pQuery->ekey; + + if (QUERY_IS_ASC_QUERY(pQuery)) { + pRuntimeEnv->swindowResInfo.prevSKey = windowSKey; + } else { + pRuntimeEnv->swindowResInfo.prevSKey = windowSKey + ((pQuery->skey - windowSKey) / pQuery->slidingTime) * pQuery->slidingTime; + } + } else { + int64_t ekey = 0; + if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo, &ekey) == false) || + (isFixedOutputQuery(pQuery) && !isTopBottomQuery(pQuery) && (pQuery->limit.offset > 0)) || + (isTopBottomQuery(pQuery) && pQuery->limit.offset >= pQuery->pSelectExpr[1].pBase.arg[0].argValue.i64)) { + sem_post(&pQInfo->dataReady); + pQInfo->over = 1; + + pointInterpSupporterDestroy(&interpInfo); + return TSDB_CODE_SUCCESS; + } } } - + /* * here we set the value for before and after the specified time into the * parameter for interpolation query @@ -4050,7 +4445,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) pQuery->pointsRead = 0; changeExecuteScanOrder(pQuery, true); - SQueryRuntimeEnv* pRuntimeEnv = &pSupporter->runtimeEnv; + SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; doInitQueryFileInfoFD(&pRuntimeEnv->vnodeFileInfo); vnodeInitDataBlockInfo(&pRuntimeEnv->loadBlockInfo); @@ -4086,17 +4481,17 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSQL_SO_ASC : TSQL_SO_DESC; tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order); } - + int32_t ret = setupQueryRuntimeEnv(pMeter, pQuery, &pSupporter->runtimeEnv, pTagSchema, TSQL_SO_ASC, true); if (ret != TSDB_CODE_SUCCESS) { return ret; } - + ret = allocateRuntimeEnvBuf(pRuntimeEnv, pMeter); if (ret != TSDB_CODE_SUCCESS) { return ret; } - + tSidSetSort(pSupporter->pSidSet); vnodeRecordAllFiles(pQInfo, pMeter->vnode); @@ -4105,9 +4500,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) } if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags; - pRuntimeEnv->hashList = taosInitHashTable(10039, taosIntHash_64, false); - pRuntimeEnv->usedIndex = 0; - pRuntimeEnv->pResult = pSupporter->pResult; + initResWindowInfo(&pRuntimeEnv->swindowResInfo, 10039, TSDB_DATA_TYPE_BIGINT, pSupporter->pResult); } if (pQuery->nAggTimeInterval != 0) { @@ -4186,12 +4579,12 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) { } } -TSKEY getTimestampInCacheBlock(SQueryRuntimeEnv* pRuntimeEnv, SCacheBlock *pBlock, int32_t index) { +TSKEY getTimestampInCacheBlock(SQueryRuntimeEnv *pRuntimeEnv, SCacheBlock *pBlock, int32_t index) { if (pBlock == NULL || index >= pBlock->numOfPoints || index < 0) { return -1; } - - return ((TSKEY*)(pRuntimeEnv->primaryColBuffer->data))[index]; + + return ((TSKEY *)(pRuntimeEnv->primaryColBuffer->data))[index]; } /* @@ -4214,19 +4607,19 @@ TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index) { SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj; int32_t fileIndex = vnodeGetVnodeHeaderFileIdx(&pQuery->fileId, pRuntimeEnv, pQuery->order.order); - + dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, slot:%d load data block due to primary key required", GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, pQuery->slot); - - bool loadTS = true; - bool loadFields = true; + + bool loadTS = true; + bool loadFields = true; int32_t slot = pQuery->slot; - + int32_t ret = loadDataBlockIntoMem(pBlock, &pQuery->pFields[slot], pRuntimeEnv, fileIndex, loadTS, loadFields); if (ret != TSDB_CODE_SUCCESS) { return -1; } - + SET_DATA_BLOCK_LOADED(pRuntimeEnv->blockStatus); SET_FILE_BLOCK_FLAG(pRuntimeEnv->blockStatus); @@ -4254,7 +4647,7 @@ static TSKEY getFirstDataBlockInCache(SQueryRuntimeEnv *pRuntimeEnv) { } else if (nextTimestamp > pQuery->ekey) { setQueryStatus(pQuery, QUERY_COMPLETED); } - + return nextTimestamp; } @@ -4286,13 +4679,13 @@ TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_sea if (key < pQuery->ekey) { setQueryStatus(pQuery, QUERY_COMPLETED); } - + return key; } else { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); - return -1; // no data to check + return -1; // no data to check } - } else {//asc query + } else { // asc query bool ret = getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_GREATER_EQUAL, searchFn); if (ret) { dTrace("QInfo:%p vid:%d sid:%d id:%s find the possible position, fileId:%d, slot:%d, pos:%d", pQInfo, @@ -4304,7 +4697,7 @@ TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_sea if (key > pQuery->ekey) { setQueryStatus(pQuery, QUERY_COMPLETED); } - + return key; } else { /* @@ -4321,9 +4714,9 @@ TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_sea } static int32_t moveToNextBlockInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t step, __block_search_fn_t searchFn) { - SQuery * pQuery = pRuntimeEnv->pQuery; - SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj; - + SQuery * pQuery = pRuntimeEnv->pQuery; + SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj; + SCacheInfo *pCacheInfo = (SCacheInfo *)pMeterObj->pCache; assert(pQuery->fileId < 0); @@ -4344,7 +4737,7 @@ static int32_t moveToNextBlockInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t ste int32_t currentSlot = pCacheInfo->currentSlot; int32_t firstSlot = getFirstCacheSlot(numOfBlocks, currentSlot, pCacheInfo); - + if (step == QUERY_DESC_FORWARD_STEP && pQuery->slot == firstSlot) { bool ret = getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_LESS_EQUAL, searchFn); if (ret) { @@ -4413,16 +4806,16 @@ static int32_t moveToNextBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t step, __bl (step == QUERY_DESC_FORWARD_STEP && (pQuery->slot == 0))) { fileIndex = getNextDataFileCompInfo(pRuntimeEnv, pMeterObj, step); /* data maybe in cache */ - - if (fileIndex >= 0) { // next file + + if (fileIndex >= 0) { // next file pQuery->slot = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQuery->numOfBlocks - 1; pQuery->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQuery->pBlock[pQuery->slot].numOfPoints - 1; - } else { // try data in cache + } else { // try data in cache assert(pQuery->fileId == -1); - + if (step == QUERY_ASC_FORWARD_STEP) { getFirstDataBlockInCache(pRuntimeEnv); - } else { // no data to check for desc order query + } else { // no data to check for desc order query setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); } @@ -4431,7 +4824,7 @@ static int32_t moveToNextBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t step, __bl } else { // next block in the same file int32_t fid = pQuery->fileId; fileIndex = vnodeGetVnodeHeaderFileIdx(&fid, pRuntimeEnv, pQuery->order.order); - + pQuery->slot += step; pQuery->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQuery->pBlock[pQuery->slot].numOfPoints - 1; } @@ -4462,16 +4855,16 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl SQuery * pQuery = pRuntimeEnv->pQuery; SQueryCostSummary *pSummary = &pRuntimeEnv->summary; - TSKEY *primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data; + TSKEY * primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data; int64_t start = taosGetTimestampUs(); if (IS_DISK_DATA_BLOCK(pQuery)) { SCompBlock *pBlock = getDiskDataBlock(pQuery, pQuery->slot); *pblockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_FILE_BLOCK); - + if (blockLoadStatus == DISK_DATA_LOADED) { - *forwardStep = applyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, pQuery->pFields[pQuery->slot], searchFn, - numOfRes); + *forwardStep = applyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, pQuery->pFields[pQuery->slot], + searchFn, numOfRes); } else { *forwardStep = pblockInfo->size; } @@ -4479,17 +4872,25 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl pSummary->fileTimeUs += (taosGetTimestampUs() - start); } else { assert(vnodeIsDatablockLoaded(pRuntimeEnv, pRuntimeEnv->pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD); - + SCacheBlock *pBlock = getCacheDataBlock(pRuntimeEnv->pMeterObj, pRuntimeEnv, pQuery->slot); *pblockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_CACHE_BLOCK); - - *forwardStep = - applyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, NULL, searchFn, numOfRes); - + + *forwardStep = applyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, NULL, searchFn, numOfRes); + pSummary->cacheTimeUs += (taosGetTimestampUs() - start); } } +static void getNextLogicalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t *skey, int64_t *ekey) { + SQuery *pQuery = pRuntimeEnv->pQuery; + + int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + + *skey += (pQuery->slidingTime * factor); + *ekey += (pQuery->slidingTime * factor); +} + static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; bool LOAD_DATA = true; @@ -4509,22 +4910,24 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SPositionInfo *pStartPos = &pRuntimeEnv->startPos; assert(pQuery->slot == pStartPos->slot); - dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d, start fileId:%d, slot:%d, pos:%d, bstatus:%d", + dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 + ", order:%d, start fileId:%d, slot:%d, pos:%d, bstatus:%d", GET_QINFO_ADDR(pQuery), pQuery->skey, pQuery->ekey, pQuery->lastKey, pQuery->order.order, pStartPos->fileId, pStartPos->slot, pStartPos->pos, pRuntimeEnv->blockStatus); while (1) { // check if query is killed or not set the status of query to pass the status check - if (isQueryKilled(pQuery)) { - setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); - return cnt; - } +// if (isQueryKilled(pQuery)) { +// setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); +// return cnt; +// } int32_t numOfRes = 0; SBlockInfo blockInfo = {0}; doHandleDataBlockImpl(pRuntimeEnv, &blockInfo, searchFn, &numOfRes, blockLoadStatus, &forwardStep); - dTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", fileId:%d, slot:%d, pos:%d, bstatus:%d, rows:%d, checked:%d", + dTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 + ", fileId:%d, slot:%d, pos:%d, bstatus:%d, rows:%d, checked:%d", GET_QINFO_ADDR(pQuery), blockInfo.keyFirst, blockInfo.keyLast, pQuery->fileId, pQuery->slot, pQuery->pos, pRuntimeEnv->blockStatus, blockInfo.size, forwardStep); @@ -4742,8 +5145,8 @@ static void printBinaryData(int32_t functionId, char *data, int32_t srcDataType) printf("%lf,%lf\t", *(double *)data, *(double *)(data + sizeof(double))); } else if (functionId == TSDB_FUNC_TWA) { data += 1; - printf("%lf,%" PRId64 ",%" PRId64 ",%" PRId64 "\t", *(double *)data, *(int64_t *)(data + 8), *(int64_t *)(data + 16), - *(int64_t *)(data + 24)); + printf("%lf,%" PRId64 ",%" PRId64 ",%" PRId64 "\t", *(double *)data, *(int64_t *)(data + 8), + *(int64_t *)(data + 16), *(int64_t *)(data + 24)); } else if (functionId == TSDB_FUNC_MIN || functionId == TSDB_FUNC_MAX) { switch (srcDataType) { case TSDB_DATA_TYPE_TINYINT: @@ -5195,13 +5598,20 @@ int32_t doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) { void disableFunctForSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { SQuery *pQuery = pRuntimeEnv->pQuery; - if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { + if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || pQuery->slidingTime > 0 && pQuery->nAggTimeInterval > 0) { for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1; } - for (int32_t i = 0; i < pRuntimeEnv->usedIndex; ++i) { - SOutputRes *buf = &pRuntimeEnv->pResult[i]; + SSlidingWindowResInfo *pWindowResInfo = &pRuntimeEnv->swindowResInfo; + + for (int32_t i = 0; i < pWindowResInfo->size; ++i) { + SWindowStatus *pStatus = &pWindowResInfo->pStatus[i]; + if (!pStatus->closed) { + continue; + } + + SOutputRes *buf = &pWindowResInfo->pResult[i]; // open/close the specified query for each group result for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { @@ -5276,6 +5686,22 @@ void clearGroupResultBuf(SOutputRes *pOneOutputRes, int32_t nOutputCols) { } } +void copyGroupResultBuf(SOutputRes* dst, const SOutputRes* src, int32_t nOutputCols) { + for(int32_t i = 0; i < nOutputCols; ++i) { + SResultInfo *pDst = &dst->resultInfo[i]; + SResultInfo *pSrc = &src->resultInfo[i]; + + char* buf = pDst->interResultBuf; + memcpy(pDst, pSrc, sizeof(SResultInfo)); + pDst->interResultBuf = buf; + memcpy(pDst->interResultBuf, pSrc->interResultBuf, pDst->bufLen); + + int32_t size = sizeof(tFilePage) + pSrc->bufLen * src->nAlloc; + memcpy(dst->result[i], src->result[i], size); + } +} + + void destroyGroupResultBuf(SOutputRes *pOneOutputRes, int32_t nOutputCols) { if (pOneOutputRes == NULL) { return; @@ -5296,13 +5722,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; - - // ts_comp query does not required reversed output -// if (QUERY_IS_ASC_QUERY(pQuery) || isTSCompQuery(pQuery)) { - pCtx->aOutputBuf = pQuery->sdata[i]->data; -// } else { // point to the last position of output buffer for desc query -// pCtx->aOutputBuf = pQuery->sdata[i]->data + (rows - 1) * pCtx->outputBytes; -// } + pCtx->aOutputBuf = pQuery->sdata[i]->data; /* * set the output buffer information and intermediate buffer @@ -5380,30 +5800,17 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) { pQuery->over &= (~QUERY_RESBUF_FULL); } else { int32_t numOfSkip = (int32_t)pQuery->limit.offset; - int32_t size = pQuery->pointsRead; - pQuery->pointsRead -= numOfSkip; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; - int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; -// if (QUERY_IS_ASC_QUERY(pQuery)) { - memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->pointsRead * bytes); -// } else { // DESC query -// int32_t maxrows = pQuery->pointsToRead; -// -// memmove(pQuery->sdata[i]->data + (maxrows - pQuery->pointsRead) * bytes, -// pQuery->sdata[i]->data + (maxrows - size) * bytes, pQuery->pointsRead * bytes); -// } - - pRuntimeEnv->pCtx[i].aOutputBuf -= bytes * numOfSkip * step; + memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->pointsRead * bytes); + pRuntimeEnv->pCtx[i].aOutputBuf += bytes * numOfSkip; if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { - pRuntimeEnv->pCtx[i].ptsOutputBuf -= TSDB_KEYSIZE * numOfSkip * step; + pRuntimeEnv->pCtx[i].ptsOutputBuf += TSDB_KEYSIZE * numOfSkip; } } @@ -5411,27 +5818,6 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) { } } -/** - * move remain data to the start position of output buffer - * @param pRuntimeEnv - */ -void moveDescOrderResultsToFront(SQueryRuntimeEnv *pRuntimeEnv) { -// SQuery *pQuery = pRuntimeEnv->pQuery; -// int32_t maxrows = pQuery->pointsToRead; -// -// if (QUERY_IS_ASC_QUERY(pQuery) || isTSCompQuery(pQuery)) { -// return; -// } -// -// if (pQuery->pointsRead > 0 && pQuery->pointsRead < maxrows) { -// for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { -// int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; -// memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + (maxrows - pQuery->pointsRead) * bytes, -// pQuery->pointsRead * bytes); -// } -// } -} - typedef struct SQueryStatus { SPositionInfo start; SPositionInfo next; @@ -5450,7 +5836,7 @@ static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus pStatus->overStatus = pQuery->over; pStatus->lastKey = pQuery->lastKey; - + pStatus->skey = pQuery->skey; pStatus->ekey = pQuery->ekey; @@ -5479,7 +5865,7 @@ static void queryStatusRestore(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pSta pQuery->lastKey = pStatus->lastKey; pQuery->skey = pStatus->skey; pQuery->ekey = pStatus->ekey; - + pQuery->over = pStatus->overStatus; pRuntimeEnv->startPos = pStatus->start; @@ -5541,13 +5927,20 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { while (1) { doScanAllDataBlocks(pRuntimeEnv); - // applied to agg functions (e.g., stddev) bool toContinue = true; - if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { + if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) { // for each group result, call the finalize function for each column - for (int32_t i = 0; i < pRuntimeEnv->usedIndex; ++i) { - SOutputRes *buf = &pRuntimeEnv->pResult[i]; + SSlidingWindowResInfo *pWindowResInfo = &pRuntimeEnv->swindowResInfo; + + for (int32_t i = 0; i < pWindowResInfo->size; ++i) { + SOutputRes *buf = &pWindowResInfo->pResult[i]; + + SWindowStatus *pStatus = &pWindowResInfo->pStatus[i]; + if (!pStatus->closed) { + continue; + } + setGroupOutputBuffer(pRuntimeEnv, buf); for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { @@ -5588,9 +5981,11 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { doSingleMeterSupplementScan(pRuntimeEnv); // reset status code - if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { - for (int32_t i = 0; i < pRuntimeEnv->usedIndex; ++i) { - SOutputRes *buf = &pRuntimeEnv->pResult[i]; + if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) { + SSlidingWindowResInfo *pWindowResInfo = &pRuntimeEnv->swindowResInfo; + + for (int32_t i = 0; i < pWindowResInfo->size; ++i) { + SOutputRes *buf = &pWindowResInfo->pResult[i]; for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { buf->resultInfo[j].complete = false; } @@ -5608,10 +6003,17 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { void doFinalizeResult(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { + if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) { // for each group result, call the finalize function for each column - for (int32_t i = 0; i < pRuntimeEnv->usedIndex; ++i) { - SOutputRes *buf = &pRuntimeEnv->pResult[i]; + SSlidingWindowResInfo *pWindowResInfo = &pRuntimeEnv->swindowResInfo; + + for (int32_t i = 0; i < pWindowResInfo->size; ++i) { + SOutputRes *buf = &pWindowResInfo->pResult[i]; + SWindowStatus* pStatus = &pWindowResInfo->pStatus[i]; + if (!pStatus->closed) { + continue; + } + setGroupOutputBuffer(pRuntimeEnv, buf); for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { @@ -5670,37 +6072,77 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) { return maxOutput; } -/* - * forward the query range for next interval query - */ -void forwardIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQueryRuntimeEnv *pRuntimeEnv) { +static int32_t getNextIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQueryRuntimeEnv *pRuntimeEnv, + int64_t *skey, int64_t *ekey) { SQuery *pQuery = pRuntimeEnv->pQuery; int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - pQuery->ekey += (pQuery->nAggTimeInterval * factor); - pQuery->skey = pQuery->ekey - (pQuery->nAggTimeInterval - 1) * factor; + printf("------------------------%lld, %lld\n", pQuery->skey, pQuery->ekey); - // boundary check - if (QUERY_IS_ASC_QUERY(pQuery)) { - if (pQuery->skey > pSupporter->rawEKey) { - setQueryStatus(pQuery, QUERY_COMPLETED); - return; - } + *skey = pRuntimeEnv->intervalSKey + (pQuery->slidingTime * factor); + *ekey = pRuntimeEnv->intervalEKey + (pQuery->slidingTime * factor); + + printf("new window:%lld, %lld\n", *skey, *ekey); + + if (pQuery->slidingTime > 0) { + if (QUERY_IS_ASC_QUERY(pQuery)) { + // the next sliding window is not contained in the query time range + if (*skey < pSupporter->rawSKey) { + *skey = pSupporter->rawSKey; + } - if (pQuery->ekey > pSupporter->rawEKey) { - pQuery->ekey = pSupporter->rawEKey; + if (*skey > pSupporter->rawEKey) { + return QUERY_COMPLETED; + // setQueryStatus(pQuery, QUERY_COMPLETED); + // return; + } + + if (*ekey > pSupporter->rawEKey) { + *ekey = pSupporter->rawEKey; + } + } else { + if (*skey > pSupporter->rawSKey) { + *skey = pSupporter->rawSKey; + } + + if (*skey < pSupporter->rawEKey) { + // setQueryStatus(pQuery, QUERY_COMPLETED); + return QUERY_COMPLETED; + } + + if (*ekey < pSupporter->rawEKey) { + *ekey = pSupporter->rawEKey; + } } - } else { - if (pQuery->skey < pSupporter->rawEKey) { + } + + return QUERY_NOT_COMPLETED; +} + +/* + * forward the query range for next interval query + */ +void forwardIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQueryRuntimeEnv *pRuntimeEnv) { + SQuery *pQuery = pRuntimeEnv->pQuery; + if (pQuery->slidingTime > 0 && pQuery->nAggTimeInterval > 0) { + if ((QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey >= pQuery->ekey) || + (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey <= pQuery->ekey)) { setQueryStatus(pQuery, QUERY_COMPLETED); - return; + } else { + TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos); } - if (pQuery->ekey < pSupporter->rawEKey) { - pQuery->ekey = pSupporter->rawEKey; - } + return; + } + + int32_t r = getNextIntervalQueryRange(pSupporter, pRuntimeEnv, &pQuery->skey, &pQuery->ekey); + if (r == QUERY_COMPLETED) { + setQueryStatus(pQuery, QUERY_COMPLETED); + return; } + getNextLogicalQueryRange(pRuntimeEnv, &pRuntimeEnv->intervalSKey, &pRuntimeEnv->intervalEKey); + /* ensure the search in cache will return right position */ pQuery->lastKey = pQuery->skey; @@ -5715,7 +6157,7 @@ void forwardIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQueryRuntimeE // bridge the gap in group by time function if ((nextTimestamp > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || (nextTimestamp < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { - getAlignedIntervalQueryRange(pQuery, nextTimestamp, pSupporter->rawSKey, pSupporter->rawEKey); + getAlignedIntervalQueryRange(pRuntimeEnv, nextTimestamp, pSupporter->rawSKey, pSupporter->rawEKey); } } @@ -5750,33 +6192,33 @@ int32_t vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, tSidSet *pSidSet SVnodeObj *pVnode = &vnodeList[vid]; - char* buf = calloc(1, getCompHeaderSegSize(&pVnode->cfg)); + char *buf = calloc(1, getCompHeaderSegSize(&pVnode->cfg)); if (buf == NULL) { *numOfMeters = 0; return TSDB_CODE_SERV_OUT_OF_MEMORY; } - + SQueryFilesInfo *pVnodeFileInfo = &pRuntimeEnv->vnodeFileInfo; - + int32_t headerSize = getCompHeaderSegSize(&pVnode->cfg); lseek(pVnodeFileInfo->headerFd, TSDB_FILE_HEADER_LEN, SEEK_SET); read(pVnodeFileInfo->headerFd, buf, headerSize); - + // check the offset value integrity if (validateHeaderOffsetSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, vid, buf - TSDB_FILE_HEADER_LEN, headerSize) < 0) { free(buf); *numOfMeters = 0; - + return TSDB_CODE_FILE_CORRUPTED; } - int64_t oldestKey = getOldestKey(pVnode->numOfFiles, pVnode->fileId, &pVnode->cfg); + int64_t oldestKey = getOldestKey(pVnode->numOfFiles, pVnode->fileId, &pVnode->cfg); (*pReqMeterDataInfo) = malloc(POINTER_BYTES * pSidSet->numOfSids); if (*pReqMeterDataInfo == NULL) { free(buf); *numOfMeters = 0; - + return TSDB_CODE_SERV_OUT_OF_MEMORY; } @@ -5821,19 +6263,19 @@ int32_t vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, tSidSet *pSidSet } } - int64_t headerOffset = sizeof(SCompHeader) * pMeterObj->sid; + int64_t headerOffset = sizeof(SCompHeader) * pMeterObj->sid; SCompHeader *compHeader = (SCompHeader *)(buf + headerOffset); if (compHeader->compInfoOffset == 0) { // current table is empty continue; } - + // corrupted file may cause the invalid compInfoOffset, check needs int32_t compHeaderOffset = getCompHeaderStartPosition(&pVnode->cfg); if (validateCompBlockOffset(pQInfo, pMeterObj, compHeader, &pRuntimeEnv->vnodeFileInfo, compHeaderOffset) != TSDB_CODE_SUCCESS) { free(buf); *numOfMeters = 0; - + return TSDB_CODE_FILE_CORRUPTED; } @@ -5855,7 +6297,7 @@ int32_t vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, tSidSet *pSidSet } free(buf); - + return TSDB_CODE_SUCCESS; } @@ -5973,8 +6415,8 @@ void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, SMeterQueryInfo *p ((pQuery->lastKey <= pQuery->skey) && !QUERY_IS_ASC_QUERY(pQuery))); } -static void clearAllMeterDataBlockInfo(SMeterDataInfo** pMeterDataInfo, int32_t start, int32_t end) { - for(int32_t i = start; i < end; ++i) { +static void clearAllMeterDataBlockInfo(SMeterDataInfo **pMeterDataInfo, int32_t start, int32_t end) { + for (int32_t i = start; i < end; ++i) { tfree(pMeterDataInfo[i]->pBlock); pMeterDataInfo[i]->numOfBlocks = 0; pMeterDataInfo[i]->start = -1; @@ -6025,7 +6467,7 @@ static bool setValidDataBlocks(SMeterDataInfo *pMeterDataInfo, int32_t end) { if (size != pMeterDataInfo->numOfBlocks) { memmove(pMeterDataInfo->pBlock, &pMeterDataInfo->pBlock[pMeterDataInfo->start], size * sizeof(SCompBlock)); - + char *tmp = realloc(pMeterDataInfo->pBlock, size * sizeof(SCompBlock)); if (tmp == NULL) { return false; @@ -6043,7 +6485,7 @@ static bool setCurrentQueryRange(SMeterDataInfo *pMeterDataInfo, SQuery *pQuery, SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); SMeterObj * pMeterObj = pMeterDataInfo->pMeterObj; SMeterQueryInfo *pMeterQInfo = pMeterDataInfo->pMeterQInfo; - + if (QUERY_IS_ASC_QUERY(pQuery)) { *minval = pMeterQInfo->lastKey; *maxval = endKey; @@ -6053,12 +6495,14 @@ static bool setCurrentQueryRange(SMeterDataInfo *pMeterDataInfo, SQuery *pQuery, } if (*minval > *maxval) { - qTrace("QInfo:%p vid:%d sid:%d id:%s, no result in files, qrange:%" PRId64 "-%" PRId64 ", lastKey:%" PRId64, pQInfo, pMeterObj->vnode, - pMeterObj->sid, pMeterObj->meterId, pMeterQInfo->skey, pMeterQInfo->ekey, pMeterQInfo->lastKey); + qTrace("QInfo:%p vid:%d sid:%d id:%s, no result in files, qrange:%" PRId64 "-%" PRId64 ", lastKey:%" PRId64, pQInfo, + pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pMeterQInfo->skey, pMeterQInfo->ekey, + pMeterQInfo->lastKey); return false; } else { - qTrace("QInfo:%p vid:%d sid:%d id:%s, query in files, qrange:%" PRId64 "-%" PRId64 ", lastKey:%" PRId64, pQInfo, pMeterObj->vnode, - pMeterObj->sid, pMeterObj->meterId, pMeterQInfo->skey, pMeterQInfo->ekey, pMeterQInfo->lastKey); + qTrace("QInfo:%p vid:%d sid:%d id:%s, query in files, qrange:%" PRId64 "-%" PRId64 ", lastKey:%" PRId64, pQInfo, + pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pMeterQInfo->skey, pMeterQInfo->ekey, + pMeterQInfo->lastKey); return true; } } @@ -6077,19 +6521,19 @@ int32_t getDataBlocksForMeters(SMeterQuerySupportObj *pSupporter, SQuery *pQuery SQueryCostSummary *pSummary = &pSupporter->runtimeEnv.summary; TSKEY minval, maxval; - + *numOfBlocks = 0; SQueryFilesInfo *pVnodeFileInfo = &pSupporter->runtimeEnv.vnodeFileInfo; - + // sequentially scan this header file to extract the compHeader info for (int32_t j = 0; j < numOfMeters; ++j) { SMeterObj *pMeterObj = pMeterDataInfo[j]->pMeterObj; lseek(pVnodeFileInfo->headerFd, pMeterDataInfo[j]->offsetInHeaderFile, SEEK_SET); - + SCompInfo compInfo = {0}; read(pVnodeFileInfo->headerFd, &compInfo, sizeof(SCompInfo)); - + int32_t ret = validateCompBlockInfoSegment(pQInfo, filePath, pMeterObj->vnode, &compInfo, pMeterDataInfo[j]->offsetInHeaderFile); if (ret != TSDB_CODE_SUCCESS) { // file corrupted @@ -6101,24 +6545,25 @@ int32_t getDataBlocksForMeters(SMeterQuerySupportObj *pSupporter, SQuery *pQuery clearAllMeterDataBlockInfo(pMeterDataInfo, 0, j); continue; } - + int32_t size = compInfo.numOfBlocks * sizeof(SCompBlock); size_t bufferSize = size + sizeof(TSCKSUM); - + pMeterDataInfo[j]->numOfBlocks = compInfo.numOfBlocks; pMeterDataInfo[j]->pBlock = calloc(1, bufferSize); if (pMeterDataInfo[j]->pBlock == NULL) { clearAllMeterDataBlockInfo(pMeterDataInfo, 0, j); return TSDB_CODE_SERV_OUT_OF_MEMORY; } - + read(pVnodeFileInfo->headerFd, pMeterDataInfo[j]->pBlock, bufferSize); - TSCKSUM checksum = *(TSCKSUM*)((char*)pMeterDataInfo[j]->pBlock + size); + TSCKSUM checksum = *(TSCKSUM *)((char *)pMeterDataInfo[j]->pBlock + size); int64_t st = taosGetTimestampUs(); // check compblock integrity - ret = validateCompBlockSegment(pQInfo, filePath, &compInfo, (char*) pMeterDataInfo[j]->pBlock, pMeterObj->vnode, checksum); + ret = validateCompBlockSegment(pQInfo, filePath, &compInfo, (char *)pMeterDataInfo[j]->pBlock, pMeterObj->vnode, + checksum); if (ret != TSDB_CODE_SUCCESS) { clearAllMeterDataBlockInfo(pMeterDataInfo, 0, j); return TSDB_CODE_FILE_CORRUPTED; @@ -6145,7 +6590,7 @@ int32_t getDataBlocksForMeters(SMeterQuerySupportObj *pSupporter, SQuery *pQuery if (!setValidDataBlocks(pMeterDataInfo[j], end)) { clearAllMeterDataBlockInfo(pMeterDataInfo, 0, j); - + pQInfo->killed = 1; // set query kill, abort current query since no memory available return TSDB_CODE_SERV_OUT_OF_MEMORY; } @@ -6207,14 +6652,14 @@ static int32_t blockAccessOrderComparator(const void *pLeft, const void *pRight, return pLeftBlockInfoEx->pBlock.compBlock->offset > pRightBlockInfoEx->pBlock.compBlock->offset ? 1 : -1; } -void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t numOfTables) { +void cleanBlockOrderSupporter(SBlockOrderSupporter *pSupporter, int32_t numOfTables) { tfree(pSupporter->numOfBlocksPerMeter); tfree(pSupporter->blockIndexArray); - + for (int32_t i = 0; i < numOfTables; ++i) { tfree(pSupporter->pDataBlockInfoEx[i]); } - + tfree(pSupporter->pDataBlockInfoEx); } @@ -6258,13 +6703,13 @@ int32_t createDataBlocksInfoEx(SMeterDataInfo **pMeterDataInfo, int32_t numOfMet SCompBlock *pBlock = pMeterDataInfo[j]->pBlock; supporter.numOfBlocksPerMeter[numOfQualMeters] = pMeterDataInfo[j]->numOfBlocks; - char* buf = calloc(1, sizeof(SMeterDataBlockInfoEx) * pMeterDataInfo[j]->numOfBlocks); + char *buf = calloc(1, sizeof(SMeterDataBlockInfoEx) * pMeterDataInfo[j]->numOfBlocks); if (buf == NULL) { cleanBlockOrderSupporter(&supporter, numOfQualMeters); return TSDB_CODE_SERV_OUT_OF_MEMORY; } - - supporter.pDataBlockInfoEx[numOfQualMeters] = (SMeterDataBlockInfoEx*) buf; + + supporter.pDataBlockInfoEx[numOfQualMeters] = (SMeterDataBlockInfoEx *)buf; for (int32_t k = 0; k < pMeterDataInfo[j]->numOfBlocks; ++k) { SMeterDataBlockInfoEx *pInfoEx = &supporter.pDataBlockInfoEx[numOfQualMeters][k]; @@ -6283,7 +6728,7 @@ int32_t createDataBlocksInfoEx(SMeterDataInfo **pMeterDataInfo, int32_t numOfMet dTrace("QInfo %p create data blocks info struct completed", addr); - assert(cnt == numOfCompBlocks && numOfQualMeters <= numOfMeters); // the pMeterDataInfo[j]->numOfBlocks may be 0 + assert(cnt == numOfCompBlocks && numOfQualMeters <= numOfMeters); // the pMeterDataInfo[j]->numOfBlocks may be 0 supporter.numOfMeters = numOfQualMeters; SLoserTreeInfo *pTree = NULL; @@ -6306,7 +6751,7 @@ int32_t createDataBlocksInfoEx(SMeterDataInfo **pMeterDataInfo, int32_t numOfMet if (supporter.blockIndexArray[pos] >= supporter.numOfBlocksPerMeter[pos]) { supporter.blockIndexArray[pos] = supporter.numOfBlocksPerMeter[pos] + 1; } - + tLoserTreeAdjust(pTree, pos + supporter.numOfMeters); } @@ -6606,7 +7051,7 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete /* current interval query is completed, set the next query range on other data blocks if exist */ int64_t prevEKey = pQuery->ekey; - getAlignedIntervalQueryRange(pQuery, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey); + getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey); saveIntervalQueryRange(pRuntimeEnv, pInfo); assert(queryCompleted && prevEKey < pQuery->skey); @@ -6657,7 +7102,7 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete // current interval query is completed, set the next query range on other data blocks if exist int64_t prevEKey = pQuery->ekey; - getAlignedIntervalQueryRange(pQuery, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey); + getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->lastKey, pSupporter->rawSKey, pSupporter->rawEKey); saveIntervalQueryRange(pRuntimeEnv, pInfo); assert(queryCompleted && prevEKey > pQuery->skey); @@ -6696,7 +7141,7 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete (nextKey <= pQuery->lastKey && !QUERY_IS_ASC_QUERY(pQuery))); /* still in the same block to query */ - getAlignedIntervalQueryRange(pQuery, nextKey, pSupporter->rawSKey, pSupporter->rawEKey); + getAlignedIntervalQueryRange(pRuntimeEnv, nextKey, pSupporter->rawSKey, pSupporter->rawEKey); saveIntervalQueryRange(pRuntimeEnv, pInfo); int32_t newPos = searchFn((char *)pPrimaryCol, pBlockInfo->size, pQuery->skey, pQuery->order.order); @@ -6743,7 +7188,7 @@ void setIntervalQueryRange(SMeterQueryInfo *pMeterQueryInfo, SMeterQuerySupportO * last query on this block of the meter is done, start next interval on this block * otherwise, keep the previous query range and proceed */ - getAlignedIntervalQueryRange(pQuery, key, pSupporter->rawSKey, pSupporter->rawEKey); + getAlignedIntervalQueryRange(pRuntimeEnv, key, pSupporter->rawSKey, pSupporter->rawEKey); saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); // previous query does not be closed, save the results and close it @@ -6763,7 +7208,7 @@ void setIntervalQueryRange(SMeterQueryInfo *pMeterQueryInfo, SMeterQuerySupportO return; } - getAlignedIntervalQueryRange(pQuery, pQuery->skey, pSupporter->rawSKey, pSupporter->rawEKey); + getAlignedIntervalQueryRange(pRuntimeEnv, pQuery->skey, pSupporter->rawSKey, pSupporter->rawEKey); saveIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); pMeterQueryInfo->queryRangeSet = 1; } @@ -6833,7 +7278,7 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk pQuery->pSelectExpr[i].pBase.colInfo.colId, *blkStatus); } - if (pRuntimeEnv->pTSBuf > 0) { + if (pRuntimeEnv->pTSBuf > 0 || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) { req |= BLK_DATA_ALL_NEEDED; } } @@ -6870,7 +7315,8 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk dTrace("QInfo:%p fileId:%d, slot:%d, block discarded by per-filter, ", GET_QINFO_ADDR(pQuery), pQuery->fileId, pQuery->slot); #endif - qTrace("QInfo:%p id:%s slot:%d, data block ignored by pre-filter, fields loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", + qTrace("QInfo:%p id:%s slot:%d, data block ignored by pre-filter, fields loaded, brange:%" PRId64 "-%" PRId64 + ", rows:%d", GET_QINFO_ADDR(pQuery), pMeterObj->meterId, pQuery->slot, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfPoints); return DISK_DATA_DISCARDED; @@ -6983,8 +7429,8 @@ int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQue TSKEY ts = *(TSKEY *)getOutputResPos(pRuntimeEnv, pData, pData->numOfElems, 0); SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj; - qTrace("QInfo:%p vid:%d sid:%d id:%s, save results, ts:%" PRId64 ", total:%d", GET_QINFO_ADDR(pQuery), pMeterObj->vnode, - pMeterObj->sid, pMeterObj->meterId, ts, pMeterQueryInfo->numOfRes + 1); + qTrace("QInfo:%p vid:%d sid:%d id:%s, save results, ts:%" PRId64 ", total:%d", GET_QINFO_ADDR(pQuery), + pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, ts, pMeterQueryInfo->numOfRes + 1); pData->numOfElems += numOfResult; pMeterQueryInfo->numOfRes += numOfResult; @@ -7023,8 +7469,8 @@ static int32_t getSubsetNumber(SMeterQuerySupportObj *pSupporter) { SQuery *pQuery = pSupporter->runtimeEnv.pQuery; int32_t totalSubset = 0; - if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { - totalSubset = pSupporter->runtimeEnv.usedIndex; + if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) { + totalSubset = numOfResFromResWindowInfo(&pSupporter->runtimeEnv.swindowResInfo); } else { totalSubset = pSupporter->pSidSet->numOfSubSet; } @@ -7104,8 +7550,7 @@ void copyFromGroupBuf(SQInfo *pQInfo, SOutputRes *result) { SQuery * pQuery = &pQInfo->query; SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; - int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSQL_SO_DESC; - + int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSQL_SO_ASC; int32_t numOfResult = doCopyFromGroupBuf(pSupporter, result, orderType); pQuery->pointsRead += numOfResult; @@ -7326,8 +7771,8 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage int32_t ret = resultInterpolate(pQInfo, pDst, pDataSrc, numOfRows, numOfFinalRows); assert(ret == numOfFinalRows); + /* reached the start position of according to offset value, return immediately */ if (pQuery->limit.offset == 0) { - /* reached the start position of according to offset value, return immediately */ return ret; } @@ -7335,18 +7780,18 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage ret -= pQuery->limit.offset; // todo !!!!there exactly number of interpo is not valid. // todo refactor move to the beginning of buffer - if (QUERY_IS_ASC_QUERY(pQuery)) { - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - memmove(pDst[i]->data, pDst[i]->data + pQuery->pSelectExpr[i].resBytes * pQuery->limit.offset, - ret * pQuery->pSelectExpr[i].resBytes); - } - } else { - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - memmove(pDst[i]->data + (pQuery->pointsToRead - ret) * pQuery->pSelectExpr[i].resBytes, - pDst[i]->data + (pQuery->pointsToRead - ret - pQuery->limit.offset) * pQuery->pSelectExpr[i].resBytes, - ret * pQuery->pSelectExpr[i].resBytes); - } + // if (QUERY_IS_ASC_QUERY(pQuery)) { + for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + memmove(pDst[i]->data, pDst[i]->data + pQuery->pSelectExpr[i].resBytes * pQuery->limit.offset, + ret * pQuery->pSelectExpr[i].resBytes); } + // } else { + // for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + // memmove(pDst[i]->data + (pQuery->pointsToRead - ret) * pQuery->pSelectExpr[i].resBytes, + // pDst[i]->data + (pQuery->pointsToRead - ret - pQuery->limit.offset) * + // pQuery->pSelectExpr[i].resBytes, ret * pQuery->pSelectExpr[i].resBytes); + // } + // } pQuery->limit.offset = 0; return ret; } else { diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index b8b8c9ea80..5089b1b79a 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -535,7 +535,7 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start SPointInterpoSupporter pointInterpSupporter = {0}; pointInterpSupporterInit(pQuery, &pointInterpSupporter); - if (!normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &pointInterpSupporter)) { + if (!normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &pointInterpSupporter, NULL)) { pointInterpSupporterDestroy(&pointInterpSupporter); return 0; } @@ -666,19 +666,10 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { if (pSupporter->meterIdx >= pSids->numOfSids) { return; } - - for (int32_t i = 0; i < pRuntimeEnv->usedIndex; ++i) { - SOutputRes *pOneRes = &pRuntimeEnv->pResult[i]; - clearGroupResultBuf(pOneRes, pQuery->numOfOutputCols); - } - - pRuntimeEnv->usedIndex = 0; - taosCleanUpHashTable(pRuntimeEnv->hashList); - - int32_t primeHashSlot = 10039; - pRuntimeEnv->hashList = taosInitHashTable(primeHashSlot, taosIntHash_32, false); - - while (pSupporter->meterIdx < pSupporter->numOfMeters) { + + resetResWindowInfo(&pRuntimeEnv->swindowResInfo, pQuery->numOfOutputCols); + + while (pSupporter->meterIdx < pSupporter->numOfMeters) { int32_t k = pSupporter->meterIdx; if (isQueryKilled(pQuery)) { @@ -703,7 +694,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { #endif SPointInterpoSupporter pointInterpSupporter = {0}; - if (normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &pointInterpSupporter) == false) { + if (normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &pointInterpSupporter, NULL) == false) { pQuery->skey = pSupporter->rawSKey; pQuery->ekey = pSupporter->rawEKey; @@ -778,8 +769,10 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { } if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { - for (int32_t i = 0; i < pRuntimeEnv->usedIndex; ++i) { - SOutputRes *buf = &pRuntimeEnv->pResult[i]; + SSlidingWindowResInfo* pWindowResInfo = &pRuntimeEnv->swindowResInfo; + + for (int32_t i = 0; i < pWindowResInfo->size; ++i) { + SOutputRes *buf = &pWindowResInfo->pResult[i]; for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { buf->numOfRows = MAX(buf->numOfRows, buf->resultInfo[j].numOfRes); } @@ -787,14 +780,12 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { pQInfo->pMeterQuerySupporter->subgroupIdx = 0; pQuery->pointsRead = 0; - copyFromGroupBuf(pQInfo, pRuntimeEnv->pResult); + copyFromGroupBuf(pQInfo, pWindowResInfo->pResult); } pQInfo->pointsRead += pQuery->pointsRead; pQuery->pointsOffset = pQuery->pointsToRead; - moveDescOrderResultsToFront(pRuntimeEnv); - dTrace( "QInfo %p vid:%d, numOfMeters:%d, index:%d, numOfGroups:%d, %d points returned, totalRead:%d totalReturn:%d," "next skey:%" PRId64 ", offset:%" PRId64, @@ -982,12 +973,11 @@ static void vnodeSingleMeterFixedOutputProcessor(SQInfo *pQInfo) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { pQInfo->pMeterQuerySupporter->subgroupIdx = 0; pQuery->pointsRead = 0; - copyFromGroupBuf(pQInfo, pRuntimeEnv->pResult); + copyFromGroupBuf(pQInfo, pRuntimeEnv->swindowResInfo.pResult); } doSkipResults(pRuntimeEnv); doRevisedResultsByLimit(pQInfo); - moveDescOrderResultsToFront(pRuntimeEnv); pQInfo->pointsRead = pQuery->pointsRead; } @@ -1034,8 +1024,6 @@ static void vnodeSingleMeterMultiOutputProcessor(SQInfo *pQInfo) { } doRevisedResultsByLimit(pQInfo); - moveDescOrderResultsToFront(pRuntimeEnv); - pQInfo->pointsRead += pQuery->pointsRead; if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { @@ -1063,7 +1051,8 @@ static void vnodeSingleMeterIntervalMainLooper(SMeterQuerySupportObj *pSupporter (pQuery->skey >= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))); initCtxOutputBuf(pRuntimeEnv); - + clearCompletedResWindows(&pRuntimeEnv->swindowResInfo, pQuery->numOfOutputCols); + vnodeScanAllData(pRuntimeEnv); if (isQueryKilled(pQuery)) { return; @@ -1094,7 +1083,7 @@ static void vnodeSingleMeterIntervalMainLooper(SMeterQuerySupportObj *pSupporter } forwardIntervalQueryRange(pSupporter, pRuntimeEnv); - if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED)) { + if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED|QUERY_RESBUF_FULL)) { break; } @@ -1133,17 +1122,8 @@ static void vnodeSingleMeterIntervalProcessor(SQInfo *pQInfo) { taosInterpoSetStartInfo(&pRuntimeEnv->interpoInfo, pQuery->pointsRead, pQuery->interpoType); SData **pInterpoBuf = pRuntimeEnv->pInterpoBuf; - if (QUERY_IS_ASC_QUERY(pQuery)) { - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - memcpy(pInterpoBuf[i]->data, pQuery->sdata[i]->data, pQuery->pointsRead * pQuery->pSelectExpr[i].resBytes); - } - } else { - int32_t size = pMeterObj->pointsPerFileBlock; - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - memcpy(pInterpoBuf[i]->data, - pQuery->sdata[i]->data + (size - pQuery->pointsRead) * pQuery->pSelectExpr[i].resBytes, - pQuery->pointsRead * pQuery->pSelectExpr[i].resBytes); - } + for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + memcpy(pInterpoBuf[i]->data, pQuery->sdata[i]->data, pQuery->pointsRead * pQuery->pSelectExpr[i].resBytes); } numOfInterpo = 0; @@ -1160,12 +1140,16 @@ static void vnodeSingleMeterIntervalProcessor(SQInfo *pQInfo) { pQuery->pointsRead = 0; } } + + if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->slidingTime > 0 && pQuery->nAggTimeInterval > 0)) { + pQInfo->pMeterQuerySupporter->subgroupIdx = 0; + pQuery->pointsRead = 0; + copyFromGroupBuf(pQInfo, pRuntimeEnv->swindowResInfo.pResult); + } pQInfo->pointsRead += pQuery->pointsRead; pQInfo->pointsInterpo += numOfInterpo; -// moveDescOrderResultsToFront(pRuntimeEnv); - dTrace("%p vid:%d sid:%d id:%s, %d points returned %d points interpo, totalRead:%d totalInterpo:%d totalReturn:%d", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo, pQInfo->pointsRead - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned); @@ -1209,7 +1193,6 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { (tFilePage **)pRuntimeEnv->pInterpoBuf, remain, &numOfInterpo); doRevisedResultsByLimit(pQInfo); - moveDescOrderResultsToFront(pRuntimeEnv); pQInfo->pointsInterpo += numOfInterpo; pQInfo->pointsRead += pQuery->pointsRead; diff --git a/src/system/detail/src/vnodeRead.c b/src/system/detail/src/vnodeRead.c index 936fde3a0e..cc6da83671 100644 --- a/src/system/detail/src/vnodeRead.c +++ b/src/system/detail/src/vnodeRead.c @@ -267,6 +267,7 @@ static SQInfo *vnodeAllocateQInfoEx(SQueryMeterMsg *pQueryMsg, SSqlGroupbyExpr * pQuery->pGroupbyExpr = pGroupbyExpr; pQuery->nAggTimeInterval = pQueryMsg->nAggTimeInterval; + pQuery->slidingTime = pQueryMsg->slidingTime; pQuery->interpoType = pQueryMsg->interpoType; pQuery->intervalTimeUnit = pQueryMsg->intervalTimeUnit; @@ -966,6 +967,8 @@ int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQueryMsg) { pQueryMsg->queryType = htons(pQueryMsg->queryType); pQueryMsg->nAggTimeInterval = htobe64(pQueryMsg->nAggTimeInterval); + pQueryMsg->slidingTime = htobe64(pQueryMsg->slidingTime); + pQueryMsg->numOfTagsCols = htons(pQueryMsg->numOfTagsCols); pQueryMsg->numOfCols = htons(pQueryMsg->numOfCols); pQueryMsg->numOfOutputCols = htons(pQueryMsg->numOfOutputCols); diff --git a/src/util/src/hash.c b/src/util/src/hash.c index a57f04e11d..5068293688 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -423,7 +423,8 @@ void taosDeleteFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen) { __wr_lock(&pObj->lock); } - SHashNode *pNode = doGetNodeFromHashTable(pObj, key, keyLen, NULL); + uint32_t val = 0; + SHashNode *pNode = doGetNodeFromHashTable(pObj, key, keyLen, &val); if (pNode == NULL) { if (pObj->multithreadSafe) { __unlock(&pObj->lock); @@ -434,7 +435,12 @@ void taosDeleteFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen) { SHashNode *pNext = pNode->next; if (pNode->prev != NULL) { - pNode->prev->next = pNext; + int32_t slot = HASH_INDEX(val, pObj->capacity); + if (pObj->hashList[slot]->next == pNode) { + pObj->hashList[slot]->next = pNext; + } else { + pNode->prev->next = pNext; + } } if (pNext != NULL) { diff --git a/src/util/src/thashutil.c b/src/util/src/thashutil.c index 6ad41a3e78..94961e356e 100644 --- a/src/util/src/thashutil.c +++ b/src/util/src/thashutil.c @@ -93,6 +93,7 @@ uint32_t taosIntHash_64(const char *key, uint32_t UNUSED_PARAM(len)) { _hash_fn_t taosGetDefaultHashFunction(int32_t type) { _hash_fn_t fn = NULL; switch(type) { + case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_BIGINT: fn = taosIntHash_64;break; case TSDB_DATA_TYPE_BINARY: fn = MurmurHash3_32;break; case TSDB_DATA_TYPE_INT: fn = taosIntHash_32; break; diff --git a/src/util/src/tinterpolation.c b/src/util/src/tinterpolation.c index b17d172607..5df07a5c43 100644 --- a/src/util/src/tinterpolation.c +++ b/src/util/src/tinterpolation.c @@ -37,7 +37,7 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char * here we revised the start time of day according to the local time zone, * but in case of DST, the start time of one day need to be dynamically decided. * - * TODO dynmaically decide the start time of a day + * TODO dynamically decide the start time of a day */ #if defined(WINDOWS) && _MSC_VER >= 1900 @@ -94,7 +94,7 @@ void taosInterpoSetStartInfo(SInterpolationInfo* pInterpoInfo, int32_t numOfRawD return; } - pInterpoInfo->rowIdx = INTERPOL_IS_ASC_INTERPOL(pInterpoInfo) ? 0 : numOfRawDataInRows - 1; + pInterpoInfo->rowIdx = 0;//INTERPOL_IS_ASC_INTERPOL(pInterpoInfo) ? 0 : numOfRawDataInRows - 1; pInterpoInfo->numOfRawDataInRows = numOfRawDataInRows; } @@ -118,14 +118,14 @@ int32_t taosGetNumOfResWithoutLimit(SInterpolationInfo* pInterpoInfo, int64_t* p if (numOfAvailRawData > 0) { int32_t finalNumOfResult = 0; - if (pInterpoInfo->order == TSQL_SO_ASC) { +// if (pInterpoInfo->order == TSQL_SO_ASC) { // get last timestamp, calculate the result size int64_t lastKey = pPrimaryKeyArray[pInterpoInfo->numOfRawDataInRows - 1]; - finalNumOfResult = (int32_t)((lastKey - pInterpoInfo->startTimestamp) / nInterval) + 1; - } else { // todo error less than one!!! - TSKEY lastKey = pPrimaryKeyArray[0]; - finalNumOfResult = (int32_t)((pInterpoInfo->startTimestamp - lastKey) / nInterval) + 1; - } + finalNumOfResult = (int32_t)(labs(lastKey - pInterpoInfo->startTimestamp) / nInterval) + 1; +// } else { // todo error less than one!!! +// TSKEY lastKey = pPrimaryKeyArray[0]; +// finalNumOfResult = (int32_t)((pInterpoInfo->startTimestamp - lastKey) / nInterval) + 1; +// } assert(finalNumOfResult >= numOfAvailRawData); return finalNumOfResult; @@ -198,11 +198,11 @@ int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoi } static char* getPos(char* data, int32_t bytes, int32_t order, int32_t capacity, int32_t index) { - if (order == TSQL_SO_ASC) { +// if (order == TSQL_SO_ASC) { return data + index * bytes; - } else { - return data + (capacity - index - 1) * bytes; - } +// } else { +// return data + (capacity - index - 1) * bytes; +// } } static void setTagsValueInInterpolation(tFilePage** data, char** pTags, tColModel* pModel, int32_t order, int32_t start, @@ -397,7 +397,7 @@ int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoTyp } pInterpoInfo->startTimestamp += (nInterval * step); - pInterpoInfo->rowIdx += step; + pInterpoInfo->rowIdx += 1; num += 1; if ((pInterpoInfo->rowIdx >= pInterpoInfo->numOfRawDataInRows && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || -- GitLab