提交 cfae3b94 编写于 作者: S Shengliang Guan

remove files

上级 99c71628
CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
PROJECT(TDengine)
ADD_SUBDIRECTORY(monitor)
ADD_SUBDIRECTORY(http)
IF (TD_LINUX AND TD_MQTT)
ADD_SUBDIRECTORY(mqtt)
ENDIF ()
\ No newline at end of file
CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/zlib-1.2.11/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(http ${SRC})
TARGET_LINK_LIBRARIES(http z)
IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(http taos_static)
ELSE ()
TARGET_LINK_LIBRARIES(http taos)
ENDIF ()
IF (TD_ADMIN)
TARGET_LINK_LIBRARIES(http admin)
ENDIF ()
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_TOKEN_H
#define TDENGINE_HTTP_TOKEN_H
int32_t httpParseBasicAuthToken(HttpContext *pContext, char *token, int32_t len);
int32_t httpParseTaosdAuthToken(HttpContext *pContext, char *token, int32_t len);
int32_t httpGenTaosdAuthToken(HttpContext *pContext, char *token, int32_t maxLen);
#endif
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_CONTEXT_H
#define TDENGINE_HTTP_CONTEXT_H
#include "httpInt.h"
bool httpInitContexts();
void httpCleanupContexts();
const char *httpContextStateStr(HttpContextState state);
HttpContext *httpCreateContext(SOCKET fd);
bool httpInitContext(HttpContext *pContext);
HttpContext *httpGetContext(void * pContext);
void httpReleaseContext(HttpContext *pContext/*, bool clearRes*/);
void httpCloseContextByServer(HttpContext *pContext);
void httpCloseContextByApp(HttpContext *pContext);
void httpNotifyContextClose(HttpContext *pContext);
bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState);
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_GC_HANDLE_H
#define TDENGINE_GC_HANDLE_H
#include "http.h"
#include "httpInt.h"
#include "httpUtil.h"
#include "httpResp.h"
#include "httpSql.h"
#define GC_ROOT_URL_POS 0
#define GC_ACTION_URL_POS 1
#define GC_USER_URL_POS 2
#define GC_PASS_URL_POS 3
void gcInitHandle(HttpServer* pServer);
bool gcProcessRequest(struct HttpContext* pContext);
#endif
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_GC_JSON_H
#define TDENGINE_GC_JSON_H
#include "../../../../include/client/taos.h"
#include "httpHandle.h"
#include "httpJson.h"
void gcInitQueryJson(HttpContext *pContext);
void gcCleanQueryJson(HttpContext *pContext);
void gcStartQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result);
void gcStopQueryJson(HttpContext *pContext, HttpSqlCmd *cmd);
bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, int32_t numOfRows);
void gcSendHeartBeatResp(HttpContext *pContext, HttpSqlCmd *cmd);
#endif
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef HTTP_GZIP_H
#define HTTP_GZIP_H
#define EHTTP_GZIP_CHUNK_SIZE_DEFAULT (1024*16)
typedef struct ehttp_gzip_s ehttp_gzip_t;
typedef struct ehttp_gzip_callbacks_s ehttp_gzip_callbacks_t;
typedef struct ehttp_gzip_conf_s ehttp_gzip_conf_t;
struct ehttp_gzip_callbacks_s {
void (*on_data)(ehttp_gzip_t *gzip, void *arg, const char *buf, int32_t len);
};
struct ehttp_gzip_conf_s {
int32_t get_header:2; // 0: not fetching header info
int32_t chunk_size; // 0: fallback to default: EHTTP_GZIP_CHUNK_SIZE_DEFAULT
};
ehttp_gzip_t* ehttp_gzip_create_decompressor(ehttp_gzip_conf_t conf, ehttp_gzip_callbacks_t callbacks, void *arg);
ehttp_gzip_t* ehttp_gzip_create_compressor(ehttp_gzip_conf_t conf, ehttp_gzip_callbacks_t callbacks, void *arg);
void ehttp_gzip_destroy(ehttp_gzip_t *gzip);
int32_t ehttp_gzip_write(ehttp_gzip_t *gzip, const char *buf, int32_t len);
int32_t ehttp_gzip_finish(ehttp_gzip_t *gzip);
#endif // _ehttp_gzip_h_9196791b_ac2a_4d73_9979_f4b41abbc4c0_
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_HANDLE_H
#define TDENGINE_HTTP_HANDLE_H
// http request handler
void httpProcessRequest(HttpContext *pContext);
bool httpProcessData(HttpContext *pContext);
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_INT_H
#define TDENGINE_HTTP_INT_H
#include "os.h"
#include <stdbool.h>
#include "pthread.h"
#include "semaphore.h"
#include "tmempool.h"
#include "taosdef.h"
#include "tutil.h"
#include "zlib.h"
#include "http.h"
#include "httpLog.h"
#include "httpJson.h"
#include "httpParser.h"
#define HTTP_MAX_CMD_SIZE 1024
#define HTTP_MAX_BUFFER_SIZE 1024*1024*8
#define HTTP_LABEL_SIZE 8
#define HTTP_MAX_EVENTS 10
#define HTTP_BUFFER_INIT 4096
#define HTTP_BUFFER_SIZE 8388608
#define HTTP_STEP_SIZE 4096 //http message get process step by step
#define HTTP_METHOD_SCANNER_SIZE 7 //http method fp size
#define HTTP_GC_TARGET_SIZE 512
#define HTTP_WRITE_RETRY_TIMES 500
#define HTTP_WRITE_WAIT_TIME_MS 5
#define HTTP_PASSWORD_LEN TSDB_UNI_LEN
#define HTTP_SESSION_ID_LEN (TSDB_USER_LEN + HTTP_PASSWORD_LEN)
typedef enum HttpReqType {
HTTP_REQTYPE_OTHERS = 0,
HTTP_REQTYPE_LOGIN = 1,
HTTP_REQTYPE_HEARTBEAT = 2,
HTTP_REQTYPE_SINGLE_SQL = 3,
HTTP_REQTYPE_MULTI_SQL = 4
} HttpReqType;
typedef enum {
HTTP_SERVER_INIT,
HTTP_SERVER_RUNNING,
HTTP_SERVER_CLOSING,
HTTP_SERVER_CLOSED
} HttpServerStatus;
typedef enum {
HTTP_CONTEXT_STATE_READY,
HTTP_CONTEXT_STATE_HANDLING,
HTTP_CONTEXT_STATE_DROPPING,
HTTP_CONTEXT_STATE_CLOSED
} HttpContextState;
typedef enum {
HTTP_CMD_TYPE_UN_SPECIFIED,
HTTP_CMD_TYPE_CREATE_DB,
HTTP_CMD_TYPE_CREATE_STBALE,
HTTP_CMD_TYPE_INSERT
} HttpSqlCmdType;
typedef enum { HTTP_CMD_STATE_NOT_RUN_YET, HTTP_CMD_STATE_RUN_FINISHED } HttpSqlCmdState;
typedef enum { HTTP_CMD_RETURN_TYPE_WITH_RETURN, HTTP_CMD_RETURN_TYPE_NO_RETURN } HttpSqlCmdReturnType;
struct HttpContext;
struct HttpThread;
typedef struct {
char id[HTTP_SESSION_ID_LEN];
int32_t refCount;
void * taos;
} HttpSession;
typedef struct {
// used by single cmd
char *nativSql;
int32_t numOfRows;
int32_t code;
// these are the locations in the buffer
int32_t tagNames[TSDB_MAX_TAGS];
int32_t tagValues[TSDB_MAX_TAGS];
int32_t timestamp;
int32_t metric;
int32_t stable;
int32_t table;
int32_t values;
int32_t sql;
// used by multi-cmd
int8_t cmdType;
int8_t cmdReturnType;
int8_t cmdState;
int8_t tagNum;
} HttpSqlCmd;
typedef struct {
HttpSqlCmd *cmds;
int16_t pos;
int16_t size;
int16_t maxSize;
int32_t bufferPos;
int32_t bufferSize;
char * buffer;
} HttpSqlCmds;
typedef struct {
char *module;
bool (*fpDecode)(struct HttpContext *pContext);
} HttpDecodeMethod;
typedef struct {
void (*startJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result);
void (*stopJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd);
bool (*buildQueryJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, int numOfRows);
void (*buildAffectRowJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int affectRows);
void (*initJsonFp)(struct HttpContext *pContext);
void (*cleanJsonFp)(struct HttpContext *pContext);
bool (*checkFinishedFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int code);
void (*setNextCmdFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int code);
} HttpEncodeMethod;
typedef enum {
EHTTP_CONTEXT_PROCESS_FAILED = 0x01,
EHTTP_CONTEXT_PARSER_FAILED = 0x02
} EHTTP_CONTEXT_FAILED_CAUSE;
typedef struct HttpContext {
int32_t refCount;
SOCKET fd;
uint32_t accessTimes;
uint32_t lastAccessTime;
int32_t state;
uint8_t reqType;
uint8_t parsed;
uint8_t error;
char ipstr[22];
char user[TSDB_USER_LEN]; // parsed from auth token or login message
char pass[HTTP_PASSWORD_LEN];
char db[/*TSDB_ACCT_ID_LEN + */TSDB_DB_NAME_LEN];
TAOS * taos;
void * ppContext;
HttpSession *session;
z_stream gzipStream;
HttpParser *parser;
HttpSqlCmd singleCmd;
HttpSqlCmds *multiCmds;
JsonBuf * jsonBuf;
HttpEncodeMethod *encodeMethod;
HttpDecodeMethod *decodeMethod;
struct HttpThread *pThread;
} HttpContext;
typedef struct HttpThread {
pthread_t thread;
HttpContext * pHead;
pthread_mutex_t threadMutex;
bool stop;
EpollFd pollFd;
int32_t numOfContexts;
int32_t threadId;
char label[HTTP_LABEL_SIZE << 1];
bool (*processData)(HttpContext *pContext);
} HttpThread;
typedef struct HttpServer {
char label[HTTP_LABEL_SIZE];
uint32_t serverIp;
uint16_t serverPort;
int8_t stop;
int8_t reserve;
SOCKET fd;
int32_t numOfThreads;
int32_t methodScannerLen;
int32_t requestNum;
int32_t status;
pthread_t thread;
HttpThread * pThreads;
void * contextCache;
void * sessionCache;
pthread_mutex_t serverMutex;
HttpDecodeMethod *methodScanner[HTTP_METHOD_SCANNER_SIZE];
bool (*processData)(HttpContext *pContext);
} HttpServer;
extern const char *httpKeepAliveStr[];
extern const char *httpVersionStr[];
extern HttpServer tsHttpServer;
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_JSON_H
#define TDENGINE_HTTP_JSON_H
#include <stdint.h>
#include <stdbool.h>
#define JSON_BUFFER_SIZE 16384
struct HttpContext;
enum { JsonNumber, JsonString, JsonBoolean, JsonArray, JsonObject, JsonNull };
extern char JsonItmTkn;
extern char JsonObjStt;
extern char JsonObjEnd;
extern char JsonArrStt;
extern char JsonArrEnd;
extern char JsonStrStt;
extern char JsonStrEnd;
extern char JsonPairTkn;
extern char JsonNulTkn[];
extern char JsonTrueTkn[];
extern char JsonFalseTkn[];
typedef struct {
int32_t size;
int32_t total;
char* lst;
char buf[JSON_BUFFER_SIZE];
struct HttpContext* pContext;
} JsonBuf;
// http response
int32_t httpWriteBuf(struct HttpContext* pContext, const char* buf, int32_t sz);
int32_t httpWriteBufNoTrace(struct HttpContext* pContext, const char* buf, int32_t sz);
int32_t httpWriteBufByFd(struct HttpContext* pContext, const char* buf, int32_t sz);
// builder callback
typedef void (*httpJsonBuilder)(JsonBuf* buf, void* jsnHandle);
// buffer
void httpInitJsonBuf(JsonBuf* buf, struct HttpContext* pContext);
void httpWriteJsonBufHead(JsonBuf* buf);
int32_t httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast);
void httpWriteJsonBufEnd(JsonBuf* buf);
// value
void httpJsonString(JsonBuf* buf, char* sVal, int32_t len);
void httpJsonOriginString(JsonBuf* buf, char* sVal, int32_t len);
void httpJsonStringForTransMean(JsonBuf* buf, char* SVal, int32_t maxLen);
void httpJsonInt64(JsonBuf* buf, int64_t num);
void httpJsonUInt64(JsonBuf* buf, uint64_t num);
void httpJsonTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision);
void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision);
void httpJsonInt(JsonBuf* buf, int32_t num);
void httpJsonUInt(JsonBuf* buf, uint32_t num);
void httpJsonFloat(JsonBuf* buf, float num);
void httpJsonDouble(JsonBuf* buf, double num);
void httpJsonNull(JsonBuf* buf);
void httpJsonBool(JsonBuf* buf, int32_t val);
// pair
void httpJsonPair(JsonBuf* buf, char* name, int32_t nameLen, char* sVal, int32_t valLen);
void httpJsonPairOriginString(JsonBuf* buf, char* name, int32_t nameLen, char* sVal, int32_t valLen);
void httpJsonPairHead(JsonBuf* buf, char* name, int32_t len);
void httpJsonPairIntVal(JsonBuf* buf, char* name, int32_t nNameLen, int32_t num);
void httpJsonPairInt64Val(JsonBuf* buf, char* name, int32_t nNameLen, int64_t num);
void httpJsonPairBoolVal(JsonBuf* buf, char* name, int32_t nNameLen, int32_t num);
void httpJsonPairFloatVal(JsonBuf* buf, char* name, int32_t nNameLen, float num);
void httpJsonPairDoubleVal(JsonBuf* buf, char* name, int32_t nNameLen, double num);
void httpJsonPairNullVal(JsonBuf* buf, char* name, int32_t nNameLen);
// object
void httpJsonPairArray(JsonBuf* buf, char* name, int32_t nLen, httpJsonBuilder builder, void* dsHandle);
void httpJsonPairObject(JsonBuf* buf, char* name, int32_t nLen, httpJsonBuilder builder, void* dsHandle);
void httpJsonObject(JsonBuf* buf, httpJsonBuilder fnBuilder, void* dsHandle);
void httpJsonArray(JsonBuf* buf, httpJsonBuilder fnBuidler, void* jsonHandle);
// print
void httpJsonTestBuf(JsonBuf* buf, int32_t safety);
void httpJsonToken(JsonBuf* buf, char c);
void httpJsonItemToken(JsonBuf* buf);
void httpJsonPrint(JsonBuf* buf, const char* json, int32_t len);
// quick
void httpJsonPairStatus(JsonBuf* buf, int32_t code);
// http json printer
JsonBuf* httpMallocJsonBuf(struct HttpContext* pContext);
void httpFreeJsonBuf(struct HttpContext* pContext);
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_LOG_H
#define TDENGINE_HTTP_LOG_H
#include "tlog.h"
extern int32_t httpDebugFlag;
#define httpFatal(...) { if (httpDebugFlag & DEBUG_FATAL) { taosPrintLog("HTP FATAL ", 255, __VA_ARGS__); }}
#define httpError(...) { if (httpDebugFlag & DEBUG_ERROR) { taosPrintLog("HTP ERROR ", 255, __VA_ARGS__); }}
#define httpWarn(...) { if (httpDebugFlag & DEBUG_WARN) { taosPrintLog("HTP WARN ", 255, __VA_ARGS__); }}
#define httpInfo(...) { if (httpDebugFlag & DEBUG_INFO) { taosPrintLog("HTP ", 255, __VA_ARGS__); }}
#define httpDebug(...) { if (httpDebugFlag & DEBUG_DEBUG) { taosPrintLog("HTP ", httpDebugFlag, __VA_ARGS__); }}
#define httpTrace(...) { if (httpDebugFlag & DEBUG_TRACE) { taosPrintLog("HTP ", httpDebugFlag, __VA_ARGS__); }}
#define httpTraceL(...){ if (httpDebugFlag & DEBUG_TRACE) { taosPrintLongString("HTP ", httpDebugFlag, __VA_ARGS__); }}
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTPMETRICSHANDLE_H
#define TDENGINE_HTTPMETRICSHANDLE_H
#include "http.h"
#include "httpInt.h"
#include "httpUtil.h"
#include "httpResp.h"
void metricsInitHandle(HttpServer* httpServer);
bool metricsProcessRequest(struct HttpContext* httpContext);
#endif // TDENGINE_HTTPMETRICHANDLE_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef HTTP_PARSER_H
#define HTTP_PARSER_H
#include "httpGzip.h"
#define HTTP_MAX_URL 6 // http url stack size
#define HTTP_CODE_CONTINUE 100
#define HTTP_CODE_SWITCHING_PROTOCOL 101
#define HTTP_CODE_PROCESSING 102
#define HTTP_CODE_EARLY_HINTS 103
#define HTTP_CODE_OK 200
#define HTTP_CODE_CREATED 201
#define HTTP_CODE_ACCEPTED 202
#define HTTP_CODE_NON_AUTHORITATIVE_INFO 203
#define HTTP_CODE_NO_CONTENT 204
#define HTTP_CODE_RESET_CONTENT 205
#define HTTP_CODE_PARTIAL_CONTENT 206
#define HTTP_CODE_MULTI_STATUS 207
#define HTTP_CODE_ALREADY_REPORTED 208
#define HTTP_CODE_IM_USED 226
#define HTTP_CODE_MULTIPLE_CHOICE 300
#define HTTP_CODE_MOVED_PERMANENTLY 301
#define HTTP_CODE_FOUND 302
#define HTTP_CODE_SEE_OTHER 303
#define HTTP_CODE_NOT_MODIFIED 304
#define HTTP_CODE_USE_PROXY 305
#define HTTP_CODE_UNUSED 306
#define HTTP_CODE_TEMPORARY_REDIRECT 307
#define HTTP_CODE_PERMANENT_REDIRECT 308
#define HTTP_CODE_BAD_REQUEST 400
#define HTTP_CODE_UNAUTHORIZED 401
#define HTTP_CODE_PAYMENT_REQUIRED 402
#define HTTP_CODE_FORBIDDEN 403
#define HTTP_CODE_NOT_FOUND 404
#define HTTP_CODE_METHOD_NOT_ALLOWED 405
#define HTTP_CODE_NOT_ACCEPTABLE 406
#define HTTP_CODE_PROXY_AUTH_REQUIRED 407
#define HTTP_CODE_REQUEST_TIMEOUT 408
#define HTTP_CODE_CONFLICT 409
#define HTTP_CODE_GONE 410
#define HTTP_CODE_LENGTH_REQUIRED 411
#define HTTP_CODE_PRECONDITION_FAILED 412
#define HTTP_CODE_PAYLOAD_TOO_LARGE 413
#define HTTP_CODE_URI_TOO_LARGE 414
#define HTTP_CODE_UNSUPPORTED_MEDIA_TYPE 415
#define HTTP_CODE_RANGE_NOT_SATISFIABLE 416
#define HTTP_CODE_EXPECTATION_FAILED 417
#define HTTP_CODE_IM_A_TEAPOT 418
#define HTTP_CODE_MISDIRECTED_REQUEST 421
#define HTTP_CODE_UNPROCESSABLE_ENTITY 422
#define HTTP_CODE_LOCKED 423
#define HTTP_CODE_FAILED_DEPENDENCY 424
#define HTTP_CODE_TOO_EARLY 425
#define HTTP_CODE_UPGRADE_REQUIRED 426
#define HTTP_CODE_PRECONDITION_REQUIRED 428
#define HTTP_CODE_TOO_MANY_REQUESTS 429
#define HTTP_CODE_REQ_HDR_FIELDS_TOO_LARGE 431
#define HTTP_CODE_UNAVAIL_4_LEGAL_REASONS 451
#define HTTP_CODE_INTERNAL_SERVER_ERROR 500
#define HTTP_CODE_NOT_IMPLEMENTED 501
#define HTTP_CODE_BAD_GATEWAY 502
#define HTTP_CODE_SERVICE_UNAVAILABLE 503
#define HTTP_CODE_GATEWAY_TIMEOUT 504
#define HTTP_CODE_HTTP_VER_NOT_SUPPORTED 505
#define HTTP_CODE_VARIANT_ALSO_NEGOTIATES 506
#define HTTP_CODE_INSUFFICIENT_STORAGE 507
#define HTTP_CODE_LOOP_DETECTED 508
#define HTTP_CODE_NOT_EXTENDED 510
#define HTTP_CODE_NETWORK_AUTH_REQUIRED 511
typedef enum HTTP_PARSER_STATE {
HTTP_PARSER_BEGIN,
HTTP_PARSER_REQUEST_OR_RESPONSE,
HTTP_PARSER_METHOD,
HTTP_PARSER_TARGET,
HTTP_PARSER_HTTP_VERSION,
HTTP_PARSER_SP,
HTTP_PARSER_STATUS_CODE,
HTTP_PARSER_REASON_PHRASE,
HTTP_PARSER_CRLF,
HTTP_PARSER_HEADER,
HTTP_PARSER_HEADER_KEY,
HTTP_PARSER_HEADER_VAL,
HTTP_PARSER_CHUNK_SIZE,
HTTP_PARSER_CHUNK,
HTTP_PARSER_END,
HTTP_PARSER_ERROR,
HTTP_PARSER_OPTIONAL_SP
} HTTP_PARSER_STATE;
typedef enum HTTP_AUTH_TYPE {
HTTP_INVALID_AUTH,
HTTP_BASIC_AUTH,
HTTP_TAOSD_AUTH
} HTTP_AUTH_TYPE;
typedef enum HTTP_VERSION {
HTTP_VERSION_10 = 0,
HTTP_VERSION_11 = 1,
HTTP_VERSION_12 = 2,
HTTP_INVALID_VERSION
} HTTP_VERSION;
typedef enum HTTP_KEEPALIVE {
HTTP_KEEPALIVE_NO_INPUT = 0,
HTTP_KEEPALIVE_ENABLE = 1,
HTTP_KEEPALIVE_DISABLE = 2
} HTTP_KEEPALIVE;
typedef struct HttpString {
char * str;
int32_t pos;
int32_t size;
} HttpString;
typedef struct HttpStatus {
int32_t code;
char * desc;
} HttpStatus;
typedef struct HttpStack{
int8_t *stacks;
int32_t pos;
int32_t size;
} HttpStack;
struct HttpContext;
typedef struct HttpParser {
struct HttpContext *pContext;
ehttp_gzip_t *gzip;
HttpStack stacks;
HttpString str;
HttpString body;
HttpString path[HTTP_MAX_URL];
char * method;
char * target;
char * version;
char * reasonPhrase;
char * key;
char * val;
char * authContent;
int8_t httpVersion;
int8_t acceptEncodingGzip;
int8_t acceptEncodingChunked;
int8_t contentLengthSpecified;
int8_t contentChunked;
int8_t transferGzip;
int8_t transferChunked;
int8_t keepAlive;
int8_t authType;
int32_t contentLength;
int32_t chunkSize;
int32_t receivedChunkSize;
int32_t receivedSize;
int32_t statusCode;
int8_t inited;
int8_t parsed;
int16_t httpCode;
int32_t parseCode;
} HttpParser;
void httpInitParser(HttpParser *parser);
HttpParser *httpCreateParser(struct HttpContext *pContext);
void httpClearParser(HttpParser *parser);
void httpDestroyParser(HttpParser *parser);
int32_t httpParseBuf(HttpParser *parser, const char *buf, int32_t len);
char * httpGetStatusDesc(int32_t statusCode);
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_QUEUE_H
#define TDENGINE_HTTP_QUEUE_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
typedef void (*FHttpResultFp)(void *param, void *result, int32_t code, int32_t rows);
bool httpInitResultQueue();
void httpCleanupResultQueue();
void httpDispatchToResultQueue(void *param, TAOS_RES *result, int32_t code, int32_t rows, FHttpResultFp fp);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_RESP_H
#define TDENGINE_HTTP_RESP_H
#include "httpInt.h"
enum _httpRespTempl {
HTTP_RESPONSE_JSON_OK,
HTTP_RESPONSE_JSON_ERROR,
HTTP_RESPONSE_OK,
HTTP_RESPONSE_ERROR,
HTTP_RESPONSE_CHUNKED_UN_COMPRESS,
HTTP_RESPONSE_CHUNKED_COMPRESS,
HTTP_RESPONSE_OPTIONS,
HTTP_RESPONSE_GRAFANA,
HTTP_RESP_END
};
extern const char *httpRespTemplate[];
void httpSendErrorResp(HttpContext *pContext, int32_t errNo);
void httpSendTaosdInvalidSqlErrorResp(HttpContext *pContext, char* errMsg);
void httpSendSuccResp(HttpContext *pContext, char *desc);
void httpSendOptionResp(HttpContext *pContext, char *desc);
#endif
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_REST_HANDLE_H
#define TDENGINE_REST_HANDLE_H
#include "http.h"
#include "httpInt.h"
#include "httpUtil.h"
#include "httpResp.h"
#include "httpSql.h"
#define REST_ROOT_URL_POS 0
#define REST_ACTION_URL_POS 1
#define REST_USER_USEDB_URL_POS 2
#define REST_PASS_URL_POS 3
void restInitHandle(HttpServer* pServer);
bool restProcessRequest(struct HttpContext* pContext);
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_REST_JSON_H
#define TDENGINE_REST_JSON_H
#include <stdbool.h>
#include "../../../../include/client/taos.h"
#include "httpHandle.h"
#include "httpJson.h"
#define REST_JSON_SUCCESS "succ"
#define REST_JSON_SUCCESS_LEN 4
#define REST_JSON_FAILURE "error"
#define REST_JSON_FAILURE_LEN 5
#define REST_JSON_STATUS "status"
#define REST_JSON_STATUS_LEN 6
#define REST_JSON_CODE "code"
#define REST_JSON_CODE_LEN 4
#define REST_JSON_DESC "desc"
#define REST_JSON_DESC_LEN 4
#define REST_JSON_DATA "data"
#define REST_JSON_DATA_LEN 4
#define REST_JSON_HEAD "head"
#define REST_JSON_HEAD_LEN 4
#define REST_JSON_HEAD_INFO "column_meta"
#define REST_JSON_HEAD_INFO_LEN 11
#define REST_JSON_ROWS "rows"
#define REST_JSON_ROWS_LEN 4
#define REST_JSON_AFFECT_ROWS "affected_rows"
#define REST_JSON_AFFECT_ROWS_LEN 13
#define REST_TIMESTAMP_FMT_LOCAL_STRING 0
#define REST_TIMESTAMP_FMT_TIMESTAMP 1
#define REST_TIMESTAMP_FMT_UTC_STRING 2
void restBuildSqlAffectRowsJson(HttpContext *pContext, HttpSqlCmd *cmd, int32_t affect_rows);
void restStartSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result);
bool restBuildSqlTimestampJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, int32_t numOfRows);
bool restBuildSqlLocalTimeStringJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, int32_t numOfRows);
bool restBuildSqlUtcTimeStringJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, int32_t numOfRows);
void restStopSqlJson(HttpContext *pContext, HttpSqlCmd *cmd);
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_SERVER_H
#define TDENGINE_HTTP_SERVER_H
#include "httpInt.h"
bool httpInitConnect();
void httpCleanUpConnect();
void *httpInitServer(char *ip, uint16_t port, char *label, int32_t numOfThreads, void *fp, void *shandle);
void httpCleanUpServer(HttpServer *pServer);
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_SESSION_H
#define TDENGINE_HTTP_SESSION_H
bool httpInitSessions();
void httpCleanUpSessions();
// http session method
void httpCreateSession(HttpContext *pContext, void *taos);
void httpGetSession(HttpContext *pContext);
void httpReleaseSession(HttpContext *pContext);
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_SQL_H
#define TDENGINE_HTTP_SQL_H
int32_t httpAddToSqlCmdBuffer(HttpContext *pContext, const char *const format, ...);
int32_t httpAddToSqlCmdBufferNoTerminal(HttpContext *pContext, const char *const format, ...);
int32_t httpAddToSqlCmdBufferWithSize(HttpContext *pContext, int32_t mallocSize);
int32_t httpAddToSqlCmdBufferTerminal(HttpContext *pContext);
bool httpMallocMultiCmds(HttpContext *pContext, int32_t cmdSize, int32_t bufferSize);
bool httpReMallocMultiCmdsSize(HttpContext *pContext, int32_t cmdSize);
bool httpReMallocMultiCmdsBuffer(HttpContext *pContext, int32_t bufferSize);
void httpFreeMultiCmds(HttpContext *pContext);
HttpSqlCmd *httpNewSqlCmd(HttpContext *pContext);
HttpSqlCmd *httpCurrSqlCmd(HttpContext *pContext);
int32_t httpCurSqlCmdPos(HttpContext *pContext);
void httpTrimTableName(char *name);
int32_t httpShrinkTableName(HttpContext *pContext, int32_t pos, char *name);
char * httpGetCmdsString(HttpContext *pContext, int32_t pos);
int32_t httpCheckAllocEscapeSql(char *oldSql, char **newSql);
void httpCheckFreeEscapedSql(char *oldSql, char *newSql);
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_SYSTEM_H
#define TDENGINE_HTTP_SYSTEM_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
int32_t httpInitSystem();
int32_t httpStartSystem();
void httpStopSystem();
void httpCleanUpSystem();
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TG_HANDLE_H
#define TDENGINE_TG_HANDLE_H
#include "http.h"
#include "httpInt.h"
#include "httpUtil.h"
#include "httpResp.h"
#include "httpSql.h"
#define TG_ROOT_URL_POS 0
#define TG_DB_URL_POS 1
#define TG_USER_URL_POS 2
#define TG_PASS_URL_POS 3
void tgInitHandle(HttpServer *pServer);
void tgCleanupHandle();
bool tgProcessRquest(struct HttpContext *pContext);
#endif
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TG_JSON_H
#define TDENGINE_TG_JSON_H
#include "../../../../include/client/taos.h"
#include "httpHandle.h"
#include "httpJson.h"
void tgInitQueryJson(HttpContext *pContext);
void tgCleanQueryJson(HttpContext *pContext);
void tgStartQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result);
void tgStopQueryJson(HttpContext *pContext, HttpSqlCmd *cmd);
void tgBuildSqlAffectRowsJson(HttpContext *pContext, HttpSqlCmd *cmd, int32_t affect_rows);
bool tgCheckFinished(struct HttpContext *pContext, HttpSqlCmd *cmd, int32_t code);
void tgSetNextCmd(struct HttpContext *pContext, HttpSqlCmd *cmd, int32_t code);
#endif
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_UTIL_H
#define TDENGINE_HTTP_UTIL_H
bool httpCheckUsedbSql(char *sql);
bool httpCheckAlterSql(char *sql);
void httpTimeToString(int32_t t, char *buf, int32_t buflen);
bool httpUrlMatch(HttpContext *pContext, int32_t pos, char *cmp);
bool httpParseRequest(HttpContext *pContext);
int32_t httpCheckReadCompleted(HttpContext *pContext);
void httpReadDirtyData(HttpContext *pContext);
int32_t httpGzipDeCompress(char *srcData, int32_t nSrcData, char *destData, int32_t *nDestData);
int32_t httpGzipCompressInit(HttpContext *pContext);
int32_t httpGzipCompress(HttpContext *pContext, char *inSrcData, int32_t inSrcDataLen,
char *outDestData, int32_t *outDestDataLen, bool isTheLast);
// http request parser
void httpAddMethod(HttpServer *pServer, HttpDecodeMethod *pMethod);
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "tkey.h"
#include "tutil.h"
#include "http.h"
#include "httpInt.h"
#include "httpAuth.h"
#define KEY_DES_4 4971256377704625728L
int32_t httpParseBasicAuthToken(HttpContext *pContext, char *token, int32_t len) {
token[len] = '\0';
int32_t outlen = 0;
char * base64 = (char *)base64_decode(token, len, &outlen);
if (base64 == NULL || outlen == 0) {
httpError("context:%p, fd:%d, basic token:%s parsed error", pContext, pContext->fd, token);
free(base64);
return -1;
}
char *user = strstr(base64, ":");
if (user == NULL) {
httpError("context:%p, fd:%d, basic token:%s invalid format", pContext, pContext->fd, token);
free(base64);
return -1;
}
int32_t user_len = (int32_t)(user - base64);
if (user_len < 1 || user_len >= TSDB_USER_LEN) {
httpError("context:%p, fd:%d, basic token:%s parse user error", pContext, pContext->fd, token);
free(base64);
return -1;
}
strncpy(pContext->user, base64, (size_t)user_len);
pContext->user[user_len] = 0;
char * password = user + 1;
int32_t pass_len = (int32_t)((base64 + outlen) - password);
if (pass_len < 1 || pass_len >= HTTP_PASSWORD_LEN) {
httpError("context:%p, fd:%d, basic token:%s parse password error", pContext, pContext->fd, token);
free(base64);
return -1;
}
strncpy(pContext->pass, password, (size_t)pass_len);
pContext->pass[pass_len] = 0;
free(base64);
httpDebug("context:%p, fd:%d, basic token parsed success, user:%s", pContext, pContext->fd, pContext->user);
return 0;
}
int32_t httpParseTaosdAuthToken(HttpContext *pContext, char *token, int32_t len) {
token[len] = '\0';
int32_t outlen = 0;
unsigned char *base64 = base64_decode(token, len, &outlen);
if (base64 == NULL || outlen == 0) {
httpError("context:%p, fd:%d, taosd token:%s parsed error", pContext, pContext->fd, token);
if (base64) free(base64);
return 01;
}
if (outlen != (TSDB_USER_LEN + HTTP_PASSWORD_LEN)) {
httpError("context:%p, fd:%d, taosd token:%s length error", pContext, pContext->fd, token);
free(base64);
return -1;
}
char *descrypt = taosDesDecode(KEY_DES_4, (char *)base64, outlen);
if (descrypt == NULL) {
httpError("context:%p, fd:%d, taosd token:%s descrypt error", pContext, pContext->fd, token);
free(base64);
return -1;
} else {
tstrncpy(pContext->user, descrypt, sizeof(pContext->user));
tstrncpy(pContext->pass, descrypt + TSDB_USER_LEN, sizeof(pContext->pass));
httpDebug("context:%p, fd:%d, taosd token:%s parsed success, user:%s", pContext, pContext->fd, token,
pContext->user);
free(base64);
free(descrypt);
return 0;
}
}
int32_t httpGenTaosdAuthToken(HttpContext *pContext, char *token, int32_t maxLen) {
char buffer[sizeof(pContext->user) + sizeof(pContext->pass)] = {0};
size_t size = sizeof(pContext->user);
tstrncpy(buffer, pContext->user, size);
size = sizeof(pContext->pass);
tstrncpy(buffer + sizeof(pContext->user), pContext->pass, size);
char *encrypt = taosDesEncode(KEY_DES_4, buffer, TSDB_USER_LEN + HTTP_PASSWORD_LEN);
char *base64 = base64_encode((const unsigned char *)encrypt, TSDB_USER_LEN + HTTP_PASSWORD_LEN);
size_t len = strlen(base64);
tstrncpy(token, base64, len + 1);
free(encrypt);
free(base64);
httpDebug("context:%p, fd:%d, generate taosd token:%s", pContext, pContext->fd, token);
return 0;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "tsocket.h"
#include "tutil.h"
#include "ttimer.h"
#include "tglobal.h"
#include "tcache.h"
#include "hash.h"
#include "httpInt.h"
#include "httpResp.h"
#include "httpSql.h"
#include "httpSession.h"
#include "httpContext.h"
#include "httpParser.h"
static void httpDestroyContext(void *data);
static void httpRemoveContextFromEpoll(HttpContext *pContext) {
HttpThread *pThread = pContext->pThread;
if (pContext->fd >= 0) {
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL);
#ifdef WINDOWS
SOCKET fd = atomic_val_compare_exchange_32(&pContext->fd, pContext->fd, -1);
#else
SOCKET fd = atomic_val_compare_exchange_64(&pContext->fd, pContext->fd, -1);
#endif
taosCloseSocket(fd);
}
}
static void httpDestroyContext(void *data) {
HttpContext *pContext = *(HttpContext **)data;
if (pContext->fd > 0) taosCloseSocket(pContext->fd);
HttpThread *pThread = pContext->pThread;
httpRemoveContextFromEpoll(pContext);
httpReleaseSession(pContext);
atomic_sub_fetch_32(&pThread->numOfContexts, 1);
httpDebug("context:%p, is destroyed, refCount:%d data:%p thread:%s numOfContexts:%d", pContext, pContext->refCount,
data, pContext->pThread->label, pContext->pThread->numOfContexts);
pContext->pThread = 0;
pContext->state = HTTP_CONTEXT_STATE_CLOSED;
// avoid double free
httpFreeJsonBuf(pContext);
httpFreeMultiCmds(pContext);
if (pContext->parser) {
httpDestroyParser(pContext->parser);
pContext->parser = NULL;
}
tfree(pContext);
}
bool httpInitContexts() {
tsHttpServer.contextCache = taosCacheInit(TSDB_CACHE_PTR_KEY, 2, true, httpDestroyContext, "restc");
if (tsHttpServer.contextCache == NULL) {
httpError("failed to init context cache");
return false;
}
return true;
}
void httpCleanupContexts() {
if (tsHttpServer.contextCache != NULL) {
SCacheObj *cache = tsHttpServer.contextCache;
httpInfo("context cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable));
taosCacheCleanup(tsHttpServer.contextCache);
tsHttpServer.contextCache = NULL;
}
}
const char *httpContextStateStr(HttpContextState state) {
switch (state) {
case HTTP_CONTEXT_STATE_READY:
return "ready";
case HTTP_CONTEXT_STATE_HANDLING:
return "handling";
case HTTP_CONTEXT_STATE_DROPPING:
return "dropping";
case HTTP_CONTEXT_STATE_CLOSED:
return "closed";
default:
return "unknown";
}
}
void httpNotifyContextClose(HttpContext *pContext) { shutdown(pContext->fd, SHUT_WR); }
bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState) {
return (atomic_val_compare_exchange_32(&pContext->state, srcState, destState) == srcState);
}
HttpContext *httpCreateContext(SOCKET fd) {
HttpContext *pContext = calloc(1, sizeof(HttpContext));
if (pContext == NULL) return NULL;
pContext->fd = fd;
pContext->lastAccessTime = taosGetTimestampSec();
pContext->state = HTTP_CONTEXT_STATE_READY;
pContext->parser = httpCreateParser(pContext);
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE)pContext;
HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &pContext,
sizeof(TSDB_CACHE_PTR_TYPE), 3000);
pContext->ppContext = ppContext;
httpDebug("context:%p, fd:%d, is created, data:%p", pContext, fd, ppContext);
// set the ref to 0
taosCacheRelease(tsHttpServer.contextCache, (void **)&ppContext, false);
return pContext;
}
HttpContext *httpGetContext(void *ptr) {
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE)ptr;
HttpContext **ppContext = taosCacheAcquireByKey(tsHttpServer.contextCache, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE));
if (ppContext) {
HttpContext *pContext = *ppContext;
if (pContext) {
int32_t refCount = atomic_add_fetch_32(&pContext->refCount, 1);
httpTrace("context:%p, fd:%d, is accquired, data:%p refCount:%d", pContext, pContext->fd, ppContext, refCount);
return pContext;
}
}
return NULL;
}
void httpReleaseContext(HttpContext *pContext/*, bool clearRes*/) {
int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1);
if (refCount < 0) {
httpError("context:%p, is already released, refCount:%d", pContext, refCount);
return;
}
/*
if (clearRes) {
if (pContext->parser) {
httpClearParser(pContext->parser);
}
memset(&pContext->singleCmd, 0, sizeof(HttpSqlCmd));
}
*/
HttpContext **ppContext = pContext->ppContext;
httpTrace("context:%p, is released, data:%p refCount:%d", pContext, ppContext, refCount);
if (tsHttpServer.contextCache != NULL) {
taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false);
} else {
httpDebug("context:%p, won't be destroyed for cache is already released", pContext);
// httpDestroyContext((void **)(&ppContext));
}
}
bool httpInitContext(HttpContext *pContext) {
pContext->accessTimes++;
pContext->lastAccessTime = taosGetTimestampSec();
pContext->reqType = HTTP_REQTYPE_OTHERS;
pContext->encodeMethod = NULL;
memset(&pContext->singleCmd, 0, sizeof(HttpSqlCmd));
httpTrace("context:%p, fd:%d, parsed:%d", pContext, pContext->fd, pContext->parsed);
return true;
}
void httpCloseContextByApp(HttpContext *pContext) {
HttpParser *parser = pContext->parser;
pContext->parsed = false;
bool keepAlive = true;
if (pContext->error == true) {
keepAlive = false;
} else if (parser && parser->httpVersion == HTTP_VERSION_10 && parser->keepAlive != HTTP_KEEPALIVE_ENABLE) {
keepAlive = false;
} else if (parser && parser->httpVersion != HTTP_VERSION_10 && parser->keepAlive == HTTP_KEEPALIVE_DISABLE) {
keepAlive = false;
}
if (keepAlive) {
if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_READY)) {
httpTrace("context:%p, fd:%d, last state:handling, keepAlive:true, reuse context", pContext, pContext->fd);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_CLOSED)) {
httpRemoveContextFromEpoll(pContext);
httpTrace("context:%p, fd:%d, ast state:dropping, keepAlive:true, close connect", pContext, pContext->fd);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_READY)) {
httpTrace("context:%p, fd:%d, last state:ready, keepAlive:true, reuse context", pContext, pContext->fd);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) {
httpRemoveContextFromEpoll(pContext);
httpTrace("context:%p, fd:%d, last state:ready, keepAlive:true, close connect", pContext, pContext->fd);
} else {
httpRemoveContextFromEpoll(pContext);
httpError("context:%p, fd:%d, last state:%s:%d, keepAlive:true, close connect", pContext, pContext->fd,
httpContextStateStr(pContext->state), pContext->state);
}
} else {
httpRemoveContextFromEpoll(pContext);
httpTrace("context:%p, fd:%d, ilast state:%s:%d, keepAlive:false, close context", pContext, pContext->fd,
httpContextStateStr(pContext->state), pContext->state);
}
httpReleaseContext(pContext/*, true*/);
}
void httpCloseContextByServer(HttpContext *pContext) {
if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_DROPPING)) {
httpTrace("context:%p, fd:%d, epoll finished, still used by app", pContext, pContext->fd);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_DROPPING)) {
httpTrace("context:%p, fd:%d, epoll already finished, wait app finished", pContext, pContext->fd);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_CLOSED)) {
httpTrace("context:%p, fd:%d, epoll finished, close connect", pContext, pContext->fd);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) {
httpTrace("context:%p, fd:%d, epoll finished, will be closed soon", pContext, pContext->fd);
} else {
httpError("context:%p, fd:%d, unknown state:%d", pContext, pContext->fd, pContext->state);
}
pContext->parsed = false;
httpRemoveContextFromEpoll(pContext);
httpReleaseContext(pContext/*, true*/);
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosdef.h"
#include "taoserror.h"
#include "cJSON.h"
#include "httpLog.h"
#include "httpGcHandle.h"
#include "httpGcJson.h"
static HttpDecodeMethod gcDecodeMethod = {"grafana", gcProcessRequest};
static HttpEncodeMethod gcHeartBeatMethod = {
.startJsonFp = NULL,
.stopJsonFp = gcSendHeartBeatResp,
.buildQueryJsonFp = NULL,
.buildAffectRowJsonFp = NULL,
.initJsonFp = NULL,
.cleanJsonFp = NULL,
.checkFinishedFp = NULL,
.setNextCmdFp = NULL
};
static HttpEncodeMethod gcQueryMethod = {
.startJsonFp = NULL,
.stopJsonFp = gcStopQueryJson,
.buildQueryJsonFp = gcBuildQueryJson,
.buildAffectRowJsonFp = NULL,
.initJsonFp = gcInitQueryJson,
.cleanJsonFp = gcCleanQueryJson,
.checkFinishedFp = NULL,
.setNextCmdFp = NULL
};
void gcInitHandle(HttpServer* pServer) { httpAddMethod(pServer, &gcDecodeMethod); }
bool gcGetUserFromUrl(HttpContext* pContext) {
HttpParser* pParser = pContext->parser;
if (pParser->path[GC_USER_URL_POS].pos >= TSDB_USER_LEN || pParser->path[GC_USER_URL_POS].pos <= 0) {
return false;
}
tstrncpy(pContext->user, pParser->path[GC_USER_URL_POS].str, TSDB_USER_LEN);
return true;
}
bool gcGetPassFromUrl(HttpContext* pContext) {
HttpParser* pParser = pContext->parser;
if (pParser->path[GC_PASS_URL_POS].pos >= HTTP_PASSWORD_LEN || pParser->path[GC_PASS_URL_POS].pos <= 0) {
return false;
}
tstrncpy(pContext->pass, pParser->path[GC_PASS_URL_POS].str, HTTP_PASSWORD_LEN);
return true;
}
bool gcProcessLoginRequest(HttpContext* pContext) {
httpDebug("context:%p, fd:%d, user:%s, process grafana login msg", pContext, pContext->fd, pContext->user);
pContext->reqType = HTTP_REQTYPE_LOGIN;
return true;
}
/**
* Process the query request
* @param fd for http send back
* @param context is taos conn
* @param filter, the request format is json, such as
*/
// https://github.com/grafana/grafana/blob/master/docs/sources/plugins/developing/datasources.md
// input
//[{
// "refId": "A",
// "alias" : "taosd",
// "sql" : "select first(taosd) from sys.mem where ts > now-6h and ts < now interval(20000a)"
//},
//{
// "refId": "B",
// "alias" : "system",
// "sql" : "select first(taosd) from sys.mem where ts > now-6h and ts < now interval(20000a)"
//}]
// output
//[{
// "datapoints": [[339.386719,
// 1537873132000],
// [339.656250,
// 1537873162400],
// [339.656250,
// 1537873192600],
// [339.656250,
// 1537873222800],
// [339.589844,
// 1537873253200],
// [339.964844,
// 1537873283400],
// [340.093750,
// 1537873313800],
// [340.093750,
// 1537873344000],
// [340.093750,
// 1537873374200],
// [340.093750,
// 1537873404600]],
// "refId": "A",
// "target" : "taosd"
//},
//{
// "datapoints": [[339.386719,
// 1537873132000],
// [339.656250,
// 1537873162400],
// [339.656250,
// 1537873192600],
// [339.656250,
// 1537873222800],
// [339.589844,
// 1537873253200],
// [339.964844,
// 1537873283400],
// [340.093750,
// 1537873313800],
// [340.093750,
// 1537873344000],
// [340.093750,
// 1537873374200],
// [340.093750,
// 1537873404600]],
// "refId": "B",
// "target" : "system"
//}]
bool gcProcessQueryRequest(HttpContext* pContext) {
httpDebug("context:%p, fd:%d, process grafana query msg", pContext, pContext->fd);
char* filter = pContext->parser->body.str;
if (filter == NULL) {
httpSendErrorResp(pContext, TSDB_CODE_HTTP_NO_MSG_INPUT);
return false;
}
cJSON* root = cJSON_Parse(filter);
if (root == NULL) {
httpSendErrorResp(pContext, TSDB_CODE_HTTP_GC_REQ_PARSE_ERROR);
return false;
}
int32_t size = cJSON_GetArraySize(root);
if (size <= 0) {
httpSendErrorResp(pContext, TSDB_CODE_HTTP_GC_QUERY_NULL);
cJSON_Delete(root);
return false;
}
if (size > 100) {
httpSendErrorResp(pContext, TSDB_CODE_HTTP_GC_QUERY_SIZE);
cJSON_Delete(root);
return false;
}
if (!httpMallocMultiCmds(pContext, size, HTTP_BUFFER_SIZE)) {
httpSendErrorResp(pContext, TSDB_CODE_HTTP_NO_ENOUGH_MEMORY);
cJSON_Delete(root);
return false;
}
#define ESCAPE_ERROR_PROC(code, context, root) \
do { \
if (code != TSDB_CODE_SUCCESS) { \
httpSendErrorResp(context, code); \
\
cJSON_Delete(root); \
return false; \
} \
} while (0)
for (int32_t i = 0; i < size; ++i) {
cJSON* query = cJSON_GetArrayItem(root, i);
if (query == NULL) continue;
cJSON* refId = cJSON_GetObjectItem(query, "refId");
if (refId == NULL || refId->valuestring == NULL || strlen(refId->valuestring) == 0) {
httpDebug("context:%p, fd:%d, user:%s, refId is null", pContext, pContext->fd, pContext->user);
continue;
}
char *newStr = NULL;
int32_t retCode = 0;
retCode = httpCheckAllocEscapeSql(refId->valuestring, &newStr);
ESCAPE_ERROR_PROC(retCode, pContext, root);
int32_t refIdBuffer = httpAddToSqlCmdBuffer(pContext, newStr);
httpCheckFreeEscapedSql(refId->valuestring, newStr);
if (refIdBuffer == -1) {
httpWarn("context:%p, fd:%d, user:%s, refId buffer is full", pContext, pContext->fd, pContext->user);
break;
}
cJSON* alias = cJSON_GetObjectItem(query, "alias");
int32_t aliasBuffer = -1;
if (!(alias == NULL || alias->valuestring == NULL || strlen(alias->valuestring) == 0)) {
retCode = httpCheckAllocEscapeSql(alias->valuestring, &newStr);
ESCAPE_ERROR_PROC(retCode, pContext, root);
aliasBuffer = httpAddToSqlCmdBuffer(pContext, newStr);
httpCheckFreeEscapedSql(alias->valuestring, newStr);
if (aliasBuffer == -1) {
httpWarn("context:%p, fd:%d, user:%s, alias buffer is full", pContext, pContext->fd, pContext->user);
break;
}
}
if (aliasBuffer == -1) {
aliasBuffer = httpAddToSqlCmdBuffer(pContext, "");
}
cJSON* sql = cJSON_GetObjectItem(query, "sql");
if (sql == NULL || sql->valuestring == NULL || strlen(sql->valuestring) == 0) {
httpDebug("context:%p, fd:%d, user:%s, sql is null", pContext, pContext->fd, pContext->user);
continue;
}
retCode = httpCheckAllocEscapeSql(sql->valuestring, &newStr);
ESCAPE_ERROR_PROC(retCode, pContext, root);
int32_t sqlBuffer = httpAddToSqlCmdBuffer(pContext, newStr);
httpCheckFreeEscapedSql(sql->valuestring, newStr);
if (sqlBuffer == -1) {
httpWarn("context:%p, fd:%d, user:%s, sql buffer is full", pContext, pContext->fd, pContext->user);
break;
}
HttpSqlCmd* cmd = httpNewSqlCmd(pContext);
if (cmd == NULL) {
httpSendErrorResp(pContext, TSDB_CODE_HTTP_NO_ENOUGH_MEMORY);
cJSON_Delete(root);
return false;
}
cmd->sql = sqlBuffer;
cmd->values = refIdBuffer;
cmd->table = aliasBuffer;
cmd->numOfRows = 0; // hack way as target flags
cmd->timestamp = httpAddToSqlCmdBufferWithSize(pContext, HTTP_GC_TARGET_SIZE + 1); // hack way
if (cmd->timestamp == -1) {
httpWarn("context:%p, fd:%d, user:%s, cant't malloc target size, sql buffer is full", pContext, pContext->fd,
pContext->user);
break;
}
}
#undef ESCAPE_ERROR_PROC
pContext->reqType = HTTP_REQTYPE_MULTI_SQL;
pContext->encodeMethod = &gcQueryMethod;
pContext->multiCmds->pos = 0;
return true;
}
bool gcProcessHeartbeatRequest(HttpContext* pContext) {
httpDebug("context:%p, fd:%d, process grafana heartbeat msg", pContext, pContext->fd);
pContext->reqType = HTTP_REQTYPE_HEARTBEAT;
pContext->encodeMethod = &gcHeartBeatMethod;
return true;
}
/**
* Process get/post/options msg, such as login and logout
*/
bool gcProcessRequest(struct HttpContext* pContext) {
if (httpUrlMatch(pContext, GC_ACTION_URL_POS, "login")) {
gcGetUserFromUrl(pContext);
gcGetPassFromUrl(pContext);
}
if (strlen(pContext->user) == 0 || strlen(pContext->pass) == 0) {
httpSendErrorResp(pContext, TSDB_CODE_HTTP_NO_AUTH_INFO);
return false;
}
if (httpUrlMatch(pContext, GC_ACTION_URL_POS, "query")) {
return gcProcessQueryRequest(pContext);
} else if (httpUrlMatch(pContext, GC_ACTION_URL_POS, "heartbeat")) {
return gcProcessHeartbeatRequest(pContext);
} else if (httpUrlMatch(pContext, GC_ACTION_URL_POS, "login")) {
return gcProcessLoginRequest(pContext);
} else {
return gcProcessHeartbeatRequest(pContext);
}
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "httpGcHandle.h"
#include "httpGcJson.h"
#include "httpJson.h"
#include "httpResp.h"
unsigned char *base64_decode(const char *value, int inlen, int *outlen);
void gcInitQueryJson(HttpContext *pContext) {
JsonBuf *jsonBuf = httpMallocJsonBuf(pContext);
if (jsonBuf == NULL) return;
httpInitJsonBuf(jsonBuf, pContext);
httpWriteJsonBufHead(jsonBuf);
// data array begin
httpJsonItemToken(jsonBuf);
httpJsonToken(jsonBuf, JsonArrStt);
}
void gcCleanQueryJson(HttpContext *pContext) {
JsonBuf *jsonBuf = httpMallocJsonBuf(pContext);
if (jsonBuf == NULL) return;
// array end
httpJsonToken(jsonBuf, JsonArrEnd);
httpWriteJsonBufEnd(jsonBuf);
}
void gcWriteTargetStartJson(JsonBuf *jsonBuf, char *refId, char *target) {
if (strlen(target) == 0) {
target = refId;
}
// object begin
httpJsonItemToken(jsonBuf);
httpJsonToken(jsonBuf, JsonObjStt);
// target section
httpJsonPair(jsonBuf, "refId", 5, refId, (int32_t)strlen(refId));
httpJsonPair(jsonBuf, "target", 6, target, (int32_t)strlen(target));
// data begin
httpJsonPairHead(jsonBuf, "datapoints", 10);
// data array begin
httpJsonToken(jsonBuf, JsonArrStt);
}
void gcWriteTargetEndJson(JsonBuf *jsonBuf) {
// data array end
httpJsonToken(jsonBuf, JsonArrEnd);
// object end
httpJsonToken(jsonBuf, JsonObjEnd);
}
void gcStopQueryJson(HttpContext *pContext, HttpSqlCmd *cmd) {
JsonBuf *jsonBuf = httpMallocJsonBuf(pContext);
if (jsonBuf == NULL) return;
// write end of target
if (cmd->numOfRows != 0) {
gcWriteTargetEndJson(jsonBuf);
}
}
bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, int32_t numOfRows) {
JsonBuf *jsonBuf = httpMallocJsonBuf(pContext);
if (jsonBuf == NULL) return false;
int32_t num_fields = taos_num_fields(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
if (num_fields == 0) {
return false;
}
int32_t precision = taos_result_precision(result);
// such as select count(*) from sys.cpu
// such as select count(*) from sys.cpu group by ipaddr
// such as select count(*) from sys.cpu interval(1d)
// such as select count(*) from sys.cpu interval(1d) group by ipaddr
// such as select count(*) count(*) from sys.cpu group by ipaddr interval(1d)
int32_t dataFields = -1;
int32_t groupFields = -1;
bool hasTimestamp = fields[0].type == TSDB_DATA_TYPE_TIMESTAMP;
if (hasTimestamp) {
dataFields = 1;
if (num_fields > 2) groupFields = num_fields - 1;
} else {
dataFields = 0;
if (num_fields > 1) groupFields = num_fields - 1;
}
char *refIdBuffer = httpGetCmdsString(pContext, cmd->values);
char *aliasBuffer = httpGetCmdsString(pContext, cmd->table);
char *targetBuffer = httpGetCmdsString(pContext, cmd->timestamp);
if (groupFields == -1 && cmd->numOfRows == 0) {
gcWriteTargetStartJson(jsonBuf, refIdBuffer, aliasBuffer);
}
cmd->numOfRows += numOfRows;
for (int32_t k = 0; k < numOfRows; ++k) {
TAOS_ROW row = taos_fetch_row(result);
if (row == NULL) {
cmd->numOfRows--;
continue;
}
int32_t *length = taos_fetch_lengths(result);
// for group by
if (groupFields != -1) {
char target[HTTP_GC_TARGET_SIZE] = {0};
int32_t len;
len = snprintf(target, HTTP_GC_TARGET_SIZE, "%s{", aliasBuffer);
for (int32_t i = dataFields + 1; i < num_fields; i++) {
if (row[i] == NULL) {
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:nil", fields[i].name);
if (i < num_fields - 1) {
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, ", ");
}
continue;
}
switch (fields[i].type) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%d", fields[i].name, *((int8_t *)row[i]));
break;
case TSDB_DATA_TYPE_SMALLINT:
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%d", fields[i].name, *((int16_t *)row[i]));
break;
case TSDB_DATA_TYPE_INT:
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%d,", fields[i].name, *((int32_t *)row[i]));
break;
case TSDB_DATA_TYPE_BIGINT:
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%" PRId64, fields[i].name, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%.5f", fields[i].name, GET_FLOAT_VAL(row[i]));
break;
case TSDB_DATA_TYPE_DOUBLE:
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%.9f", fields[i].name, GET_DOUBLE_VAL(row[i]));
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
if (row[i] != NULL) {
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:", fields[i].name);
memcpy(target + len, (char *)row[i], length[i]);
len = (int32_t)strlen(target);
}
break;
default:
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%s", fields[i].name, "-");
break;
}
if (i < num_fields - 1) {
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, ", ");
}
}
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "}");
if (strcmp(target, targetBuffer) != 0) {
// first target not write this section
if (strlen(targetBuffer) != 0) {
gcWriteTargetEndJson(jsonBuf);
}
// start new target
gcWriteTargetStartJson(jsonBuf, refIdBuffer, target);
strncpy(targetBuffer, target, HTTP_GC_TARGET_SIZE);
}
} // end of group by
// data row array begin
httpJsonItemToken(jsonBuf);
httpJsonToken(jsonBuf, JsonArrStt);
for (int32_t i = dataFields; i >= 0; i--) {
httpJsonItemToken(jsonBuf);
if (row == NULL || i >= num_fields || row[i] == NULL) {
httpJsonOriginString(jsonBuf, "null", 4);
continue;
}
switch (fields[i].type) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
httpJsonInt(jsonBuf, *((int8_t *)row[i]));
break;
case TSDB_DATA_TYPE_SMALLINT:
httpJsonInt(jsonBuf, *((int16_t *)row[i]));
break;
case TSDB_DATA_TYPE_INT:
httpJsonInt(jsonBuf, *((int32_t *)row[i]));
break;
case TSDB_DATA_TYPE_BIGINT:
httpJsonInt64(jsonBuf, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT:
httpJsonFloat(jsonBuf, GET_FLOAT_VAL(row[i]));
break;
case TSDB_DATA_TYPE_DOUBLE:
httpJsonDouble(jsonBuf, GET_DOUBLE_VAL(row[i]));
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
httpJsonStringForTransMean(jsonBuf, (char *)row[i], fields[i].bytes);
break;
case TSDB_DATA_TYPE_TIMESTAMP: {
int64_t ts = convertTimePrecision(*((int64_t *)row[i]), precision, TSDB_TIME_PRECISION_MILLI);
httpJsonInt64(jsonBuf, ts);
break;
}
default:
httpJsonString(jsonBuf, "-", 1);
break;
}
}
if (dataFields == 0) {
httpJsonItemToken(jsonBuf);
httpJsonString(jsonBuf, "-", 1);
}
// data row array end
httpJsonToken(jsonBuf, JsonArrEnd);
}
return true;
}
void gcSendHeartBeatResp(HttpContext *pContext, HttpSqlCmd *cmd) {
JsonBuf *jsonBuf = httpMallocJsonBuf(pContext);
if (jsonBuf == NULL) return;
char *desc = "Grafana server receive a quest from you!";
httpInitJsonBuf(jsonBuf, pContext);
httpJsonToken(jsonBuf, JsonObjStt);
httpJsonPair(jsonBuf, "message", (int32_t)strlen("message"), desc, (int32_t)strlen(desc));
httpJsonToken(jsonBuf, JsonObjEnd);
char head[1024];
int32_t hLen = sprintf(head, httpRespTemplate[HTTP_RESPONSE_GRAFANA], httpVersionStr[pContext->parser->httpVersion],
httpKeepAliveStr[pContext->parser->keepAlive], (jsonBuf->lst - jsonBuf->buf));
httpWriteBuf(pContext, head, hLen);
httpWriteBuf(pContext, jsonBuf->buf, (int32_t)(jsonBuf->lst - jsonBuf->buf));
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "zlib.h"
#include "httpGzip.h"
typedef enum {
EHTTP_GZIP_INITING,
EHTTP_GZIP_READY,
EHTTP_GZIP_CLOSED,
} EHTTP_GZIP_STATE;
struct ehttp_gzip_s {
ehttp_gzip_conf_t conf;
ehttp_gzip_callbacks_t callbacks;
void * arg;
z_stream * gzip;
gz_header * header;
char * chunk;
int32_t state;
};
static void dummy_on_data(ehttp_gzip_t *gzip, void *arg, const char *buf, int32_t len) {}
static void ehttp_gzip_cleanup(ehttp_gzip_t *gzip) {
switch (gzip->state) {
case EHTTP_GZIP_READY: {
inflateEnd(gzip->gzip);
} break;
default:
break;
}
if (gzip->gzip) {
free(gzip->gzip);
gzip->gzip = NULL;
}
if (gzip->header) {
free(gzip->header);
gzip->header = NULL;
}
if (gzip->chunk) {
free(gzip->chunk);
gzip->chunk = NULL;
}
gzip->state = EHTTP_GZIP_CLOSED;
}
ehttp_gzip_t *ehttp_gzip_create_decompressor(ehttp_gzip_conf_t conf, ehttp_gzip_callbacks_t callbacks, void *arg) {
ehttp_gzip_t *gzip = (ehttp_gzip_t *)calloc(1, sizeof(*gzip));
if (!gzip) return NULL;
do {
gzip->conf = conf;
gzip->callbacks = callbacks;
gzip->arg = arg;
if (gzip->callbacks.on_data == NULL) gzip->callbacks.on_data = dummy_on_data;
gzip->gzip = (z_stream *)calloc(1, sizeof(*gzip->gzip));
if (gzip->conf.get_header) {
gzip->header = (gz_header *)calloc(1, sizeof(*gzip->header));
}
if (gzip->conf.chunk_size <= 0) gzip->conf.chunk_size = EHTTP_GZIP_CHUNK_SIZE_DEFAULT;
gzip->chunk = (char *)malloc(gzip->conf.chunk_size);
if (!gzip->gzip || (gzip->conf.get_header && !gzip->header) || !gzip->chunk) break;
gzip->gzip->zalloc = Z_NULL;
gzip->gzip->zfree = Z_NULL;
gzip->gzip->opaque = Z_NULL;
// 863 windowBits can also be greater than 15 for optional gzip decoding. Add
// 864 32 to windowBits to enable zlib and gzip decoding with automatic header
// 865 detection, or add 16 to decode only the gzip format (the zlib format will
// 866 return a Z_DATA_ERROR). If a gzip stream is being decoded, strm->adler is a
// 867 CRC-32 instead of an Adler-32. Unlike the gunzip utility and gzread() (see
// 868 below), inflate() will not automatically decode concatenated gzip streams.
// 869 inflate() will return Z_STREAM_END at the end of the gzip stream. The state
// 870 would need to be reset to continue decoding a subsequent gzip stream.
int32_t ret = inflateInit2(gzip->gzip, 32); // 32/16? 32/16 + MAX_WBITS
if (ret != Z_OK) break;
if (gzip->header) {
ret = inflateGetHeader(gzip->gzip, gzip->header);
}
if (ret != Z_OK) break;
gzip->gzip->next_out = (z_const Bytef *)gzip->chunk;
gzip->gzip->avail_out = gzip->conf.chunk_size;
gzip->state = EHTTP_GZIP_READY;
return gzip;
} while (0);
ehttp_gzip_destroy(gzip);
return NULL;
}
ehttp_gzip_t *ehttp_gzip_create_compressor(ehttp_gzip_conf_t conf, ehttp_gzip_callbacks_t callbacks, void *arg);
void ehttp_gzip_destroy(ehttp_gzip_t *gzip) {
ehttp_gzip_cleanup(gzip);
free(gzip);
}
int32_t ehttp_gzip_write(ehttp_gzip_t *gzip, const char *buf, int32_t len) {
if (gzip->state != EHTTP_GZIP_READY) return -1;
if (len <= 0) return 0;
gzip->gzip->next_in = (z_const Bytef*)buf;
gzip->gzip->avail_in = len;
while (gzip->gzip->avail_in) {
int32_t ret;
if (gzip->header) {
ret = inflate(gzip->gzip, Z_BLOCK);
} else {
ret = inflate(gzip->gzip, Z_SYNC_FLUSH);
}
if (ret != Z_OK && ret != Z_STREAM_END) return -1;
if (gzip->gzip->avail_out > 0) {
if (ret != Z_STREAM_END) continue;
}
int32_t _len = (int32_t)(gzip->gzip->next_out - (z_const Bytef *)gzip->chunk);
gzip->gzip->next_out[0] = '\0';
gzip->callbacks.on_data(gzip, gzip->arg, gzip->chunk, _len);
gzip->gzip->next_out = (z_const Bytef *)gzip->chunk;
gzip->gzip->avail_out = gzip->conf.chunk_size;
}
return 0;
}
int32_t ehttp_gzip_finish(ehttp_gzip_t *gzip) {
if (gzip->state != EHTTP_GZIP_READY) return -1;
gzip->gzip->next_in = NULL;
gzip->gzip->avail_in = 0;
int32_t ret;
ret = inflate(gzip->gzip, Z_FINISH);
if (ret != Z_STREAM_END) return -1;
int32_t len = (int32_t)(gzip->gzip->next_out - (z_const Bytef *)gzip->chunk);
gzip->gzip->next_out[0] = '\0';
gzip->callbacks.on_data(gzip, gzip->arg, gzip->chunk, len);
gzip->gzip->next_out = NULL;
gzip->gzip->avail_out = 0;
return 0;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "httpInt.h"
#include "httpResp.h"
#include "httpContext.h"
#include "httpHandle.h"
bool httpDecodeRequest(HttpContext* pContext) {
if (pContext->decodeMethod->fpDecode == NULL) {
return false;
}
return (*pContext->decodeMethod->fpDecode)(pContext);
}
/**
* Process the request from http pServer
*/
bool httpProcessData(HttpContext* pContext) {
if (!httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_HANDLING)) {
httpTrace("context:%p, fd:%d, state:%s not in ready state, stop process request", pContext, pContext->fd,
httpContextStateStr(pContext->state));
pContext->error = true;
httpCloseContextByApp(pContext);
return false;
}
// handle Cross-domain request
if (strcmp(pContext->parser->method, "OPTIONS") == 0) {
httpTrace("context:%p, fd:%d, process options request", pContext, pContext->fd);
httpSendOptionResp(pContext, "process options request success");
} else {
if (!httpDecodeRequest(pContext)) {
/*
* httpCloseContextByApp has been called when parsing the error
*/
// httpCloseContextByApp(pContext);
} else {
httpClearParser(pContext->parser);
httpProcessRequest(pContext);
}
}
return true;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "tglobal.h"
#include "http.h"
#include "httpLog.h"
#include "httpJson.h"
#include "httpResp.h"
#include "httpUtil.h"
#define MAX_NUM_STR_SZ 25
char JsonItmTkn = ',';
char JsonObjStt = '{';
char JsonObjEnd = '}';
char JsonArrStt = '[';
char JsonArrEnd = ']';
char JsonStrStt = '\"';
char JsonStrEnd = '\"';
char JsonPairTkn = ':';
char JsonNulTkn[] = "null";
char JsonTrueTkn[] = "true";
char JsonFalseTkn[] = "false";
int32_t httpWriteBufByFd(struct HttpContext* pContext, const char* buf, int32_t sz) {
int32_t len;
int32_t countWait = 0;
int32_t writeLen = 0;
do {
if (pContext->fd > 2) {
len = (int32_t)taosSend(pContext->fd, buf + writeLen, (size_t)(sz - writeLen), MSG_NOSIGNAL);
} else {
return sz;
}
if (len < 0) {
httpDebug("context:%p, fd:%d, socket write errno:%d:%s, times:%d", pContext, pContext->fd, errno, strerror(errno),
countWait);
if (++countWait > HTTP_WRITE_RETRY_TIMES) break;
taosMsleep(HTTP_WRITE_WAIT_TIME_MS);
continue;
} else if (len == 0) {
httpDebug("context:%p, fd:%d, socket write errno:%d:%s, connect already closed", pContext, pContext->fd, errno,
strerror(errno));
break;
} else {
countWait = 0;
writeLen += len;
}
} while (writeLen < sz);
return writeLen;
}
int32_t httpWriteBuf(struct HttpContext* pContext, const char* buf, int32_t sz) {
int32_t writeSz = httpWriteBufByFd(pContext, buf, sz);
if (writeSz != sz) {
httpError("context:%p, fd:%d, dataSize:%d, writeSize:%d, failed to send response:\n%s", pContext, pContext->fd, sz,
writeSz, buf);
} else {
httpTrace("context:%p, fd:%d, dataSize:%d, writeSize:%d, response:\n%s", pContext, pContext->fd, sz, writeSz, buf);
}
return writeSz;
}
int32_t httpWriteBufNoTrace(struct HttpContext* pContext, const char* buf, int32_t sz) {
int32_t writeSz = httpWriteBufByFd(pContext, buf, sz);
if (writeSz != sz) {
httpError("context:%p, fd:%d, dataSize:%d, writeSize:%d, failed to send response", pContext, pContext->fd, sz,
writeSz);
}
return writeSz;
}
int32_t httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) {
int32_t remain = 0;
char sLen[24];
int32_t srcLen = (int32_t)(buf->lst - buf->buf);
if (buf->pContext->fd <= 0) {
httpTrace("context:%p, fd:%d, write json body error", buf->pContext, buf->pContext->fd);
buf->pContext->fd = -1;
}
/*
* HTTP servers often use compression to optimize transmission, for example
* with Content-Encoding: gzip or Content-Encoding: deflate.
* If both compression and chunked encoding are enabled, then the content stream is first compressed, then chunked;
* so the chunk encoding itself is not compressed, and the data in each chunk is not compressed individually.
* The remote endpoint then decodes the stream by concatenating the chunks and uncompressing the result.
*/
if (buf->pContext->parser->acceptEncodingGzip == 0 || !tsHttpEnableCompress) {
if (buf->lst == buf->buf) {
httpTrace("context:%p, fd:%d, no data need dump", buf->pContext, buf->pContext->fd);
return 0; // there is no data to dump.
} else {
int32_t len = sprintf(sLen, "%x\r\n", srcLen);
httpTrace("context:%p, fd:%d, write body, chunkSize:%d, response:\n%s", buf->pContext, buf->pContext->fd, srcLen,
buf->buf);
httpWriteBufNoTrace(buf->pContext, sLen, len);
remain = httpWriteBufNoTrace(buf->pContext, buf->buf, srcLen);
}
} else {
char compressBuf[JSON_BUFFER_SIZE] = {0};
int32_t compressBufLen = JSON_BUFFER_SIZE;
int32_t ret = httpGzipCompress(buf->pContext, buf->buf, srcLen, compressBuf, &compressBufLen, isTheLast);
if (ret == 0) {
if (compressBufLen > 0) {
int32_t len = sprintf(sLen, "%x\r\n", compressBufLen);
httpTrace("context:%p, fd:%d, write body, chunkSize:%d, compressSize:%d, last:%d, response:\n%s", buf->pContext,
buf->pContext->fd, srcLen, compressBufLen, isTheLast, buf->buf);
httpWriteBufNoTrace(buf->pContext, sLen, len);
remain = httpWriteBufNoTrace(buf->pContext, (const char*)compressBuf, compressBufLen);
} else {
httpDebug("context:%p, fd:%d, last:%d, compress already dumped, response:\n%s", buf->pContext,
buf->pContext->fd, isTheLast, buf->buf);
remain = 0; // there is no data to dump.
}
} else {
httpError("context:%p, fd:%d, failed to compress data, chunkSize:%d, last:%d, error:%d, response:\n%s",
buf->pContext, buf->pContext->fd, srcLen, isTheLast, ret, buf->buf);
remain = 0;
}
}
httpWriteBufNoTrace(buf->pContext, "\r\n", 2);
buf->total += (int32_t)(buf->lst - buf->buf);
buf->lst = buf->buf;
memset(buf->buf, 0, (size_t)buf->size);
return remain;
}
void httpWriteJsonBufHead(JsonBuf* buf) {
if (buf->pContext->fd <= 0) {
buf->pContext->fd = -1;
}
char msg[1024] = {0};
int32_t len = -1;
if (buf->pContext->parser->acceptEncodingGzip == 0 || !tsHttpEnableCompress) {
len = sprintf(msg, httpRespTemplate[HTTP_RESPONSE_CHUNKED_UN_COMPRESS], httpVersionStr[buf->pContext->parser->httpVersion],
httpKeepAliveStr[buf->pContext->parser->keepAlive]);
} else {
len = sprintf(msg, httpRespTemplate[HTTP_RESPONSE_CHUNKED_COMPRESS], httpVersionStr[buf->pContext->parser->httpVersion],
httpKeepAliveStr[buf->pContext->parser->keepAlive]);
}
httpWriteBuf(buf->pContext, (const char*)msg, len);
}
void httpWriteJsonBufEnd(JsonBuf* buf) {
if (buf->pContext->fd <= 0) {
httpTrace("context:%p, fd:%d, json buf fd is 0", buf->pContext, buf->pContext->fd);
buf->pContext->fd = -1;
}
httpWriteJsonBufBody(buf, true);
httpWriteBufNoTrace(buf->pContext, "0\r\n\r\n", 5); // end of chunked resp
}
void httpInitJsonBuf(JsonBuf* buf, struct HttpContext* pContext) {
buf->lst = buf->buf;
buf->total = 0;
buf->size = JSON_BUFFER_SIZE; // option setting
buf->pContext = pContext;
memset(buf->lst, 0, JSON_BUFFER_SIZE);
if (pContext->parser->acceptEncodingGzip == 1 && tsHttpEnableCompress) {
httpGzipCompressInit(buf->pContext);
}
httpTrace("context:%p, fd:%d, json buffer initialized", buf->pContext, buf->pContext->fd);
}
void httpJsonItemToken(JsonBuf* buf) {
char c = *(buf->lst - 1);
if (c == JsonArrStt || c == JsonObjStt || c == JsonPairTkn || c == JsonItmTkn) {
return;
}
if (buf->lst > buf->buf) httpJsonToken(buf, JsonItmTkn);
}
void httpJsonString(JsonBuf* buf, char* sVal, int32_t len) {
httpJsonItemToken(buf);
httpJsonToken(buf, JsonStrStt);
httpJsonPrint(buf, sVal, len);
httpJsonToken(buf, JsonStrEnd);
}
void httpJsonOriginString(JsonBuf* buf, char* sVal, int32_t len) {
httpJsonItemToken(buf);
httpJsonPrint(buf, sVal, len);
}
void httpJsonStringForTransMean(JsonBuf* buf, char* sVal, int32_t maxLen) {
httpJsonItemToken(buf);
httpJsonToken(buf, JsonStrStt);
if (sVal != NULL) {
// dispose transferred meaning byte
char* lastPos = sVal;
char* curPos = sVal;
for (int32_t i = 0; i < maxLen; ++i) {
if (*curPos == 0) {
break;
}
if (*curPos == '\"') {
httpJsonPrint(buf, lastPos, (int32_t)(curPos - lastPos));
curPos++;
lastPos = curPos;
httpJsonPrint(buf, "\\\"", 2);
} else if (*curPos == '\\') {
httpJsonPrint(buf, lastPos, (int32_t)(curPos - lastPos));
curPos++;
lastPos = curPos;
httpJsonPrint(buf, "\\\\", 2);
} else {
curPos++;
}
}
if (*lastPos) {
httpJsonPrint(buf, lastPos, (int32_t)(curPos - lastPos));
}
}
httpJsonToken(buf, JsonStrEnd);
}
void httpJsonInt64(JsonBuf* buf, int64_t num) {
httpJsonItemToken(buf);
httpJsonTestBuf(buf, MAX_NUM_STR_SZ);
buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%" PRId64, num);
}
void httpJsonUInt64(JsonBuf* buf, uint64_t num) {
httpJsonItemToken(buf);
httpJsonTestBuf(buf, MAX_NUM_STR_SZ);
buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%" PRIu64, num);
}
void httpJsonTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision) {
char ts[35] = {0};
int32_t fractionLen;
char* format = NULL;
time_t quot = 0;
int64_t mod = 0;
switch (timePrecision) {
case TSDB_TIME_PRECISION_MILLI: {
mod = ((t) % 1000 + 1000) % 1000;
if (t < 0 && mod != 0) {
t -= 1000;
}
quot = t / 1000;
fractionLen = 5;
format = ".%03" PRId64;
break;
}
case TSDB_TIME_PRECISION_MICRO: {
mod = ((t) % 1000000 + 1000000) % 1000000;
if (t < 0 && mod != 0) {
t -= 1000000;
}
quot = t / 1000000;
fractionLen = 8;
format = ".%06" PRId64;
break;
}
case TSDB_TIME_PRECISION_NANO: {
mod = ((t) % 1000000000 + 1000000000) % 1000000000;
if (t < 0 && mod != 0) {
t -= 1000000000;
}
quot = t / 1000000000;
fractionLen = 11;
format = ".%09" PRId64;
break;
}
default:
fractionLen = 0;
assert(false);
}
struct tm ptm = {0};
localtime_r(&quot, &ptm);
int32_t length = (int32_t)strftime(ts, 35, "%Y-%m-%d %H:%M:%S", &ptm);
length += snprintf(ts + length, fractionLen, format, mod);
httpJsonString(buf, ts, length);
}
void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision) {
char ts[40] = {0};
struct tm* ptm;
int32_t fractionLen;
char* format = NULL;
time_t quot = 0;
long mod = 0;
switch (timePrecision) {
case TSDB_TIME_PRECISION_MILLI: {
mod = ((t) % 1000 + 1000) % 1000;
if (t < 0 && mod != 0) {
t -= 1000;
}
quot = t / 1000;
fractionLen = 5;
format = ".%03" PRId64;
break;
}
case TSDB_TIME_PRECISION_MICRO: {
mod = ((t) % 1000000 + 1000000) % 1000000;
if (t < 0 && mod != 0) {
t -= 1000000;
}
quot = t / 1000000;
fractionLen = 8;
format = ".%06" PRId64;
break;
}
case TSDB_TIME_PRECISION_NANO: {
mod = ((t) % 1000000000 + 1000000000) % 1000000000;
if (t < 0 && mod != 0) {
t -= 1000000000;
}
quot = t / 1000000000;
fractionLen = 11;
format = ".%09" PRId64;
break;
}
default:
fractionLen = 0;
assert(false);
}
ptm = localtime(&quot);
int32_t length = (int32_t)strftime(ts, 40, "%Y-%m-%dT%H:%M:%S", ptm);
length += snprintf(ts + length, fractionLen, format, mod);
length += (int32_t)strftime(ts + length, 40 - length, "%z", ptm);
httpJsonString(buf, ts, length);
}
void httpJsonInt(JsonBuf* buf, int32_t num) {
httpJsonItemToken(buf);
httpJsonTestBuf(buf, MAX_NUM_STR_SZ);
buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%d", num);
}
void httpJsonUInt(JsonBuf* buf, uint32_t num) {
httpJsonItemToken(buf);
httpJsonTestBuf(buf, MAX_NUM_STR_SZ);
buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%u", num);
}
void httpJsonFloat(JsonBuf* buf, float num) {
httpJsonItemToken(buf);
httpJsonTestBuf(buf, MAX_NUM_STR_SZ);
if (isinf(num) || isnan(num)) {
buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "null");
} else if (num > 1E10 || num < -1E10) {
buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%.5e", num);
} else {
buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%.5f", num);
}
}
void httpJsonDouble(JsonBuf* buf, double num) {
httpJsonItemToken(buf);
httpJsonTestBuf(buf, MAX_NUM_STR_SZ);
if (isinf(num) || isnan(num)) {
buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "null");
} else if (num > 1E10 || num < -1E10) {
buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%.9e", num);
} else {
buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%.9f", num);
}
}
void httpJsonNull(JsonBuf* buf) { httpJsonString(buf, "null", 4); }
void httpJsonBool(JsonBuf* buf, int32_t val) {
if (val == 0)
httpJsonPrint(buf, JsonFalseTkn, sizeof(JsonFalseTkn));
else
httpJsonPrint(buf, JsonTrueTkn, sizeof(JsonTrueTkn));
}
void httpJsonPairHead(JsonBuf* buf, char* name, int32_t len) {
httpJsonItemToken(buf);
httpJsonString(buf, name, len);
httpJsonToken(buf, JsonPairTkn);
}
void httpJsonPair(JsonBuf* buf, char* name, int32_t nameLen, char* sVal, int32_t valLen) {
httpJsonPairHead(buf, name, nameLen);
httpJsonString(buf, sVal, valLen);
}
void httpJsonPairOriginString(JsonBuf* buf, char* name, int32_t nameLen, char* sVal, int32_t valLen) {
httpJsonPairHead(buf, name, nameLen);
httpJsonOriginString(buf, sVal, valLen);
}
void httpJsonPairIntVal(JsonBuf* buf, char* name, int32_t nNameLen, int32_t num) {
httpJsonPairHead(buf, name, nNameLen);
httpJsonInt(buf, num);
}
void httpJsonPairInt64Val(JsonBuf* buf, char* name, int32_t nNameLen, int64_t num) {
httpJsonPairHead(buf, name, nNameLen);
httpJsonInt64(buf, num);
}
void httpJsonPairBoolVal(JsonBuf* buf, char* name, int32_t nNameLen, int32_t num) {
httpJsonPairHead(buf, name, nNameLen);
httpJsonBool(buf, num);
}
void httpJsonPairFloatVal(JsonBuf* buf, char* name, int32_t nNameLen, float num) {
httpJsonPairHead(buf, name, nNameLen);
httpJsonFloat(buf, num);
}
void httpJsonPairDoubleVal(JsonBuf* buf, char* name, int32_t nNameLen, double num) {
httpJsonPairHead(buf, name, nNameLen);
httpJsonDouble(buf, num);
}
void httpJsonPairNullVal(JsonBuf* buf, char* name, int32_t nNameLen) {
httpJsonPairHead(buf, name, nNameLen);
httpJsonNull(buf);
}
void httpJsonPairArray(JsonBuf* buf, char* name, int32_t len, httpJsonBuilder fnBuilder, void* dsHandle) {
httpJsonPairHead(buf, name, len);
httpJsonArray(buf, fnBuilder, dsHandle);
}
void httpJsonPairObject(JsonBuf* buf, char* name, int32_t len, httpJsonBuilder fnBuilder, void* dsHandle) {
httpJsonPairHead(buf, name, len);
httpJsonObject(buf, fnBuilder, dsHandle);
}
void httpJsonObject(JsonBuf* buf, httpJsonBuilder fnBuilder, void* dsHandle) {
httpJsonItemToken(buf);
httpJsonToken(buf, JsonObjStt);
(*fnBuilder)(buf, dsHandle);
httpJsonToken(buf, JsonObjEnd);
}
void httpJsonArray(JsonBuf* buf, httpJsonBuilder fnBuilder, void* jsonHandle) {
httpJsonItemToken(buf);
httpJsonToken(buf, JsonArrStt);
(*fnBuilder)(buf, jsonHandle);
httpJsonToken(buf, JsonArrEnd);
}
void httpJsonTestBuf(JsonBuf* buf, int32_t safety) {
if ((buf->lst - buf->buf + safety) < buf->size) return;
// buf->slot = *buf->lst;
httpWriteJsonBufBody(buf, false);
}
void httpJsonToken(JsonBuf* buf, char c) {
httpJsonTestBuf(buf, MAX_NUM_STR_SZ); // maybe object stack
*buf->lst++ = c;
}
void httpJsonPrint(JsonBuf* buf, const char* json, int32_t len) {
if (len == 0 || len >= JSON_BUFFER_SIZE) {
return;
}
if (len > buf->size) {
httpWriteJsonBufBody(buf, false);
httpJsonPrint(buf, json, len);
// buf->slot = json[len - 1];
return;
}
httpJsonTestBuf(buf, len + 2);
memcpy(buf->lst, json, (size_t)len);
buf->lst += len;
}
void httpJsonPairStatus(JsonBuf* buf, int32_t code) {
if (code == 0) {
httpJsonPair(buf, "status", 6, "succ", 4);
} else {
httpJsonPair(buf, "status", 6, "error", 5);
httpJsonItemToken(buf);
httpJsonPairIntVal(buf, "code", 4, code & 0XFFFF);
httpJsonItemToken(buf);
if (code == TSDB_CODE_MND_DB_NOT_SELECTED) {
httpJsonPair(buf, "desc", 4, "failed to create database", 23);
} else if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) {
httpJsonPair(buf, "desc", 4, "failed to create table", 22);
} else {
httpJsonPair(buf, "desc", 4, (char*)tstrerror(code), (int32_t)strlen(tstrerror(code)));
}
}
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tfs.h"
#include "httpMetricsHandle.h"
#include "dnode.h"
#include "httpLog.h"
static HttpDecodeMethod metricsDecodeMethod = {"metrics", metricsProcessRequest};
void metricsInitHandle(HttpServer* pServer) {
httpAddMethod(pServer, &metricsDecodeMethod);
}
bool metricsProcessRequest(HttpContext* pContext) {
httpDebug("context:%p, fd:%d, user:%s, process admin grant msg", pContext, pContext->fd, pContext->user);
JsonBuf* jsonBuf = httpMallocJsonBuf(pContext);
if (jsonBuf == NULL) {
httpError("failed to allocate memory for metrics");
httpSendErrorResp(pContext, TSDB_CODE_HTTP_NO_ENOUGH_MEMORY);
return false;
}
httpInitJsonBuf(jsonBuf, pContext);
httpWriteJsonBufHead(jsonBuf);
httpJsonToken(jsonBuf, JsonObjStt);
{
char* keyDisks = "tags";
httpJsonPairHead(jsonBuf, keyDisks, (int32_t)strlen(keyDisks));
httpJsonToken(jsonBuf, JsonArrStt);
{
httpJsonItemToken(jsonBuf);
httpJsonToken(jsonBuf, JsonObjStt);
char* keyTagName = "name";
char* keyTagValue = "value";
httpJsonPairOriginString(jsonBuf, keyTagName, (int32_t)strlen(keyTagName), "\"dnode_id\"",
(int32_t)strlen("\"dnode_id\""));
int32_t dnodeId = dnodeGetDnodeId();
httpJsonPairIntVal(jsonBuf, keyTagValue, (int32_t)strlen(keyTagValue), dnodeId);
httpJsonToken(jsonBuf, JsonObjEnd);
}
httpJsonToken(jsonBuf, JsonArrEnd);
}
{
if (tsDnodeStartTime != 0) {
int64_t now = taosGetTimestampMs();
int64_t upTime = now-tsDnodeStartTime;
char* keyUpTime = "up_time";
httpJsonPairInt64Val(jsonBuf, keyUpTime, (int32_t)strlen(keyUpTime), upTime);
}
}
{
int32_t cpuCores = taosGetCpuCores();
char* keyCpuCores = "cpu_cores";
httpJsonPairIntVal(jsonBuf, keyCpuCores, (int32_t)strlen(keyCpuCores), cpuCores);
float sysCpuUsage = 0;
float procCpuUsage = 0;
bool succeeded = taosGetCpuUsage(&sysCpuUsage, &procCpuUsage);
if (!succeeded) {
httpError("failed to get cpu usage");
} else {
if (sysCpuUsage <= procCpuUsage) {
sysCpuUsage = procCpuUsage + 0.1f;
}
char* keyCpuSystem = "cpu_system";
char* keyCpuEngine = "cpu_engine";
httpJsonPairFloatVal(jsonBuf, keyCpuSystem, (int32_t)strlen(keyCpuSystem), sysCpuUsage);
httpJsonPairFloatVal(jsonBuf, keyCpuEngine, (int32_t)strlen(keyCpuEngine), procCpuUsage);
}
}
{
float sysMemoryUsedMB = 0;
bool succeeded = taosGetSysMemory(&sysMemoryUsedMB);
if (!succeeded) {
httpError("failed to get sys memory info");
} else {
char* keyMemSystem = "mem_system";
httpJsonPairFloatVal(jsonBuf, keyMemSystem, (int32_t)strlen(keyMemSystem), sysMemoryUsedMB);
}
float procMemoryUsedMB = 0;
succeeded = taosGetProcMemory(&procMemoryUsedMB);
if (!succeeded) {
httpError("failed to get proc memory info");
} else {
char* keyMemEngine = "mem_engine";
httpJsonPairFloatVal(jsonBuf, keyMemEngine, (int32_t)strlen(keyMemEngine), procMemoryUsedMB);
}
}
{
int64_t bytes = 0, rbytes = 0, tbytes = 0;
bool succeeded = taosGetCardInfo(&bytes, &rbytes, &tbytes);
if (!succeeded) {
httpError("failed to get network info");
} else {
char* keyNetIn = "net_in";
char* keyNetOut = "net_out";
httpJsonPairInt64Val(jsonBuf, keyNetIn, (int32_t)strlen(keyNetIn), rbytes);
httpJsonPairInt64Val(jsonBuf, keyNetOut, (int32_t)strlen(keyNetOut), tbytes);
}
}
{
int64_t rchars = 0;
int64_t wchars = 0;
bool succeeded = taosReadProcIO(&rchars, &wchars);
if (!succeeded) {
httpError("failed to get io info");
} else {
char* keyIORead = "io_read";
char* keyIOWrite = "io_write";
httpJsonPairInt64Val(jsonBuf, keyIORead, (int32_t)strlen(keyIORead), rchars);
httpJsonPairInt64Val(jsonBuf, keyIOWrite, (int32_t)strlen(keyIOWrite), wchars);
}
}
{
const int8_t numTiers = 3;
SFSMeta fsMeta;
STierMeta* tierMetas = calloc(numTiers, sizeof(STierMeta));
tfsUpdateInfo(&fsMeta, tierMetas, numTiers);
{
char* keyDiskUsed = "disk_used";
char* keyDiskTotal = "disk_total";
httpJsonPairInt64Val(jsonBuf, keyDiskTotal, (int32_t)strlen(keyDiskTotal), fsMeta.tsize);
httpJsonPairInt64Val(jsonBuf, keyDiskUsed, (int32_t)strlen(keyDiskUsed), fsMeta.used);
char* keyDisks = "disks";
httpJsonPairHead(jsonBuf, keyDisks, (int32_t)strlen(keyDisks));
httpJsonToken(jsonBuf, JsonArrStt);
for (int i = 0; i < numTiers; ++i) {
httpJsonItemToken(jsonBuf);
httpJsonToken(jsonBuf, JsonObjStt);
char* keyDataDirLevelUsed = "datadir_used";
char* keyDataDirLevelTotal = "datadir_total";
httpJsonPairInt64Val(jsonBuf, keyDataDirLevelUsed, (int32_t)strlen(keyDataDirLevelUsed), tierMetas[i].used);
httpJsonPairInt64Val(jsonBuf, keyDataDirLevelTotal, (int32_t)strlen(keyDataDirLevelTotal), tierMetas[i].size);
httpJsonToken(jsonBuf, JsonObjEnd);
}
httpJsonToken(jsonBuf, JsonArrEnd);
}
free(tierMetas);
}
{
SStatisInfo info = dnodeGetStatisInfo();
{
char* keyReqHttp = "req_http";
char* keyReqSelect = "req_select";
char* keyReqInsert = "req_insert";
httpJsonPairInt64Val(jsonBuf, keyReqHttp, (int32_t)strlen(keyReqHttp), info.httpReqNum);
httpJsonPairInt64Val(jsonBuf, keyReqSelect, (int32_t)strlen(keyReqSelect), info.queryReqNum);
httpJsonPairInt64Val(jsonBuf, keyReqInsert, (int32_t)strlen(keyReqInsert), info.submitReqNum);
}
}
httpJsonToken(jsonBuf, JsonObjEnd);
httpWriteJsonBufEnd(jsonBuf);
pContext->reqType = HTTP_REQTYPE_OTHERS;
httpFreeJsonBuf(pContext);
return false;
}
\ No newline at end of file
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "httpQueue.h"
#include "../../../../include/client/taos.h"
#include "httpAuth.h"
#include "httpContext.h"
#include "httpInt.h"
#include "httpResp.h"
#include "httpSession.h"
#include "httpSql.h"
#include "os.h"
#include "tnote.h"
#include "tqueue.h"
#include "tsclient.h"
typedef struct {
pthread_t thread;
int32_t workerId;
} SHttpWorker;
typedef struct {
int32_t num;
SHttpWorker *httpWorker;
} SHttpWorkerPool;
typedef struct {
void * param;
void * result;
int32_t code;
int32_t rows;
FHttpResultFp fp;
} SHttpResult;
static SHttpWorkerPool tsHttpPool;
static taos_qset tsHttpQset;
static taos_queue tsHttpQueue;
void httpDispatchToResultQueue(void *param, TAOS_RES *result, int32_t code, int32_t rows, FHttpResultFp fp) {
if (tsHttpQueue != NULL) {
SHttpResult *pMsg = taosAllocateQitem(sizeof(SHttpResult));
pMsg->param = param;
pMsg->result = result;
pMsg->code = code;
pMsg->rows = rows;
pMsg->fp = fp;
taosWriteQitem(tsHttpQueue, TAOS_QTYPE_RPC, pMsg);
} else {
taos_stop_query(result);
taos_free_result(result);
//(*fp)(param, result, code, rows);
}
}
static void *httpProcessResultQueue(void *param) {
SHttpResult *pMsg;
int32_t type;
void * unUsed;
setThreadName("httpResultQ");
while (1) {
if (taosReadQitemFromQset(tsHttpQset, &type, (void **)&pMsg, &unUsed) == 0) {
httpDebug("qset:%p, http queue got no message from qset, exiting", tsHttpQset);
break;
}
httpTrace("context:%p, res:%p will be processed in result queue, code:%d rows:%d", pMsg->param, pMsg->result,
pMsg->code, pMsg->rows);
(*pMsg->fp)(pMsg->param, pMsg->result, pMsg->code, pMsg->rows);
taosFreeQitem(pMsg);
}
return NULL;
}
static bool httpAllocateResultQueue() {
tsHttpQueue = taosOpenQueue();
if (tsHttpQueue == NULL) return false;
taosAddIntoQset(tsHttpQset, tsHttpQueue, NULL);
for (int32_t i = 0; i < tsHttpPool.num; ++i) {
SHttpWorker *pWorker = tsHttpPool.httpWorker + i;
pWorker->workerId = i;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, httpProcessResultQueue, pWorker) != 0) {
httpError("failed to create thread to process http result queue, reason:%s", strerror(errno));
}
pthread_attr_destroy(&thAttr);
httpDebug("http result worker:%d is launched, total:%d", pWorker->workerId, tsHttpPool.num);
}
httpInfo("http result queue is opened");
return true;
}
static void httpFreeResultQueue() {
taosCloseQueue(tsHttpQueue);
tsHttpQueue = NULL;
}
bool httpInitResultQueue() {
tsHttpQset = taosOpenQset();
tsHttpPool.num = tsHttpMaxThreads;
tsHttpPool.httpWorker = (SHttpWorker *)calloc(sizeof(SHttpWorker), tsHttpPool.num);
if (tsHttpPool.httpWorker == NULL) return -1;
for (int32_t i = 0; i < tsHttpPool.num; ++i) {
SHttpWorker *pWorker = tsHttpPool.httpWorker + i;
pWorker->workerId = i;
}
return httpAllocateResultQueue();
}
void httpCleanupResultQueue() {
httpFreeResultQueue();
for (int32_t i = 0; i < tsHttpPool.num; ++i) {
SHttpWorker *pWorker = tsHttpPool.httpWorker + i;
if (taosCheckPthreadValid(pWorker->thread)) {
taosQsetThreadResume(tsHttpQset);
}
}
for (int32_t i = 0; i < tsHttpPool.num; ++i) {
SHttpWorker *pWorker = tsHttpPool.httpWorker + i;
if (taosCheckPthreadValid(pWorker->thread)) {
pthread_join(pWorker->thread, NULL);
}
}
taosCloseQset(tsHttpQset);
free(tsHttpPool.httpWorker);
httpInfo("http result queue is closed");
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "httpLog.h"
#include "httpResp.h"
#include "httpJson.h"
#include "httpContext.h"
const char *httpKeepAliveStr[] = {"", "Connection: Keep-Alive\r\n", "Connection: Close\r\n"};
const char *httpVersionStr[] = {"HTTP/1.0", "HTTP/1.1", "HTTP/1.2"};
const char *httpRespTemplate[] = {
// HTTP_RESPONSE_JSON_OK
// HTTP_RESPONSE_JSON_ERROR
"{\"status\":\"succ\",\"code\":%d,\"desc\":\"%s\"}",
"{\"status\":\"error\",\"code\":%d,\"desc\":\"%s\"}",
// HTTP_RESPONSE_OK
// HTTP_RESPONSE_ERROR
"%s 200 OK\r\nAccess-Control-Allow-Origin:*\r\n%sContent-Type: application/json;charset=utf-8\r\nContent-Length: %d\r\n\r\n",
"%s %d %s\r\nAccess-Control-Allow-Origin:*\r\n%sContent-Type: application/json;charset=utf-8\r\nContent-Length: %d\r\n\r\n",
// HTTP_RESPONSE_CHUNKED_UN_COMPRESS, HTTP_RESPONSE_CHUNKED_COMPRESS
"%s 200 OK\r\nAccess-Control-Allow-Origin:*\r\n%sContent-Type: application/json;charset=utf-8\r\nTransfer-Encoding: chunked\r\n\r\n",
"%s 200 OK\r\nAccess-Control-Allow-Origin:*\r\n%sContent-Type: application/json;charset=utf-8\r\nContent-Encoding: gzip\r\nTransfer-Encoding: chunked\r\n\r\n",
// HTTP_RESPONSE_OPTIONS
"%s 200 OK\r\nAccess-Control-Allow-Origin:*\r\n%sContent-Type: application/json;charset=utf-8\r\nContent-Length: %d\r\nAccess-Control-Allow-Methods: *\r\nAccess-Control-Max-Age: 3600\r\nAccess-Control-Allow-Headers: Origin, X-Requested-With, Content-Type, Accept, authorization\r\n\r\n",
// HTTP_RESPONSE_GRAFANA
"%s 200 OK\r\nAccess-Control-Allow-Origin:*\r\n%sAccess-Control-Allow-Methods:POST, GET, OPTIONS, DELETE, PUT\r\nAccess-Control-Allow-Headers:Accept, Content-Type\r\nContent-Type: application/json;charset=utf-8\r\nContent-Length: %d\r\n\r\n"
};
static void httpSendErrorRespImp(HttpContext *pContext, int32_t httpCode, char *httpCodeStr, int32_t errNo, const char *desc) {
httpError("context:%p, fd:%d, code:%d, error:%s", pContext, pContext->fd, httpCode, desc);
char head[512] = {0};
char body[512] = {0};
int8_t httpVersion = 0;
int8_t keepAlive = 0;
if (pContext->parser != NULL) {
httpVersion = pContext->parser->httpVersion;
keepAlive = pContext->parser->keepAlive;
}
int32_t bodyLen = sprintf(body, httpRespTemplate[HTTP_RESPONSE_JSON_ERROR], errNo, desc);
int32_t headLen = sprintf(head, httpRespTemplate[HTTP_RESPONSE_ERROR], httpVersionStr[httpVersion], httpCode,
httpCodeStr, httpKeepAliveStr[keepAlive], bodyLen);
httpWriteBuf(pContext, head, headLen);
httpWriteBuf(pContext, body, bodyLen);
httpCloseContextByApp(pContext);
}
void httpSendErrorResp(HttpContext *pContext, int32_t errNo) {
int32_t httpCode = HTTP_CODE_INTERNAL_SERVER_ERROR;
if (errNo == TSDB_CODE_SUCCESS)
httpCode = HTTP_CODE_OK;
else if (errNo == TSDB_CODE_HTTP_SERVER_OFFLINE)
httpCode = HTTP_CODE_NOT_FOUND;
else if (errNo == TSDB_CODE_HTTP_UNSUPPORT_URL)
httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_INVALID_URL)
httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_NO_ENOUGH_MEMORY)
httpCode = HTTP_CODE_INSUFFICIENT_STORAGE;
else if (errNo == TSDB_CODE_HTTP_REQUSET_TOO_BIG)
httpCode = HTTP_CODE_PAYLOAD_TOO_LARGE;
else if (errNo == TSDB_CODE_HTTP_NO_AUTH_INFO)
httpCode = HTTP_CODE_UNAUTHORIZED;
else if (errNo == TSDB_CODE_HTTP_NO_MSG_INPUT)
httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_NO_SQL_INPUT)
httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_NO_EXEC_USEDB)
httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_SESSION_FULL)
httpCode = HTTP_CODE_INTERNAL_SERVER_ERROR;
else if (errNo == TSDB_CODE_HTTP_GEN_TAOSD_TOKEN_ERR)
httpCode = HTTP_CODE_INTERNAL_SERVER_ERROR;
else if (errNo == TSDB_CODE_HTTP_INVALID_MULTI_REQUEST)
httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_CREATE_GZIP_FAILED)
httpCode = HTTP_CODE_INTERNAL_SERVER_ERROR;
else if (errNo == TSDB_CODE_HTTP_FINISH_GZIP_FAILED)
httpCode = HTTP_CODE_INTERNAL_SERVER_ERROR;
else if (errNo == TSDB_CODE_HTTP_INVALID_VERSION)
httpCode = HTTP_CODE_HTTP_VER_NOT_SUPPORTED;
else if (errNo == TSDB_CODE_HTTP_INVALID_CONTENT_LENGTH)
httpCode = HTTP_CODE_LENGTH_REQUIRED;
else if (errNo == TSDB_CODE_HTTP_INVALID_AUTH_TYPE)
httpCode = HTTP_CODE_UNAUTHORIZED;
else if (errNo == TSDB_CODE_HTTP_INVALID_AUTH_FORMAT)
httpCode = HTTP_CODE_UNAUTHORIZED;
else if (errNo == TSDB_CODE_HTTP_INVALID_BASIC_AUTH)
httpCode = HTTP_CODE_UNAUTHORIZED;
else if (errNo == TSDB_CODE_HTTP_INVALID_TAOSD_AUTH)
httpCode = HTTP_CODE_UNAUTHORIZED;
else if (errNo == TSDB_CODE_HTTP_PARSE_METHOD_FAILED)
httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_TARGET_FAILED)
httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_VERSION_FAILED)
httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_SP_FAILED)
httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_STATUS_FAILED)
httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_PHRASE_FAILED)
httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_CRLF_FAILED)
httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_HEADER_FAILED)
httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_HEADER_KEY_FAILED)
httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_HEADER_VAL_FAILED)
httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_CHUNK_SIZE_FAILED)
httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_CHUNK_FAILED)
httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_END_FAILED)
httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_INVALID_STATE)
httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_ERROR_STATE)
httpCode = HTTP_CODE_BAD_REQUEST;
else
httpCode = HTTP_CODE_BAD_REQUEST;
if (pContext->parser && pContext->parser->httpCode != 0) {
httpCode = pContext->parser->httpCode;
}
pContext->error = true;
char *httpCodeStr = httpGetStatusDesc(httpCode);
httpSendErrorRespImp(pContext, httpCode, httpCodeStr, errNo & 0XFFFF, tstrerror(errNo));
}
void httpSendTaosdInvalidSqlErrorResp(HttpContext *pContext, char *errMsg) {
int32_t httpCode = HTTP_CODE_BAD_REQUEST;
char temp[512] = {0};
int32_t len = sprintf(temp, "invalid SQL: %s", errMsg);
for (int32_t i = 0; i < len; ++i) {
if (temp[i] == '\"') {
temp[i] = '\'';
} else if (temp[i] == '\n') {
temp[i] = ' ';
} else {
}
}
httpSendErrorRespImp(pContext, httpCode, "Bad Request", TSDB_CODE_TSC_INVALID_OPERATION & 0XFFFF, temp);
}
void httpSendSuccResp(HttpContext *pContext, char *desc) {
char head[1024] = {0};
char body[1024] = {0};
int8_t httpVersion = 0;
int8_t keepAlive = 0;
if (pContext->parser != NULL) {
httpVersion = pContext->parser->httpVersion;
keepAlive = pContext->parser->keepAlive;
}
int32_t bodyLen = sprintf(body, httpRespTemplate[HTTP_RESPONSE_JSON_OK], TSDB_CODE_SUCCESS, desc);
int32_t headLen = sprintf(head, httpRespTemplate[HTTP_RESPONSE_OK], httpVersionStr[httpVersion],
httpKeepAliveStr[keepAlive], bodyLen);
httpWriteBuf(pContext, head, headLen);
httpWriteBuf(pContext, body, bodyLen);
httpCloseContextByApp(pContext);
}
void httpSendOptionResp(HttpContext *pContext, char *desc) {
char head[1024] = {0};
char body[1024] = {0};
int8_t httpVersion = 0;
int8_t keepAlive = 0;
if (pContext->parser != NULL) {
httpVersion = pContext->parser->httpVersion;
keepAlive = pContext->parser->keepAlive;
}
int32_t bodyLen = sprintf(body, httpRespTemplate[HTTP_RESPONSE_JSON_OK], TSDB_CODE_SUCCESS, desc);
int32_t headLen = sprintf(head, httpRespTemplate[HTTP_RESPONSE_OPTIONS], httpVersionStr[httpVersion],
httpKeepAliveStr[keepAlive], bodyLen);
httpWriteBuf(pContext, head, headLen);
httpWriteBuf(pContext, body, bodyLen);
httpCloseContextByApp(pContext);
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "httpLog.h"
#include "httpRestHandle.h"
#include "httpRestJson.h"
#include "tglobal.h"
static HttpDecodeMethod restDecodeMethod = {"rest", restProcessRequest};
static HttpDecodeMethod restDecodeMethod2 = {"restful", restProcessRequest};
static HttpEncodeMethod restEncodeSqlTimestampMethod = {
.startJsonFp = restStartSqlJson,
.stopJsonFp = restStopSqlJson,
.buildQueryJsonFp = restBuildSqlTimestampJson,
.buildAffectRowJsonFp = restBuildSqlAffectRowsJson,
.initJsonFp = NULL,
.cleanJsonFp = NULL,
.checkFinishedFp = NULL,
.setNextCmdFp = NULL
};
static HttpEncodeMethod restEncodeSqlLocalTimeStringMethod = {
.startJsonFp = restStartSqlJson,
.stopJsonFp = restStopSqlJson,
.buildQueryJsonFp = restBuildSqlLocalTimeStringJson,
.buildAffectRowJsonFp = restBuildSqlAffectRowsJson,
.initJsonFp = NULL,
.cleanJsonFp = NULL,
.checkFinishedFp = NULL,
.setNextCmdFp = NULL
};
static HttpEncodeMethod restEncodeSqlUtcTimeStringMethod = {
.startJsonFp = restStartSqlJson,
.stopJsonFp = restStopSqlJson,
.buildQueryJsonFp = restBuildSqlUtcTimeStringJson,
.buildAffectRowJsonFp = restBuildSqlAffectRowsJson,
.initJsonFp = NULL,
.cleanJsonFp = NULL,
.checkFinishedFp = NULL,
.setNextCmdFp = NULL
};
void restInitHandle(HttpServer* pServer) {
httpAddMethod(pServer, &restDecodeMethod);
httpAddMethod(pServer, &restDecodeMethod2);
}
bool restGetUserFromUrl(HttpContext* pContext) {
HttpParser* pParser = pContext->parser;
if (pParser->path[REST_USER_USEDB_URL_POS].pos >= TSDB_USER_LEN || pParser->path[REST_USER_USEDB_URL_POS].pos <= 0) {
return false;
}
tstrncpy(pContext->user, pParser->path[REST_USER_USEDB_URL_POS].str, TSDB_USER_LEN);
return true;
}
bool restGetPassFromUrl(HttpContext* pContext) {
HttpParser* pParser = pContext->parser;
if (pParser->path[REST_PASS_URL_POS].pos >= HTTP_PASSWORD_LEN || pParser->path[REST_PASS_URL_POS].pos <= 0) {
return false;
}
tstrncpy(pContext->pass, pParser->path[REST_PASS_URL_POS].str, HTTP_PASSWORD_LEN);
return true;
}
bool restProcessLoginRequest(HttpContext* pContext) {
httpDebug("context:%p, fd:%d, user:%s, process restful login msg", pContext, pContext->fd, pContext->user);
pContext->reqType = HTTP_REQTYPE_LOGIN;
return true;
}
bool restProcessSqlRequest(HttpContext* pContext, int32_t timestampFmt) {
httpDebug("context:%p, fd:%d, user:%s, process restful sql msg", pContext, pContext->fd, pContext->user);
char* sql = pContext->parser->body.str;
if (sql == NULL) {
httpSendErrorResp(pContext, TSDB_CODE_HTTP_NO_SQL_INPUT);
return false;
}
/*
* for async test
*
if (httpCheckUsedbSql(sql)) {
httpSendErrorResp(pContext, TSDB_CODE_HTTP_NO_EXEC_USEDB);
return false;
}
*/
HttpSqlCmd* cmd = &(pContext->singleCmd);
cmd->nativSql = sql;
/* find if there is db_name in url */
pContext->db[0] = '\0';
HttpString *path = &pContext->parser->path[REST_USER_USEDB_URL_POS];
if (tsHttpDbNameMandatory) {
if (path->pos == 0) {
httpError("context:%p, fd:%d, user:%s, database name is mandatory", pContext, pContext->fd, pContext->user);
httpSendErrorResp(pContext, TSDB_CODE_HTTP_INVALID_URL);
return false;
}
}
if (path->pos > 0 && !(strlen(sql) > 4 && (sql[0] == 'u' || sql[0] == 'U') &&
(sql[1] == 's' || sql[1] == 'S') && (sql[2] == 'e' || sql[2] == 'E') && sql[3] == ' '))
{
snprintf(pContext->db, /*TSDB_ACCT_ID_LEN + */TSDB_DB_NAME_LEN, "%s", path->str);
}
pContext->reqType = HTTP_REQTYPE_SINGLE_SQL;
if (timestampFmt == REST_TIMESTAMP_FMT_LOCAL_STRING) {
pContext->encodeMethod = &restEncodeSqlLocalTimeStringMethod;
} else if (timestampFmt == REST_TIMESTAMP_FMT_TIMESTAMP) {
pContext->encodeMethod = &restEncodeSqlTimestampMethod;
} else if (timestampFmt == REST_TIMESTAMP_FMT_UTC_STRING) {
pContext->encodeMethod = &restEncodeSqlUtcTimeStringMethod;
}
return true;
}
#define REST_FUNC_URL_POS 2
#define REST_OUTP_URL_POS 3
#define REST_AGGR_URL_POS 4
#define REST_BUFF_URL_POS 5
#define HTTP_FUNC_LEN 32
#define HTTP_OUTP_LEN 16
#define HTTP_AGGR_LEN 2
#define HTTP_BUFF_LEN 32
static int udfSaveFile(const char *fname, const char *content, long len) {
int fd = open(fname, O_WRONLY | O_CREAT | O_EXCL | O_BINARY, 0755);
if (fd < 0)
return -1;
if (taosWrite(fd, (void *)content, len) < 0)
return -1;
return 0;
}
static bool restProcessUdfRequest(HttpContext* pContext) {
HttpParser* pParser = pContext->parser;
if (pParser->path[REST_FUNC_URL_POS].pos >= HTTP_FUNC_LEN || pParser->path[REST_FUNC_URL_POS].pos <= 0) {
return false;
}
if (pParser->path[REST_OUTP_URL_POS].pos >= HTTP_OUTP_LEN || pParser->path[REST_OUTP_URL_POS].pos <= 0) {
return false;
}
if (pParser->path[REST_AGGR_URL_POS].pos >= HTTP_AGGR_LEN || pParser->path[REST_AGGR_URL_POS].pos <= 0) {
return false;
}
if (pParser->path[REST_BUFF_URL_POS].pos >= HTTP_BUFF_LEN || pParser->path[REST_BUFF_URL_POS].pos <= 0) {
return false;
}
char* sql = pContext->parser->body.str;
int len = pContext->parser->body.pos;
if (sql == NULL) {
httpSendErrorResp(pContext, TSDB_CODE_HTTP_NO_SQL_INPUT);
return false;
}
char udfDir[256] = {0};
char buf[64] = "udf-";
char funcName[64] = {0};
int aggr = 0;
char outputType[16] = {0};
char buffSize[32] = {0};
tstrncpy(funcName, pParser->path[REST_FUNC_URL_POS].str, HTTP_FUNC_LEN);
tstrncpy(buf + 4, funcName, HTTP_FUNC_LEN);
if (pParser->path[REST_AGGR_URL_POS].str[0] != '0') {
aggr = 1;
}
tstrncpy(outputType, pParser->path[REST_OUTP_URL_POS].str, HTTP_OUTP_LEN);
tstrncpy(buffSize, pParser->path[REST_BUFF_URL_POS].str, HTTP_BUFF_LEN);
taosGetTmpfilePath(funcName, udfDir);
udfSaveFile(udfDir, sql, len);
tfree(sql);
pContext->parser->body.str = malloc(1024);
sql = pContext->parser->body.str;
int sqlLen = sprintf(sql, "create %s function %s as \"%s\" outputtype %s bufsize %s",
aggr == 1 ? "aggregate" : " ", funcName, udfDir, outputType, buffSize);
pContext->parser->body.pos = sqlLen;
pContext->parser->body.size = sqlLen + 1;
HttpSqlCmd* cmd = &(pContext->singleCmd);
cmd->nativSql = sql;
pContext->reqType = HTTP_REQTYPE_SINGLE_SQL;
pContext->encodeMethod = &restEncodeSqlLocalTimeStringMethod;
return true;
}
bool restProcessRequest(struct HttpContext* pContext) {
if (httpUrlMatch(pContext, REST_ACTION_URL_POS, "login")) {
restGetUserFromUrl(pContext);
restGetPassFromUrl(pContext);
}
if (strlen(pContext->user) == 0 || strlen(pContext->pass) == 0) {
httpSendErrorResp(pContext, TSDB_CODE_HTTP_NO_AUTH_INFO);
return false;
}
if (httpUrlMatch(pContext, REST_ACTION_URL_POS, "sql")) {
return restProcessSqlRequest(pContext, REST_TIMESTAMP_FMT_LOCAL_STRING);
} else if (httpUrlMatch(pContext, REST_ACTION_URL_POS, "sqlt")) {
return restProcessSqlRequest(pContext, REST_TIMESTAMP_FMT_TIMESTAMP);
} else if (httpUrlMatch(pContext, REST_ACTION_URL_POS, "sqlutc")) {
return restProcessSqlRequest(pContext, REST_TIMESTAMP_FMT_UTC_STRING);
} else if (httpUrlMatch(pContext, REST_ACTION_URL_POS, "login")) {
return restProcessLoginRequest(pContext);
} else if (httpUrlMatch(pContext, REST_ACTION_URL_POS, "udf")) {
return restProcessUdfRequest(pContext);
} else {
}
httpSendErrorResp(pContext, TSDB_CODE_HTTP_INVALID_URL);
return false;
}
此差异已折叠。
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "httpSession.h"
#include "../../../../include/client/taos.h"
#include "httpContext.h"
#include "httpInt.h"
#include "os.h"
#include "taoserror.h"
#include "tcache.h"
#include "tglobal.h"
void httpCreateSession(HttpContext *pContext, void *taos) {
HttpServer *server = &tsHttpServer;
httpReleaseSession(pContext);
pthread_mutex_lock(&server->serverMutex);
HttpSession session;
memset(&session, 0, sizeof(HttpSession));
session.taos = taos;
session.refCount = 1;
int32_t len = snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
pContext->session =
taosCachePut(server->sessionCache, session.id, len, &session, sizeof(HttpSession), tsHttpSessionExpire * 1000);
// void *temp = pContext->session;
// taosCacheRelease(server->sessionCache, (void **)&temp, false);
if (pContext->session == NULL) {
httpError("context:%p, fd:%d, user:%s, error:%s", pContext, pContext->fd, pContext->user,
tstrerror(TSDB_CODE_HTTP_SESSION_FULL));
taos_close(taos);
pthread_mutex_unlock(&server->serverMutex);
return;
}
httpDebug("context:%p, fd:%d, user:%s, create a new session:%p:%p sessionRef:%d", pContext, pContext->fd,
pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount);
pthread_mutex_unlock(&server->serverMutex);
}
static void httpFetchSessionImp(HttpContext *pContext) {
HttpServer *server = &tsHttpServer;
pthread_mutex_lock(&server->serverMutex);
char sessionId[HTTP_SESSION_ID_LEN];
int32_t len = snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
pContext->session = taosCacheAcquireByKey(server->sessionCache, sessionId, len);
if (pContext->session != NULL) {
atomic_add_fetch_32(&pContext->session->refCount, 1);
httpDebug("context:%p, fd:%d, user:%s, find an exist session:%p:%p, sessionRef:%d", pContext, pContext->fd,
pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount);
} else {
httpDebug("context:%p, fd:%d, user:%s, session not found", pContext, pContext->fd, pContext->user);
}
pthread_mutex_unlock(&server->serverMutex);
}
void httpGetSession(HttpContext *pContext) {
if (pContext->session == NULL) {
httpFetchSessionImp(pContext);
} else {
char sessionId[HTTP_SESSION_ID_LEN];
snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
httpReleaseSession(pContext);
httpFetchSessionImp(pContext);
}
}
void httpReleaseSession(HttpContext *pContext) {
if (pContext == NULL || pContext->session == NULL) return;
int32_t refCount = atomic_sub_fetch_32(&pContext->session->refCount, 1);
assert(refCount >= 0);
httpDebug("context:%p, release session:%p:%p, sessionRef:%d", pContext, pContext->session, pContext->session->taos,
pContext->session->refCount);
taosCacheRelease(tsHttpServer.sessionCache, (void **)&pContext->session, false);
pContext->session = NULL;
}
static void httpDestroySession(void *data) {
HttpSession *session = data;
httpDebug("session:%p:%p, is destroyed, sessionRef:%d", session, session->taos, session->refCount);
if (session->taos != NULL) {
taos_close(session->taos);
session->taos = NULL;
}
}
void httpCleanUpSessions() {
if (tsHttpServer.sessionCache != NULL) {
SCacheObj *cache = tsHttpServer.sessionCache;
httpInfo("session cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable));
taosCacheCleanup(tsHttpServer.sessionCache);
tsHttpServer.sessionCache = NULL;
}
}
bool httpInitSessions() {
tsHttpServer.sessionCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, 5, false, httpDestroySession, "rests");
if (tsHttpServer.sessionCache == NULL) {
httpError("failed to init session cache");
return false;
}
return true;
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC)
LIST(REMOVE_ITEM SRC src/syncArbitrator.c)
ADD_LIBRARY(sync ${SRC})
TARGET_LINK_LIBRARIES(sync tutil pthread common)
LIST(APPEND BIN_SRC src/syncArbitrator.c)
LIST(APPEND BIN_SRC src/syncTcp.c)
ADD_EXECUTABLE(tarbitrator ${BIN_SRC})
TARGET_LINK_LIBRARIES(tarbitrator sync common os tutil)
#ADD_SUBDIRECTORY(test)
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册