diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a6ca563d766d500c837fb19c38c292c5a8a4e432..f703ebb3558b1351914718e449243ed76e4bf60f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -469,7 +469,7 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; SqlFunctionCtx* pCtx = pExprSup->pCtx; for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) { @@ -3036,7 +3036,8 @@ void cleanupExprSupp(SExprSupp* pSupp) { taosMemoryFree(pSupp->rowEntryInfoOffset); } -SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode,SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, + SExecTaskInfo* pTaskInfo) { SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -3055,9 +3056,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); - int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); - int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); + int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4283,6 +4284,10 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta void* pVal = NULL; int32_t code = streamStateSessionGet(pState, pKey, &pVal, &size); ASSERT(code == 0); + if (code == -1) { + // coverity scan + continue; + } SResultRow* pRow = (SResultRow*)pVal; doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset); // no results, continue to check the next one diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 4ff57227481910e8d3b9c498254009abc2612e58..823b457f897c925f8bcf9ba88d025dba8570b318 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2667,8 +2667,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { @@ -3986,7 +3986,8 @@ int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHa return code; } } - tSimpleHashIterateRemove(pHashMap, &pWinInfo->sessionWin, sizeof(SSessionKey), &pIte, &iter); + SSessionKey* pKey = tSimpleHashGetKey(pIte, &keyLen); + tSimpleHashIterateRemove(pHashMap, pKey, sizeof(SSessionKey), &pIte, &iter); } } return TSDB_CODE_SUCCESS; @@ -4006,7 +4007,7 @@ int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated) { void* pIte = NULL; int32_t iter = 0; while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { - SResultWindowInfo* pWinInfo = *(void**)pIte; + SResultWindowInfo* pWinInfo = pIte; saveResult(*pWinInfo, pStUpdated); } return TSDB_CODE_SUCCESS; @@ -4584,6 +4585,12 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } } + + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + SSessionKey key = curWin.winInfo.sessionWin; + key.win.ekey = key.win.skey; + tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo)); + } } } @@ -4974,8 +4981,6 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, goto _error; } - - SInterval interval = {.interval = pNode->interval, .sliding = pNode->sliding, .intervalUnit = pNode->intervalUnit, @@ -5523,7 +5528,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys } } - size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 4b3affa9de1babc9eaaa27b58e3314224c3449bf..1c44af49b7196a74edf4dfe1bf7f693d346ff27c 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -656,6 +656,7 @@ int32_t streamStateSessionGetKey(SStreamState* pState, const SSessionKey* key, S streamStateCurPrev(pState, pCur); } *curKey = resKey; + streamStateFreeCur(pCur); return res; } diff --git a/source/libs/stream/test/CMakeLists.txt b/source/libs/stream/test/CMakeLists.txt index 96209f681259da0ac8369a30327dcb868729b177..5a97ba45f684dc444f3e7b5cb0cfdeade8728fd1 100644 --- a/source/libs/stream/test/CMakeLists.txt +++ b/source/libs/stream/test/CMakeLists.txt @@ -9,12 +9,17 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp") TARGET_LINK_LIBRARIES( - streamUpdateTest - PUBLIC os util common gtest stream + streamUpdateTest + PUBLIC os util common gtest stream ) TARGET_INCLUDE_DIRECTORIES( - streamUpdateTest - PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/" - PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" + streamUpdateTest + PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/" + PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" +) + +add_test( + NAME streamUpdateTest + COMMAND streamUpdateTest ) \ No newline at end of file diff --git a/tests/script/tsim/stream/triggerSession0.sim b/tests/script/tsim/stream/triggerSession0.sim index 1bef4398845178be526723cdaf75863e46e3da02..4c664cf7c7b9ad8d29b0439066cefc3c130fc25e 100644 --- a/tests/script/tsim/stream/triggerSession0.sim +++ b/tests/script/tsim/stream/triggerSession0.sim @@ -5,7 +5,7 @@ sleep 50 sql connect print =============== create database -sql create database test vgroups 1 +sql create database test vgroups 1; sql select * from information_schema.ins_databases if $rows != 3 then return -1