From 4e3f4f8c868a64c3603e271e5eb46f36bc42be75 Mon Sep 17 00:00:00 2001 From: jtao1735 Date: Mon, 27 Apr 2020 01:34:16 +0000 Subject: [PATCH] some changes --- src/inc/taosdef.h | 3 +- src/inc/taosmsg.h | 2 +- src/inc/trpc.h | 5 +- src/inc/tsync.h | 7 ++- src/rpc/inc/rpcCache.h | 4 +- src/rpc/inc/rpcTcp.h | 6 +- src/rpc/inc/rpcUdp.h | 4 +- src/rpc/src/rpcCache.c | 32 +++++----- src/rpc/src/rpcMain.c | 61 +++++++++---------- src/rpc/src/rpcTcp.c | 26 ++++---- src/rpc/src/rpcUdp.c | 14 ++--- src/rpc/test/rclient.c | 17 +++--- src/rpc/test/rsclient.c | 18 +++--- src/rpc/test/rserver.c | 5 -- src/util/inc/tsocket.h | 45 +++++--------- src/util/src/tsocket.c | 128 ++++++++++------------------------------ 16 files changed, 140 insertions(+), 237 deletions(-) diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index b46986d750..7ce968553b 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -195,11 +195,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_CITY_LEN 20 #define TSDB_STATE_LEN 20 #define TSDB_COUNTRY_LEN 20 -#define TSDB_VNODES_SUPPORT 6 -#define TSDB_MGMT_SUPPORT 4 #define TSDB_LOCALE_LEN 64 #define TSDB_TIMEZONE_LEN 64 +#define TSDB_FQDN_LEN 64 #define TSDB_IPv4ADDR_LEN 16 #define TSDB_FILENAME_LEN 128 #define TSDB_METER_VNODE_BITS 20 diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index d821f3117b..ed59773238 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -187,7 +187,7 @@ extern char *taosMsg[]; #pragma pack(push, 1) typedef struct { - uint32_t ip; + char fqdn[TSDB_FQDN_LEN]; uint16_t port; } SIpAddr; diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 5845823b38..1bee184812 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -31,8 +31,8 @@ extern int tsRpcHeadSize; typedef struct { int8_t inUse; int8_t numOfIps; - uint16_t port; - uint32_t ip[TSDB_MAX_MPEERS]; + uint16_t port[TSDB_MAX_MPEERS]; + char fqdn[TSDB_MAX_MPEERS][TSDB_FQDN_LEN]; } SRpcIpSet; typedef struct { @@ -51,7 +51,6 @@ typedef struct { } SRpcMsg; typedef struct { - char *localIp; // local IP used uint16_t localPort; // local port char *label; // for debug purpose int numOfThreads; // number of threads to handle connections diff --git a/src/inc/tsync.h b/src/inc/tsync.h index c7a05c0a2d..4b766664f3 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -38,14 +38,15 @@ typedef enum _TAOS_SYNC_STATUS { typedef struct { uint32_t nodeId; // node ID assigned by TDengine - uint32_t nodeIp; // node IP address - char name[TSDB_FILENAME_LEN]; // external node name + uint16_t nodePort; // node sync Port + char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN } SNodeInfo; typedef struct { - uint32_t arbitratorIp; // arbitrator IP address int8_t quorum; // number of confirms required, >=1 int8_t replica; // number of replications, >=1 + uint16_t arbitratorPort; // arbitrator port + char arbitratorFqdn[TSDB_FQDN_LEN]; // arbitrator IP address SNodeInfo nodeInfo[TAOS_SYNC_MAX_REPLICA]; } SSyncCfg; diff --git a/src/rpc/inc/rpcCache.h b/src/rpc/inc/rpcCache.h index 2a386c066e..3a996aab7c 100644 --- a/src/rpc/inc/rpcCache.h +++ b/src/rpc/inc/rpcCache.h @@ -22,8 +22,8 @@ extern "C" { void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer); void rpcCloseConnCache(void *handle); -void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, int8_t connType); -void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, int8_t connType); +void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, int8_t connType); +void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connType); #ifdef __cplusplus } diff --git a/src/rpc/inc/rpcTcp.h b/src/rpc/inc/rpcTcp.h index 16972dbc7e..40fab00056 100644 --- a/src/rpc/inc/rpcTcp.h +++ b/src/rpc/inc/rpcTcp.h @@ -20,12 +20,12 @@ extern "C" { #endif -void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); +void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); void taosCleanUpTcpServer(void *param); -void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle); +void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle); void taosCleanUpTcpClient(void *chandle); -void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port); +void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port); void taosCloseTcpConnection(void *chandle); int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle); diff --git a/src/rpc/inc/rpcUdp.h b/src/rpc/inc/rpcUdp.h index a84f7c4a49..fd60f4a089 100644 --- a/src/rpc/inc/rpcUdp.h +++ b/src/rpc/inc/rpcUdp.h @@ -22,10 +22,10 @@ extern "C" { #include "taosdef.h" -void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int, void *fp, void *shandle); +void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int, void *fp, void *shandle); void taosCleanUpUdpConnection(void *handle); int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle); -void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t port); +void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port); void taosFreeMsgHdr(void *hdr); int taosMsgHdrSize(void *hdr); diff --git a/src/rpc/src/rpcCache.c b/src/rpc/src/rpcCache.c index 48d68c7511..ab9e679acf 100644 --- a/src/rpc/src/rpcCache.c +++ b/src/rpc/src/rpcCache.c @@ -23,7 +23,7 @@ #include "rpcCache.h" typedef struct SConnHash { - uint32_t ip; + char fqdn[TSDB_FQDN_LEN]; uint16_t port; char connType; struct SConnHash *prev; @@ -46,7 +46,7 @@ typedef struct { int64_t *lockedBy; } SConnCache; -static int rpcHashConn(void *handle, uint32_t ip, uint16_t port, int8_t connType); +static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType); static void rpcLockCache(int64_t *lockedBy); static void rpcUnlockCache(int64_t *lockedBy); static void rpcCleanConnCache(void *handle, void *tmrId); @@ -114,7 +114,7 @@ void rpcCloseConnCache(void *handle) { free(pCache); } -void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, int8_t connType) { +void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, int8_t connType) { int hash; SConnHash * pNode; SConnCache *pCache; @@ -125,9 +125,9 @@ void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, i assert(pCache); assert(data); - hash = rpcHashConn(pCache, ip, port, connType); + hash = rpcHashConn(pCache, fqdn, port, connType); pNode = (SConnHash *)taosMemPoolMalloc(pCache->connHashMemPool); - pNode->ip = ip; + strcpy(pNode->fqdn, fqdn); pNode->port = port; pNode->connType = connType; pNode->data = data; @@ -147,12 +147,12 @@ void rpcAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, i pCache->total++; - tTrace("%p ip:0x%x:%hu:%d:%d:%p added into cache, connections:%d", data, ip, port, connType, hash, pNode, pCache->count[hash]); + tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode, pCache->count[hash]); return; } -void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, int8_t connType) { +void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connType) { int hash; SConnHash * pNode; SConnCache *pCache; @@ -163,7 +163,7 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, int8_t connT uint64_t time = taosGetTimestampMs(); - hash = rpcHashConn(pCache, ip, port, connType); + hash = rpcHashConn(pCache, fqdn, port, connType); rpcLockCache(pCache->lockedBy+hash); pNode = pCache->connHashList[hash]; @@ -174,7 +174,7 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, int8_t connT break; } - if (pNode->ip == ip && pNode->port == port && pNode->connType == connType) break; + if (strcmp(pNode->fqdn, fqdn) == 0 && pNode->port == port && pNode->connType == connType) break; pNode = pNode->next; } @@ -201,7 +201,7 @@ void *rpcGetConnFromCache(void *handle, uint32_t ip, uint16_t port, int8_t connT rpcUnlockCache(pCache->lockedBy+hash); if (pData) { - tTrace("%p ip:0x%x:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, ip, port, connType, hash, pNode, pCache->count[hash]); + tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode, pCache->count[hash]); } return pData; @@ -239,7 +239,7 @@ static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash pNext = pNode->next; pCache->total--; pCache->count[hash]--; - tTrace("%p ip:0x%x:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->ip, pNode->port, pNode->connType, hash, pNode, + tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port, pNode->connType, hash, pNode, pCache->count[hash]); taosMemPoolFree(pCache->connHashMemPool, (char *)pNode); pNode = pNext; @@ -251,12 +251,16 @@ static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash pCache->connHashList[hash] = NULL; } -static int rpcHashConn(void *handle, uint32_t ip, uint16_t port, int8_t connType) { +static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType) { SConnCache *pCache = (SConnCache *)handle; int hash = 0; + char *temp = fqdn; + + while (*temp) { + hash += *temp; + ++temp; + } - hash = ip >> 16; - hash += (unsigned short)(ip & 0xFFFF); hash += port; hash += connType; diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index dede169349..f616cd00b7 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -44,7 +44,6 @@ typedef struct { int sessions; // number of sessions allowed int numOfThreads; // number of threads to process incoming messages int idleTime; // milliseconds; - char localIp[TSDB_IPv4ADDR_LEN]; uint16_t localPort; int8_t connType; int index; // for UDP server only, round robin for multiple threads @@ -101,9 +100,8 @@ typedef struct SRpcConn { uint16_t localPort; // for UDP only uint32_t linkUid; // connection unique ID assigned by client uint32_t peerIp; // peer IP - uint32_t destIp; // server destination IP to handle NAT uint16_t peerPort; // peer port - char peerIpstr[TSDB_IPv4ADDR_LEN]; // peer IP string + char peerFqdn[TSDB_FQDN_LEN]; // peer FQDN or ip string uint16_t tranId; // outgoing transcation ID, for build message uint16_t outTranId; // outgoing transcation ID uint16_t inTranId; // transcation ID for incoming msg @@ -140,7 +138,7 @@ int tsRpcOverhead; #define RPC_CONN_TCPC 3 #define RPC_CONN_TCP 2 -void *(*taosInitConn[])(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = { +void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = { taosInitUdpConnection, taosInitUdpConnection, taosInitTcpServer, @@ -161,7 +159,7 @@ int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *cha taosSendTcpData }; -void *(*taosOpenConn[])(void *shandle, void *thandle, char *ip, uint16_t port) = { +void *(*taosOpenConn[])(void *shandle, void *thandle, uint32_t ip, uint16_t port) = { taosOpenUdpConnection, taosOpenUdpConnection, NULL, @@ -175,7 +173,7 @@ void (*taosCloseConn[])(void *chandle) = { taosCloseTcpConnection }; -static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort, int8_t connType); +static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType); static void rpcCloseConn(void *thandle); static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext); static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc); @@ -217,7 +215,6 @@ void *rpcOpen(const SRpcInit *pInit) { pRpc->connType = pInit->connType; pRpc->idleTime = pInit->idleTime; pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads; - if (pInit->localIp) strcpy(pRpc->localIp, pInit->localIp); pRpc->localPort = pInit->localPort; pRpc->afp = pInit->afp; pRpc->sessions = pInit->sessions; @@ -229,13 +226,13 @@ void *rpcOpen(const SRpcInit *pInit) { pRpc->cfp = pInit->cfp; pRpc->afp = pInit->afp; - pRpc->tcphandle = (*taosInitConn[pRpc->connType|RPC_CONN_TCP])(pRpc->localIp, pRpc->localPort, pRpc->label, + pRpc->tcphandle = (*taosInitConn[pRpc->connType|RPC_CONN_TCP])(0, pRpc->localPort, pRpc->label, pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); - pRpc->udphandle = (*taosInitConn[pRpc->connType])(pRpc->localIp, pRpc->localPort, pRpc->label, + pRpc->udphandle = (*taosInitConn[pRpc->connType])(0, pRpc->localPort, pRpc->label, pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); if (pRpc->tcphandle == NULL || pRpc->udphandle == NULL) { - tError("%s failed to init network, %s:%d", pRpc->label, pRpc->localIp, pRpc->localPort); + tError("%s failed to init network, port:%d", pRpc->label, pRpc->localPort); rpcClose(pRpc); return NULL; } @@ -457,7 +454,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { pInfo->clientIp = pConn->peerIp; pInfo->clientPort = pConn->peerPort; - pInfo->serverIp = pConn->destIp; + // pInfo->serverIp = pConn->destIp; strcpy(pInfo->user, pConn->user); return 0; @@ -490,27 +487,32 @@ static void rpcFreeMsg(void *msg) { } } -static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort, int8_t connType) { +static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType) { SRpcConn *pConn; + uint32_t peerIp = taosGetIpFromFqdn(peerFqdn); + if (peerIp == -1) { + tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn); + return NULL; + } + pConn = rpcAllocateClientConn(pRpc); if (pConn) { - strcpy(pConn->peerIpstr, peerIpStr); - pConn->peerIp = inet_addr(peerIpStr); + strcpy(pConn->peerFqdn, peerFqdn); + pConn->peerIp = peerIp; pConn->peerPort = peerPort; strcpy(pConn->user, pRpc->user); pConn->connType = connType; if (taosOpenConn[connType]) { void *shandle = (connType & RPC_CONN_TCP)? pRpc->tcphandle:pRpc->udphandle; - pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIpstr, pConn->peerPort); + pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort); if (pConn->chandle) { - tTrace("%s %p, rpc connection is set up, sid:%d id:%s ip:%s:%hu connType:%d", pRpc->label, - pConn, pConn->sid, pRpc->user, pConn->peerIpstr, pConn->peerPort, pConn->connType); + tTrace("%s %p, rpc connection is set up, sid:%d id:%s %s:%hu connType:%d", pRpc->label, + pConn, pConn->sid, pRpc->user, peerFqdn, pConn->peerPort, pConn->connType); } else { - tError("%s %p, failed to set up connection to ip:%s:%hu", pRpc->label, pConn, - pConn->peerIpstr, pConn->peerPort); + tError("%s %p, failed to set up connection to %s:%hu", pRpc->label, pConn, peerFqdn, pConn->peerPort); terrno = TSDB_CODE_NETWORK_UNAVAIL; rpcCloseConn(pConn); pConn = NULL; @@ -661,12 +663,9 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { SRpcInfo *pRpc = pContext->pRpc; SRpcIpSet *pIpSet = &pContext->ipSet; - pConn = rpcGetConnFromCache(pRpc->pCache, pIpSet->ip[pIpSet->inUse], pIpSet->port, pContext->connType); + pConn = rpcGetConnFromCache(pRpc->pCache, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType); if ( pConn == NULL || pConn->user[0] == 0) { - char ipstr[20] = {0}; - tinet_ntoa(ipstr, pIpSet->ip[pIpSet->inUse]); - pConn = rpcOpenConn(pRpc, ipstr, pIpSet->port, pContext->connType); - if (pConn) pConn->destIp = pIpSet->ip[pIpSet->inUse]; + pConn = rpcOpenConn(pRpc, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType); } else { tTrace("%s %p, connection is retrieved from cache", pRpc->label, pConn); } @@ -789,7 +788,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { pConn->peerIp = pRecv->ip; char ipstr[20] = {0}; tinet_ntoa(ipstr, pRecv->ip); - strcpy(pConn->peerIpstr, ipstr); + strcpy(pConn->peerFqdn, ipstr); } if (pRecv->port) pConn->peerPort = pRecv->port; @@ -922,7 +921,6 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { if ( rpcIsReq(pHead->msgType) ) { rpcMsg.handle = pConn; - pConn->destIp = pHead->destIp; taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); (*(pRpc->cfp))(&rpcMsg); } else { @@ -932,7 +930,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { pConn->pContext = NULL; // for UDP, port may be changed by server, the port in ipSet shall be used for cache - rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pContext->ipSet.port, pConn->connType); + rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->ipSet.port[pContext->ipSet.inUse], pConn->connType); if (pHead->code == TSDB_CODE_REDIRECT) { pContext->redirect = 1; @@ -1053,7 +1051,6 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { pHead->tranId = pConn->tranId; pHead->sourceId = pConn->ownId; pHead->destId = pConn->peerId; - pHead->destIp = pConn->destIp; pHead->port = 0; pHead->linkUid = pConn->linkUid; if (!pConn->secured) memcpy(pHead->user, pConn->user, tListLen(pHead->user)); @@ -1081,12 +1078,12 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { if ( rpcIsReq(pHead->msgType)) { if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) tTrace("%s %p, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", - pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, + pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerFqdn, pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } else { if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) tTrace( "%s %p, %s is sent to %s:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", - pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort, + pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerFqdn, pConn->peerPort, htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } @@ -1141,13 +1138,13 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { if (pConn->retry < 4) { tTrace("%s %p, re-send msg:%s to %s:%hud", pRpc->label, pConn, - taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort); + taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); } else { // close the connection tTrace("%s %p, failed to send msg:%s to %s:%hu", pRpc->label, pConn, - taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort); + taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); reportDisc = 1; } } else { diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index bac2ae879a..1260a34512 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -40,7 +40,7 @@ typedef struct SThreadObj { SFdObj * pHead; pthread_mutex_t mutex; pthread_cond_t fdReady; - char ipstr[TSDB_IPv4ADDR_LEN]; + uint32_t ip; int pollFd; int numOfFds; int threadId; @@ -50,7 +50,7 @@ typedef struct SThreadObj { } SThreadObj; typedef struct { - char ip[TSDB_IPv4ADDR_LEN]; + uint32_t ip; uint16_t port; char label[12]; int numOfThreads; @@ -65,12 +65,12 @@ static void taosFreeFdObj(SFdObj *pFdObj); static void taosReportBrokenLink(SFdObj *pFdObj); static void taosAcceptTcpConnection(void *arg); -void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) { +void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) { SServerObj *pServerObj; SThreadObj *pThreadObj; pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1); - strcpy(pServerObj->ip, ip); + pServerObj->ip = ip; pServerObj->port = port; strcpy(pServerObj->label, label); pServerObj->numOfThreads = numOfThreads; @@ -138,7 +138,7 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, free(pServerObj); pServerObj = NULL; } else { - tTrace("%s TCP server is initialized, ip:%s port:%hu numOfThreads:%d", label, ip, port, numOfThreads); + tTrace("%s TCP server is initialized, ip:0x%x port:%hu numOfThreads:%d", label, ip, port, numOfThreads); } return (void *)pServerObj; @@ -222,14 +222,14 @@ static void taosAcceptTcpConnection(void *arg) { } } -void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle) { +void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle) { SThreadObj *pThreadObj; pthread_attr_t thattr; pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj)); memset(pThreadObj, 0, sizeof(SThreadObj)); strcpy(pThreadObj->label, label); - strcpy(pThreadObj->ipstr, ip); + pThreadObj->ip = ip; pThreadObj->shandle = shandle; if (pthread_mutex_init(&(pThreadObj->mutex), NULL) < 0) { @@ -284,21 +284,19 @@ void taosCleanUpTcpClient(void *chandle) { tfree(pThreadObj); } -void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port) { +void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { SThreadObj * pThreadObj = shandle; - struct in_addr destIp; - int fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ipstr); + int fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip); if (fd <= 0) return NULL; - inet_aton(ip, &destIp); SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd); if (pFdObj) { pFdObj->thandle = thandle; pFdObj->port = port; - pFdObj->ip = destIp.s_addr; - tTrace("%s %p, TCP connection to %s:%hu is created, FD:%p numOfFds:%d", + pFdObj->ip = ip; + tTrace("%s %p, TCP connection to 0x%x:%hu is created, FD:%p numOfFds:%d", pThreadObj->label, thandle, ip, port, pFdObj, pThreadObj->numOfFds); } else { close(fd); @@ -403,7 +401,7 @@ static void *taosProcessTcpData(void *param) { continue; } - // tTrace("%s TCP data is received, ip:%s:%u len:%d", pThreadObj->label, pFdObj->ipstr, pFdObj->port, msgLen); + // tTrace("%s TCP data is received, ip:0x%x:%u len:%d", pThreadObj->label, pFdObj->ip, pFdObj->port, msgLen); memcpy(msg, &rpcHead, sizeof(SRpcHead)); recvInfo.msg = msg; diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 25668fcb14..4a5fc7ba27 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -51,7 +51,7 @@ typedef struct { typedef struct { int index; int server; - char ip[16]; // local IP + uint32_t ip; // local IP uint16_t port; // local Port void *shandle; // handle passed by upper layer during server initialization int threads; @@ -77,7 +77,7 @@ static void *taosRecvUdpData(void *param); static SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port); static void taosProcessUdpBufTimer(void *param, void *tmrId); -void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) { +void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) { SUdpConn *pConn; SUdpConnSet *pSet; @@ -89,7 +89,7 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v } memset(pSet, 0, (size_t)size); - strcpy(pSet->ip, ip); + pSet->ip = ip; pSet->port = port; pSet->shandle = shandle; pSet->fp = fp; @@ -111,7 +111,7 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v ownPort = (port ? port + i : 0); pConn->fd = taosOpenUdpSocket(ip, ownPort); if (pConn->fd < 0) { - tError("%s failed to open UDP socket %s:%hu", label, ip, port); + tError("%s failed to open UDP socket %x:%hu", label, ip, port); taosCleanUpUdpConnection(pSet); return NULL; } @@ -157,7 +157,7 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v ++pSet->threads; } - tTrace("%s UDP connection is initialized, ip:%s port:%hu threads:%d", label, ip, port, threads); + tTrace("%s UDP connection is initialized, ip:%x port:%hu threads:%d", label, ip, port, threads); return pSet; } @@ -190,7 +190,7 @@ void taosCleanUpUdpConnection(void *handle) { tfree(pSet); } -void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t port) { +void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { SUdpConnSet *pSet = (SUdpConnSet *)shandle; pSet->index = (pSet->index + 1) % pSet->threads; @@ -198,7 +198,7 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t por SUdpConn *pConn = pSet->udpConn + pSet->index; pConn->port = port; - tTrace("%s UDP connection is setup, ip: %s:%hu, local: %s:%d", pConn->label, ip, port, pSet->ip, + tTrace("%s UDP connection is setup, ip:%x:%hu, local:%x:%d", pConn->label, ip, port, pSet->ip, ntohs((uint16_t)pConn->localPort)); return pConn; diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c index 8fb549df8f..2aa1f0e4e9 100644 --- a/src/rpc/test/rclient.c +++ b/src/rpc/test/rclient.c @@ -88,13 +88,13 @@ int main(int argc, char *argv[]) { // server info ipSet.numOfIps = 1; ipSet.inUse = 0; - ipSet.port = 7000; - ipSet.ip[0] = inet_addr(serverIp); - ipSet.ip[1] = inet_addr("192.168.0.1"); + ipSet.port[0] = 7000; + ipSet.port[1] = 7000; + strcpy(ipSet.fqdn[0], serverIp); + strcpy(ipSet.fqdn[1], "192.168.0.1"); // client info memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localIp = "0.0.0.0"; rpcInit.localPort = 0; rpcInit.label = "APP"; rpcInit.numOfThreads = 1; @@ -110,11 +110,9 @@ int main(int argc, char *argv[]) { for (int i=1; i -#include -#include -#include - -int taosNonblockwrite(int fd, char *ptr, int nbytes); - int taosReadn(int sock, char *buffer, int len); - int taosWriteMsg(int fd, void *ptr, int nbytes); - int taosReadMsg(int fd, void *ptr, int nbytes); - -int taosOpenUdpSocket(char *ip, uint16_t port); - -int taosOpenTcpClientSocket(char *ip, uint16_t port, char *localIp); - -int taosOpenTcpServerSocket(char *ip, uint16_t port); - -int taosKeepTcpAlive(int sockFd); - -void taosCloseTcpSocket(int sockFd); - -int taosOpenUDServerSocket(char *ip, uint16_t port); - -int taosOpenUDClientSocket(char *ip, uint16_t port); - -int taosOpenRawSocket(char *ip); - +int taosNonblockwrite(int fd, char *ptr, int nbytes); int taosCopyFds(int sfd, int dfd, int64_t len); +int taosSetNonblocking(int sock, int on); -int taosGetPublicIp(char *const ip); - -int taosGetPrivateIp(char *const ip); +int taosOpenUdpSocket(uint32_t localIp, uint16_t localPort); +int taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp); +int taosOpenTcpServerSocket(uint32_t ip, uint16_t port); +int taosKeepTcpAlive(int sockFd); +void taosCloseTcpSocket(int sockFd); -void tinet_ntoa(char *ipstr, unsigned int ip); +int taosOpenUDServerSocket(uint32_t ip, uint16_t port); +int taosOpenUDClientSocket(uint32_t ip, uint16_t port); +int taosOpenRawSocket(uint32_t ip); -int taosSetNonblocking(int sock, int on); +int taosGetFqdn(char *); +uint32_t taosGetIpFromFqdn(const char *); +void tinet_ntoa(char *ipstr, unsigned int ip); +uint32_t ip2uint(const char *const ip_addr); #ifdef __cplusplus } diff --git a/src/util/src/tsocket.c b/src/util/src/tsocket.c index 4d3687e50d..9fe306921d 100644 --- a/src/util/src/tsocket.c +++ b/src/util/src/tsocket.c @@ -19,89 +19,23 @@ #include "tsocket.h" #include "tutil.h" -/* - * Function to get the public ip address of current machine. If get IP - * successfully, return 0, else, return -1. The return values is ip. - * - * Use: - * if (taosGetPublicIp(ip) != 0) { - * perror("Fail to get public IP address\n"); - * exit(EXIT_FAILURE); - * } - */ -int taosGetPublicIp(char *const ip) { - /* bool flag; */ - int flag; - int sock; - char ** pptr = NULL; - struct sockaddr_in destAddr; - struct hostent * ptr = NULL; - char destIP[128]; - char szBuffer[] = { - "GET / HTTP/1.1\nHost: ident.me\nUser-Agent: curl/7.47.0\nAccept: " - "*/*\n\n"}; - char res[1024]; - - // Create socket - sock = (int)socket(AF_INET, SOCK_STREAM, 0); - if (sock == -1) { - return -1; - } - - bzero((void *)&destAddr, sizeof(destAddr)); - destAddr.sin_family = AF_INET; - destAddr.sin_port = htons(80); - - ptr = gethostbyname("ident.me"); - if (ptr == NULL) { - return -1; - } - - // Loop to find a valid IP address - for (flag = 0, pptr = ptr->h_addr_list; NULL != *pptr; ++pptr) { - inet_ntop(ptr->h_addrtype, *pptr, destIP, sizeof(destIP)); - destAddr.sin_addr.s_addr = inet_addr(destIP); - if (connect(sock, (struct sockaddr *)&destAddr, sizeof(struct sockaddr)) != -1) { - flag = 1; - break; - } - } - - // Check if the host is available. - if (flag == 0) { - return -1; - } - - // Check send. - if (strlen(szBuffer) != taosWriteSocket(sock, szBuffer, (size_t)strlen(szBuffer))) { - return -1; - } - - // Receive response. - if (taosReadSocket(sock, res, 1024) == -1) { - return -1; - } - - // Extract the IP address from the response. - int c_start = 0, c_end = 0; - for (; c_start < (int)strlen(res); c_start = c_end + 1) { - for (c_end = c_start; c_end < (int)strlen(res) && res[c_end] != '\n'; c_end++) { - } - - if (c_end >= (int)strlen(res)) { - return -1; - } - - if (res[c_start] >= '0' && res[c_start] <= '9') { - strncpy(ip, res + c_start, (size_t)(c_end - c_start)); - ip[c_end - c_start] = '\0'; - break; - } - } - +int taosGetFqdn(char *fqdn) { + char hostname[1024]; + hostname[1023] = '\0'; + gethostname(hostname, 1023); + + struct hostent* h; + h = gethostbyname(hostname); + strcpy(fqdn, h->h_name); return 0; } +uint32_t taosGetIpFromFqdn(const char *fqdn) { + struct hostent * record = gethostbyname(fqdn); + if(record == NULL) return -1; + return ((struct in_addr *)record->h_addr)->s_addr; +} + // Function converting an IP address string to an unsigned int. uint32_t ip2uint(const char *const ip_addr) { char ip_addr_cpy[20]; @@ -259,7 +193,7 @@ int taosReadn(int fd, char *ptr, int nbytes) { return (nbytes - nleft); } -int taosOpenUdpSocket(char *ip, uint16_t port) { +int taosOpenUdpSocket(uint32_t ip, uint16_t port) { struct sockaddr_in localAddr; int sockFd; int ttl = 128; @@ -270,7 +204,7 @@ int taosOpenUdpSocket(char *ip, uint16_t port) { memset((char *)&localAddr, 0, sizeof(localAddr)); localAddr.sin_family = AF_INET; - localAddr.sin_addr.s_addr = inet_addr(ip); + localAddr.sin_addr.s_addr = ip; localAddr.sin_port = (uint16_t)htons(port); if ((sockFd = (int)socket(AF_INET, SOCK_DGRAM, 0)) < 0) { @@ -325,13 +259,11 @@ int taosOpenUdpSocket(char *ip, uint16_t port) { return sockFd; } -int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) { +int taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) { int sockFd = 0; struct sockaddr_in serverAddr, clientAddr; int ret; - // uTrace("open tcp client socket:%s:%d, local Ip:%s", destIp, destPort, clientIp); - sockFd = (int)socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); if (sockFd < 0) { @@ -339,16 +271,16 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) { return -1; } - if (clientIp && clientIp[0] && clientIp[0] != '0') { + if ( clientIp != 0) { memset((char *)&clientAddr, 0, sizeof(clientAddr)); clientAddr.sin_family = AF_INET; - clientAddr.sin_addr.s_addr = inet_addr(clientIp); + clientAddr.sin_addr.s_addr = clientIp; clientAddr.sin_port = 0; /* bind socket to client address */ if (bind(sockFd, (struct sockaddr *)&clientAddr, sizeof(clientAddr)) < 0) { - uError("bind tcp client socket failed, client(%s:0), dest(%s:%d), reason:%d(%s)", - clientIp, destIp, destPort, errno, strerror(errno)); + uError("bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)", + clientIp, destIp, destPort, strerror(errno)); close(sockFd); return -1; } @@ -356,13 +288,13 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) { memset((char *)&serverAddr, 0, sizeof(serverAddr)); serverAddr.sin_family = AF_INET; - serverAddr.sin_addr.s_addr = inet_addr(destIp); + serverAddr.sin_addr.s_addr = destIp; serverAddr.sin_port = (uint16_t)htons((uint16_t)destPort); ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr)); if (ret != 0) { - //uError("failed to connect socket, ip:%s, port:%hu, reason: %s", destIp, destPort, strerror(errno)); + //uError("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno)); taosCloseSocket(sockFd); sockFd = -1; } @@ -420,7 +352,7 @@ int taosKeepTcpAlive(int sockFd) { return 0; } -int taosOpenTcpServerSocket(char *ip, uint16_t port) { +int taosOpenTcpServerSocket(uint32_t ip, uint16_t port) { struct sockaddr_in serverAdd; int sockFd; int reuse; @@ -429,7 +361,7 @@ int taosOpenTcpServerSocket(char *ip, uint16_t port) { bzero((char *)&serverAdd, sizeof(serverAdd)); serverAdd.sin_family = AF_INET; - serverAdd.sin_addr.s_addr = inet_addr(ip); + serverAdd.sin_addr.s_addr = ip; serverAdd.sin_port = (uint16_t)htons(port); if ((sockFd = (int)socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { @@ -447,7 +379,7 @@ int taosOpenTcpServerSocket(char *ip, uint16_t port) { /* bind socket to server address */ if (bind(sockFd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) { - uError("bind tcp server socket failed, %s:%hu, reason:%d(%s)", ip, port, errno, strerror(errno)); + uError("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno)); close(sockFd); return -1; } @@ -455,14 +387,14 @@ int taosOpenTcpServerSocket(char *ip, uint16_t port) { if (taosKeepTcpAlive(sockFd) < 0) return -1; if (listen(sockFd, 10) < 0) { - uError("listen tcp server socket failed, %s:%hu, reason:%d(%s)", ip, port, errno, strerror(errno)); + uError("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno)); return -1; } return sockFd; } -int taosOpenRawSocket(char *ip) { +int taosOpenRawSocket(uint32_t ip) { int fd, hold; struct sockaddr_in rawAdd; @@ -483,10 +415,10 @@ int taosOpenRawSocket(char *ip) { bzero((char *)&rawAdd, sizeof(rawAdd)); rawAdd.sin_family = AF_INET; - rawAdd.sin_addr.s_addr = inet_addr(ip); + rawAdd.sin_addr.s_addr = ip; if (bind(fd, (struct sockaddr *)&rawAdd, sizeof(rawAdd)) < 0) { - uError("failed to bind RAW socket: %d (%s)", errno, strerror(errno)); + uError("failed to bind RAW socket:(%s)", strerror(errno)); close(fd); return -1; } -- GitLab