Commit 8133a76a authored by slguan

fix issue #282

Parent 1b00a4a8
@@ -129,6 +129,7 @@ extern int tsHttpCacheSessions;
 extern int tsHttpSessionExpire;
 extern int tsHttpMaxThreads;
 extern int tsHttpEnableCompress;
+extern int tsTelegrafUseFieldNum;
 extern int tsAdminRowLimit;
 extern char tsMonitorDbName[];
......
@@ -22,16 +22,18 @@
 #include "tmempool.h"
 #include "tsdb.h"
 #include "tutil.h"
+#include "zlib.h"
 #include "http.h"
 #include "httpJson.h"
-#define HTTP_MAX_CMD_SIZE 1024*20
-#define HTTP_MAX_BUFFER_SIZE 1024*1024*10
+#define HTTP_MAX_CMD_SIZE 1024
+#define HTTP_MAX_BUFFER_SIZE 1024*1024
 #define HTTP_LABEL_SIZE 8
 #define HTTP_MAX_EVENTS 10
-#define HTTP_BUFFER_SIZE 1024*70 //70k
+#define HTTP_BUFFER_SIZE 1024*65 //65k
+#define HTTP_DECOMPRESS_BUF_SIZE 1024*64
 #define HTTP_STEP_SIZE 1024 //http message get process step by step
 #define HTTP_MAX_URL 5 //http url stack size
 #define HTTP_METHOD_SCANNER_SIZE 7 //http method fp size
@@ -64,12 +66,15 @@
 #define HTTP_CHECK_BODY_CONTINUE 0
 #define HTTP_CHECK_BODY_SUCCESS 1
-#define HTTP_READ_RETRY_TIMES 3
-#define HTTP_READ_WAIT_TIME_MS 3
-#define HTTP_WRITE_RETRY_TIMES 400
+#define HTTP_READ_RETRY_TIMES 5
+#define HTTP_READ_WAIT_TIME_MS 5
+#define HTTP_WRITE_RETRY_TIMES 500
 #define HTTP_WRITE_WAIT_TIME_MS 5
 #define HTTP_EXPIRED_TIME 60000
+#define HTTP_COMPRESS_IDENTITY 0
+#define HTTP_COMPRESS_GZIP 2
 struct HttpContext;
 struct HttpThread;
@@ -147,7 +152,7 @@ typedef struct {
 } HttpBuf;
 typedef struct {
-  char buffer[HTTP_MAX_BUFFER_SIZE];
+  char buffer[HTTP_BUFFER_SIZE];
   int bufsize;
   char *pLast;
   char *pCur;
@@ -167,7 +172,8 @@ typedef struct HttpContext {
   uint8_t httpChunked;
   uint8_t httpKeepAlive; // http1.0 and not keep-alive, close connection immediately
   uint8_t fromMemPool;
-  uint8_t compress;
+  uint8_t acceptEncoding;
+  uint8_t contentEncoding;
   uint8_t usedByEpoll;
   uint8_t usedByApp;
   uint8_t reqType;
@@ -177,6 +183,7 @@ typedef struct HttpContext {
   char pass[TSDB_PASSWORD_LEN];
   void *taos;
   HttpSession *session;
+  z_stream gzipStream;
   HttpEncodeMethod *encodeMethod;
   HttpSqlCmd singleCmd;
   HttpSqlCmds *multiCmds;
@@ -293,6 +300,11 @@ void httpTrimTableName(char *name);
 int httpShrinkTableName(HttpContext *pContext, int pos, char *name);
 char *httpGetCmdsString(HttpContext *pContext, int pos);
+int httpGzipDeCompress(char *srcData, int32_t nSrcData, char *destData, int32_t *nDestData);
+int httpGzipCompressInit(HttpContext *pContext);
+int httpGzipCompress(HttpContext *pContext, char *inSrcData, int32_t inSrcDataLen,
+                     char *outDestData, int32_t *outDestDataLen, bool isTheLast);
 extern const char *httpKeepAliveStr[];
 extern const char *httpVersionStr[];
......
@@ -17,14 +17,13 @@
 #define TDENGINE_HTTP_JSON_H
 #include <stdint.h>
+#include <stdbool.h>
-#define JSON_BUFFER_SIZE 4096
+#define JSON_BUFFER_SIZE 10240
 struct HttpContext;
 enum { JsonNumber, JsonString, JsonBoolean, JsonArray, JsonObject, JsonNull };
-typedef enum { JsonCompress, JsonUnCompress } JsonCompressFlag;
 extern char JsonItmTkn;
 extern char JsonObjStt;
 extern char JsonObjEnd;
@@ -47,6 +46,7 @@ typedef struct {
 // http response
 int httpWriteBuf(struct HttpContext* pContext, const char* buf, int sz);
+int httpWriteBufNoTrace(struct HttpContext* pContext, const char* buf, int sz);
 int httpWriteBufByFd(struct HttpContext* pContext, const char* buf, int sz);
 // builder callback
@@ -55,11 +55,12 @@ typedef void (*httpJsonBuilder)(JsonBuf* buf, void* jsnHandle);
 // buffer
 void httpInitJsonBuf(JsonBuf* buf, struct HttpContext* pContext);
 void httpWriteJsonBufHead(JsonBuf* buf);
-int httpWriteJsonBufBody(JsonBuf* buf);
+int httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast);
 void httpWriteJsonBufEnd(JsonBuf* buf);
 // value
 void httpJsonString(JsonBuf* buf, char* sVal, int len);
+void httpJsonOriginString(JsonBuf* buf, char* sVal, int len);
 void httpJsonStringForTransMean(JsonBuf* buf, char* SVal, int maxLen);
 void httpJsonInt64(JsonBuf* buf, int64_t num);
 void httpJsonTimestamp(JsonBuf* buf, int64_t t);
......
@@ -175,7 +175,7 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
   for (int i = dataFields; i >= 0; i--) {
     httpJsonItemToken(jsonBuf);
     if (row[i] == NULL) {
-      httpJsonString(jsonBuf, "NULL", 4);
+      httpJsonOriginString(jsonBuf, "null", 4);
       continue;
     }
......
@@ -42,7 +42,7 @@ char* httpMsg[] = {
     "database name too long",
     "invalid telegraf json fromat",
     "metrics size is 0",
-    "metrics size can not more than 20K",  // 26
+    "metrics size can not more than 1K",   // 26
     "metric name not find",
     "metric name type should be string",
     "metric name length is 0",
......
@@ -186,12 +186,22 @@ bool httpParseHead(HttpContext* pContext) {
     pParser->data.len = (int32_t)atoi(pParser->pLast + 16);
     httpTrace("context:%p, fd:%d, ip:%s, Content-Length:%d", pContext, pContext->fd, pContext->ipstr,
               pParser->data.len);
-  } else if (tsHttpEnableCompress && strncasecmp(pParser->pLast, "Accept-Encoding: ", 17) == 0) {
-    if (strstr(pParser->pLast + 17, "deflate") != NULL) {
-      pContext->compress = JsonCompress;
+  } else if (strncasecmp(pParser->pLast, "Accept-Encoding: ", 17) == 0) {
+    if (tsHttpEnableCompress && strstr(pParser->pLast + 17, "gzip") != NULL) {
+      pContext->acceptEncoding = HTTP_COMPRESS_GZIP;
+      httpTrace("context:%p, fd:%d, ip:%s, Accept-Encoding:gzip", pContext, pContext->fd, pContext->ipstr);
+    } else {
+      pContext->acceptEncoding = HTTP_COMPRESS_IDENTITY;
+      httpTrace("context:%p, fd:%d, ip:%s, Accept-Encoding:identity", pContext, pContext->fd, pContext->ipstr);
+    }
+  } else if (strncasecmp(pParser->pLast, "Content-Encoding: ", 18) == 0) {
+    if (strstr(pParser->pLast + 18, "gzip") != NULL) {
+      pContext->contentEncoding = HTTP_COMPRESS_GZIP;
+      httpTrace("context:%p, fd:%d, ip:%s, Content-Encoding:gzip", pContext, pContext->fd, pContext->ipstr);
+    } else {
+      pContext->contentEncoding = HTTP_COMPRESS_IDENTITY;
+      httpTrace("context:%p, fd:%d, ip:%s, Content-Encoding:identity", pContext, pContext->fd, pContext->ipstr);
     }
-    httpTrace("context:%p, fd:%d, ip:%s, Accept-Encoding:%s", pContext, pContext->fd, pContext->ipstr,
-              pContext->compress == JsonCompress ? "deflate" : "identity");
   } else if (strncasecmp(pParser->pLast, "Connection: ", 12) == 0) {
     if (strncasecmp(pParser->pLast + 12, "Keep-Alive", 10) == 0) {
       pContext->httpKeepAlive = HTTP_KEEPALIVE_ENABLE;
@@ -312,7 +322,7 @@ bool httpParseRequest(HttpContext* pContext) {
     return true;
   }
-  httpDump("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, read size:%d, raw data:\n%s",
+  httpTrace("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, read size:%d, raw data:\n%s",
            pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->pThread->numOfFds,
            pContext->parser.bufsize, pContext->parser.buffer);
......
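For reference, the offsets 17 and 18 in the header parsing above are simply the lengths of the literal prefixes "Accept-Encoding: " and "Content-Encoding: ", so pParser->pLast + offset points at the header value. The tiny standalone check below is only an illustration of that strncasecmp/strstr pattern (it is not part of the commit; the sample header line is made up).

#include <stdio.h>
#include <string.h>
#include <strings.h>

int main(void) {
  const char *line = "Content-Encoding: gzip";  /* sample header line as a client would send it */
  if (strncasecmp(line, "Content-Encoding: ", 18) == 0 &&
      strstr(line + 18, "gzip") != NULL) {
    printf("request body is gzip-compressed\n");
  }
  /* the offsets hard-coded in httpParseHead: */
  printf("%zu %zu\n", strlen("Accept-Encoding: "), strlen("Content-Encoding: "));  /* prints: 17 18 */
  return 0;
}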
@@ -72,71 +72,86 @@ int httpWriteBufByFd(struct HttpContext* pContext, const char* buf, int sz) {
   return writeLen;
 }
-int httpWriteBuf(struct HttpContext* pContext, const char* buf, int sz) {
+int httpWriteBuf(struct HttpContext *pContext, const char *buf, int sz) {
   int writeSz = httpWriteBufByFd(pContext, buf, sz);
   if (writeSz != sz) {
-    httpError("context:%p, fd:%d, ip:%s, size:%d, write size:%d, failed to send response:\n%s",
+    httpError("context:%p, fd:%d, ip:%s, dataSize:%d, writeSize:%d, failed to send response:\n%s",
              pContext, pContext->fd, pContext->ipstr, sz, writeSz, buf);
   } else {
-    httpTrace("context:%p, fd:%d, ip:%s, size:%d, write size:%d, response:\n%s",
+    httpTrace("context:%p, fd:%d, ip:%s, dataSize:%d, writeSize:%d, response:\n%s",
              pContext, pContext->fd, pContext->ipstr, sz, writeSz, buf);
   }
   return writeSz;
 }
-int httpWriteJsonBufBody(JsonBuf* buf) {
+int httpWriteBufNoTrace(struct HttpContext *pContext, const char *buf, int sz) {
+  int writeSz = httpWriteBufByFd(pContext, buf, sz);
+  if (writeSz != sz) {
+    httpError("context:%p, fd:%d, ip:%s, dataSize:%d, writeSize:%d, failed to send response",
+              pContext, pContext->fd, pContext->ipstr, sz, writeSz);
+  }
+  return writeSz;
+}
+int httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) {
   int remain = 0;
+  char sLen[24];
+  uint64_t srcLen = (uint64_t) (buf->lst - buf->buf);
   if (buf->pContext->fd <= 0) {
-    httpTrace("context:%p, fd:%d, ip:%s, write json body error", buf->pContext, buf->pContext->fd,
-              buf->pContext->ipstr);
+    httpTrace("context:%p, fd:%d, ip:%s, write json body error", buf->pContext, buf->pContext->fd, buf->pContext->ipstr);
     buf->pContext->fd = -1;
   }
-  if (buf->lst == buf->buf) {
-    httpTrace("context:%p, fd:%d, ip:%s, no data need dump", buf->pContext, buf->pContext->fd, buf->pContext->ipstr);
-    return 0; // there is no data to dump.
-  }
-  char sLen[24];
-  uint64_t srcLen = (uint64_t)(buf->lst - buf->buf);
-  /*HTTP servers often use compression to optimize transmission, for example
-   * with Content-Encoding: gzip or Content-Encoding: deflate. If both
-   * compression and chunked encoding are enabled, then the content stream is
-   * first compressed, then chunked; so the chunk encoding itself is not
-   * compressed, and the data in each chunk is not compressed individually. The
-   * remote endpoint then decodes the stream by concatenating the chunks and
-   * uncompressing the result.*/
-  if (buf->pContext->compress == JsonUnCompress) {
-    int len = sprintf(sLen, "%lx\r\n", srcLen);
-    httpTrace("context:%p, fd:%d, ip:%s, write json body, chunk size:%lld", buf->pContext, buf->pContext->fd,
-              buf->pContext->ipstr, srcLen);
-    httpWriteBuf(buf->pContext, sLen, len); // dump chunk size
-    remain = httpWriteBuf(buf->pContext, buf->buf, (int)srcLen);
-  } else if (buf->pContext->compress == JsonCompress) {
-    // unsigned char compressBuf[JSON_BUFFER_SIZE] = { 0 };
-    // uint64_t compressBufLen = sizeof(compressBuf);
-    // compress(compressBuf, &compressBufLen, (const unsigned char*)buf->buf,
-    // srcLen);
-    // int len = sprintf(sLen, "%lx\r\n", compressBufLen);
-    //
-    // httpTrace("context:%p, fd:%d, ip:%s, write json body, chunk size:%lld,
-    // compress:%ld", buf->pContext, buf->pContext->fd, buf->pContext->ipstr,
-    // srcLen, compressBufLen);
-    // httpWriteBuf(buf->pContext, sLen, len);//dump chunk size
-    // remain = httpWriteBuf(buf->pContext, (const char*)compressBuf,
-    // (int)compressBufLen);
+  /*
+   * HTTP servers often use compression to optimize transmission, for example
+   * with Content-Encoding: gzip or Content-Encoding: deflate.
+   * If both compression and chunked encoding are enabled, then the content stream is first compressed, then chunked;
+   * so the chunk encoding itself is not compressed, and the data in each chunk is not compressed individually.
+   * The remote endpoint then decodes the stream by concatenating the chunks and uncompressing the result.
+   */
+  if (buf->pContext->acceptEncoding == HTTP_COMPRESS_IDENTITY) {
+    if (buf->lst == buf->buf) {
+      httpTrace("context:%p, fd:%d, ip:%s, no data need dump", buf->pContext, buf->pContext->fd, buf->pContext->ipstr);
+      return 0; // there is no data to dump.
+    } else {
+      int len = sprintf(sLen, "%lx\r\n", srcLen);
+      httpTrace("context:%p, fd:%d, ip:%s, write body, chunkSize:%ld, response:\n%s",
                buf->pContext, buf->pContext->fd, buf->pContext->ipstr, srcLen, buf->buf);
+      httpWriteBufNoTrace(buf->pContext, sLen, len);
+      remain = httpWriteBufNoTrace(buf->pContext, buf->buf, (int) srcLen);
+    }
   } else {
+    char compressBuf[JSON_BUFFER_SIZE] = {0};
+    int32_t compressBufLen = JSON_BUFFER_SIZE;
+    int ret = httpGzipCompress(buf->pContext, buf->buf, srcLen, compressBuf, &compressBufLen, isTheLast);
+    if (ret == 0) {
+      if (compressBufLen > 0) {
+        int len = sprintf(sLen, "%x\r\n", compressBufLen);
+        httpTrace("context:%p, fd:%d, ip:%s, write body, chunkSize:%ld, compressSize:%d, last:%d, response:\n%s",
                  buf->pContext, buf->pContext->fd, buf->pContext->ipstr, srcLen, compressBufLen, isTheLast, buf->buf);
+        httpWriteBufNoTrace(buf->pContext, sLen, len);
+        remain = httpWriteBufNoTrace(buf->pContext, (const char *) compressBuf, (int) compressBufLen);
+      } else {
+        httpTrace("context:%p, fd:%d, ip:%s, last:%d, compress already dumped, response:\n%s",
                  buf->pContext, buf->pContext->fd, buf->pContext->ipstr, isTheLast, buf->buf);
+        return 0; // there is no data to dump.
+      }
+    } else {
+      httpError("context:%p, fd:%d, ip:%s, failed to compress data, chunkSize:%d, last:%d, error:%d, response:\n%s",
                buf->pContext, buf->pContext->fd, buf->pContext->ipstr, srcLen, isTheLast, ret, buf->buf);
+      return 0;
+    }
   }
-  httpWriteBuf(buf->pContext, "\r\n", 2);
-  buf->total += (int)(buf->lst - buf->buf);
+  httpWriteBufNoTrace(buf->pContext, "\r\n", 2);
+  buf->total += (int) (buf->lst - buf->buf);
   buf->lst = buf->buf;
-  memset(buf->buf, 0, (size_t)buf->size);
-  return remain;
+  memset(buf->buf, 0, (size_t) buf->size);
+  return remain; // remain>0 is system error
 }
 void httpWriteJsonBufHead(JsonBuf* buf) {
@@ -147,7 +162,7 @@ void httpWriteJsonBufHead(JsonBuf* buf) {
   char msg[1024] = {0};
   int len = -1;
-  if (buf->pContext->compress == JsonUnCompress) {
+  if (buf->pContext->acceptEncoding == HTTP_COMPRESS_IDENTITY) {
     len = sprintf(msg, httpRespTemplate[HTTP_RESPONSE_CHUNKED_UN_COMPRESS], httpVersionStr[buf->pContext->httpVersion],
                   httpKeepAliveStr[buf->pContext->httpKeepAlive]);
   } else {
@@ -164,8 +179,8 @@ void httpWriteJsonBufEnd(JsonBuf* buf) {
     buf->pContext->fd = -1;
   }
-  httpWriteJsonBufBody(buf);
-  httpWriteBuf(buf->pContext, "0\r\n\r\n", 5); // end of chunked resp
+  httpWriteJsonBufBody(buf, true);
+  httpWriteBufNoTrace(buf->pContext, "0\r\n\r\n", 5); // end of chunked resp
 }
 void httpInitJsonBuf(JsonBuf* buf, struct HttpContext* pContext) {
@@ -175,8 +190,11 @@ void httpInitJsonBuf(JsonBuf* buf, struct HttpContext* pContext) {
   buf->pContext = pContext;
   memset(buf->lst, 0, JSON_BUFFER_SIZE);
-  httpTrace("context:%p, fd:%d, ip:%s, json buffer initialized", buf->pContext, buf->pContext->fd,
-            buf->pContext->ipstr);
+  if (pContext->acceptEncoding == HTTP_COMPRESS_GZIP) {
+    httpGzipCompressInit(buf->pContext);
+  }
+  httpTrace("context:%p, fd:%d, ip:%s, json buffer initialized", buf->pContext, buf->pContext->fd, buf->pContext->ipstr);
 }
 void httpJsonItemToken(JsonBuf* buf) {
@@ -363,7 +381,7 @@ void httpJsonArray(JsonBuf* buf, httpJsonBuilder fnBuilder, void* jsonHandle) {
 void httpJsonTestBuf(JsonBuf* buf, int safety) {
   if ((buf->lst - buf->buf + safety) < buf->size) return;
   // buf->slot = *buf->lst;
-  httpWriteJsonBufBody(buf);
+  httpWriteJsonBufBody(buf, false);
 }
 void httpJsonToken(JsonBuf* buf, char c) {
@@ -377,7 +395,7 @@ void httpJsonPrint(JsonBuf* buf, const char* json, int len) {
 }
   if (len > buf->size) {
-    httpWriteJsonBufBody(buf);
+    httpWriteJsonBufBody(buf, false);
     httpJsonPrint(buf, json, len);
     // buf->slot = json[len - 1];
     return;
......
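The httpWriteJsonBufBody changes above follow standard chunked framing: each flush writes the payload length in hex, CRLF, the payload, CRLF, and httpWriteJsonBufEnd terminates the response with the zero-length chunk "0\r\n\r\n". When gzip is negotiated, the payload of a chunk is the deflate output produced by that flush, so the hex length always counts the bytes actually sent. A minimal standalone sketch of that framing follows; writeChunk and the sample body are hypothetical and not part of the commit.

#include <stdio.h>
#include <string.h>

static void writeChunk(FILE *out, const char *payload, size_t len) {
  fprintf(out, "%zx\r\n", len);   /* chunk size in hex, like sprintf(sLen, "%lx\r\n", srcLen) */
  fwrite(payload, 1, len, out);   /* chunk data: raw JSON, or gzip output when compression is on */
  fputs("\r\n", out);             /* chunk terminator */
}

int main(void) {
  const char *body = "{\"status\":\"succ\"}";
  writeChunk(stdout, body, strlen(body));
  fputs("0\r\n\r\n", stdout);     /* end of chunked response, as in httpWriteJsonBufEnd */
  return 0;
}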
@@ -36,7 +36,7 @@ const char *httpRespTemplate[] = {
     "%s %d %s\r\nAccess-Control-Allow-Origin:*\r\n%sContent-Type: application/json;charset=utf-8\r\nContent-Length: %d\r\n\r\n",
     // HTTP_RESPONSE_CHUNKED_UN_COMPRESS, HTTP_RESPONSE_CHUNKED_COMPRESS
     "%s 200 OK\r\nAccess-Control-Allow-Origin:*\r\n%sContent-Type: application/json;charset=utf-8\r\nTransfer-Encoding: chunked\r\n\r\n",
-    "%s 200 OK\r\nAccess-Control-Allow-Origin:*\r\n%sContent-Type: application/json;charset=utf-8\r\nContent-Encoding: deflate\r\nTransfer-Encoding: chunked\r\n\r\n",
+    "%s 200 OK\r\nAccess-Control-Allow-Origin:*\r\n%sContent-Type: application/json;charset=utf-8\r\nContent-Encoding: gzip\r\nTransfer-Encoding: chunked\r\n\r\n",
     // HTTP_RESPONSE_OPTIONS
     "%s 200 OK\r\nAccess-Control-Allow-Origin:*\r\n%sContent-Type: application/json;charset=utf-8\r\nContent-Length: %d\r\nAccess-Control-Allow-Methods: *\r\nAccess-Control-Max-Age: 3600\r\nAccess-Control-Allow-Headers: Origin, X-Requested-With, Content-Type, Accept, authorization\r\n\r\n",
     // HTTP_RESPONSE_GRAFANA
......
@@ -158,7 +158,8 @@ bool httpInitContext(HttpContext *pContext) {
   pContext->httpVersion = HTTP_VERSION_10;
   pContext->httpKeepAlive = HTTP_KEEPALIVE_NO_INPUT;
   pContext->httpChunked = HTTP_UNCUNKED;
-  pContext->compress = JsonUnCompress;
+  pContext->acceptEncoding = HTTP_COMPRESS_IDENTITY;
+  pContext->contentEncoding = HTTP_COMPRESS_IDENTITY;
   pContext->usedByEpoll = 1;
   pContext->usedByApp = 0;
   pContext->reqType = HTTP_REQTYPE_OTHERS;
@@ -326,6 +327,30 @@ bool httpReadDataImp(HttpContext *pContext) {
   return true;
 }
+bool httpUnCompressData(HttpContext *pContext) {
+  if (pContext->contentEncoding == HTTP_COMPRESS_GZIP) {
+    char *decompressBuf = calloc(HTTP_DECOMPRESS_BUF_SIZE, 1);
+    int32_t decompressBufLen = pContext->parser.bufsize;
+    int ret = httpGzipDeCompress(pContext->parser.data.pos, pContext->parser.data.len, decompressBuf, &decompressBufLen);
+    if (ret == 0) {
+      memcpy(pContext->parser.data.pos, decompressBuf, decompressBufLen);
+      pContext->parser.data.pos[decompressBufLen] = 0;
+      httpDump("context:%p, fd:%d, ip:%s, rawSize:%d, decompressSize:%d, content:%s",
+               pContext, pContext->fd, pContext->ipstr, pContext->parser.data.len, decompressBufLen, decompressBuf);
+    } else {
+      httpError("context:%p, fd:%d, ip:%s, failed to decompress data, rawSize:%d, error:%d",
+                pContext, pContext->fd, pContext->ipstr, pContext->parser.data.len, ret);
+      return false;
+    }
+  } else {
+    httpDump("context:%p, fd:%d, ip:%s, content:%s", pContext, pContext->fd, pContext->ipstr, pContext->parser.data.pos);
+  }
+  return true;
+}
 bool httpReadData(HttpThread *pThread, HttpContext *pContext) {
   if (!pContext->parsed) {
     httpInitContext(pContext);
@@ -350,9 +375,12 @@ bool httpReadData(HttpThread *pThread, HttpContext *pContext) {
       return false;
     } else if (ret == HTTP_CHECK_BODY_SUCCESS){
       httpCleanUpContextTimer(pContext);
-      httpDump("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, body:\n%s",
               pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->pThread->numOfFds, pContext->parser.data.pos);
-      return true;
+      if (httpUnCompressData(pContext)) {
+        return true;
+      } else {
+        httpCloseContextByServer(pThread, pContext);
+        return false;
+      }
     } else {
       httpCleanUpContextTimer(pContext);
       httpError("context:%p, fd:%d, ip:%s, failed to read http body, close connect", pContext, pContext->fd, pContext->ipstr);
@@ -568,7 +596,7 @@ bool httpInitConnect(HttpServer *pServer) {
     }
     if (pthread_cond_init(&(pThread->fdReady), NULL) != 0) {
-      httpError("http thread:%s, init HTTP condition variable failed, reason:%s\n", pThread->label, strerror(errno));
+      httpError("http thread:%s, init HTTP condition variable failed, reason:%s", pThread->label, strerror(errno));
      return false;
    }
......
@@ -289,7 +289,7 @@ void httpProcessSingleSqlCmd(HttpContext *pContext) {
     return;
   }
-  httpDump("context:%p, fd:%d, ip:%s, user:%s, sql:%s, start query", pContext, pContext->fd, pContext->ipstr,
+  httpDump("context:%p, fd:%d, ip:%s, user:%s, start query, sql:%s", pContext, pContext->fd, pContext->ipstr,
            pContext->user, sql);
   taos_query_a(pSession->taos, sql, httpProcessSingleSqlCallBack, (void *)pContext);
 }
......
@@ -356,4 +356,95 @@ char *httpGetCmdsString(HttpContext *pContext, int pos) {
   }
   return multiCmds->buffer + pos;
 }
\ No newline at end of file
+
+int httpGzipDeCompress(char *srcData, int32_t nSrcData, char *destData, int32_t *nDestData) {
+  int err = 0;
+  z_stream gzipStream = {0};
+  static char dummyHead[2] = {
+      0x8 + 0x7 * 0x10,
+      (((0x8 + 0x7 * 0x10) * 0x100 + 30) / 31 * 31) & 0xFF,
+  };
+
+  gzipStream.zalloc = (alloc_func) 0;
+  gzipStream.zfree = (free_func) 0;
+  gzipStream.opaque = (voidpf) 0;
+  gzipStream.next_in = (Bytef *) srcData;
+  gzipStream.avail_in = 0;
+  gzipStream.next_out = (Bytef *) destData;
+  if (inflateInit2(&gzipStream, 47) != Z_OK) {
+    return -1;
+  }
+
+  while (gzipStream.total_out < *nDestData && gzipStream.total_in < nSrcData) {
+    gzipStream.avail_in = gzipStream.avail_out = nSrcData; //1
+    if ((err = inflate(&gzipStream, Z_NO_FLUSH)) == Z_STREAM_END) {
+      break;
+    }
+    if (err != Z_OK) {
+      if (err == Z_DATA_ERROR) {
+        gzipStream.next_in = (Bytef *) dummyHead;
+        gzipStream.avail_in = sizeof(dummyHead);
+        if ((err = inflate(&gzipStream, Z_NO_FLUSH)) != Z_OK) {
+          return -2;
+        }
+      } else return -3;
+    }
+  }
+
+  if (inflateEnd(&gzipStream) != Z_OK) {
+    return -4;
+  }
+
+  *nDestData = gzipStream.total_out;
+  return 0;
+}
+
+int httpGzipCompressInit(HttpContext *pContext) {
+  pContext->gzipStream.zalloc = (alloc_func) 0;
+  pContext->gzipStream.zfree = (free_func) 0;
+  pContext->gzipStream.opaque = (voidpf) 0;
+  if (deflateInit2(&pContext->gzipStream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
+    return -1;
+  }
+
+  return 0;
+}
+
+int httpGzipCompress(HttpContext *pContext, char *srcData, int32_t nSrcData, char *destData, int32_t *nDestData, bool isTheLast) {
+  int err = 0;
+  pContext->gzipStream.next_in = (Bytef *) srcData;
+  pContext->gzipStream.avail_in = (uLong) nSrcData;
+  pContext->gzipStream.next_out = (Bytef *) destData;
+  pContext->gzipStream.avail_out = (uLong) (*nDestData);
+
+  while (pContext->gzipStream.avail_in != 0 && pContext->gzipStream.total_out < (uLong) (*nDestData)) {
+    if (deflate(&pContext->gzipStream, Z_FULL_FLUSH) != Z_OK) {
+      return -1;
+    }
+  }
+
+  if (pContext->gzipStream.avail_in != 0) {
+    return pContext->gzipStream.avail_in;
+  }
+
+  if (isTheLast) {
+    for (;;) {
+      if ((err = deflate(&pContext->gzipStream, Z_FINISH)) == Z_STREAM_END) {
+        break;
+      }
+      if (err != Z_OK) {
+        return -2;
+      }
+    }
+
+    if (deflateEnd(&pContext->gzipStream) != Z_OK) {
+      return -3;
+    }
+  }
+
+  *nDestData = (int32_t) (pContext->gzipStream.total_out);
+  return 0;
+}
@@ -105,7 +105,7 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
     httpJsonItemToken(jsonBuf);
     if (row[i] == NULL) {
-      httpJsonString(jsonBuf, "NULL", 4);
+      httpJsonOriginString(jsonBuf, "null", 4);
       continue;
     }
@@ -187,7 +187,7 @@ bool restBuildSqlTimeJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *resu
     httpJsonItemToken(jsonBuf);
     if (row[i] == NULL) {
-      httpJsonString(jsonBuf, "NULL", 4);
+      httpJsonOriginString(jsonBuf, "null", 4);
       continue;
     }
......
@@ -700,8 +700,11 @@ bool tgProcessSingleMetricUseDefaultSchema(HttpContext *pContext, cJSON *metric,
   // stable name
   char *stname = name->valuestring;
   table_cmd->metric = stable_cmd->metric = httpAddToSqlCmdBuffer(pContext, "%s", stname);
-  table_cmd->stable = stable_cmd->stable = httpAddToSqlCmdBuffer(pContext, "%s", stname);
-  //httpAddToSqlCmdBuffer(pContext, "%s_%d_%d", stname, fieldsSize, orderTagsLen);
+  if (tsTelegrafUseFieldNum == 0) {
+    table_cmd->stable = stable_cmd->stable = httpAddToSqlCmdBuffer(pContext, "%s", stname);
+  } else {
+    table_cmd->stable = stable_cmd->stable = httpAddToSqlCmdBuffer(pContext, "%s_%d_%d", stname, fieldsSize, orderTagsLen);
+  }
   table_cmd->stable = stable_cmd->stable =
       httpShrinkTableName(pContext, table_cmd->stable, httpGetCmdsString(pContext, table_cmd->stable));
@@ -723,9 +726,11 @@ bool tgProcessSingleMetricUseDefaultSchema(HttpContext *pContext, cJSON *metric,
   }
   // table name
-  table_cmd->table = stable_cmd->table = httpAddToSqlCmdBufferNoTerminal(pContext, "%s_%s", stname, host->valuestring);
-  //httpAddToSqlCmdBufferNoTerminal(pContext, "%s_%d_%d_%s", stname, fieldsSize, orderTagsLen, host->valuestring);
+  if (tsTelegrafUseFieldNum == 0) {
+    table_cmd->table = stable_cmd->table = httpAddToSqlCmdBufferNoTerminal(pContext, "%s_%s", stname, host->valuestring);
+  } else {
+    table_cmd->table = stable_cmd->table = httpAddToSqlCmdBufferNoTerminal(pContext, "%s_%d_%d_%s", stname, fieldsSize, orderTagsLen, host->valuestring);
+  }
   for (int i = 0; i < orderTagsLen; ++i) {
     cJSON *tag = orderedTags[i];
     if (tag == host) continue;
......
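The tsTelegrafUseFieldNum switch in the hunk above only changes how the Telegraf handler names tables, following the two sprintf formats visible in the diff. A quick illustration with made-up values (stname, host, fieldsSize, and orderTagsLen are placeholders, not data from the commit):

#include <stdio.h>

int main(void) {
  const char *stname = "cpu";   /* hypothetical Telegraf metric name */
  const char *host = "host01";  /* hypothetical host tag value */
  int fieldsSize = 10, orderTagsLen = 3;

  /* option off: super table = metric name, child table = metric_host */
  printf("telegrafUseFieldNum=0: stable=%s, table=%s_%s\n", stname, stname, host);
  /* option on: field and tag counts are appended to both names */
  printf("telegrafUseFieldNum=1: stable=%s_%d_%d, table=%s_%d_%d_%s\n",
         stname, fieldsSize, orderTagsLen, stname, fieldsSize, orderTagsLen, host);
  return 0;
}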
@@ -116,11 +116,11 @@ int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
 char tsHttpIp[TSDB_IPv4ADDR_LEN] = "0.0.0.0";
 short tsHttpPort = 6020; // only tcp, range tcp[6020]
 // short tsNginxPort = 6060; //only tcp, range tcp[6060]
-int tsHttpCacheSessions = 30;
+int tsHttpCacheSessions = 100;
 int tsHttpSessionExpire = 36000;
 int tsHttpMaxThreads = 2;
 int tsHttpEnableCompress = 0;
-int tsAdminRowLimit = 10240;
+int tsTelegrafUseFieldNum = 0;
 char tsMonitorDbName[] = "log";
 int tsMonitorInterval = 30; // seconds
@@ -513,8 +513,12 @@ void tsInitGlobalConfig() {
   // http configs
   tsInitConfigOption(cfg++, "httpCacheSessions", &tsHttpCacheSessions, TSDB_CFG_VTYPE_INT, TSDB_CFG_CTYPE_B_CONFIG, 1,
                      100000, 0, TSDB_CFG_UTYPE_NONE);
+  tsInitConfigOption(cfg++, "telegrafUseFieldNum", &tsTelegrafUseFieldNum, TSDB_CFG_VTYPE_INT, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, 0,
+                     1, 1, TSDB_CFG_UTYPE_NONE);
   tsInitConfigOption(cfg++, "httpMaxThreads", &tsHttpMaxThreads, TSDB_CFG_VTYPE_INT, TSDB_CFG_CTYPE_B_CONFIG, 1,
                      1000000, 0, TSDB_CFG_UTYPE_NONE);
+  tsInitConfigOption(cfg++, "httpEnableCompress", &tsHttpEnableCompress, TSDB_CFG_VTYPE_INT, TSDB_CFG_CTYPE_B_CONFIG, 0,
+                     1, 1, TSDB_CFG_UTYPE_NONE);
   // debug flag
   tsInitConfigOption(cfg++, "numOfLogLines", &tsNumOfLogLines, TSDB_CFG_VTYPE_INT,
......