提交 fd7f61a0 编写于 作者: S Shengliang Guan

shm

上级 1363021f
...@@ -67,11 +67,11 @@ void mndClose(SMnode *pMnode); ...@@ -67,11 +67,11 @@ void mndClose(SMnode *pMnode);
int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption); int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption);
/** /**
* @brief Drop a mnode. * @brief Start mnode
* *
* @param path Path of the mnode. * @param pMnode The mnode object.
*/ */
void mndDestroy(const char *path); int32_t mndStart(SMnode *pMnode);
/** /**
* @brief Get mnode monitor info. * @brief Get mnode monitor info.
......
...@@ -69,6 +69,7 @@ typedef struct SBnodeMgmt SBnodeMgmt; ...@@ -69,6 +69,7 @@ typedef struct SBnodeMgmt SBnodeMgmt;
typedef int32_t (*NodeMsgFp)(void *pMgmt, SNodeMsg *pMsg); typedef int32_t (*NodeMsgFp)(void *pMgmt, SNodeMsg *pMsg);
typedef int32_t (*OpenNodeFp)(SMgmtWrapper *pWrapper); typedef int32_t (*OpenNodeFp)(SMgmtWrapper *pWrapper);
typedef void (*CloseNodeFp)(SMgmtWrapper *pWrapper); typedef void (*CloseNodeFp)(SMgmtWrapper *pWrapper);
typedef int32_t (*StartNodeFp)(SMgmtWrapper *pWrapper);
typedef int32_t (*CreateNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); typedef int32_t (*CreateNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
typedef int32_t (*DropNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); typedef int32_t (*DropNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
typedef int32_t (*RequireNodeFp)(SMgmtWrapper *pWrapper, bool *required); typedef int32_t (*RequireNodeFp)(SMgmtWrapper *pWrapper, bool *required);
...@@ -95,6 +96,7 @@ typedef struct SMsgHandle { ...@@ -95,6 +96,7 @@ typedef struct SMsgHandle {
typedef struct SMgmtFp { typedef struct SMgmtFp {
OpenNodeFp openFp; OpenNodeFp openFp;
CloseNodeFp closeFp; CloseNodeFp closeFp;
StartNodeFp startFp;
CreateNodeFp createMsgFp; CreateNodeFp createMsgFp;
DropNodeFp dropMsgFp; DropNodeFp dropMsgFp;
RequireNodeFp requiredFp; RequireNodeFp requiredFp;
......
...@@ -84,14 +84,19 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) { ...@@ -84,14 +84,19 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
} }
} }
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE); dndSetStatus(pDnode, DND_STAT_RUNNING);
int32_t code = dmStart(pWrapper->pMgmt);
if (code != 0) { for (ENodeType n = 0; n < NODE_MAX; ++n) {
dError("failed to start dnode worker since %s", terrstr()); SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
if (pWrapper->fp.startFp == NULL) continue;
if ((*pWrapper->fp.startFp)(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
}
} }
dndReleaseWrapper(pWrapper); return 0;
return code;
} }
static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) { static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) {
...@@ -198,6 +203,7 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { ...@@ -198,6 +203,7 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
} }
} }
#if 0
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE); SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE);
if (pWrapper->procType == PROC_PARENT && dmStart(pWrapper->pMgmt) != 0) { if (pWrapper->procType == PROC_PARENT && dmStart(pWrapper->pMgmt) != 0) {
dndReleaseWrapper(pWrapper); dndReleaseWrapper(pWrapper);
...@@ -206,6 +212,7 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { ...@@ -206,6 +212,7 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
} }
dndReleaseWrapper(pWrapper); dndReleaseWrapper(pWrapper);
#endif
return 0; return 0;
} }
...@@ -222,7 +229,6 @@ int32_t dndRun(SDnode *pDnode) { ...@@ -222,7 +229,6 @@ int32_t dndRun(SDnode *pDnode) {
} }
} }
dndSetStatus(pDnode, DND_STAT_RUNNING);
dndReportStartup(pDnode, "TDengine", "initialized successfully"); dndReportStartup(pDnode, "TDengine", "initialized successfully");
while (1) { while (1) {
......
...@@ -26,8 +26,8 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { ...@@ -26,8 +26,8 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
tmsg_t msgType = pRsp->msgType; tmsg_t msgType = pRsp->msgType;
if (dndGetStatus(pDnode) != DND_STAT_RUNNING) { if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
if (pRsp == NULL || pRsp->pCont == NULL) return; // if (pRsp == NULL || pRsp->pCont == NULL) return;
dTrace("rsp:%s ignored since dnode exiting, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle); dTrace("rsp:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle);
rpcFreeCont(pRsp->pCont); rpcFreeCont(pRsp->pCont);
return; return;
} }
...@@ -276,8 +276,13 @@ static int32_t dndSendRpcReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) { ...@@ -276,8 +276,13 @@ static int32_t dndSendRpcReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) {
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pReq) { int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pReq) {
if (pWrapper->procType == PROC_CHILD) { if (pWrapper->procType == PROC_CHILD) {
} else { } else {
STransMgmt *pTrans = &pWrapper->pDnode->trans; SDnode *pDnode = pWrapper->pDnode;
return dndSendRpcReq(pTrans, pEpSet, pReq); if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
terrno = TSDB_CODE_DND_OFFLINE;
dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle);
return -1;
}
return dndSendRpcReq(&pDnode->trans, pEpSet, pReq);
} }
} }
......
...@@ -30,7 +30,6 @@ void dmInitMsgHandles(SMgmtWrapper *pWrapper); ...@@ -30,7 +30,6 @@ void dmInitMsgHandles(SMgmtWrapper *pWrapper);
void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet); void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet); void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
void dmSendRedirectRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); void dmSendRedirectRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmStart(SDnodeMgmt *pMgmt);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -53,6 +53,7 @@ int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); ...@@ -53,6 +53,7 @@ int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
// dmWorker.c // dmWorker.c
int32_t dmStartWorker(SDnodeMgmt *pMgmt); int32_t dmStartWorker(SDnodeMgmt *pMgmt);
void dmStopWorker(SDnodeMgmt *pMgmt); void dmStopWorker(SDnodeMgmt *pMgmt);
int32_t dmStartThread(SDnodeMgmt *pMgmt);
int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -73,7 +73,10 @@ void dmSendRedirectRsp(SDnodeMgmt *pMgmt, SRpcMsg *pReq) { ...@@ -73,7 +73,10 @@ void dmSendRedirectRsp(SDnodeMgmt *pMgmt, SRpcMsg *pReq) {
rpcSendRedirectRsp(pReq->handle, &epSet); rpcSendRedirectRsp(pReq->handle, &epSet);
} }
int32_t dmStart(SDnodeMgmt *pMgmt) { return dmStartWorker(pMgmt); } static int32_t dmStart(SMgmtWrapper *pWrapper) {
dDebug("dnode mgmt start to run");
return dmStartThread(pWrapper->pMgmt);
}
int32_t dmInit(SMgmtWrapper *pWrapper) { int32_t dmInit(SMgmtWrapper *pWrapper) {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
...@@ -105,6 +108,10 @@ int32_t dmInit(SMgmtWrapper *pWrapper) { ...@@ -105,6 +108,10 @@ int32_t dmInit(SMgmtWrapper *pWrapper) {
return -1; return -1;
} }
if (dmStartWorker(pMgmt) != 0) {
return -1;
}
pWrapper->pMgmt = pMgmt; pWrapper->pMgmt = pMgmt;
dInfo("dnode-mgmt is initialized"); dInfo("dnode-mgmt is initialized");
return 0; return 0;
...@@ -145,6 +152,7 @@ void dmGetMgmtFp(SMgmtWrapper *pWrapper) { ...@@ -145,6 +152,7 @@ void dmGetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0}; SMgmtFp mgmtFp = {0};
mgmtFp.openFp = dmInit; mgmtFp.openFp = dmInit;
mgmtFp.closeFp = dmCleanup; mgmtFp.closeFp = dmCleanup;
mgmtFp.startFp = dmStart;
mgmtFp.requiredFp = dmRequire; mgmtFp.requiredFp = dmRequire;
dmInitMsgHandles(pWrapper); dmInitMsgHandles(pWrapper);
......
...@@ -108,6 +108,10 @@ int32_t dmStartWorker(SDnodeMgmt *pMgmt) { ...@@ -108,6 +108,10 @@ int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
return -1; return -1;
} }
return 0;
}
int32_t dmStartThread(SDnodeMgmt *pMgmt) {
pMgmt->threadId = taosCreateThread(dmThreadRoutine, pMgmt); pMgmt->threadId = taosCreateThread(dmThreadRoutine, pMgmt);
if (pMgmt->threadId == NULL) { if (pMgmt->threadId == NULL) {
dError("failed to init dnode thread"); dError("failed to init dnode thread");
......
...@@ -222,10 +222,17 @@ static int32_t mmOpen(SMgmtWrapper *pWrapper) { ...@@ -222,10 +222,17 @@ static int32_t mmOpen(SMgmtWrapper *pWrapper) {
return mmOpenFromMsg(pWrapper, NULL); return mmOpenFromMsg(pWrapper, NULL);
} }
static int32_t mmStart(SMgmtWrapper *pWrapper) {
dDebug("mnode mgmt start to run");
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mndStart(pMgmt->pMnode);
}
void mmGetMgmtFp(SMgmtWrapper *pWrapper) { void mmGetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0}; SMgmtFp mgmtFp = {0};
mgmtFp.openFp = mmOpen; mgmtFp.openFp = mmOpen;
mgmtFp.closeFp = mmClose; mgmtFp.closeFp = mmClose;
mgmtFp.startFp = mmStart;
mgmtFp.createMsgFp = mmProcessCreateReq; mgmtFp.createMsgFp = mmProcessCreateReq;
mgmtFp.dropMsgFp = mmProcessDropReq; mgmtFp.dropMsgFp = mmProcessDropReq;
mgmtFp.requiredFp = mmRequire; mgmtFp.requiredFp = mmRequire;
......
...@@ -23,7 +23,7 @@ void Testbase::InitLog(const char* path) { ...@@ -23,7 +23,7 @@ void Testbase::InitLog(const char* path) {
jniDebugFlag = 0; jniDebugFlag = 0;
tmrDebugFlag = 135; tmrDebugFlag = 135;
uDebugFlag = 135; uDebugFlag = 135;
rpcDebugFlag = 135; rpcDebugFlag = 143;
qDebugFlag = 0; qDebugFlag = 0;
wDebugFlag = 0; wDebugFlag = 0;
sDebugFlag = 0; sDebugFlag = 0;
......
...@@ -928,13 +928,13 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA ...@@ -928,13 +928,13 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
mDebug("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions); mDebug("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions);
return 0; return 0;
} else { } else {
mError("trans:%d, all %d actions executed, code:0x%04x", pTrans->id, numOfActions, errCode); mError("trans:%d, all %d actions executed, code:0x%04x", pTrans->id, numOfActions, errCode & 0XFFFF);
mndTransResetActions(pMnode, pTrans, pArray); mndTransResetActions(pMnode, pTrans, pArray);
terrno = errCode; terrno = errCode;
return errCode; return errCode;
} }
} else { } else {
mDebug("trans:%d, %d of %d actions executed, code:0x%04x", pTrans->id, numOfReceived, numOfActions, errCode); mDebug("trans:%d, %d of %d actions executed, code:0x%04x", pTrans->id, numOfReceived, numOfActions, errCode & 0XFFFF);
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
} }
......
...@@ -230,7 +230,7 @@ static int32_t mndInitSteps(SMnode *pMnode) { ...@@ -230,7 +230,7 @@ static int32_t mndInitSteps(SMnode *pMnode) {
} else { } else {
if (mndAllocStep(pMnode, "mnode-sdb-read", mndReadSdb, NULL) != 0) return -1; if (mndAllocStep(pMnode, "mnode-sdb-read", mndReadSdb, NULL) != 0) return -1;
} }
if (mndAllocStep(pMnode, "mnode-timer", mndInitTimer, NULL) != 0) return -1; // if (mndAllocStep(pMnode, "mnode-timer", mndInitTimer, NULL) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1; if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1; if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1; if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1;
...@@ -379,10 +379,8 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) { ...@@ -379,10 +379,8 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
return 0; return 0;
} }
void mndDestroy(const char *path) { int32_t mndStart(SMnode *pMnode) {
mDebug("start to destroy mnode at %s", path); return mndInitTimer(pMnode);
taosRemoveDir(path);
mDebug("mnode is destroyed");
} }
int32_t mndProcessMsg(SNodeMsg *pMsg) { int32_t mndProcessMsg(SNodeMsg *pMsg) {
......
...@@ -204,6 +204,8 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) { ...@@ -204,6 +204,8 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) {
ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_NETWORK_UNAVAIL); ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
} }
taosMsleep(1000);
{ {
// show trans // show trans
test.SendShowMetaReq(TSDB_MGMT_TABLE_TRANS, ""); test.SendShowMetaReq(TSDB_MGMT_TABLE_TRANS, "");
...@@ -241,6 +243,7 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) { ...@@ -241,6 +243,7 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) {
EXPECT_EQ(test.GetShowRows(), 0); EXPECT_EQ(test.GetShowRows(), 0);
} }
uInfo("======== re-create trans");
// re-create trans // re-create trans
{ {
SMCreateQnodeReq createReq = {0}; SMCreateQnodeReq createReq = {0};
...@@ -255,10 +258,14 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) { ...@@ -255,10 +258,14 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) {
ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_NETWORK_UNAVAIL); ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
} }
uInfo("======== kill and restart server")
KillThenRestartServer(); KillThenRestartServer();
uInfo("======== server2 start")
server2.DoStart(); server2.DoStart();
uInfo("======== server2 started")
{ {
int32_t retry = 0; int32_t retry = 0;
int32_t retryMax = 20; int32_t retryMax = 20;
......
...@@ -74,7 +74,7 @@ BUILD_DIR=$TAOS_DIR/$BIN_DIR ...@@ -74,7 +74,7 @@ BUILD_DIR=$TAOS_DIR/$BIN_DIR
SIM_DIR=$TAOS_DIR/sim SIM_DIR=$TAOS_DIR/sim
NODE_DIR=$SIM_DIR/$NODE_NAME NODE_DIR=$SIM_DIR/$NODE_NAME
EXE_DIR=$BUILD_DIR/source/dnode/mgmt/daemon EXE_DIR=$BUILD_DIR/source/dnode/mgmt/main
CFG_DIR=$NODE_DIR/cfg CFG_DIR=$NODE_DIR/cfg
LOG_DIR=$NODE_DIR/log LOG_DIR=$NODE_DIR/log
DATA_DIR=$NODE_DIR/data DATA_DIR=$NODE_DIR/data
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册