Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7d5a13b8
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
7d5a13b8
编写于
6月 27, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'enh/stopquery' of github.com:taosdata/TDengine into enh/stopquery
上级
d4027164
393a5dcf
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
74 addition
and
81 deletion
+74
-81
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+11
-8
source/libs/transport/src/trans.c
source/libs/transport/src/trans.c
+3
-7
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+13
-20
source/libs/transport/src/transComm.c
source/libs/transport/src/transComm.c
+27
-13
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+20
-33
未找到文件。
source/libs/transport/inc/transComm.h
浏览文件 @
7d5a13b8
...
...
@@ -252,7 +252,7 @@ int transSendAsync(SAsyncPool* pool, queue* mq);
do { \
if (id > 0) { \
tTrace("handle step1"); \
SExHandle* exh2 = transAcquireExHandle(
refMgt, id);
\
SExHandle* exh2 = transAcquireExHandle(
id);
\
if (exh2 == NULL || id != exh2->refId) { \
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
exh2 ? exh2->refId : 0, id); \
...
...
@@ -260,7 +260,7 @@ int transSendAsync(SAsyncPool* pool, queue* mq);
} \
} else if (id == 0) { \
tTrace("handle step2"); \
SExHandle* exh2 = transAcquireExHandle(
refMgt, id);
\
SExHandle* exh2 = transAcquireExHandle(
id);
\
if (exh2 == NULL || id == exh2->refId) { \
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, id, \
exh2 ? exh2->refId : 0); \
...
...
@@ -273,6 +273,7 @@ int transSendAsync(SAsyncPool* pool, queue* mq);
goto _return2; \
} \
} while (0)
int
transInitBuffer
(
SConnBuffer
*
buf
);
int
transClearBuffer
(
SConnBuffer
*
buf
);
int
transDestroyBuffer
(
SConnBuffer
*
buf
);
...
...
@@ -390,13 +391,15 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b);
*/
void
transThreadOnce
();
void
transInitEnv
();
void
transInit
();
void
transCleanup
();
int32_t
transOpenExHandleMgt
(
int
size
);
void
transCloseExHandleMgt
(
int32_t
mgt
);
int64_t
transAddExHandle
(
int32_t
mgt
,
void
*
p
);
int32_t
transRemoveExHandle
(
int
32_t
mgt
,
int
64_t
refId
);
SExHandle
*
transAcquireExHandle
(
int
32_t
mgt
,
int
64_t
refId
);
int32_t
transReleaseExHandle
(
int
32_t
mgt
,
int
64_t
refId
);
void
transCloseExHandleMgt
();
int64_t
transAddExHandle
(
void
*
p
);
int32_t
transRemoveExHandle
(
int64_t
refId
);
SExHandle
*
transAcquireExHandle
(
int64_t
refId
);
int32_t
transReleaseExHandle
(
int64_t
refId
);
void
transDestoryExHandle
(
void
*
handle
);
#ifdef __cplusplus
...
...
source/libs/transport/src/trans.c
浏览文件 @
7d5a13b8
...
...
@@ -36,7 +36,7 @@ static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) {
return
0
;
}
void
*
rpcOpen
(
const
SRpcInit
*
pInit
)
{
transInitEnv
();
rpcInit
();
SRpcInfo
*
pRpc
=
taosMemoryCalloc
(
1
,
sizeof
(
SRpcInfo
));
if
(
pRpc
==
NULL
)
{
...
...
@@ -82,7 +82,6 @@ void rpcClose(void* arg) {
tInfo
(
"start to close rpc"
);
SRpcInfo
*
pRpc
=
(
SRpcInfo
*
)
arg
;
(
*
taosCloseHandle
[
pRpc
->
connType
])(
pRpc
->
tcphandle
);
transCloseExHandleMgt
(
pRpc
->
refMgt
);
taosMemoryFree
(
pRpc
);
return
;
...
...
@@ -164,17 +163,14 @@ void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
transSetDefaultAddr
(
thandle
,
ip
,
fqdn
);
}
// void rpcSetMsgTraceId(SRpcMsg* pMsg, STraceId uid) {
// SRpcHandleInfo* pInfo = &pMsg->info;
// pInfo->traceId = uid;
//}
int32_t
rpcInit
()
{
transInit
();
// impl later
return
0
;
}
void
rpcCleanup
(
void
)
{
// impl later
transCleanup
();
return
;
}
...
...
source/libs/transport/src/transCli.c
浏览文件 @
7d5a13b8
...
...
@@ -15,9 +15,6 @@
#ifdef USE_UV
#include "transComm.h"
static
int32_t
transSCliInst
=
0
;
static
int32_t
refMgt
=
0
;
typedef
struct
SCliConn
{
T_REF_DECLARE
()
uv_connect_t
connReq
;
...
...
@@ -503,12 +500,12 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
}
static
void
allocConnRef
(
SCliConn
*
conn
,
bool
update
)
{
if
(
update
)
{
transRemoveExHandle
(
refMgt
,
conn
->
refId
);
transRemoveExHandle
(
conn
->
refId
);
}
SExHandle
*
exh
=
taosMemoryCalloc
(
1
,
sizeof
(
SExHandle
));
exh
->
handle
=
conn
;
exh
->
pThrd
=
conn
->
hostThrd
;
exh
->
refId
=
transAddExHandle
(
refMgt
,
exh
);
exh
->
refId
=
transAddExHandle
(
exh
);
conn
->
refId
=
exh
->
refId
;
}
static
void
addConnToPool
(
void
*
pool
,
SCliConn
*
conn
)
{
...
...
@@ -602,9 +599,13 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
tTrace
(
"%s conn %p remove from conn pool"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
QUEUE_REMOVE
(
&
conn
->
conn
);
QUEUE_INIT
(
&
conn
->
conn
);
transRemoveExHandle
(
refMgt
,
conn
->
refId
);
transRemoveExHandle
(
conn
->
refId
);
if
(
clear
)
{
uv_close
((
uv_handle_t
*
)
conn
->
stream
,
cliDestroy
);
if
(
uv_is_active
((
uv_handle_t
*
)
conn
->
stream
))
{
uv_close
((
uv_handle_t
*
)
conn
->
stream
,
cliDestroy
);
}
else
{
cliDestroy
((
uv_handle_t
*
)
conn
->
stream
);
}
}
}
static
void
cliDestroy
(
uv_handle_t
*
handle
)
{
...
...
@@ -735,7 +736,7 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
}
static
void
cliHandleRelease
(
SCliMsg
*
pMsg
,
SCliThrd
*
pThrd
)
{
int64_t
refId
=
(
int64_t
)(
pMsg
->
msg
.
info
.
handle
);
SExHandle
*
exh
=
transAcquireExHandle
(
ref
Mgt
,
ref
Id
);
SExHandle
*
exh
=
transAcquireExHandle
(
refId
);
if
(
exh
==
NULL
)
{
tDebug
(
"%"
PRId64
" already release"
,
refId
);
}
...
...
@@ -761,7 +762,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
SCliConn
*
conn
=
NULL
;
int64_t
refId
=
(
int64_t
)(
pMsg
->
msg
.
info
.
handle
);
if
(
refId
!=
0
)
{
SExHandle
*
exh
=
transAcquireExHandle
(
ref
Mgt
,
ref
Id
);
SExHandle
*
exh
=
transAcquireExHandle
(
refId
);
if
(
exh
==
NULL
)
{
*
ignore
=
true
;
destroyCmsg
(
pMsg
);
...
...
@@ -769,7 +770,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
// assert(0);
}
else
{
conn
=
exh
->
handle
;
transReleaseExHandle
(
ref
Mgt
,
ref
Id
);
transReleaseExHandle
(
refId
);
}
return
conn
;
};
...
...
@@ -899,10 +900,6 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
}
cli
->
pThreadObj
[
i
]
=
pThrd
;
}
int
ref
=
atomic_add_fetch_32
(
&
transSCliInst
,
1
);
if
(
ref
==
1
)
{
refMgt
=
transOpenExHandleMgt
(
50000
);
}
return
cli
;
}
...
...
@@ -1086,10 +1083,6 @@ void transCloseClient(void* arg) {
}
taosMemoryFree
(
cli
->
pThreadObj
);
taosMemoryFree
(
cli
);
int
ref
=
atomic_sub_fetch_32
(
&
transSCliInst
,
1
);
if
(
ref
==
0
)
{
transCloseExHandleMgt
(
refMgt
);
}
}
void
transRefCliHandle
(
void
*
handle
)
{
if
(
handle
==
NULL
)
{
...
...
@@ -1111,12 +1104,12 @@ void transUnrefCliHandle(void* handle) {
}
SCliThrd
*
transGetWorkThrdFromHandle
(
int64_t
handle
)
{
SCliThrd
*
pThrd
=
NULL
;
SExHandle
*
exh
=
transAcquireExHandle
(
refMgt
,
handle
);
SExHandle
*
exh
=
transAcquireExHandle
(
handle
);
if
(
exh
==
NULL
)
{
return
NULL
;
}
pThrd
=
exh
->
pThrd
;
transReleaseExHandle
(
refMgt
,
handle
);
transReleaseExHandle
(
handle
);
return
pThrd
;
}
SCliThrd
*
transGetWorkThrd
(
STrans
*
trans
,
int64_t
handle
)
{
...
...
source/libs/transport/src/transComm.c
浏览文件 @
7d5a13b8
...
...
@@ -16,7 +16,9 @@
#include "transComm.h"
// static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
static
TdThreadOnce
transModuleInit
=
PTHREAD_ONCE_INIT
;
static
int32_t
refMgt
;
int
transAuthenticateMsg
(
void
*
pMsg
,
int
msgLen
,
void
*
pAuth
,
void
*
pKey
)
{
T_MD5_CTX
context
;
...
...
@@ -478,35 +480,47 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
return
true
;
}
void
transInitEnv
()
{
//
static
void
transInitEnv
()
{
refMgt
=
transOpenExHandleMgt
(
50000
);
uv_os_setenv
(
"UV_TCP_SINGLE_ACCEPT"
,
"1"
);
}
static
void
transDestroyEnv
()
{
// close ref
transCloseExHandleMgt
(
refMgt
);
}
void
transInit
()
{
// init env
taosThreadOnce
(
&
transModuleInit
,
transInitEnv
);
}
void
transCleanup
()
{
// clean env
transDestroyEnv
();
}
int32_t
transOpenExHandleMgt
(
int
size
)
{
// added into once later
return
taosOpenRef
(
size
,
transDestoryExHandle
);
}
void
transCloseExHandleMgt
(
int32_t
mgt
)
{
void
transCloseExHandleMgt
()
{
// close ref
taosCloseRef
(
m
gt
);
taosCloseRef
(
refM
gt
);
}
int64_t
transAddExHandle
(
int32_t
mgt
,
void
*
p
)
{
int64_t
transAddExHandle
(
void
*
p
)
{
// acquire extern handle
return
taosAddRef
(
m
gt
,
p
);
return
taosAddRef
(
refM
gt
,
p
);
}
int32_t
transRemoveExHandle
(
int
32_t
mgt
,
int
64_t
refId
)
{
int32_t
transRemoveExHandle
(
int64_t
refId
)
{
// acquire extern handle
return
taosRemoveRef
(
m
gt
,
refId
);
return
taosRemoveRef
(
refM
gt
,
refId
);
}
SExHandle
*
transAcquireExHandle
(
int
32_t
mgt
,
int
64_t
refId
)
{
SExHandle
*
transAcquireExHandle
(
int64_t
refId
)
{
// acquire extern handle
return
(
SExHandle
*
)
taosAcquireRef
(
m
gt
,
refId
);
return
(
SExHandle
*
)
taosAcquireRef
(
refM
gt
,
refId
);
}
int32_t
transReleaseExHandle
(
int
32_t
mgt
,
int
64_t
refId
)
{
int32_t
transReleaseExHandle
(
int64_t
refId
)
{
// release extern handle
return
taosReleaseRef
(
m
gt
,
refId
);
return
taosReleaseRef
(
refM
gt
,
refId
);
}
void
transDestoryExHandle
(
void
*
handle
)
{
if
(
handle
==
NULL
)
{
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
7d5a13b8
...
...
@@ -19,9 +19,7 @@
static
TdThreadOnce
transModuleInit
=
PTHREAD_ONCE_INIT
;
static
char
*
notify
=
"a"
;
static
int32_t
tranSSvrInst
=
0
;
static
int32_t
refMgt
=
0
;
static
char
*
notify
=
"a"
;
typedef
struct
{
int
notifyCount
;
//
...
...
@@ -274,7 +272,7 @@ static void uvHandleReq(SSvrConn* pConn) {
// 2. once send out data, cli conn released to conn pool immediately
// 3. not mixed with persist
transMsg
.
info
.
ahandle
=
(
void
*
)
pHead
->
ahandle
;
transMsg
.
info
.
handle
=
(
void
*
)
transAcquireExHandle
(
refMgt
,
pConn
->
refId
);
transMsg
.
info
.
handle
=
(
void
*
)
transAcquireExHandle
(
pConn
->
refId
);
transMsg
.
info
.
refId
=
pConn
->
refId
;
transMsg
.
info
.
traceId
=
pHead
->
traceId
;
...
...
@@ -292,7 +290,7 @@ static void uvHandleReq(SSvrConn* pConn) {
pConnInfo
->
clientPort
=
ntohs
(
pConn
->
addr
.
sin_port
);
tstrncpy
(
pConnInfo
->
user
,
pConn
->
user
,
sizeof
(
pConnInfo
->
user
));
transReleaseExHandle
(
refMgt
,
pConn
->
refId
);
transReleaseExHandle
(
pConn
->
refId
);
STrans
*
pTransInst
=
pConn
->
pTransInst
;
(
*
pTransInst
->
cfp
)(
pTransInst
->
parent
,
&
transMsg
,
NULL
);
...
...
@@ -523,15 +521,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
SExHandle
*
exh1
=
transMsg
.
info
.
handle
;
int64_t
refId
=
transMsg
.
info
.
refId
;
SExHandle
*
exh2
=
transAcquireExHandle
(
ref
Mgt
,
ref
Id
);
SExHandle
*
exh2
=
transAcquireExHandle
(
refId
);
if
(
exh2
==
NULL
||
exh1
!=
exh2
)
{
tTrace
(
"handle except msg %p, ignore it"
,
exh1
);
transReleaseExHandle
(
ref
Mgt
,
ref
Id
);
transReleaseExHandle
(
refId
);
destroySmsg
(
msg
);
continue
;
}
msg
->
pConn
=
exh1
->
handle
;
transReleaseExHandle
(
ref
Mgt
,
ref
Id
);
transReleaseExHandle
(
refId
);
(
*
transAsyncHandle
[
msg
->
type
])(
msg
,
pThrd
);
}
}
...
...
@@ -773,8 +771,8 @@ static SSvrConn* createConn(void* hThrd) {
SExHandle
*
exh
=
taosMemoryMalloc
(
sizeof
(
SExHandle
));
exh
->
handle
=
pConn
;
exh
->
pThrd
=
pThrd
;
exh
->
refId
=
transAddExHandle
(
refMgt
,
exh
);
transAcquireExHandle
(
refMgt
,
exh
->
refId
);
exh
->
refId
=
transAddExHandle
(
exh
);
transAcquireExHandle
(
exh
->
refId
);
pConn
->
refId
=
exh
->
refId
;
transRefSrvHandle
(
pConn
);
...
...
@@ -803,14 +801,14 @@ static void destroyConnRegArg(SSvrConn* conn) {
}
}
static
int
reallocConnRef
(
SSvrConn
*
conn
)
{
transReleaseExHandle
(
refMgt
,
conn
->
refId
);
transRemoveExHandle
(
refMgt
,
conn
->
refId
);
transReleaseExHandle
(
conn
->
refId
);
transRemoveExHandle
(
conn
->
refId
);
// avoid app continue to send msg on invalid handle
SExHandle
*
exh
=
taosMemoryMalloc
(
sizeof
(
SExHandle
));
exh
->
handle
=
conn
;
exh
->
pThrd
=
conn
->
hostThrd
;
exh
->
refId
=
transAddExHandle
(
refMgt
,
exh
);
transAcquireExHandle
(
refMgt
,
exh
->
refId
);
exh
->
refId
=
transAddExHandle
(
exh
);
transAcquireExHandle
(
exh
->
refId
);
conn
->
refId
=
exh
->
refId
;
return
0
;
...
...
@@ -822,8 +820,8 @@ static void uvDestroyConn(uv_handle_t* handle) {
}
SWorkThrd
*
thrd
=
conn
->
hostThrd
;
transReleaseExHandle
(
refMgt
,
conn
->
refId
);
transRemoveExHandle
(
refMgt
,
conn
->
refId
);
transReleaseExHandle
(
conn
->
refId
);
transRemoveExHandle
(
conn
->
refId
);
tDebug
(
"%s conn %p destroy"
,
transLabel
(
thrd
->
pTransInst
),
conn
);
// uv_timer_stop(&conn->pTimer);
...
...
@@ -871,12 +869,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv
->
port
=
port
;
uv_loop_init
(
srv
->
loop
);
// taosThreadOnce(&transModuleInit, uvInitEnv);
int
ref
=
atomic_add_fetch_32
(
&
tranSSvrInst
,
1
);
if
(
ref
==
1
)
{
refMgt
=
transOpenExHandleMgt
(
50000
);
}
assert
(
0
==
uv_pipe_init
(
srv
->
loop
,
&
srv
->
pipeListen
,
0
));
#ifdef WINDOWS
char
pipeName
[
64
];
...
...
@@ -1028,11 +1020,6 @@ void transCloseServer(void* arg) {
taosMemoryFree
(
srv
->
pipe
);
taosMemoryFree
(
srv
);
int
ref
=
atomic_sub_fetch_32
(
&
tranSSvrInst
,
1
);
if
(
ref
==
0
)
{
transCloseExHandleMgt
(
refMgt
);
}
}
void
transRefSrvHandle
(
void
*
handle
)
{
...
...
@@ -1071,11 +1058,11 @@ void transReleaseSrvHandle(void* handle) {
tTrace
(
"%s conn %p start to release"
,
transLabel
(
pThrd
->
pTransInst
),
exh
->
handle
);
transSendAsync
(
pThrd
->
asyncPool
,
&
m
->
q
);
transReleaseExHandle
(
ref
Mgt
,
ref
Id
);
transReleaseExHandle
(
refId
);
return
;
_return1:
tTrace
(
"handle %p failed to send to release handle"
,
exh
);
transReleaseExHandle
(
ref
Mgt
,
ref
Id
);
transReleaseExHandle
(
refId
);
return
;
_return2:
tTrace
(
"handle %p failed to send to release handle"
,
exh
);
...
...
@@ -1100,12 +1087,12 @@ void transSendResponse(const STransMsg* msg) {
STraceId
*
trace
=
(
STraceId
*
)
&
msg
->
info
.
traceId
;
tGTrace
(
"conn %p start to send resp (1/2)"
,
exh
->
handle
);
transSendAsync
(
pThrd
->
asyncPool
,
&
m
->
q
);
transReleaseExHandle
(
ref
Mgt
,
ref
Id
);
transReleaseExHandle
(
refId
);
return
;
_return1:
tTrace
(
"handle %p failed to send resp"
,
exh
);
rpcFreeCont
(
msg
->
pCont
);
transReleaseExHandle
(
ref
Mgt
,
ref
Id
);
transReleaseExHandle
(
refId
);
return
;
_return2:
tTrace
(
"handle %p failed to send resp"
,
exh
);
...
...
@@ -1129,13 +1116,13 @@ void transRegisterMsg(const STransMsg* msg) {
tTrace
(
"%s conn %p start to register brokenlink callback"
,
transLabel
(
pThrd
->
pTransInst
),
exh
->
handle
);
transSendAsync
(
pThrd
->
asyncPool
,
&
m
->
q
);
transReleaseExHandle
(
ref
Mgt
,
ref
Id
);
transReleaseExHandle
(
refId
);
return
;
_return1:
tTrace
(
"handle %p failed to register brokenlink"
,
exh
);
rpcFreeCont
(
msg
->
pCont
);
transReleaseExHandle
(
ref
Mgt
,
ref
Id
);
transReleaseExHandle
(
refId
);
return
;
_return2:
tTrace
(
"handle %p failed to register brokenlink"
,
exh
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录