Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
bcac5352
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
bcac5352
编写于
8月 10, 2023
作者:
L
liuyao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add stream task id
上级
c5dd1c55
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
64 addition
and
89 deletion
+64
-89
include/common/tdatablock.h
include/common/tdatablock.h
+1
-1
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+3
-3
source/dnode/vnode/src/tq/tqSink.c
source/dnode/vnode/src/tq/tqSink.c
+1
-1
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+1
-1
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+3
-3
source/libs/executor/src/filloperator.c
source/libs/executor/src/filloperator.c
+6
-6
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+5
-4
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+11
-40
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+33
-30
未找到文件。
include/common/tdatablock.h
浏览文件 @
bcac5352
...
@@ -241,7 +241,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols);
...
@@ -241,7 +241,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols);
const
char
*
blockDecode
(
SSDataBlock
*
pBlock
,
const
char
*
pData
);
const
char
*
blockDecode
(
SSDataBlock
*
pBlock
,
const
char
*
pData
);
// for debug
// for debug
char
*
dumpBlockData
(
SSDataBlock
*
pDataBlock
,
const
char
*
flag
,
char
**
dumpBuf
);
char
*
dumpBlockData
(
SSDataBlock
*
pDataBlock
,
const
char
*
flag
,
char
**
dumpBuf
,
const
char
*
taskIdStr
);
int32_t
buildSubmitReqFromDataBlock
(
SSubmitReq2
**
pReq
,
const
SSDataBlock
*
pDataBlocks
,
const
STSchema
*
pTSchema
,
int64_t
uid
,
int32_t
vgId
,
int32_t
buildSubmitReqFromDataBlock
(
SSubmitReq2
**
pReq
,
const
SSDataBlock
*
pDataBlocks
,
const
STSchema
*
pTSchema
,
int64_t
uid
,
int32_t
vgId
,
tb_uid_t
suid
);
tb_uid_t
suid
);
...
...
source/common/src/tdatablock.c
浏览文件 @
bcac5352
...
@@ -1771,7 +1771,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
...
@@ -1771,7 +1771,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
}
}
// for debug
// for debug
char
*
dumpBlockData
(
SSDataBlock
*
pDataBlock
,
const
char
*
flag
,
char
**
pDataBuf
)
{
char
*
dumpBlockData
(
SSDataBlock
*
pDataBlock
,
const
char
*
flag
,
char
**
pDataBuf
,
const
char
*
taskIdStr
)
{
int32_t
size
=
2048
*
1024
;
int32_t
size
=
2048
*
1024
;
*
pDataBuf
=
taosMemoryCalloc
(
size
,
1
);
*
pDataBuf
=
taosMemoryCalloc
(
size
,
1
);
char
*
dumpBuf
=
*
pDataBuf
;
char
*
dumpBuf
=
*
pDataBuf
;
...
@@ -1780,9 +1780,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
...
@@ -1780,9 +1780,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t
rows
=
pDataBlock
->
info
.
rows
;
int32_t
rows
=
pDataBlock
->
info
.
rows
;
int32_t
len
=
0
;
int32_t
len
=
0
;
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
"===stream===%s|block type %d|child id %d|group id:%"
PRIu64
"|uid:%"
PRId64
"
%s
===stream===%s|block type %d|child id %d|group id:%"
PRIu64
"|uid:%"
PRId64
"|rows:%"
PRId64
"|version:%"
PRIu64
"|cal start:%"
PRIu64
"|cal end:%"
PRIu64
"|tbl:%s
\n
"
,
"|rows:%"
PRId64
"|version:%"
PRIu64
"|cal start:%"
PRIu64
"|cal end:%"
PRIu64
"|tbl:%s
\n
"
,
flag
,
(
int32_t
)
pDataBlock
->
info
.
type
,
pDataBlock
->
info
.
childId
,
pDataBlock
->
info
.
id
.
groupId
,
taskIdStr
,
flag
,
(
int32_t
)
pDataBlock
->
info
.
type
,
pDataBlock
->
info
.
childId
,
pDataBlock
->
info
.
id
.
groupId
,
pDataBlock
->
info
.
id
.
uid
,
pDataBlock
->
info
.
rows
,
pDataBlock
->
info
.
version
,
pDataBlock
->
info
.
id
.
uid
,
pDataBlock
->
info
.
rows
,
pDataBlock
->
info
.
version
,
pDataBlock
->
info
.
calWin
.
skey
,
pDataBlock
->
info
.
calWin
.
ekey
,
pDataBlock
->
info
.
parTbName
);
pDataBlock
->
info
.
calWin
.
skey
,
pDataBlock
->
info
.
calWin
.
ekey
,
pDataBlock
->
info
.
parTbName
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
if
(
len
>=
size
-
1
)
return
dumpBuf
;
...
...
source/dnode/vnode/src/tq/tqSink.c
浏览文件 @
bcac5352
...
@@ -412,7 +412,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
...
@@ -412,7 +412,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
if
(
k
==
0
)
{
if
(
k
==
0
)
{
SColumnInfoData
*
pColData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
dataIndex
);
SColumnInfoData
*
pColData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
dataIndex
);
void
*
colData
=
colDataGetData
(
pColData
,
j
);
void
*
colData
=
colDataGetData
(
pColData
,
j
);
tq
Trace
(
"tq sink pipe2, row %d, col %d ts %"
PRId64
,
j
,
k
,
*
(
int64_t
*
)
colData
);
tq
Debug
(
"tq sink pipe2, row %d, col %d ts %"
PRId64
,
j
,
k
,
*
(
int64_t
*
)
colData
);
}
}
if
(
IS_SET_NULL
(
pCol
))
{
if
(
IS_SET_NULL
(
pCol
))
{
SColVal
cv
=
COL_VAL_NULL
(
pCol
->
colId
,
pCol
->
type
);
SColVal
cv
=
COL_VAL_NULL
(
pCol
->
colId
,
pCol
->
type
);
...
...
source/libs/executor/inc/executil.h
浏览文件 @
bcac5352
...
@@ -185,7 +185,7 @@ int32_t convertFillType(int32_t mode);
...
@@ -185,7 +185,7 @@ int32_t convertFillType(int32_t mode);
int32_t
resultrowComparAsc
(
const
void
*
p1
,
const
void
*
p2
);
int32_t
resultrowComparAsc
(
const
void
*
p1
,
const
void
*
p2
);
int32_t
isQualifiedTable
(
STableKeyInfo
*
info
,
SNode
*
pTagCond
,
void
*
metaHandle
,
bool
*
pQualified
,
SStorageAPI
*
pAPI
);
int32_t
isQualifiedTable
(
STableKeyInfo
*
info
,
SNode
*
pTagCond
,
void
*
metaHandle
,
bool
*
pQualified
,
SStorageAPI
*
pAPI
);
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
);
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
,
const
char
*
taskIdStr
);
void
getNextTimeWindow
(
const
SInterval
*
pInterval
,
STimeWindow
*
tw
,
int32_t
order
);
void
getNextTimeWindow
(
const
SInterval
*
pInterval
,
STimeWindow
*
tw
,
int32_t
order
);
void
getInitialStartTimeWindow
(
SInterval
*
pInterval
,
TSKEY
ts
,
STimeWindow
*
w
,
bool
ascQuery
);
void
getInitialStartTimeWindow
(
SInterval
*
pInterval
,
TSKEY
ts
,
STimeWindow
*
w
,
bool
ascQuery
);
...
...
source/libs/executor/src/executil.c
浏览文件 @
bcac5352
...
@@ -2177,12 +2177,12 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
...
@@ -2177,12 +2177,12 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
)
{
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
,
const
char
*
taskIdStr
)
{
if
(
!
pBlock
||
pBlock
->
info
.
rows
==
0
)
{
if
(
!
pBlock
||
pBlock
->
info
.
rows
==
0
)
{
qDebug
(
"
===stream===%s: Block is Null or Empty"
,
flag
);
qDebug
(
"
%s===stream===%s: Block is Null or Empty"
,
taskIdStr
,
flag
);
return
;
return
;
}
}
char
*
pBuf
=
NULL
;
char
*
pBuf
=
NULL
;
qDebug
(
"%s"
,
dumpBlockData
(
pBlock
,
flag
,
&
pBuf
));
qDebug
(
"%s"
,
dumpBlockData
(
pBlock
,
flag
,
&
pBuf
,
taskIdStr
));
taosMemoryFree
(
pBuf
);
taosMemoryFree
(
pBuf
);
}
}
source/libs/executor/src/filloperator.c
浏览文件 @
bcac5352
...
@@ -1292,14 +1292,14 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
...
@@ -1292,14 +1292,14 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
(
pInfo
->
pFillInfo
->
pos
!=
FILL_POS_INVALID
&&
pInfo
->
pFillInfo
->
needFill
==
true
))
{
(
pInfo
->
pFillInfo
->
pos
!=
FILL_POS_INVALID
&&
pInfo
->
pFillInfo
->
needFill
==
true
))
{
doStreamFillRange
(
pInfo
->
pFillInfo
,
pInfo
->
pFillSup
,
pInfo
->
pRes
);
doStreamFillRange
(
pInfo
->
pFillInfo
,
pInfo
->
pFillSup
,
pInfo
->
pRes
);
if
(
pInfo
->
pRes
->
info
.
rows
>
0
)
{
if
(
pInfo
->
pRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pRes
,
"stream fill"
);
printDataBlock
(
pInfo
->
pRes
,
"stream fill"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pRes
;
return
pInfo
->
pRes
;
}
}
}
}
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doDeleteFillFinalize
(
pOperator
);
doDeleteFillFinalize
(
pOperator
);
if
(
pInfo
->
pRes
->
info
.
rows
>
0
)
{
if
(
pInfo
->
pRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pRes
,
"stream fill"
);
printDataBlock
(
pInfo
->
pRes
,
"stream fill"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pRes
;
return
pInfo
->
pRes
;
}
}
setOperatorCompleted
(
pOperator
);
setOperatorCompleted
(
pOperator
);
...
@@ -1317,12 +1317,12 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
...
@@ -1317,12 +1317,12 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
pOperator
->
status
=
OP_RES_TO_RETURN
;
pOperator
->
status
=
OP_RES_TO_RETURN
;
pInfo
->
pFillInfo
->
preRowKey
=
INT64_MIN
;
pInfo
->
pFillInfo
->
preRowKey
=
INT64_MIN
;
if
(
pInfo
->
pRes
->
info
.
rows
>
0
)
{
if
(
pInfo
->
pRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pRes
,
"stream fill"
);
printDataBlock
(
pInfo
->
pRes
,
"stream fill"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pRes
;
return
pInfo
->
pRes
;
}
}
break
;
break
;
}
}
printDataBlock
(
pBlock
,
"stream fill recv"
);
printDataBlock
(
pBlock
,
"stream fill recv"
,
GET_TASKID
(
pTaskInfo
)
);
if
(
pInfo
->
pFillInfo
->
curGroupId
!=
pBlock
->
info
.
id
.
groupId
)
{
if
(
pInfo
->
pFillInfo
->
curGroupId
!=
pBlock
->
info
.
id
.
groupId
)
{
pInfo
->
pFillInfo
->
curGroupId
=
pBlock
->
info
.
id
.
groupId
;
pInfo
->
pFillInfo
->
curGroupId
=
pBlock
->
info
.
id
.
groupId
;
...
@@ -1339,7 +1339,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
...
@@ -1339,7 +1339,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
pInfo
->
pFillSup
->
hasDelete
=
true
;
pInfo
->
pFillSup
->
hasDelete
=
true
;
doDeleteFillResult
(
pOperator
);
doDeleteFillResult
(
pOperator
);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pDelRes
,
"stream fill delete"
);
printDataBlock
(
pInfo
->
pDelRes
,
"stream fill delete"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pDelRes
;
return
pInfo
->
pDelRes
;
}
}
continue
;
continue
;
...
@@ -1378,7 +1378,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
...
@@ -1378,7 +1378,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
}
}
pOperator
->
resultInfo
.
totalRows
+=
pInfo
->
pRes
->
info
.
rows
;
pOperator
->
resultInfo
.
totalRows
+=
pInfo
->
pRes
->
info
.
rows
;
printDataBlock
(
pInfo
->
pRes
,
"stream fill"
);
printDataBlock
(
pInfo
->
pRes
,
"stream fill"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pRes
;
return
pInfo
->
pRes
;
}
}
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
bcac5352
...
@@ -956,7 +956,8 @@ static bool hasRemainPartion(SStreamPartitionOperatorInfo* pInfo) { return pInfo
...
@@ -956,7 +956,8 @@ static bool hasRemainPartion(SStreamPartitionOperatorInfo* pInfo) { return pInfo
static
bool
hasRemainTbName
(
SStreamPartitionOperatorInfo
*
pInfo
)
{
return
pInfo
->
pTbNameIte
!=
NULL
;
}
static
bool
hasRemainTbName
(
SStreamPartitionOperatorInfo
*
pInfo
)
{
return
pInfo
->
pTbNameIte
!=
NULL
;
}
static
SSDataBlock
*
buildStreamPartitionResult
(
SOperatorInfo
*
pOperator
)
{
static
SSDataBlock
*
buildStreamPartitionResult
(
SOperatorInfo
*
pOperator
)
{
SStorageAPI
*
pAPI
=
&
pOperator
->
pTaskInfo
->
storageAPI
;
SStorageAPI
*
pAPI
=
&
pOperator
->
pTaskInfo
->
storageAPI
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SStreamPartitionOperatorInfo
*
pInfo
=
pOperator
->
info
;
SStreamPartitionOperatorInfo
*
pInfo
=
pOperator
->
info
;
SSDataBlock
*
pDest
=
pInfo
->
binfo
.
pRes
;
SSDataBlock
*
pDest
=
pInfo
->
binfo
.
pRes
;
...
@@ -994,7 +995,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
...
@@ -994,7 +995,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
pOperator
->
resultInfo
.
totalRows
+=
pDest
->
info
.
rows
;
pOperator
->
resultInfo
.
totalRows
+=
pDest
->
info
.
rows
;
pInfo
->
parIte
=
taosHashIterate
(
pInfo
->
pPartitions
,
pInfo
->
parIte
);
pInfo
->
parIte
=
taosHashIterate
(
pInfo
->
pPartitions
,
pInfo
->
parIte
);
ASSERT
(
pDest
->
info
.
rows
>
0
);
ASSERT
(
pDest
->
info
.
rows
>
0
);
printDataBlock
(
pDest
,
"stream partitionby"
);
printDataBlock
(
pDest
,
"stream partitionby"
,
GET_TASKID
(
pTaskInfo
)
);
return
pDest
;
return
pDest
;
}
}
...
@@ -1115,7 +1116,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
...
@@ -1115,7 +1116,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
setOperatorCompleted
(
pOperator
);
setOperatorCompleted
(
pOperator
);
return
NULL
;
return
NULL
;
}
}
printDataBlock
(
pBlock
,
"stream partitionby recv"
);
printDataBlock
(
pBlock
,
"stream partitionby recv"
,
GET_TASKID
(
pTaskInfo
)
);
switch
(
pBlock
->
info
.
type
)
{
switch
(
pBlock
->
info
.
type
)
{
case
STREAM_NORMAL
:
case
STREAM_NORMAL
:
case
STREAM_PULL_DATA
:
case
STREAM_PULL_DATA
:
...
@@ -1125,7 +1126,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
...
@@ -1125,7 +1126,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
case
STREAM_DELETE_DATA
:
{
case
STREAM_DELETE_DATA
:
{
copyDataBlock
(
pInfo
->
pDelRes
,
pBlock
);
copyDataBlock
(
pInfo
->
pDelRes
,
pBlock
);
pInfo
->
pDelRes
->
info
.
type
=
STREAM_DELETE_RESULT
;
pInfo
->
pDelRes
->
info
.
type
=
STREAM_DELETE_RESULT
;
printDataBlock
(
pInfo
->
pDelRes
,
"stream partitionby delete"
);
printDataBlock
(
pInfo
->
pDelRes
,
"stream partitionby delete"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pDelRes
;
return
pInfo
->
pDelRes
;
}
break
;
}
break
;
default:
default:
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
bcac5352
...
@@ -1344,7 +1344,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
...
@@ -1344,7 +1344,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
if
(
rows
==
0
)
{
if
(
rows
==
0
)
{
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
SExecTaskInfo
*
pTaskInfo
=
pInfo
->
pStreamScanOp
->
pTaskInfo
;
SColumnInfoData
*
pSrcStartTsCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pSrcStartTsCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pSrcEndTsCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pSrcEndTsCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pSrcUidCol
=
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
UID_COLUMN_INDEX
);
SColumnInfoData
*
pSrcUidCol
=
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
UID_COLUMN_INDEX
);
...
@@ -1361,7 +1361,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
...
@@ -1361,7 +1361,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
TSKEY
startTs
=
srcStartTsCol
[
0
];
TSKEY
startTs
=
srcStartTsCol
[
0
];
TSKEY
endTs
=
srcEndTsCol
[
0
];
TSKEY
endTs
=
srcEndTsCol
[
0
];
SSDataBlock
*
pPreRes
=
readPreVersionData
(
pInfo
->
pTableScanOp
,
srcUid
,
startTs
,
endTs
,
ver
);
SSDataBlock
*
pPreRes
=
readPreVersionData
(
pInfo
->
pTableScanOp
,
srcUid
,
startTs
,
endTs
,
ver
);
printDataBlock
(
pPreRes
,
"pre res"
);
printDataBlock
(
pPreRes
,
"pre res"
,
GET_TASKID
(
pTaskInfo
)
);
blockDataCleanup
(
pSrcBlock
);
blockDataCleanup
(
pSrcBlock
);
int32_t
code
=
blockDataEnsureCapacity
(
pSrcBlock
,
pPreRes
->
info
.
rows
);
int32_t
code
=
blockDataEnsureCapacity
(
pSrcBlock
,
pPreRes
->
info
.
rows
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
@@ -1376,7 +1376,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
...
@@ -1376,7 +1376,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
appendOneRowToStreamSpecialBlock
(
pSrcBlock
,
((
TSKEY
*
)
pTsCol
->
pData
)
+
i
,
((
TSKEY
*
)
pTsCol
->
pData
)
+
i
,
&
srcUid
,
appendOneRowToStreamSpecialBlock
(
pSrcBlock
,
((
TSKEY
*
)
pTsCol
->
pData
)
+
i
,
((
TSKEY
*
)
pTsCol
->
pData
)
+
i
,
&
srcUid
,
&
groupId
,
NULL
);
&
groupId
,
NULL
);
}
}
printDataBlock
(
pSrcBlock
,
"new delete"
);
printDataBlock
(
pSrcBlock
,
"new delete"
,
GET_TASKID
(
pTaskInfo
)
);
}
}
uint64_t
*
srcGp
=
(
uint64_t
*
)
pSrcGpCol
->
pData
;
uint64_t
*
srcGp
=
(
uint64_t
*
)
pSrcGpCol
->
pData
;
srcStartTsCol
=
(
TSKEY
*
)
pSrcStartTsCol
->
pData
;
srcStartTsCol
=
(
TSKEY
*
)
pSrcStartTsCol
->
pData
;
...
@@ -1922,38 +1922,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
...
@@ -1922,38 +1922,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
switch
(
pInfo
->
scanMode
)
{
switch
(
pInfo
->
scanMode
)
{
case
STREAM_SCAN_FROM_RES
:
{
case
STREAM_SCAN_FROM_RES
:
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_READERHANDLE
;
pInfo
->
scanMode
=
STREAM_SCAN_FROM_READERHANDLE
;
printDataBlock
(
pInfo
->
pRecoverRes
,
"scan recover"
);
printDataBlock
(
pInfo
->
pRecoverRes
,
"scan recover"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pRecoverRes
;
return
pInfo
->
pRecoverRes
;
}
break
;
}
break
;
// case STREAM_SCAN_FROM_UPDATERES: {
// generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
// prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
// pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
// printDataBlock(pInfo->pUpdateRes, "recover update");
// return pInfo->pUpdateRes;
// } break;
// case STREAM_SCAN_FROM_DELETE_DATA: {
// generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
// prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
// pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
// copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
// pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
// printDataBlock(pInfo->pDeleteDataRes, "recover delete");
// return pInfo->pDeleteDataRes;
// } break;
// case STREAM_SCAN_FROM_DATAREADER_RANGE: {
// SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
// if (pSDB) {
// STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
// pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
// checkUpdateData(pInfo, true, pSDB, false);
// printDataBlock(pSDB, "scan recover update");
// calBlockTbName(pInfo, pSDB);
// return pSDB;
// }
// blockDataCleanup(pInfo->pUpdateDataRes);
// pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
// } break;
default:
default:
break
;
break
;
}
}
...
@@ -1972,12 +1943,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
...
@@ -1972,12 +1943,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
}
if
(
pInfo
->
pCreateTbRes
->
info
.
rows
>
0
)
{
if
(
pInfo
->
pCreateTbRes
->
info
.
rows
>
0
)
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_RES
;
pInfo
->
scanMode
=
STREAM_SCAN_FROM_RES
;
printDataBlock
(
pInfo
->
pCreateTbRes
,
"recover createTbl"
);
printDataBlock
(
pInfo
->
pCreateTbRes
,
"recover createTbl"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pCreateTbRes
;
return
pInfo
->
pCreateTbRes
;
}
}
qDebug
(
"stream recover scan get block, rows %"
PRId64
,
pInfo
->
pRecoverRes
->
info
.
rows
);
qDebug
(
"stream recover scan get block, rows %"
PRId64
,
pInfo
->
pRecoverRes
->
info
.
rows
);
printDataBlock
(
pInfo
->
pRecoverRes
,
"scan recover"
);
printDataBlock
(
pInfo
->
pRecoverRes
,
"scan recover"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pRecoverRes
;
return
pInfo
->
pRecoverRes
;
}
}
pStreamInfo
->
recoverStep
=
STREAM_RECOVER_STEP__NONE
;
pStreamInfo
->
recoverStep
=
STREAM_RECOVER_STEP__NONE
;
...
@@ -2033,7 +2004,7 @@ FETCH_NEXT_BLOCK:
...
@@ -2033,7 +2004,7 @@ FETCH_NEXT_BLOCK:
pAPI
->
stateStore
.
updateInfoAddCloseWindowSBF
(
pInfo
->
pUpdateInfo
);
pAPI
->
stateStore
.
updateInfoAddCloseWindowSBF
(
pInfo
->
pUpdateInfo
);
}
break
;
}
break
;
case
STREAM_DELETE_DATA
:
{
case
STREAM_DELETE_DATA
:
{
printDataBlock
(
pBlock
,
"stream scan delete recv"
);
printDataBlock
(
pBlock
,
"stream scan delete recv"
,
GET_TASKID
(
pTaskInfo
)
);
SSDataBlock
*
pDelBlock
=
NULL
;
SSDataBlock
*
pDelBlock
=
NULL
;
if
(
pInfo
->
tqReader
)
{
if
(
pInfo
->
tqReader
)
{
pDelBlock
=
createSpecialDataBlock
(
STREAM_DELETE_DATA
);
pDelBlock
=
createSpecialDataBlock
(
STREAM_DELETE_DATA
);
...
@@ -2044,7 +2015,7 @@ FETCH_NEXT_BLOCK:
...
@@ -2044,7 +2015,7 @@ FETCH_NEXT_BLOCK:
setBlockGroupIdByUid
(
pInfo
,
pDelBlock
);
setBlockGroupIdByUid
(
pInfo
,
pDelBlock
);
rebuildDeleteBlockData
(
pDelBlock
,
&
pStreamInfo
->
fillHistoryWindow
,
id
);
rebuildDeleteBlockData
(
pDelBlock
,
&
pStreamInfo
->
fillHistoryWindow
,
id
);
printDataBlock
(
pDelBlock
,
"stream scan delete recv filtered"
);
printDataBlock
(
pDelBlock
,
"stream scan delete recv filtered"
,
GET_TASKID
(
pTaskInfo
)
);
if
(
pDelBlock
->
info
.
rows
==
0
)
{
if
(
pDelBlock
->
info
.
rows
==
0
)
{
if
(
pInfo
->
tqReader
)
{
if
(
pInfo
->
tqReader
)
{
blockDataDestroy
(
pDelBlock
);
blockDataDestroy
(
pDelBlock
);
...
@@ -2055,7 +2026,7 @@ FETCH_NEXT_BLOCK:
...
@@ -2055,7 +2026,7 @@ FETCH_NEXT_BLOCK:
if
(
!
isIntervalWindow
(
pInfo
)
&&
!
isSessionWindow
(
pInfo
)
&&
!
isStateWindow
(
pInfo
))
{
if
(
!
isIntervalWindow
(
pInfo
)
&&
!
isSessionWindow
(
pInfo
)
&&
!
isStateWindow
(
pInfo
))
{
generateDeleteResultBlock
(
pInfo
,
pDelBlock
,
pInfo
->
pDeleteDataRes
);
generateDeleteResultBlock
(
pInfo
,
pDelBlock
,
pInfo
->
pDeleteDataRes
);
pInfo
->
pDeleteDataRes
->
info
.
type
=
STREAM_DELETE_RESULT
;
pInfo
->
pDeleteDataRes
->
info
.
type
=
STREAM_DELETE_RESULT
;
printDataBlock
(
pDelBlock
,
"stream scan delete result"
);
printDataBlock
(
pDelBlock
,
"stream scan delete result"
,
GET_TASKID
(
pTaskInfo
)
);
blockDataDestroy
(
pDelBlock
);
blockDataDestroy
(
pDelBlock
);
if
(
pInfo
->
pDeleteDataRes
->
info
.
rows
>
0
)
{
if
(
pInfo
->
pDeleteDataRes
->
info
.
rows
>
0
)
{
...
@@ -2070,7 +2041,7 @@ FETCH_NEXT_BLOCK:
...
@@ -2070,7 +2041,7 @@ FETCH_NEXT_BLOCK:
prepareRangeScan
(
pInfo
,
pInfo
->
pUpdateRes
,
&
pInfo
->
updateResIndex
);
prepareRangeScan
(
pInfo
,
pInfo
->
pUpdateRes
,
&
pInfo
->
updateResIndex
);
copyDataBlock
(
pInfo
->
pDeleteDataRes
,
pInfo
->
pUpdateRes
);
copyDataBlock
(
pInfo
->
pDeleteDataRes
,
pInfo
->
pUpdateRes
);
pInfo
->
pDeleteDataRes
->
info
.
type
=
STREAM_DELETE_DATA
;
pInfo
->
pDeleteDataRes
->
info
.
type
=
STREAM_DELETE_DATA
;
printDataBlock
(
pDelBlock
,
"stream scan delete data"
);
printDataBlock
(
pDelBlock
,
"stream scan delete data"
,
GET_TASKID
(
pTaskInfo
)
);
if
(
pInfo
->
tqReader
)
{
if
(
pInfo
->
tqReader
)
{
blockDataDestroy
(
pDelBlock
);
blockDataDestroy
(
pDelBlock
);
}
}
...
@@ -2121,7 +2092,7 @@ FETCH_NEXT_BLOCK:
...
@@ -2121,7 +2092,7 @@ FETCH_NEXT_BLOCK:
STableScanInfo
*
pTableScanInfo
=
pInfo
->
pTableScanOp
->
info
;
STableScanInfo
*
pTableScanInfo
=
pInfo
->
pTableScanOp
->
info
;
pSDB
->
info
.
type
=
pInfo
->
scanMode
==
STREAM_SCAN_FROM_DATAREADER_RANGE
?
STREAM_NORMAL
:
STREAM_PULL_DATA
;
pSDB
->
info
.
type
=
pInfo
->
scanMode
==
STREAM_SCAN_FROM_DATAREADER_RANGE
?
STREAM_NORMAL
:
STREAM_PULL_DATA
;
checkUpdateData
(
pInfo
,
true
,
pSDB
,
false
);
checkUpdateData
(
pInfo
,
true
,
pSDB
,
false
);
printDataBlock
(
pSDB
,
"stream scan update"
);
printDataBlock
(
pSDB
,
"stream scan update"
,
GET_TASKID
(
pTaskInfo
)
);
calBlockTbName
(
pInfo
,
pSDB
);
calBlockTbName
(
pInfo
,
pSDB
);
return
pSDB
;
return
pSDB
;
}
}
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
bcac5352
...
@@ -2507,20 +2507,20 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -2507,20 +2507,20 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doBuildPullDataBlock
(
pInfo
->
pPullWins
,
&
pInfo
->
pullIndex
,
pInfo
->
pPullDataRes
);
doBuildPullDataBlock
(
pInfo
->
pPullWins
,
&
pInfo
->
pullIndex
,
pInfo
->
pPullDataRes
);
if
(
pInfo
->
pPullDataRes
->
info
.
rows
!=
0
)
{
if
(
pInfo
->
pPullDataRes
->
info
.
rows
!=
0
)
{
// process the rest of the data
// process the rest of the data
printDataBlock
(
pInfo
->
pPullDataRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval final"
:
"interval semi"
);
printDataBlock
(
pInfo
->
pPullDataRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval final"
:
"interval semi"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pPullDataRes
;
return
pInfo
->
pPullDataRes
;
}
}
doBuildDeleteResult
(
pInfo
,
pInfo
->
pDelWins
,
&
pInfo
->
delIndex
,
pInfo
->
pDelRes
);
doBuildDeleteResult
(
pInfo
,
pInfo
->
pDelWins
,
&
pInfo
->
delIndex
,
pInfo
->
pDelRes
);
if
(
pInfo
->
pDelRes
->
info
.
rows
!=
0
)
{
if
(
pInfo
->
pDelRes
->
info
.
rows
!=
0
)
{
// process the rest of the data
// process the rest of the data
printDataBlock
(
pInfo
->
pDelRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval final"
:
"interval semi"
);
printDataBlock
(
pInfo
->
pDelRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval final"
:
"interval semi"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pDelRes
;
return
pInfo
->
pDelRes
;
}
}
doBuildStreamIntervalResult
(
pOperator
,
pInfo
->
pState
,
pInfo
->
binfo
.
pRes
,
&
pInfo
->
groupResInfo
);
doBuildStreamIntervalResult
(
pOperator
,
pInfo
->
pState
,
pInfo
->
binfo
.
pRes
,
&
pInfo
->
groupResInfo
);
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
!=
0
)
{
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
!=
0
)
{
printDataBlock
(
pInfo
->
binfo
.
pRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval final"
:
"interval semi"
);
printDataBlock
(
pInfo
->
binfo
.
pRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval final"
:
"interval semi"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
binfo
.
pRes
;
return
pInfo
->
binfo
.
pRes
;
}
}
...
@@ -2551,7 +2551,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -2551,7 +2551,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doBuildDeleteResult
(
pInfo
,
pInfo
->
pDelWins
,
&
pInfo
->
delIndex
,
pInfo
->
pDelRes
);
doBuildDeleteResult
(
pInfo
,
pInfo
->
pDelWins
,
&
pInfo
->
delIndex
,
pInfo
->
pDelRes
);
if
(
pInfo
->
pDelRes
->
info
.
rows
!=
0
)
{
if
(
pInfo
->
pDelRes
->
info
.
rows
!=
0
)
{
// process the rest of the data
// process the rest of the data
printDataBlock
(
pInfo
->
pDelRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval final"
:
"interval semi"
);
printDataBlock
(
pInfo
->
pDelRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval final"
:
"interval semi"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pDelRes
;
return
pInfo
->
pDelRes
;
}
}
}
}
...
@@ -2588,7 +2588,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -2588,7 +2588,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
break
;
break
;
}
}
pInfo
->
numOfDatapack
++
;
pInfo
->
numOfDatapack
++
;
printDataBlock
(
pBlock
,
IS_FINAL_OP
(
pInfo
)
?
"interval final recv"
:
"interval semi recv"
);
printDataBlock
(
pBlock
,
IS_FINAL_OP
(
pInfo
)
?
"interval final recv"
:
"interval semi recv"
,
GET_TASKID
(
pTaskInfo
)
);
if
(
pBlock
->
info
.
type
==
STREAM_NORMAL
||
pBlock
->
info
.
type
==
STREAM_PULL_DATA
)
{
if
(
pBlock
->
info
.
type
==
STREAM_NORMAL
||
pBlock
->
info
.
type
==
STREAM_PULL_DATA
)
{
pInfo
->
binfo
.
pRes
->
info
.
type
=
pBlock
->
info
.
type
;
pInfo
->
binfo
.
pRes
->
info
.
type
=
pBlock
->
info
.
type
;
...
@@ -2612,7 +2612,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -2612,7 +2612,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doBuildDeleteResult
(
pInfo
,
pInfo
->
pDelWins
,
&
pInfo
->
delIndex
,
pInfo
->
pDelRes
);
doBuildDeleteResult
(
pInfo
,
pInfo
->
pDelWins
,
&
pInfo
->
delIndex
,
pInfo
->
pDelRes
);
if
(
pInfo
->
pDelRes
->
info
.
rows
!=
0
)
{
if
(
pInfo
->
pDelRes
->
info
.
rows
!=
0
)
{
// process the rest of the data
// process the rest of the data
printDataBlock
(
pInfo
->
pDelRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval final"
:
"interval semi"
);
printDataBlock
(
pInfo
->
pDelRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval final"
:
"interval semi"
,
GET_TASKID
(
pTaskInfo
)
);
if
(
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
if
(
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
pInfo
->
pDelRes
->
info
.
type
=
STREAM_CLEAR
;
pInfo
->
pDelRes
->
info
.
type
=
STREAM_CLEAR
;
}
else
{
}
else
{
...
@@ -2676,20 +2676,20 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -2676,20 +2676,20 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doBuildPullDataBlock
(
pInfo
->
pPullWins
,
&
pInfo
->
pullIndex
,
pInfo
->
pPullDataRes
);
doBuildPullDataBlock
(
pInfo
->
pPullWins
,
&
pInfo
->
pullIndex
,
pInfo
->
pPullDataRes
);
if
(
pInfo
->
pPullDataRes
->
info
.
rows
!=
0
)
{
if
(
pInfo
->
pPullDataRes
->
info
.
rows
!=
0
)
{
// process the rest of the data
// process the rest of the data
printDataBlock
(
pInfo
->
pPullDataRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval final"
:
"interval semi"
);
printDataBlock
(
pInfo
->
pPullDataRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval final"
:
"interval semi"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pPullDataRes
;
return
pInfo
->
pPullDataRes
;
}
}
doBuildDeleteResult
(
pInfo
,
pInfo
->
pDelWins
,
&
pInfo
->
delIndex
,
pInfo
->
pDelRes
);
doBuildDeleteResult
(
pInfo
,
pInfo
->
pDelWins
,
&
pInfo
->
delIndex
,
pInfo
->
pDelRes
);
if
(
pInfo
->
pDelRes
->
info
.
rows
!=
0
)
{
if
(
pInfo
->
pDelRes
->
info
.
rows
!=
0
)
{
// process the rest of the data
// process the rest of the data
printDataBlock
(
pInfo
->
pDelRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval final"
:
"interval semi"
);
printDataBlock
(
pInfo
->
pDelRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval final"
:
"interval semi"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pDelRes
;
return
pInfo
->
pDelRes
;
}
}
doBuildStreamIntervalResult
(
pOperator
,
pInfo
->
pState
,
pInfo
->
binfo
.
pRes
,
&
pInfo
->
groupResInfo
);
doBuildStreamIntervalResult
(
pOperator
,
pInfo
->
pState
,
pInfo
->
binfo
.
pRes
,
&
pInfo
->
groupResInfo
);
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
!=
0
)
{
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
!=
0
)
{
printDataBlock
(
pInfo
->
binfo
.
pRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval final"
:
"interval semi"
);
printDataBlock
(
pInfo
->
binfo
.
pRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval final"
:
"interval semi"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
binfo
.
pRes
;
return
pInfo
->
binfo
.
pRes
;
}
}
...
@@ -3578,18 +3578,19 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
...
@@ -3578,18 +3578,19 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
SStreamSessionAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
SOptrBasicInfo
*
pBInfo
=
&
pInfo
->
binfo
;
SOptrBasicInfo
*
pBInfo
=
&
pInfo
->
binfo
;
SStreamAggSupporter
*
pAggSup
=
&
pInfo
->
streamAggSup
;
SStreamAggSupporter
*
pAggSup
=
&
pInfo
->
streamAggSup
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
qDebug
(
"===stream=== stream session agg"
);
qDebug
(
"===stream=== stream session agg"
);
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
return
NULL
;
}
else
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
}
else
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildDeleteDataBlock
(
pOperator
,
pInfo
->
pStDeleted
,
pInfo
->
pDelRes
,
&
pInfo
->
pDelIterator
);
doBuildDeleteDataBlock
(
pOperator
,
pInfo
->
pStDeleted
,
pInfo
->
pDelRes
,
&
pInfo
->
pDelIterator
);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pDelRes
,
IS_FINAL_OP
(
pInfo
)
?
"final session"
:
"single session"
);
printDataBlock
(
pInfo
->
pDelRes
,
IS_FINAL_OP
(
pInfo
)
?
"final session"
:
"single session"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pDelRes
;
return
pInfo
->
pDelRes
;
}
}
doBuildSessionResult
(
pOperator
,
pAggSup
->
pState
,
&
pInfo
->
groupResInfo
,
pBInfo
->
pRes
);
doBuildSessionResult
(
pOperator
,
pAggSup
->
pState
,
&
pInfo
->
groupResInfo
,
pBInfo
->
pRes
);
if
(
pBInfo
->
pRes
->
info
.
rows
>
0
)
{
if
(
pBInfo
->
pRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pBInfo
->
pRes
,
IS_FINAL_OP
(
pInfo
)
?
"final session"
:
"single session"
);
printDataBlock
(
pBInfo
->
pRes
,
IS_FINAL_OP
(
pInfo
)
?
"final session"
:
"single session"
,
GET_TASKID
(
pTaskInfo
)
);
return
pBInfo
->
pRes
;
return
pBInfo
->
pRes
;
}
}
...
@@ -3610,7 +3611,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
...
@@ -3610,7 +3611,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if
(
pBlock
==
NULL
)
{
if
(
pBlock
==
NULL
)
{
break
;
break
;
}
}
printDataBlock
(
pBlock
,
IS_FINAL_OP
(
pInfo
)
?
"final session recv"
:
"single session recv"
);
printDataBlock
(
pBlock
,
IS_FINAL_OP
(
pInfo
)
?
"final session recv"
:
"single session recv"
,
GET_TASKID
(
pTaskInfo
)
);
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
||
pBlock
->
info
.
type
==
STREAM_DELETE_RESULT
||
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
||
pBlock
->
info
.
type
==
STREAM_DELETE_RESULT
||
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
...
@@ -3688,13 +3689,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
...
@@ -3688,13 +3689,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
doBuildDeleteDataBlock
(
pOperator
,
pInfo
->
pStDeleted
,
pInfo
->
pDelRes
,
&
pInfo
->
pDelIterator
);
doBuildDeleteDataBlock
(
pOperator
,
pInfo
->
pStDeleted
,
pInfo
->
pDelRes
,
&
pInfo
->
pDelIterator
);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pDelRes
,
IS_FINAL_OP
(
pInfo
)
?
"final session"
:
"single session"
);
printDataBlock
(
pInfo
->
pDelRes
,
IS_FINAL_OP
(
pInfo
)
?
"final session"
:
"single session"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pDelRes
;
return
pInfo
->
pDelRes
;
}
}
doBuildSessionResult
(
pOperator
,
pAggSup
->
pState
,
&
pInfo
->
groupResInfo
,
pBInfo
->
pRes
);
doBuildSessionResult
(
pOperator
,
pAggSup
->
pState
,
&
pInfo
->
groupResInfo
,
pBInfo
->
pRes
);
if
(
pBInfo
->
pRes
->
info
.
rows
>
0
)
{
if
(
pBInfo
->
pRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pBInfo
->
pRes
,
IS_FINAL_OP
(
pInfo
)
?
"final session"
:
"single session"
);
printDataBlock
(
pBInfo
->
pRes
,
IS_FINAL_OP
(
pInfo
)
?
"final session"
:
"single session"
,
GET_TASKID
(
pTaskInfo
)
);
return
pBInfo
->
pRes
;
return
pBInfo
->
pRes
;
}
}
...
@@ -3868,6 +3869,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
...
@@ -3868,6 +3869,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
TSKEY
maxTs
=
INT64_MIN
;
TSKEY
maxTs
=
INT64_MIN
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SStreamAggSupporter
*
pAggSup
=
&
pInfo
->
streamAggSup
;
SStreamAggSupporter
*
pAggSup
=
&
pInfo
->
streamAggSup
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
qDebug
(
"===stream=== stream session semi agg"
);
qDebug
(
"===stream=== stream session semi agg"
);
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
...
@@ -3877,13 +3879,13 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
...
@@ -3877,13 +3879,13 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
{
{
doBuildSessionResult
(
pOperator
,
pAggSup
->
pState
,
&
pInfo
->
groupResInfo
,
pBInfo
->
pRes
);
doBuildSessionResult
(
pOperator
,
pAggSup
->
pState
,
&
pInfo
->
groupResInfo
,
pBInfo
->
pRes
);
if
(
pBInfo
->
pRes
->
info
.
rows
>
0
)
{
if
(
pBInfo
->
pRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pBInfo
->
pRes
,
"semi session"
);
printDataBlock
(
pBInfo
->
pRes
,
"semi session"
,
GET_TASKID
(
pTaskInfo
)
);
return
pBInfo
->
pRes
;
return
pBInfo
->
pRes
;
}
}
doBuildDeleteDataBlock
(
pOperator
,
pInfo
->
pStDeleted
,
pInfo
->
pDelRes
,
&
pInfo
->
pDelIterator
);
doBuildDeleteDataBlock
(
pOperator
,
pInfo
->
pStDeleted
,
pInfo
->
pDelRes
,
&
pInfo
->
pDelIterator
);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pDelRes
,
"semi session delete"
);
printDataBlock
(
pInfo
->
pDelRes
,
"semi session delete"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pDelRes
;
return
pInfo
->
pDelRes
;
}
}
...
@@ -3911,7 +3913,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
...
@@ -3911,7 +3913,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
pOperator
->
status
=
OP_RES_TO_RETURN
;
pOperator
->
status
=
OP_RES_TO_RETURN
;
break
;
break
;
}
}
printDataBlock
(
pBlock
,
"semi session recv"
);
printDataBlock
(
pBlock
,
"semi session recv"
,
GET_TASKID
(
pTaskInfo
)
);
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
||
pBlock
->
info
.
type
==
STREAM_DELETE_RESULT
||
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
||
pBlock
->
info
.
type
==
STREAM_DELETE_RESULT
||
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
...
@@ -3960,13 +3962,13 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
...
@@ -3960,13 +3962,13 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
doBuildSessionResult
(
pOperator
,
pAggSup
->
pState
,
&
pInfo
->
groupResInfo
,
pBInfo
->
pRes
);
doBuildSessionResult
(
pOperator
,
pAggSup
->
pState
,
&
pInfo
->
groupResInfo
,
pBInfo
->
pRes
);
if
(
pBInfo
->
pRes
->
info
.
rows
>
0
)
{
if
(
pBInfo
->
pRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pBInfo
->
pRes
,
"semi session"
);
printDataBlock
(
pBInfo
->
pRes
,
"semi session"
,
GET_TASKID
(
pTaskInfo
)
);
return
pBInfo
->
pRes
;
return
pBInfo
->
pRes
;
}
}
doBuildDeleteDataBlock
(
pOperator
,
pInfo
->
pStDeleted
,
pInfo
->
pDelRes
,
&
pInfo
->
pDelIterator
);
doBuildDeleteDataBlock
(
pOperator
,
pInfo
->
pStDeleted
,
pInfo
->
pDelRes
,
&
pInfo
->
pDelIterator
);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pDelRes
,
"semi session delete"
);
printDataBlock
(
pInfo
->
pDelRes
,
"semi session delete"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pDelRes
;
return
pInfo
->
pDelRes
;
}
}
...
@@ -4259,17 +4261,18 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
...
@@ -4259,17 +4261,18 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SStreamStateAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
SStreamStateAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
SOptrBasicInfo
*
pBInfo
=
&
pInfo
->
binfo
;
SOptrBasicInfo
*
pBInfo
=
&
pInfo
->
binfo
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
qDebug
(
"===stream=== stream state agg"
);
qDebug
(
"===stream=== stream state agg"
);
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildDeleteDataBlock
(
pOperator
,
pInfo
->
pSeDeleted
,
pInfo
->
pDelRes
,
&
pInfo
->
pDelIterator
);
doBuildDeleteDataBlock
(
pOperator
,
pInfo
->
pSeDeleted
,
pInfo
->
pDelRes
,
&
pInfo
->
pDelIterator
);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pDelRes
,
"single state delete"
);
printDataBlock
(
pInfo
->
pDelRes
,
"single state delete"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pDelRes
;
return
pInfo
->
pDelRes
;
}
}
doBuildSessionResult
(
pOperator
,
pInfo
->
streamAggSup
.
pState
,
&
pInfo
->
groupResInfo
,
pBInfo
->
pRes
);
doBuildSessionResult
(
pOperator
,
pInfo
->
streamAggSup
.
pState
,
&
pInfo
->
groupResInfo
,
pBInfo
->
pRes
);
if
(
pBInfo
->
pRes
->
info
.
rows
>
0
)
{
if
(
pBInfo
->
pRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pBInfo
->
pRes
,
"single state"
);
printDataBlock
(
pBInfo
->
pRes
,
"single state"
,
GET_TASKID
(
pTaskInfo
)
);
return
pBInfo
->
pRes
;
return
pBInfo
->
pRes
;
}
}
...
@@ -4290,7 +4293,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
...
@@ -4290,7 +4293,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
if
(
pBlock
==
NULL
)
{
if
(
pBlock
==
NULL
)
{
break
;
break
;
}
}
printDataBlock
(
pBlock
,
"single state recv"
);
printDataBlock
(
pBlock
,
"single state recv"
,
GET_TASKID
(
pTaskInfo
)
);
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
||
pBlock
->
info
.
type
==
STREAM_DELETE_RESULT
||
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
||
pBlock
->
info
.
type
==
STREAM_DELETE_RESULT
||
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
...
@@ -4343,13 +4346,13 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
...
@@ -4343,13 +4346,13 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
doBuildDeleteDataBlock
(
pOperator
,
pInfo
->
pSeDeleted
,
pInfo
->
pDelRes
,
&
pInfo
->
pDelIterator
);
doBuildDeleteDataBlock
(
pOperator
,
pInfo
->
pSeDeleted
,
pInfo
->
pDelRes
,
&
pInfo
->
pDelIterator
);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pDelRes
,
"single state delete"
);
printDataBlock
(
pInfo
->
pDelRes
,
"single state delete"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pDelRes
;
return
pInfo
->
pDelRes
;
}
}
doBuildSessionResult
(
pOperator
,
pInfo
->
streamAggSup
.
pState
,
&
pInfo
->
groupResInfo
,
pBInfo
->
pRes
);
doBuildSessionResult
(
pOperator
,
pInfo
->
streamAggSup
.
pState
,
&
pInfo
->
groupResInfo
,
pBInfo
->
pRes
);
if
(
pBInfo
->
pRes
->
info
.
rows
>
0
)
{
if
(
pBInfo
->
pRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pBInfo
->
pRes
,
"single state"
);
printDataBlock
(
pBInfo
->
pRes
,
"single state"
,
GET_TASKID
(
pTaskInfo
)
);
return
pBInfo
->
pRes
;
return
pBInfo
->
pRes
;
}
}
setOperatorCompleted
(
pOperator
);
setOperatorCompleted
(
pOperator
);
...
@@ -5129,13 +5132,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -5129,13 +5132,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildDeleteResult
(
pInfo
,
pInfo
->
pDelWins
,
&
pInfo
->
delIndex
,
pInfo
->
pDelRes
);
doBuildDeleteResult
(
pInfo
,
pInfo
->
pDelWins
,
&
pInfo
->
delIndex
,
pInfo
->
pDelRes
);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pDelRes
,
"single interval delete"
);
printDataBlock
(
pInfo
->
pDelRes
,
"single interval delete"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pDelRes
;
return
pInfo
->
pDelRes
;
}
}
doBuildStreamIntervalResult
(
pOperator
,
pInfo
->
pState
,
pInfo
->
binfo
.
pRes
,
&
pInfo
->
groupResInfo
);
doBuildStreamIntervalResult
(
pOperator
,
pInfo
->
pState
,
pInfo
->
binfo
.
pRes
,
&
pInfo
->
groupResInfo
);
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
>
0
)
{
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
binfo
.
pRes
,
"single interval"
);
printDataBlock
(
pInfo
->
binfo
.
pRes
,
"single interval"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
binfo
.
pRes
;
return
pInfo
->
binfo
.
pRes
;
}
}
...
@@ -5175,7 +5178,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -5175,7 +5178,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
}
}
pInfo
->
numOfDatapack
++
;
pInfo
->
numOfDatapack
++
;
printDataBlock
(
pBlock
,
"single interval recv"
);
printDataBlock
(
pBlock
,
"single interval recv"
,
GET_TASKID
(
pTaskInfo
)
);
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
||
pBlock
->
info
.
type
==
STREAM_DELETE_RESULT
||
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
||
pBlock
->
info
.
type
==
STREAM_DELETE_RESULT
||
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
...
@@ -5187,7 +5190,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -5187,7 +5190,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
getAllIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
pInfo
->
pUpdatedMap
);
getAllIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
pInfo
->
pUpdatedMap
);
continue
;
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CREATE_CHILD_TABLE
)
{
}
else
if
(
pBlock
->
info
.
type
==
STREAM_CREATE_CHILD_TABLE
)
{
printDataBlock
(
pBlock
,
"single interval"
);
printDataBlock
(
pBlock
,
"single interval"
,
GET_TASKID
(
pTaskInfo
)
);
return
pBlock
;
return
pBlock
;
}
else
{
}
else
{
ASSERTS
(
pBlock
->
info
.
type
==
STREAM_NORMAL
||
pBlock
->
info
.
type
==
STREAM_INVALID
,
"invalid SSDataBlock type"
);
ASSERTS
(
pBlock
->
info
.
type
==
STREAM_NORMAL
||
pBlock
->
info
.
type
==
STREAM_INVALID
,
"invalid SSDataBlock type"
);
...
@@ -5241,13 +5244,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -5241,13 +5244,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
doBuildDeleteResult
(
pInfo
,
pInfo
->
pDelWins
,
&
pInfo
->
delIndex
,
pInfo
->
pDelRes
);
doBuildDeleteResult
(
pInfo
,
pInfo
->
pDelWins
,
&
pInfo
->
delIndex
,
pInfo
->
pDelRes
);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pDelRes
,
"single interval delete"
);
printDataBlock
(
pInfo
->
pDelRes
,
"single interval delete"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pDelRes
;
return
pInfo
->
pDelRes
;
}
}
doBuildStreamIntervalResult
(
pOperator
,
pInfo
->
pState
,
pInfo
->
binfo
.
pRes
,
&
pInfo
->
groupResInfo
);
doBuildStreamIntervalResult
(
pOperator
,
pInfo
->
pState
,
pInfo
->
binfo
.
pRes
,
&
pInfo
->
groupResInfo
);
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
>
0
)
{
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
binfo
.
pRes
,
"single interval"
);
printDataBlock
(
pInfo
->
binfo
.
pRes
,
"single interval"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
binfo
.
pRes
;
return
pInfo
->
binfo
.
pRes
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录