thttp.c 13.1 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
dengyihao's avatar
opt rpc  
dengyihao 已提交
17
// clang-format off
dengyihao's avatar
dengyihao 已提交
18
#include <uv.h>
dengyihao's avatar
dengyihao 已提交
19
#include "zlib.h"
dengyihao's avatar
opt rpc  
dengyihao 已提交
20
#include "thttp.h"
dengyihao's avatar
opt rpc  
dengyihao 已提交
21
#include "taoserror.h"
S
Shengliang Guan 已提交
22
#include "tlog.h"
dengyihao's avatar
dengyihao 已提交
23
#include "transComm.h"
dengyihao's avatar
dengyihao 已提交
24

dengyihao's avatar
dengyihao 已提交
25
// clang-format on
dengyihao's avatar
dengyihao 已提交
26 27

// Size in bytes of the per-connection receive buffer (SHttpClient.rbuf) that clientAllocBuffCb hands to libuv.
#define HTTP_RECV_BUF_SIZE 1024
dengyihao's avatar
dengyihao 已提交
28

dengyihao's avatar
dengyihao 已提交
29 30
// Value returned by taosOpenRef() in transHttpEnvInit(); passed to every taosAcquireRef/taosReleaseRef call here.
static int32_t httpRefMgt = 0;
// Value returned by taosAddRef() for the SHttpModule; stays -1 until transHttpEnvInit() registers the module.
static int64_t httpRef = -1;
dengyihao's avatar
dengyihao 已提交
31 32 33 34 35 36 37 38 39
// State of the background HTTP reporter: one libuv loop, its async pool and the thread that runs the loop.
typedef struct SHttpModule {
  uv_loop_t*  loop;       // event loop executed by httpThread()
  SAsyncPool* asyncPool;  // created with httpAsyncCb as callback; SHttpMsg items are sent through it
  TdThread    thread;     // thread started in transHttpEnvInit() and joined in transHttpEnvDestroy()
} SHttpModule;

typedef struct SHttpMsg {
  queue         q;
  char*         server;
D
dapan1121 已提交
40
  char*         uri;
dengyihao's avatar
dengyihao 已提交
41 42 43 44
  int32_t       port;
  char*         cont;
  int32_t       len;
  EHttpCompFlag flag;
dengyihao's avatar
dengyihao 已提交
45
  int8_t        quit;
dengyihao's avatar
dengyihao 已提交
46 47 48

} SHttpMsg;

dengyihao's avatar
dengyihao 已提交
49
typedef struct SHttpClient {
dengyihao's avatar
dengyihao 已提交
50 51 52 53 54 55 56 57
  uv_connect_t       conn;
  uv_tcp_t           tcp;
  uv_write_t         req;
  uv_buf_t*          wbuf;
  char*              rbuf;
  char*              addr;
  uint16_t           port;
  struct sockaddr_in dest;
dengyihao's avatar
dengyihao 已提交
58
} SHttpClient;
S
Shengliang Guan 已提交
59

dengyihao's avatar
dengyihao 已提交
60 61 62 63 64 65 66
// Once-control handed to taosThreadOnce() together with transHttpEnvInit() in taosSendHttpReport().
static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT;
static void         transHttpEnvInit();

// Message handlers called from httpAsyncCb(), and the sender of the quit message.
static void    httpHandleReq(SHttpMsg* msg);
static void    httpHandleQuit(SHttpMsg* msg);
static int32_t httpSendQuit();

D
dapan1121 已提交
67
static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
dengyihao's avatar
dengyihao 已提交
68 69
                                      EHttpCompFlag flag);

D
dapan1121 已提交
70
static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, char* pHead, int32_t headLen,
S
Shengliang Guan 已提交
71 72 73
                                   EHttpCompFlag flag) {
  if (flag == HTTP_FLAT) {
    return snprintf(pHead, headLen,
D
dapan1121 已提交
74
                    "POST %s HTTP/1.1\n"
S
Shengliang Guan 已提交
75 76 77
                    "Host: %s\n"
                    "Content-Type: application/json\n"
                    "Content-Length: %d\n\n",
D
dapan1121 已提交
78
                    uri, server, contLen);
S
Shengliang Guan 已提交
79 80
  } else if (flag == HTTP_GZIP) {
    return snprintf(pHead, headLen,
D
dapan1121 已提交
81
                    "POST %s HTTP/1.1\n"
S
Shengliang Guan 已提交
82 83 84 85
                    "Host: %s\n"
                    "Content-Type: application/json\n"
                    "Content-Encoding: gzip\n"
                    "Content-Length: %d\n\n",
D
dapan1121 已提交
86
                    uri, server, contLen);
S
Shengliang Guan 已提交
87
  } else {
dengyihao's avatar
dengyihao 已提交
88
    terrno = TSDB_CODE_INVALID_CFG;
S
Shengliang Guan 已提交
89 90 91 92
    return -1;
  }
}

dengyihao's avatar
dengyihao 已提交
93
static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) {
S
Shengliang Guan 已提交
94 95
  int32_t code = -1;
  int32_t destLen = srcLen;
wafwerar's avatar
wafwerar 已提交
96
  void*   pDest = taosMemoryMalloc(destLen);
S
Shengliang Guan 已提交
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157

  if (pDest == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  z_stream gzipStream = {0};
  gzipStream.zalloc = (alloc_func)0;
  gzipStream.zfree = (free_func)0;
  gzipStream.opaque = (voidpf)0;
  if (deflateInit2(&gzipStream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  gzipStream.next_in = (Bytef*)pSrc;
  gzipStream.avail_in = (uLong)srcLen;
  gzipStream.next_out = (Bytef*)pDest;
  gzipStream.avail_out = (uLong)(destLen);

  while (gzipStream.avail_in != 0 && gzipStream.total_out < (uLong)(destLen)) {
    if (deflate(&gzipStream, Z_FULL_FLUSH) != Z_OK) {
      terrno = TSDB_CODE_COMPRESS_ERROR;
      goto _OVER;
    }
  }

  if (gzipStream.avail_in != 0) {
    terrno = TSDB_CODE_COMPRESS_ERROR;
    goto _OVER;
  }

  int32_t err = 0;
  while (1) {
    if ((err = deflate(&gzipStream, Z_FINISH)) == Z_STREAM_END) {
      break;
    }
    if (err != Z_OK) {
      terrno = TSDB_CODE_COMPRESS_ERROR;
      goto _OVER;
    }
  }

  if (deflateEnd(&gzipStream) != Z_OK) {
    terrno = TSDB_CODE_COMPRESS_ERROR;
    goto _OVER;
  }

  if (gzipStream.total_out >= srcLen) {
    terrno = TSDB_CODE_COMPRESS_ERROR;
    goto _OVER;
  }

  code = 0;

_OVER:
  if (code == 0) {
    memcpy(pSrc, pDest, gzipStream.total_out);
    code = gzipStream.total_out;
  }

wafwerar's avatar
wafwerar 已提交
158
  taosMemoryFree(pDest);
S
Shengliang Guan 已提交
159 160 161
  return code;
}

dengyihao's avatar
dengyihao 已提交
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
/*
 * Resolve `server` to an IPv4 address and fill `dest` with that address and `port`.
 * Returns 0 on success, -1 (after logging) when taosGetIpv4FromFqdn() yields 0xffffffff.
 */
static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) {
  const uint32_t ipv4 = taosGetIpv4FromFqdn(server);

  if (ipv4 != 0xffffffff) {
    // Render the address as text because uv_ip4_addr() takes a string.
    char ipText[128] = {0};
    tinet_ntoa(ipText, ipv4);
    uv_ip4_addr(ipText, port, dest);
    return 0;
  }

  tError("http-report failed to get http server:%s since %s", server, errno == 0 ? "invalid http server" : terrstr());
  return -1;
}

// Thread entry point: names the thread and runs the module's libuv loop until uv_run() returns.
static void* httpThread(void* arg) {
  setThreadName("http-cli-send-thread");

  SHttpModule* module = arg;
  uv_run(module->loop, UV_RUN_DEFAULT);
  return NULL;
}

// Free a message together with the server, uri and body strings it owns. NULL is accepted.
static void httpDestroyMsg(SHttpMsg* msg) {
  if (msg != NULL) {
    taosMemoryFree(msg->server);
    taosMemoryFree(msg->uri);
    taosMemoryFree(msg->cont);
    taosMemoryFree(msg);
  }
}
/*
 * Async callback of the HTTP loop: drains the pending message queue.
 *
 * The queue is moved out under the item's mutex, then every report is handed
 * to httpHandleReq(). A quit message is held back and handled only after all
 * reports taken in this round have been dispatched.
 */
static void httpAsyncCb(uv_async_t* handle) {
  SAsyncItem* item = handle->data;
  SHttpMsg*   quitMsg = NULL;

  queue wq;
  taosThreadMutexLock(&item->mtx);
  QUEUE_MOVE(&item->qmsg, &wq);
  taosThreadMutexUnlock(&item->mtx);

  while (!QUEUE_IS_EMPTY(&wq)) {
    queue* h = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(h);

    SHttpMsg* msg = QUEUE_DATA(h, SHttpMsg, q);
    if (!msg->quit) {
      httpHandleReq(msg);
      continue;
    }

    // Only one quit message is acted upon; release any earlier one instead of leaking it.
    if (quitMsg != NULL) {
      httpDestroyMsg(quitMsg);
    }
    quitMsg = msg;
  }

  if (quitMsg != NULL) {
    httpHandleQuit(quitMsg);
  }
}

dengyihao's avatar
dengyihao 已提交
214
/*
 * Free a client and everything it owns: both write buffers (header and body),
 * the buffer array, the receive buffer and the server name.
 * Accepts NULL and a client whose wbuf was never allocated.
 */
static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) {
  if (cli == NULL) return;

  if (cli->wbuf != NULL) {
    taosMemoryFree(cli->wbuf[0].base);
    taosMemoryFree(cli->wbuf[1].base);
    taosMemoryFree(cli->wbuf);
  }
  taosMemoryFree(cli->rbuf);
  taosMemoryFree(cli->addr);
  taosMemoryFree(cli);
}
dengyihao's avatar
dengyihao 已提交
222

dengyihao's avatar
dengyihao 已提交
223
// Close callback for the client's TCP handle: releases the SHttpClient stored in handle->data.
static FORCE_INLINE void clientCloseCb(uv_handle_t* handle) {
  destroyHttpClient((SHttpClient*)handle->data);
}
dengyihao's avatar
dengyihao 已提交
227

dengyihao's avatar
dengyihao 已提交
228
// Read-allocation callback: always offers the client's fixed receive buffer; suggested_size is ignored.
static FORCE_INLINE void clientAllocBuffCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  SHttpClient* client = handle->data;

  buf->len = HTTP_RECV_BUF_SIZE;
  buf->base = client->rbuf;
}
dengyihao's avatar
dengyihao 已提交
233

dengyihao's avatar
dengyihao 已提交
234
/*
 * Read callback: logs the outcome of the read and then closes the connection
 * regardless of the result. The received bytes themselves are not inspected.
 */
static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
  SHttpClient* client = handle->data;
  uv_handle_t* tcpHandle = (uv_handle_t*)&client->tcp;

  if (nread >= 0) {
    tTrace("http-report succ to recv %d bytes", (int32_t)nread);
  } else {
    tError("http-report recv error:%s", uv_err_name(nread));
  }

  if (uv_is_closing(tcpHandle)) return;
  uv_close(tcpHandle, clientCloseCb);
}
dengyihao's avatar
dengyihao 已提交
245 246 247
/*
 * Write-completion callback. On success it starts reading the response;
 * if the write failed or reading cannot be started, the connection is closed.
 */
static void clientSentCb(uv_write_t* req, int32_t status) {
  SHttpClient* client = req->data;
  uv_handle_t* tcpHandle = (uv_handle_t*)&client->tcp;

  if (status == 0) {
    tTrace("http-report succ to send data");

    status = uv_read_start((uv_stream_t*)&client->tcp, clientAllocBuffCb, clientRecvCb);
    if (status == 0) return;

    tError("http-report failed to recv data,reason:%s, dst:%s:%d", uv_strerror(status), client->addr, client->port);
  } else {
    tError("http-report failed to send data, reason: %s, dst:%s:%d", uv_strerror(status), client->addr, client->port);
  }

  // Both failure cases end the same way: close the handle once.
  if (!uv_is_closing(tcpHandle)) {
    uv_close(tcpHandle, clientCloseCb);
  }
}
S
Shengliang Guan 已提交
264
/*
 * Connect-completion callback. On success it writes the two prepared buffers
 * (header and body); if connecting or queuing the write fails, the connection
 * is closed.
 */
static void clientConnCb(uv_connect_t* req, int32_t status) {
  SHttpClient* client = req->data;
  uv_handle_t* tcpHandle = (uv_handle_t*)&client->tcp;

  if (status == 0) {
    status = uv_write(&client->req, (uv_stream_t*)&client->tcp, client->wbuf, 2, clientSentCb);
    if (0 == status) return;

    tError("http-report failed to send data,reason:%s, dst:%s:%d", uv_strerror(status), client->addr, client->port);
  } else {
    tError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), client->addr,
           client->port);
  }

  if (!uv_is_closing(tcpHandle)) {
    uv_close(tcpHandle, clientCloseCb);
  }
}

dengyihao's avatar
dengyihao 已提交
282
int32_t httpSendQuit() {
dengyihao's avatar
dengyihao 已提交
283 284 285
  SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef);
  if (http == NULL) return 0;

dengyihao's avatar
dengyihao 已提交
286 287 288
  SHttpMsg* msg = taosMemoryCalloc(1, sizeof(SHttpMsg));
  msg->quit = 1;

dengyihao's avatar
dengyihao 已提交
289 290
  transAsyncSend(http->asyncPool, &(msg->q));
  taosReleaseRef(httpRefMgt, httpRef);
dengyihao's avatar
dengyihao 已提交
291
  return 0;
292
}
dengyihao's avatar
dengyihao 已提交
293

D
dapan1121 已提交
294
static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
dengyihao's avatar
dengyihao 已提交
295
                                      EHttpCompFlag flag) {
dengyihao's avatar
dengyihao 已提交
296 297 298 299 300 301
  SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef);
  if (load == NULL) {
    tError("http-report already released");
    return -1;
  }

dengyihao's avatar
dengyihao 已提交
302
  SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg));
303

304 305
  msg->server = taosStrdup(server);
  msg->uri  = taosStrdup(uri);
dengyihao's avatar
dengyihao 已提交
306 307 308
  msg->port = port;
  msg->cont = taosMemoryMalloc(contLen);
  memcpy(msg->cont, pCont, contLen);
dengyihao's avatar
dengyihao 已提交
309
  msg->len = contLen;
dengyihao's avatar
dengyihao 已提交
310
  msg->flag = flag;
dengyihao's avatar
dengyihao 已提交
311 312
  msg->quit = 0;

dengyihao's avatar
dengyihao 已提交
313 314 315
  int ret = transAsyncSend(load->asyncPool, &(msg->q));
  taosReleaseRef(httpRefMgt, httpRef);
  return ret;
dengyihao's avatar
dengyihao 已提交
316
}
dengyihao's avatar
dengyihao 已提交
317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336

// Close callback used during shutdown: frees the SHttpClient attached to the handle.
static void httpDestroyClientCb(uv_handle_t* handle) {
  destroyHttpClient((SHttpClient*)handle->data);
}
/*
 * uv_walk() visitor used on quit: closes every handle that is not already
 * closing. TCP handles get httpDestroyClientCb so their SHttpClient is freed;
 * all other handle types are closed without a callback.
 */
static void httpWalkCb(uv_handle_t* handle, void* arg) {
  if (uv_is_closing(handle)) return;

  if (uv_handle_get_type(handle) == UV_TCP) {
    uv_close(handle, httpDestroyClientCb);
  } else {
    uv_close(handle, NULL);
  }
}
// Handle a quit message: free it, then close every handle on the module's loop via httpWalkCb.
static void httpHandleQuit(SHttpMsg* msg) {
  taosMemoryFree(msg);

  SHttpModule* module = taosAcquireRef(httpRefMgt, httpRef);
  if (module != NULL) {
    uv_walk(module->loop, httpWalkCb, NULL);
    taosReleaseRef(httpRefMgt, httpRef);
  }
}
dengyihao's avatar
dengyihao 已提交
343
/*
 * Handle one report on the loop thread: resolve the server, optionally gzip
 * the body, build the header, and start an asynchronous connect. The rest of
 * the exchange is driven by clientConnCb -> clientSentCb -> clientRecvCb.
 *
 * Ownership: msg is always consumed. On success its server and body strings
 * move into the SHttpClient, which is freed from the TCP handle's close
 * callback.
 */
static void httpHandleReq(SHttpMsg* msg) {
  char*        header = NULL;
  uv_buf_t*    wb = NULL;
  SHttpClient* cli = NULL;
  char*        rbuf = NULL;

  SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef);
  if (http == NULL) {
    // Nothing was acquired, so there is nothing to release on this path.
    tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port);
    httpDestroyMsg(msg);
    return;
  }

  struct sockaddr_in dest = {0};
  if (taosBuildDstAddr(msg->server, msg->port, &dest) < 0) {
    goto END;
  }

  if (msg->flag == HTTP_GZIP) {
    int32_t dstLen = taosCompressHttpRport(msg->cont, msg->len);
    if (dstLen < 0) {
      goto END;
    }
    if (dstLen > 0) {
      msg->len = dstLen;
    } else {
      msg->flag = HTTP_FLAT;
    }
  }

  const int32_t len = 2048;
  header = taosMemoryCalloc(1, len);
  if (header == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }
  if (taosBuildHttpHeader(msg->server, msg->uri, msg->len, header, len, msg->flag) < 0) {
    goto END;
  }

  wb = taosMemoryCalloc(2, sizeof(uv_buf_t));
  cli = taosMemoryCalloc(1, sizeof(SHttpClient));
  rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE);
  if (wb == NULL || cli == NULL || rbuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }

  wb[0] = uv_buf_init((char*)header, strlen(header));  //  heap var
  wb[1] = uv_buf_init((char*)msg->cont, msg->len);     //  heap var

  cli->conn.data = cli;
  cli->tcp.data = cli;
  cli->req.data = cli;
  cli->wbuf = wb;
  cli->rbuf = rbuf;
  cli->addr = msg->server;
  cli->port = msg->port;
  cli->dest = dest;

  // server and cont now belong to cli; only the uri and the message shell remain to be freed.
  taosMemoryFree(msg->uri);
  taosMemoryFree(msg);

  int ret = uv_tcp_init(http->loop, &cli->tcp);
  if (ret != 0) {
    // The handle was never registered with the loop, so the client can be freed directly.
    tError("http-report failed to init tcp handle, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port);
    destroyHttpClient(cli);
    taosReleaseRef(httpRefMgt, httpRef);
    return;
  }

  // From here on the handle is known to the loop: it must be released through uv_close(),
  // whose callback frees the client. Freeing it directly would leave the loop with a dangling handle.

  // set up timeout to avoid stuck;
  int32_t fd = taosCreateSocketWithTimeout(5);
  if (fd < 0) {
    tError("http-report failed to create socket, dst:%s:%d", cli->addr, cli->port);
    uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
    taosReleaseRef(httpRefMgt, httpRef);
    return;
  }

  ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
  if (ret != 0) {
    // NOTE(review): fd is not closed on this path; confirm which helper should release it.
    tError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port);
    uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
    taosReleaseRef(httpRefMgt, httpRef);
    return;
  }

  ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&cli->dest, clientConnCb);
  if (ret != 0) {
    tError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr,
           cli->port);
    uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
  }
  taosReleaseRef(httpRefMgt, httpRef);
  return;

END:
  tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port);
  // Nothing has been handed over to a client yet; release every partial allocation.
  taosMemoryFree(header);
  taosMemoryFree(wb);
  taosMemoryFree(cli);
  taosMemoryFree(rbuf);
  httpDestroyMsg(msg);
  taosReleaseRef(httpRefMgt, httpRef);
}

dengyihao's avatar
dengyihao 已提交
417 418
/*
 * Public entry point: make sure the HTTP environment has been initialised via
 * taosThreadOnce(), then queue the report with taosSendHttpReportImpl().
 * Returns whatever taosSendHttpReportImpl() returns.
 */
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
                           EHttpCompFlag flag) {
  taosThreadOnce(&transHttpInit, transHttpEnvInit);

  int32_t code = taosSendHttpReportImpl(server, uri, port, pCont, contLen, flag);
  return code;
}

dengyihao's avatar
dengyihao 已提交
423
// Destructor registered with taosOpenRef(): frees the object stored in the reference set.
static void transHttpDestroyHandle(void* handle) {
  taosMemoryFree(handle);
}
dengyihao's avatar
dengyihao 已提交
424
static void transHttpEnvInit() {
dengyihao's avatar
dengyihao 已提交
425
  httpRefMgt = taosOpenRef(1, transHttpDestroyHandle);
dengyihao's avatar
dengyihao 已提交
426

dengyihao's avatar
dengyihao 已提交
427
  SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule));
dengyihao's avatar
dengyihao 已提交
428 429 430 431
  http->loop = taosMemoryMalloc(sizeof(uv_loop_t));
  uv_loop_init(http->loop);

  http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb);
D
dapan1121 已提交
432 433 434 435 436 437
  if (NULL == http->asyncPool) {
    taosMemoryFree(http->loop);
    taosMemoryFree(http);
    http = NULL;
    return;
  }
dengyihao's avatar
dengyihao 已提交
438

dengyihao's avatar
dengyihao 已提交
439 440 441 442 443 444
  int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http);
  if (err != 0) {
    taosMemoryFree(http->loop);
    taosMemoryFree(http);
    http = NULL;
  }
dengyihao's avatar
dengyihao 已提交
445
  httpRef = taosAddRef(httpRefMgt, http);
dengyihao's avatar
dengyihao 已提交
446 447 448
}

void transHttpEnvDestroy() {
dengyihao's avatar
dengyihao 已提交
449 450
  // remove http
  if (httpRef == -1 || transHttpInit == PTHREAD_ONCE_INIT) {
dengyihao's avatar
dengyihao 已提交
451 452
    return;
  }
dengyihao's avatar
dengyihao 已提交
453
  SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef);
dengyihao's avatar
dengyihao 已提交
454 455 456 457 458 459 460 461
  httpSendQuit();
  taosThreadJoin(load->thread, NULL);

  TRANS_DESTROY_ASYNC_POOL_MSG(load->asyncPool, SHttpMsg, httpDestroyMsg);
  transAsyncPoolDestroy(load->asyncPool);
  uv_loop_close(load->loop);
  taosMemoryFree(load->loop);

dengyihao's avatar
dengyihao 已提交
462 463
  taosReleaseRef(httpRefMgt, httpRef);
  taosRemoveRef(httpRefMgt, httpRef);
dengyihao's avatar
dengyihao 已提交
464
}