提交 5fa8bc80 编写于 作者: S Shengliang Guan

Merge from freemine:with-http-parser

#ifndef _ehttp_gzip_h_9196791b_ac2a_4d73_9979_f4b41abbc4c0_
#define _ehttp_gzip_h_9196791b_ac2a_4d73_9979_f4b41abbc4c0_
#include <stddef.h>
#define EHTTP_GZIP_CHUNK_SIZE_DEFAULT (1024*16)
typedef struct ehttp_gzip_s ehttp_gzip_t;
typedef struct ehttp_gzip_callbacks_s ehttp_gzip_callbacks_t;
typedef struct ehttp_gzip_conf_s ehttp_gzip_conf_t;
struct ehttp_gzip_callbacks_s {
void (*on_data)(ehttp_gzip_t *gzip, void *arg, const char *buf, size_t len);
};
struct ehttp_gzip_conf_s {
int get_header:2; // 0: not fetching header info
size_t chunk_size; // 0: fallback to default: EHTTP_GZIP_CHUNK_SIZE_DEFAULT
};
ehttp_gzip_t* ehttp_gzip_create_decompressor(ehttp_gzip_conf_t conf, ehttp_gzip_callbacks_t callbacks, void *arg);
ehttp_gzip_t* ehttp_gzip_create_compressor(ehttp_gzip_conf_t conf, ehttp_gzip_callbacks_t callbacks, void *arg);
void ehttp_gzip_destroy(ehttp_gzip_t *gzip);
int ehttp_gzip_write(ehttp_gzip_t *gzip, const char *buf, size_t len);
int ehttp_gzip_finish(ehttp_gzip_t *gzip);
#endif // _ehttp_gzip_h_9196791b_ac2a_4d73_9979_f4b41abbc4c0_
#ifndef _ehttp_parser_fc7f9ac9_52da_4ee3_b556_deb2e1c3866e
#define _ehttp_parser_fc7f9ac9_52da_4ee3_b556_deb2e1c3866e
#include <stddef.h>
typedef struct ehttp_parser_s ehttp_parser_t;
typedef struct ehttp_parser_callbacks_s ehttp_parser_callbacks_t;
typedef struct ehttp_parser_conf_s ehttp_parser_conf_t;
typedef struct ehttp_status_code_s ehttp_status_code_t;
struct ehttp_parser_callbacks_s {
void (*on_request_line)(void *arg, const char *method, const char *target, const char *version, const char *target_raw);
void (*on_status_line)(void *arg, const char *version, int status_code, const char *reason_phrase);
void (*on_header_field)(void *arg, const char *key, const char *val);
void (*on_body)(void *arg, const char *chunk, size_t len);
void (*on_end)(void *arg);
void (*on_error)(void *arg, int status_code);
};
struct ehttp_parser_conf_s {
size_t flush_block_size; // <=0: immediately
};
ehttp_parser_t* ehttp_parser_create(ehttp_parser_callbacks_t callbacks, ehttp_parser_conf_t conf, void *arg);
void ehttp_parser_destroy(ehttp_parser_t *parser);
int ehttp_parser_parse(ehttp_parser_t *parser, const char *buf, size_t len);
int ehttp_parser_parse_string(ehttp_parser_t *parser, const char *str);
int ehttp_parser_parse_char(ehttp_parser_t *parser, const char c);
int ehttp_parser_parse_end(ehttp_parser_t *parser);
char* ehttp_parser_urldecode(const char *enc);
const char* ehttp_status_code_get_desc(const int status_code);
#endif // _ehttp_parser_fc7f9ac9_52da_4ee3_b556_deb2e1c3866e
#ifndef _ehttp_util_string_h_99dacde5_2e7d_4662_97d6_04611fde683b_
#define _ehttp_util_string_h_99dacde5_2e7d_4662_97d6_04611fde683b_
#include <stddef.h>
typedef struct ehttp_util_string_s ehttp_util_string_t;
struct ehttp_util_string_s {
char *str;
size_t len;
};
void ehttp_util_string_cleanup(ehttp_util_string_t *str);
int ehttp_util_string_append(ehttp_util_string_t *str, const char *s, size_t len);
void ehttp_util_string_clear(ehttp_util_string_t *str);
#endif // _ehttp_util_string_h_99dacde5_2e7d_4662_97d6_04611fde683b_
......@@ -31,4 +31,7 @@ void httpCloseContextByApp(HttpContext *pContext);
void httpNotifyContextClose(HttpContext *pContext);
bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState);
void ehttpIncContextRef(HttpContext *pContext);
void ehttpDecContextRef(HttpContext **ppContext);
#endif
......@@ -28,6 +28,8 @@
#include "httpLog.h"
#include "httpJson.h"
#include "ehttp_parser.h"
#define HTTP_MAX_CMD_SIZE 1024
#define HTTP_MAX_BUFFER_SIZE 1024*1024
......@@ -162,6 +164,11 @@ typedef struct {
int32_t len;
} HttpBuf;
typedef enum {
EHTTP_CONTEXT_PROCESS_FAILED = 0x01,
EHTTP_CONTEXT_PARSER_FAILED = 0x02
} EHTTP_CONTEXT_FAILED_CAUSE;
typedef struct {
char buffer[HTTP_BUFFER_SIZE];
int bufsize;
......@@ -172,6 +179,10 @@ typedef struct {
HttpBuf data; // body content
HttpBuf token; // auth token
HttpDecodeMethod *pMethod;
ehttp_parser_t *parser;
int inited:2;
int failed:4;
} HttpParser;
typedef struct HttpContext {
......@@ -201,6 +212,8 @@ typedef struct HttpContext {
void * timer;
HttpEncodeMethod * encodeMethod;
struct HttpThread *pThread;
int closed:2;
} HttpContext;
typedef struct HttpThread {
......@@ -231,6 +244,8 @@ typedef struct HttpServer {
pthread_mutex_t serverMutex;
HttpDecodeMethod *methodScanner[HTTP_METHOD_SCANNER_SIZE];
bool (*processData)(HttpContext *pContext);
int fallback:2;
} HttpServer;
extern const char *httpKeepAliveStr[];
......
#include "ehttp_gzip.h"
#include "os.h"
#include "zlib.h"
#include <stdlib.h>
typedef enum {
EHTTP_GZIP_INITING,
EHTTP_GZIP_READY,
EHTTP_GZIP_CLOSED,
} EHTTP_GZIP_STATE;
struct ehttp_gzip_s {
ehttp_gzip_conf_t conf;
ehttp_gzip_callbacks_t callbacks;
void *arg;
z_stream *gzip;
gz_header *header;
char *chunk;
int state;
};
static void dummy_on_data(ehttp_gzip_t *gzip, void *arg, const char *buf, size_t len) {
}
static void ehttp_gzip_cleanup(ehttp_gzip_t *gzip) {
switch(gzip->state) {
case EHTTP_GZIP_READY: {
inflateEnd(gzip->gzip);
} break;
default: break;
}
if (gzip->gzip) {
free(gzip->gzip);
gzip->gzip = NULL;
}
if (gzip->header) {
free(gzip->header);
gzip->header = NULL;
}
if (gzip->chunk) {
free(gzip->chunk);
gzip->chunk = NULL;
}
gzip->state = EHTTP_GZIP_CLOSED;
}
ehttp_gzip_t* ehttp_gzip_create_decompressor(ehttp_gzip_conf_t conf, ehttp_gzip_callbacks_t callbacks, void *arg) {
ehttp_gzip_t *gzip = (ehttp_gzip_t*)calloc(1, sizeof(*gzip));
if (!gzip) return NULL;
do {
gzip->conf = conf;
gzip->callbacks = callbacks;
gzip->arg = arg;
if (gzip->callbacks.on_data == NULL) gzip->callbacks.on_data = dummy_on_data;
gzip->gzip = (z_stream*)calloc(1, sizeof(*gzip->gzip));
if (gzip->conf.get_header) {
gzip->header = (gz_header*)calloc(1, sizeof(*gzip->header));
}
if (gzip->conf.chunk_size<=0) gzip->conf.chunk_size = EHTTP_GZIP_CHUNK_SIZE_DEFAULT;
gzip->chunk = (char*)malloc(gzip->conf.chunk_size);
if (!gzip->gzip || (gzip->conf.get_header && !gzip->header) || !gzip->chunk) break;
gzip->gzip->zalloc = Z_NULL;
gzip->gzip->zfree = Z_NULL;
gzip->gzip->opaque = Z_NULL;
// 863 windowBits can also be greater than 15 for optional gzip decoding. Add
// 864 32 to windowBits to enable zlib and gzip decoding with automatic header
// 865 detection, or add 16 to decode only the gzip format (the zlib format will
// 866 return a Z_DATA_ERROR). If a gzip stream is being decoded, strm->adler is a
// 867 CRC-32 instead of an Adler-32. Unlike the gunzip utility and gzread() (see
// 868 below), inflate() will not automatically decode concatenated gzip streams.
// 869 inflate() will return Z_STREAM_END at the end of the gzip stream. The state
// 870 would need to be reset to continue decoding a subsequent gzip stream.
int ret = inflateInit2(gzip->gzip, 32); // 32/16? 32/16 + MAX_WBITS
if (ret != Z_OK) break;
if (gzip->header) {
ret = inflateGetHeader(gzip->gzip, gzip->header);
}
if (ret != Z_OK) break;
gzip->gzip->next_out = (z_const Bytef*)gzip->chunk;
gzip->gzip->avail_out = gzip->conf.chunk_size;
gzip->state = EHTTP_GZIP_READY;
return gzip;
} while (0);
ehttp_gzip_destroy(gzip);
return NULL;
}
ehttp_gzip_t* ehttp_gzip_create_compressor(ehttp_gzip_conf_t conf, ehttp_gzip_callbacks_t callbacks, void *arg);
void ehttp_gzip_destroy(ehttp_gzip_t *gzip) {
ehttp_gzip_cleanup(gzip);
free(gzip);
}
int ehttp_gzip_write(ehttp_gzip_t *gzip, const char *buf, size_t len) {
if (gzip->state != EHTTP_GZIP_READY) return -1;
if (len <= 0) return 0;
gzip->gzip->next_in = (z_const Bytef*)buf;
gzip->gzip->avail_in = len;
while (gzip->gzip->avail_in) {
int ret;
if (gzip->header) {
ret = inflate(gzip->gzip, Z_BLOCK);
} else {
ret = inflate(gzip->gzip, Z_SYNC_FLUSH);
}
if (ret != Z_OK && ret != Z_STREAM_END) return -1;
if (gzip->gzip->avail_out>0) {
if (ret!=Z_STREAM_END) continue;
}
size_t len = gzip->gzip->next_out - (z_const Bytef*)gzip->chunk;
gzip->gzip->next_out[0] = '\0';
gzip->callbacks.on_data(gzip, gzip->arg, gzip->chunk, len);
gzip->gzip->next_out = (z_const Bytef*)gzip->chunk;
gzip->gzip->avail_out = gzip->conf.chunk_size;
}
return 0;
}
int ehttp_gzip_finish(ehttp_gzip_t *gzip) {
if (gzip->state != EHTTP_GZIP_READY) return -1;
gzip->gzip->next_in = NULL;
gzip->gzip->avail_in = 0;
int ret;
ret = inflate(gzip->gzip, Z_FINISH);
if (ret != Z_STREAM_END) return -1;
size_t len = gzip->gzip->next_out - (z_const Bytef*)gzip->chunk;
gzip->gzip->next_out[0] = '\0';
gzip->callbacks.on_data(gzip, gzip->arg, gzip->chunk, len);
gzip->gzip->next_out = NULL;
gzip->gzip->avail_out = 0;
return 0;
}
此差异已折叠。
#include "ehttp_util_string.h"
#include <stdlib.h>
#include <string.h>
void ehttp_util_string_cleanup(ehttp_util_string_t *str) {
free(str->str);
str->str = NULL;
str->len = 0;
}
int ehttp_util_string_append(ehttp_util_string_t *str, const char *s, size_t len) {
// int n = str->str?strlen(str->str):0;
int n = str->len;
char *p = (char*)realloc(str->str, n + len + 1);
if (!p) return -1;
strncpy(p+n, s, len);
p[n+len] = '\0';
str->str = p;
str->len = n+len;
return 0;
}
void ehttp_util_string_clear(ehttp_util_string_t *str) {
if (str->str) {
str->str[0] = '\0';
str->len = 0;
}
}
......@@ -27,17 +27,42 @@
#include "httpSql.h"
#include "httpSession.h"
#include "httpContext.h"
#include "elog.h"
// dirty tweak
extern bool httpGetHttpMethod(HttpContext* pContext);
extern bool httpParseURL(HttpContext* pContext);
extern bool httpParseHttpVersion(HttpContext* pContext);
extern bool httpGetDecodeMethod(HttpContext* pContext);
extern bool httpParseHead(HttpContext* pContext);
static void on_request_line(void *arg, const char *method, const char *target, const char *version, const char *target_raw);
static void on_status_line(void *arg, const char *version, int status_code, const char *reason_phrase);
static void on_header_field(void *arg, const char *key, const char *val);
static void on_body(void *arg, const char *chunk, size_t len);
static void on_end(void *arg);
static void on_error(void *arg, int status_code);
static void httpDestroyContext(void *data);
static void httpMightDestroyContext(void *data);
static void ehttpReleaseContext(HttpContext *pContext);
static void httpRemoveContextFromEpoll(HttpContext *pContext) {
HttpThread *pThread = pContext->pThread;
if (pContext->fd >= 0) {
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL);
int32_t fd = atomic_val_compare_exchange_32(&pContext->fd, pContext->fd, -1);
taosCloseSocket(fd);
if (!tsHttpServer.fallback) {
ehttpDecContextRef(&pContext);
}
}
}
static void httpDestroyContext(void *data) {
HttpContext *pContext = *(HttpContext **)data;
D("==context[%p] destroyed==", pContext);
if (pContext->fd > 0) taosClose(pContext->fd);
HttpThread *pThread = pContext->pThread;
......@@ -54,11 +79,18 @@ static void httpDestroyContext(void *data) {
httpFreeJsonBuf(pContext);
httpFreeMultiCmds(pContext);
if (!tsHttpServer.fallback) {
if (pContext->parser.parser) {
ehttp_parser_destroy(pContext->parser.parser);
pContext->parser.parser = NULL;
}
}
taosTFree(pContext);
}
bool httpInitContexts() {
tsHttpServer.contextCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 2, true, httpDestroyContext, "restc");
tsHttpServer.contextCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 2, true, httpMightDestroyContext, "restc");
if (tsHttpServer.contextCache == NULL) {
httpError("failed to init context cache");
return false;
......@@ -103,16 +135,20 @@ HttpContext *httpCreateContext(int32_t fd) {
HttpContext *pContext = calloc(1, sizeof(HttpContext));
if (pContext == NULL) return NULL;
D("==context[%p] created==", pContext);
pContext->fd = fd;
pContext->httpVersion = HTTP_VERSION_10;
pContext->lastAccessTime = taosGetTimestampSec();
pContext->state = HTTP_CONTEXT_STATE_READY;
ehttpIncContextRef(pContext);
uint64_t handleVal = (uint64_t)pContext;
HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, &handleVal, sizeof(int64_t), &pContext, sizeof(int64_t), 3000);
pContext->ppContext = ppContext;
httpDebug("context:%p, fd:%d, is created, data:%p", pContext, fd, ppContext);
ehttpIncContextRef(pContext);
// set the ref to 0
taosCacheRelease(tsHttpServer.contextCache, (void**)&ppContext, false);
......@@ -122,10 +158,13 @@ HttpContext *httpCreateContext(int32_t fd) {
HttpContext *httpGetContext(void *ptr) {
uint64_t handleVal = (uint64_t)ptr;
HttpContext **ppContext = taosCacheAcquireByKey(tsHttpServer.contextCache, &handleVal, sizeof(HttpContext *));
EQ_ASSERT(ppContext);
EQ_ASSERT(*ppContext);
if (ppContext) {
HttpContext *pContext = *ppContext;
if (pContext) {
if (!tsHttpServer.fallback) return pContext;
int32_t refCount = atomic_add_fetch_32(&pContext->refCount, 1);
httpDebug("context:%p, fd:%d, is accquired, data:%p refCount:%d", pContext, pContext->fd, ppContext, refCount);
return pContext;
......@@ -135,6 +174,10 @@ HttpContext *httpGetContext(void *ptr) {
}
void httpReleaseContext(HttpContext *pContext) {
if (!tsHttpServer.fallback) {
ehttpReleaseContext(pContext);
return;
}
int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1);
if (refCount < 0) {
httpError("context:%p, is already released, refCount:%d", pContext, refCount);
......@@ -169,12 +212,31 @@ bool httpInitContext(HttpContext *pContext) {
memset(pParser, 0, sizeof(HttpParser));
pParser->pCur = pParser->pLast = pParser->buffer;
if (!tsHttpServer.fallback) {
ehttp_parser_callbacks_t callbacks = {
on_request_line,
on_status_line,
on_header_field,
on_body,
on_end,
on_error
};
ehttp_parser_conf_t conf = {
.flush_block_size = 0
};
pParser->parser = ehttp_parser_create(callbacks, conf, pContext);
pParser->inited = 1;
}
httpDebug("context:%p, fd:%d, ip:%s, accessTimes:%d, parsed:%d", pContext, pContext->fd, pContext->ipstr,
pContext->accessTimes, pContext->parsed);
return true;
}
void httpCloseContextByApp(HttpContext *pContext) {
if (!tsHttpServer.fallback) {
if (pContext->parsed == false) return;
}
pContext->parsed = false;
bool keepAlive = true;
......@@ -211,10 +273,14 @@ void httpCloseContextByApp(HttpContext *pContext) {
pContext->ipstr, httpContextStateStr(pContext->state), pContext->state);
}
httpReleaseContext(pContext);
if (tsHttpServer.fallback) httpReleaseContext(pContext);
}
void httpCloseContextByServer(HttpContext *pContext) {
if (!tsHttpServer.fallback) {
if (pContext->closed) return;
pContext->closed = 1;
}
if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_DROPPING)) {
httpDebug("context:%p, fd:%d, ip:%s, epoll finished, still used by app", pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_DROPPING)) {
......@@ -229,5 +295,176 @@ void httpCloseContextByServer(HttpContext *pContext) {
pContext->parsed = false;
httpRemoveContextFromEpoll(pContext);
httpReleaseContext(pContext);
if (tsHttpServer.fallback) httpReleaseContext(pContext);
}
static void on_request_line(void *arg, const char *method, const char *target, const char *version, const char *target_raw) {
HttpContext *pContext = (HttpContext*)arg;
HttpParser *pParser = &pContext->parser;
int avail = sizeof(pParser->buffer) - (pParser->pLast - pParser->buffer);
int n = snprintf(pParser->pLast, avail,
"%s %s %s\r\n", method, target_raw, version);
char *last = pParser->pLast;
do {
if (n>=avail) {
httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), exceeding buffer size",
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw);
break;
}
pParser->bufsize += n;
if (!httpGetHttpMethod(pContext)) {
httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), parse http method failed",
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw);
break;
}
if (!httpParseURL(pContext)) {
httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), parse http url failed",
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw);
break;
}
if (!httpParseHttpVersion(pContext)) {
httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), parse http version failed",
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw);
break;
}
if (!httpGetDecodeMethod(pContext)) {
httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), get decode method failed",
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw);
break;
}
last += n;
pParser->pLast = last;
return;
} while (0);
pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED;
}
static void on_status_line(void *arg, const char *version, int status_code, const char *reason_phrase) {
HttpContext *pContext = (HttpContext*)arg;
HttpParser *pParser = &pContext->parser;
pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED;
}
static void on_header_field(void *arg, const char *key, const char *val) {
HttpContext *pContext = (HttpContext*)arg;
HttpParser *pParser = &pContext->parser;
if (pParser->failed) return;
D("==key:[%s], val:[%s]==", key, val);
int avail = sizeof(pParser->buffer) - (pParser->pLast - pParser->buffer);
int n = snprintf(pParser->pLast, avail,
"%s: %s\r\n", key, val);
char *last = pParser->pLast;
do {
if (n>=avail) {
httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_header_field(%s,%s), exceeding buffer size",
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, key, val);
break;
}
pParser->bufsize += n;
pParser->pCur = pParser->pLast + n;
if (!httpParseHead(pContext)) {
httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_header_field(%s,%s), parse head failed",
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, key, val);
break;
}
last += n;
pParser->pLast = last;
return;
} while (0);
pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED;
}
static void on_body(void *arg, const char *chunk, size_t len) {
HttpContext *pContext = (HttpContext*)arg;
HttpParser *pParser = &pContext->parser;
if (pParser->failed) return;
if (pParser->data.pos == 0) {
pParser->data.pos = pParser->pLast;
pParser->data.len = 0;
}
int avail = sizeof(pParser->buffer) - (pParser->pLast - pParser->buffer);
if (len+1>=avail) {
pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED;
return;
}
memcpy(pParser->pLast, chunk, len);
pParser->pLast += len;
pParser->data.len += len;
}
static void on_end(void *arg) {
HttpContext *pContext = (HttpContext*)arg;
HttpParser *pParser = &pContext->parser;
if (pParser->failed) return;
if (pParser->data.pos == 0) pParser->data.pos = pParser->pLast;
if (!pContext->parsed) {
pContext->parsed = true;
}
}
static void on_error(void *arg, int status_code) {
HttpContext *pContext = (HttpContext*)arg;
HttpParser *pParser = &pContext->parser;
pParser->failed |= EHTTP_CONTEXT_PARSER_FAILED;
}
static void httpMightDestroyContext(void *data) {
HttpContext *pContext = *(HttpContext **)data;
if (!tsHttpServer.fallback) {
httpRemoveContextFromEpoll(pContext);
ehttpDecContextRef(&pContext);
return;
}
httpDestroyContext(data);
}
static void ehttpReleaseContext(HttpContext *pContext) {
HttpContext **ppContext = pContext->ppContext;
if (tsHttpServer.contextCache != NULL) {
taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false);
} else {
httpDebug("context:%p, won't be destroyed for cache is already released", pContext);
// httpDestroyContext((void **)(&ppContext));
}
}
void ehttpIncContextRef(HttpContext *pContext) {
if (tsHttpServer.fallback) return;
atomic_add_fetch_32(&pContext->refCount, 1);
}
void ehttpDecContextRef(HttpContext **ppContext) {
if (tsHttpServer.fallback) return;
HttpContext *pContext = *ppContext;
int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1);
if (refCount>0) return;
EQ_ASSERT(refCount==0);
httpDestroyContext(ppContext);
}
......@@ -25,10 +25,14 @@
#include "httpResp.h"
#include "httpUtil.h"
#include "elog.h"
#ifndef EPOLLWAKEUP
#define EPOLLWAKEUP (1u << 29)
#endif
static bool ehttpReadData(HttpContext *pContext);
static void httpStopThread(HttpThread* pThread) {
pThread->stop = true;
......@@ -134,6 +138,8 @@ static bool httpDecompressData(HttpContext *pContext) {
}
static bool httpReadData(HttpContext *pContext) {
if (!tsHttpServer.fallback) return ehttpReadData(pContext);
if (!pContext->parsed) {
httpInitContext(pContext);
}
......@@ -188,6 +194,8 @@ static void httpProcessHttpData(void *param) {
sigaddset(&set, SIGPIPE);
pthread_sigmask(SIG_SETMASK, &set, NULL);
elog_set_thread_name("httpProcessHttpData");
while (1) {
struct epoll_event events[HTTP_MAX_EVENTS];
//-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1
......@@ -207,10 +215,14 @@ static void httpProcessHttpData(void *param) {
continue;
}
ehttpIncContextRef(pContext);
if (events[i].events & EPOLLPRI) {
httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLPRI events occured, accessed:%d, close connect",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
httpCloseContextByServer(pContext);
if (!tsHttpServer.fallback) httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
continue;
}
......@@ -218,6 +230,8 @@ static void httpProcessHttpData(void *param) {
httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLRDHUP events occured, accessed:%d, close connect",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
httpCloseContextByServer(pContext);
httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
continue;
}
......@@ -225,6 +239,8 @@ static void httpProcessHttpData(void *param) {
httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLERR events occured, accessed:%d, close connect",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
httpCloseContextByServer(pContext);
if (!tsHttpServer.fallback) httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
continue;
}
......@@ -232,6 +248,8 @@ static void httpProcessHttpData(void *param) {
httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLHUP events occured, accessed:%d, close connect",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
httpCloseContextByServer(pContext);
if (!tsHttpServer.fallback) httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
continue;
}
......@@ -239,6 +257,7 @@ static void httpProcessHttpData(void *param) {
httpDebug("context:%p, fd:%d, ip:%s, state:%s, not in ready state, ignore read events",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state));
httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
continue;
}
......@@ -247,11 +266,15 @@ static void httpProcessHttpData(void *param) {
pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes);
httpSendErrorResp(pContext, HTTP_SERVER_OFFLINE);
httpNotifyContextClose(pContext);
if (!tsHttpServer.fallback) httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
} else {
if (httpReadData(pContext)) {
(*(pThread->processData))(pContext);
atomic_fetch_add_32(&pServer->requestNum, 1);
}
if (!tsHttpServer.fallback) httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
}
}
}
......@@ -332,7 +355,8 @@ static void *httpAcceptHttpConnection(void *arg) {
httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd,
pContext->ipstr, pThread->label, strerror(errno));
taosClose(pContext->fd);
httpReleaseContext(pContext);
if (tsHttpServer.fallback) httpReleaseContext(pContext);
ehttpDecContextRef(&pContext);
continue;
}
......@@ -405,3 +429,80 @@ bool httpInitConnect() {
pServer->serverPort, pServer->numOfThreads);
return true;
}
static bool ehttpReadData(HttpContext *pContext) {
HttpParser *pParser = &pContext->parser;
EQ_ASSERT(!pContext->parsed);
if (!pParser->parser) {
if (!pParser->inited) {
httpInitContext(pContext);
}
if (!pParser->parser) {
return false;
}
}
pContext->accessTimes++;
pContext->lastAccessTime = taosGetTimestampSec();
char buf[HTTP_STEP_SIZE+1] = {0};
int nread = (int)taosReadSocket(pContext->fd, buf, sizeof(buf));
if (nread > 0) {
buf[nread] = '\0';
if (strstr(buf, "GET ")==buf && !strchr(buf, '\r') && !strchr(buf, '\n')) {
D("==half of request line received:\n%s\n==", buf);
}
if (ehttp_parser_parse(pParser->parser, buf, nread)) {
D("==parsing failed==");
httpCloseContextByServer(pContext);
return false;
}
if (pContext->parser.failed) {
D("==parsing failed: [0x%x]==", pContext->parser.failed);
httpNotifyContextClose(pContext);
return false;
}
if (pContext->parsed) {
// int ret = httpCheckReadCompleted(pContext);
// already done in ehttp_parser
int ret = HTTP_CHECK_BODY_SUCCESS;
if (ret == HTTP_CHECK_BODY_CONTINUE) {
//httpDebug("context:%p, fd:%d, ip:%s, not finished yet, wait another event", pContext, pContext->fd, pContext->ipstr);
return false;
} else if (ret == HTTP_CHECK_BODY_SUCCESS){
httpDebug("context:%p, fd:%d, ip:%s, thread:%s, read size:%d, dataLen:%d",
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->parser.bufsize, pContext->parser.data.len);
if (httpDecompressData(pContext)) {
return true;
} else {
httpNotifyContextClose(pContext);
return false;
}
} else {
httpError("context:%p, fd:%d, ip:%s, failed to read http body, close connect", pContext, pContext->fd, pContext->ipstr);
httpNotifyContextClose(pContext);
return false;
}
}
return pContext->parsed;
} else if (nread < 0) {
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
httpDebug("context:%p, fd:%d, ip:%s, read from socket error:%d, wait another event",
pContext, pContext->fd, pContext->ipstr, errno);
return false; // later again
} else {
httpError("context:%p, fd:%d, ip:%s, read from socket error:%d, close connect",
pContext, pContext->fd, pContext->ipstr, errno);
return false;
}
} else {
// eof
return false;
}
}
......@@ -40,6 +40,12 @@ HttpServer tsHttpServer;
void taosInitNote(int numOfNoteLines, int maxNotes, char* lable);
int httpInitSystem() {
tsHttpServer.fallback = 0;
const char *v = getenv("FALLBACK");
if (v) {
tsHttpServer.fallback = 1;
}
strcpy(tsHttpServer.label, "rest");
tsHttpServer.serverIp = 0;
tsHttpServer.serverPort = tsHttpPort;
......
#ifndef _elog_h_8897be44_dda8_45b6_9d37_8d8691cb05fb_
#define _elog_h_8897be44_dda8_45b6_9d37_8d8691cb05fb_
#include <stdio.h>
typedef enum {
ELOG_DEBUG,
ELOG_INFO,
ELOG_WARN,
ELOG_ERROR,
ELOG_CRITICAL,
ELOG_VERBOSE,
ELOG_ABORT,
} ELOG_LEVEL;
void elog_set_level(ELOG_LEVEL base); // only log those not less than base
void elog_set_thread_name(const char *name);
void elog(ELOG_LEVEL level, int fd, const char *file, int line, const char *func, const char *fmt, ...)
#ifdef __GNUC__
__attribute__((format(printf, 6, 7)))
#endif
;
#define DLOG(fd, fmt, ...) elog(ELOG_DEBUG, fd, __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__)
#define ILOG(fd, fmt, ...) elog(ELOG_INFO, fd, __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__)
#define WLOG(fd, fmt, ...) elog(ELOG_WARN, fd, __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__)
#define ELOG(fd, fmt, ...) elog(ELOG_ERROR, fd, __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__)
#define CLOG(fd, fmt, ...) elog(ELOG_CRITICAL, fd, __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__)
#define VLOG(fd, fmt, ...) elog(ELOG_VERBOSE, fd, __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__)
#define ALOG(fd, fmt, ...) elog(ELOG_ABORT, fd, __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__)
#define D(fmt, ...) elog(ELOG_DEBUG, fileno(stdout), __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__)
#define I(fmt, ...) elog(ELOG_INFO, fileno(stdout), __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__)
#define W(fmt, ...) elog(ELOG_WARN, fileno(stdout), __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__)
#define E(fmt, ...) elog(ELOG_ERROR, fileno(stdout), __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__)
#define C(fmt, ...) elog(ELOG_CRITICAL, fileno(stdout), __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__)
#define V(fmt, ...) elog(ELOG_VERBOSE, fileno(stdout), __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__)
#define A(fmt, ...) elog(ELOG_ABORT, fileno(stdout), __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__)
// NOTE: https://en.wikipedia.org/wiki/Fail-fast
// for the sake of simplicity, both implementation and usage,
// we'll follow `fail-fast` or `let-it-crash` philosophy.
// assertion in both debug/release build
#define EQ_ABORT(fmt, ...) A("Assertion failure: "fmt, ##__VA_ARGS__)
#define EQ_ASSERT(statement) do { \
if (statement) break; \
A("Assertion failure: %s", #statement); \
} while (0)
#define EQ_ASSERT_EXT(statement, fmt, ...) do { \
if (statement) break; \
A("Assertion failure: %s: "fmt, #statement, ##__VA_ARGS__); \
} while (0)
#define EQ_ASSERT_API0(statement) do { \
if (statement) break; \
A("Assertion failure: %s failed: [%d]%s", #statement, errno, strerror(errno)); \
} while (0)
#define EQ_ASSERT_API(api) do { \
A("Assertion failure: %s failed: [%d]%s", #api, errno, strerror(errno)); \
} while (0)
#endif // _elog_h_8897be44_dda8_45b6_9d37_8d8691cb05fb_
#include "elog.h"
#include <libgen.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sys/syscall.h>
#include <time.h>
#include <unistd.h>
#define gettid() syscall(__NR_gettid)
static ELOG_LEVEL elog_level_base = ELOG_DEBUG;
static __thread long elog_thread_id;
static __thread char elog_thread_name[24] = {0};
void elog_set_level(ELOG_LEVEL base) {
elog_level_base = base;
}
void elog_set_thread_name(const char *name) {
elog_thread_id = gettid();
snprintf(elog_thread_name, sizeof(elog_thread_name), "%s", name);
}
void elog(ELOG_LEVEL level, int fd, const char *file, int line, const char *func, const char *fmt, ...)
{
if (level < elog_level_base) return;
if (fd == -1) return;
if (elog_thread_name[0]=='\0') {
elog_set_thread_name("unknown");
}
char *p;
int n;
size_t bytes;
char buf[4096];
snprintf(buf, sizeof(buf), "%s", file);
char fn[1024];
snprintf(fn, sizeof(fn), "%s", basename(buf));
char C;
switch (level) {
case ELOG_DEBUG: C = 'D'; break;
case ELOG_INFO: C = 'I'; break;
case ELOG_WARN: C = 'W'; break;
case ELOG_ERROR: C = 'E'; break;
case ELOG_CRITICAL: C = 'C'; break;
case ELOG_VERBOSE: C = 'V'; break;
case ELOG_ABORT: C = 'A'; break;
default: return;
}
struct tm t;
struct timeval tv;
if (gettimeofday(&tv, NULL)) return;
if (!localtime_r(&tv.tv_sec, &t)) return;
p = buf;
bytes = sizeof(buf);
n = snprintf(p, bytes, "%c[%02d/%02d %02d:%02d:%02d.%06ld][%06ld]: ==",
C,
t.tm_mon + 1, t.tm_mday,
t.tm_hour, t.tm_min, t.tm_sec,
tv.tv_usec,
elog_thread_id);
p += n; bytes -= n;
va_list arg;
va_start(arg, fmt);
if (bytes>0) {
n = vsnprintf(p, bytes, fmt, arg);
p += n; bytes -= n;
}
va_end(arg);
if (bytes>0) {
n = snprintf(p, bytes, "== t:%s#%s[%d]#%s()",
elog_thread_name, fn, line, func);
}
dprintf(fd, "%s\n", buf);
if (level == ELOG_ABORT) {
abort();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册