提交 34ff1322 编写于 作者: L liuyao

op final window

上级 1a42986e
...@@ -2172,7 +2172,7 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) { ...@@ -2172,7 +2172,7 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
} }
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup) { bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup) {
if (pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) { if (pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) {
SWinKey key = {.ts = pWin->skey, .groupId = groupId}; SWinKey key = {.ts = pWin->skey, .groupId = groupId};
if (streamStateCheck(pState, &key)) { if (streamStateCheck(pState, &key)) {
return true; return true;
...@@ -2279,17 +2279,18 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) { ...@@ -2279,17 +2279,18 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) {
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
SWinKey* winKey = taosArrayGet(wins, i); SWinKey* winKey = taosArrayGet(wins, i);
STimeWindow nextWin = getFinalTimeWindow(winKey->ts, &pInfo->interval); STimeWindow nextWin = getFinalTimeWindow(winKey->ts, &pInfo->interval);
if (needDeleteWindowBuf(&nextWin, &pInfo->twAggSup) && !pInfo->ignoreExpiredData) { if (isOverdue(nextWin.ekey, &pInfo->twAggSup) && pInfo->ignoreExpiredData) {
void* chIds = taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey)); continue;
if (!chIds) { }
SPullWindowInfo pull = { void* chIds = taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey));
.window = nextWin, .groupId = winKey->groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; if (!chIds) {
// add pull data request SPullWindowInfo pull = {
if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) { .window = nextWin, .groupId = winKey->groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
int32_t size1 = taosArrayGetSize(pInfo->pChildren); // add pull data request
addPullWindow(pInfo->pPullDataMap, winKey, size1); if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) {
qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size1); int32_t size1 = taosArrayGetSize(pInfo->pChildren);
} addPullWindow(pInfo->pPullDataMap, winKey, size1);
qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size1);
} }
} }
} }
...@@ -2450,14 +2451,14 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p ...@@ -2450,14 +2451,14 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
continue; continue;
} }
if (IS_FINAL_OP(pInfo) && isClosed && pInfo->pChildren) { if (IS_FINAL_OP(pInfo) && pInfo->pChildren) {
bool ignore = true; bool ignore = true;
SWinKey winRes = { SWinKey winRes = {
.ts = nextWin.skey, .ts = nextWin.skey,
.groupId = groupId, .groupId = groupId,
}; };
void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup) && !chIds) { if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup) && isClosed && !chIds) {
SPullWindowInfo pull = { SPullWindowInfo pull = {
.window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; .window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
// add pull data request // add pull data request
...@@ -2654,12 +2655,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2654,12 +2655,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap);
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_OP(pInfo)) {
int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SStreamIntervalOperatorInfo* pChildInfo = pChildOp->info;
SExprSupp* pChildSup = &pChildOp->exprSupp;
doDeleteWindows(pChildOp, &pChildInfo->interval, pBlock, NULL, NULL);
rebuildIntervalWindow(pOperator, delWins, pInfo->pUpdatedMap);
addRetriveWindow(delWins, pInfo); addRetriveWindow(delWins, pInfo);
taosArrayAddAll(pInfo->pDelWins, delWins); taosArrayAddAll(pInfo->pDelWins, delWins);
taosArrayDestroy(delWins); taosArrayDestroy(delWins);
...@@ -2701,25 +2696,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2701,25 +2696,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} }
setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap); doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap);
if (IS_FINAL_OP(pInfo)) {
int32_t chIndex = getChildIndex(pBlock);
int32_t size = taosArrayGetSize(pInfo->pChildren);
// if chIndex + 1 - size > 0, add new child
for (int32_t i = 0; i < chIndex + 1 - size; i++) {
SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0);
if (!pChildOp) {
T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
}
SStreamIntervalOperatorInfo* pTmpInfo = pChildOp->info;
pTmpInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
taosArrayPush(pInfo->pChildren, &pChildOp);
qDebug("===stream===add child, id:%d", chIndex);
}
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
SStreamIntervalOperatorInfo* pChInfo = pChildOp->info;
setInputDataBlock(&pChildOp->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
doStreamIntervalAggImpl(pChildOp, pBlock, pBlock->info.id.groupId, NULL);
}
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark);
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey);
...@@ -2729,7 +2705,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2729,7 +2705,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_OP(pInfo)) {
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval,
pInfo->pPullDataMap, pInfo->pUpdatedMap, pInfo->pDelWins, pOperator); pInfo->pPullDataMap, pInfo->pUpdatedMap, pInfo->pDelWins, pOperator);
closeChildIntervalWindow(pOperator, pInfo->pChildren, pInfo->twAggSup.maxTs);
} }
pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs; pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
......
...@@ -6,32 +6,6 @@ system sh/exec.sh -n dnode1 -s start ...@@ -6,32 +6,6 @@ system sh/exec.sh -n dnode1 -s start
sleep 50 sleep 50
sql connect sql connect
sql create dnode $hostname2 port 7200
system sh/exec.sh -n dnode2 -s start
print ===== step1
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql select * from information_schema.ins_dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 2 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
if $data(2)[4] != ready then
goto step1
endi
print ===== step2 print ===== step2
sql drop stream if exists stream_t1; sql drop stream if exists stream_t1;
sql drop database if exists test; sql drop database if exists test;
...@@ -58,6 +32,28 @@ sql insert into ts2 values(1648791213002,NULL,NULL,NULL,NULL); ...@@ -58,6 +32,28 @@ sql insert into ts2 values(1648791213002,NULL,NULL,NULL,NULL);
sql insert into ts3 values(1648791213002,NULL,NULL,NULL,NULL); sql insert into ts3 values(1648791213002,NULL,NULL,NULL,NULL);
sql insert into ts4 values(1648791213002,NULL,NULL,NULL,NULL); sql insert into ts4 values(1648791213002,NULL,NULL,NULL,NULL);
$loop_count = 0
loop0:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 300
print 1 select * from streamtST1;
sql select * from streamtST1;
if $rows != 1 then
print =====rows=$rows
goto loop0
endi
if $data01 != 8 then
print =====data01=$data01
goto loop0
endi
sql insert into ts1 values(1648791223002,2,2,3,1.1); sql insert into ts1 values(1648791223002,2,2,3,1.1);
sql insert into ts1 values(1648791233003,3,2,3,2.1); sql insert into ts1 values(1648791233003,3,2,3,2.1);
sql insert into ts2 values(1648791243004,4,2,43,73.1); sql insert into ts2 values(1648791243004,4,2,43,73.1);
...@@ -66,10 +62,162 @@ sql insert into ts1 values(1648791243005,4,20,3,3.1); ...@@ -66,10 +62,162 @@ sql insert into ts1 values(1648791243005,4,20,3,3.1);
sql insert into ts2 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ; sql insert into ts2 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ;
sql insert into ts1 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ; sql insert into ts1 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ;
sql insert into ts2 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1); sql insert into ts2 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1);
$loop_count = 0
loop01:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 300
print 2 select * from streamtST1;
sql select * from streamtST1;
if $rows != 4 then
print =====rows=$rows
goto loop01
endi
if $data01 != 8 then
print =====data01=$data01
goto loop01
endi
if $data11 != 1 then
print =====data11=$data11
goto loop01
endi
if $data21 != 1 then
print =====data21=$data21
goto loop01
endi
if $data31 != 11 then
print =====data31=$data31
goto loop01
endi
sql insert into ts1 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ; sql insert into ts1 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
$loop_count = 0
loop011:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 300
print 3 select * from streamtST1;
sql select * from streamtST1;
if $rows != 4 then
print =====rows=$rows
goto loop011
endi
if $data01 != 8 then
print =====data01=$data01
goto loop011
endi
if $data11 != 2 then
print =====data11=$data11
goto loop011
endi
if $data21 != 1 then
print =====data21=$data21
goto loop011
endi
if $data31 != 13 then
print =====data31=$data31
goto loop011
endi
sql insert into ts2 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ; sql insert into ts2 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ;
$loop_count = 0
loop02:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 300
print 4 select * from streamtST1;
sql select * from streamtST1;
if $rows != 4 then
print =====rows=$rows
goto loop02
endi
if $data01 != 8 then
print =====data01=$data01
goto loop02
endi
if $data11 != 3 then
print =====data11=$data11
goto loop02
endi
if $data21 != 2 then
print =====data21=$data21
goto loop02
endi
if $data31 != 15 then
print =====data31=$data31
goto loop02
endi
sql insert into ts1 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ; sql insert into ts1 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
$loop_count = 0
loop03:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 300
print 5 select * from streamtST1;
sql select * from streamtST1;
if $rows != 4 then
print =====rows=$rows
goto loop03
endi
if $data01 != 8 then
print =====data01=$data01
goto loop03
endi
if $data11 != 3 then
print =====data11=$data11
goto loop03
endi
if $data21 != 2 then
print =====data21=$data21
goto loop03
endi
if $data31 != 15 then
print =====data31=$data31
goto loop03
endi
sql insert into ts3 values(1648791223002,2,2,3,1.1); sql insert into ts3 values(1648791223002,2,2,3,1.1);
sql insert into ts4 values(1648791233003,3,2,3,2.1); sql insert into ts4 values(1648791233003,3,2,3,2.1);
sql insert into ts3 values(1648791243004,4,2,43,73.1); sql insert into ts3 values(1648791243004,4,2,43,73.1);
...@@ -79,6 +227,44 @@ sql insert into ts4 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ; ...@@ -79,6 +227,44 @@ sql insert into ts4 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ;
sql insert into ts3 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ; sql insert into ts3 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ;
sql insert into ts4 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1); sql insert into ts4 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1);
sql insert into ts3 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ; sql insert into ts3 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
$loop_count = 0
loop04:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 300
print 6 select * from streamtST1;
sql select * from streamtST1;
if $rows != 4 then
print =====rows=$rows
goto loop04
endi
if $data01 != 8 then
print =====data01=$data01
goto loop04
endi
if $data11 != 5 then
print =====data11=$data11
goto loop04
endi
if $data21 != 3 then
print =====data21=$data21
goto loop04
endi
if $data31 != 28 then
print =====data31=$data31
goto loop04
endi
sql insert into ts4 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ; sql insert into ts4 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ;
sql insert into ts3 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ; sql insert into ts3 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册