提交 6e523a47 编写于 作者: S Shengliang Guan

refactor: multi-process mode

上级 84261149
...@@ -86,7 +86,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { ...@@ -86,7 +86,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)0x9527}; SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)0x9527};
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};
dTrace("send req:%s to mnode, app:%p", TMSG_INFO(rpcMsg.msgType), rpcMsg.ahandle); dTrace("send status msg to mnode, app:%p", rpcMsg.ahandle);
tmsgSendMnodeRecv(&rpcMsg, &rpcRsp); tmsgSendMnodeRecv(&rpcMsg, &rpcRsp);
dmProcessStatusRsp(pMgmt, &rpcRsp); dmProcessStatusRsp(pMgmt, &rpcRsp);
} }
......
...@@ -125,8 +125,6 @@ typedef struct SDnode { ...@@ -125,8 +125,6 @@ typedef struct SDnode {
SDnodeTrans trans; SDnodeTrans trans;
SUdfdData udfdData; SUdfdData udfdData;
TdThreadMutex mutex; TdThreadMutex mutex;
SRWLatch latch;
SEpSet mnodeEps;
TdFilePtr lockfile; TdFilePtr lockfile;
SDnodeData data; SDnodeData data;
SMgmtWrapper wrappers[NODE_END]; SMgmtWrapper wrappers[NODE_END];
......
...@@ -32,6 +32,10 @@ static bool dmRequireNode(SMgmtWrapper *pWrapper) { ...@@ -32,6 +32,10 @@ static bool dmRequireNode(SMgmtWrapper *pWrapper) {
dDebug("node:%s, does not require startup in child process", pWrapper->name); dDebug("node:%s, does not require startup in child process", pWrapper->name);
} }
if (required) {
dDebug("node:%s, required to startup", pWrapper->name);
}
return required; return required;
} }
...@@ -71,8 +75,8 @@ static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { ...@@ -71,8 +75,8 @@ static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
pData->disks = pOption->disks; pData->disks = pOption->disks;
pData->dataDir = strdup(pOption->dataDir); pData->dataDir = strdup(pOption->dataDir);
if (pData->dataDir == NULL || pData->localEp == NULL || pData->localFqdn == NULL || if (pData->dataDir == NULL || pData->localEp == NULL || pData->localFqdn == NULL || pData->firstEp == NULL ||
pData->firstEp == NULL || pData->secondEp == NULL) { pData->secondEp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -191,10 +195,6 @@ SDnode *dmCreate(const SDnodeOpt *pOption) { ...@@ -191,10 +195,6 @@ SDnode *dmCreate(const SDnodeOpt *pOption) {
goto _OVER; goto _OVER;
} }
if (dmInitClient(pDnode) != 0) {
goto _OVER;
}
if (OnlyInSingleProc(pDnode->ptype) || InParentProc(pDnode->ptype)) { if (OnlyInSingleProc(pDnode->ptype) || InParentProc(pDnode->ptype)) {
pDnode->lockfile = dmCheckRunning(pOption->dataDir); pDnode->lockfile = dmCheckRunning(pOption->dataDir);
if (pDnode->lockfile == NULL) { if (pDnode->lockfile == NULL) {
...@@ -207,6 +207,10 @@ SDnode *dmCreate(const SDnodeOpt *pOption) { ...@@ -207,6 +207,10 @@ SDnode *dmCreate(const SDnodeOpt *pOption) {
} }
} }
if (dmInitClient(pDnode) != 0) {
goto _OVER;
}
dmReportStartup(pDnode, "dnode-transport", "initialized"); dmReportStartup(pDnode, "dnode-transport", "initialized");
dInfo("dnode is created, data:%p", pDnode); dInfo("dnode is created, data:%p", pDnode);
code = 0; code = 0;
...@@ -226,7 +230,6 @@ void dmClose(SDnode *pDnode) { ...@@ -226,7 +230,6 @@ void dmClose(SDnode *pDnode) {
dmCleanupClient(pDnode); dmCleanupClient(pDnode);
dmCleanupServer(pDnode); dmCleanupServer(pDnode);
dmClearVars(pDnode); dmClearVars(pDnode);
dInfo("dnode is closed, data:%p", pDnode); dInfo("dnode is closed, data:%p", pDnode);
} }
......
...@@ -144,9 +144,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { ...@@ -144,9 +144,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
} }
int32_t dmStartNode(SMgmtWrapper *pWrapper) { int32_t dmStartNode(SMgmtWrapper *pWrapper) {
if (!pWrapper->required) return 0;
if (OnlyInParentProc(pWrapper->proc.ptype)) return 0; if (OnlyInParentProc(pWrapper->proc.ptype)) return 0;
if (pWrapper->func.startFp != NULL && (*pWrapper->func.startFp)(pWrapper->pMgmt) != 0) { if (pWrapper->func.startFp != NULL && (*pWrapper->func.startFp)(pWrapper->pMgmt) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1; return -1;
...@@ -201,6 +199,7 @@ static int32_t dmOpenNodes(SDnode *pDnode) { ...@@ -201,6 +199,7 @@ static int32_t dmOpenNodes(SDnode *pDnode) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
if (!pWrapper->required) continue; if (!pWrapper->required) continue;
if (dmOpenNode(pWrapper) != 0) { if (dmOpenNode(pWrapper) != 0) {
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
return -1; return -1;
} }
} }
...@@ -212,7 +211,7 @@ static int32_t dmOpenNodes(SDnode *pDnode) { ...@@ -212,7 +211,7 @@ static int32_t dmOpenNodes(SDnode *pDnode) {
static int32_t dmStartNodes(SDnode *pDnode) { static int32_t dmStartNodes(SDnode *pDnode) {
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
if (ntype == DNODE && (InChildProc(pDnode->ptype) || !OnlyInTestProc(pDnode->ptype))) continue; if (!pWrapper->required) continue;
if (dmStartNode(pWrapper) != 0) { if (dmStartNode(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1; return -1;
......
...@@ -22,21 +22,23 @@ ...@@ -22,21 +22,23 @@
#define INTERNAL_SECRET "_pwd" #define INTERNAL_SECRET "_pwd"
static void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { static void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
taosRLockLatch(&pDnode->latch); SDnodeData *pData = &pDnode->data;
*pEpSet = pDnode->mnodeEps; taosRLockLatch(&pData->latch);
taosRUnLockLatch(&pDnode->latch); *pEpSet = pData->mnodeEps;
taosRUnLockLatch(&pData->latch);
} }
static void dmSetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { static void dmSetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);
SDnodeData *pData = &pDnode->data;
taosWLockLatch(&pDnode->latch); taosWLockLatch(&pData->latch);
pDnode->mnodeEps = *pEpSet; pData->mnodeEps = *pEpSet;
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
} }
taosWUnLockLatch(&pDnode->latch); taosWUnLockLatch(&pData->latch);
} }
static inline NodeMsgFp dmGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { static inline NodeMsgFp dmGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册