Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
0d904d5a
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
0d904d5a
编写于
6月 23, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: handle except
上级
d5f5c33c
变更
3
显示空白变更内容
内联
并排
Showing
3 changed file
with
173 addition
and
178 deletion
+173
-178
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+16
-15
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+121
-127
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+36
-36
未找到文件。
source/client/src/clientEnv.c
浏览文件 @
0d904d5a
...
...
@@ -13,11 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "catalog.h"
#include "functionMgt.h"
#include "clientInt.h"
#include "clientLog.h"
#include "functionMgt.h"
#include "os.h"
#include "query.h"
#include "scheduler.h"
#include "tcache.h"
...
...
@@ -38,7 +38,7 @@ static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
volatile
int32_t
tscInitRes
=
0
;
static
void
registerRequest
(
SRequestObj
*
pRequest
)
{
STscObj
*
pTscObj
=
acquireTscObj
(
*
(
int64_t
*
)
pRequest
->
pTscObj
->
id
);
STscObj
*
pTscObj
=
acquireTscObj
(
*
(
int64_t
*
)
pRequest
->
pTscObj
->
id
);
assert
(
pTscObj
!=
NULL
);
...
...
@@ -54,14 +54,14 @@ static void registerRequest(SRequestObj *pRequest) {
int32_t
currentInst
=
atomic_add_fetch_64
((
int64_t
*
)
&
pSummary
->
currentRequests
,
1
);
tscDebug
(
"0x%"
PRIx64
" new Request from connObj:0x%"
PRIx64
", current:%d, app current:%d, total:%d, reqId:0x%"
PRIx64
,
pRequest
->
self
,
*
(
int64_t
*
)
pRequest
->
pTscObj
->
id
,
num
,
currentInst
,
total
,
pRequest
->
requestId
);
pRequest
->
self
,
*
(
int64_t
*
)
pRequest
->
pTscObj
->
id
,
num
,
currentInst
,
total
,
pRequest
->
requestId
);
}
}
static
void
deregisterRequest
(
SRequestObj
*
pRequest
)
{
assert
(
pRequest
!=
NULL
);
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
SAppClusterSummary
*
pActivity
=
&
pTscObj
->
pAppInfo
->
summary
;
int32_t
currentInst
=
atomic_sub_fetch_64
((
int64_t
*
)
&
pActivity
->
currentRequests
,
1
);
...
...
@@ -70,8 +70,8 @@ static void deregisterRequest(SRequestObj *pRequest) {
int64_t
duration
=
taosGetTimestampUs
()
-
pRequest
->
metric
.
start
;
tscDebug
(
"0x%"
PRIx64
" free Request from connObj: 0x%"
PRIx64
", reqId:0x%"
PRIx64
" elapsed:%"
PRIu64
" ms, current:%d, app current:%d"
,
pRequest
->
self
,
*
(
int64_t
*
)
pTscObj
->
id
,
pRequest
->
requestId
,
duration
/
1000
,
num
,
currentInst
);
releaseTscObj
(
*
(
int64_t
*
)
pTscObj
->
id
);
pRequest
->
self
,
*
(
int64_t
*
)
pTscObj
->
id
,
pRequest
->
requestId
,
duration
/
1000
,
num
,
currentInst
);
releaseTscObj
(
*
(
int64_t
*
)
pTscObj
->
id
);
}
// todo close the transporter properly
...
...
@@ -80,7 +80,7 @@ void closeTransporter(STscObj *pTscObj) {
return
;
}
tscDebug
(
"free transporter:%p in connObj: 0x%"
PRIx64
,
pTscObj
->
pAppInfo
->
pTransporter
,
*
(
int64_t
*
)
pTscObj
->
id
);
tscDebug
(
"free transporter:%p in connObj: 0x%"
PRIx64
,
pTscObj
->
pAppInfo
->
pTransporter
,
*
(
int64_t
*
)
pTscObj
->
id
);
rpcClose
(
pTscObj
->
pAppInfo
->
pTransporter
);
}
...
...
@@ -128,16 +128,17 @@ void closeAllRequests(SHashObj *pRequests) {
void
destroyTscObj
(
void
*
pObj
)
{
STscObj
*
pTscObj
=
pObj
;
SClientHbKey
connKey
=
{.
tscRid
=
*
(
int64_t
*
)
pTscObj
->
id
,
.
connType
=
pTscObj
->
connType
};
SClientHbKey
connKey
=
{.
tscRid
=
*
(
int64_t
*
)
pTscObj
->
id
,
.
connType
=
pTscObj
->
connType
};
hbDeregisterConn
(
pTscObj
->
pAppInfo
->
pAppHbMgr
,
connKey
);
int64_t
connNum
=
atomic_sub_fetch_64
(
&
pTscObj
->
pAppInfo
->
numOfConns
,
1
);
closeAllRequests
(
pTscObj
->
pRequests
);
schedulerStopQueryHb
(
pTscObj
->
pAppInfo
->
pTransporter
);
if
(
0
==
connNum
)
{
// TODO
//
closeTransporter(pTscObj);
closeTransporter
(
pTscObj
);
}
tscDebug
(
"connObj 0x%"
PRIx64
" destroyed, totalConn:%"
PRId64
,
*
(
int64_t
*
)
pTscObj
->
id
,
pTscObj
->
pAppInfo
->
numOfConns
);
tscDebug
(
"connObj 0x%"
PRIx64
" destroyed, totalConn:%"
PRId64
,
*
(
int64_t
*
)
pTscObj
->
id
,
pTscObj
->
pAppInfo
->
numOfConns
);
taosThreadMutexDestroy
(
&
pTscObj
->
mutex
);
taosMemoryFreeClear
(
pTscObj
);
}
...
...
@@ -167,10 +168,10 @@ void *createTscObj(const char *user, const char *auth, const char *db, int32_t c
taosThreadMutexInit
(
&
pObj
->
mutex
,
NULL
);
pObj
->
id
=
taosMemoryMalloc
(
sizeof
(
int64_t
));
*
(
int64_t
*
)
pObj
->
id
=
taosAddRef
(
clientConnRefPool
,
pObj
);
*
(
int64_t
*
)
pObj
->
id
=
taosAddRef
(
clientConnRefPool
,
pObj
);
pObj
->
schemalessType
=
1
;
tscDebug
(
"connObj created, 0x%"
PRIx64
,
*
(
int64_t
*
)
pObj
->
id
);
tscDebug
(
"connObj created, 0x%"
PRIx64
,
*
(
int64_t
*
)
pObj
->
id
);
return
pObj
;
}
...
...
@@ -325,7 +326,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
return
0
;
}
SConfig
*
pCfg
=
taosGetCfg
();
SConfig
*
pCfg
=
taosGetCfg
();
SConfigItem
*
pItem
=
NULL
;
switch
(
option
)
{
...
...
source/libs/transport/src/transCli.c
浏览文件 @
0d904d5a
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
/** Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
...
...
@@ -54,7 +54,7 @@ typedef struct SCliMsg {
int
sent
;
//(0: no send, 1: alread sent)
}
SCliMsg
;
typedef
struct
SCliThrd
Obj
{
typedef
struct
SCliThrd
{
TdThread
thread
;
// tid
int64_t
pid
;
// pid
uv_loop_t
*
loop
;
...
...
@@ -72,13 +72,13 @@ typedef struct SCliThrdObj {
SCvtAddr
cvtAddr
;
bool
quit
;
}
SCliThrd
Obj
;
}
SCliThrd
;
typedef
struct
SCliObj
{
char
label
[
TSDB_LABEL_LEN
];
int32_t
index
;
int
numOfThreads
;
SCliThrd
Obj
**
pThreadObj
;
SCliThrd
**
pThreadObj
;
}
SCliObj
;
typedef
struct
SConnList
{
...
...
@@ -106,7 +106,7 @@ static void cliAsyncCb(uv_async_t* handle);
static
int
cliAppCb
(
SCliConn
*
pConn
,
STransMsg
*
pResp
,
SCliMsg
*
pMsg
);
static
SCliConn
*
cliCreateConn
(
SCliThrd
Obj
*
thrd
);
static
SCliConn
*
cliCreateConn
(
SCliThrd
*
thrd
);
static
void
cliDestroyConn
(
SCliConn
*
pConn
,
bool
clear
/*clear tcp handle or not*/
);
static
void
cliDestroy
(
uv_handle_t
*
handle
);
static
void
cliSend
(
SCliConn
*
pConn
);
...
...
@@ -122,14 +122,14 @@ static void cliHandleResp(SCliConn* conn);
static
void
cliHandleExcept
(
SCliConn
*
conn
);
// handle req from app
static
void
cliHandleReq
(
SCliMsg
*
pMsg
,
SCliThrd
Obj
*
pThrd
);
static
void
cliHandleQuit
(
SCliMsg
*
pMsg
,
SCliThrd
Obj
*
pThrd
);
static
void
cliHandleRelease
(
SCliMsg
*
pMsg
,
SCliThrd
Obj
*
pThrd
);
static
void
cliHandleUpdate
(
SCliMsg
*
pMsg
,
SCliThrd
Obj
*
pThrd
);
static
void
(
*
cliAsyncHandle
[])(
SCliMsg
*
pMsg
,
SCliThrd
Obj
*
pThrd
)
=
{
cliHandleReq
,
cliHandleQuit
,
cliHandleRelease
,
NULL
,
cliHandleUpdate
};
static
void
cliSendQuit
(
SCliThrd
Obj
*
thrd
);
static
void
cliHandleReq
(
SCliMsg
*
pMsg
,
SCliThrd
*
pThrd
);
static
void
cliHandleQuit
(
SCliMsg
*
pMsg
,
SCliThrd
*
pThrd
);
static
void
cliHandleRelease
(
SCliMsg
*
pMsg
,
SCliThrd
*
pThrd
);
static
void
cliHandleUpdate
(
SCliMsg
*
pMsg
,
SCliThrd
*
pThrd
);
static
void
(
*
cliAsyncHandle
[])(
SCliMsg
*
pMsg
,
SCliThrd
*
pThrd
)
=
{
cliHandleReq
,
cliHandleQuit
,
cliHandleRelease
,
NULL
,
cliHandleUpdate
};
static
void
cliSendQuit
(
SCliThrd
*
thrd
);
static
void
destroyUserdata
(
STransMsg
*
userdata
);
static
int
cliRBChoseIdx
(
STrans
*
pTransInst
);
...
...
@@ -137,8 +137,8 @@ static int cliRBChoseIdx(STrans* pTransInst);
static
void
destroyCmsg
(
SCliMsg
*
cmsg
);
static
void
transDestroyConnCtx
(
STransConnCtx
*
ctx
);
// thread obj
static
SCliThrd
Obj
*
createThrdObj
();
static
void
destroyThrdObj
(
SCliThrdObj
*
pThrd
);
static
SCliThrd
*
createThrdObj
();
static
void
destroyThrdObj
(
SCliThrd
*
pThrd
);
static
void
cliWalkCb
(
uv_handle_t
*
handle
,
void
*
arg
);
...
...
@@ -174,12 +174,12 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
idx = -1; \
} else { \
ASYNC_CHECK_HANDLE((exh), refId); \
pThrd = (SCliThrd
Obj*)(exh)->pThrd;
\
pThrd = (SCliThrd
*)(exh)->pThrd;
\
} \
} while (0)
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd
Obj
*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
...
...
@@ -195,7 +195,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
destroyCmsg(pMsg); \
cliReleaseUnfinishedMsg(conn); \
if (status != ConnInPool) { \
addConnToPool(((SCliThrd
Obj*)conn->hostThrd)->pool, conn);
\
addConnToPool(((SCliThrd
*)conn->hostThrd)->pool, conn);
\
} \
transRemoveExHandle(refMgt, conn->refId); \
return; \
...
...
@@ -279,7 +279,7 @@ _RETURN:
return
false
;
}
void
cliHandleResp
(
SCliConn
*
conn
)
{
SCliThrd
Obj
*
pThrd
=
conn
->
hostThrd
;
SCliThrd
*
pThrd
=
conn
->
hostThrd
;
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
STransMsgHead
*
pHead
=
(
STransMsgHead
*
)(
conn
->
readBuf
.
buf
);
...
...
@@ -379,7 +379,7 @@ void cliHandleExcept(SCliConn* pConn) {
return
;
}
}
SCliThrd
Obj
*
pThrd
=
pConn
->
hostThrd
;
SCliThrd
*
pThrd
=
pConn
->
hostThrd
;
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
bool
once
=
false
;
do
{
...
...
@@ -424,7 +424,7 @@ void cliHandleExcept(SCliConn* pConn) {
}
void
cliTimeoutCb
(
uv_timer_t
*
handle
)
{
SCliThrd
Obj
*
pThrd
=
handle
->
data
;
SCliThrd
*
pThrd
=
handle
->
data
;
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
int64_t
currentTime
=
pThrd
->
nextTimeout
;
tTrace
(
"%s conn timeout, try to remove expire conn from conn pool"
,
pTransInst
->
label
);
...
...
@@ -501,7 +501,7 @@ static void allocConnRef(SCliConn* conn, bool update) {
conn
->
refId
=
exh
->
refId
;
}
static
void
addConnToPool
(
void
*
pool
,
SCliConn
*
conn
)
{
SCliThrd
Obj
*
thrd
=
conn
->
hostThrd
;
SCliThrd
*
thrd
=
conn
->
hostThrd
;
CONN_HANDLE_THREAD_QUIT
(
thrd
);
allocConnRef
(
conn
,
true
);
...
...
@@ -562,7 +562,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
}
}
static
SCliConn
*
cliCreateConn
(
SCliThrd
Obj
*
pThrd
)
{
static
SCliConn
*
cliCreateConn
(
SCliThrd
*
pThrd
)
{
SCliConn
*
conn
=
taosMemoryCalloc
(
1
,
sizeof
(
SCliConn
));
// read/write stream handle
conn
->
stream
=
(
uv_stream_t
*
)
taosMemoryMalloc
(
sizeof
(
uv_tcp_t
));
...
...
@@ -615,7 +615,7 @@ static bool cliHandleNoResp(SCliConn* conn) {
}
if
(
res
==
true
)
{
if
(
cliMaySendCachedMsg
(
conn
)
==
false
)
{
SCliThrd
Obj
*
thrd
=
conn
->
hostThrd
;
SCliThrd
*
thrd
=
conn
->
hostThrd
;
addConnToPool
(
thrd
->
pool
,
conn
);
}
}
...
...
@@ -651,7 +651,7 @@ void cliSend(SCliConn* pConn) {
STransConnCtx
*
pCtx
=
pCliMsg
->
ctx
;
SCliThrd
Obj
*
pThrd
=
pConn
->
hostThrd
;
SCliThrd
*
pThrd
=
pConn
->
hostThrd
;
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
STransMsg
*
pMsg
=
(
STransMsg
*
)(
&
pCliMsg
->
msg
);
...
...
@@ -709,7 +709,7 @@ void cliConnCb(uv_connect_t* req, int status) {
cliSend
(
pConn
);
}
static
void
cliHandleQuit
(
SCliMsg
*
pMsg
,
SCliThrd
Obj
*
pThrd
)
{
static
void
cliHandleQuit
(
SCliMsg
*
pMsg
,
SCliThrd
*
pThrd
)
{
tDebug
(
"cli work thread %p start to quit"
,
pThrd
);
destroyCmsg
(
pMsg
);
destroyConnPool
(
pThrd
->
pool
);
...
...
@@ -720,7 +720,7 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
// uv_stop(pThrd->loop);
}
static
void
cliHandleRelease
(
SCliMsg
*
pMsg
,
SCliThrd
Obj
*
pThrd
)
{
static
void
cliHandleRelease
(
SCliMsg
*
pMsg
,
SCliThrd
*
pThrd
)
{
SCliConn
*
conn
=
pMsg
->
msg
.
info
.
handle
;
tDebug
(
"%s conn %p start to release to inst"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
...
...
@@ -735,39 +735,30 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
transUnrefCliHandle
(
conn
);
}
}
static
void
cliHandleUpdate
(
SCliMsg
*
pMsg
,
SCliThrd
Obj
*
pThrd
)
{
static
void
cliHandleUpdate
(
SCliMsg
*
pMsg
,
SCliThrd
*
pThrd
)
{
STransConnCtx
*
pCtx
=
pMsg
->
ctx
;
pThrd
->
cvtAddr
=
pCtx
->
cvtAddr
;
destroyCmsg
(
pMsg
);
}
SCliConn
*
cliGetConn
(
SCliMsg
*
pMsg
,
SCliThrd
Obj
*
pThrd
)
{
SCliConn
*
cliGetConn
(
SCliMsg
*
pMsg
,
SCliThrd
*
pThrd
)
{
SCliConn
*
conn
=
NULL
;
// SExHandleWrap* exWrap = &pMsg->msg.info.handle;
// if (exWrap != NULL) {
//}
// SExHandle* exh = transAcquireExHandle(refMgt, exWrap->refId);
// if (exh == NULL) {
// if (pInfo->refId != 0) {
// tTrace("%s conn %p ignore msg", CONN_GET_INST_LABEL(conn), conn);
// assert(0);
// return NULL;
// }
//} else {
// transReleaseExHandle(refMgt, pInfo->refId);
// return exh->handle;
//}
int64_t
refId
=
(
int64_t
)(
pMsg
->
msg
.
info
.
handle
);
if
(
refId
!=
0
)
{
SExHandle
*
exh
=
transAcquireExHandle
(
refMgt
,
refId
);
if
(
exh
==
NULL
)
{
assert
(
0
);
}
else
{
conn
=
exh
->
handle
;
transReleaseExHandle
(
refMgt
,
refId
);
}
return
conn
;
};
STransConnCtx
*
pCtx
=
pMsg
->
ctx
;
conn
=
getConnFromPool
(
pThrd
->
pool
,
EPSET_GET_INUSE_IP
(
&
pCtx
->
epSet
),
EPSET_GET_INUSE_PORT
(
&
pCtx
->
epSet
));
if
(
conn
!=
NULL
)
{
SExHandle
*
exh
=
taosMemoryCalloc
(
1
,
sizeof
(
SExHandle
));
exh
->
handle
=
conn
;
exh
->
pThrd
=
pThrd
;
exh
->
refId
=
transAddExHandle
(
refMgt
,
exh
);
conn
->
refId
=
exh
->
refId
;
tTrace
(
"%s conn %p get from conn pool"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
}
else
{
tTrace
(
"%s not found conn in conn pool %p"
,
((
STrans
*
)
pThrd
->
pTransInst
)
->
label
,
pThrd
->
pool
);
...
...
@@ -785,7 +776,7 @@ void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
}
}
}
void
cliHandleReq
(
SCliMsg
*
pMsg
,
SCliThrd
Obj
*
pThrd
)
{
void
cliHandleReq
(
SCliMsg
*
pMsg
,
SCliThrd
*
pThrd
)
{
STransConnCtx
*
pCtx
=
pMsg
->
ctx
;
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
...
...
@@ -834,7 +825,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
}
static
void
cliAsyncCb
(
uv_async_t
*
handle
)
{
SAsyncItem
*
item
=
handle
->
data
;
SCliThrd
Obj
*
pThrd
=
item
->
pThrd
;
SCliThrd
*
pThrd
=
item
->
pThrd
;
SCliMsg
*
pMsg
=
NULL
;
// batch process to avoid to lock/unlock frequently
...
...
@@ -861,7 +852,7 @@ static void cliAsyncCb(uv_async_t* handle) {
}
static
void
*
cliWorkThread
(
void
*
arg
)
{
SCliThrd
Obj
*
pThrd
=
(
SCliThrdObj
*
)
arg
;
SCliThrd
*
pThrd
=
(
SCliThrd
*
)
arg
;
pThrd
->
pid
=
taosGetSelfPthreadId
();
setThreadName
(
"trans-cli-work"
);
uv_run
(
pThrd
->
loop
,
UV_RUN_DEFAULT
);
...
...
@@ -874,10 +865,10 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
STrans
*
pTransInst
=
shandle
;
memcpy
(
cli
->
label
,
label
,
strlen
(
label
));
cli
->
numOfThreads
=
numOfThreads
;
cli
->
pThreadObj
=
(
SCliThrd
Obj
**
)
taosMemoryCalloc
(
cli
->
numOfThreads
,
sizeof
(
SCliThrdObj
*
));
cli
->
pThreadObj
=
(
SCliThrd
**
)
taosMemoryCalloc
(
cli
->
numOfThreads
,
sizeof
(
SCliThrd
*
));
for
(
int
i
=
0
;
i
<
cli
->
numOfThreads
;
i
++
)
{
SCliThrd
Obj
*
pThrd
=
createThrdObj
();
SCliThrd
*
pThrd
=
createThrdObj
();
pThrd
->
nextTimeout
=
taosGetTimestampMs
()
+
CONN_PERSIST_TIME
(
pTransInst
->
idleTime
);
pThrd
->
pTransInst
=
shandle
;
...
...
@@ -911,8 +902,8 @@ static void destroyCmsg(SCliMsg* pMsg) {
taosMemoryFree
(
pMsg
);
}
static
SCliThrd
Obj
*
createThrdObj
()
{
SCliThrd
Obj
*
pThrd
=
(
SCliThrdObj
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SCliThrdObj
));
static
SCliThrd
*
createThrdObj
()
{
SCliThrd
*
pThrd
=
(
SCliThrd
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SCliThrd
));
QUEUE_INIT
(
&
pThrd
->
msg
);
taosThreadMutexInit
(
&
pThrd
->
msgMtx
,
NULL
);
...
...
@@ -930,7 +921,7 @@ static SCliThrdObj* createThrdObj() {
pThrd
->
quit
=
false
;
return
pThrd
;
}
static
void
destroyThrdObj
(
SCliThrd
Obj
*
pThrd
)
{
static
void
destroyThrdObj
(
SCliThrd
*
pThrd
)
{
if
(
pThrd
==
NULL
)
{
return
;
}
...
...
@@ -951,7 +942,7 @@ static void transDestroyConnCtx(STransConnCtx* ctx) {
taosMemoryFree
(
ctx
);
}
void
cliSendQuit
(
SCliThrd
Obj
*
thrd
)
{
void
cliSendQuit
(
SCliThrd
*
thrd
)
{
// cli can stop gracefully
SCliMsg
*
msg
=
taosMemoryCalloc
(
1
,
sizeof
(
SCliMsg
));
msg
->
type
=
Quit
;
...
...
@@ -974,13 +965,14 @@ static void doDelayTask(void* param) {
STaskArg
*
arg
=
param
;
SCliMsg
*
pMsg
=
arg
->
param1
;
SCliThrd
Obj
*
pThrd
=
arg
->
param2
;
SCliThrd
*
pThrd
=
arg
->
param2
;
cliHandleReq
(
pMsg
,
pThrd
);
taosMemoryFree
(
arg
);
}
int
cliAppCb
(
SCliConn
*
pConn
,
STransMsg
*
pResp
,
SCliMsg
*
pMsg
)
{
SCliThrd
Obj
*
pThrd
=
pConn
->
hostThrd
;
SCliThrd
*
pThrd
=
pConn
->
hostThrd
;
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
if
(
pMsg
==
NULL
||
pMsg
->
ctx
==
NULL
)
{
...
...
@@ -995,9 +987,11 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if
(
pCtx
->
retryCount
==
0
)
{
pCtx
->
origEpSet
=
pCtx
->
epSet
;
}
/*
* upper layer handle retry if code equal TSDB_CODE_RPC_NETWORK_UNAVAIL
*/
if
(
CONN_NO_PERSIST_BY_APP
(
pConn
))
{
tmsg_t
msgType
=
pCtx
->
msgType
;
if
((
pTransInst
->
retry
!=
NULL
&&
pEpSet
->
numOfEps
>
1
&&
(
pTransInst
->
retry
(
pResp
->
code
)))
||
(
pResp
->
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
||
pResp
->
code
==
TSDB_CODE_APP_NOT_READY
||
...
...
@@ -1048,6 +1042,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
return
-
1
;
}
}
}
STraceId
*
trace
=
&
pResp
->
info
.
traceId
;
if
(
pCtx
->
pSem
!=
NULL
)
{
...
...
@@ -1101,8 +1096,8 @@ void transUnrefCliHandle(void* handle) {
cliDestroyConn
((
SCliConn
*
)
handle
,
true
);
}
}
SCliThrd
Obj
*
transGetWorkThrdFromHandle
(
int64_t
handle
)
{
SCliThrd
Obj
*
pThrd
=
NULL
;
SCliThrd
*
transGetWorkThrdFromHandle
(
int64_t
handle
)
{
SCliThrd
*
pThrd
=
NULL
;
SExHandle
*
exh
=
transAcquireExHandle
(
refMgt
,
handle
);
if
(
exh
==
NULL
)
{
return
NULL
;
...
...
@@ -1111,17 +1106,16 @@ SCliThrdObj* transGetWorkThrdFromHandle(int64_t handle) {
transReleaseExHandle
(
refMgt
,
handle
);
return
pThrd
;
}
SCliThrdObj
*
transGetWorkThrd
(
STrans
*
trans
,
int64_t
handle
)
{
int
idx
=
-
1
;
SCliThrd
*
transGetWorkThrd
(
STrans
*
trans
,
int64_t
handle
)
{
if
(
handle
==
0
)
{
idx
=
cliRBChoseIdx
(
trans
);
i
nt
i
dx
=
cliRBChoseIdx
(
trans
);
return
((
SCliObj
*
)
trans
->
tcphandle
)
->
pThreadObj
[
idx
];
}
return
transGetWorkThrdFromHandle
(
handle
);
}
void
transReleaseCliHandle
(
void
*
handle
)
{
int
idx
=
-
1
;
SCliThrd
Obj
*
pThrd
=
transGetWorkThrdFromHandle
((
int64_t
)
handle
);
SCliThrd
*
pThrd
=
transGetWorkThrdFromHandle
((
int64_t
)
handle
);
if
(
pThrd
==
NULL
)
{
return
;
}
...
...
@@ -1137,7 +1131,7 @@ void transReleaseCliHandle(void* handle) {
void
transSendRequest
(
void
*
shandle
,
const
SEpSet
*
pEpSet
,
STransMsg
*
pReq
,
STransCtx
*
ctx
)
{
STrans
*
pTransInst
=
(
STrans
*
)
shandle
;
SCliThrd
Obj
*
pThrd
=
transGetWorkThrd
(
pTransInst
,
(
int64_t
)
pReq
->
info
.
handle
);
SCliThrd
*
pThrd
=
transGetWorkThrd
(
pTransInst
,
(
int64_t
)
pReq
->
info
.
handle
);
if
(
pThrd
==
NULL
)
{
transFreeMsg
(
pReq
->
pCont
);
return
;
...
...
@@ -1170,7 +1164,7 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
void
transSendRecv
(
void
*
shandle
,
const
SEpSet
*
pEpSet
,
STransMsg
*
pReq
,
STransMsg
*
pRsp
)
{
STrans
*
pTransInst
=
(
STrans
*
)
shandle
;
SCliThrd
Obj
*
pThrd
=
transGetWorkThrd
(
pTransInst
,
(
int64_t
)
pReq
->
info
.
handle
);
SCliThrd
*
pThrd
=
transGetWorkThrd
(
pTransInst
,
(
int64_t
)
pReq
->
info
.
handle
);
if
(
pThrd
==
NULL
)
{
transFreeMsg
(
pReq
->
pCont
);
return
;
...
...
@@ -1224,7 +1218,7 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
cliMsg
->
ctx
=
pCtx
;
cliMsg
->
type
=
Update
;
SCliThrd
Obj
*
thrd
=
((
SCliObj
*
)
pTransInst
->
tcphandle
)
->
pThreadObj
[
i
];
SCliThrd
*
thrd
=
((
SCliObj
*
)
pTransInst
->
tcphandle
)
->
pThreadObj
[
i
];
tDebug
(
"%s update epset at thread:%08"
PRId64
""
,
pTransInst
->
label
,
thrd
->
pid
);
transSendAsync
(
thrd
->
asyncPool
,
&
(
cliMsg
->
q
));
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
0d904d5a
...
...
@@ -65,7 +65,7 @@ typedef struct SSvrMsg {
STransMsgType
type
;
}
SSvrMsg
;
typedef
struct
SWorkThrd
Obj
{
typedef
struct
SWorkThrd
{
TdThread
thread
;
uv_connect_t
connect_req
;
uv_pipe_t
*
pipe
;
...
...
@@ -78,7 +78,7 @@ typedef struct SWorkThrdObj {
queue
conn
;
void
*
pTransInst
;
bool
quit
;
}
SWorkThrd
Obj
;
}
SWorkThrd
;
typedef
struct
SServerObj
{
TdThread
thread
;
...
...
@@ -89,7 +89,7 @@ typedef struct SServerObj {
int
workerIdx
;
int
numOfThreads
;
int
numOfWorkerReady
;
SWorkThrd
Obj
**
pThreadObj
;
SWorkThrd
**
pThreadObj
;
uv_pipe_t
pipeListen
;
uv_pipe_t
**
pipe
;
...
...
@@ -135,11 +135,11 @@ static void destroyConnRegArg(SSvrConn* conn);
static
int
reallocConnRef
(
SSvrConn
*
conn
);
static
void
uvHandleQuit
(
SSvrMsg
*
msg
,
SWorkThrd
Obj
*
thrd
);
static
void
uvHandleRelease
(
SSvrMsg
*
msg
,
SWorkThrd
Obj
*
thrd
);
static
void
uvHandleResp
(
SSvrMsg
*
msg
,
SWorkThrd
Obj
*
thrd
);
static
void
uvHandleRegister
(
SSvrMsg
*
msg
,
SWorkThrd
Obj
*
thrd
);
static
void
(
*
transAsyncHandle
[])(
SSvrMsg
*
msg
,
SWorkThrd
Obj
*
thrd
)
=
{
uvHandleResp
,
uvHandleQuit
,
uvHandleRelease
,
static
void
uvHandleQuit
(
SSvrMsg
*
msg
,
SWorkThrd
*
thrd
);
static
void
uvHandleRelease
(
SSvrMsg
*
msg
,
SWorkThrd
*
thrd
);
static
void
uvHandleResp
(
SSvrMsg
*
msg
,
SWorkThrd
*
thrd
);
static
void
uvHandleRegister
(
SSvrMsg
*
msg
,
SWorkThrd
*
thrd
);
static
void
(
*
transAsyncHandle
[])(
SSvrMsg
*
msg
,
SWorkThrd
*
thrd
)
=
{
uvHandleResp
,
uvHandleQuit
,
uvHandleRelease
,
uvHandleRegister
,
NULL
};
static
int32_t
exHandlesMgt
;
...
...
@@ -160,7 +160,7 @@ static void* transWorkerThread(void* arg);
static
void
*
transAcceptThread
(
void
*
arg
);
// add handle loop
static
bool
addHandleToWorkloop
(
SWorkThrd
Obj
*
pThrd
,
char
*
pipeName
);
static
bool
addHandleToWorkloop
(
SWorkThrd
*
pThrd
,
char
*
pipeName
);
static
bool
addHandleToAcceptloop
(
void
*
arg
);
#define CONN_SHOULD_RELEASE(conn, head) \
...
...
@@ -233,7 +233,7 @@ static void uvHandleReq(SSvrConn* pConn) {
// wreq->data = pConn;
// uv_read_stop((uv_stream_t*)pConn->pTcp);
// transRefSrvHandle(pConn);
// uv_queue_work(((SWorkThrd
Obj
*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
// uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
CONN_SHOULD_RELEASE
(
pConn
,
pHead
);
...
...
@@ -478,7 +478,7 @@ static void destroySmsg(SSvrMsg* smsg) {
transFreeMsg
(
smsg
->
msg
.
pCont
);
taosMemoryFree
(
smsg
);
}
static
void
destroyAllConn
(
SWorkThrd
Obj
*
pThrd
)
{
static
void
destroyAllConn
(
SWorkThrd
*
pThrd
)
{
tTrace
(
"thread %p destroy all conn "
,
pThrd
);
while
(
!
QUEUE_IS_EMPTY
(
&
pThrd
->
conn
))
{
queue
*
h
=
QUEUE_HEAD
(
&
pThrd
->
conn
);
...
...
@@ -494,7 +494,7 @@ static void destroyAllConn(SWorkThrdObj* pThrd) {
}
void
uvWorkerAsyncCb
(
uv_async_t
*
handle
)
{
SAsyncItem
*
item
=
handle
->
data
;
SWorkThrd
Obj
*
pThrd
=
item
->
pThrd
;
SWorkThrd
*
pThrd
=
item
->
pThrd
;
SSvrConn
*
conn
=
NULL
;
queue
wq
;
...
...
@@ -624,7 +624,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
assert
(
buf
->
base
[
0
]
==
notify
[
0
]);
taosMemoryFree
(
buf
->
base
);
SWorkThrd
Obj
*
pThrd
=
q
->
data
;
SWorkThrd
*
pThrd
=
q
->
data
;
uv_pipe_t
*
pipe
=
(
uv_pipe_t
*
)
q
;
if
(
!
uv_pipe_pending_count
(
pipe
))
{
...
...
@@ -692,10 +692,10 @@ void uvOnPipeConnectionCb(uv_connect_t* connect, int status) {
if
(
status
!=
0
)
{
return
;
}
SWorkThrd
Obj
*
pThrd
=
container_of
(
connect
,
SWorkThrdObj
,
connect_req
);
SWorkThrd
*
pThrd
=
container_of
(
connect
,
SWorkThrd
,
connect_req
);
uv_read_start
((
uv_stream_t
*
)
pThrd
->
pipe
,
uvAllocConnBufferCb
,
uvOnConnectionCb
);
}
static
bool
addHandleToWorkloop
(
SWorkThrd
Obj
*
pThrd
,
char
*
pipeName
)
{
static
bool
addHandleToWorkloop
(
SWorkThrd
*
pThrd
,
char
*
pipeName
)
{
pThrd
->
loop
=
(
uv_loop_t
*
)
taosMemoryMalloc
(
sizeof
(
uv_loop_t
));
if
(
0
!=
uv_loop_init
(
pThrd
->
loop
))
{
return
false
;
...
...
@@ -748,14 +748,14 @@ static bool addHandleToAcceptloop(void* arg) {
}
void
*
transWorkerThread
(
void
*
arg
)
{
setThreadName
(
"trans-worker"
);
SWorkThrd
Obj
*
pThrd
=
(
SWorkThrdObj
*
)
arg
;
SWorkThrd
*
pThrd
=
(
SWorkThrd
*
)
arg
;
uv_run
(
pThrd
->
loop
,
UV_RUN_DEFAULT
);
return
NULL
;
}
static
SSvrConn
*
createConn
(
void
*
hThrd
)
{
SWorkThrd
Obj
*
pThrd
=
hThrd
;
SWorkThrd
*
pThrd
=
hThrd
;
SSvrConn
*
pConn
=
(
SSvrConn
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SSvrConn
));
QUEUE_INIT
(
&
pConn
->
queue
);
...
...
@@ -818,7 +818,7 @@ static void uvDestroyConn(uv_handle_t* handle) {
if
(
conn
==
NULL
)
{
return
;
}
SWorkThrd
Obj
*
thrd
=
conn
->
hostThrd
;
SWorkThrd
*
thrd
=
conn
->
hostThrd
;
transReleaseExHandle
(
refMgt
,
conn
->
refId
);
transRemoveExHandle
(
refMgt
,
conn
->
refId
);
...
...
@@ -863,7 +863,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv
->
numOfThreads
=
numOfThreads
;
srv
->
workerIdx
=
0
;
srv
->
numOfWorkerReady
=
0
;
srv
->
pThreadObj
=
(
SWorkThrd
Obj
**
)
taosMemoryCalloc
(
srv
->
numOfThreads
,
sizeof
(
SWorkThrdObj
*
));
srv
->
pThreadObj
=
(
SWorkThrd
**
)
taosMemoryCalloc
(
srv
->
numOfThreads
,
sizeof
(
SWorkThrd
*
));
srv
->
pipe
=
(
uv_pipe_t
**
)
taosMemoryCalloc
(
srv
->
numOfThreads
,
sizeof
(
uv_pipe_t
*
));
srv
->
ip
=
ip
;
srv
->
port
=
port
;
...
...
@@ -888,7 +888,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
assert
(
0
==
uv_listen
((
uv_stream_t
*
)
&
srv
->
pipeListen
,
SOMAXCONN
,
uvPipeListenCb
));
for
(
int
i
=
0
;
i
<
srv
->
numOfThreads
;
i
++
)
{
SWorkThrd
Obj
*
thrd
=
(
SWorkThrdObj
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SWorkThrdObj
));
SWorkThrd
*
thrd
=
(
SWorkThrd
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SWorkThrd
));
thrd
->
pTransInst
=
shandle
;
thrd
->
quit
=
false
;
srv
->
pThreadObj
[
i
]
=
thrd
;
...
...
@@ -933,7 +933,7 @@ End:
return
NULL
;
}
void
uvHandleQuit
(
SSvrMsg
*
msg
,
SWorkThrd
Obj
*
thrd
)
{
void
uvHandleQuit
(
SSvrMsg
*
msg
,
SWorkThrd
*
thrd
)
{
thrd
->
quit
=
true
;
if
(
QUEUE_IS_EMPTY
(
&
thrd
->
conn
))
{
uv_walk
(
thrd
->
loop
,
uvWalkCb
,
NULL
);
...
...
@@ -942,7 +942,7 @@ void uvHandleQuit(SSvrMsg* msg, SWorkThrdObj* thrd) {
}
taosMemoryFree
(
msg
);
}
void
uvHandleRelease
(
SSvrMsg
*
msg
,
SWorkThrd
Obj
*
thrd
)
{
void
uvHandleRelease
(
SSvrMsg
*
msg
,
SWorkThrd
*
thrd
)
{
SSvrConn
*
conn
=
msg
->
pConn
;
if
(
conn
->
status
==
ConnAcquire
)
{
reallocConnRef
(
conn
);
...
...
@@ -956,12 +956,12 @@ void uvHandleRelease(SSvrMsg* msg, SWorkThrdObj* thrd) {
}
destroySmsg
(
msg
);
}
void
uvHandleResp
(
SSvrMsg
*
msg
,
SWorkThrd
Obj
*
thrd
)
{
void
uvHandleResp
(
SSvrMsg
*
msg
,
SWorkThrd
*
thrd
)
{
// send msg to client
tDebug
(
"%s conn %p start to send resp (2/2)"
,
transLabel
(
thrd
->
pTransInst
),
msg
->
pConn
);
uvStartSendResp
(
msg
);
}
void
uvHandleRegister
(
SSvrMsg
*
msg
,
SWorkThrd
Obj
*
thrd
)
{
void
uvHandleRegister
(
SSvrMsg
*
msg
,
SWorkThrd
*
thrd
)
{
SSvrConn
*
conn
=
msg
->
pConn
;
tDebug
(
"%s conn %p register brokenlink callback"
,
transLabel
(
thrd
->
pTransInst
),
conn
);
if
(
conn
->
status
==
ConnAcquire
)
{
...
...
@@ -982,7 +982,7 @@ void uvHandleRegister(SSvrMsg* msg, SWorkThrdObj* thrd) {
taosMemoryFree
(
msg
);
}
}
void
destroyWorkThrd
(
SWorkThrd
Obj
*
pThrd
)
{
void
destroyWorkThrd
(
SWorkThrd
*
pThrd
)
{
if
(
pThrd
==
NULL
)
{
return
;
}
...
...
@@ -993,7 +993,7 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
taosMemoryFree
(
pThrd
->
loop
);
taosMemoryFree
(
pThrd
);
}
void
sendQuitToWorkThrd
(
SWorkThrd
Obj
*
pThrd
)
{
void
sendQuitToWorkThrd
(
SWorkThrd
*
pThrd
)
{
SSvrMsg
*
msg
=
taosMemoryCalloc
(
1
,
sizeof
(
SSvrMsg
));
msg
->
type
=
Quit
;
tDebug
(
"server send quit msg to work thread"
);
...
...
@@ -1060,7 +1060,7 @@ void transReleaseSrvHandle(void* handle) {
ASYNC_CHECK_HANDLE
(
exh
,
refId
);
SWorkThrd
Obj
*
pThrd
=
exh
->
pThrd
;
SWorkThrd
*
pThrd
=
exh
->
pThrd
;
ASYNC_ERR_JRET
(
pThrd
);
STransMsg
tmsg
=
{.
code
=
0
,
.
info
.
handle
=
exh
,
.
info
.
ahandle
=
NULL
,
.
info
.
refId
=
refId
};
...
...
@@ -1090,7 +1090,7 @@ void transSendResponse(const STransMsg* msg) {
STransMsg
tmsg
=
*
msg
;
tmsg
.
info
.
refId
=
refId
;
SWorkThrd
Obj
*
pThrd
=
exh
->
pThrd
;
SWorkThrd
*
pThrd
=
exh
->
pThrd
;
ASYNC_ERR_JRET
(
pThrd
);
SSvrMsg
*
m
=
taosMemoryCalloc
(
1
,
sizeof
(
SSvrMsg
));
...
...
@@ -1120,7 +1120,7 @@ void transRegisterMsg(const STransMsg* msg) {
STransMsg
tmsg
=
*
msg
;
tmsg
.
info
.
refId
=
refId
;
SWorkThrd
Obj
*
pThrd
=
exh
->
pThrd
;
SWorkThrd
*
pThrd
=
exh
->
pThrd
;
ASYNC_ERR_JRET
(
pThrd
);
SSvrMsg
*
m
=
taosMemoryCalloc
(
1
,
sizeof
(
SSvrMsg
));
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录