Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
bda24a01
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
bda24a01
编写于
2月 18, 2023
作者:
H
Haojun Liao
提交者:
GitHub
2月 18, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #19992 from taosdata/fix/main
enh: handle too many session
上级
085936e6
a9b6d564
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
552 addition
and
94 deletion
+552
-94
include/libs/transport/trpc.h
include/libs/transport/trpc.h
+6
-1
include/util/taoserror.h
include/util/taoserror.h
+4
-0
include/util/tdef.h
include/util/tdef.h
+3
-3
source/common/src/tglobal.c
source/common/src/tglobal.c
+7
-9
source/dnode/mgmt/node_mgmt/src/dmTransport.c
source/dnode/mgmt/node_mgmt/src/dmTransport.c
+10
-1
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+2
-2
source/libs/transport/inc/transportInt.h
source/libs/transport/inc/transportInt.h
+5
-0
source/libs/transport/src/trans.c
source/libs/transport/src/trans.c
+4
-0
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+493
-61
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+2
-6
source/libs/transport/test/cliBench.c
source/libs/transport/test/cliBench.c
+14
-10
source/util/src/terror.c
source/util/src/terror.c
+1
-0
tests/parallel_test/cases.task
tests/parallel_test/cases.task
+1
-1
未找到文件。
include/libs/transport/trpc.h
浏览文件 @
bda24a01
...
...
@@ -112,7 +112,12 @@ typedef struct SRpcInit {
// fail fast fp
RpcFFfp
ffp
;
void
*
parent
;
int32_t
connLimitNum
;
int32_t
connLimitLock
;
int8_t
supportBatch
;
// 0: no batch, 1. batch
int32_t
batchSize
;
void
*
parent
;
}
SRpcInit
;
typedef
struct
{
...
...
include/util/taoserror.h
浏览文件 @
bda24a01
...
...
@@ -67,6 +67,10 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RPC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x0019) //
#define TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED TAOS_DEF_ERROR_CODE(0, 0x0020) // "Vgroup could not be connected"
#define TSDB_CODE_RPC_SOMENODE_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0021) //
#define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x0022) //
//common & util
#define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100) //
...
...
include/util/tdef.h
浏览文件 @
bda24a01
...
...
@@ -281,8 +281,8 @@ typedef enum ELogicConditionType {
#define TSDB_DNODE_ROLE_MGMT 1
#define TSDB_DNODE_ROLE_VNODE 2
#define TSDB_MAX_REPLICA 5
#define TSDB_SYNC_LOG_BUFFER_SIZE 4096
#define TSDB_MAX_REPLICA
5
#define TSDB_SYNC_LOG_BUFFER_SIZE
4096
#define TSDB_SYNC_LOG_BUFFER_RETENTION (TSDB_SYNC_LOG_BUFFER_SIZE >> 4)
#define TSDB_TBNAME_COLUMN_INDEX (-1)
...
...
@@ -413,7 +413,7 @@ typedef enum ELogicConditionType {
#ifdef WINDOWS
#define TSDB_MAX_RPC_THREADS 4 // windows pipe only support 4 connections.
#else
#define TSDB_MAX_RPC_THREADS
2
0
#define TSDB_MAX_RPC_THREADS
1
0
#endif
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
...
...
source/common/src/tglobal.c
浏览文件 @
bda24a01
...
...
@@ -76,11 +76,11 @@ bool tsEnableTelem = true;
int32_t
tsTelemInterval
=
43200
;
char
tsTelemServer
[
TSDB_FQDN_LEN
]
=
"telemetry.taosdata.com"
;
uint16_t
tsTelemPort
=
80
;
char
*
tsTelemUri
=
"/report"
;
char
*
tsTelemUri
=
"/report"
;
bool
tsEnableCrashReport
=
true
;
char
*
tsClientCrashReportUri
=
"/ccrashreport"
;
char
*
tsSvrCrashReportUri
=
"/dcrashreport"
;
bool
tsEnableCrashReport
=
true
;
char
*
tsClientCrashReportUri
=
"/ccrashreport"
;
char
*
tsSvrCrashReportUri
=
"/dcrashreport"
;
// schemaless
char
tsSmlTagName
[
TSDB_COL_NAME_LEN
]
=
"_tag_null"
;
...
...
@@ -212,9 +212,7 @@ int32_t taosSetTfsCfg(SConfig *pCfg) {
int32_t
taosSetTfsCfg
(
SConfig
*
pCfg
);
#endif
struct
SConfig
*
taosGetCfg
()
{
return
tsCfg
;
}
struct
SConfig
*
taosGetCfg
()
{
return
tsCfg
;
}
static
int32_t
taosLoadCfg
(
SConfig
*
pCfg
,
const
char
**
envCmd
,
const
char
*
inputCfgDir
,
const
char
*
envFile
,
char
*
apolloUrl
)
{
...
...
@@ -391,7 +389,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if
(
cfgAddInt32
(
pCfg
,
"queryRspPolicy"
,
tsQueryRspPolicy
,
0
,
1
,
0
)
!=
0
)
return
-
1
;
tsNumOfRpcThreads
=
tsNumOfCores
/
2
;
tsNumOfRpcThreads
=
TRANGE
(
tsNumOfRpcThreads
,
1
,
TSDB_MAX_RPC_THREADS
);
tsNumOfRpcThreads
=
TRANGE
(
tsNumOfRpcThreads
,
2
,
TSDB_MAX_RPC_THREADS
);
if
(
cfgAddInt32
(
pCfg
,
"numOfRpcThreads"
,
tsNumOfRpcThreads
,
1
,
1024
,
0
)
!=
0
)
return
-
1
;
tsNumOfCommitThreads
=
tsNumOfCores
/
2
;
...
...
@@ -501,7 +499,7 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem
=
cfgGetItem
(
tsCfg
,
"numOfRpcThreads"
);
if
(
pItem
!=
NULL
&&
pItem
->
stype
==
CFG_STYPE_DEFAULT
)
{
tsNumOfRpcThreads
=
numOfCores
/
2
;
tsNumOfRpcThreads
=
TRANGE
(
tsNumOfRpcThreads
,
1
,
4
);
tsNumOfRpcThreads
=
TRANGE
(
tsNumOfRpcThreads
,
2
,
TSDB_MAX_RPC_THREADS
);
pItem
->
i32
=
tsNumOfRpcThreads
;
pItem
->
stype
=
stype
;
}
...
...
source/dnode/mgmt/node_mgmt/src/dmTransport.c
浏览文件 @
bda24a01
...
...
@@ -280,10 +280,19 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit
.
retryMaxInterval
=
tsRedirectMaxPeriod
;
rpcInit
.
retryMaxTimouet
=
tsMaxRetryWaitTime
;
rpcInit
.
failFastInterval
=
1
000
;
// interval threshold(ms)
rpcInit
.
failFastInterval
=
5
000
;
// interval threshold(ms)
rpcInit
.
failFastThreshold
=
3
;
// failed threshold
rpcInit
.
ffp
=
dmFailFastFp
;
int32_t
connLimitNum
=
10000
/
(
tsNumOfRpcThreads
*
3
);
connLimitNum
=
TMAX
(
connLimitNum
,
100
);
connLimitNum
=
TMIN
(
connLimitNum
,
500
);
rpcInit
.
connLimitNum
=
connLimitNum
;
rpcInit
.
connLimitLock
=
1
;
rpcInit
.
supportBatch
=
1
;
rpcInit
.
batchSize
=
16
*
1024
;
pTrans
->
clientRpc
=
rpcOpen
(
&
rpcInit
);
if
(
pTrans
->
clientRpc
==
NULL
)
{
dError
(
"failed to init dnode rpc client"
);
...
...
source/libs/transport/inc/transComm.h
浏览文件 @
bda24a01
...
...
@@ -94,8 +94,8 @@ typedef void* queue[2];
/* Return the structure holding the given element. */
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
//#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
//#define TRANS_RETRY_INTERVAL 15 // retry interval (ms)
//
#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
//
#define TRANS_RETRY_INTERVAL 15 // retry interval (ms)
#define TRANS_CONN_TIMEOUT 3000 // connect timeout (ms)
#define TRANS_READ_TIMEOUT 3000 // read timeout (ms)
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512
...
...
source/libs/transport/inc/transportInt.h
浏览文件 @
bda24a01
...
...
@@ -64,6 +64,11 @@ typedef struct {
void
(
*
destroyFp
)(
void
*
ahandle
);
bool
(
*
failFastFp
)(
tmsg_t
msgType
);
int32_t
connLimitNum
;
int8_t
connLimitLock
;
// 0: no lock. 1. lock
int8_t
supportBatch
;
// 0: no batch, 1: support batch
int32_t
batchSize
;
int
index
;
void
*
parent
;
void
*
tcphandle
;
// returned handle from TCP initialization
...
...
source/libs/transport/src/trans.c
浏览文件 @
bda24a01
...
...
@@ -67,6 +67,10 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc
->
startTimer
=
pInit
->
tfp
;
pRpc
->
destroyFp
=
pInit
->
dfp
;
pRpc
->
failFastFp
=
pInit
->
ffp
;
pRpc
->
connLimitNum
=
pInit
->
connLimitNum
;
pRpc
->
connLimitLock
=
pInit
->
connLimitLock
;
pRpc
->
supportBatch
=
pInit
->
supportBatch
;
pRpc
->
batchSize
=
pInit
->
batchSize
;
pRpc
->
numOfThreads
=
pInit
->
numOfThreads
>
TSDB_MAX_RPC_THREADS
?
TSDB_MAX_RPC_THREADS
:
pInit
->
numOfThreads
;
if
(
pRpc
->
numOfThreads
<=
0
)
{
...
...
source/libs/transport/src/transCli.c
浏览文件 @
bda24a01
...
...
@@ -12,7 +12,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef USE_UV
#include "transComm.h"
typedef
struct
SConnList
{
...
...
@@ -20,6 +19,30 @@ typedef struct SConnList {
int32_t
size
;
}
SConnList
;
typedef
struct
{
queue
wq
;
int32_t
len
;
int
connMax
;
int
connCnt
;
int
batchLenLimit
;
int
sending
;
char
*
dst
;
char
*
ip
;
uint16_t
port
;
}
SCliBatchList
;
typedef
struct
{
queue
wq
;
queue
listq
;
int32_t
wLen
;
int32_t
batchSize
;
//
int32_t
batch
;
SCliBatchList
*
pList
;
}
SCliBatch
;
typedef
struct
SCliConn
{
T_REF_DECLARE
()
uv_connect_t
connReq
;
...
...
@@ -40,9 +63,10 @@ typedef struct SCliConn {
bool
broken
;
// link broken or not
ConnStatus
status
;
//
int64_t
refId
;
char
*
ip
;
uint32_t
port
;
SCliBatch
*
pBatch
;
int64_t
refId
;
char
*
ip
;
SDelayTask
*
task
;
...
...
@@ -80,11 +104,14 @@ typedef struct SCliThrd {
uint64_t
nextTimeout
;
// next timeout
void
*
pTransInst
;
//
int
connCount
;
void
(
*
destroyAhandleFp
)(
void
*
ahandle
);
SHashObj
*
fqdn2ipCache
;
SCvtAddr
cvtAddr
;
SHashObj
*
failFastCache
;
SHashObj
*
connLimitCache
;
SHashObj
*
batchCache
;
SCliMsg
*
stopMsg
;
...
...
@@ -131,6 +158,11 @@ static void cliAsyncCb(uv_async_t* handle);
static
void
cliIdleCb
(
uv_idle_t
*
handle
);
static
void
cliPrepareCb
(
uv_prepare_t
*
handle
);
static
void
cliHandleBatchReq
(
SCliBatch
*
pBatch
,
SCliThrd
*
pThrd
);
static
void
cliSendBatchCb
(
uv_write_t
*
req
,
int
status
);
SCliBatch
*
cliGetHeadFromList
(
SCliBatchList
*
pList
);
static
bool
cliRecvReleaseReq
(
SCliConn
*
conn
,
STransMsgHead
*
pHead
);
static
int32_t
allocConnRef
(
SCliConn
*
conn
,
bool
update
);
...
...
@@ -141,8 +173,11 @@ static SCliConn* cliCreateConn(SCliThrd* thrd);
static
void
cliDestroyConn
(
SCliConn
*
pConn
,
bool
clear
/*clear tcp handle or not*/
);
static
void
cliDestroy
(
uv_handle_t
*
handle
);
static
void
cliSend
(
SCliConn
*
pConn
);
static
void
cliSendBatch
(
SCliConn
*
pConn
);
static
void
cliDestroyConnMsgs
(
SCliConn
*
conn
,
bool
destroy
);
static
int32_t
cliPreCheckSessionLimit
(
SCliThrd
*
pThrd
,
char
*
ip
,
uint16_t
port
);
// cli util func
static
FORCE_INLINE
bool
cliIsEpsetUpdated
(
int32_t
code
,
STransConnCtx
*
pCtx
);
static
FORCE_INLINE
void
cliMayCvtFqdnToIp
(
SEpSet
*
pEpSet
,
SCvtAddr
*
pCvtAddr
);
...
...
@@ -157,6 +192,7 @@ static void cliHandleResp(SCliConn* conn);
// handle except about conn
static
void
cliHandleExcept
(
SCliConn
*
conn
);
static
void
cliReleaseUnfinishedMsg
(
SCliConn
*
conn
);
static
void
cliHandleFastFail
(
SCliConn
*
pConn
,
int
status
);
// handle req from app
static
void
cliHandleReq
(
SCliMsg
*
pMsg
,
SCliThrd
*
pThrd
);
...
...
@@ -165,6 +201,8 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd);
static
void
cliHandleUpdate
(
SCliMsg
*
pMsg
,
SCliThrd
*
pThrd
);
static
void
(
*
cliAsyncHandle
[])(
SCliMsg
*
pMsg
,
SCliThrd
*
pThrd
)
=
{
cliHandleReq
,
cliHandleQuit
,
cliHandleRelease
,
NULL
,
cliHandleUpdate
};
/// static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease,
/// NULL,cliHandleUpdate};
static
FORCE_INLINE
void
destroyUserdata
(
STransMsg
*
userdata
);
static
FORCE_INLINE
void
destroyCmsg
(
void
*
cmsg
);
...
...
@@ -285,6 +323,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
}
destroyCmsg
(
msg
);
}
transQueueClear
(
&
conn
->
cliMsgs
);
memset
(
&
conn
->
ctx
,
0
,
sizeof
(
conn
->
ctx
));
}
bool
cliMaySendCachedMsg
(
SCliConn
*
conn
)
{
...
...
@@ -487,9 +526,9 @@ void cliConnTimeout(uv_timer_t* handle) {
uv_timer_stop
(
handle
);
handle
->
data
=
NULL
;
taosArrayPush
(
pThrd
->
timerList
,
&
conn
->
timer
);
conn
->
timer
=
NULL
;
cliHandleExceptImpl
(
conn
,
-
1
);
cliHandleFastFail
(
conn
,
UV_ECANCELED
);
}
void
cliReadTimeoutCb
(
uv_timer_t
*
handle
)
{
// set up timeout cb
...
...
@@ -569,17 +608,15 @@ static void addConnToPool(void* pool, SCliConn* conn) {
conn
->
status
=
ConnInPool
;
if
(
conn
->
list
==
NULL
)
{
char
key
[
TSDB_FQDN_LEN
+
64
]
=
{
0
};
CONN_CONSTRUCT_HASH_KEY
(
key
,
conn
->
ip
,
conn
->
port
);
tTrace
(
"%s conn %p added to conn pool, read buf cap:%d"
,
CONN_GET_INST_LABEL
(
conn
),
conn
,
conn
->
readBuf
.
cap
);
conn
->
list
=
taosHashGet
((
SHashObj
*
)
pool
,
key
,
strlen
(
key
));
conn
->
list
=
taosHashGet
((
SHashObj
*
)
pool
,
conn
->
ip
,
strlen
(
conn
->
ip
));
}
else
{
tTrace
(
"%s conn %p added to conn pool, read buf cap:%d"
,
CONN_GET_INST_LABEL
(
conn
),
conn
,
conn
->
readBuf
.
cap
);
}
QUEUE_PUSH
(
&
conn
->
list
->
conns
,
&
conn
->
q
);
conn
->
list
->
size
+=
1
;
if
(
conn
->
list
->
size
>=
50
)
{
if
(
conn
->
list
->
size
>=
2
50
)
{
STaskArg
*
arg
=
taosMemoryCalloc
(
1
,
sizeof
(
STaskArg
));
arg
->
param1
=
conn
;
arg
->
param2
=
thrd
;
...
...
@@ -671,7 +708,6 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
conn
->
stream
=
(
uv_stream_t
*
)
taosMemoryMalloc
(
sizeof
(
uv_tcp_t
));
uv_tcp_init
(
pThrd
->
loop
,
(
uv_tcp_t
*
)(
conn
->
stream
));
conn
->
stream
->
data
=
conn
;
// transSetConnOption((uv_tcp_t*)conn->stream);
uv_timer_t
*
timer
=
taosArrayGetSize
(
pThrd
->
timerList
)
>
0
?
*
(
uv_timer_t
**
)
taosArrayPop
(
pThrd
->
timerList
)
:
NULL
;
if
(
timer
==
NULL
)
{
...
...
@@ -694,6 +730,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
conn
->
broken
=
0
;
transRefCliHandle
(
conn
);
atomic_add_fetch_32
(
&
pThrd
->
connCount
,
1
);
allocConnRef
(
conn
,
false
);
return
conn
;
...
...
@@ -737,6 +774,11 @@ static void cliDestroy(uv_handle_t* handle) {
conn
->
timer
->
data
=
NULL
;
conn
->
timer
=
NULL
;
}
int32_t
*
oVal
=
taosHashGet
(
pThrd
->
connLimitCache
,
conn
->
ip
,
strlen
(
conn
->
ip
));
int32_t
nVal
=
oVal
==
NULL
?
0
:
(
*
oVal
)
-
1
;
taosHashPut
(
pThrd
->
connLimitCache
,
conn
->
ip
,
strlen
(
conn
->
ip
),
&
nVal
,
sizeof
(
nVal
));
atomic_sub_fetch_32
(
&
pThrd
->
connCount
,
1
);
transReleaseExHandle
(
transGetRefMgt
(),
conn
->
refId
);
transRemoveExHandle
(
transGetRefMgt
(),
conn
->
refId
);
...
...
@@ -748,6 +790,7 @@ static void cliDestroy(uv_handle_t* handle) {
tTrace
(
"%s conn %p destroy successfully"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
transReqQueueClear
(
&
conn
->
wreqQueue
);
transDestroyBuffer
(
&
conn
->
readBuf
);
taosMemoryFree
(
conn
);
}
static
bool
cliHandleNoResp
(
SCliConn
*
conn
)
{
...
...
@@ -798,7 +841,65 @@ static void cliSendCb(uv_write_t* req, int status) {
}
uv_read_start
((
uv_stream_t
*
)
pConn
->
stream
,
cliAllocRecvBufferCb
,
cliRecvCb
);
}
void
cliSendBatch
(
SCliConn
*
pConn
)
{
SCliThrd
*
pThrd
=
pConn
->
hostThrd
;
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
SCliBatch
*
pBatch
=
pConn
->
pBatch
;
SCliBatchList
*
pList
=
pBatch
->
pList
;
pList
->
connCnt
+=
1
;
int32_t
wLen
=
pBatch
->
wLen
;
uv_buf_t
*
wb
=
taosMemoryCalloc
(
wLen
,
sizeof
(
uv_buf_t
));
int
i
=
0
;
queue
*
h
=
NULL
;
QUEUE_FOREACH
(
h
,
&
pBatch
->
wq
)
{
SCliMsg
*
pCliMsg
=
QUEUE_DATA
(
h
,
SCliMsg
,
q
);
STransConnCtx
*
pCtx
=
pCliMsg
->
ctx
;
STransMsg
*
pMsg
=
(
STransMsg
*
)(
&
pCliMsg
->
msg
);
if
(
pMsg
->
pCont
==
0
)
{
pMsg
->
pCont
=
(
void
*
)
rpcMallocCont
(
0
);
pMsg
->
contLen
=
0
;
}
int
msgLen
=
transMsgLenFromCont
(
pMsg
->
contLen
);
STransMsgHead
*
pHead
=
transHeadFromCont
(
pMsg
->
pCont
);
if
(
pHead
->
comp
==
0
)
{
pHead
->
ahandle
=
pCtx
!=
NULL
?
(
uint64_t
)
pCtx
->
ahandle
:
0
;
pHead
->
noResp
=
REQUEST_NO_RESP
(
pMsg
)
?
1
:
0
;
pHead
->
persist
=
REQUEST_PERSIS_HANDLE
(
pMsg
)
?
1
:
0
;
pHead
->
msgType
=
pMsg
->
msgType
;
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
msgLen
);
pHead
->
release
=
REQUEST_RELEASE_HANDLE
(
pCliMsg
)
?
1
:
0
;
memcpy
(
pHead
->
user
,
pTransInst
->
user
,
strlen
(
pTransInst
->
user
));
pHead
->
traceId
=
pMsg
->
info
.
traceId
;
pHead
->
magicNum
=
htonl
(
TRANS_MAGIC_NUM
);
}
pHead
->
timestamp
=
taosHton64
(
taosGetTimestampUs
());
if
(
pHead
->
comp
==
0
)
{
if
(
pTransInst
->
compressSize
!=
-
1
&&
pTransInst
->
compressSize
<
pMsg
->
contLen
)
{
msgLen
=
transCompressMsg
(
pMsg
->
pCont
,
pMsg
->
contLen
)
+
sizeof
(
STransMsgHead
);
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
msgLen
);
}
}
else
{
msgLen
=
(
int32_t
)
ntohl
((
uint32_t
)(
pHead
->
msgLen
));
}
wb
[
i
++
]
=
uv_buf_init
((
char
*
)
pHead
,
msgLen
);
}
uv_write_t
*
req
=
taosMemoryCalloc
(
1
,
sizeof
(
uv_write_t
));
req
->
data
=
pConn
;
tDebug
(
"%s conn %p start to send batch msg, batch size:%d, msgLen:%d"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
,
pBatch
->
wLen
,
pBatch
->
batchSize
);
uv_write
(
req
,
(
uv_stream_t
*
)
pConn
->
stream
,
wb
,
wLen
,
cliSendBatchCb
);
taosMemoryFree
(
wb
);
}
void
cliSend
(
SCliConn
*
pConn
)
{
SCliThrd
*
pThrd
=
pConn
->
hostThrd
;
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
...
...
@@ -883,31 +984,137 @@ _RETURN:
return
;
}
void
cliConnCb
(
uv_connect_t
*
req
,
int
status
)
{
SCliConn
*
pConn
=
req
->
data
;
SCliThrd
*
pThrd
=
pConn
->
hostThrd
;
static
void
cliDestroyBatch
(
SCliBatch
*
pBatch
)
{
while
(
!
QUEUE_IS_EMPTY
(
&
pBatch
->
wq
))
{
queue
*
h
=
QUEUE_HEAD
(
&
pBatch
->
wq
);
QUEUE_REMOVE
(
h
);
if
(
pConn
->
timer
!=
NULL
)
{
uv_timer_stop
(
pConn
->
timer
);
pConn
->
timer
->
data
=
NULL
;
taosArrayPush
(
pThrd
->
timerList
,
&
pConn
->
timer
);
pConn
->
timer
=
NULL
;
SCliMsg
*
p
=
QUEUE_DATA
(
h
,
SCliMsg
,
q
);
destroyCmsg
(
p
);
}
SCliBatchList
*
p
=
pBatch
->
pList
;
p
->
sending
-=
1
;
taosMemoryFree
(
pBatch
);
}
static
void
cliHandleBatchReq
(
SCliBatch
*
pBatch
,
SCliThrd
*
pThrd
)
{
if
(
pBatch
==
NULL
||
pBatch
->
wLen
==
0
||
QUEUE_IS_EMPTY
(
&
pBatch
->
wq
))
{
return
;
}
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
SCliBatchList
*
pList
=
pBatch
->
pList
;
SCliConn
*
conn
=
getConnFromPool
(
pThrd
->
pool
,
pList
->
ip
,
pList
->
port
);
if
(
conn
==
NULL
&&
0
!=
cliPreCheckSessionLimit
(
pThrd
,
pList
->
ip
,
pList
->
port
))
{
tError
(
"%s failed to send batch msg, batch size:%d, msgLen: %d"
,
pTransInst
->
label
,
pBatch
->
wLen
,
pBatch
->
batchSize
);
cliDestroyBatch
(
pBatch
);
return
;
}
if
(
conn
==
NULL
)
{
conn
=
cliCreateConn
(
pThrd
);
conn
->
pBatch
=
pBatch
;
conn
->
ip
=
strdup
(
pList
->
dst
);
uint32_t
ipaddr
=
cliGetIpFromFqdnCache
(
pThrd
->
fqdn2ipCache
,
pList
->
ip
);
if
(
ipaddr
==
0xffffffff
)
{
uv_timer_stop
(
conn
->
timer
);
conn
->
timer
->
data
=
NULL
;
taosArrayPush
(
pThrd
->
timerList
,
&
conn
->
timer
);
conn
->
timer
=
NULL
;
cliHandleFastFail
(
conn
,
-
1
);
return
;
}
struct
sockaddr_in
addr
;
addr
.
sin_family
=
AF_INET
;
addr
.
sin_addr
.
s_addr
=
ipaddr
;
addr
.
sin_port
=
(
uint16_t
)
htons
(
pList
->
port
);
tTrace
(
"%s conn %p try to connect to %s"
,
pTransInst
->
label
,
conn
,
pList
->
dst
);
int32_t
fd
=
taosCreateSocketWithTimeout
(
TRANS_CONN_TIMEOUT
*
4
);
if
(
fd
==
-
1
)
{
tError
(
"%s conn %p failed to create socket, reason:%s"
,
transLabel
(
pTransInst
),
conn
,
tstrerror
(
TAOS_SYSTEM_ERROR
(
errno
)));
cliHandleFastFail
(
conn
,
-
1
);
return
;
}
int
ret
=
uv_tcp_open
((
uv_tcp_t
*
)
conn
->
stream
,
fd
);
if
(
ret
!=
0
)
{
tError
(
"%s conn %p failed to set stream, reason:%s"
,
transLabel
(
pTransInst
),
conn
,
uv_err_name
(
ret
));
cliHandleFastFail
(
conn
,
-
1
);
return
;
}
ret
=
transSetConnOption
((
uv_tcp_t
*
)
conn
->
stream
);
if
(
ret
!=
0
)
{
tError
(
"%s conn %p failed to set socket opt, reason:%s"
,
transLabel
(
pTransInst
),
conn
,
uv_err_name
(
ret
));
cliHandleFastFail
(
conn
,
-
1
);
return
;
}
ret
=
uv_tcp_connect
(
&
conn
->
connReq
,
(
uv_tcp_t
*
)(
conn
->
stream
),
(
const
struct
sockaddr
*
)
&
addr
,
cliConnCb
);
if
(
ret
!=
0
)
{
uv_timer_stop
(
conn
->
timer
);
conn
->
timer
->
data
=
NULL
;
taosArrayPush
(
pThrd
->
timerList
,
&
conn
->
timer
);
conn
->
timer
=
NULL
;
cliHandleFastFail
(
conn
,
-
1
);
return
;
}
uv_timer_start
(
conn
->
timer
,
cliConnTimeout
,
TRANS_CONN_TIMEOUT
,
0
);
return
;
}
conn
->
pBatch
=
pBatch
;
cliSendBatch
(
conn
);
}
static
void
cliSendBatchCb
(
uv_write_t
*
req
,
int
status
)
{
SCliConn
*
conn
=
req
->
data
;
SCliThrd
*
thrd
=
conn
->
hostThrd
;
SCliBatch
*
p
=
conn
->
pBatch
;
SCliBatchList
*
pBatchList
=
p
->
pList
;
SCliBatch
*
nxtBatch
=
cliGetHeadFromList
(
pBatchList
);
pBatchList
->
connCnt
-=
1
;
conn
->
pBatch
=
NULL
;
if
(
status
!=
0
)
{
tDebug
(
"%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s"
,
CONN_GET_INST_LABEL
(
conn
),
conn
,
p
->
wLen
,
p
->
batchSize
,
uv_err_name
(
status
));
cliHandleExcept
(
conn
);
cliHandleBatchReq
(
nxtBatch
,
thrd
);
}
else
{
tDebug
(
"%s conn %p succ to send batch msg, batch size:%d, msgLen:%d"
,
CONN_GET_INST_LABEL
(
conn
),
conn
,
p
->
wLen
,
p
->
batchSize
);
if
(
nxtBatch
!=
NULL
)
{
conn
->
pBatch
=
nxtBatch
;
cliSendBatch
(
conn
);
}
else
{
addConnToPool
(
thrd
->
pool
,
conn
);
}
}
cliDestroyBatch
(
p
);
taosMemoryFree
(
req
);
}
static
void
cliHandleFastFail
(
SCliConn
*
pConn
,
int
status
)
{
SCliThrd
*
pThrd
=
pConn
->
hostThrd
;
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
if
(
status
==
-
1
)
status
=
ENETUNREACH
;
if
(
pConn
->
pBatch
==
NULL
)
{
SCliMsg
*
pMsg
=
transQueueGet
(
&
pConn
->
cliMsgs
,
0
);
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
tError
(
"%s msg %s failed to send, conn %p failed to connect to %s:%d, reason: %s"
,
CONN_GET_INST_LABEL
(
pConn
),
pMsg
?
TMSG_INFO
(
pMsg
->
msg
.
msgType
)
:
0
,
pConn
,
pConn
->
ip
,
pConn
->
port
,
uv_strerror
(
status
));
STraceId
*
trace
=
&
pMsg
->
msg
.
info
.
traceId
;
tGError
(
"%s msg %s failed to send, conn %p failed to connect to %s, reason: %s"
,
CONN_GET_INST_LABEL
(
pConn
),
TMSG_INFO
(
pMsg
->
msg
.
msgType
),
pConn
,
pConn
->
ip
,
uv_strerror
(
status
));
if
(
pMsg
!=
NULL
&&
REQUEST_NO_RESP
(
&
pMsg
->
msg
)
&&
(
pTransInst
->
failFastFp
!=
NULL
&&
pTransInst
->
failFastFp
(
pMsg
->
msg
.
msgType
)))
{
char
*
ip
=
pConn
->
ip
;
uint32_t
port
=
pConn
->
port
;
char
key
[
TSDB_FQDN_LEN
+
64
]
=
{
0
};
CONN_CONSTRUCT_HASH_KEY
(
key
,
ip
,
port
);
SFailFastItem
*
item
=
taosHashGet
(
pThrd
->
failFastCache
,
key
,
strlen
(
key
));
SFailFastItem
*
item
=
taosHashGet
(
pThrd
->
failFastCache
,
pConn
->
ip
,
strlen
(
pConn
->
ip
));
int64_t
cTimestamp
=
taosGetTimestampMs
();
if
(
item
!=
NULL
)
{
int32_t
elapse
=
cTimestamp
-
item
->
timestamp
;
...
...
@@ -919,15 +1126,47 @@ void cliConnCb(uv_connect_t* req, int status) {
}
}
else
{
SFailFastItem
item
=
{.
count
=
1
,
.
timestamp
=
cTimestamp
};
taosHashPut
(
pThrd
->
failFastCache
,
key
,
strlen
(
key
),
&
item
,
sizeof
(
SFailFastItem
));
taosHashPut
(
pThrd
->
failFastCache
,
pConn
->
ip
,
strlen
(
pConn
->
ip
),
&
item
,
sizeof
(
SFailFastItem
));
}
}
cliHandleExcept
(
pConn
);
}
else
{
tError
(
"%s batch msg failed to send, conn %p failed to connect to %s, reason: %s"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
,
pConn
->
ip
,
uv_strerror
(
status
));
cliDestroyBatch
(
pConn
->
pBatch
);
pConn
->
pBatch
=
NULL
;
}
cliHandleExcept
(
pConn
);
}
void
cliConnCb
(
uv_connect_t
*
req
,
int
status
)
{
SCliConn
*
pConn
=
req
->
data
;
SCliThrd
*
pThrd
=
pConn
->
hostThrd
;
bool
timeout
=
false
;
if
(
pConn
->
timer
==
NULL
)
{
timeout
=
true
;
}
else
{
uv_timer_stop
(
pConn
->
timer
);
pConn
->
timer
->
data
=
NULL
;
taosArrayPush
(
pThrd
->
timerList
,
&
pConn
->
timer
);
pConn
->
timer
=
NULL
;
}
if
(
status
!=
0
)
{
if
(
timeout
==
false
)
{
cliHandleFastFail
(
pConn
,
status
);
}
else
if
(
timeout
==
true
)
{
// already deal by timeout
}
return
;
}
struct
sockaddr
peername
,
sockname
;
int
addrlen
=
sizeof
(
peername
);
int32_t
*
oVal
=
taosHashGet
(
pThrd
->
connLimitCache
,
pConn
->
ip
,
strlen
(
pConn
->
ip
));
int32_t
nVal
=
oVal
==
NULL
?
0
:
(
*
oVal
)
+
1
;
taosHashPut
(
pThrd
->
connLimitCache
,
pConn
->
ip
,
strlen
(
pConn
->
ip
),
&
nVal
,
sizeof
(
nVal
));
struct
sockaddr
peername
,
sockname
;
int
addrlen
=
sizeof
(
peername
);
uv_tcp_getpeername
((
uv_tcp_t
*
)
pConn
->
stream
,
&
peername
,
&
addrlen
);
transSockInfo2Str
(
&
peername
,
pConn
->
dst
);
...
...
@@ -936,8 +1175,11 @@ void cliConnCb(uv_connect_t* req, int status) {
transSockInfo2Str
(
&
sockname
,
pConn
->
src
);
tTrace
(
"%s conn %p connect to server successfully"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
);
cliSend
(
pConn
);
if
(
pConn
->
pBatch
!=
NULL
)
{
cliSendBatch
(
pConn
);
}
else
{
cliSend
(
pConn
);
}
}
static
void
cliHandleQuit
(
SCliMsg
*
pMsg
,
SCliThrd
*
pThrd
)
{
...
...
@@ -1062,12 +1304,32 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
return
;
}
static
int32_t
cliPreCheckSessionLimit
(
SCliThrd
*
pThrd
,
char
*
ip
,
uint16_t
port
)
{
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
// STransConnCtx* pCtx = pMsg->ctx;
// char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
// int32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
char
key
[
TSDB_FQDN_LEN
+
64
]
=
{
0
};
CONN_CONSTRUCT_HASH_KEY
(
key
,
ip
,
port
);
int32_t
*
val
=
taosHashGet
(
pThrd
->
connLimitCache
,
key
,
strlen
(
key
));
if
(
val
==
NULL
)
return
0
;
if
(
*
val
>=
pTransInst
->
connLimitNum
)
{
return
-
1
;
}
return
0
;
}
void
cliHandleReq
(
SCliMsg
*
pMsg
,
SCliThrd
*
pThrd
)
{
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
STransConnCtx
*
pCtx
=
pMsg
->
ctx
;
cliMayCvtFqdnToIp
(
&
pCtx
->
epSet
,
&
pThrd
->
cvtAddr
);
STraceId
*
trace
=
&
pMsg
->
msg
.
info
.
traceId
;
char
*
ip
=
EPSET_GET_INUSE_IP
(
&
pCtx
->
epSet
);
uint16_t
port
=
EPSET_GET_INUSE_PORT
(
&
pCtx
->
epSet
);
if
(
!
EPSET_IS_VALID
(
&
pCtx
->
epSet
))
{
tGError
(
"%s, msg %s sent with invalid epset"
,
pTransInst
->
label
,
TMSG_INFO
(
pMsg
->
msg
.
msgType
));
...
...
@@ -1076,16 +1338,13 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
}
if
(
REQUEST_NO_RESP
(
&
pMsg
->
msg
)
&&
(
pTransInst
->
failFastFp
!=
NULL
&&
pTransInst
->
failFastFp
(
pMsg
->
msg
.
msgType
)))
{
char
*
ip
=
EPSET_GET_INUSE_IP
(
&
pCtx
->
epSet
);
uint32_t
port
=
EPSET_GET_INUSE_PORT
(
&
pCtx
->
epSet
);
char
key
[
TSDB_FQDN_LEN
+
64
]
=
{
0
};
char
key
[
TSDB_FQDN_LEN
+
64
]
=
{
0
};
CONN_CONSTRUCT_HASH_KEY
(
key
,
ip
,
port
);
SFailFastItem
*
item
=
taosHashGet
(
pThrd
->
failFastCache
,
key
,
strlen
(
key
));
if
(
item
!=
NULL
)
{
int32_t
elapse
=
(
int32_t
)(
taosGetTimestampMs
()
-
item
->
timestamp
);
if
(
item
->
count
>=
pTransInst
->
failFastThreshold
&&
(
elapse
>=
0
&&
elapse
<=
pTransInst
->
failFastInterval
))
{
STraceId
*
trace
=
&
(
pMsg
->
msg
.
info
.
traceId
);
tGTrace
(
"%s, msg %s cancel to send, reason: failed to connect %s:%d: count: %d, at %d"
,
pTransInst
->
label
,
TMSG_INFO
(
pMsg
->
msg
.
msgType
),
ip
,
port
,
item
->
count
,
elapse
);
destroyCmsg
(
pMsg
);
...
...
@@ -1107,6 +1366,13 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
return
;
}
if
(
conn
==
NULL
&&
REQUEST_NO_RESP
(
&
pMsg
->
msg
)
&&
0
!=
cliPreCheckSessionLimit
(
pThrd
,
ip
,
port
))
{
tGTrace
(
"%s, msg %s cancel to send, reason: %s"
,
pTransInst
->
label
,
TMSG_INFO
(
pMsg
->
msg
.
msgType
),
tstrerror
(
TSDB_CODE_RPC_MAX_SESSIONS
));
destroyCmsg
(
pMsg
);
return
;
}
if
(
conn
!=
NULL
)
{
transCtxMerge
(
&
conn
->
ctx
,
&
pCtx
->
appCtx
);
transQueuePush
(
&
conn
->
cliMsgs
,
pMsg
);
...
...
@@ -1120,10 +1386,14 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
transCtxMerge
(
&
conn
->
ctx
,
&
pCtx
->
appCtx
);
transQueuePush
(
&
conn
->
cliMsgs
,
pMsg
);
conn
->
ip
=
strdup
(
EPSET_GET_INUSE_IP
(
&
pCtx
->
epSet
));
conn
->
port
=
EPSET_GET_INUSE_PORT
(
&
pCtx
->
epSet
);
char
key
[
TSDB_FQDN_LEN
+
64
]
=
{
0
};
char
*
fqdn
=
EPSET_GET_INUSE_IP
(
&
pCtx
->
epSet
);
uint16_t
port
=
EPSET_GET_INUSE_PORT
(
&
pCtx
->
epSet
);
CONN_CONSTRUCT_HASH_KEY
(
key
,
fqdn
,
port
);
conn
->
ip
=
strdup
(
key
);
uint32_t
ipaddr
=
cliGetIpFromFqdnCache
(
pThrd
->
fqdn2ipCache
,
conn
->
ip
);
uint32_t
ipaddr
=
cliGetIpFromFqdnCache
(
pThrd
->
fqdn2ipCache
,
fqdn
);
if
(
ipaddr
==
0xffffffff
)
{
uv_timer_stop
(
conn
->
timer
);
conn
->
timer
->
data
=
NULL
;
...
...
@@ -1137,9 +1407,9 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
struct
sockaddr_in
addr
;
addr
.
sin_family
=
AF_INET
;
addr
.
sin_addr
.
s_addr
=
ipaddr
;
addr
.
sin_port
=
(
uint16_t
)
htons
(
(
uint16_t
)
conn
->
port
);
addr
.
sin_port
=
(
uint16_t
)
htons
(
port
);
tGTrace
(
"%s conn %p try to connect to %s
:%d"
,
pTransInst
->
label
,
conn
,
conn
->
ip
,
conn
->
port
);
tGTrace
(
"%s conn %p try to connect to %s
"
,
pTransInst
->
label
,
conn
,
conn
->
ip
);
int32_t
fd
=
taosCreateSocketWithTimeout
(
TRANS_CONN_TIMEOUT
*
4
);
if
(
fd
==
-
1
)
{
tGError
(
"%s conn %p failed to create socket, reason:%s"
,
transLabel
(
pTransInst
),
conn
,
...
...
@@ -1163,45 +1433,168 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
ret
=
uv_tcp_connect
(
&
conn
->
connReq
,
(
uv_tcp_t
*
)(
conn
->
stream
),
(
const
struct
sockaddr
*
)
&
addr
,
cliConnCb
);
if
(
ret
!=
0
)
{
tGError
(
"%s conn %p failed to connect to %s:%d, reason:%s"
,
pTransInst
->
label
,
conn
,
conn
->
ip
,
conn
->
port
,
uv_err_name
(
ret
));
uv_timer_stop
(
conn
->
timer
);
conn
->
timer
->
data
=
NULL
;
taosArrayPush
(
pThrd
->
timerList
,
&
conn
->
timer
);
conn
->
timer
=
NULL
;
cliHandle
Except
(
conn
);
cliHandle
FastFail
(
conn
,
ret
);
return
;
}
uv_timer_start
(
conn
->
timer
,
cliConnTimeout
,
TRANS_CONN_TIMEOUT
,
0
);
}
tGTrace
(
"%s conn %p ready"
,
pTransInst
->
label
,
conn
);
}
static
void
cliAsyncCb
(
uv_async_t
*
handle
)
{
SAsyncItem
*
item
=
handle
->
data
;
SCliThrd
*
pThrd
=
item
->
pThrd
;
SCliMsg
*
pMsg
=
NULL
;
// batch process to avoid to lock/unlock frequently
queue
wq
;
taosThreadMutexLock
(
&
item
->
mtx
);
QUEUE_MOVE
(
&
item
->
qmsg
,
&
wq
);
taosThreadMutexUnlock
(
&
item
->
mtx
);
static
void
cliNoBatchDealReq
(
queue
*
wq
,
SCliThrd
*
pThrd
)
{
int
count
=
0
;
while
(
!
QUEUE_IS_EMPTY
(
wq
))
{
queue
*
h
=
QUEUE_HEAD
(
wq
);
QUEUE_REMOVE
(
h
);
SCliMsg
*
pMsg
=
QUEUE_DATA
(
h
,
SCliMsg
,
q
);
(
*
cliAsyncHandle
[
pMsg
->
type
])(
pMsg
,
pThrd
);
count
++
;
}
if
(
count
>=
2
)
{
tTrace
(
"cli process batch size:%d"
,
count
);
}
}
SCliBatch
*
cliGetHeadFromList
(
SCliBatchList
*
pList
)
{
if
(
QUEUE_IS_EMPTY
(
&
pList
->
wq
)
||
pList
->
connCnt
>
pList
->
connMax
||
pList
->
sending
>
pList
->
connMax
)
{
return
NULL
;
}
queue
*
hr
=
QUEUE_HEAD
(
&
pList
->
wq
);
QUEUE_REMOVE
(
hr
);
pList
->
sending
+=
1
;
pList
->
len
-=
1
;
SCliBatch
*
batch
=
QUEUE_DATA
(
hr
,
SCliBatch
,
listq
);
return
batch
;
}
static
void
cliBatchDealReq
(
queue
*
wq
,
SCliThrd
*
pThrd
)
{
STrans
*
pInst
=
pThrd
->
pTransInst
;
int
count
=
0
;
while
(
!
QUEUE_IS_EMPTY
(
&
wq
))
{
queue
*
h
=
QUEUE_HEAD
(
&
wq
);
while
(
!
QUEUE_IS_EMPTY
(
wq
))
{
queue
*
h
=
QUEUE_HEAD
(
wq
);
QUEUE_REMOVE
(
h
);
SCliMsg
*
pMsg
=
QUEUE_DATA
(
h
,
SCliMsg
,
q
);
if
(
pMsg
->
type
==
Normal
&&
REQUEST_NO_RESP
(
&
pMsg
->
msg
))
{
STransConnCtx
*
pCtx
=
pMsg
->
ctx
;
char
*
ip
=
EPSET_GET_INUSE_IP
(
&
pCtx
->
epSet
);
uint32_t
port
=
EPSET_GET_INUSE_PORT
(
&
pCtx
->
epSet
);
char
key
[
TSDB_FQDN_LEN
+
64
]
=
{
0
};
CONN_CONSTRUCT_HASH_KEY
(
key
,
ip
,
port
);
// SCliBatch** ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key));
SCliBatchList
**
ppBatchList
=
taosHashGet
(
pThrd
->
batchCache
,
key
,
sizeof
(
key
));
if
(
ppBatchList
==
NULL
||
*
ppBatchList
==
NULL
)
{
SCliBatchList
*
pBatchList
=
taosMemoryCalloc
(
1
,
sizeof
(
SCliBatchList
));
QUEUE_INIT
(
&
pBatchList
->
wq
);
pBatchList
->
connMax
=
pInst
->
connLimitNum
;
pBatchList
->
connCnt
=
0
;
pBatchList
->
batchLenLimit
=
pInst
->
batchSize
;
pBatchList
->
len
+=
1
;
pBatchList
->
ip
=
strdup
(
ip
);
pBatchList
->
dst
=
strdup
(
key
);
pBatchList
->
port
=
port
;
SCliBatch
*
pBatch
=
taosMemoryCalloc
(
1
,
sizeof
(
SCliBatch
));
QUEUE_INIT
(
&
pBatch
->
wq
);
QUEUE_INIT
(
&
pBatch
->
listq
);
QUEUE_PUSH
(
&
pBatch
->
wq
,
h
);
pBatch
->
wLen
+=
1
;
pBatch
->
batchSize
+=
pMsg
->
msg
.
contLen
;
pBatch
->
pList
=
pBatchList
;
QUEUE_PUSH
(
&
pBatchList
->
wq
,
&
pBatch
->
listq
);
taosHashPut
(
pThrd
->
batchCache
,
key
,
sizeof
(
key
),
&
pBatchList
,
sizeof
(
void
*
));
}
else
{
if
(
QUEUE_IS_EMPTY
(
&
(
*
ppBatchList
)
->
wq
))
{
SCliBatch
*
pBatch
=
taosMemoryCalloc
(
1
,
sizeof
(
SCliBatch
));
QUEUE_INIT
(
&
pBatch
->
wq
);
QUEUE_INIT
(
&
pBatch
->
listq
);
QUEUE_PUSH
(
&
pBatch
->
wq
,
h
);
pBatch
->
wLen
+=
1
;
pBatch
->
batchSize
=
pMsg
->
msg
.
contLen
;
pBatch
->
pList
=
*
ppBatchList
;
QUEUE_PUSH
(
&
((
*
ppBatchList
)
->
wq
),
&
pBatch
->
listq
);
(
*
ppBatchList
)
->
len
+=
1
;
continue
;
}
queue
*
hdr
=
QUEUE_TAIL
(
&
((
*
ppBatchList
)
->
wq
));
SCliBatch
*
pBatch
=
QUEUE_DATA
(
hdr
,
SCliBatch
,
listq
);
if
((
pBatch
->
batchSize
+
pMsg
->
msg
.
contLen
)
<
(
*
ppBatchList
)
->
batchLenLimit
)
{
QUEUE_PUSH
(
&
pBatch
->
wq
,
h
);
pBatch
->
batchSize
+=
pMsg
->
msg
.
contLen
;
pBatch
->
wLen
+=
1
;
}
else
{
SCliBatch
*
pBatch
=
taosMemoryCalloc
(
1
,
sizeof
(
SCliBatch
));
QUEUE_INIT
(
&
pBatch
->
wq
);
QUEUE_INIT
(
&
pBatch
->
listq
);
QUEUE_PUSH
(
&
pBatch
->
wq
,
h
);
pBatch
->
wLen
+=
1
;
pBatch
->
batchSize
+=
pMsg
->
msg
.
contLen
;
pBatch
->
pList
=
*
ppBatchList
;
QUEUE_PUSH
(
&
((
*
ppBatchList
)
->
wq
),
&
pBatch
->
listq
);
(
*
ppBatchList
)
->
len
+=
1
;
}
}
continue
;
}
(
*
cliAsyncHandle
[
pMsg
->
type
])(
pMsg
,
pThrd
);
count
++
;
}
void
**
pIter
=
taosHashIterate
(
pThrd
->
batchCache
,
NULL
);
while
(
pIter
!=
NULL
)
{
SCliBatchList
*
batchList
=
(
SCliBatchList
*
)(
*
pIter
);
SCliBatch
*
batch
=
cliGetHeadFromList
(
batchList
);
if
(
batch
!=
NULL
)
{
cliHandleBatchReq
(
batch
,
pThrd
);
}
pIter
=
(
void
**
)
taosHashIterate
(
pThrd
->
batchCache
,
pIter
);
}
if
(
count
>=
2
)
{
tTrace
(
"cli process batch size:%d"
,
count
);
}
// if (!uv_is_active((uv_handle_t*)pThrd->prepare)) uv_prepare_start(pThrd->prepare, cliPrepareCb);
}
static
void
cliAsyncCb
(
uv_async_t
*
handle
)
{
SAsyncItem
*
item
=
handle
->
data
;
SCliThrd
*
pThrd
=
item
->
pThrd
;
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
SCliMsg
*
pMsg
=
NULL
;
// batch process to avoid to lock/unlock frequently
queue
wq
;
taosThreadMutexLock
(
&
item
->
mtx
);
QUEUE_MOVE
(
&
item
->
qmsg
,
&
wq
);
taosThreadMutexUnlock
(
&
item
->
mtx
);
int8_t
supportBatch
=
pTransInst
->
supportBatch
;
if
(
supportBatch
==
0
)
{
cliNoBatchDealReq
(
&
wq
,
pThrd
);
}
else
if
(
supportBatch
==
1
)
{
cliBatchDealReq
(
&
wq
,
pThrd
);
}
if
(
pThrd
->
stopMsg
!=
NULL
)
cliHandleQuit
(
pThrd
->
stopMsg
,
pThrd
);
}
...
...
@@ -1380,7 +1773,11 @@ static SCliThrd* createThrdObj(void* trans) {
taosMemoryFree
(
pThrd
);
return
NULL
;
}
pThrd
->
asyncPool
=
transAsyncPoolCreate
(
pThrd
->
loop
,
8
,
pThrd
,
cliAsyncCb
);
if
(
pTransInst
->
supportBatch
)
{
pThrd
->
asyncPool
=
transAsyncPoolCreate
(
pThrd
->
loop
,
4
,
pThrd
,
cliAsyncCb
);
}
else
{
pThrd
->
asyncPool
=
transAsyncPoolCreate
(
pThrd
->
loop
,
8
,
pThrd
,
cliAsyncCb
);
}
if
(
pThrd
->
asyncPool
==
NULL
)
{
tError
(
"failed to init async pool"
);
uv_loop_close
(
pThrd
->
loop
);
...
...
@@ -1414,6 +1811,10 @@ static SCliThrd* createThrdObj(void* trans) {
pThrd
->
destroyAhandleFp
=
pTransInst
->
destroyFp
;
pThrd
->
fqdn2ipCache
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
pThrd
->
failFastCache
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
pThrd
->
connLimitCache
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
pTransInst
->
connLimitLock
==
0
?
HASH_NO_LOCK
:
HASH_ENTRY_LOCK
);
pThrd
->
batchCache
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
pThrd
->
quit
=
false
;
return
pThrd
;
...
...
@@ -1442,6 +1843,25 @@ static void destroyThrdObj(SCliThrd* pThrd) {
taosMemoryFree
(
pThrd
->
loop
);
taosHashCleanup
(
pThrd
->
fqdn2ipCache
);
taosHashCleanup
(
pThrd
->
failFastCache
);
taosHashCleanup
(
pThrd
->
connLimitCache
);
void
**
pIter
=
taosHashIterate
(
pThrd
->
batchCache
,
NULL
);
while
(
pIter
!=
NULL
)
{
SCliBatchList
*
pBatchList
=
(
SCliBatchList
*
)(
*
pIter
);
while
(
!
QUEUE_IS_EMPTY
(
&
pBatchList
->
wq
))
{
queue
*
h
=
QUEUE_HEAD
(
&
pBatchList
->
wq
);
QUEUE_REMOVE
(
h
);
SCliBatch
*
pBatch
=
QUEUE_DATA
(
h
,
SCliBatch
,
listq
);
cliDestroyBatch
(
pBatch
);
}
taosMemoryFree
(
pBatchList
->
ip
);
taosMemoryFree
(
pBatchList
->
dst
);
taosMemoryFree
(
pBatchList
);
pIter
=
(
void
**
)
taosHashIterate
(
pThrd
->
batchCache
,
pIter
);
}
taosHashCleanup
(
pThrd
->
batchCache
);
taosMemoryFree
(
pThrd
);
}
...
...
@@ -1865,6 +2285,19 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
transReleaseExHandle
(
transGetInstMgt
(),
(
int64_t
)
shandle
);
return
TSDB_CODE_RPC_BROKEN_LINK
;
}
if
(
pTransInst
->
connLimitNum
>
0
&&
REQUEST_NO_RESP
(
pReq
))
{
char
key
[
TSDB_FQDN_LEN
+
64
]
=
{
0
};
char
*
ip
=
EPSET_GET_INUSE_IP
((
SEpSet
*
)
pEpSet
);
uint16_t
port
=
EPSET_GET_INUSE_PORT
((
SEpSet
*
)
pEpSet
);
CONN_CONSTRUCT_HASH_KEY
(
key
,
ip
,
port
);
int32_t
*
val
=
taosHashGet
(
pThrd
->
connLimitCache
,
key
,
strlen
(
key
));
if
(
val
!=
NULL
&&
*
val
>=
pTransInst
->
connLimitNum
)
{
transFreeMsg
(
pReq
->
pCont
);
transReleaseExHandle
(
transGetInstMgt
(),
(
int64_t
)
shandle
);
return
TSDB_CODE_RPC_MAX_SESSIONS
;
}
}
TRACE_SET_MSGID
(
&
pReq
->
info
.
traceId
,
tGenIdPI64
());
...
...
@@ -1989,4 +2422,3 @@ int64_t transAllocHandle() {
return
exh
->
refId
;
}
#endif
source/libs/transport/src/transSvr.c
浏览文件 @
bda24a01
...
...
@@ -12,8 +12,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef USE_UV
#include "transComm.h"
static
TdThreadOnce
transModuleInit
=
PTHREAD_ONCE_INIT
;
...
...
@@ -246,11 +244,11 @@ static bool uvHandleReq(SSvrConn* pConn) {
}
}
else
{
if
(
cost
>=
EXCEPTION_LIMIT_US
)
{
tGWarn
(
"%s conn %p %s received from %s, local info:%s, len:%d,
r
esp:%d, code:%d, cost:%dus, recv exception"
,
tGWarn
(
"%s conn %p %s received from %s, local info:%s, len:%d,
noR
esp:%d, code:%d, cost:%dus, recv exception"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
pConn
->
dst
,
pConn
->
src
,
msgLen
,
pHead
->
noResp
,
transMsg
.
code
,
(
int
)(
cost
));
}
else
{
tGDebug
(
"%s conn %p %s received from %s, local info:%s, len:%d,
r
esp:%d, code:%d, cost:%dus"
,
tGDebug
(
"%s conn %p %s received from %s, local info:%s, len:%d,
noR
esp:%d, code:%d, cost:%dus"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
pConn
->
dst
,
pConn
->
src
,
msgLen
,
pHead
->
noResp
,
transMsg
.
code
,
(
int
)(
cost
));
}
...
...
@@ -1347,5 +1345,3 @@ _return2:
}
int
transGetConnInfo
(
void
*
thandle
,
STransHandleInfo
*
pConnInfo
)
{
return
-
1
;
}
#endif
source/libs/transport/test/cliBench.c
浏览文件 @
bda24a01
...
...
@@ -32,22 +32,21 @@ typedef struct {
void
*
pRpc
;
}
SInfo
;
void
initLogEnv
()
{
const
char
*
logDir
=
"/tmp/trans_cli"
;
const
char
*
defaultLogFileNamePrefix
=
"taoslog"
;
const
char
*
logDir
=
"/tmp/trans_cli"
;
const
char
*
defaultLogFileNamePrefix
=
"taoslog"
;
const
int32_t
maxLogFileNum
=
10000
;
tsAsyncLog
=
0
;
//idxDebugFlag = 143;
//
idxDebugFlag = 143;
strcpy
(
tsLogDir
,
(
char
*
)
logDir
);
taosRemoveDir
(
tsLogDir
);
taosMkDir
(
tsLogDir
);
taosMkDir
(
tsLogDir
);
if
(
taosInitLog
(
defaultLogFileNamePrefix
,
maxLogFileNum
)
<
0
)
{
printf
(
"failed to open log file in directory:%s
\n
"
,
tsLogDir
);
printf
(
"failed to open log file in directory:%s
\n
"
,
tsLogDir
);
}
}
static
void
processResponse
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SInfo
*
pInfo
=
(
SInfo
*
)
pMsg
->
info
.
ahandle
;
tDebug
(
"thread:%d, response is received, type:%d contLen:%d code:0x%x"
,
pInfo
->
index
,
pMsg
->
msgType
,
pMsg
->
contLen
,
...
...
@@ -72,11 +71,12 @@ static void *sendRequest(void *param) {
rpcMsg
.
pCont
=
rpcMallocCont
(
pInfo
->
msgSize
);
rpcMsg
.
contLen
=
pInfo
->
msgSize
;
rpcMsg
.
info
.
ahandle
=
pInfo
;
rpcMsg
.
info
.
noResp
=
1
;
rpcMsg
.
msgType
=
1
;
tDebug
(
"thread:%d, send request, contLen:%d num:%d"
,
pInfo
->
index
,
pInfo
->
msgSize
,
pInfo
->
num
);
rpcSendRequest
(
pInfo
->
pRpc
,
&
pInfo
->
epSet
,
&
rpcMsg
,
NULL
);
if
(
pInfo
->
num
%
20000
==
0
)
tInfo
(
"thread:%d, %d requests have been sent"
,
pInfo
->
index
,
pInfo
->
num
);
tsem_wait
(
&
pInfo
->
rspSem
);
//
tsem_wait(&pInfo->rspSem);
}
tDebug
(
"thread:%d, it is over"
,
pInfo
->
index
);
...
...
@@ -112,7 +112,12 @@ int main(int argc, char *argv[]) {
rpcInit
.
sessions
=
100
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
user
=
"michael"
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
connLimitNum
=
10
;
rpcInit
.
connLimitLock
=
1
;
rpcInit
.
batchSize
=
16
*
1024
;
rpcInit
.
supportBatch
=
1
;
rpcDebugFlag
=
135
;
for
(
int
i
=
1
;
i
<
argc
;
++
i
)
{
...
...
@@ -148,7 +153,6 @@ int main(int argc, char *argv[]) {
exit
(
0
);
}
}
initLogEnv
();
...
...
source/util/src/terror.c
浏览文件 @
bda24a01
...
...
@@ -52,6 +52,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "Port already in use")
TAOS_DEFINE_ERROR
(
TSDB_CODE_RPC_BROKEN_LINK
,
"Conn is broken"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RPC_TIMEOUT
,
"Conn read timeout"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED
,
"some vnode/qnode/mnode(s) out of service"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RPC_MAX_SESSIONS
,
"rpc open too many session"
)
//common & util
TAOS_DEFINE_ERROR
(
TSDB_CODE_TIME_UNSYNCED
,
"Client and server's time is not synchronized"
)
...
...
tests/parallel_test/cases.task
浏览文件 @
bda24a01
...
...
@@ -301,7 +301,7 @@
,,y,script,./test.sh -f tsim/vnode/replica3_repeat.sim
,,y,script,./test.sh -f tsim/vnode/replica3_vgroup.sim
,,y,script,./test.sh -f tsim/vnode/replica3_many.sim
,,y,script,./test.sh -f tsim/vnode/replica3_import.sim
#
,,y,script,./test.sh -f tsim/vnode/replica3_import.sim
,,y,script,./test.sh -f tsim/vnode/stable_balance_replica1.sim
,,y,script,./test.sh -f tsim/vnode/stable_dnode2_stop.sim
,,y,script,./test.sh -f tsim/vnode/stable_dnode2.sim
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录