Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
1b8c5d92
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
1b8c5d92
编写于
6月 29, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh(query): add api to extract scan status
上级
02e450c5
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
69 addition
and
44 deletion
+69
-44
include/libs/executor/executor.h
include/libs/executor/executor.h
+9
-7
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+23
-21
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+7
-10
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+21
-3
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+2
-2
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+7
-1
未找到文件。
include/libs/executor/executor.h
浏览文件 @
1b8c5d92
...
...
@@ -36,7 +36,6 @@ typedef struct SReadHandle {
void
*
vnode
;
void
*
mnd
;
SMsgCb
*
pMsgCb
;
// int8_t initTsdbReader;
}
SReadHandle
;
enum
{
...
...
@@ -140,12 +139,6 @@ int32_t qKillTask(qTaskInfo_t tinfo);
*/
int32_t
qAsyncKillTask
(
qTaskInfo_t
tinfo
);
/**
* return whether query is completed or not
* @param tinfo
* @return
*/
int32_t
qIsTaskCompleted
(
qTaskInfo_t
tinfo
);
/**
* destroy query info structure
...
...
@@ -176,6 +169,15 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len);
int32_t
qDeserializeTaskStatus
(
qTaskInfo_t
tinfo
,
const
char
*
pInput
,
int32_t
len
);
/**
* return the scan info, in the form of tuple of two items, including table uid and current timestamp
* @param tinfo
* @param uid
* @param ts
* @return
*/
int32_t
qGetStreamScanStatus
(
qTaskInfo_t
tinfo
,
uint64_t
*
uid
,
int64_t
*
ts
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
1b8c5d92
...
...
@@ -253,18 +253,15 @@ typedef struct STableScanInfo {
SReadHandle
readHandle
;
SFileBlockLoadRecorder
readRecorder
;
int64_t
numOfRows
;
SScanInfo
scanInfo
;
int32_t
scanTimes
;
SNode
*
pFilterNode
;
// filter info, which is push down by optimizer
SqlFunctionCtx
*
pCtx
;
// which belongs to the direct upstream operator operator query context
SResultRowInfo
*
pResultRowInfo
;
int32_t
*
rowEntryInfoOffset
;
SExprInfo
*
pExpr
;
SqlFunctionCtx
*
pCtx
;
// which belongs to the direct upstream operator operator query context
,todo: remove this by using SExprSup
int32_t
*
rowEntryInfoOffset
;
// todo: remove this by using SExprSup
SExprInfo
*
pExpr
;
// todo: remove this by using SExprSup
SSDataBlock
*
pResBlock
;
SArray
*
pColMatchInfo
;
int32_t
numOfOutput
;
SExprSupp
pseudoSup
;
SQueryTableDataCond
cond
;
int32_t
scanFlag
;
// table scan flag to denote if it is a repeat/reverse/main scan
...
...
@@ -275,8 +272,13 @@ typedef struct STableScanInfo {
int32_t
curTWinIdx
;
int32_t
currentGroupId
;
uint64_t
queryId
;
uint64_t
taskId
;
uint64_t
queryId
;
// todo remove it
uint64_t
taskId
;
// todo remove it
struct
{
uint64_t
uid
;
int64_t
t
;
}
scanStatus
;
}
STableScanInfo
;
typedef
struct
STagScanInfo
{
...
...
@@ -321,31 +323,31 @@ typedef struct SessionWindowSupporter {
}
SessionWindowSupporter
;
typedef
struct
SStreamBlockScanInfo
{
uint64_t
tableUid
;
// queried super table uid
SExprInfo
*
pPseudoExpr
;
int32_t
numOfPseudoExpr
;
int32_t
primaryTsIndex
;
// primary time stamp slot id
SReadHandle
readHandle
;
SInterval
interval
;
// if the upstream is an interval operator, the interval info is also kept here.
SArray
*
pColMatchInfo
;
//
SNode
*
pCondition
;
SArray
*
pBlockLists
;
// multiple SSDatablock.
SSDataBlock
*
pRes
;
// result SSDataBlock
SSDataBlock
*
pUpdateRes
;
// update SSDataBlock
int32_t
updateResIndex
;
int32_t
blockType
;
// current block type
int32_t
validBlockIndex
;
// Is current data has returned?
SColumnInfo
*
pCols
;
// the output column info
uint64_t
numOfExec
;
// execution times
void
*
streamBlockReader
;
// stream block reader handle
SArray
*
pColMatchInfo
;
//
SNode
*
pCondition
;
int32_t
tsArrayIndex
;
SArray
*
tsArray
;
uint64_t
groupId
;
SUpdateInfo
*
pUpdateInfo
;
SExprInfo
*
pPseudoExpr
;
int32_t
numOfPseudoExpr
;
int32_t
primaryTsIndex
;
// primary time stamp slot id
SReadHandle
readHandle
;
uint64_t
tableUid
;
// queried super table uid
EStreamScanMode
scanMode
;
SOperatorInfo
*
pSnapshotReadOp
;
SInterval
interval
;
// if the upstream is an interval operator, the interval info is also kept here.
SArray
*
childIds
;
SessionWindowSupporter
sessionSup
;
bool
assignBlockUid
;
// assign block uid to groupId, temporarily used for generating rollup SMA.
...
...
@@ -683,7 +685,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
void
initBasicInfo
(
SOptrBasicInfo
*
pInfo
,
SSDataBlock
*
pBlock
);
void
cleanupBasicInfo
(
SOptrBasicInfo
*
pInfo
);
int32_t
initExprSupp
(
SExprSupp
*
pSup
,
SExprInfo
*
pExprInfo
,
int32_t
numOfExpr
);
void
cleanupExprSup
(
SExprSupp
*
pSup
);
void
cleanupExprSup
p
(
SExprSupp
*
pSup
);
int32_t
initAggInfo
(
SExprSupp
*
pSup
,
SAggSupporter
*
pAggSup
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
size_t
keyBufSize
,
const
char
*
pkey
);
void
initResultSizeInfo
(
SOperatorInfo
*
pOperator
,
int32_t
numOfRows
);
...
...
@@ -707,7 +709,7 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
void
appendOneRowToDataBlock
(
SSDataBlock
*
pBlock
,
STupleHandle
*
pTupleHandle
);
void
setTbNameColData
(
void
*
pMeta
,
const
SSDataBlock
*
pBlock
,
SColumnInfoData
*
pColInfoData
,
int32_t
functionId
);
void
cleanupExecSupp
(
SExprSupp
*
pSupp
);
int32_t
doGetScanStatus
(
SOperatorInfo
*
pOperator
,
uint64_t
*
uid
,
int64_t
*
ts
);
SSDataBlock
*
loadNextDataBlock
(
void
*
param
);
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
1b8c5d92
...
...
@@ -191,16 +191,6 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
qIsTaskCompleted
(
qTaskInfo_t
qinfo
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
qinfo
;
if
(
pTaskInfo
==
NULL
)
{
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
return
isTaskKilled
(
pTaskInfo
);
}
void
qDestroyTask
(
qTaskInfo_t
qTaskHandle
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
qTaskHandle
;
qDebug
(
"%s execTask completed, numOfRows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
->
pRoot
->
resultInfo
.
totalRows
);
...
...
@@ -236,3 +226,10 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le
}
int32_t
qGetStreamScanStatus
(
qTaskInfo_t
tinfo
,
uint64_t
*
uid
,
int64_t
*
ts
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
return
TSDB_CODE_SUCCESS
;
}
source/libs/executor/src/executorimpl.c
浏览文件 @
1b8c5d92
...
...
@@ -1034,7 +1034,7 @@ static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSData
SqlFunctionCtx
*
pCtx
=
pTableScanInfo
->
pCtx
;
uint32_t
status
=
BLK_DATA_NOT_LOAD
;
int32_t
numOfOutput
=
pTableScanInfo
->
numOfOutput
;
int32_t
numOfOutput
=
0
;
//
pTableScanInfo->numOfOutput;
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
int32_t
functionId
=
pCtx
[
i
].
functionId
;
int32_t
colId
=
pTableScanInfo
->
pExpr
[
i
].
base
.
pParam
[
0
].
pCol
->
colId
;
...
...
@@ -2822,6 +2822,24 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
}
}
int32_t
doGetScanStatus
(
SOperatorInfo
*
pOperator
,
uint64_t
*
uid
,
int64_t
*
ts
)
{
int32_t
type
=
pOperator
->
operatorType
;
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
SStreamBlockScanInfo
*
pScanInfo
=
pOperator
->
info
;
STableScanInfo
*
pSnapShotScanInfo
=
pScanInfo
->
pSnapshotReadOp
->
info
;
*
uid
=
pSnapShotScanInfo
->
scanStatus
.
uid
;
*
ts
=
pSnapShotScanInfo
->
scanStatus
.
t
;
}
else
{
if
(
pOperator
->
pDownstream
[
0
]
==
NULL
)
{
return
TSDB_CODE_INVALID_PARA
;
}
else
{
doGetScanStatus
(
pOperator
->
pDownstream
[
0
],
uid
,
ts
);
}
}
return
TSDB_CODE_SUCCESS
;
}
// this is a blocking operator
static
int32_t
doOpenAggregateOptr
(
SOperatorInfo
*
pOperator
)
{
if
(
OPTR_IS_OPENED
(
pOperator
))
{
...
...
@@ -3544,7 +3562,7 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy
(
pInfo
->
pPseudoColInfo
);
}
void
cleanupEx
ec
Supp
(
SExprSupp
*
pSupp
)
{
void
cleanupEx
pr
Supp
(
SExprSupp
*
pSupp
)
{
destroySqlFunctionCtx
(
pSupp
->
pCtx
,
pSupp
->
numOfExprs
);
destroyExprInfo
(
pSupp
->
pExprInfo
,
pSupp
->
numOfExprs
);
...
...
@@ -3557,7 +3575,7 @@ static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy
(
pInfo
->
pPseudoColInfo
);
cleanupAggSup
(
&
pInfo
->
aggSup
);
cleanupEx
ec
Supp
(
&
pInfo
->
scalarSup
);
cleanupEx
pr
Supp
(
&
pInfo
->
scalarSup
);
}
void
destroyExchangeOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
1b8c5d92
...
...
@@ -37,7 +37,7 @@ static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFreeClear
(
pInfo
->
keyBuf
);
taosArrayDestroy
(
pInfo
->
pGroupCols
);
taosArrayDestroy
(
pInfo
->
pGroupColVals
);
cleanupEx
ec
Supp
(
&
pInfo
->
scalarSup
);
cleanupEx
pr
Supp
(
&
pInfo
->
scalarSup
);
}
static
int32_t
initGroupOptrInfo
(
SArray
**
pGroupColVals
,
int32_t
*
keyLen
,
char
**
keyBuf
,
const
SArray
*
pGroupColList
)
{
...
...
@@ -701,7 +701,7 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
taosHashCleanup
(
pInfo
->
pGroupSet
);
taosMemoryFree
(
pInfo
->
columnOffset
);
cleanupEx
ec
Supp
(
&
pInfo
->
scalarSup
);
cleanupEx
pr
Supp
(
&
pInfo
->
scalarSup
);
}
SOperatorInfo
*
createPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SPartitionPhysiNode
*
pPartNode
,
SExecTaskInfo
*
pTaskInfo
)
{
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
1b8c5d92
...
...
@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <executorimpl.h>
#include <vnode.h>
#include "filter.h"
#include "function.h"
...
...
@@ -413,6 +414,11 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
pTableScanInfo
->
readRecorder
.
elapsedTime
+=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
pOperator
->
cost
.
totalCost
=
pTableScanInfo
->
readRecorder
.
elapsedTime
;
// todo refactor
pTableScanInfo
->
scanStatus
.
uid
=
pBlock
->
info
.
uid
;
pTableScanInfo
->
scanStatus
.
t
=
pBlock
->
info
.
window
.
ekey
;
return
pBlock
;
}
return
NULL
;
...
...
@@ -459,7 +465,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
int32_t
total
=
pTableScanInfo
->
scanInfo
.
numOfAsc
+
pTableScanInfo
->
scanInfo
.
numOfDesc
;
if
(
pTableScanInfo
->
scanTimes
<
total
)
{
if
(
pTableScanInfo
->
cond
.
order
==
TSDB_ORDER_ASC
)
{
prepareForDescendingScan
(
pTableScanInfo
,
pTableScanInfo
->
pCtx
,
pTableScanInfo
->
numOfOutput
);
prepareForDescendingScan
(
pTableScanInfo
,
pTableScanInfo
->
pCtx
,
0
);
tsdbResetReadHandle
(
pTableScanInfo
->
dataReader
,
&
pTableScanInfo
->
cond
,
0
);
pTableScanInfo
->
curTWinIdx
=
0
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录