Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ca7263be
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ca7263be
编写于
5月 10, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feature/vnode_refact1
上级
76a9ef18
74b12592
变更
17
展开全部
隐藏空白更改
内联
并排
Showing
17 changed file
with
869 addition
and
485 deletion
+869
-485
include/common/tname.h
include/common/tname.h
+13
-0
include/libs/function/function.h
include/libs/function/function.h
+11
-0
source/client/src/clientSml.c
source/client/src/clientSml.c
+44
-61
source/common/src/tname.c
source/common/src/tname.c
+41
-0
source/dnode/mgmt/implement/src/dmHandle.c
source/dnode/mgmt/implement/src/dmHandle.c
+4
-141
source/dnode/mgmt/interface/inc/dmDef.h
source/dnode/mgmt/interface/inc/dmDef.h
+2
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+29
-19
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+6
-1
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+4
-1
source/libs/function/inc/builtinsimpl.h
source/libs/function/inc/builtinsimpl.h
+5
-2
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+16
-15
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+504
-213
source/libs/function/src/tudf.c
source/libs/function/src/tudf.c
+156
-1
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+2
-0
tests/script/tsim/insert/basic0.sim
tests/script/tsim/insert/basic0.sim
+2
-1
tests/script/tsim/query/udf.sim
tests/script/tsim/query/udf.sim
+29
-29
tools/taos-tools
tools/taos-tools
+1
-1
未找到文件。
include/common/tname.h
浏览文件 @
ca7263be
...
...
@@ -17,6 +17,7 @@
#define _TD_COMMON_NAME_H_
#include "tdef.h"
#include "tarray.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -62,6 +63,18 @@ int32_t tNameSetAcctId(SName* dst, int32_t acctId);
bool
tNameDBNameEqual
(
SName
*
left
,
SName
*
right
);
typedef
struct
{
// input
SArray
*
tags
;
// element is SSmlKV
const
char
*
sTableName
;
// super table name
uint8_t
sTableNameLen
;
// the length of super table name
// output
char
*
childTableName
;
// must have size of TSDB_TABLE_NAME_LEN;
uint64_t
uid
;
// child table uid, may be useful
}
RandTableName
;
void
buildChildTableName
(
RandTableName
*
rName
);
#ifdef __cplusplus
}
...
...
include/libs/function/function.h
浏览文件 @
ca7263be
...
...
@@ -336,6 +336,17 @@ int32_t udfcOpen();
*/
int32_t
udfcClose
();
/**
* start udfd that serves udf function invocation under dnode startDnodeId
* @param startDnodeId
* @return
*/
int32_t
udfStartUdfd
(
int32_t
startDnodeId
);
/**
* stop udfd
* @return
*/
int32_t
udfStopUdfd
();
#ifdef __cplusplus
}
#endif
...
...
source/client/src/clientSml.c
浏览文件 @
ca7263be
...
...
@@ -15,6 +15,7 @@
#include "tcommon.h"
#include "catalog.h"
#include "clientInt.h"
#include "tname.h"
//=================================================================================================
#define SPACE ' '
...
...
@@ -97,6 +98,21 @@ typedef struct {
char
*
buf
;
}
SSmlMsgBuf
;
typedef
struct
{
int32_t
code
;
int32_t
lineNum
;
int32_t
numOfSTables
;
int32_t
numOfCTables
;
int32_t
numOfCreateSTables
;
int64_t
parseTime
;
int64_t
schemaTime
;
int64_t
insertBindTime
;
int64_t
insertRpcTime
;
int64_t
endTime
;
}
SSmlCostInfo
;
typedef
struct
{
uint64_t
id
;
...
...
@@ -114,6 +130,7 @@ typedef struct {
SRequestObj
*
pRequest
;
SQuery
*
pQuery
;
SSmlCostInfo
cost
;
int32_t
affectedRows
;
SSmlMsgBuf
msgBuf
;
SHashObj
*
dumplicateKey
;
// for dumplicate key
...
...
@@ -147,45 +164,6 @@ static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf* pBuf, const char *msg1, const
return
TSDB_CODE_SML_INVALID_DATA
;
}
static
int
smlCompareKv
(
const
void
*
p1
,
const
void
*
p2
)
{
SSmlKv
*
kv1
=
*
(
SSmlKv
**
)
p1
;
SSmlKv
*
kv2
=
*
(
SSmlKv
**
)
p2
;
int32_t
kvLen1
=
kv1
->
keyLen
;
int32_t
kvLen2
=
kv2
->
keyLen
;
int32_t
res
=
strncasecmp
(
kv1
->
key
,
kv2
->
key
,
TMIN
(
kvLen1
,
kvLen2
));
if
(
res
!=
0
)
{
return
res
;
}
else
{
return
kvLen1
-
kvLen2
;
}
}
static
void
smlBuildChildTableName
(
SSmlTableInfo
*
tags
)
{
int32_t
size
=
taosArrayGetSize
(
tags
->
tags
);
ASSERT
(
size
>
0
);
taosArraySort
(
tags
->
tags
,
smlCompareKv
);
SStringBuilder
sb
=
{
0
};
taosStringBuilderAppendStringLen
(
&
sb
,
tags
->
sTableName
,
tags
->
sTableNameLen
);
for
(
int
j
=
0
;
j
<
size
;
++
j
)
{
SSmlKv
*
tagKv
=
taosArrayGetP
(
tags
->
tags
,
j
);
taosStringBuilderAppendStringLen
(
&
sb
,
tagKv
->
key
,
tagKv
->
keyLen
);
taosStringBuilderAppendStringLen
(
&
sb
,
tagKv
->
value
,
tagKv
->
valueLen
);
}
size_t
len
=
0
;
char
*
keyJoined
=
taosStringBuilderGetResult
(
&
sb
,
&
len
);
T_MD5_CTX
context
;
tMD5Init
(
&
context
);
tMD5Update
(
&
context
,
(
uint8_t
*
)
keyJoined
,
(
uint32_t
)
len
);
tMD5Final
(
&
context
);
uint64_t
digest1
=
*
(
uint64_t
*
)(
context
.
digest
);
//uint64_t digest2 = *(uint64_t*)(context.digest + 8);
//snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2);
snprintf
(
tags
->
childTableName
,
TSDB_TABLE_NAME_LEN
,
"t_%016"
PRIx64
,
digest1
);
taosStringBuilderDestroy
(
&
sb
);
tags
->
uid
=
digest1
;
}
static
int32_t
smlGenerateSchemaAction
(
SSchema
*
pointColField
,
SHashObj
*
dbAttrHash
,
SArray
*
dbAttrArray
,
bool
isTag
,
char
sTableName
[],
SSchemaAction
*
action
,
bool
*
actionNeeded
,
SSmlHandle
*
info
)
{
// char fieldName[TSDB_COL_NAME_LEN] = {0};
...
...
@@ -444,6 +422,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
uError
(
"SML:0x%"
PRIx64
" catalogGetSTableMeta failed. super table name %s"
,
info
->
id
,
schemaAction
.
createSTable
.
sTableName
);
return
code
;
}
info
->
cost
.
numOfCreateSTables
++
;
}
else
if
(
code
==
TSDB_CODE_SUCCESS
)
{
}
else
{
uError
(
"SML:0x%"
PRIx64
" load table meta error: %s"
,
info
->
id
,
tstrerror
(
code
));
...
...
@@ -926,20 +905,6 @@ static bool smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
return
false
;
}
static
bool
checkDuplicateKey
(
char
*
key
,
SHashObj
*
pHash
,
SSmlHandle
*
info
)
{
char
*
val
=
NULL
;
val
=
taosHashGet
(
pHash
,
key
,
strlen
(
key
));
if
(
val
)
{
uError
(
"SML:0x%"
PRIx64
" Duplicate key detected:%s"
,
info
->
id
,
key
);
return
true
;
}
uint8_t
dummy_val
=
0
;
taosHashPut
(
pHash
,
key
,
strlen
(
key
),
&
dummy_val
,
sizeof
(
uint8_t
));
return
false
;
}
static
int32_t
smlParseString
(
const
char
*
sql
,
SSmlLineInfo
*
elements
,
SSmlMsgBuf
*
msg
){
if
(
!
sql
)
return
TSDB_CODE_SML_INVALID_DATA
;
while
(
*
sql
!=
'\0'
)
{
// jump the space at the begining
...
...
@@ -1546,8 +1511,10 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) {
tinfo
->
sTableName
=
elements
.
measure
;
tinfo
->
sTableNameLen
=
elements
.
measureLen
;
smlBuildChildTableName
(
tinfo
);
uDebug
(
"SML:0x%"
PRIx64
" child table name: %s"
,
info
->
id
,
tinfo
->
childTableName
);
RandTableName
rName
=
{.
tags
=
tinfo
->
tags
,
.
sTableName
=
tinfo
->
sTableName
,
.
sTableNameLen
=
tinfo
->
sTableNameLen
,
.
childTableName
=
tinfo
->
childTableName
};
buildChildTableName
(
&
rName
);
tinfo
->
uid
=
rName
.
uid
;
SSmlSTableMeta
**
tableMeta
=
taosHashGet
(
info
->
superTables
,
elements
.
measure
,
elements
.
measureLen
);
if
(
tableMeta
){
// update meta
...
...
@@ -1604,7 +1571,7 @@ static void smlDestroyInfo(SSmlHandle* info){
static
SSmlHandle
*
smlBuildSmlInfo
(
TAOS
*
taos
,
SRequestObj
*
request
,
SMLProtocolType
protocol
,
int8_t
precision
,
bool
dataFormat
){
int32_t
code
=
TSDB_CODE_SUCCESS
;
SSmlHandle
*
info
=
taosMemory
Malloc
(
sizeof
(
SSmlHandle
));
SSmlHandle
*
info
=
taosMemory
Calloc
(
1
,
sizeof
(
SSmlHandle
));
if
(
NULL
==
info
)
{
return
NULL
;
}
...
...
@@ -1699,12 +1666,23 @@ static int32_t smlInsertData(SSmlHandle* info) {
}
smlBuildOutput
(
info
->
exec
,
info
->
pVgHash
);
info
->
cost
.
insertRpcTime
=
taosGetTimestampUs
();
launchQueryImpl
(
info
->
pRequest
,
info
->
pQuery
,
TSDB_CODE_SUCCESS
,
true
);
info
->
affectedRows
=
taos_affected_rows
(
info
->
pRequest
);
return
info
->
pRequest
->
code
;
}
static
void
smlPrintStatisticInfo
(
SSmlHandle
*
info
){
uError
(
"SML:0x%"
PRIx64
" smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d \
parse cost:%"
PRId64
",schema cost:%"
PRId64
",bind cost:%"
PRId64
",rpc cost:%"
PRId64
",total cost:%"
PRId64
""
,
info
->
id
,
info
->
cost
.
code
,
info
->
cost
.
lineNum
,
info
->
cost
.
numOfSTables
,
info
->
cost
.
numOfCTables
,
info
->
cost
.
numOfCreateSTables
,
info
->
cost
.
schemaTime
-
info
->
cost
.
parseTime
,
info
->
cost
.
insertBindTime
-
info
->
cost
.
schemaTime
,
info
->
cost
.
insertRpcTime
-
info
->
cost
.
insertBindTime
,
info
->
cost
.
endTime
-
info
->
cost
.
insertRpcTime
,
info
->
cost
.
endTime
-
info
->
cost
.
parseTime
);
}
static
int
smlInsertLines
(
SSmlHandle
*
info
,
char
*
lines
[],
int
numLines
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -1714,6 +1692,7 @@ static int smlInsertLines(SSmlHandle *info, char* lines[], int numLines) {
goto
cleanup
;
}
info
->
cost
.
parseTime
=
taosGetTimestampUs
();
for
(
int32_t
i
=
0
;
i
<
numLines
;
++
i
)
{
code
=
smlParseLine
(
info
,
lines
[
i
]);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -1721,24 +1700,29 @@ static int smlInsertLines(SSmlHandle *info, char* lines[], int numLines) {
goto
cleanup
;
}
}
uDebug
(
"SML:0x%"
PRIx64
" smlInsertLines parse success. tables %d"
,
info
->
id
,
taosHashGetSize
(
info
->
childTables
));
uDebug
(
"SML:0x%"
PRIx64
" smlInsertLines parse success. super tables %d"
,
info
->
id
,
taosHashGetSize
(
info
->
superTables
));
info
->
cost
.
lineNum
=
numLines
;
info
->
cost
.
numOfSTables
=
taosHashGetSize
(
info
->
superTables
);
info
->
cost
.
numOfCTables
=
taosHashGetSize
(
info
->
childTables
);
info
->
cost
.
schemaTime
=
taosGetTimestampUs
();
code
=
smlModifyDBSchemas
(
info
);
if
(
code
!=
0
)
{
uError
(
"SML:0x%"
PRIx64
" smlModifyDBSchemas error : %s"
,
info
->
id
,
tstrerror
(
code
));
goto
cleanup
;
}
info
->
cost
.
insertBindTime
=
taosGetTimestampUs
();
code
=
smlInsertData
(
info
);
if
(
code
!=
0
)
{
uError
(
"SML:0x%"
PRIx64
" smlInsertData error : %s"
,
info
->
id
,
tstrerror
(
code
));
goto
cleanup
;
}
uDebug
(
"SML:0x%"
PRIx64
" smlInsertLines finish inserting %d lines."
,
info
->
id
,
numLines
);
info
->
cost
.
endTime
=
taosGetTimestampUs
();
cleanup:
info
->
cost
.
code
=
code
;
smlPrintStatisticInfo
(
info
);
return
code
;
}
...
...
@@ -1790,7 +1774,6 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
}
smlDestroyInfo
(
info
);
end:
return
(
TAOS_RES
*
)
request
;
}
source/common/src/tname.c
浏览文件 @
ca7263be
...
...
@@ -15,6 +15,8 @@
#define _DEFAULT_SOURCE
#include "tname.h"
#include "tcommon.h"
#include "tstrbuild.h"
#define VALID_NAME_TYPE(x) ((x) == TSDB_DB_NAME_T || (x) == TSDB_TABLE_NAME_T)
...
...
@@ -294,4 +296,43 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
return
0
;
}
static
int
compareKv
(
const
void
*
p1
,
const
void
*
p2
)
{
SSmlKv
*
kv1
=
*
(
SSmlKv
**
)
p1
;
SSmlKv
*
kv2
=
*
(
SSmlKv
**
)
p2
;
int32_t
kvLen1
=
kv1
->
keyLen
;
int32_t
kvLen2
=
kv2
->
keyLen
;
int32_t
res
=
strncasecmp
(
kv1
->
key
,
kv2
->
key
,
TMIN
(
kvLen1
,
kvLen2
));
if
(
res
!=
0
)
{
return
res
;
}
else
{
return
kvLen1
-
kvLen2
;
}
}
/*
* use stable name and tags to grearate child table name
*/
void
buildChildTableName
(
RandTableName
*
rName
)
{
int32_t
size
=
taosArrayGetSize
(
rName
->
tags
);
ASSERT
(
size
>
0
);
taosArraySort
(
rName
->
tags
,
compareKv
);
SStringBuilder
sb
=
{
0
};
taosStringBuilderAppendStringLen
(
&
sb
,
rName
->
sTableName
,
rName
->
sTableNameLen
);
for
(
int
j
=
0
;
j
<
size
;
++
j
)
{
SSmlKv
*
tagKv
=
taosArrayGetP
(
rName
->
tags
,
j
);
taosStringBuilderAppendStringLen
(
&
sb
,
tagKv
->
key
,
tagKv
->
keyLen
);
taosStringBuilderAppendStringLen
(
&
sb
,
tagKv
->
value
,
tagKv
->
valueLen
);
}
size_t
len
=
0
;
char
*
keyJoined
=
taosStringBuilderGetResult
(
&
sb
,
&
len
);
T_MD5_CTX
context
;
tMD5Init
(
&
context
);
tMD5Update
(
&
context
,
(
uint8_t
*
)
keyJoined
,
(
uint32_t
)
len
);
tMD5Final
(
&
context
);
uint64_t
digest1
=
*
(
uint64_t
*
)(
context
.
digest
);
uint64_t
digest2
=
*
(
uint64_t
*
)(
context
.
digest
+
8
);
snprintf
(
rName
->
childTableName
,
TSDB_TABLE_NAME_LEN
,
"t_%016"
PRIx64
"%016"
PRIx64
,
digest1
,
digest2
);
taosStringBuilderDestroy
(
&
sb
);
rName
->
uid
=
digest1
;
}
source/dnode/mgmt/implement/src/dmHandle.c
浏览文件 @
ca7263be
...
...
@@ -217,145 +217,6 @@ static void dmStopMgmt(SMgmtWrapper *pWrapper) {
dmStopStatusThread
(
pWrapper
->
pDnode
);
}
static
int32_t
dmSpawnUdfd
(
SDnode
*
pDnode
);
void
dmUdfdExit
(
uv_process_t
*
process
,
int64_t
exitStatus
,
int
termSignal
)
{
dInfo
(
"udfd process exited with status %"
PRId64
", signal %d"
,
exitStatus
,
termSignal
);
SDnode
*
pDnode
=
process
->
data
;
if
(
exitStatus
==
0
&&
termSignal
==
0
||
atomic_load_32
(
&
pDnode
->
udfdData
.
stopCalled
))
{
dInfo
(
"udfd process exit due to SIGINT or dnode-mgmt called stop"
);
}
else
{
dInfo
(
"udfd process restart"
);
dmSpawnUdfd
(
pDnode
);
}
}
static
int32_t
dmSpawnUdfd
(
SDnode
*
pDnode
)
{
dInfo
(
"dnode start spawning udfd"
);
uv_process_options_t
options
=
{
0
};
char
path
[
PATH_MAX
]
=
{
0
};
if
(
tsProcPath
==
NULL
)
{
path
[
0
]
=
'.'
;
}
else
{
strncpy
(
path
,
tsProcPath
,
strlen
(
tsProcPath
));
taosDirName
(
path
);
}
#ifdef WINDOWS
strcat
(
path
,
"udfd.exe"
);
#else
strcat
(
path
,
"/udfd"
);
#endif
char
*
argsUdfd
[]
=
{
path
,
"-c"
,
configDir
,
NULL
};
options
.
args
=
argsUdfd
;
options
.
file
=
path
;
options
.
exit_cb
=
dmUdfdExit
;
SUdfdData
*
pData
=
&
pDnode
->
udfdData
;
uv_pipe_init
(
&
pData
->
loop
,
&
pData
->
ctrlPipe
,
1
);
uv_stdio_container_t
child_stdio
[
3
];
child_stdio
[
0
].
flags
=
UV_CREATE_PIPE
|
UV_READABLE_PIPE
;
child_stdio
[
0
].
data
.
stream
=
(
uv_stream_t
*
)
&
pData
->
ctrlPipe
;
child_stdio
[
1
].
flags
=
UV_IGNORE
;
child_stdio
[
2
].
flags
=
UV_INHERIT_FD
;
child_stdio
[
2
].
data
.
fd
=
2
;
options
.
stdio_count
=
3
;
options
.
stdio
=
child_stdio
;
options
.
flags
=
UV_PROCESS_DETACHED
;
char
dnodeIdEnvItem
[
32
]
=
{
0
};
char
thrdPoolSizeEnvItem
[
32
]
=
{
0
};
snprintf
(
dnodeIdEnvItem
,
32
,
"%s=%d"
,
"DNODE_ID"
,
pDnode
->
data
.
dnodeId
);
float
numCpuCores
=
4
;
taosGetCpuCores
(
&
numCpuCores
);
snprintf
(
thrdPoolSizeEnvItem
,
32
,
"%s=%d"
,
"UV_THREADPOOL_SIZE"
,
(
int
)
numCpuCores
*
2
);
char
*
envUdfd
[]
=
{
dnodeIdEnvItem
,
thrdPoolSizeEnvItem
,
NULL
};
options
.
env
=
envUdfd
;
int
err
=
uv_spawn
(
&
pData
->
loop
,
&
pData
->
process
,
&
options
);
pData
->
process
.
data
=
(
void
*
)
pDnode
;
if
(
err
!=
0
)
{
dError
(
"can not spawn udfd. path: %s, error: %s"
,
path
,
uv_strerror
(
err
));
}
return
err
;
}
static
void
dmUdfdCloseWalkCb
(
uv_handle_t
*
handle
,
void
*
arg
)
{
if
(
!
uv_is_closing
(
handle
))
{
uv_close
(
handle
,
NULL
);
}
}
static
void
dmUdfdStopAsyncCb
(
uv_async_t
*
async
)
{
SDnode
*
pDnode
=
async
->
data
;
SUdfdData
*
pData
=
&
pDnode
->
udfdData
;
uv_stop
(
&
pData
->
loop
);
}
static
void
dmWatchUdfd
(
void
*
args
)
{
SDnode
*
pDnode
=
args
;
SUdfdData
*
pData
=
&
pDnode
->
udfdData
;
uv_loop_init
(
&
pData
->
loop
);
uv_async_init
(
&
pData
->
loop
,
&
pData
->
stopAsync
,
dmUdfdStopAsyncCb
);
pData
->
stopAsync
.
data
=
pDnode
;
int32_t
err
=
dmSpawnUdfd
(
pDnode
);
atomic_store_32
(
&
pData
->
spawnErr
,
err
);
uv_barrier_wait
(
&
pData
->
barrier
);
uv_run
(
&
pData
->
loop
,
UV_RUN_DEFAULT
);
uv_loop_close
(
&
pData
->
loop
);
uv_walk
(
&
pData
->
loop
,
dmUdfdCloseWalkCb
,
NULL
);
uv_run
(
&
pData
->
loop
,
UV_RUN_DEFAULT
);
uv_loop_close
(
&
pData
->
loop
);
return
;
}
static
int32_t
dmStartUdfd
(
SDnode
*
pDnode
)
{
char
dnodeId
[
8
]
=
{
0
};
snprintf
(
dnodeId
,
sizeof
(
dnodeId
),
"%d"
,
pDnode
->
data
.
dnodeId
);
uv_os_setenv
(
"DNODE_ID"
,
dnodeId
);
SUdfdData
*
pData
=
&
pDnode
->
udfdData
;
if
(
pData
->
startCalled
)
{
dInfo
(
"dnode-mgmt start udfd already called"
);
return
0
;
}
pData
->
startCalled
=
true
;
uv_barrier_init
(
&
pData
->
barrier
,
2
);
uv_thread_create
(
&
pData
->
thread
,
dmWatchUdfd
,
pDnode
);
uv_barrier_wait
(
&
pData
->
barrier
);
int32_t
err
=
atomic_load_32
(
&
pData
->
spawnErr
);
if
(
err
!=
0
)
{
uv_barrier_destroy
(
&
pData
->
barrier
);
uv_async_send
(
&
pData
->
stopAsync
);
uv_thread_join
(
&
pData
->
thread
);
pData
->
needCleanUp
=
false
;
dInfo
(
"dnode-mgmt udfd cleaned up after spawn err"
);
}
else
{
pData
->
needCleanUp
=
true
;
}
return
err
;
}
static
int32_t
dmStopUdfd
(
SDnode
*
pDnode
)
{
dInfo
(
"dnode-mgmt to stop udfd. need cleanup: %d, spawn err: %d"
,
pDnode
->
udfdData
.
needCleanUp
,
pDnode
->
udfdData
.
spawnErr
);
SUdfdData
*
pData
=
&
pDnode
->
udfdData
;
if
(
!
pData
->
needCleanUp
||
atomic_load_32
(
&
pData
->
stopCalled
))
{
return
0
;
}
atomic_store_32
(
&
pData
->
stopCalled
,
1
);
pData
->
needCleanUp
=
false
;
uv_barrier_destroy
(
&
pData
->
barrier
);
uv_async_send
(
&
pData
->
stopAsync
);
uv_thread_join
(
&
pData
->
thread
);
dInfo
(
"dnode-mgmt udfd cleaned up"
);
return
0
;
}
static
int32_t
dmInitMgmt
(
SMgmtWrapper
*
pWrapper
)
{
dInfo
(
"dnode-mgmt start to init"
);
SDnode
*
pDnode
=
pWrapper
->
pDnode
;
...
...
@@ -387,7 +248,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
}
dmReportStartup
(
pDnode
,
"dnode-transport"
,
"initialized"
);
if
(
dmStartUdfd
(
pDnode
)
!=
0
)
{
if
(
udfStartUdfd
(
pDnode
->
data
.
dnodeId
)
!=
0
)
{
dError
(
"failed to start udfd"
);
}
...
...
@@ -398,7 +259,9 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
static
void
dmCleanupMgmt
(
SMgmtWrapper
*
pWrapper
)
{
dInfo
(
"dnode-mgmt start to clean up"
);
SDnode
*
pDnode
=
pWrapper
->
pDnode
;
dmStopUdfd
(
pDnode
);
udfStopUdfd
();
dmStopWorker
(
pDnode
);
taosWLockLatch
(
&
pDnode
->
data
.
latch
);
...
...
source/dnode/mgmt/interface/inc/dmDef.h
浏览文件 @
ca7263be
...
...
@@ -156,6 +156,8 @@ typedef struct SUdfdData {
uv_pipe_t
ctrlPipe
;
uv_async_t
stopAsync
;
int32_t
stopCalled
;
int32_t
dnodeId
;
}
SUdfdData
;
typedef
struct
SDnode
{
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
ca7263be
...
...
@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <libs/function/function.h>
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
...
...
@@ -803,7 +804,8 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction
for
(
int32_t
k
=
0
;
k
<
pOperator
->
numOfExprs
;
++
k
)
{
if
(
functionNeedToExecute
(
&
pCtx
[
k
]))
{
pCtx
[
k
].
startTs
=
startTs
;
// this can be set during create the struct
pCtx
[
k
].
fpSet
.
process
(
&
pCtx
[
k
]);
if
(
pCtx
[
k
].
fpSet
.
process
!=
NULL
)
pCtx
[
k
].
fpSet
.
process
(
&
pCtx
[
k
]);
}
}
}
...
...
@@ -1074,35 +1076,36 @@ void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock*
// set the output buffer for the selectivity + tag query
static
int32_t
setCtxTagColumnInfo
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
)
{
int32_t
num
=
0
;
int16_t
tagLen
=
0
;
SqlFunctionCtx
*
p
=
NULL
;
SqlFunctionCtx
**
p
Tag
Ctx
=
taosMemoryCalloc
(
numOfOutput
,
POINTER_BYTES
);
if
(
p
Tag
Ctx
==
NULL
)
{
SqlFunctionCtx
**
p
Val
Ctx
=
taosMemoryCalloc
(
numOfOutput
,
POINTER_BYTES
);
if
(
p
Val
Ctx
==
NULL
)
{
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
int32_t
functionId
=
pCtx
[
i
].
functionId
;
if
(
functionId
==
FUNCTION_TAG_DUMMY
||
functionId
==
FUNCTION_TS_DUMMY
)
{
tagLen
+=
pCtx
[
i
].
resDataInfo
.
bytes
;
pTagCtx
[
num
++
]
=
&
pCtx
[
i
];
}
else
if
(
1
/*(aAggs[functionId].status & FUNCSTATE_SELECTIVITY) != 0*/
)
{
p
=
&
pCtx
[
i
];
}
else
if
(
functionId
==
FUNCTION_TS
||
functionId
==
FUNCTION_TAG
)
{
// tag function may be the group by tag column
// ts may be the required primary timestamp column
continue
;
if
(
strcmp
(
pCtx
[
i
].
pExpr
->
pExpr
->
_function
.
functionName
,
"_select_value"
)
==
0
)
{
pValCtx
[
num
++
]
=
&
pCtx
[
i
];
}
else
{
// the column may be the normal column, group by normal_column, the functionId is FUNCTION_PRJ
p
=
&
pCtx
[
i
];
}
// if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) {
// tagLen += pCtx[i].resDataInfo.bytes;
// pTagCtx[num++] = &pCtx[i];
// } else if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG) {
// // tag function may be the group by tag column
// // ts may be the required primary timestamp column
// continue;
// } else {
// // the column may be the normal column, group by normal_column, the functionId is FUNCTION_PRJ
// }
}
if
(
p
!=
NULL
)
{
p
->
subsidiaries
.
pCtx
=
p
Tag
Ctx
;
p
->
subsidiaries
.
pCtx
=
p
Val
Ctx
;
p
->
subsidiaries
.
num
=
num
;
}
else
{
taosMemoryFreeClear
(
p
Tag
Ctx
);
taosMemoryFreeClear
(
p
Val
Ctx
);
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -2219,6 +2222,8 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
pCtx
[
j
].
resultInfo
=
getResultCell
(
pRow
,
j
,
rowCellOffset
);
if
(
pCtx
[
j
].
fpSet
.
process
)
{
pCtx
[
j
].
fpSet
.
finalize
(
&
pCtx
[
j
],
pBlock
);
}
else
if
(
strcmp
(
pCtx
[
j
].
pExpr
->
pExpr
->
_function
.
functionName
,
"_select_value"
)
==
0
)
{
// do nothing, todo refactor
}
else
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
...
...
@@ -3974,6 +3979,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
SSDataBlock
*
pRes
=
pInfo
->
pRes
;
blockDataCleanup
(
pRes
);
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
...
...
@@ -4037,9 +4043,13 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
setInputDataBlock
(
pOperator
,
pInfo
->
pCtx
,
pBlock
,
order
,
false
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pInfo
->
pRes
->
info
.
rows
+
pBlock
->
info
.
rows
);
projectApplyFunctions
(
pOperator
->
pExpr
,
pInfo
->
pRes
,
pBlock
,
pInfo
->
pCtx
,
pOperator
->
numOfExprs
,
p
TaskInfo
->
code
=
p
rojectApplyFunctions
(
pOperator
->
pExpr
,
pInfo
->
pRes
,
pBlock
,
pInfo
->
pCtx
,
pOperator
->
numOfExprs
,
pProjectInfo
->
pPseudoColInfo
);
if
(
pTaskInfo
->
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
pTaskInfo
->
code
);
}
int32_t
status
=
handleLimitOffset
(
pOperator
,
pBlock
);
if
(
status
==
PROJECT_RETRIEVE_CONTINUE
)
{
continue
;
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
ca7263be
...
...
@@ -262,6 +262,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
return
NULL
;
}
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SGroupbyOperatorInfo
*
pInfo
=
pOperator
->
info
;
SSDataBlock
*
pRes
=
pInfo
->
binfo
.
pRes
;
...
...
@@ -289,7 +291,10 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if
(
pInfo
->
pScalarExprInfo
!=
NULL
)
{
projectApplyFunctions
(
pInfo
->
pScalarExprInfo
,
pBlock
,
pBlock
,
pInfo
->
pScalarFuncCtx
,
pInfo
->
numOfScalarExpr
,
NULL
);
pTaskInfo
->
code
=
projectApplyFunctions
(
pInfo
->
pScalarExprInfo
,
pBlock
,
pBlock
,
pInfo
->
pScalarFuncCtx
,
pInfo
->
numOfScalarExpr
,
NULL
);
if
(
pTaskInfo
->
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
pTaskInfo
->
code
);
}
}
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs);
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
ca7263be
...
...
@@ -114,7 +114,10 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) {
SOperatorInfo
*
pOperator
=
param
;
SSortOperatorInfo
*
pSort
=
pOperator
->
info
;
if
(
pOperator
->
pExpr
!=
NULL
)
{
projectApplyFunctions
(
pOperator
->
pExpr
,
pBlock
,
pBlock
,
pSort
->
binfo
.
pCtx
,
pOperator
->
numOfExprs
,
NULL
);
int32_t
code
=
projectApplyFunctions
(
pOperator
->
pExpr
,
pBlock
,
pBlock
,
pSort
->
binfo
.
pCtx
,
pOperator
->
numOfExprs
,
NULL
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pOperator
->
pTaskInfo
->
env
,
code
);
}
}
}
...
...
source/libs/function/inc/builtinsimpl.h
浏览文件 @
ca7263be
...
...
@@ -37,11 +37,11 @@ bool getSumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t
sumFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
sumInvertFunction
(
SqlFunctionCtx
*
pCtx
);
bool
minFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
bool
maxFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
bool
minmaxFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
bool
getMinmaxFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
int32_t
minFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
maxFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
minmaxFunctionFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
bool
getAvgFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
avgFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
...
...
@@ -70,6 +70,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx);
bool
getTopBotFuncEnv
(
SFunctionNode
*
UNUSED_PARAM
(
pFunc
),
SFuncExecEnv
*
pEnv
);
int32_t
topFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
bottomFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
topBotFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
bool
getSpreadFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
...
...
@@ -82,6 +83,8 @@ bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultIn
int32_t
histogramFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
histogramFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
);
bool
getSelectivityFuncEnv
(
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/function/src/builtins.c
浏览文件 @
ca7263be
...
...
@@ -207,7 +207,8 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
}
static
int32_t
translateBottom
(
SFunctionNode
*
pFunc
,
char
*
pErrBuf
,
int32_t
len
)
{
// todo
SDataType
*
pType
=
&
((
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
0
))
->
resType
;
pFunc
->
node
.
resType
=
(
SDataType
){.
bytes
=
pType
->
bytes
,
.
type
=
pType
->
type
};
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -509,9 +510,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
translateFunc
=
translateInOutNum
,
.
dataRequiredFunc
=
statisDataRequired
,
.
getEnvFunc
=
getMinmaxFuncEnv
,
.
initFunc
=
minFunctionSetup
,
.
initFunc
=
min
max
FunctionSetup
,
.
processFunc
=
minFunction
,
.
finalizeFunc
=
f
unctionFinalize
.
finalizeFunc
=
minmaxF
unctionFinalize
},
{
.
name
=
"max"
,
...
...
@@ -520,9 +521,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
translateFunc
=
translateInOutNum
,
.
dataRequiredFunc
=
statisDataRequired
,
.
getEnvFunc
=
getMinmaxFuncEnv
,
.
initFunc
=
maxFunctionSetup
,
.
initFunc
=
m
inm
axFunctionSetup
,
.
processFunc
=
maxFunction
,
.
finalizeFunc
=
f
unctionFinalize
.
finalizeFunc
=
minmaxF
unctionFinalize
},
{
.
name
=
"stddev"
,
...
...
@@ -562,14 +563,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
classification
=
FUNC_MGT_AGG_FUNC
,
.
translateFunc
=
translateApercentile
,
.
getEnvFunc
=
getMinmaxFuncEnv
,
.
initFunc
=
maxFunctionSetup
,
.
initFunc
=
m
inm
axFunctionSetup
,
.
processFunc
=
maxFunction
,
.
finalizeFunc
=
functionFinalize
},
{
.
name
=
"top"
,
.
type
=
FUNCTION_TYPE_TOP
,
.
classification
=
FUNC_MGT_AGG_FUNC
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_SELECT_FUNC
,
.
translateFunc
=
translateTop
,
.
getEnvFunc
=
getTopBotFuncEnv
,
.
initFunc
=
functionSetup
,
...
...
@@ -579,12 +580,12 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.
name
=
"bottom"
,
.
type
=
FUNCTION_TYPE_BOTTOM
,
.
classification
=
FUNC_MGT_AGG_FUNC
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_SELECT_FUNC
,
.
translateFunc
=
translateBottom
,
.
getEnvFunc
=
get
Minmax
FuncEnv
,
.
initFunc
=
maxF
unctionSetup
,
.
processFunc
=
max
Function
,
.
finalizeFunc
=
function
Finalize
.
getEnvFunc
=
get
TopBot
FuncEnv
,
.
initFunc
=
f
unctionSetup
,
.
processFunc
=
bottom
Function
,
.
finalizeFunc
=
topBot
Finalize
},
{
.
name
=
"spread"
,
...
...
@@ -603,7 +604,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_MULTI_RES_FUNC
,
.
translateFunc
=
translateLastRow
,
.
getEnvFunc
=
getMinmaxFuncEnv
,
.
initFunc
=
maxFunctionSetup
,
.
initFunc
=
m
inm
axFunctionSetup
,
.
processFunc
=
maxFunction
,
.
finalizeFunc
=
functionFinalize
},
...
...
@@ -1032,8 +1033,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
type
=
FUNCTION_TYPE_SELECT_VALUE
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_SELECT_FUNC
,
.
translateFunc
=
translateSelectValue
,
.
getEnvFunc
=
NULL
,
.
initFunc
=
NULL
,
.
getEnvFunc
=
getSelectivityFuncEnv
,
// todo remove this function later.
.
initFunc
=
functionSetup
,
.
sprocessFunc
=
NULL
,
.
finalizeFunc
=
NULL
}
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
ca7263be
此差异已折叠。
点击以展开。
source/libs/function/src/tudf.c
浏览文件 @
ca7263be
...
...
@@ -23,10 +23,165 @@
#include "builtinsimpl.h"
#include "functionMgt.h"
//TODO: network error processing.
//TODO: add unit test
//TODO: include all global variable under context struct
typedef
struct
SUdfdData
{
bool
startCalled
;
bool
needCleanUp
;
uv_loop_t
loop
;
uv_thread_t
thread
;
uv_barrier_t
barrier
;
uv_process_t
process
;
int
spawnErr
;
uv_pipe_t
ctrlPipe
;
uv_async_t
stopAsync
;
int32_t
stopCalled
;
int32_t
dnodeId
;
}
SUdfdData
;
SUdfdData
udfdGlobal
=
{
0
};
static
int32_t
udfSpawnUdfd
(
SUdfdData
*
pData
);
void
udfUdfdExit
(
uv_process_t
*
process
,
int64_t
exitStatus
,
int
termSignal
)
{
fnInfo
(
"udfd process exited with status %"
PRId64
", signal %d"
,
exitStatus
,
termSignal
);
SUdfdData
*
pData
=
process
->
data
;
if
(
exitStatus
==
0
&&
termSignal
==
0
||
atomic_load_32
(
&
pData
->
stopCalled
))
{
fnInfo
(
"udfd process exit due to SIGINT or dnode-mgmt called stop"
);
}
else
{
fnInfo
(
"udfd process restart"
);
udfSpawnUdfd
(
pData
);
}
}
static
int32_t
udfSpawnUdfd
(
SUdfdData
*
pData
)
{
fnInfo
(
"dnode start spawning udfd"
);
uv_process_options_t
options
=
{
0
};
char
path
[
PATH_MAX
]
=
{
0
};
if
(
tsProcPath
==
NULL
)
{
path
[
0
]
=
'.'
;
}
else
{
strncpy
(
path
,
tsProcPath
,
strlen
(
tsProcPath
));
taosDirName
(
path
);
}
#ifdef WINDOWS
strcat
(
path
,
"udfd.exe"
);
#else
strcat
(
path
,
"/udfd"
);
#endif
char
*
argsUdfd
[]
=
{
path
,
"-c"
,
configDir
,
NULL
};
options
.
args
=
argsUdfd
;
options
.
file
=
path
;
options
.
exit_cb
=
udfUdfdExit
;
uv_pipe_init
(
&
pData
->
loop
,
&
pData
->
ctrlPipe
,
1
);
uv_stdio_container_t
child_stdio
[
3
];
child_stdio
[
0
].
flags
=
UV_CREATE_PIPE
|
UV_READABLE_PIPE
;
child_stdio
[
0
].
data
.
stream
=
(
uv_stream_t
*
)
&
pData
->
ctrlPipe
;
child_stdio
[
1
].
flags
=
UV_IGNORE
;
child_stdio
[
2
].
flags
=
UV_INHERIT_FD
;
child_stdio
[
2
].
data
.
fd
=
2
;
options
.
stdio_count
=
3
;
options
.
stdio
=
child_stdio
;
options
.
flags
=
UV_PROCESS_DETACHED
;
char
dnodeIdEnvItem
[
32
]
=
{
0
};
char
thrdPoolSizeEnvItem
[
32
]
=
{
0
};
snprintf
(
dnodeIdEnvItem
,
32
,
"%s=%d"
,
"DNODE_ID"
,
pData
->
dnodeId
);
float
numCpuCores
=
4
;
taosGetCpuCores
(
&
numCpuCores
);
snprintf
(
thrdPoolSizeEnvItem
,
32
,
"%s=%d"
,
"UV_THREADPOOL_SIZE"
,
(
int
)
numCpuCores
*
2
);
char
*
envUdfd
[]
=
{
dnodeIdEnvItem
,
thrdPoolSizeEnvItem
,
NULL
};
options
.
env
=
envUdfd
;
int
err
=
uv_spawn
(
&
pData
->
loop
,
&
pData
->
process
,
&
options
);
pData
->
process
.
data
=
(
void
*
)
pData
;
if
(
err
!=
0
)
{
fnError
(
"can not spawn udfd. path: %s, error: %s"
,
path
,
uv_strerror
(
err
));
}
return
err
;
}
static
void
udfUdfdCloseWalkCb
(
uv_handle_t
*
handle
,
void
*
arg
)
{
if
(
!
uv_is_closing
(
handle
))
{
uv_close
(
handle
,
NULL
);
}
}
static
void
udfUdfdStopAsyncCb
(
uv_async_t
*
async
)
{
SUdfdData
*
pData
=
async
->
data
;
uv_stop
(
&
pData
->
loop
);
}
static
void
udfWatchUdfd
(
void
*
args
)
{
SUdfdData
*
pData
=
args
;
uv_loop_init
(
&
pData
->
loop
);
uv_async_init
(
&
pData
->
loop
,
&
pData
->
stopAsync
,
udfUdfdStopAsyncCb
);
pData
->
stopAsync
.
data
=
pData
;
int32_t
err
=
udfSpawnUdfd
(
pData
);
atomic_store_32
(
&
pData
->
spawnErr
,
err
);
uv_barrier_wait
(
&
pData
->
barrier
);
uv_run
(
&
pData
->
loop
,
UV_RUN_DEFAULT
);
uv_loop_close
(
&
pData
->
loop
);
uv_walk
(
&
pData
->
loop
,
udfUdfdCloseWalkCb
,
NULL
);
uv_run
(
&
pData
->
loop
,
UV_RUN_DEFAULT
);
uv_loop_close
(
&
pData
->
loop
);
return
;
}
int32_t
udfStartUdfd
(
int32_t
startDnodeId
)
{
SUdfdData
*
pData
=
&
udfdGlobal
;
if
(
pData
->
startCalled
)
{
fnInfo
(
"dnode-mgmt start udfd already called"
);
return
0
;
}
pData
->
startCalled
=
true
;
char
dnodeId
[
8
]
=
{
0
};
snprintf
(
dnodeId
,
sizeof
(
dnodeId
),
"%d"
,
startDnodeId
);
uv_os_setenv
(
"DNODE_ID"
,
dnodeId
);
pData
->
dnodeId
=
startDnodeId
;
uv_barrier_init
(
&
pData
->
barrier
,
2
);
uv_thread_create
(
&
pData
->
thread
,
udfWatchUdfd
,
pData
);
uv_barrier_wait
(
&
pData
->
barrier
);
int32_t
err
=
atomic_load_32
(
&
pData
->
spawnErr
);
if
(
err
!=
0
)
{
uv_barrier_destroy
(
&
pData
->
barrier
);
uv_async_send
(
&
pData
->
stopAsync
);
uv_thread_join
(
&
pData
->
thread
);
pData
->
needCleanUp
=
false
;
fnInfo
(
"dnode-mgmt udfd cleaned up after spawn err"
);
}
else
{
pData
->
needCleanUp
=
true
;
}
return
err
;
}
int32_t
udfStopUdfd
()
{
SUdfdData
*
pData
=
&
udfdGlobal
;
fnInfo
(
"dnode-mgmt to stop udfd. need cleanup: %d, spawn err: %d"
,
pData
->
needCleanUp
,
pData
->
spawnErr
);
if
(
!
pData
->
needCleanUp
||
atomic_load_32
(
&
pData
->
stopCalled
))
{
return
0
;
}
atomic_store_32
(
&
pData
->
stopCalled
,
1
);
pData
->
needCleanUp
=
false
;
uv_barrier_destroy
(
&
pData
->
barrier
);
uv_async_send
(
&
pData
->
stopAsync
);
uv_thread_join
(
&
pData
->
thread
);
fnInfo
(
"dnode-mgmt udfd cleaned up"
);
return
0
;
}
//==============================================================================================
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
* The QUEUE is copied from queue.h under libuv
* */
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
ca7263be
...
...
@@ -522,6 +522,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
ret
=
raftStoreClose
(
pSyncNode
->
pRaftStore
);
assert
(
ret
==
0
);
syncRespMgrDestroy
(
pSyncNode
->
pSyncRespMgr
);
voteGrantedDestroy
(
pSyncNode
->
pVotesGranted
);
votesRespondDestory
(
pSyncNode
->
pVotesRespond
);
syncIndexMgrDestroy
(
pSyncNode
->
pNextIndex
);
...
...
@@ -1138,6 +1139,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
syncNodeReplicate
(
ths
);
}
syncEntryDestory
(
pEntry
);
return
ret
;
}
...
...
tests/script/tsim/insert/basic0.sim
浏览文件 @
ca7263be
...
...
@@ -140,7 +140,8 @@ endi
if $data00 != -13 then
return -1
endi
if $data01 != -2.30000 then
if $data01 != -2.30000 then
print expect -2.30000, actual: $data01
return -1
endi
if $data02 != -3.300000000 then
...
...
tests/script/tsim/query/udf.sim
浏览文件 @
ca7263be
...
...
@@ -64,35 +64,35 @@ if $data00 != 1.414213562 then
return -1
endi
sql insert into t2 values(now+2s, 1, null)(now+3s, null, 2);
sql select udf1(f1, f2) from t2;
print $rows , $data00 , $data10 , $data20 , $data30
if $rows != 4 then
return -1
endi
if $data00 != 88 then
return -1
endi
if $data10 != 88 then
return -1
endi
if $data20 != NULL then
return -1
endi
if $data30 != NULL then
return -1
endi
sql select udf2(f1, f2) from t2;
print $rows, $data00
if $rows != 1 then
return -1
endi
if $data00 != 2.645751311 then
return -1
endi
#
sql insert into t2 values(now+2s, 1, null)(now+3s, null, 2);
#
sql select udf1(f1, f2) from t2;
#
print $rows , $data00 , $data10 , $data20 , $data30
#
if $rows != 4 then
#
return -1
#
endi
#
if $data00 != 88 then
#
return -1
#
endi
#
if $data10 != 88 then
#
return -1
#
endi
#
#
if $data20 != NULL then
#
return -1
#
endi
#
#
if $data30 != NULL then
#
return -1
#
endi
#
#
sql select udf2(f1, f2) from t2;
#
print $rows, $data00
#
if $rows != 1 then
#
return -1
#
endi
#
if $data00 != 2.645751311 then
#
return -1
#
endi
sql drop function udf1;
sql show functions;
if $rows != 1 then
...
...
taos-tools
@
0ae9f872
比较
2f3dfddd
...
0ae9f872
Subproject commit
2f3dfddd4d9a869e706ba3cf98fb6d769404cd7c
Subproject commit
0ae9f872c26d5da8cb61aa9eb00b5c7aeba10ec4
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录