/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ // clang-format off #include "uv.h" #include "os.h" #include "fnLog.h" #include "thash.h" #include "tudf.h" #include "tudfInt.h" #include "version.h" #include "tdatablock.h" #include "tdataformat.h" #include "tglobal.h" #include "tmsg.h" #include "trpc.h" #include "tmisce.h" // clang-format on #define UDFD_MAX_SCRIPT_PLUGINS 64 #define UDFD_MAX_SCRIPT_TYPE 1 #define UDFD_MAX_PLUGIN_FUNCS 9 typedef struct SUdfCPluginCtx { uv_lib_t lib; TUdfScalarProcFunc scalarProcFunc; TUdfAggStartFunc aggStartFunc; TUdfAggProcessFunc aggProcFunc; TUdfAggFinishFunc aggFinishFunc; TUdfAggMergeFunc aggMergeFunc; TUdfInitFunc initFunc; TUdfDestroyFunc destroyFunc; } SUdfCPluginCtx; int32_t udfdCPluginOpen(SScriptUdfEnvItem *items, int numItems) { return 0; } int32_t udfdCPluginClose() { return 0; } const char *udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) { char initFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; char *initSuffix = "_init"; snprintf(initFuncName, sizeof(initFuncName), "%s%s", udfName, initSuffix); uv_dlsym(&udfCtx->lib, initFuncName, (void **)(&udfCtx->initFunc)); char destroyFuncName[TSDB_FUNC_NAME_LEN + 9] = {0}; char *destroySuffix = "_destroy"; snprintf(destroyFuncName, sizeof(destroyFuncName), "%s%s", udfName, destroySuffix); uv_dlsym(&udfCtx->lib, destroyFuncName, (void **)(&udfCtx->destroyFunc)); return udfName; } void udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) { char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; strncpy(processFuncName, udfName, sizeof(processFuncName)); uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->aggProcFunc)); char startFuncName[TSDB_FUNC_NAME_LEN + 7] = {0}; char *startSuffix = "_start"; snprintf(startFuncName, sizeof(startFuncName), "%s%s", processFuncName, startSuffix); uv_dlsym(&udfCtx->lib, startFuncName, (void **)(&udfCtx->aggStartFunc)); char finishFuncName[TSDB_FUNC_NAME_LEN + 8] = {0}; char *finishSuffix = "_finish"; snprintf(finishFuncName, sizeof(finishFuncName), "%s%s", processFuncName, finishSuffix); uv_dlsym(&udfCtx->lib, finishFuncName, (void **)(&udfCtx->aggFinishFunc)); char mergeFuncName[TSDB_FUNC_NAME_LEN + 7] = {0}; char *mergeSuffix = "_merge"; snprintf(mergeFuncName, sizeof(mergeFuncName), "%s%s", processFuncName, mergeSuffix); uv_dlsym(&udfCtx->lib, mergeFuncName, (void **)(&udfCtx->aggMergeFunc)); } int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) { int32_t err = 0; SUdfCPluginCtx *udfCtx = taosMemoryCalloc(1, sizeof(SUdfCPluginCtx)); err = uv_dlopen(udf->path, &udfCtx->lib); if (err != 0) { fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); taosMemoryFree(udfCtx); return TSDB_CODE_UDF_LOAD_UDF_FAILURE; } const char *udfName = udf->name; udfdCPluginUdfInitLoadInitDestoryFuncs(udfCtx, udfName); if (udf->funcType == UDF_FUNC_TYPE_SCALAR) { char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; strncpy(processFuncName, udfName, sizeof(processFuncName)); uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->scalarProcFunc)); } else if (udf->funcType == UDF_FUNC_TYPE_AGG) { udfdCPluginUdfInitLoadAggFuncs(udfCtx, udfName); } int32_t code = 0; if (udfCtx->initFunc) { code = (udfCtx->initFunc)(); if (code != 0) { uv_dlclose(&udfCtx->lib); taosMemoryFree(udfCtx); return code; } } *pUdfCtx = udfCtx; return 0; } int32_t udfdCPluginUdfDestroy(void *udfCtx) { SUdfCPluginCtx *ctx = udfCtx; int32_t code = 0; if (ctx->destroyFunc) { code = (ctx->destroyFunc)(); } uv_dlclose(&ctx->lib); taosMemoryFree(ctx); return code; } int32_t udfdCPluginUdfScalarProc(SUdfDataBlock *block, SUdfColumn *resultCol, void *udfCtx) { SUdfCPluginCtx *ctx = udfCtx; if (ctx->scalarProcFunc) { return ctx->scalarProcFunc(block, resultCol); } else { fnError("udfd c plugin scalar proc not implemented"); return TSDB_CODE_UDF_FUNC_EXEC_FAILURE; } } int32_t udfdCPluginUdfAggStart(SUdfInterBuf *buf, void *udfCtx) { SUdfCPluginCtx *ctx = udfCtx; if (ctx->aggStartFunc) { return ctx->aggStartFunc(buf); } else { fnError("udfd c plugin aggregation start not implemented"); return TSDB_CODE_UDF_FUNC_EXEC_FAILURE; } return 0; } int32_t udfdCPluginUdfAggProc(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf, void *udfCtx) { SUdfCPluginCtx *ctx = udfCtx; if (ctx->aggProcFunc) { return ctx->aggProcFunc(block, interBuf, newInterBuf); } else { fnError("udfd c plugin aggregation process not implemented"); return TSDB_CODE_UDF_FUNC_EXEC_FAILURE; } } int32_t udfdCPluginUdfAggMerge(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf, void *udfCtx) { SUdfCPluginCtx *ctx = udfCtx; if (ctx->aggMergeFunc) { return ctx->aggMergeFunc(inputBuf1, inputBuf2, outputBuf); } else { fnError("udfd c plugin aggregation merge not implemented"); return TSDB_CODE_UDF_FUNC_EXEC_FAILURE; } } int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, void *udfCtx) { SUdfCPluginCtx *ctx = udfCtx; if (ctx->aggFinishFunc) { return ctx->aggFinishFunc(buf, resultData); } else { fnError("udfd c plugin aggregation finish not implemented"); return TSDB_CODE_UDF_FUNC_EXEC_FAILURE; } return 0; } // for c, the function pointer are filled directly and libloaded = true; // for others, dlopen/dlsym to find function pointers typedef struct SUdfScriptPlugin { int8_t scriptType; char libPath[PATH_MAX]; bool libLoaded; uv_lib_t lib; TScriptUdfScalarProcFunc udfScalarProcFunc; TScriptUdfAggStartFunc udfAggStartFunc; TScriptUdfAggProcessFunc udfAggProcFunc; TScriptUdfAggMergeFunc udfAggMergeFunc; TScriptUdfAggFinishFunc udfAggFinishFunc; TScriptUdfInitFunc udfInitFunc; TScriptUdfDestoryFunc udfDestroyFunc; TScriptOpenFunc openFunc; TScriptCloseFunc closeFunc; } SUdfScriptPlugin; typedef struct SUdfdContext { uv_loop_t *loop; uv_pipe_t ctrlPipe; uv_signal_t intrSignal; char listenPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2]; uv_pipe_t listeningPipe; void *clientRpc; SCorEpSet mgmtEp; uv_mutex_t udfsMutex; SHashObj *udfsHash; uv_mutex_t scriptPluginsMutex; SUdfScriptPlugin *scriptPlugins[UDFD_MAX_SCRIPT_PLUGINS]; SArray *residentFuncs; char udfDataDir[PATH_MAX]; bool printVersion; } SUdfdContext; SUdfdContext global; struct SUdfdUvConn; struct SUvUdfWork; typedef struct SUdfdUvConn { uv_stream_t *client; char *inputBuf; int32_t inputLen; int32_t inputCap; int32_t inputTotal; struct SUvUdfWork *pWorkList; // head of work list } SUdfdUvConn; typedef struct SUvUdfWork { SUdfdUvConn *conn; uv_buf_t input; uv_buf_t output; struct SUvUdfWork *pWorkNext; } SUvUdfWork; typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY } EUdfState; typedef struct SUdf { char name[TSDB_FUNC_NAME_LEN + 1]; int32_t version; int64_t createdTime; int8_t funcType; int8_t scriptType; int8_t outputType; int32_t outputLen; int32_t bufSize; char path[PATH_MAX]; int32_t refCount; EUdfState state; uv_mutex_t lock; uv_cond_t condReady; bool resident; SUdfScriptPlugin *scriptPlugin; void *scriptUdfCtx; int64_t lastFetchTime; // last fetch time in milliseconds bool expired; } SUdf; typedef struct SUdfcFuncHandle { SUdf *udf; } SUdfcFuncHandle; typedef enum EUdfdRpcReqRspType { UDFD_RPC_MNODE_CONNECT = 0, UDFD_RPC_RETRIVE_FUNC, } EUdfdRpcReqRspType; typedef struct SUdfdRpcSendRecvInfo { EUdfdRpcReqRspType rpcType; int32_t code; void *param; uv_sem_t resultSem; } SUdfdRpcSendRecvInfo; static void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf); static int32_t udfdConnectToMnode(); static bool udfdRpcRfp(int32_t code, tmsg_t msgType); static int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); static int32_t udfdOpenClientRpc(); static int32_t udfdCloseClientRpc(); static void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request); static void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request); static void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request); static void udfdProcessRequest(uv_work_t *req); static void udfdOnWrite(uv_write_t *req, int status); static void udfdSendResponse(uv_work_t *work, int status); static void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf); static bool isUdfdUvMsgComplete(SUdfdUvConn *pipe); static void udfdHandleRequest(SUdfdUvConn *conn); static void udfdPipeCloseCb(uv_handle_t *pipe); static void udfdUvHandleError(SUdfdUvConn *conn) { uv_close((uv_handle_t *)conn->client, udfdPipeCloseCb); } static void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf); static void udfdOnNewConnection(uv_stream_t *server, int status); static void udfdIntrSignalHandler(uv_signal_t *handle, int signum); static int32_t removeListeningPipe(); static void udfdPrintVersion(); static int32_t udfdParseArgs(int32_t argc, char *argv[]); static int32_t udfdInitLog(); static void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf); static void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf); static int32_t udfdUvInit(); static void udfdCloseWalkCb(uv_handle_t *handle, void *arg); static int32_t udfdRun(); static void udfdConnectMnodeThreadFunc(void *args); SUdf *udfdNewUdf(const char *udfName); void udfdGetFuncBodyPath(const SUdf *udf, char *path); void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB; plugin->openFunc = udfdCPluginOpen; plugin->closeFunc = udfdCPluginClose; plugin->udfInitFunc = udfdCPluginUdfInit; plugin->udfDestroyFunc = udfdCPluginUdfDestroy; plugin->udfScalarProcFunc = udfdCPluginUdfScalarProc; plugin->udfAggStartFunc = udfdCPluginUdfAggStart; plugin->udfAggProcFunc = udfdCPluginUdfAggProc; plugin->udfAggMergeFunc = udfdCPluginUdfAggMerge; plugin->udfAggFinishFunc = udfdCPluginUdfAggFinish; SScriptUdfEnvItem items[1] = {{"LD_LIBRARY_PATH", tsUdfdLdLibPath}}; plugin->openFunc(items, 1); return; } int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], void **func[], int numOfFuncs) { int err = uv_dlopen(libPath, pLib); if (err != 0) { fnError("can not load library %s. error: %s", libPath, uv_strerror(err)); return TSDB_CODE_UDF_LOAD_UDF_FAILURE; } for (int i = 0; i < numOfFuncs; ++i) { err = uv_dlsym(pLib, funcName[i], func[i]); if (err != 0) { fnError("load library function failed. lib %s function %s", libPath, funcName[i]); } } return 0; } int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON; // todo: windows support sprintf(plugin->libPath, "%s", "libtaospyudf.so"); plugin->libLoaded = false; const char *funcName[UDFD_MAX_PLUGIN_FUNCS] = {"pyOpen", "pyClose", "pyUdfInit", "pyUdfDestroy", "pyUdfScalarProc", "pyUdfAggStart", "pyUdfAggFinish", "pyUdfAggProc", "pyUdfAggMerge"}; void **funcs[UDFD_MAX_PLUGIN_FUNCS] = { (void **)&plugin->openFunc, (void **)&plugin->closeFunc, (void **)&plugin->udfInitFunc, (void **)&plugin->udfDestroyFunc, (void **)&plugin->udfScalarProcFunc, (void **)&plugin->udfAggStartFunc, (void **)&plugin->udfAggFinishFunc, (void **)&plugin->udfAggProcFunc, (void **)&plugin->udfAggMergeFunc}; int32_t err = udfdLoadSharedLib(plugin->libPath, &plugin->lib, funcName, funcs, UDFD_MAX_PLUGIN_FUNCS); if (err != 0) { fnError("can not load python plugin. lib path %s", plugin->libPath); return err; } if (plugin->openFunc) { int16_t lenPythonPath = strlen(tsUdfdLdLibPath) + strlen(global.udfDataDir) + 1 + 1; // global.udfDataDir:tsUdfdLdLibPath char *pythonPath = taosMemoryMalloc(lenPythonPath); #ifdef WINDOWS snprintf(pythonPath, lenPythonPath, "%s;%s", global.udfDataDir, tsUdfdLdLibPath); #else snprintf(pythonPath, lenPythonPath, "%s:%s", global.udfDataDir, tsUdfdLdLibPath); #endif SScriptUdfEnvItem items[] = {{"PYTHONPATH", pythonPath}, {"LOGDIR", tsLogDir}}; err = plugin->openFunc(items, 2); taosMemoryFree(pythonPath); } if (err != 0) { fnError("udf script python plugin open func failed. error: %d", err); uv_dlclose(&plugin->lib); return err; } plugin->libLoaded = true; return 0; } void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) { if (plugin->closeFunc) { plugin->closeFunc(); } plugin->openFunc = NULL; plugin->closeFunc = NULL; plugin->udfInitFunc = NULL; plugin->udfDestroyFunc = NULL; plugin->udfScalarProcFunc = NULL; plugin->udfAggStartFunc = NULL; plugin->udfAggProcFunc = NULL; plugin->udfAggMergeFunc = NULL; plugin->udfAggFinishFunc = NULL; return; } void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) { if (plugin->closeFunc) { plugin->closeFunc(); } uv_dlclose(&plugin->lib); if (plugin->libLoaded) { plugin->libLoaded = false; } plugin->openFunc = NULL; plugin->closeFunc = NULL; plugin->udfInitFunc = NULL; plugin->udfDestroyFunc = NULL; plugin->udfScalarProcFunc = NULL; plugin->udfAggStartFunc = NULL; plugin->udfAggProcFunc = NULL; plugin->udfAggMergeFunc = NULL; plugin->udfAggFinishFunc = NULL; } int32_t udfdInitScriptPlugin(int8_t scriptType) { SUdfScriptPlugin *plugin = taosMemoryCalloc(1, sizeof(SUdfScriptPlugin)); switch (scriptType) { case TSDB_FUNC_SCRIPT_BIN_LIB: udfdInitializeCPlugin(plugin); break; case TSDB_FUNC_SCRIPT_PYTHON: { int32_t err = udfdInitializePythonPlugin(plugin); if (err != 0) { taosMemoryFree(plugin); return err; } break; } default: fnError("udf script type %d not supported", scriptType); taosMemoryFree(plugin); return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED; } global.scriptPlugins[scriptType] = plugin; return TSDB_CODE_SUCCESS; } void udfdDeinitScriptPlugins() { SUdfScriptPlugin *plugin = NULL; plugin = global.scriptPlugins[TSDB_FUNC_SCRIPT_PYTHON]; if (plugin != NULL) { udfdDeinitPythonPlugin(plugin); taosMemoryFree(plugin); } plugin = global.scriptPlugins[TSDB_FUNC_SCRIPT_BIN_LIB]; if (plugin != NULL) { udfdDeinitCPlugin(plugin); taosMemoryFree(plugin); } return; } void udfdProcessRequest(uv_work_t *req) { SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data); SUdfRequest request = {0}; decodeUdfRequest(uvUdf->input.base, &request); switch (request.type) { case UDF_TASK_SETUP: { udfdProcessSetupRequest(uvUdf, &request); break; } case UDF_TASK_CALL: { udfdProcessCallRequest(uvUdf, &request); break; } case UDF_TASK_TEARDOWN: { udfdProcessTeardownRequest(uvUdf, &request); break; } default: { break; } } } void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) { udfInfo->bufSize = udf->bufSize; if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) { udfInfo->funcType = UDF_FUNC_TYPE_AGG; } else if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) { udfInfo->funcType = UDF_FUNC_TYPE_SCALAR; } udfInfo->name = udf->name; udfInfo->version = udf->version; udfInfo->createdTime = udf->createdTime; udfInfo->outputLen = udf->outputLen; udfInfo->outputType = udf->outputType; udfInfo->path = udf->path; udfInfo->scriptType = udf->scriptType; } int32_t udfdInitUdf(char *udfName, SUdf *udf) { int32_t err = 0; err = udfdFillUdfInfoFromMNode(global.clientRpc, udfName, udf); if (err != 0) { fnError("can not retrieve udf from mnode. udf name %s", udfName); return TSDB_CODE_UDF_LOAD_UDF_FAILURE; } if (udf->scriptType > UDFD_MAX_SCRIPT_TYPE) { fnError("udf name %s script type %d not supported", udfName, udf->scriptType); return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED; } uv_mutex_lock(&global.scriptPluginsMutex); SUdfScriptPlugin *scriptPlugin = global.scriptPlugins[udf->scriptType]; if (scriptPlugin == NULL) { err = udfdInitScriptPlugin(udf->scriptType); if (err != 0) { uv_mutex_unlock(&global.scriptPluginsMutex); return err; } } uv_mutex_unlock(&global.scriptPluginsMutex); udf->scriptPlugin = global.scriptPlugins[udf->scriptType]; SScriptUdfInfo info = {0}; convertUdf2UdfInfo(udf, &info); err = udf->scriptPlugin->udfInitFunc(&info, &udf->scriptUdfCtx); if (err != 0) { fnError("udf name %s init failed. error %d", udfName, err); return err; } fnInfo("udf init succeeded. name %s type %d context %p", udf->name, udf->scriptType, (void *)udf->scriptUdfCtx); return 0; } SUdf *udfdNewUdf(const char *udfName) { SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf)); udfNew->refCount = 1; udfNew->lastFetchTime = taosGetTimestampMs(); strncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN); udfNew->state = UDF_STATE_INIT; uv_mutex_init(&udfNew->lock); uv_cond_init(&udfNew->condReady); udfNew->resident = false; udfNew->expired = false; for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) { char *funcName = taosArrayGet(global.residentFuncs, i); if (strcmp(udfName, funcName) == 0) { udfNew->resident = true; break; } } return udfNew; } SUdf *udfdGetOrCreateUdf(const char *udfName) { uv_mutex_lock(&global.udfsMutex); SUdf **pUdfHash = taosHashGet(global.udfsHash, udfName, strlen(udfName)); int64_t currTime = taosGetTimestampMs(); bool expired = false; if (pUdfHash) { expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000; // 10s if (!expired) { ++(*pUdfHash)->refCount; SUdf *udf = *pUdfHash; uv_mutex_unlock(&global.udfsMutex); fnInfo("udfd reuse existing udf. udf %s udf version %d, udf created time %" PRIx64, udf->name, udf->version, udf->createdTime); return udf; } else { (*pUdfHash)->expired = true; fnInfo("udfd expired, check for new version. existing udf %s udf version %d, udf created time %" PRIx64, (*pUdfHash)->name, (*pUdfHash)->version, (*pUdfHash)->createdTime); taosHashRemove(global.udfsHash, udfName, strlen(udfName)); } } SUdf *udf = udfdNewUdf(udfName); SUdf **pUdf = &udf; taosHashPut(global.udfsHash, udfName, strlen(udfName), pUdf, POINTER_BYTES); uv_mutex_unlock(&global.udfsMutex); return udf; } void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { // TODO: tracable id from client. connect, setup, call, teardown fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName); SUdfSetupRequest *setup = &request->setup; int32_t code = TSDB_CODE_SUCCESS; SUdf *udf = NULL; udf = udfdGetOrCreateUdf(setup->udfName); uv_mutex_lock(&udf->lock); if (udf->state == UDF_STATE_INIT) { udf->state = UDF_STATE_LOADING; code = udfdInitUdf(setup->udfName, udf); if (code == 0) { udf->state = UDF_STATE_READY; } else { udf->state = UDF_STATE_INIT; } uv_cond_broadcast(&udf->condReady); uv_mutex_unlock(&udf->lock); } else { while (udf->state == UDF_STATE_LOADING) { uv_cond_wait(&udf->condReady, &udf->lock); } uv_mutex_unlock(&udf->lock); } SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle)); handle->udf = udf; SUdfResponse rsp; rsp.seqNum = request->seqNum; rsp.type = request->type; rsp.code = (code != 0) ? TSDB_CODE_UDF_FUNC_EXEC_FAILURE : 0; rsp.setupRsp.udfHandle = (int64_t)(handle); rsp.setupRsp.outputType = udf->outputType; rsp.setupRsp.bytes = udf->outputLen; rsp.setupRsp.bufSize = udf->bufSize; int32_t len = encodeUdfResponse(NULL, &rsp); rsp.msgLen = len; void *bufBegin = taosMemoryMalloc(len); void *buf = bufBegin; encodeUdfResponse(&buf, &rsp); uvUdf->output = uv_buf_init(bufBegin, len); taosMemoryFree(uvUdf->input.base); return; } void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { SUdfCallRequest *call = &request->call; fnDebug("call request. call type %d, handle: %" PRIx64 ", seq num %" PRId64, call->callType, call->udfHandle, request->seqNum); SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle); SUdf *udf = handle->udf; SUdfResponse response = {0}; SUdfResponse *rsp = &response; SUdfCallResponse *subRsp = &rsp->callRsp; int32_t code = TSDB_CODE_SUCCESS; switch (call->callType) { case TSDB_UDF_CALL_SCALA_PROC: { SUdfColumn output = {0}; output.colMeta.bytes = udf->outputLen; output.colMeta.type = udf->outputType; output.colMeta.precision = 0; output.colMeta.scale = 0; udfColEnsureCapacity(&output, call->block.info.rows); SUdfDataBlock input = {0}; convertDataBlockToUdfDataBlock(&call->block, &input); code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx); freeUdfDataDataBlock(&input); convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); freeUdfColumn(&output); break; } case TSDB_UDF_CALL_AGG_INIT: { SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; code = udf->scriptPlugin->udfAggStartFunc(&outBuf, udf->scriptUdfCtx); subRsp->resultBuf = outBuf; break; } case TSDB_UDF_CALL_AGG_PROC: { SUdfDataBlock input = {0}; convertDataBlockToUdfDataBlock(&call->block, &input); SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; code = udf->scriptPlugin->udfAggProcFunc(&input, &call->interBuf, &outBuf, udf->scriptUdfCtx); freeUdfInterBuf(&call->interBuf); freeUdfDataDataBlock(&input); subRsp->resultBuf = outBuf; break; } case TSDB_UDF_CALL_AGG_MERGE: { SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; code = udf->scriptPlugin->udfAggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf, udf->scriptUdfCtx); freeUdfInterBuf(&call->interBuf); freeUdfInterBuf(&call->interBuf2); subRsp->resultBuf = outBuf; break; } case TSDB_UDF_CALL_AGG_FIN: { SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; code = udf->scriptPlugin->udfAggFinishFunc(&call->interBuf, &outBuf, udf->scriptUdfCtx); freeUdfInterBuf(&call->interBuf); subRsp->resultBuf = outBuf; break; } default: break; } rsp->seqNum = request->seqNum; rsp->type = request->type; rsp->code = (code != 0) ? TSDB_CODE_UDF_FUNC_EXEC_FAILURE : 0; subRsp->callType = call->callType; int32_t len = encodeUdfResponse(NULL, rsp); rsp->msgLen = len; void *bufBegin = taosMemoryMalloc(len); void *buf = bufBegin; encodeUdfResponse(&buf, rsp); uvUdf->output = uv_buf_init(bufBegin, len); switch (call->callType) { case TSDB_UDF_CALL_SCALA_PROC: { blockDataFreeRes(&call->block); blockDataFreeRes(&subRsp->resultData); break; } case TSDB_UDF_CALL_AGG_INIT: { freeUdfInterBuf(&subRsp->resultBuf); break; } case TSDB_UDF_CALL_AGG_PROC: { blockDataFreeRes(&call->block); freeUdfInterBuf(&subRsp->resultBuf); break; } case TSDB_UDF_CALL_AGG_MERGE: { freeUdfInterBuf(&subRsp->resultBuf); break; } case TSDB_UDF_CALL_AGG_FIN: { freeUdfInterBuf(&subRsp->resultBuf); break; } default: break; } taosMemoryFree(uvUdf->input.base); return; } void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { SUdfTeardownRequest *teardown = &request->teardown; fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle); SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle); SUdf *udf = handle->udf; bool unloadUdf = false; int32_t code = TSDB_CODE_SUCCESS; uv_mutex_lock(&global.udfsMutex); udf->refCount--; if (udf->refCount == 0 && (!udf->resident || udf->expired)) { unloadUdf = true; taosHashRemove(global.udfsHash, udf->name, strlen(udf->name)); } uv_mutex_unlock(&global.udfsMutex); if (unloadUdf) { fnInfo("udf teardown. udf name: %s type %d: context %p", udf->name, udf->scriptType, (void *)(udf->scriptUdfCtx)); uv_cond_destroy(&udf->condReady); uv_mutex_destroy(&udf->lock); code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx); fnDebug("udfd destroy function returns %d", code); taosMemoryFree(udf); } taosMemoryFree(handle); SUdfResponse response = {0}; SUdfResponse *rsp = &response; rsp->seqNum = request->seqNum; rsp->type = request->type; rsp->code = code; int32_t len = encodeUdfResponse(NULL, rsp); rsp->msgLen = len; void *bufBegin = taosMemoryMalloc(len); void *buf = bufBegin; encodeUdfResponse(&buf, rsp); uvUdf->output = uv_buf_init(bufBegin, len); taosMemoryFree(uvUdf->input.base); return; } void udfdGetFuncBodyPath(const SUdf *udf, char *path) { if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) { #ifdef WINDOWS snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".dll", global.udfDataDir, udf->name, udf->version, udf->createdTime); #else snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", global.udfDataDir, udf->name, udf->version, udf->createdTime); #endif } else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) { #ifdef WINDOWS snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime); #else snprintf(path, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime); #endif } else { #ifdef WINDOWS snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime); #else snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime); #endif } } int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) { if (!osDataSpaceAvailable()) { terrno = TSDB_CODE_NO_DISKSPACE; fnError("udfd create shared library failed since %s", terrstr(terrno)); return terrno; } char path[PATH_MAX] = {0}; udfdGetFuncBodyPath(udf, path); bool fileExist = !(taosStatFile(path, NULL, NULL) < 0); if (fileExist) { strncpy(udf->path, path, PATH_MAX); fnInfo("udfd func body file. reuse existing file %s", path); return TSDB_CODE_SUCCESS; } TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); if (file == NULL) { fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno)); return TSDB_CODE_FILE_CORRUPTED; } int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); if (count != pFuncInfo->codeSize) { fnError("udfd write udf shared library failed"); return TSDB_CODE_FILE_CORRUPTED; } taosCloseFile(&file); strncpy(udf->path, path, PATH_MAX); return TSDB_CODE_SUCCESS; } void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle; if (pEpSet) { if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) { updateEpSet_s(&global.mgmtEp, pEpSet); } } if (pMsg->code != TSDB_CODE_SUCCESS) { fnError("udfd rpc error. code: %s", tstrerror(pMsg->code)); msgInfo->code = pMsg->code; goto _return; } if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) { SConnectRsp connectRsp = {0}; tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp); int32_t now = taosGetTimestampSec(); int32_t delta = abs(now - connectRsp.svrTimestamp); if (delta > 900) { msgInfo->code = TSDB_CODE_TIME_UNSYNCED; goto _return; } if (connectRsp.epSet.numOfEps == 0) { msgInfo->code = TSDB_CODE_APP_ERROR; goto _return; } if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) { updateEpSet_s(&global.mgmtEp, &connectRsp.epSet); } msgInfo->code = 0; } else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) { SRetrieveFuncRsp retrieveRsp = {0}; tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp); SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); SUdf *udf = msgInfo->param; udf->funcType = pFuncInfo->funcType; udf->scriptType = pFuncInfo->scriptType; udf->outputType = pFuncInfo->outputType; udf->outputLen = pFuncInfo->outputLen; udf->bufSize = pFuncInfo->bufSize; SFuncExtraInfo *pFuncExtraInfo = (SFuncExtraInfo *)taosArrayGet(retrieveRsp.pFuncExtraInfos, 0); udf->version = pFuncExtraInfo->funcVersion; udf->createdTime = pFuncExtraInfo->funcCreatedTime; msgInfo->code = udfdSaveFuncBodyToFile(pFuncInfo, udf); if (msgInfo->code != 0) { udf->lastFetchTime = 0; } tFreeSFuncInfo(pFuncInfo); taosArrayDestroy(retrieveRsp.pFuncInfos); taosArrayDestroy(retrieveRsp.pFuncExtraInfos); } _return: rpcFreeCont(pMsg->pCont); uv_sem_post(&msgInfo->resultSem); return; } int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) { SRetrieveFuncReq retrieveReq = {0}; retrieveReq.numOfFuncs = 1; retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); taosArrayPush(retrieveReq.pFuncNames, udfName); int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); void *pReq = rpcMallocCont(contLen); tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq); taosArrayDestroy(retrieveReq.pFuncNames); SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC; msgInfo->param = udf; uv_sem_init(&msgInfo->resultSem, 0); SRpcMsg rpcMsg = {0}; rpcMsg.pCont = pReq; rpcMsg.contLen = contLen; rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC; rpcMsg.info.ahandle = msgInfo; rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL); uv_sem_wait(&msgInfo->resultSem); uv_sem_destroy(&msgInfo->resultSem); int32_t code = msgInfo->code; taosMemoryFree(msgInfo); return code; } static bool udfdRpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) { return false; } return true; } else { return false; } } int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) { pEpSet->version = 0; // init mnode ip set SEpSet *mgmtEpSet = &(pEpSet->epSet); mgmtEpSet->numOfEps = 0; mgmtEpSet->inUse = 0; if (firstEp && firstEp[0] != 0) { if (strlen(firstEp) >= TSDB_EP_LEN) { terrno = TSDB_CODE_TSC_INVALID_FQDN; return -1; } int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]); if (code != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_TSC_INVALID_FQDN; return terrno; } mgmtEpSet->numOfEps++; } if (secondEp && secondEp[0] != 0) { if (strlen(secondEp) >= TSDB_EP_LEN) { terrno = TSDB_CODE_TSC_INVALID_FQDN; return -1; } taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]); mgmtEpSet->numOfEps++; } if (mgmtEpSet->numOfEps == 0) { terrno = TSDB_CODE_TSC_INVALID_FQDN; return -1; } return 0; } int32_t udfdOpenClientRpc() { SRpcInit rpcInit = {0}; rpcInit.label = "UDFD"; rpcInit.numOfThreads = 1; rpcInit.cfp = (RpcCfp)udfdProcessRpcRsp; rpcInit.sessions = 1024; rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.user = TSDB_DEFAULT_USER; rpcInit.parent = &global; rpcInit.rfp = udfdRpcRfp; rpcInit.compressSize = tsCompressMsgSize; int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3); connLimitNum = TMAX(connLimitNum, 10); connLimitNum = TMIN(connLimitNum, 500); rpcInit.connLimitNum = connLimitNum; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; global.clientRpc = rpcOpen(&rpcInit); if (global.clientRpc == NULL) { fnError("failed to init dnode rpc client"); return -1; } return 0; } int32_t udfdCloseClientRpc() { fnInfo("udfd begin closing rpc"); rpcClose(global.clientRpc); fnInfo("udfd finish closing rpc"); return 0; } void udfdOnWrite(uv_write_t *req, int status) { SUvUdfWork *work = (SUvUdfWork *)req->data; if (status < 0) { fnError("udfd send response error, length: %zu code: %s", work->output.len, uv_err_name(status)); } // remove work from the connection work list if (work->conn != NULL) { SUvUdfWork **ppWork; for (ppWork = &work->conn->pWorkList; *ppWork && (*ppWork != work); ppWork = &((*ppWork)->pWorkNext)) { } if (*ppWork == work) { *ppWork = work->pWorkNext; } else { fnError("work not in conn any more"); } } taosMemoryFree(work->output.base); taosMemoryFree(work); taosMemoryFree(req); } void udfdSendResponse(uv_work_t *work, int status) { SUvUdfWork *udfWork = (SUvUdfWork *)(work->data); if (udfWork->conn != NULL) { uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t)); write_req->data = udfWork; uv_write(write_req, udfWork->conn->client, &udfWork->output, 1, udfdOnWrite); } taosMemoryFree(work); } void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { SUdfdUvConn *ctx = handle->data; int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t); if (ctx->inputCap == 0) { ctx->inputBuf = taosMemoryMalloc(msgHeadSize); if (ctx->inputBuf) { ctx->inputLen = 0; ctx->inputCap = msgHeadSize; ctx->inputTotal = -1; buf->base = ctx->inputBuf; buf->len = ctx->inputCap; } else { fnError("udfd can not allocate enough memory") buf->base = NULL; buf->len = 0; } } else if (ctx->inputTotal == -1 && ctx->inputLen < msgHeadSize) { buf->base = ctx->inputBuf + ctx->inputLen; buf->len = msgHeadSize - ctx->inputLen; } else { ctx->inputCap = ctx->inputTotal > ctx->inputCap ? ctx->inputTotal : ctx->inputCap; void *inputBuf = taosMemoryRealloc(ctx->inputBuf, ctx->inputCap); if (inputBuf) { ctx->inputBuf = inputBuf; buf->base = ctx->inputBuf + ctx->inputLen; buf->len = ctx->inputCap - ctx->inputLen; } else { fnError("udfd can not allocate enough memory") buf->base = NULL; buf->len = 0; } } } bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) { if (pipe->inputTotal == -1 && pipe->inputLen >= sizeof(int32_t)) { pipe->inputTotal = *(int32_t *)(pipe->inputBuf); } if (pipe->inputLen == pipe->inputCap && pipe->inputTotal == pipe->inputCap) { fnDebug("receive request complete. length %d", pipe->inputLen); return true; } return false; } void udfdHandleRequest(SUdfdUvConn *conn) { char *inputBuf = conn->inputBuf; int32_t inputLen = conn->inputLen; uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t)); SUvUdfWork *udfWork = taosMemoryMalloc(sizeof(SUvUdfWork)); udfWork->conn = conn; udfWork->pWorkNext = conn->pWorkList; conn->pWorkList = udfWork; udfWork->input = uv_buf_init(inputBuf, inputLen); conn->inputBuf = NULL; conn->inputLen = 0; conn->inputCap = 0; conn->inputTotal = -1; work->data = udfWork; uv_queue_work(global.loop, work, udfdProcessRequest, udfdSendResponse); } void udfdPipeCloseCb(uv_handle_t *pipe) { SUdfdUvConn *conn = pipe->data; SUvUdfWork *pWork = conn->pWorkList; while (pWork != NULL) { pWork->conn = NULL; pWork = pWork->pWorkNext; } taosMemoryFree(conn->client); taosMemoryFree(conn->inputBuf); taosMemoryFree(conn); } void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { fnDebug("udfd read %zd bytes from client", nread); if (nread == 0) return; SUdfdUvConn *conn = client->data; if (nread > 0) { conn->inputLen += nread; if (isUdfdUvMsgComplete(conn)) { udfdHandleRequest(conn); } else { // log error or continue; } return; } if (nread < 0) { if (nread == UV_EOF) { fnInfo("udfd pipe read EOF"); } else { fnError("Receive error %s", uv_err_name(nread)); } udfdUvHandleError(conn); } } void udfdOnNewConnection(uv_stream_t *server, int status) { if (status < 0) { fnError("udfd new connection error. code: %s", uv_strerror(status)); return; } uv_pipe_t *client = (uv_pipe_t *)taosMemoryMalloc(sizeof(uv_pipe_t)); uv_pipe_init(global.loop, client, 0); if (uv_accept(server, (uv_stream_t *)client) == 0) { SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn)); ctx->pWorkList = NULL; ctx->client = (uv_stream_t *)client; ctx->inputBuf = 0; ctx->inputLen = 0; ctx->inputCap = 0; client->data = ctx; ctx->client = (uv_stream_t *)client; uv_read_start((uv_stream_t *)client, udfdAllocBuffer, udfdPipeRead); } else { uv_close((uv_handle_t *)client, NULL); } } void udfdIntrSignalHandler(uv_signal_t *handle, int signum) { fnInfo("udfd signal received: %d\n", signum); uv_fs_t req; uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); uv_signal_stop(handle); uv_stop(global.loop); } static int32_t udfdParseArgs(int32_t argc, char *argv[]) { for (int32_t i = 1; i < argc; ++i) { if (strcmp(argv[i], "-c") == 0) { if (i < argc - 1) { if (strlen(argv[++i]) >= PATH_MAX) { printf("config file path overflow"); return -1; } tstrncpy(configDir, argv[i], PATH_MAX); } else { printf("'-c' requires a parameter, default is %s\n", configDir); return -1; } } else if (strcmp(argv[i], "-V") == 0) { global.printVersion = true; } else { } } return 0; } static void udfdPrintVersion() { #ifdef TD_ENTERPRISE char *releaseName = "enterprise"; #else char *releaseName = "community"; #endif printf("%s version: %s compatible_version: %s\n", releaseName, version, compatible_version); printf("gitinfo: %s\n", gitinfo); printf("buildInfo: %s\n", buildinfo); } static int32_t udfdInitLog() { char logName[12] = {0}; snprintf(logName, sizeof(logName), "%slog", "udfd"); return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, NULL, 0); } void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { buf->base = taosMemoryMalloc(suggested_size); buf->len = suggested_size; } void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) { if (nread < 0) { fnError("udfd ctrl pipe read error. %s", uv_err_name(nread)); taosMemoryFree(buf->base); uv_close((uv_handle_t *)q, NULL); uv_stop(global.loop); return; } fnError("udfd ctrl pipe read %zu bytes", nread); taosMemoryFree(buf->base); } static int32_t removeListeningPipe() { uv_fs_t req; int err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); uv_fs_req_cleanup(&req); return err; } static int32_t udfdUvInit() { uv_loop_t *loop = taosMemoryMalloc(sizeof(uv_loop_t)); if (loop) { uv_loop_init(loop); } else { return -1; } global.loop = loop; if (tsStartUdfd) { // udfd is started by taosd, which shall exit when taosd exit uv_pipe_init(global.loop, &global.ctrlPipe, 1); uv_pipe_open(&global.ctrlPipe, 0); uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb); } getUdfdPipeName(global.listenPipeName, sizeof(global.listenPipeName)); removeListeningPipe(); uv_pipe_init(global.loop, &global.listeningPipe, 0); uv_signal_init(global.loop, &global.intrSignal); uv_signal_start(&global.intrSignal, udfdIntrSignalHandler, SIGINT); int r; fnInfo("bind to pipe %s", global.listenPipeName); if ((r = uv_pipe_bind(&global.listeningPipe, global.listenPipeName))) { fnError("Bind error %s", uv_err_name(r)); removeListeningPipe(); return -2; } if ((r = uv_listen((uv_stream_t *)&global.listeningPipe, 128, udfdOnNewConnection))) { fnError("Listen error %s", uv_err_name(r)); removeListeningPipe(); return -3; } return 0; } static void udfdCloseWalkCb(uv_handle_t *handle, void *arg) { if (!uv_is_closing(handle)) { uv_close(handle, NULL); } } static int32_t udfdRun() { uv_mutex_init(&global.scriptPluginsMutex); global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); uv_mutex_init(&global.udfsMutex); fnInfo("start udfd event loop"); uv_run(global.loop, UV_RUN_DEFAULT); fnInfo("udfd event loop stopped."); uv_loop_close(global.loop); uv_walk(global.loop, udfdCloseWalkCb, NULL); uv_run(global.loop, UV_RUN_DEFAULT); uv_loop_close(global.loop); return 0; } int32_t udfdInitResidentFuncs() { if (strlen(tsUdfdResFuncs) == 0) { return TSDB_CODE_SUCCESS; } global.residentFuncs = taosArrayInit(2, TSDB_FUNC_NAME_LEN); char *pSave = tsUdfdResFuncs; char *token; while ((token = strtok_r(pSave, ",", &pSave)) != NULL) { char func[TSDB_FUNC_NAME_LEN + 1] = {0}; strncpy(func, token, TSDB_FUNC_NAME_LEN); fnInfo("udfd add resident function %s", func); taosArrayPush(global.residentFuncs, func); } return TSDB_CODE_SUCCESS; } int32_t udfdDeinitResidentFuncs() { for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) { char *funcName = taosArrayGet(global.residentFuncs, i); SUdf **udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName)); if (udfInHash) { SUdf *udf = *udfInHash; int32_t code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx); fnDebug("udfd destroy function returns %d", code); taosHashRemove(global.udfsHash, funcName, strlen(funcName)); taosMemoryFree(udf); } } taosArrayDestroy(global.residentFuncs); return TSDB_CODE_SUCCESS; } int32_t udfdCleanup() { uv_mutex_destroy(&global.udfsMutex); taosHashCleanup(global.udfsHash); return 0; } int32_t udfdCreateUdfSourceDir() { snprintf(global.udfDataDir, PATH_MAX, "%s/.udf", tsDataDir); int32_t code = taosMkDir(global.udfDataDir); if (code != TSDB_CODE_SUCCESS) { snprintf(global.udfDataDir, PATH_MAX, "%s/.udf", tsTempDir); code = taosMkDir(global.udfDataDir); } fnInfo("udfd create udf source directory %s. result: %s", global.udfDataDir, tstrerror(code)); return code; } int32_t udfdDestroyUdfSourceDir() { fnInfo("destory udf source directory %s", global.udfDataDir); taosRemoveDir(global.udfDataDir); return 0; } int main(int argc, char *argv[]) { if (!taosCheckSystemIsLittleEnd()) { printf("failed to start since on non-little-end machines\n"); return -1; } if (udfdParseArgs(argc, argv) != 0) { printf("failed to start since parse args error\n"); return -1; } if (global.printVersion) { udfdPrintVersion(); return 0; } if (udfdInitLog() != 0) { // ignore create log failed, because this error no matter printf("failed to start since init log error\n"); } if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) { fnError("failed to start since read config error"); return -2; } initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp); if (udfdOpenClientRpc() != 0) { fnError("open rpc connection to mnode failed"); return -3; } if (udfdCreateUdfSourceDir() != 0) { fnError("create udf source directory failed"); return -4; } if (udfdUvInit() != 0) { fnError("uv init failure"); return -5; } udfdInitResidentFuncs(); udfdRun(); removeListeningPipe(); udfdDestroyUdfSourceDir(); udfdCloseClientRpc(); udfdDeinitResidentFuncs(); udfdDeinitScriptPlugins(); udfdCleanup(); return 0; }