diff --git a/src/plugins/http/inc/ehttp_gzip.h b/src/plugins/http/inc/ehttp_gzip.h new file mode 100644 index 0000000000000000000000000000000000000000..b2d6ace9b00d102128b41749b3070d5c45fb3a39 --- /dev/null +++ b/src/plugins/http/inc/ehttp_gzip.h @@ -0,0 +1,30 @@ +#ifndef _ehttp_gzip_h_9196791b_ac2a_4d73_9979_f4b41abbc4c0_ +#define _ehttp_gzip_h_9196791b_ac2a_4d73_9979_f4b41abbc4c0_ + +#include + +#define EHTTP_GZIP_CHUNK_SIZE_DEFAULT (1024*16) + +typedef struct ehttp_gzip_s ehttp_gzip_t; + +typedef struct ehttp_gzip_callbacks_s ehttp_gzip_callbacks_t; +typedef struct ehttp_gzip_conf_s ehttp_gzip_conf_t; + +struct ehttp_gzip_callbacks_s { + void (*on_data)(ehttp_gzip_t *gzip, void *arg, const char *buf, size_t len); +}; + +struct ehttp_gzip_conf_s { + int get_header:2; // 0: not fetching header info + size_t chunk_size; // 0: fallback to default: EHTTP_GZIP_CHUNK_SIZE_DEFAULT +}; + +ehttp_gzip_t* ehttp_gzip_create_decompressor(ehttp_gzip_conf_t conf, ehttp_gzip_callbacks_t callbacks, void *arg); +ehttp_gzip_t* ehttp_gzip_create_compressor(ehttp_gzip_conf_t conf, ehttp_gzip_callbacks_t callbacks, void *arg); +void ehttp_gzip_destroy(ehttp_gzip_t *gzip); + +int ehttp_gzip_write(ehttp_gzip_t *gzip, const char *buf, size_t len); +int ehttp_gzip_finish(ehttp_gzip_t *gzip); + +#endif // _ehttp_gzip_h_9196791b_ac2a_4d73_9979_f4b41abbc4c0_ + diff --git a/src/plugins/http/inc/ehttp_parser.h b/src/plugins/http/inc/ehttp_parser.h new file mode 100644 index 0000000000000000000000000000000000000000..be876501286d6e11d3304f67a480ff33674f3d53 --- /dev/null +++ b/src/plugins/http/inc/ehttp_parser.h @@ -0,0 +1,36 @@ +#ifndef _ehttp_parser_fc7f9ac9_52da_4ee3_b556_deb2e1c3866e +#define _ehttp_parser_fc7f9ac9_52da_4ee3_b556_deb2e1c3866e + +#include + +typedef struct ehttp_parser_s ehttp_parser_t; +typedef struct ehttp_parser_callbacks_s ehttp_parser_callbacks_t; +typedef struct ehttp_parser_conf_s ehttp_parser_conf_t; +typedef struct ehttp_status_code_s ehttp_status_code_t; + +struct ehttp_parser_callbacks_s { + void (*on_request_line)(void *arg, const char *method, const char *target, const char *version, const char *target_raw); + void (*on_status_line)(void *arg, const char *version, int status_code, const char *reason_phrase); + void (*on_header_field)(void *arg, const char *key, const char *val); + void (*on_body)(void *arg, const char *chunk, size_t len); + void (*on_end)(void *arg); + void (*on_error)(void *arg, int status_code); +}; + +struct ehttp_parser_conf_s { + size_t flush_block_size; // <=0: immediately +}; + +ehttp_parser_t* ehttp_parser_create(ehttp_parser_callbacks_t callbacks, ehttp_parser_conf_t conf, void *arg); +void ehttp_parser_destroy(ehttp_parser_t *parser); +int ehttp_parser_parse(ehttp_parser_t *parser, const char *buf, size_t len); +int ehttp_parser_parse_string(ehttp_parser_t *parser, const char *str); +int ehttp_parser_parse_char(ehttp_parser_t *parser, const char c); +int ehttp_parser_parse_end(ehttp_parser_t *parser); + +char* ehttp_parser_urldecode(const char *enc); + +const char* ehttp_status_code_get_desc(const int status_code); + +#endif // _ehttp_parser_fc7f9ac9_52da_4ee3_b556_deb2e1c3866e + diff --git a/src/plugins/http/inc/ehttp_util_string.h b/src/plugins/http/inc/ehttp_util_string.h new file mode 100644 index 0000000000000000000000000000000000000000..46c5a428277c75834cd0b14147be47e6e1ff9629 --- /dev/null +++ b/src/plugins/http/inc/ehttp_util_string.h @@ -0,0 +1,18 @@ +#ifndef _ehttp_util_string_h_99dacde5_2e7d_4662_97d6_04611fde683b_ +#define _ehttp_util_string_h_99dacde5_2e7d_4662_97d6_04611fde683b_ + +#include + +typedef struct ehttp_util_string_s ehttp_util_string_t; + +struct ehttp_util_string_s { + char *str; + size_t len; +}; + +void ehttp_util_string_cleanup(ehttp_util_string_t *str); +int ehttp_util_string_append(ehttp_util_string_t *str, const char *s, size_t len); +void ehttp_util_string_clear(ehttp_util_string_t *str); + +#endif // _ehttp_util_string_h_99dacde5_2e7d_4662_97d6_04611fde683b_ + diff --git a/src/plugins/http/inc/httpContext.h b/src/plugins/http/inc/httpContext.h index a2d50d6b7fdfba3bc80c918abd7999247636b6ee..594900d0cf8cf9fcbdbe7f9183e10e89a84e9830 100644 --- a/src/plugins/http/inc/httpContext.h +++ b/src/plugins/http/inc/httpContext.h @@ -31,4 +31,7 @@ void httpCloseContextByApp(HttpContext *pContext); void httpNotifyContextClose(HttpContext *pContext); bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState); +void ehttpIncContextRef(HttpContext *pContext); +void ehttpDecContextRef(HttpContext **ppContext); + #endif diff --git a/src/plugins/http/inc/httpInt.h b/src/plugins/http/inc/httpInt.h index ffd621be7acf223bc002c7b49d87bad595211d2b..40f980f101a492b3ec4c6468c86c562fd3a2417f 100644 --- a/src/plugins/http/inc/httpInt.h +++ b/src/plugins/http/inc/httpInt.h @@ -28,6 +28,8 @@ #include "httpLog.h" #include "httpJson.h" +#include "ehttp_parser.h" + #define HTTP_MAX_CMD_SIZE 1024 #define HTTP_MAX_BUFFER_SIZE 1024*1024 @@ -162,6 +164,11 @@ typedef struct { int32_t len; } HttpBuf; +typedef enum { + EHTTP_CONTEXT_PROCESS_FAILED = 0x01, + EHTTP_CONTEXT_PARSER_FAILED = 0x02 +} EHTTP_CONTEXT_FAILED_CAUSE; + typedef struct { char buffer[HTTP_BUFFER_SIZE]; int bufsize; @@ -172,6 +179,10 @@ typedef struct { HttpBuf data; // body content HttpBuf token; // auth token HttpDecodeMethod *pMethod; + + ehttp_parser_t *parser; + int inited:2; + int failed:4; } HttpParser; typedef struct HttpContext { @@ -201,6 +212,8 @@ typedef struct HttpContext { void * timer; HttpEncodeMethod * encodeMethod; struct HttpThread *pThread; + + int closed:2; } HttpContext; typedef struct HttpThread { @@ -231,6 +244,8 @@ typedef struct HttpServer { pthread_mutex_t serverMutex; HttpDecodeMethod *methodScanner[HTTP_METHOD_SCANNER_SIZE]; bool (*processData)(HttpContext *pContext); + + int fallback:2; } HttpServer; extern const char *httpKeepAliveStr[]; diff --git a/src/plugins/http/src/ehttp_gzip.c b/src/plugins/http/src/ehttp_gzip.c new file mode 100644 index 0000000000000000000000000000000000000000..ded344dfea8b987035dc87bdb31e5b0ced64bcae --- /dev/null +++ b/src/plugins/http/src/ehttp_gzip.c @@ -0,0 +1,154 @@ +#include "ehttp_gzip.h" + +#include "os.h" +#include "zlib.h" + +#include + +typedef enum { + EHTTP_GZIP_INITING, + EHTTP_GZIP_READY, + EHTTP_GZIP_CLOSED, +} EHTTP_GZIP_STATE; + +struct ehttp_gzip_s { + ehttp_gzip_conf_t conf; + ehttp_gzip_callbacks_t callbacks; + void *arg; + z_stream *gzip; + gz_header *header; + char *chunk; + + int state; +}; + +static void dummy_on_data(ehttp_gzip_t *gzip, void *arg, const char *buf, size_t len) { +} + +static void ehttp_gzip_cleanup(ehttp_gzip_t *gzip) { + switch(gzip->state) { + case EHTTP_GZIP_READY: { + inflateEnd(gzip->gzip); + } break; + default: break; + } + if (gzip->gzip) { + free(gzip->gzip); + gzip->gzip = NULL; + } + if (gzip->header) { + free(gzip->header); + gzip->header = NULL; + } + if (gzip->chunk) { + free(gzip->chunk); + gzip->chunk = NULL; + } + gzip->state = EHTTP_GZIP_CLOSED; +} + +ehttp_gzip_t* ehttp_gzip_create_decompressor(ehttp_gzip_conf_t conf, ehttp_gzip_callbacks_t callbacks, void *arg) { + ehttp_gzip_t *gzip = (ehttp_gzip_t*)calloc(1, sizeof(*gzip)); + if (!gzip) return NULL; + + do { + gzip->conf = conf; + gzip->callbacks = callbacks; + gzip->arg = arg; + if (gzip->callbacks.on_data == NULL) gzip->callbacks.on_data = dummy_on_data; + gzip->gzip = (z_stream*)calloc(1, sizeof(*gzip->gzip)); + if (gzip->conf.get_header) { + gzip->header = (gz_header*)calloc(1, sizeof(*gzip->header)); + } + if (gzip->conf.chunk_size<=0) gzip->conf.chunk_size = EHTTP_GZIP_CHUNK_SIZE_DEFAULT; + gzip->chunk = (char*)malloc(gzip->conf.chunk_size); + if (!gzip->gzip || (gzip->conf.get_header && !gzip->header) || !gzip->chunk) break; + gzip->gzip->zalloc = Z_NULL; + gzip->gzip->zfree = Z_NULL; + gzip->gzip->opaque = Z_NULL; + + // 863 windowBits can also be greater than 15 for optional gzip decoding. Add + // 864 32 to windowBits to enable zlib and gzip decoding with automatic header + // 865 detection, or add 16 to decode only the gzip format (the zlib format will + // 866 return a Z_DATA_ERROR). If a gzip stream is being decoded, strm->adler is a + // 867 CRC-32 instead of an Adler-32. Unlike the gunzip utility and gzread() (see + // 868 below), inflate() will not automatically decode concatenated gzip streams. + // 869 inflate() will return Z_STREAM_END at the end of the gzip stream. The state + // 870 would need to be reset to continue decoding a subsequent gzip stream. + int ret = inflateInit2(gzip->gzip, 32); // 32/16? 32/16 + MAX_WBITS + if (ret != Z_OK) break; + if (gzip->header) { + ret = inflateGetHeader(gzip->gzip, gzip->header); + } + if (ret != Z_OK) break; + + gzip->gzip->next_out = (z_const Bytef*)gzip->chunk; + gzip->gzip->avail_out = gzip->conf.chunk_size; + gzip->state = EHTTP_GZIP_READY; + return gzip; + } while (0); + + ehttp_gzip_destroy(gzip); + return NULL; +} + +ehttp_gzip_t* ehttp_gzip_create_compressor(ehttp_gzip_conf_t conf, ehttp_gzip_callbacks_t callbacks, void *arg); + +void ehttp_gzip_destroy(ehttp_gzip_t *gzip) { + ehttp_gzip_cleanup(gzip); + + free(gzip); +} + +int ehttp_gzip_write(ehttp_gzip_t *gzip, const char *buf, size_t len) { + if (gzip->state != EHTTP_GZIP_READY) return -1; + if (len <= 0) return 0; + + gzip->gzip->next_in = (z_const Bytef*)buf; + gzip->gzip->avail_in = len; + + while (gzip->gzip->avail_in) { + int ret; + if (gzip->header) { + ret = inflate(gzip->gzip, Z_BLOCK); + } else { + ret = inflate(gzip->gzip, Z_SYNC_FLUSH); + } + if (ret != Z_OK && ret != Z_STREAM_END) return -1; + + if (gzip->gzip->avail_out>0) { + if (ret!=Z_STREAM_END) continue; + } + + size_t len = gzip->gzip->next_out - (z_const Bytef*)gzip->chunk; + + gzip->gzip->next_out[0] = '\0'; + gzip->callbacks.on_data(gzip, gzip->arg, gzip->chunk, len); + gzip->gzip->next_out = (z_const Bytef*)gzip->chunk; + gzip->gzip->avail_out = gzip->conf.chunk_size; + } + + return 0; +} + +int ehttp_gzip_finish(ehttp_gzip_t *gzip) { + if (gzip->state != EHTTP_GZIP_READY) return -1; + + gzip->gzip->next_in = NULL; + gzip->gzip->avail_in = 0; + + int ret; + ret = inflate(gzip->gzip, Z_FINISH); + + if (ret != Z_STREAM_END) return -1; + + size_t len = gzip->gzip->next_out - (z_const Bytef*)gzip->chunk; + + gzip->gzip->next_out[0] = '\0'; + gzip->callbacks.on_data(gzip, gzip->arg, gzip->chunk, len); + gzip->gzip->next_out = NULL; + gzip->gzip->avail_out = 0; + + return 0; +} + diff --git a/src/plugins/http/src/ehttp_parser.c b/src/plugins/http/src/ehttp_parser.c new file mode 100644 index 0000000000000000000000000000000000000000..30d37f8a0d20f10947e5fc0e1726c443831a6506 --- /dev/null +++ b/src/plugins/http/src/ehttp_parser.c @@ -0,0 +1,967 @@ +#include "ehttp_parser.h" + +#include "ehttp_gzip.h" +#include "ehttp_util_string.h" +#include "elog.h" + +#include +#include +#include + +struct ehttp_status_code_s { + int status_code; + const char *status_desc; +}; + +static ehttp_status_code_t status_codes[] = { + {100, "Continue"}, + {101, "Switching Protocol"}, + {102, "Processing (WebDAV)"}, + {103, "Early Hints"}, + {200, "OK"}, + {201, "Created"}, + {202, "Accepted"}, + {203, "Non-Authoritative Information"}, + {204, "No Content"}, + {205, "Reset Content"}, + {206, "Partial Content"}, + {207, "Multi-Status (WebDAV)"}, + {208, "Already Reported (WebDAV)"}, + {226, "IM Used (HTTP Delta encoding)"}, + {300, "Multiple Choice"}, + {301, "Moved Permanently"}, + {302, "Found"}, + {303, "See Other"}, + {304, "Not Modified"}, + {305, "Use Proxy"}, + {306, "unused"}, + {307, "Temporary Redirect"}, + {308, "Permanent Redirect"}, + {400, "Bad Request"}, + {401, "Unauthorized"}, + {402, "Payment Required"}, + {403, "Forbidden"}, + {404, "Not Found"}, + {405, "Method Not Allowed"}, + {406, "Not Acceptable"}, + {407, "Proxy Authentication Required"}, + {408, "Request Timeout"}, + {409, "Conflict"}, + {410, "Gone"}, + {411, "Length Required"}, + {412, "Precondition Failed"}, + {413, "Payload Too Large"}, + {414, "URI Too Long"}, + {415, "Unsupported Media Type"}, + {416, "Range Not Satisfiable"}, + {417, "Expectation Failed"}, + {418, "I'm a teapot"}, + {421, "Misdirected Request"}, + {422, "Unprocessable Entity (WebDAV)"}, + {423, "Locked (WebDAV)"}, + {424, "Failed Dependency (WebDAV)"}, + {425, "Too Early"}, + {426, "Upgrade Required"}, + {428, "Precondition Required"}, + {429, "Too Many Requests"}, + {431, "Request Header Fields Too Large"}, + {451, "Unavailable For Legal Reasons"}, + {500, "Internal Server Error"}, + {501, "Not Implemented"}, + {502, "Bad Gateway"}, + {503, "Service Unavailable"}, + {504, "Gateway Timeout"}, + {505, "HTTP Version Not Supported"}, + {506, "Variant Also Negotiates"}, + {507, "Insufficient Storage"}, + {508, "Loop Detected (WebDAV)"}, + {510, "Not Extended"}, + {511, "Network Authentication Required"}, + {0, NULL} +}; + +const char* ehttp_status_code_get_desc(const int status_code) { + ehttp_status_code_t *p = status_codes; + while (p->status_code!=0) { + if (p->status_code==status_code) return p->status_desc; + ++p; + } + return "Unknow status code"; +} + +typedef enum HTTP_PARSER_STATE { + HTTP_PARSER_BEGIN, + HTTP_PARSER_REQUEST_OR_RESPONSE, + HTTP_PARSER_METHOD, + HTTP_PARSER_TARGET, + HTTP_PARSER_HTTP_VERSION, + HTTP_PARSER_SP, + HTTP_PARSER_STATUS_CODE, + HTTP_PARSER_REASON_PHRASE, + HTTP_PARSER_CRLF, + HTTP_PARSER_HEADER, + HTTP_PARSER_HEADER_KEY, + HTTP_PARSER_HEADER_VAL, + HTTP_PARSER_CHUNK_SIZE, + HTTP_PARSER_CHUNK, + HTTP_PARSER_END, + HTTP_PARSER_ERROR, +} HTTP_PARSER_STATE; + +typedef struct ehttp_parser_kv_s ehttp_parser_kv_t; + +struct ehttp_parser_kv_s { + char *key; + char *val; +}; + +struct ehttp_parser_s { + ehttp_parser_callbacks_t callbacks; + void *arg; + ehttp_parser_conf_t conf; + + char *method; + char *target; + char *target_raw; + char *version; + + int http_10:2; + int http_11:2; + int accept_encoding_gzip:2; + int accept_encoding_chunked:2; + int transfer_gzip:2; + int transfer_chunked:2; + int content_length_specified:2; + int content_chunked:2; + + + int status_code; + char *reason_phrase; + + char *key; + char *val; + ehttp_parser_kv_t *kvs; + size_t kvs_count; + + char *auth_basic; + + size_t content_length; + + size_t chunk_size; + size_t received_chunk_size; + size_t received_size; + + ehttp_gzip_t *gzip; + ehttp_util_string_t str; + HTTP_PARSER_STATE *stacks; + size_t stacks_count; +}; + +static void dummy_on_request_line(void *arg, const char *method, const char *target, const char *version, const char *target_raw) { +} + +static void dummy_on_status_line(void *arg, const char *version, int status_code, const char *reason_phrase) { +} + +static void dummy_on_header_field(void *arg, const char *key, const char *val) { +} + +static void dummy_on_body(void *arg, const char *chunk, size_t len) { +} + +static void dummy_on_end(void *arg) { +} + +static void dummy_on_error(void *arg, int status_code) { +} + + +static HTTP_PARSER_STATE ehttp_parser_top(ehttp_parser_t *parser) { + EQ_ASSERT(parser->stacks_count >= 1); + EQ_ASSERT(parser->stacks); + + return parser->stacks[parser->stacks_count-1]; +} + +static int ehttp_parser_push(ehttp_parser_t *parser, HTTP_PARSER_STATE state) { + size_t n = parser->stacks_count + 1; + // HTTP_PARSER_STATE *stacks = (HTTP_PARSER_STATE*)reallocarray(parser->stacks, n, sizeof(*stacks)); + HTTP_PARSER_STATE *stacks = (HTTP_PARSER_STATE*)realloc(parser->stacks, n * sizeof(*stacks)); + if (!stacks) return -1; + + parser->stacks_count = n; + parser->stacks = stacks; + parser->stacks[n-1] = state; + + return 0; +} + +static int ehttp_parser_pop(ehttp_parser_t *parser) { + if (parser->stacks_count <= 0) return -1; + --parser->stacks_count; + + return 0; +} + +ehttp_parser_t *ehttp_parser_create(ehttp_parser_callbacks_t callbacks, ehttp_parser_conf_t conf, void *arg) { + ehttp_parser_t *parser = (ehttp_parser_t*)calloc(1, sizeof(*parser)); + if (!parser) return NULL; + + parser->callbacks = callbacks; + parser->arg = arg; + parser->conf = conf; + + if (parser->callbacks.on_request_line == NULL) { + parser->callbacks.on_request_line = dummy_on_request_line; + } + if (parser->callbacks.on_status_line == NULL) { + parser->callbacks.on_status_line = dummy_on_status_line; + } + if (parser->callbacks.on_header_field == NULL) { + parser->callbacks.on_header_field = dummy_on_header_field; + } + if (parser->callbacks.on_body == NULL) { + parser->callbacks.on_body = dummy_on_body; + } + if (parser->callbacks.on_end == NULL) { + parser->callbacks.on_end = dummy_on_end; + } + if (parser->callbacks.on_error == NULL) { + parser->callbacks.on_error = dummy_on_error; + } + + ehttp_parser_push(parser, HTTP_PARSER_BEGIN); + + return parser; +} + +static void ehttp_parser_kvs_destroy(ehttp_parser_t *parser) { + if (!parser->kvs) return; + + for (size_t i=0; ikvs_count; ++i) { + ehttp_parser_kv_t *p = &parser->kvs[i]; + free(p->key); p->key = NULL; + free(p->val); p->val = NULL; + } + free(parser->kvs); + parser->kvs = NULL; + parser->kvs_count = 0; + + free(parser->auth_basic); + parser->auth_basic = NULL; +} + +void ehttp_parser_destroy(ehttp_parser_t *parser) { + if (!parser) return; + + free(parser->method); parser->method = NULL; + free(parser->target); parser->target = NULL; + free(parser->target_raw); parser->target_raw = NULL; + free(parser->version); parser->version = NULL; + free(parser->reason_phrase); parser->reason_phrase = NULL; + free(parser->key); parser->key = NULL; + free(parser->val); parser->val = NULL; + free(parser->auth_basic); parser->auth_basic = NULL; + free(parser->stacks); parser->stacks = NULL; + + parser->stacks_count = 0; + + ehttp_parser_kvs_destroy(parser); + + ehttp_util_string_cleanup(&parser->str); + if (parser->gzip) { + ehttp_gzip_destroy(parser->gzip); + parser->gzip = NULL; + } + + free(parser); +} + +#define is_token(c) (strchr("!#$%&'*+-.^_`|~", c) || isdigit(c) || isalpha(c)) + +char *ehttp_parser_urldecode(const char *enc) { + int ok = 1; + ehttp_util_string_t str = {0}; + while (*enc) { + char *p = strchr(enc, '%'); + if (!p) break; + int hex, cnt; + int n = sscanf(p+1, "%2x%n", &hex, &cnt); + if (n!=1 && cnt !=2) { ok = 0; break; } + if (ehttp_util_string_append(&str, enc, p-enc)) { ok = 0; break; } + char c = (char)hex; + if (ehttp_util_string_append(&str, &c, 1)) { ok = 0; break; } + enc = p+3; + } + char *dec = NULL; + if (ok && *enc) { + if (ehttp_util_string_append(&str, enc, strlen(enc))) { ok = 0; } + } + if (ok) { + dec = str.str; + str.str = NULL; + } + ehttp_util_string_cleanup(&str); + return dec; +} + +static void on_data(ehttp_gzip_t *gzip, void *arg, const char *buf, size_t len) { + ehttp_parser_t *parser = (ehttp_parser_t*)arg; + parser->callbacks.on_body(parser->arg, buf, len); +} + +static int ehttp_parser_check_field(ehttp_parser_t *parser, const char *key, const char *val) { + int ok = 0; + do { + if (0==strcasecmp(key, "Content-Length")) { + size_t len = 0; + int bytes = 0; + int n = sscanf(val, "%ld%n", &len, &bytes); + if (n==1 && bytes==strlen(val)) { + parser->content_length = len; + parser->chunk_size = len; + parser->content_length_specified = 1; + break; + } + ok = -1; + break; + } + if (0==strcasecmp(key, "Accept-Encoding")) { + if (strstr(val, "gzip")) { + parser->accept_encoding_gzip = 1; + } + if (strstr(val, "chunked")) { + parser->accept_encoding_chunked = 1; + } + break; + } + if (0==strcasecmp(key, "Content-Encoding")) { + if (0==strcmp(val, "gzip")) { + parser->content_chunked = 1; + } + break; + } + if (0==strcasecmp(key, "Transfer-Encoding")) { + if (strstr(val, "gzip")) { + parser->transfer_gzip = 1; + ehttp_gzip_conf_t conf = {0}; + ehttp_gzip_callbacks_t callbacks = {0}; + + callbacks.on_data = on_data; + + parser->gzip = ehttp_gzip_create_decompressor(conf, callbacks, parser); + + if (!parser->gzip) { + E("failed to create gzip decompressor"); + ok = -1; + break; + } + } + if (strstr(val, "chunked")) { + parser->transfer_chunked = 1; + } + break; + } + if (0==strcasecmp(key, "Authorization")) { + char *t = NULL; + char *s = NULL; + int bytes = 0; + int n = sscanf(val, "%ms %ms%n", &t, &s, &bytes); + if (n==2 && t && s && bytes==strlen(val) && strcmp(t, "Basic")) { + free(parser->auth_basic); + parser->auth_basic = s; s = NULL; + } else { + ok = -1; + } + free(t); free(s); + break; + } + } while (0); + return ok; +} + +static int ehttp_parser_kvs_append_kv(ehttp_parser_t *parser, const char *key, const char *val) { + // ehttp_parser_kv_t *kvs = (ehttp_parser_kv_t*)reallocarray(parser->kvs, parser->kvs_count + 1, sizeof(*kvs)); + ehttp_parser_kv_t *kvs = (ehttp_parser_kv_t*)realloc(parser->kvs, (parser->kvs_count + 1) * sizeof(*kvs)); + if (!kvs) return -1; + + parser->kvs = kvs; + + kvs[parser->kvs_count].key = strdup(key); + kvs[parser->kvs_count].val = strdup(val); + + if (kvs[parser->kvs_count].key && kvs[parser->kvs_count].val) { + ++parser->kvs_count; + return 0; + } + + free(kvs[parser->kvs_count].key); + kvs[parser->kvs_count].key = NULL; + free(kvs[parser->kvs_count].val); + kvs[parser->kvs_count].val = NULL; + + return -1; +} + +static int on_begin(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + if (c=='G' || c=='P' || c=='H' || c=='D' || c=='C' || c=='O' || c=='T') { + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + ehttp_parser_pop(parser); + ehttp_parser_push(parser, HTTP_PARSER_REQUEST_OR_RESPONSE); + break; + } + E("parser state: %d, unexpected char: [%c]%02x", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 400); + } while (0); + return ok; +} + +static int on_request_or_response(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + if (parser->str.len==1) { + if (c=='T' && parser->str.str[0]=='H') { + ehttp_parser_pop(parser); + ehttp_parser_push(parser, HTTP_PARSER_END); + ehttp_parser_push(parser, HTTP_PARSER_HEADER); + ehttp_parser_push(parser, HTTP_PARSER_CRLF); + ehttp_parser_push(parser, HTTP_PARSER_REASON_PHRASE); + ehttp_parser_push(parser, HTTP_PARSER_SP); + ehttp_parser_push(parser, HTTP_PARSER_STATUS_CODE); + ehttp_parser_push(parser, HTTP_PARSER_SP); + ehttp_parser_push(parser, HTTP_PARSER_HTTP_VERSION); + *again = 1; + break; + } + ehttp_parser_pop(parser); + ehttp_parser_push(parser, HTTP_PARSER_END); + ehttp_parser_push(parser, HTTP_PARSER_HEADER); + ehttp_parser_push(parser, HTTP_PARSER_CRLF); + ehttp_parser_push(parser, HTTP_PARSER_HTTP_VERSION); + ehttp_parser_push(parser, HTTP_PARSER_SP); + ehttp_parser_push(parser, HTTP_PARSER_TARGET); + ehttp_parser_push(parser, HTTP_PARSER_SP); + ehttp_parser_push(parser, HTTP_PARSER_METHOD); + *again = 1; + break; + } + E("parser state: %d, unexpected char: [%c]%02x", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 400); + } while (0); + return ok; +} + +static int on_method(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + if (isalnum(c) || strchr("!#$%&'*+-.^_`|~", c)) { + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + break; + } + parser->method = strdup(parser->str.str); + if (!parser->method) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + ehttp_util_string_clear(&parser->str); + ehttp_parser_pop(parser); + *again = 1; + } while (0); + return ok; +} + +static int on_target(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + if (!isspace(c) && c!='\r' && c!='\n') { + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + break; + } + parser->target_raw = strdup(parser->str.str); + parser->target = ehttp_parser_urldecode(parser->str.str); + if (!parser->target_raw || !parser->target) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + ehttp_util_string_clear(&parser->str); + ehttp_parser_pop(parser); + *again = 1; + } while (0); + return ok; +} + +static int on_version(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + const char *prefix = "HTTP/1."; + int len = strlen(prefix); + if (parser->str.len < len) { + if (prefix[parser->str.len]!=c) { + E("parser state: %d, unexpected char: [%c]%02x", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 400); + break; + } + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + break; + } + + if (c!='0' && c!='1') { + E("parser state: %d, unexpected char: [%c]%02x", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 400); + break; + } + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + if (c=='0') parser->http_10 = 1; + if (c=='1') parser->http_11 = 1; + + parser->version = strdup(parser->str.str); + if (!parser->version) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + + if (parser->method) { + parser->callbacks.on_request_line(parser->arg, parser->method, parser->target, parser->version, parser->target_raw); + } + + ehttp_util_string_clear(&parser->str); + ehttp_parser_pop(parser); + } while (0); + return ok; +} + +static int on_sp(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + if (c==' ') { + ehttp_parser_pop(parser); + break; + } + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + } while (0); + return ok; +} + +static int on_status_code(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + if (isdigit(c)) { + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + if (parser->str.len < 3) break; + + sscanf(parser->str.str, "%d", &parser->status_code); + ehttp_util_string_clear(&parser->str); + ehttp_parser_pop(parser); + break; + } + E("parser state: %d, unexpected char: [%c]%02x", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 400); + } while (0); + return ok; +} + +static int on_reason_phrase(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + if (c=='\r') { + parser->reason_phrase = strdup(parser->str.str); + if (!parser->reason_phrase) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + parser->callbacks.on_status_line(parser->arg, parser->version, parser->status_code, parser->reason_phrase); + ehttp_util_string_clear(&parser->str); + ehttp_parser_pop(parser); + *again = 1; + break; + } + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + } while (0); + return ok; +} + +static int post_process(ehttp_parser_t *parser) { + if (parser->gzip) { + if (ehttp_gzip_finish(parser->gzip)) { + E("gzip failed"); + parser->callbacks.on_error(parser->arg, 507); + return -1; + } + } + parser->callbacks.on_end(parser->arg); + return 0; +} + +static int on_crlf(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + const char *s = "\r\n"; + int len = strlen(s); + if (s[parser->str.len]!=c) { + E("parser state: %d, unexpected char: [%c]%02x", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 400); + break; + } + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + if (parser->str.len == len) { + ehttp_util_string_clear(&parser->str); + ehttp_parser_pop(parser); + if (ehttp_parser_top(parser) == HTTP_PARSER_END) { + ok = post_process(parser); + } + } + break; + } while (0); + return ok; +} + +static int on_header(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + if (c=='\r') { + ehttp_parser_pop(parser); + if (parser->transfer_chunked) { + ehttp_parser_push(parser, HTTP_PARSER_CHUNK_SIZE); + ehttp_parser_push(parser, HTTP_PARSER_CRLF); + } else { + if (parser->content_length > 0) { + ehttp_parser_push(parser, HTTP_PARSER_CHUNK); + } + ehttp_parser_push(parser, HTTP_PARSER_CRLF); + } + *again = 1; + break; + } + if (c!=' ' && c!='\t' && c!=':' ) { + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + ehttp_parser_push(parser, HTTP_PARSER_CRLF); + ehttp_parser_push(parser, HTTP_PARSER_HEADER_VAL); + ehttp_parser_push(parser, HTTP_PARSER_SP); + ehttp_parser_push(parser, HTTP_PARSER_HEADER_KEY); + break; + } + E("parser state: %d, unexpected char: [%c]%02x", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 400); + } while (0); + return ok; +} + +static int on_header_key(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + if (isalnum(c) || strchr("!#$%&'*+-.^_`|~", c)) { + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + break; + } + if (c==':') { + parser->key = strdup(parser->str.str); + if (!parser->key) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + ehttp_util_string_clear(&parser->str); + ehttp_parser_pop(parser); + break; + } + E("parser state: %d, unexpected char: [%c]%02x", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 400); + } while (0); + return ok; +} + +static int on_header_val(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + if (c != '\r' && c != '\n' && (!isspace(c) || parser->str.len>0)) { + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + break; + } + const char *val = parser->str.str; + ok = ehttp_parser_check_field(parser, parser->key, val); + if (ehttp_parser_kvs_append_kv(parser, parser->key, val)) { + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + } else { + parser->callbacks.on_header_field(parser->arg, parser->key, val); + } + free(parser->key); parser->key = NULL; + val = NULL; + if (ok==-1) break; + ehttp_util_string_clear(&parser->str); + ehttp_parser_pop(parser); + *again = 1; + } while (0); + return ok; +} + +static int on_chunk_size(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + int bytes; + size_t len; + int n; + do { + if (isxdigit(c)) { + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + break; + } + if (c=='\r') { + n = sscanf(parser->str.str, "%lx%n", &len, &bytes); + if (n==1 && bytes==strlen(parser->str.str) && len>=0) { + if (len==0) { + if (parser->content_length_specified == 0 || parser->received_size == parser->content_length) { + ehttp_util_string_clear(&parser->str); + ehttp_parser_pop(parser); + ehttp_parser_push(parser, HTTP_PARSER_CRLF); + ehttp_parser_push(parser, HTTP_PARSER_CRLF); + *again = 1; + break; + } + } else { + if (parser->content_length_specified == 0 || parser->received_size + len <= parser->content_length) { + parser->chunk_size = len; + ehttp_util_string_clear(&parser->str); + ehttp_parser_pop(parser); + ehttp_parser_push(parser, HTTP_PARSER_CHUNK_SIZE); + ehttp_parser_push(parser, HTTP_PARSER_CRLF); + ehttp_parser_push(parser, HTTP_PARSER_CHUNK); + ehttp_parser_push(parser, HTTP_PARSER_CRLF); + *again = 1; + break; + } + } + } + } + E("parser state: %d, unexpected char: [%c]%02x", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 400); + } while (0); + return ok; +} + +static int on_chunk(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + if (ehttp_util_string_append(&parser->str, &c, 1)) { + E("parser state: %d, char: [%c]%02x, oom", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + ++parser->received_size; + ++parser->received_chunk_size; + if (parser->received_chunk_size < parser->chunk_size) break; + + if (parser->gzip) { + if (ehttp_gzip_write(parser->gzip, parser->str.str, parser->str.len)) { + E("gzip failed"); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + break; + } + } else { + parser->callbacks.on_body(parser->arg, parser->str.str, parser->str.len); + } + parser->received_chunk_size = 0; + ehttp_util_string_clear(&parser->str); + ehttp_parser_pop(parser); + if (ehttp_parser_top(parser) == HTTP_PARSER_END) { + ok = post_process(parser); + } + } while (0); + return ok; +} + +static int on_end(ehttp_parser_t *parser, HTTP_PARSER_STATE state, const char c, int *again) { + int ok = 0; + do { + E("parser state: %d, unexpected char: [%c]%02x", state, c, c); + ok = -1; + parser->callbacks.on_error(parser->arg, 507); + } while (0); + return ok; +} + +static int parse_char(ehttp_parser_t *parser, const char c, int *again) { + int ok = 0; + HTTP_PARSER_STATE state = ehttp_parser_top(parser); + do { + if (state == HTTP_PARSER_BEGIN) { + ok = on_begin(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_REQUEST_OR_RESPONSE) { + ok = on_request_or_response(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_METHOD) { + ok = on_method(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_TARGET) { + ok = on_target(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_HTTP_VERSION) { + ok = on_version(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_SP) { + ok = on_sp(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_STATUS_CODE) { + ok = on_status_code(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_REASON_PHRASE) { + ok = on_reason_phrase(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_CRLF) { + ok = on_crlf(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_HEADER) { + ok = on_header(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_HEADER_KEY) { + ok = on_header_key(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_HEADER_VAL) { + ok = on_header_val(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_CHUNK_SIZE) { + ok = on_chunk_size(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_CHUNK) { + ok = on_chunk(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_END) { + ok = on_end(parser, state, c, again); + break; + } + if (state == HTTP_PARSER_ERROR) { + ok = -2; + break; + } + E("unknown parser state: %d", state); + ok = -1; + parser->callbacks.on_error(parser->arg, 500); + } while (0); + if (ok==-1) { + ehttp_parser_push(parser, HTTP_PARSER_ERROR); + } + if (ok==-2) ok = -1; + return ok; +} + +int ehttp_parser_parse_string(ehttp_parser_t *parser, const char *str) { + return ehttp_parser_parse(parser, str, str?strlen(str):0); +} + +int ehttp_parser_parse_char(ehttp_parser_t *parser, const char c) { + return ehttp_parser_parse(parser, &c, 1); +} + +int ehttp_parser_parse(ehttp_parser_t *parser, const char *buf, size_t len) { + const char *p = buf; + int ret = 0; + size_t i = 0; + while (i < len) { + int again = 0; + ret = parse_char(parser, *p, &again); + if (ret) break; + if (again) continue; + ++p; + ++i; + } + return ret; +} + diff --git a/src/plugins/http/src/ehttp_util_string.c b/src/plugins/http/src/ehttp_util_string.c new file mode 100644 index 0000000000000000000000000000000000000000..94ebaaafa6c472698543a8b07090f1e6c379cb38 --- /dev/null +++ b/src/plugins/http/src/ehttp_util_string.c @@ -0,0 +1,30 @@ +#include "ehttp_util_string.h" + +#include +#include + +void ehttp_util_string_cleanup(ehttp_util_string_t *str) { + free(str->str); + str->str = NULL; + str->len = 0; +} + +int ehttp_util_string_append(ehttp_util_string_t *str, const char *s, size_t len) { + // int n = str->str?strlen(str->str):0; + int n = str->len; + char *p = (char*)realloc(str->str, n + len + 1); + if (!p) return -1; + strncpy(p+n, s, len); + p[n+len] = '\0'; + str->str = p; + str->len = n+len; + return 0; +} + +void ehttp_util_string_clear(ehttp_util_string_t *str) { + if (str->str) { + str->str[0] = '\0'; + str->len = 0; + } +} + diff --git a/src/plugins/http/src/httpContext.c b/src/plugins/http/src/httpContext.c index 5ef3c9a66efd35ce37eca96d52022f3a9cceeaa9..16d8e9189901daf9c74d8f3ddafcacb864a16538 100644 --- a/src/plugins/http/src/httpContext.c +++ b/src/plugins/http/src/httpContext.c @@ -27,17 +27,42 @@ #include "httpSql.h" #include "httpSession.h" +#include "httpContext.h" +#include "elog.h" + +// dirty tweak +extern bool httpGetHttpMethod(HttpContext* pContext); +extern bool httpParseURL(HttpContext* pContext); +extern bool httpParseHttpVersion(HttpContext* pContext); +extern bool httpGetDecodeMethod(HttpContext* pContext); +extern bool httpParseHead(HttpContext* pContext); + +static void on_request_line(void *arg, const char *method, const char *target, const char *version, const char *target_raw); +static void on_status_line(void *arg, const char *version, int status_code, const char *reason_phrase); +static void on_header_field(void *arg, const char *key, const char *val); +static void on_body(void *arg, const char *chunk, size_t len); +static void on_end(void *arg); +static void on_error(void *arg, int status_code); + +static void httpDestroyContext(void *data); +static void httpMightDestroyContext(void *data); +static void ehttpReleaseContext(HttpContext *pContext); + static void httpRemoveContextFromEpoll(HttpContext *pContext) { HttpThread *pThread = pContext->pThread; if (pContext->fd >= 0) { epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL); int32_t fd = atomic_val_compare_exchange_32(&pContext->fd, pContext->fd, -1); taosCloseSocket(fd); + if (!tsHttpServer.fallback) { + ehttpDecContextRef(&pContext); + } } } static void httpDestroyContext(void *data) { HttpContext *pContext = *(HttpContext **)data; + D("==context[%p] destroyed==", pContext); if (pContext->fd > 0) taosClose(pContext->fd); HttpThread *pThread = pContext->pThread; @@ -54,11 +79,18 @@ static void httpDestroyContext(void *data) { httpFreeJsonBuf(pContext); httpFreeMultiCmds(pContext); + if (!tsHttpServer.fallback) { + if (pContext->parser.parser) { + ehttp_parser_destroy(pContext->parser.parser); + pContext->parser.parser = NULL; + } + } + taosTFree(pContext); } bool httpInitContexts() { - tsHttpServer.contextCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 2, true, httpDestroyContext, "restc"); + tsHttpServer.contextCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, 2, true, httpMightDestroyContext, "restc"); if (tsHttpServer.contextCache == NULL) { httpError("failed to init context cache"); return false; @@ -103,16 +135,20 @@ HttpContext *httpCreateContext(int32_t fd) { HttpContext *pContext = calloc(1, sizeof(HttpContext)); if (pContext == NULL) return NULL; + D("==context[%p] created==", pContext); + pContext->fd = fd; pContext->httpVersion = HTTP_VERSION_10; pContext->lastAccessTime = taosGetTimestampSec(); pContext->state = HTTP_CONTEXT_STATE_READY; + ehttpIncContextRef(pContext); uint64_t handleVal = (uint64_t)pContext; HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, &handleVal, sizeof(int64_t), &pContext, sizeof(int64_t), 3000); pContext->ppContext = ppContext; httpDebug("context:%p, fd:%d, is created, data:%p", pContext, fd, ppContext); + ehttpIncContextRef(pContext); // set the ref to 0 taosCacheRelease(tsHttpServer.contextCache, (void**)&ppContext, false); @@ -122,10 +158,13 @@ HttpContext *httpCreateContext(int32_t fd) { HttpContext *httpGetContext(void *ptr) { uint64_t handleVal = (uint64_t)ptr; HttpContext **ppContext = taosCacheAcquireByKey(tsHttpServer.contextCache, &handleVal, sizeof(HttpContext *)); + EQ_ASSERT(ppContext); + EQ_ASSERT(*ppContext); if (ppContext) { HttpContext *pContext = *ppContext; if (pContext) { + if (!tsHttpServer.fallback) return pContext; int32_t refCount = atomic_add_fetch_32(&pContext->refCount, 1); httpDebug("context:%p, fd:%d, is accquired, data:%p refCount:%d", pContext, pContext->fd, ppContext, refCount); return pContext; @@ -135,6 +174,10 @@ HttpContext *httpGetContext(void *ptr) { } void httpReleaseContext(HttpContext *pContext) { + if (!tsHttpServer.fallback) { + ehttpReleaseContext(pContext); + return; + } int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1); if (refCount < 0) { httpError("context:%p, is already released, refCount:%d", pContext, refCount); @@ -169,12 +212,31 @@ bool httpInitContext(HttpContext *pContext) { memset(pParser, 0, sizeof(HttpParser)); pParser->pCur = pParser->pLast = pParser->buffer; + if (!tsHttpServer.fallback) { + ehttp_parser_callbacks_t callbacks = { + on_request_line, + on_status_line, + on_header_field, + on_body, + on_end, + on_error + }; + ehttp_parser_conf_t conf = { + .flush_block_size = 0 + }; + pParser->parser = ehttp_parser_create(callbacks, conf, pContext); + pParser->inited = 1; + } + httpDebug("context:%p, fd:%d, ip:%s, accessTimes:%d, parsed:%d", pContext, pContext->fd, pContext->ipstr, pContext->accessTimes, pContext->parsed); return true; } void httpCloseContextByApp(HttpContext *pContext) { + if (!tsHttpServer.fallback) { + if (pContext->parsed == false) return; + } pContext->parsed = false; bool keepAlive = true; @@ -211,10 +273,14 @@ void httpCloseContextByApp(HttpContext *pContext) { pContext->ipstr, httpContextStateStr(pContext->state), pContext->state); } - httpReleaseContext(pContext); + if (tsHttpServer.fallback) httpReleaseContext(pContext); } void httpCloseContextByServer(HttpContext *pContext) { + if (!tsHttpServer.fallback) { + if (pContext->closed) return; + pContext->closed = 1; + } if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_DROPPING)) { httpDebug("context:%p, fd:%d, ip:%s, epoll finished, still used by app", pContext, pContext->fd, pContext->ipstr); } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_DROPPING)) { @@ -229,5 +295,176 @@ void httpCloseContextByServer(HttpContext *pContext) { pContext->parsed = false; httpRemoveContextFromEpoll(pContext); - httpReleaseContext(pContext); + if (tsHttpServer.fallback) httpReleaseContext(pContext); +} + + + + + +static void on_request_line(void *arg, const char *method, const char *target, const char *version, const char *target_raw) { + HttpContext *pContext = (HttpContext*)arg; + HttpParser *pParser = &pContext->parser; + + int avail = sizeof(pParser->buffer) - (pParser->pLast - pParser->buffer); + int n = snprintf(pParser->pLast, avail, + "%s %s %s\r\n", method, target_raw, version); + + char *last = pParser->pLast; + + do { + if (n>=avail) { + httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), exceeding buffer size", + pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw); + break; + } + pParser->bufsize += n; + + if (!httpGetHttpMethod(pContext)) { + httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), parse http method failed", + pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw); + break; + } + if (!httpParseURL(pContext)) { + httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), parse http url failed", + pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw); + break; + } + if (!httpParseHttpVersion(pContext)) { + httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), parse http version failed", + pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw); + break; + } + if (!httpGetDecodeMethod(pContext)) { + httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_request_line(%s,%s,%s,%s), get decode method failed", + pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, method, target, version, target_raw); + break; + } + + last += n; + pParser->pLast = last; + return; + } while (0); + + pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED; } + +static void on_status_line(void *arg, const char *version, int status_code, const char *reason_phrase) { + HttpContext *pContext = (HttpContext*)arg; + HttpParser *pParser = &pContext->parser; + + pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED; +} + +static void on_header_field(void *arg, const char *key, const char *val) { + HttpContext *pContext = (HttpContext*)arg; + HttpParser *pParser = &pContext->parser; + + if (pParser->failed) return; + + D("==key:[%s], val:[%s]==", key, val); + int avail = sizeof(pParser->buffer) - (pParser->pLast - pParser->buffer); + int n = snprintf(pParser->pLast, avail, + "%s: %s\r\n", key, val); + + char *last = pParser->pLast; + + do { + if (n>=avail) { + httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_header_field(%s,%s), exceeding buffer size", + pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, key, val); + break; + } + pParser->bufsize += n; + pParser->pCur = pParser->pLast + n; + + if (!httpParseHead(pContext)) { + httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, on_header_field(%s,%s), parse head failed", + pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, key, val); + break; + } + + last += n; + pParser->pLast = last; + return; + } while (0); + + pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED; +} + +static void on_body(void *arg, const char *chunk, size_t len) { + HttpContext *pContext = (HttpContext*)arg; + HttpParser *pParser = &pContext->parser; + + if (pParser->failed) return; + + if (pParser->data.pos == 0) { + pParser->data.pos = pParser->pLast; + pParser->data.len = 0; + } + + int avail = sizeof(pParser->buffer) - (pParser->pLast - pParser->buffer); + if (len+1>=avail) { + pParser->failed |= EHTTP_CONTEXT_PROCESS_FAILED; + return; + } + memcpy(pParser->pLast, chunk, len); + pParser->pLast += len; + pParser->data.len += len; +} + +static void on_end(void *arg) { + HttpContext *pContext = (HttpContext*)arg; + HttpParser *pParser = &pContext->parser; + + if (pParser->failed) return; + + if (pParser->data.pos == 0) pParser->data.pos = pParser->pLast; + + if (!pContext->parsed) { + pContext->parsed = true; + } +} + +static void on_error(void *arg, int status_code) { + HttpContext *pContext = (HttpContext*)arg; + HttpParser *pParser = &pContext->parser; + + pParser->failed |= EHTTP_CONTEXT_PARSER_FAILED; +} + +static void httpMightDestroyContext(void *data) { + HttpContext *pContext = *(HttpContext **)data; + if (!tsHttpServer.fallback) { + httpRemoveContextFromEpoll(pContext); + ehttpDecContextRef(&pContext); + return; + } + httpDestroyContext(data); +} + +static void ehttpReleaseContext(HttpContext *pContext) { + HttpContext **ppContext = pContext->ppContext; + + if (tsHttpServer.contextCache != NULL) { + taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false); + } else { + httpDebug("context:%p, won't be destroyed for cache is already released", pContext); + // httpDestroyContext((void **)(&ppContext)); + } +} + +void ehttpIncContextRef(HttpContext *pContext) { + if (tsHttpServer.fallback) return; + atomic_add_fetch_32(&pContext->refCount, 1); +} + +void ehttpDecContextRef(HttpContext **ppContext) { + if (tsHttpServer.fallback) return; + HttpContext *pContext = *ppContext; + int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1); + if (refCount>0) return; + EQ_ASSERT(refCount==0); + httpDestroyContext(ppContext); +} + diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index f4aca91cba7fb330b3a74ba2206d568df8563cf3..6802d3624a23623033e262f548789c09287fc671 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -25,10 +25,14 @@ #include "httpResp.h" #include "httpUtil.h" +#include "elog.h" + #ifndef EPOLLWAKEUP #define EPOLLWAKEUP (1u << 29) #endif +static bool ehttpReadData(HttpContext *pContext); + static void httpStopThread(HttpThread* pThread) { pThread->stop = true; @@ -134,6 +138,8 @@ static bool httpDecompressData(HttpContext *pContext) { } static bool httpReadData(HttpContext *pContext) { + if (!tsHttpServer.fallback) return ehttpReadData(pContext); + if (!pContext->parsed) { httpInitContext(pContext); } @@ -188,6 +194,8 @@ static void httpProcessHttpData(void *param) { sigaddset(&set, SIGPIPE); pthread_sigmask(SIG_SETMASK, &set, NULL); + elog_set_thread_name("httpProcessHttpData"); + while (1) { struct epoll_event events[HTTP_MAX_EVENTS]; //-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1 @@ -207,10 +215,14 @@ static void httpProcessHttpData(void *param) { continue; } + ehttpIncContextRef(pContext); + if (events[i].events & EPOLLPRI) { httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLPRI events occured, accessed:%d, close connect", pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); httpCloseContextByServer(pContext); + if (!tsHttpServer.fallback) httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); continue; } @@ -218,6 +230,8 @@ static void httpProcessHttpData(void *param) { httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLRDHUP events occured, accessed:%d, close connect", pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); httpCloseContextByServer(pContext); + httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); continue; } @@ -225,6 +239,8 @@ static void httpProcessHttpData(void *param) { httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLERR events occured, accessed:%d, close connect", pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); httpCloseContextByServer(pContext); + if (!tsHttpServer.fallback) httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); continue; } @@ -232,6 +248,8 @@ static void httpProcessHttpData(void *param) { httpDebug("context:%p, fd:%d, ip:%s, state:%s, EPOLLHUP events occured, accessed:%d, close connect", pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); httpCloseContextByServer(pContext); + if (!tsHttpServer.fallback) httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); continue; } @@ -239,6 +257,7 @@ static void httpProcessHttpData(void *param) { httpDebug("context:%p, fd:%d, ip:%s, state:%s, not in ready state, ignore read events", pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state)); httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); continue; } @@ -247,11 +266,15 @@ static void httpProcessHttpData(void *param) { pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); httpSendErrorResp(pContext, HTTP_SERVER_OFFLINE); httpNotifyContextClose(pContext); + if (!tsHttpServer.fallback) httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); } else { if (httpReadData(pContext)) { (*(pThread->processData))(pContext); atomic_fetch_add_32(&pServer->requestNum, 1); } + if (!tsHttpServer.fallback) httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); } } } @@ -332,7 +355,8 @@ static void *httpAcceptHttpConnection(void *arg) { httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd, pContext->ipstr, pThread->label, strerror(errno)); taosClose(pContext->fd); - httpReleaseContext(pContext); + if (tsHttpServer.fallback) httpReleaseContext(pContext); + ehttpDecContextRef(&pContext); continue; } @@ -405,3 +429,80 @@ bool httpInitConnect() { pServer->serverPort, pServer->numOfThreads); return true; } + + + + +static bool ehttpReadData(HttpContext *pContext) { + HttpParser *pParser = &pContext->parser; + EQ_ASSERT(!pContext->parsed); + if (!pParser->parser) { + if (!pParser->inited) { + httpInitContext(pContext); + } + if (!pParser->parser) { + return false; + } + } + + pContext->accessTimes++; + pContext->lastAccessTime = taosGetTimestampSec(); + + char buf[HTTP_STEP_SIZE+1] = {0}; + int nread = (int)taosReadSocket(pContext->fd, buf, sizeof(buf)); + if (nread > 0) { + buf[nread] = '\0'; + if (strstr(buf, "GET ")==buf && !strchr(buf, '\r') && !strchr(buf, '\n')) { + D("==half of request line received:\n%s\n==", buf); + } + + if (ehttp_parser_parse(pParser->parser, buf, nread)) { + D("==parsing failed=="); + httpCloseContextByServer(pContext); + return false; + } + + if (pContext->parser.failed) { + D("==parsing failed: [0x%x]==", pContext->parser.failed); + httpNotifyContextClose(pContext); + return false; + } + if (pContext->parsed) { + // int ret = httpCheckReadCompleted(pContext); + // already done in ehttp_parser + int ret = HTTP_CHECK_BODY_SUCCESS; + if (ret == HTTP_CHECK_BODY_CONTINUE) { + //httpDebug("context:%p, fd:%d, ip:%s, not finished yet, wait another event", pContext, pContext->fd, pContext->ipstr); + return false; + } else if (ret == HTTP_CHECK_BODY_SUCCESS){ + httpDebug("context:%p, fd:%d, ip:%s, thread:%s, read size:%d, dataLen:%d", + pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->parser.bufsize, pContext->parser.data.len); + if (httpDecompressData(pContext)) { + return true; + } else { + httpNotifyContextClose(pContext); + return false; + } + } else { + httpError("context:%p, fd:%d, ip:%s, failed to read http body, close connect", pContext, pContext->fd, pContext->ipstr); + httpNotifyContextClose(pContext); + return false; + } + } + return pContext->parsed; + } else if (nread < 0) { + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { + httpDebug("context:%p, fd:%d, ip:%s, read from socket error:%d, wait another event", + pContext, pContext->fd, pContext->ipstr, errno); + return false; // later again + } else { + httpError("context:%p, fd:%d, ip:%s, read from socket error:%d, close connect", + pContext, pContext->fd, pContext->ipstr, errno); + return false; + } + } else { + // eof + return false; + } +} + diff --git a/src/plugins/http/src/httpSystem.c b/src/plugins/http/src/httpSystem.c index e51c8dd4f773397862483d9284e765a51d49c923..dc1b9a93be6e1cfa998a8f71e0e16a510beffdbf 100644 --- a/src/plugins/http/src/httpSystem.c +++ b/src/plugins/http/src/httpSystem.c @@ -40,6 +40,12 @@ HttpServer tsHttpServer; void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); int httpInitSystem() { + tsHttpServer.fallback = 0; + const char *v = getenv("FALLBACK"); + if (v) { + tsHttpServer.fallback = 1; + } + strcpy(tsHttpServer.label, "rest"); tsHttpServer.serverIp = 0; tsHttpServer.serverPort = tsHttpPort; diff --git a/src/util/inc/elog.h b/src/util/inc/elog.h new file mode 100644 index 0000000000000000000000000000000000000000..8e11c3e761daf108f1b252bdbf225a0ddab5614d --- /dev/null +++ b/src/util/inc/elog.h @@ -0,0 +1,71 @@ +#ifndef _elog_h_8897be44_dda8_45b6_9d37_8d8691cb05fb_ +#define _elog_h_8897be44_dda8_45b6_9d37_8d8691cb05fb_ + +#include + +typedef enum { + ELOG_DEBUG, + ELOG_INFO, + ELOG_WARN, + ELOG_ERROR, + ELOG_CRITICAL, + ELOG_VERBOSE, + ELOG_ABORT, +} ELOG_LEVEL; + +void elog_set_level(ELOG_LEVEL base); // only log those not less than base +void elog_set_thread_name(const char *name); + +void elog(ELOG_LEVEL level, int fd, const char *file, int line, const char *func, const char *fmt, ...) +#ifdef __GNUC__ + __attribute__((format(printf, 6, 7))) +#endif +; + +#define DLOG(fd, fmt, ...) elog(ELOG_DEBUG, fd, __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define ILOG(fd, fmt, ...) elog(ELOG_INFO, fd, __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define WLOG(fd, fmt, ...) elog(ELOG_WARN, fd, __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define ELOG(fd, fmt, ...) elog(ELOG_ERROR, fd, __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define CLOG(fd, fmt, ...) elog(ELOG_CRITICAL, fd, __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define VLOG(fd, fmt, ...) elog(ELOG_VERBOSE, fd, __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define ALOG(fd, fmt, ...) elog(ELOG_ABORT, fd, __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) + +#define D(fmt, ...) elog(ELOG_DEBUG, fileno(stdout), __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define I(fmt, ...) elog(ELOG_INFO, fileno(stdout), __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define W(fmt, ...) elog(ELOG_WARN, fileno(stdout), __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define E(fmt, ...) elog(ELOG_ERROR, fileno(stdout), __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define C(fmt, ...) elog(ELOG_CRITICAL, fileno(stdout), __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define V(fmt, ...) elog(ELOG_VERBOSE, fileno(stdout), __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) +#define A(fmt, ...) elog(ELOG_ABORT, fileno(stdout), __FILE__, __LINE__, __FUNCTION__, fmt, ##__VA_ARGS__) + + + +// NOTE: https://en.wikipedia.org/wiki/Fail-fast +// for the sake of simplicity, both implementation and usage, +// we'll follow `fail-fast` or `let-it-crash` philosophy. + +// assertion in both debug/release build +#define EQ_ABORT(fmt, ...) A("Assertion failure: "fmt, ##__VA_ARGS__) + +#define EQ_ASSERT(statement) do { \ + if (statement) break; \ + A("Assertion failure: %s", #statement); \ +} while (0) + +#define EQ_ASSERT_EXT(statement, fmt, ...) do { \ + if (statement) break; \ + A("Assertion failure: %s: "fmt, #statement, ##__VA_ARGS__); \ +} while (0) + +#define EQ_ASSERT_API0(statement) do { \ + if (statement) break; \ + A("Assertion failure: %s failed: [%d]%s", #statement, errno, strerror(errno)); \ +} while (0) + +#define EQ_ASSERT_API(api) do { \ + A("Assertion failure: %s failed: [%d]%s", #api, errno, strerror(errno)); \ +} while (0) + + +#endif // _elog_h_8897be44_dda8_45b6_9d37_8d8691cb05fb_ + diff --git a/src/util/src/elog.c b/src/util/src/elog.c new file mode 100644 index 0000000000000000000000000000000000000000..95b580962cc8d50b095bcd22edea06bfb345c20a --- /dev/null +++ b/src/util/src/elog.c @@ -0,0 +1,95 @@ +#include "elog.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#define gettid() syscall(__NR_gettid) + +static ELOG_LEVEL elog_level_base = ELOG_DEBUG; + +static __thread long elog_thread_id; +static __thread char elog_thread_name[24] = {0}; + +void elog_set_level(ELOG_LEVEL base) { + elog_level_base = base; +} + +void elog_set_thread_name(const char *name) { + elog_thread_id = gettid(); + snprintf(elog_thread_name, sizeof(elog_thread_name), "%s", name); +} + +void elog(ELOG_LEVEL level, int fd, const char *file, int line, const char *func, const char *fmt, ...) +{ + if (level < elog_level_base) return; + if (fd == -1) return; + + if (elog_thread_name[0]=='\0') { + elog_set_thread_name("unknown"); + } + + char *p; + int n; + size_t bytes; + + char buf[4096]; + snprintf(buf, sizeof(buf), "%s", file); + + char fn[1024]; + snprintf(fn, sizeof(fn), "%s", basename(buf)); + + char C; + switch (level) { + case ELOG_DEBUG: C = 'D'; break; + case ELOG_INFO: C = 'I'; break; + case ELOG_WARN: C = 'W'; break; + case ELOG_ERROR: C = 'E'; break; + case ELOG_CRITICAL: C = 'C'; break; + case ELOG_VERBOSE: C = 'V'; break; + case ELOG_ABORT: C = 'A'; break; + default: return; + } + + struct tm t; + struct timeval tv; + + if (gettimeofday(&tv, NULL)) return; + if (!localtime_r(&tv.tv_sec, &t)) return; + + p = buf; + bytes = sizeof(buf); + + n = snprintf(p, bytes, "%c[%02d/%02d %02d:%02d:%02d.%06ld][%06ld]: ==", + C, + t.tm_mon + 1, t.tm_mday, + t.tm_hour, t.tm_min, t.tm_sec, + tv.tv_usec, + elog_thread_id); + p += n; bytes -= n; + + va_list arg; + va_start(arg, fmt); + if (bytes>0) { + n = vsnprintf(p, bytes, fmt, arg); + p += n; bytes -= n; + } + va_end(arg); + + if (bytes>0) { + n = snprintf(p, bytes, "== t:%s#%s[%d]#%s()", + elog_thread_name, fn, line, func); + } + + dprintf(fd, "%s\n", buf); + + if (level == ELOG_ABORT) { + abort(); + } +} +