Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c58d2bde
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c58d2bde
编写于
8月 04, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/TD-11274-3.0
上级
e39da950
6b06b2d9
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
120 addition
and
65 deletion
+120
-65
cmake/taosadapter_CMakeLists.txt.in
cmake/taosadapter_CMakeLists.txt.in
+1
-1
cmake/taostools_CMakeLists.txt.in
cmake/taostools_CMakeLists.txt.in
+1
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+11
-8
source/libs/executor/src/projectoperator.c
source/libs/executor/src/projectoperator.c
+17
-16
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+8
-9
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+27
-24
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+54
-1
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+1
-4
source/libs/function/src/udfd.c
source/libs/function/src/udfd.c
+0
-1
未找到文件。
cmake/taosadapter_CMakeLists.txt.in
浏览文件 @
c58d2bde
...
...
@@ -2,7 +2,7 @@
# taosadapter
ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
GIT_TAG
88d26c3
GIT_TAG
766dcc4
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
...
...
cmake/taostools_CMakeLists.txt.in
浏览文件 @
c58d2bde
...
...
@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG
58f58ee
GIT_TAG
8157e3b
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
c58d2bde
...
...
@@ -301,7 +301,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
pResultRowInfo
->
cur
=
(
SResultRowPosition
){.
pageId
=
pResult
->
pageId
,
.
offset
=
pResult
->
offset
};
// too many time window in query
if
(
taosHashGetSize
(
pSup
->
pResultRowHashTable
)
>
MAX_INTERVAL_TIME_WINDOW
)
{
if
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_BATCH
&&
taosHashGetSize
(
pSup
->
pResultRowHashTable
)
>
MAX_INTERVAL_TIME_WINDOW
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW
);
}
...
...
@@ -3598,7 +3599,8 @@ void doDestroyExchangeOperatorInfo(void* param) {
}
static
int32_t
initFillInfo
(
SFillOperatorInfo
*
pInfo
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SNodeListNode
*
pValNode
,
STimeWindow
win
,
int32_t
capacity
,
const
char
*
id
,
SInterval
*
pInterval
,
int32_t
fillType
,
int32_t
order
)
{
STimeWindow
win
,
int32_t
capacity
,
const
char
*
id
,
SInterval
*
pInterval
,
int32_t
fillType
,
int32_t
order
)
{
SFillColInfo
*
pColInfo
=
createFillColInfo
(
pExpr
,
numOfCols
,
pValNode
);
STimeWindow
w
=
getAlignQueryTimeWindow
(
pInterval
,
pInterval
->
precision
,
win
.
skey
);
...
...
@@ -3635,7 +3637,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
?
&
((
SMergeAlignedIntervalAggOperatorInfo
*
)
downstream
->
info
)
->
intervalAggOperatorInfo
->
interval
:
&
((
SIntervalAggOperatorInfo
*
)
downstream
->
info
)
->
interval
;
int32_t
order
=
(
pPhyFillNode
->
inputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
int32_t
order
=
(
pPhyFillNode
->
inputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
int32_t
type
=
convertFillType
(
pPhyFillNode
->
mode
);
SResultInfo
*
pResultInfo
=
&
pOperator
->
resultInfo
;
...
...
@@ -3833,7 +3835,7 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum)
return
TDB_CODE_SUCCESS
;
}
bool
groupbyTbname
(
SNodeList
*
pGroupList
)
{
bool
groupbyTbname
(
SNodeList
*
pGroupList
)
{
bool
bytbname
=
false
;
if
(
LIST_LENGTH
(
pGroupList
)
>
0
)
{
SNode
*
p
=
nodesListGetNode
(
pGroupList
,
0
);
...
...
@@ -3875,7 +3877,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
bool
assignUid
=
groupbyTbname
(
group
);
int32_t
groupNum
=
0
;
size_t
numOfTables
=
taosArrayGetSize
(
pTableListInfo
->
pTableList
);
size_t
numOfTables
=
taosArrayGetSize
(
pTableListInfo
->
pTableList
);
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
i
++
)
{
STableKeyInfo
*
info
=
taosArrayGet
(
pTableListInfo
->
pTableList
,
i
);
...
...
@@ -4608,7 +4610,7 @@ void releaseQueryBuf(size_t numOfTables) {
}
int32_t
getOperatorExplainExecInfo
(
SOperatorInfo
*
operatorInfo
,
SArray
*
pExecInfoList
)
{
SExplainExecInfo
execInfo
=
{
0
};
SExplainExecInfo
execInfo
=
{
0
};
SExplainExecInfo
*
pExplainInfo
=
taosArrayPush
(
pExecInfoList
,
&
execInfo
);
pExplainInfo
->
numOfRows
=
operatorInfo
->
resultInfo
.
totalRows
;
...
...
@@ -4618,7 +4620,8 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf
pExplainInfo
->
verboseInfo
=
NULL
;
if
(
operatorInfo
->
fpSet
.
getExplainFn
)
{
int32_t
code
=
operatorInfo
->
fpSet
.
getExplainFn
(
operatorInfo
,
&
pExplainInfo
->
verboseInfo
,
&
pExplainInfo
->
verboseLen
);
int32_t
code
=
operatorInfo
->
fpSet
.
getExplainFn
(
operatorInfo
,
&
pExplainInfo
->
verboseInfo
,
&
pExplainInfo
->
verboseLen
);
if
(
code
)
{
qError
(
"%s operator getExplainFn failed, code:%s"
,
GET_TASKID
(
operatorInfo
->
pTaskInfo
),
tstrerror
(
code
));
return
code
;
...
...
@@ -4629,7 +4632,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf
for
(
int32_t
i
=
0
;
i
<
operatorInfo
->
numOfDownstream
;
++
i
)
{
code
=
getOperatorExplainExecInfo
(
operatorInfo
->
pDownstream
[
i
],
pExecInfoList
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
// taosMemoryFreeClear(*pRes);
// taosMemoryFreeClear(*pRes);
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
}
...
...
source/libs/executor/src/projectoperator.c
浏览文件 @
c58d2bde
...
...
@@ -56,6 +56,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
goto
_error
;
}
pOperator
->
pTaskInfo
=
pTaskInfo
;
int32_t
numOfCols
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pProjPhyNode
->
pProjections
,
NULL
,
&
numOfCols
);
...
...
@@ -63,7 +65,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
initLimitInfo
(
pProjPhyNode
->
node
.
pLimit
,
pProjPhyNode
->
node
.
pSlimit
,
&
pInfo
->
limitInfo
);
pInfo
->
binfo
.
pRes
=
pResBlock
;
pInfo
->
pFinalRes
=
createOneDataBlock
(
pResBlock
,
false
);
pInfo
->
pFinalRes
=
createOneDataBlock
(
pResBlock
,
false
);
pInfo
->
pFilterNode
=
pProjPhyNode
->
node
.
pConditions
;
pInfo
->
mergeDataBlocks
=
pProjPhyNode
->
mergeDataBlock
;
...
...
@@ -73,7 +75,6 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
pInfo
->
mergeDataBlocks
=
false
;
}
int32_t
numOfRows
=
4096
;
size_t
keyBufSize
=
sizeof
(
int64_t
)
+
sizeof
(
int64_t
)
+
POINTER_BYTES
;
...
...
@@ -89,12 +90,11 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
setFunctionResultOutput
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
aggSup
,
MAIN_SCAN
,
numOfCols
);
pInfo
->
pPseudoColInfo
=
setRowTsColumnOutputInfo
(
pOperator
->
exprSupp
.
pCtx
,
numOfCols
);
pOperator
->
name
=
"ProjectOperator"
;
pOperator
->
name
=
"ProjectOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_PROJECT
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doProjectOperation
,
NULL
,
NULL
,
destroyProjectOperatorInfo
,
NULL
,
NULL
,
NULL
);
...
...
@@ -106,7 +106,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
return
pOperator
;
_error:
_error:
pTaskInfo
->
code
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
...
...
@@ -156,7 +156,8 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S
return
PROJECT_RETRIEVE_DONE
;
}
static
int32_t
doIngroupLimitOffset
(
SLimitInfo
*
pLimitInfo
,
uint64_t
groupId
,
SSDataBlock
*
pBlock
,
SOperatorInfo
*
pOperator
)
{
static
int32_t
doIngroupLimitOffset
(
SLimitInfo
*
pLimitInfo
,
uint64_t
groupId
,
SSDataBlock
*
pBlock
,
SOperatorInfo
*
pOperator
)
{
// set current group id
pLimitInfo
->
currentGroupId
=
groupId
;
...
...
@@ -170,8 +171,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
}
// check for the limitation in each group
if
(
pLimitInfo
->
limit
.
limit
>=
0
&&
pLimitInfo
->
numOfOutputRows
+
pBlock
->
info
.
rows
>=
pLimitInfo
->
limit
.
limit
)
{
if
(
pLimitInfo
->
limit
.
limit
>=
0
&&
pLimitInfo
->
numOfOutputRows
+
pBlock
->
info
.
rows
>=
pLimitInfo
->
limit
.
limit
)
{
int32_t
keepRows
=
(
int32_t
)(
pLimitInfo
->
limit
.
limit
-
pLimitInfo
->
numOfOutputRows
);
blockDataKeepFirstNRows
(
pBlock
,
keepRows
);
if
(
pLimitInfo
->
slimit
.
limit
>
0
&&
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
)
{
...
...
@@ -222,7 +222,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
}
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
SLimitInfo
*
pLimitInfo
=
&
pProjectInfo
->
limitInfo
;
SLimitInfo
*
pLimitInfo
=
&
pProjectInfo
->
limitInfo
;
if
(
downstream
==
NULL
)
{
return
doGenerateSourceData
(
pOperator
);
...
...
@@ -317,7 +317,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
if
(
pOperator
->
cost
.
openCost
==
0
)
{
pOperator
->
cost
.
openCost
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
}
// printDataBlock1(p, "project");
return
(
p
->
info
.
rows
>
0
)
?
p
:
NULL
;
}
...
...
@@ -330,6 +330,8 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
goto
_error
;
}
pOperator
->
pTaskInfo
=
pTaskInfo
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SIndefRowsFuncPhysiNode
*
pPhyNode
=
(
SIndefRowsFuncPhysiNode
*
)
pNode
;
...
...
@@ -373,7 +375,6 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doApplyIndefinitFunction
,
NULL
,
NULL
,
destroyIndefinitOperatorInfo
,
NULL
,
NULL
,
NULL
);
...
...
@@ -385,7 +386,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
return
pOperator
;
_error:
_error:
taosMemoryFree
(
pInfo
);
taosMemoryFree
(
pOperator
);
pTaskInfo
->
code
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -593,7 +594,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) {
pRes
->
info
.
rows
=
1
;
doFilter
(
pProjectInfo
->
pFilterNode
,
pRes
,
NULL
);
/*int32_t status = */
doIngroupLimitOffset
(
&
pProjectInfo
->
limitInfo
,
0
,
pRes
,
pOperator
);
/*int32_t status = */
doIngroupLimitOffset
(
&
pProjectInfo
->
limitInfo
,
0
,
pRes
,
pOperator
);
pOperator
->
resultInfo
.
totalRows
+=
pRes
->
info
.
rows
;
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
c58d2bde
...
...
@@ -30,6 +30,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
goto
_error
;
}
pOperator
->
pTaskInfo
=
pTaskInfo
;
SDataBlockDescNode
*
pDescNode
=
pSortNode
->
node
.
pOutputDataBlockDesc
;
int32_t
numOfCols
=
0
;
...
...
@@ -45,7 +46,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
1024
);
pInfo
->
binfo
.
pRes
=
pResBlock
;
pInfo
->
pSortInfo
=
createSortInfo
(
pSortNode
->
pSortKeys
);
pInfo
->
pSortInfo
=
createSortInfo
(
pSortNode
->
pSortKeys
);
pInfo
->
pCondition
=
pSortNode
->
node
.
pConditions
;
pInfo
->
pColMatchInfo
=
pColMatchColInfo
;
initLimitInfo
(
pSortNode
->
node
.
pLimit
,
pSortNode
->
node
.
pSlimit
,
&
pInfo
->
limitInfo
);
...
...
@@ -57,7 +58,6 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
pOperator
->
info
=
pInfo
;
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
// lazy evaluation for the following parameter since the input datablock is not known till now.
// pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2;
...
...
@@ -222,7 +222,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
}
// todo add the limit/offset info
if
(
pInfo
->
limitInfo
.
remainOffset
>
0
)
{
if
(
pInfo
->
limitInfo
.
remainOffset
>
0
)
{
if
(
pInfo
->
limitInfo
.
remainOffset
>=
blockDataGetNumOfRows
(
pBlock
))
{
pInfo
->
limitInfo
.
remainOffset
-=
pBlock
->
info
.
rows
;
continue
;
...
...
@@ -247,7 +247,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
}
}
return
blockDataGetNumOfRows
(
pBlock
)
>
0
?
pBlock
:
NULL
;
return
blockDataGetNumOfRows
(
pBlock
)
>
0
?
pBlock
:
NULL
;
}
void
destroyOrderOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
...
...
@@ -474,7 +474,7 @@ void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy
(
pInfo
->
pSortInfo
);
taosArrayDestroy
(
pInfo
->
pColMatchInfo
);
taosMemoryFreeClear
(
param
);
}
...
...
@@ -609,8 +609,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
pInfo
->
groupId
=
tsortGetGroupId
(
pTupleHandle
);
pInfo
->
prefetchedTuple
=
NULL
;
}
}
else
{
}
else
{
pTupleHandle
=
tsortNextTuple
(
pHandle
);
pInfo
->
groupId
=
0
;
}
...
...
@@ -694,7 +693,7 @@ void destroyMultiwayMergeOperatorInfo(void* param, int32_t numOfOutput) {
tsortDestroySortHandle
(
pInfo
->
pSortHandle
);
taosArrayDestroy
(
pInfo
->
pSortInfo
);
taosArrayDestroy
(
pInfo
->
pColMatchInfo
);
taosMemoryFreeClear
(
param
);
}
...
...
@@ -711,7 +710,7 @@ int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplai
}
SOperatorInfo
*
createMultiwayMergeOperatorInfo
(
SOperatorInfo
**
downStreams
,
size_t
numStreams
,
SMergePhysiNode
*
pMergePhyNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SMergePhysiNode
*
pMergePhyNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SPhysiNode
*
pPhyNode
=
(
SPhysiNode
*
)
pMergePhyNode
;
SMultiwayMergeOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMultiwayMergeOperatorInfo
));
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
c58d2bde
...
...
@@ -928,8 +928,9 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
TSKEY
ts
=
getStartTsKey
(
&
pBlock
->
info
.
window
,
tsCols
);
SResultRow
*
pResult
=
NULL
;
STimeWindow
win
=
getActiveTimeWindow
(
pInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
ts
,
&
pInfo
->
interval
,
pInfo
->
inputOrder
);
int32_t
ret
=
TSDB_CODE_SUCCESS
;
STimeWindow
win
=
getActiveTimeWindow
(
pInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
ts
,
&
pInfo
->
interval
,
pInfo
->
inputOrder
);
int32_t
ret
=
TSDB_CODE_SUCCESS
;
if
((
!
pInfo
->
ignoreExpiredData
||
!
isCloseWindow
(
&
win
,
&
pInfo
->
twAggSup
))
&&
inSlidingWindow
(
&
pInfo
->
interval
,
&
win
,
&
pBlock
->
info
))
{
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
...
...
@@ -1091,8 +1092,8 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
pInfo
->
inputOrder
,
scanFlag
,
true
);
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
primaryTsIndex
);
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
primaryTsIndex
);
hashIntervalAgg
(
pOperator
,
&
pInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
,
NULL
);
}
...
...
@@ -1790,9 +1791,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto
_error
;
}
pOperator
->
pTaskInfo
=
pTaskInfo
;
pInfo
->
win
=
pTaskInfo
->
window
;
pInfo
->
inputOrder
=
(
pPhyNode
->
window
.
inputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
pInfo
->
resultTsOrder
=
(
pPhyNode
->
window
.
outputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
pInfo
->
inputOrder
=
(
pPhyNode
->
window
.
inputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
pInfo
->
resultTsOrder
=
(
pPhyNode
->
window
.
outputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
pInfo
->
interval
=
*
pInterval
;
pInfo
->
execModel
=
pTaskInfo
->
execModel
;
pInfo
->
twAggSup
=
*
pTwAggSupp
;
...
...
@@ -1845,7 +1847,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
pOperator
->
info
=
pInfo
;
...
...
@@ -1880,6 +1881,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
goto
_error
;
}
pOperator
->
pTaskInfo
=
pTaskInfo
;
pInfo
->
inputOrder
=
TSDB_ORDER_ASC
;
pInfo
->
interval
=
*
pInterval
;
pInfo
->
execModel
=
OPTR_EXEC_MODEL_STREAM
;
...
...
@@ -1906,7 +1908,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
pOperator
->
info
=
pInfo
;
...
...
@@ -2180,7 +2181,6 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
break
;
}
}
}
static
int32_t
initPrevRowsKeeper
(
STimeSliceOperatorInfo
*
pInfo
,
SSDataBlock
*
pBlock
)
{
...
...
@@ -2457,7 +2457,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
size_t
keyBufSize
=
sizeof
(
int64_t
)
+
sizeof
(
int64_t
)
+
POINTER_BYTES
;
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
int32_t
numOfCols
=
0
;
int32_t
numOfCols
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pSessionNode
->
window
.
pFuncs
,
NULL
,
&
numOfCols
);
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pSessionNode
->
window
.
node
.
pOutputDataBlockDesc
);
...
...
@@ -2475,11 +2475,11 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
);
initExecTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
pTaskInfo
->
window
);
pInfo
->
tsSlotId
=
((
SColumnNode
*
)
pSessionNode
->
window
.
pTspk
)
->
slotId
;
pInfo
->
binfo
.
pRes
=
pResBlock
;
pInfo
->
tsSlotId
=
((
SColumnNode
*
)
pSessionNode
->
window
.
pTspk
)
->
slotId
;
pInfo
->
binfo
.
pRes
=
pResBlock
;
pInfo
->
winSup
.
prevTs
=
INT64_MIN
;
pInfo
->
reptScan
=
false
;
pInfo
->
pCondition
=
pSessionNode
->
window
.
node
.
pConditions
;
pInfo
->
reptScan
=
false
;
pInfo
->
pCondition
=
pSessionNode
->
window
.
node
.
pConditions
;
pOperator
->
name
=
"SessionWindowAggOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION
;
...
...
@@ -3028,6 +3028,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
goto
_error
;
}
pOperator
->
pTaskInfo
=
pTaskInfo
;
pInfo
->
order
=
TSDB_ORDER_ASC
;
pInfo
->
interval
=
(
SInterval
){.
interval
=
pIntervalPhyNode
->
interval
,
.
sliding
=
pIntervalPhyNode
->
sliding
,
...
...
@@ -3114,7 +3115,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
pOperator
->
info
=
pInfo
;
...
...
@@ -3155,7 +3155,7 @@ void destroyStateWinInfo(void* ptr) {
if
(
ptr
==
NULL
)
{
return
;
}
SStateWindowInfo
*
pWin
=
(
SStateWindowInfo
*
)
ptr
;
SStateWindowInfo
*
pWin
=
(
SStateWindowInfo
*
)
ptr
;
taosMemoryFreeClear
(
pWin
->
stateKey
.
pData
);
}
...
...
@@ -3246,6 +3246,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
goto
_error
;
}
pOperator
->
pTaskInfo
=
pTaskInfo
;
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
if
(
pSessionNode
->
window
.
pExprs
!=
NULL
)
{
int32_t
numOfScalar
=
0
;
...
...
@@ -3308,7 +3310,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamSessionAgg
,
NULL
,
NULL
,
destroyStreamSessionAggOperatorInfo
,
aggEncodeResultRow
,
aggDecodeResultRow
,
NULL
);
pOperator
->
pTaskInfo
=
pTaskInfo
;
if
(
downstream
)
{
initDownStream
(
downstream
,
&
pInfo
->
streamAggSup
,
pInfo
->
gap
,
pInfo
->
twAggSup
.
waterMark
,
pOperator
->
operatorType
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
...
...
@@ -3465,7 +3466,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes
assert
(
pWinInfo
->
win
.
skey
<=
pWinInfo
->
win
.
ekey
);
// too many time window in query
int32_t
size
=
taosArrayGetSize
(
pAggSup
->
pCurWins
);
if
(
size
>
MAX_INTERVAL_TIME_WINDOW
)
{
if
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_BATCH
&&
size
>
MAX_INTERVAL_TIME_WINDOW
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW
);
}
...
...
@@ -3647,8 +3648,8 @@ void deleteWindow(SArray* pWinInfos, int32_t index, FDelete fp) {
taosArrayRemove
(
pWinInfos
,
index
);
}
static
void
doDeleteTimeWindows
(
SStreamAggSupporter
*
pAggSup
,
SSDataBlock
*
pBlock
,
int64_t
gap
,
SArray
*
result
,
FDelete
fp
)
{
static
void
doDeleteTimeWindows
(
SStreamAggSupporter
*
pAggSup
,
SSDataBlock
*
pBlock
,
int64_t
gap
,
SArray
*
result
,
FDelete
fp
)
{
SColumnInfoData
*
pStartTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
TSKEY
*
startDatas
=
(
TSKEY
*
)
pStartTsCol
->
pData
;
SColumnInfoData
*
pEndTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
...
...
@@ -4673,7 +4674,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
currTs
=
tsCols
[
currPos
];
currWin
.
skey
=
currTs
;
currWin
.
ekey
=
taosTimeAdd
(
currWin
.
skey
,
iaInfo
->
interval
.
interval
,
iaInfo
->
interval
.
intervalUnit
,
iaInfo
->
interval
.
precision
)
-
1
;
iaInfo
->
interval
.
precision
)
-
1
;
startPos
=
currPos
;
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
currWin
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
...
...
@@ -4933,8 +4935,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
TSKEY
blockStartTs
=
getStartTsKey
(
&
pBlock
->
info
.
window
,
tsCols
);
SResultRow
*
pResult
=
NULL
;
STimeWindow
win
=
getActiveTimeWindow
(
iaInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
blockStartTs
,
&
iaInfo
->
interval
,
iaInfo
->
inputOrder
);
STimeWindow
win
=
getActiveTimeWindow
(
iaInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
blockStartTs
,
&
iaInfo
->
interval
,
iaInfo
->
inputOrder
);
int32_t
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pExprSup
->
pCtx
,
...
...
@@ -4975,7 +4977,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
STimeWindow
nextWin
=
win
;
while
(
1
)
{
int32_t
prevEndPos
=
forwardRows
-
1
+
startPos
;
startPos
=
getNextQualifiedWindow
(
&
iaInfo
->
interval
,
&
nextWin
,
&
pBlock
->
info
,
tsCols
,
prevEndPos
,
iaInfo
->
inputOrder
);
startPos
=
getNextQualifiedWindow
(
&
iaInfo
->
interval
,
&
nextWin
,
&
pBlock
->
info
,
tsCols
,
prevEndPos
,
iaInfo
->
inputOrder
);
if
(
startPos
<
0
)
{
break
;
}
...
...
source/libs/function/src/builtins.c
浏览文件 @
c58d2bde
...
...
@@ -19,6 +19,7 @@
#include "querynodes.h"
#include "scalar.h"
#include "taoserror.h"
#include "ttime.h"
static
int32_t
buildFuncErrMsg
(
char
*
pErrBuf
,
int32_t
len
,
int32_t
errCode
,
const
char
*
pFormat
,
...)
{
va_list
vArgList
;
...
...
@@ -1442,6 +1443,58 @@ static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
translateInterp
(
SFunctionNode
*
pFunc
,
char
*
pErrBuf
,
int32_t
len
)
{
int32_t
numOfParams
=
LIST_LENGTH
(
pFunc
->
pParameterList
);
uint8_t
dbPrec
=
pFunc
->
node
.
resType
.
precision
;
if
(
1
!=
numOfParams
&&
3
!=
numOfParams
&&
4
!=
numOfParams
)
{
return
invaildFuncParaNumErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
if
(
3
<=
numOfParams
)
{
int64_t
timeVal
[
2
]
=
{
0
};
for
(
int32_t
i
=
1
;
i
<
3
;
++
i
)
{
uint8_t
nodeType
=
nodeType
(
nodesListGetNode
(
pFunc
->
pParameterList
,
i
));
uint8_t
paraType
=
((
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
i
))
->
resType
.
type
;
if
(
!
IS_VAR_DATA_TYPE
(
paraType
)
||
QUERY_NODE_VALUE
!=
nodeType
)
{
return
invaildFuncParaTypeErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
SValueNode
*
pValue
=
(
SValueNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
i
);
int32_t
ret
=
convertStringToTimestamp
(
paraType
,
pValue
->
datum
.
p
,
dbPrec
,
&
timeVal
[
i
-
1
]);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
invaildFuncParaValueErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
}
if
(
timeVal
[
0
]
>
timeVal
[
1
])
{
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"INTERP function invalid time range"
);
}
}
if
(
4
==
numOfParams
)
{
uint8_t
nodeType
=
nodeType
(
nodesListGetNode
(
pFunc
->
pParameterList
,
3
));
uint8_t
paraType
=
((
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
3
))
->
resType
.
type
;
if
(
!
IS_INTEGER_TYPE
(
paraType
)
||
QUERY_NODE_VALUE
!=
nodeType
)
{
return
invaildFuncParaTypeErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
int32_t
ret
=
validateTimeUnitParam
(
dbPrec
,
(
SValueNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
3
));
if
(
ret
==
TIME_UNIT_TOO_SMALL
)
{
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"INTERP function time interval parameter should be greater than db precision"
);
}
else
if
(
ret
==
TIME_UNIT_INVALID
)
{
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"INTERP function time interval parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]"
);
}
}
pFunc
->
node
.
resType
=
((
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
0
))
->
resType
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
translateFirstLast
(
SFunctionNode
*
pFunc
,
char
*
pErrBuf
,
int32_t
len
)
{
// forbid null as first/last input, since first(c0, null, 1) may have different number of input
int32_t
numOfParams
=
LIST_LENGTH
(
pFunc
->
pParameterList
);
...
...
@@ -2237,7 +2290,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
name
=
"interp"
,
.
type
=
FUNCTION_TYPE_INTERP
,
.
classification
=
FUNC_MGT_TIMELINE_FUNC
|
FUNC_MGT_INTERVAL_INTERPO_FUNC
|
FUNC_MGT_IMPLICIT_TS_FUNC
|
FUNC_MGT_FORBID_STREAM_FUNC
,
.
translateFunc
=
translate
FirstLast
,
.
translateFunc
=
translate
Interp
,
.
getEnvFunc
=
getSelectivityFuncEnv
,
.
initFunc
=
functionSetup
,
.
processFunc
=
NULL
,
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
c58d2bde
...
...
@@ -28,7 +28,6 @@
#define HISTOGRAM_MAX_BINS_NUM 1000
#define MAVG_MAX_POINTS_NUM 1000
#define SAMPLE_MAX_POINTS_NUM 1000
#define TAIL_MAX_POINTS_NUM 100
#define TAIL_MAX_OFFSET 100
...
...
@@ -4898,9 +4897,7 @@ bool sampleFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo)
pInfo
->
numSampled
=
0
;
pInfo
->
colType
=
pCtx
->
resDataInfo
.
type
;
pInfo
->
colBytes
=
pCtx
->
resDataInfo
.
bytes
;
if
(
pInfo
->
samples
<
1
||
pInfo
->
samples
>
SAMPLE_MAX_POINTS_NUM
)
{
return
false
;
}
pInfo
->
data
=
(
char
*
)
pInfo
+
sizeof
(
SSampleInfo
);
pInfo
->
tuplePos
=
(
STuplePos
*
)((
char
*
)
pInfo
+
sizeof
(
SSampleInfo
)
+
pInfo
->
samples
*
pInfo
->
colBytes
);
...
...
source/libs/function/src/udfd.c
浏览文件 @
c58d2bde
...
...
@@ -686,7 +686,6 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
buf
->
len
=
0
;
}
}
fnDebug
(
"allocate buf. input buf cap - len - total : %d - %d - %d"
,
ctx
->
inputCap
,
ctx
->
inputLen
,
ctx
->
inputTotal
);
}
bool
isUdfdUvMsgComplete
(
SUdfdUvConn
*
pipe
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录