Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
190fbe84
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
190fbe84
编写于
12月 12, 2022
作者:
S
shenglian zhou
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: udfd pipe can close before sending response
上级
cf6508c7
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
72 addition
and
43 deletion
+72
-43
source/libs/function/src/udfd.c
source/libs/function/src/udfd.c
+72
-43
未找到文件。
source/libs/function/src/udfd.c
浏览文件 @
190fbe84
...
...
@@ -28,39 +28,46 @@
#include "tmsg.h"
#include "trpc.h"
#include "tmisce.h"
// clang-for
am
t on
// clang-for
ma
t on
typedef
struct
SUdfdContext
{
uv_loop_t
*
loop
;
uv_loop_t
*
loop
;
uv_pipe_t
ctrlPipe
;
uv_signal_t
intrSignal
;
char
listenPipeName
[
PATH_MAX
+
UDF_LISTEN_PIPE_NAME_LEN
+
2
];
uv_pipe_t
listeningPipe
;
void
*
clientRpc
;
void
*
clientRpc
;
SCorEpSet
mgmtEp
;
uv_mutex_t
udfsMutex
;
SHashObj
*
udfsHash
;
SHashObj
*
udfsHash
;
SArray
*
residentFuncs
;
SArray
*
residentFuncs
;
bool
printVersion
;
}
SUdfdContext
;
SUdfdContext
global
;
struct
SUdfdUvConn
;
struct
SUvUdfWork
;
typedef
struct
SUdfdUvConn
{
uv_stream_t
*
client
;
char
*
inputBuf
;
char
*
inputBuf
;
int32_t
inputLen
;
int32_t
inputCap
;
int32_t
inputTotal
;
struct
SUvUdfWork
*
pWorkList
;
// head of work list
}
SUdfdUvConn
;
typedef
struct
SUvUdfWork
{
uv_stream_t
*
client
;
SUdfdUvConn
*
conn
;
uv_buf_t
input
;
uv_buf_t
output
;
struct
SUvUdfWork
*
pWorkNext
;
}
SUvUdfWork
;
typedef
enum
{
UDF_STATE_INIT
=
0
,
UDF_STATE_LOADING
,
UDF_STATE_READY
,
UDF_STATE_UNLOADING
}
EUdfState
;
...
...
@@ -70,7 +77,7 @@ typedef struct SUdf {
EUdfState
state
;
uv_mutex_t
lock
;
uv_cond_t
condReady
;
bool
resident
;
bool
resident
;
char
name
[
TSDB_FUNC_NAME_LEN
+
1
];
int8_t
funcType
;
...
...
@@ -107,7 +114,7 @@ typedef enum EUdfdRpcReqRspType {
typedef
struct
SUdfdRpcSendRecvInfo
{
EUdfdRpcReqRspType
rpcType
;
int32_t
code
;
void
*
param
;
void
*
param
;
uv_sem_t
resultSem
;
}
SUdfdRpcSendRecvInfo
;
...
...
@@ -178,7 +185,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
fnInfo
(
"setup request. seq num: %"
PRId64
", udf name: %s"
,
request
->
seqNum
,
request
->
setup
.
udfName
);
SUdfSetupRequest
*
setup
=
&
request
->
setup
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SUdf
*
udf
=
NULL
;
SUdf
*
udf
=
NULL
;
uv_mutex_lock
(
&
global
.
udfsMutex
);
SUdf
**
udfInHash
=
taosHashGet
(
global
.
udfsHash
,
request
->
setup
.
udfName
,
strlen
(
request
->
setup
.
udfName
));
if
(
udfInHash
)
{
...
...
@@ -193,7 +200,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
uv_cond_init
(
&
udfNew
->
condReady
);
udf
=
udfNew
;
SUdf
**
pUdf
=
&
udf
;
SUdf
**
pUdf
=
&
udf
;
taosHashPut
(
global
.
udfsHash
,
request
->
setup
.
udfName
,
strlen
(
request
->
setup
.
udfName
),
pUdf
,
POINTER_BYTES
);
uv_mutex_unlock
(
&
global
.
udfsMutex
);
}
...
...
@@ -207,7 +214,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
}
udf
->
resident
=
false
;
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
global
.
residentFuncs
);
++
i
)
{
char
*
funcName
=
taosArrayGet
(
global
.
residentFuncs
,
i
);
char
*
funcName
=
taosArrayGet
(
global
.
residentFuncs
,
i
);
if
(
strcmp
(
setup
->
udfName
,
funcName
)
==
0
)
{
udf
->
resident
=
true
;
break
;
...
...
@@ -248,11 +255,12 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
void
udfdProcessCallRequest
(
SUvUdfWork
*
uvUdf
,
SUdfRequest
*
request
)
{
SUdfCallRequest
*
call
=
&
request
->
call
;
fnDebug
(
"call request. call type %d, handle: %"
PRIx64
", seq num %"
PRId64
,
call
->
callType
,
call
->
udfHandle
,
request
->
seqNum
);
SUdfcFuncHandle
*
handle
=
(
SUdfcFuncHandle
*
)(
call
->
udfHandle
);
SUdf
*
udf
=
handle
->
udf
;
fnDebug
(
"call request. call type %d, handle: %"
PRIx64
", seq num %"
PRId64
,
call
->
callType
,
call
->
udfHandle
,
request
->
seqNum
);
SUdfcFuncHandle
*
handle
=
(
SUdfcFuncHandle
*
)(
call
->
udfHandle
);
SUdf
*
udf
=
handle
->
udf
;
SUdfResponse
response
=
{
0
};
SUdfResponse
*
rsp
=
&
response
;
SUdfResponse
*
rsp
=
&
response
;
SUdfCallResponse
*
subRsp
=
&
rsp
->
callRsp
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -352,7 +360,7 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdfTeardownRequest
*
teardown
=
&
request
->
teardown
;
fnInfo
(
"teardown. seq number: %"
PRId64
", handle:%"
PRIx64
,
request
->
seqNum
,
teardown
->
udfHandle
);
SUdfcFuncHandle
*
handle
=
(
SUdfcFuncHandle
*
)(
teardown
->
udfHandle
);
SUdf
*
udf
=
handle
->
udf
;
SUdf
*
udf
=
handle
->
udf
;
bool
unloadUdf
=
false
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -409,15 +417,14 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
if
(
msgInfo
->
rpcType
==
UDFD_RPC_MNODE_CONNECT
)
{
SConnectRsp
connectRsp
=
{
0
};
tDeserializeSConnectRsp
(
pMsg
->
pCont
,
pMsg
->
contLen
,
&
connectRsp
);
int32_t
now
=
taosGetTimestampSec
();
int32_t
delta
=
abs
(
now
-
connectRsp
.
svrTimestamp
);
if
(
delta
>
900
)
{
msgInfo
->
code
=
TSDB_CODE_TIME_UNSYNCED
;
goto
_return
;
}
if
(
connectRsp
.
epSet
.
numOfEps
==
0
)
{
msgInfo
->
code
=
TSDB_CODE_APP_ERROR
;
goto
_return
;
...
...
@@ -434,7 +441,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
goto
_return
;
}
SFuncInfo
*
pFuncInfo
=
(
SFuncInfo
*
)
taosArrayGet
(
retrieveRsp
.
pFuncInfos
,
0
);
SUdf
*
udf
=
msgInfo
->
param
;
SUdf
*
udf
=
msgInfo
->
param
;
udf
->
funcType
=
pFuncInfo
->
funcType
;
udf
->
scriptType
=
pFuncInfo
->
scriptType
;
udf
->
outputType
=
pFuncInfo
->
outputType
;
...
...
@@ -487,7 +494,7 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
taosArrayPush
(
retrieveReq
.
pFuncNames
,
udfName
);
int32_t
contLen
=
tSerializeSRetrieveFuncReq
(
NULL
,
0
,
&
retrieveReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSRetrieveFuncReq
(
pReq
,
contLen
,
&
retrieveReq
);
taosArrayDestroy
(
retrieveReq
.
pFuncNames
);
...
...
@@ -522,7 +529,7 @@ int32_t udfdConnectToMnode() {
connReq
.
startTime
=
taosGetTimestampMs
();
int32_t
contLen
=
tSerializeSConnectReq
(
NULL
,
0
,
&
connReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSConnectReq
(
pReq
,
contLen
,
&
connReq
);
SUdfdRpcSendRecvInfo
*
msgInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SUdfdRpcSendRecvInfo
));
...
...
@@ -589,7 +596,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
strncpy
(
finishFuncName
,
processFuncName
,
sizeof
(
finishFuncName
));
strncat
(
finishFuncName
,
finishSuffix
,
strlen
(
finishSuffix
));
uv_dlsym
(
&
udf
->
lib
,
finishFuncName
,
(
void
**
)(
&
udf
->
aggFinishFunc
));
char
mergeFuncName
[
TSDB_FUNC_NAME_LEN
+
6
]
=
{
0
};
char
mergeFuncName
[
TSDB_FUNC_NAME_LEN
+
6
]
=
{
0
};
char
*
mergeSuffix
=
"_merge"
;
strncpy
(
mergeFuncName
,
processFuncName
,
sizeof
(
mergeFuncName
));
strncat
(
mergeFuncName
,
mergeSuffix
,
strlen
(
mergeSuffix
));
...
...
@@ -601,9 +608,10 @@ static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
if
(
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
||
code
==
TSDB_CODE_RPC_BROKEN_LINK
||
code
==
TSDB_CODE_SYN_NOT_LEADER
||
code
==
TSDB_CODE_SYN_RESTORING
||
code
==
TSDB_CODE_MNODE_NOT_FOUND
||
code
==
TSDB_CODE_APP_IS_STARTING
||
code
==
TSDB_CODE_APP_IS_STOPPING
)
{
if
(
msgType
==
TDMT_SCH_QUERY
||
msgType
==
TDMT_SCH_MERGE_QUERY
||
msgType
==
TDMT_SCH_FETCH
||
msgType
==
TDMT_SCH_MERGE_FETCH
)
{
if
(
msgType
==
TDMT_SCH_QUERY
||
msgType
==
TDMT_SCH_MERGE_QUERY
||
msgType
==
TDMT_SCH_FETCH
||
msgType
==
TDMT_SCH_MERGE_FETCH
)
{
return
false
;
}
}
return
true
;
}
else
{
return
false
;
...
...
@@ -663,7 +671,7 @@ int32_t udfdOpenClientRpc() {
rpcInit
.
parent
=
&
global
;
rpcInit
.
rfp
=
udfdRpcRfp
;
rpcInit
.
compressSize
=
tsCompressMsgSize
;
global
.
clientRpc
=
rpcOpen
(
&
rpcInit
);
if
(
global
.
clientRpc
==
NULL
)
{
fnError
(
"failed to init dnode rpc client"
);
...
...
@@ -684,6 +692,17 @@ void udfdOnWrite(uv_write_t *req, int status) {
if
(
status
<
0
)
{
fnError
(
"udfd send response error, length: %zu code: %s"
,
work
->
output
.
len
,
uv_err_name
(
status
));
}
// remove work from the connection work list
if
(
work
->
conn
!=
NULL
)
{
SUvUdfWork
**
ppWork
;
for
(
ppWork
=
&
work
->
conn
->
pWorkList
;
*
ppWork
&&
(
*
ppWork
!=
work
);
ppWork
=
&
((
*
ppWork
)
->
pWorkNext
))
{
}
if
(
*
ppWork
==
work
)
{
*
ppWork
=
work
->
pWorkNext
;
}
else
{
fnError
(
"work not in conn any more"
);
}
}
taosMemoryFree
(
work
->
output
.
base
);
taosMemoryFree
(
work
);
taosMemoryFree
(
req
);
...
...
@@ -692,10 +711,11 @@ void udfdOnWrite(uv_write_t *req, int status) {
void
udfdSendResponse
(
uv_work_t
*
work
,
int
status
)
{
SUvUdfWork
*
udfWork
=
(
SUvUdfWork
*
)(
work
->
data
);
uv_write_t
*
write_req
=
taosMemoryMalloc
(
sizeof
(
uv_write_t
));
write_req
->
data
=
udfWork
;
uv_write
(
write_req
,
udfWork
->
client
,
&
udfWork
->
output
,
1
,
udfdOnWrite
);
if
(
udfWork
->
conn
!=
NULL
)
{
uv_write_t
*
write_req
=
taosMemoryMalloc
(
sizeof
(
uv_write_t
));
write_req
->
data
=
udfWork
;
uv_write
(
write_req
,
udfWork
->
conn
->
client
,
&
udfWork
->
output
,
1
,
udfdOnWrite
);
}
taosMemoryFree
(
work
);
}
...
...
@@ -716,8 +736,8 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
buf
->
len
=
0
;
}
}
else
if
(
ctx
->
inputTotal
==
-
1
&&
ctx
->
inputLen
<
msgHeadSize
)
{
buf
->
base
=
ctx
->
inputBuf
+
ctx
->
inputLen
;
buf
->
len
=
msgHeadSize
-
ctx
->
inputLen
;
buf
->
base
=
ctx
->
inputBuf
+
ctx
->
inputLen
;
buf
->
len
=
msgHeadSize
-
ctx
->
inputLen
;
}
else
{
ctx
->
inputCap
=
ctx
->
inputTotal
>
ctx
->
inputCap
?
ctx
->
inputTotal
:
ctx
->
inputCap
;
void
*
inputBuf
=
taosMemoryRealloc
(
ctx
->
inputBuf
,
ctx
->
inputCap
);
...
...
@@ -744,14 +764,16 @@ bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
}
void
udfdHandleRequest
(
SUdfdUvConn
*
conn
)
{
char
*
inputBuf
=
taosMemoryMalloc
(
conn
->
inputLen
);
char
*
inputBuf
=
taosMemoryMalloc
(
conn
->
inputLen
);
memcpy
(
inputBuf
,
conn
->
inputBuf
,
conn
->
inputLen
);
int32_t
inputLen
=
conn
->
inputLen
;
taosMemoryFree
(
conn
->
inputBuf
);
uv_work_t
*
work
=
taosMemoryMalloc
(
sizeof
(
uv_work_t
));
uv_work_t
*
work
=
taosMemoryMalloc
(
sizeof
(
uv_work_t
));
SUvUdfWork
*
udfWork
=
taosMemoryMalloc
(
sizeof
(
SUvUdfWork
));
udfWork
->
client
=
conn
->
client
;
udfWork
->
conn
=
conn
;
udfWork
->
pWorkNext
=
conn
->
pWorkList
;
conn
->
pWorkList
=
udfWork
;
udfWork
->
input
=
uv_buf_init
(
inputBuf
,
inputLen
);
conn
->
inputBuf
=
NULL
;
conn
->
inputLen
=
0
;
...
...
@@ -763,6 +785,12 @@ void udfdHandleRequest(SUdfdUvConn *conn) {
void
udfdPipeCloseCb
(
uv_handle_t
*
pipe
)
{
SUdfdUvConn
*
conn
=
pipe
->
data
;
SUvUdfWork
*
pWork
=
conn
->
pWorkList
;
while
(
pWork
!=
NULL
)
{
pWork
->
conn
=
NULL
;
pWork
=
pWork
->
pWorkNext
;
}
taosMemoryFree
(
conn
->
client
);
taosMemoryFree
(
conn
->
inputBuf
);
taosMemoryFree
(
conn
);
...
...
@@ -804,6 +832,7 @@ void udfdOnNewConnection(uv_stream_t *server, int status) {
uv_pipe_init
(
global
.
loop
,
client
,
0
);
if
(
uv_accept
(
server
,
(
uv_stream_t
*
)
client
)
==
0
)
{
SUdfdUvConn
*
ctx
=
taosMemoryMalloc
(
sizeof
(
SUdfdUvConn
));
ctx
->
pWorkList
=
NULL
;
ctx
->
client
=
(
uv_stream_t
*
)
client
;
ctx
->
inputBuf
=
0
;
ctx
->
inputLen
=
0
;
...
...
@@ -896,7 +925,7 @@ static int32_t udfdUvInit() {
}
global
.
loop
=
loop
;
if
(
tsStartUdfd
)
{
// udfd is started by taosd, which shall exit when taosd exit
if
(
tsStartUdfd
)
{
// udfd is started by taosd, which shall exit when taosd exit
uv_pipe_init
(
global
.
loop
,
&
global
.
ctrlPipe
,
1
);
uv_pipe_open
(
&
global
.
ctrlPipe
,
0
);
uv_read_start
((
uv_stream_t
*
)
&
global
.
ctrlPipe
,
udfdCtrlAllocBufCb
,
udfdCtrlReadCb
);
...
...
@@ -971,10 +1000,10 @@ int32_t udfdInitResidentFuncs() {
}
global
.
residentFuncs
=
taosArrayInit
(
2
,
TSDB_FUNC_NAME_LEN
);
char
*
pSave
=
tsUdfdResFuncs
;
char
*
token
;
char
*
pSave
=
tsUdfdResFuncs
;
char
*
token
;
while
((
token
=
strtok_r
(
pSave
,
","
,
&
pSave
))
!=
NULL
)
{
char
func
[
TSDB_FUNC_NAME_LEN
+
1
]
=
{
0
};
char
func
[
TSDB_FUNC_NAME_LEN
+
1
]
=
{
0
};
strncpy
(
func
,
token
,
TSDB_FUNC_NAME_LEN
);
fnInfo
(
"udfd add resident function %s"
,
func
);
taosArrayPush
(
global
.
residentFuncs
,
func
);
...
...
@@ -985,10 +1014,10 @@ int32_t udfdInitResidentFuncs() {
int32_t
udfdDeinitResidentFuncs
()
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
global
.
residentFuncs
);
++
i
)
{
char
*
funcName
=
taosArrayGet
(
global
.
residentFuncs
,
i
);
SUdf
**
udfInHash
=
taosHashGet
(
global
.
udfsHash
,
funcName
,
strlen
(
funcName
));
char
*
funcName
=
taosArrayGet
(
global
.
residentFuncs
,
i
);
SUdf
**
udfInHash
=
taosHashGet
(
global
.
udfsHash
,
funcName
,
strlen
(
funcName
));
if
(
udfInHash
)
{
SUdf
*
udf
=
*
udfInHash
;
SUdf
*
udf
=
*
udfInHash
;
if
(
udf
->
destroyFunc
)
{
(
udf
->
destroyFunc
)();
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录