Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
5a1f2a9c
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
5a1f2a9c
编写于
10月 20, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
10月 20, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #17504 from taosdata/szhou/fixbugs
fix: fix scan coverity issue
上级
91302965
0ed1d883
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
27 addition
and
17 deletion
+27
-17
source/libs/function/inc/tudfInt.h
source/libs/function/inc/tudfInt.h
+1
-1
source/libs/function/src/tudf.c
source/libs/function/src/tudf.c
+6
-5
source/libs/function/src/udfd.c
source/libs/function/src/udfd.c
+15
-9
source/libs/function/test/runUdf.c
source/libs/function/test/runUdf.c
+5
-2
未找到文件。
source/libs/function/inc/tudfInt.h
浏览文件 @
5a1f2a9c
...
...
@@ -35,7 +35,7 @@ enum {
};
typedef
struct
SUdfSetupRequest
{
char
udfName
[
TSDB_FUNC_NAME_LEN
];
char
udfName
[
TSDB_FUNC_NAME_LEN
+
1
];
}
SUdfSetupRequest
;
typedef
struct
SUdfSetupResponse
{
...
...
source/libs/function/src/tudf.c
浏览文件 @
5a1f2a9c
...
...
@@ -315,7 +315,7 @@ enum { UV_TASK_CONNECT = 0, UV_TASK_REQ_RSP = 1, UV_TASK_DISCONNECT = 2 };
int64_t
gUdfTaskSeqNum
=
0
;
typedef
struct
SUdfcFuncStub
{
char
udfName
[
TSDB_FUNC_NAME_LEN
];
char
udfName
[
TSDB_FUNC_NAME_LEN
+
1
];
UdfcFuncHandle
handle
;
int32_t
refCount
;
int64_t
lastRefTime
;
...
...
@@ -353,7 +353,7 @@ typedef struct SUdfcUvSession {
int32_t
outputLen
;
int32_t
bufSize
;
char
udfName
[
TSDB_FUNC_NAME_LEN
];
char
udfName
[
TSDB_FUNC_NAME_LEN
+
1
];
}
SUdfcUvSession
;
typedef
struct
SClientUvTaskNode
{
...
...
@@ -898,7 +898,7 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
int32_t
code
=
0
;
uv_mutex_lock
(
&
gUdfdProxy
.
udfStubsMutex
);
SUdfcFuncStub
key
=
{
0
};
str
cpy
(
key
.
udfName
,
udfName
);
str
ncpy
(
key
.
udfName
,
udfName
,
TSDB_FUNC_NAME_LEN
);
int32_t
stubIndex
=
taosArraySearchIdx
(
gUdfdProxy
.
udfStubs
,
&
key
,
compareUdfcFuncSub
,
TD_EQ
);
if
(
stubIndex
!=
-
1
)
{
SUdfcFuncStub
*
foundStub
=
taosArrayGet
(
gUdfdProxy
.
udfStubs
,
stubIndex
);
...
...
@@ -936,7 +936,7 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
void
releaseUdfFuncHandle
(
char
*
udfName
)
{
uv_mutex_lock
(
&
gUdfdProxy
.
udfStubsMutex
);
SUdfcFuncStub
key
=
{
0
};
str
cpy
(
key
.
udfName
,
udfName
);
str
ncpy
(
key
.
udfName
,
udfName
,
TSDB_FUNC_NAME_LEN
);
SUdfcFuncStub
*
foundStub
=
taosArraySearch
(
gUdfdProxy
.
udfStubs
,
&
key
,
compareUdfcFuncSub
,
TD_EQ
);
if
(
!
foundStub
)
{
uv_mutex_unlock
(
&
gUdfdProxy
.
udfStubsMutex
);
...
...
@@ -1446,6 +1446,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
QUEUE_INSERT_TAIL
(
connTaskQueue
,
&
uvTask
->
connTaskQueue
);
int
err
=
uv_write
(
write
,
(
uv_stream_t
*
)
pipe
,
&
uvTask
->
reqBuf
,
1
,
onUdfcPipeWrite
);
if
(
err
!=
0
)
{
taosMemoryFree
(
write
);
fnError
(
"udfc event loop start req_rsp task uv_write failed. uvtask: %p, code: %s"
,
uvTask
,
uv_strerror
(
err
));
}
code
=
err
;
...
...
@@ -1637,7 +1638,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
task
->
session
->
outputType
=
rsp
->
outputType
;
task
->
session
->
outputLen
=
rsp
->
outputLen
;
task
->
session
->
bufSize
=
rsp
->
bufSize
;
str
cpy
(
task
->
session
->
udfName
,
udfName
);
str
ncpy
(
task
->
session
->
udfName
,
udfName
,
TSDB_FUNC_NAME_LEN
);
if
(
task
->
errCode
!=
0
)
{
fnError
(
"failed to setup udf. udfname: %s, err: %d"
,
udfName
,
task
->
errCode
)
}
else
{
...
...
source/libs/function/src/udfd.c
浏览文件 @
5a1f2a9c
...
...
@@ -71,7 +71,7 @@ typedef struct SUdf {
uv_cond_t
condReady
;
bool
resident
;
char
name
[
TSDB_FUNC_NAME_LEN
];
char
name
[
TSDB_FUNC_NAME_LEN
+
1
];
int8_t
funcType
;
int8_t
scriptType
;
int8_t
outputType
;
...
...
@@ -188,11 +188,12 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdf
*
udfNew
=
taosMemoryCalloc
(
1
,
sizeof
(
SUdf
));
udfNew
->
refCount
=
1
;
udfNew
->
state
=
UDF_STATE_INIT
;
uv_mutex_init
(
&
udfNew
->
lock
);
uv_cond_init
(
&
udfNew
->
condReady
);
udf
=
udfNew
;
taosHashPut
(
global
.
udfsHash
,
request
->
setup
.
udfName
,
strlen
(
request
->
setup
.
udfName
),
&
udfNew
,
sizeof
(
&
udfNew
));
SUdf
**
pUdf
=
&
udf
;
taosHashPut
(
global
.
udfsHash
,
request
->
setup
.
udfName
,
strlen
(
request
->
setup
.
udfName
),
pUdf
,
POINTER_BYTES
);
uv_mutex_unlock
(
&
global
.
udfsMutex
);
}
...
...
@@ -246,7 +247,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
void
udfdProcessCallRequest
(
SUvUdfWork
*
uvUdf
,
SUdfRequest
*
request
)
{
SUdfCallRequest
*
call
=
&
request
->
call
;
fnDebug
(
"
%"
PRId64
"call request. call type %d, handle: %"
PRIx64
,
request
->
seqNum
,
call
->
callType
,
call
->
udfHandle
);
fnDebug
(
"
call request. call type %d, handle: %"
PRIx64
", seq num %"
PRId64
,
call
->
callType
,
call
->
udfHandle
,
request
->
seqNum
);
SUdfcFuncHandle
*
handle
=
(
SUdfcFuncHandle
*
)(
call
->
udfHandle
);
SUdf
*
udf
=
handle
->
udf
;
SUdfResponse
response
=
{
0
};
...
...
@@ -372,7 +373,7 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
}
taosMemoryFree
(
handle
);
SUdfResponse
response
;
SUdfResponse
response
=
{
0
}
;
SUdfResponse
*
rsp
=
&
response
;
rsp
->
seqNum
=
request
->
seqNum
;
rsp
->
type
=
request
->
type
;
...
...
@@ -428,7 +429,9 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
}
else
if
(
msgInfo
->
rpcType
==
UDFD_RPC_RETRIVE_FUNC
)
{
SRetrieveFuncRsp
retrieveRsp
=
{
0
};
tDeserializeSRetrieveFuncRsp
(
pMsg
->
pCont
,
pMsg
->
contLen
,
&
retrieveRsp
);
if
(
retrieveRsp
.
pFuncInfos
==
NULL
)
{
goto
_return
;
}
SFuncInfo
*
pFuncInfo
=
(
SFuncInfo
*
)
taosArrayGet
(
retrieveRsp
.
pFuncInfos
,
0
);
SUdf
*
udf
=
msgInfo
->
param
;
udf
->
funcType
=
pFuncInfo
->
funcType
;
...
...
@@ -540,7 +543,7 @@ int32_t udfdConnectToMnode() {
}
int32_t
udfdLoadUdf
(
char
*
udfName
,
SUdf
*
udf
)
{
str
cpy
(
udf
->
name
,
udfName
);
str
ncpy
(
udf
->
name
,
udfName
,
TSDB_FUNC_NAME_LEN
);
int32_t
err
=
0
;
err
=
udfdFillUdfInfoFromMNode
(
global
.
clientRpc
,
udf
->
name
,
udf
);
...
...
@@ -880,6 +883,8 @@ static int32_t udfdUvInit() {
uv_loop_t
*
loop
=
taosMemoryMalloc
(
sizeof
(
uv_loop_t
));
if
(
loop
)
{
uv_loop_init
(
loop
);
}
else
{
return
-
1
;
}
global
.
loop
=
loop
;
...
...
@@ -901,12 +906,12 @@ static int32_t udfdUvInit() {
if
((
r
=
uv_pipe_bind
(
&
global
.
listeningPipe
,
global
.
listenPipeName
)))
{
fnError
(
"Bind error %s"
,
uv_err_name
(
r
));
removeListeningPipe
();
return
-
1
;
return
-
2
;
}
if
((
r
=
uv_listen
((
uv_stream_t
*
)
&
global
.
listeningPipe
,
128
,
udfdOnNewConnection
)))
{
fnError
(
"Listen error %s"
,
uv_err_name
(
r
));
removeListeningPipe
();
return
-
2
;
return
-
3
;
}
return
0
;
}
...
...
@@ -962,6 +967,7 @@ int32_t udfdInitResidentFuncs() {
while
((
token
=
strtok_r
(
pSave
,
","
,
&
pSave
))
!=
NULL
)
{
char
func
[
TSDB_FUNC_NAME_LEN
+
1
]
=
{
0
};
strncpy
(
func
,
token
,
TSDB_FUNC_NAME_LEN
);
fnInfo
(
"udfd add resident function %s"
,
func
);
taosArrayPush
(
global
.
residentFuncs
,
func
);
}
...
...
source/libs/function/test/runUdf.c
浏览文件 @
5a1f2a9c
...
...
@@ -110,8 +110,11 @@ int aggregateFuncTest() {
taosArrayDestroy
(
pBlock
->
pDataBlock
);
doCallUdfAggFinalize
(
handle
,
&
newBuf
,
&
resultBuf
);
fprintf
(
stderr
,
"agg result: %f
\n
"
,
*
(
double
*
)
resultBuf
.
buf
);
if
(
resultBuf
.
buf
!=
NULL
)
{
fprintf
(
stderr
,
"agg result: %f
\n
"
,
*
(
double
*
)
resultBuf
.
buf
);
}
else
{
fprintf
(
stderr
,
"result buffer is null"
);
}
freeUdfInterBuf
(
&
buf
);
freeUdfInterBuf
(
&
newBuf
);
freeUdfInterBuf
(
&
resultBuf
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录