未验证 提交 cb7bb954 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #21577 from taosdata/fix/TD-24515

reset unclosed window info
......@@ -456,6 +456,7 @@ typedef struct SStreamIntervalOperatorInfo {
SSHashObj* pUpdatedMap;
int64_t dataVersion;
SStateStore statestore;
bool recvGetAll;
} SStreamIntervalOperatorInfo;
typedef struct SDataGroupInfo {
......
......@@ -2439,6 +2439,15 @@ static inline int winPosCmprImpl(const void* pKey1, const void* pKey2) {
return 0;
}
static void resetUnCloseWinInfo(SSHashObj* winMap) {
void* pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(winMap, pIte, &iter)) != NULL) {
SRowBuffPos* pPos = *(SRowBuffPos**)pIte;
pPos->beUsed = true;
}
}
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -2472,6 +2481,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->binfo.pRes;
}
if (pInfo->recvGetAll) {
pInfo->recvGetAll = false;
resetUnCloseWinInfo(pInfo->aggSup.pResultRowHashTable);
}
setOperatorCompleted(pOperator);
if (!IS_FINAL_OP(pInfo)) {
clearFunctionContext(&pOperator->exprSupp);
......@@ -2565,6 +2579,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
break;
} else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) {
pInfo->recvGetAll = true;
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap);
continue;
} else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
......@@ -2773,6 +2788,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
compareTs, pInfo->pState, pInfo->twAggSup.deleteMark);
pInfo->dataVersion = 0;
pInfo->statestore = pTaskInfo->storageAPI.stateStore;
pInfo->recvGetAll = false;
pOperator->operatorType = pPhyNode->type;
pOperator->blocking = true;
......@@ -4751,6 +4767,12 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
printDataBlock(pInfo->binfo.pRes, "single interval");
return pInfo->binfo.pRes;
}
if (pInfo->recvGetAll) {
pInfo->recvGetAll = false;
resetUnCloseWinInfo(pInfo->aggSup.pResultRowHashTable);
}
setOperatorCompleted(pOperator);
if (pInfo->twAggSup.maxTs > 0 &&
pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) {
......@@ -4790,6 +4812,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
continue;
} else if (pBlock->info.type == STREAM_GET_ALL) {
qDebug("===stream===single interval recv|block type STREAM_GET_ALL");
pInfo->recvGetAll = true;
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
......@@ -4960,6 +4983,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL);
pInfo->statestore = pTaskInfo->storageAPI.stateStore;
pInfo->recvGetAll = false;
initIntervalDownStream(downstream, pPhyNode->type, pInfo);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -137,7 +137,7 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL) {
SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data);
if (all || (pFileState->getTs(pPos->pKey) < ts)) {
if (all || (pFileState->getTs(pPos->pKey) < ts && !pPos->beUsed)) {
ASSERT(pPos->pRowBuff != NULL);
tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
pPos->pRowBuff = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册