udfd.c 18.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#include "uv.h"
#include "os.h"
S
slzhou 已提交
17 18
#include "fnLog.h"
#include "thash.h"
S
shenglian zhou 已提交
19

20 21 22
#include "tudf.h"
#include "tudfInt.h"

23 24 25 26
#include "tdataformat.h"
#include "tglobal.h"
#include "tmsg.h"
#include "trpc.h"
27

S
slzhou 已提交
28
typedef struct SUdfdContext {
S
slzhou 已提交
29 30
  uv_loop_t  *loop;
  uv_pipe_t   ctrlPipe;
31
  uv_signal_t intrSignal;
S
slzhou 已提交
32 33 34
  char        listenPipeName[UDF_LISTEN_PIPE_NAME_LEN];
  uv_pipe_t   listeningPipe;
  void       *clientRpc;
S
slzhou 已提交
35 36

  uv_mutex_t udfsMutex;
S
slzhou 已提交
37
  SHashObj  *udfsHash;
S
slzhou 已提交
38 39 40 41 42

  bool printVersion;
} SUdfdContext;

SUdfdContext global;
43 44

typedef struct SUdfdUvConn {
S
slzhou 已提交
45 46 47 48 49
  uv_stream_t *client;
  char        *inputBuf;
  int32_t      inputLen;
  int32_t      inputCap;
  int32_t      inputTotal;
50 51 52
} SUdfdUvConn;

typedef struct SUvUdfWork {
S
slzhou 已提交
53 54 55
  uv_stream_t *client;
  uv_buf_t     input;
  uv_buf_t     output;
56 57
} SUvUdfWork;

S
slzhou 已提交
58
typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY, UDF_STATE_UNLOADING } EUdfState;
59

S
slzhou 已提交
60
typedef struct SUdf {
S
slzhou 已提交
61 62
  int32_t    refCount;
  EUdfState  state;
S
slzhou 已提交
63
  uv_mutex_t lock;
S
slzhou 已提交
64
  uv_cond_t  condReady;
S
slzhou 已提交
65 66 67

  char   name[16];
  int8_t type;
S
slzhou 已提交
68
  char   path[PATH_MAX];
S
slzhou 已提交
69 70 71 72

  uv_lib_t              lib;
  TUdfScalarProcFunc    scalarProcFunc;
  TUdfFreeUdfColumnFunc freeUdfColumn;
73 74
} SUdf;

S
slzhou 已提交
75 76
// TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
// TODO: add private udf structure.
77
typedef struct SUdfcFuncHandle {
S
slzhou 已提交
78
  SUdf *udf;
79
} SUdfcFuncHandle;
80

S
slzhou 已提交
81
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char *udfName, SUdf *udf);
82

S
slzhou 已提交
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
int32_t udfdLoadUdf(char *udfName, SEpSet *pEpSet, SUdf *udf) {
  strcpy(udf->name, udfName);

  udfdFillUdfInfoFromMNode(global.clientRpc, pEpSet, udf->name, udf);

  int err = uv_dlopen(udf->path, &udf->lib);
  if (err != 0) {
    fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
    // TODO set error
  }
  // TODO: find all the functions
  char normalFuncName[TSDB_FUNC_NAME_LEN] = {0};
  strcpy(normalFuncName, udfName);
  uv_dlsym(&udf->lib, normalFuncName, (void **)(&udf->scalarProcFunc));
  char  freeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
  char *freeSuffix = "_free";
  strncpy(freeFuncName, normalFuncName, strlen(normalFuncName));
  strncat(freeFuncName, freeSuffix, strlen(freeSuffix));
  uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn));
  return 0;
S
slzhou 已提交
103
}
104

S
slzhou 已提交
105 106 107 108 109 110 111
void udfdProcessRequest(uv_work_t *req) {
  SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data);
  SUdfRequest request = {0};
  decodeUdfRequest(uvUdf->input.base, &request);

  switch (request.type) {
    case UDF_TASK_SETUP: {
S
slzhou 已提交
112 113
      // TODO: tracable id from client. connect, setup, call, teardown
      fnInfo("%" PRId64 " setup request. udf name: %s", request.seqNum, request.setup.udfName);
S
slzhou 已提交
114 115
      SUdfSetupRequest *setup = &request.setup;

S
slzhou 已提交
116
      SUdf *udf = NULL;
S
slzhou 已提交
117
      uv_mutex_lock(&global.udfsMutex);
S
slzhou 已提交
118
      SUdf **udfInHash = taosHashGet(global.udfsHash, request.setup.udfName, TSDB_FUNC_NAME_LEN);
S
slzhou 已提交
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
      if (*udfInHash) {
        ++(*udfInHash)->refCount;
        udf = *udfInHash;
        uv_mutex_unlock(&global.udfsMutex);
      } else {
        SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
        udfNew->refCount = 1;
        udfNew->state = UDF_STATE_INIT;

        uv_mutex_init(&udfNew->lock);
        uv_cond_init(&udfNew->condReady);
        udf = udfNew;
        taosHashPut(global.udfsHash, request.setup.udfName, TSDB_FUNC_NAME_LEN, &udfNew, sizeof(&udfNew));
        uv_mutex_unlock(&global.udfsMutex);
      }

      uv_mutex_lock(&udf->lock);
      if (udf->state == UDF_STATE_INIT) {
        udf->state = UDF_STATE_LOADING;
S
slzhou 已提交
138
        udfdLoadUdf(setup->udfName, &setup->epSet, udf);
S
slzhou 已提交
139 140 141 142 143 144 145 146 147
        udf->state = UDF_STATE_READY;
        uv_cond_broadcast(&udf->condReady);
        uv_mutex_unlock(&udf->lock);
      } else {
        while (udf->state != UDF_STATE_READY) {
          uv_cond_wait(&udf->condReady, &udf->lock);
        }
        uv_mutex_unlock(&udf->lock);
      }
148
      SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
S
slzhou 已提交
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
      handle->udf = udf;
      // TODO: allocate private structure and call init function and set it to handle
      SUdfResponse rsp;
      rsp.seqNum = request.seqNum;
      rsp.type = request.type;
      rsp.code = 0;
      rsp.setupRsp.udfHandle = (int64_t)(handle);
      int32_t len = encodeUdfResponse(NULL, &rsp);
      rsp.msgLen = len;
      void *bufBegin = taosMemoryMalloc(len);
      void *buf = bufBegin;
      encodeUdfResponse(&buf, &rsp);

      uvUdf->output = uv_buf_init(bufBegin, len);

      taosMemoryFree(uvUdf->input.base);
      break;
166 167
    }

S
slzhou 已提交
168 169
    case UDF_TASK_CALL: {
      SUdfCallRequest *call = &request.call;
S
slzhou 已提交
170 171 172
      fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request.seqNum, call->callType,
              call->udfHandle);
      SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle);
S
slzhou 已提交
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
      SUdf            *udf = handle->udf;

      SUdfDataBlock input = {0};
      convertDataBlockToUdfDataBlock(&call->block, &input);
      SUdfColumn output = {0};
      // TODO: call different functions according to call type, for now just calar
      if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
        udf->scalarProcFunc(input, &output);
      }

      SUdfResponse  response = {0};
      SUdfResponse *rsp = &response;
      if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
        rsp->seqNum = request.seqNum;
        rsp->type = request.type;
        rsp->code = 0;
        SUdfCallResponse *subRsp = &rsp->callRsp;
        subRsp->callType = call->callType;
        convertUdfColumnToDataBlock(&output, &subRsp->resultData);
      }

      int32_t len = encodeUdfResponse(NULL, rsp);
      rsp->msgLen = len;
      void *bufBegin = taosMemoryMalloc(len);
      void *buf = bufBegin;
      encodeUdfResponse(&buf, rsp);
      uvUdf->output = uv_buf_init(bufBegin, len);

      // TODO: free udf column
      udf->freeUdfColumn(&output);

      taosMemoryFree(uvUdf->input.base);
      break;
    }
    case UDF_TASK_TEARDOWN: {
      SUdfTeardownRequest *teardown = &request.teardown;
S
slzhou 已提交
209 210 211 212
      fnInfo("teardown. %" PRId64 "handle:%" PRIx64, request.seqNum, teardown->udfHandle) SUdfcFuncHandle *handle =
          (SUdfcFuncHandle *)(teardown->udfHandle);
      SUdf *udf = handle->udf;
      bool  unloadUdf = false;
S
slzhou 已提交
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247
      uv_mutex_lock(&global.udfsMutex);
      udf->refCount--;
      if (udf->refCount == 0) {
        unloadUdf = true;
        taosHashRemove(global.udfsHash, udf->name, TSDB_FUNC_NAME_LEN);
      }
      uv_mutex_unlock(&global.udfsMutex);
      if (unloadUdf) {
        uv_cond_destroy(&udf->condReady);
        uv_mutex_destroy(&udf->lock);
        uv_dlclose(&udf->lib);
        taosMemoryFree(udf);
      }
      // TODO: call destroy and free udf private
      taosMemoryFree(handle);

      SUdfResponse  response;
      SUdfResponse *rsp = &response;
      rsp->seqNum = request.seqNum;
      rsp->type = request.type;
      rsp->code = 0;
      int32_t len = encodeUdfResponse(NULL, rsp);
      rsp->msgLen = len;
      void *bufBegin = taosMemoryMalloc(len);
      void *buf = bufBegin;
      encodeUdfResponse(&buf, rsp);
      uvUdf->output = uv_buf_init(bufBegin, len);

      taosMemoryFree(uvUdf->input.base);
      break;
    }
    default: {
      break;
    }
  }
248 249 250
}

void udfdOnWrite(uv_write_t *req, int status) {
S
slzhou 已提交
251 252
  SUvUdfWork *work = (SUvUdfWork *)req->data;
  if (status < 0) {
S
slzhou 已提交
253
    // TODO:log error and process it.
S
slzhou 已提交
254 255 256 257 258
  }
  fnDebug("send response. length:%zu, status: %s", work->output.len, uv_err_name(status));
  taosMemoryFree(work->output.base);
  taosMemoryFree(work);
  taosMemoryFree(req);
259 260 261
}

void udfdSendResponse(uv_work_t *work, int status) {
S
slzhou 已提交
262
  SUvUdfWork *udfWork = (SUvUdfWork *)(work->data);
263

S
slzhou 已提交
264 265 266
  uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t));
  write_req->data = udfWork;
  uv_write(write_req, udfWork->client, &udfWork->output, 1, udfdOnWrite);
267

S
slzhou 已提交
268
  taosMemoryFree(work);
269 270 271
}

void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
S
slzhou 已提交
272 273 274 275 276 277 278 279 280 281 282
  SUdfdUvConn *ctx = handle->data;
  int32_t      msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
  if (ctx->inputCap == 0) {
    ctx->inputBuf = taosMemoryMalloc(msgHeadSize);
    if (ctx->inputBuf) {
      ctx->inputLen = 0;
      ctx->inputCap = msgHeadSize;
      ctx->inputTotal = -1;

      buf->base = ctx->inputBuf;
      buf->len = ctx->inputCap;
283
    } else {
S
slzhou 已提交
284 285 286
      // TODO: log error
      buf->base = NULL;
      buf->len = 0;
287
    }
S
slzhou 已提交
288 289 290 291 292 293 294 295 296 297 298 299 300 301
  } else {
    ctx->inputCap = ctx->inputTotal > ctx->inputCap ? ctx->inputTotal : ctx->inputCap;
    void *inputBuf = taosMemoryRealloc(ctx->inputBuf, ctx->inputCap);
    if (inputBuf) {
      ctx->inputBuf = inputBuf;
      buf->base = ctx->inputBuf + ctx->inputLen;
      buf->len = ctx->inputCap - ctx->inputLen;
    } else {
      // TODO: log error
      buf->base = NULL;
      buf->len = 0;
    }
  }
  fnDebug("allocate buf. input buf cap - len - total : %d - %d - %d", ctx->inputCap, ctx->inputLen, ctx->inputTotal);
302 303 304
}

bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
S
slzhou 已提交
305 306 307 308 309 310 311 312
  if (pipe->inputTotal == -1 && pipe->inputLen >= sizeof(int32_t)) {
    pipe->inputTotal = *(int32_t *)(pipe->inputBuf);
  }
  if (pipe->inputLen == pipe->inputCap && pipe->inputTotal == pipe->inputCap) {
    fnDebug("receive request complete. length %d", pipe->inputLen);
    return true;
  }
  return false;
313 314 315
}

void udfdHandleRequest(SUdfdUvConn *conn) {
S
slzhou 已提交
316 317 318 319 320 321 322 323 324 325
  uv_work_t  *work = taosMemoryMalloc(sizeof(uv_work_t));
  SUvUdfWork *udfWork = taosMemoryMalloc(sizeof(SUvUdfWork));
  udfWork->client = conn->client;
  udfWork->input = uv_buf_init(conn->inputBuf, conn->inputLen);
  conn->inputBuf = NULL;
  conn->inputLen = 0;
  conn->inputCap = 0;
  conn->inputTotal = -1;
  work->data = udfWork;
  uv_queue_work(global.loop, work, udfdProcessRequest, udfdSendResponse);
326 327 328
}

void udfdPipeCloseCb(uv_handle_t *pipe) {
S
slzhou 已提交
329 330 331 332
  SUdfdUvConn *conn = pipe->data;
  taosMemoryFree(conn->client);
  taosMemoryFree(conn->inputBuf);
  taosMemoryFree(conn);
333 334
}

S
slzhou 已提交
335
void udfdUvHandleError(SUdfdUvConn *conn) { uv_close((uv_handle_t *)conn->client, udfdPipeCloseCb); }
336 337

void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
S
slzhou 已提交
338 339
  fnDebug("udf read %zu bytes from client", nread);
  if (nread == 0) return;
340

S
slzhou 已提交
341
  SUdfdUvConn *conn = client->data;
342

S
slzhou 已提交
343 344 345 346 347 348
  if (nread > 0) {
    conn->inputLen += nread;
    if (isUdfdUvMsgComplete(conn)) {
      udfdHandleRequest(conn);
    } else {
      // log error or continue;
349
    }
S
slzhou 已提交
350 351
    return;
  }
352

S
slzhou 已提交
353 354 355 356 357
  if (nread < 0) {
    fnDebug("Receive error %s", uv_err_name(nread));
    if (nread == UV_EOF) {
      // TODO check more when close
    } else {
358
    }
S
slzhou 已提交
359 360
    udfdUvHandleError(conn);
  }
361 362 363
}

void udfdOnNewConnection(uv_stream_t *server, int status) {
S
slzhou 已提交
364 365 366 367 368
  fnDebug("new connection");
  if (status < 0) {
    // TODO
    return;
  }
369

S
slzhou 已提交
370 371 372 373 374 375 376 377 378 379 380 381 382 383
  uv_pipe_t *client = (uv_pipe_t *)taosMemoryMalloc(sizeof(uv_pipe_t));
  uv_pipe_init(global.loop, client, 0);
  if (uv_accept(server, (uv_stream_t *)client) == 0) {
    SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn));
    ctx->client = (uv_stream_t *)client;
    ctx->inputBuf = 0;
    ctx->inputLen = 0;
    ctx->inputCap = 0;
    client->data = ctx;
    ctx->client = (uv_stream_t *)client;
    uv_read_start((uv_stream_t *)client, udfdAllocBuffer, udfdPipeRead);
  } else {
    uv_close((uv_handle_t *)client, NULL);
  }
384 385
}

386 387
void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
  fnInfo("udfd signal received: %d\n", signum);
S
slzhou 已提交
388
  uv_fs_t req;
389 390 391
  uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
  uv_signal_stop(handle);
  uv_stop(global.loop);
392 393
}

S
slzhou 已提交
394
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; }
395

S
slzhou 已提交
396
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char *udfName, SUdf *udf) {
397 398 399
  SRetrieveFuncReq retrieveReq = {0};
  retrieveReq.numOfFuncs = 1;
  retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
S
slzhou 已提交
400
  taosArrayPush(retrieveReq.pFuncNames, udfName);
401 402

  int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
S
slzhou 已提交
403
  void   *pReq = rpcMallocCont(contLen);
404 405 406 407 408 409 410 411 412 413 414 415 416
  tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
  taosArrayDestroy(retrieveReq.pFuncNames);

  SRpcMsg rpcMsg = {0};
  rpcMsg.pCont = pReq;
  rpcMsg.contLen = contLen;
  rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(clientRpc, pEpSet, &rpcMsg, &rpcRsp);
  SRetrieveFuncRsp retrieveRsp = {0};
  tDeserializeSRetrieveFuncRsp(rpcRsp.pCont, rpcRsp.contLen, &retrieveRsp);

S
slzhou 已提交
417
  SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
418

S
slzhou 已提交
419 420 421 422 423 424 425
  char path[PATH_MAX] = {0};
  taosGetTmpfilePath("/tmp", "libudf", path);
  TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
  // TODO check for failure of flush to disk
  taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
  taosCloseFile(&file);
  strncpy(udf->path, path, strlen(path));
426 427 428 429 430 431
  taosArrayDestroy(retrieveRsp.pFuncInfos);

  rpcFreeCont(rpcRsp.pCont);
  return 0;
}

S
slzhou 已提交
432
int32_t udfdOpenClientRpc() {
433 434
  char *pass = "taosdata";
  char *user = "root";
S
slzhou 已提交
435 436
  char  secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
  taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
437
  SRpcInit rpcInit = {0};
S
slzhou 已提交
438
  rpcInit.label = (char *)"UDFD";
439 440 441 442 443
  rpcInit.numOfThreads = 1;
  rpcInit.cfp = udfdProcessRpcRsp;
  rpcInit.sessions = 1024;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.idleTime = 30 * 1000;
S
slzhou 已提交
444
  rpcInit.parent = &global;
445

S
slzhou 已提交
446 447 448
  rpcInit.user = (char *)user;
  rpcInit.ckey = (char *)"key";
  rpcInit.secret = (char *)secretEncrypt;
449 450
  rpcInit.spi = 1;

S
slzhou 已提交
451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489
  global.clientRpc = rpcOpen(&rpcInit);

  return 0;
}

int32_t udfdCloseClientRpc() {
  rpcClose(global.clientRpc);
  return 0;
}

static void udfdPrintVersion() {
#ifdef TD_ENTERPRISE
  char *releaseName = "enterprise";
#else
  char *releaseName = "community";
#endif
  printf("%s version: %s compatible_version: %s\n", releaseName, version, compatible_version);
  printf("gitinfo: %s\n", gitinfo);
  printf("buildInfo: %s\n", buildinfo);
}

static int32_t udfdParseArgs(int32_t argc, char *argv[]) {
  for (int32_t i = 1; i < argc; ++i) {
    if (strcmp(argv[i], "-c") == 0) {
      if (i < argc - 1) {
        if (strlen(argv[++i]) >= PATH_MAX) {
          printf("config file path overflow");
          return -1;
        }
        tstrncpy(configDir, argv[i], PATH_MAX);
      } else {
        printf("'-c' requires a parameter, default is %s\n", configDir);
        return -1;
      }
    } else if (strcmp(argv[i], "-V") == 0) {
      global.printVersion = true;
    } else {
    }
  }
490 491 492 493

  return 0;
}

S
slzhou 已提交
494 495 496 497 498
static int32_t udfdInitLog() {
  char logName[12] = {0};
  snprintf(logName, sizeof(logName), "%slog", "udfd");
  return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, 0);
}
499

500 501 502 503 504 505 506 507
void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
  buf->base = taosMemoryMalloc(suggested_size);
  buf->len = suggested_size;
}

void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) {
  if (nread < 0) {
    fnError("udfd ctrl pipe read error. %s", uv_err_name(nread));
S
slzhou 已提交
508
    uv_close((uv_handle_t *)q, NULL);
509 510 511 512 513 514 515 516 517
    uv_stop(global.loop);
    return;
  }
  fnError("udfd ctrl pipe read %zu bytes", nread);
  taosMemoryFree(buf->base);
}

static int32_t removeListeningPipe() {
  uv_fs_t req;
S
slzhou 已提交
518
  int     err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
519 520 521 522
  uv_fs_req_cleanup(&req);
  return err;
}

S
slzhou 已提交
523
static int32_t udfdUvInit() {
S
slzhou 已提交
524
  uv_loop_t *loop = taosMemoryMalloc(sizeof(uv_loop_t));
S
slzhou 已提交
525 526 527 528
  if (loop) {
    uv_loop_init(loop);
  }
  global.loop = loop;
529 530 531

  uv_pipe_init(global.loop, &global.ctrlPipe, 1);
  uv_pipe_open(&global.ctrlPipe, 0);
S
slzhou 已提交
532
  uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb);
533

S
slzhou 已提交
534 535
  char    dnodeId[8] = {0};
  size_t  dnodeIdSize;
536 537 538 539
  int32_t err = uv_os_getenv("DNODE_ID", dnodeId, &dnodeIdSize);
  if (err != 0) {
    dnodeId[0] = '1';
  }
S
slzhou 已提交
540 541 542 543
  char listenPipeName[32] = {0};
  snprintf(listenPipeName, sizeof(listenPipeName), "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
  strcpy(global.listenPipeName, listenPipeName);

544
  removeListeningPipe();
S
slzhou 已提交
545

546
  uv_pipe_init(global.loop, &global.listeningPipe, 0);
S
slzhou 已提交
547

548 549
  uv_signal_init(global.loop, &global.intrSignal);
  uv_signal_start(&global.intrSignal, udfdIntrSignalHandler, SIGINT);
S
slzhou 已提交
550 551 552

  int r;
  fnInfo("bind to pipe %s", global.listenPipeName);
553
  if ((r = uv_pipe_bind(&global.listeningPipe, listenPipeName))) {
S
slzhou 已提交
554
    fnError("Bind error %s", uv_err_name(r));
555
    removeListeningPipe();
S
slzhou 已提交
556 557
    return -1;
  }
558
  if ((r = uv_listen((uv_stream_t *)&global.listeningPipe, 128, udfdOnNewConnection))) {
S
slzhou 已提交
559
    fnError("Listen error %s", uv_err_name(r));
560
    removeListeningPipe();
S
slzhou 已提交
561 562
    return -2;
  }
563 564
  return 0;
}
565

S
slzhou 已提交
566 567 568
static int32_t udfdRun() {
  global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  uv_mutex_init(&global.udfsMutex);
569

S
slzhou 已提交
570
  // TOOD: client rpc to fetch udf function info from mnode
S
slzhou 已提交
571 572 573 574
  if (udfdOpenClientRpc() != 0) {
    fnError("open rpc connection to mnode failure");
    return -1;
  }
575

S
slzhou 已提交
576 577 578 579
  if (udfdUvInit() != 0) {
    fnError("uv init failure");
    return -2;
  }
580

S
slzhou 已提交
581 582 583 584 585 586 587 588 589 590
  fnInfo("start the udfd");
  int code = uv_run(global.loop, UV_RUN_DEFAULT);
  fnInfo("udfd stopped. result: %s", uv_err_name(code));
  int codeClose = uv_loop_close(global.loop);
  fnDebug("uv loop close. result: %s", uv_err_name(codeClose));
  udfdCloseClientRpc();
  uv_mutex_destroy(&global.udfsMutex);
  taosHashCleanup(global.udfsHash);
  return code;
}
591

S
slzhou 已提交
592
int main(int argc, char *argv[]) {
S
slzhou 已提交
593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618
  if (!taosCheckSystemIsSmallEnd()) {
    printf("failed to start since on non-small-end machines\n");
    return -1;
  }

  if (udfdParseArgs(argc, argv) != 0) {
    printf("failed to start since parse args error\n");
    return -1;
  }

  if (global.printVersion) {
    udfdPrintVersion();
    return 0;
  }

  if (udfdInitLog() != 0) {
    printf("failed to start since init log error\n");
    return -1;
  }

  if (taosInitCfg(configDir, NULL, NULL, NULL, 0) != 0) {
    fnError("failed to start since read config error");
    return -1;
  }

  return udfdRun();
619
}