Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
dd38da79
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
dd38da79
编写于
6月 08, 2022
作者:
X
Xiaoyu Wang
浏览文件
操作
浏览文件
下载
差异文件
merge 3.0
上级
08209f30
1e2fdd57
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
388 addition
and
202 deletion
+388
-202
include/util/tpagedbuf.h
include/util/tpagedbuf.h
+7
-0
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+1
-1
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+4
-3
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+6
-0
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+7
-8
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+206
-87
source/libs/function/inc/builtinsimpl.h
source/libs/function/inc/builtinsimpl.h
+1
-0
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+2
-1
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+29
-8
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+7
-0
source/util/src/tpagedbuf.c
source/util/src/tpagedbuf.c
+28
-0
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+4
-0
tests/script/tsim/stream/session0.sim
tests/script/tsim/stream/session0.sim
+45
-28
tests/script/tsim/stream/state0.sim
tests/script/tsim/stream/state0.sim
+41
-66
未找到文件。
include/util/tpagedbuf.h
浏览文件 @
dd38da79
...
...
@@ -188,6 +188,13 @@ SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf);
*/
void
dBufPrintStatis
(
const
SDiskbasedBuf
*
pBuf
);
/**
* Set all of page buffer are not need
* @param pBuf
* @return
*/
void
clearDiskbasedBuf
(
SDiskbasedBuf
*
pBuf
);
#ifdef __cplusplus
}
#endif
...
...
source/common/src/tdatablock.c
浏览文件 @
dd38da79
...
...
@@ -1500,7 +1500,7 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag) {
for
(
int32_t
k
=
0
;
k
<
colNum
;
k
++
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
k
);
void
*
var
=
POINTER_SHIFT
(
pColInfoData
->
pData
,
j
*
pColInfoData
->
info
.
bytes
);
if
(
pColInfoData
->
hasNull
)
{
if
(
colDataIsNull
(
pColInfoData
,
rows
,
j
,
NULL
)
)
{
printf
(
" %15s |"
,
"NULL"
);
continue
;
}
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
dd38da79
...
...
@@ -488,6 +488,8 @@ typedef struct SStreamFinalIntervalOperatorInfo {
int32_t
order
;
// current SSDataBlock scan order
STimeWindowAggSupp
twAggSup
;
SArray
*
pChildren
;
SSDataBlock
*
pUpdateRes
;
SPhysiNode
*
pPhyNode
;
// create new child
}
SStreamFinalIntervalOperatorInfo
;
typedef
struct
SAggOperatorInfo
{
...
...
@@ -793,9 +795,8 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
SOperatorInfo
*
createIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamFinalIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamFinalIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
numOfChild
);
SOperatorInfo
*
createStreamIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
dd38da79
...
...
@@ -4634,6 +4634,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t
tsSlotId
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
slotId
;
pOptr
=
createIntervalOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
&
interval
,
tsSlotId
,
&
as
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
==
type
)
{
int32_t
children
=
8
;
pOptr
=
createStreamFinalIntervalOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
,
children
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
==
type
)
{
int32_t
children
=
0
;
pOptr
=
createStreamFinalIntervalOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
,
children
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_SORT
==
type
)
{
SSortPhysiNode
*
pSortPhyNode
=
(
SSortPhysiNode
*
)
pPhyNode
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
dd38da79
...
...
@@ -962,16 +962,15 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
if
(
rows
==
0
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
else
if
(
pInfo
->
pUpdateInfo
)
{
SSDataBlock
*
upRes
=
createOneDataBlock
(
pInfo
->
pRes
,
false
);
getUpdateDataBlock
(
pInfo
,
true
,
pInfo
->
pRes
,
upRes
);
if
(
upRes
)
{
pInfo
->
pUpdateRes
=
upRes
;
if
(
upRes
->
info
.
type
==
STREAM_REPROCESS
)
{
blockDataCleanup
(
pInfo
->
pUpdateRes
);
getUpdateDataBlock
(
pInfo
,
true
,
pInfo
->
pRes
,
pInfo
->
pUpdateRes
);
if
(
pInfo
->
pUpdateRes
->
info
.
rows
>
0
)
{
if
(
pInfo
->
pUpdateRes
->
info
.
type
==
STREAM_REPROCESS
)
{
pInfo
->
updateResIndex
=
0
;
pInfo
->
scanMode
=
STREAM_SCAN_FROM_UPDATERES
;
}
else
if
(
up
Res
->
info
.
type
==
STREAM_INVERT
)
{
}
else
if
(
pInfo
->
pUpdate
Res
->
info
.
type
==
STREAM_INVERT
)
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_RES
;
return
up
Res
;
return
pInfo
->
pUpdate
Res
;
}
}
}
...
...
@@ -1050,13 +1049,13 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
pInfo
->
tableUid
=
pScanPhyNode
->
uid
;
pInfo
->
streamBlockReader
=
pHandle
->
reader
;
pInfo
->
pRes
=
createResDataBlock
(
pDescNode
);
pInfo
->
pUpdateRes
=
createResDataBlock
(
pDescNode
);
pInfo
->
pCondition
=
pScanPhyNode
->
node
.
pConditions
;
pInfo
->
pDataReader
=
pDataReader
;
pInfo
->
scanMode
=
STREAM_SCAN_FROM_READERHANDLE
;
pInfo
->
pOperatorDumy
=
pTableScanDummy
;
pInfo
->
interval
=
pSTInfo
->
interval
;
pInfo
->
sessionSup
=
(
SessionWindowSupporter
){.
pStreamAggSup
=
NULL
,
.
gap
=
-
1
};
pOperator
->
name
=
"StreamBlockScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
;
pOperator
->
blocking
=
false
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
dd38da79
...
...
@@ -23,7 +23,6 @@ typedef enum SResultTsInterpType {
RESULT_ROW_END_INTERP
=
2
,
}
SResultTsInterpType
;
static
SSDataBlock
*
doStreamFinalIntervalAgg
(
SOperatorInfo
*
pOperator
);
static
SSDataBlock
*
doStreamSessionAgg
(
SOperatorInfo
*
pOperator
);
static
int64_t
*
extractTsCol
(
SSDataBlock
*
pBlock
,
const
SIntervalAggOperatorInfo
*
pInfo
);
...
...
@@ -778,6 +777,22 @@ static int32_t saveResult(SResultRow* result, uint64_t groupId, SArray* pUpdated
return
TSDB_CODE_SUCCESS
;
}
static
void
removeResult
(
SArray
*
pUpdated
,
TSKEY
key
)
{
int32_t
size
=
taosArrayGetSize
(
pUpdated
);
int32_t
index
=
binarySearch
(
pUpdated
,
size
,
key
,
TSDB_ORDER_DESC
,
getReskey
);
if
(
index
>=
0
&&
key
==
getReskey
(
pUpdated
,
index
))
{
taosArrayRemove
(
pUpdated
,
index
);
}
}
static
void
removeResults
(
SArray
*
pWins
,
SArray
*
pUpdated
)
{
int32_t
size
=
taosArrayGetSize
(
pWins
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
STimeWindow
*
pW
=
taosArrayGet
(
pWins
,
i
);
removeResult
(
pUpdated
,
pW
->
skey
);
}
}
static
void
hashIntervalAgg
(
SOperatorInfo
*
pOperatorInfo
,
SResultRowInfo
*
pResultRowInfo
,
SSDataBlock
*
pBlock
,
int32_t
scanFlag
,
SArray
*
pUpdated
)
{
SIntervalAggOperatorInfo
*
pInfo
=
(
SIntervalAggOperatorInfo
*
)
pOperatorInfo
->
info
;
...
...
@@ -1212,6 +1227,7 @@ void doClearWindow(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, char* pData, int
SET_RES_WINDOW_KEY
(
pSup
->
keyBuf
,
pData
,
bytes
,
groupId
);
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
taosHashGet
(
pSup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
ASSERT
(
p1
);
doClearWindowImpl
(
p1
,
pSup
->
pResultBuf
,
pBinfo
,
numOfOutput
);
}
...
...
@@ -1363,6 +1379,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFreeClear
(
pChildOp
);
}
}
nodesDestroyNode
(
pInfo
->
pPhyNode
);
}
static
bool
allInvertible
(
SqlFunctionCtx
*
pFCtx
,
int32_t
numOfCols
)
{
...
...
@@ -1485,69 +1502,6 @@ _error:
return
NULL
;
}
SOperatorInfo
*
createStreamFinalIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
)
{
SStreamFinalIntervalOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamFinalIntervalOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
goto
_error
;
}
pInfo
->
order
=
TSDB_ORDER_ASC
;
pInfo
->
interval
=
*
pInterval
;
pInfo
->
twAggSup
=
*
pTwAggSupp
;
pInfo
->
primaryTsIndex
=
primaryTsSlotId
;
size_t
keyBufSize
=
sizeof
(
int64_t
)
+
sizeof
(
int64_t
)
+
POINTER_BYTES
;
initResultSizeInfo
(
pOperator
,
4096
);
int32_t
code
=
initAggInfo
(
&
pInfo
->
binfo
,
&
pInfo
->
aggSup
,
pExprInfo
,
numOfCols
,
pResBlock
,
keyBufSize
,
pTaskInfo
->
id
.
str
);
initExecTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
pTaskInfo
->
window
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
(
int32_t
)
1
);
int32_t
numOfChild
=
8
;
// Todo(liuyao) get it from phy plan
pInfo
->
pChildren
=
taosArrayInit
(
numOfChild
,
sizeof
(
SOperatorInfo
));
for
(
int32_t
i
=
0
;
i
<
numOfChild
;
i
++
)
{
SSDataBlock
*
chRes
=
createOneDataBlock
(
pResBlock
,
false
);
SOperatorInfo
*
pChildOp
=
createIntervalOperatorInfo
(
NULL
,
pExprInfo
,
numOfCols
,
chRes
,
pInterval
,
primaryTsSlotId
,
pTwAggSupp
,
pTaskInfo
);
if
(
pChildOp
&&
chRes
)
{
taosArrayPush
(
pInfo
->
pChildren
,
&
pChildOp
);
continue
;
}
goto
_error
;
}
pOperator
->
name
=
"StreamFinalIntervalOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
pExpr
=
pExprInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
numOfExprs
=
numOfCols
;
pOperator
->
info
=
pInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
NULL
,
doStreamFinalIntervalAgg
,
NULL
,
NULL
,
destroyStreamFinalIntervalOperatorInfo
,
aggEncodeResultRow
,
aggDecodeResultRow
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
return
pOperator
;
_error:
destroyStreamFinalIntervalOperatorInfo
(
pInfo
,
numOfCols
);
taosMemoryFreeClear
(
pInfo
);
taosMemoryFreeClear
(
pOperator
);
pTaskInfo
->
code
=
code
;
return
NULL
;
}
SOperatorInfo
*
createStreamIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
)
{
...
...
@@ -1913,12 +1867,12 @@ _error:
return
NULL
;
}
static
SArray
*
doHashInterval
(
SOperatorInfo
*
pOperatorInfo
,
SSDataBlock
*
pSDataBlock
,
int32_t
tableGroupId
)
{
static
void
doHashInterval
(
SOperatorInfo
*
pOperatorInfo
,
SSDataBlock
*
pSDataBlock
,
int32_t
tableGroupId
,
SArray
*
pUpdated
)
{
SStreamFinalIntervalOperatorInfo
*
pInfo
=
(
SStreamFinalIntervalOperatorInfo
*
)
pOperatorInfo
->
info
;
SResultRowInfo
*
pResultRowInfo
=
&
(
pInfo
->
binfo
.
resultRowInfo
);
SExecTaskInfo
*
pTaskInfo
=
pOperatorInfo
->
pTaskInfo
;
int32_t
numOfOutput
=
pOperatorInfo
->
numOfExprs
;
SArray
*
pUpdated
=
taosArrayInit
(
4
,
POINTER_BYTES
);
int32_t
step
=
1
;
bool
ascScan
=
true
;
TSKEY
*
tsCols
=
NULL
;
...
...
@@ -1929,7 +1883,7 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataB
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pSDataBlock
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
tsCols
=
(
int64_t
*
)
pColDataInfo
->
pData
;
}
else
{
return
pUpdated
;
return
;
}
int32_t
startPos
=
ascScan
?
0
:
(
pSDataBlock
->
info
.
rows
-
1
);
...
...
@@ -1946,13 +1900,13 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataB
pos
->
groupId
=
tableGroupId
;
pos
->
pos
=
(
SResultRowPosition
){.
pageId
=
pResult
->
pageId
,
.
offset
=
pResult
->
offset
};
*
(
int64_t
*
)
pos
->
key
=
pResult
->
win
.
skey
;
taosArrayPush
(
pUpdated
,
&
pos
);
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pSDataBlock
->
info
,
tsCols
,
startPos
,
nextWin
.
ekey
,
binarySearchForKey
,
NULL
,
TSDB_ORDER_ASC
);
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pSDataBlock
->
info
,
tsCols
,
startPos
,
nextWin
.
ekey
,
binarySearchForKey
,
NULL
,
TSDB_ORDER_ASC
);
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
&&
pUpdated
)
{
saveResult
(
pResult
,
tableGroupId
,
pUpdated
);
}
// window start(end) key interpolation
// disable it temporarily
// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos,
// forwardRows);
// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardRows);
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
nextWin
,
true
);
doApplyFunctions
(
pTaskInfo
,
pInfo
->
binfo
.
pCtx
,
&
nextWin
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
tsCols
,
pSDataBlock
->
info
.
rows
,
numOfOutput
,
TSDB_ORDER_ASC
);
...
...
@@ -1962,7 +1916,6 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataB
break
;
}
}
return
pUpdated
;
}
bool
isFinalInterval
(
SStreamFinalIntervalOperatorInfo
*
pInfo
)
{
return
pInfo
->
pChildren
!=
NULL
;
}
...
...
@@ -2006,24 +1959,72 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SArra
}
}
static
void
clearStreamIntervalOperator
(
SStreamFinalIntervalOperatorInfo
*
pInfo
)
{
taosHashClear
(
pInfo
->
aggSup
.
pResultRowHashTable
);
clearDiskbasedBuf
(
pInfo
->
aggSup
.
pResultBuf
);
cleanupResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
1
);
}
static
void
clearUpdateDataBlock
(
SSDataBlock
*
pBlock
)
{
if
(
pBlock
->
info
.
rows
<=
0
)
{
return
;
}
blockDataCleanup
(
pBlock
);
}
static
void
copyUpdateDataBlock
(
SSDataBlock
*
pDest
,
SSDataBlock
*
pSource
,
int32_t
tsColIndex
)
{
ASSERT
(
pDest
->
info
.
capacity
>=
pSource
->
info
.
rows
);
clearUpdateDataBlock
(
pDest
);
SColumnInfoData
*
pDestCol
=
taosArrayGet
(
pDest
->
pDataBlock
,
0
);
SColumnInfoData
*
pSourceCol
=
taosArrayGet
(
pSource
->
pDataBlock
,
tsColIndex
);
// copy timestamp column
colDataAssign
(
pDestCol
,
pSourceCol
,
pSource
->
info
.
rows
);
for
(
int32_t
i
=
1
;
i
<
pDest
->
info
.
numOfCols
;
i
++
)
{
SColumnInfoData
*
pCol
=
taosArrayGet
(
pDest
->
pDataBlock
,
i
);
colDataAppendNNULL
(
pCol
,
0
,
pSource
->
info
.
rows
);
}
pDest
->
info
.
rows
=
pSource
->
info
.
rows
;
blockDataUpdateTsWindow
(
pDest
,
0
);
}
static
int32_t
getChildIndex
(
SSDataBlock
*
pBlock
)
{
// if (pBlock->info.type != STREAM_INVALID && pBlock->info.rows < 4) { // for test
// return pBlock->info.rows - 1;
// }
return
0
;
}
static
SSDataBlock
*
doStreamFinalIntervalAgg
(
SOperatorInfo
*
pOperator
)
{
SStreamFinalIntervalOperatorInfo
*
pInfo
=
pOperator
->
info
;
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
SArray
*
pUpdated
=
NULL
;
SArray
*
pUpdated
=
taosArrayInit
(
4
,
POINTER_BYTES
);
SArray
*
pClosed
=
taosArrayInit
(
4
,
POINTER_BYTES
);
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
else
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
||
!
hashRemainDataInGroupInfo
(
&
pInfo
->
groupResInfo
)
)
{
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
if
(
isFinalInterval
(
pInfo
)
||
pInfo
->
pUpdateRes
->
info
.
rows
==
0
)
{
if
(
!
isFinalInterval
(
pInfo
))
{
// semi interval operator clear disk buffer
clearStreamIntervalOperator
(
pInfo
);
}
return
NULL
;
}
// process the rest of the data
pOperator
->
status
=
OP_OPENED
;
return
pInfo
->
pUpdateRes
;
}
return
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
?
NULL
:
pInfo
->
binfo
.
pRes
;
return
pInfo
->
binfo
.
pRes
;
}
while
(
1
)
{
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
if
(
pBlock
==
NULL
)
{
clearUpdateDataBlock
(
pInfo
->
pUpdateRes
);
break
;
}
...
...
@@ -2033,31 +2034,149 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doClearWindows
(
&
pInfo
->
aggSup
,
&
pInfo
->
binfo
,
&
pInfo
->
interval
,
pInfo
->
primaryTsIndex
,
pOperator
->
numOfExprs
,
pBlock
,
pUpWins
);
if
(
isFinalInterval
(
pInfo
))
{
int32_t
childIndex
=
0
;
// Todo(liuyao) get child id from SSDataBlock
int32_t
childIndex
=
getChildIndex
(
pBlock
);
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
childIndex
);
SIntervalAggOperatorInfo
*
pChildInfo
=
pChildOp
->
info
;
doClearWindows
(
&
pChildInfo
->
aggSup
,
&
pChildInfo
->
binfo
,
&
pChildInfo
->
interval
,
pChildInfo
->
primaryTsIndex
,
pChildOp
->
numOfExprs
,
pBlock
,
NULL
);
rebuildIntervalWindow
(
pInfo
,
pUpWins
,
pInfo
->
binfo
.
pRes
->
info
.
groupId
,
pOperator
->
numOfExprs
,
pOperator
->
pTaskInfo
);
doClearWindows
(
&
pChildInfo
->
aggSup
,
&
pChildInfo
->
binfo
,
&
pChildInfo
->
interval
,
pChildInfo
->
primaryTsIndex
,
pChildOp
->
numOfExprs
,
pBlock
,
NULL
);
rebuildIntervalWindow
(
pInfo
,
pUpWins
,
pInfo
->
binfo
.
pRes
->
info
.
groupId
,
pOperator
->
numOfExprs
,
pOperator
->
pTaskInfo
);
taosArrayDestroy
(
pUpWins
);
continue
;
}
removeResults
(
pUpWins
,
pUpdated
);
copyUpdateDataBlock
(
pInfo
->
pUpdateRes
,
pBlock
,
pInfo
->
primaryTsIndex
);
taosArrayDestroy
(
pUpWins
);
continue
;
break
;
}
if
(
isFinalInterval
(
pInfo
))
{
int32_t
chIndex
=
1
;
// Todo(liuyao) get it from SSDataBlock
int32_t
chIndex
=
getChildIndex
(
pBlock
);
int32_t
size
=
taosArrayGetSize
(
pInfo
->
pChildren
);
// if chIndex + 1 - size > 0, add new child
for
(
int32_t
i
=
0
;
i
<
chIndex
+
1
-
size
;
i
++
)
{
SOperatorInfo
*
pChildOp
=
createStreamFinalIntervalOperatorInfo
(
NULL
,
pInfo
->
pPhyNode
,
pOperator
->
pTaskInfo
,
0
);
if
(
!
pChildOp
)
{
longjmp
(
pOperator
->
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
taosArrayPush
(
pInfo
->
pChildren
,
&
pChildOp
);
}
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
chIndex
);
doStreamIntervalAgg
(
pChildOp
);
SStreamFinalIntervalOperatorInfo
*
pChInfo
=
pChildOp
->
info
;
setInputDataBlock
(
pChildOp
,
pChInfo
->
binfo
.
pCtx
,
pBlock
,
pChInfo
->
order
,
MAIN_SCAN
,
true
);
doHashInterval
(
pChildOp
,
pBlock
,
pBlock
->
info
.
groupId
,
NULL
);
}
doHashInterval
(
pOperator
,
pBlock
,
pBlock
->
info
.
groupId
,
pUpdated
);
pInfo
->
twAggSup
.
maxTs
=
TMAX
(
pInfo
->
twAggSup
.
maxTs
,
pBlock
->
info
.
window
.
ekey
);
}
if
(
isFinalInterval
(
pInfo
))
{
closeIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
&
pInfo
->
twAggSup
,
&
pInfo
->
interval
,
pClosed
);
finalizeUpdatedResult
(
pOperator
->
numOfExprs
,
pInfo
->
aggSup
.
pResultBuf
,
pClosed
,
pInfo
->
binfo
.
rowCellInfoOffset
);
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_WINDOW_CLOSE
)
{
taosArrayAddAll
(
pUpdated
,
pClosed
);
}
pUpdated
=
doHashInterval
(
pOperator
,
pBlock
,
0
);
}
taosArrayDestroy
(
pClosed
);
finalizeUpdatedResult
(
pOperator
->
numOfExprs
,
pInfo
->
aggSup
.
pResultBuf
,
pUpdated
,
pInfo
->
binfo
.
rowCellInfoOffset
);
initMultiResInfoFromArrayList
(
&
pInfo
->
groupResInfo
,
pUpdated
);
blockDataEnsureCapacity
(
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
);
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
pOperator
->
status
=
OP_RES_TO_RETURN
;
return
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
?
NULL
:
pInfo
->
binfo
.
pRes
;
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
if
(
pInfo
->
pUpdateRes
->
info
.
rows
==
0
)
{
return
NULL
;
}
// process the rest of the data
pOperator
->
status
=
OP_OPENED
;
return
pInfo
->
pUpdateRes
;
}
return
pInfo
->
binfo
.
pRes
;
}
SOperatorInfo
*
createStreamFinalIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
numOfChild
)
{
SIntervalPhysiNode
*
pIntervalPhyNode
=
(
SIntervalPhysiNode
*
)
pPhyNode
;
SStreamFinalIntervalOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamFinalIntervalOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
goto
_error
;
}
pInfo
->
order
=
TSDB_ORDER_ASC
;
pInfo
->
interval
=
(
SInterval
)
{.
interval
=
pIntervalPhyNode
->
interval
,
.
sliding
=
pIntervalPhyNode
->
sliding
,
.
intervalUnit
=
pIntervalPhyNode
->
intervalUnit
,
.
slidingUnit
=
pIntervalPhyNode
->
slidingUnit
,
.
offset
=
pIntervalPhyNode
->
offset
,
.
precision
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
node
.
resType
.
precision
};
pInfo
->
twAggSup
=
(
STimeWindowAggSupp
){.
waterMark
=
pIntervalPhyNode
->
window
.
watermark
,
.
calTrigger
=
pIntervalPhyNode
->
window
.
triggerType
,
.
maxTs
=
INT64_MIN
,
.
winMap
=
NULL
,
};
pInfo
->
primaryTsIndex
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
slotId
;
size_t
keyBufSize
=
sizeof
(
int64_t
)
+
sizeof
(
int64_t
)
+
POINTER_BYTES
;
initResultSizeInfo
(
pOperator
,
4096
);
int32_t
numOfCols
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pIntervalPhyNode
->
window
.
pFuncs
,
NULL
,
&
numOfCols
);
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
int32_t
code
=
initAggInfo
(
&
pInfo
->
binfo
,
&
pInfo
->
aggSup
,
pExprInfo
,
numOfCols
,
pResBlock
,
keyBufSize
,
pTaskInfo
->
id
.
str
);
initExecTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
pTaskInfo
->
window
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
(
int32_t
)
1
);
pInfo
->
pChildren
=
NULL
;
if
(
numOfChild
>
0
)
{
pInfo
->
pChildren
=
taosArrayInit
(
numOfChild
,
sizeof
(
SOperatorInfo
));
for
(
int32_t
i
=
0
;
i
<
numOfChild
;
i
++
)
{
SOperatorInfo
*
pChildOp
=
createStreamFinalIntervalOperatorInfo
(
NULL
,
pPhyNode
,
pTaskInfo
,
0
);
if
(
pChildOp
)
{
taosArrayPush
(
pInfo
->
pChildren
,
&
pChildOp
);
continue
;
}
goto
_error
;
}
}
// semi interval operator does not catch result
if
(
!
isFinalInterval
(
pInfo
))
{
pInfo
->
twAggSup
.
calTrigger
=
STREAM_TRIGGER_AT_ONCE
;
}
pInfo
->
pUpdateRes
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
\
pInfo
->
pUpdateRes
->
info
.
type
=
STREAM_REPROCESS
;
blockDataEnsureCapacity
(
pInfo
->
pUpdateRes
,
128
);
pInfo
->
pPhyNode
=
nodesCloneNode
(
pPhyNode
);
pOperator
->
name
=
"StreamFinalIntervalOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
pExpr
=
pExprInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
numOfExprs
=
numOfCols
;
pOperator
->
info
=
pInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
NULL
,
doStreamFinalIntervalAgg
,
NULL
,
NULL
,
destroyStreamFinalIntervalOperatorInfo
,
aggEncodeResultRow
,
aggDecodeResultRow
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
return
pOperator
;
_error:
destroyStreamFinalIntervalOperatorInfo
(
pInfo
,
numOfCols
);
taosMemoryFreeClear
(
pInfo
);
taosMemoryFreeClear
(
pOperator
);
pTaskInfo
->
code
=
code
;
return
NULL
;
}
void
destroyStreamAggSupporter
(
SStreamAggSupporter
*
pSup
)
{
...
...
source/libs/function/inc/builtinsimpl.h
浏览文件 @
dd38da79
...
...
@@ -67,6 +67,7 @@ bool leastSQRFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInf
int32_t
leastSQRFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
leastSQRFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
int32_t
leastSQRInvertFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
leastSQRCombine
(
SqlFunctionCtx
*
pDestCtx
,
SqlFunctionCtx
*
pSourceCtx
);
bool
getPercentileFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
percentileFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
...
...
source/libs/function/src/builtins.c
浏览文件 @
dd38da79
...
...
@@ -1118,7 +1118,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
initFunc
=
leastSQRFunctionSetup
,
.
processFunc
=
leastSQRFunction
,
.
finalizeFunc
=
leastSQRFinalize
,
.
invertFunc
=
leastSQRInvertFunction
,
.
invertFunc
=
NULL
,
.
combineFunc
=
leastSQRCombine
,
},
{
.
name
=
"avg"
,
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
dd38da79
...
...
@@ -1799,17 +1799,17 @@ int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
param
[
1
][
1
]
=
(
double
)
pInfo
->
num
;
param
[
1
][
0
]
=
param
[
0
][
1
];
param
[
0
][
0
]
-=
param
[
1
][
0
]
*
(
param
[
0
][
1
]
/
param
[
1
][
1
]);
param
[
0
][
2
]
-=
param
[
1
][
2
]
*
(
param
[
0
][
1
]
/
param
[
1
][
1
]);
param
[
0
][
1
]
=
0
;
param
[
1
][
2
]
-=
param
[
0
][
2
]
*
(
param
[
1
][
0
]
/
param
[
0
][
0
]
);
param
[
1
][
0
]
=
0
;
param
[
0
][
2
]
/=
param
[
0
][
0
]
;
double
param00
=
param
[
0
][
0
]
-
param
[
1
][
0
]
*
(
param
[
0
][
1
]
/
param
[
1
][
1
]);
double
param02
=
param
[
0
][
2
]
-
param
[
1
][
2
]
*
(
param
[
0
][
1
]
/
param
[
1
][
1
]);
//
param[0][1] = 0;
double
param12
=
param
[
1
][
2
]
-
param02
*
(
param
[
1
][
0
]
/
param00
);
//
param[1][0] = 0;
param
02
/=
param00
;
param
[
1
][
2
]
/=
param
[
1
][
1
];
param
12
/=
param
[
1
][
1
];
char
buf
[
64
]
=
{
0
};
size_t
len
=
snprintf
(
varDataVal
(
buf
),
sizeof
(
buf
)
-
VARSTR_HEADER_SIZE
,
"{slop:%.6lf, intercept:%.6lf}"
,
param
[
0
][
2
],
param
[
1
][
2
]
);
size_t
len
=
snprintf
(
varDataVal
(
buf
),
sizeof
(
buf
)
-
VARSTR_HEADER_SIZE
,
"{slop:%.6lf, intercept:%.6lf}"
,
param
02
,
param12
);
varDataSetLen
(
buf
,
len
);
colDataAppend
(
pCol
,
currentRow
,
buf
,
pResInfo
->
isNullRes
);
...
...
@@ -1822,6 +1822,27 @@ int32_t leastSQRInvertFunction(SqlFunctionCtx* pCtx) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
leastSQRCombine
(
SqlFunctionCtx
*
pDestCtx
,
SqlFunctionCtx
*
pSourceCtx
)
{
SResultRowEntryInfo
*
pDResInfo
=
GET_RES_INFO
(
pDestCtx
);
SLeastSQRInfo
*
pDBuf
=
GET_ROWCELL_INTERBUF
(
pDResInfo
);
int32_t
type
=
pDestCtx
->
input
.
pData
[
0
]
->
info
.
type
;
double
(
*
pDparam
)[
3
]
=
pDBuf
->
matrix
;
SResultRowEntryInfo
*
pSResInfo
=
GET_RES_INFO
(
pSourceCtx
);
SLeastSQRInfo
*
pSBuf
=
GET_ROWCELL_INTERBUF
(
pSResInfo
);
double
(
*
pSparam
)[
3
]
=
pSBuf
->
matrix
;
for
(
int32_t
i
=
0
;
i
<
pSBuf
->
num
;
i
++
)
{
pDparam
[
0
][
0
]
+=
pDBuf
->
startVal
*
pDBuf
->
startVal
;
pDparam
[
0
][
1
]
+=
pDBuf
->
startVal
;
pDBuf
->
startVal
+=
pDBuf
->
stepVal
;
}
pDparam
[
0
][
2
]
+=
pSparam
[
0
][
2
]
+
pDBuf
->
num
*
pDBuf
->
stepVal
*
pSparam
[
1
][
2
];
pDparam
[
1
][
2
]
+=
pSparam
[
1
][
2
];
pDBuf
->
num
+=
pSBuf
->
num
;
pDResInfo
->
numOfRes
=
TMAX
(
pDResInfo
->
numOfRes
,
pSResInfo
->
numOfRes
);
return
TSDB_CODE_SUCCESS
;
}
bool
getPercentileFuncEnv
(
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
)
{
pEnv
->
calcMemSize
=
sizeof
(
SPercentileInfo
);
return
true
;
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
dd38da79
...
...
@@ -2441,6 +2441,7 @@ static const char* jkValueLiteralSize = "LiteralSize";
static
const
char
*
jkValueLiteral
=
"Literal"
;
static
const
char
*
jkValueDuration
=
"Duration"
;
static
const
char
*
jkValueTranslate
=
"Translate"
;
static
const
char
*
jkValueNotReserved
=
"NotReserved"
;
static
const
char
*
jkValueDatum
=
"Datum"
;
static
int32_t
datumToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
...
...
@@ -2513,6 +2514,9 @@ static int32_t valueNodeToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddBoolToObject
(
pJson
,
jkValueTranslate
,
pNode
->
translate
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddBoolToObject
(
pJson
,
jkValueNotReserved
,
pNode
->
notReserved
);
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
pNode
->
translate
)
{
code
=
datumToJson
(
pNode
,
pJson
);
}
...
...
@@ -2634,6 +2638,9 @@ static int32_t jsonToValueNode(const SJson* pJson, void* pObj) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBoolValue
(
pJson
,
jkValueTranslate
,
&
pNode
->
translate
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBoolValue
(
pJson
,
jkValueNotReserved
,
&
pNode
->
notReserved
);
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
pNode
->
translate
)
{
code
=
jsonToDatum
(
pJson
,
pNode
);
}
...
...
source/util/src/tpagedbuf.c
浏览文件 @
dd38da79
...
...
@@ -651,3 +651,31 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
ps
->
getPages
,
ps
->
releasePages
,
ps
->
flushBytes
/
1024
.
0
f
,
ps
->
flushPages
,
ps
->
loadBytes
/
1024
.
0
f
,
ps
->
loadPages
,
ps
->
loadBytes
/
(
1024
.
0
*
ps
->
loadPages
));
}
void
clearDiskbasedBuf
(
SDiskbasedBuf
*
pBuf
)
{
SArray
**
p
=
taosHashIterate
(
pBuf
->
groupSet
,
NULL
);
while
(
p
)
{
size_t
n
=
taosArrayGetSize
(
*
p
);
for
(
int32_t
i
=
0
;
i
<
n
;
++
i
)
{
SPageInfo
*
pi
=
taosArrayGetP
(
*
p
,
i
);
taosMemoryFreeClear
(
pi
->
pData
);
taosMemoryFreeClear
(
pi
);
}
taosArrayDestroy
(
*
p
);
p
=
taosHashIterate
(
pBuf
->
groupSet
,
p
);
}
tdListEmpty
(
pBuf
->
lruList
);
tdListEmpty
(
pBuf
->
freePgList
);
taosArrayClear
(
pBuf
->
emptyDummyIdList
);
taosArrayClear
(
pBuf
->
pFree
);
taosHashClear
(
pBuf
->
groupSet
);
taosHashClear
(
pBuf
->
all
);
pBuf
->
numOfPages
=
0
;
// all pages are in buffer in the first place
pBuf
->
totalBufSize
=
0
;
pBuf
->
allocateId
=
-
1
;
pBuf
->
fileSize
=
0
;
}
\ No newline at end of file
tests/script/jenkins/basic.txt
浏览文件 @
dd38da79
...
...
@@ -72,6 +72,10 @@
./test.sh -f tsim/stream/basic2.sim
# ./test.sh -f tsim/stream/session0.sim
# ./test.sh -f tsim/stream/session1.sim
# ./test.sh -f tsim/stream/state0.sim
# ./test.sh -f tsim/stream/triggerInterval0.sim
# ./test.sh -f tsim/stream/triggerSession0.sim
# ---- transaction
./test.sh -f tsim/trans/lossdata1.sim
...
...
tests/script/tsim/stream/session0.sim
浏览文件 @
dd38da79
...
...
@@ -23,89 +23,98 @@ sql insert into t1 values(1648791223001,10,2,3,1.1,2);
sql insert into t1 values(1648791233002,3,2,3,2.1,3);
sql insert into t1 values(1648791243003,NULL,NULL,NULL,NULL,4);
sql insert into t1 values(1648791213002,NULL,NULL,NULL,NULL,5) (1648791233012,NULL,NULL,NULL,NULL,6);
$loop_count = 0
loop0:
sleep 300
sql select * from streamt order by s desc;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $data01 != 3 then
print ======$data01
return -1
print ======
data01=
$data01
goto loop0
endi
if $data02 != 3 then
print ======$data02
return -1
print ======
data02=
$data02
goto loop0
endi
if $data03 != 3 then
print ======$data03
return -1
print ======
data03=
$data03
goto loop0
endi
if $data04 != 2.100000000 then
print ======$data04
print ======
data04=
$data04
return -1
endi
if $data05 != 0.000000000 then
print ======$data05
print ======
data05=
$data05
return -1
endi
if $data06 != 3 then
print ======
$data05
print ======
data06=$data06
return -1
endi
if $data07 != 2.100000000 then
print ======
$data05
print ======
data07=$data07
return -1
endi
if $data08 != 6 then
print ======
$data05
print ======
data08=$data08
return -1
endi
# row 1
if $data11 != 3 then
print ======$data11
return -1
print ======
data11=
$data11
goto loop0
endi
if $data12 != 10 then
print ======$data12
return -1
print ======
data12=
$data12
goto loop0
endi
if $data13 != 10 then
print ======$data13
return -1
print ======
data13=
$data13
goto loop0
endi
if $data14 != 1.100000000 then
print ======$data14
print ======
data14=
$data14
return -1
endi
if $data15 != 0.000000000 then
print ======$data15
print ======
data15=
$data15
return -1
endi
if $data16 != 10 then
print ======
$data15
print ======
data16=$data16
return -1
endi
if $data17 != 1.100000000 then
print ======$data17
print ======
data17=
$data17
return -1
endi
if $data18 != 5 then
print ======$data18
print ======
data18=
$data18
return -1
endi
...
...
@@ -115,23 +124,31 @@ sql insert into t1 values(1648791233002,3,2,3,2.1,9);
sql insert into t1 values(1648791243003,4,2,3,3.1,10);
sql insert into t1 values(1648791213002,4,2,3,4.1,11) ;
sql insert into t1 values(1648791213002,4,2,3,4.1,12) (1648791223009,4,2,3,4.1,13);
$loop_count = 0
loop1:
sleep 300
sql select * from streamt order by s desc ;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $data01 != 7 then
print ======$data01
return -
1
print =====
data01
=$data01
goto loop
1
endi
if $data02 != 9 then
print ======$data02
return -
1
print =====
data02
=$data02
goto loop
1
endi
if $data03 != 4 then
print ======$data03
return -
1
print =====
data03
=$data03
goto loop
1
endi
if $data04 != 1.100000000 then
...
...
tests/script/tsim/stream/state0.sim
浏览文件 @
dd38da79
...
...
@@ -20,21 +20,33 @@ sql create stream streams1 trigger at_once into streamt1 as select _wstartts,
sql insert into t1 values(1648791213000,1,2,3,1.0,1);
sql insert into t1 values(1648791213000,1,2,3,1.0,2);
$loop_count = 0
loop0:
sql select * from streamt1 order by c desc;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print ======$rows
return -1;
print =====
rows
=$rows
goto loop0
endi
sql insert into t1 values(1648791214000,1,2,3,1.0,3);
$loop_count = 0
loop00:
sql select * from streamt1 order by c desc;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print ======$rows
return -1;
print =====
rows
=$rows
goto loop00
endi
sql insert into t1 values(1648791213010,2,2,3,1.0,4);
...
...
@@ -44,27 +56,25 @@ $loop_count = 0
loop1:
sql select * from streamt1 where c >=4 order by `_wstartts`;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 3 then
print ======$rows
print =====
rows
=$rows
goto loop1
return -1
endi
# row 0
if $data01 != 1 then
print ======$data01
return -
1
print =====
data01
=$data01
goto loop
1
endi
if $data02 != 1 then
print ======$data02
return -
1
print =====
data02
=$data02
goto loop
1
endi
if $data03 != 1 then
...
...
@@ -151,11 +161,10 @@ endi
sql insert into t1 values(1648791213011,1,2,3,1.0,7);
loop2:
$loop_count = 0
loop2:
sql select * from streamt1 where c in (5,4,7) order by `_wstartts`;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
...
...
@@ -163,57 +172,51 @@ endi
# row 2
if $data21 != 2 then
print ======$data21
print =====
data21
=$data21
goto loop2
return -1
endi
if $data22 != 2 then
print ======$data22
print =====
data22
=$data22
goto loop2
return -1
endi
if $data23 != 2 then
print ======$data23
goto loop2
return -1
endi
if $data24 != 1 then
print ======$data24
goto loop2
return -1
endi
if $data25 != 3 then
print ======$data25
goto loop2
return -1
endi
if $data26 != 7 then
print ======$data26
goto loop2
return -1
endi
sql insert into t1 values(1648791213011,1,2,3,1.0,8);
loop21:
$loop_count = 0
loop21:
sql select * from streamt1 where c in (5,4,8) order by `_wstartts`;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data26 != 8 then
print ======$data26
print =====
data26
=$data26
goto loop21
return -1
endi
...
...
@@ -222,11 +225,10 @@ sql insert into t1 values(1648791213020,3,2,3,1.0,10);
sql insert into t1 values(1648791214000,1,2,3,1.0,11);
sql insert into t1 values(1648791213011,10,20,10,10.0,12);
loop3:
$loop_count = 0
loop3:
sql select * from streamt1 where c in (5,4,10,11,12) order by `_wstartts`;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
...
...
@@ -234,112 +236,100 @@ endi
# row 2
if $data21 != 1 then
print ======$data21
print =====
data21
=$data21
goto loop3
return -1
endi
if $data22 != 1 then
print ======$data22
print =====
data22
=$data22
goto loop3
return -1
endi
if $data23 != 10 then
print ======$data23
goto loop3
return -1
endi
if $data24 != 10 then
print ======$data24
goto loop3
return -1
endi
if $data25 != 10 then
print ======$data25
goto loop3
return -1
endi
if $data26 != 12 then
print ======$data26
goto loop3
return -1
endi
# row 3
if $data31 != 1 then
print ======$data31
print =====
data31
=$data31
goto loop3
return -1
endi
if $data32 != 1 then
print ======$data32
print =====
data32
=$data32
goto loop3
return -1
endi
if $data33 != 3 then
print ======$data33
goto loop3
return -1
endi
if $data34 != 3 then
print ======$data34
goto loop3
return -1
endi
if $data35 != 3 then
print ======$data35
goto loop3
return -1
endi
if $data36 != 10 then
print ======$data36
goto loop3
return -1
endi
# row 4
if $data41 != 1 then
print ======$data41
print =====
data41
=$data41
goto loop3
return -1
endi
if $data42 != 1 then
print ======$data42
print =====
data42
=$data42
goto loop3
return -1
endi
if $data43 != 1 then
print ======$data43
goto loop3
return -1
endi
if $data44 != 1 then
print ======$data44
goto loop3
return -1
endi
if $data45 != 3 then
print ======$data45
goto loop3
return -1
endi
if $data46 != 11 then
print ======$data46
goto loop3
return -1
endi
...
...
@@ -347,8 +337,8 @@ sql insert into t1 values(1648791213030,3,12,12,12.0,13);
sql insert into t1 values(1648791214040,1,13,13,13.0,14);
sql insert into t1 values(1648791213030,3,14,14,14.0,15) (1648791214020,15,15,15,15.0,16);
loop4:
$loop_count = 0
loop4:
sql select * from streamt1 where c in (14,15,16) order by `_wstartts`;
sleep 300
...
...
@@ -358,119 +348,104 @@ if $loop_count == 10 then
endi
if $rows != 3 then
print ======$rows
goto loop4
return -1;
print ====loop4=rows=$rows
# goto loop4
endi
# row 0
if $data01 != 2 then
print ======$data01
print =====
data01
=$data01
goto loop4
return -1
endi
if $data02 != 2 then
print ======$data02
goto loop4
return -1
endi
if $data03 != 6 then
print ======$data03
goto loop4
return -1
endi
if $data04 != 3 then
print ======$data04
goto loop4
return -1
endi
if $data05 != 3 then
print ======$data05
goto loop4
return -1
endi
if $data06 != 15 then
print ======$data06
goto loop4
return -1
endi
# row 1
if $data11 != 1 then
print ======$data11
print =====
data11
=$data11
goto loop4
return -1
endi
if $data12 != 1 then
print ======$data12
print =====
data12
=$data12
goto loop4
return -1
endi
if $data13 != 15 then
print ======$data13
goto loop4
return -1
endi
if $data14 != 15 then
print ======$data14
goto loop4
return -1
endi
if $data15 != 15 then
print ======$data15
goto loop4
return -1
endi
if $data16 != 16 then
print ======$data16
goto loop4
return -1
endi
# row 2
if $data21 != 1 then
print ======$data21
print =====
data21
=$data21
goto loop4
return -1
endi
if $data22 != 1 then
print ======$data22
print =====
data22
=$data22
goto loop4
return -1
endi
if $data23 != 1 then
print ======$data23
goto loop4
return -1
endi
if $data24 != 1 then
print ======$data24
goto loop4
return -1
endi
if $data25 != 13 then
print ======$data25
goto loop4
return -1
endi
if $data26 != 14 then
print ======$data26
goto loop4
return -1
endi
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录