提交 a340baa8 编写于 作者: S Shengliang Guan

refact queue

上级 b36356ae
...@@ -42,8 +42,15 @@ shall be used to set up the protection. ...@@ -42,8 +42,15 @@ shall be used to set up the protection.
typedef struct STaosQueue STaosQueue; typedef struct STaosQueue STaosQueue;
typedef struct STaosQset STaosQset; typedef struct STaosQset STaosQset;
typedef struct STaosQall STaosQall; typedef struct STaosQall STaosQall;
typedef void (*FItem)(void *ahandle, void *pItem); typedef struct {
typedef void (*FItems)(void *ahandle, STaosQall *qall, int32_t numOfItems); void *ahandle;
int32_t qsize;
int32_t workerId;
int32_t threadNum;
} SQueueInfo;
typedef void (*FItem)(SQueueInfo *pInfo, void *pItem);
typedef void (*FItems)(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfItems);
STaosQueue *taosOpenQueue(); STaosQueue *taosOpenQueue();
void taosCloseQueue(STaosQueue *queue); void taosCloseQueue(STaosQueue *queue);
......
...@@ -51,6 +51,7 @@ typedef struct SWWorker { ...@@ -51,6 +51,7 @@ typedef struct SWWorker {
typedef struct SWWorkerPool { typedef struct SWWorkerPool {
int32_t max; // max number of workers int32_t max; // max number of workers
int32_t num;
int32_t nextId; // from 0 to max-1, cyclic int32_t nextId; // from 0 to max-1, cyclic
const char *name; const char *name;
SWWorker *workers; SWWorker *workers;
......
...@@ -33,7 +33,8 @@ static void bmSendErrorRsps(SMgmtWrapper *pWrapper, STaosQall *qall, int32_t num ...@@ -33,7 +33,8 @@ static void bmSendErrorRsps(SMgmtWrapper *pWrapper, STaosQall *qall, int32_t num
} }
} }
static void bmProcessQueue(SBnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs) { static void bmProcessQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SBnodeMgmt *pMgmt = pInfo->ahandle;
SMgmtWrapper *pWrapper = pMgmt->pWrapper; SMgmtWrapper *pWrapper = pMgmt->pWrapper;
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
......
...@@ -52,7 +52,9 @@ static void *dmThreadRoutine(void *param) { ...@@ -52,7 +52,9 @@ static void *dmThreadRoutine(void *param) {
} }
} }
static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SDnodeMgmt *pMgmt = pInfo->ahandle;
SDnode *pDnode = pMgmt->pDnode; SDnode *pDnode = pMgmt->pDnode;
SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1; int32_t code = -1;
......
...@@ -16,7 +16,9 @@ ...@@ -16,7 +16,9 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mmInt.h" #include "mmInt.h"
static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, will be processed in mnode queue", pMsg); dTrace("msg:%p, will be processed in mnode queue", pMsg);
SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1; int32_t code = -1;
......
...@@ -21,7 +21,9 @@ static void qmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { ...@@ -21,7 +21,9 @@ static void qmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
dndSendRsp(pWrapper, &rsp); dndSendRsp(pWrapper, &rsp);
} }
static void qmProcessQueryQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { static void qmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SQnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, will be processed in qnode-query queue", pMsg); dTrace("msg:%p, will be processed in qnode-query queue", pMsg);
int32_t code = qndProcessQueryMsg(pMgmt->pQnode, &pMsg->rpcMsg); int32_t code = qndProcessQueryMsg(pMgmt->pQnode, &pMsg->rpcMsg);
if (code != 0) { if (code != 0) {
...@@ -33,7 +35,9 @@ static void qmProcessQueryQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -33,7 +35,9 @@ static void qmProcessQueryQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
static void qmProcessFetchQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { static void qmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SQnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, will be processed in qnode-fetch queue", pMsg); dTrace("msg:%p, will be processed in qnode-fetch queue", pMsg);
int32_t code = qndProcessFetchMsg(pMgmt->pQnode, &pMsg->rpcMsg); int32_t code = qndProcessFetchMsg(pMgmt->pQnode, &pMsg->rpcMsg);
if (code != 0) { if (code != 0) {
......
...@@ -16,7 +16,9 @@ ...@@ -16,7 +16,9 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "smInt.h" #include "smInt.h"
static void smProcessUniqueQueue(SSnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs) { static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SSnodeMgmt *pMgmt = pInfo->ahandle;
for (int32_t i = 0; i < numOfMsgs; i++) { for (int32_t i = 0; i < numOfMsgs; i++) {
SNodeMsg *pMsg = NULL; SNodeMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg); taosGetQitem(qall, (void **)&pMsg);
...@@ -30,7 +32,9 @@ static void smProcessUniqueQueue(SSnodeMgmt *pMgmt, STaosQall *qall, int32_t num ...@@ -30,7 +32,9 @@ static void smProcessUniqueQueue(SSnodeMgmt *pMgmt, STaosQall *qall, int32_t num
} }
} }
static void smProcessSharedQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { static void smProcessSharedQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SSnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, will be processed in snode shared queue", pMsg); dTrace("msg:%p, will be processed in snode shared queue", pMsg);
sndProcessSMsg(pMgmt->pSnode, &pMsg->rpcMsg); sndProcessSMsg(pMgmt->pSnode, &pMsg->rpcMsg);
...@@ -53,7 +57,7 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { ...@@ -53,7 +57,7 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
return -1; return -1;
} }
SWWorkerAllCfg cfg = {.maxNum = 1, .name = "snode-unique", .fp = (FItems)smProcessUniqueQueue, .param = pMgmt}; SWWorkerAllCfg cfg = {.maxNum = 1, .name = "snode-unique", .fp = smProcessUniqueQueue, .param = pMgmt};
if (tWWorkerAllInit(pUniqueWorker, &cfg) != 0) { if (tWWorkerAllInit(pUniqueWorker, &cfg) != 0) {
dError("failed to start snode-unique worker since %s", terrstr()); dError("failed to start snode-unique worker since %s", terrstr());
......
...@@ -21,7 +21,9 @@ static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { ...@@ -21,7 +21,9 @@ static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
dndSendRsp(pWrapper, &rsp); dndSendRsp(pWrapper, &rsp);
} }
static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SVnodesMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1; int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType; tmsg_t msgType = pMsg->rpcMsg.msgType;
dTrace("msg:%p, will be processed in vnode-mgmt queue", pMsg); dTrace("msg:%p, will be processed in vnode-mgmt queue", pMsg);
...@@ -57,7 +59,9 @@ static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -57,7 +59,9 @@ static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { static void vmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SVnodeObj *pVnode = pInfo->ahandle;
dTrace("msg:%p, will be processed in vnode-query queue", pMsg); dTrace("msg:%p, will be processed in vnode-query queue", pMsg);
int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg); int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg);
if (code != 0) { if (code != 0) {
...@@ -68,7 +72,9 @@ static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { ...@@ -68,7 +72,9 @@ static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
} }
} }
static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SVnodeObj *pVnode = pInfo->ahandle;
dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg); dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg); int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg);
if (code != 0) { if (code != 0) {
...@@ -79,7 +85,9 @@ static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { ...@@ -79,7 +85,9 @@ static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
} }
} }
static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
if (pArray == NULL) { if (pArray == NULL) {
dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr()); dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());
...@@ -126,7 +134,8 @@ static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO ...@@ -126,7 +134,8 @@ static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
} }
static void vmProcessApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SNodeMsg *pMsg = NULL; SNodeMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
...@@ -138,7 +147,8 @@ static void vmProcessApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO ...@@ -138,7 +147,8 @@ static void vmProcessApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO
} }
} }
static void vmProcessSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SNodeMsg *pMsg = NULL; SNodeMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
......
...@@ -86,7 +86,8 @@ static void *tQWorkerThreadFp(SQWorker *worker) { ...@@ -86,7 +86,8 @@ static void *tQWorkerThreadFp(SQWorker *worker) {
} }
if (fp != NULL) { if (fp != NULL) {
(*fp)(ahandle, msg); SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num};
(*fp)(&info, msg);
} }
} }
...@@ -210,6 +211,7 @@ static void *tWWorkerThreadFp(SWWorker *worker) { ...@@ -210,6 +211,7 @@ static void *tWWorkerThreadFp(SWWorker *worker) {
} }
if (fp != NULL) { if (fp != NULL) {
SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num};
(*fp)(ahandle, worker->qall, numOfMsgs); (*fp)(ahandle, worker->qall, numOfMsgs);
} }
} }
...@@ -264,6 +266,8 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) { ...@@ -264,6 +266,8 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
} }
taosThreadAttrDestroy(&thAttr); taosThreadAttrDestroy(&thAttr);
pool->num++;
if (pool->num > pool->max) pool->num = pool->max;
} else { } else {
taosAddIntoQset(worker->qset, queue, ahandle); taosAddIntoQset(worker->qset, queue, ahandle);
pool->nextId = (pool->nextId + 1) % pool->max; pool->nextId = (pool->nextId + 1) % pool->max;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册