From d9beaeb3a36179a142b3d89a938c6c5cb85b1b44 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 21 Oct 2022 10:37:58 +0800 Subject: [PATCH] fix(stream):session window max_delay crash --- source/libs/executor/src/executorimpl.c | 15 ++++++++++----- source/libs/executor/src/timewindowoperator.c | 19 ++++++++++++------- source/libs/stream/src/streamState.c | 1 + source/libs/stream/test/CMakeLists.txt | 15 ++++++++++----- tests/script/tsim/stream/triggerSession0.sim | 2 +- 5 files changed, 34 insertions(+), 18 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a6ca563d76..f703ebb355 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -469,7 +469,7 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; SqlFunctionCtx* pCtx = pExprSup->pCtx; for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) { @@ -3036,7 +3036,8 @@ void cleanupExprSupp(SExprSupp* pSupp) { taosMemoryFree(pSupp->rowEntryInfoOffset); } -SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode,SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, + SExecTaskInfo* pTaskInfo) { SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -3055,9 +3056,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); - int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); - int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); + int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4283,6 +4284,10 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta void* pVal = NULL; int32_t code = streamStateSessionGet(pState, pKey, &pVal, &size); ASSERT(code == 0); + if (code == -1) { + // coverity scan + continue; + } SResultRow* pRow = (SResultRow*)pVal; doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset); // no results, continue to check the next one diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 4ff5722748..823b457f89 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2667,8 +2667,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { @@ -3986,7 +3986,8 @@ int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHa return code; } } - tSimpleHashIterateRemove(pHashMap, &pWinInfo->sessionWin, sizeof(SSessionKey), &pIte, &iter); + SSessionKey* pKey = tSimpleHashGetKey(pIte, &keyLen); + tSimpleHashIterateRemove(pHashMap, pKey, sizeof(SSessionKey), &pIte, &iter); } } return TSDB_CODE_SUCCESS; @@ -4006,7 +4007,7 @@ int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated) { void* pIte = NULL; int32_t iter = 0; while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { - SResultWindowInfo* pWinInfo = *(void**)pIte; + SResultWindowInfo* pWinInfo = pIte; saveResult(*pWinInfo, pStUpdated); } return TSDB_CODE_SUCCESS; @@ -4584,6 +4585,12 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } } + + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + SSessionKey key = curWin.winInfo.sessionWin; + key.win.ekey = key.win.skey; + tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo)); + } } } @@ -4974,8 +4981,6 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, goto _error; } - - SInterval interval = {.interval = pNode->interval, .sliding = pNode->sliding, .intervalUnit = pNode->intervalUnit, @@ -5523,7 +5528,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys } } - size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 4b3affa9de..1c44af49b7 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -656,6 +656,7 @@ int32_t streamStateSessionGetKey(SStreamState* pState, const SSessionKey* key, S streamStateCurPrev(pState, pCur); } *curKey = resKey; + streamStateFreeCur(pCur); return res; } diff --git a/source/libs/stream/test/CMakeLists.txt b/source/libs/stream/test/CMakeLists.txt index 96209f6812..5a97ba45f6 100644 --- a/source/libs/stream/test/CMakeLists.txt +++ b/source/libs/stream/test/CMakeLists.txt @@ -9,12 +9,17 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp") TARGET_LINK_LIBRARIES( - streamUpdateTest - PUBLIC os util common gtest stream + streamUpdateTest + PUBLIC os util common gtest stream ) TARGET_INCLUDE_DIRECTORIES( - streamUpdateTest - PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/" - PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" + streamUpdateTest + PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/" + PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" +) + +add_test( + NAME streamUpdateTest + COMMAND streamUpdateTest ) \ No newline at end of file diff --git a/tests/script/tsim/stream/triggerSession0.sim b/tests/script/tsim/stream/triggerSession0.sim index 1bef439884..4c664cf7c7 100644 --- a/tests/script/tsim/stream/triggerSession0.sim +++ b/tests/script/tsim/stream/triggerSession0.sim @@ -5,7 +5,7 @@ sleep 50 sql connect print =============== create database -sql create database test vgroups 1 +sql create database test vgroups 1; sql select * from information_schema.ins_databases if $rows != 3 then return -1 -- GitLab