Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d46c3935
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d46c3935
编写于
5月 17, 2022
作者:
S
slzhou
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: mnode not ready for connect from udfd
上级
ee2f3e7e
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
145 addition
and
137 deletion
+145
-137
source/libs/function/src/udfd.c
source/libs/function/src/udfd.c
+145
-137
未找到文件。
source/libs/function/src/udfd.c
浏览文件 @
d46c3935
...
...
@@ -86,17 +86,148 @@ typedef struct SUdf {
TUdfDestroyFunc
destroyFunc
;
}
SUdf
;
// TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
// TODO: add private udf structure.
typedef
struct
SUdfcFuncHandle
{
SUdf
*
udf
;
}
SUdfcFuncHandle
;
int32_t
udfdFillUdfInfoFromMNode
(
void
*
clientRpc
,
char
*
udfName
,
SUdf
*
udf
);
typedef
enum
EUdfdRpcReqRspType
{
UDFD_RPC_MNODE_CONNECT
=
0
,
UDFD_RPC_RETRIVE_FUNC
,
}
EUdfdRpcReqRspType
;
typedef
struct
SUdfdRpcSendRecvInfo
{
EUdfdRpcReqRspType
rpcType
;
int32_t
code
;
void
*
param
;
uv_sem_t
resultSem
;
}
SUdfdRpcSendRecvInfo
;
void
udfdProcessRpcRsp
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SUdfdRpcSendRecvInfo
*
msgInfo
=
(
SUdfdRpcSendRecvInfo
*
)
pMsg
->
ahandle
;
ASSERT
(
pMsg
->
ahandle
!=
NULL
);
if
(
pEpSet
)
{
if
(
!
isEpsetEqual
(
&
global
.
mgmtEp
.
epSet
,
pEpSet
))
{
updateEpSet_s
(
&
global
.
mgmtEp
,
pEpSet
);
}
}
if
(
pMsg
->
code
!=
TSDB_CODE_SUCCESS
)
{
fnError
(
"udfd rpc error. code: %s"
,
tstrerror
(
pMsg
->
code
));
msgInfo
->
code
=
pMsg
->
code
;
goto
_return
;
}
if
(
msgInfo
->
rpcType
==
UDFD_RPC_MNODE_CONNECT
)
{
SConnectRsp
connectRsp
=
{
0
};
tDeserializeSConnectRsp
(
pMsg
->
pCont
,
pMsg
->
contLen
,
&
connectRsp
);
if
(
connectRsp
.
epSet
.
numOfEps
==
0
)
{
msgInfo
->
code
=
TSDB_CODE_MND_APP_ERROR
;
goto
_return
;
}
if
(
connectRsp
.
dnodeNum
>
1
&&
!
isEpsetEqual
(
&
global
.
mgmtEp
.
epSet
,
&
connectRsp
.
epSet
))
{
updateEpSet_s
(
&
global
.
mgmtEp
,
&
connectRsp
.
epSet
);
}
msgInfo
->
code
=
0
;
}
else
if
(
msgInfo
->
rpcType
==
UDFD_RPC_RETRIVE_FUNC
)
{
SRetrieveFuncRsp
retrieveRsp
=
{
0
};
tDeserializeSRetrieveFuncRsp
(
pMsg
->
pCont
,
pMsg
->
contLen
,
&
retrieveRsp
);
SFuncInfo
*
pFuncInfo
=
(
SFuncInfo
*
)
taosArrayGet
(
retrieveRsp
.
pFuncInfos
,
0
);
SUdf
*
udf
=
msgInfo
->
param
;
udf
->
funcType
=
pFuncInfo
->
funcType
;
udf
->
scriptType
=
pFuncInfo
->
scriptType
;
udf
->
outputType
=
pFuncInfo
->
funcType
;
udf
->
outputLen
=
pFuncInfo
->
outputLen
;
udf
->
bufSize
=
pFuncInfo
->
bufSize
;
char
path
[
PATH_MAX
]
=
{
0
};
snprintf
(
path
,
sizeof
(
path
),
"%s/lib%s.so"
,
"/tmp"
,
pFuncInfo
->
name
);
TdFilePtr
file
=
taosOpenFile
(
path
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_READ
|
TD_FILE_TRUNC
|
TD_FILE_AUTO_DEL
);
// TODO check for failure of flush to disk
taosWriteFile
(
file
,
pFuncInfo
->
pCode
,
pFuncInfo
->
codeSize
);
taosCloseFile
(
&
file
);
strncpy
(
udf
->
path
,
path
,
strlen
(
path
));
tFreeSFuncInfo
(
pFuncInfo
);
taosArrayDestroy
(
retrieveRsp
.
pFuncInfos
);
msgInfo
->
code
=
0
;
}
_return:
rpcFreeCont
(
pMsg
->
pCont
);
uv_sem_post
(
&
msgInfo
->
resultSem
);
return
;
}
int32_t
udfdFillUdfInfoFromMNode
(
void
*
clientRpc
,
char
*
udfName
,
SUdf
*
udf
)
{
SRetrieveFuncReq
retrieveReq
=
{
0
};
retrieveReq
.
numOfFuncs
=
1
;
retrieveReq
.
pFuncNames
=
taosArrayInit
(
1
,
TSDB_FUNC_NAME_LEN
);
taosArrayPush
(
retrieveReq
.
pFuncNames
,
udfName
);
int32_t
contLen
=
tSerializeSRetrieveFuncReq
(
NULL
,
0
,
&
retrieveReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSRetrieveFuncReq
(
pReq
,
contLen
,
&
retrieveReq
);
taosArrayDestroy
(
retrieveReq
.
pFuncNames
);
SUdfdRpcSendRecvInfo
*
msgInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SUdfdRpcSendRecvInfo
));
msgInfo
->
rpcType
=
UDFD_RPC_RETRIVE_FUNC
;
msgInfo
->
param
=
udf
;
uv_sem_init
(
&
msgInfo
->
resultSem
,
0
);
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
pReq
;
rpcMsg
.
contLen
=
contLen
;
rpcMsg
.
msgType
=
TDMT_MND_RETRIEVE_FUNC
;
rpcMsg
.
ahandle
=
msgInfo
;
rpcSendRequest
(
clientRpc
,
&
global
.
mgmtEp
.
epSet
,
&
rpcMsg
,
NULL
);
uv_sem_wait
(
&
msgInfo
->
resultSem
);
uv_sem_destroy
(
&
msgInfo
->
resultSem
);
int32_t
code
=
msgInfo
->
code
;
taosMemoryFree
(
msgInfo
);
return
code
;
}
int32_t
udfdConnectToMnode
()
{
SConnectReq
connReq
=
{
0
};
connReq
.
connType
=
CONN_TYPE__UDFD
;
tstrncpy
(
connReq
.
app
,
"udfd"
,
sizeof
(
connReq
.
app
));
tstrncpy
(
connReq
.
user
,
TSDB_DEFAULT_USER
,
sizeof
(
connReq
.
user
));
char
pass
[
TSDB_PASSWORD_LEN
+
1
]
=
{
0
};
taosEncryptPass_c
((
uint8_t
*
)(
TSDB_DEFAULT_PASS
),
strlen
(
TSDB_DEFAULT_PASS
),
pass
);
tstrncpy
(
connReq
.
passwd
,
pass
,
sizeof
(
connReq
.
passwd
));
connReq
.
pid
=
htonl
(
taosGetPId
());
connReq
.
startTime
=
htobe64
(
taosGetTimestampMs
());
int32_t
contLen
=
tSerializeSConnectReq
(
NULL
,
0
,
&
connReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSConnectReq
(
pReq
,
contLen
,
&
connReq
);
SUdfdRpcSendRecvInfo
*
msgInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SUdfdRpcSendRecvInfo
));
msgInfo
->
rpcType
=
UDFD_RPC_MNODE_CONNECT
;
uv_sem_init
(
&
msgInfo
->
resultSem
,
0
);
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
msgType
=
TDMT_MND_CONNECT
;
rpcMsg
.
pCont
=
pReq
;
rpcMsg
.
contLen
=
contLen
;
rpcMsg
.
ahandle
=
msgInfo
;
rpcSendRequest
(
global
.
clientRpc
,
&
global
.
mgmtEp
.
epSet
,
&
rpcMsg
,
NULL
);
uv_sem_wait
(
&
msgInfo
->
resultSem
);
int32_t
code
=
msgInfo
->
code
;
uv_sem_destroy
(
&
msgInfo
->
resultSem
);
taosMemoryFree
(
msgInfo
);
return
code
;
}
int32_t
udfdLoadUdf
(
char
*
udfName
,
SUdf
*
udf
)
{
strcpy
(
udf
->
name
,
udfName
);
int32_t
err
=
0
;
err
=
udfdFillUdfInfoFromMNode
(
global
.
clientRpc
,
udf
->
name
,
udf
);
if
(
err
!=
0
)
{
fnError
(
"can not retrieve udf from mnode. udf name %s"
,
udfName
);
...
...
@@ -515,140 +646,6 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
uv_stop
(
global
.
loop
);
}
typedef
enum
EUdfdRpcReqRspType
{
UDFD_RPC_MNODE_CONNECT
=
0
,
UDFD_RPC_RETRIVE_FUNC
,
}
EUdfdRpcReqRspType
;
typedef
struct
SUdfdRpcSendRecvInfo
{
EUdfdRpcReqRspType
rpcType
;
int32_t
code
;
void
*
param
;
uv_sem_t
resultSem
;
}
SUdfdRpcSendRecvInfo
;
void
udfdProcessRpcRsp
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SUdfdRpcSendRecvInfo
*
msgInfo
=
(
SUdfdRpcSendRecvInfo
*
)
pMsg
->
ahandle
;
ASSERT
(
pMsg
->
ahandle
!=
NULL
);
if
(
pEpSet
)
{
if
(
!
isEpsetEqual
(
&
global
.
mgmtEp
.
epSet
,
pEpSet
))
{
updateEpSet_s
(
&
global
.
mgmtEp
,
pEpSet
);
}
}
if
(
pMsg
->
code
!=
TSDB_CODE_SUCCESS
)
{
fnError
(
"udfd rpc error. code: %s"
,
tstrerror
(
pMsg
->
code
));
msgInfo
->
code
=
pMsg
->
code
;
goto
_return
;
}
if
(
msgInfo
->
rpcType
==
UDFD_RPC_MNODE_CONNECT
)
{
SConnectRsp
connectRsp
=
{
0
};
tDeserializeSConnectRsp
(
pMsg
->
pCont
,
pMsg
->
contLen
,
&
connectRsp
);
if
(
connectRsp
.
epSet
.
numOfEps
==
0
)
{
msgInfo
->
code
=
TSDB_CODE_MND_APP_ERROR
;
goto
_return
;
}
if
(
connectRsp
.
dnodeNum
>
1
&&
!
isEpsetEqual
(
&
global
.
mgmtEp
.
epSet
,
&
connectRsp
.
epSet
))
{
updateEpSet_s
(
&
global
.
mgmtEp
,
&
connectRsp
.
epSet
);
}
msgInfo
->
code
=
0
;
}
else
if
(
msgInfo
->
rpcType
==
UDFD_RPC_RETRIVE_FUNC
)
{
SRetrieveFuncRsp
retrieveRsp
=
{
0
};
tDeserializeSRetrieveFuncRsp
(
pMsg
->
pCont
,
pMsg
->
contLen
,
&
retrieveRsp
);
SFuncInfo
*
pFuncInfo
=
(
SFuncInfo
*
)
taosArrayGet
(
retrieveRsp
.
pFuncInfos
,
0
);
SUdf
*
udf
=
msgInfo
->
param
;
udf
->
funcType
=
pFuncInfo
->
funcType
;
udf
->
scriptType
=
pFuncInfo
->
scriptType
;
udf
->
outputType
=
pFuncInfo
->
funcType
;
udf
->
outputLen
=
pFuncInfo
->
outputLen
;
udf
->
bufSize
=
pFuncInfo
->
bufSize
;
char
path
[
PATH_MAX
]
=
{
0
};
snprintf
(
path
,
sizeof
(
path
),
"%s/lib%s.so"
,
"/tmp"
,
pFuncInfo
->
name
);
TdFilePtr
file
=
taosOpenFile
(
path
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_READ
|
TD_FILE_TRUNC
|
TD_FILE_AUTO_DEL
);
// TODO check for failure of flush to disk
taosWriteFile
(
file
,
pFuncInfo
->
pCode
,
pFuncInfo
->
codeSize
);
taosCloseFile
(
&
file
);
strncpy
(
udf
->
path
,
path
,
strlen
(
path
));
tFreeSFuncInfo
(
pFuncInfo
);
taosArrayDestroy
(
retrieveRsp
.
pFuncInfos
);
msgInfo
->
code
=
0
;
}
_return:
rpcFreeCont
(
pMsg
->
pCont
);
uv_sem_post
(
&
msgInfo
->
resultSem
);
return
;
}
int32_t
udfdConnectToMNode
()
{
SConnectReq
connReq
=
{
0
};
connReq
.
connType
=
CONN_TYPE__UDFD
;
tstrncpy
(
connReq
.
app
,
"udfd"
,
sizeof
(
connReq
.
app
));
tstrncpy
(
connReq
.
user
,
TSDB_DEFAULT_USER
,
sizeof
(
connReq
.
user
));
char
pass
[
TSDB_PASSWORD_LEN
+
1
]
=
{
0
};
taosEncryptPass_c
((
uint8_t
*
)(
TSDB_DEFAULT_PASS
),
strlen
(
TSDB_DEFAULT_PASS
),
pass
);
tstrncpy
(
connReq
.
passwd
,
pass
,
sizeof
(
connReq
.
passwd
));
connReq
.
pid
=
htonl
(
taosGetPId
());
connReq
.
startTime
=
htobe64
(
taosGetTimestampMs
());
int32_t
contLen
=
tSerializeSConnectReq
(
NULL
,
0
,
&
connReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSConnectReq
(
pReq
,
contLen
,
&
connReq
);
SUdfdRpcSendRecvInfo
*
msgInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SUdfdRpcSendRecvInfo
));
msgInfo
->
rpcType
=
UDFD_RPC_MNODE_CONNECT
;
uv_sem_init
(
&
msgInfo
->
resultSem
,
0
);
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
msgType
=
TDMT_MND_CONNECT
;
rpcMsg
.
pCont
=
pReq
;
rpcMsg
.
contLen
=
contLen
;
rpcMsg
.
ahandle
=
msgInfo
;
rpcSendRequest
(
global
.
clientRpc
,
&
global
.
mgmtEp
.
epSet
,
&
rpcMsg
,
NULL
);
uv_sem_wait
(
&
msgInfo
->
resultSem
);
int32_t
code
=
msgInfo
->
code
;
uv_sem_destroy
(
&
msgInfo
->
resultSem
);
taosMemoryFree
(
msgInfo
);
return
code
;
}
int32_t
udfdFillUdfInfoFromMNode
(
void
*
clientRpc
,
char
*
udfName
,
SUdf
*
udf
)
{
SRetrieveFuncReq
retrieveReq
=
{
0
};
retrieveReq
.
numOfFuncs
=
1
;
retrieveReq
.
pFuncNames
=
taosArrayInit
(
1
,
TSDB_FUNC_NAME_LEN
);
taosArrayPush
(
retrieveReq
.
pFuncNames
,
udfName
);
int32_t
contLen
=
tSerializeSRetrieveFuncReq
(
NULL
,
0
,
&
retrieveReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSRetrieveFuncReq
(
pReq
,
contLen
,
&
retrieveReq
);
taosArrayDestroy
(
retrieveReq
.
pFuncNames
);
SUdfdRpcSendRecvInfo
*
msgInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SUdfdRpcSendRecvInfo
));
msgInfo
->
rpcType
=
UDFD_RPC_RETRIVE_FUNC
;
msgInfo
->
param
=
udf
;
uv_sem_init
(
&
msgInfo
->
resultSem
,
0
);
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
pReq
;
rpcMsg
.
contLen
=
contLen
;
rpcMsg
.
msgType
=
TDMT_MND_RETRIEVE_FUNC
;
rpcMsg
.
ahandle
=
msgInfo
;
rpcSendRequest
(
clientRpc
,
&
global
.
mgmtEp
.
epSet
,
&
rpcMsg
,
NULL
);
uv_sem_wait
(
&
msgInfo
->
resultSem
);
uv_sem_destroy
(
&
msgInfo
->
resultSem
);
int32_t
code
=
msgInfo
->
code
;
taosMemoryFree
(
msgInfo
);
return
code
;
}
static
bool
udfdRpcRfp
(
int32_t
code
)
{
if
(
code
==
TSDB_CODE_RPC_REDIRECT
)
{
return
true
;
...
...
@@ -884,7 +881,18 @@ int main(int argc, char *argv[]) {
return
-
3
;
}
if
(
udfdConnectToMNode
()
!=
0
)
{
int32_t
retryMnodeTimes
=
0
;
int32_t
code
=
0
;
while
(
retryMnodeTimes
++
<
TSDB_MAX_REPLICA
)
{
uv_sleep
(
500
*
(
1
<<
retryMnodeTimes
));
code
=
udfdConnectToMnode
();
if
(
code
==
0
)
{
break
;
}
fnError
(
"can not connect to mnode, code: %s. retry"
,
tstrerror
(
code
));
}
if
(
code
!=
0
)
{
fnError
(
"failed to start since can not connect to mnode"
);
return
-
4
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录