提交 cf5d3283 编写于 作者: G Ganlin Zhao

Merge branch '3.0' into fix/TD-17740

......@@ -3140,6 +3140,26 @@ void destroyStreamAggSupporter(SStreamAggSupporter* pSup) {
blockDataDestroy(pSup->pScanBlock);
}
void destroyStateWinInfo(void* ptr) {
if (ptr == NULL) {
return;
}
SStateWindowInfo* pWin = (SStateWindowInfo*) ptr;
taosMemoryFreeClear(pWin->stateKey.pData);
}
void destroyStateStreamAggSupporter(SStreamAggSupporter* pSup) {
taosMemoryFreeClear(pSup->pKeyBuf);
void** pIte = NULL;
while ((pIte = taosHashIterate(pSup->pResultRows, pIte)) != NULL) {
SArray* pWins = (SArray*)(*pIte);
taosArrayDestroyEx(pWins, (FDelete)destroyStateWinInfo);
}
taosHashCleanup(pSup->pResultRows);
destroyDiskbasedBuf(pSup->pResultBuf);
blockDataDestroy(pSup->pScanBlock);
}
void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
......@@ -3607,12 +3627,17 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
}
}
void deleteWindow(SArray* pWinInfos, int32_t index) {
void deleteWindow(SArray* pWinInfos, int32_t index, FDelete fp) {
ASSERT(index >= 0 && index < taosArrayGetSize(pWinInfos));
if (fp) {
void* ptr = taosArrayGet(pWinInfos, index);
fp(ptr);
}
taosArrayRemove(pWinInfos, index);
}
static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, int64_t gap, SArray* result) {
static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, int64_t gap,
SArray* result, FDelete fp) {
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startDatas = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
......@@ -3626,7 +3651,7 @@ static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc
if (!pCurWin) {
break;
}
deleteWindow(pAggSup->pCurWins, winIndex);
deleteWindow(pAggSup->pCurWins, winIndex, fp);
if (result) {
taosArrayPush(result, pCurWin);
}
......@@ -3751,7 +3776,7 @@ SResultWindowInfo* getResWinForSession(void* pData) { return (SResultWindowInfo*
SResultWindowInfo* getResWinForState(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; }
int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArray* pClosed, __get_win_info_ fn,
bool delete) {
bool delete, FDelete fp) {
// Todo(liuyao) save window to tdb
void** pIte = NULL;
size_t keyLen = 0;
......@@ -3773,7 +3798,7 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra
pSeWin->isOutput = true;
}
if (delete) {
deleteWindow(pWins, i);
deleteWindow(pWins, i, fp);
i--;
size = taosArrayGetSize(pWins);
}
......@@ -3786,13 +3811,13 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra
return TSDB_CODE_SUCCESS;
}
static void closeChildSessionWindow(SArray* pChildren, TSKEY maxTs, bool delete) {
static void closeChildSessionWindow(SArray* pChildren, TSKEY maxTs, bool delete, FDelete fp) {
int32_t size = taosArrayGetSize(pChildren);
for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChildOp = taosArrayGetP(pChildren, i);
SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info;
pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
closeSessionWindow(pChInfo->streamAggSup.pResultRows, &pChInfo->twAggSup, NULL, getResWinForSession, delete);
closeSessionWindow(pChInfo->streamAggSup.pResultRows, &pChInfo->twAggSup, NULL, getResWinForSession, delete, fp);
}
}
......@@ -3870,13 +3895,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
// gap must be 0
doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, 0, pWins);
doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, 0, pWins, NULL);
if (IS_FINAL_OP(pInfo)) {
int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
// gap must be 0
doDeleteTimeWindows(&pChildInfo->streamAggSup, pBlock, 0, NULL);
doDeleteTimeWindows(&pChildInfo->streamAggSup, pBlock, 0, NULL, NULL);
rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->exprSupp.numOfExprs, pOperator);
}
copyDeleteWindowInfo(pWins, pInfo->pStDeleted);
......@@ -3918,8 +3943,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForSession,
pInfo->ignoreExpiredData);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData);
pInfo->ignoreExpiredData, NULL);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData, NULL);
copyUpdateResult(pStUpdated, pUpdated);
taosHashCleanup(pStUpdated);
......@@ -4014,7 +4039,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
break;
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
// gap must be 0
doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, 0, NULL);
doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, 0, NULL, NULL);
copyDataBlock(pInfo->pDelRes, pBlock);
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
break;
......@@ -4120,7 +4145,7 @@ _error:
void destroyStreamStateOperatorInfo(void* param, int32_t numOfOutput) {
SStreamStateAggOperatorInfo* pInfo = (SStreamStateAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
destroyStreamAggSupporter(&pInfo->streamAggSup);
destroyStateStreamAggSupporter(&pInfo->streamAggSup);
cleanupGroupResInfo(&pInfo->groupResInfo);
if (pInfo->pChildren != NULL) {
int32_t size = taosArrayGetSize(pInfo->pChildren);
......@@ -4132,6 +4157,10 @@ void destroyStreamStateOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFreeClear(pChInfo);
}
}
colDataDestroy(&pInfo->twAggSup.timeWindowData);
blockDataDestroy(pInfo->pDelRes);
taosHashCleanup(pInfo->pSeDeleted);
destroySqlFunctionCtx(pInfo->pDummyCtx, 0);
taosMemoryFreeClear(param);
}
......@@ -4314,7 +4343,7 @@ static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc
pSeDeleted);
ASSERT(isTsInWindow(pCurWin, tsCol[i]) || isEqualStateKey(pCurWin, pKeyData));
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
deleteWindow(pAggSup->pCurWins, winIndex);
deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo);
}
}
......@@ -4357,7 +4386,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey,
&pSDataBlock->info.groupId);
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
deleteWindow(pAggSup->pCurWins, winIndex);
deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo);
continue;
}
code = doOneStateWindowAgg(pInfo, pSDataBlock, &pCurWin->winInfo, &pResult, i, winRows, numOfOutput, pOperator);
......@@ -4415,7 +4444,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
continue;
} else if (pBlock->info.type == STREAM_DELETE_DATA) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, 0, pWins);
doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, 0, pWins, destroyStateWinInfo);
copyDeleteWindowInfo(pWins, pInfo->pSeDeleted);
taosArrayDestroy(pWins);
continue;
......@@ -4437,8 +4466,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForState,
pInfo->ignoreExpiredData);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData);
pInfo->ignoreExpiredData, destroyStateWinInfo);
// closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData, destroyStateWinInfo);
copyUpdateResult(pSeUpdated, pUpdated);
taosHashCleanup(pSeUpdated);
......
......@@ -498,4 +498,7 @@ if $data15 != 3 then
goto loop5
endi
sql drop database test;
sql drop database test1;
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册