提交 17f2c3f8 编写于 作者: L liuyao

reset wind info

上级 6a9c0330
...@@ -456,6 +456,7 @@ typedef struct SStreamIntervalOperatorInfo { ...@@ -456,6 +456,7 @@ typedef struct SStreamIntervalOperatorInfo {
SSHashObj* pUpdatedMap; SSHashObj* pUpdatedMap;
int64_t dataVersion; int64_t dataVersion;
SStateStore statestore; SStateStore statestore;
bool recvGetAll;
} SStreamIntervalOperatorInfo; } SStreamIntervalOperatorInfo;
typedef struct SDataGroupInfo { typedef struct SDataGroupInfo {
......
...@@ -2439,6 +2439,15 @@ static inline int winPosCmprImpl(const void* pKey1, const void* pKey2) { ...@@ -2439,6 +2439,15 @@ static inline int winPosCmprImpl(const void* pKey1, const void* pKey2) {
return 0; return 0;
} }
static void resetUnCloseWinInfo(SSHashObj* winMap) {
void* pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(winMap, pIte, &iter)) != NULL) {
SRowBuffPos* pPos = *(SRowBuffPos**)pIte;
pPos->beUsed = true;
}
}
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
...@@ -2472,6 +2481,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2472,6 +2481,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }
if (pInfo->recvGetAll) {
pInfo->recvGetAll = false;
resetUnCloseWinInfo(pInfo->aggSup.pResultRowHashTable);
}
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
if (!IS_FINAL_OP(pInfo)) { if (!IS_FINAL_OP(pInfo)) {
clearFunctionContext(&pOperator->exprSupp); clearFunctionContext(&pOperator->exprSupp);
...@@ -2565,6 +2579,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2565,6 +2579,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
break; break;
} else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) { } else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) {
pInfo->recvGetAll = true;
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap); getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap);
continue; continue;
} else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) { } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
...@@ -2773,6 +2788,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -2773,6 +2788,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
compareTs, pInfo->pState, pInfo->twAggSup.deleteMark); compareTs, pInfo->pState, pInfo->twAggSup.deleteMark);
pInfo->dataVersion = 0; pInfo->dataVersion = 0;
pInfo->statestore = pTaskInfo->storageAPI.stateStore; pInfo->statestore = pTaskInfo->storageAPI.stateStore;
pInfo->recvGetAll = false;
pOperator->operatorType = pPhyNode->type; pOperator->operatorType = pPhyNode->type;
pOperator->blocking = true; pOperator->blocking = true;
...@@ -4751,6 +4767,12 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4751,6 +4767,12 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
printDataBlock(pInfo->binfo.pRes, "single interval"); printDataBlock(pInfo->binfo.pRes, "single interval");
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }
if (pInfo->recvGetAll) {
pInfo->recvGetAll = false;
resetUnCloseWinInfo(pInfo->aggSup.pResultRowHashTable);
}
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
if (pInfo->twAggSup.maxTs > 0 && if (pInfo->twAggSup.maxTs > 0 &&
pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) { pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) {
...@@ -4790,6 +4812,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4790,6 +4812,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
continue; continue;
} else if (pBlock->info.type == STREAM_GET_ALL) { } else if (pBlock->info.type == STREAM_GET_ALL) {
qDebug("===stream===single interval recv|block type STREAM_GET_ALL"); qDebug("===stream===single interval recv|block type STREAM_GET_ALL");
pInfo->recvGetAll = true;
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap); getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap);
continue; continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
...@@ -4960,6 +4983,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4960,6 +4983,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL); destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL);
pInfo->statestore = pTaskInfo->storageAPI.stateStore; pInfo->statestore = pTaskInfo->storageAPI.stateStore;
pInfo->recvGetAll = false;
initIntervalDownStream(downstream, pPhyNode->type, pInfo); initIntervalDownStream(downstream, pPhyNode->type, pInfo);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
......
...@@ -137,7 +137,7 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) { ...@@ -137,7 +137,7 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
SListNode* pNode = NULL; SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL) { while ((pNode = tdListNext(&iter)) != NULL) {
SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data); SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data);
if (all || (pFileState->getTs(pPos->pKey) < ts)) { if (all || (pFileState->getTs(pPos->pKey) < ts && !pPos->beUsed)) {
ASSERT(pPos->pRowBuff != NULL); ASSERT(pPos->pRowBuff != NULL);
tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff)); tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
pPos->pRowBuff = NULL; pPos->pRowBuff = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册