提交 cebd07db 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

new version for tqueue.c

上级 5049b99b
...@@ -42,6 +42,14 @@ typedef struct { ...@@ -42,6 +42,14 @@ typedef struct {
char *user; char *user;
} SRpcConnInfo; } SRpcConnInfo;
typedef struct {
char msgType;
void *pCont;
int contLen;
int32_t code;
void *handle;
} SRpcMsg;
typedef struct { typedef struct {
char *localIp; // local IP used char *localIp; // local IP used
uint16_t localPort; // local port uint16_t localPort; // local port
...@@ -59,7 +67,7 @@ typedef struct { ...@@ -59,7 +67,7 @@ typedef struct {
char *ckey; // ciphering key char *ckey; // ciphering key
// call back to process incoming msg, code shall be ignored by server app // call back to process incoming msg, code shall be ignored by server app
void (*cfp)(char type, void *pCont, int contLen, void *handle, int32_t code); void (*cfp)(SRpcMsg *);
// call back to process notify the ipSet changes, for client app only // call back to process notify the ipSet changes, for client app only
void (*ufp)(void *ahandle, SRpcIpSet *pIpSet); void (*ufp)(void *ahandle, SRpcIpSet *pIpSet);
...@@ -73,8 +81,8 @@ void rpcClose(void *); ...@@ -73,8 +81,8 @@ void rpcClose(void *);
void *rpcMallocCont(int contLen); void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont); void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen); void *rpcReallocCont(void *ptr, int contLen);
void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, char msgType, void *pCont, int contLen, void *ahandle); void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg);
void rpcSendResponse(void *pConn, int32_t code, void *pCont, int contLen); void rpcSendResponse(SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet); void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet);
void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
......
...@@ -13,27 +13,43 @@ ...@@ -13,27 +13,43 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_TSCHED_H #ifndef TAOS_QUEUE_H
#define TDENGINE_TSCHED_H #define TAOS_QUEUE_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
typedef struct _sched_msg { typedef void* taos_queue;
void *msg; typedef void* taos_qset;
int msgLen; typedef void* taos_qall;
int8_t type;
int32_t code;
void *handle;
} SRpcMsg;
void *taosInitMsgQueue(int queueSize, void (*fp)(int num, SRpcMsg *), const char *label); taos_queue taosOpenQueue(int itemSize);
int taosPutIntoMsgQueue(void *qhandle, SRpcMsg *pMsg); void taosCloseQueue(taos_queue);
void taosCleanUpMsgQueue(void *param); int taosWriteQitem(taos_queue, void *item);
int taosReadQitem(taos_queue, void *item);
int taosReadAllQitems(taos_queue, taos_qall *);
int taosGetQitem(taos_qall, void *item);
void taosResetQitems(taos_qall);
void taosFreeQitems(taos_qall);
taos_qset taosOpenQset();
void taosCloseQset();
int taosAddIntoQset(taos_qset, taos_queue);
void taosRemoveFromQset(taos_qset, taos_queue);
int taosGetQueueNumber(taos_qset);
int taosReadQitemFromQset(taos_qset, void *item);
int taosReadAllQitemsFromQset(taos_qset, taos_qall *);
int taosGetQueueItemsNumber(taos_queue param);
int taosGetQsetItemsNumber(taos_qset param);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif // TDENGINE_TSCHED_H #endif
...@@ -56,7 +56,7 @@ typedef struct { ...@@ -56,7 +56,7 @@ typedef struct {
char secret[TSDB_KEY_LEN]; // secret for the link char secret[TSDB_KEY_LEN]; // secret for the link
char ckey[TSDB_KEY_LEN]; // ciphering key char ckey[TSDB_KEY_LEN]; // ciphering key
void (*cfp)(char type, void *pCont, int contLen, void *ahandle, int32_t code); void (*cfp)(SRpcMsg *);
int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey);
void (*ufp)(void *ahandle, SRpcIpSet *pIpSet); void (*ufp)(void *ahandle, SRpcIpSet *pIpSet);
...@@ -339,25 +339,27 @@ void *rpcReallocCont(void *ptr, int contLen) { ...@@ -339,25 +339,27 @@ void *rpcReallocCont(void *ptr, int contLen) {
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
} }
void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, int contLen, void *ahandle) { void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg) {
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcReqContext *pContext; SRpcReqContext *pContext;
contLen = rpcCompressRpcMsg(pCont, contLen); pMsg->contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
pContext = (SRpcReqContext *) (pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
pContext->ahandle = ahandle; pContext->ahandle = pMsg->handle;
pContext->pRpc = (SRpcInfo *)shandle; pContext->pRpc = (SRpcInfo *)shandle;
pContext->ipSet = *pIpSet; pContext->ipSet = *pIpSet;
pContext->contLen = contLen; pContext->contLen = pMsg->contLen;
pContext->pCont = pCont; pContext->pCont = pMsg->pCont;
pContext->msgType = type; pContext->msgType = pMsg->msgType;
pContext->oldInUse = pIpSet->inUse; pContext->oldInUse = pIpSet->inUse;
pContext->connType = RPC_CONN_UDPC; pContext->connType = RPC_CONN_UDPC;
if (contLen > tsRpcMaxUdpSize) pContext->connType = RPC_CONN_TCPC; if (pMsg->contLen > tsRpcMaxUdpSize) pContext->connType = RPC_CONN_TCPC;
// connection type is application specific. // connection type is application specific.
// for TDengine, all the query, show commands shall have TCP connection // for TDengine, all the query, show commands shall have TCP connection
char type = pMsg->msgType;
if (type == TSDB_MSG_TYPE_DNODE_QUERY || type == TSDB_MSG_TYPE_DNODE_RETRIEVE || if (type == TSDB_MSG_TYPE_DNODE_QUERY || type == TSDB_MSG_TYPE_DNODE_RETRIEVE ||
type == TSDB_MSG_TYPE_STABLE_META || type == TSDB_MSG_TYPE_MULTI_TABLE_META || type == TSDB_MSG_TYPE_STABLE_META || type == TSDB_MSG_TYPE_MULTI_TABLE_META ||
type == TSDB_MSG_TYPE_SHOW ) type == TSDB_MSG_TYPE_SHOW )
...@@ -368,21 +370,21 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, in ...@@ -368,21 +370,21 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, in
return; return;
} }
void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { void rpcSendResponse(SRpcMsg *pMsg) {
int msgLen = 0; int msgLen = 0;
SRpcConn *pConn = (SRpcConn *)handle; SRpcConn *pConn = (SRpcConn *)pMsg->handle;
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
if ( pCont == NULL ) { if ( pMsg->pCont == NULL ) {
pCont = rpcMallocCont(0); pMsg->pCont = rpcMallocCont(0);
contLen = 0; pMsg->contLen = 0;
} }
SRpcHead *pHead = rpcHeadFromCont(pCont); SRpcHead *pHead = rpcHeadFromCont(pMsg->pCont);
char *msg = (char *)pHead; char *msg = (char *)pHead;
contLen = rpcCompressRpcMsg(pCont, contLen); pMsg->contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
msgLen = rpcMsgLenFromCont(contLen); msgLen = rpcMsgLenFromCont(pMsg->contLen);
rpcLockConn(pConn); rpcLockConn(pConn);
...@@ -402,7 +404,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { ...@@ -402,7 +404,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
pHead->destId = pConn->peerId; pHead->destId = pConn->peerId;
pHead->linkUid = pConn->linkUid; pHead->linkUid = pConn->linkUid;
pHead->port = htons(pConn->localPort); pHead->port = htons(pConn->localPort);
pHead->code = htonl(code); pHead->code = htonl(pMsg->code);
// set pConn parameters // set pConn parameters
pConn->inType = 0; pConn->inType = 0;
...@@ -416,6 +418,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { ...@@ -416,6 +418,7 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
taosTmrStopA(&pConn->pTimer); taosTmrStopA(&pConn->pTimer);
taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
rpcSendMsgToPeer(pConn, msg, msgLen); rpcSendMsgToPeer(pConn, msg, msgLen);
pConn->secured = 1; // connection shall be secured pConn->secured = 1; // connection shall be secured
...@@ -423,15 +426,18 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { ...@@ -423,15 +426,18 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) {
} }
void rpcSendRedirectRsp(void *thandle, SRpcIpSet *pIpSet) { void rpcSendRedirectRsp(void *thandle, SRpcIpSet *pIpSet) {
char *pMsg; SRpcMsg rpcMsg;
int msgLen = sizeof(SRpcIpSet);
rpcMsg.contLen = sizeof(SRpcIpSet);
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
if (rpcMsg.pCont == NULL) return;
pMsg = rpcMallocCont(msgLen); memcpy(rpcMsg.pCont, pIpSet, sizeof(SRpcIpSet));
if (pMsg == NULL) return;
memcpy(pMsg, pIpSet, sizeof(SRpcIpSet)); rpcMsg.code = TSDB_CODE_REDIRECT;
rpcMsg.handle = thandle;
rpcSendResponse(thandle, TSDB_CODE_REDIRECT, pMsg, msgLen); rpcSendResponse(&rpcMsg);
return; return;
} }
...@@ -813,11 +819,6 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { ...@@ -813,11 +819,6 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port); pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port);
} }
if (pRpc->connType == TAOS_CONN_SERVER && pConn && pRpc->idleTime) {
// only for server, starts the idle timer. For client, it is started by cache mgmt
taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
}
if (terrno != TSDB_CODE_ALREADY_PROCESSED) { if (terrno != TSDB_CODE_ALREADY_PROCESSED) {
if (terrno != 0) { // parsing error if (terrno != 0) { // parsing error
if ( rpcIsReq(pHead->msgType) ) { if ( rpcIsReq(pHead->msgType) ) {
...@@ -835,24 +836,29 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { ...@@ -835,24 +836,29 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
SRpcMsg rpcMsg;
pHead = rpcDecompressRpcMsg(pHead); pHead = rpcDecompressRpcMsg(pHead);
int contLen = rpcContLenFromMsg(pHead->msgLen); rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen);
uint8_t *pCont = pHead->content; rpcMsg.pCont = pHead->content;
rpcMsg.msgType = pHead->msgType;
rpcMsg.code = pHead->code;
if ( rpcIsReq(pHead->msgType) ) { if ( rpcIsReq(pHead->msgType) ) {
rpcMsg.handle = pConn;
pConn->destIp = pHead->destIp; pConn->destIp = pHead->destIp;
taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer);
(*(pRpc->cfp))(pHead->msgType, pCont, contLen, pConn, 0); (*(pRpc->cfp))(&rpcMsg);
} else { } else {
// it's a response // it's a response
int32_t code = pHead->code;
SRpcReqContext *pContext = pConn->pContext; SRpcReqContext *pContext = pConn->pContext;
rpcMsg.handle = pContext->ahandle;
pConn->pContext = NULL; pConn->pContext = NULL;
// for UDP, port may be changed by server, the port in ipSet shall be used for cache // for UDP, port may be changed by server, the port in ipSet shall be used for cache
rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pContext->ipSet.port, pConn->connType); rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerIp, pContext->ipSet.port, pConn->connType);
if (code == TSDB_CODE_REDIRECT) { if (pHead->code == TSDB_CODE_REDIRECT) {
pContext->redirect = 1; pContext->redirect = 1;
pContext->numOfTry = 0; pContext->numOfTry = 0;
memcpy(&pContext->ipSet, pHead->content, sizeof(pContext->ipSet)); memcpy(&pContext->ipSet, pHead->content, sizeof(pContext->ipSet));
...@@ -861,7 +867,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -861,7 +867,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
} else { } else {
if ( pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect) ) if ( pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect) )
(*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet (*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet
(*pRpc->cfp)(pHead->msgType, pCont, contLen, pContext->ahandle, code); (*pRpc->cfp)(&rpcMsg);
rpcFreeCont(pContext->pCont); // free the request msg rpcFreeCont(pContext->pCont); // free the request msg
} }
} }
...@@ -963,8 +969,8 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -963,8 +969,8 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
rpcSendMsgToPeer(pConn, msg, msgLen);
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
rpcSendMsgToPeer(pConn, msg, msgLen);
} }
static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
...@@ -998,12 +1004,18 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { ...@@ -998,12 +1004,18 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
static void rpcProcessConnError(void *param, void *id) { static void rpcProcessConnError(void *param, void *id) {
SRpcReqContext *pContext = (SRpcReqContext *)param; SRpcReqContext *pContext = (SRpcReqContext *)param;
SRpcInfo *pRpc = pContext->pRpc; SRpcInfo *pRpc = pContext->pRpc;
SRpcMsg rpcMsg;
tTrace("%s connection error happens", pRpc->label); tTrace("%s connection error happens", pRpc->label);
if ( pContext->numOfTry >= pContext->ipSet.numOfIps ) { if ( pContext->numOfTry >= pContext->ipSet.numOfIps ) {
(*(pRpc->cfp))(pContext->msgType+1, NULL, 0, pContext->ahandle, pContext->code); rpcMsg.msgType = pContext->msgType+1;
rpcMsg.handle = pContext->ahandle;
rpcMsg.code = pContext->code;
rpcMsg.pCont = NULL;
rpcMsg.contLen = 0;
(*(pRpc->cfp))(&rpcMsg);
rpcFreeCont(pContext->pCont); // free the request msg rpcFreeCont(pContext->pCont); // free the request msg
} else { } else {
// move to next IP // move to next IP
...@@ -1070,7 +1082,7 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) { ...@@ -1070,7 +1082,7 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) {
if (pConn->inType && pConn->user[0]) { if (pConn->inType && pConn->user[0]) {
tTrace("%s %p, progress timer expired, send progress", pRpc->label, pConn); tTrace("%s %p, progress timer expired, send progress", pRpc->label, pConn);
rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS);
taosTmrReset(rpcProcessProgressTimer, tsRpcTimer<<pConn->retry, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer);
} else { } else {
tTrace("%s %p, progress timer:%p not processed", pRpc->label, pConn, tmrId); tTrace("%s %p, progress timer:%p not processed", pRpc->label, pConn, tmrId);
} }
......
...@@ -15,176 +15,357 @@ ...@@ -15,176 +15,357 @@
#include "os.h" #include "os.h"
#include "tlog.h" #include "tlog.h"
#include "taoserror.h"
#include "tqueue.h" #include "tqueue.h"
#define DUMP_SCHEDULER_TIME_WINDOW 30000 //every 30sec, take a snap shot of task queue. typedef struct _taos_qnode {
struct _taos_qnode *next;
typedef struct { char item[];
char label[16]; } STaosQnode;
int num;
tsem_t emptySem; typedef struct _taos_q {
tsem_t fullSem; int itemSize;
pthread_mutex_t queueMutex; int numOfItems;
int fullSlot; struct _taos_qnode *head;
int emptySlot; struct _taos_qnode *tail;
int queueSize; struct _taos_q *next; // for queue set
SRpcMsg *queue; struct _taos_qset *qset; // for queue set
SRpcMsg *oqueue; pthread_mutex_t mutex;
pthread_t qthread; } STaosQueue;
void (*fp)(int num, SRpcMsg *);
} SRpcQueue; typedef struct _taos_qset {
STaosQueue *head;
static void *taosProcessMsgQueue(void *param); STaosQueue *current;
pthread_mutex_t mutex;
void *taosInitMsgQueue(int queueSize, void (*fp)(int num, SRpcMsg *), const char *label) { int numOfQueues;
pthread_attr_t attr; int numOfItems;
SRpcQueue * pQueue = (SRpcQueue *)malloc(sizeof(SRpcQueue)); } STaosQset;
if (pQueue == NULL) {
pError("%s: no enough memory for pQueue, reason: %s", label, strerror(errno)); typedef struct _taos_qall {
goto _error; STaosQnode *current;
STaosQnode *start;
int itemSize;
int numOfItems;
} STaosQall;
taos_queue taosOpenQueue(int itemSize) {
STaosQueue *queue = (STaosQueue *) calloc(sizeof(STaosQueue), 1);
if (queue == NULL) {
terrno = TSDB_CODE_NO_RESOURCE;
return NULL;
} }
memset(pQueue, 0, sizeof(SRpcQueue)); pthread_mutex_init(&queue->mutex, NULL);
pQueue->queueSize = queueSize; queue->itemSize = itemSize;
strncpy(pQueue->label, label, sizeof(pQueue->label)); // fix buffer overflow
pQueue->label[sizeof(pQueue->label)-1] = '\0';
pQueue->fp = fp;
if (pthread_mutex_init(&pQueue->queueMutex, NULL) < 0) { return queue;
pError("init %s:queueMutex failed, reason:%s", pQueue->label, strerror(errno)); }
goto _error;
}
if (tsem_init(&pQueue->emptySem, 0, (unsigned int)pQueue->queueSize) != 0) { void taosCloseQueue(taos_queue param) {
pError("init %s:empty semaphore failed, reason:%s", pQueue->label, strerror(errno)); STaosQueue *queue = (STaosQueue *)param;
goto _error; STaosQnode *pTemp;
} STaosQnode *pNode = queue->head;
queue->head = NULL;
if (tsem_init(&pQueue->fullSem, 0, 0) != 0) { pthread_mutex_lock(&queue->mutex);
pError("init %s:full semaphore failed, reason:%s", pQueue->label, strerror(errno));
goto _error;
}
if ((pQueue->queue = (SRpcMsg *)malloc((size_t)pQueue->queueSize * sizeof(SRpcMsg))) == NULL) { if (queue->qset) taosRemoveFromQset(queue->qset, queue);
pError("%s: no enough memory for queue, reason:%s", pQueue->label, strerror(errno));
goto _error;
}
memset(pQueue->queue, 0, (size_t)pQueue->queueSize * sizeof(SRpcMsg));
if ((pQueue->oqueue = (SRpcMsg *)malloc((size_t)pQueue->queueSize * sizeof(SRpcMsg))) == NULL) { while (pNode) {
pError("%s: no enough memory for queue, reason:%s", pQueue->label, strerror(errno)); pTemp = pNode;
goto _error; pNode = pNode->next;
free (pTemp);
} }
memset(pQueue->oqueue, 0, (size_t)pQueue->queueSize * sizeof(SRpcMsg)); pthread_mutex_unlock(&queue->mutex);
pQueue->fullSlot = 0; free(queue);
pQueue->fullSlot = 0; }
pQueue->emptySlot = 0;
pthread_attr_init(&attr); int taosWriteQitem(taos_queue param, void *item) {
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); STaosQueue *queue = (STaosQueue *)param;
if (pthread_create(&pQueue->qthread, &attr, taosProcessMsgQueue, (void *)pQueue) != 0) { STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + queue->itemSize, 1);
pError("%s: failed to create taos thread, reason:%s", pQueue->label, strerror(errno)); if ( pNode == NULL ) {
goto _error; terrno = TSDB_CODE_NO_RESOURCE;
return -1;
} }
pTrace("%s RPC msg queue is initialized", pQueue->label); memcpy(pNode->item, item, queue->itemSize);
return (void *)pQueue; pthread_mutex_lock(&queue->mutex);
_error: if (queue->tail) {
taosCleanUpMsgQueue(pQueue); queue->tail->next = pNode;
return NULL; queue->tail = pNode;
} else {
queue->head = pNode;
queue->tail = pNode;
}
queue->numOfItems++;
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
pthread_mutex_unlock(&queue->mutex);
} }
void *taosProcessMsgQueue(void *param) { int taosReadQitem(taos_queue param, void *item) {
SRpcQueue *pQueue = (SRpcQueue *)param; STaosQueue *queue = (STaosQueue *)param;
int num = 0; STaosQnode *pNode = NULL;
int code = 0;
while (1) { pthread_mutex_lock(&queue->mutex);
if (tsem_wait(&pQueue->fullSem) != 0) {
if (errno == EINTR) {
/* sem_wait is interrupted by interrupt, ignore and continue */
pTrace("wait %s fullSem was interrupted", pQueue->label);
continue;
}
pError("wait %s fullSem failed, errno:%d, reason:%s", pQueue->label, errno, strerror(errno));
}
if (pthread_mutex_lock(&pQueue->queueMutex) != 0) if (queue->head) {
pError("lock %s queueMutex failed, reason:%s", pQueue->label, strerror(errno)); pNode = queue->head;
memcpy(item, pNode->item, queue->itemSize);
queue->head = pNode->next;
if (queue->head == NULL)
queue->tail = NULL;
free(pNode);
queue->numOfItems--;
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
code = 1;
}
num = 0; pthread_mutex_unlock(&queue->mutex);
do {
pQueue->oqueue[num] = pQueue->queue[pQueue->fullSlot];
pQueue->fullSlot = (pQueue->fullSlot + 1) % pQueue->queueSize;
++num;
pQueue->num--;
} while (pQueue->fullSlot != pQueue->emptySlot);
if (pthread_mutex_unlock(&pQueue->queueMutex) != 0) return code;
pError("unlock %s queueMutex failed, reason:%s\n", pQueue->label, strerror(errno)); }
for (int i= 0; i<num; ++i) { int taosReadAllQitems(taos_queue param, taos_qall *res) {
if (tsem_post(&pQueue->emptySem) != 0) STaosQueue *queue = (STaosQueue *)param;
pError("post %s emptySem failed, reason:%s\n", pQueue->label, strerror(errno)); STaosQall *qall = NULL;
int code = 0;
pthread_mutex_lock(&queue->mutex);
if (queue->head) {
qall = (STaosQall *) calloc(sizeof(STaosQall), 1);
if ( qall == NULL ) {
terrno = TSDB_CODE_NO_RESOURCE;
code = -1;
} else {
qall->current = queue->head;
qall->start = queue->head;
qall->numOfItems = queue->numOfItems;
qall->itemSize = queue->itemSize;
code = qall->numOfItems;
queue->head = NULL;
queue->tail = NULL;
queue->numOfItems = 0;
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
} }
}
for (int i=0; i<num-1; ++i) { pthread_mutex_unlock(&queue->mutex);
if (tsem_wait(&pQueue->fullSem) != 0)
pError("wait %s fullSem failed, reason:%s\n", pQueue->label, strerror(errno)); *res = qall;
} return code;
}
(*pQueue->fp)(num, pQueue->oqueue); int taosGetQitem(taos_qall param, void *item) {
STaosQall *qall = (STaosQall *)param;
STaosQnode *pNode;
int num = 0;
pNode = qall->current;
if (pNode)
qall->current = pNode->next;
if (pNode) {
memcpy(item, pNode->item, qall->itemSize);
num = 1;
} }
return NULL; return num;
} }
int taosPutIntoMsgQueue(void *qhandle, SRpcMsg *pMsg) { void taosResetQitems(taos_qall param) {
SRpcQueue *pQueue = (SRpcQueue *)qhandle; STaosQall *qall = (STaosQall *)param;
if (pQueue == NULL) { qall->current = qall->start;
pError("sched is not ready, msg:%p is dropped", pMsg); }
return 0;
void taosFreeQitems(taos_qall param) {
STaosQall *qall = (STaosQall *)param;
STaosQnode *pNode;
while (qall->current) {
pNode = qall->current;
qall->current = pNode->next;
free(pNode);
} }
while (tsem_wait(&pQueue->emptySem) != 0) { free(qall);
if (errno != EINTR) { }
pError("wait %s emptySem failed, reason:%s", pQueue->label, strerror(errno));
break; taos_qset taosOpenQset() {
}
STaosQset *qset = (STaosQset *) calloc(sizeof(STaosQset), 1);
if (qset == NULL) {
terrno = TSDB_CODE_NO_RESOURCE;
return NULL;
} }
if (pthread_mutex_lock(&pQueue->queueMutex) != 0) pthread_mutex_init(&qset->mutex, NULL);
pError("lock %s queueMutex failed, reason:%s", pQueue->label, strerror(errno));
pQueue->queue[pQueue->emptySlot] = *pMsg; return qset;
pQueue->emptySlot = (pQueue->emptySlot + 1) % pQueue->queueSize; }
pQueue->num++;
if (pthread_mutex_unlock(&pQueue->queueMutex) != 0)
pError("unlock %s queueMutex failed, reason:%s", pQueue->label, strerror(errno));
if (tsem_post(&pQueue->fullSem) != 0) pError("post %s fullSem failed, reason:%s", pQueue->label, strerror(errno)); void taosCloseQset(taos_qset param) {
STaosQset *qset = (STaosQset *)param;
free(qset);
}
int taosAddIntoQset(taos_qset p1, taos_queue p2) {
STaosQueue *queue = (STaosQueue *)p2;
STaosQset *qset = (STaosQset *)p1;
if (queue->qset) return -1;
pthread_mutex_lock(&qset->mutex);
queue->next = qset->head;
qset->head = queue;
qset->numOfQueues++;
pthread_mutex_lock(&queue->mutex);
atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems);
queue->qset = qset;
pthread_mutex_unlock(&queue->mutex);
pthread_mutex_unlock(&qset->mutex);
return 0; return 0;
} }
void taosCleanUpMsgQueue(void *param) { void taosRemoveFromQset(taos_qset p1, taos_queue p2) {
SRpcQueue *pQueue = (SRpcQueue *)param; STaosQueue *queue = (STaosQueue *)p2;
if (pQueue == NULL) return; STaosQset *qset = (STaosQset *)p1;
STaosQueue *tqueue;
pthread_mutex_lock(&qset->mutex);
if (qset->head) {
if (qset->head == queue) {
qset->head = qset->head->next;
qset->numOfQueues--;
} else {
STaosQueue *prev = qset->head;
tqueue = qset->head->next;
while (tqueue) {
if (tqueue== queue) {
prev->next = tqueue->next;
if (qset->current == queue) qset->current = tqueue->next;
qset->numOfQueues--;
pthread_mutex_lock(&queue->mutex);
atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems);
queue->qset = NULL;
pthread_mutex_unlock(&queue->mutex);
} else {
prev = tqueue;
tqueue = tqueue->next;
}
}
}
}
pthread_mutex_unlock(&qset->mutex);
}
int taosGetQueueNumber(taos_qset param) {
return ((STaosQset *)param)->numOfQueues;
}
int taosReadQitemFromQset(taos_qset param, void *item) {
STaosQset *qset = (STaosQset *)param;
STaosQnode *pNode = NULL;
int code = 0;
for(int i=0; i<qset->numOfQueues; ++i) {
pthread_mutex_lock(&qset->mutex);
if (qset->current == NULL)
qset->current = qset->head;
STaosQueue *queue = qset->current;
qset->current = queue->next;
pthread_mutex_unlock(&qset->mutex);
pthread_mutex_lock(&queue->mutex);
if (queue->head) {
pNode = queue->head;
memcpy(item, pNode->item, queue->itemSize);
queue->head = pNode->next;
if (queue->head == NULL)
queue->tail = NULL;
free(pNode);
queue->numOfItems--;
atomic_sub_fetch_32(&qset->numOfItems, 1);
code = 1;
}
pthread_mutex_unlock(&queue->mutex);
if (pNode) break;
}
return code;
}
int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) {
STaosQset *qset = (STaosQset *)param;
STaosQueue *queue;
STaosQall *qall = NULL;
int code = 0;
for(int i=0; i<qset->numOfQueues; ++i) {
pthread_mutex_lock(&qset->mutex);
if (qset->current == NULL)
qset->current = qset->head;
queue = qset->current;
qset->current = queue->next;
pthread_mutex_unlock(&qset->mutex);
pthread_mutex_lock(&queue->mutex);
if (queue->head) {
qall = (STaosQall *) calloc(sizeof(STaosQall), 1);
if (qall == NULL) {
terrno = TSDB_CODE_NO_RESOURCE;
code = -1;
} else {
qall->current = queue->head;
qall->start = queue->head;
qall->numOfItems = queue->numOfItems;
qall->itemSize = queue->itemSize;
code = qall->numOfItems;
queue->head = NULL;
queue->tail = NULL;
queue->numOfItems = 0;
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
}
}
pthread_mutex_unlock(&queue->mutex);
if (code != 0) break;
}
pthread_cancel(pQueue->qthread); *res = qall;
tsem_destroy(&pQueue->emptySem); return code;
tsem_destroy(&pQueue->fullSem); }
pthread_mutex_destroy(&pQueue->queueMutex);
free(pQueue->queue); int taosGetQueueItemsNumber(taos_queue param) {
free(pQueue); STaosQueue *queue = (STaosQueue *)param;
return queue->numOfItems;
} }
int taosGetQsetItemsNumber(taos_qset param) {
STaosQset *qset = (STaosQset *)param;
return qset->numOfItems;
}
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include <pthread.h> #include <pthread.h>
#include <errno.h> #include <errno.h>
#include <signal.h> #include <signal.h>
#include <semaphore.h>
#include "os.h" #include "os.h"
#include "tlog.h" #include "tlog.h"
#include "trpc.h" #include "trpc.h"
...@@ -39,11 +40,11 @@ typedef struct { ...@@ -39,11 +40,11 @@ typedef struct {
void *pRpc; void *pRpc;
} SInfo; } SInfo;
void processResponse(char type, void *pCont, int contLen, void *ahandle, int32_t code) { void processResponse(SRpcMsg *pMsg) {
SInfo *pInfo = (SInfo *)ahandle; SInfo *pInfo = (SInfo *)pMsg->handle;
tTrace("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, type, contLen, code); tTrace("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code);
if (pCont) rpcFreeCont(pCont); rpcFreeCont(pMsg->pCont);
sem_post(&pInfo->rspSem); sem_post(&pInfo->rspSem);
} }
...@@ -58,16 +59,19 @@ void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) { ...@@ -58,16 +59,19 @@ void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) {
int tcount = 0; int tcount = 0;
void *sendRequest(void *param) { void *sendRequest(void *param) {
SInfo *pInfo = (SInfo *)param; SInfo *pInfo = (SInfo *)param;
char *cont; SRpcMsg rpcMsg;
tTrace("thread:%d, start to send request", pInfo->index); tTrace("thread:%d, start to send request", pInfo->index);
while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
pInfo->num++; pInfo->num++;
cont = rpcMallocCont(pInfo->msgSize); rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
rpcMsg.contLen = pInfo->msgSize;
rpcMsg.handle = pInfo;
rpcMsg.msgType = 1;
tTrace("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); tTrace("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRequest(pInfo->pRpc, &pInfo->ipSet, 1, cont, pInfo->msgSize, pInfo); rpcSendRequest(pInfo->pRpc, &pInfo->ipSet, &rpcMsg);
if ( pInfo->num % 20000 == 0 ) if ( pInfo->num % 20000 == 0 )
tPrint("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); tPrint("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
sem_wait(&pInfo->rspSem); sem_wait(&pInfo->rspSem);
...@@ -161,7 +165,6 @@ int main(int argc, char *argv[]) { ...@@ -161,7 +165,6 @@ int main(int argc, char *argv[]) {
} }
taosInitLog("client.log", 100000, 10); taosInitLog("client.log", 100000, 10);
tPrint("rpcDebugFlag:%d", rpcDebugFlag);
void *pRpc = rpcOpen(&rpcInit); void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) { if (pRpc == NULL) {
......
...@@ -25,37 +25,56 @@ int commit = 0; ...@@ -25,37 +25,56 @@ int commit = 0;
int dataFd = -1; int dataFd = -1;
void *qhandle = NULL; void *qhandle = NULL;
void processShellMsg(int numOfMsgs, SRpcMsg *pMsg) { void processShellMsg() {
static int num = 0; static int num = 0;
taos_qall qall;
SRpcMsg rpcMsg;
while (1) {
int numOfMsgs = taosReadAllQitems(qhandle, &qall);
if (numOfMsgs <= 0) {
usleep(1000);
continue;
}
tTrace("%d shell msgs are received", numOfMsgs);
for (int i=0; i<numOfMsgs; ++i) {
taosGetQitem(qall, &rpcMsg);
if (dataFd >=0) {
if ( write(dataFd, rpcMsg.pCont, rpcMsg.contLen) <0 ) {
tPrint("failed to write data file, reason:%s", strerror(errno));
}
}
}
tTrace("%d shell msgs are received", numOfMsgs); if (commit >=2) {
num += numOfMsgs;
if ( fsync(dataFd) < 0 ) {
tPrint("failed to flush data to file, reason:%s", strerror(errno));
}
for (int i=0; i<numOfMsgs; ++i) { if (num % 10000 == 0) {
tPrint("%d request have been written into disk", num);
if (dataFd >=0) {
if ( write(dataFd, pMsg->msg, pMsg->msgLen) <0 ) {
tPrint("failed to write data file, reason:%s", strerror(errno));
} }
} }
void *rsp = rpcMallocCont(msgSize); taosResetQitems(qall);
rpcSendResponse(pMsg->handle, 1, rsp, msgSize); for (int i=0; i<numOfMsgs; ++i) {
rpcFreeCont(pMsg->msg); taosGetQitem(qall, &rpcMsg);
pMsg++;
} rpcFreeCont(rpcMsg.pCont);
rpcMsg.pCont = rpcMallocCont(msgSize);
if (commit >=2) { rpcMsg.contLen = msgSize;
num += numOfMsgs; rpcMsg.handle = rpcMsg.handle;
if ( fsync(dataFd) < 0 ) { rpcMsg.code = 1;
tPrint("failed to flush data to file, reason:%s", strerror(errno)); rpcSendResponse(&rpcMsg);
} }
if (num % 10000 == 0) { taosFreeQitems(qall);
tPrint("%d request have been written into disk", num);
}
} }
/* /*
SRpcIpSet ipSet; SRpcIpSet ipSet;
ipSet.numOfIps = 1; ipSet.numOfIps = 1;
...@@ -88,15 +107,9 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char ...@@ -88,15 +107,9 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char
return ret; return ret;
} }
void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32_t code) { void processRequestMsg(SRpcMsg *pMsg) {
tTrace("request is received, type:%d, contLen:%d", type, contLen); tTrace("request is received, type:%d, contLen:%d", pMsg->msgType, pMsg->contLen);
SRpcMsg rpcMsg; taosWriteQitem(qhandle, pMsg);
rpcMsg.msg = pCont;
rpcMsg.msgLen = contLen;
rpcMsg.code = code;
rpcMsg.handle = thandle;
rpcMsg.type = type;
taosPutIntoMsgQueue(qhandle, &rpcMsg);
} }
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
...@@ -165,12 +178,9 @@ int main(int argc, char *argv[]) { ...@@ -165,12 +178,9 @@ int main(int argc, char *argv[]) {
tPrint("failed to open data file, reason:%s", strerror(errno)); tPrint("failed to open data file, reason:%s", strerror(errno));
} }
qhandle = taosInitMsgQueue(1000, processShellMsg, "SER"); qhandle = taosOpenQueue(sizeof(SRpcMsg));
// loop forever processShellMsg();
while(1) {
sleep(1);
}
if (dataFd >= 0) { if (dataFd >= 0) {
close(dataFd); close(dataFd);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册