From 54da5e5f7ec3ab3c56e2adf9e5f47d651eff4137 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Sat, 22 Feb 2020 00:16:44 +0800 Subject: [PATCH] fix bugs found in regression test. --- src/client/src/tscSQLParser.c | 3 +- src/system/detail/src/vnodeQueryImpl.c | 550 ++++++++++------------ src/system/detail/src/vnodeQueryProcess.c | 26 +- src/util/src/tinterpolation.c | 132 +++--- 4 files changed, 315 insertions(+), 396 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index e14b6ee6de..be3662ff0e 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -3859,10 +3859,11 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) { for (int32_t i = numOfFillVal; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { TAOS_FIELD* pFields = tscFieldInfoGetField(pQueryInfo, i); - tVariantDump(&lastItem->pVar, (char*)&pQueryInfo->defaultVal[i], pFields->type); if (pFields->type == TSDB_DATA_TYPE_BINARY || pFields->type == TSDB_DATA_TYPE_NCHAR) { setNull((char*)(&pQueryInfo->defaultVal[i]), pFields->type, pFields->bytes); + } else { + tVariantDump(&lastItem->pVar, (char*)&pQueryInfo->defaultVal[i], pFields->type); } } } diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 9cfa313ca7..0d6ba7ef39 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -67,10 +67,9 @@ static int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult); -static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx); +static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo); static int32_t flushFromResultBuf(STableQuerySupportObj *pSupporter, const SQuery *pQuery, const SQueryRuntimeEnv *pRuntimeEnv); -// static void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t numOfIncrementRes); static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid); static TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn); static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); @@ -81,6 +80,10 @@ static int32_t getGroupResultId(int32_t groupIndex) { return base + (groupIndex * 10000); } +static FORCE_INLINE bool isIntervalQuery(SQuery* pQuery) { + return pQuery->intervalTime > 0; +} + // check the offset value integrity static FORCE_INLINE int32_t validateHeaderOffsetSegment(SQInfo *pQInfo, char *filePath, int32_t vid, char *data, int32_t size) { @@ -244,7 +247,7 @@ static void vnodeInitLoadCompBlockInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) { } static int32_t vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex, - bool loadPrimaryTS) { + bool loadTS) { SQuery * pQuery = pRuntimeEnv->pQuery; SLoadDataBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo; @@ -252,7 +255,7 @@ static int32_t vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj * if (pLoadInfo->fileId == pQuery->fileId && pLoadInfo->slotIdx == pQuery->slot && pQuery->slot != -1 && pLoadInfo->sid == pMeterObj->sid && pLoadInfo->fileListIndex == fileIndex) { // previous load operation does not load the primary timestamp column, we only need to load the timestamp column - if (pLoadInfo->tsLoaded == false && pLoadInfo->tsLoaded != loadPrimaryTS) { + if (pLoadInfo->tsLoaded == false && pLoadInfo->tsLoaded != loadTS) { return DISK_BLOCK_LOAD_TS; } else { return DISK_BLOCK_NO_NEED_TO_LOAD; @@ -917,7 +920,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR int32_t ret = 0; /* the first round always be 1, the secondary round is determined by queried function */ - int32_t round = pRuntimeEnv->scanFlag; + int32_t round = (IS_MASTER_SCAN(pRuntimeEnv)) ? 0 : 1; while (j < pBlock->numOfCols && i < pQuery->numOfCols) { if ((*pField)[j].colId < pQuery->colList[i].data.colId) { @@ -1005,16 +1008,6 @@ SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv *pRuntimeEnv, void *pBlock, int32_ return blockInfo; } -// static bool checkQueryRangeAgainstNextBlock(SBlockInfo *pBlockInfo, SQuery *pQuery) { -// if ((QUERY_IS_ASC_QUERY(pQuery) && pBlockInfo->keyFirst > pQuery->ekey) || -// (!QUERY_IS_ASC_QUERY(pQuery) && pBlockInfo->keyLast < pQuery->ekey)) { -// setQueryStatus(pQuery, QUERY_COMPLETED); -// return false; -// } -// -// return true; -//} - /** * * @param pQuery @@ -1108,6 +1101,7 @@ SCacheBlock *getCacheDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntimeE return NULL; } + vnodeFreeFields(pQuery); getBasicCacheInfoSnapshot(pQuery, pCacheInfo, pMeterObj->vnode); SCacheBlock *pBlock = pCacheInfo->cacheBlocks[slot]; @@ -1621,10 +1615,16 @@ static int32_t getForwardStepsInBlock(int32_t numOfPoints, __block_search_fn_t s return forwardStep; } +/** + * NOTE: the query status only set for the first scan of master scan. + */ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SWindowResInfo *pWindowResInfo) { SQuery *pQuery = pRuntimeEnv->pQuery; + if (pRuntimeEnv->scanFlag != MASTER_SCAN || (!isIntervalQuery(pQuery))) { + return; + } - if (pQuery->slidingTime > 0 && pQuery->intervalTime > 0 && IS_MASTER_SCAN(pRuntimeEnv)) { // query completed + // query completed if ((lastKey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || (lastKey <= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { closeAllTimeWindow(pWindowResInfo); @@ -1670,7 +1670,6 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, } assert(pWindowResInfo->prevSKey != 0); - } } static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SBlockInfo *pBlockInfo, TSKEY *pPrimaryColumn, int32_t startPos, @@ -1798,6 +1797,23 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow } } +static TSKEY reviseWindowEkey(SQuery* pQuery, STimeWindow* pWindow) { + TSKEY ekey = -1; + if (QUERY_IS_ASC_QUERY(pQuery)) { + ekey = pWindow->ekey; + if (ekey > pQuery->ekey) { + ekey = pQuery->ekey; + } + } else { + ekey = pWindow->skey; + if (ekey < pQuery->ekey) { + ekey = pQuery->ekey; + } + } + + return ekey; +} + /** * * @param pRuntimeEnv @@ -1847,7 +1863,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t } int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - if (pQuery->intervalTime > 0 && pQuery->slidingTime > 0) { + if (isIntervalQuery(pQuery) && pQuery->slidingTime > 0) { int32_t offset = GET_COL_DATA_POS(pQuery, 0, step); TSKEY ts = primaryKeyCol[offset]; @@ -1856,19 +1872,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t return 0; } - TSKEY ekey = -1; - if (QUERY_IS_ASC_QUERY(pQuery)) { - ekey = win.ekey; - if (ekey > pQuery->ekey) { - ekey = pQuery->ekey; - } - } else { - ekey = win.skey; - if (ekey < pQuery->ekey) { - ekey = pQuery->ekey; - } - } - + TSKEY ekey = reviseWindowEkey(pQuery, &win); forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, pQuery->pos, ekey, searchFn, true); SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); @@ -1890,19 +1894,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t break; } - ekey = -1; - if (QUERY_IS_ASC_QUERY(pQuery)) { - ekey = nextWin.ekey; - if (ekey > pQuery->ekey) { - ekey = pQuery->ekey; - } - } else { - ekey = nextWin.skey; - if (ekey < pQuery->ekey) { - ekey = pQuery->ekey; - } - } - + ekey = reviseWindowEkey(pQuery, &nextWin); forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, startPos, ekey, searchFn, true); pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); @@ -1929,7 +1921,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * because the results of group by normal column is put into intermediate buffer. */ int32_t num = 0; - if (pQuery->intervalTime == 0) { + if (!isIntervalQuery(pQuery)) { num = getNumOfResult(pRuntimeEnv) - prevNumOfRes; } @@ -2103,7 +2095,8 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { SWindowResult *pResult = &pWindowResInfo->pResult[k]; int32_t *p = (int32_t *)taosGetDataFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE); - int32_t v = (*p - remain); + int32_t v = (*p - num); + assert(v >= 0 && v <= pWindowResInfo->size); // todo add the update function for hash table taosDeleteFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE); @@ -2238,7 +2231,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx } // in the supplementary scan, only the following functions need to be executed - if (!IS_MASTER_SCAN(pRuntimeEnv) && + if (IS_SUPPLEMENT_SCAN(pRuntimeEnv) && !(functionId == TSDB_FUNC_LAST_DST || functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_FIRST || functionId == TSDB_FUNC_LAST || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS)) { return false; @@ -2327,8 +2320,8 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * continue; } - // sliding window query - if (pQuery->slidingTime > 0 && pQuery->intervalTime > 0) { + // interval window query + if (isIntervalQuery(pQuery)) { // decide the time window according to the primary timestamp int64_t ts = primaryKeyCol[offset]; STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); @@ -2380,7 +2373,10 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * continue; } } - + + // update the lastKey + lastKey = primaryKeyCol[offset]; + // all startOffset are identical offset -= pCtx[0].startOffset; @@ -2393,7 +2389,8 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * } if (pRuntimeEnv->pTSBuf != NULL) { - // if timestamp filter list is empty, quit current query + + // if timestamp filter list is empty, quit current query if (!tsBufNextPos(pRuntimeEnv->pTSBuf)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); break; @@ -2418,7 +2415,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * * because the results of group by normal column is put into intermediate buffer. */ int32_t num = 0; - if (!groupbyStateValue && !(pQuery->intervalTime > 0 && pQuery->slidingTime > 0)) { + if (!groupbyStateValue && !(isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) { num = getNumOfResult(pRuntimeEnv) - prevNumOfRes; } @@ -2470,10 +2467,12 @@ static void validateQueryRangeAndData(SQueryRuntimeEnv *pRuntimeEnv, const TSKEY !QUERY_IS_ASC_QUERY(pQuery))); } -static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo, - int64_t *pPrimaryColumn, SField *pFields, __block_search_fn_t searchFn, - int32_t *numOfRes, SWindowResInfo *pWindowResInfo) { +static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo, SField *pFields, + __block_search_fn_t searchFn, int32_t *numOfRes, + SWindowResInfo *pWindowResInfo) { SQuery *pQuery = pRuntimeEnv->pQuery; + TSKEY * pPrimaryColumn = (TSKEY *)pRuntimeEnv->primaryColBuffer->data; + validateQueryRangeAndData(pRuntimeEnv, pPrimaryColumn, pBlockInfo); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); @@ -2499,8 +2498,9 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockI doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); // interval query with limit applied - if (pQuery->intervalTime > 0 && pQuery->limit.limit > 0 && - pQuery->limit.limit <= numOfClosedTimeWindow(pWindowResInfo)) { + if (isIntervalQuery(pQuery) && pQuery->limit.limit > 0 && + (pQuery->limit.limit + pQuery->limit.offset) <= numOfClosedTimeWindow(pWindowResInfo) && + pRuntimeEnv->scanFlag == MASTER_SCAN) { setQueryStatus(pQuery, QUERY_COMPLETED); } @@ -2723,6 +2723,12 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) { } } +static void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool isStableQuery) { + for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + setResultInfoBuf(&pResultInfo[i], pQuery->pSelectExpr[i].interResBytes, isStableQuery); + } +} + static int32_t setupQueryRuntimeEnv(SMeterObj *pMeterObj, SQuery *pQuery, SQueryRuntimeEnv *pRuntimeEnv, SColumnModel *pTagsSchema, int16_t order, bool isSTableQuery) { dTrace("QInfo:%p setup runtime env", GET_QINFO_ADDR(pQuery)); @@ -2791,12 +2797,11 @@ static int32_t setupQueryRuntimeEnv(SMeterObj *pMeterObj, SQuery *pQuery, SQuery if (i > 0) { pRuntimeEnv->offset[i] = pRuntimeEnv->offset[i - 1] + pRuntimeEnv->pCtx[i - 1].outputBytes; } - - // set the intermediate result output buffer - SResultInfo *pResInfo = &pRuntimeEnv->resultInfo[i]; - setResultInfoBuf(pResInfo, pQuery->pSelectExpr[i].interResBytes, isSTableQuery); } + // set the intermediate result output buffer + setWindowResultInfo(pRuntimeEnv->resultInfo, pQuery, isSTableQuery); + // if it is group by normal column, do not set output buffer, the output buffer is pResult if (!isGroupbyNormalCol(pQuery->pGroupbyExpr) && !isSTableQuery) { resetCtxOutputBuf(pRuntimeEnv); @@ -4074,7 +4079,7 @@ static bool forwardQueryStartPosIfNeeded(SQInfo *pQInfo, STableQuerySupportObj * * pQuery->limit.offset times. Since hole exists, pQuery->intervalTime*pQuery->limit.offset value is * not valid. otherwise, we only forward pQuery->limit.offset number of points */ - if (pQuery->intervalTime > 0) { + if (isIntervalQuery(pQuery)) { int16_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); __block_search_fn_t searchFn = vnodeSearchKeyFunc[pRuntimeEnv->pMeterObj->searchAlgorithm]; SWindowResInfo * pWindowResInfo = &pRuntimeEnv->windowResInfo; @@ -4375,9 +4380,9 @@ void pointInterpSupporterDestroy(SPointInterpoSupporter *pPointInterpSupport) { static void allocMemForInterpo(STableQuerySupportObj *pSupporter, SQuery *pQuery, SMeterObj *pMeterObj) { if (pQuery->interpoType != TSDB_INTERPO_NONE) { - assert(pQuery->intervalTime > 0 || (pQuery->intervalTime == 0 && isPointInterpoQuery(pQuery))); + assert(isIntervalQuery(pQuery) || (pQuery->intervalTime == 0 && isPointInterpoQuery(pQuery))); - if (pQuery->intervalTime > 0) { + if (isIntervalQuery(pQuery)) { pSupporter->runtimeEnv.pInterpoBuf = malloc(POINTER_BYTES * pQuery->numOfOutputCols); for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { @@ -4395,7 +4400,7 @@ static int32_t getInitialPageNum(STableQuerySupportObj *pSupporter) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { num = 128; - } else if (pQuery->intervalTime > 0) { // time window query, allocate one page for each table + } else if (isIntervalQuery(pQuery)) { // time window query, allocate one page for each table num = pSupporter->numOfMeters; } else { // for super table query, one page for each subset num = pSupporter->pSidSet->numOfSubSet; @@ -4560,7 +4565,7 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery vnodeRecordAllFiles(pQInfo, pMeterObj->vnode); pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, false); - if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->intervalTime > 0 && pQuery->slidingTime > 0)) { + if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) { int32_t rows = getInitialPageNum(pSupporter); code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize); @@ -4602,16 +4607,15 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery return TSDB_CODE_SUCCESS; } } else { // find the skey and ekey in case of sliding query - if (pQuery->slidingTime > 0 && pQuery->intervalTime > 0) { + if (isIntervalQuery(pQuery)) { STimeWindow win = {0}; - // getActualRange(pSupporter, &win); - // - // // there is no qualified data with respect to the primary timestamp - // if (win.skey > win.ekey) { - // sem_post(&pQInfo->dataReady); - // pQInfo->over = 1; - // return TSDB_CODE_SUCCESS; - // } + + // find the minimum value for descending order query + TSKEY minKey = -1; + if (!QUERY_IS_ASC_QUERY(pQuery)) { + minKey = getGreaterEqualTimestamp(pRuntimeEnv); + } + int64_t skey = 0; if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo, &skey) == false) || (isFixedOutputQuery(pQuery) && !isTopBottomQuery(pQuery) && (pQuery->limit.offset > 0)) || @@ -4622,9 +4626,9 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery pointInterpSupporterDestroy(&interpInfo); return TSDB_CODE_SUCCESS; } - + if (!QUERY_IS_ASC_QUERY(pQuery)) { - win.skey = getGreaterEqualTimestamp(pRuntimeEnv); + win.skey = minKey; win.ekey = skey; } else { win.skey = skey; @@ -4704,9 +4708,9 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) { } if (pSupporter->pSidSet != NULL || isGroupbyNormalCol(pQInfo->query.pGroupbyExpr) || - (pQuery->intervalTime > 0 && pQuery->slidingTime > 0)) { + (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) { int32_t size = 0; - if (isGroupbyNormalCol(pQInfo->query.pGroupbyExpr) || (pQuery->intervalTime > 0 && pQuery->slidingTime > 0)) { + if (isGroupbyNormalCol(pQInfo->query.pGroupbyExpr) || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) { size = 10000; } else if (pSupporter->pSidSet != NULL) { size = pSupporter->pSidSet->numOfSubSet; @@ -5139,31 +5143,39 @@ static int32_t moveToNextBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t step, __bl return DISK_DATA_LOADED; } -static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pblockInfo, __block_search_fn_t searchFn, - int32_t *numOfRes, int32_t blockLoadStatus, int32_t *forwardStep) { +static int32_t doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo, + __block_search_fn_t searchFn, int32_t blockLoadStatus, int32_t *forwardStep) { SQuery * pQuery = pRuntimeEnv->pQuery; SQueryCostSummary *pSummary = &pRuntimeEnv->summary; + int32_t numOfRes = 0; - TSKEY * primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data; - int64_t start = taosGetTimestampUs(); - *pblockInfo = getBlockInfo(pRuntimeEnv); + if (IS_DISK_DATA_BLOCK(pQuery) && blockLoadStatus != DISK_DATA_LOADED) { + *forwardStep = pBlockInfo->size; + return numOfRes; + } + SField *pFields = NULL; if (IS_DISK_DATA_BLOCK(pQuery)) { - if (blockLoadStatus == DISK_DATA_LOADED) { - *forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, pQuery->pFields[pQuery->slot], - searchFn, numOfRes, &pRuntimeEnv->windowResInfo); - } else { - *forwardStep = pblockInfo->size; - } - - pSummary->fileTimeUs += (taosGetTimestampUs() - start); - } else { + pFields = pQuery->pFields[pQuery->slot]; + } else { // in case of cache data block, no need to load operation assert(vnodeIsDatablockLoaded(pRuntimeEnv, pRuntimeEnv->pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD); - *forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, NULL, searchFn, numOfRes, - &pRuntimeEnv->windowResInfo); + pFields = NULL; + } - pSummary->cacheTimeUs += (taosGetTimestampUs() - start); + int64_t start = taosGetTimestampUs(); + + *forwardStep = + tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, pFields, searchFn, &numOfRes, &pRuntimeEnv->windowResInfo); + + int64_t elapsedTime = taosGetTimestampUs() - start; + + if (IS_DISK_DATA_BLOCK(pQuery)) { + pSummary->fileTimeUs += elapsedTime; + } else { + pSummary->cacheTimeUs += elapsedTime; } + + return numOfRes; } // previous time window may not be of the same size of pQuery->intervalTime @@ -5175,14 +5187,12 @@ static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow) { } static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { - SQuery *pQuery = pRuntimeEnv->pQuery; - bool LOAD_DATA = true; + SQuery * pQuery = pRuntimeEnv->pQuery; + SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj; - int32_t forwardStep = 0; + bool LOAD_DATA = true; int64_t cnt = 0; - SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj; - __block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm]; int32_t blockLoadStatus = DISK_DATA_LOADED; @@ -5204,9 +5214,9 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { return cnt; } - int32_t numOfRes = 0; - SBlockInfo blockInfo = {0}; - doHandleDataBlockImpl(pRuntimeEnv, &blockInfo, searchFn, &numOfRes, blockLoadStatus, &forwardStep); + int32_t forwardStep = 0; + SBlockInfo blockInfo = getBlockInfo(pRuntimeEnv); + /*int32_t numOfRes = */ doHandleDataBlockImpl(pRuntimeEnv, &blockInfo, searchFn, blockLoadStatus, &forwardStep); dTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", fileId:%d, slot:%d, pos:%d, bstatus:%d, rows:%d, checked:%d", @@ -5261,7 +5271,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { } // while(1) - if (pQuery->intervalTime > 0) { + if (isIntervalQuery(pQuery)) { if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { closeAllTimeWindow(&pRuntimeEnv->windowResInfo); } else if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { // check if window needs to be closed @@ -5331,8 +5341,7 @@ void vnodeSetTagValueInParam(tSidSet *pSidSet, SQueryRuntimeEnv *pRuntimeEnv, SM } } -static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowResult* pWindowRes, /*int32_t inputIdx,*/ - bool mergeFlag) { +static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowResult *pWindowRes, bool mergeFlag) { SQuery * pQuery = pRuntimeEnv->pQuery; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; @@ -5349,9 +5358,9 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes pCtx[i].hasNull = true; pCtx[i].nStartQueryTimestamp = timestamp; pCtx[i].aInputElemBuf = getPosInResultPage(pRuntimeEnv, i, pWindowRes); -// pCtx[i].aInputElemBuf = ((char *)inputSrc->data) + -// ((int32_t)pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage) + -// pCtx[i].outputBytes * inputIdx; + // pCtx[i].aInputElemBuf = ((char *)inputSrc->data) + + // ((int32_t)pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage) + + // pCtx[i].outputBytes * inputIdx; // in case of tag column, the tag information should be extracted from input buffer if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG) { @@ -5481,57 +5490,54 @@ void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOf } } -static tFilePage *getMeterDataPage(SQueryDiskbasedResultBuf *pResultBuf, SMeterQueryInfo *pMeterQueryInfo, - int32_t index) { - SIDList pList = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid); - return getResultBufferPageById(pResultBuf, pList.pData[index]); -} +// static tFilePage *getMeterDataPage(SQueryDiskbasedResultBuf *pResultBuf, SMeterQueryInfo *pMeterQueryInfo, +// int32_t index) { +// SIDList pList = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid); +// return getResultBufferPageById(pResultBuf, pList.pData[index]); +//} -typedef struct Position { - int32_t pageIdx; - int32_t rowIdx; -} Position; +// typedef struct Position { +// int32_t pageIdx; +// int32_t rowIdx; +//} Position; typedef struct SCompSupporter { SMeterDataInfo ** pMeterDataInfo; - Position * pPosition; + int32_t * position; STableQuerySupportObj *pSupporter; } SCompSupporter; -int64_t getCurrentTimestamp(SCompSupporter *pSupportor, int32_t meterIdx) { - Position * pPos = &pSupportor->pPosition[meterIdx]; - tFilePage *pPage = getMeterDataPage(pSupportor->pSupporter->runtimeEnv.pResultBuf, - pSupportor->pMeterDataInfo[meterIdx]->pMeterQInfo, pPos->pageIdx); - - return *(int64_t *)(pPage->data + TSDB_KEYSIZE * pPos->rowIdx); -} - int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) { int32_t left = *(int32_t *)pLeft; int32_t right = *(int32_t *)pRight; - SCompSupporter * supporter = (SCompSupporter *)param; - SQueryDiskbasedResultBuf *pResultBuf = supporter->pSupporter->runtimeEnv.pResultBuf; + SCompSupporter * supporter = (SCompSupporter *)param; + SQueryRuntimeEnv *pRuntimeEnv = &supporter->pSupporter->runtimeEnv; - Position leftPos = supporter->pPosition[left]; - Position rightPos = supporter->pPosition[right]; + int32_t leftPos = supporter->position[left]; + int32_t rightPos = supporter->position[right]; /* left source is exhausted */ - if (leftPos.pageIdx == -1 && leftPos.rowIdx == -1) { + if (leftPos == -1) { return 1; } /* right source is exhausted*/ - if (rightPos.pageIdx == -1 && rightPos.rowIdx == -1) { + if (rightPos == -1) { return -1; } - //!!!!! - tFilePage *pPageLeft = getMeterDataPage(pResultBuf, supporter->pMeterDataInfo[left]->pMeterQInfo, leftPos.pageIdx); - int64_t leftTimestamp = *(int64_t *)(pPageLeft->data + TSDB_KEYSIZE * leftPos.rowIdx); + SWindowResInfo *pWindowResInfo1 = &supporter->pMeterDataInfo[left]->pMeterQInfo->windowResInfo; + SWindowResult * pWindowRes1 = getWindowResult(pWindowResInfo1, leftPos); - tFilePage *pPageRight = getMeterDataPage(pResultBuf, supporter->pMeterDataInfo[right]->pMeterQInfo, rightPos.pageIdx); - int64_t rightTimestamp = *(int64_t *)(pPageRight->data + TSDB_KEYSIZE * rightPos.rowIdx); + char *b1 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes1); + TSKEY leftTimestamp = GET_INT64_VAL(b1); + + SWindowResInfo *pWindowResInfo2 = &supporter->pMeterDataInfo[right]->pMeterQInfo->windowResInfo; + SWindowResult * pWindowRes2 = getWindowResult(pWindowResInfo2, rightPos); + + char *b2 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes2); + TSKEY rightTimestamp = GET_INT64_VAL(b2); if (leftTimestamp == rightTimestamp) { return 0; @@ -5624,57 +5630,44 @@ void copyResToQueryResultBuf(STableQuerySupportObj *pSupporter, SQuery *pQuery) pSupporter->offset += 1; } -int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult* pWindowRes) { +int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindowRes) { SQuery *pQuery = pRuntimeEnv->pQuery; -// bool hasMainFunction = hasMainOutput(pQuery); - + int64_t maxOutput = 0; for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { int32_t functionId = pQuery->pSelectExpr[j].pBase.functionId; - + /* * ts, tag, tagprj function can not decide the output number of current query * the number of output result is decided by main output */ - if (/*hasMainFunction &&*/ - (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ)) { + if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ) { continue; } - + SResultInfo *pResultInfo = &pWindowRes->resultInfo[j]; if (pResultInfo != NULL && maxOutput < pResultInfo->numOfRes) { maxOutput = pResultInfo->numOfRes; } } - + return maxOutput; } int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery *pQuery, SQueryRuntimeEnv *pRuntimeEnv, SMeterDataInfo *pMeterDataInfo, int32_t start, int32_t end) { - // calculate the maximum required space - if (pSupporter->groupResultSize == 0) { - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - pSupporter->groupResultSize += sizeof(tFilePage) + pQuery->pointsToRead * pRuntimeEnv->pCtx[i].outputBytes; - } - } - tFilePage ** buffer = (tFilePage **)pQuery->sdata; - Position * posList = calloc(1, sizeof(Position) * (end - start)); + int32_t * posList = calloc((end - start), sizeof(int32_t)); SMeterDataInfo **pTableList = malloc(POINTER_BYTES * (end - start)); - //todo opt for the case of one table per group + // todo opt for the case of one table per group int32_t numOfMeters = 0; for (int32_t i = start; i < end; ++i) { int32_t sid = pMeterDataInfo[i].pMeterQInfo->sid; - SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, sid); + SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, sid); if (list.size > 0 && pMeterDataInfo[i].pMeterQInfo->windowResInfo.size > 0) { pTableList[numOfMeters] = &pMeterDataInfo[i]; - // set the merge start position: page:0, index:0 - posList[numOfMeters].pageIdx = 0; - posList[numOfMeters].rowIdx = 0; - numOfMeters += 1; } } @@ -5682,44 +5675,41 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery if (numOfMeters == 0) { tfree(posList); tfree(pTableList); + assert(pSupporter->numOfGroupResultPages == 0); return 0; } - SCompSupporter cs = {pTableList, posList, pSupporter}; - SLoserTreeInfo *pTree = NULL; + SCompSupporter cs = {pTableList, posList, pSupporter}; + SLoserTreeInfo *pTree = NULL; tLoserTreeCreate(&pTree, numOfMeters, &cs, tableResultComparFn); - SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; - resetMergeResultBuf(pQuery, pCtx); + SResultInfo *pResultInfo = calloc(pQuery->numOfOutputCols, sizeof(SResultInfo)); + setWindowResultInfo(pResultInfo, pQuery, pRuntimeEnv->stableQuery); + resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo); int64_t lastTimestamp = -1; int64_t startt = taosGetTimestampMs(); - while (1) {//todo add iterator - int32_t pos = pTree->pNode[0].index; - Position * position = &cs.pPosition[pos]; - -// SQueryDiskbasedResultBuf *pResultBuf = cs.pSupporter->runtimeEnv.pResultBuf; -// tFilePage *pPage = getMeterDataPage(pResultBuf, pTableList[pos]->pMeterQInfo, position->pageIdx); - SWindowResInfo* pWindowResInfo = &pTableList[pos]->pMeterQInfo->windowResInfo; - SWindowResult* pWindowRes = getWindowResult(pWindowResInfo, position->rowIdx); - - char* b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes); + while (1) { + int32_t pos = pTree->pNode[0].index; + + SWindowResInfo *pWindowResInfo = &pTableList[pos]->pMeterQInfo->windowResInfo; + SWindowResult * pWindowRes = getWindowResult(pWindowResInfo, cs.position[pos]); + + char *b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes); TSKEY ts = GET_INT64_VAL(b); - -// int64_t ts = getCurrentTimestamp(&cs, pos); - assert(ts > 0 && ts == pWindowRes->window.skey); + assert(ts > 0 && ts == pWindowRes->window.skey); int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pWindowRes); if (num <= 0) { - cs.pPosition[pos].rowIdx += 1; - - if (cs.pPosition[pos].rowIdx >= pWindowResInfo->size) { - cs.pPosition[pos].rowIdx = -1; - + cs.position[pos] += 1; + + if (cs.position[pos] >= pWindowResInfo->size) { + cs.position[pos] = -1; + // all input sources are exhausted if (--numOfMeters == 0) { break; @@ -5727,63 +5717,30 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery } } else { if (ts == lastTimestamp) { // merge with the last one - doMerge(pRuntimeEnv, ts, pWindowRes, /*position->rowIdx,*/ true); - } else { - // copy data to disk buffer + doMerge(pRuntimeEnv, ts, pWindowRes, true); + } else { // copy data to disk buffer if (buffer[0]->numOfElems == pQuery->pointsToRead) { if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) { return -1; } - - resetMergeResultBuf(pQuery, pCtx); + + resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo); } -// pPage = getMeterDataPage(pResultBuf, pTableList[pos]->pMeterQInfo, position->pageIdx); -// if (pPage->numOfElems <= 0) { // current source data page is empty - // do nothing -// } else { - doMerge(pRuntimeEnv, ts, pWindowRes, /*position->rowIdx,*/ false); + doMerge(pRuntimeEnv, ts, pWindowRes, false); buffer[0]->numOfElems += 1; -// } } - + lastTimestamp = ts; - - if (cs.pPosition[pos].rowIdx >= pWindowResInfo->size) { - cs.pPosition[pos].rowIdx = -1; - + + cs.position[pos] += 1; + if (cs.position[pos] >= pWindowResInfo->size) { + cs.position[pos] = -1; + // all input sources are exhausted if (--numOfMeters == 0) { break; } -// if (cs.pPosition[pos].rowIdx >= pPage->numOfElems - 1) { -// cs.pPosition[pos].rowIdx = 0; -// cs.pPosition[pos].pageIdx += 1; // try next page -// -// // check if current page is empty or not. if it is empty, ignore it and try next -// SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, cs.pMeterDataInfo[pos]->pMeterQInfo->sid); -// if (cs.pPosition[pos].pageIdx <= list.size - 1) { -// tFilePage *newPage = getMeterDataPage(pResultBuf, pTableList[pos]->pMeterQInfo, position->pageIdx); -// -// // if current source data page is null, it must be the last page of source output page -// if (newPage->numOfElems <= 0) { -// cs.pPosition[pos].pageIdx += 1; -// assert(cs.pPosition[pos].pageIdx >= list.size - 1); -// } -// } -// -// // the following code must be executed if current source pages are exhausted -// if (cs.pPosition[pos].pageIdx >= list.size) { -// cs.pPosition[pos].pageIdx = -1; -// cs.pPosition[pos].rowIdx = -1; -// -// // all input sources are exhausted -// if (--numOfMeters == 0) { -// break; -// } -// } - } else { - cs.pPosition[pos].rowIdx += 1; } } @@ -5797,6 +5754,7 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery tfree(pTree); tfree(pTableList); tfree(posList); + tfree(pResultInfo); return -1; } @@ -5812,6 +5770,7 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery tfree(pTree); tfree(pTableList); tfree(posList); + tfree(pResultInfo); pSupporter->offset = 0; @@ -5855,11 +5814,13 @@ int32_t flushFromResultBuf(STableQuerySupportObj *pSupporter, const SQuery *pQue return TSDB_CODE_SUCCESS; } -void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx) { +void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo) { for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { pCtx[k].aOutputBuf = pQuery->sdata[k]->data - pCtx[k].outputBytes; pCtx[k].size = 1; pCtx[k].startOffset = 0; + pCtx[k].resultInfo = &pResultInfo[k]; + pQuery->sdata[k]->len = 0; } } @@ -5873,7 +5834,7 @@ void setMeterDataInfo(SMeterDataInfo *pMeterDataInfo, SMeterObj *pMeterObj, int3 void disableFunctForSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { SQuery *pQuery = pRuntimeEnv->pQuery; - if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->slidingTime > 0 && pQuery->intervalTime > 0)) { + if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->slidingTime > 0 && isIntervalQuery(pQuery))) { for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1; } @@ -5936,11 +5897,8 @@ void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTa pResultRow->resultInfo = calloc((size_t)numOfCols, sizeof(SResultInfo)); pResultRow->pos = *posInfo; - for (int32_t i = 0; i < numOfCols; ++i) { - SResultInfo *pResultInfo = &pResultRow->resultInfo[i]; - size_t size = pQuery->pSelectExpr[i].interResBytes; - setResultInfoBuf(pResultInfo, (int32_t)size, isSTableQuery); - } + // set the intermediate result output buffer + setWindowResultInfo(pResultRow->resultInfo, pQuery, isSTableQuery); } void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindowRes) { @@ -6116,24 +6074,17 @@ typedef struct SQueryStatus { SPositionInfo start; SPositionInfo next; SPositionInfo end; - - TSKEY skey; - TSKEY ekey; - int8_t overStatus; - TSKEY lastKey; - - STSCursor cur; + int8_t overStatus; + TSKEY lastKey; + STSCursor cur; } SQueryStatus; - +// todo refactor static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) { SQuery *pQuery = pRuntimeEnv->pQuery; pStatus->overStatus = pQuery->over; pStatus->lastKey = pQuery->lastKey; - pStatus->skey = pQuery->skey; - pStatus->ekey = pQuery->ekey; - pStatus->start = pRuntimeEnv->startPos; pStatus->next = pRuntimeEnv->nextPos; pStatus->end = pRuntimeEnv->endPos; @@ -6157,9 +6108,6 @@ static void queryStatusRestore(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pSta SWAP(pQuery->skey, pQuery->ekey, TSKEY); pQuery->lastKey = pStatus->lastKey; - pQuery->skey = pStatus->skey; - pQuery->ekey = pStatus->ekey; - pQuery->over = pStatus->overStatus; pRuntimeEnv->startPos = pStatus->start; @@ -6213,58 +6161,68 @@ void setQueryStatus(SQuery *pQuery, int8_t status) { } } -void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { +bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - setQueryStatus(pQuery, QUERY_NOT_COMPLETED); + bool toContinue = false; - /* store the start query position */ - savePointPosition(&pRuntimeEnv->startPos, pQuery->fileId, pQuery->slot, pQuery->pos); - int64_t skey = pQuery->lastKey; - - while (1) { - doScanAllDataBlocks(pRuntimeEnv); - - bool toContinue = true; - - if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->intervalTime > 0 && pQuery->slidingTime > 0)) { - // for each group result, call the finalize function for each column - SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; - - for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SWindowResult *pResult = getWindowResult(pWindowResInfo, i); - if (!pResult->status.closed) { - continue; - } + if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) { + // for each group result, call the finalize function for each column + SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; - setWindowResOutputBuf(pRuntimeEnv, pResult); + for (int32_t i = 0; i < pWindowResInfo->size; ++i) { + SWindowResult *pResult = getWindowResult(pWindowResInfo, i); + if (!pResult->status.closed) { + continue; + } - for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { - aAggs[pQuery->pSelectExpr[j].pBase.functionId].xNextStep(&pRuntimeEnv->pCtx[j]); - SResultInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]); + setWindowResOutputBuf(pRuntimeEnv, pResult); - toContinue &= (pResInfo->complete); - } - } - } else { for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { aAggs[pQuery->pSelectExpr[j].pBase.functionId].xNextStep(&pRuntimeEnv->pCtx[j]); SResultInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]); - toContinue &= (pResInfo->complete); + toContinue |= (!pResInfo->complete); } } + } else { + for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { + aAggs[pQuery->pSelectExpr[j].pBase.functionId].xNextStep(&pRuntimeEnv->pCtx[j]); + SResultInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]); - if (toContinue) { + toContinue |= (!pResInfo->complete); + } + } + + return toContinue; +} + +void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { + SQuery *pQuery = pRuntimeEnv->pQuery; + setQueryStatus(pQuery, QUERY_NOT_COMPLETED); + + /* store the start query position */ + savePointPosition(&pRuntimeEnv->startPos, pQuery->fileId, pQuery->slot, pQuery->pos); + int64_t skey = pQuery->lastKey; + SET_MASTER_SCAN_FLAG(pRuntimeEnv); + + while (1) { + doScanAllDataBlocks(pRuntimeEnv); + + if (!needScanDataBlocksAgain(pRuntimeEnv)) { break; } - // set the correct start position, and load the corresponding block in buffer if required. - TSKEY actKey = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->startPos); - assert((QUERY_IS_ASC_QUERY(pQuery) && actKey >= pQuery->skey) || - (!QUERY_IS_ASC_QUERY(pQuery) && actKey <= pQuery->skey)); + /* + * set the correct start position, and load the corresponding block in buffer for next + * round scan all data blocks. + */ + TSKEY key = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->startPos); + assert((QUERY_IS_ASC_QUERY(pQuery) && key >= pQuery->skey) || + (!QUERY_IS_ASC_QUERY(pQuery) && key <= pQuery->skey)); setQueryStatus(pQuery, QUERY_NOT_COMPLETED); pQuery->lastKey = pQuery->skey; + pRuntimeEnv->scanFlag = REPEAT_SCAN; /* check if query is killed or not */ if (isQueryKilled(pQuery)) { @@ -6285,7 +6243,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { void doFinalizeResult(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->intervalTime > 0 && pQuery->slidingTime > 0)) { + if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) { // for each group result, call the finalize function for each column SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { @@ -6361,7 +6319,7 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) { */ // void forwardIntervalQueryRange(STableQuerySupportObj *pSupporter, SQueryRuntimeEnv *pRuntimeEnv) { // SQuery *pQuery = pRuntimeEnv->pQuery; -// if (pQuery->slidingTime > 0 && pQuery->intervalTime > 0) { +// if (pQuery->slidingTime > 0 && isIntervalQuery(pQuery)) { // if ((QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey >= pQuery->ekey) || // (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey <= pQuery->ekey)) { // setQueryStatus(pQuery, QUERY_COMPLETED); @@ -7067,21 +7025,6 @@ int32_t setAdditionalInfo(STableQuerySupportObj *pSupporter, int32_t meterIdx, S return 0; } -// int64_t getNextAccessedKeyInData(SQuery *pQuery, int64_t *pPrimaryCol, SBlockInfo *pBlockInfo, int32_t blockStatus) { -// assert(pQuery->pos >= 0 && pQuery->pos <= pBlockInfo->size - 1); -// -// TSKEY key = -1; -// if (IS_DATA_BLOCK_LOADED(blockStatus)) { -// key = pPrimaryCol[pQuery->pos]; -// } else { // while the data block is not loaded, the position must be the first or last position -// assert(pQuery->pos == pBlockInfo->size - 1 || pQuery->pos == 0); -// key = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->keyFirst : pBlockInfo->keyLast; -// } -// -// assert((key >= pQuery->skey && QUERY_IS_ASC_QUERY(pQuery)) || (key <= pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery))); -// return key; -//} - /* * There are two cases to handle: * @@ -7130,7 +7073,7 @@ void setIntervalQueryRange(SMeterQueryInfo *pMeterQueryInfo, STableQuerySupportO doGetAlignedIntervalQueryRangeImpl(pQuery, win.skey, win.skey, win.ekey, &skey1, &ekey1, &windowSKey, &windowEKey); pWindowResInfo->startTime = windowSKey; - assert(pWindowResInfo->startTime > 0); +// assert(pWindowResInfo->startTime > 0); if (pWindowResInfo->prevSKey == 0) { if (QUERY_IS_ASC_QUERY(pQuery)) { @@ -7213,7 +7156,7 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk pQuery->pSelectExpr[i].pBase.colInfo.colId, *blkStatus); } - if (pRuntimeEnv->pTSBuf > 0 || (pQuery->intervalTime > 0 && pQuery->slidingTime > 0)) { + if (pRuntimeEnv->pTSBuf > 0 || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) { req |= BLK_DATA_ALL_NEEDED; } } @@ -7296,14 +7239,14 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk } bool onDemandLoadDatablock(SQuery *pQuery, int16_t queryRangeSet) { - return (pQuery->intervalTime == 0) || ((queryRangeSet == 1) && (pQuery->intervalTime > 0)); + return (pQuery->intervalTime == 0) || ((queryRangeSet == 1) && (isIntervalQuery(pQuery))); } static int32_t getNumOfSubset(STableQuerySupportObj *pSupporter) { SQuery *pQuery = pSupporter->runtimeEnv.pQuery; int32_t totalSubset = 0; - if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->intervalTime > 0)) { + if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) { totalSubset = numOfClosedTimeWindow(&pSupporter->runtimeEnv.windowResInfo); } else { totalSubset = pSupporter->pSidSet->numOfSubSet; @@ -7433,7 +7376,6 @@ void stableApplyFunctionsOnBlock(STableQuerySupportObj *pSupporter, SMeterDataIn assert(numOfRes >= 0); - // todo merge refactor updateWindowResNumOfRes(pRuntimeEnv, pMeterDataInfo); updatelastkey(pQuery, pMeterQueryInfo); } diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index f75e3aebfd..deccc1d161 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -424,7 +424,7 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo // if data block is not loaded, it must be the intermediate blocks assert((pBlock->keyFirst >= pQuery->lastKey && pBlock->keyLast <= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || (pBlock->keyFirst >= pQuery->ekey && pBlock->keyLast <= pQuery->lastKey && !QUERY_IS_ASC_QUERY(pQuery))); - nextKey = QUERY_IS_ASC_QUERY(pQuery)? pBlock->keyFirst:pBlock->keyLast; + nextKey = QUERY_IS_ASC_QUERY(pQuery) ? pBlock->keyFirst : pBlock->keyLast; } if (pQuery->intervalTime == 0) { @@ -1091,18 +1091,15 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter while (1) { initCtxOutputBuf(pRuntimeEnv); - vnodeScanAllData(pRuntimeEnv); + if (isQueryKilled(pQuery)) { return; } assert(!Q_STATUS_EQUAL(pQuery->over, QUERY_NOT_COMPLETED)); - doFinalizeResult(pRuntimeEnv); - // int64_t maxOutput = getNumOfResult(pRuntimeEnv); - // here we can ignore the records in case of no interpolation // todo handle offset, in case of top/bottom interval query if ((pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) && pQuery->limit.offset > 0 && @@ -1113,30 +1110,17 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter int32_t c = MIN(numOfClosed, pQuery->limit.offset); clearFirstNTimeWindow(pRuntimeEnv, c); pQuery->limit.offset -= c; - } else { - // pQuery->pointsRead += maxOutput; - // forwardCtxOutputBuf(pRuntimeEnv, maxOutput); } if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) { break; } + // load the data block for the next retrieve loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos); if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { break; } - - // /* - // * the scan limitation mechanism is upon here, - // * 1. since there is only one(k) record is generated in one scan operation - // * 2. remain space is not sufficient for next query output, abort - // */ - // if ((pQuery->pointsRead % pQuery->pointsToRead == 0 && pQuery->pointsRead != 0) || - // ((pQuery->pointsRead + maxOutput) > pQuery->pointsToRead)) { - // setQueryStatus(pQuery, QUERY_RESBUF_FULL); - // break; - // } } } @@ -1262,7 +1246,9 @@ void vnodeSingleTableQuery(SSchedMsg *pMsg) { // here we have scan all qualified data in both data file and cache if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) { // continue to get push data from the group result - if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || pQuery->intervalTime > 0) { + if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || + (pQuery->intervalTime > 0 && pQInfo->pointsReturned < pQuery->limit.limit)) { + //todo limit the output for interval query? pQuery->pointsRead = 0; pSupporter->subgroupIdx = 0; // always start from 0 diff --git a/src/util/src/tinterpolation.c b/src/util/src/tinterpolation.c index 82cc52cd42..279dd07265 100644 --- a/src/util/src/tinterpolation.c +++ b/src/util/src/tinterpolation.c @@ -13,9 +13,6 @@ * along with this program. If not, see . */ -#include -#include - #include "os.h" #include "taosmsg.h" #include "textbuffer.h" @@ -47,7 +44,7 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char char** tzname = _tzname; #endif - int64_t t = (precision == TSDB_TIME_PRECISION_MILLI)?MILLISECOND_PER_SECOND:MILLISECOND_PER_SECOND*1000L; + int64_t t = (precision == TSDB_TIME_PRECISION_MILLI) ? MILLISECOND_PER_SECOND : MILLISECOND_PER_SECOND * 1000L; int64_t revStartime = (startTime / timeRange) * timeRange + timezone * t; int64_t revEndtime = revStartime + timeRange - 1; @@ -78,14 +75,14 @@ void taosInitInterpoInfo(SInterpolationInfo* pInterpoInfo, int32_t order, int64_ } // the SInterpolationInfo itself will not be released -void taosDestoryInterpoInfo(SInterpolationInfo *pInterpoInfo) { +void taosDestoryInterpoInfo(SInterpolationInfo* pInterpoInfo) { if (pInterpoInfo == NULL) { return; } - + tfree(pInterpoInfo->prevValues); tfree(pInterpoInfo->nextValues); - + tfree(pInterpoInfo->pTags); } @@ -94,7 +91,7 @@ void taosInterpoSetStartInfo(SInterpolationInfo* pInterpoInfo, int32_t numOfRawD return; } - pInterpoInfo->rowIdx = 0;//INTERPOL_IS_ASC_INTERPOL(pInterpoInfo) ? 0 : numOfRawDataInRows - 1; + pInterpoInfo->rowIdx = 0; // INTERPOL_IS_ASC_INTERPOL(pInterpoInfo) ? 0 : numOfRawDataInRows - 1; pInterpoInfo->numOfRawDataInRows = numOfRawDataInRows; } @@ -118,14 +115,9 @@ int32_t taosGetNumOfResWithoutLimit(SInterpolationInfo* pInterpoInfo, int64_t* p if (numOfAvailRawData > 0) { int32_t finalNumOfResult = 0; -// if (pInterpoInfo->order == TSQL_SO_ASC) { - // get last timestamp, calculate the result size - int64_t lastKey = pPrimaryKeyArray[pInterpoInfo->numOfRawDataInRows - 1]; - finalNumOfResult = (int32_t)(labs(lastKey - pInterpoInfo->startTimestamp) / nInterval) + 1; -// } else { // todo error less than one!!! -// TSKEY lastKey = pPrimaryKeyArray[0]; -// finalNumOfResult = (int32_t)((pInterpoInfo->startTimestamp - lastKey) / nInterval) + 1; -// } + // get last timestamp, calculate the result size + int64_t lastKey = pPrimaryKeyArray[pInterpoInfo->numOfRawDataInRows - 1]; + finalNumOfResult = (int32_t)(labs(lastKey - pInterpoInfo->startTimestamp) / nInterval) + 1; assert(finalNumOfResult >= numOfAvailRawData); return finalNumOfResult; @@ -140,7 +132,9 @@ int32_t taosGetNumOfResWithoutLimit(SInterpolationInfo* pInterpoInfo, int64_t* p } } -bool taosHasRemainsDataForInterpolation(SInterpolationInfo* pInterpoInfo) { return taosNumOfRemainPoints(pInterpoInfo) > 0; } +bool taosHasRemainsDataForInterpolation(SInterpolationInfo* pInterpoInfo) { + return taosNumOfRemainPoints(pInterpoInfo) > 0; +} int32_t taosNumOfRemainPoints(SInterpolationInfo* pInterpoInfo) { if (pInterpoInfo->rowIdx == -1 || pInterpoInfo->numOfRawDataInRows == 0) { @@ -197,28 +191,22 @@ int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoi return 0; } -static char* getPos(char* data, int32_t bytes, int32_t order, int32_t capacity, int32_t index) { -// if (order == TSQL_SO_ASC) { - return data + index * bytes; -// } else { -// return data + (capacity - index - 1) * bytes; -// } -} +static char* getPos(char* data, int32_t bytes, int32_t index) { return data + index * bytes; } -static void setTagsValueInInterpolation(tFilePage** data, char** pTags, SColumnModel* pModel, int32_t order, int32_t start, - int32_t capacity, int32_t num) { +static void setTagsValueInInterpolation(tFilePage** data, char** pTags, SColumnModel* pModel, int32_t order, + int32_t start, int32_t capacity, int32_t num) { for (int32_t j = 0, i = start; i < pModel->numOfCols; ++i, ++j) { SSchema* pSchema = getColumnModelSchema(pModel, i); - - char* val1 = getPos(data[i]->data, pSchema->bytes, order, capacity, num); + + char* val1 = getPos(data[i]->data, pSchema->bytes, num); assignVal(val1, pTags[j], pSchema->bytes, pSchema->type); } } static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interpoType, tFilePage** data, - SColumnModel* pModel, int32_t* num, char** srcData, int64_t nInterval, int64_t* defaultVal, - int64_t currentTimestamp, int32_t capacity, int32_t numOfTags, char** pTags, - bool outOfBound) { + SColumnModel* pModel, int32_t* num, char** srcData, int64_t nInterval, + int64_t* defaultVal, int64_t currentTimestamp, int32_t capacity, int32_t numOfTags, + char** pTags, bool outOfBound) { char** prevValues = &pInterpoInfo->prevValues; char** nextValues = &pInterpoInfo->nextValues; @@ -226,7 +214,7 @@ static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interp int32_t step = GET_FORWARD_DIRECTION_FACTOR(pInterpoInfo->order); - char* val = getPos(data[0]->data, TSDB_KEYSIZE, pInterpoInfo->order, capacity, *num); + char* val = getPos(data[0]->data, TSDB_KEYSIZE, *num); *(TSKEY*)val = pInterpoInfo->startTimestamp; int32_t numOfValCols = pModel->numOfCols - numOfTags; @@ -237,9 +225,9 @@ static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interp if (pInterpolationData != NULL) { for (int32_t i = 1; i < numOfValCols; ++i) { SSchema* pSchema = getColumnModelSchema(pModel, i); - int16_t offset = getColumnModelOffset(pModel, i); - - char* val1 = getPos(data[i]->data, pSchema->bytes, pInterpoInfo->order, capacity, *num); + int16_t offset = getColumnModelOffset(pModel, i); + + char* val1 = getPos(data[i]->data, pSchema->bytes, *num); if (isNull(pInterpolationData + offset, pSchema->type)) { setNull(val1, pSchema->type, pSchema->bytes); @@ -250,8 +238,8 @@ static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interp } else { /* no prev value yet, set the value for null */ for (int32_t i = 1; i < numOfValCols; ++i) { SSchema* pSchema = getColumnModelSchema(pModel, i); - - char* val1 = getPos(data[i]->data, pSchema->bytes, pInterpoInfo->order, capacity, *num); + + char* val1 = getPos(data[i]->data, pSchema->bytes, *num); setNull(val1, pSchema->type, pSchema->bytes); } } @@ -262,10 +250,10 @@ static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interp if (*prevValues != NULL && !outOfBound) { for (int32_t i = 1; i < numOfValCols; ++i) { SSchema* pSchema = getColumnModelSchema(pModel, i); - int16_t offset = getColumnModelOffset(pModel, i); - + int16_t offset = getColumnModelOffset(pModel, i); + int16_t type = pSchema->type; - char* val1 = getPos(data[i]->data, pSchema->bytes, pInterpoInfo->order, capacity, *num); + char* val1 = getPos(data[i]->data, pSchema->bytes, *num); if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BOOL) { setNull(val1, type, pSchema->bytes); @@ -283,8 +271,8 @@ static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interp } else { for (int32_t i = 1; i < numOfValCols; ++i) { SSchema* pSchema = getColumnModelSchema(pModel, i); - - char* val1 = getPos(data[i]->data, pSchema->bytes, pInterpoInfo->order, capacity, *num); + + char* val1 = getPos(data[i]->data, pSchema->bytes, *num); setNull(val1, pSchema->type, pSchema->bytes); } @@ -293,8 +281,8 @@ static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interp } else { /* default value interpolation */ for (int32_t i = 1; i < numOfValCols; ++i) { SSchema* pSchema = getColumnModelSchema(pModel, i); - - char* val1 = getPos(data[i]->data, pSchema->bytes, pInterpoInfo->order, capacity, *num); + + char* val1 = getPos(data[i]->data, pSchema->bytes, *num); assignVal(val1, (char*)&defaultVal[i], pSchema->bytes, pSchema->type); } @@ -344,9 +332,9 @@ int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoTyp if (*nextValues == NULL) { *nextValues = calloc(1, pModel->rowSize); for (int i = 1; i < pModel->numOfCols; i++) { - int16_t offset = getColumnModelOffset(pModel, i); + int16_t offset = getColumnModelOffset(pModel, i); SSchema* pSchema = getColumnModelSchema(pModel, i); - + setNull(*nextValues + offset, pSchema->type, pSchema->bytes); } } @@ -354,33 +342,36 @@ int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoTyp int32_t offset = pInterpoInfo->rowIdx; for (int32_t tlen = 0, i = 0; i < pModel->numOfCols - numOfTags; ++i) { SSchema* pSchema = getColumnModelSchema(pModel, i); - + memcpy(*nextValues + tlen, srcData[i] + offset * pSchema->bytes, pSchema->bytes); tlen += pSchema->bytes; } } - while (((pInterpoInfo->startTimestamp < currentTimestamp && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || - (pInterpoInfo->startTimestamp > currentTimestamp && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) && - num < outputRows) { - doInterpoResultImpl(pInterpoInfo, interpoType, data, pModel, &num, srcData, nInterval, defaultVal, - currentTimestamp, bufSize, numOfTags, pTags, false); - } - - /* output buffer is full, abort */ - if ((num == outputRows && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || - (num < 0 && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) { - pInterpoInfo->numOfTotalInterpo += pInterpoInfo->numOfCurrentInterpo; - return outputRows; - } + if (((pInterpoInfo->startTimestamp < currentTimestamp && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || + (pInterpoInfo->startTimestamp > currentTimestamp && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) && + num < outputRows) { + while (((pInterpoInfo->startTimestamp < currentTimestamp && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || + (pInterpoInfo->startTimestamp > currentTimestamp && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) && + num < outputRows) { + doInterpoResultImpl(pInterpoInfo, interpoType, data, pModel, &num, srcData, nInterval, defaultVal, + currentTimestamp, bufSize, numOfTags, pTags, false); + } - if (pInterpoInfo->startTimestamp == currentTimestamp) { + /* output buffer is full, abort */ + if ((num == outputRows && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || + (num < 0 && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) { + pInterpoInfo->numOfTotalInterpo += pInterpoInfo->numOfCurrentInterpo; + return outputRows; + } + } else { + // if (pInterpoInfo->startTimestamp == currentTimestamp) { if (*prevValues == NULL) { *prevValues = calloc(1, pModel->rowSize); for (int i = 1; i < pModel->numOfCols; i++) { - int16_t offset = getColumnModelOffset(pModel, i); + int16_t offset = getColumnModelOffset(pModel, i); SSchema* pSchema = getColumnModelSchema(pModel, i); - + setNull(*prevValues + offset, pSchema->type, pSchema->bytes); } } @@ -388,17 +379,16 @@ int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoTyp // assign rows to dst buffer int32_t i = 0; for (int32_t tlen = 0; i < pModel->numOfCols - numOfTags; ++i) { - int16_t offset = getColumnModelOffset(pModel, i); + int16_t offset = getColumnModelOffset(pModel, i); SSchema* pSchema = getColumnModelSchema(pModel, i); - - char* val1 = getPos(data[i]->data, pSchema->bytes, pInterpoInfo->order, bufSize, num); + + char* val1 = getPos(data[i]->data, pSchema->bytes, num); if (i == 0 || (functionIDs[i] != TSDB_FUNC_COUNT && !isNull(srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes, pSchema->type)) || (functionIDs[i] == TSDB_FUNC_COUNT && *(int64_t*)(srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes) != 0)) { - assignVal(val1, srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes, pSchema->bytes, pSchema->type); memcpy(*prevValues + tlen, srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes, pSchema->bytes); } else { // i > 0 and isNULL, do interpolation @@ -416,11 +406,11 @@ int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoTyp /* set the tag value for final result */ setTagsValueInInterpolation(data, pTags, pModel, pInterpoInfo->order, pModel->numOfCols - numOfTags, bufSize, num); - } - pInterpoInfo->startTimestamp += (nInterval * step); - pInterpoInfo->rowIdx += 1; - num += 1; + pInterpoInfo->startTimestamp += (nInterval * step); + pInterpoInfo->rowIdx += 1; + num += 1; + } if ((pInterpoInfo->rowIdx >= pInterpoInfo->numOfRawDataInRows && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || (pInterpoInfo->rowIdx < 0 && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || num >= outputRows) { -- GitLab