Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
db2b9931
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
db2b9931
编写于
4月 22, 2022
作者:
S
shenglian-zhou
提交者:
GitHub
4月 22, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11697 from taosdata/3.0_udfd
feature(udf):begin integration with dnode
上级
22743e49
a01f1a4e
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
311 addition
and
189 deletion
+311
-189
include/os/osEnv.h
include/os/osEnv.h
+1
-0
source/dnode/mgmt/implement/src/dmHandle.c
source/dnode/mgmt/implement/src/dmHandle.c
+125
-0
source/dnode/mgmt/interface/inc/dmDef.h
source/dnode/mgmt/interface/inc/dmDef.h
+14
-0
source/libs/function/inc/tudf.h
source/libs/function/inc/tudf.h
+12
-22
source/libs/function/inc/udfc.h
source/libs/function/inc/udfc.h
+5
-5
source/libs/function/src/tudf.c
source/libs/function/src/tudf.c
+92
-137
source/libs/function/src/udfd.c
source/libs/function/src/udfd.c
+53
-18
source/libs/function/test/runUdf.c
source/libs/function/test/runUdf.c
+5
-4
source/os/src/osEnv.c
source/os/src/osEnv.c
+1
-0
source/os/src/osProc.c
source/os/src/osProc.c
+1
-1
tests/script/test.sh
tests/script/test.sh
+2
-2
未找到文件。
include/os/osEnv.h
浏览文件 @
db2b9931
...
@@ -34,6 +34,7 @@ extern int64_t tsOpenMax;
...
@@ -34,6 +34,7 @@ extern int64_t tsOpenMax;
extern
int64_t
tsStreamMax
;
extern
int64_t
tsStreamMax
;
extern
float
tsNumOfCores
;
extern
float
tsNumOfCores
;
extern
int64_t
tsTotalMemoryKB
;
extern
int64_t
tsTotalMemoryKB
;
extern
char
*
tsProcPath
;
extern
char
configDir
[];
extern
char
configDir
[];
extern
char
tsDataDir
[];
extern
char
tsDataDir
[];
...
...
source/dnode/mgmt/implement/src/dmHandle.c
浏览文件 @
db2b9931
...
@@ -216,6 +216,126 @@ static void dmStopMgmt(SMgmtWrapper *pWrapper) {
...
@@ -216,6 +216,126 @@ static void dmStopMgmt(SMgmtWrapper *pWrapper) {
dmStopStatusThread
(
pWrapper
->
pDnode
);
dmStopStatusThread
(
pWrapper
->
pDnode
);
}
}
static
int32_t
dmSpawnUdfd
(
SDnode
*
pDnode
);
void
dmUdfdExit
(
uv_process_t
*
process
,
int64_t
exitStatus
,
int
termSignal
)
{
dInfo
(
"udfd process exited with status %"
PRId64
", signal %d"
,
exitStatus
,
termSignal
);
uv_close
((
uv_handle_t
*
)
process
,
NULL
);
SDnode
*
pDnode
=
process
->
data
;
SUdfdData
*
pData
=
&
pDnode
->
udfdData
;
if
(
atomic_load_8
(
&
pData
->
stopping
)
!=
0
)
{
dDebug
(
"udfd process exit due to stopping"
);
}
else
{
uv_close
((
uv_handle_t
*
)
&
pData
->
ctrlPipe
,
NULL
);
dmSpawnUdfd
(
pDnode
);
}
}
static
int32_t
dmSpawnUdfd
(
SDnode
*
pDnode
)
{
dInfo
(
"dnode start spawning udfd"
);
uv_process_options_t
options
=
{
0
};
char
path
[
PATH_MAX
]
=
{
0
};
if
(
tsProcPath
==
NULL
)
{
path
[
0
]
=
'.'
;
}
else
{
strncpy
(
path
,
tsProcPath
,
strlen
(
tsProcPath
));
taosDirName
(
path
);
}
strcat
(
path
,
"/udfd"
);
char
*
argsUdfd
[]
=
{
path
,
"-c"
,
configDir
,
NULL
};
options
.
args
=
argsUdfd
;
options
.
file
=
path
;
options
.
exit_cb
=
dmUdfdExit
;
SUdfdData
*
pData
=
&
pDnode
->
udfdData
;
uv_pipe_init
(
&
pData
->
loop
,
&
pData
->
ctrlPipe
,
1
);
uv_stdio_container_t
child_stdio
[
3
];
child_stdio
[
0
].
flags
=
UV_CREATE_PIPE
|
UV_READABLE_PIPE
;
child_stdio
[
0
].
data
.
stream
=
(
uv_stream_t
*
)
&
pData
->
ctrlPipe
;
child_stdio
[
1
].
flags
=
UV_IGNORE
;
child_stdio
[
2
].
flags
=
UV_INHERIT_FD
;
child_stdio
[
2
].
data
.
fd
=
2
;
options
.
stdio_count
=
3
;
options
.
stdio
=
child_stdio
;
char
dnodeIdEnvItem
[
32
]
=
{
0
};
char
thrdPoolSizeEnvItem
[
32
]
=
{
0
};
snprintf
(
dnodeIdEnvItem
,
32
,
"%s=%d"
,
"DNODE_ID"
,
pDnode
->
data
.
dnodeId
);
float
numCpuCores
=
4
;
taosGetCpuCores
(
&
numCpuCores
);
snprintf
(
thrdPoolSizeEnvItem
,
32
,
"%s=%d"
,
"UV_THREADPOOL_SIZE"
,
(
int
)
numCpuCores
*
2
);
char
*
envUdfd
[]
=
{
dnodeIdEnvItem
,
thrdPoolSizeEnvItem
,
NULL
};
options
.
env
=
envUdfd
;
int
err
=
uv_spawn
(
&
pData
->
loop
,
&
pData
->
process
,
&
options
);
pData
->
process
.
data
=
(
void
*
)
pDnode
;
if
(
err
!=
0
)
{
dError
(
"can not spawn udfd. path: %s, error: %s"
,
path
,
uv_strerror
(
err
));
}
return
err
;
}
static
void
dmUdfdCloseWalkCb
(
uv_handle_t
*
handle
,
void
*
arg
)
{
if
(
!
uv_is_closing
(
handle
))
{
uv_close
(
handle
,
NULL
);
}
}
void
dmWatchUdfd
(
void
*
args
)
{
SDnode
*
pDnode
=
args
;
SUdfdData
*
pData
=
&
pDnode
->
udfdData
;
uv_loop_init
(
&
pData
->
loop
);
int32_t
err
=
dmSpawnUdfd
(
pDnode
);
atomic_store_32
(
&
pData
->
spawnErr
,
err
);
uv_barrier_wait
(
&
pData
->
barrier
);
uv_run
(
&
pData
->
loop
,
UV_RUN_DEFAULT
);
err
=
uv_loop_close
(
&
pData
->
loop
);
while
(
err
==
UV_EBUSY
)
{
uv_walk
(
&
pData
->
loop
,
dmUdfdCloseWalkCb
,
NULL
);
uv_run
(
&
pData
->
loop
,
UV_RUN_DEFAULT
);
err
=
uv_loop_close
(
&
pData
->
loop
);
}
return
;
}
int32_t
dmStartUdfd
(
SDnode
*
pDnode
)
{
SUdfdData
*
pData
=
&
pDnode
->
udfdData
;
if
(
pData
->
startCalled
)
{
dInfo
(
"dnode-mgmt start udfd already called"
);
return
0
;
}
uv_barrier_init
(
&
pData
->
barrier
,
2
);
pData
->
stopping
=
0
;
uv_thread_create
(
&
pData
->
thread
,
dmWatchUdfd
,
pDnode
);
uv_barrier_wait
(
&
pData
->
barrier
);
pData
->
startCalled
=
true
;
pData
->
needCleanUp
=
true
;
return
pData
->
spawnErr
;
}
int32_t
dmStopUdfd
(
SDnode
*
pDnode
)
{
dInfo
(
"dnode-mgmt to stop udfd. need cleanup: %d, spawn err: %d"
,
pDnode
->
udfdData
.
needCleanUp
,
pDnode
->
udfdData
.
spawnErr
);
SUdfdData
*
pData
=
&
pDnode
->
udfdData
;
if
(
!
pData
->
needCleanUp
)
{
return
0
;
}
atomic_store_8
(
&
pData
->
stopping
,
1
);
uv_barrier_destroy
(
&
pData
->
barrier
);
if
(
pData
->
spawnErr
==
0
)
{
uv_process_kill
(
&
pData
->
process
,
SIGINT
);
}
uv_stop
(
&
pData
->
loop
);
uv_thread_join
(
&
pData
->
thread
);
atomic_store_8
(
&
pData
->
stopping
,
0
);
return
0
;
}
static
int32_t
dmInitMgmt
(
SMgmtWrapper
*
pWrapper
)
{
static
int32_t
dmInitMgmt
(
SMgmtWrapper
*
pWrapper
)
{
dInfo
(
"dnode-mgmt start to init"
);
dInfo
(
"dnode-mgmt start to init"
);
SDnode
*
pDnode
=
pWrapper
->
pDnode
;
SDnode
*
pDnode
=
pWrapper
->
pDnode
;
...
@@ -247,6 +367,10 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
...
@@ -247,6 +367,10 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
}
}
dmReportStartup
(
pDnode
,
"dnode-transport"
,
"initialized"
);
dmReportStartup
(
pDnode
,
"dnode-transport"
,
"initialized"
);
if
(
dmStartUdfd
(
pDnode
)
!=
0
)
{
dError
(
"failed to start udfd"
);
}
dInfo
(
"dnode-mgmt is initialized"
);
dInfo
(
"dnode-mgmt is initialized"
);
return
0
;
return
0
;
}
}
...
@@ -254,6 +378,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
...
@@ -254,6 +378,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
static
void
dmCleanupMgmt
(
SMgmtWrapper
*
pWrapper
)
{
static
void
dmCleanupMgmt
(
SMgmtWrapper
*
pWrapper
)
{
dInfo
(
"dnode-mgmt start to clean up"
);
dInfo
(
"dnode-mgmt start to clean up"
);
SDnode
*
pDnode
=
pWrapper
->
pDnode
;
SDnode
*
pDnode
=
pWrapper
->
pDnode
;
dmStopUdfd
(
pDnode
);
dmStopWorker
(
pDnode
);
dmStopWorker
(
pDnode
);
taosWLockLatch
(
&
pDnode
->
data
.
latch
);
taosWLockLatch
(
&
pDnode
->
data
.
latch
);
...
...
source/dnode/mgmt/interface/inc/dmDef.h
浏览文件 @
db2b9931
...
@@ -16,6 +16,7 @@
...
@@ -16,6 +16,7 @@
#ifndef _TD_DM_DEF_H_
#ifndef _TD_DM_DEF_H_
#define _TD_DM_DEF_H_
#define _TD_DM_DEF_H_
#include "uv.h"
#include "dmLog.h"
#include "dmLog.h"
#include "cJSON.h"
#include "cJSON.h"
...
@@ -142,6 +143,18 @@ typedef struct {
...
@@ -142,6 +143,18 @@ typedef struct {
char
desc
[
TSDB_STEP_DESC_LEN
];
char
desc
[
TSDB_STEP_DESC_LEN
];
}
SStartupInfo
;
}
SStartupInfo
;
typedef
struct
SUdfdData
{
bool
startCalled
;
bool
needCleanUp
;
uv_loop_t
loop
;
uv_thread_t
thread
;
uv_barrier_t
barrier
;
uv_process_t
process
;
int
spawnErr
;
int8_t
stopping
;
uv_pipe_t
ctrlPipe
;
}
SUdfdData
;
typedef
struct
SDnode
{
typedef
struct
SDnode
{
EDndProcType
ptype
;
EDndProcType
ptype
;
EDndNodeType
ntype
;
EDndNodeType
ntype
;
...
@@ -150,6 +163,7 @@ typedef struct SDnode {
...
@@ -150,6 +163,7 @@ typedef struct SDnode {
SStartupInfo
startup
;
SStartupInfo
startup
;
SDnodeTrans
trans
;
SDnodeTrans
trans
;
SDnodeData
data
;
SDnodeData
data
;
SUdfdData
udfdData
;
TdThreadMutex
mutex
;
TdThreadMutex
mutex
;
SMgmtWrapper
wrappers
[
NODE_END
];
SMgmtWrapper
wrappers
[
NODE_END
];
}
SDnode
;
}
SDnode
;
...
...
source/libs/function/inc/tudf.h
浏览文件 @
db2b9931
...
@@ -27,41 +27,31 @@ extern "C" {
...
@@ -27,41 +27,31 @@ extern "C" {
#endif
#endif
#define UDF_LISTEN_PIPE_NAME_LEN 32
#define UDF_LISTEN_PIPE_NAME_LEN 32
#define UDF_LISTEN_PIPE_NAME_PREFIX "udf.sock."
#define UDF_LISTEN_PIPE_NAME_PREFIX "udf
d
.sock."
//======================================================================================
//======================================================================================
//begin API to taosd and qworker
//begin API to taosd and qworker
enum
{
enum
{
UDFC_CODE_STOPPING
=
-
1
,
UDFC_CODE_STOPPING
=
-
1
,
UDFC_CODE_RESTARTING
=
-
2
,
UDFC_CODE_PIPE_READ_ERR
=
-
3
,
UDFC_CODE_PIPE_READ_ERR
=
-
3
,
};
};
/*TODO: no api for dnode startudfd/stopudfd*/
typedef
void
*
UdfcHandle
;
/**
typedef
void
*
UdfcFuncHandle
;
* start udfd dameon service
*/
int32_t
startUdfd
(
int32_t
dnodeId
);
/**
* stop udfd dameon service
*/
int32_t
stopUdfd
(
int32_t
dnodeId
);
/**
/**
* create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf
* create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf
* @return error code
* @return error code
*/
*/
int32_t
createUdfdProxy
(
int32_t
dnodeId
);
int32_t
udfcOpen
(
int32_t
dnodeId
,
UdfcHandle
*
proxyHandle
);
/**
/**
* destroy udfd proxy
* destroy udfd proxy
* @return error code
* @return error code
*/
*/
int32_t
destroyUdfdProxy
(
int32_t
dnodeId
);
int32_t
udfcClose
(
UdfcHandle
proxyhandle
);
typedef
void
*
UdfHandle
;
/**
/**
* setup udf
* setup udf
...
@@ -69,7 +59,7 @@ typedef void *UdfHandle;
...
@@ -69,7 +59,7 @@ typedef void *UdfHandle;
* @param handle, out
* @param handle, out
* @return error code
* @return error code
*/
*/
int32_t
setupUdf
(
char
udfName
[],
SEpSet
*
epSet
,
Udf
Handle
*
handle
);
int32_t
setupUdf
(
UdfcHandle
proxyHandle
,
char
udfName
[],
SEpSet
*
epSet
,
UdfcFunc
Handle
*
handle
);
typedef
struct
SUdfColumnMeta
{
typedef
struct
SUdfColumnMeta
{
int16_t
type
;
int16_t
type
;
...
@@ -116,26 +106,26 @@ typedef struct SUdfInterBuf {
...
@@ -116,26 +106,26 @@ typedef struct SUdfInterBuf {
}
SUdfInterBuf
;
}
SUdfInterBuf
;
// output: interBuf
// output: interBuf
int32_t
callUdfAggInit
(
UdfHandle
handle
,
SUdfInterBuf
*
interBuf
);
int32_t
callUdfAggInit
(
Udf
cFunc
Handle
handle
,
SUdfInterBuf
*
interBuf
);
// input: block, state
// input: block, state
// output: newState
// output: newState
int32_t
callUdfAggProcess
(
UdfHandle
handle
,
SSDataBlock
*
block
,
SUdfInterBuf
*
state
,
SUdfInterBuf
*
newState
);
int32_t
callUdfAggProcess
(
Udf
cFunc
Handle
handle
,
SSDataBlock
*
block
,
SUdfInterBuf
*
state
,
SUdfInterBuf
*
newState
);
// input: interBuf
// input: interBuf
// output: resultData
// output: resultData
int32_t
callUdfAggFinalize
(
UdfHandle
handle
,
SUdfInterBuf
*
interBuf
,
SUdfInterBuf
*
resultData
);
int32_t
callUdfAggFinalize
(
Udf
cFunc
Handle
handle
,
SUdfInterBuf
*
interBuf
,
SUdfInterBuf
*
resultData
);
// input: interbuf1, interbuf2
// input: interbuf1, interbuf2
// output: resultBuf
// output: resultBuf
int32_t
callUdfAggMerge
(
UdfHandle
handle
,
SUdfInterBuf
*
interBuf1
,
SUdfInterBuf
*
interBuf2
,
SUdfInterBuf
*
resultBuf
);
int32_t
callUdfAggMerge
(
Udf
cFunc
Handle
handle
,
SUdfInterBuf
*
interBuf1
,
SUdfInterBuf
*
interBuf2
,
SUdfInterBuf
*
resultBuf
);
// input: block
// input: block
// output: resultData
// output: resultData
int32_t
callUdfScalaProcess
(
UdfHandle
handle
,
SSDataBlock
*
block
,
SSDataBlock
*
resultData
);
int32_t
callUdfScalaProcess
(
Udf
cFunc
Handle
handle
,
SSDataBlock
*
block
,
SSDataBlock
*
resultData
);
/**
/**
* tearn down udf
* tearn down udf
* @param handle
* @param handle
* @return
* @return
*/
*/
int32_t
teardownUdf
(
UdfHandle
handle
);
int32_t
teardownUdf
(
Udf
cFunc
Handle
handle
);
// end API to taosd and qworker
// end API to taosd and qworker
//=============================================================================================================================
//=============================================================================================================================
...
...
source/libs/function/inc/udfc.h
浏览文件 @
db2b9931
...
@@ -30,20 +30,20 @@ typedef struct SUdfInfo {
...
@@ -30,20 +30,20 @@ typedef struct SUdfInfo {
char
*
path
;
char
*
path
;
}
SUdfInfo
;
}
SUdfInfo
;
typedef
void
*
UdfHandle
;
typedef
void
*
Udf
cFunc
Handle
;
int32_t
createUdfdProxy
();
int32_t
createUdfdProxy
();
int32_t
destroyUdfdProxy
();
int32_t
destroyUdfdProxy
();
//int32_t setupUdf(SUdfInfo *udf, int32_t numOfUdfs, UdfHandle *handles);
//int32_t setupUdf(SUdfInfo *udf, int32_t numOfUdfs, Udf
cFunc
Handle *handles);
int32_t
setupUdf
(
SUdfInfo
*
udf
,
UdfHandle
*
handle
);
int32_t
setupUdf
(
SUdfInfo
*
udf
,
Udf
cFunc
Handle
*
handle
);
int32_t
callUdf
(
UdfHandle
handle
,
int8_t
step
,
char
*
state
,
int32_t
stateSize
,
SSDataBlock
input
,
char
**
newstate
,
int32_t
callUdf
(
Udf
cFunc
Handle
handle
,
int8_t
step
,
char
*
state
,
int32_t
stateSize
,
SSDataBlock
input
,
char
**
newstate
,
int32_t
*
newStateSize
,
SSDataBlock
*
output
);
int32_t
*
newStateSize
,
SSDataBlock
*
output
);
int32_t
teardownUdf
(
UdfHandle
handle
);
int32_t
teardownUdf
(
Udf
cFunc
Handle
handle
);
typedef
struct
SUdfSetupRequest
{
typedef
struct
SUdfSetupRequest
{
char
udfName
[
16
];
//
char
udfName
[
16
];
//
...
...
source/libs/function/src/tudf.c
浏览文件 @
db2b9931
...
@@ -14,19 +14,15 @@
...
@@ -14,19 +14,15 @@
*/
*/
#include "uv.h"
#include "uv.h"
#include "os.h"
#include "os.h"
#include "tlog.h"
#include "tudf.h"
#include "tudf.h"
#include "tudfInt.h"
#include "tudfInt.h"
#include "tarray.h"
#include "tarray.h"
#include "tdatablock.h"
#include "tdatablock.h"
//TODO: when startup, set thread poll size. add it to cfg
//TODO: test for udfd restart
//TODO: udfd restart when exist or aborts
//TODO: deal with uv task that has been started and then udfd core dumped
//TODO: network error processing.
//TODO: network error processing.
//TODO: add unit test
//TODO: add unit test
//TODO: include all global variable under context struct
//TODO: include all global variable under context struct
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
* The QUEUE is copied from queue.h under libuv
* The QUEUE is copied from queue.h under libuv
* */
* */
...
@@ -125,12 +121,35 @@ enum {
...
@@ -125,12 +121,35 @@ enum {
UV_TASK_DISCONNECT
=
2
UV_TASK_DISCONNECT
=
2
};
};
int64_t
gUdfTaskSeqNum
=
0
;
typedef
struct
SUdfdProxy
{
int32_t
dnodeId
;
uv_barrier_t
gUdfInitBarrier
;
uv_loop_t
gUdfdLoop
;
uv_thread_t
gUdfLoopThread
;
uv_async_t
gUdfLoopTaskAync
;
uv_async_t
gUdfLoopStopAsync
;
uv_mutex_t
gUdfTaskQueueMutex
;
int8_t
gUdfcState
;
QUEUE
gUdfTaskQueue
;
QUEUE
gUvProcTaskQueue
;
// int8_t gUdfcState = UDFC_STATE_INITAL;
// QUEUE gUdfTaskQueue = {0};
// QUEUE gUvProcTaskQueue = {0};
}
SUdfdProxy
;
typedef
struct
SUdfUvSession
{
typedef
struct
SUdfUvSession
{
SUdfdProxy
*
udfc
;
int64_t
severHandle
;
int64_t
severHandle
;
uv_pipe_t
*
udfSvcPipe
;
uv_pipe_t
*
udfSvcPipe
;
}
SUdfUvSession
;
}
SUdfUvSession
;
typedef
struct
SClientUvTaskNode
{
typedef
struct
SClientUvTaskNode
{
SUdfdProxy
*
udfc
;
int8_t
type
;
int8_t
type
;
int
errCode
;
int
errCode
;
...
@@ -169,7 +188,6 @@ typedef struct SClientUdfTask {
...
@@ -169,7 +188,6 @@ typedef struct SClientUdfTask {
}
_teardown
;
}
_teardown
;
};
};
}
SClientUdfTask
;
}
SClientUdfTask
;
typedef
struct
SClientConnBuf
{
typedef
struct
SClientConnBuf
{
...
@@ -185,34 +203,13 @@ typedef struct SClientUvConn {
...
@@ -185,34 +203,13 @@ typedef struct SClientUvConn {
SClientConnBuf
readBuf
;
SClientConnBuf
readBuf
;
}
SClientUvConn
;
}
SClientUvConn
;
uv_process_t
gUdfdProcess
;
uv_barrier_t
gUdfInitBarrier
;
uv_loop_t
gUdfdLoop
;
uv_thread_t
gUdfLoopThread
;
uv_async_t
gUdfLoopTaskAync
;
uv_async_t
gUdfLoopStopAsync
;
uv_mutex_t
gUdfTaskQueueMutex
;
int64_t
gUdfTaskSeqNum
=
0
;
enum
{
enum
{
UDFC_STATE_INITAL
=
0
,
// initial state
UDFC_STATE_INITAL
=
0
,
// initial state
UDFC_STATE_STARTNG
,
// starting after
createUdfdProxy
UDFC_STATE_STARTNG
,
// starting after
udfcOpen
UDFC_STATE_READY
,
// started and begin to receive quests
UDFC_STATE_READY
,
// started and begin to receive quests
UDFC_STATE_RESTARTING
,
// udfd abnormal exit. cleaning up and restart.
UDFC_STATE_STOPPING
,
// stopping after udfcClose
UDFC_STATE_STOPPING
,
// stopping after destroyUdfdProxy
UDFC_STATUS_FINAL
,
// stopped
UDFC_STATUS_FINAL
,
// stopped
};
};
int8_t
gUdfcState
=
UDFC_STATE_INITAL
;
//double circular linked list
QUEUE
gUdfTaskQueue
=
{
0
};
QUEUE
gUvProcTaskQueue
=
{
0
};
int32_t
encodeUdfSetupRequest
(
void
**
buf
,
const
SUdfSetupRequest
*
setup
)
{
int32_t
encodeUdfSetupRequest
(
void
**
buf
,
const
SUdfSetupRequest
*
setup
)
{
int32_t
len
=
0
;
int32_t
len
=
0
;
...
@@ -777,13 +774,14 @@ void onUdfClientConnect(uv_connect_t *connect, int status) {
...
@@ -777,13 +774,14 @@ void onUdfClientConnect(uv_connect_t *connect, int status) {
int32_t
createUdfcUvTask
(
SClientUdfTask
*
task
,
int8_t
uvTaskType
,
SClientUvTaskNode
**
pUvTask
)
{
int32_t
createUdfcUvTask
(
SClientUdfTask
*
task
,
int8_t
uvTaskType
,
SClientUvTaskNode
**
pUvTask
)
{
SClientUvTaskNode
*
uvTask
=
taosMemoryCalloc
(
1
,
sizeof
(
SClientUvTaskNode
));
SClientUvTaskNode
*
uvTask
=
taosMemoryCalloc
(
1
,
sizeof
(
SClientUvTaskNode
));
uvTask
->
type
=
uvTaskType
;
uvTask
->
type
=
uvTaskType
;
uvTask
->
udfc
=
task
->
session
->
udfc
;
if
(
uvTaskType
==
UV_TASK_CONNECT
)
{
if
(
uvTaskType
==
UV_TASK_CONNECT
)
{
}
else
if
(
uvTaskType
==
UV_TASK_REQ_RSP
)
{
}
else
if
(
uvTaskType
==
UV_TASK_REQ_RSP
)
{
uvTask
->
pipe
=
task
->
session
->
udfSvcPipe
;
uvTask
->
pipe
=
task
->
session
->
udfSvcPipe
;
SUdfRequest
request
;
SUdfRequest
request
;
request
.
type
=
task
->
type
;
request
.
type
=
task
->
type
;
request
.
seqNum
=
gUdfTaskSeqNum
++
;
request
.
seqNum
=
atomic_fetch_add_64
(
&
gUdfTaskSeqNum
,
1
)
;
if
(
task
->
type
==
UDF_TASK_SETUP
)
{
if
(
task
->
type
==
UDF_TASK_SETUP
)
{
request
.
setup
=
task
->
_setup
.
req
;
request
.
setup
=
task
->
_setup
.
req
;
...
@@ -815,11 +813,11 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN
...
@@ -815,11 +813,11 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN
int32_t
queueUvUdfTask
(
SClientUvTaskNode
*
uvTask
)
{
int32_t
queueUvUdfTask
(
SClientUvTaskNode
*
uvTask
)
{
debugPrint
(
"%s, %d"
,
"queue uv task"
,
uvTask
->
type
);
debugPrint
(
"%s, %d"
,
"queue uv task"
,
uvTask
->
type
);
SUdfdProxy
*
udfc
=
uvTask
->
udfc
;
uv_mutex_lock
(
&
gUdfTaskQueueMutex
);
uv_mutex_lock
(
&
udfc
->
gUdfTaskQueueMutex
);
QUEUE_INSERT_TAIL
(
&
gUdfTaskQueue
,
&
uvTask
->
recvTaskQueue
);
QUEUE_INSERT_TAIL
(
&
udfc
->
gUdfTaskQueue
,
&
uvTask
->
recvTaskQueue
);
uv_mutex_unlock
(
&
gUdfTaskQueueMutex
);
uv_mutex_unlock
(
&
udfc
->
gUdfTaskQueueMutex
);
uv_async_send
(
&
gUdfLoopTaskAync
);
uv_async_send
(
&
udfc
->
gUdfLoopTaskAync
);
uv_sem_wait
(
&
uvTask
->
taskSem
);
uv_sem_wait
(
&
uvTask
->
taskSem
);
uv_sem_destroy
(
&
uvTask
->
taskSem
);
uv_sem_destroy
(
&
uvTask
->
taskSem
);
...
@@ -832,7 +830,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
...
@@ -832,7 +830,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
switch
(
uvTask
->
type
)
{
switch
(
uvTask
->
type
)
{
case
UV_TASK_CONNECT
:
{
case
UV_TASK_CONNECT
:
{
uv_pipe_t
*
pipe
=
taosMemoryMalloc
(
sizeof
(
uv_pipe_t
));
uv_pipe_t
*
pipe
=
taosMemoryMalloc
(
sizeof
(
uv_pipe_t
));
uv_pipe_init
(
&
gUdfdLoop
,
pipe
,
0
);
uv_pipe_init
(
&
uvTask
->
udfc
->
gUdfdLoop
,
pipe
,
0
);
uvTask
->
pipe
=
pipe
;
uvTask
->
pipe
=
pipe
;
SClientUvConn
*
conn
=
taosMemoryMalloc
(
sizeof
(
SClientUvConn
));
SClientUvConn
*
conn
=
taosMemoryMalloc
(
sizeof
(
SClientUvConn
));
...
@@ -873,142 +871,98 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
...
@@ -873,142 +871,98 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
}
}
void
udfClientAsyncCb
(
uv_async_t
*
async
)
{
void
udfClientAsyncCb
(
uv_async_t
*
async
)
{
SUdfdProxy
*
udfc
=
async
->
data
;
QUEUE
wq
;
QUEUE
wq
;
uv_mutex_lock
(
&
gUdfTaskQueueMutex
);
uv_mutex_lock
(
&
udfc
->
gUdfTaskQueueMutex
);
QUEUE_MOVE
(
&
gUdfTaskQueue
,
&
wq
);
QUEUE_MOVE
(
&
udfc
->
gUdfTaskQueue
,
&
wq
);
uv_mutex_unlock
(
&
gUdfTaskQueueMutex
);
uv_mutex_unlock
(
&
udfc
->
gUdfTaskQueueMutex
);
while
(
!
QUEUE_EMPTY
(
&
wq
))
{
while
(
!
QUEUE_EMPTY
(
&
wq
))
{
QUEUE
*
h
=
QUEUE_HEAD
(
&
wq
);
QUEUE
*
h
=
QUEUE_HEAD
(
&
wq
);
QUEUE_REMOVE
(
h
);
QUEUE_REMOVE
(
h
);
SClientUvTaskNode
*
task
=
QUEUE_DATA
(
h
,
SClientUvTaskNode
,
recvTaskQueue
);
SClientUvTaskNode
*
task
=
QUEUE_DATA
(
h
,
SClientUvTaskNode
,
recvTaskQueue
);
startUvUdfTask
(
task
);
startUvUdfTask
(
task
);
QUEUE_INSERT_TAIL
(
&
gUvProcTaskQueue
,
&
task
->
procTaskQueue
);
QUEUE_INSERT_TAIL
(
&
udfc
->
gUvProcTaskQueue
,
&
task
->
procTaskQueue
);
}
}
}
}
void
cleanUpUvTasks
()
{
void
cleanUpUvTasks
(
SUdfdProxy
*
udfc
)
{
QUEUE
wq
;
QUEUE
wq
;
uv_mutex_lock
(
&
gUdfTaskQueueMutex
);
uv_mutex_lock
(
&
udfc
->
gUdfTaskQueueMutex
);
QUEUE_MOVE
(
&
gUdfTaskQueue
,
&
wq
);
QUEUE_MOVE
(
&
udfc
->
gUdfTaskQueue
,
&
wq
);
uv_mutex_unlock
(
&
gUdfTaskQueueMutex
);
uv_mutex_unlock
(
&
udfc
->
gUdfTaskQueueMutex
);
while
(
!
QUEUE_EMPTY
(
&
wq
))
{
while
(
!
QUEUE_EMPTY
(
&
wq
))
{
QUEUE
*
h
=
QUEUE_HEAD
(
&
wq
);
QUEUE
*
h
=
QUEUE_HEAD
(
&
wq
);
QUEUE_REMOVE
(
h
);
QUEUE_REMOVE
(
h
);
SClientUvTaskNode
*
task
=
QUEUE_DATA
(
h
,
SClientUvTaskNode
,
recvTaskQueue
);
SClientUvTaskNode
*
task
=
QUEUE_DATA
(
h
,
SClientUvTaskNode
,
recvTaskQueue
);
if
(
gUdfcState
==
UDFC_STATE_STOPPING
)
{
if
(
udfc
->
gUdfcState
==
UDFC_STATE_STOPPING
)
{
task
->
errCode
=
UDFC_CODE_STOPPING
;
task
->
errCode
=
UDFC_CODE_STOPPING
;
}
else
if
(
gUdfcState
==
UDFC_STATE_RESTARTING
)
{
task
->
errCode
=
UDFC_CODE_RESTARTING
;
}
}
uv_sem_post
(
&
task
->
taskSem
);
uv_sem_post
(
&
task
->
taskSem
);
}
}
// TODO: deal with tasks that are waiting result.
// TODO: deal with tasks that are waiting result.
while
(
!
QUEUE_EMPTY
(
&
gUvProcTaskQueue
))
{
while
(
!
QUEUE_EMPTY
(
&
udfc
->
gUvProcTaskQueue
))
{
QUEUE
*
h
=
QUEUE_HEAD
(
&
gUvProcTaskQueue
);
QUEUE
*
h
=
QUEUE_HEAD
(
&
udfc
->
gUvProcTaskQueue
);
QUEUE_REMOVE
(
h
);
QUEUE_REMOVE
(
h
);
SClientUvTaskNode
*
task
=
QUEUE_DATA
(
h
,
SClientUvTaskNode
,
procTaskQueue
);
SClientUvTaskNode
*
task
=
QUEUE_DATA
(
h
,
SClientUvTaskNode
,
procTaskQueue
);
if
(
gUdfcState
==
UDFC_STATE_STOPPING
)
{
if
(
udfc
->
gUdfcState
==
UDFC_STATE_STOPPING
)
{
task
->
errCode
=
UDFC_CODE_STOPPING
;
task
->
errCode
=
UDFC_CODE_STOPPING
;
}
else
if
(
gUdfcState
==
UDFC_STATE_RESTARTING
)
{
task
->
errCode
=
UDFC_CODE_RESTARTING
;
}
}
uv_sem_post
(
&
task
->
taskSem
);
uv_sem_post
(
&
task
->
taskSem
);
}
}
}
}
void
udfStopAsyncCb
(
uv_async_t
*
async
)
{
void
udfStopAsyncCb
(
uv_async_t
*
async
)
{
cleanUpUvTasks
();
SUdfdProxy
*
udfc
=
async
->
data
;
if
(
gUdfcState
==
UDFC_STATE_STOPPING
)
{
cleanUpUvTasks
(
udfc
);
uv_stop
(
&
gUdfdLoop
);
if
(
udfc
->
gUdfcState
==
UDFC_STATE_STOPPING
)
{
uv_stop
(
&
udfc
->
gUdfdLoop
);
}
}
}
}
int32_t
udfcSpawnUdfd
();
void
onUdfdExit
(
uv_process_t
*
req
,
int64_t
exit_status
,
int
term_signal
)
{
//TODO: pipe close will be first received
debugPrint
(
"Process exited with status %"
PRId64
", signal %d"
,
exit_status
,
term_signal
);
uv_close
((
uv_handle_t
*
)
req
,
NULL
);
//TODO: restart the udfd process
if
(
gUdfcState
==
UDFC_STATE_STOPPING
)
{
if
(
term_signal
!=
SIGINT
)
{
//TODO: log error
}
}
if
(
gUdfcState
==
UDFC_STATE_READY
)
{
gUdfcState
=
UDFC_STATE_RESTARTING
;
//TODO: asynchronous without blocking. how to do it
//cleanUpUvTasks();
udfcSpawnUdfd
();
}
}
int32_t
udfcSpawnUdfd
()
{
//TODO: path
uv_process_options_t
options
=
{
0
};
static
char
path
[
256
]
=
{
0
};
size_t
cwdSize
;
uv_cwd
(
path
,
&
cwdSize
);
strcat
(
path
,
"/udfd"
);
char
*
args
[
2
]
=
{
path
,
NULL
};
options
.
args
=
args
;
options
.
file
=
path
;
options
.
exit_cb
=
onUdfdExit
;
options
.
stdio_count
=
3
;
uv_stdio_container_t
child_stdio
[
3
];
child_stdio
[
0
].
flags
=
UV_IGNORE
;
child_stdio
[
1
].
flags
=
UV_INHERIT_FD
;
child_stdio
[
1
].
data
.
fd
=
1
;
child_stdio
[
2
].
flags
=
UV_INHERIT_FD
;
child_stdio
[
2
].
data
.
fd
=
2
;
options
.
stdio
=
child_stdio
;
//TODO spawn error
int
err
=
uv_spawn
(
&
gUdfdLoop
,
&
gUdfdProcess
,
&
options
);
if
(
err
!=
0
)
{
debugPrint
(
"can not spawn udfd. path: %s, error: %s"
,
path
,
uv_strerror
(
err
));
}
return
err
;
}
void
constructUdfService
(
void
*
argsThread
)
{
void
constructUdfService
(
void
*
argsThread
)
{
uv_loop_init
(
&
gUdfdLoop
);
SUdfdProxy
*
udfc
=
(
SUdfdProxy
*
)
argsThread
;
uv_loop_init
(
&
udfc
->
gUdfdLoop
);
uv_async_init
(
&
gUdfdLoop
,
&
gUdfLoopTaskAync
,
udfClientAsyncCb
);
uv_async_init
(
&
gUdfdLoop
,
&
gUdfLoopStopAsync
,
udfStopAsyncCb
);
uv_async_init
(
&
udfc
->
gUdfdLoop
,
&
udfc
->
gUdfLoopTaskAync
,
udfClientAsyncCb
);
uv_mutex_init
(
&
gUdfTaskQueueMutex
);
udfc
->
gUdfLoopTaskAync
.
data
=
udfc
;
QUEUE_INIT
(
&
gUdfTaskQueue
);
uv_async_init
(
&
udfc
->
gUdfdLoop
,
&
udfc
->
gUdfLoopStopAsync
,
udfStopAsyncCb
);
QUEUE_INIT
(
&
gUvProcTaskQueue
);
udfc
->
gUdfLoopStopAsync
.
data
=
udfc
;
uv_barrier_wait
(
&
gUdfInitBarrier
);
uv_mutex_init
(
&
udfc
->
gUdfTaskQueueMutex
);
QUEUE_INIT
(
&
udfc
->
gUdfTaskQueue
);
QUEUE_INIT
(
&
udfc
->
gUvProcTaskQueue
);
uv_barrier_wait
(
&
udfc
->
gUdfInitBarrier
);
//TODO return value of uv_run
//TODO return value of uv_run
uv_run
(
&
gUdfdLoop
,
UV_RUN_DEFAULT
);
uv_run
(
&
udfc
->
gUdfdLoop
,
UV_RUN_DEFAULT
);
uv_loop_close
(
&
gUdfdLoop
);
uv_loop_close
(
&
udfc
->
gUdfdLoop
);
}
}
int32_t
udfcOpen
(
int32_t
dnodeId
,
UdfcHandle
*
udfc
)
{
int32_t
createUdfdProxy
(
int32_t
dnodeId
)
{
SUdfdProxy
*
proxy
=
taosMemoryCalloc
(
1
,
sizeof
(
SUdfdProxy
));
gUdfcState
=
UDFC_STATE_STARTNG
;
proxy
->
dnodeId
=
dnodeId
;
uv_barrier_init
(
&
gUdfInitBarrier
,
2
);
proxy
->
gUdfcState
=
UDFC_STATE_STARTNG
;
uv_thread_create
(
&
gUdfLoopThread
,
constructUdfService
,
0
);
uv_barrier_init
(
&
proxy
->
gUdfInitBarrier
,
2
);
uv_barrier_wait
(
&
gUdfInitBarrier
);
gUdfcState
=
UDFC_STATE_READY
;
uv_thread_create
(
&
proxy
->
gUdfLoopThread
,
constructUdfService
,
proxy
);
uv_barrier_wait
(
&
proxy
->
gUdfInitBarrier
);
proxy
->
gUdfcState
=
UDFC_STATE_READY
;
*
udfc
=
proxy
;
return
0
;
return
0
;
}
}
int32_t
destroyUdfdProxy
(
int32_t
dnodeId
)
{
int32_t
udfcClose
(
UdfcHandle
udfcHandle
)
{
gUdfcState
=
UDFC_STATE_STOPPING
;
SUdfdProxy
*
udfc
=
udfcHandle
;
uv_barrier_destroy
(
&
gUdfInitBarrier
);
udfc
->
gUdfcState
=
UDFC_STATE_STOPPING
;
// if (gUdfcState == UDFC_STATE_STOPPING) {
uv_async_send
(
&
udfc
->
gUdfLoopStopAsync
);
// uv_process_kill(&gUdfdProcess, SIGINT);
uv_thread_join
(
&
udfc
->
gUdfLoopThread
);
// }
uv_mutex_destroy
(
&
udfc
->
gUdfTaskQueueMutex
);
uv_async_send
(
&
gUdfLoopStopAsync
);
uv_barrier_destroy
(
&
udfc
->
gUdfInitBarrier
);
uv_thread_join
(
&
gUdfLoopThread
);
udfc
->
gUdfcState
=
UDFC_STATUS_FINAL
;
uv_mutex_destroy
(
&
gUdfTaskQueueMutex
);
taosMemoryFree
(
udfc
);
gUdfcState
=
UDFC_STATUS_FINAL
;
return
0
;
return
0
;
}
}
...
@@ -1026,11 +980,12 @@ int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) {
...
@@ -1026,11 +980,12 @@ int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) {
return
task
->
errCode
;
return
task
->
errCode
;
}
}
int32_t
setupUdf
(
char
udfName
[],
SEpSet
*
epSet
,
UdfHandle
*
h
andle
)
{
int32_t
setupUdf
(
UdfcHandle
udfc
,
char
udfName
[],
SEpSet
*
epSet
,
UdfcFuncHandle
*
funcH
andle
)
{
debugPrint
(
"%s"
,
"client setup udf"
);
debugPrint
(
"%s"
,
"client setup udf"
);
SClientUdfTask
*
task
=
taosMemoryMalloc
(
sizeof
(
SClientUdfTask
));
SClientUdfTask
*
task
=
taosMemoryMalloc
(
sizeof
(
SClientUdfTask
));
task
->
errCode
=
0
;
task
->
errCode
=
0
;
task
->
session
=
taosMemoryMalloc
(
sizeof
(
SUdfUvSession
));
task
->
session
=
taosMemoryMalloc
(
sizeof
(
SUdfUvSession
));
task
->
session
->
udfc
=
udfc
;
task
->
type
=
UDF_TASK_SETUP
;
task
->
type
=
UDF_TASK_SETUP
;
SUdfSetupRequest
*
req
=
&
task
->
_setup
.
req
;
SUdfSetupRequest
*
req
=
&
task
->
_setup
.
req
;
...
@@ -1046,13 +1001,13 @@ int32_t setupUdf(char udfName[], SEpSet *epSet, UdfHandle *handle) {
...
@@ -1046,13 +1001,13 @@ int32_t setupUdf(char udfName[], SEpSet *epSet, UdfHandle *handle) {
SUdfSetupResponse
*
rsp
=
&
task
->
_setup
.
rsp
;
SUdfSetupResponse
*
rsp
=
&
task
->
_setup
.
rsp
;
task
->
session
->
severHandle
=
rsp
->
udfHandle
;
task
->
session
->
severHandle
=
rsp
->
udfHandle
;
*
h
andle
=
task
->
session
;
*
funcH
andle
=
task
->
session
;
int32_t
err
=
task
->
errCode
;
int32_t
err
=
task
->
errCode
;
taosMemoryFree
(
task
);
taosMemoryFree
(
task
);
return
err
;
return
err
;
}
}
int32_t
callUdf
(
UdfHandle
handle
,
int8_t
callType
,
SSDataBlock
*
input
,
SUdfInterBuf
*
state
,
SUdfInterBuf
*
state2
,
int32_t
callUdf
(
Udf
cFunc
Handle
handle
,
int8_t
callType
,
SSDataBlock
*
input
,
SUdfInterBuf
*
state
,
SUdfInterBuf
*
state2
,
SSDataBlock
*
output
,
SUdfInterBuf
*
newState
)
{
SSDataBlock
*
output
,
SUdfInterBuf
*
newState
)
{
debugPrint
(
"%s"
,
"client call udf"
);
debugPrint
(
"%s"
,
"client call udf"
);
...
@@ -1121,7 +1076,7 @@ int32_t callUdf(UdfHandle handle, int8_t callType, SSDataBlock *input, SUdfInter
...
@@ -1121,7 +1076,7 @@ int32_t callUdf(UdfHandle handle, int8_t callType, SSDataBlock *input, SUdfInter
}
}
//TODO: translate these calls to callUdf
//TODO: translate these calls to callUdf
int32_t
callUdfAggInit
(
UdfHandle
handle
,
SUdfInterBuf
*
interBuf
)
{
int32_t
callUdfAggInit
(
Udf
cFunc
Handle
handle
,
SUdfInterBuf
*
interBuf
)
{
int8_t
callType
=
TSDB_UDF_CALL_AGG_INIT
;
int8_t
callType
=
TSDB_UDF_CALL_AGG_INIT
;
int32_t
err
=
callUdf
(
handle
,
callType
,
NULL
,
NULL
,
NULL
,
NULL
,
interBuf
);
int32_t
err
=
callUdf
(
handle
,
callType
,
NULL
,
NULL
,
NULL
,
NULL
,
interBuf
);
...
@@ -1131,7 +1086,7 @@ int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf) {
...
@@ -1131,7 +1086,7 @@ int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf) {
// input: block, state
// input: block, state
// output: interbuf,
// output: interbuf,
int32_t
callUdfAggProcess
(
UdfHandle
handle
,
SSDataBlock
*
block
,
SUdfInterBuf
*
state
,
SUdfInterBuf
*
newState
)
{
int32_t
callUdfAggProcess
(
Udf
cFunc
Handle
handle
,
SSDataBlock
*
block
,
SUdfInterBuf
*
state
,
SUdfInterBuf
*
newState
)
{
int8_t
callType
=
TSDB_UDF_CALL_AGG_PROC
;
int8_t
callType
=
TSDB_UDF_CALL_AGG_PROC
;
int32_t
err
=
callUdf
(
handle
,
callType
,
block
,
state
,
NULL
,
NULL
,
newState
);
int32_t
err
=
callUdf
(
handle
,
callType
,
block
,
state
,
NULL
,
NULL
,
newState
);
return
err
;
return
err
;
...
@@ -1139,7 +1094,7 @@ int32_t callUdfAggProcess(UdfHandle handle, SSDataBlock *block, SUdfInterBuf *st
...
@@ -1139,7 +1094,7 @@ int32_t callUdfAggProcess(UdfHandle handle, SSDataBlock *block, SUdfInterBuf *st
// input: interbuf1, interbuf2
// input: interbuf1, interbuf2
// output: resultBuf
// output: resultBuf
int32_t
callUdfAggMerge
(
UdfHandle
handle
,
SUdfInterBuf
*
interBuf1
,
SUdfInterBuf
*
interBuf2
,
SUdfInterBuf
*
resultBuf
)
{
int32_t
callUdfAggMerge
(
Udf
cFunc
Handle
handle
,
SUdfInterBuf
*
interBuf1
,
SUdfInterBuf
*
interBuf2
,
SUdfInterBuf
*
resultBuf
)
{
int8_t
callType
=
TSDB_UDF_CALL_AGG_MERGE
;
int8_t
callType
=
TSDB_UDF_CALL_AGG_MERGE
;
int32_t
err
=
callUdf
(
handle
,
callType
,
NULL
,
interBuf1
,
interBuf2
,
NULL
,
resultBuf
);
int32_t
err
=
callUdf
(
handle
,
callType
,
NULL
,
interBuf1
,
interBuf2
,
NULL
,
resultBuf
);
return
err
;
return
err
;
...
@@ -1147,7 +1102,7 @@ int32_t callUdfAggMerge(UdfHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf
...
@@ -1147,7 +1102,7 @@ int32_t callUdfAggMerge(UdfHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf
// input: interBuf
// input: interBuf
// output: resultData
// output: resultData
int32_t
callUdfAggFinalize
(
UdfHandle
handle
,
SUdfInterBuf
*
interBuf
,
SUdfInterBuf
*
resultData
)
{
int32_t
callUdfAggFinalize
(
Udf
cFunc
Handle
handle
,
SUdfInterBuf
*
interBuf
,
SUdfInterBuf
*
resultData
)
{
int8_t
callType
=
TSDB_UDF_CALL_AGG_PROC
;
int8_t
callType
=
TSDB_UDF_CALL_AGG_PROC
;
int32_t
err
=
callUdf
(
handle
,
callType
,
NULL
,
interBuf
,
NULL
,
NULL
,
resultData
);
int32_t
err
=
callUdf
(
handle
,
callType
,
NULL
,
interBuf
,
NULL
,
NULL
,
resultData
);
return
err
;
return
err
;
...
@@ -1155,13 +1110,13 @@ int32_t callUdfAggFinalize(UdfHandle handle, SUdfInterBuf *interBuf, SUdfInterBu
...
@@ -1155,13 +1110,13 @@ int32_t callUdfAggFinalize(UdfHandle handle, SUdfInterBuf *interBuf, SUdfInterBu
// input: block
// input: block
// output: resultData
// output: resultData
int32_t
callUdfScalaProcess
(
UdfHandle
handle
,
SSDataBlock
*
block
,
SSDataBlock
*
resultData
)
{
int32_t
callUdfScalaProcess
(
Udf
cFunc
Handle
handle
,
SSDataBlock
*
block
,
SSDataBlock
*
resultData
)
{
int8_t
callType
=
TSDB_UDF_CALL_SCALA_PROC
;
int8_t
callType
=
TSDB_UDF_CALL_SCALA_PROC
;
int32_t
err
=
callUdf
(
handle
,
callType
,
block
,
NULL
,
NULL
,
resultData
,
NULL
);
int32_t
err
=
callUdf
(
handle
,
callType
,
block
,
NULL
,
NULL
,
resultData
,
NULL
);
return
err
;
return
err
;
}
}
int32_t
teardownUdf
(
UdfHandle
handle
)
{
int32_t
teardownUdf
(
Udf
cFunc
Handle
handle
)
{
debugPrint
(
"%s"
,
"client teardown udf"
);
debugPrint
(
"%s"
,
"client teardown udf"
);
SClientUdfTask
*
task
=
taosMemoryMalloc
(
sizeof
(
SClientUdfTask
));
SClientUdfTask
*
task
=
taosMemoryMalloc
(
sizeof
(
SClientUdfTask
));
...
...
source/libs/function/src/udfd.c
浏览文件 @
db2b9931
...
@@ -27,7 +27,10 @@
...
@@ -27,7 +27,10 @@
typedef
struct
SUdfdContext
{
typedef
struct
SUdfdContext
{
uv_loop_t
*
loop
;
uv_loop_t
*
loop
;
uv_pipe_t
ctrlPipe
;
uv_signal_t
intrSignal
;
char
listenPipeName
[
UDF_LISTEN_PIPE_NAME_LEN
];
char
listenPipeName
[
UDF_LISTEN_PIPE_NAME_LEN
];
uv_pipe_t
listeningPipe
;
void
*
clientRpc
;
void
*
clientRpc
;
uv_mutex_t
udfsMutex
;
uv_mutex_t
udfsMutex
;
...
@@ -76,9 +79,9 @@ typedef struct SUdf {
...
@@ -76,9 +79,9 @@ typedef struct SUdf {
// TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
// TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
// TODO: add private udf structure.
// TODO: add private udf structure.
typedef
struct
SUdfHandle
{
typedef
struct
SUdf
cFunc
Handle
{
SUdf
*
udf
;
SUdf
*
udf
;
}
SUdfHandle
;
}
SUdf
cFunc
Handle
;
int32_t
udfdLoadUdf
(
char
*
udfName
,
SUdf
*
udf
)
{
int32_t
udfdLoadUdf
(
char
*
udfName
,
SUdf
*
udf
)
{
strcpy
(
udf
->
name
,
udfName
);
strcpy
(
udf
->
name
,
udfName
);
...
@@ -143,7 +146,7 @@ void udfdProcessRequest(uv_work_t *req) {
...
@@ -143,7 +146,7 @@ void udfdProcessRequest(uv_work_t *req) {
}
}
uv_mutex_unlock
(
&
udf
->
lock
);
uv_mutex_unlock
(
&
udf
->
lock
);
}
}
SUdf
Handle
*
handle
=
taosMemoryMalloc
(
sizeof
(
SUdf
Handle
));
SUdf
cFuncHandle
*
handle
=
taosMemoryMalloc
(
sizeof
(
SUdfcFunc
Handle
));
handle
->
udf
=
udf
;
handle
->
udf
=
udf
;
// TODO: allocate private structure and call init function and set it to handle
// TODO: allocate private structure and call init function and set it to handle
SUdfResponse
rsp
;
SUdfResponse
rsp
;
...
@@ -166,7 +169,7 @@ void udfdProcessRequest(uv_work_t *req) {
...
@@ -166,7 +169,7 @@ void udfdProcessRequest(uv_work_t *req) {
case
UDF_TASK_CALL
:
{
case
UDF_TASK_CALL
:
{
SUdfCallRequest
*
call
=
&
request
.
call
;
SUdfCallRequest
*
call
=
&
request
.
call
;
fnDebug
(
"%"
PRId64
"call request. call type %d, handle: %"
PRIx64
,
request
.
seqNum
,
call
->
callType
,
call
->
udfHandle
);
fnDebug
(
"%"
PRId64
"call request. call type %d, handle: %"
PRIx64
,
request
.
seqNum
,
call
->
callType
,
call
->
udfHandle
);
SUdf
Handle
*
handle
=
(
SUdf
Handle
*
)(
call
->
udfHandle
);
SUdf
cFuncHandle
*
handle
=
(
SUdfcFunc
Handle
*
)(
call
->
udfHandle
);
SUdf
*
udf
=
handle
->
udf
;
SUdf
*
udf
=
handle
->
udf
;
SUdfDataBlock
input
=
{
0
};
SUdfDataBlock
input
=
{
0
};
...
@@ -204,7 +207,7 @@ void udfdProcessRequest(uv_work_t *req) {
...
@@ -204,7 +207,7 @@ void udfdProcessRequest(uv_work_t *req) {
case
UDF_TASK_TEARDOWN
:
{
case
UDF_TASK_TEARDOWN
:
{
SUdfTeardownRequest
*
teardown
=
&
request
.
teardown
;
SUdfTeardownRequest
*
teardown
=
&
request
.
teardown
;
fnInfo
(
"teardown. %"
PRId64
"handle:%"
PRIx64
,
request
.
seqNum
,
teardown
->
udfHandle
)
fnInfo
(
"teardown. %"
PRId64
"handle:%"
PRIx64
,
request
.
seqNum
,
teardown
->
udfHandle
)
SUdf
Handle
*
handle
=
(
SUdf
Handle
*
)(
teardown
->
udfHandle
);
SUdf
cFuncHandle
*
handle
=
(
SUdfcFunc
Handle
*
)(
teardown
->
udfHandle
);
SUdf
*
udf
=
handle
->
udf
;
SUdf
*
udf
=
handle
->
udf
;
bool
unloadUdf
=
false
;
bool
unloadUdf
=
false
;
uv_mutex_lock
(
&
global
.
udfsMutex
);
uv_mutex_lock
(
&
global
.
udfsMutex
);
...
@@ -380,10 +383,12 @@ void udfdOnNewConnection(uv_stream_t *server, int status) {
...
@@ -380,10 +383,12 @@ void udfdOnNewConnection(uv_stream_t *server, int status) {
}
}
}
}
void
removeListeningPipe
(
int
sig
)
{
void
udfdIntrSignalHandler
(
uv_signal_t
*
handle
,
int
signum
)
{
fnInfo
(
"udfd signal received: %d
\n
"
,
signum
);
uv_fs_t
req
;
uv_fs_t
req
;
uv_fs_unlink
(
global
.
loop
,
&
req
,
"udf.sock"
,
NULL
);
uv_fs_unlink
(
global
.
loop
,
&
req
,
global
.
listenPipeName
,
NULL
);
exit
(
0
);
uv_signal_stop
(
handle
);
uv_stop
(
global
.
loop
);
}
}
void
udfdProcessRpcRsp
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
return
;
}
void
udfdProcessRpcRsp
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
return
;
}
...
@@ -492,37 +497,67 @@ static int32_t udfdInitLog() {
...
@@ -492,37 +497,67 @@ static int32_t udfdInitLog() {
return
taosCreateLog
(
logName
,
1
,
configDir
,
NULL
,
NULL
,
NULL
,
0
);
return
taosCreateLog
(
logName
,
1
,
configDir
,
NULL
,
NULL
,
NULL
,
0
);
}
}
void
udfdCtrlAllocBufCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
)
{
buf
->
base
=
taosMemoryMalloc
(
suggested_size
);
buf
->
len
=
suggested_size
;
}
void
udfdCtrlReadCb
(
uv_stream_t
*
q
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
if
(
nread
<
0
)
{
fnError
(
"udfd ctrl pipe read error. %s"
,
uv_err_name
(
nread
));
uv_close
((
uv_handle_t
*
)
q
,
NULL
);
uv_stop
(
global
.
loop
);
return
;
}
fnError
(
"udfd ctrl pipe read %zu bytes"
,
nread
);
taosMemoryFree
(
buf
->
base
);
}
static
int32_t
removeListeningPipe
()
{
uv_fs_t
req
;
int
err
=
uv_fs_unlink
(
global
.
loop
,
&
req
,
global
.
listenPipeName
,
NULL
);
uv_fs_req_cleanup
(
&
req
);
return
err
;
}
static
int32_t
udfdUvInit
()
{
static
int32_t
udfdUvInit
()
{
uv_loop_t
*
loop
=
taosMemoryMalloc
(
sizeof
(
uv_loop_t
));
uv_loop_t
*
loop
=
taosMemoryMalloc
(
sizeof
(
uv_loop_t
));
if
(
loop
)
{
if
(
loop
)
{
uv_loop_init
(
loop
);
uv_loop_init
(
loop
);
}
}
global
.
loop
=
loop
;
global
.
loop
=
loop
;
uv_pipe_init
(
global
.
loop
,
&
global
.
ctrlPipe
,
1
);
uv_pipe_open
(
&
global
.
ctrlPipe
,
0
);
uv_read_start
((
uv_stream_t
*
)
&
global
.
ctrlPipe
,
udfdCtrlAllocBufCb
,
udfdCtrlReadCb
);
char
dnodeId
[
8
]
=
{
0
};
char
dnodeId
[
8
]
=
{
0
};
size_t
dnodeIdSize
;
size_t
dnodeIdSize
;
uv_os_getenv
(
"DNODE_ID"
,
dnodeId
,
&
dnodeIdSize
);
int32_t
err
=
uv_os_getenv
(
"DNODE_ID"
,
dnodeId
,
&
dnodeIdSize
);
if
(
err
!=
0
)
{
dnodeId
[
0
]
=
'1'
;
}
char
listenPipeName
[
32
]
=
{
0
};
char
listenPipeName
[
32
]
=
{
0
};
snprintf
(
listenPipeName
,
sizeof
(
listenPipeName
),
"%s%s"
,
UDF_LISTEN_PIPE_NAME_PREFIX
,
dnodeId
);
snprintf
(
listenPipeName
,
sizeof
(
listenPipeName
),
"%s%s"
,
UDF_LISTEN_PIPE_NAME_PREFIX
,
dnodeId
);
strcpy
(
global
.
listenPipeName
,
listenPipeName
);
strcpy
(
global
.
listenPipeName
,
listenPipeName
);
uv_fs_t
req
;
removeListeningPipe
();
uv_fs_unlink
(
global
.
loop
,
&
req
,
global
.
listenPipeName
,
NULL
);
uv_pipe_t
server
;
uv_pipe_init
(
global
.
loop
,
&
global
.
listeningPipe
,
0
);
uv_pipe_init
(
global
.
loop
,
&
server
,
0
);
signal
(
SIGINT
,
removeListeningPipe
);
uv_signal_init
(
global
.
loop
,
&
global
.
intrSignal
);
uv_signal_start
(
&
global
.
intrSignal
,
udfdIntrSignalHandler
,
SIGINT
);
int
r
;
int
r
;
fnInfo
(
"bind to pipe %s"
,
global
.
listenPipeName
);
fnInfo
(
"bind to pipe %s"
,
global
.
listenPipeName
);
if
((
r
=
uv_pipe_bind
(
&
server
,
listenPipeName
)))
{
if
((
r
=
uv_pipe_bind
(
&
global
.
listeningPipe
,
listenPipeName
)))
{
fnError
(
"Bind error %s"
,
uv_err_name
(
r
));
fnError
(
"Bind error %s"
,
uv_err_name
(
r
));
removeListeningPipe
(
0
);
removeListeningPipe
();
return
-
1
;
return
-
1
;
}
}
if
((
r
=
uv_listen
((
uv_stream_t
*
)
&
server
,
128
,
udfdOnNewConnection
)))
{
if
((
r
=
uv_listen
((
uv_stream_t
*
)
&
global
.
listeningPipe
,
128
,
udfdOnNewConnection
)))
{
fnError
(
"Listen error %s"
,
uv_err_name
(
r
));
fnError
(
"Listen error %s"
,
uv_err_name
(
r
));
removeListeningPipe
(
0
);
removeListeningPipe
();
return
-
2
;
return
-
2
;
}
}
return
0
;
return
0
;
...
...
source/libs/function/test/runUdf.c
浏览文件 @
db2b9931
...
@@ -8,7 +8,8 @@
...
@@ -8,7 +8,8 @@
#include "tdatablock.h"
#include "tdatablock.h"
int
main
(
int
argc
,
char
*
argv
[])
{
int
main
(
int
argc
,
char
*
argv
[])
{
createUdfdProxy
(
1
);
UdfcHandle
udfc
;
udfcOpen
(
1
,
&
udfc
);
uv_sleep
(
1000
);
uv_sleep
(
1000
);
char
path
[
256
]
=
{
0
};
char
path
[
256
]
=
{
0
};
size_t
cwdSize
=
256
;
size_t
cwdSize
=
256
;
...
@@ -20,9 +21,9 @@ int main(int argc, char *argv[]) {
...
@@ -20,9 +21,9 @@ int main(int argc, char *argv[]) {
fprintf
(
stdout
,
"current working directory:%s
\n
"
,
path
);
fprintf
(
stdout
,
"current working directory:%s
\n
"
,
path
);
strcat
(
path
,
"/libudf1.so"
);
strcat
(
path
,
"/libudf1.so"
);
UdfHandle
handle
;
Udf
cFunc
Handle
handle
;
SEpSet
epSet
;
SEpSet
epSet
;
setupUdf
(
"udf1"
,
&
epSet
,
&
handle
);
setupUdf
(
udfc
,
"udf1"
,
&
epSet
,
&
handle
);
SSDataBlock
block
=
{
0
};
SSDataBlock
block
=
{
0
};
SSDataBlock
*
pBlock
=
&
block
;
SSDataBlock
*
pBlock
=
&
block
;
...
@@ -53,5 +54,5 @@ int main(int argc, char *argv[]) {
...
@@ -53,5 +54,5 @@ int main(int argc, char *argv[]) {
}
}
teardownUdf
(
handle
);
teardownUdf
(
handle
);
destroyUdfdProxy
(
1
);
udfcClose
(
udfc
);
}
}
source/os/src/osEnv.c
浏览文件 @
db2b9931
...
@@ -37,6 +37,7 @@ int64_t tsOpenMax = 0;
...
@@ -37,6 +37,7 @@ int64_t tsOpenMax = 0;
int64_t
tsStreamMax
=
0
;
int64_t
tsStreamMax
=
0
;
float
tsNumOfCores
=
0
;
float
tsNumOfCores
=
0
;
int64_t
tsTotalMemoryKB
=
0
;
int64_t
tsTotalMemoryKB
=
0
;
char
*
tsProcPath
=
NULL
;
void
osDefaultInit
()
{
void
osDefaultInit
()
{
taosSeedRand
(
taosSafeRand
());
taosSeedRand
(
taosSafeRand
());
...
...
source/os/src/osProc.c
浏览文件 @
db2b9931
...
@@ -17,7 +17,7 @@
...
@@ -17,7 +17,7 @@
#define _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#include "os.h"
#include "os.h"
static
char
*
tsProcPath
=
NULL
;
char
*
tsProcPath
=
NULL
;
int32_t
taosNewProc
(
char
**
args
)
{
int32_t
taosNewProc
(
char
**
args
)
{
int32_t
pid
=
fork
();
int32_t
pid
=
fork
();
...
...
tests/script/test.sh
浏览文件 @
db2b9931
...
@@ -131,8 +131,8 @@ if [ -n "$FILE_NAME" ]; then
...
@@ -131,8 +131,8 @@ if [ -n "$FILE_NAME" ]; then
FLAG
=
"-v"
FLAG
=
"-v"
fi
fi
echo
valgrind
--tool
=
memcheck
--leak-check
=
full
--show-reachable
=
no
--track-origins
=
yes
--show-leak-kinds
=
all
-v
--workaround-gcc296-bugs
=
yes
--log-file
=
${
LOG_DIR
}
/valgrind-tsim.log
$PROGRAM
-c
$CFG_DIR
-f
$FILE_NAME
$FLAG
echo
valgrind
--tool
=
memcheck
--leak-check
=
full
--show-reachable
=
no
--track-origins
=
yes
--
child-silent-after-fork
=
yes
--
show-leak-kinds
=
all
-v
--workaround-gcc296-bugs
=
yes
--log-file
=
${
LOG_DIR
}
/valgrind-tsim.log
$PROGRAM
-c
$CFG_DIR
-f
$FILE_NAME
$FLAG
valgrind
--tool
=
memcheck
--leak-check
=
full
--show-reachable
=
no
--track-origins
=
yes
--show-leak-kinds
=
all
-v
--workaround-gcc296-bugs
=
yes
--log-file
=
${
LOG_DIR
}
/valgrind-tsim.log
$PROGRAM
-c
$CFG_DIR
-f
$FILE_NAME
$FLAG
valgrind
--tool
=
memcheck
--leak-check
=
full
--show-reachable
=
no
--track-origins
=
yes
--
child-silent-after-fork
=
yes
--
show-leak-kinds
=
all
-v
--workaround-gcc296-bugs
=
yes
--log-file
=
${
LOG_DIR
}
/valgrind-tsim.log
$PROGRAM
-c
$CFG_DIR
-f
$FILE_NAME
$FLAG
else
else
if
[[
$MULTIPROCESS
-eq
1
]]
;
then
if
[[
$MULTIPROCESS
-eq
1
]]
;
then
echo
"ExcuteCmd(multiprocess):"
$PROGRAM
-m
-c
$CFG_DIR
-f
$FILE_NAME
echo
"ExcuteCmd(multiprocess):"
$PROGRAM
-m
-c
$CFG_DIR
-f
$FILE_NAME
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录