提交 f18cfc25 编写于 作者: 5 54liuyao

fix:reset parameter ignoreExpried for fill history

上级 31152651
...@@ -559,6 +559,7 @@ typedef struct SStreamIntervalOperatorInfo { ...@@ -559,6 +559,7 @@ typedef struct SStreamIntervalOperatorInfo {
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
bool invertible; bool invertible;
bool ignoreExpiredData; bool ignoreExpiredData;
bool ignoreExpiredDataSaved;
SArray* pDelWins; // SWinRes SArray* pDelWins; // SWinRes
int32_t delIndex; int32_t delIndex;
SSDataBlock* pDelRes; SSDataBlock* pDelRes;
...@@ -620,6 +621,7 @@ typedef struct SStreamSessionAggOperatorInfo { ...@@ -620,6 +621,7 @@ typedef struct SStreamSessionAggOperatorInfo {
SPhysiNode* pPhyNode; // create new child SPhysiNode* pPhyNode; // create new child
bool isFinal; bool isFinal;
bool ignoreExpiredData; bool ignoreExpiredData;
bool ignoreExpiredDataSaved;
SArray* pUpdated; SArray* pUpdated;
SSHashObj* pStUpdated; SSHashObj* pStUpdated;
} SStreamSessionAggOperatorInfo; } SStreamSessionAggOperatorInfo;
...@@ -637,6 +639,7 @@ typedef struct SStreamStateAggOperatorInfo { ...@@ -637,6 +639,7 @@ typedef struct SStreamStateAggOperatorInfo {
void* pDelIterator; void* pDelIterator;
SArray* pChildren; // cache for children's result; SArray* pChildren; // cache for children's result;
bool ignoreExpiredData; bool ignoreExpiredData;
bool ignoreExpiredDataSaved;
SArray* pUpdated; SArray* pUpdated;
SSHashObj* pSeUpdated; SSHashObj* pSeUpdated;
} SStreamStateAggOperatorInfo; } SStreamStateAggOperatorInfo;
......
...@@ -888,7 +888,8 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) { ...@@ -888,7 +888,8 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark; pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX; pInfo->twAggSup.deleteMark = INT64_MAX;
pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
pInfo->ignoreExpiredData = false;
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
...@@ -904,6 +905,8 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) { ...@@ -904,6 +905,8 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark; pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX; pInfo->twAggSup.deleteMark = INT64_MAX;
pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
pInfo->ignoreExpiredData = false;
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info; SStreamStateAggOperatorInfo* pInfo = pOperator->info;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
...@@ -917,6 +920,8 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) { ...@@ -917,6 +920,8 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark; pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX; pInfo->twAggSup.deleteMark = INT64_MAX;
pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
pInfo->ignoreExpiredData = false;
} }
// iterate operator tree // iterate operator tree
...@@ -944,35 +949,23 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) { ...@@ -944,35 +949,23 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) { pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/
/*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/ pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
/*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/
qInfo("restore stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); qInfo("restore stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/
/*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/ pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
/*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/
qInfo("restore stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); qInfo("restore stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info; SStreamStateAggOperatorInfo* pInfo = pOperator->info;
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/
/*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/ pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
/*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/
qInfo("restore stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); qInfo("restore stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
} }
......
...@@ -2769,6 +2769,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -2769,6 +2769,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK); pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK);
pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE); pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
pInfo->ignoreExpiredDataSaved = false;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->delIndex = 0; pInfo->delIndex = 0;
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
...@@ -3587,6 +3588,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -3587,6 +3588,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pInfo->isFinal = false; pInfo->isFinal = false;
pInfo->pPhyNode = pPhyNode; pInfo->pPhyNode = pPhyNode;
pInfo->ignoreExpiredData = pSessionNode->window.igExpired; pInfo->ignoreExpiredData = pSessionNode->window.igExpired;
pInfo->ignoreExpiredDataSaved = false;
pInfo->pUpdated = NULL; pInfo->pUpdated = NULL;
pInfo->pStUpdated = NULL; pInfo->pStUpdated = NULL;
...@@ -4112,6 +4114,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4112,6 +4114,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->pChildren = NULL; pInfo->pChildren = NULL;
pInfo->ignoreExpiredData = pStateNode->window.igExpired; pInfo->ignoreExpiredData = pStateNode->window.igExpired;
pInfo->ignoreExpiredDataSaved = false;
pInfo->pUpdated = NULL; pInfo->pUpdated = NULL;
pInfo->pSeUpdated = NULL; pInfo->pSeUpdated = NULL;
...@@ -4885,6 +4888,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4885,6 +4888,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->interval = interval; pInfo->interval = interval;
pInfo->twAggSup = twAggSupp; pInfo->twAggSup = twAggSupp;
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
pInfo->ignoreExpiredDataSaved = false;
pInfo->isFinal = false; pInfo->isFinal = false;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册