提交 b16a21b2 编写于 作者: S Shengliang Guan

refactor: multi-process test mode

上级 0a1e96ec
...@@ -37,7 +37,7 @@ typedef struct SBnodeMgmt { ...@@ -37,7 +37,7 @@ typedef struct SBnodeMgmt {
// bmHandle.c // bmHandle.c
SArray *bmGetMsgHandles(); SArray *bmGetMsgHandles();
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pReq); int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pReq);
int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pReq);
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq);
// bmWorker.c // bmWorker.c
......
...@@ -67,7 +67,7 @@ int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { ...@@ -67,7 +67,7 @@ int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
return 0; return 0;
} }
int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg; SRpcMsg *pReq = pMsg;
SDDropBnodeReq dropReq = {0}; SDDropBnodeReq dropReq = {0};
...@@ -76,14 +76,14 @@ int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -76,14 +76,14 @@ int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
if (pMgmt->pData->dnodeId != 0 && dropReq.dnodeId != pMgmt->pData->dnodeId) { if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to drop bnode since %s", terrstr()); dError("failed to drop bnode since %s", terrstr());
return -1; return -1;
} }
bool deployed = false; bool deployed = false;
if (dmWriteFile(pMgmt->path, pMgmt->name, deployed) != 0) { if (dmWriteFile(pInput->path, pInput->name, deployed) != 0) {
dError("failed to write bnode file since %s", terrstr()); dError("failed to write bnode file since %s", terrstr());
return -1; return -1;
} }
......
...@@ -49,7 +49,7 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq); ...@@ -49,7 +49,7 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq);
// mmHandle.c // mmHandle.c
SArray *mmGetMsgHandles(); SArray *mmGetMsgHandles();
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq);
......
...@@ -100,7 +100,7 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { ...@@ -100,7 +100,7 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
return 0; return 0;
} }
int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg; SRpcMsg *pReq = pMsg;
SDDropMnodeReq dropReq = {0}; SDDropMnodeReq dropReq = {0};
...@@ -109,14 +109,18 @@ int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -109,14 +109,18 @@ int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
if (pMgmt->pData->dnodeId != 0 && dropReq.dnodeId != pMgmt->pData->dnodeId) { if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to drop mnode since %s", terrstr()); dError("failed to drop mnode since %s", terrstr());
return -1; return -1;
} }
bool deployed = false; bool deployed = false;
if (mmWriteFile(pMgmt, NULL, deployed) != 0) {
SMnodeMgmt mgmt = {0};
mgmt.path = pInput->path;
mgmt.name = pInput->name;
if (mmWriteFile(&mgmt, NULL, deployed) != 0) {
dError("failed to write mnode file since %s", terrstr()); dError("failed to write mnode file since %s", terrstr());
return -1; return -1;
} }
......
...@@ -38,7 +38,7 @@ typedef struct SQnodeMgmt { ...@@ -38,7 +38,7 @@ typedef struct SQnodeMgmt {
// qmHandle.c // qmHandle.c
SArray *qmGetMsgHandles(); SArray *qmGetMsgHandles();
int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq);
// qmWorker.c // qmWorker.c
......
...@@ -67,7 +67,7 @@ int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { ...@@ -67,7 +67,7 @@ int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
return 0; return 0;
} }
int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg; SRpcMsg *pReq = pMsg;
SDDropQnodeReq dropReq = {0}; SDDropQnodeReq dropReq = {0};
...@@ -76,14 +76,14 @@ int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -76,14 +76,14 @@ int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
if (pMgmt->pData->dnodeId != 0 && dropReq.dnodeId != pMgmt->pData->dnodeId) { if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to drop qnode since %s", terrstr()); dError("failed to drop qnode since %s", terrstr());
return -1; return -1;
} }
bool deployed = false; bool deployed = false;
if (dmWriteFile(pMgmt->path, pMgmt->name, deployed) != 0) { if (dmWriteFile(pInput->path, pInput->name, deployed) != 0) {
dError("failed to write qnode file since %s", terrstr()); dError("failed to write qnode file since %s", terrstr());
return -1; return -1;
} }
......
...@@ -40,7 +40,7 @@ typedef struct SSnodeMgmt { ...@@ -40,7 +40,7 @@ typedef struct SSnodeMgmt {
// smHandle.c // smHandle.c
SArray *smGetMsgHandles(); SArray *smGetMsgHandles();
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq);
// smWorker.c // smWorker.c
......
...@@ -67,7 +67,7 @@ int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { ...@@ -67,7 +67,7 @@ int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
return 0; return 0;
} }
int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg; SRpcMsg *pReq = pMsg;
SDDropSnodeReq dropReq = {0}; SDDropSnodeReq dropReq = {0};
...@@ -76,14 +76,14 @@ int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -76,14 +76,14 @@ int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
if (pMgmt->pData->dnodeId != 0 && dropReq.dnodeId != pMgmt->pData->dnodeId) { if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to drop snode since %s", terrstr()); dError("failed to drop snode since %s", terrstr());
return -1; return -1;
} }
bool deployed = false; bool deployed = false;
if (dmWriteFile(pMgmt->path, pMgmt->name, deployed) != 0) { if (dmWriteFile(pInput->path, pInput->name, deployed) != 0) {
dError("failed to write snode file since %s", terrstr()); dError("failed to write snode file since %s", terrstr());
return -1; return -1;
} }
......
...@@ -106,22 +106,23 @@ static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) { ...@@ -106,22 +106,23 @@ static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
return -1; return -1;
} }
taosThreadMutexLock(&pDnode->mutex);
pWrapper = &pDnode->wrappers[ntype]; pWrapper = &pDnode->wrappers[ntype];
if (taosMkDir(pWrapper->path) != 0) { if (taosMkDir(pWrapper->path) != 0) {
dmReleaseWrapper(pWrapper);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to create dir:%s since %s", pWrapper->path, terrstr()); dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
return -1; return -1;
} }
taosThreadMutexLock(&pDnode->mutex);
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
dInfo("node:%s, start to create", pWrapper->name);
int32_t code = (*pWrapper->func.createFp)(&input, pMsg); int32_t code = (*pWrapper->func.createFp)(&input, pMsg);
if (code != 0) { if (code != 0) {
dError("node:%s, failed to create since %s", pWrapper->name, terrstr()); dError("node:%s, failed to create since %s", pWrapper->name, terrstr());
} else { } else {
dDebug("node:%s, has been created", pWrapper->name); dInfo("node:%s, has been created", pWrapper->name);
(void)dmOpenNode(pWrapper); (void)dmOpenNode(pWrapper);
pWrapper->required = true; pWrapper->required = true;
pWrapper->deployed = true; pWrapper->deployed = true;
...@@ -143,12 +144,14 @@ static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) { ...@@ -143,12 +144,14 @@ static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
} }
taosThreadMutexLock(&pDnode->mutex); taosThreadMutexLock(&pDnode->mutex);
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
int32_t code = (*pWrapper->func.dropFp)(pWrapper->pMgmt, pMsg); dInfo("node:%s, start to drop", pWrapper->name);
int32_t code = (*pWrapper->func.dropFp)(&input, pMsg);
if (code != 0) { if (code != 0) {
dError("node:%s, failed to drop since %s", pWrapper->name, terrstr()); dError("node:%s, failed to drop since %s", pWrapper->name, terrstr());
} else { } else {
dDebug("node:%s, has been dropped", pWrapper->name); dInfo("node:%s, has been dropped", pWrapper->name);
pWrapper->required = false; pWrapper->required = false;
pWrapper->deployed = false; pWrapper->deployed = false;
} }
......
...@@ -126,7 +126,7 @@ typedef void (*NodeCloseFp)(void *pMgmt); ...@@ -126,7 +126,7 @@ typedef void (*NodeCloseFp)(void *pMgmt);
typedef int32_t (*NodeStartFp)(void *pMgmt); typedef int32_t (*NodeStartFp)(void *pMgmt);
typedef void (*NodeStopFp)(void *pMgmt); typedef void (*NodeStopFp)(void *pMgmt);
typedef int32_t (*NodeCreateFp)(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); typedef int32_t (*NodeCreateFp)(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
typedef int32_t (*NodeDropFp)(void *pMgmt, SRpcMsg *pMsg); typedef int32_t (*NodeDropFp)(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
typedef int32_t (*NodeRequireFp)(const SMgmtInputOpt *pInput, bool *required); typedef int32_t (*NodeRequireFp)(const SMgmtInputOpt *pInput, bool *required);
typedef SArray *(*NodeGetHandlesFp)(); // array of SMgmtHandle typedef SArray *(*NodeGetHandlesFp)(); // array of SMgmtHandle
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册