Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
843477e3
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
843477e3
编写于
3月 20, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
handle except
上级
9a5d1139
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
128 addition
and
153 deletion
+128
-153
source/libs/transport/src/trans.c
source/libs/transport/src/trans.c
+1
-1
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+38
-33
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+28
-108
source/libs/transport/test/transUT.cc
source/libs/transport/test/transUT.cc
+61
-11
未找到文件。
source/libs/transport/src/trans.c
浏览文件 @
843477e3
...
...
@@ -144,7 +144,7 @@ void rpcUnrefHandle(void* handle, int8_t type) {
(
*
taosUnRefHandle
[
type
])(
handle
);
}
void
rpcRegisterBrokenLinkArg
(
SRpcMsg
*
msg
)
{
rpcSendResponse
(
msg
);
}
void
rpcRegisterBrokenLinkArg
(
SRpcMsg
*
msg
)
{
transRegisterMsg
(
msg
);
}
void
rpcReleaseHandle
(
void
*
handle
,
int8_t
type
)
{
assert
(
type
==
TAOS_CONN_SERVER
||
type
==
TAOS_CONN_CLIENT
);
(
*
transReleaseHandle
[
type
])(
handle
);
...
...
source/libs/transport/src/transCli.c
浏览文件 @
843477e3
...
...
@@ -132,27 +132,29 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
tDebug("cli conn %p receive release request", conn); \
if (T_REF_VAL_GET(conn) == 1) { \
SCliThrdObj* thrd = conn->hostThrd; \
addConnToPool(thrd->pool, conn); \
} \
return; \
} \
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
tDebug("cli conn %p receive release request, ref: %d", conn, T_REF_VAL_GET(conn)); \
while (T_REF_VAL_GET(conn) > 1) { \
transUnrefCliHandle(conn); \
} \
if (T_REF_VAL_GET(conn) == 1) { \
SCliThrdObj* thrd = conn->hostThrd; \
addConnToPool(thrd->pool, conn); \
} \
return; \
} \
} while (0)
#define CONN_HANDLE_THREAD_QUIT(conn, thrd) \
do { \
if (thrd->quit) { \
cliHandleExcept(conn); \
return; \
} \
#define CONN_HANDLE_THREAD_QUIT(thrd) \
do { \
if (thrd->quit) { \
return; \
} \
} while (0)
#define CONN_HANDLE_BROKEN(conn) \
...
...
@@ -263,7 +265,7 @@ _RETURN:
void
cliHandleExcept
(
SCliConn
*
pConn
)
{
if
(
taosArrayGetSize
(
pConn
->
cliMsgs
)
==
0
)
{
if
(
pConn
->
broken
==
true
||
CONN_NO_PERSIST_BY_APP
(
pConn
))
{
if
(
pConn
->
broken
==
true
&&
CONN_NO_PERSIST_BY_APP
(
pConn
))
{
transUnrefCliHandle
(
pConn
);
return
;
}
...
...
@@ -375,6 +377,9 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
return
conn
;
}
static
void
addConnToPool
(
void
*
pool
,
SCliConn
*
conn
)
{
SCliThrdObj
*
thrd
=
conn
->
hostThrd
;
CONN_HANDLE_THREAD_QUIT
(
thrd
);
char
key
[
128
]
=
{
0
};
transCtxDestroy
(
&
conn
->
ctx
);
...
...
@@ -539,7 +544,6 @@ void cliSend(SCliConn* pConn) {
}
pHead
->
noResp
=
REQUEST_NO_RESP
(
pMsg
)
?
1
:
0
;
pHead
->
persist
=
REQUEST_PERSIS_HANDLE
(
pMsg
)
?
1
:
0
;
pHead
->
msgType
=
pMsg
->
msgType
;
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
msgLen
);
...
...
@@ -594,12 +598,17 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn
*
conn
=
pMsg
->
msg
.
handle
;
tDebug
(
"%s cli conn %p start to release to inst"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
transUnrefCliHandle
(
conn
);
taosArrayPush
(
conn
->
cliMsgs
,
&
pMsg
);
if
(
taosArrayGetSize
(
conn
->
cliMsgs
)
>=
2
)
{
return
;
// send one by one
if
(
T_REF_VAL_GET
(
conn
)
==
2
)
{
transUnrefCliHandle
(
conn
);
taosArrayPush
(
conn
->
cliMsgs
,
&
pMsg
);
if
(
taosArrayGetSize
(
conn
->
cliMsgs
)
>=
2
)
{
return
;
// send one by one
}
cliSend
(
conn
);
}
else
{
// conn already broken down
transUnrefCliHandle
(
conn
);
}
cliSend
(
conn
);
}
SCliConn
*
cliGetConn
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
)
{
...
...
@@ -836,11 +845,7 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p
if
(
index
==
-
1
)
{
index
=
cliRBChoseIdx
(
pTransInst
);
}
int32_t
flen
=
0
;
if
(
transCompressMsg
(
pMsg
->
pCont
,
pMsg
->
contLen
,
&
flen
))
{
// imp later
}
tDebug
(
"send request at thread:%d %p, dst: %s:%d"
,
index
,
pMsg
,
ip
,
port
);
STransConnCtx
*
pCtx
=
calloc
(
1
,
sizeof
(
STransConnCtx
));
pCtx
->
ahandle
=
pMsg
->
ahandle
;
pCtx
->
msgType
=
pMsg
->
msgType
;
...
...
@@ -851,9 +856,7 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p
if
(
ctx
!=
NULL
)
{
pCtx
->
appCtx
=
*
ctx
;
}
assert
(
pTransInst
->
connType
==
TAOS_CONN_CLIENT
);
// atomic or not
SCliMsg
*
cliMsg
=
calloc
(
1
,
sizeof
(
SCliMsg
));
cliMsg
->
ctx
=
pCtx
;
...
...
@@ -862,6 +865,8 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p
cliMsg
->
type
=
Normal
;
SCliThrdObj
*
thrd
=
((
SCliObj
*
)
pTransInst
->
tcphandle
)
->
pThreadObj
[
index
];
tDebug
(
"send request at thread:%d %p, dst: %s:%d"
,
index
,
pMsg
,
ip
,
port
);
transSendAsync
(
thrd
->
asyncPool
,
&
(
cliMsg
->
q
));
}
...
...
source/libs/transport/src/transSrv.c
浏览文件 @
843477e3
...
...
@@ -114,10 +114,6 @@ static const char* notify = "a";
return; \
} \
} while (0)
// refactor later
static
int
transAddAuthPart
(
SSrvConn
*
pConn
,
char
*
msg
,
int
msgLen
);
static
int
uvAuthMsg
(
SSrvConn
*
pConn
,
char
*
msg
,
int
msgLen
);
static
void
uvAllocConnBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
);
static
void
uvAllocRecvBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
);
...
...
@@ -144,9 +140,9 @@ static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/)
static
void
uvHandleQuit
(
SSrvMsg
*
msg
,
SWorkThrdObj
*
thrd
);
static
void
uvHandleRelease
(
SSrvMsg
*
msg
,
SWorkThrdObj
*
thrd
);
static
void
uvHandle
Send
Resp
(
SSrvMsg
*
msg
,
SWorkThrdObj
*
thrd
);
static
void
uvHandleResp
(
SSrvMsg
*
msg
,
SWorkThrdObj
*
thrd
);
static
void
uvHandleRegister
(
SSrvMsg
*
msg
,
SWorkThrdObj
*
thrd
);
static
void
(
*
transAsyncHandle
[])(
SSrvMsg
*
msg
,
SWorkThrdObj
*
thrd
)
=
{
uvHandle
Send
Resp
,
uvHandleQuit
,
uvHandleRelease
,
static
void
(
*
transAsyncHandle
[])(
SSrvMsg
*
msg
,
SWorkThrdObj
*
thrd
)
=
{
uvHandleResp
,
uvHandleQuit
,
uvHandleRelease
,
uvHandleRegister
};
static
void
uvDestroyConn
(
uv_handle_t
*
handle
);
...
...
@@ -165,59 +161,6 @@ void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
transAllocBuffer
(
pBuf
,
buf
);
}
static
int
uvAuthMsg
(
SSrvConn
*
pConn
,
char
*
msg
,
int
len
)
{
STransMsgHead
*
pHead
=
(
STransMsgHead
*
)
msg
;
int
code
=
0
;
if
((
pConn
->
secured
&&
pHead
->
spi
==
0
)
||
(
pHead
->
spi
==
0
&&
pConn
->
spi
==
0
))
{
// secured link, or no authentication
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
pHead
->
msgLen
);
// tTrace("%s, secured link, no auth is required", pConn->info);
return
0
;
}
if
(
!
rpcIsReq
(
pHead
->
msgType
))
{
// for response, if code is auth failure, it shall bypass the auth process
code
=
htonl
(
pHead
->
code
);
if
(
code
==
TSDB_CODE_RPC_INVALID_TIME_STAMP
||
code
==
TSDB_CODE_RPC_AUTH_FAILURE
||
code
==
TSDB_CODE_RPC_INVALID_VERSION
||
code
==
TSDB_CODE_RPC_AUTH_REQUIRED
||
code
==
TSDB_CODE_MND_USER_NOT_EXIST
||
code
==
TSDB_CODE_RPC_NOT_READY
)
{
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
pHead
->
msgLen
);
// tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
return
0
;
}
}
code
=
0
;
if
(
pHead
->
spi
==
pConn
->
spi
)
{
// authentication
SRpcDigest
*
pDigest
=
(
SRpcDigest
*
)((
char
*
)
pHead
+
len
-
sizeof
(
SRpcDigest
));
int32_t
delta
;
delta
=
(
int32_t
)
htonl
(
pDigest
->
timeStamp
);
delta
-=
(
int32_t
)
taosGetTimestampSec
();
if
(
abs
(
delta
)
>
900
)
{
tWarn
(
"%s, time diff:%d is too big, msg discarded"
,
pConn
->
info
,
delta
);
code
=
TSDB_CODE_RPC_INVALID_TIME_STAMP
;
}
else
{
if
(
transAuthenticateMsg
(
pHead
,
len
-
TSDB_AUTH_LEN
,
pDigest
->
auth
,
pConn
->
secret
)
<
0
)
{
// tDebug("%s, authentication failed, msg discarded", pConn->info);
code
=
TSDB_CODE_RPC_AUTH_FAILURE
;
}
else
{
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
pHead
->
msgLen
)
-
sizeof
(
SRpcDigest
);
if
(
!
rpcIsReq
(
pHead
->
msgType
))
pConn
->
secured
=
1
;
// link is secured for client
// tTrace("%s, message is authenticated", pConn->info);
}
}
}
else
{
tDebug
(
"%s, auth spi:%d not matched with received:%d"
,
pConn
->
info
,
pConn
->
spi
,
pHead
->
spi
);
code
=
pHead
->
spi
?
TSDB_CODE_RPC_AUTH_FAILURE
:
TSDB_CODE_RPC_AUTH_REQUIRED
;
}
return
code
;
}
// refers specifically to query or insert timeout
static
void
uvHandleActivityTimeout
(
uv_timer_t
*
handle
)
{
SSrvConn
*
conn
=
handle
->
data
;
...
...
@@ -292,7 +235,6 @@ static void uvHandleReq(SSrvConn* pConn) {
STrans
*
pTransInst
=
(
STrans
*
)
p
->
shandle
;
(
*
pTransInst
->
cfp
)(
pTransInst
->
parent
,
&
transMsg
,
NULL
);
// uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
// auth
}
void
uvOnRecvCb
(
uv_stream_t
*
cli
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
...
...
@@ -351,24 +293,26 @@ void uvOnSendCb(uv_write_t* req, int status) {
if
(
msg
->
type
==
Release
&&
conn
->
status
!=
ConnNormal
)
{
conn
->
status
=
ConnNormal
;
transUnrefSrvHandle
(
conn
);
}
else
if
(
msg
->
type
==
Register
&&
conn
->
status
==
ConnAcquire
)
{
conn
->
regArg
.
notifyCount
=
0
;
conn
->
regArg
.
init
=
1
;
conn
->
regArg
.
msg
=
msg
->
msg
;
if
(
conn
->
broken
)
{
STrans
*
pTransInst
=
conn
->
pTransInst
;
(
pTransInst
->
cfp
)(
pTransInst
->
parent
,
&
(
conn
->
regArg
.
msg
),
NULL
);
memset
(
&
conn
->
regArg
,
0
,
sizeof
(
conn
->
regArg
));
}
free
(
msg
);
return
;
}
destroySmsg
(
msg
);
// send second data, just use for push
if
(
taosArrayGetSize
(
conn
->
srvMsgs
)
>
0
)
{
tTrace
(
"resent server conn %p sending msg size: %d"
,
conn
,
(
int
)
taosArrayGetSize
(
conn
->
srvMsgs
));
msg
=
(
SSrvMsg
*
)
taosArrayGetP
(
conn
->
srvMsgs
,
0
);
uvStartSendRespInternal
(
msg
);
if
(
msg
->
type
==
Register
&&
conn
->
status
==
ConnAcquire
)
{
conn
->
regArg
.
notifyCount
=
0
;
conn
->
regArg
.
init
=
1
;
conn
->
regArg
.
msg
=
msg
->
msg
;
if
(
conn
->
broken
)
{
STrans
*
pTransInst
=
conn
->
pTransInst
;
(
pTransInst
->
cfp
)(
pTransInst
->
parent
,
&
(
conn
->
regArg
.
msg
),
NULL
);
memset
(
&
conn
->
regArg
,
0
,
sizeof
(
conn
->
regArg
));
}
taosArrayRemove
(
conn
->
srvMsgs
,
0
);
free
(
msg
);
}
else
{
uvStartSendRespInternal
(
msg
);
}
}
}
}
else
{
...
...
@@ -387,7 +331,6 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
}
static
void
uvPrepareSendData
(
SSrvMsg
*
smsg
,
uv_buf_t
*
wb
)
{
// impl later;
tTrace
(
"server conn %p prepare to send resp"
,
smsg
->
pConn
);
SSrvConn
*
pConn
=
smsg
->
pConn
;
...
...
@@ -438,13 +381,12 @@ static void uvStartSendResp(SSrvMsg* smsg) {
transUnrefSrvHandle
(
pConn
);
}
if
(
taosArrayGetSize
(
pConn
->
srvMsgs
)
>
0
)
{
taosArrayPush
(
pConn
->
srvMsgs
,
&
smsg
);
if
(
taosArrayGetSize
(
pConn
->
srvMsgs
)
>
1
)
{
tDebug
(
"server conn %p send data to client %s:%d, local info: %s:%d"
,
pConn
,
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
locaddr
.
sin_addr
),
ntohs
(
pConn
->
locaddr
.
sin_port
));
taosArrayPush
(
pConn
->
srvMsgs
,
&
smsg
);
return
;
}
taosArrayPush
(
pConn
->
srvMsgs
,
&
smsg
);
uvStartSendRespInternal
(
smsg
);
return
;
}
...
...
@@ -675,7 +617,7 @@ static SSrvConn* createConn(void* hThrd) {
QUEUE_PUSH
(
&
pThrd
->
conn
,
&
pConn
->
queue
);
pConn
->
srvMsgs
=
taosArrayInit
(
2
,
sizeof
(
void
*
));
//
tTrace
(
"conn %p created"
,
pConn
);
tTrace
(
"
server
conn %p created"
,
pConn
);
memset
(
&
pConn
->
regArg
,
0
,
sizeof
(
pConn
->
regArg
));
pConn
->
broken
=
false
;
...
...
@@ -697,7 +639,7 @@ static void destroyConn(SSrvConn* conn, bool clear) {
}
conn
->
srvMsgs
=
taosArrayDestroy
(
conn
->
srvMsgs
);
if
(
clear
)
{
tTrace
(
"
try to destroy conn %p
"
,
conn
);
tTrace
(
"
server conn %p to be destroyed
"
,
conn
);
uv_shutdown_t
*
req
=
malloc
(
sizeof
(
uv_shutdown_t
));
uv_shutdown
(
req
,
(
uv_stream_t
*
)
conn
->
pTcp
,
uvShutDownCb
);
}
...
...
@@ -720,25 +662,6 @@ static void uvDestroyConn(uv_handle_t* handle) {
uv_stop
(
thrd
->
loop
);
}
}
static
int
transAddAuthPart
(
SSrvConn
*
pConn
,
char
*
msg
,
int
msgLen
)
{
STransMsgHead
*
pHead
=
(
STransMsgHead
*
)
msg
;
if
(
pConn
->
spi
&&
pConn
->
secured
==
0
)
{
// add auth part
pHead
->
spi
=
pConn
->
spi
;
STransDigestMsg
*
pDigest
=
(
STransDigestMsg
*
)(
msg
+
msgLen
);
pDigest
->
timeStamp
=
htonl
(
taosGetTimestampSec
());
msgLen
+=
sizeof
(
SRpcDigest
);
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
msgLen
);
// transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
// transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
}
else
{
pHead
->
spi
=
0
;
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
msgLen
);
}
return
msgLen
;
}
void
*
transInitServer
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
{
SServerObj
*
srv
=
calloc
(
1
,
sizeof
(
SServerObj
));
...
...
@@ -815,20 +738,19 @@ void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
}
uvStartSendRespInternal
(
msg
);
return
;
}
else
if
(
conn
->
status
==
ConnRelease
)
{
// already release by server app, do nothing
}
else
if
(
conn
->
status
==
ConnNormal
)
{
// no nothing
// user should not call this rpcRelease handle;
}
else
if
(
conn
->
status
==
ConnRelease
||
conn
->
status
==
ConnNormal
)
{
tDebug
(
"server conn %p already released, ignore release-msg"
,
conn
);
}
free
(
msg
);
destroySmsg
(
msg
);
}
void
uvHandle
Send
Resp
(
SSrvMsg
*
msg
,
SWorkThrdObj
*
thrd
)
{
void
uvHandleResp
(
SSrvMsg
*
msg
,
SWorkThrdObj
*
thrd
)
{
// send msg to client
tDebug
(
"server conn %p start to send resp"
,
msg
->
pConn
);
uvStartSendResp
(
msg
);
}
void
uvHandleRegister
(
SSrvMsg
*
msg
,
SWorkThrdObj
*
thrd
)
{
SSrvConn
*
conn
=
msg
->
pConn
;
tDebug
(
"server conn %p register brokenlink callback"
,
conn
);
if
(
conn
->
status
==
ConnAcquire
)
{
if
(
taosArrayGetSize
(
conn
->
srvMsgs
)
>
0
)
{
taosArrayPush
(
conn
->
srvMsgs
,
&
msg
);
...
...
@@ -901,12 +823,10 @@ void transUnrefSrvHandle(void* handle) {
return
;
}
int
ref
=
T_REF_DEC
((
SSrvConn
*
)
handle
);
tDebug
(
"handle %p ref count: %d"
,
handle
,
ref
);
tDebug
(
"server conn %p ref count: %d"
,
handle
,
ref
);
if
(
ref
==
0
)
{
destroyConn
((
SSrvConn
*
)
handle
,
true
);
}
// unref srv handle
}
void
transReleaseSrvHandle
(
void
*
handle
)
{
...
...
@@ -951,7 +871,7 @@ void transRegisterMsg(const STransMsg* msg) {
srvMsg
->
pConn
=
pConn
;
srvMsg
->
msg
=
*
msg
;
srvMsg
->
type
=
Register
;
tTrace
(
"server conn %p start to
send resp
"
,
pConn
);
tTrace
(
"server conn %p start to
register brokenlink callback
"
,
pConn
);
transSendAsync
(
pThrd
->
asyncPool
,
&
srvMsg
->
q
);
}
int
transGetConnInfo
(
void
*
thandle
,
STransHandleInfo
*
pInfo
)
{
...
...
source/libs/transport/test/transUT.cc
浏览文件 @
843477e3
...
...
@@ -35,6 +35,8 @@ int port = 7000;
typedef
void
(
*
CB
)(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
void
processContinueSend
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
void
processReleaseHandleCb
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
void
processRegisterFailure
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
void
processReq
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
// client process;
static
void
processResp
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
...
...
@@ -167,6 +169,35 @@ static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
rpcSendResponse
(
&
rpcMsg
);
}
}
static
void
processReleaseHandleCb
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
rpcMallocCont
(
100
);
rpcMsg
.
contLen
=
100
;
rpcMsg
.
handle
=
pMsg
->
handle
;
rpcMsg
.
code
=
0
;
rpcSendResponse
(
&
rpcMsg
);
rpcReleaseHandle
(
pMsg
->
handle
,
TAOS_CONN_SERVER
);
}
static
void
processRegisterFailure
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
void
*
handle
=
pMsg
->
handle
;
{
SRpcMsg
rpcMsg1
=
{
0
};
rpcMsg1
.
pCont
=
rpcMallocCont
(
100
);
rpcMsg1
.
contLen
=
100
;
rpcMsg1
.
handle
=
handle
;
rpcMsg1
.
code
=
0
;
rpcRegisterBrokenLinkArg
(
&
rpcMsg1
);
}
taosMsleep
(
10
);
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
rpcMallocCont
(
100
);
rpcMsg
.
contLen
=
100
;
rpcMsg
.
handle
=
pMsg
->
handle
;
rpcMsg
.
code
=
0
;
rpcSendResponse
(
&
rpcMsg
);
}
// client process;
static
void
processResp
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
Client
*
client
=
(
Client
*
)
parent
;
...
...
@@ -225,7 +256,7 @@ class TransObj {
srv
->
SetSrvContinueSend
(
cfp
);
}
void
RestartSrv
()
{
srv
->
Restart
();
}
void
cliStop
()
{
void
StopCli
()
{
///////
cli
->
Stop
();
}
...
...
@@ -329,32 +360,35 @@ TEST_F(TransEnv, cliPersistHandle) {
//////////////////
}
TEST_F
(
TransEnv
,
cli
ReleaseHandle
)
{
TEST_F
(
TransEnv
,
srv
ReleaseHandle
)
{
SRpcMsg
resp
=
{
0
};
for
(
int
i
=
0
;
i
<
10
;
i
++
)
{
tr
->
SetSrvContinueSend
(
processReleaseHandleCb
);
// tr->Restart(processReleaseHandleCb);
void
*
handle
=
NULL
;
for
(
int
i
=
0
;
i
<
1
;
i
++
)
{
SRpcMsg
req
=
{.
handle
=
resp
.
handle
,
.
persistHandle
=
1
};
req
.
msgType
=
1
;
req
.
pCont
=
rpcMallocCont
(
10
);
req
.
contLen
=
10
;
tr
->
cliSendAndRecvNoHandle
(
&
req
,
&
resp
);
tr
->
cliSendAndRecv
(
&
req
,
&
resp
);
// tr->cliSendAndRecvNoHandle(&req, &resp);
EXPECT_TRUE
(
resp
.
code
==
0
);
//}
}
//////////////////
}
TEST_F
(
TransEnv
,
cliReleaseHandleExcept
)
{
SRpcMsg
resp
=
{
0
};
for
(
int
i
=
0
;
i
<
10
;
i
++
)
{
for
(
int
i
=
0
;
i
<
3
;
i
++
)
{
SRpcMsg
req
=
{.
handle
=
resp
.
handle
,
.
persistHandle
=
1
};
req
.
msgType
=
1
;
req
.
pCont
=
rpcMallocCont
(
10
);
req
.
contLen
=
10
;
tr
->
cliSendAndRecv
NoHandle
(
&
req
,
&
resp
);
if
(
i
==
5
)
{
tr
->
cliSendAndRecv
(
&
req
,
&
resp
);
if
(
i
==
1
)
{
std
::
cout
<<
"stop server"
<<
std
::
endl
;
tr
->
StopSrv
();
}
if
(
i
>
=
6
)
{
if
(
i
>
1
)
{
EXPECT_TRUE
(
resp
.
code
!=
0
);
}
}
...
...
@@ -383,7 +417,7 @@ TEST_F(TransEnv, srvPersistHandleExcept) {
req
.
contLen
=
10
;
tr
->
cliSendAndRecv
(
&
req
,
&
resp
);
if
(
i
>
2
)
{
tr
->
cliStop
();
tr
->
StopCli
();
break
;
}
}
...
...
@@ -413,7 +447,23 @@ TEST_F(TransEnv, cliPersistHandleExcept) {
TEST_F
(
TransEnv
,
multiCliPersistHandleExcept
)
{
// conn broken
}
TEST_F
(
TransEnv
,
queryExcept
)
{}
TEST_F
(
TransEnv
,
queryExcept
)
{
tr
->
SetSrvContinueSend
(
processRegisterFailure
);
SRpcMsg
resp
=
{
0
};
for
(
int
i
=
0
;
i
<
5
;
i
++
)
{
SRpcMsg
req
=
{.
handle
=
resp
.
handle
,
.
persistHandle
=
1
};
req
.
msgType
=
1
;
req
.
pCont
=
rpcMallocCont
(
10
);
req
.
contLen
=
10
;
tr
->
cliSendAndRecv
(
&
req
,
&
resp
);
if
(
i
==
2
)
{
rpcReleaseHandle
(
resp
.
handle
,
TAOS_CONN_CLIENT
);
tr
->
StopCli
();
break
;
}
}
taosMsleep
(
4
*
1000
);
}
TEST_F
(
TransEnv
,
noResp
)
{
SRpcMsg
resp
=
{
0
};
for
(
int
i
=
0
;
i
<
5
;
i
++
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录