未验证 提交 4d7bb97f 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Feature/sangshuduo/td 13558 taos shell refactor (#10946)

* [TD-13558]<feature>: taos shell refactor

add taosTools as submodule

* add tools/taos-tools

* add more client interface for taosTools compile

* update taos-tools

* update taos-tools

* refactor shell

* [TD-13558]<feature>: taos shell test speed
上级 d5a2cf1e
......@@ -75,6 +75,8 @@ typedef enum {
extern char *qtypeStr[];
#define TSDB_PORT_DNODEDNODE 5
#define TSDB_PORT_SYNC 10
#define TSDB_PORT_HTTP 11
#ifdef __cplusplus
......
......@@ -51,10 +51,31 @@
extern "C" {
#endif
#if (defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32))
#if defined(WINDOWS)
#define htobe64 htonll
#endif
#if defined(WINDOWS)
#define TAOS_EPOLL_WAIT_TIME 100
typedef SOCKET eventfd_t;
#define eventfd(a, b) -1
typedef SOCKET EpollFd;
#define EpollClose(pollFd) epoll_close(pollFd)
#ifndef EPOLLWAKEUP
#define EPOLLWAKEUP (1u << 29)
#endif
#elif defined(_TD_DARWIN_64)
#define TAOS_EPOLL_WAIT_TIME 500
typedef int32_t SOCKET;
typedef SOCKET EpollFd;
#define EpollClose(pollFd) epoll_close(pollFd)
#else
#define TAOS_EPOLL_WAIT_TIME 500
typedef int32_t SOCKET;
typedef SOCKET EpollFd;
#define EpollClose(pollFd) taosCloseSocket(pollFd)
#endif
#if defined(_TD_DARWIN_64)
// #define htobe64 htonll
......@@ -64,12 +85,12 @@ extern "C" {
# define htole16(x) OSSwapHostToLittleInt16(x)
# define be16toh(x) OSSwapBigToHostInt16(x)
# define le16toh(x) OSSwapLittleToHostInt16(x)
# define htobe32(x) OSSwapHostToBigInt32(x)
# define htole32(x) OSSwapHostToLittleInt32(x)
# define be32toh(x) OSSwapBigToHostInt32(x)
# define le32toh(x) OSSwapLittleToHostInt32(x)
# define htobe64(x) OSSwapHostToBigInt64(x)
# define htole64(x) OSSwapHostToLittleInt64(x)
# define be64toh(x) OSSwapBigToHostInt64(x)
......@@ -83,6 +104,17 @@ extern "C" {
#define TAOS_EPOLL_WAIT_TIME 500
typedef int32_t SocketFd;
typedef SocketFd EpollFd;
typedef struct TdSocket {
#if SOCKET_WITH_LOCK
TdThreadRwlock rwlock;
#endif
int refId;
SocketFd fd;
} *TdSocketPtr, TdSocket;
typedef struct TdSocketServer *TdSocketServerPtr;
typedef struct TdSocket *TdSocketPtr;
typedef struct TdEpoll *TdEpollPtr;
......@@ -91,6 +123,7 @@ int32_t taosSendto(TdSocketPtr pSocket, void * msg, int len, unsigned int flags,
int32_t taosWriteSocket(TdSocketPtr pSocket, void *msg, int len);
int32_t taosReadSocket(TdSocketPtr pSocket, void *msg, int len);
int32_t taosReadFromSocket(TdSocketPtr pSocket, void *buf, int32_t len, int32_t flags, struct sockaddr *destAddr, socklen_t *addrLen);
int32_t taosCloseSocketNoCheck1(SocketFd fd);
int32_t taosCloseSocket(TdSocketPtr *ppSocket);
int32_t taosCloseSocketServer(TdSocketServerPtr *ppSocketServer);
int32_t taosShutDownSocketRD(TdSocketPtr pSocket);
......
......@@ -47,9 +47,6 @@
#endif
#endif
typedef int32_t SocketFd;
typedef SocketFd EpollFd;
typedef struct TdSocketServer {
#if SOCKET_WITH_LOCK
TdThreadRwlock rwlock;
......@@ -58,14 +55,6 @@ typedef struct TdSocketServer {
SocketFd fd;
} *TdSocketServerPtr, TdSocketServer;
typedef struct TdSocket {
#if SOCKET_WITH_LOCK
TdThreadRwlock rwlock;
#endif
int refId;
SocketFd fd;
} *TdSocketPtr, TdSocket;
typedef struct TdEpoll {
#if SOCKET_WITH_LOCK
TdThreadRwlock rwlock;
......
......@@ -4,6 +4,7 @@ IF (TD_TAOS_TOOLS)
INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/include/common)
INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/include/util)
INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/include/os)
INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/include/libs/transport)
ADD_SUBDIRECTORY(taos-tools)
ENDIF ()
......
......@@ -4,9 +4,7 @@ add_executable(shell ${SHELL_SRC})
target_link_libraries(
shell
PUBLIC taos
PUBLIC util
PUBLIC common
PUBLIC os
PRIVATE os common transport util
)
target_include_directories(
shell
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_SYNC_MSG_H
#define TDENGINE_SYNC_MSG_H
#ifdef __cplusplus
extern "C" {
#endif
#include "tsync.h"
typedef enum {
TAOS_SMSG_START = 0,
TAOS_SMSG_SYNC_DATA = 1,
TAOS_SMSG_SYNC_DATA_RSP = 2,
TAOS_SMSG_SYNC_FWD = 3,
TAOS_SMSG_SYNC_FWD_RSP = 4,
TAOS_SMSG_SYNC_REQ = 5,
TAOS_SMSG_SYNC_REQ_RSP = 6,
TAOS_SMSG_SYNC_MUST = 7,
TAOS_SMSG_SYNC_MUST_RSP = 8,
TAOS_SMSG_STATUS = 9,
TAOS_SMSG_STATUS_RSP = 10,
TAOS_SMSG_SETUP = 11,
TAOS_SMSG_SETUP_RSP = 12,
TAOS_SMSG_SYNC_FILE = 13,
TAOS_SMSG_SYNC_FILE_RSP = 14,
TAOS_SMSG_TEST = 15,
TAOS_SMSG_END = 16
} ESyncMsgType;
typedef enum {
SYNC_STATUS_BROADCAST,
SYNC_STATUS_BROADCAST_RSP,
SYNC_STATUS_SETUP_CONN,
SYNC_STATUS_SETUP_CONN_RSP,
SYNC_STATUS_EXCHANGE_DATA,
SYNC_STATUS_EXCHANGE_DATA_RSP,
SYNC_STATUS_CHECK_ROLE,
SYNC_STATUS_CHECK_ROLE_RSP
} ESyncStatusType;
#pragma pack(push, 1)
typedef struct {
int8_t type; // msg type
int8_t protocol; // protocol version
uint16_t signature; // fixed value
int32_t code; //
int32_t cId; // cluster Id
int32_t vgId; // vg ID
int32_t len; // content length, does not include head
uint32_t cksum;
} SSyncHead;
typedef struct {
SSyncHead head;
uint16_t port;
uint16_t tranId;
int32_t sourceId; // only for arbitrator
char fqdn[TSDB_FQDN_LEN];
} SSyncMsg;
typedef struct {
SSyncHead head;
int8_t sync;
int8_t reserved;
uint16_t tranId;
int8_t reserverd[4];
} SSyncRsp;
typedef struct {
int8_t role;
uint64_t version;
} SPeerStatus;
typedef struct {
SSyncHead head;
int8_t role;
int8_t ack;
int8_t type;
int8_t reserved[3];
uint16_t tranId;
uint64_t version;
SPeerStatus peersStatus[TAOS_SYNC_MAX_REPLICA];
} SPeersStatus;
typedef struct {
SSyncHead head;
uint64_t fversion;
} SFileVersion;
typedef struct {
SSyncHead head;
int8_t ack;
} SFileAck;
typedef struct {
SSyncHead head;
uint64_t version;
int32_t code;
} SFwdRsp;
#pragma pack(pop)
#define SYNC_PROTOCOL_VERSION 1
#define SYNC_SIGNATURE ((uint16_t)(0xCDEF))
extern char *statusType[];
uint16_t syncGenTranId();
int32_t syncCheckHead(SSyncHead *pHead);
void syncBuildSyncFwdMsg(SSyncHead *pHead, int32_t vgId, int32_t len);
void syncBuildSyncFwdRsp(SFwdRsp *pMsg, int32_t vgId, uint64_t version, int32_t code);
void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId);
void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId);
void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId);
void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId);
void syncBuildSyncTestMsg(SSyncMsg *pMsg, int32_t vgId);
void syncBuildFileAck(SFileAck *pMsg, int32_t vgId);
void syncBuildFileVersion(SFileVersion *pMsg, int32_t vgId);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODEPEER_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_SYNC_H
#define TDENGINE_SYNC_H
#ifdef __cplusplus
extern "C" {
#endif
#define TAOS_SYNC_MAX_REPLICA 5
#define TAOS_SYNC_MAX_INDEX 0x7FFFFFFF
typedef enum {
TAOS_SYNC_ROLE_OFFLINE = 0,
TAOS_SYNC_ROLE_UNSYNCED = 1,
TAOS_SYNC_ROLE_SYNCING = 2,
TAOS_SYNC_ROLE_SLAVE = 3,
TAOS_SYNC_ROLE_MASTER = 4
} ESyncRole;
typedef enum {
TAOS_SYNC_STATUS_INIT = 0,
TAOS_SYNC_STATUS_START = 1,
TAOS_SYNC_STATUS_FILE = 2,
TAOS_SYNC_STATUS_CACHE = 3
} ESyncStatus;
typedef struct {
uint32_t nodeId; // node ID assigned by TDengine
uint16_t nodePort; // node sync Port
char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN
} SNodeInfo;
typedef struct {
int8_t quorum; // number of confirms required, >=1
int8_t replica; // number of replications, >=1
SNodeInfo nodeInfo[TAOS_SYNC_MAX_REPLICA];
} SSyncCfg;
typedef struct {
int32_t selfIndex;
uint32_t nodeId[TAOS_SYNC_MAX_REPLICA];
int32_t role[TAOS_SYNC_MAX_REPLICA];
} SNodesRole;
// get the wal file from index or after
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
typedef int32_t (*FGetWalInfo)(int32_t vgId, char *fileName, int64_t *fileId);
// when a forward pkt is received, call this to handle data
typedef int32_t (*FWriteToCache)(int32_t vgId, void *pHead, int32_t qtype, void *pMsg);
// when forward is confirmed by peer, master call this API to notify app
typedef void (*FConfirmForward)(int32_t vgId, void *mhandle, int32_t code);
// when role is changed, call this to notify app
typedef void (*FNotifyRole)(int32_t vgId, int8_t role);
// if a number of retrieving data failed, call this to start flow control
typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t level);
// when data file is synced successfully, notity app
typedef void (*FStartSyncFile)(int32_t vgId);
typedef void (*FStopSyncFile)(int32_t vgId, uint64_t fversion);
// get file version
typedef int32_t (*FGetVersion)(int32_t vgId, uint64_t *fver, uint64_t *vver);
typedef int32_t (*FSendFile)(void *tsdb, SOCKET socketFd);
typedef int32_t (*FRecvFile)(void *tsdb, SOCKET socketFd);
typedef struct {
int32_t vgId; // vgroup ID
uint64_t version; // initial version
SSyncCfg syncCfg; // configuration from mgmt
char path[TSDB_FILENAME_LEN]; // path to the file
void * pTsdb;
FGetWalInfo getWalInfoFp;
FWriteToCache writeToCacheFp;
FConfirmForward confirmForward;
FNotifyRole notifyRoleFp;
FNotifyFlowCtrl notifyFlowCtrlFp;
FStartSyncFile startSyncFileFp;
FStopSyncFile stopSyncFileFp;
FGetVersion getVersionFp;
FSendFile sendFileFp;
FRecvFile recvFileFp;
} SSyncInfo;
typedef void *tsync_h;
int32_t syncInit();
void syncCleanUp();
int64_t syncStart(const SSyncInfo *);
void syncStop(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg *);
int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int32_t qtype, bool force);
void syncConfirmForward(int64_t rid, uint64_t version, int32_t code, bool force);
void syncRecover(int64_t rid); // recover from other nodes:
int32_t syncGetNodesRole(int64_t rid, SNodesRole *);
extern char *syncRole[];
//global configurable parameters
extern int32_t sDebugFlag;
extern char tsArbitrator[];
extern uint16_t tsSyncPort;
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_SYNC_H
......@@ -32,6 +32,8 @@
int indicator = 1;
void insertChar(Command *cmd, char *c, int size);
void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen,
int32_t pkgNum, char *pkgType);
const char *argp_program_version = version;
const char *argp_program_bug_address = "<support@taosdata.com>";
static char doc[] = "";
......@@ -59,7 +61,8 @@ static struct argp_option options[] = {
{"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup|sync|speen|fqdn."},
{"pktlen", 'l', "PKTLEN", 0, "Packet length used for net test, default is 1000 bytes."},
{"pktnum", 'N', "PKTNUM", 0, "Packet numbers used for net test, default is 100."},
{"pkttype", 'S', "PKTTYPE", 0, "Packet type used for net test, default is TCP."},
// Shuduo: 3.0 does not support UDP any more
// {"pkttype", 'S', "PKTTYPE", 0, "Packet type used for net test, default is TCP."},
{0}};
static error_t parse_opt(int key, char *arg, struct argp_state *state) {
......@@ -629,16 +632,25 @@ int main(int argc, char *argv[]) {
taosDumpGlobalCfg();
exit(0);
}
#endif
if (args.netTestRole && args.netTestRole[0] != 0) {
if (taos_init()) {
TAOS *con = NULL;
if (args.auth == NULL) {
con = taos_connect(args.host, args.user, args.password, args.database, args.port);
} else {
con = taos_connect_auth(args.host, args.user, args.auth, args.database, args.port);
}
/* if (taos_init()) {
printf("Failed to init taos");
exit(EXIT_FAILURE);
}
*/
taosNetTest(args.netTestRole, args.host, args.port, args.pktLen, args.pktNum, args.pktType);
taos_close(con);
exit(0);
}
#endif
/* Initialize the shell */
TAOS *con = shellInit(&args);
......
......@@ -14,18 +14,20 @@
*/
#define _DEFAULT_SOURCE
#define ALLOW_FORBID_FUNC
#include "os.h"
#include "taosdef.h"
#include "tmsg.h"
#include "taoserror.h"
#include "tlog.h"
#include "tglobal.h"
#include "tsocket.h"
#include "trpc.h"
#include "rpcHead.h"
#include "tchecksum.h"
#include "syncMsg.h"
#include "osSocket.h"
#define MAX_PKG_LEN (64 * 1000)
#define MAX_SPEED_PKG_LEN (1024 * 1024 * 1024)
#define MIN_SPEED_PKG_LEN 1024
......@@ -33,7 +35,7 @@
#define MIN_SPEED_PKG_NUM 1
#define BUFFER_SIZE (MAX_PKG_LEN + 1024)
extern int32_t tsRpcMaxUdpSize;
extern int tsRpcMaxUdpSize;
typedef struct {
char * hostFqdn;
......@@ -71,15 +73,23 @@ static void *taosNetBindUdpPort(void *sarg) {
return NULL;
}
if (taosSetSockOpt(serverSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
TdSocketPtr pSocket = (TdSocketPtr)malloc(sizeof(TdSocket));
if (pSocket == NULL) {
taosCloseSocketNoCheck1(serverSocket);
return NULL;
}
pSocket->fd = serverSocket;
pSocket->refId = 0;
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
uError("failed to set the send buffer size for UDP socket\n");
taosCloseSocket(serverSocket);
taosCloseSocket(&pSocket);
return NULL;
}
if (taosSetSockOpt(serverSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
uError("failed to set the receive buffer size for UDP socket\n");
taosCloseSocket(serverSocket);
taosCloseSocket(&pSocket);
return NULL;
}
......@@ -98,13 +108,13 @@ static void *taosNetBindUdpPort(void *sarg) {
uInfo("UDP: recv:%d bytes from %s at %d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port);
if (iDataNum > 0) {
iDataNum = taosSendto(serverSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int32_t)sin_size);
iDataNum = taosSendto(pSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int32_t)sin_size);
}
uInfo("UDP: send:%d bytes to %s at %d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port);
}
taosCloseSocket(serverSocket);
taosCloseSocket(&pSocket);
return NULL;
}
......@@ -132,25 +142,35 @@ static void *taosNetBindTcpPort(void *sarg) {
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
int32_t reuse = 1;
if (taosSetSockOpt(serverSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
TdSocketPtr pSocket = (TdSocketPtr)malloc(sizeof(TdSocket));
if (pSocket == NULL) {
taosCloseSocketNoCheck1(serverSocket);
return NULL;
}
pSocket->fd = serverSocket;
pSocket->refId = 0;
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(serverSocket);
taosCloseSocket(&pSocket);
return NULL;
}
if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
uError("failed to bind TCP port:%d since %s", port, strerror(errno));
taosCloseSocket(&pSocket);
return NULL;
}
if (taosKeepTcpAlive(serverSocket) < 0) {
if (taosKeepTcpAlive(pSocket) < 0) {
uError("failed to set tcp server keep-alive option since %s", strerror(errno));
taosCloseSocket(serverSocket);
taosCloseSocket(&pSocket);
return NULL;
}
if (listen(serverSocket, 10) < 0) {
uError("failed to listen TCP port:%d since %s", port, strerror(errno));
taosCloseSocket(&pSocket);
return NULL;
}
......@@ -163,26 +183,26 @@ static void *taosNetBindTcpPort(void *sarg) {
continue;
}
int32_t ret = taosReadMsg(client, buffer, pinfo->pktLen);
int32_t ret = taosReadMsg(pSocket, buffer, pinfo->pktLen);
if (ret < 0 || ret != pinfo->pktLen) {
uError("TCP: failed to read %d bytes at port:%d since %s", pinfo->pktLen, port, strerror(errno));
taosCloseSocket(serverSocket);
taosCloseSocket(&pSocket);
return NULL;
}
uInfo("TCP: read:%d bytes from %s at %d", pinfo->pktLen, taosInetNtoa(clientAddr.sin_addr), port);
ret = taosWriteMsg(client, buffer, pinfo->pktLen);
ret = taosWriteMsg(pSocket, buffer, pinfo->pktLen);
if (ret < 0) {
uError("TCP: failed to write %d bytes at %d since %s", pinfo->pktLen, port, strerror(errno));
taosCloseSocket(serverSocket);
taosCloseSocket(&pSocket);
return NULL;
}
uInfo("TCP: write:%d bytes to %s at %d", pinfo->pktLen, taosInetNtoa(clientAddr.sin_addr), port);
}
taosCloseSocket(serverSocket);
taosCloseSocket(&pSocket);
return NULL;
}
......@@ -196,9 +216,17 @@ static int32_t taosNetCheckTcpPort(STestInfo *info) {
}
int32_t reuse = 1;
if (taosSetSockOpt(clientSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
TdSocketPtr pSocket = (TdSocketPtr)malloc(sizeof(TdSocket));
if (pSocket == NULL) {
taosCloseSocketNoCheck1(clientSocket);
return -1;
}
pSocket->fd = clientSocket;
pSocket->refId = 0;
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(clientSocket);
taosCloseSocket(&pSocket);
return -1;
}
......@@ -210,27 +238,30 @@ static int32_t taosNetCheckTcpPort(STestInfo *info) {
if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) {
uError("TCP: failed to connect port %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno));
taosCloseSocket(&pSocket);
return -1;
}
taosKeepTcpAlive(clientSocket);
taosKeepTcpAlive(pSocket);
sprintf(buffer, "client send TCP pkg to %s:%d, content: 1122334455", taosIpStr(info->hostIp), info->port);
sprintf(buffer + info->pktLen - 16, "1122334455667788");
int32_t ret = taosWriteMsg(clientSocket, buffer, info->pktLen);
int32_t ret = taosWriteMsg(pSocket, buffer, info->pktLen);
if (ret < 0) {
uError("TCP: failed to write msg to %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno));
taosCloseSocket(&pSocket);
return -1;
}
ret = taosReadMsg(clientSocket, buffer, info->pktLen);
ret = taosReadMsg(pSocket, buffer, info->pktLen);
if (ret < 0) {
uError("TCP: failed to read msg from %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno));
taosCloseSocket(&pSocket);
return -1;
}
taosCloseSocket(clientSocket);
taosCloseSocket(&pSocket);
return 0;
}
......@@ -247,13 +278,23 @@ static int32_t taosNetCheckUdpPort(STestInfo *info) {
return -1;
}
if (taosSetSockOpt(clientSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
TdSocketPtr pSocket = (TdSocketPtr)malloc(sizeof(TdSocket));
if (pSocket == NULL) {
taosCloseSocketNoCheck1(clientSocket);
return -1;
}
pSocket->fd = clientSocket;
pSocket->refId = 0;
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
uError("failed to set the send buffer size for UDP socket\n");
taosCloseSocket(&pSocket);
return -1;
}
if (taosSetSockOpt(clientSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
uError("failed to set the receive buffer size for UDP socket\n");
taosCloseSocket(&pSocket);
return -1;
}
......@@ -268,9 +309,10 @@ static int32_t taosNetCheckUdpPort(STestInfo *info) {
socklen_t sin_size = sizeof(*(struct sockaddr *)&serverAddr);
iDataNum = taosSendto(clientSocket, buffer, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int32_t)sin_size);
iDataNum = taosSendto(pSocket, buffer, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int32_t)sin_size);
if (iDataNum < 0 || iDataNum != info->pktLen) {
uError("UDP: failed to perform sendto func since %s", strerror(errno));
taosCloseSocket(&pSocket);
return -1;
}
......@@ -280,10 +322,11 @@ static int32_t taosNetCheckUdpPort(STestInfo *info) {
if (iDataNum < 0 || iDataNum != info->pktLen) {
uError("UDP: received ack:%d bytes(expect:%d) from port:%d since %s", iDataNum, info->pktLen, info->port, strerror(errno));
taosCloseSocket(&pSocket);
return -1;
}
taosCloseSocket(clientSocket);
taosCloseSocket(&pSocket);
return 0;
}
......@@ -339,7 +382,7 @@ void *taosNetInitRpc(char *secretEncrypt, char spi) {
}
static int32_t taosNetCheckRpc(const char* serverFqdn, uint16_t port, uint16_t pktLen, char spi, SStartupReq *pStep) {
SRpcEpSet epSet;
SEpSet epSet;
SRpcMsg reqMsg;
SRpcMsg rspMsg;
void * pRpcConn;
......@@ -352,11 +395,10 @@ static int32_t taosNetCheckRpc(const char* serverFqdn, uint16_t port, uint16_t p
return TSDB_CODE_RPC_NETWORK_UNAVAIL;
}
memset(&epSet, 0, sizeof(SRpcEpSet));
epSet.inUse = 0;
memset(&epSet, 0, sizeof(SEpSet));
strcpy(epSet.eps[0].fqdn, serverFqdn);
epSet.eps[0].port = port;
epSet.numOfEps = 1;
epSet.port[0] = port;
strcpy(epSet.fqdn[0], serverFqdn);
reqMsg.msgType = TDMT_DND_NETWORK_TEST;
reqMsg.pCont = rpcMallocCont(pktLen);
......@@ -425,8 +467,8 @@ static void taosNetCheckSync(char *host, int32_t port) {
return;
}
SOCKET connFd = taosOpenTcpClientSocket(ip, (uint16_t)port, 0);
if (connFd < 0) {
TdSocketPtr pSocket = taosOpenTcpClientSocket(ip, (uint16_t)port, 0);
if (pSocket == NULL) {
uError("failed to create socket while test port:%d since %s", port, strerror(errno));
return;
}
......@@ -443,17 +485,17 @@ static void taosNetCheckSync(char *host, int32_t port) {
pHead->len = sizeof(SSyncMsg) - sizeof(SSyncHead);
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SSyncHead));
if (taosWriteMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
if (taosWriteMsg(pSocket, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
uError("failed to test port:%d while send msg since %s", port, strerror(errno));
return;
}
if (taosReadMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
if (taosReadMsg(pSocket, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
uError("failed to test port:%d while recv msg since %s", port, strerror(errno));
}
uInfo("successed to test TCP port:%d", port);
taosCloseSocket(connFd);
taosCloseSocket(&pSocket);
}
static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) {
......@@ -494,7 +536,6 @@ static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) {
}
taosNetCheckSync(host, startPort + TSDB_PORT_SYNC);
taosNetCheckSync(host, startPort + TSDB_PORT_ARBITRATOR);
}
static void taosNetTestClient(char *host, int32_t startPort, int32_t pkgLen) {
......@@ -578,7 +619,7 @@ static void taosNetCheckSpeed(char *host, int32_t port, int32_t pkgLen,
}
tsCompressMsgSize = -1;
SRpcEpSet epSet;
SEpSet epSet;
SRpcMsg reqMsg;
SRpcMsg rspMsg;
void * pRpcConn;
......@@ -596,11 +637,10 @@ static void taosNetCheckSpeed(char *host, int32_t port, int32_t pkgLen,
for (int32_t i = 1; i <= pkgNum; i++) {
uint64_t startTime = taosGetTimestampUs();
memset(&epSet, 0, sizeof(SRpcEpSet));
epSet.inUse = 0;
memset(&epSet, 0, sizeof(SEpSet));
strcpy(epSet.eps[0].fqdn, host);
epSet.eps[0].port = port;
epSet.numOfEps = 1;
epSet.port[0] = port;
strcpy(epSet.fqdn[0], host);
reqMsg.msgType = TDMT_DND_NETWORK_TEST;
reqMsg.pCont = rpcMallocCont(pkgLen);
......@@ -641,7 +681,7 @@ static void taosNetCheckSpeed(char *host, int32_t port, int32_t pkgLen,
void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen,
int32_t pkgNum, char *pkgType) {
tscEmbedded = 1;
tsLogEmbedded = 1;
if (host == NULL) host = tsLocalFqdn;
if (port == 0) port = tsServerPort;
if (0 == strcmp("speed", role)){
......@@ -659,14 +699,14 @@ void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen,
} else if (0 == strcmp("server", role)) {
taosNetTestServer(host, port, pkgLen);
} else if (0 == strcmp("rpc", role)) {
tscEmbedded = 0;
tsLogEmbedded = 0;
taosNetTestRpc(host, port, pkgLen);
} else if (0 == strcmp("sync", role)) {
taosNetCheckSync(host, port);
} else if (0 == strcmp("startup", role)) {
taosNetTestStartup(host, port);
} else if (0 == strcmp("speed", role)) {
tscEmbedded = 0;
tsLogEmbedded = 0;
char type[10] = {0};
taosNetCheckSpeed(host, port, pkgLen, pkgNum, strtolower(type, pkgType));
}else if (0 == strcmp("fqdn", role)) {
......@@ -675,5 +715,5 @@ void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen,
taosNetTestStartup(host, port);
}
tscEmbedded = 0;
tsLogEmbedded = 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册