Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
519c57cc
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
519c57cc
编写于
8月 17, 2023
作者:
H
Haojun Liao
提交者:
GitHub
8月 17, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #22423 from taosdata/feat/ly_refactor
stream operator refactoring
上级
9cbbbc6d
5ce7688a
变更
16
展开全部
隐藏空白更改
内联
并排
Showing
16 changed file
with
3348 addition
and
3387 deletion
+3348
-3387
include/common/tdatablock.h
include/common/tdatablock.h
+1
-1
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+4
-4
source/dnode/vnode/src/tq/tqSink.c
source/dnode/vnode/src/tq/tqSink.c
+1
-1
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+7
-3
source/libs/executor/inc/executorInt.h
source/libs/executor/inc/executorInt.h
+7
-2
source/libs/executor/src/eventwindowoperator.c
source/libs/executor/src/eventwindowoperator.c
+1
-11
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+58
-3
source/libs/executor/src/executorInt.c
source/libs/executor/src/executorInt.c
+12
-0
source/libs/executor/src/filloperator.c
source/libs/executor/src/filloperator.c
+6
-6
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+5
-4
source/libs/executor/src/projectoperator.c
source/libs/executor/src/projectoperator.c
+4
-0
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+12
-46
source/libs/executor/src/streamtimewindowoperator.c
source/libs/executor/src/streamtimewindowoperator.c
+3195
-0
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+31
-3302
source/libs/stream/src/streamExec.c
source/libs/stream/src/streamExec.c
+2
-2
source/libs/stream/src/streamQueue.c
source/libs/stream/src/streamQueue.c
+2
-2
未找到文件。
include/common/tdatablock.h
浏览文件 @
519c57cc
...
...
@@ -241,7 +241,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols);
const
char
*
blockDecode
(
SSDataBlock
*
pBlock
,
const
char
*
pData
);
// for debug
char
*
dumpBlockData
(
SSDataBlock
*
pDataBlock
,
const
char
*
flag
,
char
**
dumpBuf
);
char
*
dumpBlockData
(
SSDataBlock
*
pDataBlock
,
const
char
*
flag
,
char
**
dumpBuf
,
const
char
*
taskIdStr
);
int32_t
buildSubmitReqFromDataBlock
(
SSubmitReq2
**
pReq
,
const
SSDataBlock
*
pDataBlocks
,
const
STSchema
*
pTSchema
,
int64_t
uid
,
int32_t
vgId
,
tb_uid_t
suid
);
...
...
source/common/src/tdatablock.c
浏览文件 @
519c57cc
...
...
@@ -1771,7 +1771,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
}
// for debug
char
*
dumpBlockData
(
SSDataBlock
*
pDataBlock
,
const
char
*
flag
,
char
**
pDataBuf
)
{
char
*
dumpBlockData
(
SSDataBlock
*
pDataBlock
,
const
char
*
flag
,
char
**
pDataBuf
,
const
char
*
taskIdStr
)
{
int32_t
size
=
2048
*
1024
;
*
pDataBuf
=
taosMemoryCalloc
(
size
,
1
);
char
*
dumpBuf
=
*
pDataBuf
;
...
...
@@ -1780,9 +1780,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t
rows
=
pDataBlock
->
info
.
rows
;
int32_t
len
=
0
;
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
"
===stream===%s|block type %d|child id %d|group id:%"
PRIu64
"|uid:%"
PRId64
"|rows
:%"
PRId64
"|version:%"
PRIu64
"|cal start:%"
PRIu64
"|cal end:%"
PRIu64
"|tbl:%s
\n
"
,
flag
,
(
int32_t
)
pDataBlock
->
info
.
type
,
pDataBlock
->
info
.
childId
,
pDataBlock
->
info
.
id
.
groupId
,
"
%s===stream===%s|block type %d|child id %d|group id:%"
PRIu64
"|uid
:%"
PRId64
"|
rows:%"
PRId64
"|
version:%"
PRIu64
"|cal start:%"
PRIu64
"|cal end:%"
PRIu64
"|tbl:%s
\n
"
,
taskIdStr
,
flag
,
(
int32_t
)
pDataBlock
->
info
.
type
,
pDataBlock
->
info
.
childId
,
pDataBlock
->
info
.
id
.
groupId
,
pDataBlock
->
info
.
id
.
uid
,
pDataBlock
->
info
.
rows
,
pDataBlock
->
info
.
version
,
pDataBlock
->
info
.
calWin
.
skey
,
pDataBlock
->
info
.
calWin
.
ekey
,
pDataBlock
->
info
.
parTbName
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
...
...
source/dnode/vnode/src/tq/tqSink.c
浏览文件 @
519c57cc
...
...
@@ -412,7 +412,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
if
(
k
==
0
)
{
SColumnInfoData
*
pColData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
dataIndex
);
void
*
colData
=
colDataGetData
(
pColData
,
j
);
tq
Trace
(
"tq sink pipe2, row %d, col %d ts %"
PRId64
,
j
,
k
,
*
(
int64_t
*
)
colData
);
tq
Debug
(
"tq sink pipe2, row %d, col %d ts %"
PRId64
,
j
,
k
,
*
(
int64_t
*
)
colData
);
}
if
(
IS_SET_NULL
(
pCol
))
{
SColVal
cv
=
COL_VAL_NULL
(
pCol
->
colId
,
pCol
->
type
);
...
...
source/libs/executor/inc/executil.h
浏览文件 @
519c57cc
...
...
@@ -183,13 +183,17 @@ void cleanupQueryTableDataCond(SQueryTableDataCond* pCond);
int32_t
convertFillType
(
int32_t
mode
);
int32_t
resultrowComparAsc
(
const
void
*
p1
,
const
void
*
p2
);
int32_t
isQualifiedTable
(
STableKeyInfo
*
info
,
SNode
*
pTagCond
,
void
*
metaHandle
,
bool
*
pQualified
,
SStorageAPI
*
pAPI
);
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
);
int32_t
isQualifiedTable
(
STableKeyInfo
*
info
,
SNode
*
pTagCond
,
void
*
metaHandle
,
bool
*
pQualified
,
SStorageAPI
*
pAPI
);
char
*
getStreamOpName
(
uint16_t
opType
);
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
,
const
char
*
taskIdStr
);
void
printSpecDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
,
const
char
*
opStr
,
const
char
*
taskIdStr
);
void
getNextTimeWindow
(
const
SInterval
*
pInterval
,
STimeWindow
*
tw
,
int32_t
order
);
void
getInitialStartTimeWindow
(
SInterval
*
pInterval
,
TSKEY
ts
,
STimeWindow
*
w
,
bool
ascQuery
);
TSKEY
getStartTsKey
(
STimeWindow
*
win
,
const
TSKEY
*
tsCols
);
void
updateTimeWindowInfo
(
SColumnInfoData
*
pColData
,
STimeWindow
*
pWin
,
int64_t
delta
);
SSDataBlock
*
createTagValBlockForFilter
(
SArray
*
pColList
,
int32_t
numOfTables
,
SArray
*
pUidTagList
,
void
*
pVnode
,
SStorageAPI
*
pStorageAPI
);
#endif // TDENGINE_EXECUTIL_H
source/libs/executor/inc/executorInt.h
浏览文件 @
519c57cc
...
...
@@ -470,7 +470,6 @@ typedef struct SStreamIntervalOperatorInfo {
SArray
*
pPullWins
;
// SPullWindowInfo
int32_t
pullIndex
;
SSDataBlock
*
pPullDataRes
;
bool
isFinal
;
SArray
*
pChildren
;
int32_t
numOfChild
;
SStreamState
*
pState
;
// void
...
...
@@ -521,7 +520,6 @@ typedef struct SStreamSessionAggOperatorInfo {
void
*
pDelIterator
;
SArray
*
pChildren
;
// cache for children's result; final stream operator
SPhysiNode
*
pPhyNode
;
// create new child
bool
isFinal
;
bool
ignoreExpiredData
;
bool
ignoreExpiredDataSaved
;
SArray
*
pUpdated
;
...
...
@@ -709,6 +707,13 @@ uint64_t calcGroupId(char* pData, int32_t len);
void
streamOpReleaseState
(
struct
SOperatorInfo
*
pOperator
);
void
streamOpReloadState
(
struct
SOperatorInfo
*
pOperator
);
bool
inSlidingWindow
(
SInterval
*
pInterval
,
STimeWindow
*
pWin
,
SDataBlockInfo
*
pBlockInfo
);
bool
inCalSlidingWindow
(
SInterval
*
pInterval
,
STimeWindow
*
pWin
,
TSKEY
calStart
,
TSKEY
calEnd
,
EStreamType
blockType
);
bool
compareVal
(
const
char
*
v
,
const
SStateKeys
*
pKey
);
int32_t
getNextQualifiedWindow
(
SInterval
*
pInterval
,
STimeWindow
*
pNext
,
SDataBlockInfo
*
pDataBlockInfo
,
TSKEY
*
primaryKeys
,
int32_t
prevPosition
,
int32_t
order
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/executor/src/eventwindowoperator.c
浏览文件 @
519c57cc
...
...
@@ -58,16 +58,6 @@ static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
pRowSup
->
groupId
=
groupId
;
}
static
void
updateTimeWindowInfo
(
SColumnInfoData
*
pColData
,
STimeWindow
*
pWin
,
bool
includeEndpoint
)
{
int64_t
*
ts
=
(
int64_t
*
)
pColData
->
pData
;
int32_t
delta
=
includeEndpoint
?
1
:
0
;
int64_t
duration
=
pWin
->
ekey
-
pWin
->
skey
+
delta
;
ts
[
2
]
=
duration
;
// set the duration
ts
[
3
]
=
pWin
->
skey
;
// window start key
ts
[
4
]
=
pWin
->
ekey
+
delta
;
// window end key
}
SOperatorInfo
*
createEventwindowOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
physiNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SEventWindowOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SEventWindowOperatorInfo
));
...
...
@@ -250,7 +240,7 @@ static void doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSu
T_LONG_JMP
(
pTaskInfo
->
env
,
TSDB_CODE_APP_ERROR
);
}
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
pRowSup
->
win
,
false
);
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
pRowSup
->
win
,
0
);
applyAggFunctionOnPartialTuples
(
pTaskInfo
,
pSup
->
pCtx
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startIndex
,
numOfRows
,
pBlock
->
info
.
rows
,
numOfOutput
);
}
...
...
source/libs/executor/src/executil.c
浏览文件 @
519c57cc
...
...
@@ -2177,12 +2177,67 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
return
TSDB_CODE_SUCCESS
;
}
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
)
{
char
*
getStreamOpName
(
uint16_t
opType
)
{
switch
(
opType
)
{
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
:
return
"stream scan"
;
case
QUERY_NODE_PHYSICAL_PLAN_PROJECT
:
return
"project"
;
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
:
return
"interval single"
;
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
:
return
"interval final"
;
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
:
return
"interval semi"
;
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL
:
return
"stream fill"
;
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION
:
return
"session single"
;
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION
:
return
"session semi"
;
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION
:
return
"session final"
;
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE
:
return
"state single"
;
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION
:
return
"stream partitionby"
;
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT
:
return
"stream event"
;
}
return
""
;
}
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
,
const
char
*
taskIdStr
)
{
if
(
!
pBlock
||
pBlock
->
info
.
rows
==
0
)
{
qDebug
(
"
===stream===%s: Block is Null or Empty"
,
flag
);
qDebug
(
"
%s===stream===%s: Block is Null or Empty"
,
taskIdStr
,
flag
);
return
;
}
char
*
pBuf
=
NULL
;
qDebug
(
"%s"
,
dumpBlockData
(
pBlock
,
flag
,
&
pBuf
));
qDebug
(
"%s"
,
dumpBlockData
(
pBlock
,
flag
,
&
pBuf
,
taskIdStr
));
taosMemoryFree
(
pBuf
);
}
void
printSpecDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
,
const
char
*
opStr
,
const
char
*
taskIdStr
)
{
if
(
!
pBlock
||
pBlock
->
info
.
rows
==
0
)
{
qDebug
(
"%s===stream===%s: Block is Null or Empty"
,
taskIdStr
,
flag
);
return
;
}
if
(
qDebugFlag
&
DEBUG_DEBUG
)
{
char
*
pBuf
=
NULL
;
char
flagBuf
[
64
];
snprintf
(
flagBuf
,
sizeof
(
flagBuf
),
"%s %s"
,
flag
,
opStr
);
qDebug
(
"%s"
,
dumpBlockData
(
pBlock
,
flagBuf
,
&
pBuf
,
taskIdStr
));
taosMemoryFree
(
pBuf
);
}
}
TSKEY
getStartTsKey
(
STimeWindow
*
win
,
const
TSKEY
*
tsCols
)
{
return
tsCols
==
NULL
?
win
->
skey
:
tsCols
[
0
];
}
void
updateTimeWindowInfo
(
SColumnInfoData
*
pColData
,
STimeWindow
*
pWin
,
int64_t
delta
)
{
int64_t
*
ts
=
(
int64_t
*
)
pColData
->
pData
;
int64_t
duration
=
pWin
->
ekey
-
pWin
->
skey
+
delta
;
ts
[
2
]
=
duration
;
// set the duration
ts
[
3
]
=
pWin
->
skey
;
// window start key
ts
[
4
]
=
pWin
->
ekey
+
delta
;
// window end key
}
source/libs/executor/src/executorInt.c
浏览文件 @
519c57cc
...
...
@@ -1070,3 +1070,15 @@ void streamOpReloadState(SOperatorInfo* pOperator) {
downstream
->
fpSet
.
reloadStreamStateFn
(
downstream
);
}
}
bool
compareVal
(
const
char
*
v
,
const
SStateKeys
*
pKey
)
{
if
(
IS_VAR_DATA_TYPE
(
pKey
->
type
))
{
if
(
varDataLen
(
v
)
!=
varDataLen
(
pKey
->
pData
))
{
return
false
;
}
else
{
return
memcmp
(
varDataVal
(
v
),
varDataVal
(
pKey
->
pData
),
varDataLen
(
v
))
==
0
;
}
}
else
{
return
memcmp
(
pKey
->
pData
,
v
,
pKey
->
bytes
)
==
0
;
}
}
source/libs/executor/src/filloperator.c
浏览文件 @
519c57cc
...
...
@@ -1292,14 +1292,14 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
(
pInfo
->
pFillInfo
->
pos
!=
FILL_POS_INVALID
&&
pInfo
->
pFillInfo
->
needFill
==
true
))
{
doStreamFillRange
(
pInfo
->
pFillInfo
,
pInfo
->
pFillSup
,
pInfo
->
pRes
);
if
(
pInfo
->
pRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pRes
,
"stream fill"
);
printDataBlock
(
pInfo
->
pRes
,
getStreamOpName
(
pOperator
->
operatorType
),
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pRes
;
}
}
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doDeleteFillFinalize
(
pOperator
);
if
(
pInfo
->
pRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pRes
,
"stream fill"
);
printDataBlock
(
pInfo
->
pRes
,
getStreamOpName
(
pOperator
->
operatorType
),
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pRes
;
}
setOperatorCompleted
(
pOperator
);
...
...
@@ -1317,12 +1317,12 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
pOperator
->
status
=
OP_RES_TO_RETURN
;
pInfo
->
pFillInfo
->
preRowKey
=
INT64_MIN
;
if
(
pInfo
->
pRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pRes
,
"stream fill"
);
printDataBlock
(
pInfo
->
pRes
,
getStreamOpName
(
pOperator
->
operatorType
),
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pRes
;
}
break
;
}
print
DataBlock
(
pBlock
,
"stream fill recv"
);
print
SpecDataBlock
(
pBlock
,
getStreamOpName
(
pOperator
->
operatorType
),
"recv"
,
GET_TASKID
(
pTaskInfo
)
);
if
(
pInfo
->
pFillInfo
->
curGroupId
!=
pBlock
->
info
.
id
.
groupId
)
{
pInfo
->
pFillInfo
->
curGroupId
=
pBlock
->
info
.
id
.
groupId
;
...
...
@@ -1339,7 +1339,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
pInfo
->
pFillSup
->
hasDelete
=
true
;
doDeleteFillResult
(
pOperator
);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pDelRes
,
"stream fill delete"
);
printDataBlock
(
pInfo
->
pDelRes
,
getStreamOpName
(
pOperator
->
operatorType
),
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pDelRes
;
}
continue
;
...
...
@@ -1378,7 +1378,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
}
pOperator
->
resultInfo
.
totalRows
+=
pInfo
->
pRes
->
info
.
rows
;
printDataBlock
(
pInfo
->
pRes
,
"stream fill"
);
printDataBlock
(
pInfo
->
pRes
,
getStreamOpName
(
pOperator
->
operatorType
),
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pRes
;
}
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
519c57cc
...
...
@@ -956,7 +956,8 @@ static bool hasRemainPartion(SStreamPartitionOperatorInfo* pInfo) { return pInfo
static
bool
hasRemainTbName
(
SStreamPartitionOperatorInfo
*
pInfo
)
{
return
pInfo
->
pTbNameIte
!=
NULL
;
}
static
SSDataBlock
*
buildStreamPartitionResult
(
SOperatorInfo
*
pOperator
)
{
SStorageAPI
*
pAPI
=
&
pOperator
->
pTaskInfo
->
storageAPI
;
SStorageAPI
*
pAPI
=
&
pOperator
->
pTaskInfo
->
storageAPI
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SStreamPartitionOperatorInfo
*
pInfo
=
pOperator
->
info
;
SSDataBlock
*
pDest
=
pInfo
->
binfo
.
pRes
;
...
...
@@ -994,7 +995,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
pOperator
->
resultInfo
.
totalRows
+=
pDest
->
info
.
rows
;
pInfo
->
parIte
=
taosHashIterate
(
pInfo
->
pPartitions
,
pInfo
->
parIte
);
ASSERT
(
pDest
->
info
.
rows
>
0
);
printDataBlock
(
pDest
,
"stream partitionby"
);
printDataBlock
(
pDest
,
getStreamOpName
(
pOperator
->
operatorType
),
GET_TASKID
(
pTaskInfo
)
);
return
pDest
;
}
...
...
@@ -1115,7 +1116,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
setOperatorCompleted
(
pOperator
);
return
NULL
;
}
print
DataBlock
(
pBlock
,
"stream partitionby recv"
);
print
SpecDataBlock
(
pBlock
,
getStreamOpName
(
pOperator
->
operatorType
),
"recv"
,
GET_TASKID
(
pTaskInfo
)
);
switch
(
pBlock
->
info
.
type
)
{
case
STREAM_NORMAL
:
case
STREAM_PULL_DATA
:
...
...
@@ -1125,7 +1126,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
case
STREAM_DELETE_DATA
:
{
copyDataBlock
(
pInfo
->
pDelRes
,
pBlock
);
pInfo
->
pDelRes
->
info
.
type
=
STREAM_DELETE_RESULT
;
printDataBlock
(
pInfo
->
pDelRes
,
"stream partitionby delete"
);
printDataBlock
(
pInfo
->
pDelRes
,
getStreamOpName
(
pOperator
->
operatorType
),
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pDelRes
;
}
break
;
default:
...
...
source/libs/executor/src/projectoperator.c
浏览文件 @
519c57cc
...
...
@@ -372,6 +372,10 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pOperator
->
cost
.
openCost
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
}
if
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
)
{
printDataBlock
(
p
,
getStreamOpName
(
pOperator
->
operatorType
),
GET_TASKID
(
pTaskInfo
));
}
return
(
p
->
info
.
rows
>
0
)
?
p
:
NULL
;
}
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
519c57cc
...
...
@@ -1343,7 +1343,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
if
(
rows
==
0
)
{
return
TSDB_CODE_SUCCESS
;
}
SExecTaskInfo
*
pTaskInfo
=
pInfo
->
pStreamScanOp
->
pTaskInfo
;
SColumnInfoData
*
pSrcStartTsCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pSrcEndTsCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pSrcUidCol
=
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
UID_COLUMN_INDEX
);
...
...
@@ -1360,7 +1360,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
TSKEY
startTs
=
srcStartTsCol
[
0
];
TSKEY
endTs
=
srcEndTsCol
[
0
];
SSDataBlock
*
pPreRes
=
readPreVersionData
(
pInfo
->
pTableScanOp
,
srcUid
,
startTs
,
endTs
,
ver
);
printDataBlock
(
pPreRes
,
"pre res"
);
printDataBlock
(
pPreRes
,
"pre res"
,
GET_TASKID
(
pTaskInfo
)
);
blockDataCleanup
(
pSrcBlock
);
int32_t
code
=
blockDataEnsureCapacity
(
pSrcBlock
,
pPreRes
->
info
.
rows
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -1375,7 +1375,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
appendOneRowToStreamSpecialBlock
(
pSrcBlock
,
((
TSKEY
*
)
pTsCol
->
pData
)
+
i
,
((
TSKEY
*
)
pTsCol
->
pData
)
+
i
,
&
srcUid
,
&
groupId
,
NULL
);
}
printDataBlock
(
pSrcBlock
,
"new delete"
);
printDataBlock
(
pSrcBlock
,
"new delete"
,
GET_TASKID
(
pTaskInfo
)
);
}
uint64_t
*
srcGp
=
(
uint64_t
*
)
pSrcGpCol
->
pData
;
srcStartTsCol
=
(
TSKEY
*
)
pSrcStartTsCol
->
pData
;
...
...
@@ -1921,38 +1921,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
switch
(
pInfo
->
scanMode
)
{
case
STREAM_SCAN_FROM_RES
:
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_READERHANDLE
;
print
DataBlock
(
pInfo
->
pRecoverRes
,
"scan recover"
);
print
SpecDataBlock
(
pInfo
->
pRecoverRes
,
getStreamOpName
(
pOperator
->
operatorType
),
"recover"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pRecoverRes
;
}
break
;
// case STREAM_SCAN_FROM_UPDATERES: {
// generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
// prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
// pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
// printDataBlock(pInfo->pUpdateRes, "recover update");
// return pInfo->pUpdateRes;
// } break;
// case STREAM_SCAN_FROM_DELETE_DATA: {
// generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
// prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
// pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
// copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
// pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
// printDataBlock(pInfo->pDeleteDataRes, "recover delete");
// return pInfo->pDeleteDataRes;
// } break;
// case STREAM_SCAN_FROM_DATAREADER_RANGE: {
// SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
// if (pSDB) {
// STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
// pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
// checkUpdateData(pInfo, true, pSDB, false);
// printDataBlock(pSDB, "scan recover update");
// calBlockTbName(pInfo, pSDB);
// return pSDB;
// }
// blockDataCleanup(pInfo->pUpdateDataRes);
// pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
// } break;
default:
break
;
}
...
...
@@ -1961,22 +1932,17 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if
(
pInfo
->
pRecoverRes
!=
NULL
)
{
calBlockTbName
(
pInfo
,
pInfo
->
pRecoverRes
);
if
(
!
pInfo
->
igCheckUpdate
&&
pInfo
->
pUpdateInfo
)
{
// if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {
TSKEY
maxTs
=
pAPI
->
stateStore
.
updateInfoFillBlockData
(
pInfo
->
pUpdateInfo
,
pInfo
->
pRecoverRes
,
pInfo
->
primaryTsIndex
);
pInfo
->
twAggSup
.
maxTs
=
TMAX
(
pInfo
->
twAggSup
.
maxTs
,
maxTs
);
// } else {
// pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pStreamInfo->fillHistoryVer.maxVer);
// doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
// }
}
if
(
pInfo
->
pCreateTbRes
->
info
.
rows
>
0
)
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_RES
;
print
DataBlock
(
pInfo
->
pCreateTbRes
,
"recover createTbl"
);
print
SpecDataBlock
(
pInfo
->
pCreateTbRes
,
getStreamOpName
(
pOperator
->
operatorType
),
"recover"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pCreateTbRes
;
}
qDebug
(
"stream recover scan get block, rows %"
PRId64
,
pInfo
->
pRecoverRes
->
info
.
rows
);
print
DataBlock
(
pInfo
->
pRecoverRes
,
"scan recover"
);
print
SpecDataBlock
(
pInfo
->
pRecoverRes
,
getStreamOpName
(
pOperator
->
operatorType
),
"recover"
,
GET_TASKID
(
pTaskInfo
)
);
return
pInfo
->
pRecoverRes
;
}
pStreamInfo
->
recoverStep
=
STREAM_RECOVER_STEP__NONE
;
...
...
@@ -2032,7 +1998,7 @@ FETCH_NEXT_BLOCK:
pAPI
->
stateStore
.
updateInfoAddCloseWindowSBF
(
pInfo
->
pUpdateInfo
);
}
break
;
case
STREAM_DELETE_DATA
:
{
print
DataBlock
(
pBlock
,
"stream scan delete recv"
);
print
SpecDataBlock
(
pBlock
,
getStreamOpName
(
pOperator
->
operatorType
),
"delete recv"
,
GET_TASKID
(
pTaskInfo
)
);
SSDataBlock
*
pDelBlock
=
NULL
;
if
(
pInfo
->
tqReader
)
{
pDelBlock
=
createSpecialDataBlock
(
STREAM_DELETE_DATA
);
...
...
@@ -2043,7 +2009,7 @@ FETCH_NEXT_BLOCK:
setBlockGroupIdByUid
(
pInfo
,
pDelBlock
);
rebuildDeleteBlockData
(
pDelBlock
,
&
pStreamInfo
->
fillHistoryWindow
,
id
);
print
DataBlock
(
pDelBlock
,
"stream scan delete recv filtered"
);
print
SpecDataBlock
(
pDelBlock
,
getStreamOpName
(
pOperator
->
operatorType
),
"delete recv filtered"
,
GET_TASKID
(
pTaskInfo
)
);
if
(
pDelBlock
->
info
.
rows
==
0
)
{
if
(
pInfo
->
tqReader
)
{
blockDataDestroy
(
pDelBlock
);
...
...
@@ -2054,7 +2020,7 @@ FETCH_NEXT_BLOCK:
if
(
!
isIntervalWindow
(
pInfo
)
&&
!
isSessionWindow
(
pInfo
)
&&
!
isStateWindow
(
pInfo
))
{
generateDeleteResultBlock
(
pInfo
,
pDelBlock
,
pInfo
->
pDeleteDataRes
);
pInfo
->
pDeleteDataRes
->
info
.
type
=
STREAM_DELETE_RESULT
;
print
DataBlock
(
pDelBlock
,
"stream scan delete result"
);
print
SpecDataBlock
(
pDelBlock
,
getStreamOpName
(
pOperator
->
operatorType
),
"delete result"
,
GET_TASKID
(
pTaskInfo
)
);
blockDataDestroy
(
pDelBlock
);
if
(
pInfo
->
pDeleteDataRes
->
info
.
rows
>
0
)
{
...
...
@@ -2069,7 +2035,7 @@ FETCH_NEXT_BLOCK:
prepareRangeScan
(
pInfo
,
pInfo
->
pUpdateRes
,
&
pInfo
->
updateResIndex
);
copyDataBlock
(
pInfo
->
pDeleteDataRes
,
pInfo
->
pUpdateRes
);
pInfo
->
pDeleteDataRes
->
info
.
type
=
STREAM_DELETE_DATA
;
print
DataBlock
(
pDelBlock
,
"stream scan delete data"
);
print
SpecDataBlock
(
pDelBlock
,
getStreamOpName
(
pOperator
->
operatorType
),
"delete result"
,
GET_TASKID
(
pTaskInfo
)
);
if
(
pInfo
->
tqReader
)
{
blockDataDestroy
(
pDelBlock
);
}
...
...
@@ -2084,7 +2050,7 @@ FETCH_NEXT_BLOCK:
default:
break
;
}
// printDataBlock(pBlock, "stream scan recv"
);
printDataBlock
(
pBlock
,
getStreamOpName
(
pOperator
->
operatorType
),
GET_TASKID
(
pTaskInfo
)
);
return
pBlock
;
}
else
if
(
pInfo
->
blockType
==
STREAM_INPUT__DATA_SUBMIT
)
{
qDebug
(
"stream scan mode:%d, %s"
,
pInfo
->
scanMode
,
id
);
...
...
@@ -2120,7 +2086,7 @@ FETCH_NEXT_BLOCK:
STableScanInfo
*
pTableScanInfo
=
pInfo
->
pTableScanOp
->
info
;
pSDB
->
info
.
type
=
pInfo
->
scanMode
==
STREAM_SCAN_FROM_DATAREADER_RANGE
?
STREAM_NORMAL
:
STREAM_PULL_DATA
;
checkUpdateData
(
pInfo
,
true
,
pSDB
,
false
);
print
DataBlock
(
pSDB
,
"stream scan update"
);
print
SpecDataBlock
(
pSDB
,
getStreamOpName
(
pOperator
->
operatorType
),
"update"
,
GET_TASKID
(
pTaskInfo
)
);
calBlockTbName
(
pInfo
,
pSDB
);
return
pSDB
;
}
...
...
source/libs/executor/src/streamtimewindowoperator.c
0 → 100644
浏览文件 @
519c57cc
此差异已折叠。
点击以展开。
source/libs/executor/src/timewindowoperator.c
浏览文件 @
519c57cc
此差异已折叠。
点击以展开。
source/libs/stream/src/streamExec.c
浏览文件 @
519c57cc
...
...
@@ -450,11 +450,11 @@ static int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pI
if
(
qItem
==
NULL
)
{
if
(
pTask
->
info
.
taskLevel
==
TASK_LEVEL__SOURCE
&&
(
++
retryTimes
)
<
MAX_RETRY_TIMES
)
{
taosMsleep
(
10
);
qDebug
(
"
===stream===
try again batchSize:%d, retry:%d, %s"
,
*
numOfBlocks
,
retryTimes
,
id
);
qDebug
(
"try again batchSize:%d, retry:%d, %s"
,
*
numOfBlocks
,
retryTimes
,
id
);
continue
;
}
qDebug
(
"
===stream===
break batchSize:%d, %s"
,
*
numOfBlocks
,
id
);
qDebug
(
"break batchSize:%d, %s"
,
*
numOfBlocks
,
id
);
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/stream/src/streamQueue.c
浏览文件 @
519c57cc
...
...
@@ -130,11 +130,11 @@ SStreamQueueItem* doReadMultiBlocksFromQueue(SQueueReader* pReader, const char*
if
(
pReader
->
taskLevel
==
TASK_LEVEL__SOURCE
&&
numOfBlocks
<
MIN_STREAM_EXEC_BATCH_NUM
&&
tryCount
<
pReader
->
waitDuration
)
{
tryCount
++
;
taosMsleep
(
1
);
qDebug
(
"
===stream===
try again batchSize:%d"
,
numOfBlocks
);
qDebug
(
"try again batchSize:%d"
,
numOfBlocks
);
continue
;
}
qDebug
(
"
===stream===
break batchSize:%d"
,
numOfBlocks
);
qDebug
(
"break batchSize:%d"
,
numOfBlocks
);
break
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录