Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
305fa254
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
305fa254
编写于
5月 21, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh: delete redundancy code
上级
d0625393
变更
17
展开全部
隐藏空白更改
内联
并排
Showing
17 changed file
with
58 addition
and
3156 deletion
+58
-3156
include/libs/transport/rpcHead.h
include/libs/transport/rpcHead.h
+0
-79
include/libs/transport/transport.h
include/libs/transport/transport.h
+1
-2
source/libs/transport/inc/rpcLog.h
source/libs/transport/inc/rpcLog.h
+0
-72
source/libs/transport/inc/rpcTcp.h
source/libs/transport/inc/rpcTcp.h
+0
-40
source/libs/transport/inc/rpcUdp.h
source/libs/transport/inc/rpcUdp.h
+0
-41
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+13
-20
source/libs/transport/inc/transLog.h
source/libs/transport/inc/transLog.h
+38
-0
source/libs/transport/inc/transportInt.h
source/libs/transport/inc/transportInt.h
+1
-3
source/libs/transport/src/rpcCache.c
source/libs/transport/src/rpcCache.c
+0
-294
source/libs/transport/src/rpcMain.c
source/libs/transport/src/rpcMain.c
+0
-1682
source/libs/transport/src/rpcTcp.c
source/libs/transport/src/rpcTcp.c
+0
-657
source/libs/transport/src/rpcUdp.c
source/libs/transport/src/rpcUdp.c
+0
-261
source/libs/transport/test/pushServer.c
source/libs/transport/test/pushServer.c
+1
-1
source/libs/transport/test/rclient.c
source/libs/transport/test/rclient.c
+1
-1
source/libs/transport/test/rserver.c
source/libs/transport/test/rserver.c
+1
-1
source/libs/transport/test/syncClient.c
source/libs/transport/test/syncClient.c
+1
-1
source/libs/transport/test/transUT.cpp
source/libs/transport/test/transUT.cpp
+1
-1
未找到文件。
include/libs/transport/rpcHead.h
已删除
100644 → 0
浏览文件 @
d0625393
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_RPCHEAD_H
#define TDENGINE_RPCHEAD_H
#include <tdef.h>
#ifdef __cplusplus
extern
"C"
{
#endif
#define RPC_CONN_TCP 2
extern
int
tsRpcOverhead
;
typedef
struct
{
void
*
msg
;
int
msgLen
;
uint32_t
ip
;
uint16_t
port
;
int
connType
;
void
*
shandle
;
void
*
thandle
;
void
*
chandle
;
}
SRecvInfo
;
#pragma pack(push, 1)
typedef
struct
{
char
version
:
4
;
// RPC version
char
comp
:
4
;
// compression algorithm, 0:no compression 1:lz4
char
resflag
:
2
;
// reserved bits
char
spi
:
3
;
// security parameter index
char
encrypt
:
3
;
// encrypt algorithm, 0: no encryption
uint16_t
tranId
;
// transcation ID
uint32_t
linkUid
;
// for unique connection ID assigned by client
uint64_t
ahandle
;
// ahandle assigned by client
uint32_t
sourceId
;
// source ID, an index for connection list
uint32_t
destId
;
// destination ID, an index for connection list
uint32_t
destIp
;
// destination IP address, for NAT scenario
char
user
[
TSDB_UNI_LEN
];
// user ID
uint16_t
port
;
// for UDP only, port may be changed
char
empty
[
1
];
// reserved
uint16_t
msgType
;
// message type
int32_t
msgLen
;
// message length including the header iteslf
uint32_t
msgVer
;
int32_t
code
;
// code in response message
uint8_t
content
[
0
];
// message body starts from here
}
SRpcHead
;
typedef
struct
{
int32_t
reserved
;
int32_t
contLen
;
}
SRpcComp
;
typedef
struct
{
uint32_t
timeStamp
;
uint8_t
auth
[
TSDB_AUTH_LEN
];
}
SRpcDigest
;
#pragma pack(pop)
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_RPCHEAD_H
include/libs/transport/transport.h
浏览文件 @
305fa254
...
...
@@ -20,9 +20,8 @@
extern
"C"
{
#endif
#ifdef __cplusplus
}
#endif
#endif
/*_TD_TRANSPORT_H_*/
\ No newline at end of file
#endif
/*_TD_TRANSPORT_H_*/
source/libs/transport/inc/rpcLog.h
已删除
100644 → 0
浏览文件 @
d0625393
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_RPC_LOG_H
#define TDENGINE_RPC_LOG_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include "tlog.h"
#define tFatal(...) \
{ \
if (rpcDebugFlag & DEBUG_FATAL) { \
taosPrintLog("RPC FATAL ", DEBUG_FATAL, rpcDebugFlag, __VA_ARGS__); \
} \
}
#define tError(...) \
{ \
if (rpcDebugFlag & DEBUG_ERROR) { \
taosPrintLog("RPC ERROR ", DEBUG_ERROR, rpcDebugFlag, __VA_ARGS__); \
} \
}
#define tWarn(...) \
{ \
if (rpcDebugFlag & DEBUG_WARN) { \
taosPrintLog("RPC WARN ", DEBUG_WARN, rpcDebugFlag, __VA_ARGS__); \
} \
}
#define tInfo(...) \
{ \
if (rpcDebugFlag & DEBUG_INFO) { \
taosPrintLog("RPC ", DEBUG_INFO, rpcDebugFlag, __VA_ARGS__); \
} \
}
#define tDebug(...) \
{ \
if (rpcDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("RPC ", DEBUG_DEBUG, rpcDebugFlag, __VA_ARGS__); \
} \
}
#define tTrace(...) \
{ \
if (rpcDebugFlag & DEBUG_TRACE) { \
taosPrintLog("RPC ", DEBUG_TRACE, rpcDebugFlag, __VA_ARGS__); \
} \
}
#define tDump(x, y) \
{ \
if (rpcDebugFlag & DEBUG_DUMP) { \
taosDumpData((unsigned char *)x, y); \
} \
}
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_RPC_LOG_H
source/libs/transport/inc/rpcTcp.h
已删除
100644 → 0
浏览文件 @
d0625393
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _rpc_tcp_header_
#define _rpc_tcp_header_
#include <stdint.h>
#ifdef __cplusplus
extern
"C"
{
#endif
void
*
taosInitTcpServer
(
uint32_t
ip
,
uint16_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
);
void
taosStopTcpServer
(
void
*
param
);
void
taosCleanUpTcpServer
(
void
*
param
);
void
*
taosInitTcpClient
(
uint32_t
ip
,
uint16_t
port
,
char
*
label
,
int
num
,
void
*
fp
,
void
*
shandle
);
void
taosStopTcpClient
(
void
*
chandle
);
void
taosCleanUpTcpClient
(
void
*
chandle
);
void
*
taosOpenTcpClientConnection
(
void
*
shandle
,
void
*
thandle
,
uint32_t
ip
,
uint16_t
port
);
void
taosCloseTcpConnection
(
void
*
chandle
);
int
taosSendTcpData
(
uint32_t
ip
,
uint16_t
port
,
void
*
data
,
int
len
,
void
*
chandle
);
#ifdef __cplusplus
}
#endif
#endif
source/libs/transport/inc/rpcUdp.h
已删除
100644 → 0
浏览文件 @
d0625393
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _rpc_udp_header_
#define _rpc_udp_header_
#ifdef __cplusplus
extern
"C"
{
#endif
#include "taosdef.h"
void
*
taosInitUdpConnection
(
uint32_t
ip
,
uint16_t
port
,
char
*
label
,
int
,
void
*
fp
,
void
*
shandle
);
void
taosStopUdpConnection
(
void
*
handle
);
void
taosCleanUpUdpConnection
(
void
*
handle
);
int
taosSendUdpData
(
uint32_t
ip
,
uint16_t
port
,
void
*
data
,
int
dataLen
,
void
*
chandle
);
void
*
taosOpenUdpConnection
(
void
*
shandle
,
void
*
thandle
,
uint32_t
ip
,
uint16_t
port
);
void
taosFreeMsgHdr
(
void
*
hdr
);
int
taosMsgHdrSize
(
void
*
hdr
);
void
taosSendMsgHdr
(
void
*
hdr
,
TdFilePtr
pFile
);
void
taosInitMsgHdr
(
void
**
hdr
,
void
*
dest
,
int
maxPkts
);
void
taosSetMsgHdrData
(
void
*
hdr
,
char
*
data
,
int
dataLen
);
#ifdef __cplusplus
}
#endif
#endif
source/libs/transport/inc/transComm.h
浏览文件 @
305fa254
...
...
@@ -12,7 +12,8 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef USE_UV
#ifndef _TD_TRANSPORT_COMM_H
#define _TD_TRANSPORT_COMM_H
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -22,9 +23,6 @@ extern "C" {
#include "lz4.h"
#include "os.h"
#include "osSocket.h"
#include "rpcCache.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
...
...
@@ -33,6 +31,7 @@ extern "C" {
#include "tmd5.h"
#include "tmempool.h"
#include "tmsg.h"
#include "transLog.h"
#include "transportInt.h"
#include "tref.h"
#include "trpc.h"
...
...
@@ -193,12 +192,7 @@ typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } Co
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
#define RPC_MSG_OVERHEAD (sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead)))
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
#define rpcIsReq(type) (type & 1U)
#define rpcIsReq(type) (type & 1U)
#define TRANS_RESERVE_SIZE (sizeof(STranConnCtx))
...
...
@@ -209,15 +203,14 @@ typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } Co
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead));
#define transIsReq(type) (type & 1U)
int
rpcAuthenticateMsg
(
void
*
pMsg
,
int
msgLen
,
void
*
pAuth
,
void
*
pKey
);
void
rpcBuildAuthHead
(
void
*
pMsg
,
int
msgLen
,
void
*
pAuth
,
void
*
pKey
);
int32_t
rpcCompressRpcMsg
(
char
*
pCont
,
int32_t
contLen
);
SRpcHead
*
rpcDecompressRpcMsg
(
SRpcHead
*
pHead
);
int
transAuthenticateMsg
(
void
*
pMsg
,
int
msgLen
,
void
*
pAuth
,
void
*
pKey
);
void
transBuildAuthHead
(
void
*
pMsg
,
int
msgLen
,
void
*
pAuth
,
void
*
pKey
);
bool
transCompressMsg
(
char
*
msg
,
int32_t
len
,
int32_t
*
flen
);
bool
transDecompressMsg
(
char
*
msg
,
int32_t
len
,
int32_t
*
flen
);
// int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
// void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
//// int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
//
// int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
// void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
// bool transCompressMsg(char* msg, int32_t len, int32_t* flen);
// bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);
void
transFreeMsg
(
void
*
msg
);
...
...
@@ -365,4 +358,4 @@ void transThreadOnce();
}
#endif
#endif
#endif
// _TD_TRANSPORT_COMM_H
source/libs/transport/inc/
rpcCache
.h
→
source/libs/transport/inc/
transLog
.h
浏览文件 @
305fa254
...
...
@@ -13,22 +13,26 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_RPC_CACHE_H
#define TDENGINE_RPC_CACHE_H
#include <stdint.h>
#ifndef _TD_TRANSPORT_LOG_H
#define _TD_TRANSPORT_LOG_H
#ifdef __cplusplus
extern
"C"
{
#endif
void
*
rpcOpenConnCache
(
int
maxSessions
,
void
(
*
cleanFp
)(
void
*
),
void
*
tmrCtrl
,
int64_t
keepTimer
);
void
rpcCloseConnCache
(
void
*
handle
);
void
rpcAddConnIntoCache
(
void
*
handle
,
void
*
data
,
char
*
fqdn
,
uint16_t
port
,
int8_t
connType
);
void
*
rpcGetConnFromCache
(
void
*
handle
,
char
*
fqdn
,
uint16_t
port
,
int8_t
connType
);
// clang-format off
#include "tlog.h"
#define tFatal(...) do {if (rpcDebugFlag & DEBUG_FATAL){ taosPrintLog("RPC FATAL ", DEBUG_FATAL, rpcDebugFlag, __VA_ARGS__); }} while (0)
#define tError(...)do { if (rpcDebugFlag & DEBUG_ERROR){ taosPrintLog("RPC ERROR ", DEBUG_ERROR, rpcDebugFlag, __VA_ARGS__); } } while(0)
#define tWarn(...) do { if (rpcDebugFlag & DEBUG_WARN) { taosPrintLog("RPC WARN ", DEBUG_WARN, rpcDebugFlag, __VA_ARGS__); }} while(0)
#define tInfo(...) do { if (rpcDebugFlag & DEBUG_INFO) { taosPrintLog("RPC ", DEBUG_INFO, rpcDebugFlag, __VA_ARGS__); }} while(0)
#define tDebug(...) do {if (rpcDebugFlag & DEBUG_DEBUG){ taosPrintLog("RPC ", DEBUG_DEBUG, rpcDebugFlag, __VA_ARGS__); }} while(0)
#define tTrace(...) do {if (rpcDebugFlag & DEBUG_TRACE){ taosPrintLog("RPC ", DEBUG_TRACE, rpcDebugFlag, __VA_ARGS__); }} while(0)
#define tDump(x, y) do {if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); } } while(0)
// clang-format on
#ifdef __cplusplus
}
#endif
#endif //
TDENGINE_RPC_CACHE
_H
#endif //
__TRANS_LOG
_H
source/libs/transport/inc/transportInt.h
浏览文件 @
305fa254
...
...
@@ -21,14 +21,12 @@
#endif
#include "lz4.h"
#include "os.h"
#include "rpcCache.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
#include "tidpool.h"
#include "tmsg.h"
#include "transLog.h"
#include "tref.h"
#include "trpc.h"
#include "ttimer.h"
...
...
source/libs/transport/src/rpcCache.c
已删除
100644 → 0
浏览文件 @
d0625393
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "rpcCache.h"
#include "os.h"
#include "rpcLog.h"
#include "taosdef.h"
#include "tglobal.h"
#include "tmempool.h"
#include "ttimer.h"
#include "tutil.h"
typedef
struct
SConnHash
{
char
fqdn
[
TSDB_FQDN_LEN
];
uint16_t
port
;
char
connType
;
struct
SConnHash
*
prev
;
struct
SConnHash
*
next
;
void
*
data
;
uint64_t
time
;
}
SConnHash
;
typedef
struct
{
SConnHash
**
connHashList
;
mpool_h
connHashMemPool
;
int
maxSessions
;
int
total
;
int
*
count
;
int64_t
keepTimer
;
TdThreadMutex
mutex
;
void
(
*
cleanFp
)(
void
*
);
void
*
tmrCtrl
;
void
*
pTimer
;
int64_t
*
lockedBy
;
}
SConnCache
;
static
int
rpcHashConn
(
void
*
handle
,
char
*
fqdn
,
uint16_t
port
,
int8_t
connType
);
static
void
rpcLockCache
(
int64_t
*
lockedBy
);
static
void
rpcUnlockCache
(
int64_t
*
lockedBy
);
static
void
rpcCleanConnCache
(
void
*
handle
,
void
*
tmrId
);
static
void
rpcRemoveExpiredNodes
(
SConnCache
*
pCache
,
SConnHash
*
pNode
,
int
hash
,
uint64_t
time
);
void
*
rpcOpenConnCache
(
int
maxSessions
,
void
(
*
cleanFp
)(
void
*
),
void
*
tmrCtrl
,
int64_t
keepTimer
)
{
SConnHash
**
connHashList
;
mpool_h
connHashMemPool
;
SConnCache
*
pCache
;
connHashMemPool
=
taosMemPoolInit
(
maxSessions
,
sizeof
(
SConnHash
));
if
(
connHashMemPool
==
0
)
return
NULL
;
connHashList
=
taosMemoryCalloc
(
sizeof
(
SConnHash
*
),
maxSessions
);
if
(
connHashList
==
0
)
{
taosMemPoolCleanUp
(
connHashMemPool
);
return
NULL
;
}
pCache
=
taosMemoryMalloc
(
sizeof
(
SConnCache
));
if
(
pCache
==
NULL
)
{
taosMemPoolCleanUp
(
connHashMemPool
);
taosMemoryFree
(
connHashList
);
return
NULL
;
}
memset
(
pCache
,
0
,
sizeof
(
SConnCache
));
pCache
->
count
=
taosMemoryCalloc
(
sizeof
(
int
),
maxSessions
);
pCache
->
total
=
0
;
pCache
->
keepTimer
=
keepTimer
;
pCache
->
maxSessions
=
maxSessions
;
pCache
->
connHashMemPool
=
connHashMemPool
;
pCache
->
connHashList
=
connHashList
;
pCache
->
cleanFp
=
cleanFp
;
pCache
->
tmrCtrl
=
tmrCtrl
;
pCache
->
lockedBy
=
taosMemoryCalloc
(
sizeof
(
int64_t
),
maxSessions
);
taosTmrReset
(
rpcCleanConnCache
,
(
int32_t
)(
pCache
->
keepTimer
*
2
),
pCache
,
pCache
->
tmrCtrl
,
&
pCache
->
pTimer
);
taosThreadMutexInit
(
&
pCache
->
mutex
,
NULL
);
return
pCache
;
}
void
rpcCloseConnCache
(
void
*
handle
)
{
SConnCache
*
pCache
;
pCache
=
(
SConnCache
*
)
handle
;
if
(
pCache
==
NULL
||
pCache
->
maxSessions
==
0
)
return
;
taosThreadMutexLock
(
&
pCache
->
mutex
);
taosTmrStopA
(
&
(
pCache
->
pTimer
));
if
(
pCache
->
connHashMemPool
)
taosMemPoolCleanUp
(
pCache
->
connHashMemPool
);
taosMemoryFreeClear
(
pCache
->
connHashList
);
taosMemoryFreeClear
(
pCache
->
count
);
taosMemoryFreeClear
(
pCache
->
lockedBy
);
taosThreadMutexUnlock
(
&
pCache
->
mutex
);
taosThreadMutexDestroy
(
&
pCache
->
mutex
);
memset
(
pCache
,
0
,
sizeof
(
SConnCache
));
taosMemoryFree
(
pCache
);
}
void
rpcAddConnIntoCache
(
void
*
handle
,
void
*
data
,
char
*
fqdn
,
uint16_t
port
,
int8_t
connType
)
{
int
hash
;
SConnHash
*
pNode
;
SConnCache
*
pCache
;
uint64_t
time
=
taosGetTimestampMs
();
pCache
=
(
SConnCache
*
)
handle
;
assert
(
pCache
);
assert
(
data
);
hash
=
rpcHashConn
(
pCache
,
fqdn
,
port
,
connType
);
pNode
=
(
SConnHash
*
)
taosMemPoolMalloc
(
pCache
->
connHashMemPool
);
tstrncpy
(
pNode
->
fqdn
,
fqdn
,
sizeof
(
pNode
->
fqdn
));
pNode
->
port
=
port
;
pNode
->
connType
=
connType
;
pNode
->
data
=
data
;
pNode
->
prev
=
NULL
;
pNode
->
time
=
time
;
rpcLockCache
(
pCache
->
lockedBy
+
hash
);
pNode
->
next
=
pCache
->
connHashList
[
hash
];
if
(
pCache
->
connHashList
[
hash
]
!=
NULL
)
(
pCache
->
connHashList
[
hash
])
->
prev
=
pNode
;
pCache
->
connHashList
[
hash
]
=
pNode
;
pCache
->
count
[
hash
]
++
;
rpcRemoveExpiredNodes
(
pCache
,
pNode
->
next
,
hash
,
time
);
rpcUnlockCache
(
pCache
->
lockedBy
+
hash
);
pCache
->
total
++
;
// tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode,
// pCache->count[hash]);
return
;
}
void
*
rpcGetConnFromCache
(
void
*
handle
,
char
*
fqdn
,
uint16_t
port
,
int8_t
connType
)
{
int
hash
;
SConnHash
*
pNode
;
SConnCache
*
pCache
;
void
*
pData
=
NULL
;
pCache
=
(
SConnCache
*
)
handle
;
assert
(
pCache
);
uint64_t
time
=
taosGetTimestampMs
();
hash
=
rpcHashConn
(
pCache
,
fqdn
,
port
,
connType
);
rpcLockCache
(
pCache
->
lockedBy
+
hash
);
pNode
=
pCache
->
connHashList
[
hash
];
while
(
pNode
)
{
if
(
time
>=
pCache
->
keepTimer
+
pNode
->
time
)
{
rpcRemoveExpiredNodes
(
pCache
,
pNode
,
hash
,
time
);
pNode
=
NULL
;
break
;
}
if
(
strcmp
(
pNode
->
fqdn
,
fqdn
)
==
0
&&
pNode
->
port
==
port
&&
pNode
->
connType
==
connType
)
break
;
pNode
=
pNode
->
next
;
}
if
(
pNode
)
{
rpcRemoveExpiredNodes
(
pCache
,
pNode
->
next
,
hash
,
time
);
if
(
pNode
->
prev
)
{
pNode
->
prev
->
next
=
pNode
->
next
;
}
else
{
pCache
->
connHashList
[
hash
]
=
pNode
->
next
;
}
if
(
pNode
->
next
)
{
pNode
->
next
->
prev
=
pNode
->
prev
;
}
pData
=
pNode
->
data
;
taosMemPoolFree
(
pCache
->
connHashMemPool
,
(
char
*
)
pNode
);
pCache
->
total
--
;
pCache
->
count
[
hash
]
--
;
}
rpcUnlockCache
(
pCache
->
lockedBy
+
hash
);
if
(
pData
)
{
// tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode,
// pCache->count[hash]);
}
else
{
// tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash,
// pCache->count[hash]);
}
return
pData
;
}
static
void
rpcCleanConnCache
(
void
*
handle
,
void
*
tmrId
)
{
int
hash
;
SConnHash
*
pNode
;
SConnCache
*
pCache
;
pCache
=
(
SConnCache
*
)
handle
;
if
(
pCache
==
NULL
||
pCache
->
maxSessions
==
0
)
return
;
if
(
pCache
->
pTimer
!=
tmrId
)
return
;
taosThreadMutexLock
(
&
pCache
->
mutex
);
uint64_t
time
=
taosGetTimestampMs
();
for
(
hash
=
0
;
hash
<
pCache
->
maxSessions
;
++
hash
)
{
rpcLockCache
(
pCache
->
lockedBy
+
hash
);
pNode
=
pCache
->
connHashList
[
hash
];
rpcRemoveExpiredNodes
(
pCache
,
pNode
,
hash
,
time
);
rpcUnlockCache
(
pCache
->
lockedBy
+
hash
);
}
// tTrace("timer, total connections in cache:%d", pCache->total);
taosTmrReset
(
rpcCleanConnCache
,
(
int32_t
)(
pCache
->
keepTimer
*
2
),
pCache
,
pCache
->
tmrCtrl
,
&
pCache
->
pTimer
);
taosThreadMutexUnlock
(
&
pCache
->
mutex
);
}
static
void
rpcRemoveExpiredNodes
(
SConnCache
*
pCache
,
SConnHash
*
pNode
,
int
hash
,
uint64_t
time
)
{
if
(
pNode
==
NULL
||
(
time
<
pCache
->
keepTimer
+
pNode
->
time
))
return
;
SConnHash
*
pPrev
=
pNode
->
prev
,
*
pNext
;
while
(
pNode
)
{
(
*
pCache
->
cleanFp
)(
pNode
->
data
);
pNext
=
pNode
->
next
;
pCache
->
total
--
;
pCache
->
count
[
hash
]
--
;
// tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port,
// pNode->connType, hash, pNode,
// pCache->count[hash]);
taosMemPoolFree
(
pCache
->
connHashMemPool
,
(
char
*
)
pNode
);
pNode
=
pNext
;
}
if
(
pPrev
)
pPrev
->
next
=
NULL
;
else
pCache
->
connHashList
[
hash
]
=
NULL
;
}
static
int
rpcHashConn
(
void
*
handle
,
char
*
fqdn
,
uint16_t
port
,
int8_t
connType
)
{
SConnCache
*
pCache
=
(
SConnCache
*
)
handle
;
int
hash
=
0
;
char
*
temp
=
fqdn
;
while
(
*
temp
)
{
hash
+=
*
temp
;
++
temp
;
}
hash
+=
port
;
hash
+=
connType
;
hash
=
hash
%
pCache
->
maxSessions
;
return
hash
;
}
static
void
rpcLockCache
(
int64_t
*
lockedBy
)
{
int64_t
tid
=
taosGetSelfPthreadId
();
int
i
=
0
;
while
(
atomic_val_compare_exchange_64
(
lockedBy
,
0
,
tid
)
!=
0
)
{
if
(
++
i
%
100
==
0
)
{
sched_yield
();
}
}
}
static
void
rpcUnlockCache
(
int64_t
*
lockedBy
)
{
int64_t
tid
=
taosGetSelfPthreadId
();
if
(
atomic_val_compare_exchange_64
(
lockedBy
,
tid
,
0
)
!=
tid
)
{
assert
(
false
);
}
}
source/libs/transport/src/rpcMain.c
已删除
100644 → 0
浏览文件 @
d0625393
此差异已折叠。
点击以展开。
source/libs/transport/src/rpcTcp.c
已删除
100644 → 0
浏览文件 @
d0625393
此差异已折叠。
点击以展开。
source/libs/transport/src/rpcUdp.c
已删除
100644 → 0
浏览文件 @
d0625393
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "rpcUdp.h"
#include "os.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "taosdef.h"
#include "taoserror.h"
#include "ttimer.h"
#include "tutil.h"
#ifndef USE_UV
#define RPC_MAX_UDP_CONNS 256
#define RPC_MAX_UDP_PKTS 1000
#define RPC_UDP_BUF_TIME 5 // mseconds
#define RPC_MAX_UDP_SIZE 65480
typedef
struct
{
int
index
;
TdSocketPtr
pSocket
;
uint16_t
port
;
// peer port
uint16_t
localPort
;
// local port
char
label
[
TSDB_LABEL_LEN
];
// copy from udpConnSet;
TdThread
thread
;
void
*
hash
;
void
*
shandle
;
// handle passed by upper layer during server initialization
void
*
pSet
;
void
*
(
*
processData
)(
SRecvInfo
*
pRecv
);
char
*
buffer
;
// buffer to receive data
}
SUdpConn
;
typedef
struct
{
int
index
;
int
server
;
uint32_t
ip
;
// local IP
uint16_t
port
;
// local Port
void
*
shandle
;
// handle passed by upper layer during server initialization
int
threads
;
char
label
[
TSDB_LABEL_LEN
];
void
*
(
*
fp
)(
SRecvInfo
*
pPacket
);
SUdpConn
udpConn
[];
}
SUdpConnSet
;
static
void
*
taosRecvUdpData
(
void
*
param
);
void
*
taosInitUdpConnection
(
uint32_t
ip
,
uint16_t
port
,
char
*
label
,
int
threads
,
void
*
fp
,
void
*
shandle
)
{
SUdpConn
*
pConn
;
SUdpConnSet
*
pSet
;
int
size
=
(
int
)
sizeof
(
SUdpConnSet
)
+
threads
*
(
int
)
sizeof
(
SUdpConn
);
pSet
=
(
SUdpConnSet
*
)
taosMemoryMalloc
((
size_t
)
size
);
if
(
pSet
==
NULL
)
{
tError
(
"%s failed to allocate UdpConn"
,
label
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
NULL
;
}
memset
(
pSet
,
0
,
(
size_t
)
size
);
pSet
->
ip
=
ip
;
pSet
->
port
=
port
;
pSet
->
shandle
=
shandle
;
pSet
->
fp
=
fp
;
pSet
->
threads
=
threads
;
tstrncpy
(
pSet
->
label
,
label
,
sizeof
(
pSet
->
label
));
TdThreadAttr
thAttr
;
taosThreadAttrInit
(
&
thAttr
);
taosThreadAttrSetDetachState
(
&
thAttr
,
PTHREAD_CREATE_JOINABLE
);
int
i
;
uint16_t
ownPort
;
for
(
i
=
0
;
i
<
threads
;
++
i
)
{
pConn
=
pSet
->
udpConn
+
i
;
ownPort
=
(
port
?
port
+
i
:
0
);
pConn
->
pSocket
=
taosOpenUdpSocket
(
ip
,
ownPort
);
if
(
pConn
->
pSocket
==
NULL
)
{
tError
(
"%s failed to open UDP socket %x:%hu"
,
label
,
ip
,
port
);
break
;
}
pConn
->
buffer
=
taosMemoryMalloc
(
RPC_MAX_UDP_SIZE
);
if
(
NULL
==
pConn
->
buffer
)
{
tError
(
"%s failed to malloc recv buffer"
,
label
);
break
;
}
struct
sockaddr_in
sin
;
unsigned
int
addrlen
=
sizeof
(
sin
);
if
(
taosGetSocketName
(
pConn
->
pSocket
,
(
struct
sockaddr
*
)
&
sin
,
&
addrlen
)
==
0
&&
sin
.
sin_family
==
AF_INET
&&
addrlen
==
sizeof
(
sin
))
{
pConn
->
localPort
=
(
uint16_t
)
ntohs
(
sin
.
sin_port
);
}
tstrncpy
(
pConn
->
label
,
label
,
sizeof
(
pConn
->
label
));
pConn
->
shandle
=
shandle
;
pConn
->
processData
=
fp
;
pConn
->
index
=
i
;
pConn
->
pSet
=
pSet
;
int
code
=
taosThreadCreate
(
&
pConn
->
thread
,
&
thAttr
,
taosRecvUdpData
,
pConn
);
if
(
code
!=
0
)
{
tError
(
"%s failed to create thread to process UDP data(%s)"
,
label
,
strerror
(
errno
));
break
;
}
}
taosThreadAttrDestroy
(
&
thAttr
);
if
(
i
!=
threads
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
taosCleanUpUdpConnection
(
pSet
);
return
NULL
;
}
tDebug
(
"%s UDP connection is initialized, ip:%x:%hu threads:%d"
,
label
,
ip
,
port
,
threads
);
return
pSet
;
}
void
taosStopUdpConnection
(
void
*
handle
)
{
SUdpConnSet
*
pSet
=
(
SUdpConnSet
*
)
handle
;
SUdpConn
*
pConn
;
if
(
pSet
==
NULL
)
return
;
for
(
int
i
=
0
;
i
<
pSet
->
threads
;
++
i
)
{
pConn
=
pSet
->
udpConn
+
i
;
if
(
pConn
->
pSocket
!=
NULL
)
taosShutDownSocketRDWR
(
pConn
->
pSocket
);
if
(
pConn
->
pSocket
!=
NULL
)
taosCloseSocket
(
&
pConn
->
pSocket
);
pConn
->
pSocket
=
NULL
;
}
for
(
int
i
=
0
;
i
<
pSet
->
threads
;
++
i
)
{
pConn
=
pSet
->
udpConn
+
i
;
if
(
taosCheckPthreadValid
(
pConn
->
thread
))
{
taosThreadJoin
(
pConn
->
thread
,
NULL
);
}
taosMemoryFreeClear
(
pConn
->
buffer
);
// tTrace("%s UDP thread is closed, index:%d", pConn->label, i);
}
tDebug
(
"%s UDP is stopped"
,
pSet
->
label
);
}
void
taosCleanUpUdpConnection
(
void
*
handle
)
{
SUdpConnSet
*
pSet
=
(
SUdpConnSet
*
)
handle
;
SUdpConn
*
pConn
;
if
(
pSet
==
NULL
)
return
;
for
(
int
i
=
0
;
i
<
pSet
->
threads
;
++
i
)
{
pConn
=
pSet
->
udpConn
+
i
;
if
(
pConn
->
pSocket
!=
NULL
)
taosCloseSocket
(
&
pConn
->
pSocket
);
}
tDebug
(
"%s UDP is cleaned up"
,
pSet
->
label
);
taosMemoryFreeClear
(
pSet
);
}
void
*
taosOpenUdpConnection
(
void
*
shandle
,
void
*
thandle
,
uint32_t
ip
,
uint16_t
port
)
{
SUdpConnSet
*
pSet
=
(
SUdpConnSet
*
)
shandle
;
pSet
->
index
=
(
pSet
->
index
+
1
)
%
pSet
->
threads
;
SUdpConn
*
pConn
=
pSet
->
udpConn
+
pSet
->
index
;
pConn
->
port
=
port
;
tDebug
(
"%s UDP connection is setup, ip:%x:%hu localPort:%hu"
,
pConn
->
label
,
ip
,
port
,
pConn
->
localPort
);
return
pConn
;
}
static
void
*
taosRecvUdpData
(
void
*
param
)
{
SUdpConn
*
pConn
=
param
;
struct
sockaddr_in
sourceAdd
;
ssize_t
dataLen
;
unsigned
int
addLen
;
uint16_t
port
;
SRecvInfo
recvInfo
;
memset
(
&
sourceAdd
,
0
,
sizeof
(
sourceAdd
));
addLen
=
sizeof
(
sourceAdd
);
tDebug
(
"%s UDP thread is created, index:%d"
,
pConn
->
label
,
pConn
->
index
);
char
*
msg
=
pConn
->
buffer
;
setThreadName
(
"recvUdpData"
);
while
(
1
)
{
dataLen
=
taosReadFromSocket
(
pConn
->
pSocket
,
pConn
->
buffer
,
RPC_MAX_UDP_SIZE
,
0
,
(
struct
sockaddr
*
)
&
sourceAdd
,
&
addLen
);
if
(
dataLen
<=
0
)
{
tDebug
(
"%s UDP socket was closed, exiting(%s), dataLen:%d"
,
pConn
->
label
,
strerror
(
errno
),
(
int32_t
)
dataLen
);
// for windows usage, remote shutdown also returns - 1 in windows client
if
(
pConn
->
pSocket
==
NULL
)
{
break
;
}
else
{
continue
;
}
}
port
=
ntohs
(
sourceAdd
.
sin_port
);
if
(
dataLen
<
sizeof
(
SRpcHead
))
{
tError
(
"%s recvfrom failed(%s)"
,
pConn
->
label
,
strerror
(
errno
));
continue
;
}
int32_t
size
=
dataLen
+
tsRpcOverhead
;
char
*
tmsg
=
taosMemoryMalloc
(
size
);
if
(
NULL
==
tmsg
)
{
tError
(
"%s failed to allocate memory, size:%"
PRId64
,
pConn
->
label
,
(
int64_t
)
dataLen
);
continue
;
}
else
{
tTrace
(
"UDP malloc mem:%p size:%d"
,
tmsg
,
size
);
}
tmsg
+=
tsRpcOverhead
;
// overhead for SRpcReqContext
memcpy
(
tmsg
,
msg
,
dataLen
);
recvInfo
.
msg
=
tmsg
;
recvInfo
.
msgLen
=
dataLen
;
recvInfo
.
ip
=
sourceAdd
.
sin_addr
.
s_addr
;
recvInfo
.
port
=
port
;
recvInfo
.
shandle
=
pConn
->
shandle
;
recvInfo
.
thandle
=
NULL
;
recvInfo
.
chandle
=
pConn
;
recvInfo
.
connType
=
0
;
(
*
(
pConn
->
processData
))(
&
recvInfo
);
}
return
NULL
;
}
int
taosSendUdpData
(
uint32_t
ip
,
uint16_t
port
,
void
*
data
,
int
dataLen
,
void
*
chandle
)
{
SUdpConn
*
pConn
=
(
SUdpConn
*
)
chandle
;
if
(
pConn
==
NULL
)
return
-
1
;
struct
sockaddr_in
destAdd
;
memset
(
&
destAdd
,
0
,
sizeof
(
destAdd
));
destAdd
.
sin_family
=
AF_INET
;
destAdd
.
sin_addr
.
s_addr
=
ip
;
destAdd
.
sin_port
=
htons
(
port
);
int
ret
=
taosSendto
(
pConn
->
pSocket
,
data
,
(
size_t
)
dataLen
,
0
,
(
struct
sockaddr
*
)
&
destAdd
,
sizeof
(
destAdd
));
return
ret
;
}
#endif
\ No newline at end of file
source/libs/transport/test/pushServer.c
浏览文件 @
305fa254
...
...
@@ -15,9 +15,9 @@
//#define _DEFAULT_SOURCE
#include "os.h"
#include "rpcLog.h"
#include "tglobal.h"
#include "tqueue.h"
#include "transLog.h"
#include "trpc.h"
int
msgSize
=
128
;
...
...
source/libs/transport/test/rclient.c
浏览文件 @
305fa254
...
...
@@ -14,9 +14,9 @@
*/
#include <tdatablock.h>
#include "os.h"
#include "rpcLog.h"
#include "taoserror.h"
#include "tglobal.h"
#include "transLog.h"
#include "trpc.h"
#include "tutil.h"
...
...
source/libs/transport/test/rserver.c
浏览文件 @
305fa254
...
...
@@ -15,9 +15,9 @@
//#define _DEFAULT_SOURCE
#include "os.h"
#include "rpcLog.h"
#include "tglobal.h"
#include "tqueue.h"
#include "transLog.h"
#include "trpc.h"
int
msgSize
=
128
;
...
...
source/libs/transport/test/syncClient.c
浏览文件 @
305fa254
...
...
@@ -14,9 +14,9 @@
*/
#include <tdatablock.h>
#include "os.h"
#include "rpcLog.h"
#include "taoserror.h"
#include "tglobal.h"
#include "transLog.h"
#include "trpc.h"
#include "tutil.h"
...
...
source/libs/transport/test/transUT.cpp
浏览文件 @
305fa254
...
...
@@ -15,10 +15,10 @@
#include <gtest/gtest.h>
#include <cstdio>
#include <cstring>
#include "rpcLog.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "tlog.h"
#include "transLog.h"
#include "trpc.h"
using
namespace
std
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录