Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
18f31711
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
18f31711
编写于
7月 31, 2019
作者:
S
slguan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix the issue #240
上级
f0364a00
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
184 addition
and
140 deletion
+184
-140
src/kit/shell/src/shellEngine.c
src/kit/shell/src/shellEngine.c
+1
-1
src/modules/http/inc/httpHandle.h
src/modules/http/inc/httpHandle.h
+49
-40
src/modules/http/src/gcHandle.c
src/modules/http/src/gcHandle.c
+3
-3
src/modules/http/src/httpHandle.c
src/modules/http/src/httpHandle.c
+54
-55
src/modules/http/src/httpServer.c
src/modules/http/src/httpServer.c
+62
-24
src/modules/http/src/httpSession.c
src/modules/http/src/httpSession.c
+2
-2
src/modules/http/src/httpSql.c
src/modules/http/src/httpSql.c
+2
-3
src/modules/http/src/restHandle.c
src/modules/http/src/restHandle.c
+3
-3
src/modules/http/src/tgHandle.c
src/modules/http/src/tgHandle.c
+4
-4
src/system/src/vnodeImport.c
src/system/src/vnodeImport.c
+2
-2
src/util/src/tglobalcfg.c
src/util/src/tglobalcfg.c
+1
-1
src/util/src/tmempool.c
src/util/src/tmempool.c
+1
-2
未找到文件。
src/kit/shell/src/shellEngine.c
浏览文件 @
18f31711
...
...
@@ -487,7 +487,7 @@ int shellDumpResult(TAOS *con, char *fname, int *error_no) {
case
TSDB_DATA_TYPE_NCHAR
:
memset
(
t_str
,
0
,
TSDB_MAX_BYTES_PER_ROW
);
memcpy
(
t_str
,
row
[
i
],
fields
[
i
].
bytes
);
fprintf
(
fp
,
"
%s
"
,
t_str
);
fprintf
(
fp
,
"
\'
%s
\'
"
,
t_str
);
break
;
case
TSDB_DATA_TYPE_TIMESTAMP
:
#ifdef WINDOWS
...
...
src/modules/http/inc/httpHandle.h
浏览文件 @
18f31711
...
...
@@ -31,7 +31,7 @@
#define HTTP_LABEL_SIZE 8
#define HTTP_MAX_EVENTS 10
#define HTTP_BUFFER_SIZE 1024*
100//10
0k
#define HTTP_BUFFER_SIZE 1024*
70 //7
0k
#define HTTP_STEP_SIZE 1024 //http message get process step by step
#define HTTP_MAX_URL 5 //http url stack size
#define HTTP_METHOD_SCANNER_SIZE 7 //http method fp size
...
...
@@ -60,6 +60,13 @@
#define HTTP_PROCESS_ERROR 0
#define HTTP_PROCESS_SUCCESS 1
#define HTTP_PARSE_BODY_ERROR -1
#define HTTP_PARSE_BODY_CONTINUE 0
#define HTTP_PARSE_BODY_SUCCESS 1
#define HTTP_RETRY_TIMES 10
#define HTTP_EXPIRED_TIME 60000
struct
HttpContext
;
struct
HttpThread
;
...
...
@@ -84,7 +91,7 @@ typedef enum { HTTP_CMD_RETURN_TYPE_WITH_RETURN, HTTP_CMD_RETURN_TYPE_NO_RETURN
typedef
struct
{
// used by single cmd
char
*
nativSql
;
char
*
nativSql
;
int32_t
numOfRows
;
int32_t
code
;
...
...
@@ -132,51 +139,53 @@ typedef struct {
}
HttpEncodeMethod
;
typedef
struct
{
char
*
pos
;
char
*
pos
;
int32_t
len
;
}
HttpBuf
;
typedef
struct
{
char
buffer
[
HTTP_MAX_BUFFER_SIZE
];
int
bufsize
;
char
*
pLast
;
char
*
pCur
;
HttpBuf
method
;
HttpBuf
path
[
HTTP_MAX_URL
];
// url: dbname/meter/query
HttpBuf
data
;
// body content
HttpBuf
token
;
// auth token
HttpDecodeMethod
*
pMethod
;
}
HttpParser
;
typedef
struct
HttpContext
{
void
*
signature
;
int
fd
;
uint32_t
accessTimes
;
uint32_t
lastAccessTime
;
uint8_t
httpVersion
:
1
;
uint8_t
httpChunked
:
1
;
uint8_t
httpKeepAlive
:
2
;
// http1.0 and not keep-alive, close connection immediately
uint8_t
fromMemPool
:
1
;
uint8_t
compress
:
1
;
uint8_t
usedByEpoll
:
1
;
uint8_t
usedByApp
:
1
;
uint8_t
httpVersion
;
uint8_t
httpChunked
;
uint8_t
httpKeepAlive
;
// http1.0 and not keep-alive, close connection immediately
uint8_t
fromMemPool
;
uint8_t
compress
;
uint8_t
usedByEpoll
;
uint8_t
usedByApp
;
uint8_t
reqType
;
uint8_t
parsed
;
char
ipstr
[
22
];
char
user
[
TSDB_USER_LEN
];
// parsed from auth token or login message
char
pass
[
TSDB_PASSWORD_LEN
];
void
*
taos
;
void
*
taos
;
HttpSession
*
session
;
HttpEncodeMethod
*
encodeMethod
;
HttpEncodeMethod
*
encodeMethod
;
HttpSqlCmd
singleCmd
;
HttpSqlCmds
*
multiCmds
;
JsonBuf
*
jsonBuf
;
HttpSqlCmds
*
multiCmds
;
JsonBuf
*
jsonBuf
;
pthread_mutex_t
mutex
;
struct
HttpThread
*
pThread
;
struct
HttpContext
*
prev
,
*
next
;
HttpParser
parser
;
void
*
readTimer
;
struct
HttpThread
*
pThread
;
struct
HttpContext
*
prev
;
struct
HttpContext
*
next
;
}
HttpContext
;
typedef
struct
{
char
*
buffer
;
int
bufsize
;
char
*
pLast
;
char
*
pCur
;
HttpBuf
method
;
HttpBuf
path
[
HTTP_MAX_URL
];
// url: dbname/meter/query
HttpBuf
data
;
// body content
HttpBuf
token
;
// auth token
HttpDecodeMethod
*
pMethod
;
}
HttpParser
;
#define HTTP_MAX_FDS_LEN 65536
typedef
struct
HttpThread
{
pthread_t
thread
;
HttpContext
*
pHead
;
...
...
@@ -186,8 +195,6 @@ typedef struct HttpThread {
int
numOfFds
;
int
threadId
;
char
label
[
HTTP_LABEL_SIZE
];
char
buffer
[
HTTP_BUFFER_SIZE
];
// buffer to receive data
HttpParser
parser
;
// parse from buffer
bool
(
*
processData
)(
HttpContext
*
pContext
);
struct
_http_server_obj_
*
pServer
;
// handle passed by upper layer during pServer initialization
}
HttpThread
;
...
...
@@ -202,15 +209,15 @@ typedef struct _http_server_obj_ {
HttpDecodeMethod
*
methodScanner
[
HTTP_METHOD_SCANNER_SIZE
];
int
methodScannerLen
;
pthread_mutex_t
serverMutex
;
void
*
pSessionHash
;
void
*
pContextPool
;
void
*
expireTimer
;
HttpThread
*
pThreads
;
void
*
pSessionHash
;
void
*
pContextPool
;
void
*
expireTimer
;
HttpThread
*
pThreads
;
pthread_t
thread
;
bool
(
*
processData
)(
HttpContext
*
pContext
);
int
requestNum
;
void
*
timerHandle
;
bool
online
;
bool
(
*
processData
)(
HttpContext
*
pContext
);
int
requestNum
;
void
*
timerHandle
;
bool
online
;
}
HttpServer
;
// http util method
...
...
@@ -253,6 +260,8 @@ bool httpGenTaosdAuthToken(HttpContext *pContext, char *token, int maxLen);
bool
httpUrlMatch
(
HttpContext
*
pContext
,
int
pos
,
char
*
cmp
);
bool
httpProcessData
(
HttpContext
*
pContext
);
bool
httpReadDataImp
(
HttpContext
*
pContext
);
bool
httpParseRequest
(
HttpContext
*
pContext
);
int
httpCheckReadCompleted
(
HttpContext
*
pContext
);
// http request handler
void
httpProcessRequest
(
HttpContext
*
pContext
);
...
...
src/modules/http/src/gcHandle.c
浏览文件 @
18f31711
...
...
@@ -26,7 +26,7 @@ static HttpEncodeMethod gcQueryMethod = {
void
gcInitHandle
(
HttpServer
*
pServer
)
{
httpAddMethod
(
pServer
,
&
gcDecodeMethod
);
}
bool
gcGetUserFromUrl
(
HttpContext
*
pContext
)
{
HttpParser
*
pParser
=
&
pContext
->
p
Thread
->
p
arser
;
HttpParser
*
pParser
=
&
pContext
->
parser
;
if
(
pParser
->
path
[
GC_USER_URL_POS
].
len
>
TSDB_USER_LEN
-
1
||
pParser
->
path
[
GC_USER_URL_POS
].
len
<=
0
)
{
return
false
;
}
...
...
@@ -36,7 +36,7 @@ bool gcGetUserFromUrl(HttpContext* pContext) {
}
bool
gcGetPassFromUrl
(
HttpContext
*
pContext
)
{
HttpParser
*
pParser
=
&
pContext
->
p
Thread
->
p
arser
;
HttpParser
*
pParser
=
&
pContext
->
parser
;
if
(
pParser
->
path
[
GC_PASS_URL_POS
].
len
>
TSDB_PASSWORD_LEN
-
1
||
pParser
->
path
[
GC_PASS_URL_POS
].
len
<=
0
)
{
return
false
;
}
...
...
@@ -126,7 +126,7 @@ bool gcProcessLoginRequest(HttpContext* pContext) {
bool
gcProcessQueryRequest
(
HttpContext
*
pContext
)
{
httpTrace
(
"context:%p, fd:%d, ip:%s, process grafana query msg"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
HttpParser
*
pParser
=
&
pContext
->
p
Thread
->
p
arser
;
HttpParser
*
pParser
=
&
pContext
->
parser
;
char
*
filter
=
pParser
->
data
.
pos
;
if
(
filter
==
NULL
)
{
httpSendErrorResp
(
pContext
,
HTTP_NO_MSG_INPUT
);
...
...
src/modules/http/src/httpHandle.c
浏览文件 @
18f31711
...
...
@@ -39,7 +39,7 @@ void httpToLowerUrl(char* url) {
}
bool
httpUrlMatch
(
HttpContext
*
pContext
,
int
pos
,
char
*
cmp
)
{
HttpParser
*
pParser
=
&
pContext
->
p
Thread
->
p
arser
;
HttpParser
*
pParser
=
&
pContext
->
parser
;
if
(
pos
<
0
||
pos
>=
HTTP_MAX_URL
)
{
return
false
;
...
...
@@ -58,11 +58,11 @@ bool httpUrlMatch(HttpContext* pContext, int pos, char* cmp) {
// /account/db/meter HTTP/1.1\r\nHost
bool
httpParseURL
(
HttpContext
*
pContext
)
{
HttpParser
*
pParser
=
&
pContext
->
pThread
->
parser
;
HttpParser
*
pParser
=
&
pContext
->
parser
;
char
*
pSeek
;
char
*
pEnd
=
strchr
(
pParser
->
pLast
,
' '
);
if
(
*
pParser
->
pLast
!=
'/'
)
{
httpSendErrorResp
(
pContext
,
HTTP_UNSUPPORT_URL
);
return
false
;
}
pParser
->
pLast
++
;
...
...
@@ -88,12 +88,6 @@ bool httpParseURL(HttpContext* pContext) {
}
pParser
->
pLast
=
pEnd
+
1
;
// for (int i = 0; i < HTTP_MAX_URL; i++) {
// if (pParser->path[i].len > 0) {
// httpTrace("url_pos: %d, path: [%s]", i, pParser->path[i].pos);
// }
//}
if
(
pParser
->
path
[
0
].
len
==
0
)
{
httpSendErrorResp
(
pContext
,
HTTP_UNSUPPORT_URL
);
return
false
;
...
...
@@ -103,9 +97,14 @@ bool httpParseURL(HttpContext* pContext) {
}
bool
httpParseHttpVersion
(
HttpContext
*
pContext
)
{
HttpParser
*
pParser
=
&
pContext
->
pThread
->
parser
;
HttpParser
*
pParser
=
&
pContext
->
parser
;
char
*
pEnd
=
strchr
(
pParser
->
pLast
,
'1'
);
if
(
pEnd
==
NULL
)
{
httpError
(
"context:%p, fd:%d, ip:%s, can't find http version at position:%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pParser
->
pLast
);
httpSendErrorResp
(
pContext
,
HTTP_PARSE_HTTP_VERSION_ERROR
);
return
false
;
}
if
(
*
(
pEnd
+
1
)
!=
'.'
)
{
httpError
(
"context:%p, fd:%d, ip:%s, can't find http version at position:%s"
,
pContext
,
pContext
->
fd
,
...
...
@@ -129,7 +128,7 @@ bool httpParseHttpVersion(HttpContext* pContext) {
}
bool
httpGetNextLine
(
HttpContext
*
pContext
)
{
HttpParser
*
pParser
=
&
pContext
->
p
Thread
->
p
arser
;
HttpParser
*
pParser
=
&
pContext
->
parser
;
while
(
pParser
->
buffer
+
pParser
->
bufsize
-
pParser
->
pCur
++
>
0
)
{
if
(
*
(
pParser
->
pCur
)
==
'\n'
&&
*
(
pParser
->
pCur
-
1
)
==
'\r'
)
{
// cut the string
...
...
@@ -144,7 +143,8 @@ bool httpGetNextLine(HttpContext* pContext) {
}
bool
httpGetHttpMethod
(
HttpContext
*
pContext
)
{
HttpParser
*
pParser
=
&
pContext
->
pThread
->
parser
;
HttpParser
*
pParser
=
&
pContext
->
parser
;
char
*
pSeek
=
strchr
(
pParser
->
pLast
,
' '
);
if
(
pSeek
==
NULL
)
{
httpSendErrorResp
(
pContext
,
HTTP_PARSE_HTTP_METHOD_ERROR
);
...
...
@@ -160,7 +160,8 @@ bool httpGetHttpMethod(HttpContext* pContext) {
}
bool
httpGetDecodeMethod
(
HttpContext
*
pContext
)
{
HttpParser
*
pParser
=
&
pContext
->
pThread
->
parser
;
HttpParser
*
pParser
=
&
pContext
->
parser
;
HttpServer
*
pServer
=
pContext
->
pThread
->
pServer
;
int
methodLen
=
pServer
->
methodScannerLen
;
for
(
int
i
=
0
;
i
<
methodLen
;
i
++
)
{
...
...
@@ -180,7 +181,7 @@ bool httpGetDecodeMethod(HttpContext* pContext) {
}
bool
httpParseHead
(
HttpContext
*
pContext
)
{
HttpParser
*
pParser
=
&
pContext
->
p
Thread
->
p
arser
;
HttpParser
*
pParser
=
&
pContext
->
parser
;
if
(
strncasecmp
(
pParser
->
pLast
,
"Content-Length: "
,
16
)
==
0
)
{
pParser
->
data
.
len
=
(
int32_t
)
atoi
(
pParser
->
pLast
+
16
);
httpTrace
(
"context:%p, fd:%d, ip:%s, Content-Length:%d"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
...
...
@@ -258,68 +259,62 @@ bool httpParseChunkedBody(HttpContext* pContext, HttpParser* pParser, bool test)
}
bool
httpReadChunkedBody
(
HttpContext
*
pContext
,
HttpParser
*
pParser
)
{
for
(
int
tryTimes
=
0
;
tryTimes
<
100
;
++
tryTimes
)
{
for
(
int
tryTimes
=
0
;
tryTimes
<
HTTP_RETRY_TIMES
;
++
tryTimes
)
{
bool
parsedOk
=
httpParseChunkedBody
(
pContext
,
pParser
,
true
);
if
(
parsedOk
)
{
// httpTrace("context:%p, fd:%d, ip:%s, chunked body read finished",
// pContext, pContext->fd, pContext->ipstr);
httpParseChunkedBody
(
pContext
,
pParser
,
false
);
return
true
;
return
HTTP_PARSE_BODY_SUCCESS
;
}
else
{
httpTrace
(
"context:%p, fd:%d, ip:%s, chunked body not finished, continue read"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
if
(
!
httpReadDataImp
(
pContext
))
{
httpError
(
"context:%p, fd:%d, ip:%s, read chunked request error"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
return
false
;
return
HTTP_PARSE_BODY_ERROR
;
}
else
{
taosMsleep
(
1
);
}
}
}
httpError
(
"context:%p, fd:%d, ip:%s, chunked body parsed error"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
httpSendErrorResp
(
pContext
,
HTTP_PARSE_CHUNKED_BODY_ERROR
);
return
false
;
httpTrace
(
"context:%p, fd:%d, ip:%s, chunked body not finished, wait epoll"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
return
HTTP_PARSE_BODY_CONTINUE
;
}
bool
httpReadUnChunkedBody
(
HttpContext
*
pContext
,
HttpParser
*
pParser
)
{
for
(
int
tryTimes
=
0
;
tryTimes
<
100
;
++
tryTimes
)
{
int
httpReadUnChunkedBody
(
HttpContext
*
pContext
,
HttpParser
*
pParser
)
{
for
(
int
tryTimes
=
0
;
tryTimes
<
HTTP_RETRY_TIMES
;
++
tryTimes
)
{
int
dataReadLen
=
pParser
->
bufsize
-
(
int
)(
pParser
->
data
.
pos
-
pParser
->
buffer
);
if
(
dataReadLen
>
pParser
->
data
.
len
)
{
httpError
(
"context:%p, fd:%d, ip:%s, un-chunked body length invalid, dataReadLen:%d > pContext->data.len:%d"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
dataReadLen
,
pParser
->
data
.
len
);
httpSendErrorResp
(
pContext
,
HTTP_PARSE_BODY_ERROR
);
return
false
;
return
HTTP_PARSE_BODY_ERROR
;
}
else
if
(
dataReadLen
<
pParser
->
data
.
len
)
{
httpTrace
(
"context:%p, fd:%d, ip:%s, un-chunked body not finished, dataReadLen:%d < pContext->data.len:%d, continue read"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
dataReadLen
,
pParser
->
data
.
len
);
if
(
!
httpReadDataImp
(
pContext
))
{
httpError
(
"context:%p, fd:%d, ip:%s, read chunked request error"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
return
false
;
return
HTTP_PARSE_BODY_ERROR
;
}
else
{
taosMsleep
(
1
);
}
}
else
{
return
true
;
return
HTTP_PARSE_BODY_SUCCESS
;
}
}
int
dataReadLen
=
pParser
->
bufsize
-
(
int
)(
pParser
->
data
.
pos
-
pParser
->
buffer
);
if
(
dataReadLen
!=
pParser
->
data
.
len
)
{
httpError
(
"context:%p, fd:%d, ip:%s, un-chunked body length error, dataReadLen:%d != pContext->data.len:%d"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
dataReadLen
,
pParser
->
data
.
len
);
httpSendErrorResp
(
pContext
,
HTTP_PARSE_BODY_ERROR
);
return
false
;
}
httpTrace
(
"context:%p, fd:%d, ip:%s, un-chunked body read over, dataReadLen:%d == pContext->data.len:%d"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
dataReadLen
,
pParser
->
data
.
len
);
return
true
;
httpTrace
(
"context:%p, fd:%d, ip:%s, un-chunked body not finished, wait epoll"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
return
HTTP_PARSE_BODY_CONTINUE
;
}
bool
httpParseRequest
(
HttpContext
*
pContext
)
{
HttpParser
*
pParser
=
&
pContext
->
pThread
->
parser
;
HttpParser
*
pParser
=
&
pContext
->
parser
;
if
(
pContext
->
parsed
)
{
return
true
;
}
httpDump
(
"context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, read size:%d, raw data:
\n
%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
pThread
->
label
,
pContext
->
pThread
->
numOfFds
,
pContext
->
parser
.
bufsize
,
pContext
->
parser
.
buffer
);
if
(
!
httpGetHttpMethod
(
pContext
))
{
return
false
;
...
...
@@ -355,22 +350,31 @@ bool httpParseRequest(HttpContext* pContext) {
pParser
->
pLast
=
++
pParser
->
pCur
;
}
while
(
1
);
httpTrace
(
"context:%p, fd:%d, ip:%s, parse http head ok"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
pContext
->
parsed
=
true
;
return
true
;
}
int
httpCheckReadCompleted
(
HttpContext
*
pContext
)
{
HttpParser
*
pParser
=
&
pContext
->
parser
;
if
(
pContext
->
httpChunked
==
HTTP_UNCUNKED
)
{
if
(
!
httpReadUnChunkedBody
(
pContext
,
pParser
))
{
return
false
;
int
ret
=
httpReadUnChunkedBody
(
pContext
,
pParser
);
if
(
ret
!=
HTTP_PARSE_BODY_SUCCESS
)
{
return
ret
;
}
}
else
{
if
(
!
httpReadChunkedBody
(
pContext
,
pParser
))
{
return
false
;
int
ret
=
httpReadChunkedBody
(
pContext
,
pParser
);
if
(
ret
!=
HTTP_PARSE_BODY_SUCCESS
)
{
return
ret
;
}
}
httpTrace
(
"context:%p, fd:%d, ip:%s, parse http request ok"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
return
true
;
return
HTTP_PARSE_BODY_SUCCESS
;
}
bool
httpDecodeRequest
(
HttpContext
*
pContext
)
{
HttpParser
*
pParser
=
&
pContext
->
p
Thread
->
p
arser
;
HttpParser
*
pParser
=
&
pContext
->
parser
;
if
(
pParser
->
pMethod
->
decodeFp
==
NULL
)
{
return
false
;
}
...
...
@@ -382,15 +386,10 @@ bool httpDecodeRequest(HttpContext* pContext) {
* Process the request from http pServer
*/
bool
httpProcessData
(
HttpContext
*
pContext
)
{
httpInitContext
(
pContext
);
if
(
!
httpParseRequest
(
pContext
))
{
httpCloseContextByApp
(
pContext
);
return
HTTP_PROCESS_ERROR
;
}
pContext
->
usedByApp
=
1
;
// handle Cross-domain request
if
(
strcmp
(
pContext
->
p
Thread
->
p
arser
.
method
.
pos
,
"OPTIONS"
)
==
0
)
{
if
(
strcmp
(
pContext
->
parser
.
method
.
pos
,
"OPTIONS"
)
==
0
)
{
httpTrace
(
"context:%p, fd:%d, ip:%s, process options request"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
httpSendOptionResp
(
pContext
,
"process options request success"
);
return
HTTP_PROCESS_SUCCESS
;
...
...
src/modules/http/src/httpServer.c
浏览文件 @
18f31711
...
...
@@ -37,6 +37,7 @@
#include "tsocket.h"
#include "tutil.h"
#include "ttime.h"
#include "ttimer.h"
#include "http.h"
#include "httpCode.h"
...
...
@@ -95,6 +96,8 @@ void httpFreeContext(HttpServer *pServer, HttpContext *pContext) {
void
httpCleanUpContext
(
HttpThread
*
pThread
,
HttpContext
*
pContext
)
{
// for not keep-alive
taosTmrStopA
(
pContext
->
readTimer
);
if
(
pContext
->
fd
>=
0
)
{
epoll_ctl
(
pThread
->
pollFd
,
EPOLL_CTL_DEL
,
pContext
->
fd
,
NULL
);
taosCloseSocket
(
pContext
->
fd
);
...
...
@@ -148,11 +151,15 @@ bool httpInitContext(HttpContext *pContext) {
pContext
->
httpChunked
=
HTTP_UNCUNKED
;
pContext
->
compress
=
JsonUnCompress
;
pContext
->
usedByEpoll
=
1
;
pContext
->
usedByApp
=
1
;
pContext
->
usedByApp
=
0
;
pContext
->
reqType
=
HTTP_REQTYPE_OTHERS
;
pContext
->
encodeMethod
=
NULL
;
memset
(
&
pContext
->
singleCmd
,
0
,
sizeof
(
HttpSqlCmd
));
HttpParser
*
pParser
=
&
pContext
->
parser
;
memset
(
pParser
,
0
,
sizeof
(
HttpParser
));
pParser
->
pCur
=
pParser
->
pLast
=
pParser
->
buffer
;
httpTrace
(
"context:%p, fd:%d, ip:%s, accessTimes:%d"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
accessTimes
);
return
true
;
}
...
...
@@ -164,6 +171,7 @@ void httpCloseContextByApp(HttpContext *pContext) {
}
pthread_mutex_lock
(
&
pContext
->
mutex
);
pContext
->
parsed
=
false
;
httpTrace
(
"context:%p, fd:%d, ip:%s, app use finished, usedByEpoll:%d, usedByApp:%d, httpVersion:1.%d, keepAlive:%d"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
usedByEpoll
,
pContext
->
usedByApp
,
pContext
->
httpVersion
,
...
...
@@ -189,6 +197,7 @@ void httpCloseContextByServer(HttpThread *pThread, HttpContext *pContext) {
}
pthread_mutex_lock
(
&
pContext
->
mutex
);
pContext
->
usedByEpoll
=
0
;
pContext
->
parsed
=
false
;
httpTrace
(
"context:%p, fd:%d, ip:%s, epoll use finished, usedByEpoll:%d, usedByApp:%d"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
usedByEpoll
,
pContext
->
usedByApp
);
...
...
@@ -206,6 +215,12 @@ void httpCloseContextByServer(HttpThread *pThread, HttpContext *pContext) {
}
}
void
httpCloseContextByServerFromTimer
(
HttpContext
*
pContext
)
{
httpError
(
"context:%p, fd:%d, ip:%s, read http body error, time expired"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
httpSendErrorResp
(
pContext
,
HTTP_PARSE_BODY_ERROR
);
httpCloseContextByServer
(
pContext
->
pThread
,
pContext
);
}
void
httpCleanUpConnect
(
HttpServer
*
pServer
)
{
int
i
;
HttpThread
*
pThread
;
...
...
@@ -257,7 +272,7 @@ void httpReadDirtyData(int fd) {
}
bool
httpReadDataImp
(
HttpContext
*
pContext
)
{
HttpParser
*
pParser
=
&
pContext
->
p
Thread
->
p
arser
;
HttpParser
*
pParser
=
&
pContext
->
parser
;
int
blocktimes
=
0
;
while
(
pParser
->
bufsize
<=
(
HTTP_BUFFER_SIZE
-
HTTP_STEP_SIZE
))
{
...
...
@@ -267,17 +282,18 @@ bool httpReadDataImp(HttpContext *pContext) {
break
;
}
else
if
(
nread
<
0
)
{
if
(
errno
==
EINTR
)
{
if
(
blocktimes
++
>
1000
)
{
httpError
(
"context:%p, fd:%d, ip:%s, read from socket error:%d, EINTER too many times"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
errno
);
if
(
blocktimes
++
>
HTTP_RETRY_TIMES
)
{
taosMsleep
(
1
);
httpTrace
(
"context:%p, fd:%d, ip:%s, read from socket error:%d, EINTER times:%d"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
errno
,
blocktimes
);
break
;
}
continue
;
}
else
if
(
errno
==
EAGAIN
||
errno
==
EWOULDBLOCK
)
{
taosMsleep
(
1
);
if
(
blocktimes
++
>
1000
)
{
http
Error
(
"context:%p, fd:%d, ip:%s, read from socket error:%d, EAGAIN too many times
"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
errno
);
if
(
blocktimes
++
>
HTTP_RETRY_TIMES
)
{
taosMsleep
(
1
);
http
Trace
(
"context:%p, fd:%d, ip:%s, read from socket error:%d, EAGAIN times:%d
"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
errno
,
blocktimes
);
break
;
}
continue
;
...
...
@@ -292,27 +308,51 @@ bool httpReadDataImp(HttpContext *pContext) {
if
(
pParser
->
bufsize
>=
(
HTTP_BUFFER_SIZE
-
HTTP_STEP_SIZE
))
{
httpReadDirtyData
(
pContext
->
fd
);
httpError
(
"context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, request big than:%d"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
pThread
->
label
,
pContext
->
pThread
->
numOfFds
,
HTTP_BUFFER_SIZE
);
httpError
(
"context:%p, fd:%d, ip:%s, thread:%s, request big than:%d"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
pThread
->
label
,
HTTP_BUFFER_SIZE
);
httpSendErrorResp
(
pContext
,
HTTP_REQUSET_TOO_BIG
);
return
false
;
}
}
pParser
->
buffer
[
pParser
->
bufsize
]
=
0
;
httpDump
(
"context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, read size:%d, content:
\n
%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
pThread
->
label
,
pContext
->
pThread
->
numOfFds
,
pParser
->
bufsize
,
pParser
->
buffer
);
httpTrace
(
"context:%p, fd:%d, ip:%s, thread:%s, read size:%d"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
pThread
->
label
,
pParser
->
bufsize
);
return
true
;
}
bool
httpReadData
(
HttpContext
*
pContext
)
{
HttpParser
*
pParser
=
&
pContext
->
pThread
->
parser
;
memset
(
pParser
,
0
,
sizeof
(
HttpParser
));
pParser
->
pCur
=
pParser
->
pLast
=
pParser
->
buffer
=
pContext
->
pThread
->
buffer
;
return
httpReadDataImp
(
pContext
);
bool
httpReadData
(
HttpThread
*
pThread
,
HttpContext
*
pContext
)
{
if
(
!
pContext
->
parsed
)
{
httpInitContext
(
pContext
);
}
if
(
!
httpReadDataImp
(
pContext
))
{
httpTrace
(
"context:%p, fd:%d, ip:%s, read data error, close connect"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
httpCloseContextByServer
(
pThread
,
pContext
);
return
false
;
}
if
(
!
httpParseRequest
(
pContext
))
{
httpTrace
(
"context:%p, fd:%d, ip:%s, failed to parse http head, close connect"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
httpCloseContextByServer
(
pThread
,
pContext
);
return
false
;
}
int
ret
=
httpCheckReadCompleted
(
pContext
);
if
(
ret
==
HTTP_PARSE_BODY_CONTINUE
)
{
httpTrace
(
"context:%p, fd:%d, ip:%s, not finished yet, try another times"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
taosTmrReset
(
httpCloseContextByServerFromTimer
,
HTTP_EXPIRED_TIME
,
pContext
,
pThread
->
pServer
->
timerHandle
,
&
pContext
->
readTimer
);
return
false
;
}
else
if
(
ret
==
HTTP_PARSE_BODY_SUCCESS
){
httpDump
(
"context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, content:
\n
%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
pThread
->
label
,
pContext
->
pThread
->
numOfFds
,
pContext
->
parser
->
data
.
pos
);
return
true
;
}
else
{
httpError
(
"context:%p, fd:%d, ip:%s, failed to read http body, close connect"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
httpCloseContextByServer
(
pThread
,
pContext
);
return
false
;
}
}
void
httpProcessHttpData
(
void
*
param
)
{
...
...
@@ -377,9 +417,7 @@ void httpProcessHttpData(void *param) {
continue
;
}
if
(
!
httpReadData
(
pContext
))
{
httpTrace
(
"context:%p, fd:%d, ip:%s, read data error"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
httpCloseContextByServer
(
pThread
,
pContext
);
if
(
!
httpReadData
(
pThread
,
pContext
))
{
continue
;
}
...
...
@@ -406,7 +444,7 @@ void httpAcceptHttpConnection(void *arg) {
struct
sockaddr_in
clientAddr
;
int
sockFd
;
int
threadId
=
0
;
int
connThreshold
=
2
*
tsHttpCacheSessions
/
tsHttpMaxThreads
;
const
int
connThreshold
=
2
*
tsHttpCacheSessions
/
tsHttpMaxThreads
;
HttpThread
*
pThread
;
HttpServer
*
pServer
;
HttpContext
*
pContext
;
...
...
src/modules/http/src/httpSession.c
浏览文件 @
18f31711
...
...
@@ -87,8 +87,8 @@ void httpRestoreSession(HttpContext *pContext) {
pthread_mutex_lock
(
&
server
->
serverMutex
);
session
->
access
--
;
httpTrace
(
"context:%p,
fd:%d,
ip:%s, user:%s, restore session:%p:%s:%p, access:%d, expire:%d"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
user
,
session
,
session
->
id
,
session
->
taos
,
httpTrace
(
"context:%p, ip:%s, user:%s, restore session:%p:%s:%p, access:%d, expire:%d"
,
pContext
,
pContext
->
ipstr
,
pContext
->
user
,
session
,
session
->
id
,
session
->
taos
,
session
->
access
,
pContext
->
session
->
expire
);
pthread_mutex_unlock
(
&
server
->
serverMutex
);
}
...
...
src/modules/http/src/httpSql.c
浏览文件 @
18f31711
...
...
@@ -296,9 +296,8 @@ void httpProcessSingleSqlCmd(HttpContext *pContext) {
void
httpProcessLoginCmd
(
HttpContext
*
pContext
)
{
char
token
[
128
]
=
"current version only supports basic authorization, no token returned"
;
httpTrace
(
"context:%p, fd:%d, ip:%s, user:%s, return token:%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
user
,
token
);
httpTrace
(
"user:%s login from %s via http"
,
pContext
->
user
,
pContext
->
ipstr
);
httpTrace
(
"context:%p, fd:%d, ip:%s, user:%s, login via http, return token:%s"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
user
,
token
);
httpSendSuccResp
(
pContext
,
token
);
}
...
...
src/modules/http/src/restHandle.c
浏览文件 @
18f31711
...
...
@@ -29,7 +29,7 @@ void restInitHandle(HttpServer* pServer) {
}
bool
restGetUserFromUrl
(
HttpContext
*
pContext
)
{
HttpParser
*
pParser
=
&
pContext
->
p
Thread
->
p
arser
;
HttpParser
*
pParser
=
&
pContext
->
parser
;
if
(
pParser
->
path
[
REST_USER_URL_POS
].
len
>
TSDB_USER_LEN
-
1
||
pParser
->
path
[
REST_USER_URL_POS
].
len
<=
0
)
{
return
false
;
}
...
...
@@ -39,7 +39,7 @@ bool restGetUserFromUrl(HttpContext* pContext) {
}
bool
restGetPassFromUrl
(
HttpContext
*
pContext
)
{
HttpParser
*
pParser
=
&
pContext
->
p
Thread
->
p
arser
;
HttpParser
*
pParser
=
&
pContext
->
parser
;
if
(
pParser
->
path
[
REST_PASS_URL_POS
].
len
>
TSDB_PASSWORD_LEN
-
1
||
pParser
->
path
[
REST_PASS_URL_POS
].
len
<=
0
)
{
return
false
;
}
...
...
@@ -59,7 +59,7 @@ bool restProcessSqlRequest(HttpContext* pContext, int isSqlT) {
httpTrace
(
"context:%p, fd:%d, ip:%s, user:%s, process restful sql msg"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
,
pContext
->
user
);
char
*
sql
=
pContext
->
p
Thread
->
p
arser
.
data
.
pos
;
char
*
sql
=
pContext
->
parser
.
data
.
pos
;
if
(
sql
==
NULL
)
{
httpSendErrorResp
(
pContext
,
HTTP_NO_SQL_INPUT
);
return
false
;
...
...
src/modules/http/src/tgHandle.c
浏览文件 @
18f31711
...
...
@@ -447,7 +447,7 @@ void tgInitHandle(HttpServer *pServer) {
}
bool
tgGetUserFromUrl
(
HttpContext
*
pContext
)
{
HttpParser
*
pParser
=
&
pContext
->
p
Thread
->
p
arser
;
HttpParser
*
pParser
=
&
pContext
->
parser
;
if
(
pParser
->
path
[
TG_USER_URL_POS
].
len
>
TSDB_USER_LEN
-
1
||
pParser
->
path
[
TG_USER_URL_POS
].
len
<=
0
)
{
return
false
;
}
...
...
@@ -457,7 +457,7 @@ bool tgGetUserFromUrl(HttpContext *pContext) {
}
bool
tgGetPassFromUrl
(
HttpContext
*
pContext
)
{
HttpParser
*
pParser
=
&
pContext
->
p
Thread
->
p
arser
;
HttpParser
*
pParser
=
&
pContext
->
parser
;
if
(
pParser
->
path
[
TG_PASS_URL_POS
].
len
>
TSDB_PASSWORD_LEN
-
1
||
pParser
->
path
[
TG_PASS_URL_POS
].
len
<=
0
)
{
return
false
;
}
...
...
@@ -467,7 +467,7 @@ bool tgGetPassFromUrl(HttpContext *pContext) {
}
char
*
tgGetDbFromUrl
(
HttpContext
*
pContext
)
{
HttpParser
*
pParser
=
&
pContext
->
p
Thread
->
p
arser
;
HttpParser
*
pParser
=
&
pContext
->
parser
;
if
(
pParser
->
path
[
TG_DB_URL_POS
].
len
<=
0
)
{
httpSendErrorResp
(
pContext
,
HTTP_TG_DB_NOT_INPUT
);
return
NULL
;
...
...
@@ -1158,7 +1158,7 @@ bool tgProcessSingleMetricUseConfigSchema(HttpContext *pContext, cJSON *metric,
bool
tgProcessQueryRequest
(
HttpContext
*
pContext
,
char
*
db
)
{
httpTrace
(
"context:%p, fd:%d, ip:%s, process telegraf query msg"
,
pContext
,
pContext
->
fd
,
pContext
->
ipstr
);
HttpParser
*
pParser
=
&
pContext
->
p
Thread
->
p
arser
;
HttpParser
*
pParser
=
&
pContext
->
parser
;
char
*
filter
=
pParser
->
data
.
pos
;
if
(
filter
==
NULL
)
{
httpSendErrorResp
(
pContext
,
HTTP_NO_MSG_INPUT
);
...
...
src/system/src/vnodeImport.c
浏览文件 @
18f31711
...
...
@@ -773,7 +773,7 @@ int vnodeImportStartToFile(SImportInfo *pImport, char *payload, int rows) {
pImport
->
importedRows
=
pImport
->
rows
;
code
=
vnodeImportToFile
(
pImport
);
}
else
{
d
Error
(
"vid:%d sid:%d id:%s, data is already imported to file"
,
pObj
->
vnode
,
pObj
->
sid
,
pObj
->
meterId
);
d
Trace
(
"vid:%d sid:%d id:%s, data is already imported to file"
,
pObj
->
vnode
,
pObj
->
sid
,
pObj
->
meterId
);
}
return
code
;
...
...
@@ -817,7 +817,7 @@ int vnodeImportWholeToCache(SImportInfo *pImport, char *payload, int rows) {
}
else
if
(
pImport
->
firstKey
<
pObj
->
lastKeyOnFile
)
{
code
=
vnodeImportStartToFile
(
pImport
,
payload
,
rows
);
}
else
{
// firstKey == pObj->lastKeyOnFile
d
Error
(
"vid:%d sid:%d id:%s, data is already there"
,
pObj
->
vnode
,
pObj
->
sid
,
pObj
->
meterId
);
d
Trace
(
"vid:%d sid:%d id:%s, data is already there"
,
pObj
->
vnode
,
pObj
->
sid
,
pObj
->
meterId
);
}
}
...
...
src/util/src/tglobalcfg.c
浏览文件 @
18f31711
...
...
@@ -116,7 +116,7 @@ int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
char
tsHttpIp
[
TSDB_IPv4ADDR_LEN
]
=
"0.0.0.0"
;
short
tsHttpPort
=
6020
;
// only tcp, range tcp[6020]
// short tsNginxPort = 6060; //only tcp, range tcp[6060]
int
tsHttpCacheSessions
=
200
0
;
int
tsHttpCacheSessions
=
3
0
;
int
tsHttpSessionExpire
=
36000
;
int
tsHttpMaxThreads
=
2
;
int
tsHttpEnableCompress
=
0
;
...
...
src/util/src/tmempool.c
浏览文件 @
18f31711
...
...
@@ -79,8 +79,7 @@ char *taosMemPoolMalloc(mpool_h handle) {
pthread_mutex_lock
(
&
(
pool_p
->
mutex
));
if
(
pool_p
->
numOfFree
<=
0
)
{
pError
(
"mempool: out of memory"
);
pTrace
(
"mempool: out of memory"
);
}
else
{
pos
=
pool_p
->
pool
+
pool_p
->
blockSize
*
(
pool_p
->
freeList
[
pool_p
->
first
]);
pool_p
->
first
++
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录