Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8b9e94a0
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8b9e94a0
编写于
5月 05, 2022
作者:
S
shenglian zhou
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
scalar udf memory by itself
上级
c6ef0b9b
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
134 addition
and
64 deletion
+134
-64
include/common/tdatablock.h
include/common/tdatablock.h
+5
-0
include/libs/function/tudf.h
include/libs/function/tudf.h
+112
-5
source/libs/function/CMakeLists.txt
source/libs/function/CMakeLists.txt
+1
-2
source/libs/function/inc/tudfInt.h
source/libs/function/inc/tudfInt.h
+1
-4
source/libs/function/src/tudf.c
source/libs/function/src/tudf.c
+5
-6
source/libs/function/src/udfd.c
source/libs/function/src/udfd.c
+2
-7
source/libs/function/test/udf1.c
source/libs/function/test/udf1.c
+8
-40
未找到文件。
include/common/tdatablock.h
浏览文件 @
8b9e94a0
...
...
@@ -54,6 +54,11 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \
} while (0)
// Clears the bit for row r_ in the null bitmap bm_ (the located byte is AND-ed
// with the inverted single-bit mask). udfColSetRow uses this for non-NULL rows
// of fixed-length columns. Presumably the counterpart of colDataSetNull_f,
// which ORs the same mask in -- TODO confirm against the full header.
#define colDataSetNotNull_f(bm_, r_) \
do { \
BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \
} while (0)
// Evaluates to non-zero when the offset stored for `row` of a variable-length
// column is -1, the value colDataSetNull_var stores to mark a row as NULL.
// Arguments are parenthesised so that expression arguments such as
// `cols + i` expand correctly.
#define colDataIsNull_var(pColumnInfoData, row) ((pColumnInfoData)->varmeta.offset[(row)] == -1)
// Marks `row` of a variable-length column as NULL by storing offset -1.
#define colDataSetNull_var(pColumnInfoData, row) ((pColumnInfoData)->varmeta.offset[(row)] = -1)
...
...
include/libs/function/tudf.h
浏览文件 @
8b9e94a0
...
...
@@ -22,6 +22,7 @@
#include "tmsg.h"
#include "tcommon.h"
#include "function.h"
#include "tdatablock.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -61,7 +62,7 @@ typedef struct SUdfColumnMeta {
typedef
struct
SUdfColumnData
{
int32_t
numOfRows
;
bool
varLengthColumn
;
int32_t
rowsAlloc
;
union
{
struct
{
int32_t
nullBitmapLen
;
...
...
@@ -72,9 +73,10 @@ typedef struct SUdfColumnData {
struct
{
int32_t
varOffsetsLen
;
char
*
varOffsets
;
int32_t
*
varOffsets
;
int32_t
payloadLen
;
char
*
payload
;
int32_t
payloadAllocLen
;
}
varLenCol
;
};
}
SUdfColumnData
;
...
...
@@ -131,9 +133,114 @@ typedef int32_t (*TUdfSetupFunc)();
typedef
int32_t
(
*
TUdfTeardownFunc
)();
//TODO: add API to check function arguments type, number etc.
//TODO: another way to manage memory is provide api for UDF to add data to SUdfColumnData and UDF framework will allocate memory.
// then UDF framework will free the memory
// int32_t udfColDataAppend(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull)
#define UDF_MEMORY_EXP_GROWTH 1.5
/**
 * Grow the per-row buffers of a UDF column so they can hold at least
 * newCapacity rows.
 *
 * Variable-length types: only the offsets array is grown here; the payload
 * buffer is grown on demand by udfColSetRow.
 * Fixed-length types: the null bitmap is grown and, unless the type is
 * TSDB_DATA_TYPE_NULL, the data buffer as well.
 *
 * The capacity starts at MAX(rowsAlloc, 8) and is repeatedly multiplied by
 * UDF_MEMORY_EXP_GROWTH until it covers newCapacity.
 *
 * The part of each index buffer that is new after the reallocation is
 * initialised, because realloc leaves it indeterminate and udfColSetRow may
 * raise numOfRows past rows that were never written:
 *   - new offsets are set to -1 (the NULL marker tested by colDataIsNull_var);
 *   - new null-bitmap bytes are cleared.
 * Bytes/entries covered by the previously recorded length are preserved.
 *
 * @param pColumn      column whose colData buffers are grown in place
 * @param newCapacity  required row count; 0 or any value <= rowsAlloc is a no-op
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when a reallocation
 *         fails (buffers grown before the failure stay valid, rowsAlloc is
 *         left unchanged)
 */
static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn *pColumn, int32_t newCapacity) {
  SUdfColumnMeta *meta = &pColumn->colMeta;
  SUdfColumnData *data = &pColumn->colData;

  if (newCapacity == 0 || newCapacity <= data->rowsAlloc) {
    return TSDB_CODE_SUCCESS;
  }

  int allocCapacity = MAX(data->rowsAlloc, 8);
  while (allocCapacity < newCapacity) {
    allocCapacity *= UDF_MEMORY_EXP_GROWTH;
  }

  if (IS_VAR_DATA_TYPE(meta->type)) {
    // Number of offsets that already hold caller data, taken from the recorded byte length.
    int32_t oldCount = 0;
    if (data->varLenCol.varOffsets != NULL && data->varLenCol.varOffsetsLen > 0) {
      oldCount = data->varLenCol.varOffsetsLen / (int32_t)sizeof(int32_t);
    }

    // Explicit cast: this header is also compiled as C++ (extern "C" guard).
    int32_t *offsets = (int32_t *)taosMemoryRealloc(data->varLenCol.varOffsets, sizeof(int32_t) * allocCapacity);
    if (offsets == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    for (int32_t i = oldCount; i < allocCapacity; ++i) {
      offsets[i] = -1;  // never-written rows read as NULL instead of a garbage offset
    }
    data->varLenCol.varOffsets = offsets;
    data->varLenCol.varOffsetsLen = sizeof(int32_t) * allocCapacity;
    // the payload buffer is grown by udfColSetRow when data is appended
  } else {
    int32_t oldBitmapLen = 0;
    if (data->fixLenCol.nullBitmap != NULL && data->fixLenCol.nullBitmapLen > 0) {
      oldBitmapLen = data->fixLenCol.nullBitmapLen;
    }
    int32_t newBitmapLen = BitmapLen(allocCapacity);

    char *bitmap = (char *)taosMemoryRealloc(data->fixLenCol.nullBitmap, newBitmapLen);
    if (bitmap == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    if (oldBitmapLen < newBitmapLen) {
      memset(bitmap + oldBitmapLen, 0, (size_t)(newBitmapLen - oldBitmapLen));
    }
    data->fixLenCol.nullBitmap = bitmap;
    data->fixLenCol.nullBitmapLen = newBitmapLen;

    // TSDB_DATA_TYPE_NULL columns carry only the bitmap, no data buffer.
    if (meta->type != TSDB_DATA_TYPE_NULL) {
      char *values = (char *)taosMemoryRealloc(data->fixLenCol.data, allocCapacity * meta->bytes);
      if (values == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      data->fixLenCol.data = values;
      data->fixLenCol.dataLen = allocCapacity * meta->bytes;
    }
  }

  // Recorded for every type (including TSDB_DATA_TYPE_NULL) so later calls
  // within the capacity return early instead of reallocating again.
  data->rowsAlloc = allocCapacity;

  return TSDB_CODE_SUCCESS;
}
/**
 * Store one row into a UDF column, growing the column's buffers as needed.
 *
 * NULL rows: a variable-length column gets offset -1, a fixed-length column
 * gets its null-bitmap bit set; pData is not read.
 * Non-NULL rows:
 *   - fixed-length: the bitmap bit is cleared and meta->bytes bytes are copied
 *     from pData into slot currentRow;
 *   - variable-length: the value is appended at the end of the payload and the
 *     row's offset points at it. Writing the same row again appends a new copy;
 *     the earlier bytes stay in the payload.
 * For TSDB_DATA_TYPE_JSON the length is derived from the leading type byte
 * (NULL, NCHAR, BIGINT/DOUBLE, BOOL) plus that byte itself; any other tag
 * falls back to varDataTLen(pData) plus the tag byte.
 *
 * numOfRows is raised to currentRow + 1 when that is larger.
 *
 * @param pColumn     destination column
 * @param currentRow  zero-based row index to write
 * @param pData       value to copy; only read when isNull is false
 * @param isNull      true to mark the row as NULL
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_OUT_OF_MEMORY when a buffer
 *         cannot be grown or currentRow + 1 does not fit the int32_t capacity
 */
static FORCE_INLINE int32_t udfColSetRow(SUdfColumn *pColumn, uint32_t currentRow, const char *pData, bool isNull) {
  SUdfColumnMeta *meta = &pColumn->colMeta;
  SUdfColumnData *data = &pColumn->colData;

  // The capacity API takes int32_t; a wrapped/negative value would be treated
  // as "nothing to do" and the writes below would go out of bounds.
  if (currentRow >= (uint32_t)INT32_MAX) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  int32_t rowCount = (int32_t)currentRow + 1;

  // Propagate allocation failures instead of writing through stale buffers.
  int32_t code = udfColEnsureCapacity(pColumn, rowCount);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  bool isVarCol = IS_VAR_DATA_TYPE(meta->type);
  if (isNull) {
    if (isVarCol) {
      data->varLenCol.varOffsets[currentRow] = -1;
    } else {
      colDataSetNull_f(data->fixLenCol.nullBitmap, currentRow);
    }
  } else if (!isVarCol) {
    colDataSetNotNull_f(data->fixLenCol.nullBitmap, currentRow);
    memcpy(data->fixLenCol.data + meta->bytes * currentRow, pData, meta->bytes);
  } else {
    int32_t dataLen;
    if (meta->type == TSDB_DATA_TYPE_JSON) {
      // The first byte of a JSON value is a type tag that selects the length.
      if (*pData == TSDB_DATA_TYPE_NULL) {
        dataLen = 0;
      } else if (*pData == TSDB_DATA_TYPE_NCHAR) {
        dataLen = varDataTLen(pData + CHAR_BYTES);
      } else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) {
        dataLen = LONG_BYTES;
      } else if (*pData == TSDB_DATA_TYPE_BOOL) {
        dataLen = CHAR_BYTES;
      } else {
        dataLen = varDataTLen(pData);  // unrecognised tag: same fallback as before
      }
      dataLen += CHAR_BYTES;  // account for the tag byte itself
    } else {
      dataLen = varDataTLen(pData);
    }

    int32_t required = data->varLenCol.payloadLen + dataLen;
    if (data->varLenCol.payloadAllocLen < required) {
      uint32_t newSize = data->varLenCol.payloadAllocLen;
      if (newSize <= 1) {
        newSize = 8;  // a size of 0 or 1 would never grow under the 1.5x multiplication
      }
      while (newSize < (uint32_t)required) {
        newSize = (uint32_t)(newSize * UDF_MEMORY_EXP_GROWTH);
      }

      // Explicit cast: this header is also compiled as C++ (extern "C" guard).
      char *buf = (char *)taosMemoryRealloc(data->varLenCol.payload, newSize);
      if (buf == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      data->varLenCol.payload = buf;
      data->varLenCol.payloadAllocLen = newSize;
    }

    int32_t writePos = data->varLenCol.payloadLen;
    data->varLenCol.varOffsets[currentRow] = writePos;
    memcpy(data->varLenCol.payload + writePos, pData, dataLen);
    data->varLenCol.payloadLen += dataLen;
  }

  // Signed comparison; the former MAX() mixed uint32_t with int32_t.
  if (rowCount > data->numOfRows) {
    data->numOfRows = rowCount;
  }
  return TSDB_CODE_SUCCESS;
}
// Pointer to a function that takes an SUdfColumn* and returns int32_t.
// In this diff udfd.c shows an SUdf member of this type (freeUdfColumn) that is
// resolved from the "<udf name>_free" symbol and called on the scalar output column.
// NOTE(review): the same diff also shows a direct freeUdfColumn(&output) call;
// confirm whether this callback type is still used after the change.
typedef
int32_t
(
*
TUdfFreeUdfColumnFunc
)(
SUdfColumn
*
column
);
// Pointer to a scalar UDF processing function: it receives the input
// SUdfDataBlock and the SUdfColumn to fill with the result. udfd.c stores the
// symbol named after the UDF in SUdf.scalarProcFunc and calls it as
// scalarProcFunc(&input, &output). The int32_t return is presumably a status
// code (the udf1 example returns 0) -- TODO confirm.
typedef
int32_t
(
*
TUdfScalarProcFunc
)(
SUdfDataBlock
*
block
,
SUdfColumn
*
resultCol
);
...
...
source/libs/function/CMakeLists.txt
浏览文件 @
8b9e94a0
...
...
@@ -48,8 +48,7 @@ target_include_directories(
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
)
target_link_libraries
(
udf1 PUBLIC os
)
udf1 PUBLIC os
)
add_library
(
udf2 MODULE test/udf2.c
)
target_include_directories
(
...
...
source/libs/function/inc/tudfInt.h
浏览文件 @
8b9e94a0
...
...
@@ -19,9 +19,6 @@
extern
"C"
{
#endif
//TODO replaces them with fnDebug
//#define debugPrint(...) taosPrintLog("Function", DEBUG_INFO, 135, __VA_ARGS__)
#define debugPrint(...) {fprintf(stderr, __VA_ARGS__);fprintf(stderr, "\n");}
enum
{
UDF_TASK_SETUP
=
0
,
UDF_TASK_CALL
=
1
,
...
...
@@ -107,7 +104,7 @@ void* decodeUdfRequest(const void *buf, SUdfRequest* request);
int32_t
encodeUdfResponse
(
void
**
buf
,
const
SUdfResponse
*
response
);
void
*
decodeUdfResponse
(
const
void
*
buf
,
SUdfResponse
*
response
);
void
freeUdfColumnData
(
SUdfColumnData
*
data
);
void
freeUdfColumnData
(
SUdfColumnData
*
data
,
SUdfColumnMeta
*
meta
);
void
freeUdfColumn
(
SUdfColumn
*
col
);
void
freeUdfDataDataBlock
(
SUdfDataBlock
*
block
);
...
...
source/libs/function/src/tudf.c
浏览文件 @
8b9e94a0
...
...
@@ -481,8 +481,8 @@ void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) {
return
(
void
*
)
buf
;
}
void
freeUdfColumnData
(
SUdfColumnData
*
data
)
{
if
(
data
->
varLengthColumn
)
{
void
freeUdfColumnData
(
SUdfColumnData
*
data
,
SUdfColumnMeta
*
meta
)
{
if
(
IS_VAR_DATA_TYPE
(
meta
->
type
)
)
{
taosMemoryFree
(
data
->
varLenCol
.
varOffsets
);
data
->
varLenCol
.
varOffsets
=
NULL
;
taosMemoryFree
(
data
->
varLenCol
.
payload
);
...
...
@@ -496,7 +496,7 @@ void freeUdfColumnData(SUdfColumnData *data) {
}
void
freeUdfColumn
(
SUdfColumn
*
col
)
{
freeUdfColumnData
(
&
col
->
colData
);
freeUdfColumnData
(
&
col
->
colData
,
&
col
->
colMeta
);
}
void
freeUdfDataDataBlock
(
SUdfDataBlock
*
block
)
{
...
...
@@ -528,8 +528,7 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
udfCol
->
colMeta
.
scale
=
col
->
info
.
scale
;
udfCol
->
colMeta
.
precision
=
col
->
info
.
precision
;
udfCol
->
colData
.
numOfRows
=
udfBlock
->
numOfRows
;
udfCol
->
colData
.
varLengthColumn
=
IS_VAR_DATA_TYPE
(
udfCol
->
colMeta
.
type
);
if
(
udfCol
->
colData
.
varLengthColumn
)
{
if
(
IS_VAR_DATA_TYPE
(
udfCol
->
colMeta
.
type
))
{
udfCol
->
colData
.
varLenCol
.
varOffsetsLen
=
sizeof
(
int32_t
)
*
udfBlock
->
numOfRows
;
udfCol
->
colData
.
varLenCol
.
varOffsets
=
taosMemoryMalloc
(
udfCol
->
colData
.
varLenCol
.
varOffsetsLen
);
memcpy
(
udfCol
->
colData
.
varLenCol
.
varOffsets
,
col
->
varmeta
.
offset
,
udfCol
->
colData
.
varLenCol
.
varOffsetsLen
);
...
...
@@ -555,7 +554,7 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
int32_t
convertUdfColumnToDataBlock
(
SUdfColumn
*
udfCol
,
SSDataBlock
*
block
)
{
block
->
info
.
numOfCols
=
1
;
block
->
info
.
rows
=
udfCol
->
colData
.
numOfRows
;
block
->
info
.
hasVarCol
=
udfCol
->
colData
.
varLengthColumn
;
block
->
info
.
hasVarCol
=
IS_VAR_DATA_TYPE
(
udfCol
->
colMeta
.
type
)
;
block
->
pDataBlock
=
taosArrayInit
(
1
,
sizeof
(
SColumnInfoData
));
taosArraySetSize
(
block
->
pDataBlock
,
1
);
...
...
source/libs/function/src/udfd.c
浏览文件 @
8b9e94a0
...
...
@@ -75,8 +75,8 @@ typedef struct SUdf {
char
path
[
PATH_MAX
];
uv_lib_t
lib
;
TUdfScalarProcFunc
scalarProcFunc
;
TUdfFreeUdfColumnFunc
freeUdfColumn
;
TUdfAggStartFunc
aggStartFunc
;
TUdfAggProcessFunc
aggProcFunc
;
...
...
@@ -106,11 +106,6 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
char
processFuncName
[
TSDB_FUNC_NAME_LEN
]
=
{
0
};
strcpy
(
processFuncName
,
udfName
);
uv_dlsym
(
&
udf
->
lib
,
processFuncName
,
(
void
**
)(
&
udf
->
scalarProcFunc
));
char
freeFuncName
[
TSDB_FUNC_NAME_LEN
+
5
]
=
{
0
};
char
*
freeSuffix
=
"_free"
;
strncpy
(
freeFuncName
,
processFuncName
,
strlen
(
processFuncName
));
strncat
(
freeFuncName
,
freeSuffix
,
strlen
(
freeSuffix
));
uv_dlsym
(
&
udf
->
lib
,
freeFuncName
,
(
void
**
)(
&
udf
->
freeUdfColumn
));
}
else
if
(
udf
->
funcType
==
TSDB_FUNC_TYPE_AGGREGATE
)
{
char
processFuncName
[
TSDB_FUNC_NAME_LEN
]
=
{
0
};
strcpy
(
processFuncName
,
udfName
);
...
...
@@ -215,7 +210,7 @@ void udfdProcessRequest(uv_work_t *req) {
udf
->
scalarProcFunc
(
&
input
,
&
output
);
convertUdfColumnToDataBlock
(
&
output
,
&
response
.
callRsp
.
resultData
);
udf
->
freeUdfColumn
(
&
output
);
freeUdfColumn
(
&
output
);
break
;
}
case
TSDB_UDF_CALL_AGG_INIT
:
{
...
...
source/libs/function/test/udf1.c
浏览文件 @
8b9e94a0
...
...
@@ -18,52 +18,20 @@ int32_t udf1_destroy() {
}
int32_t
udf1
(
SUdfDataBlock
*
block
,
SUdfColumn
*
resultCol
)
{
SUdfColumnData
*
resultData
=
&
resultCol
->
colData
;
resultData
->
numOfRows
=
block
->
numOfRows
;
SUdfColumnData
*
srcData
=
&
block
->
udfCols
[
0
]
->
colData
;
resultData
->
varLengthColumn
=
srcData
->
varLengthColumn
;
if
(
resultData
->
varLengthColumn
)
{
resultData
->
varLenCol
.
varOffsetsLen
=
srcData
->
varLenCol
.
varOffsetsLen
;
resultData
->
varLenCol
.
varOffsets
=
malloc
(
resultData
->
varLenCol
.
varOffsetsLen
);
memcpy
(
resultData
->
varLenCol
.
varOffsets
,
srcData
->
varLenCol
.
varOffsets
,
srcData
->
varLenCol
.
varOffsetsLen
);
resultData
->
varLenCol
.
payloadLen
=
srcData
->
varLenCol
.
payloadLen
;
resultData
->
varLenCol
.
payload
=
malloc
(
resultData
->
varLenCol
.
payloadLen
);
memcpy
(
resultData
->
varLenCol
.
payload
,
srcData
->
varLenCol
.
payload
,
srcData
->
varLenCol
.
payloadLen
);
}
else
{
resultData
->
fixLenCol
.
nullBitmapLen
=
srcData
->
fixLenCol
.
nullBitmapLen
;
resultData
->
fixLenCol
.
nullBitmap
=
malloc
(
resultData
->
fixLenCol
.
nullBitmapLen
);
memcpy
(
resultData
->
fixLenCol
.
nullBitmap
,
srcData
->
fixLenCol
.
nullBitmap
,
srcData
->
fixLenCol
.
nullBitmapLen
);
resultData
->
fixLenCol
.
dataLen
=
srcData
->
fixLenCol
.
dataLen
;
resultData
->
fixLenCol
.
data
=
malloc
(
resultData
->
fixLenCol
.
dataLen
);
memcpy
(
resultData
->
fixLenCol
.
data
,
srcData
->
fixLenCol
.
data
,
srcData
->
fixLenCol
.
dataLen
);
for
(
int32_t
i
=
0
;
i
<
resultData
->
numOfRows
;
++
i
)
{
*
(
resultData
->
fixLenCol
.
data
+
i
*
sizeof
(
int32_t
))
=
88
;
}
}
SUdfColumnMeta
*
meta
=
&
resultCol
->
colMeta
;
meta
->
bytes
=
4
;
meta
->
type
=
TSDB_DATA_TYPE_INT
;
meta
->
scale
=
0
;
meta
->
precision
=
0
;
return
0
;
}
int32_t
udf1_free
(
SUdfColumn
*
col
)
{
SUdfColumnData
*
data
=
&
col
->
colData
;
if
(
data
->
varLengthColumn
)
{
free
(
data
->
varLenCol
.
varOffsets
);
data
->
varLenCol
.
varOffsets
=
NULL
;
free
(
data
->
varLenCol
.
payload
);
data
->
varLenCol
.
payload
=
NULL
;
}
else
{
free
(
data
->
fixLenCol
.
nullBitmap
);
data
->
fixLenCol
.
nullBitmap
=
NULL
;
free
(
data
->
fixLenCol
.
data
);
data
->
fixLenCol
.
data
=
NULL
;
SUdfColumnData
*
resultData
=
&
resultCol
->
colData
;
resultData
->
numOfRows
=
block
->
numOfRows
;
SUdfColumnData
*
srcData
=
&
block
->
udfCols
[
0
]
->
colData
;
for
(
int32_t
i
=
0
;
i
<
resultData
->
numOfRows
;
++
i
)
{
int32_t
luckyNum
=
88
;
udfColSetRow
(
resultCol
,
i
,
(
char
*
)
&
luckyNum
,
false
);
}
return
0
;
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录