diff --git a/src/system/detail/inc/vnodeQueryImpl.h b/src/system/detail/inc/vnodeQueryImpl.h index 6b7049c18196375218b772f43db816455b6b1b47..71318ca7971c8343e41866275660ac81e8dfda0f 100644 --- a/src/system/detail/inc/vnodeQueryImpl.h +++ b/src/system/detail/inc/vnodeQueryImpl.h @@ -279,6 +279,7 @@ void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv); int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo); void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot); void closeAllTimeWindow(SWindowResInfo* pWindowResInfo); +SWindowResult* getWindowRes(SWindowResInfo* pWindowResInfo, size_t index); #ifdef __cplusplus } diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 7d64abf052f8276aa015e16e4ee429bc8fb66b02..aa75758b413b471533457df31ab5998843140dd9 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -1529,14 +1529,6 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t w.ekey = w.skey + pQuery->intervalTime - 1; } - /* - * query border check, skey should not be bounded by the query time range, since the value skey will - * be used as the time window index value. So we only change ekey of time window accordingly. - */ - if (w.ekey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) { - w.ekey = pQuery->ekey; - } - assert(ts >= w.skey && ts <= w.ekey && w.skey != 0); return w; @@ -2055,36 +2047,37 @@ static void doSetInterpolationDataForTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, S SBlockInfo* pBlockInfo, STimeWindow* win, int32_t startPos, int32_t forwardStep) { SQuery* pQuery = pRuntimeEnv->pQuery; - TSKEY* primaryKeyCol = (TSKEY*) pRuntimeEnv->primaryColBuffer->data; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - if (pRuntimeEnv->interpoSearch) { - int32_t s = startPos; - int32_t e = forwardStep * step + startPos - step; - - if (!QUERY_IS_ASC_QUERY(pQuery)) { - SWAP(s, e, int32_t); + if (!pRuntimeEnv->interpoSearch) { + return; + } + + int32_t s = startPos; + int32_t e = forwardStep * step + startPos - step; + + if (!QUERY_IS_ASC_QUERY(pQuery)) { + SWAP(s, e, int32_t); + } + + // interpolate for skey value + for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + if (pQuery->pSelectExpr[i].pBase.functionId != TSDB_FUNC_RATE) { + continue; } - for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - SColIndexEx *pCol = &pQuery->pSelectExpr[i].pBase.colInfo; + SColIndexEx *pCol = &pQuery->pSelectExpr[i].pBase.colInfo; + interpolateStartKeyValue(pRuntimeEnv, pBlockInfo, pWindowResInfo, win, s, &pRuntimeEnv->pCtx[i], pCol->colIdxInBuf); + } - interpolateEndKeyValue(pRuntimeEnv, pBlockInfo, win, e, &pRuntimeEnv->pCtx[i], pCol->colIdxInBuf); + // interpolate for ekey value + for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + if (pQuery->pSelectExpr[i].pBase.functionId != TSDB_FUNC_RATE) { + continue; } - // the first time window, do not employ the interpolation - if (primaryKeyCol[s] == pWindowResInfo->startTime) { - for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - pRuntimeEnv->pCtx[i].prev.key = -1; - } - } else { - for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - SColIndexEx *pCol = &pQuery->pSelectExpr[i].pBase.colInfo; - - interpolateStartKeyValue(pRuntimeEnv, pBlockInfo, pWindowResInfo, win, s, &pRuntimeEnv->pCtx[i], pCol->colIdxInBuf); - } - } + SColIndexEx *pCol = &pQuery->pSelectExpr[i].pBase.colInfo; + interpolateEndKeyValue(pRuntimeEnv, pBlockInfo, win, e, &pRuntimeEnv->pCtx[i], pCol->colIdxInBuf); } } @@ -2105,6 +2098,13 @@ static void doInterpolatePrevTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SWindowRe break; } + // do not check for the closed time window + SWindowResult* pWindowRes = getWindowRes(pWindowResInfo, slot); + if (pWindowRes->status.closed) { + slot += 1; + continue; + } + // if current active window locates before current data block, do interpolate the result and close it assert((w.skey < win->skey && w.ekey < ts && QUERY_IS_ASC_QUERY(pQuery)) || (w.skey > win->skey && w.skey > ts && !QUERY_IS_ASC_QUERY(pQuery))); @@ -2112,10 +2112,10 @@ static void doInterpolatePrevTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SWindowRe int32_t forwardStep = 0; doSetInterpolationDataForTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, &w, offset, forwardStep); - SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); + SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, slot); doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &w, pQuery->pos, forwardStep); - closeTimeWindow(pWindowResInfo, curTimeWindow(pWindowResInfo)); + closeTimeWindow(pWindowResInfo, slot); // try next time window slot += 1; @@ -2168,14 +2168,6 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t setExecParams(pRuntimeEnv, &pCtx[k], pQuery->skey, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, tpField, hasNull, &sasArray[k]); } - - // save the last row in current data block - for(int32_t i = 0; i < pQuery->numOfCols; ++i) { - SColumnInfo* pColInfo = &pQuery->colList[i].data; - int32_t s = (QUERY_IS_ASC_QUERY(pQuery))? pColInfo->bytes * (pBlockInfo->size - 1) : 0; - - memcpy(pRuntimeEnv->lastRowInBlock[i], pRuntimeEnv->colDataBuffer[i]->data + s, pColInfo->bytes); - } int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); if (isIntervalQuery(pQuery)) { @@ -2248,6 +2240,14 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t if (!isIntervalQuery(pQuery)) { num = getNumOfResult(pRuntimeEnv) - prevNumOfRes; } + + // save the last row in current data block + for(int32_t i = 0; i < pQuery->numOfCols; ++i) { + SColumnInfo* pColInfo = &pQuery->colList[i].data; + int32_t s = (QUERY_IS_ASC_QUERY(pQuery))? pColInfo->bytes * (pBlockInfo->size - 1) : 0; + + memcpy(pRuntimeEnv->lastRowInBlock[i], pRuntimeEnv->colDataBuffer[i]->data + s, pColInfo->bytes); + } tfree(sasArray); return (int32_t)num; @@ -2458,6 +2458,11 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) { } } +SWindowResult* getWindowRes(SWindowResInfo* pWindowResInfo, size_t index) { + assert(index < pWindowResInfo->size); + return &pWindowResInfo->pResult[index]; +} + /* * remove the results that are not the FIRST time window that spreads beyond the * the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time @@ -3276,6 +3281,12 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { tfree(pRuntimeEnv->pInterpoBuf); } + for (int32_t i = 0; i < pQuery->numOfCols; ++i) { + tfree(pRuntimeEnv->lastRowInBlock[i]); + } + + tfree(pRuntimeEnv->lastRowInBlock); + destroyDiskbasedResultBuf(pRuntimeEnv->pResultBuf); pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf); } @@ -5220,6 +5231,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { pSupporter->rawEKey = pQuery->ekey; pSupporter->rawSKey = pQuery->skey; pQuery->lastKey = pQuery->skey; + pRuntimeEnv->interpoSearch = needsBoundaryTS(pQuery); // create runtime environment SColumnModel *pTagSchemaInfo = pSupporter->pSidSet->pColumnModel; @@ -7501,7 +7513,6 @@ void setIntervalQueryRange(SMeterQueryInfo *pMeterQueryInfo, STableQuerySupportO doGetAlignedIntervalQueryRangeImpl(pQuery, win.skey, win.skey, win.ekey, &skey1, &ekey1, &windowSKey, &windowEKey); pWindowResInfo->startTime = windowSKey; // windowSKey may be 0 in case of 1970 timestamp - // assert(pWindowResInfo->startTime > 0); if (pWindowResInfo->prevSKey == 0) { if (QUERY_IS_ASC_QUERY(pQuery)) { @@ -7806,6 +7817,8 @@ void stableApplyFunctionsOnBlock(STableQuerySupportObj *pSupporter, SMeterDataIn updateWindowResNumOfRes(pRuntimeEnv, pMeterDataInfo); updatelastkey(pQuery, pMeterQueryInfo); + + doCheckQueryCompleted(pRuntimeEnv, pMeterQueryInfo->lastKey, pWindowResInfo); } // we need to split the refstatsult into different packages.