未验证 提交 84cc76ff 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #12129 from taosdata/feature/udf

feature(udf): scalar memory management
......@@ -54,8 +54,13 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \
} while (0)
#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1)
#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1)
#define colDataSetNotNull_f(bm_, r_) \
do { \
BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \
} while (0)
#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1)
#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1)
#define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT)
......
......@@ -22,6 +22,7 @@
#include "tmsg.h"
#include "tcommon.h"
#include "function.h"
#include "tdatablock.h"
#ifdef __cplusplus
extern "C" {
......@@ -54,14 +55,14 @@ int32_t setupUdf(char udfName[], UdfcFuncHandle *handle);
typedef struct SUdfColumnMeta {
int16_t type;
int32_t bytes; // <0 var length, others fixed length bytes
int32_t bytes;
uint8_t precision;
uint8_t scale;
} SUdfColumnMeta;
typedef struct SUdfColumnData {
int32_t numOfRows;
bool varLengthColumn;
int32_t rowsAlloc;
union {
struct {
int32_t nullBitmapLen;
......@@ -72,9 +73,10 @@ typedef struct SUdfColumnData {
struct {
int32_t varOffsetsLen;
char *varOffsets;
int32_t *varOffsets;
int32_t payloadLen;
char *payload;
int32_t payloadAllocLen;
} varLenCol;
};
} SUdfColumnData;
......@@ -131,10 +133,114 @@ typedef int32_t (*TUdfSetupFunc)();
typedef int32_t (*TUdfTeardownFunc)();
//TODO: add API to check function arguments type, number etc.
//TODO: another way to manage memory is provide api for UDF to add data to SUdfColumnData and UDF framework will allocate memory.
// then UDF framework will free the memory
//typedef int32_t addFixedLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t colBytes, char* data);
//typedef int32_t addVariableLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t dataLen, char * data);
#define UDF_MEMORY_EXP_GROWTH 1.5
static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t newCapacity) {
SUdfColumnMeta *meta = &pColumn->colMeta;
SUdfColumnData *data = &pColumn->colData;
if (newCapacity== 0 || newCapacity <= data->rowsAlloc) {
return TSDB_CODE_SUCCESS;
}
int allocCapacity = MAX(data->rowsAlloc, 8);
while (allocCapacity < newCapacity) {
allocCapacity *= UDF_MEMORY_EXP_GROWTH;
}
if (IS_VAR_DATA_TYPE(meta->type)) {
char* tmp = taosMemoryRealloc(data->varLenCol.varOffsets, sizeof(int32_t) * allocCapacity);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->varLenCol.varOffsets = (int32_t*)tmp;
data->varLenCol.varOffsetsLen = sizeof(int32_t) * allocCapacity;
// for payload, add data in udfColDataAppend
} else {
char* tmp = taosMemoryRealloc(data->fixLenCol.nullBitmap, BitmapLen(allocCapacity));
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->fixLenCol.nullBitmap = tmp;
data->fixLenCol.nullBitmapLen = BitmapLen(allocCapacity);
if (meta->type == TSDB_DATA_TYPE_NULL) {
return TSDB_CODE_SUCCESS;
}
tmp = taosMemoryRealloc(data->fixLenCol.data, allocCapacity* meta->bytes);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->fixLenCol.data = tmp;
data->fixLenCol.dataLen = allocCapacity* meta->bytes;
}
data->rowsAlloc = allocCapacity;
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t udfColSetRow(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) {
SUdfColumnMeta *meta = &pColumn->colMeta;
SUdfColumnData *data = &pColumn->colData;
udfColEnsureCapacity(pColumn, currentRow+1);
bool isVarCol = IS_VAR_DATA_TYPE(meta->type);
if (isNull) {
if (isVarCol) {
data->varLenCol.varOffsets[currentRow] = -1;
} else {
colDataSetNull_f(data->fixLenCol.nullBitmap, currentRow);
}
} else {
if (!isVarCol) {
colDataSetNotNull_f(data->fixLenCol.nullBitmap, currentRow);
memcpy(data->fixLenCol.data + meta->bytes * currentRow, pData, meta->bytes);
} else {
int32_t dataLen = varDataTLen(pData);
if (meta->type == TSDB_DATA_TYPE_JSON) {
if (*pData == TSDB_DATA_TYPE_NULL) {
dataLen = 0;
} else if (*pData == TSDB_DATA_TYPE_NCHAR) {
dataLen = varDataTLen(pData + CHAR_BYTES);
} else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) {
dataLen = LONG_BYTES;
} else if (*pData == TSDB_DATA_TYPE_BOOL) {
dataLen = CHAR_BYTES;
}
dataLen += CHAR_BYTES;
}
if (data->varLenCol.payloadAllocLen < data->varLenCol.payloadLen + dataLen) {
uint32_t newSize = data->varLenCol.payloadAllocLen;
if (newSize <= 1) {
newSize = 8;
}
while (newSize < data->varLenCol.payloadLen + dataLen) {
newSize = newSize * UDF_MEMORY_EXP_GROWTH;
}
char *buf = taosMemoryRealloc(data->varLenCol.payload, newSize);
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
data->varLenCol.payload = buf;
data->varLenCol.payloadAllocLen = newSize;
}
uint32_t len = data->varLenCol.payloadLen;
data->varLenCol.varOffsets[currentRow] = len;
memcpy(data->varLenCol.payload + len, pData, dataLen);
data->varLenCol.payloadLen += dataLen;
}
}
data->numOfRows = MAX(currentRow + 1, data->numOfRows);
return 0;
}
typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column);
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol);
......
......@@ -491,7 +491,12 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) {
bool isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg[i]);
bool isNull = false;
if (pBlock->pBlockAgg == NULL) {
isNull = colDataIsNull(pColData, pBlock->info.rows, j, NULL);
} else {
isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg[i]);
}
char* p = colDataGetData(pColData, j);
colDataAppend(pDstCol, j - startIndex, p, isNull);
......
......@@ -48,8 +48,7 @@ target_include_directories(
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
udf1 PUBLIC os
)
udf1 PUBLIC os)
add_library(udf2 MODULE test/udf2.c)
target_include_directories(
......
......@@ -19,9 +19,6 @@
extern "C" {
#endif
//TODO replaces them with fnDebug
//#define debugPrint(...) taosPrintLog("Function", DEBUG_INFO, 135, __VA_ARGS__)
#define debugPrint(...) {fprintf(stderr, __VA_ARGS__);fprintf(stderr, "\n");}
enum {
UDF_TASK_SETUP = 0,
UDF_TASK_CALL = 1,
......@@ -107,7 +104,7 @@ void* decodeUdfRequest(const void *buf, SUdfRequest* request);
int32_t encodeUdfResponse(void **buf, const SUdfResponse *response);
void* decodeUdfResponse(const void* buf, SUdfResponse *response);
void freeUdfColumnData(SUdfColumnData *data);
void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta);
void freeUdfColumn(SUdfColumn* col);
void freeUdfDataDataBlock(SUdfDataBlock *block);
......
......@@ -481,8 +481,8 @@ void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) {
return (void*)buf;
}
void freeUdfColumnData(SUdfColumnData *data) {
if (data->varLengthColumn) {
void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta) {
if (IS_VAR_DATA_TYPE(meta->type)) {
taosMemoryFree(data->varLenCol.varOffsets);
data->varLenCol.varOffsets = NULL;
taosMemoryFree(data->varLenCol.payload);
......@@ -496,7 +496,7 @@ void freeUdfColumnData(SUdfColumnData *data) {
}
void freeUdfColumn(SUdfColumn* col) {
freeUdfColumnData(&col->colData);
freeUdfColumnData(&col->colData, &col->colMeta);
}
void freeUdfDataDataBlock(SUdfDataBlock *block) {
......@@ -528,8 +528,7 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
udfCol->colMeta.scale = col->info.scale;
udfCol->colMeta.precision = col->info.precision;
udfCol->colData.numOfRows = udfBlock->numOfRows;
udfCol->colData.varLengthColumn = IS_VAR_DATA_TYPE(udfCol->colMeta.type);
if (udfCol->colData.varLengthColumn) {
if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) {
udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows;
udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen);
memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen);
......@@ -555,7 +554,7 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
block->info.numOfCols = 1;
block->info.rows = udfCol->colData.numOfRows;
block->info.hasVarCol = udfCol->colData.varLengthColumn;
block->info.hasVarCol = IS_VAR_DATA_TYPE(udfCol->colMeta.type);
block->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
taosArraySetSize(block->pDataBlock, 1);
......
......@@ -75,8 +75,8 @@ typedef struct SUdf {
char path[PATH_MAX];
uv_lib_t lib;
TUdfScalarProcFunc scalarProcFunc;
TUdfFreeUdfColumnFunc freeUdfColumn;
TUdfAggStartFunc aggStartFunc;
TUdfAggProcessFunc aggProcFunc;
......@@ -106,11 +106,6 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
strcpy(processFuncName, udfName);
uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->scalarProcFunc));
char freeFuncName[TSDB_FUNC_NAME_LEN + 5] = {0};
char *freeSuffix = "_free";
strncpy(freeFuncName, processFuncName, strlen(processFuncName));
strncat(freeFuncName, freeSuffix, strlen(freeSuffix));
uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn));
} else if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) {
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
strcpy(processFuncName, udfName);
......@@ -215,7 +210,7 @@ void udfdProcessRequest(uv_work_t *req) {
udf->scalarProcFunc(&input, &output);
convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
udf->freeUdfColumn(&output);
freeUdfColumn(&output);
break;
}
case TSDB_UDF_CALL_AGG_INIT: {
......
......@@ -18,52 +18,20 @@ int32_t udf1_destroy() {
}
int32_t udf1(SUdfDataBlock* block, SUdfColumn *resultCol) {
SUdfColumnData *resultData = &resultCol->colData;
resultData->numOfRows = block->numOfRows;
SUdfColumnData *srcData = &block->udfCols[0]->colData;
resultData->varLengthColumn = srcData->varLengthColumn;
if (resultData->varLengthColumn) {
resultData->varLenCol.varOffsetsLen = srcData->varLenCol.varOffsetsLen;
resultData->varLenCol.varOffsets = malloc(resultData->varLenCol.varOffsetsLen);
memcpy(resultData->varLenCol.varOffsets, srcData->varLenCol.varOffsets, srcData->varLenCol.varOffsetsLen);
resultData->varLenCol.payloadLen = srcData->varLenCol.payloadLen;
resultData->varLenCol.payload = malloc(resultData->varLenCol.payloadLen);
memcpy(resultData->varLenCol.payload, srcData->varLenCol.payload, srcData->varLenCol.payloadLen);
} else {
resultData->fixLenCol.nullBitmapLen = srcData->fixLenCol.nullBitmapLen;
resultData->fixLenCol.nullBitmap = malloc(resultData->fixLenCol.nullBitmapLen);
memcpy(resultData->fixLenCol.nullBitmap, srcData->fixLenCol.nullBitmap, srcData->fixLenCol.nullBitmapLen);
resultData->fixLenCol.dataLen = srcData->fixLenCol.dataLen;
resultData->fixLenCol.data = malloc(resultData->fixLenCol.dataLen);
memcpy(resultData->fixLenCol.data, srcData->fixLenCol.data, srcData->fixLenCol.dataLen);
for (int32_t i = 0; i < resultData->numOfRows; ++i) {
*(resultData->fixLenCol.data + i * sizeof(int32_t)) = 88;
}
}
SUdfColumnMeta *meta = &resultCol->colMeta;
meta->bytes = 4;
meta->type = TSDB_DATA_TYPE_INT;
meta->scale = 0;
meta->precision = 0;
return 0;
}
int32_t udf1_free(SUdfColumn *col) {
SUdfColumnData *data = &col->colData;
if (data->varLengthColumn) {
free(data->varLenCol.varOffsets);
data->varLenCol.varOffsets = NULL;
free(data->varLenCol.payload);
data->varLenCol.payload = NULL;
} else {
free(data->fixLenCol.nullBitmap);
data->fixLenCol.nullBitmap = NULL;
free(data->fixLenCol.data);
data->fixLenCol.data = NULL;
SUdfColumnData *resultData = &resultCol->colData;
resultData->numOfRows = block->numOfRows;
SUdfColumnData *srcData = &block->udfCols[0]->colData;
for (int32_t i = 0; i < resultData->numOfRows; ++i) {
int32_t luckyNum = 88;
udfColSetRow(resultCol, i, (char*)&luckyNum, false);
}
return 0;
}
\ No newline at end of file
#!/bin/bash
set +e
#set -x
echo "Executing gen_udf.sh"
SCRIPT_DIR=`dirname $0`
cd $SCRIPT_DIR/../
IN_TDINTERNAL="community"
if [[ "$SCRIPT_DIR" == *"$IN_TDINTERNAL"* ]]; then
cd ../../..
else
cd ../../
fi
TAOS_DIR=`pwd`
UDF1_DIR=`find $TAOS_DIR -name "libudf1.so"|grep lib|head -n1`
UDF2_DIR=`find $TAOS_DIR -name "libudf2.so"|grep lib|head -n1`
echo $UDF1_DIR
echo $UDF2_DIR
UDF_TMP=/tmp/udf
mkdir $UDF_TMP
rm $UDF_TMP/libudf1.so
rm $UDF_TMP/libudf2.so
echo "Copy udf shared library files to $UDF_TMP"
cp $UDF1_DIR $UDF_TMP
cp $UDF2_DIR $UDF_TMP
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c wallevel -v 2
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1
print ========= start dnode1 as LEADER
system sh/exec.sh -n dnode1 -s start
sleep 2000
sql connect
print ======== step1 udf
system sh/copy_udf.sh
sql create database udf vgroups 3;
sql use udf;
sql show databases;
sql create table t (ts timestamp, f int);
sql insert into t values(now, 1)(now+1s, 2);
sql create function udf1 as '/tmp/udf/libudf1.so' outputtype int bufSize 8;
sql create aggregate function udf2 as '/tmp/udf/libudf2.so' outputtype double bufSize 8;
sql show functions;
sql select udf1(f) from t;
if $rows != 2 then
return -1
endi
if $data00 != 88 then
return -1
endi
if $data10 != 88 then
return -1
endi
sql select udf2(f) from t;
if $rows != 1 then
return -1
endi
if $data00 != 2.236067977 then
return -1
endi
#sql drop function udf1;
#sql drop function udf2;
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册