提交 eb561f31 编写于 作者: S Shengliang Guan

shm

上级 44cd8c00
......@@ -83,7 +83,7 @@ typedef struct {
int32_t minNum;
int32_t maxNum;
void *queueFp;
SDnode *pDnode;
void *param;
STaosQueue *queue;
union {
SQWorkerPool pool;
......@@ -161,7 +161,7 @@ int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pRpcMsg);
// dndWorker.h
int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum,
int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum,
int32_t maxNum, void *queueFp);
void dndCleanupWorker(SDnodeWorker *pWorker);
int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen);
......
......@@ -22,7 +22,7 @@
extern "C" {
#endif
int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum,
int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum,
int32_t maxNum, void *queueFp);
void dndCleanupWorker(SDnodeWorker *pWorker);
int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen);
......
......@@ -16,9 +16,9 @@
#define _DEFAULT_SOURCE
#include "dndWorker.h"
int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum,
int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum,
int32_t maxNum, void *queueFp) {
if (pDnode == NULL || pWorker == NULL || name == NULL || minNum < 0 || maxNum <= 0 || queueFp == NULL) {
if (pWorker == NULL || name == NULL || minNum < 0 || maxNum <= 0 || queueFp == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
......@@ -28,7 +28,7 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, c
pWorker->minNum = minNum;
pWorker->maxNum = maxNum;
pWorker->queueFp = queueFp;
pWorker->pDnode = pDnode;
pWorker->param = param;
if (pWorker->type == DND_WORKER_SINGLE) {
SQWorkerPool *pPool = &pWorker->pool;
......@@ -39,7 +39,7 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, c
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pWorker->queue = tQWorkerAllocQueue(pPool, pDnode, (FItem)queueFp);
pWorker->queue = tQWorkerAllocQueue(pPool, param, (FItem)queueFp);
if (pWorker->queue == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
......@@ -52,7 +52,7 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, c
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pWorker->queue = tWWorkerAllocQueue(pPool, pDnode, (FItems)queueFp);
pWorker->queue = tWWorkerAllocQueue(pPool, param, (FItems)queueFp);
if (pWorker->queue == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
......
......@@ -54,7 +54,7 @@ static void *dmThreadRoutine(void *param) {
}
}
static void dmProcessMgmtQueue(SDnode *pDnode, SNodeMsg *pNodeMsg) {
static void dmProcessQueue(SDnode *pDnode, SNodeMsg *pNodeMsg) {
int32_t code = 0;
SRpcMsg *pMsg = &pNodeMsg->rpcMsg;
dTrace("msg:%p, will be processed", pNodeMsg);
......@@ -134,14 +134,14 @@ static void dmProcessMgmtQueue(SDnode *pDnode, SNodeMsg *pNodeMsg) {
}
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
if (dndInitWorker(pMgmt->pDnode, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dmProcessMgmtQueue) !=
0) {
SDnode *pDnode = pMgmt->pDnode;
if (dndInitWorker(pDnode, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dmProcessQueue) != 0) {
dError("failed to start dnode mgmt worker since %s", terrstr());
return -1;
}
if (dndInitWorker(pMgmt->pDnode, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1,
dmProcessMgmtQueue) != 0) {
if (dndInitWorker(pDnode, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, dmProcessQueue) != 0) {
dError("failed to start dnode mgmt worker since %s", terrstr());
return -1;
}
......
......@@ -23,21 +23,20 @@ extern "C" {
#endif
typedef struct SMnodeMgmt {
int32_t refCount;
int8_t deployed;
int8_t dropped;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
SMnode *pMnode;
SDnode *pDnode;
SProcObj *pProcess;
bool singleProc;
SRWLatch latch;
const char *path;
SDnodeWorker readWorker;
SDnodeWorker writeWorker;
SDnodeWorker syncWorker;
int32_t refCount;
int8_t deployed;
int8_t dropped;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
SMnode *pMnode;
SDnode *pDnode;
SMgmtWrapper *pWrapper;
const char *path;
SRWLatch latch;
SDnodeWorker readWorker;
SDnodeWorker writeWorker;
SDnodeWorker syncWorker;
} SMnodeMgmt;
// interface
......
......@@ -220,6 +220,7 @@ static int32_t mmInit(SMgmtWrapper *pWrapper) {
dInfo("mnode-mgmt is initialized");
pMgmt->path = pWrapper->path;
pMgmt->pDnode = pWrapper->pDnode;
pMgmt->pWrapper = pWrapper;
taosInitRWLatch(&pMgmt->latch);
if (mmReadFile(pMgmt) != 0) {
......@@ -293,7 +294,7 @@ static bool mmRequire(SMgmtWrapper *pWrapper) {
void mmGetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = mmInit;
mgmtFp.closeFp = NULL;
mgmtFp.closeFp = mmCleanup;
mgmtFp.requiredFp = mmRequire;
mmInitMsgHandles(pWrapper);
......
......@@ -20,32 +20,8 @@
#include "dndTransport.h"
#include "dndWorker.h"
static void mmSendRpcRsp(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) {
if (pRpc->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRpc->code == TSDB_CODE_APP_NOT_READY) {
dmSendRedirectRsp(pMgmt->pDnode, pRpc);
} else {
rpcSendResponse(pRpc);
}
}
void mmPutRpcRspToWorker(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) {
int32_t code = -1;
if (pMgmt->singleProc) {
mmSendRpcRsp(pMgmt, pRpc);
} else {
do {
code = taosProcPutToParentQueue(pMgmt->pProcess, pRpc, sizeof(SRpcMsg), pRpc->pCont, pRpc->contLen);
if (code != 0) {
taosMsleep(10);
}
} while (code != 0);
}
}
static void mmConsumeMsgQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
dTrace("msg:%p, will be processed", pMsg);
SMnode *pMnode = mmAcquire(pMgmt);
SRpcMsg *pRpc = &pMsg->rpcMsg;
bool isReq = (pRpc->msgType & 1U);
......@@ -53,7 +29,7 @@ static void mmConsumeMsgQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
if (pMnode != NULL) {
pMsg->pNode = pMnode;
code = mndProcessMsg((SMndMsg*)pMsg);
code = mndProcessMsg((SMndMsg *)pMsg);
mmRelease(pMgmt, pMnode);
}
......@@ -61,11 +37,11 @@ static void mmConsumeMsgQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
if (pMsg->rpcMsg.handle == NULL) return;
if (code == 0) {
SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp};
mmPutRpcRspToWorker(pMgmt, &rsp);
dndSendRsp(pMgmt->pWrapper, &rsp);
} else {
if (terrno != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp, .code = terrno};
mmPutRpcRspToWorker(pMgmt, &rsp);
dndSendRsp(pMgmt->pWrapper, &rsp);
}
}
}
......@@ -76,18 +52,17 @@ static void mmConsumeMsgQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
}
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
SDnode *pDnode = pMgmt->pDnode;
if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmConsumeMsgQueue) != 0) {
if (dndInitWorker(pMgmt, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmProcessQueue) != 0) {
dError("failed to start mnode read worker since %s", terrstr());
return -1;
}
if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, mmConsumeMsgQueue) != 0) {
if (dndInitWorker(pMgmt, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, mmProcessQueue) != 0) {
dError("failed to start mnode write worker since %s", terrstr());
return -1;
}
if (dndInitWorker(pDnode, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, mmConsumeMsgQueue) != 0) {
if (dndInitWorker(pMgmt, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, mmProcessQueue) != 0) {
dError("failed to start mnode sync worker since %s", terrstr());
return -1;
}
......@@ -119,18 +94,15 @@ static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SNodeM
return code;
}
int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg);
}
int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
int32_t mmProcessSyncMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg);
}
int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册