diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 9e485e7684da599cbd4d94169f67df893be1b288..59b3328f62176c8751ad397bd848b94b2ff60a36 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1433,7 +1433,8 @@ static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, u pAggInfo->groupId = groupId; } -static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowCellOffset) { +static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, + const int32_t* rowCellOffset) { bool returnNotNull = false; for (int32_t j = 0; j < numOfExprs; ++j) { struct SResultRowEntryInfo* pResInfo = getResultEntryInfo(pRow, j, rowCellOffset); @@ -1613,7 +1614,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG if (!pbInfo->mergeResultBlock) { doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); } else { - while(hasRemainResults(pGroupResInfo)) { + while (hasRemainResults(pGroupResInfo)) { doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); if (pBlock->info.rows >= pOperator->resultInfo.threshold) { break; @@ -2062,10 +2063,10 @@ void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t } int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, int32_t numOfOutput, SArray* pColList, - char** pNextStart) { + char** pNextStart) { if (pColList == NULL) { // data from other sources blockDataCleanup(pRes); - *pNextStart = (char*) blockDecode(pRes, pData); + *pNextStart = (char*)blockDecode(pRes, pData); } else { // extract data according to pColList ASSERT(numOfOutput == taosArrayGetSize(pColList)); char* pStart = pData; @@ -2161,9 +2162,9 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn } SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; - int32_t index = 0; - char* pStart = pRetrieveRsp->data; - while(index++ < pRetrieveRsp->numOfBlocks) { + int32_t index = 0; + char* pStart = pRetrieveRsp->data; + while (index++ < pRetrieveRsp->numOfBlocks) { SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false); code = extractDataBlockFromFetchRsp(pb, pStart, pRetrieveRsp->numOfCols, NULL, &pStart); if (code != 0) { @@ -2177,8 +2178,10 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator); if (pRsp->completed == 1) { - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d" - " index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", total:%.2f Kb," + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 + " execId:%d" + " index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 + ", total:%.2f Kb," " completed:%d try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, @@ -2186,9 +2189,10 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn completed += 1; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; } else { - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 - ", total:%.2f Kb", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, - pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize/1024.0); + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 + " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb", + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks, + pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0); } taosMemoryFreeClear(pDataInfo->pRsp); @@ -3521,7 +3525,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } - size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); @@ -3562,7 +3566,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* return pOperator; _error: destroyAggOperatorInfo(pInfo, numOfCols); - taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -4175,7 +4178,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, - pPhyNode->pConditions, pIntervalPhyNode->window.mergeDataBlock, pTaskInfo); + pPhyNode->pConditions, pIntervalPhyNode->window.mergeDataBlock, + pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) { SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode; @@ -4190,7 +4194,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision}; int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; - pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pIntervalPhyNode->window.mergeDataBlock, pTaskInfo); + pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, + pIntervalPhyNode->window.mergeDataBlock, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) { int32_t children = 0; pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b4489a92fbdd3f2cd95809a6ef11541ca88e27dd..5485e166f4ee8aed684241da491cb79d6c63e9b3 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -97,7 +97,8 @@ static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) { pRowSup->groupId = groupId; } -static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) { +static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, + uint64_t groupId) { pRowSup->startRowIndex = rowIndex; pRowSup->numOfRows = 0; pRowSup->win.skey = tsList[rowIndex]; @@ -869,7 +870,8 @@ static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_ } static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj* pUpdatedMap) { - return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap);; + return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap); + ; } static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) { @@ -910,9 +912,9 @@ int32_t compareWinRes(void* pKey, void* data, int32_t index) { static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) { if (!pUpdatedMap || taosHashGetSize(pUpdatedMap) == 0) { return; - } + } int32_t delSize = taosArrayGetSize(pDelWins); - void* pIte = NULL; + void* pIte = NULL; while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { SResKeyPos* pResKey = (SResKeyPos*)pIte; int32_t index = binarySearchCom(pDelWins, delSize, pResKey, TSDB_ORDER_DESC, compareWinRes); @@ -1592,9 +1594,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; - SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos + SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP); - SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); + SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { @@ -1874,7 +1876,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* _error: destroyIntervalOperatorInfo(pInfo, numOfCols); - taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -1931,7 +1932,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr _error: destroyIntervalOperatorInfo(pInfo, numOfCols); - taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -1965,7 +1965,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); doKeepTuple(pRowSup, tsList[j], gid); } else if ((tsList[j] - pRowSup->prevTs >= 0) && tsList[j] - pRowSup->prevTs <= gap || - (pRowSup->prevTs - tsList[j] >= 0 ) && (pRowSup->prevTs - tsList[j] <= gap)) { + (pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap)) { // The gap is less than the threshold, so it belongs to current session window that has been opened already. doKeepTuple(pRowSup, tsList[j], gid); if (j == 0 && pRowSup->startRowIndex != 0) { @@ -2118,7 +2118,7 @@ static void doKeepNextRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex, bool isLastRow) { int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - bool fillLastPoint = pSliceInfo->fillLastPoint; + bool fillLastPoint = pSliceInfo->fillLastPoint; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); @@ -2150,11 +2150,9 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo } pSliceInfo->fillLastPoint = isLastRow ? true : false; - } -static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, - SSDataBlock* pResBlock) { +static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock) { int32_t rows = pResBlock->info.rows; // todo set the correct primary timestamp column @@ -2165,7 +2163,7 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; int32_t dstSlot = pExprInfo->base.resSchema.slotId; - //SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot); + // SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot); SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); switch (pSliceInfo->fillType) { @@ -2181,15 +2179,15 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) { float v = 0; GET_TYPED_DATA(v, float, pVar->nType, &pVar->i); - colDataAppend(pDst, rows, (char *)&v, false); + colDataAppend(pDst, rows, (char*)&v, false); } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) { double v = 0; GET_TYPED_DATA(v, double, pVar->nType, &pVar->i); - colDataAppend(pDst, rows, (char *)&v, false); + colDataAppend(pDst, rows, (char*)&v, false); } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) { int64_t v = 0; GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); - colDataAppend(pDst, rows, (char *)&v, false); + colDataAppend(pDst, rows, (char*)&v, false); } pResBlock->info.rows += 1; break; @@ -2198,8 +2196,8 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp case TSDB_FILL_LINEAR: { SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, srcSlot); - SPoint start = pLinearInfo->start; - SPoint end = pLinearInfo->end; + SPoint start = pLinearInfo->start; + SPoint end = pLinearInfo->end; SPoint current = {.key = pSliceInfo->current}; current.val = taosMemoryCalloc(pLinearInfo->bytes, 1); @@ -2212,7 +2210,7 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp colDataAppendNULL(pDst, rows); } else { taosGetLinearInterpolationVal(¤t, pLinearInfo->type, &start, &end, pLinearInfo->type); - colDataAppend(pDst, rows, (char *)current.val, false); + colDataAppend(pDst, rows, (char*)current.val, false); } pResBlock->info.rows += 1; @@ -2317,11 +2315,11 @@ static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pB SFillLinearInfo linearInfo = {0}; linearInfo.start.key = INT64_MIN; - linearInfo.end.key = INT64_MAX; + linearInfo.end.key = INT64_MAX; linearInfo.start.val = taosMemoryCalloc(1, pColInfo->info.bytes); - linearInfo.end.val = taosMemoryCalloc(1, pColInfo->info.bytes); - linearInfo.hasNull = false; - linearInfo.type = pColInfo->info.type; + linearInfo.end.val = taosMemoryCalloc(1, pColInfo->info.bytes); + linearInfo.hasNull = false; + linearInfo.type = pColInfo->info.type; linearInfo.bytes = pColInfo->info.bytes; taosArrayPush(pInfo->pLinearInfo, &linearInfo); } @@ -2399,7 +2397,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { for (int32_t i = 0; i < pBlock->info.rows; ++i) { int64_t ts = *(int64_t*)colDataGetData(pTsCol, i); - if (i == 0 && needToFillLastPoint(pSliceInfo)) { // first row in current block + if (i == 0 && needToFillLastPoint(pSliceInfo)) { // first row in current block doKeepLinearInfo(pSliceInfo, pBlock, i, false); while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) { genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock); @@ -2445,8 +2443,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (nextTs > pSliceInfo->current) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock); - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, + pInterval->precision); if (pResBlock->info.rows >= pResBlock->info.capacity) { break; } @@ -2457,11 +2455,11 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { break; } } - } else {// it is the last row of current block - //store ts value as start, and calculate interp value when processing next block + } else { // it is the last row of current block + // store ts value as start, and calculate interp value when processing next block doKeepLinearInfo(pSliceInfo, pBlock, i, true); } - } else { // non-linear interpolation + } else { // non-linear interpolation pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); if (pSliceInfo->current > pSliceInfo->win.ekey) { @@ -2480,15 +2478,15 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (pSliceInfo->fillType == TSDB_FILL_LINEAR) { doKeepLinearInfo(pSliceInfo, pBlock, i, false); // no need to increate pSliceInfo->current here - //pSliceInfo->current = + // pSliceInfo->current = // taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); if (i < pBlock->info.rows - 1) { int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1); if (nextTs > pSliceInfo->current) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock); - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, + pInterval->precision); if (pResBlock->info.rows >= pResBlock->info.capacity) { break; } @@ -2500,7 +2498,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { } } } - } else { // non-linear interpolation + } else { // non-linear interpolation if (i < pBlock->info.rows - 1) { // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate doKeepNextRows(pSliceInfo, pBlock, i + 1); @@ -2508,8 +2506,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (nextTs > pSliceInfo->current) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock); - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, + pInterval->precision); if (pResBlock->info.rows >= pResBlock->info.capacity) { break; } @@ -2556,7 +2554,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { pResBlock->info.rows += 1; doKeepPrevRows(pSliceInfo, pBlock, i); - if (pSliceInfo->fillType == TSDB_FILL_LINEAR) { doKeepLinearInfo(pSliceInfo, pBlock, i, false); pSliceInfo->current = @@ -2566,8 +2563,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (nextTs > pSliceInfo->current) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock); - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, + pInterval->precision); if (pResBlock->info.rows >= pResBlock->info.capacity) { break; } @@ -2579,7 +2576,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { } } } - } else { // non-linear interpolation + } else { // non-linear interpolation pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); @@ -2595,12 +2592,12 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { } } } - } // check if need to interpolate after last datablock // except for fill(next), fill(linear) - while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT && pSliceInfo->fillType != TSDB_FILL_LINEAR) { + while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT && + pSliceInfo->fillType != TSDB_FILL_LINEAR) { genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock); pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); @@ -2795,7 +2792,6 @@ _error: destroySWindowOperatorInfo(pInfo, numOfCols); } - taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -3432,7 +3428,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, _error: destroyStreamFinalIntervalOperatorInfo(pInfo, numOfCols); - taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -3617,7 +3612,6 @@ _error: destroyStreamSessionAggOperatorInfo(pInfo, numOfCols); } - taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -3735,8 +3729,8 @@ SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY star return insertNewSessionWindow(pWinInfos, startTs, index + 1); } -int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t groupId,int32_t rows, - int32_t start, int64_t gap, SHashObj* pStDeleted) { +int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t groupId, + int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted) { for (int32_t i = start; i < rows; ++i) { if (!isInWindow(pWinInfo, pStartTs[i], gap) && (!pEndTs || !isInWindow(pWinInfo, pEndTs[i], gap))) { return i - start; @@ -3913,8 +3907,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData } int32_t winIndex = 0; SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, startTsCols[i], endTsCols[i], groupId, gap, &winIndex); - winRows = - updateSessionWindowInfo(pCurWin, startTsCols, endTsCols, groupId, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted); + winRows = updateSessionWindowInfo(pCurWin, startTsCols, endTsCols, groupId, pSDataBlock->info.rows, i, pInfo->gap, + pStDeleted); code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pOperator); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -4021,7 +4015,7 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It blockDataEnsureCapacity(pBlock, size); size_t keyLen = 0; while (((*Ite) = taosHashIterate(pStDeleted, *Ite)) != NULL) { - SWinRes* res = *Ite; + SWinRes* res = *Ite; SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); colDataAppend(pTsCol, pBlock->info.rows, (const char*)&res->ts, false); SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); @@ -4152,7 +4146,7 @@ static void copyDeleteWindowInfo(SArray* pResWins, SHashObj* pStDeleted) { int32_t size = taosArrayGetSize(pResWins); for (int32_t i = 0; i < size; i++) { SResultWindowInfo* pWinInfo = taosArrayGet(pResWins, i); - SWinRes res = {.ts = pWinInfo->win.skey, .groupId = pWinInfo->groupId}; + SWinRes res = {.ts = pWinInfo->win.skey, .groupId = pWinInfo->groupId}; taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes)); } } @@ -4449,7 +4443,6 @@ _error: destroyStreamSessionAggOperatorInfo(pInfo, pOperator->exprSupp.numOfExprs); } - taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -4886,7 +4879,6 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys _error: destroyStreamStateOperatorInfo(pInfo, numOfCols); - taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -4996,7 +4988,6 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); } - static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -5071,7 +5062,7 @@ static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) { blockDataCleanup(pRes); if (iaInfo->binfo.mergeResultBlock) { - while(1) { + while (1) { if (pOperator->status == OP_EXEC_DONE) { break; } @@ -5161,7 +5152,6 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, _error: destroyMergeAlignedIntervalOperatorInfo(miaInfo, numOfCols); - taosMemoryFreeClear(miaInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -5466,7 +5456,6 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI _error: destroyMergeIntervalOperatorInfo(miaInfo, numOfCols); - taosMemoryFreeClear(miaInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL;