Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
6f29506e
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
6f29506e
编写于
6月 07, 2020
作者:
陶建辉(Jeff)
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
make rpcTcp multi-thread safe
上级
8f7c2713
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
128 addition
and
63 deletion
+128
-63
src/rpc/src/rpcMain.c
src/rpc/src/rpcMain.c
+13
-11
src/rpc/src/rpcTcp.c
src/rpc/src/rpcTcp.c
+94
-49
src/util/src/tsocket.c
src/util/src/tsocket.c
+21
-3
未找到文件。
src/rpc/src/rpcMain.c
浏览文件 @
6f29506e
...
...
@@ -805,16 +805,16 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
if
(
pConn
==
NULL
)
{
tTrace
(
"%s %p, failed to get connection obj(%s)"
,
pRpc
->
label
,
(
void
*
)
pHead
->
ahandle
,
tstrerror
(
terrno
));
return
NULL
;
}
else
{
if
(
rpcIsReq
(
pHead
->
msgType
))
{
pConn
->
ahandle
=
(
void
*
)
pHead
->
ahandle
;
sprintf
(
pConn
->
info
,
"%s %p %p"
,
pRpc
->
label
,
pConn
,
pConn
->
ahandle
);
}
}
}
rpcLockConn
(
pConn
);
sid
=
pConn
->
sid
;
if
(
rpcIsReq
(
pHead
->
msgType
))
{
pConn
->
ahandle
=
(
void
*
)
pHead
->
ahandle
;
sprintf
(
pConn
->
info
,
"%s %p %p"
,
pRpc
->
label
,
pConn
,
pConn
->
ahandle
);
}
sid
=
pConn
->
sid
;
pConn
->
chandle
=
pRecv
->
chandle
;
pConn
->
peerIp
=
pRecv
->
ip
;
pConn
->
peerPort
=
pRecv
->
port
;
...
...
@@ -847,10 +847,11 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
}
static
void
rpcProcessBrokenLink
(
SRpcConn
*
pConn
)
{
if
(
pConn
==
NULL
)
return
;
SRpcInfo
*
pRpc
=
pConn
->
pRpc
;
tTrace
(
"%s, link is broken"
,
pConn
->
info
);
// pConn->chandle = NULL;
rpcLockConn
(
pConn
);
if
(
pConn
->
outType
)
{
SRpcReqContext
*
pContext
=
pConn
->
pContext
;
...
...
@@ -871,7 +872,8 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
(*(pRpc->cfp))(&rpcMsg);
*/
}
rpcUnlockConn
(
pConn
);
rpcCloseConn
(
pConn
);
}
...
...
@@ -885,7 +887,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
// underlying UDP layer does not know it is server or client
pRecv
->
connType
=
pRecv
->
connType
|
pRpc
->
connType
;
if
(
pRecv
->
ip
==
0
&&
pConn
)
{
if
(
pRecv
->
ip
==
0
)
{
rpcProcessBrokenLink
(
pConn
);
return
NULL
;
}
...
...
src/rpc/src/rpcTcp.c
浏览文件 @
6f29506e
...
...
@@ -16,6 +16,7 @@
#include "os.h"
#include "tsocket.h"
#include "tutil.h"
#include "taoserror.h"
#include "rpcLog.h"
#include "rpcHead.h"
#include "rpcTcp.h"
...
...
@@ -26,8 +27,9 @@
typedef
struct
SFdObj
{
void
*
signature
;
int
fd
;
// TCP socket FD
void
*
thandle
;
// handle from upper layer, like TAOS
int
fd
;
// TCP socket FD
int
closedByApp
;
// 1: already closed by App
void
*
thandle
;
// handle from upper layer, like TAOS
uint32_t
ip
;
uint16_t
port
;
struct
SThreadObj
*
pThreadObj
;
...
...
@@ -71,6 +73,12 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
SThreadObj
*
pThreadObj
;
pServerObj
=
(
SServerObj
*
)
calloc
(
sizeof
(
SServerObj
),
1
);
if
(
pServerObj
==
NULL
)
{
tError
(
"TCP:%s no enough memory"
,
label
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
NULL
;
}
pServerObj
->
ip
=
ip
;
pServerObj
->
port
=
port
;
tstrncpy
(
pServerObj
->
label
,
label
,
sizeof
(
pServerObj
->
label
));
...
...
@@ -79,6 +87,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
pServerObj
->
pThreadObj
=
(
SThreadObj
*
)
calloc
(
sizeof
(
SThreadObj
),
numOfThreads
);
if
(
pServerObj
->
pThreadObj
==
NULL
)
{
tError
(
"TCP:%s no enough memory"
,
label
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
free
(
pServerObj
);
return
NULL
;
}
...
...
@@ -93,12 +102,14 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
code
=
pthread_mutex_init
(
&
(
pThreadObj
->
mutex
),
NULL
);
if
(
code
<
0
)
{
tError
(
"%s failed to init TCP process data mutex(%s)"
,
label
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
break
;;
}
pThreadObj
->
pollFd
=
epoll_create
(
10
);
// size does not matter
if
(
pThreadObj
->
pollFd
<
0
)
{
tError
(
"%s failed to create TCP epoll"
,
label
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
-
1
;
break
;
}
...
...
@@ -110,6 +121,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
pthread_attr_destroy
(
&
thattr
);
if
(
code
!=
0
)
{
tError
(
"%s failed to create TCP process data thread(%s)"
,
label
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
break
;
}
...
...
@@ -124,6 +136,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
code
=
pthread_create
(
&
(
pServerObj
->
thread
),
&
thattr
,
(
void
*
)
taosAcceptTcpConnection
,
(
void
*
)(
pServerObj
));
pthread_attr_destroy
(
&
thattr
);
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tError
(
"%s failed to create TCP accept thread(%s)"
,
label
,
strerror
(
errno
));
}
}
...
...
@@ -147,10 +160,12 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) {
struct
epoll_event
event
=
{
.
events
=
EPOLLIN
};
eventfd_t
fd
=
eventfd
(
1
,
0
);
if
(
fd
==
-
1
)
{
tError
(
"%s, failed to create eventfd, will call pthread_cancel instead, which may result in data corruption: %s"
,
pThreadObj
->
label
,
strerror
(
errno
));
// failed to create eventfd, call pthread_cancel instead, which may result in data corruption:
tError
(
"%s, failed to create eventfd(%s)"
,
pThreadObj
->
label
,
strerror
(
errno
));
pthread_cancel
(
pThreadObj
->
thread
);
}
else
if
(
epoll_ctl
(
pThreadObj
->
pollFd
,
EPOLL_CTL_ADD
,
fd
,
&
event
)
<
0
)
{
tError
(
"%s, failed to call epoll_ctl, will call pthread_cancel instead, which may result in data corruption: %s"
,
pThreadObj
->
label
,
strerror
(
errno
));
// failed to call epoll_ctl, call pthread_cancel instead, which may result in data corruption:
tError
(
"%s, failed to call epoll_ctl(%s)"
,
pThreadObj
->
label
,
strerror
(
errno
));
pthread_cancel
(
pThreadObj
->
thread
);
}
...
...
@@ -211,6 +226,7 @@ static void* taosAcceptTcpConnection(void *arg) {
tTrace
(
"%s TCP server socket was shutdown, exiting..."
,
pServerObj
->
label
);
break
;
}
tError
(
"%s TCP accept failure(%s)"
,
pServerObj
->
label
,
strerror
(
errno
));
continue
;
}
...
...
@@ -254,6 +270,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
if
(
pthread_mutex_init
(
&
(
pThreadObj
->
mutex
),
NULL
)
<
0
)
{
tError
(
"%s failed to init TCP client mutex(%s)"
,
label
,
strerror
(
errno
));
free
(
pThreadObj
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
NULL
;
}
...
...
@@ -261,6 +278,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
if
(
pThreadObj
->
pollFd
<
0
)
{
tError
(
"%s failed to create TCP client epoll"
,
label
);
free
(
pThreadObj
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
NULL
;
}
...
...
@@ -273,6 +291,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
if
(
code
!=
0
)
{
close
(
pThreadObj
->
pollFd
);
free
(
pThreadObj
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tError
(
"%s failed to create TCP read data thread(%s)"
,
label
,
strerror
(
errno
));
return
NULL
;
}
...
...
@@ -287,7 +306,7 @@ void taosCleanUpTcpClient(void *chandle) {
if
(
pThreadObj
==
NULL
)
return
;
taosStopTcpThread
(
pThreadObj
);
tTrace
(
"
:
%s, all connections are cleaned up"
,
pThreadObj
->
label
);
tTrace
(
"%s, all connections are cleaned up"
,
pThreadObj
->
label
);
tfree
(
pThreadObj
);
}
...
...
@@ -318,7 +337,9 @@ void taosCloseTcpConnection(void *chandle) {
SFdObj
*
pFdObj
=
chandle
;
if
(
pFdObj
==
NULL
)
return
;
taosFreeFdObj
(
pFdObj
);
pFdObj
->
thandle
=
NULL
;
pFdObj
->
closedByApp
=
1
;
shutdown
(
pFdObj
->
fd
,
SHUT_WR
);
}
int
taosSendTcpData
(
uint32_t
ip
,
uint16_t
port
,
void
*
data
,
int
len
,
void
*
chandle
)
{
...
...
@@ -334,7 +355,9 @@ static void taosReportBrokenLink(SFdObj *pFdObj) {
SThreadObj
*
pThreadObj
=
pFdObj
->
pThreadObj
;
// notify the upper layer, so it will clean the associated context
if
(
pFdObj
->
thandle
)
{
if
(
pFdObj
->
closedByApp
==
0
)
{
shutdown
(
pFdObj
->
fd
,
SHUT_WR
);
SRecvInfo
recvInfo
;
recvInfo
.
msg
=
NULL
;
recvInfo
.
msgLen
=
0
;
...
...
@@ -345,9 +368,59 @@ static void taosReportBrokenLink(SFdObj *pFdObj) {
recvInfo
.
chandle
=
NULL
;
recvInfo
.
connType
=
RPC_CONN_TCP
;
(
*
(
pThreadObj
->
processData
))(
&
recvInfo
);
}
else
{
taosFreeFdObj
(
pFdObj
);
}
taosFreeFdObj
(
pFdObj
);
}
static
int
taosReadTcpData
(
SFdObj
*
pFdObj
,
SRecvInfo
*
pInfo
)
{
SRpcHead
rpcHead
;
int32_t
msgLen
,
leftLen
,
retLen
,
headLen
;
char
*
buffer
,
*
msg
;
SThreadObj
*
pThreadObj
=
pFdObj
->
pThreadObj
;
headLen
=
taosReadMsg
(
pFdObj
->
fd
,
&
rpcHead
,
sizeof
(
SRpcHead
));
if
(
headLen
!=
sizeof
(
SRpcHead
))
{
tTrace
(
"%s %p, read error, headLen:%d"
,
pThreadObj
->
label
,
pFdObj
->
thandle
,
headLen
);
return
-
1
;
}
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
rpcHead
.
msgLen
);
buffer
=
malloc
(
msgLen
+
tsRpcOverhead
);
if
(
NULL
==
buffer
)
{
tError
(
"%s %p, TCP malloc(size:%d) fail"
,
pThreadObj
->
label
,
pFdObj
->
thandle
,
msgLen
);
return
-
1
;
}
msg
=
buffer
+
tsRpcOverhead
;
leftLen
=
msgLen
-
headLen
;
retLen
=
taosReadMsg
(
pFdObj
->
fd
,
msg
+
headLen
,
leftLen
);
if
(
leftLen
!=
retLen
)
{
tError
(
"%s %p, read error, leftLen:%d retLen:%d"
,
pThreadObj
->
label
,
pFdObj
->
thandle
,
leftLen
,
retLen
);
free
(
buffer
);
return
-
1
;
}
memcpy
(
msg
,
&
rpcHead
,
sizeof
(
SRpcHead
));
pInfo
->
msg
=
msg
;
pInfo
->
msgLen
=
msgLen
;
pInfo
->
ip
=
pFdObj
->
ip
;
pInfo
->
port
=
pFdObj
->
port
;
pInfo
->
shandle
=
pThreadObj
->
shandle
;
pInfo
->
thandle
=
pFdObj
->
thandle
;;
pInfo
->
chandle
=
pFdObj
;
pInfo
->
connType
=
RPC_CONN_TCP
;
if
(
pFdObj
->
closedByApp
)
{
free
(
buffer
);
return
-
1
;
}
return
0
;
}
#define maxEvents 10
...
...
@@ -357,7 +430,6 @@ static void *taosProcessTcpData(void *param) {
SFdObj
*
pFdObj
;
struct
epoll_event
events
[
maxEvents
];
SRecvInfo
recvInfo
;
SRpcHead
rpcHead
;
while
(
1
)
{
int
fdNum
=
epoll_wait
(
pThreadObj
->
pollFd
,
events
,
maxEvents
,
-
1
);
...
...
@@ -376,51 +448,23 @@ static void *taosProcessTcpData(void *param) {
continue
;
}
if
(
events
[
i
].
events
&
EPOLLHUP
)
{
tTrace
(
"%s %p, FD hang up"
,
pThreadObj
->
label
,
pFdObj
->
thandle
);
taosReportBrokenLink
(
pFdObj
);
continue
;
}
int32_t
headLen
=
taosReadMsg
(
pFdObj
->
fd
,
&
rpcHead
,
sizeof
(
SRpcHead
));
if
(
headLen
!=
sizeof
(
SRpcHead
))
{
tTrace
(
"%s %p, read error, headLen:%d"
,
pThreadObj
->
label
,
pFdObj
->
thandle
,
headLen
);
if
(
events
[
i
].
events
&
EPOLLRDHUP
)
{
tTrace
(
"%s %p, FD RD hang up"
,
pThreadObj
->
label
,
pFdObj
->
thandle
);
taosReportBrokenLink
(
pFdObj
);
continue
;
}
int32_t
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
rpcHead
.
msgLen
);
char
*
buffer
=
malloc
(
msgLen
+
tsRpcOverhead
);
if
(
NULL
==
buffer
)
{
tError
(
"%s %p, TCP malloc(size:%d) fail"
,
pThreadObj
->
label
,
pFdObj
->
thandle
,
msgLen
);
if
(
events
[
i
].
events
&
EPOLLHUP
)
{
tTrace
(
"%s %p, FD hang up"
,
pThreadObj
->
label
,
pFdObj
->
thandle
);
taosReportBrokenLink
(
pFdObj
);
continue
;
}
char
*
msg
=
buffer
+
tsRpcOverhead
;
int32_t
leftLen
=
msgLen
-
headLen
;
int32_t
retLen
=
taosReadMsg
(
pFdObj
->
fd
,
msg
+
headLen
,
leftLen
);
if
(
leftLen
!=
retLen
)
{
tError
(
"%s %p, read error, leftLen:%d retLen:%d"
,
pThreadObj
->
label
,
pFdObj
->
thandle
,
leftLen
,
retLen
);
taosReportBrokenLink
(
pFdObj
);
tfree
(
buffer
);
if
(
taosReadTcpData
(
pFdObj
,
&
recvInfo
)
<
0
)
{
shutdown
(
pFdObj
->
fd
,
SHUT_WR
);
continue
;
}
// tTrace("%s TCP data is received, ip:0x%x:%u len:%d", pThreadObj->label, pFdObj->ip, pFdObj->port, msgLen);
memcpy
(
msg
,
&
rpcHead
,
sizeof
(
SRpcHead
));
recvInfo
.
msg
=
msg
;
recvInfo
.
msgLen
=
msgLen
;
recvInfo
.
ip
=
pFdObj
->
ip
;
recvInfo
.
port
=
pFdObj
->
port
;
recvInfo
.
shandle
=
pThreadObj
->
shandle
;
recvInfo
.
thandle
=
pFdObj
->
thandle
;;
recvInfo
.
chandle
=
pFdObj
;
recvInfo
.
connType
=
RPC_CONN_TCP
;
pFdObj
->
thandle
=
(
*
(
pThreadObj
->
processData
))(
&
recvInfo
);
if
(
pFdObj
->
thandle
==
NULL
)
taosFreeFdObj
(
pFdObj
);
}
...
...
@@ -433,16 +477,20 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd) {
struct
epoll_event
event
;
SFdObj
*
pFdObj
=
(
SFdObj
*
)
calloc
(
sizeof
(
SFdObj
),
1
);
if
(
pFdObj
==
NULL
)
return
NULL
;
if
(
pFdObj
==
NULL
)
{
return
NULL
;
}
pFdObj
->
closedByApp
=
0
;
pFdObj
->
fd
=
fd
;
pFdObj
->
pThreadObj
=
pThreadObj
;
pFdObj
->
signature
=
pFdObj
;
event
.
events
=
EPOLLIN
|
EPOLL
PRI
|
EPOLLWAKE
UP
;
event
.
events
=
EPOLLIN
|
EPOLL
RDH
UP
;
event
.
data
.
ptr
=
pFdObj
;
if
(
epoll_ctl
(
pThreadObj
->
pollFd
,
EPOLL_CTL_ADD
,
fd
,
&
event
)
<
0
)
{
tfree
(
pFdObj
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
NULL
;
}
...
...
@@ -475,13 +523,10 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
taosCloseSocket
(
pFdObj
->
fd
);
pThreadObj
->
numOfFds
--
;
if
(
pThreadObj
->
numOfFds
<
0
)
tError
(
"%s %p, TCP thread:%d, number of FDs is negative!!!"
,
pThreadObj
->
label
,
pFdObj
->
thandle
,
pThreadObj
->
threadId
);
// remove from the FdObject list
if
(
pFdObj
->
prev
)
{
(
pFdObj
->
prev
)
->
next
=
pFdObj
->
next
;
}
else
{
...
...
src/util/src/tsocket.c
浏览文件 @
6f29506e
...
...
@@ -40,9 +40,27 @@ int taosGetFqdn(char *fqdn) {
}
uint32_t
taosGetIpFromFqdn
(
const
char
*
fqdn
)
{
struct
hostent
*
record
=
gethostbyname
(
fqdn
);
if
(
record
==
NULL
)
return
-
1
;
return
((
struct
in_addr
*
)
record
->
h_addr
)
->
s_addr
;
struct
addrinfo
hints
,
*
servinfo
,
*
p
;
struct
sockaddr_in
*
h
;
uint32_t
ip
=
-
1
;
memset
(
&
hints
,
0
,
sizeof
hints
);
hints
.
ai_family
=
AF_UNSPEC
;
// use AF_INET6 to force IPv6
hints
.
ai_socktype
=
SOCK_STREAM
;
if
(
getaddrinfo
(
fqdn
,
"http"
,
&
hints
,
&
servinfo
)
!=
0
)
{
uError
(
"failed to get IP from %s(%s)"
,
fqdn
,
strerror
(
errno
));
return
-
1
;
}
// to do: loop through all the results and connect to the first we can
for
(
p
=
servinfo
;
p
!=
NULL
;
p
=
p
->
ai_next
)
{
h
=
(
struct
sockaddr_in
*
)
p
->
ai_addr
;
ip
=
h
->
sin_addr
.
s_addr
;
}
freeaddrinfo
(
servinfo
);
// all done with this structure
return
ip
;
}
// Function converting an IP address string to an unsigned int.
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录