提交 492f1f38 编写于 作者: S Shengliang Guan

[TD-637] add refcount to restful

上级 2eb8cc45
......@@ -16,15 +16,11 @@
#ifndef TDENGINE_GC_HANDLE_H
#define TDENGINE_GC_HANDLE_H
#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include "http.h"
#include "httpCode.h"
#include "httpHandle.h"
#include "httpInt.h"
#include "httpUtil.h"
#include "httpResp.h"
#include "httpSql.h"
#define GC_ROOT_URL_POS 0
#define GC_ACTION_URL_POS 1
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_TOKEN_H
#define TDENGINE_HTTP_TOKEN_H
bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len);
bool httpParseTaosdAuthToken(HttpContext *pContext, char *token, int len);
bool httpGenTaosdAuthToken(HttpContext *pContext, char *token, int maxLen);
#endif
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_CONTEXT_H
#define TDENGINE_HTTP_CONTEXT_H
#include "httpInt.h"
bool httpInitContexts();
void httpCleanupContexts();
const char *httpContextStateStr(HttpContextState state);
HttpContext *httpCreateContext(int32_t fd);
bool httpInitContext(HttpContext *pContext);
HttpContext *httpGetContext(int32_t fd);
void httpReleaseContext(HttpContext *pContext);
void httpCloseContextByServer(HttpContext *pContext);
void httpCloseContextByApp(HttpContext *pContext);
void httpNotifyContextClose(HttpContext *pContext);
bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState);
#endif
......@@ -13,304 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_SERVER_H
#define TDENGINE_HTTP_SERVER_H
#include <stdbool.h>
#include "pthread.h"
#include "semaphore.h"
#include "tmempool.h"
#include "taosdef.h"
#include "tutil.h"
#include "zlib.h"
#include "http.h"
#include "httpJson.h"
#define HTTP_MAX_CMD_SIZE 1024
#define HTTP_MAX_BUFFER_SIZE 1024*1024
#define HTTP_LABEL_SIZE 8
#define HTTP_MAX_EVENTS 10
#define HTTP_BUFFER_SIZE 1024*65 //65k
#define HTTP_DECOMPRESS_BUF_SIZE 1024*64
#define HTTP_STEP_SIZE 1024 //http message get process step by step
#define HTTP_MAX_URL 5 //http url stack size
#define HTTP_METHOD_SCANNER_SIZE 7 //http method fp size
#define HTTP_GC_TARGET_SIZE 512
#define HTTP_VERSION_10 0
#define HTTP_VERSION_11 1
//#define HTTP_VERSION_12 2
#define HTTP_UNCUNKED 0
#define HTTP_CHUNKED 1
#define HTTP_KEEPALIVE_NO_INPUT 0
#define HTTP_KEEPALIVE_ENABLE 1
#define HTTP_KEEPALIVE_DISABLE 2
#define HTTP_REQTYPE_OTHERS 0
#define HTTP_REQTYPE_LOGIN 1
#define HTTP_REQTYPE_HEARTBEAT 2
#define HTTP_REQTYPE_SINGLE_SQL 3
#define HTTP_REQTYPE_MULTI_SQL 4
#define HTTP_CHECK_BODY_ERROR -1
#define HTTP_CHECK_BODY_CONTINUE 0
#define HTTP_CHECK_BODY_SUCCESS 1
#define HTTP_WRITE_RETRY_TIMES 500
#define HTTP_WRITE_WAIT_TIME_MS 5
#define HTTP_EXPIRED_TIME 60000
#define HTTP_DELAY_CLOSE_TIME_MS 500
#define HTTP_COMPRESS_IDENTITY 0
#define HTTP_COMPRESS_GZIP 2
#define HTTP_SESSION_ID_LEN (TSDB_USER_LEN + TSDB_PASSWORD_LEN)
typedef enum {
HTTP_CONTEXT_STATE_READY,
HTTP_CONTEXT_STATE_HANDLING,
HTTP_CONTEXT_STATE_DROPPING,
HTTP_CONTEXT_STATE_CLOSED
} HttpContextState;
struct HttpContext;
struct HttpThread;
typedef struct {
void *signature;
int expire;
int access;
void *taos;
char id[HTTP_SESSION_ID_LEN];
} HttpSession;
typedef enum {
HTTP_CMD_TYPE_UN_SPECIFIED,
HTTP_CMD_TYPE_CREATE_DB,
HTTP_CMD_TYPE_CREATE_STBALE,
HTTP_CMD_TYPE_INSERT
} HttpSqlCmdType;
typedef enum { HTTP_CMD_STATE_NOT_RUN_YET, HTTP_CMD_STATE_RUN_FINISHED } HttpSqlCmdState;
typedef enum { HTTP_CMD_RETURN_TYPE_WITH_RETURN, HTTP_CMD_RETURN_TYPE_NO_RETURN } HttpSqlCmdReturnType;
typedef struct {
// used by single cmd
char *nativSql;
int32_t numOfRows;
int32_t code;
// these are the locations in the buffer
int32_t tagNames[TSDB_MAX_TAGS];
int32_t tagValues[TSDB_MAX_TAGS];
int32_t timestamp;
int32_t metric;
int32_t stable;
int32_t table;
int32_t values;
int32_t sql;
// used by multi-cmd
int8_t cmdType;
int8_t cmdReturnType;
int8_t cmdState;
int8_t tagNum;
} HttpSqlCmd;
typedef struct {
HttpSqlCmd *cmds;
int16_t pos;
int16_t size;
int16_t maxSize;
int32_t bufferPos;
int32_t bufferSize;
char * buffer;
} HttpSqlCmds;
typedef struct {
char *module;
bool (*decodeFp)(struct HttpContext *pContext);
} HttpDecodeMethod;
typedef struct {
void (*startJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, void *result);
void (*stopJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd);
bool (*buildQueryJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, void *result, int numOfRows);
void (*buildAffectRowJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int affectRows);
void (*initJsonFp)(struct HttpContext *pContext);
void (*cleanJsonFp)(struct HttpContext *pContext);
bool (*checkFinishedFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int code);
void (*setNextCmdFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int code);
} HttpEncodeMethod;
typedef struct {
char *pos;
int32_t len;
} HttpBuf;
typedef struct {
char buffer[HTTP_BUFFER_SIZE];
int bufsize;
char *pLast;
char *pCur;
HttpBuf method;
HttpBuf path[HTTP_MAX_URL]; // url: dbname/meter/query
HttpBuf data; // body content
HttpBuf token; // auth token
HttpDecodeMethod *pMethod;
} HttpParser;
typedef struct HttpContext {
void * signature;
int fd;
uint32_t accessTimes;
uint32_t lastAccessTime;
uint8_t httpVersion;
uint8_t httpChunked;
uint8_t httpKeepAlive; // http1.0 and not keep-alive, close connection immediately
uint8_t fromMemPool;
uint8_t acceptEncoding;
uint8_t contentEncoding;
uint8_t reqType;
uint8_t parsed;
int32_t state;
char ipstr[22];
char user[TSDB_USER_LEN]; // parsed from auth token or login message
char pass[TSDB_PASSWORD_LEN];
void *taos;
HttpSession *session;
z_stream gzipStream;
HttpEncodeMethod *encodeMethod;
HttpSqlCmd singleCmd;
HttpSqlCmds *multiCmds;
JsonBuf *jsonBuf;
HttpParser parser;
void *timer;
struct HttpThread *pThread;
struct HttpContext *prev;
struct HttpContext *next;
} HttpContext;
typedef struct HttpThread {
pthread_t thread;
HttpContext * pHead;
pthread_mutex_t threadMutex;
bool stop;
int pollFd;
int numOfFds;
int threadId;
char label[HTTP_LABEL_SIZE];
bool (*processData)(HttpContext *pContext);
struct HttpServer *pServer; // handle passed by upper layer during pServer initialization
} HttpThread;
typedef struct HttpServer {
char label[HTTP_LABEL_SIZE];
uint32_t serverIp;
uint16_t serverPort;
bool online;
int fd;
int cacheContext;
int sessionExpire;
int numOfThreads;
HttpDecodeMethod *methodScanner[HTTP_METHOD_SCANNER_SIZE];
int methodScannerLen;
pthread_mutex_t serverMutex;
void *pSessionHash;
void *pContextPool;
void *expireTimer;
HttpThread *pThreads;
pthread_t thread;
bool (*processData)(HttpContext *pContext);
int requestNum;
void *timerHandle;
} HttpServer;
// http util method
bool httpCheckUsedbSql(char *sql);
void httpTimeToString(time_t t, char *buf, int buflen);
// http init method
void *httpInitServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
void httpCleanUpServer(HttpServer *pServer);
// http server connection
void httpCleanUpConnect(HttpServer *pServer);
bool httpInitConnect(HttpServer *pServer);
// http context for each client connection
HttpContext *httpCreateContext(HttpServer *pServer);
bool httpInitContext(HttpContext *pContext);
void httpCloseContextByApp(HttpContext *pContext);
void httpCloseContextByServer(HttpThread *pThread, HttpContext *pContext);
// http session method
void httpCreateSession(HttpContext *pContext, void *taos);
void httpAccessSession(HttpContext *pContext);
void httpFetchSession(HttpContext *pContext);
void httpRestoreSession(HttpContext *pContext);
void httpRemoveExpireSessions(HttpServer *pServer);
bool httpInitAllSessions(HttpServer *pServer);
void httpRemoveAllSessions(HttpServer *pServer);
void httpProcessSessionExpire(void *handle, void *tmrId);
// http request parser
void httpAddMethod(HttpServer *pServer, HttpDecodeMethod *pMethod);
// http token method
bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len);
bool httpParseTaosdAuthToken(HttpContext *pContext, char *token, int len);
bool httpGenTaosdAuthToken(HttpContext *pContext, char *token, int maxLen);
// util
bool httpUrlMatch(HttpContext *pContext, int pos, char *cmp);
bool httpProcessData(HttpContext *pContext);
bool httpReadDataImp(HttpContext *pContext);
bool httpParseRequest(HttpContext* pContext);
int httpCheckReadCompleted(HttpContext* pContext);
void httpReadDirtyData(HttpContext *pContext);
#ifndef TDENGINE_HTTP_HANDLE_H
#define TDENGINE_HTTP_HANDLE_H
// http request handler
void httpProcessRequest(HttpContext *pContext);
// http json printer
JsonBuf *httpMallocJsonBuf(HttpContext *pContext);
void httpFreeJsonBuf(HttpContext *pContext);
// http multicmds util
int32_t httpAddToSqlCmdBuffer(HttpContext *pContext, const char *const format, ...);
int32_t httpAddToSqlCmdBufferNoTerminal(HttpContext *pContext, const char *const format, ...);
int32_t httpAddToSqlCmdBufferWithSize(HttpContext *pContext, int mallocSize);
int32_t httpAddToSqlCmdBufferTerminal(HttpContext *pContext);
bool httpMallocMultiCmds(HttpContext *pContext, int cmdSize, int bufferSize);
bool httpReMallocMultiCmdsSize(HttpContext *pContext, int cmdSize);
bool httpReMallocMultiCmdsBuffer(HttpContext *pContext, int bufferSize);
void httpFreeMultiCmds(HttpContext *pContext);
HttpSqlCmd *httpNewSqlCmd(HttpContext *pContext);
HttpSqlCmd *httpCurrSqlCmd(HttpContext *pContext);
int httpCurSqlCmdPos(HttpContext *pContext);
void httpTrimTableName(char *name);
int httpShrinkTableName(HttpContext *pContext, int pos, char *name);
char *httpGetCmdsString(HttpContext *pContext, int pos);
int httpGzipDeCompress(char *srcData, int32_t nSrcData, char *destData, int32_t *nDestData);
int httpGzipCompressInit(HttpContext *pContext);
int httpGzipCompress(HttpContext *pContext, char *inSrcData, int32_t inSrcDataLen,
char *outDestData, int32_t *outDestDataLen, bool isTheLast);
extern const char *httpKeepAliveStr[];
extern const char *httpVersionStr[];
const char* httpContextStateStr(HttpContextState state);
bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState);
void httpRemoveContextFromEpoll(HttpThread *pThread, HttpContext *pContext);
bool httpProcessData(HttpContext *pContext);
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_INT_H
#define TDENGINE_HTTP_INT_H
#include <stdbool.h>
#include "pthread.h"
#include "semaphore.h"
#include "tmempool.h"
#include "taosdef.h"
#include "tutil.h"
#include "zlib.h"
#include "http.h"
#include "httpCode.h"
#include "httpLog.h"
#include "httpJson.h"
#define HTTP_MAX_CMD_SIZE 1024
#define HTTP_MAX_BUFFER_SIZE 1024*1024
#define HTTP_LABEL_SIZE 8
#define HTTP_MAX_EVENTS 10
#define HTTP_BUFFER_SIZE 1024*65 //65k
#define HTTP_DECOMPRESS_BUF_SIZE 1024*64
#define HTTP_STEP_SIZE 1024 //http message get process step by step
#define HTTP_MAX_URL 5 //http url stack size
#define HTTP_METHOD_SCANNER_SIZE 7 //http method fp size
#define HTTP_GC_TARGET_SIZE 512
#define HTTP_VERSION_10 0
#define HTTP_VERSION_11 1
//#define HTTP_VERSION_12 2
#define HTTP_UNCUNKED 0
#define HTTP_CHUNKED 1
#define HTTP_KEEPALIVE_NO_INPUT 0
#define HTTP_KEEPALIVE_ENABLE 1
#define HTTP_KEEPALIVE_DISABLE 2
#define HTTP_REQTYPE_OTHERS 0
#define HTTP_REQTYPE_LOGIN 1
#define HTTP_REQTYPE_HEARTBEAT 2
#define HTTP_REQTYPE_SINGLE_SQL 3
#define HTTP_REQTYPE_MULTI_SQL 4
#define HTTP_CHECK_BODY_ERROR -1
#define HTTP_CHECK_BODY_CONTINUE 0
#define HTTP_CHECK_BODY_SUCCESS 1
#define HTTP_WRITE_RETRY_TIMES 500
#define HTTP_WRITE_WAIT_TIME_MS 5
#define HTTP_EXPIRED_TIME 60000
#define HTTP_DELAY_CLOSE_TIME_MS 500
#define HTTP_COMPRESS_IDENTITY 0
#define HTTP_COMPRESS_GZIP 2
#define HTTP_SESSION_ID_LEN (TSDB_USER_LEN + TSDB_PASSWORD_LEN)
typedef enum {
HTTP_SERVER_INIT,
HTTP_SERVER_RUNNING,
HTTP_SERVER_CLOSING,
HTTP_SERVER_CLOSED
} HttpServerStatus;
typedef enum {
HTTP_CONTEXT_STATE_READY,
HTTP_CONTEXT_STATE_HANDLING,
HTTP_CONTEXT_STATE_DROPPING,
HTTP_CONTEXT_STATE_CLOSED
} HttpContextState;
struct HttpContext;
struct HttpThread;
typedef struct {
char id[HTTP_SESSION_ID_LEN];
int refCount;
void *taos;
} HttpSession;
typedef enum {
HTTP_CMD_TYPE_UN_SPECIFIED,
HTTP_CMD_TYPE_CREATE_DB,
HTTP_CMD_TYPE_CREATE_STBALE,
HTTP_CMD_TYPE_INSERT
} HttpSqlCmdType;
typedef enum { HTTP_CMD_STATE_NOT_RUN_YET, HTTP_CMD_STATE_RUN_FINISHED } HttpSqlCmdState;
typedef enum { HTTP_CMD_RETURN_TYPE_WITH_RETURN, HTTP_CMD_RETURN_TYPE_NO_RETURN } HttpSqlCmdReturnType;
typedef struct {
// used by single cmd
char *nativSql;
int32_t numOfRows;
int32_t code;
// these are the locations in the buffer
int32_t tagNames[TSDB_MAX_TAGS];
int32_t tagValues[TSDB_MAX_TAGS];
int32_t timestamp;
int32_t metric;
int32_t stable;
int32_t table;
int32_t values;
int32_t sql;
// used by multi-cmd
int8_t cmdType;
int8_t cmdReturnType;
int8_t cmdState;
int8_t tagNum;
} HttpSqlCmd;
typedef struct {
HttpSqlCmd *cmds;
int16_t pos;
int16_t size;
int16_t maxSize;
int32_t bufferPos;
int32_t bufferSize;
char * buffer;
} HttpSqlCmds;
typedef struct {
char *module;
bool (*decodeFp)(struct HttpContext *pContext);
} HttpDecodeMethod;
typedef struct {
void (*startJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, void *result);
void (*stopJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd);
bool (*buildQueryJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, void *result, int numOfRows);
void (*buildAffectRowJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int affectRows);
void (*initJsonFp)(struct HttpContext *pContext);
void (*cleanJsonFp)(struct HttpContext *pContext);
bool (*checkFinishedFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int code);
void (*setNextCmdFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int code);
} HttpEncodeMethod;
typedef struct {
char *pos;
int32_t len;
} HttpBuf;
typedef struct {
char buffer[HTTP_BUFFER_SIZE];
int bufsize;
char *pLast;
char *pCur;
HttpBuf method;
HttpBuf path[HTTP_MAX_URL]; // url: dbname/meter/query
HttpBuf data; // body content
HttpBuf token; // auth token
HttpDecodeMethod *pMethod;
} HttpParser;
typedef struct HttpContext {
int32_t refCount;
int fd;
uint32_t accessTimes;
uint32_t lastAccessTime;
int32_t state;
uint8_t httpVersion;
uint8_t httpChunked;
uint8_t httpKeepAlive; // http1.0 and not keep-alive, close connection immediately
uint8_t acceptEncoding;
uint8_t contentEncoding;
uint8_t reqType;
uint8_t parsed;
char ipstr[22];
char user[TSDB_USER_LEN]; // parsed from auth token or login message
char pass[TSDB_PASSWORD_LEN];
void * taos;
HttpSession *session;
z_stream gzipStream;
HttpParser parser;
HttpSqlCmd singleCmd;
HttpSqlCmds *multiCmds;
JsonBuf * jsonBuf;
void * timer;
HttpEncodeMethod * encodeMethod;
struct HttpThread *pThread;
} HttpContext;
typedef struct HttpThread {
pthread_t thread;
HttpContext * pHead;
pthread_mutex_t threadMutex;
bool stop;
int pollFd;
int numOfFds;
int threadId;
char label[HTTP_LABEL_SIZE];
bool (*processData)(HttpContext *pContext);
} HttpThread;
typedef struct HttpServer {
char label[HTTP_LABEL_SIZE];
uint32_t serverIp;
uint16_t serverPort;
int fd;
int numOfThreads;
int methodScannerLen;
int32_t requestNum;
int32_t status;
pthread_t thread;
HttpThread * pThreads;
void * contextCache;
void * sessionCache;
pthread_mutex_t serverMutex;
HttpDecodeMethod *methodScanner[HTTP_METHOD_SCANNER_SIZE];
bool (*processData)(HttpContext *pContext);
} HttpServer;
extern const char *httpKeepAliveStr[];
extern const char *httpVersionStr[];
extern HttpServer tsHttpServer;
#endif
......@@ -97,4 +97,8 @@ void httpJsonPrint(JsonBuf* buf, const char* json, int len);
// quick
void httpJsonPairStatus(JsonBuf* buf, int code);
// http json printer
JsonBuf* httpMallocJsonBuf(struct HttpContext* pContext);
void httpFreeJsonBuf(struct HttpContext* pContext);
#endif
......@@ -16,7 +16,7 @@
#ifndef TDENGINE_HTTP_RESP_H
#define TDENGINE_HTTP_RESP_H
#include "httpHandle.h"
#include "httpInt.h"
enum _httpRespTempl {
HTTP_RESPONSE_JSON_OK,
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_SERVER_H
#define TDENGINE_HTTP_SERVER_H
#include "httpInt.h"
bool httpInitConnect();
void httpCleanUpConnect();
void *httpInitServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
void httpCleanUpServer(HttpServer *pServer);
bool httpReadDataImp(HttpContext *pContext);
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_SESSION_H
#define TDENGINE_HTTP_SESSION_H
bool httpInitSessions();
void httpCleanUpSessions();
// http session method
void httpCreateSession(HttpContext *pContext, void *taos);
void httpGetSession(HttpContext *pContext);
void httpReleaseSession(HttpContext *pContext);
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_SQL_H
#define TDENGINE_HTTP_SQL_H
int32_t httpAddToSqlCmdBuffer(HttpContext *pContext, const char *const format, ...);
int32_t httpAddToSqlCmdBufferNoTerminal(HttpContext *pContext, const char *const format, ...);
int32_t httpAddToSqlCmdBufferWithSize(HttpContext *pContext, int mallocSize);
int32_t httpAddToSqlCmdBufferTerminal(HttpContext *pContext);
bool httpMallocMultiCmds(HttpContext *pContext, int cmdSize, int bufferSize);
bool httpReMallocMultiCmdsSize(HttpContext *pContext, int cmdSize);
bool httpReMallocMultiCmdsBuffer(HttpContext *pContext, int bufferSize);
void httpFreeMultiCmds(HttpContext *pContext);
HttpSqlCmd *httpNewSqlCmd(HttpContext *pContext);
HttpSqlCmd *httpCurrSqlCmd(HttpContext *pContext);
int httpCurSqlCmdPos(HttpContext *pContext);
void httpTrimTableName(char *name);
int httpShrinkTableName(HttpContext *pContext, int pos, char *name);
char *httpGetCmdsString(HttpContext *pContext, int pos);
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_UTIL_H
#define TDENGINE_HTTP_UTIL_H
bool httpCheckUsedbSql(char *sql);
void httpTimeToString(time_t t, char *buf, int buflen);
bool httpUrlMatch(HttpContext *pContext, int pos, char *cmp);
bool httpParseRequest(HttpContext *pContext);
int httpCheckReadCompleted(HttpContext *pContext);
void httpReadDirtyData(HttpContext *pContext);
int httpGzipDeCompress(char *srcData, int32_t nSrcData, char *destData, int32_t *nDestData);
int httpGzipCompressInit(HttpContext *pContext);
int httpGzipCompress(HttpContext *pContext, char *inSrcData, int32_t inSrcDataLen,
char *outDestData, int32_t *outDestDataLen, bool isTheLast);
// http request parser
void httpAddMethod(HttpServer *pServer, HttpDecodeMethod *pMethod);
#endif
......@@ -16,15 +16,11 @@
#ifndef TDENGINE_REST_HANDLE_H
#define TDENGINE_REST_HANDLE_H
#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include "http.h"
#include "httpCode.h"
#include "httpHandle.h"
#include "httpInt.h"
#include "httpUtil.h"
#include "httpResp.h"
#include "httpSql.h"
#define REST_ROOT_URL_POS 0
#define REST_ACTION_URL_POS 1
......
......@@ -16,16 +16,11 @@
#ifndef TDENGINE_TG_HANDLE_H
#define TDENGINE_TG_HANDLE_H
#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include "cJSON.h"
#include "http.h"
#include "httpCode.h"
#include "httpHandle.h"
#include "httpInt.h"
#include "httpUtil.h"
#include "httpResp.h"
#include "httpSql.h"
#define TG_ROOT_URL_POS 0
#define TG_DB_URL_POS 1
......
......@@ -18,8 +18,8 @@
#include "tkey.h"
#include "tutil.h"
#include "http.h"
#include "httpLog.h"
#include "httpHandle.h"
#include "httpInt.h"
#include "httpAuth.h"
#define KEY_DES_4 4971256377704625728L
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "tsocket.h"
#include "tutil.h"
#include "ttime.h"
#include "ttimer.h"
#include "tglobal.h"
#include "tcache.h"
#include "httpInt.h"
#include "httpResp.h"
#include "httpSql.h"
#include "httpSession.h"
static void httpRemoveContextFromEpoll(HttpContext *pContext) {
HttpThread *pThread = pContext->pThread;
if (pContext->fd >= 0) {
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL);
taosCloseSocket(pContext->fd);
pContext->fd = -1;
}
}
static void httpDestroyContext(void *data) {
HttpContext *pContext = *(HttpContext **)data;
httpTrace("context:%p, is destroyed, refCount:%d", pContext, pContext->refCount);
if (pContext->fd > 0) tclose(pContext->fd);
HttpThread *pThread = pContext->pThread;
httpRemoveContextFromEpoll(pContext);
httpReleaseSession(pContext);
atomic_sub_fetch_32(&pThread->numOfFds, 1);
pContext->pThread = 0;
pContext->state = HTTP_CONTEXT_STATE_CLOSED;
// avoid double free
httpFreeJsonBuf(pContext);
httpFreeMultiCmds(pContext);
tfree(pContext);
}
bool httpInitContexts() {
tsHttpServer.contextCache = taosCacheInitWithCb(5, httpDestroyContext);
if (tsHttpServer.contextCache == NULL) {
httpError("failed to init context cache");
return false;
}
return true;
}
void httpCleanupContexts() {
// TODO: wait until all context is closed
if (tsHttpServer.contextCache != NULL) {
httpPrint("context cache is cleanup");
taosCacheCleanup(tsHttpServer.contextCache);
tsHttpServer.contextCache = NULL;
}
}
const char *httpContextStateStr(HttpContextState state) {
switch (state) {
case HTTP_CONTEXT_STATE_READY:
return "ready";
case HTTP_CONTEXT_STATE_HANDLING:
return "handling";
case HTTP_CONTEXT_STATE_DROPPING:
return "dropping";
case HTTP_CONTEXT_STATE_CLOSED:
return "closed";
default:
return "unknown";
}
}
void httpNotifyContextClose(HttpContext *pContext) {
shutdown(pContext->fd, SHUT_WR);
}
bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState) {
return (atomic_val_compare_exchange_32(&pContext->state, srcState, destState) == srcState);
}
HttpContext *httpCreateContext(int32_t fd) {
HttpContext *pContext = calloc(1, sizeof(HttpContext));
if (pContext == NULL) return NULL;
char fdStr[12] = {0};
snprintf(fdStr, sizeof(fdStr), "%d", fd);
//atomic_add_fetch_32(&pContext->refCount, 1);
pContext->fd = fd;
pContext->httpVersion = HTTP_VERSION_10;
pContext->lastAccessTime = taosGetTimestampSec();
pContext->state = HTTP_CONTEXT_STATE_READY;
taosCachePut(tsHttpServer.contextCache, fdStr, &pContext, sizeof(HttpContext *), 5);
httpTrace("context:%p, fd:%d is created", pContext, fd);
return pContext;
}
HttpContext *httpGetContext(int32_t fd) {
char fdStr[12] = {0};
snprintf(fdStr, sizeof(fdStr), "%d", fd);
HttpContext **ppContext = taosCacheAcquireByName(tsHttpServer.contextCache, fdStr);
if (ppContext) {
HttpContext *pContext = *ppContext;
if (pContext) {
int32_t refCount = atomic_add_fetch_32(&pContext->refCount, 1);
httpTrace("context:%p, fd:%d is accquired, refCount:%d", pContext, pContext->fd, refCount);
return pContext;
}
}
return NULL;
}
void httpReleaseContext(HttpContext *pContext) {
int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1);
assert(refCount >= 0);
httpTrace("context:%p, fd:%d is releasd, refCount:%d", pContext, pContext->fd, refCount);
taosCacheRelease(tsHttpServer.contextCache, (void **)(&pContext), false);
}
bool httpInitContext(HttpContext *pContext) {
pContext->accessTimes++;
pContext->lastAccessTime = taosGetTimestampSec();
pContext->httpVersion = HTTP_VERSION_10;
pContext->httpKeepAlive = HTTP_KEEPALIVE_NO_INPUT;
pContext->httpChunked = HTTP_UNCUNKED;
pContext->acceptEncoding = HTTP_COMPRESS_IDENTITY;
pContext->contentEncoding = HTTP_COMPRESS_IDENTITY;
pContext->reqType = HTTP_REQTYPE_OTHERS;
pContext->encodeMethod = NULL;
pContext->timer = NULL;
memset(&pContext->singleCmd, 0, sizeof(HttpSqlCmd));
HttpParser *pParser = &pContext->parser;
memset(pParser, 0, sizeof(HttpParser));
pParser->pCur = pParser->pLast = pParser->buffer;
httpTrace("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, parsed:%d",
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, pContext->parsed);
return true;
}
void httpCloseContextByApp(HttpContext *pContext) {
pContext->parsed = false;
bool keepAlive = true;
if (pContext->httpVersion == HTTP_VERSION_10 && pContext->httpKeepAlive != HTTP_KEEPALIVE_ENABLE) {
keepAlive = false;
} else if (pContext->httpVersion != HTTP_VERSION_10 && pContext->httpKeepAlive == HTTP_KEEPALIVE_DISABLE) {
keepAlive = false;
} else {}
if (keepAlive) {
if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_READY)) {
httpTrace("context:%p, fd:%d, ip:%s, last state:handling, keepAlive:true, reuse connect",
pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_CLOSED)) {
httpRemoveContextFromEpoll(pContext);
httpTrace("context:%p, fd:%d, ip:%s, last state:dropping, keepAlive:true, close connect",
pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_READY)) {
httpTrace("context:%p, fd:%d, ip:%s, last state:ready, keepAlive:true, reuse connect",
pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) {
httpRemoveContextFromEpoll(pContext);
httpTrace("context:%p, fd:%d, ip:%s, last state:ready, keepAlive:true, close connect",
pContext, pContext->fd, pContext->ipstr);
} else {
httpRemoveContextFromEpoll(pContext);
httpError("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:true, close connect",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->state);
}
} else {
httpRemoveContextFromEpoll(pContext);
httpTrace("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:false, close connect",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->state);
}
httpReleaseContext(pContext);
}
void httpCloseContextByServer(HttpContext *pContext) {
httpRemoveContextFromEpoll(pContext);
pContext->parsed = false;
if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_DROPPING)) {
httpTrace("context:%p, fd:%d, ip:%s, epoll finished, still used by app", pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_DROPPING)) {
httpTrace("context:%p, fd:%d, ip:%s, epoll already finished, wait app finished", pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_CLOSED)) {
httpTrace("context:%p, fd:%d, ip:%s, epoll finished, close context", pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) {
httpTrace("context:%p, fd:%d, ip:%s, epoll finished, will be closed soon", pContext, pContext->fd, pContext->ipstr);
} else {
httpError("context:%p, fd:%d, ip:%s, unknown state:%d", pContext, pContext->fd, pContext->ipstr, pContext->state);
}
httpReleaseContext(pContext);
}
......@@ -19,11 +19,12 @@
#include "tglobal.h"
#include "tsocket.h"
#include "ttimer.h"
#include "http.h"
#include "httpLog.h"
#include "httpCode.h"
#include "httpHandle.h"
#include "httpInt.h"
#include "httpResp.h"
#include "httpAuth.h"
#include "httpServer.h"
#include "httpContext.h"
#include "httpHandle.h"
void httpToLowerUrl(char* url) {
/*ignore case */
......@@ -159,7 +160,7 @@ bool httpGetHttpMethod(HttpContext* pContext) {
bool httpGetDecodeMethod(HttpContext* pContext) {
HttpParser* pParser = &pContext->parser;
HttpServer* pServer = pContext->pThread->pServer;
HttpServer* pServer = &tsHttpServer;
int methodLen = pServer->methodScannerLen;
for (int i = 0; i < methodLen; i++) {
HttpDecodeMethod* method = pServer->methodScanner[i];
......
......@@ -22,6 +22,7 @@
#include "httpCode.h"
#include "httpJson.h"
#include "httpResp.h"
#include "httpUtil.h"
#define MAX_NUM_STR_SZ 25
......
......@@ -21,6 +21,7 @@
#include "httpResp.h"
#include "httpCode.h"
#include "httpJson.h"
#include "httpContext.h"
const char *httpKeepAliveStr[] = {"", "Connection: Keep-Alive\r\n", "Connection: Close\r\n"};
......
此差异已折叠。
......@@ -15,44 +15,26 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "hash.h"
#include "taos.h"
#include "ttime.h"
#include "ttimer.h"
#include "http.h"
#include "httpLog.h"
#include "httpCode.h"
#include "httpHandle.h"
#include "httpResp.h"
void httpAccessSession(HttpContext *pContext) {
HttpServer *server = pContext->pThread->pServer;
pthread_mutex_lock(&server->serverMutex);
if (pContext->session == pContext->session->signature) {
pContext->session->expire = (int) taosGetTimestampSec() + pContext->pThread->pServer->sessionExpire;
}
pthread_mutex_unlock(&server->serverMutex);
}
#include "tglobal.h"
#include "tcache.h"
#include "httpInt.h"
#include "httpContext.h"
#include "httpSession.h"
void httpCreateSession(HttpContext *pContext, void *taos) {
HttpServer *server = pContext->pThread->pServer;
pthread_mutex_lock(&server->serverMutex);
HttpServer *server = &tsHttpServer;
httpReleaseSession(pContext);
if (pContext->session != NULL && pContext->session == pContext->session->signature) {
httpTrace("context:%p, fd:%d, ip:%s, user:%s, set exist session:%p:%p expired", pContext, pContext->fd,
pContext->ipstr, pContext->user, pContext->session, pContext->session->taos);
pContext->session->expire = 0;
pContext->session->access--;
}
pthread_mutex_lock(&server->serverMutex);
HttpSession session;
HttpSession session = {0};
session.taos = taos;
session.expire = (int)taosGetTimestampSec() + server->sessionExpire;
session.access = 1;
int sessionIdLen = snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
session.refCount = 1;
snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
taosHashPut(server->pSessionHash, session.id, sessionIdLen, (char *)(&session), sizeof(HttpSession));
pContext->session = taosHashGet(server->pSessionHash, session.id, sessionIdLen);
pContext->session = taosCachePut(server->sessionCache, session.id, &session, sizeof(HttpSession), tsHttpSessionExpire);
if (pContext->session == NULL) {
httpError("context:%p, fd:%d, ip:%s, user:%s, error:%s", pContext, pContext->fd, pContext->ipstr, pContext->user,
......@@ -62,26 +44,23 @@ void httpCreateSession(HttpContext *pContext, void *taos) {
return;
}
pContext->session->signature = pContext->session;
httpTrace("context:%p, fd:%d, ip:%s, user:%s, create a new session:%p:%p", pContext, pContext->fd, pContext->ipstr,
pContext->user, pContext->session, pContext->session->taos);
httpTrace("context:%p, fd:%d, ip:%s, user:%s, create a new session:%p:%p refCount:%d", pContext, pContext->fd,
pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount);
pthread_mutex_unlock(&server->serverMutex);
}
void httpFetchSessionImp(HttpContext *pContext) {
HttpServer *server = pContext->pThread->pServer;
static void httpFetchSessionImp(HttpContext *pContext) {
HttpServer *server = &tsHttpServer;
pthread_mutex_lock(&server->serverMutex);
char sessionId[HTTP_SESSION_ID_LEN];
int sessonIdLen = snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
pContext->session = taosHashGet(server->pSessionHash, sessionId, sessonIdLen);
if (pContext->session != NULL && pContext->session == pContext->session->signature) {
pContext->session->access++;
httpTrace("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, access:%d, expire:%d",
pContext, pContext->fd, pContext->ipstr, pContext->user, pContext->session,
pContext->session->taos, pContext->session->access, pContext->session->expire);
pContext->session->expire = (int)taosGetTimestampSec() + server->sessionExpire;
snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
pContext->session = taosCacheAcquireByName(server->sessionCache, sessionId);
if (pContext->session != NULL) {
atomic_add_fetch_32(&pContext->refCount, 1);
httpTrace("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, refCount:%d", pContext, pContext->fd,
pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount);
} else {
httpTrace("context:%p, fd:%d, ip:%s, user:%s, session not found", pContext, pContext->fd, pContext->ipstr,
pContext->user);
......@@ -90,113 +69,53 @@ void httpFetchSessionImp(HttpContext *pContext) {
pthread_mutex_unlock(&server->serverMutex);
}
void httpFetchSession(HttpContext *pContext) {
void httpGetSession(HttpContext *pContext) {
if (pContext->session == NULL) {
httpFetchSessionImp(pContext);
} else {
char sessionId[HTTP_SESSION_ID_LEN];
snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
if (strcmp(pContext->session->id, sessionId) != 0) {
httpError("context:%p, fd:%d, ip:%s, user:%s, password may be changed", pContext, pContext->fd, pContext->ipstr, pContext->user);
httpRestoreSession(pContext);
httpFetchSessionImp(pContext);
}
httpReleaseSession(pContext);
httpFetchSessionImp(pContext);
}
}
void httpRestoreSession(HttpContext *pContext) {
HttpServer * server = pContext->pThread->pServer;
void httpReleaseSession(HttpContext *pContext) {
if (pContext == NULL || pContext->session == NULL) return;
// all access to the session is via serverMutex
pthread_mutex_lock(&server->serverMutex);
HttpSession *session = pContext->session;
if (session == NULL || session != session->signature) {
pthread_mutex_unlock(&server->serverMutex);
return;
}
session->access--;
httpTrace("context:%p, ip:%s, user:%s, restore session:%p:%p, access:%d, expire:%d",
pContext, pContext->ipstr, pContext->user, session, session->taos,
session->access, pContext->session->expire);
int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1);
assert(refCount >= 0);
httpTrace("context:%p, session:%p is releasd refCount:%d", pContext, pContext->session, pContext->session->refCount);
taosCacheRelease(tsHttpServer.sessionCache, (void**)(&(pContext->session)), false);
pContext->session = NULL;
pthread_mutex_unlock(&server->serverMutex);
}
void httpResetSession(HttpSession *pSession) {
httpTrace("close session:%p:%p", pSession, pSession->taos);
static void httpDestroySession(void *data) {
HttpSession *pSession = data;
httpTrace("session:%p:%p, is destroyed, refCount:%d", pSession, pSession->taos, pSession->refCount);
if (pSession->taos != NULL) {
taos_close(pSession->taos);
pSession->taos = NULL;
}
pSession->signature = NULL;
tfree(pSession);
}
void httpRemoveAllSessions(HttpServer *pServer) {
SHashMutableIterator *pIter = taosHashCreateIter(pServer->pSessionHash);
while (taosHashIterNext(pIter)) {
HttpSession *pSession = taosHashIterGet(pIter);
if (pSession == NULL) continue;
httpResetSession(pSession);
void httpCleanUpSessions() {
if (tsHttpServer.sessionCache != NULL) {
httpPrint("session cache is cleanup");
taosCacheCleanup(tsHttpServer.sessionCache);
tsHttpServer.sessionCache = NULL;
}
taosHashDestroyIter(pIter);
}
bool httpInitAllSessions(HttpServer *pServer) {
if (pServer->pSessionHash == NULL) {
pServer->pSessionHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true);
}
if (pServer->pSessionHash == NULL) {
httpError("http init session pool failed");
bool httpInitSessions() {
tsHttpServer.sessionCache = taosCacheInitWithCb(5, httpDestroySession);
if (tsHttpServer.sessionCache == NULL) {
httpError("failed to init session cache");
return false;
}
if (pServer->expireTimer == NULL) {
taosTmrReset(httpProcessSessionExpire, 50000, pServer, pServer->timerHandle, &pServer->expireTimer);
}
return true;
}
bool httpSessionExpired(HttpSession *pSession) {
time_t cur = taosGetTimestampSec();
if (pSession->taos != NULL) {
if (pSession->expire > cur) {
return false; // un-expired, so return false
}
if (pSession->access > 0) {
httpTrace("session:%p:%p is expired, but still access:%d", pSession, pSession->taos,
pSession->access);
return false; // still used, so return false
}
httpTrace("need close session:%p:%p for it expired, cur:%d, expire:%d, invertal:%d",
pSession, pSession->taos, cur, pSession->expire, cur - pSession->expire);
}
return true;
}
void httpRemoveExpireSessions(HttpServer *pServer) {
SHashMutableIterator *pIter = taosHashCreateIter(pServer->pSessionHash);
while (taosHashIterNext(pIter)) {
HttpSession *pSession = taosHashIterGet(pIter);
if (pSession == NULL) continue;
pthread_mutex_lock(&pServer->serverMutex);
if (httpSessionExpired(pSession)) {
httpResetSession(pSession);
taosHashRemove(pServer->pSessionHash, pSession->id, strlen(pSession->id));
}
pthread_mutex_unlock(&pServer->serverMutex);
}
taosHashDestroyIter(pIter);
}
void httpProcessSessionExpire(void *handle, void *tmrId) {
HttpServer *pServer = (HttpServer *)handle;
httpRemoveExpireSessions(pServer);
taosTmrReset(httpProcessSessionExpire, 60000, pServer, pServer->timerHandle, &pServer->expireTimer);
}
\ No newline at end of file
......@@ -18,11 +18,12 @@
#include "tnote.h"
#include "taos.h"
#include "tsclient.h"
#include "http.h"
#include "httpLog.h"
#include "httpCode.h"
#include "httpHandle.h"
#include "httpInt.h"
#include "httpContext.h"
#include "httpSql.h"
#include "httpResp.h"
#include "httpAuth.h"
#include "httpSession.h"
void *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos);
......@@ -30,7 +31,7 @@ void httpProcessMultiSql(HttpContext *pContext);
void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) {
HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL || pContext->signature != pContext) return;
if (pContext == NULL) return;
HttpSqlCmds * multiCmds = pContext->multiCmds;
HttpEncodeMethod *encode = pContext->encodeMethod;
......@@ -72,7 +73,7 @@ void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numO
void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) {
HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL || pContext->signature != pContext) return;
if (pContext == NULL) return;
HttpSqlCmds * multiCmds = pContext->multiCmds;
HttpEncodeMethod *encode = pContext->encodeMethod;
......@@ -172,7 +173,7 @@ void httpProcessMultiSql(HttpContext *pContext) {
}
void httpProcessMultiSqlCmd(HttpContext *pContext) {
if (pContext == NULL || pContext->signature != pContext) return;
if (pContext == NULL) return;
HttpSqlCmds *multiCmds = pContext->multiCmds;
if (multiCmds == NULL || multiCmds->size <= 0 || multiCmds->pos >= multiCmds->size || multiCmds->pos < 0) {
......@@ -192,7 +193,7 @@ void httpProcessMultiSqlCmd(HttpContext *pContext) {
void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) {
HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL || pContext->signature != pContext) return;
if (pContext == NULL) return;
HttpEncodeMethod *encode = pContext->encodeMethod;
......@@ -230,7 +231,7 @@ void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int num
void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int code) {
HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL || pContext->signature != pContext) return;
if (pContext == NULL) return;
HttpEncodeMethod *encode = pContext->encodeMethod;
......@@ -354,7 +355,7 @@ void httpExecCmd(HttpContext *pContext) {
void httpProcessRequestCb(void *param, TAOS_RES *result, int code) {
HttpContext *pContext = param;
if (pContext == NULL || pContext->signature != pContext) return;
if (pContext == NULL) return;
if (code < 0) {
httpError("context:%p, fd:%d, ip:%s, user:%s, login error, code:%s", pContext, pContext->fd, pContext->ipstr,
......@@ -383,16 +384,14 @@ void httpProcessRequestCb(void *param, TAOS_RES *result, int code) {
}
void httpProcessRequest(HttpContext *pContext) {
httpFetchSession(pContext);
httpGetSession(pContext);
if (pContext->session == NULL || pContext->session != pContext->session->signature ||
pContext->reqType == HTTP_REQTYPE_LOGIN) {
if (pContext->session == NULL || pContext->reqType == HTTP_REQTYPE_LOGIN) {
taos_connect_a(NULL, pContext->user, pContext->pass, "", 0, httpProcessRequestCb, (void *)pContext,
&(pContext->taos));
httpTrace("context:%p, fd:%d, ip:%s, user:%s, try connect tdengine, taos:%p", pContext, pContext->fd,
pContext->ipstr, pContext->user, pContext->taos);
} else {
httpAccessSession(pContext);
httpExecCmd(pContext);
}
}
......@@ -20,84 +20,64 @@
#include "tsocket.h"
#include "ttimer.h"
#include "tadmin.h"
#include "http.h"
#include "httpCode.h"
#include "httpHandle.h"
#include "httpInt.h"
#include "httpContext.h"
#include "httpSession.h"
#include "httpServer.h"
#include "httpResp.h"
#include "httpLog.h"
#include "gcHandle.h"
#include "httpHandle.h"
#include "gcHandle.h"
#include "restHandle.h"
#include "tgHandle.h"
#ifndef _ADMIN
void adminInitHandle(HttpServer* pServer) {}
void opInitHandle(HttpServer* pServer) {}
#endif
static HttpServer *httpServer = NULL;
HttpServer tsHttpServer;
void taosInitNote(int numOfNoteLines, int maxNotes, char* lable);
int httpInitSystem() {
// taos_init();
httpServer = (HttpServer *)malloc(sizeof(HttpServer));
memset(httpServer, 0, sizeof(HttpServer));
strcpy(httpServer->label, "rest");
httpServer->serverIp = 0;
httpServer->serverPort = tsHttpPort;
httpServer->cacheContext = tsHttpCacheSessions;
httpServer->sessionExpire = tsHttpSessionExpire;
httpServer->numOfThreads = tsHttpMaxThreads;
httpServer->processData = httpProcessData;
strcpy(tsHttpServer.label, "rest");
tsHttpServer.serverIp = 0;
tsHttpServer.serverPort = tsHttpPort;
tsHttpServer.numOfThreads = tsHttpMaxThreads;
tsHttpServer.processData = httpProcessData;
pthread_mutex_init(&httpServer->serverMutex, NULL);
pthread_mutex_init(&tsHttpServer.serverMutex, NULL);
if (tsHttpEnableRecordSql != 0) {
taosInitNote(tsNumOfLogLines / 10, 1, (char*)"http_note");
}
restInitHandle(httpServer);
adminInitHandle(httpServer);
gcInitHandle(httpServer);
tgInitHandle(httpServer);
opInitHandle(httpServer);
restInitHandle(&tsHttpServer);
adminInitHandle(&tsHttpServer);
gcInitHandle(&tsHttpServer);
tgInitHandle(&tsHttpServer);
opInitHandle(&tsHttpServer);
return 0;
}
int httpStartSystem() {
httpPrint("starting to initialize http service ...");
httpPrint("start http server ...");
if (httpServer == NULL) {
httpError("http server is null");
httpInitSystem();
}
if (httpServer->pContextPool == NULL) {
httpServer->pContextPool = taosMemPoolInit(httpServer->cacheContext, sizeof(HttpContext));
}
if (httpServer->pContextPool == NULL) {
httpError("http init context pool failed");
if (tsHttpServer.status != HTTP_SERVER_INIT) {
httpError("http server is already started");
return -1;
}
if (httpServer->timerHandle == NULL) {
httpServer->timerHandle = taosTmrInit(tsHttpCacheSessions * 100 + 100, 200, 60000, "http");
}
if (httpServer->timerHandle == NULL) {
httpError("http init timer failed");
if (!httpInitContexts()) {
httpError("http init session failed");
return -1;
}
if (!httpInitAllSessions(httpServer)) {
if (!httpInitSessions()) {
httpError("http init session failed");
return -1;
}
if (!httpInitConnect(httpServer)) {
if (!httpInitConnect()) {
httpError("http init server failed");
return -1;
}
......@@ -106,9 +86,8 @@ int httpStartSystem() {
}
void httpStopSystem() {
if (httpServer != NULL) {
httpServer->online = false;
}
tsHttpServer.status = HTTP_SERVER_CLOSING;
shutdown(tsHttpServer.fd, SHUT_RD);
tgCleanupHandle();
}
......@@ -116,43 +95,14 @@ void httpCleanUpSystem() {
httpPrint("http service cleanup");
httpStopSystem();
//#if 0
if (httpServer == NULL) {
return;
}
httpCleanupContexts();
httpCleanUpSessions();
httpCleanUpConnect();
pthread_mutex_destroy(&tsHttpServer.serverMutex);
if (httpServer->expireTimer != NULL) {
taosTmrStopA(&(httpServer->expireTimer));
}
if (httpServer->timerHandle != NULL) {
taosTmrCleanUp(httpServer->timerHandle);
httpServer->timerHandle = NULL;
}
if (httpServer->pThreads != NULL) {
httpCleanUpConnect(httpServer);
httpServer->pThreads = NULL;
}
#if 0
httpRemoveAllSessions(httpServer);
if (httpServer->pContextPool != NULL) {
taosMemPoolCleanUp(httpServer->pContextPool);
httpServer->pContextPool = NULL;
}
pthread_mutex_destroy(&httpServer->serverMutex);
tfree(httpServer);
#endif
tsHttpServer.status = HTTP_SERVER_CLOSED;
}
int32_t httpGetReqCount() {
if (httpServer != NULL) {
return atomic_exchange_32(&httpServer->requestNum, 0);
}
return 0;
return atomic_exchange_32(&tsHttpServer.requestNum, 0);
}
......@@ -17,11 +17,10 @@
#include "os.h"
#include "tmd5.h"
#include "taos.h"
#include "http.h"
#include "httpLog.h"
#include "httpCode.h"
#include "httpHandle.h"
#include "httpInt.h"
#include "httpResp.h"
#include "httpSql.h"
#include "httpUtil.h"
bool httpCheckUsedbSql(char *sql) {
if (strstr(sql, "use ") != NULL) {
......
......@@ -18,9 +18,10 @@
#include "tglobal.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "httpInt.h"
#include "tgHandle.h"
#include "tgJson.h"
#include "httpLog.h"
#include "cJSON.h"
/*
* taos.telegraf.cfg formats like
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册