tudf.c 26.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#include "uv.h"
#include "os.h"
S
shenglian zhou 已提交
17
#include "tlog.h"
18
#include "tudf.h"
19
#include "tudfInt.h"
20

21 22 23 24 25
//TODO: when startup, set thread poll size. add it to cfg 
//TODO: udfd restart when exist or aborts
//TODO: network error processing.
//TODO: add unit test
void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
26 27
int32_t destructUdfService();
int32_t constructUdfService();
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
enum {
  UV_TASK_CONNECT = 0,
  UV_TASK_REQ_RSP = 1,
  UV_TASK_DISCONNECT = 2
};

typedef struct SUdfUvSession {
  int64_t severHandle;
  uv_pipe_t *udfSvcPipe;
} SUdfUvSession;

typedef struct SClientUvTaskNode {
  int8_t type;
  int errCode;

  uv_pipe_t *pipe;

  int64_t seqNum;
  uv_buf_t reqBuf;

  uv_sem_t taskSem;
  uv_buf_t rspBuf;

  struct SClientUvTaskNode *prev;
  struct SClientUvTaskNode *next;
} SClientUvTaskNode;

typedef struct SClientUdfTask {
  int8_t type;

  SUdfUvSession *session;

  int32_t errCode;

  union {
    struct {
      SUdfSetupRequest req;
      SUdfSetupResponse rsp;
    } _setup;
    struct {
      SUdfCallRequest req;
      SUdfCallResponse rsp;
    } _call;
    struct {
      SUdfTeardownRequest req;
      SUdfTeardownResponse rsp;
    } _teardown;
  };


} SClientUdfTask;

typedef struct SClientConnBuf {
  char *buf;
  int32_t len;
  int32_t cap;
  int32_t total;
} SClientConnBuf;

typedef struct SClientUvConn {
  uv_pipe_t *pipe;
  SClientUvTaskNode taskQueue;
  SClientConnBuf readBuf;
} SClientUvConn;

uv_process_t gUdfdProcess;

uv_barrier_t gUdfInitBarrier;

uv_loop_t gUdfdLoop;
uv_thread_t gUdfLoopThread;
uv_async_t gUdfLoopTaskAync;

uv_async_t gUdfLoopStopAsync;

uv_mutex_t gUdfTaskQueueMutex;
int64_t gUdfTaskSeqNum = 0;

106 107 108 109 110 111 112 113 114 115
enum {
  UDFC_STATE_INITAL = 0, // initial state
  UDFC_STATE_STARTNG, // starting after startUdfService
  UDFC_STATE_READY, // started and begin to receive quests
  UDFC_STATE_RESTARTING, // udfd abnormal exit. cleaning up and restart.
  UDFC_STATE_STOPPING, // stopping after stopUdfService
  UDFC_STATUS_FINAL, // stopped
};
int8_t gUdfcState = UDFC_STATE_INITAL;

116 117 118 119 120
//double circular linked list
typedef SClientUvTaskNode *SClientUvTaskQueue;
SClientUvTaskNode gUdfQueueNode;
SClientUvTaskQueue gUdfTaskQueue = &gUdfQueueNode;

121
//TODO: deal with uv task that has been started and then udfd core dumped
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182

void udfTaskQueueInit(SClientUvTaskQueue q) {
  q->next = q;
  q->prev = q;
}

bool udfTaskQueueIsEmpty(SClientUvTaskQueue q) {
  return q == q->next;
}

void udfTaskQueueInsertTail(SClientUvTaskQueue q, SClientUvTaskNode *e) {
  e->next = q;
  e->prev = q->prev;
  e->prev->next = e;
  q->prev = e;
}

void udfTaskQueueInsertTaskAtHead(SClientUvTaskQueue q, SClientUvTaskNode *e) {
  e->next = q->next;
  e->prev = q;
  q->next->prev = e;
  q->next = e;
}

void udfTaskQueueRemoveTask(SClientUvTaskNode *e) {
  e->prev->next = e->next;
  e->next->prev = e->prev;
}

void udfTaskQueueSplit(SClientUvTaskQueue q, SClientUvTaskNode *from, SClientUvTaskQueue n) {
  n->prev = q->prev;
  n->prev->next = n;
  n->next = from;
  q->prev = from->prev;
  q->prev->next = q;
  from->prev = n;
}

SClientUvTaskNode *udfTaskQueueHeadTask(SClientUvTaskQueue q) {
  return q->next;
}

SClientUvTaskNode *udfTaskQueueTailTask(SClientUvTaskQueue q) {
  return q->prev;
}

SClientUvTaskNode *udfTaskQueueNext(SClientUvTaskNode *e) {
  return e->next;
}

void udfTaskQueueMove(SClientUvTaskQueue q, SClientUvTaskQueue n) {
  if (udfTaskQueueIsEmpty(q)) {
    udfTaskQueueInit(n);
  } else {
    SClientUvTaskNode *h = udfTaskQueueHeadTask(q);
    udfTaskQueueSplit(q, h, n);
  }
}


int32_t encodeRequest(char **pBuf, int32_t *pBufLen, SUdfRequest *request) {
S
shenglian zhou 已提交
183
  debugPrint("%s", "encoding request");
184 185 186

  int len = sizeof(SUdfRequest) - sizeof(void *);
  switch (request->type) {
S
shenglian zhou 已提交
187
    case UDF_TASK_SETUP: {
188 189
      SUdfSetupRequest *setup = (SUdfSetupRequest *) (request->subReq);
      len += sizeof(SUdfSetupRequest) - 1 * sizeof(char *) + setup->pathSize;
190
      break;
191
    }
S
shenglian zhou 已提交
192
    case UDF_TASK_CALL: {
193 194
      SUdfCallRequest *call = (SUdfCallRequest *) (request->subReq);
      len += sizeof(SUdfCallRequest) - 2 * sizeof(char *) + call->inputBytes + call->stateBytes;
195
      break;
196
    }
S
shenglian zhou 已提交
197
    case UDF_TASK_TEARDOWN: {
198 199
      SUdfTeardownRequest *teardown = (SUdfTeardownRequest *) (request->subReq);
      len += sizeof(SUdfTeardownRequest);
200
      break;
201 202
    }
    default:
203
      break;
204 205
  }

wafwerar's avatar
wafwerar 已提交
206
  char *bufBegin = taosMemoryMalloc(len);
207 208 209 210 211 212 213 214 215 216 217
  char *buf = bufBegin;

  //skip msgLen first
  buf += sizeof(int32_t);

  *(int64_t *) buf = request->seqNum;
  buf += sizeof(int64_t);
  *(int8_t *) buf = request->type;
  buf += sizeof(int8_t);

  switch (request->type) {
S
shenglian zhou 已提交
218
    case UDF_TASK_SETUP: {
219 220 221 222 223 224 225 226 227 228 229
      SUdfSetupRequest *setup = (SUdfSetupRequest *) (request->subReq);
      memcpy(buf, setup->udfName, 16);
      buf += 16;
      *(int8_t *) buf = setup->scriptType;
      buf += sizeof(int8_t);
      *(int8_t *) buf = setup->udfType;
      buf += sizeof(int8_t);
      *(int16_t *) buf = setup->pathSize;
      buf += sizeof(int16_t);
      memcpy(buf, setup->path, setup->pathSize);
      buf += setup->pathSize;
230
      break;
231 232
    }

S
shenglian zhou 已提交
233
    case UDF_TASK_CALL: {
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
      SUdfCallRequest *call = (SUdfCallRequest *) (request->subReq);
      *(int64_t *) buf = call->udfHandle;
      buf += sizeof(int64_t);
      *(int8_t *) buf = call->step;
      buf += sizeof(int8_t);
      *(int32_t *) buf = call->inputBytes;
      buf += sizeof(int32_t);
      memcpy(buf, call->input, call->inputBytes);
      buf += call->inputBytes;
      *(int32_t *) buf = call->stateBytes;
      buf += sizeof(int32_t);
      memcpy(buf, call->state, call->stateBytes);
      buf += call->stateBytes;
      break;
    }

S
shenglian zhou 已提交
250
    case UDF_TASK_TEARDOWN: {
251 252 253 254 255
      SUdfTeardownRequest *teardown = (SUdfTeardownRequest *) (request->subReq);
      *(int64_t *) buf = teardown->udfHandle;
      buf += sizeof(int64_t);
      break;
    }
256 257 258 259
    default:
      break;
  }

260 261 262 263 264
  request->msgLen = buf - bufBegin;
  *(int32_t *) bufBegin = request->msgLen;
  *pBuf = bufBegin;
  *pBufLen = request->msgLen;
  return 0;
265 266
}

267
int32_t decodeRequest(char *bufMsg, int32_t bufLen, SUdfRequest **pRequest) {
S
shenglian zhou 已提交
268
  debugPrint("%s", "decoding request");
269
  if (*(int32_t *) bufMsg != bufLen) {
S
shenglian zhou 已提交
270
    debugPrint("%s", "decoding request error");
271
    return -1;
272
  }
273
  char *buf = bufMsg;
wafwerar's avatar
wafwerar 已提交
274
  SUdfRequest *request = taosMemoryMalloc(sizeof(SUdfRequest));
275 276 277 278 279 280 281 282 283
  request->subReq = NULL;
  request->msgLen = *(int32_t *) (buf);
  buf += sizeof(int32_t);
  request->seqNum = *(int64_t *) (buf);
  buf += sizeof(int64_t);
  request->type = *(int8_t *) (buf);
  buf += sizeof(int8_t);

  switch (request->type) {
S
shenglian zhou 已提交
284
    case UDF_TASK_SETUP: {
wafwerar's avatar
wafwerar 已提交
285
      SUdfSetupRequest *setup = taosMemoryMalloc(sizeof(SUdfSetupRequest));
286 287 288 289 290 291 292 293 294 295 296 297 298 299

      memcpy(setup->udfName, buf, 16);
      buf += 16;
      setup->scriptType = *(int8_t *) buf;
      buf += sizeof(int8_t);
      setup->udfType = *(int8_t *) buf;
      buf += sizeof(int8_t);
      setup->pathSize = *(int16_t *) buf;
      buf += sizeof(int16_t);
      setup->path = buf;
      buf += setup->pathSize;

      request->subReq = setup;
      break;
300
    }
S
shenglian zhou 已提交
301
    case UDF_TASK_CALL: {
wafwerar's avatar
wafwerar 已提交
302
      SUdfCallRequest *call = taosMemoryMalloc(sizeof(SUdfCallRequest));
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318

      call->udfHandle = *(int64_t *) buf;
      buf += sizeof(int64_t);
      call->step = *(int8_t *) buf;
      buf += sizeof(int8_t);
      call->inputBytes = *(int32_t *) buf;
      buf += sizeof(int32_t);
      call->input = buf;
      buf += call->inputBytes;
      call->stateBytes = *(int32_t *) buf;
      buf += sizeof(int32_t);
      call->state = buf;
      buf += call->stateBytes;

      request->subReq = call;
      break;
319 320
    }

S
shenglian zhou 已提交
321
    case UDF_TASK_TEARDOWN: {
wafwerar's avatar
wafwerar 已提交
322
      SUdfTeardownRequest *teardown = taosMemoryMalloc(sizeof(SUdfTeardownRequest));
323 324 325

      teardown->udfHandle = *(int64_t *) buf;
      buf += sizeof(int64_t);
326

327
      request->subReq = teardown;
328 329
    }

330 331
  }
  if (buf - bufMsg != bufLen) {
S
shenglian zhou 已提交
332
    debugPrint("%s", "decode request error");
wafwerar's avatar
wafwerar 已提交
333 334
    taosMemoryFree(request->subReq);
    taosMemoryFree(request);
335 336 337 338 339 340 341
    return -1;
  }
  *pRequest = request;
  return 0;
}

int32_t encodeResponse(char **pBuf, int32_t *pBufLen, SUdfResponse *response) {
S
shenglian zhou 已提交
342
  debugPrint("%s", "encoding response");
343 344 345 346

  int32_t len = sizeof(SUdfResponse) - sizeof(void *);

  switch (response->type) {
S
shenglian zhou 已提交
347
    case UDF_TASK_SETUP: {
348 349 350
      len += sizeof(SUdfSetupResponse);
      break;
    }
S
shenglian zhou 已提交
351
    case UDF_TASK_CALL: {
352 353 354 355 356
      SUdfCallResponse *callResp = (SUdfCallResponse *) (response->subRsp);
      len += sizeof(SUdfCallResponse) - 2 * sizeof(char *) +
             callResp->outputBytes + callResp->newStateBytes;
      break;
    }
S
shenglian zhou 已提交
357
    case UDF_TASK_TEARDOWN: {
358 359 360 361
      len += sizeof(SUdfTeardownResponse);
      break;
    }
  }
362

wafwerar's avatar
wafwerar 已提交
363
  char *bufBegin = taosMemoryMalloc(len);
364
  char *buf = bufBegin;
365

366 367
  //skip msgLen
  buf += sizeof(int32_t);
368

369 370 371 372 373 374
  *(int64_t *) buf = response->seqNum;
  buf += sizeof(int64_t);
  *(int8_t *) buf = response->type;
  buf += sizeof(int8_t);
  *(int32_t *) buf = response->code;
  buf += sizeof(int32_t);
375 376


377
  switch (response->type) {
S
shenglian zhou 已提交
378
    case UDF_TASK_SETUP: {
379 380 381 382
      SUdfSetupResponse *setupResp = (SUdfSetupResponse *) (response->subRsp);
      *(int64_t *) buf = setupResp->udfHandle;
      buf += sizeof(int64_t);
      break;
383
    }
S
shenglian zhou 已提交
384
    case UDF_TASK_CALL: {
385 386 387 388 389 390 391 392 393 394 395 396
      SUdfCallResponse *callResp = (SUdfCallResponse *) (response->subRsp);
      *(int32_t *) buf = callResp->outputBytes;
      buf += sizeof(int32_t);
      memcpy(buf, callResp->output, callResp->outputBytes);
      buf += callResp->outputBytes;

      *(int32_t *) buf = callResp->newStateBytes;
      buf += sizeof(int32_t);
      memcpy(buf, callResp->newState, callResp->newStateBytes);
      buf += callResp->newStateBytes;
      break;
    }
S
shenglian zhou 已提交
397
    case UDF_TASK_TEARDOWN: {
398 399 400 401 402 403 404 405 406 407 408 409 410 411
      SUdfTeardownResponse *teardownResp = (SUdfTeardownResponse *) (response->subRsp);
      break;
    }
    default:
      break;
  }
  response->msgLen = buf - bufBegin;
  *(int32_t *) bufBegin = response->msgLen;
  *pBuf = bufBegin;
  *pBufLen = response->msgLen;
  return 0;
}

int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse **pResponse) {
S
shenglian zhou 已提交
412
  debugPrint("%s", "decoding response");
413

414
  if (*(int32_t *) bufMsg != bufLen) {
S
shenglian zhou 已提交
415
    debugPrint("%s", "can not decode response");
416 417 418
    return -1;
  }
  char *buf = bufMsg;
wafwerar's avatar
wafwerar 已提交
419
  SUdfResponse *rsp = taosMemoryMalloc(sizeof(SUdfResponse));
420 421 422 423 424 425 426 427 428 429
  rsp->msgLen = *(int32_t *) buf;
  buf += sizeof(int32_t);
  rsp->seqNum = *(int64_t *) buf;
  buf += sizeof(int64_t);
  rsp->type = *(int8_t *) buf;
  buf += sizeof(int8_t);
  rsp->code = *(int32_t *) buf;
  buf += sizeof(int32_t);

  switch (rsp->type) {
S
shenglian zhou 已提交
430
    case UDF_TASK_SETUP: {
wafwerar's avatar
wafwerar 已提交
431
      SUdfSetupResponse *setupRsp = (SUdfSetupResponse *) taosMemoryMalloc(sizeof(SUdfSetupResponse));
432 433 434 435
      setupRsp->udfHandle = *(int64_t *) buf;
      buf += sizeof(int64_t);
      rsp->subRsp = (char *) setupRsp;
      break;
436
    }
S
shenglian zhou 已提交
437
    case UDF_TASK_CALL: {
wafwerar's avatar
wafwerar 已提交
438
      SUdfCallResponse *callRsp = (SUdfCallResponse *) taosMemoryMalloc(sizeof(SUdfCallResponse));
439 440 441 442 443 444 445 446
      callRsp->outputBytes = *(int32_t *) buf;
      buf += sizeof(int32_t);

      callRsp->output = buf;
      buf += callRsp->outputBytes;

      callRsp->newStateBytes = *(int32_t *) buf;
      buf += sizeof(int32_t);
447

448 449
      callRsp->newState = buf;
      buf += callRsp->newStateBytes;
450

451 452 453
      rsp->subRsp = callRsp;
      break;
    }
S
shenglian zhou 已提交
454
    case UDF_TASK_TEARDOWN: {
wafwerar's avatar
wafwerar 已提交
455
      SUdfTeardownResponse *teardownRsp = (SUdfTeardownResponse *) taosMemoryMalloc(sizeof(SUdfTeardownResponse));
456 457
      rsp->subRsp = teardownRsp;
      break;
458
    }
459 460 461 462
    default:
      break;
  }
  if (buf - bufMsg != bufLen) {
S
shenglian zhou 已提交
463
    debugPrint("%s", "can not decode response");
wafwerar's avatar
wafwerar 已提交
464 465
    taosMemoryFree(rsp->subRsp);
    taosMemoryFree(rsp);
466 467 468 469 470
    return -1;
  }
  *pResponse = rsp;
  return 0;
}
471

472
void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) {
S
shenglian zhou 已提交
473
  debugPrint("Process exited with status %" PRId64 ", signal %d", exit_status, term_signal);
474
  uv_close((uv_handle_t *) req, NULL);
S
shenglian zhou 已提交
475
  //TODO: restart the udfd process
476 477 478 479 480 481 482 483 484 485 486 487
  if (gUdfcState == UDFC_STATE_STOPPING) {
    if (term_signal != SIGINT) {
      //TODO: log error
    }
  }
  if (gUdfcState == UDFC_STATE_READY) {
    gUdfcState = UDFC_STATE_RESTARTING;
    //TODO: asynchronous without blocking. how to do it
    destructUdfService();
    constructUdfService();
  }

488
}
489

490 491 492 493 494 495 496 497
void onUdfcPipeClose(uv_handle_t *handle) {
  SClientUvConn *conn = handle->data;
  if (!udfTaskQueueIsEmpty(&conn->taskQueue)) {
    SClientUvTaskNode *task = udfTaskQueueHeadTask(&conn->taskQueue);
    task->errCode = 0;
    uv_sem_post(&task->taskSem);
  }

wafwerar's avatar
wafwerar 已提交
498 499 500
  taosMemoryFree(conn->readBuf.buf);
  taosMemoryFree(conn);
  taosMemoryFree((uv_pipe_t *) handle);
501 502 503 504

}

int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
S
shenglian zhou 已提交
505
  debugPrint("%s", "get uv task result");
506 507 508 509 510 511 512 513
  if (uvTask->type == UV_TASK_REQ_RSP) {
    if (uvTask->rspBuf.base != NULL) {
      SUdfResponse *rsp;
      decodeResponse(uvTask->rspBuf.base, uvTask->rspBuf.len, &rsp);
      task->errCode = rsp->code;

      switch (task->type) {
        case UDF_TASK_SETUP: {
S
shenglian zhou 已提交
514
          //TODO: copy or not
515 516 517 518 519
          task->_setup.rsp = *(SUdfSetupResponse *) (rsp->subRsp);
          break;
        }
        case UDF_TASK_CALL: {
          task->_call.rsp = *(SUdfCallResponse *) (rsp->subRsp);
S
shenglian zhou 已提交
520
          //TODO: copy or not
521 522 523 524 525 526 527 528 529 530 531 532 533
          break;
        }
        case UDF_TASK_TEARDOWN: {
          task->_teardown.rsp = *(SUdfTeardownResponse *) (rsp->subRsp);
          //TODO: copy or not?
          break;
        }
        default: {
          break;
        }
      }

      // TODO: the call buffer is setup and freed by udf invocation
wafwerar's avatar
wafwerar 已提交
534 535 536
      taosMemoryFree(uvTask->rspBuf.base);
      taosMemoryFree(rsp->subRsp);
      taosMemoryFree(rsp);
537 538
    } else {
      task->errCode = uvTask->errCode;
539
    }
540 541 542 543
  } else if (uvTask->type == UV_TASK_CONNECT) {
    task->errCode = uvTask->errCode;
  } else if (uvTask->type == UV_TASK_DISCONNECT) {
    task->errCode = uvTask->errCode;
544
  }
545 546 547 548
  return 0;
}

void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
S
shenglian zhou 已提交
549
  debugPrint("%s", "client allocate buffer to receive from pipe");
550 551 552 553 554
  SClientUvConn *conn = handle->data;
  SClientConnBuf *connBuf = &conn->readBuf;

  int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
  if (connBuf->cap == 0) {
wafwerar's avatar
wafwerar 已提交
555
    connBuf->buf = taosMemoryMalloc(msgHeadSize);
556 557 558 559 560 561 562 563 564 565 566 567 568 569
    if (connBuf->buf) {
      connBuf->len = 0;
      connBuf->cap = msgHeadSize;
      connBuf->total = -1;

      buf->base = connBuf->buf;
      buf->len = connBuf->cap;
    } else {
      //TODO: log error
      buf->base = NULL;
      buf->len = 0;
    }
  } else {
    connBuf->cap = connBuf->total > connBuf->cap ? connBuf->total : connBuf->cap;
wafwerar's avatar
wafwerar 已提交
570
    void *resultBuf = taosMemoryRealloc(connBuf->buf, connBuf->cap);
571 572 573 574 575 576 577 578 579 580 581
    if (resultBuf) {
      connBuf->buf = resultBuf;
      buf->base = connBuf->buf + connBuf->len;
      buf->len = connBuf->cap - connBuf->len;
    } else {
      //TODO: log error free connBuf->buf
      buf->base = NULL;
      buf->len = 0;
    }
  }

S
shenglian zhou 已提交
582
  debugPrint("\tconn buf cap - len - total : %d - %d - %d", connBuf->cap, connBuf->len, connBuf->total);
583 584 585

}

586 587 588 589 590 591
bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
  if (connBuf->total == -1 && connBuf->len >= sizeof(int32_t)) {
    connBuf->total = *(int32_t *) (connBuf->buf);
  }
  if (connBuf->len == connBuf->cap && connBuf->total == connBuf->cap) {
    return true;
592
  }
593 594 595 596 597 598
  return false;
}

void udfcUvHandleRsp(SClientUvConn *conn) {
  SClientConnBuf *connBuf = &conn->readBuf;
  int64_t seqNum = *(int64_t *) (connBuf->buf + sizeof(int32_t)); // msglen int32_t then seqnum
599

600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615
  if (udfTaskQueueIsEmpty(&conn->taskQueue)) {
    //LOG error
    return;
  }
  bool found = false;
  SClientUvTaskNode *taskFound = NULL;
  SClientUvTaskNode *task = udfTaskQueueNext(&conn->taskQueue);
  while (task != &conn->taskQueue) {
    if (task->seqNum == seqNum) {
      if (found == false) {
        found = true;
        taskFound = task;
      } else {
        //LOG error;
        continue;
      }
616
    }
617
    task = udfTaskQueueNext(task);
618 619
  }

620 621 622 623 624
  if (taskFound) {
    taskFound->rspBuf = uv_buf_init(connBuf->buf, connBuf->len);
    udfTaskQueueRemoveTask(taskFound);
    uv_sem_post(&taskFound->taskSem);
  } else {
625
    //TODO: LOG error
626 627 628 629 630 631
  }
  connBuf->buf = NULL;
  connBuf->total = -1;
  connBuf->len = 0;
  connBuf->cap = 0;
}
632

633 634 635 636 637
void udfcUvHandleError(SClientUvConn *conn) {
  uv_close((uv_handle_t *) conn->pipe, onUdfcPipeClose);
}

void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
S
shenglian zhou 已提交
638
  debugPrint("%s, nread: %zd", "client read from pipe", nread);
639 640 641 642 643 644 645 646 647 648 649 650
  if (nread == 0) return;

  SClientUvConn *conn = client->data;
  SClientConnBuf *connBuf = &conn->readBuf;
  if (nread > 0) {
    connBuf->len += nread;
    if (isUdfcUvMsgComplete(connBuf)) {
      udfcUvHandleRsp(conn);
    }

  }
  if (nread < 0) {
S
shenglian zhou 已提交
651
    debugPrint("\tclient read error: %s", uv_strerror(nread));
652 653 654 655
    if (nread == UV_EOF) {
      //TODO:
    }
    udfcUvHandleError(conn);
656 657 658 659
  }

}

660
void onUdfClientWrite(uv_write_t *write, int status) {
S
shenglian zhou 已提交
661
  debugPrint("%s", "after writing to pipe");
662 663 664 665 666 667 668 669
  SClientUvTaskNode *uvTask = write->data;
  if (status == 0) {
    uv_pipe_t *pipe = uvTask->pipe;
    SClientUvConn *conn = pipe->data;
    udfTaskQueueInsertTail(&conn->taskQueue, uvTask);
  } else {
    //TODO Log error;
  }
S
shenglian zhou 已提交
670
  debugPrint("\tlength:%zu", uvTask->reqBuf.len);
wafwerar's avatar
wafwerar 已提交
671 672
  taosMemoryFree(write);
  taosMemoryFree(uvTask->reqBuf.base);
673
}
H
Haojun Liao 已提交
674

675 676 677 678 679
void onUdfClientConnect(uv_connect_t *connect, int status) {
  SClientUvTaskNode *uvTask = connect->data;
  uvTask->errCode = status;
  if (status != 0) {
    //TODO: LOG error
H
Haojun Liao 已提交
680
  }
681
  uv_read_start((uv_stream_t *) uvTask->pipe, udfcAllocateBuffer, onUdfcRead);
wafwerar's avatar
wafwerar 已提交
682
  taosMemoryFree(connect);
683 684
  uv_sem_post(&uvTask->taskSem);
}
H
Haojun Liao 已提交
685

686
int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) {
wafwerar's avatar
wafwerar 已提交
687
  SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode));
688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717
  uvTask->type = uvTaskType;

  if (uvTaskType == UV_TASK_CONNECT) {
  } else if (uvTaskType == UV_TASK_REQ_RSP) {
    uvTask->pipe = task->session->udfSvcPipe;
    SUdfRequest request;
    request.type = task->type;
    request.seqNum = gUdfTaskSeqNum++;

    if (task->type == UDF_TASK_SETUP) {
      request.subReq = &task->_setup.req;
      request.type = UDF_TASK_SETUP;
    } else if (task->type == UDF_TASK_CALL) {
      request.subReq = &task->_call.req;
      request.type = UDF_TASK_CALL;
    } else if (task->type == UDF_TASK_TEARDOWN) {
      request.subReq = &task->_teardown.req;
      request.type = UDF_TASK_TEARDOWN;
    } else {
      //TODO log and return error
    }
    char *buf = NULL;
    int32_t bufLen = 0;
    encodeRequest(&buf, &bufLen, &request);
    uvTask->reqBuf = uv_buf_init(buf, bufLen);
    uvTask->seqNum = request.seqNum;
  } else if (uvTaskType == UV_TASK_DISCONNECT) {
    uvTask->pipe = task->session->udfSvcPipe;
  }
  uv_sem_init(&uvTask->taskSem, 0);
H
Haojun Liao 已提交
718

719 720 721
  *pUvTask = uvTask;
  return 0;
}
H
Haojun Liao 已提交
722

723
int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) {
S
shenglian zhou 已提交
724
  debugPrint("%s, %d", "queue uv task", uvTask->type);
H
Haojun Liao 已提交
725

726 727 728 729
  uv_mutex_lock(&gUdfTaskQueueMutex);
  udfTaskQueueInsertTail(gUdfTaskQueue, uvTask);
  uv_mutex_unlock(&gUdfTaskQueueMutex);
  uv_async_send(&gUdfLoopTaskAync);
H
Haojun Liao 已提交
730

731 732
  uv_sem_wait(&uvTask->taskSem);
  uv_sem_destroy(&uvTask->taskSem);
H
Haojun Liao 已提交
733

734 735
  return 0;
}
H
Haojun Liao 已提交
736

737
int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
S
shenglian zhou 已提交
738
  debugPrint("%s, type %d", "start uv task ", uvTask->type);
739 740
  switch (uvTask->type) {
    case UV_TASK_CONNECT: {
wafwerar's avatar
wafwerar 已提交
741
      uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
742 743
      uv_pipe_init(&gUdfdLoop, pipe, 0);
      uvTask->pipe = pipe;
H
Haojun Liao 已提交
744

wafwerar's avatar
wafwerar 已提交
745
      SClientUvConn *conn = taosMemoryMalloc(sizeof(SClientUvConn));
746 747 748 749 750 751
      conn->pipe = pipe;
      conn->readBuf.len = 0;
      conn->readBuf.cap = 0;
      conn->readBuf.buf = 0;
      conn->readBuf.total = -1;
      udfTaskQueueInit(&conn->taskQueue);
H
Haojun Liao 已提交
752

753 754
      pipe->data = conn;

wafwerar's avatar
wafwerar 已提交
755
      uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t));
756
      connReq->data = uvTask;
H
Haojun Liao 已提交
757

758
      uv_pipe_connect(connReq, pipe, "udf.sock", onUdfClientConnect);
H
Haojun Liao 已提交
759
      break;
760 761 762
    }
    case UV_TASK_REQ_RSP: {
      uv_pipe_t *pipe = uvTask->pipe;
wafwerar's avatar
wafwerar 已提交
763
      uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t));
764 765 766 767 768 769 770 771 772 773 774 775 776 777
      write->data = uvTask;
      uv_write(write, (uv_stream_t *) pipe, &uvTask->reqBuf, 1, onUdfClientWrite);
      break;
    }
    case UV_TASK_DISCONNECT: {
      SClientUvConn *conn = uvTask->pipe->data;
      udfTaskQueueInsertTail(&conn->taskQueue, uvTask);
      uv_close((uv_handle_t *) uvTask->pipe, onUdfcPipeClose);
      break;
    }
    default: {
      break;
    }
  }
H
Haojun Liao 已提交
778

779 780
  return 0;
}
H
Haojun Liao 已提交
781

782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799
void udfClientAsyncCb(uv_async_t *async) {
  SClientUvTaskNode node;
  SClientUvTaskQueue q = &node;
  udfTaskQueueInit(q);

  uv_mutex_lock(&gUdfTaskQueueMutex);
  udfTaskQueueMove(gUdfTaskQueue, q);
  uv_mutex_unlock(&gUdfTaskQueueMutex);

  while (!udfTaskQueueIsEmpty(q)) {
    SClientUvTaskNode *task = udfTaskQueueHeadTask(q);
    udfTaskQueueRemoveTask(task);
    startUvUdfTask(task);
  }

}

void udfStopAsyncCb(uv_async_t *async) {
800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820
  SClientUvTaskNode node;
  SClientUvTaskQueue q = &node;
  udfTaskQueueInit(q);

  uv_mutex_lock(&gUdfTaskQueueMutex);
  udfTaskQueueMove(gUdfTaskQueue, q);
  uv_mutex_unlock(&gUdfTaskQueueMutex);

  while (!udfTaskQueueIsEmpty(q)) {
    SClientUvTaskNode *task = udfTaskQueueHeadTask(q);
    udfTaskQueueRemoveTask(task);
    if (gUdfcState == UDFC_STATE_STOPPING) {
      task->errCode = UDFC_CODE_STOPPING;
    } else if (gUdfcState == UDFC_STATE_RESTARTING) {
      task->errCode = UDFC_CODE_RESTARTING;
    }
    uv_sem_post(&task->taskSem);
  }

  // TODO: deal with tasks that are waiting result.

821 822 823
  uv_stop(&gUdfdLoop);
}

824 825


826 827 828
void startUdfd(void *argsThread) {
  uv_loop_init(&gUdfdLoop);

S
shenglian zhou 已提交
829 830 831 832 833 834 835 836 837 838 839 840 841 842 843
  //TODO: path
    uv_process_options_t options;
    static char path[256] = {0};
    size_t cwdSize;
    uv_cwd(path, &cwdSize);
    strcat(path, "./udfd");
    char* args[2] = {path, NULL};
    options.args = args;
    options.file = path;
    options.exit_cb = onUdfdExit;

    int err = uv_spawn(&gUdfdLoop, &gUdfdProcess, &options);
    if (err != 0) {
        debugPrint("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
    }
844 845 846 847 848 849

  uv_async_init(&gUdfdLoop, &gUdfLoopTaskAync, udfClientAsyncCb);
  uv_async_init(&gUdfdLoop, &gUdfLoopStopAsync, udfStopAsyncCb);
  uv_mutex_init(&gUdfTaskQueueMutex);
  udfTaskQueueInit(gUdfTaskQueue);
  uv_barrier_wait(&gUdfInitBarrier);
850
  //TODO return value of uv_run
851
  uv_run(&gUdfdLoop, UV_RUN_DEFAULT);
852
  uv_loop_close(&gUdfdLoop);
853 854
}

855
int32_t constructUdfService() {
856 857 858 859 860 861
  uv_barrier_init(&gUdfInitBarrier, 2);
  uv_thread_create(&gUdfLoopThread, startUdfd, 0);
  uv_barrier_wait(&gUdfInitBarrier);
  return 0;
}

862 863 864 865 866 867 868 869
int32_t startUdfService() {
  gUdfcState = UDFC_STATE_STARTNG;
  constructUdfService();
  gUdfcState = UDFC_STATE_READY;
  return 0;
}

int32_t destructUdfService() {
870
  uv_barrier_destroy(&gUdfInitBarrier);
871 872 873
  if (gUdfcState == UDFC_STATE_STOPPING) {
    uv_process_kill(&gUdfdProcess, SIGINT);
  }
874 875 876 877 878 879
  uv_async_send(&gUdfLoopStopAsync);
  uv_mutex_destroy(&gUdfTaskQueueMutex);
  uv_thread_join(&gUdfLoopThread);
  return 0;
}

880 881 882 883 884 885 886
int32_t stopUdfService() {
  gUdfcState = UDFC_STATE_STOPPING;
  destructUdfService();
  gUdfcState = UDFC_STATUS_FINAL;
  return 0;
}

887 888 889 890 891 892 893 894 895
int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) {
  SClientUvTaskNode *uvTask = NULL;

  createUdfcUvTask(task, uvTaskType, &uvTask);
  queueUvUdfTask(uvTask);
  udfcGetUvTaskResponseResult(task, uvTask);
  if (uvTaskType == UV_TASK_CONNECT) {
    task->session->udfSvcPipe = uvTask->pipe;
  }
wafwerar's avatar
wafwerar 已提交
896
  taosMemoryFree(uvTask);
897 898 899 900 901
  uvTask = NULL;
  return task->errCode;
}

int32_t setupUdf(SUdfInfo *udfInfo, UdfHandle *handle) {
S
shenglian zhou 已提交
902
  debugPrint("%s", "client setup udf");
wafwerar's avatar
wafwerar 已提交
903
  SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask));
904
  task->errCode = 0;
wafwerar's avatar
wafwerar 已提交
905
  task->session = taosMemoryMalloc(sizeof(SUdfUvSession));
906 907 908 909 910 911 912 913 914 915 916 917 918
  task->type = UDF_TASK_SETUP;

  SUdfSetupRequest *req = &task->_setup.req;
  memcpy(req->udfName, udfInfo->udfName, 16);
  req->path = udfInfo->path;
  req->pathSize = strlen(req->path) + 1;
  req->udfType = udfInfo->udfType;
  req->scriptType = udfInfo->scriptType;

  int32_t errCode = udfcRunUvTask(task, UV_TASK_CONNECT);
  if (errCode != 0) {
    //TODO: log error
    return -1;
H
Haojun Liao 已提交
919
  }
920 921 922 923 924 925 926

  udfcRunUvTask(task, UV_TASK_REQ_RSP);

  SUdfSetupResponse *rsp = &task->_setup.rsp;
  task->session->severHandle = rsp->udfHandle;
  *handle = task->session;
  int32_t err = task->errCode;
wafwerar's avatar
wafwerar 已提交
927
  taosMemoryFree(task);
928
  return err;
H
Haojun Liao 已提交
929 930
}

931 932
int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, SUdfDataBlock input, char **newState,
                int32_t *newStateSize, SUdfDataBlock *output) {
S
shenglian zhou 已提交
933
  debugPrint("%s", "client call udf");
934

wafwerar's avatar
wafwerar 已提交
935
  SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask));
936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956
  task->errCode = 0;
  task->session = (SUdfUvSession *) handle;
  task->type = UDF_TASK_CALL;

  SUdfCallRequest *req = &task->_call.req;

  req->state = state;
  req->stateBytes = stateSize;
  req->inputBytes = input.size;
  req->input = input.data;
  req->udfHandle = task->session->severHandle;
  req->step = step;

  udfcRunUvTask(task, UV_TASK_REQ_RSP);

  SUdfCallResponse *rsp = &task->_call.rsp;
  *newState = rsp->newState;
  *newStateSize = rsp->newStateBytes;
  output->size = rsp->outputBytes;
  output->data = rsp->output;
  int32_t err = task->errCode;
wafwerar's avatar
wafwerar 已提交
957
  taosMemoryFree(task);
958 959 960 961
  return err;
}

int32_t teardownUdf(UdfHandle handle) {
S
shenglian zhou 已提交
962
  debugPrint("%s", "client teardown udf");
963

wafwerar's avatar
wafwerar 已提交
964
  SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask));
965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980
  task->errCode = 0;
  task->session = (SUdfUvSession *) handle;
  task->type = UDF_TASK_TEARDOWN;

  SUdfTeardownRequest *req = &task->_teardown.req;
  req->udfHandle = task->session->severHandle;

  udfcRunUvTask(task, UV_TASK_REQ_RSP);


  SUdfTeardownResponse *rsp = &task->_teardown.rsp;

  int32_t err = task->errCode;

  udfcRunUvTask(task, UV_TASK_DISCONNECT);

wafwerar's avatar
wafwerar 已提交
981 982
  taosMemoryFree(task->session);
  taosMemoryFree(task);
983 984 985

  return err;
}