提交 7babfaff 编写于 作者: S Shengliang Guan

shm

上级 f9c67813
......@@ -32,6 +32,8 @@ typedef void *(*ProcFp)(void *parent, SBlockItem *pItem);
typedef struct SProcQueue SProcQueue;
typedef struct {
void *pParent;
bool testFlag;
int32_t childQueueSize;
int32_t parentQueueSize;
ProcFp childFp;
......@@ -40,11 +42,12 @@ typedef struct {
typedef struct {
int32_t pid;
SProcCfg cfg;
SProcQueue *pChildQueue;
SProcQueue *pParentQueue;
pthread_t childThread;
pthread_t parentThread;
ProcFp childFp;
ProcFp parentFp;
void *pParent;
bool stopFlag;
bool testFlag;
......
......@@ -129,14 +129,14 @@ int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) {
cfg.parentFp = (ProcFp)mmConsumeParentQueue;
cfg.childQueueSize = 1024 * 1024;
cfg.parentQueueSize = 1024 * 1024;
cfg.testFlag = true;
cfg.pParent = pDnode;
pMgmt->pProcess = taosProcInit(&cfg);
if (pMgmt->pProcess == NULL) {
return -1;
}
pMgmt->pProcess->pParent = pDnode;
pMgmt->pProcess->testFlag = true;
return taosProcStart(pMgmt->pProcess);
}
......
......@@ -24,7 +24,7 @@ static int32_t mmProcessWriteMsg(SDnode *pDnode, SMnodeMsg *pMsg);
static int32_t mmProcessSyncMsg(SDnode *pDnode, SMnodeMsg *pMsg);
static int32_t mmProcessReadMsg(SDnode *pDnode, SMnodeMsg *pMsg);
static int32_t mmPutMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMnodeMsg *pMsg);
static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg);
static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpc);
static void mmConsumeQueue(SDnode *pDnode, SMnodeMsg *pMsg);
int32_t mmStartWorker(SDnode *pDnode) {
......@@ -139,7 +139,7 @@ void mmInitMsgFp(SMnodeMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB_RSP)] = mmProcessWriteMsg;
}
static int32_t mndBuildMsg(SMnodeMsg *pMsg, SRpcMsg *pRpc) {
static int32_t mmBuildMsg(SMnodeMsg *pMsg, SRpcMsg *pRpc) {
SRpcConnInfo connInfo = {0};
if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
......@@ -178,7 +178,7 @@ void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
goto _OVER;
}
if (mndBuildMsg(pMsg, pRpc) != 0) {
if (mmBuildMsg(pMsg, pRpc) != 0) {
goto _OVER;
}
......@@ -211,52 +211,52 @@ _OVER:
rpcFreeCont(pRpc->pCont);
}
int32_t mmProcessWriteMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg) {
return mmPutMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMnodeMsg);
int32_t mmProcessWriteMsg(SDnode *pDnode, SMnodeMsg *pMsg) {
return mmPutMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg);
}
int32_t mmProcessSyncMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg) {
return mmPutMsgToWorker(pDnode, &pDnode->mmgmt.syncWorker, pMnodeMsg);
int32_t mmProcessSyncMsg(SDnode *pDnode, SMnodeMsg *pMsg) {
return mmPutMsgToWorker(pDnode, &pDnode->mmgmt.syncWorker, pMsg);
}
int32_t mmProcessReadMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg) {
return mmPutMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pMnodeMsg);
int32_t mmProcessReadMsg(SDnode *pDnode, SMnodeMsg *pMsg) {
return mmPutMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pMsg);
}
int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg) {
return mmPutRpcMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pRpcMsg);
int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpc) {
return mmPutRpcMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pRpc);
}
int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg) {
return mmPutRpcMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pRpcMsg);
int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpc) {
return mmPutRpcMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pRpc);
}
static int32_t mmPutMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMnodeMsg *pMnodeMsg) {
static int32_t mmPutMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMnodeMsg *pMsg) {
SMnode *pMnode = mmAcquire(pDnode);
if (pMnode == NULL) return -1;
pMnodeMsg->pMnode = pMnode;
int32_t code = dndWriteMsgToWorker(pWorker, pMnodeMsg, 0);
pMsg->pMnode = pMnode;
int32_t code = dndWriteMsgToWorker(pWorker, pMsg, 0);
mmRelease(pDnode, pMnode);
return code;
}
static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpcMsg) {
int32_t contLen = sizeof(SMnodeMsg) + pRpcMsg->contLen;
SMnodeMsg *pMnodeMsg = taosAllocateQitem(contLen);
if (pMnodeMsg == NULL) {
static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpc) {
int32_t contLen = sizeof(SMnodeMsg) + pRpc->contLen;
SMnodeMsg *pMsg = taosAllocateQitem(contLen);
if (pMsg == NULL) {
return -1;
}
pMnodeMsg->contLen = pRpcMsg->contLen;
pMnodeMsg->pCont = (char *)pMnodeMsg + sizeof(SMnodeMsg);
memcpy(pMnodeMsg->pCont, pRpcMsg->pCont, pRpcMsg->contLen);
rpcFreeCont(pRpcMsg->pCont);
pMsg->contLen = pRpc->contLen;
pMsg->pCont = (char *)pMsg + sizeof(SMnodeMsg);
memcpy(pMsg->pCont, pRpc->pCont, pRpc->contLen);
rpcFreeCont(pRpc->pCont);
int32_t code = mmPutMsgToWorker(pDnode, pWorker, pMnodeMsg);
int32_t code = mmPutMsgToWorker(pDnode, pWorker, pMsg);
if (code != 0) {
taosFreeQitem(pMnodeMsg);
taosFreeQitem(pMsg);
}
return code;
......
......@@ -33,6 +33,8 @@ typedef struct SProcQueue {
} SProcQueue;
static SProcQueue *taosProcQueueInit(int32_t size) {
if (size <= 0) size = SHM_DEFAULT_SIZE;
int32_t bufSize = CEIL4(size);
int32_t headSize = CEIL4(sizeof(SProcQueue));
......@@ -180,18 +182,12 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
return NULL;
}
pProc->cfg = *pCfg;
if (pProc->cfg.childQueueSize <= 0) {
pProc->cfg.childQueueSize = SHM_DEFAULT_SIZE;
}
if (pProc->cfg.parentQueueSize <= 0) {
pProc->cfg.parentQueueSize = SHM_DEFAULT_SIZE;
}
pProc->pChildQueue = taosProcQueueInit(pProc->cfg.childQueueSize);
pProc->pParentQueue = taosProcQueueInit(pProc->cfg.parentQueueSize);
pProc->pParent = pCfg->pParent;
pProc->childFp = pCfg->childFp;
pProc->parentFp = pCfg->parentFp;
pProc->testFlag = pCfg->testFlag;
pProc->pChildQueue = taosProcQueueInit(pCfg->childQueueSize);
pProc->pParentQueue = taosProcQueueInit(pCfg->parentQueueSize);
return pProc;
}
......@@ -218,13 +214,13 @@ static void taosProcThreadLoop(SProcQueue *pQueue, ProcFp procFp, void *pParent)
static void *taosProcThreadChildLoop(void *param) {
SProcObj *pProc = param;
taosProcThreadLoop(pProc->pChildQueue, pProc->cfg.childFp, pProc->pParent);
taosProcThreadLoop(pProc->pChildQueue, pProc->childFp, pProc->pParent);
return NULL;
}
static void *taosProcThreadParentLoop(void *param) {
SProcObj *pProc = param;
taosProcThreadLoop(pProc->pParentQueue, pProc->cfg.parentFp, pProc->pParent);
taosProcThreadLoop(pProc->pParentQueue, pProc->parentFp, pProc->pParent);
return NULL;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册