diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index 4d5552194c08defc85f0a4287e49ac661930d694..1842a42f6c5e973325754e351c379d628e24e894 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -487,7 +487,7 @@ int shellDumpResult(TAOS *con, char *fname, int *error_no) { case TSDB_DATA_TYPE_NCHAR: memset(t_str, 0, TSDB_MAX_BYTES_PER_ROW); memcpy(t_str, row[i], fields[i].bytes); - fprintf(fp, "%s", t_str); + fprintf(fp, "\'%s\'", t_str); break; case TSDB_DATA_TYPE_TIMESTAMP: #ifdef WINDOWS diff --git a/src/modules/http/inc/httpHandle.h b/src/modules/http/inc/httpHandle.h index a2f0f02d25160ded9d74630b5c1d76d728e66b20..2d8c5fbf9b1718a66992d2a2c8ea147a2cfb8be1 100644 --- a/src/modules/http/inc/httpHandle.h +++ b/src/modules/http/inc/httpHandle.h @@ -31,7 +31,7 @@ #define HTTP_LABEL_SIZE 8 #define HTTP_MAX_EVENTS 10 -#define HTTP_BUFFER_SIZE 1024*100//100k +#define HTTP_BUFFER_SIZE 1024*70 //70k #define HTTP_STEP_SIZE 1024 //http message get process step by step #define HTTP_MAX_URL 5 //http url stack size #define HTTP_METHOD_SCANNER_SIZE 7 //http method fp size @@ -60,6 +60,13 @@ #define HTTP_PROCESS_ERROR 0 #define HTTP_PROCESS_SUCCESS 1 +#define HTTP_PARSE_BODY_ERROR -1 +#define HTTP_PARSE_BODY_CONTINUE 0 +#define HTTP_PARSE_BODY_SUCCESS 1 + +#define HTTP_RETRY_TIMES 10 +#define HTTP_EXPIRED_TIME 60000 + struct HttpContext; struct HttpThread; @@ -84,7 +91,7 @@ typedef enum { HTTP_CMD_RETURN_TYPE_WITH_RETURN, HTTP_CMD_RETURN_TYPE_NO_RETURN typedef struct { // used by single cmd - char * nativSql; + char *nativSql; int32_t numOfRows; int32_t code; @@ -132,51 +139,53 @@ typedef struct { } HttpEncodeMethod; typedef struct { - char * pos; + char *pos; int32_t len; } HttpBuf; +typedef struct { + char buffer[HTTP_MAX_BUFFER_SIZE]; + int bufsize; + char *pLast; + char *pCur; + HttpBuf method; + HttpBuf path[HTTP_MAX_URL]; // url: dbname/meter/query + HttpBuf data; // body content + HttpBuf token; // auth token + HttpDecodeMethod *pMethod; +} HttpParser; + typedef struct HttpContext { void * signature; int fd; uint32_t accessTimes; uint32_t lastAccessTime; - uint8_t httpVersion : 1; - uint8_t httpChunked : 1; - uint8_t httpKeepAlive : 2; // http1.0 and not keep-alive, close connection immediately - uint8_t fromMemPool : 1; - uint8_t compress : 1; - uint8_t usedByEpoll : 1; - uint8_t usedByApp : 1; + uint8_t httpVersion; + uint8_t httpChunked; + uint8_t httpKeepAlive; // http1.0 and not keep-alive, close connection immediately + uint8_t fromMemPool; + uint8_t compress; + uint8_t usedByEpoll; + uint8_t usedByApp; uint8_t reqType; + uint8_t parsed; char ipstr[22]; char user[TSDB_USER_LEN]; // parsed from auth token or login message char pass[TSDB_PASSWORD_LEN]; - void * taos; + void *taos; HttpSession *session; - HttpEncodeMethod * encodeMethod; + HttpEncodeMethod *encodeMethod; HttpSqlCmd singleCmd; - HttpSqlCmds * multiCmds; - JsonBuf * jsonBuf; + HttpSqlCmds *multiCmds; + JsonBuf *jsonBuf; pthread_mutex_t mutex; - struct HttpThread * pThread; - struct HttpContext *prev, *next; + HttpParser parser; + void *readTimer; + struct HttpThread *pThread; + struct HttpContext *prev; + struct HttpContext *next; } HttpContext; -typedef struct { - char * buffer; - int bufsize; - char * pLast; - char * pCur; - HttpBuf method; - HttpBuf path[HTTP_MAX_URL]; // url: dbname/meter/query - HttpBuf data; // body content - HttpBuf token; // auth token - HttpDecodeMethod *pMethod; -} HttpParser; - -#define HTTP_MAX_FDS_LEN 65536 - typedef struct HttpThread { pthread_t thread; HttpContext * pHead; @@ -186,8 +195,6 @@ typedef struct HttpThread { int numOfFds; int threadId; char label[HTTP_LABEL_SIZE]; - char buffer[HTTP_BUFFER_SIZE]; // buffer to receive data - HttpParser parser; // parse from buffer bool (*processData)(HttpContext *pContext); struct _http_server_obj_ *pServer; // handle passed by upper layer during pServer initialization } HttpThread; @@ -202,15 +209,15 @@ typedef struct _http_server_obj_ { HttpDecodeMethod *methodScanner[HTTP_METHOD_SCANNER_SIZE]; int methodScannerLen; pthread_mutex_t serverMutex; - void * pSessionHash; - void * pContextPool; - void * expireTimer; - HttpThread * pThreads; + void *pSessionHash; + void *pContextPool; + void *expireTimer; + HttpThread *pThreads; pthread_t thread; - bool (*processData)(HttpContext *pContext); - int requestNum; - void *timerHandle; - bool online; + bool (*processData)(HttpContext *pContext); + int requestNum; + void *timerHandle; + bool online; } HttpServer; // http util method @@ -253,6 +260,8 @@ bool httpGenTaosdAuthToken(HttpContext *pContext, char *token, int maxLen); bool httpUrlMatch(HttpContext *pContext, int pos, char *cmp); bool httpProcessData(HttpContext *pContext); bool httpReadDataImp(HttpContext *pContext); +bool httpParseRequest(HttpContext* pContext); +int httpCheckReadCompleted(HttpContext* pContext); // http request handler void httpProcessRequest(HttpContext *pContext); diff --git a/src/modules/http/src/gcHandle.c b/src/modules/http/src/gcHandle.c index fedce0b1075e30aafe6dd4f29d2427c577fa81d0..dfea677ffc3e582726bf8ccfbb44eecaf747da3a 100644 --- a/src/modules/http/src/gcHandle.c +++ b/src/modules/http/src/gcHandle.c @@ -26,7 +26,7 @@ static HttpEncodeMethod gcQueryMethod = { void gcInitHandle(HttpServer* pServer) { httpAddMethod(pServer, &gcDecodeMethod); } bool gcGetUserFromUrl(HttpContext* pContext) { - HttpParser* pParser = &pContext->pThread->parser; + HttpParser* pParser = &pContext->parser; if (pParser->path[GC_USER_URL_POS].len > TSDB_USER_LEN - 1 || pParser->path[GC_USER_URL_POS].len <= 0) { return false; } @@ -36,7 +36,7 @@ bool gcGetUserFromUrl(HttpContext* pContext) { } bool gcGetPassFromUrl(HttpContext* pContext) { - HttpParser* pParser = &pContext->pThread->parser; + HttpParser* pParser = &pContext->parser; if (pParser->path[GC_PASS_URL_POS].len > TSDB_PASSWORD_LEN - 1 || pParser->path[GC_PASS_URL_POS].len <= 0) { return false; } @@ -126,7 +126,7 @@ bool gcProcessLoginRequest(HttpContext* pContext) { bool gcProcessQueryRequest(HttpContext* pContext) { httpTrace("context:%p, fd:%d, ip:%s, process grafana query msg", pContext, pContext->fd, pContext->ipstr); - HttpParser* pParser = &pContext->pThread->parser; + HttpParser* pParser = &pContext->parser; char* filter = pParser->data.pos; if (filter == NULL) { httpSendErrorResp(pContext, HTTP_NO_MSG_INPUT); diff --git a/src/modules/http/src/httpHandle.c b/src/modules/http/src/httpHandle.c index c73aff8bb2621081dcc99a12b432f1f6dd4ca797..7a244348633d0d5400b9653044452b7f951430fb 100644 --- a/src/modules/http/src/httpHandle.c +++ b/src/modules/http/src/httpHandle.c @@ -39,7 +39,7 @@ void httpToLowerUrl(char* url) { } bool httpUrlMatch(HttpContext* pContext, int pos, char* cmp) { - HttpParser* pParser = &pContext->pThread->parser; + HttpParser* pParser = &pContext->parser; if (pos < 0 || pos >= HTTP_MAX_URL) { return false; @@ -58,11 +58,11 @@ bool httpUrlMatch(HttpContext* pContext, int pos, char* cmp) { // /account/db/meter HTTP/1.1\r\nHost bool httpParseURL(HttpContext* pContext) { - HttpParser* pParser = &pContext->pThread->parser; - + HttpParser* pParser = &pContext->parser; char* pSeek; char* pEnd = strchr(pParser->pLast, ' '); if (*pParser->pLast != '/') { + httpSendErrorResp(pContext, HTTP_UNSUPPORT_URL); return false; } pParser->pLast++; @@ -88,12 +88,6 @@ bool httpParseURL(HttpContext* pContext) { } pParser->pLast = pEnd + 1; - // for (int i = 0; i < HTTP_MAX_URL; i++) { - // if (pParser->path[i].len > 0) { - // httpTrace("url_pos: %d, path: [%s]", i, pParser->path[i].pos); - // } - //} - if (pParser->path[0].len == 0) { httpSendErrorResp(pContext, HTTP_UNSUPPORT_URL); return false; @@ -103,9 +97,14 @@ bool httpParseURL(HttpContext* pContext) { } bool httpParseHttpVersion(HttpContext* pContext) { - HttpParser* pParser = &pContext->pThread->parser; - + HttpParser* pParser = &pContext->parser; char* pEnd = strchr(pParser->pLast, '1'); + if (pEnd == NULL) { + httpError("context:%p, fd:%d, ip:%s, can't find http version at position:%s", pContext, pContext->fd, + pContext->ipstr, pParser->pLast); + httpSendErrorResp(pContext, HTTP_PARSE_HTTP_VERSION_ERROR); + return false; + } if (*(pEnd + 1) != '.') { httpError("context:%p, fd:%d, ip:%s, can't find http version at position:%s", pContext, pContext->fd, @@ -129,7 +128,7 @@ bool httpParseHttpVersion(HttpContext* pContext) { } bool httpGetNextLine(HttpContext* pContext) { - HttpParser* pParser = &pContext->pThread->parser; + HttpParser* pParser = &pContext->parser; while (pParser->buffer + pParser->bufsize - pParser->pCur++ > 0) { if (*(pParser->pCur) == '\n' && *(pParser->pCur - 1) == '\r') { // cut the string @@ -144,7 +143,8 @@ bool httpGetNextLine(HttpContext* pContext) { } bool httpGetHttpMethod(HttpContext* pContext) { - HttpParser* pParser = &pContext->pThread->parser; + HttpParser* pParser = &pContext->parser; + char* pSeek = strchr(pParser->pLast, ' '); if (pSeek == NULL) { httpSendErrorResp(pContext, HTTP_PARSE_HTTP_METHOD_ERROR); @@ -160,7 +160,8 @@ bool httpGetHttpMethod(HttpContext* pContext) { } bool httpGetDecodeMethod(HttpContext* pContext) { - HttpParser* pParser = &pContext->pThread->parser; + HttpParser* pParser = &pContext->parser; + HttpServer* pServer = pContext->pThread->pServer; int methodLen = pServer->methodScannerLen; for (int i = 0; i < methodLen; i++) { @@ -180,7 +181,7 @@ bool httpGetDecodeMethod(HttpContext* pContext) { } bool httpParseHead(HttpContext* pContext) { - HttpParser* pParser = &pContext->pThread->parser; + HttpParser* pParser = &pContext->parser; if (strncasecmp(pParser->pLast, "Content-Length: ", 16) == 0) { pParser->data.len = (int32_t)atoi(pParser->pLast + 16); httpTrace("context:%p, fd:%d, ip:%s, Content-Length:%d", pContext, pContext->fd, pContext->ipstr, @@ -258,68 +259,62 @@ bool httpParseChunkedBody(HttpContext* pContext, HttpParser* pParser, bool test) } bool httpReadChunkedBody(HttpContext* pContext, HttpParser* pParser) { - for (int tryTimes = 0; tryTimes < 100; ++tryTimes) { + for (int tryTimes = 0; tryTimes < HTTP_RETRY_TIMES; ++tryTimes) { bool parsedOk = httpParseChunkedBody(pContext, pParser, true); if (parsedOk) { - // httpTrace("context:%p, fd:%d, ip:%s, chunked body read finished", - // pContext, pContext->fd, pContext->ipstr); httpParseChunkedBody(pContext, pParser, false); - return true; + return HTTP_PARSE_BODY_SUCCESS; } else { httpTrace("context:%p, fd:%d, ip:%s, chunked body not finished, continue read", pContext, pContext->fd, pContext->ipstr); if (!httpReadDataImp(pContext)) { httpError("context:%p, fd:%d, ip:%s, read chunked request error", pContext, pContext->fd, pContext->ipstr); - return false; + return HTTP_PARSE_BODY_ERROR; } else { taosMsleep(1); } } } - httpError("context:%p, fd:%d, ip:%s, chunked body parsed error", pContext, pContext->fd, pContext->ipstr); - httpSendErrorResp(pContext, HTTP_PARSE_CHUNKED_BODY_ERROR); - - return false; + httpTrace("context:%p, fd:%d, ip:%s, chunked body not finished, wait epoll", pContext, pContext->fd, pContext->ipstr); + return HTTP_PARSE_BODY_CONTINUE; } -bool httpReadUnChunkedBody(HttpContext* pContext, HttpParser* pParser) { - for (int tryTimes = 0; tryTimes < 100; ++tryTimes) { +int httpReadUnChunkedBody(HttpContext* pContext, HttpParser* pParser) { + for (int tryTimes = 0; tryTimes < HTTP_RETRY_TIMES; ++tryTimes) { int dataReadLen = pParser->bufsize - (int)(pParser->data.pos - pParser->buffer); if (dataReadLen > pParser->data.len) { httpError("context:%p, fd:%d, ip:%s, un-chunked body length invalid, dataReadLen:%d > pContext->data.len:%d", pContext, pContext->fd, pContext->ipstr, dataReadLen, pParser->data.len); httpSendErrorResp(pContext, HTTP_PARSE_BODY_ERROR); - return false; + return HTTP_PARSE_BODY_ERROR; } else if (dataReadLen < pParser->data.len) { httpTrace("context:%p, fd:%d, ip:%s, un-chunked body not finished, dataReadLen:%d < pContext->data.len:%d, continue read", pContext, pContext->fd, pContext->ipstr, dataReadLen, pParser->data.len); if (!httpReadDataImp(pContext)) { httpError("context:%p, fd:%d, ip:%s, read chunked request error", pContext, pContext->fd, pContext->ipstr); - return false; + return HTTP_PARSE_BODY_ERROR; } else { taosMsleep(1); } } else { - return true; + return HTTP_PARSE_BODY_SUCCESS; } } - int dataReadLen = pParser->bufsize - (int)(pParser->data.pos - pParser->buffer); - if (dataReadLen != pParser->data.len) { - httpError("context:%p, fd:%d, ip:%s, un-chunked body length error, dataReadLen:%d != pContext->data.len:%d", - pContext, pContext->fd, pContext->ipstr, dataReadLen, pParser->data.len); - httpSendErrorResp(pContext, HTTP_PARSE_BODY_ERROR); - return false; - } - - httpTrace("context:%p, fd:%d, ip:%s, un-chunked body read over, dataReadLen:%d == pContext->data.len:%d", - pContext, pContext->fd, pContext->ipstr, dataReadLen, pParser->data.len); - return true; + httpTrace("context:%p, fd:%d, ip:%s, un-chunked body not finished, wait epoll", pContext, pContext->fd, pContext->ipstr); + return HTTP_PARSE_BODY_CONTINUE; } bool httpParseRequest(HttpContext* pContext) { - HttpParser* pParser = &pContext->pThread->parser; + HttpParser *pParser = &pContext->parser; + if (pContext->parsed) { + return true; + } + + httpDump("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, read size:%d, raw data:\n%s", + pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->pThread->numOfFds, + pContext->parser.bufsize, pContext->parser.buffer); if (!httpGetHttpMethod(pContext)) { return false; @@ -355,22 +350,31 @@ bool httpParseRequest(HttpContext* pContext) { pParser->pLast = ++pParser->pCur; } while (1); + httpTrace("context:%p, fd:%d, ip:%s, parse http head ok", pContext, pContext->fd, pContext->ipstr); + + pContext->parsed = true; + return true; +} + +int httpCheckReadCompleted(HttpContext* pContext) { + HttpParser *pParser = &pContext->parser; if (pContext->httpChunked == HTTP_UNCUNKED) { - if (!httpReadUnChunkedBody(pContext, pParser)) { - return false; + int ret = httpReadUnChunkedBody(pContext, pParser); + if (ret != HTTP_PARSE_BODY_SUCCESS) { + return ret; } } else { - if (!httpReadChunkedBody(pContext, pParser)) { - return false; + int ret = httpReadChunkedBody(pContext, pParser); + if (ret != HTTP_PARSE_BODY_SUCCESS) { + return ret; } } - httpTrace("context:%p, fd:%d, ip:%s, parse http request ok", pContext, pContext->fd, pContext->ipstr); - return true; + return HTTP_PARSE_BODY_SUCCESS; } bool httpDecodeRequest(HttpContext* pContext) { - HttpParser* pParser = &pContext->pThread->parser; + HttpParser* pParser = &pContext->parser; if (pParser->pMethod->decodeFp == NULL) { return false; } @@ -382,15 +386,10 @@ bool httpDecodeRequest(HttpContext* pContext) { * Process the request from http pServer */ bool httpProcessData(HttpContext* pContext) { - httpInitContext(pContext); - - if (!httpParseRequest(pContext)) { - httpCloseContextByApp(pContext); - return HTTP_PROCESS_ERROR; - } + pContext->usedByApp = 1; // handle Cross-domain request - if (strcmp(pContext->pThread->parser.method.pos, "OPTIONS") == 0) { + if (strcmp(pContext->parser.method.pos, "OPTIONS") == 0) { httpTrace("context:%p, fd:%d, ip:%s, process options request", pContext, pContext->fd, pContext->ipstr); httpSendOptionResp(pContext, "process options request success"); return HTTP_PROCESS_SUCCESS; diff --git a/src/modules/http/src/httpServer.c b/src/modules/http/src/httpServer.c index 79c6a385a8c995c250c7aa92c2184564928f96ed..e253c899a0824bad262591b46d2371d93ee904cf 100644 --- a/src/modules/http/src/httpServer.c +++ b/src/modules/http/src/httpServer.c @@ -37,6 +37,7 @@ #include "tsocket.h" #include "tutil.h" #include "ttime.h" +#include "ttimer.h" #include "http.h" #include "httpCode.h" @@ -95,6 +96,8 @@ void httpFreeContext(HttpServer *pServer, HttpContext *pContext) { void httpCleanUpContext(HttpThread *pThread, HttpContext *pContext) { // for not keep-alive + taosTmrStopA(pContext->readTimer); + if (pContext->fd >= 0) { epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL); taosCloseSocket(pContext->fd); @@ -148,11 +151,15 @@ bool httpInitContext(HttpContext *pContext) { pContext->httpChunked = HTTP_UNCUNKED; pContext->compress = JsonUnCompress; pContext->usedByEpoll = 1; - pContext->usedByApp = 1; + pContext->usedByApp = 0; pContext->reqType = HTTP_REQTYPE_OTHERS; pContext->encodeMethod = NULL; memset(&pContext->singleCmd, 0, sizeof(HttpSqlCmd)); + HttpParser *pParser = &pContext->parser; + memset(pParser, 0, sizeof(HttpParser)); + pParser->pCur = pParser->pLast = pParser->buffer; + httpTrace("context:%p, fd:%d, ip:%s, accessTimes:%d", pContext, pContext->fd, pContext->ipstr, pContext->accessTimes); return true; } @@ -164,6 +171,7 @@ void httpCloseContextByApp(HttpContext *pContext) { } pthread_mutex_lock(&pContext->mutex); + pContext->parsed = false; httpTrace("context:%p, fd:%d, ip:%s, app use finished, usedByEpoll:%d, usedByApp:%d, httpVersion:1.%d, keepAlive:%d", pContext, pContext->fd, pContext->ipstr, pContext->usedByEpoll, pContext->usedByApp, pContext->httpVersion, @@ -189,6 +197,7 @@ void httpCloseContextByServer(HttpThread *pThread, HttpContext *pContext) { } pthread_mutex_lock(&pContext->mutex); pContext->usedByEpoll = 0; + pContext->parsed = false; httpTrace("context:%p, fd:%d, ip:%s, epoll use finished, usedByEpoll:%d, usedByApp:%d", pContext, pContext->fd, pContext->ipstr, pContext->usedByEpoll, pContext->usedByApp); @@ -206,6 +215,12 @@ void httpCloseContextByServer(HttpThread *pThread, HttpContext *pContext) { } } +void httpCloseContextByServerFromTimer(HttpContext *pContext) { + httpError("context:%p, fd:%d, ip:%s, read http body error, time expired", pContext, pContext->fd, pContext->ipstr); + httpSendErrorResp(pContext, HTTP_PARSE_BODY_ERROR); + httpCloseContextByServer(pContext->pThread, pContext); +} + void httpCleanUpConnect(HttpServer *pServer) { int i; HttpThread *pThread; @@ -257,7 +272,7 @@ void httpReadDirtyData(int fd) { } bool httpReadDataImp(HttpContext *pContext) { - HttpParser *pParser = &pContext->pThread->parser; + HttpParser *pParser = &pContext->parser; int blocktimes = 0; while (pParser->bufsize <= (HTTP_BUFFER_SIZE - HTTP_STEP_SIZE)) { @@ -267,17 +282,18 @@ bool httpReadDataImp(HttpContext *pContext) { break; } else if (nread < 0) { if (errno == EINTR) { - if (blocktimes++ > 1000) { - httpError("context:%p, fd:%d, ip:%s, read from socket error:%d, EINTER too many times", - pContext, pContext->fd, pContext->ipstr, errno); + if (blocktimes++ > HTTP_RETRY_TIMES) { + taosMsleep(1); + httpTrace("context:%p, fd:%d, ip:%s, read from socket error:%d, EINTER times:%d", + pContext, pContext->fd, pContext->ipstr, errno, blocktimes); break; } continue; } else if (errno == EAGAIN || errno == EWOULDBLOCK) { - taosMsleep(1); - if (blocktimes++ > 1000) { - httpError("context:%p, fd:%d, ip:%s, read from socket error:%d, EAGAIN too many times", - pContext, pContext->fd, pContext->ipstr, errno); + if (blocktimes++ > HTTP_RETRY_TIMES) { + taosMsleep(1); + httpTrace("context:%p, fd:%d, ip:%s, read from socket error:%d, EAGAIN times:%d", + pContext, pContext->fd, pContext->ipstr, errno, blocktimes); break; } continue; @@ -292,27 +308,51 @@ bool httpReadDataImp(HttpContext *pContext) { if (pParser->bufsize >= (HTTP_BUFFER_SIZE - HTTP_STEP_SIZE)) { httpReadDirtyData(pContext->fd); - httpError("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, request big than:%d", - pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->pThread->numOfFds, - HTTP_BUFFER_SIZE); + httpError("context:%p, fd:%d, ip:%s, thread:%s, request big than:%d", + pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, HTTP_BUFFER_SIZE); httpSendErrorResp(pContext, HTTP_REQUSET_TOO_BIG); return false; } } pParser->buffer[pParser->bufsize] = 0; - httpDump("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, read size:%d, content:\n%s", - pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->pThread->numOfFds, - pParser->bufsize, pParser->buffer); + httpTrace("context:%p, fd:%d, ip:%s, thread:%s, read size:%d", + pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pParser->bufsize); return true; } -bool httpReadData(HttpContext *pContext) { - HttpParser *pParser = &pContext->pThread->parser; - memset(pParser, 0, sizeof(HttpParser)); - pParser->pCur = pParser->pLast = pParser->buffer = pContext->pThread->buffer; - return httpReadDataImp(pContext); +bool httpReadData(HttpThread *pThread, HttpContext *pContext) { + if (!pContext->parsed) { + httpInitContext(pContext); + } + + if (!httpReadDataImp(pContext)) { + httpTrace("context:%p, fd:%d, ip:%s, read data error, close connect", pContext, pContext->fd, pContext->ipstr); + httpCloseContextByServer(pThread, pContext); + return false; + } + + if (!httpParseRequest(pContext)) { + httpTrace("context:%p, fd:%d, ip:%s, failed to parse http head, close connect", pContext, pContext->fd, pContext->ipstr); + httpCloseContextByServer(pThread, pContext); + return false; + } + + int ret = httpCheckReadCompleted(pContext); + if (ret == HTTP_PARSE_BODY_CONTINUE) { + httpTrace("context:%p, fd:%d, ip:%s, not finished yet, try another times", pContext, pContext->fd, pContext->ipstr); + taosTmrReset(httpCloseContextByServerFromTimer, HTTP_EXPIRED_TIME, pContext, pThread->pServer->timerHandle, &pContext->readTimer); + return false; + } else if (ret == HTTP_PARSE_BODY_SUCCESS){ + httpDump("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, content:\n%s", + pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->pThread->numOfFds, pContext->parser->data.pos); + return true; + } else { + httpError("context:%p, fd:%d, ip:%s, failed to read http body, close connect", pContext, pContext->fd, pContext->ipstr); + httpCloseContextByServer(pThread, pContext); + return false; + } } void httpProcessHttpData(void *param) { @@ -377,9 +417,7 @@ void httpProcessHttpData(void *param) { continue; } - if (!httpReadData(pContext)) { - httpTrace("context:%p, fd:%d, ip:%s, read data error", pContext, pContext->fd, pContext->ipstr); - httpCloseContextByServer(pThread, pContext); + if (!httpReadData(pThread, pContext)) { continue; } @@ -406,7 +444,7 @@ void httpAcceptHttpConnection(void *arg) { struct sockaddr_in clientAddr; int sockFd; int threadId = 0; - int connThreshold = 2 * tsHttpCacheSessions / tsHttpMaxThreads; + const int connThreshold = 2 * tsHttpCacheSessions / tsHttpMaxThreads; HttpThread * pThread; HttpServer * pServer; HttpContext * pContext; diff --git a/src/modules/http/src/httpSession.c b/src/modules/http/src/httpSession.c index c13cc4c83b9f71addb9f3bd44298bc53489ddbe0..00d263d792d106ecd738dad39816a72238957380 100644 --- a/src/modules/http/src/httpSession.c +++ b/src/modules/http/src/httpSession.c @@ -87,8 +87,8 @@ void httpRestoreSession(HttpContext *pContext) { pthread_mutex_lock(&server->serverMutex); session->access--; - httpTrace("context:%p, fd:%d, ip:%s, user:%s, restore session:%p:%s:%p, access:%d, expire:%d", - pContext, pContext->fd, pContext->ipstr, pContext->user, session, session->id, session->taos, + httpTrace("context:%p, ip:%s, user:%s, restore session:%p:%s:%p, access:%d, expire:%d", + pContext, pContext->ipstr, pContext->user, session, session->id, session->taos, session->access, pContext->session->expire); pthread_mutex_unlock(&server->serverMutex); } diff --git a/src/modules/http/src/httpSql.c b/src/modules/http/src/httpSql.c index bfd6189539da24503d9ec0d8bad9bcfdf1c08902..f37dc77543533405b1326f5241db6f81fa765d6d 100644 --- a/src/modules/http/src/httpSql.c +++ b/src/modules/http/src/httpSql.c @@ -296,9 +296,8 @@ void httpProcessSingleSqlCmd(HttpContext *pContext) { void httpProcessLoginCmd(HttpContext *pContext) { char token[128] = "current version only supports basic authorization, no token returned"; - httpTrace("context:%p, fd:%d, ip:%s, user:%s, return token:%s", pContext, pContext->fd, pContext->ipstr, - pContext->user, token); - httpTrace("user:%s login from %s via http", pContext->user, pContext->ipstr); + httpTrace("context:%p, fd:%d, ip:%s, user:%s, login via http, return token:%s", + pContext, pContext->fd, pContext->ipstr, pContext->user, token); httpSendSuccResp(pContext, token); } diff --git a/src/modules/http/src/restHandle.c b/src/modules/http/src/restHandle.c index 742b64eee2ff5ebe864bdc8228c4d47b171fd476..fd4318983774c4fa9e2ed8dc01d92bc9e71346ac 100644 --- a/src/modules/http/src/restHandle.c +++ b/src/modules/http/src/restHandle.c @@ -29,7 +29,7 @@ void restInitHandle(HttpServer* pServer) { } bool restGetUserFromUrl(HttpContext* pContext) { - HttpParser* pParser = &pContext->pThread->parser; + HttpParser* pParser = &pContext->parser; if (pParser->path[REST_USER_URL_POS].len > TSDB_USER_LEN - 1 || pParser->path[REST_USER_URL_POS].len <= 0) { return false; } @@ -39,7 +39,7 @@ bool restGetUserFromUrl(HttpContext* pContext) { } bool restGetPassFromUrl(HttpContext* pContext) { - HttpParser* pParser = &pContext->pThread->parser; + HttpParser* pParser = &pContext->parser; if (pParser->path[REST_PASS_URL_POS].len > TSDB_PASSWORD_LEN - 1 || pParser->path[REST_PASS_URL_POS].len <= 0) { return false; } @@ -59,7 +59,7 @@ bool restProcessSqlRequest(HttpContext* pContext, int isSqlT) { httpTrace("context:%p, fd:%d, ip:%s, user:%s, process restful sql msg", pContext, pContext->fd, pContext->ipstr, pContext->user); - char* sql = pContext->pThread->parser.data.pos; + char* sql = pContext->parser.data.pos; if (sql == NULL) { httpSendErrorResp(pContext, HTTP_NO_SQL_INPUT); return false; diff --git a/src/modules/http/src/tgHandle.c b/src/modules/http/src/tgHandle.c index 4fe36dedfd955e324e2d340f935533ad20b79279..3a2832a62523cc8783ab2b9d3dfabf0403eb99fd 100644 --- a/src/modules/http/src/tgHandle.c +++ b/src/modules/http/src/tgHandle.c @@ -447,7 +447,7 @@ void tgInitHandle(HttpServer *pServer) { } bool tgGetUserFromUrl(HttpContext *pContext) { - HttpParser *pParser = &pContext->pThread->parser; + HttpParser *pParser = &pContext->parser; if (pParser->path[TG_USER_URL_POS].len > TSDB_USER_LEN - 1 || pParser->path[TG_USER_URL_POS].len <= 0) { return false; } @@ -457,7 +457,7 @@ bool tgGetUserFromUrl(HttpContext *pContext) { } bool tgGetPassFromUrl(HttpContext *pContext) { - HttpParser *pParser = &pContext->pThread->parser; + HttpParser *pParser = &pContext->parser; if (pParser->path[TG_PASS_URL_POS].len > TSDB_PASSWORD_LEN - 1 || pParser->path[TG_PASS_URL_POS].len <= 0) { return false; } @@ -467,7 +467,7 @@ bool tgGetPassFromUrl(HttpContext *pContext) { } char *tgGetDbFromUrl(HttpContext *pContext) { - HttpParser *pParser = &pContext->pThread->parser; + HttpParser *pParser = &pContext->parser; if (pParser->path[TG_DB_URL_POS].len <= 0) { httpSendErrorResp(pContext, HTTP_TG_DB_NOT_INPUT); return NULL; @@ -1158,7 +1158,7 @@ bool tgProcessSingleMetricUseConfigSchema(HttpContext *pContext, cJSON *metric, bool tgProcessQueryRequest(HttpContext *pContext, char *db) { httpTrace("context:%p, fd:%d, ip:%s, process telegraf query msg", pContext, pContext->fd, pContext->ipstr); - HttpParser *pParser = &pContext->pThread->parser; + HttpParser *pParser = &pContext->parser; char * filter = pParser->data.pos; if (filter == NULL) { httpSendErrorResp(pContext, HTTP_NO_MSG_INPUT); diff --git a/src/system/src/vnodeImport.c b/src/system/src/vnodeImport.c index 2f8d5f62f860043019de5a0d954fa24013bb8417..a839f4a6b8a830956a1f26159ffcb13601c8e1d8 100644 --- a/src/system/src/vnodeImport.c +++ b/src/system/src/vnodeImport.c @@ -773,7 +773,7 @@ int vnodeImportStartToFile(SImportInfo *pImport, char *payload, int rows) { pImport->importedRows = pImport->rows; code = vnodeImportToFile(pImport); } else { - dError("vid:%d sid:%d id:%s, data is already imported to file", pObj->vnode, pObj->sid, pObj->meterId); + dTrace("vid:%d sid:%d id:%s, data is already imported to file", pObj->vnode, pObj->sid, pObj->meterId); } return code; @@ -817,7 +817,7 @@ int vnodeImportWholeToCache(SImportInfo *pImport, char *payload, int rows) { } else if (pImport->firstKey < pObj->lastKeyOnFile) { code = vnodeImportStartToFile(pImport, payload, rows); } else { // firstKey == pObj->lastKeyOnFile - dError("vid:%d sid:%d id:%s, data is already there", pObj->vnode, pObj->sid, pObj->meterId); + dTrace("vid:%d sid:%d id:%s, data is already there", pObj->vnode, pObj->sid, pObj->meterId); } } diff --git a/src/util/src/tglobalcfg.c b/src/util/src/tglobalcfg.c index fba609a77078f4ccb902d29366b690501cfe3abb..e096eb24745752b2f9ed1c959cb0940a0df14ba0 100644 --- a/src/util/src/tglobalcfg.c +++ b/src/util/src/tglobalcfg.c @@ -116,7 +116,7 @@ int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance char tsHttpIp[TSDB_IPv4ADDR_LEN] = "0.0.0.0"; short tsHttpPort = 6020; // only tcp, range tcp[6020] // short tsNginxPort = 6060; //only tcp, range tcp[6060] -int tsHttpCacheSessions = 2000; +int tsHttpCacheSessions = 30; int tsHttpSessionExpire = 36000; int tsHttpMaxThreads = 2; int tsHttpEnableCompress = 0; diff --git a/src/util/src/tmempool.c b/src/util/src/tmempool.c index 4c8e00c3b58814b12c56bed54949b57528ff329e..bba6ca9d50a461ff1523a393ac17e7cf6dbed22b 100644 --- a/src/util/src/tmempool.c +++ b/src/util/src/tmempool.c @@ -79,8 +79,7 @@ char *taosMemPoolMalloc(mpool_h handle) { pthread_mutex_lock(&(pool_p->mutex)); if (pool_p->numOfFree <= 0) { - pError("mempool: out of memory"); - + pTrace("mempool: out of memory"); } else { pos = pool_p->pool + pool_p->blockSize * (pool_p->freeList[pool_p->first]); pool_p->first++;