提交 4e705a3a 编写于 作者: H Hui Li

Merge branch 'develop' into hotfix/test

......@@ -286,7 +286,7 @@ typedef struct STscObj {
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
char acctId[TSDB_ACCT_LEN];
char db[TSDB_DB_NAME_LEN];
char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN];
char sversion[TSDB_VERSION_LEN];
char writeAuth : 1;
char superAuth : 1;
......
......@@ -368,7 +368,9 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) {
pSql->fp = pSql->fetchFp;
}
(*pSql->fp)(pSql->param, taosres, code);
if (pSql->fp) {
(*pSql->fp)(pSql->param, taosres, code);
}
if (shouldFree) {
tscTrace("%p sqlObj is automatically freed in async res", pSql);
......
......@@ -97,7 +97,7 @@ int tsParseTime(SSQLToken *pToken, int64_t *time, char **next, char *error, int1
useconds = str2int64(pToken->z);
} else {
// strptime("2001-11-12 18:31:01", "%Y-%m-%d %H:%M:%S", &tm);
if (taosParseTime(pToken->z, time, pToken->n, timePrec) != TSDB_CODE_SUCCESS) {
if (taosParseTime(pToken->z, time, pToken->n, timePrec, tsDaylight) != TSDB_CODE_SUCCESS) {
return tscInvalidSQLErrMsg(error, "invalid timestamp format", pToken->z);
}
......
......@@ -138,7 +138,7 @@ static int setColumnFilterInfoForTimestamp(SQueryInfo* pQueryInfo, tVariant* pVa
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
if (seg != NULL) {
if (taosParseTime(pVar->pz, &time, pVar->nLen, tinfo.precision) != TSDB_CODE_SUCCESS) {
if (taosParseTime(pVar->pz, &time, pVar->nLen, tinfo.precision, tsDaylight) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(pQueryInfo->msg, msg);
}
} else {
......@@ -1042,7 +1042,7 @@ int32_t setObjFullName(char* fullName, const char* account, SSQLToken* pDB, SSQL
/* db name is not specified, the tableName dose not include db name */
if (pDB != NULL) {
if (pDB->n >= TSDB_DB_NAME_LEN) {
if (pDB->n >= TSDB_ACCT_LEN + TSDB_DB_NAME_LEN) {
return TSDB_CODE_TSC_INVALID_SQL;
}
......@@ -3950,7 +3950,7 @@ int32_t getTimeRange(STimeWindow* win, tSQLExpr* pRight, int32_t optr, int16_t t
char* seg = strnchr(pRight->val.pz, '-', pRight->val.nLen, false);
if (seg != NULL) {
if (taosParseTime(pRight->val.pz, &val, pRight->val.nLen, TSDB_TIME_PRECISION_MICRO) == TSDB_CODE_SUCCESS) {
if (taosParseTime(pRight->val.pz, &val, pRight->val.nLen, TSDB_TIME_PRECISION_MICRO, tsDaylight) == TSDB_CODE_SUCCESS) {
parsed = true;
} else {
return TSDB_CODE_TSC_INVALID_SQL;
......@@ -4508,7 +4508,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if (pTagsSchema->type != TSDB_DATA_TYPE_BINARY && pTagsSchema->type != TSDB_DATA_TYPE_NCHAR) {
len = tDataTypeDesc[pTagsSchema->type].nSize;
} else {
len = varDataLen(pUpdateMsg->data);
len = varDataTLen(pUpdateMsg->data);
}
pUpdateMsg->tagValLen = htonl(len); // length may be changed after dump data
......
......@@ -1451,8 +1451,6 @@ bool tscShouldBeFreed(SSqlObj* pSql) {
return false;
}
assert(pSql->fp != NULL);
STscObj* pTscObj = pSql->pTscObj;
if (pSql->pStream != NULL || pTscObj->pHb == pSql || pSql->pSubscription != NULL) {
return false;
......
......@@ -170,6 +170,7 @@ extern char gitinfo[];
extern char gitinfoOfInternal[];
extern char buildinfo[];
extern int8_t tsDaylight;
extern char tsTimezone[64];
extern char tsLocale[64];
extern char tsCharset[64]; // default encode string
......
......@@ -198,6 +198,7 @@ char tsMonitorDbName[TSDB_DB_NAME_LEN] = "log";
char tsInternalPass[] = "secretkey";
int32_t tsMonitorInterval = 30; // seconds
int8_t tsDaylight = 0;
char tsTimezone[64] = {0};
char tsLocale[TSDB_LOCALE_LEN] = {0};
char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string
......
......@@ -58,6 +58,7 @@ void tsSetTimeZone() {
* (BST, +0100)
*/
sprintf(tsTimezone, "(%s, %s%02d00)", tzname[daylight], tz >= 0 ? "+" : "-", abs(tz));
tsDaylight = daylight;
uPrint("timezone format changed to %s", tsTimezone);
}
......@@ -250,7 +250,7 @@ typedef struct {
typedef struct {
char tableId[TSDB_TABLE_ID_LEN];
char db[TSDB_DB_NAME_LEN];
char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN];
int8_t igExists;
int8_t getMeta;
int16_t numOfTags;
......@@ -268,7 +268,7 @@ typedef struct {
typedef struct {
char tableId[TSDB_TABLE_ID_LEN];
char db[TSDB_DB_NAME_LEN];
char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN];
int16_t type; /* operation type */
int16_t numOfCols; /* number of schema */
int32_t tagValLen;
......@@ -682,7 +682,7 @@ typedef struct {
*/
typedef struct {
int8_t type;
char db[TSDB_DB_NAME_LEN];
char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN];
uint16_t payloadLen;
char payload[];
} SCMShowMsg;
......
......@@ -469,7 +469,6 @@ static int dumpResultToFile(const char* fname, TAOS_RES* result) {
} while( row != NULL);
fclose(fp);
taos_free_result(result);
return numOfRows;
}
......
......@@ -297,7 +297,7 @@ void *deleteTable();
void *asyncWrite(void *sarg);
void generateData(char *res, char **data_type, int num_of_cols, int64_t timestamp, int len_of_binary);
int generateData(char *res, char **data_type, int num_of_cols, int64_t timestamp, int len_of_binary);
void rand_string(char *str, int size);
......@@ -817,7 +817,7 @@ void queryDB(TAOS *taos, char *command) {
i--;
}
if (i == 0) {
if (code != 0) {
fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(pSql));
taos_free_result(pSql);
......@@ -846,14 +846,19 @@ void *syncWrite(void *sarg) {
int k;
for (k = 0; k < winfo->nrecords_per_request;) {
int rand_num = rand() % 100;
if (winfo->data_of_order ==1 && rand_num < winfo->data_of_rate)
{
int len = -1;
if (winfo->data_of_order ==1 && rand_num < winfo->data_of_rate) {
long d = tmp_time - rand() % 1000000 + rand_num;
generateData(data, data_type, ncols_per_record, d, len_of_binary);
} else
{
generateData(data, data_type, ncols_per_record, tmp_time += 1000, len_of_binary);
len = generateData(data, data_type, ncols_per_record, d, len_of_binary);
} else {
len = generateData(data, data_type, ncols_per_record, tmp_time += 1000, len_of_binary);
}
//assert(len + pstr - buffer < BUFFER_SIZE);
if (len + pstr - buffer >= BUFFER_SIZE) { // too long
break;
}
pstr += sprintf(pstr, " %s", data);
inserted++;
k++;
......@@ -968,7 +973,7 @@ double getCurrentTime() {
return tv.tv_sec + tv.tv_usec / 1E6;
}
void generateData(char *res, char **data_type, int num_of_cols, int64_t timestamp, int len_of_binary) {
int32_t generateData(char *res, char **data_type, int num_of_cols, int64_t timestamp, int len_of_binary) {
memset(res, 0, MAX_DATA_SIZE);
char *pstr = res;
pstr += sprintf(pstr, "(%" PRId64, timestamp);
......@@ -1002,9 +1007,16 @@ void generateData(char *res, char **data_type, int num_of_cols, int64_t timestam
rand_string(s, len_of_binary);
pstr += sprintf(pstr, ", \"%s\"", s);
}
if (pstr - res > MAX_DATA_SIZE) {
perror("column length too long, abort");
exit(-1);
}
}
pstr += sprintf(pstr, ")");
return pstr - res;
}
static const char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJK1234567890";
......
......@@ -108,8 +108,8 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) {
}
void mnodeReleaseConn(SConnObj *pConn) {
if(pConn == NULL) return;
taosCacheRelease(tsMnodeConnCache, (void**)&pConn, false);
if (pConn == NULL) return;
taosCacheRelease(tsMnodeConnCache, (void **)&pConn, false);
}
SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port) {
......@@ -138,7 +138,7 @@ SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t p
static void mnodeFreeConn(void *data) {
SConnObj *pConn = data;
tfree(pConn->pQueries);
tfree(pConn->pQueries);
tfree(pConn->pStreams);
mTrace("connId:%d, is destroyed", pConn->connId);
}
......
......@@ -16,15 +16,11 @@
#ifndef TDENGINE_GC_HANDLE_H
#define TDENGINE_GC_HANDLE_H
#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include "http.h"
#include "httpCode.h"
#include "httpHandle.h"
#include "httpInt.h"
#include "httpUtil.h"
#include "httpResp.h"
#include "httpSql.h"
#define GC_ROOT_URL_POS 0
#define GC_ACTION_URL_POS 1
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_TOKEN_H
#define TDENGINE_HTTP_TOKEN_H
bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len);
bool httpParseTaosdAuthToken(HttpContext *pContext, char *token, int len);
bool httpGenTaosdAuthToken(HttpContext *pContext, char *token, int maxLen);
#endif
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_CONTEXT_H
#define TDENGINE_HTTP_CONTEXT_H
#include "httpInt.h"
bool httpInitContexts();
void httpCleanupContexts();
const char *httpContextStateStr(HttpContextState state);
HttpContext *httpCreateContext(int32_t fd);
bool httpInitContext(HttpContext *pContext);
HttpContext *httpGetContext(void * pContext);
void httpReleaseContext(HttpContext *pContext);
void httpCloseContextByServer(HttpContext *pContext);
void httpCloseContextByApp(HttpContext *pContext);
void httpNotifyContextClose(HttpContext *pContext);
bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState);
#endif
......@@ -13,304 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_SERVER_H
#define TDENGINE_HTTP_SERVER_H
#include <stdbool.h>
#include "pthread.h"
#include "semaphore.h"
#include "tmempool.h"
#include "taosdef.h"
#include "tutil.h"
#include "zlib.h"
#include "http.h"
#include "httpJson.h"
#define HTTP_MAX_CMD_SIZE 1024
#define HTTP_MAX_BUFFER_SIZE 1024*1024
#define HTTP_LABEL_SIZE 8
#define HTTP_MAX_EVENTS 10
#define HTTP_BUFFER_SIZE 1024*65 //65k
#define HTTP_DECOMPRESS_BUF_SIZE 1024*64
#define HTTP_STEP_SIZE 1024 //http message get process step by step
#define HTTP_MAX_URL 5 //http url stack size
#define HTTP_METHOD_SCANNER_SIZE 7 //http method fp size
#define HTTP_GC_TARGET_SIZE 512
#define HTTP_VERSION_10 0
#define HTTP_VERSION_11 1
//#define HTTP_VERSION_12 2
#define HTTP_UNCUNKED 0
#define HTTP_CHUNKED 1
#define HTTP_KEEPALIVE_NO_INPUT 0
#define HTTP_KEEPALIVE_ENABLE 1
#define HTTP_KEEPALIVE_DISABLE 2
#define HTTP_REQTYPE_OTHERS 0
#define HTTP_REQTYPE_LOGIN 1
#define HTTP_REQTYPE_HEARTBEAT 2
#define HTTP_REQTYPE_SINGLE_SQL 3
#define HTTP_REQTYPE_MULTI_SQL 4
#define HTTP_CHECK_BODY_ERROR -1
#define HTTP_CHECK_BODY_CONTINUE 0
#define HTTP_CHECK_BODY_SUCCESS 1
#define HTTP_WRITE_RETRY_TIMES 500
#define HTTP_WRITE_WAIT_TIME_MS 5
#define HTTP_EXPIRED_TIME 60000
#define HTTP_DELAY_CLOSE_TIME_MS 500
#define HTTP_COMPRESS_IDENTITY 0
#define HTTP_COMPRESS_GZIP 2
#define HTTP_SESSION_ID_LEN (TSDB_USER_LEN + TSDB_PASSWORD_LEN)
typedef enum {
HTTP_CONTEXT_STATE_READY,
HTTP_CONTEXT_STATE_HANDLING,
HTTP_CONTEXT_STATE_DROPPING,
HTTP_CONTEXT_STATE_CLOSED
} HttpContextState;
struct HttpContext;
struct HttpThread;
typedef struct {
void *signature;
int expire;
int access;
void *taos;
char id[HTTP_SESSION_ID_LEN];
} HttpSession;
typedef enum {
HTTP_CMD_TYPE_UN_SPECIFIED,
HTTP_CMD_TYPE_CREATE_DB,
HTTP_CMD_TYPE_CREATE_STBALE,
HTTP_CMD_TYPE_INSERT
} HttpSqlCmdType;
typedef enum { HTTP_CMD_STATE_NOT_RUN_YET, HTTP_CMD_STATE_RUN_FINISHED } HttpSqlCmdState;
typedef enum { HTTP_CMD_RETURN_TYPE_WITH_RETURN, HTTP_CMD_RETURN_TYPE_NO_RETURN } HttpSqlCmdReturnType;
typedef struct {
// used by single cmd
char *nativSql;
int32_t numOfRows;
int32_t code;
// these are the locations in the buffer
int32_t tagNames[TSDB_MAX_TAGS];
int32_t tagValues[TSDB_MAX_TAGS];
int32_t timestamp;
int32_t metric;
int32_t stable;
int32_t table;
int32_t values;
int32_t sql;
// used by multi-cmd
int8_t cmdType;
int8_t cmdReturnType;
int8_t cmdState;
int8_t tagNum;
} HttpSqlCmd;
typedef struct {
HttpSqlCmd *cmds;
int16_t pos;
int16_t size;
int16_t maxSize;
int32_t bufferPos;
int32_t bufferSize;
char * buffer;
} HttpSqlCmds;
typedef struct {
char *module;
bool (*decodeFp)(struct HttpContext *pContext);
} HttpDecodeMethod;
typedef struct {
void (*startJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, void *result);
void (*stopJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd);
bool (*buildQueryJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, void *result, int numOfRows);
void (*buildAffectRowJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int affectRows);
void (*initJsonFp)(struct HttpContext *pContext);
void (*cleanJsonFp)(struct HttpContext *pContext);
bool (*checkFinishedFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int code);
void (*setNextCmdFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int code);
} HttpEncodeMethod;
typedef struct {
char *pos;
int32_t len;
} HttpBuf;
typedef struct {
char buffer[HTTP_BUFFER_SIZE];
int bufsize;
char *pLast;
char *pCur;
HttpBuf method;
HttpBuf path[HTTP_MAX_URL]; // url: dbname/meter/query
HttpBuf data; // body content
HttpBuf token; // auth token
HttpDecodeMethod *pMethod;
} HttpParser;
typedef struct HttpContext {
void * signature;
int fd;
uint32_t accessTimes;
uint32_t lastAccessTime;
uint8_t httpVersion;
uint8_t httpChunked;
uint8_t httpKeepAlive; // http1.0 and not keep-alive, close connection immediately
uint8_t fromMemPool;
uint8_t acceptEncoding;
uint8_t contentEncoding;
uint8_t reqType;
uint8_t parsed;
int32_t state;
char ipstr[22];
char user[TSDB_USER_LEN]; // parsed from auth token or login message
char pass[TSDB_PASSWORD_LEN];
void *taos;
HttpSession *session;
z_stream gzipStream;
HttpEncodeMethod *encodeMethod;
HttpSqlCmd singleCmd;
HttpSqlCmds *multiCmds;
JsonBuf *jsonBuf;
HttpParser parser;
void *timer;
struct HttpThread *pThread;
struct HttpContext *prev;
struct HttpContext *next;
} HttpContext;
typedef struct HttpThread {
pthread_t thread;
HttpContext * pHead;
pthread_mutex_t threadMutex;
bool stop;
int pollFd;
int numOfFds;
int threadId;
char label[HTTP_LABEL_SIZE];
bool (*processData)(HttpContext *pContext);
struct HttpServer *pServer; // handle passed by upper layer during pServer initialization
} HttpThread;
typedef struct HttpServer {
char label[HTTP_LABEL_SIZE];
uint32_t serverIp;
uint16_t serverPort;
bool online;
int fd;
int cacheContext;
int sessionExpire;
int numOfThreads;
HttpDecodeMethod *methodScanner[HTTP_METHOD_SCANNER_SIZE];
int methodScannerLen;
pthread_mutex_t serverMutex;
void *pSessionHash;
void *pContextPool;
void *expireTimer;
HttpThread *pThreads;
pthread_t thread;
bool (*processData)(HttpContext *pContext);
int requestNum;
void *timerHandle;
} HttpServer;
// http util method
bool httpCheckUsedbSql(char *sql);
void httpTimeToString(time_t t, char *buf, int buflen);
// http init method
void *httpInitServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
void httpCleanUpServer(HttpServer *pServer);
// http server connection
void httpCleanUpConnect(HttpServer *pServer);
bool httpInitConnect(HttpServer *pServer);
// http context for each client connection
HttpContext *httpCreateContext(HttpServer *pServer);
bool httpInitContext(HttpContext *pContext);
void httpCloseContextByApp(HttpContext *pContext);
void httpCloseContextByServer(HttpThread *pThread, HttpContext *pContext);
// http session method
void httpCreateSession(HttpContext *pContext, void *taos);
void httpAccessSession(HttpContext *pContext);
void httpFetchSession(HttpContext *pContext);
void httpRestoreSession(HttpContext *pContext);
void httpRemoveExpireSessions(HttpServer *pServer);
bool httpInitAllSessions(HttpServer *pServer);
void httpRemoveAllSessions(HttpServer *pServer);
void httpProcessSessionExpire(void *handle, void *tmrId);
// http request parser
void httpAddMethod(HttpServer *pServer, HttpDecodeMethod *pMethod);
// http token method
bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len);
bool httpParseTaosdAuthToken(HttpContext *pContext, char *token, int len);
bool httpGenTaosdAuthToken(HttpContext *pContext, char *token, int maxLen);
// util
bool httpUrlMatch(HttpContext *pContext, int pos, char *cmp);
bool httpProcessData(HttpContext *pContext);
bool httpReadDataImp(HttpContext *pContext);
bool httpParseRequest(HttpContext* pContext);
int httpCheckReadCompleted(HttpContext* pContext);
void httpReadDirtyData(HttpContext *pContext);
#ifndef TDENGINE_HTTP_HANDLE_H
#define TDENGINE_HTTP_HANDLE_H
// http request handler
void httpProcessRequest(HttpContext *pContext);
// http json printer
JsonBuf *httpMallocJsonBuf(HttpContext *pContext);
void httpFreeJsonBuf(HttpContext *pContext);
// http multicmds util
int32_t httpAddToSqlCmdBuffer(HttpContext *pContext, const char *const format, ...);
int32_t httpAddToSqlCmdBufferNoTerminal(HttpContext *pContext, const char *const format, ...);
int32_t httpAddToSqlCmdBufferWithSize(HttpContext *pContext, int mallocSize);
int32_t httpAddToSqlCmdBufferTerminal(HttpContext *pContext);
bool httpMallocMultiCmds(HttpContext *pContext, int cmdSize, int bufferSize);
bool httpReMallocMultiCmdsSize(HttpContext *pContext, int cmdSize);
bool httpReMallocMultiCmdsBuffer(HttpContext *pContext, int bufferSize);
void httpFreeMultiCmds(HttpContext *pContext);
HttpSqlCmd *httpNewSqlCmd(HttpContext *pContext);
HttpSqlCmd *httpCurrSqlCmd(HttpContext *pContext);
int httpCurSqlCmdPos(HttpContext *pContext);
void httpTrimTableName(char *name);
int httpShrinkTableName(HttpContext *pContext, int pos, char *name);
char *httpGetCmdsString(HttpContext *pContext, int pos);
int httpGzipDeCompress(char *srcData, int32_t nSrcData, char *destData, int32_t *nDestData);
int httpGzipCompressInit(HttpContext *pContext);
int httpGzipCompress(HttpContext *pContext, char *inSrcData, int32_t inSrcDataLen,
char *outDestData, int32_t *outDestDataLen, bool isTheLast);
extern const char *httpKeepAliveStr[];
extern const char *httpVersionStr[];
const char* httpContextStateStr(HttpContextState state);
bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState);
void httpRemoveContextFromEpoll(HttpThread *pThread, HttpContext *pContext);
bool httpProcessData(HttpContext *pContext);
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_INT_H
#define TDENGINE_HTTP_INT_H
#include <stdbool.h>
#include "pthread.h"
#include "semaphore.h"
#include "tmempool.h"
#include "taosdef.h"
#include "tutil.h"
#include "zlib.h"
#include "http.h"
#include "httpCode.h"
#include "httpLog.h"
#include "httpJson.h"
#define HTTP_MAX_CMD_SIZE 1024
#define HTTP_MAX_BUFFER_SIZE 1024*1024
#define HTTP_LABEL_SIZE 8
#define HTTP_MAX_EVENTS 10
#define HTTP_BUFFER_SIZE 1024*65 //65k
#define HTTP_DECOMPRESS_BUF_SIZE 1024*64
#define HTTP_STEP_SIZE 1024 //http message get process step by step
#define HTTP_MAX_URL 5 //http url stack size
#define HTTP_METHOD_SCANNER_SIZE 7 //http method fp size
#define HTTP_GC_TARGET_SIZE 512
#define HTTP_VERSION_10 0
#define HTTP_VERSION_11 1
//#define HTTP_VERSION_12 2
#define HTTP_UNCUNKED 0
#define HTTP_CHUNKED 1
#define HTTP_KEEPALIVE_NO_INPUT 0
#define HTTP_KEEPALIVE_ENABLE 1
#define HTTP_KEEPALIVE_DISABLE 2
#define HTTP_REQTYPE_OTHERS 0
#define HTTP_REQTYPE_LOGIN 1
#define HTTP_REQTYPE_HEARTBEAT 2
#define HTTP_REQTYPE_SINGLE_SQL 3
#define HTTP_REQTYPE_MULTI_SQL 4
#define HTTP_CHECK_BODY_ERROR -1
#define HTTP_CHECK_BODY_CONTINUE 0
#define HTTP_CHECK_BODY_SUCCESS 1
#define HTTP_WRITE_RETRY_TIMES 500
#define HTTP_WRITE_WAIT_TIME_MS 5
#define HTTP_EXPIRED_TIME 60000
#define HTTP_DELAY_CLOSE_TIME_MS 500
#define HTTP_COMPRESS_IDENTITY 0
#define HTTP_COMPRESS_GZIP 2
#define HTTP_SESSION_ID_LEN (TSDB_USER_LEN + TSDB_PASSWORD_LEN)
typedef enum {
HTTP_SERVER_INIT,
HTTP_SERVER_RUNNING,
HTTP_SERVER_CLOSING,
HTTP_SERVER_CLOSED
} HttpServerStatus;
typedef enum {
HTTP_CONTEXT_STATE_READY,
HTTP_CONTEXT_STATE_HANDLING,
HTTP_CONTEXT_STATE_DROPPING,
HTTP_CONTEXT_STATE_CLOSED
} HttpContextState;
struct HttpContext;
struct HttpThread;
typedef struct {
char id[HTTP_SESSION_ID_LEN];
int refCount;
void *taos;
} HttpSession;
typedef enum {
HTTP_CMD_TYPE_UN_SPECIFIED,
HTTP_CMD_TYPE_CREATE_DB,
HTTP_CMD_TYPE_CREATE_STBALE,
HTTP_CMD_TYPE_INSERT
} HttpSqlCmdType;
typedef enum { HTTP_CMD_STATE_NOT_RUN_YET, HTTP_CMD_STATE_RUN_FINISHED } HttpSqlCmdState;
typedef enum { HTTP_CMD_RETURN_TYPE_WITH_RETURN, HTTP_CMD_RETURN_TYPE_NO_RETURN } HttpSqlCmdReturnType;
typedef struct {
// used by single cmd
char *nativSql;
int32_t numOfRows;
int32_t code;
// these are the locations in the buffer
int32_t tagNames[TSDB_MAX_TAGS];
int32_t tagValues[TSDB_MAX_TAGS];
int32_t timestamp;
int32_t metric;
int32_t stable;
int32_t table;
int32_t values;
int32_t sql;
// used by multi-cmd
int8_t cmdType;
int8_t cmdReturnType;
int8_t cmdState;
int8_t tagNum;
} HttpSqlCmd;
typedef struct {
HttpSqlCmd *cmds;
int16_t pos;
int16_t size;
int16_t maxSize;
int32_t bufferPos;
int32_t bufferSize;
char * buffer;
} HttpSqlCmds;
typedef struct {
char *module;
bool (*decodeFp)(struct HttpContext *pContext);
} HttpDecodeMethod;
typedef struct {
void (*startJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, void *result);
void (*stopJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd);
bool (*buildQueryJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, void *result, int numOfRows);
void (*buildAffectRowJsonFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int affectRows);
void (*initJsonFp)(struct HttpContext *pContext);
void (*cleanJsonFp)(struct HttpContext *pContext);
bool (*checkFinishedFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int code);
void (*setNextCmdFp)(struct HttpContext *pContext, HttpSqlCmd *cmd, int code);
} HttpEncodeMethod;
typedef struct {
char *pos;
int32_t len;
} HttpBuf;
typedef struct {
char buffer[HTTP_BUFFER_SIZE];
int bufsize;
char *pLast;
char *pCur;
HttpBuf method;
HttpBuf path[HTTP_MAX_URL]; // url: dbname/meter/query
HttpBuf data; // body content
HttpBuf token; // auth token
HttpDecodeMethod *pMethod;
} HttpParser;
typedef struct HttpContext {
int32_t refCount;
int fd;
uint32_t accessTimes;
uint32_t lastAccessTime;
int32_t state;
uint8_t httpVersion;
uint8_t httpChunked;
uint8_t httpKeepAlive; // http1.0 and not keep-alive, close connection immediately
uint8_t acceptEncoding;
uint8_t contentEncoding;
uint8_t reqType;
uint8_t parsed;
char ipstr[22];
char user[TSDB_USER_LEN]; // parsed from auth token or login message
char pass[TSDB_PASSWORD_LEN];
void * taos;
void * ppContext;
HttpSession *session;
z_stream gzipStream;
HttpParser parser;
HttpSqlCmd singleCmd;
HttpSqlCmds *multiCmds;
JsonBuf * jsonBuf;
void * timer;
HttpEncodeMethod * encodeMethod;
struct HttpThread *pThread;
} HttpContext;
typedef struct HttpThread {
pthread_t thread;
HttpContext * pHead;
pthread_mutex_t threadMutex;
bool stop;
int pollFd;
int numOfFds;
int threadId;
char label[HTTP_LABEL_SIZE];
bool (*processData)(HttpContext *pContext);
} HttpThread;
typedef struct HttpServer {
char label[HTTP_LABEL_SIZE];
uint32_t serverIp;
uint16_t serverPort;
int fd;
int numOfThreads;
int methodScannerLen;
int32_t requestNum;
int32_t status;
pthread_t thread;
HttpThread * pThreads;
void * contextCache;
void * sessionCache;
pthread_mutex_t serverMutex;
HttpDecodeMethod *methodScanner[HTTP_METHOD_SCANNER_SIZE];
bool (*processData)(HttpContext *pContext);
} HttpServer;
extern const char *httpKeepAliveStr[];
extern const char *httpVersionStr[];
extern HttpServer tsHttpServer;
#endif
......@@ -97,4 +97,8 @@ void httpJsonPrint(JsonBuf* buf, const char* json, int len);
// quick
void httpJsonPairStatus(JsonBuf* buf, int code);
// http json printer
JsonBuf* httpMallocJsonBuf(struct HttpContext* pContext);
void httpFreeJsonBuf(struct HttpContext* pContext);
#endif
......@@ -16,7 +16,7 @@
#ifndef TDENGINE_HTTP_RESP_H
#define TDENGINE_HTTP_RESP_H
#include "httpHandle.h"
#include "httpInt.h"
enum _httpRespTempl {
HTTP_RESPONSE_JSON_OK,
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_SERVER_H
#define TDENGINE_HTTP_SERVER_H
#include "httpInt.h"
bool httpInitConnect();
void httpCleanUpConnect();
void *httpInitServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
void httpCleanUpServer(HttpServer *pServer);
bool httpReadDataImp(HttpContext *pContext);
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_SESSION_H
#define TDENGINE_HTTP_SESSION_H
bool httpInitSessions();
void httpCleanUpSessions();
// http session method
void httpCreateSession(HttpContext *pContext, void *taos);
void httpGetSession(HttpContext *pContext);
void httpReleaseSession(HttpContext *pContext);
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_SQL_H
#define TDENGINE_HTTP_SQL_H
int32_t httpAddToSqlCmdBuffer(HttpContext *pContext, const char *const format, ...);
int32_t httpAddToSqlCmdBufferNoTerminal(HttpContext *pContext, const char *const format, ...);
int32_t httpAddToSqlCmdBufferWithSize(HttpContext *pContext, int mallocSize);
int32_t httpAddToSqlCmdBufferTerminal(HttpContext *pContext);
bool httpMallocMultiCmds(HttpContext *pContext, int cmdSize, int bufferSize);
bool httpReMallocMultiCmdsSize(HttpContext *pContext, int cmdSize);
bool httpReMallocMultiCmdsBuffer(HttpContext *pContext, int bufferSize);
void httpFreeMultiCmds(HttpContext *pContext);
HttpSqlCmd *httpNewSqlCmd(HttpContext *pContext);
HttpSqlCmd *httpCurrSqlCmd(HttpContext *pContext);
int httpCurSqlCmdPos(HttpContext *pContext);
void httpTrimTableName(char *name);
int httpShrinkTableName(HttpContext *pContext, int pos, char *name);
char *httpGetCmdsString(HttpContext *pContext, int pos);
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HTTP_UTIL_H
#define TDENGINE_HTTP_UTIL_H
bool httpCheckUsedbSql(char *sql);
void httpTimeToString(time_t t, char *buf, int buflen);
bool httpUrlMatch(HttpContext *pContext, int pos, char *cmp);
bool httpParseRequest(HttpContext *pContext);
int httpCheckReadCompleted(HttpContext *pContext);
void httpReadDirtyData(HttpContext *pContext);
int httpGzipDeCompress(char *srcData, int32_t nSrcData, char *destData, int32_t *nDestData);
int httpGzipCompressInit(HttpContext *pContext);
int httpGzipCompress(HttpContext *pContext, char *inSrcData, int32_t inSrcDataLen,
char *outDestData, int32_t *outDestDataLen, bool isTheLast);
// http request parser
void httpAddMethod(HttpServer *pServer, HttpDecodeMethod *pMethod);
#endif
......@@ -16,15 +16,11 @@
#ifndef TDENGINE_REST_HANDLE_H
#define TDENGINE_REST_HANDLE_H
#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include "http.h"
#include "httpCode.h"
#include "httpHandle.h"
#include "httpInt.h"
#include "httpUtil.h"
#include "httpResp.h"
#include "httpSql.h"
#define REST_ROOT_URL_POS 0
#define REST_ACTION_URL_POS 1
......
......@@ -16,16 +16,11 @@
#ifndef TDENGINE_TG_HANDLE_H
#define TDENGINE_TG_HANDLE_H
#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include "cJSON.h"
#include "http.h"
#include "httpCode.h"
#include "httpHandle.h"
#include "httpInt.h"
#include "httpUtil.h"
#include "httpResp.h"
#include "httpSql.h"
#define TG_ROOT_URL_POS 0
#define TG_DB_URL_POS 1
......
......@@ -18,8 +18,8 @@
#include "tkey.h"
#include "tutil.h"
#include "http.h"
#include "httpLog.h"
#include "httpHandle.h"
#include "httpInt.h"
#include "httpAuth.h"
#define KEY_DES_4 4971256377704625728L
......@@ -73,6 +73,7 @@ bool httpParseTaosdAuthToken(HttpContext *pContext, char *token, int len) {
unsigned char *base64 = base64_decode(token, len, &outlen);
if (base64 == NULL || outlen == 0) {
httpError("context:%p, fd:%d, ip:%s, taosd token:%s parsed error", pContext, pContext->fd, pContext->ipstr, token);
if (base64) free(base64);
return false;
}
if (outlen != (TSDB_USER_LEN + TSDB_PASSWORD_LEN)) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "tsocket.h"
#include "tutil.h"
#include "ttime.h"
#include "ttimer.h"
#include "tglobal.h"
#include "tcache.h"
#include "hash.h"
#include "httpInt.h"
#include "httpResp.h"
#include "httpSql.h"
#include "httpSession.h"
static void httpRemoveContextFromEpoll(HttpContext *pContext) {
HttpThread *pThread = pContext->pThread;
if (pContext->fd >= 0) {
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL);
taosCloseSocket(pContext->fd);
pContext->fd = -1;
}
}
static void httpDestroyContext(void *data) {
HttpContext *pContext = *(HttpContext **)data;
if (pContext->fd > 0) tclose(pContext->fd);
HttpThread *pThread = pContext->pThread;
httpRemoveContextFromEpoll(pContext);
httpReleaseSession(pContext);
atomic_sub_fetch_32(&pThread->numOfFds, 1);
pContext->pThread = 0;
pContext->state = HTTP_CONTEXT_STATE_CLOSED;
// avoid double free
httpFreeJsonBuf(pContext);
httpFreeMultiCmds(pContext);
httpTrace("context:%p, is destroyed, refCount:%d", pContext, pContext->refCount);
tfree(pContext);
}
bool httpInitContexts() {
tsHttpServer.contextCache = taosCacheInitWithCb(2, httpDestroyContext);
if (tsHttpServer.contextCache == NULL) {
httpError("failed to init context cache");
return false;
}
return true;
}
void httpCleanupContexts() {
if (tsHttpServer.contextCache != NULL) {
SCacheObj *cache = tsHttpServer.contextCache;
httpPrint("context cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable));
taosCacheCleanup(tsHttpServer.contextCache);
tsHttpServer.contextCache = NULL;
}
}
const char *httpContextStateStr(HttpContextState state) {
switch (state) {
case HTTP_CONTEXT_STATE_READY:
return "ready";
case HTTP_CONTEXT_STATE_HANDLING:
return "handling";
case HTTP_CONTEXT_STATE_DROPPING:
return "dropping";
case HTTP_CONTEXT_STATE_CLOSED:
return "closed";
default:
return "unknown";
}
}
void httpNotifyContextClose(HttpContext *pContext) {
shutdown(pContext->fd, SHUT_WR);
}
bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, HttpContextState destState) {
return (atomic_val_compare_exchange_32(&pContext->state, srcState, destState) == srcState);
}
HttpContext *httpCreateContext(int32_t fd) {
HttpContext *pContext = calloc(1, sizeof(HttpContext));
if (pContext == NULL) return NULL;
char contextStr[16] = {0};
snprintf(contextStr, sizeof(contextStr), "%p", pContext);
pContext->fd = fd;
pContext->httpVersion = HTTP_VERSION_10;
pContext->lastAccessTime = taosGetTimestampSec();
pContext->state = HTTP_CONTEXT_STATE_READY;
HttpContext **ppContext = taosCachePut(tsHttpServer.contextCache, contextStr, &pContext, sizeof(HttpContext *), 3);
pContext->ppContext = ppContext;
httpTrace("context:%p, fd:%d, is created, item:%p", pContext, fd, ppContext);
// set the ref to 0
taosCacheRelease(tsHttpServer.contextCache, (void**)&ppContext, false);
return pContext;
}
HttpContext *httpGetContext(void *ptr) {
char contextStr[16] = {0};
snprintf(contextStr, sizeof(contextStr), "%p", ptr);
HttpContext **ppContext = taosCacheAcquireByName(tsHttpServer.contextCache, contextStr);
if (ppContext) {
HttpContext *pContext = *ppContext;
if (pContext) {
int32_t refCount = atomic_add_fetch_32(&pContext->refCount, 1);
httpTrace("context:%p, fd:%d, is accquired, refCount:%d", pContext, pContext->fd, refCount);
return pContext;
}
}
return NULL;
}
void httpReleaseContext(HttpContext *pContext) {
int32_t refCount = atomic_sub_fetch_32(&pContext->refCount, 1);
assert(refCount >= 0);
httpTrace("context:%p, fd:%d, is releasd, refCount:%d", pContext, pContext->fd, refCount);
HttpContext **ppContext = pContext->ppContext;
taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false);
}
bool httpInitContext(HttpContext *pContext) {
pContext->accessTimes++;
pContext->lastAccessTime = taosGetTimestampSec();
pContext->httpVersion = HTTP_VERSION_10;
pContext->httpKeepAlive = HTTP_KEEPALIVE_NO_INPUT;
pContext->httpChunked = HTTP_UNCUNKED;
pContext->acceptEncoding = HTTP_COMPRESS_IDENTITY;
pContext->contentEncoding = HTTP_COMPRESS_IDENTITY;
pContext->reqType = HTTP_REQTYPE_OTHERS;
pContext->encodeMethod = NULL;
pContext->timer = NULL;
memset(&pContext->singleCmd, 0, sizeof(HttpSqlCmd));
HttpParser *pParser = &pContext->parser;
memset(pParser, 0, sizeof(HttpParser));
pParser->pCur = pParser->pLast = pParser->buffer;
httpTrace("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, parsed:%d",
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, pContext->parsed);
return true;
}
void httpCloseContextByApp(HttpContext *pContext) {
pContext->parsed = false;
bool keepAlive = true;
if (pContext->httpVersion == HTTP_VERSION_10 && pContext->httpKeepAlive != HTTP_KEEPALIVE_ENABLE) {
keepAlive = false;
} else if (pContext->httpVersion != HTTP_VERSION_10 && pContext->httpKeepAlive == HTTP_KEEPALIVE_DISABLE) {
keepAlive = false;
} else {}
if (keepAlive) {
if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_READY)) {
httpTrace("context:%p, fd:%d, ip:%s, last state:handling, keepAlive:true, reuse connect",
pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_CLOSED)) {
httpRemoveContextFromEpoll(pContext);
httpTrace("context:%p, fd:%d, ip:%s, last state:dropping, keepAlive:true, close connect",
pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_READY)) {
httpTrace("context:%p, fd:%d, ip:%s, last state:ready, keepAlive:true, reuse connect",
pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) {
httpRemoveContextFromEpoll(pContext);
httpTrace("context:%p, fd:%d, ip:%s, last state:ready, keepAlive:true, close connect",
pContext, pContext->fd, pContext->ipstr);
} else {
httpRemoveContextFromEpoll(pContext);
httpError("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:true, close connect",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->state);
}
} else {
httpRemoveContextFromEpoll(pContext);
httpTrace("context:%p, fd:%d, ip:%s, last state:%s:%d, keepAlive:false, close connect",
pContext, pContext->fd, pContext->ipstr, httpContextStateStr(pContext->state), pContext->state);
}
httpReleaseContext(pContext);
}
void httpCloseContextByServer(HttpContext *pContext) {
if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_HANDLING, HTTP_CONTEXT_STATE_DROPPING)) {
httpTrace("context:%p, fd:%d, ip:%s, epoll finished, still used by app", pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_DROPPING, HTTP_CONTEXT_STATE_DROPPING)) {
httpTrace("context:%p, fd:%d, ip:%s, epoll already finished, wait app finished", pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_CLOSED)) {
httpTrace("context:%p, fd:%d, ip:%s, epoll finished, close context", pContext, pContext->fd, pContext->ipstr);
} else if (httpAlterContextState(pContext, HTTP_CONTEXT_STATE_CLOSED, HTTP_CONTEXT_STATE_CLOSED)) {
httpTrace("context:%p, fd:%d, ip:%s, epoll finished, will be closed soon", pContext, pContext->fd, pContext->ipstr);
} else {
httpError("context:%p, fd:%d, ip:%s, unknown state:%d", pContext, pContext->fd, pContext->ipstr, pContext->state);
}
pContext->parsed = false;
httpRemoveContextFromEpoll(pContext);
httpReleaseContext(pContext);
}
......@@ -19,11 +19,12 @@
#include "tglobal.h"
#include "tsocket.h"
#include "ttimer.h"
#include "http.h"
#include "httpLog.h"
#include "httpCode.h"
#include "httpHandle.h"
#include "httpInt.h"
#include "httpResp.h"
#include "httpAuth.h"
#include "httpServer.h"
#include "httpContext.h"
#include "httpHandle.h"
void httpToLowerUrl(char* url) {
/*ignore case */
......@@ -58,6 +59,10 @@ bool httpParseURL(HttpContext* pContext) {
HttpParser* pParser = &pContext->parser;
char* pSeek;
char* pEnd = strchr(pParser->pLast, ' ');
if (pEnd == NULL) {
return false;
}
if (*pParser->pLast != '/') {
httpSendErrorResp(pContext, HTTP_UNSUPPORT_URL);
return false;
......@@ -159,7 +164,7 @@ bool httpGetHttpMethod(HttpContext* pContext) {
bool httpGetDecodeMethod(HttpContext* pContext) {
HttpParser* pParser = &pContext->parser;
HttpServer* pServer = pContext->pThread->pServer;
HttpServer* pServer = &tsHttpServer;
int methodLen = pServer->methodScannerLen;
for (int i = 0; i < methodLen; i++) {
HttpDecodeMethod* method = pServer->methodScanner[i];
......
......@@ -22,6 +22,7 @@
#include "httpCode.h"
#include "httpJson.h"
#include "httpResp.h"
#include "httpUtil.h"
#define MAX_NUM_STR_SZ 25
......
......@@ -21,6 +21,7 @@
#include "httpResp.h"
#include "httpCode.h"
#include "httpJson.h"
#include "httpContext.h"
const char *httpKeepAliveStr[] = {"", "Connection: Keep-Alive\r\n", "Connection: Close\r\n"};
......
此差异已折叠。
......@@ -15,45 +15,28 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "hash.h"
#include "taos.h"
#include "ttime.h"
#include "ttimer.h"
#include "http.h"
#include "httpLog.h"
#include "httpCode.h"
#include "httpHandle.h"
#include "httpResp.h"
void httpAccessSession(HttpContext *pContext) {
HttpServer *server = pContext->pThread->pServer;
pthread_mutex_lock(&server->serverMutex);
if (pContext->session == pContext->session->signature) {
pContext->session->expire = (int) taosGetTimestampSec() + pContext->pThread->pServer->sessionExpire;
}
pthread_mutex_unlock(&server->serverMutex);
}
#include "tglobal.h"
#include "tcache.h"
#include "httpInt.h"
#include "httpContext.h"
#include "httpSession.h"
void httpCreateSession(HttpContext *pContext, void *taos) {
HttpServer *server = pContext->pThread->pServer;
pthread_mutex_lock(&server->serverMutex);
HttpServer *server = &tsHttpServer;
httpReleaseSession(pContext);
if (pContext->session != NULL && pContext->session == pContext->session->signature) {
httpTrace("context:%p, fd:%d, ip:%s, user:%s, set exist session:%p:%p expired", pContext, pContext->fd,
pContext->ipstr, pContext->user, pContext->session, pContext->session->taos);
pContext->session->expire = 0;
pContext->session->access--;
}
pthread_mutex_lock(&server->serverMutex);
HttpSession session;
memset(&session, 0, sizeof(HttpSession));
HttpSession session = {0};
session.taos = taos;
session.expire = (int)taosGetTimestampSec() + server->sessionExpire;
session.access = 1;
int sessionIdLen = snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
session.refCount = 1;
snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
taosHashPut(server->pSessionHash, session.id, sessionIdLen, (char *)(&session), sizeof(HttpSession));
pContext->session = taosHashGet(server->pSessionHash, session.id, sessionIdLen);
pContext->session = taosCachePut(server->sessionCache, session.id, &session, sizeof(HttpSession), tsHttpSessionExpire);
// void *temp = pContext->session;
// taosCacheRelease(server->sessionCache, (void **)&temp, false);
if (pContext->session == NULL) {
httpError("context:%p, fd:%d, ip:%s, user:%s, error:%s", pContext, pContext->fd, pContext->ipstr, pContext->user,
......@@ -63,26 +46,23 @@ void httpCreateSession(HttpContext *pContext, void *taos) {
return;
}
pContext->session->signature = pContext->session;
httpTrace("context:%p, fd:%d, ip:%s, user:%s, create a new session:%p:%p", pContext, pContext->fd, pContext->ipstr,
pContext->user, pContext->session, pContext->session->taos);
httpTrace("context:%p, fd:%d, ip:%s, user:%s, create a new session:%p:%p sessionRef:%d", pContext, pContext->fd,
pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount);
pthread_mutex_unlock(&server->serverMutex);
}
void httpFetchSessionImp(HttpContext *pContext) {
HttpServer *server = pContext->pThread->pServer;
static void httpFetchSessionImp(HttpContext *pContext) {
HttpServer *server = &tsHttpServer;
pthread_mutex_lock(&server->serverMutex);
char sessionId[HTTP_SESSION_ID_LEN];
int sessonIdLen = snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
pContext->session = taosHashGet(server->pSessionHash, sessionId, sessonIdLen);
if (pContext->session != NULL && pContext->session == pContext->session->signature) {
pContext->session->access++;
httpTrace("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, access:%d, expire:%d",
pContext, pContext->fd, pContext->ipstr, pContext->user, pContext->session,
pContext->session->taos, pContext->session->access, pContext->session->expire);
pContext->session->expire = (int)taosGetTimestampSec() + server->sessionExpire;
snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
pContext->session = taosCacheAcquireByName(server->sessionCache, sessionId);
if (pContext->session != NULL) {
atomic_add_fetch_32(&pContext->session->refCount, 1);
httpTrace("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, sessionRef:%d", pContext, pContext->fd,
pContext->ipstr, pContext->user, pContext->session, pContext->session->taos, pContext->session->refCount);
} else {
httpTrace("context:%p, fd:%d, ip:%s, user:%s, session not found", pContext, pContext->fd, pContext->ipstr,
pContext->user);
......@@ -91,88 +71,53 @@ void httpFetchSessionImp(HttpContext *pContext) {
pthread_mutex_unlock(&server->serverMutex);
}
void httpFetchSession(HttpContext *pContext) {
void httpGetSession(HttpContext *pContext) {
if (pContext->session == NULL) {
httpFetchSessionImp(pContext);
} else {
char sessionId[HTTP_SESSION_ID_LEN];
snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
if (strcmp(pContext->session->id, sessionId) != 0) {
httpError("context:%p, fd:%d, ip:%s, user:%s, password may be changed", pContext, pContext->fd, pContext->ipstr, pContext->user);
httpRestoreSession(pContext);
httpFetchSessionImp(pContext);
}
httpReleaseSession(pContext);
httpFetchSessionImp(pContext);
}
}
void httpRestoreSession(HttpContext *pContext) {
HttpServer * server = pContext->pThread->pServer;
void httpReleaseSession(HttpContext *pContext) {
if (pContext == NULL || pContext->session == NULL) return;
// all access to the session is via serverMutex
pthread_mutex_lock(&server->serverMutex);
HttpSession *session = pContext->session;
if (session == NULL || session != session->signature) {
pthread_mutex_unlock(&server->serverMutex);
return;
}
session->access--;
httpTrace("context:%p, ip:%s, user:%s, restore session:%p:%p, access:%d, expire:%d",
pContext, pContext->ipstr, pContext->user, session, session->taos,
session->access, pContext->session->expire);
pContext->session = NULL;
pthread_mutex_unlock(&server->serverMutex);
}
int32_t refCount = atomic_sub_fetch_32(&pContext->session->refCount, 1);
assert(refCount >= 0);
httpTrace("context:%p, release session:%p:%p, sessionRef:%d", pContext, pContext->session, pContext->session->taos,
pContext->session->refCount);
void httpResetSession(HttpSession *pSession) {
httpTrace("close session:%p:%p", pSession, pSession->taos);
if (pSession->taos != NULL) {
taos_close(pSession->taos);
pSession->taos = NULL;
}
pSession->signature = NULL;
taosCacheRelease(tsHttpServer.sessionCache, (void **)&pContext->session, false);
pContext->session = NULL;
}
void httpRemoveAllSessions(HttpServer *pServer) {
SHashMutableIterator *pIter = taosHashCreateIter(pServer->pSessionHash);
static void httpDestroySession(void *data) {
HttpSession *session = data;
httpTrace("session:%p:%p, is destroyed, sessionRef:%d", session, session->taos, session->refCount);
while (taosHashIterNext(pIter)) {
HttpSession *pSession = taosHashIterGet(pIter);
if (pSession == NULL) continue;
httpResetSession(pSession);
if (session->taos != NULL) {
taos_close(session->taos);
session->taos = NULL;
}
taosHashDestroyIter(pIter);
}
bool httpInitAllSessions(HttpServer *pServer) {
if (pServer->pSessionHash == NULL) {
pServer->pSessionHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true);
void httpCleanUpSessions() {
if (tsHttpServer.sessionCache != NULL) {
SCacheObj *cache = tsHttpServer.sessionCache;
httpPrint("session cache is cleanuping, size:%d", taosHashGetSize(cache->pHashTable));
taosCacheCleanup(tsHttpServer.sessionCache);
tsHttpServer.sessionCache = NULL;
}
if (pServer->pSessionHash == NULL) {
httpError("http init session pool failed");
return false;
}
if (pServer->expireTimer == NULL) {
taosTmrReset(httpProcessSessionExpire, 50000, pServer, pServer->timerHandle, &pServer->expireTimer);
}
return true;
}
bool httpSessionExpired(HttpSession *pSession) {
time_t cur = taosGetTimestampSec();
if (pSession->taos != NULL) {
if (pSession->expire > cur) {
return false; // un-expired, so return false
}
if (pSession->access > 0) {
httpTrace("session:%p:%p is expired, but still access:%d", pSession, pSession->taos,
pSession->access);
return false; // still used, so return false
}
httpTrace("need close session:%p:%p for it expired, cur:%d, expire:%d, invertal:%d",
pSession, pSession->taos, cur, pSession->expire, cur - pSession->expire);
bool httpInitSessions() {
tsHttpServer.sessionCache = taosCacheInitWithCb(5, httpDestroySession);
if (tsHttpServer.sessionCache == NULL) {
httpError("failed to init session cache");
return false;
}
return true;
......
......@@ -18,11 +18,12 @@
#include "tnote.h"
#include "taos.h"
#include "tsclient.h"
#include "http.h"
#include "httpLog.h"
#include "httpCode.h"
#include "httpHandle.h"
#include "httpInt.h"
#include "httpContext.h"
#include "httpSql.h"
#include "httpResp.h"
#include "httpAuth.h"
#include "httpSession.h"
void *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos);
......@@ -30,7 +31,7 @@ void httpProcessMultiSql(HttpContext *pContext);
void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) {
HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL || pContext->signature != pContext) return;
if (pContext == NULL) return;
HttpSqlCmds * multiCmds = pContext->multiCmds;
HttpEncodeMethod *encode = pContext->encodeMethod;
......@@ -72,7 +73,7 @@ void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numO
void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) {
HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL || pContext->signature != pContext) return;
if (pContext == NULL) return;
HttpSqlCmds * multiCmds = pContext->multiCmds;
HttpEncodeMethod *encode = pContext->encodeMethod;
......@@ -172,7 +173,7 @@ void httpProcessMultiSql(HttpContext *pContext) {
}
void httpProcessMultiSqlCmd(HttpContext *pContext) {
if (pContext == NULL || pContext->signature != pContext) return;
if (pContext == NULL) return;
HttpSqlCmds *multiCmds = pContext->multiCmds;
if (multiCmds == NULL || multiCmds->size <= 0 || multiCmds->pos >= multiCmds->size || multiCmds->pos < 0) {
......@@ -192,7 +193,7 @@ void httpProcessMultiSqlCmd(HttpContext *pContext) {
void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int numOfRows) {
HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL || pContext->signature != pContext) return;
if (pContext == NULL) return;
HttpEncodeMethod *encode = pContext->encodeMethod;
......@@ -230,7 +231,7 @@ void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int num
void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int code) {
HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL || pContext->signature != pContext) return;
if (pContext == NULL) return;
HttpEncodeMethod *encode = pContext->encodeMethod;
......@@ -354,7 +355,7 @@ void httpExecCmd(HttpContext *pContext) {
void httpProcessRequestCb(void *param, TAOS_RES *result, int code) {
HttpContext *pContext = param;
if (pContext == NULL || pContext->signature != pContext) return;
if (pContext == NULL) return;
if (code < 0) {
httpError("context:%p, fd:%d, ip:%s, user:%s, login error, code:%s", pContext, pContext->fd, pContext->ipstr,
......@@ -383,16 +384,14 @@ void httpProcessRequestCb(void *param, TAOS_RES *result, int code) {
}
void httpProcessRequest(HttpContext *pContext) {
httpFetchSession(pContext);
httpGetSession(pContext);
if (pContext->session == NULL || pContext->session != pContext->session->signature ||
pContext->reqType == HTTP_REQTYPE_LOGIN) {
if (pContext->session == NULL || pContext->reqType == HTTP_REQTYPE_LOGIN) {
taos_connect_a(NULL, pContext->user, pContext->pass, "", 0, httpProcessRequestCb, (void *)pContext,
&(pContext->taos));
httpTrace("context:%p, fd:%d, ip:%s, user:%s, try connect tdengine, taos:%p", pContext, pContext->fd,
pContext->ipstr, pContext->user, pContext->taos);
} else {
httpAccessSession(pContext);
httpExecCmd(pContext);
}
}
......@@ -20,84 +20,64 @@
#include "tsocket.h"
#include "ttimer.h"
#include "tadmin.h"
#include "http.h"
#include "httpCode.h"
#include "httpHandle.h"
#include "httpInt.h"
#include "httpContext.h"
#include "httpSession.h"
#include "httpServer.h"
#include "httpResp.h"
#include "httpLog.h"
#include "gcHandle.h"
#include "httpHandle.h"
#include "gcHandle.h"
#include "restHandle.h"
#include "tgHandle.h"
#ifndef _ADMIN
void adminInitHandle(HttpServer* pServer) {}
void opInitHandle(HttpServer* pServer) {}
#endif
static HttpServer *httpServer = NULL;
HttpServer tsHttpServer;
void taosInitNote(int numOfNoteLines, int maxNotes, char* lable);
int httpInitSystem() {
// taos_init();
httpServer = (HttpServer *)malloc(sizeof(HttpServer));
memset(httpServer, 0, sizeof(HttpServer));
strcpy(httpServer->label, "rest");
httpServer->serverIp = 0;
httpServer->serverPort = tsHttpPort;
httpServer->cacheContext = tsHttpCacheSessions;
httpServer->sessionExpire = tsHttpSessionExpire;
httpServer->numOfThreads = tsHttpMaxThreads;
httpServer->processData = httpProcessData;
strcpy(tsHttpServer.label, "rest");
tsHttpServer.serverIp = 0;
tsHttpServer.serverPort = tsHttpPort;
tsHttpServer.numOfThreads = tsHttpMaxThreads;
tsHttpServer.processData = httpProcessData;
pthread_mutex_init(&httpServer->serverMutex, NULL);
pthread_mutex_init(&tsHttpServer.serverMutex, NULL);
if (tsHttpEnableRecordSql != 0) {
taosInitNote(tsNumOfLogLines / 10, 1, (char*)"http_note");
}
restInitHandle(httpServer);
adminInitHandle(httpServer);
gcInitHandle(httpServer);
tgInitHandle(httpServer);
opInitHandle(httpServer);
restInitHandle(&tsHttpServer);
adminInitHandle(&tsHttpServer);
gcInitHandle(&tsHttpServer);
tgInitHandle(&tsHttpServer);
opInitHandle(&tsHttpServer);
return 0;
}
int httpStartSystem() {
httpPrint("starting to initialize http service ...");
httpPrint("start http server ...");
if (httpServer == NULL) {
httpError("http server is null");
httpInitSystem();
}
if (httpServer->pContextPool == NULL) {
httpServer->pContextPool = taosMemPoolInit(httpServer->cacheContext, sizeof(HttpContext));
}
if (httpServer->pContextPool == NULL) {
httpError("http init context pool failed");
if (tsHttpServer.status != HTTP_SERVER_INIT) {
httpError("http server is already started");
return -1;
}
if (httpServer->timerHandle == NULL) {
httpServer->timerHandle = taosTmrInit(tsHttpCacheSessions * 100 + 100, 200, 60000, "http");
}
if (httpServer->timerHandle == NULL) {
httpError("http init timer failed");
if (!httpInitContexts()) {
httpError("http init contexts failed");
return -1;
}
if (!httpInitAllSessions(httpServer)) {
if (!httpInitSessions()) {
httpError("http init session failed");
return -1;
}
if (!httpInitConnect(httpServer)) {
if (!httpInitConnect()) {
httpError("http init server failed");
return -1;
}
......@@ -106,53 +86,23 @@ int httpStartSystem() {
}
void httpStopSystem() {
if (httpServer != NULL) {
httpServer->online = false;
}
tsHttpServer.status = HTTP_SERVER_CLOSING;
shutdown(tsHttpServer.fd, SHUT_RD);
tgCleanupHandle();
}
void httpCleanUpSystem() {
httpPrint("http service cleanup");
httpPrint("http server cleanup");
httpStopSystem();
//#if 0
if (httpServer == NULL) {
return;
}
if (httpServer->expireTimer != NULL) {
taosTmrStopA(&(httpServer->expireTimer));
}
if (httpServer->timerHandle != NULL) {
taosTmrCleanUp(httpServer->timerHandle);
httpServer->timerHandle = NULL;
}
if (httpServer->pThreads != NULL) {
httpCleanUpConnect(httpServer);
httpServer->pThreads = NULL;
}
#if 0
httpRemoveAllSessions(httpServer);
httpCleanupContexts();
httpCleanUpSessions();
httpCleanUpConnect();
pthread_mutex_destroy(&tsHttpServer.serverMutex);
if (httpServer->pContextPool != NULL) {
taosMemPoolCleanUp(httpServer->pContextPool);
httpServer->pContextPool = NULL;
}
pthread_mutex_destroy(&httpServer->serverMutex);
tfree(httpServer);
#endif
tsHttpServer.status = HTTP_SERVER_CLOSED;
}
int32_t httpGetReqCount() {
if (httpServer != NULL) {
return atomic_exchange_32(&httpServer->requestNum, 0);
}
return 0;
return atomic_exchange_32(&tsHttpServer.requestNum, 0);
}
......@@ -17,11 +17,10 @@
#include "os.h"
#include "tmd5.h"
#include "taos.h"
#include "http.h"
#include "httpLog.h"
#include "httpCode.h"
#include "httpHandle.h"
#include "httpInt.h"
#include "httpResp.h"
#include "httpSql.h"
#include "httpUtil.h"
bool httpCheckUsedbSql(char *sql) {
if (strstr(sql, "use ") != NULL) {
......
......@@ -18,9 +18,10 @@
#include "tglobal.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "httpInt.h"
#include "tgHandle.h"
#include "tgJson.h"
#include "httpLog.h"
#include "cJSON.h"
/*
* taos.telegraf.cfg formats like
......
......@@ -800,12 +800,13 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool inclu
break;
}
case TSDB_DATA_TYPE_NCHAR: {
int32_t newlen = 0;
if (!includeLengthPrefix) {
if (pVariant->nType == TSDB_DATA_TYPE_NULL) {
*(uint32_t *)payload = TSDB_DATA_NCHAR_NULL;
} else {
if (pVariant->nType != TSDB_DATA_TYPE_NCHAR) {
toNchar(pVariant, &payload, &pVariant->nLen);
toNchar(pVariant, &payload, &newlen);
} else {
wcsncpy((wchar_t *)payload, pVariant->wpz, pVariant->nLen);
}
......@@ -817,12 +818,13 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool inclu
char *p = varDataVal(payload);
if (pVariant->nType != TSDB_DATA_TYPE_NCHAR) {
toNchar(pVariant, &p, &pVariant->nLen);
toNchar(pVariant, &p, &newlen);
} else {
wcsncpy((wchar_t *)p, pVariant->wpz, pVariant->nLen);
newlen = pVariant->nLen;
}
varDataSetLen(payload, pVariant->nLen); // the length may be changed after toNchar function called
varDataSetLen(payload, newlen); // the length may be changed after toNchar function called
assert(p == varDataVal(payload));
}
}
......
......@@ -924,7 +924,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
// underlying UDP layer does not know it is server or client
pRecv->connType = pRecv->connType | pRpc->connType;
if (pRecv->ip == 0) {
if (pRecv->msg == NULL) {
rpcProcessBrokenLink(pConn);
return NULL;
}
......
......@@ -56,7 +56,7 @@ static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) {
int32_t getTimestampInUsFromStr(char* token, int32_t tokenlen, int64_t* ts);
int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec);
int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth);
void deltaToUtcInitOnce();
#ifdef __cplusplus
......
......@@ -63,10 +63,12 @@ static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) {
#endif
}
#if 0
static FORCE_INLINE void taosFreeNode(void *data) {
SCacheDataNode *pNode = *(SCacheDataNode **)data;
free(pNode);
}
#endif
/**
* @param key key of object for hash, usually a null-terminated string
......@@ -241,7 +243,7 @@ SCacheObj *taosCacheInitWithCb(int64_t refreshTime, void (*freeCb)(void *data))
}
// set free cache node callback function for hash table
taosHashSetFreecb(pCacheObj->pHashTable, taosFreeNode);
// taosHashSetFreecb(pCacheObj->pHashTable, taosFreeNode);
pCacheObj->freeFp = freeCb;
pCacheObj->refreshTime = refreshTime * 1000;
......@@ -565,7 +567,17 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
void doCleanupDataCache(SCacheObj *pCacheObj) {
__cache_wr_lock(pCacheObj);
taosHashCleanup(pCacheObj->pHashTable);
SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);
while (taosHashIterNext(pIter)) {
SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
// if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) {
taosCacheReleaseNode(pCacheObj, pNode);
//}
}
taosHashDestroyIter(pIter);
taosHashCleanup(pCacheObj->pHashTable);
__cache_unlock(pCacheObj);
taosTrashCanEmpty(pCacheObj, true);
......
......@@ -24,7 +24,6 @@
#include "taosdef.h"
#include "ttime.h"
#include "tutil.h"
/*
* mktime64 - Converts date to seconds.
* Converts Gregorian date to seconds since 1970-01-01 00:00:00.
......@@ -119,15 +118,21 @@ static int month[12] = {
static int64_t parseFraction(char* str, char** end, int32_t timePrec);
static int32_t parseTimeWithTz(char* timestr, int64_t* time, int32_t timePrec);
static int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec);
static int32_t parseLocaltimeWithDst(char* timestr, int64_t* time, int32_t timePrec);
static int32_t (*parseLocaltimeFp[]) (char* timestr, int64_t* time, int32_t timePrec) = {
parseLocaltime,
parseLocaltimeWithDst
};
int32_t taosGetTimestampSec() { return (int32_t)time(NULL); }
int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec) {
int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t daylight) {
/* parse datatime string in with tz */
if (strnchr(timestr, 'T', len, false) != NULL) {
return parseTimeWithTz(timestr, time, timePrec);
} else {
return parseLocaltime(timestr, time, timePrec);
return (*parseLocaltimeFp[daylight])(timestr, time, timePrec);
}
}
......@@ -304,9 +309,6 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) {
return -1;
}
/* mktime will be affected by TZ, set by using taos_options */
//int64_t seconds = mktime(&tm);
//int64_t seconds = (int64_t)user_mktime(&tm);
int64_t seconds = user_mktime64(tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
int64_t fraction = 0;
......@@ -324,6 +326,32 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) {
return 0;
}
int32_t parseLocaltimeWithDst(char* timestr, int64_t* time, int32_t timePrec) {
*time = 0;
struct tm tm = {0};
tm.tm_isdst = -1;
char* str = strptime(timestr, "%Y-%m-%d %H:%M:%S", &tm);
if (str == NULL) {
return -1;
}
/* mktime will be affected by TZ, set by using taos_options */
int64_t seconds = mktime(&tm);
int64_t fraction = 0;
if (*str == '.') {
/* parse the second fraction part */
if ((fraction = parseFraction(str + 1, &str, timePrec)) < 0) {
return -1;
}
}
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : 1000000;
*time = factor * seconds + fraction;
return 0;
}
static int32_t getTimestampInUsFromStrImpl(int64_t val, char unit, int64_t* result) {
*result = val;
......
......@@ -11,6 +11,8 @@
4. pip install ../src/connector/python/linux/python2 ; pip3 install
../src/connector/python/linux/python3
5. pip install numpy; pip3 install numpy
> Note: Both Python2 and Python3 are currently supported by the Python test
> framework. Since Python2 is no longer officially supported by Python Software
> Foundation since January 1, 2020, it is recommended that subsequent test case
......
......@@ -296,13 +296,13 @@ class TDTestCase:
# TSIM: return -1
# TSIM: endi
# TSIM: if $data23 != 1 then
tdLog.info('tdSql.checkData(2, 3, 1)')
tdSql.checkData(2, 3, 1)
tdLog.info('tdSql.checkData(2, 3, TAG)')
tdSql.checkData(2, 3, "TAG")
# TSIM: return -1
# TSIM: endi
# TSIM: if $data33 != 2.000000 then
tdLog.info('tdSql.checkData(3, 3, 2.000000)')
tdSql.checkData(3, 3, 2.000000)
tdSql.checkData(3, 3, "TAG")
# TSIM: return -1
# TSIM: endi
# TSIM:
......@@ -396,7 +396,7 @@ class TDTestCase:
# TSIM: endi
# TSIM: if $data03 != 2 then
tdLog.info('tdSql.checkData(0, 3, 2)')
tdSql.checkData(0, 3, 2)
tdSql.checkData(0, 3, "2")
# TSIM: return -1
# TSIM: endi
# TSIM:
......@@ -553,12 +553,12 @@ class TDTestCase:
# TSIM: endi
# TSIM: if $data03 != 5 then
tdLog.info('tdSql.checkData(0, 3, 5)')
tdSql.checkData(0, 3, 5)
tdSql.checkData(0, 3, "5")
# TSIM: return -1
# TSIM: endi
# TSIM: if $data04 != 6 then
tdLog.info('tdSql.checkData(0, 4, 6)')
tdSql.checkData(0, 4, 6)
tdSql.checkData(0, 4, "6")
# TSIM: return -1
# TSIM: endi
# TSIM:
......@@ -584,12 +584,12 @@ class TDTestCase:
# TSIM: endi
# TSIM: if $data03 != 5 then
tdLog.info('tdSql.checkData(0, 3, 5)')
tdSql.checkData(0, 3, 5)
tdSql.checkData(0, 3, "5")
# TSIM: return -1
# TSIM: endi
# TSIM: if $data04 != 6 then
tdLog.info('tdSql.checkData(0, 4, 6)')
tdSql.checkData(0, 4, 6)
tdSql.checkData(0, 4, "6")
# TSIM: return -1
# TSIM: endi
# TSIM:
......@@ -654,7 +654,7 @@ class TDTestCase:
# TSIM: endi
# TSIM: if $data04 != 3 then
tdLog.info('tdSql.checkData(0, 4, 3)')
tdSql.checkData(0, 4, 3)
tdSql.checkData(0, 4, "3")
# TSIM: return -1
# TSIM: endi
# TSIM:
......@@ -779,7 +779,7 @@ class TDTestCase:
# TSIM: endi
# TSIM: if $data04 != 3 then
tdLog.info('tdSql.checkData(0, 4, 3)')
tdSql.checkData(0, 4, 3)
tdSql.checkData(0, 4, "3")
# TSIM: return -1
# TSIM: endi
# TSIM:
......@@ -838,7 +838,7 @@ class TDTestCase:
# TSIM: endi
# TSIM: if $data03 != 5 then
tdLog.info('tdSql.checkData(0, 3, 5)')
tdSql.checkData(0, 3, 5)
tdSql.checkData(0, 3, "5")
# TSIM: return -1
# TSIM: endi
# TSIM: if $data04 != 1 then
......@@ -900,12 +900,12 @@ class TDTestCase:
# TSIM: endi
# TSIM: if $data03 != 2 then
tdLog.info('tdSql.checkData(0, 3, 2)')
tdSql.checkData(0, 3, 2)
tdSql.checkData(0, 3, "2")
# TSIM: return -1
# TSIM: endi
# TSIM: if $data04 != 3 then
tdLog.info('tdSql.checkData(0, 4, 3)')
tdSql.checkData(0, 4, 3)
tdSql.checkData(0, 4, "3")
# TSIM: return -1
# TSIM: endi
# TSIM:
......@@ -1024,28 +1024,28 @@ class TDTestCase:
# TSIM: endi
# TSIM: if $data02 != 1 then
tdLog.info('tdSql.checkData(0, 2, 1)')
tdSql.checkData(0, 2, 1)
tdSql.checkData(0, 2, "1")
# TSIM: return -1
# TSIM: endi
# TSIM: if $data03 != 2 then
tdLog.info('tdSql.checkData(0, 3, 2)')
tdSql.checkData(0, 3, 2)
tdSql.checkData(0, 3, "2")
# TSIM: return -1
# TSIM: endi
# TSIM: if $data04 != 3 then
tdLog.info('tdSql.checkData(0, 4, 3)')
tdSql.checkData(0, 4, 3)
tdSql.checkData(0, 4, "3")
# TSIM: return -1
# TSIM: endi
# TSIM: if $data05 != 4 then
tdLog.info('tdSql.checkData(0, 5, 4)')
tdSql.checkData(0, 5, 4)
tdSql.checkData(0, 5, "4")
# TSIM: return -1
# TSIM: endi
# TSIM:
# TSIM: sql alter table $mt change tag tgcol1 tgcol4 -x step103
tdLog.info('alter table %s change tag tgcol1 tgcol4 -x step103' % (mt))
tdSql.error('alter table %s change tag tgcol1 tgcol403' % (mt))
tdSql.error('alter table %s change tag tgcol1 tgcol4' % (mt))
# TSIM: return -1
# TSIM: step103:
# TSIM:
......@@ -1098,12 +1098,12 @@ class TDTestCase:
# TSIM: endi
# TSIM: if $data02 != 1 then
tdLog.info('tdSql.checkData(0, 2, 1)')
tdSql.checkData(0, 2, 1)
tdSql.checkData(0, 2, "1")
# TSIM: return -1
# TSIM: endi
# TSIM: if $data03 != 4 then
tdLog.info('tdSql.checkData(0, 3, 4)')
tdSql.checkData(0, 3, 4)
tdSql.checkData(0, 3, "4")
# TSIM: return -1
# TSIM: endi
# TSIM: if $data04 != 0 then
......@@ -1112,8 +1112,8 @@ class TDTestCase:
# TSIM: return -1
# TSIM: endi
# TSIM: if $data05 != NULL then
tdLog.info('tdSql.checkData(0, 5, NULL)')
tdSql.checkData(0, 5, None)
#tdLog.info('tdSql.checkData(0, 5, NULL)')
# tdSql.checkData(0, 5, None)
# TSIM: return -1
# TSIM: endi
# TSIM:
......@@ -1189,13 +1189,13 @@ class TDTestCase:
# TSIM: endi
# TSIM: if $data06 != 5 then
tdLog.info('tdSql.checkData(0, 6, 5)')
tdSql.checkData(0, 6, 5)
tdSql.checkData(0, 6, "5")
# TSIM: return -1
# TSIM: endi
# TSIM:
# TSIM: sql alter table $mt change tag tgcol1 tgcol4 -x step114
tdLog.info('alter table %s change tag tgcol1 tgcol4 -x step114' % (mt))
tdSql.error('alter table %s change tag tgcol1 tgcol414' % (mt))
tdSql.error('alter table %s change tag tgcol1 tgcol4' % (mt))
# TSIM: return -1
# TSIM: step114:
# TSIM:
......@@ -1274,7 +1274,7 @@ class TDTestCase:
# TSIM: endi
# TSIM: if $data03 != 4 then
tdLog.info('tdSql.checkData(0, 3, 4)')
tdSql.checkData(0, 3, 4)
tdSql.checkData(0, 3, "4")
# TSIM: return -1
# TSIM: endi
# TSIM: if $data04 != 5 then
......@@ -1284,7 +1284,7 @@ class TDTestCase:
# TSIM: endi
# TSIM: if $data05 != 6 then
tdLog.info('tdSql.checkData(0, 5, 6)')
tdSql.checkData(0, 5, 6)
tdSql.checkData(0, 5, "6")
# TSIM: return -1
# TSIM: endi
# TSIM: if $data06 != 7 then
......@@ -1376,12 +1376,12 @@ class TDTestCase:
# TSIM: endi
# TSIM: if $data06 != 5 then
tdLog.info('tdSql.checkData(0, 6, 5)')
tdSql.checkData(0, 6, 5)
tdSql.checkData(0, 6, "5")
# TSIM: return -1
# TSIM: endi
# TSIM: if $data07 != 6 then
tdLog.info('tdSql.checkData(0, 7, 6)')
tdSql.checkData(0, 7, 6)
tdSql.checkData(0, 7, "6")
# TSIM: return -1
# TSIM: endi
# TSIM:
......@@ -1460,12 +1460,12 @@ class TDTestCase:
# TSIM: endi
# TSIM: if $data03 != 1 then
tdLog.info('tdSql.checkData(0, 3, 1)')
tdSql.checkData(0, 3, 1)
tdSql.checkData(0, 3, "1")
# TSIM: return -1
# TSIM: endi
# TSIM: if $data04 != 5 then
tdLog.info('tdSql.checkData(0, 4, 5)')
tdSql.checkData(0, 4, 5)
tdSql.checkData(0, 4, "5")
# TSIM: return -1
# TSIM: endi
# TSIM: if $data05 != 4 then
......@@ -1475,7 +1475,7 @@ class TDTestCase:
# TSIM: endi
# TSIM: if $data06 != 3 then
tdLog.info('tdSql.checkData(0, 6, 3)')
tdSql.checkData(0, 6, 3)
tdSql.checkData(0, 6, "3")
# TSIM: return -1
# TSIM: endi
# TSIM: if $data07 != 2 then
......@@ -1562,7 +1562,7 @@ class TDTestCase:
# TSIM: endi
# TSIM: if $data02 != 1 then
tdLog.info('tdSql.checkData(0, 2, 1)')
tdSql.checkData(0, 2, 1)
tdSql.checkData(0, 2, "1")
# TSIM: return -1
# TSIM: endi
# TSIM: if $data03 != 2 then
......@@ -1577,7 +1577,7 @@ class TDTestCase:
# TSIM: endi
# TSIM: if $data05 != 4 then
tdLog.info('tdSql.checkData(0, 5, 4)')
tdSql.checkData(0, 5, 4)
tdSql.checkData(0, 5, "4")
# TSIM: return -1
# TSIM: endi
# TSIM: if $data06 != 5.000000000 then
......@@ -1587,7 +1587,7 @@ class TDTestCase:
# TSIM: endi
# TSIM: if $data07 != 6 then
tdLog.info('tdSql.checkData(0, 7, 6)')
tdSql.checkData(0, 7, 6)
tdSql.checkData(0, 7, "6")
# TSIM: return -1
# TSIM: endi
# TSIM:
......@@ -1655,7 +1655,7 @@ class TDTestCase:
# TSIM: endi
# TSIM: if $data02 != 7 then
tdLog.info('tdSql.checkData(0, 2, 7)')
tdSql.checkData(0, 2, 7)
tdSql.checkData(0, 2, "7")
# TSIM: return -1
# TSIM: endi
# TSIM: if $data03 != 9 then
......@@ -1670,7 +1670,7 @@ class TDTestCase:
# TSIM: endi
# TSIM: if $data05 != 8 then
tdLog.info('tdSql.checkData(0, 5, 8)')
tdSql.checkData(0, 5, 8)
tdSql.checkData(0, 5, "8")
# TSIM: return -1
# TSIM: endi
# TSIM: if $data06 != 10 then
......@@ -1744,8 +1744,8 @@ class TDTestCase:
# TSIM: print =============== clear
tdLog.info('=============== clear')
# TSIM: sql drop database $db
tdLog.info('sql drop database $db')
tdSql.execute('sql drop database $db')
tdLog.info('sql drop database db')
tdSql.execute('drop database db')
# TSIM: sql show databases
tdLog.info('show databases')
tdSql.query('show databases')
......
......@@ -49,7 +49,7 @@ if $rows != 0 then
endi
print =============== step4
sql create database a012345678901201234567890120123456789012 -x step4
sql create database a012345678901201234567890120123456789012a012345678901201234567890120123456789012 -x step4
return -1
step4:
sql show databases
......
......@@ -38,7 +38,7 @@ endi
if $data02 != 1 then
return -1
endi
if $data03 != NULL then
if $data03 != null then
return -1
endi
sql alter table tb add column c3 nchar(4)
......@@ -72,7 +72,7 @@ endi
if $data02 != 1 then
return -1
endi
if $data03 != NULL then
if $data03 != null then
return -1
endi
......@@ -169,7 +169,7 @@ endi
if $data01 != 2 then
return -1
endi
if $data02 != NULL then
if $data02 != null then
return -1
endi
sql alter table mt add column c2 int
......@@ -239,4 +239,4 @@ if $rows != 0 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -37,7 +37,7 @@ sleep 4000
sql select * from mt
sql select * from strm
sql drop table tb1
sleep 10000
sleep 100000
sql select * from strm
if $rows != 2 then
if $rows != 1 then
......@@ -221,4 +221,4 @@ sql use $db
sql create table stb (ts timestamp, c1 int) tags(t1 int)
sql create table tb1 using stb tags(1)
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册