提交 5e7e3351 编写于 作者: 5 54liuyao

feat(stream): free state memroy

上级 e2808487
...@@ -3140,6 +3140,26 @@ void destroyStreamAggSupporter(SStreamAggSupporter* pSup) { ...@@ -3140,6 +3140,26 @@ void destroyStreamAggSupporter(SStreamAggSupporter* pSup) {
blockDataDestroy(pSup->pScanBlock); blockDataDestroy(pSup->pScanBlock);
} }
void destroyStateWinInfo(void* ptr) {
if (ptr == NULL) {
return;
}
SStateWindowInfo* pWin = (SStateWindowInfo*) ptr;
taosMemoryFreeClear(pWin->stateKey.pData);
}
void destroyStateStreamAggSupporter(SStreamAggSupporter* pSup) {
taosMemoryFreeClear(pSup->pKeyBuf);
void** pIte = NULL;
while ((pIte = taosHashIterate(pSup->pResultRows, pIte)) != NULL) {
SArray* pWins = (SArray*)(*pIte);
taosArrayDestroyEx(pWins, (FDelete)destroyStateWinInfo);
}
taosHashCleanup(pSup->pResultRows);
destroyDiskbasedBuf(pSup->pResultBuf);
blockDataDestroy(pSup->pScanBlock);
}
void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) { void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param; SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo); cleanupBasicInfo(&pInfo->binfo);
...@@ -3607,12 +3627,17 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData ...@@ -3607,12 +3627,17 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
} }
} }
void deleteWindow(SArray* pWinInfos, int32_t index) { void deleteWindow(SArray* pWinInfos, int32_t index, FDelete fp) {
ASSERT(index >= 0 && index < taosArrayGetSize(pWinInfos)); ASSERT(index >= 0 && index < taosArrayGetSize(pWinInfos));
if (fp) {
void* ptr = taosArrayGet(pWinInfos, index);
fp(ptr);
}
taosArrayRemove(pWinInfos, index); taosArrayRemove(pWinInfos, index);
} }
static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, int64_t gap, SArray* result) { static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, int64_t gap,
SArray* result, FDelete fp) {
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startDatas = (TSKEY*)pStartTsCol->pData; TSKEY* startDatas = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
...@@ -3626,7 +3651,7 @@ static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc ...@@ -3626,7 +3651,7 @@ static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc
if (!pCurWin) { if (!pCurWin) {
break; break;
} }
deleteWindow(pAggSup->pCurWins, winIndex); deleteWindow(pAggSup->pCurWins, winIndex, fp);
if (result) { if (result) {
taosArrayPush(result, pCurWin); taosArrayPush(result, pCurWin);
} }
...@@ -3751,7 +3776,7 @@ SResultWindowInfo* getResWinForSession(void* pData) { return (SResultWindowInfo* ...@@ -3751,7 +3776,7 @@ SResultWindowInfo* getResWinForSession(void* pData) { return (SResultWindowInfo*
SResultWindowInfo* getResWinForState(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; } SResultWindowInfo* getResWinForState(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; }
int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArray* pClosed, __get_win_info_ fn, int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArray* pClosed, __get_win_info_ fn,
bool delete) { bool delete, FDelete fp) {
// Todo(liuyao) save window to tdb // Todo(liuyao) save window to tdb
void** pIte = NULL; void** pIte = NULL;
size_t keyLen = 0; size_t keyLen = 0;
...@@ -3773,7 +3798,7 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra ...@@ -3773,7 +3798,7 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra
pSeWin->isOutput = true; pSeWin->isOutput = true;
} }
if (delete) { if (delete) {
deleteWindow(pWins, i); deleteWindow(pWins, i, fp);
i--; i--;
size = taosArrayGetSize(pWins); size = taosArrayGetSize(pWins);
} }
...@@ -3786,13 +3811,13 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra ...@@ -3786,13 +3811,13 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void closeChildSessionWindow(SArray* pChildren, TSKEY maxTs, bool delete) { static void closeChildSessionWindow(SArray* pChildren, TSKEY maxTs, bool delete, FDelete fp) {
int32_t size = taosArrayGetSize(pChildren); int32_t size = taosArrayGetSize(pChildren);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChildOp = taosArrayGetP(pChildren, i); SOperatorInfo* pChildOp = taosArrayGetP(pChildren, i);
SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info; SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info;
pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs); pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
closeSessionWindow(pChInfo->streamAggSup.pResultRows, &pChInfo->twAggSup, NULL, getResWinForSession, delete); closeSessionWindow(pChInfo->streamAggSup.pResultRows, &pChInfo->twAggSup, NULL, getResWinForSession, delete, fp);
} }
} }
...@@ -3870,13 +3895,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -3870,13 +3895,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
// gap must be 0 // gap must be 0
doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, 0, pWins); doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, 0, pWins, NULL);
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_OP(pInfo)) {
int32_t childIndex = getChildIndex(pBlock); int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info; SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
// gap must be 0 // gap must be 0
doDeleteTimeWindows(&pChildInfo->streamAggSup, pBlock, 0, NULL); doDeleteTimeWindows(&pChildInfo->streamAggSup, pBlock, 0, NULL, NULL);
rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->exprSupp.numOfExprs, pOperator); rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->exprSupp.numOfExprs, pOperator);
} }
copyDeleteWindowInfo(pWins, pInfo->pStDeleted); copyDeleteWindowInfo(pWins, pInfo->pStDeleted);
...@@ -3918,8 +3943,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -3918,8 +3943,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForSession, closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForSession,
pInfo->ignoreExpiredData); pInfo->ignoreExpiredData, NULL);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData); closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData, NULL);
copyUpdateResult(pStUpdated, pUpdated); copyUpdateResult(pStUpdated, pUpdated);
taosHashCleanup(pStUpdated); taosHashCleanup(pStUpdated);
...@@ -4014,7 +4039,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { ...@@ -4014,7 +4039,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
break; break;
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
// gap must be 0 // gap must be 0
doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, 0, NULL); doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, 0, NULL, NULL);
copyDataBlock(pInfo->pDelRes, pBlock); copyDataBlock(pInfo->pDelRes, pBlock);
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
break; break;
...@@ -4120,7 +4145,7 @@ _error: ...@@ -4120,7 +4145,7 @@ _error:
void destroyStreamStateOperatorInfo(void* param, int32_t numOfOutput) { void destroyStreamStateOperatorInfo(void* param, int32_t numOfOutput) {
SStreamStateAggOperatorInfo* pInfo = (SStreamStateAggOperatorInfo*)param; SStreamStateAggOperatorInfo* pInfo = (SStreamStateAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo); cleanupBasicInfo(&pInfo->binfo);
destroyStreamAggSupporter(&pInfo->streamAggSup); destroyStateStreamAggSupporter(&pInfo->streamAggSup);
cleanupGroupResInfo(&pInfo->groupResInfo); cleanupGroupResInfo(&pInfo->groupResInfo);
if (pInfo->pChildren != NULL) { if (pInfo->pChildren != NULL) {
int32_t size = taosArrayGetSize(pInfo->pChildren); int32_t size = taosArrayGetSize(pInfo->pChildren);
...@@ -4132,6 +4157,10 @@ void destroyStreamStateOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -4132,6 +4157,10 @@ void destroyStreamStateOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFreeClear(pChInfo); taosMemoryFreeClear(pChInfo);
} }
} }
colDataDestroy(&pInfo->twAggSup.timeWindowData);
blockDataDestroy(pInfo->pDelRes);
taosHashCleanup(pInfo->pSeDeleted);
destroySqlFunctionCtx(pInfo->pDummyCtx, 0);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
...@@ -4314,7 +4343,7 @@ static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc ...@@ -4314,7 +4343,7 @@ static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc
pSeDeleted); pSeDeleted);
ASSERT(isTsInWindow(pCurWin, tsCol[i]) || isEqualStateKey(pCurWin, pKeyData)); ASSERT(isTsInWindow(pCurWin, tsCol[i]) || isEqualStateKey(pCurWin, pKeyData));
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
deleteWindow(pAggSup->pCurWins, winIndex); deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo);
} }
} }
...@@ -4357,7 +4386,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl ...@@ -4357,7 +4386,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey, appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey,
&pSDataBlock->info.groupId); &pSDataBlock->info.groupId);
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
deleteWindow(pAggSup->pCurWins, winIndex); deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo);
continue; continue;
} }
code = doOneStateWindowAgg(pInfo, pSDataBlock, &pCurWin->winInfo, &pResult, i, winRows, numOfOutput, pOperator); code = doOneStateWindowAgg(pInfo, pSDataBlock, &pCurWin->winInfo, &pResult, i, winRows, numOfOutput, pOperator);
...@@ -4415,7 +4444,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -4415,7 +4444,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
continue; continue;
} else if (pBlock->info.type == STREAM_DELETE_DATA) { } else if (pBlock->info.type == STREAM_DELETE_DATA) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, 0, pWins); doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, 0, pWins, destroyStateWinInfo);
copyDeleteWindowInfo(pWins, pInfo->pSeDeleted); copyDeleteWindowInfo(pWins, pInfo->pSeDeleted);
taosArrayDestroy(pWins); taosArrayDestroy(pWins);
continue; continue;
...@@ -4437,8 +4466,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -4437,8 +4466,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForState, closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForState,
pInfo->ignoreExpiredData); pInfo->ignoreExpiredData, destroyStateWinInfo);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData); // closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData, destroyStateWinInfo);
copyUpdateResult(pSeUpdated, pUpdated); copyUpdateResult(pSeUpdated, pUpdated);
taosHashCleanup(pSeUpdated); taosHashCleanup(pSeUpdated);
......
...@@ -498,4 +498,7 @@ if $data15 != 3 then ...@@ -498,4 +498,7 @@ if $data15 != 3 then
goto loop5 goto loop5
endi endi
sql drop database test;
sql drop database test1;
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册