From ef1fe62ab3787a1690b3b9b06c09fa6fcd839dae Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Sat, 24 Sep 2022 12:19:12 +0800 Subject: [PATCH] fix(http): migrate http module from 2.6 to 2.0 --- src/client/inc/tsclient.h | 6 + src/common/inc/tglobal.h | 3 + src/common/src/tglobal.c | 25 +++ src/dnode/src/dnodeMain.c | 1 + src/inc/http.h | 4 +- src/inc/monitor.h | 7 + src/inc/taoserror.h | 3 + src/kit/taosdemo/taosdemo.c | 2 + src/os/inc/osDef.h | 20 +++ src/os/inc/osLinux32.h | 1 + src/os/inc/osLinux64.h | 1 + src/os/inc/osTime.h | 1 + src/os/src/detail/osTime.c | 57 +++++++ src/plugins/http/CMakeLists.txt | 3 +- src/plugins/http/inc/httpInt.h | 49 +++--- src/plugins/http/inc/httpJson.h | 4 +- src/plugins/http/inc/httpMetricsHandle.h | 27 ++++ src/plugins/http/inc/httpRestHandle.h | 10 +- src/plugins/http/inc/httpSql.h | 3 + src/plugins/http/inc/httpUtil.h | 1 + src/plugins/http/src/httpContext.c | 12 +- src/plugins/http/src/httpGcHandle.c | 33 +++- src/plugins/http/src/httpGcJson.c | 71 ++++++--- src/plugins/http/src/httpGzip.c | 4 +- src/plugins/http/src/httpHandle.c | 12 +- src/plugins/http/src/httpJson.c | 118 ++++++++++++--- src/plugins/http/src/httpMetricsHandle.c | 184 +++++++++++++++++++++++ src/plugins/http/src/httpParser.c | 14 +- src/plugins/http/src/httpQueue.c | 2 + src/plugins/http/src/httpResp.c | 98 +++++++----- src/plugins/http/src/httpRestHandle.c | 109 +++++++++++++- src/plugins/http/src/httpRestJson.c | 29 +++- src/plugins/http/src/httpServer.c | 37 +++-- src/plugins/http/src/httpSql.c | 77 +++++++++- src/plugins/http/src/httpSystem.c | 11 +- src/plugins/http/src/httpTgHandle.c | 15 +- src/plugins/http/src/httpUtil.c | 20 ++- src/plugins/monitor/src/monMain.c | 98 ++++++++++++ src/util/inc/tmd5.h | 5 + src/util/src/terror.c | 2 + 40 files changed, 1011 insertions(+), 168 deletions(-) create mode 100644 src/plugins/http/inc/httpMetricsHandle.h create mode 100644 src/plugins/http/src/httpMetricsHandle.c diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 46493474b4..b8ba1c4138 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -41,6 +41,11 @@ extern "C" { struct SSqlInfo; struct SLocalMerger; +typedef enum { + TAOS_REQ_FROM_SHELL, + TAOS_REQ_FROM_HTTP +} SReqOrigin; + // data source from sql string or from file enum { DATA_FROM_SQL_STRING = 1, @@ -361,6 +366,7 @@ typedef struct STscObj { SRpcCorEpSet *tscCorMgmtEpSet; pthread_mutex_t mutex; int32_t numOfObj; // number of sqlObj from this tscObj + SReqOrigin from; } STscObj; typedef struct SSubqueryState { diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 131c412575..06be1c0421 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -41,6 +41,7 @@ extern char tsArbitrator[]; extern int8_t tsArbOnline; extern int64_t tsArbOnlineTimestamp; extern int32_t tsDnodeId; +extern int64_t tsDnodeStartTime; // common extern int tsRpcTimer; @@ -123,6 +124,8 @@ extern int32_t tsHttpMaxThreads; extern int8_t tsHttpEnableCompress; extern int8_t tsHttpEnableRecordSql; extern int8_t tsTelegrafUseFieldNum; +extern int8_t tsHttpDbNameMandatory; +extern int32_t tsHttpKeepAlive; // mqtt extern int8_t tsEnableMqttModule; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 946e1963fb..4db3ae9c9e 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -48,6 +48,7 @@ int8_t tsArbOnline = 0; int64_t tsArbOnlineTimestamp = TSDB_ARB_DUMMY_TIME; char tsEmail[TSDB_FQDN_LEN] = {0}; int32_t tsDnodeId = 0; +int64_t tsDnodeStartTime = 0; // common int32_t tsRpcTimer = 300; @@ -160,6 +161,9 @@ int32_t tsHttpMaxThreads = 2; int8_t tsHttpEnableCompress = 1; int8_t tsHttpEnableRecordSql = 0; int8_t tsTelegrafUseFieldNum = 0; +int8_t tsHttpDbNameMandatory = 0; +int32_t tsHttpKeepAlive = 30000; + // mqtt int8_t tsEnableMqttModule = 0; // not finished yet, not started it by default @@ -1215,6 +1219,27 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + cfg.option = "httpDbNameMandatory"; + cfg.ptr = &tsHttpDbNameMandatory; + cfg.valType = TAOS_CFG_VTYPE_INT8; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; + cfg.minValue = 0; + cfg.maxValue = 1; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + + // pContext in cache + cfg.option = "httpKeepAlive"; + cfg.ptr = &tsHttpKeepAlive; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG; + cfg.minValue = 3000; + cfg.maxValue = 3600000; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + // debug flag cfg.option = "numOfLogLines"; cfg.ptr = &tsNumOfLogLines; diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index ab2fcbea6a..99b30c9f50 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -159,6 +159,7 @@ int32_t dnodeInitSystem() { dnodeSetRunStatus(TSDB_RUN_STATUS_RUNING); moduleStart(); + tsDnodeStartTime = taosGetTimestampMs(); dnodeReportStep("TDengine", "initialized successfully", 1); dInfo("TDengine is initialized successfully"); diff --git a/src/inc/http.h b/src/inc/http.h index 0d4c386cbf..7333042641 100644 --- a/src/inc/http.h +++ b/src/inc/http.h @@ -22,7 +22,9 @@ extern "C" { #include -int32_t httpGetReqCount(); +int64_t httpGetReqCount(); +int32_t httpGetStatusCodeCount(int index); +int32_t httpClearStatusCodeCount(int index); int32_t httpInitSystem(); int32_t httpStartSystem(); void httpStopSystem(); diff --git a/src/inc/monitor.h b/src/inc/monitor.h index 1aefb0b848..8c7d9702b8 100644 --- a/src/inc/monitor.h +++ b/src/inc/monitor.h @@ -22,6 +22,12 @@ extern "C" { #include +typedef struct { + const char * name; + int32_t code; + int32_t index; +} SMonHttpStatus; + typedef struct { char * acctId; int64_t currentPointsPerSecond; @@ -54,6 +60,7 @@ void monCleanupSystem(); void monSaveAcctLog(SAcctMonitorObj *pMonObj); void monSaveLog(int32_t level, const char *const format, ...); void monExecuteSQL(char *sql); +SMonHttpStatus *monGetHttpStatusHashTableEntry(int32_t code); #ifdef __cplusplus } diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 619869efa5..efed1cb79b 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -75,6 +75,7 @@ int32_t* taosGetErrno(); //client #define TSDB_CODE_TSC_INVALID_SQL TAOS_DEF_ERROR_CODE(0, 0x0200) //"Invalid SQL statement") +#define TSDB_CODE_TSC_INVALID_OPERATION TSDB_CODE_TSC_INVALID_SQL #define TSDB_CODE_TSC_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0201) //"Invalid qhandle") #define TSDB_CODE_TSC_INVALID_TIME_STAMP TAOS_DEF_ERROR_CODE(0, 0x0202) //"Invalid combination of client/service time") #define TSDB_CODE_TSC_INVALID_VALUE TAOS_DEF_ERROR_CODE(0, 0x0203) //"Invalid value in client") @@ -394,6 +395,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_HTTP_OP_VALUE_NULL TAOS_DEF_ERROR_CODE(0, 0x11A5) //"value not find") #define TSDB_CODE_HTTP_OP_VALUE_TYPE TAOS_DEF_ERROR_CODE(0, 0x11A6) //"value type should be boolean number or string") +#define TSDB_CODE_HTTP_REQUEST_JSON_ERROR TAOS_DEF_ERROR_CODE(0, 0x1F00) //"http request json error" + // odbc #define TSDB_CODE_ODBC_OOM TAOS_DEF_ERROR_CODE(0, 0x2100) //"out of memory") #define TSDB_CODE_ODBC_CONV_CHAR_NOT_NUM TAOS_DEF_ERROR_CODE(0, 0x2101) //"convertion not a valid literal input") diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 48ceced000..3027c202d8 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -57,9 +57,11 @@ #define NANO_SECOND_ENABLED 0 #define SET_THREADNAME_ENABLED 0 +/*dkj #if SET_THREADNAME_ENABLED == 0 #define setThreadName(name) #endif +*/ #define REQ_EXTRA_BUF_LEN 1024 #define RESP_BUF_LEN 4096 diff --git a/src/os/inc/osDef.h b/src/os/inc/osDef.h index 07fb5c283b..0b1a54152f 100644 --- a/src/os/inc/osDef.h +++ b/src/os/inc/osDef.h @@ -112,6 +112,26 @@ extern "C" { #define threadlocal __declspec( thread ) #endif +// setThreadName +#if defined(_TD_LINUX_64) || defined(_TD_LINUX_32) || defined(_TD_MIPS_64) || defined(_TD_ARM_32) || defined(_TD_ARM_64) || defined(_TD_DARWIN_64) + #if defined(_TD_DARWIN_64) + // MacOS + #if !defined(_GNU_SOURCE) + #define setThreadName(name) do { pthread_setname_np((name)); } while (0) + #else + // pthread_setname_np not defined + #define setThreadName(name) + #endif + #else + // Linux, length of name must <= 16 (the last '\0' included) + #define setThreadName(name) do { prctl(PR_SET_NAME, (name)); } while (0) + #endif +#else + // Windows + #define setThreadName(name) +#endif + + #ifdef __cplusplus } #endif diff --git a/src/os/inc/osLinux32.h b/src/os/inc/osLinux32.h index eee4999399..722e063b36 100644 --- a/src/os/inc/osLinux32.h +++ b/src/os/inc/osLinux32.h @@ -78,6 +78,7 @@ extern "C" { #include #include #include +#include #define TAOS_OS_FUNC_LZ4 #define BUILDIN_CLZL(val) __builtin_clzll(val) diff --git a/src/os/inc/osLinux64.h b/src/os/inc/osLinux64.h index 0dfc819da3..9a841dd6c0 100644 --- a/src/os/inc/osLinux64.h +++ b/src/os/inc/osLinux64.h @@ -81,6 +81,7 @@ extern "C" { #endif #include #include +#include #ifdef __cplusplus } diff --git a/src/os/inc/osTime.h b/src/os/inc/osTime.h index 2c50e7eeab..d57b718bfe 100644 --- a/src/os/inc/osTime.h +++ b/src/os/inc/osTime.h @@ -86,6 +86,7 @@ int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* durati int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth); void deltaToUtcInitOnce(); +int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision); #ifdef __cplusplus } diff --git a/src/os/src/detail/osTime.c b/src/os/src/detail/osTime.c index 2956dd29ad..c1bd25c48c 100644 --- a/src/os/src/detail/osTime.c +++ b/src/os/src/detail/osTime.c @@ -293,6 +293,63 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) { return 0; } +int64_t convertTimePrecision(int64_t timeStamp, int32_t fromPrecision, int32_t toPrecision) { + assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || + fromPrecision == TSDB_TIME_PRECISION_MICRO || + fromPrecision == TSDB_TIME_PRECISION_NANO); + assert(toPrecision == TSDB_TIME_PRECISION_MILLI || + toPrecision == TSDB_TIME_PRECISION_MICRO || + toPrecision == TSDB_TIME_PRECISION_NANO); + double tempResult = (double)timeStamp; + switch(fromPrecision) { + case TSDB_TIME_PRECISION_MILLI: { + switch (toPrecision) { + case TSDB_TIME_PRECISION_MILLI: + return timeStamp; + case TSDB_TIME_PRECISION_MICRO: + tempResult *= 1000; + timeStamp *= 1000; + goto end_; + case TSDB_TIME_PRECISION_NANO: + tempResult *= 1000000; + timeStamp *= 1000000; + goto end_; + } + } // end from milli + case TSDB_TIME_PRECISION_MICRO: { + switch (toPrecision) { + case TSDB_TIME_PRECISION_MILLI: + return timeStamp / 1000; + case TSDB_TIME_PRECISION_MICRO: + return timeStamp; + case TSDB_TIME_PRECISION_NANO: + tempResult *= 1000; + timeStamp *= 1000; + goto end_; + } + } //end from micro + case TSDB_TIME_PRECISION_NANO: { + switch (toPrecision) { + case TSDB_TIME_PRECISION_MILLI: + return timeStamp / 1000000; + case TSDB_TIME_PRECISION_MICRO: + return timeStamp / 1000; + case TSDB_TIME_PRECISION_NANO: + return timeStamp; + } + } //end from nano + default: { + assert(0); + return timeStamp; // only to pass windows compilation + } + } //end switch fromPrecision +end_: + if (tempResult >= (double)INT64_MAX) return INT64_MAX; + if (tempResult <= (double)INT64_MIN) return INT64_MIN + 1; // INT64_MIN means NULL + return timeStamp; +} + + int32_t parseLocaltimeWithDst(char* timestr, int64_t* time, int32_t timePrec) { *time = 0; struct tm tm = {0}; diff --git a/src/plugins/http/CMakeLists.txt b/src/plugins/http/CMakeLists.txt index 57fc2ee3a2..f372bc66aa 100644 --- a/src/plugins/http/CMakeLists.txt +++ b/src/plugins/http/CMakeLists.txt @@ -1,4 +1,4 @@ -CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20) +CMAKE_MINIMUM_REQUIRED(VERSION 3.0...3.20) PROJECT(TDengine) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/zlib-1.2.11/inc) @@ -6,6 +6,7 @@ INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(src SRC) diff --git a/src/plugins/http/inc/httpInt.h b/src/plugins/http/inc/httpInt.h index 0a5822b908..2aa01244c6 100644 --- a/src/plugins/http/inc/httpInt.h +++ b/src/plugins/http/inc/httpInt.h @@ -37,11 +37,12 @@ #define HTTP_BUFFER_SIZE 8388608 #define HTTP_STEP_SIZE 4096 //http message get process step by step #define HTTP_METHOD_SCANNER_SIZE 7 //http method fp size -#define HTTP_GC_TARGET_SIZE 512 +#define HTTP_GC_TARGET_SIZE 16384 #define HTTP_WRITE_RETRY_TIMES 500 #define HTTP_WRITE_WAIT_TIME_MS 5 #define HTTP_PASSWORD_LEN TSDB_UNI_LEN #define HTTP_SESSION_ID_LEN (TSDB_USER_LEN + HTTP_PASSWORD_LEN) +#define HTTP_STATUS_CODE_NUM 63 typedef enum HttpReqType { HTTP_REQTYPE_OTHERS = 0, @@ -140,26 +141,29 @@ typedef enum { } EHTTP_CONTEXT_FAILED_CAUSE; typedef struct HttpContext { - int32_t refCount; - SOCKET fd; - uint32_t accessTimes; - uint32_t lastAccessTime; - int32_t state; - uint8_t reqType; - uint8_t parsed; - char ipstr[22]; - char user[TSDB_USER_LEN]; // parsed from auth token or login message - char pass[HTTP_PASSWORD_LEN]; - TAOS * taos; - void * ppContext; - HttpSession *session; - z_stream gzipStream; - HttpParser *parser; - HttpSqlCmd singleCmd; - HttpSqlCmds *multiCmds; - JsonBuf * jsonBuf; - HttpEncodeMethod *encodeMethod; - HttpDecodeMethod *decodeMethod; + int32_t refCount; + SOCKET fd; + uint32_t accessTimes; + uint32_t lastAccessTime; + int32_t state; + uint8_t reqType; + uint8_t parsed; + bool error; + char ipstr[22]; + char user[TSDB_USER_LEN]; // parsed from auth token or login message + char pass[HTTP_PASSWORD_LEN]; + char db[/*TSDB_ACCT_ID_LEN + */TSDB_DB_NAME_LEN]; + TAOS * taos; + void * ppContext; + pthread_mutex_t ctxMutex; + HttpSession *session; + z_stream gzipStream; + HttpParser *parser; + HttpSqlCmd singleCmd; + HttpSqlCmds *multiCmds; + JsonBuf *jsonBuf; + HttpEncodeMethod *encodeMethod; + HttpDecodeMethod *decodeMethod; struct HttpThread *pThread; } HttpContext; @@ -184,8 +188,9 @@ typedef struct HttpServer { SOCKET fd; int32_t numOfThreads; int32_t methodScannerLen; - int32_t requestNum; + int64_t requestNum; int32_t status; + int32_t statusCodeErrs[HTTP_STATUS_CODE_NUM]; pthread_t thread; HttpThread * pThreads; void * contextCache; diff --git a/src/plugins/http/inc/httpJson.h b/src/plugins/http/inc/httpJson.h index 4d182d0132..3595ad926f 100644 --- a/src/plugins/http/inc/httpJson.h +++ b/src/plugins/http/inc/httpJson.h @@ -64,8 +64,8 @@ void httpJsonOriginString(JsonBuf* buf, char* sVal, int32_t len); void httpJsonStringForTransMean(JsonBuf* buf, char* SVal, int32_t maxLen); void httpJsonInt64(JsonBuf* buf, int64_t num); void httpJsonUInt64(JsonBuf* buf, uint64_t num); -void httpJsonTimestamp(JsonBuf* buf, int64_t t, bool us); -void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, bool us); +void httpJsonTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision); +void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision); void httpJsonInt(JsonBuf* buf, int32_t num); void httpJsonUInt(JsonBuf* buf, uint32_t num); void httpJsonFloat(JsonBuf* buf, float num); diff --git a/src/plugins/http/inc/httpMetricsHandle.h b/src/plugins/http/inc/httpMetricsHandle.h new file mode 100644 index 0000000000..e05a8ce687 --- /dev/null +++ b/src/plugins/http/inc/httpMetricsHandle.h @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef TDENGINE_HTTPMETRICSHANDLE_H +#define TDENGINE_HTTPMETRICSHANDLE_H + +#include "http.h" +#include "httpInt.h" +#include "httpUtil.h" +#include "httpResp.h" + +void metricsInitHandle(HttpServer* httpServer); + +bool metricsProcessRequest(struct HttpContext* httpContext); + +#endif // TDENGINE_HTTPMETRICHANDLE_H diff --git a/src/plugins/http/inc/httpRestHandle.h b/src/plugins/http/inc/httpRestHandle.h index 632a1dc647..df405685e9 100644 --- a/src/plugins/http/inc/httpRestHandle.h +++ b/src/plugins/http/inc/httpRestHandle.h @@ -22,12 +22,12 @@ #include "httpResp.h" #include "httpSql.h" -#define REST_ROOT_URL_POS 0 -#define REST_ACTION_URL_POS 1 -#define REST_USER_URL_POS 2 -#define REST_PASS_URL_POS 3 +#define REST_ROOT_URL_POS 0 +#define REST_ACTION_URL_POS 1 +#define REST_USER_USEDB_URL_POS 2 +#define REST_PASS_URL_POS 3 void restInitHandle(HttpServer* pServer); bool restProcessRequest(struct HttpContext* pContext); -#endif \ No newline at end of file +#endif diff --git a/src/plugins/http/inc/httpSql.h b/src/plugins/http/inc/httpSql.h index db3e3a3b16..325545af47 100644 --- a/src/plugins/http/inc/httpSql.h +++ b/src/plugins/http/inc/httpSql.h @@ -35,4 +35,7 @@ void httpTrimTableName(char *name); int32_t httpShrinkTableName(HttpContext *pContext, int32_t pos, char *name); char * httpGetCmdsString(HttpContext *pContext, int32_t pos); +int32_t httpCheckAllocEscapeSql(char *oldSql, char **newSql); +void httpCheckFreeEscapedSql(char *oldSql, char *newSql); + #endif diff --git a/src/plugins/http/inc/httpUtil.h b/src/plugins/http/inc/httpUtil.h index 54c95b6980..21690ebca9 100644 --- a/src/plugins/http/inc/httpUtil.h +++ b/src/plugins/http/inc/httpUtil.h @@ -17,6 +17,7 @@ #define TDENGINE_HTTP_UTIL_H bool httpCheckUsedbSql(char *sql); +bool httpCheckAlterSql(char *sql); void httpTimeToString(int32_t t, char *buf, int32_t buflen); bool httpUrlMatch(HttpContext *pContext, int32_t pos, char *cmp); diff --git a/src/plugins/http/src/httpContext.c b/src/plugins/http/src/httpContext.c index 51adef11b9..4922d98ee4 100644 --- a/src/plugins/http/src/httpContext.c +++ b/src/plugins/http/src/httpContext.c @@ -67,6 +67,8 @@ static void httpDestroyContext(void *data) { pContext->parser = NULL; } + pthread_mutex_destroy(&pContext->ctxMutex); + tfree(pContext); } @@ -118,16 +120,19 @@ HttpContext *httpCreateContext(SOCKET fd) { pContext->lastAccessTime = taosGetTimestampSec(); pContext->state = HTTP_CONTEXT_STATE_READY; pContext->parser = httpCreateParser(pContext); + pContext->error = false; TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE)pContext; HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &pContext, - sizeof(TSDB_CACHE_PTR_TYPE), 3000); + sizeof(TSDB_CACHE_PTR_TYPE), tsHttpKeepAlive); pContext->ppContext = ppContext; httpDebug("context:%p, fd:%d, is created, data:%p", pContext, fd, ppContext); // set the ref to 0 taosCacheRelease(tsHttpServer.contextCache, (void **)&ppContext, false); + pthread_mutex_init(&pContext->ctxMutex, NULL); + return pContext; } @@ -188,11 +193,12 @@ void httpCloseContextByApp(HttpContext *pContext) { pContext->parsed = false; bool keepAlive = true; - if (parser && parser->httpVersion == HTTP_VERSION_10 && parser->keepAlive != HTTP_KEEPALIVE_ENABLE) { + if (pContext->error == true) { + keepAlive = false; + } else if (parser && parser->httpVersion == HTTP_VERSION_10 && parser->keepAlive != HTTP_KEEPALIVE_ENABLE) { keepAlive = false; } else if (parser && parser->httpVersion != HTTP_VERSION_10 && parser->keepAlive == HTTP_KEEPALIVE_DISABLE) { keepAlive = false; - } else { } if (keepAlive) { diff --git a/src/plugins/http/src/httpGcHandle.c b/src/plugins/http/src/httpGcHandle.c index 925c74e7cd..883afcc4ec 100644 --- a/src/plugins/http/src/httpGcHandle.c +++ b/src/plugins/http/src/httpGcHandle.c @@ -176,6 +176,16 @@ bool gcProcessQueryRequest(HttpContext* pContext) { return false; } +#define ESCAPE_ERROR_PROC(code, context, root) \ + do { \ + if (code != TSDB_CODE_SUCCESS) { \ + httpSendErrorResp(context, code); \ + \ + cJSON_Delete(root); \ + return false; \ + } \ + } while (0) + for (int32_t i = 0; i < size; ++i) { cJSON* query = cJSON_GetArrayItem(root, i); if (query == NULL) continue; @@ -186,7 +196,14 @@ bool gcProcessQueryRequest(HttpContext* pContext) { continue; } - int32_t refIdBuffer = httpAddToSqlCmdBuffer(pContext, refId->valuestring); + char *newStr = NULL; + int32_t retCode = 0; + + retCode = httpCheckAllocEscapeSql(refId->valuestring, &newStr); + ESCAPE_ERROR_PROC(retCode, pContext, root); + + int32_t refIdBuffer = httpAddToSqlCmdBuffer(pContext, newStr); + httpCheckFreeEscapedSql(refId->valuestring, newStr); if (refIdBuffer == -1) { httpWarn("context:%p, fd:%d, user:%s, refId buffer is full", pContext, pContext->fd, pContext->user); break; @@ -195,7 +212,11 @@ bool gcProcessQueryRequest(HttpContext* pContext) { cJSON* alias = cJSON_GetObjectItem(query, "alias"); int32_t aliasBuffer = -1; if (!(alias == NULL || alias->valuestring == NULL || strlen(alias->valuestring) == 0)) { - aliasBuffer = httpAddToSqlCmdBuffer(pContext, alias->valuestring); + retCode = httpCheckAllocEscapeSql(alias->valuestring, &newStr); + ESCAPE_ERROR_PROC(retCode, pContext, root); + + aliasBuffer = httpAddToSqlCmdBuffer(pContext, newStr); + httpCheckFreeEscapedSql(alias->valuestring, newStr); if (aliasBuffer == -1) { httpWarn("context:%p, fd:%d, user:%s, alias buffer is full", pContext, pContext->fd, pContext->user); break; @@ -211,7 +232,11 @@ bool gcProcessQueryRequest(HttpContext* pContext) { continue; } - int32_t sqlBuffer = httpAddToSqlCmdBuffer(pContext, sql->valuestring); + retCode = httpCheckAllocEscapeSql(sql->valuestring, &newStr); + ESCAPE_ERROR_PROC(retCode, pContext, root); + + int32_t sqlBuffer = httpAddToSqlCmdBuffer(pContext, newStr); + httpCheckFreeEscapedSql(sql->valuestring, newStr); if (sqlBuffer == -1) { httpWarn("context:%p, fd:%d, user:%s, sql buffer is full", pContext, pContext->fd, pContext->user); break; @@ -237,6 +262,8 @@ bool gcProcessQueryRequest(HttpContext* pContext) { } } +#undef ESCAPE_ERROR_PROC + pContext->reqType = HTTP_REQTYPE_MULTI_SQL; pContext->encodeMethod = &gcQueryMethod; pContext->multiCmds->pos = 0; diff --git a/src/plugins/http/src/httpGcJson.c b/src/plugins/http/src/httpGcJson.c index 397791706d..7e4658b364 100644 --- a/src/plugins/http/src/httpGcJson.c +++ b/src/plugins/http/src/httpGcJson.c @@ -130,14 +130,34 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, // for group by if (groupFields != -1) { char target[HTTP_GC_TARGET_SIZE] = {0}; - int32_t len; - len = snprintf(target, HTTP_GC_TARGET_SIZE, "%s{", aliasBuffer); + int32_t len = 0, cur = 0; + cur = snprintf(target, HTTP_GC_TARGET_SIZE, "%s{", aliasBuffer); + if (cur < 0 || cur >= HTTP_GC_TARGET_SIZE) { + httpError("context:%p, fd:%d, too long alias: %s", pContext, pContext->fd, aliasBuffer); + return false; + } + + len += cur; for (int32_t i = dataFields + 1; i < num_fields; i++) { + // -2 means the last '}' and '\0' +#define HTTP_GC_CHECK_SIZE(name) if (cur < 0 || cur >= HTTP_GC_TARGET_SIZE - len - 2) { \ + if (cur < 0) { \ + httpError("context:%p, fd:%d, failed to snprintf for: %s", pContext, pContext->fd, name); \ + } else { \ + httpError("context:%p, fd:%d, snprintf overflow for: %s", pContext, pContext->fd, name); \ + target[len] = '\0'; \ + } \ + break; \ + } else { \ + len += cur; \ + } if (row[i] == NULL) { - len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:nil", fields[i].name); + cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 2, "%s:nil", fields[i].name); + HTTP_GC_CHECK_SIZE(fields[i].name) if (i < num_fields - 1) { - len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, ", "); + cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 2, ", "); + HTTP_GC_CHECK_SIZE(fields[i].name) } continue; @@ -146,40 +166,49 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, switch (fields[i].type) { case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_TINYINT: - len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%d", fields[i].name, *((int8_t *)row[i])); + cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 2, "%s:%d", fields[i].name, *((int8_t *)row[i])); + HTTP_GC_CHECK_SIZE(fields[i].name) break; case TSDB_DATA_TYPE_SMALLINT: - len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%d", fields[i].name, *((int16_t *)row[i])); + cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 2, "%s:%d", fields[i].name, *((int16_t *)row[i])); + HTTP_GC_CHECK_SIZE(fields[i].name) break; case TSDB_DATA_TYPE_INT: - len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%d,", fields[i].name, *((int32_t *)row[i])); + cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 2, "%s:%d,", fields[i].name, *((int32_t *)row[i])); + HTTP_GC_CHECK_SIZE(fields[i].name) break; case TSDB_DATA_TYPE_BIGINT: - len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%" PRId64, fields[i].name, *((int64_t *)row[i])); + cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 2, "%s:%" PRId64, fields[i].name, *((int64_t *)row[i])); + HTTP_GC_CHECK_SIZE(fields[i].name) break; case TSDB_DATA_TYPE_FLOAT: - len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%.5f", fields[i].name, GET_FLOAT_VAL(row[i])); + cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 2, "%s:%.5f", fields[i].name, GET_FLOAT_VAL(row[i])); + HTTP_GC_CHECK_SIZE(fields[i].name) break; case TSDB_DATA_TYPE_DOUBLE: - len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%.9f", fields[i].name, GET_DOUBLE_VAL(row[i])); + cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 2, "%s:%.9f", fields[i].name, GET_DOUBLE_VAL(row[i])); + HTTP_GC_CHECK_SIZE(fields[i].name) break; case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: if (row[i] != NULL) { - len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:", fields[i].name); - memcpy(target + len, (char *)row[i], length[i]); + cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 2, "%s:", fields[i].name); + HTTP_GC_CHECK_SIZE(fields[i].name) + memcpy(target + len, (char *)row[i], MIN(length[i], HTTP_GC_TARGET_SIZE - len - 3)); len = (int32_t)strlen(target); } break; default: - len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%s", fields[i].name, "-"); + cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 2, "%s:%s", fields[i].name, "-"); + HTTP_GC_CHECK_SIZE(fields[i].name) break; } if (i < num_fields - 1) { - len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, ", "); + cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 2, ", "); + HTTP_GC_CHECK_SIZE(fields[i].name) } } - len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "}"); + cur = snprintf(target + len, HTTP_GC_TARGET_SIZE - len - 1, "}"); if (strcmp(target, targetBuffer) != 0) { // first target not write this section @@ -199,7 +228,7 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, for (int32_t i = dataFields; i >= 0; i--) { httpJsonItemToken(jsonBuf); - if (row[i] == NULL) { + if (row == NULL || i >= num_fields || row[i] == NULL) { httpJsonOriginString(jsonBuf, "null", 4); continue; } @@ -228,13 +257,11 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, case TSDB_DATA_TYPE_NCHAR: httpJsonStringForTransMean(jsonBuf, (char *)row[i], fields[i].bytes); break; - case TSDB_DATA_TYPE_TIMESTAMP: - if (precision == TSDB_TIME_PRECISION_MILLI) { // ms - httpJsonInt64(jsonBuf, *((int64_t *)row[i])); - } else { - httpJsonInt64(jsonBuf, *((int64_t *)row[i]) / 1000); - } + case TSDB_DATA_TYPE_TIMESTAMP: { + int64_t ts = convertTimePrecision(*((int64_t *)row[i]), precision, TSDB_TIME_PRECISION_MILLI); + httpJsonInt64(jsonBuf, ts); break; + } default: httpJsonString(jsonBuf, "-", 1); break; diff --git a/src/plugins/http/src/httpGzip.c b/src/plugins/http/src/httpGzip.c index ecda0e1fe0..6a6e995c18 100644 --- a/src/plugins/http/src/httpGzip.c +++ b/src/plugins/http/src/httpGzip.c @@ -132,10 +132,10 @@ int32_t ehttp_gzip_write(ehttp_gzip_t *gzip, const char *buf, int32_t len) { if (ret != Z_STREAM_END) continue; } - int32_t len = (int32_t)(gzip->gzip->next_out - (z_const Bytef *)gzip->chunk); + int32_t _len = (int32_t)(gzip->gzip->next_out - (z_const Bytef *)gzip->chunk); gzip->gzip->next_out[0] = '\0'; - gzip->callbacks.on_data(gzip, gzip->arg, gzip->chunk, len); + gzip->callbacks.on_data(gzip, gzip->arg, gzip->chunk, _len); gzip->gzip->next_out = (z_const Bytef *)gzip->chunk; gzip->gzip->avail_out = gzip->conf.chunk_size; } diff --git a/src/plugins/http/src/httpHandle.c b/src/plugins/http/src/httpHandle.c index d51c774ff2..6f77994593 100644 --- a/src/plugins/http/src/httpHandle.c +++ b/src/plugins/http/src/httpHandle.c @@ -35,6 +35,7 @@ bool httpProcessData(HttpContext* pContext) { if (!httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_HANDLING)) { httpTrace("context:%p, fd:%d, state:%s not in ready state, stop process request", pContext, pContext->fd, httpContextStateStr(pContext->state)); + pContext->error = true; httpCloseContextByApp(pContext); return false; } @@ -44,15 +45,14 @@ bool httpProcessData(HttpContext* pContext) { httpTrace("context:%p, fd:%d, process options request", pContext, pContext->fd); httpSendOptionResp(pContext, "process options request success"); } else { - if (!httpDecodeRequest(pContext)) { - /* - * httpCloseContextByApp has been called when parsing the error - */ - // httpCloseContextByApp(pContext); - } else { + pthread_mutex_lock(&pContext->ctxMutex); + + if (httpDecodeRequest(pContext)) { httpClearParser(pContext->parser); httpProcessRequest(pContext); } + + pthread_mutex_unlock(&pContext->ctxMutex); } return true; diff --git a/src/plugins/http/src/httpJson.c b/src/plugins/http/src/httpJson.c index 19166e720f..86e0f2f40b 100644 --- a/src/plugins/http/src/httpJson.c +++ b/src/plugins/http/src/httpJson.c @@ -262,42 +262,112 @@ void httpJsonUInt64(JsonBuf* buf, uint64_t num) { buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%" PRIu64, num); } -void httpJsonTimestamp(JsonBuf* buf, int64_t t, bool us) { +void httpJsonTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision) { char ts[35] = {0}; - struct tm* ptm; - int32_t precision = 1000; - if (us) { - precision = 1000000; - } + + int32_t fractionLen; + char* format = NULL; + time_t quot = 0; + int64_t mod = 0; + + switch (timePrecision) { + case TSDB_TIME_PRECISION_MILLI: { + mod = ((t) % 1000 + 1000) % 1000; + if (t < 0 && mod != 0) { + t -= 1000; + } + quot = t / 1000; + fractionLen = 5; + format = ".%03" PRId64; + break; + } - time_t tt = t / precision; - ptm = localtime(&tt); - int32_t length = (int32_t)strftime(ts, 35, "%Y-%m-%d %H:%M:%S", ptm); - if (us) { - length += snprintf(ts + length, 8, ".%06" PRId64, t % precision); - } else { - length += snprintf(ts + length, 5, ".%03" PRId64, t % precision); + case TSDB_TIME_PRECISION_MICRO: { + mod = ((t) % 1000000 + 1000000) % 1000000; + if (t < 0 && mod != 0) { + t -= 1000000; + } + quot = t / 1000000; + fractionLen = 8; + format = ".%06" PRId64; + break; + } + + case TSDB_TIME_PRECISION_NANO: { + mod = ((t) % 1000000000 + 1000000000) % 1000000000; + if (t < 0 && mod != 0) { + t -= 1000000000; + } + quot = t / 1000000000; + fractionLen = 11; + format = ".%09" PRId64; + break; + } + + default: + fractionLen = 0; + assert(false); } + struct tm ptm = {0}; + localtime_r(", &ptm); + int32_t length = (int32_t)strftime(ts, 35, "%Y-%m-%d %H:%M:%S", &ptm); + length += snprintf(ts + length, fractionLen, format, mod); + httpJsonString(buf, ts, length); } -void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, bool us) { +void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision) { char ts[40] = {0}; struct tm* ptm; - int32_t precision = 1000; - if (us) { - precision = 1000000; + + int32_t fractionLen; + char* format = NULL; + time_t quot = 0; + long mod = 0; + + switch (timePrecision) { + case TSDB_TIME_PRECISION_MILLI: { + mod = ((t) % 1000 + 1000) % 1000; + if (t < 0 && mod != 0) { + t -= 1000; + } + quot = t / 1000; + fractionLen = 5; + format = ".%03" PRId64; + break; + } + + case TSDB_TIME_PRECISION_MICRO: { + mod = ((t) % 1000000 + 1000000) % 1000000; + if (t < 0 && mod != 0) { + t -= 1000000; + } + quot = t / 1000000; + fractionLen = 8; + format = ".%06" PRId64; + break; + } + + case TSDB_TIME_PRECISION_NANO: { + mod = ((t) % 1000000000 + 1000000000) % 1000000000; + if (t < 0 && mod != 0) { + t -= 1000000000; + } + quot = t / 1000000000; + fractionLen = 11; + format = ".%09" PRId64; + break; + } + + default: + fractionLen = 0; + assert(false); } - time_t tt = t / precision; - ptm = localtime(&tt); + ptm = localtime("); int32_t length = (int32_t)strftime(ts, 40, "%Y-%m-%dT%H:%M:%S", ptm); - if (us) { - length += snprintf(ts + length, 8, ".%06" PRId64, t % precision); - } else { - length += snprintf(ts + length, 5, ".%03" PRId64, t % precision); - } + length += snprintf(ts + length, fractionLen, format, mod); length += (int32_t)strftime(ts + length, 40 - length, "%z", ptm); httpJsonString(buf, ts, length); diff --git a/src/plugins/http/src/httpMetricsHandle.c b/src/plugins/http/src/httpMetricsHandle.c new file mode 100644 index 0000000000..045f660346 --- /dev/null +++ b/src/plugins/http/src/httpMetricsHandle.c @@ -0,0 +1,184 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#define _DEFAULT_SOURCE +#include "os.h" +#include "taoserror.h" +#include "tfs.h" + +#include "httpMetricsHandle.h" +#include "dnode.h" +#include "httpLog.h" + +static HttpDecodeMethod metricsDecodeMethod = {"metrics", metricsProcessRequest}; + +void metricsInitHandle(HttpServer* pServer) { + httpAddMethod(pServer, &metricsDecodeMethod); +} + +bool metricsProcessRequest(HttpContext* pContext) { + httpDebug("context:%p, fd:%d, user:%s, process admin grant msg", pContext, pContext->fd, pContext->user); + + JsonBuf* jsonBuf = httpMallocJsonBuf(pContext); + if (jsonBuf == NULL) { + httpError("failed to allocate memory for metrics"); + httpSendErrorResp(pContext, TSDB_CODE_HTTP_NO_ENOUGH_MEMORY); + return false; + } + + httpInitJsonBuf(jsonBuf, pContext); + httpWriteJsonBufHead(jsonBuf); + + httpJsonToken(jsonBuf, JsonObjStt); + { + char* keyDisks = "tags"; + httpJsonPairHead(jsonBuf, keyDisks, (int32_t)strlen(keyDisks)); + httpJsonToken(jsonBuf, JsonArrStt); + { + httpJsonItemToken(jsonBuf); + httpJsonToken(jsonBuf, JsonObjStt); + char* keyTagName = "name"; + char* keyTagValue = "value"; + httpJsonPairOriginString(jsonBuf, keyTagName, (int32_t)strlen(keyTagName), "\"dnode_id\"", + (int32_t)strlen("\"dnode_id\"")); + int32_t dnodeId = dnodeGetDnodeId(); + httpJsonPairIntVal(jsonBuf, keyTagValue, (int32_t)strlen(keyTagValue), dnodeId); + httpJsonToken(jsonBuf, JsonObjEnd); + } + httpJsonToken(jsonBuf, JsonArrEnd); + } + + { + if (tsDnodeStartTime != 0) { + int64_t now = taosGetTimestampMs(); + int64_t upTime = now-tsDnodeStartTime; + char* keyUpTime = "up_time"; + httpJsonPairInt64Val(jsonBuf, keyUpTime, (int32_t)strlen(keyUpTime), upTime); + } + } +/* + { + int32_t cpuCores = taosGetCpuCores(); + char* keyCpuCores = "cpu_cores"; + httpJsonPairIntVal(jsonBuf, keyCpuCores, (int32_t)strlen(keyCpuCores), cpuCores); + + float sysCpuUsage = 0; + float procCpuUsage = 0; + bool succeeded = taosGetCpuUsage(&sysCpuUsage, &procCpuUsage); + if (!succeeded) { + httpError("failed to get cpu usage"); + } else { + if (sysCpuUsage <= procCpuUsage) { + sysCpuUsage = procCpuUsage + 0.1f; + } + char* keyCpuSystem = "cpu_system"; + char* keyCpuEngine = "cpu_engine"; + httpJsonPairFloatVal(jsonBuf, keyCpuSystem, (int32_t)strlen(keyCpuSystem), sysCpuUsage); + httpJsonPairFloatVal(jsonBuf, keyCpuEngine, (int32_t)strlen(keyCpuEngine), procCpuUsage); + } + } + + { + float sysMemoryUsedMB = 0; + bool succeeded = taosGetSysMemory(&sysMemoryUsedMB); + if (!succeeded) { + httpError("failed to get sys memory info"); + } else { + char* keyMemSystem = "mem_system"; + httpJsonPairFloatVal(jsonBuf, keyMemSystem, (int32_t)strlen(keyMemSystem), sysMemoryUsedMB); + } + + float procMemoryUsedMB = 0; + succeeded = taosGetProcMemory(&procMemoryUsedMB); + if (!succeeded) { + httpError("failed to get proc memory info"); + } else { + char* keyMemEngine = "mem_engine"; + httpJsonPairFloatVal(jsonBuf, keyMemEngine, (int32_t)strlen(keyMemEngine), procMemoryUsedMB); + } + } + + { + int64_t bytes = 0, rbytes = 0, tbytes = 0; + bool succeeded = taosGetCardInfo(&bytes, &rbytes, &tbytes); + if (!succeeded) { + httpError("failed to get network info"); + } else { + char* keyNetIn = "net_in"; + char* keyNetOut = "net_out"; + httpJsonPairInt64Val(jsonBuf, keyNetIn, (int32_t)strlen(keyNetIn), rbytes); + httpJsonPairInt64Val(jsonBuf, keyNetOut, (int32_t)strlen(keyNetOut), tbytes); + } + } + + { + int64_t rchars = 0, rbytes = 0; + int64_t wchars = 0, wbytes = 0; + bool succeeded = taosReadProcIO(&rchars, &wchars, &rbytes, &wbytes); + if (!succeeded) { + httpError("failed to get io info"); + } else { + char* keyIORead = "io_read"; + char* keyIOWrite = "io_write"; + httpJsonPairInt64Val(jsonBuf, keyIORead, (int32_t)strlen(keyIORead), rchars); + httpJsonPairInt64Val(jsonBuf, keyIOWrite, (int32_t)strlen(keyIOWrite), wchars); + } + } + + { + const int8_t numTiers = 3; + SFSMeta fsMeta; + STierMeta* tierMetas = calloc(numTiers, sizeof(STierMeta)); + tfsUpdateInfo(&fsMeta, tierMetas, numTiers); + { + char* keyDiskUsed = "disk_used"; + char* keyDiskTotal = "disk_total"; + httpJsonPairInt64Val(jsonBuf, keyDiskTotal, (int32_t)strlen(keyDiskTotal), fsMeta.tsize); + httpJsonPairInt64Val(jsonBuf, keyDiskUsed, (int32_t)strlen(keyDiskUsed), fsMeta.used); + char* keyDisks = "disks"; + httpJsonPairHead(jsonBuf, keyDisks, (int32_t)strlen(keyDisks)); + httpJsonToken(jsonBuf, JsonArrStt); + for (int i = 0; i < numTiers; ++i) { + httpJsonItemToken(jsonBuf); + httpJsonToken(jsonBuf, JsonObjStt); + char* keyDataDirLevelUsed = "datadir_used"; + char* keyDataDirLevelTotal = "datadir_total"; + httpJsonPairInt64Val(jsonBuf, keyDataDirLevelUsed, (int32_t)strlen(keyDataDirLevelUsed), tierMetas[i].used); + httpJsonPairInt64Val(jsonBuf, keyDataDirLevelTotal, (int32_t)strlen(keyDataDirLevelTotal), tierMetas[i].size); + httpJsonToken(jsonBuf, JsonObjEnd); + } + httpJsonToken(jsonBuf, JsonArrEnd); + } + free(tierMetas); + } + + { + SDnodeStatisInfo info = dnodeGetStatisInfo(); + { + char* keyReqHttp = "req_http"; + char* keyReqSelect = "req_select"; + char* keyReqInsert = "req_insert"; + httpJsonPairInt64Val(jsonBuf, keyReqHttp, (int32_t)strlen(keyReqHttp), info.httpReqNum); + httpJsonPairInt64Val(jsonBuf, keyReqSelect, (int32_t)strlen(keyReqSelect), info.queryReqNum); + httpJsonPairInt64Val(jsonBuf, keyReqInsert, (int32_t)strlen(keyReqInsert), info.submitReqNum); + } + } +*/ + httpJsonToken(jsonBuf, JsonObjEnd); + + httpWriteJsonBufEnd(jsonBuf); + pContext->reqType = HTTP_REQTYPE_OTHERS; + httpFreeJsonBuf(pContext); + return false; +} diff --git a/src/plugins/http/src/httpParser.c b/src/plugins/http/src/httpParser.c index 02f21037b8..7c68e6c98e 100644 --- a/src/plugins/http/src/httpParser.c +++ b/src/plugins/http/src/httpParser.c @@ -186,7 +186,7 @@ static int32_t httpOnRequestLine(HttpParser *pParser, char *method, char *target if (pContext->decodeMethod != NULL) { httpTrace("context:%p, fd:%d, decode method is %s", pContext, pContext->fd, pContext->decodeMethod->module); } else { - httpError("context:%p, fd:%d, the url is not support, target:%s", pContext, pContext->fd, target); + httpError("context:%p, fd:%d, the url is not supported, target:%s", pContext, pContext->fd, target); httpOnError(pParser, 0, TSDB_CODE_HTTP_UNSUPPORT_URL); return -1; } @@ -663,7 +663,7 @@ static int32_t httpParserOnTarget(HttpParser *parser, HTTP_PARSER_STATE state, c HttpContext *pContext = parser->pContext; int32_t ok = 0; do { - if (!isspace(c) && c != '\r' && c != '\n') { + if (!isspace(c)) { if (httpAppendString(&parser->str, &c, 1)) { httpError("context:%p, fd:%d, parser state:%d, char:[%c]%02x, oom", pContext, pContext->fd, state, c, c); ok = -1; @@ -763,9 +763,9 @@ static int32_t httpParserOnSp(HttpParser *parser, HTTP_PARSER_STATE state, const httpPopStack(parser); break; } - httpError("context:%p, fd:%d, parser state:%d, char:[%c]%02x, oom", pContext, pContext->fd, state, c, c); + httpError("context:%p, fd:%d, parser state:%d, char:[%c]%02x", pContext, pContext->fd, state, c, c); ok = -1; - httpOnError(parser, HTTP_CODE_INSUFFICIENT_STORAGE, TSDB_CODE_HTTP_PARSE_SP_FAILED); + httpOnError(parser, HTTP_CODE_BAD_REQUEST, TSDB_CODE_HTTP_PARSE_SP_FAILED); } while (0); return ok; } @@ -837,7 +837,7 @@ static int32_t httpParserPostProcess(HttpParser *parser) { if (parser->gzip) { if (ehttp_gzip_finish(parser->gzip)) { httpError("context:%p, fd:%d, gzip failed", pContext, pContext->fd); - httpOnError(parser, HTTP_CODE_INSUFFICIENT_STORAGE, TSDB_CODE_HTTP_FINISH_GZIP_FAILED); + httpOnError(parser, HTTP_CODE_INTERNAL_SERVER_ERROR, TSDB_CODE_HTTP_FINISH_GZIP_FAILED); return -1; } } @@ -1040,7 +1040,7 @@ static int32_t httpParserOnChunk(HttpParser *parser, HTTP_PARSER_STATE state, co if (ehttp_gzip_write(parser->gzip, parser->str.str, parser->str.pos)) { httpError("context:%p, fd:%d, gzip failed", pContext, pContext->fd); ok = -1; - httpOnError(parser, HTTP_CODE_INSUFFICIENT_STORAGE, TSDB_CODE_HTTP_PARSE_CHUNK_FAILED); + httpOnError(parser, HTTP_CODE_INTERNAL_SERVER_ERROR, TSDB_CODE_HTTP_PARSE_CHUNK_FAILED); break; } } else { @@ -1062,7 +1062,7 @@ static int32_t httpParserOnEnd(HttpParser *parser, HTTP_PARSER_STATE state, cons do { ok = -1; httpError("context:%p, fd:%d, parser state:%d, unexpected char:[%c]%02x", pContext, pContext->fd, state, c, c); - httpOnError(parser, HTTP_CODE_INSUFFICIENT_STORAGE, TSDB_CODE_HTTP_PARSE_END_FAILED); + httpOnError(parser, HTTP_CODE_BAD_REQUEST, TSDB_CODE_HTTP_PARSE_END_FAILED); } while (0); return ok; } diff --git a/src/plugins/http/src/httpQueue.c b/src/plugins/http/src/httpQueue.c index 7f7ce40460..677ab0c91d 100644 --- a/src/plugins/http/src/httpQueue.c +++ b/src/plugins/http/src/httpQueue.c @@ -70,6 +70,8 @@ static void *httpProcessResultQueue(void *param) { int32_t type; void * unUsed; + setThreadName("httpResultQ"); + while (1) { if (taosReadQitemFromQset(tsHttpQset, &type, (void **)&pMsg, &unUsed) == 0) { httpDebug("qset:%p, http queue got no message from qset, exiting", tsHttpQset); diff --git a/src/plugins/http/src/httpResp.c b/src/plugins/http/src/httpResp.c index 063f2bb04e..2c18904d2a 100644 --- a/src/plugins/http/src/httpResp.c +++ b/src/plugins/http/src/httpResp.c @@ -21,10 +21,11 @@ #include "httpResp.h" #include "httpJson.h" #include "httpContext.h" +#include "monitor.h" const char *httpKeepAliveStr[] = {"", "Connection: Keep-Alive\r\n", "Connection: Close\r\n"}; -const char *httpVersionStr[] = {"HTTP/1.0", "HTTP/1.1", "HTTP/1.2"}; +const char *httpVersionStr[] = {"HTTP/1.0", "HTTP/1.1", "HTTP/2.0"}; /* There is no version 1.2 */ const char *httpRespTemplate[] = { // HTTP_RESPONSE_JSON_OK @@ -52,8 +53,14 @@ static void httpSendErrorRespImp(HttpContext *pContext, int32_t httpCode, char * int8_t httpVersion = 0; int8_t keepAlive = 0; + if (pContext->parser != NULL) { httpVersion = pContext->parser->httpVersion; + } + + if (pContext->error == true) { + keepAlive = HTTP_KEEPALIVE_DISABLE; + } else if (pContext->parser != NULL) { keepAlive = pContext->parser->keepAlive; } @@ -67,92 +74,101 @@ static void httpSendErrorRespImp(HttpContext *pContext, int32_t httpCode, char * } void httpSendErrorResp(HttpContext *pContext, int32_t errNo) { - int32_t httpCode = 500; + int32_t httpCode = HTTP_CODE_INTERNAL_SERVER_ERROR; if (errNo == TSDB_CODE_SUCCESS) - httpCode = 200; + httpCode = HTTP_CODE_OK; else if (errNo == TSDB_CODE_HTTP_SERVER_OFFLINE) - httpCode = 404; + httpCode = HTTP_CODE_NOT_FOUND; else if (errNo == TSDB_CODE_HTTP_UNSUPPORT_URL) - httpCode = 404; + httpCode = HTTP_CODE_BAD_REQUEST; else if (errNo == TSDB_CODE_HTTP_INVALID_URL) - httpCode = 404; + httpCode = HTTP_CODE_BAD_REQUEST; else if (errNo == TSDB_CODE_HTTP_NO_ENOUGH_MEMORY) - httpCode = 507; + httpCode = HTTP_CODE_INSUFFICIENT_STORAGE; else if (errNo == TSDB_CODE_HTTP_REQUSET_TOO_BIG) - httpCode = 413; + httpCode = HTTP_CODE_PAYLOAD_TOO_LARGE; else if (errNo == TSDB_CODE_HTTP_NO_AUTH_INFO) - httpCode = 401; + httpCode = HTTP_CODE_UNAUTHORIZED; else if (errNo == TSDB_CODE_HTTP_NO_MSG_INPUT) - httpCode = 400; + httpCode = HTTP_CODE_BAD_REQUEST; else if (errNo == TSDB_CODE_HTTP_NO_SQL_INPUT) - httpCode = 400; + httpCode = HTTP_CODE_BAD_REQUEST; else if (errNo == TSDB_CODE_HTTP_NO_EXEC_USEDB) - httpCode = 400; + httpCode = HTTP_CODE_BAD_REQUEST; else if (errNo == TSDB_CODE_HTTP_SESSION_FULL) - httpCode = 421; + httpCode = HTTP_CODE_INTERNAL_SERVER_ERROR; else if (errNo == TSDB_CODE_HTTP_GEN_TAOSD_TOKEN_ERR) - httpCode = 507; + httpCode = HTTP_CODE_INTERNAL_SERVER_ERROR; else if (errNo == TSDB_CODE_HTTP_INVALID_MULTI_REQUEST) - httpCode = 400; + httpCode = HTTP_CODE_BAD_REQUEST; else if (errNo == TSDB_CODE_HTTP_CREATE_GZIP_FAILED) - httpCode = 507; + httpCode = HTTP_CODE_INTERNAL_SERVER_ERROR; else if (errNo == TSDB_CODE_HTTP_FINISH_GZIP_FAILED) - httpCode = 507; + httpCode = HTTP_CODE_INTERNAL_SERVER_ERROR; else if (errNo == TSDB_CODE_HTTP_INVALID_VERSION) - httpCode = 406; + httpCode = HTTP_CODE_HTTP_VER_NOT_SUPPORTED; else if (errNo == TSDB_CODE_HTTP_INVALID_CONTENT_LENGTH) - httpCode = 406; + httpCode = HTTP_CODE_LENGTH_REQUIRED; else if (errNo == TSDB_CODE_HTTP_INVALID_AUTH_TYPE) - httpCode = 406; + httpCode = HTTP_CODE_UNAUTHORIZED; else if (errNo == TSDB_CODE_HTTP_INVALID_AUTH_FORMAT) - httpCode = 406; + httpCode = HTTP_CODE_UNAUTHORIZED; else if (errNo == TSDB_CODE_HTTP_INVALID_BASIC_AUTH) - httpCode = 406; + httpCode = HTTP_CODE_UNAUTHORIZED; else if (errNo == TSDB_CODE_HTTP_INVALID_TAOSD_AUTH) - httpCode = 406; + httpCode = HTTP_CODE_UNAUTHORIZED; else if (errNo == TSDB_CODE_HTTP_PARSE_METHOD_FAILED) - httpCode = 406; + httpCode = HTTP_CODE_BAD_REQUEST; else if (errNo == TSDB_CODE_HTTP_PARSE_TARGET_FAILED) - httpCode = 406; + httpCode = HTTP_CODE_BAD_REQUEST; else if (errNo == TSDB_CODE_HTTP_PARSE_VERSION_FAILED) - httpCode = 406; + httpCode = HTTP_CODE_BAD_REQUEST; else if (errNo == TSDB_CODE_HTTP_PARSE_SP_FAILED) - httpCode = 406; + httpCode = HTTP_CODE_BAD_REQUEST; else if (errNo == TSDB_CODE_HTTP_PARSE_STATUS_FAILED) - httpCode = 406; + httpCode = HTTP_CODE_BAD_REQUEST; else if (errNo == TSDB_CODE_HTTP_PARSE_PHRASE_FAILED) - httpCode = 406; + httpCode = HTTP_CODE_BAD_REQUEST; else if (errNo == TSDB_CODE_HTTP_PARSE_CRLF_FAILED) - httpCode = 406; + httpCode = HTTP_CODE_BAD_REQUEST; else if (errNo == TSDB_CODE_HTTP_PARSE_HEADER_FAILED) - httpCode = 406; + httpCode = HTTP_CODE_BAD_REQUEST; else if (errNo == TSDB_CODE_HTTP_PARSE_HEADER_KEY_FAILED) - httpCode = 406; + httpCode = HTTP_CODE_BAD_REQUEST; else if (errNo == TSDB_CODE_HTTP_PARSE_HEADER_VAL_FAILED) - httpCode = 406; + httpCode = HTTP_CODE_BAD_REQUEST; else if (errNo == TSDB_CODE_HTTP_PARSE_CHUNK_SIZE_FAILED) - httpCode = 406; + httpCode = HTTP_CODE_BAD_REQUEST; else if (errNo == TSDB_CODE_HTTP_PARSE_CHUNK_FAILED) - httpCode = 406; + httpCode = HTTP_CODE_BAD_REQUEST; else if (errNo == TSDB_CODE_HTTP_PARSE_END_FAILED) - httpCode = 406; + httpCode = HTTP_CODE_BAD_REQUEST; else if (errNo == TSDB_CODE_HTTP_PARSE_INVALID_STATE) - httpCode = 406; + httpCode = HTTP_CODE_BAD_REQUEST; else if (errNo == TSDB_CODE_HTTP_PARSE_ERROR_STATE) - httpCode = 406; + httpCode = HTTP_CODE_BAD_REQUEST; else - httpCode = 400; + httpCode = HTTP_CODE_BAD_REQUEST; if (pContext->parser && pContext->parser->httpCode != 0) { httpCode = pContext->parser->httpCode; } + HttpServer *pServer = &tsHttpServer; + SMonHttpStatus *httpStatus = monGetHttpStatusHashTableEntry(httpCode); + // FIXME(@huolinhe): I don't known why the errors index is overflowed, but fix it by index check + if (httpStatus->index < HTTP_STATUS_CODE_NUM) { + pServer->statusCodeErrs[httpStatus->index] += 1; + } + + pContext->error = true; + char *httpCodeStr = httpGetStatusDesc(httpCode); httpSendErrorRespImp(pContext, httpCode, httpCodeStr, errNo & 0XFFFF, tstrerror(errNo)); } void httpSendTaosdInvalidSqlErrorResp(HttpContext *pContext, char *errMsg) { - int32_t httpCode = 400; + int32_t httpCode = HTTP_CODE_BAD_REQUEST; char temp[512] = {0}; int32_t len = sprintf(temp, "invalid SQL: %s", errMsg); @@ -165,7 +181,7 @@ void httpSendTaosdInvalidSqlErrorResp(HttpContext *pContext, char *errMsg) { } } - httpSendErrorRespImp(pContext, httpCode, "Bad Request", TSDB_CODE_TSC_INVALID_SQL & 0XFFFF, temp); + httpSendErrorRespImp(pContext, httpCode, "Bad Request", TSDB_CODE_TSC_INVALID_OPERATION & 0XFFFF, temp); } void httpSendSuccResp(HttpContext *pContext, char *desc) { diff --git a/src/plugins/http/src/httpRestHandle.c b/src/plugins/http/src/httpRestHandle.c index 8728b9e37a..24e4f90244 100644 --- a/src/plugins/http/src/httpRestHandle.c +++ b/src/plugins/http/src/httpRestHandle.c @@ -19,6 +19,7 @@ #include "httpLog.h" #include "httpRestHandle.h" #include "httpRestJson.h" +#include "tglobal.h" static HttpDecodeMethod restDecodeMethod = {"rest", restProcessRequest}; static HttpDecodeMethod restDecodeMethod2 = {"restful", restProcessRequest}; @@ -62,11 +63,11 @@ void restInitHandle(HttpServer* pServer) { bool restGetUserFromUrl(HttpContext* pContext) { HttpParser* pParser = pContext->parser; - if (pParser->path[REST_USER_URL_POS].pos >= TSDB_USER_LEN || pParser->path[REST_USER_URL_POS].pos <= 0) { + if (pParser->path[REST_USER_USEDB_URL_POS].pos >= TSDB_USER_LEN || pParser->path[REST_USER_USEDB_URL_POS].pos <= 0) { return false; } - tstrncpy(pContext->user, pParser->path[REST_USER_URL_POS].str, TSDB_USER_LEN); + tstrncpy(pContext->user, pParser->path[REST_USER_USEDB_URL_POS].str, TSDB_USER_LEN); return true; } @@ -107,6 +108,24 @@ bool restProcessSqlRequest(HttpContext* pContext, int32_t timestampFmt) { HttpSqlCmd* cmd = &(pContext->singleCmd); cmd->nativSql = sql; + /* find if there is db_name in url */ + pContext->db[0] = '\0'; + + HttpString *path = &pContext->parser->path[REST_USER_USEDB_URL_POS]; + if (tsHttpDbNameMandatory) { + if (path->pos == 0) { + httpError("context:%p, fd:%d, user:%s, database name is mandatory", pContext, pContext->fd, pContext->user); + httpSendErrorResp(pContext, TSDB_CODE_HTTP_INVALID_URL); + return false; + } + } + + if (path->pos > 0 && !(strlen(sql) > 4 && (sql[0] == 'u' || sql[0] == 'U') && + (sql[1] == 's' || sql[1] == 'S') && (sql[2] == 'e' || sql[2] == 'E') && sql[3] == ' ')) + { + snprintf(pContext->db, /*TSDB_ACCT_ID_LEN + */TSDB_DB_NAME_LEN, "%s", path->str); + } + pContext->reqType = HTTP_REQTYPE_SINGLE_SQL; if (timestampFmt == REST_TIMESTAMP_FMT_LOCAL_STRING) { pContext->encodeMethod = &restEncodeSqlLocalTimeStringMethod; @@ -119,6 +138,90 @@ bool restProcessSqlRequest(HttpContext* pContext, int32_t timestampFmt) { return true; } +#define REST_FUNC_URL_POS 2 +#define REST_OUTP_URL_POS 3 +#define REST_AGGR_URL_POS 4 +#define REST_BUFF_URL_POS 5 + +#define HTTP_FUNC_LEN 32 +#define HTTP_OUTP_LEN 16 +#define HTTP_AGGR_LEN 2 +#define HTTP_BUFF_LEN 32 + +static int udfSaveFile(const char *fname, const char *content, long len) { + int fd = open(fname, O_WRONLY | O_CREAT | O_EXCL | O_BINARY, 0755); + if (fd < 0) + return -1; + if (taosWrite(fd, (void *)content, len) < 0) + return -1; + + return 0; +} + +static bool restProcessUdfRequest(HttpContext* pContext) { + HttpParser* pParser = pContext->parser; + if (pParser->path[REST_FUNC_URL_POS].pos >= HTTP_FUNC_LEN || pParser->path[REST_FUNC_URL_POS].pos <= 0) { + return false; + } + + if (pParser->path[REST_OUTP_URL_POS].pos >= HTTP_OUTP_LEN || pParser->path[REST_OUTP_URL_POS].pos <= 0) { + return false; + } + + if (pParser->path[REST_AGGR_URL_POS].pos >= HTTP_AGGR_LEN || pParser->path[REST_AGGR_URL_POS].pos <= 0) { + return false; + } + + if (pParser->path[REST_BUFF_URL_POS].pos >= HTTP_BUFF_LEN || pParser->path[REST_BUFF_URL_POS].pos <= 0) { + return false; + } + + char* sql = pContext->parser->body.str; + int len = pContext->parser->body.pos; + if (sql == NULL) { + httpSendErrorResp(pContext, TSDB_CODE_HTTP_NO_SQL_INPUT); + return false; + } + + char udfDir[256] = {0}; + char buf[64] = "udf-"; + char funcName[64] = {0}; + int aggr = 0; + char outputType[16] = {0}; + char buffSize[32] = {0}; + + tstrncpy(funcName, pParser->path[REST_FUNC_URL_POS].str, HTTP_FUNC_LEN); + tstrncpy(buf + 4, funcName, HTTP_FUNC_LEN); + + if (pParser->path[REST_AGGR_URL_POS].str[0] != '0') { + aggr = 1; + } + + tstrncpy(outputType, pParser->path[REST_OUTP_URL_POS].str, HTTP_OUTP_LEN); + tstrncpy(buffSize, pParser->path[REST_BUFF_URL_POS].str, HTTP_BUFF_LEN); + + taosGetTmpfilePath(funcName, udfDir); + + udfSaveFile(udfDir, sql, len); + + tfree(sql); + pContext->parser->body.str = malloc(1024); + sql = pContext->parser->body.str; + int sqlLen = sprintf(sql, "create %s function %s as \"%s\" outputtype %s bufsize %s", + aggr == 1 ? "aggregate" : " ", funcName, udfDir, outputType, buffSize); + + pContext->parser->body.pos = sqlLen; + pContext->parser->body.size = sqlLen + 1; + + HttpSqlCmd* cmd = &(pContext->singleCmd); + cmd->nativSql = sql; + + pContext->reqType = HTTP_REQTYPE_SINGLE_SQL; + pContext->encodeMethod = &restEncodeSqlLocalTimeStringMethod; + + return true; +} + bool restProcessRequest(struct HttpContext* pContext) { if (httpUrlMatch(pContext, REST_ACTION_URL_POS, "login")) { restGetUserFromUrl(pContext); @@ -138,6 +241,8 @@ bool restProcessRequest(struct HttpContext* pContext) { return restProcessSqlRequest(pContext, REST_TIMESTAMP_FMT_UTC_STRING); } else if (httpUrlMatch(pContext, REST_ACTION_URL_POS, "login")) { return restProcessLoginRequest(pContext); + } else if (httpUrlMatch(pContext, REST_ACTION_URL_POS, "udf")) { + return restProcessUdfRequest(pContext); } else { } diff --git a/src/plugins/http/src/httpRestJson.c b/src/plugins/http/src/httpRestJson.c index 60c23e603e..13596b0e8a 100644 --- a/src/plugins/http/src/httpRestJson.c +++ b/src/plugins/http/src/httpRestJson.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "os.h" #include "tglobal.h" +#include "tsclient.h" #include "httpLog.h" #include "httpJson.h" #include "httpRestHandle.h" @@ -62,13 +63,21 @@ void restStartSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result) httpJsonItemToken(jsonBuf); httpJsonToken(jsonBuf, JsonArrStt); + SSqlObj *pObj = (SSqlObj *) result; + bool isAlterSql = (pObj->sqlstr == NULL) ? false : httpCheckAlterSql(pObj->sqlstr); + if (num_fields == 0) { httpJsonItemToken(jsonBuf); httpJsonString(jsonBuf, REST_JSON_AFFECT_ROWS, REST_JSON_AFFECT_ROWS_LEN); } else { - for (int32_t i = 0; i < num_fields; ++i) { + if (isAlterSql == true) { httpJsonItemToken(jsonBuf); - httpJsonString(jsonBuf, fields[i].name, (int32_t)strlen(fields[i].name)); + httpJsonString(jsonBuf, REST_JSON_AFFECT_ROWS, REST_JSON_AFFECT_ROWS_LEN); + } else { + for (int32_t i = 0; i < num_fields; ++i) { + httpJsonItemToken(jsonBuf); + httpJsonString(jsonBuf, fields[i].name, (int32_t)strlen(fields[i].name)); + } } } @@ -99,8 +108,14 @@ void restStartSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result) httpJsonItemToken(jsonBuf); httpJsonToken(jsonBuf, JsonArrStt); - httpJsonItemToken(jsonBuf); - httpJsonString(jsonBuf, fields[i].name, (int32_t)strlen(fields[i].name)); + if (isAlterSql == true) { + httpJsonItemToken(jsonBuf); + httpJsonString(jsonBuf, REST_JSON_AFFECT_ROWS, REST_JSON_AFFECT_ROWS_LEN); + } else { + httpJsonItemToken(jsonBuf); + httpJsonString(jsonBuf, fields[i].name, (int32_t)strlen(fields[i].name)); + } + httpJsonItemToken(jsonBuf); httpJsonInt(jsonBuf, fields[i].type); httpJsonItemToken(jsonBuf); @@ -186,13 +201,11 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, break; case TSDB_DATA_TYPE_TIMESTAMP: if (timestampFormat == REST_TIMESTAMP_FMT_LOCAL_STRING) { - httpJsonTimestamp(jsonBuf, *((int64_t *)row[i]), - taos_result_precision(result) == TSDB_TIME_PRECISION_MICRO); + httpJsonTimestamp(jsonBuf, *((int64_t *)row[i]), taos_result_precision(result)); } else if (timestampFormat == REST_TIMESTAMP_FMT_TIMESTAMP) { httpJsonInt64(jsonBuf, *((int64_t *)row[i])); } else { - httpJsonUtcTimestamp(jsonBuf, *((int64_t *)row[i]), - taos_result_precision(result) == TSDB_TIME_PRECISION_MICRO); + httpJsonUtcTimestamp(jsonBuf, *((int64_t *)row[i]), taos_result_precision(result)); } break; default: diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index 4dcf3d5501..6f694b606e 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -53,7 +53,7 @@ static void httpStopThread(HttpThread *pThread) { break; } } while (0); - if (r) { + if (r && taosCheckPthreadValid(pThread->thread)) { pthread_cancel(pThread->thread); } #else @@ -63,15 +63,21 @@ static void httpStopThread(HttpThread *pThread) { httpError("%s, failed to create eventfd, will call pthread_cancel instead, which may result in data corruption: %s", pThread->label, strerror(errno)); pThread->stop = true; - pthread_cancel(pThread->thread); + if (taosCheckPthreadValid(pThread->thread)) { + pthread_cancel(pThread->thread); + } } else if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { httpError("%s, failed to call epoll_ctl, will call pthread_cancel instead, which may result in data corruption: %s", pThread->label, strerror(errno)); - pthread_cancel(pThread->thread); + if (taosCheckPthreadValid(pThread->thread)) { + pthread_cancel(pThread->thread); + } } #endif // __APPLE__ - pthread_join(pThread->thread, NULL); + if (taosCheckPthreadValid(pThread->thread)) { + pthread_join(pThread->thread, NULL); + } #ifdef __APPLE__ if (sv[0] != -1) { @@ -117,6 +123,8 @@ static void httpProcessHttpData(void *param) { int32_t fdNum; taosSetMaskSIGPIPE(); + //dkj + //setThreadName("httpData"); while (1) { struct epoll_event events[HTTP_MAX_EVENTS]; @@ -189,9 +197,7 @@ static void httpProcessHttpData(void *param) { } else { if (httpReadData(pContext)) { (*(pThread->processData))(pContext); - atomic_fetch_add_32(&pServer->requestNum, 1); - } else { - httpReleaseContext(pContext/*, false*/); + atomic_fetch_add_64(&pServer->requestNum, 1); } } } @@ -208,6 +214,7 @@ static void *httpAcceptHttpConnection(void *arg) { int32_t totalFds = 0; taosSetMaskSIGPIPE(); + setThreadName("httpAcceptConn"); pServer->fd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort); @@ -269,7 +276,11 @@ static void *httpAcceptHttpConnection(void *arg) { sprintf(pContext->ipstr, "%s:%u", taosInetNtoa(clientAddr.sin_addr), htons(clientAddr.sin_port)); struct epoll_event event; +#ifndef _TD_NINGSI_60 event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP | EPOLLERR | EPOLLHUP | EPOLLRDHUP; +#else + event.events = EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP | EPOLLRDHUP; +#endif event.data.ptr = pContext; if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) { httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd, @@ -394,15 +405,23 @@ static bool httpReadData(HttpContext *pContext) { return true; } } else if (nread < 0) { - if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { + if (errno == EINTR) { + httpDebug("context:%p, fd:%d, read from socket error:%d, continue", pContext, pContext->fd, errno); + continue; + } else if (errno == EAGAIN || errno == EWOULDBLOCK) { httpDebug("context:%p, fd:%d, read from socket error:%d, wait another event", pContext, pContext->fd, errno); - return false; // later again + httpReleaseContext(pContext/*, false */); + return false; } else { httpError("context:%p, fd:%d, read from socket error:%d, close connect", pContext, pContext->fd, errno); + taosCloseSocket(pContext->fd); + httpReleaseContext(pContext/*, false */); return false; } } else { httpError("context:%p, fd:%d, nread:%d, wait another event", pContext, pContext->fd, nread); + taosCloseSocket(pContext->fd); + httpReleaseContext(pContext/*, false */); return false; } } diff --git a/src/plugins/http/src/httpSql.c b/src/plugins/http/src/httpSql.c index b345c1531f..e1b3b17347 100644 --- a/src/plugins/http/src/httpSql.c +++ b/src/plugins/http/src/httpSql.c @@ -263,7 +263,7 @@ void httpProcessSingleSqlCallBackImp(void *param, TAOS_RES *result, int32_t code if (code != TSDB_CODE_SUCCESS) { SSqlObj *pObj = (SSqlObj *)result; - if (code == TSDB_CODE_TSC_INVALID_SQL) { + if (code == TSDB_CODE_TSC_INVALID_OPERATION) { terrno = code; httpError("context:%p, fd:%d, user:%s, query error, code:%s, sqlObj:%p, error:%s", pContext, pContext->fd, pContext->user, tstrerror(code), pObj, taos_errstr(pObj)); @@ -405,9 +405,15 @@ void httpProcessRequestCb(void *param, TAOS_RES *result, int32_t code) { if (pContext->session == NULL) { httpSendErrorResp(pContext, TSDB_CODE_HTTP_SESSION_FULL); - httpCloseContextByApp(pContext); } else { + // httpProcessRequestCb called by another thread and a subsequent thread calls this + // function again, if this function called by httpProcessRequestCb executes memset + // just before the subsequent thread executes *Cmd function, nativSql will be NULL + pthread_mutex_lock(&pContext->ctxMutex); + httpExecCmd(pContext); + + pthread_mutex_unlock(&pContext->ctxMutex); } } @@ -419,7 +425,74 @@ void httpProcessRequest(HttpContext *pContext) { &(pContext->taos)); httpDebug("context:%p, fd:%d, user:%s, try connect tdengine, taos:%p", pContext, pContext->fd, pContext->user, pContext->taos); + + if (pContext->taos != NULL) { + STscObj *pObj = pContext->taos; + pObj->from = TAOS_REQ_FROM_HTTP; + } } else { httpExecCmd(pContext); } } + +int32_t httpCheckAllocEscapeSql(char *oldSql, char **newSql) +{ + char *pos; + + if (oldSql == NULL || newSql == NULL) { + return TSDB_CODE_SUCCESS; + } + + /* bad sql clause */ + pos = strstr(oldSql, "%%"); + if (pos) { + httpError("bad sql:%s", oldSql); + return TSDB_CODE_HTTP_REQUEST_JSON_ERROR; + } + + pos = strchr(oldSql, '%'); + if (pos == NULL) { + httpDebug("sql:%s", oldSql); + *newSql = oldSql; + return TSDB_CODE_SUCCESS; + } + + *newSql = (char *) calloc(1, (strlen(oldSql) << 1) + 1); + if (newSql == NULL) { + httpError("failed to allocate for new sql, old sql:%s", oldSql); + return TSDB_CODE_HTTP_NO_ENOUGH_MEMORY; + } + + char *src = oldSql; + char *dst = *newSql; + size_t sqlLen = strlen(src); + + while (1) { + memcpy(dst, src, pos - src + 1); + dst += pos - src + 1; + *dst++ = '%'; + + if (pos + 1 >= oldSql + sqlLen) { + break; + } + + src = ++pos; + pos = strchr(pos, '%'); + if (pos == NULL) { + memcpy(dst, src, strlen(src)); + break; + } + } + + return TSDB_CODE_SUCCESS; +} + +void httpCheckFreeEscapedSql(char *oldSql, char *newSql) +{ + if (oldSql && newSql) { + if (oldSql != newSql) { + free(newSql); + } + } +} + diff --git a/src/plugins/http/src/httpSystem.c b/src/plugins/http/src/httpSystem.c index 203db21895..1e388541ac 100644 --- a/src/plugins/http/src/httpSystem.c +++ b/src/plugins/http/src/httpSystem.c @@ -30,6 +30,7 @@ #include "httpGcHandle.h" #include "httpRestHandle.h" #include "httpTgHandle.h" +#include "httpMetricsHandle.h" #ifndef _ADMIN void adminInitHandle(HttpServer* pServer) {} @@ -52,7 +53,7 @@ int32_t httpInitSystem() { gcInitHandle(&tsHttpServer); tgInitHandle(&tsHttpServer); opInitHandle(&tsHttpServer); - + metricsInitHandle(&tsHttpServer); return 0; } @@ -119,4 +120,10 @@ void httpCleanUpSystem() { tsHttpServer.status = HTTP_SERVER_CLOSED; } -int32_t httpGetReqCount() { return atomic_exchange_32(&tsHttpServer.requestNum, 0); } +int64_t httpGetReqCount() { return atomic_exchange_64(&tsHttpServer.requestNum, 0); } +int32_t httpGetStatusCodeCount(int index) { + return atomic_load_32(&tsHttpServer.statusCodeErrs[index]); +} +int32_t httpClearStatusCodeCount(int index) { + return atomic_exchange_32(&tsHttpServer.statusCodeErrs[index], 0); +} diff --git a/src/plugins/http/src/httpTgHandle.c b/src/plugins/http/src/httpTgHandle.c index c1d006ff5a..32516b9fd1 100644 --- a/src/plugins/http/src/httpTgHandle.c +++ b/src/plugins/http/src/httpTgHandle.c @@ -209,7 +209,7 @@ void tgParseSchemaMetric(cJSON *metric) { parsedOk = false; goto ParseEnd; } - int32_t nameLen = (int32_t)strlen(field->valuestring); + nameLen = (int32_t)strlen(field->valuestring); if (nameLen == 0 || nameLen >= TSDB_TABLE_NAME_LEN) { parsedOk = false; goto ParseEnd; @@ -610,7 +610,18 @@ bool tgProcessSingleMetric(HttpContext *pContext, cJSON *metric, char *db) { // stable tag for detail for (int32_t i = 0; i < orderTagsLen; ++i) { cJSON *tag = orderedTags[i]; - stable_cmd->tagNames[i] = table_cmd->tagNames[i] = httpAddToSqlCmdBuffer(pContext, tag->string); + + char *tagStr = NULL; + int32_t retCode = httpCheckAllocEscapeSql(tag->string, &tagStr); + if (retCode != TSDB_CODE_SUCCESS) { + httpSendErrorResp(pContext, retCode); + + return false; + } + + stable_cmd->tagNames[i] = table_cmd->tagNames[i] = httpAddToSqlCmdBuffer(pContext, tagStr); + + httpCheckFreeEscapedSql(tag->string, tagStr); if (tag->type == cJSON_String) stable_cmd->tagValues[i] = table_cmd->tagValues[i] = httpAddToSqlCmdBuffer(pContext, "'%s'", tag->valuestring); diff --git a/src/plugins/http/src/httpUtil.c b/src/plugins/http/src/httpUtil.c index ade50bdad6..c49d561e2b 100644 --- a/src/plugins/http/src/httpUtil.c +++ b/src/plugins/http/src/httpUtil.c @@ -21,6 +21,7 @@ #include "httpResp.h" #include "httpSql.h" #include "httpUtil.h" +#include "ttoken.h" bool httpCheckUsedbSql(char *sql) { if (strstr(sql, "use ") != NULL) { @@ -29,6 +30,17 @@ bool httpCheckUsedbSql(char *sql) { return false; } +bool httpCheckAlterSql(char *sql) { + int32_t index = 0; + + do { + SStrToken t0 = tStrGetToken(sql, &index, false); + if (t0.type != TK_LP) { + return t0.type == TK_ALTER; + } + } while (1); +} + void httpTimeToString(int32_t t, char *buf, int32_t buflen) { memset(buf, 0, (size_t)buflen); char ts[32] = {0}; @@ -338,10 +350,10 @@ int32_t httpShrinkTableName(HttpContext *pContext, int32_t pos, char *name) { return pos; } - MD5_CTX context; - MD5Init(&context); - MD5Update(&context, (uint8_t *)name, (uint32_t)len); - MD5Final(&context); + T_MD5_CTX context; + tMD5Init(&context); + tMD5Update(&context, (uint8_t *)name, (uint32_t)len); + tMD5Final(&context); int32_t table_name = httpAddToSqlCmdBuffer( pContext, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], diff --git a/src/plugins/monitor/src/monMain.c b/src/plugins/monitor/src/monMain.c index 94af8e3ecd..3f46256f6c 100644 --- a/src/plugins/monitor/src/monMain.c +++ b/src/plugins/monitor/src/monMain.c @@ -72,6 +72,75 @@ static void monBuildMonitorSql(char *sql, int32_t cmd); extern int32_t (*monStartSystemFp)(); extern void (*monStopSystemFp)(); extern void (*monExecuteSQLFp)(char *sql); +static void monInitHttpStatusHashTable(); +static void monCleanupHttpStatusHashTable(); + +static void *monHttpStatusHashTable; +static SMonHttpStatus monHttpStatusTable[] = { + {"HTTP_CODE_CONTINUE", 100}, + {"HTTP_CODE_SWITCHING_PROTOCOL", 101}, + {"HTTP_CODE_PROCESSING", 102}, + {"HTTP_CODE_EARLY_HINTS", 103}, + {"HTTP_CODE_OK", 200}, + {"HTTP_CODE_CREATED", 201}, + {"HTTP_CODE_ACCEPTED", 202}, + {"HTTP_CODE_NON_AUTHORITATIVE_INFO", 203}, + {"HTTP_CODE_NO_CONTENT", 204}, + {"HTTP_CODE_RESET_CONTENT", 205}, + {"HTTP_CODE_PARTIAL_CONTENT", 206}, + {"HTTP_CODE_MULTI_STATUS", 207}, + {"HTTP_CODE_ALREADY_REPORTED", 208}, + {"HTTP_CODE_IM_USED", 226}, + {"HTTP_CODE_MULTIPLE_CHOICE", 300}, + {"HTTP_CODE_MOVED_PERMANENTLY", 301}, + {"HTTP_CODE_FOUND", 302}, + {"HTTP_CODE_SEE_OTHER", 303}, + {"HTTP_CODE_NOT_MODIFIED", 304}, + {"HTTP_CODE_USE_PROXY", 305}, + {"HTTP_CODE_UNUSED", 306}, + {"HTTP_CODE_TEMPORARY_REDIRECT", 307}, + {"HTTP_CODE_PERMANENT_REDIRECT", 308}, + {"HTTP_CODE_BAD_REQUEST", 400}, + {"HTTP_CODE_UNAUTHORIZED", 401}, + {"HTTP_CODE_PAYMENT_REQUIRED", 402}, + {"HTTP_CODE_FORBIDDEN", 403}, + {"HTTP_CODE_NOT_FOUND", 404}, + {"HTTP_CODE_METHOD_NOT_ALLOWED", 405}, + {"HTTP_CODE_NOT_ACCEPTABLE", 406}, + {"HTTP_CODE_PROXY_AUTH_REQUIRED", 407}, + {"HTTP_CODE_REQUEST_TIMEOUT", 408}, + {"HTTP_CODE_CONFLICT", 409}, + {"HTTP_CODE_GONE", 410}, + {"HTTP_CODE_LENGTH_REQUIRED", 411}, + {"HTTP_CODE_PRECONDITION_FAILED", 412}, + {"HTTP_CODE_PAYLOAD_TOO_LARGE", 413}, + {"HTTP_CODE_URI_TOO_LARGE", 414}, + {"HTTP_CODE_UNSUPPORTED_MEDIA_TYPE", 415}, + {"HTTP_CODE_RANGE_NOT_SATISFIABLE", 416}, + {"HTTP_CODE_EXPECTATION_FAILED", 417}, + {"HTTP_CODE_IM_A_TEAPOT", 418}, + {"HTTP_CODE_MISDIRECTED_REQUEST", 421}, + {"HTTP_CODE_UNPROCESSABLE_ENTITY", 422}, + {"HTTP_CODE_LOCKED", 423}, + {"HTTP_CODE_FAILED_DEPENDENCY", 424}, + {"HTTP_CODE_TOO_EARLY", 425}, + {"HTTP_CODE_UPGRADE_REQUIRED", 426}, + {"HTTP_CODE_PRECONDITION_REQUIRED", 428}, + {"HTTP_CODE_TOO_MANY_REQUESTS", 429}, + {"HTTP_CODE_REQ_HDR_FIELDS_TOO_LARGE",431}, + {"HTTP_CODE_UNAVAIL_4_LEGAL_REASONS", 451}, + {"HTTP_CODE_INTERNAL_SERVER_ERROR", 500}, + {"HTTP_CODE_NOT_IMPLEMENTED", 501}, + {"HTTP_CODE_BAD_GATEWAY", 502}, + {"HTTP_CODE_SERVICE_UNAVAILABLE", 503}, + {"HTTP_CODE_GATEWAY_TIMEOUT", 504}, + {"HTTP_CODE_HTTP_VER_NOT_SUPPORTED", 505}, + {"HTTP_CODE_VARIANT_ALSO_NEGOTIATES", 506}, + {"HTTP_CODE_INSUFFICIENT_STORAGE", 507}, + {"HTTP_CODE_LOOP_DETECTED", 508}, + {"HTTP_CODE_NOT_EXTENDED", 510}, + {"HTTP_CODE_NETWORK_AUTH_REQUIRED", 511} +}; int32_t monInitSystem() { if (tsMonitor.ep[0] == 0) { @@ -85,6 +154,8 @@ int32_t monInitSystem() { } } + monInitHttpStatusHashTable(); + pthread_attr_t thAttr; pthread_attr_init(&thAttr); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); @@ -255,6 +326,7 @@ void monCleanupSystem() { taos_close(tsMonitor.conn); tsMonitor.conn = NULL; } + monCleanupHttpStatusHashTable(); monInfo("monitor module is cleaned up"); } @@ -417,3 +489,29 @@ void monExecuteSQL(char *sql) { monDebug("execute sql:%s", sql); taos_query_a(tsMonitor.conn, sql, monExecSqlCb, "sql"); } + +static void monInitHttpStatusHashTable() { + int32_t numOfEntries = tListLen(monHttpStatusTable); + monHttpStatusHashTable = taosHashInit(numOfEntries, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); + for (int32_t i = 0; i < numOfEntries; ++i) { + monHttpStatusTable[i].index = i; + SMonHttpStatus* pEntry = &monHttpStatusTable[i]; + taosHashPut(monHttpStatusHashTable, &monHttpStatusTable[i].code, sizeof(int32_t), + &pEntry, POINTER_BYTES); + } +} + +static void monCleanupHttpStatusHashTable() { + void* m = monHttpStatusHashTable; + if (m != NULL && atomic_val_compare_exchange_ptr(&monHttpStatusHashTable, m, 0) == m) { + taosHashCleanup(m); + } +} + +SMonHttpStatus *monGetHttpStatusHashTableEntry(int32_t code) { + if (monHttpStatusHashTable == NULL) { + return NULL; + } + return (SMonHttpStatus*)taosHashGet(monHttpStatusHashTable, &code, sizeof(int32_t)); +} + diff --git a/src/util/inc/tmd5.h b/src/util/inc/tmd5.h index d7fd038f37..829765726e 100644 --- a/src/util/inc/tmd5.h +++ b/src/util/inc/tmd5.h @@ -38,4 +38,9 @@ void MD5Init(MD5_CTX *mdContext); void MD5Update(MD5_CTX *mdContext, uint8_t *inBuf, unsigned int inLen); void MD5Final(MD5_CTX *mdContext); +#define T_MD5_CTX MD5_CTX +#define tMD5Init MD5Init +#define tMD5Update MD5Update +#define tMD5Final MD5Final + #endif diff --git a/src/util/src/terror.c b/src/util/src/terror.c index 3d527cc1a2..fd9fc3fb3a 100644 --- a/src/util/src/terror.c +++ b/src/util/src/terror.c @@ -406,6 +406,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_TAG_VALUE_TOO_LONG, "tag value can not mor TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_VALUE_NULL, "value not find") TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_VALUE_TYPE, "value type should be boolean, number or string") +TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_REQUEST_JSON_ERROR, "http request json error") + // odbc TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_OOM, "out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_CHAR_NOT_NUM, "convertion not a valid literal input") -- GitLab