提交 c8af8daa 编写于 作者: 5 54liuyao

feat(stream): state\session max delay

上级 824c3fbe
...@@ -2649,7 +2649,7 @@ typedef SResultWindowInfo* (*__get_win_info_)(void*); ...@@ -2649,7 +2649,7 @@ typedef SResultWindowInfo* (*__get_win_info_)(void*);
SResultWindowInfo* getSessionWinInfo(void* pData) { return (SResultWindowInfo*)pData; } SResultWindowInfo* getSessionWinInfo(void* pData) { return (SResultWindowInfo*)pData; }
SResultWindowInfo* getStateWinInfo(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; } SResultWindowInfo* getStateWinInfo(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; }
int32_t closeSessionWindow(SArray* pWins, STimeWindowAggSupp* pTwSup, SArray* pClosed, int8_t calTrigger, int32_t closeSessionWindow(SArray* pWins, STimeWindowAggSupp* pTwSup, SArray* pClosed,
__get_win_info_ fn) { __get_win_info_ fn) {
// Todo(liuyao) save window to tdb // Todo(liuyao) save window to tdb
int32_t size = taosArrayGetSize(pWins); int32_t size = taosArrayGetSize(pWins);
...@@ -2658,19 +2658,9 @@ int32_t closeSessionWindow(SArray* pWins, STimeWindowAggSupp* pTwSup, SArray* pC ...@@ -2658,19 +2658,9 @@ int32_t closeSessionWindow(SArray* pWins, STimeWindowAggSupp* pTwSup, SArray* pC
SResultWindowInfo* pSeWin = fn(pWin); SResultWindowInfo* pSeWin = fn(pWin);
if (pSeWin->win.ekey < pTwSup->maxTs - pTwSup->waterMark) { if (pSeWin->win.ekey < pTwSup->maxTs - pTwSup->waterMark) {
if (!pSeWin->isClosed) { if (!pSeWin->isClosed) {
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
if (pos == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pos->groupId = 0;
pos->pos = pSeWin->pos;
*(int64_t*)pos->key = pSeWin->win.ekey;
if (!taosArrayPush(pClosed, &pos)) {
taosMemoryFree(pos);
return TSDB_CODE_OUT_OF_MEMORY;
}
pSeWin->isClosed = true; pSeWin->isClosed = true;
if (calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, 0, pClosed);
pSeWin->isOutput = true; pSeWin->isOutput = true;
} }
} }
...@@ -2681,6 +2671,19 @@ int32_t closeSessionWindow(SArray* pWins, STimeWindowAggSupp* pTwSup, SArray* pC ...@@ -2681,6 +2671,19 @@ int32_t closeSessionWindow(SArray* pWins, STimeWindowAggSupp* pTwSup, SArray* pC
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t getAllSessionWindow(SArray* pWins, SArray* pClosed, __get_win_info_ fn) {
int32_t size = taosArrayGetSize(pWins);
for (int32_t i = 0; i < size; i++) {
void* pWin = taosArrayGet(pWins, i);
SResultWindowInfo* pSeWin = fn(pWin);
if (!pSeWin->isClosed) {
int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, 0, pClosed);
pSeWin->isOutput = true;
}
}
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
...@@ -2703,6 +2706,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -2703,6 +2706,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
SHashObj* pStUpdated = taosHashInit(64, hashFn, true, HASH_NO_LOCK); SHashObj* pStUpdated = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = taosArrayInit(16, POINTER_BYTES);
while (1) { while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL) {
...@@ -2723,7 +2727,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -2723,7 +2727,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
} }
taosArrayDestroy(pWins); taosArrayDestroy(pWins);
continue; continue;
} else if (pBlock->info.type == STREAM_GET_ALL &&
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) {
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getSessionWinInfo);
continue;
} }
if (isFinalSession(pInfo)) { if (isFinalSession(pInfo)) {
int32_t childIndex = 0; // Todo(liuyao) get child id from SSDataBlock int32_t childIndex = 0; // Todo(liuyao) get child id from SSDataBlock
SOptrBasicInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SOptrBasicInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
...@@ -2735,15 +2744,10 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -2735,15 +2744,10 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
// restore the value // restore the value
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
SArray* pClosed = taosArrayInit(16, POINTER_BYTES); closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pClosed, pInfo->twAggSup.calTrigger,
getSessionWinInfo); getSessionWinInfo);
SArray* pUpdated = taosArrayInit(16, POINTER_BYTES);
copyUpdateResult(pStUpdated, pUpdated, pBInfo->pRes->info.groupId); copyUpdateResult(pStUpdated, pUpdated, pBInfo->pRes->info.groupId);
taosHashCleanup(pStUpdated); taosHashCleanup(pStUpdated);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
taosArrayAddAll(pUpdated, pClosed);
}
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated,
pInfo->binfo.rowCellInfoOffset); pInfo->binfo.rowCellInfoOffset);
...@@ -3067,6 +3071,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -3067,6 +3071,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
SHashObj* pSeUpdated = taosHashInit(64, hashFn, true, HASH_NO_LOCK); SHashObj* pSeUpdated = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = taosArrayInit(16, POINTER_BYTES);
while (1) { while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL) {
...@@ -3078,6 +3083,10 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -3078,6 +3083,10 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
doClearStateWindows(&pInfo->streamAggSup, pBlock, pInfo->primaryTsIndex, &pInfo->stateCol, pInfo->stateCol.slotId, doClearStateWindows(&pInfo->streamAggSup, pBlock, pInfo->primaryTsIndex, &pInfo->stateCol, pInfo->stateCol.slotId,
pSeUpdated, pInfo->pSeDeleted); pSeUpdated, pInfo->pSeDeleted);
continue; continue;
} else if (pBlock->info.type == STREAM_GET_ALL &&
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) {
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getStateWinInfo);
continue;
} }
doStreamStateAggImpl(pOperator, pBlock, pSeUpdated, pInfo->pSeDeleted); doStreamStateAggImpl(pOperator, pBlock, pSeUpdated, pInfo->pSeDeleted);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
...@@ -3085,15 +3094,10 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -3085,15 +3094,10 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
// restore the value // restore the value
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
SArray* pClosed = taosArrayInit(16, POINTER_BYTES); closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pClosed, pInfo->twAggSup.calTrigger,
getStateWinInfo); getStateWinInfo);
SArray* pUpdated = taosArrayInit(16, POINTER_BYTES);
copyUpdateResult(pSeUpdated, pUpdated, pBInfo->pRes->info.groupId); copyUpdateResult(pSeUpdated, pUpdated, pBInfo->pRes->info.groupId);
taosHashCleanup(pSeUpdated); taosHashCleanup(pSeUpdated);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
taosArrayAddAll(pUpdated, pClosed);
}
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated,
pInfo->binfo.rowCellInfoOffset); pInfo->binfo.rowCellInfoOffset);
......
...@@ -71,6 +71,7 @@ ...@@ -71,6 +71,7 @@
./test.sh -f tsim/stream/basic0.sim ./test.sh -f tsim/stream/basic0.sim
./test.sh -f tsim/stream/basic1.sim ./test.sh -f tsim/stream/basic1.sim
./test.sh -f tsim/stream/basic2.sim ./test.sh -f tsim/stream/basic2.sim
# ./test.sh -f tsim/stream/distributeInterval0.sim
# ./test.sh -f tsim/stream/session0.sim # ./test.sh -f tsim/stream/session0.sim
# ./test.sh -f tsim/stream/session1.sim # ./test.sh -f tsim/stream/session1.sim
# ./test.sh -f tsim/stream/state0.sim # ./test.sh -f tsim/stream/state0.sim
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
sql create dnode $hostname2 port 7200
system sh/exec.sh -n dnode2 -s start
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create table ts3 using st tags(3,2,2);
sql create table ts4 using st tags(4,2,2);
sql create stream stream_t1 trigger at_once into streamtST1 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s);
sleep 1000
sql insert into ts1 values(1648791213001,1,12,3,1.0);
sql insert into ts2 values(1648791213001,1,12,3,1.0);
sql insert into ts3 values(1648791213001,1,12,3,1.0);
sql insert into ts4 values(1648791213001,1,12,3,1.0);
sql insert into ts1 values(1648791213002,NULL,NULL,NULL,NULL);
sql insert into ts2 values(1648791213002,NULL,NULL,NULL,NULL);
sql insert into ts3 values(1648791213002,NULL,NULL,NULL,NULL);
sql insert into ts4 values(1648791213002,NULL,NULL,NULL,NULL);
sql insert into ts1 values(1648791223002,2,2,3,1.1);
sql insert into ts1 values(1648791233003,3,2,3,2.1);
sql insert into ts2 values(1648791243004,4,2,43,73.1);
sql insert into ts1 values(1648791213002,24,22,23,4.1);
sql insert into ts1 values(1648791243005,4,20,3,3.1);
sql insert into ts2 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ;
sql insert into ts1 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ;
sql insert into ts2 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1);
sql insert into ts1 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
sql insert into ts2 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ;
sql insert into ts1 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
sql insert into ts3 values(1648791223002,2,2,3,1.1);
sql insert into ts4 values(1648791233003,3,2,3,2.1);
sql insert into ts3 values(1648791243004,4,2,43,73.1);
sql insert into ts4 values(1648791213002,24,22,23,4.1);
sql insert into ts3 values(1648791243005,4,20,3,3.1);
sql insert into ts4 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ;
sql insert into ts3 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ;
sql insert into ts4 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1);
sql insert into ts3 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
sql insert into ts4 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ;
sql insert into ts3 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
$loop_count = 0
loop1:
sql select * from streamtST1;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $data01 != 8 then
print =====data01=$data01
goto loop1
endi
if $data02 != 4 then
print =====data02=$data02
goto loop1
endi
if $data03 != 4 then
print ======$data03
return -1
endi
if $data04 != 52 then
print ======$data04
return -1
endi
if $data05 != 13 then
print ======$data05
return -1
endi
# row 1
if $data11 != 6 then
print =====data11=$data11
goto loop1
endi
if $data12 != 6 then
print =====data12=$data12
goto loop1
endi
if $data13 != 92 then
print ======$data13
return -1
endi
if $data14 != 22 then
print ======$data14
return -1
endi
if $data15 != 3 then
print ======$data15
return -1
endi
# row 2
if $data21 != 4 then
print =====data21=$data21
goto loop1
endi
if $data22 != 4 then
print =====data22=$data22
goto loop1
endi
if $data23 != 32 then
print ======$data23
return -1
endi
if $data24 != 12 then
print ======$data24
return -1
endi
if $data25 != 3 then
print ======$data25
return -1
endi
# row 3
if $data31 != 30 then
print =====data31=$data31
goto loop1
endi
if $data32 != 30 then
print =====data32=$data32
goto loop1
endi
if $data33 != 180 then
print ======$data33
return -1
endi
if $data34 != 42 then
print ======$data34
return -1
endi
if $data35 != 3 then
print ======$data35
return -1
endi
sql select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5, avg(d) from st interval(10s);
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册