diff --git a/src/system/detail/inc/vnodeQueryImpl.h b/src/system/detail/inc/vnodeQueryImpl.h index 4ad6e8e78b053e97734783f821d974104b48f479..cb59d8597a35594ed150d633ac93141b5aee4ff2 100644 --- a/src/system/detail/inc/vnodeQueryImpl.h +++ b/src/system/detail/inc/vnodeQueryImpl.h @@ -228,7 +228,7 @@ void destroyMeterQueryInfo(SMeterQueryInfo* pMeterQueryInfo, int32_t numOfCols); * @param skey * @param ekey */ -void changeMeterQueryInfoForSuppleQuery(SQueryDiskbasedResultBuf* pResultBuf, SMeterQueryInfo* pMeterQueryInfo, +void changeMeterQueryInfoForSuppleQuery(SQuery* pQuery, SMeterQueryInfo* pMeterQueryInfo, TSKEY skey, TSKEY ekey); /** diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 42ae7ab33adf570ef07af4be3b2c7646b7456e8d..db47065a36e2eaf9844f5ed0a3a06abc7e95d813 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -1445,9 +1445,15 @@ static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sa } static bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot) { + assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size); return (pWindowResInfo->pResult[slot].status.closed == true); } +static SWindowResult* getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot) { + assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size); + return &pWindowResInfo->pResult[slot]; +} + static int32_t curTimeWindow(SWindowResInfo *pWindowResInfo) { assert(pWindowResInfo->curIndex >= 0 && pWindowResInfo->curIndex < pWindowResInfo->size); return pWindowResInfo->curIndex; @@ -1516,8 +1522,6 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t w.ekey = w.skey + pQuery->intervalTime - 1; } - assert(ts >= w.skey && ts <= w.ekey && w.skey != 0); - // query border check if (w.ekey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) { w.ekey = pQuery->ekey; @@ -1526,6 +1530,11 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t w.skey = pQuery->ekey; } + assert(ts >= w.skey && ts <= w.ekey && w.skey != 0); + if (w.skey == 1542597000000) { + int32_t k = 1; + } + return w; } @@ -1559,6 +1568,9 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SQueryDiskbasedR return -1; } + if (pageId == 153 && pData->numOfElems >= 138) { + int32_t k = 1; + } // set the number of rows in current disk page if (pWindowRes->pos.pageId == -1) { // not allocated yet, allocate new buffer pWindowRes->pos.pageId = pageId; @@ -1686,7 +1698,7 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SBlockInfo *pBlockInfo, assert(ekey < pPrimaryColumn[startPos]); } else { if (updateLastKey) { - pQuery->lastKey = MAX(ekey, pPrimaryColumn[startPos + (num - 1)]) + step; + pQuery->lastKey = pPrimaryColumn[startPos + (num - 1)] + step; } } } else { @@ -1702,7 +1714,7 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SBlockInfo *pBlockInfo, assert(ekey > pPrimaryColumn[startPos]); } else { if (updateLastKey) { - pQuery->lastKey = MIN(ekey, pPrimaryColumn[startPos - (num - 1)]) + step; + pQuery->lastKey = pPrimaryColumn[startPos - (num - 1)] + step; } } } else { @@ -1712,6 +1724,10 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SBlockInfo *pBlockInfo, } } } + + if (pQuery->lastKey == 1542597000001) { + int32_t k = 1; + } assert(num >= 0); return num; @@ -1849,12 +1865,20 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t TSKEY ts = primaryKeyCol[offset]; STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); + if (win.skey >= 1542597000000 && pRuntimeEnv->pMeterObj->sid == 9 && IS_MASTER_SCAN(pRuntimeEnv)) { + int32_t k = 1; + } + + if (win.skey >= 1542597000000 && pRuntimeEnv->pMeterObj->sid == 9 && !IS_MASTER_SCAN(pRuntimeEnv)) { + int32_t k = 1; + } + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win) != TSDB_CODE_SUCCESS) { return 0; } - + TSKEY ekey = QUERY_IS_ASC_QUERY(pQuery) ? win.ekey : win.skey; - forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, pQuery->pos, ekey, searchFn, false); + forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, pQuery->pos, ekey, searchFn, true); SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, pQuery->pos, forwardStep); @@ -1868,7 +1892,11 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t if (startPos < 0) { break; } - + + if (nextWin.skey >= 1542597000000 && pRuntimeEnv->pMeterObj->sid == 9 && IS_MASTER_SCAN(pRuntimeEnv)) { + int32_t k = 1; + } + // null data, failed to allocate more memory buffer int32_t sid = pRuntimeEnv->pMeterObj->sid; if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, sid, &nextWin) != TSDB_CODE_SUCCESS) { @@ -1876,7 +1904,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t } ekey = QUERY_IS_ASC_QUERY(pQuery) ? nextWin.ekey : nextWin.skey; - forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, startPos, ekey, searchFn, false); + forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, startPos, ekey, searchFn, true); pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, startPos, forwardStep); @@ -5486,7 +5514,7 @@ static void printBinaryData(int32_t functionId, char *data, int32_t srcDataType) void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOfRows) { int32_t numOfCols = pQuery->numOfOutputCols; - printf("metric query intern-result, total:%d\n", numOfRows); + printf("super table query intermediate result, total:%d\n", numOfRows); SQInfo * pQInfo = (SQInfo *)(GET_QINFO_ADDR(pQuery)); SMeterObj *pMeterObj = pQInfo->pObj; @@ -5720,6 +5748,8 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery tFilePage * pPage = getMeterDataPage(pResultBuf, pTableList[pos]->pMeterQInfo, position->pageIdx); int64_t ts = getCurrentTimestamp(&cs, pos); + assert(ts > 0); + if (ts == lastTimestamp) { // merge with the last one doMerge(pRuntimeEnv, ts, pPage, position->rowIdx, true); } else { @@ -6559,15 +6589,25 @@ void destroyMeterQueryInfo(SMeterQueryInfo *pMeterQueryInfo, int32_t numOfCols) free(pMeterQueryInfo); } -void changeMeterQueryInfoForSuppleQuery(SQueryDiskbasedResultBuf *pResultBuf, SMeterQueryInfo *pMeterQueryInfo, +void changeMeterQueryInfoForSuppleQuery(SQuery* pQuery, SMeterQueryInfo *pMeterQueryInfo, TSKEY skey, TSKEY ekey) { if (pMeterQueryInfo == NULL) { return; } + //order has change already! + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + if (!QUERY_IS_ASC_QUERY(pQuery)) { + assert(pMeterQueryInfo->ekey >= pMeterQueryInfo->lastKey + step); + } else { + assert(pMeterQueryInfo->ekey <= pMeterQueryInfo->lastKey + step); + } + + pMeterQueryInfo->ekey = pMeterQueryInfo->lastKey + step; + SWAP(pMeterQueryInfo->skey, pMeterQueryInfo->ekey, TSKEY); pMeterQueryInfo->lastKey = pMeterQueryInfo->skey; - + // pMeterQueryInfo->queryRangeSet = 0; pMeterQueryInfo->cur.order = pMeterQueryInfo->cur.order ^ 1; pMeterQueryInfo->cur.vnodeIndex = -1; @@ -7524,15 +7564,20 @@ void applyIntervalQueryOnBlock(STableQuerySupportObj *pSupporter, SMeterDataInfo if (pQuery->intervalTime == 0) { // todo refactor SWindowResInfo *p1 = &pRuntimeEnv->windowResInfo; for (int32_t i = 0; i < p1->size; ++i) { - p1->pResult[i].numOfRows = p1->pResult->resultInfo->numOfRes; + SWindowResult* pResult = getWindowResult(p1, i); + if (isWindowResClosed(p1, i) && pResult->numOfRows == 0) { + pResult->numOfRows = getNumOfResult(pRuntimeEnv); + } } } - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - if ((pQuery->lastKey > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) || - (pQuery->lastKey < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery))) { - pMeterQueryInfo->ekey = pQuery->lastKey - step; - } + // get the true maximum timestamp within the query range to set the correct time window + // in the supplementary query +// int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); +// if ((pQuery->lastKey > pBlockInfo->keyLast && QUERY_IS_ASC_QUERY(pQuery)) || +// (pQuery->lastKey < pBlockInfo->keyFirst && !QUERY_IS_ASC_QUERY(pQuery))) { +// pMeterQueryInfo->ekey = pQuery->lastKey - step; +// } updatelastkey(pQuery, pMeterQueryInfo); } diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index b937cffbbb8ee662cf126a19d56c30e5828e7899..03aae6f5d2332085ef83496618cfecf536380dce 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -849,10 +849,11 @@ static void doOrderedScan(SQInfo *pQInfo) { } static void setupMeterQueryInfoForSupplementQuery(STableQuerySupportObj *pSupporter) { + SQuery* pQuery = pSupporter->runtimeEnv.pQuery; + for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) { - SMeterQueryInfo * pMeterQueryInfo = pSupporter->pMeterDataInfo[i].pMeterQInfo; - SQueryDiskbasedResultBuf *pResultBuf = pSupporter->runtimeEnv.pResultBuf; - changeMeterQueryInfoForSuppleQuery(pResultBuf, pMeterQueryInfo, pSupporter->rawSKey, pSupporter->rawEKey); + SMeterQueryInfo *pMeterQueryInfo = pSupporter->pMeterDataInfo[i].pMeterQInfo; + changeMeterQueryInfoForSuppleQuery(pQuery, pMeterQueryInfo, pSupporter->rawSKey, pSupporter->rawEKey); } }