提交 d429e070 编写于 作者: L liuyao

fix fill bug

上级 f15b806e
......@@ -112,6 +112,7 @@ typedef struct SStreamFillInfo {
int32_t pos;
SArray* delRanges;
int32_t delIndex;
uint64_t curGroupId;
} SStreamFillInfo;
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows);
......
......@@ -520,7 +520,7 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId,
pFillSup->cur.pRowVal = curVal;
SStreamStateCur* pCur = streamStateFillSeekKeyPrev(pState, &key);
SWinKey preKey = {.groupId = groupId};
SWinKey preKey = {.ts = INT64_MIN, .groupId = groupId};
void* preVal = NULL;
int32_t preVLen = 0;
code = streamStateGetGroupKVByCur(pCur, &preKey, (const void**)&preVal, &preVLen);
......@@ -542,7 +542,7 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId,
pCur = streamStateFillSeekKeyNext(pState, &key);
}
SWinKey nextKey = {.groupId = groupId};
SWinKey nextKey = {.ts = INT64_MIN, .groupId = groupId};
void* nextVal = NULL;
int32_t nextVLen = 0;
code = streamStateGetGroupKVByCur(pCur, &nextKey, (const void**)&nextVal, &nextVLen);
......@@ -1242,6 +1242,11 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
}
printDataBlock(pBlock, "stream fill recv");
if (pInfo->pFillInfo->curGroupId != pBlock->info.id.groupId) {
pInfo->pFillInfo->curGroupId = pBlock->info.id.groupId;
pInfo->pFillInfo->preRowKey = INT64_MIN;
}
switch (pBlock->info.type) {
case STREAM_RETRIEVE:
return pBlock;
......@@ -1392,6 +1397,7 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
pFillInfo->type = pFillSup->type;
pFillInfo->delRanges = taosArrayInit(16, sizeof(STimeRange));
pFillInfo->delIndex = 0;
pFillInfo->curGroupId = 0;
return pFillInfo;
}
......
......@@ -2854,7 +2854,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin
SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
pScanInfo->pState = pAggSup->pState;
if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) {
if ( (!pScanInfo->igCheckUpdate || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) && !pScanInfo->pUpdateInfo ) {
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark);
}
pScanInfo->twAggSup = *pTwSup;
......
......@@ -102,8 +102,8 @@ print step 2
sql create database test2 vgroups 1;
sql use test2;
sql create table t1(ts timestamp, a int, b int , c int, d double);
print create stream streams2 trigger at_once watermark 1000s into streamt2 as select _wstart, count(*) c1, count(d) c2 from t1 partition by b state_window(a)
sql create stream streams2 trigger at_once watermark 1000s into streamt2 as select _wstart, count(*) c1, count(d) c2 from t1 partition by b state_window(a);
print create stream streams2 trigger at_once watermark 1000s IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(*) c1, count(d) c2 from t1 partition by b state_window(a)
sql create stream streams2 trigger at_once watermark 1000s IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(*) c1, count(d) c2 from t1 partition by b state_window(a);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791213010,1,2,3,1.1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册