Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
801c2cd7
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
801c2cd7
编写于
5月 16, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into fix/hzcheng_3.0
上级
74f81375
a3f5e472
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
369 addition
and
68 deletion
+369
-68
include/common/tmsg.h
include/common/tmsg.h
+1
-1
source/libs/function/inc/builtinsimpl.h
source/libs/function/inc/builtinsimpl.h
+6
-1
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+39
-0
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+148
-1
source/libs/function/src/tudf.c
source/libs/function/src/tudf.c
+5
-1
source/libs/function/src/udfd.c
source/libs/function/src/udfd.c
+169
-63
tests/script/tsim/query/udf.sim
tests/script/tsim/query/udf.sim
+1
-1
未找到文件。
include/common/tmsg.h
浏览文件 @
801c2cd7
...
...
@@ -70,7 +70,7 @@ typedef uint16_t tmsg_t;
#define TSDB_IE_TYPE_DNODE_EXT 6
#define TSDB_IE_TYPE_DNODE_STATE 7
enum
{
CONN_TYPE__QUERY
=
1
,
CONN_TYPE__TMQ
,
CONN_TYPE__MAX
};
enum
{
CONN_TYPE__QUERY
=
1
,
CONN_TYPE__TMQ
,
CONN_TYPE__
UDFD
,
CONN_TYPE__
MAX
};
enum
{
HEARTBEAT_KEY_USER_AUTHINFO
=
1
,
...
...
source/libs/function/inc/builtinsimpl.h
浏览文件 @
801c2cd7
...
...
@@ -105,7 +105,12 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx);
bool
getSampleFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
sampleFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
sampleFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
sampleFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
//int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool
getTailFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
tailFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
tailFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
tailFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
bool
getSelectivityFuncEnv
(
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
...
...
source/libs/function/src/builtins.c
浏览文件 @
801c2cd7
...
...
@@ -386,6 +386,35 @@ static int32_t translateSample(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
translateTail
(
SFunctionNode
*
pFunc
,
char
*
pErrBuf
,
int32_t
len
)
{
int32_t
numOfParams
=
LIST_LENGTH
(
pFunc
->
pParameterList
);
if
(
2
!=
numOfParams
&&
3
!=
numOfParams
)
{
return
invaildFuncParaNumErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
SNode
*
pPara
=
nodesListGetNode
(
pFunc
->
pParameterList
,
0
);
if
(
QUERY_NODE_COLUMN
!=
nodeType
(
pPara
))
{
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"The input parameter of TAIL function can only be column"
);
}
for
(
int32_t
i
=
1
;
i
<
numOfParams
;
++
i
)
{
uint8_t
paraType
=
((
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
i
))
->
resType
.
type
;
if
(
!
IS_INTEGER_TYPE
(
paraType
))
{
return
invaildFuncParaTypeErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
}
SExprNode
*
pCol
=
(
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
0
);
uint8_t
colType
=
pCol
->
resType
.
type
;
if
(
IS_VAR_DATA_TYPE
(
colType
))
{
pFunc
->
node
.
resType
=
(
SDataType
){.
bytes
=
pCol
->
resType
.
bytes
,
.
type
=
colType
};
}
else
{
pFunc
->
node
.
resType
=
(
SDataType
){.
bytes
=
tDataTypes
[
colType
].
bytes
,
.
type
=
colType
};
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
translateLastRow
(
SFunctionNode
*
pFunc
,
char
*
pErrBuf
,
int32_t
len
)
{
// todo
return
TSDB_CODE_SUCCESS
;
...
...
@@ -850,6 +879,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
processFunc
=
sampleFunction
,
.
finalizeFunc
=
NULL
},
{
.
name
=
"tail"
,
.
type
=
FUNCTION_TYPE_TAIL
,
.
classification
=
FUNC_MGT_NONSTANDARD_SQL_FUNC
|
FUNC_MGT_TIMELINE_FUNC
,
.
translateFunc
=
translateTail
,
.
getEnvFunc
=
getTailFuncEnv
,
.
initFunc
=
tailFunctionSetup
,
.
processFunc
=
tailFunction
,
.
finalizeFunc
=
tailFinalize
},
{
.
name
=
"abs"
,
.
type
=
FUNCTION_TYPE_ABS
,
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
801c2cd7
...
...
@@ -18,12 +18,15 @@
#include "function.h"
#include "querynodes.h"
#include "taggfunction.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tpercentile.h"
#define HISTOGRAM_MAX_BINS_NUM 1000
#define MAVG_MAX_POINTS_NUM 1000
#define SAMPLE_MAX_POINTS_NUM 1000
#define TAIL_MAX_POINTS_NUM 100
#define TAIL_MAX_OFFSET 100
typedef
struct
SSumRes
{
union
{
...
...
@@ -161,6 +164,21 @@ typedef struct SSampleInfo {
int64_t
*
timestamp
;
}
SSampleInfo
;
typedef
struct
STailItem
{
int64_t
timestamp
;
bool
isNull
;
char
data
[];
}
STailItem
;
typedef
struct
STailInfo
{
int32_t
numOfPoints
;
int32_t
numAdded
;
int32_t
offset
;
uint8_t
colType
;
int16_t
colBytes
;
STailItem
**
pItems
;
}
STailInfo
;
#define SET_VAL(_info, numOfElem, res) \
do { \
if ((numOfElem) <= 0) { \
...
...
@@ -3107,7 +3125,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
int32_t
startOffset
=
pCtx
->
offset
;
for
(
int32_t
i
=
pInput
->
startRowIndex
;
i
<
pInput
->
numOfRows
+
pInput
->
startRowIndex
;
i
+=
1
)
{
if
(
colDataIsNull_
f
(
pInputCol
->
nullbitmap
,
i
))
{
if
(
colDataIsNull_
s
(
pInputCol
,
i
))
{
//colDataAppendNULL(pOutput, i);
continue
;
}
...
...
@@ -3141,3 +3159,132 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
//
// return pResInfo->numOfRes;
//}
bool
getTailFuncEnv
(
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
)
{
SColumnNode
*
pCol
=
(
SColumnNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
0
);
SValueNode
*
pVal
=
(
SValueNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
1
);
int32_t
numOfPoints
=
pVal
->
datum
.
i
;
pEnv
->
calcMemSize
=
sizeof
(
STailInfo
)
+
numOfPoints
*
(
POINTER_BYTES
+
sizeof
(
STailItem
)
+
pCol
->
node
.
resType
.
bytes
);
return
true
;
}
bool
tailFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
)
{
if
(
!
functionSetup
(
pCtx
,
pResultInfo
))
{
return
false
;
}
STailInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResultInfo
);
pInfo
->
numAdded
=
0
;
pInfo
->
numOfPoints
=
pCtx
->
param
[
1
].
param
.
i
;
if
(
pCtx
->
numOfParams
==
4
)
{
pInfo
->
offset
=
pCtx
->
param
[
2
].
param
.
i
;
}
else
{
pInfo
->
offset
=
0
;
}
pInfo
->
colType
=
pCtx
->
resDataInfo
.
type
;
pInfo
->
colBytes
=
pCtx
->
resDataInfo
.
bytes
;
if
((
pInfo
->
numOfPoints
<
1
||
pInfo
->
numOfPoints
>
TAIL_MAX_POINTS_NUM
)
||
(
pInfo
->
numOfPoints
<
0
||
pInfo
->
numOfPoints
>
TAIL_MAX_OFFSET
))
{
return
false
;
}
pInfo
->
pItems
=
(
STailItem
**
)((
char
*
)
pInfo
+
sizeof
(
STailInfo
));
char
*
pItem
=
(
char
*
)
pInfo
->
pItems
+
pInfo
->
numOfPoints
*
POINTER_BYTES
;
size_t
unitSize
=
sizeof
(
STailItem
)
+
pInfo
->
colBytes
;
for
(
int32_t
i
=
0
;
i
<
pInfo
->
numOfPoints
;
++
i
)
{
pInfo
->
pItems
[
i
]
=
(
STailItem
*
)(
pItem
+
i
*
unitSize
);
pInfo
->
pItems
[
i
]
->
isNull
=
false
;
}
return
true
;
}
static
void
tailAssignResult
(
STailItem
*
pItem
,
char
*
data
,
int32_t
colBytes
,
TSKEY
ts
,
bool
isNull
)
{
pItem
->
timestamp
=
ts
;
if
(
isNull
)
{
pItem
->
isNull
=
true
;
}
else
{
memcpy
(
pItem
->
data
,
data
,
colBytes
);
}
}
static
int32_t
tailCompFn
(
const
void
*
p1
,
const
void
*
p2
,
const
void
*
param
)
{
STailItem
*
d1
=
*
(
STailItem
**
)
p1
;
STailItem
*
d2
=
*
(
STailItem
**
)
p2
;
return
compareInt64Val
(
&
d1
->
timestamp
,
&
d2
->
timestamp
);
}
static
void
doTailAdd
(
STailInfo
*
pInfo
,
char
*
data
,
TSKEY
ts
,
bool
isNull
)
{
STailItem
**
pList
=
pInfo
->
pItems
;
if
(
pInfo
->
numAdded
<
pInfo
->
numOfPoints
)
{
tailAssignResult
(
pList
[
pInfo
->
numAdded
],
data
,
pInfo
->
colBytes
,
ts
,
isNull
);
taosheapsort
((
void
*
)
pList
,
sizeof
(
STailItem
**
),
pInfo
->
numAdded
+
1
,
NULL
,
tailCompFn
,
0
);
pInfo
->
numAdded
++
;
}
else
if
(
pList
[
0
]
->
timestamp
<
ts
)
{
tailAssignResult
(
pList
[
0
],
data
,
pInfo
->
colBytes
,
ts
,
isNull
);
taosheapadjust
((
void
*
)
pList
,
sizeof
(
STailItem
**
),
0
,
pInfo
->
numOfPoints
-
1
,
NULL
,
tailCompFn
,
NULL
,
0
);
}
}
int32_t
tailFunction
(
SqlFunctionCtx
*
pCtx
)
{
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
STailInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
TSKEY
*
tsList
=
(
int64_t
*
)
pInput
->
pPTS
->
pData
;
SColumnInfoData
*
pInputCol
=
pInput
->
pData
[
0
];
SColumnInfoData
*
pTsOutput
=
pCtx
->
pTsOutput
;
SColumnInfoData
*
pOutput
=
(
SColumnInfoData
*
)
pCtx
->
pOutput
;
int32_t
startOffset
=
pCtx
->
offset
;
if
(
pInfo
->
offset
>=
pInput
->
numOfRows
)
{
return
0
;
}
else
{
pInfo
->
numOfPoints
=
MIN
(
pInfo
->
numOfPoints
,
pInput
->
numOfRows
-
pInfo
->
offset
);
}
for
(
int32_t
i
=
pInput
->
startRowIndex
;
i
<
pInput
->
numOfRows
+
pInput
->
startRowIndex
-
pInfo
->
offset
;
i
+=
1
)
{
char
*
data
=
colDataGetData
(
pInputCol
,
i
);
doTailAdd
(
pInfo
,
data
,
tsList
[
i
],
colDataIsNull_s
(
pInputCol
,
i
));
}
taosqsort
(
pInfo
->
pItems
,
pInfo
->
numOfPoints
,
POINTER_BYTES
,
NULL
,
tailCompFn
);
for
(
int32_t
i
=
0
;
i
<
pInfo
->
numOfPoints
;
++
i
)
{
int32_t
pos
=
startOffset
+
i
;
STailItem
*
pItem
=
pInfo
->
pItems
[
i
];
if
(
pItem
->
isNull
)
{
colDataAppendNULL
(
pOutput
,
pos
);
}
else
{
colDataAppend
(
pOutput
,
pos
,
pItem
->
data
,
false
);
}
}
return
pInfo
->
numOfPoints
;
}
int32_t
tailFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
)
{
SResultRowEntryInfo
*
pEntryInfo
=
GET_RES_INFO
(
pCtx
);
STailInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pEntryInfo
);
pEntryInfo
->
complete
=
true
;
int32_t
type
=
pCtx
->
input
.
pData
[
0
]
->
info
.
type
;
int32_t
slotId
=
pCtx
->
pExpr
->
base
.
resSchema
.
slotId
;
SColumnInfoData
*
pCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
// todo assign the tag value and the corresponding row data
int32_t
currentRow
=
pBlock
->
info
.
rows
;
for
(
int32_t
i
=
0
;
i
<
pEntryInfo
->
numOfRes
;
++
i
)
{
STailItem
*
pItem
=
pInfo
->
pItems
[
i
];
colDataAppend
(
pCol
,
currentRow
,
pItem
->
data
,
false
);
//setSelectivityValue(pCtx, pBlock, &pInfo->pItems[i].tuplePos, currentRow);
currentRow
+=
1
;
}
return
pEntryInfo
->
numOfRes
;
}
source/libs/function/src/tudf.c
浏览文件 @
801c2cd7
...
...
@@ -312,6 +312,7 @@ typedef struct SUdfcFuncStub {
char
udfName
[
TSDB_FUNC_NAME_LEN
];
UdfcFuncHandle
handle
;
int32_t
refCount
;
int64_t
lastRefTime
;
}
SUdfcFuncStub
;
typedef
struct
SUdfcProxy
{
...
...
@@ -1446,6 +1447,7 @@ int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
uv_mutex_unlock
(
&
gUdfdProxy
.
udfStubsMutex
);
*
pHandle
=
foundStub
->
handle
;
++
foundStub
->
refCount
;
foundStub
->
lastRefTime
=
taosGetTimestampUs
();
return
0
;
}
*
pHandle
=
NULL
;
...
...
@@ -1455,6 +1457,7 @@ int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
strcpy
(
stub
.
udfName
,
udfName
);
stub
.
handle
=
*
pHandle
;
++
stub
.
refCount
;
stub
.
lastRefTime
=
taosGetTimestampUs
();
taosArrayPush
(
gUdfdProxy
.
udfStubs
,
&
stub
);
taosArraySort
(
gUdfdProxy
.
udfStubs
,
compareUdfcFuncSub
);
}
else
{
...
...
@@ -1662,7 +1665,8 @@ int32_t cleanUpUdfs() {
fnInfo
(
"tear down udf. udf name: %s, handle: %p"
,
stub
->
udfName
,
stub
->
handle
);
doTeardownUdf
(
stub
->
handle
);
}
else
{
fnInfo
(
"udf still in use. udf name: %s, ref count: %d, handle: %p"
,
stub
->
udfName
,
stub
->
refCount
,
stub
->
handle
);
fnInfo
(
"udf still in use. udf name: %s, ref count: %d, last ref time: %"
PRId64
", handle: %p"
,
stub
->
udfName
,
stub
->
refCount
,
stub
->
lastRefTime
,
stub
->
handle
);
taosArrayPush
(
udfStubs
,
stub
);
}
++
i
;
...
...
source/libs/function/src/udfd.c
浏览文件 @
801c2cd7
...
...
@@ -485,7 +485,146 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
uv_stop
(
global
.
loop
);
}
void
udfdProcessRpcRsp
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
return
;
}
typedef
enum
EUdfdRpcReqRspType
{
UDFD_RPC_MNODE_CONNECT
=
0
,
UDFD_RPC_RETRIVE_FUNC
,
}
EUdfdRpcReqRspType
;
typedef
struct
SUdfdRpcSendRecvInfo
{
EUdfdRpcReqRspType
rpcType
;
int32_t
code
;
void
*
param
;
uv_sem_t
resultSem
;
}
SUdfdRpcSendRecvInfo
;
void
udfdProcessRpcRsp
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SUdfdRpcSendRecvInfo
*
msgInfo
=
(
SUdfdRpcSendRecvInfo
*
)
pMsg
->
ahandle
;
ASSERT
(
pMsg
->
ahandle
!=
NULL
);
if
(
pEpSet
)
{
if
(
!
isEpsetEqual
(
&
global
.
mgmtEp
.
epSet
,
pEpSet
))
{
updateEpSet_s
(
&
global
.
mgmtEp
,
pEpSet
);
}
}
if
(
pMsg
->
code
!=
TSDB_CODE_SUCCESS
)
{
fnError
(
"udfd rpc error. code: %s"
,
tstrerror
(
pMsg
->
code
));
msgInfo
->
code
=
pMsg
->
code
;
goto
_return
;
}
if
(
msgInfo
->
rpcType
==
UDFD_RPC_MNODE_CONNECT
)
{
SConnectRsp
connectRsp
=
{
0
};
tDeserializeSConnectRsp
(
pMsg
->
pCont
,
pMsg
->
contLen
,
&
connectRsp
);
if
(
connectRsp
.
epSet
.
numOfEps
==
0
)
{
msgInfo
->
code
=
TSDB_CODE_MND_APP_ERROR
;
goto
_return
;
}
if
(
connectRsp
.
dnodeNum
>
1
&&
!
isEpsetEqual
(
&
global
.
mgmtEp
.
epSet
,
&
connectRsp
.
epSet
))
{
updateEpSet_s
(
&
global
.
mgmtEp
,
&
connectRsp
.
epSet
);
}
msgInfo
->
code
=
0
;
}
else
if
(
msgInfo
->
rpcType
==
UDFD_RPC_RETRIVE_FUNC
)
{
SRetrieveFuncRsp
retrieveRsp
=
{
0
};
tDeserializeSRetrieveFuncRsp
(
pMsg
->
pCont
,
pMsg
->
contLen
,
&
retrieveRsp
);
SFuncInfo
*
pFuncInfo
=
(
SFuncInfo
*
)
taosArrayGet
(
retrieveRsp
.
pFuncInfos
,
0
);
SUdf
*
udf
=
msgInfo
->
param
;
udf
->
funcType
=
pFuncInfo
->
funcType
;
udf
->
scriptType
=
pFuncInfo
->
scriptType
;
udf
->
outputType
=
pFuncInfo
->
funcType
;
udf
->
outputLen
=
pFuncInfo
->
outputLen
;
udf
->
bufSize
=
pFuncInfo
->
bufSize
;
char
path
[
PATH_MAX
]
=
{
0
};
snprintf
(
path
,
sizeof
(
path
),
"%s/lib%s.so"
,
"/tmp"
,
pFuncInfo
->
name
);
TdFilePtr
file
=
taosOpenFile
(
path
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_READ
|
TD_FILE_TRUNC
|
TD_FILE_AUTO_DEL
);
// TODO check for failure of flush to disk
taosWriteFile
(
file
,
pFuncInfo
->
pCode
,
pFuncInfo
->
codeSize
);
taosCloseFile
(
&
file
);
strncpy
(
udf
->
path
,
path
,
strlen
(
path
));
taosArrayDestroy
(
retrieveRsp
.
pFuncInfos
);
msgInfo
->
code
=
0
;
}
_return:
rpcFreeCont
(
pMsg
->
pCont
);
uv_sem_post
(
&
msgInfo
->
resultSem
);
return
;
}
int32_t
udfdConnectToMNode
()
{
SConnectReq
connReq
=
{
0
};
connReq
.
connType
=
CONN_TYPE__UDFD
;
tstrncpy
(
connReq
.
app
,
"udfd"
,
sizeof
(
connReq
.
app
));
tstrncpy
(
connReq
.
user
,
TSDB_DEFAULT_USER
,
sizeof
(
connReq
.
user
));
char
pass
[
TSDB_PASSWORD_LEN
+
1
]
=
{
0
};
taosEncryptPass_c
((
uint8_t
*
)(
TSDB_DEFAULT_PASS
),
strlen
(
TSDB_DEFAULT_PASS
),
pass
);
tstrncpy
(
connReq
.
passwd
,
pass
,
sizeof
(
connReq
.
passwd
));
connReq
.
pid
=
htonl
(
taosGetPId
());
connReq
.
startTime
=
htobe64
(
taosGetTimestampMs
());
int32_t
contLen
=
tSerializeSConnectReq
(
NULL
,
0
,
&
connReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSConnectReq
(
pReq
,
contLen
,
&
connReq
);
SUdfdRpcSendRecvInfo
*
msgInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SUdfdRpcSendRecvInfo
));
msgInfo
->
rpcType
=
UDFD_RPC_MNODE_CONNECT
;
uv_sem_init
(
&
msgInfo
->
resultSem
,
0
);
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
msgType
=
TDMT_MND_CONNECT
;
rpcMsg
.
pCont
=
pReq
;
rpcMsg
.
contLen
=
contLen
;
rpcMsg
.
ahandle
=
msgInfo
;
rpcSendRequest
(
global
.
clientRpc
,
&
global
.
mgmtEp
.
epSet
,
&
rpcMsg
,
NULL
);
uv_sem_wait
(
&
msgInfo
->
resultSem
);
int32_t
code
=
msgInfo
->
code
;
uv_sem_destroy
(
&
msgInfo
->
resultSem
);
taosMemoryFree
(
msgInfo
);
return
code
;
}
int32_t
udfdFillUdfInfoFromMNode
(
void
*
clientRpc
,
char
*
udfName
,
SUdf
*
udf
)
{
SRetrieveFuncReq
retrieveReq
=
{
0
};
retrieveReq
.
numOfFuncs
=
1
;
retrieveReq
.
pFuncNames
=
taosArrayInit
(
1
,
TSDB_FUNC_NAME_LEN
);
taosArrayPush
(
retrieveReq
.
pFuncNames
,
udfName
);
int32_t
contLen
=
tSerializeSRetrieveFuncReq
(
NULL
,
0
,
&
retrieveReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSRetrieveFuncReq
(
pReq
,
contLen
,
&
retrieveReq
);
taosArrayDestroy
(
retrieveReq
.
pFuncNames
);
SUdfdRpcSendRecvInfo
*
msgInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SUdfdRpcSendRecvInfo
));
msgInfo
->
rpcType
=
UDFD_RPC_RETRIVE_FUNC
;
msgInfo
->
param
=
udf
;
uv_sem_init
(
&
msgInfo
->
resultSem
,
0
);
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
pReq
;
rpcMsg
.
contLen
=
contLen
;
rpcMsg
.
msgType
=
TDMT_MND_RETRIEVE_FUNC
;
rpcMsg
.
ahandle
=
msgInfo
;
rpcSendRequest
(
clientRpc
,
&
global
.
mgmtEp
.
epSet
,
&
rpcMsg
,
NULL
);
uv_sem_wait
(
&
msgInfo
->
resultSem
);
uv_sem_destroy
(
&
msgInfo
->
resultSem
);
int32_t
code
=
msgInfo
->
code
;
taosMemoryFree
(
msgInfo
);
return
code
;
}
static
bool
udfdRpcRfp
(
int32_t
code
)
{
if
(
code
==
TSDB_CODE_RPC_REDIRECT
)
{
return
true
;
}
else
{
return
false
;
}
}
int
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
)
{
pEpSet
->
version
=
0
;
...
...
@@ -528,69 +667,30 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
return
0
;
}
int32_t
udfdFillUdfInfoFromMNode
(
void
*
clientRpc
,
char
*
udfName
,
SUdf
*
udf
)
{
SRetrieveFuncReq
retrieveReq
=
{
0
};
retrieveReq
.
numOfFuncs
=
1
;
retrieveReq
.
pFuncNames
=
taosArrayInit
(
1
,
TSDB_FUNC_NAME_LEN
);
taosArrayPush
(
retrieveReq
.
pFuncNames
,
udfName
);
int32_t
contLen
=
tSerializeSRetrieveFuncReq
(
NULL
,
0
,
&
retrieveReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSRetrieveFuncReq
(
pReq
,
contLen
,
&
retrieveReq
);
taosArrayDestroy
(
retrieveReq
.
pFuncNames
);
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
pReq
;
rpcMsg
.
contLen
=
contLen
;
rpcMsg
.
msgType
=
TDMT_MND_RETRIEVE_FUNC
;
SRpcMsg
rpcRsp
=
{
0
};
rpcSendRecv
(
clientRpc
,
&
global
.
mgmtEp
.
epSet
,
&
rpcMsg
,
&
rpcRsp
);
SRetrieveFuncRsp
retrieveRsp
=
{
0
};
tDeserializeSRetrieveFuncRsp
(
rpcRsp
.
pCont
,
rpcRsp
.
contLen
,
&
retrieveRsp
);
SFuncInfo
*
pFuncInfo
=
(
SFuncInfo
*
)
taosArrayGet
(
retrieveRsp
.
pFuncInfos
,
0
);
udf
->
funcType
=
pFuncInfo
->
funcType
;
udf
->
scriptType
=
pFuncInfo
->
scriptType
;
udf
->
outputType
=
pFuncInfo
->
funcType
;
udf
->
outputLen
=
pFuncInfo
->
outputLen
;
udf
->
bufSize
=
pFuncInfo
->
bufSize
;
char
path
[
PATH_MAX
]
=
{
0
};
snprintf
(
path
,
sizeof
(
path
),
"%s/lib%s.so"
,
"/tmp"
,
udfName
);
TdFilePtr
file
=
taosOpenFile
(
path
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_READ
|
TD_FILE_TRUNC
|
TD_FILE_AUTO_DEL
);
// TODO check for failure of flush to disk
taosWriteFile
(
file
,
pFuncInfo
->
pCode
,
pFuncInfo
->
codeSize
);
taosCloseFile
(
&
file
);
strncpy
(
udf
->
path
,
path
,
strlen
(
path
));
taosArrayDestroy
(
retrieveRsp
.
pFuncInfos
);
rpcFreeCont
(
rpcRsp
.
pCont
);
return
0
;
}
int32_t
udfdOpenClientRpc
()
{
char
*
pass
=
"taosdata"
;
char
*
user
=
"root"
;
char
secretEncrypt
[
TSDB_PASSWORD_LEN
+
1
]
=
{
0
};
taosEncryptPass_c
((
uint8_t
*
)
pass
,
strlen
(
pass
),
secretEncrypt
);
SRpcInit
rpcInit
=
{
0
};
rpcInit
.
label
=
(
char
*
)
"UDFD"
;
rpcInit
.
label
=
"UDFD"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
udfdProcessRpcRsp
;
rpcInit
.
cfp
=
(
RpcCfp
)
udfdProcessRpcRsp
;
rpcInit
.
sessions
=
1024
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
idleTime
=
30
*
1000
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
user
=
TSDB_DEFAULT_USER
;
rpcInit
.
ckey
=
"key"
;
rpcInit
.
spi
=
1
;
rpcInit
.
parent
=
&
global
;
rpcInit
.
rfp
=
udfdRpcRfp
;
rpcInit
.
user
=
(
char
*
)
user
;
rpcInit
.
ckey
=
(
char
*
)
"key"
;
rpcInit
.
secret
=
(
char
*
)
secretEncrypt
;
rpcInit
.
spi
=
1
;
char
pass
[
TSDB_PASSWORD_LEN
+
1
]
=
{
0
};
taosEncryptPass_c
((
uint8_t
*
)(
TSDB_DEFAULT_PASS
),
strlen
(
TSDB_DEFAULT_PASS
),
pass
);
rpcInit
.
secret
=
pass
;
global
.
clientRpc
=
rpcOpen
(
&
rpcInit
);
if
(
global
.
clientRpc
==
NULL
)
{
fnError
(
"failed to init dnode rpc client"
);
return
-
1
;
}
return
0
;
}
...
...
@@ -700,12 +800,6 @@ static int32_t udfdRun() {
global
.
udfsHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
uv_mutex_init
(
&
global
.
udfsMutex
);
// TOOD: client rpc to fetch udf function info from mnode
if
(
udfdOpenClientRpc
()
!=
0
)
{
fnError
(
"open rpc connection to mnode failure"
);
return
-
1
;
}
if
(
udfdUvInit
()
!=
0
)
{
fnError
(
"uv init failure"
);
return
-
2
;
...
...
@@ -717,7 +811,6 @@ static int32_t udfdRun() {
int
codeClose
=
uv_loop_close
(
global
.
loop
);
fnDebug
(
"uv loop close. result: %s"
,
uv_err_name
(
codeClose
));
removeListeningPipe
();
udfdCloseClientRpc
();
uv_mutex_destroy
(
&
global
.
udfsMutex
);
taosHashCleanup
(
global
.
udfsHash
);
return
0
;
...
...
@@ -746,9 +839,22 @@ int main(int argc, char *argv[]) {
if
(
taosInitCfg
(
configDir
,
NULL
,
NULL
,
NULL
,
NULL
,
0
)
!=
0
)
{
fnError
(
"failed to start since read config error"
);
return
-
1
;
return
-
2
;
}
initEpSetFromCfg
(
tsFirst
,
tsSecond
,
&
global
.
mgmtEp
);
return
udfdRun
();
if
(
udfdOpenClientRpc
()
!=
0
)
{
fnError
(
"open rpc connection to mnode failure"
);
return
-
3
;
}
if
(
udfdConnectToMNode
()
!=
0
)
{
fnError
(
"failed to start since can not connect to mnode"
);
return
-
4
;
}
udfdRun
();
udfdCloseClientRpc
();
}
tests/script/tsim/query/udf.sim
浏览文件 @
801c2cd7
...
...
@@ -7,7 +7,7 @@ system sh/cfg.sh -n dnode1 -c udf -v 1
print ========= start dnode1 as LEADER
system sh/exec.sh -n dnode1 -s start
sleep
2
000
sleep
1
000
sql connect
print ======== step1 udf
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录