/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #ifndef TDENGINE_TUDF_H #define TDENGINE_TUDF_H #include #include #include "tmsg.h" #include "tcommon.h" #include "function.h" #include "tdatablock.h" #ifdef __cplusplus extern "C" { #endif #define UDF_LISTEN_PIPE_NAME_LEN 32 #ifdef _WIN32 #define UDF_LISTEN_PIPE_NAME_PREFIX "\\\\?\\pipe\\udfd.sock" #else #define UDF_LISTEN_PIPE_NAME_PREFIX ".udfd.sock." #endif #define UDF_DNODE_ID_ENV_NAME "DNODE_ID" //====================================================================================== //begin API to taosd and qworker enum { UDFC_CODE_STOPPING = -1, UDFC_CODE_PIPE_READ_ERR = -2, UDFC_CODE_CONNECT_PIPE_ERR = -3, UDFC_CODE_LOAD_UDF_FAILURE = -4, UDFC_CODE_INVALID_STATE = -5 }; typedef void *UdfcFuncHandle; /** * setup udf * @param udf, in * @param handle, out * @return error code */ int32_t setupUdf(char udfName[], UdfcFuncHandle *handle); typedef struct SUdfColumnMeta { int16_t type; int32_t bytes; uint8_t precision; uint8_t scale; } SUdfColumnMeta; typedef struct SUdfColumnData { int32_t numOfRows; int32_t rowsAlloc; union { struct { int32_t nullBitmapLen; char *nullBitmap; int32_t dataLen; char *data; } fixLenCol; struct { int32_t varOffsetsLen; int32_t *varOffsets; int32_t payloadLen; char *payload; int32_t payloadAllocLen; } varLenCol; }; } SUdfColumnData; typedef struct SUdfColumn { SUdfColumnMeta colMeta; SUdfColumnData colData; } SUdfColumn; typedef struct SUdfDataBlock { int32_t numOfRows; int32_t numOfCols; SUdfColumn **udfCols; } SUdfDataBlock; typedef struct SUdfInterBuf { int32_t bufLen; char* buf; int8_t numOfResult; //zero or one } SUdfInterBuf; // output: interBuf int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf); // input: block, state // output: newState int32_t callUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState); // input: interBuf // output: resultData int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData); // input: interbuf1, interbuf2 // output: resultBuf int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf); // input: block // output: resultData int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output); /** * tearn down udf * @param handle * @return */ int32_t teardownUdf(UdfcFuncHandle handle); bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); int32_t udfAggProcess(struct SqlFunctionCtx *pCtx); int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock); // end API to taosd and qworker //============================================================================================================================= // begin API to UDF writer. // dynamic lib init and destroy typedef int32_t (*TUdfInitFunc)(); typedef int32_t (*TUdfDestroyFunc)(); //TODO: add API to check function arguments type, number etc. #define UDF_MEMORY_EXP_GROWTH 1.5 static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t newCapacity) { SUdfColumnMeta *meta = &pColumn->colMeta; SUdfColumnData *data = &pColumn->colData; if (newCapacity== 0 || newCapacity <= data->rowsAlloc) { return TSDB_CODE_SUCCESS; } int allocCapacity = TMAX(data->rowsAlloc, 8); while (allocCapacity < newCapacity) { allocCapacity *= UDF_MEMORY_EXP_GROWTH; } if (IS_VAR_DATA_TYPE(meta->type)) { char* tmp = taosMemoryRealloc(data->varLenCol.varOffsets, sizeof(int32_t) * allocCapacity); if (tmp == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } data->varLenCol.varOffsets = (int32_t*)tmp; data->varLenCol.varOffsetsLen = sizeof(int32_t) * allocCapacity; // for payload, add data in udfColDataAppend } else { char* tmp = taosMemoryRealloc(data->fixLenCol.nullBitmap, BitmapLen(allocCapacity)); if (tmp == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } data->fixLenCol.nullBitmap = tmp; data->fixLenCol.nullBitmapLen = BitmapLen(allocCapacity); if (meta->type == TSDB_DATA_TYPE_NULL) { return TSDB_CODE_SUCCESS; } tmp = taosMemoryRealloc(data->fixLenCol.data, allocCapacity* meta->bytes); if (tmp == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } data->fixLenCol.data = tmp; data->fixLenCol.dataLen = allocCapacity* meta->bytes; } data->rowsAlloc = allocCapacity; return TSDB_CODE_SUCCESS; } static FORCE_INLINE int32_t udfColSetRow(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) { SUdfColumnMeta *meta = &pColumn->colMeta; SUdfColumnData *data = &pColumn->colData; udfColEnsureCapacity(pColumn, currentRow+1); bool isVarCol = IS_VAR_DATA_TYPE(meta->type); if (isNull) { if (isVarCol) { data->varLenCol.varOffsets[currentRow] = -1; } else { colDataSetNull_f(data->fixLenCol.nullBitmap, currentRow); } } else { if (!isVarCol) { colDataSetNotNull_f(data->fixLenCol.nullBitmap, currentRow); memcpy(data->fixLenCol.data + meta->bytes * currentRow, pData, meta->bytes); } else { int32_t dataLen = varDataTLen(pData); if (meta->type == TSDB_DATA_TYPE_JSON) { if (*pData == TSDB_DATA_TYPE_NULL) { dataLen = 0; } else if (*pData == TSDB_DATA_TYPE_NCHAR) { dataLen = varDataTLen(pData + CHAR_BYTES); } else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) { dataLen = LONG_BYTES; } else if (*pData == TSDB_DATA_TYPE_BOOL) { dataLen = CHAR_BYTES; } dataLen += CHAR_BYTES; } if (data->varLenCol.payloadAllocLen < data->varLenCol.payloadLen + dataLen) { uint32_t newSize = data->varLenCol.payloadAllocLen; if (newSize <= 1) { newSize = 8; } while (newSize < data->varLenCol.payloadLen + dataLen) { newSize = newSize * UDF_MEMORY_EXP_GROWTH; } char *buf = taosMemoryRealloc(data->varLenCol.payload, newSize); if (buf == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } data->varLenCol.payload = buf; data->varLenCol.payloadAllocLen = newSize; } uint32_t len = data->varLenCol.payloadLen; data->varLenCol.varOffsets[currentRow] = len; memcpy(data->varLenCol.payload + len, pData, dataLen); data->varLenCol.payloadLen += dataLen; } } data->numOfRows = TMAX(currentRow + 1, data->numOfRows); return 0; } typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol); typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf); typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf); typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData); // end API to UDF writer //======================================================================================================================= #ifdef __cplusplus } #endif #endif // TDENGINE_TUDF_H