Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
52264734
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
52264734
编写于
2月 06, 2023
作者:
S
slzhou
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feature: udf dispatch first by script type then by udf name
上级
6316a984
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
340 addition
and
113 deletion
+340
-113
include/libs/function/taosudf.h
include/libs/function/taosudf.h
+33
-0
include/util/taoserror.h
include/util/taoserror.h
+1
-0
include/util/tdef.h
include/util/tdef.h
+1
-1
source/libs/function/src/udfd.c
source/libs/function/src/udfd.c
+304
-112
source/util/src/terror.c
source/util/src/terror.c
+1
-0
未找到文件。
include/libs/function/taosudf.h
浏览文件 @
52264734
...
...
@@ -259,6 +259,39 @@ typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock *block, SUdfInterBuf *interB
typedef
int32_t
(
*
TUdfAggMergeFunc
)(
SUdfInterBuf
*
inputBuf1
,
SUdfInterBuf
*
inputBuf2
,
SUdfInterBuf
*
outputBuf
);
typedef
int32_t
(
*
TUdfAggFinishFunc
)(
SUdfInterBuf
*
buf
,
SUdfInterBuf
*
resultData
);
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// TODO: capacity for path and
// define macro for name and path len or use dynamic allocation/shared with SUdf.
typedef
struct
SUdfInfo
{
char
name
[
65
];
int8_t
funcType
;
int8_t
scriptType
;
int8_t
outputType
;
int32_t
outputLen
;
int32_t
bufSize
;
char
path
[
512
];
}
SUdfInfo
;
// TODO: deprecate SUdfInterBuf.numOfResult or add isInitial to SUdfInterBuf
typedef
int32_t
(
*
TScriptUdfScalarProcFunc
)(
SUdfDataBlock
*
block
,
SUdfColumn
*
resultCol
,
void
*
udfCtx
);
typedef
int32_t
(
*
TScriptUdfAggStartFunc
)(
SUdfInterBuf
*
buf
,
void
*
udfCtx
);
typedef
int32_t
(
*
TScriptUdfAggProcessFunc
)(
SUdfDataBlock
*
block
,
SUdfInterBuf
*
interBuf
,
SUdfInterBuf
*
newInterBuf
,
void
*
udfCtx
);
typedef
int32_t
(
*
TScriptUdfAggMergeFunc
)(
SUdfInterBuf
*
inputBuf1
,
SUdfInterBuf
*
inputBuf2
,
SUdfInterBuf
*
outputBuf
,
void
*
udfCtx
);
typedef
int32_t
(
*
TScriptUdfAggFinishFunc
)(
SUdfInterBuf
*
buf
,
SUdfInterBuf
*
resultData
,
void
*
udfCtx
);
typedef
int32_t
(
*
TScriptUdfInitFunc
)(
SUdfInfo
*
info
,
void
**
pUdfCtx
);
typedef
int32_t
(
*
TScriptUdfDestoryFunc
)(
void
*
udfCtx
);
// the following function is for open/close script plugin.
typedef
int32_t
(
*
TScriptOpenFunc
)(
void
*
scriptCtx
);
typedef
int32_t
(
*
TScriptCloseFunc
)(
void
*
scriptCtx
);
#ifdef __cplusplus
}
#endif
...
...
include/util/taoserror.h
浏览文件 @
52264734
...
...
@@ -695,6 +695,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_UDF_NO_FUNC_HANDLE TAOS_DEF_ERROR_CODE(0, 0x2908)
#define TSDB_CODE_UDF_INVALID_BUFSIZE TAOS_DEF_ERROR_CODE(0, 0x2909)
#define TSDB_CODE_UDF_INVALID_OUTPUT_TYPE TAOS_DEF_ERROR_CODE(0, 0x290A)
#define TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED TAOS_DEF_ERROR_CODE(0, 0x290B)
// sml
#define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000)
...
...
include/util/tdef.h
浏览文件 @
52264734
...
...
@@ -205,7 +205,7 @@ typedef enum ELogicConditionType {
#define TSDB_FUNC_TYPE_SCALAR 1
#define TSDB_FUNC_TYPE_AGGREGATE 2
#define TSDB_FUNC_SCRIPT_BIN_LIB 0
#define TSDB_FUNC_SCRIPT_
LUA
1
#define TSDB_FUNC_SCRIPT_
PYTHON
1
#define TSDB_FUNC_MAX_RETRIEVE 1024
#define TSDB_INDEX_NAME_LEN 65 // 64 + 1 '\0'
...
...
source/libs/function/src/udfd.c
浏览文件 @
52264734
...
...
@@ -30,6 +30,153 @@
#include "tmisce.h"
// clang-format on
#define MAX_NUM_SCRIPT_PLUGINS 64
#define MAX_NUM_PLUGIN_FUNCS 9
typedef
struct
SUdfCPluginCtx
{
uv_lib_t
lib
;
TUdfScalarProcFunc
scalarProcFunc
;
TUdfAggStartFunc
aggStartFunc
;
TUdfAggProcessFunc
aggProcFunc
;
TUdfAggFinishFunc
aggFinishFunc
;
TUdfAggMergeFunc
aggMergeFunc
;
TUdfInitFunc
initFunc
;
TUdfDestroyFunc
destroyFunc
;
}
SUdfCPluginCtx
;
int32_t
udfdCPluginOpen
(
void
*
scriptCtx
)
{
return
0
;
}
int32_t
udfdCPluginClose
(
void
*
scriptCtx
)
{
return
0
;
}
int32_t
udfdCPluginUdfInit
(
SUdfInfo
*
udf
,
void
**
pUdfCtx
)
{
int32_t
err
=
0
;
SUdfCPluginCtx
*
udfCtx
=
taosMemoryCalloc
(
1
,
sizeof
(
SUdfCPluginCtx
));
err
=
uv_dlopen
(
udf
->
path
,
&
udfCtx
->
lib
);
if
(
err
!=
0
)
{
fnError
(
"can not load library %s. error: %s"
,
udf
->
path
,
uv_strerror
(
err
));
return
TSDB_CODE_UDF_LOAD_UDF_FAILURE
;
}
const
char
*
udfName
=
udf
->
name
;
char
initFuncName
[
TSDB_FUNC_NAME_LEN
+
5
]
=
{
0
};
char
*
initSuffix
=
"_init"
;
strcpy
(
initFuncName
,
udfName
);
strncat
(
initFuncName
,
initSuffix
,
strlen
(
initSuffix
));
uv_dlsym
(
&
udfCtx
->
lib
,
initFuncName
,
(
void
**
)(
&
udfCtx
->
initFunc
));
char
destroyFuncName
[
TSDB_FUNC_NAME_LEN
+
5
]
=
{
0
};
char
*
destroySuffix
=
"_destroy"
;
strcpy
(
destroyFuncName
,
udfName
);
strncat
(
destroyFuncName
,
destroySuffix
,
strlen
(
destroySuffix
));
uv_dlsym
(
&
udfCtx
->
lib
,
destroyFuncName
,
(
void
**
)(
&
udfCtx
->
destroyFunc
));
if
(
udf
->
funcType
==
TSDB_FUNC_TYPE_SCALAR
)
{
char
processFuncName
[
TSDB_FUNC_NAME_LEN
]
=
{
0
};
strcpy
(
processFuncName
,
udfName
);
uv_dlsym
(
&
udfCtx
->
lib
,
processFuncName
,
(
void
**
)(
&
udfCtx
->
scalarProcFunc
));
}
else
if
(
udf
->
funcType
==
TSDB_FUNC_TYPE_AGGREGATE
)
{
char
processFuncName
[
TSDB_FUNC_NAME_LEN
]
=
{
0
};
strcpy
(
processFuncName
,
udfName
);
uv_dlsym
(
&
udfCtx
->
lib
,
processFuncName
,
(
void
**
)(
&
udfCtx
->
aggProcFunc
));
char
startFuncName
[
TSDB_FUNC_NAME_LEN
+
6
]
=
{
0
};
char
*
startSuffix
=
"_start"
;
strncpy
(
startFuncName
,
processFuncName
,
sizeof
(
startFuncName
));
strncat
(
startFuncName
,
startSuffix
,
strlen
(
startSuffix
));
uv_dlsym
(
&
udfCtx
->
lib
,
startFuncName
,
(
void
**
)(
&
udfCtx
->
aggStartFunc
));
char
finishFuncName
[
TSDB_FUNC_NAME_LEN
+
7
]
=
{
0
};
char
*
finishSuffix
=
"_finish"
;
strncpy
(
finishFuncName
,
processFuncName
,
sizeof
(
finishFuncName
));
strncat
(
finishFuncName
,
finishSuffix
,
strlen
(
finishSuffix
));
uv_dlsym
(
&
udfCtx
->
lib
,
finishFuncName
,
(
void
**
)(
&
udfCtx
->
aggFinishFunc
));
char
mergeFuncName
[
TSDB_FUNC_NAME_LEN
+
6
]
=
{
0
};
char
*
mergeSuffix
=
"_merge"
;
strncpy
(
mergeFuncName
,
processFuncName
,
sizeof
(
mergeFuncName
));
strncat
(
mergeFuncName
,
mergeSuffix
,
strlen
(
mergeSuffix
));
uv_dlsym
(
&
udfCtx
->
lib
,
mergeFuncName
,
(
void
**
)(
&
udfCtx
->
aggMergeFunc
));
}
if
(
udfCtx
->
initFunc
)
{
(
udfCtx
->
initFunc
)();
}
*
pUdfCtx
=
udfCtx
;
return
0
;
}
int32_t
udfdCPluginUdfDestroy
(
void
*
udfCtx
)
{
SUdfCPluginCtx
*
ctx
=
udfCtx
;
if
(
ctx
->
destroyFunc
)
{
(
ctx
->
destroyFunc
)();
}
uv_dlclose
(
&
ctx
->
lib
);
taosMemoryFree
(
ctx
);
return
0
;
}
int32_t
udfdCPluginUdfScalarProc
(
SUdfDataBlock
*
block
,
SUdfColumn
*
resultCol
,
void
*
udfCtx
)
{
SUdfCPluginCtx
*
ctx
=
udfCtx
;
if
(
ctx
->
scalarProcFunc
)
{
ctx
->
scalarProcFunc
(
block
,
resultCol
);
}
return
0
;
}
int32_t
udfdCPluginUdfAggStart
(
SUdfInterBuf
*
buf
,
void
*
udfCtx
)
{
SUdfCPluginCtx
*
ctx
=
udfCtx
;
if
(
ctx
->
aggStartFunc
)
{
ctx
->
aggStartFunc
(
buf
);
}
return
0
;
}
int32_t
udfdCPluginUdfAggProc
(
SUdfDataBlock
*
block
,
SUdfInterBuf
*
interBuf
,
SUdfInterBuf
*
newInterBuf
,
void
*
udfCtx
)
{
SUdfCPluginCtx
*
ctx
=
udfCtx
;
if
(
ctx
->
aggProcFunc
)
{
ctx
->
aggProcFunc
(
block
,
interBuf
,
newInterBuf
);
}
return
0
;
}
int32_t
udfdCPluginUdfAggMerge
(
SUdfInterBuf
*
inputBuf1
,
SUdfInterBuf
*
inputBuf2
,
SUdfInterBuf
*
outputBuf
,
void
*
udfCtx
)
{
SUdfCPluginCtx
*
ctx
=
udfCtx
;
if
(
ctx
->
aggMergeFunc
)
{
ctx
->
aggMergeFunc
(
inputBuf1
,
inputBuf2
,
outputBuf
);
}
return
0
;
}
int32_t
udfdCPluginUdfAggFinish
(
SUdfInterBuf
*
buf
,
SUdfInterBuf
*
resultData
,
void
*
udfCtx
)
{
SUdfCPluginCtx
*
ctx
=
udfCtx
;
if
(
ctx
->
aggFinishFunc
)
{
ctx
->
aggFinishFunc
(
buf
,
resultData
);
}
return
0
;
}
// for c, the function pointer are filled directly and libloaded = true;
// for others, dlopen/dlsym to find function pointers
typedef
struct
SUdfScriptPlugin
{
int8_t
scriptType
;
char
libPath
[
PATH_MAX
];
bool
libLoaded
;
uv_lib_t
lib
;
TScriptUdfScalarProcFunc
udfScalarProcFunc
;
TScriptUdfAggStartFunc
udfAggStartFunc
;
TScriptUdfAggProcessFunc
udfAggProcFunc
;
TScriptUdfAggMergeFunc
udfAggMergeFunc
;
TScriptUdfAggFinishFunc
udfAggFinishFunc
;
TScriptUdfInitFunc
udfInitFunc
;
TScriptUdfDestoryFunc
udfDestroyFunc
;
TScriptOpenFunc
openFunc
;
TScriptCloseFunc
closeFunc
;
}
SUdfScriptPlugin
;
typedef
struct
SUdfdContext
{
uv_loop_t
*
loop
;
uv_pipe_t
ctrlPipe
;
...
...
@@ -37,11 +184,15 @@ typedef struct SUdfdContext {
char
listenPipeName
[
PATH_MAX
+
UDF_LISTEN_PIPE_NAME_LEN
+
2
];
uv_pipe_t
listeningPipe
;
void
*
clientRpc
;
SCorEpSet
mgmtEp
;
void
*
clientRpc
;
SCorEpSet
mgmtEp
;
uv_mutex_t
udfsMutex
;
SHashObj
*
udfsHash
;
uv_mutex_t
scriptPluginsMutex
;
SUdfScriptPlugin
*
scriptPlugins
[
MAX_NUM_SCRIPT_PLUGINS
];
SArray
*
residentFuncs
;
bool
printVersion
;
...
...
@@ -73,13 +224,8 @@ typedef struct SUvUdfWork {
typedef
enum
{
UDF_STATE_INIT
=
0
,
UDF_STATE_LOADING
,
UDF_STATE_READY
,
UDF_STATE_UNLOADING
}
EUdfState
;
typedef
struct
SUdf
{
int32_t
refCount
;
EUdfState
state
;
uv_mutex_t
lock
;
uv_cond_t
condReady
;
bool
resident
;
char
name
[
TSDB_FUNC_NAME_LEN
+
1
];
char
name
[
TSDB_FUNC_NAME_LEN
+
1
];
int8_t
funcType
;
int8_t
scriptType
;
int8_t
outputType
;
...
...
@@ -88,17 +234,14 @@ typedef struct SUdf {
char
path
[
PATH_MAX
];
uv_lib_t
lib
;
TUdfScalarProcFunc
scalarProcFunc
;
TUdfAggStartFunc
aggStartFunc
;
TUdfAggProcessFunc
aggProcFunc
;
TUdfAggFinishFunc
aggFinishFunc
;
TUdfAggMergeFunc
aggMergeFunc
;
int32_t
refCount
;
EUdfState
state
;
uv_mutex_t
lock
;
uv_cond_t
condReady
;
bool
resident
;
TUdfInitFunc
initFunc
;
TUdfDestroyFunc
destroyFunc
;
SUdfScriptPlugin
*
scriptPlugin
;
void
*
scriptUdfCtx
;
}
SUdf
;
// TODO: add private udf structure.
...
...
@@ -121,7 +264,6 @@ typedef struct SUdfdRpcSendRecvInfo {
static
void
udfdProcessRpcRsp
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
int32_t
udfdFillUdfInfoFromMNode
(
void
*
clientRpc
,
char
*
udfName
,
SUdf
*
udf
);
static
int32_t
udfdConnectToMnode
();
static
int32_t
udfdLoadUdf
(
char
*
udfName
,
SUdf
*
udf
);
static
bool
udfdRpcRfp
(
int32_t
code
,
tmsg_t
msgType
);
static
int
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
);
static
int32_t
udfdOpenClientRpc
();
...
...
@@ -155,6 +297,71 @@ static void udfdCloseWalkCb(uv_handle_t *handle, void *arg);
static
int32_t
udfdRun
();
static
void
udfdConnectMnodeThreadFunc
(
void
*
args
);
void
udfdInitializeCPlugin
(
SUdfScriptPlugin
*
plugin
)
{
plugin
->
scriptType
=
TSDB_FUNC_SCRIPT_BIN_LIB
;
plugin
->
openFunc
=
udfdCPluginOpen
;
plugin
->
closeFunc
=
udfdCPluginClose
;
plugin
->
udfInitFunc
=
udfdCPluginUdfInit
;
plugin
->
udfDestroyFunc
=
udfdCPluginUdfDestroy
;
plugin
->
udfScalarProcFunc
=
udfdCPluginUdfScalarProc
;
plugin
->
udfAggStartFunc
=
udfdCPluginUdfAggStart
;
plugin
->
udfAggProcFunc
=
udfdCPluginUdfAggProc
;
plugin
->
udfAggMergeFunc
=
udfdCPluginUdfAggMerge
;
plugin
->
udfAggFinishFunc
=
udfdCPluginUdfAggFinish
;
return
;
}
int32_t
udfdLoadSharedLib
(
char
*
libPath
,
uv_lib_t
*
pLib
,
const
char
*
funcName
[],
void
**
func
[],
int
numOfFuncs
)
{
int
err
=
uv_dlopen
(
libPath
,
pLib
);
if
(
err
!=
0
)
{
fnError
(
"can not load library %s. error: %s"
,
libPath
,
uv_strerror
(
err
));
return
TSDB_CODE_UDF_LOAD_UDF_FAILURE
;
}
for
(
int
i
=
0
;
i
<
numOfFuncs
;
++
i
)
{
err
=
uv_dlsym
(
pLib
,
funcName
[
i
],
func
[
i
]);
if
(
err
!=
0
)
{
fnError
(
"load library function failed. lib %s function %s"
,
libPath
,
funcName
[
i
]);
}
}
return
0
;
}
void
udfdInitializePythonPlugin
(
SUdfScriptPlugin
*
plugin
)
{
plugin
->
scriptType
=
TSDB_FUNC_SCRIPT_PYTHON
;
sprintf
(
"%s"
,
plugin
->
libPath
,
"libtaosudf_py.so"
);
plugin
->
libLoaded
=
false
;
const
char
*
funcName
[
MAX_NUM_PLUGIN_FUNCS
]
=
{
"open"
,
"close"
,
"udfInit"
,
"udfDestroy"
,
"udfScalarProc"
,
"udfAggStart"
,
"udfAggFinish"
,
"udfAggProc"
,
"udfAggMerge"
};
void
**
funcs
[
MAX_NUM_PLUGIN_FUNCS
]
=
{
(
void
**
)
&
plugin
->
openFunc
,
(
void
**
)
&
plugin
->
closeFunc
,
(
void
**
)
&
plugin
->
udfInitFunc
,
(
void
**
)
&
plugin
->
udfDestroyFunc
,
(
void
**
)
&
plugin
->
udfScalarProcFunc
,
(
void
**
)
&
plugin
->
udfAggStartFunc
,
(
void
**
)
&
plugin
->
udfAggFinishFunc
,
(
void
**
)
&
plugin
->
udfAggProcFunc
,
(
void
**
)
&
plugin
->
udfAggMergeFunc
};
int32_t
err
=
udfdLoadSharedLib
(
plugin
->
libPath
,
&
plugin
->
lib
,
funcName
,
funcs
,
MAX_NUM_PLUGIN_FUNCS
);
if
(
err
!=
0
)
{
fnError
(
"can not load python plugin. lib path %s"
,
plugin
->
libPath
);
return
;
}
plugin
->
libLoaded
=
true
;
return
;
}
void
udfdInitScriptPlugins
()
{
SUdfScriptPlugin
*
plugins
=
taosMemoryCalloc
(
2
,
sizeof
(
SUdfScriptPlugin
));
// Initialize c language plugin
udfdInitializeCPlugin
(
plugins
+
0
);
// Initialize python plugin
udfdInitializePythonPlugin
(
plugins
+
1
);
return
;
}
void
udfdDeinitScriptPlugins
()
{
return
;
}
void
udfdProcessRequest
(
uv_work_t
*
req
)
{
SUvUdfWork
*
uvUdf
=
(
SUvUdfWork
*
)(
req
->
data
);
SUdfRequest
request
=
{
0
};
...
...
@@ -180,14 +387,43 @@ void udfdProcessRequest(uv_work_t *req) {
}
}
void
udfdProcessSetupRequest
(
SUvUdfWork
*
uvUdf
,
SUdfRequest
*
request
)
{
// TODO: tracable id from client. connect, setup, call, teardown
fnInfo
(
"setup request. seq num: %"
PRId64
", udf name: %s"
,
request
->
seqNum
,
request
->
setup
.
udfName
);
SUdfSetupRequest
*
setup
=
&
request
->
setup
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SUdf
*
udf
=
NULL
;
void
convertUdf2UdfInfo
(
SUdf
*
udf
,
SUdfInfo
*
udfInfo
)
{
udfInfo
->
bufSize
=
udf
->
bufSize
;
udfInfo
->
funcType
=
udf
->
funcType
;
strncpy
(
udfInfo
->
name
,
udf
->
name
,
strlen
(
udf
->
name
));
udfInfo
->
outputLen
=
udf
->
outputLen
;
udfInfo
->
outputType
=
udf
->
outputType
;
strncpy
(
udfInfo
->
path
,
udf
->
path
,
strlen
(
udf
->
path
));
udfInfo
->
scriptType
=
udf
->
scriptType
;
}
int32_t
udfdInitUdf
(
char
*
udfName
,
SUdf
*
udf
)
{
int32_t
err
=
0
;
err
=
udfdFillUdfInfoFromMNode
(
global
.
clientRpc
,
udfName
,
udf
);
if
(
err
!=
0
)
{
fnError
(
"can not retrieve udf from mnode. udf name %s"
,
udfName
);
return
TSDB_CODE_UDF_LOAD_UDF_FAILURE
;
}
//TODO: remove script plugins mutex
uv_mutex_lock
(
&
global
.
scriptPluginsMutex
);
SUdfScriptPlugin
*
scriptPlugin
=
global
.
scriptPlugins
[
udf
->
scriptType
];
if
(
scriptPlugin
==
NULL
)
{
fnError
(
"udf name %s script type %d not supported"
,
udfName
,
udf
->
scriptType
);
uv_mutex_unlock
(
&
global
.
scriptPluginsMutex
);
return
TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED
;
}
uv_mutex_unlock
(
&
global
.
scriptPluginsMutex
);
udf
->
scriptPlugin
=
scriptPlugin
;
SUdfInfo
info
=
{
0
};
convertUdf2UdfInfo
(
udf
,
&
info
);
udf
->
scriptPlugin
->
udfInitFunc
(
&
info
,
&
udf
->
scriptUdfCtx
);
return
0
;
}
SUdf
*
udfdGetOrCreateUdf
(
const
char
*
udfName
)
{
SUdf
*
udf
=
NULL
;
uv_mutex_lock
(
&
global
.
udfsMutex
);
SUdf
**
udfInHash
=
taosHashGet
(
global
.
udfsHash
,
request
->
setup
.
udfName
,
strlen
(
request
->
setup
.
udfName
));
SUdf
**
udfInHash
=
taosHashGet
(
global
.
udfsHash
,
udfName
,
strlen
(
udfName
));
if
(
udfInHash
)
{
++
(
*
udfInHash
)
->
refCount
;
udf
=
*
udfInHash
;
...
...
@@ -195,23 +431,35 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
}
else
{
SUdf
*
udfNew
=
taosMemoryCalloc
(
1
,
sizeof
(
SUdf
));
udfNew
->
refCount
=
1
;
strncpy
(
udfNew
->
name
,
udfName
,
TSDB_FUNC_NAME_LEN
);
udfNew
->
state
=
UDF_STATE_INIT
;
uv_mutex_init
(
&
udfNew
->
lock
);
uv_cond_init
(
&
udfNew
->
condReady
);
udf
=
udfNew
;
SUdf
**
pUdf
=
&
udf
;
taosHashPut
(
global
.
udfsHash
,
request
->
setup
.
udfName
,
strlen
(
request
->
setup
.
udfName
),
pUdf
,
POINTER_BYTES
);
taosHashPut
(
global
.
udfsHash
,
udfName
,
strlen
(
udfName
),
pUdf
,
POINTER_BYTES
);
uv_mutex_unlock
(
&
global
.
udfsMutex
);
}
return
udf
;
}
void
udfdProcessSetupRequest
(
SUvUdfWork
*
uvUdf
,
SUdfRequest
*
request
)
{
// TODO: tracable id from client. connect, setup, call, teardown
fnInfo
(
"setup request. seq num: %"
PRId64
", udf name: %s"
,
request
->
seqNum
,
request
->
setup
.
udfName
);
SUdfSetupRequest
*
setup
=
&
request
->
setup
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SUdf
*
udf
=
NULL
;
udf
=
udfdGetOrCreateUdf
(
setup
->
udfName
);
uv_mutex_lock
(
&
udf
->
lock
);
if
(
udf
->
state
==
UDF_STATE_INIT
)
{
udf
->
state
=
UDF_STATE_LOADING
;
code
=
udfdLoadUdf
(
setup
->
udfName
,
udf
);
if
(
udf
->
initFunc
)
{
udf
->
initFunc
();
}
code
=
udfdInitUdf
(
setup
->
udfName
,
udf
);
udf
->
resident
=
false
;
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
global
.
residentFuncs
);
++
i
)
{
char
*
funcName
=
taosArrayGet
(
global
.
residentFuncs
,
i
);
...
...
@@ -270,7 +518,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdfDataBlock
input
=
{
0
};
convertDataBlockToUdfDataBlock
(
&
call
->
block
,
&
input
);
code
=
udf
->
sc
alarProcFunc
(
&
input
,
&
output
);
code
=
udf
->
sc
riptPlugin
->
udfScalarProcFunc
(
&
input
,
&
output
,
udf
->
scriptUdfCtx
);
freeUdfDataDataBlock
(
&
input
);
convertUdfColumnToDataBlock
(
&
output
,
&
response
.
callRsp
.
resultData
);
freeUdfColumn
(
&
output
);
...
...
@@ -278,7 +526,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
}
case
TSDB_UDF_CALL_AGG_INIT
:
{
SUdfInterBuf
outBuf
=
{.
buf
=
taosMemoryMalloc
(
udf
->
bufSize
),
.
bufLen
=
udf
->
bufSize
,
.
numOfResult
=
0
};
udf
->
aggStartFunc
(
&
outBuf
);
code
=
udf
->
scriptPlugin
->
udfAggStartFunc
(
&
outBuf
,
udf
->
scriptUdfCtx
);
subRsp
->
resultBuf
=
outBuf
;
break
;
}
...
...
@@ -286,7 +534,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdfDataBlock
input
=
{
0
};
convertDataBlockToUdfDataBlock
(
&
call
->
block
,
&
input
);
SUdfInterBuf
outBuf
=
{.
buf
=
taosMemoryMalloc
(
udf
->
bufSize
),
.
bufLen
=
udf
->
bufSize
,
.
numOfResult
=
0
};
code
=
udf
->
aggProcFunc
(
&
input
,
&
call
->
interBuf
,
&
outBuf
);
code
=
udf
->
scriptPlugin
->
udfAggProcFunc
(
&
input
,
&
call
->
interBuf
,
&
outBuf
,
udf
->
scriptUdfCtx
);
freeUdfInterBuf
(
&
call
->
interBuf
);
freeUdfDataDataBlock
(
&
input
);
subRsp
->
resultBuf
=
outBuf
;
...
...
@@ -295,7 +543,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
}
case
TSDB_UDF_CALL_AGG_MERGE
:
{
SUdfInterBuf
outBuf
=
{.
buf
=
taosMemoryMalloc
(
udf
->
bufSize
),
.
bufLen
=
udf
->
bufSize
,
.
numOfResult
=
0
};
code
=
udf
->
aggMergeFunc
(
&
call
->
interBuf
,
&
call
->
interBuf2
,
&
outBuf
);
code
=
udf
->
scriptPlugin
->
udfAggMergeFunc
(
&
call
->
interBuf
,
&
call
->
interBuf2
,
&
outBuf
,
udf
->
scriptUdfCtx
);
freeUdfInterBuf
(
&
call
->
interBuf
);
freeUdfInterBuf
(
&
call
->
interBuf2
);
subRsp
->
resultBuf
=
outBuf
;
...
...
@@ -304,7 +552,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
}
case
TSDB_UDF_CALL_AGG_FIN
:
{
SUdfInterBuf
outBuf
=
{.
buf
=
taosMemoryMalloc
(
udf
->
bufSize
),
.
bufLen
=
udf
->
bufSize
,
.
numOfResult
=
0
};
code
=
udf
->
aggFinishFunc
(
&
call
->
interBuf
,
&
outBuf
);
code
=
udf
->
scriptPlugin
->
udfAggFinishFunc
(
&
call
->
interBuf
,
&
outBuf
,
udf
->
scriptUdfCtx
);
freeUdfInterBuf
(
&
call
->
interBuf
);
subRsp
->
resultBuf
=
outBuf
;
break
;
...
...
@@ -374,11 +622,7 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
if
(
unloadUdf
)
{
uv_cond_destroy
(
&
udf
->
condReady
);
uv_mutex_destroy
(
&
udf
->
lock
);
if
(
udf
->
destroyFunc
)
{
(
udf
->
destroyFunc
)();
}
uv_dlclose
(
&
udf
->
lib
);
taosMemoryFree
(
udf
);
udf
->
scriptPlugin
->
udfDestroyFunc
(
udf
->
scriptUdfCtx
);
}
taosMemoryFree
(
handle
);
...
...
@@ -440,7 +684,8 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
goto
_return
;
}
SFuncInfo
*
pFuncInfo
=
(
SFuncInfo
*
)
taosArrayGet
(
retrieveRsp
.
pFuncInfos
,
0
);
SUdf
*
udf
=
msgInfo
->
param
;
// SUdf *udf = msgInfo->param;
SUdf
*
udf
=
msgInfo
->
param
;
udf
->
funcType
=
pFuncInfo
->
funcType
;
udf
->
scriptType
=
pFuncInfo
->
scriptType
;
udf
->
outputType
=
pFuncInfo
->
outputType
;
...
...
@@ -455,12 +700,11 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
}
char
path
[
PATH_MAX
]
=
{
0
};
#ifdef WINDOWS
snprintf
(
path
,
sizeof
(
path
),
"%s%s.dll"
,
tsTempDir
,
pFuncInfo
->
name
);
#else
snprintf
(
path
,
sizeof
(
path
),
"%s/lib%s.so"
,
tsTempDir
,
pFuncInfo
->
name
);
#endif
#ifdef WINDOWS
snprintf
(
path
,
sizeof
(
path
),
"%s%s"
,
tsTempDir
,
pFuncInfo
->
name
);
#else
snprintf
(
path
,
sizeof
(
path
),
"%s/%s"
,
tsTempDir
,
pFuncInfo
->
name
);
#endif
TdFilePtr
file
=
taosOpenFile
(
path
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_READ
|
TD_FILE_TRUNC
);
if
(
file
==
NULL
)
{
fnError
(
"udfd write udf shared library: %s failed, error: %d %s"
,
path
,
errno
,
strerror
(
errno
));
...
...
@@ -550,65 +794,10 @@ int32_t udfdConnectToMnode() {
return
code
;
}
int32_t
udfdLoadUdf
(
char
*
udfName
,
SUdf
*
udf
)
{
strncpy
(
udf
->
name
,
udfName
,
TSDB_FUNC_NAME_LEN
);
int32_t
err
=
0
;
err
=
udfdFillUdfInfoFromMNode
(
global
.
clientRpc
,
udf
->
name
,
udf
);
if
(
err
!=
0
)
{
fnError
(
"can not retrieve udf from mnode. udf name %s"
,
udfName
);
return
TSDB_CODE_UDF_LOAD_UDF_FAILURE
;
}
err
=
uv_dlopen
(
udf
->
path
,
&
udf
->
lib
);
if
(
err
!=
0
)
{
fnError
(
"can not load library %s. error: %s"
,
udf
->
path
,
uv_strerror
(
err
));
return
TSDB_CODE_UDF_LOAD_UDF_FAILURE
;
}
char
initFuncName
[
TSDB_FUNC_NAME_LEN
+
5
]
=
{
0
};
char
*
initSuffix
=
"_init"
;
strcpy
(
initFuncName
,
udfName
);
strncat
(
initFuncName
,
initSuffix
,
strlen
(
initSuffix
));
uv_dlsym
(
&
udf
->
lib
,
initFuncName
,
(
void
**
)(
&
udf
->
initFunc
));
char
destroyFuncName
[
TSDB_FUNC_NAME_LEN
+
5
]
=
{
0
};
char
*
destroySuffix
=
"_destroy"
;
strcpy
(
destroyFuncName
,
udfName
);
strncat
(
destroyFuncName
,
destroySuffix
,
strlen
(
destroySuffix
));
uv_dlsym
(
&
udf
->
lib
,
destroyFuncName
,
(
void
**
)(
&
udf
->
destroyFunc
));
if
(
udf
->
funcType
==
TSDB_FUNC_TYPE_SCALAR
)
{
char
processFuncName
[
TSDB_FUNC_NAME_LEN
]
=
{
0
};
strcpy
(
processFuncName
,
udfName
);
uv_dlsym
(
&
udf
->
lib
,
processFuncName
,
(
void
**
)(
&
udf
->
scalarProcFunc
));
}
else
if
(
udf
->
funcType
==
TSDB_FUNC_TYPE_AGGREGATE
)
{
char
processFuncName
[
TSDB_FUNC_NAME_LEN
]
=
{
0
};
strcpy
(
processFuncName
,
udfName
);
uv_dlsym
(
&
udf
->
lib
,
processFuncName
,
(
void
**
)(
&
udf
->
aggProcFunc
));
char
startFuncName
[
TSDB_FUNC_NAME_LEN
+
6
]
=
{
0
};
char
*
startSuffix
=
"_start"
;
strncpy
(
startFuncName
,
processFuncName
,
sizeof
(
startFuncName
));
strncat
(
startFuncName
,
startSuffix
,
strlen
(
startSuffix
));
uv_dlsym
(
&
udf
->
lib
,
startFuncName
,
(
void
**
)(
&
udf
->
aggStartFunc
));
char
finishFuncName
[
TSDB_FUNC_NAME_LEN
+
7
]
=
{
0
};
char
*
finishSuffix
=
"_finish"
;
strncpy
(
finishFuncName
,
processFuncName
,
sizeof
(
finishFuncName
));
strncat
(
finishFuncName
,
finishSuffix
,
strlen
(
finishSuffix
));
uv_dlsym
(
&
udf
->
lib
,
finishFuncName
,
(
void
**
)(
&
udf
->
aggFinishFunc
));
char
mergeFuncName
[
TSDB_FUNC_NAME_LEN
+
6
]
=
{
0
};
char
*
mergeSuffix
=
"_merge"
;
strncpy
(
mergeFuncName
,
processFuncName
,
sizeof
(
mergeFuncName
));
strncat
(
mergeFuncName
,
mergeSuffix
,
strlen
(
mergeSuffix
));
uv_dlsym
(
&
udf
->
lib
,
mergeFuncName
,
(
void
**
)(
&
udf
->
aggMergeFunc
));
}
return
0
;
}
static
bool
udfdRpcRfp
(
int32_t
code
,
tmsg_t
msgType
)
{
if
(
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
||
code
==
TSDB_CODE_RPC_BROKEN_LINK
||
code
==
TSDB_CODE_SYN_NOT_LEADER
||
code
==
TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED
||
code
==
TSDB_CODE_SYN_RESTORING
||
code
==
TSDB_CODE_MNODE_NOT_FOUND
||
code
==
TSDB_CODE_APP_IS_STARTING
||
code
==
TSDB_CODE_APP_IS_STOPPING
)
{
code
==
TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED
||
code
==
TSDB_CODE_SYN_RESTORING
||
code
==
TSDB_CODE_MNODE_NOT_FOUND
||
code
==
TSDB_CODE_APP_IS_STARTING
||
code
==
TSDB_CODE_APP_IS_STOPPING
)
{
if
(
msgType
==
TDMT_SCH_QUERY
||
msgType
==
TDMT_SCH_MERGE_QUERY
||
msgType
==
TDMT_SCH_FETCH
||
msgType
==
TDMT_SCH_MERGE_FETCH
)
{
return
false
;
...
...
@@ -765,7 +954,7 @@ bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
}
void
udfdHandleRequest
(
SUdfdUvConn
*
conn
)
{
char
*
inputBuf
=
conn
->
inputBuf
;
char
*
inputBuf
=
conn
->
inputBuf
;
int32_t
inputLen
=
conn
->
inputLen
;
uv_work_t
*
work
=
taosMemoryMalloc
(
sizeof
(
uv_work_t
));
...
...
@@ -784,7 +973,7 @@ void udfdHandleRequest(SUdfdUvConn *conn) {
void
udfdPipeCloseCb
(
uv_handle_t
*
pipe
)
{
SUdfdUvConn
*
conn
=
pipe
->
data
;
SUvUdfWork
*
pWork
=
conn
->
pWorkList
;
SUvUdfWork
*
pWork
=
conn
->
pWorkList
;
while
(
pWork
!=
NULL
)
{
pWork
->
conn
=
NULL
;
pWork
=
pWork
->
pWorkNext
;
...
...
@@ -960,6 +1149,8 @@ static void udfdCloseWalkCb(uv_handle_t *handle, void *arg) {
}
static
int32_t
udfdRun
()
{
uv_mutex_init
(
&
global
.
scriptPluginsMutex
);
global
.
udfsHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
uv_mutex_init
(
&
global
.
udfsMutex
);
...
...
@@ -1017,11 +1208,7 @@ int32_t udfdDeinitResidentFuncs() {
SUdf
**
udfInHash
=
taosHashGet
(
global
.
udfsHash
,
funcName
,
strlen
(
funcName
));
if
(
udfInHash
)
{
SUdf
*
udf
=
*
udfInHash
;
if
(
udf
->
destroyFunc
)
{
(
udf
->
destroyFunc
)();
}
uv_dlclose
(
&
udf
->
lib
);
taosMemoryFree
(
udf
);
udf
->
scriptPlugin
->
udfDestroyFunc
(
udf
->
scriptUdfCtx
);
taosHashRemove
(
global
.
udfsHash
,
funcName
,
strlen
(
funcName
));
}
}
...
...
@@ -1072,6 +1259,8 @@ int main(int argc, char *argv[]) {
return
-
5
;
}
udfdInitScriptPlugins
();
udfdInitResidentFuncs
();
uv_thread_t
mnodeConnectThread
;
...
...
@@ -1083,6 +1272,9 @@ int main(int argc, char *argv[]) {
udfdCloseClientRpc
();
udfdDeinitResidentFuncs
();
udfdDeinitScriptPlugins
();
udfdCleanup
();
return
0
;
}
source/util/src/terror.c
浏览文件 @
52264734
...
...
@@ -573,6 +573,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_INPUT, "udf invalid functio
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_NO_FUNC_HANDLE
,
"udf no function handle"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_INVALID_BUFSIZE
,
"udf invalid bufsize"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_INVALID_OUTPUT_TYPE
,
"udf invalid output type"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED
,
"udf program language not supported"
)
//schemaless
TAOS_DEFINE_ERROR
(
TSDB_CODE_SML_INVALID_PROTOCOL_TYPE
,
"Invalid line protocol type"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录