diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 3a98102661a347f6605c6d94dd53e8e32a5a5b9a..4b36b67e43b67a25c4531f7a6e7abfc810a27765 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -54,6 +54,11 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet); BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \ } while (0) +#define colDataSetNotNull_f(bm_, r_) \ + do { \ + BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \ + } while (0) + #define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1) #define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1) diff --git a/include/libs/function/tudf.h b/include/libs/function/tudf.h index 5207a73487d81c9bdb5a0e4f95c927345ee0dd26..22314e4bd667a9f4619386f01ba3633cde73581c 100644 --- a/include/libs/function/tudf.h +++ b/include/libs/function/tudf.h @@ -22,6 +22,7 @@ #include "tmsg.h" #include "tcommon.h" #include "function.h" +#include "tdatablock.h" #ifdef __cplusplus extern "C" { @@ -61,7 +62,7 @@ typedef struct SUdfColumnMeta { typedef struct SUdfColumnData { int32_t numOfRows; - bool varLengthColumn; + int32_t rowsAlloc; union { struct { int32_t nullBitmapLen; @@ -72,9 +73,10 @@ typedef struct SUdfColumnData { struct { int32_t varOffsetsLen; - char *varOffsets; + int32_t *varOffsets; int32_t payloadLen; char *payload; + int32_t payloadAllocLen; } varLenCol; }; } SUdfColumnData; @@ -131,9 +133,114 @@ typedef int32_t (*TUdfSetupFunc)(); typedef int32_t (*TUdfTeardownFunc)(); //TODO: add API to check function arguments type, number etc. -//TODO: another way to manage memory is provide api for UDF to add data to SUdfColumnData and UDF framework will allocate memory. -// then UDF framework will free the memory -// int32_t udfColDataAppend(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) + +#define UDF_MEMORY_EXP_GROWTH 1.5 + +static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t newCapacity) { + SUdfColumnMeta *meta = &pColumn->colMeta; + SUdfColumnData *data = &pColumn->colData; + + if (newCapacity== 0 || newCapacity <= data->rowsAlloc) { + return TSDB_CODE_SUCCESS; + } + + int allocCapacity = MAX(data->rowsAlloc, 8); + while (allocCapacity < newCapacity) { + allocCapacity *= UDF_MEMORY_EXP_GROWTH; + } + + if (IS_VAR_DATA_TYPE(meta->type)) { + char* tmp = taosMemoryRealloc(data->varLenCol.varOffsets, sizeof(int32_t) * allocCapacity); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + data->varLenCol.varOffsets = (int32_t*)tmp; + data->varLenCol.varOffsetsLen = sizeof(int32_t) * allocCapacity; + // for payload, add data in udfColDataAppend + } else { + char* tmp = taosMemoryRealloc(data->fixLenCol.nullBitmap, BitmapLen(allocCapacity)); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + data->fixLenCol.nullBitmap = tmp; + data->fixLenCol.nullBitmapLen = BitmapLen(allocCapacity); + if (meta->type == TSDB_DATA_TYPE_NULL) { + return TSDB_CODE_SUCCESS; + } + + tmp = taosMemoryRealloc(data->fixLenCol.data, allocCapacity* meta->bytes); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + data->fixLenCol.data = tmp; + data->fixLenCol.dataLen = allocCapacity* meta->bytes; + } + + data->rowsAlloc = allocCapacity; + + return TSDB_CODE_SUCCESS; +} + +static FORCE_INLINE int32_t udfColSetRow(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) { + SUdfColumnMeta *meta = &pColumn->colMeta; + SUdfColumnData *data = &pColumn->colData; + udfColEnsureCapacity(pColumn, currentRow+1); + bool isVarCol = IS_VAR_DATA_TYPE(meta->type); + if (isNull) { + if (isVarCol) { + data->varLenCol.varOffsets[currentRow] = -1; + } else { + colDataSetNull_f(data->fixLenCol.nullBitmap, currentRow); + } + } else { + if (!isVarCol) { + colDataSetNotNull_f(data->fixLenCol.nullBitmap, currentRow); + memcpy(data->fixLenCol.data + meta->bytes * currentRow, pData, meta->bytes); + } else { + int32_t dataLen = varDataTLen(pData); + if (meta->type == TSDB_DATA_TYPE_JSON) { + if (*pData == TSDB_DATA_TYPE_NULL) { + dataLen = 0; + } else if (*pData == TSDB_DATA_TYPE_NCHAR) { + dataLen = varDataTLen(pData + CHAR_BYTES); + } else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) { + dataLen = LONG_BYTES; + } else if (*pData == TSDB_DATA_TYPE_BOOL) { + dataLen = CHAR_BYTES; + } + dataLen += CHAR_BYTES; + } + + if (data->varLenCol.payloadAllocLen < data->varLenCol.payloadLen + dataLen) { + uint32_t newSize = data->varLenCol.payloadAllocLen; + if (newSize <= 1) { + newSize = 8; + } + + while (newSize < data->varLenCol.payloadLen + dataLen) { + newSize = newSize * UDF_MEMORY_EXP_GROWTH; + } + + char *buf = taosMemoryRealloc(data->varLenCol.payload, newSize); + if (buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + data->varLenCol.payload = buf; + data->varLenCol.payloadAllocLen = newSize; + } + + uint32_t len = data->varLenCol.payloadLen; + data->varLenCol.varOffsets[currentRow] = len; + + memcpy(data->varLenCol.payload + len, pData, dataLen); + data->varLenCol.payloadLen += dataLen; + } + } + data->numOfRows = MAX(currentRow + 1, data->numOfRows); + return 0; +} typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column); typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol); diff --git a/source/libs/function/CMakeLists.txt b/source/libs/function/CMakeLists.txt index 15813b3cb05cc796594dd7be27ad10a0557359b7..c31cabda1960780331650c1db90912db3d314632 100644 --- a/source/libs/function/CMakeLists.txt +++ b/source/libs/function/CMakeLists.txt @@ -48,8 +48,7 @@ target_include_directories( PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) target_link_libraries( - udf1 PUBLIC os -) + udf1 PUBLIC os) add_library(udf2 MODULE test/udf2.c) target_include_directories( diff --git a/source/libs/function/inc/tudfInt.h b/source/libs/function/inc/tudfInt.h index 8aedd59c335f2876a5fc4c6a9f958f4ea81dc464..6f82542aee6966d8b4fd7c613263a0639be190a3 100644 --- a/source/libs/function/inc/tudfInt.h +++ b/source/libs/function/inc/tudfInt.h @@ -19,9 +19,6 @@ extern "C" { #endif -//TODO replaces them with fnDebug -//#define debugPrint(...) taosPrintLog("Function", DEBUG_INFO, 135, __VA_ARGS__) -#define debugPrint(...) {fprintf(stderr, __VA_ARGS__);fprintf(stderr, "\n");} enum { UDF_TASK_SETUP = 0, UDF_TASK_CALL = 1, @@ -107,7 +104,7 @@ void* decodeUdfRequest(const void *buf, SUdfRequest* request); int32_t encodeUdfResponse(void **buf, const SUdfResponse *response); void* decodeUdfResponse(const void* buf, SUdfResponse *response); -void freeUdfColumnData(SUdfColumnData *data); +void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta); void freeUdfColumn(SUdfColumn* col); void freeUdfDataDataBlock(SUdfDataBlock *block); diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 9aac2bfe0cce2587f3c45ed9957afac40ed746dd..5b1573c88f91bc2d4dd0aa95f0053dc443dfea0e 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -481,8 +481,8 @@ void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) { return (void*)buf; } -void freeUdfColumnData(SUdfColumnData *data) { - if (data->varLengthColumn) { +void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta) { + if (IS_VAR_DATA_TYPE(meta->type)) { taosMemoryFree(data->varLenCol.varOffsets); data->varLenCol.varOffsets = NULL; taosMemoryFree(data->varLenCol.payload); @@ -496,7 +496,7 @@ void freeUdfColumnData(SUdfColumnData *data) { } void freeUdfColumn(SUdfColumn* col) { - freeUdfColumnData(&col->colData); + freeUdfColumnData(&col->colData, &col->colMeta); } void freeUdfDataDataBlock(SUdfDataBlock *block) { @@ -528,8 +528,7 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo udfCol->colMeta.scale = col->info.scale; udfCol->colMeta.precision = col->info.precision; udfCol->colData.numOfRows = udfBlock->numOfRows; - udfCol->colData.varLengthColumn = IS_VAR_DATA_TYPE(udfCol->colMeta.type); - if (udfCol->colData.varLengthColumn) { + if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) { udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows; udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen); memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen); @@ -555,7 +554,7 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { block->info.numOfCols = 1; block->info.rows = udfCol->colData.numOfRows; - block->info.hasVarCol = udfCol->colData.varLengthColumn; + block->info.hasVarCol = IS_VAR_DATA_TYPE(udfCol->colMeta.type); block->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); taosArraySetSize(block->pDataBlock, 1); diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 06fa49e1c2d85b123522efe1fc7a2cf8880a5fa6..ba9fca2969bf6d114936822113173155e93af4e4 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -75,8 +75,8 @@ typedef struct SUdf { char path[PATH_MAX]; uv_lib_t lib; + TUdfScalarProcFunc scalarProcFunc; - TUdfFreeUdfColumnFunc freeUdfColumn; TUdfAggStartFunc aggStartFunc; TUdfAggProcessFunc aggProcFunc; @@ -106,11 +106,6 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; strcpy(processFuncName, udfName); uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->scalarProcFunc)); - char freeFuncName[TSDB_FUNC_NAME_LEN + 5] = {0}; - char *freeSuffix = "_free"; - strncpy(freeFuncName, processFuncName, strlen(processFuncName)); - strncat(freeFuncName, freeSuffix, strlen(freeSuffix)); - uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn)); } else if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) { char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; strcpy(processFuncName, udfName); @@ -215,7 +210,7 @@ void udfdProcessRequest(uv_work_t *req) { udf->scalarProcFunc(&input, &output); convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); - udf->freeUdfColumn(&output); + freeUdfColumn(&output); break; } case TSDB_UDF_CALL_AGG_INIT: { diff --git a/source/libs/function/test/udf1.c b/source/libs/function/test/udf1.c index b2367313ae2fbada9f3cc4abb30b5d1cca8a3c88..4384d326cbbdc64f9713c03f75d625707cb22809 100644 --- a/source/libs/function/test/udf1.c +++ b/source/libs/function/test/udf1.c @@ -18,52 +18,20 @@ int32_t udf1_destroy() { } int32_t udf1(SUdfDataBlock* block, SUdfColumn *resultCol) { - SUdfColumnData *resultData = &resultCol->colData; - resultData->numOfRows = block->numOfRows; - SUdfColumnData *srcData = &block->udfCols[0]->colData; - resultData->varLengthColumn = srcData->varLengthColumn; - - if (resultData->varLengthColumn) { - resultData->varLenCol.varOffsetsLen = srcData->varLenCol.varOffsetsLen; - resultData->varLenCol.varOffsets = malloc(resultData->varLenCol.varOffsetsLen); - memcpy(resultData->varLenCol.varOffsets, srcData->varLenCol.varOffsets, srcData->varLenCol.varOffsetsLen); - - resultData->varLenCol.payloadLen = srcData->varLenCol.payloadLen; - resultData->varLenCol.payload = malloc(resultData->varLenCol.payloadLen); - memcpy(resultData->varLenCol.payload, srcData->varLenCol.payload, srcData->varLenCol.payloadLen); - } else { - resultData->fixLenCol.nullBitmapLen = srcData->fixLenCol.nullBitmapLen; - resultData->fixLenCol.nullBitmap = malloc(resultData->fixLenCol.nullBitmapLen); - memcpy(resultData->fixLenCol.nullBitmap, srcData->fixLenCol.nullBitmap, srcData->fixLenCol.nullBitmapLen); - - resultData->fixLenCol.dataLen = srcData->fixLenCol.dataLen; - resultData->fixLenCol.data = malloc(resultData->fixLenCol.dataLen); - memcpy(resultData->fixLenCol.data, srcData->fixLenCol.data, srcData->fixLenCol.dataLen); - for (int32_t i = 0; i < resultData->numOfRows; ++i) { - *(resultData->fixLenCol.data + i * sizeof(int32_t)) = 88; - } - } - SUdfColumnMeta *meta = &resultCol->colMeta; meta->bytes = 4; meta->type = TSDB_DATA_TYPE_INT; meta->scale = 0; meta->precision = 0; - return 0; -} -int32_t udf1_free(SUdfColumn *col) { - SUdfColumnData *data = &col->colData; - if (data->varLengthColumn) { - free(data->varLenCol.varOffsets); - data->varLenCol.varOffsets = NULL; - free(data->varLenCol.payload); - data->varLenCol.payload = NULL; - } else { - free(data->fixLenCol.nullBitmap); - data->fixLenCol.nullBitmap = NULL; - free(data->fixLenCol.data); - data->fixLenCol.data = NULL; + SUdfColumnData *resultData = &resultCol->colData; + resultData->numOfRows = block->numOfRows; + SUdfColumnData *srcData = &block->udfCols[0]->colData; + + for (int32_t i = 0; i < resultData->numOfRows; ++i) { + int32_t luckyNum = 88; + udfColSetRow(resultCol, i, (char*)&luckyNum, false); } + return 0; } \ No newline at end of file