Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
5c2ac1a6
T
TDengine
项目概览
taosdata
/
TDengine
12 个月 前同步成功
通知
1180
Star
22014
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
5c2ac1a6
编写于
11月 30, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
11月 30, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #18591 from taosdata/fix/liao_cov
refactor: do some internal refactor.
上级
c0ecd4ce
2c8b962b
变更
12
展开全部
隐藏空白更改
内联
并排
Showing
12 changed file
with
1535 addition
and
1541 deletion
+1535
-1541
include/libs/executor/executor.h
include/libs/executor/executor.h
+0
-7
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+2
-0
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+2
-22
source/libs/executor/inc/tfill.h
source/libs/executor/inc/tfill.h
+10
-6
source/libs/executor/src/exchangeoperator.c
source/libs/executor/src/exchangeoperator.c
+0
-11
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+10
-0
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+14
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+0
-349
source/libs/executor/src/filloperator.c
source/libs/executor/src/filloperator.c
+1494
-0
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+0
-24
source/libs/executor/src/tfill.c
source/libs/executor/src/tfill.c
+2
-1111
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+1
-11
未找到文件。
include/libs/executor/executor.h
浏览文件 @
5c2ac1a6
...
...
@@ -160,13 +160,6 @@ int32_t qAsyncKillTask(qTaskInfo_t tinfo);
*/
void
qDestroyTask
(
qTaskInfo_t
tinfo
);
/**
* Get the queried table uid
* @param qHandle
* @return
*/
int64_t
qGetQueriedTableUid
(
qTaskInfo_t
tinfo
);
/**
* Extract the qualified table id list, and than pass them to the TSDB driver to load the required table data blocks.
*
...
...
source/libs/executor/inc/executil.h
浏览文件 @
5c2ac1a6
...
...
@@ -161,4 +161,6 @@ int32_t convertFillType(int32_t mode);
int32_t
resultrowComparAsc
(
const
void
*
p1
,
const
void
*
p2
);
int32_t
isQualifiedTable
(
STableKeyInfo
*
info
,
SNode
*
pTagCond
,
void
*
metaHandle
,
bool
*
pQualified
);
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
);
#endif // TDENGINE_QUERYUTIL_H
source/libs/executor/inc/executorimpl.h
浏览文件 @
5c2ac1a6
...
...
@@ -538,23 +538,6 @@ typedef struct SStreamIntervalOperatorInfo {
SWinKey
delKey
;
}
SStreamIntervalOperatorInfo
;
typedef
struct
SFillOperatorInfo
{
struct
SFillInfo
*
pFillInfo
;
SSDataBlock
*
pRes
;
SSDataBlock
*
pFinalRes
;
int64_t
totalInputRows
;
void
**
p
;
SSDataBlock
*
existNewGroupBlock
;
STimeWindow
win
;
SColMatchInfo
matchInfo
;
int32_t
primaryTsCol
;
int32_t
primarySrcSlotId
;
uint64_t
curGroupId
;
// current handled group id
SExprInfo
*
pExprInfo
;
int32_t
numOfExpr
;
SExprSupp
noFillExprSupp
;
}
SFillOperatorInfo
;
typedef
struct
SDataGroupInfo
{
uint64_t
groupId
;
int64_t
numOfRows
;
...
...
@@ -806,8 +789,6 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
int32_t
createDataSinkParam
(
SDataSinkNode
*
pNode
,
void
**
pParam
,
qTaskInfo_t
*
pTaskInfo
,
SReadHandle
*
readHandle
);
int32_t
getOperatorExplainExecInfo
(
SOperatorInfo
*
operatorInfo
,
SArray
*
pExecInfoList
);
void
printTaskExecCostInLog
(
SExecTaskInfo
*
pTaskInfo
);
int32_t
getMaximumIdleDurationSec
();
STimeWindow
getActiveTimeWindow
(
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int64_t
ts
,
SInterval
*
pInterval
,
...
...
@@ -825,9 +806,8 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
bool
isDeletedStreamWindow
(
STimeWindow
*
pWin
,
uint64_t
groupId
,
SStreamState
*
pState
,
STimeWindowAggSupp
*
pTwSup
);
void
appendOneRowToStreamSpecialBlock
(
SSDataBlock
*
pBlock
,
TSKEY
*
pStartTs
,
TSKEY
*
pEndTs
,
uint64_t
*
pUid
,
uint64_t
*
pGp
,
void
*
pTbName
);
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
);
uint64_t
calGroupIdByData
(
SPartitionBySupporter
*
pParSup
,
SExprSupp
*
pExprSup
,
SSDataBlock
*
pBlock
,
int32_t
rowId
);
void
calBlockTbName
(
SStreamScanInfo
*
pInfo
,
SSDataBlock
*
pBlock
);
uint64_t
calGroupIdByData
(
SPartitionBySupporter
*
pParSup
,
SExprSupp
*
pExprSup
,
SSDataBlock
*
pBlock
,
int32_t
rowId
);
void
calBlockTbName
(
SStreamScanInfo
*
pInfo
,
SSDataBlock
*
pBlock
);
int32_t
finalizeResultRows
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
resultRowPosition
,
SExprSupp
*
pSup
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
...
...
source/libs/executor/inc/tfill.h
浏览文件 @
5c2ac1a6
...
...
@@ -25,6 +25,8 @@ extern "C" {
#include "tcommon.h"
#include "tsimplehash.h"
#define GET_DEST_SLOT_ID(_p) ((_p)->pExpr->base.resSchema.slotId)
struct
SSDataBlock
;
typedef
struct
SFillColInfo
{
...
...
@@ -113,12 +115,12 @@ typedef struct SStreamFillInfo {
int64_t
getNumOfResultsAfterFillGap
(
SFillInfo
*
pFillInfo
,
int64_t
ekey
,
int32_t
maxNumOfRows
);
void
taosFillSetStartInfo
(
struct
SFillInfo
*
pFillInfo
,
int32_t
numOfRows
,
TSKEY
endKey
);
void
taosResetFillInfo
(
struct
SFillInfo
*
pFillInfo
,
TSKEY
startTimestamp
);
void
taosFillSetInputDataBlock
(
struct
SFillInfo
*
pFillInfo
,
const
struct
SSDataBlock
*
pInput
);
struct
SFillColInfo
*
createFillColInfo
(
SExprInfo
*
pExpr
,
int32_t
numOfFillExpr
,
SExprInfo
*
pNotFillExpr
,
int32_t
numOfNotFillCols
,
const
struct
SNodeListNode
*
val
);
bool
taosFillHasMoreResults
(
struct
SFillInfo
*
pFillInfo
);
void
taosFillSetStartInfo
(
struct
SFillInfo
*
pFillInfo
,
int32_t
numOfRows
,
TSKEY
endKey
);
void
taosResetFillInfo
(
struct
SFillInfo
*
pFillInfo
,
TSKEY
startTimestamp
);
void
taosFillSetInputDataBlock
(
struct
SFillInfo
*
pFillInfo
,
const
struct
SSDataBlock
*
pInput
);
SFillColInfo
*
createFillColInfo
(
SExprInfo
*
pExpr
,
int32_t
numOfFillExpr
,
SExprInfo
*
pNotFillExpr
,
int32_t
numOfNotFillCols
,
const
struct
SNodeListNode
*
val
);
bool
taosFillHasMoreResults
(
struct
SFillInfo
*
pFillInfo
);
SFillInfo
*
taosCreateFillInfo
(
TSKEY
skey
,
int32_t
numOfFillCols
,
int32_t
numOfNotFillCols
,
int32_t
capacity
,
SInterval
*
pInterval
,
int32_t
fillType
,
struct
SFillColInfo
*
pCol
,
int32_t
slotId
,
...
...
@@ -128,6 +130,8 @@ void* taosDestroyFillInfo(struct SFillInfo* pFillInfo);
int64_t
taosFillResultDataBlock
(
struct
SFillInfo
*
pFillInfo
,
SSDataBlock
*
p
,
int32_t
capacity
);
int64_t
getFillInfoStart
(
struct
SFillInfo
*
pFillInfo
);
bool
fillIfWindowPseudoColumn
(
SFillInfo
*
pFillInfo
,
SFillColInfo
*
pCol
,
SColumnInfoData
*
pDstColInfoData
,
int32_t
rowIndex
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/executor/src/exchangeoperator.c
浏览文件 @
5c2ac1a6
...
...
@@ -15,26 +15,15 @@
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
#include "os.h"
#include "querynodes.h"
#include "tfill.h"
#include "tname.h"
#include "tref.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tsort.h"
#include "ttime.h"
#include "executorimpl.h"
#include "index.h"
#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
#include "vnode.h"
typedef
struct
SFetchRspHandleWrapper
{
uint32_t
exchangeId
;
...
...
source/libs/executor/src/executil.c
浏览文件 @
5c2ac1a6
...
...
@@ -2002,3 +2002,13 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
return
TSDB_CODE_SUCCESS
;
}
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
)
{
if
(
!
pBlock
||
pBlock
->
info
.
rows
==
0
)
{
qDebug
(
"===stream===printDataBlock: Block is Null or Empty"
);
return
;
}
char
*
pBuf
=
NULL
;
qDebug
(
"%s"
,
dumpBlockData
(
pBlock
,
flag
,
&
pBuf
));
taosMemoryFree
(
pBuf
);
}
\ No newline at end of file
source/libs/executor/src/executor.c
浏览文件 @
5c2ac1a6
...
...
@@ -704,6 +704,20 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
return
TSDB_CODE_SUCCESS
;
}
static
void
printTaskExecCostInLog
(
SExecTaskInfo
*
pTaskInfo
)
{
STaskCostInfo
*
pSummary
=
&
pTaskInfo
->
cost
;
SFileBlockLoadRecorder
*
pRecorder
=
pSummary
->
pRecoder
;
if
(
pSummary
->
pRecoder
!=
NULL
)
{
qDebug
(
"%s :cost summary: elapsed time:%.2f ms, extract tableList:%.2f ms, createGroupIdMap:%.2f ms, total blocks:%d, "
"load block SMA:%d, load data block:%d, total rows:%"
PRId64
", check rows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pSummary
->
elapsedTime
/
1000
.
0
,
pSummary
->
extractListTime
,
pSummary
->
groupIdMapTime
,
pRecorder
->
totalBlocks
,
pRecorder
->
loadBlockStatis
,
pRecorder
->
loadBlocks
,
pRecorder
->
totalRows
,
pRecorder
->
totalCheckedRows
);
}
}
void
qDestroyTask
(
qTaskInfo_t
qTaskHandle
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
qTaskHandle
;
if
(
pTaskInfo
==
NULL
)
{
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
5c2ac1a6
...
...
@@ -91,7 +91,6 @@ static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock*
static
void
releaseQueryBuf
(
size_t
numOfTables
);
static
void
destroyFillOperatorInfo
(
void
*
param
);
static
void
destroyAggOperatorInfo
(
void
*
param
);
static
void
initCtxOutputBuffer
(
SqlFunctionCtx
*
pCtx
,
int32_t
size
);
static
void
doSetTableGroupOutputBuf
(
SOperatorInfo
*
pOperator
,
int32_t
numOfOutput
,
uint64_t
groupId
);
...
...
@@ -1157,20 +1156,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
}
}
void
printTaskExecCostInLog
(
SExecTaskInfo
*
pTaskInfo
)
{
STaskCostInfo
*
pSummary
=
&
pTaskInfo
->
cost
;
SFileBlockLoadRecorder
*
pRecorder
=
pSummary
->
pRecoder
;
if
(
pSummary
->
pRecoder
!=
NULL
)
{
qDebug
(
"%s :cost summary: elapsed time:%.2f ms, extract tableList:%.2f ms, createGroupIdMap:%.2f ms, total blocks:%d, "
"load block SMA:%d, load data block:%d, total rows:%"
PRId64
", check rows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pSummary
->
elapsedTime
/
1000
.
0
,
pSummary
->
extractListTime
,
pSummary
->
groupIdMapTime
,
pRecorder
->
totalBlocks
,
pRecorder
->
loadBlockStatis
,
pRecorder
->
loadBlocks
,
pRecorder
->
totalRows
,
pRecorder
->
totalCheckedRows
);
}
}
// void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) {
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//
...
...
@@ -1603,179 +1588,6 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
return
(
rows
==
0
)
?
NULL
:
pInfo
->
pRes
;
}
static
void
doHandleRemainBlockForNewGroupImpl
(
SOperatorInfo
*
pOperator
,
SFillOperatorInfo
*
pInfo
,
SResultInfo
*
pResultInfo
,
SExecTaskInfo
*
pTaskInfo
)
{
pInfo
->
totalInputRows
=
pInfo
->
existNewGroupBlock
->
info
.
rows
;
SSDataBlock
*
pResBlock
=
pInfo
->
pFinalRes
;
int32_t
order
=
TSDB_ORDER_ASC
;
int32_t
scanFlag
=
MAIN_SCAN
;
getTableScanInfo
(
pOperator
,
&
order
,
&
scanFlag
);
int64_t
ekey
=
pInfo
->
existNewGroupBlock
->
info
.
window
.
ekey
;
taosResetFillInfo
(
pInfo
->
pFillInfo
,
getFillInfoStart
(
pInfo
->
pFillInfo
));
blockDataCleanup
(
pInfo
->
pRes
);
doApplyScalarCalculation
(
pOperator
,
pInfo
->
existNewGroupBlock
,
order
,
scanFlag
);
taosFillSetStartInfo
(
pInfo
->
pFillInfo
,
pInfo
->
pRes
->
info
.
rows
,
ekey
);
taosFillSetInputDataBlock
(
pInfo
->
pFillInfo
,
pInfo
->
pRes
);
int32_t
numOfResultRows
=
pResultInfo
->
capacity
-
pResBlock
->
info
.
rows
;
taosFillResultDataBlock
(
pInfo
->
pFillInfo
,
pResBlock
,
numOfResultRows
);
pInfo
->
curGroupId
=
pInfo
->
existNewGroupBlock
->
info
.
id
.
groupId
;
pInfo
->
existNewGroupBlock
=
NULL
;
}
static
void
doHandleRemainBlockFromNewGroup
(
SOperatorInfo
*
pOperator
,
SFillOperatorInfo
*
pInfo
,
SResultInfo
*
pResultInfo
,
SExecTaskInfo
*
pTaskInfo
)
{
if
(
taosFillHasMoreResults
(
pInfo
->
pFillInfo
))
{
int32_t
numOfResultRows
=
pResultInfo
->
capacity
-
pInfo
->
pFinalRes
->
info
.
rows
;
taosFillResultDataBlock
(
pInfo
->
pFillInfo
,
pInfo
->
pFinalRes
,
numOfResultRows
);
pInfo
->
pRes
->
info
.
id
.
groupId
=
pInfo
->
curGroupId
;
return
;
}
// handle the cached new group data block
if
(
pInfo
->
existNewGroupBlock
)
{
doHandleRemainBlockForNewGroupImpl
(
pOperator
,
pInfo
,
pResultInfo
,
pTaskInfo
);
}
}
static
void
doApplyScalarCalculation
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
)
{
SFillOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
setInputDataBlock
(
pSup
,
pBlock
,
order
,
scanFlag
,
false
);
projectApplyFunctions
(
pSup
->
pExprInfo
,
pInfo
->
pRes
,
pBlock
,
pSup
->
pCtx
,
pSup
->
numOfExprs
,
NULL
);
// reset the row value before applying the no-fill functions to the input data block, which is "pBlock" in this case.
pInfo
->
pRes
->
info
.
rows
=
0
;
SExprSupp
*
pNoFillSupp
=
&
pInfo
->
noFillExprSupp
;
setInputDataBlock
(
pNoFillSupp
,
pBlock
,
order
,
scanFlag
,
false
);
projectApplyFunctions
(
pNoFillSupp
->
pExprInfo
,
pInfo
->
pRes
,
pBlock
,
pNoFillSupp
->
pCtx
,
pNoFillSupp
->
numOfExprs
,
NULL
);
pInfo
->
pRes
->
info
.
id
.
groupId
=
pBlock
->
info
.
id
.
groupId
;
}
static
SSDataBlock
*
doFillImpl
(
SOperatorInfo
*
pOperator
)
{
SFillOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SResultInfo
*
pResultInfo
=
&
pOperator
->
resultInfo
;
SSDataBlock
*
pResBlock
=
pInfo
->
pFinalRes
;
blockDataCleanup
(
pResBlock
);
int32_t
order
=
TSDB_ORDER_ASC
;
int32_t
scanFlag
=
MAIN_SCAN
;
getTableScanInfo
(
pOperator
,
&
order
,
&
scanFlag
);
doHandleRemainBlockFromNewGroup
(
pOperator
,
pInfo
,
pResultInfo
,
pTaskInfo
);
if
(
pResBlock
->
info
.
rows
>
0
)
{
pResBlock
->
info
.
id
.
groupId
=
pInfo
->
curGroupId
;
return
pResBlock
;
}
SOperatorInfo
*
pDownstream
=
pOperator
->
pDownstream
[
0
];
while
(
1
)
{
SSDataBlock
*
pBlock
=
pDownstream
->
fpSet
.
getNextFn
(
pDownstream
);
if
(
pBlock
==
NULL
)
{
if
(
pInfo
->
totalInputRows
==
0
)
{
setOperatorCompleted
(
pOperator
);
return
NULL
;
}
taosFillSetStartInfo
(
pInfo
->
pFillInfo
,
0
,
pInfo
->
win
.
ekey
);
}
else
{
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
primarySrcSlotId
);
blockDataCleanup
(
pInfo
->
pRes
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pBlock
->
info
.
rows
);
blockDataEnsureCapacity
(
pInfo
->
pFinalRes
,
pBlock
->
info
.
rows
);
doApplyScalarCalculation
(
pOperator
,
pBlock
,
order
,
scanFlag
);
if
(
pInfo
->
curGroupId
==
0
||
pInfo
->
curGroupId
==
pInfo
->
pRes
->
info
.
id
.
groupId
)
{
pInfo
->
curGroupId
=
pInfo
->
pRes
->
info
.
id
.
groupId
;
// the first data block
pInfo
->
totalInputRows
+=
pInfo
->
pRes
->
info
.
rows
;
if
(
order
==
pInfo
->
pFillInfo
->
order
)
{
taosFillSetStartInfo
(
pInfo
->
pFillInfo
,
pInfo
->
pRes
->
info
.
rows
,
pBlock
->
info
.
window
.
ekey
);
}
else
{
taosFillSetStartInfo
(
pInfo
->
pFillInfo
,
pInfo
->
pRes
->
info
.
rows
,
pBlock
->
info
.
window
.
skey
);
}
taosFillSetInputDataBlock
(
pInfo
->
pFillInfo
,
pInfo
->
pRes
);
}
else
if
(
pInfo
->
curGroupId
!=
pBlock
->
info
.
id
.
groupId
)
{
// the new group data block
pInfo
->
existNewGroupBlock
=
pBlock
;
// Fill the previous group data block, before handle the data block of new group.
// Close the fill operation for previous group data block
taosFillSetStartInfo
(
pInfo
->
pFillInfo
,
0
,
pInfo
->
win
.
ekey
);
}
}
int32_t
numOfResultRows
=
pOperator
->
resultInfo
.
capacity
-
pResBlock
->
info
.
rows
;
taosFillResultDataBlock
(
pInfo
->
pFillInfo
,
pResBlock
,
numOfResultRows
);
// current group has no more result to return
if
(
pResBlock
->
info
.
rows
>
0
)
{
// 1. The result in current group not reach the threshold of output result, continue
// 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
if
(
pResBlock
->
info
.
rows
>
pResultInfo
->
threshold
||
pBlock
==
NULL
||
pInfo
->
existNewGroupBlock
!=
NULL
)
{
pResBlock
->
info
.
id
.
groupId
=
pInfo
->
curGroupId
;
return
pResBlock
;
}
doHandleRemainBlockFromNewGroup
(
pOperator
,
pInfo
,
pResultInfo
,
pTaskInfo
);
if
(
pResBlock
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
||
pBlock
==
NULL
)
{
pResBlock
->
info
.
id
.
groupId
=
pInfo
->
curGroupId
;
return
pResBlock
;
}
}
else
if
(
pInfo
->
existNewGroupBlock
)
{
// try next group
assert
(
pBlock
!=
NULL
);
blockDataCleanup
(
pResBlock
);
doHandleRemainBlockForNewGroupImpl
(
pOperator
,
pInfo
,
pResultInfo
,
pTaskInfo
);
if
(
pResBlock
->
info
.
rows
>
pResultInfo
->
threshold
)
{
pResBlock
->
info
.
id
.
groupId
=
pInfo
->
curGroupId
;
return
pResBlock
;
}
}
else
{
return
NULL
;
}
}
}
static
SSDataBlock
*
doFill
(
SOperatorInfo
*
pOperator
)
{
SFillOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
SSDataBlock
*
fillResult
=
NULL
;
while
(
true
)
{
fillResult
=
doFillImpl
(
pOperator
);
if
(
fillResult
==
NULL
)
{
setOperatorCompleted
(
pOperator
);
break
;
}
doFilter
(
fillResult
,
pOperator
->
exprSupp
.
pFilterInfo
,
&
pInfo
->
matchInfo
);
if
(
fillResult
->
info
.
rows
>
0
)
{
break
;
}
}
if
(
fillResult
!=
NULL
)
{
pOperator
->
resultInfo
.
totalRows
+=
fillResult
->
info
.
rows
;
}
return
fillResult
;
}
void
destroyExprInfo
(
SExprInfo
*
pExpr
,
int32_t
numOfExprs
)
{
for
(
int32_t
i
=
0
;
i
<
numOfExprs
;
++
i
)
{
SExprInfo
*
pExprInfo
=
&
pExpr
[
i
];
...
...
@@ -2045,167 +1857,6 @@ void destroyAggOperatorInfo(void* param) {
taosMemoryFreeClear
(
param
);
}
void
destroyFillOperatorInfo
(
void
*
param
)
{
SFillOperatorInfo
*
pInfo
=
(
SFillOperatorInfo
*
)
param
;
pInfo
->
pFillInfo
=
taosDestroyFillInfo
(
pInfo
->
pFillInfo
);
pInfo
->
pRes
=
blockDataDestroy
(
pInfo
->
pRes
);
pInfo
->
pFinalRes
=
blockDataDestroy
(
pInfo
->
pFinalRes
);
cleanupExprSupp
(
&
pInfo
->
noFillExprSupp
);
taosMemoryFreeClear
(
pInfo
->
p
);
taosArrayDestroy
(
pInfo
->
matchInfo
.
pList
);
taosMemoryFreeClear
(
param
);
}
static
int32_t
initFillInfo
(
SFillOperatorInfo
*
pInfo
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SExprInfo
*
pNotFillExpr
,
int32_t
numOfNotFillCols
,
SNodeListNode
*
pValNode
,
STimeWindow
win
,
int32_t
capacity
,
const
char
*
id
,
SInterval
*
pInterval
,
int32_t
fillType
,
int32_t
order
)
{
SFillColInfo
*
pColInfo
=
createFillColInfo
(
pExpr
,
numOfCols
,
pNotFillExpr
,
numOfNotFillCols
,
pValNode
);
int64_t
startKey
=
(
order
==
TSDB_ORDER_ASC
)
?
win
.
skey
:
win
.
ekey
;
STimeWindow
w
=
getAlignQueryTimeWindow
(
pInterval
,
pInterval
->
precision
,
startKey
);
w
=
getFirstQualifiedTimeWindow
(
startKey
,
&
w
,
pInterval
,
order
);
pInfo
->
pFillInfo
=
taosCreateFillInfo
(
w
.
skey
,
numOfCols
,
numOfNotFillCols
,
capacity
,
pInterval
,
fillType
,
pColInfo
,
pInfo
->
primaryTsCol
,
order
,
id
);
if
(
order
==
TSDB_ORDER_ASC
)
{
pInfo
->
win
.
skey
=
win
.
skey
;
pInfo
->
win
.
ekey
=
win
.
ekey
;
}
else
{
pInfo
->
win
.
skey
=
win
.
ekey
;
pInfo
->
win
.
ekey
=
win
.
skey
;
}
pInfo
->
p
=
taosMemoryCalloc
(
numOfCols
,
POINTER_BYTES
);
if
(
pInfo
->
pFillInfo
==
NULL
||
pInfo
->
p
==
NULL
)
{
taosMemoryFree
(
pInfo
->
pFillInfo
);
taosMemoryFree
(
pInfo
->
p
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
else
{
return
TSDB_CODE_SUCCESS
;
}
}
static
bool
isWstartColumnExist
(
SFillOperatorInfo
*
pInfo
)
{
if
(
pInfo
->
noFillExprSupp
.
numOfExprs
==
0
)
{
return
false
;
}
for
(
int32_t
i
=
0
;
i
<
pInfo
->
noFillExprSupp
.
numOfExprs
;
++
i
)
{
SExprInfo
*
exprInfo
=
pInfo
->
noFillExprSupp
.
pExprInfo
+
i
;
if
(
exprInfo
->
pExpr
->
nodeType
==
QUERY_NODE_COLUMN
&&
exprInfo
->
base
.
numOfParams
==
1
&&
exprInfo
->
base
.
pParam
[
0
].
pCol
->
colType
==
COLUMN_TYPE_WINDOW_START
)
{
return
true
;
}
}
return
false
;
}
static
int32_t
createPrimaryTsExprIfNeeded
(
SFillOperatorInfo
*
pInfo
,
SFillPhysiNode
*
pPhyFillNode
,
SExprSupp
*
pExprSupp
,
const
char
*
idStr
)
{
bool
wstartExist
=
isWstartColumnExist
(
pInfo
);
if
(
wstartExist
==
false
)
{
if
(
pPhyFillNode
->
pWStartTs
->
type
!=
QUERY_NODE_TARGET
)
{
qError
(
"pWStartTs of fill physical node is not a target node, %s"
,
idStr
);
return
TSDB_CODE_QRY_SYS_ERROR
;
}
SExprInfo
*
pExpr
=
taosMemoryRealloc
(
pExprSupp
->
pExprInfo
,
(
pExprSupp
->
numOfExprs
+
1
)
*
sizeof
(
SExprInfo
));
if
(
pExpr
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
createExprFromTargetNode
(
&
pExpr
[
pExprSupp
->
numOfExprs
],
(
STargetNode
*
)
pPhyFillNode
->
pWStartTs
);
pExprSupp
->
numOfExprs
+=
1
;
pExprSupp
->
pExprInfo
=
pExpr
;
}
return
TSDB_CODE_SUCCESS
;
}
SOperatorInfo
*
createFillOperatorInfo
(
SOperatorInfo
*
downstream
,
SFillPhysiNode
*
pPhyFillNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SFillOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SFillOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
goto
_error
;
}
pInfo
->
pRes
=
createDataBlockFromDescNode
(
pPhyFillNode
->
node
.
pOutputDataBlockDesc
);
SExprInfo
*
pExprInfo
=
createExprInfo
(
pPhyFillNode
->
pFillExprs
,
NULL
,
&
pInfo
->
numOfExpr
);
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
SExprSupp
*
pNoFillSupp
=
&
pInfo
->
noFillExprSupp
;
pNoFillSupp
->
pExprInfo
=
createExprInfo
(
pPhyFillNode
->
pNotFillExprs
,
NULL
,
&
pNoFillSupp
->
numOfExprs
);
int32_t
code
=
createPrimaryTsExprIfNeeded
(
pInfo
,
pPhyFillNode
,
pNoFillSupp
,
pTaskInfo
->
id
.
str
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
code
=
initExprSupp
(
pNoFillSupp
,
pNoFillSupp
->
pExprInfo
,
pNoFillSupp
->
numOfExprs
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
SInterval
*
pInterval
=
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL
==
downstream
->
operatorType
?
&
((
SMergeAlignedIntervalAggOperatorInfo
*
)
downstream
->
info
)
->
intervalAggOperatorInfo
->
interval
:
&
((
SIntervalAggOperatorInfo
*
)
downstream
->
info
)
->
interval
;
int32_t
order
=
(
pPhyFillNode
->
inputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
int32_t
type
=
convertFillType
(
pPhyFillNode
->
mode
);
SResultInfo
*
pResultInfo
=
&
pOperator
->
resultInfo
;
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
code
=
initExprSupp
(
&
pOperator
->
exprSupp
,
pExprInfo
,
pInfo
->
numOfExpr
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
pInfo
->
primaryTsCol
=
((
STargetNode
*
)
pPhyFillNode
->
pWStartTs
)
->
slotId
;
pInfo
->
primarySrcSlotId
=
((
SColumnNode
*
)((
STargetNode
*
)
pPhyFillNode
->
pWStartTs
)
->
pExpr
)
->
slotId
;
int32_t
numOfOutputCols
=
0
;
code
=
extractColMatchInfo
(
pPhyFillNode
->
pFillExprs
,
pPhyFillNode
->
node
.
pOutputDataBlockDesc
,
&
numOfOutputCols
,
COL_MATCH_FROM_SLOT_ID
,
&
pInfo
->
matchInfo
);
code
=
initFillInfo
(
pInfo
,
pExprInfo
,
pInfo
->
numOfExpr
,
pNoFillSupp
->
pExprInfo
,
pNoFillSupp
->
numOfExprs
,
(
SNodeListNode
*
)
pPhyFillNode
->
pValues
,
pPhyFillNode
->
timeRange
,
pResultInfo
->
capacity
,
pTaskInfo
->
id
.
str
,
pInterval
,
type
,
order
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
pInfo
->
pFinalRes
=
createOneDataBlock
(
pInfo
->
pRes
,
false
);
blockDataEnsureCapacity
(
pInfo
->
pFinalRes
,
pOperator
->
resultInfo
.
capacity
);
code
=
filterInitFromNode
((
SNode
*
)
pPhyFillNode
->
node
.
pConditions
,
&
pOperator
->
exprSupp
.
pFilterInfo
,
0
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
setOperatorInfo
(
pOperator
,
"FillOperator"
,
QUERY_NODE_PHYSICAL_PLAN_FILL
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
exprSupp
.
numOfExprs
=
pInfo
->
numOfExpr
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doFill
,
NULL
,
destroyFillOperatorInfo
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
_error:
if
(
pInfo
!=
NULL
)
{
destroyFillOperatorInfo
(
pInfo
);
}
pTaskInfo
->
code
=
code
;
taosMemoryFreeClear
(
pOperator
);
return
NULL
;
}
static
SExecTaskInfo
*
createExecTaskInfo
(
uint64_t
queryId
,
uint64_t
taskId
,
EOPTR_EXEC_MODEL
model
,
char
*
dbFName
)
{
SExecTaskInfo
*
pTaskInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SExecTaskInfo
));
if
(
pTaskInfo
==
NULL
)
{
...
...
source/libs/executor/src/filloperator.c
0 → 100644
浏览文件 @
5c2ac1a6
此差异已折叠。
点击以展开。
source/libs/executor/src/scanoperator.c
浏览文件 @
5c2ac1a6
...
...
@@ -1339,30 +1339,6 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
return
code
;
}
static
void
calBlockTag
(
SExprSupp
*
pTagCalSup
,
SSDataBlock
*
pBlock
,
SSDataBlock
*
pResBlock
)
{
if
(
pTagCalSup
==
NULL
||
pTagCalSup
->
numOfExprs
==
0
)
return
;
if
(
pBlock
==
NULL
||
pBlock
->
info
.
rows
==
0
)
return
;
SSDataBlock
*
pSrcBlock
=
blockCopyOneRow
(
pBlock
,
0
);
ASSERT
(
pSrcBlock
->
info
.
rows
==
1
);
blockDataEnsureCapacity
(
pResBlock
,
1
);
projectApplyFunctions
(
pTagCalSup
->
pExprInfo
,
pResBlock
,
pSrcBlock
,
pTagCalSup
->
pCtx
,
1
,
NULL
);
ASSERT
(
pResBlock
->
info
.
rows
==
1
);
// build tagArray
/*SArray* tagArray = taosArrayInit(0, sizeof(void*));*/
/*STagVal tagVal = {*/
/*.cid = 0,*/
/*.type = 0,*/
/*};*/
// build STag
// set STag
blockDataDestroy
(
pSrcBlock
);
}
void
calBlockTbName
(
SStreamScanInfo
*
pInfo
,
SSDataBlock
*
pBlock
)
{
SExprSupp
*
pTbNameCalSup
=
&
pInfo
->
tbnameCalSup
;
SStreamState
*
pState
=
pInfo
->
pStreamScanOp
->
pTaskInfo
->
streamInfo
.
pState
;
...
...
source/libs/executor/src/tfill.c
浏览文件 @
5c2ac1a6
此差异已折叠。
点击以展开。
source/libs/executor/src/timewindowoperator.c
浏览文件 @
5c2ac1a6
...
...
@@ -677,16 +677,6 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
}
}
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
)
{
if
(
!
pBlock
||
pBlock
->
info
.
rows
==
0
)
{
qDebug
(
"===stream===printDataBlock: Block is Null or Empty"
);
return
;
}
char
*
pBuf
=
NULL
;
qDebug
(
"%s"
,
dumpBlockData
(
pBlock
,
flag
,
&
pBuf
));
taosMemoryFree
(
pBuf
);
}
typedef
int32_t
(
*
__compare_fn_t
)(
void
*
pKey
,
void
*
data
,
int32_t
index
);
int32_t
binarySearchCom
(
void
*
keyList
,
int
num
,
void
*
pKey
,
int
order
,
__compare_fn_t
comparefn
)
{
...
...
@@ -3854,7 +3844,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SStreamStateAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
int32_t
numOfOutput
=
pOperator
->
exprSupp
.
numOfExprs
;
int64_t
groupId
=
pSDataBlock
->
info
.
id
.
groupId
;
uint64_t
groupId
=
pSDataBlock
->
info
.
id
.
groupId
;
int64_t
code
=
TSDB_CODE_SUCCESS
;
TSKEY
*
tsCols
=
NULL
;
SResultRow
*
pResult
=
NULL
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录