Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
dcd3c208
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
dcd3c208
编写于
4月 30, 2021
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/feature/cenc' into feature/cenc
上级
9d217d86
c6686a0a
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
287 addition
and
157 deletion
+287
-157
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+5
-5
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+1
-1
src/client/src/tscLocalMerge.c
src/client/src/tscLocalMerge.c
+56
-3
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+30
-31
src/client/src/tscServer.c
src/client/src/tscServer.c
+32
-17
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+11
-10
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+47
-23
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+6
-1
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+93
-65
src/query/src/qPlan.c
src/query/src/qPlan.c
+6
-0
tests/script/general/parser/udf_dll_stable.sim
tests/script/general/parser/udf_dll_stable.sim
+0
-1
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
dcd3c208
...
...
@@ -139,11 +139,11 @@ bool isSimpleAggregate(SQueryInfo* pQueryInfo);
bool
isBlockDistQuery
(
SQueryInfo
*
pQueryInfo
);
int32_t
tscGetTopbotQueryParam
(
SQueryInfo
*
pQueryInfo
);
bool
tscNonOrderedProjectionQueryOnSTable
(
S
SqlCmd
*
pCmd
,
S
QueryInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscOrderedProjectionQueryOnSTable
(
S
SqlCmd
*
pCmd
,
S
QueryInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscIsProjectionQueryOnSTable
(
S
SqlCmd
*
pCmd
,
S
QueryInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscNonOrderedProjectionQueryOnSTable
(
SQueryInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscOrderedProjectionQueryOnSTable
(
SQueryInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscIsProjectionQueryOnSTable
(
SQueryInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscIsProjectionQuery
(
S
SqlCmd
*
pCmd
,
S
QueryInfo
*
pQueryInfo
);
bool
tscIsProjectionQuery
(
SQueryInfo
*
pQueryInfo
);
bool
tscIsTwoStageSTableQuery
(
SSqlCmd
*
pCmd
,
SQueryInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscQueryTags
(
SQueryInfo
*
pQueryInfo
);
...
...
@@ -246,7 +246,7 @@ void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo);
int
tscGetSTableVgroupInfo
(
SSqlObj
*
pSql
,
int32_t
clauseIndex
);
int
tscGetTableMeta
(
SSqlObj
*
pSql
,
STableMetaInfo
*
pTableMetaInfo
);
int
tscGetTableMetaEx
(
SSqlObj
*
pSql
,
STableMetaInfo
*
pTableMetaInfo
,
bool
createIfNotExists
);
int32_t
tscGetUdfFromNode
(
SSqlObj
*
pSql
);
int32_t
tscGetUdfFromNode
(
SSqlObj
*
pSql
,
SQueryInfo
*
pQueryInfo
);
void
tscResetForNextRetrieve
(
SSqlRes
*
pRes
);
void
tscDoQuery
(
SSqlObj
*
pSql
);
...
...
src/client/inc/tsclient.h
浏览文件 @
dcd3c208
...
...
@@ -240,6 +240,7 @@ typedef struct SQueryInfo {
int32_t
havingFieldNum
;
bool
globalMerge
;
// need global merge
bool
arithmCalOnAgg
;
// arithmetic calculation on aggregate result.
SArray
*
pUdfInfo
;
// user defined function information SArray<SUdfInfo>
}
SQueryInfo
;
typedef
struct
{
...
...
@@ -284,7 +285,6 @@ typedef struct {
SHashObj
*
pTableBlockHashList
;
// data block for each table
SArray
*
pDataBlocks
;
// SArray<STableDataBlocks*>. Merged submit block for each vgroup
SArray
*
pUdfInfo
;
// user defined function information SArray<SUdfInfo>
}
SSqlCmd
;
typedef
struct
SResRec
{
...
...
src/client/src/tscLocalMerge.c
浏览文件 @
dcd3c208
...
...
@@ -306,7 +306,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
pMerger
->
pDesc
->
pColumnModel
->
capacity
=
1
;
// restore the limitation value at the last stage
if
(
tscOrderedProjectionQueryOnSTable
(
p
Cmd
,
p
QueryInfo
,
0
))
{
if
(
tscOrderedProjectionQueryOnSTable
(
pQueryInfo
,
0
))
{
pQueryInfo
->
limit
.
limit
=
pQueryInfo
->
clauseLimit
;
pQueryInfo
->
limit
.
offset
=
pQueryInfo
->
prjOffset
;
}
...
...
@@ -461,7 +461,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
}
// primary timestamp column is involved in final result
if
(
pQueryInfo
->
interval
.
interval
!=
0
||
tscOrderedProjectionQueryOnSTable
(
p
Cmd
,
p
QueryInfo
,
0
))
{
if
(
pQueryInfo
->
interval
.
interval
!=
0
||
tscOrderedProjectionQueryOnSTable
(
pQueryInfo
,
0
))
{
numOfGroupByCols
++
;
}
...
...
@@ -617,7 +617,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
type
=
pModel
->
pFields
[
i
].
field
.
type
;
bytes
=
pModel
->
pFields
[
i
].
field
.
bytes
;
}
else
if
(
functionId
<
0
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
p
Cmd
->
pUdfInfo
,
-
1
*
functionId
-
1
);
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
p
QueryInfo
->
pUdfInfo
,
-
1
*
functionId
-
1
);
int32_t
ret
=
getResultDataInfo
(
p1
.
type
,
p1
.
bytes
,
functionId
,
0
,
&
type
,
&
bytes
,
&
inter
,
0
,
false
,
pUdfInfo
);
assert
(
ret
==
TSDB_CODE_SUCCESS
);
...
...
@@ -801,6 +801,15 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S
if
(
functionId
==
TSDB_FUNC_TAG_DUMMY
||
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
continue
;
}
if
(
functionId
<
0
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
pInfo
->
udfInfo
,
-
1
*
functionId
-
1
);
doInvokeUdf
(
pUdfInfo
,
&
pCtx
[
j
],
0
,
TSDB_UDF_FUNC_MERGE
);
continue
;
}
aAggs
[
functionId
].
mergeFunc
(
&
pCtx
[
j
]);
}
}
else
{
...
...
@@ -809,6 +818,15 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S
if
(
functionId
==
TSDB_FUNC_TAG_DUMMY
||
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
continue
;
}
if
(
functionId
<
0
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
pInfo
->
udfInfo
,
-
1
*
functionId
-
1
);
doInvokeUdf
(
pUdfInfo
,
&
pCtx
[
j
],
0
,
TSDB_UDF_FUNC_FINALIZE
);
continue
;
}
aAggs
[
functionId
].
xFinalize
(
&
pCtx
[
j
]);
}
...
...
@@ -825,6 +843,10 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S
}
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
if
(
pCtx
[
j
].
functionId
<
0
)
{
continue
;
}
aAggs
[
pCtx
[
j
].
functionId
].
init
(
&
pCtx
[
j
],
pCtx
[
j
].
resultInfo
);
}
...
...
@@ -837,6 +859,15 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S
if
(
functionId
==
TSDB_FUNC_TAG_DUMMY
||
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
continue
;
}
if
(
functionId
<
0
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
pInfo
->
udfInfo
,
-
1
*
functionId
-
1
);
doInvokeUdf
(
pUdfInfo
,
&
pCtx
[
j
],
0
,
TSDB_UDF_FUNC_MERGE
);
continue
;
}
aAggs
[
functionId
].
mergeFunc
(
&
pCtx
[
j
]);
}
}
...
...
@@ -850,6 +881,15 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S
if
(
functionId
==
TSDB_FUNC_TAG_DUMMY
||
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
continue
;
}
if
(
functionId
<
0
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
pInfo
->
udfInfo
,
-
1
*
functionId
-
1
);
doInvokeUdf
(
pUdfInfo
,
&
pCtx
[
j
],
0
,
TSDB_UDF_FUNC_MERGE
);
continue
;
}
aAggs
[
functionId
].
mergeFunc
(
&
pCtx
[
j
]);
}
}
...
...
@@ -1103,6 +1143,11 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
{
// reset output buffer
for
(
int32_t
j
=
0
;
j
<
pOperator
->
numOfOutput
;
++
j
)
{
SQLFunctionCtx
*
pCtx
=
&
pAggInfo
->
binfo
.
pCtx
[
j
];
if
(
pCtx
->
functionId
<
0
)
{
clearOutputBuf
(
&
pAggInfo
->
binfo
,
&
pAggInfo
->
bufCapacity
);
continue
;
}
aAggs
[
pCtx
->
functionId
].
init
(
pCtx
,
pCtx
->
resultInfo
);
}
}
...
...
@@ -1154,6 +1199,14 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
continue
;
}
if
(
functionId
<
0
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
pAggInfo
->
udfInfo
,
-
1
*
functionId
-
1
);
doInvokeUdf
(
pUdfInfo
,
&
pAggInfo
->
binfo
.
pCtx
[
j
],
0
,
TSDB_UDF_FUNC_FINALIZE
);
continue
;
}
aAggs
[
functionId
].
xFinalize
(
&
pAggInfo
->
binfo
.
pCtx
[
j
]);
}
...
...
src/client/src/tscSQLParser.c
浏览文件 @
dcd3c208
...
...
@@ -83,7 +83,7 @@ static int32_t validateSelectNodeList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SAr
static
bool
validateIpAddress
(
const
char
*
ip
,
size_t
size
);
static
bool
hasUnsupportFunctionsForSTableQuery
(
SSqlCmd
*
pCmd
,
SQueryInfo
*
pQueryInfo
);
static
bool
functionCompatibleCheck
(
S
SqlCmd
*
pCmd
,
S
QueryInfo
*
pQueryInfo
,
bool
joinQuery
,
bool
twQuery
);
static
bool
functionCompatibleCheck
(
SQueryInfo
*
pQueryInfo
,
bool
joinQuery
,
bool
twQuery
);
static
int32_t
validateGroupbyNode
(
SQueryInfo
*
pQueryInfo
,
SArray
*
pList
,
SSqlCmd
*
pCmd
);
...
...
@@ -1774,13 +1774,12 @@ void genUdfList(SArray* pUdfInfo, tSqlExpr *pNode) {
}
}
static
int32_t
checkForUdf
(
SSqlObj
*
pSql
,
SArray
*
pSelection
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
if
(
pCmd
->
pUdfInfo
!=
NULL
)
{
static
int32_t
checkForUdf
(
SSqlObj
*
pSql
,
SQueryInfo
*
pQueryInfo
,
SArray
*
pSelection
)
{
if
(
pQueryInfo
->
pUdfInfo
!=
NULL
)
{
return
TSDB_CODE_SUCCESS
;
}
p
Cmd
->
pUdfInfo
=
taosArrayInit
(
4
,
sizeof
(
struct
SUdfInfo
));
p
QueryInfo
->
pUdfInfo
=
taosArrayInit
(
4
,
sizeof
(
struct
SUdfInfo
));
size_t
nExpr
=
taosArrayGetSize
(
pSelection
);
...
...
@@ -1789,12 +1788,12 @@ static int32_t checkForUdf(SSqlObj* pSql, SArray* pSelection) {
int32_t
type
=
pItem
->
pNode
->
type
;
if
(
type
==
SQL_NODE_EXPR
||
type
==
SQL_NODE_SQLFUNCTION
)
{
genUdfList
(
p
Cmd
->
pUdfInfo
,
pItem
->
pNode
);
genUdfList
(
p
QueryInfo
->
pUdfInfo
,
pItem
->
pNode
);
}
}
if
(
taosArrayGetSize
(
p
Cmd
->
pUdfInfo
)
>
0
)
{
return
tscGetUdfFromNode
(
pSql
);
if
(
taosArrayGetSize
(
p
QueryInfo
->
pUdfInfo
)
>
0
)
{
return
tscGetUdfFromNode
(
pSql
,
pQueryInfo
);
}
else
{
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1845,7 +1844,7 @@ int32_t validateSelectNodeList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pS
if
(
type
==
SQL_NODE_SQLFUNCTION
)
{
pItem
->
pNode
->
functionId
=
isValidFunction
(
pItem
->
pNode
->
operand
.
z
,
pItem
->
pNode
->
operand
.
n
);
if
(
pItem
->
pNode
->
functionId
<
0
)
{
SUdfInfo
*
pUdfInfo
=
isValidUdf
(
p
Cmd
->
pUdfInfo
,
pItem
->
pNode
->
operand
.
z
,
pItem
->
pNode
->
operand
.
n
);
SUdfInfo
*
pUdfInfo
=
isValidUdf
(
p
QueryInfo
->
pUdfInfo
,
pItem
->
pNode
->
operand
.
z
,
pItem
->
pNode
->
operand
.
n
);
if
(
pUdfInfo
==
NULL
)
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg5
);
}
...
...
@@ -1890,7 +1889,7 @@ int32_t validateSelectNodeList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pS
addPrimaryTsColIntoResult
(
pQueryInfo
);
}
if
(
!
functionCompatibleCheck
(
p
Cmd
,
p
QueryInfo
,
joinQuery
,
timeWindowQuery
))
{
if
(
!
functionCompatibleCheck
(
pQueryInfo
,
joinQuery
,
timeWindowQuery
))
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg2
);
}
...
...
@@ -2700,7 +2699,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
}
default:
{
SUdfInfo
*
pUdfInfo
=
isValidUdf
(
p
Cmd
->
pUdfInfo
,
pItem
->
pNode
->
operand
.
z
,
pItem
->
pNode
->
operand
.
n
);
SUdfInfo
*
pUdfInfo
=
isValidUdf
(
p
QueryInfo
->
pUdfInfo
,
pItem
->
pNode
->
operand
.
z
,
pItem
->
pNode
->
operand
.
n
);
if
(
pUdfInfo
==
NULL
)
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg9
);
}
...
...
@@ -3180,7 +3179,7 @@ static bool groupbyTagsOrNull(SQueryInfo* pQueryInfo) {
return
true
;
}
static
bool
functionCompatibleCheck
(
S
SqlCmd
*
pCmd
,
S
QueryInfo
*
pQueryInfo
,
bool
joinQuery
,
bool
twQuery
)
{
static
bool
functionCompatibleCheck
(
SQueryInfo
*
pQueryInfo
,
bool
joinQuery
,
bool
twQuery
)
{
int32_t
startIdx
=
0
;
int32_t
aggUdf
=
0
;
int32_t
scalarUdf
=
0
;
...
...
@@ -3201,7 +3200,7 @@ static bool functionCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, bool
int16_t
functionId
=
pExpr1
->
base
.
functionId
;
if
(
functionId
<
0
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
p
Cmd
->
pUdfInfo
,
-
1
*
functionId
-
1
);
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
p
QueryInfo
->
pUdfInfo
,
-
1
*
functionId
-
1
);
pUdfInfo
->
funcType
==
TSDB_UDF_TYPE_AGGREGATE
?
++
aggUdf
:
++
scalarUdf
;
continue
;
...
...
@@ -5257,7 +5256,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
pQueryInfo
->
order
.
orderColId
=
PRIMARYKEY_TIMESTAMP_COL_INDEX
;
// orderby ts query on super table
if
(
tscOrderedProjectionQueryOnSTable
(
p
Cmd
,
p
QueryInfo
,
0
))
{
if
(
tscOrderedProjectionQueryOnSTable
(
pQueryInfo
,
0
))
{
addPrimaryTsColIntoResult
(
pQueryInfo
);
}
}
...
...
@@ -5635,7 +5634,7 @@ int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd, SQueryInfo* pQu
SExprInfo
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
k
);
if
(
pExpr
->
base
.
functionId
<
0
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
p
Cmd
->
pUdfInfo
,
-
1
*
pExpr
->
base
.
functionId
-
1
);
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
p
QueryInfo
->
pUdfInfo
,
-
1
*
pExpr
->
base
.
functionId
-
1
);
if
(
pUdfInfo
->
funcType
==
TSDB_UDF_TYPE_SCALAR
)
{
isProjectionFunction
=
true
;
break
;
...
...
@@ -5885,13 +5884,13 @@ int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseI
// todo refactor
if
(
UTIL_TABLE_IS_SUPER_TABLE
(
pTableMetaInfo
))
{
if
(
!
tscQueryTags
(
pQueryInfo
))
{
// local handle the super table tag query
if
(
tscIsProjectionQueryOnSTable
(
p
Cmd
,
p
QueryInfo
,
0
))
{
if
(
tscIsProjectionQueryOnSTable
(
pQueryInfo
,
0
))
{
if
(
pQueryInfo
->
slimit
.
limit
>
0
||
pQueryInfo
->
slimit
.
offset
>
0
)
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg2
);
}
// for projection query on super table, all queries are subqueries
if
(
tscNonOrderedProjectionQueryOnSTable
(
p
Cmd
,
p
QueryInfo
,
0
)
&&
if
(
tscNonOrderedProjectionQueryOnSTable
(
pQueryInfo
,
0
)
&&
!
TSDB_QUERY_HAS_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_JOIN_QUERY
))
{
pQueryInfo
->
type
|=
TSDB_QUERY_TYPE_SUBQUERY
;
}
...
...
@@ -5927,7 +5926,7 @@ int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseI
pQueryInfo
->
prjOffset
=
pQueryInfo
->
limit
.
offset
;
pQueryInfo
->
vgroupLimit
=
-
1
;
if
(
tscOrderedProjectionQueryOnSTable
(
p
Cmd
,
p
QueryInfo
,
0
))
{
if
(
tscOrderedProjectionQueryOnSTable
(
pQueryInfo
,
0
))
{
/*
* the offset value should be removed during retrieve data from virtual node, since the
* global order are done in client side, so the offset is applied at the client side
...
...
@@ -6280,7 +6279,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, SSqlCmd* pCmd)
}
if
(
functionId
<
0
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
p
Cmd
->
pUdfInfo
,
-
1
*
functionId
-
1
);
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
p
QueryInfo
->
pUdfInfo
,
-
1
*
functionId
-
1
);
if
(
pUdfInfo
->
funcType
==
TSDB_UDF_TYPE_AGGREGATE
)
{
++
numOfAggregation
;
}
...
...
@@ -6526,7 +6525,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
}
// projection query on super table does not compatible with "group by" syntax
if
(
tscIsProjectionQuery
(
p
Cmd
,
p
QueryInfo
))
{
if
(
tscIsProjectionQuery
(
pQueryInfo
))
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg3
);
}
...
...
@@ -6708,7 +6707,7 @@ void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex) {
char
*
name
=
NULL
;
if
(
pExpr
->
base
.
functionId
<
0
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
p
Sql
->
cmd
.
pUdfInfo
,
-
1
*
pExpr
->
base
.
functionId
-
1
);
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
p
QueryInfo
->
pUdfInfo
,
-
1
*
pExpr
->
base
.
functionId
-
1
);
name
=
pUdfInfo
->
name
;
}
else
{
name
=
aAggs
[
pExpr
->
base
.
functionId
].
name
;
...
...
@@ -7034,7 +7033,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return
code
;
}
code
=
checkForUdf
(
pSql
,
pSqlNode
->
pSelNodeList
);
code
=
checkForUdf
(
pSql
,
p
QueryInfo
,
p
SqlNode
->
pSelNodeList
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -7059,7 +7058,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg2
);
}
if
(
!
tscIsProjectionQuery
(
p
Cmd
,
p
QueryInfo
)
&&
pQueryInfo
->
interval
.
interval
==
0
)
{
if
(
!
tscIsProjectionQuery
(
pQueryInfo
)
&&
pQueryInfo
->
interval
.
interval
==
0
)
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg7
);
}
...
...
@@ -7399,7 +7398,7 @@ int32_t validateHavingClause(SQueryInfo* pQueryInfo, tSqlExpr* pExpr, SSqlCmd* p
}
//REDO function check
if
(
!
functionCompatibleCheck
(
p
Cmd
,
p
QueryInfo
,
joinQuery
,
timeWindowQuery
))
{
if
(
!
functionCompatibleCheck
(
pQueryInfo
,
joinQuery
,
timeWindowQuery
))
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg2
);
}
...
...
@@ -7589,18 +7588,18 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) {
TSDB_QUERY_SET_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_TABLE_QUERY
);
}
code
=
checkForUdf
(
pSql
,
pQueryInfo
,
pSqlNode
->
pSelNodeList
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
// parse the group by clause in the first place
if
(
validateGroupbyNode
(
pQueryInfo
,
pSqlNode
->
pGroupby
,
pCmd
)
!=
TSDB_CODE_SUCCESS
)
{
return
TSDB_CODE_TSC_INVALID_SQL
;
}
code
=
checkForUdf
(
pSql
,
pSqlNode
->
pSelNodeList
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
// set where info
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMetaInfo
->
pTableMeta
);
// set where info
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMetaInfo
->
pTableMeta
);
if
(
pSqlNode
->
pWhere
!=
NULL
)
{
if
(
validateWhereNode
(
pQueryInfo
,
&
pSqlNode
->
pWhere
,
pSql
)
!=
TSDB_CODE_SUCCESS
)
{
...
...
src/client/src/tscServer.c
浏览文件 @
dcd3c208
...
...
@@ -521,7 +521,7 @@ int tscBuildAndSendRequest(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
type
=
pQueryInfo
->
type
;
// while numOfTables equals to 0, it must be Heartbeat
assert
((
pQueryInfo
->
numOfTables
==
0
&&
pQueryInfo
->
command
==
TSDB_SQL_HB
)
||
pQueryInfo
->
numOfTables
>
0
);
assert
((
pQueryInfo
->
numOfTables
==
0
&&
(
pQueryInfo
->
command
==
TSDB_SQL_HB
||
pSql
->
cmd
.
command
==
TSDB_SQL_RETRIEVE_FUNC
)
)
||
pQueryInfo
->
numOfTables
>
0
);
}
tscDebug
(
"0x%"
PRIx64
" SQL cmd:%s will be processed, name:%s, type:%d"
,
pSql
->
self
,
sqlCmd
[
pCmd
->
command
],
name
,
type
);
...
...
@@ -1026,10 +1026,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
// support only one udf
if
(
p
Cmd
->
pUdfInfo
!=
NULL
&&
taosArrayGetSize
(
pCmd
->
pUdfInfo
)
>
0
)
{
if
(
p
QueryInfo
->
pUdfInfo
!=
NULL
&&
taosArrayGetSize
(
pQueryInfo
->
pUdfInfo
)
>
0
)
{
pQueryMsg
->
udfContentOffset
=
htonl
((
int32_t
)
(
pMsg
-
pCmd
->
payload
));
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
p
Cmd
->
pUdfInfo
);
++
i
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
p
Cmd
->
pUdfInfo
,
i
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
p
QueryInfo
->
pUdfInfo
);
++
i
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
p
QueryInfo
->
pUdfInfo
,
i
);
*
(
int8_t
*
)
pMsg
=
pUdfInfo
->
resType
;
pMsg
+=
sizeof
(
pUdfInfo
->
resType
);
...
...
@@ -1843,14 +1843,15 @@ int tscBuildRetrieveFuncMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
char
*
pMsg
=
pCmd
->
payload
;
int32_t
numOfFuncs
=
(
int32_t
)
taosArrayGetSize
(
pCmd
->
pUdfInfo
);
SQueryInfo
*
pQueryInfo
=
tscGetActiveQueryInfo
(
pCmd
);
int32_t
numOfFuncs
=
(
int32_t
)
taosArrayGetSize
(
pQueryInfo
->
pUdfInfo
);
SRetrieveFuncMsg
*
pRetrieveFuncMsg
=
(
SRetrieveFuncMsg
*
)
pMsg
;
pRetrieveFuncMsg
->
num
=
htonl
(
numOfFuncs
);
pMsg
+=
sizeof
(
SRetrieveFuncMsg
);
for
(
int32_t
i
=
0
;
i
<
numOfFuncs
;
++
i
)
{
SUdfInfo
*
pUdf
=
taosArrayGet
(
p
Cmd
->
pUdfInfo
,
i
);
SUdfInfo
*
pUdf
=
taosArrayGet
(
p
QueryInfo
->
pUdfInfo
,
i
);
STR_TO_NET_VARSTR
(
pMsg
,
pUdf
->
name
);
pMsg
+=
varDataNetTLen
(
pMsg
);
}
...
...
@@ -2125,15 +2126,17 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
int
tscProcessRetrieveFuncRsp
(
SSqlObj
*
pSql
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SUdfFuncMsg
*
pFuncMsg
=
(
SUdfFuncMsg
*
)
pSql
->
res
.
pRsp
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
pFuncMsg
->
num
=
htonl
(
pFuncMsg
->
num
);
assert
(
pFuncMsg
->
num
==
taosArrayGetSize
(
p
Cmd
->
pUdfInfo
));
assert
(
pFuncMsg
->
num
==
taosArrayGetSize
(
p
QueryInfo
->
pUdfInfo
));
char
*
pMsg
=
pFuncMsg
->
content
;
for
(
int32_t
i
=
0
;
i
<
pFuncMsg
->
num
;
++
i
)
{
SFunctionInfoMsg
*
pFunc
=
(
SFunctionInfoMsg
*
)
pMsg
;
for
(
int32_t
j
=
0
;
j
<
pFuncMsg
->
num
;
++
j
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
p
Cmd
->
pUdfInfo
,
j
);
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
p
QueryInfo
->
pUdfInfo
,
j
);
if
(
strcmp
(
pUdfInfo
->
name
,
pFunc
->
name
)
!=
0
)
{
continue
;
}
...
...
@@ -2161,11 +2164,13 @@ int tscProcessRetrieveFuncRsp(SSqlObj* pSql) {
return
pSql
->
res
.
code
;
}
SQueryInfo
*
parQueryInfo
=
tscGetQueryInfo
(
&
parent
->
cmd
,
parent
->
cmd
.
clauseIndex
);
assert
(
parent
->
signature
==
parent
&&
(
int64_t
)
pSql
->
param
==
parent
->
self
);
taosArrayDestroy
(
par
ent
->
cmd
.
pUdfInfo
);
taosArrayDestroy
(
par
QueryInfo
->
pUdfInfo
);
par
ent
->
cmd
.
pUdfInfo
=
pCmd
->
pUdfInfo
;
// assigned to parent sql obj.
p
Cmd
->
pUdfInfo
=
NULL
;
par
QueryInfo
->
pUdfInfo
=
pQueryInfo
->
pUdfInfo
;
// assigned to parent sql obj.
p
QueryInfo
->
pUdfInfo
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -2479,7 +2484,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
tscSetResRawPtr
(
pRes
,
pQueryInfo
);
}
else
if
((
UTIL_TABLE_IS_CHILD_TABLE
(
pTableMetaInfo
)
||
UTIL_TABLE_IS_NORMAL_TABLE
(
pTableMetaInfo
))
&&
!
TSDB_QUERY_HAS_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_SUBQUERY
))
{
tscSetResRawPtr
(
pRes
,
pQueryInfo
);
}
else
if
(
tscNonOrderedProjectionQueryOnSTable
(
p
Cmd
,
p
QueryInfo
,
0
)
&&
!
TSDB_QUERY_HAS_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_JOIN_QUERY
)
&&
!
TSDB_QUERY_HAS_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_JOIN_SEC_STAGE
))
{
}
else
if
(
tscNonOrderedProjectionQueryOnSTable
(
pQueryInfo
,
0
)
&&
!
TSDB_QUERY_HAS_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_JOIN_QUERY
)
&&
!
TSDB_QUERY_HAS_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_JOIN_SEC_STAGE
))
{
tscSetResRawPtr
(
pRes
,
pQueryInfo
);
}
...
...
@@ -2607,7 +2612,7 @@ int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create
return
tscGetTableMeta
(
pSql
,
pTableMetaInfo
);
}
int32_t
tscGetUdfFromNode
(
SSqlObj
*
pSql
)
{
int32_t
tscGetUdfFromNode
(
SSqlObj
*
pSql
,
SQueryInfo
*
pQueryInfo
)
{
SSqlObj
*
pNew
=
calloc
(
1
,
sizeof
(
SSqlObj
));
if
(
NULL
==
pNew
)
{
tscError
(
"%p malloc failed for new sqlobj to get user-defined functions"
,
pSql
);
...
...
@@ -2618,14 +2623,24 @@ int32_t tscGetUdfFromNode(SSqlObj *pSql) {
pNew
->
signature
=
pNew
;
pNew
->
cmd
.
command
=
TSDB_SQL_RETRIEVE_FUNC
;
pNew
->
cmd
.
pUdfInfo
=
taosArrayInit
(
4
,
sizeof
(
SUdfInfo
));
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pSql
->
cmd
.
pUdfInfo
);
++
i
)
{
if
(
tscAddQueryInfo
(
&
pNew
->
cmd
)
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"%p malloc failed for new queryinfo"
,
pSql
);
tscFreeSqlObj
(
pNew
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
SQueryInfo
*
pNewQueryInfo
=
tscGetQueryInfo
(
&
pNew
->
cmd
,
0
);
pNewQueryInfo
->
pUdfInfo
=
taosArrayInit
(
4
,
sizeof
(
SUdfInfo
));
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pQueryInfo
->
pUdfInfo
);
++
i
)
{
SUdfInfo
info
=
{
0
};
SUdfInfo
*
p1
=
taosArrayGet
(
p
Sql
->
cmd
.
pUdfInfo
,
i
);
SUdfInfo
*
p1
=
taosArrayGet
(
p
QueryInfo
->
pUdfInfo
,
i
);
info
=
*
p1
;
info
.
name
=
strdup
(
p1
->
name
);
taosArrayPush
(
pNew
->
cmd
.
pUdfInfo
,
&
info
);
taosArrayPush
(
pNew
QueryInfo
->
pUdfInfo
,
&
info
);
}
pNew
->
cmd
.
active
=
pNewQueryInfo
;
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
&
pNew
->
cmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
+
pSql
->
cmd
.
payloadLen
))
{
tscError
(
"%p malloc failed for payload to get table meta"
,
pSql
);
...
...
src/client/src/tscSubquery.c
浏览文件 @
dcd3c208
...
...
@@ -615,7 +615,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
if
((
pExpr
->
base
.
colInfo
.
colId
!=
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
||
(
funcId
!=
TSDB_FUNC_TS
&&
funcId
!=
TSDB_FUNC_TS_DUMMY
&&
funcId
!=
TSDB_FUNC_PRJ
))
{
int16_t
functionId
=
tscIsProjectionQuery
(
&
pNew
->
cmd
,
pQueryInfo
)
?
TSDB_FUNC_PRJ
:
TSDB_FUNC_TS
;
int16_t
functionId
=
tscIsProjectionQuery
(
pQueryInfo
)
?
TSDB_FUNC_PRJ
:
TSDB_FUNC_TS
;
tscAddFuncInSelectClause
(
pQueryInfo
,
0
,
functionId
,
&
index
,
s
,
TSDB_COL_NORMAL
);
tscPrintSelNodeList
(
pNew
,
0
);
...
...
@@ -636,7 +636,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
if
(
UTIL_TABLE_IS_SUPER_TABLE
(
pTableMetaInfo
))
{
assert
(
pTableMetaInfo
->
pVgroupTables
!=
NULL
);
if
(
tscNonOrderedProjectionQueryOnSTable
(
&
pNew
->
cmd
,
pQueryInfo
,
0
))
{
if
(
tscNonOrderedProjectionQueryOnSTable
(
pQueryInfo
,
0
))
{
SArray
*
p
=
buildVgroupTableByResult
(
pQueryInfo
,
pTableMetaInfo
->
pVgroupTables
);
tscFreeVgroupTableInfo
(
pTableMetaInfo
->
pVgroupTables
);
pTableMetaInfo
->
pVgroupTables
=
p
;
...
...
@@ -1429,7 +1429,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
}
SSubqueryState
*
pState
=
&
pParentSql
->
subState
;
if
(
tscNonOrderedProjectionQueryOnSTable
(
p
Cmd
,
p
QueryInfo
,
0
)
&&
numOfRows
==
0
)
{
if
(
tscNonOrderedProjectionQueryOnSTable
(
pQueryInfo
,
0
)
&&
numOfRows
==
0
)
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
assert
(
pQueryInfo
->
numOfTables
==
1
);
...
...
@@ -1561,7 +1561,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
}
SQueryInfo
*
p
=
tscGetQueryInfo
(
&
pSub
->
cmd
,
0
);
orderedPrjQuery
=
tscNonOrderedProjectionQueryOnSTable
(
&
pSub
->
cmd
,
p
,
0
);
orderedPrjQuery
=
tscNonOrderedProjectionQueryOnSTable
(
p
,
0
);
if
(
orderedPrjQuery
)
{
break
;
}
...
...
@@ -1586,7 +1586,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSub
->
cmd
,
0
);
if
(
tscNonOrderedProjectionQueryOnSTable
(
&
pSub
->
cmd
,
pQueryInfo
,
0
)
&&
pSub
->
res
.
row
>=
pSub
->
res
.
numOfRows
&&
if
(
tscNonOrderedProjectionQueryOnSTable
(
pQueryInfo
,
0
)
&&
pSub
->
res
.
row
>=
pSub
->
res
.
numOfRows
&&
pSub
->
res
.
completed
)
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
assert
(
pQueryInfo
->
numOfTables
==
1
);
...
...
@@ -1788,7 +1788,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
}
// In case of consequence query from other vnode, do not wait for other query response here.
if
(
!
(
pTableMetaInfo
->
vgroupIndex
>
0
&&
tscNonOrderedProjectionQueryOnSTable
(
&
pSql
->
cmd
,
pQueryInfo
,
0
)))
{
if
(
!
(
pTableMetaInfo
->
vgroupIndex
>
0
&&
tscNonOrderedProjectionQueryOnSTable
(
pQueryInfo
,
0
)))
{
if
(
!
subAndCheckDone
(
pSql
,
pParentSql
,
pSupporter
->
subqueryIndex
))
{
return
;
}
...
...
@@ -1800,7 +1800,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
* if the query is a continue query (vgroupIndex > 0 for projection query) for next vnode, do the retrieval of
* data instead of returning to its invoker
*/
if
(
pTableMetaInfo
->
vgroupIndex
>
0
&&
tscNonOrderedProjectionQueryOnSTable
(
&
pSql
->
cmd
,
pQueryInfo
,
0
))
{
if
(
pTableMetaInfo
->
vgroupIndex
>
0
&&
tscNonOrderedProjectionQueryOnSTable
(
pQueryInfo
,
0
))
{
pSql
->
fp
=
joinRetrieveFinalResCallback
;
// continue retrieve data
pSql
->
cmd
.
command
=
TSDB_SQL_FETCH
;
...
...
@@ -2773,6 +2773,8 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
// set the command flag must be after the semaphore been correctly set.
if
(
pParentSql
->
cmd
.
command
!=
TSDB_SQL_RETRIEVE_EMPTY_RESULT
)
{
pParentSql
->
cmd
.
command
=
TSDB_SQL_RETRIEVE_LOCALMERGE
;
SQueryInfo
*
pQueryInfo2
=
tscGetQueryInfo
(
&
pParentSql
->
cmd
,
pParentSql
->
cmd
.
clauseIndex
);
size_t
size
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
for
(
int32_t
j
=
0
;
j
<
size
;
++
j
)
{
...
...
@@ -2781,7 +2783,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
int32_t
functionId
=
pCtx
->
functionId
;
if
(
functionId
<
0
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
p
ParentSql
->
cmd
.
pUdfInfo
,
-
1
*
functionId
-
1
);
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
p
QueryInfo2
->
pUdfInfo
,
-
1
*
functionId
-
1
);
code
=
initUdfInfo
(
pUdfInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pParentSql
->
res
.
code
=
code
;
...
...
@@ -2862,8 +2864,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
tscDebug
(
"0x%"
PRIx64
" sub:%p retrieve numOfRows:%d totalNumOfRows:%"
PRIu64
" from ep:%s, orderOfSub:%d"
,
pParentSql
->
self
,
pSql
,
pRes
->
numOfRows
,
pState
->
numOfRetrievedRows
,
pSql
->
epSet
.
fqdn
[
pSql
->
epSet
.
inUse
],
idx
);
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
if
(
num
>
tsMaxNumOfOrderedResults
&&
tscIsProjectionQueryOnSTable
(
pCmd
,
pQueryInfo
,
0
))
{
if
(
num
>
tsMaxNumOfOrderedResults
&&
tscIsProjectionQueryOnSTable
(
pQueryInfo
,
0
))
{
tscError
(
"%p sub:%p num of OrderedRes is too many, max allowed:%"
PRId32
" , current:%"
PRId64
,
pParentSql
,
pSql
,
tsMaxNumOfOrderedResults
,
num
);
tscAbortFurtherRetryRetrieval
(
trsupport
,
tres
,
TSDB_CODE_TSC_SORTED_RES_TOO_MANY
);
...
...
src/client/src/tscUtil.c
浏览文件 @
dcd3c208
...
...
@@ -127,7 +127,7 @@ bool tscIsTwoStageSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t tab
}
// for ordered projection query, iterate all qualified vnodes sequentially
if
(
tscNonOrderedProjectionQueryOnSTable
(
p
Cmd
,
p
QueryInfo
,
tableIndex
))
{
if
(
tscNonOrderedProjectionQueryOnSTable
(
pQueryInfo
,
tableIndex
))
{
return
false
;
}
...
...
@@ -138,7 +138,7 @@ bool tscIsTwoStageSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t tab
return
false
;
}
bool
tscIsProjectionQueryOnSTable
(
S
SqlCmd
*
pCmd
,
S
QueryInfo
*
pQueryInfo
,
int32_t
tableIndex
)
{
bool
tscIsProjectionQueryOnSTable
(
SQueryInfo
*
pQueryInfo
,
int32_t
tableIndex
)
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
tableIndex
);
/*
...
...
@@ -156,7 +156,7 @@ bool tscIsProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t
int32_t
functionId
=
tscSqlExprGet
(
pQueryInfo
,
i
)
->
base
.
functionId
;
if
(
functionId
<
0
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
p
Cmd
->
pUdfInfo
,
-
1
*
functionId
-
1
);
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
p
QueryInfo
->
pUdfInfo
,
-
1
*
functionId
-
1
);
if
(
pUdfInfo
->
funcType
==
TSDB_UDF_TYPE_AGGREGATE
)
{
return
false
;
}
...
...
@@ -179,8 +179,8 @@ bool tscIsProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, int32_t
}
// not order by timestamp projection query on super table
bool
tscNonOrderedProjectionQueryOnSTable
(
S
SqlCmd
*
pCmd
,
S
QueryInfo
*
pQueryInfo
,
int32_t
tableIndex
)
{
if
(
!
tscIsProjectionQueryOnSTable
(
p
Cmd
,
p
QueryInfo
,
tableIndex
))
{
bool
tscNonOrderedProjectionQueryOnSTable
(
SQueryInfo
*
pQueryInfo
,
int32_t
tableIndex
)
{
if
(
!
tscIsProjectionQueryOnSTable
(
pQueryInfo
,
tableIndex
))
{
return
false
;
}
...
...
@@ -188,8 +188,8 @@ bool tscNonOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo,
return
pQueryInfo
->
order
.
orderColId
<
0
;
}
bool
tscOrderedProjectionQueryOnSTable
(
S
SqlCmd
*
pCmd
,
S
QueryInfo
*
pQueryInfo
,
int32_t
tableIndex
)
{
if
(
!
tscIsProjectionQueryOnSTable
(
p
Cmd
,
p
QueryInfo
,
tableIndex
))
{
bool
tscOrderedProjectionQueryOnSTable
(
SQueryInfo
*
pQueryInfo
,
int32_t
tableIndex
)
{
if
(
!
tscIsProjectionQueryOnSTable
(
pQueryInfo
,
tableIndex
))
{
return
false
;
}
...
...
@@ -197,14 +197,14 @@ bool tscOrderedProjectionQueryOnSTable(SSqlCmd *pCmd, SQueryInfo* pQueryInfo, in
return
pQueryInfo
->
order
.
orderColId
>=
0
;
}
bool
tscIsProjectionQuery
(
S
SqlCmd
*
pCmd
,
S
QueryInfo
*
pQueryInfo
)
{
bool
tscIsProjectionQuery
(
SQueryInfo
*
pQueryInfo
)
{
size_t
size
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
int32_t
functionId
=
tscSqlExprGet
(
pQueryInfo
,
i
)
->
base
.
functionId
;
if
(
functionId
<
0
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
p
Cmd
->
pUdfInfo
,
-
1
*
functionId
-
1
);
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
p
QueryInfo
->
pUdfInfo
,
-
1
*
functionId
-
1
);
if
(
pUdfInfo
->
funcType
==
TSDB_UDF_TYPE_AGGREGATE
)
{
return
false
;
}
...
...
@@ -249,7 +249,7 @@ bool tscIsSecondStageQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
}
}
if
(
tscIsProjectionQuery
(
p
Cmd
,
p
QueryInfo
))
{
if
(
tscIsProjectionQuery
(
pQueryInfo
))
{
return
false
;
}
...
...
@@ -422,6 +422,15 @@ bool isSimpleAggregate(SQueryInfo* pQueryInfo) {
}
int32_t
functionId
=
pExpr
->
base
.
functionId
;
if
(
functionId
<
0
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
pQueryInfo
->
pUdfInfo
,
-
1
*
functionId
-
1
);
if
(
pUdfInfo
->
funcType
==
TSDB_UDF_TYPE_AGGREGATE
)
{
return
true
;
}
continue
;
}
if
(
functionId
==
TSDB_FUNC_TS
||
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
continue
;
}
...
...
@@ -799,6 +808,12 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) {
tfree
(
pUp
);
}
if
(
pCmd
->
subCmd
)
{
pQueryInfo
->
pUdfInfo
=
taosArrayDestroy
(
pQueryInfo
->
pUdfInfo
);
}
else
{
pQueryInfo
->
pUdfInfo
=
tscDestroyUdfArrayList
(
pQueryInfo
->
pUdfInfo
);
}
freeQueryInfoImpl
(
pQueryInfo
);
clearAllTableMetaInfo
(
pQueryInfo
,
removeMeta
);
...
...
@@ -842,11 +857,6 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool removeMeta) {
pCmd
->
pTableBlockHashList
=
tscDestroyBlockHashTable
(
pCmd
->
pTableBlockHashList
,
removeMeta
);
pCmd
->
pDataBlocks
=
tscDestroyBlockArrayList
(
pCmd
->
pDataBlocks
);
if
(
pCmd
->
subCmd
)
{
pCmd
->
pUdfInfo
=
taosArrayDestroy
(
pCmd
->
pUdfInfo
);
}
else
{
pCmd
->
pUdfInfo
=
tscDestroyUdfArrayList
(
pCmd
->
pUdfInfo
);
}
tscFreeQueryInfo
(
pCmd
,
removeMeta
);
}
...
...
@@ -2683,12 +2693,13 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
goto
_error
;
}
if
(
pCmd
->
pUdfInfo
)
{
pnCmd
->
pUdfInfo
=
taosArrayDup
(
pCmd
->
pUdfInfo
);
}
SQueryInfo
*
pNewQueryInfo
=
tscGetQueryInfo
(
pnCmd
,
0
);
if
(
pQueryInfo
->
pUdfInfo
)
{
pNewQueryInfo
->
pUdfInfo
=
taosArrayDup
(
pQueryInfo
->
pUdfInfo
);
}
pNewQueryInfo
->
command
=
pQueryInfo
->
command
;
pnCmd
->
active
=
pNewQueryInfo
;
...
...
@@ -3096,7 +3107,7 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) {
numOfVgroups
=
(
int32_t
)
taosArrayGetSize
(
pTableMetaInfo
->
pVgroupTables
);
}
return
tscNonOrderedProjectionQueryOnSTable
(
p
Cmd
,
p
QueryInfo
,
0
)
&&
return
tscNonOrderedProjectionQueryOnSTable
(
pQueryInfo
,
0
)
&&
(
!
tscHasReachLimitation
(
pQueryInfo
,
pRes
))
&&
(
pTableMetaInfo
->
vgroupIndex
<
numOfVgroups
-
1
);
}
...
...
@@ -3114,7 +3125,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
* no result returned from the current virtual node anymore, try the next vnode if exists
* if case of: multi-vnode super table projection query
*/
assert
(
pRes
->
numOfRows
==
0
&&
tscNonOrderedProjectionQueryOnSTable
(
p
Cmd
,
p
QueryInfo
,
0
)
&&
!
tscHasReachLimitation
(
pQueryInfo
,
pRes
));
assert
(
pRes
->
numOfRows
==
0
&&
tscNonOrderedProjectionQueryOnSTable
(
pQueryInfo
,
0
)
&&
!
tscHasReachLimitation
(
pQueryInfo
,
pRes
));
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
int32_t
totalVgroups
=
pTableMetaInfo
->
vgroupList
->
numOfVgroups
;
...
...
@@ -3489,9 +3500,15 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf
functionId
=
TSDB_FUNC_STDDEV
;
}
SUdfInfo
*
pUdfInfo
=
NULL
;
if
(
functionId
<
0
)
{
pUdfInfo
=
taosArrayGet
(
pQueryInfo
->
pUdfInfo
,
-
1
*
functionId
-
1
);
}
int32_t
inter
=
0
;
getResultDataInfo
(
pExpr
->
base
.
colType
,
pExpr
->
base
.
colBytes
,
functionId
,
0
,
&
pse
->
resType
,
&
pse
->
resBytes
,
&
inter
,
0
,
false
,
NULL
);
&
pse
->
resBytes
,
&
inter
,
0
,
false
,
pUdfInfo
);
pse
->
colType
=
pse
->
resType
;
pse
->
colBytes
=
pse
->
resBytes
;
...
...
@@ -3558,8 +3575,14 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu
functionId
=
TSDB_FUNC_STDDEV
;
}
SUdfInfo
*
pUdfInfo
=
NULL
;
if
(
functionId
<
0
)
{
pUdfInfo
=
taosArrayGet
(
pQueryInfo
->
pUdfInfo
,
-
1
*
functionId
-
1
);
}
getResultDataInfo
(
pExpr
->
base
.
colType
,
pExpr
->
base
.
colBytes
,
functionId
,
0
,
&
pse
->
resType
,
&
pse
->
resBytes
,
&
inter
,
0
,
false
,
NULL
);
0
,
false
,
pUdfInfo
);
}
}
...
...
@@ -3633,7 +3656,8 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr
->
fillType
=
pQueryInfo
->
fillType
;
pQueryAttr
->
groupbyColumn
=
tscGroupbyColumn
(
pQueryInfo
);
pQueryAttr
->
havingNum
=
pQueryInfo
->
havingFieldNum
;
pQueryAttr
->
pUdfInfo
=
pQueryInfo
->
pUdfInfo
;
if
(
pQueryInfo
->
order
.
order
==
TSDB_ORDER_ASC
)
{
// TODO refactor
pQueryAttr
->
window
=
pQueryInfo
->
window
;
}
else
{
...
...
src/query/inc/qExecutor.h
浏览文件 @
dcd3c208
...
...
@@ -236,6 +236,7 @@ typedef struct SQueryAttr {
SMemRef
memRef
;
STableGroupInfo
tableGroupInfo
;
// table <tid, last_key> list SArray<STableKeyInfo>
int32_t
vgId
;
SArray
*
pUdfInfo
;
// no need to free
}
SQueryAttr
;
typedef
SSDataBlock
*
(
*
__operator_fn_t
)(
void
*
param
,
bool
*
newgroup
);
...
...
@@ -495,6 +496,7 @@ typedef struct SMultiwayMergeInfo {
bool
hasPrev
;
bool
groupMix
;
SArray
*
udfInfo
;
}
SMultiwayMergeInfo
;
SOperatorInfo
*
createDataBlocksOptScanInfo
(
void
*
pTsdbQueryHandle
,
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
repeatTime
,
int32_t
reverseTime
);
...
...
@@ -515,7 +517,7 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
SOperatorInfo
*
createTableBlockInfoScanOperator
(
void
*
pTsdbQueryHandle
,
SQueryRuntimeEnv
*
pRuntimeEnv
);
SOperatorInfo
*
createMultiwaySortOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
int32_t
numOfRows
,
void
*
merger
,
bool
groupMix
);
SOperatorInfo
*
createGlobalAggregateOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
void
*
param
);
SOperatorInfo
*
createGlobalAggregateOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
void
*
param
,
SArray
*
pUdfInfo
);
SOperatorInfo
*
createSLimitOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
void
*
merger
);
SOperatorInfo
*
createFilterOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
...
...
@@ -530,6 +532,7 @@ void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlo
int32_t
getNumOfResult
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SQLFunctionCtx
*
pCtx
,
int32_t
numOfOutput
);
void
finalizeQueryResult
(
SOperatorInfo
*
pOperator
,
SQLFunctionCtx
*
pCtx
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
*
rowCellInfoOffset
);
void
updateOutputBuf
(
SOptrBasicInfo
*
pBInfo
,
int32_t
*
bufCapacity
,
int32_t
numOfInputRows
);
void
clearOutputBuf
(
SOptrBasicInfo
*
pBInfo
,
int32_t
*
bufCapacity
);
void
freeParam
(
SQueryParam
*
param
);
int32_t
convertQueryMsg
(
SQueryTableMsg
*
pQueryMsg
,
SQueryParam
*
param
);
...
...
@@ -571,4 +574,6 @@ void freeQueryAttr(SQueryAttr *pQuery);
int32_t
getMaximumIdleDurationSec
();
void
doInvokeUdf
(
SUdfInfo
*
pUdfInfo
,
SQLFunctionCtx
*
pCtx
,
int32_t
idx
,
int32_t
type
);
#endif // TDENGINE_QEXECUTOR_H
src/query/src/qExecutor.c
浏览文件 @
dcd3c208
...
...
@@ -701,39 +701,72 @@ static int32_t getNumOfRowsInTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc
return
num
;
}
static
void
doInvokeUdf
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SQLFunctionCtx
*
pCtx
,
int32_t
idx
)
{
void
doInvokeUdf
(
SUdfInfo
*
pUdfInfo
,
SQLFunctionCtx
*
pCtx
,
int32_t
idx
,
int32_t
type
)
{
int32_t
output
=
0
;
SUdfInfo
*
pUdfInfo
=
pRuntimeEnv
->
pUdfInfo
;
if
(
pUdfInfo
&&
pUdfInfo
->
funcs
[
TSDB_UDF_FUNC_NORMAL
])
{
qDebug
(
"invoke udf function:%s,%p"
,
pUdfInfo
->
name
,
pUdfInfo
->
funcs
[
TSDB_UDF_FUNC_NORMAL
]);
if
(
pUdfInfo
->
isScript
)
{
(
*
(
scriptNormalFunc
)
pUdfInfo
->
funcs
[
TSDB_UDF_FUNC_NORMAL
])(
pUdfInfo
->
pScriptCtx
,
(
char
*
)
pCtx
->
pInput
+
idx
*
pCtx
->
inputType
,
pCtx
->
inputType
,
pCtx
->
inputBytes
,
pCtx
->
size
,
pCtx
->
ptsList
,
pCtx
->
pOutput
,
(
char
*
)
pCtx
->
ptsOutputBuf
,
&
output
,
pCtx
->
outputType
,
pCtx
->
outputBytes
);
}
else
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
if
(
pUdfInfo
==
NULL
||
pUdfInfo
->
funcs
[
type
]
==
NULL
)
{
qError
(
"empty udf function, type:%d"
,
type
);
return
;
}
void
*
interBuf
=
(
void
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
qDebug
(
"invoke udf function:%s,%p"
,
pUdfInfo
->
name
,
pUdfInfo
->
funcs
[
type
]
);
(
*
(
udfNormalFunc
)
pUdfInfo
->
funcs
[
TSDB_UDF_FUNC_NORMAL
])((
char
*
)
pCtx
->
pInput
+
idx
*
pCtx
->
inputType
,
pCtx
->
inputType
,
pCtx
->
inputBytes
,
pCtx
->
size
,
pCtx
->
ptsList
,
pCtx
->
pOutput
,
interBuf
,
(
char
*
)
pCtx
->
ptsOutputBuf
,
&
output
,
pCtx
->
outputType
,
pCtx
->
outputBytes
,
&
pUdfInfo
->
init
);
}
switch
(
type
)
{
case
TSDB_UDF_FUNC_NORMAL
:
if
(
pUdfInfo
->
isScript
)
{
(
*
(
scriptNormalFunc
)
pUdfInfo
->
funcs
[
TSDB_UDF_FUNC_NORMAL
])(
pUdfInfo
->
pScriptCtx
,
(
char
*
)
pCtx
->
pInput
+
idx
*
pCtx
->
inputType
,
pCtx
->
inputType
,
pCtx
->
inputBytes
,
pCtx
->
size
,
pCtx
->
ptsList
,
pCtx
->
pOutput
,
(
char
*
)
pCtx
->
ptsOutputBuf
,
&
output
,
pCtx
->
outputType
,
pCtx
->
outputBytes
);
}
else
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
if
(
pUdfInfo
->
funcType
==
TSDB_UDF_TYPE_AGGREGATE
)
{
void
*
interBuf
=
(
void
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
(
*
(
udfNormalFunc
)
pUdfInfo
->
funcs
[
TSDB_UDF_FUNC_NORMAL
])((
char
*
)
pCtx
->
pInput
+
idx
*
pCtx
->
inputType
,
pCtx
->
inputType
,
pCtx
->
inputBytes
,
pCtx
->
size
,
pCtx
->
ptsList
,
pCtx
->
pOutput
,
interBuf
,
(
char
*
)
pCtx
->
ptsOutputBuf
,
&
output
,
pCtx
->
outputType
,
pCtx
->
outputBytes
,
&
pUdfInfo
->
init
);
}
if
(
pUdfInfo
->
funcType
==
TSDB_UDF_TYPE_AGGREGATE
)
{
pCtx
->
resultInfo
->
numOfRes
=
output
;
}
else
{
pCtx
->
resultInfo
->
numOfRes
+=
output
;
}
if
(
pCtx
->
resultInfo
->
numOfRes
>
0
)
{
pCtx
->
resultInfo
->
hasResult
=
DATA_SET_FLAG
;
}
break
;
case
TSDB_UDF_FUNC_MERGE
:
(
*
(
udfMergeFunc
)
pUdfInfo
->
funcs
[
TSDB_UDF_FUNC_MERGE
])(
pCtx
->
pInput
,
pCtx
->
size
,
pCtx
->
pOutput
,
&
output
,
&
pUdfInfo
->
init
);
// set the output value exist
pCtx
->
resultInfo
->
numOfRes
=
output
;
}
else
{
pCtx
->
resultInfo
->
numOfRes
+=
output
;
}
if
(
output
>
0
)
{
pCtx
->
resultInfo
->
hasResult
=
DATA_SET_FLAG
;
}
if
(
pCtx
->
resultInfo
->
numOfRes
>
0
)
{
pCtx
->
resultInfo
->
hasResult
=
DATA_SET_FLAG
;
}
break
;
return
;
case
TSDB_UDF_FUNC_FINALIZE
:
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
void
*
interBuf
=
(
void
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
if
(
pUdfInfo
->
isScript
)
{
(
*
(
scriptFinalizeFunc
)
pUdfInfo
->
funcs
[
TSDB_UDF_FUNC_FINALIZE
])(
pUdfInfo
->
pScriptCtx
,
pCtx
->
pOutput
,
&
output
);
}
else
{
(
*
(
udfFinalizeFunc
)
pUdfInfo
->
funcs
[
TSDB_UDF_FUNC_FINALIZE
])(
pCtx
->
pOutput
,
interBuf
,
&
output
,
&
pUdfInfo
->
init
);
}
// set the output value exist
pCtx
->
resultInfo
->
numOfRes
=
output
;
if
(
output
>
0
)
{
pCtx
->
resultInfo
->
hasResult
=
DATA_SET_FLAG
;
}
break
;
}
}
qError
(
"empty udf function"
);
return
;
}
...
...
@@ -768,7 +801,8 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
if
(
functionNeedToExecute
(
pRuntimeEnv
,
&
pCtx
[
k
],
functionId
))
{
// aAggs[functionId].xFunction(&pCtx[k]);
if
(
functionId
<
0
)
{
// load the script and exec, pRuntimeEnv->pUdfInfo
doInvokeUdf
(
pRuntimeEnv
,
&
pCtx
[
k
],
0
);
SUdfInfo
*
pUdfInfo
=
pRuntimeEnv
->
pUdfInfo
;
doInvokeUdf
(
pUdfInfo
,
&
pCtx
[
k
],
0
,
TSDB_UDF_FUNC_NORMAL
);
}
else
{
aAggs
[
functionId
].
xFunction
(
&
pCtx
[
k
]);
}
...
...
@@ -982,6 +1016,13 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
pCtx
[
i
].
pInput
=
p
->
pData
;
assert
(
p
->
info
.
colId
==
pColIndex
->
colId
&&
pCtx
[
i
].
inputType
==
p
->
info
.
type
);
if
(
pCtx
[
i
].
functionId
<
0
)
{
SColumnInfoData
*
tsInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
0
);
pCtx
[
i
].
ptsList
=
(
int64_t
*
)
tsInfo
->
pData
;
continue
;
}
uint32_t
status
=
aAggs
[
pCtx
[
i
].
functionId
].
status
;
if
((
status
&
(
TSDB_FUNCSTATE_SELECTIVITY
|
TSDB_FUNCSTATE_NEED_TS
))
!=
0
)
{
SColumnInfoData
*
tsInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
0
);
...
...
@@ -1011,7 +1052,8 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
pCtx
[
k
].
startTs
=
startTs
;
// this can be set during create the struct
// aAggs[functionId].xFunction(&pCtx[k]);
if
(
functionId
<
0
)
{
doInvokeUdf
(
pRuntimeEnv
,
&
pCtx
[
k
],
0
);
SUdfInfo
*
pUdfInfo
=
pRuntimeEnv
->
pUdfInfo
;
doInvokeUdf
(
pUdfInfo
,
&
pCtx
[
k
],
0
,
TSDB_UDF_FUNC_NORMAL
);
}
else
{
aAggs
[
functionId
].
xFunction
(
&
pCtx
[
k
]);
}
...
...
@@ -1033,8 +1075,9 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC
pCtx
[
k
].
startTs
=
pQueryAttr
->
window
.
skey
;
if
(
pCtx
[
k
].
functionId
<
0
)
{
// load the script and exec
doInvokeUdf
(
pRuntimeEnv
,
&
pCtx
[
k
],
0
);
// load the script and exec
SUdfInfo
*
pUdfInfo
=
pRuntimeEnv
->
pUdfInfo
;
doInvokeUdf
(
pUdfInfo
,
&
pCtx
[
k
],
0
,
TSDB_UDF_FUNC_NORMAL
);
}
else
{
aAggs
[
pCtx
[
k
].
functionId
].
xFunction
(
&
pCtx
[
k
]);
}
...
...
@@ -1359,7 +1402,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
pInfo
->
binfo
.
pCtx
[
k
].
size
=
1
;
int32_t
functionId
=
pInfo
->
binfo
.
pCtx
[
k
].
functionId
;
if
(
functionId
<
0
)
{
doInvokeUdf
(
pRuntimeEnv
,
&
pInfo
->
binfo
.
pCtx
[
k
],
j
);
SUdfInfo
*
pUdfInfo
=
pRuntimeEnv
->
pUdfInfo
;
doInvokeUdf
(
pUdfInfo
,
&
pInfo
->
binfo
.
pCtx
[
k
],
j
,
TSDB_UDF_FUNC_NORMAL
);
}
else
if
(
functionNeedToExecute
(
pRuntimeEnv
,
&
pInfo
->
binfo
.
pCtx
[
k
],
functionId
))
{
aAggs
[
functionId
].
xFunctionF
(
&
pInfo
->
binfo
.
pCtx
[
k
],
j
);
}
...
...
@@ -1848,7 +1892,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case
OP_GlobalAggregate
:
{
pRuntimeEnv
->
proot
=
createGlobalAggregateOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQueryAttr
->
pExpr3
,
pQueryAttr
->
numOfExpr3
,
merger
);
pQueryAttr
->
numOfExpr3
,
merger
,
pQueryAttr
->
pUdfInfo
);
break
;
}
...
...
@@ -3154,6 +3198,21 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
}
}
void
clearOutputBuf
(
SOptrBasicInfo
*
pBInfo
,
int32_t
*
bufCapacity
)
{
SSDataBlock
*
pDataBlock
=
pBInfo
->
pRes
;
for
(
int32_t
i
=
0
;
i
<
pDataBlock
->
info
.
numOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
i
);
int32_t
functionId
=
pBInfo
->
pCtx
[
i
].
functionId
;
if
(
functionId
<
0
)
{
memset
(
pBInfo
->
pCtx
[
i
].
pOutput
,
0
,
pColInfo
->
info
.
bytes
*
(
*
bufCapacity
));
}
}
}
void
initCtxOutputBuffer
(
SQLFunctionCtx
*
pCtx
,
int32_t
size
)
{
for
(
int32_t
j
=
0
;
j
<
size
;
++
j
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
&
pCtx
[
j
]);
...
...
@@ -3221,23 +3280,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
for
(
int32_t
j
=
0
;
j
<
numOfOutput
;
++
j
)
{
if
(
pCtx
[
j
].
functionId
<
0
)
{
int32_t
output
=
0
;
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
&
pCtx
[
j
]);
void
*
interBuf
=
(
void
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
if
(
pRuntimeEnv
->
pUdfInfo
&&
pRuntimeEnv
->
pUdfInfo
->
funcs
[
TSDB_UDF_FUNC_FINALIZE
])
{
if
(
pRuntimeEnv
->
pUdfInfo
->
isScript
)
{
(
*
(
scriptFinalizeFunc
)
pRuntimeEnv
->
pUdfInfo
->
funcs
[
TSDB_UDF_FUNC_FINALIZE
])(
pRuntimeEnv
->
pUdfInfo
->
pScriptCtx
,
pCtx
[
j
].
pOutput
,
&
output
);
}
else
{
(
*
(
udfFinalizeFunc
)
pRuntimeEnv
->
pUdfInfo
->
funcs
[
TSDB_UDF_FUNC_FINALIZE
])(
pCtx
[
j
].
pOutput
,
interBuf
,
&
output
,
&
pRuntimeEnv
->
pUdfInfo
->
init
);
}
}
// set the output value exist
pCtx
[
j
].
resultInfo
->
numOfRes
=
output
;
if
(
output
>
0
)
{
pCtx
[
j
].
resultInfo
->
hasResult
=
DATA_SET_FLAG
;
}
doInvokeUdf
(
pRuntimeEnv
->
pUdfInfo
,
&
pCtx
[
j
],
0
,
TSDB_UDF_FUNC_FINALIZE
);
}
else
{
aAggs
[
pCtx
[
j
].
functionId
].
xFinalize
(
&
pCtx
[
j
]);
}
...
...
@@ -3254,23 +3297,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
}
else
{
for
(
int32_t
j
=
0
;
j
<
numOfOutput
;
++
j
)
{
if
(
pCtx
[
j
].
functionId
<
0
)
{
int32_t
output
=
0
;
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
void
*
interBuf
=
(
void
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
if
(
pRuntimeEnv
->
pUdfInfo
&&
pRuntimeEnv
->
pUdfInfo
->
funcs
[
TSDB_UDF_FUNC_FINALIZE
])
{
if
(
pRuntimeEnv
->
pUdfInfo
->
isScript
)
{
(
*
(
scriptFinalizeFunc
)
pRuntimeEnv
->
pUdfInfo
->
funcs
[
TSDB_UDF_FUNC_FINALIZE
])(
pRuntimeEnv
->
pUdfInfo
->
pScriptCtx
,
pCtx
[
j
].
pOutput
,
&
output
);
}
else
{
(
*
(
udfFinalizeFunc
)
pRuntimeEnv
->
pUdfInfo
->
funcs
[
TSDB_UDF_FUNC_FINALIZE
])(
pCtx
[
j
].
pOutput
,
interBuf
,
&
output
,
&
pRuntimeEnv
->
pUdfInfo
->
init
);
}
}
// set the output value exist
pCtx
[
j
].
resultInfo
->
numOfRes
=
output
;
if
(
output
>
0
)
{
pCtx
[
j
].
resultInfo
->
hasResult
=
DATA_SET_FLAG
;
}
doInvokeUdf
(
pRuntimeEnv
->
pUdfInfo
,
&
pCtx
[
j
],
0
,
TSDB_UDF_FUNC_FINALIZE
);
}
else
{
aAggs
[
pCtx
[
j
].
functionId
].
xFinalize
(
&
pCtx
[
j
]);
}
...
...
@@ -4642,7 +4669,7 @@ static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
}
SOperatorInfo
*
createGlobalAggregateOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
void
*
param
)
{
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
void
*
param
,
SArray
*
pUdfInfo
)
{
SMultiwayMergeInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SMultiwayMergeInfo
));
pInfo
->
resultRowFactor
=
...
...
@@ -4653,6 +4680,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
pInfo
->
pMerge
=
param
;
pInfo
->
bufCapacity
=
4096
;
pInfo
->
udfInfo
=
pUdfInfo
;
pInfo
->
binfo
.
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pInfo
->
bufCapacity
*
pInfo
->
resultRowFactor
);
pInfo
->
binfo
.
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
binfo
.
rowCellInfoOffset
);
...
...
@@ -4919,7 +4947,7 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) {
}
// Return result of the previous group in the firstly.
if
(
newgroup
&&
pRes
->
info
.
rows
>
0
)
{
if
(
*
newgroup
&&
pRes
->
info
.
rows
>
0
)
{
pArithInfo
->
existDataBlock
=
pBlock
;
clearNumOfRes
(
pInfo
->
pCtx
,
pOperator
->
numOfOutput
);
return
pInfo
->
pRes
;
...
...
src/query/src/qPlan.c
浏览文件 @
dcd3c208
...
...
@@ -138,6 +138,12 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
}
else
{
// diff/add/multiply/subtract/division
op
=
OP_Arithmetic
;
taosArrayPush
(
plan
,
&
op
);
//arithmetic on scalar function
if
(
pQueryAttr
->
pExpr2
!=
NULL
)
{
op
=
OP_Arithmetic
;
taosArrayPush
(
plan
,
&
op
);
}
}
if
(
pQueryAttr
->
limit
.
limit
>
0
||
pQueryAttr
->
limit
.
offset
>
0
)
{
...
...
tests/script/general/parser/udf_dll_stable.sim
浏览文件 @
dcd3c208
...
...
@@ -678,7 +678,6 @@ if $data61 != 8 then
return -1
endi
sql select add_one(f1)+1 from stb1;
if $rows != 17 then
return -1
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录