Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f2b66cc1
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f2b66cc1
编写于
6月 21, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix(stream): convert datablk to submit blk
上级
b04e7c93
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
211 addition
and
224 deletion
+211
-224
examples/c/stream_demo.c
examples/c/stream_demo.c
+4
-3
source/dnode/vnode/src/tq/tqSink.c
source/dnode/vnode/src/tq/tqSink.c
+6
-8
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+200
-212
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-1
未找到文件。
examples/c/stream_demo.c
浏览文件 @
f2b66cc1
...
...
@@ -90,9 +90,10 @@ int32_t create_stream() {
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes
=
taos_query
(
pConn
,
"create stream stream1 trigger window_close into outstb as select _wstartts, sum(k) from st1 "
"interval(10s) "
);
pRes
=
taos_query
(
pConn
,
"create stream stream1 trigger window_close watermark 10s into outstb as select _wstartts, sum(k) from st1 "
"interval(10s) "
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create stream stream1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
source/dnode/vnode/src/tq/tqSink.c
浏览文件 @
f2b66cc1
...
...
@@ -16,7 +16,7 @@
#include "tq.h"
SSubmitReq
*
tdBlockToSubmit
(
const
SArray
*
pBlocks
,
const
STSchema
*
pTSchema
,
bool
createTb
,
int64_t
suid
,
const
char
*
stbFullName
,
int32_t
vgId
)
{
const
char
*
stbFullName
,
int32_t
vgId
)
{
SSubmitReq
*
ret
=
NULL
;
SArray
*
tagArray
=
taosArrayInit
(
1
,
sizeof
(
STagVal
));
if
(
!
tagArray
)
{
...
...
@@ -80,11 +80,10 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
ret
->
length
=
sizeof
(
SSubmitReq
);
ret
->
numOfBlocks
=
htonl
(
sz
);
void
*
submitBlk
=
POINTER_SHIFT
(
ret
,
sizeof
(
SSubmitReq
));
SSubmitBlk
*
blkHead
=
POINTER_SHIFT
(
ret
,
sizeof
(
SSubmitReq
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
pBlocks
,
i
);
SSubmitBlk
*
blkHead
=
submitBlk
;
blkHead
->
numOfRows
=
htons
(
pDataBlock
->
info
.
rows
);
blkHead
->
sversion
=
htonl
(
pTSchema
->
version
);
// TODO
...
...
@@ -97,7 +96,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
/*blkHead->dataLen = htonl(rows * maxLen);*/
blkHead
->
dataLen
=
0
;
void
*
bl
ockData
=
POINTER_SHIFT
(
submitBlk
,
sizeof
(
SSubmitBlk
));
void
*
bl
kSchema
=
POINTER_SHIFT
(
blkHead
,
sizeof
(
SSubmitBlk
));
int32_t
schemaLen
=
0
;
if
(
createTb
)
{
...
...
@@ -135,7 +134,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
}
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
bl
ockDat
a
,
schemaLen
);
tEncoderInit
(
&
encoder
,
bl
kSchem
a
,
schemaLen
);
code
=
tEncodeSVCreateTbReq
(
&
encoder
,
&
createTbReq
);
tEncoderClear
(
&
encoder
);
tdDestroySVCreateTbReq
(
&
createTbReq
);
...
...
@@ -148,7 +147,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
}
blkHead
->
schemaLen
=
htonl
(
schemaLen
);
STSRow
*
rowData
=
POINTER_SHIFT
(
bl
ockDat
a
,
schemaLen
);
STSRow
*
rowData
=
POINTER_SHIFT
(
bl
kSchem
a
,
schemaLen
);
for
(
int32_t
j
=
0
;
j
<
rows
;
j
++
)
{
SRowBuilder
rb
=
{
0
};
...
...
@@ -174,8 +173,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
blkHead
->
dataLen
=
htonl
(
dataLen
);
ret
->
length
+=
sizeof
(
SSubmitBlk
)
+
schemaLen
+
dataLen
;
blkHead
=
POINTER_SHIFT
(
blkHead
,
schemaLen
+
dataLen
);
/*submitBlk = blkHead;*/
blkHead
=
POINTER_SHIFT
(
blkHead
,
sizeof
(
SSubmitBlk
)
+
schemaLen
+
dataLen
);
}
ret
->
length
=
htonl
(
ret
->
length
);
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
f2b66cc1
...
...
@@ -16,8 +16,8 @@
#include "function.h"
#include "functionMgt.h"
#include "tdatablock.h"
#include "ttime.h"
#include "tfill.h"
#include "ttime.h"
typedef
enum
SResultTsInterpType
{
RESULT_ROW_START_INTERP
=
1
,
...
...
@@ -42,7 +42,7 @@ static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOper
// * merged during merge stage. In this case, we need the pTableQueryInfo->lastResRows to decide if there
// * is a previous result generated or not.
// */
//static void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWindow* pQRange) {
//
static void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWindow* pQRange) {
// // do nothing
//}
...
...
@@ -324,8 +324,8 @@ static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t o
tw
->
ekey
-=
1
;
}
void
doTimeWindowInterpolation
(
SArray
*
pPrevValues
,
SArray
*
pDataBlock
,
TSKEY
prevTs
,
int32_t
prevRowIndex
,
TSKEY
curTs
,
int32_t
curRowIndex
,
TSKEY
windowKey
,
int32_t
type
,
SExprSupp
*
pSup
)
{
void
doTimeWindowInterpolation
(
SArray
*
pPrevValues
,
SArray
*
pDataBlock
,
TSKEY
prevTs
,
int32_t
prevRowIndex
,
TSKEY
curTs
,
int32_t
curRowIndex
,
TSKEY
windowKey
,
int32_t
type
,
SExprSupp
*
pSup
)
{
SqlFunctionCtx
*
pCtx
=
pSup
->
pCtx
;
int32_t
index
=
1
;
...
...
@@ -405,8 +405,8 @@ static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, in
}
}
static
bool
setTimeWindowInterpolationStartTs
(
SIntervalAggOperatorInfo
*
pInfo
,
int32_t
pos
,
SSDataBlock
*
pBlock
,
const
TSKEY
*
tsCols
,
STimeWindow
*
win
,
SExprSupp
*
pSup
)
{
static
bool
setTimeWindowInterpolationStartTs
(
SIntervalAggOperatorInfo
*
pInfo
,
int32_t
pos
,
SSDataBlock
*
pBlock
,
const
TSKEY
*
tsCols
,
STimeWindow
*
win
,
SExprSupp
*
pSup
)
{
bool
ascQuery
=
(
pInfo
->
order
==
TSDB_ORDER_ASC
);
TSKEY
curTs
=
tsCols
[
pos
];
...
...
@@ -434,9 +434,9 @@ static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, i
return
true
;
}
static
bool
setTimeWindowInterpolationEndTs
(
SIntervalAggOperatorInfo
*
pInfo
,
SExprSupp
*
pSup
,
int32_t
endRowIndex
,
SArray
*
pDataBlock
,
const
TSKEY
*
tsCols
,
TSKEY
blockEkey
,
STimeWindow
*
win
)
{
static
bool
setTimeWindowInterpolationEndTs
(
SIntervalAggOperatorInfo
*
pInfo
,
SExprSupp
*
pSup
,
int32_t
endRowIndex
,
SArray
*
pDataBlock
,
const
TSKEY
*
tsCols
,
TSKEY
blockEkey
,
STimeWindow
*
win
)
{
int32_t
order
=
pInfo
->
order
;
TSKEY
actualEndKey
=
tsCols
[
endRowIndex
];
...
...
@@ -548,8 +548,8 @@ static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
}
}
static
void
doWindowBorderInterpolation
(
SIntervalAggOperatorInfo
*
pInfo
,
SSDataBlock
*
pBlock
,
SResultRow
*
pResult
,
STimeWindow
*
win
,
int32_t
startPos
,
int32_t
forwardRows
,
SExprSupp
*
pSup
)
{
static
void
doWindowBorderInterpolation
(
SIntervalAggOperatorInfo
*
pInfo
,
SSDataBlock
*
pBlock
,
SResultRow
*
pResult
,
STimeWindow
*
win
,
int32_t
startPos
,
int32_t
forwardRows
,
SExprSupp
*
pSup
)
{
if
(
!
pInfo
->
timeWindowInterpo
)
{
return
;
}
...
...
@@ -628,7 +628,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
SExecTaskInfo
*
pTaskInfo
=
pOperatorInfo
->
pTaskInfo
;
SIntervalAggOperatorInfo
*
pInfo
=
(
SIntervalAggOperatorInfo
*
)
pOperatorInfo
->
info
;
SExprSupp
*
pSup
=
&
pOperatorInfo
->
exprSupp
;
SExprSupp
*
pSup
=
&
pOperatorInfo
->
exprSupp
;
int32_t
startPos
=
0
;
int32_t
numOfOutput
=
pSup
->
numOfExprs
;
...
...
@@ -655,9 +655,8 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
}
STimeWindow
w
=
pr
->
win
;
int32_t
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
w
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
groupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
int32_t
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
w
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
groupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -672,8 +671,8 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
setResultRowInterpo
(
pResult
,
RESULT_ROW_END_INTERP
);
setNotInterpoWindowKey
(
pSup
->
pCtx
,
numOfExprs
,
RESULT_ROW_START_INTERP
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
w
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
0
,
tsCols
,
pBlock
->
info
.
rows
,
numOfExprs
,
pInfo
->
order
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
w
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
0
,
tsCols
,
pBlock
->
info
.
rows
,
numOfExprs
,
pInfo
->
order
);
if
(
isResultRowInterpolated
(
pResult
,
RESULT_ROW_END_INTERP
))
{
closeResultRow
(
pr
);
...
...
@@ -753,8 +752,7 @@ int64_t getReskey(void* data, int32_t index) {
return
*
(
int64_t
*
)
pos
->
key
;
}
static
int32_t
saveResult
(
int64_t
ts
,
int32_t
pageId
,
int32_t
offset
,
uint64_t
groupId
,
SArray
*
pUpdated
)
{
static
int32_t
saveResult
(
int64_t
ts
,
int32_t
pageId
,
int32_t
offset
,
uint64_t
groupId
,
SArray
*
pUpdated
)
{
int32_t
size
=
taosArrayGetSize
(
pUpdated
);
int32_t
index
=
binarySearch
(
pUpdated
,
size
,
ts
,
TSDB_ORDER_DESC
,
getReskey
);
if
(
index
==
-
1
)
{
...
...
@@ -806,7 +804,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
SIntervalAggOperatorInfo
*
pInfo
=
(
SIntervalAggOperatorInfo
*
)
pOperatorInfo
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperatorInfo
->
pTaskInfo
;
SExprSupp
*
pSup
=
&
pOperatorInfo
->
exprSupp
;
SExprSupp
*
pSup
=
&
pOperatorInfo
->
exprSupp
;
int32_t
startPos
=
0
;
int32_t
numOfOutput
=
pSup
->
numOfExprs
;
...
...
@@ -819,9 +817,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
STimeWindow
win
=
getActiveTimeWindow
(
pInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
ts
,
&
pInfo
->
interval
,
pInfo
->
interval
.
precision
,
&
pInfo
->
win
);
int32_t
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
int32_t
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -843,9 +840,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
doInterpUnclosedTimeWindow
(
pOperatorInfo
,
numOfOutput
,
pResultRowInfo
,
pBlock
,
scanFlag
,
tsCols
,
&
pos
);
// restore current time window
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -870,8 +866,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
// null data, failed to allocate more memory buffer
int32_t
code
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
nextWin
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -890,8 +885,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
doWindowBorderInterpolation
(
pInfo
,
pBlock
,
pResult
,
&
nextWin
,
startPos
,
forwardRows
,
pSup
);
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
nextWin
,
true
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
nextWin
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
pInfo
->
order
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
nextWin
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
pInfo
->
order
);
doCloseWindow
(
pResultRowInfo
,
pInfo
,
pResult
);
}
...
...
@@ -1002,7 +997,7 @@ static bool compareVal(const char* v, const SStateKeys* pKey) {
static
void
doStateWindowAggImpl
(
SOperatorInfo
*
pOperator
,
SStateWindowOperatorInfo
*
pInfo
,
SSDataBlock
*
pBlock
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SColumnInfoData
*
pStateColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pInfo
->
stateCol
.
slotId
);
int64_t
gid
=
pBlock
->
info
.
groupId
;
...
...
@@ -1050,9 +1045,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
STimeWindow
window
=
pRowSup
->
win
;
pRowSup
->
win
.
ekey
=
pRowSup
->
win
.
skey
;
int32_t
ret
=
setTimeWindowOutputBuf
(
&
pInfo
->
binfo
.
resultRowInfo
,
&
window
,
masterScan
,
&
pResult
,
gid
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
int32_t
ret
=
setTimeWindowOutputBuf
(
&
pInfo
->
binfo
.
resultRowInfo
,
&
window
,
masterScan
,
&
pResult
,
gid
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
// null data, too many state code
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_APP_ERROR
);
}
...
...
@@ -1076,9 +1070,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
SResultRow
*
pResult
=
NULL
;
pRowSup
->
win
.
ekey
=
tsList
[
pBlock
->
info
.
rows
-
1
];
int32_t
ret
=
setTimeWindowOutputBuf
(
&
pInfo
->
binfo
.
resultRowInfo
,
&
pRowSup
->
win
,
masterScan
,
&
pResult
,
gid
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
int32_t
ret
=
setTimeWindowOutputBuf
(
&
pInfo
->
binfo
.
resultRowInfo
,
&
pRowSup
->
win
,
masterScan
,
&
pResult
,
gid
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
// null data, too many state code
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_APP_ERROR
);
}
...
...
@@ -1095,8 +1088,8 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
SStateWindowOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SOptrBasicInfo
*
pBInfo
=
&
pInfo
->
binfo
;
...
...
@@ -1207,7 +1200,7 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type
}
}
void
doClearWindowImpl
(
SResultRowPosition
*
p1
,
SDiskbasedBuf
*
pResultBuf
,
SExprSupp
*
pSup
,
int32_t
numOfOutput
)
{
void
doClearWindowImpl
(
SResultRowPosition
*
p1
,
SDiskbasedBuf
*
pResultBuf
,
SExprSupp
*
pSup
,
int32_t
numOfOutput
)
{
SResultRow
*
pResult
=
getResultRowByPos
(
pResultBuf
,
p1
);
SqlFunctionCtx
*
pCtx
=
pSup
->
pCtx
;
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
...
...
@@ -1223,7 +1216,7 @@ void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprS
}
}
void
doClearWindow
(
SAggSupporter
*
pAggSup
,
SExprSupp
*
pSup
,
char
*
pData
,
int16_t
bytes
,
uint64_t
groupId
,
void
doClearWindow
(
SAggSupporter
*
pAggSup
,
SExprSupp
*
pSup
,
char
*
pData
,
int16_t
bytes
,
uint64_t
groupId
,
int32_t
numOfOutput
)
{
SET_RES_WINDOW_KEY
(
pAggSup
->
keyBuf
,
pData
,
bytes
,
groupId
);
SResultRowPosition
*
p1
=
...
...
@@ -1259,9 +1252,9 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
void
*
key
=
taosHashGetKey
(
pIte
,
&
keyLen
);
uint64_t
groupId
=
*
(
uint64_t
*
)
key
;
ASSERT
(
keyLen
==
GET_RES_WINDOW_KEY_LEN
(
sizeof
(
TSKEY
)));
TSKEY
ts
=
*
(
int64_t
*
)((
char
*
)
key
+
sizeof
(
uint64_t
));
TSKEY
ts
=
*
(
int64_t
*
)((
char
*
)
key
+
sizeof
(
uint64_t
));
SResultRowPosition
*
pPos
=
(
SResultRowPosition
*
)
pIte
;
int32_t
code
=
saveResult
(
ts
,
pPos
->
pageId
,
pPos
->
offset
,
groupId
,
resWins
);
int32_t
code
=
saveResult
(
ts
,
pPos
->
pageId
,
pPos
->
offset
,
groupId
,
resWins
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -1269,9 +1262,7 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
return
TSDB_CODE_SUCCESS
;
}
bool
isCloseWindow
(
STimeWindow
*
pWin
,
STimeWindowAggSupp
*
pSup
)
{
return
pWin
->
ekey
<
pSup
->
maxTs
-
pSup
->
waterMark
;
}
bool
isCloseWindow
(
STimeWindow
*
pWin
,
STimeWindowAggSupp
*
pSup
)
{
return
pWin
->
ekey
<
pSup
->
maxTs
-
pSup
->
waterMark
;
}
static
int32_t
closeIntervalWindow
(
SHashObj
*
pHashMap
,
STimeWindowAggSupp
*
pSup
,
SInterval
*
pInterval
,
SArray
*
closeWins
)
{
...
...
@@ -1330,7 +1321,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
}
if
(
pBlock
->
info
.
type
==
STREAM_REPROCESS
)
{
doClearWindows
(
&
pInfo
->
aggSup
,
&
pOperator
->
exprSupp
,
&
pInfo
->
interval
,
0
,
pOperator
->
exprSupp
.
numOfExprs
,
pBlock
,
NULL
);
doClearWindows
(
&
pInfo
->
aggSup
,
&
pOperator
->
exprSupp
,
&
pInfo
->
interval
,
0
,
pOperator
->
exprSupp
.
numOfExprs
,
pBlock
,
NULL
);
qDebug
(
"%s clear existed time window results for updates checked"
,
GET_TASKID
(
pTaskInfo
));
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_GET_ALL
)
{
...
...
@@ -1580,7 +1572,7 @@ _error:
// todo handle multiple tables cases.
static
void
doSessionWindowAggImpl
(
SOperatorInfo
*
pOperator
,
SSessionAggOperatorInfo
*
pInfo
,
SSDataBlock
*
pBlock
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pInfo
->
tsSlotId
);
...
...
@@ -1617,9 +1609,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
STimeWindow
window
=
pRowSup
->
win
;
pRowSup
->
win
.
ekey
=
pRowSup
->
win
.
skey
;
int32_t
ret
=
setTimeWindowOutputBuf
(
&
pInfo
->
binfo
.
resultRowInfo
,
&
window
,
masterScan
,
&
pResult
,
gid
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
int32_t
ret
=
setTimeWindowOutputBuf
(
&
pInfo
->
binfo
.
resultRowInfo
,
&
window
,
masterScan
,
&
pResult
,
gid
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
// null data, too many state code
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_APP_ERROR
);
}
...
...
@@ -1637,9 +1628,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
SResultRow
*
pResult
=
NULL
;
pRowSup
->
win
.
ekey
=
tsList
[
pBlock
->
info
.
rows
-
1
];
int32_t
ret
=
setTimeWindowOutputBuf
(
&
pInfo
->
binfo
.
resultRowInfo
,
&
pRowSup
->
win
,
masterScan
,
&
pResult
,
gid
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
int32_t
ret
=
setTimeWindowOutputBuf
(
&
pInfo
->
binfo
.
resultRowInfo
,
&
pRowSup
->
win
,
masterScan
,
&
pResult
,
gid
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
// null data, too many state code
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_APP_ERROR
);
}
...
...
@@ -1656,7 +1646,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
SSessionAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
SOptrBasicInfo
*
pBInfo
=
&
pInfo
->
binfo
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildResultDatablock
(
pOperator
,
pBInfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
...
...
@@ -1707,8 +1697,8 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
static
void
doKeepPrevRows
(
STimeSliceOperatorInfo
*
pSliceInfo
,
const
SSDataBlock
*
pBlock
,
int32_t
rowIndex
)
{
int32_t
numOfCols
=
taosArrayGetSize
(
pBlock
->
pDataBlock
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
// null data should not be kept since it can not be used to perform interpolation
if
(
!
colDataIsNull_s
(
pColInfoData
,
i
))
{
...
...
@@ -1844,22 +1834,22 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
STimeSliceOperatorInfo
*
pSliceInfo
=
pOperator
->
info
;
SSDataBlock
*
pResBlock
=
pSliceInfo
->
pRes
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SSDataBlock
*
pResBlock
=
pSliceInfo
->
pRes
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
blockDataEnsureCapacity
(
pResBlock
,
pOperator
->
resultInfo
.
capacity
);
// if (pOperator->status == OP_RES_TO_RETURN) {
// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
// if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) {
// doSetOperatorCompleted(pOperator);
// }
//
// return pResBlock;
// }
// if (pOperator->status == OP_RES_TO_RETURN) {
// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
// if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) {
// doSetOperatorCompleted(pOperator);
// }
//
// return pResBlock;
// }
int32_t
order
=
TSDB_ORDER_ASC
;
SInterval
*
pInterval
=
&
pSliceInfo
->
interval
;
int32_t
order
=
TSDB_ORDER_ASC
;
SInterval
*
pInterval
=
&
pSliceInfo
->
interval
;
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
int32_t
numOfRows
=
0
;
...
...
@@ -1878,14 +1868,14 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
order
,
MAIN_SCAN
,
true
);
SColumnInfoData
*
pTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pSliceInfo
->
tsCol
.
slotId
);
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
int64_t
ts
=
*
(
int64_t
*
)
colDataGetData
(
pTsCol
,
i
);
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
int64_t
ts
=
*
(
int64_t
*
)
colDataGetData
(
pTsCol
,
i
);
if
(
ts
==
pSliceInfo
->
current
)
{
for
(
int32_t
j
=
0
;
j
<
pOperator
->
exprSupp
.
numOfExprs
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
pOperator
->
exprSupp
.
numOfExprs
;
++
j
)
{
SExprInfo
*
pExprInfo
=
&
pOperator
->
exprSupp
.
pExprInfo
[
j
];
int32_t
dstSlot
=
pExprInfo
->
base
.
resSchema
.
slotId
;
int32_t
srcSlot
=
pExprInfo
->
base
.
pParam
[
0
].
pCol
->
slotId
;
int32_t
dstSlot
=
pExprInfo
->
base
.
resSchema
.
slotId
;
int32_t
srcSlot
=
pExprInfo
->
base
.
pParam
[
0
].
pCol
->
slotId
;
SColumnInfoData
*
pSrc
=
taosArrayGet
(
pBlock
->
pDataBlock
,
srcSlot
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
pResBlock
->
pDataBlock
,
dstSlot
);
...
...
@@ -1897,7 +1887,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
pResBlock
->
info
.
rows
+=
1
;
doKeepPrevRows
(
pSliceInfo
,
pBlock
,
i
);
pSliceInfo
->
current
=
taosTimeAdd
(
pSliceInfo
->
current
,
pInterval
->
interval
,
pInterval
->
intervalUnit
,
pInterval
->
precision
);
pSliceInfo
->
current
=
taosTimeAdd
(
pSliceInfo
->
current
,
pInterval
->
interval
,
pInterval
->
intervalUnit
,
pInterval
->
precision
);
if
(
pSliceInfo
->
current
>
pSliceInfo
->
win
.
ekey
)
{
doSetOperatorCompleted
(
pOperator
);
break
;
...
...
@@ -1908,7 +1899,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
}
}
else
if
(
ts
<
pSliceInfo
->
current
)
{
if
(
i
<
pBlock
->
info
.
rows
-
1
)
{
int64_t
nextTs
=
*
(
int64_t
*
)
colDataGetData
(
pTsCol
,
i
+
1
);
int64_t
nextTs
=
*
(
int64_t
*
)
colDataGetData
(
pTsCol
,
i
+
1
);
if
(
nextTs
>
pSliceInfo
->
current
)
{
while
(
pSliceInfo
->
current
<
nextTs
&&
pSliceInfo
->
current
<=
pSliceInfo
->
win
.
ekey
)
{
genInterpolationResult
(
pSliceInfo
,
&
pOperator
->
exprSupp
,
pBlock
,
i
,
pResBlock
);
...
...
@@ -1956,7 +1947,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
return
pResBlock
->
info
.
rows
==
0
?
NULL
:
pResBlock
;
}
SOperatorInfo
*
createTimeSliceOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SOperatorInfo
*
createTimeSliceOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
)
{
STimeSliceOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
STimeSliceOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
||
pInfo
==
NULL
)
{
...
...
@@ -1964,17 +1955,17 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
}
SInterpFuncPhysiNode
*
pInterpPhyNode
=
(
SInterpFuncPhysiNode
*
)
pPhyNode
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
int32_t
numOfExprs
=
0
;
int32_t
numOfExprs
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pInterpPhyNode
->
pFuncs
,
NULL
,
&
numOfExprs
);
int32_t
code
=
initExprSupp
(
pSup
,
pExprInfo
,
numOfExprs
);
int32_t
code
=
initExprSupp
(
pSup
,
pExprInfo
,
numOfExprs
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
if
(
pInterpPhyNode
->
pExprs
!=
NULL
)
{
int32_t
num
=
0
;
int32_t
num
=
0
;
SExprInfo
*
pScalarExprInfo
=
createExprInfo
(
pInterpPhyNode
->
pExprs
,
NULL
,
&
num
);
code
=
initExprSupp
(
&
pInfo
->
scalarSup
,
pScalarExprInfo
,
num
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -1986,21 +1977,21 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pInfo
->
fillType
=
convertFillType
(
pInterpPhyNode
->
fillMode
);
initResultSizeInfo
(
pOperator
,
4096
);
pInfo
->
pFillColInfo
=
createFillColInfo
(
pExprInfo
,
numOfExprs
,
(
SNodeListNode
*
)
pInterpPhyNode
->
pFillValues
);
pInfo
->
pRes
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
pInfo
->
win
=
pInterpPhyNode
->
timeRange
;
pInfo
->
pFillColInfo
=
createFillColInfo
(
pExprInfo
,
numOfExprs
,
(
SNodeListNode
*
)
pInterpPhyNode
->
pFillValues
);
pInfo
->
pRes
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
pInfo
->
win
=
pInterpPhyNode
->
timeRange
;
pInfo
->
interval
.
interval
=
pInterpPhyNode
->
interval
;
pInfo
->
current
=
pInfo
->
win
.
skey
;
pInfo
->
current
=
pInfo
->
win
.
skey
;
pOperator
->
name
=
"TimeSliceOperator"
;
pOperator
->
name
=
"TimeSliceOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doTimeslice
,
NULL
,
NULL
,
destroyBasicOperatorInfo
,
NULL
,
NULL
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doTimeslice
,
NULL
,
NULL
,
destroyBasicOperatorInfo
,
NULL
,
NULL
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
...
...
@@ -2138,22 +2129,22 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
}
}
static
void
rebuildIntervalWindow
(
SStreamFinalIntervalOperatorInfo
*
pInfo
,
SExprSupp
*
pSup
,
SArray
*
pWinArray
,
int32_t
groupId
,
int32_t
numOfOutput
,
SExecTaskInfo
*
pTaskInfo
)
{
static
void
rebuildIntervalWindow
(
SStreamFinalIntervalOperatorInfo
*
pInfo
,
SExprSupp
*
pSup
,
SArray
*
pWinArray
,
int32_t
groupId
,
int32_t
numOfOutput
,
SExecTaskInfo
*
pTaskInfo
)
{
int32_t
size
=
taosArrayGetSize
(
pWinArray
);
ASSERT
(
pInfo
->
pChildren
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
STimeWindow
*
pParentWin
=
taosArrayGet
(
pWinArray
,
i
);
SResultRow
*
pCurResult
=
NULL
;
setTimeWindowOutputBuf
(
&
pInfo
->
binfo
.
resultRowInfo
,
pParentWin
,
true
,
&
pCurResult
,
0
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
setTimeWindowOutputBuf
(
&
pInfo
->
binfo
.
resultRowInfo
,
pParentWin
,
true
,
&
pCurResult
,
0
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
int32_t
numOfChildren
=
taosArrayGetSize
(
pInfo
->
pChildren
);
for
(
int32_t
j
=
0
;
j
<
numOfChildren
;
j
++
)
{
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
j
);
SIntervalAggOperatorInfo
*
pChInfo
=
pChildOp
->
info
;
SExprSupp
*
pChildSup
=
&
pChildOp
->
exprSupp
;
SExprSupp
*
pChildSup
=
&
pChildOp
->
exprSupp
;
SResultRow
*
pChResult
=
NULL
;
SResultRow
*
pChResult
=
NULL
;
setTimeWindowOutputBuf
(
&
pChInfo
->
binfo
.
resultRowInfo
,
pParentWin
,
true
,
&
pChResult
,
0
,
pChildSup
->
pCtx
,
pChildSup
->
numOfExprs
,
pChildSup
->
rowEntryInfoOffset
,
&
pChInfo
->
aggSup
,
pTaskInfo
);
compactFunctions
(
pSup
->
pCtx
,
pChildSup
->
pCtx
,
numOfOutput
,
pTaskInfo
);
...
...
@@ -2163,8 +2154,8 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr
bool
isDeletedWindow
(
STimeWindow
*
pWin
,
uint64_t
groupId
,
SAggSupporter
*
pSup
)
{
SET_RES_WINDOW_KEY
(
pSup
->
keyBuf
,
&
pWin
->
skey
,
sizeof
(
int64_t
),
groupId
);
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
taosHashGet
(
pSup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
sizeof
(
int64_t
)));
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
taosHashGet
(
pSup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
sizeof
(
int64_t
)));
return
p1
==
NULL
;
}
...
...
@@ -2197,7 +2188,8 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
isDeletedWindow
(
&
nextWin
,
tableGroupId
,
&
pInfo
->
aggSup
))
{
SArray
*
pUpWins
=
taosArrayInit
(
8
,
sizeof
(
STimeWindow
));
taosArrayPush
(
pUpWins
,
&
nextWin
);
rebuildIntervalWindow
(
pInfo
,
pSup
,
pUpWins
,
pInfo
->
binfo
.
pRes
->
info
.
groupId
,
pSup
->
numOfExprs
,
pOperatorInfo
->
pTaskInfo
);
rebuildIntervalWindow
(
pInfo
,
pSup
,
pUpWins
,
pInfo
->
binfo
.
pRes
->
info
.
groupId
,
pSup
->
numOfExprs
,
pOperatorInfo
->
pTaskInfo
);
taosArrayDestroy
(
pUpWins
);
}
int32_t
code
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
nextWin
,
true
,
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
...
...
@@ -2215,8 +2207,8 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
saveResultRow
(
pResult
,
tableGroupId
,
pUpdated
);
}
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
nextWin
,
true
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
nextWin
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
tsCols
,
pSDataBlock
->
info
.
rows
,
numOfOutput
,
TSDB_ORDER_ASC
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
nextWin
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
tsCols
,
pSDataBlock
->
info
.
rows
,
numOfOutput
,
TSDB_ORDER_ASC
);
int32_t
prevEndPos
=
(
forwardRows
-
1
)
*
step
+
startPos
;
ASSERT
(
pSDataBlock
->
info
.
window
.
skey
>
0
&&
pSDataBlock
->
info
.
window
.
ekey
>
0
);
startPos
=
getNextQualifiedWindow
(
&
pInfo
->
interval
,
&
nextWin
,
&
pSDataBlock
->
info
,
tsCols
,
prevEndPos
,
pInfo
->
order
);
...
...
@@ -2257,9 +2249,7 @@ void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsCol
blockDataUpdateTsWindow
(
pDest
,
0
);
}
static
int32_t
getChildIndex
(
SSDataBlock
*
pBlock
)
{
return
pBlock
->
info
.
childId
;
}
static
int32_t
getChildIndex
(
SSDataBlock
*
pBlock
)
{
return
pBlock
->
info
.
childId
;
}
static
SSDataBlock
*
doStreamFinalIntervalAgg
(
SOperatorInfo
*
pOperator
)
{
SStreamFinalIntervalOperatorInfo
*
pInfo
=
pOperator
->
info
;
...
...
@@ -2304,7 +2294,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
int32_t
childIndex
=
getChildIndex
(
pBlock
);
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
childIndex
);
SIntervalAggOperatorInfo
*
pChildInfo
=
pChildOp
->
info
;
SExprSupp
*
pChildSup
=
&
pChildOp
->
exprSupp
;
SExprSupp
*
pChildSup
=
&
pChildOp
->
exprSupp
;
doClearWindows
(
&
pChildInfo
->
aggSup
,
pChildSup
,
&
pChildInfo
->
interval
,
pChildInfo
->
primaryTsIndex
,
pChildSup
->
numOfExprs
,
pBlock
,
NULL
);
...
...
@@ -2406,7 +2396,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
);
pInfo
->
pChildren
=
NULL
;
if
(
numOfChild
>
0
)
{
pInfo
->
pChildren
=
taosArrayInit
(
numOfChild
,
sizeof
(
SOperatorInfo
));
pInfo
->
pChildren
=
taosArrayInit
(
numOfChild
,
sizeof
(
void
*
));
for
(
int32_t
i
=
0
;
i
<
numOfChild
;
i
++
)
{
SOperatorInfo
*
pChildOp
=
createStreamFinalIntervalOperatorInfo
(
NULL
,
pPhyNode
,
pTaskInfo
,
0
);
if
(
pChildOp
)
{
...
...
@@ -2463,9 +2453,9 @@ _error:
void
destroyStreamAggSupporter
(
SStreamAggSupporter
*
pSup
)
{
taosMemoryFreeClear
(
pSup
->
pKeyBuf
);
void
**
pIte
=
NULL
;
void
**
pIte
=
NULL
;
while
((
pIte
=
taosHashIterate
(
pSup
->
pResultRows
,
pIte
))
!=
NULL
)
{
SArray
*
pWins
=
(
SArray
*
)
(
*
pIte
);
SArray
*
pWins
=
(
SArray
*
)
(
*
pIte
);
taosArrayDestroy
(
pWins
);
}
taosHashCleanup
(
pSup
->
pResultRows
);
...
...
@@ -2489,18 +2479,19 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
}
}
int32_t
initBasicInfoEx
(
SOptrBasicInfo
*
pBasicInfo
,
SExprSupp
*
pSup
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
)
{
int32_t
initBasicInfoEx
(
SOptrBasicInfo
*
pBasicInfo
,
SExprSupp
*
pSup
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
)
{
int32_t
code
=
initExprSupp
(
pSup
,
pExprInfo
,
numOfCols
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
initBasicInfo
(
pBasicInfo
,
pResultBlock
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
pSup
->
pCtx
[
i
].
pBuf
=
NULL
;
}
ASSERT
(
numOfCols
>
0
);
increaseTs
(
pSup
->
pCtx
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -2519,18 +2510,20 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int
pScanInfo
->
pUpdateInfo
=
updateInfoInit
(
60000
,
TSDB_TIME_PRECISION_MILLI
,
waterMark
);
}
int32_t
initSessionAggSupporter
(
SStreamAggSupporter
*
pSup
,
const
char
*
pKey
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
)
{
int32_t
initSessionAggSupporter
(
SStreamAggSupporter
*
pSup
,
const
char
*
pKey
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
)
{
return
initStreamAggSupporter
(
pSup
,
pKey
,
pCtx
,
numOfOutput
,
sizeof
(
SResultWindowInfo
));
}
SOperatorInfo
*
createStreamSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SSessionWinodwPhysiNode
*
pSessionNode
=
(
SSessionWinodwPhysiNode
*
)
pPhyNode
;
int32_t
numOfCols
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pSessionNode
->
window
.
pFuncs
,
NULL
,
&
numOfCols
);
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
int32_t
code
=
TSDB_CODE_OUT_OF_MEMORY
;
SOperatorInfo
*
createStreamSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SSessionWinodwPhysiNode
*
pSessionNode
=
(
SSessionWinodwPhysiNode
*
)
pPhyNode
;
int32_t
numOfCols
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pSessionNode
->
window
.
pFuncs
,
NULL
,
&
numOfCols
);
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
int32_t
code
=
TSDB_CODE_OUT_OF_MEMORY
;
SStreamSessionAggOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamSessionAggOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
goto
_error
;
}
...
...
@@ -2542,8 +2535,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
code
=
initSessionAggSupporter
(
&
pInfo
->
streamAggSup
,
"StreamSessionAggOperatorInfo"
,
pSup
->
pCtx
,
numOfCols
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
...
...
@@ -2555,10 +2547,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
}
initDummyFunction
(
pInfo
->
pDummyCtx
,
pSup
->
pCtx
,
numOfCols
);
pInfo
->
twAggSup
=
(
STimeWindowAggSupp
)
{
.
waterMark
=
pSessionNode
->
window
.
watermark
,
.
calTrigger
=
pSessionNode
->
window
.
triggerType
,
.
maxTs
=
INT64_MIN
};
pInfo
->
twAggSup
=
(
STimeWindowAggSupp
){
.
waterMark
=
pSessionNode
->
window
.
watermark
,
.
calTrigger
=
pSessionNode
->
window
.
triggerType
,
.
maxTs
=
INT64_MIN
};
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
);
initExecTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
pTaskInfo
->
window
);
...
...
@@ -2632,19 +2622,19 @@ static SResultWindowInfo* addNewSessionWindow(SArray* pWinInfos, TSKEY ts) {
}
SArray
*
getWinInfos
(
SStreamAggSupporter
*
pAggSup
,
uint64_t
groupId
)
{
void
**
ite
=
taosHashGet
(
pAggSup
->
pResultRows
,
&
groupId
,
sizeof
(
uint64_t
));
void
**
ite
=
taosHashGet
(
pAggSup
->
pResultRows
,
&
groupId
,
sizeof
(
uint64_t
));
SArray
*
pWinInfos
=
NULL
;
if
(
ite
==
NULL
)
{
pWinInfos
=
taosArrayInit
(
1024
,
pAggSup
->
valueSize
);
taosHashPut
(
pAggSup
->
pResultRows
,
&
groupId
,
sizeof
(
uint64_t
),
&
pWinInfos
,
sizeof
(
void
*
));
taosHashPut
(
pAggSup
->
pResultRows
,
&
groupId
,
sizeof
(
uint64_t
),
&
pWinInfos
,
sizeof
(
void
*
));
}
else
{
pWinInfos
=
*
ite
;
}
return
pWinInfos
;
}
SResultWindowInfo
*
getSessionTimeWindow
(
SStreamAggSupporter
*
pAggSup
,
TSKEY
startTs
,
TSKEY
endTs
,
uint64_t
groupId
,
int64_t
gap
,
int32_t
*
pIndex
)
{
SResultWindowInfo
*
getSessionTimeWindow
(
SStreamAggSupporter
*
pAggSup
,
TSKEY
startTs
,
TSKEY
endTs
,
uint64_t
groupId
,
int64_t
gap
,
int32_t
*
pIndex
)
{
SArray
*
pWinInfos
=
getWinInfos
(
pAggSup
,
groupId
);
pAggSup
->
pCurWins
=
pWinInfos
;
...
...
@@ -2683,10 +2673,10 @@ SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY star
return
insertNewSessionWindow
(
pWinInfos
,
startTs
,
index
+
1
);
}
int32_t
updateSessionWindowInfo
(
SResultWindowInfo
*
pWinInfo
,
TSKEY
*
pStartTs
,
TSKEY
*
pEndTs
,
int32_t
rows
,
int32_t
start
,
int64_t
gap
,
SHashObj
*
pStDeleted
)
{
int32_t
updateSessionWindowInfo
(
SResultWindowInfo
*
pWinInfo
,
TSKEY
*
pStartTs
,
TSKEY
*
pEndTs
,
int32_t
rows
,
int32_t
start
,
int64_t
gap
,
SHashObj
*
pStDeleted
)
{
for
(
int32_t
i
=
start
;
i
<
rows
;
++
i
)
{
if
(
!
isInWindow
(
pWinInfo
,
pStartTs
[
i
],
gap
)
&&
(
!
pEndTs
||
!
isInWindow
(
pWinInfo
,
pEndTs
[
i
],
gap
))
)
{
if
(
!
isInWindow
(
pWinInfo
,
pStartTs
[
i
],
gap
)
&&
(
!
pEndTs
||
!
isInWindow
(
pWinInfo
,
pEndTs
[
i
],
gap
)))
{
return
i
-
start
;
}
if
(
pWinInfo
->
win
.
skey
>
pStartTs
[
i
])
{
...
...
@@ -2742,7 +2732,7 @@ static int32_t doOneWindowAggImpl(int32_t tsColId, SOptrBasicInfo* pBinfo, SStre
SColumnInfoData
*
pTimeWindowData
,
SSDataBlock
*
pSDataBlock
,
SResultWindowInfo
*
pCurWin
,
SResultRow
**
pResult
,
int32_t
startIndex
,
int32_t
winRows
,
int32_t
numOutput
,
SOperatorInfo
*
pOperator
)
{
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pSDataBlock
->
pDataBlock
,
tsColId
);
...
...
@@ -2760,14 +2750,14 @@ static int32_t doOneWindowAggImpl(int32_t tsColId, SOptrBasicInfo* pBinfo, SStre
static
int32_t
doOneWindowAgg
(
SStreamSessionAggOperatorInfo
*
pInfo
,
SSDataBlock
*
pSDataBlock
,
SResultWindowInfo
*
pCurWin
,
SResultRow
**
pResult
,
int32_t
startIndex
,
int32_t
winRows
,
int32_t
numOutput
,
SOperatorInfo
*
pOperator
)
{
int32_t
numOutput
,
SOperatorInfo
*
pOperator
)
{
return
doOneWindowAggImpl
(
pInfo
->
primaryTsIndex
,
&
pInfo
->
binfo
,
&
pInfo
->
streamAggSup
,
&
pInfo
->
twAggSup
.
timeWindowData
,
pSDataBlock
,
pCurWin
,
pResult
,
startIndex
,
winRows
,
numOutput
,
pOperator
);
}
static
int32_t
doOneStateWindowAgg
(
SStreamStateAggOperatorInfo
*
pInfo
,
SSDataBlock
*
pSDataBlock
,
SResultWindowInfo
*
pCurWin
,
SResultRow
**
pResult
,
int32_t
startIndex
,
int32_t
winRows
,
int32_t
numOutput
,
SOperatorInfo
*
pOperator
)
{
int32_t
winRows
,
int32_t
numOutput
,
SOperatorInfo
*
pOperator
)
{
return
doOneWindowAggImpl
(
pInfo
->
primaryTsIndex
,
&
pInfo
->
binfo
,
&
pInfo
->
streamAggSup
,
&
pInfo
->
twAggSup
.
timeWindowData
,
pSDataBlock
,
pCurWin
,
pResult
,
startIndex
,
winRows
,
numOutput
,
pOperator
);
}
...
...
@@ -2788,7 +2778,7 @@ int32_t getNumCompactWindow(SArray* pWinInfos, int32_t startIndex, int64_t gap)
void
compactTimeWindow
(
SStreamSessionAggOperatorInfo
*
pInfo
,
int32_t
startIndex
,
int32_t
num
,
uint64_t
groupId
,
int32_t
numOfOutput
,
SHashObj
*
pStUpdated
,
SHashObj
*
pStDeleted
,
SOperatorInfo
*
pOperator
)
{
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SResultWindowInfo
*
pCurWin
=
taosArrayGet
(
pInfo
->
streamAggSup
.
pCurWins
,
startIndex
);
...
...
@@ -2819,8 +2809,8 @@ typedef struct SWinRes {
uint64_t
groupId
;
}
SWinRes
;
static
void
doStreamSessionAggImpl
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pSDataBlock
,
SHashObj
*
pStUpdated
,
SHashObj
*
pStDeleted
,
bool
hasEndTs
)
{
static
void
doStreamSessionAggImpl
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pSDataBlock
,
SHashObj
*
pStUpdated
,
SHashObj
*
pStDeleted
,
bool
hasEndTs
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SStreamSessionAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
bool
masterScan
=
true
;
...
...
@@ -2838,14 +2828,14 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
if
(
pSDataBlock
->
pDataBlock
!=
NULL
)
{
SColumnInfoData
*
pStartTsCol
=
taosArrayGet
(
pSDataBlock
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
startTsCols
=
(
int64_t
*
)
pStartTsCol
->
pData
;
startTsCols
=
(
int64_t
*
)
pStartTsCol
->
pData
;
SColumnInfoData
*
pEndTsCol
=
NULL
;
if
(
hasEndTs
)
{
pEndTsCol
=
taosArrayGet
(
pSDataBlock
->
pDataBlock
,
pInfo
->
endTsIndex
);
}
else
{
pEndTsCol
=
taosArrayGet
(
pSDataBlock
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
}
endTsCols
=
(
int64_t
*
)
pEndTsCol
->
pData
;
endTsCols
=
(
int64_t
*
)
pEndTsCol
->
pData
;
}
else
{
return
;
}
...
...
@@ -2853,10 +2843,9 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
SStreamAggSupporter
*
pAggSup
=
&
pInfo
->
streamAggSup
;
for
(
int32_t
i
=
0
;
i
<
pSDataBlock
->
info
.
rows
;)
{
int32_t
winIndex
=
0
;
SResultWindowInfo
*
pCurWin
=
getSessionTimeWindow
(
pAggSup
,
startTsCols
[
i
],
endTsCols
[
i
],
groupId
,
gap
,
&
winIndex
);
winRows
=
updateSessionWindowInfo
(
pCurWin
,
startTsCols
,
endTsCols
,
pSDataBlock
->
info
.
rows
,
i
,
pInfo
->
gap
,
pStDeleted
);
SResultWindowInfo
*
pCurWin
=
getSessionTimeWindow
(
pAggSup
,
startTsCols
[
i
],
endTsCols
[
i
],
groupId
,
gap
,
&
winIndex
);
winRows
=
updateSessionWindowInfo
(
pCurWin
,
startTsCols
,
endTsCols
,
pSDataBlock
->
info
.
rows
,
i
,
pInfo
->
gap
,
pStDeleted
);
code
=
doOneWindowAgg
(
pInfo
,
pSDataBlock
,
pCurWin
,
&
pResult
,
i
,
winRows
,
numOfOutput
,
pOperator
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
...
...
@@ -2879,14 +2868,15 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
}
}
static
void
doClearSessionWindows
(
SStreamAggSupporter
*
pAggSup
,
SExprSupp
*
pSup
,
SSDataBlock
*
pBlock
,
int32_t
tsIndex
,
int32_t
numOfOutput
,
int64_t
gap
,
SArray
*
result
)
{
static
void
doClearSessionWindows
(
SStreamAggSupporter
*
pAggSup
,
SExprSupp
*
pSup
,
SSDataBlock
*
pBlock
,
int32_t
tsIndex
,
int32_t
numOfOutput
,
int64_t
gap
,
SArray
*
result
)
{
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
tsIndex
);
TSKEY
*
tsCols
=
(
TSKEY
*
)
pColDataInfo
->
pData
;
int32_t
step
=
0
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
i
+=
step
)
{
int32_t
winIndex
=
0
;
SResultWindowInfo
*
pCurWin
=
getSessionTimeWindow
(
pAggSup
,
tsCols
[
i
],
INT64_MIN
,
pBlock
->
info
.
groupId
,
gap
,
&
winIndex
);
SResultWindowInfo
*
pCurWin
=
getSessionTimeWindow
(
pAggSup
,
tsCols
[
i
],
INT64_MIN
,
pBlock
->
info
.
groupId
,
gap
,
&
winIndex
);
step
=
updateSessionWindowInfo
(
pCurWin
,
tsCols
,
NULL
,
pBlock
->
info
.
rows
,
i
,
gap
,
NULL
);
ASSERT
(
isInWindow
(
pCurWin
,
tsCols
[
i
],
gap
));
doClearWindowImpl
(
&
pCurWin
->
pos
,
pAggSup
->
pResultBuf
,
pSup
,
numOfOutput
);
...
...
@@ -2936,7 +2926,7 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It
static
void
rebuildTimeWindow
(
SStreamSessionAggOperatorInfo
*
pInfo
,
SArray
*
pWinArray
,
int32_t
groupId
,
int32_t
numOfOutput
,
SOperatorInfo
*
pOperator
)
{
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
int32_t
size
=
taosArrayGetSize
(
pWinArray
);
...
...
@@ -2955,7 +2945,7 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin
int32_t
chWinSize
=
taosArrayGetSize
(
pChWins
);
int32_t
index
=
binarySearch
(
pChWins
,
chWinSize
,
pParentWin
->
win
.
skey
,
TSDB_ORDER_DESC
,
getSessionWindowEndkey
);
if
(
index
<
0
)
{
index
=
0
;
index
=
0
;
}
for
(
int32_t
k
=
index
;
k
<
chWinSize
;
k
++
)
{
SResultWindowInfo
*
pcw
=
taosArrayGet
(
pChWins
,
k
);
...
...
@@ -2976,15 +2966,14 @@ typedef SResultWindowInfo* (*__get_win_info_)(void*);
SResultWindowInfo
*
getResWinForSession
(
void
*
pData
)
{
return
(
SResultWindowInfo
*
)
pData
;
}
SResultWindowInfo
*
getResWinForState
(
void
*
pData
)
{
return
&
((
SStateWindowInfo
*
)
pData
)
->
winInfo
;
}
int32_t
closeSessionWindow
(
SHashObj
*
pHashMap
,
STimeWindowAggSupp
*
pTwSup
,
SArray
*
pClosed
,
__get_win_info_
fn
)
{
int32_t
closeSessionWindow
(
SHashObj
*
pHashMap
,
STimeWindowAggSupp
*
pTwSup
,
SArray
*
pClosed
,
__get_win_info_
fn
)
{
// Todo(liuyao) save window to tdb
void
**
pIte
=
NULL
;
void
**
pIte
=
NULL
;
size_t
keyLen
=
0
;
while
((
pIte
=
taosHashIterate
(
pHashMap
,
pIte
))
!=
NULL
)
{
uint64_t
*
pGroupId
=
taosHashGetKey
(
pIte
,
&
keyLen
);
SArray
*
pWins
=
(
SArray
*
)
(
*
pIte
);
int32_t
size
=
taosArrayGetSize
(
pWins
);
SArray
*
pWins
=
(
SArray
*
)
(
*
pIte
);
int32_t
size
=
taosArrayGetSize
(
pWins
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
void
*
pWin
=
taosArrayGet
(
pWins
,
i
);
SResultWindowInfo
*
pSeWin
=
fn
(
pWin
);
...
...
@@ -3005,9 +2994,9 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra
}
int32_t
getAllSessionWindow
(
SHashObj
*
pHashMap
,
SArray
*
pClosed
,
__get_win_info_
fn
)
{
void
**
pIte
=
NULL
;
void
**
pIte
=
NULL
;
while
((
pIte
=
taosHashIterate
(
pHashMap
,
pIte
))
!=
NULL
)
{
SArray
*
pWins
=
(
SArray
*
)
(
*
pIte
);
SArray
*
pWins
=
(
SArray
*
)
(
*
pIte
);
int32_t
size
=
taosArrayGetSize
(
pWins
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
void
*
pWin
=
taosArrayGet
(
pWins
,
i
);
...
...
@@ -3052,7 +3041,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if
(
pBlock
->
info
.
type
==
STREAM_REPROCESS
)
{
SArray
*
pWins
=
taosArrayInit
(
16
,
sizeof
(
SResultWindowInfo
));
doClearSessionWindows
(
&
pInfo
->
streamAggSup
,
&
pOperator
->
exprSupp
,
pBlock
,
0
,
pOperator
->
exprSupp
.
numOfExprs
,
pInfo
->
gap
,
pWins
);
doClearSessionWindows
(
&
pInfo
->
streamAggSup
,
&
pOperator
->
exprSupp
,
pBlock
,
0
,
pOperator
->
exprSupp
.
numOfExprs
,
pInfo
->
gap
,
pWins
);
if
(
IS_FINAL_OP
(
pInfo
))
{
int32_t
childIndex
=
getChildIndex
(
pBlock
);
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
childIndex
);
...
...
@@ -3076,13 +3066,14 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
int32_t
size
=
taosArrayGetSize
(
pInfo
->
pChildren
);
// if chIndex + 1 - size > 0, add new child
for
(
int32_t
i
=
0
;
i
<
chIndex
+
1
-
size
;
i
++
)
{
SOperatorInfo
*
pChildOp
=
createStreamFinalSessionAggOperatorInfo
(
NULL
,
pInfo
->
pPhyNode
,
pOperator
->
pTaskInfo
,
0
);
SOperatorInfo
*
pChildOp
=
createStreamFinalSessionAggOperatorInfo
(
NULL
,
pInfo
->
pPhyNode
,
pOperator
->
pTaskInfo
,
0
);
if
(
!
pChildOp
)
{
longjmp
(
pOperator
->
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
taosArrayPush
(
pInfo
->
pChildren
,
&
pChildOp
);
}
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
chIndex
);
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
chIndex
);
setInputDataBlock
(
pChildOp
,
pChildOp
->
exprSupp
.
pCtx
,
pBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
doStreamSessionAggImpl
(
pChildOp
,
pBlock
,
NULL
,
NULL
,
true
);
}
...
...
@@ -3093,13 +3084,11 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
// restore the value
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeSessionWindow
(
pInfo
->
streamAggSup
.
pResultRows
,
&
pInfo
->
twAggSup
,
pUpdated
,
getResWinForSession
);
closeSessionWindow
(
pInfo
->
streamAggSup
.
pResultRows
,
&
pInfo
->
twAggSup
,
pUpdated
,
getResWinForSession
);
copyUpdateResult
(
pStUpdated
,
pUpdated
);
taosHashCleanup
(
pStUpdated
);
finalizeUpdatedResult
(
pSup
->
numOfExprs
,
pInfo
->
streamAggSup
.
pResultBuf
,
pUpdated
,
pSup
->
rowEntryInfoOffset
);
finalizeUpdatedResult
(
pSup
->
numOfExprs
,
pInfo
->
streamAggSup
.
pResultBuf
,
pUpdated
,
pSup
->
rowEntryInfoOffset
);
initMultiResInfoFromArrayList
(
&
pInfo
->
groupResInfo
,
pUpdated
);
blockDataEnsureCapacity
(
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
);
doBuildDeleteDataBlock
(
pInfo
->
pStDeleted
,
pInfo
->
pDelRes
,
&
pInfo
->
pDelIterator
);
...
...
@@ -3111,9 +3100,9 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
}
static
void
clearStreamSessionOperator
(
SStreamSessionAggOperatorInfo
*
pInfo
)
{
void
**
pIte
=
NULL
;
void
**
pIte
=
NULL
;
while
((
pIte
=
taosHashIterate
(
pInfo
->
streamAggSup
.
pResultRows
,
pIte
))
!=
NULL
)
{
SArray
*
pWins
=
(
SArray
*
)
(
*
pIte
);
SArray
*
pWins
=
(
SArray
*
)
(
*
pIte
);
int32_t
size
=
taosArrayGetSize
(
pWins
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SResultWindowInfo
*
pWin
=
(
SResultWindowInfo
*
)
taosArrayGet
(
pWins
,
i
);
...
...
@@ -3139,7 +3128,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
SOptrBasicInfo
*
pBInfo
=
&
pInfo
->
binfo
;
TSKEY
maxTs
=
INT64_MIN
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
else
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
...
...
@@ -3221,8 +3210,8 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
return
pBInfo
->
pRes
->
info
.
rows
==
0
?
NULL
:
pBInfo
->
pRes
;
}
SOperatorInfo
*
createStreamFinalSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
numOfChild
)
{
SOperatorInfo
*
createStreamFinalSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
numOfChild
)
{
int32_t
code
=
TSDB_CODE_OUT_OF_MEMORY
;
SOperatorInfo
*
pOperator
=
createStreamSessionAggOperatorInfo
(
downstream
,
pPhyNode
,
pTaskInfo
);
if
(
pOperator
==
NULL
)
{
...
...
@@ -3240,15 +3229,14 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
blockDataEnsureCapacity
(
pInfo
->
pUpdateRes
,
128
);
pOperator
->
name
=
"StreamSessionSemiAggOperator"
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamSessionSemiAgg
,
NULL
,
NULL
,
destroyStreamSessionAggOperatorInfo
,
aggEncodeResultRow
,
aggDecodeResultRow
,
NULL
);
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamSessionSemiAgg
,
NULL
,
NULL
,
destroyStreamSessionAggOperatorInfo
,
aggEncodeResultRow
,
aggDecodeResultRow
,
NULL
);
}
pOperator
->
operatorType
=
pPhyNode
->
type
;
if
(
numOfChild
>
0
)
{
pInfo
->
pChildren
=
taosArrayInit
(
numOfChild
,
sizeof
(
void
*
));
for
(
int32_t
i
=
0
;
i
<
numOfChild
;
i
++
)
{
SOperatorInfo
*
pChild
=
createStreamFinalSessionAggOperatorInfo
(
NULL
,
pPhyNode
,
pTaskInfo
,
0
);
SOperatorInfo
*
pChild
=
createStreamFinalSessionAggOperatorInfo
(
NULL
,
pPhyNode
,
pTaskInfo
,
0
);
if
(
pChild
==
NULL
)
{
goto
_error
;
}
...
...
@@ -3366,8 +3354,8 @@ SStateWindowInfo* getStateWindowByTs(SStreamAggSupporter* pAggSup, TSKEY ts, uin
return
NULL
;
}
SStateWindowInfo
*
getStateWindow
(
SStreamAggSupporter
*
pAggSup
,
TSKEY
ts
,
uint64_t
groupId
,
char
*
pKeyData
,
SColumn
*
pCol
,
int32_t
*
pIndex
)
{
SStateWindowInfo
*
getStateWindow
(
SStreamAggSupporter
*
pAggSup
,
TSKEY
ts
,
uint64_t
groupId
,
char
*
pKeyData
,
SColumn
*
pCol
,
int32_t
*
pIndex
)
{
SArray
*
pWinInfos
=
getWinInfos
(
pAggSup
,
groupId
);
pAggSup
->
pCurWins
=
pWinInfos
;
int32_t
size
=
taosArrayGetSize
(
pWinInfos
);
...
...
@@ -3499,11 +3487,10 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
char
*
pKeyData
=
colDataGetData
(
pKeyColInfo
,
i
);
int32_t
winIndex
=
0
;
bool
allEqual
=
true
;
SStateWindowInfo
*
pCurWin
=
getStateWindow
(
pAggSup
,
tsCols
[
i
],
pSDataBlock
->
info
.
groupId
,
pKeyData
,
&
pInfo
->
stateCol
,
&
winIndex
);
winRows
=
updateStateWindowInfo
(
pAggSup
->
pCurWins
,
winIndex
,
tsCols
,
pKeyColInfo
,
pSDataBlock
->
info
.
rows
,
i
,
&
allEqual
,
pInfo
->
pSeDeleted
);
SStateWindowInfo
*
pCurWin
=
getStateWindow
(
pAggSup
,
tsCols
[
i
],
pSDataBlock
->
info
.
groupId
,
pKeyData
,
&
pInfo
->
stateCol
,
&
winIndex
);
winRows
=
updateStateWindowInfo
(
pAggSup
->
pCurWins
,
winIndex
,
tsCols
,
pKeyColInfo
,
pSDataBlock
->
info
.
rows
,
i
,
&
allEqual
,
pInfo
->
pSeDeleted
);
if
(
!
allEqual
)
{
taosArrayPush
(
pAggSup
->
pScanWindow
,
&
pCurWin
->
winInfo
.
win
);
taosHashRemove
(
pSeUpdated
,
&
pCurWin
->
winInfo
.
pos
,
sizeof
(
SResultRowPosition
));
...
...
@@ -3517,8 +3504,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
pCurWin
->
winInfo
.
isClosed
=
false
;
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
)
{
SWinRes
value
=
{.
ts
=
pCurWin
->
winInfo
.
win
.
skey
,
.
groupId
=
groupId
};
code
=
taosHashPut
(
pSeUpdated
,
&
pCurWin
->
winInfo
.
pos
,
sizeof
(
SResultRowPosition
),
&
value
,
sizeof
(
SWinRes
));
code
=
taosHashPut
(
pSeUpdated
,
&
pCurWin
->
winInfo
.
pos
,
sizeof
(
SResultRowPosition
),
&
value
,
sizeof
(
SWinRes
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -3532,7 +3518,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
return
NULL
;
}
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SStreamStateAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
SOptrBasicInfo
*
pBInfo
=
&
pInfo
->
binfo
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
...
...
@@ -3574,8 +3560,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
// restore the value
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeSessionWindow
(
pInfo
->
streamAggSup
.
pResultRows
,
&
pInfo
->
twAggSup
,
pUpdated
,
getResWinForState
);
closeSessionWindow
(
pInfo
->
streamAggSup
.
pResultRows
,
&
pInfo
->
twAggSup
,
pUpdated
,
getResWinForState
);
copyUpdateResult
(
pSeUpdated
,
pUpdated
);
taosHashCleanup
(
pSeUpdated
);
...
...
@@ -3688,22 +3673,22 @@ void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
destroyIntervalOperatorInfo
(
&
miaInfo
->
intervalAggOperatorInfo
,
numOfOutput
);
}
static
int32_t
outputMergeIntervalResult
(
SOperatorInfo
*
pOperatorInfo
,
uint64_t
tableGroupId
,
SSDataBlock
*
pResultBlock
,
TSKEY
wstartTs
)
{
static
int32_t
outputMergeIntervalResult
(
SOperatorInfo
*
pOperatorInfo
,
uint64_t
tableGroupId
,
SSDataBlock
*
pResultBlock
,
TSKEY
wstartTs
)
{
SMergeIntervalAggOperatorInfo
*
miaInfo
=
pOperatorInfo
->
info
;
SIntervalAggOperatorInfo
*
iaInfo
=
&
miaInfo
->
intervalAggOperatorInfo
;
SExecTaskInfo
*
pTaskInfo
=
pOperatorInfo
->
pTaskInfo
;
SExprSupp
*
pSup
=
&
pOperatorInfo
->
exprSupp
;
bool
ascScan
=
(
iaInfo
->
order
==
TSDB_ORDER_ASC
);
bool
ascScan
=
(
iaInfo
->
order
==
TSDB_ORDER_ASC
);
SET_RES_WINDOW_KEY
(
iaInfo
->
aggSup
.
keyBuf
,
&
wstartTs
,
TSDB_KEYSIZE
,
tableGroupId
);
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
taosHashGet
(
iaInfo
->
aggSup
.
pResultRowHashTable
,
iaInfo
->
aggSup
.
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
TSDB_KEYSIZE
));
ASSERT
(
p1
!=
NULL
);
finalizeResultRowIntoResultDataBlock
(
iaInfo
->
aggSup
.
pResultBuf
,
p1
,
pSup
->
pCtx
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pSup
->
rowEntryInfoOffset
,
pResultBlock
,
pTaskInfo
);
finalizeResultRowIntoResultDataBlock
(
iaInfo
->
aggSup
.
pResultBuf
,
p1
,
pSup
->
pCtx
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pSup
->
rowEntryInfoOffset
,
pResultBlock
,
pTaskInfo
);
taosHashRemove
(
iaInfo
->
aggSup
.
pResultRowHashTable
,
iaInfo
->
aggSup
.
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
TSDB_KEYSIZE
));
return
0
;
...
...
@@ -3715,7 +3700,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
SIntervalAggOperatorInfo
*
iaInfo
=
&
miaInfo
->
intervalAggOperatorInfo
;
SExecTaskInfo
*
pTaskInfo
=
pOperatorInfo
->
pTaskInfo
;
SExprSupp
*
pSup
=
&
pOperatorInfo
->
exprSupp
;
SExprSupp
*
pSup
=
&
pOperatorInfo
->
exprSupp
;
int32_t
startPos
=
0
;
int32_t
numOfOutput
=
pSup
->
numOfExprs
;
...
...
@@ -3726,18 +3711,18 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
STimeWindow
win
;
win
.
skey
=
blockStartTs
;
win
.
ekey
=
taosTimeAdd
(
win
.
skey
,
iaInfo
->
interval
.
interval
,
iaInfo
->
interval
.
intervalUnit
,
iaInfo
->
interval
.
precision
)
-
1
;
win
.
ekey
=
taosTimeAdd
(
win
.
skey
,
iaInfo
->
interval
.
interval
,
iaInfo
->
interval
.
intervalUnit
,
iaInfo
->
interval
.
precision
)
-
1
;
//TODO: remove the hash table usage (groupid + winkey => result row position)
int32_t
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
iaInfo
->
aggSup
,
pTaskInfo
);
// TODO: remove the hash table usage (groupid + winkey => result row position)
int32_t
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
iaInfo
->
aggSup
,
pTaskInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
TSKEY
currTs
=
blockStartTs
;
TSKEY
currPos
=
startPos
;
TSKEY
currTs
=
blockStartTs
;
TSKEY
currPos
=
startPos
;
STimeWindow
currWin
=
win
;
while
(
1
)
{
++
currPos
;
...
...
@@ -3748,25 +3733,27 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
continue
;
}
else
{
updateTimeWindowInfo
(
&
iaInfo
->
twAggSup
.
timeWindowData
,
&
currWin
,
true
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
currWin
,
&
iaInfo
->
twAggSup
.
timeWindowData
,
startPos
,
currPos
-
startPos
,
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
iaInfo
->
order
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
currWin
,
&
iaInfo
->
twAggSup
.
timeWindowData
,
startPos
,
currPos
-
startPos
,
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
iaInfo
->
order
);
outputMergeIntervalResult
(
pOperatorInfo
,
tableGroupId
,
pResultBlock
,
currTs
);
currTs
=
tsCols
[
currPos
];
currWin
.
skey
=
currTs
;
currWin
.
ekey
=
taosTimeAdd
(
currWin
.
skey
,
iaInfo
->
interval
.
interval
,
iaInfo
->
interval
.
intervalUnit
,
iaInfo
->
interval
.
precision
)
-
1
;
currWin
.
ekey
=
taosTimeAdd
(
currWin
.
skey
,
iaInfo
->
interval
.
interval
,
iaInfo
->
interval
.
intervalUnit
,
iaInfo
->
interval
.
precision
)
-
1
;
startPos
=
currPos
;
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
currWin
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
iaInfo
->
aggSup
,
pTaskInfo
);
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
currWin
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
iaInfo
->
aggSup
,
pTaskInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
}
updateTimeWindowInfo
(
&
iaInfo
->
twAggSup
.
timeWindowData
,
&
currWin
,
true
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
currWin
,
&
iaInfo
->
twAggSup
.
timeWindowData
,
startPos
,
currPos
-
startPos
,
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
iaInfo
->
order
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
currWin
,
&
iaInfo
->
twAggSup
.
timeWindowData
,
startPos
,
currPos
-
startPos
,
tsCols
,
pBlock
->
info
.
rows
,
numOfOutput
,
iaInfo
->
order
);
outputMergeIntervalResult
(
pOperatorInfo
,
tableGroupId
,
pResultBlock
,
currTs
);
}
...
...
@@ -3780,7 +3767,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
return
NULL
;
}
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SSDataBlock
*
pRes
=
iaInfo
->
binfo
.
pRes
;
blockDataCleanup
(
pRes
);
blockDataEnsureCapacity
(
pRes
,
pOperator
->
resultInfo
.
capacity
);
...
...
@@ -3841,18 +3828,19 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
}
SIntervalAggOperatorInfo
*
iaInfo
=
&
miaInfo
->
intervalAggOperatorInfo
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
iaInfo
->
win
=
pTaskInfo
->
window
;
iaInfo
->
order
=
TSDB_ORDER_ASC
;
iaInfo
->
interval
=
*
pInterval
;
iaInfo
->
win
=
pTaskInfo
->
window
;
iaInfo
->
order
=
TSDB_ORDER_ASC
;
iaInfo
->
interval
=
*
pInterval
;
iaInfo
->
execModel
=
pTaskInfo
->
execModel
;
iaInfo
->
primaryTsIndex
=
primaryTsSlotId
;
size_t
keyBufSize
=
sizeof
(
int64_t
)
+
sizeof
(
int64_t
)
+
POINTER_BYTES
;
initResultSizeInfo
(
pOperator
,
4096
);
int32_t
code
=
initAggInfo
(
&
pOperator
->
exprSupp
,
&
iaInfo
->
aggSup
,
pExprInfo
,
numOfCols
,
keyBufSize
,
pTaskInfo
->
id
.
str
);
int32_t
code
=
initAggInfo
(
&
pOperator
->
exprSupp
,
&
iaInfo
->
aggSup
,
pExprInfo
,
numOfCols
,
keyBufSize
,
pTaskInfo
->
id
.
str
);
initBasicInfo
(
&
iaInfo
->
binfo
,
pResBlock
);
initExecTimeWindowInfo
(
&
iaInfo
->
twAggSup
.
timeWindowData
,
&
iaInfo
->
win
);
...
...
tests/script/jenkins/basic.txt
浏览文件 @
f2b66cc1
...
...
@@ -75,7 +75,7 @@
./test.sh -f tsim/stream/distributeInterval0.sim
# ./test.sh -f tsim/stream/distributesession0.sim
# ./test.sh -f tsim/stream/session0.sim
#
./test.sh -f tsim/stream/session1.sim
./test.sh -f tsim/stream/session1.sim
# ./test.sh -f tsim/stream/state0.sim
./test.sh -f tsim/stream/triggerInterval0.sim
# ./test.sh -f tsim/stream/triggerSession0.sim
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录