Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7ba56169
T
TDengine
项目概览
taosdata
/
TDengine
11 个月 前同步成功
通知
1179
Star
22014
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
7ba56169
编写于
12月 03, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
12月 03, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #18652 from taosdata/fix/liao_cov
refactor: add function for extract the required buffer during query.
上级
a4385717
de0022b4
变更
15
显示空白变更内容
内联
并排
Showing
15 changed file
with
95 addition
and
135 deletion
+95
-135
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+11
-10
source/libs/executor/src/cachescanoperator.c
source/libs/executor/src/cachescanoperator.c
+1
-1
source/libs/executor/src/exchangeoperator.c
source/libs/executor/src/exchangeoperator.c
+1
-1
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+12
-7
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+22
-53
source/libs/executor/src/filloperator.c
source/libs/executor/src/filloperator.c
+2
-2
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+3
-3
source/libs/executor/src/joinoperator.c
source/libs/executor/src/joinoperator.c
+1
-1
source/libs/executor/src/projectoperator.c
source/libs/executor/src/projectoperator.c
+3
-3
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+9
-9
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+4
-4
source/libs/executor/src/sysscanoperator.c
source/libs/executor/src/sysscanoperator.c
+2
-2
source/libs/executor/src/timesliceoperator.c
source/libs/executor/src/timesliceoperator.c
+1
-1
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+16
-22
source/libs/function/src/detail/tavgfunction.c
source/libs/function/src/detail/tavgfunction.c
+7
-16
未找到文件。
source/libs/executor/inc/executorimpl.h
浏览文件 @
7ba56169
...
...
@@ -46,7 +46,6 @@ extern "C" {
typedef
int32_t
(
*
__block_search_fn_t
)(
char
*
data
,
int32_t
num
,
int64_t
key
,
int32_t
order
);
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u)
#define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0)
#define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN)
#define IS_INVALID_SESSION_WIN_KEY(winKey) ((winKey).win.skey <= 0)
...
...
@@ -109,6 +108,7 @@ typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
typedef
SSDataBlock
*
(
*
__optr_fn_t
)(
struct
SOperatorInfo
*
pOptr
);
typedef
void
(
*
__optr_close_fn_t
)(
void
*
param
);
typedef
int32_t
(
*
__optr_explain_fn_t
)(
struct
SOperatorInfo
*
pOptr
,
void
**
pOptrExplain
,
uint32_t
*
len
);
typedef
int32_t
(
*
__optr_reqBuf_fn_t
)(
struct
SOperatorInfo
*
pOptr
);
typedef
struct
STaskIdInfo
{
uint64_t
queryId
;
// this is also a request id
...
...
@@ -170,8 +170,9 @@ struct SExecTaskInfo {
STaskCostInfo
cost
;
int64_t
owner
;
// if it is in execution
int32_t
code
;
int32_t
qbufQuota
;
// total available buffer (in KB) during execution query
int64_t
version
;
// used for stream to record wal version
int64_t
version
;
// used for stream to record wal version
, why not move to sschemainfo
SStreamTaskInfo
streamInfo
;
SSchemaInfo
schemaInfo
;
STableListInfo
*
pTableInfoList
;
// this is a table list
...
...
@@ -198,6 +199,7 @@ typedef struct SOperatorFpSet {
__optr_fn_t
getNextFn
;
__optr_fn_t
cleanupFn
;
// call this function to release the allocated resources ASAP
__optr_close_fn_t
closeFn
;
__optr_reqBuf_fn_t
reqBufFn
;
// total used buffer for blocking operator
__optr_encode_fn_t
encodeResultRow
;
__optr_decode_fn_t
decodeResultRow
;
__optr_explain_fn_t
getExplainFn
;
...
...
@@ -486,12 +488,10 @@ typedef struct STableCountScanSupp {
int16_t
dbNameSlotId
;
int16_t
stbNameSlotId
;
int16_t
tbCountSlotId
;
bool
groupByDbName
;
bool
groupByStbName
;
char
dbNameFilter
[
TSDB_DB_NAME_LEN
];
char
stbNameFilter
[
TSDB_TABLE_NAME_LEN
];
}
STableCountScanSupp
;
typedef
struct
STableCountScanOperatorInfo
{
...
...
@@ -671,13 +671,14 @@ typedef struct SStreamFillOperatorInfo {
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
SOperatorFpSet
createOperatorFpSet
(
__optr_open_fn_t
openFn
,
__optr_fn_t
nextFn
,
__optr_fn_t
cleanup
,
__optr_close_fn_t
closeFn
,
__optr_explain_fn_t
explain
);
int32_t
op
erato
rDummyOpenFn
(
SOperatorInfo
*
pOperator
);
__optr_close_fn_t
closeFn
,
__optr_
reqBuf_fn_t
reqBufFn
,
__optr_
explain_fn_t
explain
);
int32_t
op
t
rDummyOpenFn
(
SOperatorInfo
*
pOperator
);
int32_t
appendDownstream
(
SOperatorInfo
*
p
,
SOperatorInfo
**
pDownstream
,
int32_t
num
);
void
setOperatorCompleted
(
SOperatorInfo
*
pOperator
);
void
setOperatorInfo
(
SOperatorInfo
*
pOperator
,
const
char
*
name
,
int32_t
type
,
bool
blocking
,
int32_t
status
,
void
*
pInfo
,
SExecTaskInfo
*
pTaskInfo
);
void
destroyOperatorInfo
(
SOperatorInfo
*
pOperator
);
int32_t
optrDefaultBufFn
(
SOperatorInfo
*
pOperator
);
void
initBasicInfo
(
SOptrBasicInfo
*
pInfo
,
SSDataBlock
*
pBlock
);
void
cleanupBasicInfo
(
SOptrBasicInfo
*
pInfo
);
...
...
@@ -740,6 +741,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
SOperatorInfo
*
createSysTableScanOperatorInfo
(
void
*
readHandle
,
SSystemTableScanPhysiNode
*
pScanPhyNode
,
const
char
*
pUser
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableCountScanOperatorInfo
(
SReadHandle
*
handle
,
STableCountScanPhysiNode
*
pNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createAggregateOperatorInfo
(
SOperatorInfo
*
downstream
,
SAggPhysiNode
*
pNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createIndefinitOutputOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pNode
,
SExecTaskInfo
*
pTaskInfo
);
...
...
@@ -812,8 +815,6 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
int32_t
createDataSinkParam
(
SDataSinkNode
*
pNode
,
void
**
pParam
,
qTaskInfo_t
*
pTaskInfo
,
SReadHandle
*
readHandle
);
int32_t
getOperatorExplainExecInfo
(
SOperatorInfo
*
operatorInfo
,
SArray
*
pExecInfoList
);
int32_t
getMaximumIdleDurationSec
();
STimeWindow
getActiveTimeWindow
(
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int64_t
ts
,
SInterval
*
pInterval
,
int32_t
order
);
int32_t
getNumOfRowsInTimeWindow
(
SDataBlockInfo
*
pDataBlockInfo
,
TSKEY
*
pPrimaryColumn
,
int32_t
startPos
,
TSKEY
ekey
,
...
...
source/libs/executor/src/cachescanoperator.c
浏览文件 @
7ba56169
...
...
@@ -117,7 +117,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
pOperator
->
exprSupp
.
numOfExprs
=
taosArrayGetSize
(
pInfo
->
pRes
->
pDataBlock
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
eratorDummyOpenFn
,
doScanCache
,
NULL
,
destroyCacheScanOperator
,
NULL
);
createOperatorFpSet
(
op
trDummyOpenFn
,
doScanCache
,
NULL
,
destroyCacheScanOperator
,
optrDefaultBufFn
,
NULL
);
pOperator
->
cost
.
openCost
=
0
;
return
pOperator
;
...
...
source/libs/executor/src/exchangeoperator.c
浏览文件 @
7ba56169
...
...
@@ -307,7 +307,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
pOperator
->
exprSupp
.
numOfExprs
=
taosArrayGetSize
(
pInfo
->
pDummyBlock
->
pDataBlock
);
pOperator
->
fpSet
=
createOperatorFpSet
(
prepareLoadRemoteData
,
loadRemoteData
,
NULL
,
destroyExchangeOperatorInfo
,
NULL
);
createOperatorFpSet
(
prepareLoadRemoteData
,
loadRemoteData
,
NULL
,
destroyExchangeOperatorInfo
,
optrDefaultBufFn
,
NULL
);
return
pOperator
;
_error:
...
...
source/libs/executor/src/executor.c
浏览文件 @
7ba56169
...
...
@@ -207,7 +207,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
}
setTaskStatus
(
pTaskInfo
,
TASK_NOT_COMPLETED
);
pTaskInfo
->
cost
.
created
=
taosGetTimestamp
M
s
();
pTaskInfo
->
cost
.
created
=
taosGetTimestamp
U
s
();
pTaskInfo
->
execModel
=
OPTR_EXEC_MODEL_QUEUE
;
pTaskInfo
->
pRoot
=
createRawScanOperatorInfo
(
readers
,
pTaskInfo
);
if
(
NULL
==
pTaskInfo
->
pRoot
)
{
...
...
@@ -503,7 +503,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
}
if
(
pTaskInfo
->
cost
.
start
==
0
)
{
pTaskInfo
->
cost
.
start
=
taosGetTimestamp
M
s
();
pTaskInfo
->
cost
.
start
=
taosGetTimestamp
U
s
();
}
if
(
isTaskKilled
(
pTaskInfo
))
{
...
...
@@ -597,7 +597,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
}
if
(
pTaskInfo
->
cost
.
start
==
0
)
{
pTaskInfo
->
cost
.
start
=
taosGetTimestamp
M
s
();
pTaskInfo
->
cost
.
start
=
taosGetTimestamp
U
s
();
}
if
(
isTaskKilled
(
pTaskInfo
))
{
...
...
@@ -706,15 +706,20 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
static
void
printTaskExecCostInLog
(
SExecTaskInfo
*
pTaskInfo
)
{
STaskCostInfo
*
pSummary
=
&
pTaskInfo
->
cost
;
int64_t
idleTime
=
pSummary
->
start
-
pSummary
->
created
;
SFileBlockLoadRecorder
*
pRecorder
=
pSummary
->
pRecoder
;
if
(
pSummary
->
pRecoder
!=
NULL
)
{
qDebug
(
"%s :cost summary: elapsed time:%.2f ms, extract tableList:%.2f ms, createGroupIdMap:%.2f ms, total blocks:%d, "
"%s :cost summary: idle:%.2f ms, elapsed time:%.2f ms, extract tableList:%.2f ms, "
"createGroupIdMap:%.2f ms, total blocks:%d, "
"load block SMA:%d, load data block:%d, total rows:%"
PRId64
", check rows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pSummary
->
elapsedTime
/
1000
.
0
,
pSummary
->
extractListTime
,
pSummary
->
groupIdMapTime
,
pRecorder
->
totalBlocks
,
pRecorder
->
loadBlockStatis
,
pRecorder
->
loadBlocks
,
pRecorder
->
totalRows
,
pRecorder
->
totalCheckedRows
);
GET_TASKID
(
pTaskInfo
),
idleTime
/
1000
.
0
,
pSummary
->
elapsedTime
/
1000
.
0
,
pSummary
->
extractListTime
,
pSummary
->
groupIdMapTime
,
pRecorder
->
totalBlocks
,
pRecorder
->
loadBlockStatis
,
pRecorder
->
loadBlocks
,
pRecorder
->
totalRows
,
pRecorder
->
totalCheckedRows
);
}
else
{
qDebug
(
"%s :cost summary: idle in queue:%.2f ms, elapsed time:%.2f ms"
,
GET_TASKID
(
pTaskInfo
),
idleTime
/
1000
.
0
,
pSummary
->
elapsedTime
/
1000
.
0
);
}
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
7ba56169
...
...
@@ -85,8 +85,6 @@ typedef struct SAggOperatorInfo {
SExprSupp
scalarExprSup
;
}
SAggOperatorInfo
;
int32_t
getMaximumIdleDurationSec
()
{
return
tsShellActivityTimer
*
2
;
}
static
void
setBlockSMAInfo
(
SqlFunctionCtx
*
pCtx
,
SExprInfo
*
pExpr
,
SSDataBlock
*
pBlock
);
static
void
releaseQueryBuf
(
size_t
numOfTables
);
...
...
@@ -106,7 +104,7 @@ void setOperatorCompleted(SOperatorInfo* pOperator) {
pOperator
->
status
=
OP_EXEC_DONE
;
ASSERT
(
pOperator
->
pTaskInfo
!=
NULL
);
pOperator
->
cost
.
totalCost
=
(
taosGetTimestampUs
()
-
pOperator
->
pTaskInfo
->
cost
.
start
*
1000
)
/
1000
.
0
;
pOperator
->
cost
.
totalCost
=
(
taosGetTimestampUs
()
-
pOperator
->
pTaskInfo
->
cost
.
start
)
/
1000
.
0
;
setTaskStatus
(
pOperator
->
pTaskInfo
,
TASK_COMPLETED
);
}
...
...
@@ -120,19 +118,21 @@ void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, b
pOperator
->
pTaskInfo
=
pTaskInfo
;
}
int32_t
op
erato
rDummyOpenFn
(
SOperatorInfo
*
pOperator
)
{
int32_t
op
t
rDummyOpenFn
(
SOperatorInfo
*
pOperator
)
{
OPTR_SET_OPENED
(
pOperator
);
pOperator
->
cost
.
openCost
=
0
;
return
TSDB_CODE_SUCCESS
;
}
SOperatorFpSet
createOperatorFpSet
(
__optr_open_fn_t
openFn
,
__optr_fn_t
nextFn
,
__optr_fn_t
cleanup
,
__optr_close_fn_t
closeFn
,
__optr_explain_fn_t
explain
)
{
__optr_close_fn_t
closeFn
,
__optr_reqBuf_fn_t
reqBufFn
,
__optr_explain_fn_t
explain
)
{
SOperatorFpSet
fpSet
=
{
.
_openFn
=
openFn
,
.
getNextFn
=
nextFn
,
.
cleanupFn
=
cleanup
,
.
closeFn
=
closeFn
,
.
reqBufFn
=
reqBufFn
,
.
getExplainFn
=
explain
,
};
...
...
@@ -939,10 +939,10 @@ static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, u
}
static
void
doUpdateNumOfRows
(
SqlFunctionCtx
*
pCtx
,
SResultRow
*
pRow
,
int32_t
numOfExprs
,
const
int32_t
*
row
Cell
Offset
)
{
const
int32_t
*
row
Entry
Offset
)
{
bool
returnNotNull
=
false
;
for
(
int32_t
j
=
0
;
j
<
numOfExprs
;
++
j
)
{
struct
SResultRowEntryInfo
*
pResInfo
=
getResultEntryInfo
(
pRow
,
j
,
rowCell
Offset
);
SResultRowEntryInfo
*
pResInfo
=
getResultEntryInfo
(
pRow
,
j
,
rowEntry
Offset
);
if
(
!
isRowEntryInitialized
(
pResInfo
))
{
continue
;
}
...
...
@@ -1145,43 +1145,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
}
}
// void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) {
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//
// if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0) {
// return;
// }
//
// pQueryAttr->pos = 0;
// int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
//
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
// TsdbQueryHandleT pTsdbReadHandle = pRuntimeEnv->pTsdbReadHandle;
//
// SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
// while (tsdbNextDataBlock(pTsdbReadHandle)) {
// if (isTaskKilled(pRuntimeEnv->qinfo)) {
// T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
// }
//
// if (pQueryAttr->limit.offset > blockInfo.rows) {
// pQueryAttr->limit.offset -= blockInfo.rows;
// pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? blockInfo.window.ekey : blockInfo.window.skey;
// pTableQueryInfo->lastKey += step;
//
// //qDebug("QInfo:0x%"PRIx64" skip rows:%d, offset:%" PRId64, GET_TASKID(pRuntimeEnv), blockInfo.rows,
// pQuery->limit.offset);
// } else { // find the appropriated start position in current block
// updateOffsetVal(pRuntimeEnv, &blockInfo);
// break;
// }
// }
//
// if (terrno != TSDB_CODE_SUCCESS) {
// T_LONG_JMP(pRuntimeEnv->env, terrno);
// }
// }
// static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo,
// STableQueryInfo* pTableQueryInfo) {
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
...
...
@@ -1406,11 +1369,11 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc
SqlFunctionCtx
*
pCtx
=
pOperator
->
exprSupp
.
pCtx
;
bool
hasCountFunc
=
false
;
for
(
int32_t
i
=
0
;
i
<
pOperator
->
exprSupp
.
numOfExprs
;
++
i
)
{
if
((
strcmp
(
pCtx
[
i
].
pExpr
->
pExpr
->
_function
.
functionName
,
"count"
)
==
0
)
||
(
strcmp
(
pCtx
[
i
].
pExpr
->
pExpr
->
_function
.
functionName
,
"hyperloglog"
)
==
0
)
||
(
strcmp
(
pCtx
[
i
].
pExpr
->
pExpr
->
_function
.
functionName
,
"_hyperloglog_partial"
)
==
0
)
||
(
strcmp
(
pCtx
[
i
].
pExpr
->
pExpr
->
_function
.
functionName
,
"_hyperloglog_merge"
)
==
0
))
{
const
char
*
pName
=
pCtx
[
i
].
pExpr
->
pExpr
->
_function
.
functionName
;
if
((
strcmp
(
pName
,
"count"
)
==
0
)
||
(
strcmp
(
pName
,
"hyperloglog"
)
==
0
)
||
(
strcmp
(
pName
,
"_hyperloglog_partial"
)
==
0
)
||
(
strcmp
(
pName
,
"_hyperloglog_merge"
)
==
0
))
{
hasCountFunc
=
true
;
break
;
}
...
...
@@ -1463,7 +1426,6 @@ static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock **ppB
*
ppBlock
=
NULL
;
}
// this is a blocking operator
static
int32_t
doOpenAggregateOptr
(
SOperatorInfo
*
pOperator
)
{
if
(
OPTR_IS_OPENED
(
pOperator
))
{
...
...
@@ -1614,6 +1576,15 @@ void destroyOperatorInfo(SOperatorInfo* pOperator) {
taosMemoryFreeClear
(
pOperator
);
}
// each operator should be set their own function to return total cost buffer
int32_t
optrDefaultBufFn
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
->
blocking
)
{
ASSERT
(
0
);
}
else
{
return
0
;
}
}
int32_t
getBufferPgSize
(
int32_t
rowSize
,
uint32_t
*
defaultPgsz
,
uint32_t
*
defaultBufsz
)
{
*
defaultPgsz
=
4096
;
while
(
*
defaultPgsz
<
rowSize
*
4
)
{
...
...
@@ -1794,7 +1765,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
setOperatorInfo
(
pOperator
,
"TableAggregate"
,
QUERY_NODE_PHYSICAL_PLAN_HASH_AGG
,
true
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
doOpenAggregateOptr
,
getAggregateResult
,
NULL
,
destroyAggOperatorInfo
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
doOpenAggregateOptr
,
getAggregateResult
,
NULL
,
destroyAggOperatorInfo
,
optrDefaultBufFn
,
NULL
);
if
(
downstream
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
)
{
STableScanInfo
*
pTableScanInfo
=
downstream
->
info
;
...
...
@@ -1855,7 +1826,6 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
setTaskStatus
(
pTaskInfo
,
TASK_NOT_COMPLETED
);
pTaskInfo
->
schemaInfo
.
dbname
=
strdup
(
dbFName
);
pTaskInfo
->
cost
.
created
=
taosGetTimestampMs
();
pTaskInfo
->
id
.
queryId
=
queryId
;
pTaskInfo
->
execModel
=
model
;
pTaskInfo
->
pTableInfoList
=
tableListCreate
();
...
...
@@ -1871,8 +1841,6 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
SSchemaWrapper
*
extractQueriedColumnSchema
(
SScanPhysiNode
*
pScanNode
);
SOperatorInfo
*
createTableCountScanOperatorInfo
(
SReadHandle
*
handle
,
STableCountScanPhysiNode
*
pNode
,
SExecTaskInfo
*
pTaskInfo
);
int32_t
extractTableSchemaInfo
(
SReadHandle
*
pHandle
,
SScanPhysiNode
*
pScanNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
pHandle
->
meta
,
0
);
...
...
@@ -2364,6 +2332,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
goto
_complete
;
}
(
*
pTaskInfo
)
->
cost
.
created
=
taosGetTimestampUs
();
return
TSDB_CODE_SUCCESS
;
_complete:
...
...
source/libs/executor/src/filloperator.c
浏览文件 @
7ba56169
...
...
@@ -381,7 +381,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
setOperatorInfo
(
pOperator
,
"FillOperator"
,
QUERY_NODE_PHYSICAL_PLAN_FILL
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
exprSupp
.
numOfExprs
=
pInfo
->
numOfExpr
;
pOperator
->
fpSet
=
createOperatorFpSet
(
op
eratorDummyOpenFn
,
doFill
,
NULL
,
destroyFillOperatorInfo
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
trDummyOpenFn
,
doFill
,
NULL
,
destroyFillOperatorInfo
,
optrDefaultBufFn
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
...
...
@@ -1478,7 +1478,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
pInfo
->
srcRowIndex
=
0
;
setOperatorInfo
(
pOperator
,
"StreamFillOperator"
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
eratorDummyOpenFn
,
doStreamFill
,
NULL
,
destroyStreamFillOperatorInfo
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
trDummyOpenFn
,
doStreamFill
,
NULL
,
destroyStreamFillOperatorInfo
,
optrDefaultBufFn
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
7ba56169
...
...
@@ -470,7 +470,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
setOperatorInfo
(
pOperator
,
"GroupbyAggOperator"
,
0
,
true
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
eratorDummyOpenFn
,
hashGroupbyAggregate
,
NULL
,
destroyGroupOperatorInfo
,
NULL
);
createOperatorFpSet
(
op
trDummyOpenFn
,
hashGroupbyAggregate
,
NULL
,
destroyGroupOperatorInfo
,
optrDefaultBufFn
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
...
...
@@ -850,7 +850,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
op
eratorDummyOpenFn
,
hashPartition
,
NULL
,
destroyPartitionOperatorInfo
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
trDummyOpenFn
,
hashPartition
,
NULL
,
destroyPartitionOperatorInfo
,
optrDefaultBufFn
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
...
...
@@ -1142,7 +1142,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
op
eratorDummyOpenFn
,
doStreamHashPartition
,
NULL
,
destroyStreamPartitionOperatorInfo
,
NULL
);
createOperatorFpSet
(
op
trDummyOpenFn
,
doStreamHashPartition
,
NULL
,
destroyStreamPartitionOperatorInfo
,
optrDefaultBufFn
,
NULL
);
initParDownStream
(
downstream
,
&
pInfo
->
partitionSup
,
&
pInfo
->
scalarSup
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
...
...
source/libs/executor/src/joinoperator.c
浏览文件 @
7ba56169
...
...
@@ -136,7 +136,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pInfo
->
inputOrder
=
TSDB_ORDER_DESC
;
}
pOperator
->
fpSet
=
createOperatorFpSet
(
op
eratorDummyOpenFn
,
doMergeJoin
,
NULL
,
destroyMergeJoinOperator
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
trDummyOpenFn
,
doMergeJoin
,
NULL
,
destroyMergeJoinOperator
,
optrDefaultBufFn
,
NULL
);
code
=
appendDownstream
(
pOperator
,
pDownstream
,
numOfDownstream
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
...
...
source/libs/executor/src/projectoperator.c
浏览文件 @
7ba56169
...
...
@@ -118,8 +118,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
pInfo
->
pPseudoColInfo
=
setRowTsColumnOutputInfo
(
pOperator
->
exprSupp
.
pCtx
,
numOfCols
);
setOperatorInfo
(
pOperator
,
"ProjectOperator"
,
QUERY_NODE_PHYSICAL_PLAN_PROJECT
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
erato
rDummyOpenFn
,
doProjectOperation
,
NULL
,
destroyProjectOperatorInfo
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
t
rDummyOpenFn
,
doProjectOperation
,
NULL
,
destroyProjectOperatorInfo
,
optrDefaultBufFn
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -415,7 +415,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
pInfo
->
pPseudoColInfo
=
setRowTsColumnOutputInfo
(
pSup
->
pCtx
,
numOfExpr
);
setOperatorInfo
(
pOperator
,
"IndefinitOperator"
,
QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
eratorDummyOpenFn
,
doApplyIndefinitFunction
,
NULL
,
destroyIndefinitOperatorInfo
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
trDummyOpenFn
,
doApplyIndefinitFunction
,
NULL
,
destroyIndefinitOperatorInfo
,
optrDefaultBufFn
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
7ba56169
...
...
@@ -899,8 +899,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
}
taosLRUCacheSetStrictCapacity
(
pInfo
->
base
.
metaCache
.
pTableMetaEntryCache
,
false
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
erato
rDummyOpenFn
,
doTableScan
,
NULL
,
destroyTableScanOperatorInfo
,
getTableScannerExecInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
t
rDummyOpenFn
,
doTableScan
,
NULL
,
destroyTableScanOperatorInfo
,
optrDefaultBufFn
,
getTableScannerExecInfo
);
// for non-blocking operator, the open cost is always 0
pOperator
->
cost
.
openCost
=
0
;
...
...
@@ -925,7 +925,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
setOperatorInfo
(
pOperator
,
"TableSeqScanOperator"
,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
eratorDummyOpenFn
,
doTableScanImpl
,
NULL
,
NULL
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
trDummyOpenFn
,
doTableScanImpl
,
NULL
,
NULL
,
optrDefaultBufFn
,
NULL
);
return
pOperator
;
}
...
...
@@ -2139,7 +2139,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
setOperatorInfo
(
pOperator
,
"RawScanOperator"
,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
NULL
,
doRawScan
,
NULL
,
destroyRawScanOperatorInfo
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
NULL
,
doRawScan
,
NULL
,
destroyRawScanOperatorInfo
,
optrDefaultBufFn
,
NULL
);
return
pOperator
;
_end:
...
...
@@ -2328,7 +2328,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pOperator
->
exprSupp
.
numOfExprs
=
taosArrayGetSize
(
pInfo
->
pRes
->
pDataBlock
);
__optr_fn_t
nextFn
=
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
?
doStreamScan
:
doQueueScan
;
pOperator
->
fpSet
=
createOperatorFpSet
(
op
eratorDummyOpenFn
,
nextFn
,
NULL
,
destroyStreamScanOperatorInfo
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
trDummyOpenFn
,
nextFn
,
NULL
,
destroyStreamScanOperatorInfo
,
optrDefaultBufFn
,
NULL
);
return
pOperator
;
...
...
@@ -2465,7 +2465,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
eratorDummyOpenFn
,
doTagScan
,
NULL
,
destroyTagScanOperatorInfo
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
trDummyOpenFn
,
doTagScan
,
NULL
,
destroyTagScanOperatorInfo
,
optrDefaultBufFn
,
NULL
);
return
pOperator
;
...
...
@@ -2866,8 +2866,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo
,
pTaskInfo
);
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
pOperator
->
fpSet
=
createOperatorFpSet
(
op
erato
rDummyOpenFn
,
doTableMergeScan
,
NULL
,
destroyTableMergeScanOperatorInfo
,
getTableMergeScanExplainExecInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
t
rDummyOpenFn
,
doTableMergeScan
,
NULL
,
destroyTableMergeScanOperatorInfo
,
optrDefaultBufFn
,
getTableMergeScanExplainExecInfo
);
pOperator
->
cost
.
openCost
=
0
;
return
pOperator
;
...
...
@@ -3015,7 +3015,7 @@ SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableC
setOperatorInfo
(
pOperator
,
"TableCountScanOperator"
,
QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
eratorDummyOpenFn
,
doTableCountScan
,
NULL
,
destoryTableCountScanOperator
,
NULL
);
createOperatorFpSet
(
op
trDummyOpenFn
,
doTableCountScan
,
NULL
,
destoryTableCountScanOperator
,
optrDefaultBufFn
,
NULL
);
return
pOperator
;
_error:
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
7ba56169
...
...
@@ -75,7 +75,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
// TODO dynamic set the available sort buffer
pOperator
->
fpSet
=
createOperatorFpSet
(
doOpenSortOperator
,
doSort
,
NULL
,
destroySortOperatorInfo
,
getExplainExecInfo
);
createOperatorFpSet
(
doOpenSortOperator
,
doSort
,
NULL
,
destroySortOperatorInfo
,
optrDefaultBufFn
,
getExplainExecInfo
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -521,8 +521,8 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
pInfo
->
pSortInfo
=
createSortInfo
(
pSortPhyNode
->
pSortKeys
);
setOperatorInfo
(
pOperator
,
"GroupSortOperator"
,
QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
erato
rDummyOpenFn
,
doGroupSort
,
NULL
,
destroyGroupSortOperatorInfo
,
getGroupSortExplainExecInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
t
rDummyOpenFn
,
doGroupSort
,
NULL
,
destroyGroupSortOperatorInfo
,
optrDefaultBufFn
,
getGroupSortExplainExecInfo
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -798,7 +798,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
setOperatorInfo
(
pOperator
,
"MultiwayMergeOperator"
,
QUERY_NODE_PHYSICAL_PLAN_MERGE
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
openMultiwayMergeOperator
,
doMultiwayMerge
,
NULL
,
destroyMultiwayMergeOperatorInfo
,
getMultiwayMergeExplainExecInfo
);
destroyMultiwayMergeOperatorInfo
,
optrDefaultBufFn
,
getMultiwayMergeExplainExecInfo
);
code
=
appendDownstream
(
pOperator
,
downStreams
,
numStreams
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
source/libs/executor/src/sysscanoperator.c
浏览文件 @
7ba56169
...
...
@@ -1437,7 +1437,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
setOperatorInfo
(
pOperator
,
"SysTableScanOperator"
,
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
exprSupp
.
numOfExprs
=
taosArrayGetSize
(
pInfo
->
pRes
->
pDataBlock
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
eratorDummyOpenFn
,
doSysTableScan
,
NULL
,
destroySysScanOperator
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
trDummyOpenFn
,
doSysTableScan
,
NULL
,
destroySysScanOperator
,
optrDefaultBufFn
,
NULL
);
return
pOperator
;
_error:
...
...
@@ -1943,7 +1943,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
setOperatorInfo
(
pOperator
,
"DataBlockDistScanOperator"
,
QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
eratorDummyOpenFn
,
doBlockInfoScan
,
NULL
,
destroyBlockDistScanOperatorInfo
,
NULL
);
createOperatorFpSet
(
op
trDummyOpenFn
,
doBlockInfoScan
,
NULL
,
destroyBlockDistScanOperatorInfo
,
optrDefaultBufFn
,
NULL
);
return
pOperator
;
_error:
...
...
source/libs/executor/src/timesliceoperator.c
浏览文件 @
7ba56169
...
...
@@ -544,7 +544,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
setOperatorInfo
(
pOperator
,
"TimeSliceOperator"
,
QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
eratorDummyOpenFn
,
doTimeslice
,
NULL
,
destroyTimeSliceOperatorInfo
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
trDummyOpenFn
,
doTimeslice
,
NULL
,
destroyTimeSliceOperatorInfo
,
optrDefaultBufFn
,
NULL
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
7ba56169
...
...
@@ -1050,13 +1050,13 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
}
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
SIntervalAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
int32_t
scanFlag
=
MAIN_SCAN
;
int64_t
st
=
taosGetTimestampUs
();
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
while
(
1
)
{
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
...
...
@@ -1268,9 +1268,6 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
}
SSDataBlock
*
pBlock
=
pInfo
->
binfo
.
pRes
;
ASSERT
(
pInfo
->
execModel
==
OPTR_EXEC_MODEL_BATCH
);
pTaskInfo
->
code
=
pOperator
->
fpSet
.
_openFn
(
pOperator
);
if
(
pTaskInfo
->
code
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
...
...
@@ -1765,7 +1762,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
pInfo
->
inputOrder
=
(
pPhyNode
->
window
.
inputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
pInfo
->
resultTsOrder
=
(
pPhyNode
->
window
.
outputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
pInfo
->
interval
=
interval
;
pInfo
->
execModel
=
pTaskInfo
->
execModel
;
pInfo
->
twAggSup
=
as
;
pInfo
->
binfo
.
mergeResultBlock
=
pPhyNode
->
window
.
mergeDataBlock
;
...
...
@@ -1797,7 +1793,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
doOpenIntervalAgg
,
doBuildIntervalResult
,
NULL
,
destroyIntervalOperatorInfo
,
NULL
);
createOperatorFpSet
(
doOpenIntervalAgg
,
doBuildIntervalResult
,
NULL
,
destroyIntervalOperatorInfo
,
optrDefaultBufFn
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -2015,7 +2011,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
setOperatorInfo
(
pOperator
,
"StateWindowOperator"
,
QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE
,
true
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
openStateWindowAggOptr
,
doStateWindowAgg
,
NULL
,
destroyStateWindowOperatorInfo
,
NULL
);
createOperatorFpSet
(
openStateWindowAggOptr
,
doStateWindowAgg
,
NULL
,
destroyStateWindowOperatorInfo
,
optrDefaultBufFn
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -2088,7 +2084,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
setOperatorInfo
(
pOperator
,
"SessionWindowAggOperator"
,
QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION
,
true
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
eratorDummyOpenFn
,
doSessionWindowAgg
,
NULL
,
destroySWindowOperatorInfo
,
NULL
);
createOperatorFpSet
(
op
trDummyOpenFn
,
doSessionWindowAgg
,
NULL
,
destroySWindowOperatorInfo
,
optrDefaultBufFn
,
NULL
);
pOperator
->
pTaskInfo
=
pTaskInfo
;
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -2749,7 +2745,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator
->
info
=
pInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
NULL
,
doStreamFinalIntervalAgg
,
NULL
,
destroyStreamFinalIntervalOperatorInfo
,
NULL
);
createOperatorFpSet
(
NULL
,
doStreamFinalIntervalAgg
,
NULL
,
destroyStreamFinalIntervalOperatorInfo
,
optrDefaultBufFn
,
NULL
);
if
(
pPhyNode
->
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
)
{
initIntervalDownStream
(
downstream
,
pPhyNode
->
type
,
&
pInfo
->
aggSup
,
&
pInfo
->
interval
,
&
pInfo
->
twAggSup
);
}
...
...
@@ -3561,7 +3557,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
setOperatorInfo
(
pOperator
,
"StreamSessionWindowAggOperator"
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION
,
true
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
eratorDummyOpenFn
,
doStreamSessionAgg
,
NULL
,
destroyStreamSessionAggOperatorInfo
,
NULL
);
createOperatorFpSet
(
op
trDummyOpenFn
,
doStreamSessionAgg
,
NULL
,
destroyStreamSessionAggOperatorInfo
,
optrDefaultBufFn
,
NULL
);
if
(
downstream
)
{
initDownStream
(
downstream
,
&
pInfo
->
streamAggSup
,
pOperator
->
operatorType
,
pInfo
->
primaryTsIndex
,
&
pInfo
->
twAggSup
);
...
...
@@ -3705,8 +3701,8 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
if
(
pPhyNode
->
type
!=
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION
)
{
pInfo
->
pUpdateRes
=
createSpecialDataBlock
(
STREAM_CLEAR
);
blockDataEnsureCapacity
(
pInfo
->
pUpdateRes
,
128
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
erato
rDummyOpenFn
,
doStreamSessionSemiAgg
,
NULL
,
destroyStreamSessionAggOperatorInfo
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
t
rDummyOpenFn
,
doStreamSessionSemiAgg
,
NULL
,
destroyStreamSessionAggOperatorInfo
,
optrDefaultBufFn
,
NULL
);
}
setOperatorInfo
(
pOperator
,
name
,
pPhyNode
->
type
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
...
...
@@ -4066,7 +4062,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
setOperatorInfo
(
pOperator
,
"StreamStateAggOperator"
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE
,
true
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
eratorDummyOpenFn
,
doStreamStateAgg
,
NULL
,
destroyStreamStateOperatorInfo
,
NULL
);
createOperatorFpSet
(
op
trDummyOpenFn
,
doStreamStateAgg
,
NULL
,
destroyStreamStateOperatorInfo
,
optrDefaultBufFn
,
NULL
);
initDownStream
(
downstream
,
&
pInfo
->
streamAggSup
,
pOperator
->
operatorType
,
pInfo
->
primaryTsIndex
,
&
pInfo
->
twAggSup
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -4314,7 +4310,6 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
iaInfo
->
win
=
pTaskInfo
->
window
;
iaInfo
->
inputOrder
=
TSDB_ORDER_ASC
;
iaInfo
->
interval
=
interval
;
iaInfo
->
execModel
=
pTaskInfo
->
execModel
;
iaInfo
->
primaryTsIndex
=
((
SColumnNode
*
)
pNode
->
window
.
pTspk
)
->
slotId
;
iaInfo
->
binfo
.
mergeResultBlock
=
pNode
->
window
.
mergeDataBlock
;
...
...
@@ -4344,7 +4339,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
false
,
OP_NOT_OPENED
,
miaInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
eratorDummyOpenFn
,
mergeAlignedIntervalAgg
,
NULL
,
destroyMAIOperatorInfo
,
NULL
);
createOperatorFpSet
(
op
trDummyOpenFn
,
mergeAlignedIntervalAgg
,
NULL
,
destroyMAIOperatorInfo
,
optrDefaultBufFn
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -4620,7 +4615,6 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
pIntervalInfo
->
win
=
pTaskInfo
->
window
;
pIntervalInfo
->
inputOrder
=
TSDB_ORDER_ASC
;
pIntervalInfo
->
interval
=
interval
;
pIntervalInfo
->
execModel
=
pTaskInfo
->
execModel
;
pIntervalInfo
->
binfo
.
mergeResultBlock
=
pIntervalPhyNode
->
window
.
mergeDataBlock
;
pIntervalInfo
->
primaryTsIndex
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
slotId
;
...
...
@@ -4650,7 +4644,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
setOperatorInfo
(
pOperator
,
"TimeMergeIntervalAggOperator"
,
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
,
false
,
OP_NOT_OPENED
,
pMergeIntervalInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
op
eratorDummyOpenFn
,
doMergeIntervalAgg
,
NULL
,
destroyMergeIntervalOperatorInfo
,
NULL
);
createOperatorFpSet
(
op
trDummyOpenFn
,
doMergeIntervalAgg
,
NULL
,
destroyMergeIntervalOperatorInfo
,
optrDefaultBufFn
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -4866,8 +4860,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
setOperatorInfo
(
pOperator
,
"StreamIntervalOperator"
,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
,
true
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamIntervalAgg
,
NULL
,
destroyStreamFinalIntervalOperatorInfo
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
optrDummyOpenFn
,
doStreamIntervalAgg
,
NULL
,
destroyStreamFinalIntervalOperatorInfo
,
optrDefaultBufFn
,
NULL
);
initIntervalDownStream
(
downstream
,
pPhyNode
->
type
,
&
pInfo
->
aggSup
,
&
pInfo
->
interval
,
&
pInfo
->
twAggSup
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
...
...
source/libs/function/src/detail/tavgfunction.c
浏览文件 @
7ba56169
...
...
@@ -270,7 +270,7 @@ static void i64VectorSumAVX2(const int64_t* plist, int32_t numOfRows, SAvgRes* p
#if __AVX2__
// find the start position that are aligned to 32bytes address in memory
int32_t
width
=
(
bitWidth
>>
3u
)
/
sizeof
(
int64_t
);
int32_t
width
=
(
bitWidth
>>
3u
)
/
sizeof
(
int64_t
);
int32_t
remainder
=
numOfRows
%
width
;
int32_t
rounds
=
numOfRows
/
width
;
...
...
@@ -286,21 +286,12 @@ static void i64VectorSumAVX2(const int64_t* plist, int32_t numOfRows, SAvgRes* p
}
// let sum up the final results
if
(
type
==
TSDB_DATA_TYPE_BIGINT
)
{
const
int64_t
*
q
=
(
const
int64_t
*
)
&
sum
;
pRes
->
sum
.
isum
+=
q
[
0
]
+
q
[
1
]
+
q
[
2
]
+
q
[
3
];
for
(
int32_t
j
=
0
;
j
<
remainder
;
++
j
)
{
pRes
->
sum
.
isum
+=
plist
[
j
+
rounds
*
width
];
}
}
else
{
const
uint64_t
*
q
=
(
const
uint64_t
*
)
&
sum
;
pRes
->
sum
.
usum
+=
q
[
0
]
+
q
[
1
]
+
q
[
2
]
+
q
[
3
];
for
(
int32_t
j
=
0
;
j
<
remainder
;
++
j
)
{
pRes
->
sum
.
usum
+=
(
uint64_t
)
plist
[
j
+
rounds
*
width
];
}
}
#endif
}
...
...
@@ -588,14 +579,14 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
const
int64_t
*
plist
=
(
const
int64_t
*
)
pCol
->
pData
;
// 1. If the CPU supports AVX, let's employ AVX instructions to speedup this loop
if
(
simdAvailable
)
{
if
(
simdAvailable
&&
type
==
TSDB_DATA_TYPE_BIGINT
)
{
i64VectorSumAVX2
(
plist
,
numOfRows
,
pAvgRes
);
}
else
{
for
(
int32_t
i
=
pInput
->
startRowIndex
;
i
<
pInput
->
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
if
(
type
==
TSDB_DATA_TYPE_BIGINT
)
{
pAvgRes
->
sum
.
isum
+=
plist
[
i
];
}
else
{
pAvgRes
->
sum
.
i
sum
+=
(
uint64_t
)
plist
[
i
];
pAvgRes
->
sum
.
u
sum
+=
(
uint64_t
)
plist
[
i
];
}
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录