diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 866569fdd61e7c8e81d0b33ab12e7ba6bd615b30..5414d4a41280e0c7c46b90d40a537474c5d7ae41 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -584,9 +584,14 @@ static bool isTopBottomQuery(SQueryInfo* pQueryInfo) { int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { const char* msg1 = "invalid query expression"; const char* msg2 = "interval cannot be less than 10 ms"; - + const char* msg3 = "interval required"; + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); + if (pQuerySql->interval.type == 0 && pQuerySql->sliding.type != 0) { + return invalidSqlErrMsg(pQueryInfo->msg, msg3); + } + if (pQuerySql->interval.type == 0 || pQuerySql->interval.n == 0) { return TSDB_CODE_SUCCESS; } diff --git a/src/inc/tresultBuf.h b/src/inc/tresultBuf.h index b99c44e73fcbd834152640b80f8a59728c7cfd8f..90000e15e639cc64156f387b5c9d56653ee5a0ec 100644 --- a/src/inc/tresultBuf.h +++ b/src/inc/tresultBuf.h @@ -88,7 +88,7 @@ int32_t getNumOfResultBufGroupId(SQueryDiskbasedResultBuf* pResultBuf); * destroy result buffer * @param pResultBuf */ -void destroyResultBuf(SQueryDiskbasedResultBuf* pResultBuf); +void destroyDiskbasedResultBuf(SQueryDiskbasedResultBuf* pResultBuf); /** * diff --git a/src/system/detail/inc/vnodeQueryImpl.h b/src/system/detail/inc/vnodeQueryImpl.h index e3507d5f82e8c156ffdc5a3babae8ea5af079398..c26778b32866ee3252e1c5456a95b565d1cd2e44 100644 --- a/src/system/detail/inc/vnodeQueryImpl.h +++ b/src/system/detail/inc/vnodeQueryImpl.h @@ -277,9 +277,9 @@ void copyTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* dst, con int32_t initWindowResInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRuntimeEnv, int32_t size, int32_t threshold, int16_t type); -void cleanupTimeWindowInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRuntimeEnv); +void cleanupTimeWindowInfo(SWindowResInfo* pWindowResInfo, int32_t numOfCols); void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo* pWindowResInfo); -void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num); +void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num); void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv); int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo); diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index f12db0971e434a75e3c8094b3f835daf15d12104..14cb3ca80bb3c16c233537a978099d25970d822a 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -585,7 +585,7 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t StartQue void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo); -static void destroyTimeWindowRes(SWindowResult *pOneOutputRes, int32_t nOutputCols); +static void destroyTimeWindowRes(SWindowResult *pWindowRes, int32_t nOutputCols); static int32_t binarySearchForBlockImpl(SCompBlock *pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) { int32_t firstSlot = 0; @@ -1740,7 +1740,8 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat pCtx[k].nStartQueryTimestamp = pWin->skey; pCtx[k].size = forwardStep; pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? startPos : startPos - (forwardStep - 1); - + pCtx[k].ptsList = (TSKEY *)((char*)pRuntimeEnv->primaryColBuffer->data + pCtx[k].startOffset * TSDB_KEYSIZE); + int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { aAggs[functionId].xFunction(&pCtx[k]); @@ -1785,7 +1786,19 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow return -1; } - TSKEY startKey = QUERY_IS_ASC_QUERY(pQuery) ? pNextWin->skey : pNextWin->ekey; + TSKEY startKey = -1; + if (QUERY_IS_ASC_QUERY(pQuery)) { + startKey = pNextWin->skey; + if (startKey < pQuery->skey) { + startKey = pQuery->skey; + } + } else { + startKey = pNextWin->ekey; + if (startKey > pQuery->skey) { + startKey = pQuery->skey; + } + } + int32_t startPos = searchFn((char *)primaryKeys, pBlockInfo->size, startKey, pQuery->order.order); /* @@ -2008,10 +2021,6 @@ static bool needToLoadDataBlock(SQuery *pQuery, SField *pField, SQLFunctionCtx * int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t size, int32_t threshold, int16_t type) { - if (size < threshold) { - size = threshold; - } - pWindowResInfo->capacity = size; pWindowResInfo->threshold = threshold; @@ -2025,7 +2034,7 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun // use the pointer arraylist pWindowResInfo->pResult = calloc(threshold, sizeof(SWindowResult)); - for (int32_t i = 0; i < threshold; ++i) { + for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) { SPosInfo posInfo = {-1, -1}; createQueryResultInfo(pRuntimeEnv->pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &posInfo); } @@ -2033,15 +2042,15 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun return TSDB_CODE_SUCCESS; } -void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRuntimeEnv) { +void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, int32_t numOfCols) { if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) { assert(pWindowResInfo->hashList == NULL && pWindowResInfo->pResult == NULL); return; } - for (int32_t i = 0; i < pWindowResInfo->size; ++i) { + for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) { SWindowResult *pResult = &pWindowResInfo->pResult[i]; - destroyTimeWindowRes(pResult, pRuntimeEnv->pQuery->numOfOutputCols); + destroyTimeWindowRes(pResult, numOfCols); } taosCleanUpHashTable(pWindowResInfo->hashList); @@ -2846,16 +2855,18 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { return; } - dTrace("QInfo:%p teardown runtime env", GET_QINFO_ADDR(pRuntimeEnv->pQuery)); - for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfCols; ++i) { + SQuery* pQuery = pRuntimeEnv->pQuery; + + dTrace("QInfo:%p teardown runtime env", GET_QINFO_ADDR(pQuery)); + for (int32_t i = 0; i < pQuery->numOfCols; ++i) { tfree(pRuntimeEnv->colDataBuffer[i]); } tfree(pRuntimeEnv->secondaryUnzipBuffer); - cleanupTimeWindowInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv); + cleanupTimeWindowInfo(&pRuntimeEnv->windowResInfo, pQuery->numOfOutputCols); if (pRuntimeEnv->pCtx != NULL) { - for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; for (int32_t j = 0; j < pCtx->numOfParams; ++j) { @@ -2873,7 +2884,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { tfree(pRuntimeEnv->unzipBuffer); - if (pRuntimeEnv->pQuery && (!PRIMARY_TSCOL_LOADED(pRuntimeEnv->pQuery))) { + if (pQuery && (!PRIMARY_TSCOL_LOADED(pQuery))) { tfree(pRuntimeEnv->primaryColBuffer); } @@ -2887,13 +2898,14 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { taosDestoryInterpoInfo(&pRuntimeEnv->interpoInfo); if (pRuntimeEnv->pInterpoBuf != NULL) { - for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { tfree(pRuntimeEnv->pInterpoBuf[i]); } tfree(pRuntimeEnv->pInterpoBuf); } - + + destroyDiskbasedResultBuf(pRuntimeEnv->pResultBuf); pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf); } @@ -4504,19 +4516,19 @@ static void allocMemForInterpo(STableQuerySupportObj *pSupporter, SQuery *pQuery static int32_t getInitialPageNum(STableQuerySupportObj *pSupporter) { SQuery *pQuery = pSupporter->runtimeEnv.pQuery; - + int32_t INITIAL_RESULT_ROWS_VALUE = 16; + int32_t num = 0; if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { num = 128; } else if (isIntervalQuery(pQuery)) { // time window query, allocate one page for each table - num = pSupporter->numOfMeters; + num = MAX(pSupporter->numOfMeters, INITIAL_RESULT_ROWS_VALUE); } else { // for super table query, one page for each subset num = pSupporter->pSidSet->numOfSubSet; } assert(num > 0); - return num; } @@ -4816,20 +4828,6 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) { pSupporter->pMetersHashTable = NULL; } - if (pSupporter->pSidSet != NULL || isGroupbyNormalCol(pQInfo->query.pGroupbyExpr) || - isIntervalQuery(pQuery)) { - int32_t size = 0; - if (isGroupbyNormalCol(pQInfo->query.pGroupbyExpr) || isIntervalQuery(pQuery)) { - size = 10000; - } else if (pSupporter->pSidSet != NULL) { - size = pSupporter->pSidSet->numOfSubSet; - } - - for (int32_t i = 0; i < size; ++i) { - // destroyTimeWindowRes(&pSupporter->pResult[i], pQInfo->query.numOfOutputCols); - } - } - tSidSetDestroy(&pSupporter->pSidSet); if (pSupporter->pMeterDataInfo != NULL) { @@ -5874,10 +5872,13 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery tfree(pTree); tfree(pTableList); tfree(posList); - tfree(pResultInfo); pSupporter->offset = 0; - + for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + tfree(pResultInfo[i].interResultBuf); + } + + tfree(pResultInfo); return pSupporter->numOfGroupResultPages; } @@ -6345,6 +6346,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { int64_t skey = pQuery->lastKey; int32_t status = pQuery->over; + int32_t activeSlot = pRuntimeEnv->windowResInfo.curIndex; SET_MASTER_SCAN_FLAG(pRuntimeEnv); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); @@ -6370,6 +6372,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { status = pQuery->over; pQuery->ekey = pQuery->lastKey - step; pQuery->lastKey = pQuery->skey; + pRuntimeEnv->windowResInfo.curIndex = activeSlot; setQueryStatus(pQuery, QUERY_NOT_COMPLETED); pRuntimeEnv->scanFlag = REPEAT_SCAN; @@ -6629,13 +6632,8 @@ void destroyMeterQueryInfo(SMeterQueryInfo *pMeterQueryInfo, int32_t numOfCols) if (pMeterQueryInfo == NULL) { return; } - - // free(pMeterQueryInfo->pageList); - // for (int32_t i = 0; i < numOfCols; ++i) { - // tfree(pMeterQueryInfo->[i].interResultBuf); - // } - - // free(pMeterQueryInfo->resultInfo); + + cleanupTimeWindowInfo(&pMeterQueryInfo->windowResInfo, numOfCols); free(pMeterQueryInfo); } diff --git a/src/util/src/tresultBuf.c b/src/util/src/tresultBuf.c index a7377f16575147934f68148adb2d16126288ffc9..8b719ee5444aab9723788718aba25a92c1ce9efc 100644 --- a/src/util/src/tresultBuf.c +++ b/src/util/src/tresultBuf.c @@ -189,7 +189,7 @@ SIDList getDataBufPagesIdList(SQueryDiskbasedResultBuf* pResultBuf, int32_t grou } } -void destroyResultBuf(SQueryDiskbasedResultBuf* pResultBuf) { +void destroyDiskbasedResultBuf(SQueryDiskbasedResultBuf* pResultBuf) { if (pResultBuf == NULL) { return; }