diff --git a/src/inc/tsqlfunction.h b/src/inc/tsqlfunction.h index 8783285fbbda3086550067ee74cc586871de51f2..b42358967213ae182574cd6a4c806fc090431af4 100644 --- a/src/inc/tsqlfunction.h +++ b/src/inc/tsqlfunction.h @@ -169,9 +169,9 @@ typedef struct SExtTagsInfo { // sql function runtime context typedef struct SQLFunctionCtx { - int32_t startOffset; - int32_t size; // number of rows - int32_t order; // asc|desc + int32_t startOffset; + int32_t size; // number of rows + uint32_t order; // asc|desc uint32_t scanFlag; // TODO merge with currentStage int16_t inputType; diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 09c3e14dfbe877d347604f553e8ed0c78c936556..aebd5b28c6a20e83be0dc2f967542c28cb71ff64 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -1861,7 +1861,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t } int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - if (isIntervalQuery(pQuery) && pQuery->slidingTime > 0) { + if (isIntervalQuery(pQuery)) { int32_t offset = GET_COL_DATA_POS(pQuery, 0, step); TSKEY ts = primaryKeyCol[offset]; @@ -2423,7 +2423,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * * because the results of group by normal column is put into intermediate buffer. */ int32_t num = 0; - if (!groupbyStateValue && !(isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) { + if (!groupbyStateValue && !isIntervalQuery(pQuery)) { num = getNumOfResult(pRuntimeEnv) - prevNumOfRes; } @@ -4588,7 +4588,7 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery vnodeRecordAllFiles(pQInfo, pMeterObj->vnode); pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, false); - if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) { + if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { int32_t rows = getInitialPageNum(pSupporter); code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize); @@ -4731,9 +4731,9 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) { } if (pSupporter->pSidSet != NULL || isGroupbyNormalCol(pQInfo->query.pGroupbyExpr) || - (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) { + isIntervalQuery(pQuery)) { int32_t size = 0; - if (isGroupbyNormalCol(pQInfo->query.pGroupbyExpr) || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) { + if (isGroupbyNormalCol(pQInfo->query.pGroupbyExpr) || isIntervalQuery(pQuery)) { size = 10000; } else if (pSupporter->pSidSet != NULL) { size = pSupporter->pSidSet->numOfSubSet; @@ -5889,7 +5889,11 @@ void disableFunctForTableSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order void disableFunctForSuppleScan(STableQuerySupportObj *pSupporter, int32_t order) { SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; - + + for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1u; + } + if (isIntervalQuery(pQuery)) { for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) { SMeterQueryInfo *pMeterQueryInfo = pSupporter->pMeterDataInfo[i].pMeterQInfo; @@ -5898,10 +5902,6 @@ void disableFunctForSuppleScan(STableQuerySupportObj *pSupporter, int32_t order) doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order); } } else { - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1u; - } - SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order); } @@ -6193,7 +6193,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; bool toContinue = false; - if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) { + if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { // for each group result, call the finalize function for each column SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; @@ -6530,7 +6530,7 @@ void changeMeterQueryInfoForSuppleQuery(SQuery *pQuery, SMeterQueryInfo *pMeterQ SWAP(pMeterQueryInfo->skey, pMeterQueryInfo->ekey, TSKEY); pMeterQueryInfo->lastKey = pMeterQueryInfo->skey; - pMeterQueryInfo->cur.order = pMeterQueryInfo->cur.order ^ 1; + pMeterQueryInfo->cur.order = pMeterQueryInfo->cur.order ^ 1u; pMeterQueryInfo->cur.vnodeIndex = -1; } @@ -7115,7 +7115,7 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk pQuery->pSelectExpr[i].pBase.colInfo.colId, *blkStatus); } - if (pRuntimeEnv->pTSBuf > 0 || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) { + if (pRuntimeEnv->pTSBuf > 0 || isIntervalQuery(pQuery)) { req |= BLK_DATA_ALL_NEEDED; } } diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index 4caf79f6c9068a7ee1688c22fba0d8d8a9c4aff0..ae51365918b142e392dcffa27a6b071543f3d02e 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -872,7 +872,7 @@ static void doMultiMeterSupplementaryScan(SQInfo *pQInfo) { disableFunctForSuppleScan(pSupporter, pQuery->order.order); if (pRuntimeEnv->pTSBuf != NULL) { - pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1; + pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1u; } SWAP(pSupporter->rawSKey, pSupporter->rawEKey, TSKEY); @@ -945,7 +945,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { doOrderedScan(pQInfo); int64_t et = taosGetTimestampMs(); dTrace("QInfo:%p main scan completed, elapsed time: %lldms, supplementary scan start, order:%d", pQInfo, et - st, - pQuery->order.order ^ 1); + pQuery->order.order ^ 1u); if (pQuery->intervalTime > 0) { for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) { diff --git a/src/util/src/tinterpolation.c b/src/util/src/tinterpolation.c index 279dd07265d124ae6f31cf4851851ea2baf2d74a..cb7c8854ce914d22680db5429871857ac445f1fe 100644 --- a/src/util/src/tinterpolation.c +++ b/src/util/src/tinterpolation.c @@ -91,7 +91,7 @@ void taosInterpoSetStartInfo(SInterpolationInfo* pInterpoInfo, int32_t numOfRawD return; } - pInterpoInfo->rowIdx = 0; // INTERPOL_IS_ASC_INTERPOL(pInterpoInfo) ? 0 : numOfRawDataInRows - 1; + pInterpoInfo->rowIdx = 0; pInterpoInfo->numOfRawDataInRows = numOfRawDataInRows; } @@ -295,6 +295,20 @@ static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interp (*num) += 1; } +static void initBeforeAfterDataBuf(SColumnModel* pModel, char** nextValues) { + if (*nextValues != NULL) { + return; + } + + *nextValues = calloc(1, pModel->rowSize); + for (int i = 1; i < pModel->numOfCols; i++) { + int16_t offset = getColumnModelOffset(pModel, i); + SSchema* pSchema = getColumnModelSchema(pModel, i); + + setNull(*nextValues + offset, pSchema->type, pSchema->bytes); + } +} + int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoType, tFilePage** data, int32_t numOfRawDataInRows, int32_t outputRows, int64_t nInterval, const int64_t* pPrimaryKeyArray, SColumnModel* pModel, char** srcData, int64_t* defaultVal, @@ -329,16 +343,8 @@ int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoTyp if ((pInterpoInfo->startTimestamp < currentTimestamp && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || (pInterpoInfo->startTimestamp > currentTimestamp && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) { /* set the next value for interpolation */ - if (*nextValues == NULL) { - *nextValues = calloc(1, pModel->rowSize); - for (int i = 1; i < pModel->numOfCols; i++) { - int16_t offset = getColumnModelOffset(pModel, i); - SSchema* pSchema = getColumnModelSchema(pModel, i); - - setNull(*nextValues + offset, pSchema->type, pSchema->bytes); - } - } - + initBeforeAfterDataBuf(pModel, nextValues); + int32_t offset = pInterpoInfo->rowIdx; for (int32_t tlen = 0, i = 0; i < pModel->numOfCols - numOfTags; ++i) { SSchema* pSchema = getColumnModelSchema(pModel, i); @@ -365,17 +371,10 @@ int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoTyp return outputRows; } } else { - // if (pInterpoInfo->startTimestamp == currentTimestamp) { - if (*prevValues == NULL) { - *prevValues = calloc(1, pModel->rowSize); - for (int i = 1; i < pModel->numOfCols; i++) { - int16_t offset = getColumnModelOffset(pModel, i); - SSchema* pSchema = getColumnModelSchema(pModel, i); - - setNull(*prevValues + offset, pSchema->type, pSchema->bytes); - } - } - + assert(pInterpoInfo->startTimestamp == currentTimestamp); + + initBeforeAfterDataBuf(pModel, prevValues); + // assign rows to dst buffer int32_t i = 0; for (int32_t tlen = 0; i < pModel->numOfCols - numOfTags; ++i) { @@ -383,19 +382,19 @@ int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoTyp SSchema* pSchema = getColumnModelSchema(pModel, i); char* val1 = getPos(data[i]->data, pSchema->bytes, num); - + char* src = srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes; + if (i == 0 || - (functionIDs[i] != TSDB_FUNC_COUNT && - !isNull(srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes, pSchema->type)) || - (functionIDs[i] == TSDB_FUNC_COUNT && - *(int64_t*)(srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes) != 0)) { - assignVal(val1, srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes, pSchema->bytes, pSchema->type); - memcpy(*prevValues + tlen, srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes, pSchema->bytes); - } else { // i > 0 and isNULL, do interpolation + (functionIDs[i] != TSDB_FUNC_COUNT && !isNull(src, pSchema->type)) || + (functionIDs[i] == TSDB_FUNC_COUNT && *(int64_t*)(src) != 0)) { + assignVal(val1, src, pSchema->bytes, pSchema->type); + memcpy(*prevValues + tlen, src, pSchema->bytes); + } else { // i > 0 and data is null , do interpolation if (interpoType == TSDB_INTERPO_PREV) { assignVal(val1, *prevValues + offset, pSchema->bytes, pSchema->type); } else if (interpoType == TSDB_INTERPO_LINEAR) { - // TODO: + assignVal(val1, src, pSchema->bytes, pSchema->type); + memcpy(*prevValues + tlen, src, pSchema->bytes); } else { assignVal(val1, (char*)&defaultVal[i], pSchema->bytes, pSchema->type); }