Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d537322e
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d537322e
编写于
4月 22, 2022
作者:
S
slzhou
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
udf scalar api change
上级
db2b9931
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
99 addition
and
65 deletion
+99
-65
source/libs/function/inc/tudf.h
source/libs/function/inc/tudf.h
+2
-2
source/libs/function/src/tudf.c
source/libs/function/src/tudf.c
+35
-4
source/libs/function/src/udfd.c
source/libs/function/src/udfd.c
+54
-54
source/libs/function/test/runUdf.c
source/libs/function/test/runUdf.c
+8
-5
未找到文件。
source/libs/function/inc/tudf.h
浏览文件 @
d537322e
...
...
@@ -21,6 +21,7 @@
#include <stdbool.h>
#include "tmsg.h"
#include "tcommon.h"
#include "function.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -118,8 +119,7 @@ int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfIn
int32_t
callUdfAggMerge
(
UdfcFuncHandle
handle
,
SUdfInterBuf
*
interBuf1
,
SUdfInterBuf
*
interBuf2
,
SUdfInterBuf
*
resultBuf
);
// input: block
// output: resultData
int32_t
callUdfScalaProcess
(
UdfcFuncHandle
handle
,
SSDataBlock
*
block
,
SSDataBlock
*
resultData
);
int32_t
callUdfScalarFunc
(
UdfcFuncHandle
handle
,
SScalarParam
*
input
,
int32_t
numOfCols
,
SScalarParam
*
output
);
/**
* tearn down udf
* @param handle
...
...
source/libs/function/src/tudf.c
浏览文件 @
d537322e
...
...
@@ -14,6 +14,7 @@
*/
#include "uv.h"
#include "os.h"
#include "fnLog.h"
#include "tudf.h"
#include "tudfInt.h"
#include "tarray.h"
...
...
@@ -557,6 +558,34 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
return
0
;
}
int32_t
convertScalarParamToDataBlock
(
SScalarParam
*
input
,
int32_t
numOfCols
,
SSDataBlock
*
output
)
{
output
->
info
.
rows
=
input
->
numOfRows
;
output
->
info
.
numOfCols
=
numOfCols
;
bool
hasVarCol
=
false
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
if
(
IS_VAR_DATA_TYPE
((
input
+
i
)
->
columnData
->
info
.
type
))
{
hasVarCol
=
true
;
break
;
}
}
output
->
info
.
hasVarCol
=
hasVarCol
;
//TODO: free the array output->pDataBlock
output
->
pDataBlock
=
taosArrayInit
(
numOfCols
,
sizeof
(
SColumnInfoData
));
taosArrayPush
(
output
->
pDataBlock
,
input
->
columnData
);
return
0
;
}
int32_t
convertDataBlockToScalarParm
(
SSDataBlock
*
input
,
SScalarParam
*
output
)
{
if
(
input
->
info
.
numOfCols
!=
1
)
{
fnError
(
"scalar function only support one column"
);
return
-
1
;
}
output
->
numOfRows
=
input
->
info
.
rows
;
//TODO: memory
output
->
columnData
=
taosArrayGet
(
input
->
pDataBlock
,
0
);
return
0
;
}
void
onUdfcPipeClose
(
uv_handle_t
*
handle
)
{
SClientUvConn
*
conn
=
handle
->
data
;
...
...
@@ -1108,11 +1137,13 @@ int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfIn
return
err
;
}
// input: block
// output: resultData
int32_t
callUdfScalaProcess
(
UdfcFuncHandle
handle
,
SSDataBlock
*
block
,
SSDataBlock
*
resultData
)
{
int32_t
callUdfScalarFunc
(
UdfcFuncHandle
handle
,
SScalarParam
*
input
,
int32_t
numOfCols
,
SScalarParam
*
output
)
{
int8_t
callType
=
TSDB_UDF_CALL_SCALA_PROC
;
int32_t
err
=
callUdf
(
handle
,
callType
,
block
,
NULL
,
NULL
,
resultData
,
NULL
);
SSDataBlock
inputBlock
=
{
0
};
convertScalarParamToDataBlock
(
input
,
numOfCols
,
&
inputBlock
);
SSDataBlock
resultBlock
=
{
0
};
int32_t
err
=
callUdf
(
handle
,
callType
,
&
inputBlock
,
NULL
,
NULL
,
&
resultBlock
,
NULL
);
convertDataBlockToScalarParm
(
&
resultBlock
,
output
);
return
err
;
}
...
...
source/libs/function/src/udfd.c
浏览文件 @
d537322e
...
...
@@ -26,15 +26,15 @@
#include "trpc.h"
typedef
struct
SUdfdContext
{
uv_loop_t
*
loop
;
uv_pipe_t
ctrlPipe
;
uv_loop_t
*
loop
;
uv_pipe_t
ctrlPipe
;
uv_signal_t
intrSignal
;
char
listenPipeName
[
UDF_LISTEN_PIPE_NAME_LEN
];
uv_pipe_t
listeningPipe
;
void
*
clientRpc
;
char
listenPipeName
[
UDF_LISTEN_PIPE_NAME_LEN
];
uv_pipe_t
listeningPipe
;
void
*
clientRpc
;
uv_mutex_t
udfsMutex
;
SHashObj
*
udfsHash
;
SHashObj
*
udfsHash
;
bool
printVersion
;
}
SUdfdContext
;
...
...
@@ -55,22 +55,17 @@ typedef struct SUvUdfWork {
uv_buf_t
output
;
}
SUvUdfWork
;
typedef
enum
{
UDF_STATE_INIT
=
0
,
UDF_STATE_LOADING
,
UDF_STATE_READY
,
UDF_STATE_UNLOADING
}
EUdfState
;
typedef
enum
{
UDF_STATE_INIT
=
0
,
UDF_STATE_LOADING
,
UDF_STATE_READY
,
UDF_STATE_UNLOADING
}
EUdfState
;
typedef
struct
SUdf
{
int32_t
refCount
;
EUdfState
state
;
int32_t
refCount
;
EUdfState
state
;
uv_mutex_t
lock
;
uv_cond_t
condReady
;
uv_cond_t
condReady
;
char
name
[
16
];
int8_t
type
;
char
path
[
PATH_MAX
];
char
path
[
PATH_MAX
];
uv_lib_t
lib
;
TUdfScalarProcFunc
scalarProcFunc
;
...
...
@@ -83,24 +78,28 @@ typedef struct SUdfcFuncHandle {
SUdf
*
udf
;
}
SUdfcFuncHandle
;
int32_t
udfdLoadUdf
(
char
*
udfName
,
SUdf
*
udf
)
{
strcpy
(
udf
->
name
,
udfName
);
int32_t
udfdFillUdfInfoFromMNode
(
void
*
clientRpc
,
SEpSet
*
pEpSet
,
char
*
udfName
,
SUdf
*
udf
);
int
err
=
uv_dlopen
(
udf
->
path
,
&
udf
->
lib
);
if
(
err
!=
0
)
{
fnError
(
"can not load library %s. error: %s"
,
udf
->
path
,
uv_strerror
(
err
));
// TODO set error
}
//TODO: find all the functions
char
normalFuncName
[
TSDB_FUNC_NAME_LEN
]
=
{
0
};
strcpy
(
normalFuncName
,
udfName
);
uv_dlsym
(
&
udf
->
lib
,
normalFuncName
,
(
void
**
)(
&
udf
->
scalarProcFunc
));
char
freeFuncName
[
TSDB_FUNC_NAME_LEN
+
6
]
=
{
0
};
char
*
freeSuffix
=
"_free"
;
strncpy
(
freeFuncName
,
normalFuncName
,
strlen
(
normalFuncName
));
strncat
(
freeFuncName
,
freeSuffix
,
strlen
(
freeSuffix
));
uv_dlsym
(
&
udf
->
lib
,
freeFuncName
,
(
void
**
)(
&
udf
->
freeUdfColumn
));
return
0
;
int32_t
udfdLoadUdf
(
char
*
udfName
,
SEpSet
*
pEpSet
,
SUdf
*
udf
)
{
strcpy
(
udf
->
name
,
udfName
);
udfdFillUdfInfoFromMNode
(
global
.
clientRpc
,
pEpSet
,
udf
->
name
,
udf
);
int
err
=
uv_dlopen
(
udf
->
path
,
&
udf
->
lib
);
if
(
err
!=
0
)
{
fnError
(
"can not load library %s. error: %s"
,
udf
->
path
,
uv_strerror
(
err
));
// TODO set error
}
// TODO: find all the functions
char
normalFuncName
[
TSDB_FUNC_NAME_LEN
]
=
{
0
};
strcpy
(
normalFuncName
,
udfName
);
uv_dlsym
(
&
udf
->
lib
,
normalFuncName
,
(
void
**
)(
&
udf
->
scalarProcFunc
));
char
freeFuncName
[
TSDB_FUNC_NAME_LEN
+
6
]
=
{
0
};
char
*
freeSuffix
=
"_free"
;
strncpy
(
freeFuncName
,
normalFuncName
,
strlen
(
normalFuncName
));
strncat
(
freeFuncName
,
freeSuffix
,
strlen
(
freeSuffix
));
uv_dlsym
(
&
udf
->
lib
,
freeFuncName
,
(
void
**
)(
&
udf
->
freeUdfColumn
));
return
0
;
}
void
udfdProcessRequest
(
uv_work_t
*
req
)
{
...
...
@@ -110,13 +109,13 @@ void udfdProcessRequest(uv_work_t *req) {
switch
(
request
.
type
)
{
case
UDF_TASK_SETUP
:
{
//TODO: tracable id from client. connect, setup, call, teardown
fnInfo
(
"%"
PRId64
" setup request. udf name: %s"
,
request
.
seqNum
,
request
.
setup
.
udfName
);
//
TODO: tracable id from client. connect, setup, call, teardown
fnInfo
(
"%"
PRId64
" setup request. udf name: %s"
,
request
.
seqNum
,
request
.
setup
.
udfName
);
SUdfSetupRequest
*
setup
=
&
request
.
setup
;
SUdf
*
udf
=
NULL
;
SUdf
*
udf
=
NULL
;
uv_mutex_lock
(
&
global
.
udfsMutex
);
SUdf
**
udfInHash
=
taosHashGet
(
global
.
udfsHash
,
request
.
setup
.
udfName
,
TSDB_FUNC_NAME_LEN
);
SUdf
**
udfInHash
=
taosHashGet
(
global
.
udfsHash
,
request
.
setup
.
udfName
,
TSDB_FUNC_NAME_LEN
);
if
(
*
udfInHash
)
{
++
(
*
udfInHash
)
->
refCount
;
udf
=
*
udfInHash
;
...
...
@@ -136,7 +135,7 @@ void udfdProcessRequest(uv_work_t *req) {
uv_mutex_lock
(
&
udf
->
lock
);
if
(
udf
->
state
==
UDF_STATE_INIT
)
{
udf
->
state
=
UDF_STATE_LOADING
;
udfdLoadUdf
(
setup
->
udfName
,
udf
);
udfdLoadUdf
(
setup
->
udfName
,
&
setup
->
epSet
,
udf
);
udf
->
state
=
UDF_STATE_READY
;
uv_cond_broadcast
(
&
udf
->
condReady
);
uv_mutex_unlock
(
&
udf
->
lock
);
...
...
@@ -168,8 +167,9 @@ void udfdProcessRequest(uv_work_t *req) {
case
UDF_TASK_CALL
:
{
SUdfCallRequest
*
call
=
&
request
.
call
;
fnDebug
(
"%"
PRId64
"call request. call type %d, handle: %"
PRIx64
,
request
.
seqNum
,
call
->
callType
,
call
->
udfHandle
);
SUdfcFuncHandle
*
handle
=
(
SUdfcFuncHandle
*
)(
call
->
udfHandle
);
fnDebug
(
"%"
PRId64
"call request. call type %d, handle: %"
PRIx64
,
request
.
seqNum
,
call
->
callType
,
call
->
udfHandle
);
SUdfcFuncHandle
*
handle
=
(
SUdfcFuncHandle
*
)(
call
->
udfHandle
);
SUdf
*
udf
=
handle
->
udf
;
SUdfDataBlock
input
=
{
0
};
...
...
@@ -206,10 +206,10 @@ void udfdProcessRequest(uv_work_t *req) {
}
case
UDF_TASK_TEARDOWN
:
{
SUdfTeardownRequest
*
teardown
=
&
request
.
teardown
;
fnInfo
(
"teardown. %"
PRId64
"handle:%"
PRIx64
,
request
.
seqNum
,
teardown
->
udfHandle
)
SUdfcFuncHandle
*
handle
=
(
SUdfcFuncHandle
*
)(
teardown
->
udfHandle
);
SUdf
*
udf
=
handle
->
udf
;
bool
unloadUdf
=
false
;
fnInfo
(
"teardown. %"
PRId64
"handle:%"
PRIx64
,
request
.
seqNum
,
teardown
->
udfHandle
)
SUdfcFuncHandle
*
handle
=
(
SUdfcFuncHandle
*
)(
teardown
->
udfHandle
);
SUdf
*
udf
=
handle
->
udf
;
bool
unloadUdf
=
false
;
uv_mutex_lock
(
&
global
.
udfsMutex
);
udf
->
refCount
--
;
if
(
udf
->
refCount
==
0
)
{
...
...
@@ -250,7 +250,7 @@ void udfdProcessRequest(uv_work_t *req) {
void
udfdOnWrite
(
uv_write_t
*
req
,
int
status
)
{
SUvUdfWork
*
work
=
(
SUvUdfWork
*
)
req
->
data
;
if
(
status
<
0
)
{
//TODO:log error and process it.
//
TODO:log error and process it.
}
fnDebug
(
"send response. length:%zu, status: %s"
,
work
->
output
.
len
,
uv_err_name
(
status
));
taosMemoryFree
(
work
->
output
.
base
);
...
...
@@ -393,7 +393,7 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
void
udfdProcessRpcRsp
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
return
;
}
int32_t
udfdFillUdfInfoFromMNode
(
void
*
clientRpc
,
SEpSet
*
pEpSet
,
char
*
udfName
,
SUdf
*
udf
)
{
int32_t
udfdFillUdfInfoFromMNode
(
void
*
clientRpc
,
SEpSet
*
pEpSet
,
char
*
udfName
,
SUdf
*
udf
)
{
SRetrieveFuncReq
retrieveReq
=
{
0
};
retrieveReq
.
numOfFuncs
=
1
;
retrieveReq
.
pFuncNames
=
taosArrayInit
(
1
,
TSDB_FUNC_NAME_LEN
);
...
...
@@ -505,7 +505,7 @@ void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *bu
void
udfdCtrlReadCb
(
uv_stream_t
*
q
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
if
(
nread
<
0
)
{
fnError
(
"udfd ctrl pipe read error. %s"
,
uv_err_name
(
nread
));
uv_close
((
uv_handle_t
*
)
q
,
NULL
);
uv_close
((
uv_handle_t
*
)
q
,
NULL
);
uv_stop
(
global
.
loop
);
return
;
}
...
...
@@ -515,13 +515,13 @@ void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) {
static
int32_t
removeListeningPipe
()
{
uv_fs_t
req
;
int
err
=
uv_fs_unlink
(
global
.
loop
,
&
req
,
global
.
listenPipeName
,
NULL
);
int
err
=
uv_fs_unlink
(
global
.
loop
,
&
req
,
global
.
listenPipeName
,
NULL
);
uv_fs_req_cleanup
(
&
req
);
return
err
;
}
static
int32_t
udfdUvInit
()
{
uv_loop_t
*
loop
=
taosMemoryMalloc
(
sizeof
(
uv_loop_t
));
uv_loop_t
*
loop
=
taosMemoryMalloc
(
sizeof
(
uv_loop_t
));
if
(
loop
)
{
uv_loop_init
(
loop
);
}
...
...
@@ -529,10 +529,10 @@ static int32_t udfdUvInit() {
uv_pipe_init
(
global
.
loop
,
&
global
.
ctrlPipe
,
1
);
uv_pipe_open
(
&
global
.
ctrlPipe
,
0
);
uv_read_start
((
uv_stream_t
*
)
&
global
.
ctrlPipe
,
udfdCtrlAllocBufCb
,
udfdCtrlReadCb
);
uv_read_start
((
uv_stream_t
*
)
&
global
.
ctrlPipe
,
udfdCtrlAllocBufCb
,
udfdCtrlReadCb
);
char
dnodeId
[
8
]
=
{
0
};
size_t
dnodeIdSize
;
char
dnodeId
[
8
]
=
{
0
};
size_t
dnodeIdSize
;
int32_t
err
=
uv_os_getenv
(
"DNODE_ID"
,
dnodeId
,
&
dnodeIdSize
);
if
(
err
!=
0
)
{
dnodeId
[
0
]
=
'1'
;
...
...
@@ -567,7 +567,7 @@ static int32_t udfdRun() {
global
.
udfsHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
uv_mutex_init
(
&
global
.
udfsMutex
);
//TOOD: client rpc to fetch udf function info from mnode
//
TOOD: client rpc to fetch udf function info from mnode
if
(
udfdOpenClientRpc
()
!=
0
)
{
fnError
(
"open rpc connection to mnode failure"
);
return
-
1
;
...
...
@@ -589,7 +589,7 @@ static int32_t udfdRun() {
return
code
;
}
int
main
(
int
argc
,
char
*
argv
[])
{
int
main
(
int
argc
,
char
*
argv
[])
{
if
(
!
taosCheckSystemIsSmallEnd
())
{
printf
(
"failed to start since on non-small-end machines
\n
"
);
return
-
1
;
...
...
source/libs/function/test/runUdf.c
浏览文件 @
d537322e
...
...
@@ -44,12 +44,15 @@ int main(int argc, char *argv[]) {
}
taosArrayPush
(
pBlock
->
pDataBlock
,
&
colInfo
);
}
SSDataBlock
output
=
{
0
};
callUdfScalaProcess
(
handle
,
pBlock
,
&
output
);
SColumnInfoData
*
col
=
taosArrayGet
(
output
.
pDataBlock
,
0
);
for
(
int32_t
i
=
0
;
i
<
output
.
info
.
rows
;
++
i
)
{
SScalarParam
input
=
{
0
};
input
.
numOfRows
=
pBlock
->
info
.
rows
;
input
.
columnData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
0
);
SScalarParam
output
=
{
0
};
callUdfScalarFunc
(
handle
,
&
input
,
1
,
&
output
);
SColumnInfoData
*
col
=
output
.
columnData
;
for
(
int32_t
i
=
0
;
i
<
output
.
numOfRows
;
++
i
)
{
fprintf
(
stderr
,
"%d
\t
%d
\n
"
,
i
,
*
(
int32_t
*
)(
col
->
pData
+
i
*
sizeof
(
int32_t
)));
}
teardownUdf
(
handle
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录