未验证 提交 3bf3300c 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #19092 from taosdata/fix/TD-21449

fix:timestampe is out of order
...@@ -657,7 +657,6 @@ typedef struct SStreamFillOperatorInfo { ...@@ -657,7 +657,6 @@ typedef struct SStreamFillOperatorInfo {
SSDataBlock* pRes; SSDataBlock* pRes;
SSDataBlock* pSrcBlock; SSDataBlock* pSrcBlock;
int32_t srcRowIndex; int32_t srcRowIndex;
SSDataBlock* pPrevSrcBlock;
SSDataBlock* pSrcDelBlock; SSDataBlock* pSrcDelBlock;
int32_t srcDelRowIndex; int32_t srcDelRowIndex;
SSDataBlock* pDelRes; SSDataBlock* pDelRes;
......
...@@ -470,7 +470,6 @@ static void destroyStreamFillOperatorInfo(void* param) { ...@@ -470,7 +470,6 @@ static void destroyStreamFillOperatorInfo(void* param) {
pInfo->pFillSup = destroyStreamFillSupporter(pInfo->pFillSup); pInfo->pFillSup = destroyStreamFillSupporter(pInfo->pFillSup);
pInfo->pRes = blockDataDestroy(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
pInfo->pSrcBlock = blockDataDestroy(pInfo->pSrcBlock); pInfo->pSrcBlock = blockDataDestroy(pInfo->pSrcBlock);
pInfo->pPrevSrcBlock = blockDataDestroy(pInfo->pPrevSrcBlock);
pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes); pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes);
pInfo->matchInfo.pList = taosArrayDestroy(pInfo->matchInfo.pList); pInfo->matchInfo.pList = taosArrayDestroy(pInfo->matchInfo.pList);
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
...@@ -992,12 +991,6 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) { ...@@ -992,12 +991,6 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) {
if (pInfo->srcRowIndex == 0) { if (pInfo->srcRowIndex == 0) {
keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize); keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize);
SSDataBlock* preBlock = pInfo->pPrevSrcBlock;
if (preBlock->info.rows > 0) {
int preRowId = preBlock->info.rows - 1;
SColumnInfoData* pPreTsCol = taosArrayGet(preBlock->pDataBlock, pInfo->primaryTsCol);
doFillResults(pOperator, pFillSup, pFillInfo, preBlock, (TSKEY*)pPreTsCol->pData, preRowId, pRes);
}
pInfo->srcRowIndex++; pInfo->srcRowIndex++;
} }
...@@ -1011,9 +1004,8 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) { ...@@ -1011,9 +1004,8 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) {
} }
pInfo->srcRowIndex++; pInfo->srcRowIndex++;
} }
doFillResults(pOperator, pFillSup, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex - 1, pRes);
blockDataUpdateTsWindow(pRes, pInfo->primaryTsCol); blockDataUpdateTsWindow(pRes, pInfo->primaryTsCol);
blockDataCleanup(pInfo->pPrevSrcBlock);
copyDataBlock(pInfo->pPrevSrcBlock, pInfo->pSrcBlock);
blockDataCleanup(pInfo->pSrcBlock); blockDataCleanup(pInfo->pSrcBlock);
} }
...@@ -1173,7 +1165,6 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) { ...@@ -1173,7 +1165,6 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) {
} }
static void resetStreamFillInfo(SStreamFillOperatorInfo* pInfo) { static void resetStreamFillInfo(SStreamFillOperatorInfo* pInfo) {
blockDataCleanup(pInfo->pPrevSrcBlock);
tSimpleHashClear(pInfo->pFillSup->pResMap); tSimpleHashClear(pInfo->pFillSup->pResMap);
pInfo->pFillSup->hasDelete = false; pInfo->pFillSup->hasDelete = false;
taosArrayClear(pInfo->pFillInfo->delRanges); taosArrayClear(pInfo->pFillInfo->delRanges);
...@@ -1231,13 +1222,6 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { ...@@ -1231,13 +1222,6 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL) {
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
SSDataBlock* preBlock = pInfo->pPrevSrcBlock;
if (preBlock->info.rows > 0) {
int preRowId = preBlock->info.rows - 1;
SColumnInfoData* pPreTsCol = taosArrayGet(preBlock->pDataBlock, pInfo->primaryTsCol);
doFillResults(pOperator, pInfo->pFillSup, pInfo->pFillInfo, preBlock, (TSKEY*)pPreTsCol->pData, preRowId,
pInfo->pRes);
}
pInfo->pFillInfo->preRowKey = INT64_MIN; pInfo->pFillInfo->preRowKey = INT64_MIN;
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, "stream fill"); printDataBlock(pInfo->pRes, "stream fill");
...@@ -1411,10 +1395,8 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi ...@@ -1411,10 +1395,8 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
pInfo->pPrevSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
blockDataEnsureCapacity(pInfo->pSrcBlock, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pSrcBlock, pOperator->resultInfo.capacity);
blockDataEnsureCapacity(pInfo->pPrevSrcBlock, pOperator->resultInfo.capacity);
pInfo->pFillInfo = initStreamFillInfo(pInfo->pFillSup, pInfo->pRes); pInfo->pFillInfo = initStreamFillInfo(pInfo->pFillSup, pInfo->pRes);
if (!pInfo->pFillInfo) { if (!pInfo->pFillInfo) {
......
...@@ -127,7 +127,114 @@ if $rows != 13 then ...@@ -127,7 +127,114 @@ if $rows != 13 then
goto loop3 goto loop3
endi endi
sql insert into t2 values(1648791217000,11,11,11,11.0,'eee') (1648791219000,11,11,11,11.0,'eee') t1 values(1648791217000,11,11,11,11.0,'eee') (1648791219000,11,11,11,11.0,'eee');
$loop_count = 0
loop4:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt1 order by group_id, ts;
if $rows != 20 then
print ====streamt1=rows1=$rows
goto loop4
endi
if $data04 == 0 then
print ====streamt1=data04=$data04
return -1
endi
sql select group_id,count(*) from streamt1 group by group_id;
if $rows != 2 then
print ====streamt1=rows2=$rows
goto loop4
endi
sql select * from streamt2 order by group_id, ts;
if $rows != 20 then
print ====streamt2=rows2=$rows
goto loop4
endi
if $data04 == 0 then
print ====streamt2=data04=$data04
return -1
endi
sql select group_id,count(*) from streamt2 group by group_id;
if $rows != 2 then
print ====streamt2=rows2=$rows
goto loop4
endi
sql select * from streamt3 order by group_id, ts;
if $rows != 20 then
print ====streamt3=rows3=$rows
goto loop4
endi
if $data04 == 0 then
print ====streamt3=data04=$data04
return -1
endi
sql select group_id,count(*) from streamt3 group by group_id;
if $rows != 2 then
print ====streamt3=rows2=$rows
goto loop4
endi
sql select * from streamt4 order by group_id, ts;
if $rows != 20 then
print ====streamt4=rows4=$rows
goto loop4
endi
if $data04 == 0 then
print ====streamt4=data04=$data04
return -1
endi
sql select group_id,count(*) from streamt4 group by group_id;
if $rows != 2 then
print ====streamt4=rows2=$rows
goto loop4
endi
sql select * from streamt5 order by group_id, ts;
if $rows != 20 then
print ====streamt5=rows5=$rows
goto loop4
endi
if $data04 == 0 then
print ====streamt5=data04=$data04
return -1
endi
sql select group_id,count(*) from streamt5 group by group_id;
if $rows != 2 then
print ====streamt5=rows2=$rows
goto loop4
endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册