Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
91bbca0b
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
91bbca0b
编写于
9月 05, 2022
作者:
W
wade zhang
提交者:
GitHub
9月 05, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #16658 from taosdata/feature/TD-18820
feat(stream): refactor single interval
上级
229e1ba7
f35f5d5f
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
492 addition
and
218 deletion
+492
-218
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+20
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+3
-1
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+318
-170
tests/script/tsim/stream/basic1.sim
tests/script/tsim/stream/basic1.sim
+34
-0
tests/script/tsim/stream/distributeInterval0.sim
tests/script/tsim/stream/distributeInterval0.sim
+40
-1
tests/script/tsim/stream/partitionbyColumn0.sim
tests/script/tsim/stream/partitionbyColumn0.sim
+14
-14
tests/script/tsim/stream/partitionbyColumn1.sim
tests/script/tsim/stream/partitionbyColumn1.sim
+53
-32
tests/script/tsim/stream/partitionbyColumn2.sim
tests/script/tsim/stream/partitionbyColumn2.sim
+10
-0
未找到文件。
source/libs/executor/inc/executorimpl.h
浏览文件 @
91bbca0b
...
...
@@ -591,6 +591,24 @@ typedef struct SMergeAlignedIntervalAggOperatorInfo {
SNode
*
pCondition
;
}
SMergeAlignedIntervalAggOperatorInfo
;
typedef
struct
SStreamIntervalOperatorInfo
{
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo
binfo
;
// basic info
SAggSupporter
aggSup
;
// aggregate supporter
SExprSupp
scalarSupp
;
// supporter for perform scalar function
SGroupResInfo
groupResInfo
;
// multiple results build supporter
SInterval
interval
;
// interval info
int32_t
primaryTsIndex
;
// primary time stamp slot id from result of downstream operator.
STimeWindowAggSupp
twAggSup
;
bool
invertible
;
bool
ignoreExpiredData
;
SArray
*
pRecycledPages
;
SArray
*
pDelWins
;
// SWinRes
int32_t
delIndex
;
SSDataBlock
*
pDelRes
;
bool
isFinal
;
}
SStreamIntervalOperatorInfo
;
typedef
struct
SStreamFinalIntervalOperatorInfo
{
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo
binfo
;
// basic info
...
...
@@ -1003,6 +1021,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamFinalSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
numOfChild
);
SOperatorInfo
*
createStreamIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamStateAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
91bbca0b
...
...
@@ -3913,7 +3913,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr
=
createAggregateOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
pAggNode
->
node
.
pConditions
,
pScalarExprInfo
,
numOfScalarExpr
,
pAggNode
->
mergeDataBlock
,
pTaskInfo
);
}
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
==
type
||
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
==
type
)
{
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
==
type
)
{
SIntervalPhysiNode
*
pIntervalPhyNode
=
(
SIntervalPhysiNode
*
)
pPhyNode
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pIntervalPhyNode
->
window
.
pFuncs
,
NULL
,
&
num
);
...
...
@@ -3938,6 +3938,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr
=
createIntervalOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
&
interval
,
tsSlotId
,
&
as
,
pIntervalPhyNode
,
pTaskInfo
,
isStream
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
==
type
)
{
pOptr
=
createStreamIntervalOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL
==
type
)
{
SMergeAlignedIntervalPhysiNode
*
pIntervalPhyNode
=
(
SMergeAlignedIntervalPhysiNode
*
)
pPhyNode
;
pOptr
=
createMergeAlignedIntervalOperatorInfo
(
ops
[
0
],
pIntervalPhyNode
,
pTaskInfo
);
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
91bbca0b
...
...
@@ -939,7 +939,7 @@ bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup) {
bool
isCloseWindow
(
STimeWindow
*
pWin
,
STimeWindowAggSupp
*
pSup
)
{
return
isOverdue
(
pWin
->
ekey
,
pSup
);
}
static
void
hashIntervalAgg
(
SOperatorInfo
*
pOperatorInfo
,
SResultRowInfo
*
pResultRowInfo
,
SSDataBlock
*
pBlock
,
int32_t
scanFlag
,
SHashObj
*
pUpdatedMap
)
{
int32_t
scanFlag
)
{
SIntervalAggOperatorInfo
*
pInfo
=
(
SIntervalAggOperatorInfo
*
)
pOperatorInfo
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperatorInfo
->
pTaskInfo
;
...
...
@@ -955,21 +955,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
STimeWindow
win
=
getActiveTimeWindow
(
pInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
ts
,
&
pInfo
->
interval
,
pInfo
->
inputOrder
);
int32_t
ret
=
TSDB_CODE_SUCCESS
;
if
((
!
pInfo
->
ignoreExpiredData
||
!
isCloseWindow
(
&
win
,
&
pInfo
->
twAggSup
))
&&
inSlidingWindow
(
&
pInfo
->
interval
,
&
win
,
&
pBlock
->
info
))
{
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
if
(
pInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
&&
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
)
{
saveWinResultRow
(
pResult
,
tableGroupId
,
pUpdatedMap
);
setResultBufPageDirty
(
pInfo
->
aggSup
.
pResultBuf
,
&
pResultRowInfo
->
cur
);
}
int32_t
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
TSKEY
ekey
=
ascScan
?
win
.
ekey
:
win
.
skey
;
int32_t
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
pInfo
->
inputOrder
);
...
...
@@ -991,12 +981,9 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
doWindowBorderInterpolation
(
pInfo
,
pBlock
,
pResult
,
&
win
,
startPos
,
forwardRows
,
pSup
);
}
if
((
!
pInfo
->
ignoreExpiredData
||
!
isCloseWindow
(
&
win
,
&
pInfo
->
twAggSup
))
&&
inSlidingWindow
(
&
pInfo
->
interval
,
&
win
,
&
pBlock
->
info
))
{
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
win
,
true
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
pBlock
->
info
.
rows
,
numOfOutput
);
}
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
win
,
true
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
pBlock
->
info
.
rows
,
numOfOutput
);
doCloseWindow
(
pResultRowInfo
,
pInfo
,
pResult
);
...
...
@@ -1007,13 +994,6 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if
(
startPos
<
0
)
{
break
;
}
if
(
pInfo
->
ignoreExpiredData
&&
isCloseWindow
(
&
nextWin
,
&
pInfo
->
twAggSup
))
{
ekey
=
ascScan
?
nextWin
.
ekey
:
nextWin
.
skey
;
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
pInfo
->
inputOrder
);
continue
;
}
// null data, failed to allocate more memory buffer
int32_t
code
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
nextWin
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
...
...
@@ -1021,11 +1001,6 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
T_LONG_JMP
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
if
(
pInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
&&
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
)
{
saveWinResultRow
(
pResult
,
tableGroupId
,
pUpdatedMap
);
setResultBufPageDirty
(
pInfo
->
aggSup
.
pResultBuf
,
&
pResultRowInfo
->
cur
);
}
ekey
=
ascScan
?
nextWin
.
ekey
:
nextWin
.
skey
;
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
pInfo
->
inputOrder
);
...
...
@@ -1130,7 +1105,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
pInfo
->
inputOrder
,
scanFlag
,
true
);
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
primaryTsIndex
);
hashIntervalAgg
(
pOperator
,
&
pInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
,
NULL
);
hashIntervalAgg
(
pOperator
,
&
pInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
);
}
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
pInfo
->
resultTsOrder
);
...
...
@@ -1581,141 +1556,6 @@ static void doBuildDeleteResult(SArray* pWins, int32_t* index, SSDataBlock* pBlo
}
}
static
SSDataBlock
*
doStreamIntervalAgg
(
SOperatorInfo
*
pOperator
)
{
SIntervalAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
pInfo
->
inputOrder
=
TSDB_ORDER_ASC
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildDeleteResult
(
pInfo
->
pDelWins
,
&
pInfo
->
delIndex
,
pInfo
->
pDelRes
);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pDelRes
,
"single interval"
);
return
pInfo
->
pDelRes
;
}
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
||
!
hasRemainResults
(
&
pInfo
->
groupResInfo
))
{
pOperator
->
status
=
OP_EXEC_DONE
;
qDebug
(
"===stream===single interval is done"
);
freeAllPages
(
pInfo
->
pRecycledPages
,
pInfo
->
aggSup
.
pResultBuf
);
}
printDataBlock
(
pInfo
->
binfo
.
pRes
,
"single interval"
);
return
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
?
NULL
:
pInfo
->
binfo
.
pRes
;
}
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
SArray
*
pUpdated
=
taosArrayInit
(
4
,
POINTER_BYTES
);
// SResKeyPos
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
);
SHashObj
*
pUpdatedMap
=
taosHashInit
(
1024
,
hashFn
,
false
,
HASH_NO_LOCK
);
SStreamState
*
pState
=
pTaskInfo
->
streamInfo
.
pState
;
while
(
1
)
{
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
if
(
pBlock
==
NULL
)
{
break
;
}
// qInfo("===stream===%ld", pBlock->info.version);
printDataBlock
(
pBlock
,
"single interval recv"
);
if
(
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
doClearWindows
(
&
pInfo
->
aggSup
,
&
pOperator
->
exprSupp
,
&
pInfo
->
interval
,
pOperator
->
exprSupp
.
numOfExprs
,
pBlock
,
NULL
);
qDebug
(
"%s clear existed time window results for updates checked"
,
GET_TASKID
(
pTaskInfo
));
continue
;
}
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
)
{
doDeleteSpecifyIntervalWindow
(
&
pInfo
->
aggSup
,
pBlock
,
pInfo
->
pDelWins
,
&
pInfo
->
interval
,
pUpdatedMap
);
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_GET_ALL
)
{
getAllIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
pUpdatedMap
);
continue
;
}
if
(
pBlock
->
info
.
type
==
STREAM_NORMAL
&&
pBlock
->
info
.
version
!=
0
)
{
// set input version
pTaskInfo
->
version
=
pBlock
->
info
.
version
;
}
if
(
pInfo
->
scalarSupp
.
pExprInfo
!=
NULL
)
{
SExprSupp
*
pExprSup
=
&
pInfo
->
scalarSupp
;
projectApplyFunctions
(
pExprSup
->
pExprInfo
,
pBlock
,
pBlock
,
pExprSup
->
pCtx
,
pExprSup
->
numOfExprs
,
NULL
);
}
// The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the
// caller. Note that all the time window are not close till now.
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
pInfo
->
inputOrder
,
MAIN_SCAN
,
true
);
if
(
pInfo
->
invertible
)
{
setInverFunction
(
pSup
->
pCtx
,
pOperator
->
exprSupp
.
numOfExprs
,
pBlock
->
info
.
type
);
}
pInfo
->
twAggSup
.
maxTs
=
TMAX
(
pInfo
->
twAggSup
.
maxTs
,
pBlock
->
info
.
window
.
ekey
);
hashIntervalAgg
(
pOperator
,
&
pInfo
->
binfo
.
resultRowInfo
,
pBlock
,
MAIN_SCAN
,
pUpdatedMap
);
}
#if 0
if (pState) {
printf(">>>>>>>> stream read backend\n");
SWinKey key = {
.ts = 1,
.groupId = 2,
};
char* val = NULL;
int32_t sz;
if (streamStateGet(pState, &key, (void**)&val, &sz) < 0) {
ASSERT(0);
}
printf("stream read %s %d\n", val, sz);
streamFreeVal(val);
SStreamStateCur* pCur = streamStateGetCur(pState, &key);
ASSERT(pCur);
while (streamStateCurNext(pState, pCur) == 0) {
SWinKey key1;
const void* val1;
if (streamStateGetKVByCur(pCur, &key1, &val1, &sz) < 0) {
break;
}
printf("stream iter key groupId:%d ts:%d, value %s %d\n", key1.groupId, key1.ts, val1, sz);
}
streamStateFreeCur(pCur);
}
#endif
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
&
pInfo
->
twAggSup
,
&
pInfo
->
interval
,
NULL
,
pUpdatedMap
,
pInfo
->
pRecycledPages
,
pInfo
->
aggSup
.
pResultBuf
);
void
*
pIte
=
NULL
;
while
((
pIte
=
taosHashIterate
(
pUpdatedMap
,
pIte
))
!=
NULL
)
{
taosArrayPush
(
pUpdated
,
pIte
);
}
taosArraySort
(
pUpdated
,
resultrowComparAsc
);
finalizeUpdatedResult
(
pOperator
->
exprSupp
.
numOfExprs
,
pInfo
->
aggSup
.
pResultBuf
,
pUpdated
,
pSup
->
rowEntryInfoOffset
);
initMultiResInfoFromArrayList
(
&
pInfo
->
groupResInfo
,
pUpdated
);
blockDataEnsureCapacity
(
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
);
removeDeleteResults
(
pUpdatedMap
,
pInfo
->
pDelWins
);
taosHashCleanup
(
pUpdatedMap
);
doBuildDeleteResult
(
pInfo
->
pDelWins
,
&
pInfo
->
delIndex
,
pInfo
->
pDelRes
);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pDelRes
,
"single interval"
);
return
pInfo
->
pDelRes
;
}
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
printDataBlock
(
pInfo
->
binfo
.
pRes
,
"single interval"
);
return
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
?
NULL
:
pInfo
->
binfo
.
pRes
;
}
static
void
destroyStateWindowOperatorInfo
(
void
*
param
)
{
SStateWindowOperatorInfo
*
pInfo
=
(
SStateWindowOperatorInfo
*
)
param
;
cleanupBasicInfo
(
&
pInfo
->
binfo
);
...
...
@@ -1925,7 +1765,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
doOpenIntervalAgg
,
doBuildIntervalResult
,
doStreamIntervalAgg
,
NULL
,
pOperator
->
fpSet
=
createOperatorFpSet
(
doOpenIntervalAgg
,
doBuildIntervalResult
,
NULL
,
NULL
,
destroyIntervalOperatorInfo
,
aggEncodeResultRow
,
aggDecodeResultRow
,
NULL
);
if
(
nodeType
(
pPhyNode
)
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
)
{
...
...
@@ -5627,3 +5467,311 @@ _error:
pTaskInfo
->
code
=
code
;
return
NULL
;
}
static
void
doStreamIntervalAggImpl
(
SOperatorInfo
*
pOperatorInfo
,
SResultRowInfo
*
pResultRowInfo
,
SSDataBlock
*
pBlock
,
int32_t
scanFlag
,
SHashObj
*
pUpdatedMap
)
{
SStreamIntervalOperatorInfo
*
pInfo
=
(
SStreamIntervalOperatorInfo
*
)
pOperatorInfo
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperatorInfo
->
pTaskInfo
;
SExprSupp
*
pSup
=
&
pOperatorInfo
->
exprSupp
;
int32_t
startPos
=
0
;
int32_t
numOfOutput
=
pSup
->
numOfExprs
;
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
TSKEY
*
tsCols
=
(
TSKEY
*
)
pColDataInfo
->
pData
;
uint64_t
tableGroupId
=
pBlock
->
info
.
groupId
;
bool
ascScan
=
true
;
TSKEY
ts
=
getStartTsKey
(
&
pBlock
->
info
.
window
,
tsCols
);
SResultRow
*
pResult
=
NULL
;
STimeWindow
win
=
getActiveTimeWindow
(
pInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
ts
,
&
pInfo
->
interval
,
TSDB_ORDER_ASC
);
int32_t
ret
=
TSDB_CODE_SUCCESS
;
if
((
!
pInfo
->
ignoreExpiredData
||
!
isCloseWindow
(
&
win
,
&
pInfo
->
twAggSup
))
&&
inSlidingWindow
(
&
pInfo
->
interval
,
&
win
,
&
pBlock
->
info
))
{
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
)
{
saveWinResultRow
(
pResult
,
tableGroupId
,
pUpdatedMap
);
setResultBufPageDirty
(
pInfo
->
aggSup
.
pResultBuf
,
&
pResultRowInfo
->
cur
);
}
}
TSKEY
ekey
=
ascScan
?
win
.
ekey
:
win
.
skey
;
int32_t
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
TSDB_ORDER_ASC
);
ASSERT
(
forwardRows
>
0
);
if
((
!
pInfo
->
ignoreExpiredData
||
!
isCloseWindow
(
&
win
,
&
pInfo
->
twAggSup
))
&&
inSlidingWindow
(
&
pInfo
->
interval
,
&
win
,
&
pBlock
->
info
))
{
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
win
,
true
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
pBlock
->
info
.
rows
,
numOfOutput
);
}
STimeWindow
nextWin
=
win
;
while
(
1
)
{
int32_t
prevEndPos
=
forwardRows
-
1
+
startPos
;
startPos
=
getNextQualifiedWindow
(
&
pInfo
->
interval
,
&
nextWin
,
&
pBlock
->
info
,
tsCols
,
prevEndPos
,
TSDB_ORDER_ASC
);
if
(
startPos
<
0
)
{
break
;
}
if
(
pInfo
->
ignoreExpiredData
&&
isCloseWindow
(
&
nextWin
,
&
pInfo
->
twAggSup
))
{
ekey
=
ascScan
?
nextWin
.
ekey
:
nextWin
.
skey
;
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
TSDB_ORDER_ASC
);
continue
;
}
// null data, failed to allocate more memory buffer
int32_t
code
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
nextWin
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
)
{
saveWinResultRow
(
pResult
,
tableGroupId
,
pUpdatedMap
);
setResultBufPageDirty
(
pInfo
->
aggSup
.
pResultBuf
,
&
pResultRowInfo
->
cur
);
}
ekey
=
ascScan
?
nextWin
.
ekey
:
nextWin
.
skey
;
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
TSDB_ORDER_ASC
);
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
nextWin
,
true
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
pBlock
->
info
.
rows
,
numOfOutput
);
}
}
static
SSDataBlock
*
doStreamIntervalAgg
(
SOperatorInfo
*
pOperator
)
{
SStreamIntervalOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
int64_t
maxTs
=
INT64_MIN
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildDeleteResult
(
pInfo
->
pDelWins
,
&
pInfo
->
delIndex
,
pInfo
->
pDelRes
);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pDelRes
,
"single interval"
);
return
pInfo
->
pDelRes
;
}
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
||
!
hasRemainResults
(
&
pInfo
->
groupResInfo
))
{
pOperator
->
status
=
OP_EXEC_DONE
;
qDebug
(
"===stream===single interval is done"
);
freeAllPages
(
pInfo
->
pRecycledPages
,
pInfo
->
aggSup
.
pResultBuf
);
}
printDataBlock
(
pInfo
->
binfo
.
pRes
,
"single interval"
);
return
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
?
NULL
:
pInfo
->
binfo
.
pRes
;
}
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
SArray
*
pUpdated
=
taosArrayInit
(
4
,
POINTER_BYTES
);
// SResKeyPos
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
);
SHashObj
*
pUpdatedMap
=
taosHashInit
(
1024
,
hashFn
,
false
,
HASH_NO_LOCK
);
SStreamState
*
pState
=
pTaskInfo
->
streamInfo
.
pState
;
while
(
1
)
{
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
if
(
pBlock
==
NULL
)
{
break
;
}
printDataBlock
(
pBlock
,
"single interval recv"
);
if
(
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
doClearWindows
(
&
pInfo
->
aggSup
,
&
pOperator
->
exprSupp
,
&
pInfo
->
interval
,
pOperator
->
exprSupp
.
numOfExprs
,
pBlock
,
NULL
);
qDebug
(
"%s clear existed time window results for updates checked"
,
GET_TASKID
(
pTaskInfo
));
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
)
{
doDeleteSpecifyIntervalWindow
(
&
pInfo
->
aggSup
,
pBlock
,
pInfo
->
pDelWins
,
&
pInfo
->
interval
,
pUpdatedMap
);
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_GET_ALL
)
{
getAllIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
pUpdatedMap
);
continue
;
}
if
(
pBlock
->
info
.
type
==
STREAM_NORMAL
&&
pBlock
->
info
.
version
!=
0
)
{
// set input version
pTaskInfo
->
version
=
pBlock
->
info
.
version
;
}
if
(
pInfo
->
scalarSupp
.
pExprInfo
!=
NULL
)
{
SExprSupp
*
pExprSup
=
&
pInfo
->
scalarSupp
;
projectApplyFunctions
(
pExprSup
->
pExprInfo
,
pBlock
,
pBlock
,
pExprSup
->
pCtx
,
pExprSup
->
numOfExprs
,
NULL
);
}
// The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the
// caller. Note that all the time window are not close till now.
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
if
(
pInfo
->
invertible
)
{
setInverFunction
(
pSup
->
pCtx
,
pOperator
->
exprSupp
.
numOfExprs
,
pBlock
->
info
.
type
);
}
maxTs
=
TMAX
(
maxTs
,
pBlock
->
info
.
window
.
ekey
);
doStreamIntervalAggImpl
(
pOperator
,
&
pInfo
->
binfo
.
resultRowInfo
,
pBlock
,
MAIN_SCAN
,
pUpdatedMap
);
}
pInfo
->
twAggSup
.
maxTs
=
TMAX
(
pInfo
->
twAggSup
.
maxTs
,
maxTs
);
#if 0
if (pState) {
printf(">>>>>>>> stream read backend\n");
SWinKey key = {
.ts = 1,
.groupId = 2,
};
char* val = NULL;
int32_t sz;
if (streamStateGet(pState, &key, (void**)&val, &sz) < 0) {
ASSERT(0);
}
printf("stream read %s %d\n", val, sz);
streamFreeVal(val);
SStreamStateCur* pCur = streamStateGetCur(pState, &key);
ASSERT(pCur);
while (streamStateCurNext(pState, pCur) == 0) {
SWinKey key1;
const void* val1;
if (streamStateGetKVByCur(pCur, &key1, &val1, &sz) < 0) {
break;
}
printf("stream iter key groupId:%d ts:%d, value %s %d\n", key1.groupId, key1.ts, val1, sz);
}
streamStateFreeCur(pCur);
}
#endif
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
&
pInfo
->
twAggSup
,
&
pInfo
->
interval
,
NULL
,
pUpdatedMap
,
pInfo
->
pRecycledPages
,
pInfo
->
aggSup
.
pResultBuf
);
void
*
pIte
=
NULL
;
while
((
pIte
=
taosHashIterate
(
pUpdatedMap
,
pIte
))
!=
NULL
)
{
taosArrayPush
(
pUpdated
,
pIte
);
}
taosArraySort
(
pUpdated
,
resultrowComparAsc
);
finalizeUpdatedResult
(
pOperator
->
exprSupp
.
numOfExprs
,
pInfo
->
aggSup
.
pResultBuf
,
pUpdated
,
pSup
->
rowEntryInfoOffset
);
initMultiResInfoFromArrayList
(
&
pInfo
->
groupResInfo
,
pUpdated
);
blockDataEnsureCapacity
(
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
);
removeDeleteResults
(
pUpdatedMap
,
pInfo
->
pDelWins
);
taosHashCleanup
(
pUpdatedMap
);
doBuildDeleteResult
(
pInfo
->
pDelWins
,
&
pInfo
->
delIndex
,
pInfo
->
pDelRes
);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pDelRes
,
"single interval"
);
return
pInfo
->
pDelRes
;
}
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
printDataBlock
(
pInfo
->
binfo
.
pRes
,
"single interval"
);
return
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
?
NULL
:
pInfo
->
binfo
.
pRes
;
}
void
destroyStreamIntervalOperatorInfo
(
void
*
param
)
{
SStreamIntervalOperatorInfo
*
pInfo
=
(
SStreamIntervalOperatorInfo
*
)
param
;
cleanupBasicInfo
(
&
pInfo
->
binfo
);
cleanupAggSup
(
&
pInfo
->
aggSup
);
pInfo
->
pRecycledPages
=
taosArrayDestroy
(
pInfo
->
pRecycledPages
);
pInfo
->
pDelWins
=
taosArrayDestroy
(
pInfo
->
pDelWins
);
pInfo
->
pDelRes
=
blockDataDestroy
(
pInfo
->
pDelRes
);
cleanupGroupResInfo
(
&
pInfo
->
groupResInfo
);
colDataDestroy
(
&
pInfo
->
twAggSup
.
timeWindowData
);
taosMemoryFreeClear
(
param
);
}
SOperatorInfo
*
createStreamIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SStreamIntervalOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamIntervalOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
goto
_error
;
}
SStreamIntervalPhysiNode
*
pIntervalPhyNode
=
(
SStreamIntervalPhysiNode
*
)
pPhyNode
;
int32_t
numOfCols
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pIntervalPhyNode
->
window
.
pFuncs
,
NULL
,
&
numOfCols
);
ASSERT
(
numOfCols
>
0
);
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
SInterval
interval
=
{.
interval
=
pIntervalPhyNode
->
interval
,
.
sliding
=
pIntervalPhyNode
->
sliding
,
.
intervalUnit
=
pIntervalPhyNode
->
intervalUnit
,
.
slidingUnit
=
pIntervalPhyNode
->
slidingUnit
,
.
offset
=
pIntervalPhyNode
->
offset
,
.
precision
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
node
.
resType
.
precision
,
};
STimeWindowAggSupp
twAggSupp
=
{.
waterMark
=
pIntervalPhyNode
->
window
.
watermark
,
.
calTrigger
=
pIntervalPhyNode
->
window
.
triggerType
,
.
maxTs
=
INT64_MIN
,
};
ASSERT
(
twAggSupp
.
calTrigger
!=
STREAM_TRIGGER_MAX_DELAY
);
pOperator
->
pTaskInfo
=
pTaskInfo
;
pInfo
->
interval
=
interval
;
pInfo
->
twAggSup
=
twAggSupp
;
pInfo
->
ignoreExpiredData
=
pIntervalPhyNode
->
window
.
igExpired
;
pInfo
->
isFinal
=
false
;
if
(
pIntervalPhyNode
->
window
.
pExprs
!=
NULL
)
{
int32_t
numOfScalar
=
0
;
SExprInfo
*
pScalarExprInfo
=
createExprInfo
(
pIntervalPhyNode
->
window
.
pExprs
,
NULL
,
&
numOfScalar
);
int32_t
code
=
initExprSupp
(
&
pInfo
->
scalarSupp
,
pScalarExprInfo
,
numOfScalar
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
}
pInfo
->
primaryTsIndex
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
slotId
;;
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
size_t
keyBufSize
=
sizeof
(
int64_t
)
+
sizeof
(
int64_t
)
+
POINTER_BYTES
;
int32_t
code
=
initAggInfo
(
pSup
,
&
pInfo
->
aggSup
,
pExprInfo
,
numOfCols
,
keyBufSize
,
pTaskInfo
->
id
.
str
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
initBasicInfo
(
&
pInfo
->
binfo
,
pResBlock
);
initStreamFunciton
(
pSup
->
pCtx
,
pSup
->
numOfExprs
);
initExecTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
pTaskInfo
->
window
);
pInfo
->
invertible
=
allInvertible
(
pSup
->
pCtx
,
numOfCols
);
pInfo
->
invertible
=
false
;
// Todo(liuyao): Dependent TSDB API
pInfo
->
pRecycledPages
=
taosArrayInit
(
4
,
sizeof
(
int32_t
));
pInfo
->
pDelWins
=
taosArrayInit
(
4
,
sizeof
(
SWinKey
));
pInfo
->
delIndex
=
0
;
pInfo
->
pDelRes
=
createSpecialDataBlock
(
STREAM_DELETE_RESULT
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
);
pOperator
->
name
=
"StreamIntervalOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamIntervalAgg
,
NULL
,
NULL
,
destroyStreamIntervalOperatorInfo
,
aggEncodeResultRow
,
aggDecodeResultRow
,
NULL
);
initIntervalDownStream
(
downstream
,
pPhyNode
->
type
,
&
pInfo
->
aggSup
,
&
pInfo
->
interval
,
pInfo
->
twAggSup
.
waterMark
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
return
pOperator
;
_error:
destroyStreamIntervalOperatorInfo
(
pInfo
);
taosMemoryFreeClear
(
pOperator
);
pTaskInfo
->
code
=
code
;
return
NULL
;
}
tests/script/tsim/stream/basic1.sim
浏览文件 @
91bbca0b
...
...
@@ -588,4 +588,38 @@ if $data00 != 5 then
goto loop3
endi
#max,min selectivity
sql create database test3 vgroups 1;
sql use test3;
sql create stable st(ts timestamp, a int, b int , c int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create stream stream_t3 trigger at_once into streamtST3 as select ts, min(a) c6, a, b, c, ta, tb, tc from ts1 interval(10s) ;
sql insert into ts1 values(1648791211000,1,2,3);
sleep 50
sql insert into ts1 values(1648791222001,2,2,3);
sleep 50
$loop_count = 0
loop3:
sql select * from streamtST3;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $data02 != 1 then
print =====data02=$data02
goto loop3
endi
# row 1
if $data12 != 2 then
print =====data12=$data12
goto loop3
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
tests/script/tsim/stream/distributeInterval0.sim
浏览文件 @
91bbca0b
...
...
@@ -198,7 +198,7 @@ endi
sql select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5, avg(d) from st interval(10s);
sql create database test1 vgroups
1
;
sql create database test1 vgroups
4
;
sql use test1;
sql create stable st(ts timestamp, a int, b int , c int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
...
...
@@ -232,4 +232,43 @@ if $data11 != 2 then
goto loop2
endi
#max,min selectivity
sql create database test3 vgroups 4;
sql use test3;
sql create stable st(ts timestamp, a int, b int , c int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create stream stream_t3 trigger at_once into streamtST3 as select ts, min(a) c6, a, b, c, ta, tb, tc from st interval(10s) ;
sql insert into ts1 values(1648791211000,1,2,3);
sleep 50
sql insert into ts1 values(1648791222001,2,2,3);
sleep 50
sql insert into ts2 values(1648791211000,1,2,3);
sleep 50
sql insert into ts2 values(1648791222001,2,2,3);
sleep 50
$loop_count = 0
loop3:
sql select * from streamtST3;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $data02 != 1 then
print =====data02=$data02
goto loop3
endi
# row 1
if $data12 != 2 then
print =====data12=$data12
goto loop3
endi
system sh/stop_dnodes.sh
tests/script/tsim/stream/partitionbyColumn0.sim
浏览文件 @
91bbca0b
...
...
@@ -24,7 +24,7 @@ sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
$loop_count = 0
loop0:
sleep
10
0
sleep
5
0
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
...
...
@@ -48,7 +48,7 @@ sql insert into t1 values(1648791213000,1,2,3,1.0);
$loop_count = 0
loop1:
sleep
10
0
sleep
5
0
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
...
...
@@ -71,7 +71,7 @@ sql insert into t1 values(1648791213000,2,2,3,1.0);
$loop_count = 0
loop2:
sleep
10
0
sleep
5
0
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
...
...
@@ -97,7 +97,7 @@ sql insert into t1 values(1648791213002,1,2,3,1.0);
$loop_count = 0
loop3:
sleep
10
0
sleep
5
0
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
...
...
@@ -134,7 +134,7 @@ sql insert into t1 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (16
$loop_count = 0
loop4:
sleep
10
0
sleep
5
0
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
...
...
@@ -208,7 +208,7 @@ sql insert into t1 values(1648791213001,1,2,3,2.0);
$loop_count = 0
loop5:
sleep
10
0
sleep
5
0
sql select * from streamt1 order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
...
...
@@ -229,7 +229,7 @@ sql insert into t1 values(1648791213001,1,1,6,2.0) (1648791223002,1,1,7,2.0);
$loop_count = 0
loop6:
sleep
10
0
sleep
5
0
sql select * from streamt1 order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
...
...
@@ -294,7 +294,7 @@ sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL);
$loop_count = 0
loop7:
sleep
10
0
sleep
5
0
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
...
...
@@ -318,7 +318,7 @@ sql insert into t2 values(1648791213000,1,2,3,1.0);
$loop_count = 0
loop8:
sleep
10
0
sleep
5
0
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
...
...
@@ -342,7 +342,7 @@ sql insert into t2 values(1648791213000,2,2,3,1.0);
$loop_count = 0
loop9:
sleep
10
0
sleep
5
0
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
...
...
@@ -372,7 +372,7 @@ sql insert into t2 values(1648791213002,1,2,3,1.0);
$loop_count = 0
loop10:
sleep
10
0
sleep
5
0
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
...
...
@@ -414,7 +414,7 @@ sql insert into t2 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (16
$loop_count = 0
loop11:
sleep
10
0
sleep
5
0
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
...
...
@@ -492,7 +492,7 @@ sql insert into t4 values(1648791213000,1,2,3,1.0);
$loop_count = 0
loop13:
sleep
10
0
sleep
5
0
sql select * from test.streamt4 order by c1, c2, c3;
$loop_count = $loop_count + 1
...
...
@@ -534,7 +534,7 @@ sql insert into t1 values(1648791213000,1,2,3,1.0);
$loop_count = 0
loop14:
sleep
10
0
sleep
5
0
sql select * from test.streamt4 order by c1, c2, c3;
$loop_count = $loop_count + 1
...
...
tests/script/tsim/stream/partitionbyColumn1.sim
浏览文件 @
91bbca0b
...
...
@@ -24,11 +24,11 @@ sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
$loop_count = 0
loop0:
sleep
30
0
sleep
5
0
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
2
0 then
return -1
endi
...
...
@@ -45,12 +45,14 @@ endi
sql insert into t1 values(1648791213000,1,2,3,1.0);
$loop_count = 0
loop1:
sleep
30
0
sleep
5
0
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
2
0 then
return -1
endi
...
...
@@ -66,12 +68,14 @@ endi
sql insert into t1 values(1648791213000,2,2,3,1.0);
$loop_count = 0
loop2:
sleep
30
0
sleep
5
0
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
2
0 then
return -1
endi
...
...
@@ -90,12 +94,14 @@ sql insert into t1 values(1648791213001,2,2,3,1.0);
sql insert into t1 values(1648791213002,2,2,3,1.0);
sql insert into t1 values(1648791213002,1,2,3,1.0);
$loop_count = 0
loop3:
sleep
30
0
sleep
5
0
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
2
0 then
return -1
endi
...
...
@@ -125,12 +131,14 @@ sql insert into t1 values(1648791223002,3,2,3,1.0);
sql insert into t1 values(1648791223003,3,2,3,1.0);
sql insert into t1 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0);
$loop_count = 0
loop4:
sleep
30
0
sleep
5
0
sql select * from streamt order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
2
0 then
return -1
endi
...
...
@@ -199,11 +207,11 @@ sql insert into t1 values(1648791213001,1,2,3,2.0);
$loop_count = 0
loop5:
sleep
30
0
sleep
5
0
sql select * from streamt1 order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
2
0 then
return -1
endi
...
...
@@ -217,12 +225,14 @@ sql insert into t1 values(1648791223001,1,2,5,2.0);
sql insert into t1 values(1648791223002,1,2,5,2.0);
sql insert into t1 values(1648791213001,1,1,6,2.0) (1648791223002,1,1,7,2.0);
$loop_count = 0
loop6:
sleep
30
0
sleep
5
0
sql select * from streamt1 order by c1, c4, c2, c3;
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
2
0 then
return -1
endi
...
...
@@ -282,11 +292,11 @@ sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL);
$loop_count = 0
loop7:
sleep
30
0
sleep
5
0
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
2
0 then
return -1
endi
...
...
@@ -303,12 +313,14 @@ endi
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t2 values(1648791213000,1,2,3,1.0);
$loop_count = 0
loop8:
sleep
30
0
sleep
5
0
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
2
0 then
return -1
endi
...
...
@@ -324,12 +336,15 @@ endi
sql insert into t1 values(1648791213000,2,2,3,1.0);
sql insert into t2 values(1648791213000,2,2,3,1.0);
$loop_count = 0
loop9:
sleep
30
0
sleep
5
0
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
2
0 then
return -1
endi
...
...
@@ -352,12 +367,14 @@ sql insert into t2 values(1648791213001,2,2,3,1.0);
sql insert into t2 values(1648791213002,2,2,3,1.0);
sql insert into t2 values(1648791213002,1,2,3,1.0);
$loop_count = 0
loop10:
sleep
30
0
sleep
5
0
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
2
0 then
return -1
endi
...
...
@@ -373,7 +390,7 @@ endi
if $data11 != 2 thenloop4
print =====data11=$data11
goto loop
3
goto loop
10
endi
if $data12 != 1 then
...
...
@@ -392,12 +409,14 @@ sql insert into t2 values(1648791223002,3,2,3,1.0);
sql insert into t2 values(1648791223003,3,2,3,1.0);
sql insert into t2 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0);
$loop_count = 0
loop11:
sleep
30
0
sleep
5
0
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
2
0 then
return -1
endi
...
...
@@ -470,17 +489,17 @@ sql insert into t4 values(1648791213000,1,2,3,1.0);
$loop_count = 0
loop13:
sleep
30
0
sleep
5
0
sql select * from test.streamt4 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
2
0 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop1
4
goto loop1
3
endi
if $data01 != 1 then
...
...
@@ -495,12 +514,12 @@ endi
if $data11 != 3 then
print =====data11=$data11
goto loop1
1
goto loop1
3
endi
if $data12 != 2 then
print =====data12=$data12
goto loop1
1
goto loop1
3
endi
sql insert into t4 values(1648791213000,2,2,3,1.0);
...
...
@@ -509,12 +528,14 @@ sql insert into t1 values(1648791233000,2,2,3,1.0);
sql insert into t1 values(1648791213000,1,2,3,1.0);
$loop_count = 0
loop14:
sleep
30
0
sleep
5
0
sql select * from test.streamt4 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count ==
1
0 then
if $loop_count ==
2
0 then
return -1
endi
...
...
tests/script/tsim/stream/partitionbyColumn2.sim
浏览文件 @
91bbca0b
...
...
@@ -40,6 +40,8 @@ endi
sql insert into t1 values(1648791213000,1,1,3,1.0);
$loop_count = 0
loop1:
sleep 300
sql select * from streamt order by c1, c4, c2, c3;
...
...
@@ -61,6 +63,8 @@ endi
sql insert into t1 values(1648791213000,2,1,3,1.0);
$loop_count = 0
loop2:
sleep 300
sql select * from streamt order by c1, c4, c2, c3;
...
...
@@ -85,6 +89,8 @@ sql insert into t1 values(1648791213001,2,1,3,1.0);
sql insert into t1 values(1648791213002,2,1,3,1.0);
sql insert into t1 values(1648791213002,1,1,3,1.0);
$loop_count = 0
loop3:
sleep 300
sql select * from streamt order by c1, c4, c2, c3;
...
...
@@ -120,6 +126,8 @@ sql insert into t1 values(1648791223002,3,2,3,1.0);
sql insert into t1 values(1648791223003,3,2,3,1.0);
sql insert into t1 values(1648791213001,1,1,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0);
$loop_count = 0
loop4:
sleep 300
sql select * from streamt order by c1, c4, c2, c3;
...
...
@@ -212,6 +220,8 @@ sql insert into t1 values(1648791223001,1,2,2,5);
sql insert into t1 values(1648791223002,1,2,2,6);
sql insert into t1 values(1648791213001,1,1,1,7) (1648791223002,1,1,2,8);
$loop_count = 0
loop6:
sleep 300
sql select * from streamt1 order by c1, c4, c2, c3;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录