diff --git a/src/rpc/inc/rpcHead.h b/src/rpc/inc/rpcHead.h index 09c66d33ae34cd5374994d3615930d330ec99660..9bbcd60fc4d386b9afc1b52c82b9cbe645193f7a 100644 --- a/src/rpc/inc/rpcHead.h +++ b/src/rpc/inc/rpcHead.h @@ -26,6 +26,8 @@ extern "C" { #define RPC_CONN_TCPC 3 #define RPC_CONN_TCP 2 +extern int tsRpcOverhead; + typedef struct { void *msg; int msgLen; diff --git a/src/rpc/inc/tqueue.h b/src/rpc/inc/tqueue.h new file mode 100644 index 0000000000000000000000000000000000000000..09a25e7e93af8d57a320db865ff90e0cb54fa671 --- /dev/null +++ b/src/rpc/inc/tqueue.h @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TSCHED_H +#define TDENGINE_TSCHED_H + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct _sched_msg { + void *msg; + int msgLen; + int8_t type; + int32_t code; + void *handle; +} SRpcMsg; + +void *taosInitMsgQueue(int queueSize, void (*fp)(int num, SRpcMsg *), const char *label); +int taosPutIntoMsgQueue(void *qhandle, SRpcMsg *pMsg); +void taosCleanUpMsgQueue(void *param); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_TSCHED_H diff --git a/src/rpc/src/rpcClient.c b/src/rpc/src/rpcClient.c index b616dae82f17bdf910ccbb405ddd84c0973eb45d..b362b1ba4494ad6dc0ba1b077f10ad06d3c18afb 100644 --- a/src/rpc/src/rpcClient.c +++ b/src/rpc/src/rpcClient.c @@ -236,6 +236,7 @@ static void *taosReadTcpData(void *param) { STcpFd *pFdObj; struct epoll_event events[maxTcpEvents]; SRecvInfo recvInfo; + SRpcHead rpcHead; while (1) { pthread_mutex_lock(&pTcp->mutex); @@ -260,37 +261,24 @@ static void *taosReadTcpData(void *param) { continue; } - void *buffer = malloc(1024); - if (NULL == buffer) { - tTrace("%s TCP malloc(size:1024) fail\n", pTcp->label); - taosCleanUpTcpFdObj(pFdObj); - continue; - } - - int headLen = taosReadMsg(pFdObj->fd, buffer, sizeof(SRpcHead)); + int headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead)); if (headLen != sizeof(SRpcHead)) { tError("%s read error, headLen:%d", pTcp->label, headLen); - tfree(buffer); taosCleanUpTcpFdObj(pFdObj); continue; } - int dataLen = (int32_t)htonl((uint32_t)((SRpcHead *)buffer)->msgLen); - if (dataLen > 1024) { - void *b = realloc(buffer, (size_t)dataLen); - if (NULL == b) { - tTrace("%s TCP malloc(size:%d) fail\n", pTcp->label, dataLen); - tfree(buffer); - taosCleanUpTcpFdObj(pFdObj); - continue; - } - buffer = b; + int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); + char *buffer = (char *)malloc((size_t)msgLen + tsRpcOverhead); + if (NULL == buffer) { + tTrace("%s TCP malloc(size:%d) fail\n", pTcp->label, msgLen); + taosCleanUpTcpFdObj(pFdObj); + continue; } - int leftLen = dataLen - headLen; - int retLen = taosReadMsg(pFdObj->fd, buffer + headLen, leftLen); - - // tTrace("%s TCP data is received, ip:%s port:%u len:%d", pTcp->label, pFdObj->ipstr, pFdObj->port, dataLen); + char *msg = buffer + tsRpcOverhead; + int32_t leftLen = msgLen - headLen; + int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen); if (leftLen != retLen) { tError("%s read error, leftLen:%d retLen:%d", pTcp->label, leftLen, retLen); @@ -299,8 +287,11 @@ static void *taosReadTcpData(void *param) { continue; } - recvInfo.msg = buffer; - recvInfo.msgLen = dataLen; + // tTrace("%s TCP data is received, ip:%s:%u len:%d", pTcp->label, pFdObj->ipstr, pFdObj->port, msgLen); + + memcpy(msg, &rpcHead, sizeof(SRpcHead)); + recvInfo.msg = msg; + recvInfo.msgLen = msgLen; recvInfo.ip = pFdObj->ip; recvInfo.port = pFdObj->port; recvInfo.shandle = pTcp->shandle; diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 93cd1cbe796196ea0e385cf2ff370f1bf064f197..247e21a4cf7bffce568ba6c82c649f3db1fa6f8c 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -126,6 +126,7 @@ int tsRpcProgressTime = 10; // milliseocnds // not configurable int tsRpcMaxRetry; int tsRpcHeadSize; +int tsRpcOverhead; // server:0 client:1 tcp:2 udp:0 #define RPC_CONN_UDPS 0 @@ -188,7 +189,7 @@ static void rpcProcessRetryTimer(void *, void *); static void rpcProcessIdleTimer(void *param, void *tmrId); static void rpcProcessProgressTimer(void *param, void *tmrId); -static void rpcFreeOutMsg(void *msg); +static void rpcFreeMsg(void *msg); static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead); static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen); @@ -201,6 +202,7 @@ void *rpcOpen(SRpcInit *pInit) { tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsRpcProgressTime; tsRpcHeadSize = RPC_MSG_OVERHEAD; + tsRpcOverhead = sizeof(SRpcReqContext); pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo)); if (pRpc == NULL) return NULL; @@ -313,8 +315,8 @@ void *rpcMallocCont(int contLen) { void rpcFreeCont(void *cont) { if ( cont ) { - char *msg = ((char *)cont) - sizeof(SRpcHead); - free(msg); + char *temp = ((char *)cont) - sizeof(SRpcHead) - sizeof(SRpcReqContext); + free(temp); } } @@ -351,7 +353,7 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, in pContext->oldInUse = pIpSet->inUse; pContext->connType = RPC_CONN_UDPC; - if (contLen > 16000) pContext->connType = RPC_CONN_TCPC; + if (contLen > tsRpcMaxUdpSize) pContext->connType = RPC_CONN_TCPC; // connection type is application specific. // for TDengine, all the query, show commands shall have TCP connection @@ -406,7 +408,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { pConn->inType = 0; // response message is released until new response is sent - rpcFreeOutMsg(pConn->pRspMsg); + rpcFreeMsg(pConn->pRspMsg); pConn->pRspMsg = msg; pConn->rspMsgLen = msgLen; if (pHead->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; @@ -444,6 +446,13 @@ void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { strcpy(pInfo->user, pConn->user); } +static void rpcFreeMsg(void *msg) { + if ( msg ) { + char *temp = (char *)msg - sizeof(SRpcReqContext); + free(temp); + } +} + static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort, int8_t connType) { SRpcConn *pConn; @@ -492,7 +501,7 @@ static void rpcCloseConn(void *thandle) { char hashstr[40] = {0}; sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->peerUid, pConn->peerId, pConn->connType); taosDeleteStrHash(pRpc->hash, hashstr); - rpcFreeOutMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg + rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg pConn->pRspMsg = NULL; pConn->inType = 0; pConn->inTranId = 0; @@ -804,7 +813,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { pConn = rpcProcessMsgHead(pRpc, pRecv); if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { - tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d port:%hu", + tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d sig:0x%08x:0x%08x:%d", pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port); } @@ -824,7 +833,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { } } - if ( terrno ) free (pRecv->msg); + if (terrno) rpcFreeMsg(pRecv->msg); return pConn; } @@ -858,7 +867,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { if ( pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect) ) (*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet (*pRpc->cfp)(pHead->msgType, pCont, contLen, pContext->ahandle, code); - rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg + rpcFreeCont(pContext->pCont); // free the request msg } } } @@ -972,12 +981,12 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { if ( rpcIsReq(pHead->msgType)) { if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) - tTrace("%s %p, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d", + tTrace("%s %p, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } else { if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) - tTrace( "%s %p, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d", + tTrace( "%s %p, %s is sent to %s:%hu, code:%u len:%d sig:0x%08x:0x%08x:%d", pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort, (uint8_t)pHead->content[0], msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } @@ -999,8 +1008,8 @@ static void rpcProcessConnError(void *param, void *id) { tTrace("%s connection error happens", pRpc->label); if ( pContext->numOfTry >= pContext->ipSet.numOfIps ) { - rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg (*(pRpc->cfp))(pContext->msgType+1, NULL, 0, pContext->ahandle, pContext->code); + rpcFreeCont(pContext->pCont); // free the request msg } else { // move to next IP pContext->ipSet.inUse++; @@ -1130,7 +1139,8 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { int contLen = htonl(pComp->contLen); // prepare the temporary buffer to decompress message - pNewHead = (SRpcHead *)malloc(contLen + RPC_MSG_OVERHEAD); + char *temp = (char *)malloc(contLen + RPC_MSG_OVERHEAD); + pNewHead = (SRpcHead *)(temp + sizeof(SRpcReqContext)); // reserve SRpcReqContext if (pNewHead) { int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead; @@ -1139,7 +1149,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { memcpy(pNewHead, pHead, sizeof(SRpcHead)); pNewHead->msgLen = rpcMsgLenFromCont(origLen); - free(pHead); // free the compressed message buffer + rpcFreeMsg(pHead); // free the compressed message buffer pHead = pNewHead; tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen); } else { diff --git a/src/rpc/src/rpcServer.c b/src/rpc/src/rpcServer.c index 010a9ae69bbc7b1794d25e277d5e8266cc140f3f..1aadabc5f73f3a124261ca5af1c794dbd297d9d3 100644 --- a/src/rpc/src/rpcServer.c +++ b/src/rpc/src/rpcServer.c @@ -191,8 +191,9 @@ static void taosProcessTcpData(void *param) { int i, fdNum; SFdObj * pFdObj; struct epoll_event events[maxEvents]; - SRecvInfo recvInfo; + SRecvInfo recvInfo; pThreadObj = (SThreadObj *)param; + SRpcHead rpcHead; while (1) { pthread_mutex_lock(&pThreadObj->threadMutex); @@ -219,24 +220,24 @@ static void taosProcessTcpData(void *param) { continue; } - void *buffer = malloc(1024); - int headLen = taosReadMsg(pFdObj->fd, buffer, sizeof(SRpcHead)); - + int32_t headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead)); if (headLen != sizeof(SRpcHead)) { tError("%s read error, headLen:%d, errno:%d", pThreadObj->label, headLen, errno); taosCleanUpFdObj(pFdObj); - tfree(buffer); continue; } - int dataLen = (int32_t)htonl((uint32_t)((SRpcHead *)buffer)->msgLen); - if (dataLen > 1024) buffer = realloc(buffer, (size_t)dataLen); - - int leftLen = dataLen - headLen; - int retLen = taosReadMsg(pFdObj->fd, buffer + headLen, leftLen); + int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); + char *buffer = malloc(msgLen + tsRpcOverhead); + if ( NULL == buffer) { + tError("%s TCP malloc(size:%d) fail\n", pThreadObj->label, msgLen); + taosCleanUpFdObj(pFdObj); + continue; + } - // tTrace("%s TCP data is received, ip:%s port:%u len:%d", - // pThreadObj->label, pFdObj->ipstr, pFdObj->port, dataLen); + char *msg = buffer + tsRpcOverhead; + int32_t leftLen = msgLen - headLen; + int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen); if (leftLen != retLen) { tError("%s read error, leftLen:%d retLen:%d", pThreadObj->label, leftLen, retLen); @@ -245,8 +246,11 @@ static void taosProcessTcpData(void *param) { continue; } - recvInfo.msg = buffer; - recvInfo.msgLen = dataLen; + // tTrace("%s TCP data is received, ip:%s:%u len:%d", pTcp->label, pFdObj->ipstr, pFdObj->port, msgLen); + + memcpy(msg, &rpcHead, sizeof(SRpcHead)); + recvInfo.msg = msg; + recvInfo.msgLen = msgLen; recvInfo.ip = pFdObj->ip; recvInfo.port = pFdObj->port; recvInfo.shandle = pThreadObj->shandle; diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 245a98b2c7383c72629b64d179ce535bfb870faf..471b1bcc723b8e1557b1ab158a7570c39bf1f466 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -40,12 +40,12 @@ typedef struct { char label[12]; // copy from udpConnSet; pthread_t thread; pthread_mutex_t mutex; - void * tmrCtrl; // copy from UdpConnSet; - void * hash; - void * shandle; // handle passed by upper layer during server initialization - void * pSet; - void *(*processData)(SRecvInfo *pRecv); - char buffer[RPC_MAX_UDP_SIZE]; // buffer to receive data + void *tmrCtrl; // copy from UdpConnSet; + void *hash; + void *shandle; // handle passed by upper layer during server initialization + void *pSet; + void *(*processData)(SRecvInfo *pRecv); + char *buffer; // buffer to receive data } SUdpConn; typedef struct { @@ -96,15 +96,15 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v pSet->fp = fp; strcpy(pSet->label, label); - // if ( tsUdpDelay ) { - char udplabel[12]; - sprintf(udplabel, "%s.b", label); - pSet->tmrCtrl = taosTmrInit(RPC_MAX_UDP_CONNS * threads, 5, 5000, udplabel); - if (pSet->tmrCtrl == NULL) { - tError("%s failed to initialize tmrCtrl") taosCleanUpUdpConnection(pSet); - return NULL; + if ( tsUdpDelay ) { + char udplabel[12]; + sprintf(udplabel, "%s.b", label); + pSet->tmrCtrl = taosTmrInit(RPC_MAX_UDP_CONNS * threads, 5, 5000, udplabel); + if (pSet->tmrCtrl == NULL) { + tError("%s failed to initialize tmrCtrl") taosCleanUpUdpConnection(pSet); + return NULL; + } } - // } pthread_attr_init(&thAttr); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); @@ -120,6 +120,13 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v return NULL; } + pConn->buffer = malloc(RPC_MAX_UDP_SIZE); + if (NULL == pConn->buffer) { + tError("%s failed to malloc recv buffer", label); + taosCleanUpUdpConnection(pSet); + return NULL; + } + struct sockaddr_in sin; unsigned int addrlen = sizeof(sin); if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET && @@ -128,14 +135,6 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v } strcpy(pConn->label, label); - - if (pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn) != 0) { - tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno)); - taosCloseSocket(pConn->fd); - taosCleanUpUdpConnection(pSet); - return NULL; - } - pConn->shandle = shandle; pConn->processData = fp; pConn->index = i; @@ -146,6 +145,14 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v pthread_mutex_init(&pConn->mutex, NULL); pConn->tmrCtrl = pSet->tmrCtrl; } + + if (pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn) != 0) { + tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno)); + taosCloseSocket(pConn->fd); + taosCleanUpUdpConnection(pSet); + return NULL; + } + ++pSet->threads; } @@ -164,6 +171,7 @@ void taosCleanUpUdpConnection(void *handle) { for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; pConn->signature = NULL; + free(pConn->buffer); pthread_cancel(pConn->thread); taosCloseSocket(pConn->fd); if (pConn->hash) { @@ -210,7 +218,7 @@ static void *taosRecvUdpData(void *param) { tTrace("%s UDP thread is created, index:%d", pConn->label, pConn->index); while (1) { - dataLen = recvfrom(pConn->fd, pConn->buffer, sizeof(pConn->buffer), 0, (struct sockaddr *)&sourceAdd, &addLen); + dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen); tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, ntohs(sourceAdd.sin_port), dataLen); @@ -235,9 +243,15 @@ static void *taosRecvUdpData(void *param) { break; } - char *data = malloc((size_t)msgLen); - memcpy(data, msg, (size_t)msgLen); - recvInfo.msg = data; + char *tmsg = malloc((size_t)msgLen + tsRpcOverhead); + if (NULL == tmsg) { + tError("%s failed to allocate memory, size:%d", pConn->label, msgLen); + break; + } + + tmsg += tsRpcOverhead; // overhead for SRpcReqContext + memcpy(tmsg, msg, (size_t)msgLen); + recvInfo.msg = tmsg; recvInfo.msgLen = msgLen; recvInfo.ip = sourceAdd.sin_addr.s_addr; recvInfo.port = port; diff --git a/src/rpc/src/tqueue.c b/src/rpc/src/tqueue.c new file mode 100644 index 0000000000000000000000000000000000000000..2f6f9ac106df98c6d5da0092ddf42d1a50d19cac --- /dev/null +++ b/src/rpc/src/tqueue.c @@ -0,0 +1,190 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "tlog.h" +#include "tqueue.h" + +#define DUMP_SCHEDULER_TIME_WINDOW 30000 //every 30sec, take a snap shot of task queue. + +typedef struct { + char label[16]; + int num; + tsem_t emptySem; + tsem_t fullSem; + pthread_mutex_t queueMutex; + int fullSlot; + int emptySlot; + int queueSize; + SRpcMsg *queue; + SRpcMsg *oqueue; + pthread_t qthread; + void (*fp)(int num, SRpcMsg *); +} SRpcQueue; + +static void *taosProcessMsgQueue(void *param); + +void *taosInitMsgQueue(int queueSize, void (*fp)(int num, SRpcMsg *), const char *label) { + pthread_attr_t attr; + SRpcQueue * pQueue = (SRpcQueue *)malloc(sizeof(SRpcQueue)); + if (pQueue == NULL) { + pError("%s: no enough memory for pQueue, reason: %s", label, strerror(errno)); + goto _error; + } + + memset(pQueue, 0, sizeof(SRpcQueue)); + pQueue->queueSize = queueSize; + strncpy(pQueue->label, label, sizeof(pQueue->label)); // fix buffer overflow + pQueue->label[sizeof(pQueue->label)-1] = '\0'; + pQueue->fp = fp; + + if (pthread_mutex_init(&pQueue->queueMutex, NULL) < 0) { + pError("init %s:queueMutex failed, reason:%s", pQueue->label, strerror(errno)); + goto _error; + } + + if (tsem_init(&pQueue->emptySem, 0, (unsigned int)pQueue->queueSize) != 0) { + pError("init %s:empty semaphore failed, reason:%s", pQueue->label, strerror(errno)); + goto _error; + } + + if (tsem_init(&pQueue->fullSem, 0, 0) != 0) { + pError("init %s:full semaphore failed, reason:%s", pQueue->label, strerror(errno)); + goto _error; + } + + if ((pQueue->queue = (SRpcMsg *)malloc((size_t)pQueue->queueSize * sizeof(SRpcMsg))) == NULL) { + pError("%s: no enough memory for queue, reason:%s", pQueue->label, strerror(errno)); + goto _error; + } + + memset(pQueue->queue, 0, (size_t)pQueue->queueSize * sizeof(SRpcMsg)); + + if ((pQueue->oqueue = (SRpcMsg *)malloc((size_t)pQueue->queueSize * sizeof(SRpcMsg))) == NULL) { + pError("%s: no enough memory for queue, reason:%s", pQueue->label, strerror(errno)); + goto _error; + } + + memset(pQueue->oqueue, 0, (size_t)pQueue->queueSize * sizeof(SRpcMsg)); + + pQueue->fullSlot = 0; + pQueue->fullSlot = 0; + pQueue->emptySlot = 0; + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&pQueue->qthread, &attr, taosProcessMsgQueue, (void *)pQueue) != 0) { + pError("%s: failed to create taos thread, reason:%s", pQueue->label, strerror(errno)); + goto _error; + } + + pTrace("%s RPC msg queue is initialized", pQueue->label); + + return (void *)pQueue; + +_error: + taosCleanUpMsgQueue(pQueue); + return NULL; +} + +void *taosProcessMsgQueue(void *param) { + SRpcQueue *pQueue = (SRpcQueue *)param; + int num = 0; + + while (1) { + if (tsem_wait(&pQueue->fullSem) != 0) { + if (errno == EINTR) { + /* sem_wait is interrupted by interrupt, ignore and continue */ + pTrace("wait %s fullSem was interrupted", pQueue->label); + continue; + } + pError("wait %s fullSem failed, errno:%d, reason:%s", pQueue->label, errno, strerror(errno)); + } + + if (pthread_mutex_lock(&pQueue->queueMutex) != 0) + pError("lock %s queueMutex failed, reason:%s", pQueue->label, strerror(errno)); + + num = 0; + do { + pQueue->oqueue[num] = pQueue->queue[pQueue->fullSlot]; + pQueue->fullSlot = (pQueue->fullSlot + 1) % pQueue->queueSize; + ++num; + pQueue->num--; + } while (pQueue->fullSlot != pQueue->emptySlot); + + if (pthread_mutex_unlock(&pQueue->queueMutex) != 0) + pError("unlock %s queueMutex failed, reason:%s\n", pQueue->label, strerror(errno)); + + for (int i= 0; iemptySem) != 0) + pError("post %s emptySem failed, reason:%s\n", pQueue->label, strerror(errno)); + } + + for (int i=0; ifullSem) != 0) + pError("wait %s fullSem failed, reason:%s\n", pQueue->label, strerror(errno)); + } + + (*pQueue->fp)(num, pQueue->oqueue); + + } + + return NULL; +} + +int taosPutIntoMsgQueue(void *qhandle, SRpcMsg *pMsg) { + SRpcQueue *pQueue = (SRpcQueue *)qhandle; + if (pQueue == NULL) { + pError("sched is not ready, msg:%p is dropped", pMsg); + return 0; + } + + while (tsem_wait(&pQueue->emptySem) != 0) { + if (errno != EINTR) { + pError("wait %s emptySem failed, reason:%s", pQueue->label, strerror(errno)); + break; + } + } + + if (pthread_mutex_lock(&pQueue->queueMutex) != 0) + pError("lock %s queueMutex failed, reason:%s", pQueue->label, strerror(errno)); + + pQueue->queue[pQueue->emptySlot] = *pMsg; + pQueue->emptySlot = (pQueue->emptySlot + 1) % pQueue->queueSize; + pQueue->num++; + + if (pthread_mutex_unlock(&pQueue->queueMutex) != 0) + pError("unlock %s queueMutex failed, reason:%s", pQueue->label, strerror(errno)); + + if (tsem_post(&pQueue->fullSem) != 0) pError("post %s fullSem failed, reason:%s", pQueue->label, strerror(errno)); + + return 0; +} + +void taosCleanUpMsgQueue(void *param) { + SRpcQueue *pQueue = (SRpcQueue *)param; + if (pQueue == NULL) return; + + pthread_cancel(pQueue->qthread); + + tsem_destroy(&pQueue->emptySem); + tsem_destroy(&pQueue->fullSem); + pthread_mutex_destroy(&pQueue->queueMutex); + + free(pQueue->queue); + free(pQueue); +} + diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c index 1b5e1b6ee7adbc10a93ff3eeb53639bb8af67cd6..7e418444fc44ac47ef5767fcba8b9b8cfd6e638e 100644 --- a/src/rpc/test/rserver.c +++ b/src/rpc/test/rserver.c @@ -17,21 +17,35 @@ #include "os.h" #include "tlog.h" #include "trpc.h" +#include "tqueue.h" #include int msgSize = 128; int commit = 0; int dataFd = -1; +void *qhandle = NULL; -void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32_t code) { +void processShellMsg(int numOfMsgs, SRpcMsg *pMsg) { static int num = 0; - tTrace("request is received, type:%d, contLen:%d", type, contLen); - if (dataFd >=0) - write(dataFd, pCont, contLen); + tTrace("%d shell msgs are received", numOfMsgs); + for (int i=0; i=0) { + if ( write(dataFd, pMsg->msg, pMsg->msgLen) <0 ) { + tPrint("failed to write data file, reason:%s", strerror(errno)); + } + } + + void *rsp = rpcMallocCont(msgSize); + rpcSendResponse(pMsg->handle, 1, rsp, msgSize); + rpcFreeCont(pMsg->msg); + pMsg++; + } + if (commit >=2) { - ++num; + num += numOfMsgs; if ( fsync(dataFd) < 0 ) { tPrint("failed to flush data to file, reason:%s", strerror(errno)); } @@ -41,9 +55,6 @@ void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32 } } - void *rsp = rpcMallocCont(msgSize); - - rpcSendResponse(thandle, 1, rsp, msgSize); /* SRpcIpSet ipSet; @@ -55,7 +66,17 @@ void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32 rpcSendRedirectRsp(ahandle, &ipSet); */ - rpcFreeCont(pCont); +} + +void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32_t code) { + tTrace("request is received, type:%d, contLen:%d", type, contLen); + SRpcMsg rpcMsg; + rpcMsg.msg = pCont; + rpcMsg.msgLen = contLen; + rpcMsg.code = code; + rpcMsg.handle = thandle; + rpcMsg.type = type; + taosPutIntoMsgQueue(qhandle, &rpcMsg); } int main(int argc, char *argv[]) { @@ -88,6 +109,7 @@ int main(int argc, char *argv[]) { commit = atoi(argv[++i]); } else if (strcmp(argv[i], "-d")==0 && i < argc-1) { rpcDebugFlag = atoi(argv[++i]); + uDebugFlag = rpcDebugFlag; } else { printf("\nusage: %s [options] \n", argv[0]); printf(" [-i ip]: server IP address, default is:%s\n", rpcInit.localIp); @@ -117,11 +139,13 @@ int main(int argc, char *argv[]) { tPrint("RPC server is running, ctrl-c to exit"); if (commit) { - dataFd = open(dataName, O_APPEND | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); + dataFd = open(dataName, O_APPEND | O_CREAT | O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO); if (dataFd<0) tPrint("failed to open data file, reason:%s", strerror(errno)); } + qhandle = taosInitMsgQueue(1000, processShellMsg, "SER"); + // loop forever while(1) { sleep(1); diff --git a/src/util/inc/tglobalcfg.h b/src/util/inc/tglobalcfg.h index 067eb389ecb1f889bda583e6f1fd36cf7acc45ab..018f5dbcbb75b8a7ba7577f36bdf88f43c7b7e3b 100644 --- a/src/util/inc/tglobalcfg.h +++ b/src/util/inc/tglobalcfg.h @@ -178,6 +178,7 @@ extern uint32_t taosMaxTmrCtrl; extern int tsRpcTimer; extern int tsRpcMaxTime; +extern int tsRpcMaxUdpSize; extern int tsUdpDelay; extern char version[]; extern char compatible_version[]; diff --git a/src/util/src/tglobalcfg.c b/src/util/src/tglobalcfg.c index 73cb2bd24b3ca7c16b141aa5804530be233ea6f5..cdb8d7c8f2f93821342cf052728c0335b52090b8 100644 --- a/src/util/src/tglobalcfg.c +++ b/src/util/src/tglobalcfg.c @@ -199,6 +199,7 @@ int tsIsCluster = 0; int tsRpcTimer = 300; int tsRpcMaxTime = 600; // seconds; +int tsRpcMaxUdpSize = 15000; // bytes char tsMonitorDbName[TSDB_DB_NAME_LEN] = "log"; int tsMonitorInterval = 30; // seconds