httpServer.c 14.6 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#define _DEFAULT_SOURCE
17
#include "os.h"
H
hzcheng 已提交
18 19 20
#include "taosmsg.h"
#include "tsocket.h"
#include "tutil.h"
21
#include "ttime.h"
S
slguan 已提交
22
#include "ttimer.h"
S
slguan 已提交
23
#include "tglobal.h"
24 25
#include "httpInt.h"
#include "httpContext.h"
H
hzcheng 已提交
26
#include "httpResp.h"
27
#include "httpUtil.h"
H
hzcheng 已提交
28

29 30 31 32
// Fallback for toolchains whose headers do not define EPOLLWAKEUP, so that the
// event mask built in httpAcceptHttpConnection still compiles.
// NOTE(review): (1u << 29) is presumably the Linux kernel value of this flag - confirm.
#ifndef EPOLLWAKEUP
 #define EPOLLWAKEUP (1u << 29)
#endif

33 34
/*
 * Stop one HTTP worker thread and release the resources it owns.
 *
 * Sets pThread->stop, then registers an already-readable eventfd on the
 * worker's epoll instance so that a worker waiting in epoll_wait() returns and
 * sees the flag. If the eventfd cannot be created or registered, the worker is
 * cancelled with pthread_cancel() instead. In every case the thread is joined;
 * afterwards the eventfd (if any), the epoll descriptor and the thread mutex
 * are released.
 */
static void httpStopThread(HttpThread* pThread) {
  pThread->stop = true;

  // signal the thread to stop, try graceful method first,
  // and use pthread_cancel when failed
  struct epoll_event event = { .events = EPOLLIN };

  // eventfd() returns an ordinary int descriptor (-1 on failure). eventfd_t is
  // the 64-bit counter type and must not be used to hold the descriptor.
  int fd = eventfd(1, 0);
  if (fd == -1) {
    httpError("%s, failed to create eventfd, will call pthread_cancel instead, which may result in data corruption: %s", pThread->label, strerror(errno));
    pthread_cancel(pThread->thread);
  } else if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
    httpError("%s, failed to call epoll_ctl, will call pthread_cancel instead, which may result in data corruption: %s", pThread->label, strerror(errno));
    pthread_cancel(pThread->thread);
  }

  pthread_join(pThread->thread, NULL);
  if (fd != -1) {
    close(fd);
  }

  close(pThread->pollFd);
  pthread_mutex_destroy(&(pThread->threadMutex));
}

57 58
void httpCleanUpConnect() {
  HttpServer *pServer = &tsHttpServer;
S
Shengliang Guan 已提交
59
  if (pServer->pThreads == NULL) return;
60

S
Shengliang Guan 已提交
61
  pthread_join(pServer->thread, NULL);
62 63 64 65 66
  for (int i = 0; i < pServer->numOfThreads; ++i) {
    HttpThread* pThread = pServer->pThreads + i;
    if (pThread != NULL) {
      httpStopThread(pThread);
    }
H
hzcheng 已提交
67 68
  }

69
  httpDebug("http server:%s is cleaned up", pServer->label);
H
hzcheng 已提交
70 71
}

72
/*
 * Drain the socket of pContext into the parser buffer.
 *
 * Each iteration asks taosReadSocket() for HTTP_STEP_SIZE bytes and appends
 * what was received to pParser->buffer. Reading stops when
 *   - a read returns fewer than HTTP_STEP_SIZE bytes (including zero), or
 *   - a read fails with EINTR / EAGAIN / EWOULDBLOCK (wait for the next event).
 *
 * Returns:
 *   HTTP_READ_DATA_SUCCESS  the buffer was NUL-terminated and may be parsed
 *   HTTP_READ_DATA_FAILED   a read failed with any other errno
 *   HTTP_REQUSET_TOO_BIG    after a full-size read the buffered byte count
 *                           reached HTTP_BUFFER_SIZE - HTTP_STEP_SIZE
 */
int httpReadDataImp(HttpContext *pContext) {
  HttpParser *pParser = &pContext->parser;

  while (pParser->bufsize <= (HTTP_BUFFER_SIZE - HTTP_STEP_SIZE)) {
    int got = (int)taosReadSocket(pContext->fd, pParser->buffer + pParser->bufsize, HTTP_STEP_SIZE);

    if (got < 0) {
      if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
        httpError("context:%p, fd:%d, ip:%s, read from socket error:%d, close connect",
                  pContext, pContext->fd, pContext->ipstr, errno);
        return HTTP_READ_DATA_FAILED;
      }

      httpDebug("context:%p, fd:%d, ip:%s, read from socket error:%d, wait another event",
                pContext, pContext->fd, pContext->ipstr, errno);
      break;
    }

    pParser->bufsize += got;

    // A short read means there is nothing more to fetch right now.
    if (got < HTTP_STEP_SIZE) {
      break;
    }

    // A full-size read that leaves no room for another one: refuse the request.
    if (pParser->bufsize >= (HTTP_BUFFER_SIZE - HTTP_STEP_SIZE)) {
      httpError("context:%p, fd:%d, ip:%s, thread:%s, request big than:%d",
                pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, HTTP_BUFFER_SIZE);
      return HTTP_REQUSET_TOO_BIG;
    }
  }

  pParser->buffer[pParser->bufsize] = 0;

  return HTTP_READ_DATA_SUCCESS;
}

106
/*
 * Decompress the request body in place when it is gzip-encoded.
 *
 * When contentEncoding is not HTTP_COMPRESS_GZIP the body is left untouched
 * and true is returned. Otherwise the body is inflated into a temporary buffer
 * and copied back over parser.data.pos, NUL-terminated, and parser.data.len is
 * updated to the decompressed size. The output size is capped at the space
 * remaining in parser.buffer after data.pos (minus one byte for the
 * terminator) so the copy back cannot overrun the parser buffer.
 *
 * Returns true on success (or when no decompression was needed), false when
 * the temporary buffer cannot be allocated or httpGzipDeCompress() reports an
 * error.
 */
static bool httpDecompressData(HttpContext *pContext) {
  if (pContext->contentEncoding != HTTP_COMPRESS_GZIP) {
    httpTraceL("context:%p, fd:%d, ip:%s, content:%s", pContext, pContext->fd, pContext->ipstr, pContext->parser.data.pos);
    return true;
  }

  char *decompressBuf = calloc(HTTP_DECOMPRESS_BUF_SIZE, 1);
  if (decompressBuf == NULL) {
    // Without this check a failed allocation would be handed to the inflater.
    httpError("context:%p, fd:%d, ip:%s, failed to allocate decompress buffer, rawSize:%d",
              pContext, pContext->fd, pContext->ipstr, pContext->parser.data.len);
    return false;
  }

  int32_t decompressBufLen = HTTP_DECOMPRESS_BUF_SIZE;
  size_t  bufsize = sizeof(pContext->parser.buffer) - (pContext->parser.data.pos - pContext->parser.buffer) - 1;
  if (decompressBufLen > (int)bufsize) {
    decompressBufLen = (int)bufsize;
  }

  int ret = httpGzipDeCompress(pContext->parser.data.pos, pContext->parser.data.len, decompressBuf, &decompressBufLen);

  if (ret == 0) {
    memcpy(pContext->parser.data.pos, decompressBuf, decompressBufLen);
    pContext->parser.data.pos[decompressBufLen] = 0;
    httpTraceL("context:%p, fd:%d, ip:%s, rawSize:%d, decompressSize:%d, content:%s", pContext, pContext->fd,
              pContext->ipstr, pContext->parser.data.len, decompressBufLen, decompressBuf);
    pContext->parser.data.len = decompressBufLen;
  } else {
    httpError("context:%p, fd:%d, ip:%s, failed to decompress data, rawSize:%d, error:%d",
              pContext, pContext->fd, pContext->ipstr, pContext->parser.data.len, ret);
  }

  free(decompressBuf);
  return ret == 0;
}

136
/*
 * Read, parse and (if needed) decompress one request on pContext.
 *
 * Returns true only when a complete request body is available and
 * decompression succeeded. Every other outcome returns false after the
 * context has been released and/or notified to close, as follows:
 *   - socket read failed:          release
 *   - other read error code:       send error response, notify close
 *   - request could not be parsed: notify close
 *   - body not complete yet:       release (wait for the next event)
 *   - body check failed:           notify close, release
 *   - decompression failed:        notify close, release
 */
static bool httpReadData(HttpContext *pContext) {
  if (!pContext->parsed) {
    httpInitContext(pContext);
  }

  int32_t readCode = httpReadDataImp(pContext);
  if (readCode == HTTP_READ_DATA_FAILED) {
    httpReleaseContext(pContext);
    return false;
  }
  if (readCode != HTTP_READ_DATA_SUCCESS) {
    httpSendErrorResp(pContext, readCode);
    httpNotifyContextClose(pContext);
    return false;
  }

  if (!httpParseRequest(pContext)) {
    httpNotifyContextClose(pContext);
    return false;
  }

  int bodyState = httpCheckReadCompleted(pContext);

  if (bodyState == HTTP_CHECK_BODY_CONTINUE) {
    // Not finished yet: give the context back and wait for another event.
    httpReleaseContext(pContext);
    return false;
  }

  if (bodyState != HTTP_CHECK_BODY_SUCCESS) {
    httpError("context:%p, fd:%d, ip:%s, failed to read http body, close connect", pContext, pContext->fd, pContext->ipstr);
    httpNotifyContextClose(pContext);
    httpReleaseContext(pContext);
    return false;
  }

  httpDebug("context:%p, fd:%d, ip:%s, thread:%s, read size:%d, dataLen:%d",
            pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->parser.bufsize, pContext->parser.data.len);

  if (httpDecompressData(pContext)) {
    return true;
  }

  httpNotifyContextClose(pContext);
  httpReleaseContext(pContext);
  return false;
}

180 181
static void httpProcessHttpData(void *param) {
  HttpServer  *pServer = &tsHttpServer;
S
slguan 已提交
182
  HttpThread  *pThread = (HttpThread *)param;
H
hzcheng 已提交
183 184 185 186 187 188 189 190 191 192 193 194
  HttpContext *pContext;
  int          fdNum;

  sigset_t set;
  sigemptyset(&set);
  sigaddset(&set, SIGPIPE);
  pthread_sigmask(SIG_SETMASK, &set, NULL);

  while (1) {
    struct epoll_event events[HTTP_MAX_EVENTS];
    //-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1
    fdNum = epoll_wait(pThread->pollFd, events, HTTP_MAX_EVENTS, 1);
195
    if (pThread->stop) {
196
      httpDebug("%p, http thread get stop event, exiting...", pThread);
197 198
      break;
    }
H
hzcheng 已提交
199 200 201
    if (fdNum <= 0) continue;

    for (int i = 0; i < fdNum; ++i) {
202
      pContext = httpGetContext(events[i].data.ptr);
203
      if (pContext == NULL) {
S
Shengliang Guan 已提交
204 205 206
        httpError("context:%p, is already released, close connect", events[i].data.ptr);
        //epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, events[i].data.fd, NULL);
        //tclose(events[i].data.fd);
H
hzcheng 已提交
207 208 209 210
        continue;
      }

      if (events[i].events & EPOLLPRI) {
211
        httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLPRI events occured, accessed:%d, close connect",
S
slguan 已提交
212
                  pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
213
        httpCloseContextByServer(pContext);
H
hzcheng 已提交
214 215 216 217
        continue;
      }

      if (events[i].events & EPOLLRDHUP) {
218
        httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLRDHUP events occured, accessed:%d, close connect",
S
slguan 已提交
219
                  pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
220
        httpCloseContextByServer(pContext);
H
hzcheng 已提交
221 222 223 224
        continue;
      }

      if (events[i].events & EPOLLERR) {
225
        httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLERR events occured, accessed:%d, close connect",
S
slguan 已提交
226
                  pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
227
        httpCloseContextByServer(pContext);
H
hzcheng 已提交
228 229 230 231
        continue;
      }

      if (events[i].events & EPOLLHUP) {
232
        httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLHUP events occured, accessed:%d, close connect",
S
slguan 已提交
233
                  pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
234
        httpCloseContextByServer(pContext);
H
hzcheng 已提交
235 236 237
        continue;
      }

S
slguan 已提交
238
      if (!httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_READY)) {
239
        httpDebug("context:%p, fd:%d, ip:%s, state:%s, not in ready state, ignore read events",
S
slguan 已提交
240
                pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state));
241
        httpReleaseContext(pContext);
H
hzcheng 已提交
242 243 244
        continue;
      }

245
      if (pServer->status != HTTP_SERVER_RUNNING) {
246
        httpDebug("context:%p, fd:%d, ip:%s, state:%s, server is not running, accessed:%d, close connect", pContext,
247
                  pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
S
slguan 已提交
248
        httpSendErrorResp(pContext, HTTP_SERVER_OFFLINE);
249
        httpNotifyContextClose(pContext);
S
slguan 已提交
250
      } else {
251
        if (httpReadData(pContext)) {
S
slguan 已提交
252
          (*(pThread->processData))(pContext);
253
          atomic_fetch_add_32(&pServer->requestNum, 1);
S
slguan 已提交
254
        }
H
hzcheng 已提交
255 256 257 258 259
      }
    }
  }
}

260
static void *httpAcceptHttpConnection(void *arg) {
H
hzcheng 已提交
261 262 263
  int                connFd = -1;
  struct sockaddr_in clientAddr;
  int                threadId = 0;
264 265 266 267
  HttpServer *       pServer = &tsHttpServer;
  HttpThread *       pThread = NULL;
  HttpContext *      pContext = NULL;
  int                totalFds = 0;
H
hzcheng 已提交
268 269 270 271 272 273

  sigset_t set;
  sigemptyset(&set);
  sigaddset(&set, SIGPIPE);
  pthread_sigmask(SIG_SETMASK, &set, NULL);

274
  pServer->fd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort);
H
hzcheng 已提交
275

276
  if (pServer->fd < 0) {
277 278
    httpError("http server:%s, failed to open http socket, ip:%s:%u error:%s", pServer->label,
              taosIpStr(pServer->serverIp), pServer->serverPort, strerror(errno));
279
    return NULL;
H
hzcheng 已提交
280
  } else {
281
    httpInfo("http server init success at %u", pServer->serverPort);
282
    pServer->status = HTTP_SERVER_RUNNING;
H
hzcheng 已提交
283 284 285 286
  }

  while (1) {
    socklen_t addrlen = sizeof(clientAddr);
287 288 289
    connFd = (int)accept(pServer->fd, (struct sockaddr *)&clientAddr, &addrlen);
    if (connFd == -1) {
      if (errno == EINVAL) {
290
        httpDebug("http server:%s socket was shutdown, exiting...", pServer->label);
291 292
        break;
      }
293
      httpError("http server:%s, accept connect failure, errno:%d reason:%s", pServer->label, errno, strerror(errno));
H
hzcheng 已提交
294 295 296
      continue;
    }

S
slguan 已提交
297 298
    totalFds = 1;
    for (int i = 0; i < pServer->numOfThreads; ++i) {
S
Shengliang Guan 已提交
299
      totalFds += pServer->pThreads[i].numOfContexts;
S
slguan 已提交
300 301
    }

S
Shengliang Guan 已提交
302
#if 0
S
slguan 已提交
303
    if (totalFds > tsHttpCacheSessions * 100) {
304 305
      httpError("fd:%d, ip:%s:%u, totalFds:%d larger than httpCacheSessions:%d*100, refuse connection", connFd,
                inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port), totalFds, tsHttpCacheSessions);
S
slguan 已提交
306 307 308
      taosCloseSocket(connFd);
      continue;
    }
S
Shengliang Guan 已提交
309
#endif    
S
slguan 已提交
310

H
hzcheng 已提交
311 312 313 314 315 316
    taosKeepTcpAlive(connFd);
    taosSetNonblocking(connFd, 1);

    // pick up the thread to handle this connection
    pThread = pServer->pThreads + threadId;

317
    pContext = httpCreateContext(connFd);
H
hzcheng 已提交
318 319 320 321 322 323 324 325
    if (pContext == NULL) {
      httpError("fd:%d, ip:%s:%u, no enough resource to allocate http context", connFd, inet_ntoa(clientAddr.sin_addr),
                htons(clientAddr.sin_port));
      taosCloseSocket(connFd);
      continue;
    }

    pContext->pThread = pThread;
326
    sprintf(pContext->ipstr, "%s:%u", inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port));
327
    
H
hzcheng 已提交
328 329
    struct epoll_event event;
    event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
330
    event.data.ptr = pContext;
H
hzcheng 已提交
331
    if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) {
332 333 334 335
      httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd,
                pContext->ipstr, pThread->label, strerror(errno));
      tclose(pContext->fd);
      httpReleaseContext(pContext);
H
hzcheng 已提交
336 337 338 339
      continue;
    }

    // notify the data process, add into the FdObj list
S
Shengliang Guan 已提交
340 341 342
    atomic_add_fetch_32(&pThread->numOfContexts, 1);
    httpDebug("context:%p, fd:%d, ip:%s, thread:%s numOfContexts:%d totalFds:%d, accept a new connection", pContext,
              connFd, pContext->ipstr, pThread->label, pThread->numOfContexts, totalFds);
H
hzcheng 已提交
343 344 345 346 347

    // pick up next thread for next connection
    threadId++;
    threadId = threadId % pServer->numOfThreads;
  }
348 349 350

  close(pServer->fd);
  return NULL;
H
hzcheng 已提交
351 352
}

353 354 355
bool httpInitConnect() {
  HttpServer *pServer = &tsHttpServer;
  pServer->pThreads = calloc(pServer->numOfThreads, sizeof(HttpThread));
H
hzcheng 已提交
356 357 358 359 360
  if (pServer->pThreads == NULL) {
    httpError("init error no enough memory");
    return false;
  }

361 362
  HttpThread *pThread = pServer->pThreads;
  for (int i = 0; i < pServer->numOfThreads; ++i) {
H
hzcheng 已提交
363 364 365 366 367 368 369 370 371 372 373 374
    sprintf(pThread->label, "%s%d", pServer->label, i);
    pThread->processData = pServer->processData;
    pThread->threadId = i;

    if (pthread_mutex_init(&(pThread->threadMutex), NULL) < 0) {
      httpError("http thread:%s, failed to init HTTP process data mutex, reason:%s", pThread->label, strerror(errno));
      return false;
    }

    pThread->pollFd = epoll_create(HTTP_MAX_EVENTS);  // size does not matter
    if (pThread->pollFd < 0) {
      httpError("http thread:%s, failed to create HTTP epoll", pThread->label);
S
Shengliang Guan 已提交
375
      pthread_mutex_destroy(&(pThread->threadMutex));
H
hzcheng 已提交
376 377 378
      return false;
    }

S
slguan 已提交
379 380 381
    pthread_attr_t thattr;
    pthread_attr_init(&thattr);
    pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
H
hzcheng 已提交
382
    if (pthread_create(&(pThread->thread), &thattr, (void *)httpProcessHttpData, (void *)(pThread)) != 0) {
383 384
      httpError("http thread:%s, failed to create HTTP process data thread, reason:%s", pThread->label,
                strerror(errno));
S
Shengliang Guan 已提交
385
      pthread_mutex_destroy(&(pThread->threadMutex));        
H
hzcheng 已提交
386 387
      return false;
    }
S
slguan 已提交
388
    pthread_attr_destroy(&thattr);
H
hzcheng 已提交
389

390
    httpDebug("http thread:%p:%s, initialized", pThread, pThread->label);
H
hzcheng 已提交
391 392 393
    pThread++;
  }

S
slguan 已提交
394 395 396
  pthread_attr_t thattr;
  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
H
hzcheng 已提交
397 398
  if (pthread_create(&(pServer->thread), &thattr, (void *)httpAcceptHttpConnection, (void *)(pServer)) != 0) {
    httpError("http server:%s, failed to create Http accept thread, reason:%s", pServer->label, strerror(errno));
S
Shengliang Guan 已提交
399
    httpCleanUpConnect();
H
hzcheng 已提交
400 401 402 403
    return false;
  }
  pthread_attr_destroy(&thattr);

404
  httpDebug("http server:%s, initialized, ip:%s:%u, numOfThreads:%d", pServer->label, taosIpStr(pServer->serverIp),
H
hzcheng 已提交
405 406 407
            pServer->serverPort, pServer->numOfThreads);
  return true;
}