From cebd07dba0c0d1b0dc4747b75264246b38b7b78c Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sat, 7 Mar 2020 22:57:25 +0800 Subject: [PATCH] new version for tqueue.c --- src/inc/trpc.h | 14 +- src/rpc/inc/tqueue.h | 42 ++-- src/rpc/src/rpcMain.c | 90 +++++---- src/rpc/src/tqueue.c | 431 +++++++++++++++++++++++++++++------------ src/rpc/test/rclient.c | 21 +- src/rpc/test/rserver.c | 82 ++++---- 6 files changed, 455 insertions(+), 225 deletions(-) diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 710e9bc5e6..bde863522a 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -42,6 +42,14 @@ typedef struct { char *user; } SRpcConnInfo; +typedef struct { + char msgType; + void *pCont; + int contLen; + int32_t code; + void *handle; +} SRpcMsg; + typedef struct { char *localIp; // local IP used uint16_t localPort; // local port @@ -59,7 +67,7 @@ typedef struct { char *ckey; // ciphering key // call back to process incoming msg, code shall be ignored by server app - void (*cfp)(char type, void *pCont, int contLen, void *handle, int32_t code); + void (*cfp)(SRpcMsg *); // call back to process notify the ipSet changes, for client app only void (*ufp)(void *ahandle, SRpcIpSet *pIpSet); @@ -73,8 +81,8 @@ void rpcClose(void *); void *rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); void *rpcReallocCont(void *ptr, int contLen); -void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, char msgType, void *pCont, int contLen, void *ahandle); -void rpcSendResponse(void *pConn, int32_t code, void *pCont, int contLen); +void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg); +void rpcSendResponse(SRpcMsg *pMsg); void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet); void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); diff --git a/src/rpc/inc/tqueue.h b/src/rpc/inc/tqueue.h index 09a25e7e93..97408110d4 100644 --- a/src/rpc/inc/tqueue.h +++ b/src/rpc/inc/tqueue.h @@ -13,27 +13,43 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_TSCHED_H -#define TDENGINE_TSCHED_H +#ifndef TAOS_QUEUE_H +#define TAOS_QUEUE_H #ifdef __cplusplus extern "C" { #endif -typedef struct _sched_msg { - void *msg; - int msgLen; - int8_t type; - int32_t code; - void *handle; -} SRpcMsg; +typedef void* taos_queue; +typedef void* taos_qset; +typedef void* taos_qall; -void *taosInitMsgQueue(int queueSize, void (*fp)(int num, SRpcMsg *), const char *label); -int taosPutIntoMsgQueue(void *qhandle, SRpcMsg *pMsg); -void taosCleanUpMsgQueue(void *param); +taos_queue taosOpenQueue(int itemSize); +void taosCloseQueue(taos_queue); +int taosWriteQitem(taos_queue, void *item); +int taosReadQitem(taos_queue, void *item); + +int taosReadAllQitems(taos_queue, taos_qall *); +int taosGetQitem(taos_qall, void *item); +void taosResetQitems(taos_qall); +void taosFreeQitems(taos_qall); + +taos_qset taosOpenQset(); +void taosCloseQset(); +int taosAddIntoQset(taos_qset, taos_queue); +void taosRemoveFromQset(taos_qset, taos_queue); +int taosGetQueueNumber(taos_qset); + +int taosReadQitemFromQset(taos_qset, void *item); +int taosReadAllQitemsFromQset(taos_qset, taos_qall *); + +int taosGetQueueItemsNumber(taos_queue param); +int taosGetQsetItemsNumber(taos_qset param); #ifdef __cplusplus } #endif -#endif // TDENGINE_TSCHED_H +#endif + + diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index e596aaa3e8..d8a6e94f0e 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -56,7 +56,7 @@ typedef struct { char secret[TSDB_KEY_LEN]; // secret for the link char ckey[TSDB_KEY_LEN]; // ciphering key - void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code); + void (*cfp)(SRpcMsg *); int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey); void (*ufp)(void *ahandle, SRpcIpSet *pIpSet); @@ -339,25 +339,27 @@ void *rpcReallocCont(void *ptr, int contLen) { return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); } -void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, int contLen, void *ahandle) { +void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg) { SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcReqContext *pContext; - contLen = rpcCompressRpcMsg(pCont, contLen); - pContext = (SRpcReqContext *) (pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); - pContext->ahandle = ahandle; + pMsg->contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); + pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); + pContext->ahandle = pMsg->handle; pContext->pRpc = (SRpcInfo *)shandle; pContext->ipSet = *pIpSet; - pContext->contLen = contLen; - pContext->pCont = pCont; - pContext->msgType = type; + pContext->contLen = pMsg->contLen; + pContext->pCont = pMsg->pCont; + pContext->msgType = pMsg->msgType; pContext->oldInUse = pIpSet->inUse; pContext->connType = RPC_CONN_UDPC; - if (contLen > tsRpcMaxUdpSize) pContext->connType = RPC_CONN_TCPC; + if (pMsg->contLen > tsRpcMaxUdpSize) pContext->connType = RPC_CONN_TCPC; // connection type is application specific. // for TDengine, all the query, show commands shall have TCP connection + char type = pMsg->msgType; + if (type == TSDB_MSG_TYPE_DNODE_QUERY || type == TSDB_MSG_TYPE_DNODE_RETRIEVE || type == TSDB_MSG_TYPE_STABLE_META || type == TSDB_MSG_TYPE_MULTI_TABLE_META || type == TSDB_MSG_TYPE_SHOW ) @@ -368,21 +370,21 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, in return; } -void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { +void rpcSendResponse(SRpcMsg *pMsg) { int msgLen = 0; - SRpcConn *pConn = (SRpcConn *)handle; + SRpcConn *pConn = (SRpcConn *)pMsg->handle; SRpcInfo *pRpc = pConn->pRpc; - if ( pCont == NULL ) { - pCont = rpcMallocCont(0); - contLen = 0; + if ( pMsg->pCont == NULL ) { + pMsg->pCont = rpcMallocCont(0); + pMsg->contLen = 0; } - SRpcHead *pHead = rpcHeadFromCont(pCont); + SRpcHead *pHead = rpcHeadFromCont(pMsg->pCont); char *msg = (char *)pHead; - contLen = rpcCompressRpcMsg(pCont, contLen); - msgLen = rpcMsgLenFromCont(contLen); + pMsg->contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); + msgLen = rpcMsgLenFromCont(pMsg->contLen); rpcLockConn(pConn); @@ -402,7 +404,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { pHead->destId = pConn->peerId; pHead->linkUid = pConn->linkUid; pHead->port = htons(pConn->localPort); - pHead->code = htonl(code); + pHead->code = htonl(pMsg->code); // set pConn parameters pConn->inType = 0; @@ -416,6 +418,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { rpcUnlockConn(pConn); taosTmrStopA(&pConn->pTimer); + taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); rpcSendMsgToPeer(pConn, msg, msgLen); pConn->secured = 1; // connection shall be secured @@ -423,15 +426,18 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { } void rpcSendRedirectRsp(void *thandle, SRpcIpSet *pIpSet) { - char *pMsg; - int msgLen = sizeof(SRpcIpSet); + SRpcMsg rpcMsg; + + rpcMsg.contLen = sizeof(SRpcIpSet); + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + if (rpcMsg.pCont == NULL) return; - pMsg = rpcMallocCont(msgLen); - if (pMsg == NULL) return; + memcpy(rpcMsg.pCont, pIpSet, sizeof(SRpcIpSet)); - memcpy(pMsg, pIpSet, sizeof(SRpcIpSet)); + rpcMsg.code = TSDB_CODE_REDIRECT; + rpcMsg.handle = thandle; - rpcSendResponse(thandle, TSDB_CODE_REDIRECT, pMsg, msgLen); + rpcSendResponse(&rpcMsg); return; } @@ -813,11 +819,6 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port); } - if (pRpc->connType == TAOS_CONN_SERVER && pConn && pRpc->idleTime) { - // only for server, starts the idle timer. For client, it is started by cache mgmt - taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); - } - if (terrno != TSDB_CODE_ALREADY_PROCESSED) { if (terrno != 0) { // parsing error if ( rpcIsReq(pHead->msgType) ) { @@ -835,24 +836,29 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { SRpcInfo *pRpc = pConn->pRpc; + SRpcMsg rpcMsg; pHead = rpcDecompressRpcMsg(pHead); - int contLen = rpcContLenFromMsg(pHead->msgLen); - uint8_t *pCont = pHead->content; + rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen); + rpcMsg.pCont = pHead->content; + rpcMsg.msgType = pHead->msgType; + rpcMsg.code = pHead->code; if ( rpcIsReq(pHead->msgType) ) { + rpcMsg.handle = pConn; pConn->destIp = pHead->destIp; taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); - (*(pRpc->cfp))(pHead->msgType, pCont, contLen, pConn, 0); + (*(pRpc->cfp))(&rpcMsg); } else { // it's a response - int32_t code = pHead->code; SRpcReqContext *pContext = pConn->pContext; + rpcMsg.handle = pContext->ahandle; pConn->pContext = NULL; + // for UDP, port may be changed by server, the port in ipSet shall be used for cache rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pContext->ipSet.port, pConn->connType); - if (code == TSDB_CODE_REDIRECT) { + if (pHead->code == TSDB_CODE_REDIRECT) { pContext->redirect = 1; pContext->numOfTry = 0; memcpy(&pContext->ipSet, pHead->content, sizeof(pContext->ipSet)); @@ -861,7 +867,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { } else { if ( pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect) ) (*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet - (*pRpc->cfp)(pHead->msgType, pCont, contLen, pContext->ahandle, code); + (*pRpc->cfp)(&rpcMsg); rpcFreeCont(pContext->pCont); // free the request msg } } @@ -963,8 +969,8 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { rpcUnlockConn(pConn); - rpcSendMsgToPeer(pConn, msg, msgLen); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); + rpcSendMsgToPeer(pConn, msg, msgLen); } static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { @@ -998,12 +1004,18 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { static void rpcProcessConnError(void *param, void *id) { SRpcReqContext *pContext = (SRpcReqContext *)param; - SRpcInfo *pRpc = pContext->pRpc; + SRpcInfo *pRpc = pContext->pRpc; + SRpcMsg rpcMsg; tTrace("%s connection error happens", pRpc->label); if ( pContext->numOfTry >= pContext->ipSet.numOfIps ) { - (*(pRpc->cfp))(pContext->msgType+1, NULL, 0, pContext->ahandle, pContext->code); + rpcMsg.msgType = pContext->msgType+1; + rpcMsg.handle = pContext->ahandle; + rpcMsg.code = pContext->code; + rpcMsg.pCont = NULL; + rpcMsg.contLen = 0; + (*(pRpc->cfp))(&rpcMsg); rpcFreeCont(pContext->pCont); // free the request msg } else { // move to next IP @@ -1070,7 +1082,7 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) { if (pConn->inType && pConn->user[0]) { tTrace("%s %p, progress timer expired, send progress", pRpc->label, pConn); rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); - taosTmrReset(rpcProcessProgressTimer, tsRpcTimer<retry, pConn, pRpc->tmrCtrl, &pConn->pTimer); + taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); } else { tTrace("%s %p, progress timer:%p not processed", pRpc->label, pConn, tmrId); } diff --git a/src/rpc/src/tqueue.c b/src/rpc/src/tqueue.c index 2f6f9ac106..ab440d4819 100644 --- a/src/rpc/src/tqueue.c +++ b/src/rpc/src/tqueue.c @@ -15,176 +15,357 @@ #include "os.h" #include "tlog.h" +#include "taoserror.h" #include "tqueue.h" -#define DUMP_SCHEDULER_TIME_WINDOW 30000 //every 30sec, take a snap shot of task queue. - -typedef struct { - char label[16]; - int num; - tsem_t emptySem; - tsem_t fullSem; - pthread_mutex_t queueMutex; - int fullSlot; - int emptySlot; - int queueSize; - SRpcMsg *queue; - SRpcMsg *oqueue; - pthread_t qthread; - void (*fp)(int num, SRpcMsg *); -} SRpcQueue; - -static void *taosProcessMsgQueue(void *param); - -void *taosInitMsgQueue(int queueSize, void (*fp)(int num, SRpcMsg *), const char *label) { - pthread_attr_t attr; - SRpcQueue * pQueue = (SRpcQueue *)malloc(sizeof(SRpcQueue)); - if (pQueue == NULL) { - pError("%s: no enough memory for pQueue, reason: %s", label, strerror(errno)); - goto _error; +typedef struct _taos_qnode { + struct _taos_qnode *next; + char item[]; +} STaosQnode; + +typedef struct _taos_q { + int itemSize; + int numOfItems; + struct _taos_qnode *head; + struct _taos_qnode *tail; + struct _taos_q *next; // for queue set + struct _taos_qset *qset; // for queue set + pthread_mutex_t mutex; +} STaosQueue; + +typedef struct _taos_qset { + STaosQueue *head; + STaosQueue *current; + pthread_mutex_t mutex; + int numOfQueues; + int numOfItems; +} STaosQset; + +typedef struct _taos_qall { + STaosQnode *current; + STaosQnode *start; + int itemSize; + int numOfItems; +} STaosQall; + +taos_queue taosOpenQueue(int itemSize) { + + STaosQueue *queue = (STaosQueue *) calloc(sizeof(STaosQueue), 1); + if (queue == NULL) { + terrno = TSDB_CODE_NO_RESOURCE; + return NULL; } - memset(pQueue, 0, sizeof(SRpcQueue)); - pQueue->queueSize = queueSize; - strncpy(pQueue->label, label, sizeof(pQueue->label)); // fix buffer overflow - pQueue->label[sizeof(pQueue->label)-1] = '\0'; - pQueue->fp = fp; + pthread_mutex_init(&queue->mutex, NULL); + queue->itemSize = itemSize; - if (pthread_mutex_init(&pQueue->queueMutex, NULL) < 0) { - pError("init %s:queueMutex failed, reason:%s", pQueue->label, strerror(errno)); - goto _error; - } + return queue; +} - if (tsem_init(&pQueue->emptySem, 0, (unsigned int)pQueue->queueSize) != 0) { - pError("init %s:empty semaphore failed, reason:%s", pQueue->label, strerror(errno)); - goto _error; - } +void taosCloseQueue(taos_queue param) { + STaosQueue *queue = (STaosQueue *)param; + STaosQnode *pTemp; + STaosQnode *pNode = queue->head; + queue->head = NULL; - if (tsem_init(&pQueue->fullSem, 0, 0) != 0) { - pError("init %s:full semaphore failed, reason:%s", pQueue->label, strerror(errno)); - goto _error; - } + pthread_mutex_lock(&queue->mutex); - if ((pQueue->queue = (SRpcMsg *)malloc((size_t)pQueue->queueSize * sizeof(SRpcMsg))) == NULL) { - pError("%s: no enough memory for queue, reason:%s", pQueue->label, strerror(errno)); - goto _error; - } - - memset(pQueue->queue, 0, (size_t)pQueue->queueSize * sizeof(SRpcMsg)); + if (queue->qset) taosRemoveFromQset(queue->qset, queue); - if ((pQueue->oqueue = (SRpcMsg *)malloc((size_t)pQueue->queueSize * sizeof(SRpcMsg))) == NULL) { - pError("%s: no enough memory for queue, reason:%s", pQueue->label, strerror(errno)); - goto _error; + while (pNode) { + pTemp = pNode; + pNode = pNode->next; + free (pTemp); } - memset(pQueue->oqueue, 0, (size_t)pQueue->queueSize * sizeof(SRpcMsg)); + pthread_mutex_unlock(&queue->mutex); - pQueue->fullSlot = 0; - pQueue->fullSlot = 0; - pQueue->emptySlot = 0; + free(queue); +} - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); +int taosWriteQitem(taos_queue param, void *item) { + STaosQueue *queue = (STaosQueue *)param; - if (pthread_create(&pQueue->qthread, &attr, taosProcessMsgQueue, (void *)pQueue) != 0) { - pError("%s: failed to create taos thread, reason:%s", pQueue->label, strerror(errno)); - goto _error; + STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + queue->itemSize, 1); + if ( pNode == NULL ) { + terrno = TSDB_CODE_NO_RESOURCE; + return -1; } - pTrace("%s RPC msg queue is initialized", pQueue->label); + memcpy(pNode->item, item, queue->itemSize); - return (void *)pQueue; + pthread_mutex_lock(&queue->mutex); -_error: - taosCleanUpMsgQueue(pQueue); - return NULL; + if (queue->tail) { + queue->tail->next = pNode; + queue->tail = pNode; + } else { + queue->head = pNode; + queue->tail = pNode; + } + + queue->numOfItems++; + if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); + + pthread_mutex_unlock(&queue->mutex); } -void *taosProcessMsgQueue(void *param) { - SRpcQueue *pQueue = (SRpcQueue *)param; - int num = 0; +int taosReadQitem(taos_queue param, void *item) { + STaosQueue *queue = (STaosQueue *)param; + STaosQnode *pNode = NULL; + int code = 0; - while (1) { - if (tsem_wait(&pQueue->fullSem) != 0) { - if (errno == EINTR) { - /* sem_wait is interrupted by interrupt, ignore and continue */ - pTrace("wait %s fullSem was interrupted", pQueue->label); - continue; - } - pError("wait %s fullSem failed, errno:%d, reason:%s", pQueue->label, errno, strerror(errno)); - } + pthread_mutex_lock(&queue->mutex); - if (pthread_mutex_lock(&pQueue->queueMutex) != 0) - pError("lock %s queueMutex failed, reason:%s", pQueue->label, strerror(errno)); + if (queue->head) { + pNode = queue->head; + memcpy(item, pNode->item, queue->itemSize); + queue->head = pNode->next; + if (queue->head == NULL) + queue->tail = NULL; + free(pNode); + queue->numOfItems--; + if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); + code = 1; + } - num = 0; - do { - pQueue->oqueue[num] = pQueue->queue[pQueue->fullSlot]; - pQueue->fullSlot = (pQueue->fullSlot + 1) % pQueue->queueSize; - ++num; - pQueue->num--; - } while (pQueue->fullSlot != pQueue->emptySlot); + pthread_mutex_unlock(&queue->mutex); - if (pthread_mutex_unlock(&pQueue->queueMutex) != 0) - pError("unlock %s queueMutex failed, reason:%s\n", pQueue->label, strerror(errno)); + return code; +} - for (int i= 0; iemptySem) != 0) - pError("post %s emptySem failed, reason:%s\n", pQueue->label, strerror(errno)); +int taosReadAllQitems(taos_queue param, taos_qall *res) { + STaosQueue *queue = (STaosQueue *)param; + STaosQall *qall = NULL; + int code = 0; + + pthread_mutex_lock(&queue->mutex); + + if (queue->head) { + qall = (STaosQall *) calloc(sizeof(STaosQall), 1); + if ( qall == NULL ) { + terrno = TSDB_CODE_NO_RESOURCE; + code = -1; + } else { + qall->current = queue->head; + qall->start = queue->head; + qall->numOfItems = queue->numOfItems; + qall->itemSize = queue->itemSize; + code = qall->numOfItems; + + queue->head = NULL; + queue->tail = NULL; + queue->numOfItems = 0; + if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems); } + } - for (int i=0; ifullSem) != 0) - pError("wait %s fullSem failed, reason:%s\n", pQueue->label, strerror(errno)); - } + pthread_mutex_unlock(&queue->mutex); + + *res = qall; + return code; +} - (*pQueue->fp)(num, pQueue->oqueue); +int taosGetQitem(taos_qall param, void *item) { + STaosQall *qall = (STaosQall *)param; + STaosQnode *pNode; + int num = 0; + pNode = qall->current; + if (pNode) + qall->current = pNode->next; + + if (pNode) { + memcpy(item, pNode->item, qall->itemSize); + num = 1; } - return NULL; + return num; } -int taosPutIntoMsgQueue(void *qhandle, SRpcMsg *pMsg) { - SRpcQueue *pQueue = (SRpcQueue *)qhandle; - if (pQueue == NULL) { - pError("sched is not ready, msg:%p is dropped", pMsg); - return 0; +void taosResetQitems(taos_qall param) { + STaosQall *qall = (STaosQall *)param; + qall->current = qall->start; +} + +void taosFreeQitems(taos_qall param) { + STaosQall *qall = (STaosQall *)param; + STaosQnode *pNode; + + while (qall->current) { + pNode = qall->current; + qall->current = pNode->next; + free(pNode); } - while (tsem_wait(&pQueue->emptySem) != 0) { - if (errno != EINTR) { - pError("wait %s emptySem failed, reason:%s", pQueue->label, strerror(errno)); - break; - } + free(qall); +} + +taos_qset taosOpenQset() { + + STaosQset *qset = (STaosQset *) calloc(sizeof(STaosQset), 1); + if (qset == NULL) { + terrno = TSDB_CODE_NO_RESOURCE; + return NULL; } - if (pthread_mutex_lock(&pQueue->queueMutex) != 0) - pError("lock %s queueMutex failed, reason:%s", pQueue->label, strerror(errno)); + pthread_mutex_init(&qset->mutex, NULL); - pQueue->queue[pQueue->emptySlot] = *pMsg; - pQueue->emptySlot = (pQueue->emptySlot + 1) % pQueue->queueSize; - pQueue->num++; - - if (pthread_mutex_unlock(&pQueue->queueMutex) != 0) - pError("unlock %s queueMutex failed, reason:%s", pQueue->label, strerror(errno)); + return qset; +} - if (tsem_post(&pQueue->fullSem) != 0) pError("post %s fullSem failed, reason:%s", pQueue->label, strerror(errno)); +void taosCloseQset(taos_qset param) { + STaosQset *qset = (STaosQset *)param; + free(qset); +} + +int taosAddIntoQset(taos_qset p1, taos_queue p2) { + STaosQueue *queue = (STaosQueue *)p2; + STaosQset *qset = (STaosQset *)p1; + + if (queue->qset) return -1; + + pthread_mutex_lock(&qset->mutex); + + queue->next = qset->head; + qset->head = queue; + qset->numOfQueues++; + + pthread_mutex_lock(&queue->mutex); + atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems); + queue->qset = qset; + pthread_mutex_unlock(&queue->mutex); + + pthread_mutex_unlock(&qset->mutex); return 0; } -void taosCleanUpMsgQueue(void *param) { - SRpcQueue *pQueue = (SRpcQueue *)param; - if (pQueue == NULL) return; +void taosRemoveFromQset(taos_qset p1, taos_queue p2) { + STaosQueue *queue = (STaosQueue *)p2; + STaosQset *qset = (STaosQset *)p1; + + STaosQueue *tqueue; + + pthread_mutex_lock(&qset->mutex); + + if (qset->head) { + if (qset->head == queue) { + qset->head = qset->head->next; + qset->numOfQueues--; + } else { + STaosQueue *prev = qset->head; + tqueue = qset->head->next; + while (tqueue) { + if (tqueue== queue) { + prev->next = tqueue->next; + if (qset->current == queue) qset->current = tqueue->next; + qset->numOfQueues--; + + pthread_mutex_lock(&queue->mutex); + atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems); + queue->qset = NULL; + pthread_mutex_unlock(&queue->mutex); + } else { + prev = tqueue; + tqueue = tqueue->next; + } + } + } + } + + pthread_mutex_unlock(&qset->mutex); +} + +int taosGetQueueNumber(taos_qset param) { + return ((STaosQset *)param)->numOfQueues; +} + +int taosReadQitemFromQset(taos_qset param, void *item) { + STaosQset *qset = (STaosQset *)param; + STaosQnode *pNode = NULL; + int code = 0; + + for(int i=0; inumOfQueues; ++i) { + pthread_mutex_lock(&qset->mutex); + if (qset->current == NULL) + qset->current = qset->head; + STaosQueue *queue = qset->current; + qset->current = queue->next; + pthread_mutex_unlock(&qset->mutex); + + pthread_mutex_lock(&queue->mutex); + + if (queue->head) { + pNode = queue->head; + memcpy(item, pNode->item, queue->itemSize); + queue->head = pNode->next; + if (queue->head == NULL) + queue->tail = NULL; + free(pNode); + queue->numOfItems--; + atomic_sub_fetch_32(&qset->numOfItems, 1); + code = 1; + } + + pthread_mutex_unlock(&queue->mutex); + if (pNode) break; + } + + return code; +} + +int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) { + STaosQset *qset = (STaosQset *)param; + STaosQueue *queue; + STaosQall *qall = NULL; + int code = 0; + + for(int i=0; inumOfQueues; ++i) { + pthread_mutex_lock(&qset->mutex); + if (qset->current == NULL) + qset->current = qset->head; + queue = qset->current; + qset->current = queue->next; + pthread_mutex_unlock(&qset->mutex); + + pthread_mutex_lock(&queue->mutex); + + if (queue->head) { + qall = (STaosQall *) calloc(sizeof(STaosQall), 1); + if (qall == NULL) { + terrno = TSDB_CODE_NO_RESOURCE; + code = -1; + } else { + qall->current = queue->head; + qall->start = queue->head; + qall->numOfItems = queue->numOfItems; + qall->itemSize = queue->itemSize; + code = qall->numOfItems; + + queue->head = NULL; + queue->tail = NULL; + queue->numOfItems = 0; + atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); + } + } + + pthread_mutex_unlock(&queue->mutex); + + if (code != 0) break; + } - pthread_cancel(pQueue->qthread); + *res = qall; - tsem_destroy(&pQueue->emptySem); - tsem_destroy(&pQueue->fullSem); - pthread_mutex_destroy(&pQueue->queueMutex); + return code; +} - free(pQueue->queue); - free(pQueue); +int taosGetQueueItemsNumber(taos_queue param) { + STaosQueue *queue = (STaosQueue *)param; + return queue->numOfItems; } +int taosGetQsetItemsNumber(taos_qset param) { + STaosQset *qset = (STaosQset *)param; + return qset->numOfItems; +} diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c index aa97535e31..562d0fff96 100644 --- a/src/rpc/test/rclient.c +++ b/src/rpc/test/rclient.c @@ -20,6 +20,7 @@ #include #include #include +#include #include "os.h" #include "tlog.h" #include "trpc.h" @@ -39,11 +40,11 @@ typedef struct { void *pRpc; } SInfo; -void processResponse(char type, void *pCont, int contLen, void *ahandle, int32_t code) { - SInfo *pInfo = (SInfo *)ahandle; - tTrace("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, type, contLen, code); +void processResponse(SRpcMsg *pMsg) { + SInfo *pInfo = (SInfo *)pMsg->handle; + tTrace("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code); - if (pCont) rpcFreeCont(pCont); + rpcFreeCont(pMsg->pCont); sem_post(&pInfo->rspSem); } @@ -58,16 +59,19 @@ void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) { int tcount = 0; void *sendRequest(void *param) { - SInfo *pInfo = (SInfo *)param; - char *cont; + SInfo *pInfo = (SInfo *)param; + SRpcMsg rpcMsg; tTrace("thread:%d, start to send request", pInfo->index); while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { pInfo->num++; - cont = rpcMallocCont(pInfo->msgSize); + rpcMsg.pCont = rpcMallocCont(pInfo->msgSize); + rpcMsg.contLen = pInfo->msgSize; + rpcMsg.handle = pInfo; + rpcMsg.msgType = 1; tTrace("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); - rpcSendRequest(pInfo->pRpc, &pInfo->ipSet, 1, cont, pInfo->msgSize, pInfo); + rpcSendRequest(pInfo->pRpc, &pInfo->ipSet, &rpcMsg); if ( pInfo->num % 20000 == 0 ) tPrint("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); sem_wait(&pInfo->rspSem); @@ -161,7 +165,6 @@ int main(int argc, char *argv[]) { } taosInitLog("client.log", 100000, 10); - tPrint("rpcDebugFlag:%d", rpcDebugFlag); void *pRpc = rpcOpen(&rpcInit); if (pRpc == NULL) { diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c index 718b0c5b6e..d9e5da51a6 100644 --- a/src/rpc/test/rserver.c +++ b/src/rpc/test/rserver.c @@ -25,37 +25,56 @@ int commit = 0; int dataFd = -1; void *qhandle = NULL; -void processShellMsg(int numOfMsgs, SRpcMsg *pMsg) { +void processShellMsg() { static int num = 0; + taos_qall qall; + SRpcMsg rpcMsg; + + while (1) { + int numOfMsgs = taosReadAllQitems(qhandle, &qall); + if (numOfMsgs <= 0) { + usleep(1000); + continue; + } + + tTrace("%d shell msgs are received", numOfMsgs); + + for (int i=0; i=0) { + if ( write(dataFd, rpcMsg.pCont, rpcMsg.contLen) <0 ) { + tPrint("failed to write data file, reason:%s", strerror(errno)); + } + } + } - tTrace("%d shell msgs are received", numOfMsgs); + if (commit >=2) { + num += numOfMsgs; + if ( fsync(dataFd) < 0 ) { + tPrint("failed to flush data to file, reason:%s", strerror(errno)); + } - for (int i=0; i=0) { - if ( write(dataFd, pMsg->msg, pMsg->msgLen) <0 ) { - tPrint("failed to write data file, reason:%s", strerror(errno)); + if (num % 10000 == 0) { + tPrint("%d request have been written into disk", num); } } - - void *rsp = rpcMallocCont(msgSize); - rpcSendResponse(pMsg->handle, 1, rsp, msgSize); - rpcFreeCont(pMsg->msg); - pMsg++; - } - - if (commit >=2) { - num += numOfMsgs; - if ( fsync(dataFd) < 0 ) { - tPrint("failed to flush data to file, reason:%s", strerror(errno)); + + taosResetQitems(qall); + for (int i=0; imsgType, pMsg->contLen); + taosWriteQitem(qhandle, pMsg); } int main(int argc, char *argv[]) { @@ -165,12 +178,9 @@ int main(int argc, char *argv[]) { tPrint("failed to open data file, reason:%s", strerror(errno)); } - qhandle = taosInitMsgQueue(1000, processShellMsg, "SER"); + qhandle = taosOpenQueue(sizeof(SRpcMsg)); - // loop forever - while(1) { - sleep(1); - } + processShellMsg(); if (dataFd >= 0) { close(dataFd); -- GitLab