Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
60e7e2ae
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
60e7e2ae
编写于
5月 24, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh(query): add more information regarding analyze sql execution.
上级
b4f6f3ef
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
160 addition
and
239 deletion
+160
-239
include/common/tmsg.h
include/common/tmsg.h
+15
-2
include/libs/command/command.h
include/libs/command/command.h
+1
-1
source/common/src/tmsg.c
source/common/src/tmsg.c
+8
-4
source/libs/command/inc/commandInt.h
source/libs/command/inc/commandInt.h
+1
-1
source/libs/command/src/explain.c
source/libs/command/src/explain.c
+31
-2
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+13
-48
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+0
-13
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+38
-124
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+15
-10
source/libs/executor/src/joinoperator.c
source/libs/executor/src/joinoperator.c
+0
-4
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+30
-14
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+8
-16
未找到文件。
include/common/tmsg.h
浏览文件 @
60e7e2ae
...
...
@@ -1210,9 +1210,10 @@ typedef struct {
}
SRetrieveMetaTableRsp
;
typedef
struct
SExplainExecInfo
{
uint64_t
startupCost
;
uint64_t
totalCost
;
double
startupCost
;
double
totalCost
;
uint64_t
numOfRows
;
uint32_t
verboseLen
;
void
*
verboseInfo
;
}
SExplainExecInfo
;
...
...
@@ -1221,6 +1222,18 @@ typedef struct {
SExplainExecInfo
*
subplanInfo
;
}
SExplainRsp
;
typedef
struct
STableScanAnalyzeInfo
{
uint64_t
totalRows
;
uint64_t
totalCheckedRows
;
uint32_t
totalBlocks
;
uint32_t
loadBlocks
;
uint32_t
loadBlockStatis
;
uint32_t
skipBlocks
;
uint32_t
filterOutBlocks
;
double
elapsedTime
;
uint64_t
filterTime
;
}
STableScanAnalyzeInfo
;
int32_t
tSerializeSExplainRsp
(
void
*
buf
,
int32_t
bufLen
,
SExplainRsp
*
pRsp
);
int32_t
tDeserializeSExplainRsp
(
void
*
buf
,
int32_t
bufLen
,
SExplainRsp
*
pRsp
);
...
...
include/libs/command/command.h
浏览文件 @
60e7e2ae
...
...
@@ -24,7 +24,7 @@ int32_t qExecCommand(SNode* pStmt, SRetrieveTableRsp** pRsp);
int32_t
qExecStaticExplain
(
SQueryPlan
*
pDag
,
SRetrieveTableRsp
**
pRsp
);
int32_t
qExecExplainBegin
(
SQueryPlan
*
pDag
,
SExplainCtx
**
pCtx
,
int64_t
startTs
);
int32_t
qExecExplainEnd
(
SExplainCtx
*
pCtx
,
SRetrieveTableRsp
**
pRsp
);
int32_t
qExplainUpdateExecInfo
(
SExplainCtx
*
pCtx
,
SExplainRsp
*
pRspMsg
,
int32_t
groupId
,
SRetrieveTableRsp
**
pRsp
);
int32_t
qExplainUpdateExecInfo
(
SExplainCtx
*
pCtx
,
SExplainRsp
*
pRspMsg
,
int32_t
groupId
,
SRetrieveTableRsp
**
pRsp
);
void
qExplainFreeCtx
(
SExplainCtx
*
pCtx
);
source/common/src/tmsg.c
浏览文件 @
60e7e2ae
...
...
@@ -3318,9 +3318,11 @@ int32_t tSerializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) {
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
numOfPlans
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pRsp
->
numOfPlans
;
++
i
)
{
SExplainExecInfo
*
info
=
&
pRsp
->
subplanInfo
[
i
];
if
(
tEncode
U64
(
&
encoder
,
info
->
startupCost
)
<
0
)
return
-
1
;
if
(
tEncode
U64
(
&
encoder
,
info
->
totalCost
)
<
0
)
return
-
1
;
if
(
tEncode
Double
(
&
encoder
,
info
->
startupCost
)
<
0
)
return
-
1
;
if
(
tEncode
Double
(
&
encoder
,
info
->
totalCost
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
info
->
numOfRows
)
<
0
)
return
-
1
;
if
(
tEncodeU32
(
&
encoder
,
info
->
verboseLen
)
<
0
)
return
-
1
;
if
(
tEncodeBinary
(
&
encoder
,
info
->
verboseInfo
,
info
->
verboseLen
)
<
0
)
return
-
1
;
}
tEndEncode
(
&
encoder
);
...
...
@@ -3341,9 +3343,11 @@ int32_t tDeserializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) {
if
(
pRsp
->
subplanInfo
==
NULL
)
return
-
1
;
}
for
(
int32_t
i
=
0
;
i
<
pRsp
->
numOfPlans
;
++
i
)
{
if
(
tDecode
U64
(
&
decoder
,
&
pRsp
->
subplanInfo
[
i
].
startupCost
)
<
0
)
return
-
1
;
if
(
tDecode
U64
(
&
decoder
,
&
pRsp
->
subplanInfo
[
i
].
totalCost
)
<
0
)
return
-
1
;
if
(
tDecode
Double
(
&
decoder
,
&
pRsp
->
subplanInfo
[
i
].
startupCost
)
<
0
)
return
-
1
;
if
(
tDecode
Double
(
&
decoder
,
&
pRsp
->
subplanInfo
[
i
].
totalCost
)
<
0
)
return
-
1
;
if
(
tDecodeU64
(
&
decoder
,
&
pRsp
->
subplanInfo
[
i
].
numOfRows
)
<
0
)
return
-
1
;
if
(
tDecodeU32
(
&
decoder
,
&
pRsp
->
subplanInfo
[
i
].
verboseLen
)
<
0
)
return
-
1
;
if
(
tDecodeBinary
(
&
decoder
,
(
const
uint8_t
**
)
&
pRsp
->
subplanInfo
[
i
].
verboseInfo
,
&
pRsp
->
subplanInfo
[
i
].
verboseLen
)
<
0
)
return
-
1
;
}
tEndDecode
(
&
decoder
);
...
...
source/libs/command/inc/commandInt.h
浏览文件 @
60e7e2ae
...
...
@@ -60,7 +60,7 @@ extern "C" {
#define EXPLAIN_GROUPS_FORMAT "groups=%d"
#define EXPLAIN_WIDTH_FORMAT "width=%d"
#define EXPLAIN_FUNCTIONS_FORMAT "functions=%d"
#define EXPLAIN_EXECINFO_FORMAT "cost=%
" PRIu64 "..%" PRIu64 "
rows=%" PRIu64
#define EXPLAIN_EXECINFO_FORMAT "cost=%
.3f..%.3f
rows=%" PRIu64
typedef
struct
SExplainGroup
{
int32_t
nodeNum
;
...
...
source/libs/command/src/explain.c
浏览文件 @
60e7e2ae
...
...
@@ -381,6 +381,35 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
));
// basic analyze output
if
(
EXPLAIN_MODE_ANALYZE
==
ctx
->
mode
)
{
EXPLAIN_ROW_NEW
(
level
+
1
,
"I/O: "
);
int32_t
nodeNum
=
taosArrayGetSize
(
pResNode
->
pExecInfo
);
for
(
int32_t
i
=
0
;
i
<
nodeNum
;
++
i
)
{
SExplainExecInfo
*
execInfo
=
taosArrayGet
(
pResNode
->
pExecInfo
,
i
);
STableScanAnalyzeInfo
*
pScanInfo
=
(
STableScanAnalyzeInfo
*
)
execInfo
->
verboseInfo
;
EXPLAIN_ROW_APPEND
(
"total_blocks=%d"
,
pScanInfo
->
totalBlocks
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
"load_blocks=%d"
,
pScanInfo
->
loadBlocks
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
"load_block_SMAs=%d"
,
pScanInfo
->
loadBlockStatis
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
"total_rows=%"
PRIu64
,
pScanInfo
->
totalRows
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
"check_rows=%"
PRIu64
,
pScanInfo
->
totalCheckedRows
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
}
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
+
1
));
}
if
(
verbose
)
{
EXPLAIN_ROW_NEW
(
level
+
1
,
EXPLAIN_OUTPUT_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_COLUMNS_FORMAT
,
...
...
@@ -390,8 +419,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
+
1
));
EXPLAIN_ROW_NEW
(
level
+
1
,
EXPLAIN_TIMERANGE_FORMAT
,
pTblScanNode
->
scanRange
.
skey
,
pTblScanNode
->
scanRange
.
ekey
);
EXPLAIN_ROW_NEW
(
level
+
1
,
EXPLAIN_TIMERANGE_FORMAT
,
pTblScanNode
->
scanRange
.
skey
,
pTblScanNode
->
scanRange
.
ekey
);
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
+
1
));
...
...
@@ -637,6 +665,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND
(
EXPLAIN_FUNCTIONS_FORMAT
,
pIntNode
->
window
.
pFuncs
->
length
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_WIDTH_FORMAT
,
pIntNode
->
window
.
node
.
pOutputDataBlockDesc
->
totalRowSize
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_RIGHT_PARENTHESIS_FORMAT
);
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
));
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
60e7e2ae
...
...
@@ -86,43 +86,12 @@ typedef struct STableQueryInfo {
// SVariant tag;
}
STableQueryInfo
;
typedef
enum
{
QUERY_PROF_BEFORE_OPERATOR_EXEC
=
0
,
QUERY_PROF_AFTER_OPERATOR_EXEC
,
QUERY_PROF_QUERY_ABORT
}
EQueryProfEventType
;
typedef
struct
{
EQueryProfEventType
eventType
;
int64_t
eventTime
;
union
{
uint8_t
operatorType
;
// for operator event
int32_t
abortCode
;
// for query abort event
};
}
SQueryProfEvent
;
typedef
struct
{
uint8_t
operatorType
;
int64_t
sumSelfTime
;
int64_t
sumRunTimes
;
}
SOperatorProfResult
;
typedef
struct
SLimit
{
int64_t
limit
;
int64_t
offset
;
}
SLimit
;
typedef
struct
SFileBlockLoadRecorder
{
uint64_t
totalRows
;
uint64_t
totalCheckedRows
;
uint32_t
totalBlocks
;
uint32_t
loadBlocks
;
uint32_t
loadBlockStatis
;
uint32_t
skipBlocks
;
uint32_t
filterOutBlocks
;
uint64_t
elapsedTime
;
}
SFileBlockLoadRecorder
;
typedef
struct
STableScanAnalyzeInfo
SFileBlockLoadRecorder
;
typedef
struct
STaskCostInfo
{
int64_t
created
;
...
...
@@ -152,8 +121,8 @@ typedef struct STaskCostInfo {
}
STaskCostInfo
;
typedef
struct
SOperatorCostInfo
{
uint64_t
openCost
;
uint64_t
totalCost
;
double
openCost
;
double
totalCost
;
}
SOperatorCostInfo
;
// The basic query information extracted from the SQueryInfo tree to support the
...
...
@@ -200,7 +169,7 @@ typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggS
typedef
int32_t
(
*
__optr_open_fn_t
)(
struct
SOperatorInfo
*
pOptr
);
typedef
SSDataBlock
*
(
*
__optr_fn_t
)(
struct
SOperatorInfo
*
pOptr
);
typedef
void
(
*
__optr_close_fn_t
)(
void
*
param
,
int32_t
num
);
typedef
int32_t
(
*
__optr_
get_explain_fn_t
)(
struct
SOperatorInfo
*
pOptr
,
void
**
pOptrExplai
n
);
typedef
int32_t
(
*
__optr_
explain_fn_t
)(
struct
SOperatorInfo
*
pOptr
,
void
**
pOptrExplain
,
uint32_t
*
le
n
);
typedef
struct
STaskIdInfo
{
uint64_t
queryId
;
// this is also a request id
...
...
@@ -264,14 +233,14 @@ enum {
};
typedef
struct
SOperatorFpSet
{
__optr_open_fn_t
_openFn
;
// DO NOT invoke this function directly
__optr_fn_t
getNextFn
;
__optr_fn_t
getStreamResFn
;
// execute the aggregate in the stream model, todo remove it
__optr_fn_t
cleanupFn
;
// call this function to release the allocated resources ASAP
__optr_close_fn_t
closeFn
;
__optr_encode_fn_t
encodeResultRow
;
__optr_decode_fn_t
decodeResultRow
;
__optr_
get_explain_fn_t
getExplainFn
;
__optr_open_fn_t
_openFn
;
// DO NOT invoke this function directly
__optr_fn_t
getNextFn
;
__optr_fn_t
getStreamResFn
;
// execute the aggregate in the stream model, todo remove it
__optr_fn_t
cleanupFn
;
// call this function to release the allocated resources ASAP
__optr_close_fn_t
closeFn
;
__optr_encode_fn_t
encodeResultRow
;
__optr_decode_fn_t
decodeResultRow
;
__optr_
explain_fn_t
getExplainFn
;
}
SOperatorFpSet
;
typedef
struct
SOperatorInfo
{
...
...
@@ -656,7 +625,7 @@ typedef struct SJoinOperatorInfo {
SOperatorFpSet
createOperatorFpSet
(
__optr_open_fn_t
openFn
,
__optr_fn_t
nextFn
,
__optr_fn_t
streamFn
,
__optr_fn_t
cleanup
,
__optr_close_fn_t
closeFn
,
__optr_encode_fn_t
encode
,
__optr_decode_fn_t
decode
,
__optr_
get_
explain_fn_t
explain
);
__optr_decode_fn_t
decode
,
__optr_explain_fn_t
explain
);
int32_t
operatorDummyOpenFn
(
SOperatorInfo
*
pOperator
);
void
operatorDummyCloseFn
(
void
*
param
,
int32_t
numOfCols
);
...
...
@@ -775,10 +744,6 @@ bool isTaskKilled(SExecTaskInfo* pTaskInfo);
int32_t
checkForQueryBuf
(
size_t
numOfTables
);
void
setTaskKilled
(
SExecTaskInfo
*
pTaskInfo
);
void
publishOperatorProfEvent
(
SOperatorInfo
*
operatorInfo
,
EQueryProfEventType
eventType
);
void
publishQueryAbortEvent
(
SExecTaskInfo
*
pTaskInfo
,
int32_t
code
);
void
queryCostStatis
(
SExecTaskInfo
*
pTaskInfo
);
void
doDestroyTask
(
SExecTaskInfo
*
pTaskInfo
);
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
60e7e2ae
...
...
@@ -30,13 +30,6 @@
#include "tlosertree.h"
#include "ttypes.h"
typedef
struct
STaskMgmt
{
TdThreadMutex
lock
;
SCacheObj
*
qinfoPool
;
// query handle pool
int32_t
vgId
;
bool
closed
;
}
STaskMgmt
;
int32_t
qCreateExecTask
(
SReadHandle
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
SSubplan
*
pSubplan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
,
EOPTR_EXEC_MODEL
model
)
{
assert
(
readHandle
!=
NULL
&&
pSubplan
!=
NULL
);
...
...
@@ -131,7 +124,6 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
// error occurs, record the error code and return to client
int32_t
ret
=
setjmp
(
pTaskInfo
->
env
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
publishQueryAbortEvent
(
pTaskInfo
,
ret
);
pTaskInfo
->
code
=
ret
;
cleanUpUdfs
();
qDebug
(
"%s task abort due to error/cancel occurs, code:%s"
,
GET_TASKID
(
pTaskInfo
),
...
...
@@ -141,16 +133,11 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
qDebug
(
"%s execTask is launched"
,
GET_TASKID
(
pTaskInfo
));
publishOperatorProfEvent
(
pTaskInfo
->
pRoot
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
int64_t
st
=
taosGetTimestampUs
();
*
pRes
=
pTaskInfo
->
pRoot
->
fpSet
.
getNextFn
(
pTaskInfo
->
pRoot
);
uint64_t
el
=
(
taosGetTimestampUs
()
-
st
);
pTaskInfo
->
cost
.
elapsedTime
+=
el
;
publishOperatorProfEvent
(
pTaskInfo
->
pRoot
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
if
(
NULL
==
*
pRes
)
{
*
useconds
=
pTaskInfo
->
cost
.
elapsedTime
;
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
60e7e2ae
...
...
@@ -125,6 +125,8 @@ static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput)
void
doSetOperatorCompleted
(
SOperatorInfo
*
pOperator
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
pOperator
->
cost
.
totalCost
=
(
taosGetTimestampUs
()
-
pOperator
->
pTaskInfo
->
cost
.
start
*
1000
)
/
1000
.
0
;
if
(
pOperator
->
pTaskInfo
!=
NULL
)
{
setTaskStatus
(
pOperator
->
pTaskInfo
,
TASK_COMPLETED
);
}
...
...
@@ -138,7 +140,7 @@ int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
SOperatorFpSet
createOperatorFpSet
(
__optr_open_fn_t
openFn
,
__optr_fn_t
nextFn
,
__optr_fn_t
streamFn
,
__optr_fn_t
cleanup
,
__optr_close_fn_t
closeFn
,
__optr_encode_fn_t
encode
,
__optr_decode_fn_t
decode
,
__optr_
get_
explain_fn_t
explain
)
{
__optr_decode_fn_t
decode
,
__optr_explain_fn_t
explain
)
{
SOperatorFpSet
fpSet
=
{
.
_openFn
=
openFn
,
.
getNextFn
=
nextFn
,
...
...
@@ -2136,102 +2138,6 @@ int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock
return
pBlock
->
info
.
rows
;
}
void
publishOperatorProfEvent
(
SOperatorInfo
*
pOperator
,
EQueryProfEventType
eventType
)
{
SQueryProfEvent
event
=
{
0
};
event
.
eventType
=
eventType
;
event
.
eventTime
=
taosGetTimestampUs
();
event
.
operatorType
=
pOperator
->
operatorType
;
// if (pQInfo->summary.queryProfEvents) {
// taosArrayPush(pQInfo->summary.queryProfEvents, &event);
// }
}
void
publishQueryAbortEvent
(
SExecTaskInfo
*
pTaskInfo
,
int32_t
code
)
{
SQueryProfEvent
event
;
event
.
eventType
=
QUERY_PROF_QUERY_ABORT
;
event
.
eventTime
=
taosGetTimestampUs
();
event
.
abortCode
=
code
;
if
(
pTaskInfo
->
cost
.
queryProfEvents
)
{
taosArrayPush
(
pTaskInfo
->
cost
.
queryProfEvents
,
&
event
);
}
}
typedef
struct
{
uint8_t
operatorType
;
int64_t
beginTime
;
int64_t
endTime
;
int64_t
selfTime
;
int64_t
descendantsTime
;
}
SOperatorStackItem
;
static
void
doOperatorExecProfOnce
(
SOperatorStackItem
*
item
,
SQueryProfEvent
*
event
,
SArray
*
opStack
,
SHashObj
*
profResults
)
{
item
->
endTime
=
event
->
eventTime
;
item
->
selfTime
=
(
item
->
endTime
-
item
->
beginTime
)
-
(
item
->
descendantsTime
);
for
(
int32_t
j
=
0
;
j
<
taosArrayGetSize
(
opStack
);
++
j
)
{
SOperatorStackItem
*
ancestor
=
taosArrayGet
(
opStack
,
j
);
ancestor
->
descendantsTime
+=
item
->
selfTime
;
}
uint8_t
operatorType
=
item
->
operatorType
;
SOperatorProfResult
*
result
=
taosHashGet
(
profResults
,
&
operatorType
,
sizeof
(
operatorType
));
if
(
result
!=
NULL
)
{
result
->
sumRunTimes
++
;
result
->
sumSelfTime
+=
item
->
selfTime
;
}
else
{
SOperatorProfResult
opResult
;
opResult
.
operatorType
=
operatorType
;
opResult
.
sumSelfTime
=
item
->
selfTime
;
opResult
.
sumRunTimes
=
1
;
taosHashPut
(
profResults
,
&
(
operatorType
),
sizeof
(
operatorType
),
&
opResult
,
sizeof
(
opResult
));
}
}
void
calculateOperatorProfResults
(
void
)
{
// if (pQInfo->summary.queryProfEvents == NULL) {
// // qDebug("QInfo:0x%"PRIx64" query prof events array is null", pQInfo->qId);
// return;
// }
//
// if (pQInfo->summary.operatorProfResults == NULL) {
// // qDebug("QInfo:0x%"PRIx64" operator prof results hash is null", pQInfo->qId);
// return;
// }
SArray
*
opStack
=
taosArrayInit
(
32
,
sizeof
(
SOperatorStackItem
));
if
(
opStack
==
NULL
)
{
return
;
}
#if 0
size_t size = taosArrayGetSize(pQInfo->summary.queryProfEvents);
SHashObj* profResults = pQInfo->summary.operatorProfResults;
for (int i = 0; i < size; ++i) {
SQueryProfEvent* event = taosArrayGet(pQInfo->summary.queryProfEvents, i);
if (event->eventType == QUERY_PROF_BEFORE_OPERATOR_EXEC) {
SOperatorStackItem opItem;
opItem.operatorType = event->operatorType;
opItem.beginTime = event->eventTime;
opItem.descendantsTime = 0;
taosArrayPush(opStack, &opItem);
} else if (event->eventType == QUERY_PROF_AFTER_OPERATOR_EXEC) {
SOperatorStackItem* item = taosArrayPop(opStack);
assert(item->operatorType == event->operatorType);
doOperatorExecProfOnce(item, event, opStack, profResults);
} else if (event->eventType == QUERY_PROF_QUERY_ABORT) {
SOperatorStackItem* item;
while ((item = taosArrayPop(opStack)) != NULL) {
doOperatorExecProfOnce(item, event, opStack, profResults);
}
}
}
#endif
taosArrayDestroy
(
opStack
);
}
void
queryCostStatis
(
SExecTaskInfo
*
pTaskInfo
)
{
STaskCostInfo
*
pSummary
=
&
pTaskInfo
->
cost
;
...
...
@@ -2264,15 +2170,6 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
// qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb,
// hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
// pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0);
if
(
pSummary
->
operatorProfResults
)
{
SOperatorProfResult
*
opRes
=
taosHashIterate
(
pSummary
->
operatorProfResults
,
NULL
);
while
(
opRes
!=
NULL
)
{
// qDebug("QInfo:0x%" PRIx64 " :cost summary: operator : %d, exec times: %" PRId64 ", self time: %" PRId64,
// pQInfo->qId, opRes->operatorType, opRes->sumRunTimes, opRes->sumSelfTime);
opRes
=
taosHashIterate
(
pSummary
->
operatorProfResults
,
opRes
);
}
}
}
// static void updateOffsetVal(STaskRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
...
...
@@ -3523,14 +3420,13 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
SOptrBasicInfo
*
pInfo
=
&
pAggInfo
->
binfo
;
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
int64_t
st
=
taosGetTimestampUs
();
int32_t
order
=
TSDB_ORDER_ASC
;
int32_t
scanFlag
=
MAIN_SCAN
;
while
(
1
)
{
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
if
(
pBlock
==
NULL
)
{
break
;
}
...
...
@@ -3576,6 +3472,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
closeAllResultRows
(
&
pAggInfo
->
binfo
.
resultRowInfo
);
initGroupedResultInfo
(
&
pAggInfo
->
groupResInfo
,
pAggInfo
->
aggSup
.
pResultRowHashTable
,
0
);
OPTR_SET_OPENED
(
pOperator
);
pOperator
->
cost
.
openCost
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -3590,6 +3488,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
pTaskInfo
->
code
=
pOperator
->
fpSet
.
_openFn
(
pOperator
);
if
(
pTaskInfo
->
code
!=
TSDB_CODE_SUCCESS
)
{
doSetOperatorCompleted
(
pOperator
);
return
NULL
;
}
...
...
@@ -3599,7 +3498,10 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
doSetOperatorCompleted
(
pOperator
);
}
return
(
blockDataGetNumOfRows
(
pInfo
->
pRes
)
!=
0
)
?
pInfo
->
pRes
:
NULL
;
size_t
rows
=
blockDataGetNumOfRows
(
pInfo
->
pRes
);
//pInfo->pRes : NULL;
pOperator
->
resultInfo
.
totalRows
+=
rows
;
return
(
rows
==
0
)
?
NULL
:
pInfo
->
pRes
;
}
void
aggEncodeResultRow
(
SOperatorInfo
*
pOperator
,
SAggSupporter
*
pSup
,
SOptrBasicInfo
*
pInfo
,
char
**
result
,
...
...
@@ -3825,22 +3727,25 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
}
#endif
int64_t
st
=
0
;
int32_t
order
=
0
;
int32_t
scanFlag
=
0
;
if
(
pOperator
->
cost
.
openCost
==
0
)
{
st
=
taosGetTimestampUs
();
}
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
while
(
1
)
{
// The downstream exec may change the value of the newgroup, so use a local variable instead.
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
if
(
pBlock
==
NULL
)
{
setTaskStatus
(
pOperator
->
pTaskInfo
,
TASK_COMPLETED
);
doSetOperatorCompleted
(
pOperator
);
break
;
}
#if 0
// Return result of the previous group in the firstly.
if (false) {
if (pRes->info.rows > 0) {
...
...
@@ -3850,6 +3755,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfExprs);
}
}
#endif
// the pDataBlock are always the same one, no need to call this again
int32_t
code
=
getTableScanInfo
(
pOperator
->
pDownstream
[
0
],
&
order
,
&
scanFlag
);
...
...
@@ -3875,8 +3781,14 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pProjectInfo
->
curOutput
+=
pInfo
->
pRes
->
info
.
rows
;
// copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfExprs);
return
(
pInfo
->
pRes
->
info
.
rows
>
0
)
?
pInfo
->
pRes
:
NULL
;
size_t
rows
=
pInfo
->
pRes
->
info
.
rows
;
pOperator
->
resultInfo
.
totalRows
+=
rows
;
if
(
pOperator
->
cost
.
openCost
==
0
)
{
pOperator
->
cost
.
openCost
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
}
return
(
rows
>
0
)
?
pInfo
->
pRes
:
NULL
;
}
static
void
doHandleRemainBlockForNewGroupImpl
(
SFillOperatorInfo
*
pInfo
,
SResultInfo
*
pResultInfo
,
bool
*
newgroup
,
...
...
@@ -3933,10 +3845,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
SOperatorInfo
*
pDownstream
=
pOperator
->
pDownstream
[
0
];
while
(
1
)
{
publishOperatorProfEvent
(
pDownstream
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
SSDataBlock
*
pBlock
=
pDownstream
->
fpSet
.
getNextFn
(
pDownstream
);
publishOperatorProfEvent
(
pDownstream
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
if
(
*
newgroup
)
{
assert
(
pBlock
!=
NULL
);
}
...
...
@@ -5213,16 +5122,21 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
}
}
(
*
pRes
)[
*
resNum
].
numOfRows
=
operatorInfo
->
resultInfo
.
totalRows
;
(
*
pRes
)[
*
resNum
].
startupCost
=
operatorInfo
->
cost
.
openCost
;
(
*
pRes
)[
*
resNum
].
totalCost
=
operatorInfo
->
cost
.
totalCost
;
SExplainExecInfo
*
pInfo
=
&
(
*
pRes
)[
*
resNum
];
pInfo
->
numOfRows
=
operatorInfo
->
resultInfo
.
totalRows
;
pInfo
->
startupCost
=
operatorInfo
->
cost
.
openCost
;
pInfo
->
totalCost
=
operatorInfo
->
cost
.
totalCost
;
if
(
operatorInfo
->
fpSet
.
getExplainFn
)
{
int32_t
code
=
(
*
operatorInfo
->
fpSet
.
getExplainFn
)(
operatorInfo
,
&
(
*
pRes
)
->
verboseInfo
);
int32_t
code
=
operatorInfo
->
fpSet
.
getExplainFn
(
operatorInfo
,
&
pInfo
->
verboseInfo
,
&
pInfo
->
verboseLen
);
if
(
code
)
{
qError
(
"
operator getExplainFn failed, error:%s"
,
tstrerror
(
code
));
qError
(
"
%s operator getExplainFn failed, code:%s"
,
GET_TASKID
(
operatorInfo
->
pTaskInfo
)
,
tstrerror
(
code
));
return
code
;
}
}
else
{
pInfo
->
verboseLen
=
0
;
pInfo
->
verboseInfo
=
NULL
;
}
++
(
*
resNum
);
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
60e7e2ae
...
...
@@ -270,24 +270,29 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pRes
->
info
.
rows
==
0
||
!
hashRemainDataInGroupInfo
(
&
pInfo
->
groupResInfo
))
{
pOperator
->
status
=
OP_EXEC_DONE
;
doSetOperatorCompleted
(
pOperator
)
;
}
return
(
pRes
->
info
.
rows
==
0
)
?
NULL
:
pRes
;
}
int32_t
order
=
TSDB_ORDER_ASC
;
int32_t
order
=
TSDB_ORDER_ASC
;
int32_t
scanFlag
=
MAIN_SCAN
;
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
while
(
1
)
{
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
if
(
pBlock
==
NULL
)
{
break
;
}
int32_t
code
=
getTableScanInfo
(
pOperator
,
&
order
,
&
scanFlag
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pBlock
,
order
,
MAIN_SCAN
,
true
);
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pBlock
,
order
,
scanFlag
,
true
);
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if
(
pInfo
->
pScalarExprInfo
!=
NULL
)
{
...
...
@@ -297,7 +302,6 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
}
}
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs);
doHashGroupbyAgg
(
pOperator
,
pBlock
);
}
...
...
@@ -319,7 +323,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
bool
hasRemain
=
hashRemainDataInGroupInfo
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
doSetOperatorCompleted
(
pOperator
)
;
break
;
}
...
...
@@ -328,7 +332,10 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
}
}
return
(
pRes
->
info
.
rows
==
0
)
?
NULL
:
pRes
;
size_t
rows
=
pRes
->
info
.
rows
;
pOperator
->
resultInfo
.
totalRows
+=
rows
;
return
(
rows
==
0
)
?
NULL
:
pRes
;
}
SOperatorInfo
*
createGroupOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SArray
*
pGroupColList
,
...
...
@@ -574,9 +581,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
while
(
1
)
{
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
if
(
pBlock
==
NULL
)
{
break
;
}
...
...
source/libs/executor/src/joinoperator.c
浏览文件 @
60e7e2ae
...
...
@@ -98,9 +98,7 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
// todo extract method
if
(
pJoinInfo
->
pLeft
==
NULL
||
pJoinInfo
->
leftPos
>=
pJoinInfo
->
pLeft
->
info
.
rows
)
{
SOperatorInfo
*
ds1
=
pOperator
->
pDownstream
[
0
];
publishOperatorProfEvent
(
ds1
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
pJoinInfo
->
pLeft
=
ds1
->
fpSet
.
getNextFn
(
ds1
);
publishOperatorProfEvent
(
ds1
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
pJoinInfo
->
leftPos
=
0
;
if
(
pJoinInfo
->
pLeft
==
NULL
)
{
...
...
@@ -111,9 +109,7 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
if
(
pJoinInfo
->
pRight
==
NULL
||
pJoinInfo
->
rightPos
>=
pJoinInfo
->
pRight
->
info
.
rows
)
{
SOperatorInfo
*
ds2
=
pOperator
->
pDownstream
[
1
];
publishOperatorProfEvent
(
ds2
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
pJoinInfo
->
pRight
=
ds2
->
fpSet
.
getNextFn
(
ds2
);
publishOperatorProfEvent
(
ds2
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
pJoinInfo
->
rightPos
=
0
;
if
(
pJoinInfo
->
pRight
==
NULL
)
{
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
60e7e2ae
...
...
@@ -253,9 +253,12 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
addTagPseudoColumnData
(
pTableScanInfo
,
pBlock
);
}
// todo record the filter time cost
int64_t
st
=
taosGetTimestampMs
();
doFilter
(
pTableScanInfo
->
pFilterNode
,
pBlock
,
pTableScanInfo
->
pColMatchInfo
);
int64_t
et
=
taosGetTimestampMs
();
pTableScanInfo
->
readRecorder
.
filterTime
+=
(
et
-
st
);
if
(
pBlock
->
info
.
rows
==
0
)
{
pCost
->
filterOutBlocks
+=
1
;
qDebug
(
"%s data block filter out, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
GET_TASKID
(
pTaskInfo
),
...
...
@@ -347,6 +350,8 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
STableScanInfo
*
pTableScanInfo
=
pOperator
->
info
;
SSDataBlock
*
pBlock
=
pTableScanInfo
->
pResBlock
;
int64_t
st
=
taosGetTimestampUs
();
while
(
tsdbNextDataBlock
(
pTableScanInfo
->
dataReader
))
{
if
(
isTaskKilled
(
pOperator
->
pTaskInfo
))
{
longjmp
(
pOperator
->
pTaskInfo
->
env
,
TSDB_CODE_TSC_QUERY_CANCELLED
);
...
...
@@ -366,6 +371,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
continue
;
}
pOperator
->
resultInfo
.
totalRows
=
pTableScanInfo
->
readRecorder
.
totalRows
;
pTableScanInfo
->
readRecorder
.
elapsedTime
+=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
pOperator
->
cost
.
totalCost
=
pTableScanInfo
->
readRecorder
.
elapsedTime
;
return
pBlock
;
}
...
...
@@ -452,6 +461,15 @@ SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
return
interval
;
}
static
int32_t
getTableScannerExecInfo
(
struct
SOperatorInfo
*
pOptr
,
void
**
pOptrExplain
,
uint32_t
*
len
)
{
SFileBlockLoadRecorder
*
pRecorder
=
taosMemoryCalloc
(
1
,
sizeof
(
SFileBlockLoadRecorder
));
STableScanInfo
*
pTableScanInfo
=
pOptr
->
info
;
*
pRecorder
=
pTableScanInfo
->
readRecorder
;
*
pOptrExplain
=
pRecorder
;
*
len
=
sizeof
(
SFileBlockLoadRecorder
);
return
0
;
}
static
void
destroyTableScanOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
STableScanInfo
*
pTableScanInfo
=
(
STableScanInfo
*
)
param
;
taosMemoryFree
(
pTableScanInfo
->
pResBlock
);
...
...
@@ -509,14 +527,10 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pOperator
->
numOfExprs
=
numOfCols
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doTableScan
,
NULL
,
NULL
,
destroyTableScanOperatorInfo
,
NULL
,
NULL
,
NULL
);
static
int32_t
cost
=
0
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doTableScan
,
NULL
,
NULL
,
destroyTableScanOperatorInfo
,
NULL
,
NULL
,
getTableScannerExecInfo
);
// for non-blocking operator, the open cost is always 0
pOperator
->
cost
.
openCost
=
0
;
pOperator
->
cost
.
totalCost
=
++
cost
;
pOperator
->
resultInfo
.
totalRows
=
++
cost
;
return
pOperator
;
}
...
...
@@ -1603,18 +1617,20 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
STR_TO_VARSTR
(
str
,
mr
.
me
.
name
);
colDataAppend
(
pDst
,
count
,
str
,
false
);
}
else
{
// it is a tag value
if
(
pDst
->
info
.
type
==
TSDB_DATA_TYPE_JSON
){
const
uint8_t
*
tmp
=
mr
.
me
.
ctbEntry
.
pTags
;
char
*
data
=
taosMemoryCalloc
(
kvRowLen
(
tmp
)
+
1
,
1
);
if
(
data
==
NULL
){
qError
(
"doTagScan calloc error:%d"
,
kvRowLen
(
tmp
)
+
1
);
return
NULL
;
if
(
pDst
->
info
.
type
==
TSDB_DATA_TYPE_JSON
)
{
const
uint8_t
*
tmp
=
mr
.
me
.
ctbEntry
.
pTags
;
// TODO opt perf by realloc memory
char
*
data
=
taosMemoryCalloc
(
kvRowLen
(
tmp
)
+
1
,
1
);
if
(
data
==
NULL
)
{
qError
(
"%s failed to malloc memory, size:%d"
,
GET_TASKID
(
pTaskInfo
),
kvRowLen
(
tmp
)
+
1
);
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_OUT_OF_MEMORY
);
}
*
data
=
TSDB_DATA_TYPE_JSON
;
memcpy
(
data
+
1
,
tmp
,
kvRowLen
(
tmp
));
memcpy
(
data
+
1
,
tmp
,
kvRowLen
(
tmp
));
colDataAppend
(
pDst
,
count
,
data
,
false
);
taosMemoryFree
(
data
);
}
else
{
}
else
{
const
char
*
p
=
metaGetTableTagVal
(
&
mr
.
me
,
pExprInfo
[
j
].
base
.
pParam
[
0
].
pCol
->
colId
);
colDataAppend
(
pDst
,
count
,
p
,
(
p
==
NULL
));
}
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
60e7e2ae
...
...
@@ -782,13 +782,11 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
int32_t
scanFlag
=
MAIN_SCAN
;
int64_t
st
=
taosGetTimestampUs
();
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
while
(
1
)
{
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
if
(
pBlock
==
NULL
)
{
break
;
}
...
...
@@ -821,6 +819,8 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
closeAllResultRows
(
&
pInfo
->
binfo
.
resultRowInfo
);
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
pInfo
->
order
);
OPTR_SET_OPENED
(
pOperator
);
pOperator
->
cost
.
openCost
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -946,10 +946,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
while
(
1
)
{
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
if
(
pBlock
==
NULL
)
{
break
;
}
...
...
@@ -998,7 +995,10 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
doSetOperatorCompleted
(
pOperator
);
}
return
pBlock
->
info
.
rows
==
0
?
NULL
:
pBlock
;
size_t
rows
=
pBlock
->
info
.
rows
;
pOperator
->
resultInfo
.
totalRows
+=
rows
;
return
(
rows
==
0
)
?
NULL
:
pBlock
;
}
}
...
...
@@ -1092,10 +1092,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SArray
*
pUpdated
=
NULL
;
while
(
1
)
{
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
if
(
pBlock
==
NULL
)
{
break
;
}
...
...
@@ -1425,9 +1422,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
while
(
1
)
{
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
if
(
pBlock
==
NULL
)
{
break
;
}
...
...
@@ -1472,9 +1467,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
while
(
1
)
{
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
if
(
pBlock
==
NULL
)
{
break
;
}
...
...
@@ -1702,12 +1695,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
while
(
1
)
{
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
if
(
pBlock
==
NULL
)
{
break
;
}
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pBlock
,
pInfo
->
order
,
MAIN_SCAN
,
true
);
if
(
pBlock
->
info
.
type
==
STREAM_REPROCESS
)
{
doClearWindows
(
&
pInfo
->
aggSup
,
&
pInfo
->
binfo
,
&
pInfo
->
interval
,
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录