提交 3af4fe31 编写于 作者: S Shengliang Guan

refact dnode mgmt worker

上级 f34013a0
...@@ -80,21 +80,20 @@ typedef struct { ...@@ -80,21 +80,20 @@ typedef struct {
} SDnodeDir; } SDnodeDir;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int32_t dropped; int32_t dropped;
int64_t clusterId; int64_t clusterId;
int64_t dver; int64_t dver;
int64_t rebootTime; int64_t rebootTime;
int64_t updateTime; int64_t updateTime;
int8_t statusSent; int8_t statusSent;
SEpSet mnodeEpSet; SEpSet mnodeEpSet;
char *file; char *file;
SHashObj *dnodeHash; SHashObj *dnodeHash;
SDnodeEps *dnodeEps; SDnodeEps *dnodeEps;
pthread_t *threadId; pthread_t *threadId;
SRWLatch latch; SRWLatch latch;
STaosQueue *pMgmtQ; SDnodeWorker mgmtWorker;
SWorkerPool mgmtPool;
} SDnodeMgmt; } SDnodeMgmt;
typedef struct { typedef struct {
......
...@@ -21,12 +21,9 @@ ...@@ -21,12 +21,9 @@
#include "dndSnode.h" #include "dndSnode.h"
#include "dndTransport.h" #include "dndTransport.h"
#include "dndVnodes.h" #include "dndVnodes.h"
#include "dndWorker.h"
static int32_t dndInitMgmtWorker(SDnode *pDnode); static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg);
static void dndCleanupMgmtWorker(SDnode *pDnode);
static int32_t dndAllocMgmtQueue(SDnode *pDnode);
static void dndFreeMgmtQueue(SDnode *pDnode);
static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg);
static int32_t dndReadDnodes(SDnode *pDnode); static int32_t dndReadDnodes(SDnode *pDnode);
static int32_t dndWriteDnodes(SDnode *pDnode); static int32_t dndWriteDnodes(SDnode *pDnode);
...@@ -534,13 +531,8 @@ int32_t dndInitDnode(SDnode *pDnode) { ...@@ -534,13 +531,8 @@ int32_t dndInitDnode(SDnode *pDnode) {
return -1; return -1;
} }
if (dndInitMgmtWorker(pDnode) != 0) { if (dndInitWorker(pDnode, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dndProcessMgmtQueue) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; dError("failed to start dnode mgmt worker since %s", terrstr());
return -1;
}
if (dndAllocMgmtQueue(pDnode) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -551,15 +543,14 @@ int32_t dndInitDnode(SDnode *pDnode) { ...@@ -551,15 +543,14 @@ int32_t dndInitDnode(SDnode *pDnode) {
return -1; return -1;
} }
dInfo("dnode-dnode is initialized"); dInfo("dnode-mgmt is initialized");
return 0; return 0;
} }
void dndCleanupDnode(SDnode *pDnode) { void dndCleanupDnode(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
dndCleanupMgmtWorker(pDnode); dndCleanupWorker(&pMgmt->mgmtWorker);
dndFreeMgmtQueue(pDnode);
if (pMgmt->threadId != NULL) { if (pMgmt->threadId != NULL) {
taosDestoryThread(pMgmt->threadId); taosDestoryThread(pMgmt->threadId);
...@@ -584,62 +575,22 @@ void dndCleanupDnode(SDnode *pDnode) { ...@@ -584,62 +575,22 @@ void dndCleanupDnode(SDnode *pDnode) {
} }
taosWUnLockLatch(&pMgmt->latch); taosWUnLockLatch(&pMgmt->latch);
dInfo("dnode-dnode is cleaned up"); dInfo("dnode-mgmt is cleaned up");
}
static int32_t dndInitMgmtWorker(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
SWorkerPool *pPool = &pMgmt->mgmtPool;
pPool->name = "dnode-mgmt";
pPool->min = 1;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
dDebug("dnode mgmt worker is initialized");
return 0;
}
static void dndCleanupMgmtWorker(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
tWorkerCleanup(&pMgmt->mgmtPool);
dDebug("dnode mgmt worker is closed");
} }
static int32_t dndAllocMgmtQueue(SDnode *pDnode) { void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
pMgmt->pMgmtQ = tWorkerAllocQueue(&pMgmt->mgmtPool, pDnode, (FProcessItem)dndProcessMgmtQueue);
if (pMgmt->pMgmtQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
static void dndFreeMgmtQueue(SDnode *pDnode) { if (pEpSet && pEpSet->numOfEps > 0 && pMsg->msgType == TDMT_MND_STATUS_RSP) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
tWorkerFreeQueue(&pMgmt->mgmtPool, pMgmt->pMgmtQ);
pMgmt->pMgmtQ = NULL;
}
void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pEpSet && pEpSet->numOfEps > 0 && pRpcMsg->msgType == TDMT_MND_STATUS_RSP) {
dndUpdateMnodeEpSet(pDnode, pEpSet); dndUpdateMnodeEpSet(pDnode, pEpSet);
} }
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg)); if (dndWriteMsgToWorker(&pMgmt->mgmtWorker, pMsg, sizeof(SRpcMsg)) != 0) {
if (pMsg != NULL) *pMsg = *pRpcMsg; if (pMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY};
if (pMsg == NULL || taosWriteQitem(pMgmt->pMgmtQ, pMsg) != 0) {
if (pRpcMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY};
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
} }
rpcFreeCont(pRpcMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册