Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
3e5ba2a7
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
3e5ba2a7
编写于
8月 24, 2019
作者:
J
Jeff Tao
提交者:
GitHub
8月 24, 2019
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #383 from localvar/udp-fix
fix & enhancement for udp connection handling
上级
a9ad5678
01bb110b
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
28 addition
and
12 deletion
+28
-12
src/rpc/src/tudp.c
src/rpc/src/tudp.c
+28
-12
未找到文件。
src/rpc/src/tudp.c
浏览文件 @
3e5ba2a7
...
...
@@ -137,9 +137,7 @@ void taosProcessMonitorTimer(void *param, void *tmrId) {
tTrace
(
"%s monitor timer is expired, update the link status"
,
pSet
->
label
);
(
*
pSet
->
fp
)(
data
,
pMonitor
->
dataLen
,
pMonitor
->
ip
,
0
,
pSet
->
shandle
,
NULL
,
NULL
);
taosTmrReset
(
taosProcessMonitorTimer
,
200
,
pMonitor
,
pSet
->
tmrCtrl
,
&
pMonitor
->
pTimer
);
}
if
(
pMonitor
->
pSet
==
NULL
)
{
}
else
{
taosTmrStopA
(
&
pMonitor
->
pTimer
);
free
(
pMonitor
);
}
...
...
@@ -181,6 +179,7 @@ void *taosReadTcpData(void *argv) {
if
(
retLen
!=
pInfo
->
msgLen
)
{
tError
(
"%s failed to read data from server, msgLen:%d retLen:%d"
,
pSet
->
label
,
pInfo
->
msgLen
,
retLen
);
free
(
buffer
);
}
else
{
(
*
pSet
->
fp
)(
buffer
,
pInfo
->
msgLen
,
pMonitor
->
ip
,
(
int16_t
)
pInfo
->
port
,
pSet
->
shandle
,
NULL
,
pMonitor
->
pConn
);
}
...
...
@@ -214,9 +213,11 @@ int taosReceivePacketViaTcp(uint32_t ip, STaosHeader *pHead, SUdpConn *pConn) {
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_DETACHED
);
code
=
pthread_create
(
&
(
thread
),
&
thattr
,
taosReadTcpData
,
(
void
*
)
pMonitor
);
if
(
code
<
0
)
{
tTrace
(
"%s faile to create thread to read tcp data, reason:%s"
,
pSet
->
label
,
strerror
(
errno
));
tTrace
(
"%s failed to create thread to read tcp data, reason:%s"
,
pSet
->
label
,
strerror
(
errno
));
pMonitor
->
pSet
=
NULL
;
}
pthread_attr_destroy
(
&
thattr
);
return
code
;
}
...
...
@@ -402,6 +403,9 @@ void *taosUdpTcpConnection(void *argv) {
tTrace
(
"%s UDP server is created, ip:%s:%d"
,
pSet
->
label
,
pSet
->
ip
,
pSet
->
port
);
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_DETACHED
);
while
(
1
)
{
if
(
pSet
->
tcpFd
<
0
)
break
;
socklen_t
addrlen
=
sizeof
(
clientAddr
);
...
...
@@ -422,14 +426,14 @@ void *taosUdpTcpConnection(void *argv) {
pTransfer
->
port
=
clientAddr
.
sin_port
;
pTransfer
->
pSet
=
pSet
;
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_DETACHED
);
if
(
pthread_create
(
&
(
thread
),
&
thattr
,
taosTransferDataViaTcp
,
(
void
*
)
pTransfer
)
<
0
)
{
tTrace
(
"%s faile to create thread for UDP server, reason:%s"
,
pSet
->
label
,
strerror
(
errno
));
tTrace
(
"%s failed to create thread for UDP server, reason:%s"
,
pSet
->
label
,
strerror
(
errno
));
free
(
pTransfer
);
taosCloseSocket
(
connFd
);
}
}
pthread_attr_destroy
(
&
thattr
);
return
NULL
;
}
...
...
@@ -448,7 +452,6 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void
memset
(
pSet
,
0
,
(
size_t
)
size
);
strcpy
(
pSet
->
ip
,
ip
);
pSet
->
port
=
port
;
pSet
->
threads
=
threads
;
pSet
->
shandle
=
shandle
;
pSet
->
fp
=
fp
;
pSet
->
tcpFd
=
-
1
;
...
...
@@ -458,8 +461,16 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void
char
udplabel
[
12
];
sprintf
(
udplabel
,
"%s.b"
,
label
);
pSet
->
tmrCtrl
=
taosTmrInit
(
RPC_MAX_UDP_CONNS
*
threads
,
5
,
5000
,
udplabel
);
if
(
pSet
->
tmrCtrl
==
NULL
)
{
tError
(
"%s failed to initialize tmrCtrl"
)
taosCleanUpUdpConnection
(
pSet
);
return
NULL
;
}
// }
pthread_attr_init
(
&
thAttr
);
pthread_attr_setdetachstate
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
short
ownPort
;
for
(
int
i
=
0
;
i
<
threads
;
++
i
)
{
pConn
=
pSet
->
udpConn
+
i
;
...
...
@@ -467,6 +478,7 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void
pConn
->
fd
=
taosOpenUdpSocket
(
ip
,
ownPort
);
if
(
pConn
->
fd
<
0
)
{
tError
(
"%s failed to open UDP socket %s:%d"
,
label
,
ip
,
port
);
taosCleanUpUdpConnection
(
pSet
);
return
NULL
;
}
...
...
@@ -477,11 +489,10 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void
pConn
->
localPort
=
(
int16_t
)
ntohs
(
sin
.
sin_port
);
}
pthread_attr_init
(
&
thAttr
);
pthread_attr_setdetachstate
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
pConn
->
thread
,
&
thAttr
,
taosRecvUdpData
,
pConn
)
!=
0
)
{
close
(
pConn
->
fd
);
tError
(
"%s failed to create thread to process UDP data, reason:%s"
,
label
,
strerror
(
errno
));
taosCloseSocket
(
pConn
->
fd
);
taosCleanUpUdpConnection
(
pSet
);
return
NULL
;
}
...
...
@@ -496,6 +507,7 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void
pthread_mutex_init
(
&
pConn
->
mutex
,
NULL
);
pConn
->
tmrCtrl
=
pSet
->
tmrCtrl
;
}
++
pSet
->
threads
;
}
pthread_attr_destroy
(
&
thAttr
);
...
...
@@ -520,6 +532,7 @@ void *taosInitUdpServer(char *ip, short port, char *label, int threads, void *fp
// pthread_t thread;
// pSet->tcpThread = pthread_create(&(thread), &thattr, taosUdpTcpConnection, pSet);
pthread_create
(
&
(
pSet
->
tcpThread
),
&
thattr
,
taosUdpTcpConnection
,
pSet
);
pthread_attr_destroy
(
&
thattr
);
return
pSet
;
}
...
...
@@ -540,13 +553,16 @@ void taosCleanUpUdpConnection(void *handle) {
for
(
int
i
=
0
;
i
<
pSet
->
threads
;
++
i
)
{
pConn
=
pSet
->
udpConn
+
i
;
pConn
->
signature
=
NULL
;
pthread_cancel
(
pConn
->
thread
);
taosCloseSocket
(
pConn
->
fd
);
if
(
pConn
->
hash
)
{
taosCloseIpHash
(
pConn
->
hash
);
pthread_mutex_destroy
(
&
pConn
->
mutex
);
}
}
pthread_cancel
(
pConn
->
thread
);
for
(
int
i
=
0
;
i
<
pSet
->
threads
;
++
i
)
{
pConn
=
pSet
->
udpConn
+
i
;
pthread_join
(
pConn
->
thread
,
NULL
);
tTrace
(
"chandle:%p is closed"
,
pConn
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录