Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
cc914db3
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
cc914db3
编写于
4月 25, 2022
作者:
S
shenglian-zhou
提交者:
GitHub
4月 25, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11767 from taosdata/3.0_udfd
feat(udf):scalar api change
上级
8ace6c9b
837a754b
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
342 addition
and
182 deletion
+342
-182
source/common/src/tmsg.c
source/common/src/tmsg.c
+4
-11
source/dnode/mgmt/implement/src/dmHandle.c
source/dnode/mgmt/implement/src/dmHandle.c
+6
-3
source/dnode/mnode/impl/src/mndFunc.c
source/dnode/mnode/impl/src/mndFunc.c
+4
-4
source/dnode/mnode/impl/test/func/func.cpp
source/dnode/mnode/impl/test/func/func.cpp
+77
-11
source/libs/function/inc/tudf.h
source/libs/function/inc/tudf.h
+8
-5
source/libs/function/inc/tudfInt.h
source/libs/function/inc/tudfInt.h
+1
-1
source/libs/function/src/tudf.c
source/libs/function/src/tudf.c
+93
-68
source/libs/function/src/udfd.c
source/libs/function/src/udfd.c
+75
-28
source/libs/function/test/runUdf.c
source/libs/function/test/runUdf.c
+70
-47
tests/system-test/2-query/cast.py
tests/system-test/2-query/cast.py
+3
-3
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+1
-1
未找到文件。
source/common/src/tmsg.c
浏览文件 @
cc914db3
...
...
@@ -1568,13 +1568,8 @@ int32_t tSerializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pReq
if
(
tEncodeI32
(
&
encoder
,
pReq
->
codeLen
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
signature
)
<
0
)
return
-
1
;
int32_t
codeSize
=
0
;
if
(
pReq
->
pCode
!=
NULL
)
{
codeSize
=
strlen
(
pReq
->
pCode
)
+
1
;
}
if
(
tEncodeI32
(
&
encoder
,
codeSize
)
<
0
)
return
-
1
;
if
(
pReq
->
pCode
!=
NULL
)
{
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
pCode
)
<
0
)
return
-
1
;
if
(
tEncodeBinary
(
&
encoder
,
pReq
->
pCode
,
pReq
->
codeLen
)
<
0
)
return
-
1
;
}
int32_t
commentSize
=
0
;
...
...
@@ -1608,10 +1603,8 @@ int32_t tDeserializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pR
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
codeLen
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
signature
)
<
0
)
return
-
1
;
int32_t
codeSize
=
0
;
if
(
tDecodeI32
(
&
decoder
,
&
codeSize
)
<
0
)
return
-
1
;
if
(
codeSize
>
0
)
{
pReq
->
pCode
=
taosMemoryCalloc
(
1
,
codeSize
);
if
(
pReq
->
codeLen
>
0
)
{
pReq
->
pCode
=
taosMemoryCalloc
(
1
,
pReq
->
codeLen
);
if
(
pReq
->
pCode
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
...
...
@@ -1734,7 +1727,7 @@ int32_t tSerializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp *
if
(
tEncodeI32
(
&
encoder
,
pInfo
->
codeSize
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pInfo
->
commentSize
)
<
0
)
return
-
1
;
if
(
pInfo
->
codeSize
)
{
if
(
tEncode
CStr
(
&
encoder
,
pInfo
->
pCod
e
)
<
0
)
return
-
1
;
if
(
tEncode
Binary
(
&
encoder
,
pInfo
->
pCode
,
pInfo
->
codeSiz
e
)
<
0
)
return
-
1
;
}
if
(
pInfo
->
commentSize
)
{
if
(
tEncodeCStr
(
&
encoder
,
pInfo
->
pComment
)
<
0
)
return
-
1
;
...
...
source/dnode/mgmt/implement/src/dmHandle.c
浏览文件 @
cc914db3
...
...
@@ -311,6 +311,9 @@ static void dmWatchUdfd(void *args) {
}
static
int32_t
dmStartUdfd
(
SDnode
*
pDnode
)
{
char
dnodeId
[
8
]
=
{
0
};
snprintf
(
dnodeId
,
sizeof
(
dnodeId
),
"%d"
,
pDnode
->
data
.
dnodeId
);
uv_os_setenv
(
"DNODE_ID"
,
dnodeId
);
SUdfdData
*
pData
=
&
pDnode
->
udfdData
;
if
(
pData
->
startCalled
)
{
dInfo
(
"dnode-mgmt start udfd already called"
);
...
...
@@ -371,9 +374,9 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
}
dmReportStartup
(
pDnode
,
"dnode-transport"
,
"initialized"
);
//
if (dmStartUdfd(pDnode) != 0) {
//
dError("failed to start udfd");
//
}
if
(
dmStartUdfd
(
pDnode
)
!=
0
)
{
dError
(
"failed to start udfd"
);
}
dInfo
(
"dnode-mgmt is initialized"
);
return
0
;
...
...
source/dnode/mnode/impl/src/mndFunc.c
浏览文件 @
cc914db3
...
...
@@ -309,10 +309,10 @@ static int32_t mndProcessCreateFuncReq(SNodeMsg *pReq) {
goto
_OVER
;
}
if
(
createReq
.
pCode
[
0
]
==
0
)
{
terrno
=
TSDB_CODE_MND_INVALID_FUNC_CODE
;
goto
_OVER
;
}
if
(
createReq
.
codeLen
<=
1
)
{
terrno
=
TSDB_CODE_MND_INVALID_FUNC_CODE
;
goto
_OVER
;
}
if
(
createReq
.
bufSize
<=
0
||
createReq
.
bufSize
>
TSDB_FUNC_BUF_SIZE
)
{
terrno
=
TSDB_CODE_MND_INVALID_FUNC_BUFSIZE
;
...
...
source/dnode/mnode/impl/test/func/func.cpp
浏览文件 @
cc914db3
...
...
@@ -22,17 +22,16 @@ class MndTestFunc : public ::testing::Test {
void
SetUp
()
override
{}
void
TearDown
()
override
{}
void
SetCode
(
SCreateFuncReq
*
pReq
,
const
char
*
pCode
);
void
SetCode
(
SCreateFuncReq
*
pReq
,
const
char
*
pCode
,
int32_t
size
);
void
SetComment
(
SCreateFuncReq
*
pReq
,
const
char
*
pComment
);
};
Testbase
MndTestFunc
::
test
;
void
MndTestFunc
::
SetCode
(
SCreateFuncReq
*
pReq
,
const
char
*
pCode
)
{
int32_t
len
=
strlen
(
pCode
);
pReq
->
pCode
=
(
char
*
)
taosMemoryCalloc
(
1
,
len
+
1
);
strcpy
(
pReq
->
pCode
,
pCode
);
pReq
->
codeLen
=
len
;
void
MndTestFunc
::
SetCode
(
SCreateFuncReq
*
pReq
,
const
char
*
pCode
,
int32_t
size
)
{
pReq
->
pCode
=
(
char
*
)
taosMemoryMalloc
(
size
);
memcpy
(
pReq
->
pCode
,
pCode
,
size
);
pReq
->
codeLen
=
size
;
}
void
MndTestFunc
::
SetComment
(
SCreateFuncReq
*
pReq
,
const
char
*
pComment
)
{
...
...
@@ -79,7 +78,7 @@ TEST_F(MndTestFunc, 02_Create_Func) {
{
SCreateFuncReq
createReq
=
{
0
};
strcpy
(
createReq
.
name
,
"f1"
);
SetCode
(
&
createReq
,
""
);
SetCode
(
&
createReq
,
""
,
1
);
SetComment
(
&
createReq
,
"comment1"
);
int32_t
contLen
=
tSerializeSCreateFuncReq
(
NULL
,
0
,
&
createReq
);
...
...
@@ -95,7 +94,7 @@ TEST_F(MndTestFunc, 02_Create_Func) {
{
SCreateFuncReq
createReq
=
{
0
};
strcpy
(
createReq
.
name
,
"f1"
);
SetCode
(
&
createReq
,
"code1"
);
SetCode
(
&
createReq
,
"code1"
,
6
);
SetComment
(
&
createReq
,
"comment1"
);
int32_t
contLen
=
tSerializeSCreateFuncReq
(
NULL
,
0
,
&
createReq
);
...
...
@@ -111,7 +110,7 @@ TEST_F(MndTestFunc, 02_Create_Func) {
{
SCreateFuncReq
createReq
=
{
0
};
strcpy
(
createReq
.
name
,
"f1"
);
SetCode
(
&
createReq
,
"code1"
);
SetCode
(
&
createReq
,
"code1"
,
6
);
SetComment
(
&
createReq
,
"comment1"
);
createReq
.
bufSize
=
TSDB_FUNC_BUF_SIZE
+
1
;
...
...
@@ -128,7 +127,7 @@ TEST_F(MndTestFunc, 02_Create_Func) {
for
(
int32_t
i
=
0
;
i
<
3
;
++
i
)
{
SCreateFuncReq
createReq
=
{
0
};
strcpy
(
createReq
.
name
,
"f1"
);
SetCode
(
&
createReq
,
"code1"
);
SetCode
(
&
createReq
,
"code1"
,
6
);
SetComment
(
&
createReq
,
"comment1"
);
createReq
.
bufSize
=
TSDB_FUNC_BUF_SIZE
+
1
;
createReq
.
igExists
=
0
;
...
...
@@ -253,7 +252,7 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) {
createReq
.
outputLen
=
24
;
createReq
.
bufSize
=
6
;
createReq
.
signature
=
18
;
SetCode
(
&
createReq
,
"code2"
);
SetCode
(
&
createReq
,
"code2"
,
6
);
SetComment
(
&
createReq
,
"comment2"
);
int32_t
contLen
=
tSerializeSCreateFuncReq
(
NULL
,
0
,
&
createReq
);
...
...
@@ -439,3 +438,70 @@ TEST_F(MndTestFunc, 04_Drop_Func) {
test
.
SendShowReq
(
TSDB_MGMT_TABLE_FUNC
,
"user_functions"
,
""
);
EXPECT_EQ
(
test
.
GetShowRows
(),
1
);
}
TEST_F
(
MndTestFunc
,
05
_Actual_code
)
{
{
SCreateFuncReq
createReq
=
{
0
};
strcpy
(
createReq
.
name
,
"udf1"
);
char
code
[
300
]
=
{
0
};
for
(
int32_t
i
=
0
;
i
<
sizeof
(
code
);
++
i
)
{
code
[
i
]
=
(
i
)
%
20
;
}
SetCode
(
&
createReq
,
code
,
300
);
SetComment
(
&
createReq
,
"comment1"
);
createReq
.
bufSize
=
8
;
createReq
.
igExists
=
0
;
createReq
.
funcType
=
1
;
createReq
.
scriptType
=
2
;
createReq
.
outputType
=
TSDB_DATA_TYPE_SMALLINT
;
createReq
.
outputLen
=
12
;
createReq
.
bufSize
=
4
;
createReq
.
signature
=
5
;
int32_t
contLen
=
tSerializeSCreateFuncReq
(
NULL
,
0
,
&
createReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSCreateFuncReq
(
pReq
,
contLen
,
&
createReq
);
tFreeSCreateFuncReq
(
&
createReq
);
SRpcMsg
*
pRsp
=
test
.
SendReq
(
TDMT_MND_CREATE_FUNC
,
pReq
,
contLen
);
ASSERT_NE
(
pRsp
,
nullptr
);
ASSERT_EQ
(
pRsp
->
code
,
0
);
}
{
SRetrieveFuncReq
retrieveReq
=
{
0
};
retrieveReq
.
numOfFuncs
=
1
;
retrieveReq
.
pFuncNames
=
taosArrayInit
(
1
,
TSDB_FUNC_NAME_LEN
);
taosArrayPush
(
retrieveReq
.
pFuncNames
,
"udf1"
);
int32_t
contLen
=
tSerializeSRetrieveFuncReq
(
NULL
,
0
,
&
retrieveReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSRetrieveFuncReq
(
pReq
,
contLen
,
&
retrieveReq
);
tFreeSRetrieveFuncReq
(
&
retrieveReq
);
SRpcMsg
*
pRsp
=
test
.
SendReq
(
TDMT_MND_RETRIEVE_FUNC
,
pReq
,
contLen
);
ASSERT_NE
(
pRsp
,
nullptr
);
ASSERT_EQ
(
pRsp
->
code
,
0
);
SRetrieveFuncRsp
retrieveRsp
=
{
0
};
tDeserializeSRetrieveFuncRsp
(
pRsp
->
pCont
,
pRsp
->
contLen
,
&
retrieveRsp
);
EXPECT_EQ
(
retrieveRsp
.
numOfFuncs
,
1
);
EXPECT_EQ
(
retrieveRsp
.
numOfFuncs
,
(
int32_t
)
taosArrayGetSize
(
retrieveRsp
.
pFuncInfos
));
SFuncInfo
*
pFuncInfo
=
(
SFuncInfo
*
)
taosArrayGet
(
retrieveRsp
.
pFuncInfos
,
0
);
EXPECT_STREQ
(
pFuncInfo
->
name
,
"udf1"
);
EXPECT_EQ
(
pFuncInfo
->
funcType
,
1
);
EXPECT_EQ
(
pFuncInfo
->
scriptType
,
2
);
EXPECT_EQ
(
pFuncInfo
->
outputType
,
TSDB_DATA_TYPE_SMALLINT
);
EXPECT_EQ
(
pFuncInfo
->
outputLen
,
12
);
EXPECT_EQ
(
pFuncInfo
->
bufSize
,
4
);
EXPECT_EQ
(
pFuncInfo
->
signature
,
5
);
EXPECT_STREQ
(
"comment1"
,
pFuncInfo
->
pComment
);
for
(
int32_t
i
=
0
;
i
<
300
;
++
i
)
{
EXPECT_EQ
(
pFuncInfo
->
pCode
[
i
],
(
i
)
%
20
);
}
tFreeSRetrieveFuncRsp
(
&
retrieveRsp
);
}
}
\ No newline at end of file
source/libs/function/inc/tudf.h
浏览文件 @
cc914db3
...
...
@@ -29,29 +29,32 @@ extern "C" {
#define UDF_LISTEN_PIPE_NAME_LEN 32
#define UDF_LISTEN_PIPE_NAME_PREFIX "udfd.sock."
#define UDF_DNODE_ID_ENV_NAME "DNODE_ID"
//======================================================================================
//begin API to taosd and qworker
enum
{
UDFC_CODE_STOPPING
=
-
1
,
UDFC_CODE_PIPE_READ_ERR
=
-
3
,
UDFC_CODE_PIPE_READ_ERR
=
-
2
,
UDFC_CODE_CONNECT_PIPE_ERR
=
-
3
,
UDFC_CODE_LOAD_UDF_FAILURE
=
-
4
,
UDFC_CODE_INVALID_STATE
=
-
5
};
typedef
void
*
UdfcHandle
;
typedef
void
*
UdfcFuncHandle
;
/**
* create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf
* @return error code
*/
int32_t
udfcOpen
(
int32_t
dnodeId
,
UdfcHandle
*
proxyHandle
);
int32_t
udfcOpen
();
/**
* destroy udfd proxy
* @return error code
*/
int32_t
udfcClose
(
UdfcHandle
proxyhandle
);
int32_t
udfcClose
();
/**
...
...
@@ -60,7 +63,7 @@ int32_t udfcClose(UdfcHandle proxyhandle);
* @param handle, out
* @return error code
*/
int32_t
setupUdf
(
UdfcHandle
proxyHandle
,
char
udfName
[],
SEpSet
*
epSet
,
UdfcFuncHandle
*
handle
);
int32_t
setupUdf
(
char
udfName
[]
,
UdfcFuncHandle
*
handle
);
typedef
struct
SUdfColumnMeta
{
int16_t
type
;
...
...
source/libs/function/inc/tudfInt.h
浏览文件 @
cc914db3
...
...
@@ -39,7 +39,6 @@ enum {
typedef
struct
SUdfSetupRequest
{
char
udfName
[
TSDB_FUNC_NAME_LEN
];
SEpSet
epSet
;
}
SUdfSetupRequest
;
typedef
struct
SUdfSetupResponse
{
...
...
@@ -112,6 +111,7 @@ void freeUdfDataDataBlock(SUdfDataBlock *block);
int32_t
convertDataBlockToUdfDataBlock
(
SSDataBlock
*
block
,
SUdfDataBlock
*
udfBlock
);
int32_t
convertUdfColumnToDataBlock
(
SUdfColumn
*
udfCol
,
SSDataBlock
*
block
);
int32_t
getUdfdPipeName
(
char
*
pipeName
,
int32_t
size
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/function/src/tudf.c
浏览文件 @
cc914db3
...
...
@@ -124,7 +124,7 @@ enum {
int64_t
gUdfTaskSeqNum
=
0
;
typedef
struct
SUdfdProxy
{
int32_t
dnodeId
;
char
udfdPipeName
[
UDF_LISTEN_PIPE_NAME_LEN
]
;
uv_barrier_t
gUdfInitBarrier
;
uv_loop_t
gUdfdLoop
;
...
...
@@ -137,11 +137,11 @@ typedef struct SUdfdProxy {
int8_t
gUdfcState
;
QUEUE
gUdfTaskQueue
;
QUEUE
gUvProcTaskQueue
;
// int8_t gUdfcState = UDFC_STATE_INITAL;
// QUEUE gUdfTaskQueue = {0};
// QUEUE gUvProcTaskQueue = {0};
int8_t
initialized
;
}
SUdfdProxy
;
SUdfdProxy
gUdfdProxy
=
{
0
};
typedef
struct
SUdfUvSession
{
SUdfdProxy
*
udfc
;
...
...
@@ -209,19 +209,27 @@ enum {
UDFC_STATE_STARTNG
,
// starting after udfcOpen
UDFC_STATE_READY
,
// started and begin to receive quests
UDFC_STATE_STOPPING
,
// stopping after udfcClose
UDFC_STATUS_FINAL
,
// stopped
};
int32_t
getUdfdPipeName
(
char
*
pipeName
,
int32_t
size
)
{
char
dnodeId
[
8
]
=
{
0
};
size_t
dnodeIdSize
;
int32_t
err
=
uv_os_getenv
(
UDF_DNODE_ID_ENV_NAME
,
dnodeId
,
&
dnodeIdSize
);
if
(
err
!=
0
)
{
dnodeId
[
0
]
=
'1'
;
}
snprintf
(
pipeName
,
size
,
"%s%s"
,
UDF_LISTEN_PIPE_NAME_PREFIX
,
dnodeId
);
return
0
;
}
int32_t
encodeUdfSetupRequest
(
void
**
buf
,
const
SUdfSetupRequest
*
setup
)
{
int32_t
len
=
0
;
len
+=
taosEncodeBinary
(
buf
,
setup
->
udfName
,
TSDB_FUNC_NAME_LEN
);
len
+=
taosEncodeSEpSet
(
buf
,
&
setup
->
epSet
);
return
len
;
}
void
*
decodeUdfSetupRequest
(
const
void
*
buf
,
SUdfSetupRequest
*
request
)
{
buf
=
taosDecodeBinaryTo
(
buf
,
request
->
udfName
,
TSDB_FUNC_NAME_LEN
);
buf
=
taosDecodeSEpSet
((
void
*
)
buf
,
&
request
->
epSet
);
return
(
void
*
)
buf
;
}
...
...
@@ -604,7 +612,7 @@ void onUdfcPipeClose(uv_handle_t *handle) {
}
int32_t
udfcGetUvTaskResponseResult
(
SClientUdfTask
*
task
,
SClientUvTaskNode
*
uvTask
)
{
debugPrint
(
"%s"
,
"get uv task result"
);
fnDebug
(
"udfc get uv task result. task: %p"
,
task
);
if
(
uvTask
->
type
==
UV_TASK_REQ_RSP
)
{
if
(
uvTask
->
rspBuf
.
base
!=
NULL
)
{
SUdfResponse
rsp
;
...
...
@@ -647,7 +655,6 @@ int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvT
}
void
udfcAllocateBuffer
(
uv_handle_t
*
handle
,
size_t
suggestedSize
,
uv_buf_t
*
buf
)
{
debugPrint
(
"%s"
,
"client allocate buffer to receive from pipe"
);
SClientUvConn
*
conn
=
handle
->
data
;
SClientConnBuf
*
connBuf
=
&
conn
->
readBuf
;
...
...
@@ -662,7 +669,7 @@ void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf
buf
->
base
=
connBuf
->
buf
;
buf
->
len
=
connBuf
->
cap
;
}
else
{
//TODO: log error
fnError
(
"udfc allocate buffer failure. size: %d"
,
msgHeadSize
);
buf
->
base
=
NULL
;
buf
->
len
=
0
;
}
...
...
@@ -674,13 +681,13 @@ void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf
buf
->
base
=
connBuf
->
buf
+
connBuf
->
len
;
buf
->
len
=
connBuf
->
cap
-
connBuf
->
len
;
}
else
{
//TODO: log error free connBuf->buf
fnError
(
"udfc re-allocate buffer failure. size: %d"
,
connBuf
->
cap
);
buf
->
base
=
NULL
;
buf
->
len
=
0
;
}
}
debugPrint
(
"
\t
conn buf cap - len - total : %d - %d - %d"
,
connBuf
->
cap
,
connBuf
->
len
,
connBuf
->
total
);
fnTrace
(
"
conn buf cap - len - total : %d - %d - %d"
,
connBuf
->
cap
,
connBuf
->
len
,
connBuf
->
total
);
}
...
...
@@ -689,6 +696,7 @@ bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
connBuf
->
total
=
*
(
int32_t
*
)
(
connBuf
->
buf
);
}
if
(
connBuf
->
len
==
connBuf
->
cap
&&
connBuf
->
total
==
connBuf
->
cap
)
{
fnTrace
(
"udfc complete message is received, now handle it"
);
return
true
;
}
return
false
;
...
...
@@ -696,10 +704,10 @@ bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
void
udfcUvHandleRsp
(
SClientUvConn
*
conn
)
{
SClientConnBuf
*
connBuf
=
&
conn
->
readBuf
;
int64_t
seqNum
=
*
(
int64_t
*
)
(
connBuf
->
buf
+
sizeof
(
int32_t
));
// msglen
int32_t
then seqnum
int64_t
seqNum
=
*
(
int64_t
*
)
(
connBuf
->
buf
+
sizeof
(
int32_t
));
// msglen then seqnum
if
(
QUEUE_EMPTY
(
&
conn
->
taskQueue
))
{
//LOG error
fnError
(
"udfc no task waiting for response on connection"
);
return
;
}
bool
found
=
false
;
...
...
@@ -713,7 +721,7 @@ void udfcUvHandleRsp(SClientUvConn *conn) {
found
=
true
;
taskFound
=
task
;
}
else
{
//LOG error
;
fnError
(
"udfc more than one task waiting for the same response"
)
;
continue
;
}
}
...
...
@@ -727,7 +735,7 @@ void udfcUvHandleRsp(SClientUvConn *conn) {
uv_sem_post
(
&
taskFound
->
taskSem
);
QUEUE_REMOVE
(
&
taskFound
->
procTaskQueue
);
}
else
{
//TODO: LOG error
fnError
(
"no task is waiting for the response."
);
}
connBuf
->
buf
=
NULL
;
connBuf
->
total
=
-
1
;
...
...
@@ -751,7 +759,7 @@ void udfcUvHandleError(SClientUvConn *conn) {
}
void
onUdfcRead
(
uv_stream_t
*
client
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
debugPrint
(
"%s, nread: %zd"
,
"client read from pipe"
,
nread
);
fnTrace
(
"udfc client %p, client read from pipe. nread: %zd"
,
client
,
nread
);
if
(
nread
==
0
)
return
;
SClientUvConn
*
conn
=
client
->
data
;
...
...
@@ -764,9 +772,9 @@ void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
}
if
(
nread
<
0
)
{
debugPrint
(
"
\t
client read error: %s"
,
uv_strerror
(
nread
));
fnError
(
"udfc client pipe %p read error: %s"
,
client
,
uv_strerror
(
nread
));
if
(
nread
==
UV_EOF
)
{
//TODO:
fnError
(
"udfc client pipe %p closed"
,
client
);
}
udfcUvHandleError
(
conn
);
}
...
...
@@ -774,16 +782,15 @@ void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
}
void
onUdfClientWrite
(
uv_write_t
*
write
,
int
status
)
{
debugPrint
(
"%s"
,
"after writing to pipe"
);
SClientUvTaskNode
*
uvTask
=
write
->
data
;
uv_pipe_t
*
pipe
=
uvTask
->
pipe
;
if
(
status
==
0
)
{
uv_pipe_t
*
pipe
=
uvTask
->
pipe
;
SClientUvConn
*
conn
=
pipe
->
data
;
QUEUE_INSERT_TAIL
(
&
conn
->
taskQueue
,
&
uvTask
->
connTaskQueue
);
}
else
{
//TODO Log error
;
fnError
(
"udfc client %p write error."
,
pipe
)
;
}
debugPrint
(
"
\t
length:%zu"
,
uvTask
->
reqBuf
.
len
);
fnTrace
(
"udfc client %p write length:%zu"
,
pipe
,
uvTask
->
reqBuf
.
len
);
taosMemoryFree
(
write
);
taosMemoryFree
(
uvTask
->
reqBuf
.
base
);
}
...
...
@@ -841,7 +848,7 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN
}
int32_t
queueUvUdfTask
(
SClientUvTaskNode
*
uvTask
)
{
debugPrint
(
"%s, %d"
,
"queue uv task"
,
uvTask
->
type
);
fnTrace
(
"queue uv task to event loop, task: %d, %p"
,
uvTask
->
type
,
uvTask
);
SUdfdProxy
*
udfc
=
uvTask
->
udfc
;
uv_mutex_lock
(
&
udfc
->
gUdfTaskQueueMutex
);
QUEUE_INSERT_TAIL
(
&
udfc
->
gUdfTaskQueue
,
&
uvTask
->
recvTaskQueue
);
...
...
@@ -855,7 +862,7 @@ int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) {
}
int32_t
startUvUdfTask
(
SClientUvTaskNode
*
uvTask
)
{
debugPrint
(
"%s, type %d"
,
"start uv task "
,
uvTask
->
type
);
fnTrace
(
"event loop start uv task. task: %d, %p"
,
uvTask
->
type
,
uvTask
);
switch
(
uvTask
->
type
)
{
case
UV_TASK_CONNECT
:
{
uv_pipe_t
*
pipe
=
taosMemoryMalloc
(
sizeof
(
uv_pipe_t
));
...
...
@@ -874,8 +881,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
uv_connect_t
*
connReq
=
taosMemoryMalloc
(
sizeof
(
uv_connect_t
));
connReq
->
data
=
uvTask
;
uv_pipe_connect
(
connReq
,
pipe
,
"udf.sock"
,
onUdfClientConnect
);
uv_pipe_connect
(
connReq
,
pipe
,
uvTask
->
udfc
->
udfdPipeName
,
onUdfClientConnect
);
break
;
}
case
UV_TASK_REQ_RSP
:
{
...
...
@@ -971,27 +977,37 @@ void constructUdfService(void *argsThread) {
uv_loop_close
(
&
udfc
->
gUdfdLoop
);
}
int32_t
udfcOpen
(
int32_t
dnodeId
,
UdfcHandle
*
udfc
)
{
SUdfdProxy
*
proxy
=
taosMemoryCalloc
(
1
,
sizeof
(
SUdfdProxy
));
proxy
->
dnodeId
=
dnodeId
;
int32_t
udfcOpen
()
{
int8_t
old
=
atomic_val_compare_exchange_8
(
&
gUdfdProxy
.
initialized
,
0
,
1
);
if
(
old
==
1
)
{
return
0
;
}
SUdfdProxy
*
proxy
=
&
gUdfdProxy
;
getUdfdPipeName
(
proxy
->
udfdPipeName
,
UDF_LISTEN_PIPE_NAME_LEN
);
proxy
->
gUdfcState
=
UDFC_STATE_STARTNG
;
uv_barrier_init
(
&
proxy
->
gUdfInitBarrier
,
2
);
uv_thread_create
(
&
proxy
->
gUdfLoopThread
,
constructUdfService
,
proxy
);
uv_barrier_wait
(
&
proxy
->
gUdfInitBarrier
);
atomic_store_8
(
&
proxy
->
gUdfcState
,
UDFC_STATE_READY
);
proxy
->
gUdfcState
=
UDFC_STATE_READY
;
*
udfc
=
proxy
;
uv_barrier_wait
(
&
proxy
->
gUdfInitBarrier
);
fnInfo
(
"udfc initialized"
)
return
0
;
}
int32_t
udfcClose
(
UdfcHandle
udfcHandle
)
{
SUdfdProxy
*
udfc
=
udfcHandle
;
int32_t
udfcClose
()
{
int8_t
old
=
atomic_val_compare_exchange_8
(
&
gUdfdProxy
.
initialized
,
1
,
0
);
if
(
old
==
0
)
{
return
0
;
}
SUdfdProxy
*
udfc
=
&
gUdfdProxy
;
udfc
->
gUdfcState
=
UDFC_STATE_STOPPING
;
uv_async_send
(
&
udfc
->
gUdfLoopStopAsync
);
uv_thread_join
(
&
udfc
->
gUdfLoopThread
);
uv_mutex_destroy
(
&
udfc
->
gUdfTaskQueueMutex
);
uv_barrier_destroy
(
&
udfc
->
gUdfInitBarrier
);
udfc
->
gUdfcState
=
UDFC_STAT
US_FIN
AL
;
taosMemoryFree
(
udfc
);
udfc
->
gUdfcState
=
UDFC_STAT
E_INIT
AL
;
fnInfo
(
"udfc cleaned up"
);
return
0
;
}
...
...
@@ -1009,12 +1025,15 @@ int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) {
return
task
->
errCode
;
}
int32_t
setupUdf
(
UdfcHandle
udfc
,
char
udfName
[],
SEpSet
*
epSet
,
UdfcFuncHandle
*
funcHandle
)
{
debugPrint
(
"%s"
,
"client setup udf"
);
int32_t
setupUdf
(
char
udfName
[],
UdfcFuncHandle
*
funcHandle
)
{
fnInfo
(
"udfc setup udf. udfName: %s"
,
udfName
);
if
(
gUdfdProxy
.
gUdfcState
!=
UDFC_STATE_READY
)
{
return
UDFC_CODE_INVALID_STATE
;
}
SClientUdfTask
*
task
=
taosMemoryMalloc
(
sizeof
(
SClientUdfTask
));
task
->
errCode
=
0
;
task
->
session
=
taosMemoryMalloc
(
sizeof
(
SUdfUvSession
));
task
->
session
->
udfc
=
udfc
;
task
->
session
->
udfc
=
&
gUdfdProxy
;
task
->
type
=
UDF_TASK_SETUP
;
SUdfSetupRequest
*
req
=
&
task
->
_setup
.
req
;
...
...
@@ -1022,15 +1041,20 @@ int32_t setupUdf(UdfcHandle udfc, char udfName[], SEpSet *epSet, UdfcFuncHandle
int32_t
errCode
=
udfcRunUvTask
(
task
,
UV_TASK_CONNECT
);
if
(
errCode
!=
0
)
{
//TODO: log error
return
-
1
;
fnError
(
"failed to connect to pipe. udfName: %s, pipe: %s"
,
udfName
,
(
&
gUdfdProxy
)
->
udfdPipeName
);
return
UDFC_CODE_CONNECT_PIPE_ERR
;
}
udfcRunUvTask
(
task
,
UV_TASK_REQ_RSP
);
SUdfSetupResponse
*
rsp
=
&
task
->
_setup
.
rsp
;
task
->
session
->
severHandle
=
rsp
->
udfHandle
;
*
funcHandle
=
task
->
session
;
if
(
task
->
errCode
!=
0
)
{
fnError
(
"failed to setup udf. err: %d"
,
task
->
errCode
)
}
else
{
fnInfo
(
"sucessfully setup udf func handle. handle: %p"
,
task
->
session
);
*
funcHandle
=
task
->
session
;
}
int32_t
err
=
task
->
errCode
;
taosMemoryFree
(
task
);
return
err
;
...
...
@@ -1038,7 +1062,7 @@ int32_t setupUdf(UdfcHandle udfc, char udfName[], SEpSet *epSet, UdfcFuncHandle
int32_t
callUdf
(
UdfcFuncHandle
handle
,
int8_t
callType
,
SSDataBlock
*
input
,
SUdfInterBuf
*
state
,
SUdfInterBuf
*
state2
,
SSDataBlock
*
output
,
SUdfInterBuf
*
newState
)
{
debugPrint
(
"%s"
,
"client call udf"
);
fnTrace
(
"udfc call udf. callType: %d, funcHandle: %p"
,
callType
,
handle
);
SClientUdfTask
*
task
=
taosMemoryMalloc
(
sizeof
(
SClientUdfTask
));
task
->
errCode
=
0
;
...
...
@@ -1076,35 +1100,37 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf
udfcRunUvTask
(
task
,
UV_TASK_REQ_RSP
);
SUdfCallResponse
*
rsp
=
&
task
->
_call
.
rsp
;
switch
(
callType
)
{
case
TSDB_UDF_CALL_AGG_INIT
:
{
*
newState
=
rsp
->
resultBuf
;
break
;
}
case
TSDB_UDF_CALL_AGG_PROC
:
{
*
newState
=
rsp
->
resultBuf
;
break
;
}
case
TSDB_UDF_CALL_AGG_MERGE
:
{
*
newState
=
rsp
->
resultBuf
;
break
;
}
case
TSDB_UDF_CALL_AGG_FIN
:
{
*
newState
=
rsp
->
resultBuf
;
break
;
}
case
TSDB_UDF_CALL_SCALA_PROC
:
{
*
output
=
rsp
->
resultData
;
break
;
if
(
task
->
errCode
!=
0
)
{
fnError
(
"call udf failure. err: %d"
,
task
->
errCode
);
}
else
{
SUdfCallResponse
*
rsp
=
&
task
->
_call
.
rsp
;
switch
(
callType
)
{
case
TSDB_UDF_CALL_AGG_INIT
:
{
*
newState
=
rsp
->
resultBuf
;
break
;
}
case
TSDB_UDF_CALL_AGG_PROC
:
{
*
newState
=
rsp
->
resultBuf
;
break
;
}
case
TSDB_UDF_CALL_AGG_MERGE
:
{
*
newState
=
rsp
->
resultBuf
;
break
;
}
case
TSDB_UDF_CALL_AGG_FIN
:
{
*
newState
=
rsp
->
resultBuf
;
break
;
}
case
TSDB_UDF_CALL_SCALA_PROC
:
{
*
output
=
rsp
->
resultData
;
break
;
}
}
}
taosMemoryFree
(
task
);
return
task
->
errCode
;
}
//TODO: translate these calls to callUdf
int32_t
callUdfAggInit
(
UdfcFuncHandle
handle
,
SUdfInterBuf
*
interBuf
)
{
int8_t
callType
=
TSDB_UDF_CALL_AGG_INIT
;
...
...
@@ -1148,7 +1174,7 @@ int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t nu
}
int32_t
teardownUdf
(
UdfcFuncHandle
handle
)
{
debugPrint
(
"%s"
,
"client teardown udf"
);
fnInfo
(
"tear down udf. udf func handle: %p"
,
handle
);
SClientUdfTask
*
task
=
taosMemoryMalloc
(
sizeof
(
SClientUdfTask
));
task
->
errCode
=
0
;
...
...
@@ -1160,7 +1186,6 @@ int32_t teardownUdf(UdfcFuncHandle handle) {
udfcRunUvTask
(
task
,
UV_TASK_REQ_RSP
);
SUdfTeardownResponse
*
rsp
=
&
task
->
_teardown
.
rsp
;
int32_t
err
=
task
->
errCode
;
...
...
source/libs/function/src/udfd.c
浏览文件 @
cc914db3
...
...
@@ -20,6 +20,7 @@
#include "tudf.h"
#include "tudfInt.h"
#include "tdatablock.h"
#include "tdataformat.h"
#include "tglobal.h"
#include "tmsg.h"
...
...
@@ -31,8 +32,9 @@ typedef struct SUdfdContext {
uv_signal_t
intrSignal
;
char
listenPipeName
[
UDF_LISTEN_PIPE_NAME_LEN
];
uv_pipe_t
listeningPipe
;
void
*
clientRpc
;
void
*
clientRpc
;
SCorEpSet
mgmtEp
;
uv_mutex_t
udfsMutex
;
SHashObj
*
udfsHash
;
...
...
@@ -63,8 +65,13 @@ typedef struct SUdf {
uv_mutex_t
lock
;
uv_cond_t
condReady
;
char
name
[
16
];
int8_t
type
;
char
name
[
TSDB_FUNC_NAME_LEN
];
int8_t
funcType
;
int8_t
scriptType
;
int8_t
outputType
;
int32_t
outputLen
;
int32_t
bufSize
;
char
path
[
PATH_MAX
];
uv_lib_t
lib
;
...
...
@@ -78,17 +85,17 @@ typedef struct SUdfcFuncHandle {
SUdf
*
udf
;
}
SUdfcFuncHandle
;
int32_t
udfdFillUdfInfoFromMNode
(
void
*
clientRpc
,
SEpSet
*
pEpSet
,
char
*
udfName
,
SUdf
*
udf
);
int32_t
udfdFillUdfInfoFromMNode
(
void
*
clientRpc
,
char
*
udfName
,
SUdf
*
udf
);
int32_t
udfdLoadUdf
(
char
*
udfName
,
S
EpSet
*
pEpSet
,
S
Udf
*
udf
)
{
int32_t
udfdLoadUdf
(
char
*
udfName
,
SUdf
*
udf
)
{
strcpy
(
udf
->
name
,
udfName
);
udfdFillUdfInfoFromMNode
(
global
.
clientRpc
,
pEpSet
,
udf
->
name
,
udf
);
udfdFillUdfInfoFromMNode
(
global
.
clientRpc
,
udf
->
name
,
udf
);
//strcpy(udf->path, "/home/slzhou/TDengine/debug/build/lib/libudf1.so");
int
err
=
uv_dlopen
(
udf
->
path
,
&
udf
->
lib
);
if
(
err
!=
0
)
{
fnError
(
"can not load library %s. error: %s"
,
udf
->
path
,
uv_strerror
(
err
));
// TODO set error
return
UDFC_CODE_LOAD_UDF_FAILURE
;
}
// TODO: find all the functions
char
normalFuncName
[
TSDB_FUNC_NAME_LEN
]
=
{
0
};
...
...
@@ -115,8 +122,8 @@ void udfdProcessRequest(uv_work_t *req) {
SUdf
*
udf
=
NULL
;
uv_mutex_lock
(
&
global
.
udfsMutex
);
SUdf
**
udfInHash
=
taosHashGet
(
global
.
udfsHash
,
request
.
setup
.
udfName
,
TSDB_FUNC_NAME_LEN
);
if
(
*
udfInHash
)
{
SUdf
**
udfInHash
=
taosHashGet
(
global
.
udfsHash
,
request
.
setup
.
udfName
,
strlen
(
request
.
setup
.
udfName
)
);
if
(
udfInHash
)
{
++
(
*
udfInHash
)
->
refCount
;
udf
=
*
udfInHash
;
uv_mutex_unlock
(
&
global
.
udfsMutex
);
...
...
@@ -128,14 +135,14 @@ void udfdProcessRequest(uv_work_t *req) {
uv_mutex_init
(
&
udfNew
->
lock
);
uv_cond_init
(
&
udfNew
->
condReady
);
udf
=
udfNew
;
taosHashPut
(
global
.
udfsHash
,
request
.
setup
.
udfName
,
TSDB_FUNC_NAME_LEN
,
&
udfNew
,
sizeof
(
&
udfNew
));
taosHashPut
(
global
.
udfsHash
,
request
.
setup
.
udfName
,
strlen
(
request
.
setup
.
udfName
)
,
&
udfNew
,
sizeof
(
&
udfNew
));
uv_mutex_unlock
(
&
global
.
udfsMutex
);
}
uv_mutex_lock
(
&
udf
->
lock
);
if
(
udf
->
state
==
UDF_STATE_INIT
)
{
udf
->
state
=
UDF_STATE_LOADING
;
udfdLoadUdf
(
setup
->
udfName
,
&
setup
->
epSet
,
udf
);
udfdLoadUdf
(
setup
->
udfName
,
udf
);
udf
->
state
=
UDF_STATE_READY
;
uv_cond_broadcast
(
&
udf
->
condReady
);
uv_mutex_unlock
(
&
udf
->
lock
);
...
...
@@ -214,7 +221,7 @@ void udfdProcessRequest(uv_work_t *req) {
udf
->
refCount
--
;
if
(
udf
->
refCount
==
0
)
{
unloadUdf
=
true
;
taosHashRemove
(
global
.
udfsHash
,
udf
->
name
,
TSDB_FUNC_NAME_LEN
);
taosHashRemove
(
global
.
udfsHash
,
udf
->
name
,
strlen
(
udf
->
name
)
);
}
uv_mutex_unlock
(
&
global
.
udfsMutex
);
if
(
unloadUdf
)
{
...
...
@@ -393,7 +400,48 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
void
udfdProcessRpcRsp
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
return
;
}
int32_t
udfdFillUdfInfoFromMNode
(
void
*
clientRpc
,
SEpSet
*
pEpSet
,
char
*
udfName
,
SUdf
*
udf
)
{
int
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
)
{
pEpSet
->
version
=
0
;
// init mnode ip set
SEpSet
*
mgmtEpSet
=
&
(
pEpSet
->
epSet
);
mgmtEpSet
->
numOfEps
=
0
;
mgmtEpSet
->
inUse
=
0
;
if
(
firstEp
&&
firstEp
[
0
]
!=
0
)
{
if
(
strlen
(
firstEp
)
>=
TSDB_EP_LEN
)
{
terrno
=
TSDB_CODE_TSC_INVALID_FQDN
;
return
-
1
;
}
int32_t
code
=
taosGetFqdnPortFromEp
(
firstEp
,
&
mgmtEpSet
->
eps
[
0
]);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
TSDB_CODE_TSC_INVALID_FQDN
;
return
terrno
;
}
mgmtEpSet
->
numOfEps
++
;
}
if
(
secondEp
&&
secondEp
[
0
]
!=
0
)
{
if
(
strlen
(
secondEp
)
>=
TSDB_EP_LEN
)
{
terrno
=
TSDB_CODE_TSC_INVALID_FQDN
;
return
-
1
;
}
taosGetFqdnPortFromEp
(
secondEp
,
&
mgmtEpSet
->
eps
[
mgmtEpSet
->
numOfEps
]);
mgmtEpSet
->
numOfEps
++
;
}
if
(
mgmtEpSet
->
numOfEps
==
0
)
{
terrno
=
TSDB_CODE_TSC_INVALID_FQDN
;
return
-
1
;
}
return
0
;
}
int32_t
udfdFillUdfInfoFromMNode
(
void
*
clientRpc
,
char
*
udfName
,
SUdf
*
udf
)
{
SRetrieveFuncReq
retrieveReq
=
{
0
};
retrieveReq
.
numOfFuncs
=
1
;
retrieveReq
.
pFuncNames
=
taosArrayInit
(
1
,
TSDB_FUNC_NAME_LEN
);
...
...
@@ -410,15 +458,21 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char *udfName,
rpcMsg
.
msgType
=
TDMT_MND_RETRIEVE_FUNC
;
SRpcMsg
rpcRsp
=
{
0
};
rpcSendRecv
(
clientRpc
,
pE
pSet
,
&
rpcMsg
,
&
rpcRsp
);
rpcSendRecv
(
clientRpc
,
&
global
.
mgmtEp
.
e
pSet
,
&
rpcMsg
,
&
rpcRsp
);
SRetrieveFuncRsp
retrieveRsp
=
{
0
};
tDeserializeSRetrieveFuncRsp
(
rpcRsp
.
pCont
,
rpcRsp
.
contLen
,
&
retrieveRsp
);
SFuncInfo
*
pFuncInfo
=
(
SFuncInfo
*
)
taosArrayGet
(
retrieveRsp
.
pFuncInfos
,
0
);
udf
->
funcType
=
pFuncInfo
->
funcType
;
udf
->
scriptType
=
pFuncInfo
->
scriptType
;
udf
->
outputType
=
pFuncInfo
->
funcType
;
udf
->
outputLen
=
pFuncInfo
->
outputLen
;
udf
->
bufSize
=
pFuncInfo
->
bufSize
;
char
path
[
PATH_MAX
]
=
{
0
};
taosGetTmpfilePath
(
"/tmp"
,
"libudf"
,
path
);
TdFilePtr
file
=
taosOpenFile
(
path
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_READ
|
TD_FILE_TRUNC
);
snprintf
(
path
,
sizeof
(
path
),
"%s/lib%s.so"
,
"/tmp"
,
udfName
);
TdFilePtr
file
=
taosOpenFile
(
path
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_READ
|
TD_FILE_TRUNC
|
TD_FILE_AUTO_DEL
);
// TODO check for failure of flush to disk
taosWriteFile
(
file
,
pFuncInfo
->
pCode
,
pFuncInfo
->
codeSize
);
taosCloseFile
(
&
file
);
...
...
@@ -531,15 +585,7 @@ static int32_t udfdUvInit() {
uv_pipe_open
(
&
global
.
ctrlPipe
,
0
);
uv_read_start
((
uv_stream_t
*
)
&
global
.
ctrlPipe
,
udfdCtrlAllocBufCb
,
udfdCtrlReadCb
);
char
dnodeId
[
8
]
=
{
0
};
size_t
dnodeIdSize
;
int32_t
err
=
uv_os_getenv
(
"DNODE_ID"
,
dnodeId
,
&
dnodeIdSize
);
if
(
err
!=
0
)
{
dnodeId
[
0
]
=
'1'
;
}
char
listenPipeName
[
32
]
=
{
0
};
snprintf
(
listenPipeName
,
sizeof
(
listenPipeName
),
"%s%s"
,
UDF_LISTEN_PIPE_NAME_PREFIX
,
dnodeId
);
strcpy
(
global
.
listenPipeName
,
listenPipeName
);
getUdfdPipeName
(
global
.
listenPipeName
,
UDF_LISTEN_PIPE_NAME_LEN
);
removeListeningPipe
();
...
...
@@ -550,7 +596,7 @@ static int32_t udfdUvInit() {
int
r
;
fnInfo
(
"bind to pipe %s"
,
global
.
listenPipeName
);
if
((
r
=
uv_pipe_bind
(
&
global
.
listeningPipe
,
listenPipeName
)))
{
if
((
r
=
uv_pipe_bind
(
&
global
.
listeningPipe
,
global
.
listenPipeName
)))
{
fnError
(
"Bind error %s"
,
uv_err_name
(
r
));
removeListeningPipe
();
return
-
1
;
...
...
@@ -580,7 +626,7 @@ static int32_t udfdRun() {
fnInfo
(
"start the udfd"
);
int
code
=
uv_run
(
global
.
loop
,
UV_RUN_DEFAULT
);
fnInfo
(
"udfd stopped. result: %s
"
,
uv_err_name
(
code
)
);
fnInfo
(
"udfd stopped. result: %s
, code: %d"
,
uv_err_name
(
code
),
code
);
int
codeClose
=
uv_loop_close
(
global
.
loop
);
fnDebug
(
"uv loop close. result: %s"
,
uv_err_name
(
codeClose
));
udfdCloseClientRpc
();
...
...
@@ -615,5 +661,6 @@ int main(int argc, char *argv[]) {
return
-
1
;
}
initEpSetFromCfg
(
tsFirst
,
tsSecond
,
&
global
.
mgmtEp
);
return
udfdRun
();
}
source/libs/function/test/runUdf.c
浏览文件 @
cc914db3
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "uv.h"
#include "fnLog.h"
#include "os.h"
#include "tudf.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "tudf.h"
int
main
(
int
argc
,
char
*
argv
[])
{
UdfcHandle
udfc
;
udfcOpen
(
1
,
&
udfc
);
uv_sleep
(
1000
);
char
path
[
256
]
=
{
0
};
size_t
cwdSize
=
256
;
int
err
=
uv_cwd
(
path
,
&
cwdSize
);
if
(
err
!=
0
)
{
fprintf
(
stderr
,
"err cwd: %s
\n
"
,
uv_strerror
(
err
));
return
err
;
}
fprintf
(
stdout
,
"current working directory:%s
\n
"
,
path
);
strcat
(
path
,
"/libudf1.so"
);
UdfcFuncHandle
handle
;
SEpSet
epSet
;
setupUdf
(
udfc
,
"udf1"
,
&
epSet
,
&
handle
);
SSDataBlock
block
=
{
0
};
SSDataBlock
*
pBlock
=
&
block
;
pBlock
->
pDataBlock
=
taosArrayInit
(
1
,
sizeof
(
SColumnInfoData
));
pBlock
->
info
.
numOfCols
=
1
;
pBlock
->
info
.
rows
=
4
;
char
data
[
16
]
=
{
0
};
char
bitmap
[
4
]
=
{
0
};
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
numOfCols
;
++
i
)
{
SColumnInfoData
colInfo
=
{
0
};
colInfo
.
info
.
type
=
TSDB_DATA_TYPE_INT
;
colInfo
.
info
.
bytes
=
sizeof
(
int32_t
);
colInfo
.
info
.
colId
=
1
;
colInfo
.
pData
=
data
;
colInfo
.
nullbitmap
=
bitmap
;
for
(
int32_t
j
=
0
;
j
<
pBlock
->
info
.
rows
;
++
j
)
{
colDataAppendInt32
(
&
colInfo
,
j
,
&
j
);
static
int32_t
parseArgs
(
int32_t
argc
,
char
*
argv
[])
{
for
(
int32_t
i
=
1
;
i
<
argc
;
++
i
)
{
if
(
strcmp
(
argv
[
i
],
"-c"
)
==
0
)
{
if
(
i
<
argc
-
1
)
{
if
(
strlen
(
argv
[
++
i
])
>=
PATH_MAX
)
{
printf
(
"config file path overflow"
);
return
-
1
;
}
tstrncpy
(
configDir
,
argv
[
i
],
PATH_MAX
);
}
else
{
printf
(
"'-c' requires a parameter, default is %s
\n
"
,
configDir
);
return
-
1
;
}
taosArrayPush
(
pBlock
->
pDataBlock
,
&
colInfo
);
}
}
SScalarParam
input
=
{
0
};
input
.
numOfRows
=
pBlock
->
info
.
rows
;
input
.
columnData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
0
);
SScalarParam
output
=
{
0
};
callUdfScalarFunc
(
handle
,
&
input
,
1
,
&
output
);
return
0
;
}
SColumnInfoData
*
col
=
output
.
columnData
;
for
(
int32_t
i
=
0
;
i
<
output
.
numOfRows
;
++
i
)
{
fprintf
(
stderr
,
"%d
\t
%d
\n
"
,
i
,
*
(
int32_t
*
)(
col
->
pData
+
i
*
sizeof
(
int32_t
)));
static
int32_t
initLog
()
{
char
logName
[
12
]
=
{
0
};
snprintf
(
logName
,
sizeof
(
logName
),
"%slog"
,
"udfc"
);
return
taosCreateLog
(
logName
,
1
,
configDir
,
NULL
,
NULL
,
NULL
,
0
);
}
int
main
(
int
argc
,
char
*
argv
[])
{
parseArgs
(
argc
,
argv
);
initLog
();
if
(
taosInitCfg
(
configDir
,
NULL
,
NULL
,
NULL
,
0
)
!=
0
)
{
fnError
(
"failed to start since read config error"
);
return
-
1
;
}
udfcOpen
();
uv_sleep
(
1000
);
UdfcFuncHandle
handle
;
setupUdf
(
"udf1"
,
&
handle
);
SSDataBlock
block
=
{
0
};
SSDataBlock
*
pBlock
=
&
block
;
pBlock
->
pDataBlock
=
taosArrayInit
(
1
,
sizeof
(
SColumnInfoData
));
pBlock
->
info
.
numOfCols
=
1
;
pBlock
->
info
.
rows
=
4
;
char
data
[
16
]
=
{
0
};
char
bitmap
[
4
]
=
{
0
};
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
numOfCols
;
++
i
)
{
SColumnInfoData
colInfo
=
{
0
};
colInfo
.
info
.
type
=
TSDB_DATA_TYPE_INT
;
colInfo
.
info
.
bytes
=
sizeof
(
int32_t
);
colInfo
.
info
.
colId
=
1
;
colInfo
.
pData
=
data
;
colInfo
.
nullbitmap
=
bitmap
;
for
(
int32_t
j
=
0
;
j
<
pBlock
->
info
.
rows
;
++
j
)
{
colDataAppendInt32
(
&
colInfo
,
j
,
&
j
);
}
teardownUdf
(
handle
);
taosArrayPush
(
pBlock
->
pDataBlock
,
&
colInfo
);
}
SScalarParam
input
=
{
0
};
input
.
numOfRows
=
pBlock
->
info
.
rows
;
input
.
columnData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
0
);
SScalarParam
output
=
{
0
};
callUdfScalarFunc
(
handle
,
&
input
,
1
,
&
output
);
udfcClose
(
udfc
);
SColumnInfoData
*
col
=
output
.
columnData
;
for
(
int32_t
i
=
0
;
i
<
output
.
numOfRows
;
++
i
)
{
fprintf
(
stderr
,
"%d
\t
%d
\n
"
,
i
,
*
(
int32_t
*
)(
col
->
pData
+
i
*
sizeof
(
int32_t
)));
}
teardownUdf
(
handle
);
udfcClose
();
}
tests/system-test/2-query/cast.py
浏览文件 @
cc914db3
...
...
@@ -77,9 +77,9 @@ class TDTestCase:
tdLog
.
printNoPrefix
(
"==========step5: cast int to binary, expect changes to str(int) "
)
tdSql
.
query
(
"select cast(c1 as binary(32)) as b from ct4"
)
for
i
in
range
(
len
(
data_ct4_c1
)):
tdSql
.
checkData
(
i
,
0
,
str
(
data_ct4_c1
[
i
])
)
#
tdSql.query("select cast(c1 as binary(32)) as b from ct4")
#
for i in range(len(data_ct4_c1)):
#
tdSql.checkData( i, 0, str(data_ct4_c1[i]) )
tdSql
.
query
(
"select cast(c1 as binary(32)) as b from t1"
)
for
i
in
range
(
len
(
data_t1_c1
)):
tdSql
.
checkData
(
i
,
0
,
str
(
data_t1_c1
[
i
])
)
...
...
tests/system-test/fulltest.sh
浏览文件 @
cc914db3
...
...
@@ -4,4 +4,4 @@ set -e
#python3 ./test.py -f 2-query/between.py
#python3 ./test.py -f 2-query/distinct.py
python3 ./test.py
-f
2-query/varchar.py
python3 ./test.py
-f
2-query/cast.py
#
python3 ./test.py -f 2-query/cast.py
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录