提交 4e3f4f8c 编写于 作者: J jtao1735

some changes

上级 dcb309ce
...@@ -195,11 +195,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); ...@@ -195,11 +195,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_CITY_LEN 20 #define TSDB_CITY_LEN 20
#define TSDB_STATE_LEN 20 #define TSDB_STATE_LEN 20
#define TSDB_COUNTRY_LEN 20 #define TSDB_COUNTRY_LEN 20
#define TSDB_VNODES_SUPPORT 6
#define TSDB_MGMT_SUPPORT 4
#define TSDB_LOCALE_LEN 64 #define TSDB_LOCALE_LEN 64
#define TSDB_TIMEZONE_LEN 64 #define TSDB_TIMEZONE_LEN 64
#define TSDB_FQDN_LEN 64
#define TSDB_IPv4ADDR_LEN 16 #define TSDB_IPv4ADDR_LEN 16
#define TSDB_FILENAME_LEN 128 #define TSDB_FILENAME_LEN 128
#define TSDB_METER_VNODE_BITS 20 #define TSDB_METER_VNODE_BITS 20
......
...@@ -187,7 +187,7 @@ extern char *taosMsg[]; ...@@ -187,7 +187,7 @@ extern char *taosMsg[];
#pragma pack(push, 1) #pragma pack(push, 1)
typedef struct { typedef struct {
uint32_t ip; char fqdn[TSDB_FQDN_LEN];
uint16_t port; uint16_t port;
} SIpAddr; } SIpAddr;
......
...@@ -31,8 +31,8 @@ extern int tsRpcHeadSize; ...@@ -31,8 +31,8 @@ extern int tsRpcHeadSize;
typedef struct { typedef struct {
int8_t inUse; int8_t inUse;
int8_t numOfIps; int8_t numOfIps;
uint16_t port; uint16_t port[TSDB_MAX_MPEERS];
uint32_t ip[TSDB_MAX_MPEERS]; char fqdn[TSDB_MAX_MPEERS][TSDB_FQDN_LEN];
} SRpcIpSet; } SRpcIpSet;
typedef struct { typedef struct {
...@@ -51,7 +51,6 @@ typedef struct { ...@@ -51,7 +51,6 @@ typedef struct {
} SRpcMsg; } SRpcMsg;
typedef struct { typedef struct {
char *localIp; // local IP used
uint16_t localPort; // local port uint16_t localPort; // local port
char *label; // for debug purpose char *label; // for debug purpose
int numOfThreads; // number of threads to handle connections int numOfThreads; // number of threads to handle connections
......
...@@ -38,14 +38,15 @@ typedef enum _TAOS_SYNC_STATUS { ...@@ -38,14 +38,15 @@ typedef enum _TAOS_SYNC_STATUS {
typedef struct { typedef struct {
uint32_t nodeId; // node ID assigned by TDengine uint32_t nodeId; // node ID assigned by TDengine
uint32_t nodeIp; // node IP address uint16_t nodePort; // node sync Port
char name[TSDB_FILENAME_LEN]; // external node name char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN
} SNodeInfo; } SNodeInfo;
typedef struct { typedef struct {
uint32_t arbitratorIp; // arbitrator IP address
int8_t quorum; // number of confirms required, >=1 int8_t quorum; // number of confirms required, >=1
int8_t replica; // number of replications, >=1 int8_t replica; // number of replications, >=1
uint16_t arbitratorPort; // arbitrator port
char arbitratorFqdn[TSDB_FQDN_LEN]; // arbitrator IP address
SNodeInfo nodeInfo[TAOS_SYNC_MAX_REPLICA]; SNodeInfo nodeInfo[TAOS_SYNC_MAX_REPLICA];
} SSyncCfg; } SSyncCfg;
......
...@@ -22,8 +22,8 @@ extern "C" { ...@@ -22,8 +22,8 @@ extern "C" {
void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer); void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer);
void rpcCloseConnCache(void *handle); void rpcCloseConnCache(void *handle);
void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, int8_t connType); void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, int8_t connType);
void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, int8_t connType); void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connType);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -20,12 +20,12 @@ ...@@ -20,12 +20,12 @@
extern "C" { extern "C" {
#endif #endif
void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
void taosCleanUpTcpServer(void *param); void taosCleanUpTcpServer(void *param);
void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle); void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle);
void taosCleanUpTcpClient(void *chandle); void taosCleanUpTcpClient(void *chandle);
void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port); void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port);
void taosCloseTcpConnection(void *chandle); void taosCloseTcpConnection(void *chandle);
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle); int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle);
......
...@@ -22,10 +22,10 @@ extern "C" { ...@@ -22,10 +22,10 @@ extern "C" {
#include "taosdef.h" #include "taosdef.h"
void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int, void *fp, void *shandle); void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int, void *fp, void *shandle);
void taosCleanUpUdpConnection(void *handle); void taosCleanUpUdpConnection(void *handle);
int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle); int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle);
void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t port); void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port);
void taosFreeMsgHdr(void *hdr); void taosFreeMsgHdr(void *hdr);
int taosMsgHdrSize(void *hdr); int taosMsgHdrSize(void *hdr);
......
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
#include "rpcCache.h" #include "rpcCache.h"
typedef struct SConnHash { typedef struct SConnHash {
uint32_t ip; char fqdn[TSDB_FQDN_LEN];
uint16_t port; uint16_t port;
char connType; char connType;
struct SConnHash *prev; struct SConnHash *prev;
...@@ -46,7 +46,7 @@ typedef struct { ...@@ -46,7 +46,7 @@ typedef struct {
int64_t *lockedBy; int64_t *lockedBy;
} SConnCache; } SConnCache;
static int rpcHashConn(void *handle, uint32_t ip, uint16_t port, int8_t connType); static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType);
static void rpcLockCache(int64_t *lockedBy); static void rpcLockCache(int64_t *lockedBy);
static void rpcUnlockCache(int64_t *lockedBy); static void rpcUnlockCache(int64_t *lockedBy);
static void rpcCleanConnCache(void *handle, void *tmrId); static void rpcCleanConnCache(void *handle, void *tmrId);
...@@ -114,7 +114,7 @@ void rpcCloseConnCache(void *handle) { ...@@ -114,7 +114,7 @@ void rpcCloseConnCache(void *handle) {
free(pCache); free(pCache);
} }
void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, int8_t connType) { void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, int8_t connType) {
int hash; int hash;
SConnHash * pNode; SConnHash * pNode;
SConnCache *pCache; SConnCache *pCache;
...@@ -125,9 +125,9 @@ void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, i ...@@ -125,9 +125,9 @@ void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, i
assert(pCache); assert(pCache);
assert(data); assert(data);
hash = rpcHashConn(pCache, ip, port, connType); hash = rpcHashConn(pCache, fqdn, port, connType);
pNode = (SConnHash *)taosMemPoolMalloc(pCache->connHashMemPool); pNode = (SConnHash *)taosMemPoolMalloc(pCache->connHashMemPool);
pNode->ip = ip; strcpy(pNode->fqdn, fqdn);
pNode->port = port; pNode->port = port;
pNode->connType = connType; pNode->connType = connType;
pNode->data = data; pNode->data = data;
...@@ -147,12 +147,12 @@ void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, i ...@@ -147,12 +147,12 @@ void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, i
pCache->total++; pCache->total++;
tTrace("%p ip:0x%x:%hu:%d:%d:%p added into cache, connections:%d", data, ip, port, connType, hash, pNode, pCache->count[hash]); tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode, pCache->count[hash]);
return; return;
} }
void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, int8_t connType) { void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connType) {
int hash; int hash;
SConnHash * pNode; SConnHash * pNode;
SConnCache *pCache; SConnCache *pCache;
...@@ -163,7 +163,7 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, int8_t connT ...@@ -163,7 +163,7 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, int8_t connT
uint64_t time = taosGetTimestampMs(); uint64_t time = taosGetTimestampMs();
hash = rpcHashConn(pCache, ip, port, connType); hash = rpcHashConn(pCache, fqdn, port, connType);
rpcLockCache(pCache->lockedBy+hash); rpcLockCache(pCache->lockedBy+hash);
pNode = pCache->connHashList[hash]; pNode = pCache->connHashList[hash];
...@@ -174,7 +174,7 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, int8_t connT ...@@ -174,7 +174,7 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, int8_t connT
break; break;
} }
if (pNode->ip == ip && pNode->port == port && pNode->connType == connType) break; if (strcmp(pNode->fqdn, fqdn) == 0 && pNode->port == port && pNode->connType == connType) break;
pNode = pNode->next; pNode = pNode->next;
} }
...@@ -201,7 +201,7 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, int8_t connT ...@@ -201,7 +201,7 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, int8_t connT
rpcUnlockCache(pCache->lockedBy+hash); rpcUnlockCache(pCache->lockedBy+hash);
if (pData) { if (pData) {
tTrace("%p ip:0x%x:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, ip, port, connType, hash, pNode, pCache->count[hash]); tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode, pCache->count[hash]);
} }
return pData; return pData;
...@@ -239,7 +239,7 @@ static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash ...@@ -239,7 +239,7 @@ static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash
pNext = pNode->next; pNext = pNode->next;
pCache->total--; pCache->total--;
pCache->count[hash]--; pCache->count[hash]--;
tTrace("%p ip:0x%x:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->ip, pNode->port, pNode->connType, hash, pNode, tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port, pNode->connType, hash, pNode,
pCache->count[hash]); pCache->count[hash]);
taosMemPoolFree(pCache->connHashMemPool, (char *)pNode); taosMemPoolFree(pCache->connHashMemPool, (char *)pNode);
pNode = pNext; pNode = pNext;
...@@ -251,12 +251,16 @@ static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash ...@@ -251,12 +251,16 @@ static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash
pCache->connHashList[hash] = NULL; pCache->connHashList[hash] = NULL;
} }
static int rpcHashConn(void *handle, uint32_t ip, uint16_t port, int8_t connType) { static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType) {
SConnCache *pCache = (SConnCache *)handle; SConnCache *pCache = (SConnCache *)handle;
int hash = 0; int hash = 0;
char *temp = fqdn;
while (*temp) {
hash += *temp;
++temp;
}
hash = ip >> 16;
hash += (unsigned short)(ip & 0xFFFF);
hash += port; hash += port;
hash += connType; hash += connType;
......
...@@ -44,7 +44,6 @@ typedef struct { ...@@ -44,7 +44,6 @@ typedef struct {
int sessions; // number of sessions allowed int sessions; // number of sessions allowed
int numOfThreads; // number of threads to process incoming messages int numOfThreads; // number of threads to process incoming messages
int idleTime; // milliseconds; int idleTime; // milliseconds;
char localIp[TSDB_IPv4ADDR_LEN];
uint16_t localPort; uint16_t localPort;
int8_t connType; int8_t connType;
int index; // for UDP server only, round robin for multiple threads int index; // for UDP server only, round robin for multiple threads
...@@ -101,9 +100,8 @@ typedef struct SRpcConn { ...@@ -101,9 +100,8 @@ typedef struct SRpcConn {
uint16_t localPort; // for UDP only uint16_t localPort; // for UDP only
uint32_t linkUid; // connection unique ID assigned by client uint32_t linkUid; // connection unique ID assigned by client
uint32_t peerIp; // peer IP uint32_t peerIp; // peer IP
uint32_t destIp; // server destination IP to handle NAT
uint16_t peerPort; // peer port uint16_t peerPort; // peer port
char peerIpstr[TSDB_IPv4ADDR_LEN]; // peer IP string char peerFqdn[TSDB_FQDN_LEN]; // peer FQDN or ip string
uint16_t tranId; // outgoing transcation ID, for build message uint16_t tranId; // outgoing transcation ID, for build message
uint16_t outTranId; // outgoing transcation ID uint16_t outTranId; // outgoing transcation ID
uint16_t inTranId; // transcation ID for incoming msg uint16_t inTranId; // transcation ID for incoming msg
...@@ -140,7 +138,7 @@ int tsRpcOverhead; ...@@ -140,7 +138,7 @@ int tsRpcOverhead;
#define RPC_CONN_TCPC 3 #define RPC_CONN_TCPC 3
#define RPC_CONN_TCP 2 #define RPC_CONN_TCP 2
void *(*taosInitConn[])(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = { void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = {
taosInitUdpConnection, taosInitUdpConnection,
taosInitUdpConnection, taosInitUdpConnection,
taosInitTcpServer, taosInitTcpServer,
...@@ -161,7 +159,7 @@ int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *cha ...@@ -161,7 +159,7 @@ int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *cha
taosSendTcpData taosSendTcpData
}; };
void *(*taosOpenConn[])(void *shandle, void *thandle, char *ip, uint16_t port) = { void *(*taosOpenConn[])(void *shandle, void *thandle, uint32_t ip, uint16_t port) = {
taosOpenUdpConnection, taosOpenUdpConnection,
taosOpenUdpConnection, taosOpenUdpConnection,
NULL, NULL,
...@@ -175,7 +173,7 @@ void (*taosCloseConn[])(void *chandle) = { ...@@ -175,7 +173,7 @@ void (*taosCloseConn[])(void *chandle) = {
taosCloseTcpConnection taosCloseTcpConnection
}; };
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort, int8_t connType); static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType);
static void rpcCloseConn(void *thandle); static void rpcCloseConn(void *thandle);
static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext); static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext);
static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc);
...@@ -217,7 +215,6 @@ void *rpcOpen(const SRpcInit *pInit) { ...@@ -217,7 +215,6 @@ void *rpcOpen(const SRpcInit *pInit) {
pRpc->connType = pInit->connType; pRpc->connType = pInit->connType;
pRpc->idleTime = pInit->idleTime; pRpc->idleTime = pInit->idleTime;
pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads;
if (pInit->localIp) strcpy(pRpc->localIp, pInit->localIp);
pRpc->localPort = pInit->localPort; pRpc->localPort = pInit->localPort;
pRpc->afp = pInit->afp; pRpc->afp = pInit->afp;
pRpc->sessions = pInit->sessions; pRpc->sessions = pInit->sessions;
...@@ -229,13 +226,13 @@ void *rpcOpen(const SRpcInit *pInit) { ...@@ -229,13 +226,13 @@ void *rpcOpen(const SRpcInit *pInit) {
pRpc->cfp = pInit->cfp; pRpc->cfp = pInit->cfp;
pRpc->afp = pInit->afp; pRpc->afp = pInit->afp;
pRpc->tcphandle = (*taosInitConn[pRpc->connType|RPC_CONN_TCP])(pRpc->localIp, pRpc->localPort, pRpc->label, pRpc->tcphandle = (*taosInitConn[pRpc->connType|RPC_CONN_TCP])(0, pRpc->localPort, pRpc->label,
pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc);
pRpc->udphandle = (*taosInitConn[pRpc->connType])(pRpc->localIp, pRpc->localPort, pRpc->label, pRpc->udphandle = (*taosInitConn[pRpc->connType])(0, pRpc->localPort, pRpc->label,
pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc);
if (pRpc->tcphandle == NULL || pRpc->udphandle == NULL) { if (pRpc->tcphandle == NULL || pRpc->udphandle == NULL) {
tError("%s failed to init network, %s:%d", pRpc->label, pRpc->localIp, pRpc->localPort); tError("%s failed to init network, port:%d", pRpc->label, pRpc->localPort);
rpcClose(pRpc); rpcClose(pRpc);
return NULL; return NULL;
} }
...@@ -457,7 +454,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { ...@@ -457,7 +454,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
pInfo->clientIp = pConn->peerIp; pInfo->clientIp = pConn->peerIp;
pInfo->clientPort = pConn->peerPort; pInfo->clientPort = pConn->peerPort;
pInfo->serverIp = pConn->destIp; // pInfo->serverIp = pConn->destIp;
strcpy(pInfo->user, pConn->user); strcpy(pInfo->user, pConn->user);
return 0; return 0;
...@@ -490,27 +487,32 @@ static void rpcFreeMsg(void *msg) { ...@@ -490,27 +487,32 @@ static void rpcFreeMsg(void *msg) {
} }
} }
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort, int8_t connType) { static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType) {
SRpcConn *pConn; SRpcConn *pConn;
uint32_t peerIp = taosGetIpFromFqdn(peerFqdn);
if (peerIp == -1) {
tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn);
return NULL;
}
pConn = rpcAllocateClientConn(pRpc); pConn = rpcAllocateClientConn(pRpc);
if (pConn) { if (pConn) {
strcpy(pConn->peerIpstr, peerIpStr); strcpy(pConn->peerFqdn, peerFqdn);
pConn->peerIp = inet_addr(peerIpStr); pConn->peerIp = peerIp;
pConn->peerPort = peerPort; pConn->peerPort = peerPort;
strcpy(pConn->user, pRpc->user); strcpy(pConn->user, pRpc->user);
pConn->connType = connType; pConn->connType = connType;
if (taosOpenConn[connType]) { if (taosOpenConn[connType]) {
void *shandle = (connType & RPC_CONN_TCP)? pRpc->tcphandle:pRpc->udphandle; void *shandle = (connType & RPC_CONN_TCP)? pRpc->tcphandle:pRpc->udphandle;
pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIpstr, pConn->peerPort); pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort);
if (pConn->chandle) { if (pConn->chandle) {
tTrace("%s %p, rpc connection is set up, sid:%d id:%s ip:%s:%hu connType:%d", pRpc->label, tTrace("%s %p, rpc connection is set up, sid:%d id:%s %s:%hu connType:%d", pRpc->label,
pConn, pConn->sid, pRpc->user, pConn->peerIpstr, pConn->peerPort, pConn->connType); pConn, pConn->sid, pRpc->user, peerFqdn, pConn->peerPort, pConn->connType);
} else { } else {
tError("%s %p, failed to set up connection to ip:%s:%hu", pRpc->label, pConn, tError("%s %p, failed to set up connection to %s:%hu", pRpc->label, pConn, peerFqdn, pConn->peerPort);
pConn->peerIpstr, pConn->peerPort);
terrno = TSDB_CODE_NETWORK_UNAVAIL; terrno = TSDB_CODE_NETWORK_UNAVAIL;
rpcCloseConn(pConn); rpcCloseConn(pConn);
pConn = NULL; pConn = NULL;
...@@ -661,12 +663,9 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { ...@@ -661,12 +663,9 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
SRpcInfo *pRpc = pContext->pRpc; SRpcInfo *pRpc = pContext->pRpc;
SRpcIpSet *pIpSet = &pContext->ipSet; SRpcIpSet *pIpSet = &pContext->ipSet;
pConn = rpcGetConnFromCache(pRpc->pCache, pIpSet->ip[pIpSet->inUse], pIpSet->port, pContext->connType); pConn = rpcGetConnFromCache(pRpc->pCache, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType);
if ( pConn == NULL || pConn->user[0] == 0) { if ( pConn == NULL || pConn->user[0] == 0) {
char ipstr[20] = {0}; pConn = rpcOpenConn(pRpc, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType);
tinet_ntoa(ipstr, pIpSet->ip[pIpSet->inUse]);
pConn = rpcOpenConn(pRpc, ipstr, pIpSet->port, pContext->connType);
if (pConn) pConn->destIp = pIpSet->ip[pIpSet->inUse];
} else { } else {
tTrace("%s %p, connection is retrieved from cache", pRpc->label, pConn); tTrace("%s %p, connection is retrieved from cache", pRpc->label, pConn);
} }
...@@ -789,7 +788,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -789,7 +788,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
pConn->peerIp = pRecv->ip; pConn->peerIp = pRecv->ip;
char ipstr[20] = {0}; char ipstr[20] = {0};
tinet_ntoa(ipstr, pRecv->ip); tinet_ntoa(ipstr, pRecv->ip);
strcpy(pConn->peerIpstr, ipstr); strcpy(pConn->peerFqdn, ipstr);
} }
if (pRecv->port) pConn->peerPort = pRecv->port; if (pRecv->port) pConn->peerPort = pRecv->port;
...@@ -922,7 +921,6 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -922,7 +921,6 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
if ( rpcIsReq(pHead->msgType) ) { if ( rpcIsReq(pHead->msgType) ) {
rpcMsg.handle = pConn; rpcMsg.handle = pConn;
pConn->destIp = pHead->destIp;
taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer);
(*(pRpc->cfp))(&rpcMsg); (*(pRpc->cfp))(&rpcMsg);
} else { } else {
...@@ -932,7 +930,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -932,7 +930,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
pConn->pContext = NULL; pConn->pContext = NULL;
// for UDP, port may be changed by server, the port in ipSet shall be used for cache // for UDP, port may be changed by server, the port in ipSet shall be used for cache
rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pContext->ipSet.port, pConn->connType); rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->ipSet.port[pContext->ipSet.inUse], pConn->connType);
if (pHead->code == TSDB_CODE_REDIRECT) { if (pHead->code == TSDB_CODE_REDIRECT) {
pContext->redirect = 1; pContext->redirect = 1;
...@@ -1053,7 +1051,6 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -1053,7 +1051,6 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
pHead->tranId = pConn->tranId; pHead->tranId = pConn->tranId;
pHead->sourceId = pConn->ownId; pHead->sourceId = pConn->ownId;
pHead->destId = pConn->peerId; pHead->destId = pConn->peerId;
pHead->destIp = pConn->destIp;
pHead->port = 0; pHead->port = 0;
pHead->linkUid = pConn->linkUid; pHead->linkUid = pConn->linkUid;
if (!pConn->secured) memcpy(pHead->user, pConn->user, tListLen(pHead->user)); if (!pConn->secured) memcpy(pHead->user, pConn->user, tListLen(pHead->user));
...@@ -1081,12 +1078,12 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { ...@@ -1081,12 +1078,12 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
if ( rpcIsReq(pHead->msgType)) { if ( rpcIsReq(pHead->msgType)) {
if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16))
tTrace("%s %p, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", tTrace("%s %p, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d",
pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerFqdn,
pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
} else { } else {
if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16))
tTrace( "%s %p, %s is sent to %s:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", tTrace( "%s %p, %s is sent to %s:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d",
pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort, pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerFqdn, pConn->peerPort,
htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId); htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
} }
...@@ -1141,13 +1138,13 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { ...@@ -1141,13 +1138,13 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
if (pConn->retry < 4) { if (pConn->retry < 4) {
tTrace("%s %p, re-send msg:%s to %s:%hud", pRpc->label, pConn, tTrace("%s %p, re-send msg:%s to %s:%hud", pRpc->label, pConn,
taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort); taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort);
rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
} else { } else {
// close the connection // close the connection
tTrace("%s %p, failed to send msg:%s to %s:%hu", pRpc->label, pConn, tTrace("%s %p, failed to send msg:%s to %s:%hu", pRpc->label, pConn,
taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort); taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort);
reportDisc = 1; reportDisc = 1;
} }
} else { } else {
......
...@@ -40,7 +40,7 @@ typedef struct SThreadObj { ...@@ -40,7 +40,7 @@ typedef struct SThreadObj {
SFdObj * pHead; SFdObj * pHead;
pthread_mutex_t mutex; pthread_mutex_t mutex;
pthread_cond_t fdReady; pthread_cond_t fdReady;
char ipstr[TSDB_IPv4ADDR_LEN]; uint32_t ip;
int pollFd; int pollFd;
int numOfFds; int numOfFds;
int threadId; int threadId;
...@@ -50,7 +50,7 @@ typedef struct SThreadObj { ...@@ -50,7 +50,7 @@ typedef struct SThreadObj {
} SThreadObj; } SThreadObj;
typedef struct { typedef struct {
char ip[TSDB_IPv4ADDR_LEN]; uint32_t ip;
uint16_t port; uint16_t port;
char label[12]; char label[12];
int numOfThreads; int numOfThreads;
...@@ -65,12 +65,12 @@ static void taosFreeFdObj(SFdObj *pFdObj); ...@@ -65,12 +65,12 @@ static void taosFreeFdObj(SFdObj *pFdObj);
static void taosReportBrokenLink(SFdObj *pFdObj); static void taosReportBrokenLink(SFdObj *pFdObj);
static void taosAcceptTcpConnection(void *arg); static void taosAcceptTcpConnection(void *arg);
void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) { void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
SServerObj *pServerObj; SServerObj *pServerObj;
SThreadObj *pThreadObj; SThreadObj *pThreadObj;
pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1); pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1);
strcpy(pServerObj->ip, ip); pServerObj->ip = ip;
pServerObj->port = port; pServerObj->port = port;
strcpy(pServerObj->label, label); strcpy(pServerObj->label, label);
pServerObj->numOfThreads = numOfThreads; pServerObj->numOfThreads = numOfThreads;
...@@ -138,7 +138,7 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, ...@@ -138,7 +138,7 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
free(pServerObj); free(pServerObj);
pServerObj = NULL; pServerObj = NULL;
} else { } else {
tTrace("%s TCP server is initialized, ip:%s port:%hu numOfThreads:%d", label, ip, port, numOfThreads); tTrace("%s TCP server is initialized, ip:0x%x port:%hu numOfThreads:%d", label, ip, port, numOfThreads);
} }
return (void *)pServerObj; return (void *)pServerObj;
...@@ -222,14 +222,14 @@ static void taosAcceptTcpConnection(void *arg) { ...@@ -222,14 +222,14 @@ static void taosAcceptTcpConnection(void *arg) {
} }
} }
void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle) { void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle) {
SThreadObj *pThreadObj; SThreadObj *pThreadObj;
pthread_attr_t thattr; pthread_attr_t thattr;
pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj)); pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj));
memset(pThreadObj, 0, sizeof(SThreadObj)); memset(pThreadObj, 0, sizeof(SThreadObj));
strcpy(pThreadObj->label, label); strcpy(pThreadObj->label, label);
strcpy(pThreadObj->ipstr, ip); pThreadObj->ip = ip;
pThreadObj->shandle = shandle; pThreadObj->shandle = shandle;
if (pthread_mutex_init(&(pThreadObj->mutex), NULL) < 0) { if (pthread_mutex_init(&(pThreadObj->mutex), NULL) < 0) {
...@@ -284,21 +284,19 @@ void taosCleanUpTcpClient(void *chandle) { ...@@ -284,21 +284,19 @@ void taosCleanUpTcpClient(void *chandle) {
tfree(pThreadObj); tfree(pThreadObj);
} }
void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port) { void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
SThreadObj * pThreadObj = shandle; SThreadObj * pThreadObj = shandle;
struct in_addr destIp;
int fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ipstr); int fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip);
if (fd <= 0) return NULL; if (fd <= 0) return NULL;
inet_aton(ip, &destIp);
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd); SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd);
if (pFdObj) { if (pFdObj) {
pFdObj->thandle = thandle; pFdObj->thandle = thandle;
pFdObj->port = port; pFdObj->port = port;
pFdObj->ip = destIp.s_addr; pFdObj->ip = ip;
tTrace("%s %p, TCP connection to %s:%hu is created, FD:%p numOfFds:%d", tTrace("%s %p, TCP connection to 0x%x:%hu is created, FD:%p numOfFds:%d",
pThreadObj->label, thandle, ip, port, pFdObj, pThreadObj->numOfFds); pThreadObj->label, thandle, ip, port, pFdObj, pThreadObj->numOfFds);
} else { } else {
close(fd); close(fd);
...@@ -403,7 +401,7 @@ static void *taosProcessTcpData(void *param) { ...@@ -403,7 +401,7 @@ static void *taosProcessTcpData(void *param) {
continue; continue;
} }
// tTrace("%s TCP data is received, ip:%s:%u len:%d", pThreadObj->label, pFdObj->ipstr, pFdObj->port, msgLen); // tTrace("%s TCP data is received, ip:0x%x:%u len:%d", pThreadObj->label, pFdObj->ip, pFdObj->port, msgLen);
memcpy(msg, &rpcHead, sizeof(SRpcHead)); memcpy(msg, &rpcHead, sizeof(SRpcHead));
recvInfo.msg = msg; recvInfo.msg = msg;
......
...@@ -51,7 +51,7 @@ typedef struct { ...@@ -51,7 +51,7 @@ typedef struct {
typedef struct { typedef struct {
int index; int index;
int server; int server;
char ip[16]; // local IP uint32_t ip; // local IP
uint16_t port; // local Port uint16_t port; // local Port
void *shandle; // handle passed by upper layer during server initialization void *shandle; // handle passed by upper layer during server initialization
int threads; int threads;
...@@ -77,7 +77,7 @@ static void *taosRecvUdpData(void *param); ...@@ -77,7 +77,7 @@ static void *taosRecvUdpData(void *param);
static SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port); static SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port);
static void taosProcessUdpBufTimer(void *param, void *tmrId); static void taosProcessUdpBufTimer(void *param, void *tmrId);
void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) { void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) {
SUdpConn *pConn; SUdpConn *pConn;
SUdpConnSet *pSet; SUdpConnSet *pSet;
...@@ -89,7 +89,7 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v ...@@ -89,7 +89,7 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
} }
memset(pSet, 0, (size_t)size); memset(pSet, 0, (size_t)size);
strcpy(pSet->ip, ip); pSet->ip = ip;
pSet->port = port; pSet->port = port;
pSet->shandle = shandle; pSet->shandle = shandle;
pSet->fp = fp; pSet->fp = fp;
...@@ -111,7 +111,7 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v ...@@ -111,7 +111,7 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
ownPort = (port ? port + i : 0); ownPort = (port ? port + i : 0);
pConn->fd = taosOpenUdpSocket(ip, ownPort); pConn->fd = taosOpenUdpSocket(ip, ownPort);
if (pConn->fd < 0) { if (pConn->fd < 0) {
tError("%s failed to open UDP socket %s:%hu", label, ip, port); tError("%s failed to open UDP socket %x:%hu", label, ip, port);
taosCleanUpUdpConnection(pSet); taosCleanUpUdpConnection(pSet);
return NULL; return NULL;
} }
...@@ -157,7 +157,7 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v ...@@ -157,7 +157,7 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
++pSet->threads; ++pSet->threads;
} }
tTrace("%s UDP connection is initialized, ip:%s port:%hu threads:%d", label, ip, port, threads); tTrace("%s UDP connection is initialized, ip:%x port:%hu threads:%d", label, ip, port, threads);
return pSet; return pSet;
} }
...@@ -190,7 +190,7 @@ void taosCleanUpUdpConnection(void *handle) { ...@@ -190,7 +190,7 @@ void taosCleanUpUdpConnection(void *handle) {
tfree(pSet); tfree(pSet);
} }
void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t port) { void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
SUdpConnSet *pSet = (SUdpConnSet *)shandle; SUdpConnSet *pSet = (SUdpConnSet *)shandle;
pSet->index = (pSet->index + 1) % pSet->threads; pSet->index = (pSet->index + 1) % pSet->threads;
...@@ -198,7 +198,7 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t por ...@@ -198,7 +198,7 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t por
SUdpConn *pConn = pSet->udpConn + pSet->index; SUdpConn *pConn = pSet->udpConn + pSet->index;
pConn->port = port; pConn->port = port;
tTrace("%s UDP connection is setup, ip: %s:%hu, local: %s:%d", pConn->label, ip, port, pSet->ip, tTrace("%s UDP connection is setup, ip:%x:%hu, local:%x:%d", pConn->label, ip, port, pSet->ip,
ntohs((uint16_t)pConn->localPort)); ntohs((uint16_t)pConn->localPort));
return pConn; return pConn;
......
...@@ -88,13 +88,13 @@ int main(int argc, char *argv[]) { ...@@ -88,13 +88,13 @@ int main(int argc, char *argv[]) {
// server info // server info
ipSet.numOfIps = 1; ipSet.numOfIps = 1;
ipSet.inUse = 0; ipSet.inUse = 0;
ipSet.port = 7000; ipSet.port[0] = 7000;
ipSet.ip[0] = inet_addr(serverIp); ipSet.port[1] = 7000;
ipSet.ip[1] = inet_addr("192.168.0.1"); strcpy(ipSet.fqdn[0], serverIp);
strcpy(ipSet.fqdn[1], "192.168.0.1");
// client info // client info
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = "0.0.0.0";
rpcInit.localPort = 0; rpcInit.localPort = 0;
rpcInit.label = "APP"; rpcInit.label = "APP";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
...@@ -110,11 +110,9 @@ int main(int argc, char *argv[]) { ...@@ -110,11 +110,9 @@ int main(int argc, char *argv[]) {
for (int i=1; i<argc; ++i) { for (int i=1; i<argc; ++i) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) { if (strcmp(argv[i], "-p")==0 && i < argc-1) {
ipSet.port = atoi(argv[++i]); ipSet.port[0] = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") ==0 && i < argc-1) { } else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
ipSet.ip[0] = inet_addr(argv[++i]); strcpy(ipSet.fqdn[0], argv[++i]);
} else if (strcmp(argv[i], "-l")==0 && i < argc-1) {
strcpy(rpcInit.localIp, argv[++i]);
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) { } else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
rpcInit.numOfThreads = atoi(argv[++i]); rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) { } else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
...@@ -138,10 +136,9 @@ int main(int argc, char *argv[]) { ...@@ -138,10 +136,9 @@ int main(int argc, char *argv[]) {
} else { } else {
printf("\nusage: %s [options] \n", argv[0]); printf("\nusage: %s [options] \n", argv[0]);
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp); printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
printf(" [-p port]: server port number, default is:%d\n", ipSet.port); printf(" [-p port]: server port number, default is:%d\n", ipSet.port[0]);
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads); printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions); printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
printf(" [-l localIp]: local IP address, default is:%s\n", rpcInit.localIp);
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize); printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
printf(" [-a threads]: number of app threads, default is:%d\n", appThreads); printf(" [-a threads]: number of app threads, default is:%d\n", appThreads);
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs); printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
......
...@@ -89,13 +89,14 @@ int main(int argc, char *argv[]) { ...@@ -89,13 +89,14 @@ int main(int argc, char *argv[]) {
// server info // server info
ipSet.numOfIps = 1; ipSet.numOfIps = 1;
ipSet.inUse = 0; ipSet.inUse = 0;
ipSet.port = 7000; ipSet.port[0] = 7000;
ipSet.ip[0] = inet_addr(serverIp); ipSet.port[1] = 7000;
ipSet.ip[1] = inet_addr("192.168.0.1"); strcpy(ipSet.fqdn[0], serverIp);
strcpy(ipSet.fqdn[1], "192.168.0.1");
// client info // client info
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = "0.0.0.0"; //rpcInit.localIp = "0.0.0.0";
rpcInit.localPort = 0; rpcInit.localPort = 0;
rpcInit.label = "APP"; rpcInit.label = "APP";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
...@@ -111,11 +112,9 @@ int main(int argc, char *argv[]) { ...@@ -111,11 +112,9 @@ int main(int argc, char *argv[]) {
for (int i=1; i<argc; ++i) { for (int i=1; i<argc; ++i) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) { if (strcmp(argv[i], "-p")==0 && i < argc-1) {
ipSet.port = atoi(argv[++i]); ipSet.port[0] = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") ==0 && i < argc-1) { } else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
ipSet.ip[0] = inet_addr(argv[++i]); strcpy(ipSet.fqdn[0], argv[++i]);
} else if (strcmp(argv[i], "-l")==0 && i < argc-1) {
strcpy(rpcInit.localIp, argv[++i]);
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) { } else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
rpcInit.numOfThreads = atoi(argv[++i]); rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) { } else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
...@@ -139,10 +138,9 @@ int main(int argc, char *argv[]) { ...@@ -139,10 +138,9 @@ int main(int argc, char *argv[]) {
} else { } else {
printf("\nusage: %s [options] \n", argv[0]); printf("\nusage: %s [options] \n", argv[0]);
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp); printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
printf(" [-p port]: server port number, default is:%d\n", ipSet.port); printf(" [-p port]: server port number, default is:%d\n", ipSet.port[0]);
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads); printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions); printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
printf(" [-l localIp]: local IP address, default is:%s\n", rpcInit.localIp);
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize); printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
printf(" [-a threads]: number of app threads, default is:%d\n", appThreads); printf(" [-a threads]: number of app threads, default is:%d\n", appThreads);
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs); printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
......
...@@ -126,10 +126,8 @@ void processRequestMsg(SRpcMsg *pMsg) { ...@@ -126,10 +126,8 @@ void processRequestMsg(SRpcMsg *pMsg) {
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
SRpcInit rpcInit; SRpcInit rpcInit;
char dataName[20] = "server.data"; char dataName[20] = "server.data";
char localIp[40] = "0.0.0.0";
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = localIp;
rpcInit.localPort = 7000; rpcInit.localPort = 7000;
rpcInit.label = "SER"; rpcInit.label = "SER";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
...@@ -141,8 +139,6 @@ int main(int argc, char *argv[]) { ...@@ -141,8 +139,6 @@ int main(int argc, char *argv[]) {
for (int i=1; i<argc; ++i) { for (int i=1; i<argc; ++i) {
if (strcmp(argv[i], "-p")==0 && i < argc-1) { if (strcmp(argv[i], "-p")==0 && i < argc-1) {
rpcInit.localPort = atoi(argv[++i]); rpcInit.localPort = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i")==0 && i < argc-1) {
strcpy(rpcInit.localIp, argv[++i]);
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) { } else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
rpcInit.numOfThreads = atoi(argv[++i]); rpcInit.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) { } else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
...@@ -159,7 +155,6 @@ int main(int argc, char *argv[]) { ...@@ -159,7 +155,6 @@ int main(int argc, char *argv[]) {
uDebugFlag = rpcDebugFlag; uDebugFlag = rpcDebugFlag;
} else { } else {
printf("\nusage: %s [options] \n", argv[0]); printf("\nusage: %s [options] \n", argv[0]);
printf(" [-i ip]: server IP address, default is:%s\n", rpcInit.localIp);
printf(" [-p port]: server port number, default is:%d\n", rpcInit.localPort); printf(" [-p port]: server port number, default is:%d\n", rpcInit.localPort);
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads); printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
printf(" [-s sessions]: number of sessions, default is:%d\n", rpcInit.sessions); printf(" [-s sessions]: number of sessions, default is:%d\n", rpcInit.sessions);
......
...@@ -20,44 +20,27 @@ ...@@ -20,44 +20,27 @@
extern "C" { extern "C" {
#endif #endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <wchar.h>
int taosNonblockwrite(int fd, char *ptr, int nbytes);
int taosReadn(int sock, char *buffer, int len); int taosReadn(int sock, char *buffer, int len);
int taosWriteMsg(int fd, void *ptr, int nbytes); int taosWriteMsg(int fd, void *ptr, int nbytes);
int taosReadMsg(int fd, void *ptr, int nbytes); int taosReadMsg(int fd, void *ptr, int nbytes);
int taosNonblockwrite(int fd, char *ptr, int nbytes);
int taosOpenUdpSocket(char *ip, uint16_t port);
int taosOpenTcpClientSocket(char *ip, uint16_t port, char *localIp);
int taosOpenTcpServerSocket(char *ip, uint16_t port);
int taosKeepTcpAlive(int sockFd);
void taosCloseTcpSocket(int sockFd);
int taosOpenUDServerSocket(char *ip, uint16_t port);
int taosOpenUDClientSocket(char *ip, uint16_t port);
int taosOpenRawSocket(char *ip);
int taosCopyFds(int sfd, int dfd, int64_t len); int taosCopyFds(int sfd, int dfd, int64_t len);
int taosSetNonblocking(int sock, int on);
int taosGetPublicIp(char *const ip); int taosOpenUdpSocket(uint32_t localIp, uint16_t localPort);
int taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp);
int taosGetPrivateIp(char *const ip); int taosOpenTcpServerSocket(uint32_t ip, uint16_t port);
int taosKeepTcpAlive(int sockFd);
void taosCloseTcpSocket(int sockFd);
void tinet_ntoa(char *ipstr, unsigned int ip); int taosOpenUDServerSocket(uint32_t ip, uint16_t port);
int taosOpenUDClientSocket(uint32_t ip, uint16_t port);
int taosOpenRawSocket(uint32_t ip);
int taosSetNonblocking(int sock, int on); int taosGetFqdn(char *);
uint32_t taosGetIpFromFqdn(const char *);
void tinet_ntoa(char *ipstr, unsigned int ip);
uint32_t ip2uint(const char *const ip_addr);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -19,89 +19,23 @@ ...@@ -19,89 +19,23 @@
#include "tsocket.h" #include "tsocket.h"
#include "tutil.h" #include "tutil.h"
/* int taosGetFqdn(char *fqdn) {
* Function to get the public ip address of current machine. If get IP char hostname[1024];
* successfully, return 0, else, return -1. The return values is ip. hostname[1023] = '\0';
* gethostname(hostname, 1023);
* Use:
* if (taosGetPublicIp(ip) != 0) { struct hostent* h;
* perror("Fail to get public IP address\n"); h = gethostbyname(hostname);
* exit(EXIT_FAILURE); strcpy(fqdn, h->h_name);
* }
*/
int taosGetPublicIp(char *const ip) {
/* bool flag; */
int flag;
int sock;
char ** pptr = NULL;
struct sockaddr_in destAddr;
struct hostent * ptr = NULL;
char destIP[128];
char szBuffer[] = {
"GET / HTTP/1.1\nHost: ident.me\nUser-Agent: curl/7.47.0\nAccept: "
"*/*\n\n"};
char res[1024];
// Create socket
sock = (int)socket(AF_INET, SOCK_STREAM, 0);
if (sock == -1) {
return -1;
}
bzero((void *)&destAddr, sizeof(destAddr));
destAddr.sin_family = AF_INET;
destAddr.sin_port = htons(80);
ptr = gethostbyname("ident.me");
if (ptr == NULL) {
return -1;
}
// Loop to find a valid IP address
for (flag = 0, pptr = ptr->h_addr_list; NULL != *pptr; ++pptr) {
inet_ntop(ptr->h_addrtype, *pptr, destIP, sizeof(destIP));
destAddr.sin_addr.s_addr = inet_addr(destIP);
if (connect(sock, (struct sockaddr *)&destAddr, sizeof(struct sockaddr)) != -1) {
flag = 1;
break;
}
}
// Check if the host is available.
if (flag == 0) {
return -1;
}
// Check send.
if (strlen(szBuffer) != taosWriteSocket(sock, szBuffer, (size_t)strlen(szBuffer))) {
return -1;
}
// Receive response.
if (taosReadSocket(sock, res, 1024) == -1) {
return -1;
}
// Extract the IP address from the response.
int c_start = 0, c_end = 0;
for (; c_start < (int)strlen(res); c_start = c_end + 1) {
for (c_end = c_start; c_end < (int)strlen(res) && res[c_end] != '\n'; c_end++) {
}
if (c_end >= (int)strlen(res)) {
return -1;
}
if (res[c_start] >= '0' && res[c_start] <= '9') {
strncpy(ip, res + c_start, (size_t)(c_end - c_start));
ip[c_end - c_start] = '\0';
break;
}
}
return 0; return 0;
} }
uint32_t taosGetIpFromFqdn(const char *fqdn) {
struct hostent * record = gethostbyname(fqdn);
if(record == NULL) return -1;
return ((struct in_addr *)record->h_addr)->s_addr;
}
// Function converting an IP address string to an unsigned int. // Function converting an IP address string to an unsigned int.
uint32_t ip2uint(const char *const ip_addr) { uint32_t ip2uint(const char *const ip_addr) {
char ip_addr_cpy[20]; char ip_addr_cpy[20];
...@@ -259,7 +193,7 @@ int taosReadn(int fd, char *ptr, int nbytes) { ...@@ -259,7 +193,7 @@ int taosReadn(int fd, char *ptr, int nbytes) {
return (nbytes - nleft); return (nbytes - nleft);
} }
int taosOpenUdpSocket(char *ip, uint16_t port) { int taosOpenUdpSocket(uint32_t ip, uint16_t port) {
struct sockaddr_in localAddr; struct sockaddr_in localAddr;
int sockFd; int sockFd;
int ttl = 128; int ttl = 128;
...@@ -270,7 +204,7 @@ int taosOpenUdpSocket(char *ip, uint16_t port) { ...@@ -270,7 +204,7 @@ int taosOpenUdpSocket(char *ip, uint16_t port) {
memset((char *)&localAddr, 0, sizeof(localAddr)); memset((char *)&localAddr, 0, sizeof(localAddr));
localAddr.sin_family = AF_INET; localAddr.sin_family = AF_INET;
localAddr.sin_addr.s_addr = inet_addr(ip); localAddr.sin_addr.s_addr = ip;
localAddr.sin_port = (uint16_t)htons(port); localAddr.sin_port = (uint16_t)htons(port);
if ((sockFd = (int)socket(AF_INET, SOCK_DGRAM, 0)) < 0) { if ((sockFd = (int)socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
...@@ -325,13 +259,11 @@ int taosOpenUdpSocket(char *ip, uint16_t port) { ...@@ -325,13 +259,11 @@ int taosOpenUdpSocket(char *ip, uint16_t port) {
return sockFd; return sockFd;
} }
int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) { int taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) {
int sockFd = 0; int sockFd = 0;
struct sockaddr_in serverAddr, clientAddr; struct sockaddr_in serverAddr, clientAddr;
int ret; int ret;
// uTrace("open tcp client socket:%s:%d, local Ip:%s", destIp, destPort, clientIp);
sockFd = (int)socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); sockFd = (int)socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sockFd < 0) { if (sockFd < 0) {
...@@ -339,16 +271,16 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) { ...@@ -339,16 +271,16 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) {
return -1; return -1;
} }
if (clientIp && clientIp[0] && clientIp[0] != '0') { if ( clientIp != 0) {
memset((char *)&clientAddr, 0, sizeof(clientAddr)); memset((char *)&clientAddr, 0, sizeof(clientAddr));
clientAddr.sin_family = AF_INET; clientAddr.sin_family = AF_INET;
clientAddr.sin_addr.s_addr = inet_addr(clientIp); clientAddr.sin_addr.s_addr = clientIp;
clientAddr.sin_port = 0; clientAddr.sin_port = 0;
/* bind socket to client address */ /* bind socket to client address */
if (bind(sockFd, (struct sockaddr *)&clientAddr, sizeof(clientAddr)) < 0) { if (bind(sockFd, (struct sockaddr *)&clientAddr, sizeof(clientAddr)) < 0) {
uError("bind tcp client socket failed, client(%s:0), dest(%s:%d), reason:%d(%s)", uError("bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)",
clientIp, destIp, destPort, errno, strerror(errno)); clientIp, destIp, destPort, strerror(errno));
close(sockFd); close(sockFd);
return -1; return -1;
} }
...@@ -356,13 +288,13 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) { ...@@ -356,13 +288,13 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) {
memset((char *)&serverAddr, 0, sizeof(serverAddr)); memset((char *)&serverAddr, 0, sizeof(serverAddr));
serverAddr.sin_family = AF_INET; serverAddr.sin_family = AF_INET;
serverAddr.sin_addr.s_addr = inet_addr(destIp); serverAddr.sin_addr.s_addr = destIp;
serverAddr.sin_port = (uint16_t)htons((uint16_t)destPort); serverAddr.sin_port = (uint16_t)htons((uint16_t)destPort);
ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr)); ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
if (ret != 0) { if (ret != 0) {
//uError("failed to connect socket, ip:%s, port:%hu, reason: %s", destIp, destPort, strerror(errno)); //uError("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
taosCloseSocket(sockFd); taosCloseSocket(sockFd);
sockFd = -1; sockFd = -1;
} }
...@@ -420,7 +352,7 @@ int taosKeepTcpAlive(int sockFd) { ...@@ -420,7 +352,7 @@ int taosKeepTcpAlive(int sockFd) {
return 0; return 0;
} }
int taosOpenTcpServerSocket(char *ip, uint16_t port) { int taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
struct sockaddr_in serverAdd; struct sockaddr_in serverAdd;
int sockFd; int sockFd;
int reuse; int reuse;
...@@ -429,7 +361,7 @@ int taosOpenTcpServerSocket(char *ip, uint16_t port) { ...@@ -429,7 +361,7 @@ int taosOpenTcpServerSocket(char *ip, uint16_t port) {
bzero((char *)&serverAdd, sizeof(serverAdd)); bzero((char *)&serverAdd, sizeof(serverAdd));
serverAdd.sin_family = AF_INET; serverAdd.sin_family = AF_INET;
serverAdd.sin_addr.s_addr = inet_addr(ip); serverAdd.sin_addr.s_addr = ip;
serverAdd.sin_port = (uint16_t)htons(port); serverAdd.sin_port = (uint16_t)htons(port);
if ((sockFd = (int)socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { if ((sockFd = (int)socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
...@@ -447,7 +379,7 @@ int taosOpenTcpServerSocket(char *ip, uint16_t port) { ...@@ -447,7 +379,7 @@ int taosOpenTcpServerSocket(char *ip, uint16_t port) {
/* bind socket to server address */ /* bind socket to server address */
if (bind(sockFd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) { if (bind(sockFd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) {
uError("bind tcp server socket failed, %s:%hu, reason:%d(%s)", ip, port, errno, strerror(errno)); uError("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
close(sockFd); close(sockFd);
return -1; return -1;
} }
...@@ -455,14 +387,14 @@ int taosOpenTcpServerSocket(char *ip, uint16_t port) { ...@@ -455,14 +387,14 @@ int taosOpenTcpServerSocket(char *ip, uint16_t port) {
if (taosKeepTcpAlive(sockFd) < 0) return -1; if (taosKeepTcpAlive(sockFd) < 0) return -1;
if (listen(sockFd, 10) < 0) { if (listen(sockFd, 10) < 0) {
uError("listen tcp server socket failed, %s:%hu, reason:%d(%s)", ip, port, errno, strerror(errno)); uError("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
return -1; return -1;
} }
return sockFd; return sockFd;
} }
int taosOpenRawSocket(char *ip) { int taosOpenRawSocket(uint32_t ip) {
int fd, hold; int fd, hold;
struct sockaddr_in rawAdd; struct sockaddr_in rawAdd;
...@@ -483,10 +415,10 @@ int taosOpenRawSocket(char *ip) { ...@@ -483,10 +415,10 @@ int taosOpenRawSocket(char *ip) {
bzero((char *)&rawAdd, sizeof(rawAdd)); bzero((char *)&rawAdd, sizeof(rawAdd));
rawAdd.sin_family = AF_INET; rawAdd.sin_family = AF_INET;
rawAdd.sin_addr.s_addr = inet_addr(ip); rawAdd.sin_addr.s_addr = ip;
if (bind(fd, (struct sockaddr *)&rawAdd, sizeof(rawAdd)) < 0) { if (bind(fd, (struct sockaddr *)&rawAdd, sizeof(rawAdd)) < 0) {
uError("failed to bind RAW socket: %d (%s)", errno, strerror(errno)); uError("failed to bind RAW socket:(%s)", strerror(errno));
close(fd); close(fd);
return -1; return -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册