提交 93f546f5 编写于 作者: L liuyao

stream operator decode

上级 87bedef2
...@@ -2616,7 +2616,7 @@ int32_t doStreamIntervalEncodeOpState(void **buf, SOperatorInfo* pOperator) { ...@@ -2616,7 +2616,7 @@ int32_t doStreamIntervalEncodeOpState(void **buf, SOperatorInfo* pOperator) {
tlen += encodeSPullWindowInfoArray(buf, pInfo->pPullWins); tlen += encodeSPullWindowInfoArray(buf, pInfo->pPullWins);
// 5.dataVersion // 5.dataVersion
tlen += taosEncodeFixedI32(buf, pInfo->dataVersion); tlen += taosEncodeFixedI64(buf, pInfo->dataVersion);
return tlen; return tlen;
} }
...@@ -3086,6 +3086,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -3086,6 +3086,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
if (res == TSDB_CODE_SUCCESS) { if (res == TSDB_CODE_SUCCESS) {
doStreamIntervalDecodeOpState(buff, pOperator); doStreamIntervalDecodeOpState(buff, pOperator);
taosMemoryFree(buff); taosMemoryFree(buff);
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
} }
return pOperator; return pOperator;
...@@ -4153,6 +4154,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -4153,6 +4154,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
if (res == TSDB_CODE_SUCCESS) { if (res == TSDB_CODE_SUCCESS) {
doStreamSessionDecodeOpState(buff, pOperator); doStreamSessionDecodeOpState(buff, pOperator);
taosMemoryFree(buff); taosMemoryFree(buff);
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->streamAggSup.pState->checkPointId);
} }
setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
...@@ -4876,6 +4878,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4876,6 +4878,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
if (res == TSDB_CODE_SUCCESS) { if (res == TSDB_CODE_SUCCESS) {
doStreamStateDecodeOpState(buff, pOperator); doStreamStateDecodeOpState(buff, pOperator);
taosMemoryFree(buff); taosMemoryFree(buff);
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->streamAggSup.pState->checkPointId);
} }
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
...@@ -5732,6 +5735,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -5732,6 +5735,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
if (res == TSDB_CODE_SUCCESS) { if (res == TSDB_CODE_SUCCESS) {
doStreamIntervalDecodeOpState(buff, pOperator); doStreamIntervalDecodeOpState(buff, pOperator);
taosMemoryFree(buff); taosMemoryFree(buff);
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
} }
initIntervalDownStream(downstream, pPhyNode->type, pInfo); initIntervalDownStream(downstream, pPhyNode->type, pInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册