From 86015bfc258c468bc5cb998f7da0a3e8df62022e Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Thu, 27 Feb 2020 02:02:37 +0800 Subject: [PATCH] fix bugs founded in regression test. --- src/system/detail/inc/vnodeQueryImpl.h | 10 +- src/system/detail/src/vnodeQueryImpl.c | 230 ++++++++++------------ src/system/detail/src/vnodeQueryProcess.c | 2 +- 3 files changed, 107 insertions(+), 135 deletions(-) diff --git a/src/system/detail/inc/vnodeQueryImpl.h b/src/system/detail/inc/vnodeQueryImpl.h index 8da88d6781..e3507d5f82 100644 --- a/src/system/detail/inc/vnodeQueryImpl.h +++ b/src/system/detail/inc/vnodeQueryImpl.h @@ -164,8 +164,7 @@ void pointInterpSupporterDestroy(SPointInterpoSupporter* pPointInterpSupport); void pointInterpSupporterSetData(SQInfo* pQInfo, SPointInterpoSupporter* pPointInterpSupport); int64_t loadRequiredBlockIntoMem(SQueryRuntimeEnv* pRuntimeEnv, SPositionInfo* position); -int32_t doCloseAllOpenedResults(STableQuerySupportObj* pSupporter); -void disableFunctForSuppleScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order); +void disableFunctForSuppleScan(STableQuerySupportObj* pSupporter, int32_t order); void enableFunctForMasterScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order); int32_t mergeMetersResultToOneGroups(STableQuerySupportObj* pSupporter); @@ -238,13 +237,6 @@ void changeMeterQueryInfoForSuppleQuery(SQuery* pQuery, SMeterQueryInfo* pMeterQ tFilePage* addDataPageForMeterQueryInfo(SQuery* pQuery, SMeterQueryInfo* pMeterQueryInfo, STableQuerySupportObj* pSupporter); -/** - * save the query range data into SMeterQueryInfo - * @param pRuntimeEnv - * @param pMeterQueryInfo - */ -void saveIntervalQueryRange(SQueryRuntimeEnv* pRuntimeEnv, SMeterQueryInfo* pMeterQueryInfo); - /** * restore the query range data from SMeterQueryInfo to runtime environment * diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 56273d8891..d3fd143121 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -70,19 +70,17 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult * static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo); static int32_t flushFromResultBuf(STableQuerySupportObj *pSupporter, const SQuery *pQuery, const SQueryRuntimeEnv *pRuntimeEnv); -static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid); -static TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn); -static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); -static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow); +static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid); +static TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn); +static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); +static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow); static int32_t getGroupResultId(int32_t groupIndex) { int32_t base = 200000; return base + (groupIndex * 10000); } -static FORCE_INLINE bool isIntervalQuery(SQuery* pQuery) { - return pQuery->intervalTime > 0; -} +static FORCE_INLINE bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; } // check the offset value integrity static FORCE_INLINE int32_t validateHeaderOffsetSegment(SQInfo *pQInfo, char *filePath, int32_t vid, char *data, @@ -1625,51 +1623,51 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, } // query completed - if ((lastKey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || - (lastKey <= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { - closeAllTimeWindow(pWindowResInfo); - - pWindowResInfo->curIndex = pWindowResInfo->size - 1; - setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL); - } else { // set the current index to be the last unclosed window - int32_t i = 0; - int64_t skey = 0; + if ((lastKey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || + (lastKey <= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { + closeAllTimeWindow(pWindowResInfo); - for (i = 0; i < pWindowResInfo->size; ++i) { - SWindowResult *pResult = &pWindowResInfo->pResult[i]; - if (pResult->status.closed) { - continue; - } + pWindowResInfo->curIndex = pWindowResInfo->size - 1; + setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL); + } else { // set the current index to be the last unclosed window + int32_t i = 0; + int64_t skey = 0; - if ((pResult->window.ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) || - (pResult->window.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) { - closeTimeWindow(pWindowResInfo, i); - } else { - skey = pResult->window.skey; - break; - } + for (i = 0; i < pWindowResInfo->size; ++i) { + SWindowResult *pResult = &pWindowResInfo->pResult[i]; + if (pResult->status.closed) { + continue; } - // all windows are closed, set the last one to be the skey - if (skey == 0) { - assert(i == pWindowResInfo->size); - pWindowResInfo->curIndex = pWindowResInfo->size - 1; + if ((pResult->window.ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) || + (pResult->window.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) { + closeTimeWindow(pWindowResInfo, i); } else { - pWindowResInfo->curIndex = i; + skey = pResult->window.skey; + break; } + } - pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex].window.skey; + // all windows are closed, set the last one to be the skey + if (skey == 0) { + assert(i == pWindowResInfo->size); + pWindowResInfo->curIndex = pWindowResInfo->size - 1; + } else { + pWindowResInfo->curIndex = i; + } - // the number of completed slots are larger than the threshold, dump to client immediately. - int32_t n = numOfClosedTimeWindow(pWindowResInfo); - if (n > pWindowResInfo->threshold) { - setQueryStatus(pQuery, QUERY_RESBUF_FULL); - } + pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex].window.skey; - dTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pQuery), pWindowResInfo->size, n); + // the number of completed slots are larger than the threshold, dump to client immediately. + int32_t n = numOfClosedTimeWindow(pWindowResInfo); + if (n > pWindowResInfo->threshold) { + setQueryStatus(pQuery, QUERY_RESBUF_FULL); } - assert(pWindowResInfo->prevSKey != 0); + dTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pQuery), pWindowResInfo->size, n); + } + + assert(pWindowResInfo->prevSKey != 0); } static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SBlockInfo *pBlockInfo, TSKEY *pPrimaryColumn, int32_t startPos, @@ -1797,7 +1795,7 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow } } -static TSKEY reviseWindowEkey(SQuery* pQuery, STimeWindow* pWindow) { +static TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow) { TSKEY ekey = -1; if (QUERY_IS_ASC_QUERY(pQuery)) { ekey = pWindow->ekey; @@ -1810,7 +1808,7 @@ static TSKEY reviseWindowEkey(SQuery* pQuery, STimeWindow* pWindow) { ekey = pQuery->ekey; } } - + return ekey; } @@ -2142,11 +2140,11 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat if (isNull(pData, type)) { // ignore the null value return -1; } - + int32_t GROUPRESULTID = 1; - + SQueryDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; - + SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes); if (pWindowRes == NULL) { return -1; @@ -2159,7 +2157,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat return -1; } } - + setWindowResOutputBuf(pRuntimeEnv, pWindowRes); initCtxOutputBuf(pRuntimeEnv); return TSDB_CODE_SUCCESS; @@ -2384,10 +2382,10 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * continue; } } - + // update the lastKey lastKey = primaryKeyCol[offset]; - + // all startOffset are identical offset -= pCtx[0].startOffset; @@ -2400,8 +2398,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * } if (pRuntimeEnv->pTSBuf != NULL) { - - // if timestamp filter list is empty, quit current query + // if timestamp filter list is empty, quit current query if (!tsBufNextPos(pRuntimeEnv->pTSBuf)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); break; @@ -2506,7 +2503,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockI } TSKEY lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? pBlockInfo->keyLast : pBlockInfo->keyFirst; - doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); //todo refactor merge + doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); // todo refactor merge // interval query with limit applied if (isIntervalQuery(pQuery) && pQuery->limit.limit > 0 && @@ -2663,9 +2660,8 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t startQueryTimes // store the first&last timestamp into the intermediate buffer [1], the true // value may be null but timestamp will never be null pCtx->ptsList = (int64_t *)(primaryColumnData + startOffset * TSDB_KEYSIZE); - } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || - functionId == TSDB_FUNC_TWA || functionId == TSDB_FUNC_DIFF || - (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) { + } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_TWA || + functionId == TSDB_FUNC_DIFF || (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) { /* * leastsquares function needs two columns of input, currently, the x value of linear equation is set to * timestamp column, and the y-value is the column specified in pQuery->pSelectExpr[i].colIdxInBuffer @@ -2961,8 +2957,8 @@ bool isSumAvgRateQuery(SQuery *pQuery) { continue; } - if (functionId == TSDB_FUNC_SUM_RATE || functionId == TSDB_FUNC_SUM_IRATE || - functionId == TSDB_FUNC_AVG_RATE || functionId == TSDB_FUNC_AVG_IRATE) { + if (functionId == TSDB_FUNC_SUM_RATE || functionId == TSDB_FUNC_SUM_IRATE || functionId == TSDB_FUNC_AVG_RATE || + functionId == TSDB_FUNC_AVG_IRATE) { return true; } } @@ -4637,13 +4633,13 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery } else { // find the skey and ekey in case of sliding query if (isIntervalQuery(pQuery)) { STimeWindow win = {0}; - + // find the minimum value for descending order query TSKEY minKey = -1; if (!QUERY_IS_ASC_QUERY(pQuery)) { minKey = getGreaterEqualTimestamp(pRuntimeEnv); } - + int64_t skey = 0; if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo, &skey) == false) || (isFixedOutputQuery(pQuery) && !isTopBottomQuery(pQuery) && (pQuery->limit.offset > 0)) || @@ -4654,7 +4650,7 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery pointInterpSupporterDestroy(&interpInfo); return TSDB_CODE_SUCCESS; } - + if (!QUERY_IS_ASC_QUERY(pQuery)) { win.skey = minKey; win.ekey = skey; @@ -5853,51 +5849,62 @@ void setMeterDataInfo(SMeterDataInfo *pMeterDataInfo, SMeterObj *pMeterObj, int3 pMeterDataInfo->meterOrderIdx = meterIdx; } -void disableFunctForSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { - SQuery *pQuery = pRuntimeEnv->pQuery; - - if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->slidingTime > 0 && isIntervalQuery(pQuery))) { - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1; +static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo *pWindowResInfo, int32_t order) { + for (int32_t i = 0; i < pWindowResInfo->size; ++i) { + SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, i); + if (!pStatus->closed) { + continue; } - SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; + SWindowResult *buf = getWindowResult(pWindowResInfo, i); - for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, i); - if (!pStatus->closed) { - continue; + // open/close the specified query for each group result + for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { + int32_t functId = pQuery->pSelectExpr[j].pBase.functionId; + + if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSQL_SO_DESC) || + ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSQL_SO_ASC)) { + buf->resultInfo[j].complete = false; + } else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) { + buf->resultInfo[j].complete = true; } + } + } +} - SWindowResult *buf = getWindowResult(pWindowResInfo, i); +void disableFunctForTableSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { + SQuery *pQuery = pRuntimeEnv->pQuery; + assert(!pRuntimeEnv->stableQuery); - // open/close the specified query for each group result - for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { - int32_t functId = pQuery->pSelectExpr[j].pBase.functionId; + // group by normal columns and interval query on normal table + for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1; + } - if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSQL_SO_DESC) || - ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSQL_SO_ASC)) { - buf->resultInfo[j].complete = false; - } else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) { - buf->resultInfo[j].complete = true; - } - } + SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; + doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order); + + pQuery->order.order = pQuery->order.order ^ 1; +} + +void disableFunctForSuppleScan(STableQuerySupportObj *pSupporter, int32_t order) { + SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; + SQuery * pQuery = pRuntimeEnv->pQuery; + + if (isIntervalQuery(pQuery)) { + for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) { + SMeterQueryInfo *pMeterQueryInfo = pSupporter->pMeterDataInfo[i].pMeterQInfo; + SWindowResInfo * pWindowResInfo = &pMeterQueryInfo->windowResInfo; + + doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order); } - } else { // TODO ERROR!! - // need to handle for each query result, not just the single runtime ctx. + } else { for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1; - int32_t functId = pQuery->pSelectExpr[i].pBase.functionId; - - SResultInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[i]); - if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSQL_SO_DESC) || - ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSQL_SO_ASC)) { - pResInfo->complete = false; - - } else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) { - pResInfo->complete = true; - } } + + SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; + doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order); } pQuery->order.order = pQuery->order.order ^ 1; @@ -6158,7 +6165,7 @@ static void doSingleMeterSupplementScan(SQueryRuntimeEnv *pRuntimeEnv) { (!QUERY_IS_ASC_QUERY(pQuery) && endKey >= pQuery->ekey)); // close necessary function execution during supplementary scan - disableFunctForSuppleScan(pRuntimeEnv, pQuery->order.order); + disableFunctForTableSuppleScan(pRuntimeEnv, pQuery->order.order); queryStatusSave(pRuntimeEnv, &qStatus); doScanAllDataBlocks(pRuntimeEnv); @@ -6239,8 +6246,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { * round scan all data blocks. */ TSKEY key = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->startPos); - assert((QUERY_IS_ASC_QUERY(pQuery) && key >= pQuery->skey) || - (!QUERY_IS_ASC_QUERY(pQuery) && key <= pQuery->skey)); + assert((QUERY_IS_ASC_QUERY(pQuery) && key >= pQuery->skey) || (!QUERY_IS_ASC_QUERY(pQuery) && key <= pQuery->skey)); setQueryStatus(pQuery, QUERY_NOT_COMPLETED); pQuery->lastKey = pQuery->skey; @@ -6525,34 +6531,8 @@ void changeMeterQueryInfoForSuppleQuery(SQuery *pQuery, SMeterQueryInfo *pMeterQ SWAP(pMeterQueryInfo->skey, pMeterQueryInfo->ekey, TSKEY); pMeterQueryInfo->lastKey = pMeterQueryInfo->skey; - // pMeterQueryInfo->queryRangeSet = 0; pMeterQueryInfo->cur.order = pMeterQueryInfo->cur.order ^ 1; pMeterQueryInfo->cur.vnodeIndex = -1; - - // previous does not generate any results - // SIDList list = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid); - // - // if (list.size == 0) { - // pMeterQueryInfo->reverseFillRes = 0; - // } else { - // pMeterQueryInfo->reverseIndex = pMeterQueryInfo->numOfRes; - // pMeterQueryInfo->reverseFillRes = 1; - // } -} - -void saveIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, SMeterQueryInfo *pMeterQueryInfo) { - SQuery *pQuery = pRuntimeEnv->pQuery; - - // pMeterQueryInfo->skey = pQuery->skey; - // pMeterQueryInfo->ekey = pQuery->ekey; - // pMeterQueryInfo->lastKey = pQuery->lastKey; - - assert(((pQuery->lastKey >= pQuery->skey) && QUERY_IS_ASC_QUERY(pQuery)) || - ((pQuery->lastKey <= pQuery->skey) && !QUERY_IS_ASC_QUERY(pQuery))); - - if (pRuntimeEnv->pTSBuf != NULL) { - pMeterQueryInfo->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); - } } void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, SMeterQueryInfo *pMeterQueryInfo) { @@ -7052,8 +7032,8 @@ void setIntervalQueryRange(SMeterQueryInfo *pMeterQueryInfo, STableQuerySupportO SWindowResInfo *pWindowResInfo = &pMeterQueryInfo->windowResInfo; doGetAlignedIntervalQueryRangeImpl(pQuery, win.skey, win.skey, win.ekey, &skey1, &ekey1, &windowSKey, &windowEKey); - pWindowResInfo->startTime = windowSKey; // windowSKey may be 0 in case of 1970 timestamp -// assert(pWindowResInfo->startTime > 0); + pWindowResInfo->startTime = windowSKey; // windowSKey may be 0 in case of 1970 timestamp + // assert(pWindowResInfo->startTime > 0); if (pWindowResInfo->prevSKey == 0) { if (QUERY_IS_ASC_QUERY(pQuery)) { diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index 16c37e0580..4caf79f6c9 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -869,7 +869,7 @@ static void doMultiMeterSupplementaryScan(SQInfo *pQInfo) { } SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv); - disableFunctForSuppleScan(pRuntimeEnv, pQuery->order.order); + disableFunctForSuppleScan(pSupporter, pQuery->order.order); if (pRuntimeEnv->pTSBuf != NULL) { pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1; -- GitLab