tudf.c 53.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#include "uv.h"
#include "os.h"
S
slzhou 已提交
17
#include "fnLog.h"
18
#include "tudf.h"
19
#include "tudfInt.h"
S
shenglian zhou 已提交
20
#include "tarray.h"
S
slzhou 已提交
21
#include "tglobal.h"
S
slzhou 已提交
22
#include "tdatablock.h"
S
shenglian zhou 已提交
23 24
#include "querynodes.h"
#include "builtinsimpl.h"
S
slzhou 已提交
25
#include "functionMgt.h"
26

27
//TODO: add unit test
28 29 30 31 32 33 34
typedef struct SUdfdData {
  bool          startCalled;
  bool          needCleanUp;
  uv_loop_t     loop;
  uv_thread_t   thread;
  uv_barrier_t  barrier;
  uv_process_t  process;
35 36 37
#ifdef WINDOWS
  HANDLE        jobHandle;
#endif
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
  int           spawnErr;
  uv_pipe_t     ctrlPipe;
  uv_async_t    stopAsync;
  int32_t        stopCalled;

  int32_t         dnodeId;
} SUdfdData;

// Process-wide udfd supervision state, zero-initialized; used by udfStartUdfd/udfStopUdfd.
SUdfdData udfdGlobal = {0};

// Forward declaration: udfUdfdExit calls it to restart udfd before the definition appears.
static int32_t udfSpawnUdfd(SUdfdData *pData);

/*
 * libuv exit callback of the udfd child process.
 *
 * A clean exit (status 0, no signal) or an exit after udfStopUdfd was called is
 * only logged; any other exit makes the dnode spawn udfd again.
 *
 * process    - child handle; its data field holds the owning SUdfdData.
 * exitStatus - exit status reported by libuv.
 * termSignal - terminating signal reported by libuv, 0 if none.
 */
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
  fnInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
  SUdfdData *pData = process->data;

  bool exitedCleanly = (exitStatus == 0) && (termSignal == 0);
  if (!exitedCleanly && !atomic_load_32(&pData->stopCalled)) {
    fnInfo("udfd process restart");
    udfSpawnUdfd(pData);
    return;
  }
  fnInfo("udfd process exit due to SIGINT or dnode-mgmt called stop");
}

/*
 * Spawns the udfd executable as a detached child of the watcher loop.
 *
 * The executable path is the directory of tsProcPath (or "." when tsProcPath
 * is NULL) plus the platform-specific udfd file name. The child gets a pipe on
 * stdin, no stdout, the parent's stderr, and an environment that contains only
 * DNODE_ID and UV_THREADPOOL_SIZE.
 *
 * pData - supervision state; loop must already be initialized.
 * Returns 0 on success, UV_ENAMETOOLONG when the path does not fit into
 * PATH_MAX, otherwise the libuv error returned by uv_spawn.
 *
 * NOTE(review): ctrlPipe is re-initialized on every call, including restarts
 * from udfUdfdExit, without closing the previous handle first - confirm this
 * is acceptable for libuv.
 */
static int32_t udfSpawnUdfd(SUdfdData* pData) {
  fnInfo("dnode start spawning udfd");
  uv_process_options_t options = {0};

  char path[PATH_MAX] = {0};
  if (tsProcPath == NULL) {
    path[0] = '.';
  } else {
    size_t procPathLen = strlen(tsProcPath);
    if (procPathLen >= sizeof(path)) {
      fnError("can not spawn udfd. process path too long: %s", tsProcPath);
      return UV_ENAMETOOLONG;
    }
    // path is zero-initialized, so the copy stays NUL-terminated
    memcpy(path, tsProcPath, procPathLen);
    taosDirName(path);
  }
#ifdef WINDOWS
  const char *udfdSuffix = "udfd.exe";
#else
  const char *udfdSuffix = "/udfd";
#endif
  if (strlen(path) + strlen(udfdSuffix) >= sizeof(path)) {
    fnError("can not spawn udfd. udfd path too long, dir: %s", path);
    return UV_ENAMETOOLONG;
  }
  strcat(path, udfdSuffix);

  char* argsUdfd[] = {path, "-c", configDir, NULL};
  options.args = argsUdfd;
  options.file = path;

  options.exit_cb = udfUdfdExit;

  uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1);

  // stdin: new pipe readable by the child; stdout: ignored; stderr: inherited
  uv_stdio_container_t child_stdio[3];
  child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
  child_stdio[0].data.stream = (uv_stream_t*) &pData->ctrlPipe;
  child_stdio[1].flags = UV_IGNORE;
  child_stdio[2].flags = UV_INHERIT_FD;
  child_stdio[2].data.fd = 2;
  options.stdio_count = 3;
  options.stdio = child_stdio;

  options.flags = UV_PROCESS_DETACHED;

  char dnodeIdEnvItem[32] = {0};
  char thrdPoolSizeEnvItem[32] = {0};
  snprintf(dnodeIdEnvItem, sizeof(dnodeIdEnvItem), "%s=%d", "DNODE_ID", pData->dnodeId);
  float numCpuCores = 4;  // fallback value, kept if taosGetCpuCores does not overwrite it
  taosGetCpuCores(&numCpuCores);
  snprintf(thrdPoolSizeEnvItem, sizeof(thrdPoolSizeEnvItem), "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2);
  char* envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, NULL};
  options.env = envUdfd;

  int err = uv_spawn(&pData->loop, &pData->process, &options);
  pData->process.data = (void*)pData;

#ifdef WINDOWS
  // End udfd.exe by Job.
  if (pData->jobHandle != NULL) {
    CloseHandle(pData->jobHandle);
    pData->jobHandle = NULL;
  }
  // Only a successfully spawned process has a valid handle to put into a job.
  if (err == 0) {
    pData->jobHandle = CreateJobObject(NULL, NULL);
    if (pData->jobHandle == NULL) {
      fnError("Create job for udfd failed.");
    } else {
      bool add_job_ok = AssignProcessToJobObject(pData->jobHandle, pData->process.process_handle);
      if (!add_job_ok) {
        fnError("Assign udfd to job failed.");
      } else {
        JOBOBJECT_EXTENDED_LIMIT_INFORMATION limit_info;
        memset(&limit_info, 0x0, sizeof(limit_info));
        limit_info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
        bool set_auto_kill_ok = SetInformationJobObject(pData->jobHandle, JobObjectExtendedLimitInformation, &limit_info, sizeof(limit_info));
        if (!set_auto_kill_ok) {
          fnError("Set job auto kill udfd failed.");
        }
      }
    }
  }
#endif

  if (err != 0) {
    fnError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
  }
  return err;
}

/*
 * uv_walk callback: requests close of every handle that is not already closing.
 * arg is unused.
 */
static void udfUdfdCloseWalkCb(uv_handle_t* handle, void* arg) {
  if (uv_is_closing(handle)) {
    return;
  }
  uv_close(handle, NULL);
}

/*
 * Async callback of SUdfdData.stopAsync: stops the watcher loop that owns the handle.
 */
static void udfUdfdStopAsyncCb(uv_async_t *async) {
  SUdfdData *udfd = (SUdfdData *)async->data;
  uv_stop(&udfd->loop);
}

/*
 * Entry point of the watcher thread created by udfStartUdfd.
 *
 * Initializes the loop and the stop async handle, makes the first spawn
 * attempt, publishes its result in spawnErr, then meets the starter at the
 * barrier and runs the loop until udfUdfdStopAsyncCb stops it.
 *
 * args - the SUdfdData being supervised.
 */
static void udfWatchUdfd(void *args) {
  SUdfdData *pData = args;
  uv_loop_init(&pData->loop);
  uv_async_init(&pData->loop, &pData->stopAsync, udfUdfdStopAsyncCb);
  pData->stopAsync.data = pData;
  int32_t err = udfSpawnUdfd(pData);
  // publish the spawn result before releasing the starter from the barrier
  atomic_store_32(&pData->spawnErr, err);
  uv_barrier_wait(&pData->barrier);
  uv_run(&pData->loop, UV_RUN_DEFAULT);
  // NOTE(review): handles such as stopAsync are still open here, so this first
  // close presumably fails with UV_EBUSY; its result is ignored.
  uv_loop_close(&pData->loop);

  // close every remaining handle, let the loop process the closes, then close the loop
  uv_walk(&pData->loop, udfUdfdCloseWalkCb, NULL);
  uv_run(&pData->loop, UV_RUN_DEFAULT);
  uv_loop_close(&pData->loop);
  return;
}

int32_t udfStartUdfd(int32_t startDnodeId) {
S
slzhou 已提交
161 162
  if (!tsStartUdfd) {
    fnInfo("start udfd is disabled.")
S
slzhou 已提交
163
    return 0;
S
slzhou 已提交
164
  }
165 166
  SUdfdData *pData = &udfdGlobal;
  if (pData->startCalled) {
S
Shengliang Guan 已提交
167
    fnInfo("dnode start udfd already called");
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
    return 0;
  }
  pData->startCalled = true;
  char dnodeId[8] = {0};
  snprintf(dnodeId, sizeof(dnodeId), "%d", startDnodeId);
  uv_os_setenv("DNODE_ID", dnodeId);
  pData->dnodeId = startDnodeId;

  uv_barrier_init(&pData->barrier, 2);
  uv_thread_create(&pData->thread, udfWatchUdfd, pData);
  uv_barrier_wait(&pData->barrier);
  int32_t err = atomic_load_32(&pData->spawnErr);
  if (err != 0) {
    uv_barrier_destroy(&pData->barrier);
    uv_async_send(&pData->stopAsync);
    uv_thread_join(&pData->thread);
    pData->needCleanUp = false;
S
Shengliang Guan 已提交
185
    fnInfo("dnode udfd cleaned up after spawn err");
186 187 188 189 190 191 192 193
  } else {
    pData->needCleanUp = true;
  }
  return err;
}

int32_t udfStopUdfd() {
  SUdfdData *pData = &udfdGlobal;
S
Shengliang Guan 已提交
194
  fnInfo("dnode to stop udfd. need cleanup: %d, spawn err: %d",
195 196 197 198 199 200 201 202 203
        pData->needCleanUp, pData->spawnErr);
  if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) {
    return 0;
  }
  atomic_store_32(&pData->stopCalled, 1);
  pData->needCleanUp = false;
  uv_barrier_destroy(&pData->barrier);
  uv_async_send(&pData->stopAsync);
  uv_thread_join(&pData->thread);
204 205 206
#ifdef WINDOWS
  if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle);
#endif
S
Shengliang Guan 已提交
207
  fnInfo("dnode udfd cleaned up");
208 209 210 211
  return 0;
}

//==============================================================================================
S
shenglian zhou 已提交
212 213 214 215
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
 * The QUEUE is copied from queue.h under libuv
 * */

S
shenglian zhou 已提交
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303
/* Intrusive circular doubly-linked list node: [0] is next, [1] is prev. */
typedef void *QUEUE[2];

/* Private macros. */
#define QUEUE_NEXT(q)       (*(QUEUE **) &((*(q))[0]))
#define QUEUE_PREV(q)       (*(QUEUE **) &((*(q))[1]))
#define QUEUE_PREV_NEXT(q)  (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q)  (QUEUE_PREV(QUEUE_NEXT(q)))

/* Public macros. */

/* Recovers the enclosing `type` object from a pointer to its `field` node. */
#define QUEUE_DATA(ptr, type, field)                                          \
  ((type *) ((char *) (ptr) - offsetof(type, field)))

/* Important note: mutating the list while QUEUE_FOREACH is
 * iterating over its elements results in undefined behavior.
 */
#define QUEUE_FOREACH(q, h)                                                   \
  for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q))

/* True when the head links to itself. */
#define QUEUE_EMPTY(q)                                                        \
  ((const QUEUE *) (q) == (const QUEUE *) QUEUE_NEXT(q))

/* First node after the head (the head itself when the list is empty). */
#define QUEUE_HEAD(q)                                                         \
  (QUEUE_NEXT(q))

/* Makes q an empty list / unlinked node pointing at itself. */
#define QUEUE_INIT(q)                                                         \
  do {                                                                        \
    QUEUE_NEXT(q) = (q);                                                      \
    QUEUE_PREV(q) = (q);                                                      \
  }                                                                           \
  while (0)

/* Appends all nodes of list n to the tail of list h. */
#define QUEUE_ADD(h, n)                                                       \
  do {                                                                        \
    QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n);                                       \
    QUEUE_NEXT_PREV(n) = QUEUE_PREV(h);                                       \
    QUEUE_PREV(h) = QUEUE_PREV(n);                                            \
    QUEUE_PREV_NEXT(h) = (h);                                                 \
  }                                                                           \
  while (0)

/* Splits list h at node q: q and everything after it move to list n. */
#define QUEUE_SPLIT(h, q, n)                                                  \
  do {                                                                        \
    QUEUE_PREV(n) = QUEUE_PREV(h);                                            \
    QUEUE_PREV_NEXT(n) = (n);                                                 \
    QUEUE_NEXT(n) = (q);                                                      \
    QUEUE_PREV(h) = QUEUE_PREV(q);                                            \
    QUEUE_PREV_NEXT(h) = (h);                                                 \
    QUEUE_PREV(q) = (n);                                                      \
  }                                                                           \
  while (0)

/* Moves every node of list h to list n, leaving h empty.
 * The temporary has a macro-private name so that a caller argument
 * called `q` is not captured by it.
 */
#define QUEUE_MOVE(h, n)                                                      \
  do {                                                                        \
    if (QUEUE_EMPTY(h)) {                                                     \
      QUEUE_INIT(n);                                                          \
    } else {                                                                  \
      QUEUE *queueMoveFirst_ = QUEUE_HEAD(h);                                 \
      QUEUE_SPLIT(h, queueMoveFirst_, n);                                     \
    }                                                                         \
  }                                                                           \
  while (0)

/* Links node q right after head h. */
#define QUEUE_INSERT_HEAD(h, q)                                               \
  do {                                                                        \
    QUEUE_NEXT(q) = QUEUE_NEXT(h);                                            \
    QUEUE_PREV(q) = (h);                                                      \
    QUEUE_NEXT_PREV(q) = (q);                                                 \
    QUEUE_NEXT(h) = (q);                                                      \
  }                                                                           \
  while (0)

/* Links node q right before head h, i.e. at the tail. */
#define QUEUE_INSERT_TAIL(h, q)                                               \
  do {                                                                        \
    QUEUE_NEXT(q) = (h);                                                      \
    QUEUE_PREV(q) = QUEUE_PREV(h);                                            \
    QUEUE_PREV_NEXT(q) = (q);                                                 \
    QUEUE_PREV(h) = (q);                                                      \
  }                                                                           \
  while (0)

/* Unlinks q from its neighbours; q's own links are left untouched. */
#define QUEUE_REMOVE(q)                                                       \
  do {                                                                        \
    QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q);                                       \
    QUEUE_NEXT_PREV(q) = QUEUE_PREV(q);                                       \
  }                                                                           \
  while (0)


304 305 306 307 308 309
// Kinds of uv task; presumably the values stored in SClientUvTaskNode.type - TODO confirm.
enum {
  UV_TASK_CONNECT = 0,
  UV_TASK_REQ_RSP = 1,
  UV_TASK_DISCONNECT = 2
};

310
// Global sequence counter, starts at 0; presumably feeds request seqNum values - usage is outside this view.
int64_t gUdfTaskSeqNum = 0;
311 312 313
// Entry of SUdfcProxy.udfStubs: a UDF name together with its handle and reference bookkeeping.
typedef struct SUdfcFuncStub {
  char udfName[TSDB_FUNC_NAME_LEN];
  UdfcFuncHandle handle;
  int32_t refCount;     // NOTE(review): presumably number of current users - confirm at call sites
  int64_t lastRefTime;  // NOTE(review): presumably time of the last reference; unit not visible here
} SUdfcFuncStub;

318
typedef struct SUdfcProxy {
319
  char udfdPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
320
  uv_barrier_t initBarrier;
321

322 323 324
  uv_loop_t   uvLoop;
  uv_thread_t loopThread;
  uv_async_t  loopTaskAync;
325

326
  uv_async_t loopStopAsync;
327

328 329 330 331
  uv_mutex_t taskQueueMutex;
  int8_t     udfcState;
  QUEUE      taskQueue;
  QUEUE      uvProcTaskQueue;
332

333 334 335
  uv_mutex_t udfStubsMutex;
  SArray*    udfStubs; // SUdfcFuncStub

336
  int8_t initialized;
337
} SUdfcProxy;
338

339
// Global udfc proxy instance, zero-initialized.
SUdfcProxy gUdfdProxy = {0};
340

S
slzhou 已提交
341
typedef struct SUdfcUvSession {
342
  SUdfcProxy *udfc;
343
  int64_t severHandle;
S
slzhou 已提交
344
  uv_pipe_t *udfUvPipe;
S
shenglian zhou 已提交
345 346 347 348

  int8_t  outputType;
  int32_t outputLen;
  int32_t bufSize;
S
slzhou 已提交
349 350 351

  char udfName[TSDB_FUNC_NAME_LEN];
} SUdfcUvSession;
352 353

typedef struct SClientUvTaskNode {
354
  SUdfcProxy *udfc;
355 356 357 358 359 360 361 362 363 364 365
  int8_t type;
  int errCode;

  uv_pipe_t *pipe;

  int64_t seqNum;
  uv_buf_t reqBuf;

  uv_sem_t taskSem;
  uv_buf_t rspBuf;

S
shenglian zhou 已提交
366 367 368
  QUEUE recvTaskQueue;
  QUEUE procTaskQueue;
  QUEUE connTaskQueue;
369 370 371 372 373
} SClientUvTaskNode;

typedef struct SClientUdfTask {
  int8_t type;

S
slzhou 已提交
374
  SUdfcUvSession *session;
375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403

  int32_t errCode;

  union {
    struct {
      SUdfSetupRequest req;
      SUdfSetupResponse rsp;
    } _setup;
    struct {
      SUdfCallRequest req;
      SUdfCallResponse rsp;
    } _call;
    struct {
      SUdfTeardownRequest req;
      SUdfTeardownResponse rsp;
    } _teardown;
  };

} SClientUdfTask;

// Read buffer of a client connection; buf is released in onUdfcPipeClose.
// NOTE(review): len/cap/total are presumably bytes received, allocated capacity
// and expected message size - confirm in the read callbacks.
typedef struct SClientConnBuf {
  char *buf;
  int32_t len;
  int32_t cap;
  int32_t total;
} SClientConnBuf;

typedef struct SClientUvConn {
  uv_pipe_t *pipe;
S
shenglian zhou 已提交
404
  QUEUE taskQueue;
405
  SClientConnBuf readBuf;
S
slzhou 已提交
406
  SUdfcUvSession *session;
407 408
} SClientUvConn;

409 410
// Lifecycle states of the udfc proxy.
enum {
  UDFC_STATE_INITAL = 0, // initial state
  UDFC_STATE_STARTNG, // starting after udfcOpen
  UDFC_STATE_READY, // started and begin to receive requests
  UDFC_STATE_STOPPING, // stopping after udfcClose
};
415

416 417
int32_t getUdfdPipeName(char* pipeName, int32_t size) {
  char    dnodeId[8] = {0};
wafwerar's avatar
wafwerar 已提交
418
  size_t  dnodeIdSize = sizeof(dnodeId);
419 420
  int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize);
  if (err != 0) {
421
    fnError("get dnode id from env. error: %s.", uv_err_name(err));
422 423
    dnodeId[0] = '1';
  }
424
#ifdef _WIN32
425
  snprintf(pipeName, size, "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
426 427 428 429
#else
  snprintf(pipeName, size, "%s/%s%s", tsDataDir, UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
#endif
  fnInfo("get dnode id from env. dnode id: %s. pipe path: %s", dnodeId, pipeName);
430 431 432
  return 0;
}

433 434 435
/*
 * Encodes a setup request: the UDF name as a fixed TSDB_FUNC_NAME_LEN-byte binary.
 * buf is the encode cursor handed to taosEncodeBinary. Returns the encoded length.
 */
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) {
  int32_t len = 0;
  len += taosEncodeBinary(buf, setup->udfName, TSDB_FUNC_NAME_LEN);
  return len;
}
438

439 440 441
/*
 * Decodes a setup request: reads TSDB_FUNC_NAME_LEN bytes into request->udfName.
 * Returns the cursor produced by taosDecodeBinaryTo.
 */
void* decodeUdfSetupRequest(const void* buf, SUdfSetupRequest *request) {
  buf = taosDecodeBinaryTo(buf, request->udfName, TSDB_FUNC_NAME_LEN);
  return (void*)buf;
}
443

444 445
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf* state) {
  int32_t len = 0;
446
  len += taosEncodeFixedI8(buf, state->numOfResult);
447 448
  len += taosEncodeFixedI32(buf, state->bufLen);
  len += taosEncodeBinary(buf, state->buf, state->bufLen);
S
shenglian zhou 已提交
449 450
  return len;
}
451

452
/*
 * Decodes an intermediate buffer written by encodeUdfInterBuf: numOfResult,
 * bufLen, then bufLen bytes of payload into state->buf.
 * NOTE(review): taosDecodeBinary presumably allocates state->buf, so the
 * caller would own it (see freeUdfInterBuf) - confirm. bufLen comes from the
 * wire and is not validated here.
 * Returns the cursor after the decoded data.
 */
void* decodeUdfInterBuf(const void* buf, SUdfInterBuf* state) {
  buf = taosDecodeFixedI8(buf, &state->numOfResult);
  buf = taosDecodeFixedI32(buf, &state->bufLen);
  buf = taosDecodeBinary(buf, (void**)&state->buf, state->bufLen);
  return (void*)buf;
}

459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474
/*
 * Encodes a call request: udfHandle (i64), callType (i8), then a payload that
 * depends on callType:
 *   SCALA_PROC - data block
 *   AGG_INIT   - initFirst (i8)
 *   AGG_PROC   - data block + interBuf
 *   AGG_MERGE  - interBuf + interBuf2
 *   AGG_FIN    - interBuf
 * Unknown call types add no payload. Returns the encoded length.
 */
int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call) {
  int32_t len = 0;
  len += taosEncodeFixedI64(buf, call->udfHandle);
  len += taosEncodeFixedI8(buf, call->callType);
  switch (call->callType) {
    case TSDB_UDF_CALL_SCALA_PROC:
      len += tEncodeDataBlock(buf, &call->block);
      break;
    case TSDB_UDF_CALL_AGG_INIT:
      len += taosEncodeFixedI8(buf, call->initFirst);
      break;
    case TSDB_UDF_CALL_AGG_PROC:
      len += tEncodeDataBlock(buf, &call->block);
      len += encodeUdfInterBuf(buf, &call->interBuf);
      break;
    case TSDB_UDF_CALL_AGG_MERGE:
      len += encodeUdfInterBuf(buf, &call->interBuf);
      len += encodeUdfInterBuf(buf, &call->interBuf2);
      break;
    case TSDB_UDF_CALL_AGG_FIN:
      len += encodeUdfInterBuf(buf, &call->interBuf);
      break;
    default:
      break;
  }
  return len;
}

479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499
/*
 * Decodes a call request written by encodeUdfCallRequest: udfHandle, callType,
 * then the callType-specific payload. Unknown call types consume no payload.
 * Returns the cursor after the decoded data.
 */
void* decodeUdfCallRequest(const void* buf, SUdfCallRequest* call) {
  buf = taosDecodeFixedI64(buf, &call->udfHandle);
  buf = taosDecodeFixedI8(buf, &call->callType);
  switch (call->callType) {
    case TSDB_UDF_CALL_SCALA_PROC:
      buf = tDecodeDataBlock(buf, &call->block);
      break;
    case TSDB_UDF_CALL_AGG_INIT:
      buf = taosDecodeFixedI8(buf, &call->initFirst);
      break;
    case TSDB_UDF_CALL_AGG_PROC:
      buf = tDecodeDataBlock(buf, &call->block);
      buf = decodeUdfInterBuf(buf, &call->interBuf);
      break;
    case TSDB_UDF_CALL_AGG_MERGE:
      buf = decodeUdfInterBuf(buf, &call->interBuf);
      buf = decodeUdfInterBuf(buf, &call->interBuf2);
      break;
    case TSDB_UDF_CALL_AGG_FIN:
      buf = decodeUdfInterBuf(buf, &call->interBuf);
      break;
    default:
      break;
  }
  return (void*)buf;
}

504 505 506 507
/*
 * Encodes a teardown request: only the udfHandle (i64). Returns the encoded length.
 */
int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown) {
  int32_t len = 0;
  len += taosEncodeFixedI64(buf, teardown->udfHandle);
  return len;
}

510 511 512
/*
 * Decodes a teardown request: reads the udfHandle (i64). Returns the cursor after it.
 */
void* decodeUdfTeardownRequest(const void* buf, SUdfTeardownRequest *teardown) {
  buf = taosDecodeFixedI64(buf, &teardown->udfHandle);
  return (void*)buf;
}

515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532
/*
 * Encodes a complete request: raw msgLen (int32, host byte order), seqNum
 * (i64), type (i8), then the setup/call/teardown body selected by type.
 *
 * buf - encode cursor; NULL requests a size-only pass for the msgLen header.
 *       The taosEncode* helpers receive the same (possibly NULL) buf -
 *       presumably they only count in that case.
 * Returns the encoded length. The msgLen header is counted only in the
 * buf == NULL pass; with a real buffer it is written but not added to the
 * return value.
 */
int32_t encodeUdfRequest(void** buf, const SUdfRequest* request) {
  int32_t len = 0;
  if (buf == NULL) {
    len += sizeof(request->msgLen);
  } else {
    // memcpy instead of a cast store: no alignment or aliasing assumptions on *buf
    memcpy(*buf, &request->msgLen, sizeof(request->msgLen));
    *buf = POINTER_SHIFT(*buf, sizeof(request->msgLen));
  }
  len += taosEncodeFixedI64(buf, request->seqNum);
  len += taosEncodeFixedI8(buf, request->type);
  if (request->type == UDF_TASK_SETUP) {
    len += encodeUdfSetupRequest(buf, &request->setup);
  } else if (request->type == UDF_TASK_CALL) {
    len += encodeUdfCallRequest(buf, &request->call);
  } else if (request->type == UDF_TASK_TEARDOWN) {
    len += encodeUdfTeardownRequest(buf, &request->teardown);
  }
  return len;
}

535 536
/*
 * Decodes a complete request written by encodeUdfRequest: raw msgLen, seqNum,
 * type, then the body selected by type. Unknown types consume no body.
 * Returns the cursor after the decoded data.
 */
void* decodeUdfRequest(const void* buf, SUdfRequest* request) {
  // memcpy instead of a cast load: no alignment or aliasing assumptions on buf
  memcpy(&request->msgLen, buf, sizeof(request->msgLen));
  buf = POINTER_SHIFT(buf, sizeof(request->msgLen));

  buf = taosDecodeFixedI64(buf, &request->seqNum);
  buf = taosDecodeFixedI8(buf, &request->type);

  if (request->type == UDF_TASK_SETUP) {
    buf = decodeUdfSetupRequest(buf, &request->setup);
  } else if (request->type == UDF_TASK_CALL) {
    buf = decodeUdfCallRequest(buf, &request->call);
  } else if (request->type == UDF_TASK_TEARDOWN) {
    buf = decodeUdfTeardownRequest(buf, &request->teardown);
  }
  return (void*)buf;
}
551

552 553 554
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) {
  int32_t len = 0;
  len += taosEncodeFixedI64(buf, setupRsp->udfHandle);
S
shenglian zhou 已提交
555 556 557
  len += taosEncodeFixedI8(buf, setupRsp->outputType);
  len += taosEncodeFixedI32(buf, setupRsp->outputLen);
  len += taosEncodeFixedI32(buf, setupRsp->bufSize);
558 559
  return len;
}
560

561 562
/*
 * Decodes a setup response in the field order used by encodeUdfSetupResponse.
 * Returns the cursor after the decoded data.
 */
void* decodeUdfSetupResponse(const void* buf, SUdfSetupResponse* setupRsp) {
  buf = taosDecodeFixedI64(buf, &setupRsp->udfHandle);
  buf = taosDecodeFixedI8(buf, &setupRsp->outputType);
  buf = taosDecodeFixedI32(buf, &setupRsp->outputLen);
  buf = taosDecodeFixedI32(buf, &setupRsp->bufSize);
  return (void*)buf;
}
568

569 570 571 572 573 574
int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) {
  int32_t len = 0;
  len += taosEncodeFixedI8(buf, callRsp->callType);
  switch (callRsp->callType) {
    case TSDB_UDF_CALL_SCALA_PROC:
      len += tEncodeDataBlock(buf, &callRsp->resultData);
575
      break;
576
    case TSDB_UDF_CALL_AGG_INIT:
S
slzhou 已提交
577
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
578 579
      break;
    case TSDB_UDF_CALL_AGG_PROC:
S
slzhou 已提交
580
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
581 582
      break;
    case TSDB_UDF_CALL_AGG_MERGE:
S
slzhou 已提交
583
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
584 585
      break;
    case TSDB_UDF_CALL_AGG_FIN:
S
slzhou 已提交
586
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
587 588
      break;
  }
589
  return len;
S
shenglian zhou 已提交
590 591
}

592 593 594 595 596 597 598
/*
 * Decodes a call response written by encodeUdfCallResponse: callType, then a
 * data block (scalar) or an inter buf (aggregate stages). Unknown call types
 * consume no payload. Returns the cursor after the decoded data.
 */
void* decodeUdfCallResponse(const void* buf, SUdfCallResponse* callRsp) {
  buf = taosDecodeFixedI8(buf, &callRsp->callType);
  switch (callRsp->callType) {
    case TSDB_UDF_CALL_SCALA_PROC:
      buf = tDecodeDataBlock(buf, &callRsp->resultData);
      break;
    // all aggregate stages answer with an intermediate buffer
    case TSDB_UDF_CALL_AGG_INIT:
    case TSDB_UDF_CALL_AGG_PROC:
    case TSDB_UDF_CALL_AGG_MERGE:
    case TSDB_UDF_CALL_AGG_FIN:
      buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
      break;
    default:
      break;
  }
  return (void*)buf;
}

614 615
/*
 * A teardown response has no body: nothing is written and 0 is returned.
 */
int32_t encodeUdfTeardownResponse(void** buf, const SUdfTeardownResponse* teardownRsp) {
  return 0;
}

618 619
/*
 * A teardown response has no body: the cursor is returned unchanged.
 */
void* decodeUdfTeardownResponse(const void* buf, SUdfTeardownResponse* teardownResponse) {
  return (void*)buf;
}

622 623 624 625 626 627 628
int32_t encodeUdfResponse(void** buf, const SUdfResponse* rsp) {
  int32_t len = 0;
  if (buf == NULL) {
    len += sizeof(rsp->msgLen);
  } else {
    *(int32_t*)(*buf) = rsp->msgLen;
    *buf = POINTER_SHIFT(*buf, sizeof(rsp->msgLen));
S
shenglian zhou 已提交
629 630
  }

S
slzhou 已提交
631 632 633 634 635 636 637
  if (buf == NULL) {
    len += sizeof(rsp->seqNum);
  } else {
    *(int64_t*)(*buf) = rsp->seqNum;
    *buf = POINTER_SHIFT(*buf, sizeof(rsp->seqNum));
  }

638 639 640
  len += taosEncodeFixedI64(buf, rsp->seqNum);
  len += taosEncodeFixedI8(buf, rsp->type);
  len += taosEncodeFixedI32(buf, rsp->code);
S
shenglian zhou 已提交
641

642 643 644 645 646 647 648 649 650 651 652 653 654 655 656
  switch (rsp->type) {
    case UDF_TASK_SETUP:
      len += encodeUdfSetupResponse(buf, &rsp->setupRsp);
      break;
    case UDF_TASK_CALL:
      len += encodeUdfCallResponse(buf, &rsp->callRsp);
      break;
    case UDF_TASK_TEARDOWN:
      len += encodeUdfTeardownResponse(buf, &rsp->teardownRsp);
      break;
    default:
      //TODO: log error
      break;
  }
  return len;
S
shenglian zhou 已提交
657 658
}

659 660
/*
 * Decodes a complete response written by encodeUdfResponse: raw msgLen, raw
 * seqNum, then seqNum again, type, code and the body selected by type.
 * The second seqNum read overwrites the first; both are part of the layout.
 * Returns the cursor after the decoded data.
 */
void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) {
  memcpy(&rsp->msgLen, buf, sizeof(rsp->msgLen));
  buf = POINTER_SHIFT(buf, sizeof(rsp->msgLen));
  // offset 4 is not 8-byte aligned; memcpy avoids a misaligned int64_t load
  memcpy(&rsp->seqNum, buf, sizeof(rsp->seqNum));
  buf = POINTER_SHIFT(buf, sizeof(rsp->seqNum));
  buf = taosDecodeFixedI64(buf, &rsp->seqNum);
  buf = taosDecodeFixedI8(buf, &rsp->type);
  buf = taosDecodeFixedI32(buf, &rsp->code);

  switch (rsp->type) {
    case UDF_TASK_SETUP:
      buf = decodeUdfSetupResponse(buf, &rsp->setupRsp);
      break;
    case UDF_TASK_CALL:
      buf = decodeUdfCallResponse(buf, &rsp->callRsp);
      break;
    case UDF_TASK_TEARDOWN:
      buf = decodeUdfTeardownResponse(buf, &rsp->teardownRsp);
      break;
    default:
      //TODO: log error
      break;
  }
  return (void*)buf;
}
684

S
shenglian zhou 已提交
685 686
/*
 * Releases the buffers of one column's data and resets the pointers to NULL.
 * meta->type decides which pair is freed: offsets + payload for var-length
 * types, null bitmap + data for fixed-length types.
 */
void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta) {
  if (IS_VAR_DATA_TYPE(meta->type)) {
    taosMemoryFree(data->varLenCol.varOffsets);
    data->varLenCol.varOffsets = NULL;
    taosMemoryFree(data->varLenCol.payload);
    data->varLenCol.payload = NULL;
  } else {
    taosMemoryFree(data->fixLenCol.nullBitmap);
    data->fixLenCol.nullBitmap = NULL;
    taosMemoryFree(data->fixLenCol.data);
    data->fixLenCol.data = NULL;
  }
}

/*
 * Releases the data buffers owned by a column. The SUdfColumn object itself is not freed.
 */
void freeUdfColumn(SUdfColumn* col) {
  freeUdfColumnData(&col->colData, &col->colMeta);
}

/*
 * Releases every column of a UDF data block (buffers and column objects) and
 * the column pointer array, resetting the freed pointers to NULL.
 * A missing array or missing column entries are tolerated, so a partially
 * built block can be released as well.
 */
void freeUdfDataDataBlock(SUdfDataBlock *block) {
  if (block->udfCols != NULL) {
    for (int32_t i = 0; i < block->numOfCols; ++i) {
      if (block->udfCols[i] == NULL) {
        continue;
      }
      freeUdfColumn(block->udfCols[i]);
      taosMemoryFree(block->udfCols[i]);
      block->udfCols[i] = NULL;
    }
  }
  taosMemoryFree(block->udfCols);
  block->udfCols = NULL;
}

/*
 * Releases the payload of an intermediate buffer and leaves buf->buf NULL.
 */
void freeUdfInterBuf(SUdfInterBuf *buf) {
  void *payload = buf->buf;
  buf->buf = NULL;
  taosMemoryFree(payload);
}

S
slzhou 已提交
718 719 720 721

/* Copies len bytes of src into a new allocation; sets *failed when a non-empty copy cannot be allocated. */
static void *udfDupBytesForUdfBlock(const void *src, int32_t len, bool *failed) {
  void *dst = taosMemoryMalloc(len);
  if (dst == NULL) {
    if (len != 0) {
      *failed = true;
    }
    return NULL;
  }
  if (len > 0) {
    memcpy(dst, src, len);
  }
  return dst;
}

/*
 * Deep-copies an SSDataBlock into a SUdfDataBlock: one SUdfColumn per source
 * column with its metadata and either offsets + payload (var-length types) or
 * null bitmap + data (fixed-length types).
 *
 * block    - source block.
 * udfBlock - destination; on success it owns all copies (release with
 *            freeUdfDataDataBlock).
 * Returns 0 on success, -1 when an allocation fails; in that case everything
 * allocated so far is released and udfBlock is left with no columns.
 */
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock) {
  udfBlock->numOfRows = block->info.rows;
  udfBlock->numOfCols = block->info.numOfCols;
  udfBlock->udfCols = taosMemoryCalloc(udfBlock->numOfCols, sizeof(SUdfColumn*));
  if (udfBlock->udfCols == NULL && udfBlock->numOfCols > 0) {
    udfBlock->numOfCols = 0;
    return -1;
  }

  bool    failed = false;
  int32_t numCreated = 0;  // columns whose SUdfColumn object exists
  for (int32_t i = 0; i < udfBlock->numOfCols && !failed; ++i) {
    SUdfColumn *udfCol = taosMemoryCalloc(1, sizeof(SUdfColumn));
    if (udfCol == NULL) {
      failed = true;
      break;
    }
    udfBlock->udfCols[i] = udfCol;
    numCreated = i + 1;

    SColumnInfoData *col= (SColumnInfoData*)taosArrayGet(block->pDataBlock, i);
    udfCol->colMeta.type = col->info.type;
    udfCol->colMeta.bytes = col->info.bytes;
    udfCol->colMeta.scale = col->info.scale;
    udfCol->colMeta.precision = col->info.precision;
    udfCol->colData.numOfRows = udfBlock->numOfRows;
    udfCol->hasNull = col->hasNull;
    if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) {
      udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows;
      udfCol->colData.varLenCol.varOffsets =
          udfDupBytesForUdfBlock(col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen, &failed);
      udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, udfBlock->numOfRows);
      udfCol->colData.varLenCol.payload =
          udfDupBytesForUdfBlock(col->pData, udfCol->colData.varLenCol.payloadLen, &failed);
    } else {
      udfCol->colData.fixLenCol.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows);
      udfCol->colData.fixLenCol.nullBitmap =
          udfDupBytesForUdfBlock(col->nullbitmap, udfCol->colData.fixLenCol.nullBitmapLen, &failed);
      udfCol->colData.fixLenCol.dataLen = colDataGetLength(col, udfBlock->numOfRows);
      udfCol->colData.fixLenCol.data =
          udfDupBytesForUdfBlock(col->pData, udfCol->colData.fixLenCol.dataLen, &failed);
    }
  }

  if (failed) {
    // roll back: columns were calloc'ed, so buffers that were never set are NULL
    for (int32_t j = 0; j < numCreated; ++j) {
      freeUdfColumn(udfBlock->udfCols[j]);
      taosMemoryFree(udfBlock->udfCols[j]);
    }
    taosMemoryFree(udfBlock->udfCols);
    udfBlock->udfCols = NULL;
    udfBlock->numOfCols = 0;
    return -1;
  }
  return 0;
}

/* Copies len bytes of src into a new allocation; sets *failed when a non-empty copy cannot be allocated. */
static void *udfDupBytesForDataBlock(const void *src, int32_t len, bool *failed) {
  void *dst = taosMemoryMalloc(len);
  if (dst == NULL) {
    if (len != 0) {
      *failed = true;
    }
    return NULL;
  }
  if (len > 0) {
    memcpy(dst, src, len);
  }
  return dst;
}

/*
 * Deep-copies a single SUdfColumn into a one-column SSDataBlock.
 *
 * udfCol - source column.
 * block  - destination; info.numOfCols, info.rows, info.hasVarCol and
 *          pDataBlock are overwritten.
 * Returns 0 on success, -1 when an allocation fails; in that case the column
 * array is destroyed and block->pDataBlock is NULL.
 */
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
  block->info.numOfCols = 1;
  block->info.rows = udfCol->colData.numOfRows;
  block->info.hasVarCol = IS_VAR_DATA_TYPE(udfCol->colMeta.type);

  block->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
  if (block->pDataBlock == NULL) {
    return -1;
  }
  taosArraySetSize(block->pDataBlock, 1);
  SColumnInfoData *col = taosArrayGet(block->pDataBlock, 0);
  SUdfColumnMeta *meta = &udfCol->colMeta;
  col->info.precision = meta->precision;
  col->info.bytes = meta->bytes;
  col->info.scale = meta->scale;
  col->info.type = meta->type;
  col->hasNull = udfCol->hasNull;
  SUdfColumnData *data = &udfCol->colData;

  bool failed = false;
  if (!IS_VAR_DATA_TYPE(meta->type)) {
    col->nullbitmap = udfDupBytesForDataBlock(data->fixLenCol.nullBitmap, data->fixLenCol.nullBitmapLen, &failed);
    col->pData = udfDupBytesForDataBlock(data->fixLenCol.data, data->fixLenCol.dataLen, &failed);
    if (failed) {
      taosMemoryFree(col->nullbitmap);
      taosMemoryFree(col->pData);
    }
  } else {
    col->varmeta.offset = udfDupBytesForDataBlock(data->varLenCol.varOffsets, data->varLenCol.varOffsetsLen, &failed);
    col->pData = udfDupBytesForDataBlock(data->varLenCol.payload, data->varLenCol.payloadLen, &failed);
    if (failed) {
      taosMemoryFree(col->varmeta.offset);
      taosMemoryFree(col->pData);
    }
  }
  if (failed) {
    taosArrayDestroy(block->pDataBlock);
    block->pDataBlock = NULL;
    return -1;
  }
  return 0;
}

S
slzhou 已提交
786 787 788 789 790 791 792 793 794 795 796 797 798 799
/*
 * Builds a data block from an array of scalar parameters.
 *
 * input     - array of numOfCols parameters; the row count is taken from the
 *             first one.
 * numOfCols - number of parameters / columns.
 * output    - destination; its column array receives a copy of each
 *             parameter's SColumnInfoData struct as made by taosArrayPush, so
 *             the column buffers presumably stay shared with the input.
 * Returns 0 on success, -1 when the column array cannot be allocated.
 */
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output) {
  output->info.rows = input->numOfRows;
  output->info.numOfCols = numOfCols;
  bool hasVarCol = false;
  for (int32_t i = 0; i < numOfCols; ++i) {
    if (IS_VAR_DATA_TYPE((input+i)->columnData->info.type)) {
      hasVarCol = true;
      break;
    }
  }
  output->info.hasVarCol = hasVarCol;

  //TODO: free the array output->pDataBlock
  output->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
  if (output->pDataBlock == NULL) {
    return -1;
  }
  for (int32_t i = 0; i < numOfCols; ++i) {
    taosArrayPush(output->pDataBlock, (input + i)->columnData);
  }
  return 0;
}

/*
 * Exposes a one-column data block as a scalar parameter.
 * output->columnData points at the element inside input->pDataBlock; nothing is copied.
 * Returns 0 on success, -1 when the block does not have exactly one column.
 */
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
  if (input->info.numOfCols == 1) {
    output->numOfRows = input->info.rows;
    //TODO: memory
    output->columnData = taosArrayGet(input->pDataBlock, 0);
    return 0;
  }
  fnError("scalar function only support one column");
  return -1;
}
S
slzhou 已提交
816

817 818
// libuv close callback for a udfc pipe.
// Wakes the task at the head of the connection queue (if any) with errCode 0,
// detaches the pipe from its session and releases the connection, its read
// buffer and the pipe handle itself.
void onUdfcPipeClose(uv_handle_t *handle) {
  SClientUvConn *conn = handle->data;
  if (!QUEUE_EMPTY(&conn->taskQueue)) {
    QUEUE* h = QUEUE_HEAD(&conn->taskQueue);
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
    task->errCode = 0;
    QUEUE_REMOVE(&task->procTaskQueue);
    uv_sem_post(&task->taskSem);
  }

  // The connection is calloc'ed and its session is attached by the requesting
  // thread only after the connect task has completed, so the pipe can be closed
  // (e.g. on a read error) while session is still NULL.
  if (conn->session != NULL) {
    conn->session->udfUvPipe = NULL;
  }
  taosMemoryFree(conn->readBuf.buf);
  taosMemoryFree(conn);
  taosMemoryFree((uv_pipe_t *) handle);
}

S
slzhou 已提交
832 833
// Transfer the outcome of a finished uv task into the client task.
// For connect/disconnect tasks only the error code is copied. For request/response
// tasks the response buffer (when present) is decoded, its code and payload are
// stored in `task`, and the buffer is freed; without a buffer the uv error code is used.
// Always returns 0.
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
  fnDebug("udfc get uv task result. task: %p, uvTask: %p", task, uvTask);

  switch (uvTask->type) {
    case UV_TASK_CONNECT:
    case UV_TASK_DISCONNECT:
      task->errCode = uvTask->errCode;
      return 0;
    case UV_TASK_REQ_RSP:
      break;
    default:
      return 0;
  }

  if (uvTask->rspBuf.base == NULL) {
    // no response was received; report whatever the event loop recorded
    task->errCode = uvTask->errCode;
    return 0;
  }

  SUdfResponse rsp;
  void* buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp);
  assert(uvTask->rspBuf.len == POINTER_DISTANCE(buf, uvTask->rspBuf.base));
  task->errCode = rsp.code;

  //TODO: copy or not
  if (task->type == UDF_TASK_SETUP) {
    task->_setup.rsp = rsp.setupRsp;
  } else if (task->type == UDF_TASK_CALL) {
    task->_call.rsp = rsp.callRsp;
  } else if (task->type == UDF_TASK_TEARDOWN) {
    task->_teardown.rsp = rsp.teardownRsp;
  }

  // TODO: the call buffer is setup and freed by udf invocation
  taosMemoryFree(uvTask->rspBuf.base);
  return 0;
}

// libuv allocation callback for the udfc pipe.
// The first call for a message allocates room for the header only
// (int32 length + int64 sequence number). Later calls grow the buffer to the
// announced total length and hand libuv the unfilled tail.
// On allocation failure libuv receives an empty buffer (base NULL, len 0).
void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
  SClientUvConn  *conn = handle->data;
  SClientConnBuf *rb = &conn->readBuf;
  int32_t         headerLen = sizeof(int32_t) + sizeof(int64_t);

  if (rb->cap != 0) {
    // a message is in progress: make sure capacity covers the announced total
    if (rb->total > rb->cap) {
      rb->cap = rb->total;
    }
    void *grown = taosMemoryRealloc(rb->buf, rb->cap);
    if (grown == NULL) {
      fnError("udfc re-allocate buffer failure. size: %d", rb->cap);
      buf->base = NULL;
      buf->len = 0;
    } else {
      rb->buf = grown;
      buf->base = rb->buf + rb->len;
      buf->len = rb->cap - rb->len;
    }
  } else {
    // fresh message: start with just enough room for the header
    rb->buf = taosMemoryMalloc(headerLen);
    if (rb->buf == NULL) {
      fnError("udfc allocate buffer failure. size: %d", headerLen);
      buf->base = NULL;
      buf->len = 0;
    } else {
      rb->len = 0;
      rb->cap = headerLen;
      rb->total = -1;

      buf->base = rb->buf;
      buf->len = rb->cap;
    }
  }

  fnTrace("conn buf cap - len - total : %d - %d - %d", rb->cap, rb->len, rb->total);
}

912 913 914 915 916
// Report whether the connection buffer holds one complete message.
// Side effect: once at least sizeof(int32_t) bytes are buffered and the total is
// still unknown (-1), the total length is read from the start of the buffer.
bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
  bool totalUnknown = (connBuf->total == -1);
  if (totalUnknown && connBuf->len >= sizeof(int32_t)) {
    connBuf->total = *(int32_t *) (connBuf->buf);
  }

  if (connBuf->len != connBuf->cap || connBuf->total != connBuf->cap) {
    return false;
  }

  fnTrace("udfc complete message is received, now handle it");
  return true;
}

// Dispatch a complete response held in the connection read buffer.
// The sequence number stored after the int32 length prefix selects the waiting
// task. The first matching task takes ownership of the buffer and is woken;
// further matches are only logged. If nothing matches, the message is discarded.
// In every case the read buffer is reset so the next message starts clean.
void udfcUvHandleRsp(SClientUvConn *conn) {
  SClientConnBuf *connBuf = &conn->readBuf;

  // msglen then seqnum; copied out because offset 4 is not suitably aligned for int64_t
  int64_t seqNum = 0;
  memcpy(&seqNum, connBuf->buf + sizeof(int32_t), sizeof(seqNum));

  SClientUvTaskNode *taskFound = NULL;
  if (QUEUE_EMPTY(&conn->taskQueue)) {
    fnError("udfc no task waiting for response on connection");
  } else {
    // The cursor is advanced by the for-statement itself, so a duplicate match
    // can no longer spin forever on the same node.
    for (QUEUE *h = QUEUE_NEXT(&conn->taskQueue); h != &conn->taskQueue; h = QUEUE_NEXT(h)) {
      SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
      if (task->seqNum != seqNum) {
        continue;
      }
      if (taskFound == NULL) {
        taskFound = task;
      } else {
        fnError("udfc more than one task waiting for the same response");
      }
    }
    if (taskFound == NULL) {
      fnError("no task is waiting for the response.");
    }
  }

  if (taskFound != NULL) {
    // ownership of the buffer moves to the task; it is freed when the result is decoded
    taskFound->rspBuf = uv_buf_init(connBuf->buf, connBuf->len);
    QUEUE_REMOVE(&taskFound->connTaskQueue);
    QUEUE_REMOVE(&taskFound->procTaskQueue);
    uv_sem_post(&taskFound->taskSem);
  } else {
    // nobody will consume this message: release it instead of leaking it
    taosMemoryFree(connBuf->buf);
  }

  connBuf->buf = NULL;
  connBuf->total = -1;
  connBuf->len = 0;
  connBuf->cap = 0;
}
963

964
// Fail every task still waiting on this connection with
// TSDB_CODE_UDF_PIPE_READ_ERR, wake its waiter, then close the pipe.
void udfcUvHandleError(SClientUvConn *conn) {
  QUEUE *pending = &conn->taskQueue;

  for (;;) {
    if (QUEUE_EMPTY(pending)) {
      break;
    }
    QUEUE *node = QUEUE_HEAD(pending);
    SClientUvTaskNode *failed = QUEUE_DATA(node, SClientUvTaskNode, connTaskQueue);
    failed->errCode = TSDB_CODE_UDF_PIPE_READ_ERR;
    // unlink from both queues before signalling the waiter
    QUEUE_REMOVE(&failed->connTaskQueue);
    QUEUE_REMOVE(&failed->procTaskQueue);
    uv_sem_post(&failed->taskSem);
  }

  uv_close((uv_handle_t *) conn->pipe, onUdfcPipeClose);
}

977
// libuv read callback for the udfc pipe.
//   nread == 0 : nothing to do
//   nread  > 0 : account for the new bytes and dispatch the message once complete
//   nread  < 0 : log the error (EOF is logged additionally) and fail the connection
void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
  fnTrace("udfc client %p, client read from pipe. nread: %zd", client, nread);
  if (nread == 0) {
    return;
  }

  SClientUvConn *conn = client->data;
  if (nread < 0) {
    fnError("udfc client pipe %p read error: %zd, %s.", client, nread, uv_strerror(nread));
    if (nread == UV_EOF) {
      fnError("\tudfc client pipe %p closed", client);
    }
    udfcUvHandleError(conn);
    return;
  }

  SClientConnBuf *readBuf = &conn->readBuf;
  readBuf->len += nread;
  if (isUdfcUvMsgComplete(readBuf)) {
    udfcUvHandleRsp(conn);
  }
}

1000
// libuv write-completion callback for a request sent to udfd.
// On success the task is parked on the connection queue to wait for its response.
// On failure the task is parked as well, so that udfcUvHandleError() fails and
// wakes it together with every other pending task. Previously the failing task
// was never signalled and its waiter stayed blocked.
void onUdfcPipetWrite(uv_write_t *write, int status) {
  SClientUvTaskNode *uvTask = write->data;
  uv_pipe_t *pipe = uvTask->pipe;
  fnTrace("udfc client %p write length:%zu", pipe, uvTask->reqBuf.len);
  SClientUvConn *conn = pipe->data;

  // Release the request resources first: once the task is signalled the waiting
  // thread may free uvTask, so it must not be touched afterwards.
  taosMemoryFree(uvTask->reqBuf.base);
  uvTask->reqBuf.base = NULL;
  taosMemoryFree(write);

  QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
  if (status != 0) {
    fnError("udfc client %p write error.", pipe);
    udfcUvHandleError(conn);
  }
}
H
Haojun Liao 已提交
1014

1015
// libuv connect callback for the udfc pipe.
// Records the connect status in the task, starts reading on the pipe, releases
// the connect request and finally wakes the thread waiting on the task.
void onUdfcPipeConnect(uv_connect_t *connect, int status) {
  SClientUvTaskNode *connTask = connect->data;

  if (status != 0) {
    fnError("client connect error, task seq: %"PRId64", code: %s", connTask->seqNum, uv_strerror(status));
  }
  connTask->errCode = status;

  uv_stream_t *stream = (uv_stream_t *)connTask->pipe;
  uv_read_start(stream, udfcAllocateBuffer, onUdfcPipeRead);
  taosMemoryFree(connect);

  // signalling must come last: the waiter releases the task afterwards
  QUEUE_REMOVE(&connTask->procTaskQueue);
  uv_sem_post(&connTask->taskSem);
}
H
Haojun Liao 已提交
1027

S
slzhou 已提交
1028
// Create the uv task node that carries `task` through the event loop.
//   task       - client task (type, session and request payload)
//   uvTaskType - UV_TASK_CONNECT, UV_TASK_REQ_RSP or UV_TASK_DISCONNECT
//   pUvTask    - receives the new node on success; left untouched on failure
// For request/response tasks the request is encoded into a newly allocated buffer
// and stamped with a new sequence number.
// Returns 0 on success, -1 when memory cannot be allocated.
int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) {
  SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode));
  if (uvTask == NULL) {
    fnError("udfc failed to allocate uv task");
    return -1;
  }
  uvTask->type = uvTaskType;
  uvTask->udfc = task->session->udfc;

  if (uvTaskType == UV_TASK_CONNECT) {
    // nothing to prepare here; the pipe is created when the event loop starts the task
  } else if (uvTaskType == UV_TASK_REQ_RSP) {
    uvTask->pipe = task->session->udfUvPipe;

    // zero-initialised so that an unexpected task type never encodes stack garbage
    SUdfRequest request = {0};
    request.type = task->type;
    request.seqNum = atomic_fetch_add_64(&gUdfTaskSeqNum, 1);

    if (task->type == UDF_TASK_SETUP) {
      request.setup = task->_setup.req;
    } else if (task->type == UDF_TASK_CALL) {
      request.call = task->_call.req;
    } else if (task->type == UDF_TASK_TEARDOWN) {
      request.teardown = task->_teardown.req;
    } else {
      //TODO log and return error
    }

    // first pass computes the encoded length, second pass writes the bytes
    int32_t bufLen = encodeUdfRequest(NULL, &request);
    request.msgLen = bufLen;
    void *bufBegin = taosMemoryMalloc(bufLen);
    if (bufBegin == NULL) {
      fnError("udfc failed to allocate request buffer. size: %d", bufLen);
      taosMemoryFree(uvTask);
      return -1;
    }
    void *buf = bufBegin;
    encodeUdfRequest(&buf, &request);
    uvTask->reqBuf = uv_buf_init(bufBegin, bufLen);
    uvTask->seqNum = request.seqNum;
  } else if (uvTaskType == UV_TASK_DISCONNECT) {
    uvTask->pipe = task->session->udfUvPipe;
  }
  uv_sem_init(&uvTask->taskSem, 0);

  *pUvTask = uvTask;
  return 0;
}
H
Haojun Liao 已提交
1067

S
slzhou 已提交
1068
// Hand a uv task to the event loop and block until it has been completed.
// The task is appended to the proxy's task queue under the queue mutex, the loop
// is notified through its async handle, and the caller waits on the task
// semaphore, which is destroyed before returning. Always returns 0.
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
  fnTrace("queue uv task to event loop, task: %d, %p", uvTask->type, uvTask);

  SUdfcProxy *proxy = uvTask->udfc;
  uv_mutex_lock(&proxy->taskQueueMutex);
  QUEUE_INSERT_TAIL(&proxy->taskQueue, &uvTask->recvTaskQueue);
  uv_mutex_unlock(&proxy->taskQueueMutex);

  // wake the loop thread, then wait for it to finish the task
  uv_async_send(&proxy->loopTaskAync);
  uv_sem_wait(&uvTask->taskSem);

  fnInfo("udfc uv task finished. task: %d, %p", uvTask->type, uvTask);
  uv_sem_destroy(&uvTask->taskSem);
  return 0;
}
H
Haojun Liao 已提交
1082

S
slzhou 已提交
1083
// Start a uv task on the event loop thread.
//   UV_TASK_CONNECT    - create the pipe and its connection state, start connecting
//   UV_TASK_REQ_RSP    - write the encoded request to the pipe
//   UV_TASK_DISCONNECT - park the task on the connection and close the pipe
// Returns 0 when the operation was started (completion is reported through a
// callback), TSDB_CODE_UDF_PIPE_NO_PIPE when the task has no pipe, or the
// libuv error returned by uv_write().
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
  fnTrace("event loop start uv task. task: %d, %p", uvTask->type, uvTask);
  int32_t code = 0;

  switch (uvTask->type) {
    case UV_TASK_CONNECT: {
      uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
      uv_pipe_init(&uvTask->udfc->uvLoop, pipe, 0);
      uvTask->pipe = pipe;

      SClientUvConn *conn = taosMemoryCalloc(1, sizeof(SClientUvConn));
      conn->pipe = pipe;
      conn->readBuf.len = 0;
      conn->readBuf.cap = 0;
      conn->readBuf.buf = 0;
      conn->readBuf.total = -1;
      QUEUE_INIT(&conn->taskQueue);

      pipe->data = conn;

      uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t));
      connReq->data = uvTask;
      uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfcPipeConnect);
      code = 0;
      break;
    }
    case UV_TASK_REQ_RSP: {
      uv_pipe_t *pipe = uvTask->pipe;
      if (pipe == NULL) {
        code = TSDB_CODE_UDF_PIPE_NO_PIPE;
      } else {
        uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t));
        write->data = uvTask;
        int err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipetWrite);
        if (err != 0) {
          fnError("udfc event loop start req/rsp task uv_write failed. code: %s", uv_strerror(err));
          // The write was rejected synchronously, so the completion callback that
          // normally frees these will never run; release them here.
          taosMemoryFree(write);
          taosMemoryFree(uvTask->reqBuf.base);
          uvTask->reqBuf.base = NULL;
          uvTask->reqBuf.len = 0;
        }
        code = err;
      }
      break;
    }
    case UV_TASK_DISCONNECT: {
      uv_pipe_t *pipe = uvTask->pipe;
      if (pipe == NULL) {
        code = TSDB_CODE_UDF_PIPE_NO_PIPE;
      } else {
        SClientUvConn *conn = pipe->data;
        // the close callback wakes the task at the head of the connection queue
        QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
        uv_close((uv_handle_t *)uvTask->pipe, onUdfcPipeClose);
        code = 0;
      }
      break;
    }
    default: {
      fnError("udfc event loop unknown task type.");
      break;
    }
  }

  return code;
}
H
Haojun Liao 已提交
1144

1145
// Async callback run on the loop thread when tasks have been queued.
// Takes over the whole submission queue under the mutex, then starts each task.
// Started tasks are tracked on uvProcTaskQueue; tasks that fail to start get the
// error code and are completed immediately.
void udfcAsyncTaskCb(uv_async_t *async) {
  SUdfcProxy *proxy = async->data;
  QUEUE batch;

  uv_mutex_lock(&proxy->taskQueueMutex);
  QUEUE_MOVE(&proxy->taskQueue, &batch);
  uv_mutex_unlock(&proxy->taskQueueMutex);

  for (QUEUE *node = NULL; !QUEUE_EMPTY(&batch); ) {
    node = QUEUE_HEAD(&batch);
    QUEUE_REMOVE(node);
    SClientUvTaskNode *uvTask = QUEUE_DATA(node, SClientUvTaskNode, recvTaskQueue);

    int32_t startCode = udfcStartUvTask(uvTask);
    if (startCode != 0) {
      uvTask->errCode = startCode;
      uv_sem_post(&uvTask->taskSem);
      continue;
    }
    QUEUE_INSERT_TAIL(&proxy->uvProcTaskQueue, &uvTask->procTaskQueue);
  }
}

1168
// Complete every outstanding uv task: first those still waiting in the
// submission queue, then those already started on the loop. While the proxy is
// stopping, each task is marked with TSDB_CODE_UDF_STOPPING before its waiter
// is woken.
void cleanUpUvTasks(SUdfcProxy *udfc) {
  fnDebug("clean up uv tasks");
  QUEUE submitted;

  uv_mutex_lock(&udfc->taskQueueMutex);
  QUEUE_MOVE(&udfc->taskQueue, &submitted);
  uv_mutex_unlock(&udfc->taskQueueMutex);

  // tasks that were queued but never started
  for (QUEUE *node = NULL; !QUEUE_EMPTY(&submitted); ) {
    node = QUEUE_HEAD(&submitted);
    QUEUE_REMOVE(node);
    SClientUvTaskNode *queued = QUEUE_DATA(node, SClientUvTaskNode, recvTaskQueue);
    if (udfc->udfcState == UDFC_STATE_STOPPING) {
      queued->errCode = TSDB_CODE_UDF_STOPPING;
    }
    uv_sem_post(&queued->taskSem);
  }

  // tasks that are in flight on the event loop
  for (QUEUE *node = NULL; !QUEUE_EMPTY(&udfc->uvProcTaskQueue); ) {
    node = QUEUE_HEAD(&udfc->uvProcTaskQueue);
    QUEUE_REMOVE(node);
    SClientUvTaskNode *inFlight = QUEUE_DATA(node, SClientUvTaskNode, procTaskQueue);
    if (udfc->udfcState == UDFC_STATE_STOPPING) {
      inFlight->errCode = TSDB_CODE_UDF_STOPPING;
    }
    uv_sem_post(&inFlight->taskSem);
  }
}
1196

S
shenglian zhou 已提交
1197
// Async callback for the stop signal: completes all outstanding tasks and,
// when the proxy is in the stopping state, stops the event loop.
void udfStopAsyncCb(uv_async_t *async) {
  SUdfcProxy *proxy = async->data;

  cleanUpUvTasks(proxy);
  if (proxy->udfcState != UDFC_STATE_STOPPING) {
    return;
  }
  uv_stop(&proxy->uvLoop);
}
S
shenglian zhou 已提交
1204

S
shenglian zhou 已提交
1205
void constructUdfService(void *argsThread) {
1206 1207 1208
  SUdfcProxy *udfc = (SUdfcProxy *)argsThread;
  uv_loop_init(&udfc->uvLoop);

1209
  uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfcAsyncTaskCb);
1210 1211 1212 1213 1214 1215 1216
  udfc->loopTaskAync.data = udfc;
  uv_async_init(&udfc->uvLoop, &udfc->loopStopAsync, udfStopAsyncCb);
  udfc->loopStopAsync.data = udfc;
  uv_mutex_init(&udfc->taskQueueMutex);
  QUEUE_INIT(&udfc->taskQueue);
  QUEUE_INIT(&udfc->uvProcTaskQueue);
  uv_barrier_wait(&udfc->initBarrier);
1217
  //TODO return value of uv_run
1218 1219
  uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
  uv_loop_close(&udfc->uvLoop);
1220 1221
}

1222 1223 1224 1225 1226
int32_t udfcOpen() {
  int8_t old = atomic_val_compare_exchange_8(&gUdfdProxy.initialized, 0, 1);
  if (old == 1) {
    return 0;
  }
1227
  SUdfcProxy *proxy = &gUdfdProxy;
1228
  getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName));
1229 1230 1231 1232 1233 1234
  proxy->udfcState = UDFC_STATE_STARTNG;
  uv_barrier_init(&proxy->initBarrier, 2);
  uv_thread_create(&proxy->loopThread, constructUdfService, proxy);
  atomic_store_8(&proxy->udfcState, UDFC_STATE_READY);
  proxy->udfcState = UDFC_STATE_READY;
  uv_barrier_wait(&proxy->initBarrier);
1235 1236
  uv_mutex_init(&proxy->udfStubsMutex);
  proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
1237
  fnInfo("udfc initialized")
1238 1239 1240
  return 0;
}

1241 1242 1243 1244 1245 1246
int32_t udfcClose() {
  int8_t old = atomic_val_compare_exchange_8(&gUdfdProxy.initialized, 1, 0);
  if (old == 0) {
    return 0;
  }

1247 1248 1249 1250 1251 1252
  SUdfcProxy *udfc = &gUdfdProxy;
  udfc->udfcState = UDFC_STATE_STOPPING;
  uv_async_send(&udfc->loopStopAsync);
  uv_thread_join(&udfc->loopThread);
  uv_mutex_destroy(&udfc->taskQueueMutex);
  uv_barrier_destroy(&udfc->initBarrier);
1253 1254
  taosArrayDestroy(udfc->udfStubs);
  uv_mutex_destroy(&udfc->udfStubsMutex);
1255
  udfc->udfcState = UDFC_STATE_INITAL;
1256
  fnInfo("udfc cleaned up");
1257 1258 1259
  return 0;
}

S
slzhou 已提交
1260
// Run one uv task for `task` synchronously: create it, queue it to the event
// loop, wait for completion and copy the result back into `task`.
// After a connect task the pipe is recorded in the session and the connection
// is linked back to the session.
// Returns task->errCode (non-zero as well when the uv task could not be created).
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
  SClientUvTaskNode *uvTask = NULL;

  int32_t code = udfcCreateUvTask(task, uvTaskType, &uvTask);
  if (code != 0 || uvTask == NULL) {
    task->errCode = (code != 0) ? code : -1;
    return task->errCode;
  }

  udfcQueueUvTask(uvTask);
  udfcGetUdfTaskResultFromUvTask(task, uvTask);

  // A connect task that was cancelled before the loop started it has no pipe.
  if (uvTaskType == UV_TASK_CONNECT && uvTask->pipe != NULL) {
    task->session->udfUvPipe = uvTask->pipe;
    SClientUvConn *conn = uvTask->pipe->data;
    conn->session = task->session;
  }
  taosMemoryFree(uvTask);
  return task->errCode;
}

1276
// Connect to udfd and set up the UDF named `udfName`.
//   udfName    - NUL-terminated function name
//   funcHandle - receives the new session on success; untouched on failure
// Returns 0 on success, TSDB_CODE_UDF_INVALID_STATE when the proxy is not ready,
// TSDB_CODE_UDF_PIPE_CONNECT_ERR when the pipe cannot be connected, otherwise the
// error code of the setup request.
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
  if (gUdfdProxy.udfcState != UDFC_STATE_READY) {
    return TSDB_CODE_UDF_INVALID_STATE;
  }
  SClientUdfTask *task = taosMemoryCalloc(1,sizeof(SClientUdfTask));
  task->errCode = 0;
  task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession));
  task->session->udfc = &gUdfdProxy;
  task->type = UDF_TASK_SETUP;

  SUdfSetupRequest *req = &task->_setup.req;
  // Copy at most TSDB_FUNC_NAME_LEN bytes but stop at the terminator, so a short
  // name is not read past its end; the task is zero-filled by calloc.
  strncpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN);

  int32_t errCode = udfcRunUdfUvTask(task, UV_TASK_CONNECT);
  if (errCode != 0) {
    fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfdProxy)->udfdPipeName);
    // The task is no longer referenced by anyone. The session is intentionally
    // not freed here: the connection created for this attempt may still point
    // at it until its pipe is closed.
    taosMemoryFree(task);
    return TSDB_CODE_UDF_PIPE_CONNECT_ERR;
  }

  udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);

  SUdfSetupResponse *rsp = &task->_setup.rsp;
  task->session->severHandle = rsp->udfHandle;
  task->session->outputType = rsp->outputType;
  task->session->outputLen = rsp->outputLen;
  task->session->bufSize = rsp->bufSize;
  strcpy(task->session->udfName, udfName);
  if (task->errCode != 0) {
    fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode);
  } else {
    fnInfo("sucessfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session);
    *funcHandle = task->session;
  }
  int32_t err = task->errCode;
  taosMemoryFree(task);
  return err;
}

1314
// Send one call request to udfd and collect its result.
//   handle   - session returned by the setup step
//   callType - one of the TSDB_UDF_CALL_* values; selects which inputs are sent
//              and which output receives the result
//   input    - data block (aggregate process and scalar calls)
//   state    - intermediate buffer (aggregate process/merge/finalize)
//   state2   - second intermediate buffer (aggregate merge)
//   output   - receives the result block of a scalar call
//   newState - receives the result buffer of aggregate calls
// Returns 0 on success, TSDB_CODE_UDF_PIPE_NO_PIPE when the session has no pipe,
// otherwise the error code of the request.
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
                SSDataBlock* output, SUdfInterBuf *newState) {
  fnTrace("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
  SUdfcUvSession *session = (SUdfcUvSession *) handle;
  if (session->udfUvPipe == NULL) {
    fnError("No pipe to udfd");
    return TSDB_CODE_UDF_PIPE_NO_PIPE;
  }

  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
  task->errCode = 0;
  task->session = session;
  task->type = UDF_TASK_CALL;

  SUdfCallRequest *req = &task->_call.req;
  req->udfHandle = task->session->severHandle;
  req->callType = callType;

  // fill in only the inputs that the given call type uses
  if (callType == TSDB_UDF_CALL_AGG_INIT) {
    req->initFirst = 1;
  } else if (callType == TSDB_UDF_CALL_AGG_PROC) {
    req->block = *input;
    req->interBuf = *state;
  } else if (callType == TSDB_UDF_CALL_AGG_MERGE) {
    req->interBuf = *state;
    req->interBuf2 = *state2;
  } else if (callType == TSDB_UDF_CALL_AGG_FIN) {
    req->interBuf = *state;
  } else if (callType == TSDB_UDF_CALL_SCALA_PROC) {
    req->block = *input;
  }

  udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);

  if (task->errCode != 0) {
    fnError("call udf failure. err: %d", task->errCode);
  } else {
    SUdfCallResponse *rsp = &task->_call.rsp;
    if (callType == TSDB_UDF_CALL_SCALA_PROC) {
      *output = rsp->resultData;
    } else if (callType == TSDB_UDF_CALL_AGG_INIT || callType == TSDB_UDF_CALL_AGG_PROC ||
               callType == TSDB_UDF_CALL_AGG_MERGE || callType == TSDB_UDF_CALL_AGG_FIN) {
      // every aggregate call answers with a result buffer
      *newState = rsp->resultBuf;
    }
  }

  int result = task->errCode;
  taosMemoryFree(task);
  return result;
}

1390
// Aggregate init call.
// output: interBuf receives the result buffer of the call
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) {
  return callUdf(handle, TSDB_UDF_CALL_AGG_INIT, NULL, NULL, NULL, NULL, interBuf);
}

// input: block, state
// output: interbuf,
1400
int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) {
S
slzhou 已提交
1401 1402 1403 1404 1405 1406 1407
  int8_t callType = TSDB_UDF_CALL_AGG_PROC;
  int32_t err = callUdf(handle, callType, block, state, NULL, NULL, newState);
  return err;
}

// input: interbuf1, interbuf2
// output: resultBuf
1408
int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf) {
S
slzhou 已提交
1409 1410 1411 1412 1413 1414 1415
  int8_t callType = TSDB_UDF_CALL_AGG_MERGE;
  int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf);
  return err;
}

// input: interBuf
// output: resultData
1416
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) {
S
slzhou 已提交
1417
  int8_t callType = TSDB_UDF_CALL_AGG_FIN;
S
slzhou 已提交
1418 1419 1420 1421
  int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData);
  return err;
}

1422
// Invoke a scalar UDF.
//   handle    - session returned by the setup step
//   input     - array of `numOfCols` scalar parameters
//   numOfCols - number of entries in `input`
//   output    - receives the single result column on success
// Returns the error code of the call (0 on success).
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam* output) {
  int8_t callType = TSDB_UDF_CALL_SCALA_PROC;
  SSDataBlock inputBlock = {0};
  convertScalarParamToDataBlock(input, numOfCols, &inputBlock);
  SSDataBlock resultBlock = {0};
  int32_t err = callUdf(handle, callType, &inputBlock, NULL, NULL, &resultBlock, NULL);
  if (err == 0) {
    // output->columnData points into resultBlock's column array, so that array
    // must stay alive and is not released here
    convertDataBlockToScalarParm(&resultBlock, output);
  }
  // The input column array was created only for this call and the request has
  // already been encoded by the time callUdf() returns, so release the array.
  taosArrayDestroy(inputBlock.pDataBlock);
  return err;
}

1434 1435 1436 1437 1438 1439
// Ordering function for SUdfcFuncStub elements: compares the UDF names with strcmp.
int compareUdfcFuncSub(const void* elem1, const void* elem2) {
  const SUdfcFuncStub *lhs = (const SUdfcFuncStub *)elem1;
  const SUdfcFuncStub *rhs = (const SUdfcFuncStub *)elem2;
  int order = strcmp(lhs->udfName, rhs->udfName);
  return order;
}

S
slzhou 已提交
1440
// Get a function handle for `udfName`, reusing an existing stub when there is one.
// A hit bumps the stub's reference count and last-reference time; a miss sets
// the UDF up and registers a new stub (the table is kept sorted by name).
//   pHandle - receives the handle, or NULL on failure
// Returns 0 on success, otherwise the error code of the setup.
int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
  int32_t code = 0;
  uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
  SUdfcFuncStub key = {0};
  // bounded copy; key is zero-filled so the name stays terminated
  strncpy(key.udfName, udfName, sizeof(key.udfName) - 1);
  SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
  if (foundStub != NULL) {
    // Update the stub while still holding the mutex: it lives inside the shared
    // array, which other threads may modify or re-sort once the lock is released.
    *pHandle = foundStub->handle;
    ++foundStub->refCount;
    foundStub->lastRefTime = taosGetTimestampUs();
    uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
    return 0;
  }
  *pHandle = NULL;
  code = doSetupUdf(udfName, pHandle);
  if (code == TSDB_CODE_SUCCESS) {
    SUdfcFuncStub stub = {0};
    strncpy(stub.udfName, udfName, sizeof(stub.udfName) - 1);
    stub.handle = *pHandle;
    ++stub.refCount;
    stub.lastRefTime = taosGetTimestampUs();
    taosArrayPush(gUdfdProxy.udfStubs, &stub);
    taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub);
  } else {
    *pHandle = NULL;
  }

  uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
  return code;
}

S
slzhou 已提交
1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481
// Drop one reference on the stub registered for `udfName`.
// The stub is expected to exist and its count must not become negative; both
// conditions are asserted. The lookup and update happen under the stubs mutex.
void releaseUdfFuncHandle(char* udfName) {
  SUdfcFuncStub key = {0};

  uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
  strcpy(key.udfName, udfName);
  SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
  ASSERT(foundStub);
  foundStub->refCount -= 1;
  ASSERT(foundStub->refCount>=0);
  uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
}

1482 1483
// Call the scalar UDF `udfName` on `numOfCols` input parameters.
// Acquires the function handle, performs the call and releases the handle again.
// Returns the acquire error if no handle could be obtained, otherwise the
// result of the call.
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
  UdfcFuncHandle funcHandle = NULL;

  int32_t rc = accquireUdfFuncHandle(udfName, &funcHandle);
  if (rc == 0) {
    rc = doCallUdfScalarFunc(funcHandle, input, numOfCols, output);
    releaseUdfFuncHandle(udfName);
  }
  return rc;
}

// Tear down a UDF session: send the teardown request, disconnect the pipe and
// free the session.
// Returns TSDB_CODE_UDF_PIPE_NO_PIPE when the session has no pipe (nothing is
// freed in that case), otherwise the error code of the teardown request.
int32_t doTeardownUdf(UdfcFuncHandle handle) {
  SUdfcUvSession *session = (SUdfcUvSession *) handle;

  if (session->udfUvPipe == NULL) {
    fnError("tear down udf. pipe to udfd does not exist. udf name: %s", session->udfName);
    return TSDB_CODE_UDF_PIPE_NO_PIPE;
  }

  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
  task->errCode = 0;
  task->session = session;
  task->type = UDF_TASK_TEARDOWN;

  SUdfTeardownRequest *req = &task->_teardown.req;
  req->udfHandle = task->session->severHandle;

  udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);

  // remember the teardown result; the disconnect below overwrites task->errCode
  int32_t err = task->errCode;

  udfcRunUdfUvTask(task, UV_TASK_DISCONNECT);

  // log while the session is still alive: its name is read from the session itself
  fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle);

  taosMemoryFree(task->session);
  taosMemoryFree(task);

  return err;
}
S
shenglian zhou 已提交
1523

S
shenglian zhou 已提交
1524 1525
//memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
typedef struct SUdfAggRes {
S
slzhou 已提交
1526
  SUdfcUvSession *session;
S
shenglian zhou 已提交
1527 1528 1529 1530 1531 1532
  int8_t finalResNum;
  int8_t interResNum;
  char* finalResBuf;
  char* interResBuf;
} SUdfAggRes;

S
shenglian zhou 已提交
1533
// Report the memory an aggregate UDF needs per result row: the SUdfAggRes
// header plus the result bytes plus the UDF's intermediate buffer size.
// Returns false (leaving pEnv untouched) when the function is a scalar function.
bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
  bool isAggregate = !fmIsScalarFunc(pFunc->funcId);
  if (isAggregate) {
    pEnv->calcMemSize = sizeof(SUdfAggRes) + pFunc->node.resType.bytes + pFunc->udfBufSize;
  }
  return isAggregate;
}

bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo) {
  if (functionSetup(pCtx, pResultCellInfo) != true) {
    return false;
  }
  UdfcFuncHandle handle;
1546
  int32_t udfCode = 0;
S
slzhou 已提交
1547
  if ((udfCode = accquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
1548
    fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode);
S
shenglian zhou 已提交
1549 1550
    return false;
  }
S
slzhou 已提交
1551
  SUdfcUvSession *session = (SUdfcUvSession *)handle;
S
shenglian zhou 已提交
1552 1553
  SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo);
  int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize;
S
shenglian zhou 已提交
1554
  memset(udfRes, 0, envSize);
S
shenglian zhou 已提交
1555

S
slzhou 已提交
1556 1557 1558
  udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
  udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;

S
slzhou 已提交
1559
  udfRes->session = (SUdfcUvSession *)handle;
S
shenglian zhou 已提交
1560
  SUdfInterBuf buf = {0};
1561 1562
  if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
    fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode);
S
shenglian zhou 已提交
1563 1564
    return false;
  }
S
shenglian zhou 已提交
1565
  udfRes->interResNum = buf.numOfResult;
S
slzhou 已提交
1566
  memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
S
shenglian zhou 已提交
1567 1568 1569 1570 1571 1572 1573
  return true;
}

int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t numOfCols = pInput->numOfInputCols;

S
slzhou 已提交
1574
  SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
S
slzhou 已提交
1575
  SUdfcUvSession *session = udfRes->session;
1576
  if (session == NULL) {
1577
    return TSDB_CODE_UDF_NO_FUNC_HANDLE;
1578
  }
S
slzhou 已提交
1579 1580
  udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
  udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
S
shenglian zhou 已提交
1581 1582 1583 1584 1585

  int32_t start = pInput->startRowIndex;
  int32_t numOfRows = pInput->numOfRows;


1586 1587
  SSDataBlock tempBlock = {0};
  tempBlock.info.numOfCols = numOfCols;
1588
  tempBlock.info.rows = pInput->totalRows;
1589
  tempBlock.info.uid = pInput->uid;
S
shenglian zhou 已提交
1590
  bool hasVarCol = false;
1591
  tempBlock.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
S
shenglian zhou 已提交
1592 1593 1594 1595 1596 1597

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData *col = pInput->pData[i];
    if (IS_VAR_DATA_TYPE(col->info.type)) {
      hasVarCol = true;
    }
1598
    taosArrayPush(tempBlock.pDataBlock, col);
S
shenglian zhou 已提交
1599
  }
1600
  tempBlock.info.hasVarCol = hasVarCol;
S
shenglian zhou 已提交
1601

1602 1603
  SSDataBlock *inputBlock = blockDataExtractBlock(&tempBlock, start, numOfRows);

S
slzhou 已提交
1604
  SUdfInterBuf state = {.buf = udfRes->interResBuf,
1605
                        .bufLen = session->bufSize,
S
slzhou 已提交
1606
                        .numOfResult = udfRes->interResNum};
S
shenglian zhou 已提交
1607
  SUdfInterBuf newState = {0};
1608

1609
  int32_t udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState);
1610 1611 1612 1613 1614 1615 1616
  if (udfCode != 0) {
    fnError("udfAggProcess error. code: %d", udfCode);
    newState.numOfResult = 0;
  } else {
    udfRes->interResNum = newState.numOfResult;
    memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
  }
S
slzhou 已提交
1617 1618 1619
  if (newState.numOfResult == 1 || state.numOfResult == 1) {
    GET_RES_INFO(pCtx)->numOfRes = 1;
  }
S
shenglian zhou 已提交
1620

1621 1622
  blockDataDestroy(inputBlock);
  taosArrayDestroy(tempBlock.pDataBlock);
S
shenglian zhou 已提交
1623 1624

  taosMemoryFree(newState.buf);
1625
  return udfCode;
S
shenglian zhou 已提交
1626 1627 1628
}

// Produce the final value of an aggregate UDF and write it to pBlock.
//
// Calls udfd's finalize step with the intermediate state held in the result cell,
// copies the returned value into the cell's final result area, emits it through
// functionFinalizeWithResultBuf, and releases the function handle acquired for
// pCtx->udfName.
//
// Returns TSDB_CODE_UDF_NO_FUNC_HANDLE if the cell has no session; otherwise the
// value of functionFinalizeWithResultBuf when the udfd call succeeded, or the udfd
// error code when it failed (numOfRes is then set to 0).
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
  SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
  SUdfcUvSession *session = udfRes->session;
  if (session == NULL) {
    return TSDB_CODE_UDF_NO_FUNC_HANDLE;
  }
  udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
  udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;

  SUdfInterBuf resultBuf = {0};
  SUdfInterBuf state = {.buf = udfRes->interResBuf,
                        .bufLen = session->bufSize,
                        .numOfResult = udfRes->interResNum};
  int32_t udfCallCode = doCallUdfAggFinalize(session, &state, &resultBuf);
  if (udfCallCode != 0) {
    fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode);
    GET_RES_INFO(pCtx)->numOfRes = 0;
  } else {
    // Copy at most outputLen bytes (the size of the final result area), but never
    // read more than the returned buffer actually holds.
    int32_t copyLen = session->outputLen;
    if (resultBuf.bufLen < copyLen) {
      copyLen = resultBuf.bufLen;
    }
    if (resultBuf.buf != NULL && copyLen > 0) {
      memcpy(udfRes->finalResBuf, resultBuf.buf, copyLen);
    }
    udfRes->finalResNum = resultBuf.numOfResult;
    GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
  }
  // The value now lives in the result cell; release the buffer returned by the call.
  taosMemoryFree(resultBuf.buf);

  int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
  releaseUdfFuncHandle(pCtx->udfName);
  return udfCallCode == 0 ? numOfResults : udfCallCode;
}

1658
int32_t cleanUpUdfs() {
S
slzhou 已提交
1659 1660 1661 1662 1663 1664
  uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
  int32_t i = 0;
  SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
  while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) {
    SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i);
    if (stub->refCount == 0) {
S
slzhou 已提交
1665
      fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle);
S
slzhou 已提交
1666 1667
      doTeardownUdf(stub->handle);
    } else {
1668 1669
      fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p",
             stub->udfName, stub->refCount, stub->lastRefTime, stub->handle);
S
slzhou 已提交
1670 1671 1672 1673 1674 1675 1676 1677
      taosArrayPush(udfStubs, stub);
    }
    ++i;
  }
  taosArrayDestroy(gUdfdProxy.udfStubs);
  gUdfdProxy.udfStubs = udfStubs;
  uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
  return 0;
S
shenglian zhou 已提交
1678
}