Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
5d965446
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
5d965446
编写于
2月 22, 2020
作者:
S
slguan
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '2.0' into refact/slguan
上级
b4d1bc95
9c0f2cb4
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
650 addition
and
1020 deletion
+650
-1020
src/inc/trpc.h
src/inc/trpc.h
+7
-17
src/rpc/inc/rpcClient.h
src/rpc/inc/rpcClient.h
+1
-1
src/rpc/inc/rpcHead.h
src/rpc/inc/rpcHead.h
+21
-3
src/rpc/inc/rpcServer.h
src/rpc/inc/rpcServer.h
+1
-1
src/rpc/inc/rpcUdp.h
src/rpc/inc/rpcUdp.h
+2
-3
src/rpc/src/rpcClient.c
src/rpc/src/rpcClient.c
+153
-150
src/rpc/src/rpcMain.c
src/rpc/src/rpcMain.c
+161
-126
src/rpc/src/rpcServer.c
src/rpc/src/rpcServer.c
+150
-137
src/rpc/src/rpcUdp.c
src/rpc/src/rpcUdp.c
+150
-570
src/rpc/test/rclient.c
src/rpc/test/rclient.c
+2
-6
src/rpc/test/rserver.c
src/rpc/test/rserver.c
+2
-6
未找到文件。
src/inc/trpc.h
浏览文件 @
5d965446
...
...
@@ -23,18 +23,8 @@ extern "C" {
#include <stdint.h>
#include "taosdef.h"
#define TAOS_CONN_UDPS 0
#define TAOS_CONN_UDPC 1
#define TAOS_CONN_TCPS 2
#define TAOS_CONN_TCPC 3
#define TAOS_CONN_HTTPS 4
#define TAOS_CONN_HTTPC 5
#define TAOS_SOCKET_TYPE_NAME_TCP "tcp"
#define TAOS_SOCKET_TYPE_NAME_UDP "udp"
#define TAOS_CONN_SOCKET_TYPE_S() ((strcasecmp(tsSocketType, TAOS_SOCKET_TYPE_NAME_UDP) == 0)? TAOS_CONN_UDPS:TAOS_CONN_TCPS)
#define TAOS_CONN_SOCKET_TYPE_C() ((strcasecmp(tsSocketType, TAOS_SOCKET_TYPE_NAME_UDP) == 0)? TAOS_CONN_UDPC:TAOS_CONN_TCPC)
#define TAOS_CONN_SERVER 0
#define TAOS_CONN_CLIENT 1
extern
int
tsRpcHeadSize
;
...
...
@@ -61,20 +51,20 @@ typedef struct {
int
connType
;
// TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int
idleTime
;
// milliseconds, 0 means idle timer is disabled
// the following is for client
s
ecurity only
// the following is for client
app
ecurity only
char
*
user
;
// user name
char
spi
;
// security parameter index
char
encrypt
;
// encrypt algorithm
char
*
secret
;
// key for authentication
char
*
ckey
;
// ciphering key
// call back to process incoming msg
void
(
*
cfp
)(
char
type
,
void
*
pCont
,
int
contLen
,
void
*
a
handle
,
int32_t
code
);
// call back to process incoming msg
, code shall be ignored by server app
void
(
*
cfp
)(
char
type
,
void
*
pCont
,
int
contLen
,
void
*
handle
,
int32_t
code
);
// call back to process notify the ipSet changes
// call back to process notify the ipSet changes
, for client app only
void
(
*
ufp
)(
void
*
ahandle
,
SRpcIpSet
*
pIpSet
);
// call back to retrieve the client auth info
// call back to retrieve the client auth info
, for server app only
int
(
*
afp
)(
char
*
meterId
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
}
SRpcInit
;
...
...
src/rpc/inc/rpcClient.h
浏览文件 @
5d965446
...
...
@@ -26,7 +26,7 @@ void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp,
void
taosCleanUpTcpClient
(
void
*
chandle
);
void
*
taosOpenTcpClientConnection
(
void
*
shandle
,
void
*
thandle
,
char
*
ip
,
uint16_t
port
);
void
taosCloseTcpClientConnection
(
void
*
chandle
);
int
taosSendTcpClientData
(
uint32_t
ip
,
uint16_t
port
,
char
*
data
,
int
len
,
void
*
chandle
);
int
taosSendTcpClientData
(
uint32_t
ip
,
uint16_t
port
,
void
*
data
,
int
len
,
void
*
chandle
);
#ifdef __cplusplus
}
...
...
src/rpc/inc/rpcHead.h
浏览文件 @
5d965446
...
...
@@ -20,12 +20,29 @@
extern
"C"
{
#endif
#define RPC_CONN_UDPS 0
#define RPC_CONN_UDPC 1
#define RPC_CONN_TCPS 2
#define RPC_CONN_TCPC 3
#define RPC_CONN_TCP 2
typedef
struct
{
void
*
msg
;
int
msgLen
;
uint32_t
ip
;
uint16_t
port
;
int
connType
;
void
*
shandle
;
void
*
thandle
;
void
*
chandle
;
}
SRecvInfo
;
#pragma pack(push, 1)
typedef
struct
{
char
version
:
4
;
// RPC version
char
comp
:
4
;
// compression algorithm, 0:no compression 1:lz4
char
tcp
:
2
;
// tcp flag
char
resflag
:
2
;
// reserved bits
char
spi
:
3
;
// security parameter index
char
encrypt
:
3
;
// encrypt algorithm, 0: no encryption
uint16_t
tranId
;
// transcation ID
...
...
@@ -33,12 +50,12 @@ typedef struct {
uint32_t
sourceId
;
// source ID, an index for connection list
uint32_t
destId
;
// destination ID, an index for connection list
uint32_t
destIp
;
// destination IP address, for NAT scenario
char
user
[
TSDB_UNI_LEN
];
char
user
[
TSDB_UNI_LEN
];
// user ID
uint16_t
port
;
// for UDP only, port may be changed
char
empty
[
1
];
// reserved
uint8_t
msgType
;
// message type
int32_t
msgLen
;
// message length including the header iteslf
int32_t
code
;
int32_t
code
;
// code in response message
uint8_t
content
[
0
];
// message body starts from here
}
SRpcHead
;
...
...
@@ -54,6 +71,7 @@ typedef struct {
#pragma pack(pop)
#ifdef __cplusplus
}
#endif
...
...
src/rpc/inc/rpcServer.h
浏览文件 @
5d965446
...
...
@@ -25,7 +25,7 @@ extern "C" {
void
*
taosInitTcpServer
(
char
*
ip
,
uint16_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
);
void
taosCleanUpTcpServer
(
void
*
param
);
void
taosCloseTcpServerConnection
(
void
*
param
);
int
taosSendTcpServerData
(
uint32_t
ip
,
uint16_t
port
,
char
*
data
,
int
len
,
void
*
chandle
);
int
taosSendTcpServerData
(
uint32_t
ip
,
uint16_t
port
,
void
*
data
,
int
len
,
void
*
chandle
);
#ifdef __cplusplus
}
...
...
src/rpc/inc/rpcUdp.h
浏览文件 @
5d965446
...
...
@@ -22,10 +22,9 @@ extern "C" {
#include "taosdef.h"
void
*
taosInitUdpServer
(
char
*
ip
,
uint16_t
port
,
char
*
label
,
int
,
void
*
fp
,
void
*
shandle
);
void
*
taosInitUdpClient
(
char
*
ip
,
uint16_t
port
,
char
*
label
,
int
,
void
*
fp
,
void
*
shandle
);
void
*
taosInitUdpConnection
(
char
*
ip
,
uint16_t
port
,
char
*
label
,
int
,
void
*
fp
,
void
*
shandle
);
void
taosCleanUpUdpConnection
(
void
*
handle
);
int
taosSendUdpData
(
uint32_t
ip
,
uint16_t
port
,
char
*
data
,
int
dataLen
,
void
*
chandle
);
int
taosSendUdpData
(
uint32_t
ip
,
uint16_t
port
,
void
*
data
,
int
dataLen
,
void
*
chandle
);
void
*
taosOpenUdpConnection
(
void
*
shandle
,
void
*
thandle
,
char
*
ip
,
uint16_t
port
);
void
taosFreeMsgHdr
(
void
*
hdr
);
...
...
src/rpc/src/rpcClient.c
浏览文件 @
5d965446
...
...
@@ -45,16 +45,143 @@ typedef struct _tcp_client {
int
numOfFds
;
char
label
[
12
];
char
ipstr
[
20
];
void
*
shandle
;
// handle passed by upper layer during server initialization
void
*
(
*
processData
)(
char
*
data
,
int
dataLen
,
unsigned
int
ip
,
uint16_t
port
,
void
*
shandle
,
void
*
thandle
,
void
*
chandle
);
// char buffer[128000];
void
*
shandle
;
// handle passed by upper layer during server initialization
void
*
(
*
processData
)(
SRecvInfo
*
pRecv
);
}
STcpClient
;
#define maxTcpEvents 100
static
void
taosCleanUpTcpFdObj
(
STcpFd
*
pFdObj
);
static
void
*
taosReadTcpData
(
void
*
param
);
void
*
taosInitTcpClient
(
char
*
ip
,
uint16_t
port
,
char
*
label
,
int
num
,
void
*
fp
,
void
*
shandle
)
{
STcpClient
*
pTcp
;
pthread_attr_t
thattr
;
pTcp
=
(
STcpClient
*
)
malloc
(
sizeof
(
STcpClient
));
memset
(
pTcp
,
0
,
sizeof
(
STcpClient
));
strcpy
(
pTcp
->
label
,
label
);
strcpy
(
pTcp
->
ipstr
,
ip
);
pTcp
->
shandle
=
shandle
;
if
(
pthread_mutex_init
(
&
(
pTcp
->
mutex
),
NULL
)
<
0
)
{
tError
(
"%s failed to init TCP mutex, reason:%s"
,
label
,
strerror
(
errno
));
return
NULL
;
}
if
(
pthread_cond_init
(
&
(
pTcp
->
fdReady
),
NULL
)
!=
0
)
{
tError
(
"%s init TCP condition variable failed, reason:%s
\n
"
,
label
,
strerror
(
errno
));
return
NULL
;
}
pTcp
->
pollFd
=
epoll_create
(
10
);
// size does not matter
if
(
pTcp
->
pollFd
<
0
)
{
tError
(
"%s failed to create TCP epoll"
,
label
);
return
NULL
;
}
pTcp
->
processData
=
fp
;
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
(
pTcp
->
thread
),
&
thattr
,
taosReadTcpData
,
(
void
*
)(
pTcp
))
!=
0
)
{
tError
(
"%s failed to create TCP read data thread, reason:%s"
,
label
,
strerror
(
errno
));
return
NULL
;
}
tTrace
(
"%s TCP client is initialized, ip:%s port:%hu"
,
label
,
ip
,
port
);
return
pTcp
;
}
void
taosCleanUpTcpClient
(
void
*
chandle
)
{
STcpClient
*
pTcp
=
(
STcpClient
*
)
chandle
;
if
(
pTcp
==
NULL
)
return
;
while
(
pTcp
->
pHead
)
{
taosCleanUpTcpFdObj
(
pTcp
->
pHead
);
pTcp
->
pHead
=
pTcp
->
pHead
->
next
;
}
close
(
pTcp
->
pollFd
);
pthread_cancel
(
pTcp
->
thread
);
pthread_join
(
pTcp
->
thread
,
NULL
);
// tTrace (":%s, all connections are cleaned up", pTcp->label);
tfree
(
pTcp
);
}
void
*
taosOpenTcpClientConnection
(
void
*
shandle
,
void
*
thandle
,
char
*
ip
,
uint16_t
port
)
{
STcpClient
*
pTcp
=
(
STcpClient
*
)
shandle
;
STcpFd
*
pFdObj
;
struct
epoll_event
event
;
struct
in_addr
destIp
;
int
fd
;
fd
=
taosOpenTcpClientSocket
(
ip
,
port
,
pTcp
->
ipstr
);
if
(
fd
<=
0
)
return
NULL
;
pFdObj
=
(
STcpFd
*
)
malloc
(
sizeof
(
STcpFd
));
if
(
pFdObj
==
NULL
)
{
tError
(
"%s no enough resource to allocate TCP FD IDs"
,
pTcp
->
label
);
tclose
(
fd
);
return
NULL
;
}
memset
(
pFdObj
,
0
,
sizeof
(
STcpFd
));
pFdObj
->
fd
=
fd
;
strcpy
(
pFdObj
->
ipstr
,
ip
);
inet_aton
(
ip
,
&
destIp
);
pFdObj
->
ip
=
destIp
.
s_addr
;
pFdObj
->
port
=
port
;
pFdObj
->
pTcp
=
pTcp
;
pFdObj
->
thandle
=
thandle
;
pFdObj
->
signature
=
pFdObj
;
event
.
events
=
EPOLLIN
|
EPOLLPRI
|
EPOLLWAKEUP
;
event
.
data
.
ptr
=
pFdObj
;
if
(
epoll_ctl
(
pTcp
->
pollFd
,
EPOLL_CTL_ADD
,
fd
,
&
event
)
<
0
)
{
tError
(
"%s failed to add TCP FD for epoll, error:%s"
,
pTcp
->
label
,
strerror
(
errno
));
tfree
(
pFdObj
);
tclose
(
fd
);
return
NULL
;
}
// notify the data process, add into the FdObj list
pthread_mutex_lock
(
&
(
pTcp
->
mutex
));
pFdObj
->
next
=
pTcp
->
pHead
;
if
(
pTcp
->
pHead
)
(
pTcp
->
pHead
)
->
prev
=
pFdObj
;
pTcp
->
pHead
=
pFdObj
;
pTcp
->
numOfFds
++
;
pthread_cond_signal
(
&
pTcp
->
fdReady
);
pthread_mutex_unlock
(
&
(
pTcp
->
mutex
));
tTrace
(
"%s TCP connection to %s:%hu is created, FD:%p numOfFds:%d"
,
pTcp
->
label
,
ip
,
port
,
pFdObj
,
pTcp
->
numOfFds
);
return
pFdObj
;
}
void
taosCloseTcpClientConnection
(
void
*
chandle
)
{
STcpFd
*
pFdObj
=
(
STcpFd
*
)
chandle
;
if
(
pFdObj
==
NULL
)
return
;
taosCleanUpTcpFdObj
(
pFdObj
);
}
int
taosSendTcpClientData
(
uint32_t
ip
,
uint16_t
port
,
void
*
data
,
int
len
,
void
*
chandle
)
{
STcpFd
*
pFdObj
=
(
STcpFd
*
)
chandle
;
if
(
chandle
==
NULL
)
return
-
1
;
return
(
int
)
send
(
pFdObj
->
fd
,
data
,
(
size_t
)
len
,
0
);
}
static
void
taosCleanUpTcpFdObj
(
STcpFd
*
pFdObj
)
{
STcpClient
*
pTcp
;
SRecvInfo
recvInfo
;
if
(
pFdObj
==
NULL
)
return
;
if
(
pFdObj
->
signature
!=
pFdObj
)
return
;
...
...
@@ -75,8 +202,6 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) {
if
(
pTcp
->
numOfFds
<
0
)
tError
(
"%s number of TCP FDs shall never be negative, FD:%p"
,
pTcp
->
label
,
pFdObj
);
// remove from the FdObject list
if
(
pFdObj
->
prev
)
{
(
pFdObj
->
prev
)
->
next
=
pFdObj
->
next
;
}
else
{
...
...
@@ -89,40 +214,28 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) {
pthread_mutex_unlock
(
&
pTcp
->
mutex
);
// notify the upper layer to clean the associated context
if
(
pFdObj
->
thandle
)
(
*
(
pTcp
->
processData
))(
NULL
,
0
,
0
,
0
,
pTcp
->
shandle
,
pFdObj
->
thandle
,
NULL
);
recvInfo
.
msg
=
NULL
;
recvInfo
.
msgLen
=
0
;
recvInfo
.
ip
=
0
;
recvInfo
.
port
=
0
;
recvInfo
.
shandle
=
pTcp
->
shandle
;
recvInfo
.
thandle
=
pFdObj
->
thandle
;;
recvInfo
.
chandle
=
NULL
;
recvInfo
.
connType
=
RPC_CONN_TCP
;
if
(
pFdObj
->
thandle
)
(
*
(
pTcp
->
processData
))(
&
recvInfo
);
tTrace
(
"%s TCP is cleaned up, FD:%p numOfFds:%d"
,
pTcp
->
label
,
pFdObj
,
pTcp
->
numOfFds
);
memset
(
pFdObj
,
0
,
sizeof
(
STcpFd
));
tfree
(
pFdObj
);
}
void
taosCleanUpTcpClient
(
void
*
chandle
)
{
STcpClient
*
pTcp
=
(
STcpClient
*
)
chandle
;
if
(
pTcp
==
NULL
)
return
;
while
(
pTcp
->
pHead
)
{
taosCleanUpTcpFdObj
(
pTcp
->
pHead
);
pTcp
->
pHead
=
pTcp
->
pHead
->
next
;
}
close
(
pTcp
->
pollFd
);
pthread_cancel
(
pTcp
->
thread
);
pthread_join
(
pTcp
->
thread
,
NULL
);
// tTrace (":%s, all connections are cleaned up", pTcp->label);
tfree
(
pTcp
);
}
static
void
*
taosReadTcpData
(
void
*
param
)
{
STcpClient
*
pTcp
=
(
STcpClient
*
)
param
;
STcpClient
*
pTcp
=
(
STcpClient
*
)
param
;
int
i
,
fdNum
;
STcpFd
*
pFdObj
;
STcpFd
*
pFdObj
;
struct
epoll_event
events
[
maxTcpEvents
];
SRecvInfo
recvInfo
;
while
(
1
)
{
pthread_mutex_lock
(
&
pTcp
->
mutex
);
...
...
@@ -186,8 +299,16 @@ static void *taosReadTcpData(void *param) {
continue
;
}
pFdObj
->
thandle
=
(
*
(
pTcp
->
processData
))(
buffer
,
dataLen
,
pFdObj
->
ip
,
pFdObj
->
port
,
pTcp
->
shandle
,
pFdObj
->
thandle
,
pFdObj
);
recvInfo
.
msg
=
buffer
;
recvInfo
.
msgLen
=
dataLen
;
recvInfo
.
ip
=
pFdObj
->
ip
;
recvInfo
.
port
=
pFdObj
->
port
;
recvInfo
.
shandle
=
pTcp
->
shandle
;
recvInfo
.
thandle
=
pFdObj
->
thandle
;;
recvInfo
.
chandle
=
pFdObj
;
recvInfo
.
connType
=
RPC_CONN_TCP
;
pFdObj
->
thandle
=
(
*
(
pTcp
->
processData
))(
&
recvInfo
);
if
(
pFdObj
->
thandle
==
NULL
)
taosCleanUpTcpFdObj
(
pFdObj
);
}
...
...
@@ -196,122 +317,4 @@ static void *taosReadTcpData(void *param) {
return
NULL
;
}
void
*
taosInitTcpClient
(
char
*
ip
,
uint16_t
port
,
char
*
label
,
int
num
,
void
*
fp
,
void
*
shandle
)
{
STcpClient
*
pTcp
;
pthread_attr_t
thattr
;
pTcp
=
(
STcpClient
*
)
malloc
(
sizeof
(
STcpClient
));
memset
(
pTcp
,
0
,
sizeof
(
STcpClient
));
strcpy
(
pTcp
->
label
,
label
);
strcpy
(
pTcp
->
ipstr
,
ip
);
pTcp
->
shandle
=
shandle
;
if
(
pthread_mutex_init
(
&
(
pTcp
->
mutex
),
NULL
)
<
0
)
{
tError
(
"%s failed to init TCP mutex, reason:%s"
,
label
,
strerror
(
errno
));
return
NULL
;
}
if
(
pthread_cond_init
(
&
(
pTcp
->
fdReady
),
NULL
)
!=
0
)
{
tError
(
"%s init TCP condition variable failed, reason:%s
\n
"
,
label
,
strerror
(
errno
));
return
NULL
;
}
pTcp
->
pollFd
=
epoll_create
(
10
);
// size does not matter
if
(
pTcp
->
pollFd
<
0
)
{
tError
(
"%s failed to create TCP epoll"
,
label
);
return
NULL
;
}
pTcp
->
processData
=
fp
;
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
(
pTcp
->
thread
),
&
thattr
,
taosReadTcpData
,
(
void
*
)(
pTcp
))
!=
0
)
{
tError
(
"%s failed to create TCP read data thread, reason:%s"
,
label
,
strerror
(
errno
));
return
NULL
;
}
tTrace
(
"%s TCP client is initialized, ip:%s port:%hu"
,
label
,
ip
,
port
);
return
pTcp
;
}
void
taosCloseTcpClientConnection
(
void
*
chandle
)
{
STcpFd
*
pFdObj
=
(
STcpFd
*
)
chandle
;
if
(
pFdObj
==
NULL
)
return
;
taosCleanUpTcpFdObj
(
pFdObj
);
}
void
*
taosOpenTcpClientConnection
(
void
*
shandle
,
void
*
thandle
,
char
*
ip
,
uint16_t
port
)
{
STcpClient
*
pTcp
=
(
STcpClient
*
)
shandle
;
STcpFd
*
pFdObj
;
struct
epoll_event
event
;
struct
in_addr
destIp
;
int
fd
;
/*
if ( (strcmp(ip, "127.0.0.1") == 0 ) || (strcmp(ip, "localhost") == 0 ) ) {
fd = taosOpenUDClientSocket(ip, port);
} else {
fd = taosOpenTcpClientSocket(ip, port, pTcp->ipstr);
}
*/
fd
=
taosOpenTcpClientSocket
(
ip
,
port
,
pTcp
->
ipstr
);
if
(
fd
<=
0
)
return
NULL
;
pFdObj
=
(
STcpFd
*
)
malloc
(
sizeof
(
STcpFd
));
if
(
pFdObj
==
NULL
)
{
tError
(
"%s no enough resource to allocate TCP FD IDs"
,
pTcp
->
label
);
tclose
(
fd
);
return
NULL
;
}
memset
(
pFdObj
,
0
,
sizeof
(
STcpFd
));
pFdObj
->
fd
=
fd
;
strcpy
(
pFdObj
->
ipstr
,
ip
);
inet_aton
(
ip
,
&
destIp
);
pFdObj
->
ip
=
destIp
.
s_addr
;
pFdObj
->
port
=
port
;
pFdObj
->
pTcp
=
pTcp
;
pFdObj
->
thandle
=
thandle
;
pFdObj
->
signature
=
pFdObj
;
event
.
events
=
EPOLLIN
|
EPOLLPRI
|
EPOLLWAKEUP
;
event
.
data
.
ptr
=
pFdObj
;
if
(
epoll_ctl
(
pTcp
->
pollFd
,
EPOLL_CTL_ADD
,
fd
,
&
event
)
<
0
)
{
tError
(
"%s failed to add TCP FD for epoll, error:%s"
,
pTcp
->
label
,
strerror
(
errno
));
tfree
(
pFdObj
);
tclose
(
fd
);
return
NULL
;
}
// notify the data process, add into the FdObj list
pthread_mutex_lock
(
&
(
pTcp
->
mutex
));
pFdObj
->
next
=
pTcp
->
pHead
;
if
(
pTcp
->
pHead
)
(
pTcp
->
pHead
)
->
prev
=
pFdObj
;
pTcp
->
pHead
=
pFdObj
;
pTcp
->
numOfFds
++
;
pthread_cond_signal
(
&
pTcp
->
fdReady
);
pthread_mutex_unlock
(
&
(
pTcp
->
mutex
));
tTrace
(
"%s TCP connection to %s:%hu is created, FD:%p numOfFds:%d"
,
pTcp
->
label
,
ip
,
port
,
pFdObj
,
pTcp
->
numOfFds
);
return
pFdObj
;
}
int
taosSendTcpClientData
(
uint32_t
ip
,
uint16_t
port
,
char
*
data
,
int
len
,
void
*
chandle
)
{
STcpFd
*
pFdObj
=
(
STcpFd
*
)
chandle
;
if
(
chandle
==
NULL
)
return
-
1
;
return
(
int
)
send
(
pFdObj
->
fd
,
data
,
(
size_t
)
len
,
0
);
}
src/rpc/src/rpcMain.c
浏览文件 @
5d965446
...
...
@@ -49,9 +49,9 @@ typedef struct {
int
connType
;
char
label
[
12
];
char
user
[
TSDB_UNI_LEN
];
// meter ID
char
spi
;
// security parameter index
char
encrypt
;
// encrypt algorithm
char
user
[
TSDB_UNI_LEN
];
// meter ID
char
spi
;
// security parameter index
char
encrypt
;
// encrypt algorithm
char
secret
[
TSDB_KEY_LEN
];
// secret for the link
char
ckey
[
TSDB_KEY_LEN
];
// ciphering key
...
...
@@ -62,7 +62,8 @@ typedef struct {
void
*
idPool
;
// handle to ID pool
void
*
tmrCtrl
;
// handle to timer
void
*
hash
;
// handle returned by hash utility
void
*
shandle
;
// returned handle from lower layer during initialization
void
*
tcphandle
;
// returned handle from TCP initialization
void
*
udphandle
;
// returned handle from UDP initialization
void
*
pCache
;
// connection cache
pthread_mutex_t
mutex
;
struct
_RpcConn
*
connList
;
// connection list
...
...
@@ -79,6 +80,7 @@ typedef struct {
int16_t
numOfTry
;
// number of try for different servers
int8_t
oldIndex
;
// server IP index passed by app
int8_t
redirect
;
// flag to indicate redirect
int8_t
connType
;
// connection type
char
msg
[
0
];
// RpcHead starts from here
}
SRpcReqContext
;
...
...
@@ -113,6 +115,8 @@ typedef struct _RpcConn {
char
*
pReqMsg
;
// request message including header
int
reqMsgLen
;
// request message length
SRpcInfo
*
pRpc
;
// the associated SRpcInfo
int
connType
;
// connection type
int64_t
lockedBy
;
// lock for connection
SRpcReqContext
*
pContext
;
// request context
}
SRpcConn
;
...
...
@@ -122,9 +126,16 @@ int tsRpcProgressTime = 10; // milliseocnds
int
tsRpcMaxRetry
;
int
tsRpcHeadSize
;
// server:0 client:1 tcp:2 udp:0
#define RPC_CONN_UDPS 0
#define RPC_CONN_UDPC 1
#define RPC_CONN_TCPS 2
#define RPC_CONN_TCPC 3
#define RPC_CONN_TCP 2
void
*
(
*
taosInitConn
[])(
char
*
ip
,
uint16_t
port
,
char
*
label
,
int
threads
,
void
*
fp
,
void
*
shandle
)
=
{
taosInitUdp
Server
,
taosInitUdpC
lient
,
taosInitUdp
Connection
,
taosInitUdpC
onnection
,
taosInitTcpServer
,
taosInitTcpClient
};
...
...
@@ -136,7 +147,7 @@ void (*taosCleanUpConn[])(void *thandle) = {
taosCleanUpTcpClient
};
int
(
*
taosSendData
[])(
uint32_t
ip
,
uint16_t
port
,
char
*
data
,
int
len
,
void
*
chandle
)
=
{
int
(
*
taosSendData
[])(
uint32_t
ip
,
uint16_t
port
,
void
*
data
,
int
len
,
void
*
chandle
)
=
{
taosSendUdpData
,
taosSendUdpData
,
taosSendTcpServerData
,
...
...
@@ -157,19 +168,19 @@ void (*taosCloseConn[])(void *chandle) = {
taosCloseTcpClientConnection
};
static
SRpcConn
*
rpcOpenConn
(
SRpcInfo
*
pRpc
,
char
*
peerIpStr
,
uint16_t
peerPort
);
static
SRpcConn
*
rpcOpenConn
(
SRpcInfo
*
pRpc
,
char
*
peerIpStr
,
uint16_t
peerPort
,
int8_t
connType
);
static
void
rpcCloseConn
(
void
*
thandle
);
static
SRpcConn
*
rpcSet
ConnToServer
(
SRpcInfo
*
pRpc
,
SRpcIpSet
ipSe
t
);
static
SRpcConn
*
rpcSet
upConnToServer
(
SRpcReqContext
*
pContex
t
);
static
SRpcConn
*
rpcAllocateClientConn
(
SRpcInfo
*
pRpc
);
static
SRpcConn
*
rpcAllocateServerConn
(
SRpcInfo
*
pRpc
,
char
*
user
,
char
*
hashstr
);
static
SRpcConn
*
rpcGetConnObj
(
SRpcInfo
*
pRpc
,
int
sid
,
char
*
user
,
char
*
hashstr
);
static
void
rpcSendReqToServer
(
SRpcInfo
*
pRpc
,
SRpcReqContext
*
pContext
);
static
void
rpcSendQuickRsp
(
SRpcConn
*
pConn
,
int32_t
code
);
static
void
rpcSendErrorMsgToPeer
(
SR
pcInfo
*
pRpc
,
char
*
pMsg
,
int32_t
code
,
uint32_t
ip
,
uint16_t
port
,
void
*
chandl
e
);
static
void
rpcSendErrorMsgToPeer
(
SR
ecvInfo
*
pRecv
,
int32_t
cod
e
);
static
void
rpcSendMsgToPeer
(
SRpcConn
*
pConn
,
void
*
data
,
int
dataLen
);
static
void
*
rpcProcessMsgFromPeer
(
void
*
msg
,
int
msgLen
,
uint32_t
ip
,
uint16_t
port
,
void
*
shandle
,
void
*
thandle
,
void
*
chandle
);
static
void
*
rpcProcessMsgFromPeer
(
SRecvInfo
*
pRecv
);
static
void
rpcProcessIncomingMsg
(
SRpcConn
*
pConn
,
SRpcHead
*
pHead
);
static
void
rpcProcessConnError
(
void
*
param
,
void
*
id
);
static
void
rpcProcessRetryTimer
(
void
*
,
void
*
);
...
...
@@ -181,6 +192,8 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
static
SRpcHead
*
rpcDecompressRpcMsg
(
SRpcHead
*
pHead
);
static
int
rpcAddAuthPart
(
SRpcConn
*
pConn
,
char
*
msg
,
int
msgLen
);
static
int
rpcCheckAuthentication
(
SRpcConn
*
pConn
,
char
*
msg
,
int
msgLen
);
static
void
rpcLockConn
(
SRpcConn
*
pConn
);
static
void
rpcUnlockConn
(
SRpcConn
*
pConn
);
void
*
rpcOpen
(
SRpcInit
*
pInit
)
{
SRpcInfo
*
pRpc
;
...
...
@@ -194,7 +207,8 @@ void *rpcOpen(SRpcInit *pInit) {
if
(
pInit
->
label
)
strcpy
(
pRpc
->
label
,
pInit
->
label
);
pRpc
->
connType
=
pInit
->
connType
;
pRpc
->
idleTime
=
pInit
->
idleTime
;
pRpc
->
numOfThreads
=
pInit
->
numOfThreads
>
TSDB_MAX_RPC_THREADS
?
TSDB_MAX_RPC_THREADS
:
pInit
->
numOfThreads
;
// pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads;
pRpc
->
numOfThreads
=
1
;
if
(
pInit
->
localIp
)
strcpy
(
pRpc
->
localIp
,
pInit
->
localIp
);
pRpc
->
localPort
=
pInit
->
localPort
;
pRpc
->
afp
=
pInit
->
afp
;
...
...
@@ -207,9 +221,12 @@ void *rpcOpen(SRpcInit *pInit) {
pRpc
->
cfp
=
pInit
->
cfp
;
pRpc
->
afp
=
pInit
->
afp
;
pRpc
->
shandle
=
(
*
taosInitConn
[
pRpc
->
connType
])(
pRpc
->
localIp
,
pRpc
->
localPort
,
pRpc
->
label
,
pRpc
->
tcphandle
=
(
*
taosInitConn
[
pRpc
->
connType
|
RPC_CONN_TCP
])(
pRpc
->
localIp
,
pRpc
->
localPort
,
pRpc
->
label
,
pRpc
->
numOfThreads
,
rpcProcessMsgFromPeer
,
pRpc
);
pRpc
->
udphandle
=
(
*
taosInitConn
[
pRpc
->
connType
])(
pRpc
->
localIp
,
pRpc
->
localPort
,
pRpc
->
label
,
pRpc
->
numOfThreads
,
rpcProcessMsgFromPeer
,
pRpc
);
if
(
pRpc
->
shandle
==
NULL
)
{
if
(
pRpc
->
tcphandle
==
NULL
||
pRpc
->
udphandle
==
NULL
)
{
tError
(
"%s failed to init network, %s:%d"
,
pRpc
->
label
,
pRpc
->
localIp
,
pRpc
->
localPort
);
rpcClose
(
pRpc
);
return
NULL
;
...
...
@@ -237,18 +254,20 @@ void *rpcOpen(SRpcInit *pInit) {
return
NULL
;
}
pRpc
->
hash
=
taosInitStrHash
(
pRpc
->
sessions
,
sizeof
(
pRpc
),
taosHashString
);
if
(
pRpc
->
hash
==
NULL
)
{
tError
(
"%s failed to init string hash"
,
pRpc
->
label
);
rpcClose
(
pRpc
);
return
NULL
;
}
pRpc
->
pCache
=
rpcOpenConnCache
(
pRpc
->
sessions
,
rpcCloseConn
,
pRpc
->
tmrCtrl
,
tsShellActivityTimer
*
1000
);
if
(
pRpc
->
pCache
==
NULL
)
{
tError
(
"%s failed to init connection cache"
,
pRpc
->
label
);
rpcClose
(
pRpc
);
return
NULL
;
if
(
pRpc
->
connType
==
TAOS_CONN_SERVER
)
{
pRpc
->
hash
=
taosInitStrHash
(
pRpc
->
sessions
,
sizeof
(
pRpc
),
taosHashString
);
if
(
pRpc
->
hash
==
NULL
)
{
tError
(
"%s failed to init string hash"
,
pRpc
->
label
);
rpcClose
(
pRpc
);
return
NULL
;
}
}
else
{
pRpc
->
pCache
=
rpcOpenConnCache
(
pRpc
->
sessions
,
rpcCloseConn
,
pRpc
->
tmrCtrl
,
tsShellActivityTimer
*
1000
);
if
(
pRpc
->
pCache
==
NULL
)
{
tError
(
"%s failed to init connection cache"
,
pRpc
->
label
);
rpcClose
(
pRpc
);
return
NULL
;
}
}
pthread_mutex_init
(
&
pRpc
->
mutex
,
NULL
);
...
...
@@ -261,7 +280,8 @@ void *rpcOpen(SRpcInit *pInit) {
void
rpcClose
(
void
*
param
)
{
SRpcInfo
*
pRpc
=
(
SRpcInfo
*
)
param
;
(
*
taosCleanUpConn
[
pRpc
->
connType
])(
pRpc
->
shandle
);
(
*
taosCleanUpConn
[
pRpc
->
connType
|
RPC_CONN_TCP
])(
pRpc
->
tcphandle
);
(
*
taosCleanUpConn
[
pRpc
->
connType
])(
pRpc
->
udphandle
);
for
(
int
i
=
0
;
i
<
pRpc
->
sessions
;
++
i
)
{
if
(
pRpc
->
connList
[
i
].
user
[
0
])
{
...
...
@@ -313,6 +333,16 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, in
pContext
->
msgType
=
type
;
pContext
->
oldIndex
=
pIpSet
->
index
;
pContext
->
connType
=
RPC_CONN_UDPC
;
if
(
contLen
>
16000
)
pContext
->
connType
=
RPC_CONN_TCPC
;
// connection type is application specific.
// for TDengine, all the query, show commands shall have TCP connection
if
(
type
==
TSDB_MSG_TYPE_DNODE_QUERY
||
type
==
TSDB_MSG_TYPE_DNODE_RETRIEVE
||
type
==
TSDB_MSG_TYPE_STABLE_META
||
type
==
TSDB_MSG_TYPE_MULTI_TABLE_META
||
type
==
TSDB_MSG_TYPE_SHOW
)
pContext
->
connType
=
RPC_CONN_TCPC
;
rpcSendReqToServer
(
pRpc
,
pContext
);
return
;
...
...
@@ -334,11 +364,11 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
contLen
=
rpcCompressRpcMsg
(
pCont
,
contLen
);
msgLen
=
rpcMsgLenFromCont
(
contLen
);
pthread_mutex_lock
(
&
pRpc
->
mutex
);
rpcLockConn
(
pConn
);
if
(
pConn
->
inType
==
0
||
pConn
->
user
[
0
]
==
0
)
{
tTrace
(
"%s %p, connection is already released, rsp wont be sent"
,
pRpc
->
label
,
pConn
);
pthread_mutex_lock
(
&
pRpc
->
mutex
);
rpcUnlockConn
(
pConn
);
return
;
}
...
...
@@ -346,7 +376,6 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
pHead
->
version
=
1
;
pHead
->
msgType
=
pConn
->
inType
+
1
;
pHead
->
spi
=
0
;
pHead
->
tcp
=
0
;
pHead
->
encrypt
=
0
;
pHead
->
tranId
=
pConn
->
inTranId
;
pHead
->
sourceId
=
pConn
->
ownId
;
...
...
@@ -364,7 +393,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
pConn
->
rspMsgLen
=
msgLen
;
if
(
pHead
->
content
[
0
]
==
TSDB_CODE_ACTION_IN_PROGRESS
)
pConn
->
inTranId
--
;
pthread_mutex_unlock
(
&
pRpc
->
mutex
);
rpcUnlockConn
(
pConn
);
taosTmrStopA
(
&
pConn
->
pTimer
);
rpcSendMsgToPeer
(
pConn
,
msg
,
msgLen
);
...
...
@@ -388,7 +417,6 @@ void rpcSendRedirectRsp(void *thandle, SRpcIpSet *pIpSet) {
void
rpcGetConnInfo
(
void
*
thandle
,
SRpcConnInfo
*
pInfo
)
{
SRpcConn
*
pConn
=
(
SRpcConn
*
)
thandle
;
SRpcInfo
*
pRpc
=
pConn
->
pRpc
;
pInfo
->
clientIp
=
pConn
->
peerIp
;
pInfo
->
clientPort
=
pConn
->
peerPort
;
...
...
@@ -396,7 +424,7 @@ void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
strcpy
(
pInfo
->
user
,
pConn
->
user
);
}
static
SRpcConn
*
rpcOpenConn
(
SRpcInfo
*
pRpc
,
char
*
peerIpStr
,
uint16_t
peerPort
)
{
static
SRpcConn
*
rpcOpenConn
(
SRpcInfo
*
pRpc
,
char
*
peerIpStr
,
uint16_t
peerPort
,
int8_t
connType
)
{
SRpcConn
*
pConn
;
pConn
=
rpcAllocateClientConn
(
pRpc
);
...
...
@@ -406,12 +434,14 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort)
pConn
->
peerIp
=
inet_addr
(
peerIpStr
);
pConn
->
peerPort
=
peerPort
;
strcpy
(
pConn
->
user
,
pRpc
->
user
);
pConn
->
connType
=
connType
;
if
(
taosOpenConn
[
pRpc
->
connType
])
{
pConn
->
chandle
=
(
*
taosOpenConn
[
pRpc
->
connType
])(
pRpc
->
shandle
,
pConn
,
pConn
->
peerIpstr
,
pConn
->
peerPort
);
if
(
taosOpenConn
[
connType
])
{
void
*
shandle
=
(
connType
&
RPC_CONN_TCP
)
?
pRpc
->
tcphandle
:
pRpc
->
udphandle
;
pConn
->
chandle
=
(
*
taosOpenConn
[
connType
])(
shandle
,
pConn
,
pConn
->
peerIpstr
,
pConn
->
peerPort
);
if
(
pConn
->
chandle
)
{
tTrace
(
"%s %p, rpc connection is set up, sid:%d id:%s ip:%s:%hu
localPort
:%d"
,
pRpc
->
label
,
pConn
,
pConn
->
sid
,
pRpc
->
user
,
pConn
->
peerIpstr
,
pConn
->
peerPort
,
pConn
->
localPort
);
tTrace
(
"%s %p, rpc connection is set up, sid:%d id:%s ip:%s:%hu
connType
:%d"
,
pRpc
->
label
,
pConn
,
pConn
->
sid
,
pRpc
->
user
,
pConn
->
peerIpstr
,
pConn
->
peerPort
,
pConn
->
connType
);
}
else
{
tError
(
"%s %p, failed to set up connection to ip:%s:%hu"
,
pRpc
->
label
,
pConn
,
pConn
->
peerIpstr
,
pConn
->
peerPort
);
...
...
@@ -429,18 +459,18 @@ static void rpcCloseConn(void *thandle) {
SRpcConn
*
pConn
=
(
SRpcConn
*
)
thandle
;
SRpcInfo
*
pRpc
=
pConn
->
pRpc
;
pthread_mutex_lock
(
&
pRpc
->
mutex
);
rpcLockConn
(
pConn
);
if
(
pConn
->
user
[
0
])
{
pConn
->
user
[
0
]
=
0
;
if
(
taosCloseConn
[
p
Rpc
->
connType
])
(
*
taosCloseConn
[
pRpc
->
connType
])(
pConn
->
chandle
);
if
(
taosCloseConn
[
p
Conn
->
connType
])
(
*
taosCloseConn
[
pConn
->
connType
])(
pConn
->
chandle
);
taosTmrStopA
(
&
pConn
->
pTimer
);
taosTmrStopA
(
&
pConn
->
pIdleTimer
);
if
(
pRpc
->
connType
==
TAOS_CONN_
UDPS
||
pRpc
->
connType
==
TAOS_CONN_TCPS
)
{
if
(
pRpc
->
connType
==
TAOS_CONN_
SERVER
)
{
char
hashstr
[
40
]
=
{
0
};
sprintf
(
hashstr
,
"%x:%x:%x
"
,
pConn
->
peerIp
,
pConn
->
peerUid
,
pConn
->
peerId
);
sprintf
(
hashstr
,
"%x:%x:%x
:%d"
,
pConn
->
peerIp
,
pConn
->
peerUid
,
pConn
->
peerId
,
pConn
->
connType
);
taosDeleteStrHash
(
pRpc
->
hash
,
hashstr
);
rpcFreeOutMsg
(
pConn
->
pRspMsg
);
// it may have a response msg saved, but not request msg
pConn
->
pRspMsg
=
NULL
;
...
...
@@ -458,7 +488,7 @@ static void rpcCloseConn(void *thandle) {
tTrace
(
"%s %p, rpc connection is closed"
,
pRpc
->
label
,
pConn
);
}
pthread_mutex_unlock
(
&
pRpc
->
mutex
);
rpcUnlockConn
(
pConn
);
}
static
SRpcConn
*
rpcAllocateClientConn
(
SRpcInfo
*
pRpc
)
{
...
...
@@ -540,15 +570,17 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, char *user, char *hashst
return
pConn
;
}
SRpcConn
*
rpcSetConnToServer
(
SRpcInfo
*
pRpc
,
SRpcIpSet
ipSet
)
{
SRpcConn
*
pConn
;
static
SRpcConn
*
rpcSetupConnToServer
(
SRpcReqContext
*
pContext
)
{
SRpcConn
*
pConn
;
SRpcInfo
*
pRpc
=
pContext
->
pRpc
;
SRpcIpSet
*
pIpSet
=
&
pContext
->
ipSet
;
pConn
=
rpcGetConnFromCache
(
pRpc
->
pCache
,
ipSet
.
ip
[
ipSet
.
index
],
ipSet
.
port
,
pRpc
->
user
);
pConn
=
rpcGetConnFromCache
(
pRpc
->
pCache
,
pIpSet
->
ip
[
pIpSet
->
index
],
pIpSet
->
port
,
pRpc
->
user
);
if
(
pConn
==
NULL
)
{
char
ipstr
[
20
]
=
{
0
};
tinet_ntoa
(
ipstr
,
ipSet
.
ip
[
ipSet
.
index
]);
pConn
=
rpcOpenConn
(
pRpc
,
ipstr
,
ipSet
.
port
);
pConn
->
destIp
=
ipSet
.
ip
[
ipSet
.
index
];
tinet_ntoa
(
ipstr
,
pIpSet
->
ip
[
pIpSet
->
index
]);
pConn
=
rpcOpenConn
(
pRpc
,
ipstr
,
pIpSet
->
port
,
pContext
->
connType
);
if
(
pConn
)
pConn
->
destIp
=
pIpSet
->
ip
[
pIpSet
->
index
];
}
return
pConn
;
...
...
@@ -618,7 +650,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
taosTmrStopA
(
&
pConn
->
pTimer
);
pConn
->
retry
=
0
;
if
(
*
pHead
->
content
==
TSDB_CODE_ACTION_IN_PROGRESS
||
pHead
->
tcp
)
{
if
(
*
pHead
->
content
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
if
(
pConn
->
tretry
<=
tsRpcMaxRetry
)
{
pConn
->
tretry
++
;
tTrace
(
"%s %p, peer is still processing the transaction"
,
pRpc
->
label
,
pConn
);
...
...
@@ -638,13 +670,12 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
rpcProcessHead
(
SRpcInfo
*
pRpc
,
SRpcConn
**
ppConn
,
void
*
data
,
int
dataLen
,
uint32_t
ip
)
{
int32_t
sid
,
code
=
0
;
SRpcConn
*
pConn
=
NULL
;
static
SRpcConn
*
rpcProcessHead
(
SRpcInfo
*
pRpc
,
SRecvInfo
*
pRecv
)
{
int32_t
sid
;
SRpcConn
*
pConn
=
NULL
;
char
hashstr
[
40
]
=
{
0
};
*
ppConn
=
NULL
;
SRpcHead
*
pHead
=
(
SRpcHead
*
)
data
;
SRpcHead
*
pHead
=
(
SRpcHead
*
)
pRecv
->
msg
;
sid
=
htonl
(
pHead
->
destId
);
pHead
->
code
=
htonl
(
pHead
->
code
);
...
...
@@ -652,50 +683,57 @@ static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int
if
(
pHead
->
msgType
>=
TSDB_MSG_TYPE_MAX
||
pHead
->
msgType
<=
0
)
{
tTrace
(
"%s sid:%d, invalid message type:%d"
,
pRpc
->
label
,
sid
,
pHead
->
msgType
);
return
TSDB_CODE_INVALID_MSG_TYPE
;
terrno
=
TSDB_CODE_INVALID_MSG_TYPE
;
return
NULL
;
}
if
(
data
Len
!=
pHead
->
msgLen
)
{
if
(
pRecv
->
msg
Len
!=
pHead
->
msgLen
)
{
tTrace
(
"%s sid:%d, %s has invalid length, dataLen:%d, msgLen:%d"
,
pRpc
->
label
,
sid
,
taosMsg
[
pHead
->
msgType
],
data
Len
,
pHead
->
msgLen
);
return
TSDB_CODE_INVALID_MSG_LEN
;
taosMsg
[
pHead
->
msgType
],
pRecv
->
msg
Len
,
pHead
->
msgLen
);
terrno
=
TSDB_CODE_INVALID_MSG_LEN
;
return
NULL
;
}
if
(
sid
<
0
||
sid
>=
pRpc
->
sessions
)
{
tTrace
(
"%s sid:%d, sid is out of range, max sid:%d, %s discarded"
,
pRpc
->
label
,
sid
,
pRpc
->
sessions
,
taosMsg
[
pHead
->
msgType
]);
return
TSDB_CODE_INVALID_SESSION_ID
;
terrno
=
TSDB_CODE_INVALID_SESSION_ID
;
return
NULL
;
}
if
(
sid
==
0
)
sprintf
(
hashstr
,
"%x:%x:%x
"
,
ip
,
pHead
->
uid
,
pHead
->
sourceId
);
if
(
sid
==
0
)
sprintf
(
hashstr
,
"%x:%x:%x
:%d"
,
pRecv
->
ip
,
pHead
->
uid
,
pHead
->
sourceId
,
pRecv
->
connType
);
pConn
=
rpcGetConnObj
(
pRpc
,
sid
,
pHead
->
user
,
hashstr
);
if
(
pConn
==
NULL
)
return
terrno
;
if
(
pConn
==
NULL
)
return
NULL
;
*
ppConn
=
pConn
;
rpcLockConn
(
pConn
)
;
sid
=
pConn
->
sid
;
if
(
pHead
->
uid
)
pConn
->
peerUid
=
pHead
->
uid
;
if
(
pHead
->
tcp
)
{
tTrace
(
"%s %p, content will be transfered via TCP"
,
pRpc
->
label
,
pConn
)
;
if
(
pConn
->
outType
)
taosTmrReset
(
rpcProcessRetryTimer
,
tsRpcTimer
,
pConn
,
pRpc
->
tmrCtrl
,
&
pConn
->
pTimer
);
return
TSDB_CODE_ALREADY_PROCESSED
;
pConn
->
chandle
=
pRecv
->
chandle
;
if
(
pConn
->
peerIp
!=
pRecv
->
ip
)
{
pConn
->
peerIp
=
pRecv
->
ip
;
char
ipstr
[
20
]
=
{
0
}
;
tinet_ntoa
(
ipstr
,
pRecv
->
ip
);
strcpy
(
pConn
->
peerIpstr
,
ipstr
)
;
}
if
(
pRecv
->
port
)
pConn
->
peerPort
=
pRecv
->
port
;
if
(
pHead
->
port
)
pConn
->
peerPort
=
pHead
->
port
;
if
(
pHead
->
uid
)
pConn
->
peerUid
=
pHead
->
uid
;
code
=
rpcCheckAuthentication
(
pConn
,
(
char
*
)
pHead
,
dataLen
);
if
(
code
!=
0
)
return
code
;
terrno
=
rpcCheckAuthentication
(
pConn
,
(
char
*
)
pHead
,
pRecv
->
msgLen
);
if
(
terrno
==
0
)
{
if
(
pHead
->
msgType
!=
TSDB_MSG_TYPE_REG
&&
pHead
->
encrypt
)
{
// decrypt here
}
if
(
pHead
->
msgType
!=
TSDB_MSG_TYPE_REG
&&
pHead
->
encrypt
)
{
// decrypt here
if
(
rpcIsReq
(
pHead
->
msgType
)
)
{
terrno
=
rpcProcessReqHead
(
pConn
,
pHead
);
pConn
->
connType
=
pRecv
->
connType
;
}
else
{
terrno
=
rpcProcessRspHead
(
pConn
,
pHead
);
}
}
if
(
rpcIsReq
(
pHead
->
msgType
)
)
{
code
=
rpcProcessReqHead
(
pConn
,
pHead
);
}
else
{
code
=
rpcProcessRspHead
(
pConn
,
pHead
);
}
rpcUnlockConn
(
pConn
);
return
code
;
return
pConn
;
}
static
void
rpcProcessBrokenLink
(
SRpcConn
*
pConn
)
{
...
...
@@ -713,45 +751,29 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
rpcCloseConn
(
pConn
);
}
static
void
*
rpcProcessMsgFromPeer
(
void
*
msg
,
int
msgLen
,
uint32_t
ip
,
uint16_t
port
,
void
*
shandle
,
void
*
thandle
,
void
*
chandle
)
{
SRpcHead
*
pHead
=
(
SRpcHead
*
)
msg
;
SRpcInfo
*
pRpc
=
(
SRpcInfo
*
)
shandle
;
SRpcConn
*
pConn
=
(
SRpcConn
*
)
thandle
;
static
void
*
rpcProcessMsgFromPeer
(
SRecvInfo
*
pRecv
)
{
SRpcHead
*
pHead
=
(
SRpcHead
*
)
pRecv
->
msg
;
SRpcInfo
*
pRpc
=
(
SRpcInfo
*
)
pRecv
->
shandle
;
SRpcConn
*
pConn
=
(
SRpcConn
*
)
pRecv
->
thandle
;
int32_t
code
=
0
;
tDump
(
msg
,
msgLen
);
tDump
(
pRecv
->
msg
,
pRecv
->
msgLen
);
if
(
ip
==
0
&&
pConn
)
{
// underlying UDP layer does not know it is server or client
pRecv
->
connType
=
pRecv
->
connType
|
pRpc
->
connType
;
if
(
pRecv
->
ip
==
0
&&
pConn
)
{
rpcProcessBrokenLink
(
pConn
);
tfree
(
msg
);
tfree
(
pRecv
->
msg
);
return
NULL
;
}
pthread_mutex_lock
(
&
pRpc
->
mutex
);
code
=
rpcProcessHead
(
pRpc
,
&
pConn
,
msg
,
msgLen
,
ip
);
if
(
pConn
)
{
// update connection info
pConn
->
chandle
=
chandle
;
if
(
pConn
->
peerIp
!=
ip
)
{
pConn
->
peerIp
=
ip
;
char
ipstr
[
20
]
=
{
0
};
tinet_ntoa
(
ipstr
,
ip
);
strcpy
(
pConn
->
peerIpstr
,
ipstr
);
}
if
(
port
)
pConn
->
peerPort
=
port
;
if
(
pHead
->
port
)
// port maybe changed by the peer
pConn
->
peerPort
=
pHead
->
port
;
}
pthread_mutex_unlock
(
&
pRpc
->
mutex
);
pConn
=
rpcProcessHead
(
pRpc
,
pRecv
);
if
(
pHead
->
msgType
<
TSDB_MSG_TYPE_HEARTBEAT
||
(
rpcDebugFlag
&
16
))
{
tTrace
(
"%s %p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d"
,
pRpc
->
label
,
pConn
,
taosMsg
[
pHead
->
msgType
],
ip
,
port
,
code
,
msgLen
,
pHead
->
sourceId
,
pHead
->
destId
,
pHead
->
tranId
);
pRpc
->
label
,
pConn
,
taosMsg
[
pHead
->
msgType
],
pRecv
->
ip
,
pRecv
->
port
,
code
,
pRecv
->
msgLen
,
pHead
->
sourceId
,
pHead
->
destId
,
pHead
->
tranId
);
}
if
(
pConn
&&
pRpc
->
idleTime
)
{
...
...
@@ -761,7 +783,7 @@ static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t
if
(
code
!=
TSDB_CODE_ALREADY_PROCESSED
)
{
if
(
code
!=
0
)
{
// parsing error
if
(
rpcIsReq
(
pHead
->
msgType
)
)
{
rpcSendErrorMsgToPeer
(
pR
pc
,
msg
,
code
,
ip
,
port
,
chandl
e
);
rpcSendErrorMsgToPeer
(
pR
ecv
,
cod
e
);
tTrace
(
"%s %p, %s is sent with error code:%x"
,
pRpc
->
label
,
pConn
,
taosMsg
[
pHead
->
msgType
+
1
],
code
);
}
}
else
{
// parsing OK
...
...
@@ -769,7 +791,7 @@ static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t
}
}
if
(
code
!=
0
)
free
(
msg
);
if
(
code
!=
0
)
free
(
pRecv
->
msg
);
return
pConn
;
}
...
...
@@ -816,7 +838,6 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
pHead
->
version
=
1
;
pHead
->
msgType
=
pConn
->
inType
+
1
;
pHead
->
spi
=
0
;
pHead
->
tcp
=
0
;
pHead
->
encrypt
=
0
;
pHead
->
tranId
=
pConn
->
inTranId
;
pHead
->
sourceId
=
pConn
->
ownId
;
...
...
@@ -828,19 +849,18 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
rpcSendMsgToPeer
(
pConn
,
msg
,
0
);
}
static
void
rpcSendErrorMsgToPeer
(
SR
pcInfo
*
pRpc
,
char
*
pMsg
,
int32_t
code
,
uint32_t
ip
,
uint16_t
port
,
void
*
chandl
e
)
{
static
void
rpcSendErrorMsgToPeer
(
SR
ecvInfo
*
pRecv
,
int32_t
cod
e
)
{
SRpcHead
*
pRecvHead
,
*
pReplyHead
;
char
msg
[
sizeof
(
SRpcHead
)
+
sizeof
(
SRpcDigest
)
+
sizeof
(
uint32_t
)
];
uint32_t
timeStamp
;
int
msgLen
;
uint32_t
timeStamp
;
int
msgLen
;
pRecvHead
=
(
SRpcHead
*
)
p
M
sg
;
pRecvHead
=
(
SRpcHead
*
)
p
Recv
->
m
sg
;
pReplyHead
=
(
SRpcHead
*
)
msg
;
memset
(
msg
,
0
,
sizeof
(
SRpcHead
));
pReplyHead
->
version
=
pRecvHead
->
version
;
pReplyHead
->
msgType
=
(
char
)(
pRecvHead
->
msgType
+
1
);
pReplyHead
->
tcp
=
0
;
pReplyHead
->
spi
=
0
;
pReplyHead
->
encrypt
=
0
;
pReplyHead
->
tranId
=
pRecvHead
->
tranId
;
...
...
@@ -860,7 +880,7 @@ static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint
}
pReplyHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
msgLen
);
(
*
taosSendData
[
pR
pc
->
connType
])(
ip
,
port
,
msg
,
msgLen
,
chandle
);
(
*
taosSendData
[
pR
ecv
->
connType
])(
pRecv
->
ip
,
pRecv
->
port
,
msg
,
msgLen
,
pRecv
->
chandle
);
return
;
}
...
...
@@ -872,19 +892,18 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
char
msgType
=
pContext
->
msgType
;
pContext
->
numOfTry
++
;
SRpcConn
*
pConn
=
rpcSet
ConnToServer
(
pRpc
,
pContext
->
ipSe
t
);
SRpcConn
*
pConn
=
rpcSet
upConnToServer
(
pContex
t
);
if
(
pConn
==
NULL
)
{
pContext
->
code
=
terrno
;
taosTmrStart
(
rpcProcessConnError
,
0
,
pContext
,
pRpc
->
tmrCtrl
);
return
;
}
pthread_mutex_lock
(
&
pRpc
->
mutex
);
rpcLockConn
(
pConn
);
// set the message header
pHead
->
version
=
1
;
pHead
->
msgType
=
msgType
;
pHead
->
tcp
=
0
;
pHead
->
encrypt
=
0
;
pConn
->
tranId
++
;
if
(
pConn
->
tranId
==
0
)
pConn
->
tranId
++
;
...
...
@@ -903,10 +922,10 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
pConn
->
reqMsgLen
=
msgLen
;
pConn
->
pContext
=
pContext
;
pthread_mutex_unlock
(
&
pRpc
->
mutex
);
rpcUnlockConn
(
pConn
);
rpcSendMsgToPeer
(
pConn
,
msg
,
msgLen
);
//
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
taosTmrReset
(
rpcProcessRetryTimer
,
tsRpcTimer
,
pConn
,
pRpc
->
tmrCtrl
,
&
pConn
->
pTimer
);
}
static
void
rpcSendMsgToPeer
(
SRpcConn
*
pConn
,
void
*
msg
,
int
msgLen
)
{
...
...
@@ -928,7 +947,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
(
uint8_t
)
pHead
->
content
[
0
],
msgLen
,
pHead
->
sourceId
,
pHead
->
destId
,
pHead
->
tranId
);
}
writtenLen
=
(
*
taosSendData
[
p
Rpc
->
connType
])(
pConn
->
peerIp
,
pConn
->
peerPort
,
(
char
*
)
pHead
,
msgLen
,
pConn
->
chandle
);
writtenLen
=
(
*
taosSendData
[
p
Conn
->
connType
])(
pConn
->
peerIp
,
pConn
->
peerPort
,
pHead
,
msgLen
,
pConn
->
chandle
);
if
(
writtenLen
!=
msgLen
)
{
tError
(
"%s %p, failed to send, dataLen:%d writtenLen:%d, reason:%s"
,
pRpc
->
label
,
pConn
,
...
...
@@ -960,7 +979,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
SRpcInfo
*
pRpc
=
pConn
->
pRpc
;
int
reportDisc
=
0
;
pthread_mutex_lock
(
&
pRpc
->
mutex
);
rpcLockConn
(
pConn
);
if
(
pConn
->
outType
&&
pConn
->
user
[
0
])
{
tTrace
(
"%s %p, expected %s is not received"
,
pRpc
->
label
,
pConn
,
taosMsg
[(
int
)
pConn
->
outType
+
1
]);
...
...
@@ -982,7 +1001,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
tTrace
(
"%s %p, retry timer not processed"
,
pRpc
->
label
,
pConn
);
}
pthread_mutex_unlock
(
&
pRpc
->
mutex
);
rpcUnlockConn
(
pConn
);
if
(
reportDisc
&&
pConn
->
pContext
)
{
pConn
->
pContext
->
code
=
TSDB_CODE_NETWORK_UNAVAIL
;
...
...
@@ -1007,7 +1026,7 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) {
SRpcConn
*
pConn
=
(
SRpcConn
*
)
param
;
SRpcInfo
*
pRpc
=
pConn
->
pRpc
;
pthread_mutex_lock
(
&
pRpc
->
mutex
);
rpcLockConn
(
pConn
);
if
(
pConn
->
inType
&&
pConn
->
user
[
0
])
{
tTrace
(
"%s %p, progress timer expired, send progress"
,
pRpc
->
label
,
pConn
);
...
...
@@ -1017,7 +1036,7 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) {
tTrace
(
"%s %p, progress timer:%p not processed"
,
pRpc
->
label
,
pConn
,
tmrId
);
}
pthread_mutex_unlock
(
&
pRpc
->
mutex
);
rpcUnlockConn
(
pConn
);
}
static
void
rpcFreeOutMsg
(
void
*
msg
)
{
...
...
@@ -1180,4 +1199,20 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
return
code
;
}
static
void
rpcLockConn
(
SRpcConn
*
pConn
)
{
int64_t
tid
=
taosGetPthreadId
();
int
i
=
0
;
while
(
atomic_val_compare_exchange_64
(
&
(
pConn
->
lockedBy
),
0
,
tid
)
!=
0
)
{
if
(
++
i
%
1000
==
0
)
{
sched_yield
();
}
}
}
static
void
rpcUnlockConn
(
SRpcConn
*
pConn
)
{
int64_t
tid
=
taosGetPthreadId
();
if
(
atomic_val_compare_exchange_64
(
&
(
pConn
->
lockedBy
),
tid
,
0
)
!=
tid
)
{
assert
(
false
);
}
}
src/rpc/src/rpcServer.c
浏览文件 @
5d965446
...
...
@@ -46,10 +46,8 @@ typedef struct _thread_obj {
int
numOfFds
;
int
threadId
;
char
label
[
12
];
// char buffer[128000]; // buffer to receive data
void
*
shandle
;
// handle passed by upper layer during server initialization
void
*
(
*
processData
)(
char
*
data
,
int
dataLen
,
unsigned
int
ip
,
uint16_t
port
,
void
*
shandle
,
void
*
thandle
,
void
*
chandle
);
void
*
shandle
;
// handle passed by upper layer during server initialization
void
*
(
*
processData
)(
SRecvInfo
*
pPacket
);
}
SThreadObj
;
typedef
struct
{
...
...
@@ -62,59 +60,81 @@ typedef struct {
pthread_t
thread
;
}
SServerObj
;
static
void
taosCleanUpFdObj
(
SFdObj
*
pFdObj
)
{
SThreadObj
*
pThreadObj
;
static
void
taosCleanUpFdObj
(
SFdObj
*
pFdObj
);
static
void
taosProcessTcpData
(
void
*
param
);
static
void
taosAcceptTcpConnection
(
void
*
arg
);
if
(
pFdObj
==
NULL
)
return
;
if
(
pFdObj
->
signature
!=
pFdObj
)
return
;
void
*
taosInitTcpServer
(
char
*
ip
,
uint16_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
{
int
i
;
SServerObj
*
pServerObj
;
pthread_attr_t
thattr
;
SThreadObj
*
pThreadObj
;
pThreadObj
=
pFdObj
->
pThreadObj
;
if
(
pThreadObj
==
NULL
)
{
tError
(
"FdObj double clean up!!!"
);
return
;
pServerObj
=
(
SServerObj
*
)
malloc
(
sizeof
(
SServerObj
));
strcpy
(
pServerObj
->
ip
,
ip
);
pServerObj
->
port
=
port
;
strcpy
(
pServerObj
->
label
,
label
);
pServerObj
->
numOfThreads
=
numOfThreads
;
pServerObj
->
pThreadObj
=
(
SThreadObj
*
)
malloc
(
sizeof
(
SThreadObj
)
*
(
size_t
)
numOfThreads
);
if
(
pServerObj
->
pThreadObj
==
NULL
)
{
tError
(
"TCP:%s no enough memory"
,
label
);
return
NULL
;
}
memset
(
pServerObj
->
pThreadObj
,
0
,
sizeof
(
SThreadObj
)
*
(
size_t
)
numOfThreads
);
epoll_ctl
(
pThreadObj
->
pollFd
,
EPOLL_CTL_DEL
,
pFdObj
->
fd
,
NULL
);
close
(
pFdObj
->
fd
);
pThreadObj
=
pServerObj
->
pThreadObj
;
for
(
i
=
0
;
i
<
numOfThreads
;
++
i
)
{
pThreadObj
->
processData
=
fp
;
strcpy
(
pThreadObj
->
label
,
label
);
pThreadObj
->
shandle
=
shandle
;
pthread_mutex_lock
(
&
pThreadObj
->
threadMutex
);
if
(
pthread_mutex_init
(
&
(
pThreadObj
->
threadMutex
),
NULL
)
<
0
)
{
tError
(
"%s failed to init TCP process data mutex, reason:%s"
,
label
,
strerror
(
errno
));
return
NULL
;
}
pThreadObj
->
numOfFds
--
;
if
(
pthread_cond_init
(
&
(
pThreadObj
->
fdReady
),
NULL
)
!=
0
)
{
tError
(
"%s init TCP condition variable failed, reason:%s
\n
"
,
label
,
strerror
(
errno
));
return
NULL
;
}
if
(
pThreadObj
->
numOfFds
<
0
)
tError
(
"%s TCP thread:%d, number of FDs shall never be negative"
,
pThreadObj
->
label
,
pThreadObj
->
threadId
);
pThreadObj
->
pollFd
=
epoll_create
(
10
);
// size does not matter
if
(
pThreadObj
->
pollFd
<
0
)
{
tError
(
"%s failed to create TCP epoll"
,
label
);
return
NULL
;
}
// remove from the FdObject list
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
(
pThreadObj
->
thread
),
&
thattr
,
(
void
*
)
taosProcessTcpData
,
(
void
*
)(
pThreadObj
))
!=
0
)
{
tError
(
"%s failed to create TCP process data thread, reason:%s"
,
label
,
strerror
(
errno
));
return
NULL
;
}
if
(
pFdObj
->
prev
)
{
(
pFdObj
->
prev
)
->
next
=
pFdObj
->
next
;
}
else
{
pThreadObj
->
pHead
=
pFdObj
->
next
;
pThreadObj
->
threadId
=
i
;
pThreadObj
++
;
}
if
(
pFdObj
->
next
)
{
(
pFdObj
->
next
)
->
prev
=
pFdObj
->
prev
;
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
(
pServerObj
->
thread
),
&
thattr
,
(
void
*
)
taosAcceptTcpConnection
,
(
void
*
)(
pServerObj
))
!=
0
)
{
tError
(
"%s failed to create TCP accept thread, reason:%s"
,
label
,
strerror
(
errno
));
return
NULL
;
}
pthread_mutex_unlock
(
&
pThreadObj
->
threadMutex
);
// notify the upper layer, so it will clean the associated context
if
(
pFdObj
->
thandle
)
(
*
(
pThreadObj
->
processData
))(
NULL
,
0
,
0
,
0
,
pThreadObj
->
shandle
,
pFdObj
->
thandle
,
NULL
);
tTrace
(
"%s TCP thread:%d, FD:%p is cleaned up, numOfFds:%d"
,
pThreadObj
->
label
,
pThreadObj
->
threadId
,
pFdObj
,
pThreadObj
->
numOfFds
);
memset
(
pFdObj
,
0
,
sizeof
(
SFdObj
));
tfree
(
pFdObj
);
}
void
taosCloseTcpServerConnection
(
void
*
chandle
)
{
SFdObj
*
pFdObj
=
(
SFdObj
*
)
chandle
;
if
(
pFdObj
==
NULL
)
return
;
/*
if ( pthread_create(&(pServerObj->thread), &thattr,
(void*)taosAcceptUDConnection, (void *)(pServerObj)) != 0 ) {
tError("%s failed to create UD accept thread, reason:%s", label,
strerror(errno));
return NULL;
}
*/
pthread_attr_destroy
(
&
thattr
);
tTrace
(
"%s TCP server is initialized, ip:%s port:%hu numOfThreads:%d"
,
label
,
ip
,
port
,
numOfThreads
);
taosCleanUpFdObj
(
pFdObj
)
;
return
(
void
*
)
pServerObj
;
}
void
taosCleanUpTcpServer
(
void
*
handle
)
{
...
...
@@ -148,6 +168,22 @@ void taosCleanUpTcpServer(void *handle) {
tfree
(
pServerObj
);
}
void
taosCloseTcpServerConnection
(
void
*
chandle
)
{
SFdObj
*
pFdObj
=
(
SFdObj
*
)
chandle
;
if
(
pFdObj
==
NULL
)
return
;
taosCleanUpFdObj
(
pFdObj
);
}
int
taosSendTcpServerData
(
uint32_t
ip
,
uint16_t
port
,
void
*
data
,
int
len
,
void
*
chandle
)
{
SFdObj
*
pFdObj
=
(
SFdObj
*
)
chandle
;
if
(
chandle
==
NULL
)
return
-
1
;
return
(
int
)
send
(
pFdObj
->
fd
,
data
,
(
size_t
)
len
,
0
);
}
#define maxEvents 10
static
void
taosProcessTcpData
(
void
*
param
)
{
...
...
@@ -155,7 +191,7 @@ static void taosProcessTcpData(void *param) {
int
i
,
fdNum
;
SFdObj
*
pFdObj
;
struct
epoll_event
events
[
maxEvents
];
SRecvInfo
recvInfo
;
pThreadObj
=
(
SThreadObj
*
)
param
;
while
(
1
)
{
...
...
@@ -209,15 +245,22 @@ static void taosProcessTcpData(void *param) {
continue
;
}
pFdObj
->
thandle
=
(
*
(
pThreadObj
->
processData
))(
buffer
,
dataLen
,
pFdObj
->
ip
,
pFdObj
->
port
,
pThreadObj
->
shandle
,
pFdObj
->
thandle
,
pFdObj
);
recvInfo
.
msg
=
buffer
;
recvInfo
.
msgLen
=
dataLen
;
recvInfo
.
ip
=
pFdObj
->
ip
;
recvInfo
.
port
=
pFdObj
->
port
;
recvInfo
.
shandle
=
pThreadObj
->
shandle
;
recvInfo
.
thandle
=
pFdObj
->
thandle
;;
recvInfo
.
chandle
=
pFdObj
;
recvInfo
.
connType
=
RPC_CONN_TCP
;
pFdObj
->
thandle
=
(
*
(
pThreadObj
->
processData
))(
&
recvInfo
);
if
(
pFdObj
->
thandle
==
NULL
)
taosCleanUpFdObj
(
pFdObj
);
}
}
}
void
taosAcceptTcpConnection
(
void
*
arg
)
{
static
void
taosAcceptTcpConnection
(
void
*
arg
)
{
int
connFd
=
-
1
;
struct
sockaddr_in
clientAddr
;
int
sockFd
;
...
...
@@ -280,16 +323,11 @@ void taosAcceptTcpConnection(void *arg) {
// notify the data process, add into the FdObj list
pthread_mutex_lock
(
&
(
pThreadObj
->
threadMutex
));
pFdObj
->
next
=
pThreadObj
->
pHead
;
if
(
pThreadObj
->
pHead
)
(
pThreadObj
->
pHead
)
->
prev
=
pFdObj
;
pThreadObj
->
pHead
=
pFdObj
;
pThreadObj
->
numOfFds
++
;
pthread_cond_signal
(
&
pThreadObj
->
fdReady
);
pthread_mutex_unlock
(
&
(
pThreadObj
->
threadMutex
));
tTrace
(
"%s TCP thread:%d, a new connection from %s:%hu, FD:%p, numOfFds:%d"
,
pServerObj
->
label
,
...
...
@@ -301,7 +339,65 @@ void taosAcceptTcpConnection(void *arg) {
}
}
void
taosAcceptUDConnection
(
void
*
arg
)
{
static
void
taosCleanUpFdObj
(
SFdObj
*
pFdObj
)
{
SThreadObj
*
pThreadObj
;
if
(
pFdObj
==
NULL
)
return
;
if
(
pFdObj
->
signature
!=
pFdObj
)
return
;
pThreadObj
=
pFdObj
->
pThreadObj
;
if
(
pThreadObj
==
NULL
)
{
tError
(
"FdObj double clean up!!!"
);
return
;
}
epoll_ctl
(
pThreadObj
->
pollFd
,
EPOLL_CTL_DEL
,
pFdObj
->
fd
,
NULL
);
close
(
pFdObj
->
fd
);
pthread_mutex_lock
(
&
pThreadObj
->
threadMutex
);
pThreadObj
->
numOfFds
--
;
if
(
pThreadObj
->
numOfFds
<
0
)
tError
(
"%s TCP thread:%d, number of FDs shall never be negative"
,
pThreadObj
->
label
,
pThreadObj
->
threadId
);
// remove from the FdObject list
if
(
pFdObj
->
prev
)
{
(
pFdObj
->
prev
)
->
next
=
pFdObj
->
next
;
}
else
{
pThreadObj
->
pHead
=
pFdObj
->
next
;
}
if
(
pFdObj
->
next
)
{
(
pFdObj
->
next
)
->
prev
=
pFdObj
->
prev
;
}
pthread_mutex_unlock
(
&
pThreadObj
->
threadMutex
);
// notify the upper layer, so it will clean the associated context
SRecvInfo
recvInfo
;
recvInfo
.
msg
=
NULL
;
recvInfo
.
msgLen
=
0
;
recvInfo
.
ip
=
0
;
recvInfo
.
port
=
0
;
recvInfo
.
shandle
=
pThreadObj
->
shandle
;
recvInfo
.
thandle
=
pFdObj
->
thandle
;;
recvInfo
.
chandle
=
NULL
;
recvInfo
.
connType
=
RPC_CONN_TCP
;
if
(
pFdObj
->
thandle
)
(
*
(
pThreadObj
->
processData
))(
&
recvInfo
);
tTrace
(
"%s TCP thread:%d, FD:%p is cleaned up, numOfFds:%d"
,
pThreadObj
->
label
,
pThreadObj
->
threadId
,
pFdObj
,
pThreadObj
->
numOfFds
);
memset
(
pFdObj
,
0
,
sizeof
(
SFdObj
));
tfree
(
pFdObj
);
}
#if 0
static void taosAcceptUDConnection(void *arg) {
int connFd = -1;
int sockFd;
int threadId = 0;
...
...
@@ -353,16 +449,11 @@ void taosAcceptUDConnection(void *arg) {
// notify the data process, add into the FdObj list
pthread_mutex_lock(&(pThreadObj->threadMutex));
pFdObj->next = pThreadObj->pHead;
if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj;
pThreadObj->pHead = pFdObj;
pThreadObj->numOfFds++;
pthread_cond_signal(&pThreadObj->fdReady);
pthread_mutex_unlock(&(pThreadObj->threadMutex));
tTrace("%s UD thread:%d, a new connection, numOfFds:%d", pServerObj->label, pThreadObj->threadId,
...
...
@@ -373,79 +464,7 @@ void taosAcceptUDConnection(void *arg) {
threadId = threadId % pServerObj->numOfThreads;
}
}
void
*
taosInitTcpServer
(
char
*
ip
,
uint16_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
{
int
i
;
SServerObj
*
pServerObj
;
pthread_attr_t
thattr
;
SThreadObj
*
pThreadObj
;
pServerObj
=
(
SServerObj
*
)
malloc
(
sizeof
(
SServerObj
));
strcpy
(
pServerObj
->
ip
,
ip
);
pServerObj
->
port
=
port
;
strcpy
(
pServerObj
->
label
,
label
);
pServerObj
->
numOfThreads
=
numOfThreads
;
pServerObj
->
pThreadObj
=
(
SThreadObj
*
)
malloc
(
sizeof
(
SThreadObj
)
*
(
size_t
)
numOfThreads
);
if
(
pServerObj
->
pThreadObj
==
NULL
)
{
tError
(
"TCP:%s no enough memory"
,
label
);
return
NULL
;
}
memset
(
pServerObj
->
pThreadObj
,
0
,
sizeof
(
SThreadObj
)
*
(
size_t
)
numOfThreads
);
pThreadObj
=
pServerObj
->
pThreadObj
;
for
(
i
=
0
;
i
<
numOfThreads
;
++
i
)
{
pThreadObj
->
processData
=
fp
;
strcpy
(
pThreadObj
->
label
,
label
);
pThreadObj
->
shandle
=
shandle
;
if
(
pthread_mutex_init
(
&
(
pThreadObj
->
threadMutex
),
NULL
)
<
0
)
{
tError
(
"%s failed to init TCP process data mutex, reason:%s"
,
label
,
strerror
(
errno
));
return
NULL
;
}
if
(
pthread_cond_init
(
&
(
pThreadObj
->
fdReady
),
NULL
)
!=
0
)
{
tError
(
"%s init TCP condition variable failed, reason:%s
\n
"
,
label
,
strerror
(
errno
));
return
NULL
;
}
pThreadObj
->
pollFd
=
epoll_create
(
10
);
// size does not matter
if
(
pThreadObj
->
pollFd
<
0
)
{
tError
(
"%s failed to create TCP epoll"
,
label
);
return
NULL
;
}
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
(
pThreadObj
->
thread
),
&
thattr
,
(
void
*
)
taosProcessTcpData
,
(
void
*
)(
pThreadObj
))
!=
0
)
{
tError
(
"%s failed to create TCP process data thread, reason:%s"
,
label
,
strerror
(
errno
));
return
NULL
;
}
pThreadObj
->
threadId
=
i
;
pThreadObj
++
;
}
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
if
(
pthread_create
(
&
(
pServerObj
->
thread
),
&
thattr
,
(
void
*
)
taosAcceptTcpConnection
,
(
void
*
)(
pServerObj
))
!=
0
)
{
tError
(
"%s failed to create TCP accept thread, reason:%s"
,
label
,
strerror
(
errno
));
return
NULL
;
}
/*
if ( pthread_create(&(pServerObj->thread), &thattr,
(void*)taosAcceptUDConnection, (void *)(pServerObj)) != 0 ) {
tError("%s failed to create UD accept thread, reason:%s", label,
strerror(errno));
return NULL;
}
*/
pthread_attr_destroy
(
&
thattr
);
tTrace
(
"%s TCP server is initialized, ip:%s port:%hu numOfThreads:%d"
,
label
,
ip
,
port
,
numOfThreads
);
return
(
void
*
)
pServerObj
;
}
#endif
#if 0
void taosListTcpConnection(void *handle, char *buffer) {
...
...
@@ -489,10 +508,4 @@ void taosListTcpConnection(void *handle, char *buffer) {
}
#endif
int
taosSendTcpServerData
(
uint32_t
ip
,
uint16_t
port
,
char
*
data
,
int
len
,
void
*
chandle
)
{
SFdObj
*
pFdObj
=
(
SFdObj
*
)
chandle
;
if
(
chandle
==
NULL
)
return
-
1
;
return
(
int
)
send
(
pFdObj
->
fd
,
data
,
(
size_t
)
len
,
0
);
}
src/rpc/src/rpcUdp.c
浏览文件 @
5d965446
...
...
@@ -44,9 +44,8 @@ typedef struct {
void
*
hash
;
void
*
shandle
;
// handle passed by upper layer during server initialization
void
*
pSet
;
void
*
(
*
processData
)(
char
*
data
,
int
dataLen
,
unsigned
int
ip
,
uint16_t
port
,
void
*
shandle
,
void
*
thandle
,
void
*
chandle
);
char
buffer
[
RPC_MAX_UDP_SIZE
];
// buffer to receive data
void
*
(
*
processData
)(
SRecvInfo
*
pRecv
);
char
buffer
[
RPC_MAX_UDP_SIZE
];
// buffer to receive data
}
SUdpConn
;
typedef
struct
{
...
...
@@ -58,10 +57,8 @@ typedef struct {
int
threads
;
char
label
[
12
];
void
*
tmrCtrl
;
pthread_t
tcpThread
;
int
tcpFd
;
void
*
(
*
fp
)(
char
*
data
,
int
dataLen
,
uint32_t
ip
,
uint16_t
port
,
void
*
shandle
,
void
*
thandle
,
void
*
chandle
);
SUdpConn
udpConn
[];
void
*
(
*
fp
)(
SRecvInfo
*
pPacket
);
SUdpConn
udpConn
[];
}
SUdpConnSet
;
typedef
struct
{
...
...
@@ -76,420 +73,9 @@ typedef struct {
int
emptyNum
;
}
SUdpBuf
;
typedef
struct
{
uint64_t
handle
;
uint16_t
port
;
int32_t
msgLen
;
}
SPacketInfo
;
typedef
struct
{
int
fd
;
uint32_t
ip
;
uint16_t
port
;
SUdpConnSet
*
pSet
;
}
STransfer
;
typedef
struct
{
void
*
pTimer
;
SUdpConnSet
*
pSet
;
SUdpConn
*
pConn
;
int
dataLen
;
uint32_t
ip
;
uint16_t
port
;
char
data
[
96
];
}
SMonitor
;
typedef
struct
{
uint64_t
handle
;
uint64_t
hash
;
}
SHandleViaTcp
;
void
taosFreeMsgHdr
(
void
*
hdr
)
{
struct
msghdr
*
msgHdr
=
(
struct
msghdr
*
)
hdr
;
free
(
msgHdr
->
msg_iov
);
}
int
taosMsgHdrSize
(
void
*
hdr
)
{
struct
msghdr
*
msgHdr
=
(
struct
msghdr
*
)
hdr
;
return
(
int
)
msgHdr
->
msg_iovlen
;
}
void
taosSendMsgHdr
(
void
*
hdr
,
int
fd
)
{
struct
msghdr
*
msgHdr
=
(
struct
msghdr
*
)
hdr
;
sendmsg
(
fd
,
msgHdr
,
0
);
msgHdr
->
msg_iovlen
=
0
;
}
void
taosInitMsgHdr
(
void
**
hdr
,
void
*
dest
,
int
maxPkts
)
{
struct
msghdr
*
msgHdr
=
(
struct
msghdr
*
)
malloc
(
sizeof
(
struct
msghdr
));
memset
(
msgHdr
,
0
,
sizeof
(
struct
msghdr
));
*
hdr
=
msgHdr
;
struct
sockaddr_in
*
destAdd
=
(
struct
sockaddr_in
*
)
dest
;
msgHdr
->
msg_name
=
destAdd
;
msgHdr
->
msg_namelen
=
sizeof
(
struct
sockaddr_in
);
int
size
=
(
int
)
sizeof
(
struct
iovec
)
*
maxPkts
;
msgHdr
->
msg_iov
=
(
struct
iovec
*
)
malloc
((
size_t
)
size
);
memset
(
msgHdr
->
msg_iov
,
0
,
(
size_t
)
size
);
}
void
taosSetMsgHdrData
(
void
*
hdr
,
char
*
data
,
int
dataLen
)
{
struct
msghdr
*
msgHdr
=
(
struct
msghdr
*
)
hdr
;
msgHdr
->
msg_iov
[
msgHdr
->
msg_iovlen
].
iov_base
=
data
;
msgHdr
->
msg_iov
[
msgHdr
->
msg_iovlen
].
iov_len
=
(
size_t
)
dataLen
;
msgHdr
->
msg_iovlen
++
;
}
bool
taosCheckHandleViaTcpValid
(
SHandleViaTcp
*
handleViaTcp
)
{
return
handleViaTcp
->
hash
==
taosHashUInt64
(
handleViaTcp
->
handle
);
}
void
taosInitHandleViaTcp
(
SHandleViaTcp
*
handleViaTcp
,
uint64_t
handle
)
{
handleViaTcp
->
handle
=
handle
;
handleViaTcp
->
hash
=
taosHashUInt64
(
handleViaTcp
->
handle
);
}
void
taosProcessMonitorTimer
(
void
*
param
,
void
*
tmrId
)
{
SMonitor
*
pMonitor
=
(
SMonitor
*
)
param
;
if
(
pMonitor
->
pTimer
!=
tmrId
)
return
;
SUdpConnSet
*
pSet
=
pMonitor
->
pSet
;
pMonitor
->
pTimer
=
NULL
;
if
(
pSet
)
{
char
*
data
=
malloc
((
size_t
)
pMonitor
->
dataLen
);
memcpy
(
data
,
pMonitor
->
data
,
(
size_t
)
pMonitor
->
dataLen
);
tTrace
(
"%s monitor timer is expired, update the link status"
,
pSet
->
label
);
(
*
pSet
->
fp
)(
data
,
pMonitor
->
dataLen
,
pMonitor
->
ip
,
0
,
pSet
->
shandle
,
NULL
,
NULL
);
taosTmrReset
(
taosProcessMonitorTimer
,
200
,
pMonitor
,
pSet
->
tmrCtrl
,
&
pMonitor
->
pTimer
);
}
else
{
taosTmrStopA
(
&
pMonitor
->
pTimer
);
free
(
pMonitor
);
}
}
void
*
taosReadTcpData
(
void
*
argv
)
{
SMonitor
*
pMonitor
=
(
SMonitor
*
)
argv
;
SRpcHead
*
pHead
=
(
SRpcHead
*
)
pMonitor
->
data
;
SPacketInfo
*
pInfo
=
(
SPacketInfo
*
)
pHead
->
content
;
SUdpConnSet
*
pSet
=
pMonitor
->
pSet
;
int
retLen
,
fd
;
char
ipstr
[
64
];
pInfo
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
pInfo
->
msgLen
);
tinet_ntoa
(
ipstr
,
pMonitor
->
ip
);
tTrace
(
"%s receive packet via TCP:%s:%hu, msgLen:%d, handle:0x%x, source:0x%08x dest:0x%08x tranId:%d"
,
pSet
->
label
,
ipstr
,
pInfo
->
port
,
pInfo
->
msgLen
,
pInfo
->
handle
,
pHead
->
sourceId
,
pHead
->
destId
,
pHead
->
tranId
);
fd
=
taosOpenTcpClientSocket
(
ipstr
,
(
int16_t
)
pInfo
->
port
,
tsLocalIp
);
if
(
fd
<
0
)
{
tError
(
"%s failed to open TCP client socket ip:%s:%hu"
,
pSet
->
label
,
ipstr
,
pInfo
->
port
);
pMonitor
->
pSet
=
NULL
;
return
NULL
;
}
SHandleViaTcp
handleViaTcp
;
taosInitHandleViaTcp
(
&
handleViaTcp
,
pInfo
->
handle
);
retLen
=
(
int
)
taosWriteSocket
(
fd
,
(
char
*
)
&
handleViaTcp
,
sizeof
(
SHandleViaTcp
));
if
(
retLen
!=
(
int
)
sizeof
(
SHandleViaTcp
))
{
tError
(
"%s failed to send handle:0x%x to server, retLen:%d"
,
pSet
->
label
,
pInfo
->
handle
,
retLen
);
pMonitor
->
pSet
=
NULL
;
}
else
{
tTrace
(
"%s handle:0x%x is sent to server"
,
pSet
->
label
,
pInfo
->
handle
);
char
*
buffer
=
malloc
((
size_t
)
pInfo
->
msgLen
);
if
(
NULL
==
buffer
)
{
tError
(
"%s failed to malloc(size:%d) for recv server data"
,
pSet
->
label
,
pInfo
->
msgLen
);
retLen
=
0
;
//taosCloseTcpSocket(fd);
//pMonitor->pSet = NULL;
//return NULL;
}
else
{
retLen
=
taosReadMsg
(
fd
,
buffer
,
pInfo
->
msgLen
);
}
pMonitor
->
pSet
=
NULL
;
if
(
retLen
!=
pInfo
->
msgLen
)
{
tError
(
"%s failed to read data from server, msgLen:%d retLen:%d"
,
pSet
->
label
,
pInfo
->
msgLen
,
retLen
);
tfree
(
buffer
);
}
else
{
(
*
pSet
->
fp
)(
buffer
,
pInfo
->
msgLen
,
pMonitor
->
ip
,
pInfo
->
port
,
pSet
->
shandle
,
NULL
,
pMonitor
->
pConn
);
}
}
taosCloseTcpSocket
(
fd
);
return
NULL
;
}
int
taosReceivePacketViaTcp
(
uint32_t
ip
,
SRpcHead
*
pHead
,
SUdpConn
*
pConn
)
{
SUdpConnSet
*
pSet
=
pConn
->
pSet
;
SPacketInfo
*
pInfo
=
(
SPacketInfo
*
)
pHead
->
content
;
int
code
=
0
;
pthread_attr_t
thattr
;
pthread_t
thread
;
tTrace
(
"%s receive packet via TCP, handle:0x%x, source:0x%08x dest:0x%08x tranId:%d"
,
pSet
->
label
,
pInfo
->
handle
,
pHead
->
sourceId
,
pHead
->
destId
,
pHead
->
tranId
);
SMonitor
*
pMonitor
=
(
SMonitor
*
)
calloc
(
1
,
sizeof
(
SMonitor
));
pMonitor
->
dataLen
=
sizeof
(
SRpcHead
)
+
sizeof
(
SPacketInfo
);
memcpy
(
pMonitor
->
data
,
pHead
,
(
size_t
)
pMonitor
->
dataLen
);
pMonitor
->
pSet
=
pSet
;
pMonitor
->
ip
=
ip
;
pMonitor
->
port
=
pInfo
->
port
;
pMonitor
->
pConn
=
pConn
;
taosTmrReset
(
taosProcessMonitorTimer
,
0
,
pMonitor
,
pSet
->
tmrCtrl
,
&
pMonitor
->
pTimer
);
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_DETACHED
);
code
=
pthread_create
(
&
(
thread
),
&
thattr
,
taosReadTcpData
,
(
void
*
)
pMonitor
);
if
(
code
<
0
)
{
tTrace
(
"%s failed to create thread to read tcp data, reason:%s"
,
pSet
->
label
,
strerror
(
errno
));
pMonitor
->
pSet
=
NULL
;
}
pthread_attr_destroy
(
&
thattr
);
return
code
;
}
void
*
taosRecvUdpData
(
void
*
param
)
{
struct
sockaddr_in
sourceAdd
;
unsigned
int
addLen
,
dataLen
;
SUdpConn
*
pConn
=
(
SUdpConn
*
)
param
;
uint16_t
port
;
int
minSize
=
sizeof
(
SRpcHead
);
memset
(
&
sourceAdd
,
0
,
sizeof
(
sourceAdd
));
addLen
=
sizeof
(
sourceAdd
);
tTrace
(
"%s UDP thread is created, index:%d"
,
pConn
->
label
,
pConn
->
index
);
while
(
1
)
{
dataLen
=
(
uint32_t
)
recvfrom
(
pConn
->
fd
,
pConn
->
buffer
,
sizeof
(
pConn
->
buffer
),
0
,
(
struct
sockaddr
*
)
&
sourceAdd
,
&
addLen
);
tTrace
(
"%s msg is recv from 0x%x:%hu len:%d"
,
pConn
->
label
,
sourceAdd
.
sin_addr
.
s_addr
,
ntohs
(
sourceAdd
.
sin_port
),
dataLen
);
if
(
dataLen
<
sizeof
(
SRpcHead
))
{
tError
(
"%s recvfrom failed, reason:%s
\n
"
,
pConn
->
label
,
strerror
(
errno
));
continue
;
}
port
=
ntohs
(
sourceAdd
.
sin_port
);
int
processedLen
=
0
,
leftLen
=
0
;
int
msgLen
=
0
;
int
count
=
0
;
char
*
msg
=
pConn
->
buffer
;
while
(
processedLen
<
(
int
)
dataLen
)
{
leftLen
=
dataLen
-
processedLen
;
SRpcHead
*
pHead
=
(
SRpcHead
*
)
msg
;
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
pHead
->
msgLen
);
if
(
leftLen
<
minSize
||
msgLen
>
leftLen
||
msgLen
<
minSize
)
{
tError
(
"%s msg is messed up, dataLen:%d processedLen:%d count:%d msgLen:%d"
,
pConn
->
label
,
dataLen
,
processedLen
,
count
,
msgLen
);
break
;
}
if
(
pHead
->
tcp
==
1
)
{
taosReceivePacketViaTcp
(
sourceAdd
.
sin_addr
.
s_addr
,
(
SRpcHead
*
)
msg
,
pConn
);
}
else
{
char
*
data
=
malloc
((
size_t
)
msgLen
);
memcpy
(
data
,
msg
,
(
size_t
)
msgLen
);
(
*
(
pConn
->
processData
))(
data
,
msgLen
,
sourceAdd
.
sin_addr
.
s_addr
,
port
,
pConn
->
shandle
,
NULL
,
pConn
);
}
processedLen
+=
msgLen
;
msg
+=
msgLen
;
count
++
;
}
// tTrace("%s %d UDP packets are received together", pConn->label, count);
}
return
NULL
;
}
void
*
taosTransferDataViaTcp
(
void
*
argv
)
{
STransfer
*
pTransfer
=
(
STransfer
*
)
argv
;
int
connFd
=
pTransfer
->
fd
;
int
msgLen
,
retLen
,
leftLen
;
uint64_t
handle
;
SRpcHead
*
pHead
=
NULL
,
head
;
SUdpConnSet
*
pSet
=
pTransfer
->
pSet
;
SHandleViaTcp
handleViaTcp
;
retLen
=
taosReadMsg
(
connFd
,
&
handleViaTcp
,
sizeof
(
SHandleViaTcp
));
if
(
retLen
!=
sizeof
(
SHandleViaTcp
))
{
tError
(
"%s UDP server failed to read handle, retLen:%d"
,
pSet
->
label
,
retLen
);
taosCloseSocket
(
connFd
);
free
(
pTransfer
);
return
NULL
;
}
if
(
!
taosCheckHandleViaTcpValid
(
&
handleViaTcp
))
{
tError
(
"%s UDP server read handle via tcp invalid, handle:%"
PRIu64
", hash:%"
PRIu64
,
pSet
->
label
,
handleViaTcp
.
handle
,
handleViaTcp
.
hash
);
taosCloseSocket
(
connFd
);
free
(
pTransfer
);
return
NULL
;
}
handle
=
handleViaTcp
.
handle
;
if
(
handle
==
0
)
{
// receive a packet from client
tTrace
(
"%s data will be received via TCP from 0x%x:%hu"
,
pSet
->
label
,
pTransfer
->
ip
,
pTransfer
->
port
);
retLen
=
taosReadMsg
(
connFd
,
&
head
,
sizeof
(
SRpcHead
));
if
(
retLen
!=
(
int
)
sizeof
(
SRpcHead
))
{
tError
(
"%s failed to read msg header, retLen:%d"
,
pSet
->
label
,
retLen
);
}
else
{
SMonitor
*
pMonitor
=
(
SMonitor
*
)
calloc
(
1
,
sizeof
(
SMonitor
));
if
(
NULL
==
pMonitor
)
{
tError
(
"%s malloc failed by TransferViaTcp from client"
,
pSet
->
label
);
taosCloseSocket
(
connFd
);
free
(
pTransfer
);
return
NULL
;
}
pMonitor
->
dataLen
=
sizeof
(
SRpcHead
);
memcpy
(
pMonitor
->
data
,
&
head
,
(
size_t
)
pMonitor
->
dataLen
);
((
SRpcHead
*
)
pMonitor
->
data
)
->
msgLen
=
(
int32_t
)
htonl
(
sizeof
(
SRpcHead
));
((
SRpcHead
*
)
pMonitor
->
data
)
->
tcp
=
1
;
pMonitor
->
ip
=
pTransfer
->
ip
;
pMonitor
->
port
=
head
.
port
;
pMonitor
->
pSet
=
pSet
;
taosTmrReset
(
taosProcessMonitorTimer
,
0
,
pMonitor
,
pSet
->
tmrCtrl
,
&
pMonitor
->
pTimer
);
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
head
.
msgLen
);
char
*
buffer
=
malloc
((
size_t
)
msgLen
);
if
(
NULL
==
buffer
)
{
tError
(
"%s malloc failed for msg by TransferViaTcp"
,
pSet
->
label
);
taosCloseSocket
(
connFd
);
free
(
pTransfer
);
return
NULL
;
}
leftLen
=
msgLen
-
(
int
)
sizeof
(
SRpcHead
);
retLen
=
taosReadMsg
(
connFd
,
buffer
+
sizeof
(
SRpcHead
),
leftLen
);
pMonitor
->
pSet
=
NULL
;
if
(
retLen
!=
leftLen
)
{
tError
(
"%s failed to read data from client, leftLen:%d retLen:%d, error:%s"
,
pSet
->
label
,
leftLen
,
retLen
,
strerror
(
errno
));
}
else
{
tTrace
(
"%s data is received from client via TCP from 0x%x:%hu, msgLen:%d"
,
pSet
->
label
,
pTransfer
->
ip
,
pTransfer
->
port
,
msgLen
);
pSet
->
index
=
(
pSet
->
index
+
1
)
%
pSet
->
threads
;
SUdpConn
*
pConn
=
pSet
->
udpConn
+
pSet
->
index
;
memcpy
(
buffer
,
&
head
,
sizeof
(
SRpcHead
));
(
*
pSet
->
fp
)(
buffer
,
msgLen
,
pTransfer
->
ip
,
head
.
port
,
pSet
->
shandle
,
NULL
,
pConn
);
}
taosWriteMsg
(
connFd
,
&
handleViaTcp
,
sizeof
(
SHandleViaTcp
));
}
}
else
{
// send a packet to client
tTrace
(
"%s send packet to client via TCP, handle:0x%x"
,
pSet
->
label
,
handle
);
pHead
=
(
SRpcHead
*
)
handle
;
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
pHead
->
msgLen
);
if
(
pHead
->
tcp
!=
0
||
msgLen
<
1024
)
{
tError
(
"%s invalid handle:%p, connection shall be closed"
,
pSet
->
label
,
pHead
);
}
else
{
SMonitor
*
pMonitor
=
(
SMonitor
*
)
calloc
(
1
,
sizeof
(
SMonitor
));
if
(
NULL
==
pMonitor
)
{
tError
(
"%s malloc failed by TransferViaTcp to client"
,
pSet
->
label
);
taosCloseSocket
(
connFd
);
free
(
pTransfer
);
return
NULL
;
}
pMonitor
->
dataLen
=
sizeof
(
SRpcHead
);
memcpy
(
pMonitor
->
data
,
(
void
*
)
handle
,
(
size_t
)
pMonitor
->
dataLen
);
SRpcHead
*
pThead
=
(
SRpcHead
*
)
pMonitor
->
data
;
pThead
->
tcp
=
1
;
pThead
->
msgType
=
(
char
)(
pHead
->
msgType
-
1
);
pThead
->
msgLen
=
(
int32_t
)
htonl
(
sizeof
(
SRpcHead
));
uint32_t
id
=
pThead
->
sourceId
;
pThead
->
sourceId
=
pThead
->
destId
;
pThead
->
destId
=
id
;
pMonitor
->
ip
=
pTransfer
->
ip
;
pMonitor
->
port
=
pTransfer
->
port
;
pMonitor
->
pSet
=
pSet
;
taosTmrReset
(
taosProcessMonitorTimer
,
200
,
pMonitor
,
pSet
->
tmrCtrl
,
&
pMonitor
->
pTimer
);
retLen
=
taosWriteMsg
(
connFd
,
(
void
*
)
handle
,
msgLen
);
pMonitor
->
pSet
=
NULL
;
if
(
retLen
!=
msgLen
)
{
tError
(
"%s failed to send data to client, msgLen:%d retLen:%d"
,
pSet
->
label
,
msgLen
,
retLen
);
}
else
{
tTrace
(
"%s data is sent to client successfully via TCP to 0x%x:%hu, size:%d"
,
pSet
->
label
,
pTransfer
->
ip
,
pTransfer
->
port
,
msgLen
);
}
}
}
// retLen = taosReadMsg(connFd, &handleViaTcp, sizeof(handleViaTcp));
free
(
pTransfer
);
taosCloseSocket
(
connFd
);
return
NULL
;
}
void
*
taosUdpTcpConnection
(
void
*
argv
)
{
int
connFd
=
-
1
;
struct
sockaddr_in
clientAddr
;
pthread_attr_t
thattr
;
pthread_t
thread
;
uint32_t
sourceIp
;
char
ipstr
[
20
];
SUdpConnSet
*
pSet
=
(
SUdpConnSet
*
)
argv
;
pSet
->
tcpFd
=
taosOpenTcpServerSocket
(
pSet
->
ip
,
pSet
->
port
);
if
(
pSet
->
tcpFd
<
0
)
{
tPrint
(
"%s failed to create TCP socket %s:%hu for UDP server, reason:%s"
,
pSet
->
label
,
pSet
->
ip
,
pSet
->
port
,
strerror
(
errno
));
taosKillSystem
();
return
NULL
;
}
tTrace
(
"%s UDP server is created, ip:%s:%hu"
,
pSet
->
label
,
pSet
->
ip
,
pSet
->
port
);
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_DETACHED
);
while
(
1
)
{
if
(
pSet
->
tcpFd
<
0
)
break
;
socklen_t
addrlen
=
sizeof
(
clientAddr
);
connFd
=
accept
(
pSet
->
tcpFd
,
(
struct
sockaddr
*
)
&
clientAddr
,
&
addrlen
);
if
(
connFd
<
0
)
{
tError
(
"%s UDP server TCP accept failure, reason:%s"
,
pSet
->
label
,
strerror
(
errno
));
continue
;
}
sourceIp
=
clientAddr
.
sin_addr
.
s_addr
;
tinet_ntoa
(
ipstr
,
sourceIp
);
tTrace
(
"%s UDP server TCP connection from ip:%s:%u"
,
pSet
->
label
,
ipstr
,
htons
(
clientAddr
.
sin_port
));
STransfer
*
pTransfer
=
malloc
(
sizeof
(
STransfer
));
pTransfer
->
fd
=
connFd
;
pTransfer
->
ip
=
sourceIp
;
pTransfer
->
port
=
clientAddr
.
sin_port
;
pTransfer
->
pSet
=
pSet
;
if
(
pthread_create
(
&
(
thread
),
&
thattr
,
taosTransferDataViaTcp
,
(
void
*
)
pTransfer
)
<
0
)
{
tTrace
(
"%s failed to create thread for UDP server, reason:%s"
,
pSet
->
label
,
strerror
(
errno
));
free
(
pTransfer
);
taosCloseSocket
(
connFd
);
}
}
pthread_attr_destroy
(
&
thattr
);
return
NULL
;
}
static
void
*
taosRecvUdpData
(
void
*
param
);
static
SUdpBuf
*
taosCreateUdpBuf
(
SUdpConn
*
pConn
,
uint32_t
ip
,
uint16_t
port
);
static
void
taosProcessUdpBufTimer
(
void
*
param
,
void
*
tmrId
);
void
*
taosInitUdpConnection
(
char
*
ip
,
uint16_t
port
,
char
*
label
,
int
threads
,
void
*
fp
,
void
*
shandle
)
{
pthread_attr_t
thAttr
;
...
...
@@ -508,7 +94,6 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
pSet
->
port
=
port
;
pSet
->
shandle
=
shandle
;
pSet
->
fp
=
fp
;
pSet
->
tcpFd
=
-
1
;
strcpy
(
pSet
->
label
,
label
);
// if ( tsUdpDelay ) {
...
...
@@ -570,39 +155,11 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
return
pSet
;
}
void
*
taosInitUdpServer
(
char
*
ip
,
uint16_t
port
,
char
*
label
,
int
threads
,
void
*
fp
,
void
*
shandle
)
{
SUdpConnSet
*
pSet
;
pSet
=
taosInitUdpConnection
(
ip
,
port
,
label
,
threads
,
fp
,
shandle
);
if
(
pSet
==
NULL
)
return
NULL
;
pSet
->
server
=
1
;
pSet
->
fp
=
fp
;
pthread_attr_t
thattr
;
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_DETACHED
);
// not support by windows
// pthread_t thread;
// pSet->tcpThread = pthread_create(&(thread), &thattr, taosUdpTcpConnection, pSet);
pthread_create
(
&
(
pSet
->
tcpThread
),
&
thattr
,
taosUdpTcpConnection
,
pSet
);
pthread_attr_destroy
(
&
thattr
);
return
pSet
;
}
void
*
taosInitUdpClient
(
char
*
ip
,
uint16_t
port
,
char
*
label
,
int
threads
,
void
*
fp
,
void
*
shandle
)
{
return
taosInitUdpConnection
(
ip
,
port
,
label
,
threads
,
fp
,
shandle
);
}
void
taosCleanUpUdpConnection
(
void
*
handle
)
{
SUdpConnSet
*
pSet
=
(
SUdpConnSet
*
)
handle
;
SUdpConn
*
pConn
;
if
(
pSet
==
NULL
)
return
;
if
(
pSet
->
server
==
1
)
{
pthread_cancel
(
pSet
->
tcpThread
);
}
for
(
int
i
=
0
;
i
<
pSet
->
threads
;
++
i
)
{
pConn
=
pSet
->
udpConn
+
i
;
...
...
@@ -621,8 +178,6 @@ void taosCleanUpUdpConnection(void *handle) {
tTrace
(
"chandle:%p is closed"
,
pConn
);
}
if
(
pSet
->
tcpFd
>=
0
)
taosCloseTcpSocket
(
pSet
->
tcpFd
);
pSet
->
tcpFd
=
-
1
;
taosTmrCleanUp
(
pSet
->
tmrCtrl
);
tfree
(
pSet
);
}
...
...
@@ -641,6 +196,148 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t por
return
pConn
;
}
static
void
*
taosRecvUdpData
(
void
*
param
)
{
struct
sockaddr_in
sourceAdd
;
int
dataLen
;
unsigned
int
addLen
;
SUdpConn
*
pConn
=
(
SUdpConn
*
)
param
;
uint16_t
port
;
int
minSize
=
sizeof
(
SRpcHead
);
SRecvInfo
recvInfo
;
memset
(
&
sourceAdd
,
0
,
sizeof
(
sourceAdd
));
addLen
=
sizeof
(
sourceAdd
);
tTrace
(
"%s UDP thread is created, index:%d"
,
pConn
->
label
,
pConn
->
index
);
while
(
1
)
{
dataLen
=
recvfrom
(
pConn
->
fd
,
pConn
->
buffer
,
sizeof
(
pConn
->
buffer
),
0
,
(
struct
sockaddr
*
)
&
sourceAdd
,
&
addLen
);
tTrace
(
"%s msg is recv from 0x%x:%hu len:%d"
,
pConn
->
label
,
sourceAdd
.
sin_addr
.
s_addr
,
ntohs
(
sourceAdd
.
sin_port
),
dataLen
);
if
(
dataLen
<
sizeof
(
SRpcHead
))
{
tError
(
"%s recvfrom failed, reason:%s
\n
"
,
pConn
->
label
,
strerror
(
errno
));
continue
;
}
port
=
ntohs
(
sourceAdd
.
sin_port
);
int
processedLen
=
0
,
leftLen
=
0
;
int
msgLen
=
0
;
int
count
=
0
;
char
*
msg
=
pConn
->
buffer
;
while
(
processedLen
<
dataLen
)
{
leftLen
=
dataLen
-
processedLen
;
SRpcHead
*
pHead
=
(
SRpcHead
*
)
msg
;
msgLen
=
htonl
((
uint32_t
)
pHead
->
msgLen
);
if
(
leftLen
<
minSize
||
msgLen
>
leftLen
||
msgLen
<
minSize
)
{
tError
(
"%s msg is messed up, dataLen:%d processedLen:%d count:%d msgLen:%d"
,
pConn
->
label
,
dataLen
,
processedLen
,
count
,
msgLen
);
break
;
}
char
*
data
=
malloc
((
size_t
)
msgLen
);
memcpy
(
data
,
msg
,
(
size_t
)
msgLen
);
recvInfo
.
msg
=
data
;
recvInfo
.
msgLen
=
msgLen
;
recvInfo
.
ip
=
sourceAdd
.
sin_addr
.
s_addr
;
recvInfo
.
port
=
port
;
recvInfo
.
shandle
=
pConn
->
shandle
;
recvInfo
.
thandle
=
NULL
;
recvInfo
.
chandle
=
pConn
;
recvInfo
.
connType
=
0
;
(
*
(
pConn
->
processData
))(
&
recvInfo
);
processedLen
+=
msgLen
;
msg
+=
msgLen
;
count
++
;
}
// tTrace("%s %d UDP packets are received together", pConn->label, count);
}
return
NULL
;
}
int
taosSendUdpData
(
uint32_t
ip
,
uint16_t
port
,
void
*
data
,
int
dataLen
,
void
*
chandle
)
{
SUdpConn
*
pConn
=
(
SUdpConn
*
)
chandle
;
SUdpBuf
*
pBuf
;
if
(
pConn
==
NULL
||
pConn
->
signature
!=
pConn
)
return
-
1
;
if
(
pConn
->
hash
==
NULL
)
{
struct
sockaddr_in
destAdd
;
memset
(
&
destAdd
,
0
,
sizeof
(
destAdd
));
destAdd
.
sin_family
=
AF_INET
;
destAdd
.
sin_addr
.
s_addr
=
ip
;
destAdd
.
sin_port
=
htons
(
port
);
int
ret
=
(
int
)
sendto
(
pConn
->
fd
,
data
,
(
size_t
)
dataLen
,
0
,
(
struct
sockaddr
*
)
&
destAdd
,
sizeof
(
destAdd
));
tTrace
(
"%s msg is sent to 0x%x:%hu len:%d ret:%d localPort:%hu chandle:0x%x"
,
pConn
->
label
,
destAdd
.
sin_addr
.
s_addr
,
port
,
dataLen
,
ret
,
pConn
->
localPort
,
chandle
);
return
ret
;
}
pthread_mutex_lock
(
&
pConn
->
mutex
);
pBuf
=
(
SUdpBuf
*
)
rpcGetIpHash
(
pConn
->
hash
,
ip
,
port
);
if
(
pBuf
==
NULL
)
{
pBuf
=
taosCreateUdpBuf
(
pConn
,
ip
,
port
);
rpcAddIpHash
(
pConn
->
hash
,
pBuf
,
ip
,
port
);
}
if
((
pBuf
->
totalLen
+
dataLen
>
RPC_MAX_UDP_SIZE
)
||
(
taosMsgHdrSize
(
pBuf
->
msgHdr
)
>=
RPC_MAX_UDP_PKTS
))
{
taosTmrReset
(
taosProcessUdpBufTimer
,
RPC_UDP_BUF_TIME
,
pBuf
,
pConn
->
tmrCtrl
,
&
pBuf
->
timer
);
taosSendMsgHdr
(
pBuf
->
msgHdr
,
pConn
->
fd
);
pBuf
->
totalLen
=
0
;
}
taosSetMsgHdrData
(
pBuf
->
msgHdr
,
data
,
dataLen
);
pBuf
->
totalLen
+=
dataLen
;
pthread_mutex_unlock
(
&
pConn
->
mutex
);
return
dataLen
;
}
void
taosFreeMsgHdr
(
void
*
hdr
)
{
struct
msghdr
*
msgHdr
=
(
struct
msghdr
*
)
hdr
;
free
(
msgHdr
->
msg_iov
);
}
int
taosMsgHdrSize
(
void
*
hdr
)
{
struct
msghdr
*
msgHdr
=
(
struct
msghdr
*
)
hdr
;
return
(
int
)
msgHdr
->
msg_iovlen
;
}
void
taosSendMsgHdr
(
void
*
hdr
,
int
fd
)
{
struct
msghdr
*
msgHdr
=
(
struct
msghdr
*
)
hdr
;
sendmsg
(
fd
,
msgHdr
,
0
);
msgHdr
->
msg_iovlen
=
0
;
}
void
taosInitMsgHdr
(
void
**
hdr
,
void
*
dest
,
int
maxPkts
)
{
struct
msghdr
*
msgHdr
=
(
struct
msghdr
*
)
malloc
(
sizeof
(
struct
msghdr
));
memset
(
msgHdr
,
0
,
sizeof
(
struct
msghdr
));
*
hdr
=
msgHdr
;
struct
sockaddr_in
*
destAdd
=
(
struct
sockaddr_in
*
)
dest
;
msgHdr
->
msg_name
=
destAdd
;
msgHdr
->
msg_namelen
=
sizeof
(
struct
sockaddr_in
);
int
size
=
(
int
)
sizeof
(
struct
iovec
)
*
maxPkts
;
msgHdr
->
msg_iov
=
(
struct
iovec
*
)
malloc
((
size_t
)
size
);
memset
(
msgHdr
->
msg_iov
,
0
,
(
size_t
)
size
);
}
void
taosSetMsgHdrData
(
void
*
hdr
,
char
*
data
,
int
dataLen
)
{
struct
msghdr
*
msgHdr
=
(
struct
msghdr
*
)
hdr
;
msgHdr
->
msg_iov
[
msgHdr
->
msg_iovlen
].
iov_base
=
data
;
msgHdr
->
msg_iov
[
msgHdr
->
msg_iovlen
].
iov_len
=
(
size_t
)
dataLen
;
msgHdr
->
msg_iovlen
++
;
}
void
taosRemoveUdpBuf
(
SUdpBuf
*
pBuf
)
{
taosTmrStopA
(
&
pBuf
->
timer
);
rpcDeleteIpHash
(
pBuf
->
pConn
->
hash
,
pBuf
->
ip
,
pBuf
->
port
);
...
...
@@ -679,7 +376,7 @@ void taosProcessUdpBufTimer(void *param, void *tmrId) {
if
(
pBuf
)
taosTmrReset
(
taosProcessUdpBufTimer
,
RPC_UDP_BUF_TIME
,
pBuf
,
pConn
->
tmrCtrl
,
&
pBuf
->
timer
);
}
SUdpBuf
*
taosCreateUdpBuf
(
SUdpConn
*
pConn
,
uint32_t
ip
,
uint16_t
port
)
{
static
SUdpBuf
*
taosCreateUdpBuf
(
SUdpConn
*
pConn
,
uint32_t
ip
,
uint16_t
port
)
{
SUdpBuf
*
pBuf
=
(
SUdpBuf
*
)
malloc
(
sizeof
(
SUdpBuf
));
memset
(
pBuf
,
0
,
sizeof
(
SUdpBuf
));
...
...
@@ -700,121 +397,4 @@ SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port) {
return
pBuf
;
}
int
taosSendPacketViaTcp
(
uint32_t
ip
,
uint16_t
port
,
char
*
data
,
int
dataLen
,
void
*
chandle
)
{
SUdpConn
*
pConn
=
(
SUdpConn
*
)
chandle
;
SUdpConnSet
*
pSet
=
(
SUdpConnSet
*
)
pConn
->
pSet
;
int
code
=
-
1
,
retLen
,
msgLen
;
char
ipstr
[
64
];
char
buffer
[
128
];
SRpcHead
*
pHead
;
if
(
pSet
->
server
)
{
// send from server
pHead
=
(
SRpcHead
*
)
buffer
;
memcpy
(
pHead
,
data
,
sizeof
(
SRpcHead
));
pHead
->
tcp
=
1
;
SPacketInfo
*
pInfo
=
(
SPacketInfo
*
)
pHead
->
content
;
pInfo
->
handle
=
(
uint64_t
)
data
;
pInfo
->
port
=
pSet
->
port
;
pInfo
->
msgLen
=
pHead
->
msgLen
;
msgLen
=
sizeof
(
SRpcHead
)
+
sizeof
(
SPacketInfo
);
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
msgLen
);
code
=
taosSendUdpData
(
ip
,
port
,
buffer
,
msgLen
,
chandle
);
tTrace
(
"%s data from server will be sent via TCP:%hu, msgType:%d, length:%d, handle:0x%x"
,
pSet
->
label
,
pInfo
->
port
,
pHead
->
msgType
,
htonl
((
uint32_t
)
pInfo
->
msgLen
),
pInfo
->
handle
);
if
(
code
>
0
)
code
=
dataLen
;
}
else
{
// send from client
tTrace
(
"%s data will be sent via TCP from client"
,
pSet
->
label
);
// send a UDP header first to set up the connection
pHead
=
(
SRpcHead
*
)
buffer
;
memcpy
(
pHead
,
data
,
sizeof
(
SRpcHead
));
pHead
->
tcp
=
2
;
msgLen
=
sizeof
(
SRpcHead
);
pHead
->
msgLen
=
(
int32_t
)
htonl
(
msgLen
);
code
=
taosSendUdpData
(
ip
,
port
,
buffer
,
msgLen
,
chandle
);
//pHead = (SRpcHead *)data;
tinet_ntoa
(
ipstr
,
ip
);
int
fd
=
taosOpenTcpClientSocket
(
ipstr
,
pConn
->
port
,
tsLocalIp
);
if
(
fd
<
0
)
{
tError
(
"%s failed to open TCP socket to:%s:%hu to send packet"
,
pSet
->
label
,
ipstr
,
pConn
->
port
);
}
else
{
SHandleViaTcp
handleViaTcp
;
taosInitHandleViaTcp
(
&
handleViaTcp
,
0
);
retLen
=
(
int
)
taosWriteSocket
(
fd
,
(
char
*
)
&
handleViaTcp
,
sizeof
(
SHandleViaTcp
));
if
(
retLen
!=
(
int
)
sizeof
(
handleViaTcp
))
{
tError
(
"%s failed to send handle to server, retLen:%d"
,
pSet
->
label
,
retLen
);
}
else
{
retLen
=
taosWriteMsg
(
fd
,
data
,
dataLen
);
if
(
retLen
!=
dataLen
)
{
tError
(
"%s failed to send data via TCP, dataLen:%d, retLen:%d, error:%s"
,
pSet
->
label
,
dataLen
,
retLen
,
strerror
(
errno
));
}
else
{
code
=
dataLen
;
tTrace
(
"%s data is sent via TCP successfully"
,
pSet
->
label
);
}
}
taosReadMsg
(
fd
,
(
char
*
)
&
handleViaTcp
,
sizeof
(
SHandleViaTcp
));
taosCloseTcpSocket
(
fd
);
}
}
return
code
;
}
int
taosSendUdpData
(
uint32_t
ip
,
uint16_t
port
,
char
*
data
,
int
dataLen
,
void
*
chandle
)
{
SUdpConn
*
pConn
=
(
SUdpConn
*
)
chandle
;
SUdpBuf
*
pBuf
;
if
(
pConn
==
NULL
||
pConn
->
signature
!=
pConn
)
return
-
1
;
if
(
dataLen
>=
RPC_MAX_UDP_SIZE
)
return
taosSendPacketViaTcp
(
ip
,
port
,
data
,
dataLen
,
chandle
);
if
(
pConn
->
hash
==
NULL
)
{
struct
sockaddr_in
destAdd
;
memset
(
&
destAdd
,
0
,
sizeof
(
destAdd
));
destAdd
.
sin_family
=
AF_INET
;
destAdd
.
sin_addr
.
s_addr
=
ip
;
destAdd
.
sin_port
=
htons
(
port
);
int
ret
=
(
int
)
sendto
(
pConn
->
fd
,
data
,
(
size_t
)
dataLen
,
0
,
(
struct
sockaddr
*
)
&
destAdd
,
sizeof
(
destAdd
));
tTrace
(
"%s msg is sent to 0x%x:%hu len:%d ret:%d localPort:%hu chandle:0x%x"
,
pConn
->
label
,
destAdd
.
sin_addr
.
s_addr
,
port
,
dataLen
,
ret
,
pConn
->
localPort
,
chandle
);
return
ret
;
}
pthread_mutex_lock
(
&
pConn
->
mutex
);
pBuf
=
(
SUdpBuf
*
)
rpcGetIpHash
(
pConn
->
hash
,
ip
,
port
);
if
(
pBuf
==
NULL
)
{
pBuf
=
taosCreateUdpBuf
(
pConn
,
ip
,
port
);
rpcAddIpHash
(
pConn
->
hash
,
pBuf
,
ip
,
port
);
}
if
((
pBuf
->
totalLen
+
dataLen
>
RPC_MAX_UDP_SIZE
)
||
(
taosMsgHdrSize
(
pBuf
->
msgHdr
)
>=
RPC_MAX_UDP_PKTS
))
{
taosTmrReset
(
taosProcessUdpBufTimer
,
RPC_UDP_BUF_TIME
,
pBuf
,
pConn
->
tmrCtrl
,
&
pBuf
->
timer
);
taosSendMsgHdr
(
pBuf
->
msgHdr
,
pConn
->
fd
);
pBuf
->
totalLen
=
0
;
}
taosSetMsgHdrData
(
pBuf
->
msgHdr
,
data
,
dataLen
);
pBuf
->
totalLen
+=
dataLen
;
pthread_mutex_unlock
(
&
pConn
->
mutex
);
return
dataLen
;
}
src/rpc/test/rclient.c
浏览文件 @
5d965446
...
...
@@ -85,7 +85,6 @@ int main(int argc, char *argv[]) {
int
msgSize
=
128
;
int
numOfReqs
=
0
;
int
appThreads
=
1
;
char
socketType
[
20
]
=
"udp"
;
char
serverIp
[
40
]
=
"127.0.0.1"
;
struct
timeval
systemTime
;
int64_t
startTime
,
endTime
;
...
...
@@ -113,9 +112,7 @@ int main(int argc, char *argv[]) {
rpcInit
.
ckey
=
"key"
;
for
(
int
i
=
1
;
i
<
argc
;
++
i
)
{
if
(
strcmp
(
argv
[
i
],
"-c"
)
==
0
&&
i
<
argc
-
1
)
{
strcpy
(
socketType
,
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-p"
)
==
0
&&
i
<
argc
-
1
)
{
if
(
strcmp
(
argv
[
i
],
"-p"
)
==
0
&&
i
<
argc
-
1
)
{
ipSet
.
port
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-i"
)
==
0
&&
i
<
argc
-
1
)
{
ipSet
.
ip
[
0
]
=
inet_addr
(
argv
[
++
i
]);
...
...
@@ -138,7 +135,6 @@ int main(int argc, char *argv[]) {
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
&&
i
<
argc
-
1
)
{
}
else
{
printf
(
"
\n
usage: %s [options]
\n
"
,
argv
[
0
]);
printf
(
" [-c ctype]: connection type:udp or tpc, default is:%s
\n
"
,
socketType
);
printf
(
" [-i ip]: first server IP address, default is:%s
\n
"
,
serverIp
);
printf
(
" [-p port]: server port number, default is:%d
\n
"
,
ipSet
.
port
);
printf
(
" [-t threads]: number of rpc threads, default is:%d
\n
"
,
rpcInit
.
numOfThreads
);
...
...
@@ -154,7 +150,7 @@ int main(int argc, char *argv[]) {
}
}
rpcInit
.
connType
=
strcasecmp
(
socketType
,
"udp"
)
==
0
?
TAOS_CONN_UDPC
:
TAOS_CONN_TCPC
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
taosInitLog
(
"client.log"
,
100000
,
10
);
void
*
pRpc
=
rpcOpen
(
&
rpcInit
);
...
...
src/rpc/test/rserver.c
浏览文件 @
5d965446
...
...
@@ -60,7 +60,6 @@ void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32
int
main
(
int
argc
,
char
*
argv
[])
{
SRpcInit
rpcInit
;
char
socketType
[
20
]
=
"udp"
;
char
dataName
[
20
]
=
"server.data"
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
...
...
@@ -73,9 +72,7 @@ int main(int argc, char *argv[]) {
rpcInit
.
idleTime
=
2000
;
for
(
int
i
=
1
;
i
<
argc
;
++
i
)
{
if
(
strcmp
(
argv
[
i
],
"-c"
)
==
0
&&
i
<
argc
-
1
)
{
strcpy
(
socketType
,
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-p"
)
==
0
&&
i
<
argc
-
1
)
{
if
(
strcmp
(
argv
[
i
],
"-p"
)
==
0
&&
i
<
argc
-
1
)
{
rpcInit
.
localPort
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-i"
)
==
0
&&
i
<
argc
-
1
)
{
strcpy
(
rpcInit
.
localIp
,
argv
[
++
i
]);
...
...
@@ -93,7 +90,6 @@ int main(int argc, char *argv[]) {
rpcDebugFlag
=
atoi
(
argv
[
++
i
]);
}
else
{
printf
(
"
\n
usage: %s [options]
\n
"
,
argv
[
0
]);
printf
(
" [-c ctype]: connection type:udp or tcp, default is:%s
\n
"
,
socketType
);
printf
(
" [-i ip]: server IP address, default is:%s
\n
"
,
rpcInit
.
localIp
);
printf
(
" [-p port]: server port number, default is:%d
\n
"
,
rpcInit
.
localPort
);
printf
(
" [-t threads]: number of threads, default is:%d
\n
"
,
rpcInit
.
numOfThreads
);
...
...
@@ -107,7 +103,7 @@ int main(int argc, char *argv[]) {
}
}
rpcInit
.
connType
=
strcasecmp
(
socketType
,
"udp"
)
==
0
?
TAOS_CONN_UDPS
:
TAOS_CONN_TCPS
;
rpcInit
.
connType
=
TAOS_CONN_SERVER
;
taosInitLog
(
"server.log"
,
100000
,
10
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录