tudf.c 56.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#include "uv.h"
#include "os.h"
S
slzhou 已提交
17
#include "fnLog.h"
18
#include "tudf.h"
19
#include "tudfInt.h"
S
shenglian zhou 已提交
20
#include "tarray.h"
S
slzhou 已提交
21
#include "tglobal.h"
S
slzhou 已提交
22
#include "tdatablock.h"
S
shenglian zhou 已提交
23 24
#include "querynodes.h"
#include "builtinsimpl.h"
S
slzhou 已提交
25
#include "functionMgt.h"
26

27
//TODO: add unit test
28 29 30 31 32 33 34
typedef struct SUdfdData {
  bool          startCalled;
  bool          needCleanUp;
  uv_loop_t     loop;
  uv_thread_t   thread;
  uv_barrier_t  barrier;
  uv_process_t  process;
35 36 37
#ifdef WINDOWS
  HANDLE        jobHandle;
#endif
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
  int           spawnErr;
  uv_pipe_t     ctrlPipe;
  uv_async_t    stopAsync;
  int32_t        stopCalled;

  int32_t         dnodeId;
} SUdfdData;

// Process-wide udfd supervisor state, zero-initialised.
SUdfdData udfdGlobal = {0};

// Declared ahead of its definition because udfUdfdExit re-spawns udfd through it.
static int32_t udfSpawnUdfd(SUdfdData *pData);

/*
 * Exit callback installed on the udfd process handle.
 *
 * A clean exit (status 0 and no terminating signal), or any exit observed
 * after stopCalled was set, is only logged. Every other exit re-spawns udfd.
 *
 * @param process     process handle; its data field points at the SUdfdData.
 * @param exitStatus  exit status reported by libuv.
 * @param termSignal  terminating signal reported by libuv, 0 if none.
 */
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
  fnInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
  SUdfdData *udfd = process->data;

  bool exitedCleanly = (exitStatus == 0) && (termSignal == 0);
  // stopCalled is consulted only for an unclean exit, as before.
  if (!exitedCleanly && !atomic_load_32(&udfd->stopCalled)) {
    fnInfo("udfd process restart");
    udfSpawnUdfd(udfd);
    return;
  }
  fnInfo("udfd process exit due to SIGINT or dnode-mgmt called stop");
}

/*
 * Spawn the udfd executable as a detached child on pData->loop.
 *
 * The executable path is built from the directory part of tsProcPath, or
 * from "." when tsProcPath is NULL. udfd is started as "udfd -c <configDir>"
 * with pData->ctrlPipe as stdio slot 0, stdout ignored and stderr inherited.
 * Its environment contains only DNODE_ID and UV_THREADPOOL_SIZE.
 *
 * @param pData  udfd bookkeeping; pData->loop must already be initialised.
 * @return 0 on success, otherwise the error code returned by uv_spawn.
 */
static int32_t udfSpawnUdfd(SUdfdData* pData) {
  fnInfo("dnode start spawning udfd");
  uv_process_options_t options = {0};

  char path[PATH_MAX] = {0};
  if (tsProcPath == NULL) {
    path[0] = '.';
  } else {
    // Bounded copy: strncpy(path, src, strlen(src)) overran the buffer when src was too long.
    snprintf(path, sizeof(path), "%s", tsProcPath);
    taosDirName(path);
  }
#ifdef WINDOWS
  const char *exeSuffix = "udfd.exe";
#else
  const char *exeSuffix = "/udfd";
#endif
  // Bounded append; always leaves room for the terminating NUL.
  strncat(path, exeSuffix, sizeof(path) - strlen(path) - 1);

  char* argsUdfd[] = {path, "-c", configDir, NULL};
  options.args = argsUdfd;
  options.file = path;

  options.exit_cb = udfUdfdExit;

  uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1);

  uv_stdio_container_t child_stdio[3];
  child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
  child_stdio[0].data.stream = (uv_stream_t*) &pData->ctrlPipe;
  child_stdio[1].flags = UV_IGNORE;
  child_stdio[2].flags = UV_INHERIT_FD;
  child_stdio[2].data.fd = 2;
  options.stdio_count = 3;
  options.stdio = child_stdio;

  options.flags = UV_PROCESS_DETACHED;

  char dnodeIdEnvItem[32] = {0};
  char thrdPoolSizeEnvItem[32] = {0};
  snprintf(dnodeIdEnvItem, sizeof(dnodeIdEnvItem), "%s=%d", "DNODE_ID", pData->dnodeId);
  float numCpuCores = 4;  // fallback value, left in place if taosGetCpuCores does not overwrite it
  taosGetCpuCores(&numCpuCores);
  snprintf(thrdPoolSizeEnvItem, sizeof(thrdPoolSizeEnvItem), "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2);
  char* envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, NULL};
  options.env = envUdfd;

  int err = uv_spawn(&pData->loop, &pData->process, &options);
  pData->process.data = (void*)pData;

#ifdef WINDOWS
  // End udfd.exe by Job.
  // Only after a successful spawn: on failure process_handle does not refer to a process.
  if (err == 0) {
    if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle);
    pData->jobHandle = CreateJobObject(NULL, NULL);
    bool add_job_ok = AssignProcessToJobObject(pData->jobHandle, pData->process.process_handle);
    if (!add_job_ok) {
      fnError("Assign udfd to job failed.");
    } else {
      JOBOBJECT_EXTENDED_LIMIT_INFORMATION limit_info;
      memset(&limit_info, 0x0, sizeof(limit_info));
      limit_info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
      bool set_auto_kill_ok = SetInformationJobObject(pData->jobHandle, JobObjectExtendedLimitInformation, &limit_info, sizeof(limit_info));
      if (!set_auto_kill_ok) {
        fnError("Set job auto kill udfd failed.");
      }
    }
  }
#endif

  if (err != 0) {
    fnError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
  }
  return err;
}

/* uv_walk callback: requests close (without a close callback) for every handle not already closing. */
static void udfUdfdCloseWalkCb(uv_handle_t* handle, void* arg) {
  if (uv_is_closing(handle)) {
    return;
  }
  uv_close(handle, NULL);
}

/* Async callback of stopAsync: stops the watcher loop owned by the SUdfdData stored in async->data. */
static void udfUdfdStopAsyncCb(uv_async_t *async) {
  SUdfdData *udfd = async->data;
  uv_loop_t *watcherLoop = &udfd->loop;
  uv_stop(watcherLoop);
}

static void udfWatchUdfd(void *args) {
  SUdfdData *pData = args;
  uv_loop_init(&pData->loop);
  uv_async_init(&pData->loop, &pData->stopAsync, udfUdfdStopAsyncCb);
  pData->stopAsync.data = pData;
  int32_t err = udfSpawnUdfd(pData);
  atomic_store_32(&pData->spawnErr, err);
  uv_barrier_wait(&pData->barrier);
  uv_run(&pData->loop, UV_RUN_DEFAULT);
  uv_loop_close(&pData->loop);

  uv_walk(&pData->loop, udfUdfdCloseWalkCb, NULL);
  uv_run(&pData->loop, UV_RUN_DEFAULT);
  uv_loop_close(&pData->loop);
  return;
}

int32_t udfStartUdfd(int32_t startDnodeId) {
S
slzhou 已提交
161 162
  if (!tsStartUdfd) {
    fnInfo("start udfd is disabled.")
S
slzhou 已提交
163
    return 0;
S
slzhou 已提交
164
  }
165 166
  SUdfdData *pData = &udfdGlobal;
  if (pData->startCalled) {
S
Shengliang Guan 已提交
167
    fnInfo("dnode start udfd already called");
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
    return 0;
  }
  pData->startCalled = true;
  char dnodeId[8] = {0};
  snprintf(dnodeId, sizeof(dnodeId), "%d", startDnodeId);
  uv_os_setenv("DNODE_ID", dnodeId);
  pData->dnodeId = startDnodeId;

  uv_barrier_init(&pData->barrier, 2);
  uv_thread_create(&pData->thread, udfWatchUdfd, pData);
  uv_barrier_wait(&pData->barrier);
  int32_t err = atomic_load_32(&pData->spawnErr);
  if (err != 0) {
    uv_barrier_destroy(&pData->barrier);
    uv_async_send(&pData->stopAsync);
    uv_thread_join(&pData->thread);
    pData->needCleanUp = false;
S
Shengliang Guan 已提交
185
    fnInfo("dnode udfd cleaned up after spawn err");
186 187 188 189 190 191 192 193
  } else {
    pData->needCleanUp = true;
  }
  return err;
}

int32_t udfStopUdfd() {
  SUdfdData *pData = &udfdGlobal;
S
Shengliang Guan 已提交
194
  fnInfo("dnode to stop udfd. need cleanup: %d, spawn err: %d",
195 196 197 198 199 200 201 202 203
        pData->needCleanUp, pData->spawnErr);
  if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) {
    return 0;
  }
  atomic_store_32(&pData->stopCalled, 1);
  pData->needCleanUp = false;
  uv_barrier_destroy(&pData->barrier);
  uv_async_send(&pData->stopAsync);
  uv_thread_join(&pData->thread);
204 205 206
#ifdef WINDOWS
  if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle);
#endif
S
Shengliang Guan 已提交
207
  fnInfo("dnode udfd cleaned up");
208 209 210 211
  return 0;
}

//==============================================================================================
S
shenglian zhou 已提交
212 213 214 215
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
 * The QUEUE is copied from queue.h under libuv
 * */

S
shenglian zhou 已提交
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303
/* Intrusive doubly linked list node: slot 0 holds the next pointer, slot 1
 * the previous pointer (see QUEUE_NEXT / QUEUE_PREV). A list head is a QUEUE
 * that links to itself when the list is empty.
 */
typedef void *QUEUE[2];

/* Private macros. */
/* Lvalue accessors for a node's next/prev link and for its neighbours' links. */
#define QUEUE_NEXT(q)       (*(QUEUE **) &((*(q))[0]))
#define QUEUE_PREV(q)       (*(QUEUE **) &((*(q))[1]))
#define QUEUE_PREV_NEXT(q)  (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q)  (QUEUE_PREV(QUEUE_NEXT(q)))

/* Public macros. */
/* Recover the enclosing `type` object from a pointer to its embedded `field`. */
#define QUEUE_DATA(ptr, type, field)                                          \
  ((type *) ((char *) (ptr) - offsetof(type, field)))

/* Important note: mutating the list while QUEUE_FOREACH is
 * iterating over its elements results in undefined behavior.
 */
#define QUEUE_FOREACH(q, h)                                                   \
  for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q))

/* True when the node's next link points back at itself. */
#define QUEUE_EMPTY(q)                                                        \
  ((const QUEUE *) (q) == (const QUEUE *) QUEUE_NEXT(q))

/* First element after head q (q itself when the list is empty). */
#define QUEUE_HEAD(q)                                                         \
  (QUEUE_NEXT(q))

/* Make q an empty list: both links point at q. */
#define QUEUE_INIT(q)                                                         \
  do {                                                                        \
    QUEUE_NEXT(q) = (q);                                                      \
    QUEUE_PREV(q) = (q);                                                      \
  }                                                                           \
  while (0)

/* Splice the elements of list n onto the tail of list h; n's own links are not reset. */
#define QUEUE_ADD(h, n)                                                       \
  do {                                                                        \
    QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n);                                       \
    QUEUE_NEXT_PREV(n) = QUEUE_PREV(h);                                       \
    QUEUE_PREV(h) = QUEUE_PREV(n);                                            \
    QUEUE_PREV_NEXT(h) = (h);                                                 \
  }                                                                           \
  while (0)

/* Move the elements of h from q through the tail into list n; h keeps what preceded q. */
#define QUEUE_SPLIT(h, q, n)                                                  \
  do {                                                                        \
    QUEUE_PREV(n) = QUEUE_PREV(h);                                            \
    QUEUE_PREV_NEXT(n) = (n);                                                 \
    QUEUE_NEXT(n) = (q);                                                      \
    QUEUE_PREV(h) = QUEUE_PREV(q);                                            \
    QUEUE_PREV_NEXT(h) = (h);                                                 \
    QUEUE_PREV(q) = (n);                                                      \
  }                                                                           \
  while (0)

/* Move every element of h into n (n becomes empty when h is empty).
 * Declares a local named q inside its own block.
 */
#define QUEUE_MOVE(h, n)                                                      \
  do {                                                                        \
    if (QUEUE_EMPTY(h))                                                       \
      QUEUE_INIT(n);                                                          \
    else {                                                                    \
      QUEUE* q = QUEUE_HEAD(h);                                               \
      QUEUE_SPLIT(h, q, n);                                                   \
    }                                                                         \
  }                                                                           \
  while (0)

/* Link q directly after head h. */
#define QUEUE_INSERT_HEAD(h, q)                                               \
  do {                                                                        \
    QUEUE_NEXT(q) = QUEUE_NEXT(h);                                            \
    QUEUE_PREV(q) = (h);                                                      \
    QUEUE_NEXT_PREV(q) = (q);                                                 \
    QUEUE_NEXT(h) = (q);                                                      \
  }                                                                           \
  while (0)

/* Link q directly before head h, i.e. at the tail of the list. */
#define QUEUE_INSERT_TAIL(h, q)                                               \
  do {                                                                        \
    QUEUE_NEXT(q) = (h);                                                      \
    QUEUE_PREV(q) = QUEUE_PREV(h);                                            \
    QUEUE_PREV_NEXT(q) = (q);                                                 \
    QUEUE_PREV(h) = (q);                                                      \
  }                                                                           \
  while (0)

/* Unlink q from its neighbours; q's own next/prev links are left unchanged. */
#define QUEUE_REMOVE(q)                                                       \
  do {                                                                        \
    QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q);                                       \
    QUEUE_NEXT_PREV(q) = QUEUE_PREV(q);                                       \
  }                                                                           \
  while (0)


304 305 306 307 308 309
// Task kinds carried in SClientUvTaskNode.type.
enum {
  UV_TASK_CONNECT = 0,
  UV_TASK_REQ_RSP = 1,  // the kind whose rspBuf is decoded into a UDF response
  UV_TASK_DISCONNECT = 2
};

310
// Global sequence-number counter, starts at 0.
// NOTE(review): presumably used to number UDF request tasks; its users are outside this section — confirm.
int64_t gUdfTaskSeqNum = 0;
311 312 313
/* One entry of SUdfcProxy.udfStubs: a UDF name together with its handle and reference bookkeeping. */
typedef struct SUdfcFuncStub {
  char           udfName[TSDB_FUNC_NAME_LEN];  // UDF name
  UdfcFuncHandle handle;                       // handle associated with udfName
  int32_t        refCount;                     // NOTE(review): presumably number of current users — confirm
  int64_t        lastRefTime;                  // NOTE(review): presumably time of the last reference — confirm
} SUdfcFuncStub;

318
typedef struct SUdfcProxy {
319
  char udfdPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
320
  uv_barrier_t initBarrier;
321

322 323 324
  uv_loop_t   uvLoop;
  uv_thread_t loopThread;
  uv_async_t  loopTaskAync;
325

326
  uv_async_t loopStopAsync;
327

328 329 330 331
  uv_mutex_t taskQueueMutex;
  int8_t     udfcState;
  QUEUE      taskQueue;
  QUEUE      uvProcTaskQueue;
332

333 334 335
  uv_mutex_t udfStubsMutex;
  SArray*    udfStubs; // SUdfcFuncStub

336
  int8_t initialized;
337
} SUdfcProxy;
338

339
// Global client proxy instance, zero-initialised.
SUdfcProxy gUdfdProxy = {0};
340

S
slzhou 已提交
341
typedef struct SUdfcUvSession {
342
  SUdfcProxy *udfc;
343
  int64_t severHandle;
S
slzhou 已提交
344
  uv_pipe_t *udfUvPipe;
S
shenglian zhou 已提交
345 346 347 348

  int8_t  outputType;
  int32_t outputLen;
  int32_t bufSize;
S
slzhou 已提交
349 350 351

  char udfName[TSDB_FUNC_NAME_LEN];
} SUdfcUvSession;
352 353

typedef struct SClientUvTaskNode {
354
  SUdfcProxy *udfc;
355 356 357 358 359 360 361 362 363 364 365
  int8_t type;
  int errCode;

  uv_pipe_t *pipe;

  int64_t seqNum;
  uv_buf_t reqBuf;

  uv_sem_t taskSem;
  uv_buf_t rspBuf;

S
shenglian zhou 已提交
366 367 368
  QUEUE recvTaskQueue;
  QUEUE procTaskQueue;
  QUEUE connTaskQueue;
369 370 371 372 373
} SClientUvTaskNode;

typedef struct SClientUdfTask {
  int8_t type;

S
slzhou 已提交
374
  SUdfcUvSession *session;
375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403

  int32_t errCode;

  union {
    struct {
      SUdfSetupRequest req;
      SUdfSetupResponse rsp;
    } _setup;
    struct {
      SUdfCallRequest req;
      SUdfCallResponse rsp;
    } _call;
    struct {
      SUdfTeardownRequest req;
      SUdfTeardownResponse rsp;
    } _teardown;
  };

} SClientUdfTask;

// Read-buffer state of one client connection (embedded in SClientUvConn as readBuf).
// NOTE(review): len/cap/total presumably mean bytes held, allocated capacity and
// expected message size — the code using them is outside this section; confirm.
typedef struct SClientConnBuf {
  char *buf;  // released with taosMemoryFree in onUdfcPipeClose
  int32_t len;
  int32_t cap;
  int32_t total;
} SClientConnBuf;

typedef struct SClientUvConn {
  uv_pipe_t *pipe;
S
shenglian zhou 已提交
404
  QUEUE taskQueue;
405
  SClientConnBuf readBuf;
S
slzhou 已提交
406
  SUdfcUvSession *session;
407 408
} SClientUvConn;

409 410
// States of the UDF client; the numeric values are consecutive starting at 0.
enum {
  UDFC_STATE_INITAL   = 0,  // initial state
  UDFC_STATE_STARTNG  = 1,  // starting after udfcOpen
  UDFC_STATE_READY    = 2,  // started and begin to receive requests
  UDFC_STATE_STOPPING = 3,  // stopping after udfcClose
};
415

416 417
int32_t getUdfdPipeName(char* pipeName, int32_t size) {
  char    dnodeId[8] = {0};
wafwerar's avatar
wafwerar 已提交
418
  size_t  dnodeIdSize = sizeof(dnodeId);
419 420
  int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize);
  if (err != 0) {
421
    fnError("get dnode id from env. error: %s.", uv_err_name(err));
422 423
    dnodeId[0] = '1';
  }
424
#ifdef _WIN32
425
  snprintf(pipeName, size, "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
426 427 428 429
#else
  snprintf(pipeName, size, "%s/%s%s", tsDataDir, UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
#endif
  fnInfo("get dnode id from env. dnode id: %s. pipe path: %s", dnodeId, pipeName);
430 431 432
  return 0;
}

433 434 435
/*
 * Encode a setup request: the UDF name as a TSDB_FUNC_NAME_LEN-byte binary.
 *
 * @return the length reported by taosEncodeBinary.
 */
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) {
  int32_t encoded = 0;
  encoded += taosEncodeBinary(buf, setup->udfName, TSDB_FUNC_NAME_LEN);
  return encoded;
}
438

439 440 441
/*
 * Decode a setup request: reads TSDB_FUNC_NAME_LEN bytes into request->udfName.
 *
 * @return the position returned by taosDecodeBinaryTo.
 */
void* decodeUdfSetupRequest(const void* buf, SUdfSetupRequest *request) {
  const void *cursor = taosDecodeBinaryTo(buf, request->udfName, TSDB_FUNC_NAME_LEN);
  return (void*)cursor;
}
443

444 445
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf* state) {
  int32_t len = 0;
446
  len += taosEncodeFixedI8(buf, state->numOfResult);
447 448
  len += taosEncodeFixedI32(buf, state->bufLen);
  len += taosEncodeBinary(buf, state->buf, state->bufLen);
S
shenglian zhou 已提交
449 450
  return len;
}
451

452
/*
 * Decode an intermediate buffer: numOfResult (i8), bufLen (i32), then the
 * payload, which taosDecodeBinary stores through &state->buf.
 *
 * @return the position after the decoded fields.
 */
void* decodeUdfInterBuf(const void* buf, SUdfInterBuf* state) {
  const void *cursor = buf;
  cursor = taosDecodeFixedI8(cursor, &state->numOfResult);
  cursor = taosDecodeFixedI32(cursor, &state->bufLen);
  cursor = taosDecodeBinary(cursor, (void**)&state->buf, state->bufLen);
  return (void*)cursor;
}

459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474
/*
 * Encode a call request: udfHandle (i64), callType (i8), then a payload that
 * depends on callType. Unknown call types carry no payload.
 *
 * @return the sum of the lengths reported by the encode calls.
 */
int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call) {
  int32_t encoded = 0;
  encoded += taosEncodeFixedI64(buf, call->udfHandle);
  encoded += taosEncodeFixedI8(buf, call->callType);

  switch (call->callType) {
    case TSDB_UDF_CALL_SCALA_PROC:
      encoded += tEncodeDataBlock(buf, &call->block);
      break;
    case TSDB_UDF_CALL_AGG_INIT:
      encoded += taosEncodeFixedI8(buf, call->initFirst);
      break;
    case TSDB_UDF_CALL_AGG_PROC:
      encoded += tEncodeDataBlock(buf, &call->block);
      encoded += encodeUdfInterBuf(buf, &call->interBuf);
      break;
    case TSDB_UDF_CALL_AGG_MERGE:
      encoded += encodeUdfInterBuf(buf, &call->interBuf);
      encoded += encodeUdfInterBuf(buf, &call->interBuf2);
      break;
    case TSDB_UDF_CALL_AGG_FIN:
      encoded += encodeUdfInterBuf(buf, &call->interBuf);
      break;
    default:
      break;
  }
  return encoded;
}

479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499
/*
 * Decode a call request: udfHandle (i64), callType (i8), then the payload
 * selected by callType. Unknown call types consume no payload.
 *
 * @return the position after the decoded fields.
 */
void* decodeUdfCallRequest(const void* buf, SUdfCallRequest* call) {
  const void *cursor = buf;
  cursor = taosDecodeFixedI64(cursor, &call->udfHandle);
  cursor = taosDecodeFixedI8(cursor, &call->callType);

  if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
    cursor = tDecodeDataBlock(cursor, &call->block);
  } else if (call->callType == TSDB_UDF_CALL_AGG_INIT) {
    cursor = taosDecodeFixedI8(cursor, &call->initFirst);
  } else if (call->callType == TSDB_UDF_CALL_AGG_PROC) {
    cursor = tDecodeDataBlock(cursor, &call->block);
    cursor = decodeUdfInterBuf(cursor, &call->interBuf);
  } else if (call->callType == TSDB_UDF_CALL_AGG_MERGE) {
    cursor = decodeUdfInterBuf(cursor, &call->interBuf);
    cursor = decodeUdfInterBuf(cursor, &call->interBuf2);
  } else if (call->callType == TSDB_UDF_CALL_AGG_FIN) {
    cursor = decodeUdfInterBuf(cursor, &call->interBuf);
  }
  return (void*)cursor;
}

504 505 506 507
/*
 * Encode a teardown request: just the udfHandle (i64).
 *
 * @return the length reported by taosEncodeFixedI64.
 */
int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown) {
  int32_t encoded = 0;
  encoded += taosEncodeFixedI64(buf, teardown->udfHandle);
  return encoded;
}

510 511 512
/*
 * Decode a teardown request: reads the udfHandle (i64).
 *
 * @return the position after the handle.
 */
void* decodeUdfTeardownRequest(const void* buf, SUdfTeardownRequest *teardown) {
  const void *cursor = taosDecodeFixedI64(buf, &teardown->udfHandle);
  return (void*)cursor;
}

515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532
/*
 * Encode a full request message: msgLen as a raw int32, then seqNum (i64),
 * type (i8) and the type-specific body.
 *
 * With buf == NULL nothing is written for msgLen and only its size is
 * counted; otherwise msgLen is stored at *buf and *buf is advanced past it.
 *
 * @return the accumulated length.
 */
int32_t encodeUdfRequest(void** buf, const SUdfRequest* request) {
  int32_t encoded = 0;

  if (buf != NULL) {
    *(int32_t*)(*buf) = request->msgLen;
    *buf = POINTER_SHIFT(*buf, sizeof(request->msgLen));
  } else {
    encoded += sizeof(request->msgLen);
  }

  encoded += taosEncodeFixedI64(buf, request->seqNum);
  encoded += taosEncodeFixedI8(buf, request->type);

  switch (request->type) {
    case UDF_TASK_SETUP:
      encoded += encodeUdfSetupRequest(buf, &request->setup);
      break;
    case UDF_TASK_CALL:
      encoded += encodeUdfCallRequest(buf, &request->call);
      break;
    case UDF_TASK_TEARDOWN:
      encoded += encodeUdfTeardownRequest(buf, &request->teardown);
      break;
    default:
      break;
  }
  return encoded;
}

535 536
/*
 * Decode a full request message: raw int32 msgLen, seqNum (i64), type (i8)
 * and the body selected by type. Unknown types consume no body.
 *
 * @return the position after the decoded message.
 */
void* decodeUdfRequest(const void* buf, SUdfRequest* request) {
  const void *cursor = buf;

  request->msgLen = *(int32_t*)(cursor);
  cursor = POINTER_SHIFT(cursor, sizeof(request->msgLen));

  cursor = taosDecodeFixedI64(cursor, &request->seqNum);
  cursor = taosDecodeFixedI8(cursor, &request->type);

  switch (request->type) {
    case UDF_TASK_SETUP:
      cursor = decodeUdfSetupRequest(cursor, &request->setup);
      break;
    case UDF_TASK_CALL:
      cursor = decodeUdfCallRequest(cursor, &request->call);
      break;
    case UDF_TASK_TEARDOWN:
      cursor = decodeUdfTeardownRequest(cursor, &request->teardown);
      break;
    default:
      break;
  }
  return (void*)cursor;
}
551

552 553 554
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) {
  int32_t len = 0;
  len += taosEncodeFixedI64(buf, setupRsp->udfHandle);
S
shenglian zhou 已提交
555 556 557
  len += taosEncodeFixedI8(buf, setupRsp->outputType);
  len += taosEncodeFixedI32(buf, setupRsp->outputLen);
  len += taosEncodeFixedI32(buf, setupRsp->bufSize);
558 559
  return len;
}
560

561 562
/*
 * Decode a setup response: udfHandle (i64), outputType (i8), outputLen (i32), bufSize (i32).
 *
 * @return the position after the decoded fields.
 */
void* decodeUdfSetupResponse(const void* buf, SUdfSetupResponse* setupRsp) {
  const void *cursor = buf;
  cursor = taosDecodeFixedI64(cursor, &setupRsp->udfHandle);
  cursor = taosDecodeFixedI8(cursor, &setupRsp->outputType);
  cursor = taosDecodeFixedI32(cursor, &setupRsp->outputLen);
  cursor = taosDecodeFixedI32(cursor, &setupRsp->bufSize);
  return (void*)cursor;
}
568

569 570 571 572 573 574
int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) {
  int32_t len = 0;
  len += taosEncodeFixedI8(buf, callRsp->callType);
  switch (callRsp->callType) {
    case TSDB_UDF_CALL_SCALA_PROC:
      len += tEncodeDataBlock(buf, &callRsp->resultData);
575
      break;
576
    case TSDB_UDF_CALL_AGG_INIT:
S
slzhou 已提交
577
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
578 579
      break;
    case TSDB_UDF_CALL_AGG_PROC:
S
slzhou 已提交
580
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
581 582
      break;
    case TSDB_UDF_CALL_AGG_MERGE:
S
slzhou 已提交
583
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
584 585
      break;
    case TSDB_UDF_CALL_AGG_FIN:
S
slzhou 已提交
586
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
587 588
      break;
  }
589
  return len;
S
shenglian zhou 已提交
590 591
}

592 593 594 595 596 597 598
/*
 * Decode a call response: callType (i8), then resultData for scalar calls or
 * resultBuf for any of the aggregate call types. Unknown call types consume
 * no payload.
 *
 * @return the position after the decoded fields.
 */
void* decodeUdfCallResponse(const void* buf, SUdfCallResponse* callRsp) {
  const void *cursor = taosDecodeFixedI8(buf, &callRsp->callType);

  switch (callRsp->callType) {
    case TSDB_UDF_CALL_SCALA_PROC:
      cursor = tDecodeDataBlock(cursor, &callRsp->resultData);
      break;
    // All aggregate phases answer with the intermediate/result buffer.
    case TSDB_UDF_CALL_AGG_INIT:
    case TSDB_UDF_CALL_AGG_PROC:
    case TSDB_UDF_CALL_AGG_MERGE:
    case TSDB_UDF_CALL_AGG_FIN:
      cursor = decodeUdfInterBuf(cursor, &callRsp->resultBuf);
      break;
  }
  return (void*)cursor;
}

614 615
/* A teardown response has no body: nothing is written and the length is 0. */
int32_t encodeUdfTeardownResponse(void** buf, const SUdfTeardownResponse* teardownRsp) {
  (void)buf;
  (void)teardownRsp;
  return 0;
}

618 619
/* A teardown response has no body: the input position is returned unchanged. */
void* decodeUdfTeardownResponse(const void* buf, SUdfTeardownResponse* teardownResponse) {
  (void)teardownResponse;
  return (void*)buf;
}

622 623 624 625 626 627 628
int32_t encodeUdfResponse(void** buf, const SUdfResponse* rsp) {
  int32_t len = 0;
  if (buf == NULL) {
    len += sizeof(rsp->msgLen);
  } else {
    *(int32_t*)(*buf) = rsp->msgLen;
    *buf = POINTER_SHIFT(*buf, sizeof(rsp->msgLen));
S
shenglian zhou 已提交
629 630
  }

S
slzhou 已提交
631 632 633 634 635 636 637
  if (buf == NULL) {
    len += sizeof(rsp->seqNum);
  } else {
    *(int64_t*)(*buf) = rsp->seqNum;
    *buf = POINTER_SHIFT(*buf, sizeof(rsp->seqNum));
  }

638 639 640
  len += taosEncodeFixedI64(buf, rsp->seqNum);
  len += taosEncodeFixedI8(buf, rsp->type);
  len += taosEncodeFixedI32(buf, rsp->code);
S
shenglian zhou 已提交
641

642 643 644 645 646 647 648 649 650 651 652 653 654 655 656
  switch (rsp->type) {
    case UDF_TASK_SETUP:
      len += encodeUdfSetupResponse(buf, &rsp->setupRsp);
      break;
    case UDF_TASK_CALL:
      len += encodeUdfCallResponse(buf, &rsp->callRsp);
      break;
    case UDF_TASK_TEARDOWN:
      len += encodeUdfTeardownResponse(buf, &rsp->teardownRsp);
      break;
    default:
      //TODO: log error
      break;
  }
  return len;
S
shenglian zhou 已提交
657 658
}

659 660
/*
 * Decode a full response message.
 *
 * Reads msgLen (raw int32) and seqNum (raw int64), then seqNum again (i64)
 * via taosDecodeFixedI64, type (i8), code (i32) and the body selected by
 * type. This mirrors the layout written by encodeUdfResponse.
 *
 * @return the position after the decoded message.
 */
void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) {
  const void *cursor = buf;

  // Raw header fields.
  rsp->msgLen = *(int32_t*)(cursor);
  cursor = POINTER_SHIFT(cursor, sizeof(rsp->msgLen));
  rsp->seqNum = *(int64_t*)(cursor);
  cursor = POINTER_SHIFT(cursor, sizeof(rsp->seqNum));

  cursor = taosDecodeFixedI64(cursor, &rsp->seqNum);
  cursor = taosDecodeFixedI8(cursor, &rsp->type);
  cursor = taosDecodeFixedI32(cursor, &rsp->code);

  if (rsp->type == UDF_TASK_SETUP) {
    cursor = decodeUdfSetupResponse(cursor, &rsp->setupRsp);
  } else if (rsp->type == UDF_TASK_CALL) {
    cursor = decodeUdfCallResponse(cursor, &rsp->callRsp);
  } else if (rsp->type == UDF_TASK_TEARDOWN) {
    cursor = decodeUdfTeardownResponse(cursor, &rsp->teardownRsp);
  } else {
    //TODO: log error
  }
  return (void*)cursor;
}
684

S
shenglian zhou 已提交
685 686
/*
 * Free the buffers owned by a column's data and reset the pointers to NULL.
 * meta->type selects which layout (variable-length or fixed-length) is freed.
 */
void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta) {
  if (!IS_VAR_DATA_TYPE(meta->type)) {
    taosMemoryFree(data->fixLenCol.nullBitmap);
    data->fixLenCol.nullBitmap = NULL;
    taosMemoryFree(data->fixLenCol.data);
    data->fixLenCol.data = NULL;
    return;
  }

  taosMemoryFree(data->varLenCol.varOffsets);
  data->varLenCol.varOffsets = NULL;
  taosMemoryFree(data->varLenCol.payload);
  data->varLenCol.payload = NULL;
}

/* Free the data buffers of one column; the SUdfColumn object itself is not freed. */
void freeUdfColumn(SUdfColumn* col) {
  SUdfColumnData *colData = &col->colData;
  SUdfColumnMeta *colMeta = &col->colMeta;
  freeUdfColumnData(colData, colMeta);
}

/*
 * Free every column of a UDF data block (data buffers and the column object)
 * and then the column pointer array; freed pointers are reset to NULL.
 */
void freeUdfDataDataBlock(SUdfDataBlock *block) {
  int32_t colIdx = 0;
  while (colIdx < block->numOfCols) {
    freeUdfColumn(block->udfCols[colIdx]);
    taosMemoryFree(block->udfCols[colIdx]);
    block->udfCols[colIdx] = NULL;
    ++colIdx;
  }
  taosMemoryFree(block->udfCols);
  block->udfCols = NULL;
}

/* Free the payload of an intermediate buffer and reset the pointer to NULL. */
void freeUdfInterBuf(SUdfInterBuf *buf) {
  char *payload = buf->buf;
  buf->buf = NULL;
  taosMemoryFree(payload);
}

S
slzhou 已提交
718 719 720 721

/*
 * Deep-copy an SSDataBlock into a UDF data block.
 *
 * For each column the meta data is copied and fresh buffers are allocated:
 * offsets + payload for variable-length types, null bitmap + data for
 * fixed-length types. Allocation results are not checked.
 *
 * @param block     source block.
 * @param udfBlock  destination; its udfCols array and columns are allocated here.
 * @return always 0.
 */
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock) {
  udfBlock->numOfRows = block->info.rows;
  udfBlock->numOfCols = block->info.numOfCols;
  udfBlock->udfCols = taosMemoryCalloc(udfBlock->numOfCols, sizeof(SUdfColumn*));

  for (int32_t colIdx = 0; colIdx < udfBlock->numOfCols; ++colIdx) {
    udfBlock->udfCols[colIdx] = taosMemoryCalloc(1, sizeof(SUdfColumn));
    SColumnInfoData *src = (SColumnInfoData*)taosArrayGet(block->pDataBlock, colIdx);
    SUdfColumn *dst = udfBlock->udfCols[colIdx];

    dst->colMeta.type = src->info.type;
    dst->colMeta.bytes = src->info.bytes;
    dst->colMeta.scale = src->info.scale;
    dst->colMeta.precision = src->info.precision;
    dst->colData.numOfRows = udfBlock->numOfRows;
    dst->hasNull = src->hasNull;

    SUdfColumnData *dstData = &dst->colData;
    if (!IS_VAR_DATA_TYPE(dst->colMeta.type)) {
      // Fixed-length column: null bitmap followed by the value array.
      dstData->fixLenCol.nullBitmapLen = BitmapLen(dstData->numOfRows);
      int32_t bitmapLen = dstData->fixLenCol.nullBitmapLen;
      dstData->fixLenCol.nullBitmap = taosMemoryMalloc(dstData->fixLenCol.nullBitmapLen);
      char* bitmap = dstData->fixLenCol.nullBitmap;
      memcpy(bitmap, src->nullbitmap, bitmapLen);

      dstData->fixLenCol.dataLen = colDataGetLength(src, udfBlock->numOfRows);
      int32_t dataLen = dstData->fixLenCol.dataLen;
      dstData->fixLenCol.data = taosMemoryMalloc(dstData->fixLenCol.dataLen);
      char* data = dstData->fixLenCol.data;
      memcpy(data, src->pData, dataLen);
    } else {
      // Variable-length column: one int32 offset per row, then the payload.
      dstData->varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows;
      dstData->varLenCol.varOffsets = taosMemoryMalloc(dstData->varLenCol.varOffsetsLen);
      memcpy(dstData->varLenCol.varOffsets, src->varmeta.offset, dstData->varLenCol.varOffsetsLen);

      dstData->varLenCol.payloadLen = colDataGetLength(src, udfBlock->numOfRows);
      dstData->varLenCol.payload = taosMemoryMalloc(dstData->varLenCol.payloadLen);
      memcpy(dstData->varLenCol.payload, src->pData, dstData->varLenCol.payloadLen);
    }
  }
  return 0;
}

/*
 * Build a one-column SSDataBlock from a UDF column.
 *
 * The block gets a freshly created one-element column array; the column's
 * buffers are newly allocated copies of the UDF column's buffers. Allocation
 * results are not checked.
 *
 * @param udfCol  source column.
 * @param block   destination block; info and pDataBlock are overwritten.
 * @return always 0.
 */
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
  block->info.numOfCols = 1;
  block->info.rows = udfCol->colData.numOfRows;
  block->info.hasVarCol = IS_VAR_DATA_TYPE(udfCol->colMeta.type);

  block->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
  taosArraySetSize(block->pDataBlock, 1);
  SColumnInfoData *dst = taosArrayGet(block->pDataBlock, 0);

  SUdfColumnMeta *srcMeta = &udfCol->colMeta;
  dst->info.precision = srcMeta->precision;
  dst->info.bytes = srcMeta->bytes;
  dst->info.scale = srcMeta->scale;
  dst->info.type = srcMeta->type;
  dst->hasNull = udfCol->hasNull;

  SUdfColumnData *srcData = &udfCol->colData;
  if (IS_VAR_DATA_TYPE(srcMeta->type)) {
    // Variable-length column: offsets and payload.
    dst->varmeta.offset = taosMemoryMalloc(srcData->varLenCol.varOffsetsLen);
    memcpy(dst->varmeta.offset, srcData->varLenCol.varOffsets, srcData->varLenCol.varOffsetsLen);
    dst->pData = taosMemoryMalloc(srcData->varLenCol.payloadLen);
    memcpy(dst->pData, srcData->varLenCol.payload, srcData->varLenCol.payloadLen);
  } else {
    // Fixed-length column: null bitmap and values.
    dst->nullbitmap = taosMemoryMalloc(srcData->fixLenCol.nullBitmapLen);
    memcpy(dst->nullbitmap, srcData->fixLenCol.nullBitmap, srcData->fixLenCol.nullBitmapLen);
    dst->pData = taosMemoryMalloc(srcData->fixLenCol.dataLen);
    memcpy(dst->pData, srcData->fixLenCol.data, srcData->fixLenCol.dataLen);
  }
  return 0;
}

S
slzhou 已提交
786 787 788 789 790 791 792 793 794 795 796 797 798
/*
 * Assemble an SSDataBlock from an array of scalar parameters.
 *
 * The row count comes from the first parameter. hasVarCol is set when any
 * parameter column has a variable-length type. Each parameter's
 * SColumnInfoData is pushed into a newly created column array.
 *
 * @param input      array of numOfCols scalar parameters.
 * @param numOfCols  number of entries in input.
 * @param output     destination block.
 * @return always 0.
 */
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output) {
  output->info.rows = input->numOfRows;
  output->info.numOfCols = numOfCols;

  bool anyVarCol = false;
  for (int32_t colIdx = 0; colIdx < numOfCols && !anyVarCol; ++colIdx) {
    if (IS_VAR_DATA_TYPE(input[colIdx].columnData->info.type)) {
      anyVarCol = true;
    }
  }
  output->info.hasVarCol = anyVarCol;

  output->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
  for (int32_t colIdx = 0; colIdx < numOfCols; ++colIdx) {
    taosArrayPush(output->pDataBlock, input[colIdx].columnData);
  }
  return 0;
}

/*
 * Turn a single-column SSDataBlock into a scalar parameter.
 *
 * output->columnData receives a newly allocated shallow copy of the block's
 * only SColumnInfoData (the buffers it points to are shared, not duplicated).
 *
 * @param input   source block; must have exactly one column.
 * @param output  destination parameter.
 * @return 0 on success, -1 when the block does not have exactly one column
 *         or the column copy cannot be allocated.
 */
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
  if (input->info.numOfCols != 1) {
    fnError("scalar function only support one column");
    return -1;
  }
  output->numOfRows = input->info.rows;

  output->columnData = taosMemoryMalloc(sizeof(SColumnInfoData));
  if (output->columnData == NULL) {
    // The allocation result was previously passed to memcpy unchecked.
    fnError("failed to allocate column data for scalar param");
    return -1;
  }
  memcpy(output->columnData,
         taosArrayGet(input->pDataBlock, 0),
         sizeof(SColumnInfoData));

  return 0;
}
S
slzhou 已提交
819

820 821
/*
 * libuv close callback for a client pipe.
 *
 * If a task is still queued on the connection, the first one is completed
 * with errCode 0, taken off the proxy's processing queue and its waiter is
 * woken. The owning session (if any) is told that the pipe is gone, then the
 * read buffer, the connection and the pipe handle itself are freed.
 */
void onUdfcPipeClose(uv_handle_t *handle) {
  SClientUvConn *conn = handle->data;
  if (!QUEUE_EMPTY(&conn->taskQueue)) {
    QUEUE* h = QUEUE_HEAD(&conn->taskQueue);
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
    task->errCode = 0;
    QUEUE_REMOVE(&task->procTaskQueue);
    uv_sem_post(&task->taskSem);
  }

  // The connection is calloc'ed with session == NULL and the session is only
  // attached after the connect task has completed, so a pipe that is closed
  // before that point has no session to update.
  if (conn->session != NULL) {
    conn->session->udfUvPipe = NULL;
  }

  taosMemoryFree(conn->readBuf.buf);
  taosMemoryFree(conn);
  taosMemoryFree((uv_pipe_t *) handle);
}

S
slzhou 已提交
835 836
/*
 * Transfer the outcome of a finished uv task into the udf task.
 *
 * For a request/response task that carries a response buffer, the buffer is
 * decoded, the response code becomes task->errCode, the type-specific
 * response is copied into the task and the buffer is freed. In every other
 * handled case (request/response without a buffer, connect, disconnect) the
 * uv task's own errCode is copied.
 *
 * Always returns 0.
 */
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
  fnDebug("udfc get uv task result. task: %p, uvTask: %p", task, uvTask);

  const bool isReqRsp = (uvTask->type == UV_TASK_REQ_RSP);

  if (isReqRsp && uvTask->rspBuf.base != NULL) {
    SUdfResponse rsp = {0};
    void* decodeEnd = decodeUdfResponse(uvTask->rspBuf.base, &rsp);
    assert(uvTask->rspBuf.len == POINTER_DISTANCE(decodeEnd, uvTask->rspBuf.base));
    task->errCode = rsp.code;

    //TODO: copy or not
    if (task->type == UDF_TASK_SETUP) {
      task->_setup.rsp = rsp.setupRsp;
    } else if (task->type == UDF_TASK_CALL) {
      task->_call.rsp = rsp.callRsp;
    } else if (task->type == UDF_TASK_TEARDOWN) {
      task->_teardown.rsp = rsp.teardownRsp;
    }

    // TODO: the call buffer is setup and freed by udf invocation
    taosMemoryFree(uvTask->rspBuf.base);
  } else if (isReqRsp || uvTask->type == UV_TASK_CONNECT || uvTask->type == UV_TASK_DISCONNECT) {
    task->errCode = uvTask->errCode;
  }

  return 0;
}

/*
 * libuv alloc callback for the client pipe.
 *
 * First call for a message (cap == 0): allocate just enough room for the
 * message header (int32 length + int64 sequence number) and hand that to
 * libuv. Later calls: grow the buffer to the announced total length when it
 * is larger than the current capacity and hand libuv the unused tail.
 *
 * On allocation failure an empty buffer (base NULL, len 0) is handed back.
 * The recorded capacity is only updated once the reallocation has succeeded,
 * so it always matches the memory actually owned by the connection.
 */
void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
  SClientUvConn *conn = handle->data;
  SClientConnBuf *connBuf = &conn->readBuf;

  int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
  if (connBuf->cap == 0) {
    connBuf->buf = taosMemoryMalloc(msgHeadSize);
    if (connBuf->buf) {
      connBuf->len = 0;
      connBuf->cap = msgHeadSize;
      connBuf->total = -1;

      buf->base = connBuf->buf;
      buf->len = connBuf->cap;
    } else {
      fnError("udfc allocate buffer failure. size: %d", msgHeadSize);
      buf->base = NULL;
      buf->len = 0;
    }
  } else {
    int32_t newCap = connBuf->total > connBuf->cap ? connBuf->total : connBuf->cap;
    void *resultBuf = taosMemoryRealloc(connBuf->buf, newCap);
    if (resultBuf) {
      connBuf->buf = resultBuf;
      connBuf->cap = newCap;
      buf->base = connBuf->buf + connBuf->len;
      buf->len = connBuf->cap - connBuf->len;
    } else {
      // connBuf->buf and connBuf->cap are left untouched: the old block is still valid.
      fnError("udfc re-allocate buffer failure. size: %d", newCap);
      buf->base = NULL;
      buf->len = 0;
    }
  }

  fnTrace("conn buf cap - len - total : %d - %d - %d", connBuf->cap, connBuf->len, connBuf->total);
}

915 916 917 918 919
/*
 * Report whether the connection buffer holds one complete message.
 *
 * While the total length is still unknown (-1) and at least four bytes have
 * arrived, the leading int32 of the buffer is taken as the total length.
 * The message is complete when both the bytes received and the announced
 * total equal the buffer capacity.
 */
bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
  const bool totalUnknown = (connBuf->total == -1);
  if (totalUnknown && connBuf->len >= sizeof(int32_t)) {
    connBuf->total = *(int32_t *) (connBuf->buf);
  }

  if (connBuf->len != connBuf->cap || connBuf->total != connBuf->cap) {
    return false;
  }

  fnTrace("udfc complete message is received, now handle it");
  return true;
}

/*
 * Dispatch a fully received response to the task that is waiting for it.
 *
 * The sequence number is read from the message header (int32 length followed
 * by int64 sequence number) and matched against the tasks queued on the
 * connection. The first matching task takes ownership of the read buffer,
 * is removed from both queues and its waiter is woken. If no task matches,
 * the buffer is freed here. In both cases the connection's read buffer is
 * reset so that the next message starts from scratch.
 *
 * When no task at all is queued the function only logs and returns, leaving
 * the buffer as it is.
 */
void udfcUvHandleRsp(SClientUvConn *conn) {
  SClientConnBuf *connBuf = &conn->readBuf;

  // The sequence number sits 4 bytes into the buffer, which is not a suitable
  // alignment for int64_t; copy it out instead of dereferencing a cast pointer.
  int64_t seqNum = 0;
  memcpy(&seqNum, connBuf->buf + sizeof(int32_t), sizeof(seqNum));  // msglen then seqnum

  if (QUEUE_EMPTY(&conn->taskQueue)) {
    fnError("udfc no task waiting for response on connection");
    return;
  }

  SClientUvTaskNode *taskFound = NULL;
  for (QUEUE *h = QUEUE_NEXT(&conn->taskQueue); h != &conn->taskQueue; h = QUEUE_NEXT(h)) {
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
    if (task->seqNum != seqNum) {
      continue;
    }
    if (taskFound == NULL) {
      taskFound = task;
    } else {
      // Keep the first match; the iterator still advances so the scan terminates.
      fnError("udfc more than one task waiting for the same response");
    }
  }

  if (taskFound) {
    taskFound->rspBuf = uv_buf_init(connBuf->buf, connBuf->len);
    QUEUE_REMOVE(&taskFound->connTaskQueue);
    QUEUE_REMOVE(&taskFound->procTaskQueue);
    uv_sem_post(&taskFound->taskSem);
  } else {
    fnError("no task is waiting for the response.");
    // Nobody took ownership of the message, so release it before the pointer is dropped.
    taosMemoryFree(connBuf->buf);
  }
  connBuf->buf = NULL;
  connBuf->total = -1;
  connBuf->len = 0;
  connBuf->cap = 0;
}
966

967
/*
 * Fail every task queued on a connection and close its pipe.
 *
 * Each queued task gets TSDB_CODE_UDF_PIPE_READ_ERR, is removed from the
 * connection queue and the proxy's processing queue, and its waiter is woken.
 * The pipe is then closed with onUdfcPipeClose as the close callback, unless
 * a close is already in progress.
 */
void udfcUvHandleError(SClientUvConn *conn) {
  while (!QUEUE_EMPTY(&conn->taskQueue)) {
    QUEUE* h = QUEUE_HEAD(&conn->taskQueue);
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
    task->errCode = TSDB_CODE_UDF_PIPE_READ_ERR;
    QUEUE_REMOVE(&task->connTaskQueue);
    QUEUE_REMOVE(&task->procTaskQueue);
    uv_sem_post(&task->taskSem);
  }

  // This function is reachable from both the read and the write callback, so
  // it can run more than once for the same pipe; uv_close() must not be
  // called on a handle that is already closing.
  if (!uv_is_closing((uv_handle_t *) conn->pipe)) {
    uv_close((uv_handle_t *) conn->pipe, onUdfcPipeClose);
  }
}

980
/*
 * libuv read callback for the client pipe.
 *
 * nread == 0 is ignored. A negative nread is logged (with an extra line for
 * UV_EOF) and handed to udfcUvHandleError. A positive nread is added to the
 * connection's received length; once the buffer holds a complete message it
 * is dispatched through udfcUvHandleRsp.
 */
void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
  fnTrace("udfc client %p, client read from pipe. nread: %zd", client, nread);
  if (nread == 0) {
    return;
  }

  SClientUvConn *conn = client->data;

  if (nread < 0) {
    fnError("udfc client pipe %p read error: %zd, %s.", client, nread, uv_strerror(nread));
    if (nread == UV_EOF) {
      fnError("\tudfc client pipe %p closed", client);
    }
    udfcUvHandleError(conn);
    return;
  }

  SClientConnBuf *readBuf = &conn->readBuf;
  readBuf->len += nread;
  if (isUdfcUvMsgComplete(readBuf)) {
    udfcUvHandleRsp(conn);
  }
}

1003
/*
 * libuv write callback for a request sent on the client pipe.
 *
 * The task is queued on the connection so that it can be matched with its
 * response. If the write failed, udfcUvHandleError is then run, which fails
 * every task queued on the connection - including this one - and wakes their
 * waiters; without queuing the task first, the requester whose write failed
 * would never be woken.
 *
 * The write request and the encoded request buffer are freed in both cases.
 */
void onUdfcPipetWrite(uv_write_t *write, int status) {
  SClientUvTaskNode *uvTask = write->data;
  uv_pipe_t *pipe = uvTask->pipe;
  fnTrace("udfc client %p write length:%zu", pipe, uvTask->reqBuf.len);
  SClientUvConn *conn = pipe->data;

  // Detach the request buffer now: once the waiter has been woken (error path
  // below) the requester may free uvTask, so it must not be touched afterwards.
  char *reqBase = uvTask->reqBuf.base;
  uvTask->reqBuf.base = NULL;

  QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
  if (status != 0) {
    fnError("udfc client %p write error.", pipe);
    udfcUvHandleError(conn);
  }

  taosMemoryFree(write);
  taosMemoryFree(reqBase);
}
H
Haojun Liao 已提交
1017

1018
/*
 * libuv connect callback for the client pipe.
 *
 * Records the connect status as the task's error code (logging it when it is
 * non-zero), starts reading on the pipe, releases the connect request and
 * finally takes the task off the processing queue and wakes its waiter.
 */
void onUdfcPipeConnect(uv_connect_t *connect, int status) {
  SClientUvTaskNode *connectTask = connect->data;
  taosMemoryFree(connect);

  connectTask->errCode = status;
  if (status != 0) {
    fnError("client connect error, task seq: %"PRId64", code: %s", connectTask->seqNum, uv_strerror(status));
  }

  uv_read_start((uv_stream_t *)connectTask->pipe, udfcAllocateBuffer, onUdfcPipeRead);

  QUEUE_REMOVE(&connectTask->procTaskQueue);
  uv_sem_post(&connectTask->taskSem);
}
H
Haojun Liao 已提交
1030

S
slzhou 已提交
1031
/*
 * Build the uv task node that carries a udf task through the event loop.
 *
 * - UV_TASK_CONNECT: nothing beyond the common fields is set.
 * - UV_TASK_REQ_RSP: the session's pipe is recorded, a request with a fresh
 *   sequence number is built from the udf task and encoded into a newly
 *   allocated buffer stored in uvTask->reqBuf.
 * - UV_TASK_DISCONNECT: the session's pipe is recorded.
 *
 * The node's semaphore is initialised with count 0. The node is returned
 * through pUvTask; the function always returns 0.
 */
int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) {
  SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode));
  uvTask->type = uvTaskType;
  uvTask->udfc = task->session->udfc;

  if (uvTaskType == UV_TASK_CONNECT) {
  } else if (uvTaskType == UV_TASK_REQ_RSP) {
    uvTask->pipe = task->session->udfUvPipe;

    // Zero-initialised so that an unexpected task->type never causes
    // indeterminate bytes to be encoded into the request.
    SUdfRequest request = {0};
    request.type = task->type;
    request.seqNum = atomic_fetch_add_64(&gUdfTaskSeqNum, 1);

    if (task->type == UDF_TASK_SETUP) {
      request.setup = task->_setup.req;
    } else if (task->type == UDF_TASK_CALL) {
      request.call = task->_call.req;
    } else if (task->type == UDF_TASK_TEARDOWN) {
      request.teardown = task->_teardown.req;
    } else {
      //TODO log and return error
    }

    // First pass with a NULL buffer only computes the encoded length.
    int32_t bufLen = encodeUdfRequest(NULL, &request);
    request.msgLen = bufLen;
    void *bufBegin = taosMemoryMalloc(bufLen);
    void *buf = bufBegin;
    encodeUdfRequest(&buf, &request);
    uvTask->reqBuf = uv_buf_init(bufBegin, bufLen);
    uvTask->seqNum = request.seqNum;
  } else if (uvTaskType == UV_TASK_DISCONNECT) {
    uvTask->pipe = task->session->udfUvPipe;
  }
  uv_sem_init(&uvTask->taskSem, 0);

  *pUvTask = uvTask;
  return 0;
}
H
Haojun Liao 已提交
1070

S
slzhou 已提交
1071
/*
 * Hand a uv task to the event loop and block until it has finished.
 *
 * The task is appended to the proxy's incoming queue under the queue mutex,
 * the loop is notified through the task async handle, and the caller waits
 * on the task's semaphore, which is destroyed once the wait returns.
 *
 * Always returns 0.
 */
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
  fnTrace("queue uv task to event loop, task: %d, %p", uvTask->type, uvTask);

  SUdfcProxy *proxy = uvTask->udfc;
  uv_mutex_t *queueLock = &proxy->taskQueueMutex;

  uv_mutex_lock(queueLock);
  QUEUE_INSERT_TAIL(&proxy->taskQueue, &uvTask->recvTaskQueue);
  uv_mutex_unlock(queueLock);
  uv_async_send(&proxy->loopTaskAync);

  uv_sem_wait(&uvTask->taskSem);
  fnInfo("udfc uv task finished. task: %d, %p", uvTask->type, uvTask);
  uv_sem_destroy(&uvTask->taskSem);

  return 0;
}
H
Haojun Liao 已提交
1085

S
slzhou 已提交
1086
/*
 * Start a uv task on the event loop thread.
 *
 * - UV_TASK_CONNECT: creates the pipe and its connection state and issues an
 *   asynchronous connect to the udfd pipe name.
 * - UV_TASK_REQ_RSP: writes the encoded request to the task's pipe.
 * - UV_TASK_DISCONNECT: queues the task on the connection and closes the pipe.
 *
 * Returns 0 when the operation was started, TSDB_CODE_UDF_PIPE_NO_PIPE when a
 * request/disconnect task has no pipe, or the libuv error returned by
 * uv_write. When a request could not be started, the write request and the
 * encoded request buffer are released here, because the write callback that
 * normally frees them will not run.
 */
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
  fnTrace("event loop start uv task. task: %d, %p", uvTask->type, uvTask);
  int32_t code = 0;

  switch (uvTask->type) {
    case UV_TASK_CONNECT: {
      uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
      uv_pipe_init(&uvTask->udfc->uvLoop, pipe, 0);
      uvTask->pipe = pipe;

      SClientUvConn *conn = taosMemoryCalloc(1, sizeof(SClientUvConn));
      conn->pipe = pipe;
      conn->readBuf.len = 0;
      conn->readBuf.cap = 0;
      conn->readBuf.buf = 0;
      conn->readBuf.total = -1;
      QUEUE_INIT(&conn->taskQueue);

      pipe->data = conn;

      uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t));
      connReq->data = uvTask;
      uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfcPipeConnect);
      code = 0;
      break;
    }
    case UV_TASK_REQ_RSP: {
      uv_pipe_t *pipe = uvTask->pipe;
      if (pipe == NULL) {
        code = TSDB_CODE_UDF_PIPE_NO_PIPE;
      } else {
        uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t));
        write->data = uvTask;
        int err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipetWrite);
        if (err != 0) {
          fnError("udfc event loop start req/rsp task uv_write failed. code: %s", uv_strerror(err));
          // No callback will be invoked for a write that failed to start.
          taosMemoryFree(write);
        }
        code = err;
      }
      if (code != 0) {
        // The request never reached libuv, so nobody else will free its buffer.
        taosMemoryFree(uvTask->reqBuf.base);
        uvTask->reqBuf = uv_buf_init(NULL, 0);
      }
      break;
    }
    case UV_TASK_DISCONNECT: {
      uv_pipe_t *pipe = uvTask->pipe;
      if (pipe == NULL) {
        code = TSDB_CODE_UDF_PIPE_NO_PIPE;
      } else {
        SClientUvConn *conn = pipe->data;
        // The close callback completes the first task queued on the connection.
        QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
        uv_close((uv_handle_t *)uvTask->pipe, onUdfcPipeClose);
        code = 0;
      }
      break;
    }
    default: {
      fnError("udfc event loop unknown task type.");
      break;
    }
  }

  return code;
}
H
Haojun Liao 已提交
1147

1148
/*
 * Async callback that drains the proxy's incoming task queue.
 *
 * The queue is moved to a local list under the queue mutex, then each task is
 * started. A task that started successfully is appended to the processing
 * queue; one that failed to start gets the error code and its waiter is
 * woken immediately.
 */
void udfcAsyncTaskCb(uv_async_t *async) {
  SUdfcProxy *proxy = async->data;
  QUEUE pending;

  uv_mutex_lock(&proxy->taskQueueMutex);
  QUEUE_MOVE(&proxy->taskQueue, &pending);
  uv_mutex_unlock(&proxy->taskQueueMutex);

  while (!QUEUE_EMPTY(&pending)) {
    QUEUE* node = QUEUE_HEAD(&pending);
    QUEUE_REMOVE(node);
    SClientUvTaskNode *task = QUEUE_DATA(node, SClientUvTaskNode, recvTaskQueue);

    int32_t startCode = udfcStartUvTask(task);
    if (startCode != 0) {
      task->errCode = startCode;
      uv_sem_post(&task->taskSem);
      continue;
    }
    QUEUE_INSERT_TAIL(&proxy->uvProcTaskQueue, &task->procTaskQueue);
  }
}

1171
/*
 * Release every waiter of tasks that are still queued or in progress.
 *
 * Tasks not yet started (incoming queue, taken under the queue mutex) and
 * tasks already handed to libuv (processing queue) are removed from their
 * queue and their semaphore is posted. While the proxy is in the STOPPING
 * state each of them is also marked with TSDB_CODE_UDF_STOPPING.
 */
void cleanUpUvTasks(SUdfcProxy *udfc) {
  fnDebug("clean up uv tasks");
  QUEUE notStarted;

  uv_mutex_lock(&udfc->taskQueueMutex);
  QUEUE_MOVE(&udfc->taskQueue, &notStarted);
  uv_mutex_unlock(&udfc->taskQueueMutex);

  for (; !QUEUE_EMPTY(&notStarted); ) {
    QUEUE* node = QUEUE_HEAD(&notStarted);
    QUEUE_REMOVE(node);
    SClientUvTaskNode *waiting = QUEUE_DATA(node, SClientUvTaskNode, recvTaskQueue);
    if (udfc->udfcState == UDFC_STATE_STOPPING) {
      waiting->errCode = TSDB_CODE_UDF_STOPPING;
    }
    uv_sem_post(&waiting->taskSem);
  }

  for (; !QUEUE_EMPTY(&udfc->uvProcTaskQueue); ) {
    QUEUE* node = QUEUE_HEAD(&udfc->uvProcTaskQueue);
    QUEUE_REMOVE(node);
    SClientUvTaskNode *inFlight = QUEUE_DATA(node, SClientUvTaskNode, procTaskQueue);
    if (udfc->udfcState == UDFC_STATE_STOPPING) {
      inFlight->errCode = TSDB_CODE_UDF_STOPPING;
    }
    uv_sem_post(&inFlight->taskSem);
  }
}
1199

S
shenglian zhou 已提交
1200
/*
 * Async callback for the stop handle: releases all outstanding tasks and,
 * when the proxy is in the STOPPING state, stops the event loop.
 */
void udfStopAsyncCb(uv_async_t *async) {
  SUdfcProxy *proxy = async->data;

  cleanUpUvTasks(proxy);
  if (proxy->udfcState != UDFC_STATE_STOPPING) {
    return;
  }
  uv_stop(&proxy->uvLoop);
}
S
shenglian zhou 已提交
1207

S
shenglian zhou 已提交
1208
void constructUdfService(void *argsThread) {
1209 1210 1211
  SUdfcProxy *udfc = (SUdfcProxy *)argsThread;
  uv_loop_init(&udfc->uvLoop);

1212
  uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfcAsyncTaskCb);
1213 1214 1215 1216 1217 1218 1219
  udfc->loopTaskAync.data = udfc;
  uv_async_init(&udfc->uvLoop, &udfc->loopStopAsync, udfStopAsyncCb);
  udfc->loopStopAsync.data = udfc;
  uv_mutex_init(&udfc->taskQueueMutex);
  QUEUE_INIT(&udfc->taskQueue);
  QUEUE_INIT(&udfc->uvProcTaskQueue);
  uv_barrier_wait(&udfc->initBarrier);
1220
  //TODO return value of uv_run
1221 1222
  uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
  uv_loop_close(&udfc->uvLoop);
1223 1224
}

1225 1226 1227 1228 1229
int32_t udfcOpen() {
  int8_t old = atomic_val_compare_exchange_8(&gUdfdProxy.initialized, 0, 1);
  if (old == 1) {
    return 0;
  }
1230
  SUdfcProxy *proxy = &gUdfdProxy;
1231
  getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName));
1232 1233 1234 1235 1236 1237
  proxy->udfcState = UDFC_STATE_STARTNG;
  uv_barrier_init(&proxy->initBarrier, 2);
  uv_thread_create(&proxy->loopThread, constructUdfService, proxy);
  atomic_store_8(&proxy->udfcState, UDFC_STATE_READY);
  proxy->udfcState = UDFC_STATE_READY;
  uv_barrier_wait(&proxy->initBarrier);
1238 1239
  uv_mutex_init(&proxy->udfStubsMutex);
  proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
1240
  fnInfo("udfc initialized")
1241 1242 1243
  return 0;
}

1244 1245 1246 1247 1248 1249
int32_t udfcClose() {
  int8_t old = atomic_val_compare_exchange_8(&gUdfdProxy.initialized, 1, 0);
  if (old == 0) {
    return 0;
  }

1250 1251 1252 1253 1254 1255
  SUdfcProxy *udfc = &gUdfdProxy;
  udfc->udfcState = UDFC_STATE_STOPPING;
  uv_async_send(&udfc->loopStopAsync);
  uv_thread_join(&udfc->loopThread);
  uv_mutex_destroy(&udfc->taskQueueMutex);
  uv_barrier_destroy(&udfc->initBarrier);
1256 1257
  taosArrayDestroy(udfc->udfStubs);
  uv_mutex_destroy(&udfc->udfStubsMutex);
1258
  udfc->udfcState = UDFC_STATE_INITAL;
1259
  fnInfo("udfc cleaned up");
1260 1261 1262
  return 0;
}

S
slzhou 已提交
1263
/*
 * Run one uv task for a udf task synchronously.
 *
 * Creates the uv task node, queues it on the event loop and waits for it,
 * then copies its result into the udf task. For a connect task the pipe is
 * stored in the session and the session is attached to the pipe's
 * connection. The node is freed before returning.
 *
 * Returns task->errCode.
 */
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
  SClientUvTaskNode *node = NULL;

  udfcCreateUvTask(task, uvTaskType, &node);
  udfcQueueUvTask(node);
  udfcGetUdfTaskResultFromUvTask(task, node);

  if (uvTaskType == UV_TASK_CONNECT) {
    uv_pipe_t *pipe = node->pipe;
    SClientUvConn *conn = pipe->data;
    task->session->udfUvPipe = pipe;
    conn->session = task->session;
  }

  taosMemoryFree(node);
  return task->errCode;
}

1279
/*
 * Connect to udfd and set up the named udf, producing a function handle.
 *
 * Returns TSDB_CODE_UDF_INVALID_STATE when the proxy is not READY and
 * TSDB_CODE_UDF_PIPE_CONNECT_ERR when the pipe connection fails; in the
 * latter case the task and the session created here are released.
 * Otherwise the setup request is sent, the response fields are copied into
 * the session, and the setup error code is returned. *funcHandle is only
 * written when that code is 0.
 */
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
  if (gUdfdProxy.udfcState != UDFC_STATE_READY) {
    return TSDB_CODE_UDF_INVALID_STATE;
  }
  SClientUdfTask *task = taosMemoryCalloc(1,sizeof(SClientUdfTask));
  task->errCode = 0;
  task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession));
  task->session->udfc = &gUdfdProxy;
  task->type = UDF_TASK_SETUP;

  SUdfSetupRequest *req = &task->_setup.req;
  strncpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN);

  int32_t errCode = udfcRunUdfUvTask(task, UV_TASK_CONNECT);
  if (errCode != 0) {
    fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfdProxy)->udfdPipeName);
    // Nothing was handed to the caller, so the session and the task would
    // otherwise be lost on this path.
    taosMemoryFree(task->session);
    taosMemoryFree(task);
    return TSDB_CODE_UDF_PIPE_CONNECT_ERR;
  }

  udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);

  SUdfSetupResponse *rsp = &task->_setup.rsp;
  task->session->severHandle = rsp->udfHandle;
  task->session->outputType = rsp->outputType;
  task->session->outputLen = rsp->outputLen;
  task->session->bufSize = rsp->bufSize;
  strcpy(task->session->udfName, udfName);
  if (task->errCode != 0) {
    // NOTE(review): the session and its open pipe are not released on this
    // path - confirm whether a teardown/disconnect is expected here.
    fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode);
  } else {
    fnInfo("sucessfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session);
    *funcHandle = task->session;
  }
  int32_t err = task->errCode;
  taosMemoryFree(task);
  return err;
}

S
slzhou 已提交
1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366
/*
 * Comparator for SUdfcFuncStub elements: orders stubs by udfName using strcmp.
 */
int compareUdfcFuncSub(const void* elem1, const void* elem2) {
  const SUdfcFuncStub *lhs = elem1;
  const SUdfcFuncStub *rhs = elem2;
  return strcmp(lhs->udfName, rhs->udfName);
}

/*
 * Get a function handle for the named udf, reusing a cached one if possible.
 *
 * Under the stub mutex: if a stub with this name exists and its session still
 * has a pipe, its handle is returned with the reference count incremented
 * and the last-reference time refreshed. A stub whose handle is unusable is
 * removed from the cache. Otherwise the udf is set up through doSetupUdf and,
 * on success, a new stub with reference count 1 is added and the cache is
 * re-sorted by name.
 *
 * Returns 0 on success or the doSetupUdf error; *pHandle is NULL on failure.
 */
int32_t acquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
  uv_mutex_lock(&gUdfdProxy.udfStubsMutex);

  SUdfcFuncStub key = {0};
  strcpy(key.udfName, udfName);
  int32_t stubIndex = taosArraySearchIdx(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);

  if (stubIndex != -1) {
    SUdfcFuncStub *cached = taosArrayGet(gUdfdProxy.udfStubs, stubIndex);
    SUdfcUvSession *cachedSession = cached->handle;
    bool usable = (cachedSession != NULL) && (cachedSession->udfUvPipe != NULL);

    if (usable) {
      *pHandle = cached->handle;
      ++cached->refCount;
      cached->lastRefTime = taosGetTimestampUs();
      uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
      return 0;
    }

    fnInfo("invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache",
           udfName, cached->refCount, cached->lastRefTime);
    taosArrayRemove(gUdfdProxy.udfStubs, stubIndex);
  }

  *pHandle = NULL;
  int32_t code = doSetupUdf(udfName, pHandle);
  if (code != TSDB_CODE_SUCCESS) {
    *pHandle = NULL;
  } else {
    SUdfcFuncStub fresh = {0};
    strcpy(fresh.udfName, udfName);
    fresh.handle = *pHandle;
    ++fresh.refCount;
    fresh.lastRefTime = taosGetTimestampUs();
    taosArrayPush(gUdfdProxy.udfStubs, &fresh);
    taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub);
  }

  uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
  return code;
}

void releaseUdfFuncHandle(char* udfName) {
  uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
  SUdfcFuncStub key = {0};
  strcpy(key.udfName, udfName);
  SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
1367 1368 1369 1370 1371 1372
  if (!foundStub) {
    return;
  }
  if (foundStub->refCount > 0) {
    --foundStub->refCount;
  }
S
slzhou 已提交
1373 1374 1375 1376 1377 1378 1379 1380 1381 1382
  uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
}

int32_t cleanUpUdfs() {
  uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
  int32_t i = 0;
  SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
  while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) {
    SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i);
    if (stub->refCount == 0) {
1383
      fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
S
slzhou 已提交
1384 1385 1386 1387 1388 1389 1390 1391
      doTeardownUdf(stub->handle);
    } else {
      fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p",
             stub->udfName, stub->refCount, stub->lastRefTime, stub->handle);
      UdfcFuncHandle handle = stub->handle;
      if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) {
        taosArrayPush(udfStubs, stub);
      } else {
1392
        fnInfo("udf invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache",
S
slzhou 已提交
1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403
               stub->udfName, stub->refCount, stub->lastRefTime);
      }
    }
    ++i;
  }
  taosArrayDestroy(gUdfdProxy.udfStubs);
  gUdfdProxy.udfStubs = udfStubs;
  uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
  return 0;
}

1404
/*
 * Send one call request to udfd and collect its result.
 *
 * Which inputs are read and which output is written depends on callType:
 *   AGG_INIT   : no input                    -> *newState
 *   AGG_PROC   : *input, *state              -> *newState
 *   AGG_MERGE  : *state, *state2             -> *newState
 *   AGG_FIN    : *state                      -> *newState
 *   SCALA_PROC : *input                      -> *output
 * Outputs are only written when the call succeeded.
 *
 * Returns TSDB_CODE_UDF_PIPE_NO_PIPE when the session has no pipe, otherwise
 * the error code of the call task (0 on success).
 */
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
                SSDataBlock* output, SUdfInterBuf *newState) {
  fnTrace("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
  SUdfcUvSession *session = (SUdfcUvSession *) handle;
  if (session->udfUvPipe == NULL) {
    fnError("No pipe to udfd");
    return TSDB_CODE_UDF_PIPE_NO_PIPE;
  }

  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
  task->errCode = 0;
  task->session = (SUdfcUvSession *) handle;
  task->type = UDF_TASK_CALL;

  SUdfCallRequest *req = &task->_call.req;
  req->udfHandle = task->session->severHandle;
  req->callType = callType;

  // Fill in the request fields that this call type consumes.
  if (callType == TSDB_UDF_CALL_AGG_INIT) {
    req->initFirst = 1;
  } else if (callType == TSDB_UDF_CALL_AGG_PROC) {
    req->block = *input;
    req->interBuf = *state;
  } else if (callType == TSDB_UDF_CALL_AGG_MERGE) {
    req->interBuf = *state;
    req->interBuf2 = *state2;
  } else if (callType == TSDB_UDF_CALL_AGG_FIN) {
    req->interBuf = *state;
  } else if (callType == TSDB_UDF_CALL_SCALA_PROC) {
    req->block = *input;
  }

  udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);

  if (task->errCode != 0) {
    fnError("call udf failure. err: %d", task->errCode);
  } else {
    SUdfCallResponse *rsp = &task->_call.rsp;
    switch (callType) {
      case TSDB_UDF_CALL_AGG_INIT:
      case TSDB_UDF_CALL_AGG_PROC:
      case TSDB_UDF_CALL_AGG_MERGE:
      case TSDB_UDF_CALL_AGG_FIN:
        // All aggregate steps hand back an intermediate/result buffer.
        *newState = rsp->resultBuf;
        break;
      case TSDB_UDF_CALL_SCALA_PROC:
        *output = rsp->resultData;
        break;
      default:
        break;
    }
  }

  int resultCode = task->errCode;
  taosMemoryFree(task);
  return resultCode;
}

1480
// Aggregate init step.
// input: none
// output: interBuf
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) {
  return callUdf(handle, TSDB_UDF_CALL_AGG_INIT, NULL, NULL, NULL, NULL, interBuf);
}

// input: block, state
// output: interbuf,
1490
int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) {
S
slzhou 已提交
1491 1492 1493 1494 1495 1496 1497
  int8_t callType = TSDB_UDF_CALL_AGG_PROC;
  int32_t err = callUdf(handle, callType, block, state, NULL, NULL, newState);
  return err;
}

// input: interbuf1, interbuf2
// output: resultBuf
1498
int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf) {
S
slzhou 已提交
1499 1500 1501 1502 1503 1504 1505
  int8_t callType = TSDB_UDF_CALL_AGG_MERGE;
  int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf);
  return err;
}

// input: interBuf
// output: resultData
1506
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) {
S
slzhou 已提交
1507
  int8_t callType = TSDB_UDF_CALL_AGG_FIN;
S
slzhou 已提交
1508 1509 1510 1511
  int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData);
  return err;
}

1512
/*
 * Invoke a scalar udf on the given parameters.
 *
 * The parameters are wrapped into a temporary input block, the udf is called,
 * and on success the single-column result block is converted into *output.
 * The column arrays of both temporary blocks are destroyed before returning
 * (the result array only when the call succeeded).
 *
 * Returns the callUdf error code.
 */
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam* output) {
  SSDataBlock argBlock = {0};
  SSDataBlock retBlock = {0};

  convertScalarParamToDataBlock(input, numOfCols, &argBlock);

  int32_t callCode = callUdf(handle, TSDB_UDF_CALL_SCALA_PROC, &argBlock, NULL, NULL, &retBlock, NULL);
  if (callCode == 0) {
    convertDataBlockToScalarParm(&retBlock, output);
    taosArrayDestroy(retBlock.pDataBlock);
  }

  taosArrayDestroy(argBlock.pDataBlock);
  return callCode;
}

S
slzhou 已提交
1527

1528 1529
/*
 * Call the named scalar udf and validate its result.
 *
 * Acquires a (possibly cached) function handle, runs the scalar call and
 * releases the handle again. If the call itself fails, its error code is
 * returned unchanged and output is not inspected. If the call succeeds but
 * produced no column, or a column whose type/length differs from what the
 * udf declared at setup, TSDB_CODE_UDF_INVALID_OUTPUT_TYPE is returned.
 */
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
  UdfcFuncHandle handle = NULL;
  int32_t code = acquireUdfFuncHandle(udfName, &handle);
  if (code != 0) {
    return code;
  }
  SUdfcUvSession *session = handle;
  code = doCallUdfScalarFunc(handle, input, numOfCols, output);
  if (code != 0) {
    // Keep the real failure reason; output was not filled in by this call.
    fnError("udfc scalar function call failure. udf name: %s, code: %d", udfName, code);
  } else if (output->columnData == NULL) {
    fnError("udfc scalar function calculate error. no column data");
    code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
  } else if (session->outputType != output->columnData->info.type ||
             session->outputLen != output->columnData->info.bytes) {
    fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)", session->outputType,
            session->outputLen, output->columnData->info.type, output->columnData->info.bytes);
    code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
  }
  releaseUdfFuncHandle(udfName);
  return code;
}

/*
 * Tear down a udf on the udfd side and release its session.
 *
 * Returns TSDB_CODE_UDF_PIPE_NO_PIPE (leaving the session untouched) when
 * the session has no pipe. Otherwise a teardown request is sent, the pipe is
 * disconnected, and both the session and the task are freed.
 *
 * The return value is the error code of the teardown request; the result of
 * the disconnect step is not reported.
 */
int32_t doTeardownUdf(UdfcFuncHandle handle) {
  SUdfcUvSession *session = (SUdfcUvSession *) handle;

  if (session->udfUvPipe == NULL) {
    fnError("tear down udf. pipe to udfd does not exist. udf name: %s", session->udfName);
    return TSDB_CODE_UDF_PIPE_NO_PIPE;
  }

  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
  task->errCode = 0;
  task->session = session;
  task->type = UDF_TASK_TEARDOWN;
  task->_teardown.req.udfHandle = task->session->severHandle;

  // Step 1: ask udfd to tear the udf down; this is the code we report.
  udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
  int32_t teardownCode = task->errCode;

  // Step 2: close the pipe.
  udfcRunUdfUvTask(task, UV_TASK_DISCONNECT);

  fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle);

  taosMemoryFree(session);
  taosMemoryFree(task);

  return teardownCode;
}
S
shenglian zhou 已提交
1579

S
shenglian zhou 已提交
1580 1581 1582 1583 1584 1585 1586 1587
// Per-result-cell state of an aggregate udf.
// The struct is the header of one contiguous region laid out as:
//memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
typedef struct SUdfAggRes {
  int8_t finalResNum;   // presumably the number of results held in finalResBuf - TODO confirm against the finalize path
  int8_t interResNum;   // number of results reported for the intermediate buffer (numOfResult of the udf's inter buf)
  char* finalResBuf;    // points just past this struct, at the final-result area
  char* interResBuf;    // points past the final-result area, at the intermediate-result area
} SUdfAggRes;

S
shenglian zhou 已提交
1588
/*
 * Report the memory an aggregate udf needs per result cell.
 *
 * Returns false for scalar functions. Otherwise sets pEnv->calcMemSize to the
 * size of the SUdfAggRes header plus the function's result size plus its
 * intermediate buffer size, and returns true.
 */
bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
  if (!fmIsScalarFunc(pFunc->funcId)) {
    pEnv->calcMemSize = sizeof(SUdfAggRes) + pFunc->node.resType.bytes + pFunc->udfBufSize;
    return true;
  }
  return false;
}

bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo) {
  if (functionSetup(pCtx, pResultCellInfo) != true) {
    return false;
  }
  UdfcFuncHandle handle;
1601
  int32_t udfCode = 0;
S
slzhou 已提交
1602
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
1603
    fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode);
S
shenglian zhou 已提交
1604 1605
    return false;
  }
S
slzhou 已提交
1606
  SUdfcUvSession *session = (SUdfcUvSession *)handle;
S
shenglian zhou 已提交
1607 1608
  SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo);
  int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize;
S
shenglian zhou 已提交
1609
  memset(udfRes, 0, envSize);
S
shenglian zhou 已提交
1610

S
slzhou 已提交
1611 1612 1613
  udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
  udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;

S
shenglian zhou 已提交
1614
  SUdfInterBuf buf = {0};
1615 1616
  if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
    fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode);
1617
    releaseUdfFuncHandle(pCtx->udfName);
S
shenglian zhou 已提交
1618 1619
    return false;
  }
S
shenglian zhou 已提交
1620
  udfRes->interResNum = buf.numOfResult;
1621 1622 1623 1624
  if (buf.bufLen <= session->bufSize) {
    memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
  } else {
    fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize);
1625
    releaseUdfFuncHandle(pCtx->udfName);
1626 1627
    return false;
  }
1628
  releaseUdfFuncHandle(pCtx->udfName);
S
slzhou 已提交
1629
  freeUdfInterBuf(&buf);
S
shenglian zhou 已提交
1630 1631 1632 1633
  return true;
}

int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
1634 1635 1636 1637 1638 1639
  int32_t udfCode = 0;
  UdfcFuncHandle handle = 0;
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
    fnError("udfAggProcess  error. step acquireUdfFuncHandle. udf code: %d", udfCode);
    return udfCode;
  }
S
shenglian zhou 已提交
1640

1641
  SUdfcUvSession *session = handle;
S
slzhou 已提交
1642 1643 1644
  SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
  udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
  udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
S
shenglian zhou 已提交
1645

1646 1647
  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t numOfCols = pInput->numOfInputCols;
S
shenglian zhou 已提交
1648 1649 1650 1651
  int32_t start = pInput->startRowIndex;
  int32_t numOfRows = pInput->numOfRows;


1652 1653
  SSDataBlock tempBlock = {0};
  tempBlock.info.numOfCols = numOfCols;
1654
  tempBlock.info.rows = pInput->totalRows;
1655
  tempBlock.info.uid = pInput->uid;
S
shenglian zhou 已提交
1656
  bool hasVarCol = false;
1657
  tempBlock.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
S
shenglian zhou 已提交
1658 1659 1660 1661 1662 1663

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData *col = pInput->pData[i];
    if (IS_VAR_DATA_TYPE(col->info.type)) {
      hasVarCol = true;
    }
1664
    taosArrayPush(tempBlock.pDataBlock, col);
S
shenglian zhou 已提交
1665
  }
1666
  tempBlock.info.hasVarCol = hasVarCol;
S
shenglian zhou 已提交
1667

1668 1669
  SSDataBlock *inputBlock = blockDataExtractBlock(&tempBlock, start, numOfRows);

S
slzhou 已提交
1670
  SUdfInterBuf state = {.buf = udfRes->interResBuf,
1671
                        .bufLen = session->bufSize,
S
slzhou 已提交
1672
                        .numOfResult = udfRes->interResNum};
S
shenglian zhou 已提交
1673
  SUdfInterBuf newState = {0};
1674

1675
  udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState);
1676 1677 1678 1679 1680
  if (udfCode != 0) {
    fnError("udfAggProcess error. code: %d", udfCode);
    newState.numOfResult = 0;
  } else {
    udfRes->interResNum = newState.numOfResult;
1681 1682 1683 1684 1685 1686
    if (newState.bufLen <= session->bufSize) {
      memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
    } else {
      fnError("udfc inter buf size %d is greater than function bufSize %d", newState.bufLen, session->bufSize);
      udfCode = TSDB_CODE_UDF_INVALID_BUFSIZE;
    }
1687
  }
S
slzhou 已提交
1688 1689 1690
  if (newState.numOfResult == 1 || state.numOfResult == 1) {
    GET_RES_INFO(pCtx)->numOfRes = 1;
  }
S
shenglian zhou 已提交
1691

1692 1693
  blockDataDestroy(inputBlock);
  taosArrayDestroy(tempBlock.pDataBlock);
S
shenglian zhou 已提交
1694

1695
  releaseUdfFuncHandle(pCtx->udfName);
S
slzhou 已提交
1696
  freeUdfInterBuf(&newState);
1697
  return udfCode;
S
shenglian zhou 已提交
1698 1699 1700
}

/**
 * Produce the final result of an aggregate UDF for one result row.
 *
 * Calls the UDF's aggregate-finalize entry point with the stored intermediate
 * state, copies the returned result into the row's final-result area and then
 * hands that area to functionFinalizeWithResultBuf to emit it into pBlock.
 *
 * @param pCtx    function context carrying the UDF name and the result row
 * @param pBlock  output block passed through to functionFinalizeWithResultBuf
 * @return the value of functionFinalizeWithResultBuf when the UDF call
 *         succeeded; otherwise the code from acquireUdfFuncHandle or
 *         doCallUdfAggFinalize, or TSDB_CODE_UDF_INVALID_OUTPUT_TYPE when the
 *         returned result is larger than the session's outputLen.
 */
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
  int32_t udfCode = 0;
  UdfcFuncHandle handle = 0;
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
    fnError("udfAggFinalize error. step acquireUdfFuncHandle. udf code: %d", udfCode);
    return udfCode;
  }

  SUdfcUvSession *session = handle;
  SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
  // Re-derive the payload pointers from the buffer base address.
  udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
  udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;

  SUdfInterBuf resultBuf = {0};
  SUdfInterBuf state = {.buf = udfRes->interResBuf,
                        .bufLen = session->bufSize,
                        .numOfResult = udfRes->interResNum};
  int32_t udfCallCode = doCallUdfAggFinalize(session, &state, &resultBuf);
  if (udfCallCode != 0) {
    fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode);
    GET_RES_INFO(pCtx)->numOfRes = 0;
  } else if (resultBuf.bufLen <= session->outputLen) {
    // Copy only the bytes the UDF returned: bufLen may be smaller than
    // outputLen, and copying outputLen bytes would read past resultBuf.buf.
    memcpy(udfRes->finalResBuf, resultBuf.buf, resultBuf.bufLen);
    udfRes->finalResNum = resultBuf.numOfResult;
    GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
  } else {
    fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->outputLen);
    GET_RES_INFO(pCtx)->numOfRes = 0;
    udfCallCode = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
  }

  freeUdfInterBuf(&resultBuf);

  int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
  releaseUdfFuncHandle(pCtx->udfName);
  return udfCallCode == 0 ? numOfResults : udfCallCode;
}