提交 37da08f0 编写于 作者: S Shengliang Guan

shm

上级 5f0069e3
......@@ -23,10 +23,6 @@ extern "C" {
#endif
void bmGetMgmtFp(SMgmtWrapper *pWrapper);
void bmInitMsgHandles(SMgmtWrapper *pWrapper);
int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg);
int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg);
#ifdef __cplusplus
}
......
......@@ -24,27 +24,25 @@ extern "C" {
#endif
typedef struct SBnodeMgmt {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SBnode *pBnode;
SDnode *pDnode;
SMgmtWrapper *pWrapper;
const char *path;
SRWLatch latch;
SDnodeWorker writeWorker;
} SBnodeMgmt;
// bmFile.c
int32_t bmReadFile(SBnodeMgmt *pMgmt);
int32_t bmWriteFile(SBnodeMgmt *pMgmt);
SBnode *bmAcquire(SBnodeMgmt *pMgmt);
void bmRelease(SBnodeMgmt *pMgmt, SBnode *pBnode);
int32_t bmReadFile(SBnodeMgmt *pMgmt, bool *pDeployed);
int32_t bmWriteFile(SBnodeMgmt *pMgmt, bool deployed);
// bmInt.c
int32_t bmOpen(SBnodeMgmt *pMgmt);
int32_t bmDrop(SBnodeMgmt *pMgmt);
int32_t bmOpen(SMgmtWrapper *pWrapper);
int32_t bmDrop(SMgmtWrapper *pWrapper);
// bmMsg.c
void bmInitMsgHandles(SMgmtWrapper *pWrapper);
int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
// bmWorker.c
int32_t bmStartWorker(SBnodeMgmt *pMgmt);
......
......@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "bmInt.h"
int32_t bmReadFile(SBnodeMgmt *pMgmt) {
int32_t bmReadFile(SBnodeMgmt *pMgmt, bool *pDeployed) {
int32_t code = TSDB_CODE_DND_BNODE_READ_FILE_ERROR;
int32_t len = 0;
int32_t maxLen = 1024;
......@@ -51,17 +51,10 @@ int32_t bmReadFile(SBnodeMgmt *pMgmt) {
dError("failed to read %s since deployed not found", file);
goto PRASE_BNODE_OVER;
}
pMgmt->deployed = deployed->valueint;
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_Number) {
dError("failed to read %s since dropped not found", file);
goto PRASE_BNODE_OVER;
}
pMgmt->dropped = dropped->valueint;
*pDeployed = deployed->valueint != 0;
code = 0;
dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped);
dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed);
PRASE_BNODE_OVER:
if (content != NULL) free(content);
......@@ -72,7 +65,7 @@ PRASE_BNODE_OVER:
return code;
}
int32_t bmWriteFile(SBnodeMgmt *pMgmt) {
int32_t bmWriteFile(SBnodeMgmt *pMgmt, bool deployed) {
char file[PATH_MAX];
snprintf(file, sizeof(file), "%s%sbnode.json", pMgmt->path, TD_DIRSEP);
......@@ -88,8 +81,7 @@ int32_t bmWriteFile(SBnodeMgmt *pMgmt) {
char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed);
len += snprintf(content + len, maxLen - len, " \"dropped\": %d\n", pMgmt->dropped);
len += snprintf(content + len, maxLen - len, " \"deployed\": %d\n", deployed);
len += snprintf(content + len, maxLen - len, "}\n");
taosWriteFile(pFile, content, len);
......@@ -106,6 +98,6 @@ int32_t bmWriteFile(SBnodeMgmt *pMgmt) {
return -1;
}
dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped);
dInfo("successed to write %s, deployed:%d", realfile, deployed);
return 0;
}
......@@ -16,55 +16,17 @@
#define _DEFAULT_SOURCE
#include "bmInt.h"
SBnode *bmAcquire(SBnodeMgmt *pMgmt) {
SBnode *pBnode = NULL;
int32_t refCount = 0;
taosRLockLatch(&pMgmt->latch);
if (pMgmt->deployed && !pMgmt->dropped && pMgmt->pBnode != NULL) {
refCount = atomic_add_fetch_32(&pMgmt->refCount, 1);
pBnode = pMgmt->pBnode;
} else {
terrno = TSDB_CODE_DND_BNODE_NOT_DEPLOYED;
}
taosRUnLockLatch(&pMgmt->latch);
if (pBnode != NULL) {
dTrace("acquire bnode, refCount:%d", refCount);
}
return pBnode;
}
void bmRelease(SBnodeMgmt *pMgmt, SBnode *pBnode) {
if (pBnode == NULL) return;
taosRLockLatch(&pMgmt->latch);
int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
taosRUnLockLatch(&pMgmt->latch);
dTrace("release bnode, refCount:%d", refCount);
}
static bool bmRequire(SMgmtWrapper *pWrapper) {
SBnodeMgmt mgmt = {0};
mgmt.path = pWrapper->path;
if (bmReadFile(&mgmt) != 0) {
return false;
}
if (mgmt.dropped) {
dInfo("bnode has been dropped and needs to be deleted");
taosRemoveDir(mgmt.path);
return false;
}
if (mgmt.deployed) {
dInfo("bnode has been deployed");
}
bool deployed = false;
(void)bmReadFile(&mgmt, &deployed);
return mgmt.deployed;
return deployed;
}
void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
SDnode *pDnode = pMgmt->pDnode;
pOption->pWrapper = pMgmt->pWrapper;
pOption->sendReqFp = dndSendReqToDnode;
......@@ -73,125 +35,87 @@ void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
pOption->clusterId = pDnode->clusterId;
}
int32_t bmOpen(SBnodeMgmt *pMgmt) {
static int32_t bmOpenImp(SBnodeMgmt *pMgmt) {
SBnodeOpt option = {0};
bmInitOption(pMgmt, &option);
SBnode *pBnode = bmAcquire(pMgmt);
if (pBnode != NULL) {
bmRelease(pMgmt, pBnode);
terrno = TSDB_CODE_DND_BNODE_ALREADY_DEPLOYED;
dError("failed to create bnode since %s", terrstr());
return -1;
}
pBnode = bndOpen(pMgmt->path, &option);
if (pBnode == NULL) {
pMgmt->pBnode = bndOpen(pMgmt->path, &option);
if (pMgmt->pBnode == NULL) {
dError("failed to open bnode since %s", terrstr());
return -1;
}
if (bmStartWorker(pMgmt) != 0) {
dError("failed to start bnode worker since %s", terrstr());
bndClose(pBnode);
bndDestroy(pMgmt->path);
return -1;
}
pMgmt->deployed = 1;
if (bmWriteFile(pMgmt) != 0) {
bool deployed = true;
if (bmWriteFile(pMgmt, deployed) != 0) {
dError("failed to write bnode file since %s", terrstr());
pMgmt->deployed = 0;
bmStopWorker(pMgmt);
bndClose(pBnode);
bndDestroy(pMgmt->path);
return -1;
}
taosWLockLatch(&pMgmt->latch);
pMgmt->pBnode = pBnode;
pMgmt->deployed = 1;
taosWUnLockLatch(&pMgmt->latch);
dInfo("bnode open successfully");
return 0;
}
int32_t bmDrop(SBnodeMgmt *pMgmt) {
SBnode *pBnode = bmAcquire(pMgmt);
if (pBnode == NULL) {
dError("failed to drop bnode since %s", terrstr());
return -1;
static void bmCloseImp(SBnodeMgmt *pMgmt) {
if (pMgmt->pBnode == NULL) {
bmStopWorker(pMgmt);
bndClose(pMgmt->pBnode);
pMgmt->pBnode = NULL;
}
}
taosRLockLatch(&pMgmt->latch);
pMgmt->dropped = 1;
taosRUnLockLatch(&pMgmt->latch);
if (bmWriteFile(pMgmt) != 0) {
taosRLockLatch(&pMgmt->latch);
pMgmt->dropped = 0;
taosRUnLockLatch(&pMgmt->latch);
int32_t bmDrop(SMgmtWrapper *pWrapper) {
SBnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return 0;
bmRelease(pMgmt, pBnode);
dInfo("bnode-mgmt start to drop");
bool deployed = false;
if (bmWriteFile(pMgmt, deployed) != 0) {
dError("failed to drop bnode since %s", terrstr());
return -1;
}
bmRelease(pMgmt, pBnode);
bmStopWorker(pMgmt);
pMgmt->deployed = 0;
bmWriteFile(pMgmt);
bndClose(pBnode);
pMgmt->pBnode = NULL;
bmCloseImp(pMgmt);
bndDestroy(pMgmt->path);
pWrapper->pMgmt = NULL;
free(pMgmt);
dInfo("bnode-mgmt is dropped");
return 0;
}
static void bmCleanup(SMgmtWrapper *pWrapper) {
static void bmClose(SMgmtWrapper *pWrapper) {
SBnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
dInfo("bnode-mgmt start to cleanup");
if (pMgmt->pBnode) {
bmStopWorker(pMgmt);
bndClose(pMgmt->pBnode);
pMgmt->pBnode = NULL;
}
free(pMgmt);
bmCloseImp(pMgmt);
pWrapper->pMgmt = NULL;
free(pMgmt);
dInfo("bnode-mgmt is cleaned up");
}
static int32_t bmInit(SMgmtWrapper *pWrapper) {
SDnode *pDnode = pWrapper->pDnode;
SBnodeMgmt *pMgmt = calloc(1, sizeof(SBnodeMgmt));
int32_t code = -1;
int32_t bmOpen(SMgmtWrapper *pWrapper) {
dInfo("bnode-mgmt start to init");
if (pMgmt == NULL) goto _OVER;
SBnodeMgmt *pMgmt = calloc(1, sizeof(SBnodeMgmt));
if (pMgmt == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pMgmt->path = pWrapper->path;
pMgmt->pDnode = pWrapper->pDnode;
pMgmt->pWrapper = pWrapper;
taosInitRWLatch(&pMgmt->latch);
if (bmReadFile(pMgmt) != 0) {
dError("failed to read file since %s", terrstr());
goto _OVER;
}
pWrapper->pMgmt = pMgmt;
dInfo("bnode start to open");
code = bmOpen(pMgmt);
_OVER:
if (code == 0) {
pWrapper->pMgmt = pMgmt;
dInfo("bnode-mgmt is initialized");
} else {
int32_t code = bmOpenImp(pMgmt);
if (code != 0) {
dError("failed to init bnode-mgmt since %s", terrstr());
bmCleanup(pWrapper);
bmClose(pWrapper);
} else {
dInfo("bnode-mgmt is initialized");
}
return code;
......@@ -199,8 +123,10 @@ _OVER:
void bmGetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = bmInit;
mgmtFp.closeFp = bmCleanup;
mgmtFp.openFp = bmOpen;
mgmtFp.closeFp = bmClose;
mgmtFp.createMsgFp = bmProcessCreateReq;
mgmtFp.dropMsgFp = bmProcessDropReq;
mgmtFp.requiredFp = bmRequire;
bmInitMsgHandles(pWrapper);
......
......@@ -16,8 +16,8 @@
#define _DEFAULT_SOURCE
#include "bmInt.h"
int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnode *pDnode = pMgmt->pDnode;
int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SDnode *pDnode = pWrapper->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg;
SDCreateBnodeReq createReq = {0};
......@@ -31,12 +31,12 @@ int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
dError("failed to create bnode since %s", terrstr());
return -1;
} else {
return bmOpen(pMgmt);
return bmOpen(pWrapper);
}
}
int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnode *pDnode = pMgmt->pDnode;
int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SDnode *pDnode = pWrapper->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg;
SDDropBnodeReq dropReq = {0};
......@@ -50,7 +50,7 @@ int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
dError("failed to drop bnode since %s", terrstr());
return -1;
} else {
return bmDrop(pMgmt);
return bmDrop(pWrapper);
}
}
......
......@@ -16,32 +16,11 @@
#define _DEFAULT_SOURCE
#include "bmInt.h"
static void bmProcessQueue(SBnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs);
int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
if (dndInitWorker(pMgmt, &pMgmt->writeWorker, DND_WORKER_MULTI, "bnode-write", 0, 1, bmProcessQueue) != 0) {
dError("failed to start bnode write worker since %s", terrstr());
return -1;
}
return 0;
}
void bmStopWorker(SBnodeMgmt *pMgmt) {
taosWLockLatch(&pMgmt->latch);
pMgmt->deployed = 0;
taosWUnLockLatch(&pMgmt->latch);
while (pMgmt->refCount > 0) {
taosMsleep(10);
}
dndCleanupWorker(&pMgmt->writeWorker);
}
static void bmSendErrorRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code};
dndSendRsp(pWrapper, &rpcRsp);
dTrace("msg:%p, is freed", pMsg);
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
}
......@@ -57,15 +36,8 @@ static void bmSendErrorRsps(SMgmtWrapper *pWrapper, STaosQall *qall, int32_t num
static void bmProcessQueue(SBnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs) {
SMgmtWrapper *pWrapper = pMgmt->pWrapper;
SBnode *pBnode = bmAcquire(pMgmt);
if (pBnode == NULL) {
bmSendErrorRsps(pWrapper, qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY);
return;
}
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
if (pArray == NULL) {
bmRelease(pMgmt, pBnode);
bmSendErrorRsps(pWrapper, qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY);
return;
}
......@@ -73,34 +45,36 @@ static void bmProcessQueue(SBnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs
for (int32_t i = 0; i < numOfMsgs; ++i) {
SNodeMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg);
void *ptr = taosArrayPush(pArray, &pMsg);
if (ptr == NULL) {
bmRelease(pMgmt, pBnode);
if (taosArrayPush(pArray, &pMsg) == NULL) {
bmSendErrorRsp(pWrapper, pMsg, TSDB_CODE_OUT_OF_MEMORY);
}
}
bndProcessWMsgs(pBnode, pArray);
bndProcessWMsgs(pMgmt->pBnode, pArray);
for (size_t i = 0; i < numOfMsgs; i++) {
SNodeMsg *pNodeMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
rpcFreeCont(pNodeMsg->rpcMsg.pCont);
taosFreeQitem(pNodeMsg);
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
dTrace("msg:%p, is freed", pMsg);
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
}
taosArrayDestroy(pArray);
bmRelease(pMgmt, pBnode);
}
static int32_t bmPutMsgToWorker(SBnodeMgmt *pMgmt, SDnodeWorker *pWorker, SNodeMsg *pMsg) {
SBnode *pBnode = bmAcquire(pMgmt);
if (pBnode == NULL) return -1;
int32_t bmProcessWriteMsg(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnodeWorker *pWorker = &pMgmt->writeWorker;
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
int32_t code = dndWriteMsgToWorker(pWorker, pMsg, 0);
bmRelease(pMgmt, pBnode);
return code;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg, 0);
}
int32_t bmProcessWriteMsg(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return bmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg);
}
\ No newline at end of file
int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
if (dndInitWorker(pMgmt, &pMgmt->writeWorker, DND_WORKER_MULTI, "bnode-write", 0, 1, bmProcessQueue) != 0) {
dError("failed to start bnode write worker since %s", terrstr());
return -1;
}
return 0;
}
void bmStopWorker(SBnodeMgmt *pMgmt) { dndCleanupWorker(&pMgmt->writeWorker); }
......@@ -74,6 +74,8 @@ typedef struct SBnodeMgmt SBnodeMgmt;
typedef int32_t (*NodeMsgFp)(void *pMgmt, SNodeMsg *pMsg);
typedef int32_t (*OpenNodeFp)(SMgmtWrapper *pWrapper);
typedef void (*CloseNodeFp)(SMgmtWrapper *pWrapper);
typedef int32_t (*CreateNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
typedef int32_t (*DropNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
typedef bool (*RequireNodeFp)(SMgmtWrapper *pWrapper);
typedef struct {
......@@ -98,6 +100,8 @@ typedef struct SMsgHandle {
typedef struct SMgmtFp {
OpenNodeFp openFp;
CloseNodeFp closeFp;
CreateNodeFp createMsgFp;
DropNodeFp dropMsgFp;
RequireNodeFp requiredFp;
} SMgmtFp;
......
......@@ -41,8 +41,8 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet);
// dndExec.c
int32_t dndOpenNode(SDnode *pDnode, ENodeType nodeType);
int32_t dndCloseNode(SDnode *pDnode, ENodeType nodeType);
int32_t dndOpenNode(SMgmtWrapper *pWrapper);
void dndCloseNode(SMgmtWrapper *pWrapper);
int32_t dndRun(SDnode *pDnode);
// dndObj.c
......
......@@ -35,51 +35,37 @@ static bool dndRequireNode(SMgmtWrapper *pWrapper) {
return required;
}
int32_t dndOpenNode(SDnode *pDnode, ENodeType ntype) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype);
if (pWrapper != NULL) {
dndReleaseWrapper(pWrapper);
terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
return -1;
}
pWrapper = &pDnode->wrappers[ntype];
int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
int32_t code = (*pWrapper->fp.openFp)(pWrapper);
if (code != 0) {
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
return -1;
} else {
dDebug("node:%s, has been opened", pWrapper->name);
pWrapper->deployed = true;
}
return code;
pWrapper->deployed = true;
return 0;
}
int32_t dndCloseNode(SDnode *pDnode, ENodeType ntype) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype);
if (pWrapper == NULL) {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
return -1;
}
void dndCloseNode(SMgmtWrapper *pWrapper) {
taosWLockLatch(&pWrapper->latch);
(*pWrapper->fp.closeFp)(pWrapper);
pWrapper->deployed = false;
if (pWrapper->deployed) {
(*pWrapper->fp.closeFp)(pWrapper);
pWrapper->deployed = false;
}
if (pWrapper->pProc) {
taosProcCleanup(pWrapper->pProc);
pWrapper->pProc = NULL;
}
taosWUnLockLatch(&pWrapper->latch);
dndReleaseWrapper(pWrapper);
return 0;
}
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
dInfo("dnode run in single process mode");
for (ENodeType ntype = 0; ntype < NODE_MAX; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = dndRequireNode(pWrapper);
if (!pWrapper->required) continue;
......@@ -91,7 +77,7 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
dInfo("node:%s, will start in single process", pWrapper->name);
pWrapper->procType = PROC_SINGLE;
if (dndOpenNode(pDnode, ntype) != 0) {
if (dndOpenNode(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
}
......@@ -109,9 +95,10 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) {
dndCleanupServer(pDnode);
for (ENodeType ntype = 0; ntype < NODE_MAX; ++ntype) {
if (except == ntype) continue;
(void)dndCloseNode(pDnode, ntype);
for (ENodeType n = 0; n < NODE_MAX; ++n) {
if (except == n) continue;
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
dndCloseNode(pWrapper);
}
}
......@@ -145,8 +132,8 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRsp, int32_t
static int32_t dndRunInMultiProcess(SDnode *pDnode) {
dInfo("dnode run in multi process mode");
for (ENodeType ntype = 0; ntype < NODE_MAX; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = dndRequireNode(pWrapper);
if (!pWrapper->required) continue;
......@@ -156,10 +143,10 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
return -1;
}
if (ntype == DNODE) {
if (n == DNODE) {
dInfo("node:%s, will start in parent process", pWrapper->name);
pWrapper->procType = PROC_SINGLE;
if (dndOpenNode(pDnode, ntype) != 0) {
if (dndOpenNode(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
}
......@@ -195,10 +182,10 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
dndResetLog(pWrapper);
dInfo("node:%s, clean up resources inherited from parent", pWrapper->name);
dndClearNodesExecpt(pDnode, ntype);
dndClearNodesExecpt(pDnode, n);
dInfo("node:%s, will be initialized in child process", pWrapper->name);
dndOpenNode(pDnode, ntype);
dndOpenNode(pWrapper);
} else {
dInfo("node:%s, will not start in parent process", pWrapper->name);
pWrapper->procType = PROC_PARENT;
......
......@@ -90,24 +90,55 @@ _OVER:
dndReleaseWrapper(pWrapper);
}
static int32_t dndProcessCreateNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype);
if (pWrapper != NULL) {
dndReleaseWrapper(pWrapper);
terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
return -1;
}
pWrapper = &pDnode->wrappers[ntype];
int32_t code = (*pWrapper->fp.createMsgFp)(pWrapper, pMsg);
if (code != 0) {
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
} else {
dDebug("node:%s, has been opened", pWrapper->name);
pWrapper->deployed = true;
}
return code;
}
static int32_t dndProcessDropNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype);
if (pWrapper == NULL) {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
return -1;
}
dndCloseNode(pWrapper);
dndReleaseWrapper(pWrapper);
}
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) {
switch (pMsg->rpcMsg.msgType) {
case TDMT_DND_CREATE_MNODE:
return dndOpenNode(pDnode, MNODE);
return dndProcessCreateNodeMsg(pDnode, MNODE, pMsg);
case TDMT_DND_DROP_MNODE:
return dndCloseNode(pDnode, MNODE);
return dndProcessDropNodeMsg(pDnode, MNODE, pMsg);
case TDMT_DND_CREATE_QNODE:
return dndOpenNode(pDnode, QNODE);
return dndProcessCreateNodeMsg(pDnode, QNODE, pMsg);
case TDMT_DND_DROP_QNODE:
return dndCloseNode(pDnode, QNODE);
return dndProcessDropNodeMsg(pDnode, QNODE, pMsg);
case TDMT_DND_CREATE_SNODE:
return dndOpenNode(pDnode, SNODE);
return dndProcessCreateNodeMsg(pDnode, SNODE, pMsg);
case TDMT_DND_DROP_SNODE:
return dndCloseNode(pDnode, MNODE);
return dndProcessDropNodeMsg(pDnode, MNODE, pMsg);
case TDMT_DND_CREATE_BNODE:
return dndOpenNode(pDnode, BNODE);
return dndProcessCreateNodeMsg(pDnode, BNODE, pMsg);
case TDMT_DND_DROP_BNODE:
return dndCloseNode(pDnode, BNODE);
return dndProcessDropNodeMsg(pDnode, BNODE, pMsg);
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
......
......@@ -124,8 +124,9 @@ void dndClose(SDnode *pDnode) {
dndCleanupServer(pDnode);
dndCleanupClient(pDnode);
for (ENodeType ntype = 0; ntype < NODE_MAX; ++ntype) {
(void)dndCloseNode(pDnode, ntype);
for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
dndCloseNode(pWrapper);
}
dndClearMemory(pDnode);
......
......@@ -49,7 +49,7 @@ TEST_F(MndTestBnode, 01_Show_Bnode) {
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 0);
}
#if 0
TEST_F(MndTestBnode, 02_Create_Bnode) {
{
SMCreateBnodeReq createReq = {0};
......@@ -317,4 +317,5 @@ TEST_F(MndTestBnode, 04_Drop_Bnode_Rollback) {
ASSERT_NE(retry, retryMax);
}
}
\ No newline at end of file
}
#endif
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册