提交 487c4f8d 编写于 作者: S Shengliang Guan

shm

上级 c022f478
......@@ -16,7 +16,7 @@
#ifndef _TD_DND_BNODE_INT_H_
#define _TD_DND_BNODE_INT_H_
#include "mm.h"
#include "bm.h"
#include "dm.h"
#ifdef __cplusplus
......@@ -35,43 +35,21 @@ typedef struct SBnodeMgmt {
SDnodeWorker writeWorker;
} SBnodeMgmt;
// mmFile.c
// bmFile.c
int32_t bmReadFile(SBnodeMgmt *pMgmt);
int32_t bmWriteFile(SBnodeMgmt *pMgmt);
SBnode *bmAcquire(SBnodeMgmt *pMgmt);
void bmRelease(SBnodeMgmt *pMgmt, SBnode *pBnode);
// SBnode *mmAcquire(SMnodeMgmt *pMgmt);
// void mmRelease(SMnodeMgmt *pMgmt, SBnode *pMnode);
// int32_t mmOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption);
// int32_t mmAlter(SMnodeMgmt *pMgmt, SMnodeOpt *pOption);
// int32_t mmDrop(SMnodeMgmt *pMgmt);
// bmInt.c
int32_t bmOpen(SBnodeMgmt *pMgmt);
int32_t bmDrop(SBnodeMgmt *pMgmt);
// void bmGetMgmtFp(SMgmtWrapper *pMgmt);
// int32_t dndInitBnode(SDnode *pDnode);
// void dndCleanupBnode(SDnode *pDnode);
// void dndProcessBnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
// int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg);
// int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pRpcMsg);
// void bmInitMsgHandles(SMgmtWrapper *pWrapper);
// int32_t bmStartWorker(SDnode *pDnode);
// void bmStopWorker(SDnode *pDnode);
// void bmInitMsgFp(SMnodeMgmt *pMgmt);
// void bmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
// int32_t bmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
// int32_t bmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
// void bmConsumeChildQueue(SDnode *pDnode, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
// void bmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
// void bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
// void bmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
// void bmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
// bmWorker.c
int32_t bmStartWorker(SBnodeMgmt *pMgmt);
void bmStopWorker(SBnodeMgmt *pMgmt);
int32_t bmProcessWriteMsg(SBnodeMgmt *pMgmt, SNodeMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -98,7 +98,7 @@ int32_t bmWriteFile(SBnodeMgmt *pMgmt) {
free(content);
char realfile[PATH_MAX];
snprintf(realfile, sizeof(realfile), "%s%sbnode.json", pMgmt->path);
snprintf(realfile, sizeof(realfile), "%s%sbnode.json", pMgmt->path, TD_DIRSEP);
if (taosRenameFile(file, realfile) != 0) {
terrno = TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR;
......
......@@ -47,7 +47,7 @@ void bmRelease(SBnodeMgmt *pMgmt, SBnode *pBnode) {
static bool bmRequire(SMgmtWrapper *pWrapper) {
SBnodeMgmt mgmt = {0};
mgmt.path = pWrapper->path;
if (mmReadFile(&mgmt) != 0) {
if (bmReadFile(&mgmt) != 0) {
return false;
}
......@@ -59,18 +59,12 @@ static bool bmRequire(SMgmtWrapper *pWrapper) {
if (mgmt.deployed) {
dInfo("bnode has been deployed");
return true;
}
bool required = mmDeployRequired(pWrapper->pDnode);
if (required) {
dInfo("bnode need to be deployed");
}
return required;
return mgmt.deployed;
}
static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
SDnode *pDnode = pMgmt->pDnode;
pOption->pWrapper = pMgmt->pWrapper;
......@@ -80,24 +74,25 @@ static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
pOption->clusterId = pDnode->clusterId;
}
int32_t bmOpen(SBnodeMgmt *pMgmt, SMnodeOpt *pOption) {
SDnode *pDnode = pMgmt->pDnode;
int32_t bmOpen(SBnodeMgmt *pMgmt) {
SBnodeOpt option = {0};
bmInitOption(pMgmt, &option);
SBnode *pBnode = bmAcquire(pDnode);
SBnode *pBnode = bmAcquire(pMgmt);
if (pBnode != NULL) {
bmRelease(pDnode, pBnode);
bmRelease(pMgmt, pBnode);
terrno = TSDB_CODE_DND_BNODE_ALREADY_DEPLOYED;
dError("failed to create bnode since %s", terrstr());
return -1;
}
pBnode = bndOpen(pMgmt->path, pOption);
pBnode = bndOpen(pMgmt->path, &option);
if (pBnode == NULL) {
dError("failed to open bnode since %s", terrstr());
return -1;
}
if (bmStartWorker(pDnode) != 0) {
if (bmStartWorker(pMgmt) != 0) {
dError("failed to start bnode worker since %s", terrstr());
bndClose(pBnode);
bndDestroy(pMgmt->path);
......@@ -105,10 +100,10 @@ int32_t bmOpen(SBnodeMgmt *pMgmt, SMnodeOpt *pOption) {
}
pMgmt->deployed = 1;
if (bmWriteFile(pDnode) != 0) {
if (bmWriteFile(pMgmt) != 0) {
dError("failed to write bnode file since %s", terrstr());
pMgmt->deployed = 0;
bmStopWorker(pDnode);
bmStopWorker(pMgmt);
bndClose(pBnode);
bndDestroy(pMgmt->path);
return -1;
......@@ -155,11 +150,25 @@ int32_t bmDrop(SBnodeMgmt *pMgmt) {
return 0;
}
static void bmCleanup(SMgmtWrapper *pWrapper) {
SBnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
dInfo("bnode-mgmt start to cleanup");
if (pMgmt->pBnode) {
bmStopWorker(pMgmt);
bndClose(pMgmt->pBnode);
pMgmt->pBnode = NULL;
}
free(pMgmt);
pWrapper->pMgmt = NULL;
dInfo("bnode-mgmt is cleaned up");
}
static int32_t bmInit(SMgmtWrapper *pWrapper) {
SDnode *pDnode = pWrapper->pDnode;
SBnodeMgmt *pMgmt = calloc(1, sizeof(SBnodeMgmt));
int32_t code = -1;
SBnodeOpt option = {0};
dInfo("bnode-mgmt start to init");
if (pMgmt == NULL) goto _OVER;
......@@ -175,8 +184,7 @@ static int32_t bmInit(SMgmtWrapper *pWrapper) {
}
dInfo("bnode start to open");
bmInitOption(pDnode, &option);
code = bmOpen(pMgmt, &option);
code = bmOpen(pMgmt);
_OVER:
if (code == 0) {
......@@ -190,21 +198,6 @@ _OVER:
return code;
}
static void bmCleanup(SMgmtWrapper *pWrapper) {
SBnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
dInfo("bnode-mgmt start to cleanup");
if (pMgmt->pBnode) {
bmStopWorker(pMgmt);
bndClose(pMgmt->pBnode);
pMgmt->pBnode = NULL;
}
free(pMgmt);
pWrapper->pMgmt = NULL;
dInfo("bnode-mgmt is cleaned up");
}
void bmGetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = bmInit;
......
......@@ -31,7 +31,7 @@ int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
dError("failed to create bnode since %s", terrstr());
return -1;
} else {
return bmOpen(pDnode);
return bmOpen(pMgmt);
}
}
......@@ -50,7 +50,7 @@ int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
dError("failed to drop bnode since %s", terrstr());
return -1;
} else {
return bmDrop(pDnode);
return bmDrop(pMgmt);
}
}
......
......@@ -14,17 +14,12 @@
*/
#define _DEFAULT_SOURCE
// #include "dndBnode.h"
// #include "dndTransport.h"
// #include "dndWorker.h"
#include "bmInt.h"
#if 0
static void dndProcessBnodeQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs);
static void bmProcessQueue(SBnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs);
static int32_t bmStartWorker(SDnode *pDnode) {
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_MULTI, "bnode-write", 0, 1, dndProcessBnodeQueue) != 0) {
int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
if (dndInitWorker(pMgmt, &pMgmt->writeWorker, DND_WORKER_MULTI, "bnode-write", 0, 1, bmProcessQueue) != 0) {
dError("failed to start bnode write worker since %s", terrstr());
return -1;
}
......@@ -32,9 +27,7 @@ static int32_t bmStartWorker(SDnode *pDnode) {
return 0;
}
static void bmStopWorker(SDnode *pDnode) {
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
void bmStopWorker(SBnodeMgmt *pMgmt) {
taosWLockLatch(&pMgmt->latch);
pMgmt->deployed = 0;
taosWUnLockLatch(&pMgmt->latch);
......@@ -46,103 +39,68 @@ static void bmStopWorker(SDnode *pDnode) {
dndCleanupWorker(&pMgmt->writeWorker);
}
static void dndSendBnodeErrorRsp(SRpcMsg *pMsg, int32_t code) {
SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
rpcSendResponse(&rpcRsp);
rpcFreeCont(pMsg->pCont);
static void bmSendErrorRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code};
dndSendRsp(pWrapper, &rpcRsp);
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
}
static void dndSendBnodeErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t code) {
static void bmSendErrorRsps(SMgmtWrapper *pWrapper, STaosQall *qall, int32_t numOfMsgs, int32_t code) {
for (int32_t i = 0; i < numOfMsgs; ++i) {
SRpcMsg *pMsg = NULL;
SNodeMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg);
dndSendBnodeErrorRsp(pMsg, code);
bmSendErrorRsp(pWrapper, pMsg, code);
}
}
static void dndProcessBnodeQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs) {
SBnode *pBnode = bmAcquire(pDnode);
static void bmProcessQueue(SBnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs) {
SMgmtWrapper *pWrapper = pMgmt->pWrapper;
SBnode *pBnode = bmAcquire(pMgmt);
if (pBnode == NULL) {
dndSendBnodeErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY);
bmSendErrorRsps(pWrapper, qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY);
return;
}
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
if (pArray == NULL) {
bmRelease(pDnode, pBnode);
dndSendBnodeErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY);
bmRelease(pMgmt, pBnode);
bmSendErrorRsps(pWrapper, qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY);
return;
}
for (int32_t i = 0; i < numOfMsgs; ++i) {
SRpcMsg *pMsg = NULL;
SNodeMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg);
void *ptr = taosArrayPush(pArray, &pMsg);
if (ptr == NULL) {
dndSendBnodeErrorRsp(pMsg, TSDB_CODE_OUT_OF_MEMORY);
bmRelease(pMgmt, pBnode);
bmSendErrorRsp(pWrapper, pMsg, TSDB_CODE_OUT_OF_MEMORY);
}
}
bndProcessWMsgs(pBnode, pArray);
for (size_t i = 0; i < numOfMsgs; i++) {
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
SNodeMsg *pNodeMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
rpcFreeCont(pNodeMsg->rpcMsg.pCont);
taosFreeQitem(pNodeMsg);
}
taosArrayDestroy(pArray);
bmRelease(pDnode, pBnode);
bmRelease(pMgmt, pBnode);
}
static void dndWriteBnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) {
int32_t code = TSDB_CODE_DND_BNODE_NOT_DEPLOYED;
SBnode *pBnode = bmAcquire(pDnode);
if (pBnode != NULL) {
code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg));
}
bmRelease(pDnode, pBnode);
if (code != 0) {
if (pMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->pCont);
}
}
void dndProcessBnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
dndWriteBnodeMsgToWorker(pDnode, &pDnode->bmgmt.writeWorker, pMsg);
}
static int32_t bmPutMsgToWorker(SBnodeMgmt *pMgmt, SDnodeWorker *pWorker, SNodeMsg *pMsg) {
SBnode *pBnode = bmAcquire(pMgmt);
if (pBnode == NULL) return -1;
int32_t dndInitBnode(SDnode *pDnode) {
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
taosInitRWLatch(&pMgmt->latch);
if (dndReadBnodeFile(pDnode) != 0) {
return -1;
}
if (pMgmt->dropped) {
dInfo("bnode has been deployed and needs to be deleted");
bndDestroy(pDnode->dir.bnode);
return 0;
}
if (!pMgmt->deployed) return 0;
return bmOpen(pDnode);
}
void dndCleanupBnode(SDnode *pDnode) {
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
if (pMgmt->pBnode) {
bmStopWorker(pDnode);
bndClose(pMgmt->pBnode);
pMgmt->pBnode = NULL;
}
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
int32_t code = dndWriteMsgToWorker(pWorker, pMsg, 0);
bmRelease(pMgmt, pBnode);
return code;
}
#endif
\ No newline at end of file
int32_t bmProcessWriteMsg(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return bmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg);
}
\ No newline at end of file
......@@ -18,7 +18,7 @@
#include "dnd.h"
#include "bmInt.h"
#include "bm.h"
#include "dm.h"
#include "dndInt.h"
#include "mm.h"
......
......@@ -14,7 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "bmInt.h"
#include "bm.h"
#include "dmInt.h"
#include "mm.h"
#include "qmInt.h"
......
......@@ -59,7 +59,7 @@ int32_t mmOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
return -1;
}
SMnode *pMnode = mndOpen(pMgmt->path, pOption);
pMnode = mndOpen(pMgmt->path, pOption);
if (pMnode == NULL) {
dError("failed to open mnode since %s", terrstr());
return -1;
......
......@@ -21,10 +21,10 @@ static void vmProcessQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { vnodeProcess
static void vmProcessFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { vnodeProcessFetchMsg(pVnode->pImpl, pMsg); }
static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
for (int32_t i = 0; i < numOfMsgs; ++i) {
SRpcMsg *pMsg = NULL;
SNodeMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg);
void *ptr = taosArrayPush(pArray, &pMsg);
assert(ptr != NULL);
......@@ -33,9 +33,10 @@ static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO
vnodeProcessWMsgs(pVnode->pImpl, pArray);
for (size_t i = 0; i < numOfMsgs; i++) {
SRpcMsg *pRsp = NULL;
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
int32_t code = vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp);
SRpcMsg *pRsp = NULL;
SNodeMsg *pNodeMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
SRpcMsg *pMsg = &pNodeMsg->rpcMsg;
int32_t code = vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp);
if (pRsp != NULL) {
pRsp->ahandle = pMsg->ahandle;
rpcSendResponse(pRsp);
......@@ -48,9 +49,9 @@ static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO
}
for (size_t i = 0; i < numOfMsgs; i++) {
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
SNodeMsg *pNodeMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
rpcFreeCont(pNodeMsg->rpcMsg.pCont);
taosFreeQitem(pNodeMsg);
}
taosArrayDestroy(pArray);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册