From 908aa37a416a2d5d87d843144b18dff7030e1a14 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Mon, 18 May 2020 14:15:21 +0000 Subject: [PATCH] add new macro TSDB_EP_LEN --- src/client/src/tscUtil.c | 4 +- src/common/src/tglobal.c | 14 +- src/dnode/src/dnodeMgmt.c | 2 +- src/inc/taosdef.h | 3 +- src/inc/taosmsg.h | 10 +- src/mnode/inc/mgmtDef.h | 2 +- src/plugins/monitor/src/monitorMain.c | 2 +- src/rpc/inc/rpcHaship.h | 33 ---- src/rpc/src/rpcHaship.c | 167 ------------------ src/rpc/src/rpcUdp.c | 234 +++----------------------- 10 files changed, 43 insertions(+), 428 deletions(-) delete mode 100644 src/rpc/inc/rpcHaship.h delete mode 100644 src/rpc/src/rpcHaship.c diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 4667606aa8..8570c2b304 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2169,7 +2169,7 @@ int tscSetMgmtIpListFromCfg(const char *first, const char *second) { tscMgmtIpSet.inUse = 0; if (first && first[0] != 0) { - if (strlen(first) >= TSDB_FQDN_LEN) { + if (strlen(first) >= TSDB_EP_LEN) { terrno = TSDB_CODE_INVALID_FQDN; return -1; } @@ -2178,7 +2178,7 @@ int tscSetMgmtIpListFromCfg(const char *first, const char *second) { } if (second && second[0] != 0) { - if (strlen(second) >= TSDB_FQDN_LEN) { + if (strlen(second) >= TSDB_EP_LEN) { terrno = TSDB_CODE_INVALID_FQDN; return -1; } diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 18d8c9ebe2..324edb422b 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -61,10 +61,10 @@ int32_t tscEmbedded = 0; */ int64_t tsMsPerDay[] = {86400000L, 86400000000L}; -char tsFirst[TSDB_FQDN_LEN] = {0}; -char tsSecond[TSDB_FQDN_LEN] = {0}; -char tsArbitrator[TSDB_FQDN_LEN] = {0}; -char tsLocalEp[TSDB_FQDN_LEN] = {0}; // Local End Point, hostname:port +char tsFirst[TSDB_EP_LEN] = {0}; +char tsSecond[TSDB_EP_LEN] = {0}; +char tsArbitrator[TSDB_EP_LEN] = {0}; +char tsLocalEp[TSDB_EP_LEN] = {0}; // Local End Point, hostname:port uint16_t tsServerPort = 6030; uint16_t tsDnodeShellPort = 6030; // udp[6035-6039] tcp[6035] uint16_t tsDnodeDnodePort = 6035; // udp/tcp @@ -284,7 +284,7 @@ static void doInitGlobalConfig() { cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT; cfg.minValue = 0; cfg.maxValue = 0; - cfg.ptrLength = TSDB_FQDN_LEN; + cfg.ptrLength = TSDB_EP_LEN; cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); @@ -294,7 +294,7 @@ static void doInitGlobalConfig() { cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT; cfg.minValue = 0; cfg.maxValue = 0; - cfg.ptrLength = TSDB_FQDN_LEN; + cfg.ptrLength = TSDB_EP_LEN; cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); @@ -356,7 +356,7 @@ static void doInitGlobalConfig() { cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT; cfg.minValue = 0; cfg.maxValue = 0; - cfg.ptrLength = TSDB_FQDN_LEN; + cfg.ptrLength = TSDB_EP_LEN; cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 81d426b496..4b28992aa4 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -411,7 +411,7 @@ static bool dnodeReadMnodeInfos() { dError("failed to read mnode mgmtIpList.json, nodeName not found"); goto PARSE_OVER; } - strncpy(tsMnodeInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_FQDN_LEN); + strncpy(tsMnodeInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_EP_LEN); } ret = true; diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index ba015d7bbf..2393654f79 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -218,7 +218,8 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_LOCALE_LEN 64 #define TSDB_TIMEZONE_LEN 64 -#define TSDB_FQDN_LEN 256 +#define TSDB_FQDN_LEN 128 +#define TSDB_EP_LEN (TSDB_FQDN_LEN+6) #define TSDB_IPv4ADDR_LEN 16 #define TSDB_FILENAME_LEN 128 #define TSDB_METER_VNODE_BITS 20 diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 27862ef489..3aa75523ea 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -530,7 +530,7 @@ typedef struct { typedef struct { int32_t nodeId; - char nodeEp[TSDB_FQDN_LEN]; + char nodeEp[TSDB_EP_LEN]; } SDMMnodeInfo; typedef struct { @@ -542,7 +542,7 @@ typedef struct { typedef struct { uint32_t version; int32_t dnodeId; - char dnodeEp[TSDB_FQDN_LEN]; + char dnodeEp[TSDB_EP_LEN]; uint32_t moduleStatus; uint32_t lastReboot; // time stamp for last reboot uint16_t numOfTotalVnodes; // from config file @@ -584,7 +584,7 @@ typedef struct { typedef struct { int32_t nodeId; - char nodeEp[TSDB_FQDN_LEN]; + char nodeEp[TSDB_EP_LEN]; } SMDVnodeDesc; typedef struct { @@ -669,7 +669,7 @@ typedef struct SCMShowRsp { } SCMShowRsp; typedef struct { - char ep[TSDB_FQDN_LEN]; // end point, hostname:port + char ep[TSDB_EP_LEN]; // end point, hostname:port } SCMCreateDnodeMsg, SCMDropDnodeMsg; typedef struct { @@ -684,7 +684,7 @@ typedef struct { } SDMConfigVnodeMsg; typedef struct { - char ep[TSDB_FQDN_LEN]; // end point, hostname:port + char ep[TSDB_EP_LEN]; // end point, hostname:port char config[64]; } SMDCfgDnodeMsg, SCMCfgDnodeMsg; diff --git a/src/mnode/inc/mgmtDef.h b/src/mnode/inc/mgmtDef.h index f0d694db2a..fac342901a 100644 --- a/src/mnode/inc/mgmtDef.h +++ b/src/mnode/inc/mgmtDef.h @@ -33,7 +33,7 @@ typedef struct SDnodeObj { int32_t dnodeId; uint16_t dnodePort; char dnodeFqdn[TSDB_FQDN_LEN + 1]; - char dnodeEp[TSDB_FQDN_LEN + 1]; + char dnodeEp[TSDB_EP_LEN + 1]; int64_t createdTime; uint32_t lastAccess; int32_t openVnodes; diff --git a/src/plugins/monitor/src/monitorMain.c b/src/plugins/monitor/src/monitorMain.c index ae8038e444..72efd5b552 100644 --- a/src/plugins/monitor/src/monitorMain.c +++ b/src/plugins/monitor/src/monitorMain.c @@ -68,7 +68,7 @@ typedef enum { typedef struct { void * conn; void * timer; - char ep[TSDB_FQDN_LEN]; + char ep[TSDB_EP_LEN]; int8_t cmdIndex; int8_t state; char sql[SQL_LENGTH]; diff --git a/src/rpc/inc/rpcHaship.h b/src/rpc/inc/rpcHaship.h deleted file mode 100644 index d3ed48997a..0000000000 --- a/src/rpc/inc/rpcHaship.h +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _rpc_hash_ip_header_ -#define _rpc_hash_ip_header_ - -#ifdef __cplusplus -extern "C" { -#endif - -void *rpcOpenIpHash(int maxSessions); -void rpcCloseIpHash(void *handle); -void *rpcAddIpHash(void *handle, void *pData, uint32_t ip, uint16_t port); -void rpcDeleteIpHash(void *handle, uint32_t ip, uint16_t port); -void *rpcGetIpHash(void *handle, uint32_t ip, uint16_t port); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/src/rpc/src/rpcHaship.c b/src/rpc/src/rpcHaship.c deleted file mode 100644 index 0183c87f70..0000000000 --- a/src/rpc/src/rpcHaship.c +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "os.h" -#include "tmempool.h" -#include "rpcLog.h" - -typedef struct SIpHash { - uint32_t ip; - uint16_t port; - int hash; - struct SIpHash *prev; - struct SIpHash *next; - void *data; -} SIpHash; - -typedef struct { - SIpHash **ipHashList; - mpool_h ipHashMemPool; - int maxSessions; -} SHashObj; - -int rpcHashIp(void *handle, uint32_t ip, uint16_t port) { - SHashObj *pObj = (SHashObj *)handle; - int hash = 0; - - hash = (int)(ip >> 16); - hash += (unsigned short)(ip & 0xFFFF); - hash += port; - - hash = hash % pObj->maxSessions; - - return hash; -} - -void *rpcAddIpHash(void *handle, void *data, uint32_t ip, uint16_t port) { - int hash; - SIpHash *pNode; - SHashObj *pObj; - - pObj = (SHashObj *)handle; - if (pObj == NULL || pObj->maxSessions == 0) return NULL; - - hash = rpcHashIp(pObj, ip, port); - pNode = (SIpHash *)taosMemPoolMalloc(pObj->ipHashMemPool); - pNode->ip = ip; - pNode->port = port; - pNode->data = data; - pNode->prev = 0; - pNode->next = pObj->ipHashList[hash]; - pNode->hash = hash; - - if (pObj->ipHashList[hash] != 0) (pObj->ipHashList[hash])->prev = pNode; - pObj->ipHashList[hash] = pNode; - - return pObj; -} - -void rpcDeleteIpHash(void *handle, uint32_t ip, uint16_t port) { - int hash; - SIpHash *pNode; - SHashObj *pObj; - - pObj = (SHashObj *)handle; - if (pObj == NULL || pObj->maxSessions == 0) return; - - hash = rpcHashIp(pObj, ip, port); - - pNode = pObj->ipHashList[hash]; - while (pNode) { - if (pNode->ip == ip && pNode->port == port) break; - - pNode = pNode->next; - } - - if (pNode) { - if (pNode->prev) { - pNode->prev->next = pNode->next; - } else { - pObj->ipHashList[hash] = pNode->next; - } - - if (pNode->next) { - pNode->next->prev = pNode->prev; - } - - taosMemPoolFree(pObj->ipHashMemPool, (char *)pNode); - } -} - -void *rpcGetIpHash(void *handle, uint32_t ip, uint16_t port) { - int hash; - SIpHash *pNode; - SHashObj *pObj; - - pObj = (SHashObj *)handle; - if (pObj == NULL || pObj->maxSessions == 0) return NULL; - - hash = rpcHashIp(pObj, ip, port); - pNode = pObj->ipHashList[hash]; - - while (pNode) { - if (pNode->ip == ip && pNode->port == port) { - break; - } - pNode = pNode->next; - } - - if (pNode) { - return pNode->data; - } - return NULL; -} - -void *rpcOpenIpHash(int maxSessions) { - SIpHash **ipHashList; - mpool_h ipHashMemPool; - SHashObj *pObj; - - ipHashMemPool = taosMemPoolInit(maxSessions, sizeof(SIpHash)); - if (ipHashMemPool == 0) return NULL; - - ipHashList = calloc(sizeof(SIpHash *), (size_t)maxSessions); - if (ipHashList == 0) { - taosMemPoolCleanUp(ipHashMemPool); - return NULL; - } - - pObj = malloc(sizeof(SHashObj)); - if (pObj == NULL) { - taosMemPoolCleanUp(ipHashMemPool); - free(ipHashList); - return NULL; - } - - pObj->maxSessions = maxSessions; - pObj->ipHashMemPool = ipHashMemPool; - pObj->ipHashList = ipHashList; - - return pObj; -} - -void rpcCloseIpHash(void *handle) { - SHashObj *pObj; - - pObj = (SHashObj *)handle; - if (pObj == NULL || pObj->maxSessions == 0) return; - - if (pObj->ipHashMemPool) taosMemPoolCleanUp(pObj->ipHashMemPool); - - if (pObj->ipHashList) free(pObj->ipHashList); - - memset(pObj, 0, sizeof(SHashObj)); - free(pObj); -} diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 677187e3b9..3a40f27e26 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -19,7 +19,6 @@ #include "ttimer.h" #include "tutil.h" #include "rpcLog.h" -#include "rpcHaship.h" #include "rpcUdp.h" #include "rpcHead.h" @@ -28,8 +27,6 @@ #define RPC_UDP_BUF_TIME 5 // mseconds #define RPC_MAX_UDP_SIZE 65480 -int tsUdpDelay = 0; - typedef struct { void *signature; int index; @@ -38,8 +35,6 @@ typedef struct { uint16_t localPort; // local port char label[12]; // copy from udpConnSet; pthread_t thread; - pthread_mutex_t mutex; - void *tmrCtrl; // copy from UdpConnSet; void *hash; void *shandle; // handle passed by upper layer during server initialization void *pSet; @@ -55,26 +50,11 @@ typedef struct { void *shandle; // handle passed by upper layer during server initialization int threads; char label[12]; - void *tmrCtrl; void *(*fp)(SRecvInfo *pPacket); SUdpConn udpConn[]; } SUdpConnSet; -typedef struct { - void *signature; - uint32_t ip; // dest IP - uint16_t port; // dest Port - SUdpConn *pConn; - struct sockaddr_in destAdd; - void *msgHdr; - int totalLen; - void *timer; - int emptyNum; -} SUdpBuf; - static void *taosRecvUdpData(void *param); -static SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port); -static void taosProcessUdpBufTimer(void *param, void *tmrId); void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) { SUdpConn *pConn; @@ -94,16 +74,6 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads pSet->fp = fp; strcpy(pSet->label, label); - if ( tsUdpDelay ) { - char udplabel[12]; - sprintf(udplabel, "%s.b", label); - pSet->tmrCtrl = taosTmrInit(RPC_MAX_UDP_CONNS * threads, 5, 5000, udplabel); - if (pSet->tmrCtrl == NULL) { - tError("%s failed to initialize tmrCtrl") taosCleanUpUdpConnection(pSet); - return NULL; - } - } - uint16_t ownPort; for (int i = 0; i < threads; ++i) { pConn = pSet->udpConn + i; @@ -135,11 +105,6 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads pConn->index = i; pConn->pSet = pSet; pConn->signature = pConn; - if (tsUdpDelay) { - pConn->hash = rpcOpenIpHash(RPC_MAX_UDP_CONNS); - pthread_mutex_init(&pConn->mutex, NULL); - pConn->tmrCtrl = pSet->tmrCtrl; - } pthread_attr_t thAttr; pthread_attr_init(&thAttr); @@ -173,10 +138,6 @@ void taosCleanUpUdpConnection(void *handle) { free(pConn->buffer); pthread_cancel(pConn->thread); taosCloseSocket(pConn->fd); - if (pConn->hash) { - rpcCloseIpHash(pConn->hash); - pthread_mutex_destroy(&pConn->mutex); - } } for (int i = 0; i < pSet->threads; ++i) { @@ -185,7 +146,6 @@ void taosCleanUpUdpConnection(void *handle) { tTrace("chandle:%p is closed", pConn); } - taosTmrCleanUp(pSet->tmrCtrl); tfree(pSet); } @@ -205,64 +165,42 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t static void *taosRecvUdpData(void *param) { SUdpConn *pConn = param; struct sockaddr_in sourceAdd; - int dataLen; + ssize_t dataLen; unsigned int addLen; uint16_t port; - int minSize = sizeof(SRpcHead); SRecvInfo recvInfo; memset(&sourceAdd, 0, sizeof(sourceAdd)); addLen = sizeof(sourceAdd); tTrace("%s UDP thread is created, index:%d", pConn->label, pConn->index); + char *msg = pConn->buffer; while (1) { dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen); port = ntohs(sourceAdd.sin_port); - //tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, port, dataLen); if (dataLen < sizeof(SRpcHead)) { tError("%s recvfrom failed, reason:%s\n", pConn->label, strerror(errno)); continue; } - int processedLen = 0, leftLen = 0; - int msgLen = 0; - int count = 0; - char *msg = pConn->buffer; - while (processedLen < dataLen) { - leftLen = dataLen - processedLen; - SRpcHead *pHead = (SRpcHead *)msg; - msgLen = htonl((uint32_t)pHead->msgLen); - if (leftLen < minSize || msgLen > leftLen || msgLen < minSize) { - tError("%s msg is messed up, dataLen:%d processedLen:%d count:%d msgLen:%d", pConn->label, dataLen, - processedLen, count, msgLen); - break; - } - - char *tmsg = malloc((size_t)msgLen + tsRpcOverhead); - if (NULL == tmsg) { - tError("%s failed to allocate memory, size:%d", pConn->label, msgLen); - break; - } - - tmsg += tsRpcOverhead; // overhead for SRpcReqContext - memcpy(tmsg, msg, (size_t)msgLen); - recvInfo.msg = tmsg; - recvInfo.msgLen = msgLen; - recvInfo.ip = sourceAdd.sin_addr.s_addr; - recvInfo.port = port; - recvInfo.shandle = pConn->shandle; - recvInfo.thandle = NULL; - recvInfo.chandle = pConn; - recvInfo.connType = 0; - (*(pConn->processData))(&recvInfo); - - processedLen += msgLen; - msg += msgLen; - count++; + char *tmsg = malloc(dataLen + tsRpcOverhead); + if (NULL == tmsg) { + tError("%s failed to allocate memory, size:%d", pConn->label, dataLen); + continue; } - // tTrace("%s %d UDP packets are received together", pConn->label, count); + tmsg += tsRpcOverhead; // overhead for SRpcReqContext + memcpy(tmsg, msg, dataLen); + recvInfo.msg = tmsg; + recvInfo.msgLen = dataLen; + recvInfo.ip = sourceAdd.sin_addr.s_addr; + recvInfo.port = port; + recvInfo.shandle = pConn->shandle; + recvInfo.thandle = NULL; + recvInfo.chandle = pConn; + recvInfo.connType = 0; + (*(pConn->processData))(&recvInfo); } return NULL; @@ -270,141 +208,17 @@ static void *taosRecvUdpData(void *param) { int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle) { SUdpConn *pConn = (SUdpConn *)chandle; - SUdpBuf *pBuf; if (pConn == NULL || pConn->signature != pConn) return -1; - if (pConn->hash == NULL) { - struct sockaddr_in destAdd; - memset(&destAdd, 0, sizeof(destAdd)); - destAdd.sin_family = AF_INET; - destAdd.sin_addr.s_addr = ip; - destAdd.sin_port = htons(port); - - //tTrace("%s msg is sent to 0x%x:%hu len:%d ret:%d localPort:%hu chandle:0x%x", pConn->label, destAdd.sin_addr.s_addr, - // port, dataLen, ret, pConn->localPort, chandle); - int ret = (int)sendto(pConn->fd, data, (size_t)dataLen, 0, (struct sockaddr *)&destAdd, sizeof(destAdd)); - - return ret; - } - - pthread_mutex_lock(&pConn->mutex); - - pBuf = (SUdpBuf *)rpcGetIpHash(pConn->hash, ip, port); - if (pBuf == NULL) { - pBuf = taosCreateUdpBuf(pConn, ip, port); - rpcAddIpHash(pConn->hash, pBuf, ip, port); - } - - if ((pBuf->totalLen + dataLen > RPC_MAX_UDP_SIZE) || (taosMsgHdrSize(pBuf->msgHdr) >= RPC_MAX_UDP_PKTS)) { - taosTmrReset(taosProcessUdpBufTimer, RPC_UDP_BUF_TIME, pBuf, pConn->tmrCtrl, &pBuf->timer); - - taosSendMsgHdr(pBuf->msgHdr, pConn->fd); - pBuf->totalLen = 0; - } - - taosSetMsgHdrData(pBuf->msgHdr, data, dataLen); - - pBuf->totalLen += dataLen; - - pthread_mutex_unlock(&pConn->mutex); - - return dataLen; -} - -void taosFreeMsgHdr(void *hdr) { - struct msghdr *msgHdr = (struct msghdr *)hdr; - free(msgHdr->msg_iov); -} - -int taosMsgHdrSize(void *hdr) { - struct msghdr *msgHdr = (struct msghdr *)hdr; - return (int)msgHdr->msg_iovlen; -} - -void taosSendMsgHdr(void *hdr, int fd) { - struct msghdr *msgHdr = (struct msghdr *)hdr; - sendmsg(fd, msgHdr, 0); - msgHdr->msg_iovlen = 0; -} - -void taosInitMsgHdr(void **hdr, void *dest, int maxPkts) { - struct msghdr *msgHdr = (struct msghdr *)malloc(sizeof(struct msghdr)); - memset(msgHdr, 0, sizeof(struct msghdr)); - *hdr = msgHdr; - struct sockaddr_in *destAdd = (struct sockaddr_in *)dest; - - msgHdr->msg_name = destAdd; - msgHdr->msg_namelen = sizeof(struct sockaddr_in); - int size = (int)sizeof(struct iovec) * maxPkts; - msgHdr->msg_iov = (struct iovec *)malloc((size_t)size); - memset(msgHdr->msg_iov, 0, (size_t)size); -} - -void taosSetMsgHdrData(void *hdr, char *data, int dataLen) { - struct msghdr *msgHdr = (struct msghdr *)hdr; - msgHdr->msg_iov[msgHdr->msg_iovlen].iov_base = data; - msgHdr->msg_iov[msgHdr->msg_iovlen].iov_len = (size_t)dataLen; - msgHdr->msg_iovlen++; -} - -void taosRemoveUdpBuf(SUdpBuf *pBuf) { - taosTmrStopA(&pBuf->timer); - rpcDeleteIpHash(pBuf->pConn->hash, pBuf->ip, pBuf->port); - - // tTrace("%s UDP buffer to:0x%lld:%d is removed", pBuf->pConn->label, - // pBuf->ip, pBuf->port); - - pBuf->signature = NULL; - taosFreeMsgHdr(pBuf->msgHdr); - free(pBuf); -} - -void taosProcessUdpBufTimer(void *param, void *tmrId) { - SUdpBuf *pBuf = (SUdpBuf *)param; - if (pBuf->signature != param) return; - if (pBuf->timer != tmrId) return; - - SUdpConn *pConn = pBuf->pConn; - - pthread_mutex_lock(&pConn->mutex); - - if (taosMsgHdrSize(pBuf->msgHdr) > 0) { - taosSendMsgHdr(pBuf->msgHdr, pConn->fd); - pBuf->totalLen = 0; - pBuf->emptyNum = 0; - } else { - pBuf->emptyNum++; - if (pBuf->emptyNum > 200) { - taosRemoveUdpBuf(pBuf); - pBuf = NULL; - } - } - - pthread_mutex_unlock(&pConn->mutex); - - if (pBuf) taosTmrReset(taosProcessUdpBufTimer, RPC_UDP_BUF_TIME, pBuf, pConn->tmrCtrl, &pBuf->timer); -} - -static SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port) { - SUdpBuf *pBuf = (SUdpBuf *)malloc(sizeof(SUdpBuf)); - memset(pBuf, 0, sizeof(SUdpBuf)); - - pBuf->ip = ip; - pBuf->port = port; - pBuf->pConn = pConn; - - pBuf->destAdd.sin_family = AF_INET; - pBuf->destAdd.sin_addr.s_addr = ip; - pBuf->destAdd.sin_port = (uint16_t)htons(port); - taosInitMsgHdr(&(pBuf->msgHdr), &(pBuf->destAdd), RPC_MAX_UDP_PKTS); - pBuf->signature = pBuf; - taosTmrReset(taosProcessUdpBufTimer, RPC_UDP_BUF_TIME, pBuf, pConn->tmrCtrl, &pBuf->timer); + struct sockaddr_in destAdd; + memset(&destAdd, 0, sizeof(destAdd)); + destAdd.sin_family = AF_INET; + destAdd.sin_addr.s_addr = ip; + destAdd.sin_port = htons(port); - // tTrace("%s UDP buffer to:0x%lld:%d is created", pBuf->pConn->label, - // pBuf->ip, pBuf->port); + int ret = (int)sendto(pConn->fd, data, (size_t)dataLen, 0, (struct sockaddr *)&destAdd, sizeof(destAdd)); - return pBuf; + return ret; } - -- GitLab