提交 ef1fe62a 编写于 作者: A Alex Duan

fix(http): migrate http module from 2.6 to 2.0

上级 84565e38
...@@ -41,6 +41,11 @@ extern "C" { ...@@ -41,6 +41,11 @@ extern "C" {
struct SSqlInfo; struct SSqlInfo;
struct SLocalMerger; struct SLocalMerger;
typedef enum {
TAOS_REQ_FROM_SHELL,
TAOS_REQ_FROM_HTTP
} SReqOrigin;
// data source from sql string or from file // data source from sql string or from file
enum { enum {
DATA_FROM_SQL_STRING = 1, DATA_FROM_SQL_STRING = 1,
...@@ -361,6 +366,7 @@ typedef struct STscObj { ...@@ -361,6 +366,7 @@ typedef struct STscObj {
SRpcCorEpSet *tscCorMgmtEpSet; SRpcCorEpSet *tscCorMgmtEpSet;
pthread_mutex_t mutex; pthread_mutex_t mutex;
int32_t numOfObj; // number of sqlObj from this tscObj int32_t numOfObj; // number of sqlObj from this tscObj
SReqOrigin from;
} STscObj; } STscObj;
typedef struct SSubqueryState { typedef struct SSubqueryState {
......
...@@ -41,6 +41,7 @@ extern char tsArbitrator[]; ...@@ -41,6 +41,7 @@ extern char tsArbitrator[];
extern int8_t tsArbOnline; extern int8_t tsArbOnline;
extern int64_t tsArbOnlineTimestamp; extern int64_t tsArbOnlineTimestamp;
extern int32_t tsDnodeId; extern int32_t tsDnodeId;
extern int64_t tsDnodeStartTime;
// common // common
extern int tsRpcTimer; extern int tsRpcTimer;
...@@ -123,6 +124,8 @@ extern int32_t tsHttpMaxThreads; ...@@ -123,6 +124,8 @@ extern int32_t tsHttpMaxThreads;
extern int8_t tsHttpEnableCompress; extern int8_t tsHttpEnableCompress;
extern int8_t tsHttpEnableRecordSql; extern int8_t tsHttpEnableRecordSql;
extern int8_t tsTelegrafUseFieldNum; extern int8_t tsTelegrafUseFieldNum;
extern int8_t tsHttpDbNameMandatory;
extern int32_t tsHttpKeepAlive;
// mqtt // mqtt
extern int8_t tsEnableMqttModule; extern int8_t tsEnableMqttModule;
......
...@@ -48,6 +48,7 @@ int8_t tsArbOnline = 0; ...@@ -48,6 +48,7 @@ int8_t tsArbOnline = 0;
int64_t tsArbOnlineTimestamp = TSDB_ARB_DUMMY_TIME; int64_t tsArbOnlineTimestamp = TSDB_ARB_DUMMY_TIME;
char tsEmail[TSDB_FQDN_LEN] = {0}; char tsEmail[TSDB_FQDN_LEN] = {0};
int32_t tsDnodeId = 0; int32_t tsDnodeId = 0;
int64_t tsDnodeStartTime = 0;
// common // common
int32_t tsRpcTimer = 300; int32_t tsRpcTimer = 300;
...@@ -160,6 +161,9 @@ int32_t tsHttpMaxThreads = 2; ...@@ -160,6 +161,9 @@ int32_t tsHttpMaxThreads = 2;
int8_t tsHttpEnableCompress = 1; int8_t tsHttpEnableCompress = 1;
int8_t tsHttpEnableRecordSql = 0; int8_t tsHttpEnableRecordSql = 0;
int8_t tsTelegrafUseFieldNum = 0; int8_t tsTelegrafUseFieldNum = 0;
int8_t tsHttpDbNameMandatory = 0;
int32_t tsHttpKeepAlive = 30000;
// mqtt // mqtt
int8_t tsEnableMqttModule = 0; // not finished yet, not started it by default int8_t tsEnableMqttModule = 0; // not finished yet, not started it by default
...@@ -1215,6 +1219,27 @@ static void doInitGlobalConfig(void) { ...@@ -1215,6 +1219,27 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "httpDbNameMandatory";
cfg.ptr = &tsHttpDbNameMandatory;
cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// pContext in cache
cfg.option = "httpKeepAlive";
cfg.ptr = &tsHttpKeepAlive;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
cfg.minValue = 3000;
cfg.maxValue = 3600000;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// debug flag // debug flag
cfg.option = "numOfLogLines"; cfg.option = "numOfLogLines";
cfg.ptr = &tsNumOfLogLines; cfg.ptr = &tsNumOfLogLines;
......
...@@ -159,6 +159,7 @@ int32_t dnodeInitSystem() { ...@@ -159,6 +159,7 @@ int32_t dnodeInitSystem() {
dnodeSetRunStatus(TSDB_RUN_STATUS_RUNING); dnodeSetRunStatus(TSDB_RUN_STATUS_RUNING);
moduleStart(); moduleStart();
tsDnodeStartTime = taosGetTimestampMs();
dnodeReportStep("TDengine", "initialized successfully", 1); dnodeReportStep("TDengine", "initialized successfully", 1);
dInfo("TDengine is initialized successfully"); dInfo("TDengine is initialized successfully");
......
...@@ -22,7 +22,9 @@ extern "C" { ...@@ -22,7 +22,9 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
int32_t httpGetReqCount(); int64_t httpGetReqCount();
int32_t httpGetStatusCodeCount(int index);
int32_t httpClearStatusCodeCount(int index);
int32_t httpInitSystem(); int32_t httpInitSystem();
int32_t httpStartSystem(); int32_t httpStartSystem();
void httpStopSystem(); void httpStopSystem();
......
...@@ -22,6 +22,12 @@ extern "C" { ...@@ -22,6 +22,12 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
typedef struct {
const char * name;
int32_t code;
int32_t index;
} SMonHttpStatus;
typedef struct { typedef struct {
char * acctId; char * acctId;
int64_t currentPointsPerSecond; int64_t currentPointsPerSecond;
...@@ -54,6 +60,7 @@ void monCleanupSystem(); ...@@ -54,6 +60,7 @@ void monCleanupSystem();
void monSaveAcctLog(SAcctMonitorObj *pMonObj); void monSaveAcctLog(SAcctMonitorObj *pMonObj);
void monSaveLog(int32_t level, const char *const format, ...); void monSaveLog(int32_t level, const char *const format, ...);
void monExecuteSQL(char *sql); void monExecuteSQL(char *sql);
SMonHttpStatus *monGetHttpStatusHashTableEntry(int32_t code);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -75,6 +75,7 @@ int32_t* taosGetErrno(); ...@@ -75,6 +75,7 @@ int32_t* taosGetErrno();
//client //client
#define TSDB_CODE_TSC_INVALID_SQL TAOS_DEF_ERROR_CODE(0, 0x0200) //"Invalid SQL statement") #define TSDB_CODE_TSC_INVALID_SQL TAOS_DEF_ERROR_CODE(0, 0x0200) //"Invalid SQL statement")
#define TSDB_CODE_TSC_INVALID_OPERATION TSDB_CODE_TSC_INVALID_SQL
#define TSDB_CODE_TSC_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0201) //"Invalid qhandle") #define TSDB_CODE_TSC_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0201) //"Invalid qhandle")
#define TSDB_CODE_TSC_INVALID_TIME_STAMP TAOS_DEF_ERROR_CODE(0, 0x0202) //"Invalid combination of client/service time") #define TSDB_CODE_TSC_INVALID_TIME_STAMP TAOS_DEF_ERROR_CODE(0, 0x0202) //"Invalid combination of client/service time")
#define TSDB_CODE_TSC_INVALID_VALUE TAOS_DEF_ERROR_CODE(0, 0x0203) //"Invalid value in client") #define TSDB_CODE_TSC_INVALID_VALUE TAOS_DEF_ERROR_CODE(0, 0x0203) //"Invalid value in client")
...@@ -394,6 +395,8 @@ int32_t* taosGetErrno(); ...@@ -394,6 +395,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_HTTP_OP_VALUE_NULL TAOS_DEF_ERROR_CODE(0, 0x11A5) //"value not find") #define TSDB_CODE_HTTP_OP_VALUE_NULL TAOS_DEF_ERROR_CODE(0, 0x11A5) //"value not find")
#define TSDB_CODE_HTTP_OP_VALUE_TYPE TAOS_DEF_ERROR_CODE(0, 0x11A6) //"value type should be boolean number or string") #define TSDB_CODE_HTTP_OP_VALUE_TYPE TAOS_DEF_ERROR_CODE(0, 0x11A6) //"value type should be boolean number or string")
#define TSDB_CODE_HTTP_REQUEST_JSON_ERROR TAOS_DEF_ERROR_CODE(0, 0x1F00) //"http request json error"
// odbc // odbc
#define TSDB_CODE_ODBC_OOM TAOS_DEF_ERROR_CODE(0, 0x2100) //"out of memory") #define TSDB_CODE_ODBC_OOM TAOS_DEF_ERROR_CODE(0, 0x2100) //"out of memory")
#define TSDB_CODE_ODBC_CONV_CHAR_NOT_NUM TAOS_DEF_ERROR_CODE(0, 0x2101) //"convertion not a valid literal input") #define TSDB_CODE_ODBC_CONV_CHAR_NOT_NUM TAOS_DEF_ERROR_CODE(0, 0x2101) //"convertion not a valid literal input")
......
...@@ -57,9 +57,11 @@ ...@@ -57,9 +57,11 @@
#define NANO_SECOND_ENABLED 0 #define NANO_SECOND_ENABLED 0
#define SET_THREADNAME_ENABLED 0 #define SET_THREADNAME_ENABLED 0
/*dkj
#if SET_THREADNAME_ENABLED == 0 #if SET_THREADNAME_ENABLED == 0
#define setThreadName(name) #define setThreadName(name)
#endif #endif
*/
#define REQ_EXTRA_BUF_LEN 1024 #define REQ_EXTRA_BUF_LEN 1024
#define RESP_BUF_LEN 4096 #define RESP_BUF_LEN 4096
......
...@@ -112,6 +112,26 @@ extern "C" { ...@@ -112,6 +112,26 @@ extern "C" {
#define threadlocal __declspec( thread ) #define threadlocal __declspec( thread )
#endif #endif
// setThreadName
#if defined(_TD_LINUX_64) || defined(_TD_LINUX_32) || defined(_TD_MIPS_64) || defined(_TD_ARM_32) || defined(_TD_ARM_64) || defined(_TD_DARWIN_64)
#if defined(_TD_DARWIN_64)
// MacOS
#if !defined(_GNU_SOURCE)
#define setThreadName(name) do { pthread_setname_np((name)); } while (0)
#else
// pthread_setname_np not defined
#define setThreadName(name)
#endif
#else
// Linux, length of name must <= 16 (the last '\0' included)
#define setThreadName(name) do { prctl(PR_SET_NAME, (name)); } while (0)
#endif
#else
// Windows
#define setThreadName(name)
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -78,6 +78,7 @@ extern "C" { ...@@ -78,6 +78,7 @@ extern "C" {
#include <error.h> #include <error.h>
#include <math.h> #include <math.h>
#include <poll.h> #include <poll.h>
#include <sys/prctl.h>
#define TAOS_OS_FUNC_LZ4 #define TAOS_OS_FUNC_LZ4
#define BUILDIN_CLZL(val) __builtin_clzll(val) #define BUILDIN_CLZL(val) __builtin_clzll(val)
......
...@@ -81,6 +81,7 @@ extern "C" { ...@@ -81,6 +81,7 @@ extern "C" {
#endif #endif
#include <linux/sysctl.h> #include <linux/sysctl.h>
#include <math.h> #include <math.h>
#include <sys/prctl.h>
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -86,6 +86,7 @@ int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* durati ...@@ -86,6 +86,7 @@ int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* durati
int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth); int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth);
void deltaToUtcInitOnce(); void deltaToUtcInitOnce();
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -293,6 +293,63 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) { ...@@ -293,6 +293,63 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) {
return 0; return 0;
} }
int64_t convertTimePrecision(int64_t timeStamp, int32_t fromPrecision, int32_t toPrecision) {
assert(fromPrecision == TSDB_TIME_PRECISION_MILLI ||
fromPrecision == TSDB_TIME_PRECISION_MICRO ||
fromPrecision == TSDB_TIME_PRECISION_NANO);
assert(toPrecision == TSDB_TIME_PRECISION_MILLI ||
toPrecision == TSDB_TIME_PRECISION_MICRO ||
toPrecision == TSDB_TIME_PRECISION_NANO);
double tempResult = (double)timeStamp;
switch(fromPrecision) {
case TSDB_TIME_PRECISION_MILLI: {
switch (toPrecision) {
case TSDB_TIME_PRECISION_MILLI:
return timeStamp;
case TSDB_TIME_PRECISION_MICRO:
tempResult *= 1000;
timeStamp *= 1000;
goto end_;
case TSDB_TIME_PRECISION_NANO:
tempResult *= 1000000;
timeStamp *= 1000000;
goto end_;
}
} // end from milli
case TSDB_TIME_PRECISION_MICRO: {
switch (toPrecision) {
case TSDB_TIME_PRECISION_MILLI:
return timeStamp / 1000;
case TSDB_TIME_PRECISION_MICRO:
return timeStamp;
case TSDB_TIME_PRECISION_NANO:
tempResult *= 1000;
timeStamp *= 1000;
goto end_;
}
} //end from micro
case TSDB_TIME_PRECISION_NANO: {
switch (toPrecision) {
case TSDB_TIME_PRECISION_MILLI:
return timeStamp / 1000000;
case TSDB_TIME_PRECISION_MICRO:
return timeStamp / 1000;
case TSDB_TIME_PRECISION_NANO:
return timeStamp;
}
} //end from nano
default: {
assert(0);
return timeStamp; // only to pass windows compilation
}
} //end switch fromPrecision
end_:
if (tempResult >= (double)INT64_MAX) return INT64_MAX;
if (tempResult <= (double)INT64_MIN) return INT64_MIN + 1; // INT64_MIN means NULL
return timeStamp;
}
int32_t parseLocaltimeWithDst(char* timestr, int64_t* time, int32_t timePrec) { int32_t parseLocaltimeWithDst(char* timestr, int64_t* time, int32_t timePrec) {
*time = 0; *time = 0;
struct tm tm = {0}; struct tm tm = {0};
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20) CMAKE_MINIMUM_REQUIRED(VERSION 3.0...3.20)
PROJECT(TDengine) PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/zlib-1.2.11/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/zlib-1.2.11/inc)
...@@ -6,6 +6,7 @@ INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc) ...@@ -6,6 +6,7 @@ INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
......
...@@ -37,11 +37,12 @@ ...@@ -37,11 +37,12 @@
#define HTTP_BUFFER_SIZE 8388608 #define HTTP_BUFFER_SIZE 8388608
#define HTTP_STEP_SIZE 4096 //http message get process step by step #define HTTP_STEP_SIZE 4096 //http message get process step by step
#define HTTP_METHOD_SCANNER_SIZE 7 //http method fp size #define HTTP_METHOD_SCANNER_SIZE 7 //http method fp size
#define HTTP_GC_TARGET_SIZE 512 #define HTTP_GC_TARGET_SIZE 16384
#define HTTP_WRITE_RETRY_TIMES 500 #define HTTP_WRITE_RETRY_TIMES 500
#define HTTP_WRITE_WAIT_TIME_MS 5 #define HTTP_WRITE_WAIT_TIME_MS 5
#define HTTP_PASSWORD_LEN TSDB_UNI_LEN #define HTTP_PASSWORD_LEN TSDB_UNI_LEN
#define HTTP_SESSION_ID_LEN (TSDB_USER_LEN + HTTP_PASSWORD_LEN) #define HTTP_SESSION_ID_LEN (TSDB_USER_LEN + HTTP_PASSWORD_LEN)
#define HTTP_STATUS_CODE_NUM 63
typedef enum HttpReqType { typedef enum HttpReqType {
HTTP_REQTYPE_OTHERS = 0, HTTP_REQTYPE_OTHERS = 0,
...@@ -140,26 +141,29 @@ typedef enum { ...@@ -140,26 +141,29 @@ typedef enum {
} EHTTP_CONTEXT_FAILED_CAUSE; } EHTTP_CONTEXT_FAILED_CAUSE;
typedef struct HttpContext { typedef struct HttpContext {
int32_t refCount; int32_t refCount;
SOCKET fd; SOCKET fd;
uint32_t accessTimes; uint32_t accessTimes;
uint32_t lastAccessTime; uint32_t lastAccessTime;
int32_t state; int32_t state;
uint8_t reqType; uint8_t reqType;
uint8_t parsed; uint8_t parsed;
char ipstr[22]; bool error;
char user[TSDB_USER_LEN]; // parsed from auth token or login message char ipstr[22];
char pass[HTTP_PASSWORD_LEN]; char user[TSDB_USER_LEN]; // parsed from auth token or login message
TAOS * taos; char pass[HTTP_PASSWORD_LEN];
void * ppContext; char db[/*TSDB_ACCT_ID_LEN + */TSDB_DB_NAME_LEN];
HttpSession *session; TAOS * taos;
z_stream gzipStream; void * ppContext;
HttpParser *parser; pthread_mutex_t ctxMutex;
HttpSqlCmd singleCmd; HttpSession *session;
HttpSqlCmds *multiCmds; z_stream gzipStream;
JsonBuf * jsonBuf; HttpParser *parser;
HttpEncodeMethod *encodeMethod; HttpSqlCmd singleCmd;
HttpDecodeMethod *decodeMethod; HttpSqlCmds *multiCmds;
JsonBuf *jsonBuf;
HttpEncodeMethod *encodeMethod;
HttpDecodeMethod *decodeMethod;
struct HttpThread *pThread; struct HttpThread *pThread;
} HttpContext; } HttpContext;
...@@ -184,8 +188,9 @@ typedef struct HttpServer { ...@@ -184,8 +188,9 @@ typedef struct HttpServer {
SOCKET fd; SOCKET fd;
int32_t numOfThreads; int32_t numOfThreads;
int32_t methodScannerLen; int32_t methodScannerLen;
int32_t requestNum; int64_t requestNum;
int32_t status; int32_t status;
int32_t statusCodeErrs[HTTP_STATUS_CODE_NUM];
pthread_t thread; pthread_t thread;
HttpThread * pThreads; HttpThread * pThreads;
void * contextCache; void * contextCache;
......
...@@ -64,8 +64,8 @@ void httpJsonOriginString(JsonBuf* buf, char* sVal, int32_t len); ...@@ -64,8 +64,8 @@ void httpJsonOriginString(JsonBuf* buf, char* sVal, int32_t len);
void httpJsonStringForTransMean(JsonBuf* buf, char* SVal, int32_t maxLen); void httpJsonStringForTransMean(JsonBuf* buf, char* SVal, int32_t maxLen);
void httpJsonInt64(JsonBuf* buf, int64_t num); void httpJsonInt64(JsonBuf* buf, int64_t num);
void httpJsonUInt64(JsonBuf* buf, uint64_t num); void httpJsonUInt64(JsonBuf* buf, uint64_t num);
void httpJsonTimestamp(JsonBuf* buf, int64_t t, bool us); void httpJsonTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision);
void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, bool us); void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision);
void httpJsonInt(JsonBuf* buf, int32_t num); void httpJsonInt(JsonBuf* buf, int32_t num);
void httpJsonUInt(JsonBuf* buf, uint32_t num); void httpJsonUInt(JsonBuf* buf, uint32_t num);
void httpJsonFloat(JsonBuf* buf, float num); void httpJsonFloat(JsonBuf* buf, float num);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTPMETRICSHANDLE_H
#define TDENGINE_HTTPMETRICSHANDLE_H
#include "http.h"
#include "httpInt.h"
#include "httpUtil.h"
#include "httpResp.h"
void metricsInitHandle(HttpServer* httpServer);
bool metricsProcessRequest(struct HttpContext* httpContext);
#endif // TDENGINE_HTTPMETRICHANDLE_H
...@@ -22,12 +22,12 @@ ...@@ -22,12 +22,12 @@
#include "httpResp.h" #include "httpResp.h"
#include "httpSql.h" #include "httpSql.h"
#define REST_ROOT_URL_POS 0 #define REST_ROOT_URL_POS 0
#define REST_ACTION_URL_POS 1 #define REST_ACTION_URL_POS 1
#define REST_USER_URL_POS 2 #define REST_USER_USEDB_URL_POS 2
#define REST_PASS_URL_POS 3 #define REST_PASS_URL_POS 3
void restInitHandle(HttpServer* pServer); void restInitHandle(HttpServer* pServer);
bool restProcessRequest(struct HttpContext* pContext); bool restProcessRequest(struct HttpContext* pContext);
#endif #endif
\ No newline at end of file
...@@ -35,4 +35,7 @@ void httpTrimTableName(char *name); ...@@ -35,4 +35,7 @@ void httpTrimTableName(char *name);
int32_t httpShrinkTableName(HttpContext *pContext, int32_t pos, char *name); int32_t httpShrinkTableName(HttpContext *pContext, int32_t pos, char *name);
char * httpGetCmdsString(HttpContext *pContext, int32_t pos); char * httpGetCmdsString(HttpContext *pContext, int32_t pos);
int32_t httpCheckAllocEscapeSql(char *oldSql, char **newSql);
void httpCheckFreeEscapedSql(char *oldSql, char *newSql);
#endif #endif
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#define TDENGINE_HTTP_UTIL_H #define TDENGINE_HTTP_UTIL_H
bool httpCheckUsedbSql(char *sql); bool httpCheckUsedbSql(char *sql);
bool httpCheckAlterSql(char *sql);
void httpTimeToString(int32_t t, char *buf, int32_t buflen); void httpTimeToString(int32_t t, char *buf, int32_t buflen);
bool httpUrlMatch(HttpContext *pContext, int32_t pos, char *cmp); bool httpUrlMatch(HttpContext *pContext, int32_t pos, char *cmp);
......
...@@ -67,6 +67,8 @@ static void httpDestroyContext(void *data) { ...@@ -67,6 +67,8 @@ static void httpDestroyContext(void *data) {
pContext->parser = NULL; pContext->parser = NULL;
} }
pthread_mutex_destroy(&pContext->ctxMutex);
tfree(pContext); tfree(pContext);
} }
...@@ -118,16 +120,19 @@ HttpContext *httpCreateContext(SOCKET fd) { ...@@ -118,16 +120,19 @@ HttpContext *httpCreateContext(SOCKET fd) {
pContext->lastAccessTime = taosGetTimestampSec(); pContext->lastAccessTime = taosGetTimestampSec();
pContext->state = HTTP_CONTEXT_STATE_READY; pContext->state = HTTP_CONTEXT_STATE_READY;
pContext->parser = httpCreateParser(pContext); pContext->parser = httpCreateParser(pContext);
pContext->error = false;
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE)pContext; TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE)pContext;
HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &pContext, HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &pContext,
sizeof(TSDB_CACHE_PTR_TYPE), 3000); sizeof(TSDB_CACHE_PTR_TYPE), tsHttpKeepAlive);
pContext->ppContext = ppContext; pContext->ppContext = ppContext;
httpDebug("context:%p, fd:%d, is created, data:%p", pContext, fd, ppContext); httpDebug("context:%p, fd:%d, is created, data:%p", pContext, fd, ppContext);
// set the ref to 0 // set the ref to 0
taosCacheRelease(tsHttpServer.contextCache, (void **)&ppContext, false); taosCacheRelease(tsHttpServer.contextCache, (void **)&ppContext, false);
pthread_mutex_init(&pContext->ctxMutex, NULL);
return pContext; return pContext;
} }
...@@ -188,11 +193,12 @@ void httpCloseContextByApp(HttpContext *pContext) { ...@@ -188,11 +193,12 @@ void httpCloseContextByApp(HttpContext *pContext) {
pContext->parsed = false; pContext->parsed = false;
bool keepAlive = true; bool keepAlive = true;
if (parser && parser->httpVersion == HTTP_VERSION_10 && parser->keepAlive != HTTP_KEEPALIVE_ENABLE) { if (pContext->error == true) {
keepAlive = false;
} else if (parser && parser->httpVersion == HTTP_VERSION_10 && parser->keepAlive != HTTP_KEEPALIVE_ENABLE) {
keepAlive = false; keepAlive = false;
} else if (parser && parser->httpVersion != HTTP_VERSION_10 && parser->keepAlive == HTTP_KEEPALIVE_DISABLE) { } else if (parser && parser->httpVersion != HTTP_VERSION_10 && parser->keepAlive == HTTP_KEEPALIVE_DISABLE) {
keepAlive = false; keepAlive = false;
} else {
} }
if (keepAlive) { if (keepAlive) {
......
...@@ -176,6 +176,16 @@ bool gcProcessQueryRequest(HttpContext* pContext) { ...@@ -176,6 +176,16 @@ bool gcProcessQueryRequest(HttpContext* pContext) {
return false; return false;
} }
#define ESCAPE_ERROR_PROC(code, context, root) \
do { \
if (code != TSDB_CODE_SUCCESS) { \
httpSendErrorResp(context, code); \
\
cJSON_Delete(root); \
return false; \
} \
} while (0)
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
cJSON* query = cJSON_GetArrayItem(root, i); cJSON* query = cJSON_GetArrayItem(root, i);
if (query == NULL) continue; if (query == NULL) continue;
...@@ -186,7 +196,14 @@ bool gcProcessQueryRequest(HttpContext* pContext) { ...@@ -186,7 +196,14 @@ bool gcProcessQueryRequest(HttpContext* pContext) {
continue; continue;
} }
int32_t refIdBuffer = httpAddToSqlCmdBuffer(pContext, refId->valuestring); char *newStr = NULL;
int32_t retCode = 0;
retCode = httpCheckAllocEscapeSql(refId->valuestring, &newStr);
ESCAPE_ERROR_PROC(retCode, pContext, root);
int32_t refIdBuffer = httpAddToSqlCmdBuffer(pContext, newStr);
httpCheckFreeEscapedSql(refId->valuestring, newStr);
if (refIdBuffer == -1) { if (refIdBuffer == -1) {
httpWarn("context:%p, fd:%d, user:%s, refId buffer is full", pContext, pContext->fd, pContext->user); httpWarn("context:%p, fd:%d, user:%s, refId buffer is full", pContext, pContext->fd, pContext->user);
break; break;
...@@ -195,7 +212,11 @@ bool gcProcessQueryRequest(HttpContext* pContext) { ...@@ -195,7 +212,11 @@ bool gcProcessQueryRequest(HttpContext* pContext) {
cJSON* alias = cJSON_GetObjectItem(query, "alias"); cJSON* alias = cJSON_GetObjectItem(query, "alias");
int32_t aliasBuffer = -1; int32_t aliasBuffer = -1;
if (!(alias == NULL || alias->valuestring == NULL || strlen(alias->valuestring) == 0)) { if (!(alias == NULL || alias->valuestring == NULL || strlen(alias->valuestring) == 0)) {
aliasBuffer = httpAddToSqlCmdBuffer(pContext, alias->valuestring); retCode = httpCheckAllocEscapeSql(alias->valuestring, &newStr);
ESCAPE_ERROR_PROC(retCode, pContext, root);
aliasBuffer = httpAddToSqlCmdBuffer(pContext, newStr);
httpCheckFreeEscapedSql(alias->valuestring, newStr);
if (aliasBuffer == -1) { if (aliasBuffer == -1) {
httpWarn("context:%p, fd:%d, user:%s, alias buffer is full", pContext, pContext->fd, pContext->user); httpWarn("context:%p, fd:%d, user:%s, alias buffer is full", pContext, pContext->fd, pContext->user);
break; break;
...@@ -211,7 +232,11 @@ bool gcProcessQueryRequest(HttpContext* pContext) { ...@@ -211,7 +232,11 @@ bool gcProcessQueryRequest(HttpContext* pContext) {
continue; continue;
} }
int32_t sqlBuffer = httpAddToSqlCmdBuffer(pContext, sql->valuestring); retCode = httpCheckAllocEscapeSql(sql->valuestring, &newStr);
ESCAPE_ERROR_PROC(retCode, pContext, root);
int32_t sqlBuffer = httpAddToSqlCmdBuffer(pContext, newStr);
httpCheckFreeEscapedSql(sql->valuestring, newStr);
if (sqlBuffer == -1) { if (sqlBuffer == -1) {
httpWarn("context:%p, fd:%d, user:%s, sql buffer is full", pContext, pContext->fd, pContext->user); httpWarn("context:%p, fd:%d, user:%s, sql buffer is full", pContext, pContext->fd, pContext->user);
break; break;
...@@ -237,6 +262,8 @@ bool gcProcessQueryRequest(HttpContext* pContext) { ...@@ -237,6 +262,8 @@ bool gcProcessQueryRequest(HttpContext* pContext) {
} }
} }
#undef ESCAPE_ERROR_PROC
pContext->reqType = HTTP_REQTYPE_MULTI_SQL; pContext->reqType = HTTP_REQTYPE_MULTI_SQL;
pContext->encodeMethod = &gcQueryMethod; pContext->encodeMethod = &gcQueryMethod;
pContext->multiCmds->pos = 0; pContext->multiCmds->pos = 0;
......
...@@ -130,14 +130,34 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, ...@@ -130,14 +130,34 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
// for group by // for group by
if (groupFields != -1) { if (groupFields != -1) {
char target[HTTP_GC_TARGET_SIZE] = {0}; char target[HTTP_GC_TARGET_SIZE] = {0};
int32_t len; int32_t len = 0, cur = 0;
len = snprintf(target, HTTP_GC_TARGET_SIZE, "%s{", aliasBuffer); cur = snprintf(target, HTTP_GC_TARGET_SIZE, "%s{", aliasBuffer);
if (cur < 0 || cur >= HTTP_GC_TARGET_SIZE) {
httpError("context:%p, fd:%d, too long alias: %s", pContext, pContext->fd, aliasBuffer);
return false;
}
len += cur;
for (int32_t i = dataFields + 1; i < num_fields; i++) { for (int32_t i = dataFields + 1; i < num_fields; i++) {
// -2 means the last '}' and '\0'
#define HTTP_GC_CHECK_SIZE(name) if (cur < 0 || cur >= HTTP_GC_TARGET_SIZE - len - 2) { \
if (cur < 0) { \
httpError("context:%p, fd:%d, failed to snprintf for: %s", pContext, pContext->fd, name); \
} else { \
httpError("context:%p, fd:%d, snprintf overflow for: %s", pContext, pContext->fd, name); \
target[len] = '\0'; \
} \
break; \
} else { \
len += cur; \
}
if (row[i] == NULL) { if (row[i] == NULL) {
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:nil", fields[i].name); cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 2, "%s:nil", fields[i].name);
HTTP_GC_CHECK_SIZE(fields[i].name)
if (i < num_fields - 1) { if (i < num_fields - 1) {
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, ", "); cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 2, ", ");
HTTP_GC_CHECK_SIZE(fields[i].name)
} }
continue; continue;
...@@ -146,40 +166,49 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, ...@@ -146,40 +166,49 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
switch (fields[i].type) { switch (fields[i].type) {
case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_TINYINT:
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%d", fields[i].name, *((int8_t *)row[i])); cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 2, "%s:%d", fields[i].name, *((int8_t *)row[i]));
HTTP_GC_CHECK_SIZE(fields[i].name)
break; break;
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_SMALLINT:
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%d", fields[i].name, *((int16_t *)row[i])); cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 2, "%s:%d", fields[i].name, *((int16_t *)row[i]));
HTTP_GC_CHECK_SIZE(fields[i].name)
break; break;
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%d,", fields[i].name, *((int32_t *)row[i])); cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 2, "%s:%d,", fields[i].name, *((int32_t *)row[i]));
HTTP_GC_CHECK_SIZE(fields[i].name)
break; break;
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%" PRId64, fields[i].name, *((int64_t *)row[i])); cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 2, "%s:%" PRId64, fields[i].name, *((int64_t *)row[i]));
HTTP_GC_CHECK_SIZE(fields[i].name)
break; break;
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%.5f", fields[i].name, GET_FLOAT_VAL(row[i])); cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 2, "%s:%.5f", fields[i].name, GET_FLOAT_VAL(row[i]));
HTTP_GC_CHECK_SIZE(fields[i].name)
break; break;
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%.9f", fields[i].name, GET_DOUBLE_VAL(row[i])); cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 2, "%s:%.9f", fields[i].name, GET_DOUBLE_VAL(row[i]));
HTTP_GC_CHECK_SIZE(fields[i].name)
break; break;
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
if (row[i] != NULL) { if (row[i] != NULL) {
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:", fields[i].name); cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 2, "%s:", fields[i].name);
memcpy(target + len, (char *)row[i], length[i]); HTTP_GC_CHECK_SIZE(fields[i].name)
memcpy(target + len, (char *)row[i], MIN(length[i], HTTP_GC_TARGET_SIZE - len - 3));
len = (int32_t)strlen(target); len = (int32_t)strlen(target);
} }
break; break;
default: default:
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%s", fields[i].name, "-"); cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 2, "%s:%s", fields[i].name, "-");
HTTP_GC_CHECK_SIZE(fields[i].name)
break; break;
} }
if (i < num_fields - 1) { if (i < num_fields - 1) {
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, ", "); cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 2, ", ");
HTTP_GC_CHECK_SIZE(fields[i].name)
} }
} }
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "}"); cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 1, "}");
if (strcmp(target, targetBuffer) != 0) { if (strcmp(target, targetBuffer) != 0) {
// first target not write this section // first target not write this section
...@@ -199,7 +228,7 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, ...@@ -199,7 +228,7 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
for (int32_t i = dataFields; i >= 0; i--) { for (int32_t i = dataFields; i >= 0; i--) {
httpJsonItemToken(jsonBuf); httpJsonItemToken(jsonBuf);
if (row[i] == NULL) { if (row == NULL || i >= num_fields || row[i] == NULL) {
httpJsonOriginString(jsonBuf, "null", 4); httpJsonOriginString(jsonBuf, "null", 4);
continue; continue;
} }
...@@ -228,13 +257,11 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, ...@@ -228,13 +257,11 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
httpJsonStringForTransMean(jsonBuf, (char *)row[i], fields[i].bytes); httpJsonStringForTransMean(jsonBuf, (char *)row[i], fields[i].bytes);
break; break;
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP: {
if (precision == TSDB_TIME_PRECISION_MILLI) { // ms int64_t ts = convertTimePrecision(*((int64_t *)row[i]), precision, TSDB_TIME_PRECISION_MILLI);
httpJsonInt64(jsonBuf, *((int64_t *)row[i])); httpJsonInt64(jsonBuf, ts);
} else {
httpJsonInt64(jsonBuf, *((int64_t *)row[i]) / 1000);
}
break; break;
}
default: default:
httpJsonString(jsonBuf, "-", 1); httpJsonString(jsonBuf, "-", 1);
break; break;
......
...@@ -132,10 +132,10 @@ int32_t ehttp_gzip_write(ehttp_gzip_t *gzip, const char *buf, int32_t len) { ...@@ -132,10 +132,10 @@ int32_t ehttp_gzip_write(ehttp_gzip_t *gzip, const char *buf, int32_t len) {
if (ret != Z_STREAM_END) continue; if (ret != Z_STREAM_END) continue;
} }
int32_t len = (int32_t)(gzip->gzip->next_out - (z_const Bytef *)gzip->chunk); int32_t _len = (int32_t)(gzip->gzip->next_out - (z_const Bytef *)gzip->chunk);
gzip->gzip->next_out[0] = '\0'; gzip->gzip->next_out[0] = '\0';
gzip->callbacks.on_data(gzip, gzip->arg, gzip->chunk, len); gzip->callbacks.on_data(gzip, gzip->arg, gzip->chunk, _len);
gzip->gzip->next_out = (z_const Bytef *)gzip->chunk; gzip->gzip->next_out = (z_const Bytef *)gzip->chunk;
gzip->gzip->avail_out = gzip->conf.chunk_size; gzip->gzip->avail_out = gzip->conf.chunk_size;
} }
......
...@@ -35,6 +35,7 @@ bool httpProcessData(HttpContext* pContext) { ...@@ -35,6 +35,7 @@ bool httpProcessData(HttpContext* pContext) {
if (!httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_HANDLING)) { if (!httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_HANDLING)) {
httpTrace("context:%p, fd:%d, state:%s not in ready state, stop process request", pContext, pContext->fd, httpTrace("context:%p, fd:%d, state:%s not in ready state, stop process request", pContext, pContext->fd,
httpContextStateStr(pContext->state)); httpContextStateStr(pContext->state));
pContext->error = true;
httpCloseContextByApp(pContext); httpCloseContextByApp(pContext);
return false; return false;
} }
...@@ -44,15 +45,14 @@ bool httpProcessData(HttpContext* pContext) { ...@@ -44,15 +45,14 @@ bool httpProcessData(HttpContext* pContext) {
httpTrace("context:%p, fd:%d, process options request", pContext, pContext->fd); httpTrace("context:%p, fd:%d, process options request", pContext, pContext->fd);
httpSendOptionResp(pContext, "process options request success"); httpSendOptionResp(pContext, "process options request success");
} else { } else {
if (!httpDecodeRequest(pContext)) { pthread_mutex_lock(&pContext->ctxMutex);
/*
* httpCloseContextByApp has been called when parsing the error if (httpDecodeRequest(pContext)) {
*/
// httpCloseContextByApp(pContext);
} else {
httpClearParser(pContext->parser); httpClearParser(pContext->parser);
httpProcessRequest(pContext); httpProcessRequest(pContext);
} }
pthread_mutex_unlock(&pContext->ctxMutex);
} }
return true; return true;
......
...@@ -262,42 +262,112 @@ void httpJsonUInt64(JsonBuf* buf, uint64_t num) { ...@@ -262,42 +262,112 @@ void httpJsonUInt64(JsonBuf* buf, uint64_t num) {
buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%" PRIu64, num); buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%" PRIu64, num);
} }
void httpJsonTimestamp(JsonBuf* buf, int64_t t, bool us) { void httpJsonTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision) {
char ts[35] = {0}; char ts[35] = {0};
struct tm* ptm;
int32_t precision = 1000; int32_t fractionLen;
if (us) { char* format = NULL;
precision = 1000000; time_t quot = 0;
} int64_t mod = 0;
switch (timePrecision) {
case TSDB_TIME_PRECISION_MILLI: {
mod = ((t) % 1000 + 1000) % 1000;
if (t < 0 && mod != 0) {
t -= 1000;
}
quot = t / 1000;
fractionLen = 5;
format = ".%03" PRId64;
break;
}
time_t tt = t / precision; case TSDB_TIME_PRECISION_MICRO: {
ptm = localtime(&tt); mod = ((t) % 1000000 + 1000000) % 1000000;
int32_t length = (int32_t)strftime(ts, 35, "%Y-%m-%d %H:%M:%S", ptm); if (t < 0 && mod != 0) {
if (us) { t -= 1000000;
length += snprintf(ts + length, 8, ".%06" PRId64, t % precision); }
} else { quot = t / 1000000;
length += snprintf(ts + length, 5, ".%03" PRId64, t % precision); fractionLen = 8;
format = ".%06" PRId64;
break;
}
case TSDB_TIME_PRECISION_NANO: {
mod = ((t) % 1000000000 + 1000000000) % 1000000000;
if (t < 0 && mod != 0) {
t -= 1000000000;
}
quot = t / 1000000000;
fractionLen = 11;
format = ".%09" PRId64;
break;
}
default:
fractionLen = 0;
assert(false);
} }
struct tm ptm = {0};
localtime_r(&quot, &ptm);
int32_t length = (int32_t)strftime(ts, 35, "%Y-%m-%d %H:%M:%S", &ptm);
length += snprintf(ts + length, fractionLen, format, mod);
httpJsonString(buf, ts, length); httpJsonString(buf, ts, length);
} }
void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, bool us) { void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision) {
char ts[40] = {0}; char ts[40] = {0};
struct tm* ptm; struct tm* ptm;
int32_t precision = 1000;
if (us) { int32_t fractionLen;
precision = 1000000; char* format = NULL;
time_t quot = 0;
long mod = 0;
switch (timePrecision) {
case TSDB_TIME_PRECISION_MILLI: {
mod = ((t) % 1000 + 1000) % 1000;
if (t < 0 && mod != 0) {
t -= 1000;
}
quot = t / 1000;
fractionLen = 5;
format = ".%03" PRId64;
break;
}
case TSDB_TIME_PRECISION_MICRO: {
mod = ((t) % 1000000 + 1000000) % 1000000;
if (t < 0 && mod != 0) {
t -= 1000000;
}
quot = t / 1000000;
fractionLen = 8;
format = ".%06" PRId64;
break;
}
case TSDB_TIME_PRECISION_NANO: {
mod = ((t) % 1000000000 + 1000000000) % 1000000000;
if (t < 0 && mod != 0) {
t -= 1000000000;
}
quot = t / 1000000000;
fractionLen = 11;
format = ".%09" PRId64;
break;
}
default:
fractionLen = 0;
assert(false);
} }
time_t tt = t / precision; ptm = localtime(&quot);
ptm = localtime(&tt);
int32_t length = (int32_t)strftime(ts, 40, "%Y-%m-%dT%H:%M:%S", ptm); int32_t length = (int32_t)strftime(ts, 40, "%Y-%m-%dT%H:%M:%S", ptm);
if (us) { length += snprintf(ts + length, fractionLen, format, mod);
length += snprintf(ts + length, 8, ".%06" PRId64, t % precision);
} else {
length += snprintf(ts + length, 5, ".%03" PRId64, t % precision);
}
length += (int32_t)strftime(ts + length, 40 - length, "%z", ptm); length += (int32_t)strftime(ts + length, 40 - length, "%z", ptm);
httpJsonString(buf, ts, length); httpJsonString(buf, ts, length);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tfs.h"
#include "httpMetricsHandle.h"
#include "dnode.h"
#include "httpLog.h"
static HttpDecodeMethod metricsDecodeMethod = {"metrics", metricsProcessRequest};
void metricsInitHandle(HttpServer* pServer) {
httpAddMethod(pServer, &metricsDecodeMethod);
}
bool metricsProcessRequest(HttpContext* pContext) {
httpDebug("context:%p, fd:%d, user:%s, process admin grant msg", pContext, pContext->fd, pContext->user);
JsonBuf* jsonBuf = httpMallocJsonBuf(pContext);
if (jsonBuf == NULL) {
httpError("failed to allocate memory for metrics");
httpSendErrorResp(pContext, TSDB_CODE_HTTP_NO_ENOUGH_MEMORY);
return false;
}
httpInitJsonBuf(jsonBuf, pContext);
httpWriteJsonBufHead(jsonBuf);
httpJsonToken(jsonBuf, JsonObjStt);
{
char* keyDisks = "tags";
httpJsonPairHead(jsonBuf, keyDisks, (int32_t)strlen(keyDisks));
httpJsonToken(jsonBuf, JsonArrStt);
{
httpJsonItemToken(jsonBuf);
httpJsonToken(jsonBuf, JsonObjStt);
char* keyTagName = "name";
char* keyTagValue = "value";
httpJsonPairOriginString(jsonBuf, keyTagName, (int32_t)strlen(keyTagName), "\"dnode_id\"",
(int32_t)strlen("\"dnode_id\""));
int32_t dnodeId = dnodeGetDnodeId();
httpJsonPairIntVal(jsonBuf, keyTagValue, (int32_t)strlen(keyTagValue), dnodeId);
httpJsonToken(jsonBuf, JsonObjEnd);
}
httpJsonToken(jsonBuf, JsonArrEnd);
}
{
if (tsDnodeStartTime != 0) {
int64_t now = taosGetTimestampMs();
int64_t upTime = now-tsDnodeStartTime;
char* keyUpTime = "up_time";
httpJsonPairInt64Val(jsonBuf, keyUpTime, (int32_t)strlen(keyUpTime), upTime);
}
}
/*
{
int32_t cpuCores = taosGetCpuCores();
char* keyCpuCores = "cpu_cores";
httpJsonPairIntVal(jsonBuf, keyCpuCores, (int32_t)strlen(keyCpuCores), cpuCores);
float sysCpuUsage = 0;
float procCpuUsage = 0;
bool succeeded = taosGetCpuUsage(&sysCpuUsage, &procCpuUsage);
if (!succeeded) {
httpError("failed to get cpu usage");
} else {
if (sysCpuUsage <= procCpuUsage) {
sysCpuUsage = procCpuUsage + 0.1f;
}
char* keyCpuSystem = "cpu_system";
char* keyCpuEngine = "cpu_engine";
httpJsonPairFloatVal(jsonBuf, keyCpuSystem, (int32_t)strlen(keyCpuSystem), sysCpuUsage);
httpJsonPairFloatVal(jsonBuf, keyCpuEngine, (int32_t)strlen(keyCpuEngine), procCpuUsage);
}
}
{
float sysMemoryUsedMB = 0;
bool succeeded = taosGetSysMemory(&sysMemoryUsedMB);
if (!succeeded) {
httpError("failed to get sys memory info");
} else {
char* keyMemSystem = "mem_system";
httpJsonPairFloatVal(jsonBuf, keyMemSystem, (int32_t)strlen(keyMemSystem), sysMemoryUsedMB);
}
float procMemoryUsedMB = 0;
succeeded = taosGetProcMemory(&procMemoryUsedMB);
if (!succeeded) {
httpError("failed to get proc memory info");
} else {
char* keyMemEngine = "mem_engine";
httpJsonPairFloatVal(jsonBuf, keyMemEngine, (int32_t)strlen(keyMemEngine), procMemoryUsedMB);
}
}
{
int64_t bytes = 0, rbytes = 0, tbytes = 0;
bool succeeded = taosGetCardInfo(&bytes, &rbytes, &tbytes);
if (!succeeded) {
httpError("failed to get network info");
} else {
char* keyNetIn = "net_in";
char* keyNetOut = "net_out";
httpJsonPairInt64Val(jsonBuf, keyNetIn, (int32_t)strlen(keyNetIn), rbytes);
httpJsonPairInt64Val(jsonBuf, keyNetOut, (int32_t)strlen(keyNetOut), tbytes);
}
}
{
int64_t rchars = 0, rbytes = 0;
int64_t wchars = 0, wbytes = 0;
bool succeeded = taosReadProcIO(&rchars, &wchars, &rbytes, &wbytes);
if (!succeeded) {
httpError("failed to get io info");
} else {
char* keyIORead = "io_read";
char* keyIOWrite = "io_write";
httpJsonPairInt64Val(jsonBuf, keyIORead, (int32_t)strlen(keyIORead), rchars);
httpJsonPairInt64Val(jsonBuf, keyIOWrite, (int32_t)strlen(keyIOWrite), wchars);
}
}
{
const int8_t numTiers = 3;
SFSMeta fsMeta;
STierMeta* tierMetas = calloc(numTiers, sizeof(STierMeta));
tfsUpdateInfo(&fsMeta, tierMetas, numTiers);
{
char* keyDiskUsed = "disk_used";
char* keyDiskTotal = "disk_total";
httpJsonPairInt64Val(jsonBuf, keyDiskTotal, (int32_t)strlen(keyDiskTotal), fsMeta.tsize);
httpJsonPairInt64Val(jsonBuf, keyDiskUsed, (int32_t)strlen(keyDiskUsed), fsMeta.used);
char* keyDisks = "disks";
httpJsonPairHead(jsonBuf, keyDisks, (int32_t)strlen(keyDisks));
httpJsonToken(jsonBuf, JsonArrStt);
for (int i = 0; i < numTiers; ++i) {
httpJsonItemToken(jsonBuf);
httpJsonToken(jsonBuf, JsonObjStt);
char* keyDataDirLevelUsed = "datadir_used";
char* keyDataDirLevelTotal = "datadir_total";
httpJsonPairInt64Val(jsonBuf, keyDataDirLevelUsed, (int32_t)strlen(keyDataDirLevelUsed), tierMetas[i].used);
httpJsonPairInt64Val(jsonBuf, keyDataDirLevelTotal, (int32_t)strlen(keyDataDirLevelTotal), tierMetas[i].size);
httpJsonToken(jsonBuf, JsonObjEnd);
}
httpJsonToken(jsonBuf, JsonArrEnd);
}
free(tierMetas);
}
{
SDnodeStatisInfo info = dnodeGetStatisInfo();
{
char* keyReqHttp = "req_http";
char* keyReqSelect = "req_select";
char* keyReqInsert = "req_insert";
httpJsonPairInt64Val(jsonBuf, keyReqHttp, (int32_t)strlen(keyReqHttp), info.httpReqNum);
httpJsonPairInt64Val(jsonBuf, keyReqSelect, (int32_t)strlen(keyReqSelect), info.queryReqNum);
httpJsonPairInt64Val(jsonBuf, keyReqInsert, (int32_t)strlen(keyReqInsert), info.submitReqNum);
}
}
*/
httpJsonToken(jsonBuf, JsonObjEnd);
httpWriteJsonBufEnd(jsonBuf);
pContext->reqType = HTTP_REQTYPE_OTHERS;
httpFreeJsonBuf(pContext);
return false;
}
...@@ -186,7 +186,7 @@ static int32_t httpOnRequestLine(HttpParser *pParser, char *method, char *target ...@@ -186,7 +186,7 @@ static int32_t httpOnRequestLine(HttpParser *pParser, char *method, char *target
if (pContext->decodeMethod != NULL) { if (pContext->decodeMethod != NULL) {
httpTrace("context:%p, fd:%d, decode method is %s", pContext, pContext->fd, pContext->decodeMethod->module); httpTrace("context:%p, fd:%d, decode method is %s", pContext, pContext->fd, pContext->decodeMethod->module);
} else { } else {
httpError("context:%p, fd:%d, the url is not support, target:%s", pContext, pContext->fd, target); httpError("context:%p, fd:%d, the url is not supported, target:%s", pContext, pContext->fd, target);
httpOnError(pParser, 0, TSDB_CODE_HTTP_UNSUPPORT_URL); httpOnError(pParser, 0, TSDB_CODE_HTTP_UNSUPPORT_URL);
return -1; return -1;
} }
...@@ -663,7 +663,7 @@ static int32_t httpParserOnTarget(HttpParser *parser, HTTP_PARSER_STATE state, c ...@@ -663,7 +663,7 @@ static int32_t httpParserOnTarget(HttpParser *parser, HTTP_PARSER_STATE state, c
HttpContext *pContext = parser->pContext; HttpContext *pContext = parser->pContext;
int32_t ok = 0; int32_t ok = 0;
do { do {
if (!isspace(c) && c != '\r' && c != '\n') { if (!isspace(c)) {
if (httpAppendString(&parser->str, &c, 1)) { if (httpAppendString(&parser->str, &c, 1)) {
httpError("context:%p, fd:%d, parser state:%d, char:[%c]%02x, oom", pContext, pContext->fd, state, c, c); httpError("context:%p, fd:%d, parser state:%d, char:[%c]%02x, oom", pContext, pContext->fd, state, c, c);
ok = -1; ok = -1;
...@@ -763,9 +763,9 @@ static int32_t httpParserOnSp(HttpParser *parser, HTTP_PARSER_STATE state, const ...@@ -763,9 +763,9 @@ static int32_t httpParserOnSp(HttpParser *parser, HTTP_PARSER_STATE state, const
httpPopStack(parser); httpPopStack(parser);
break; break;
} }
httpError("context:%p, fd:%d, parser state:%d, char:[%c]%02x, oom", pContext, pContext->fd, state, c, c); httpError("context:%p, fd:%d, parser state:%d, char:[%c]%02x", pContext, pContext->fd, state, c, c);
ok = -1; ok = -1;
httpOnError(parser, HTTP_CODE_INSUFFICIENT_STORAGE, TSDB_CODE_HTTP_PARSE_SP_FAILED); httpOnError(parser, HTTP_CODE_BAD_REQUEST, TSDB_CODE_HTTP_PARSE_SP_FAILED);
} while (0); } while (0);
return ok; return ok;
} }
...@@ -837,7 +837,7 @@ static int32_t httpParserPostProcess(HttpParser *parser) { ...@@ -837,7 +837,7 @@ static int32_t httpParserPostProcess(HttpParser *parser) {
if (parser->gzip) { if (parser->gzip) {
if (ehttp_gzip_finish(parser->gzip)) { if (ehttp_gzip_finish(parser->gzip)) {
httpError("context:%p, fd:%d, gzip failed", pContext, pContext->fd); httpError("context:%p, fd:%d, gzip failed", pContext, pContext->fd);
httpOnError(parser, HTTP_CODE_INSUFFICIENT_STORAGE, TSDB_CODE_HTTP_FINISH_GZIP_FAILED); httpOnError(parser, HTTP_CODE_INTERNAL_SERVER_ERROR, TSDB_CODE_HTTP_FINISH_GZIP_FAILED);
return -1; return -1;
} }
} }
...@@ -1040,7 +1040,7 @@ static int32_t httpParserOnChunk(HttpParser *parser, HTTP_PARSER_STATE state, co ...@@ -1040,7 +1040,7 @@ static int32_t httpParserOnChunk(HttpParser *parser, HTTP_PARSER_STATE state, co
if (ehttp_gzip_write(parser->gzip, parser->str.str, parser->str.pos)) { if (ehttp_gzip_write(parser->gzip, parser->str.str, parser->str.pos)) {
httpError("context:%p, fd:%d, gzip failed", pContext, pContext->fd); httpError("context:%p, fd:%d, gzip failed", pContext, pContext->fd);
ok = -1; ok = -1;
httpOnError(parser, HTTP_CODE_INSUFFICIENT_STORAGE, TSDB_CODE_HTTP_PARSE_CHUNK_FAILED); httpOnError(parser, HTTP_CODE_INTERNAL_SERVER_ERROR, TSDB_CODE_HTTP_PARSE_CHUNK_FAILED);
break; break;
} }
} else { } else {
...@@ -1062,7 +1062,7 @@ static int32_t httpParserOnEnd(HttpParser *parser, HTTP_PARSER_STATE state, cons ...@@ -1062,7 +1062,7 @@ static int32_t httpParserOnEnd(HttpParser *parser, HTTP_PARSER_STATE state, cons
do { do {
ok = -1; ok = -1;
httpError("context:%p, fd:%d, parser state:%d, unexpected char:[%c]%02x", pContext, pContext->fd, state, c, c); httpError("context:%p, fd:%d, parser state:%d, unexpected char:[%c]%02x", pContext, pContext->fd, state, c, c);
httpOnError(parser, HTTP_CODE_INSUFFICIENT_STORAGE, TSDB_CODE_HTTP_PARSE_END_FAILED); httpOnError(parser, HTTP_CODE_BAD_REQUEST, TSDB_CODE_HTTP_PARSE_END_FAILED);
} while (0); } while (0);
return ok; return ok;
} }
......
...@@ -70,6 +70,8 @@ static void *httpProcessResultQueue(void *param) { ...@@ -70,6 +70,8 @@ static void *httpProcessResultQueue(void *param) {
int32_t type; int32_t type;
void * unUsed; void * unUsed;
setThreadName("httpResultQ");
while (1) { while (1) {
if (taosReadQitemFromQset(tsHttpQset, &type, (void **)&pMsg, &unUsed) == 0) { if (taosReadQitemFromQset(tsHttpQset, &type, (void **)&pMsg, &unUsed) == 0) {
httpDebug("qset:%p, http queue got no message from qset, exiting", tsHttpQset); httpDebug("qset:%p, http queue got no message from qset, exiting", tsHttpQset);
......
...@@ -21,10 +21,11 @@ ...@@ -21,10 +21,11 @@
#include "httpResp.h" #include "httpResp.h"
#include "httpJson.h" #include "httpJson.h"
#include "httpContext.h" #include "httpContext.h"
#include "monitor.h"
const char *httpKeepAliveStr[] = {"", "Connection: Keep-Alive\r\n", "Connection: Close\r\n"}; const char *httpKeepAliveStr[] = {"", "Connection: Keep-Alive\r\n", "Connection: Close\r\n"};
const char *httpVersionStr[] = {"HTTP/1.0", "HTTP/1.1", "HTTP/1.2"}; const char *httpVersionStr[] = {"HTTP/1.0", "HTTP/1.1", "HTTP/2.0"}; /* There is no version 1.2 */
const char *httpRespTemplate[] = { const char *httpRespTemplate[] = {
// HTTP_RESPONSE_JSON_OK // HTTP_RESPONSE_JSON_OK
...@@ -52,8 +53,14 @@ static void httpSendErrorRespImp(HttpContext *pContext, int32_t httpCode, char * ...@@ -52,8 +53,14 @@ static void httpSendErrorRespImp(HttpContext *pContext, int32_t httpCode, char *
int8_t httpVersion = 0; int8_t httpVersion = 0;
int8_t keepAlive = 0; int8_t keepAlive = 0;
if (pContext->parser != NULL) { if (pContext->parser != NULL) {
httpVersion = pContext->parser->httpVersion; httpVersion = pContext->parser->httpVersion;
}
if (pContext->error == true) {
keepAlive = HTTP_KEEPALIVE_DISABLE;
} else if (pContext->parser != NULL) {
keepAlive = pContext->parser->keepAlive; keepAlive = pContext->parser->keepAlive;
} }
...@@ -67,92 +74,101 @@ static void httpSendErrorRespImp(HttpContext *pContext, int32_t httpCode, char * ...@@ -67,92 +74,101 @@ static void httpSendErrorRespImp(HttpContext *pContext, int32_t httpCode, char *
} }
void httpSendErrorResp(HttpContext *pContext, int32_t errNo) { void httpSendErrorResp(HttpContext *pContext, int32_t errNo) {
int32_t httpCode = 500; int32_t httpCode = HTTP_CODE_INTERNAL_SERVER_ERROR;
if (errNo == TSDB_CODE_SUCCESS) if (errNo == TSDB_CODE_SUCCESS)
httpCode = 200; httpCode = HTTP_CODE_OK;
else if (errNo == TSDB_CODE_HTTP_SERVER_OFFLINE) else if (errNo == TSDB_CODE_HTTP_SERVER_OFFLINE)
httpCode = 404; httpCode = HTTP_CODE_NOT_FOUND;
else if (errNo == TSDB_CODE_HTTP_UNSUPPORT_URL) else if (errNo == TSDB_CODE_HTTP_UNSUPPORT_URL)
httpCode = 404; httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_INVALID_URL) else if (errNo == TSDB_CODE_HTTP_INVALID_URL)
httpCode = 404; httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_NO_ENOUGH_MEMORY) else if (errNo == TSDB_CODE_HTTP_NO_ENOUGH_MEMORY)
httpCode = 507; httpCode = HTTP_CODE_INSUFFICIENT_STORAGE;
else if (errNo == TSDB_CODE_HTTP_REQUSET_TOO_BIG) else if (errNo == TSDB_CODE_HTTP_REQUSET_TOO_BIG)
httpCode = 413; httpCode = HTTP_CODE_PAYLOAD_TOO_LARGE;
else if (errNo == TSDB_CODE_HTTP_NO_AUTH_INFO) else if (errNo == TSDB_CODE_HTTP_NO_AUTH_INFO)
httpCode = 401; httpCode = HTTP_CODE_UNAUTHORIZED;
else if (errNo == TSDB_CODE_HTTP_NO_MSG_INPUT) else if (errNo == TSDB_CODE_HTTP_NO_MSG_INPUT)
httpCode = 400; httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_NO_SQL_INPUT) else if (errNo == TSDB_CODE_HTTP_NO_SQL_INPUT)
httpCode = 400; httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_NO_EXEC_USEDB) else if (errNo == TSDB_CODE_HTTP_NO_EXEC_USEDB)
httpCode = 400; httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_SESSION_FULL) else if (errNo == TSDB_CODE_HTTP_SESSION_FULL)
httpCode = 421; httpCode = HTTP_CODE_INTERNAL_SERVER_ERROR;
else if (errNo == TSDB_CODE_HTTP_GEN_TAOSD_TOKEN_ERR) else if (errNo == TSDB_CODE_HTTP_GEN_TAOSD_TOKEN_ERR)
httpCode = 507; httpCode = HTTP_CODE_INTERNAL_SERVER_ERROR;
else if (errNo == TSDB_CODE_HTTP_INVALID_MULTI_REQUEST) else if (errNo == TSDB_CODE_HTTP_INVALID_MULTI_REQUEST)
httpCode = 400; httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_CREATE_GZIP_FAILED) else if (errNo == TSDB_CODE_HTTP_CREATE_GZIP_FAILED)
httpCode = 507; httpCode = HTTP_CODE_INTERNAL_SERVER_ERROR;
else if (errNo == TSDB_CODE_HTTP_FINISH_GZIP_FAILED) else if (errNo == TSDB_CODE_HTTP_FINISH_GZIP_FAILED)
httpCode = 507; httpCode = HTTP_CODE_INTERNAL_SERVER_ERROR;
else if (errNo == TSDB_CODE_HTTP_INVALID_VERSION) else if (errNo == TSDB_CODE_HTTP_INVALID_VERSION)
httpCode = 406; httpCode = HTTP_CODE_HTTP_VER_NOT_SUPPORTED;
else if (errNo == TSDB_CODE_HTTP_INVALID_CONTENT_LENGTH) else if (errNo == TSDB_CODE_HTTP_INVALID_CONTENT_LENGTH)
httpCode = 406; httpCode = HTTP_CODE_LENGTH_REQUIRED;
else if (errNo == TSDB_CODE_HTTP_INVALID_AUTH_TYPE) else if (errNo == TSDB_CODE_HTTP_INVALID_AUTH_TYPE)
httpCode = 406; httpCode = HTTP_CODE_UNAUTHORIZED;
else if (errNo == TSDB_CODE_HTTP_INVALID_AUTH_FORMAT) else if (errNo == TSDB_CODE_HTTP_INVALID_AUTH_FORMAT)
httpCode = 406; httpCode = HTTP_CODE_UNAUTHORIZED;
else if (errNo == TSDB_CODE_HTTP_INVALID_BASIC_AUTH) else if (errNo == TSDB_CODE_HTTP_INVALID_BASIC_AUTH)
httpCode = 406; httpCode = HTTP_CODE_UNAUTHORIZED;
else if (errNo == TSDB_CODE_HTTP_INVALID_TAOSD_AUTH) else if (errNo == TSDB_CODE_HTTP_INVALID_TAOSD_AUTH)
httpCode = 406; httpCode = HTTP_CODE_UNAUTHORIZED;
else if (errNo == TSDB_CODE_HTTP_PARSE_METHOD_FAILED) else if (errNo == TSDB_CODE_HTTP_PARSE_METHOD_FAILED)
httpCode = 406; httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_TARGET_FAILED) else if (errNo == TSDB_CODE_HTTP_PARSE_TARGET_FAILED)
httpCode = 406; httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_VERSION_FAILED) else if (errNo == TSDB_CODE_HTTP_PARSE_VERSION_FAILED)
httpCode = 406; httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_SP_FAILED) else if (errNo == TSDB_CODE_HTTP_PARSE_SP_FAILED)
httpCode = 406; httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_STATUS_FAILED) else if (errNo == TSDB_CODE_HTTP_PARSE_STATUS_FAILED)
httpCode = 406; httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_PHRASE_FAILED) else if (errNo == TSDB_CODE_HTTP_PARSE_PHRASE_FAILED)
httpCode = 406; httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_CRLF_FAILED) else if (errNo == TSDB_CODE_HTTP_PARSE_CRLF_FAILED)
httpCode = 406; httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_HEADER_FAILED) else if (errNo == TSDB_CODE_HTTP_PARSE_HEADER_FAILED)
httpCode = 406; httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_HEADER_KEY_FAILED) else if (errNo == TSDB_CODE_HTTP_PARSE_HEADER_KEY_FAILED)
httpCode = 406; httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_HEADER_VAL_FAILED) else if (errNo == TSDB_CODE_HTTP_PARSE_HEADER_VAL_FAILED)
httpCode = 406; httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_CHUNK_SIZE_FAILED) else if (errNo == TSDB_CODE_HTTP_PARSE_CHUNK_SIZE_FAILED)
httpCode = 406; httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_CHUNK_FAILED) else if (errNo == TSDB_CODE_HTTP_PARSE_CHUNK_FAILED)
httpCode = 406; httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_END_FAILED) else if (errNo == TSDB_CODE_HTTP_PARSE_END_FAILED)
httpCode = 406; httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_INVALID_STATE) else if (errNo == TSDB_CODE_HTTP_PARSE_INVALID_STATE)
httpCode = 406; httpCode = HTTP_CODE_BAD_REQUEST;
else if (errNo == TSDB_CODE_HTTP_PARSE_ERROR_STATE) else if (errNo == TSDB_CODE_HTTP_PARSE_ERROR_STATE)
httpCode = 406; httpCode = HTTP_CODE_BAD_REQUEST;
else else
httpCode = 400; httpCode = HTTP_CODE_BAD_REQUEST;
if (pContext->parser && pContext->parser->httpCode != 0) { if (pContext->parser && pContext->parser->httpCode != 0) {
httpCode = pContext->parser->httpCode; httpCode = pContext->parser->httpCode;
} }
HttpServer *pServer = &tsHttpServer;
SMonHttpStatus *httpStatus = monGetHttpStatusHashTableEntry(httpCode);
// FIXME(@huolinhe): I don't known why the errors index is overflowed, but fix it by index check
if (httpStatus->index < HTTP_STATUS_CODE_NUM) {
pServer->statusCodeErrs[httpStatus->index] += 1;
}
pContext->error = true;
char *httpCodeStr = httpGetStatusDesc(httpCode); char *httpCodeStr = httpGetStatusDesc(httpCode);
httpSendErrorRespImp(pContext, httpCode, httpCodeStr, errNo & 0XFFFF, tstrerror(errNo)); httpSendErrorRespImp(pContext, httpCode, httpCodeStr, errNo & 0XFFFF, tstrerror(errNo));
} }
void httpSendTaosdInvalidSqlErrorResp(HttpContext *pContext, char *errMsg) { void httpSendTaosdInvalidSqlErrorResp(HttpContext *pContext, char *errMsg) {
int32_t httpCode = 400; int32_t httpCode = HTTP_CODE_BAD_REQUEST;
char temp[512] = {0}; char temp[512] = {0};
int32_t len = sprintf(temp, "invalid SQL: %s", errMsg); int32_t len = sprintf(temp, "invalid SQL: %s", errMsg);
...@@ -165,7 +181,7 @@ void httpSendTaosdInvalidSqlErrorResp(HttpContext *pContext, char *errMsg) { ...@@ -165,7 +181,7 @@ void httpSendTaosdInvalidSqlErrorResp(HttpContext *pContext, char *errMsg) {
} }
} }
httpSendErrorRespImp(pContext, httpCode, "Bad Request", TSDB_CODE_TSC_INVALID_SQL & 0XFFFF, temp); httpSendErrorRespImp(pContext, httpCode, "Bad Request", TSDB_CODE_TSC_INVALID_OPERATION & 0XFFFF, temp);
} }
void httpSendSuccResp(HttpContext *pContext, char *desc) { void httpSendSuccResp(HttpContext *pContext, char *desc) {
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "httpLog.h" #include "httpLog.h"
#include "httpRestHandle.h" #include "httpRestHandle.h"
#include "httpRestJson.h" #include "httpRestJson.h"
#include "tglobal.h"
static HttpDecodeMethod restDecodeMethod = {"rest", restProcessRequest}; static HttpDecodeMethod restDecodeMethod = {"rest", restProcessRequest};
static HttpDecodeMethod restDecodeMethod2 = {"restful", restProcessRequest}; static HttpDecodeMethod restDecodeMethod2 = {"restful", restProcessRequest};
...@@ -62,11 +63,11 @@ void restInitHandle(HttpServer* pServer) { ...@@ -62,11 +63,11 @@ void restInitHandle(HttpServer* pServer) {
bool restGetUserFromUrl(HttpContext* pContext) { bool restGetUserFromUrl(HttpContext* pContext) {
HttpParser* pParser = pContext->parser; HttpParser* pParser = pContext->parser;
if (pParser->path[REST_USER_URL_POS].pos >= TSDB_USER_LEN || pParser->path[REST_USER_URL_POS].pos <= 0) { if (pParser->path[REST_USER_USEDB_URL_POS].pos >= TSDB_USER_LEN || pParser->path[REST_USER_USEDB_URL_POS].pos <= 0) {
return false; return false;
} }
tstrncpy(pContext->user, pParser->path[REST_USER_URL_POS].str, TSDB_USER_LEN); tstrncpy(pContext->user, pParser->path[REST_USER_USEDB_URL_POS].str, TSDB_USER_LEN);
return true; return true;
} }
...@@ -107,6 +108,24 @@ bool restProcessSqlRequest(HttpContext* pContext, int32_t timestampFmt) { ...@@ -107,6 +108,24 @@ bool restProcessSqlRequest(HttpContext* pContext, int32_t timestampFmt) {
HttpSqlCmd* cmd = &(pContext->singleCmd); HttpSqlCmd* cmd = &(pContext->singleCmd);
cmd->nativSql = sql; cmd->nativSql = sql;
/* find if there is db_name in url */
pContext->db[0] = '\0';
HttpString *path = &pContext->parser->path[REST_USER_USEDB_URL_POS];
if (tsHttpDbNameMandatory) {
if (path->pos == 0) {
httpError("context:%p, fd:%d, user:%s, database name is mandatory", pContext, pContext->fd, pContext->user);
httpSendErrorResp(pContext, TSDB_CODE_HTTP_INVALID_URL);
return false;
}
}
if (path->pos > 0 && !(strlen(sql) > 4 && (sql[0] == 'u' || sql[0] == 'U') &&
(sql[1] == 's' || sql[1] == 'S') && (sql[2] == 'e' || sql[2] == 'E') && sql[3] == ' '))
{
snprintf(pContext->db, /*TSDB_ACCT_ID_LEN + */TSDB_DB_NAME_LEN, "%s", path->str);
}
pContext->reqType = HTTP_REQTYPE_SINGLE_SQL; pContext->reqType = HTTP_REQTYPE_SINGLE_SQL;
if (timestampFmt == REST_TIMESTAMP_FMT_LOCAL_STRING) { if (timestampFmt == REST_TIMESTAMP_FMT_LOCAL_STRING) {
pContext->encodeMethod = &restEncodeSqlLocalTimeStringMethod; pContext->encodeMethod = &restEncodeSqlLocalTimeStringMethod;
...@@ -119,6 +138,90 @@ bool restProcessSqlRequest(HttpContext* pContext, int32_t timestampFmt) { ...@@ -119,6 +138,90 @@ bool restProcessSqlRequest(HttpContext* pContext, int32_t timestampFmt) {
return true; return true;
} }
#define REST_FUNC_URL_POS 2
#define REST_OUTP_URL_POS 3
#define REST_AGGR_URL_POS 4
#define REST_BUFF_URL_POS 5
#define HTTP_FUNC_LEN 32
#define HTTP_OUTP_LEN 16
#define HTTP_AGGR_LEN 2
#define HTTP_BUFF_LEN 32
static int udfSaveFile(const char *fname, const char *content, long len) {
int fd = open(fname, O_WRONLY | O_CREAT | O_EXCL | O_BINARY, 0755);
if (fd < 0)
return -1;
if (taosWrite(fd, (void *)content, len) < 0)
return -1;
return 0;
}
static bool restProcessUdfRequest(HttpContext* pContext) {
HttpParser* pParser = pContext->parser;
if (pParser->path[REST_FUNC_URL_POS].pos >= HTTP_FUNC_LEN || pParser->path[REST_FUNC_URL_POS].pos <= 0) {
return false;
}
if (pParser->path[REST_OUTP_URL_POS].pos >= HTTP_OUTP_LEN || pParser->path[REST_OUTP_URL_POS].pos <= 0) {
return false;
}
if (pParser->path[REST_AGGR_URL_POS].pos >= HTTP_AGGR_LEN || pParser->path[REST_AGGR_URL_POS].pos <= 0) {
return false;
}
if (pParser->path[REST_BUFF_URL_POS].pos >= HTTP_BUFF_LEN || pParser->path[REST_BUFF_URL_POS].pos <= 0) {
return false;
}
char* sql = pContext->parser->body.str;
int len = pContext->parser->body.pos;
if (sql == NULL) {
httpSendErrorResp(pContext, TSDB_CODE_HTTP_NO_SQL_INPUT);
return false;
}
char udfDir[256] = {0};
char buf[64] = "udf-";
char funcName[64] = {0};
int aggr = 0;
char outputType[16] = {0};
char buffSize[32] = {0};
tstrncpy(funcName, pParser->path[REST_FUNC_URL_POS].str, HTTP_FUNC_LEN);
tstrncpy(buf + 4, funcName, HTTP_FUNC_LEN);
if (pParser->path[REST_AGGR_URL_POS].str[0] != '0') {
aggr = 1;
}
tstrncpy(outputType, pParser->path[REST_OUTP_URL_POS].str, HTTP_OUTP_LEN);
tstrncpy(buffSize, pParser->path[REST_BUFF_URL_POS].str, HTTP_BUFF_LEN);
taosGetTmpfilePath(funcName, udfDir);
udfSaveFile(udfDir, sql, len);
tfree(sql);
pContext->parser->body.str = malloc(1024);
sql = pContext->parser->body.str;
int sqlLen = sprintf(sql, "create %s function %s as \"%s\" outputtype %s bufsize %s",
aggr == 1 ? "aggregate" : " ", funcName, udfDir, outputType, buffSize);
pContext->parser->body.pos = sqlLen;
pContext->parser->body.size = sqlLen + 1;
HttpSqlCmd* cmd = &(pContext->singleCmd);
cmd->nativSql = sql;
pContext->reqType = HTTP_REQTYPE_SINGLE_SQL;
pContext->encodeMethod = &restEncodeSqlLocalTimeStringMethod;
return true;
}
bool restProcessRequest(struct HttpContext* pContext) { bool restProcessRequest(struct HttpContext* pContext) {
if (httpUrlMatch(pContext, REST_ACTION_URL_POS, "login")) { if (httpUrlMatch(pContext, REST_ACTION_URL_POS, "login")) {
restGetUserFromUrl(pContext); restGetUserFromUrl(pContext);
...@@ -138,6 +241,8 @@ bool restProcessRequest(struct HttpContext* pContext) { ...@@ -138,6 +241,8 @@ bool restProcessRequest(struct HttpContext* pContext) {
return restProcessSqlRequest(pContext, REST_TIMESTAMP_FMT_UTC_STRING); return restProcessSqlRequest(pContext, REST_TIMESTAMP_FMT_UTC_STRING);
} else if (httpUrlMatch(pContext, REST_ACTION_URL_POS, "login")) { } else if (httpUrlMatch(pContext, REST_ACTION_URL_POS, "login")) {
return restProcessLoginRequest(pContext); return restProcessLoginRequest(pContext);
} else if (httpUrlMatch(pContext, REST_ACTION_URL_POS, "udf")) {
return restProcessUdfRequest(pContext);
} else { } else {
} }
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "tglobal.h" #include "tglobal.h"
#include "tsclient.h"
#include "httpLog.h" #include "httpLog.h"
#include "httpJson.h" #include "httpJson.h"
#include "httpRestHandle.h" #include "httpRestHandle.h"
...@@ -62,13 +63,21 @@ void restStartSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result) ...@@ -62,13 +63,21 @@ void restStartSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result)
httpJsonItemToken(jsonBuf); httpJsonItemToken(jsonBuf);
httpJsonToken(jsonBuf, JsonArrStt); httpJsonToken(jsonBuf, JsonArrStt);
SSqlObj *pObj = (SSqlObj *) result;
bool isAlterSql = (pObj->sqlstr == NULL) ? false : httpCheckAlterSql(pObj->sqlstr);
if (num_fields == 0) { if (num_fields == 0) {
httpJsonItemToken(jsonBuf); httpJsonItemToken(jsonBuf);
httpJsonString(jsonBuf, REST_JSON_AFFECT_ROWS, REST_JSON_AFFECT_ROWS_LEN); httpJsonString(jsonBuf, REST_JSON_AFFECT_ROWS, REST_JSON_AFFECT_ROWS_LEN);
} else { } else {
for (int32_t i = 0; i < num_fields; ++i) { if (isAlterSql == true) {
httpJsonItemToken(jsonBuf); httpJsonItemToken(jsonBuf);
httpJsonString(jsonBuf, fields[i].name, (int32_t)strlen(fields[i].name)); httpJsonString(jsonBuf, REST_JSON_AFFECT_ROWS, REST_JSON_AFFECT_ROWS_LEN);
} else {
for (int32_t i = 0; i < num_fields; ++i) {
httpJsonItemToken(jsonBuf);
httpJsonString(jsonBuf, fields[i].name, (int32_t)strlen(fields[i].name));
}
} }
} }
...@@ -99,8 +108,14 @@ void restStartSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result) ...@@ -99,8 +108,14 @@ void restStartSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result)
httpJsonItemToken(jsonBuf); httpJsonItemToken(jsonBuf);
httpJsonToken(jsonBuf, JsonArrStt); httpJsonToken(jsonBuf, JsonArrStt);
httpJsonItemToken(jsonBuf); if (isAlterSql == true) {
httpJsonString(jsonBuf, fields[i].name, (int32_t)strlen(fields[i].name)); httpJsonItemToken(jsonBuf);
httpJsonString(jsonBuf, REST_JSON_AFFECT_ROWS, REST_JSON_AFFECT_ROWS_LEN);
} else {
httpJsonItemToken(jsonBuf);
httpJsonString(jsonBuf, fields[i].name, (int32_t)strlen(fields[i].name));
}
httpJsonItemToken(jsonBuf); httpJsonItemToken(jsonBuf);
httpJsonInt(jsonBuf, fields[i].type); httpJsonInt(jsonBuf, fields[i].type);
httpJsonItemToken(jsonBuf); httpJsonItemToken(jsonBuf);
...@@ -186,13 +201,11 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, ...@@ -186,13 +201,11 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
break; break;
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
if (timestampFormat == REST_TIMESTAMP_FMT_LOCAL_STRING) { if (timestampFormat == REST_TIMESTAMP_FMT_LOCAL_STRING) {
httpJsonTimestamp(jsonBuf, *((int64_t *)row[i]), httpJsonTimestamp(jsonBuf, *((int64_t *)row[i]), taos_result_precision(result));
taos_result_precision(result) == TSDB_TIME_PRECISION_MICRO);
} else if (timestampFormat == REST_TIMESTAMP_FMT_TIMESTAMP) { } else if (timestampFormat == REST_TIMESTAMP_FMT_TIMESTAMP) {
httpJsonInt64(jsonBuf, *((int64_t *)row[i])); httpJsonInt64(jsonBuf, *((int64_t *)row[i]));
} else { } else {
httpJsonUtcTimestamp(jsonBuf, *((int64_t *)row[i]), httpJsonUtcTimestamp(jsonBuf, *((int64_t *)row[i]), taos_result_precision(result));
taos_result_precision(result) == TSDB_TIME_PRECISION_MICRO);
} }
break; break;
default: default:
......
...@@ -53,7 +53,7 @@ static void httpStopThread(HttpThread *pThread) { ...@@ -53,7 +53,7 @@ static void httpStopThread(HttpThread *pThread) {
break; break;
} }
} while (0); } while (0);
if (r) { if (r && taosCheckPthreadValid(pThread->thread)) {
pthread_cancel(pThread->thread); pthread_cancel(pThread->thread);
} }
#else #else
...@@ -63,15 +63,21 @@ static void httpStopThread(HttpThread *pThread) { ...@@ -63,15 +63,21 @@ static void httpStopThread(HttpThread *pThread) {
httpError("%s, failed to create eventfd, will call pthread_cancel instead, which may result in data corruption: %s", httpError("%s, failed to create eventfd, will call pthread_cancel instead, which may result in data corruption: %s",
pThread->label, strerror(errno)); pThread->label, strerror(errno));
pThread->stop = true; pThread->stop = true;
pthread_cancel(pThread->thread); if (taosCheckPthreadValid(pThread->thread)) {
pthread_cancel(pThread->thread);
}
} else if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { } else if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
httpError("%s, failed to call epoll_ctl, will call pthread_cancel instead, which may result in data corruption: %s", httpError("%s, failed to call epoll_ctl, will call pthread_cancel instead, which may result in data corruption: %s",
pThread->label, strerror(errno)); pThread->label, strerror(errno));
pthread_cancel(pThread->thread); if (taosCheckPthreadValid(pThread->thread)) {
pthread_cancel(pThread->thread);
}
} }
#endif // __APPLE__ #endif // __APPLE__
pthread_join(pThread->thread, NULL); if (taosCheckPthreadValid(pThread->thread)) {
pthread_join(pThread->thread, NULL);
}
#ifdef __APPLE__ #ifdef __APPLE__
if (sv[0] != -1) { if (sv[0] != -1) {
...@@ -117,6 +123,8 @@ static void httpProcessHttpData(void *param) { ...@@ -117,6 +123,8 @@ static void httpProcessHttpData(void *param) {
int32_t fdNum; int32_t fdNum;
taosSetMaskSIGPIPE(); taosSetMaskSIGPIPE();
//dkj
//setThreadName("httpData");
while (1) { while (1) {
struct epoll_event events[HTTP_MAX_EVENTS]; struct epoll_event events[HTTP_MAX_EVENTS];
...@@ -189,9 +197,7 @@ static void httpProcessHttpData(void *param) { ...@@ -189,9 +197,7 @@ static void httpProcessHttpData(void *param) {
} else { } else {
if (httpReadData(pContext)) { if (httpReadData(pContext)) {
(*(pThread->processData))(pContext); (*(pThread->processData))(pContext);
atomic_fetch_add_32(&pServer->requestNum, 1); atomic_fetch_add_64(&pServer->requestNum, 1);
} else {
httpReleaseContext(pContext/*, false*/);
} }
} }
} }
...@@ -208,6 +214,7 @@ static void *httpAcceptHttpConnection(void *arg) { ...@@ -208,6 +214,7 @@ static void *httpAcceptHttpConnection(void *arg) {
int32_t totalFds = 0; int32_t totalFds = 0;
taosSetMaskSIGPIPE(); taosSetMaskSIGPIPE();
setThreadName("httpAcceptConn");
pServer->fd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort); pServer->fd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort);
...@@ -269,7 +276,11 @@ static void *httpAcceptHttpConnection(void *arg) { ...@@ -269,7 +276,11 @@ static void *httpAcceptHttpConnection(void *arg) {
sprintf(pContext->ipstr, "%s:%u", taosInetNtoa(clientAddr.sin_addr), htons(clientAddr.sin_port)); sprintf(pContext->ipstr, "%s:%u", taosInetNtoa(clientAddr.sin_addr), htons(clientAddr.sin_port));
struct epoll_event event; struct epoll_event event;
#ifndef _TD_NINGSI_60
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP | EPOLLERR | EPOLLHUP | EPOLLRDHUP; event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
#else
event.events = EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
#endif
event.data.ptr = pContext; event.data.ptr = pContext;
if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) { if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) {
httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd, httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd,
...@@ -394,15 +405,23 @@ static bool httpReadData(HttpContext *pContext) { ...@@ -394,15 +405,23 @@ static bool httpReadData(HttpContext *pContext) {
return true; return true;
} }
} else if (nread < 0) { } else if (nread < 0) {
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { if (errno == EINTR) {
httpDebug("context:%p, fd:%d, read from socket error:%d, continue", pContext, pContext->fd, errno);
continue;
} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
httpDebug("context:%p, fd:%d, read from socket error:%d, wait another event", pContext, pContext->fd, errno); httpDebug("context:%p, fd:%d, read from socket error:%d, wait another event", pContext, pContext->fd, errno);
return false; // later again httpReleaseContext(pContext/*, false */);
return false;
} else { } else {
httpError("context:%p, fd:%d, read from socket error:%d, close connect", pContext, pContext->fd, errno); httpError("context:%p, fd:%d, read from socket error:%d, close connect", pContext, pContext->fd, errno);
taosCloseSocket(pContext->fd);
httpReleaseContext(pContext/*, false */);
return false; return false;
} }
} else { } else {
httpError("context:%p, fd:%d, nread:%d, wait another event", pContext, pContext->fd, nread); httpError("context:%p, fd:%d, nread:%d, wait another event", pContext, pContext->fd, nread);
taosCloseSocket(pContext->fd);
httpReleaseContext(pContext/*, false */);
return false; return false;
} }
} }
......
...@@ -263,7 +263,7 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int32_t code ...@@ -263,7 +263,7 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int32_t code
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
SSqlObj *pObj = (SSqlObj *)result; SSqlObj *pObj = (SSqlObj *)result;
if (code == TSDB_CODE_TSC_INVALID_SQL) { if (code == TSDB_CODE_TSC_INVALID_OPERATION) {
terrno = code; terrno = code;
httpError("context:%p, fd:%d, user:%s, query error, code:%s, sqlObj:%p, error:%s", pContext, pContext->fd, httpError("context:%p, fd:%d, user:%s, query error, code:%s, sqlObj:%p, error:%s", pContext, pContext->fd,
pContext->user, tstrerror(code), pObj, taos_errstr(pObj)); pContext->user, tstrerror(code), pObj, taos_errstr(pObj));
...@@ -405,9 +405,15 @@ void httpProcessRequestCb(void *param, TAOS_RES *result, int32_t code) { ...@@ -405,9 +405,15 @@ void httpProcessRequestCb(void *param, TAOS_RES *result, int32_t code) {
if (pContext->session == NULL) { if (pContext->session == NULL) {
httpSendErrorResp(pContext, TSDB_CODE_HTTP_SESSION_FULL); httpSendErrorResp(pContext, TSDB_CODE_HTTP_SESSION_FULL);
httpCloseContextByApp(pContext);
} else { } else {
// httpProcessRequestCb called by another thread and a subsequent thread calls this
// function again, if this function called by httpProcessRequestCb executes memset
// just before the subsequent thread executes *Cmd function, nativSql will be NULL
pthread_mutex_lock(&pContext->ctxMutex);
httpExecCmd(pContext); httpExecCmd(pContext);
pthread_mutex_unlock(&pContext->ctxMutex);
} }
} }
...@@ -419,7 +425,74 @@ void httpProcessRequest(HttpContext *pContext) { ...@@ -419,7 +425,74 @@ void httpProcessRequest(HttpContext *pContext) {
&(pContext->taos)); &(pContext->taos));
httpDebug("context:%p, fd:%d, user:%s, try connect tdengine, taos:%p", pContext, pContext->fd, pContext->user, httpDebug("context:%p, fd:%d, user:%s, try connect tdengine, taos:%p", pContext, pContext->fd, pContext->user,
pContext->taos); pContext->taos);
if (pContext->taos != NULL) {
STscObj *pObj = pContext->taos;
pObj->from = TAOS_REQ_FROM_HTTP;
}
} else { } else {
httpExecCmd(pContext); httpExecCmd(pContext);
} }
} }
int32_t httpCheckAllocEscapeSql(char *oldSql, char **newSql)
{
char *pos;
if (oldSql == NULL || newSql == NULL) {
return TSDB_CODE_SUCCESS;
}
/* bad sql clause */
pos = strstr(oldSql, "%%");
if (pos) {
httpError("bad sql:%s", oldSql);
return TSDB_CODE_HTTP_REQUEST_JSON_ERROR;
}
pos = strchr(oldSql, '%');
if (pos == NULL) {
httpDebug("sql:%s", oldSql);
*newSql = oldSql;
return TSDB_CODE_SUCCESS;
}
*newSql = (char *) calloc(1, (strlen(oldSql) << 1) + 1);
if (newSql == NULL) {
httpError("failed to allocate for new sql, old sql:%s", oldSql);
return TSDB_CODE_HTTP_NO_ENOUGH_MEMORY;
}
char *src = oldSql;
char *dst = *newSql;
size_t sqlLen = strlen(src);
while (1) {
memcpy(dst, src, pos - src + 1);
dst += pos - src + 1;
*dst++ = '%';
if (pos + 1 >= oldSql + sqlLen) {
break;
}
src = ++pos;
pos = strchr(pos, '%');
if (pos == NULL) {
memcpy(dst, src, strlen(src));
break;
}
}
return TSDB_CODE_SUCCESS;
}
void httpCheckFreeEscapedSql(char *oldSql, char *newSql)
{
if (oldSql && newSql) {
if (oldSql != newSql) {
free(newSql);
}
}
}
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
#include "httpGcHandle.h" #include "httpGcHandle.h"
#include "httpRestHandle.h" #include "httpRestHandle.h"
#include "httpTgHandle.h" #include "httpTgHandle.h"
#include "httpMetricsHandle.h"
#ifndef _ADMIN #ifndef _ADMIN
void adminInitHandle(HttpServer* pServer) {} void adminInitHandle(HttpServer* pServer) {}
...@@ -52,7 +53,7 @@ int32_t httpInitSystem() { ...@@ -52,7 +53,7 @@ int32_t httpInitSystem() {
gcInitHandle(&tsHttpServer); gcInitHandle(&tsHttpServer);
tgInitHandle(&tsHttpServer); tgInitHandle(&tsHttpServer);
opInitHandle(&tsHttpServer); opInitHandle(&tsHttpServer);
metricsInitHandle(&tsHttpServer);
return 0; return 0;
} }
...@@ -119,4 +120,10 @@ void httpCleanUpSystem() { ...@@ -119,4 +120,10 @@ void httpCleanUpSystem() {
tsHttpServer.status = HTTP_SERVER_CLOSED; tsHttpServer.status = HTTP_SERVER_CLOSED;
} }
int32_t httpGetReqCount() { return atomic_exchange_32(&tsHttpServer.requestNum, 0); } int64_t httpGetReqCount() { return atomic_exchange_64(&tsHttpServer.requestNum, 0); }
int32_t httpGetStatusCodeCount(int index) {
return atomic_load_32(&tsHttpServer.statusCodeErrs[index]);
}
int32_t httpClearStatusCodeCount(int index) {
return atomic_exchange_32(&tsHttpServer.statusCodeErrs[index], 0);
}
...@@ -209,7 +209,7 @@ void tgParseSchemaMetric(cJSON *metric) { ...@@ -209,7 +209,7 @@ void tgParseSchemaMetric(cJSON *metric) {
parsedOk = false; parsedOk = false;
goto ParseEnd; goto ParseEnd;
} }
int32_t nameLen = (int32_t)strlen(field->valuestring); nameLen = (int32_t)strlen(field->valuestring);
if (nameLen == 0 || nameLen >= TSDB_TABLE_NAME_LEN) { if (nameLen == 0 || nameLen >= TSDB_TABLE_NAME_LEN) {
parsedOk = false; parsedOk = false;
goto ParseEnd; goto ParseEnd;
...@@ -610,7 +610,18 @@ bool tgProcessSingleMetric(HttpContext *pContext, cJSON *metric, char *db) { ...@@ -610,7 +610,18 @@ bool tgProcessSingleMetric(HttpContext *pContext, cJSON *metric, char *db) {
// stable tag for detail // stable tag for detail
for (int32_t i = 0; i < orderTagsLen; ++i) { for (int32_t i = 0; i < orderTagsLen; ++i) {
cJSON *tag = orderedTags[i]; cJSON *tag = orderedTags[i];
stable_cmd->tagNames[i] = table_cmd->tagNames[i] = httpAddToSqlCmdBuffer(pContext, tag->string);
char *tagStr = NULL;
int32_t retCode = httpCheckAllocEscapeSql(tag->string, &tagStr);
if (retCode != TSDB_CODE_SUCCESS) {
httpSendErrorResp(pContext, retCode);
return false;
}
stable_cmd->tagNames[i] = table_cmd->tagNames[i] = httpAddToSqlCmdBuffer(pContext, tagStr);
httpCheckFreeEscapedSql(tag->string, tagStr);
if (tag->type == cJSON_String) if (tag->type == cJSON_String)
stable_cmd->tagValues[i] = table_cmd->tagValues[i] = httpAddToSqlCmdBuffer(pContext, "'%s'", tag->valuestring); stable_cmd->tagValues[i] = table_cmd->tagValues[i] = httpAddToSqlCmdBuffer(pContext, "'%s'", tag->valuestring);
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "httpResp.h" #include "httpResp.h"
#include "httpSql.h" #include "httpSql.h"
#include "httpUtil.h" #include "httpUtil.h"
#include "ttoken.h"
bool httpCheckUsedbSql(char *sql) { bool httpCheckUsedbSql(char *sql) {
if (strstr(sql, "use ") != NULL) { if (strstr(sql, "use ") != NULL) {
...@@ -29,6 +30,17 @@ bool httpCheckUsedbSql(char *sql) { ...@@ -29,6 +30,17 @@ bool httpCheckUsedbSql(char *sql) {
return false; return false;
} }
bool httpCheckAlterSql(char *sql) {
int32_t index = 0;
do {
SStrToken t0 = tStrGetToken(sql, &index, false);
if (t0.type != TK_LP) {
return t0.type == TK_ALTER;
}
} while (1);
}
void httpTimeToString(int32_t t, char *buf, int32_t buflen) { void httpTimeToString(int32_t t, char *buf, int32_t buflen) {
memset(buf, 0, (size_t)buflen); memset(buf, 0, (size_t)buflen);
char ts[32] = {0}; char ts[32] = {0};
...@@ -338,10 +350,10 @@ int32_t httpShrinkTableName(HttpContext *pContext, int32_t pos, char *name) { ...@@ -338,10 +350,10 @@ int32_t httpShrinkTableName(HttpContext *pContext, int32_t pos, char *name) {
return pos; return pos;
} }
MD5_CTX context; T_MD5_CTX context;
MD5Init(&context); tMD5Init(&context);
MD5Update(&context, (uint8_t *)name, (uint32_t)len); tMD5Update(&context, (uint8_t *)name, (uint32_t)len);
MD5Final(&context); tMD5Final(&context);
int32_t table_name = httpAddToSqlCmdBuffer( int32_t table_name = httpAddToSqlCmdBuffer(
pContext, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], pContext, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0],
......
...@@ -72,6 +72,75 @@ static void monBuildMonitorSql(char *sql, int32_t cmd); ...@@ -72,6 +72,75 @@ static void monBuildMonitorSql(char *sql, int32_t cmd);
extern int32_t (*monStartSystemFp)(); extern int32_t (*monStartSystemFp)();
extern void (*monStopSystemFp)(); extern void (*monStopSystemFp)();
extern void (*monExecuteSQLFp)(char *sql); extern void (*monExecuteSQLFp)(char *sql);
static void monInitHttpStatusHashTable();
static void monCleanupHttpStatusHashTable();
static void *monHttpStatusHashTable;
static SMonHttpStatus monHttpStatusTable[] = {
{"HTTP_CODE_CONTINUE", 100},
{"HTTP_CODE_SWITCHING_PROTOCOL", 101},
{"HTTP_CODE_PROCESSING", 102},
{"HTTP_CODE_EARLY_HINTS", 103},
{"HTTP_CODE_OK", 200},
{"HTTP_CODE_CREATED", 201},
{"HTTP_CODE_ACCEPTED", 202},
{"HTTP_CODE_NON_AUTHORITATIVE_INFO", 203},
{"HTTP_CODE_NO_CONTENT", 204},
{"HTTP_CODE_RESET_CONTENT", 205},
{"HTTP_CODE_PARTIAL_CONTENT", 206},
{"HTTP_CODE_MULTI_STATUS", 207},
{"HTTP_CODE_ALREADY_REPORTED", 208},
{"HTTP_CODE_IM_USED", 226},
{"HTTP_CODE_MULTIPLE_CHOICE", 300},
{"HTTP_CODE_MOVED_PERMANENTLY", 301},
{"HTTP_CODE_FOUND", 302},
{"HTTP_CODE_SEE_OTHER", 303},
{"HTTP_CODE_NOT_MODIFIED", 304},
{"HTTP_CODE_USE_PROXY", 305},
{"HTTP_CODE_UNUSED", 306},
{"HTTP_CODE_TEMPORARY_REDIRECT", 307},
{"HTTP_CODE_PERMANENT_REDIRECT", 308},
{"HTTP_CODE_BAD_REQUEST", 400},
{"HTTP_CODE_UNAUTHORIZED", 401},
{"HTTP_CODE_PAYMENT_REQUIRED", 402},
{"HTTP_CODE_FORBIDDEN", 403},
{"HTTP_CODE_NOT_FOUND", 404},
{"HTTP_CODE_METHOD_NOT_ALLOWED", 405},
{"HTTP_CODE_NOT_ACCEPTABLE", 406},
{"HTTP_CODE_PROXY_AUTH_REQUIRED", 407},
{"HTTP_CODE_REQUEST_TIMEOUT", 408},
{"HTTP_CODE_CONFLICT", 409},
{"HTTP_CODE_GONE", 410},
{"HTTP_CODE_LENGTH_REQUIRED", 411},
{"HTTP_CODE_PRECONDITION_FAILED", 412},
{"HTTP_CODE_PAYLOAD_TOO_LARGE", 413},
{"HTTP_CODE_URI_TOO_LARGE", 414},
{"HTTP_CODE_UNSUPPORTED_MEDIA_TYPE", 415},
{"HTTP_CODE_RANGE_NOT_SATISFIABLE", 416},
{"HTTP_CODE_EXPECTATION_FAILED", 417},
{"HTTP_CODE_IM_A_TEAPOT", 418},
{"HTTP_CODE_MISDIRECTED_REQUEST", 421},
{"HTTP_CODE_UNPROCESSABLE_ENTITY", 422},
{"HTTP_CODE_LOCKED", 423},
{"HTTP_CODE_FAILED_DEPENDENCY", 424},
{"HTTP_CODE_TOO_EARLY", 425},
{"HTTP_CODE_UPGRADE_REQUIRED", 426},
{"HTTP_CODE_PRECONDITION_REQUIRED", 428},
{"HTTP_CODE_TOO_MANY_REQUESTS", 429},
{"HTTP_CODE_REQ_HDR_FIELDS_TOO_LARGE",431},
{"HTTP_CODE_UNAVAIL_4_LEGAL_REASONS", 451},
{"HTTP_CODE_INTERNAL_SERVER_ERROR", 500},
{"HTTP_CODE_NOT_IMPLEMENTED", 501},
{"HTTP_CODE_BAD_GATEWAY", 502},
{"HTTP_CODE_SERVICE_UNAVAILABLE", 503},
{"HTTP_CODE_GATEWAY_TIMEOUT", 504},
{"HTTP_CODE_HTTP_VER_NOT_SUPPORTED", 505},
{"HTTP_CODE_VARIANT_ALSO_NEGOTIATES", 506},
{"HTTP_CODE_INSUFFICIENT_STORAGE", 507},
{"HTTP_CODE_LOOP_DETECTED", 508},
{"HTTP_CODE_NOT_EXTENDED", 510},
{"HTTP_CODE_NETWORK_AUTH_REQUIRED", 511}
};
int32_t monInitSystem() { int32_t monInitSystem() {
if (tsMonitor.ep[0] == 0) { if (tsMonitor.ep[0] == 0) {
...@@ -85,6 +154,8 @@ int32_t monInitSystem() { ...@@ -85,6 +154,8 @@ int32_t monInitSystem() {
} }
} }
monInitHttpStatusHashTable();
pthread_attr_t thAttr; pthread_attr_t thAttr;
pthread_attr_init(&thAttr); pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
...@@ -255,6 +326,7 @@ void monCleanupSystem() { ...@@ -255,6 +326,7 @@ void monCleanupSystem() {
taos_close(tsMonitor.conn); taos_close(tsMonitor.conn);
tsMonitor.conn = NULL; tsMonitor.conn = NULL;
} }
monCleanupHttpStatusHashTable();
monInfo("monitor module is cleaned up"); monInfo("monitor module is cleaned up");
} }
...@@ -417,3 +489,29 @@ void monExecuteSQL(char *sql) { ...@@ -417,3 +489,29 @@ void monExecuteSQL(char *sql) {
monDebug("execute sql:%s", sql); monDebug("execute sql:%s", sql);
taos_query_a(tsMonitor.conn, sql, monExecSqlCb, "sql"); taos_query_a(tsMonitor.conn, sql, monExecSqlCb, "sql");
} }
static void monInitHttpStatusHashTable() {
int32_t numOfEntries = tListLen(monHttpStatusTable);
monHttpStatusHashTable = taosHashInit(numOfEntries, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
for (int32_t i = 0; i < numOfEntries; ++i) {
monHttpStatusTable[i].index = i;
SMonHttpStatus* pEntry = &monHttpStatusTable[i];
taosHashPut(monHttpStatusHashTable, &monHttpStatusTable[i].code, sizeof(int32_t),
&pEntry, POINTER_BYTES);
}
}
static void monCleanupHttpStatusHashTable() {
void* m = monHttpStatusHashTable;
if (m != NULL && atomic_val_compare_exchange_ptr(&monHttpStatusHashTable, m, 0) == m) {
taosHashCleanup(m);
}
}
SMonHttpStatus *monGetHttpStatusHashTableEntry(int32_t code) {
if (monHttpStatusHashTable == NULL) {
return NULL;
}
return (SMonHttpStatus*)taosHashGet(monHttpStatusHashTable, &code, sizeof(int32_t));
}
...@@ -38,4 +38,9 @@ void MD5Init(MD5_CTX *mdContext); ...@@ -38,4 +38,9 @@ void MD5Init(MD5_CTX *mdContext);
void MD5Update(MD5_CTX *mdContext, uint8_t *inBuf, unsigned int inLen); void MD5Update(MD5_CTX *mdContext, uint8_t *inBuf, unsigned int inLen);
void MD5Final(MD5_CTX *mdContext); void MD5Final(MD5_CTX *mdContext);
#define T_MD5_CTX MD5_CTX
#define tMD5Init MD5Init
#define tMD5Update MD5Update
#define tMD5Final MD5Final
#endif #endif
...@@ -406,6 +406,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_TAG_VALUE_TOO_LONG, "tag value can not mor ...@@ -406,6 +406,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_TAG_VALUE_TOO_LONG, "tag value can not mor
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_VALUE_NULL, "value not find") TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_VALUE_NULL, "value not find")
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_VALUE_TYPE, "value type should be boolean, number or string") TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_VALUE_TYPE, "value type should be boolean, number or string")
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_REQUEST_JSON_ERROR, "http request json error")
// odbc // odbc
TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_OOM, "out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_OOM, "out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_CHAR_NOT_NUM, "convertion not a valid literal input") TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_CHAR_NOT_NUM, "convertion not a valid literal input")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册