Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
908aa37a
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
908aa37a
编写于
5月 18, 2020
作者:
陶建辉(Jeff)
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add new macro TSDB_EP_LEN
上级
499170d3
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
43 addition
and
428 deletion
+43
-428
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+2
-2
src/common/src/tglobal.c
src/common/src/tglobal.c
+7
-7
src/dnode/src/dnodeMgmt.c
src/dnode/src/dnodeMgmt.c
+1
-1
src/inc/taosdef.h
src/inc/taosdef.h
+2
-1
src/inc/taosmsg.h
src/inc/taosmsg.h
+5
-5
src/mnode/inc/mgmtDef.h
src/mnode/inc/mgmtDef.h
+1
-1
src/plugins/monitor/src/monitorMain.c
src/plugins/monitor/src/monitorMain.c
+1
-1
src/rpc/inc/rpcHaship.h
src/rpc/inc/rpcHaship.h
+0
-33
src/rpc/src/rpcHaship.c
src/rpc/src/rpcHaship.c
+0
-167
src/rpc/src/rpcUdp.c
src/rpc/src/rpcUdp.c
+24
-210
未找到文件。
src/client/src/tscUtil.c
浏览文件 @
908aa37a
...
...
@@ -2169,7 +2169,7 @@ int tscSetMgmtIpListFromCfg(const char *first, const char *second) {
tscMgmtIpSet
.
inUse
=
0
;
if
(
first
&&
first
[
0
]
!=
0
)
{
if
(
strlen
(
first
)
>=
TSDB_
FQDN
_LEN
)
{
if
(
strlen
(
first
)
>=
TSDB_
EP
_LEN
)
{
terrno
=
TSDB_CODE_INVALID_FQDN
;
return
-
1
;
}
...
...
@@ -2178,7 +2178,7 @@ int tscSetMgmtIpListFromCfg(const char *first, const char *second) {
}
if
(
second
&&
second
[
0
]
!=
0
)
{
if
(
strlen
(
second
)
>=
TSDB_
FQDN
_LEN
)
{
if
(
strlen
(
second
)
>=
TSDB_
EP
_LEN
)
{
terrno
=
TSDB_CODE_INVALID_FQDN
;
return
-
1
;
}
...
...
src/common/src/tglobal.c
浏览文件 @
908aa37a
...
...
@@ -61,10 +61,10 @@ int32_t tscEmbedded = 0;
*/
int64_t
tsMsPerDay
[]
=
{
86400000L
,
86400000000L
};
char
tsFirst
[
TSDB_
FQDN
_LEN
]
=
{
0
};
char
tsSecond
[
TSDB_
FQDN
_LEN
]
=
{
0
};
char
tsArbitrator
[
TSDB_
FQDN
_LEN
]
=
{
0
};
char
tsLocalEp
[
TSDB_
FQDN
_LEN
]
=
{
0
};
// Local End Point, hostname:port
char
tsFirst
[
TSDB_
EP
_LEN
]
=
{
0
};
char
tsSecond
[
TSDB_
EP
_LEN
]
=
{
0
};
char
tsArbitrator
[
TSDB_
EP
_LEN
]
=
{
0
};
char
tsLocalEp
[
TSDB_
EP
_LEN
]
=
{
0
};
// Local End Point, hostname:port
uint16_t
tsServerPort
=
6030
;
uint16_t
tsDnodeShellPort
=
6030
;
// udp[6035-6039] tcp[6035]
uint16_t
tsDnodeDnodePort
=
6035
;
// udp/tcp
...
...
@@ -284,7 +284,7 @@ static void doInitGlobalConfig() {
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_CLIENT
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
0
;
cfg
.
ptrLength
=
TSDB_
FQDN
_LEN
;
cfg
.
ptrLength
=
TSDB_
EP
_LEN
;
cfg
.
unitType
=
TAOS_CFG_UTYPE_NONE
;
taosInitConfigOption
(
cfg
);
...
...
@@ -294,7 +294,7 @@ static void doInitGlobalConfig() {
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_CLIENT
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
0
;
cfg
.
ptrLength
=
TSDB_
FQDN
_LEN
;
cfg
.
ptrLength
=
TSDB_
EP
_LEN
;
cfg
.
unitType
=
TAOS_CFG_UTYPE_NONE
;
taosInitConfigOption
(
cfg
);
...
...
@@ -356,7 +356,7 @@ static void doInitGlobalConfig() {
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_CLIENT
;
cfg
.
minValue
=
0
;
cfg
.
maxValue
=
0
;
cfg
.
ptrLength
=
TSDB_
FQDN
_LEN
;
cfg
.
ptrLength
=
TSDB_
EP
_LEN
;
cfg
.
unitType
=
TAOS_CFG_UTYPE_NONE
;
taosInitConfigOption
(
cfg
);
...
...
src/dnode/src/dnodeMgmt.c
浏览文件 @
908aa37a
...
...
@@ -411,7 +411,7 @@ static bool dnodeReadMnodeInfos() {
dError
(
"failed to read mnode mgmtIpList.json, nodeName not found"
);
goto
PARSE_OVER
;
}
strncpy
(
tsMnodeInfos
.
nodeInfos
[
i
].
nodeEp
,
nodeEp
->
valuestring
,
TSDB_
FQDN
_LEN
);
strncpy
(
tsMnodeInfos
.
nodeInfos
[
i
].
nodeEp
,
nodeEp
->
valuestring
,
TSDB_
EP
_LEN
);
}
ret
=
true
;
...
...
src/inc/taosdef.h
浏览文件 @
908aa37a
...
...
@@ -218,7 +218,8 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_LOCALE_LEN 64
#define TSDB_TIMEZONE_LEN 64
#define TSDB_FQDN_LEN 256
#define TSDB_FQDN_LEN 128
#define TSDB_EP_LEN (TSDB_FQDN_LEN+6)
#define TSDB_IPv4ADDR_LEN 16
#define TSDB_FILENAME_LEN 128
#define TSDB_METER_VNODE_BITS 20
...
...
src/inc/taosmsg.h
浏览文件 @
908aa37a
...
...
@@ -530,7 +530,7 @@ typedef struct {
typedef
struct
{
int32_t
nodeId
;
char
nodeEp
[
TSDB_
FQDN
_LEN
];
char
nodeEp
[
TSDB_
EP
_LEN
];
}
SDMMnodeInfo
;
typedef
struct
{
...
...
@@ -542,7 +542,7 @@ typedef struct {
typedef
struct
{
uint32_t
version
;
int32_t
dnodeId
;
char
dnodeEp
[
TSDB_
FQDN
_LEN
];
char
dnodeEp
[
TSDB_
EP
_LEN
];
uint32_t
moduleStatus
;
uint32_t
lastReboot
;
// time stamp for last reboot
uint16_t
numOfTotalVnodes
;
// from config file
...
...
@@ -584,7 +584,7 @@ typedef struct {
typedef
struct
{
int32_t
nodeId
;
char
nodeEp
[
TSDB_
FQDN
_LEN
];
char
nodeEp
[
TSDB_
EP
_LEN
];
}
SMDVnodeDesc
;
typedef
struct
{
...
...
@@ -669,7 +669,7 @@ typedef struct SCMShowRsp {
}
SCMShowRsp
;
typedef
struct
{
char
ep
[
TSDB_
FQDN
_LEN
];
// end point, hostname:port
char
ep
[
TSDB_
EP
_LEN
];
// end point, hostname:port
}
SCMCreateDnodeMsg
,
SCMDropDnodeMsg
;
typedef
struct
{
...
...
@@ -684,7 +684,7 @@ typedef struct {
}
SDMConfigVnodeMsg
;
typedef
struct
{
char
ep
[
TSDB_
FQDN
_LEN
];
// end point, hostname:port
char
ep
[
TSDB_
EP
_LEN
];
// end point, hostname:port
char
config
[
64
];
}
SMDCfgDnodeMsg
,
SCMCfgDnodeMsg
;
...
...
src/mnode/inc/mgmtDef.h
浏览文件 @
908aa37a
...
...
@@ -33,7 +33,7 @@ typedef struct SDnodeObj {
int32_t
dnodeId
;
uint16_t
dnodePort
;
char
dnodeFqdn
[
TSDB_FQDN_LEN
+
1
];
char
dnodeEp
[
TSDB_
FQDN
_LEN
+
1
];
char
dnodeEp
[
TSDB_
EP
_LEN
+
1
];
int64_t
createdTime
;
uint32_t
lastAccess
;
int32_t
openVnodes
;
...
...
src/plugins/monitor/src/monitorMain.c
浏览文件 @
908aa37a
...
...
@@ -68,7 +68,7 @@ typedef enum {
typedef
struct
{
void
*
conn
;
void
*
timer
;
char
ep
[
TSDB_
FQDN
_LEN
];
char
ep
[
TSDB_
EP
_LEN
];
int8_t
cmdIndex
;
int8_t
state
;
char
sql
[
SQL_LENGTH
];
...
...
src/rpc/inc/rpcHaship.h
已删除
100644 → 0
浏览文件 @
499170d3
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _rpc_hash_ip_header_
#define _rpc_hash_ip_header_
#ifdef __cplusplus
extern
"C"
{
#endif
void
*
rpcOpenIpHash
(
int
maxSessions
);
void
rpcCloseIpHash
(
void
*
handle
);
void
*
rpcAddIpHash
(
void
*
handle
,
void
*
pData
,
uint32_t
ip
,
uint16_t
port
);
void
rpcDeleteIpHash
(
void
*
handle
,
uint32_t
ip
,
uint16_t
port
);
void
*
rpcGetIpHash
(
void
*
handle
,
uint32_t
ip
,
uint16_t
port
);
#ifdef __cplusplus
}
#endif
#endif
src/rpc/src/rpcHaship.c
已删除
100644 → 0
浏览文件 @
499170d3
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tmempool.h"
#include "rpcLog.h"
typedef
struct
SIpHash
{
uint32_t
ip
;
uint16_t
port
;
int
hash
;
struct
SIpHash
*
prev
;
struct
SIpHash
*
next
;
void
*
data
;
}
SIpHash
;
typedef
struct
{
SIpHash
**
ipHashList
;
mpool_h
ipHashMemPool
;
int
maxSessions
;
}
SHashObj
;
int
rpcHashIp
(
void
*
handle
,
uint32_t
ip
,
uint16_t
port
)
{
SHashObj
*
pObj
=
(
SHashObj
*
)
handle
;
int
hash
=
0
;
hash
=
(
int
)(
ip
>>
16
);
hash
+=
(
unsigned
short
)(
ip
&
0xFFFF
);
hash
+=
port
;
hash
=
hash
%
pObj
->
maxSessions
;
return
hash
;
}
void
*
rpcAddIpHash
(
void
*
handle
,
void
*
data
,
uint32_t
ip
,
uint16_t
port
)
{
int
hash
;
SIpHash
*
pNode
;
SHashObj
*
pObj
;
pObj
=
(
SHashObj
*
)
handle
;
if
(
pObj
==
NULL
||
pObj
->
maxSessions
==
0
)
return
NULL
;
hash
=
rpcHashIp
(
pObj
,
ip
,
port
);
pNode
=
(
SIpHash
*
)
taosMemPoolMalloc
(
pObj
->
ipHashMemPool
);
pNode
->
ip
=
ip
;
pNode
->
port
=
port
;
pNode
->
data
=
data
;
pNode
->
prev
=
0
;
pNode
->
next
=
pObj
->
ipHashList
[
hash
];
pNode
->
hash
=
hash
;
if
(
pObj
->
ipHashList
[
hash
]
!=
0
)
(
pObj
->
ipHashList
[
hash
])
->
prev
=
pNode
;
pObj
->
ipHashList
[
hash
]
=
pNode
;
return
pObj
;
}
void
rpcDeleteIpHash
(
void
*
handle
,
uint32_t
ip
,
uint16_t
port
)
{
int
hash
;
SIpHash
*
pNode
;
SHashObj
*
pObj
;
pObj
=
(
SHashObj
*
)
handle
;
if
(
pObj
==
NULL
||
pObj
->
maxSessions
==
0
)
return
;
hash
=
rpcHashIp
(
pObj
,
ip
,
port
);
pNode
=
pObj
->
ipHashList
[
hash
];
while
(
pNode
)
{
if
(
pNode
->
ip
==
ip
&&
pNode
->
port
==
port
)
break
;
pNode
=
pNode
->
next
;
}
if
(
pNode
)
{
if
(
pNode
->
prev
)
{
pNode
->
prev
->
next
=
pNode
->
next
;
}
else
{
pObj
->
ipHashList
[
hash
]
=
pNode
->
next
;
}
if
(
pNode
->
next
)
{
pNode
->
next
->
prev
=
pNode
->
prev
;
}
taosMemPoolFree
(
pObj
->
ipHashMemPool
,
(
char
*
)
pNode
);
}
}
void
*
rpcGetIpHash
(
void
*
handle
,
uint32_t
ip
,
uint16_t
port
)
{
int
hash
;
SIpHash
*
pNode
;
SHashObj
*
pObj
;
pObj
=
(
SHashObj
*
)
handle
;
if
(
pObj
==
NULL
||
pObj
->
maxSessions
==
0
)
return
NULL
;
hash
=
rpcHashIp
(
pObj
,
ip
,
port
);
pNode
=
pObj
->
ipHashList
[
hash
];
while
(
pNode
)
{
if
(
pNode
->
ip
==
ip
&&
pNode
->
port
==
port
)
{
break
;
}
pNode
=
pNode
->
next
;
}
if
(
pNode
)
{
return
pNode
->
data
;
}
return
NULL
;
}
void
*
rpcOpenIpHash
(
int
maxSessions
)
{
SIpHash
**
ipHashList
;
mpool_h
ipHashMemPool
;
SHashObj
*
pObj
;
ipHashMemPool
=
taosMemPoolInit
(
maxSessions
,
sizeof
(
SIpHash
));
if
(
ipHashMemPool
==
0
)
return
NULL
;
ipHashList
=
calloc
(
sizeof
(
SIpHash
*
),
(
size_t
)
maxSessions
);
if
(
ipHashList
==
0
)
{
taosMemPoolCleanUp
(
ipHashMemPool
);
return
NULL
;
}
pObj
=
malloc
(
sizeof
(
SHashObj
));
if
(
pObj
==
NULL
)
{
taosMemPoolCleanUp
(
ipHashMemPool
);
free
(
ipHashList
);
return
NULL
;
}
pObj
->
maxSessions
=
maxSessions
;
pObj
->
ipHashMemPool
=
ipHashMemPool
;
pObj
->
ipHashList
=
ipHashList
;
return
pObj
;
}
void
rpcCloseIpHash
(
void
*
handle
)
{
SHashObj
*
pObj
;
pObj
=
(
SHashObj
*
)
handle
;
if
(
pObj
==
NULL
||
pObj
->
maxSessions
==
0
)
return
;
if
(
pObj
->
ipHashMemPool
)
taosMemPoolCleanUp
(
pObj
->
ipHashMemPool
);
if
(
pObj
->
ipHashList
)
free
(
pObj
->
ipHashList
);
memset
(
pObj
,
0
,
sizeof
(
SHashObj
));
free
(
pObj
);
}
src/rpc/src/rpcUdp.c
浏览文件 @
908aa37a
...
...
@@ -19,7 +19,6 @@
#include "ttimer.h"
#include "tutil.h"
#include "rpcLog.h"
#include "rpcHaship.h"
#include "rpcUdp.h"
#include "rpcHead.h"
...
...
@@ -28,8 +27,6 @@
#define RPC_UDP_BUF_TIME 5 // mseconds
#define RPC_MAX_UDP_SIZE 65480
int
tsUdpDelay
=
0
;
typedef
struct
{
void
*
signature
;
int
index
;
...
...
@@ -38,8 +35,6 @@ typedef struct {
uint16_t
localPort
;
// local port
char
label
[
12
];
// copy from udpConnSet;
pthread_t
thread
;
pthread_mutex_t
mutex
;
void
*
tmrCtrl
;
// copy from UdpConnSet;
void
*
hash
;
void
*
shandle
;
// handle passed by upper layer during server initialization
void
*
pSet
;
...
...
@@ -55,26 +50,11 @@ typedef struct {
void
*
shandle
;
// handle passed by upper layer during server initialization
int
threads
;
char
label
[
12
];
void
*
tmrCtrl
;
void
*
(
*
fp
)(
SRecvInfo
*
pPacket
);
SUdpConn
udpConn
[];
}
SUdpConnSet
;
typedef
struct
{
void
*
signature
;
uint32_t
ip
;
// dest IP
uint16_t
port
;
// dest Port
SUdpConn
*
pConn
;
struct
sockaddr_in
destAdd
;
void
*
msgHdr
;
int
totalLen
;
void
*
timer
;
int
emptyNum
;
}
SUdpBuf
;
static
void
*
taosRecvUdpData
(
void
*
param
);
static
SUdpBuf
*
taosCreateUdpBuf
(
SUdpConn
*
pConn
,
uint32_t
ip
,
uint16_t
port
);
static
void
taosProcessUdpBufTimer
(
void
*
param
,
void
*
tmrId
);
void
*
taosInitUdpConnection
(
uint32_t
ip
,
uint16_t
port
,
char
*
label
,
int
threads
,
void
*
fp
,
void
*
shandle
)
{
SUdpConn
*
pConn
;
...
...
@@ -94,16 +74,6 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
pSet
->
fp
=
fp
;
strcpy
(
pSet
->
label
,
label
);
if
(
tsUdpDelay
)
{
char
udplabel
[
12
];
sprintf
(
udplabel
,
"%s.b"
,
label
);
pSet
->
tmrCtrl
=
taosTmrInit
(
RPC_MAX_UDP_CONNS
*
threads
,
5
,
5000
,
udplabel
);
if
(
pSet
->
tmrCtrl
==
NULL
)
{
tError
(
"%s failed to initialize tmrCtrl"
)
taosCleanUpUdpConnection
(
pSet
);
return
NULL
;
}
}
uint16_t
ownPort
;
for
(
int
i
=
0
;
i
<
threads
;
++
i
)
{
pConn
=
pSet
->
udpConn
+
i
;
...
...
@@ -135,11 +105,6 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
pConn
->
index
=
i
;
pConn
->
pSet
=
pSet
;
pConn
->
signature
=
pConn
;
if
(
tsUdpDelay
)
{
pConn
->
hash
=
rpcOpenIpHash
(
RPC_MAX_UDP_CONNS
);
pthread_mutex_init
(
&
pConn
->
mutex
,
NULL
);
pConn
->
tmrCtrl
=
pSet
->
tmrCtrl
;
}
pthread_attr_t
thAttr
;
pthread_attr_init
(
&
thAttr
);
...
...
@@ -173,10 +138,6 @@ void taosCleanUpUdpConnection(void *handle) {
free
(
pConn
->
buffer
);
pthread_cancel
(
pConn
->
thread
);
taosCloseSocket
(
pConn
->
fd
);
if
(
pConn
->
hash
)
{
rpcCloseIpHash
(
pConn
->
hash
);
pthread_mutex_destroy
(
&
pConn
->
mutex
);
}
}
for
(
int
i
=
0
;
i
<
pSet
->
threads
;
++
i
)
{
...
...
@@ -185,7 +146,6 @@ void taosCleanUpUdpConnection(void *handle) {
tTrace
(
"chandle:%p is closed"
,
pConn
);
}
taosTmrCleanUp
(
pSet
->
tmrCtrl
);
tfree
(
pSet
);
}
...
...
@@ -205,64 +165,42 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t
static
void
*
taosRecvUdpData
(
void
*
param
)
{
SUdpConn
*
pConn
=
param
;
struct
sockaddr_in
sourceAdd
;
int
dataLen
;
ssize_t
dataLen
;
unsigned
int
addLen
;
uint16_t
port
;
int
minSize
=
sizeof
(
SRpcHead
);
SRecvInfo
recvInfo
;
memset
(
&
sourceAdd
,
0
,
sizeof
(
sourceAdd
));
addLen
=
sizeof
(
sourceAdd
);
tTrace
(
"%s UDP thread is created, index:%d"
,
pConn
->
label
,
pConn
->
index
);
char
*
msg
=
pConn
->
buffer
;
while
(
1
)
{
dataLen
=
recvfrom
(
pConn
->
fd
,
pConn
->
buffer
,
RPC_MAX_UDP_SIZE
,
0
,
(
struct
sockaddr
*
)
&
sourceAdd
,
&
addLen
);
port
=
ntohs
(
sourceAdd
.
sin_port
);
//tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, port, dataLen);
if
(
dataLen
<
sizeof
(
SRpcHead
))
{
tError
(
"%s recvfrom failed, reason:%s
\n
"
,
pConn
->
label
,
strerror
(
errno
));
continue
;
}
int
processedLen
=
0
,
leftLen
=
0
;
int
msgLen
=
0
;
int
count
=
0
;
char
*
msg
=
pConn
->
buffer
;
while
(
processedLen
<
dataLen
)
{
leftLen
=
dataLen
-
processedLen
;
SRpcHead
*
pHead
=
(
SRpcHead
*
)
msg
;
msgLen
=
htonl
((
uint32_t
)
pHead
->
msgLen
);
if
(
leftLen
<
minSize
||
msgLen
>
leftLen
||
msgLen
<
minSize
)
{
tError
(
"%s msg is messed up, dataLen:%d processedLen:%d count:%d msgLen:%d"
,
pConn
->
label
,
dataLen
,
processedLen
,
count
,
msgLen
);
break
;
}
char
*
tmsg
=
malloc
((
size_t
)
msgLen
+
tsRpcOverhead
);
if
(
NULL
==
tmsg
)
{
tError
(
"%s failed to allocate memory, size:%d"
,
pConn
->
label
,
msgLen
);
break
;
}
tmsg
+=
tsRpcOverhead
;
// overhead for SRpcReqContext
memcpy
(
tmsg
,
msg
,
(
size_t
)
msgLen
);
recvInfo
.
msg
=
tmsg
;
recvInfo
.
msgLen
=
msgLen
;
recvInfo
.
ip
=
sourceAdd
.
sin_addr
.
s_addr
;
recvInfo
.
port
=
port
;
recvInfo
.
shandle
=
pConn
->
shandle
;
recvInfo
.
thandle
=
NULL
;
recvInfo
.
chandle
=
pConn
;
recvInfo
.
connType
=
0
;
(
*
(
pConn
->
processData
))(
&
recvInfo
);
processedLen
+=
msgLen
;
msg
+=
msgLen
;
count
++
;
char
*
tmsg
=
malloc
(
dataLen
+
tsRpcOverhead
);
if
(
NULL
==
tmsg
)
{
tError
(
"%s failed to allocate memory, size:%d"
,
pConn
->
label
,
dataLen
);
continue
;
}
// tTrace("%s %d UDP packets are received together", pConn->label, count);
tmsg
+=
tsRpcOverhead
;
// overhead for SRpcReqContext
memcpy
(
tmsg
,
msg
,
dataLen
);
recvInfo
.
msg
=
tmsg
;
recvInfo
.
msgLen
=
dataLen
;
recvInfo
.
ip
=
sourceAdd
.
sin_addr
.
s_addr
;
recvInfo
.
port
=
port
;
recvInfo
.
shandle
=
pConn
->
shandle
;
recvInfo
.
thandle
=
NULL
;
recvInfo
.
chandle
=
pConn
;
recvInfo
.
connType
=
0
;
(
*
(
pConn
->
processData
))(
&
recvInfo
);
}
return
NULL
;
...
...
@@ -270,141 +208,17 @@ static void *taosRecvUdpData(void *param) {
int
taosSendUdpData
(
uint32_t
ip
,
uint16_t
port
,
void
*
data
,
int
dataLen
,
void
*
chandle
)
{
SUdpConn
*
pConn
=
(
SUdpConn
*
)
chandle
;
SUdpBuf
*
pBuf
;
if
(
pConn
==
NULL
||
pConn
->
signature
!=
pConn
)
return
-
1
;
if
(
pConn
->
hash
==
NULL
)
{
struct
sockaddr_in
destAdd
;
memset
(
&
destAdd
,
0
,
sizeof
(
destAdd
));
destAdd
.
sin_family
=
AF_INET
;
destAdd
.
sin_addr
.
s_addr
=
ip
;
destAdd
.
sin_port
=
htons
(
port
);
//tTrace("%s msg is sent to 0x%x:%hu len:%d ret:%d localPort:%hu chandle:0x%x", pConn->label, destAdd.sin_addr.s_addr,
// port, dataLen, ret, pConn->localPort, chandle);
int
ret
=
(
int
)
sendto
(
pConn
->
fd
,
data
,
(
size_t
)
dataLen
,
0
,
(
struct
sockaddr
*
)
&
destAdd
,
sizeof
(
destAdd
));
return
ret
;
}
pthread_mutex_lock
(
&
pConn
->
mutex
);
pBuf
=
(
SUdpBuf
*
)
rpcGetIpHash
(
pConn
->
hash
,
ip
,
port
);
if
(
pBuf
==
NULL
)
{
pBuf
=
taosCreateUdpBuf
(
pConn
,
ip
,
port
);
rpcAddIpHash
(
pConn
->
hash
,
pBuf
,
ip
,
port
);
}
if
((
pBuf
->
totalLen
+
dataLen
>
RPC_MAX_UDP_SIZE
)
||
(
taosMsgHdrSize
(
pBuf
->
msgHdr
)
>=
RPC_MAX_UDP_PKTS
))
{
taosTmrReset
(
taosProcessUdpBufTimer
,
RPC_UDP_BUF_TIME
,
pBuf
,
pConn
->
tmrCtrl
,
&
pBuf
->
timer
);
taosSendMsgHdr
(
pBuf
->
msgHdr
,
pConn
->
fd
);
pBuf
->
totalLen
=
0
;
}
taosSetMsgHdrData
(
pBuf
->
msgHdr
,
data
,
dataLen
);
pBuf
->
totalLen
+=
dataLen
;
pthread_mutex_unlock
(
&
pConn
->
mutex
);
return
dataLen
;
}
void
taosFreeMsgHdr
(
void
*
hdr
)
{
struct
msghdr
*
msgHdr
=
(
struct
msghdr
*
)
hdr
;
free
(
msgHdr
->
msg_iov
);
}
int
taosMsgHdrSize
(
void
*
hdr
)
{
struct
msghdr
*
msgHdr
=
(
struct
msghdr
*
)
hdr
;
return
(
int
)
msgHdr
->
msg_iovlen
;
}
void
taosSendMsgHdr
(
void
*
hdr
,
int
fd
)
{
struct
msghdr
*
msgHdr
=
(
struct
msghdr
*
)
hdr
;
sendmsg
(
fd
,
msgHdr
,
0
);
msgHdr
->
msg_iovlen
=
0
;
}
void
taosInitMsgHdr
(
void
**
hdr
,
void
*
dest
,
int
maxPkts
)
{
struct
msghdr
*
msgHdr
=
(
struct
msghdr
*
)
malloc
(
sizeof
(
struct
msghdr
));
memset
(
msgHdr
,
0
,
sizeof
(
struct
msghdr
));
*
hdr
=
msgHdr
;
struct
sockaddr_in
*
destAdd
=
(
struct
sockaddr_in
*
)
dest
;
msgHdr
->
msg_name
=
destAdd
;
msgHdr
->
msg_namelen
=
sizeof
(
struct
sockaddr_in
);
int
size
=
(
int
)
sizeof
(
struct
iovec
)
*
maxPkts
;
msgHdr
->
msg_iov
=
(
struct
iovec
*
)
malloc
((
size_t
)
size
);
memset
(
msgHdr
->
msg_iov
,
0
,
(
size_t
)
size
);
}
void
taosSetMsgHdrData
(
void
*
hdr
,
char
*
data
,
int
dataLen
)
{
struct
msghdr
*
msgHdr
=
(
struct
msghdr
*
)
hdr
;
msgHdr
->
msg_iov
[
msgHdr
->
msg_iovlen
].
iov_base
=
data
;
msgHdr
->
msg_iov
[
msgHdr
->
msg_iovlen
].
iov_len
=
(
size_t
)
dataLen
;
msgHdr
->
msg_iovlen
++
;
}
void
taosRemoveUdpBuf
(
SUdpBuf
*
pBuf
)
{
taosTmrStopA
(
&
pBuf
->
timer
);
rpcDeleteIpHash
(
pBuf
->
pConn
->
hash
,
pBuf
->
ip
,
pBuf
->
port
);
// tTrace("%s UDP buffer to:0x%lld:%d is removed", pBuf->pConn->label,
// pBuf->ip, pBuf->port);
pBuf
->
signature
=
NULL
;
taosFreeMsgHdr
(
pBuf
->
msgHdr
);
free
(
pBuf
);
}
void
taosProcessUdpBufTimer
(
void
*
param
,
void
*
tmrId
)
{
SUdpBuf
*
pBuf
=
(
SUdpBuf
*
)
param
;
if
(
pBuf
->
signature
!=
param
)
return
;
if
(
pBuf
->
timer
!=
tmrId
)
return
;
SUdpConn
*
pConn
=
pBuf
->
pConn
;
pthread_mutex_lock
(
&
pConn
->
mutex
);
if
(
taosMsgHdrSize
(
pBuf
->
msgHdr
)
>
0
)
{
taosSendMsgHdr
(
pBuf
->
msgHdr
,
pConn
->
fd
);
pBuf
->
totalLen
=
0
;
pBuf
->
emptyNum
=
0
;
}
else
{
pBuf
->
emptyNum
++
;
if
(
pBuf
->
emptyNum
>
200
)
{
taosRemoveUdpBuf
(
pBuf
);
pBuf
=
NULL
;
}
}
pthread_mutex_unlock
(
&
pConn
->
mutex
);
if
(
pBuf
)
taosTmrReset
(
taosProcessUdpBufTimer
,
RPC_UDP_BUF_TIME
,
pBuf
,
pConn
->
tmrCtrl
,
&
pBuf
->
timer
);
}
static
SUdpBuf
*
taosCreateUdpBuf
(
SUdpConn
*
pConn
,
uint32_t
ip
,
uint16_t
port
)
{
SUdpBuf
*
pBuf
=
(
SUdpBuf
*
)
malloc
(
sizeof
(
SUdpBuf
));
memset
(
pBuf
,
0
,
sizeof
(
SUdpBuf
));
pBuf
->
ip
=
ip
;
pBuf
->
port
=
port
;
pBuf
->
pConn
=
pConn
;
pBuf
->
destAdd
.
sin_family
=
AF_INET
;
pBuf
->
destAdd
.
sin_addr
.
s_addr
=
ip
;
pBuf
->
destAdd
.
sin_port
=
(
uint16_t
)
htons
(
port
);
taosInitMsgHdr
(
&
(
pBuf
->
msgHdr
),
&
(
pBuf
->
destAdd
),
RPC_MAX_UDP_PKTS
);
pBuf
->
signature
=
pBuf
;
taosTmrReset
(
taosProcessUdpBufTimer
,
RPC_UDP_BUF_TIME
,
pBuf
,
pConn
->
tmrCtrl
,
&
pBuf
->
timer
);
struct
sockaddr_in
destAdd
;
memset
(
&
destAdd
,
0
,
sizeof
(
destAdd
));
destAdd
.
sin_family
=
AF_INET
;
destAdd
.
sin_addr
.
s_addr
=
ip
;
destAdd
.
sin_port
=
htons
(
port
);
// tTrace("%s UDP buffer to:0x%lld:%d is created", pBuf->pConn->label,
// pBuf->ip, pBuf->port);
int
ret
=
(
int
)
sendto
(
pConn
->
fd
,
data
,
(
size_t
)
dataLen
,
0
,
(
struct
sockaddr
*
)
&
destAdd
,
sizeof
(
destAdd
));
return
pBuf
;
return
ret
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录