tudf.c 60.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#include "uv.h"
H
Hongze Cheng 已提交
16

17
#include "os.h"
H
Hongze Cheng 已提交
18 19

#include "builtinsimpl.h"
S
slzhou 已提交
20
#include "fnLog.h"
H
Hongze Cheng 已提交
21 22
#include "functionMgt.h"
#include "querynodes.h"
S
shenglian zhou 已提交
23
#include "tarray.h"
S
slzhou 已提交
24
#include "tdatablock.h"
H
Hongze Cheng 已提交
25 26 27
#include "tglobal.h"
#include "tudf.h"
#include "tudfInt.h"
28

29
typedef struct SUdfdData {
H
Hongze Cheng 已提交
30 31 32 33 34 35
  bool         startCalled;
  bool         needCleanUp;
  uv_loop_t    loop;
  uv_thread_t  thread;
  uv_barrier_t barrier;
  uv_process_t process;
36
#ifdef WINDOWS
H
Hongze Cheng 已提交
37
  HANDLE jobHandle;
38
#endif
H
Hongze Cheng 已提交
39 40 41 42
  int        spawnErr;
  uv_pipe_t  ctrlPipe;
  uv_async_t stopAsync;
  int32_t    stopCalled;
43

H
Hongze Cheng 已提交
44
  int32_t dnodeId;
45 46 47 48
} SUdfdData;

SUdfdData udfdGlobal = {0};

S
shenglian zhou 已提交
49 50 51
int32_t udfStartUdfd(int32_t startDnodeId);
int32_t udfStopUdfd();

52
static int32_t udfSpawnUdfd(SUdfdData *pData);
H
Hongze Cheng 已提交
53 54 55 56 57
void           udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal);
static int32_t udfSpawnUdfd(SUdfdData *pData);
static void    udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg);
static void    udfUdfdStopAsyncCb(uv_async_t *async);
static void    udfWatchUdfd(void *args);
58 59 60 61 62 63 64 65 66 67 68 69

void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
  fnInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
  SUdfdData *pData = process->data;
  if (exitStatus == 0 && termSignal == 0 || atomic_load_32(&pData->stopCalled)) {
    fnInfo("udfd process exit due to SIGINT or dnode-mgmt called stop");
  } else {
    fnInfo("udfd process restart");
    udfSpawnUdfd(pData);
  }
}

H
Hongze Cheng 已提交
70
static int32_t udfSpawnUdfd(SUdfdData *pData) {
S
Shengliang Guan 已提交
71
  fnInfo("start to init udfd");
72 73 74 75 76
  uv_process_options_t options = {0};

  char path[PATH_MAX] = {0};
  if (tsProcPath == NULL) {
    path[0] = '.';
H
Hongze Cheng 已提交
77
#ifdef WINDOWS
wafwerar's avatar
wafwerar 已提交
78 79
    GetModuleFileName(NULL, path, PATH_MAX);
    taosDirName(path);
H
Hongze Cheng 已提交
80
#elif defined(_TD_DARWIN_64)
wafwerar's avatar
wafwerar 已提交
81 82 83
    uint32_t pathSize = sizeof(path);
    _NSGetExecutablePath(path, &pathSize);
    taosDirName(path);
H
Hongze Cheng 已提交
84
#endif
85
  } else {
86
    strncpy(path, tsProcPath, PATH_MAX);
87 88 89
    taosDirName(path);
  }
#ifdef WINDOWS
H
Hongze Cheng 已提交
90
  if (strlen(path) == 0) {
wafwerar's avatar
wafwerar 已提交
91 92 93 94
    strcat(path, "udfd.exe");
  } else {
    strcat(path, "\\udfd.exe");
  }
95 96 97
#else
  strcat(path, "/udfd");
#endif
H
Hongze Cheng 已提交
98
  char *argsUdfd[] = {path, "-c", configDir, NULL};
99 100 101 102 103 104 105 106 107
  options.args = argsUdfd;
  options.file = path;

  options.exit_cb = udfUdfdExit;

  uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1);

  uv_stdio_container_t child_stdio[3];
  child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
H
Hongze Cheng 已提交
108
  child_stdio[0].data.stream = (uv_stream_t *)&pData->ctrlPipe;
109 110 111 112 113 114 115 116 117 118 119 120 121
  child_stdio[1].flags = UV_IGNORE;
  child_stdio[2].flags = UV_INHERIT_FD;
  child_stdio[2].data.fd = 2;
  options.stdio_count = 3;
  options.stdio = child_stdio;

  options.flags = UV_PROCESS_DETACHED;

  char dnodeIdEnvItem[32] = {0};
  char thrdPoolSizeEnvItem[32] = {0};
  snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId);
  float numCpuCores = 4;
  taosGetCpuCores(&numCpuCores);
H
Hongze Cheng 已提交
122 123
  snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores * 2);
  char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, NULL};
124 125 126
  options.env = envUdfd;

  int err = uv_spawn(&pData->loop, &pData->process, &options);
H
Hongze Cheng 已提交
127
  pData->process.data = (void *)pData;
128

129 130 131 132 133 134 135 136 137 138 139
#ifdef WINDOWS
  // End udfd.exe by Job.
  if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle);
  pData->jobHandle = CreateJobObject(NULL, NULL);
  bool add_job_ok = AssignProcessToJobObject(pData->jobHandle, pData->process.process_handle);
  if (!add_job_ok) {
    fnError("Assign udfd to job failed.");
  } else {
    JOBOBJECT_EXTENDED_LIMIT_INFORMATION limit_info;
    memset(&limit_info, 0x0, sizeof(limit_info));
    limit_info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
H
Hongze Cheng 已提交
140 141
    bool set_auto_kill_ok =
        SetInformationJobObject(pData->jobHandle, JobObjectExtendedLimitInformation, &limit_info, sizeof(limit_info));
142 143 144 145 146 147
    if (!set_auto_kill_ok) {
      fnError("Set job auto kill udfd failed.");
    }
  }
#endif

148 149
  if (err != 0) {
    fnError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
S
Shengliang Guan 已提交
150 151
  } else {
    fnInfo("udfd is initialized");
152 153 154 155
  }
  return err;
}

H
Hongze Cheng 已提交
156
static void udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg) {
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
  if (!uv_is_closing(handle)) {
    uv_close(handle, NULL);
  }
}

static void udfUdfdStopAsyncCb(uv_async_t *async) {
  SUdfdData *pData = async->data;
  uv_stop(&pData->loop);
}

static void udfWatchUdfd(void *args) {
  SUdfdData *pData = args;
  uv_loop_init(&pData->loop);
  uv_async_init(&pData->loop, &pData->stopAsync, udfUdfdStopAsyncCb);
  pData->stopAsync.data = pData;
  int32_t err = udfSpawnUdfd(pData);
  atomic_store_32(&pData->spawnErr, err);
  uv_barrier_wait(&pData->barrier);
  uv_run(&pData->loop, UV_RUN_DEFAULT);
  uv_loop_close(&pData->loop);

  uv_walk(&pData->loop, udfUdfdCloseWalkCb, NULL);
  uv_run(&pData->loop, UV_RUN_DEFAULT);
  uv_loop_close(&pData->loop);
  return;
}

int32_t udfStartUdfd(int32_t startDnodeId) {
S
slzhou 已提交
185
  if (!tsStartUdfd) {
H
Hongze Cheng 已提交
186
    fnInfo("start udfd is disabled.") return 0;
S
slzhou 已提交
187
  }
188 189
  SUdfdData *pData = &udfdGlobal;
  if (pData->startCalled) {
S
Shengliang Guan 已提交
190
    fnInfo("dnode start udfd already called");
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
    return 0;
  }
  pData->startCalled = true;
  char dnodeId[8] = {0};
  snprintf(dnodeId, sizeof(dnodeId), "%d", startDnodeId);
  uv_os_setenv("DNODE_ID", dnodeId);
  pData->dnodeId = startDnodeId;

  uv_barrier_init(&pData->barrier, 2);
  uv_thread_create(&pData->thread, udfWatchUdfd, pData);
  uv_barrier_wait(&pData->barrier);
  int32_t err = atomic_load_32(&pData->spawnErr);
  if (err != 0) {
    uv_barrier_destroy(&pData->barrier);
    uv_async_send(&pData->stopAsync);
    uv_thread_join(&pData->thread);
    pData->needCleanUp = false;
S
Shengliang Guan 已提交
208
    fnInfo("udfd is cleaned up after spawn err");
209 210 211 212 213 214 215 216
  } else {
    pData->needCleanUp = true;
  }
  return err;
}

int32_t udfStopUdfd() {
  SUdfdData *pData = &udfdGlobal;
H
Hongze Cheng 已提交
217
  fnInfo("udfd start to stop, need cleanup:%d, spawn err:%d", pData->needCleanUp, pData->spawnErr);
218 219 220 221 222 223 224 225
  if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) {
    return 0;
  }
  atomic_store_32(&pData->stopCalled, 1);
  pData->needCleanUp = false;
  uv_barrier_destroy(&pData->barrier);
  uv_async_send(&pData->stopAsync);
  uv_thread_join(&pData->thread);
226 227 228
#ifdef WINDOWS
  if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle);
#endif
S
Shengliang Guan 已提交
229
  fnInfo("udfd is cleaned up");
230 231 232 233
  return 0;
}

//==============================================================================================
S
shenglian zhou 已提交
234 235 236 237
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
 * The QUEUE is copied from queue.h under libuv
 * */

S
shenglian zhou 已提交
238 239 240
typedef void *QUEUE[2];

/* Private macros. */
H
Hongze Cheng 已提交
241 242 243 244
#define QUEUE_NEXT(q)      (*(QUEUE **)&((*(q))[0]))
#define QUEUE_PREV(q)      (*(QUEUE **)&((*(q))[1]))
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
S
shenglian zhou 已提交
245 246

/* Public macros. */
H
Hongze Cheng 已提交
247
#define QUEUE_DATA(ptr, type, field) ((type *)((char *)(ptr)-offsetof(type, field)))
S
shenglian zhou 已提交
248 249 250 251

/* Important note: mutating the list while QUEUE_FOREACH is
 * iterating over its elements results in undefined behavior.
 */
H
Hongze Cheng 已提交
252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314
#define QUEUE_FOREACH(q, h) for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q))

#define QUEUE_EMPTY(q) ((const QUEUE *)(q) == (const QUEUE *)QUEUE_NEXT(q))

#define QUEUE_HEAD(q) (QUEUE_NEXT(q))

#define QUEUE_INIT(q)    \
  do {                   \
    QUEUE_NEXT(q) = (q); \
    QUEUE_PREV(q) = (q); \
  } while (0)

#define QUEUE_ADD(h, n)                 \
  do {                                  \
    QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n); \
    QUEUE_NEXT_PREV(n) = QUEUE_PREV(h); \
    QUEUE_PREV(h) = QUEUE_PREV(n);      \
    QUEUE_PREV_NEXT(h) = (h);           \
  } while (0)

#define QUEUE_SPLIT(h, q, n)       \
  do {                             \
    QUEUE_PREV(n) = QUEUE_PREV(h); \
    QUEUE_PREV_NEXT(n) = (n);      \
    QUEUE_NEXT(n) = (q);           \
    QUEUE_PREV(h) = QUEUE_PREV(q); \
    QUEUE_PREV_NEXT(h) = (h);      \
    QUEUE_PREV(q) = (n);           \
  } while (0)

#define QUEUE_MOVE(h, n)        \
  do {                          \
    if (QUEUE_EMPTY(h))         \
      QUEUE_INIT(n);            \
    else {                      \
      QUEUE *q = QUEUE_HEAD(h); \
      QUEUE_SPLIT(h, q, n);     \
    }                           \
  } while (0)

#define QUEUE_INSERT_HEAD(h, q)    \
  do {                             \
    QUEUE_NEXT(q) = QUEUE_NEXT(h); \
    QUEUE_PREV(q) = (h);           \
    QUEUE_NEXT_PREV(q) = (q);      \
    QUEUE_NEXT(h) = (q);           \
  } while (0)

#define QUEUE_INSERT_TAIL(h, q)    \
  do {                             \
    QUEUE_NEXT(q) = (h);           \
    QUEUE_PREV(q) = QUEUE_PREV(h); \
    QUEUE_PREV_NEXT(q) = (q);      \
    QUEUE_PREV(h) = (q);           \
  } while (0)

#define QUEUE_REMOVE(q)                 \
  do {                                  \
    QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q); \
    QUEUE_NEXT_PREV(q) = QUEUE_PREV(q); \
  } while (0)

enum { UV_TASK_CONNECT = 0, UV_TASK_REQ_RSP = 1, UV_TASK_DISCONNECT = 2 };
315

316
int64_t gUdfTaskSeqNum = 0;
317
typedef struct SUdfcFuncStub {
S
slzhou 已提交
318
  char           udfName[TSDB_FUNC_NAME_LEN + 1];
319
  UdfcFuncHandle handle;
H
Hongze Cheng 已提交
320 321
  int32_t        refCount;
  int64_t        lastRefTime;
322 323
} SUdfcFuncStub;

324
typedef struct SUdfcProxy {
H
Hongze Cheng 已提交
325
  char         udfdPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
326
  uv_barrier_t initBarrier;
327

328 329 330
  uv_loop_t   uvLoop;
  uv_thread_t loopThread;
  uv_async_t  loopTaskAync;
331

332
  uv_async_t loopStopAsync;
333

334 335 336 337
  uv_mutex_t taskQueueMutex;
  int8_t     udfcState;
  QUEUE      taskQueue;
  QUEUE      uvProcTaskQueue;
338

339
  uv_mutex_t udfStubsMutex;
H
Hongze Cheng 已提交
340
  SArray    *udfStubs;  // SUdfcFuncStub
341

342
  uv_mutex_t udfcUvMutex;
343
  int8_t initialized;
344
} SUdfcProxy;
345

346
SUdfcProxy gUdfcProxy = {0};
347

S
slzhou 已提交
348
typedef struct SUdfcUvSession {
349
  SUdfcProxy *udfc;
H
Hongze Cheng 已提交
350 351
  int64_t     severHandle;
  uv_pipe_t  *udfUvPipe;
S
shenglian zhou 已提交
352 353 354 355

  int8_t  outputType;
  int32_t outputLen;
  int32_t bufSize;
S
slzhou 已提交
356

S
slzhou 已提交
357
  char udfName[TSDB_FUNC_NAME_LEN + 1];
S
slzhou 已提交
358
} SUdfcUvSession;
359 360

typedef struct SClientUvTaskNode {
361
  SUdfcProxy *udfc;
H
Hongze Cheng 已提交
362 363
  int8_t      type;
  int         errCode;
364 365 366

  uv_pipe_t *pipe;

H
Hongze Cheng 已提交
367
  int64_t  seqNum;
368 369 370 371 372
  uv_buf_t reqBuf;

  uv_sem_t taskSem;
  uv_buf_t rspBuf;

S
shenglian zhou 已提交
373 374 375
  QUEUE recvTaskQueue;
  QUEUE procTaskQueue;
  QUEUE connTaskQueue;
376 377 378 379 380
} SClientUvTaskNode;

typedef struct SClientUdfTask {
  int8_t type;

S
slzhou 已提交
381
  SUdfcUvSession *session;
382 383 384 385 386

  int32_t errCode;

  union {
    struct {
H
Hongze Cheng 已提交
387
      SUdfSetupRequest  req;
388 389 390
      SUdfSetupResponse rsp;
    } _setup;
    struct {
H
Hongze Cheng 已提交
391
      SUdfCallRequest  req;
392 393 394
      SUdfCallResponse rsp;
    } _call;
    struct {
H
Hongze Cheng 已提交
395
      SUdfTeardownRequest  req;
396 397 398 399 400 401 402
      SUdfTeardownResponse rsp;
    } _teardown;
  };

} SClientUdfTask;

typedef struct SClientConnBuf {
H
Hongze Cheng 已提交
403
  char   *buf;
404 405 406 407 408 409
  int32_t len;
  int32_t cap;
  int32_t total;
} SClientConnBuf;

typedef struct SClientUvConn {
H
Hongze Cheng 已提交
410 411 412
  uv_pipe_t      *pipe;
  QUEUE           taskQueue;
  SClientConnBuf  readBuf;
S
slzhou 已提交
413
  SUdfcUvSession *session;
414 415
} SClientUvConn;

416
enum {
H
Hongze Cheng 已提交
417 418 419 420
  UDFC_STATE_INITAL = 0,  // initial state
  UDFC_STATE_STARTNG,     // starting after udfcOpen
  UDFC_STATE_READY,       // started and begin to receive quests
  UDFC_STATE_STOPPING,    // stopping after udfcClose
421
};
422

H
Hongze Cheng 已提交
423
int32_t getUdfdPipeName(char *pipeName, int32_t size);
S
shenglian zhou 已提交
424
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup);
H
Hongze Cheng 已提交
425 426 427
void   *decodeUdfSetupRequest(const void *buf, SUdfSetupRequest *request);
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf *state);
void   *decodeUdfInterBuf(const void *buf, SUdfInterBuf *state);
S
shenglian zhou 已提交
428
int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call);
H
Hongze Cheng 已提交
429
void   *decodeUdfCallRequest(const void *buf, SUdfCallRequest *call);
S
shenglian zhou 已提交
430
int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown);
H
Hongze Cheng 已提交
431 432 433
void   *decodeUdfTeardownRequest(const void *buf, SUdfTeardownRequest *teardown);
int32_t encodeUdfRequest(void **buf, const SUdfRequest *request);
void   *decodeUdfRequest(const void *buf, SUdfRequest *request);
S
shenglian zhou 已提交
434
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp);
H
Hongze Cheng 已提交
435
void   *decodeUdfSetupResponse(const void *buf, SUdfSetupResponse *setupRsp);
S
shenglian zhou 已提交
436
int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp);
H
Hongze Cheng 已提交
437 438 439 440 441 442 443 444 445
void   *decodeUdfCallResponse(const void *buf, SUdfCallResponse *callRsp);
int32_t encodeUdfTeardownResponse(void **buf, const SUdfTeardownResponse *teardownRsp);
void   *decodeUdfTeardownResponse(const void *buf, SUdfTeardownResponse *teardownResponse);
int32_t encodeUdfResponse(void **buf, const SUdfResponse *rsp);
void   *decodeUdfResponse(const void *buf, SUdfResponse *rsp);
void    freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta);
void    freeUdfColumn(SUdfColumn *col);
void    freeUdfDataDataBlock(SUdfDataBlock *block);
void    freeUdfInterBuf(SUdfInterBuf *buf);
S
shenglian zhou 已提交
446 447 448 449 450
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock);
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block);
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output);
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output);

H
Hongze Cheng 已提交
451
int32_t getUdfdPipeName(char *pipeName, int32_t size) {
452
  char    dnodeId[8] = {0};
wafwerar's avatar
wafwerar 已提交
453
  size_t  dnodeIdSize = sizeof(dnodeId);
454 455
  int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize);
  if (err != 0) {
S
Shengliang Guan 已提交
456
    fnError("failed to get dnodeId from env since %s", uv_err_name(err));
457 458
    dnodeId[0] = '1';
  }
459
#ifdef _WIN32
H
Hongze Cheng 已提交
460 461
  snprintf(pipeName, size, "%s.%x.%s", UDF_LISTEN_PIPE_NAME_PREFIX, MurmurHash3_32(tsDataDir, strlen(tsDataDir)),
           dnodeId);
462 463 464
#else
  snprintf(pipeName, size, "%s/%s%s", tsDataDir, UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
#endif
S
Shengliang Guan 已提交
465
  fnInfo("get dnodeId:%s from env, pipe path:%s", dnodeId, pipeName);
466 467 468
  return 0;
}

469 470 471
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) {
  int32_t len = 0;
  len += taosEncodeBinary(buf, setup->udfName, TSDB_FUNC_NAME_LEN);
S
shenglian zhou 已提交
472 473
  return len;
}
474

H
Hongze Cheng 已提交
475
void *decodeUdfSetupRequest(const void *buf, SUdfSetupRequest *request) {
476
  buf = taosDecodeBinaryTo(buf, request->udfName, TSDB_FUNC_NAME_LEN);
H
Hongze Cheng 已提交
477
  return (void *)buf;
S
shenglian zhou 已提交
478
}
479

H
Hongze Cheng 已提交
480
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf *state) {
481
  int32_t len = 0;
482
  len += taosEncodeFixedI8(buf, state->numOfResult);
483 484
  len += taosEncodeFixedI32(buf, state->bufLen);
  len += taosEncodeBinary(buf, state->buf, state->bufLen);
S
shenglian zhou 已提交
485 486
  return len;
}
487

H
Hongze Cheng 已提交
488
void *decodeUdfInterBuf(const void *buf, SUdfInterBuf *state) {
489
  buf = taosDecodeFixedI8(buf, &state->numOfResult);
490
  buf = taosDecodeFixedI32(buf, &state->bufLen);
H
Hongze Cheng 已提交
491 492
  buf = taosDecodeBinary(buf, (void **)&state->buf, state->bufLen);
  return (void *)buf;
S
shenglian zhou 已提交
493 494
}

495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510
int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call) {
  int32_t len = 0;
  len += taosEncodeFixedI64(buf, call->udfHandle);
  len += taosEncodeFixedI8(buf, call->callType);
  if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
    len += tEncodeDataBlock(buf, &call->block);
  } else if (call->callType == TSDB_UDF_CALL_AGG_INIT) {
    len += taosEncodeFixedI8(buf, call->initFirst);
  } else if (call->callType == TSDB_UDF_CALL_AGG_PROC) {
    len += tEncodeDataBlock(buf, &call->block);
    len += encodeUdfInterBuf(buf, &call->interBuf);
  } else if (call->callType == TSDB_UDF_CALL_AGG_MERGE) {
    len += encodeUdfInterBuf(buf, &call->interBuf);
    len += encodeUdfInterBuf(buf, &call->interBuf2);
  } else if (call->callType == TSDB_UDF_CALL_AGG_FIN) {
    len += encodeUdfInterBuf(buf, &call->interBuf);
511
  }
512
  return len;
513 514
}

H
Hongze Cheng 已提交
515
void *decodeUdfCallRequest(const void *buf, SUdfCallRequest *call) {
516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535
  buf = taosDecodeFixedI64(buf, &call->udfHandle);
  buf = taosDecodeFixedI8(buf, &call->callType);
  switch (call->callType) {
    case TSDB_UDF_CALL_SCALA_PROC:
      buf = tDecodeDataBlock(buf, &call->block);
      break;
    case TSDB_UDF_CALL_AGG_INIT:
      buf = taosDecodeFixedI8(buf, &call->initFirst);
      break;
    case TSDB_UDF_CALL_AGG_PROC:
      buf = tDecodeDataBlock(buf, &call->block);
      buf = decodeUdfInterBuf(buf, &call->interBuf);
      break;
    case TSDB_UDF_CALL_AGG_MERGE:
      buf = decodeUdfInterBuf(buf, &call->interBuf);
      buf = decodeUdfInterBuf(buf, &call->interBuf2);
      break;
    case TSDB_UDF_CALL_AGG_FIN:
      buf = decodeUdfInterBuf(buf, &call->interBuf);
      break;
536
  }
H
Hongze Cheng 已提交
537
  return (void *)buf;
S
shenglian zhou 已提交
538 539
}

540 541 542 543
int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown) {
  int32_t len = 0;
  len += taosEncodeFixedI64(buf, teardown->udfHandle);
  return len;
S
shenglian zhou 已提交
544 545
}

H
Hongze Cheng 已提交
546
void *decodeUdfTeardownRequest(const void *buf, SUdfTeardownRequest *teardown) {
547
  buf = taosDecodeFixedI64(buf, &teardown->udfHandle);
H
Hongze Cheng 已提交
548
  return (void *)buf;
S
shenglian zhou 已提交
549 550
}

H
Hongze Cheng 已提交
551
int32_t encodeUdfRequest(void **buf, const SUdfRequest *request) {
552 553 554 555
  int32_t len = 0;
  if (buf == NULL) {
    len += sizeof(request->msgLen);
  } else {
H
Hongze Cheng 已提交
556
    *(int32_t *)(*buf) = request->msgLen;
557 558 559 560 561 562 563 564 565 566 567 568
    *buf = POINTER_SHIFT(*buf, sizeof(request->msgLen));
  }
  len += taosEncodeFixedI64(buf, request->seqNum);
  len += taosEncodeFixedI8(buf, request->type);
  if (request->type == UDF_TASK_SETUP) {
    len += encodeUdfSetupRequest(buf, &request->setup);
  } else if (request->type == UDF_TASK_CALL) {
    len += encodeUdfCallRequest(buf, &request->call);
  } else if (request->type == UDF_TASK_TEARDOWN) {
    len += encodeUdfTeardownRequest(buf, &request->teardown);
  }
  return len;
S
shenglian zhou 已提交
569 570
}

H
Hongze Cheng 已提交
571 572
void *decodeUdfRequest(const void *buf, SUdfRequest *request) {
  request->msgLen = *(int32_t *)(buf);
S
slzhou 已提交
573
  buf = POINTER_SHIFT(buf, sizeof(request->msgLen));
S
shenglian zhou 已提交
574

575 576
  buf = taosDecodeFixedI64(buf, &request->seqNum);
  buf = taosDecodeFixedI8(buf, &request->type);
S
shenglian zhou 已提交
577 578

  if (request->type == UDF_TASK_SETUP) {
579
    buf = decodeUdfSetupRequest(buf, &request->setup);
S
shenglian zhou 已提交
580
  } else if (request->type == UDF_TASK_CALL) {
581 582 583
    buf = decodeUdfCallRequest(buf, &request->call);
  } else if (request->type == UDF_TASK_TEARDOWN) {
    buf = decodeUdfTeardownRequest(buf, &request->teardown);
S
shenglian zhou 已提交
584
  }
H
Hongze Cheng 已提交
585
  return (void *)buf;
S
shenglian zhou 已提交
586
}
587

588 589 590
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) {
  int32_t len = 0;
  len += taosEncodeFixedI64(buf, setupRsp->udfHandle);
S
shenglian zhou 已提交
591 592 593
  len += taosEncodeFixedI8(buf, setupRsp->outputType);
  len += taosEncodeFixedI32(buf, setupRsp->outputLen);
  len += taosEncodeFixedI32(buf, setupRsp->bufSize);
594 595
  return len;
}
596

H
Hongze Cheng 已提交
597
void *decodeUdfSetupResponse(const void *buf, SUdfSetupResponse *setupRsp) {
598
  buf = taosDecodeFixedI64(buf, &setupRsp->udfHandle);
S
shenglian zhou 已提交
599 600 601
  buf = taosDecodeFixedI8(buf, &setupRsp->outputType);
  buf = taosDecodeFixedI32(buf, &setupRsp->outputLen);
  buf = taosDecodeFixedI32(buf, &setupRsp->bufSize);
H
Hongze Cheng 已提交
602
  return (void *)buf;
S
shenglian zhou 已提交
603
}
604

605 606 607 608 609 610
int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) {
  int32_t len = 0;
  len += taosEncodeFixedI8(buf, callRsp->callType);
  switch (callRsp->callType) {
    case TSDB_UDF_CALL_SCALA_PROC:
      len += tEncodeDataBlock(buf, &callRsp->resultData);
611
      break;
612
    case TSDB_UDF_CALL_AGG_INIT:
S
slzhou 已提交
613
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
614 615
      break;
    case TSDB_UDF_CALL_AGG_PROC:
S
slzhou 已提交
616
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
617 618
      break;
    case TSDB_UDF_CALL_AGG_MERGE:
S
slzhou 已提交
619
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
620 621
      break;
    case TSDB_UDF_CALL_AGG_FIN:
S
slzhou 已提交
622
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
623 624
      break;
  }
625
  return len;
S
shenglian zhou 已提交
626 627
}

H
Hongze Cheng 已提交
628
void *decodeUdfCallResponse(const void *buf, SUdfCallResponse *callRsp) {
629 630 631 632 633 634
  buf = taosDecodeFixedI8(buf, &callRsp->callType);
  switch (callRsp->callType) {
    case TSDB_UDF_CALL_SCALA_PROC:
      buf = tDecodeDataBlock(buf, &callRsp->resultData);
      break;
    case TSDB_UDF_CALL_AGG_INIT:
S
slzhou 已提交
635
      buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
636 637
      break;
    case TSDB_UDF_CALL_AGG_PROC:
S
slzhou 已提交
638
      buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
639 640
      break;
    case TSDB_UDF_CALL_AGG_MERGE:
S
slzhou 已提交
641
      buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
642 643
      break;
    case TSDB_UDF_CALL_AGG_FIN:
S
slzhou 已提交
644
      buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
645
      break;
S
shenglian zhou 已提交
646
  }
H
Hongze Cheng 已提交
647
  return (void *)buf;
S
shenglian zhou 已提交
648 649
}

H
Hongze Cheng 已提交
650
int32_t encodeUdfTeardownResponse(void **buf, const SUdfTeardownResponse *teardownRsp) { return 0; }
S
shenglian zhou 已提交
651

H
Hongze Cheng 已提交
652
void *decodeUdfTeardownResponse(const void *buf, SUdfTeardownResponse *teardownResponse) { return (void *)buf; }
S
shenglian zhou 已提交
653

H
Hongze Cheng 已提交
654
int32_t encodeUdfResponse(void **buf, const SUdfResponse *rsp) {
655 656 657 658
  int32_t len = 0;
  if (buf == NULL) {
    len += sizeof(rsp->msgLen);
  } else {
H
Hongze Cheng 已提交
659
    *(int32_t *)(*buf) = rsp->msgLen;
660
    *buf = POINTER_SHIFT(*buf, sizeof(rsp->msgLen));
S
shenglian zhou 已提交
661 662
  }

S
slzhou 已提交
663 664 665
  if (buf == NULL) {
    len += sizeof(rsp->seqNum);
  } else {
H
Hongze Cheng 已提交
666
    *(int64_t *)(*buf) = rsp->seqNum;
S
slzhou 已提交
667 668 669
    *buf = POINTER_SHIFT(*buf, sizeof(rsp->seqNum));
  }

670 671 672
  len += taosEncodeFixedI64(buf, rsp->seqNum);
  len += taosEncodeFixedI8(buf, rsp->type);
  len += taosEncodeFixedI32(buf, rsp->code);
S
shenglian zhou 已提交
673

674 675 676 677 678 679 680 681 682 683 684
  switch (rsp->type) {
    case UDF_TASK_SETUP:
      len += encodeUdfSetupResponse(buf, &rsp->setupRsp);
      break;
    case UDF_TASK_CALL:
      len += encodeUdfCallResponse(buf, &rsp->callRsp);
      break;
    case UDF_TASK_TEARDOWN:
      len += encodeUdfTeardownResponse(buf, &rsp->teardownRsp);
      break;
    default:
S
shenglian zhou 已提交
685
      fnError("encode udf response, invalid udf response type %d", rsp->type);
686 687 688
      break;
  }
  return len;
S
shenglian zhou 已提交
689 690
}

H
Hongze Cheng 已提交
691 692
void *decodeUdfResponse(const void *buf, SUdfResponse *rsp) {
  rsp->msgLen = *(int32_t *)(buf);
S
slzhou 已提交
693
  buf = POINTER_SHIFT(buf, sizeof(rsp->msgLen));
H
Hongze Cheng 已提交
694
  rsp->seqNum = *(int64_t *)(buf);
S
slzhou 已提交
695
  buf = POINTER_SHIFT(buf, sizeof(rsp->seqNum));
696 697 698
  buf = taosDecodeFixedI64(buf, &rsp->seqNum);
  buf = taosDecodeFixedI8(buf, &rsp->type);
  buf = taosDecodeFixedI32(buf, &rsp->code);
S
shenglian zhou 已提交
699

700 701 702 703 704 705 706 707 708 709 710
  switch (rsp->type) {
    case UDF_TASK_SETUP:
      buf = decodeUdfSetupResponse(buf, &rsp->setupRsp);
      break;
    case UDF_TASK_CALL:
      buf = decodeUdfCallResponse(buf, &rsp->callRsp);
      break;
    case UDF_TASK_TEARDOWN:
      buf = decodeUdfTeardownResponse(buf, &rsp->teardownRsp);
      break;
    default:
S
shenglian zhou 已提交
711
      fnError("decode udf response, invalid udf response type %d", rsp->type);
712
      break;
713
  }
H
Hongze Cheng 已提交
714
  return (void *)buf;
715
}
716

S
shenglian zhou 已提交
717 718
void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta) {
  if (IS_VAR_DATA_TYPE(meta->type)) {
S
slzhou 已提交
719 720 721 722
    taosMemoryFree(data->varLenCol.varOffsets);
    data->varLenCol.varOffsets = NULL;
    taosMemoryFree(data->varLenCol.payload);
    data->varLenCol.payload = NULL;
S
shenglian zhou 已提交
723
  } else {
S
slzhou 已提交
724 725 726 727
    taosMemoryFree(data->fixLenCol.nullBitmap);
    data->fixLenCol.nullBitmap = NULL;
    taosMemoryFree(data->fixLenCol.data);
    data->fixLenCol.data = NULL;
S
shenglian zhou 已提交
728 729 730
  }
}

H
Hongze Cheng 已提交
731
void freeUdfColumn(SUdfColumn *col) { freeUdfColumnData(&col->colData, &col->colMeta); }
S
shenglian zhou 已提交
732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747

void freeUdfDataDataBlock(SUdfDataBlock *block) {
  for (int32_t i = 0; i < block->numOfCols; ++i) {
    freeUdfColumn(block->udfCols[i]);
    taosMemoryFree(block->udfCols[i]);
    block->udfCols[i] = NULL;
  }
  taosMemoryFree(block->udfCols);
  block->udfCols = NULL;
}

void freeUdfInterBuf(SUdfInterBuf *buf) {
  taosMemoryFree(buf->buf);
  buf->buf = NULL;
}

S
slzhou 已提交
748 749
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock) {
  udfBlock->numOfRows = block->info.rows;
750
  udfBlock->numOfCols = taosArrayGetSize(block->pDataBlock);
H
Hongze Cheng 已提交
751
  udfBlock->udfCols = taosMemoryCalloc(taosArrayGetSize(block->pDataBlock), sizeof(SUdfColumn *));
S
slzhou 已提交
752
  for (int32_t i = 0; i < udfBlock->numOfCols; ++i) {
S
slzhou 已提交
753
    udfBlock->udfCols[i] = taosMemoryCalloc(1, sizeof(SUdfColumn));
H
Hongze Cheng 已提交
754 755
    SColumnInfoData *col = (SColumnInfoData *)taosArrayGet(block->pDataBlock, i);
    SUdfColumn      *udfCol = udfBlock->udfCols[i];
S
slzhou 已提交
756 757 758 759 760
    udfCol->colMeta.type = col->info.type;
    udfCol->colMeta.bytes = col->info.bytes;
    udfCol->colMeta.scale = col->info.scale;
    udfCol->colMeta.precision = col->info.precision;
    udfCol->colData.numOfRows = udfBlock->numOfRows;
S
slzhou@taodata.com 已提交
761
    udfCol->hasNull = col->hasNull;
S
shenglian zhou 已提交
762
    if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) {
S
slzhou 已提交
763 764 765 766 767 768
      udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows;
      udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen);
      memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen);
      udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, udfBlock->numOfRows);
      udfCol->colData.varLenCol.payload = taosMemoryMalloc(udfCol->colData.varLenCol.payloadLen);
      memcpy(udfCol->colData.varLenCol.payload, col->pData, udfCol->colData.varLenCol.payloadLen);
S
slzhou 已提交
769
    } else {
S
slzhou 已提交
770 771 772
      udfCol->colData.fixLenCol.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows);
      int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen;
      udfCol->colData.fixLenCol.nullBitmap = taosMemoryMalloc(udfCol->colData.fixLenCol.nullBitmapLen);
H
Hongze Cheng 已提交
773
      char *bitmap = udfCol->colData.fixLenCol.nullBitmap;
S
slzhou 已提交
774 775 776 777
      memcpy(bitmap, col->nullbitmap, bitmapLen);
      udfCol->colData.fixLenCol.dataLen = colDataGetLength(col, udfBlock->numOfRows);
      int32_t dataLen = udfCol->colData.fixLenCol.dataLen;
      udfCol->colData.fixLenCol.data = taosMemoryMalloc(udfCol->colData.fixLenCol.dataLen);
H
Hongze Cheng 已提交
778
      char *data = udfCol->colData.fixLenCol.data;
S
slzhou 已提交
779
      memcpy(data, col->pData, dataLen);
S
slzhou 已提交
780 781 782 783 784 785 786
    }
  }
  return 0;
}

int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
  block->info.rows = udfCol->colData.numOfRows;
S
shenglian zhou 已提交
787
  block->info.hasVarCol = IS_VAR_DATA_TYPE(udfCol->colMeta.type);
S
slzhou 已提交
788 789 790 791

  block->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
  taosArraySetSize(block->pDataBlock, 1);
  SColumnInfoData *col = taosArrayGet(block->pDataBlock, 0);
H
Hongze Cheng 已提交
792
  SUdfColumnMeta  *meta = &udfCol->colMeta;
S
slzhou 已提交
793 794 795 796
  col->info.precision = meta->precision;
  col->info.bytes = meta->bytes;
  col->info.scale = meta->scale;
  col->info.type = meta->type;
S
slzhou@taodata.com 已提交
797
  col->hasNull = udfCol->hasNull;
S
slzhou 已提交
798 799 800
  SUdfColumnData *data = &udfCol->colData;

  if (!IS_VAR_DATA_TYPE(meta->type)) {
S
slzhou 已提交
801 802 803 804
    col->nullbitmap = taosMemoryMalloc(data->fixLenCol.nullBitmapLen);
    memcpy(col->nullbitmap, data->fixLenCol.nullBitmap, data->fixLenCol.nullBitmapLen);
    col->pData = taosMemoryMalloc(data->fixLenCol.dataLen);
    memcpy(col->pData, data->fixLenCol.data, data->fixLenCol.dataLen);
S
slzhou 已提交
805
  } else {
S
slzhou 已提交
806 807 808 809
    col->varmeta.offset = taosMemoryMalloc(data->varLenCol.varOffsetsLen);
    memcpy(col->varmeta.offset, data->varLenCol.varOffsets, data->varLenCol.varOffsetsLen);
    col->pData = taosMemoryMalloc(data->varLenCol.payloadLen);
    memcpy(col->pData, data->varLenCol.payload, data->varLenCol.payloadLen);
S
slzhou 已提交
810 811 812 813
  }
  return 0;
}

S
slzhou 已提交
814 815 816
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output) {
  output->info.rows = input->numOfRows;
  output->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
817 818
  for (int32_t i = 0; i < numOfCols; ++i) {
    taosArrayPush(output->pDataBlock, (input + i)->columnData);
819

H
Hongze Cheng 已提交
820
    if (IS_VAR_DATA_TYPE((input + i)->columnData->info.type)) {
821 822
      output->info.hasVarCol = true;
    }
823
  }
S
slzhou 已提交
824 825 826 827
  return 0;
}

int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
828
  if (taosArrayGetSize(input->pDataBlock) != 1) {
S
slzhou 已提交
829 830 831 832
    fnError("scalar function only support one column");
    return -1;
  }
  output->numOfRows = input->info.rows;
S
slzhou 已提交
833 834

  output->columnData = taosMemoryMalloc(sizeof(SColumnInfoData));
H
Hongze Cheng 已提交
835 836
  memcpy(output->columnData, taosArrayGet(input->pDataBlock, 0), sizeof(SColumnInfoData));
  output->colAlloced = true;
S
slzhou 已提交
837

S
slzhou 已提交
838 839
  return 0;
}
S
slzhou 已提交
840

S
shenglian zhou 已提交
841
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
H
Hongze Cheng 已提交
842
// memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
S
shenglian zhou 已提交
843 844 845
typedef struct SUdfAggRes {
  int8_t finalResNum;
  int8_t interResNum;
H
Hongze Cheng 已提交
846 847
  char  *finalResBuf;
  char  *interResBuf;
S
shenglian zhou 已提交
848
} SUdfAggRes;
H
Hongze Cheng 已提交
849
void    onUdfcPipeClose(uv_handle_t *handle);
S
shenglian zhou 已提交
850
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask);
H
Hongze Cheng 已提交
851 852 853 854 855 856 857
void    udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf);
bool    isUdfcUvMsgComplete(SClientConnBuf *connBuf);
void    udfcUvHandleRsp(SClientUvConn *conn);
void    udfcUvHandleError(SClientUvConn *conn);
void    onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
void    onUdfcPipeWrite(uv_write_t *write, int status);
void    onUdfcPipeConnect(uv_connect_t *connect, int status);
S
shenglian zhou 已提交
858
int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode *uvTask);
S
shenglian zhou 已提交
859 860
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask);
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask);
H
Hongze Cheng 已提交
861 862 863 864
void    udfcAsyncTaskCb(uv_async_t *async);
void    cleanUpUvTasks(SUdfcProxy *udfc);
void    udfStopAsyncCb(uv_async_t *async);
void    constructUdfService(void *argsThread);
S
shenglian zhou 已提交
865 866
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType);
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle);
H
Hongze Cheng 已提交
867
int     compareUdfcFuncSub(const void *elem1, const void *elem2);
S
shenglian zhou 已提交
868
int32_t doTeardownUdf(UdfcFuncHandle handle);
869

S
shenglian zhou 已提交
870
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
H
Hongze Cheng 已提交
871
                SSDataBlock *output, SUdfInterBuf *newState);
S
shenglian zhou 已提交
872 873
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf);
int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState);
H
Hongze Cheng 已提交
874 875
int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2,
                          SUdfInterBuf *resultBuf);
S
shenglian zhou 已提交
876
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData);
H
Hongze Cheng 已提交
877
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
S
shenglian zhou 已提交
878 879 880 881 882
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output);

int32_t udfcOpen();
int32_t udfcClose();

H
Hongze Cheng 已提交
883 884
int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle);
void    releaseUdfFuncHandle(char *udfName);
S
shenglian zhou 已提交
885 886
int32_t cleanUpUdfs();

H
Hongze Cheng 已提交
887 888
bool    udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv);
bool    udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo);
S
shenglian zhou 已提交
889
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx);
H
Hongze Cheng 已提交
890
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
891

H
Hongze Cheng 已提交
892
int compareUdfcFuncSub(const void *elem1, const void *elem2) {
S
shenglian zhou 已提交
893 894 895
  SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1;
  SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2;
  return strcmp(stub1->udfName, stub2->udfName);
896 897
}

H
Hongze Cheng 已提交
898
int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
S
shenglian zhou 已提交
899
  int32_t code = 0;
900
  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
S
shenglian zhou 已提交
901
  SUdfcFuncStub key = {0};
S
slzhou 已提交
902
  strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN);
903
  int32_t stubIndex = taosArraySearchIdx(gUdfcProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
S
shenglian zhou 已提交
904
  if (stubIndex != -1) {
905
    SUdfcFuncStub *foundStub = taosArrayGet(gUdfcProxy.udfStubs, stubIndex);
S
shenglian zhou 已提交
906
    UdfcFuncHandle handle = foundStub->handle;
H
Hongze Cheng 已提交
907
    if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
S
shenglian zhou 已提交
908 909 910
      *pHandle = foundStub->handle;
      ++foundStub->refCount;
      foundStub->lastRefTime = taosGetTimestampUs();
911
      uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
S
shenglian zhou 已提交
912
      return 0;
913
    } else {
H
Hongze Cheng 已提交
914 915
      fnInfo("invalid handle for %s, refCount: %d, last ref time: %" PRId64 ". remove it from cache", udfName,
             foundStub->refCount, foundStub->lastRefTime);
916
      taosArrayRemove(gUdfcProxy.udfStubs, stubIndex);
917
    }
S
shenglian zhou 已提交
918 919 920 921 922 923 924 925 926
  }
  *pHandle = NULL;
  code = doSetupUdf(udfName, pHandle);
  if (code == TSDB_CODE_SUCCESS) {
    SUdfcFuncStub stub = {0};
    strcpy(stub.udfName, udfName);
    stub.handle = *pHandle;
    ++stub.refCount;
    stub.lastRefTime = taosGetTimestampUs();
927 928
    taosArrayPush(gUdfcProxy.udfStubs, &stub);
    taosArraySort(gUdfcProxy.udfStubs, compareUdfcFuncSub);
929
  } else {
S
shenglian zhou 已提交
930
    *pHandle = NULL;
931 932
  }

933
  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
S
shenglian zhou 已提交
934
  return code;
935 936
}

H
Hongze Cheng 已提交
937
void releaseUdfFuncHandle(char *udfName) {
938
  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
S
shenglian zhou 已提交
939
  SUdfcFuncStub key = {0};
S
slzhou 已提交
940
  strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN);
941
  SUdfcFuncStub *foundStub = taosArraySearch(gUdfcProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
S
shenglian zhou 已提交
942
  if (!foundStub) {
943
    uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
S
shenglian zhou 已提交
944
    return;
945
  }
S
shenglian zhou 已提交
946 947
  if (foundStub->refCount > 0) {
    --foundStub->refCount;
948
  }
949
  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
950 951
}

S
shenglian zhou 已提交
952
int32_t cleanUpUdfs() {
953
  int8_t initialized = atomic_load_8(&gUdfcProxy.initialized);
954 955 956 957
  if (!initialized) {
    return TSDB_CODE_SUCCESS;
  }

958 959 960
  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
  if (gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) {
    uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
961 962
    return TSDB_CODE_SUCCESS;
  }
H
Hongze Cheng 已提交
963
  SArray *udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
964
  int32_t i = 0;
965 966
  while (i < taosArrayGetSize(gUdfcProxy.udfStubs)) {
    SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.udfStubs, i);
S
shenglian zhou 已提交
967 968 969 970
    if (stub->refCount == 0) {
      fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
      doTeardownUdf(stub->handle);
    } else {
H
Hongze Cheng 已提交
971 972
      fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %" PRId64 ", handle: %p", stub->udfName,
             stub->refCount, stub->lastRefTime, stub->handle);
S
shenglian zhou 已提交
973
      UdfcFuncHandle handle = stub->handle;
H
Hongze Cheng 已提交
974
      if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
S
shenglian zhou 已提交
975
        taosArrayPush(udfStubs, stub);
976
      } else {
H
Hongze Cheng 已提交
977
        fnInfo("udf invalid handle for %s, refCount: %d, last ref time: %" PRId64 ". remove it from cache",
S
shenglian zhou 已提交
978
               stub->udfName, stub->refCount, stub->lastRefTime);
979
      }
980
    }
S
shenglian zhou 已提交
981
    ++i;
982
  }
983 984 985
  taosArrayDestroy(gUdfcProxy.udfStubs);
  gUdfcProxy.udfStubs = udfStubs;
  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
S
shenglian zhou 已提交
986 987
  return 0;
}
988

S
shenglian zhou 已提交
989 990
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
  UdfcFuncHandle handle = NULL;
H
Hongze Cheng 已提交
991
  int32_t        code = acquireUdfFuncHandle(udfName, &handle);
S
shenglian zhou 已提交
992 993 994 995 996 997 998 999
  if (code != 0) {
    return code;
  }
  SUdfcUvSession *session = handle;
  code = doCallUdfScalarFunc(handle, input, numOfCols, output);
  if (output->columnData == NULL) {
    fnError("udfc scalar function calculate error. no column data");
    code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
1000
  } else {
S
shenglian zhou 已提交
1001
    if (session->outputType != output->columnData->info.type || session->outputLen != output->columnData->info.bytes) {
H
Hongze Cheng 已提交
1002 1003
      fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)",
              session->outputType, session->outputLen, output->columnData->info.type, output->columnData->info.bytes);
S
shenglian zhou 已提交
1004 1005
      code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
    }
1006
  }
S
shenglian zhou 已提交
1007 1008
  releaseUdfFuncHandle(udfName);
  return code;
1009
}
1010

H
Hongze Cheng 已提交
1011
bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv) {
S
shenglian zhou 已提交
1012 1013
  if (fmIsScalarFunc(pFunc->funcId)) {
    return false;
S
shenglian zhou 已提交
1014
  }
S
shenglian zhou 已提交
1015 1016
  pEnv->calcMemSize = sizeof(SUdfAggRes) + pFunc->node.resType.bytes + pFunc->udfBufSize;
  return true;
1017 1018
}

H
Hongze Cheng 已提交
1019
bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo) {
S
shenglian zhou 已提交
1020 1021 1022 1023
  if (functionSetup(pCtx, pResultCellInfo) != true) {
    return false;
  }
  UdfcFuncHandle handle;
H
Hongze Cheng 已提交
1024
  int32_t        udfCode = 0;
S
shenglian zhou 已提交
1025 1026 1027 1028 1029
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
    fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode);
    return false;
  }
  SUdfcUvSession *session = (SUdfcUvSession *)handle;
H
Hongze Cheng 已提交
1030 1031
  SUdfAggRes     *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(pResultCellInfo);
  int32_t         envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize;
S
shenglian zhou 已提交
1032 1033
  memset(udfRes, 0, envSize);

H
Hongze Cheng 已提交
1034 1035
  udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
  udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->outputLen;
S
shenglian zhou 已提交
1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056

  SUdfInterBuf buf = {0};
  if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
    fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode);
    releaseUdfFuncHandle(pCtx->udfName);
    return false;
  }
  udfRes->interResNum = buf.numOfResult;
  if (buf.bufLen <= session->bufSize) {
    memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
  } else {
    fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize);
    releaseUdfFuncHandle(pCtx->udfName);
    return false;
  }
  releaseUdfFuncHandle(pCtx->udfName);
  freeUdfInterBuf(&buf);
  return true;
}

int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
H
Hongze Cheng 已提交
1057
  int32_t        udfCode = 0;
S
shenglian zhou 已提交
1058 1059 1060 1061 1062 1063 1064
  UdfcFuncHandle handle = 0;
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
    fnError("udfAggProcess  error. step acquireUdfFuncHandle. udf code: %d", udfCode);
    return udfCode;
  }

  SUdfcUvSession *session = handle;
H
Hongze Cheng 已提交
1065 1066 1067
  SUdfAggRes     *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
  udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
  udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->outputLen;
S
shenglian zhou 已提交
1068

H
Hongze Cheng 已提交
1069 1070 1071 1072
  SInputColumnInfoData *pInput = &pCtx->input;
  int32_t               numOfCols = pInput->numOfInputCols;
  int32_t               start = pInput->startRowIndex;
  int32_t               numOfRows = pInput->numOfRows;
S
shenglian zhou 已提交
1073

H
Hongze Cheng 已提交
1074
  SSDataBlock *pTempBlock = createDataBlock();
1075 1076
  pTempBlock->info.rows = pInput->totalRows;
  pTempBlock->info.uid = pInput->uid;
S
shenglian zhou 已提交
1077
  for (int32_t i = 0; i < numOfCols; ++i) {
1078
    blockDataAppendColInfo(pTempBlock, pInput->pData[i]);
S
shenglian zhou 已提交
1079 1080
  }

1081
  SSDataBlock *inputBlock = blockDataExtractBlock(pTempBlock, start, numOfRows);
S
shenglian zhou 已提交
1082

H
Hongze Cheng 已提交
1083
  SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = session->bufSize, .numOfResult = udfRes->interResNum};
S
shenglian zhou 已提交
1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103
  SUdfInterBuf newState = {0};

  udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState);
  if (udfCode != 0) {
    fnError("udfAggProcess error. code: %d", udfCode);
    newState.numOfResult = 0;
  } else {
    udfRes->interResNum = newState.numOfResult;
    if (newState.bufLen <= session->bufSize) {
      memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
    } else {
      fnError("udfc inter buf size %d is greater than function bufSize %d", newState.bufLen, session->bufSize);
      udfCode = TSDB_CODE_UDF_INVALID_BUFSIZE;
    }
  }
  if (newState.numOfResult == 1 || state.numOfResult == 1) {
    GET_RES_INFO(pCtx)->numOfRes = 1;
  }

  blockDataDestroy(inputBlock);
1104 1105 1106

  taosArrayDestroy(pTempBlock->pDataBlock);
  taosMemoryFree(pTempBlock);
S
shenglian zhou 已提交
1107 1108 1109 1110 1111 1112

  releaseUdfFuncHandle(pCtx->udfName);
  freeUdfInterBuf(&newState);
  return udfCode;
}

H
Hongze Cheng 已提交
1113 1114
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) {
  int32_t        udfCode = 0;
S
shenglian zhou 已提交
1115 1116 1117 1118 1119 1120 1121
  UdfcFuncHandle handle = 0;
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
    fnError("udfAggProcess  error. step acquireUdfFuncHandle. udf code: %d", udfCode);
    return udfCode;
  }

  SUdfcUvSession *session = handle;
H
Hongze Cheng 已提交
1122 1123 1124
  SUdfAggRes     *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
  udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
  udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->outputLen;
S
shenglian zhou 已提交
1125 1126

  SUdfInterBuf resultBuf = {0};
H
Hongze Cheng 已提交
1127 1128 1129
  SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = session->bufSize, .numOfResult = udfRes->interResNum};
  int32_t      udfCallCode = 0;
  udfCallCode = doCallUdfAggFinalize(session, &state, &resultBuf);
S
shenglian zhou 已提交
1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154
  if (udfCallCode != 0) {
    fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode);
    GET_RES_INFO(pCtx)->numOfRes = 0;
  } else {
    if (resultBuf.bufLen <= session->outputLen) {
      memcpy(udfRes->finalResBuf, resultBuf.buf, session->outputLen);
      udfRes->finalResNum = resultBuf.numOfResult;
      GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
    } else {
      fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->outputLen);
      GET_RES_INFO(pCtx)->numOfRes = 0;
      udfCallCode = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
    }
  }

  freeUdfInterBuf(&resultBuf);

  int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
  releaseUdfFuncHandle(pCtx->udfName);
  return udfCallCode == 0 ? numOfResults : udfCallCode;
}

void onUdfcPipeClose(uv_handle_t *handle) {
  SClientUvConn *conn = handle->data;
  if (!QUEUE_EMPTY(&conn->taskQueue)) {
H
Hongze Cheng 已提交
1155
    QUEUE             *h = QUEUE_HEAD(&conn->taskQueue);
S
shenglian zhou 已提交
1156 1157 1158 1159 1160
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
    task->errCode = 0;
    QUEUE_REMOVE(&task->procTaskQueue);
    uv_sem_post(&task->taskSem);
  }
1161
  uv_mutex_lock(&gUdfcProxy.udfcUvMutex);
1162 1163 1164
  if (conn->session != NULL) {
    conn->session->udfUvPipe = NULL;
  }
1165
  uv_mutex_unlock(&gUdfcProxy.udfcUvMutex);
S
shenglian zhou 已提交
1166 1167
  taosMemoryFree(conn->readBuf.buf);
  taosMemoryFree(conn);
H
Hongze Cheng 已提交
1168
  taosMemoryFree((uv_pipe_t *)handle);
S
shenglian zhou 已提交
1169 1170 1171 1172 1173 1174 1175
}

int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
  fnDebug("udfc get uv task result. task: %p, uvTask: %p", task, uvTask);
  if (uvTask->type == UV_TASK_REQ_RSP) {
    if (uvTask->rspBuf.base != NULL) {
      SUdfResponse rsp = {0};
H
Hongze Cheng 已提交
1176
      void        *buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp);
S
shenglian zhou 已提交
1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211
      assert(uvTask->rspBuf.len == POINTER_DISTANCE(buf, uvTask->rspBuf.base));
      task->errCode = rsp.code;

      switch (task->type) {
        case UDF_TASK_SETUP: {
          task->_setup.rsp = rsp.setupRsp;
          break;
        }
        case UDF_TASK_CALL: {
          task->_call.rsp = rsp.callRsp;
          break;
        }
        case UDF_TASK_TEARDOWN: {
          task->_teardown.rsp = rsp.teardownRsp;
          break;
        }
        default: {
          break;
        }
      }

      // TODO: the call buffer is setup and freed by udf invocation
      taosMemoryFree(uvTask->rspBuf.base);
    } else {
      task->errCode = uvTask->errCode;
    }
  } else if (uvTask->type == UV_TASK_CONNECT) {
    task->errCode = uvTask->errCode;
  } else if (uvTask->type == UV_TASK_DISCONNECT) {
    task->errCode = uvTask->errCode;
  }
  return 0;
}

void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
1212
  SClientUvConn  *conn = handle->data;
S
shenglian zhou 已提交
1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229
  SClientConnBuf *connBuf = &conn->readBuf;

  int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
  if (connBuf->cap == 0) {
    connBuf->buf = taosMemoryMalloc(msgHeadSize);
    if (connBuf->buf) {
      connBuf->len = 0;
      connBuf->cap = msgHeadSize;
      connBuf->total = -1;

      buf->base = connBuf->buf;
      buf->len = connBuf->cap;
    } else {
      fnError("udfc allocate buffer failure. size: %d", msgHeadSize);
      buf->base = NULL;
      buf->len = 0;
    }
1230 1231 1232
  } else if (connBuf->total == -1 && connBuf->len < msgHeadSize) {
    buf->base = connBuf->buf + connBuf->len;
    buf->len = msgHeadSize - connBuf->len;
S
shenglian zhou 已提交
1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246
  } else {
    connBuf->cap = connBuf->total > connBuf->cap ? connBuf->total : connBuf->cap;
    void *resultBuf = taosMemoryRealloc(connBuf->buf, connBuf->cap);
    if (resultBuf) {
      connBuf->buf = resultBuf;
      buf->base = connBuf->buf + connBuf->len;
      buf->len = connBuf->cap - connBuf->len;
    } else {
      fnError("udfc re-allocate buffer failure. size: %d", connBuf->cap);
      buf->base = NULL;
      buf->len = 0;
    }
  }

1247
  fnDebug("udfc uv alloc buffer: cap - len - total : %d - %d - %d", connBuf->cap, connBuf->len, connBuf->total);
S
shenglian zhou 已提交
1248 1249 1250 1251
}

bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
  if (connBuf->total == -1 && connBuf->len >= sizeof(int32_t)) {
H
Hongze Cheng 已提交
1252
    connBuf->total = *(int32_t *)(connBuf->buf);
S
shenglian zhou 已提交
1253 1254
  }
  if (connBuf->len == connBuf->cap && connBuf->total == connBuf->cap) {
1255
    fnDebug("udfc complete message is received, now handle it");
S
shenglian zhou 已提交
1256 1257 1258 1259 1260 1261 1262
    return true;
  }
  return false;
}

void udfcUvHandleRsp(SClientUvConn *conn) {
  SClientConnBuf *connBuf = &conn->readBuf;
H
Hongze Cheng 已提交
1263
  int64_t         seqNum = *(int64_t *)(connBuf->buf + sizeof(int32_t));  // msglen then seqnum
S
shenglian zhou 已提交
1264 1265

  if (QUEUE_EMPTY(&conn->taskQueue)) {
H
Hongze Cheng 已提交
1266
    fnError("udfc no task waiting on connection. response seqnum:%" PRId64, seqNum);
S
shenglian zhou 已提交
1267 1268
    return;
  }
H
Hongze Cheng 已提交
1269
  bool               found = false;
S
shenglian zhou 已提交
1270
  SClientUvTaskNode *taskFound = NULL;
H
Hongze Cheng 已提交
1271
  QUEUE             *h = QUEUE_NEXT(&conn->taskQueue);
S
shenglian zhou 已提交
1272 1273 1274
  SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);

  while (h != &conn->taskQueue) {
1275
    fnDebug("udfc handle response iterate through queue. uvTask:%d-%p", task->seqNum, task);
S
shenglian zhou 已提交
1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303
    if (task->seqNum == seqNum) {
      if (found == false) {
        found = true;
        taskFound = task;
      } else {
        fnError("udfc more than one task waiting for the same response");
        continue;
      }
    }
    h = QUEUE_NEXT(h);
    task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
  }

  if (taskFound) {
    taskFound->rspBuf = uv_buf_init(connBuf->buf, connBuf->len);
    QUEUE_REMOVE(&taskFound->connTaskQueue);
    QUEUE_REMOVE(&taskFound->procTaskQueue);
    uv_sem_post(&taskFound->taskSem);
  } else {
    fnError("no task is waiting for the response.");
  }
  connBuf->buf = NULL;
  connBuf->total = -1;
  connBuf->len = 0;
  connBuf->cap = 0;
}

void udfcUvHandleError(SClientUvConn *conn) {
1304
  fnDebug("handle error on conn: %p, pipe: %p", conn, conn->pipe);
S
shenglian zhou 已提交
1305
  while (!QUEUE_EMPTY(&conn->taskQueue)) {
H
Hongze Cheng 已提交
1306
    QUEUE             *h = QUEUE_HEAD(&conn->taskQueue);
S
shenglian zhou 已提交
1307 1308 1309 1310 1311 1312 1313
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
    task->errCode = TSDB_CODE_UDF_PIPE_READ_ERR;
    QUEUE_REMOVE(&task->connTaskQueue);
    QUEUE_REMOVE(&task->procTaskQueue);
    uv_sem_post(&task->taskSem);
  }

H
Hongze Cheng 已提交
1314
  uv_close((uv_handle_t *)conn->pipe, onUdfcPipeClose);
S
shenglian zhou 已提交
1315 1316 1317
}

void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
1318
  fnDebug("udfc client %p, client read from pipe. nread: %zd", client, nread);
S
shenglian zhou 已提交
1319 1320
  if (nread == 0) return;

H
Hongze Cheng 已提交
1321
  SClientUvConn  *conn = client->data;
S
shenglian zhou 已提交
1322 1323 1324 1325 1326 1327 1328 1329
  SClientConnBuf *connBuf = &conn->readBuf;
  if (nread > 0) {
    connBuf->len += nread;
    if (isUdfcUvMsgComplete(connBuf)) {
      udfcUvHandleRsp(conn);
    }
  }
  if (nread < 0) {
1330
    fnError("udfc client pipe %p read error: %zd(%s).", client, nread, uv_strerror(nread));
S
shenglian zhou 已提交
1331 1332 1333 1334
    if (nread == UV_EOF) {
      fnError("\tudfc client pipe %p closed", client);
    }
    udfcUvHandleError(conn);
1335 1336 1337
  }
}

1338 1339 1340 1341
void onUdfcPipeWrite(uv_write_t *write, int status) {
  SClientUvConn *conn = write->data;
  if (status < 0) {
    fnError("udfc client connection %p write failed. status: %d(%s)", conn, status, uv_strerror(status));
1342
    udfcUvHandleError(conn);
1343 1344
  } else {
    fnDebug("udfc client connection %p write succeed", conn);
1345
  }
wafwerar's avatar
wafwerar 已提交
1346
  taosMemoryFree(write);
1347
}
H
Haojun Liao 已提交
1348

1349
void onUdfcPipeConnect(uv_connect_t *connect, int status) {
1350 1351
  SClientUvTaskNode *uvTask = connect->data;
  if (status != 0) {
H
Hongze Cheng 已提交
1352
    fnError("client connect error, task seq: %" PRId64 ", code: %s", uvTask->seqNum, uv_strerror(status));
H
Haojun Liao 已提交
1353
  }
1354 1355 1356
  uvTask->errCode = status;

  uv_read_start((uv_stream_t *)uvTask->pipe, udfcAllocateBuffer, onUdfcPipeRead);
wafwerar's avatar
wafwerar 已提交
1357
  taosMemoryFree(connect);
S
shenglian zhou 已提交
1358
  QUEUE_REMOVE(&uvTask->procTaskQueue);
1359
  uv_sem_post(&uvTask->taskSem);
1360
}
H
Haojun Liao 已提交
1361

S
shenglian zhou 已提交
1362
int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode *uvTask) {
1363
  uvTask->type = uvTaskType;
1364
  uvTask->udfc = task->session->udfc;
1365 1366 1367

  if (uvTaskType == UV_TASK_CONNECT) {
  } else if (uvTaskType == UV_TASK_REQ_RSP) {
S
slzhou 已提交
1368
    uvTask->pipe = task->session->udfUvPipe;
1369 1370
    SUdfRequest request;
    request.type = task->type;
H
Hongze Cheng 已提交
1371
    request.seqNum = atomic_fetch_add_64(&gUdfTaskSeqNum, 1);
1372 1373

    if (task->type == UDF_TASK_SETUP) {
S
shenglian zhou 已提交
1374
      request.setup = task->_setup.req;
1375 1376
      request.type = UDF_TASK_SETUP;
    } else if (task->type == UDF_TASK_CALL) {
S
shenglian zhou 已提交
1377
      request.call = task->_call.req;
1378 1379
      request.type = UDF_TASK_CALL;
    } else if (task->type == UDF_TASK_TEARDOWN) {
S
shenglian zhou 已提交
1380
      request.teardown = task->_teardown.req;
1381 1382
      request.type = UDF_TASK_TEARDOWN;
    } else {
S
shenglian zhou 已提交
1383
      fnError("udfc create uv task, invalid task type : %d", task->type);
1384
    }
1385 1386
    int32_t bufLen = encodeUdfRequest(NULL, &request);
    request.msgLen = bufLen;
S
slzhou 已提交
1387 1388
    void *bufBegin = taosMemoryMalloc(bufLen);
    void *buf = bufBegin;
1389
    encodeUdfRequest(&buf, &request);
S
slzhou 已提交
1390
    uvTask->reqBuf = uv_buf_init(bufBegin, bufLen);
1391 1392
    uvTask->seqNum = request.seqNum;
  } else if (uvTaskType == UV_TASK_DISCONNECT) {
S
slzhou 已提交
1393
    uvTask->pipe = task->session->udfUvPipe;
1394 1395
  }
  uv_sem_init(&uvTask->taskSem, 0);
H
Haojun Liao 已提交
1396

1397 1398
  return 0;
}
H
Haojun Liao 已提交
1399

S
slzhou 已提交
1400
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
1401
  fnDebug("queue uv task to event loop, uvTask: %d-%p", uvTask->type, uvTask);
1402 1403 1404 1405 1406
  SUdfcProxy *udfc = uvTask->udfc;
  uv_mutex_lock(&udfc->taskQueueMutex);
  QUEUE_INSERT_TAIL(&udfc->taskQueue, &uvTask->recvTaskQueue);
  uv_mutex_unlock(&udfc->taskQueueMutex);
  uv_async_send(&udfc->loopTaskAync);
H
Haojun Liao 已提交
1407

1408
  uv_sem_wait(&uvTask->taskSem);
H
Hongze Cheng 已提交
1409
  fnInfo("udfc uvTask finished. uvTask:%" PRId64 "-%d-%p", uvTask->seqNum, uvTask->type, uvTask);
1410
  uv_sem_destroy(&uvTask->taskSem);
H
Haojun Liao 已提交
1411

1412 1413
  return 0;
}
H
Haojun Liao 已提交
1414

S
slzhou 已提交
1415
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
H
Hongze Cheng 已提交
1416
  fnDebug("event loop start uv task. uvTask: %" PRId64 "-%d-%p", uvTask->seqNum, uvTask->type, uvTask);
1417 1418
  int32_t code = 0;

1419 1420
  switch (uvTask->type) {
    case UV_TASK_CONNECT: {
wafwerar's avatar
wafwerar 已提交
1421
      uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
1422
      uv_pipe_init(&uvTask->udfc->uvLoop, pipe, 0);
1423
      uvTask->pipe = pipe;
H
Haojun Liao 已提交
1424

S
slzhou 已提交
1425
      SClientUvConn *conn = taosMemoryCalloc(1, sizeof(SClientUvConn));
1426 1427 1428 1429 1430
      conn->pipe = pipe;
      conn->readBuf.len = 0;
      conn->readBuf.cap = 0;
      conn->readBuf.buf = 0;
      conn->readBuf.total = -1;
S
shenglian zhou 已提交
1431
      QUEUE_INIT(&conn->taskQueue);
H
Haojun Liao 已提交
1432

1433 1434
      pipe->data = conn;

wafwerar's avatar
wafwerar 已提交
1435
      uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t));
1436
      connReq->data = uvTask;
1437
      uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfcPipeConnect);
1438
      code = 0;
H
Haojun Liao 已提交
1439
      break;
1440 1441 1442
    }
    case UV_TASK_REQ_RSP: {
      uv_pipe_t *pipe = uvTask->pipe;
1443 1444 1445 1446
      if (pipe == NULL) {
        code = TSDB_CODE_UDF_PIPE_NO_PIPE;
      } else {
        uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t));
1447
        write->data = pipe->data;
H
Hongze Cheng 已提交
1448
        QUEUE *connTaskQueue = &((SClientUvConn *)pipe->data)->taskQueue;
1449 1450
        QUEUE_INSERT_TAIL(connTaskQueue, &uvTask->connTaskQueue);
        int err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipeWrite);
1451
        if (err != 0) {
S
slzhou 已提交
1452
          taosMemoryFree(write);
1453
          fnError("udfc event loop start req_rsp task uv_write failed. uvtask: %p, code: %s", uvTask, uv_strerror(err));
1454 1455
        }
        code = err;
1456
      }
1457 1458 1459
      break;
    }
    case UV_TASK_DISCONNECT: {
1460 1461 1462 1463 1464 1465 1466 1467 1468
      uv_pipe_t *pipe = uvTask->pipe;
      if (pipe == NULL) {
        code = TSDB_CODE_UDF_PIPE_NO_PIPE;
      } else {
        SClientUvConn *conn = pipe->data;
        QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
        uv_close((uv_handle_t *)uvTask->pipe, onUdfcPipeClose);
        code = 0;
      }
1469 1470 1471
      break;
    }
    default: {
H
Hongze Cheng 已提交
1472
      fnError("udfc event loop unknown task type.") break;
1473 1474
    }
  }
H
Haojun Liao 已提交
1475

1476
  return code;
1477
}
H
Haojun Liao 已提交
1478

1479
void udfcAsyncTaskCb(uv_async_t *async) {
1480
  SUdfcProxy *udfc = async->data;
H
Hongze Cheng 已提交
1481
  QUEUE       wq;
1482

1483 1484 1485
  uv_mutex_lock(&udfc->taskQueueMutex);
  QUEUE_MOVE(&udfc->taskQueue, &wq);
  uv_mutex_unlock(&udfc->taskQueueMutex);
1486

S
shenglian zhou 已提交
1487
  while (!QUEUE_EMPTY(&wq)) {
H
Hongze Cheng 已提交
1488
    QUEUE *h = QUEUE_HEAD(&wq);
S
shenglian zhou 已提交
1489 1490
    QUEUE_REMOVE(h);
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
H
Hongze Cheng 已提交
1491
    int32_t            code = udfcStartUvTask(task);
1492 1493
    if (code == 0) {
      QUEUE_INSERT_TAIL(&udfc->uvProcTaskQueue, &task->procTaskQueue);
1494 1495 1496
    } else {
      task->errCode = code;
      uv_sem_post(&task->taskSem);
1497
    }
1498 1499 1500
  }
}

1501
void cleanUpUvTasks(SUdfcProxy *udfc) {
H
Hongze Cheng 已提交
1502
  fnDebug("clean up uv tasks") QUEUE wq;
1503

1504 1505 1506
  uv_mutex_lock(&udfc->taskQueueMutex);
  QUEUE_MOVE(&udfc->taskQueue, &wq);
  uv_mutex_unlock(&udfc->taskQueueMutex);
1507

S
shenglian zhou 已提交
1508
  while (!QUEUE_EMPTY(&wq)) {
H
Hongze Cheng 已提交
1509
    QUEUE *h = QUEUE_HEAD(&wq);
S
shenglian zhou 已提交
1510 1511
    QUEUE_REMOVE(h);
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
1512
    if (udfc->udfcState == UDFC_STATE_STOPPING) {
1513
      task->errCode = TSDB_CODE_UDF_STOPPING;
1514 1515 1516 1517
    }
    uv_sem_post(&task->taskSem);
  }

1518
  while (!QUEUE_EMPTY(&udfc->uvProcTaskQueue)) {
H
Hongze Cheng 已提交
1519
    QUEUE *h = QUEUE_HEAD(&udfc->uvProcTaskQueue);
S
shenglian zhou 已提交
1520 1521
    QUEUE_REMOVE(h);
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, procTaskQueue);
1522
    if (udfc->udfcState == UDFC_STATE_STOPPING) {
1523
      task->errCode = TSDB_CODE_UDF_STOPPING;
S
shenglian zhou 已提交
1524 1525 1526 1527
    }
    uv_sem_post(&task->taskSem);
  }
}
1528

S
shenglian zhou 已提交
1529
void udfStopAsyncCb(uv_async_t *async) {
1530
  SUdfcProxy *udfc = async->data;
1531
  cleanUpUvTasks(udfc);
1532 1533
  if (udfc->udfcState == UDFC_STATE_STOPPING) {
    uv_stop(&udfc->uvLoop);
S
shenglian zhou 已提交
1534
  }
1535
}
S
shenglian zhou 已提交
1536

S
shenglian zhou 已提交
1537
void constructUdfService(void *argsThread) {
1538 1539 1540
  SUdfcProxy *udfc = (SUdfcProxy *)argsThread;
  uv_loop_init(&udfc->uvLoop);

1541
  uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfcAsyncTaskCb);
1542 1543 1544 1545 1546 1547 1548
  udfc->loopTaskAync.data = udfc;
  uv_async_init(&udfc->uvLoop, &udfc->loopStopAsync, udfStopAsyncCb);
  udfc->loopStopAsync.data = udfc;
  uv_mutex_init(&udfc->taskQueueMutex);
  QUEUE_INIT(&udfc->taskQueue);
  QUEUE_INIT(&udfc->uvProcTaskQueue);
  uv_barrier_wait(&udfc->initBarrier);
H
Hongze Cheng 已提交
1549
  // TODO return value of uv_run
1550 1551
  uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
  uv_loop_close(&udfc->uvLoop);
S
slzhou 已提交
1552 1553 1554 1555

  uv_walk(&udfc->uvLoop, udfUdfdCloseWalkCb, NULL);
  uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
  uv_loop_close(&udfc->uvLoop);
1556 1557
}

1558
int32_t udfcOpen() {
1559
  int8_t old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 0, 1);
1560 1561 1562
  if (old == 1) {
    return 0;
  }
1563
  SUdfcProxy *proxy = &gUdfcProxy;
1564
  getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName));
1565 1566 1567 1568 1569 1570
  proxy->udfcState = UDFC_STATE_STARTNG;
  uv_barrier_init(&proxy->initBarrier, 2);
  uv_thread_create(&proxy->loopThread, constructUdfService, proxy);
  atomic_store_8(&proxy->udfcState, UDFC_STATE_READY);
  proxy->udfcState = UDFC_STATE_READY;
  uv_barrier_wait(&proxy->initBarrier);
1571 1572
  uv_mutex_init(&proxy->udfStubsMutex);
  proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
1573
  uv_mutex_init(&proxy->udfcUvMutex);
H
Hongze Cheng 已提交
1574
  fnInfo("udfc initialized") return 0;
1575 1576
}

1577
int32_t udfcClose() {
1578
  int8_t old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 1, 0);
1579 1580 1581 1582
  if (old == 0) {
    return 0;
  }

1583
  SUdfcProxy *udfc = &gUdfcProxy;
1584 1585 1586 1587 1588
  udfc->udfcState = UDFC_STATE_STOPPING;
  uv_async_send(&udfc->loopStopAsync);
  uv_thread_join(&udfc->loopThread);
  uv_mutex_destroy(&udfc->taskQueueMutex);
  uv_barrier_destroy(&udfc->initBarrier);
1589 1590
  taosArrayDestroy(udfc->udfStubs);
  uv_mutex_destroy(&udfc->udfStubsMutex);
1591
  uv_mutex_destroy(&udfc->udfcUvMutex);
1592
  udfc->udfcState = UDFC_STATE_INITAL;
S
Shengliang Guan 已提交
1593
  fnInfo("udfc is cleaned up");
1594 1595 1596
  return 0;
}

S
slzhou 已提交
1597
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
S
shenglian zhou 已提交
1598
  SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode));
1599
  fnDebug("udfc client task: %p created uvTask: %p. pipe: %p", task, uvTask, task->session->udfUvPipe);
S
shenglian zhou 已提交
1600 1601

  udfcInitializeUvTask(task, uvTaskType, uvTask);
S
slzhou 已提交
1602 1603
  udfcQueueUvTask(uvTask);
  udfcGetUdfTaskResultFromUvTask(task, uvTask);
1604
  if (uvTaskType == UV_TASK_CONNECT) {
S
slzhou 已提交
1605 1606 1607
    task->session->udfUvPipe = uvTask->pipe;
    SClientUvConn *conn = uvTask->pipe->data;
    conn->session = task->session;
S
slzhou 已提交
1608
  }
1609 1610
  taosMemoryFree(uvTask->reqBuf.base);
  uvTask->reqBuf.base = NULL;
S
shenglian zhou 已提交
1611
  taosMemoryFree(uvTask);
S
shenglian zhou 已提交
1612 1613
  fnDebug("udfc freed uvTask: %p", task);

S
shenglian zhou 已提交
1614 1615
  uvTask = NULL;
  return task->errCode;
S
slzhou 已提交
1616 1617
}

S
shenglian zhou 已提交
1618
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
1619
  if (gUdfcProxy.udfcState != UDFC_STATE_READY) {
S
shenglian zhou 已提交
1620
    return TSDB_CODE_UDF_INVALID_STATE;
1621
  }
H
Hongze Cheng 已提交
1622
  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
S
shenglian zhou 已提交
1623 1624
  task->errCode = 0;
  task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession));
1625
  task->session->udfc = &gUdfcProxy;
S
shenglian zhou 已提交
1626 1627 1628 1629 1630 1631 1632
  task->type = UDF_TASK_SETUP;

  SUdfSetupRequest *req = &task->_setup.req;
  strncpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN);

  int32_t errCode = udfcRunUdfUvTask(task, UV_TASK_CONNECT);
  if (errCode != 0) {
1633
    fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfcProxy)->udfdPipeName);
1634 1635
    taosMemoryFree(task->session);
    taosMemoryFree(task);
S
shenglian zhou 已提交
1636
    return TSDB_CODE_UDF_PIPE_CONNECT_ERR;
1637
  }
S
slzhou 已提交
1638

S
shenglian zhou 已提交
1639 1640 1641 1642 1643 1644 1645
  udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);

  SUdfSetupResponse *rsp = &task->_setup.rsp;
  task->session->severHandle = rsp->udfHandle;
  task->session->outputType = rsp->outputType;
  task->session->outputLen = rsp->outputLen;
  task->session->bufSize = rsp->bufSize;
S
slzhou 已提交
1646
  strncpy(task->session->udfName, udfName, TSDB_FUNC_NAME_LEN);
S
shenglian zhou 已提交
1647 1648 1649 1650 1651
  if (task->errCode != 0) {
    fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode)
  } else {
    fnInfo("sucessfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session);
    *funcHandle = task->session;
S
slzhou 已提交
1652
  }
S
shenglian zhou 已提交
1653 1654 1655
  int32_t err = task->errCode;
  taosMemoryFree(task);
  return err;
S
slzhou 已提交
1656 1657
}

1658
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
H
Hongze Cheng 已提交
1659
                SSDataBlock *output, SUdfInterBuf *newState) {
1660
  fnDebug("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
H
Hongze Cheng 已提交
1661
  SUdfcUvSession *session = (SUdfcUvSession *)handle;
S
slzhou 已提交
1662 1663
  if (session->udfUvPipe == NULL) {
    fnError("No pipe to udfd");
1664
    return TSDB_CODE_UDF_PIPE_NO_PIPE;
S
slzhou 已提交
1665 1666
  }
  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
1667
  task->errCode = 0;
H
Hongze Cheng 已提交
1668
  task->session = (SUdfcUvSession *)handle;
1669 1670 1671
  task->type = UDF_TASK_CALL;

  SUdfCallRequest *req = &task->_call.req;
S
slzhou 已提交
1672
  req->udfHandle = task->session->severHandle;
S
slzhou 已提交
1673
  req->callType = callType;
S
slzhou 已提交
1674

S
shenglian zhou 已提交
1675
  switch (callType) {
1676 1677 1678 1679
    case TSDB_UDF_CALL_AGG_INIT: {
      req->initFirst = 1;
      break;
    }
S
shenglian zhou 已提交
1680 1681 1682 1683 1684
    case TSDB_UDF_CALL_AGG_PROC: {
      req->block = *input;
      req->interBuf = *state;
      break;
    }
1685 1686 1687 1688 1689 1690
    case TSDB_UDF_CALL_AGG_MERGE: {
      req->interBuf = *state;
      req->interBuf2 = *state2;
      break;
    }
    case TSDB_UDF_CALL_AGG_FIN: {
S
shenglian zhou 已提交
1691 1692 1693 1694 1695 1696 1697 1698 1699
      req->interBuf = *state;
      break;
    }
    case TSDB_UDF_CALL_SCALA_PROC: {
      req->block = *input;
      break;
    }
  }

S
slzhou 已提交
1700
  udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
1701

1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726
  if (task->errCode != 0) {
    fnError("call udf failure. err: %d", task->errCode);
  } else {
    SUdfCallResponse *rsp = &task->_call.rsp;
    switch (callType) {
      case TSDB_UDF_CALL_AGG_INIT: {
        *newState = rsp->resultBuf;
        break;
      }
      case TSDB_UDF_CALL_AGG_PROC: {
        *newState = rsp->resultBuf;
        break;
      }
      case TSDB_UDF_CALL_AGG_MERGE: {
        *newState = rsp->resultBuf;
        break;
      }
      case TSDB_UDF_CALL_AGG_FIN: {
        *newState = rsp->resultBuf;
        break;
      }
      case TSDB_UDF_CALL_SCALA_PROC: {
        *output = rsp->resultData;
        break;
      }
S
shenglian zhou 已提交
1727
    }
S
slzhou 已提交
1728 1729
  };
  int err = task->errCode;
wafwerar's avatar
wafwerar 已提交
1730
  taosMemoryFree(task);
S
slzhou 已提交
1731
  return err;
1732 1733
}

1734
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) {
S
slzhou 已提交
1735 1736 1737 1738 1739 1740 1741 1742 1743
  int8_t callType = TSDB_UDF_CALL_AGG_INIT;

  int32_t err = callUdf(handle, callType, NULL, NULL, NULL, NULL, interBuf);

  return err;
}

// input: block, state
// output: interbuf,
1744
int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) {
H
Hongze Cheng 已提交
1745
  int8_t  callType = TSDB_UDF_CALL_AGG_PROC;
S
slzhou 已提交
1746 1747 1748 1749 1750 1751
  int32_t err = callUdf(handle, callType, block, state, NULL, NULL, newState);
  return err;
}

// input: interbuf1, interbuf2
// output: resultBuf
H
Hongze Cheng 已提交
1752 1753 1754
int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2,
                          SUdfInterBuf *resultBuf) {
  int8_t  callType = TSDB_UDF_CALL_AGG_MERGE;
S
slzhou 已提交
1755 1756 1757 1758 1759 1760
  int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf);
  return err;
}

// input: interBuf
// output: resultData
1761
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) {
H
Hongze Cheng 已提交
1762
  int8_t  callType = TSDB_UDF_CALL_AGG_FIN;
S
slzhou 已提交
1763 1764 1765 1766
  int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData);
  return err;
}

H
Hongze Cheng 已提交
1767 1768
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
  int8_t      callType = TSDB_UDF_CALL_SCALA_PROC;
S
slzhou 已提交
1769 1770 1771
  SSDataBlock inputBlock = {0};
  convertScalarParamToDataBlock(input, numOfCols, &inputBlock);
  SSDataBlock resultBlock = {0};
H
Hongze Cheng 已提交
1772
  int32_t     err = callUdf(handle, callType, &inputBlock, NULL, NULL, &resultBlock, NULL);
S
slzhou 已提交
1773 1774
  if (err == 0) {
    convertDataBlockToScalarParm(&resultBlock, output);
S
slzhou 已提交
1775
    taosArrayDestroy(resultBlock.pDataBlock);
S
slzhou 已提交
1776
  }
S
slzhou 已提交
1777 1778

  taosArrayDestroy(inputBlock.pDataBlock);
S
slzhou 已提交
1779 1780 1781
  return err;
}

1782
int32_t doTeardownUdf(UdfcFuncHandle handle) {
H
Hongze Cheng 已提交
1783
  SUdfcUvSession *session = (SUdfcUvSession *)handle;
S
slzhou 已提交
1784

S
slzhou 已提交
1785
  if (session->udfUvPipe == NULL) {
S
slzhou 已提交
1786
    fnError("tear down udf. pipe to udfd does not exist. udf name: %s", session->udfName);
1787
    taosMemoryFree(session);
1788
    return TSDB_CODE_UDF_PIPE_NO_PIPE;
S
slzhou 已提交
1789 1790 1791
  }

  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
1792
  task->errCode = 0;
S
slzhou 已提交
1793
  task->session = session;
1794 1795 1796 1797 1798
  task->type = UDF_TASK_TEARDOWN;

  SUdfTeardownRequest *req = &task->_teardown.req;
  req->udfHandle = task->session->severHandle;

S
slzhou 已提交
1799
  udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
1800 1801 1802

  int32_t err = task->errCode;

S
slzhou 已提交
1803
  udfcRunUdfUvTask(task, UV_TASK_DISCONNECT);
1804

S
slzhou 已提交
1805
  fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle);
H
Hongze Cheng 已提交
1806
  // TODO: synchronization refactor between libuv event loop and request thread
1807
  uv_mutex_lock(&gUdfcProxy.udfcUvMutex);
S
slzhou 已提交
1808
  if (session->udfUvPipe != NULL && session->udfUvPipe->data != NULL) {
1809 1810 1811
    SClientUvConn *conn = session->udfUvPipe->data;
    conn->session = NULL;
  }
1812
  uv_mutex_unlock(&gUdfcProxy.udfcUvMutex);
1813
  taosMemoryFree(session);
wafwerar's avatar
wafwerar 已提交
1814
  taosMemoryFree(task);
1815 1816 1817

  return err;
}