提交 d9beaeb3 编写于 作者: 5 54liuyao

fix(stream):session window max_delay crash

上级 12fd4bfc
...@@ -469,7 +469,7 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc ...@@ -469,7 +469,7 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
bool createDummyCol) { bool createDummyCol) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SqlFunctionCtx* pCtx = pExprSup->pCtx; SqlFunctionCtx* pCtx = pExprSup->pCtx;
for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) { for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
...@@ -3036,7 +3036,8 @@ void cleanupExprSupp(SExprSupp* pSupp) { ...@@ -3036,7 +3036,8 @@ void cleanupExprSupp(SExprSupp* pSupp) {
taosMemoryFree(pSupp->rowEntryInfoOffset); taosMemoryFree(pSupp->rowEntryInfoOffset);
} }
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode,SExecTaskInfo* pTaskInfo) { SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode,
SExecTaskInfo* pTaskInfo) {
SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -3055,9 +3056,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN ...@@ -3055,9 +3056,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -4283,6 +4284,10 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta ...@@ -4283,6 +4284,10 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta
void* pVal = NULL; void* pVal = NULL;
int32_t code = streamStateSessionGet(pState, pKey, &pVal, &size); int32_t code = streamStateSessionGet(pState, pKey, &pVal, &size);
ASSERT(code == 0); ASSERT(code == 0);
if (code == -1) {
// coverity scan
continue;
}
SResultRow* pRow = (SResultRow*)pVal; SResultRow* pRow = (SResultRow*)pVal;
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset); doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
// no results, continue to check the next one // no results, continue to check the next one
......
...@@ -2667,8 +2667,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi ...@@ -2667,8 +2667,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -3986,7 +3986,8 @@ int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHa ...@@ -3986,7 +3986,8 @@ int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHa
return code; return code;
} }
} }
tSimpleHashIterateRemove(pHashMap, &pWinInfo->sessionWin, sizeof(SSessionKey), &pIte, &iter); SSessionKey* pKey = tSimpleHashGetKey(pIte, &keyLen);
tSimpleHashIterateRemove(pHashMap, pKey, sizeof(SSessionKey), &pIte, &iter);
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -4006,7 +4007,7 @@ int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated) { ...@@ -4006,7 +4007,7 @@ int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated) {
void* pIte = NULL; void* pIte = NULL;
int32_t iter = 0; int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
SResultWindowInfo* pWinInfo = *(void**)pIte; SResultWindowInfo* pWinInfo = pIte;
saveResult(*pWinInfo, pStUpdated); saveResult(*pWinInfo, pStUpdated);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -4584,6 +4585,12 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl ...@@ -4584,6 +4585,12 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
} }
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
SSessionKey key = curWin.winInfo.sessionWin;
key.win.ekey = key.win.skey;
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
}
} }
} }
...@@ -4974,8 +4981,6 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -4974,8 +4981,6 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
goto _error; goto _error;
} }
SInterval interval = {.interval = pNode->interval, SInterval interval = {.interval = pNode->interval,
.sliding = pNode->sliding, .sliding = pNode->sliding,
.intervalUnit = pNode->intervalUnit, .intervalUnit = pNode->intervalUnit,
...@@ -5523,7 +5528,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -5523,7 +5528,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
} }
} }
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
......
...@@ -656,6 +656,7 @@ int32_t streamStateSessionGetKey(SStreamState* pState, const SSessionKey* key, S ...@@ -656,6 +656,7 @@ int32_t streamStateSessionGetKey(SStreamState* pState, const SSessionKey* key, S
streamStateCurPrev(pState, pCur); streamStateCurPrev(pState, pCur);
} }
*curKey = resKey; *curKey = resKey;
streamStateFreeCur(pCur);
return res; return res;
} }
......
...@@ -9,12 +9,17 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) ...@@ -9,12 +9,17 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp") ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp")
TARGET_LINK_LIBRARIES( TARGET_LINK_LIBRARIES(
streamUpdateTest streamUpdateTest
PUBLIC os util common gtest stream PUBLIC os util common gtest stream
) )
TARGET_INCLUDE_DIRECTORIES( TARGET_INCLUDE_DIRECTORIES(
streamUpdateTest streamUpdateTest
PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/" PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/"
PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
)
add_test(
NAME streamUpdateTest
COMMAND streamUpdateTest
) )
\ No newline at end of file
...@@ -5,7 +5,7 @@ sleep 50 ...@@ -5,7 +5,7 @@ sleep 50
sql connect sql connect
print =============== create database print =============== create database
sql create database test vgroups 1 sql create database test vgroups 1;
sql select * from information_schema.ins_databases sql select * from information_schema.ins_databases
if $rows != 3 then if $rows != 3 then
return -1 return -1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册