提交 c6d23422 编写于 作者: S Shengliang Guan

refact queue

上级 40917712
......@@ -28,7 +28,7 @@ typedef struct SBnodeMgmt {
SDnode *pDnode;
SMgmtWrapper *pWrapper;
const char *path;
SDnodeWorker writeWorker;
SWWorkerAll writeWorker;
} SBnodeMgmt;
// bmInt.c
......
......@@ -63,14 +63,15 @@ static void bmProcessQueue(SBnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs
}
int32_t bmProcessWriteMsg(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnodeWorker *pWorker = &pMgmt->writeWorker;
SWWorkerAll *pWorker = &pMgmt->writeWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg);
return taosWriteQitem(pWorker->queue, pMsg);
}
int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
if (dndInitWorker(pMgmt, &pMgmt->writeWorker, DND_WORKER_MULTI, "bnode-write", 0, 1, bmProcessQueue) != 0) {
SWWorkerAllCfg cfg = {.maxNum = 1, .name = "bnode-write", .fp = (FItems)bmProcessQueue, .param = pMgmt};
if (tWWorkerAllInit(&pMgmt->writeWorker, &cfg) != 0) {
dError("failed to start bnode write worker since %s", terrstr());
return -1;
}
......@@ -78,4 +79,4 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
return 0;
}
void bmStopWorker(SBnodeMgmt *pMgmt) { dndCleanupWorker(&pMgmt->writeWorker); }
void bmStopWorker(SBnodeMgmt *pMgmt) { tWWorkerAllCleanup(&pMgmt->writeWorker); }
......@@ -74,20 +74,6 @@ typedef int32_t (*CreateNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
typedef int32_t (*DropNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
typedef int32_t (*RequireNodeFp)(SMgmtWrapper *pWrapper, bool *required);
typedef struct {
EWorkerType type;
const char *name;
int32_t minNum;
int32_t maxNum;
void *queueFp;
void *param;
STaosQueue *queue;
union {
SQWorkerPool pool;
SWWorkerPool mpool;
};
} SDnodeWorker;
typedef struct SMsgHandle {
int32_t vgId;
NodeMsgFp vgIdMsgFp;
......@@ -161,11 +147,6 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg);
void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp);
int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum,
int32_t maxNum, void *queueFp);
void dndCleanupWorker(SDnodeWorker *pWorker);
int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pMsg);
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http:www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dndInt.h"
int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum,
int32_t maxNum, void *queueFp) {
if (pWorker == NULL || name == NULL || minNum < 0 || maxNum <= 0 || queueFp == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
pWorker->type = type;
pWorker->name = name;
pWorker->minNum = minNum;
pWorker->maxNum = maxNum;
pWorker->queueFp = queueFp;
pWorker->param = param;
if (pWorker->type == DND_WORKER_SINGLE) {
SQWorkerPool *pPool = &pWorker->pool;
pPool->name = name;
pPool->min = minNum;
pPool->max = maxNum;
if (tQWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pWorker->queue = tQWorkerAllocQueue(pPool, param, (FItem)queueFp);
if (pWorker->queue == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
} else if (pWorker->type == DND_WORKER_MULTI) {
SWWorkerPool *pPool = &pWorker->mpool;
pPool->name = name;
pPool->max = maxNum;
if (tWWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pWorker->queue = tWWorkerAllocQueue(pPool, param, (FItems)queueFp);
if (pWorker->queue == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
} else {
terrno = TSDB_CODE_INVALID_PARA;
}
return 0;
}
void dndCleanupWorker(SDnodeWorker *pWorker) {
if (pWorker->queue == NULL) return;
while (!taosQueueEmpty(pWorker->queue)) {
taosMsleep(10);
}
if (pWorker->type == DND_WORKER_SINGLE) {
tQWorkerCleanup(&pWorker->pool);
tQWorkerFreeQueue(&pWorker->pool, pWorker->queue);
} else if (pWorker->type == DND_WORKER_MULTI) {
tWWorkerCleanup(&pWorker->mpool);
tWWorkerFreeQueue(&pWorker->mpool, pWorker->queue);
} else {
}
}
int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pMsg) {
if (pWorker == NULL || pWorker->queue == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
if (taosWriteQitem(pWorker->queue, pMsg) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
......@@ -29,10 +29,10 @@ typedef struct SDnodeMgmt {
SEpSet mnodeEpSet;
SHashObj *dnodeHash;
SArray *dnodeEps;
TdThread *threadId;
TdThread *threadId;
SRWLatch latch;
SDnodeWorker mgmtWorker;
SDnodeWorker statusWorker;
SQWorkerAll mgmtWorker;
SQWorkerAll statusWorker;
const char *path;
SDnode *pDnode;
SMgmtWrapper *pWrapper;
......
......@@ -53,9 +53,9 @@ static void *dmThreadRoutine(void *param) {
}
static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnode *pDnode = pMgmt->pDnode;
SDnode *pDnode = pMgmt->pDnode;
SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1;
int32_t code = -1;
dTrace("msg:%p, will be processed in dnode queue", pMsg);
switch (pRpc->msgType) {
......@@ -98,13 +98,17 @@ static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
}
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dmProcessQueue) != 0) {
SQWorkerAllCfg mgmtCfg = {
.minNum = 0, .maxNum = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt};
if (tQWorkerAllInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) {
dError("failed to start dnode mgmt worker since %s", terrstr());
return -1;
}
if (dndInitWorker(pMgmt, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, dmProcessQueue) != 0) {
dError("failed to start dnode mgmt worker since %s", terrstr());
SQWorkerAllCfg statusCfg = {
.minNum = 0, .maxNum = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt};
if (tQWorkerAllInit(&pMgmt->statusWorker, &statusCfg) != 0) {
dError("failed to start dnode status worker since %s", terrstr());
return -1;
}
......@@ -123,8 +127,8 @@ int32_t dmStartThread(SDnodeMgmt *pMgmt) {
}
void dmStopWorker(SDnodeMgmt *pMgmt) {
dndCleanupWorker(&pMgmt->mgmtWorker);
dndCleanupWorker(&pMgmt->statusWorker);
tQWorkerAllCleanup(&pMgmt->mgmtWorker);
tQWorkerAllCleanup(&pMgmt->statusWorker);
if (pMgmt->threadId != NULL) {
taosDestoryThread(pMgmt->threadId);
......@@ -133,11 +137,11 @@ void dmStopWorker(SDnodeMgmt *pMgmt) {
}
int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnodeWorker *pWorker = &pMgmt->mgmtWorker;
SQWorkerAll *pWorker = &pMgmt->mgmtWorker;
if (pMsg->rpcMsg.msgType == TDMT_MND_STATUS_RSP) {
pWorker = &pMgmt->statusWorker;
}
dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg);
return taosWriteQitem(pWorker->queue, pMsg);
}
......@@ -28,8 +28,8 @@ typedef struct SQnodeMgmt {
SDnode *pDnode;
SMgmtWrapper *pWrapper;
const char *path;
SDnodeWorker queryWorker;
SDnodeWorker fetchWorker;
SQWorkerAll queryWorker;
SQWorkerAll fetchWorker;
} SQnodeMgmt;
// qmInt.c
......
......@@ -45,16 +45,16 @@ static void qmProcessFetchQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
taosFreeQitem(pMsg);
}
static int32_t qmPutMsgToWorker(SDnodeWorker *pWorker, SNodeMsg *pMsg) {
static int32_t qmPutMsgToWorker(SQWorkerAll *pWorker, SNodeMsg *pMsg) {
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg);
return taosWriteQitem(pWorker->queue, pMsg);
}
int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->queryWorker, pMsg); }
int32_t qmProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->fetchWorker, pMsg); }
static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRpcMsg *pRpc) {
static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SQWorkerAll *pWorker, SRpcMsg *pRpc) {
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
if (pMsg == NULL) {
return -1;
......@@ -63,7 +63,7 @@ static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRp
dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
pMsg->rpcMsg = *pRpc;
int32_t code = dndWriteMsgToWorker(pWorker, pMsg);
int32_t code = taosWriteQitem(pWorker->queue, pMsg);
if (code != 0) {
dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg);
......@@ -89,15 +89,25 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1);
int32_t maxQueryThreads = minQueryThreads;
if (dndInitWorker(pMgmt, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", minQueryThreads, maxQueryThreads,
qmProcessQueryQueue) != 0) {
dError("failed to start qnode query worker since %s", terrstr());
SQWorkerAllCfg queryCfg = {.minNum = minQueryThreads,
.maxNum = maxQueryThreads,
.name = "qnode-query",
.fp = (FItem)qmProcessQueryQueue,
.param = pMgmt};
if (tQWorkerAllInit(&pMgmt->queryWorker, &queryCfg) != 0) {
dError("failed to start qnode-query worker since %s", terrstr());
return -1;
}
if (dndInitWorker(pMgmt, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", minFetchThreads, maxFetchThreads,
qmProcessFetchQueue) != 0) {
dError("failed to start qnode fetch worker since %s", terrstr());
SQWorkerAllCfg fetchCfg = {.minNum = minFetchThreads,
.maxNum = maxFetchThreads,
.name = "qnode-fetch",
.fp = (FItem)qmProcessFetchQueue,
.param = pMgmt};
if (tQWorkerAllInit(&pMgmt->queryWorker, &fetchCfg) != 0) {
dError("failed to start qnode-fetch worker since %s", terrstr());
return -1;
}
......@@ -105,6 +115,6 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
}
void qmStopWorker(SQnodeMgmt *pMgmt) {
dndCleanupWorker(&pMgmt->queryWorker);
dndCleanupWorker(&pMgmt->fetchWorker);
tQWorkerAllCleanup(&pMgmt->queryWorker);
tQWorkerAllCleanup(&pMgmt->fetchWorker);
}
......@@ -30,8 +30,8 @@ typedef struct SSnodeMgmt {
const char *path;
SRWLatch latch;
int8_t uniqueWorkerInUse;
SArray *uniqueWorkers; // SArray<SDnodeWorker*>
SDnodeWorker sharedWorker;
SArray *uniqueWorkers; // SArray<SWWorkerAll*>
SQWorkerAll sharedWorker;
} SSnodeMgmt;
// smInt.c
......
......@@ -40,20 +40,23 @@ static void smProcessSharedQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
}
int32_t smStartWorker(SSnodeMgmt *pMgmt) {
pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *));
pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(SWWorkerAll *));
if (pMgmt->uniqueWorkers == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < SND_UNIQUE_THREAD_NUM; i++) {
SDnodeWorker *pUniqueWorker = malloc(sizeof(SDnodeWorker));
SWWorkerAll *pUniqueWorker = malloc(sizeof(SWWorkerAll));
if (pUniqueWorker == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (dndInitWorker(pMgmt, pUniqueWorker, DND_WORKER_MULTI, "snode-unique", 1, 1, smProcessSharedQueue) != 0) {
dError("failed to start snode unique worker since %s", terrstr());
SWWorkerAllCfg cfg = {.maxNum = 1, .name = "snode-unique", .fp = (FItems)smProcessUniqueQueue, .param = pMgmt};
if (tWWorkerAllInit(pUniqueWorker, &cfg) != 0) {
dError("failed to start snode-unique worker since %s", terrstr());
return -1;
}
if (taosArrayPush(pMgmt->uniqueWorkers, &pUniqueWorker) == NULL) {
......@@ -62,9 +65,14 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
}
}
if (dndInitWorker(pMgmt, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", SND_SHARED_THREAD_NUM,
SND_SHARED_THREAD_NUM, smProcessSharedQueue)) {
dError("failed to start snode shared worker since %s", terrstr());
SQWorkerAllCfg cfg = {.minNum = SND_SHARED_THREAD_NUM,
.maxNum = SND_SHARED_THREAD_NUM,
.name = "snode-shared",
.fp = (FItem)smProcessSharedQueue,
.param = pMgmt};
if (tQWorkerAllInit(&pMgmt->sharedWorker, &cfg)) {
dError("failed to start snode shared-worker since %s", terrstr());
return -1;
}
......@@ -73,11 +81,11 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
void smStopWorker(SSnodeMgmt *pMgmt) {
for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) {
SDnodeWorker *worker = taosArrayGetP(pMgmt->uniqueWorkers, i);
dndCleanupWorker(worker);
SWWorkerAll *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, i);
tWWorkerAllCleanup(pWorker);
}
taosArrayDestroy(pMgmt->uniqueWorkers);
dndCleanupWorker(&pMgmt->sharedWorker);
tQWorkerAllCleanup(&pMgmt->sharedWorker);
}
static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) {
......@@ -93,33 +101,33 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) {
}
int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnodeWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0);
SWWorkerAll *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0);
if (pWorker == NULL) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg);
return taosWriteQitem(pWorker->queue, pMsg);
}
int32_t smProcessUniqueMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg);
SDnodeWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index);
int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg);
SWWorkerAll *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index);
if (pWorker == NULL) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg);
return taosWriteQitem(pWorker->queue, pMsg);
}
int32_t smProcessSharedMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnodeWorker *pWorker = &pMgmt->sharedWorker;
SQWorkerAll *pWorker = &pMgmt->sharedWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg);
return taosWriteQitem(pWorker->queue, pMsg);
}
int32_t smProcessExecMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
......
......@@ -36,7 +36,7 @@ typedef struct SVnodesMgmt {
const char *path;
SDnode *pDnode;
SMgmtWrapper *pWrapper;
SDnodeWorker mgmtWorker;
SQWorkerAll mgmtWorker;
} SVnodesMgmt;
typedef struct {
......
......@@ -207,9 +207,9 @@ int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
}
int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SDnodeWorker *pWorker = &pMgmt->mgmtWorker;
SQWorkerAll *pWorker = &pMgmt->mgmtWorker;
dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg);
return taosWriteQitem(pWorker->queue, pMsg);
}
static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) {
......@@ -319,7 +319,9 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
pWPool->max = maxSyncThreads;
if (tWWorkerInit(pWPool) != 0) return -1;
if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "vnode-mgmt", 1, 1, vmProcessMgmtQueue) != 0) {
SQWorkerAllCfg cfg = {
.minNum = 1, .maxNum = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt};
if (tQWorkerAllInit(&pMgmt->mgmtWorker, &cfg) != 0) {
dError("failed to start vnode-mgmt worker since %s", terrstr());
return -1;
}
......@@ -329,7 +331,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
}
void vmStopWorker(SVnodesMgmt *pMgmt) {
dndCleanupWorker(&pMgmt->mgmtWorker);
tQWorkerAllCleanup(&pMgmt->mgmtWorker);
tQWorkerCleanup(&pMgmt->fetchPool);
tQWorkerCleanup(&pMgmt->queryPool);
tWWorkerCleanup(&pMgmt->writePool);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册