tudf.c 42.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#include "uv.h"
#include "os.h"
S
slzhou 已提交
17
#include "fnLog.h"
18
#include "tudf.h"
19
#include "tudfInt.h"
S
shenglian zhou 已提交
20
#include "tarray.h"
S
slzhou 已提交
21
#include "tdatablock.h"
S
shenglian zhou 已提交
22 23
#include "querynodes.h"
#include "builtinsimpl.h"
S
slzhou 已提交
24
#include "functionMgt.h"
25

26 27
//TODO: network error processing.
//TODO: add unit test
S
shenglian zhou 已提交
28
//TODO: include all global variable under context struct
S
slzhou 已提交
29

S
shenglian zhou 已提交
30 31 32 33
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
 * The QUEUE is copied from queue.h under libuv
 * */

S
shenglian zhou 已提交
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
/* A queue node/head: slot 0 holds the "next" pointer, slot 1 the "prev"
 * pointer. Lists are circular; an empty list's head points to itself. */
typedef void *QUEUE[2];

/* Private macros: typed lvalue access to a node's next/prev slot and to the
 * neighbouring node's opposite slot. */
#define QUEUE_NEXT(q)       (*(QUEUE **) &((*(q))[0]))
#define QUEUE_PREV(q)       (*(QUEUE **) &((*(q))[1]))
#define QUEUE_PREV_NEXT(q)  (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q)  (QUEUE_PREV(QUEUE_NEXT(q)))

/* Public macros. */
/* Recover the enclosing `type` object from a pointer to its embedded QUEUE
 * member `field` (requires offsetof to be in scope). */
#define QUEUE_DATA(ptr, type, field)                                          \
  ((type *) ((char *) (ptr) - offsetof(type, field)))

/* Important note: mutating the list while QUEUE_FOREACH is
 * iterating over its elements results in undefined behavior.
 */
#define QUEUE_FOREACH(q, h)                                                   \
  for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q))

/* True when the head's next pointer refers back to the head itself. */
#define QUEUE_EMPTY(q)                                                        \
  ((const QUEUE *) (q) == (const QUEUE *) QUEUE_NEXT(q))

/* First element of the list; equals the head itself when the list is empty. */
#define QUEUE_HEAD(q)                                                         \
  (QUEUE_NEXT(q))

/* Make q an empty list (both links point to q). */
#define QUEUE_INIT(q)                                                         \
  do {                                                                        \
    QUEUE_NEXT(q) = (q);                                                      \
    QUEUE_PREV(q) = (q);                                                      \
  }                                                                           \
  while (0)

/* Splice the elements of list n onto the tail of list h. */
#define QUEUE_ADD(h, n)                                                       \
  do {                                                                        \
    QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n);                                       \
    QUEUE_NEXT_PREV(n) = QUEUE_PREV(h);                                       \
    QUEUE_PREV(h) = QUEUE_PREV(n);                                            \
    QUEUE_PREV_NEXT(h) = (h);                                                 \
  }                                                                           \
  while (0)

/* Move the elements from q through the tail of h into list n; h keeps the
 * elements that preceded q. */
#define QUEUE_SPLIT(h, q, n)                                                  \
  do {                                                                        \
    QUEUE_PREV(n) = QUEUE_PREV(h);                                            \
    QUEUE_PREV_NEXT(n) = (n);                                                 \
    QUEUE_NEXT(n) = (q);                                                      \
    QUEUE_PREV(h) = QUEUE_PREV(q);                                            \
    QUEUE_PREV_NEXT(h) = (h);                                                 \
    QUEUE_PREV(q) = (n);                                                      \
  }                                                                           \
  while (0)

/* Move every element of h into n, leaving n empty when h is empty.
 * Note: the expansion declares a local named `q`. */
#define QUEUE_MOVE(h, n)                                                      \
  do {                                                                        \
    if (QUEUE_EMPTY(h))                                                       \
      QUEUE_INIT(n);                                                          \
    else {                                                                    \
      QUEUE* q = QUEUE_HEAD(h);                                               \
      QUEUE_SPLIT(h, q, n);                                                   \
    }                                                                         \
  }                                                                           \
  while (0)

/* Link q in directly after the head h. */
#define QUEUE_INSERT_HEAD(h, q)                                               \
  do {                                                                        \
    QUEUE_NEXT(q) = QUEUE_NEXT(h);                                            \
    QUEUE_PREV(q) = (h);                                                      \
    QUEUE_NEXT_PREV(q) = (q);                                                 \
    QUEUE_NEXT(h) = (q);                                                      \
  }                                                                           \
  while (0)

/* Link q in directly before the head h, i.e. at the tail of the list. */
#define QUEUE_INSERT_TAIL(h, q)                                               \
  do {                                                                        \
    QUEUE_NEXT(q) = (h);                                                      \
    QUEUE_PREV(q) = QUEUE_PREV(h);                                            \
    QUEUE_PREV_NEXT(q) = (q);                                                 \
    QUEUE_PREV(h) = (q);                                                      \
  }                                                                           \
  while (0)

/* Unlink q from its neighbours. q's own next/prev slots are left untouched. */
#define QUEUE_REMOVE(q)                                                       \
  do {                                                                        \
    QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q);                                       \
    QUEUE_NEXT_PREV(q) = QUEUE_PREV(q);                                       \
  }                                                                           \
  while (0)


122 123 124 125 126 127
/* Kinds of work item stored in SClientUvTaskNode.type. */
enum {
  UV_TASK_CONNECT = 0,    // connect task
  UV_TASK_REQ_RSP = 1,    // request/response task
  UV_TASK_DISCONNECT = 2  // disconnect task
};

128 129
/* Global counter from which request sequence numbers are drawn
 * (atomically incremented when a request task is created). */
int64_t gUdfTaskSeqNum = 0;
typedef struct SUdfdProxy {
130
  char udfdPipeName[UDF_LISTEN_PIPE_NAME_LEN];
131 132 133 134 135 136 137 138 139 140 141 142
  uv_barrier_t gUdfInitBarrier;

  uv_loop_t gUdfdLoop;
  uv_thread_t gUdfLoopThread;
  uv_async_t gUdfLoopTaskAync;

  uv_async_t gUdfLoopStopAsync;

  uv_mutex_t gUdfTaskQueueMutex;
  int8_t gUdfcState;
  QUEUE gUdfTaskQueue;
  QUEUE gUvProcTaskQueue;
143 144

  int8_t initialized;
145 146
} SUdfdProxy;

147
/* Global proxy instance, zero-initialized. */
SUdfdProxy gUdfdProxy = {0};
148

149
typedef struct SUdfUvSession {
150
  SUdfdProxy *udfc;
151 152
  int64_t severHandle;
  uv_pipe_t *udfSvcPipe;
S
shenglian zhou 已提交
153 154 155 156

  int8_t  outputType;
  int32_t outputLen;
  int32_t bufSize;
157 158 159
} SUdfUvSession;

typedef struct SClientUvTaskNode {
160
  SUdfdProxy *udfc;
161 162 163 164 165 166 167 168 169 170 171
  int8_t type;
  int errCode;

  uv_pipe_t *pipe;

  int64_t seqNum;
  uv_buf_t reqBuf;

  uv_sem_t taskSem;
  uv_buf_t rspBuf;

S
shenglian zhou 已提交
172 173 174
  QUEUE recvTaskQueue;
  QUEUE procTaskQueue;
  QUEUE connTaskQueue;
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
} SClientUvTaskNode;

/*
 * A UDF-level task: its type, the session it belongs to, the resulting error
 * code, and the request/response pair for that type.
 */
typedef struct SClientUdfTask {
  int8_t type;  // UDF_TASK_SETUP / UDF_TASK_CALL / UDF_TASK_TEARDOWN

  SUdfUvSession *session;

  int32_t errCode;  // filled from the response code or the uv task's error code

  // Request/response storage; the member in use is selected by `type`.
  union {
    struct {
      SUdfSetupRequest req;
      SUdfSetupResponse rsp;
    } _setup;
    struct {
      SUdfCallRequest req;
      SUdfCallResponse rsp;
    } _call;
    struct {
      SUdfTeardownRequest req;
      SUdfTeardownResponse rsp;
    } _teardown;
  };

} SClientUdfTask;

/* Per-connection receive buffer used to reassemble one length-prefixed message. */
typedef struct SClientConnBuf {
  char *buf;      // heap buffer holding the bytes received so far
  int32_t len;    // number of bytes received into buf
  int32_t cap;    // allocated size of buf; 0 means no buffer yet
  int32_t total;  // message length read from the header, or -1 while unknown
} SClientConnBuf;

typedef struct SClientUvConn {
  uv_pipe_t *pipe;
S
shenglian zhou 已提交
210
  QUEUE taskQueue;
211 212 213
  SClientConnBuf readBuf;
} SClientUvConn;

214 215
/* Life-cycle states of the udf client. */
enum {
  UDFC_STATE_INITAL   = 0,  // initial state
  UDFC_STATE_STARTNG  = 1,  // starting after udfcOpen
  UDFC_STATE_READY    = 2,  // started and begins to receive requests
  UDFC_STATE_STOPPING = 3,  // stopping after udfcClose
};
220

221 222
int32_t getUdfdPipeName(char* pipeName, int32_t size) {
  char    dnodeId[8] = {0};
wafwerar's avatar
wafwerar 已提交
223
  size_t  dnodeIdSize = sizeof(dnodeId);
224 225 226 227 228 229 230 231
  int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize);
  if (err != 0) {
    dnodeId[0] = '1';
  }
  snprintf(pipeName, size, "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
  return 0;
}

232 233 234
/* Encode a setup request (the fixed-size UDF name) and return the encoded length. */
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) {
  int32_t encoded = 0;
  encoded += taosEncodeBinary(buf, setup->udfName, TSDB_FUNC_NAME_LEN);
  return encoded;
}
237

238 239 240
/* Decode a setup request into `request`; returns the position after the consumed bytes. */
void* decodeUdfSetupRequest(const void* buf, SUdfSetupRequest *request) {
  const void *cursor = taosDecodeBinaryTo(buf, request->udfName, TSDB_FUNC_NAME_LEN);
  return (void*)cursor;
}
242

243 244
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf* state) {
  int32_t len = 0;
245
  len += taosEncodeFixedI8(buf, state->numOfResult);
246 247
  len += taosEncodeFixedI32(buf, state->bufLen);
  len += taosEncodeBinary(buf, state->buf, state->bufLen);
S
shenglian zhou 已提交
248 249
  return len;
}
250

251
/*
 * Decode an intermediate buffer in the order written by encodeUdfInterBuf.
 * bufLen must be decoded before the payload because it sizes the payload.
 * Returns the position after the consumed bytes.
 */
void* decodeUdfInterBuf(const void* buf, SUdfInterBuf* state) {
  const void *cursor = buf;
  cursor = taosDecodeFixedI8(cursor, &state->numOfResult);
  cursor = taosDecodeFixedI32(cursor, &state->bufLen);
  cursor = taosDecodeBinary(cursor, (void**)&state->buf, state->bufLen);
  return (void*)cursor;
}

258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273
/*
 * Encode a call request: udfHandle (i64), callType (i8), then the payload
 * that belongs to the call type. Unknown call types carry no payload.
 * Returns the encoded length.
 */
int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call) {
  int32_t encoded = taosEncodeFixedI64(buf, call->udfHandle);
  encoded += taosEncodeFixedI8(buf, call->callType);

  switch (call->callType) {
    case TSDB_UDF_CALL_SCALA_PROC:
      encoded += tEncodeDataBlock(buf, &call->block);
      break;
    case TSDB_UDF_CALL_AGG_INIT:
      encoded += taosEncodeFixedI8(buf, call->initFirst);
      break;
    case TSDB_UDF_CALL_AGG_PROC:
      encoded += tEncodeDataBlock(buf, &call->block);
      encoded += encodeUdfInterBuf(buf, &call->interBuf);
      break;
    case TSDB_UDF_CALL_AGG_MERGE:
      encoded += encodeUdfInterBuf(buf, &call->interBuf);
      encoded += encodeUdfInterBuf(buf, &call->interBuf2);
      break;
    case TSDB_UDF_CALL_AGG_FIN:
      encoded += encodeUdfInterBuf(buf, &call->interBuf);
      break;
    default:
      break;
  }
  return encoded;
}

278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298
/*
 * Decode a call request written by encodeUdfCallRequest: udfHandle, callType,
 * then the payload selected by callType. Returns the position after the
 * consumed bytes.
 */
void* decodeUdfCallRequest(const void* buf, SUdfCallRequest* call) {
  const void *cursor = taosDecodeFixedI64(buf, &call->udfHandle);
  cursor = taosDecodeFixedI8(cursor, &call->callType);

  if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
    cursor = tDecodeDataBlock(cursor, &call->block);
  } else if (call->callType == TSDB_UDF_CALL_AGG_INIT) {
    cursor = taosDecodeFixedI8(cursor, &call->initFirst);
  } else if (call->callType == TSDB_UDF_CALL_AGG_PROC) {
    cursor = tDecodeDataBlock(cursor, &call->block);
    cursor = decodeUdfInterBuf(cursor, &call->interBuf);
  } else if (call->callType == TSDB_UDF_CALL_AGG_MERGE) {
    cursor = decodeUdfInterBuf(cursor, &call->interBuf);
    cursor = decodeUdfInterBuf(cursor, &call->interBuf2);
  } else if (call->callType == TSDB_UDF_CALL_AGG_FIN) {
    cursor = decodeUdfInterBuf(cursor, &call->interBuf);
  }
  return (void*)cursor;
}

303 304 305 306
/* Encode a teardown request (udfHandle as i64) and return the encoded length. */
int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown) {
  int32_t encoded = taosEncodeFixedI64(buf, teardown->udfHandle);
  return encoded;
}

309 310 311
/* Decode a teardown request; returns the position after the consumed bytes. */
void* decodeUdfTeardownRequest(const void* buf, SUdfTeardownRequest *teardown) {
  const void *cursor = taosDecodeFixedI64(buf, &teardown->udfHandle);
  return (void*)cursor;
}

314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331
/*
 * Encode a request: raw msgLen (int32), seqNum (i64), type (i8), then the
 * type-specific body.
 *
 * With buf == NULL nothing is written and the full size is returned. With a
 * non-NULL buf the msgLen prefix is written but NOT counted in the return
 * value (the original behaviour, kept as is).
 */
int32_t encodeUdfRequest(void** buf, const SUdfRequest* request) {
  int32_t encoded = 0;

  if (buf != NULL) {
    *(int32_t*)(*buf) = request->msgLen;
    *buf = POINTER_SHIFT(*buf, sizeof(request->msgLen));
  } else {
    encoded += sizeof(request->msgLen);
  }

  encoded += taosEncodeFixedI64(buf, request->seqNum);
  encoded += taosEncodeFixedI8(buf, request->type);

  switch (request->type) {
    case UDF_TASK_SETUP:
      encoded += encodeUdfSetupRequest(buf, &request->setup);
      break;
    case UDF_TASK_CALL:
      encoded += encodeUdfCallRequest(buf, &request->call);
      break;
    case UDF_TASK_TEARDOWN:
      encoded += encodeUdfTeardownRequest(buf, &request->teardown);
      break;
    default:
      break;
  }
  return encoded;
}

334 335
/*
 * Decode a request written by encodeUdfRequest: raw msgLen, seqNum, type and
 * the type-specific body. Returns the position after the consumed bytes.
 */
void* decodeUdfRequest(const void* buf, SUdfRequest* request) {
  const void *cursor = buf;

  request->msgLen = *(int32_t*)(cursor);
  cursor = POINTER_SHIFT(cursor, sizeof(request->msgLen));

  cursor = taosDecodeFixedI64(cursor, &request->seqNum);
  cursor = taosDecodeFixedI8(cursor, &request->type);

  switch (request->type) {
    case UDF_TASK_SETUP:
      cursor = decodeUdfSetupRequest(cursor, &request->setup);
      break;
    case UDF_TASK_CALL:
      cursor = decodeUdfCallRequest(cursor, &request->call);
      break;
    case UDF_TASK_TEARDOWN:
      cursor = decodeUdfTeardownRequest(cursor, &request->teardown);
      break;
    default:
      break;
  }
  return (void*)cursor;
}
350

351 352 353
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) {
  int32_t len = 0;
  len += taosEncodeFixedI64(buf, setupRsp->udfHandle);
S
shenglian zhou 已提交
354 355 356
  len += taosEncodeFixedI8(buf, setupRsp->outputType);
  len += taosEncodeFixedI32(buf, setupRsp->outputLen);
  len += taosEncodeFixedI32(buf, setupRsp->bufSize);
357 358
  return len;
}
359

360 361
/*
 * Decode a setup response in the field order used by encodeUdfSetupResponse.
 * Returns the position after the consumed bytes.
 */
void* decodeUdfSetupResponse(const void* buf, SUdfSetupResponse* setupRsp) {
  const void *cursor = taosDecodeFixedI64(buf, &setupRsp->udfHandle);
  cursor = taosDecodeFixedI8(cursor, &setupRsp->outputType);
  cursor = taosDecodeFixedI32(cursor, &setupRsp->outputLen);
  cursor = taosDecodeFixedI32(cursor, &setupRsp->bufSize);
  return (void*)cursor;
}
367

368 369 370 371 372 373
int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) {
  int32_t len = 0;
  len += taosEncodeFixedI8(buf, callRsp->callType);
  switch (callRsp->callType) {
    case TSDB_UDF_CALL_SCALA_PROC:
      len += tEncodeDataBlock(buf, &callRsp->resultData);
374
      break;
375
    case TSDB_UDF_CALL_AGG_INIT:
S
slzhou 已提交
376
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
377 378
      break;
    case TSDB_UDF_CALL_AGG_PROC:
S
slzhou 已提交
379
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
380 381
      break;
    case TSDB_UDF_CALL_AGG_MERGE:
S
slzhou 已提交
382
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
383 384
      break;
    case TSDB_UDF_CALL_AGG_FIN:
S
slzhou 已提交
385
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
386 387
      break;
  }
388
  return len;
S
shenglian zhou 已提交
389 390
}

391 392 393 394 395 396 397
/*
 * Decode a call response written by encodeUdfCallResponse. Returns the
 * position after the consumed bytes.
 */
void* decodeUdfCallResponse(const void* buf, SUdfCallResponse* callRsp) {
  const void *cursor = taosDecodeFixedI8(buf, &callRsp->callType);

  switch (callRsp->callType) {
    case TSDB_UDF_CALL_SCALA_PROC:
      cursor = tDecodeDataBlock(cursor, &callRsp->resultData);
      break;
    // All aggregate phases answer with the same intermediate buffer.
    case TSDB_UDF_CALL_AGG_INIT:
    case TSDB_UDF_CALL_AGG_PROC:
    case TSDB_UDF_CALL_AGG_MERGE:
    case TSDB_UDF_CALL_AGG_FIN:
      cursor = decodeUdfInterBuf(cursor, &callRsp->resultBuf);
      break;
    default:
      break;
  }
  return (void*)cursor;
}

413 414
/* A teardown response has no body: nothing is written and 0 is returned. */
int32_t encodeUdfTeardownResponse(void** buf, const SUdfTeardownResponse* teardownRsp) {
  (void)buf;
  (void)teardownRsp;
  return 0;
}

417 418
/* A teardown response has no body: the input position is returned unchanged. */
void* decodeUdfTeardownResponse(const void* buf, SUdfTeardownResponse* teardownResponse) {
  (void)teardownResponse;
  return (void*)buf;
}

421 422 423 424 425 426 427
int32_t encodeUdfResponse(void** buf, const SUdfResponse* rsp) {
  int32_t len = 0;
  if (buf == NULL) {
    len += sizeof(rsp->msgLen);
  } else {
    *(int32_t*)(*buf) = rsp->msgLen;
    *buf = POINTER_SHIFT(*buf, sizeof(rsp->msgLen));
S
shenglian zhou 已提交
428 429
  }

S
slzhou 已提交
430 431 432 433 434 435 436
  if (buf == NULL) {
    len += sizeof(rsp->seqNum);
  } else {
    *(int64_t*)(*buf) = rsp->seqNum;
    *buf = POINTER_SHIFT(*buf, sizeof(rsp->seqNum));
  }

437 438 439
  len += taosEncodeFixedI64(buf, rsp->seqNum);
  len += taosEncodeFixedI8(buf, rsp->type);
  len += taosEncodeFixedI32(buf, rsp->code);
S
shenglian zhou 已提交
440

441 442 443 444 445 446 447 448 449 450 451 452 453 454 455
  switch (rsp->type) {
    case UDF_TASK_SETUP:
      len += encodeUdfSetupResponse(buf, &rsp->setupRsp);
      break;
    case UDF_TASK_CALL:
      len += encodeUdfCallResponse(buf, &rsp->callRsp);
      break;
    case UDF_TASK_TEARDOWN:
      len += encodeUdfTeardownResponse(buf, &rsp->teardownRsp);
      break;
    default:
      //TODO: log error
      break;
  }
  return len;
S
shenglian zhou 已提交
456 457
}

458 459
/*
 * Decode a response written by encodeUdfResponse: raw msgLen, raw seqNum,
 * then seqNum (again), type, code and the type-specific body. Returns the
 * position after the consumed bytes.
 */
void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) {
  const void *cursor = buf;

  // Fixed raw header: message length followed by the sequence number.
  rsp->msgLen = *(int32_t*)(cursor);
  cursor = POINTER_SHIFT(cursor, sizeof(rsp->msgLen));
  rsp->seqNum = *(int64_t*)(cursor);
  cursor = POINTER_SHIFT(cursor, sizeof(rsp->seqNum));

  // The encoder emits the sequence number a second time; consume it.
  cursor = taosDecodeFixedI64(cursor, &rsp->seqNum);
  cursor = taosDecodeFixedI8(cursor, &rsp->type);
  cursor = taosDecodeFixedI32(cursor, &rsp->code);

  if (rsp->type == UDF_TASK_SETUP) {
    cursor = decodeUdfSetupResponse(cursor, &rsp->setupRsp);
  } else if (rsp->type == UDF_TASK_CALL) {
    cursor = decodeUdfCallResponse(cursor, &rsp->callRsp);
  } else if (rsp->type == UDF_TASK_TEARDOWN) {
    cursor = decodeUdfTeardownResponse(cursor, &rsp->teardownRsp);
  } else {
    //TODO: log error
  }
  return (void*)cursor;
}
483

S
shenglian zhou 已提交
484 485
/*
 * Release the two heap buffers owned by a column's data (offsets + payload
 * for variable-length columns, null bitmap + data otherwise) and reset the
 * pointers to NULL.
 */
void freeUdfColumnData(SUdfColumnData *data) {
  if (!data->varLengthColumn) {
    taosMemoryFree(data->fixLenCol.nullBitmap);
    taosMemoryFree(data->fixLenCol.data);
    data->fixLenCol.nullBitmap = NULL;
    data->fixLenCol.data = NULL;
    return;
  }

  taosMemoryFree(data->varLenCol.varOffsets);
  taosMemoryFree(data->varLenCol.payload);
  data->varLenCol.varOffsets = NULL;
  data->varLenCol.payload = NULL;
}

/* Release the buffers owned by a column; the column object itself is not freed. */
void freeUdfColumn(SUdfColumn* col) {
  SUdfColumnData *colData = &col->colData;
  freeUdfColumnData(colData);
}

/*
 * Release every column of a UDF data block together with the column pointer
 * array, and reset the released pointers to NULL.
 *
 * Tolerates a partially built block: a NULL column array or NULL column
 * entries are skipped instead of being dereferenced.
 */
void freeUdfDataDataBlock(SUdfDataBlock *block) {
  if (block->udfCols != NULL) {
    for (int32_t i = 0; i < block->numOfCols; ++i) {
      if (block->udfCols[i] == NULL) {
        continue;
      }
      freeUdfColumn(block->udfCols[i]);
      taosMemoryFree(block->udfCols[i]);
      block->udfCols[i] = NULL;
    }
  }
  taosMemoryFree(block->udfCols);
  block->udfCols = NULL;
}

/* Release the payload of an intermediate buffer and clear the pointer. */
void freeUdfInterBuf(SUdfInterBuf *buf) {
  void *payload = buf->buf;
  buf->buf = NULL;
  taosMemoryFree(payload);
}

S
slzhou 已提交
517 518 519 520

/*
 * Deep-copy an SSDataBlock into a UDF data block.
 *
 * For every column the metadata is copied and two buffers are duplicated:
 * offsets + payload for variable-length types, null bitmap + data otherwise.
 *
 * Returns 0 on success. Returns -1 when an allocation fails; in that case
 * everything allocated so far is released and udfBlock is left with
 * udfCols == NULL and numOfCols == 0.
 */
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock) {
  udfBlock->numOfRows = block->info.rows;
  udfBlock->numOfCols = block->info.numOfCols;
  udfBlock->udfCols = taosMemoryCalloc(udfBlock->numOfCols, sizeof(SUdfColumn*));
  if (udfBlock->udfCols == NULL && udfBlock->numOfCols > 0) {
    goto _oom;
  }

  for (int32_t i = 0; i < udfBlock->numOfCols; ++i) {
    SUdfColumn *udfCol = taosMemoryCalloc(1, sizeof(SUdfColumn));
    if (udfCol == NULL) {
      goto _oom;
    }
    udfBlock->udfCols[i] = udfCol;

    SColumnInfoData *col = (SColumnInfoData*)taosArrayGet(block->pDataBlock, i);
    udfCol->colMeta.type = col->info.type;
    udfCol->colMeta.bytes = col->info.bytes;
    udfCol->colMeta.scale = col->info.scale;
    udfCol->colMeta.precision = col->info.precision;
    udfCol->colData.numOfRows = udfBlock->numOfRows;
    udfCol->colData.varLengthColumn = IS_VAR_DATA_TYPE(udfCol->colMeta.type);

    if (udfCol->colData.varLengthColumn) {
      int32_t offsetsLen = sizeof(int32_t) * udfBlock->numOfRows;
      int32_t payloadLen = colDataGetLength(col, udfBlock->numOfRows);
      udfCol->colData.varLenCol.varOffsetsLen = offsetsLen;
      udfCol->colData.varLenCol.payloadLen = payloadLen;
      udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(offsetsLen);
      udfCol->colData.varLenCol.payload = taosMemoryMalloc(payloadLen);
      // A NULL result only counts as a failure for a non-empty request.
      if ((udfCol->colData.varLenCol.varOffsets == NULL && offsetsLen > 0) ||
          (udfCol->colData.varLenCol.payload == NULL && payloadLen > 0)) {
        goto _oom;
      }
      if (offsetsLen > 0) {
        memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, offsetsLen);
      }
      if (payloadLen > 0) {
        memcpy(udfCol->colData.varLenCol.payload, col->pData, payloadLen);
      }
    } else {
      int32_t bitmapLen = BitmapLen(udfCol->colData.numOfRows);
      int32_t dataLen = colDataGetLength(col, udfBlock->numOfRows);
      udfCol->colData.fixLenCol.nullBitmapLen = bitmapLen;
      udfCol->colData.fixLenCol.dataLen = dataLen;
      udfCol->colData.fixLenCol.nullBitmap = taosMemoryMalloc(bitmapLen);
      udfCol->colData.fixLenCol.data = taosMemoryMalloc(dataLen);
      if ((udfCol->colData.fixLenCol.nullBitmap == NULL && bitmapLen > 0) ||
          (udfCol->colData.fixLenCol.data == NULL && dataLen > 0)) {
        goto _oom;
      }
      if (bitmapLen > 0) {
        memcpy(udfCol->colData.fixLenCol.nullBitmap, col->nullbitmap, bitmapLen);
      }
      if (dataLen > 0) {
        memcpy(udfCol->colData.fixLenCol.data, col->pData, dataLen);
      }
    }
  }
  return 0;

_oom:
  fnError("udfc failed to allocate memory while converting data block");
  if (udfBlock->udfCols != NULL) {
    for (int32_t i = 0; i < udfBlock->numOfCols; ++i) {
      SUdfColumn *udfCol = udfBlock->udfCols[i];
      if (udfCol == NULL) {
        continue;  // columns past the failure point were never allocated
      }
      freeUdfColumnData(&udfCol->colData);
      taosMemoryFree(udfCol);
    }
    taosMemoryFree(udfBlock->udfCols);
    udfBlock->udfCols = NULL;
  }
  udfBlock->numOfCols = 0;
  return -1;
}

/*
 * Build a single-column SSDataBlock that holds a deep copy of udfCol.
 *
 * The two column buffers (null bitmap or var offsets, plus the data/payload)
 * are duplicated first, so that on allocation failure `block` is left
 * untouched and nothing leaks.
 *
 * Returns 0 on success, -1 when an allocation fails.
 */
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
  SUdfColumnMeta *meta = &udfCol->colMeta;
  SUdfColumnData *data = &udfCol->colData;
  bool            isVar = IS_VAR_DATA_TYPE(meta->type);

  // Select the source buffers once; both layouts are "aux buffer + data buffer".
  const void *auxSrc = isVar ? (const void*)data->varLenCol.varOffsets : (const void*)data->fixLenCol.nullBitmap;
  const void *dataSrc = isVar ? (const void*)data->varLenCol.payload : (const void*)data->fixLenCol.data;
  int32_t     auxLen = isVar ? data->varLenCol.varOffsetsLen : data->fixLenCol.nullBitmapLen;
  int32_t     dataLen = isVar ? data->varLenCol.payloadLen : data->fixLenCol.dataLen;

  void *auxCopy = taosMemoryMalloc(auxLen);
  void *dataCopy = taosMemoryMalloc(dataLen);
  SArray *columns = taosArrayInit(1, sizeof(SColumnInfoData));
  // A NULL buffer only counts as a failure for a non-empty request.
  if ((auxCopy == NULL && auxLen > 0) || (dataCopy == NULL && dataLen > 0) || columns == NULL) {
    fnError("udfc failed to allocate memory while converting udf column");
    taosMemoryFree(auxCopy);
    taosMemoryFree(dataCopy);
    taosMemoryFree(columns);  // NOTE(review): assumes a failed/empty-handle free is a no-op, as for NULL
    return -1;
  }
  if (auxLen > 0) {
    memcpy(auxCopy, auxSrc, auxLen);
  }
  if (dataLen > 0) {
    memcpy(dataCopy, dataSrc, dataLen);
  }

  block->info.numOfCols = 1;
  block->info.rows = udfCol->colData.numOfRows;
  block->info.hasVarCol = udfCol->colData.varLengthColumn;

  block->pDataBlock = columns;
  taosArraySetSize(block->pDataBlock, 1);
  SColumnInfoData *col = taosArrayGet(block->pDataBlock, 0);
  col->info.precision = meta->precision;
  col->info.bytes = meta->bytes;
  col->info.scale = meta->scale;
  col->info.type = meta->type;

  if (isVar) {
    col->varmeta.offset = auxCopy;
  } else {
    col->nullbitmap = auxCopy;
  }
  col->pData = dataCopy;
  return 0;
}

S
slzhou 已提交
584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611
/*
 * Wrap `numOfCols` scalar parameters (input[0..numOfCols-1]) into a data
 * block: the row count is taken from the first parameter, hasVarCol is set
 * when any column has a variable-length type, and one column entry is pushed
 * per parameter so that the array matches info.numOfCols.
 *
 * The column entries are shallow copies of each parameter's columnData.
 * Returns 0 on success, -1 when the column array cannot be allocated.
 */
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output) {
  output->info.rows = input->numOfRows;
  output->info.numOfCols = numOfCols;
  bool hasVarCol = false;
  for (int32_t i = 0; i < numOfCols; ++i) {
    if (IS_VAR_DATA_TYPE((input+i)->columnData->info.type)) {
      hasVarCol = true;
      break;
    }
  }
  output->info.hasVarCol = hasVarCol;

  //TODO: free the array output->pDataBlock
  output->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
  if (output->pDataBlock == NULL) {
    fnError("udfc failed to allocate column array for scalar parameters");
    return -1;
  }
  // Push every parameter's column, not only the first one.
  for (int32_t i = 0; i < numOfCols; ++i) {
    taosArrayPush(output->pDataBlock, (input+i)->columnData);
  }
  return 0;
}

/*
 * Expose a single-column data block as a scalar parameter. The parameter's
 * columnData points into the block's column array (no copy is made).
 * Returns 0 on success, -1 when the block does not have exactly one column.
 */
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
  bool singleColumn = (input->info.numOfCols == 1);
  if (!singleColumn) {
    fnError("scalar function only support one column");
    return -1;
  }

  //TODO: memory
  output->columnData = taosArrayGet(input->pDataBlock, 0);
  output->numOfRows = input->info.rows;
  return 0;
}
S
slzhou 已提交
612

613 614
/*
 * libuv close callback for a udfc pipe.
 *
 * Completes the task at the head of the connection's task queue (if any) with
 * errCode 0, then releases the read buffer, the connection object and the
 * pipe handle itself.
 */
void onUdfcPipeClose(uv_handle_t *handle) {
  SClientUvConn *conn = handle->data;
  if (!QUEUE_EMPTY(&conn->taskQueue)) {
    QUEUE* h = QUEUE_HEAD(&conn->taskQueue);
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
    task->errCode = 0;
    // Unlink before posting: the thread waiting on taskSem presumably
    // releases the node once it wakes up, so the node must not be touched
    // after uv_sem_post.
    QUEUE_REMOVE(&task->procTaskQueue);
    uv_sem_post(&task->taskSem);
  }

  taosMemoryFree(conn->readBuf.buf);
  taosMemoryFree(conn);
  taosMemoryFree((uv_pipe_t *) handle);
}

/*
 * Transfer the outcome of a finished uv task into the UDF task.
 *
 * For a request/response task with a response buffer, the response is
 * decoded, its code becomes task->errCode and the type-specific part is
 * copied (shallowly) into the matching task->_xxx.rsp member; the raw
 * response buffer is then released. In every other case the uv task's error
 * code is propagated. Always returns 0.
 */
int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
  fnDebug("udfc get uv task result. task: %p", task);

  if (uvTask->type == UV_TASK_REQ_RSP) {
    if (uvTask->rspBuf.base == NULL) {
      task->errCode = uvTask->errCode;
      return 0;
    }

    // Zero-initialize so that an unknown response type never exposes
    // indeterminate bytes through the struct copies below.
    SUdfResponse rsp = {0};
    void* buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp);
    assert(uvTask->rspBuf.len == POINTER_DISTANCE(buf, uvTask->rspBuf.base));
    task->errCode = rsp.code;

    switch (task->type) {
      case UDF_TASK_SETUP: {
        //TODO: copy or not
        task->_setup.rsp = rsp.setupRsp;
        break;
      }
      case UDF_TASK_CALL: {
        //TODO: copy or not
        task->_call.rsp = rsp.callRsp;
        break;
      }
      case UDF_TASK_TEARDOWN: {
        //TODO: copy or not?
        task->_teardown.rsp = rsp.teardownRsp;
        break;
      }
      default: {
        break;
      }
    }

    // TODO: the call buffer is setup and freed by udf invocation
    taosMemoryFree(uvTask->rspBuf.base);
    // Do not leave a dangling pointer behind in the task node.
    uvTask->rspBuf.base = NULL;
    uvTask->rspBuf.len = 0;
  } else if (uvTask->type == UV_TASK_CONNECT || uvTask->type == UV_TASK_DISCONNECT) {
    task->errCode = uvTask->errCode;
  }
  return 0;
}

/*
 * libuv allocation callback for the udfc pipe.
 *
 * First call for a message: allocates room for the fixed header (int32
 * length + int64 sequence number). Later calls: grow the buffer to the total
 * message length once it is known and hand libuv the unread tail.
 *
 * On allocation failure an empty uv_buf_t is returned and the connection
 * buffer keeps its previous size; `cap` is only updated after the
 * reallocation has succeeded.
 */
void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
  SClientUvConn *conn = handle->data;
  SClientConnBuf *connBuf = &conn->readBuf;

  int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
  if (connBuf->cap == 0) {
    connBuf->buf = taosMemoryMalloc(msgHeadSize);
    if (connBuf->buf) {
      connBuf->len = 0;
      connBuf->cap = msgHeadSize;
      connBuf->total = -1;

      buf->base = connBuf->buf;
      buf->len = connBuf->cap;
    } else {
      fnError("udfc allocate buffer failure. size: %d", msgHeadSize);
      buf->base = NULL;
      buf->len = 0;
    }
  } else {
    int32_t newCap = connBuf->total > connBuf->cap ? connBuf->total : connBuf->cap;
    void *resultBuf = taosMemoryRealloc(connBuf->buf, newCap);
    if (resultBuf) {
      connBuf->buf = resultBuf;
      connBuf->cap = newCap;
      buf->base = connBuf->buf + connBuf->len;
      buf->len = connBuf->cap - connBuf->len;
    } else {
      // connBuf->buf is still valid at its old capacity; do not touch cap.
      fnError("udfc re-allocate buffer failure. size: %d", newCap);
      buf->base = NULL;
      buf->len = 0;
    }
  }

  fnTrace("conn buf cap - len - total : %d - %d - %d", connBuf->cap, connBuf->len, connBuf->total);
}

709 710 711 712 713
/*
 * Report whether the connection buffer holds one complete message.
 *
 * As a side effect, once at least four bytes have arrived and the total is
 * still unknown (-1), the total message length is read from the start of the
 * buffer. The message is complete when the buffer is full and its capacity
 * equals that total.
 */
bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
  if (connBuf->total == -1 && connBuf->len >= sizeof(int32_t)) {
    connBuf->total = *(int32_t *) (connBuf->buf);
  }

  bool bufferFull = (connBuf->len == connBuf->cap);
  bool sizedToMessage = (connBuf->total == connBuf->cap);
  if (!(bufferFull && sizedToMessage)) {
    return false;
  }

  fnTrace("udfc complete message is received, now handle it");
  return true;
}

/*
 * Dispatch one complete response held in the connection's read buffer.
 *
 * The sequence number is read from the fixed header (it follows the int32
 * message length) and matched against the tasks waiting on this connection.
 * The first matching task takes ownership of the buffer via rspBuf, is
 * unlinked from its queues and is woken up. If no task is waiting for the
 * response, the buffer is released. In every case the read buffer state is
 * reset so the next message starts from scratch.
 */
void udfcUvHandleRsp(SClientUvConn *conn) {
  SClientConnBuf *connBuf = &conn->readBuf;

  // msglen then seqnum; the offset is not 8-byte aligned, so copy it out.
  int64_t seqNum = 0;
  memcpy(&seqNum, connBuf->buf + sizeof(int32_t), sizeof(seqNum));

  SClientUvTaskNode *taskFound = NULL;
  if (QUEUE_EMPTY(&conn->taskQueue)) {
    fnError("udfc no task waiting for response on connection");
  } else {
    QUEUE *h = NULL;
    QUEUE_FOREACH(h, &conn->taskQueue) {
      SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
      if (task->seqNum != seqNum) {
        continue;
      }
      if (taskFound == NULL) {
        taskFound = task;
      } else {
        // Keep the first match; the loop still advances to the next node.
        fnError("udfc more than one task waiting for the same response");
      }
    }
    if (taskFound == NULL) {
      fnError("no task is waiting for the response.");
    }
  }

  if (taskFound != NULL) {
    taskFound->rspBuf = uv_buf_init(connBuf->buf, connBuf->len);
    QUEUE_REMOVE(&taskFound->connTaskQueue);
    QUEUE_REMOVE(&taskFound->procTaskQueue);
    // Post last: the thread waiting on taskSem presumably releases the node
    // once it wakes up, so it must not be touched afterwards.
    uv_sem_post(&taskFound->taskSem);
  } else {
    // Nobody takes ownership of the message, so release it here.
    taosMemoryFree(connBuf->buf);
  }

  connBuf->buf = NULL;
  connBuf->total = -1;
  connBuf->len = 0;
  connBuf->cap = 0;
}
760

761
/*
 * Handle a fatal read error on a connection.
 *
 * Every task still waiting for a response on this connection is completed
 * with UDFC_CODE_PIPE_READ_ERR, then the pipe is closed. The read buffer,
 * the connection object and the pipe handle are released by the close
 * callback (onUdfcPipeClose), because libuv requires the handle memory to
 * stay valid until that callback has run.
 */
void udfcUvHandleError(SClientUvConn *conn) {
  while (!QUEUE_EMPTY(&conn->taskQueue)) {
    QUEUE* h = QUEUE_HEAD(&conn->taskQueue);
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
    task->errCode = UDFC_CODE_PIPE_READ_ERR;
    // Unlink from the connection queue as well, otherwise the head never
    // changes and this loop cannot terminate.
    QUEUE_REMOVE(&task->connTaskQueue);
    QUEUE_REMOVE(&task->procTaskQueue);
    // Post last: the thread waiting on taskSem presumably releases the node
    // once it wakes up.
    uv_sem_post(&task->taskSem);
  }

  uv_close((uv_handle_t *) conn->pipe, onUdfcPipeClose);
}

/*
 * libuv read callback for the udfc pipe.
 *
 * nread == 0: nothing to do. nread > 0: account for the received bytes and
 * dispatch the response once the message is complete. nread < 0: log the
 * error (EOF is logged additionally as a close) and run the connection error
 * handling.
 */
void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
  fnTrace("udfc client %p, client read from pipe. nread: %zd", client, nread);
  if (nread == 0) return;

  SClientUvConn *conn = client->data;

  if (nread > 0) {
    SClientConnBuf *connBuf = &conn->readBuf;
    connBuf->len += nread;
    if (isUdfcUvMsgComplete(connBuf)) {
      udfcUvHandleRsp(conn);
    }
    return;
  }

  // nread < 0
  fnError("udfc client pipe %p read error: %s", client, uv_strerror(nread));
  if (nread == UV_EOF) {
    fnError("udfc client pipe %p closed", client);
  }
  udfcUvHandleError(conn);
}

799 800
/*
 * libuv write callback for a request sent to udfd.
 *
 * The write request and the encoded request buffer are released in both
 * cases. On success the task is appended to the connection's queue of tasks
 * waiting for a response. On failure the task is completed immediately with
 * the libuv status as its error code, so that the thread blocked on the task
 * semaphore is not left waiting for a response that will never arrive.
 */
void onUdfClientWrite(uv_write_t *write, int status) {
  SClientUvTaskNode *uvTask = write->data;
  uv_pipe_t *pipe = uvTask->pipe;

  fnTrace("udfc client %p write length:%zu", pipe, uvTask->reqBuf.len);
  taosMemoryFree(write);
  taosMemoryFree(uvTask->reqBuf.base);
  uvTask->reqBuf.base = NULL;

  if (status == 0) {
    SClientUvConn *conn = pipe->data;
    QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
  } else {
    fnError("udfc client %p write error.", pipe);
    uvTask->errCode = status;
    QUEUE_REMOVE(&uvTask->procTaskQueue);
    // Post last: the waiter frees the task node as soon as it wakes up.
    uv_sem_post(&uvTask->taskSem);
  }
}
H
Haojun Liao 已提交
812

813 814 815 816 817
/*
 * libuv connect callback for the udfd pipe.
 *
 * Records the connect status in the task, starts reading from the pipe only
 * when the connection succeeded, releases the connect request and finally
 * wakes up the thread waiting on the task.
 */
void onUdfClientConnect(uv_connect_t *connect, int status) {
  SClientUvTaskNode *uvTask = connect->data;
  uvTask->errCode = status;
  if (status != 0) {
    fnError("udfc client %p connect error: %s", uvTask->pipe, uv_strerror(status));
  } else {
    int rc = uv_read_start((uv_stream_t *) uvTask->pipe, udfcAllocateBuffer, onUdfcRead);
    if (rc != 0) {
      fnError("udfc client %p start read error: %s", uvTask->pipe, uv_strerror(rc));
      uvTask->errCode = rc;
    }
  }
  taosMemoryFree(connect);
  QUEUE_REMOVE(&uvTask->procTaskQueue);
  // Post last: the waiter frees the task node as soon as it wakes up.
  uv_sem_post(&uvTask->taskSem);
}
H
Haojun Liao 已提交
824

825
/*
 * Build the event-loop task node for a client task.
 *
 * For UV_TASK_REQ_RSP the request matching task->type is encoded into a newly
 * allocated buffer stored in uvTask->reqBuf, and a fresh sequence number is
 * assigned. For UV_TASK_DISCONNECT only the session pipe is recorded.
 *
 * @param task        client task; task->session must be set
 * @param uvTaskType  UV_TASK_CONNECT, UV_TASK_REQ_RSP or UV_TASK_DISCONNECT
 * @param pUvTask     receives the new node on success, NULL on failure
 * @return 0 on success, non-zero on allocation failure, semaphore
 *         initialisation failure or unknown task->type
 */
int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) {
  *pUvTask = NULL;

  SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode));
  if (uvTask == NULL) {
    fnError("udfc failed to allocate uv task");
    return -1;
  }
  uvTask->type = uvTaskType;
  uvTask->udfc = task->session->udfc;

  if (uvTaskType == UV_TASK_REQ_RSP) {
    uvTask->pipe = task->session->udfSvcPipe;
    SUdfRequest request = {0};
    request.type = task->type;
    request.seqNum = atomic_fetch_add_64(&gUdfTaskSeqNum, 1);

    if (task->type == UDF_TASK_SETUP) {
      request.setup = task->_setup.req;
    } else if (task->type == UDF_TASK_CALL) {
      request.call = task->_call.req;
    } else if (task->type == UDF_TASK_TEARDOWN) {
      request.teardown = task->_teardown.req;
    } else {
      fnError("udfc unknown udf task type: %d", task->type);
      taosMemoryFree(uvTask);
      return -1;
    }

    // First pass with a NULL buffer only computes the encoded length.
    int32_t bufLen = encodeUdfRequest(NULL, &request);
    request.msgLen = bufLen;
    void *bufBegin = taosMemoryMalloc(bufLen);
    if (bufBegin == NULL) {
      fnError("udfc failed to allocate request buffer. len: %d", bufLen);
      taosMemoryFree(uvTask);
      return -1;
    }
    void *buf = bufBegin;
    encodeUdfRequest(&buf, &request);
    uvTask->reqBuf = uv_buf_init(bufBegin, bufLen);
    uvTask->seqNum = request.seqNum;
  } else if (uvTaskType == UV_TASK_DISCONNECT) {
    uvTask->pipe = task->session->udfSvcPipe;
  }

  if (uv_sem_init(&uvTask->taskSem, 0) != 0) {
    fnError("udfc failed to init task semaphore");
    taosMemoryFree(uvTask->reqBuf.base);
    taosMemoryFree(uvTask);
    return -1;
  }

  *pUvTask = uvTask;
  return 0;
}
H
Haojun Liao 已提交
864

865
/*
 * Hand a task to the event loop and block until it has been completed.
 *
 * The task is appended to the proxy's task queue under the queue mutex, the
 * loop is woken through its async handle, and the caller then waits on the
 * task semaphore, which is destroyed once the wait returns. Always returns 0.
 */
int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) {
  fnTrace("queue uv task to event loop, task: %d, %p", uvTask->type, uvTask);

  SUdfdProxy *proxy = uvTask->udfc;

  uv_mutex_lock(&proxy->gUdfTaskQueueMutex);
  QUEUE_INSERT_TAIL(&proxy->gUdfTaskQueue, &uvTask->recvTaskQueue);
  uv_mutex_unlock(&proxy->gUdfTaskQueueMutex);

  // Wake the loop thread so it picks the task up.
  uv_async_send(&proxy->gUdfLoopTaskAync);

  uv_sem_wait(&uvTask->taskSem);
  uv_sem_destroy(&uvTask->taskSem);

  return 0;
}
H
Haojun Liao 已提交
878

879
/*
 * Start a task on the event loop thread.
 *
 *  - UV_TASK_CONNECT: allocates the pipe and its connection record and issues
 *    an asynchronous connect to the udfd pipe; onUdfClientConnect completes
 *    the task.
 *  - UV_TASK_REQ_RSP: writes the encoded request; onUdfClientWrite continues.
 *  - UV_TASK_DISCONNECT: queues the task on the connection and closes the pipe
 *    with onUdfcPipeClose as the close callback.
 *
 * @return 0 when the operation was started; a negative libuv error code when
 *         it could not be started. In that case no callback will fire for the
 *         task and the caller is responsible for completing it.
 */
int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
  fnTrace("event loop start uv task. task: %d, %p", uvTask->type, uvTask);
  switch (uvTask->type) {
    case UV_TASK_CONNECT: {
      uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
      SClientUvConn *conn = taosMemoryMalloc(sizeof(SClientUvConn));
      uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t));
      if (pipe == NULL || conn == NULL || connReq == NULL) {
        fnError("udfc failed to allocate connection resources");
        taosMemoryFree(connReq);
        taosMemoryFree(conn);
        taosMemoryFree(pipe);
        return UV_ENOMEM;
      }

      uv_pipe_init(&uvTask->udfc->gUdfdLoop, pipe, 0);
      uvTask->pipe = pipe;

      conn->pipe = pipe;
      conn->readBuf.len = 0;
      conn->readBuf.cap = 0;
      conn->readBuf.buf = 0;
      conn->readBuf.total = -1;
      QUEUE_INIT(&conn->taskQueue);

      pipe->data = conn;

      connReq->data = uvTask;
      uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfClientConnect);
      break;
    }
    case UV_TASK_REQ_RSP: {
      uv_pipe_t *pipe = uvTask->pipe;
      uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t));
      if (write == NULL) {
        fnError("udfc client %p failed to allocate write request", pipe);
        taosMemoryFree(uvTask->reqBuf.base);
        uvTask->reqBuf.base = NULL;
        return UV_ENOMEM;
      }
      write->data = uvTask;
      int rc = uv_write(write, (uv_stream_t *) pipe, &uvTask->reqBuf, 1, onUdfClientWrite);
      if (rc != 0) {
        // The write callback is not invoked when uv_write fails synchronously,
        // so the request memory has to be released here.
        fnError("udfc client %p failed to start write: %s", pipe, uv_strerror(rc));
        taosMemoryFree(write);
        taosMemoryFree(uvTask->reqBuf.base);
        uvTask->reqBuf.base = NULL;
        return rc;
      }
      break;
    }
    case UV_TASK_DISCONNECT: {
      SClientUvConn *conn = uvTask->pipe->data;
      QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
      uv_close((uv_handle_t *) uvTask->pipe, onUdfcPipeClose);
      break;
    }
    default: {
      break;
    }
  }

  return 0;
}
H
Haojun Liao 已提交
922

923
/*
 * Async callback run on the loop thread when client threads queue tasks.
 *
 * The shared task queue is moved to a local queue under the mutex, then each
 * task is started. A task that started successfully is tracked in
 * gUvProcTaskQueue until a callback completes it. A task that could not be
 * started is completed right away with the start error, because no callback
 * will ever fire for it.
 */
void udfClientAsyncCb(uv_async_t *async) {
  SUdfdProxy *udfc = async->data;
  QUEUE wq;

  uv_mutex_lock(&udfc->gUdfTaskQueueMutex);
  QUEUE_MOVE(&udfc->gUdfTaskQueue, &wq);
  uv_mutex_unlock(&udfc->gUdfTaskQueueMutex);

  while (!QUEUE_EMPTY(&wq)) {
    QUEUE* h = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(h);
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
    int32_t code = startUvUdfTask(task);
    if (code != 0) {
      fnError("udfc failed to start uv task. task: %d, %p, code: %d", task->type, task, code);
      task->errCode = code;
      // The waiter frees the task after waking up; do not touch it afterwards.
      uv_sem_post(&task->taskSem);
      continue;
    }
    QUEUE_INSERT_TAIL(&udfc->gUvProcTaskQueue, &task->procTaskQueue);
  }

}

941
/*
 * Wake up every task the proxy still knows about.
 *
 * First the tasks that were queued but not yet started (taken from
 * gUdfTaskQueue under the mutex), then the tasks already started on the loop
 * (gUvProcTaskQueue). When the proxy is in UDFC_STATE_STOPPING each task is
 * marked with UDFC_CODE_STOPPING before its semaphore is posted.
 */
void cleanUpUvTasks(SUdfdProxy *udfc) {
  QUEUE notStarted;

  uv_mutex_lock(&udfc->gUdfTaskQueueMutex);
  QUEUE_MOVE(&udfc->gUdfTaskQueue, &notStarted);
  uv_mutex_unlock(&udfc->gUdfTaskQueueMutex);

  // Tasks that never reached the event loop.
  while (!QUEUE_EMPTY(&notStarted)) {
    QUEUE *node = QUEUE_HEAD(&notStarted);
    QUEUE_REMOVE(node);
    SClientUvTaskNode *queued = QUEUE_DATA(node, SClientUvTaskNode, recvTaskQueue);
    if (udfc->gUdfcState == UDFC_STATE_STOPPING) {
      queued->errCode = UDFC_CODE_STOPPING;
    }
    uv_sem_post(&queued->taskSem);
  }

  // TODO: deal with tasks that are waiting result.
  QUEUE *inFlight = &udfc->gUvProcTaskQueue;
  while (!QUEUE_EMPTY(inFlight)) {
    QUEUE *node = QUEUE_HEAD(inFlight);
    QUEUE_REMOVE(node);
    SClientUvTaskNode *started = QUEUE_DATA(node, SClientUvTaskNode, procTaskQueue);
    if (udfc->gUdfcState == UDFC_STATE_STOPPING) {
      started->errCode = UDFC_CODE_STOPPING;
    }
    uv_sem_post(&started->taskSem);
  }
}
969

S
shenglian zhou 已提交
970
/*
 * Async callback run on the loop thread when a stop is requested: releases
 * all outstanding tasks and stops the loop if the proxy is stopping.
 */
void udfStopAsyncCb(uv_async_t *async) {
  SUdfdProxy *proxy = async->data;

  cleanUpUvTasks(proxy);

  if (proxy->gUdfcState != UDFC_STATE_STOPPING) {
    return;
  }
  uv_stop(&proxy->gUdfdLoop);
}
S
shenglian zhou 已提交
977

S
shenglian zhou 已提交
978
void constructUdfService(void *argsThread) {
979 980 981 982 983 984 985 986 987 988 989
  SUdfdProxy *udfc = (SUdfdProxy*)argsThread;
  uv_loop_init(&udfc->gUdfdLoop);

  uv_async_init(&udfc->gUdfdLoop, &udfc->gUdfLoopTaskAync, udfClientAsyncCb);
  udfc->gUdfLoopTaskAync.data = udfc;
  uv_async_init(&udfc->gUdfdLoop, &udfc->gUdfLoopStopAsync, udfStopAsyncCb);
  udfc->gUdfLoopStopAsync.data = udfc;
  uv_mutex_init(&udfc->gUdfTaskQueueMutex);
  QUEUE_INIT(&udfc->gUdfTaskQueue);
  QUEUE_INIT(&udfc->gUvProcTaskQueue);
  uv_barrier_wait(&udfc->gUdfInitBarrier);
990
  //TODO return value of uv_run
991 992
  uv_run(&udfc->gUdfdLoop, UV_RUN_DEFAULT);
  uv_loop_close(&udfc->gUdfdLoop);
993 994
}

995 996 997 998 999 1000
int32_t udfcOpen() {
  int8_t old = atomic_val_compare_exchange_8(&gUdfdProxy.initialized, 0, 1);
  if (old == 1) {
    return 0;
  }
  SUdfdProxy *proxy = &gUdfdProxy;
1001
  getUdfdPipeName(proxy->udfdPipeName, UDF_LISTEN_PIPE_NAME_LEN);
1002 1003 1004
  proxy->gUdfcState = UDFC_STATE_STARTNG;
  uv_barrier_init(&proxy->gUdfInitBarrier, 2);
  uv_thread_create(&proxy->gUdfLoopThread, constructUdfService, proxy);
1005
  atomic_store_8(&proxy->gUdfcState, UDFC_STATE_READY);
1006
  proxy->gUdfcState = UDFC_STATE_READY;
1007 1008
  uv_barrier_wait(&proxy->gUdfInitBarrier);
  fnInfo("udfc initialized")
1009 1010 1011
  return 0;
}

1012 1013 1014 1015 1016 1017 1018
int32_t udfcClose() {
  int8_t old = atomic_val_compare_exchange_8(&gUdfdProxy.initialized, 1, 0);
  if (old == 0) {
    return 0;
  }

  SUdfdProxy *udfc = &gUdfdProxy;
1019 1020 1021 1022 1023
  udfc->gUdfcState = UDFC_STATE_STOPPING;
  uv_async_send(&udfc->gUdfLoopStopAsync);
  uv_thread_join(&udfc->gUdfLoopThread);
  uv_mutex_destroy(&udfc->gUdfTaskQueueMutex);
  uv_barrier_destroy(&udfc->gUdfInitBarrier);
1024 1025
  udfc->gUdfcState = UDFC_STATE_INITAL;
  fnInfo("udfc cleaned up");
1026 1027 1028
  return 0;
}

1029 1030 1031 1032 1033 1034 1035 1036
/*
 * Run one step of a client task on the event loop and wait for it.
 *
 * Creates the loop task, queues it (blocking until it completes), folds the
 * result back into `task`, and for UV_TASK_CONNECT stores the connected pipe
 * in the session.
 *
 * @return task->errCode after the step; non-zero when the loop task could not
 *         be created.
 */
int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) {
  SClientUvTaskNode *uvTask = NULL;

  int32_t code = createUdfcUvTask(task, uvTaskType, &uvTask);
  if (code != 0 || uvTask == NULL) {
    fnError("udfc failed to create uv task. type: %d, code: %d", uvTaskType, code);
    task->errCode = (code != 0) ? code : -1;
    return task->errCode;
  }

  queueUvUdfTask(uvTask);
  udfcGetUvTaskResponseResult(task, uvTask);
  if (uvTaskType == UV_TASK_CONNECT) {
    task->session->udfSvcPipe = uvTask->pipe;
  }
  taosMemoryFree(uvTask);
  uvTask = NULL;
  return task->errCode;
}

1043 1044 1045 1046 1047
/*
 * Connect to udfd and set up the named UDF.
 *
 * @param udfName     NUL-terminated function name; at most TSDB_FUNC_NAME_LEN
 *                    bytes are copied into the request
 * @param funcHandle  receives the session handle on success only
 * @return 0 on success; UDFC_CODE_INVALID_STATE when the proxy is not ready;
 *         UDFC_CODE_CONNECT_PIPE_ERR when the pipe connect fails; otherwise
 *         the error code of the setup request (or -1 on allocation failure).
 */
int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
  fnInfo("udfc setup udf. udfName: %s", udfName);
  if (gUdfdProxy.gUdfcState != UDFC_STATE_READY) {
    return UDFC_CODE_INVALID_STATE;
  }

  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
  SUdfUvSession *session = taosMemoryCalloc(1, sizeof(SUdfUvSession));
  if (task == NULL || session == NULL) {
    fnError("udfc failed to allocate setup task. udfName: %s", udfName);
    taosMemoryFree(session);
    taosMemoryFree(task);
    return -1;
  }
  task->errCode = 0;
  task->session = session;
  session->udfc = &gUdfdProxy;
  task->type = UDF_TASK_SETUP;

  SUdfSetupRequest *req = &task->_setup.req;
  // Stops at the terminator, so a short name is not over-read the way a
  // fixed-length memcpy would; the destination was zeroed by calloc.
  strncpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN);

  int32_t errCode = udfcRunUvTask(task, UV_TASK_CONNECT);
  if (errCode != 0) {
    fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfdProxy)->udfdPipeName);
    taosMemoryFree(session);
    taosMemoryFree(task);
    return UDFC_CODE_CONNECT_PIPE_ERR;
  }

  udfcRunUvTask(task, UV_TASK_REQ_RSP);

  int32_t err = task->errCode;
  if (err != 0) {
    fnError("failed to setup udf. err: %d", err);
    // The handle is never handed out, so the session would be unreachable.
    // NOTE(review): the pipe opened by the connect step is not closed on this
    // path — confirm how it should be torn down.
    taosMemoryFree(session);
  } else {
    // The response is only meaningful when the request succeeded.
    SUdfSetupResponse *rsp = &task->_setup.rsp;
    session->severHandle = rsp->udfHandle;
    session->outputType = rsp->outputType;
    session->outputLen = rsp->outputLen;
    session->bufSize = rsp->bufSize;
    fnInfo("sucessfully setup udf func handle. handle: %p", session);
    *funcHandle = session;
  }

  taosMemoryFree(task);
  return err;
}

1081
/*
 * Send one call request to udfd for an already set-up UDF and collect the
 * result.
 *
 * Which arguments are used depends on callType:
 *   TSDB_UDF_CALL_AGG_INIT   -> out: newState
 *   TSDB_UDF_CALL_AGG_PROC   -> in: input, state         out: newState
 *   TSDB_UDF_CALL_AGG_MERGE  -> in: state, state2        out: newState
 *   TSDB_UDF_CALL_AGG_FIN    -> in: state                out: newState
 *   TSDB_UDF_CALL_SCALA_PROC -> in: input                out: output
 * Arguments not listed for a call type are not touched. Outputs are written
 * only when the call succeeds.
 *
 * @return 0 on success, otherwise the task error code (-1 on allocation
 *         failure).
 */
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
                SSDataBlock* output, SUdfInterBuf *newState) {
  fnTrace("udfc call udf. callType: %d, funcHandle: %p", callType, handle);

  // Zeroed so that request fields not set for this call type are defined.
  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
  if (task == NULL) {
    fnError("udfc failed to allocate call task");
    return -1;
  }
  task->errCode = 0;
  task->session = (SUdfUvSession *) handle;
  task->type = UDF_TASK_CALL;

  SUdfCallRequest *req = &task->_call.req;
  req->udfHandle = task->session->severHandle;
  req->callType = callType;

  switch (callType) {
    case TSDB_UDF_CALL_AGG_INIT: {
      req->initFirst = 1;
      break;
    }
    case TSDB_UDF_CALL_AGG_PROC: {
      req->block = *input;
      req->interBuf = *state;
      break;
    }
    case TSDB_UDF_CALL_AGG_MERGE: {
      req->interBuf = *state;
      req->interBuf2 = *state2;
      break;
    }
    case TSDB_UDF_CALL_AGG_FIN: {
      req->interBuf = *state;
      break;
    }
    case TSDB_UDF_CALL_SCALA_PROC: {
      req->block = *input;
      break;
    }
    default: {
      break;
    }
  }

  udfcRunUvTask(task, UV_TASK_REQ_RSP);

  // Read the result code before the task is released.
  int32_t err = task->errCode;
  if (err != 0) {
    fnError("call udf failure. err: %d", err);
  } else {
    SUdfCallResponse *rsp = &task->_call.rsp;
    switch (callType) {
      case TSDB_UDF_CALL_AGG_INIT:
      case TSDB_UDF_CALL_AGG_PROC:
      case TSDB_UDF_CALL_AGG_MERGE:
      case TSDB_UDF_CALL_AGG_FIN: {
        *newState = rsp->resultBuf;
        break;
      }
      case TSDB_UDF_CALL_SCALA_PROC: {
        *output = rsp->resultData;
        break;
      }
      default: {
        break;
      }
    }
  }

  taosMemoryFree(task);
  return err;
}

1152
// Aggregate init call: takes no input, the returned state is stored in interBuf.
int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) {
  return callUdf(handle, TSDB_UDF_CALL_AGG_INIT, NULL, NULL, NULL, NULL, interBuf);
}

// input: block, state
// output: newState
1162
int32_t callUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) {
  // Forward the data block and the current state; the updated state comes back in newState.
  return callUdf(handle, TSDB_UDF_CALL_AGG_PROC, block, state, NULL, NULL, newState);
}

// input: interbuf1, interbuf2
// output: resultBuf
1170
int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf) {
  // Forward both intermediate states; the merged state comes back in resultBuf.
  return callUdf(handle, TSDB_UDF_CALL_AGG_MERGE, NULL, interBuf1, interBuf2, NULL, resultBuf);
}

// input: interBuf
// output: resultData
1178
int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) {
  // Forward the final intermediate state; the result comes back in resultData.
  return callUdf(handle, TSDB_UDF_CALL_AGG_FIN, NULL, interBuf, NULL, NULL, resultData);
}

S
slzhou 已提交
1184
/*
 * Run a scalar UDF: the input parameters are converted to a data block, sent
 * to udfd, and the returned block is converted back into `output`.
 *
 * `output` is only written when the call succeeded; on failure the result
 * block is still empty and must not be converted.
 *
 * @return 0 on success, otherwise the error code from callUdf().
 */
int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam* output) {
  int8_t callType = TSDB_UDF_CALL_SCALA_PROC;
  SSDataBlock inputBlock = {0};
  convertScalarParamToDataBlock(input, numOfCols, &inputBlock);
  SSDataBlock resultBlock = {0};
  int32_t err = callUdf(handle, callType, &inputBlock, NULL, NULL, &resultBlock, NULL);
  if (err == 0) {
    convertDataBlockToScalarParm(&resultBlock, output);
  }
  return err;
}

1194
/*
 * Tear down a UDF session: sends the teardown request, disconnects the pipe
 * and releases the session. The handle must not be used afterwards.
 *
 * @return the error code of the teardown request (the result of the
 *         disconnect step is not reported); -1 when the task could not be
 *         allocated, in which case the session is left untouched.
 */
int32_t teardownUdf(UdfcFuncHandle handle) {
  fnInfo("tear down udf. udf func handle: %p", handle);

  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
  if (task == NULL) {
    fnError("udfc failed to allocate teardown task");
    return -1;
  }
  task->errCode = 0;
  task->session = (SUdfUvSession *) handle;
  task->type = UDF_TASK_TEARDOWN;

  SUdfTeardownRequest *req = &task->_teardown.req;
  req->udfHandle = task->session->severHandle;

  udfcRunUvTask(task, UV_TASK_REQ_RSP);

  // Keep the teardown result; the disconnect step overwrites task->errCode.
  int32_t err = task->errCode;

  udfcRunUvTask(task, UV_TASK_DISCONNECT);

  taosMemoryFree(task->session);
  taosMemoryFree(task);

  return err;
}
S
shenglian zhou 已提交
1218

S
shenglian zhou 已提交
1219 1220 1221 1222 1223 1224 1225 1226 1227
//memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
// Per-result-cell state of an aggregate UDF. It sits at the start of the
// cell's intermediate buffer and is followed in the same buffer by the final
// result area and then the intermediate result area (see udfAggInit).
typedef struct SUdfAggRes {
  SUdfUvSession *session;  // session returned by setupUdf() in udfAggInit
  int8_t finalResNum;      // numOfResult reported by the finalize call
  int8_t interResNum;      // numOfResult reported by the latest init/process call
  char* finalResBuf;       // set to the area right after this struct
  char* interResBuf;       // set to the area session->outputLen bytes after finalResBuf
} SUdfAggRes;

S
shenglian zhou 已提交
1228
/*
 * Report the per-result memory an aggregate UDF needs: the SUdfAggRes header
 * plus the result bytes plus the UDF's buffer size. Returns false, leaving
 * pEnv untouched, when the function is a scalar function.
 */
bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
  bool isAggregate = !fmIsScalarFunc(pFunc->funcId);
  if (isAggregate) {
    pEnv->calcMemSize = sizeof(SUdfAggRes) + pFunc->node.resType.bytes + pFunc->udfBufSize;
  }
  return isAggregate;
}
}

bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo) {
  if (functionSetup(pCtx, pResultCellInfo) != true) {
    return false;
  }
  UdfcFuncHandle handle;
  if (setupUdf((char*)pCtx->udfName, &handle) != 0) {
    return false;
  }
  SUdfUvSession *session = (SUdfUvSession *)handle;
S
shenglian zhou 已提交
1245 1246
  SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo);
  int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize;
S
shenglian zhou 已提交
1247
  memset(udfRes, 0, envSize);
S
shenglian zhou 已提交
1248

S
slzhou 已提交
1249 1250 1251
  udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
  udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;

S
shenglian zhou 已提交
1252
  udfRes->session = (SUdfUvSession *)handle;
S
shenglian zhou 已提交
1253 1254 1255 1256
  SUdfInterBuf buf = {0};
  if (callUdfAggInit(handle, &buf) != 0) {
    return false;
  }
S
shenglian zhou 已提交
1257
  udfRes->interResNum = buf.numOfResult;
S
slzhou 已提交
1258
  memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
S
shenglian zhou 已提交
1259 1260 1261 1262 1263 1264 1265
  return true;
}

int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t numOfCols = pInput->numOfInputCols;

S
slzhou 已提交
1266 1267 1268 1269
  SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
  SUdfUvSession *session = udfRes->session;
  udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
  udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
S
shenglian zhou 已提交
1270 1271 1272 1273 1274

  int32_t start = pInput->startRowIndex;
  int32_t numOfRows = pInput->numOfRows;


1275 1276 1277 1278
  SSDataBlock tempBlock = {0};
  tempBlock.info.numOfCols = numOfCols;
  tempBlock.info.rows = numOfRows;
  tempBlock.info.uid = pInput->uid;
S
shenglian zhou 已提交
1279
  bool hasVarCol = false;
1280
  tempBlock.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
S
shenglian zhou 已提交
1281 1282 1283 1284 1285 1286

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData *col = pInput->pData[i];
    if (IS_VAR_DATA_TYPE(col->info.type)) {
      hasVarCol = true;
    }
1287
    taosArrayPush(tempBlock.pDataBlock, col);
S
shenglian zhou 已提交
1288
  }
1289
  tempBlock.info.hasVarCol = hasVarCol;
S
shenglian zhou 已提交
1290

1291 1292
  SSDataBlock *inputBlock = blockDataExtractBlock(&tempBlock, start, numOfRows);

S
slzhou 已提交
1293
  SUdfInterBuf state = {.buf = udfRes->interResBuf,
1294
                        .bufLen = session->bufSize,
S
slzhou 已提交
1295
                        .numOfResult = udfRes->interResNum};
S
shenglian zhou 已提交
1296
  SUdfInterBuf newState = {0};
1297

S
slzhou 已提交
1298
  callUdfAggProcess(session, inputBlock, &state, &newState);
S
shenglian zhou 已提交
1299

S
slzhou 已提交
1300 1301 1302 1303 1304 1305
  udfRes->interResNum = newState.numOfResult;
  memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);

  if (newState.numOfResult == 1 || state.numOfResult == 1) {
    GET_RES_INFO(pCtx)->numOfRes = 1;
  }
S
shenglian zhou 已提交
1306

1307 1308 1309
  blockDataDestroy(inputBlock);

  taosArrayDestroy(tempBlock.pDataBlock);
S
shenglian zhou 已提交
1310 1311 1312 1313 1314 1315

  taosMemoryFree(newState.buf);
  return 0;
}

/*
 * Finalise the aggregate UDF for one result cell: asks udfd for the final
 * result, copies it into the cell's final-result area, tears the UDF session
 * down and emits the result through functionFinalizeWithResultBuf().
 *
 * The buffer returned by udfd is copied and then freed, so finalResBuf always
 * points into the cell's own memory.
 *
 * @return the value of functionFinalizeWithResultBuf() on success; the error
 *         code of the finalize call when it fails; -1 when the returned
 *         result is larger than the session's output length. On failure the
 *         cell's numOfRes is set to 0.
 */
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
  SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
  SUdfUvSession *session = udfRes->session;
  udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
  udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;

  SUdfInterBuf resultBuf = {0};
  SUdfInterBuf state = {.buf = udfRes->interResBuf,
                        .bufLen = session->bufSize,
                        .numOfResult = udfRes->interResNum};

  int32_t code = callUdfAggFinalize(session, &state, &resultBuf);
  if (code != 0) {
    fnError("udfAggFinalize error. call udf failed. code: %d", code);
    GET_RES_INFO(pCtx)->numOfRes = 0;
  } else if (resultBuf.bufLen > session->outputLen) {
    // The final-result area holds session->outputLen bytes; never copy more.
    fnError("udfc result buf size %d is greater than function outputLen %d", (int)resultBuf.bufLen, (int)session->outputLen);
    GET_RES_INFO(pCtx)->numOfRes = 0;
    code = -1;
  } else {
    if (resultBuf.buf != NULL && resultBuf.bufLen > 0) {
      memcpy(udfRes->finalResBuf, resultBuf.buf, resultBuf.bufLen);
    }
    udfRes->finalResNum = resultBuf.numOfResult;
    if (resultBuf.numOfResult == 1) {
      GET_RES_INFO(pCtx)->numOfRes = 1;
    }
  }
  taosMemoryFree(resultBuf.buf);

  // Frees the session; `session` must not be used past this point.
  teardownUdf(session);

  int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
  return (code == 0) ? numOfResults : code;
}