thttp.c 6.8 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
dengyihao's avatar
opt rpc  
dengyihao 已提交
17
// clang-format off
dengyihao's avatar
dengyihao 已提交
18
#include <uv.h>
dengyihao's avatar
dengyihao 已提交
19
#include "zlib.h"
dengyihao's avatar
opt rpc  
dengyihao 已提交
20
#include "thttp.h"
dengyihao's avatar
opt rpc  
dengyihao 已提交
21
#include "taoserror.h"
S
Shengliang Guan 已提交
22
#include "tlog.h"
dengyihao's avatar
dengyihao 已提交
23

dengyihao's avatar
dengyihao 已提交
24 25

#define HTTP_RECV_BUF_SIZE 1024
dengyihao's avatar
dengyihao 已提交
26

dengyihao's avatar
dengyihao 已提交
27 28 29 30
typedef struct SHttpClient {
  uv_connect_t conn;
  uv_tcp_t     tcp;
  uv_write_t   req;
dengyihao's avatar
dengyihao 已提交
31 32
  uv_buf_t*    wbuf;
  char         *rbuf; 
dengyihao's avatar
dengyihao 已提交
33 34 35
  char*        addr;
  uint16_t     port;
} SHttpClient;
S
Shengliang Guan 已提交
36

S
Shengliang Guan 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
// Formats the HTTP/1.1 request header for a POST to /report.
//
// server   host name placed in the Host header
// contLen  body length in bytes, placed in Content-Length
// pHead    destination buffer
// headLen  capacity of pHead in bytes
// flag     HTTP_FLAT for a plain body, HTTP_GZIP to add "Content-Encoding: gzip"
//
// Returns the value of snprintf (the length the full header needs, which is
// >= headLen when the output was truncated), or -1 for an unknown flag.
static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen,
                                   EHttpCompFlag flag) {
  const char* encodingLine = NULL;

  if (flag == HTTP_FLAT) {
    encodingLine = "";
  } else if (flag == HTTP_GZIP) {
    encodingLine = "Content-Encoding: gzip\r\n";
  } else {
    return -1;
  }

  // HTTP/1.1 terminates the request line and every header field with CRLF,
  // and ends the header section with an empty CRLF line.
  return snprintf(pHead, headLen,
                  "POST /report HTTP/1.1\r\n"
                  "Host: %s\r\n"
                  "Content-Type: application/json\r\n"
                  "%s"
                  "Content-Length: %d\r\n\r\n",
                  server, encodingLine, contLen);
}

dengyihao's avatar
dengyihao 已提交
59
static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) {
S
Shengliang Guan 已提交
60 61
  int32_t code = -1;
  int32_t destLen = srcLen;
wafwerar's avatar
wafwerar 已提交
62
  void*   pDest = taosMemoryMalloc(destLen);
S
Shengliang Guan 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123

  if (pDest == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  z_stream gzipStream = {0};
  gzipStream.zalloc = (alloc_func)0;
  gzipStream.zfree = (free_func)0;
  gzipStream.opaque = (voidpf)0;
  if (deflateInit2(&gzipStream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  gzipStream.next_in = (Bytef*)pSrc;
  gzipStream.avail_in = (uLong)srcLen;
  gzipStream.next_out = (Bytef*)pDest;
  gzipStream.avail_out = (uLong)(destLen);

  while (gzipStream.avail_in != 0 && gzipStream.total_out < (uLong)(destLen)) {
    if (deflate(&gzipStream, Z_FULL_FLUSH) != Z_OK) {
      terrno = TSDB_CODE_COMPRESS_ERROR;
      goto _OVER;
    }
  }

  if (gzipStream.avail_in != 0) {
    terrno = TSDB_CODE_COMPRESS_ERROR;
    goto _OVER;
  }

  int32_t err = 0;
  while (1) {
    if ((err = deflate(&gzipStream, Z_FINISH)) == Z_STREAM_END) {
      break;
    }
    if (err != Z_OK) {
      terrno = TSDB_CODE_COMPRESS_ERROR;
      goto _OVER;
    }
  }

  if (deflateEnd(&gzipStream) != Z_OK) {
    terrno = TSDB_CODE_COMPRESS_ERROR;
    goto _OVER;
  }

  if (gzipStream.total_out >= srcLen) {
    terrno = TSDB_CODE_COMPRESS_ERROR;
    goto _OVER;
  }

  code = 0;

_OVER:
  if (code == 0) {
    memcpy(pSrc, pDest, gzipStream.total_out);
    code = gzipStream.total_out;
  }

wafwerar's avatar
wafwerar 已提交
124
  taosMemoryFree(pDest);
S
Shengliang Guan 已提交
125 126 127
  return code;
}

dengyihao's avatar
dengyihao 已提交
128
// Releases a client together with every buffer it owns. The client itself is
// freed last because the other pointers are read from it.
static void destroyHttpClient(SHttpClient* cli) {
  uv_buf_t* writeBufs = cli->wbuf;
  char*     recvBuf = cli->rbuf;
  char*     serverName = cli->addr;

  taosMemoryFree(writeBufs);
  taosMemoryFree(recvBuf);
  taosMemoryFree(serverName);
  taosMemoryFree(cli);
}
// Close callback for the client's tcp handle: the handle's data pointer is the
// owning SHttpClient, which is destroyed here.
static void clientCloseCb(uv_handle_t* handle) {
  destroyHttpClient((SHttpClient*)handle->data);
}
dengyihao's avatar
dengyihao 已提交
139 140 141 142 143 144 145 146
// Allocation callback for uv_read_start: always hands out the client's own
// receive buffer, regardless of the size libuv suggests.
static void clientAllocBuffCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  (void)suggested_size;
  const SHttpClient* client = handle->data;
  buf->len = HTTP_RECV_BUF_SIZE;
  buf->base = client->rbuf;
}
// Read callback. The response content is not inspected: the result is only
// logged, and the connection is closed after the first callback either way.
static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
  SHttpClient* cli = handle->data;
  (void)buf;
  if (nread < 0) {
    uError("http-report recv error:%s", uv_err_name((int)nread));
  } else {
    // nread is ssize_t; cast so the argument matches the %d conversion.
    uTrace("http-report succ to recv %d bytes, just ignore it", (int32_t)nread);
  }
  uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
}
dengyihao's avatar
dengyihao 已提交
153 154 155 156 157 158
// Write callback. On success it starts reading the server's response; on any
// failure it records the error in terrno and closes the connection, which
// releases the client through clientCloseCb.
static void clientSentCb(uv_write_t* req, int32_t status) {
  SHttpClient* cli = req->data;
  if (status != 0) {
    terrno = TAOS_SYSTEM_ERROR(status);
    uError("http-report failed to send data %s", uv_strerror(status));
    // Nothing was delivered, so there is no response to wait for.
    uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
    return;
  }

  uTrace("http-report succ to send data");

  int32_t ret = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb);
  if (ret != 0) {
    // Without an active read no callback would ever close the handle.
    terrno = TAOS_SYSTEM_ERROR(ret);
    uError("http-report failed to recv data, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port);
    uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
  }
}
S
Shengliang Guan 已提交
163
// Connect callback. On success it queues the two write buffers (header and
// body); on any failure it records the error in terrno and closes the
// connection, which releases the client through clientCloseCb.
static void clientConnCb(uv_connect_t* req, int32_t status) {
  SHttpClient* cli = req->data;
  if (status != 0) {
    terrno = TAOS_SYSTEM_ERROR(status);
    uError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
    uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
    return;
  }

  int32_t ret = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb);
  if (ret != 0) {
    // When uv_write fails up front, clientSentCb is never invoked, so the
    // handle has to be closed here.
    terrno = TAOS_SYSTEM_ERROR(ret);
    uError("http-report failed to send data, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port);
    uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
  }
}

dengyihao's avatar
dengyihao 已提交
174
static int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) {
dengyihao's avatar
dengyihao 已提交
175 176
  uint32_t ip = taosGetIpv4FromFqdn(server);
  if (ip == 0xffffffff) {
177
    terrno = TAOS_SYSTEM_ERROR(errno);
dengyihao's avatar
dengyihao 已提交
178
    uError("http-report failed to get http server:%s ip since %s", server, terrstr());
179 180
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
181 182 183
  char buf[128] = {0};
  tinet_ntoa(buf, ip);
  uv_ip4_addr(buf, port, dest);
dengyihao's avatar
dengyihao 已提交
184
  return 0;
185
}
S
Shengliang Guan 已提交
186
// Sends pCont to http://server:port/report with a blocking HTTP POST driven
// by the default libuv loop.
//
// server   host name; resolved to IPv4 and also used for the Host header
// port     destination port
// pCont    request body; when flag is HTTP_GZIP it is compressed IN PLACE,
//          so the caller's buffer is modified
// contLen  body length in bytes
// flag     HTTP_FLAT or HTTP_GZIP; falls back to HTTP_FLAT when compression
//          fails or does not shrink the body
//
// Returns 0 on success. Returns -1 when setup fails before any I/O is
// attempted, otherwise the terrno value recorded by the failing step.
int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) {
  struct sockaddr_in dest = {0};
  if (taosBuildDstAddr(server, port, &dest) < 0) {
    return -1;
  }

  if (flag == HTTP_GZIP) {
    int32_t dstLen = taosCompressHttpRport(pCont, contLen);
    if (dstLen > 0) {
      contLen = dstLen;
    } else {
      flag = HTTP_FLAT;
    }
  }
  // A failed compression attempt is not an error for the report itself.
  terrno = 0;

  // The header lives on the stack; that is safe because uv_run below runs the
  // whole request to completion before this function returns.
  char    header[2048] = {0};
  int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag);
  if (headLen < 0 || headLen >= (int32_t)sizeof(header)) {
    // Unknown flag or truncated header: using headLen as a buffer length
    // would make libuv read outside of header.
    terrno = TAOS_SYSTEM_ERROR(EINVAL);
    uError("http-report failed to build http header, dst:%s:%d", server, port);
    return -1;
  }

  SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient));
  if (cli == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  cli->conn.data = cli;
  cli->tcp.data = cli;
  cli->req.data = cli;
  cli->wbuf = taosMemoryCalloc(2, sizeof(uv_buf_t));
  cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE);
  cli->addr = tstrdup(server);
  cli->port = port;
  if (cli->wbuf == NULL || cli->rbuf == NULL || cli->addr == NULL) {
    // NOTE(review): relies on taosMemoryFree accepting NULL, as the OOM path
    // of taosCompressHttpRport in this file already does.
    destroyHttpClient(cli);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  cli->wbuf[0] = uv_buf_init((char*)header, headLen);  // stack var
  cli->wbuf[1] = uv_buf_init((char*)pCont, contLen);   //  heap var

  uv_loop_t* loop = uv_default_loop();
  int32_t    ret = uv_tcp_init(loop, &cli->tcp);
  if (ret != 0) {
    // The handle never got registered with the loop, so free directly.
    terrno = TAOS_SYSTEM_ERROR(ret);
    uError("http-report failed to init tcp handle, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port);
    destroyHttpClient(cli);
    return terrno;
  }

  // set up timeout to avoid stuck;
  int32_t fd = taosCreateSocketWithTimeout(5);
  ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
  if (ret == 0) {
    ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&dest, clientConnCb);
  }
  if (ret != 0) {
    terrno = TAOS_SYSTEM_ERROR(ret);
    uError("http-report failed to connect to server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port);
    // The handle is registered with the loop: it must be closed through
    // uv_close, and the client freed from the close callback, not here.
    uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
  }

  uv_run(loop, UV_RUN_DEFAULT);
  uv_loop_close(loop);
  return terrno;
}
dengyihao's avatar
opt rpc  
dengyihao 已提交
233
// clang-format on