Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
70905e21
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
70905e21
编写于
4月 24, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor(tools) add network speed test codes
上级
eaaee9c5
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
94 addition
and
480 deletion
+94
-480
include/common/tmsgdef.h
include/common/tmsgdef.h
+1
-0
source/dnode/mgmt/implement/src/dmTransport.c
source/dnode/mgmt/implement/src/dmTransport.c
+6
-0
source/dnode/mgmt/interface/inc/dmInt.h
source/dnode/mgmt/interface/inc/dmInt.h
+1
-0
source/dnode/mgmt/interface/src/dmInt.c
source/dnode/mgmt/interface/src/dmInt.c
+8
-0
tools/shell/inc/shellInt.h
tools/shell/inc/shellInt.h
+7
-6
tools/shell/src/shellNettest.c
tools/shell/src/shellNettest.c
+71
-474
未找到文件。
include/common/tmsgdef.h
浏览文件 @
70905e21
...
...
@@ -85,6 +85,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_DND_DROP_VNODE
,
"dnode-drop-vnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_CONFIG_DNODE
,
"dnode-config-dnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_SERVER_STATUS
,
"dnode-server-status"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_NET_TEST
,
"dnode-net-test"
,
NULL
,
NULL
)
// Requests handled by MNODE
TD_NEW_MSG_SEG
(
TDMT_MND_MSG
)
...
...
source/dnode/mgmt/implement/src/dmTransport.c
浏览文件 @
70905e21
...
...
@@ -131,6 +131,12 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
return
;
}
if
(
msgType
==
TDMT_DND_NET_TEST
)
{
dTrace
(
"net test req will be processed, handle:%p, app:%p"
,
pMsg
->
handle
,
pMsg
->
ahandle
);
dmProcessServerStatusReq
(
pDnode
,
pMsg
);
return
;
}
if
(
pDnode
->
status
!=
DND_STAT_RUNNING
)
{
dError
(
"msg:%s ignored since dnode not running, handle:%p app:%p"
,
TMSG_INFO
(
msgType
),
pMsg
->
handle
,
pMsg
->
ahandle
);
if
(
isReq
)
{
...
...
source/dnode/mgmt/interface/inc/dmInt.h
浏览文件 @
70905e21
...
...
@@ -37,6 +37,7 @@ void dmSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgF
void
dmReportStartup
(
SDnode
*
pDnode
,
const
char
*
pName
,
const
char
*
pDesc
);
void
dmReportStartupByWrapper
(
SMgmtWrapper
*
pWrapper
,
const
char
*
pName
,
const
char
*
pDesc
);
void
dmProcessServerStatusReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
);
void
dmProcessNettestReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
);
void
dmGetMonitorSysInfo
(
SMonSysInfo
*
pInfo
);
// dmFile.c
...
...
source/dnode/mgmt/interface/src/dmInt.c
浏览文件 @
70905e21
...
...
@@ -171,6 +171,14 @@ static void dmGetServerStatus(SDnode *pDnode, SServerStatusRsp *pStatus) {
}
}
void
dmProcessNettestReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpc
)
{
dDebug
(
"net test req is received"
);
SRpcMsg
rsp
=
{.
handle
=
pRpc
->
handle
,
.
ahandle
=
pRpc
->
ahandle
,
.
code
=
0
};
rsp
.
pCont
=
rpcMallocCont
(
shell
.
args
.
pktLen
);
rsp
.
contLen
=
shell
.
args
.
pktLen
;
rpcSendResponse
(
&
rsp
);
}
void
dmProcessServerStatusReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
)
{
dDebug
(
"server status req is received"
);
...
...
tools/shell/inc/shellInt.h
浏览文件 @
70905e21
...
...
@@ -22,6 +22,7 @@
#include "taoserror.h"
#include "tconfig.h"
#include "tglobal.h"
#include "trpc.h"
#include "ttypes.h"
#include "tutil.h"
...
...
@@ -30,12 +31,12 @@
#define SHELL_HISTORY_FILE ".taos_history"
#define SHELL_DEFAULT_RES_SHOW_NUM 100
#define SHELL_DEFAULT_MAX_BINARY_DISPLAY_WIDTH 30
#define SHELL_MAX_PKG_LEN 2 * 1024 * 1024
#define SHELL_MIN_PKG_LEN 1
#define SHELL_DEF_PKG_LEN 1024
#define SHELL_MAX_PKG_NUM
1
* 1024 * 1024
#define SHELL_MIN_PKG_NUM 1
#define SHELL_DEF_PKG_NUM 100
#define SHELL_MAX_PKG_LEN
2 * 1024 * 1024
#define SHELL_MIN_PKG_LEN
1
#define SHELL_DEF_PKG_LEN
1024
#define SHELL_MAX_PKG_NUM
1
* 1024 * 1024
#define SHELL_MIN_PKG_NUM
1
#define SHELL_DEF_PKG_NUM
100
typedef
struct
{
char
*
hist
[
SHELL_MAX_HISTORY_SIZE
];
...
...
tools/shell/src/shellNettest.c
浏览文件 @
70905e21
...
...
@@ -16,507 +16,104 @@
#define _GNU_SOURCE
#include "shellInt.h"
void
shellTestNetWork
()
{}
static
void
shellWorkAsClient
()
{
SRpcInit
rpcInit
=
{
0
};
SEpSet
epSet
=
{.
inUse
=
0
,
.
numOfEps
=
1
};
SRpcMsg
rpcRsp
=
{
0
};
void
*
clientRpc
=
NULL
;
char
pass
[
TSDB_PASSWORD_LEN
+
1
]
=
{
0
};
#if 0
#define ALLOW_FORBID_FUNC
#include "os.h"
#include "rpcHead.h"
#include "syncMsg.h"
#include "taosdef.h"
#include "taoserror.h"
#include "tchecksum.h"
#include "tglobal.h"
#include "tlog.h"
#include "tmsg.h"
#include "trpc.h"
taosEncryptPass_c
((
uint8_t
*
)(
"_pwd"
),
strlen
(
"_pwd"
),
pass
);
rpcInit
.
label
=
"CHK"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
sessions
=
16
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
user
=
"_dnd"
;
rpcInit
.
ckey
=
"_key"
;
rpcInit
.
spi
=
1
;
rpcInit
.
secret
=
pass
;
#include "osSocket.h"
#define MAX_PKG_LEN (64 * 1000)
#define MAX_SPEED_PKG_LEN (1024 * 1024 * 1024)
#define MIN_SPEED_PKG_LEN 1024
#define MAX_SPEED_PKG_NUM 10000
#define MIN_SPEED_PKG_NUM 1
#define BUFFER_SIZE (MAX_PKG_LEN + 1024)
extern int tsRpcMaxUdpSize;
typedef struct {
char * hostFqdn;
uint32_t hostIp;
int32_t port;
int32_t pktLen;
} STestInfo;
static void *taosNetBindUdpPort(void *sarg) {
STestInfo *pinfo = (STestInfo *)sarg;
int32_t port = pinfo->port;
SOCKET serverSocket;
char buffer[BUFFER_SIZE];
int32_t iDataNum;
socklen_t sin_size;
int32_t bufSize = 1024000;
struct sockaddr_in server_addr;
struct sockaddr_in clientAddr;
setThreadName("netBindUdpPort");
if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
uError("failed to create UDP socket since %s", strerror(errno));
return NULL;
}
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
uError("failed to bind UDP port:%d since %s", port, strerror(errno));
return NULL;
}
TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
if (pSocket == NULL) {
taosCloseSocketNoCheck1(serverSocket);
return NULL;
}
pSocket->fd = serverSocket;
pSocket->refId = 0;
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
uError("failed to set the send buffer size for UDP socket\n");
taosCloseSocket(&pSocket);
return NULL;
}
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
uError("failed to set the receive buffer size for UDP socket\n");
taosCloseSocket(&pSocket);
return NULL;
}
uInfo("UDP server at port:%d is listening", port);
while (1) {
memset(buffer, 0, BUFFER_SIZE);
sin_size = sizeof(*(struct sockaddr *)&server_addr);
iDataNum = recvfrom(serverSocket, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&clientAddr, &sin_size);
if (iDataNum < 0) {
uDebug("failed to perform recvfrom func at %d since %s", port, strerror(errno));
continue;
}
uInfo("UDP: recv:%d bytes from %s at %d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port);
if (iDataNum > 0) {
iDataNum = taosSendto(pSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int32_t)sin_size);
}
uInfo("UDP: send:%d bytes to %s at %d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port);
}
taosCloseSocket(&pSocket);
return NULL;
}
static void *taosNetBindTcpPort(void *sarg) {
struct sockaddr_in server_addr;
struct sockaddr_in clientAddr;
STestInfo *pinfo = sarg;
int32_t port = pinfo->port;
SOCKET serverSocket;
int32_t addr_len = sizeof(clientAddr);
SOCKET client;
char buffer[BUFFER_SIZE];
setThreadName("netBindTcpPort");
if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
uError("failed to create TCP socket since %s", strerror(errno));
return NULL;
}
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
int32_t reuse = 1;
TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
if (pSocket == NULL) {
taosCloseSocketNoCheck1(serverSocket);
return NULL;
}
pSocket->fd = serverSocket;
pSocket->refId = 0;
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(&pSocket);
return NULL;
}
if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
uError("failed to bind TCP port:%d since %s", port, strerror(errno));
taosCloseSocket(&pSocket);
return NULL;
}
if (taosKeepTcpAlive(pSocket) < 0) {
uError("failed to set tcp server keep-alive option since %s", strerror(errno));
taosCloseSocket(&pSocket);
return NULL;
}
if (listen(serverSocket, 10) < 0) {
uError("failed to listen TCP port:%d since %s", port, strerror(errno));
taosCloseSocket(&pSocket);
return NULL;
}
uInfo("TCP server at port:%d is listening", port);
while (1) {
client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len);
if (client < 0) {
uDebug("TCP: failed to accept at port:%d since %s", port, strerror(errno));
continue;
}
int32_t ret = taosReadMsg(pSocket, buffer, pinfo->pktLen);
if (ret < 0 || ret != pinfo->pktLen) {
uError("TCP: failed to read %d bytes at port:%d since %s", pinfo->pktLen, port, strerror(errno));
taosCloseSocket(&pSocket);
return NULL;
}
uInfo("TCP: read:%d bytes from %s at %d", pinfo->pktLen, taosInetNtoa(clientAddr.sin_addr), port);
ret = taosWriteMsg(pSocket, buffer, pinfo->pktLen);
if (ret < 0) {
uError("TCP: failed to write %d bytes at %d since %s", pinfo->pktLen, port, strerror(errno));
taosCloseSocket(&pSocket);
return NULL;
}
uInfo("TCP: write:%d bytes to %s at %d", pinfo->pktLen, taosInetNtoa(clientAddr.sin_addr), port);
}
taosCloseSocket(&pSocket);
return NULL;
}
static int32_t taosNetCheckTcpPort(STestInfo *info) {
SOCKET clientSocket;
char buffer[BUFFER_SIZE] = {0};
if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
uError("failed to create TCP client socket since %s", strerror(errno));
return -1;
clientRpc
=
rpcOpen
(
&
rpcInit
);
if
(
clientRpc
==
NULL
)
{
printf
(
"failed to init net test client since %s
\n
"
,
terrstr
());
goto
_OVER
;
}
printf
(
"net test client is initialized
\n
"
);
int32_t reuse = 1;
TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
if (pSocket == NULL) {
taosCloseSocketNoCheck1(clientSocket);
return -1;
}
pSocket->fd = clientSocket;
pSocket->refId = 0;
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(&pSocket);
return -1;
}
tstrncpy
(
epSet
.
eps
[
0
].
fqdn
,
shell
.
args
.
host
,
TSDB_FQDN_LEN
);
epSet
.
eps
[
0
].
port
=
(
uint16_t
)
shell
.
args
.
port
;
struct sockaddr_in serverAddr;
memset((char *)&serverAddr, 0, sizeof(serverAddr));
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = (uint16_t)htons((uint16_t)info->port);
serverAddr.sin_addr.s_addr = info->hostIp;
int32_t
totalSucc
=
0
;
uint64_t
startTime
=
taosGetTimestampUs
();
if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) {
uError("TCP: failed to connect port %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno));
taosCloseSocket(&pSocket);
return -1;
}
for
(
int32_t
i
=
0
;
i
<
shell
.
args
.
pktNum
;
++
i
)
{
SRpcMsg
rpcMsg
=
{.
ahandle
=
(
void
*
)
0x9525
,
.
msgType
=
TDMT_DND_NET_TEST
};
rpcMsg
.
pCont
=
rpcMallocCont
(
shell
.
args
.
pktLen
);
rpcMsg
.
contLen
=
shell
.
args
.
pktLen
;
taosKeepTcpAlive(pSocket);
printf
(
"net test request is sent, size:%d
\n
"
,
rpcMsg
.
contLen
);
rpcSendRecv
(
clientRpc
,
&
epSet
,
&
rpcMsg
,
&
rpcRsp
);
printf
(
"net test response is received, size:%d
\n
"
,
rpcMsg
.
contLen
);
sprintf(buffer, "client send TCP pkg to %s:%d, content: 1122334455", taosIpStr(info->hostIp), info->port);
sprintf(buffer + info->pktLen - 16, "1122334455667788");
if
(
rpcRsp
.
code
==
0
)
totalSucc
++
;
int32_t ret = taosWriteMsg(pSocket, buffer, info->pktLen);
if (ret < 0) {
uError("TCP: failed to write msg to %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno));
taosCloseSocket(&pSocket);
return -1;
rpcFreeCont
(
rpcRsp
.
pCont
);
rpcRsp
.
pCont
=
NULL
;
}
ret = taosReadMsg(pSocket, buffer, info->pktLen);
if (ret < 0) {
uError("TCP: failed to read msg from %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno));
taosCloseSocket(&pSocket);
return -1;
}
taosCloseSocket(&pSocket);
return 0;
}
static int32_t taosNetCheckUdpPort(STestInfo *info) {
SOCKET clientSocket;
char buffer[BUFFER_SIZE] = {0};
int32_t iDataNum = 0;
int32_t bufSize = 1024000;
struct sockaddr_in serverAddr;
uint64_t
endTime
=
taosGetTimestampUs
();
uint64_t
elT
=
endTime
-
startTime
;
if ((clientSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
uError("failed to create udp client socket since %s", strerror(errno));
return -1;
}
printf
(
"
\n
total succ:%5d/%d
\t
cost:%8.2lf ms
\t
speed:%8.2lf MB/s
\n
"
,
totalSucc
,
shell
.
args
.
pktNum
,
elT
/
1000
.
0
,
shell
.
args
.
pktLen
/
(
elT
/
1000000
.
0
)
/
1024
.
0
/
1024
.
0
*
totalSucc
);
TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
if (pSocket == NULL) {
taosCloseSocketNoCheck1(clientSocket);
return -1;
_OVER:
if
(
clientRpc
!=
NULL
)
{
rpcClose
(
clientRpc
);
}
pSocket->fd = clientSocket;
pSocket->refId = 0;
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
uError("failed to set the send buffer size for UDP socket\n");
taosCloseSocket(&pSocket);
return -1;
if
(
rpcRsp
.
pCont
!=
NULL
)
{
rpcFreeCont
(
rpcRsp
.
pCont
);
}
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
uError("failed to set the receive buffer size for UDP socket\n");
taosCloseSocket(&pSocket);
return -1;
}
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(info->port);
serverAddr.sin_addr.s_addr = info->hostIp;
struct in_addr ipStr;
memcpy(&ipStr, &info->hostIp, 4);
sprintf(buffer, "client send udp pkg to %s:%d, content: 1122334455", taosInetNtoa(ipStr), info->port);
sprintf(buffer + info->pktLen - 16, "1122334455667788");
socklen_t sin_size = sizeof(*(struct sockaddr *)&serverAddr);
iDataNum = taosSendto(pSocket, buffer, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int32_t)sin_size);
if (iDataNum < 0 || iDataNum != info->pktLen) {
uError("UDP: failed to perform sendto func since %s", strerror(errno));
taosCloseSocket(&pSocket);
return -1;
}
memset(buffer, 0, BUFFER_SIZE);
sin_size = sizeof(*(struct sockaddr *)&serverAddr);
iDataNum = recvfrom(clientSocket, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size);
if (iDataNum < 0 || iDataNum != info->pktLen) {
uError("UDP: received ack:%d bytes(expect:%d) from port:%d since %s", iDataNum, info->pktLen, info->port,
strerror(errno)); taosCloseSocket(&pSocket); return -1;
}
taosCloseSocket(&pSocket);
return 0;
}
static void taosNetCheckPort(uint32_t hostIp, int32_t startPort, int32_t endPort, int32_t pktLen) {
int32_t ret;
STestInfo info;
memset(&info, 0, sizeof(STestInfo));
info.hostIp = hostIp;
info.pktLen = pktLen;
for (int32_t port = startPort; port <= endPort; port++) {
info.port = port;
ret = taosNetCheckTcpPort(&info);
if (ret != 0) {
printf("failed to test TCP port:%d\n", port);
} else {
printf("successed to test TCP port:%d\n", port);
}
ret = taosNetCheckUdpPort(&info);
if (ret != 0) {
printf("failed to test UDP port:%d\n", port);
} else {
printf("successed to test UDP port:%d\n", port);
}
}
static
void
shellProcessMsg
(
void
*
p
,
SRpcMsg
*
pRpc
,
SEpSet
*
pEpSet
)
{
printf
(
"net test request is received, size:%d
\n
"
,
pRpc
->
contLen
);
SRpcMsg
rsp
=
{.
handle
=
pRpc
->
handle
,
.
ahandle
=
pRpc
->
ahandle
,
.
code
=
0
};
rsp
.
pCont
=
rpcMallocCont
(
shell
.
args
.
pktLen
);
rsp
.
contLen
=
shell
.
args
.
pktLen
;
rpcSendResponse
(
&
rsp
);
}
static void taosNetTestClient(char *host, int32_t startPort, int32_t pkgLen) {
uInfo("work as client, host:%s Port:%d pkgLen:%d\n", host, startPort, pkgLen);
void
shellNettestHandler
(
int32_t
signum
,
void
*
sigInfo
,
void
*
context
)
{
shellExit
();
}
uint32_t serverIp = taosGetIpv4FromFqdn(host);
if (serverIp == 0xFFFFFFFF) {
uError("failed to resolve fqdn:%s", host);
exit(-1);
}
static
void
shellWorkAsServer
()
{
SRpcInit
rpcInit
=
{
0
};
rpcInit
.
localPort
=
shell
.
args
.
port
;
rpcInit
.
label
=
"CHK"
;
rpcInit
.
numOfThreads
=
tsNumOfRpcThreads
;
rpcInit
.
cfp
=
(
RpcCfp
)
shellProcessMsg
;
rpcInit
.
sessions
=
10
;
rpcInit
.
connType
=
TAOS_CONN_SERVER
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
uInfo("server ip:%s is resolved from host:%s", taosIpStr(serverIp), host);
taosNetCheckPort(serverIp, startPort, startPort, pkgLen);
}
static void taosNetTestServer(char *host, int32_t startPort, int32_t pkgLen) {
uInfo("work as server, host:%s Port:%d pkgLen:%d\n", host, startPort, pkgLen);
int32_t port = startPort;
int32_t num = 1;
if (num < 0) num = 1;
TdThread *pids = taosMemoryMalloc(2 * num * sizeof(TdThread));
STestInfo *tinfos = taosMemoryMalloc(num * sizeof(STestInfo));
STestInfo *uinfos = taosMemoryMalloc(num * sizeof(STestInfo));
for (int32_t i = 0; i < num; i++) {
STestInfo *tcpInfo = tinfos + i;
tcpInfo->port = port + i;
tcpInfo->pktLen = pkgLen;
if (taosThreadCreate(pids + i, NULL, taosNetBindTcpPort, tcpInfo) != 0) {
uInfo("failed to create TCP test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port);
exit(-1);
}
STestInfo *udpInfo = uinfos + i;
udpInfo->port = port + i;
tcpInfo->pktLen = pkgLen;
if (taosThreadCreate(pids + num + i, NULL, taosNetBindUdpPort, udpInfo) != 0) {
uInfo("failed to create UDP test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port);
exit(-1);
}
}
for (int32_t i = 0; i < num; i++) {
taosThreadJoin(pids[i], NULL);
taosThreadJoin(pids[(num + i)], NULL);
}
}
static void taosNetCheckSpeed(char *host, int32_t port, int32_t pkgLen,
int32_t pkgNum, char *pkgType) {
#if 0
// record config
int32_t compressTmp = tsCompressMsgSize;
int32_t maxUdpSize = tsRpcMaxUdpSize;
int32_t forceTcp = tsRpcForceTcp;
if (0 == strcmp("tcp", pkgType)){
tsRpcForceTcp = 1;
tsRpcMaxUdpSize = 0; // force tcp
} else {
tsRpcForceTcp = 0;
tsRpcMaxUdpSize = INT_MAX;
}
tsCompressMsgSize = -1;
SEpSet epSet;
SRpcMsg reqMsg;
SRpcMsg rspMsg;
void * pRpcConn;
char secretEncrypt[32] = {0};
char spi = 0;
pRpcConn = taosNetInitRpc(secretEncrypt, spi);
if (NULL == pRpcConn) {
uError("failed to init client rpc");
return;
void
*
serverRpc
=
rpcOpen
(
&
rpcInit
);
if
(
serverRpc
==
NULL
)
{
printf
(
"failed to init net test server since %s"
,
terrstr
());
}
printf("check net spend, host:%s port:%d pkgLen:%d pkgNum:%d pkgType:%s\n\n", host, port, pkgLen, pkgNum, pkgType);
int32_t totalSucc = 0;
uint64_t startT = taosGetTimestampUs();
for (int32_t i = 1; i <= pkgNum; i++) {
uint64_t startTime = taosGetTimestampUs();
memset(&epSet, 0, sizeof(SEpSet));
strcpy(epSet.eps[0].fqdn, host);
epSet.eps[0].port = port;
epSet.numOfEps = 1;
reqMsg.msgType = TDMT_DND_NETWORK_TEST;
reqMsg.pCont = rpcMallocCont(pkgLen);
reqMsg.contLen = pkgLen;
reqMsg.code = 0;
reqMsg.handle = NULL; // rpc handle returned to app
reqMsg.ahandle = NULL; // app handle set by client
strcpy(reqMsg.pCont, "nettest speed");
rpcSendRecv(pRpcConn, &epSet, &reqMsg, &rspMsg);
int code = 0;
if ((rspMsg.code != 0) || (rspMsg.msgType != TDMT_DND_NETWORK_TEST + 1)) {
uError("ret code 0x%x %s", rspMsg.code, tstrerror(rspMsg.code));
code = -1;
}else{
totalSucc ++;
}
printf
(
"net test server is initialized
\n
"
);
rpcFreeCont(rspMsg.pCont);
uint64_t endTime = taosGetTimestampUs();
uint64_t el = endTime - startTime;
printf("progress:%5d/%d\tstatus:%d\tcost:%8.2lf ms\tspeed:%8.2lf MB/s\n", i, pkgNum, code, el/1000.0,
pkgLen/(el/1000000.0)/1024.0/1024.0);
}
int64_t endT = taosGetTimestampUs();
uint64_t elT = endT - startT;
printf("\ntotal succ:%5d/%d\tcost:%8.2lf ms\tspeed:%8.2lf MB/s\n", totalSucc, pkgNum, elT/1000.0,
pkgLen/(elT/1000000.0)/1024.0/1024.0*totalSucc);
rpcClose(pRpcConn);
// return config
tsCompressMsgSize = compressTmp;
tsRpcMaxUdpSize = maxUdpSize;
tsRpcForceTcp = forceTcp;
return;
#endif
taosSetSignal
(
SIGTERM
,
shellNettestHandler
);
while
(
1
)
taosMsleep
(
10
);
}
void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen, int32_t pkgNum, char *pkgType) {
tsLogEmbedded = 1;
if (host == NULL) host = tsLocalFqdn;
if (port == 0) port = tsServerPort;
if (0 == strcmp("speed", role)) {
if (pkgLen <= MIN_SPEED_PKG_LEN) pkgLen = MIN_SPEED_PKG_LEN;
if (pkgLen > MAX_SPEED_PKG_LEN) pkgLen = MAX_SPEED_PKG_LEN;
if (pkgNum <= MIN_SPEED_PKG_NUM) pkgNum = MIN_SPEED_PKG_NUM;
if (pkgNum > MAX_SPEED_PKG_NUM) pkgNum = MAX_SPEED_PKG_NUM;
} else {
if (pkgLen <= 10) pkgLen = 1000;
if (pkgLen > MAX_PKG_LEN) pkgLen = MAX_PKG_LEN;
void
shellTestNetWork
()
{
if
(
strcmp
(
shell
.
args
.
netrole
,
"client"
)
==
0
)
{
shellWorkAsClient
();
}
if (0 == strcmp("client", role)) {
taosNetTestClient(host, port, pkgLen);
} else if (0 == strcmp("server", role)) {
taosNetTestServer(host, port, pkgLen);
} else if (0 == strcmp("speed", role)) {
tsLogEmbedded = 0;
char type[10] = {0};
taosNetCheckSpeed(host, port, pkgLen, pkgNum, strtolower(type, pkgType));
} else {
TASSERT(1);
if
(
strcmp
(
shell
.
args
.
netrole
,
"server"
)
==
0
)
{
shellWorkAsServer
();
}
tsLogEmbedded = 0;
}
#endif
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录