tudf.c 52.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#include "uv.h"
#include "os.h"
S
slzhou 已提交
17
#include "fnLog.h"
18
#include "tudf.h"
19
#include "tudfInt.h"
S
shenglian zhou 已提交
20
#include "tarray.h"
S
slzhou 已提交
21
#include "tglobal.h"
S
slzhou 已提交
22
#include "tdatablock.h"
S
shenglian zhou 已提交
23 24
#include "querynodes.h"
#include "builtinsimpl.h"
S
slzhou 已提交
25
#include "functionMgt.h"
26

27
//TODO: add unit test
S
shenglian zhou 已提交
28
//TODO: move all global variables into a context struct
S
slzhou 已提交
29

30 31 32 33 34 35 36
typedef struct SUdfdData {
  bool          startCalled;
  bool          needCleanUp;
  uv_loop_t     loop;
  uv_thread_t   thread;
  uv_barrier_t  barrier;
  uv_process_t  process;
37 38 39
#ifdef WINDOWS
  HANDLE        jobHandle;
#endif
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
  int           spawnErr;
  uv_pipe_t     ctrlPipe;
  uv_async_t    stopAsync;
  int32_t        stopCalled;

  int32_t         dnodeId;
} SUdfdData;

// Process-wide udfd supervisor state used by udfStartUdfd() and udfStopUdfd().
SUdfdData udfdGlobal = {0};

// Forward declaration: udfUdfdExit() calls udfSpawnUdfd() before its definition.
static int32_t udfSpawnUdfd(SUdfdData *pData);

/*
 * Exit callback registered for the udfd child process (options.exit_cb).
 * A clean exit (status 0, no signal) or an exit after udfStopUdfd() has set
 * stopCalled is only logged; any other exit respawns udfd.
 */
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
  fnInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
  SUdfdData *udfd = process->data;

  bool exitedCleanly = (exitStatus == 0) && (termSignal == 0);
  if (!exitedCleanly && !atomic_load_32(&udfd->stopCalled)) {
    fnInfo("udfd process restart");
    udfSpawnUdfd(udfd);
    return;
  }
  fnInfo("udfd process exit due to SIGINT or dnode-mgmt called stop");
}

/*
 * Spawn the udfd executable as a detached child on pData->loop.
 *
 * The executable is looked up next to the running program (directory of
 * tsProcPath, or "." when tsProcPath is NULL). The child receives
 * "-c <configDir>", an environment consisting only of DNODE_ID and
 * UV_THREADPOOL_SIZE, pData->ctrlPipe as stdio slot 0, nothing on slot 1 and
 * the parent's fd 2 on slot 2.
 *
 * Returns the uv_spawn() result: 0 on success, a libuv error code otherwise.
 */
static int32_t udfSpawnUdfd(SUdfdData* pData) {
  fnInfo("dnode start spawning udfd");
  uv_process_options_t options = {0};

  char path[PATH_MAX] = {0};
  if (tsProcPath == NULL) {
    path[0] = '.';
  } else {
    // Bounded copy: path is zero-filled, so copying at most sizeof(path) - 1
    // bytes always leaves it NUL-terminated.
    strncpy(path, tsProcPath, sizeof(path) - 1);
    taosDirName(path);
  }
#ifdef WINDOWS
  const char* udfdName = "udfd.exe";
#else
  const char* udfdName = "/udfd";
#endif
  // Append without writing past the end of path; an over-long path is cut
  // short and uv_spawn() then reports the failure.
  strncat(path, udfdName, sizeof(path) - strlen(path) - 1);

  char* argsUdfd[] = {path, "-c", configDir, NULL};
  options.args = argsUdfd;
  options.file = path;

  options.exit_cb = udfUdfdExit;

  uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1);

  uv_stdio_container_t child_stdio[3];
  child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
  child_stdio[0].data.stream = (uv_stream_t*) &pData->ctrlPipe;
  child_stdio[1].flags = UV_IGNORE;
  child_stdio[2].flags = UV_INHERIT_FD;
  child_stdio[2].data.fd = 2;
  options.stdio_count = 3;
  options.stdio = child_stdio;

  options.flags = UV_PROCESS_DETACHED;

  char dnodeIdEnvItem[32] = {0};
  char thrdPoolSizeEnvItem[32] = {0};
  snprintf(dnodeIdEnvItem, sizeof(dnodeIdEnvItem), "%s=%d", "DNODE_ID", pData->dnodeId);
  float numCpuCores = 4;  // value used if taosGetCpuCores() leaves it untouched
  taosGetCpuCores(&numCpuCores);
  snprintf(thrdPoolSizeEnvItem, sizeof(thrdPoolSizeEnvItem), "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2);
  char* envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, NULL};
  options.env = envUdfd;

  int err = uv_spawn(&pData->loop, &pData->process, &options);
  pData->process.data = (void*)pData;  // read back by udfUdfdExit()

#ifdef WINDOWS
  // End udfd.exe by Job. Only meaningful when a process was actually created.
  if (err == 0) {
    if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle);
    pData->jobHandle = CreateJobObject(NULL, NULL);
    if (pData->jobHandle == NULL) {
      fnError("Create job for udfd failed.");
    } else {
      bool add_job_ok = AssignProcessToJobObject(pData->jobHandle, pData->process.process_handle);
      if (!add_job_ok) {
        fnError("Assign udfd to job failed.");
      } else {
        JOBOBJECT_EXTENDED_LIMIT_INFORMATION limit_info;
        memset(&limit_info, 0x0, sizeof(limit_info));
        limit_info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
        bool set_auto_kill_ok = SetInformationJobObject(pData->jobHandle, JobObjectExtendedLimitInformation, &limit_info, sizeof(limit_info));
        if (!set_auto_kill_ok) {
          fnError("Set job auto kill udfd failed.");
        }
      }
    }
  }
#endif

  if (err != 0) {
    fnError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
  }
  return err;
}

// uv_walk() callback used by udfWatchUdfd(): closes every handle that is not
// already closing. The walk argument is unused.
static void udfUdfdCloseWalkCb(uv_handle_t* handle, void* arg) {
  if (uv_is_closing(handle)) {
    return;
  }
  uv_close(handle, NULL);
}

// Callback of SUdfdData.stopAsync: stops the watcher loop so that the first
// uv_run() in udfWatchUdfd() returns.
static void udfUdfdStopAsyncCb(uv_async_t *async) {
  SUdfdData *udfd = async->data;
  uv_loop_t *watchLoop = &udfd->loop;
  uv_stop(watchLoop);
}

static void udfWatchUdfd(void *args) {
  SUdfdData *pData = args;
  uv_loop_init(&pData->loop);
  uv_async_init(&pData->loop, &pData->stopAsync, udfUdfdStopAsyncCb);
  pData->stopAsync.data = pData;
  int32_t err = udfSpawnUdfd(pData);
  atomic_store_32(&pData->spawnErr, err);
  uv_barrier_wait(&pData->barrier);
  uv_run(&pData->loop, UV_RUN_DEFAULT);
  uv_loop_close(&pData->loop);

  uv_walk(&pData->loop, udfUdfdCloseWalkCb, NULL);
  uv_run(&pData->loop, UV_RUN_DEFAULT);
  uv_loop_close(&pData->loop);
  return;
}

int32_t udfStartUdfd(int32_t startDnodeId) {
S
slzhou 已提交
163 164
  if (!tsStartUdfd) {
    fnInfo("start udfd is disabled.")
S
slzhou 已提交
165
    return 0;
S
slzhou 已提交
166
  }
167 168
  SUdfdData *pData = &udfdGlobal;
  if (pData->startCalled) {
S
Shengliang Guan 已提交
169
    fnInfo("dnode start udfd already called");
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
    return 0;
  }
  pData->startCalled = true;
  char dnodeId[8] = {0};
  snprintf(dnodeId, sizeof(dnodeId), "%d", startDnodeId);
  uv_os_setenv("DNODE_ID", dnodeId);
  pData->dnodeId = startDnodeId;

  uv_barrier_init(&pData->barrier, 2);
  uv_thread_create(&pData->thread, udfWatchUdfd, pData);
  uv_barrier_wait(&pData->barrier);
  int32_t err = atomic_load_32(&pData->spawnErr);
  if (err != 0) {
    uv_barrier_destroy(&pData->barrier);
    uv_async_send(&pData->stopAsync);
    uv_thread_join(&pData->thread);
    pData->needCleanUp = false;
S
Shengliang Guan 已提交
187
    fnInfo("dnode udfd cleaned up after spawn err");
188 189 190 191 192 193 194 195
  } else {
    pData->needCleanUp = true;
  }
  return err;
}

int32_t udfStopUdfd() {
  SUdfdData *pData = &udfdGlobal;
S
Shengliang Guan 已提交
196
  fnInfo("dnode to stop udfd. need cleanup: %d, spawn err: %d",
197 198 199 200 201 202 203 204 205
        pData->needCleanUp, pData->spawnErr);
  if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) {
    return 0;
  }
  atomic_store_32(&pData->stopCalled, 1);
  pData->needCleanUp = false;
  uv_barrier_destroy(&pData->barrier);
  uv_async_send(&pData->stopAsync);
  uv_thread_join(&pData->thread);
206 207 208
#ifdef WINDOWS
  if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle);
#endif
S
Shengliang Guan 已提交
209
  fnInfo("dnode udfd cleaned up");
210 211 212 213
  return 0;
}

//==============================================================================================
S
shenglian zhou 已提交
214 215 216 217
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
 * The QUEUE is copied from queue.h under libuv
 * */

S
shenglian zhou 已提交
218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305
/* Intrusive circular doubly-linked list node: slot 0 is the next link,
 * slot 1 the previous link (see QUEUE_NEXT / QUEUE_PREV). */
typedef void *QUEUE[2];

/* Private macros. */
#define QUEUE_NEXT(q)       (*(QUEUE **) &((*(q))[0]))
#define QUEUE_PREV(q)       (*(QUEUE **) &((*(q))[1]))
#define QUEUE_PREV_NEXT(q)  (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q)  (QUEUE_PREV(QUEUE_NEXT(q)))

/* Public macros. */
/* Recover the enclosing struct of type `type` from a pointer to its QUEUE
 * member `field`. */
#define QUEUE_DATA(ptr, type, field)                                          \
  ((type *) ((char *) (ptr) - offsetof(type, field)))

/* Important note: mutating the list while QUEUE_FOREACH is
 * iterating over its elements results in undefined behavior.
 */
#define QUEUE_FOREACH(q, h)                                                   \
  for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q))

/* True when the node's next link points back at the node itself. */
#define QUEUE_EMPTY(q)                                                        \
  ((const QUEUE *) (q) == (const QUEUE *) QUEUE_NEXT(q))

/* First element after the list head h. */
#define QUEUE_HEAD(q)                                                         \
  (QUEUE_NEXT(q))

/* Make q an empty list: both links point at q itself. */
#define QUEUE_INIT(q)                                                         \
  do {                                                                        \
    QUEUE_NEXT(q) = (q);                                                      \
    QUEUE_PREV(q) = (q);                                                      \
  }                                                                           \
  while (0)

/* Splice the elements of list n onto the tail of list h. */
#define QUEUE_ADD(h, n)                                                       \
  do {                                                                        \
    QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n);                                       \
    QUEUE_NEXT_PREV(n) = QUEUE_PREV(h);                                       \
    QUEUE_PREV(h) = QUEUE_PREV(n);                                            \
    QUEUE_PREV_NEXT(h) = (h);                                                 \
  }                                                                           \
  while (0)

/* Split list h at element q: q and everything after it move to list n,
 * the elements before q stay in h. */
#define QUEUE_SPLIT(h, q, n)                                                  \
  do {                                                                        \
    QUEUE_PREV(n) = QUEUE_PREV(h);                                            \
    QUEUE_PREV_NEXT(n) = (n);                                                 \
    QUEUE_NEXT(n) = (q);                                                      \
    QUEUE_PREV(h) = QUEUE_PREV(q);                                            \
    QUEUE_PREV_NEXT(h) = (h);                                                 \
    QUEUE_PREV(q) = (n);                                                      \
  }                                                                           \
  while (0)

/* Move all elements of h into n (n is initialised empty when h is empty). */
#define QUEUE_MOVE(h, n)                                                      \
  do {                                                                        \
    if (QUEUE_EMPTY(h))                                                       \
      QUEUE_INIT(n);                                                          \
    else {                                                                    \
      QUEUE* q = QUEUE_HEAD(h);                                               \
      QUEUE_SPLIT(h, q, n);                                                   \
    }                                                                         \
  }                                                                           \
  while (0)

/* Link q in directly after the head h. */
#define QUEUE_INSERT_HEAD(h, q)                                               \
  do {                                                                        \
    QUEUE_NEXT(q) = QUEUE_NEXT(h);                                            \
    QUEUE_PREV(q) = (h);                                                      \
    QUEUE_NEXT_PREV(q) = (q);                                                 \
    QUEUE_NEXT(h) = (q);                                                      \
  }                                                                           \
  while (0)

/* Link q in directly before the head h, i.e. at the tail. */
#define QUEUE_INSERT_TAIL(h, q)                                               \
  do {                                                                        \
    QUEUE_NEXT(q) = (h);                                                      \
    QUEUE_PREV(q) = QUEUE_PREV(h);                                            \
    QUEUE_PREV_NEXT(q) = (q);                                                 \
    QUEUE_PREV(h) = (q);                                                      \
  }                                                                           \
  while (0)

/* Unlink q from its neighbours; q's own links are left unchanged. */
#define QUEUE_REMOVE(q)                                                       \
  do {                                                                        \
    QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q);                                       \
    QUEUE_NEXT_PREV(q) = QUEUE_PREV(q);                                       \
  }                                                                           \
  while (0)


306 307 308 309 310 311
// Kinds of work item; compared against SClientUvTaskNode.type in
// udfcGetUdfTaskResultFromUvTask().
enum {
  UV_TASK_CONNECT = 0,
  UV_TASK_REQ_RSP = 1,
  UV_TASK_DISCONNECT = 2
};

312
// Global task sequence counter. NOTE(review): presumably the source of request seqNum values; its users are not in this part of the file.
int64_t gUdfTaskSeqNum = 0;
313 314 315 316 317
// Element type of SUdfcProxy.udfStubs: pairs a UDF name with a function handle.
typedef struct SUdfcFuncStub {
  char udfName[TSDB_FUNC_NAME_LEN];
  UdfcFuncHandle handle;
} SUdfcFuncStub;

318
typedef struct SUdfcProxy {
319
  char udfdPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
320
  uv_barrier_t initBarrier;
321

322 323 324
  uv_loop_t   uvLoop;
  uv_thread_t loopThread;
  uv_async_t  loopTaskAync;
325

326
  uv_async_t loopStopAsync;
327

328 329 330 331
  uv_mutex_t taskQueueMutex;
  int8_t     udfcState;
  QUEUE      taskQueue;
  QUEUE      uvProcTaskQueue;
332

333 334 335
  uv_mutex_t udfStubsMutex;
  SArray*    udfStubs; // SUdfcFuncStub

336
  int8_t initialized;
337
} SUdfcProxy;
338

339
// Process-wide udfc proxy instance; onUdfcPipeClose() clears its stub cache.
SUdfcProxy gUdfdProxy = {0};
340

S
slzhou 已提交
341
typedef struct SClientUdfUvSession {
342
  SUdfcProxy *udfc;
343
  int64_t severHandle;
S
slzhou 已提交
344
  uv_pipe_t *udfUvPipe;
S
shenglian zhou 已提交
345 346 347 348

  int8_t  outputType;
  int32_t outputLen;
  int32_t bufSize;
S
slzhou 已提交
349
} SClientUdfUvSession;
350 351

typedef struct SClientUvTaskNode {
352
  SUdfcProxy *udfc;
353 354 355 356 357 358 359 360 361 362 363
  int8_t type;
  int errCode;

  uv_pipe_t *pipe;

  int64_t seqNum;
  uv_buf_t reqBuf;

  uv_sem_t taskSem;
  uv_buf_t rspBuf;

S
shenglian zhou 已提交
364 365 366
  QUEUE recvTaskQueue;
  QUEUE procTaskQueue;
  QUEUE connTaskQueue;
367 368 369 370 371
} SClientUvTaskNode;

typedef struct SClientUdfTask {
  int8_t type;

S
slzhou 已提交
372
  SClientUdfUvSession *session;
373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401

  int32_t errCode;

  union {
    struct {
      SUdfSetupRequest req;
      SUdfSetupResponse rsp;
    } _setup;
    struct {
      SUdfCallRequest req;
      SUdfCallResponse rsp;
    } _call;
    struct {
      SUdfTeardownRequest req;
      SUdfTeardownResponse rsp;
    } _teardown;
  };

} SClientUdfTask;

// Read buffer of a client connection; `buf` is released in onUdfcPipeClose().
// NOTE(review): the meaning of len/cap/total is not visible here; presumably
// bytes received, allocated capacity and expected message size. Confirm.
typedef struct SClientConnBuf {
  char *buf;
  int32_t len;
  int32_t cap;
  int32_t total;
} SClientConnBuf;

typedef struct SClientUvConn {
  uv_pipe_t *pipe;
S
shenglian zhou 已提交
402
  QUEUE taskQueue;
403
  SClientConnBuf readBuf;
S
slzhou 已提交
404
  SClientUdfUvSession *session;
405 406
} SClientUvConn;

407 408
// Lifecycle states of the udfc proxy.
enum {
  UDFC_STATE_INITAL = 0, // initial state
  UDFC_STATE_STARTNG, // starting after udfcOpen
  UDFC_STATE_READY, // started and ready to receive requests
  UDFC_STATE_STOPPING, // stopping after udfcClose
};
413

414 415
int32_t getUdfdPipeName(char* pipeName, int32_t size) {
  char    dnodeId[8] = {0};
wafwerar's avatar
wafwerar 已提交
416
  size_t  dnodeIdSize = sizeof(dnodeId);
417 418
  int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize);
  if (err != 0) {
419
    fnError("get dnode id from env. error: %s.", uv_err_name(err));
420 421
    dnodeId[0] = '1';
  }
422
#ifdef _WIN32
423
  snprintf(pipeName, size, "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
424 425 426 427
#else
  snprintf(pipeName, size, "%s/%s%s", tsDataDir, UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
#endif
  fnInfo("get dnode id from env. dnode id: %s. pipe path: %s", dnodeId, pipeName);
428 429 430
  return 0;
}

431 432 433
// Encode a setup request: the UDF name as a fixed TSDB_FUNC_NAME_LEN-byte
// binary field. Returns the length reported by the encoder.
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) {
  int32_t len = 0;
  len += taosEncodeBinary(buf, setup->udfName, TSDB_FUNC_NAME_LEN);
  return len;
}
436

437 438 439
// Decode a setup request written by encodeUdfSetupRequest(). Returns the
// position right after the consumed bytes.
void* decodeUdfSetupRequest(const void* buf, SUdfSetupRequest *request) {
  buf = taosDecodeBinaryTo(buf, request->udfName, TSDB_FUNC_NAME_LEN);
  return (void*)buf;
}
441

442 443
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf* state) {
  int32_t len = 0;
444
  len += taosEncodeFixedI8(buf, state->numOfResult);
445 446
  len += taosEncodeFixedI32(buf, state->bufLen);
  len += taosEncodeBinary(buf, state->buf, state->bufLen);
S
shenglian zhou 已提交
447 448
  return len;
}
449

450
// Decode an intermediate buffer written by encodeUdfInterBuf(). state->buf is
// set by taosDecodeBinary(); NOTE(review): presumably a fresh allocation the
// caller releases with freeUdfInterBuf(). Confirm.
// Returns the position right after the consumed bytes.
void* decodeUdfInterBuf(const void* buf, SUdfInterBuf* state) {
  buf = taosDecodeFixedI8(buf, &state->numOfResult);
  buf = taosDecodeFixedI32(buf, &state->bufLen);
  buf = taosDecodeBinary(buf, (void**)&state->buf, state->bufLen);
  return (void*)buf;
}

457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472
/*
 * Encode a call request: udfHandle (i64), callType (i8), then the payload
 * selected by callType (same layout decodeUdfCallRequest() reads back).
 * An unknown callType carries no payload. Returns the summed encoder lengths.
 */
int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call) {
  int32_t len = 0;
  len += taosEncodeFixedI64(buf, call->udfHandle);
  len += taosEncodeFixedI8(buf, call->callType);
  switch (call->callType) {
    case TSDB_UDF_CALL_SCALA_PROC:
      len += tEncodeDataBlock(buf, &call->block);
      break;
    case TSDB_UDF_CALL_AGG_INIT:
      len += taosEncodeFixedI8(buf, call->initFirst);
      break;
    case TSDB_UDF_CALL_AGG_PROC:
      len += tEncodeDataBlock(buf, &call->block);
      len += encodeUdfInterBuf(buf, &call->interBuf);
      break;
    case TSDB_UDF_CALL_AGG_MERGE:
      len += encodeUdfInterBuf(buf, &call->interBuf);
      len += encodeUdfInterBuf(buf, &call->interBuf2);
      break;
    case TSDB_UDF_CALL_AGG_FIN:
      len += encodeUdfInterBuf(buf, &call->interBuf);
      break;
    default:
      break;
  }
  return len;
}

477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497
/*
 * Decode a call request written by encodeUdfCallRequest(): udfHandle,
 * callType, then the callType-specific payload. An unknown callType consumes
 * nothing further. Returns the position right after the consumed bytes.
 */
void* decodeUdfCallRequest(const void* buf, SUdfCallRequest* call) {
  buf = taosDecodeFixedI64(buf, &call->udfHandle);
  buf = taosDecodeFixedI8(buf, &call->callType);
  switch (call->callType) {
    case TSDB_UDF_CALL_SCALA_PROC:
      buf = tDecodeDataBlock(buf, &call->block);
      break;
    case TSDB_UDF_CALL_AGG_INIT:
      buf = taosDecodeFixedI8(buf, &call->initFirst);
      break;
    case TSDB_UDF_CALL_AGG_PROC:
      buf = tDecodeDataBlock(buf, &call->block);
      buf = decodeUdfInterBuf(buf, &call->interBuf);
      break;
    case TSDB_UDF_CALL_AGG_MERGE:
      buf = decodeUdfInterBuf(buf, &call->interBuf);
      buf = decodeUdfInterBuf(buf, &call->interBuf2);
      break;
    case TSDB_UDF_CALL_AGG_FIN:
      buf = decodeUdfInterBuf(buf, &call->interBuf);
      break;
    default:
      break;
  }
  return (void*)buf;
}

502 503 504 505
// Encode a teardown request: just the udfHandle (i64). Returns the encoder length.
int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown) {
  int32_t len = 0;
  len += taosEncodeFixedI64(buf, teardown->udfHandle);
  return len;
}

508 509 510
// Decode a teardown request (udfHandle). Returns the position right after the
// consumed bytes.
void* decodeUdfTeardownRequest(const void* buf, SUdfTeardownRequest *teardown) {
  buf = taosDecodeFixedI64(buf, &teardown->udfHandle);
  return (void*)buf;
}

513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530
/*
 * Encode a complete request message: msgLen (raw int32), seqNum (i64),
 * type (i8), then the type-specific body.
 *
 * With buf == NULL nothing is written for msgLen and only its size is
 * counted; the remaining encoders are called with the same NULL.
 * NOTE(review): this relies on the taosEncode* helpers accepting NULL for a
 * size-only pass. Confirm.
 * Returns the summed lengths.
 */
int32_t encodeUdfRequest(void** buf, const SUdfRequest* request) {
  int32_t len = 0;
  if (buf == NULL) {
    len += sizeof(request->msgLen);
  } else {
    *(int32_t*)(*buf) = request->msgLen;
    *buf = POINTER_SHIFT(*buf, sizeof(request->msgLen));
  }
  len += taosEncodeFixedI64(buf, request->seqNum);
  len += taosEncodeFixedI8(buf, request->type);
  if (request->type == UDF_TASK_SETUP) {
    len += encodeUdfSetupRequest(buf, &request->setup);
  } else if (request->type == UDF_TASK_CALL) {
    len += encodeUdfCallRequest(buf, &request->call);
  } else if (request->type == UDF_TASK_TEARDOWN) {
    len += encodeUdfTeardownRequest(buf, &request->teardown);
  }
  return len;
}

533 534
/*
 * Decode a request message written by encodeUdfRequest(): msgLen (raw int32),
 * seqNum, type, then the type-specific body. An unknown type consumes nothing
 * further. Returns the position right after the consumed bytes.
 */
void* decodeUdfRequest(const void* buf, SUdfRequest* request) {
  request->msgLen = *(int32_t*)(buf);
  buf = POINTER_SHIFT(buf, sizeof(request->msgLen));

  buf = taosDecodeFixedI64(buf, &request->seqNum);
  buf = taosDecodeFixedI8(buf, &request->type);

  if (request->type == UDF_TASK_SETUP) {
    buf = decodeUdfSetupRequest(buf, &request->setup);
  } else if (request->type == UDF_TASK_CALL) {
    buf = decodeUdfCallRequest(buf, &request->call);
  } else if (request->type == UDF_TASK_TEARDOWN) {
    buf = decodeUdfTeardownRequest(buf, &request->teardown);
  }
  return (void*)buf;
}
549

550 551 552
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) {
  int32_t len = 0;
  len += taosEncodeFixedI64(buf, setupRsp->udfHandle);
S
shenglian zhou 已提交
553 554 555
  len += taosEncodeFixedI8(buf, setupRsp->outputType);
  len += taosEncodeFixedI32(buf, setupRsp->outputLen);
  len += taosEncodeFixedI32(buf, setupRsp->bufSize);
556 557
  return len;
}
558

559 560
// Decode a setup response in the field order of encodeUdfSetupResponse().
// Returns the position right after the consumed bytes.
void* decodeUdfSetupResponse(const void* buf, SUdfSetupResponse* setupRsp) {
  buf = taosDecodeFixedI64(buf, &setupRsp->udfHandle);
  buf = taosDecodeFixedI8(buf, &setupRsp->outputType);
  buf = taosDecodeFixedI32(buf, &setupRsp->outputLen);
  buf = taosDecodeFixedI32(buf, &setupRsp->bufSize);
  return (void*)buf;
}
566

567 568 569 570 571 572
int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) {
  int32_t len = 0;
  len += taosEncodeFixedI8(buf, callRsp->callType);
  switch (callRsp->callType) {
    case TSDB_UDF_CALL_SCALA_PROC:
      len += tEncodeDataBlock(buf, &callRsp->resultData);
573
      break;
574
    case TSDB_UDF_CALL_AGG_INIT:
S
slzhou 已提交
575
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
576 577
      break;
    case TSDB_UDF_CALL_AGG_PROC:
S
slzhou 已提交
578
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
579 580
      break;
    case TSDB_UDF_CALL_AGG_MERGE:
S
slzhou 已提交
581
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
582 583
      break;
    case TSDB_UDF_CALL_AGG_FIN:
S
slzhou 已提交
584
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
585 586
      break;
  }
587
  return len;
S
shenglian zhou 已提交
588 589
}

590 591 592 593 594 595 596
/*
 * Decode a call response written by encodeUdfCallResponse(): callType, then a
 * data block (scalar) or an intermediate buffer (any aggregate call type).
 * An unknown callType consumes nothing further.
 * Returns the position right after the consumed bytes.
 */
void* decodeUdfCallResponse(const void* buf, SUdfCallResponse* callRsp) {
  buf = taosDecodeFixedI8(buf, &callRsp->callType);
  switch (callRsp->callType) {
    case TSDB_UDF_CALL_SCALA_PROC:
      buf = tDecodeDataBlock(buf, &callRsp->resultData);
      break;
    // Every aggregate stage answers with the same payload shape.
    case TSDB_UDF_CALL_AGG_INIT:
    case TSDB_UDF_CALL_AGG_PROC:
    case TSDB_UDF_CALL_AGG_MERGE:
    case TSDB_UDF_CALL_AGG_FIN:
      buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
      break;
    default:
      break;
  }
  return (void*)buf;
}

612 613
// A teardown response has no body: nothing is written and the length is 0.
int32_t encodeUdfTeardownResponse(void** buf, const SUdfTeardownResponse* teardownRsp) {
  return 0;
}

616 617
// A teardown response has no body: nothing is consumed and buf is returned as is.
void* decodeUdfTeardownResponse(const void* buf, SUdfTeardownResponse* teardownResponse) {
  return (void*)buf;
}

620 621 622 623 624 625 626
int32_t encodeUdfResponse(void** buf, const SUdfResponse* rsp) {
  int32_t len = 0;
  if (buf == NULL) {
    len += sizeof(rsp->msgLen);
  } else {
    *(int32_t*)(*buf) = rsp->msgLen;
    *buf = POINTER_SHIFT(*buf, sizeof(rsp->msgLen));
S
shenglian zhou 已提交
627 628
  }

S
slzhou 已提交
629 630 631 632 633 634 635
  if (buf == NULL) {
    len += sizeof(rsp->seqNum);
  } else {
    *(int64_t*)(*buf) = rsp->seqNum;
    *buf = POINTER_SHIFT(*buf, sizeof(rsp->seqNum));
  }

636 637 638
  len += taosEncodeFixedI64(buf, rsp->seqNum);
  len += taosEncodeFixedI8(buf, rsp->type);
  len += taosEncodeFixedI32(buf, rsp->code);
S
shenglian zhou 已提交
639

640 641 642 643 644 645 646 647 648 649 650 651 652 653 654
  switch (rsp->type) {
    case UDF_TASK_SETUP:
      len += encodeUdfSetupResponse(buf, &rsp->setupRsp);
      break;
    case UDF_TASK_CALL:
      len += encodeUdfCallResponse(buf, &rsp->callRsp);
      break;
    case UDF_TASK_TEARDOWN:
      len += encodeUdfTeardownResponse(buf, &rsp->teardownRsp);
      break;
    default:
      //TODO: log error
      break;
  }
  return len;
S
shenglian zhou 已提交
655 656
}

657 658
/*
 * Decode a response message written by encodeUdfResponse(): msgLen (raw
 * int32), seqNum (raw int64), seqNum again via taosDecodeFixedI64, type,
 * code, then the type-specific body. The double read of seqNum mirrors the
 * double write on the encode side. An unknown type consumes nothing further.
 * Returns the position right after the consumed bytes.
 */
void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) {
  rsp->msgLen = *(int32_t*)(buf);
  buf = POINTER_SHIFT(buf, sizeof(rsp->msgLen));
  rsp->seqNum = *(int64_t*)(buf);
  buf = POINTER_SHIFT(buf, sizeof(rsp->seqNum));
  buf = taosDecodeFixedI64(buf, &rsp->seqNum);
  buf = taosDecodeFixedI8(buf, &rsp->type);
  buf = taosDecodeFixedI32(buf, &rsp->code);

  switch (rsp->type) {
    case UDF_TASK_SETUP:
      buf = decodeUdfSetupResponse(buf, &rsp->setupRsp);
      break;
    case UDF_TASK_CALL:
      buf = decodeUdfCallResponse(buf, &rsp->callRsp);
      break;
    case UDF_TASK_TEARDOWN:
      buf = decodeUdfTeardownResponse(buf, &rsp->teardownRsp);
      break;
    default:
      //TODO: log error
      break;
  }
  return (void*)buf;
}
682

S
shenglian zhou 已提交
683 684
// Release the buffers owned by a column's data and reset the pointers to NULL.
// meta->type decides which member set is in use: offsets + payload for
// variable-length types, null bitmap + data otherwise.
void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta) {
  if (IS_VAR_DATA_TYPE(meta->type)) {
    taosMemoryFree(data->varLenCol.varOffsets);
    data->varLenCol.varOffsets = NULL;
    taosMemoryFree(data->varLenCol.payload);
    data->varLenCol.payload = NULL;
  } else {
    taosMemoryFree(data->fixLenCol.nullBitmap);
    data->fixLenCol.nullBitmap = NULL;
    taosMemoryFree(data->fixLenCol.data);
    data->fixLenCol.data = NULL;
  }
}

// Release the data buffers of a column; the SUdfColumn itself is not freed.
void freeUdfColumn(SUdfColumn* col) {
  freeUdfColumnData(&col->colData, &col->colMeta);
}

// Release every column of a UDF data block (buffers and the column object),
// then the column pointer array. The block struct itself is not freed.
void freeUdfDataDataBlock(SUdfDataBlock *block) {
  for (int32_t colIdx = 0; colIdx < block->numOfCols; colIdx++) {
    SUdfColumn *column = block->udfCols[colIdx];
    freeUdfColumn(column);
    taosMemoryFree(column);
    block->udfCols[colIdx] = NULL;
  }

  taosMemoryFree(block->udfCols);
  block->udfCols = NULL;
}

// Release the payload of an intermediate buffer and leave buf->buf NULL.
void freeUdfInterBuf(SUdfInterBuf *buf) {
  void *payload = buf->buf;
  buf->buf = NULL;
  taosMemoryFree(payload);
}

S
slzhou 已提交
716 717 718 719

/*
 * Deep-copy an SSDataBlock into a UDF data block.
 *
 * For every column the metadata is copied and the data buffers are duplicated
 * (offsets + payload for variable-length types, null bitmap + data otherwise).
 * The result owns all allocations; freeUdfDataDataBlock() releases them.
 * Always returns 0.
 *
 * NOTE(review): allocation results are not checked; a failed allocation is
 * dereferenced or passed to memcpy.
 */
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock) {
  udfBlock->numOfRows = block->info.rows;
  udfBlock->numOfCols = block->info.numOfCols;
  udfBlock->udfCols = taosMemoryCalloc(udfBlock->numOfCols, sizeof(SUdfColumn*));
  for (int32_t i = 0; i < udfBlock->numOfCols; ++i) {
    udfBlock->udfCols[i] = taosMemoryCalloc(1, sizeof(SUdfColumn));
    SColumnInfoData *col= (SColumnInfoData*)taosArrayGet(block->pDataBlock, i);
    SUdfColumn *udfCol = udfBlock->udfCols[i];
    udfCol->colMeta.type = col->info.type;
    udfCol->colMeta.bytes = col->info.bytes;
    udfCol->colMeta.scale = col->info.scale;
    udfCol->colMeta.precision = col->info.precision;
    udfCol->colData.numOfRows = udfBlock->numOfRows;
    udfCol->hasNull = col->hasNull;
    if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) {
      // Variable-length column: one int32_t offset per row plus the payload.
      udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows;
      udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen);
      memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen);
      udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, udfBlock->numOfRows);
      udfCol->colData.varLenCol.payload = taosMemoryMalloc(udfCol->colData.varLenCol.payloadLen);
      memcpy(udfCol->colData.varLenCol.payload, col->pData, udfCol->colData.varLenCol.payloadLen);
    } else {
      // Fixed-length column: null bitmap plus the data.
      udfCol->colData.fixLenCol.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows);
      int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen;
      udfCol->colData.fixLenCol.nullBitmap = taosMemoryMalloc(udfCol->colData.fixLenCol.nullBitmapLen);
      char* bitmap = udfCol->colData.fixLenCol.nullBitmap;
      memcpy(bitmap, col->nullbitmap, bitmapLen);
      udfCol->colData.fixLenCol.dataLen = colDataGetLength(col, udfBlock->numOfRows);
      int32_t dataLen = udfCol->colData.fixLenCol.dataLen;
      udfCol->colData.fixLenCol.data = taosMemoryMalloc(udfCol->colData.fixLenCol.dataLen);
      char* data = udfCol->colData.fixLenCol.data;
      memcpy(data, col->pData, dataLen);
    }
  }
  return 0;
}

/*
 * Build a one-column SSDataBlock from a UDF column.
 *
 * A new column array is created in block->pDataBlock and the column's
 * metadata and buffers are duplicated into it. Always returns 0.
 *
 * NOTE(review): allocation results are not checked, and a previous
 * block->pDataBlock is overwritten without being released.
 */
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
  block->info.numOfCols = 1;
  block->info.rows = udfCol->colData.numOfRows;
  block->info.hasVarCol = IS_VAR_DATA_TYPE(udfCol->colMeta.type);

  block->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
  taosArraySetSize(block->pDataBlock, 1);
  SColumnInfoData *col = taosArrayGet(block->pDataBlock, 0);
  SUdfColumnMeta *meta = &udfCol->colMeta;
  col->info.precision = meta->precision;
  col->info.bytes = meta->bytes;
  col->info.scale = meta->scale;
  col->info.type = meta->type;
  col->hasNull = udfCol->hasNull;
  SUdfColumnData *data = &udfCol->colData;

  if (!IS_VAR_DATA_TYPE(meta->type)) {
    // Fixed-length column: null bitmap plus the data.
    col->nullbitmap = taosMemoryMalloc(data->fixLenCol.nullBitmapLen);
    memcpy(col->nullbitmap, data->fixLenCol.nullBitmap, data->fixLenCol.nullBitmapLen);
    col->pData = taosMemoryMalloc(data->fixLenCol.dataLen);
    memcpy(col->pData, data->fixLenCol.data, data->fixLenCol.dataLen);
  } else {
    // Variable-length column: offsets plus the payload.
    col->varmeta.offset = taosMemoryMalloc(data->varLenCol.varOffsetsLen);
    memcpy(col->varmeta.offset, data->varLenCol.varOffsets, data->varLenCol.varOffsetsLen);
    col->pData = taosMemoryMalloc(data->varLenCol.payloadLen);
    memcpy(col->pData, data->varLenCol.payload, data->varLenCol.payloadLen);
  }
  return 0;
}

S
slzhou 已提交
784 785 786 787 788 789 790 791 792 793 794 795 796 797
/*
 * Wrap an array of numOfCols scalar parameters into an SSDataBlock.
 *
 * The row count is taken from the first parameter. Each parameter's
 * SColumnInfoData is pushed by value into a new column array, so the block
 * shares the underlying data buffers with the inputs. Always returns 0.
 */
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output) {
  output->info.rows = input->numOfRows;
  output->info.numOfCols = numOfCols;
  bool hasVarCol = false;
  for (int32_t i = 0; i < numOfCols; ++i) {
    if (IS_VAR_DATA_TYPE((input+i)->columnData->info.type)) {
      hasVarCol = true;
      break;
    }
  }
  output->info.hasVarCol = hasVarCol;

  //TODO: free the array output->pDataBlock
  output->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
  for (int32_t i = 0; i < numOfCols; ++i) {
    taosArrayPush(output->pDataBlock, (input + i)->columnData);
  }
  return 0;
}

/*
 * Expose a single-column data block as a scalar parameter.
 *
 * output->columnData points at the block's first column (no copy is made).
 * Returns 0 on success, -1 when the block does not have exactly one column.
 */
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
  if (input->info.numOfCols == 1) {
    output->numOfRows = input->info.rows;
    //TODO: memory
    output->columnData = taosArrayGet(input->pDataBlock, 0);
    return 0;
  }

  fnError("scalar function only support one column");
  return -1;
}
S
slzhou 已提交
814

815 816
// Close callback for the client pipe (passed to uv_close).
//
// Detaches the session from the pipe, wakes the task at the head of the
// connection's task queue (if any) with errCode 0, releases the connection
// object, its read buffer and the pipe handle, and finally clears the cached
// udf stubs.
void onUdfcPipeClose(uv_handle_t *handle) {
  SClientUvConn *conn = handle->data;

  // Update the session BEFORE waking any waiter: the thread blocked on the task
  // semaphore (see doTeardownUdf) frees the session as soon as it resumes, so
  // touching it after uv_sem_post would race with that free.
  if (conn->session != NULL) {
    conn->session->udfUvPipe = NULL;
  }

  if (!QUEUE_EMPTY(&conn->taskQueue)) {
    QUEUE* h = QUEUE_HEAD(&conn->taskQueue);
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
    task->errCode = 0;
    // Unlink from the connection queue as well; conn is freed below and the
    // task must not keep links into it.
    QUEUE_REMOVE(&task->connTaskQueue);
    QUEUE_REMOVE(&task->procTaskQueue);
    uv_sem_post(&task->taskSem);
  }

  taosMemoryFree(conn->readBuf.buf);
  taosMemoryFree(conn);
  taosMemoryFree((uv_pipe_t *) handle);

  //clear the udf handles cache TODO move to other thread
  uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
  taosArrayClear(gUdfdProxy.udfStubs);
  uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
}

S
slzhou 已提交
835 836
// Transfers the outcome of a finished uv task into the udf task.
//
// For a request/response task that carries a response buffer, the buffer is
// decoded, the response code and the type-specific response are copied into
// `task`, and the buffer is freed. For request/response tasks without a
// response, and for connect/disconnect tasks, only the uv error code is
// propagated. Always returns 0.
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
  fnDebug("udfc get uv task result. task: %p, uvTask: %p", task, uvTask);

  const bool isReqRsp = (uvTask->type == UV_TASK_REQ_RSP);
  if (!isReqRsp || uvTask->rspBuf.base == NULL) {
    if (isReqRsp || uvTask->type == UV_TASK_CONNECT || uvTask->type == UV_TASK_DISCONNECT) {
      task->errCode = uvTask->errCode;
    }
    return 0;
  }

  SUdfResponse rsp;
  void* end = decodeUdfResponse(uvTask->rspBuf.base, &rsp);
  assert(uvTask->rspBuf.len == POINTER_DISTANCE(end, uvTask->rspBuf.base));
  task->errCode = rsp.code;

  //TODO: copy or not
  if (task->type == UDF_TASK_SETUP) {
    task->_setup.rsp = rsp.setupRsp;
  } else if (task->type == UDF_TASK_CALL) {
    task->_call.rsp = rsp.callRsp;
  } else if (task->type == UDF_TASK_TEARDOWN) {
    task->_teardown.rsp = rsp.teardownRsp;
  }

  // TODO: the call buffer is setup and freed by udf invocation
  taosMemoryFree(uvTask->rspBuf.base);
  return 0;
}

// Allocation callback handed to uv_read_start.
//
// The first call for a message allocates room for the header only
// (int32 message length followed by int64 sequence number). Later calls grow
// the buffer to the total message length once it is known and hand libuv the
// unread remainder. On allocation failure libuv receives an empty buffer.
void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
  SClientUvConn *conn = handle->data;
  SClientConnBuf *rb = &conn->readBuf;
  int32_t headLen = sizeof(int32_t) + sizeof(int64_t);

  // Default answer: nothing to read into. Overwritten below on success.
  buf->base = NULL;
  buf->len = 0;

  if (rb->cap == 0) {
    rb->buf = taosMemoryMalloc(headLen);
    if (rb->buf == NULL) {
      fnError("udfc allocate buffer failure. size: %d", headLen);
    } else {
      rb->len = 0;
      rb->cap = headLen;
      rb->total = -1;

      buf->base = rb->buf;
      buf->len = rb->cap;
    }
  } else {
    if (rb->total > rb->cap) {
      rb->cap = rb->total;
    }
    void *grown = taosMemoryRealloc(rb->buf, rb->cap);
    if (grown == NULL) {
      fnError("udfc re-allocate buffer failure. size: %d", rb->cap);
    } else {
      rb->buf = grown;
      buf->base = rb->buf + rb->len;
      buf->len = rb->cap - rb->len;
    }
  }

  fnTrace("conn buf cap - len - total : %d - %d - %d", rb->cap, rb->len, rb->total);
}

915 916 917 918 919
// Reports whether the read buffer holds one complete message.
// As a side effect, once at least the int32 length prefix has arrived and the
// total length is still unknown (-1), the total is read from the buffer start.
bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
  const bool totalUnknown = (connBuf->total == -1);
  if (totalUnknown && connBuf->len >= sizeof(int32_t)) {
    connBuf->total = *(int32_t *) (connBuf->buf);
  }

  if (connBuf->len != connBuf->cap || connBuf->total != connBuf->cap) {
    return false;
  }

  fnTrace("udfc complete message is received, now handle it");
  return true;
}

// Dispatches a fully received response to the task waiting for it.
//
// The message layout is: int32 length, int64 sequence number, payload. The task
// on the connection queue whose seqNum matches takes ownership of the read
// buffer (as rspBuf), is unlinked from both queues and woken. When no task
// matches, the buffer is freed here. In every case the connection's read
// buffer state is reset so the next message starts with a fresh header read.
void udfcUvHandleRsp(SClientUvConn *conn) {
  SClientConnBuf *connBuf = &conn->readBuf;

  // The sequence number sits at byte offset 4, which is not suitably aligned
  // for a direct int64_t load; copy it out instead.
  int64_t seqNum = 0;
  memcpy(&seqNum, connBuf->buf + sizeof(int32_t), sizeof(seqNum));

  SClientUvTaskNode *taskFound = NULL;
  if (QUEUE_EMPTY(&conn->taskQueue)) {
    fnError("udfc no task waiting for response on connection");
  } else {
    // Always advance the iterator, including when a duplicate is reported;
    // only the first match receives the response.
    for (QUEUE* h = QUEUE_NEXT(&conn->taskQueue); h != &conn->taskQueue; h = QUEUE_NEXT(h)) {
      SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
      if (task->seqNum != seqNum) {
        continue;
      }
      if (taskFound == NULL) {
        taskFound = task;
      } else {
        fnError("udfc more than one task waiting for the same response");
      }
    }
    if (taskFound == NULL) {
      fnError("no task is waiting for the response.");
    }
  }

  if (taskFound != NULL) {
    taskFound->rspBuf = uv_buf_init(connBuf->buf, connBuf->len);
    QUEUE_REMOVE(&taskFound->connTaskQueue);
    QUEUE_REMOVE(&taskFound->procTaskQueue);
    uv_sem_post(&taskFound->taskSem);
  } else {
    // Nobody took ownership of the message, release it.
    taosMemoryFree(connBuf->buf);
  }

  connBuf->buf = NULL;
  connBuf->total = -1;
  connBuf->len = 0;
  connBuf->cap = 0;
}
966

967
// Fails every task queued on the connection with TSDB_CODE_UDF_PIPE_READ_ERR,
// waking each waiter, and then closes the pipe (onUdfcPipeClose runs as the
// close callback).
void udfcUvHandleError(SClientUvConn *conn) {
  for (;;) {
    if (QUEUE_EMPTY(&conn->taskQueue)) {
      break;
    }
    QUEUE* head = QUEUE_HEAD(&conn->taskQueue);
    SClientUvTaskNode *pending = QUEUE_DATA(head, SClientUvTaskNode, connTaskQueue);

    pending->errCode = TSDB_CODE_UDF_PIPE_READ_ERR;
    QUEUE_REMOVE(&pending->connTaskQueue);
    QUEUE_REMOVE(&pending->procTaskQueue);
    uv_sem_post(&pending->taskSem);
  }

  uv_close((uv_handle_t *) conn->pipe, onUdfcPipeClose);
}

980
// Read callback handed to uv_read_start.
// nread == 0 : nothing to do.
// nread  > 0 : account for the new bytes and dispatch the message once complete.
// nread  < 0 : log the error (including EOF) and fail the connection.
void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
  fnTrace("udfc client %p, client read from pipe. nread: %zd", client, nread);
  if (nread == 0) {
    return;
  }

  SClientUvConn *conn = client->data;

  if (nread > 0) {
    SClientConnBuf *readBuf = &conn->readBuf;
    readBuf->len += nread;
    if (isUdfcUvMsgComplete(readBuf)) {
      udfcUvHandleRsp(conn);
    }
    return;
  }

  // nread < 0
  fnError("udfc client pipe %p read error: %zd, %s.", client, nread, uv_strerror(nread));
  if (nread == UV_EOF) {
    fnError("\tudfc client pipe %p closed", client);
  }
  udfcUvHandleError(conn);
}

1003
// Write-completion callback for a request sent with uv_write.
//
// The write request and the encoded request buffer are released first, then the
// task is linked onto the connection's task queue so that it is reachable both
// by the response dispatcher and by the error path. On a failed write the whole
// connection is failed via udfcUvHandleError, which also wakes this task;
// without the enqueue the task that issued the failed write would never be
// signalled and its waiter would block.
void onUdfcPipetWrite(uv_write_t *write, int status) {
  SClientUvTaskNode *uvTask = write->data;
  uv_pipe_t *pipe = uvTask->pipe;
  fnTrace("udfc client %p write length:%zu", pipe, uvTask->reqBuf.len);
  SClientUvConn *conn = pipe->data;

  // Free before the task can be woken: once its semaphore is posted the waiting
  // thread may release uvTask at any time.
  taosMemoryFree(write);
  taosMemoryFree(uvTask->reqBuf.base);
  uvTask->reqBuf.base = NULL;

  QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
  if (status != 0) {
    fnError("udfc client %p write error.", pipe);
    udfcUvHandleError(conn);
  }
}
H
Haojun Liao 已提交
1017

1018
// Connect-completion callback for uv_pipe_connect.
//
// On success, reading from the pipe is started; a failure to start reading is
// reported through the task's error code just like a failed connect. The
// connect request is released, the task is removed from the in-progress queue
// and its waiter is woken.
void onUdfcPipeConnect(uv_connect_t *connect, int status) {
  SClientUvTaskNode *uvTask = connect->data;
  if (status != 0) {
    fnError("client connect error, task seq: %"PRId64", code: %s", uvTask->seqNum, uv_strerror(status));
  } else {
    // Only a connected pipe can be read from.
    int err = uv_read_start((uv_stream_t *)uvTask->pipe, udfcAllocateBuffer, onUdfcPipeRead);
    if (err != 0) {
      fnError("client start read error, task seq: %"PRId64", code: %s", uvTask->seqNum, uv_strerror(err));
      status = err;
    }
  }
  uvTask->errCode = status;

  taosMemoryFree(connect);
  QUEUE_REMOVE(&uvTask->procTaskQueue);
  uv_sem_post(&uvTask->taskSem);
}
H
Haojun Liao 已提交
1030

S
slzhou 已提交
1031
// Creates the uv task node that carries `task` through the event loop.
//
// task       : udf task; its session supplies the proxy and, for
//              request/response and disconnect tasks, the pipe.
// uvTaskType : UV_TASK_CONNECT, UV_TASK_REQ_RSP or UV_TASK_DISCONNECT.
// pUvTask    : receives the new node on success, NULL on failure.
//
// For UV_TASK_REQ_RSP the request is encoded into a newly allocated buffer
// (uvTask->reqBuf) and stamped with a fresh sequence number.
//
// Returns 0 on success, -1 on allocation/initialisation failure or when the
// udf task type is not one of setup/call/teardown.
int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) {
  *pUvTask = NULL;

  SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode));
  if (uvTask == NULL) {
    return -1;
  }
  uvTask->type = uvTaskType;
  uvTask->udfc = task->session->udfc;

  if (uvTaskType == UV_TASK_REQ_RSP) {
    uvTask->pipe = task->session->udfUvPipe;

    // Zero the request so that no indeterminate bytes are ever encoded.
    SUdfRequest request = {0};
    request.type = task->type;
    request.seqNum = atomic_fetch_add_64(&gUdfTaskSeqNum, 1);

    if (task->type == UDF_TASK_SETUP) {
      request.setup = task->_setup.req;
    } else if (task->type == UDF_TASK_CALL) {
      request.call = task->_call.req;
    } else if (task->type == UDF_TASK_TEARDOWN) {
      request.teardown = task->_teardown.req;
    } else {
      // Unknown udf task type: there is nothing meaningful to send.
      taosMemoryFree(uvTask);
      return -1;
    }

    // First pass computes the encoded size, second pass writes the bytes.
    int32_t bufLen = encodeUdfRequest(NULL, &request);
    request.msgLen = bufLen;
    void *bufBegin = taosMemoryMalloc(bufLen);
    if (bufBegin == NULL) {
      taosMemoryFree(uvTask);
      return -1;
    }
    void *buf = bufBegin;
    encodeUdfRequest(&buf, &request);
    uvTask->reqBuf = uv_buf_init(bufBegin, bufLen);
    uvTask->seqNum = request.seqNum;
  } else if (uvTaskType == UV_TASK_DISCONNECT) {
    uvTask->pipe = task->session->udfUvPipe;
  }
  // UV_TASK_CONNECT needs nothing more here; its pipe is created when the
  // task is started on the event loop.

  if (uv_sem_init(&uvTask->taskSem, 0) != 0) {
    taosMemoryFree(uvTask->reqBuf.base);  // NULL unless a request was encoded
    taosMemoryFree(uvTask);
    return -1;
  }

  *pUvTask = uvTask;
  return 0;
}
H
Haojun Liao 已提交
1070

S
slzhou 已提交
1071
// Hands a uv task to the event loop and blocks until it has finished.
// The task is appended to the proxy's incoming queue under the queue mutex,
// the loop is woken through its async handle, and the caller then waits on the
// task semaphore, which is destroyed afterwards. Always returns 0.
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
  fnTrace("queue uv task to event loop, task: %d, %p", uvTask->type, uvTask);
  SUdfcProxy *proxy = uvTask->udfc;

  uv_mutex_lock(&proxy->taskQueueMutex);
  QUEUE_INSERT_TAIL(&proxy->taskQueue, &uvTask->recvTaskQueue);
  uv_mutex_unlock(&proxy->taskQueueMutex);

  uv_async_send(&proxy->loopTaskAync);

  // Block until the event loop side posts completion.
  uv_sem_wait(&uvTask->taskSem);
  fnInfo("udfc uv task finished. task: %d, %p", uvTask->type, uvTask);
  uv_sem_destroy(&uvTask->taskSem);

  return 0;
}
H
Haojun Liao 已提交
1085

S
slzhou 已提交
1086
// Starts a uv task on the event loop.
//
// UV_TASK_CONNECT    : creates the pipe and its connection object and starts an
//                      asynchronous connect to the udfd pipe.
// UV_TASK_REQ_RSP    : writes the encoded request to the pipe.
// UV_TASK_DISCONNECT : queues the task on the connection and closes the pipe.
//
// Returns 0 when the asynchronous operation was started (its callback completes
// the task), otherwise a non-zero code; in that case no callback will run, so
// the caller must complete the task itself.
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
  fnTrace("event loop start uv task. task: %d, %p", uvTask->type, uvTask);
  int32_t code = 0;

  switch (uvTask->type) {
    case UV_TASK_CONNECT: {
      uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
      SClientUvConn *conn = taosMemoryCalloc(1, sizeof(SClientUvConn));
      uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t));
      if (pipe == NULL || conn == NULL || connReq == NULL) {
        fnError("udfc event loop start connect task failed. out of memory");
        taosMemoryFree(connReq);
        taosMemoryFree(conn);
        taosMemoryFree(pipe);
        code = -1;
        break;
      }

      uv_pipe_init(&uvTask->udfc->uvLoop, pipe, 0);
      uvTask->pipe = pipe;

      conn->pipe = pipe;
      conn->readBuf.len = 0;
      conn->readBuf.cap = 0;
      conn->readBuf.buf = 0;
      conn->readBuf.total = -1;
      QUEUE_INIT(&conn->taskQueue);

      pipe->data = conn;

      connReq->data = uvTask;
      uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfcPipeConnect);
      code = 0;
      break;
    }
    case UV_TASK_REQ_RSP: {
      uv_pipe_t *pipe = uvTask->pipe;
      if (pipe == NULL) {
        code = TSDB_CODE_UDF_PIPE_NO_PIPE;
      } else {
        uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t));
        if (write == NULL) {
          fnError("udfc event loop start req/rsp task failed. out of memory");
          code = -1;
        } else {
          write->data = uvTask;
          int err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipetWrite);
          if (err != 0) {
            fnError("udfc event loop start req/rsp task uv_write failed. code: %s", uv_strerror(err));
            // The write callback will not run, so release the request here.
            taosMemoryFree(write);
          }
          code = err;
        }
      }
      if (code != 0) {
        // The encoded request is normally freed by the write callback.
        taosMemoryFree(uvTask->reqBuf.base);
        uvTask->reqBuf.base = NULL;
      }
      break;
    }
    case UV_TASK_DISCONNECT: {
      uv_pipe_t *pipe = uvTask->pipe;
      if (pipe == NULL) {
        code = TSDB_CODE_UDF_PIPE_NO_PIPE;
      } else {
        SClientUvConn *conn = pipe->data;
        // The close callback completes the task it finds on this queue.
        QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
        uv_close((uv_handle_t *)uvTask->pipe, onUdfcPipeClose);
        code = 0;
      }
      break;
    }
    default: {
      fnError("udfc event loop unknown task type.");
      // Report failure so the waiter is released instead of waiting forever.
      code = -1;
      break;
    }
  }

  return code;
}
H
Haojun Liao 已提交
1147

1148
// Async callback that drains the proxy's incoming task queue.
// The queue is moved out under the mutex; each task is then started. Tasks that
// started successfully are tracked on the in-progress queue, the others are
// completed immediately with the start error.
void udfcAsyncTaskCb(uv_async_t *async) {
  SUdfcProxy *proxy = async->data;
  QUEUE pending;

  uv_mutex_lock(&proxy->taskQueueMutex);
  QUEUE_MOVE(&proxy->taskQueue, &pending);
  uv_mutex_unlock(&proxy->taskQueueMutex);

  while (!QUEUE_EMPTY(&pending)) {
    QUEUE* node = QUEUE_HEAD(&pending);
    QUEUE_REMOVE(node);
    SClientUvTaskNode *task = QUEUE_DATA(node, SClientUvTaskNode, recvTaskQueue);

    int32_t startCode = udfcStartUvTask(task);
    if (startCode != 0) {
      task->errCode = startCode;
      uv_sem_post(&task->taskSem);
      continue;
    }
    QUEUE_INSERT_TAIL(&proxy->uvProcTaskQueue, &task->procTaskQueue);
  }
}

1171
// Wakes every task that is still waiting: first the tasks that were queued but
// not yet started, then the tasks that are in progress. While the proxy is
// stopping, each task is marked with TSDB_CODE_UDF_STOPPING before its waiter
// is released.
void cleanUpUvTasks(SUdfcProxy *udfc) {
  fnDebug("clean up uv tasks");
  QUEUE notStarted;

  uv_mutex_lock(&udfc->taskQueueMutex);
  QUEUE_MOVE(&udfc->taskQueue, &notStarted);
  uv_mutex_unlock(&udfc->taskQueueMutex);

  // Tasks that never reached the event loop.
  while (!QUEUE_EMPTY(&notStarted)) {
    QUEUE* node = QUEUE_HEAD(&notStarted);
    QUEUE_REMOVE(node);
    SClientUvTaskNode *queued = QUEUE_DATA(node, SClientUvTaskNode, recvTaskQueue);
    if (udfc->udfcState == UDFC_STATE_STOPPING) {
      queued->errCode = TSDB_CODE_UDF_STOPPING;
    }
    uv_sem_post(&queued->taskSem);
  }

  // Tasks whose asynchronous operation has been started.
  while (!QUEUE_EMPTY(&udfc->uvProcTaskQueue)) {
    QUEUE* node = QUEUE_HEAD(&udfc->uvProcTaskQueue);
    QUEUE_REMOVE(node);
    SClientUvTaskNode *running = QUEUE_DATA(node, SClientUvTaskNode, procTaskQueue);
    if (udfc->udfcState == UDFC_STATE_STOPPING) {
      running->errCode = TSDB_CODE_UDF_STOPPING;
    }
    uv_sem_post(&running->taskSem);
  }
}
1199

S
shenglian zhou 已提交
1200
// Async callback for the stop signal: releases all waiting tasks and, when the
// proxy is in the stopping state, stops the event loop.
void udfStopAsyncCb(uv_async_t *async) {
  SUdfcProxy *proxy = async->data;

  cleanUpUvTasks(proxy);
  if (proxy->udfcState != UDFC_STATE_STOPPING) {
    return;
  }
  uv_stop(&proxy->uvLoop);
}
S
shenglian zhou 已提交
1207

S
shenglian zhou 已提交
1208
void constructUdfService(void *argsThread) {
1209 1210 1211
  SUdfcProxy *udfc = (SUdfcProxy *)argsThread;
  uv_loop_init(&udfc->uvLoop);

1212
  uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfcAsyncTaskCb);
1213 1214 1215 1216 1217 1218 1219
  udfc->loopTaskAync.data = udfc;
  uv_async_init(&udfc->uvLoop, &udfc->loopStopAsync, udfStopAsyncCb);
  udfc->loopStopAsync.data = udfc;
  uv_mutex_init(&udfc->taskQueueMutex);
  QUEUE_INIT(&udfc->taskQueue);
  QUEUE_INIT(&udfc->uvProcTaskQueue);
  uv_barrier_wait(&udfc->initBarrier);
1220
  //TODO return value of uv_run
1221 1222
  uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
  uv_loop_close(&udfc->uvLoop);
1223 1224
}

1225 1226 1227 1228 1229
int32_t udfcOpen() {
  int8_t old = atomic_val_compare_exchange_8(&gUdfdProxy.initialized, 0, 1);
  if (old == 1) {
    return 0;
  }
1230
  SUdfcProxy *proxy = &gUdfdProxy;
1231
  getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName));
1232 1233 1234 1235 1236 1237
  proxy->udfcState = UDFC_STATE_STARTNG;
  uv_barrier_init(&proxy->initBarrier, 2);
  uv_thread_create(&proxy->loopThread, constructUdfService, proxy);
  atomic_store_8(&proxy->udfcState, UDFC_STATE_READY);
  proxy->udfcState = UDFC_STATE_READY;
  uv_barrier_wait(&proxy->initBarrier);
1238 1239
  uv_mutex_init(&proxy->udfStubsMutex);
  proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
1240
  fnInfo("udfc initialized")
1241 1242 1243
  return 0;
}

1244 1245 1246 1247 1248 1249
int32_t udfcClose() {
  int8_t old = atomic_val_compare_exchange_8(&gUdfdProxy.initialized, 1, 0);
  if (old == 0) {
    return 0;
  }

1250 1251 1252 1253 1254 1255
  SUdfcProxy *udfc = &gUdfdProxy;
  udfc->udfcState = UDFC_STATE_STOPPING;
  uv_async_send(&udfc->loopStopAsync);
  uv_thread_join(&udfc->loopThread);
  uv_mutex_destroy(&udfc->taskQueueMutex);
  uv_barrier_destroy(&udfc->initBarrier);
1256 1257
  taosArrayDestroy(udfc->udfStubs);
  uv_mutex_destroy(&udfc->udfStubsMutex);
1258
  udfc->udfcState = UDFC_STATE_INITAL;
1259
  fnInfo("udfc cleaned up");
1260 1261 1262
  return 0;
}

S
slzhou 已提交
1263
// Runs one uv task of the given type for `task` and waits for its completion.
//
// The uv task is created, queued to the event loop, and its result is copied
// back into `task`. After a connect task the session and the connection object
// are linked to each other.
//
// Returns task->errCode. When the uv task cannot be created, task->errCode is
// set to the creation error (or -1) and returned without queuing anything.
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
  SClientUvTaskNode *uvTask = NULL;

  int32_t code = udfcCreateUvTask(task, uvTaskType, &uvTask);
  if (code != 0 || uvTask == NULL) {
    task->errCode = (code != 0) ? code : -1;
    return task->errCode;
  }

  udfcQueueUvTask(uvTask);
  udfcGetUdfTaskResultFromUvTask(task, uvTask);

  // The pipe is missing when the connect task could not even be started.
  if (uvTaskType == UV_TASK_CONNECT && uvTask->pipe != NULL) {
    task->session->udfUvPipe = uvTask->pipe;
    SClientUvConn *conn = uvTask->pipe->data;
    conn->session = task->session;
  }

  taosMemoryFree(uvTask);
  uvTask = NULL;
  return task->errCode;
}

1279
// Connects to udfd and sets up the udf named `udfName`.
//
// udfName    : NUL-terminated function name; at most TSDB_FUNC_NAME_LEN bytes
//              of it are sent.
// funcHandle : receives the new session on success; left untouched otherwise.
//
// Returns 0 on success, TSDB_CODE_UDF_INVALID_STATE when the proxy is not
// ready, TSDB_CODE_UDF_PIPE_CONNECT_ERR when the pipe cannot be connected,
// -1 on allocation failure, or the error code of the setup request.
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
  fnInfo("udfc setup udf. udfName: %s", udfName);
  if (gUdfdProxy.udfcState != UDFC_STATE_READY) {
    return TSDB_CODE_UDF_INVALID_STATE;
  }

  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
  if (task == NULL) {
    return -1;
  }
  task->errCode = 0;
  task->session = taosMemoryCalloc(1, sizeof(SClientUdfUvSession));
  if (task->session == NULL) {
    taosMemoryFree(task);
    return -1;
  }
  task->session->udfc = &gUdfdProxy;
  task->type = UDF_TASK_SETUP;

  SUdfSetupRequest *req = &task->_setup.req;
  // Copy the name without reading past its terminator; the request was
  // zero-allocated, so the remainder of the field stays zero.
  strncpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN);

  int32_t errCode = udfcRunUdfUvTask(task, UV_TASK_CONNECT);
  if (errCode != 0) {
    fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfdProxy)->udfdPipeName);
    // Only the task is released: after a connect attempt the connection object
    // may still point at the session (see udfcRunUdfUvTask).
    taosMemoryFree(task);
    return TSDB_CODE_UDF_PIPE_CONNECT_ERR;
  }

  udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);

  int32_t err = task->errCode;
  if (err != 0) {
    fnError("failed to setup udf. udfname: %s, err: %d", udfName, err);
  } else {
    SUdfSetupResponse *rsp = &task->_setup.rsp;
    task->session->severHandle = rsp->udfHandle;
    task->session->outputType = rsp->outputType;
    task->session->outputLen = rsp->outputLen;
    task->session->bufSize = rsp->bufSize;
    fnInfo("sucessfully setup udf func handle. handle: %p", task->session);
    *funcHandle = task->session;
  }

  taosMemoryFree(task);
  return err;
}

1317
// Sends one call request to udfd for the udf behind `handle` and returns its result.
//
// callType selects which arguments are used:
//   TSDB_UDF_CALL_AGG_INIT   : out newState
//   TSDB_UDF_CALL_AGG_PROC   : in input, state          ; out newState
//   TSDB_UDF_CALL_AGG_MERGE  : in state, state2         ; out newState
//   TSDB_UDF_CALL_AGG_FIN    : in state                 ; out newState
//   TSDB_UDF_CALL_SCALA_PROC : in input                 ; out output
// Arguments not listed for a call type are not touched and may be NULL.
//
// Returns 0 on success, TSDB_CODE_UDF_PIPE_NO_PIPE when there is no usable
// session/pipe, -1 on allocation failure, or the error code of the call.
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
                SSDataBlock* output, SUdfInterBuf *newState) {
  fnTrace("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
  SClientUdfUvSession *session = (SClientUdfUvSession *) handle;
  if (session == NULL || session->udfUvPipe == NULL) {
    fnError("No pipe to udfd");
    return TSDB_CODE_UDF_PIPE_NO_PIPE;
  }

  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
  if (task == NULL) {
    fnError("call udf failure. err: %d", -1);
    return -1;
  }
  task->errCode = 0;
  task->session = session;
  task->type = UDF_TASK_CALL;

  SUdfCallRequest *req = &task->_call.req;
  req->udfHandle = task->session->severHandle;
  req->callType = callType;

  switch (callType) {
    case TSDB_UDF_CALL_AGG_INIT: {
      req->initFirst = 1;
      break;
    }
    case TSDB_UDF_CALL_AGG_PROC: {
      req->block = *input;
      req->interBuf = *state;
      break;
    }
    case TSDB_UDF_CALL_AGG_MERGE: {
      req->interBuf = *state;
      req->interBuf2 = *state2;
      break;
    }
    case TSDB_UDF_CALL_AGG_FIN: {
      req->interBuf = *state;
      break;
    }
    case TSDB_UDF_CALL_SCALA_PROC: {
      req->block = *input;
      break;
    }
    default: {
      // Other call types carry no payload.
      break;
    }
  }

  udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);

  int32_t err = task->errCode;
  if (err != 0) {
    fnError("call udf failure. err: %d", err);
  } else {
    SUdfCallResponse *rsp = &task->_call.rsp;
    switch (callType) {
      // Every aggregate step answers with an intermediate/result buffer.
      case TSDB_UDF_CALL_AGG_INIT:
      case TSDB_UDF_CALL_AGG_PROC:
      case TSDB_UDF_CALL_AGG_MERGE:
      case TSDB_UDF_CALL_AGG_FIN: {
        *newState = rsp->resultBuf;
        break;
      }
      case TSDB_UDF_CALL_SCALA_PROC: {
        *output = rsp->resultData;
        break;
      }
      default: {
        break;
      }
    }
  }

  taosMemoryFree(task);
  return err;
}

1393
// Asks udfd for the initial aggregate state of the udf behind `handle`.
// output: interBuf receives the initial intermediate buffer.
// Returns the result code of callUdf.
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) {
  return callUdf(handle, TSDB_UDF_CALL_AGG_INIT, NULL, NULL, NULL, NULL, interBuf);
}

// input: block, state
// output: interbuf,
1403
int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) {
S
slzhou 已提交
1404 1405 1406 1407 1408 1409 1410
  int8_t callType = TSDB_UDF_CALL_AGG_PROC;
  int32_t err = callUdf(handle, callType, block, state, NULL, NULL, newState);
  return err;
}

// input: interbuf1, interbuf2
// output: resultBuf
1411
int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf) {
S
slzhou 已提交
1412 1413 1414 1415 1416 1417 1418
  int8_t callType = TSDB_UDF_CALL_AGG_MERGE;
  int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf);
  return err;
}

// input: interBuf
// output: resultData
1419
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) {
S
slzhou 已提交
1420
  int8_t callType = TSDB_UDF_CALL_AGG_FIN;
S
slzhou 已提交
1421 1422 1423 1424
  int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData);
  return err;
}

1425
// Invokes the scalar udf behind `handle` on `numOfCols` input parameters.
//
// The parameters are wrapped into a temporary data block, sent to udfd, and the
// single result column is exposed through `output`.
//
// Returns 0 on success, otherwise the first error reported by the input
// conversion, the call itself, or the result conversion.
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam* output) {
  SSDataBlock inputBlock = {0};
  SSDataBlock resultBlock = {0};

  int32_t err = convertScalarParamToDataBlock(input, numOfCols, &inputBlock);
  if (err == 0) {
    err = callUdf(handle, TSDB_UDF_CALL_SCALA_PROC, &inputBlock, NULL, NULL, &resultBlock, NULL);
  }
  if (err == 0) {
    // output->columnData points into resultBlock.pDataBlock, so that array
    // must stay alive and is not released here.
    err = convertDataBlockToScalarParm(&resultBlock, output);
  }

  // The input array only holds shallow column descriptors created by
  // convertScalarParamToDataBlock and is no longer needed after the call.
  taosArrayDestroy(inputBlock.pDataBlock);
  return err;
}

1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479
// Ordering callback for udf stubs: compares two SUdfcFuncStub elements by
// udf name using strcmp.
int compareUdfcFuncSub(const void* elem1, const void* elem2) {
  const SUdfcFuncStub *lhs = (const SUdfcFuncStub *)elem1;
  const SUdfcFuncStub *rhs = (const SUdfcFuncStub *)elem2;
  return strcmp(lhs->udfName, rhs->udfName);
}

// Returns the handle of the udf named `udfName`, setting the udf up on first use.
//
// Handles are cached in gUdfdProxy.udfStubs, which is kept sorted by name; the
// cache is searched and updated under udfStubsMutex.
//
// pHandle receives the handle on success and NULL on failure.
// Returns 0 on success or the error code of doSetupUdf.
int32_t setupUdf(char* udfName, UdfcFuncHandle* pHandle) {
  int32_t code = 0;

  // Bounded copy: the name field has a fixed size. `key` is zero-initialised
  // and the last byte is never written, so the key stays NUL-terminated.
  SUdfcFuncStub key = {0};
  strncpy(key.udfName, udfName, sizeof(key.udfName) - 1);

  uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
  SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
  if (foundStub != NULL) {
    // Read the cached entry while still holding the lock; the cache can be
    // cleared by the event-loop thread (see onUdfcPipeClose).
    *pHandle = foundStub->handle;
    uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
    return 0;
  }

  *pHandle = NULL;
  code = doSetupUdf(udfName, pHandle);
  if (code == TSDB_CODE_SUCCESS) {
    SUdfcFuncStub stub = key;
    stub.handle = *pHandle;
    taosArrayPush(gUdfdProxy.udfStubs, &stub);
    taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub);
  } else {
    *pHandle = NULL;
  }

  uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
  return code;
}

// Calls the scalar udf named `udfName`, setting it up first if necessary.
// Returns the setup error if the handle cannot be obtained, otherwise the
// result code of the call.
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
  UdfcFuncHandle handle = NULL;

  int32_t code = setupUdf(udfName, &handle);
  if (code == 0) {
    code = doCallUdfScalarFunc(handle, input, numOfCols, output);
  }
  return code;
}

1480
//TODO: when to teardown udf. teardown udf is not called
1481
int32_t doTeardownUdf(UdfcFuncHandle handle) {
1482
  fnInfo("tear down udf. udf func handle: %p", handle);
1483

S
slzhou 已提交
1484 1485 1486
  SClientUdfUvSession *session = (SClientUdfUvSession *) handle;
  if (session->udfUvPipe == NULL) {
    fnError("pipe to udfd does not exist");
1487
    return TSDB_CODE_UDF_PIPE_NO_PIPE;
S
slzhou 已提交
1488 1489 1490
  }

  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
1491
  task->errCode = 0;
S
slzhou 已提交
1492
  task->session = session;
1493 1494 1495 1496 1497
  task->type = UDF_TASK_TEARDOWN;

  SUdfTeardownRequest *req = &task->_teardown.req;
  req->udfHandle = task->session->severHandle;

S
slzhou 已提交
1498
  udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
1499 1500 1501 1502 1503

  SUdfTeardownResponse *rsp = &task->_teardown.rsp;

  int32_t err = task->errCode;

S
slzhou 已提交
1504
  udfcRunUdfUvTask(task, UV_TASK_DISCONNECT);
1505

wafwerar's avatar
wafwerar 已提交
1506 1507
  taosMemoryFree(task->session);
  taosMemoryFree(task);
1508 1509 1510

  return err;
}
S
shenglian zhou 已提交
1511

S
shenglian zhou 已提交
1512 1513
//memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
typedef struct SUdfAggRes {
S
slzhou 已提交
1514
  SClientUdfUvSession *session;
S
shenglian zhou 已提交
1515 1516 1517 1518 1519 1520
  int8_t finalResNum;
  int8_t interResNum;
  char* finalResBuf;
  char* interResBuf;
} SUdfAggRes;

S
shenglian zhou 已提交
1521
// Reports the memory an aggregate udf needs per result row: the SUdfAggRes
// header plus the result bytes plus the udf's intermediate buffer size.
// Returns false for scalar functions, true otherwise.
bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
  const bool isScalar = fmIsScalarFunc(pFunc->funcId);
  if (!isScalar) {
    pEnv->calcMemSize = sizeof(SUdfAggRes) + pFunc->node.resType.bytes + pFunc->udfBufSize;
  }
  return !isScalar;
}

bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo) {
  if (functionSetup(pCtx, pResultCellInfo) != true) {
    return false;
  }
  UdfcFuncHandle handle;
1534
  int32_t udfCode = 0;
1535 1536
  if ((udfCode = setupUdf((char *)pCtx->udfName, &handle)) != 0) {
    fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode);
S
shenglian zhou 已提交
1537 1538
    return false;
  }
S
slzhou 已提交
1539
  SClientUdfUvSession *session = (SClientUdfUvSession *)handle;
S
shenglian zhou 已提交
1540 1541
  SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo);
  int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize;
S
shenglian zhou 已提交
1542
  memset(udfRes, 0, envSize);
S
shenglian zhou 已提交
1543

S
slzhou 已提交
1544 1545 1546
  udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
  udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;

S
slzhou 已提交
1547
  udfRes->session = (SClientUdfUvSession *)handle;
S
shenglian zhou 已提交
1548
  SUdfInterBuf buf = {0};
1549 1550
  if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
    fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode);
S
shenglian zhou 已提交
1551 1552
    return false;
  }
S
shenglian zhou 已提交
1553
  udfRes->interResNum = buf.numOfResult;
S
slzhou 已提交
1554
  memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
S
shenglian zhou 已提交
1555 1556 1557 1558 1559 1560 1561
  return true;
}

int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t numOfCols = pInput->numOfInputCols;

S
slzhou 已提交
1562
  SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
S
slzhou 已提交
1563
  SClientUdfUvSession *session = udfRes->session;
1564
  if (session == NULL) {
1565
    return TSDB_CODE_UDF_NO_FUNC_HANDLE;
1566
  }
S
slzhou 已提交
1567 1568
  udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
  udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
S
shenglian zhou 已提交
1569 1570 1571 1572 1573

  int32_t start = pInput->startRowIndex;
  int32_t numOfRows = pInput->numOfRows;


1574 1575
  SSDataBlock tempBlock = {0};
  tempBlock.info.numOfCols = numOfCols;
1576
  tempBlock.info.rows = pInput->totalRows;
1577
  tempBlock.info.uid = pInput->uid;
S
shenglian zhou 已提交
1578
  bool hasVarCol = false;
1579
  tempBlock.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
S
shenglian zhou 已提交
1580 1581 1582 1583 1584 1585

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData *col = pInput->pData[i];
    if (IS_VAR_DATA_TYPE(col->info.type)) {
      hasVarCol = true;
    }
1586
    taosArrayPush(tempBlock.pDataBlock, col);
S
shenglian zhou 已提交
1587
  }
1588
  tempBlock.info.hasVarCol = hasVarCol;
S
shenglian zhou 已提交
1589

1590 1591
  SSDataBlock *inputBlock = blockDataExtractBlock(&tempBlock, start, numOfRows);

S
slzhou 已提交
1592
  SUdfInterBuf state = {.buf = udfRes->interResBuf,
1593
                        .bufLen = session->bufSize,
S
slzhou 已提交
1594
                        .numOfResult = udfRes->interResNum};
S
shenglian zhou 已提交
1595
  SUdfInterBuf newState = {0};
1596

1597
  int32_t udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState);
1598 1599 1600 1601 1602 1603 1604
  if (udfCode != 0) {
    fnError("udfAggProcess error. code: %d", udfCode);
    newState.numOfResult = 0;
  } else {
    udfRes->interResNum = newState.numOfResult;
    memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
  }
S
slzhou 已提交
1605 1606 1607
  if (newState.numOfResult == 1 || state.numOfResult == 1) {
    GET_RES_INFO(pCtx)->numOfRes = 1;
  }
S
shenglian zhou 已提交
1608

1609 1610
  blockDataDestroy(inputBlock);
  taosArrayDestroy(tempBlock.pDataBlock);
S
shenglian zhou 已提交
1611 1612

  taosMemoryFree(newState.buf);
1613
  return udfCode;
S
shenglian zhou 已提交
1614 1615 1616
}

// Finalizes the aggregate udf for the current result row and writes the final
// result into `pBlock`.
//
// The current intermediate state is sent to udfd; the returned final result is
// copied into the final-result region (at most session->outputLen bytes) and
// then emitted through functionFinalizeWithResultBuf.
//
// Returns the value of functionFinalizeWithResultBuf when the udf call
// succeeded, TSDB_CODE_UDF_NO_FUNC_HANDLE when the row state has no session,
// -1 when udfd returns more bytes than the output length, or the error code of
// the call.
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
  SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
  SClientUdfUvSession *session = udfRes->session;
  if (session == NULL) {
    return TSDB_CODE_UDF_NO_FUNC_HANDLE;
  }
  // Re-derive the buffer pointers from the struct address.
  udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
  udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;

  SUdfInterBuf resultBuf = {0};
  SUdfInterBuf state = {.buf = udfRes->interResBuf,
                        .bufLen = session->bufSize,
                        .numOfResult = udfRes->interResNum};

  int32_t udfCallCode = doCallUdfAggFinalize(session, &state, &resultBuf);
  if (udfCallCode == 0 && resultBuf.bufLen > session->outputLen) {
    // The final-result region is session->outputLen bytes long; never copy more.
    udfCallCode = -1;
  }
  if (udfCallCode != 0) {
    fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode);
    GET_RES_INFO(pCtx)->numOfRes = 0;
  } else {
    // Copy exactly what was returned rather than outputLen bytes, so the
    // returned buffer is never read past its end.
    if (resultBuf.bufLen > 0) {
      memcpy(udfRes->finalResBuf, resultBuf.buf, resultBuf.bufLen);
    }
    udfRes->finalResNum = resultBuf.numOfResult;
    GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
  }

  // The returned buffer is heap memory owned by the caller, as in udfAggProcess.
  taosMemoryFree(resultBuf.buf);

  // The udf is intentionally not torn down here (doTeardownUdf is not invoked).

  int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
  return udfCallCode == 0 ? numOfResults : udfCallCode;
}