Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
1b8dc666
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
1b8dc666
编写于
6月 20, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
6月 20, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #2392 from taosdata/hotfix/rpcUdp
stop UDP/TCP connection first, then close all connections, then clean up
上级
3d67b948
a3bb1d5f
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
57 addition
and
14 deletion
+57
-14
src/rpc/inc/rpcTcp.h
src/rpc/inc/rpcTcp.h
+2
-0
src/rpc/inc/rpcUdp.h
src/rpc/inc/rpcUdp.h
+1
-0
src/rpc/src/rpcMain.c
src/rpc/src/rpcMain.c
+16
-0
src/rpc/src/rpcTcp.c
src/rpc/src/rpcTcp.c
+19
-6
src/rpc/src/rpcUdp.c
src/rpc/src/rpcUdp.c
+19
-8
未找到文件。
src/rpc/inc/rpcTcp.h
浏览文件 @
1b8dc666
...
...
@@ -21,9 +21,11 @@ extern "C" {
#endif
void
*
taosInitTcpServer
(
uint32_t
ip
,
uint16_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
);
void
taosStopTcpServer
(
void
*
param
);
void
taosCleanUpTcpServer
(
void
*
param
);
void
*
taosInitTcpClient
(
uint32_t
ip
,
uint16_t
port
,
char
*
label
,
int
num
,
void
*
fp
,
void
*
shandle
);
void
taosStopTcpClient
(
void
*
chandle
);
void
taosCleanUpTcpClient
(
void
*
chandle
);
void
*
taosOpenTcpClientConnection
(
void
*
shandle
,
void
*
thandle
,
uint32_t
ip
,
uint16_t
port
);
...
...
src/rpc/inc/rpcUdp.h
浏览文件 @
1b8dc666
...
...
@@ -23,6 +23,7 @@ extern "C" {
#include "taosdef.h"
void
*
taosInitUdpConnection
(
uint32_t
ip
,
uint16_t
port
,
char
*
label
,
int
,
void
*
fp
,
void
*
shandle
);
void
taosStopUdpConnection
(
void
*
handle
);
void
taosCleanUpUdpConnection
(
void
*
handle
);
int
taosSendUdpData
(
uint32_t
ip
,
uint16_t
port
,
void
*
data
,
int
dataLen
,
void
*
chandle
);
void
*
taosOpenUdpConnection
(
void
*
shandle
,
void
*
thandle
,
uint32_t
ip
,
uint16_t
port
);
...
...
src/rpc/src/rpcMain.c
浏览文件 @
1b8dc666
...
...
@@ -153,6 +153,13 @@ void (*taosCleanUpConn[])(void *thandle) = {
taosCleanUpTcpClient
};
void
(
*
taosStopConn
[])(
void
*
thandle
)
=
{
taosStopUdpConnection
,
taosStopUdpConnection
,
taosStopTcpServer
,
taosStopTcpClient
,
};
int
(
*
taosSendData
[])(
uint32_t
ip
,
uint16_t
port
,
void
*
data
,
int
len
,
void
*
chandle
)
=
{
taosSendUdpData
,
taosSendUdpData
,
...
...
@@ -289,12 +296,18 @@ void *rpcOpen(const SRpcInit *pInit) {
void
rpcClose
(
void
*
param
)
{
SRpcInfo
*
pRpc
=
(
SRpcInfo
*
)
param
;
// stop connection to outside first
(
*
taosStopConn
[
pRpc
->
connType
|
RPC_CONN_TCP
])(
pRpc
->
tcphandle
);
(
*
taosStopConn
[
pRpc
->
connType
])(
pRpc
->
udphandle
);
// close all connections
for
(
int
i
=
0
;
i
<
pRpc
->
sessions
;
++
i
)
{
if
(
pRpc
->
connList
&&
pRpc
->
connList
[
i
].
user
[
0
])
{
rpcCloseConn
((
void
*
)(
pRpc
->
connList
+
i
));
}
}
// clean up
(
*
taosCleanUpConn
[
pRpc
->
connType
|
RPC_CONN_TCP
])(
pRpc
->
tcphandle
);
(
*
taosCleanUpConn
[
pRpc
->
connType
])(
pRpc
->
udphandle
);
...
...
@@ -588,6 +601,7 @@ static void rpcReleaseConn(SRpcConn *pConn) {
pConn
->
inTranId
=
0
;
pConn
->
outTranId
=
0
;
pConn
->
secured
=
0
;
pConn
->
peerId
=
0
;
pConn
->
peerIp
=
0
;
pConn
->
peerPort
=
0
;
pConn
->
pReqMsg
=
NULL
;
...
...
@@ -627,6 +641,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
pConn
->
spi
=
pRpc
->
spi
;
pConn
->
encrypt
=
pRpc
->
encrypt
;
if
(
pConn
->
spi
)
memcpy
(
pConn
->
secret
,
pRpc
->
secret
,
TSDB_KEY_LEN
);
tTrace
(
"%s %p client connection is allocated"
,
pRpc
->
label
,
pConn
);
}
return
pConn
;
...
...
@@ -681,6 +696,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
}
taosHashPut
(
pRpc
->
hash
,
hashstr
,
size
,
(
char
*
)
&
pConn
,
POINTER_BYTES
);
tTrace
(
"%s %p server connection is allocated"
,
pRpc
->
label
,
pConn
);
}
return
pConn
;
...
...
src/rpc/src/rpcTcp.c
浏览文件 @
1b8dc666
...
...
@@ -190,22 +190,28 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) {
}
}
void
taosCleanUpTcpServer
(
void
*
handle
)
{
void
taosStopTcpServer
(
void
*
handle
)
{
SServerObj
*
pServerObj
=
handle
;
SThreadObj
*
pThreadObj
;
if
(
pServerObj
==
NULL
)
return
;
if
(
pServerObj
->
fd
>=
0
)
shutdown
(
pServerObj
->
fd
,
SHUT_RD
);
if
(
pServerObj
->
thread
)
pthread_join
(
pServerObj
->
thread
,
NULL
);
tTrace
(
"%s TCP server is stopped"
,
pServerObj
->
label
);
}
void
taosCleanUpTcpServer
(
void
*
handle
)
{
SServerObj
*
pServerObj
=
handle
;
SThreadObj
*
pThreadObj
;
if
(
pServerObj
==
NULL
)
return
;
for
(
int
i
=
0
;
i
<
pServerObj
->
numOfThreads
;
++
i
)
{
pThreadObj
=
pServerObj
->
pThreadObj
+
i
;
taosStopTcpThread
(
pThreadObj
);
pthread_mutex_destroy
(
&
(
pThreadObj
->
mutex
));
}
tTrace
(
"
TCP:%s,
TCP server is cleaned up"
,
pServerObj
->
label
);
tTrace
(
"
%s
TCP server is cleaned up"
,
pServerObj
->
label
);
tfree
(
pServerObj
->
pThreadObj
);
tfree
(
pServerObj
);
...
...
@@ -226,7 +232,7 @@ static void *taosAcceptTcpConnection(void *arg) {
connFd
=
accept
(
pServerObj
->
fd
,
(
struct
sockaddr
*
)
&
caddr
,
&
addrlen
);
if
(
connFd
==
-
1
)
{
if
(
errno
==
EINVAL
)
{
tTrace
(
"%s TCP server s
ocket was shutdown, exiting...
"
,
pServerObj
->
label
);
tTrace
(
"%s TCP server s
top accepting new connections, exiting
"
,
pServerObj
->
label
);
break
;
}
...
...
@@ -304,12 +310,19 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
return
pThreadObj
;
}
void
taosStopTcpClient
(
void
*
chandle
)
{
SThreadObj
*
pThreadObj
=
chandle
;
if
(
pThreadObj
==
NULL
)
return
;
tTrace
(
"%s TCP client is stopped"
,
pThreadObj
->
label
);
}
void
taosCleanUpTcpClient
(
void
*
chandle
)
{
SThreadObj
*
pThreadObj
=
chandle
;
if
(
pThreadObj
==
NULL
)
return
;
taosStopTcpThread
(
pThreadObj
);
tTrace
(
"%s
, all connections are
cleaned up"
,
pThreadObj
->
label
);
tTrace
(
"%s
TCP client is
cleaned up"
,
pThreadObj
->
label
);
tfree
(
pThreadObj
);
}
...
...
src/rpc/src/rpcUdp.c
浏览文件 @
1b8dc666
...
...
@@ -30,7 +30,6 @@
#define RPC_MAX_UDP_SIZE 65480
typedef
struct
{
void
*
signature
;
int
index
;
int
fd
;
uint16_t
port
;
// peer port
...
...
@@ -111,7 +110,6 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
pConn
->
processData
=
fp
;
pConn
->
index
=
i
;
pConn
->
pSet
=
pSet
;
pConn
->
signature
=
pConn
;
int
code
=
pthread_create
(
&
pConn
->
thread
,
&
thAttr
,
taosRecvUdpData
,
pConn
);
if
(
code
!=
0
)
{
...
...
@@ -132,7 +130,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
return
pSet
;
}
void
taos
CleanU
pUdpConnection
(
void
*
handle
)
{
void
taos
Sto
pUdpConnection
(
void
*
handle
)
{
SUdpConnSet
*
pSet
=
(
SUdpConnSet
*
)
handle
;
SUdpConn
*
pConn
;
...
...
@@ -140,8 +138,6 @@ void taosCleanUpUdpConnection(void *handle) {
for
(
int
i
=
0
;
i
<
pSet
->
threads
;
++
i
)
{
pConn
=
pSet
->
udpConn
+
i
;
pConn
->
signature
=
NULL
;
if
(
pConn
->
fd
>=
0
)
shutdown
(
pConn
->
fd
,
SHUT_RDWR
);
if
(
pConn
->
fd
>=
0
)
taosCloseSocket
(
pConn
->
fd
);
}
...
...
@@ -150,9 +146,24 @@ void taosCleanUpUdpConnection(void *handle) {
pConn
=
pSet
->
udpConn
+
i
;
if
(
pConn
->
thread
)
pthread_join
(
pConn
->
thread
,
NULL
);
tfree
(
pConn
->
buffer
);
tTrace
(
"%s UDP thread is closed, inedx:%d"
,
pConn
->
label
,
i
);
// tTrace("%s UDP thread is closed, index:%d", pConn->label, i);
}
tTrace
(
"%s UDP is stopped"
,
pSet
->
label
);
}
void
taosCleanUpUdpConnection
(
void
*
handle
)
{
SUdpConnSet
*
pSet
=
(
SUdpConnSet
*
)
handle
;
SUdpConn
*
pConn
;
if
(
pSet
==
NULL
)
return
;
for
(
int
i
=
0
;
i
<
pSet
->
threads
;
++
i
)
{
pConn
=
pSet
->
udpConn
+
i
;
if
(
pConn
->
fd
>=
0
)
taosCloseSocket
(
pConn
->
fd
);
}
tTrace
(
"%s UDP is cleaned up"
,
pSet
->
label
);
tfree
(
pSet
);
}
...
...
@@ -185,7 +196,7 @@ static void *taosRecvUdpData(void *param) {
while
(
1
)
{
dataLen
=
recvfrom
(
pConn
->
fd
,
pConn
->
buffer
,
RPC_MAX_UDP_SIZE
,
0
,
(
struct
sockaddr
*
)
&
sourceAdd
,
&
addLen
);
if
(
dataLen
<=
0
)
{
tTrace
(
"%s UDP socket was closed, exiting
"
,
pConn
->
label
);
tTrace
(
"%s UDP socket was closed, exiting
(%s)"
,
pConn
->
label
,
strerror
(
errno
)
);
break
;
}
...
...
@@ -221,7 +232,7 @@ static void *taosRecvUdpData(void *param) {
int
taosSendUdpData
(
uint32_t
ip
,
uint16_t
port
,
void
*
data
,
int
dataLen
,
void
*
chandle
)
{
SUdpConn
*
pConn
=
(
SUdpConn
*
)
chandle
;
if
(
pConn
==
NULL
||
pConn
->
signature
!=
pConn
)
return
-
1
;
if
(
pConn
==
NULL
)
return
-
1
;
struct
sockaddr_in
destAdd
;
memset
(
&
destAdd
,
0
,
sizeof
(
destAdd
));
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录