diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index 4720fb0ddca703e936cacc4bbfab647ed812e31c..7552ea5c25c136106ff998201500d299949d5932 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -108,8 +108,8 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { } void mnodeReleaseConn(SConnObj *pConn) { - if(pConn == NULL) return; - taosCacheRelease(tsMnodeConnCache, (void**)&pConn, false); + if (pConn == NULL) return; + taosCacheRelease(tsMnodeConnCache, (void **)&pConn, false); } SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port) { @@ -138,7 +138,7 @@ SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t p static void mnodeFreeConn(void *data) { SConnObj *pConn = data; tfree(pConn->pQueries); - tfree(pConn->pQueries); + tfree(pConn->pStreams); mTrace("connId:%d, is destroyed", pConn->connId); } diff --git a/src/plugins/http/inc/gcHandle.h b/src/plugins/http/inc/gcHandle.h index ed1e9302eb078226c494eeb32bffa5fe90c4e888..a3688c6c386e5d73c5084a54ad887ba305e425ea 100644 --- a/src/plugins/http/inc/gcHandle.h +++ b/src/plugins/http/inc/gcHandle.h @@ -16,15 +16,11 @@ #ifndef TDENGINE_GC_HANDLE_H #define TDENGINE_GC_HANDLE_H -#include -#include -#include -#include - #include "http.h" -#include "httpCode.h" -#include "httpHandle.h" +#include "httpInt.h" +#include "httpUtil.h" #include "httpResp.h" +#include "httpSql.h" #define GC_ROOT_URL_POS 0 #define GC_ACTION_URL_POS 1 diff --git a/src/plugins/http/inc/httpAuth.h b/src/plugins/http/inc/httpAuth.h new file mode 100644 index 0000000000000000000000000000000000000000..b8fabbe1ec28698cd35bf343af092eeb2d840716 --- /dev/null +++ b/src/plugins/http/inc/httpAuth.h @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_HTTP_TOKEN_H +#define TDENGINE_HTTP_TOKEN_H + +bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len); +bool httpParseTaosdAuthToken(HttpContext *pContext, char *token, int len); +bool httpGenTaosdAuthToken(HttpContext *pContext, char *token, int maxLen); + +#endif \ No newline at end of file diff --git a/src/plugins/http/inc/httpContext.h b/src/plugins/http/inc/httpContext.h new file mode 100644 index 0000000000000000000000000000000000000000..a2d50d6b7fdfba3bc80c918abd7999247636b6ee --- /dev/null +++ b/src/plugins/http/inc/httpContext.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_HTTP_CONTEXT_H +#define TDENGINE_HTTP_CONTEXT_H + +#include "httpInt.h" + +bool httpInitContexts(); +void httpCleanupContexts(); +const char *httpContextStateStr(HttpContextState state); + +HttpContext *httpCreateContext(int32_t fd); +bool httpInitContext(HttpContext *pContext); +HttpContext *httpGetContext(void * pContext); +void httpReleaseContext(HttpContext *pContext); +void httpCloseContextByServer(HttpContext *pContext); +void httpCloseContextByApp(HttpContext *pContext); +void httpNotifyContextClose(HttpContext *pContext); +bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState); + +#endif diff --git a/src/plugins/http/inc/httpHandle.h b/src/plugins/http/inc/httpHandle.h index b8885431370999a6b11de3a2b5d5c5a6757ceba3..3e6356d8053e5b953235a8134ee607c34789edbb 100644 --- a/src/plugins/http/inc/httpHandle.h +++ b/src/plugins/http/inc/httpHandle.h @@ -13,304 +13,11 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_HTTP_SERVER_H -#define TDENGINE_HTTP_SERVER_H - -#include -#include "pthread.h" -#include "semaphore.h" -#include "tmempool.h" -#include "taosdef.h" -#include "tutil.h" -#include "zlib.h" -#include "http.h" -#include "httpJson.h" - -#define HTTP_MAX_CMD_SIZE 1024 -#define HTTP_MAX_BUFFER_SIZE 1024*1024 - -#define HTTP_LABEL_SIZE 8 -#define HTTP_MAX_EVENTS 10 -#define HTTP_BUFFER_SIZE 1024*65 //65k -#define HTTP_DECOMPRESS_BUF_SIZE 1024*64 -#define HTTP_STEP_SIZE 1024 //http message get process step by step -#define HTTP_MAX_URL 5 //http url stack size -#define HTTP_METHOD_SCANNER_SIZE 7 //http method fp size -#define HTTP_GC_TARGET_SIZE 512 - -#define HTTP_VERSION_10 0 -#define HTTP_VERSION_11 1 -//#define HTTP_VERSION_12 2 - -#define HTTP_UNCUNKED 0 -#define HTTP_CHUNKED 1 - -#define HTTP_KEEPALIVE_NO_INPUT 0 -#define HTTP_KEEPALIVE_ENABLE 1 -#define HTTP_KEEPALIVE_DISABLE 2 - -#define HTTP_REQTYPE_OTHERS 0 -#define HTTP_REQTYPE_LOGIN 1 -#define HTTP_REQTYPE_HEARTBEAT 2 -#define HTTP_REQTYPE_SINGLE_SQL 3 -#define HTTP_REQTYPE_MULTI_SQL 4 - -#define HTTP_CHECK_BODY_ERROR -1 -#define HTTP_CHECK_BODY_CONTINUE 0 -#define HTTP_CHECK_BODY_SUCCESS 1 - -#define HTTP_WRITE_RETRY_TIMES 500 -#define HTTP_WRITE_WAIT_TIME_MS 5 -#define HTTP_EXPIRED_TIME 60000 -#define HTTP_DELAY_CLOSE_TIME_MS 500 - -#define HTTP_COMPRESS_IDENTITY 0 -#define HTTP_COMPRESS_GZIP 2 - -#define HTTP_SESSION_ID_LEN (TSDB_USER_LEN + TSDB_PASSWORD_LEN) - -typedef enum { - HTTP_CONTEXT_STATE_READY, - HTTP_CONTEXT_STATE_HANDLING, - HTTP_CONTEXT_STATE_DROPPING, - HTTP_CONTEXT_STATE_CLOSED -} HttpContextState; - -struct HttpContext; -struct HttpThread; - -typedef struct { - void *signature; - int expire; - int access; - void *taos; - char id[HTTP_SESSION_ID_LEN]; -} HttpSession; - -typedef enum { - HTTP_CMD_TYPE_UN_SPECIFIED, - HTTP_CMD_TYPE_CREATE_DB, - HTTP_CMD_TYPE_CREATE_STBALE, - HTTP_CMD_TYPE_INSERT -} HttpSqlCmdType; - -typedef enum { HTTP_CMD_STATE_NOT_RUN_YET, HTTP_CMD_STATE_RUN_FINISHED } HttpSqlCmdState; - -typedef enum { HTTP_CMD_RETURN_TYPE_WITH_RETURN, HTTP_CMD_RETURN_TYPE_NO_RETURN } HttpSqlCmdReturnType; - -typedef struct { - // used by single cmd - char *nativSql; - int32_t numOfRows; - int32_t code; - - // these are the locations in the buffer - int32_t tagNames[TSDB_MAX_TAGS]; - int32_t tagValues[TSDB_MAX_TAGS]; - int32_t timestamp; - int32_t metric; - int32_t stable; - int32_t table; - int32_t values; - int32_t sql; - - // used by multi-cmd - int8_t cmdType; - int8_t cmdReturnType; - int8_t cmdState; - int8_t tagNum; -} HttpSqlCmd; - -typedef struct { - HttpSqlCmd *cmds; - int16_t pos; - int16_t size; - int16_t maxSize; - int32_t bufferPos; - int32_t bufferSize; - char * buffer; -} HttpSqlCmds; - -typedef struct { - char *module; - bool (*decodeFp)(struct HttpContext *pContext); -} HttpDecodeMethod; - -typedef struct { - void (*startJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, void *result); - void (*stopJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd); - bool (*buildQueryJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, void *result, int numOfRows); - void (*buildAffectRowJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int affectRows); - void (*initJsonFp)(struct HttpContext *pContext); - void (*cleanJsonFp)(struct HttpContext *pContext); - bool (*checkFinishedFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int code); - void (*setNextCmdFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int code); -} HttpEncodeMethod; - -typedef struct { - char *pos; - int32_t len; -} HttpBuf; - -typedef struct { - char buffer[HTTP_BUFFER_SIZE]; - int bufsize; - char *pLast; - char *pCur; - HttpBuf method; - HttpBuf path[HTTP_MAX_URL]; // url: dbname/meter/query - HttpBuf data; // body content - HttpBuf token; // auth token - HttpDecodeMethod *pMethod; -} HttpParser; - -typedef struct HttpContext { - void * signature; - int fd; - uint32_t accessTimes; - uint32_t lastAccessTime; - uint8_t httpVersion; - uint8_t httpChunked; - uint8_t httpKeepAlive; // http1.0 and not keep-alive, close connection immediately - uint8_t fromMemPool; - uint8_t acceptEncoding; - uint8_t contentEncoding; - uint8_t reqType; - uint8_t parsed; - int32_t state; - char ipstr[22]; - char user[TSDB_USER_LEN]; // parsed from auth token or login message - char pass[TSDB_PASSWORD_LEN]; - void *taos; - HttpSession *session; - z_stream gzipStream; - HttpEncodeMethod *encodeMethod; - HttpSqlCmd singleCmd; - HttpSqlCmds *multiCmds; - JsonBuf *jsonBuf; - HttpParser parser; - void *timer; - struct HttpThread *pThread; - struct HttpContext *prev; - struct HttpContext *next; -} HttpContext; - -typedef struct HttpThread { - pthread_t thread; - HttpContext * pHead; - pthread_mutex_t threadMutex; - bool stop; - int pollFd; - int numOfFds; - int threadId; - char label[HTTP_LABEL_SIZE]; - bool (*processData)(HttpContext *pContext); - struct HttpServer *pServer; // handle passed by upper layer during pServer initialization -} HttpThread; - -typedef struct HttpServer { - char label[HTTP_LABEL_SIZE]; - uint32_t serverIp; - uint16_t serverPort; - bool online; - int fd; - int cacheContext; - int sessionExpire; - int numOfThreads; - HttpDecodeMethod *methodScanner[HTTP_METHOD_SCANNER_SIZE]; - int methodScannerLen; - pthread_mutex_t serverMutex; - void *pSessionHash; - void *pContextPool; - void *expireTimer; - HttpThread *pThreads; - pthread_t thread; - bool (*processData)(HttpContext *pContext); - int requestNum; - void *timerHandle; -} HttpServer; - -// http util method -bool httpCheckUsedbSql(char *sql); -void httpTimeToString(time_t t, char *buf, int buflen); - -// http init method -void *httpInitServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); -void httpCleanUpServer(HttpServer *pServer); - -// http server connection -void httpCleanUpConnect(HttpServer *pServer); -bool httpInitConnect(HttpServer *pServer); - -// http context for each client connection -HttpContext *httpCreateContext(HttpServer *pServer); -bool httpInitContext(HttpContext *pContext); -void httpCloseContextByApp(HttpContext *pContext); -void httpCloseContextByServer(HttpThread *pThread, HttpContext *pContext); - -// http session method -void httpCreateSession(HttpContext *pContext, void *taos); -void httpAccessSession(HttpContext *pContext); -void httpFetchSession(HttpContext *pContext); -void httpRestoreSession(HttpContext *pContext); -void httpRemoveExpireSessions(HttpServer *pServer); -bool httpInitAllSessions(HttpServer *pServer); -void httpRemoveAllSessions(HttpServer *pServer); -void httpProcessSessionExpire(void *handle, void *tmrId); - -// http request parser -void httpAddMethod(HttpServer *pServer, HttpDecodeMethod *pMethod); - -// http token method -bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len); -bool httpParseTaosdAuthToken(HttpContext *pContext, char *token, int len); -bool httpGenTaosdAuthToken(HttpContext *pContext, char *token, int maxLen); - -// util -bool httpUrlMatch(HttpContext *pContext, int pos, char *cmp); -bool httpProcessData(HttpContext *pContext); -bool httpReadDataImp(HttpContext *pContext); -bool httpParseRequest(HttpContext* pContext); -int httpCheckReadCompleted(HttpContext* pContext); -void httpReadDirtyData(HttpContext *pContext); +#ifndef TDENGINE_HTTP_HANDLE_H +#define TDENGINE_HTTP_HANDLE_H // http request handler void httpProcessRequest(HttpContext *pContext); - -// http json printer -JsonBuf *httpMallocJsonBuf(HttpContext *pContext); -void httpFreeJsonBuf(HttpContext *pContext); - -// http multicmds util - -int32_t httpAddToSqlCmdBuffer(HttpContext *pContext, const char *const format, ...); -int32_t httpAddToSqlCmdBufferNoTerminal(HttpContext *pContext, const char *const format, ...); -int32_t httpAddToSqlCmdBufferWithSize(HttpContext *pContext, int mallocSize); -int32_t httpAddToSqlCmdBufferTerminal(HttpContext *pContext); - -bool httpMallocMultiCmds(HttpContext *pContext, int cmdSize, int bufferSize); -bool httpReMallocMultiCmdsSize(HttpContext *pContext, int cmdSize); -bool httpReMallocMultiCmdsBuffer(HttpContext *pContext, int bufferSize); -void httpFreeMultiCmds(HttpContext *pContext); - -HttpSqlCmd *httpNewSqlCmd(HttpContext *pContext); -HttpSqlCmd *httpCurrSqlCmd(HttpContext *pContext); -int httpCurSqlCmdPos(HttpContext *pContext); - -void httpTrimTableName(char *name); -int httpShrinkTableName(HttpContext *pContext, int pos, char *name); -char *httpGetCmdsString(HttpContext *pContext, int pos); - -int httpGzipDeCompress(char *srcData, int32_t nSrcData, char *destData, int32_t *nDestData); -int httpGzipCompressInit(HttpContext *pContext); -int httpGzipCompress(HttpContext *pContext, char *inSrcData, int32_t inSrcDataLen, - char *outDestData, int32_t *outDestDataLen, bool isTheLast); - -extern const char *httpKeepAliveStr[]; -extern const char *httpVersionStr[]; -const char* httpContextStateStr(HttpContextState state); - -bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState); -void httpRemoveContextFromEpoll(HttpThread *pThread, HttpContext *pContext); +bool httpProcessData(HttpContext *pContext); #endif diff --git a/src/plugins/http/inc/httpInt.h b/src/plugins/http/inc/httpInt.h new file mode 100644 index 0000000000000000000000000000000000000000..5d94e8456ee047546d27f56ff09c121d6cd30087 --- /dev/null +++ b/src/plugins/http/inc/httpInt.h @@ -0,0 +1,237 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_HTTP_INT_H +#define TDENGINE_HTTP_INT_H + +#include +#include "pthread.h" +#include "semaphore.h" +#include "tmempool.h" +#include "taosdef.h" +#include "tutil.h" +#include "zlib.h" +#include "http.h" +#include "httpCode.h" +#include "httpLog.h" +#include "httpJson.h" + +#define HTTP_MAX_CMD_SIZE 1024 +#define HTTP_MAX_BUFFER_SIZE 1024*1024 + +#define HTTP_LABEL_SIZE 8 +#define HTTP_MAX_EVENTS 10 +#define HTTP_BUFFER_SIZE 1024*65 //65k +#define HTTP_DECOMPRESS_BUF_SIZE 1024*64 +#define HTTP_STEP_SIZE 1024 //http message get process step by step +#define HTTP_MAX_URL 5 //http url stack size +#define HTTP_METHOD_SCANNER_SIZE 7 //http method fp size +#define HTTP_GC_TARGET_SIZE 512 + +#define HTTP_VERSION_10 0 +#define HTTP_VERSION_11 1 +//#define HTTP_VERSION_12 2 + +#define HTTP_UNCUNKED 0 +#define HTTP_CHUNKED 1 + +#define HTTP_KEEPALIVE_NO_INPUT 0 +#define HTTP_KEEPALIVE_ENABLE 1 +#define HTTP_KEEPALIVE_DISABLE 2 + +#define HTTP_REQTYPE_OTHERS 0 +#define HTTP_REQTYPE_LOGIN 1 +#define HTTP_REQTYPE_HEARTBEAT 2 +#define HTTP_REQTYPE_SINGLE_SQL 3 +#define HTTP_REQTYPE_MULTI_SQL 4 + +#define HTTP_CHECK_BODY_ERROR -1 +#define HTTP_CHECK_BODY_CONTINUE 0 +#define HTTP_CHECK_BODY_SUCCESS 1 + +#define HTTP_WRITE_RETRY_TIMES 500 +#define HTTP_WRITE_WAIT_TIME_MS 5 +#define HTTP_EXPIRED_TIME 60000 +#define HTTP_DELAY_CLOSE_TIME_MS 500 + +#define HTTP_COMPRESS_IDENTITY 0 +#define HTTP_COMPRESS_GZIP 2 + +#define HTTP_SESSION_ID_LEN (TSDB_USER_LEN + TSDB_PASSWORD_LEN) + +typedef enum { + HTTP_SERVER_INIT, + HTTP_SERVER_RUNNING, + HTTP_SERVER_CLOSING, + HTTP_SERVER_CLOSED +} HttpServerStatus; + +typedef enum { + HTTP_CONTEXT_STATE_READY, + HTTP_CONTEXT_STATE_HANDLING, + HTTP_CONTEXT_STATE_DROPPING, + HTTP_CONTEXT_STATE_CLOSED +} HttpContextState; + +struct HttpContext; +struct HttpThread; + +typedef struct { + char id[HTTP_SESSION_ID_LEN]; + int refCount; + void *taos; +} HttpSession; + +typedef enum { + HTTP_CMD_TYPE_UN_SPECIFIED, + HTTP_CMD_TYPE_CREATE_DB, + HTTP_CMD_TYPE_CREATE_STBALE, + HTTP_CMD_TYPE_INSERT +} HttpSqlCmdType; + +typedef enum { HTTP_CMD_STATE_NOT_RUN_YET, HTTP_CMD_STATE_RUN_FINISHED } HttpSqlCmdState; + +typedef enum { HTTP_CMD_RETURN_TYPE_WITH_RETURN, HTTP_CMD_RETURN_TYPE_NO_RETURN } HttpSqlCmdReturnType; + +typedef struct { + // used by single cmd + char *nativSql; + int32_t numOfRows; + int32_t code; + + // these are the locations in the buffer + int32_t tagNames[TSDB_MAX_TAGS]; + int32_t tagValues[TSDB_MAX_TAGS]; + int32_t timestamp; + int32_t metric; + int32_t stable; + int32_t table; + int32_t values; + int32_t sql; + + // used by multi-cmd + int8_t cmdType; + int8_t cmdReturnType; + int8_t cmdState; + int8_t tagNum; +} HttpSqlCmd; + +typedef struct { + HttpSqlCmd *cmds; + int16_t pos; + int16_t size; + int16_t maxSize; + int32_t bufferPos; + int32_t bufferSize; + char * buffer; +} HttpSqlCmds; + +typedef struct { + char *module; + bool (*decodeFp)(struct HttpContext *pContext); +} HttpDecodeMethod; + +typedef struct { + void (*startJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, void *result); + void (*stopJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd); + bool (*buildQueryJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, void *result, int numOfRows); + void (*buildAffectRowJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int affectRows); + void (*initJsonFp)(struct HttpContext *pContext); + void (*cleanJsonFp)(struct HttpContext *pContext); + bool (*checkFinishedFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int code); + void (*setNextCmdFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int code); +} HttpEncodeMethod; + +typedef struct { + char *pos; + int32_t len; +} HttpBuf; + +typedef struct { + char buffer[HTTP_BUFFER_SIZE]; + int bufsize; + char *pLast; + char *pCur; + HttpBuf method; + HttpBuf path[HTTP_MAX_URL]; // url: dbname/meter/query + HttpBuf data; // body content + HttpBuf token; // auth token + HttpDecodeMethod *pMethod; +} HttpParser; + +typedef struct HttpContext { + int32_t refCount; + int fd; + uint32_t accessTimes; + uint32_t lastAccessTime; + int32_t state; + uint8_t httpVersion; + uint8_t httpChunked; + uint8_t httpKeepAlive; // http1.0 and not keep-alive, close connection immediately + uint8_t acceptEncoding; + uint8_t contentEncoding; + uint8_t reqType; + uint8_t parsed; + char ipstr[22]; + char user[TSDB_USER_LEN]; // parsed from auth token or login message + char pass[TSDB_PASSWORD_LEN]; + void * taos; + void * ppContext; + HttpSession *session; + z_stream gzipStream; + HttpParser parser; + HttpSqlCmd singleCmd; + HttpSqlCmds *multiCmds; + JsonBuf * jsonBuf; + void * timer; + HttpEncodeMethod * encodeMethod; + struct HttpThread *pThread; +} HttpContext; + +typedef struct HttpThread { + pthread_t thread; + HttpContext * pHead; + pthread_mutex_t threadMutex; + bool stop; + int pollFd; + int numOfFds; + int threadId; + char label[HTTP_LABEL_SIZE]; + bool (*processData)(HttpContext *pContext); +} HttpThread; + +typedef struct HttpServer { + char label[HTTP_LABEL_SIZE]; + uint32_t serverIp; + uint16_t serverPort; + int fd; + int numOfThreads; + int methodScannerLen; + int32_t requestNum; + int32_t status; + pthread_t thread; + HttpThread * pThreads; + void * contextCache; + void * sessionCache; + pthread_mutex_t serverMutex; + HttpDecodeMethod *methodScanner[HTTP_METHOD_SCANNER_SIZE]; + bool (*processData)(HttpContext *pContext); +} HttpServer; + +extern const char *httpKeepAliveStr[]; +extern const char *httpVersionStr[]; +extern HttpServer tsHttpServer; + +#endif diff --git a/src/plugins/http/inc/httpJson.h b/src/plugins/http/inc/httpJson.h index 403c20b083934755e8f32d83a394e79eecd80725..905460c67b14b4d3305f4071ae5370edb0336ff0 100644 --- a/src/plugins/http/inc/httpJson.h +++ b/src/plugins/http/inc/httpJson.h @@ -97,4 +97,8 @@ void httpJsonPrint(JsonBuf* buf, const char* json, int len); // quick void httpJsonPairStatus(JsonBuf* buf, int code); +// http json printer +JsonBuf* httpMallocJsonBuf(struct HttpContext* pContext); +void httpFreeJsonBuf(struct HttpContext* pContext); + #endif diff --git a/src/plugins/http/inc/httpResp.h b/src/plugins/http/inc/httpResp.h index 4868d4c68894b92ea6f4ac3345efde2faeb5556a..5eaaa2a0378552d466a7d201981387f018c0614c 100644 --- a/src/plugins/http/inc/httpResp.h +++ b/src/plugins/http/inc/httpResp.h @@ -16,7 +16,7 @@ #ifndef TDENGINE_HTTP_RESP_H #define TDENGINE_HTTP_RESP_H -#include "httpHandle.h" +#include "httpInt.h" enum _httpRespTempl { HTTP_RESPONSE_JSON_OK, diff --git a/src/plugins/http/inc/httpServer.h b/src/plugins/http/inc/httpServer.h new file mode 100644 index 0000000000000000000000000000000000000000..04dadfe04c4d61d8d8b98403f5d8993958b0b87f --- /dev/null +++ b/src/plugins/http/inc/httpServer.h @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_HTTP_SERVER_H +#define TDENGINE_HTTP_SERVER_H + +#include "httpInt.h" + +bool httpInitConnect(); +void httpCleanUpConnect(); + +void *httpInitServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); +void httpCleanUpServer(HttpServer *pServer); +bool httpReadDataImp(HttpContext *pContext); + +#endif diff --git a/src/plugins/http/inc/httpSession.h b/src/plugins/http/inc/httpSession.h new file mode 100644 index 0000000000000000000000000000000000000000..393e720f69c899cd9b78babdd0c550f14132a014 --- /dev/null +++ b/src/plugins/http/inc/httpSession.h @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_HTTP_SESSION_H +#define TDENGINE_HTTP_SESSION_H + +bool httpInitSessions(); +void httpCleanUpSessions(); + +// http session method +void httpCreateSession(HttpContext *pContext, void *taos); +void httpGetSession(HttpContext *pContext); +void httpReleaseSession(HttpContext *pContext); + +#endif diff --git a/src/plugins/http/inc/httpSql.h b/src/plugins/http/inc/httpSql.h new file mode 100644 index 0000000000000000000000000000000000000000..09f5b142fb63081cc6d4432e1089a446432d056c --- /dev/null +++ b/src/plugins/http/inc/httpSql.h @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_HTTP_SQL_H +#define TDENGINE_HTTP_SQL_H + + +int32_t httpAddToSqlCmdBuffer(HttpContext *pContext, const char *const format, ...); +int32_t httpAddToSqlCmdBufferNoTerminal(HttpContext *pContext, const char *const format, ...); +int32_t httpAddToSqlCmdBufferWithSize(HttpContext *pContext, int mallocSize); +int32_t httpAddToSqlCmdBufferTerminal(HttpContext *pContext); + +bool httpMallocMultiCmds(HttpContext *pContext, int cmdSize, int bufferSize); +bool httpReMallocMultiCmdsSize(HttpContext *pContext, int cmdSize); +bool httpReMallocMultiCmdsBuffer(HttpContext *pContext, int bufferSize); +void httpFreeMultiCmds(HttpContext *pContext); + +HttpSqlCmd *httpNewSqlCmd(HttpContext *pContext); +HttpSqlCmd *httpCurrSqlCmd(HttpContext *pContext); +int httpCurSqlCmdPos(HttpContext *pContext); + +void httpTrimTableName(char *name); +int httpShrinkTableName(HttpContext *pContext, int pos, char *name); +char *httpGetCmdsString(HttpContext *pContext, int pos); + +#endif diff --git a/src/plugins/http/inc/httpUtil.h b/src/plugins/http/inc/httpUtil.h new file mode 100644 index 0000000000000000000000000000000000000000..c82f702ebc081da3e1c7821151c38923c605095c --- /dev/null +++ b/src/plugins/http/inc/httpUtil.h @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_HTTP_UTIL_H +#define TDENGINE_HTTP_UTIL_H + +bool httpCheckUsedbSql(char *sql); +void httpTimeToString(time_t t, char *buf, int buflen); + +bool httpUrlMatch(HttpContext *pContext, int pos, char *cmp); +bool httpParseRequest(HttpContext *pContext); +int httpCheckReadCompleted(HttpContext *pContext); +void httpReadDirtyData(HttpContext *pContext); + +int httpGzipDeCompress(char *srcData, int32_t nSrcData, char *destData, int32_t *nDestData); +int httpGzipCompressInit(HttpContext *pContext); +int httpGzipCompress(HttpContext *pContext, char *inSrcData, int32_t inSrcDataLen, + char *outDestData, int32_t *outDestDataLen, bool isTheLast); + +// http request parser +void httpAddMethod(HttpServer *pServer, HttpDecodeMethod *pMethod); + + + +#endif diff --git a/src/plugins/http/inc/restHandle.h b/src/plugins/http/inc/restHandle.h index 48ad040c538318d41061a97ab4caa79c8da67fa4..632a1dc64739e39d1e9671fd41f9c224597eff07 100644 --- a/src/plugins/http/inc/restHandle.h +++ b/src/plugins/http/inc/restHandle.h @@ -16,15 +16,11 @@ #ifndef TDENGINE_REST_HANDLE_H #define TDENGINE_REST_HANDLE_H -#include -#include -#include -#include - #include "http.h" -#include "httpCode.h" -#include "httpHandle.h" +#include "httpInt.h" +#include "httpUtil.h" #include "httpResp.h" +#include "httpSql.h" #define REST_ROOT_URL_POS 0 #define REST_ACTION_URL_POS 1 diff --git a/src/plugins/http/inc/tgHandle.h b/src/plugins/http/inc/tgHandle.h index 5622694374e806ca2c87344450c24013c3c1abc7..6a3a7bfa4a450da1dab035a094093a143d8736c8 100644 --- a/src/plugins/http/inc/tgHandle.h +++ b/src/plugins/http/inc/tgHandle.h @@ -16,16 +16,11 @@ #ifndef TDENGINE_TG_HANDLE_H #define TDENGINE_TG_HANDLE_H -#include -#include -#include -#include - -#include "cJSON.h" #include "http.h" -#include "httpCode.h" -#include "httpHandle.h" +#include "httpInt.h" +#include "httpUtil.h" #include "httpResp.h" +#include "httpSql.h" #define TG_ROOT_URL_POS 0 #define TG_DB_URL_POS 1 diff --git a/src/plugins/http/src/httpAuth.c b/src/plugins/http/src/httpAuth.c index cf2ce5ddd9274e8a7d1908f6c410c7ef629055b8..6350d80299c8702de76fa60e6dd40291d84f613a 100644 --- a/src/plugins/http/src/httpAuth.c +++ b/src/plugins/http/src/httpAuth.c @@ -18,8 +18,8 @@ #include "tkey.h" #include "tutil.h" #include "http.h" -#include "httpLog.h" -#include "httpHandle.h" +#include "httpInt.h" +#include "httpAuth.h" #define KEY_DES_4 4971256377704625728L diff --git a/src/plugins/http/src/httpContext.c b/src/plugins/http/src/httpContext.c new file mode 100644 index 0000000000000000000000000000000000000000..183a332fb609cc896368ab61bcbce1109cbc3685 --- /dev/null +++ b/src/plugins/http/src/httpContext.c @@ -0,0 +1,228 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taosmsg.h" +#include "tsocket.h" +#include "tutil.h" +#include "ttime.h" +#include "ttimer.h" +#include "tglobal.h" +#include "tcache.h" +#include "hash.h" +#include "httpInt.h" +#include "httpResp.h" +#include "httpSql.h" +#include "httpSession.h" + +static void httpRemoveContextFromEpoll(HttpContext *pContext) { + HttpThread *pThread = pContext->pThread; + if (pContext->fd >= 0) { + epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL); + taosCloseSocket(pContext->fd); + pContext->fd = -1; + } +} + +static void httpDestroyContext(void *data) { + HttpContext *pContext = *(HttpContext **)data; + if (pContext->fd > 0) tclose(pContext->fd); + + HttpThread *pThread = pContext->pThread; + httpRemoveContextFromEpoll(pContext); + httpReleaseSession(pContext); + atomic_sub_fetch_32(&pThread->numOfFds, 1); + + pContext->pThread = 0; + pContext->state = HTTP_CONTEXT_STATE_CLOSED; + + // avoid double free + httpFreeJsonBuf(pContext); + httpFreeMultiCmds(pContext); + + httpTrace("context:%p, is destroyed, refCount:%d", pContext, pContext->refCount); + tfree(pContext); +} + +bool httpInitContexts() { + tsHttpServer.contextCache = taosCacheInitWithCb(2, httpDestroyContext); + if (tsHttpServer.contextCache == NULL) { + httpError("failed to init context cache"); + return false; + } + + return true; +} + +void httpCleanupContexts() { + // TODO: wait until all context is closed + if (tsHttpServer.contextCache != NULL) { + SCacheObj *cache = tsHttpServer.contextCache; + httpPrint("context cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable)); + taosCacheCleanup(tsHttpServer.contextCache); + tsHttpServer.contextCache = NULL; + } +} + +const char *httpContextStateStr(HttpContextState state) { + switch (state) { + case HTTP_CONTEXT_STATE_READY: + return "ready"; + case HTTP_CONTEXT_STATE_HANDLING: + return "handling"; + case HTTP_CONTEXT_STATE_DROPPING: + return "dropping"; + case HTTP_CONTEXT_STATE_CLOSED: + return "closed"; + default: + return "unknown"; + } +} + +void httpNotifyContextClose(HttpContext *pContext) { + shutdown(pContext->fd, SHUT_WR); +} + +bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState) { + return (atomic_val_compare_exchange_32(&pContext->state, srcState, destState) == srcState); +} + +HttpContext *httpCreateContext(int32_t fd) { + HttpContext *pContext = calloc(1, sizeof(HttpContext)); + if (pContext == NULL) return NULL; + + char contextStr[16] = {0}; + snprintf(contextStr, sizeof(contextStr), "%p", pContext); + + pContext->fd = fd; + pContext->httpVersion = HTTP_VERSION_10; + pContext->lastAccessTime = taosGetTimestampSec(); + pContext->state = HTTP_CONTEXT_STATE_READY; + + HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, contextStr, &pContext, sizeof(HttpContext *), 3); + pContext->ppContext = ppContext; + httpTrace("context:%p, fd:%d, is created, item:%p", pContext, fd, ppContext); + + // set the ref to 0 + taosCacheRelease(tsHttpServer.contextCache, (void**)&ppContext, false); + + return pContext; +} + +HttpContext *httpGetContext(void *ptr) { + char contextStr[16] = {0}; + snprintf(contextStr, sizeof(contextStr), "%p", ptr); + + HttpContext **ppContext = taosCacheAcquireByName(tsHttpServer.contextCache, contextStr); + + if (ppContext) { + HttpContext *pContext = *ppContext; + if (pContext) { + int32_t refCount = atomic_add_fetch_32(&pContext->refCount, 1); + httpTrace("context:%p, fd:%d, is accquired, refCount:%d", pContext, pContext->fd, refCount); + return pContext; + } + } + return NULL; +} + +void httpReleaseContext(HttpContext *pContext) { + int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1); + assert(refCount >= 0); + httpTrace("context:%p, fd:%d, is releasd, refCount:%d", pContext, pContext->fd, refCount); + + HttpContext **ppContext = pContext->ppContext; + taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false); +} + +bool httpInitContext(HttpContext *pContext) { + pContext->accessTimes++; + pContext->lastAccessTime = taosGetTimestampSec(); + pContext->httpVersion = HTTP_VERSION_10; + pContext->httpKeepAlive = HTTP_KEEPALIVE_NO_INPUT; + pContext->httpChunked = HTTP_UNCUNKED; + pContext->acceptEncoding = HTTP_COMPRESS_IDENTITY; + pContext->contentEncoding = HTTP_COMPRESS_IDENTITY; + pContext->reqType = HTTP_REQTYPE_OTHERS; + pContext->encodeMethod = NULL; + pContext->timer = NULL; + memset(&pContext->singleCmd, 0, sizeof(HttpSqlCmd)); + + HttpParser *pParser = &pContext->parser; + memset(pParser, 0, sizeof(HttpParser)); + pParser->pCur = pParser->pLast = pParser->buffer; + + httpTrace("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, parsed:%d", + pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, pContext->parsed); + return true; +} + +void httpCloseContextByApp(HttpContext *pContext) { + pContext->parsed = false; + + bool keepAlive = true; + if (pContext->httpVersion == HTTP_VERSION_10 && pContext->httpKeepAlive != HTTP_KEEPALIVE_ENABLE) { + keepAlive = false; + } else if (pContext->httpVersion != HTTP_VERSION_10 && pContext->httpKeepAlive == HTTP_KEEPALIVE_DISABLE) { + keepAlive = false; + } else {} + + if (keepAlive) { + if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_READY)) { + httpTrace("context:%p, fd:%d, ip:%s, last state:handling, keepAlive:true, reuse connect", + pContext, pContext->fd, pContext->ipstr); + } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_CLOSED)) { + httpRemoveContextFromEpoll(pContext); + httpTrace("context:%p, fd:%d, ip:%s, last state:dropping, keepAlive:true, close connect", + pContext, pContext->fd, pContext->ipstr); + } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_READY)) { + httpTrace("context:%p, fd:%d, ip:%s, last state:ready, keepAlive:true, reuse connect", + pContext, pContext->fd, pContext->ipstr); + } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) { + httpRemoveContextFromEpoll(pContext); + httpTrace("context:%p, fd:%d, ip:%s, last state:ready, keepAlive:true, close connect", + pContext, pContext->fd, pContext->ipstr); + } else { + httpRemoveContextFromEpoll(pContext); + httpError("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:true, close connect", + pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->state); + } + } else { + httpRemoveContextFromEpoll(pContext); + httpTrace("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:false, close connect", + pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->state); + } + + httpReleaseContext(pContext); +} + +void httpCloseContextByServer(HttpContext *pContext) { + if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_DROPPING)) { + httpTrace("context:%p, fd:%d, ip:%s, epoll finished, still used by app", pContext, pContext->fd, pContext->ipstr); + } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_DROPPING)) { + httpTrace("context:%p, fd:%d, ip:%s, epoll already finished, wait app finished", pContext, pContext->fd, pContext->ipstr); + } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_CLOSED)) { + httpTrace("context:%p, fd:%d, ip:%s, epoll finished, close context", pContext, pContext->fd, pContext->ipstr); + } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) { + httpTrace("context:%p, fd:%d, ip:%s, epoll finished, will be closed soon", pContext, pContext->fd, pContext->ipstr); + } else { + httpError("context:%p, fd:%d, ip:%s, unknown state:%d", pContext, pContext->fd, pContext->ipstr, pContext->state); + } + + pContext->parsed = false; + httpRemoveContextFromEpoll(pContext); + httpReleaseContext(pContext); +} diff --git a/src/plugins/http/src/httpHandle.c b/src/plugins/http/src/httpHandle.c index 8323ef7f4ed2892d5cc5cd896ee93012e11b79d6..5f89940a66300eebe75e0f3d3e48435703f5487f 100644 --- a/src/plugins/http/src/httpHandle.c +++ b/src/plugins/http/src/httpHandle.c @@ -19,11 +19,12 @@ #include "tglobal.h" #include "tsocket.h" #include "ttimer.h" -#include "http.h" -#include "httpLog.h" -#include "httpCode.h" -#include "httpHandle.h" +#include "httpInt.h" #include "httpResp.h" +#include "httpAuth.h" +#include "httpServer.h" +#include "httpContext.h" +#include "httpHandle.h" void httpToLowerUrl(char* url) { /*ignore case */ @@ -159,7 +160,7 @@ bool httpGetHttpMethod(HttpContext* pContext) { bool httpGetDecodeMethod(HttpContext* pContext) { HttpParser* pParser = &pContext->parser; - HttpServer* pServer = pContext->pThread->pServer; + HttpServer* pServer = &tsHttpServer; int methodLen = pServer->methodScannerLen; for (int i = 0; i < methodLen; i++) { HttpDecodeMethod* method = pServer->methodScanner[i]; diff --git a/src/plugins/http/src/httpJson.c b/src/plugins/http/src/httpJson.c index e5e69ae02a048e3bb7110e11d09891dcca922e8c..76cc90c48f2629b8a2d7709c8a22201279d20d17 100644 --- a/src/plugins/http/src/httpJson.c +++ b/src/plugins/http/src/httpJson.c @@ -22,6 +22,7 @@ #include "httpCode.h" #include "httpJson.h" #include "httpResp.h" +#include "httpUtil.h" #define MAX_NUM_STR_SZ 25 diff --git a/src/plugins/http/src/httpResp.c b/src/plugins/http/src/httpResp.c index d9507072de6af046e55d12efd9d8bdaeb9e2267e..de52e10f9a7d71590ad481770a382d88264c221e 100644 --- a/src/plugins/http/src/httpResp.c +++ b/src/plugins/http/src/httpResp.c @@ -21,6 +21,7 @@ #include "httpResp.h" #include "httpCode.h" #include "httpJson.h" +#include "httpContext.h" const char *httpKeepAliveStr[] = {"", "Connection: Keep-Alive\r\n", "Connection: Close\r\n"}; diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index 9a9c659b038a0061c4af3acc5581298800db378d..bea2bb083ace72bbaffc191d7d498c09ba967ffd 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -21,244 +21,15 @@ #include "ttime.h" #include "ttimer.h" #include "tglobal.h" -#include "http.h" -#include "httpLog.h" -#include "httpCode.h" -#include "httpHandle.h" +#include "httpInt.h" +#include "httpContext.h" #include "httpResp.h" +#include "httpUtil.h" #ifndef EPOLLWAKEUP #define EPOLLWAKEUP (1u << 29) #endif -const char* httpContextStateStr(HttpContextState state) { - switch (state) { - case HTTP_CONTEXT_STATE_READY: - return "ready"; - case HTTP_CONTEXT_STATE_HANDLING: - return "handling"; - case HTTP_CONTEXT_STATE_DROPPING: - return "dropping"; - case HTTP_CONTEXT_STATE_CLOSED: - return "closed"; - default: - return "unknown"; - } -} - -void httpRemoveContextFromEpoll(HttpThread *pThread, HttpContext *pContext) { - if (pContext->fd >= 0) { - epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL); - taosCloseSocket(pContext->fd); - pContext->fd = -1; - } -} - -bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState) { - return (atomic_val_compare_exchange_32(&pContext->state, srcState, destState) == srcState); -} - -void httpFreeContext(HttpServer *pServer, HttpContext *pContext); - -/** - * context will be reused while connection exist - * multiCmds and jsonBuf will be malloc after taos_query_a called - * and won't be freed until connection closed - */ -HttpContext *httpCreateContext(HttpServer *pServer) { - HttpContext *pContext = (HttpContext *)taosMemPoolMalloc(pServer->pContextPool); - if (pContext != NULL) { - pContext->fromMemPool = 1; - httpTrace("context:%p, is malloced from mempool", pContext); - } else { - pContext = (HttpContext *)malloc(sizeof(HttpContext)); - if (pContext == NULL) { - return NULL; - } else { - memset(pContext, 0, sizeof(HttpContext)); - } - httpTrace("context:%p, is malloced from raw memory", pContext); - } - - pContext->signature = pContext; - pContext->httpVersion = HTTP_VERSION_10; - pContext->lastAccessTime = taosGetTimestampSec(); - pContext->state = HTTP_CONTEXT_STATE_READY; - return pContext; -} - -void httpFreeContext(HttpServer *pServer, HttpContext *pContext) { - if (pContext->fromMemPool) { - httpTrace("context:%p, is freed from mempool", pContext); - taosMemPoolFree(pServer->pContextPool, (char *)pContext); - } else { - httpTrace("context:%p, is freed from raw memory", pContext); - tfree(pContext); - } -} - -void httpCleanUpContextTimer(HttpContext *pContext) { - if (pContext->timer != NULL) { - taosTmrStopA(&pContext->timer); - //httpTrace("context:%p, ip:%s, close timer:%p", pContext, pContext->ipstr, pContext->timer); - pContext->timer = NULL; - } -} - -void httpCleanUpContext(HttpContext *pContext, void *unused) { - httpTrace("context:%p, start the clean up operation, sig:%p", pContext, pContext->signature); - void *sig = atomic_val_compare_exchange_ptr(&pContext->signature, pContext, 0); - if (sig == NULL) { - httpTrace("context:%p is freed by another thread.", pContext); - return; - } - - HttpThread *pThread = pContext->pThread; - - httpCleanUpContextTimer(pContext); - - httpRemoveContextFromEpoll(pThread, pContext); - - httpRestoreSession(pContext); - - pthread_mutex_lock(&pThread->threadMutex); - - pThread->numOfFds--; - if (pThread->numOfFds < 0) { - httpError("context:%p, ip:%s, thread:%s, number of FDs:%d shall never be negative", - pContext, pContext->ipstr, pThread->label, pThread->numOfFds); - pThread->numOfFds = 0; - } - - // remove from the link list - if (pContext->prev) { - (pContext->prev)->next = pContext->next; - } else { - pThread->pHead = pContext->next; - } - - if (pContext->next) { - (pContext->next)->prev = pContext->prev; - } - - pthread_mutex_unlock(&pThread->threadMutex); - - httpTrace("context:%p, ip:%s, thread:%s, numOfFds:%d, context is cleaned up", pContext, pContext->ipstr, - pThread->label, pThread->numOfFds); - - pContext->signature = 0; - pContext->fd = -1; - pContext->pThread = 0; - pContext->prev = 0; - pContext->next = 0; - pContext->state = HTTP_CONTEXT_STATE_READY; - - // avoid double free - httpFreeJsonBuf(pContext); - httpFreeMultiCmds(pContext); - httpFreeContext(pThread->pServer, pContext); -} - -bool httpInitContext(HttpContext *pContext) { - pContext->accessTimes++; - pContext->lastAccessTime = taosGetTimestampSec(); - pContext->httpVersion = HTTP_VERSION_10; - pContext->httpKeepAlive = HTTP_KEEPALIVE_NO_INPUT; - pContext->httpChunked = HTTP_UNCUNKED; - pContext->acceptEncoding = HTTP_COMPRESS_IDENTITY; - pContext->contentEncoding = HTTP_COMPRESS_IDENTITY; - pContext->reqType = HTTP_REQTYPE_OTHERS; - pContext->encodeMethod = NULL; - pContext->timer = NULL; - memset(&pContext->singleCmd, 0, sizeof(HttpSqlCmd)); - - HttpParser *pParser = &pContext->parser; - memset(pParser, 0, sizeof(HttpParser)); - pParser->pCur = pParser->pLast = pParser->buffer; - - httpTrace("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, parsed:%d", - pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, pContext->parsed); - return true; -} - - -void httpCloseContext(HttpThread *pThread, HttpContext *pContext) { - taosTmrReset((TAOS_TMR_CALLBACK)httpCleanUpContext, HTTP_DELAY_CLOSE_TIME_MS, pContext, pThread->pServer->timerHandle, &pContext->timer); - httpTrace("context:%p, fd:%d, ip:%s, state:%s will be closed after:%d ms, timer:%p", - pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), HTTP_DELAY_CLOSE_TIME_MS, pContext->timer); -} - -void httpCloseContextByApp(HttpContext *pContext) { - HttpThread *pThread = pContext->pThread; - pContext->parsed = false; - - bool keepAlive = true; - if (pContext->httpVersion == HTTP_VERSION_10 && pContext->httpKeepAlive != HTTP_KEEPALIVE_ENABLE) { - keepAlive = false; - } else if (pContext->httpVersion != HTTP_VERSION_10 && pContext->httpKeepAlive == HTTP_KEEPALIVE_DISABLE) { - keepAlive = false; - } else {} - - if (keepAlive) { - if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_READY)) { - httpTrace("context:%p, fd:%d, ip:%s, last state:handling, keepAlive:true, reuse connect", - pContext, pContext->fd, pContext->ipstr); - } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_CLOSED)) { - httpRemoveContextFromEpoll(pThread, pContext); - httpTrace("context:%p, fd:%d, ip:%s, last state:dropping, keepAlive:true, close connect", - pContext, pContext->fd, pContext->ipstr); - httpCloseContext(pThread, pContext); - } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_READY)) { - httpTrace("context:%p, fd:%d, ip:%s, last state:ready, keepAlive:true, reuse connect", - pContext, pContext->fd, pContext->ipstr); - } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) { - httpRemoveContextFromEpoll(pThread, pContext); - httpTrace("context:%p, fd:%d, ip:%s, last state:ready, keepAlive:true, close connect", - pContext, pContext->fd, pContext->ipstr); - httpCloseContext(pThread, pContext); - } else { - httpRemoveContextFromEpoll(pThread, pContext); - httpError("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:true, close connect", - pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->state); - httpCloseContext(pThread, pContext); - } - } else { - httpRemoveContextFromEpoll(pThread, pContext); - httpTrace("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:false, close connect", - pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->state); - httpCloseContext(pThread, pContext); - } -} - -void httpCloseContextByServer(HttpThread *pThread, HttpContext *pContext) { - httpRemoveContextFromEpoll(pThread, pContext); - pContext->parsed = false; - - if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_DROPPING)) { - httpTrace("context:%p, fd:%d, ip:%s, epoll finished, still used by app", pContext, pContext->fd, pContext->ipstr); - } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_DROPPING)) { - httpTrace("context:%p, fd:%d, ip:%s, epoll already finished, wait app finished", pContext, pContext->fd, pContext->ipstr); - } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_CLOSED)) { - httpTrace("context:%p, fd:%d, ip:%s, epoll finished, close context", pContext, pContext->fd, pContext->ipstr); - httpCloseContext(pThread, pContext); - } else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) { - httpTrace("context:%p, fd:%d, ip:%s, epoll finished, will be closed soon", pContext, pContext->fd, pContext->ipstr); - httpCloseContext(pThread, pContext); - } else { - httpError("context:%p, fd:%d, ip:%s, unknown state:%d", pContext, pContext->fd, pContext->ipstr, pContext->state); - httpCloseContext(pThread, pContext); - } -} - -void httpCloseContextByServerForExpired(void *param, void *tmrId) { - HttpContext *pContext = (HttpContext *)param; - httpRemoveContextFromEpoll(pContext->pThread, pContext); - httpError("context:%p, fd:%d, ip:%s, read http body error, time expired, timer:%p", pContext, pContext->fd, pContext->ipstr, tmrId); - httpSendErrorResp(pContext, HTTP_PARSE_BODY_ERROR); - httpCloseContextByServer(pContext->pThread, pContext); -} - - static void httpStopThread(HttpThread* pThread) { pThread->stop = true; @@ -281,17 +52,10 @@ static void httpStopThread(HttpThread* pThread) { close(pThread->pollFd); pthread_mutex_destroy(&(pThread->threadMutex)); - - //while (pThread->pHead) { - // httpCleanUpContext(pThread->pHead, 0); - //} } - -void httpCleanUpConnect(HttpServer *pServer) { - if (pServer == NULL) return; - - shutdown(pServer->fd, SHUT_RD); +void httpCleanUpConnect() { + HttpServer *pServer = &tsHttpServer; pthread_join(pServer->thread, NULL); for (int i = 0; i < pServer->numOfThreads; ++i) { @@ -302,19 +66,10 @@ void httpCleanUpConnect(HttpServer *pServer) { } tfree(pServer->pThreads); + pServer->pThreads = NULL; httpTrace("http server:%s is cleaned up", pServer->label); } -// read all the data, then just discard it -void httpReadDirtyData(HttpContext *pContext) { - int fd = pContext->fd; - char data[1024] = {0}; - int len = (int)taosReadSocket(fd, data, 1024); - while (len >= sizeof(data)) { - len = (int)taosReadSocket(fd, data, 1024); - } -} - bool httpReadDataImp(HttpContext *pContext) { HttpParser *pParser = &pContext->parser; @@ -338,11 +93,10 @@ bool httpReadDataImp(HttpContext *pContext) { } if (pParser->bufsize >= (HTTP_BUFFER_SIZE - HTTP_STEP_SIZE)) { - httpReadDirtyData(pContext); httpError("context:%p, fd:%d, ip:%s, thread:%s, request big than:%d", pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, HTTP_BUFFER_SIZE); - httpRemoveContextFromEpoll(pContext->pThread, pContext); httpSendErrorResp(pContext, HTTP_REQUSET_TOO_BIG); + httpNotifyContextClose(pContext); return false; } } @@ -352,7 +106,7 @@ bool httpReadDataImp(HttpContext *pContext) { return true; } -bool httpDecompressData(HttpContext *pContext) { +static bool httpDecompressData(HttpContext *pContext) { if (pContext->contentEncoding != HTTP_COMPRESS_GZIP) { httpDump("context:%p, fd:%d, ip:%s, content:%s", pContext, pContext->fd, pContext->ipstr, pContext->parser.data.pos); return true; @@ -382,45 +136,43 @@ bool httpDecompressData(HttpContext *pContext) { return ret == 0; } -bool httpReadData(HttpThread *pThread, HttpContext *pContext) { +static bool httpReadData(HttpContext *pContext) { if (!pContext->parsed) { httpInitContext(pContext); } if (!httpReadDataImp(pContext)) { - httpCloseContextByServer(pThread, pContext); + httpNotifyContextClose(pContext); return false; } if (!httpParseRequest(pContext)) { - httpCloseContextByServer(pThread, pContext); + httpNotifyContextClose(pContext); return false; } int ret = httpCheckReadCompleted(pContext); if (ret == HTTP_CHECK_BODY_CONTINUE) { - taosTmrReset(httpCloseContextByServerForExpired, HTTP_EXPIRED_TIME, pContext, pThread->pServer->timerHandle, &pContext->timer); - //httpTrace("context:%p, fd:%d, ip:%s, not finished yet, try another times, timer:%p", pContext, pContext->fd, pContext->ipstr, pContext->timer); + //httpTrace("context:%p, fd:%d, ip:%s, not finished yet, wait another event", pContext, pContext->fd, pContext->ipstr); return false; } else if (ret == HTTP_CHECK_BODY_SUCCESS){ - httpCleanUpContextTimer(pContext); httpTrace("context:%p, fd:%d, ip:%s, thread:%s, read size:%d, dataLen:%d", pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->parser.bufsize, pContext->parser.data.len); if (httpDecompressData(pContext)) { return true; } else { - httpCloseContextByServer(pThread, pContext); + httpNotifyContextClose(pContext); return false; } } else { - httpCleanUpContextTimer(pContext); httpError("context:%p, fd:%d, ip:%s, failed to read http body, close connect", pContext, pContext->fd, pContext->ipstr); - httpCloseContextByServer(pThread, pContext); + httpNotifyContextClose(pContext); return false; } } -void httpProcessHttpData(void *param) { +static void httpProcessHttpData(void *param) { + HttpServer *pServer = &tsHttpServer; HttpThread *pThread = (HttpThread *)param; HttpContext *pContext; int fdNum; @@ -441,77 +193,72 @@ void httpProcessHttpData(void *param) { if (fdNum <= 0) continue; for (int i = 0; i < fdNum; ++i) { - pContext = events[i].data.ptr; - if (pContext->signature != pContext || pContext->pThread != pThread || pContext->fd <= 0) { + pContext = httpGetContext(events[i].data.ptr); + if (pContext == NULL) { + httpError("fd:%d, is already released, close connect", events[i].data.fd); + epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL); + tclose(events[i].data.fd); continue; } if (events[i].events & EPOLLPRI) { httpTrace("context:%p, fd:%d, ip:%s, state:%s, EPOLLPRI events occured, accessed:%d, close connect", pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); - httpRemoveContextFromEpoll(pThread, pContext); - httpCloseContextByServer(pThread, pContext); + httpCloseContextByServer(pContext); continue; } if (events[i].events & EPOLLRDHUP) { httpTrace("context:%p, fd:%d, ip:%s, state:%s, EPOLLRDHUP events occured, accessed:%d, close connect", pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); - httpRemoveContextFromEpoll(pThread, pContext); - httpCloseContextByServer(pThread, pContext); + httpCloseContextByServer(pContext); continue; } if (events[i].events & EPOLLERR) { httpTrace("context:%p, fd:%d, ip:%s, state:%s, EPOLLERR events occured, accessed:%d, close connect", pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); - httpRemoveContextFromEpoll(pThread, pContext); - httpCloseContextByServer(pThread, pContext); + httpCloseContextByServer(pContext); continue; } if (events[i].events & EPOLLHUP) { httpTrace("context:%p, fd:%d, ip:%s, state:%s, EPOLLHUP events occured, accessed:%d, close connect", pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); - httpRemoveContextFromEpoll(pThread, pContext); - httpCloseContextByServer(pThread, pContext); + httpCloseContextByServer(pContext); continue; } if (!httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_READY)) { httpTrace("context:%p, fd:%d, ip:%s, state:%s, not in ready state, ignore read events", pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state)); + httpReleaseContext(pContext); continue; } - if (!pContext->pThread->pServer->online) { - httpTrace("context:%p, fd:%d, ip:%s, state:%s, server is not online, accessed:%d, close connect", - pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); - httpRemoveContextFromEpoll(pThread, pContext); - httpReadDirtyData(pContext); + if (pServer->status != HTTP_SERVER_RUNNING) { + httpTrace("context:%p, fd:%d, ip:%s, state:%s, server is not running, accessed:%d, close connect", pContext, + pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->accessTimes); httpSendErrorResp(pContext, HTTP_SERVER_OFFLINE); - httpCloseContextByServer(pThread, pContext); - continue; + httpNotifyContextClose(pContext); } else { - if (httpReadData(pThread, pContext)) { + if (httpReadData(pContext)) { (*(pThread->processData))(pContext); - atomic_fetch_add_32(&pThread->pServer->requestNum, 1); + atomic_fetch_add_32(&pServer->requestNum, 1); } } } } } -void* httpAcceptHttpConnection(void *arg) { +static void *httpAcceptHttpConnection(void *arg) { int connFd = -1; struct sockaddr_in clientAddr; int threadId = 0; - HttpThread * pThread; - HttpServer * pServer; - HttpContext * pContext; - int totalFds; - - pServer = (HttpServer *)arg; + HttpServer * pServer = &tsHttpServer; + HttpThread * pThread = NULL; + HttpContext * pContext = NULL; + int totalFds = 0; sigset_t set; sigemptyset(&set); @@ -521,12 +268,12 @@ void* httpAcceptHttpConnection(void *arg) { pServer->fd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort); if (pServer->fd < 0) { - httpError("http server:%s, failed to open http socket, ip:%s:%u error:%s", pServer->label, taosIpStr(pServer->serverIp), - pServer->serverPort, strerror(errno)); + httpError("http server:%s, failed to open http socket, ip:%s:%u error:%s", pServer->label, + taosIpStr(pServer->serverIp), pServer->serverPort, strerror(errno)); return NULL; } else { - httpPrint("http service init success at %u", pServer->serverPort); - pServer->online = true; + httpPrint("http server init success at %u", pServer->serverPort); + pServer->status = HTTP_SERVER_RUNNING; } while (1) { @@ -534,10 +281,10 @@ void* httpAcceptHttpConnection(void *arg) { connFd = (int)accept(pServer->fd, (struct sockaddr *)&clientAddr, &addrlen); if (connFd == -1) { if (errno == EINVAL) { - httpTrace("%s HTTP server socket was shutdown, exiting...", pServer->label); + httpTrace("http server:%s socket was shutdown, exiting...", pServer->label); break; } - httpError("http server:%s, accept connect failure, errno:%d, reason:%s", pServer->label, errno, strerror(errno)); + httpError("http server:%s, accept connect failure, errno:%d reason:%s", pServer->label, errno, strerror(errno)); continue; } @@ -547,8 +294,8 @@ void* httpAcceptHttpConnection(void *arg) { } if (totalFds > tsHttpCacheSessions * 100) { - httpError("fd:%d, ip:%s:%u, totalFds:%d larger than httpCacheSessions:%d*100, refuse connection", - connFd, inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port), totalFds, tsHttpCacheSessions); + httpError("fd:%d, ip:%s:%u, totalFds:%d larger than httpCacheSessions:%d*100, refuse connection", connFd, + inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port), totalFds, tsHttpCacheSessions); taosCloseSocket(connFd); continue; } @@ -559,7 +306,7 @@ void* httpAcceptHttpConnection(void *arg) { // pick up the thread to handle this connection pThread = pServer->pThreads + threadId; - pContext = httpCreateContext(pServer); + pContext = httpCreateContext(connFd); if (pContext == NULL) { httpError("fd:%d, ip:%s:%u, no enough resource to allocate http context", connFd, inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port)); @@ -567,39 +314,24 @@ void* httpAcceptHttpConnection(void *arg) { continue; } - httpTrace("context:%p, fd:%d, ip:%s:%u, thread:%s, numOfFds:%d, totalFds:%d, accept a new connection", - pContext, connFd, inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port), pThread->label, - pThread->numOfFds, totalFds); - - pContext->fd = connFd; - sprintf(pContext->ipstr, "%s:%d", inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port)); pContext->pThread = pThread; - + sprintf(pContext->ipstr, "%s:%u", inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port)); + struct epoll_event event; event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP | EPOLLERR | EPOLLHUP | EPOLLRDHUP; - event.data.ptr = pContext; if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) { - httpError("context:%p, fd:%d, ip:%s:%u, thread:%s, failed to add http fd for epoll, error:%s", - pContext, connFd, inet_ntoa(clientAddr.sin_addr), htons(clientAddr.sin_port), pThread->label, - strerror(errno)); - httpFreeContext(pThread->pServer, pContext); - tclose(connFd); + httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd, + pContext->ipstr, pThread->label, strerror(errno)); + tclose(pContext->fd); + httpReleaseContext(pContext); continue; } // notify the data process, add into the FdObj list - pthread_mutex_lock(&(pThread->threadMutex)); - - pContext->next = pThread->pHead; - - if (pThread->pHead) (pThread->pHead)->prev = pContext; - - pThread->pHead = pContext; - - pThread->numOfFds++; - - pthread_mutex_unlock(&(pThread->threadMutex)); + atomic_add_fetch_32(&pThread->numOfFds, 1); + httpTrace("context:%p, fd:%d, ip:%s, thread:%s numOfFds:%d totalFds:%d, accept a new connection", pContext, connFd, + pContext->ipstr, pThread->label, pThread->numOfFds, totalFds); // pick up next thread for next connection threadId++; @@ -610,21 +342,17 @@ void* httpAcceptHttpConnection(void *arg) { return NULL; } -bool httpInitConnect(HttpServer *pServer) { - int i; - HttpThread * pThread; - - pServer->pThreads = (HttpThread *)malloc(sizeof(HttpThread) * (size_t)pServer->numOfThreads); +bool httpInitConnect() { + HttpServer *pServer = &tsHttpServer; + pServer->pThreads = calloc(pServer->numOfThreads, sizeof(HttpThread)); if (pServer->pThreads == NULL) { httpError("init error no enough memory"); return false; } - memset(pServer->pThreads, 0, sizeof(HttpThread) * (size_t)pServer->numOfThreads); - pThread = pServer->pThreads; - for (i = 0; i < pServer->numOfThreads; ++i) { + HttpThread *pThread = pServer->pThreads; + for (int i = 0; i < pServer->numOfThreads; ++i) { sprintf(pThread->label, "%s%d", pServer->label, i); - pThread->pServer = pServer; pThread->processData = pServer->processData; pThread->threadId = i; @@ -643,8 +371,8 @@ bool httpInitConnect(HttpServer *pServer) { pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); if (pthread_create(&(pThread->thread), &thattr, (void *)httpProcessHttpData, (void *)(pThread)) != 0) { - httpError("http thread:%s, failed to create HTTP process data thread, reason:%s", - pThread->label, strerror(errno)); + httpError("http thread:%s, failed to create HTTP process data thread, reason:%s", pThread->label, + strerror(errno)); return false; } pthread_attr_destroy(&thattr); diff --git a/src/plugins/http/src/httpSession.c b/src/plugins/http/src/httpSession.c index e80d6f26b72844246f4cd828650d63c70ccf9df3..2b0735bfaf58c834d912942847e429bd828a4a33 100644 --- a/src/plugins/http/src/httpSession.c +++ b/src/plugins/http/src/httpSession.c @@ -15,44 +15,28 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "hash.h" #include "taos.h" #include "ttime.h" -#include "ttimer.h" -#include "http.h" -#include "httpLog.h" -#include "httpCode.h" -#include "httpHandle.h" -#include "httpResp.h" - -void httpAccessSession(HttpContext *pContext) { - HttpServer *server = pContext->pThread->pServer; - pthread_mutex_lock(&server->serverMutex); - if (pContext->session == pContext->session->signature) { - pContext->session->expire = (int) taosGetTimestampSec() + pContext->pThread->pServer->sessionExpire; - } - pthread_mutex_unlock(&server->serverMutex); -} +#include "tglobal.h" +#include "tcache.h" +#include "httpInt.h" +#include "httpContext.h" +#include "httpSession.h" void httpCreateSession(HttpContext *pContext, void *taos) { - HttpServer *server = pContext->pThread->pServer; - pthread_mutex_lock(&server->serverMutex); + HttpServer *server = &tsHttpServer; + httpReleaseSession(pContext); - if (pContext->session != NULL && pContext->session == pContext->session->signature) { - httpTrace("context:%p, fd:%d, ip:%s, user:%s, set exist session:%p:%p expired", pContext, pContext->fd, - pContext->ipstr, pContext->user, pContext->session, pContext->session->taos); - pContext->session->expire = 0; - pContext->session->access--; - } + pthread_mutex_lock(&server->serverMutex); - HttpSession session; + HttpSession session = {0}; session.taos = taos; - session.expire = (int)taosGetTimestampSec() + server->sessionExpire; - session.access = 1; - int sessionIdLen = snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass); + session.refCount = 1; + snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass); - taosHashPut(server->pSessionHash, session.id, sessionIdLen, (char *)(&session), sizeof(HttpSession)); - pContext->session = taosHashGet(server->pSessionHash, session.id, sessionIdLen); + pContext->session = taosCachePut(server->sessionCache, session.id, &session, sizeof(HttpSession), tsHttpSessionExpire); + // void *temp = pContext->session; + // taosCacheRelease(server->sessionCache, (void **)&temp, false); if (pContext->session == NULL) { httpError("context:%p, fd:%d, ip:%s, user:%s, error:%s", pContext, pContext->fd, pContext->ipstr, pContext->user, @@ -62,26 +46,23 @@ void httpCreateSession(HttpContext *pContext, void *taos) { return; } - pContext->session->signature = pContext->session; - httpTrace("context:%p, fd:%d, ip:%s, user:%s, create a new session:%p:%p", pContext, pContext->fd, pContext->ipstr, - pContext->user, pContext->session, pContext->session->taos); + httpTrace("context:%p, fd:%d, ip:%s, user:%s, create a new session:%p:%p sessionRef:%d", pContext, pContext->fd, + pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount); pthread_mutex_unlock(&server->serverMutex); } -void httpFetchSessionImp(HttpContext *pContext) { - HttpServer *server = pContext->pThread->pServer; +static void httpFetchSessionImp(HttpContext *pContext) { + HttpServer *server = &tsHttpServer; pthread_mutex_lock(&server->serverMutex); char sessionId[HTTP_SESSION_ID_LEN]; - int sessonIdLen = snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass); - - pContext->session = taosHashGet(server->pSessionHash, sessionId, sessonIdLen); - if (pContext->session != NULL && pContext->session == pContext->session->signature) { - pContext->session->access++; - httpTrace("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, access:%d, expire:%d", - pContext, pContext->fd, pContext->ipstr, pContext->user, pContext->session, - pContext->session->taos, pContext->session->access, pContext->session->expire); - pContext->session->expire = (int)taosGetTimestampSec() + server->sessionExpire; + snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass); + + pContext->session = taosCacheAcquireByName(server->sessionCache, sessionId); + if (pContext->session != NULL) { + atomic_add_fetch_32(&pContext->session->refCount, 1); + httpTrace("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, sessionRef:%d", pContext, pContext->fd, + pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount); } else { httpTrace("context:%p, fd:%d, ip:%s, user:%s, session not found", pContext, pContext->fd, pContext->ipstr, pContext->user); @@ -90,113 +71,54 @@ void httpFetchSessionImp(HttpContext *pContext) { pthread_mutex_unlock(&server->serverMutex); } -void httpFetchSession(HttpContext *pContext) { +void httpGetSession(HttpContext *pContext) { if (pContext->session == NULL) { httpFetchSessionImp(pContext); } else { char sessionId[HTTP_SESSION_ID_LEN]; snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass); - if (strcmp(pContext->session->id, sessionId) != 0) { - httpError("context:%p, fd:%d, ip:%s, user:%s, password may be changed", pContext, pContext->fd, pContext->ipstr, pContext->user); - httpRestoreSession(pContext); - httpFetchSessionImp(pContext); - } + httpReleaseSession(pContext); + httpFetchSessionImp(pContext); } } -void httpRestoreSession(HttpContext *pContext) { - HttpServer * server = pContext->pThread->pServer; +void httpReleaseSession(HttpContext *pContext) { + if (pContext == NULL || pContext->session == NULL) return; - // all access to the session is via serverMutex - pthread_mutex_lock(&server->serverMutex); - HttpSession *session = pContext->session; - if (session == NULL || session != session->signature) { - pthread_mutex_unlock(&server->serverMutex); - return; - } - session->access--; - httpTrace("context:%p, ip:%s, user:%s, restore session:%p:%p, access:%d, expire:%d", - pContext, pContext->ipstr, pContext->user, session, session->taos, - session->access, pContext->session->expire); - pContext->session = NULL; - pthread_mutex_unlock(&server->serverMutex); -} + int32_t refCount = atomic_sub_fetch_32(&pContext->session->refCount, 1); + assert(refCount >= 0); + httpTrace("context:%p, release session:%p:%p, sessionRef:%d", pContext, pContext->session, pContext->session->taos, + pContext->session->refCount); -void httpResetSession(HttpSession *pSession) { - httpTrace("close session:%p:%p", pSession, pSession->taos); - if (pSession->taos != NULL) { - taos_close(pSession->taos); - pSession->taos = NULL; - } - pSession->signature = NULL; + taosCacheRelease(tsHttpServer.sessionCache, (void **)&pContext->session, false); + pContext->session = NULL; } -void httpRemoveAllSessions(HttpServer *pServer) { - SHashMutableIterator *pIter = taosHashCreateIter(pServer->pSessionHash); +static void httpDestroySession(void *data) { + HttpSession *session = data; + httpTrace("session:%p:%p, is destroyed, sessionRef:%d", session, session->taos, session->refCount); - while (taosHashIterNext(pIter)) { - HttpSession *pSession = taosHashIterGet(pIter); - if (pSession == NULL) continue; - httpResetSession(pSession); + if (session->taos != NULL) { + taos_close(session->taos); + session->taos = NULL; } - - taosHashDestroyIter(pIter); } -bool httpInitAllSessions(HttpServer *pServer) { - if (pServer->pSessionHash == NULL) { - pServer->pSessionHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true); - } - if (pServer->pSessionHash == NULL) { - httpError("http init session pool failed"); - return false; - } - if (pServer->expireTimer == NULL) { - taosTmrReset(httpProcessSessionExpire, 50000, pServer, pServer->timerHandle, &pServer->expireTimer); +void httpCleanUpSessions() { + if (tsHttpServer.sessionCache != NULL) { + SCacheObj *cache = tsHttpServer.sessionCache; + httpPrint("session cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable)); + taosCacheCleanup(tsHttpServer.sessionCache); + tsHttpServer.sessionCache = NULL; } - - return true; } -bool httpSessionExpired(HttpSession *pSession) { - time_t cur = taosGetTimestampSec(); - - if (pSession->taos != NULL) { - if (pSession->expire > cur) { - return false; // un-expired, so return false - } - if (pSession->access > 0) { - httpTrace("session:%p:%p is expired, but still access:%d", pSession, pSession->taos, - pSession->access); - return false; // still used, so return false - } - httpTrace("need close session:%p:%p for it expired, cur:%d, expire:%d, invertal:%d", - pSession, pSession->taos, cur, pSession->expire, cur - pSession->expire); +bool httpInitSessions() { + tsHttpServer.sessionCache = taosCacheInitWithCb(5, httpDestroySession); + if (tsHttpServer.sessionCache == NULL) { + httpError("failed to init session cache"); + return false; } return true; } - -void httpRemoveExpireSessions(HttpServer *pServer) { - SHashMutableIterator *pIter = taosHashCreateIter(pServer->pSessionHash); - - while (taosHashIterNext(pIter)) { - HttpSession *pSession = taosHashIterGet(pIter); - if (pSession == NULL) continue; - - pthread_mutex_lock(&pServer->serverMutex); - if (httpSessionExpired(pSession)) { - httpResetSession(pSession); - taosHashRemove(pServer->pSessionHash, pSession->id, strlen(pSession->id)); - } - pthread_mutex_unlock(&pServer->serverMutex); - } - - taosHashDestroyIter(pIter); -} - -void httpProcessSessionExpire(void *handle, void *tmrId) { - HttpServer *pServer = (HttpServer *)handle; - httpRemoveExpireSessions(pServer); - taosTmrReset(httpProcessSessionExpire, 60000, pServer, pServer->timerHandle, &pServer->expireTimer); -} \ No newline at end of file diff --git a/src/plugins/http/src/httpSql.c b/src/plugins/http/src/httpSql.c index efd1aac76797a21ddaa632f2a6344678b0f76c7f..ce2f7a83bdd4a5a132a31b3a5de1f2ba372b6329 100644 --- a/src/plugins/http/src/httpSql.c +++ b/src/plugins/http/src/httpSql.c @@ -18,11 +18,12 @@ #include "tnote.h" #include "taos.h" #include "tsclient.h" -#include "http.h" -#include "httpLog.h" -#include "httpCode.h" -#include "httpHandle.h" +#include "httpInt.h" +#include "httpContext.h" +#include "httpSql.h" #include "httpResp.h" +#include "httpAuth.h" +#include "httpSession.h" void *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), void *param, void **taos); @@ -30,7 +31,7 @@ void httpProcessMultiSql(HttpContext *pContext); void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) { HttpContext *pContext = (HttpContext *)param; - if (pContext == NULL || pContext->signature != pContext) return; + if (pContext == NULL) return; HttpSqlCmds * multiCmds = pContext->multiCmds; HttpEncodeMethod *encode = pContext->encodeMethod; @@ -72,7 +73,7 @@ void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numO void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) { HttpContext *pContext = (HttpContext *)param; - if (pContext == NULL || pContext->signature != pContext) return; + if (pContext == NULL) return; HttpSqlCmds * multiCmds = pContext->multiCmds; HttpEncodeMethod *encode = pContext->encodeMethod; @@ -172,7 +173,7 @@ void httpProcessMultiSql(HttpContext *pContext) { } void httpProcessMultiSqlCmd(HttpContext *pContext) { - if (pContext == NULL || pContext->signature != pContext) return; + if (pContext == NULL) return; HttpSqlCmds *multiCmds = pContext->multiCmds; if (multiCmds == NULL || multiCmds->size <= 0 || multiCmds->pos >= multiCmds->size || multiCmds->pos < 0) { @@ -192,7 +193,7 @@ void httpProcessMultiSqlCmd(HttpContext *pContext) { void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) { HttpContext *pContext = (HttpContext *)param; - if (pContext == NULL || pContext->signature != pContext) return; + if (pContext == NULL) return; HttpEncodeMethod *encode = pContext->encodeMethod; @@ -230,7 +231,7 @@ void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int num void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int code) { HttpContext *pContext = (HttpContext *)param; - if (pContext == NULL || pContext->signature != pContext) return; + if (pContext == NULL) return; HttpEncodeMethod *encode = pContext->encodeMethod; @@ -354,7 +355,7 @@ void httpExecCmd(HttpContext *pContext) { void httpProcessRequestCb(void *param, TAOS_RES *result, int code) { HttpContext *pContext = param; - if (pContext == NULL || pContext->signature != pContext) return; + if (pContext == NULL) return; if (code < 0) { httpError("context:%p, fd:%d, ip:%s, user:%s, login error, code:%s", pContext, pContext->fd, pContext->ipstr, @@ -383,16 +384,14 @@ void httpProcessRequestCb(void *param, TAOS_RES *result, int code) { } void httpProcessRequest(HttpContext *pContext) { - httpFetchSession(pContext); + httpGetSession(pContext); - if (pContext->session == NULL || pContext->session != pContext->session->signature || - pContext->reqType == HTTP_REQTYPE_LOGIN) { + if (pContext->session == NULL || pContext->reqType == HTTP_REQTYPE_LOGIN) { taos_connect_a(NULL, pContext->user, pContext->pass, "", 0, httpProcessRequestCb, (void *)pContext, &(pContext->taos)); httpTrace("context:%p, fd:%d, ip:%s, user:%s, try connect tdengine, taos:%p", pContext, pContext->fd, pContext->ipstr, pContext->user, pContext->taos); } else { - httpAccessSession(pContext); httpExecCmd(pContext); } } diff --git a/src/plugins/http/src/httpSystem.c b/src/plugins/http/src/httpSystem.c index e7a5344be5ad73b18deac0b4769f9bd5b3245072..015b0783b739a36eff88a398ce449eaff5771940 100644 --- a/src/plugins/http/src/httpSystem.c +++ b/src/plugins/http/src/httpSystem.c @@ -20,84 +20,64 @@ #include "tsocket.h" #include "ttimer.h" #include "tadmin.h" -#include "http.h" -#include "httpCode.h" -#include "httpHandle.h" +#include "httpInt.h" +#include "httpContext.h" +#include "httpSession.h" +#include "httpServer.h" #include "httpResp.h" -#include "httpLog.h" -#include "gcHandle.h" #include "httpHandle.h" +#include "gcHandle.h" #include "restHandle.h" #include "tgHandle.h" #ifndef _ADMIN - void adminInitHandle(HttpServer* pServer) {} void opInitHandle(HttpServer* pServer) {} - #endif -static HttpServer *httpServer = NULL; +HttpServer tsHttpServer; void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); int httpInitSystem() { - // taos_init(); - - httpServer = (HttpServer *)malloc(sizeof(HttpServer)); - memset(httpServer, 0, sizeof(HttpServer)); - - strcpy(httpServer->label, "rest"); - httpServer->serverIp = 0; - httpServer->serverPort = tsHttpPort; - httpServer->cacheContext = tsHttpCacheSessions; - httpServer->sessionExpire = tsHttpSessionExpire; - httpServer->numOfThreads = tsHttpMaxThreads; - httpServer->processData = httpProcessData; + strcpy(tsHttpServer.label, "rest"); + tsHttpServer.serverIp = 0; + tsHttpServer.serverPort = tsHttpPort; + tsHttpServer.numOfThreads = tsHttpMaxThreads; + tsHttpServer.processData = httpProcessData; - pthread_mutex_init(&httpServer->serverMutex, NULL); + pthread_mutex_init(&tsHttpServer.serverMutex, NULL); if (tsHttpEnableRecordSql != 0) { taosInitNote(tsNumOfLogLines / 10, 1, (char*)"http_note"); } - restInitHandle(httpServer); - adminInitHandle(httpServer); - gcInitHandle(httpServer); - tgInitHandle(httpServer); - opInitHandle(httpServer); + restInitHandle(&tsHttpServer); + adminInitHandle(&tsHttpServer); + gcInitHandle(&tsHttpServer); + tgInitHandle(&tsHttpServer); + opInitHandle(&tsHttpServer); return 0; } int httpStartSystem() { - httpPrint("starting to initialize http service ..."); + httpPrint("start http server ..."); - if (httpServer == NULL) { - httpError("http server is null"); - httpInitSystem(); - } - - if (httpServer->pContextPool == NULL) { - httpServer->pContextPool = taosMemPoolInit(httpServer->cacheContext, sizeof(HttpContext)); - } - if (httpServer->pContextPool == NULL) { - httpError("http init context pool failed"); + if (tsHttpServer.status != HTTP_SERVER_INIT) { + httpError("http server is already started"); return -1; } - if (httpServer->timerHandle == NULL) { - httpServer->timerHandle = taosTmrInit(tsHttpCacheSessions * 100 + 100, 200, 60000, "http"); - } - if (httpServer->timerHandle == NULL) { - httpError("http init timer failed"); + if (!httpInitContexts()) { + httpError("http init contexts failed"); return -1; } - if (!httpInitAllSessions(httpServer)) { + if (!httpInitSessions()) { httpError("http init session failed"); return -1; } - if (!httpInitConnect(httpServer)) { + if (!httpInitConnect()) { httpError("http init server failed"); return -1; } @@ -106,53 +86,23 @@ int httpStartSystem() { } void httpStopSystem() { - if (httpServer != NULL) { - httpServer->online = false; - } + tsHttpServer.status = HTTP_SERVER_CLOSING; + shutdown(tsHttpServer.fd, SHUT_RD); tgCleanupHandle(); } void httpCleanUpSystem() { - httpPrint("http service cleanup"); + httpPrint("http server cleanup"); httpStopSystem(); -//#if 0 - if (httpServer == NULL) { - return; - } - - if (httpServer->expireTimer != NULL) { - taosTmrStopA(&(httpServer->expireTimer)); - } - - if (httpServer->timerHandle != NULL) { - taosTmrCleanUp(httpServer->timerHandle); - httpServer->timerHandle = NULL; - } - - if (httpServer->pThreads != NULL) { - httpCleanUpConnect(httpServer); - httpServer->pThreads = NULL; - } - - -#if 0 - httpRemoveAllSessions(httpServer); + httpCleanupContexts(); + httpCleanUpSessions(); + httpCleanUpConnect(); + pthread_mutex_destroy(&tsHttpServer.serverMutex); - if (httpServer->pContextPool != NULL) { - taosMemPoolCleanUp(httpServer->pContextPool); - httpServer->pContextPool = NULL; - } - - pthread_mutex_destroy(&httpServer->serverMutex); - - tfree(httpServer); -#endif + tsHttpServer.status = HTTP_SERVER_CLOSED; } int32_t httpGetReqCount() { - if (httpServer != NULL) { - return atomic_exchange_32(&httpServer->requestNum, 0); - } - return 0; + return atomic_exchange_32(&tsHttpServer.requestNum, 0); } diff --git a/src/plugins/http/src/httpUtil.c b/src/plugins/http/src/httpUtil.c index 1fb63ea2fc4db58e25a78eee51ffd15f6a71951a..694cdec0a0c84f52d61aff03311f7b355edd664a 100644 --- a/src/plugins/http/src/httpUtil.c +++ b/src/plugins/http/src/httpUtil.c @@ -17,11 +17,10 @@ #include "os.h" #include "tmd5.h" #include "taos.h" -#include "http.h" -#include "httpLog.h" -#include "httpCode.h" -#include "httpHandle.h" +#include "httpInt.h" #include "httpResp.h" +#include "httpSql.h" +#include "httpUtil.h" bool httpCheckUsedbSql(char *sql) { if (strstr(sql, "use ") != NULL) { diff --git a/src/plugins/http/src/tgHandle.c b/src/plugins/http/src/tgHandle.c index b85f27d175ca541153b50e5224eb766cd6bbe9c4..fae11127e1634038a4c5d18a80cf04d07bb82f49 100644 --- a/src/plugins/http/src/tgHandle.c +++ b/src/plugins/http/src/tgHandle.c @@ -18,9 +18,10 @@ #include "tglobal.h" #include "taosdef.h" #include "taosmsg.h" +#include "httpInt.h" #include "tgHandle.h" #include "tgJson.h" -#include "httpLog.h" +#include "cJSON.h" /* * taos.telegraf.cfg formats like diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index ac06cf4f3f8b4c7355904ec9da1495bb0d9beeca..2b6083a91ccef8ef2a08902117e8f67340ba6a37 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -63,10 +63,12 @@ static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) { #endif } +#if 0 static FORCE_INLINE void taosFreeNode(void *data) { SCacheDataNode *pNode = *(SCacheDataNode **)data; free(pNode); } +#endif /** * @param key key of object for hash, usually a null-terminated string @@ -241,7 +243,7 @@ SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data)) } // set free cache node callback function for hash table - taosHashSetFreecb(pCacheObj->pHashTable, taosFreeNode); + // taosHashSetFreecb(pCacheObj->pHashTable, taosFreeNode); pCacheObj->freeFp = freeCb; pCacheObj->refreshTime = refreshTime * 1000; @@ -565,7 +567,17 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) { void doCleanupDataCache(SCacheObj *pCacheObj) { __cache_wr_lock(pCacheObj); - taosHashCleanup(pCacheObj->pHashTable); + + SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable); + while (taosHashIterNext(pIter)) { + SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); + // if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) { + taosCacheReleaseNode(pCacheObj, pNode); + //} + } + taosHashDestroyIter(pIter); + + taosHashCleanup(pCacheObj->pHashTable); __cache_unlock(pCacheObj); taosTrashCanEmpty(pCacheObj, true);