提交 7d5b51ab 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

change the API rpcFreeCont implementation, so app can release the request message

fix a few minor bugs
上级 14c63859
...@@ -27,6 +27,7 @@ extern "C" { ...@@ -27,6 +27,7 @@ extern "C" {
#define TAOS_CONN_CLIENT 1 #define TAOS_CONN_CLIENT 1
extern int tsRpcHeadSize; extern int tsRpcHeadSize;
extern int tsRpcMaxUdpSize;
typedef struct { typedef struct {
int8_t inUse; int8_t inUse;
......
...@@ -26,6 +26,8 @@ extern "C" { ...@@ -26,6 +26,8 @@ extern "C" {
#define RPC_CONN_TCPC 3 #define RPC_CONN_TCPC 3
#define RPC_CONN_TCP 2 #define RPC_CONN_TCP 2
extern int tsRpcOverhead;
typedef struct { typedef struct {
void *msg; void *msg;
int msgLen; int msgLen;
......
...@@ -236,6 +236,7 @@ static void *taosReadTcpData(void *param) { ...@@ -236,6 +236,7 @@ static void *taosReadTcpData(void *param) {
STcpFd *pFdObj; STcpFd *pFdObj;
struct epoll_event events[maxTcpEvents]; struct epoll_event events[maxTcpEvents];
SRecvInfo recvInfo; SRecvInfo recvInfo;
SRpcHead rpcHead;
while (1) { while (1) {
pthread_mutex_lock(&pTcp->mutex); pthread_mutex_lock(&pTcp->mutex);
...@@ -260,37 +261,24 @@ static void *taosReadTcpData(void *param) { ...@@ -260,37 +261,24 @@ static void *taosReadTcpData(void *param) {
continue; continue;
} }
void *buffer = malloc(1024); int headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
if (NULL == buffer) {
tTrace("%s TCP malloc(size:1024) fail\n", pTcp->label);
taosCleanUpTcpFdObj(pFdObj);
continue;
}
int headLen = taosReadMsg(pFdObj->fd, buffer, sizeof(SRpcHead));
if (headLen != sizeof(SRpcHead)) { if (headLen != sizeof(SRpcHead)) {
tError("%s read error, headLen:%d", pTcp->label, headLen); tError("%s read error, headLen:%d", pTcp->label, headLen);
tfree(buffer);
taosCleanUpTcpFdObj(pFdObj); taosCleanUpTcpFdObj(pFdObj);
continue; continue;
} }
int dataLen = (int32_t)htonl((uint32_t)((SRpcHead *)buffer)->msgLen); int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
if (dataLen > 1024) { char *buffer = (char *)malloc((size_t)msgLen + tsRpcOverhead);
void *b = realloc(buffer, (size_t)dataLen); if (NULL == buffer) {
if (NULL == b) { tTrace("%s TCP malloc(size:%d) fail\n", pTcp->label, msgLen);
tTrace("%s TCP malloc(size:%d) fail\n", pTcp->label, dataLen);
tfree(buffer);
taosCleanUpTcpFdObj(pFdObj); taosCleanUpTcpFdObj(pFdObj);
continue; continue;
} }
buffer = b;
}
int leftLen = dataLen - headLen; char *msg = buffer + tsRpcOverhead;
int retLen = taosReadMsg(pFdObj->fd, buffer + headLen, leftLen); int32_t leftLen = msgLen - headLen;
int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
// tTrace("%s TCP data is received, ip:%s port:%u len:%d", pTcp->label, pFdObj->ipstr, pFdObj->port, dataLen);
if (leftLen != retLen) { if (leftLen != retLen) {
tError("%s read error, leftLen:%d retLen:%d", pTcp->label, leftLen, retLen); tError("%s read error, leftLen:%d retLen:%d", pTcp->label, leftLen, retLen);
...@@ -299,8 +287,11 @@ static void *taosReadTcpData(void *param) { ...@@ -299,8 +287,11 @@ static void *taosReadTcpData(void *param) {
continue; continue;
} }
recvInfo.msg = buffer; // tTrace("%s TCP data is received, ip:%s:%u len:%d", pTcp->label, pFdObj->ipstr, pFdObj->port, msgLen);
recvInfo.msgLen = dataLen;
memcpy(msg, &rpcHead, sizeof(SRpcHead));
recvInfo.msg = msg;
recvInfo.msgLen = msgLen;
recvInfo.ip = pFdObj->ip; recvInfo.ip = pFdObj->ip;
recvInfo.port = pFdObj->port; recvInfo.port = pFdObj->port;
recvInfo.shandle = pTcp->shandle; recvInfo.shandle = pTcp->shandle;
......
...@@ -122,10 +122,12 @@ typedef struct _RpcConn { ...@@ -122,10 +122,12 @@ typedef struct _RpcConn {
} SRpcConn; } SRpcConn;
int tsRpcProgressTime = 10; // milliseocnds int tsRpcProgressTime = 10; // milliseocnds
int tsRpcMaxUdpSize = 15000; // bytes;
// not configurable // not configurable
int tsRpcMaxRetry; int tsRpcMaxRetry;
int tsRpcHeadSize; int tsRpcHeadSize;
int tsRpcOverhead;
// server:0 client:1 tcp:2 udp:0 // server:0 client:1 tcp:2 udp:0
#define RPC_CONN_UDPS 0 #define RPC_CONN_UDPS 0
...@@ -188,7 +190,7 @@ static void rpcProcessRetryTimer(void *, void *); ...@@ -188,7 +190,7 @@ static void rpcProcessRetryTimer(void *, void *);
static void rpcProcessIdleTimer(void *param, void *tmrId); static void rpcProcessIdleTimer(void *param, void *tmrId);
static void rpcProcessProgressTimer(void *param, void *tmrId); static void rpcProcessProgressTimer(void *param, void *tmrId);
static void rpcFreeOutMsg(void *msg); static void rpcFreeMsg(void *msg);
static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead); static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead);
static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen); static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen);
...@@ -201,6 +203,7 @@ void *rpcOpen(SRpcInit *pInit) { ...@@ -201,6 +203,7 @@ void *rpcOpen(SRpcInit *pInit) {
tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsRpcProgressTime; tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsRpcProgressTime;
tsRpcHeadSize = RPC_MSG_OVERHEAD; tsRpcHeadSize = RPC_MSG_OVERHEAD;
tsRpcOverhead = sizeof(SRpcReqContext);
pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo)); pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
if (pRpc == NULL) return NULL; if (pRpc == NULL) return NULL;
...@@ -313,8 +316,8 @@ void *rpcMallocCont(int contLen) { ...@@ -313,8 +316,8 @@ void *rpcMallocCont(int contLen) {
void rpcFreeCont(void *cont) { void rpcFreeCont(void *cont) {
if ( cont ) { if ( cont ) {
char *msg = ((char *)cont) - sizeof(SRpcHead); char *temp = ((char *)cont) - sizeof(SRpcHead) - sizeof(SRpcReqContext);
free(msg); free(temp);
} }
} }
...@@ -351,7 +354,7 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, in ...@@ -351,7 +354,7 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, in
pContext->oldInUse = pIpSet->inUse; pContext->oldInUse = pIpSet->inUse;
pContext->connType = RPC_CONN_UDPC; pContext->connType = RPC_CONN_UDPC;
if (contLen > 16000) pContext->connType = RPC_CONN_TCPC; if (contLen > tsRpcMaxUdpSize) pContext->connType = RPC_CONN_TCPC;
// connection type is application specific. // connection type is application specific.
// for TDengine, all the query, show commands shall have TCP connection // for TDengine, all the query, show commands shall have TCP connection
...@@ -406,7 +409,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { ...@@ -406,7 +409,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
pConn->inType = 0; pConn->inType = 0;
// response message is released until new response is sent // response message is released until new response is sent
rpcFreeOutMsg(pConn->pRspMsg); rpcFreeMsg(pConn->pRspMsg);
pConn->pRspMsg = msg; pConn->pRspMsg = msg;
pConn->rspMsgLen = msgLen; pConn->rspMsgLen = msgLen;
if (pHead->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; if (pHead->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--;
...@@ -442,6 +445,13 @@ void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { ...@@ -442,6 +445,13 @@ void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
strcpy(pInfo->user, pConn->user); strcpy(pInfo->user, pConn->user);
} }
static void rpcFreeMsg(void *msg) {
if ( msg ) {
char *temp = (char *)msg - sizeof(SRpcReqContext);
free(temp);
}
}
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort, int8_t connType) { static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort, int8_t connType) {
SRpcConn *pConn; SRpcConn *pConn;
...@@ -490,7 +500,7 @@ static void rpcCloseConn(void *thandle) { ...@@ -490,7 +500,7 @@ static void rpcCloseConn(void *thandle) {
char hashstr[40] = {0}; char hashstr[40] = {0};
sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->peerUid, pConn->peerId, pConn->connType); sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->peerUid, pConn->peerId, pConn->connType);
taosDeleteStrHash(pRpc->hash, hashstr); taosDeleteStrHash(pRpc->hash, hashstr);
rpcFreeOutMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg
pConn->pRspMsg = NULL; pConn->pRspMsg = NULL;
pConn->inType = 0; pConn->inType = 0;
pConn->inTranId = 0; pConn->inTranId = 0;
...@@ -822,7 +832,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { ...@@ -822,7 +832,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
} }
} }
if ( terrno ) free (pRecv->msg); if (terrno) rpcFreeMsg(pRecv->msg);
return pConn; return pConn;
} }
...@@ -855,7 +865,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -855,7 +865,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
if ( pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect) ) if ( pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect) )
(*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet (*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet
(*pRpc->cfp)(pHead->msgType, pCont, contLen, pContext->ahandle, code); (*pRpc->cfp)(pHead->msgType, pCont, contLen, pContext->ahandle, code);
rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg rpcFreeCont(pContext->pCont); // free the request msg
} }
} }
} }
...@@ -996,8 +1006,8 @@ static void rpcProcessConnError(void *param, void *id) { ...@@ -996,8 +1006,8 @@ static void rpcProcessConnError(void *param, void *id) {
tTrace("%s connection error happens", pRpc->label); tTrace("%s connection error happens", pRpc->label);
if ( pContext->numOfTry >= pContext->ipSet.numOfIps ) { if ( pContext->numOfTry >= pContext->ipSet.numOfIps ) {
rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg
(*(pRpc->cfp))(pContext->msgType+1, NULL, 0, pContext->ahandle, pContext->code); (*(pRpc->cfp))(pContext->msgType+1, NULL, 0, pContext->ahandle, pContext->code);
rpcFreeCont(pContext->pCont); // free the request msg
} else { } else {
// move to next IP // move to next IP
pContext->ipSet.inUse++; pContext->ipSet.inUse++;
...@@ -1127,7 +1137,8 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { ...@@ -1127,7 +1137,8 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
int contLen = htonl(pComp->contLen); int contLen = htonl(pComp->contLen);
// prepare the temporary buffer to decompress message // prepare the temporary buffer to decompress message
pNewHead = (SRpcHead *)malloc(contLen + RPC_MSG_OVERHEAD); char *temp = (char *)malloc(contLen + RPC_MSG_OVERHEAD);
pNewHead = (SRpcHead *)(temp + sizeof(SRpcReqContext)); // reserve SRpcReqContext
if (pNewHead) { if (pNewHead) {
int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead; int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead;
...@@ -1136,7 +1147,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { ...@@ -1136,7 +1147,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
memcpy(pNewHead, pHead, sizeof(SRpcHead)); memcpy(pNewHead, pHead, sizeof(SRpcHead));
pNewHead->msgLen = rpcMsgLenFromCont(origLen); pNewHead->msgLen = rpcMsgLenFromCont(origLen);
free(pHead); // free the compressed message buffer rpcFreeMsg(pHead); // free the compressed message buffer
pHead = pNewHead; pHead = pNewHead;
tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen); tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen);
} else { } else {
......
...@@ -193,6 +193,7 @@ static void taosProcessTcpData(void *param) { ...@@ -193,6 +193,7 @@ static void taosProcessTcpData(void *param) {
struct epoll_event events[maxEvents]; struct epoll_event events[maxEvents];
SRecvInfo recvInfo; SRecvInfo recvInfo;
pThreadObj = (SThreadObj *)param; pThreadObj = (SThreadObj *)param;
SRpcHead rpcHead;
while (1) { while (1) {
pthread_mutex_lock(&pThreadObj->threadMutex); pthread_mutex_lock(&pThreadObj->threadMutex);
...@@ -219,24 +220,24 @@ static void taosProcessTcpData(void *param) { ...@@ -219,24 +220,24 @@ static void taosProcessTcpData(void *param) {
continue; continue;
} }
void *buffer = malloc(1024); int32_t headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
int headLen = taosReadMsg(pFdObj->fd, buffer, sizeof(SRpcHead));
if (headLen != sizeof(SRpcHead)) { if (headLen != sizeof(SRpcHead)) {
tError("%s read error, headLen:%d, errno:%d", pThreadObj->label, headLen, errno); tError("%s read error, headLen:%d, errno:%d", pThreadObj->label, headLen, errno);
taosCleanUpFdObj(pFdObj); taosCleanUpFdObj(pFdObj);
tfree(buffer);
continue; continue;
} }
int dataLen = (int32_t)htonl((uint32_t)((SRpcHead *)buffer)->msgLen); int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
if (dataLen > 1024) buffer = realloc(buffer, (size_t)dataLen); char *buffer = malloc(msgLen + tsRpcOverhead);
if ( NULL == buffer) {
int leftLen = dataLen - headLen; tError("%s TCP malloc(size:%d) fail\n", pThreadObj->label, msgLen);
int retLen = taosReadMsg(pFdObj->fd, buffer + headLen, leftLen); taosCleanUpFdObj(pFdObj);
continue;
}
// tTrace("%s TCP data is received, ip:%s port:%u len:%d", char *msg = buffer + tsRpcOverhead;
// pThreadObj->label, pFdObj->ipstr, pFdObj->port, dataLen); int32_t leftLen = msgLen - headLen;
int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
if (leftLen != retLen) { if (leftLen != retLen) {
tError("%s read error, leftLen:%d retLen:%d", pThreadObj->label, leftLen, retLen); tError("%s read error, leftLen:%d retLen:%d", pThreadObj->label, leftLen, retLen);
...@@ -245,8 +246,11 @@ static void taosProcessTcpData(void *param) { ...@@ -245,8 +246,11 @@ static void taosProcessTcpData(void *param) {
continue; continue;
} }
recvInfo.msg = buffer; // tTrace("%s TCP data is received, ip:%s:%u len:%d", pTcp->label, pFdObj->ipstr, pFdObj->port, msgLen);
recvInfo.msgLen = dataLen;
memcpy(msg, &rpcHead, sizeof(SRpcHead));
recvInfo.msg = msg;
recvInfo.msgLen = msgLen;
recvInfo.ip = pFdObj->ip; recvInfo.ip = pFdObj->ip;
recvInfo.port = pFdObj->port; recvInfo.port = pFdObj->port;
recvInfo.shandle = pThreadObj->shandle; recvInfo.shandle = pThreadObj->shandle;
......
...@@ -40,12 +40,12 @@ typedef struct { ...@@ -40,12 +40,12 @@ typedef struct {
char label[12]; // copy from udpConnSet; char label[12]; // copy from udpConnSet;
pthread_t thread; pthread_t thread;
pthread_mutex_t mutex; pthread_mutex_t mutex;
void * tmrCtrl; // copy from UdpConnSet; void *tmrCtrl; // copy from UdpConnSet;
void * hash; void *hash;
void * shandle; // handle passed by upper layer during server initialization void *shandle; // handle passed by upper layer during server initialization
void * pSet; void *pSet;
void *(*processData)(SRecvInfo *pRecv); void *(*processData)(SRecvInfo *pRecv);
char buffer[RPC_MAX_UDP_SIZE]; // buffer to receive data char *buffer; // buffer to receive data
} SUdpConn; } SUdpConn;
typedef struct { typedef struct {
...@@ -96,7 +96,7 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v ...@@ -96,7 +96,7 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
pSet->fp = fp; pSet->fp = fp;
strcpy(pSet->label, label); strcpy(pSet->label, label);
// if ( tsUdpDelay ) { if ( tsUdpDelay ) {
char udplabel[12]; char udplabel[12];
sprintf(udplabel, "%s.b", label); sprintf(udplabel, "%s.b", label);
pSet->tmrCtrl = taosTmrInit(RPC_MAX_UDP_CONNS * threads, 5, 5000, udplabel); pSet->tmrCtrl = taosTmrInit(RPC_MAX_UDP_CONNS * threads, 5, 5000, udplabel);
...@@ -104,7 +104,7 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v ...@@ -104,7 +104,7 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
tError("%s failed to initialize tmrCtrl") taosCleanUpUdpConnection(pSet); tError("%s failed to initialize tmrCtrl") taosCleanUpUdpConnection(pSet);
return NULL; return NULL;
} }
// } }
pthread_attr_init(&thAttr); pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
...@@ -120,6 +120,13 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v ...@@ -120,6 +120,13 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
return NULL; return NULL;
} }
pConn->buffer = malloc(RPC_MAX_UDP_SIZE);
if (NULL == pConn->buffer) {
tError("%s failed to malloc recv buffer", label);
taosCleanUpUdpConnection(pSet);
return NULL;
}
struct sockaddr_in sin; struct sockaddr_in sin;
unsigned int addrlen = sizeof(sin); unsigned int addrlen = sizeof(sin);
if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET && if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET &&
...@@ -128,14 +135,6 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v ...@@ -128,14 +135,6 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
} }
strcpy(pConn->label, label); strcpy(pConn->label, label);
if (pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn) != 0) {
tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno));
taosCloseSocket(pConn->fd);
taosCleanUpUdpConnection(pSet);
return NULL;
}
pConn->shandle = shandle; pConn->shandle = shandle;
pConn->processData = fp; pConn->processData = fp;
pConn->index = i; pConn->index = i;
...@@ -146,6 +145,14 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v ...@@ -146,6 +145,14 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
pthread_mutex_init(&pConn->mutex, NULL); pthread_mutex_init(&pConn->mutex, NULL);
pConn->tmrCtrl = pSet->tmrCtrl; pConn->tmrCtrl = pSet->tmrCtrl;
} }
if (pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn) != 0) {
tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno));
taosCloseSocket(pConn->fd);
taosCleanUpUdpConnection(pSet);
return NULL;
}
++pSet->threads; ++pSet->threads;
} }
...@@ -164,6 +171,7 @@ void taosCleanUpUdpConnection(void *handle) { ...@@ -164,6 +171,7 @@ void taosCleanUpUdpConnection(void *handle) {
for (int i = 0; i < pSet->threads; ++i) { for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i; pConn = pSet->udpConn + i;
pConn->signature = NULL; pConn->signature = NULL;
free(pConn->buffer);
pthread_cancel(pConn->thread); pthread_cancel(pConn->thread);
taosCloseSocket(pConn->fd); taosCloseSocket(pConn->fd);
if (pConn->hash) { if (pConn->hash) {
...@@ -210,7 +218,7 @@ static void *taosRecvUdpData(void *param) { ...@@ -210,7 +218,7 @@ static void *taosRecvUdpData(void *param) {
tTrace("%s UDP thread is created, index:%d", pConn->label, pConn->index); tTrace("%s UDP thread is created, index:%d", pConn->label, pConn->index);
while (1) { while (1) {
dataLen = recvfrom(pConn->fd, pConn->buffer, sizeof(pConn->buffer), 0, (struct sockaddr *)&sourceAdd, &addLen); dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, ntohs(sourceAdd.sin_port), tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, ntohs(sourceAdd.sin_port),
dataLen); dataLen);
...@@ -235,9 +243,15 @@ static void *taosRecvUdpData(void *param) { ...@@ -235,9 +243,15 @@ static void *taosRecvUdpData(void *param) {
break; break;
} }
char *data = malloc((size_t)msgLen); char *tmsg = malloc((size_t)msgLen + tsRpcOverhead);
memcpy(data, msg, (size_t)msgLen); if (NULL == tmsg) {
recvInfo.msg = data; tError("%s failed to allocate memory, size:%d", pConn->label, msgLen);
break;
}
tmsg += tsRpcOverhead; // overhead for SRpcReqContext
memcpy(tmsg, msg, (size_t)msgLen);
recvInfo.msg = tmsg;
recvInfo.msgLen = msgLen; recvInfo.msgLen = msgLen;
recvInfo.ip = sourceAdd.sin_addr.s_addr; recvInfo.ip = sourceAdd.sin_addr.s_addr;
recvInfo.port = port; recvInfo.port = port;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册