提交 f2364579 编写于 作者: L liuyao

feat:add data version

上级 9a1b2a6d
......@@ -142,9 +142,7 @@ typedef struct {
SQueryTableDataCond tableCond;
int64_t fillHistoryVer1;
int64_t fillHistoryVer2;
// int8_t triggerSaved;
// int64_t deleteMarkSaved;
int64_t dataVersion;
SStreamState* pState;
} SStreamTaskInfo;
......@@ -575,6 +573,7 @@ typedef struct SStreamIntervalOperatorInfo {
uint64_t numOfDatapack;
SArray* pUpdated;
SSHashObj* pUpdatedMap;
int64_t dataVersion;
} SStreamIntervalOperatorInfo;
typedef struct SDataGroupInfo {
......@@ -623,6 +622,7 @@ typedef struct SStreamSessionAggOperatorInfo {
bool ignoreExpiredData;
SArray* pUpdated;
SSHashObj* pStUpdated;
int64_t dataVersion;
} SStreamSessionAggOperatorInfo;
typedef struct SStreamStateAggOperatorInfo {
......@@ -640,6 +640,7 @@ typedef struct SStreamStateAggOperatorInfo {
bool ignoreExpiredData;
SArray* pUpdated;
SSHashObj* pSeUpdated;
int64_t dataVersion;
} SStreamStateAggOperatorInfo;
typedef struct SStreamPartitionOperatorInfo {
......
......@@ -2409,9 +2409,14 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN
return startPos;
}
static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version) {
pTaskInfo->streamInfo.dataVersion = version;
}
static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId,
SSHashObj* pUpdatedMap) {
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info;
pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
......@@ -2599,6 +2604,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
clearFunctionContext(&pOperator->exprSupp);
// semi interval operator clear disk buffer
clearStreamIntervalOperator(pInfo);
setStreamDataVersion(pTaskInfo, pInfo->dataVersion);
qDebug("===stream===clear semi operator");
} else {
deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark,
......@@ -2883,6 +2889,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->pUpdatedMap = NULL;
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, compareTs,
pInfo->pState, pInfo->twAggSup.deleteMark);
pInfo->dataVersion = 0;
pOperator->operatorType = pPhyNode->type;
pOperator->blocking = true;
......@@ -3233,6 +3240,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
int32_t rows = pSDataBlock->info.rows;
int32_t winRows = 0;
pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);
SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
TSKEY* startTsCols = (int64_t*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = NULL;
......@@ -3695,6 +3704,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pInfo->ignoreExpiredData = pSessionNode->window.igExpired;
pInfo->pUpdated = NULL;
pInfo->pStUpdated = NULL;
pInfo->dataVersion = 0;
setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
OP_NOT_OPENED, pInfo, pTaskInfo);
......@@ -4005,6 +4015,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
TSKEY* tsCols = NULL;
SResultRow* pResult = NULL;
int32_t winRows = 0;
pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);
if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData;
......@@ -4220,6 +4233,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->ignoreExpiredData = pStateNode->window.igExpired;
pInfo->pUpdated = NULL;
pInfo->pSeUpdated = NULL;
pInfo->dataVersion = 0;
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
......@@ -4856,6 +4870,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
setOperatorCompleted(pOperator);
if (pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) {
streamStateCommit(pInfo->pState);
setStreamDataVersion(pTaskInfo, pInfo->dataVersion);
pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs;
}
return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册