diff --git a/src/inc/tglobalcfg.h b/src/inc/tglobalcfg.h index 4990ae4ef0e640899f9c1afd9501631abdcc2022..ae56b61fa5469445a293eb26745ae82a99b6e588 100644 --- a/src/inc/tglobalcfg.h +++ b/src/inc/tglobalcfg.h @@ -129,6 +129,7 @@ extern int tsHttpCacheSessions; extern int tsHttpSessionExpire; extern int tsHttpMaxThreads; extern int tsHttpEnableCompress; +extern int tsTelegrafUseFieldNum; extern int tsAdminRowLimit; extern char tsMonitorDbName[]; diff --git a/src/modules/http/inc/httpHandle.h b/src/modules/http/inc/httpHandle.h index 1a60fccb9fc31dede3400d2d20d753b7aec89356..a260efde64045920fe3f716b3a80668925f7daf8 100644 --- a/src/modules/http/inc/httpHandle.h +++ b/src/modules/http/inc/httpHandle.h @@ -22,16 +22,18 @@ #include "tmempool.h" #include "tsdb.h" #include "tutil.h" +#include "zlib.h" #include "http.h" #include "httpJson.h" -#define HTTP_MAX_CMD_SIZE 1024*20 -#define HTTP_MAX_BUFFER_SIZE 1024*1024*10 +#define HTTP_MAX_CMD_SIZE 1024 +#define HTTP_MAX_BUFFER_SIZE 1024*1024 #define HTTP_LABEL_SIZE 8 #define HTTP_MAX_EVENTS 10 -#define HTTP_BUFFER_SIZE 1024*70 //70k +#define HTTP_BUFFER_SIZE 1024*65 //65k +#define HTTP_DECOMPRESS_BUF_SIZE 1024*64 #define HTTP_STEP_SIZE 1024 //http message get process step by step #define HTTP_MAX_URL 5 //http url stack size #define HTTP_METHOD_SCANNER_SIZE 7 //http method fp size @@ -64,12 +66,15 @@ #define HTTP_CHECK_BODY_CONTINUE 0 #define HTTP_CHECK_BODY_SUCCESS 1 -#define HTTP_READ_RETRY_TIMES 3 -#define HTTP_READ_WAIT_TIME_MS 3 -#define HTTP_WRITE_RETRY_TIMES 400 +#define HTTP_READ_RETRY_TIMES 5 +#define HTTP_READ_WAIT_TIME_MS 5 +#define HTTP_WRITE_RETRY_TIMES 500 #define HTTP_WRITE_WAIT_TIME_MS 5 #define HTTP_EXPIRED_TIME 60000 +#define HTTP_COMPRESS_IDENTITY 0 +#define HTTP_COMPRESS_GZIP 2 + struct HttpContext; struct HttpThread; @@ -147,7 +152,7 @@ typedef struct { } HttpBuf; typedef struct { - char buffer[HTTP_MAX_BUFFER_SIZE]; + char buffer[HTTP_BUFFER_SIZE]; int bufsize; char *pLast; char *pCur; @@ -167,7 +172,8 @@ typedef struct HttpContext { uint8_t httpChunked; uint8_t httpKeepAlive; // http1.0 and not keep-alive, close connection immediately uint8_t fromMemPool; - uint8_t compress; + uint8_t acceptEncoding; + uint8_t contentEncoding; uint8_t usedByEpoll; uint8_t usedByApp; uint8_t reqType; @@ -177,6 +183,7 @@ typedef struct HttpContext { char pass[TSDB_PASSWORD_LEN]; void *taos; HttpSession *session; + z_stream gzipStream; HttpEncodeMethod *encodeMethod; HttpSqlCmd singleCmd; HttpSqlCmds *multiCmds; @@ -293,6 +300,11 @@ void httpTrimTableName(char *name); int httpShrinkTableName(HttpContext *pContext, int pos, char *name); char *httpGetCmdsString(HttpContext *pContext, int pos); +int httpGzipDeCompress(char *srcData, int32_t nSrcData, char *destData, int32_t *nDestData); +int httpGzipCompressInit(HttpContext *pContext); +int httpGzipCompress(HttpContext *pContext, char *inSrcData, int32_t inSrcDataLen, + char *outDestData, int32_t *outDestDataLen, bool isTheLast); + extern const char *httpKeepAliveStr[]; extern const char *httpVersionStr[]; diff --git a/src/modules/http/inc/httpJson.h b/src/modules/http/inc/httpJson.h index f30d480185e49cbd7ce15a3afbc27c82dfef0d48..3ac2669da92799f455faacd1f50a6322db33d3f8 100644 --- a/src/modules/http/inc/httpJson.h +++ b/src/modules/http/inc/httpJson.h @@ -17,14 +17,13 @@ #define TDENGINE_HTTP_JSON_H #include +#include -#define JSON_BUFFER_SIZE 4096 +#define JSON_BUFFER_SIZE 10240 struct HttpContext; enum { JsonNumber, JsonString, JsonBoolean, JsonArray, JsonObject, JsonNull }; -typedef enum { JsonCompress, JsonUnCompress } JsonCompressFlag; - extern char JsonItmTkn; extern char JsonObjStt; extern char JsonObjEnd; @@ -47,6 +46,7 @@ typedef struct { // http response int httpWriteBuf(struct HttpContext* pContext, const char* buf, int sz); +int httpWriteBufNoTrace(struct HttpContext* pContext, const char* buf, int sz); int httpWriteBufByFd(struct HttpContext* pContext, const char* buf, int sz); // builder callback @@ -55,11 +55,12 @@ typedef void (*httpJsonBuilder)(JsonBuf* buf, void* jsnHandle); // buffer void httpInitJsonBuf(JsonBuf* buf, struct HttpContext* pContext); void httpWriteJsonBufHead(JsonBuf* buf); -int httpWriteJsonBufBody(JsonBuf* buf); +int httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast); void httpWriteJsonBufEnd(JsonBuf* buf); // value void httpJsonString(JsonBuf* buf, char* sVal, int len); +void httpJsonOriginString(JsonBuf* buf, char* sVal, int len); void httpJsonStringForTransMean(JsonBuf* buf, char* SVal, int maxLen); void httpJsonInt64(JsonBuf* buf, int64_t num); void httpJsonTimestamp(JsonBuf* buf, int64_t t); diff --git a/src/modules/http/src/gcJson.c b/src/modules/http/src/gcJson.c index f8e178c157d6f31d8f77e4c84e8e16bd315ad3e6..4a62d5e92919416e895a0d437ec3ec77f801bb7a 100644 --- a/src/modules/http/src/gcJson.c +++ b/src/modules/http/src/gcJson.c @@ -175,7 +175,7 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, for (int i = dataFields; i >= 0; i--) { httpJsonItemToken(jsonBuf); if (row[i] == NULL) { - httpJsonString(jsonBuf, "NULL", 4); + httpJsonOriginString(jsonBuf, "null", 4); continue; } diff --git a/src/modules/http/src/httpCode.c b/src/modules/http/src/httpCode.c index a1d66f32b4fc99d1991af4b3b5e68bcef36b3f8d..9188f421b159029251b90e7ead4b17d3c7683621 100644 --- a/src/modules/http/src/httpCode.c +++ b/src/modules/http/src/httpCode.c @@ -42,7 +42,7 @@ char* httpMsg[] = { "database name too long", "invalid telegraf json fromat", "metrics size is 0", - "metrics size can not more than 20K", // 26 + "metrics size can not more than 1K", // 26 "metric name not find", "metric name type should be string", "metric name length is 0", diff --git a/src/modules/http/src/httpHandle.c b/src/modules/http/src/httpHandle.c index 4787ad9a337b95edeabd7b294a71d691d252630d..ffaba24fab79b0ae7be3326f19312e85c33f6368 100644 --- a/src/modules/http/src/httpHandle.c +++ b/src/modules/http/src/httpHandle.c @@ -186,12 +186,22 @@ bool httpParseHead(HttpContext* pContext) { pParser->data.len = (int32_t)atoi(pParser->pLast + 16); httpTrace("context:%p, fd:%d, ip:%s, Content-Length:%d", pContext, pContext->fd, pContext->ipstr, pParser->data.len); - } else if (tsHttpEnableCompress && strncasecmp(pParser->pLast, "Accept-Encoding: ", 17) == 0) { - if (strstr(pParser->pLast + 17, "deflate") != NULL) { - pContext->compress = JsonCompress; + } else if (strncasecmp(pParser->pLast, "Accept-Encoding: ", 17) == 0) { + if (tsHttpEnableCompress && strstr(pParser->pLast + 17, "gzip") != NULL) { + pContext->acceptEncoding = HTTP_COMPRESS_GZIP; + httpTrace("context:%p, fd:%d, ip:%s, Accept-Encoding:gzip", pContext, pContext->fd, pContext->ipstr); + } else { + pContext->acceptEncoding = HTTP_COMPRESS_IDENTITY; + httpTrace("context:%p, fd:%d, ip:%s, Accept-Encoding:identity", pContext, pContext->fd, pContext->ipstr); + } + } else if (strncasecmp(pParser->pLast, "Content-Encoding: ", 18) == 0) { + if (strstr(pParser->pLast + 18, "gzip") != NULL) { + pContext->contentEncoding = HTTP_COMPRESS_GZIP; + httpTrace("context:%p, fd:%d, ip:%s, Content-Encoding:gzip", pContext, pContext->fd, pContext->ipstr); + } else { + pContext->contentEncoding = HTTP_COMPRESS_IDENTITY; + httpTrace("context:%p, fd:%d, ip:%s, Content-Encoding:identity", pContext, pContext->fd, pContext->ipstr); } - httpTrace("context:%p, fd:%d, ip:%s, Accept-Encoding:%s", pContext, pContext->fd, pContext->ipstr, - pContext->compress == JsonCompress ? "deflate" : "identity"); } else if (strncasecmp(pParser->pLast, "Connection: ", 12) == 0) { if (strncasecmp(pParser->pLast + 12, "Keep-Alive", 10) == 0) { pContext->httpKeepAlive = HTTP_KEEPALIVE_ENABLE; @@ -312,7 +322,7 @@ bool httpParseRequest(HttpContext* pContext) { return true; } - httpDump("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, read size:%d, raw data:\n%s", + httpTrace("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, read size:%d, raw data:\n%s", pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->pThread->numOfFds, pContext->parser.bufsize, pContext->parser.buffer); diff --git a/src/modules/http/src/httpJson.c b/src/modules/http/src/httpJson.c index 692d2753995926745f38e95985b9c7e4653d27bd..a61a5783c8d854afa4899d3c4752caff0394d2c7 100644 --- a/src/modules/http/src/httpJson.c +++ b/src/modules/http/src/httpJson.c @@ -72,71 +72,86 @@ int httpWriteBufByFd(struct HttpContext* pContext, const char* buf, int sz) { return writeLen; } -int httpWriteBuf(struct HttpContext* pContext, const char* buf, int sz) { +int httpWriteBuf(struct HttpContext *pContext, const char *buf, int sz) { int writeSz = httpWriteBufByFd(pContext, buf, sz); - if (writeSz != sz) { - httpError("context:%p, fd:%d, ip:%s, size:%d, write size:%d, failed to send response:\n%s", + httpError("context:%p, fd:%d, ip:%s, dataSize:%d, writeSize:%d, failed to send response:\n%s", pContext, pContext->fd, pContext->ipstr, sz, writeSz, buf); } else { - httpTrace("context:%p, fd:%d, ip:%s, size:%d, write size:%d, response:\n%s", + httpTrace("context:%p, fd:%d, ip:%s, dataSize:%d, writeSize:%d, response:\n%s", pContext, pContext->fd, pContext->ipstr, sz, writeSz, buf); } return writeSz; } -int httpWriteJsonBufBody(JsonBuf* buf) { +int httpWriteBufNoTrace(struct HttpContext *pContext, const char *buf, int sz) { + int writeSz = httpWriteBufByFd(pContext, buf, sz); + if (writeSz != sz) { + httpError("context:%p, fd:%d, ip:%s, dataSize:%d, writeSize:%d, failed to send response", + pContext, pContext->fd, pContext->ipstr, sz, writeSz); + } + + return writeSz; +} + +int httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) { int remain = 0; + char sLen[24]; + uint64_t srcLen = (uint64_t) (buf->lst - buf->buf); + if (buf->pContext->fd <= 0) { - httpTrace("context:%p, fd:%d, ip:%s, write json body error", buf->pContext, buf->pContext->fd, - buf->pContext->ipstr); + httpTrace("context:%p, fd:%d, ip:%s, write json body error", buf->pContext, buf->pContext->fd, buf->pContext->ipstr); buf->pContext->fd = -1; } - if (buf->lst == buf->buf) { - httpTrace("context:%p, fd:%d, ip:%s, no data need dump", buf->pContext, buf->pContext->fd, buf->pContext->ipstr); - return 0; // there is no data to dump. - } - - char sLen[24]; - uint64_t srcLen = (uint64_t)(buf->lst - buf->buf); - - /*HTTP servers often use compression to optimize transmission, for example - * with Content-Encoding: gzip or Content-Encoding: deflate. If both - * compression and chunked encoding are enabled, then the content stream is - * first compressed, then chunked; so the chunk encoding itself is not - * compressed, and the data in each chunk is not compressed individually. The - * remote endpoint then decodes the stream by concatenating the chunks and - * uncompressing the result.*/ - if (buf->pContext->compress == JsonUnCompress) { - int len = sprintf(sLen, "%lx\r\n", srcLen); - httpTrace("context:%p, fd:%d, ip:%s, write json body, chunk size:%lld", buf->pContext, buf->pContext->fd, - buf->pContext->ipstr, srcLen); - httpWriteBuf(buf->pContext, sLen, len); // dump chunk size - remain = httpWriteBuf(buf->pContext, buf->buf, (int)srcLen); - } else if (buf->pContext->compress == JsonCompress) { - // unsigned char compressBuf[JSON_BUFFER_SIZE] = { 0 }; - // uint64_t compressBufLen = sizeof(compressBuf); - // compress(compressBuf, &compressBufLen, (const unsigned char*)buf->buf, - // srcLen); - // int len = sprintf(sLen, "%lx\r\n", compressBufLen); - // - // httpTrace("context:%p, fd:%d, ip:%s, write json body, chunk size:%lld, - // compress:%ld", buf->pContext, buf->pContext->fd, buf->pContext->ipstr, - // srcLen, compressBufLen); - // httpWriteBuf(buf->pContext, sLen, len);//dump chunk size - // remain = httpWriteBuf(buf->pContext, (const char*)compressBuf, - // (int)compressBufLen); + /* + * HTTP servers often use compression to optimize transmission, for example + * with Content-Encoding: gzip or Content-Encoding: deflate. + * If both compression and chunked encoding are enabled, then the content stream is first compressed, then chunked; + * so the chunk encoding itself is not compressed, and the data in each chunk is not compressed individually. + * The remote endpoint then decodes the stream by concatenating the chunks and uncompressing the result. + */ + + if (buf->pContext->acceptEncoding == HTTP_COMPRESS_IDENTITY) { + if (buf->lst == buf->buf) { + httpTrace("context:%p, fd:%d, ip:%s, no data need dump", buf->pContext, buf->pContext->fd, buf->pContext->ipstr); + return 0; // there is no data to dump. + } else { + int len = sprintf(sLen, "%lx\r\n", srcLen); + httpTrace("context:%p, fd:%d, ip:%s, write body, chunkSize:%ld, response:\n%s", + buf->pContext, buf->pContext->fd, buf->pContext->ipstr, srcLen, buf->buf); + httpWriteBufNoTrace(buf->pContext, sLen, len); + remain = httpWriteBufNoTrace(buf->pContext, buf->buf, (int) srcLen); + } } else { + char compressBuf[JSON_BUFFER_SIZE] = {0}; + int32_t compressBufLen = JSON_BUFFER_SIZE; + int ret = httpGzipCompress(buf->pContext, buf->buf, srcLen, compressBuf, &compressBufLen, isTheLast); + if (ret == 0) { + if (compressBufLen > 0) { + int len = sprintf(sLen, "%x\r\n", compressBufLen); + httpTrace("context:%p, fd:%d, ip:%s, write body, chunkSize:%ld, compressSize:%d, last:%d, response:\n%s", + buf->pContext, buf->pContext->fd, buf->pContext->ipstr, srcLen, compressBufLen, isTheLast, buf->buf); + httpWriteBufNoTrace(buf->pContext, sLen, len); + remain = httpWriteBufNoTrace(buf->pContext, (const char *) compressBuf, (int) compressBufLen); + } else { + httpTrace("context:%p, fd:%d, ip:%s, last:%d, compress already dumped, response:\n%s", + buf->pContext, buf->pContext->fd, buf->pContext->ipstr, isTheLast, buf->buf); + return 0; // there is no data to dump. + } + } else { + httpError("context:%p, fd:%d, ip:%s, failed to compress data, chunkSize:%d, last:%d, error:%d, response:\n%s", + buf->pContext, buf->pContext->fd, buf->pContext->ipstr, srcLen, isTheLast, ret, buf->buf); + return 0; + } } - httpWriteBuf(buf->pContext, "\r\n", 2); - buf->total += (int)(buf->lst - buf->buf); + httpWriteBufNoTrace(buf->pContext, "\r\n", 2); + buf->total += (int) (buf->lst - buf->buf); buf->lst = buf->buf; - memset(buf->buf, 0, (size_t)buf->size); - - return remain; // remain>0 is system error + memset(buf->buf, 0, (size_t) buf->size); + return remain; } void httpWriteJsonBufHead(JsonBuf* buf) { @@ -147,7 +162,7 @@ void httpWriteJsonBufHead(JsonBuf* buf) { char msg[1024] = {0}; int len = -1; - if (buf->pContext->compress == JsonUnCompress) { + if (buf->pContext->acceptEncoding == HTTP_COMPRESS_IDENTITY) { len = sprintf(msg, httpRespTemplate[HTTP_RESPONSE_CHUNKED_UN_COMPRESS], httpVersionStr[buf->pContext->httpVersion], httpKeepAliveStr[buf->pContext->httpKeepAlive]); } else { @@ -164,8 +179,8 @@ void httpWriteJsonBufEnd(JsonBuf* buf) { buf->pContext->fd = -1; } - httpWriteJsonBufBody(buf); - httpWriteBuf(buf->pContext, "0\r\n\r\n", 5); // end of chunked resp + httpWriteJsonBufBody(buf, true); + httpWriteBufNoTrace(buf->pContext, "0\r\n\r\n", 5); // end of chunked resp } void httpInitJsonBuf(JsonBuf* buf, struct HttpContext* pContext) { @@ -175,8 +190,11 @@ void httpInitJsonBuf(JsonBuf* buf, struct HttpContext* pContext) { buf->pContext = pContext; memset(buf->lst, 0, JSON_BUFFER_SIZE); - httpTrace("context:%p, fd:%d, ip:%s, json buffer initialized", buf->pContext, buf->pContext->fd, - buf->pContext->ipstr); + if (pContext->acceptEncoding == HTTP_COMPRESS_GZIP) { + httpGzipCompressInit(buf->pContext); + } + + httpTrace("context:%p, fd:%d, ip:%s, json buffer initialized", buf->pContext, buf->pContext->fd, buf->pContext->ipstr); } void httpJsonItemToken(JsonBuf* buf) { @@ -363,7 +381,7 @@ void httpJsonArray(JsonBuf* buf, httpJsonBuilder fnBuilder, void* jsonHandle) { void httpJsonTestBuf(JsonBuf* buf, int safety) { if ((buf->lst - buf->buf + safety) < buf->size) return; // buf->slot = *buf->lst; - httpWriteJsonBufBody(buf); + httpWriteJsonBufBody(buf, false); } void httpJsonToken(JsonBuf* buf, char c) { @@ -377,7 +395,7 @@ void httpJsonPrint(JsonBuf* buf, const char* json, int len) { } if (len > buf->size) { - httpWriteJsonBufBody(buf); + httpWriteJsonBufBody(buf, false); httpJsonPrint(buf, json, len); // buf->slot = json[len - 1]; return; diff --git a/src/modules/http/src/httpResp.c b/src/modules/http/src/httpResp.c index b5387b7cdba095e3d3e17a8ea360814de90532e5..186f10ff6ce0df52f8448fcb0a26b293b9de8048 100644 --- a/src/modules/http/src/httpResp.c +++ b/src/modules/http/src/httpResp.c @@ -36,7 +36,7 @@ const char *httpRespTemplate[] = { "%s %d %s\r\nAccess-Control-Allow-Origin:*\r\n%sContent-Type: application/json;charset=utf-8\r\nContent-Length: %d\r\n\r\n", // HTTP_RESPONSE_CHUNKED_UN_COMPRESS, HTTP_RESPONSE_CHUNKED_COMPRESS "%s 200 OK\r\nAccess-Control-Allow-Origin:*\r\n%sContent-Type: application/json;charset=utf-8\r\nTransfer-Encoding: chunked\r\n\r\n", - "%s 200 OK\r\nAccess-Control-Allow-Origin:*\r\n%sContent-Type: application/json;charset=utf-8\r\nContent-Encoding: deflate\r\nTransfer-Encoding: chunked\r\n\r\n", + "%s 200 OK\r\nAccess-Control-Allow-Origin:*\r\n%sContent-Type: application/json;charset=utf-8\r\nContent-Encoding: gzip\r\nTransfer-Encoding: chunked\r\n\r\n", // HTTP_RESPONSE_OPTIONS "%s 200 OK\r\nAccess-Control-Allow-Origin:*\r\n%sContent-Type: application/json;charset=utf-8\r\nContent-Length: %d\r\nAccess-Control-Allow-Methods: *\r\nAccess-Control-Max-Age: 3600\r\nAccess-Control-Allow-Headers: Origin, X-Requested-With, Content-Type, Accept, authorization\r\n\r\n", // HTTP_RESPONSE_GRAFANA diff --git a/src/modules/http/src/httpServer.c b/src/modules/http/src/httpServer.c index 5bd3a5122a9fca8b0c7eca79f817347e82efc37c..4c20297f77e1205c625ffbbaa76a482a343785ae 100644 --- a/src/modules/http/src/httpServer.c +++ b/src/modules/http/src/httpServer.c @@ -158,7 +158,8 @@ bool httpInitContext(HttpContext *pContext) { pContext->httpVersion = HTTP_VERSION_10; pContext->httpKeepAlive = HTTP_KEEPALIVE_NO_INPUT; pContext->httpChunked = HTTP_UNCUNKED; - pContext->compress = JsonUnCompress; + pContext->acceptEncoding = HTTP_COMPRESS_IDENTITY; + pContext->contentEncoding = HTTP_COMPRESS_IDENTITY; pContext->usedByEpoll = 1; pContext->usedByApp = 0; pContext->reqType = HTTP_REQTYPE_OTHERS; @@ -326,6 +327,30 @@ bool httpReadDataImp(HttpContext *pContext) { return true; } +bool httpUnCompressData(HttpContext *pContext) { + if (pContext->contentEncoding == HTTP_COMPRESS_GZIP) { + char *decompressBuf = calloc(HTTP_DECOMPRESS_BUF_SIZE, 1); + int32_t decompressBufLen = pContext->parser.bufsize; + + int ret = httpGzipDeCompress(pContext->parser.data.pos, pContext->parser.data.len, decompressBuf, &decompressBufLen); + + if (ret == 0) { + memcpy(pContext->parser.data.pos, decompressBuf, decompressBufLen); + pContext->parser.data.pos[decompressBufLen] = 0; + httpDump("context:%p, fd:%d, ip:%s, rawSize:%d, decompressSize:%d, content:%s", + pContext, pContext->fd, pContext->ipstr, pContext->parser.data.len, decompressBufLen, decompressBuf); + } else { + httpError("context:%p, fd:%d, ip:%s, failed to decompress data, rawSize:%d, error:%d", + pContext, pContext->fd, pContext->ipstr, pContext->parser.data.len, ret); + return false; + } + } else { + httpDump("context:%p, fd:%d, ip:%s, content:%s", pContext, pContext->fd, pContext->ipstr, pContext->parser.data.pos); + } + + return true; +} + bool httpReadData(HttpThread *pThread, HttpContext *pContext) { if (!pContext->parsed) { httpInitContext(pContext); @@ -350,9 +375,12 @@ bool httpReadData(HttpThread *pThread, HttpContext *pContext) { return false; } else if (ret == HTTP_CHECK_BODY_SUCCESS){ httpCleanUpContextTimer(pContext); - httpDump("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, body:\n%s", - pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->pThread->numOfFds, pContext->parser.data.pos); - return true; + if (httpUnCompressData(pContext)) { + return true; + } else { + httpCloseContextByServer(pThread, pContext); + return false; + } } else { httpCleanUpContextTimer(pContext); httpError("context:%p, fd:%d, ip:%s, failed to read http body, close connect", pContext, pContext->fd, pContext->ipstr); @@ -568,7 +596,7 @@ bool httpInitConnect(HttpServer *pServer) { } if (pthread_cond_init(&(pThread->fdReady), NULL) != 0) { - httpError("http thread:%s, init HTTP condition variable failed, reason:%s\n", pThread->label, strerror(errno)); + httpError("http thread:%s, init HTTP condition variable failed, reason:%s", pThread->label, strerror(errno)); return false; } diff --git a/src/modules/http/src/httpSql.c b/src/modules/http/src/httpSql.c index f37dc77543533405b1326f5241db6f81fa765d6d..91e355ba1565d120c8d76c8014185de2eb524ac5 100644 --- a/src/modules/http/src/httpSql.c +++ b/src/modules/http/src/httpSql.c @@ -289,7 +289,7 @@ void httpProcessSingleSqlCmd(HttpContext *pContext) { return; } - httpDump("context:%p, fd:%d, ip:%s, user:%s, sql:%s, start query", pContext, pContext->fd, pContext->ipstr, + httpDump("context:%p, fd:%d, ip:%s, user:%s, start query, sql:%s", pContext, pContext->fd, pContext->ipstr, pContext->user, sql); taos_query_a(pSession->taos, sql, httpProcessSingleSqlCallBack, (void *)pContext); } diff --git a/src/modules/http/src/httpUtil.c b/src/modules/http/src/httpUtil.c index 85cda3239e061544e00caee590f3475578ee1fc9..b0f8b1eb0b37cc292da510fe2a87d79b5db264f2 100644 --- a/src/modules/http/src/httpUtil.c +++ b/src/modules/http/src/httpUtil.c @@ -356,4 +356,95 @@ char *httpGetCmdsString(HttpContext *pContext, int pos) { } return multiCmds->buffer + pos; -} \ No newline at end of file +} + +int httpGzipDeCompress(char *srcData, int32_t nSrcData, char *destData, int32_t *nDestData) { + int err = 0; + z_stream gzipStream = {0}; + + static char dummyHead[2] = { + 0x8 + 0x7 * 0x10, + (((0x8 + 0x7 * 0x10) * 0x100 + 30) / 31 * 31) & 0xFF, + }; + + gzipStream.zalloc = (alloc_func) 0; + gzipStream.zfree = (free_func) 0; + gzipStream.opaque = (voidpf) 0; + gzipStream.next_in = (Bytef *) srcData; + gzipStream.avail_in = 0; + gzipStream.next_out = (Bytef *) destData; + if (inflateInit2(&gzipStream, 47) != Z_OK) { + return -1; + } + + while (gzipStream.total_out < *nDestData && gzipStream.total_in < nSrcData) { + gzipStream.avail_in = gzipStream.avail_out = nSrcData; //1 + if ((err = inflate(&gzipStream, Z_NO_FLUSH)) == Z_STREAM_END) { + break; + } + + if (err != Z_OK) { + if (err == Z_DATA_ERROR) { + gzipStream.next_in = (Bytef *) dummyHead; + gzipStream.avail_in = sizeof(dummyHead); + if ((err = inflate(&gzipStream, Z_NO_FLUSH)) != Z_OK) { + return -2; + } + } else return -3; + } + } + + if (inflateEnd(&gzipStream) != Z_OK) { + return -4; + } + *nDestData = gzipStream.total_out; + + return 0; +} + +int httpGzipCompressInit(HttpContext *pContext) { + pContext->gzipStream.zalloc = (alloc_func) 0; + pContext->gzipStream.zfree = (free_func) 0; + pContext->gzipStream.opaque = (voidpf) 0; + if (deflateInit2(&pContext->gzipStream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) { + return -1; + } + + return 0; +} + +int httpGzipCompress(HttpContext *pContext, char *srcData, int32_t nSrcData, char *destData, int32_t *nDestData, bool isTheLast) { + int err = 0; + pContext->gzipStream.next_in = (Bytef *) srcData; + pContext->gzipStream.avail_in = (uLong) nSrcData; + pContext->gzipStream.next_out = (Bytef *) destData; + pContext->gzipStream.avail_out = (uLong) (*nDestData); + + while (pContext->gzipStream.avail_in != 0 && pContext->gzipStream.total_out < (uLong) (*nDestData)) { + if (deflate(&pContext->gzipStream, Z_FULL_FLUSH) != Z_OK) { + return -1; + } + } + + if (pContext->gzipStream.avail_in != 0) { + return pContext->gzipStream.avail_in; + } + + if (isTheLast) { + for (;;) { + if ((err = deflate(&pContext->gzipStream, Z_FINISH)) == Z_STREAM_END) { + break; + } + if (err != Z_OK) { + return -2; + } + } + + if (deflateEnd(&pContext->gzipStream) != Z_OK) { + return -3; + } + } + + *nDestData = (int32_t) (pContext->gzipStream.total_out); + return 0; +} diff --git a/src/modules/http/src/restJson.c b/src/modules/http/src/restJson.c index 865e862b0f387b4ab4029cf232e5661ed55abd5b..65d9b3456ec418969aba4dee3ad4ca1b32f0220c 100644 --- a/src/modules/http/src/restJson.c +++ b/src/modules/http/src/restJson.c @@ -105,7 +105,7 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, httpJsonItemToken(jsonBuf); if (row[i] == NULL) { - httpJsonString(jsonBuf, "NULL", 4); + httpJsonOriginString(jsonBuf, "null", 4); continue; } @@ -187,7 +187,7 @@ bool restBuildSqlTimeJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *resu httpJsonItemToken(jsonBuf); if (row[i] == NULL) { - httpJsonString(jsonBuf, "NULL", 4); + httpJsonOriginString(jsonBuf, "null", 4); continue; } diff --git a/src/modules/http/src/tgHandle.c b/src/modules/http/src/tgHandle.c index 3a2832a62523cc8783ab2b9d3dfabf0403eb99fd..250dd27096a5cd7062e1da47b8c989a7fe1c5152 100644 --- a/src/modules/http/src/tgHandle.c +++ b/src/modules/http/src/tgHandle.c @@ -700,8 +700,11 @@ bool tgProcessSingleMetricUseDefaultSchema(HttpContext *pContext, cJSON *metric, // stable name char *stname = name->valuestring; table_cmd->metric = stable_cmd->metric = httpAddToSqlCmdBuffer(pContext, "%s", stname); - table_cmd->stable = stable_cmd->stable = httpAddToSqlCmdBuffer(pContext, "%s", stname); - //httpAddToSqlCmdBuffer(pContext, "%s_%d_%d", stname, fieldsSize, orderTagsLen); + if (tsTelegrafUseFieldNum == 0) { + table_cmd->stable = stable_cmd->stable = httpAddToSqlCmdBuffer(pContext, "%s", stname); + } else { + table_cmd->stable = stable_cmd->stable = httpAddToSqlCmdBuffer(pContext, "%s_%d_%d", stname, fieldsSize, orderTagsLen); + } table_cmd->stable = stable_cmd->stable = httpShrinkTableName(pContext, table_cmd->stable, httpGetCmdsString(pContext, table_cmd->stable)); @@ -723,9 +726,11 @@ bool tgProcessSingleMetricUseDefaultSchema(HttpContext *pContext, cJSON *metric, } // table name - table_cmd->table = stable_cmd->table = httpAddToSqlCmdBufferNoTerminal(pContext, "%s_%s", stname, host->valuestring); - //httpAddToSqlCmdBufferNoTerminal(pContext, "%s_%d_%d_%s", stname, fieldsSize, orderTagsLen, host->valuestring); - + if (tsTelegrafUseFieldNum == 0) { + table_cmd->table = stable_cmd->table = httpAddToSqlCmdBufferNoTerminal(pContext, "%s_%s", stname, host->valuestring); + } else { + table_cmd->table = stable_cmd->table = httpAddToSqlCmdBufferNoTerminal(pContext, "%s_%d_%d_%s", stname, fieldsSize, orderTagsLen, host->valuestring); + } for (int i = 0; i < orderTagsLen; ++i) { cJSON *tag = orderedTags[i]; if (tag == host) continue; diff --git a/src/util/src/tglobalcfg.c b/src/util/src/tglobalcfg.c index e096eb24745752b2f9ed1c959cb0940a0df14ba0..bd293fc125d2cbbd87c9fd94d4a777d985e350ef 100644 --- a/src/util/src/tglobalcfg.c +++ b/src/util/src/tglobalcfg.c @@ -116,11 +116,11 @@ int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance char tsHttpIp[TSDB_IPv4ADDR_LEN] = "0.0.0.0"; short tsHttpPort = 6020; // only tcp, range tcp[6020] // short tsNginxPort = 6060; //only tcp, range tcp[6060] -int tsHttpCacheSessions = 30; +int tsHttpCacheSessions = 100; int tsHttpSessionExpire = 36000; int tsHttpMaxThreads = 2; int tsHttpEnableCompress = 0; -int tsAdminRowLimit = 10240; +int tsTelegrafUseFieldNum = 0; char tsMonitorDbName[] = "log"; int tsMonitorInterval = 30; // seconds @@ -513,8 +513,12 @@ void tsInitGlobalConfig() { // http configs tsInitConfigOption(cfg++, "httpCacheSessions", &tsHttpCacheSessions, TSDB_CFG_VTYPE_INT, TSDB_CFG_CTYPE_B_CONFIG, 1, 100000, 0, TSDB_CFG_UTYPE_NONE); + tsInitConfigOption(cfg++, "telegrafUseFieldNum", &tsTelegrafUseFieldNum, TSDB_CFG_VTYPE_INT, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, 0, + 1, 1, TSDB_CFG_UTYPE_NONE); tsInitConfigOption(cfg++, "httpMaxThreads", &tsHttpMaxThreads, TSDB_CFG_VTYPE_INT, TSDB_CFG_CTYPE_B_CONFIG, 1, 1000000, 0, TSDB_CFG_UTYPE_NONE); + tsInitConfigOption(cfg++, "httpEnableCompress", &tsHttpEnableCompress, TSDB_CFG_VTYPE_INT, TSDB_CFG_CTYPE_B_CONFIG, 0, + 1, 1, TSDB_CFG_UTYPE_NONE); // debug flag tsInitConfigOption(cfg++, "numOfLogLines", &tsNumOfLogLines, TSDB_CFG_VTYPE_INT,