diff --git a/src/inc/trpc.h b/src/inc/trpc.h
index 710e9bc5e6a0f2e7f56fc447b1a74ccc8204ec1f..bde863522a5b7cd4b51b03eee9e8e2f634cdc97b 100644
--- a/src/inc/trpc.h
+++ b/src/inc/trpc.h
@@ -42,6 +42,14 @@ typedef struct {
char *user;
} SRpcConnInfo;
+typedef struct {
+ char msgType;
+ void *pCont;
+ int contLen;
+ int32_t code;
+ void *handle;
+} SRpcMsg;
+
typedef struct {
char *localIp; // local IP used
uint16_t localPort; // local port
@@ -59,7 +67,7 @@ typedef struct {
char *ckey; // ciphering key
// call back to process incoming msg, code shall be ignored by server app
- void (*cfp)(char type, void *pCont, int contLen, void *handle, int32_t code);
+ void (*cfp)(SRpcMsg *);
// call back to process notify the ipSet changes, for client app only
void (*ufp)(void *ahandle, SRpcIpSet *pIpSet);
@@ -73,8 +81,8 @@ void rpcClose(void *);
void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen);
-void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, char msgType, void *pCont, int contLen, void *ahandle);
-void rpcSendResponse(void *pConn, int32_t code, void *pCont, int contLen);
+void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg);
+void rpcSendResponse(SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet);
void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
diff --git a/src/rpc/inc/tqueue.h b/src/rpc/inc/tqueue.h
index 09a25e7e93af8d57a320db865ff90e0cb54fa671..97408110d4d8acc9c5b77b0fe296e58c32ec8275 100644
--- a/src/rpc/inc/tqueue.h
+++ b/src/rpc/inc/tqueue.h
@@ -13,27 +13,43 @@
* along with this program. If not, see .
*/
-#ifndef TDENGINE_TSCHED_H
-#define TDENGINE_TSCHED_H
+#ifndef TAOS_QUEUE_H
+#define TAOS_QUEUE_H
#ifdef __cplusplus
extern "C" {
#endif
-typedef struct _sched_msg {
- void *msg;
- int msgLen;
- int8_t type;
- int32_t code;
- void *handle;
-} SRpcMsg;
+typedef void* taos_queue;
+typedef void* taos_qset;
+typedef void* taos_qall;
-void *taosInitMsgQueue(int queueSize, void (*fp)(int num, SRpcMsg *), const char *label);
-int taosPutIntoMsgQueue(void *qhandle, SRpcMsg *pMsg);
-void taosCleanUpMsgQueue(void *param);
+taos_queue taosOpenQueue(int itemSize);
+void taosCloseQueue(taos_queue);
+int taosWriteQitem(taos_queue, void *item);
+int taosReadQitem(taos_queue, void *item);
+
+int taosReadAllQitems(taos_queue, taos_qall *);
+int taosGetQitem(taos_qall, void *item);
+void taosResetQitems(taos_qall);
+void taosFreeQitems(taos_qall);
+
+taos_qset taosOpenQset();
+void taosCloseQset();
+int taosAddIntoQset(taos_qset, taos_queue);
+void taosRemoveFromQset(taos_qset, taos_queue);
+int taosGetQueueNumber(taos_qset);
+
+int taosReadQitemFromQset(taos_qset, void *item);
+int taosReadAllQitemsFromQset(taos_qset, taos_qall *);
+
+int taosGetQueueItemsNumber(taos_queue param);
+int taosGetQsetItemsNumber(taos_qset param);
#ifdef __cplusplus
}
#endif
-#endif // TDENGINE_TSCHED_H
+#endif
+
+
diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c
index e596aaa3e88f89f755446cf25b159db6b91b6343..d8a6e94f0ed9c0256f7d2c2a7879e366f0668fcf 100755
--- a/src/rpc/src/rpcMain.c
+++ b/src/rpc/src/rpcMain.c
@@ -56,7 +56,7 @@ typedef struct {
char secret[TSDB_KEY_LEN]; // secret for the link
char ckey[TSDB_KEY_LEN]; // ciphering key
- void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code);
+ void (*cfp)(SRpcMsg *);
int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey);
void (*ufp)(void *ahandle, SRpcIpSet *pIpSet);
@@ -339,25 +339,27 @@ void *rpcReallocCont(void *ptr, int contLen) {
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
}
-void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, int contLen, void *ahandle) {
+void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg) {
SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcReqContext *pContext;
- contLen = rpcCompressRpcMsg(pCont, contLen);
- pContext = (SRpcReqContext *) (pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
- pContext->ahandle = ahandle;
+ pMsg->contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
+ pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
+ pContext->ahandle = pMsg->handle;
pContext->pRpc = (SRpcInfo *)shandle;
pContext->ipSet = *pIpSet;
- pContext->contLen = contLen;
- pContext->pCont = pCont;
- pContext->msgType = type;
+ pContext->contLen = pMsg->contLen;
+ pContext->pCont = pMsg->pCont;
+ pContext->msgType = pMsg->msgType;
pContext->oldInUse = pIpSet->inUse;
pContext->connType = RPC_CONN_UDPC;
- if (contLen > tsRpcMaxUdpSize) pContext->connType = RPC_CONN_TCPC;
+ if (pMsg->contLen > tsRpcMaxUdpSize) pContext->connType = RPC_CONN_TCPC;
// connection type is application specific.
// for TDengine, all the query, show commands shall have TCP connection
+ char type = pMsg->msgType;
+
if (type == TSDB_MSG_TYPE_DNODE_QUERY || type == TSDB_MSG_TYPE_DNODE_RETRIEVE ||
type == TSDB_MSG_TYPE_STABLE_META || type == TSDB_MSG_TYPE_MULTI_TABLE_META ||
type == TSDB_MSG_TYPE_SHOW )
@@ -368,21 +370,21 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, in
return;
}
-void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
+void rpcSendResponse(SRpcMsg *pMsg) {
int msgLen = 0;
- SRpcConn *pConn = (SRpcConn *)handle;
+ SRpcConn *pConn = (SRpcConn *)pMsg->handle;
SRpcInfo *pRpc = pConn->pRpc;
- if ( pCont == NULL ) {
- pCont = rpcMallocCont(0);
- contLen = 0;
+ if ( pMsg->pCont == NULL ) {
+ pMsg->pCont = rpcMallocCont(0);
+ pMsg->contLen = 0;
}
- SRpcHead *pHead = rpcHeadFromCont(pCont);
+ SRpcHead *pHead = rpcHeadFromCont(pMsg->pCont);
char *msg = (char *)pHead;
- contLen = rpcCompressRpcMsg(pCont, contLen);
- msgLen = rpcMsgLenFromCont(contLen);
+ pMsg->contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
+ msgLen = rpcMsgLenFromCont(pMsg->contLen);
rpcLockConn(pConn);
@@ -402,7 +404,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
pHead->destId = pConn->peerId;
pHead->linkUid = pConn->linkUid;
pHead->port = htons(pConn->localPort);
- pHead->code = htonl(code);
+ pHead->code = htonl(pMsg->code);
// set pConn parameters
pConn->inType = 0;
@@ -416,6 +418,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
rpcUnlockConn(pConn);
taosTmrStopA(&pConn->pTimer);
+ taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
rpcSendMsgToPeer(pConn, msg, msgLen);
pConn->secured = 1; // connection shall be secured
@@ -423,15 +426,18 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
}
void rpcSendRedirectRsp(void *thandle, SRpcIpSet *pIpSet) {
- char *pMsg;
- int msgLen = sizeof(SRpcIpSet);
+ SRpcMsg rpcMsg;
+
+ rpcMsg.contLen = sizeof(SRpcIpSet);
+ rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
+ if (rpcMsg.pCont == NULL) return;
- pMsg = rpcMallocCont(msgLen);
- if (pMsg == NULL) return;
+ memcpy(rpcMsg.pCont, pIpSet, sizeof(SRpcIpSet));
- memcpy(pMsg, pIpSet, sizeof(SRpcIpSet));
+ rpcMsg.code = TSDB_CODE_REDIRECT;
+ rpcMsg.handle = thandle;
- rpcSendResponse(thandle, TSDB_CODE_REDIRECT, pMsg, msgLen);
+ rpcSendResponse(&rpcMsg);
return;
}
@@ -813,11 +819,6 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port);
}
- if (pRpc->connType == TAOS_CONN_SERVER && pConn && pRpc->idleTime) {
- // only for server, starts the idle timer. For client, it is started by cache mgmt
- taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
- }
-
if (terrno != TSDB_CODE_ALREADY_PROCESSED) {
if (terrno != 0) { // parsing error
if ( rpcIsReq(pHead->msgType) ) {
@@ -835,24 +836,29 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
SRpcInfo *pRpc = pConn->pRpc;
+ SRpcMsg rpcMsg;
pHead = rpcDecompressRpcMsg(pHead);
- int contLen = rpcContLenFromMsg(pHead->msgLen);
- uint8_t *pCont = pHead->content;
+ rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen);
+ rpcMsg.pCont = pHead->content;
+ rpcMsg.msgType = pHead->msgType;
+ rpcMsg.code = pHead->code;
if ( rpcIsReq(pHead->msgType) ) {
+ rpcMsg.handle = pConn;
pConn->destIp = pHead->destIp;
taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer);
- (*(pRpc->cfp))(pHead->msgType, pCont, contLen, pConn, 0);
+ (*(pRpc->cfp))(&rpcMsg);
} else {
// it's a response
- int32_t code = pHead->code;
SRpcReqContext *pContext = pConn->pContext;
+ rpcMsg.handle = pContext->ahandle;
pConn->pContext = NULL;
+
// for UDP, port may be changed by server, the port in ipSet shall be used for cache
rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pContext->ipSet.port, pConn->connType);
- if (code == TSDB_CODE_REDIRECT) {
+ if (pHead->code == TSDB_CODE_REDIRECT) {
pContext->redirect = 1;
pContext->numOfTry = 0;
memcpy(&pContext->ipSet, pHead->content, sizeof(pContext->ipSet));
@@ -861,7 +867,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
} else {
if ( pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect) )
(*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet
- (*pRpc->cfp)(pHead->msgType, pCont, contLen, pContext->ahandle, code);
+ (*pRpc->cfp)(&rpcMsg);
rpcFreeCont(pContext->pCont); // free the request msg
}
}
@@ -963,8 +969,8 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
rpcUnlockConn(pConn);
- rpcSendMsgToPeer(pConn, msg, msgLen);
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
+ rpcSendMsgToPeer(pConn, msg, msgLen);
}
static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
@@ -998,12 +1004,18 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
static void rpcProcessConnError(void *param, void *id) {
SRpcReqContext *pContext = (SRpcReqContext *)param;
- SRpcInfo *pRpc = pContext->pRpc;
+ SRpcInfo *pRpc = pContext->pRpc;
+ SRpcMsg rpcMsg;
tTrace("%s connection error happens", pRpc->label);
if ( pContext->numOfTry >= pContext->ipSet.numOfIps ) {
- (*(pRpc->cfp))(pContext->msgType+1, NULL, 0, pContext->ahandle, pContext->code);
+ rpcMsg.msgType = pContext->msgType+1;
+ rpcMsg.handle = pContext->ahandle;
+ rpcMsg.code = pContext->code;
+ rpcMsg.pCont = NULL;
+ rpcMsg.contLen = 0;
+ (*(pRpc->cfp))(&rpcMsg);
rpcFreeCont(pContext->pCont); // free the request msg
} else {
// move to next IP
@@ -1070,7 +1082,7 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) {
if (pConn->inType && pConn->user[0]) {
tTrace("%s %p, progress timer expired, send progress", pRpc->label, pConn);
rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS);
- taosTmrReset(rpcProcessProgressTimer, tsRpcTimer<retry, pConn, pRpc->tmrCtrl, &pConn->pTimer);
+ taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer);
} else {
tTrace("%s %p, progress timer:%p not processed", pRpc->label, pConn, tmrId);
}
diff --git a/src/rpc/src/tqueue.c b/src/rpc/src/tqueue.c
index 2f6f9ac106df98c6d5da0092ddf42d1a50d19cac..ab440d4819b359d6a140ae5d754c046df560fa7d 100644
--- a/src/rpc/src/tqueue.c
+++ b/src/rpc/src/tqueue.c
@@ -15,176 +15,357 @@
#include "os.h"
#include "tlog.h"
+#include "taoserror.h"
#include "tqueue.h"
-#define DUMP_SCHEDULER_TIME_WINDOW 30000 //every 30sec, take a snap shot of task queue.
-
-typedef struct {
- char label[16];
- int num;
- tsem_t emptySem;
- tsem_t fullSem;
- pthread_mutex_t queueMutex;
- int fullSlot;
- int emptySlot;
- int queueSize;
- SRpcMsg *queue;
- SRpcMsg *oqueue;
- pthread_t qthread;
- void (*fp)(int num, SRpcMsg *);
-} SRpcQueue;
-
-static void *taosProcessMsgQueue(void *param);
-
-void *taosInitMsgQueue(int queueSize, void (*fp)(int num, SRpcMsg *), const char *label) {
- pthread_attr_t attr;
- SRpcQueue * pQueue = (SRpcQueue *)malloc(sizeof(SRpcQueue));
- if (pQueue == NULL) {
- pError("%s: no enough memory for pQueue, reason: %s", label, strerror(errno));
- goto _error;
+typedef struct _taos_qnode {
+ struct _taos_qnode *next;
+ char item[];
+} STaosQnode;
+
+typedef struct _taos_q {
+ int itemSize;
+ int numOfItems;
+ struct _taos_qnode *head;
+ struct _taos_qnode *tail;
+ struct _taos_q *next; // for queue set
+ struct _taos_qset *qset; // for queue set
+ pthread_mutex_t mutex;
+} STaosQueue;
+
+typedef struct _taos_qset {
+ STaosQueue *head;
+ STaosQueue *current;
+ pthread_mutex_t mutex;
+ int numOfQueues;
+ int numOfItems;
+} STaosQset;
+
+typedef struct _taos_qall {
+ STaosQnode *current;
+ STaosQnode *start;
+ int itemSize;
+ int numOfItems;
+} STaosQall;
+
+taos_queue taosOpenQueue(int itemSize) {
+
+ STaosQueue *queue = (STaosQueue *) calloc(sizeof(STaosQueue), 1);
+ if (queue == NULL) {
+ terrno = TSDB_CODE_NO_RESOURCE;
+ return NULL;
}
- memset(pQueue, 0, sizeof(SRpcQueue));
- pQueue->queueSize = queueSize;
- strncpy(pQueue->label, label, sizeof(pQueue->label)); // fix buffer overflow
- pQueue->label[sizeof(pQueue->label)-1] = '\0';
- pQueue->fp = fp;
+ pthread_mutex_init(&queue->mutex, NULL);
+ queue->itemSize = itemSize;
- if (pthread_mutex_init(&pQueue->queueMutex, NULL) < 0) {
- pError("init %s:queueMutex failed, reason:%s", pQueue->label, strerror(errno));
- goto _error;
- }
+ return queue;
+}
- if (tsem_init(&pQueue->emptySem, 0, (unsigned int)pQueue->queueSize) != 0) {
- pError("init %s:empty semaphore failed, reason:%s", pQueue->label, strerror(errno));
- goto _error;
- }
+void taosCloseQueue(taos_queue param) {
+ STaosQueue *queue = (STaosQueue *)param;
+ STaosQnode *pTemp;
+ STaosQnode *pNode = queue->head;
+ queue->head = NULL;
- if (tsem_init(&pQueue->fullSem, 0, 0) != 0) {
- pError("init %s:full semaphore failed, reason:%s", pQueue->label, strerror(errno));
- goto _error;
- }
+ pthread_mutex_lock(&queue->mutex);
- if ((pQueue->queue = (SRpcMsg *)malloc((size_t)pQueue->queueSize * sizeof(SRpcMsg))) == NULL) {
- pError("%s: no enough memory for queue, reason:%s", pQueue->label, strerror(errno));
- goto _error;
- }
-
- memset(pQueue->queue, 0, (size_t)pQueue->queueSize * sizeof(SRpcMsg));
+ if (queue->qset) taosRemoveFromQset(queue->qset, queue);
- if ((pQueue->oqueue = (SRpcMsg *)malloc((size_t)pQueue->queueSize * sizeof(SRpcMsg))) == NULL) {
- pError("%s: no enough memory for queue, reason:%s", pQueue->label, strerror(errno));
- goto _error;
+ while (pNode) {
+ pTemp = pNode;
+ pNode = pNode->next;
+ free (pTemp);
}
- memset(pQueue->oqueue, 0, (size_t)pQueue->queueSize * sizeof(SRpcMsg));
+ pthread_mutex_unlock(&queue->mutex);
- pQueue->fullSlot = 0;
- pQueue->fullSlot = 0;
- pQueue->emptySlot = 0;
+ free(queue);
+}
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+int taosWriteQitem(taos_queue param, void *item) {
+ STaosQueue *queue = (STaosQueue *)param;
- if (pthread_create(&pQueue->qthread, &attr, taosProcessMsgQueue, (void *)pQueue) != 0) {
- pError("%s: failed to create taos thread, reason:%s", pQueue->label, strerror(errno));
- goto _error;
+ STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + queue->itemSize, 1);
+ if ( pNode == NULL ) {
+ terrno = TSDB_CODE_NO_RESOURCE;
+ return -1;
}
- pTrace("%s RPC msg queue is initialized", pQueue->label);
+ memcpy(pNode->item, item, queue->itemSize);
- return (void *)pQueue;
+ pthread_mutex_lock(&queue->mutex);
-_error:
- taosCleanUpMsgQueue(pQueue);
- return NULL;
+ if (queue->tail) {
+ queue->tail->next = pNode;
+ queue->tail = pNode;
+ } else {
+ queue->head = pNode;
+ queue->tail = pNode;
+ }
+
+ queue->numOfItems++;
+ if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
+
+ pthread_mutex_unlock(&queue->mutex);
}
-void *taosProcessMsgQueue(void *param) {
- SRpcQueue *pQueue = (SRpcQueue *)param;
- int num = 0;
+int taosReadQitem(taos_queue param, void *item) {
+ STaosQueue *queue = (STaosQueue *)param;
+ STaosQnode *pNode = NULL;
+ int code = 0;
- while (1) {
- if (tsem_wait(&pQueue->fullSem) != 0) {
- if (errno == EINTR) {
- /* sem_wait is interrupted by interrupt, ignore and continue */
- pTrace("wait %s fullSem was interrupted", pQueue->label);
- continue;
- }
- pError("wait %s fullSem failed, errno:%d, reason:%s", pQueue->label, errno, strerror(errno));
- }
+ pthread_mutex_lock(&queue->mutex);
- if (pthread_mutex_lock(&pQueue->queueMutex) != 0)
- pError("lock %s queueMutex failed, reason:%s", pQueue->label, strerror(errno));
+ if (queue->head) {
+ pNode = queue->head;
+ memcpy(item, pNode->item, queue->itemSize);
+ queue->head = pNode->next;
+ if (queue->head == NULL)
+ queue->tail = NULL;
+ free(pNode);
+ queue->numOfItems--;
+ if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
+ code = 1;
+ }
- num = 0;
- do {
- pQueue->oqueue[num] = pQueue->queue[pQueue->fullSlot];
- pQueue->fullSlot = (pQueue->fullSlot + 1) % pQueue->queueSize;
- ++num;
- pQueue->num--;
- } while (pQueue->fullSlot != pQueue->emptySlot);
+ pthread_mutex_unlock(&queue->mutex);
- if (pthread_mutex_unlock(&pQueue->queueMutex) != 0)
- pError("unlock %s queueMutex failed, reason:%s\n", pQueue->label, strerror(errno));
+ return code;
+}
- for (int i= 0; iemptySem) != 0)
- pError("post %s emptySem failed, reason:%s\n", pQueue->label, strerror(errno));
+int taosReadAllQitems(taos_queue param, taos_qall *res) {
+ STaosQueue *queue = (STaosQueue *)param;
+ STaosQall *qall = NULL;
+ int code = 0;
+
+ pthread_mutex_lock(&queue->mutex);
+
+ if (queue->head) {
+ qall = (STaosQall *) calloc(sizeof(STaosQall), 1);
+ if ( qall == NULL ) {
+ terrno = TSDB_CODE_NO_RESOURCE;
+ code = -1;
+ } else {
+ qall->current = queue->head;
+ qall->start = queue->head;
+ qall->numOfItems = queue->numOfItems;
+ qall->itemSize = queue->itemSize;
+ code = qall->numOfItems;
+
+ queue->head = NULL;
+ queue->tail = NULL;
+ queue->numOfItems = 0;
+ if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
}
+ }
- for (int i=0; ifullSem) != 0)
- pError("wait %s fullSem failed, reason:%s\n", pQueue->label, strerror(errno));
- }
+ pthread_mutex_unlock(&queue->mutex);
+
+ *res = qall;
+ return code;
+}
- (*pQueue->fp)(num, pQueue->oqueue);
+int taosGetQitem(taos_qall param, void *item) {
+ STaosQall *qall = (STaosQall *)param;
+ STaosQnode *pNode;
+ int num = 0;
+ pNode = qall->current;
+ if (pNode)
+ qall->current = pNode->next;
+
+ if (pNode) {
+ memcpy(item, pNode->item, qall->itemSize);
+ num = 1;
}
- return NULL;
+ return num;
}
-int taosPutIntoMsgQueue(void *qhandle, SRpcMsg *pMsg) {
- SRpcQueue *pQueue = (SRpcQueue *)qhandle;
- if (pQueue == NULL) {
- pError("sched is not ready, msg:%p is dropped", pMsg);
- return 0;
+void taosResetQitems(taos_qall param) {
+ STaosQall *qall = (STaosQall *)param;
+ qall->current = qall->start;
+}
+
+void taosFreeQitems(taos_qall param) {
+ STaosQall *qall = (STaosQall *)param;
+ STaosQnode *pNode;
+
+ while (qall->current) {
+ pNode = qall->current;
+ qall->current = pNode->next;
+ free(pNode);
}
- while (tsem_wait(&pQueue->emptySem) != 0) {
- if (errno != EINTR) {
- pError("wait %s emptySem failed, reason:%s", pQueue->label, strerror(errno));
- break;
- }
+ free(qall);
+}
+
+taos_qset taosOpenQset() {
+
+ STaosQset *qset = (STaosQset *) calloc(sizeof(STaosQset), 1);
+ if (qset == NULL) {
+ terrno = TSDB_CODE_NO_RESOURCE;
+ return NULL;
}
- if (pthread_mutex_lock(&pQueue->queueMutex) != 0)
- pError("lock %s queueMutex failed, reason:%s", pQueue->label, strerror(errno));
+ pthread_mutex_init(&qset->mutex, NULL);
- pQueue->queue[pQueue->emptySlot] = *pMsg;
- pQueue->emptySlot = (pQueue->emptySlot + 1) % pQueue->queueSize;
- pQueue->num++;
-
- if (pthread_mutex_unlock(&pQueue->queueMutex) != 0)
- pError("unlock %s queueMutex failed, reason:%s", pQueue->label, strerror(errno));
+ return qset;
+}
- if (tsem_post(&pQueue->fullSem) != 0) pError("post %s fullSem failed, reason:%s", pQueue->label, strerror(errno));
+void taosCloseQset(taos_qset param) {
+ STaosQset *qset = (STaosQset *)param;
+ free(qset);
+}
+
+int taosAddIntoQset(taos_qset p1, taos_queue p2) {
+ STaosQueue *queue = (STaosQueue *)p2;
+ STaosQset *qset = (STaosQset *)p1;
+
+ if (queue->qset) return -1;
+
+ pthread_mutex_lock(&qset->mutex);
+
+ queue->next = qset->head;
+ qset->head = queue;
+ qset->numOfQueues++;
+
+ pthread_mutex_lock(&queue->mutex);
+ atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems);
+ queue->qset = qset;
+ pthread_mutex_unlock(&queue->mutex);
+
+ pthread_mutex_unlock(&qset->mutex);
return 0;
}
-void taosCleanUpMsgQueue(void *param) {
- SRpcQueue *pQueue = (SRpcQueue *)param;
- if (pQueue == NULL) return;
+void taosRemoveFromQset(taos_qset p1, taos_queue p2) {
+ STaosQueue *queue = (STaosQueue *)p2;
+ STaosQset *qset = (STaosQset *)p1;
+
+ STaosQueue *tqueue;
+
+ pthread_mutex_lock(&qset->mutex);
+
+ if (qset->head) {
+ if (qset->head == queue) {
+ qset->head = qset->head->next;
+ qset->numOfQueues--;
+ } else {
+ STaosQueue *prev = qset->head;
+ tqueue = qset->head->next;
+ while (tqueue) {
+ if (tqueue== queue) {
+ prev->next = tqueue->next;
+ if (qset->current == queue) qset->current = tqueue->next;
+ qset->numOfQueues--;
+
+ pthread_mutex_lock(&queue->mutex);
+ atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems);
+ queue->qset = NULL;
+ pthread_mutex_unlock(&queue->mutex);
+ } else {
+ prev = tqueue;
+ tqueue = tqueue->next;
+ }
+ }
+ }
+ }
+
+ pthread_mutex_unlock(&qset->mutex);
+}
+
+int taosGetQueueNumber(taos_qset param) {
+ return ((STaosQset *)param)->numOfQueues;
+}
+
+int taosReadQitemFromQset(taos_qset param, void *item) {
+ STaosQset *qset = (STaosQset *)param;
+ STaosQnode *pNode = NULL;
+ int code = 0;
+
+ for(int i=0; inumOfQueues; ++i) {
+ pthread_mutex_lock(&qset->mutex);
+ if (qset->current == NULL)
+ qset->current = qset->head;
+ STaosQueue *queue = qset->current;
+ qset->current = queue->next;
+ pthread_mutex_unlock(&qset->mutex);
+
+ pthread_mutex_lock(&queue->mutex);
+
+ if (queue->head) {
+ pNode = queue->head;
+ memcpy(item, pNode->item, queue->itemSize);
+ queue->head = pNode->next;
+ if (queue->head == NULL)
+ queue->tail = NULL;
+ free(pNode);
+ queue->numOfItems--;
+ atomic_sub_fetch_32(&qset->numOfItems, 1);
+ code = 1;
+ }
+
+ pthread_mutex_unlock(&queue->mutex);
+ if (pNode) break;
+ }
+
+ return code;
+}
+
+int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) {
+ STaosQset *qset = (STaosQset *)param;
+ STaosQueue *queue;
+ STaosQall *qall = NULL;
+ int code = 0;
+
+ for(int i=0; inumOfQueues; ++i) {
+ pthread_mutex_lock(&qset->mutex);
+ if (qset->current == NULL)
+ qset->current = qset->head;
+ queue = qset->current;
+ qset->current = queue->next;
+ pthread_mutex_unlock(&qset->mutex);
+
+ pthread_mutex_lock(&queue->mutex);
+
+ if (queue->head) {
+ qall = (STaosQall *) calloc(sizeof(STaosQall), 1);
+ if (qall == NULL) {
+ terrno = TSDB_CODE_NO_RESOURCE;
+ code = -1;
+ } else {
+ qall->current = queue->head;
+ qall->start = queue->head;
+ qall->numOfItems = queue->numOfItems;
+ qall->itemSize = queue->itemSize;
+ code = qall->numOfItems;
+
+ queue->head = NULL;
+ queue->tail = NULL;
+ queue->numOfItems = 0;
+ atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
+ }
+ }
+
+ pthread_mutex_unlock(&queue->mutex);
+
+ if (code != 0) break;
+ }
- pthread_cancel(pQueue->qthread);
+ *res = qall;
- tsem_destroy(&pQueue->emptySem);
- tsem_destroy(&pQueue->fullSem);
- pthread_mutex_destroy(&pQueue->queueMutex);
+ return code;
+}
- free(pQueue->queue);
- free(pQueue);
+int taosGetQueueItemsNumber(taos_queue param) {
+ STaosQueue *queue = (STaosQueue *)param;
+ return queue->numOfItems;
}
+int taosGetQsetItemsNumber(taos_qset param) {
+ STaosQset *qset = (STaosQset *)param;
+ return qset->numOfItems;
+}
diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c
index aa97535e314b41da4a33c8950855ca87c65ce40a..562d0fff96acab47d8121e21ed10e9fb984c105f 100644
--- a/src/rpc/test/rclient.c
+++ b/src/rpc/test/rclient.c
@@ -20,6 +20,7 @@
#include
#include
#include
+#include
#include "os.h"
#include "tlog.h"
#include "trpc.h"
@@ -39,11 +40,11 @@ typedef struct {
void *pRpc;
} SInfo;
-void processResponse(char type, void *pCont, int contLen, void *ahandle, int32_t code) {
- SInfo *pInfo = (SInfo *)ahandle;
- tTrace("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, type, contLen, code);
+void processResponse(SRpcMsg *pMsg) {
+ SInfo *pInfo = (SInfo *)pMsg->handle;
+ tTrace("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code);
- if (pCont) rpcFreeCont(pCont);
+ rpcFreeCont(pMsg->pCont);
sem_post(&pInfo->rspSem);
}
@@ -58,16 +59,19 @@ void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) {
int tcount = 0;
void *sendRequest(void *param) {
- SInfo *pInfo = (SInfo *)param;
- char *cont;
+ SInfo *pInfo = (SInfo *)param;
+ SRpcMsg rpcMsg;
tTrace("thread:%d, start to send request", pInfo->index);
while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
pInfo->num++;
- cont = rpcMallocCont(pInfo->msgSize);
+ rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
+ rpcMsg.contLen = pInfo->msgSize;
+ rpcMsg.handle = pInfo;
+ rpcMsg.msgType = 1;
tTrace("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
- rpcSendRequest(pInfo->pRpc, &pInfo->ipSet, 1, cont, pInfo->msgSize, pInfo);
+ rpcSendRequest(pInfo->pRpc, &pInfo->ipSet, &rpcMsg);
if ( pInfo->num % 20000 == 0 )
tPrint("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
sem_wait(&pInfo->rspSem);
@@ -161,7 +165,6 @@ int main(int argc, char *argv[]) {
}
taosInitLog("client.log", 100000, 10);
- tPrint("rpcDebugFlag:%d", rpcDebugFlag);
void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) {
diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c
index 718b0c5b6e1112d3995dcdd32c0063ebc83c080a..d9e5da51a67f34c7eeeff3c148e93b26defdc9c4 100644
--- a/src/rpc/test/rserver.c
+++ b/src/rpc/test/rserver.c
@@ -25,37 +25,56 @@ int commit = 0;
int dataFd = -1;
void *qhandle = NULL;
-void processShellMsg(int numOfMsgs, SRpcMsg *pMsg) {
+void processShellMsg() {
static int num = 0;
+ taos_qall qall;
+ SRpcMsg rpcMsg;
+
+ while (1) {
+ int numOfMsgs = taosReadAllQitems(qhandle, &qall);
+ if (numOfMsgs <= 0) {
+ usleep(1000);
+ continue;
+ }
+
+ tTrace("%d shell msgs are received", numOfMsgs);
+
+ for (int i=0; i=0) {
+ if ( write(dataFd, rpcMsg.pCont, rpcMsg.contLen) <0 ) {
+ tPrint("failed to write data file, reason:%s", strerror(errno));
+ }
+ }
+ }
- tTrace("%d shell msgs are received", numOfMsgs);
+ if (commit >=2) {
+ num += numOfMsgs;
+ if ( fsync(dataFd) < 0 ) {
+ tPrint("failed to flush data to file, reason:%s", strerror(errno));
+ }
- for (int i=0; i=0) {
- if ( write(dataFd, pMsg->msg, pMsg->msgLen) <0 ) {
- tPrint("failed to write data file, reason:%s", strerror(errno));
+ if (num % 10000 == 0) {
+ tPrint("%d request have been written into disk", num);
}
}
-
- void *rsp = rpcMallocCont(msgSize);
- rpcSendResponse(pMsg->handle, 1, rsp, msgSize);
- rpcFreeCont(pMsg->msg);
- pMsg++;
- }
-
- if (commit >=2) {
- num += numOfMsgs;
- if ( fsync(dataFd) < 0 ) {
- tPrint("failed to flush data to file, reason:%s", strerror(errno));
+
+ taosResetQitems(qall);
+ for (int i=0; imsgType, pMsg->contLen);
+ taosWriteQitem(qhandle, pMsg);
}
int main(int argc, char *argv[]) {
@@ -165,12 +178,9 @@ int main(int argc, char *argv[]) {
tPrint("failed to open data file, reason:%s", strerror(errno));
}
- qhandle = taosInitMsgQueue(1000, processShellMsg, "SER");
+ qhandle = taosOpenQueue(sizeof(SRpcMsg));
- // loop forever
- while(1) {
- sleep(1);
- }
+ processShellMsg();
if (dataFd >= 0) {
close(dataFd);