未验证 提交 693f9f2b 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1280 from taosdata/refact/rpc

Refact/rpc
......@@ -26,6 +26,8 @@ extern "C" {
#define RPC_CONN_TCPC 3
#define RPC_CONN_TCP 2
extern int tsRpcOverhead;
typedef struct {
void *msg;
int msgLen;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCHED_H
#define TDENGINE_TSCHED_H
#ifdef __cplusplus
extern "C" {
#endif
typedef struct _sched_msg {
void *msg;
int msgLen;
int8_t type;
int32_t code;
void *handle;
} SRpcMsg;
void *taosInitMsgQueue(int queueSize, void (*fp)(int num, SRpcMsg *), const char *label);
int taosPutIntoMsgQueue(void *qhandle, SRpcMsg *pMsg);
void taosCleanUpMsgQueue(void *param);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCHED_H
......@@ -236,6 +236,7 @@ static void *taosReadTcpData(void *param) {
STcpFd *pFdObj;
struct epoll_event events[maxTcpEvents];
SRecvInfo recvInfo;
SRpcHead rpcHead;
while (1) {
pthread_mutex_lock(&pTcp->mutex);
......@@ -260,37 +261,24 @@ static void *taosReadTcpData(void *param) {
continue;
}
void *buffer = malloc(1024);
if (NULL == buffer) {
tTrace("%s TCP malloc(size:1024) fail\n", pTcp->label);
taosCleanUpTcpFdObj(pFdObj);
continue;
}
int headLen = taosReadMsg(pFdObj->fd, buffer, sizeof(SRpcHead));
int headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
if (headLen != sizeof(SRpcHead)) {
tError("%s read error, headLen:%d", pTcp->label, headLen);
tfree(buffer);
taosCleanUpTcpFdObj(pFdObj);
continue;
}
int dataLen = (int32_t)htonl((uint32_t)((SRpcHead *)buffer)->msgLen);
if (dataLen > 1024) {
void *b = realloc(buffer, (size_t)dataLen);
if (NULL == b) {
tTrace("%s TCP malloc(size:%d) fail\n", pTcp->label, dataLen);
tfree(buffer);
taosCleanUpTcpFdObj(pFdObj);
continue;
}
buffer = b;
int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
char *buffer = (char *)malloc((size_t)msgLen + tsRpcOverhead);
if (NULL == buffer) {
tTrace("%s TCP malloc(size:%d) fail\n", pTcp->label, msgLen);
taosCleanUpTcpFdObj(pFdObj);
continue;
}
int leftLen = dataLen - headLen;
int retLen = taosReadMsg(pFdObj->fd, buffer + headLen, leftLen);
// tTrace("%s TCP data is received, ip:%s port:%u len:%d", pTcp->label, pFdObj->ipstr, pFdObj->port, dataLen);
char *msg = buffer + tsRpcOverhead;
int32_t leftLen = msgLen - headLen;
int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
if (leftLen != retLen) {
tError("%s read error, leftLen:%d retLen:%d", pTcp->label, leftLen, retLen);
......@@ -299,8 +287,11 @@ static void *taosReadTcpData(void *param) {
continue;
}
recvInfo.msg = buffer;
recvInfo.msgLen = dataLen;
// tTrace("%s TCP data is received, ip:%s:%u len:%d", pTcp->label, pFdObj->ipstr, pFdObj->port, msgLen);
memcpy(msg, &rpcHead, sizeof(SRpcHead));
recvInfo.msg = msg;
recvInfo.msgLen = msgLen;
recvInfo.ip = pFdObj->ip;
recvInfo.port = pFdObj->port;
recvInfo.shandle = pTcp->shandle;
......
......@@ -126,6 +126,7 @@ int tsRpcProgressTime = 10; // milliseocnds
// not configurable
int tsRpcMaxRetry;
int tsRpcHeadSize;
int tsRpcOverhead;
// server:0 client:1 tcp:2 udp:0
#define RPC_CONN_UDPS 0
......@@ -188,7 +189,7 @@ static void rpcProcessRetryTimer(void *, void *);
static void rpcProcessIdleTimer(void *param, void *tmrId);
static void rpcProcessProgressTimer(void *param, void *tmrId);
static void rpcFreeOutMsg(void *msg);
static void rpcFreeMsg(void *msg);
static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead);
static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen);
......@@ -201,6 +202,7 @@ void *rpcOpen(SRpcInit *pInit) {
tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsRpcProgressTime;
tsRpcHeadSize = RPC_MSG_OVERHEAD;
tsRpcOverhead = sizeof(SRpcReqContext);
pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
if (pRpc == NULL) return NULL;
......@@ -313,8 +315,8 @@ void *rpcMallocCont(int contLen) {
void rpcFreeCont(void *cont) {
if ( cont ) {
char *msg = ((char *)cont) - sizeof(SRpcHead);
free(msg);
char *temp = ((char *)cont) - sizeof(SRpcHead) - sizeof(SRpcReqContext);
free(temp);
}
}
......@@ -351,7 +353,7 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, in
pContext->oldInUse = pIpSet->inUse;
pContext->connType = RPC_CONN_UDPC;
if (contLen > 16000) pContext->connType = RPC_CONN_TCPC;
if (contLen > tsRpcMaxUdpSize) pContext->connType = RPC_CONN_TCPC;
// connection type is application specific.
// for TDengine, all the query, show commands shall have TCP connection
......@@ -406,7 +408,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
pConn->inType = 0;
// response message is released until new response is sent
rpcFreeOutMsg(pConn->pRspMsg);
rpcFreeMsg(pConn->pRspMsg);
pConn->pRspMsg = msg;
pConn->rspMsgLen = msgLen;
if (pHead->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--;
......@@ -444,6 +446,13 @@ void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
strcpy(pInfo->user, pConn->user);
}
static void rpcFreeMsg(void *msg) {
if ( msg ) {
char *temp = (char *)msg - sizeof(SRpcReqContext);
free(temp);
}
}
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort, int8_t connType) {
SRpcConn *pConn;
......@@ -492,7 +501,7 @@ static void rpcCloseConn(void *thandle) {
char hashstr[40] = {0};
sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->peerUid, pConn->peerId, pConn->connType);
taosDeleteStrHash(pRpc->hash, hashstr);
rpcFreeOutMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg
rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg
pConn->pRspMsg = NULL;
pConn->inType = 0;
pConn->inTranId = 0;
......@@ -804,7 +813,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
pConn = rpcProcessMsgHead(pRpc, pRecv);
if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) {
tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d port:%hu",
tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d sig:0x%08x:0x%08x:%d",
pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno,
pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port);
}
......@@ -824,7 +833,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
}
}
if ( terrno ) free (pRecv->msg);
if (terrno) rpcFreeMsg(pRecv->msg);
return pConn;
}
......@@ -858,7 +867,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
if ( pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect) )
(*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet
(*pRpc->cfp)(pHead->msgType, pCont, contLen, pContext->ahandle, code);
rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg
rpcFreeCont(pContext->pCont); // free the request msg
}
}
}
......@@ -972,12 +981,12 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
if ( rpcIsReq(pHead->msgType)) {
if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
tTrace("%s %p, %s is sent to %s:%hu, len:%d source:0x%08x dest:0x%08x tranId:%d",
tTrace("%s %p, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d",
pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr,
pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
} else {
if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16))
tTrace( "%s %p, %s is sent to %s:%hu, code:%u len:%d source:0x%08x dest:0x%08x tranId:%d",
tTrace( "%s %p, %s is sent to %s:%hu, code:%u len:%d sig:0x%08x:0x%08x:%d",
pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort,
(uint8_t)pHead->content[0], msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
}
......@@ -999,8 +1008,8 @@ static void rpcProcessConnError(void *param, void *id) {
tTrace("%s connection error happens", pRpc->label);
if ( pContext->numOfTry >= pContext->ipSet.numOfIps ) {
rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg
(*(pRpc->cfp))(pContext->msgType+1, NULL, 0, pContext->ahandle, pContext->code);
rpcFreeCont(pContext->pCont); // free the request msg
} else {
// move to next IP
pContext->ipSet.inUse++;
......@@ -1130,7 +1139,8 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
int contLen = htonl(pComp->contLen);
// prepare the temporary buffer to decompress message
pNewHead = (SRpcHead *)malloc(contLen + RPC_MSG_OVERHEAD);
char *temp = (char *)malloc(contLen + RPC_MSG_OVERHEAD);
pNewHead = (SRpcHead *)(temp + sizeof(SRpcReqContext)); // reserve SRpcReqContext
if (pNewHead) {
int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead;
......@@ -1139,7 +1149,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
memcpy(pNewHead, pHead, sizeof(SRpcHead));
pNewHead->msgLen = rpcMsgLenFromCont(origLen);
free(pHead); // free the compressed message buffer
rpcFreeMsg(pHead); // free the compressed message buffer
pHead = pNewHead;
tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen);
} else {
......
......@@ -191,8 +191,9 @@ static void taosProcessTcpData(void *param) {
int i, fdNum;
SFdObj * pFdObj;
struct epoll_event events[maxEvents];
SRecvInfo recvInfo;
SRecvInfo recvInfo;
pThreadObj = (SThreadObj *)param;
SRpcHead rpcHead;
while (1) {
pthread_mutex_lock(&pThreadObj->threadMutex);
......@@ -219,24 +220,24 @@ static void taosProcessTcpData(void *param) {
continue;
}
void *buffer = malloc(1024);
int headLen = taosReadMsg(pFdObj->fd, buffer, sizeof(SRpcHead));
int32_t headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
if (headLen != sizeof(SRpcHead)) {
tError("%s read error, headLen:%d, errno:%d", pThreadObj->label, headLen, errno);
taosCleanUpFdObj(pFdObj);
tfree(buffer);
continue;
}
int dataLen = (int32_t)htonl((uint32_t)((SRpcHead *)buffer)->msgLen);
if (dataLen > 1024) buffer = realloc(buffer, (size_t)dataLen);
int leftLen = dataLen - headLen;
int retLen = taosReadMsg(pFdObj->fd, buffer + headLen, leftLen);
int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
char *buffer = malloc(msgLen + tsRpcOverhead);
if ( NULL == buffer) {
tError("%s TCP malloc(size:%d) fail\n", pThreadObj->label, msgLen);
taosCleanUpFdObj(pFdObj);
continue;
}
// tTrace("%s TCP data is received, ip:%s port:%u len:%d",
// pThreadObj->label, pFdObj->ipstr, pFdObj->port, dataLen);
char *msg = buffer + tsRpcOverhead;
int32_t leftLen = msgLen - headLen;
int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
if (leftLen != retLen) {
tError("%s read error, leftLen:%d retLen:%d", pThreadObj->label, leftLen, retLen);
......@@ -245,8 +246,11 @@ static void taosProcessTcpData(void *param) {
continue;
}
recvInfo.msg = buffer;
recvInfo.msgLen = dataLen;
// tTrace("%s TCP data is received, ip:%s:%u len:%d", pTcp->label, pFdObj->ipstr, pFdObj->port, msgLen);
memcpy(msg, &rpcHead, sizeof(SRpcHead));
recvInfo.msg = msg;
recvInfo.msgLen = msgLen;
recvInfo.ip = pFdObj->ip;
recvInfo.port = pFdObj->port;
recvInfo.shandle = pThreadObj->shandle;
......
......@@ -40,12 +40,12 @@ typedef struct {
char label[12]; // copy from udpConnSet;
pthread_t thread;
pthread_mutex_t mutex;
void * tmrCtrl; // copy from UdpConnSet;
void * hash;
void * shandle; // handle passed by upper layer during server initialization
void * pSet;
void *(*processData)(SRecvInfo *pRecv);
char buffer[RPC_MAX_UDP_SIZE]; // buffer to receive data
void *tmrCtrl; // copy from UdpConnSet;
void *hash;
void *shandle; // handle passed by upper layer during server initialization
void *pSet;
void *(*processData)(SRecvInfo *pRecv);
char *buffer; // buffer to receive data
} SUdpConn;
typedef struct {
......@@ -96,15 +96,15 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
pSet->fp = fp;
strcpy(pSet->label, label);
// if ( tsUdpDelay ) {
char udplabel[12];
sprintf(udplabel, "%s.b", label);
pSet->tmrCtrl = taosTmrInit(RPC_MAX_UDP_CONNS * threads, 5, 5000, udplabel);
if (pSet->tmrCtrl == NULL) {
tError("%s failed to initialize tmrCtrl") taosCleanUpUdpConnection(pSet);
return NULL;
if ( tsUdpDelay ) {
char udplabel[12];
sprintf(udplabel, "%s.b", label);
pSet->tmrCtrl = taosTmrInit(RPC_MAX_UDP_CONNS * threads, 5, 5000, udplabel);
if (pSet->tmrCtrl == NULL) {
tError("%s failed to initialize tmrCtrl") taosCleanUpUdpConnection(pSet);
return NULL;
}
}
// }
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
......@@ -120,6 +120,13 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
return NULL;
}
pConn->buffer = malloc(RPC_MAX_UDP_SIZE);
if (NULL == pConn->buffer) {
tError("%s failed to malloc recv buffer", label);
taosCleanUpUdpConnection(pSet);
return NULL;
}
struct sockaddr_in sin;
unsigned int addrlen = sizeof(sin);
if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET &&
......@@ -128,14 +135,6 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
}
strcpy(pConn->label, label);
if (pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn) != 0) {
tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno));
taosCloseSocket(pConn->fd);
taosCleanUpUdpConnection(pSet);
return NULL;
}
pConn->shandle = shandle;
pConn->processData = fp;
pConn->index = i;
......@@ -146,6 +145,14 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
pthread_mutex_init(&pConn->mutex, NULL);
pConn->tmrCtrl = pSet->tmrCtrl;
}
if (pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn) != 0) {
tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno));
taosCloseSocket(pConn->fd);
taosCleanUpUdpConnection(pSet);
return NULL;
}
++pSet->threads;
}
......@@ -164,6 +171,7 @@ void taosCleanUpUdpConnection(void *handle) {
for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i;
pConn->signature = NULL;
free(pConn->buffer);
pthread_cancel(pConn->thread);
taosCloseSocket(pConn->fd);
if (pConn->hash) {
......@@ -210,7 +218,7 @@ static void *taosRecvUdpData(void *param) {
tTrace("%s UDP thread is created, index:%d", pConn->label, pConn->index);
while (1) {
dataLen = recvfrom(pConn->fd, pConn->buffer, sizeof(pConn->buffer), 0, (struct sockaddr *)&sourceAdd, &addLen);
dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, ntohs(sourceAdd.sin_port),
dataLen);
......@@ -235,9 +243,15 @@ static void *taosRecvUdpData(void *param) {
break;
}
char *data = malloc((size_t)msgLen);
memcpy(data, msg, (size_t)msgLen);
recvInfo.msg = data;
char *tmsg = malloc((size_t)msgLen + tsRpcOverhead);
if (NULL == tmsg) {
tError("%s failed to allocate memory, size:%d", pConn->label, msgLen);
break;
}
tmsg += tsRpcOverhead; // overhead for SRpcReqContext
memcpy(tmsg, msg, (size_t)msgLen);
recvInfo.msg = tmsg;
recvInfo.msgLen = msgLen;
recvInfo.ip = sourceAdd.sin_addr.s_addr;
recvInfo.port = port;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tlog.h"
#include "tqueue.h"
#define DUMP_SCHEDULER_TIME_WINDOW 30000 //every 30sec, take a snap shot of task queue.
typedef struct {
char label[16];
int num;
tsem_t emptySem;
tsem_t fullSem;
pthread_mutex_t queueMutex;
int fullSlot;
int emptySlot;
int queueSize;
SRpcMsg *queue;
SRpcMsg *oqueue;
pthread_t qthread;
void (*fp)(int num, SRpcMsg *);
} SRpcQueue;
static void *taosProcessMsgQueue(void *param);
void *taosInitMsgQueue(int queueSize, void (*fp)(int num, SRpcMsg *), const char *label) {
pthread_attr_t attr;
SRpcQueue * pQueue = (SRpcQueue *)malloc(sizeof(SRpcQueue));
if (pQueue == NULL) {
pError("%s: no enough memory for pQueue, reason: %s", label, strerror(errno));
goto _error;
}
memset(pQueue, 0, sizeof(SRpcQueue));
pQueue->queueSize = queueSize;
strncpy(pQueue->label, label, sizeof(pQueue->label)); // fix buffer overflow
pQueue->label[sizeof(pQueue->label)-1] = '\0';
pQueue->fp = fp;
if (pthread_mutex_init(&pQueue->queueMutex, NULL) < 0) {
pError("init %s:queueMutex failed, reason:%s", pQueue->label, strerror(errno));
goto _error;
}
if (tsem_init(&pQueue->emptySem, 0, (unsigned int)pQueue->queueSize) != 0) {
pError("init %s:empty semaphore failed, reason:%s", pQueue->label, strerror(errno));
goto _error;
}
if (tsem_init(&pQueue->fullSem, 0, 0) != 0) {
pError("init %s:full semaphore failed, reason:%s", pQueue->label, strerror(errno));
goto _error;
}
if ((pQueue->queue = (SRpcMsg *)malloc((size_t)pQueue->queueSize * sizeof(SRpcMsg))) == NULL) {
pError("%s: no enough memory for queue, reason:%s", pQueue->label, strerror(errno));
goto _error;
}
memset(pQueue->queue, 0, (size_t)pQueue->queueSize * sizeof(SRpcMsg));
if ((pQueue->oqueue = (SRpcMsg *)malloc((size_t)pQueue->queueSize * sizeof(SRpcMsg))) == NULL) {
pError("%s: no enough memory for queue, reason:%s", pQueue->label, strerror(errno));
goto _error;
}
memset(pQueue->oqueue, 0, (size_t)pQueue->queueSize * sizeof(SRpcMsg));
pQueue->fullSlot = 0;
pQueue->fullSlot = 0;
pQueue->emptySlot = 0;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pQueue->qthread, &attr, taosProcessMsgQueue, (void *)pQueue) != 0) {
pError("%s: failed to create taos thread, reason:%s", pQueue->label, strerror(errno));
goto _error;
}
pTrace("%s RPC msg queue is initialized", pQueue->label);
return (void *)pQueue;
_error:
taosCleanUpMsgQueue(pQueue);
return NULL;
}
void *taosProcessMsgQueue(void *param) {
SRpcQueue *pQueue = (SRpcQueue *)param;
int num = 0;
while (1) {
if (tsem_wait(&pQueue->fullSem) != 0) {
if (errno == EINTR) {
/* sem_wait is interrupted by interrupt, ignore and continue */
pTrace("wait %s fullSem was interrupted", pQueue->label);
continue;
}
pError("wait %s fullSem failed, errno:%d, reason:%s", pQueue->label, errno, strerror(errno));
}
if (pthread_mutex_lock(&pQueue->queueMutex) != 0)
pError("lock %s queueMutex failed, reason:%s", pQueue->label, strerror(errno));
num = 0;
do {
pQueue->oqueue[num] = pQueue->queue[pQueue->fullSlot];
pQueue->fullSlot = (pQueue->fullSlot + 1) % pQueue->queueSize;
++num;
pQueue->num--;
} while (pQueue->fullSlot != pQueue->emptySlot);
if (pthread_mutex_unlock(&pQueue->queueMutex) != 0)
pError("unlock %s queueMutex failed, reason:%s\n", pQueue->label, strerror(errno));
for (int i= 0; i<num; ++i) {
if (tsem_post(&pQueue->emptySem) != 0)
pError("post %s emptySem failed, reason:%s\n", pQueue->label, strerror(errno));
}
for (int i=0; i<num-1; ++i) {
if (tsem_wait(&pQueue->fullSem) != 0)
pError("wait %s fullSem failed, reason:%s\n", pQueue->label, strerror(errno));
}
(*pQueue->fp)(num, pQueue->oqueue);
}
return NULL;
}
int taosPutIntoMsgQueue(void *qhandle, SRpcMsg *pMsg) {
SRpcQueue *pQueue = (SRpcQueue *)qhandle;
if (pQueue == NULL) {
pError("sched is not ready, msg:%p is dropped", pMsg);
return 0;
}
while (tsem_wait(&pQueue->emptySem) != 0) {
if (errno != EINTR) {
pError("wait %s emptySem failed, reason:%s", pQueue->label, strerror(errno));
break;
}
}
if (pthread_mutex_lock(&pQueue->queueMutex) != 0)
pError("lock %s queueMutex failed, reason:%s", pQueue->label, strerror(errno));
pQueue->queue[pQueue->emptySlot] = *pMsg;
pQueue->emptySlot = (pQueue->emptySlot + 1) % pQueue->queueSize;
pQueue->num++;
if (pthread_mutex_unlock(&pQueue->queueMutex) != 0)
pError("unlock %s queueMutex failed, reason:%s", pQueue->label, strerror(errno));
if (tsem_post(&pQueue->fullSem) != 0) pError("post %s fullSem failed, reason:%s", pQueue->label, strerror(errno));
return 0;
}
void taosCleanUpMsgQueue(void *param) {
SRpcQueue *pQueue = (SRpcQueue *)param;
if (pQueue == NULL) return;
pthread_cancel(pQueue->qthread);
tsem_destroy(&pQueue->emptySem);
tsem_destroy(&pQueue->fullSem);
pthread_mutex_destroy(&pQueue->queueMutex);
free(pQueue->queue);
free(pQueue);
}
......@@ -17,21 +17,35 @@
#include "os.h"
#include "tlog.h"
#include "trpc.h"
#include "tqueue.h"
#include <stdint.h>
int msgSize = 128;
int commit = 0;
int dataFd = -1;
void *qhandle = NULL;
void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32_t code) {
void processShellMsg(int numOfMsgs, SRpcMsg *pMsg) {
static int num = 0;
tTrace("request is received, type:%d, contLen:%d", type, contLen);
if (dataFd >=0)
write(dataFd, pCont, contLen);
tTrace("%d shell msgs are received", numOfMsgs);
for (int i=0; i<numOfMsgs; ++i) {
if (dataFd >=0) {
if ( write(dataFd, pMsg->msg, pMsg->msgLen) <0 ) {
tPrint("failed to write data file, reason:%s", strerror(errno));
}
}
void *rsp = rpcMallocCont(msgSize);
rpcSendResponse(pMsg->handle, 1, rsp, msgSize);
rpcFreeCont(pMsg->msg);
pMsg++;
}
if (commit >=2) {
++num;
num += numOfMsgs;
if ( fsync(dataFd) < 0 ) {
tPrint("failed to flush data to file, reason:%s", strerror(errno));
}
......@@ -41,9 +55,6 @@ void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32
}
}
void *rsp = rpcMallocCont(msgSize);
rpcSendResponse(thandle, 1, rsp, msgSize);
/*
SRpcIpSet ipSet;
......@@ -55,7 +66,17 @@ void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32
rpcSendRedirectRsp(ahandle, &ipSet);
*/
rpcFreeCont(pCont);
}
void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32_t code) {
tTrace("request is received, type:%d, contLen:%d", type, contLen);
SRpcMsg rpcMsg;
rpcMsg.msg = pCont;
rpcMsg.msgLen = contLen;
rpcMsg.code = code;
rpcMsg.handle = thandle;
rpcMsg.type = type;
taosPutIntoMsgQueue(qhandle, &rpcMsg);
}
int main(int argc, char *argv[]) {
......@@ -88,6 +109,7 @@ int main(int argc, char *argv[]) {
commit = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
rpcDebugFlag = atoi(argv[++i]);
uDebugFlag = rpcDebugFlag;
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-i ip]: server IP address, default is:%s\n", rpcInit.localIp);
......@@ -117,11 +139,13 @@ int main(int argc, char *argv[]) {
tPrint("RPC server is running, ctrl-c to exit");
if (commit) {
dataFd = open(dataName, O_APPEND | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
dataFd = open(dataName, O_APPEND | O_CREAT | O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO);
if (dataFd<0)
tPrint("failed to open data file, reason:%s", strerror(errno));
}
qhandle = taosInitMsgQueue(1000, processShellMsg, "SER");
// loop forever
while(1) {
sleep(1);
......
......@@ -178,6 +178,7 @@ extern uint32_t taosMaxTmrCtrl;
extern int tsRpcTimer;
extern int tsRpcMaxTime;
extern int tsRpcMaxUdpSize;
extern int tsUdpDelay;
extern char version[];
extern char compatible_version[];
......
......@@ -199,6 +199,7 @@ int tsIsCluster = 0;
int tsRpcTimer = 300;
int tsRpcMaxTime = 600; // seconds;
int tsRpcMaxUdpSize = 15000; // bytes
char tsMonitorDbName[TSDB_DB_NAME_LEN] = "log";
int tsMonitorInterval = 30; // seconds
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册