udfd.c 32.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
dengyihao's avatar
dengyihao 已提交
15 16

// clang-format off
17 18
#include "uv.h"
#include "os.h"
S
slzhou 已提交
19 20
#include "fnLog.h"
#include "thash.h"
S
shenglian zhou 已提交
21

22 23 24
#include "tudf.h"
#include "tudfInt.h"

25
#include "tdatablock.h"
26 27 28 29
#include "tdataformat.h"
#include "tglobal.h"
#include "tmsg.h"
#include "trpc.h"
30
#include "tmisce.h"
31
// clang-format on
32

S
slzhou 已提交
33
typedef struct SUdfdContext {
34
  uv_loop_t  *loop;
S
slzhou 已提交
35
  uv_pipe_t   ctrlPipe;
36
  uv_signal_t intrSignal;
37
  char        listenPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
S
slzhou 已提交
38
  uv_pipe_t   listeningPipe;
S
slzhou 已提交
39

40
  void      *clientRpc;
41
  SCorEpSet  mgmtEp;
S
slzhou 已提交
42
  uv_mutex_t udfsMutex;
43
  SHashObj  *udfsHash;
S
slzhou 已提交
44

45
  SArray *residentFuncs;
46

S
slzhou 已提交
47 48 49 50
  bool printVersion;
} SUdfdContext;

SUdfdContext global;
51

52 53 54
struct SUdfdUvConn;
struct SUvUdfWork;

55
typedef struct SUdfdUvConn {
S
slzhou 已提交
56
  uv_stream_t *client;
57
  char        *inputBuf;
S
slzhou 已提交
58 59 60
  int32_t      inputLen;
  int32_t      inputCap;
  int32_t      inputTotal;
61 62

  struct SUvUdfWork *pWorkList;  // head of work list
63 64 65
} SUdfdUvConn;

typedef struct SUvUdfWork {
66
  SUdfdUvConn *conn;
S
slzhou 已提交
67 68
  uv_buf_t     input;
  uv_buf_t     output;
69 70

  struct SUvUdfWork *pWorkNext;
71 72
} SUvUdfWork;

S
slzhou 已提交
73
/* Life-cycle state of a loaded UDF. */
typedef enum {
  UDF_STATE_INIT = 0,   // entry created, library not loaded yet
  UDF_STATE_LOADING,    // a setup request is loading the library
  UDF_STATE_READY,      // loading finished; waiters on condReady are released
  UDF_STATE_UNLOADING   // NOTE(review): not referenced in the visible part of the file
} EUdfState;
74

S
slzhou 已提交
75
typedef struct SUdf {
S
slzhou 已提交
76 77
  int32_t    refCount;
  EUdfState  state;
S
slzhou 已提交
78
  uv_mutex_t lock;
S
slzhou 已提交
79
  uv_cond_t  condReady;
80
  bool       resident;
S
slzhou 已提交
81

S
slzhou 已提交
82
  char    name[TSDB_FUNC_NAME_LEN + 1];
dengyihao's avatar
dengyihao 已提交
83 84 85
  int8_t  funcType;
  int8_t  scriptType;
  int8_t  outputType;
S
slzhou 已提交
86 87 88
  int32_t outputLen;
  int32_t bufSize;

dengyihao's avatar
dengyihao 已提交
89
  char path[PATH_MAX];
S
slzhou 已提交
90

dengyihao's avatar
dengyihao 已提交
91
  uv_lib_t lib;
S
shenglian zhou 已提交
92

dengyihao's avatar
dengyihao 已提交
93
  TUdfScalarProcFunc scalarProcFunc;
94

dengyihao's avatar
dengyihao 已提交
95 96 97
  TUdfAggStartFunc   aggStartFunc;
  TUdfAggProcessFunc aggProcFunc;
  TUdfAggFinishFunc  aggFinishFunc;
S
slzhou 已提交
98
  TUdfAggMergeFunc   aggMergeFunc;
99

dengyihao's avatar
dengyihao 已提交
100 101
  TUdfInitFunc    initFunc;
  TUdfDestroyFunc destroyFunc;
102 103
} SUdf;

S
slzhou 已提交
104
// TODO: add private udf structure.
105
typedef struct SUdfcFuncHandle {
S
slzhou 已提交
106
  SUdf *udf;
107
} SUdfcFuncHandle;
108

109 110 111 112 113 114 115
/* Kind of mnode request a pending RPC belongs to; selects how udfdProcessRpcRsp decodes the reply. */
typedef enum EUdfdRpcReqRspType {
  UDFD_RPC_MNODE_CONNECT = 0,  // connect request
  UDFD_RPC_RETRIVE_FUNC,       // retrieve-function request
} EUdfdRpcReqRspType;

typedef struct SUdfdRpcSendRecvInfo {
  EUdfdRpcReqRspType rpcType;
dengyihao's avatar
dengyihao 已提交
116
  int32_t            code;
117
  void              *param;
dengyihao's avatar
dengyihao 已提交
118
  uv_sem_t           resultSem;
119 120
} SUdfdRpcSendRecvInfo;

dengyihao's avatar
dengyihao 已提交
121
static void    udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
S
shenglian zhou 已提交
122 123 124
static int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf);
static int32_t udfdConnectToMnode();
static int32_t udfdLoadUdf(char *udfName, SUdf *udf);
dengyihao's avatar
dengyihao 已提交
125
static bool    udfdRpcRfp(int32_t code, tmsg_t msgType);
dengyihao's avatar
dengyihao 已提交
126
static int     initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
S
shenglian zhou 已提交
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
static int32_t udfdOpenClientRpc();
static int32_t udfdCloseClientRpc();

static void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
static void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
static void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
static void udfdProcessRequest(uv_work_t *req);
static void udfdOnWrite(uv_write_t *req, int status);
static void udfdSendResponse(uv_work_t *work, int status);
static void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf);
static bool isUdfdUvMsgComplete(SUdfdUvConn *pipe);
static void udfdHandleRequest(SUdfdUvConn *conn);
static void udfdPipeCloseCb(uv_handle_t *pipe);
static void udfdUvHandleError(SUdfdUvConn *conn) { uv_close((uv_handle_t *)conn->client, udfdPipeCloseCb); }
static void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
static void udfdOnNewConnection(uv_stream_t *server, int status);

dengyihao's avatar
dengyihao 已提交
144
static void    udfdIntrSignalHandler(uv_signal_t *handle, int signum);
S
shenglian zhou 已提交
145 146
static int32_t removeListeningPipe();

dengyihao's avatar
dengyihao 已提交
147
static void    udfdPrintVersion();
S
shenglian zhou 已提交
148 149 150
static int32_t udfdParseArgs(int32_t argc, char *argv[]);
static int32_t udfdInitLog();

dengyihao's avatar
dengyihao 已提交
151 152
static void    udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
static void    udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf);
S
shenglian zhou 已提交
153
static int32_t udfdUvInit();
dengyihao's avatar
dengyihao 已提交
154
static void    udfdCloseWalkCb(uv_handle_t *handle, void *arg);
S
shenglian zhou 已提交
155
static int32_t udfdRun();
dengyihao's avatar
dengyihao 已提交
156
static void    udfdConnectMnodeThreadFunc(void *args);
S
shenglian zhou 已提交
157

S
shenglian zhou 已提交
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
/*
 * Work callback for uv_queue_work(): decodes the request stored in the work
 * item's input buffer and dispatches it to the handler for its task type.
 * Requests of any other type are ignored.
 */
void udfdProcessRequest(uv_work_t *req) {
  SUvUdfWork *pWork = (SUvUdfWork *)(req->data);

  SUdfRequest decoded = {0};
  decodeUdfRequest(pWork->input.base, &decoded);

  if (decoded.type == UDF_TASK_SETUP) {
    udfdProcessSetupRequest(pWork, &decoded);
  } else if (decoded.type == UDF_TASK_CALL) {
    udfdProcessCallRequest(pWork, &decoded);
  } else if (decoded.type == UDF_TASK_TEARDOWN) {
    udfdProcessTeardownRequest(pWork, &decoded);
  }
}

/*
 * Handles a setup request: looks the UDF up by name (creating and registering a
 * new entry when it is not known yet), loads it if this is the first user,
 * otherwise waits until it is ready, and encodes a response that carries a
 * newly allocated handle plus the UDF's output type, output length and buffer size.
 *
 * uvUdf   - work item; its output buffer receives the encoded response and its
 *           input buffer is released here.
 * request - decoded request; request->setup holds the UDF name.
 */
void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
  // TODO: tracable id from client. connect, setup, call, teardown
  fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName);
  SUdfSetupRequest *setup = &request->setup;
  int32_t           code = TSDB_CODE_SUCCESS;
  SUdf             *udf = NULL;

  // Find the entry or register a fresh one, all under the table lock.
  uv_mutex_lock(&global.udfsMutex);
  SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName));
  if (udfInHash) {
    ++(*udfInHash)->refCount;
    udf = *udfInHash;
    uv_mutex_unlock(&global.udfsMutex);
  } else {
    SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
    udfNew->refCount = 1;
    udfNew->state = UDF_STATE_INIT;
    uv_mutex_init(&udfNew->lock);
    uv_cond_init(&udfNew->condReady);

    udf = udfNew;
    SUdf **pUdf = &udf;
    taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), pUdf, POINTER_BYTES);
    uv_mutex_unlock(&global.udfsMutex);
  }

  // The first request to see INIT performs the load; everybody else waits for READY.
  uv_mutex_lock(&udf->lock);
  if (udf->state == UDF_STATE_INIT) {
    udf->state = UDF_STATE_LOADING;
    code = udfdLoadUdf(setup->udfName, udf);
    if (udf->initFunc) {
      udf->initFunc();
    }
    udf->resident = false;
    for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
      char *funcName = taosArrayGet(global.residentFuncs, i);
      if (strcmp(setup->udfName, funcName) == 0) {
        udf->resident = true;
        break;
      }
    }
    udf->state = UDF_STATE_READY;
    uv_cond_broadcast(&udf->condReady);
    uv_mutex_unlock(&udf->lock);
  } else {
    while (udf->state != UDF_STATE_READY) {
      uv_cond_wait(&udf->condReady, &udf->lock);
    }
    uv_mutex_unlock(&udf->lock);
  }
  SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
  handle->udf = udf;

  // Zero-initialised like the call/teardown responses so that no indeterminate
  // field is handed to the encoder (msgLen is only known after the sizing pass).
  SUdfResponse rsp = {0};
  rsp.seqNum = request->seqNum;
  rsp.type = request->type;
  rsp.code = code;
  rsp.setupRsp.udfHandle = (int64_t)(handle);
  rsp.setupRsp.outputType = udf->outputType;
  rsp.setupRsp.outputLen = udf->outputLen;
  rsp.setupRsp.bufSize = udf->bufSize;

  // First pass computes the encoded length, second pass writes into the buffer.
  int32_t len = encodeUdfResponse(NULL, &rsp);
  rsp.msgLen = len;
  void *bufBegin = taosMemoryMalloc(len);
  void *buf = bufBegin;
  encodeUdfResponse(&buf, &rsp);

  uvUdf->output = uv_buf_init(bufBegin, len);

  taosMemoryFree(uvUdf->input.base);
  return;
}

void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
  SUdfCallRequest *call = &request->call;
258 259 260 261
  fnDebug("call request. call type %d, handle: %" PRIx64 ", seq num %" PRId64, call->callType, call->udfHandle,
          request->seqNum);
  SUdfcFuncHandle  *handle = (SUdfcFuncHandle *)(call->udfHandle);
  SUdf             *udf = handle->udf;
S
shenglian zhou 已提交
262
  SUdfResponse      response = {0};
263
  SUdfResponse     *rsp = &response;
S
shenglian zhou 已提交
264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295
  SUdfCallResponse *subRsp = &rsp->callRsp;

  int32_t code = TSDB_CODE_SUCCESS;
  switch (call->callType) {
    case TSDB_UDF_CALL_SCALA_PROC: {
      SUdfColumn output = {0};

      SUdfDataBlock input = {0};
      convertDataBlockToUdfDataBlock(&call->block, &input);
      code = udf->scalarProcFunc(&input, &output);
      freeUdfDataDataBlock(&input);
      convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
      freeUdfColumn(&output);
      break;
    }
    case TSDB_UDF_CALL_AGG_INIT: {
      SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
      udf->aggStartFunc(&outBuf);
      subRsp->resultBuf = outBuf;
      break;
    }
    case TSDB_UDF_CALL_AGG_PROC: {
      SUdfDataBlock input = {0};
      convertDataBlockToUdfDataBlock(&call->block, &input);
      SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
      code = udf->aggProcFunc(&input, &call->interBuf, &outBuf);
      freeUdfInterBuf(&call->interBuf);
      freeUdfDataDataBlock(&input);
      subRsp->resultBuf = outBuf;

      break;
    }
S
slzhou 已提交
296 297 298 299 300 301 302 303 304
    case TSDB_UDF_CALL_AGG_MERGE: {
      SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
      code = udf->aggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf);
      freeUdfInterBuf(&call->interBuf);
      freeUdfInterBuf(&call->interBuf2);
      subRsp->resultBuf = outBuf;

      break;
    }
S
shenglian zhou 已提交
305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329
    case TSDB_UDF_CALL_AGG_FIN: {
      SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
      code = udf->aggFinishFunc(&call->interBuf, &outBuf);
      freeUdfInterBuf(&call->interBuf);
      subRsp->resultBuf = outBuf;
      break;
    }
    default:
      break;
  }

  rsp->seqNum = request->seqNum;
  rsp->type = request->type;
  rsp->code = code;
  subRsp->callType = call->callType;

  int32_t len = encodeUdfResponse(NULL, rsp);
  rsp->msgLen = len;
  void *bufBegin = taosMemoryMalloc(len);
  void *buf = bufBegin;
  encodeUdfResponse(&buf, rsp);
  uvUdf->output = uv_buf_init(bufBegin, len);

  switch (call->callType) {
    case TSDB_UDF_CALL_SCALA_PROC: {
330 331
      blockDataFreeRes(&call->block);
      blockDataFreeRes(&subRsp->resultData);
S
shenglian zhou 已提交
332 333 334 335 336 337 338
      break;
    }
    case TSDB_UDF_CALL_AGG_INIT: {
      freeUdfInterBuf(&subRsp->resultBuf);
      break;
    }
    case TSDB_UDF_CALL_AGG_PROC: {
339
      blockDataFreeRes(&call->block);
S
shenglian zhou 已提交
340 341 342
      freeUdfInterBuf(&subRsp->resultBuf);
      break;
    }
S
slzhou 已提交
343 344 345 346
    case TSDB_UDF_CALL_AGG_MERGE: {
      freeUdfInterBuf(&subRsp->resultBuf);
      break;
    }
S
shenglian zhou 已提交
347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362
    case TSDB_UDF_CALL_AGG_FIN: {
      freeUdfInterBuf(&subRsp->resultBuf);
      break;
    }
    default:
      break;
  }

  taosMemoryFree(uvUdf->input.base);
  return;
}

/*
 * Handles a teardown request: drops one reference on the UDF behind the handle,
 * unloads the UDF when that was the last reference and it is not resident,
 * frees the handle and encodes a success response into the work item's output.
 */
void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
  SUdfTeardownRequest *teardown = &request->teardown;
  fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle);

  SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
  SUdf            *udf = handle->udf;

  // Reference counting and removal from the table happen under the table lock.
  uv_mutex_lock(&global.udfsMutex);
  udf->refCount -= 1;
  bool lastUser = (udf->refCount == 0) && !udf->resident;
  if (lastUser) {
    taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
  }
  uv_mutex_unlock(&global.udfsMutex);

  if (lastUser) {
    uv_cond_destroy(&udf->condReady);
    uv_mutex_destroy(&udf->lock);
    if (udf->destroyFunc) {
      (udf->destroyFunc)();
    }
    uv_dlclose(&udf->lib);
    taosMemoryFree(udf);
  }
  taosMemoryFree(handle);

  SUdfResponse rsp = {0};
  rsp.seqNum = request->seqNum;
  rsp.type = request->type;
  rsp.code = TSDB_CODE_SUCCESS;

  // Sizing pass first, then the real write.
  int32_t len = encodeUdfResponse(NULL, &rsp);
  rsp.msgLen = len;
  void *bufBegin = taosMemoryMalloc(len);
  void *cursor = bufBegin;
  encodeUdfResponse(&cursor, &rsp);
  uvUdf->output = uv_buf_init(bufBegin, len);

  taosMemoryFree(uvUdf->input.base);
}

401
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
402
  SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle;
403
  ASSERT(pMsg->info.ahandle != NULL);
404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419

  if (pEpSet) {
    if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) {
      updateEpSet_s(&global.mgmtEp, pEpSet);
    }
  }

  if (pMsg->code != TSDB_CODE_SUCCESS) {
    fnError("udfd rpc error. code: %s", tstrerror(pMsg->code));
    msgInfo->code = pMsg->code;
    goto _return;
  }

  if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) {
    SConnectRsp connectRsp = {0};
    tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp);
420

dengyihao's avatar
dengyihao 已提交
421 422 423 424 425 426
    int32_t now = taosGetTimestampSec();
    int32_t delta = abs(now - connectRsp.svrTimestamp);
    if (delta > 900) {
      msgInfo->code = TSDB_CODE_TIME_UNSYNCED;
      goto _return;
    }
427

428
    if (connectRsp.epSet.numOfEps == 0) {
S
Shengliang Guan 已提交
429
      msgInfo->code = TSDB_CODE_APP_ERROR;
430 431 432 433 434 435 436 437 438 439
      goto _return;
    }

    if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) {
      updateEpSet_s(&global.mgmtEp, &connectRsp.epSet);
    }
    msgInfo->code = 0;
  } else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) {
    SRetrieveFuncRsp retrieveRsp = {0};
    tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp);
S
slzhou 已提交
440 441 442
    if (retrieveRsp.pFuncInfos == NULL) {
      goto _return;
    }
443
    SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
444
    SUdf      *udf = msgInfo->param;
445 446
    udf->funcType = pFuncInfo->funcType;
    udf->scriptType = pFuncInfo->scriptType;
447
    udf->outputType = pFuncInfo->outputType;
448 449 450
    udf->outputLen = pFuncInfo->outputLen;
    udf->bufSize = pFuncInfo->bufSize;

wafwerar's avatar
wafwerar 已提交
451 452 453 454 455 456 457
    if (!osTempSpaceAvailable()) {
      terrno = TSDB_CODE_NO_AVAIL_DISK;
      msgInfo->code = terrno;
      fnError("udfd create shared library failed since %s", terrstr(terrno));
      goto _return;
    }

458
    char path[PATH_MAX] = {0};
dengyihao's avatar
dengyihao 已提交
459
#ifdef WINDOWS
wafwerar's avatar
wafwerar 已提交
460
    snprintf(path, sizeof(path), "%s%s.dll", tsTempDir, pFuncInfo->name);
dengyihao's avatar
dengyihao 已提交
461
#else
wafwerar's avatar
wafwerar 已提交
462
    snprintf(path, sizeof(path), "%s/lib%s.so", tsTempDir, pFuncInfo->name);
dengyihao's avatar
dengyihao 已提交
463
#endif
dengyihao's avatar
dengyihao 已提交
464 465
    TdFilePtr file =
        taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
wafwerar's avatar
wafwerar 已提交
466 467 468
    if (file == NULL) {
      fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno));
      msgInfo->code = TSDB_CODE_FILE_CORRUPTED;
wafwerar's avatar
wafwerar 已提交
469
      goto _return;
wafwerar's avatar
wafwerar 已提交
470
    }
S
shenglian zhou 已提交
471 472 473 474
    int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
    if (count != pFuncInfo->codeSize) {
      fnError("udfd write udf shared library failed");
      msgInfo->code = TSDB_CODE_FILE_CORRUPTED;
wafwerar's avatar
wafwerar 已提交
475
      goto _return;
S
shenglian zhou 已提交
476
    }
477
    taosCloseFile(&file);
478
    strncpy(udf->path, path, PATH_MAX);
479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496
    tFreeSFuncInfo(pFuncInfo);
    taosArrayDestroy(retrieveRsp.pFuncInfos);
    msgInfo->code = 0;
  }

_return:
  rpcFreeCont(pMsg->pCont);
  uv_sem_post(&msgInfo->resultSem);
  return;
}

/*
 * Sends a retrieve-function request for udfName to the mnode and blocks until
 * udfdProcessRpcRsp has handled the reply, which fills udf.
 * Returns the code recorded by the response callback.
 */
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
  // Serialize a request that names exactly one function.
  SRetrieveFuncReq req = {.numOfFuncs = 1, .pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN)};
  taosArrayPush(req.pFuncNames, udfName);

  int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &req);
  void   *pCont = rpcMallocCont(contLen);
  tSerializeSRetrieveFuncReq(pCont, contLen, &req);
  taosArrayDestroy(req.pFuncNames);

  // Context through which the response callback reports back.
  SUdfdRpcSendRecvInfo *pInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
  pInfo->rpcType = UDFD_RPC_RETRIVE_FUNC;
  pInfo->param = udf;
  uv_sem_init(&pInfo->resultSem, 0);

  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
  rpcMsg.pCont = pCont;
  rpcMsg.contLen = contLen;
  rpcMsg.info.ahandle = pInfo;
  rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);

  // Wait for the callback, then collect its result.
  uv_sem_wait(&pInfo->resultSem);
  uv_sem_destroy(&pInfo->resultSem);
  int32_t result = pInfo->code;
  taosMemoryFree(pInfo);
  return result;
}

int32_t udfdConnectToMnode() {
  SConnectReq connReq = {0};
  connReq.connType = CONN_TYPE__UDFD;
dengyihao's avatar
dengyihao 已提交
523
  tstrncpy(connReq.app, "udfd", sizeof(connReq.app));
524 525 526 527
  tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user));
  char pass[TSDB_PASSWORD_LEN + 1] = {0};
  taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
  tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd));
528 529
  connReq.pid = taosGetPId();
  connReq.startTime = taosGetTimestampMs();
530 531

  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq);
532
  void   *pReq = rpcMallocCont(contLen);
533 534 535 536 537 538 539 540 541 542
  tSerializeSConnectReq(pReq, contLen, &connReq);

  SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
  msgInfo->rpcType = UDFD_RPC_MNODE_CONNECT;
  uv_sem_init(&msgInfo->resultSem, 0);

  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = TDMT_MND_CONNECT;
  rpcMsg.pCont = pReq;
  rpcMsg.contLen = contLen;
543
  rpcMsg.info.ahandle = msgInfo;
544 545 546 547 548 549 550 551
  rpcSendRequest(global.clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);

  uv_sem_wait(&msgInfo->resultSem);
  int32_t code = msgInfo->code;
  uv_sem_destroy(&msgInfo->resultSem);
  taosMemoryFree(msgInfo);
  return code;
}
552

553
/*
 * Looks up the symbol "<udfName><suffix>" in the UDF's library and stores its
 * address in *ppFunc.
 */
static void udfdLoadUdfSymbol(SUdf *udf, const char *udfName, const char *suffix, void **ppFunc) {
  // Room for a full-length function name, the longest suffix used ("_destroy") and the NUL.
  char symName[TSDB_FUNC_NAME_LEN + sizeof("_destroy")] = {0};
  snprintf(symName, sizeof(symName), "%s%s", udfName, suffix);
  uv_dlsym(&udf->lib, symName, ppFunc);
}

/*
 * Loads a UDF: fetches its metadata and library from the mnode, opens the
 * library and resolves its entry points.
 *
 * Symbols looked up: <name>_init and <name>_destroy for every UDF; <name> for
 * scalar functions; <name>, <name>_start, <name>_finish and <name>_merge for
 * aggregate functions.
 *
 * Returns 0 on success, TSDB_CODE_UDF_LOAD_UDF_FAILURE when the metadata cannot
 * be retrieved or the library cannot be opened.
 *
 * NOTE(review): a symbol that cannot be resolved is not treated as an error
 * here; confirm whether the process/start/finish entry points should be mandatory.
 */
int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
  strncpy(udf->name, udfName, TSDB_FUNC_NAME_LEN);
  udf->name[TSDB_FUNC_NAME_LEN] = '\0';  // strncpy does not terminate when the source fills the bound
  int32_t err = 0;

  err = udfdFillUdfInfoFromMNode(global.clientRpc, udf->name, udf);
  if (err != 0) {
    fnError("can not retrieve udf from mnode. udf name %s", udfName);
    return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
  }

  err = uv_dlopen(udf->path, &udf->lib);
  if (err != 0) {
    // uv_dlopen() only returns -1; the actual reason is available through uv_dlerror().
    fnError("can not load library %s. error: %s", udf->path, uv_dlerror(&udf->lib));
    return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
  }

  udfdLoadUdfSymbol(udf, udfName, "_init", (void **)(&udf->initFunc));
  udfdLoadUdfSymbol(udf, udfName, "_destroy", (void **)(&udf->destroyFunc));

  if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) {
    udfdLoadUdfSymbol(udf, udfName, "", (void **)(&udf->scalarProcFunc));
  } else if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) {
    udfdLoadUdfSymbol(udf, udfName, "", (void **)(&udf->aggProcFunc));
    udfdLoadUdfSymbol(udf, udfName, "_start", (void **)(&udf->aggStartFunc));
    udfdLoadUdfSymbol(udf, udfName, "_finish", (void **)(&udf->aggFinishFunc));
    udfdLoadUdfSymbol(udf, udfName, "_merge", (void **)(&udf->aggMergeFunc));
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
607
/*
 * Retry filter installed on the RPC client (rpcInit.rfp). Returns true for the
 * listed network/leader/startup error codes unless the message is one of the
 * scheduler query/fetch types, and false in every other case.
 */
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
  bool matchesErrorCode = code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK ||
                          code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING ||
                          code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING ||
                          code == TSDB_CODE_APP_IS_STOPPING;
  if (!matchesErrorCode) {
    return false;
  }

  bool isSchedulerMsg = msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
                        msgType == TDMT_SCH_MERGE_FETCH;
  return !isSchedulerMsg;
}

int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
  pEpSet->version = 0;

  // init mnode ip set
  SEpSet *mgmtEpSet = &(pEpSet->epSet);
  mgmtEpSet->numOfEps = 0;
  mgmtEpSet->inUse = 0;

  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

    mgmtEpSet->numOfEps++;
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    mgmtEpSet->numOfEps++;
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

int32_t udfdOpenClientRpc() {
  SRpcInit rpcInit = {0};
  rpcInit.label = "UDFD";
  rpcInit.numOfThreads = 1;
  rpcInit.cfp = (RpcCfp)udfdProcessRpcRsp;
  rpcInit.sessions = 1024;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.user = TSDB_DEFAULT_USER;
  rpcInit.parent = &global;
  rpcInit.rfp = udfdRpcRfp;
dengyihao's avatar
dengyihao 已提交
673
  rpcInit.compressSize = tsCompressMsgSize;
674

S
shenglian zhou 已提交
675 676 677 678 679 680 681 682 683
  global.clientRpc = rpcOpen(&rpcInit);
  if (global.clientRpc == NULL) {
    fnError("failed to init dnode rpc client");
    return -1;
  }
  return 0;
}

int32_t udfdCloseClientRpc() {
684
  fnInfo("udfd begin closing rpc");
S
shenglian zhou 已提交
685
  rpcClose(global.clientRpc);
686
  fnInfo("udfd finish closing rpc");
S
shenglian zhou 已提交
687 688 689
  return 0;
}

690
/*
 * Write-completion callback: logs a failed write, detaches the work item from
 * its connection's work list (if the connection still exists) and frees the
 * response buffer, the work item and the write request.
 */
void udfdOnWrite(uv_write_t *req, int status) {
  SUvUdfWork *done = (SUvUdfWork *)req->data;
  if (status < 0) {
    fnError("udfd send response error, length: %zu code: %s", done->output.len, uv_err_name(status));
  }

  // remove work from the connection work list
  SUdfdUvConn *owner = done->conn;
  if (owner != NULL) {
    // Walk the links rather than the nodes so unlinking needs no special case for the head.
    SUvUdfWork **link = &owner->pWorkList;
    while (*link != NULL && *link != done) {
      link = &((*link)->pWorkNext);
    }

    if (*link == done) {
      *link = done->pWorkNext;
    } else {
      fnError("work not in conn any more");
    }
  }

  taosMemoryFree(done->output.base);
  taosMemoryFree(done);
  taosMemoryFree(req);
}

/*
 * After-work callback for uv_queue_work(): sends the response produced by the
 * worker back to the client and releases the uv_work_t.
 *
 * Ownership of the work item and its output buffer:
 *  - write queued successfully: released by udfdOnWrite when the write completes;
 *  - uv_write() fails: libuv never calls the callback, so udfdOnWrite is run
 *    directly to unlink and release the work item;
 *  - connection already closed (conn == NULL): nobody else refers to the work
 *    item any more, so it is released here.
 *
 * NOTE(review): this assumes the worker always sets udfWork->output -- the
 * write path already relies on that.
 */
void udfdSendResponse(uv_work_t *work, int status) {
  SUvUdfWork *udfWork = (SUvUdfWork *)(work->data);

  if (udfWork->conn != NULL) {
    uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t));
    write_req->data = udfWork;
    int err = uv_write(write_req, udfWork->conn->client, &udfWork->output, 1, udfdOnWrite);
    if (err != 0) {
      // udfWork and write_req are freed by this call; do not touch them afterwards.
      udfdOnWrite(write_req, err);
    }
  } else {
    taosMemoryFree(udfWork->output.base);
    taosMemoryFree(udfWork);
  }
  taosMemoryFree(work);
}

/*
 * libuv allocation callback for client pipes. Hands libuv the part of the
 * connection's input buffer that still has to be filled:
 *  - no buffer yet: allocate room for the message header (int32 + int64);
 *  - header incomplete: the rest of the header;
 *  - otherwise: grow the buffer to the announced total length and hand out the
 *    unread remainder.
 * On allocation failure buf is set to {NULL, 0}.
 * suggestedSize is not used.
 */
void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
  SUdfdUvConn *ctx = handle->data;
  int32_t      msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
  if (ctx->inputCap == 0) {
    ctx->inputBuf = taosMemoryMalloc(msgHeadSize);
    if (ctx->inputBuf) {
      ctx->inputLen = 0;
      ctx->inputCap = msgHeadSize;
      ctx->inputTotal = -1;

      buf->base = ctx->inputBuf;
      buf->len = ctx->inputCap;
    } else {
      fnError("udfd can not allocate enough memory");
      buf->base = NULL;
      buf->len = 0;
    }
  } else if (ctx->inputTotal == -1 && ctx->inputLen < msgHeadSize) {
    buf->base = ctx->inputBuf + ctx->inputLen;
    buf->len = msgHeadSize - ctx->inputLen;
  } else {
    // Commit the new capacity only after the reallocation has succeeded, so
    // inputCap never claims more space than inputBuf really has.
    int32_t newCap = ctx->inputTotal > ctx->inputCap ? ctx->inputTotal : ctx->inputCap;
    void   *inputBuf = taosMemoryRealloc(ctx->inputBuf, newCap);
    if (inputBuf) {
      ctx->inputBuf = inputBuf;
      ctx->inputCap = newCap;
      buf->base = ctx->inputBuf + ctx->inputLen;
      buf->len = ctx->inputCap - ctx->inputLen;
    } else {
      fnError("udfd can not allocate enough memory");
      buf->base = NULL;
      buf->len = 0;
    }
  }
}

/*
 * Reports whether the connection's input buffer holds one complete message.
 * As a side effect, once at least an int32 has arrived and the total length is
 * still unknown (-1), the total is taken from the first int32 of the buffer.
 */
bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
  bool totalUnknown = (pipe->inputTotal == -1);
  if (totalUnknown && pipe->inputLen >= sizeof(int32_t)) {
    pipe->inputTotal = *(int32_t *)(pipe->inputBuf);
  }

  // Complete when the buffer is full and sized exactly to the announced total.
  bool bufferFull = (pipe->inputLen == pipe->inputCap);
  bool sizedToTotal = (pipe->inputTotal == pipe->inputCap);
  if (!(bufferFull && sizedToTotal)) {
    return false;
  }

  fnDebug("receive request complete. length %d", pipe->inputLen);
  return true;
}

/*
 * Turns the complete message in the connection's input buffer into a work item,
 * links it into the connection's work list, resets the connection's input state
 * for the next message and queues the work item on the loop's thread pool.
 *
 * The input buffer is handed over to the work item. If the bookkeeping
 * structures cannot be allocated the connection is closed, because the request
 * could never be answered.
 */
void udfdHandleRequest(SUdfdUvConn *conn) {
  char   *inputBuf = conn->inputBuf;
  int32_t inputLen = conn->inputLen;

  uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t));
  // Zero-filled so that `output` is an empty buffer until a handler sets it;
  // the send/cleanup path reads it even when no handler produced a response.
  SUvUdfWork *udfWork = taosMemoryCalloc(1, sizeof(SUvUdfWork));
  if (work == NULL || udfWork == NULL) {
    fnError("udfd can not allocate enough memory");
    taosMemoryFree(work);
    taosMemoryFree(udfWork);
    udfdUvHandleError(conn);
    return;
  }

  udfWork->conn = conn;
  udfWork->pWorkNext = conn->pWorkList;
  conn->pWorkList = udfWork;
  udfWork->input = uv_buf_init(inputBuf, inputLen);

  // The buffer now belongs to the work item; start over for the next message.
  conn->inputBuf = NULL;
  conn->inputLen = 0;
  conn->inputCap = 0;
  conn->inputTotal = -1;

  work->data = udfWork;
  uv_queue_work(global.loop, work, udfdProcessRequest, udfdSendResponse);
}

/*
 * Close callback for a client pipe: detaches every outstanding work item from
 * the connection (so that it is no longer written to) and frees the pipe
 * handle, the input buffer and the connection itself.
 */
void udfdPipeCloseCb(uv_handle_t *pipe) {
  SUdfdUvConn *conn = pipe->data;

  for (SUvUdfWork *item = conn->pWorkList; item != NULL; item = item->pWorkNext) {
    item->conn = NULL;
  }

  taosMemoryFree(conn->client);
  taosMemoryFree(conn->inputBuf);
  taosMemoryFree(conn);
}

/*
 * Read callback for a client pipe. Accumulates received bytes and dispatches
 * the request once a whole message has arrived; on EOF or a read error the
 * connection is closed. A zero-length read is ignored.
 */
void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
  fnDebug("udfd read %zd bytes from client", nread);
  if (nread == 0) {
    return;
  }

  SUdfdUvConn *conn = client->data;

  if (nread < 0) {
    if (nread == UV_EOF) {
      fnInfo("udfd pipe read EOF");
    } else {
      fnError("Receive error %s", uv_err_name(nread));
    }
    udfdUvHandleError(conn);
    return;
  }

  // nread > 0: the bytes were written directly into the connection's input buffer.
  conn->inputLen += nread;
  if (isUdfdUvMsgComplete(conn)) {
    udfdHandleRequest(conn);
  }
  // otherwise keep reading until the message is complete
}

void udfdOnNewConnection(uv_stream_t *server, int status) {
S
slzhou 已提交
824
  if (status < 0) {
S
slzhou 已提交
825
    fnError("udfd new connection error. code: %s", uv_strerror(status));
S
slzhou 已提交
826 827
    return;
  }
828

S
slzhou 已提交
829 830 831 832
  uv_pipe_t *client = (uv_pipe_t *)taosMemoryMalloc(sizeof(uv_pipe_t));
  uv_pipe_init(global.loop, client, 0);
  if (uv_accept(server, (uv_stream_t *)client) == 0) {
    SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn));
833
    ctx->pWorkList = NULL;
S
slzhou 已提交
834 835 836 837 838 839 840 841 842 843
    ctx->client = (uv_stream_t *)client;
    ctx->inputBuf = 0;
    ctx->inputLen = 0;
    ctx->inputCap = 0;
    client->data = ctx;
    ctx->client = (uv_stream_t *)client;
    uv_read_start((uv_stream_t *)client, udfdAllocBuffer, udfdPipeRead);
  } else {
    uv_close((uv_handle_t *)client, NULL);
  }
844 845
}

846 847
/*
 * Handler for the interrupt signal watched by the event loop: removes the
 * listening pipe from the file system, stops watching the signal and asks the
 * event loop to stop.
 */
void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
  fnInfo("udfd signal received: %d\n", signum);

  // Synchronous unlink (NULL callback). The request is cleaned up afterwards,
  // as removeListeningPipe does; the result is ignored because we are
  // shutting down either way.
  uv_fs_t req;
  uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
  uv_fs_req_cleanup(&req);

  uv_signal_stop(handle);
  uv_stop(global.loop);
}

S
slzhou 已提交
854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871
/*
 * Parse the command line.
 *
 *   -c <dir>  copy <dir> into configDir; the path must be shorter than PATH_MAX
 *   -V        set global.printVersion
 *
 * Unknown arguments are ignored. Returns 0 on success and -1 when "-c" has no
 * value or its value is too long.
 */
static int32_t udfdParseArgs(int32_t argc, char *argv[]) {
  int32_t i = 1;
  while (i < argc) {
    const char *opt = argv[i];

    if (strcmp(opt, "-V") == 0) {
      global.printVersion = true;
    } else if (strcmp(opt, "-c") == 0) {
      if (i >= argc - 1) {
        printf("'-c' requires a parameter, default is %s\n", configDir);
        return -1;
      }

      const char *path = argv[++i];
      if (strlen(path) >= PATH_MAX) {
        printf("config file path overflow");
        return -1;
      }
      tstrncpy(configDir, path, PATH_MAX);
    }

    ++i;
  }

  return 0;
}

S
shenglian zhou 已提交
876 877 878 879 880 881 882 883 884 885 886
/*
 * Print the edition name (chosen at compile time by TD_ENTERPRISE), the
 * version and compatible version, the git info and the build info to stdout.
 */
static void udfdPrintVersion() {
  const char *edition =
#ifdef TD_ENTERPRISE
      "enterprise";
#else
      "community";
#endif

  printf("%s version: %s compatible_version: %s\n", edition, version, compatible_version);
  printf("gitinfo: %s\n", gitinfo);
  printf("buildInfo: %s\n", buildinfo);
}

S
slzhou 已提交
887 888 889
static int32_t udfdInitLog() {
  char logName[12] = {0};
  snprintf(logName, sizeof(logName), "%slog", "udfd");
wafwerar's avatar
wafwerar 已提交
890
  return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, NULL, 0);
S
slzhou 已提交
891
}
892

893 894 895 896 897 898 899 900
/*
 * Allocation callback for reads on the control pipe: provides a buffer of the
 * size libuv suggests. The buffer is released in udfdCtrlReadCb. If the
 * allocation fails, an empty buffer (NULL base, zero length) is reported
 * instead of a NULL base with a non-zero length.
 */
void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
  buf->base = taosMemoryMalloc(suggested_size);
  buf->len = (buf->base != NULL) ? suggested_size : 0;
}

/*
 * Read callback for the control pipe.
 *
 * A negative nread (EOF or error) is logged, the read buffer is freed, the
 * control pipe is closed and the event loop is stopped. Otherwise the byte
 * count is logged and the buffer is freed; the data itself is not interpreted.
 */
void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) {
  if (nread < 0) {
    fnError("udfd ctrl pipe read error. %s", uv_err_name(nread));
    taosMemoryFree(buf->base);
    // No close callback: the handle passed to uv_read_start is embedded in
    // `global`, so there is nothing to free once it is closed.
    uv_close((uv_handle_t *)q, NULL);
    uv_stop(global.loop);
    return;
  }

  // nread is an ssize_t, so the signed %zd conversion is the matching one.
  fnError("udfd ctrl pipe read %zd bytes", nread);
  taosMemoryFree(buf->base);
}

static int32_t removeListeningPipe() {
  uv_fs_t req;
S
slzhou 已提交
912
  int     err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
913 914 915 916
  uv_fs_req_cleanup(&req);
  return err;
}

S
slzhou 已提交
917
static int32_t udfdUvInit() {
S
slzhou 已提交
918
  uv_loop_t *loop = taosMemoryMalloc(sizeof(uv_loop_t));
S
slzhou 已提交
919 920
  if (loop) {
    uv_loop_init(loop);
S
slzhou 已提交
921 922
  } else {
    return -1;
S
slzhou 已提交
923 924
  }
  global.loop = loop;
925

926
  if (tsStartUdfd) {  // udfd is started by taosd, which shall exit when taosd exit
927 928 929 930
    uv_pipe_init(global.loop, &global.ctrlPipe, 1);
    uv_pipe_open(&global.ctrlPipe, 0);
    uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb);
  }
931
  getUdfdPipeName(global.listenPipeName, sizeof(global.listenPipeName));
S
slzhou 已提交
932

933
  removeListeningPipe();
S
slzhou 已提交
934

935
  uv_pipe_init(global.loop, &global.listeningPipe, 0);
S
slzhou 已提交
936

937 938
  uv_signal_init(global.loop, &global.intrSignal);
  uv_signal_start(&global.intrSignal, udfdIntrSignalHandler, SIGINT);
S
slzhou 已提交
939 940 941

  int r;
  fnInfo("bind to pipe %s", global.listenPipeName);
942
  if ((r = uv_pipe_bind(&global.listeningPipe, global.listenPipeName))) {
S
slzhou 已提交
943
    fnError("Bind error %s", uv_err_name(r));
944
    removeListeningPipe();
S
slzhou 已提交
945
    return -2;
S
slzhou 已提交
946
  }
947
  if ((r = uv_listen((uv_stream_t *)&global.listeningPipe, 128, udfdOnNewConnection))) {
S
slzhou 已提交
948
    fnError("Listen error %s", uv_err_name(r));
949
    removeListeningPipe();
S
slzhou 已提交
950
    return -3;
S
slzhou 已提交
951
  }
952 953
  return 0;
}
954

dengyihao's avatar
dengyihao 已提交
955
/*
 * uv_walk callback used during shutdown: requests closing of every handle
 * that is not already closing. arg is unused.
 */
static void udfdCloseWalkCb(uv_handle_t *handle, void *arg) {
  if (uv_is_closing(handle)) {
    return;
  }
  uv_close(handle, NULL);
}

S
slzhou 已提交
961 962 963
static int32_t udfdRun() {
  global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  uv_mutex_init(&global.udfsMutex);
964

S
slzhou 已提交
965 966 967 968 969 970 971 972 973 974
  fnInfo("start udfd event loop");
  uv_run(global.loop, UV_RUN_DEFAULT);
  fnInfo("udfd event loop stopped.");

  uv_loop_close(global.loop);

  uv_walk(global.loop, udfdCloseWalkCb, NULL);
  uv_run(global.loop, UV_RUN_DEFAULT);
  uv_loop_close(global.loop);

S
slzhou 已提交
975
  return 0;
S
slzhou 已提交
976
}
977

dengyihao's avatar
dengyihao 已提交
978
/*
 * Thread entry point that tries to connect to the mnode.
 *
 * Makes up to TSDB_MAX_REPLICA + 1 attempts. Before attempt k (counting from
 * 1) it sleeps 100 * 2^k milliseconds. Each failed attempt is logged, and a
 * final error is logged when no attempt succeeded. args is unused.
 */
void udfdConnectMnodeThreadFunc(void *args) {
  int32_t code = 0;

  for (int32_t attempt = 1; attempt <= TSDB_MAX_REPLICA + 1; ++attempt) {
    uv_sleep(100 * (1 << attempt));

    code = udfdConnectToMnode();
    if (code == 0) {
      return;
    }
    fnError("udfd can not connect to mnode, code: %s. retry", tstrerror(code));
  }

  if (code != 0) {
    fnError("udfd can not connect to mnode");
  }
}

995
int32_t udfdInitResidentFuncs() {
S
slzhou 已提交
996 997 998 999
  if (strlen(tsUdfdResFuncs) == 0) {
    return TSDB_CODE_SUCCESS;
  }

1000
  global.residentFuncs = taosArrayInit(2, TSDB_FUNC_NAME_LEN);
1001 1002
  char *pSave = tsUdfdResFuncs;
  char *token;
S
slzhou 已提交
1003
  while ((token = strtok_r(pSave, ",", &pSave)) != NULL) {
1004
    char func[TSDB_FUNC_NAME_LEN + 1] = {0};
S
shenglian zhou 已提交
1005
    strncpy(func, token, TSDB_FUNC_NAME_LEN);
1006
    fnInfo("udfd add resident function %s", func);
S
slzhou 已提交
1007 1008 1009
    taosArrayPush(global.residentFuncs, func);
  }

1010 1011 1012 1013
  return TSDB_CODE_SUCCESS;
}

int32_t udfdDeinitResidentFuncs() {
1014
  for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
1015 1016
    char  *funcName = taosArrayGet(global.residentFuncs, i);
    SUdf **udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName));
S
slzhou 已提交
1017
    if (udfInHash) {
1018
      SUdf *udf = *udfInHash;
S
slzhou 已提交
1019 1020 1021 1022 1023
      if (udf->destroyFunc) {
        (udf->destroyFunc)();
      }
      uv_dlclose(&udf->lib);
      taosMemoryFree(udf);
S
slzhou 已提交
1024
      taosHashRemove(global.udfsHash, funcName, strlen(funcName));
S
slzhou 已提交
1025 1026
    }
  }
1027
  taosArrayDestroy(global.residentFuncs);
1028 1029 1030
  return TSDB_CODE_SUCCESS;
}

1031 1032 1033 1034 1035 1036
/*
 * Release the resources created by udfdRun: the UDF hash mutex and the UDF
 * hash table. Always returns 0.
 */
int32_t udfdCleanup() {
  int32_t code = 0;

  uv_mutex_destroy(&global.udfsMutex);
  taosHashCleanup(global.udfsHash);

  return code;
}

S
slzhou 已提交
1037
int main(int argc, char *argv[]) {
D
dapan1121 已提交
1038 1039
  if (!taosCheckSystemIsLittleEnd()) {
    printf("failed to start since on non-little-end machines\n");
S
slzhou 已提交
1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053
    return -1;
  }

  if (udfdParseArgs(argc, argv) != 0) {
    printf("failed to start since parse args error\n");
    return -1;
  }

  if (global.printVersion) {
    udfdPrintVersion();
    return 0;
  }

  if (udfdInitLog() != 0) {
A
Alex Duan 已提交
1054
    // ignore create log failed, because this error no matter
S
slzhou 已提交
1055 1056 1057
    printf("failed to start since init log error\n");
  }

wafwerar's avatar
wafwerar 已提交
1058
  if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
S
slzhou 已提交
1059
    fnError("failed to start since read config error");
1060
    return -2;
S
slzhou 已提交
1061 1062
  }

1063
  initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp);
1064 1065 1066 1067 1068
  if (udfdOpenClientRpc() != 0) {
    fnError("open rpc connection to mnode failure");
    return -3;
  }

S
slzhou 已提交
1069 1070 1071 1072 1073
  if (udfdUvInit() != 0) {
    fnError("uv init failure");
    return -5;
  }

1074 1075
  udfdInitResidentFuncs();

1076 1077 1078
  uv_thread_t mnodeConnectThread;
  uv_thread_create(&mnodeConnectThread, udfdConnectMnodeThreadFunc, NULL);

1079
  udfdRun();
dengyihao's avatar
dengyihao 已提交
1080

S
slzhou 已提交
1081 1082
  removeListeningPipe();
  udfdCloseClientRpc();
S
shenglian zhou 已提交
1083

1084
  udfdDeinitResidentFuncs();
1085
  udfdCleanup();
S
shenglian zhou 已提交
1086
  return 0;
1087
}