提交 27c17bcc 编写于 作者: S Shengliang Guan

shm

上级 266bf53a
...@@ -24,15 +24,25 @@ extern "C" { ...@@ -24,15 +24,25 @@ extern "C" {
typedef struct SProcQueue SProcQueue; typedef struct SProcQueue SProcQueue;
typedef struct SProcObj SProcObj; typedef struct SProcObj SProcObj;
typedef void *(*ProcFp)(void *pParent, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen); typedef void *(*ProcMallocFp)(int32_t contLen);
typedef void *(*ProcFreeFp)(void *pCont);
typedef void *(*ProcConsumeFp)(void *pParent, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen);
typedef struct { typedef struct {
int32_t childQueueSize; int32_t childQueueSize;
int32_t parentQueueSize; ProcConsumeFp childConsumeFp;
ProcFp childFp; ProcMallocFp childMallocHeadFp;
ProcFp parentFp; ProcFreeFp childFreeHeadFp;
void *pParent; ProcMallocFp childMallocBodyFp;
bool testFlag; ProcFreeFp childFreeBodyFp;
int32_t parentQueueSize;
ProcConsumeFp parentConsumeFp;
ProcMallocFp parentdMallocHeadFp;
ProcFreeFp parentFreeHeadFp;
ProcMallocFp parentMallocBodyFp;
ProcFreeFp parentFreeBodyFp;
bool testFlag;
void *pParent;
} SProcCfg; } SProcCfg;
SProcObj *taosProcInit(const SProcCfg *pCfg); SProcObj *taosProcInit(const SProcCfg *pCfg);
......
...@@ -81,7 +81,7 @@ typedef struct { ...@@ -81,7 +81,7 @@ typedef struct {
MndMsgFp msgFp[TDMT_MAX]; MndMsgFp msgFp[TDMT_MAX];
SProcObj *pProcess; SProcObj *pProcess;
bool singleProc; bool singleProc;
} SMnodeMgmt; } SMndMgmt;
typedef struct { typedef struct {
int32_t refCount; int32_t refCount;
...@@ -144,7 +144,7 @@ typedef struct SDnode { ...@@ -144,7 +144,7 @@ typedef struct SDnode {
SDnodeDir dir; SDnodeDir dir;
TdFilePtr pLockFile; TdFilePtr pLockFile;
SDnodeMgmt dmgmt; SDnodeMgmt dmgmt;
SMnodeMgmt mmgmt; SMndMgmt mmgmt;
SQnodeMgmt qmgmt; SQnodeMgmt qmgmt;
SSnodeMgmt smgmt; SSnodeMgmt smgmt;
SBnodeMgmt bmgmt; SBnodeMgmt bmgmt;
......
...@@ -48,7 +48,7 @@ int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeRe ...@@ -48,7 +48,7 @@ int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeRe
// mmWorker // mmWorker
int32_t mmStartWorker(SDnode *pDnode); int32_t mmStartWorker(SDnode *pDnode);
void mmStopWorker(SDnode *pDnode); void mmStopWorker(SDnode *pDnode);
void mmInitMsgFp(SMnodeMgmt *pMgmt); void mmInitMsgFp(SMndMgmt *pMgmt);
void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#include "mm.h" #include "mm.h"
int32_t mmReadFile(SDnode *pDnode) { int32_t mmReadFile(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMndMgmt *pMgmt = &pDnode->mmgmt;
int32_t code = TSDB_CODE_DND_MNODE_READ_FILE_ERROR; int32_t code = TSDB_CODE_DND_MNODE_READ_FILE_ERROR;
int32_t len = 0; int32_t len = 0;
...@@ -115,7 +115,7 @@ PRASE_MNODE_OVER: ...@@ -115,7 +115,7 @@ PRASE_MNODE_OVER:
} }
int32_t mmWriteFile(SDnode *pDnode) { int32_t mmWriteFile(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMndMgmt *pMgmt = &pDnode->mmgmt;
char file[PATH_MAX]; char file[PATH_MAX];
snprintf(file, sizeof(file), "%s%smnode.json.bak", pDnode->dir.dnode, TD_DIRSEP); snprintf(file, sizeof(file), "%s%smnode.json.bak", pDnode->dir.dnode, TD_DIRSEP);
......
...@@ -122,7 +122,7 @@ int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgro ...@@ -122,7 +122,7 @@ int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgro
} }
int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = mmAcquire(pDnode); SMnode *pMnode = mmAcquire(pDnode);
if (pMnode == NULL) { if (pMnode == NULL) {
......
...@@ -29,7 +29,7 @@ int32_t mmInit(SDnode *pDnode) { ...@@ -29,7 +29,7 @@ int32_t mmInit(SDnode *pDnode) {
dInfo("mnode mgmt start to init"); dInfo("mnode mgmt start to init");
int32_t code = -1; int32_t code = -1;
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMndMgmt *pMgmt = &pDnode->mmgmt;
taosInitRWLatch(&pMgmt->latch); taosInitRWLatch(&pMgmt->latch);
mmInitMsgFp(pMgmt); mmInitMsgFp(pMgmt);
...@@ -76,7 +76,7 @@ _OVER: ...@@ -76,7 +76,7 @@ _OVER:
void mmCleanup(SDnode *pDnode) { void mmCleanup(SDnode *pDnode) {
dInfo("mnode mgmt start to clean up"); dInfo("mnode mgmt start to clean up");
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMndMgmt *pMgmt = &pDnode->mmgmt;
if (pMgmt->pMnode) { if (pMgmt->pMnode) {
mmStopWorker(pDnode); mmStopWorker(pDnode);
mndClose(pMgmt->pMnode); mndClose(pMgmt->pMnode);
...@@ -86,7 +86,7 @@ void mmCleanup(SDnode *pDnode) { ...@@ -86,7 +86,7 @@ void mmCleanup(SDnode *pDnode) {
} }
SMnode *mmAcquire(SDnode *pDnode) { SMnode *mmAcquire(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = NULL; SMnode *pMnode = NULL;
int32_t refCount = 0; int32_t refCount = 0;
...@@ -108,7 +108,7 @@ SMnode *mmAcquire(SDnode *pDnode) { ...@@ -108,7 +108,7 @@ SMnode *mmAcquire(SDnode *pDnode) {
void mmRelease(SDnode *pDnode, SMnode *pMnode) { void mmRelease(SDnode *pDnode, SMnode *pMnode) {
if (pMnode == NULL) return; if (pMnode == NULL) return;
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMndMgmt *pMgmt = &pDnode->mmgmt;
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
...@@ -116,19 +116,26 @@ void mmRelease(SDnode *pDnode, SMnode *pMnode) { ...@@ -116,19 +116,26 @@ void mmRelease(SDnode *pDnode, SMnode *pMnode) {
} }
int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) { int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMndMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->singleProc = false; pMgmt->singleProc = false;
int32_t code = mmOpenImp(pDnode, pOption); int32_t code = mmOpenImp(pDnode, pOption);
if (code == 0 && !pMgmt->singleProc) { if (code == 0 && !pMgmt->singleProc) {
SProcCfg cfg = {0}; SProcCfg cfg = {.childQueueSize = 1024 * 1024,
cfg.childFp = (ProcFp)mmConsumeChildQueue; .childConsumeFp = (ProcConsumeFp)mmConsumeChildQueue,
cfg.parentFp = (ProcFp)mmConsumeParentQueue; .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
cfg.childQueueSize = 1024 * 1024; .childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
cfg.parentQueueSize = 1024 * 1024; .childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
cfg.testFlag = true; .childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
cfg.pParent = pDnode; .parentQueueSize = 1024 * 1024,
.parentConsumeFp = (ProcConsumeFp)mmConsumeParentQueue,
.parentdMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
.parentFreeHeadFp = (ProcFreeFp)taosFreeQitem,
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.testFlag = true,
.pParent = pDnode};
pMgmt->pProcess = taosProcInit(&cfg); pMgmt->pProcess = taosProcInit(&cfg);
if (pMgmt->pProcess == NULL) { if (pMgmt->pProcess == NULL) {
...@@ -142,7 +149,7 @@ int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) { ...@@ -142,7 +149,7 @@ int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) {
} }
int32_t mmAlter(SDnode *pDnode, SMnodeOpt *pOption) { int32_t mmAlter(SDnode *pDnode, SMnodeOpt *pOption) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = mmAcquire(pDnode); SMnode *pMnode = mmAcquire(pDnode);
if (pMnode == NULL) { if (pMnode == NULL) {
...@@ -161,7 +168,7 @@ int32_t mmAlter(SDnode *pDnode, SMnodeOpt *pOption) { ...@@ -161,7 +168,7 @@ int32_t mmAlter(SDnode *pDnode, SMnodeOpt *pOption) {
} }
int32_t mmDrop(SDnode *pDnode) { int32_t mmDrop(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = mmAcquire(pDnode); SMnode *pMnode = mmAcquire(pDnode);
if (pMnode == NULL) { if (pMnode == NULL) {
...@@ -230,7 +237,7 @@ static void mmBuildOptionForDeploy(SDnode *pDnode, SMnodeOpt *pOption) { ...@@ -230,7 +237,7 @@ static void mmBuildOptionForDeploy(SDnode *pDnode, SMnodeOpt *pOption) {
pReplica->port = pDnode->cfg.serverPort; pReplica->port = pDnode->cfg.serverPort;
memcpy(pReplica->fqdn, pDnode->cfg.localFqdn, TSDB_FQDN_LEN); memcpy(pReplica->fqdn, pDnode->cfg.localFqdn, TSDB_FQDN_LEN);
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMndMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->selfIndex = pOption->selfIndex; pMgmt->selfIndex = pOption->selfIndex;
pMgmt->replica = pOption->replica; pMgmt->replica = pOption->replica;
memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
...@@ -238,7 +245,7 @@ static void mmBuildOptionForDeploy(SDnode *pDnode, SMnodeOpt *pOption) { ...@@ -238,7 +245,7 @@ static void mmBuildOptionForDeploy(SDnode *pDnode, SMnodeOpt *pOption) {
static void mmBuildOptionForOpen(SDnode *pDnode, SMnodeOpt *pOption) { static void mmBuildOptionForOpen(SDnode *pDnode, SMnodeOpt *pOption) {
mmInitOption(pDnode, pOption); mmInitOption(pDnode, pOption);
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMndMgmt *pMgmt = &pDnode->mmgmt;
pOption->selfIndex = pMgmt->selfIndex; pOption->selfIndex = pMgmt->selfIndex;
pOption->replica = pMgmt->replica; pOption->replica = pMgmt->replica;
memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
...@@ -266,7 +273,7 @@ int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeRe ...@@ -266,7 +273,7 @@ int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeRe
return -1; return -1;
} }
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMndMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->selfIndex = pOption->selfIndex; pMgmt->selfIndex = pOption->selfIndex;
pMgmt->replica = pOption->replica; pMgmt->replica = pOption->replica;
memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
...@@ -274,7 +281,7 @@ int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeRe ...@@ -274,7 +281,7 @@ int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeRe
} }
static int32_t mmOpenImp(SDnode *pDnode, SMnodeOpt *pOption) { static int32_t mmOpenImp(SDnode *pDnode, SMnodeOpt *pOption) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = mndOpen(pDnode->dir.mnode, pOption); SMnode *pMnode = mndOpen(pDnode->dir.mnode, pOption);
if (pMnode == NULL) { if (pMnode == NULL) {
......
...@@ -28,7 +28,7 @@ static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMs ...@@ -28,7 +28,7 @@ static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMs
static void mmConsumeQueue(SDnode *pDnode, SMndMsg *pMsg); static void mmConsumeQueue(SDnode *pDnode, SMndMsg *pMsg);
int32_t mmStartWorker(SDnode *pDnode) { int32_t mmStartWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMndMgmt *pMgmt = &pDnode->mmgmt;
if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmConsumeQueue) != 0) { if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmConsumeQueue) != 0) {
dError("failed to start mnode read worker since %s", terrstr()); dError("failed to start mnode read worker since %s", terrstr());
return -1; return -1;
...@@ -48,7 +48,7 @@ int32_t mmStartWorker(SDnode *pDnode) { ...@@ -48,7 +48,7 @@ int32_t mmStartWorker(SDnode *pDnode) {
} }
void mmStopWorker(SDnode *pDnode) { void mmStopWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMndMgmt *pMgmt = &pDnode->mmgmt;
taosWLockLatch(&pMgmt->latch); taosWLockLatch(&pMgmt->latch);
pMgmt->deployed = 0; pMgmt->deployed = 0;
...@@ -63,7 +63,7 @@ void mmStopWorker(SDnode *pDnode) { ...@@ -63,7 +63,7 @@ void mmStopWorker(SDnode *pDnode) {
dndCleanupWorker(&pMgmt->syncWorker); dndCleanupWorker(&pMgmt->syncWorker);
} }
void mmInitMsgFp(SMnodeMgmt *pMgmt) { void mmInitMsgFp(SMndMgmt *pMgmt) {
// Requests handled by DNODE // Requests handled by DNODE
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = mmProcessWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE_RSP)] = mmProcessWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE_RSP)] = mmProcessWriteMsg;
...@@ -146,15 +146,9 @@ static int32_t mmBuildMsg(SMndMsg *pMsg, SRpcMsg *pRpc) { ...@@ -146,15 +146,9 @@ static int32_t mmBuildMsg(SMndMsg *pMsg, SRpcMsg *pRpc) {
dError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpc->ahandle, pRpc->handle); dError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpc->ahandle, pRpc->handle);
return -1; return -1;
} }
memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
pMsg->rpcMsg = *pRpc;
pMsg->createdTime = taosGetTimestampSec();
char *pCont = (char *)pMsg + sizeof(SMndMsg); memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
memcpy(pCont, pRpc->pCont, pRpc->contLen);
pMsg->rpcMsg = *pRpc; pMsg->rpcMsg = *pRpc;
pMsg->rpcMsg.pCont = pCont;
pMsg->createdTime = taosGetTimestampSec(); pMsg->createdTime = taosGetTimestampSec();
dTrace("msg:%p, is created, app:%p RPC:%p user:%s", pMsg, pRpc->ahandle, pRpc->handle, pMsg->user); dTrace("msg:%p, is created, app:%p RPC:%p user:%s", pMsg, pRpc->ahandle, pRpc->handle, pMsg->user);
...@@ -162,9 +156,9 @@ static int32_t mmBuildMsg(SMndMsg *pMsg, SRpcMsg *pRpc) { ...@@ -162,9 +156,9 @@ static int32_t mmBuildMsg(SMndMsg *pMsg, SRpcMsg *pRpc) {
} }
void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMndMgmt *pMgmt = &pDnode->mmgmt;
int32_t code = -1; int32_t code = -1;
SMndMsg *pMsg = NULL; SMndMsg *pMsg = NULL;
MndMsgFp msgFp = pMgmt->msgFp[TMSG_INDEX(pRpc->msgType)]; MndMsgFp msgFp = pMgmt->msgFp[TMSG_INDEX(pRpc->msgType)];
if (msgFp == NULL) { if (msgFp == NULL) {
...@@ -172,8 +166,7 @@ void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { ...@@ -172,8 +166,7 @@ void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
goto _OVER; goto _OVER;
} }
int32_t contLen = sizeof(SMndMsg) + pRpc->contLen; pMsg = taosAllocateQitem(sizeof(SMndMsg));
pMsg = taosAllocateQitem(contLen);
if (pMsg == NULL) { if (pMsg == NULL) {
goto _OVER; goto _OVER;
} }
...@@ -185,18 +178,13 @@ void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { ...@@ -185,18 +178,13 @@ void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
if (pMgmt->singleProc) { if (pMgmt->singleProc) {
code = (*msgFp)(pDnode, pMsg); code = (*msgFp)(pDnode, pMsg);
} else { } else {
code = taosProcPutToChildQueue(pMgmt->pProcess, pMsg, sizeof(pMsg), pRpc->pCont, pRpc->contLen); code = taosProcPutToChildQueue(pMgmt->pProcess, pMsg, sizeof(SMndMsg), pRpc->pCont, pRpc->contLen);
} }
_OVER: _OVER:
if (code == 0) { if (code != 0) {
if (!pMgmt->singleProc) {
taosFreeQitem(pMsg);
}
} else {
bool isReq = (pRpc->msgType & 1U); bool isReq = (pRpc->msgType & 1U);
if (isReq) { if (isReq) {
if (terrno == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || terrno == TSDB_CODE_APP_NOT_READY) { if (terrno == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || terrno == TSDB_CODE_APP_NOT_READY) {
dndSendRedirectRsp(pDnode, pRpc); dndSendRedirectRsp(pDnode, pRpc);
...@@ -206,9 +194,8 @@ _OVER: ...@@ -206,9 +194,8 @@ _OVER:
} }
} }
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont);
} }
rpcFreeCont(pRpc->pCont);
} }
int32_t mmProcessWriteMsg(SDnode *pDnode, SMndMsg *pMsg) { int32_t mmProcessWriteMsg(SDnode *pDnode, SMndMsg *pMsg) {
...@@ -235,64 +222,61 @@ static int32_t mmPutMndMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMndMs ...@@ -235,64 +222,61 @@ static int32_t mmPutMndMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMndMs
SMnode *pMnode = mmAcquire(pDnode); SMnode *pMnode = mmAcquire(pDnode);
if (pMnode == NULL) return -1; if (pMnode == NULL) return -1;
pMsg->pMnode = pMnode;
int32_t code = dndWriteMsgToWorker(pWorker, pMsg, 0); int32_t code = dndWriteMsgToWorker(pWorker, pMsg, 0);
mmRelease(pDnode, pMnode); mmRelease(pDnode, pMnode);
return code; return code;
} }
static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpc) { static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpc) {
int32_t contLen = sizeof(SMndMsg) + pRpc->contLen; SMndMsg *pMsg = taosAllocateQitem(sizeof(SMndMsg));
SMndMsg *pMsg = taosAllocateQitem(contLen);
if (pMsg == NULL) { if (pMsg == NULL) {
return -1; return -1;
} }
pMsg->rpcMsg = *pRpc; pMsg->rpcMsg = *pRpc;
pMsg->rpcMsg.pCont = (char *)pMsg + sizeof(SMndMsg); pMsg->createdTime = taosGetTimestampSec();
memcpy(pMsg->rpcMsg.pCont, pRpc->pCont, pRpc->contLen);
rpcFreeCont(pRpc->pCont);
int32_t code = mmPutMndMsgToWorker(pDnode, pWorker, pMsg); int32_t code = mmPutMndMsgToWorker(pDnode, pWorker, pMsg);
if (code != 0) { if (code != 0) {
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont);
} }
return code; return code;
} }
void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) { void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMndMgmt *pMgmt = &pDnode->mmgmt;
SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg *pRpc = &pMsg->rpcMsg;
pRpc->pCont = (char *)pMsg + sizeof(SMndMsg); pRpc->pCont = pCont;
MndMsgFp msgFp = pMgmt->msgFp[TMSG_INDEX(pRpc->msgType)]; MndMsgFp msgFp = pMgmt->msgFp[TMSG_INDEX(pRpc->msgType)];
int32_t code = (*msgFp)(pDnode, pMsg); int32_t code = (*msgFp)(pDnode, pMsg);
if (code == 0) return; if (code != 0) {
bool isReq = (pRpc->msgType & 1U);
bool isReq = (pRpc->msgType & 1U); if (isReq) {
if (terrno == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || terrno == TSDB_CODE_APP_NOT_READY) {
if (isReq) { dndSendRedirectRsp(pDnode, pRpc);
if (terrno == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || terrno == TSDB_CODE_APP_NOT_READY) { } else {
dndSendRedirectRsp(pDnode, pRpc); SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
} else { rpcSendResponse(&rsp);
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; }
rpcSendResponse(&rsp);
} }
taosFreeQitem(pMsg);
rpcFreeCont(pCont);
} }
taosFreeQitem(pMsg);
} }
void mmConsumeParentQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {} void mmConsumeParentQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {}
static void mmConsumeQueue(SDnode *pDnode, SMndMsg *pMsg) { static void mmConsumeQueue(SDnode *pDnode, SMndMsg *pMsg) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMndMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = mmAcquire(pDnode); SMnode *pMnode = mmAcquire(pDnode);
if (pMnode != NULL) { if (pMnode != NULL) {
pMsg->pMnode = pMnode;
mndProcessMsg(pMsg); mndProcessMsg(pMsg);
mmRelease(pDnode, pMnode); mmRelease(pDnode, pMnode);
} else { } else {
...@@ -300,4 +284,5 @@ static void mmConsumeQueue(SDnode *pDnode, SMndMsg *pMsg) { ...@@ -300,4 +284,5 @@ static void mmConsumeQueue(SDnode *pDnode, SMndMsg *pMsg) {
} }
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
rpcFreeCont(pMsg->rpcMsg.pCont);
} }
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#define SHM_DEFAULT_SIZE (20 * 1024 * 1024) #define SHM_DEFAULT_SIZE (20 * 1024 * 1024)
#define CEIL8(n) (ceil((float)(n) / 8) * 8) #define CEIL8(n) (ceil((float)(n) / 8) * 8)
typedef void *(*ProcThreadFp)(void *param);
typedef struct SProcQueue { typedef struct SProcQueue {
int32_t head; int32_t head;
...@@ -29,18 +30,21 @@ typedef struct SProcQueue { ...@@ -29,18 +30,21 @@ typedef struct SProcQueue {
int32_t avail; int32_t avail;
int32_t items; int32_t items;
char *pBuffer; char *pBuffer;
ProcMallocFp mallocHeadFp;
ProcFreeFp freeHeadFp;
ProcMallocFp mallocBodyFp;
ProcFreeFp freeBodyFp;
ProcConsumeFp consumeFp;
void *pParent;
tsem_t sem; tsem_t sem;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SProcQueue; } SProcQueue;
typedef struct SProcObj { typedef struct SProcObj {
SProcQueue *pChildQueue;
SProcQueue *pParentQueue;
pthread_t childThread; pthread_t childThread;
SProcQueue *pChildQueue;
pthread_t parentThread; pthread_t parentThread;
ProcFp childFp; SProcQueue *pParentQueue;
ProcFp parentFp;
void *pParent;
int32_t pid; int32_t pid;
bool isChild; bool isChild;
bool stopFlag; bool stopFlag;
...@@ -144,6 +148,9 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea ...@@ -144,6 +148,9 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea
pQueue->items++; pQueue->items++;
pthread_mutex_unlock(&pQueue->mutex); pthread_mutex_unlock(&pQueue->mutex);
tsem_post(&pQueue->sem); tsem_post(&pQueue->sem);
(*pQueue->freeHeadFp)(pHead);
(*pQueue->freeBodyFp)(pBody);
return 0; return 0;
} }
...@@ -169,13 +176,13 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea ...@@ -169,13 +176,13 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea
bodyLen = *(int32_t *)(pQueue->pBuffer + 4); bodyLen = *(int32_t *)(pQueue->pBuffer + 4);
} }
void *pHead = taosAllocateQitem(headLen); void *pHead = (*pQueue->mallocHeadFp)(headLen);
void *pBody = malloc(bodyLen); void *pBody = (*pQueue->mallocBodyFp)(bodyLen);
if (pHead == NULL || pBody == NULL) { if (pHead == NULL || pBody == NULL) {
pthread_mutex_unlock(&pQueue->mutex); pthread_mutex_unlock(&pQueue->mutex);
tsem_post(&pQueue->sem); tsem_post(&pQueue->sem);
taosFreeQitem(pHead); (*pQueue->freeHeadFp)(pHead);
free(pBody); (*pQueue->freeBodyFp)(pBody);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -229,13 +236,8 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { ...@@ -229,13 +236,8 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
return NULL; return NULL;
} }
pProc->pParent = pCfg->pParent;
pProc->childFp = pCfg->childFp;
pProc->parentFp = pCfg->parentFp;
pProc->testFlag = pCfg->testFlag;
pProc->pChildQueue = taosProcQueueInit(pCfg->childQueueSize); pProc->pChildQueue = taosProcQueueInit(pCfg->childQueueSize);
pProc->pParentQueue = taosProcQueueInit(pCfg->parentQueueSize); pProc->pParentQueue = taosProcQueueInit(pCfg->parentQueueSize);
if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) { if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) {
taosProcQueueCleanup(pProc->pChildQueue); taosProcQueueCleanup(pProc->pChildQueue);
taosProcQueueCleanup(pProc->pParentQueue); taosProcQueueCleanup(pProc->pParentQueue);
...@@ -243,17 +245,31 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { ...@@ -243,17 +245,31 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
return NULL; return NULL;
} }
pProc->testFlag = pCfg->testFlag;
pProc->pChildQueue->pParent = pCfg->pParent;
pProc->pChildQueue->mallocHeadFp = pCfg->childMallocHeadFp;
pProc->pChildQueue->freeHeadFp = pCfg->childFreeHeadFp;
pProc->pChildQueue->mallocBodyFp = pCfg->childMallocBodyFp;
pProc->pChildQueue->freeBodyFp = pCfg->childFreeBodyFp;
pProc->pChildQueue->consumeFp = pCfg->childConsumeFp;
pProc->pParentQueue->pParent = pCfg->pParent;
pProc->pParentQueue->mallocHeadFp = pCfg->parentdMallocHeadFp;
pProc->pParentQueue->freeHeadFp = pCfg->parentFreeHeadFp;
pProc->pParentQueue->mallocBodyFp = pCfg->parentMallocBodyFp;
pProc->pParentQueue->freeBodyFp = pCfg->parentFreeBodyFp;
pProc->pParentQueue->consumeFp = pCfg->parentConsumeFp;
// todo // todo
pProc->isChild = 0; pProc->isChild = 0;
return pProc; return pProc;
} }
static void taosProcThreadLoop(SProcQueue *pQueue, ProcFp procFp, void *pParent) { static void taosProcThreadLoop(SProcQueue *pQueue) {
void *pHead; ProcConsumeFp consumeFp = pQueue->consumeFp;
void *pBody; void *pParent = pQueue->pParent;
int32_t headLen; void *pHead, *pBody;
int32_t bodyLen; int32_t headLen, bodyLen;
while (1) { while (1) {
int32_t code = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen); int32_t code = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen);
...@@ -265,30 +281,18 @@ static void taosProcThreadLoop(SProcQueue *pQueue, ProcFp procFp, void *pParent) ...@@ -265,30 +281,18 @@ static void taosProcThreadLoop(SProcQueue *pQueue, ProcFp procFp, void *pParent)
taosMsleep(1); taosMsleep(1);
continue; continue;
} else { } else {
(*procFp)(pParent, pHead, headLen, pBody, bodyLen); (*consumeFp)(pParent, pHead, headLen, pBody, bodyLen);
} }
} }
} }
static void *taosProcThreadChildLoop(void *param) {
SProcObj *pProc = param;
taosProcThreadLoop(pProc->pChildQueue, pProc->childFp, pProc->pParent);
return NULL;
}
static void *taosProcThreadParentLoop(void *param) {
SProcObj *pProc = param;
taosProcThreadLoop(pProc->pParentQueue, pProc->parentFp, pProc->pParent);
return NULL;
}
int32_t taosProcStart(SProcObj *pProc) { int32_t taosProcStart(SProcObj *pProc) {
pthread_attr_t thAttr = {0}; pthread_attr_t thAttr = {0};
pthread_attr_init(&thAttr); pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pProc->isChild || pProc->testFlag) { if (pProc->isChild || pProc->testFlag) {
if (pthread_create(&pProc->childThread, &thAttr, taosProcThreadChildLoop, pProc) != 0) { if (pthread_create(&pProc->childThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pChildQueue) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to create thread since %s", terrstr()); uError("failed to create thread since %s", terrstr());
return -1; return -1;
...@@ -296,7 +300,7 @@ int32_t taosProcStart(SProcObj *pProc) { ...@@ -296,7 +300,7 @@ int32_t taosProcStart(SProcObj *pProc) {
} }
if (!pProc->isChild || pProc->testFlag) { if (!pProc->isChild || pProc->testFlag) {
if (pthread_create(&pProc->parentThread, &thAttr, taosProcThreadParentLoop, pProc) != 0) { if (pthread_create(&pProc->parentThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pParentQueue) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to create thread since %s", terrstr()); uError("failed to create thread since %s", terrstr());
return -1; return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册