提交 c018ea80 编写于 作者: S Shengliang Guan

shm

上级 0f6e5e01
......@@ -86,6 +86,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_INVALID_VERSION_STRING TAOS_DEF_ERROR_CODE(0, 0x0121)
#define TSDB_CODE_VERSION_NOT_COMPATIBLE TAOS_DEF_ERROR_CODE(0, 0x0122)
#define TSDB_CODE_COMPRESS_ERROR TAOS_DEF_ERROR_CODE(0, 0x0123)
#define TSDB_CODE_NODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0124)
#define TSDB_CODE_NODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0125)
//client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)
......
......@@ -105,6 +105,7 @@ typedef struct SMgmtWrapper {
const char *name;
char *path;
int32_t refCount;
SRWLatch latch;
bool deployed;
bool dropped;
bool required;
......@@ -147,7 +148,7 @@ typedef struct SDnode {
EDndStatus dndGetStatus(SDnode *pDnode);
void dndSetStatus(SDnode *pDnode, EDndStatus stat);
SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType);
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType);
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp);
void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc);
void dndSendMonitorReport(SDnode *pDnode);
......
......@@ -50,6 +50,9 @@ SDnode *dndCreate(const SDnodeOpt *pOption);
void dndClose(SDnode *pDnode);
void dndHandleEvent(SDnode *pDnode, EDndEvent event);
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType);
void dndReleaseWrapper(SMgmtWrapper *pWrapper);
// dndTransport.c
int32_t dndInitServer(SDnode *pDnode);
void dndCleanupServer(SDnode *pDnode);
......
......@@ -25,27 +25,40 @@ static void dndResetLog(SMgmtWrapper *pMgmt) {
taosInitLog(logname, 1);
}
static bool dndRequireNode(SMgmtWrapper *pMgmt) {
bool required = (*pMgmt->fp.requiredFp)(pMgmt);
static bool dndRequireNode(SMgmtWrapper *pWrapper) {
bool required = (*pWrapper->fp.requiredFp)(pWrapper);
if (!required) {
dDebug("node:%s, no need to start", pMgmt->name);
dDebug("node:%s, no need to start", pWrapper->name);
} else {
dDebug("node:%s, need to start", pMgmt->name);
dDebug("node:%s, need to start", pWrapper->name);
}
return required;
}
int32_t dndOpenNode(SMgmtWrapper *pWrapper) { return (*pWrapper->fp.openFp)(pWrapper); }
int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
int32_t code = (*pWrapper->fp.openFp)(pWrapper);
if (code != 0) {
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
return -1;
} else {
dDebug("node:%s, has been opened", pWrapper->name);
}
pWrapper->deployed = true;
return 0;
}
void dndCloseNode(SMgmtWrapper *pWrapper) {
if (pWrapper->required) {
taosWLockLatch(&pWrapper->latch);
if (pWrapper->deployed) {
(*pWrapper->fp.closeFp)(pWrapper);
pWrapper->required = false;
pWrapper->deployed = false;
}
if (pWrapper->pProc) {
taosProcCleanup(pWrapper->pProc);
pWrapper->pProc = NULL;
}
taosWUnLockLatch(&pWrapper->latch);
}
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
......@@ -70,13 +83,14 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
}
}
SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE);
if (dmStart(pWrapper->pMgmt) != 0) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE);
int32_t code = dmStart(pWrapper->pMgmt);
if (code != 0) {
dError("failed to start dnode worker since %s", terrstr());
return -1;
}
return 0;
dndReleaseWrapper(pWrapper);
return code;
}
static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) {
......@@ -184,12 +198,14 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
}
}
SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE);
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE);
if (pWrapper->procType == PROC_PARENT && dmStart(pWrapper->pMgmt) != 0) {
dndReleaseWrapper(pWrapper);
dError("failed to start dnode worker since %s", terrstr());
return -1;
}
dndReleaseWrapper(pWrapper);
return 0;
}
......
......@@ -65,8 +65,6 @@ void dndCleanup() {
dInfo("dnode env is cleaned up");
}
SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType) { return &pDnode->wrappers[nodeType]; }
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) {
pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp;
}
......
......@@ -22,7 +22,7 @@ static int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo) {
tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name));
pInfo->tempdir.size = tsTempSpace.size;
return vmGetTfsMonitorInfo(dndGetWrapper(pDnode, VNODES), pInfo);
return vmGetTfsMonitorInfo(dndAcquireWrapper(pDnode, VNODES), pInfo);
}
static void dndGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) {
......@@ -45,8 +45,8 @@ static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
taosGetCardInfo(&pInfo->net_in, &pInfo->net_out);
taosGetProcIO(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk);
vmGetVnodeReqs(dndGetWrapper(pDnode, VNODES), pInfo);
pInfo->has_mnode = (dndGetWrapper(pDnode, MNODE)->required);
vmGetVnodeReqs(dndAcquireWrapper(pDnode, VNODES), pInfo);
pInfo->has_mnode = (dndAcquireWrapper(pDnode, MNODE)->required);
}
void dndSendMonitorReport(SDnode *pDnode) {
......@@ -63,7 +63,7 @@ void dndSendMonitorReport(SDnode *pDnode) {
SMonClusterInfo clusterInfo = {0};
SMonVgroupInfo vgroupInfo = {0};
SMonGrantInfo grantInfo = {0};
if (mmGetMonitorInfo(dndGetWrapper(pDnode, MNODE), &clusterInfo, &vgroupInfo, &grantInfo) == 0) {
if (mmGetMonitorInfo(dndAcquireWrapper(pDnode, MNODE), &clusterInfo, &vgroupInfo, &grantInfo) == 0) {
monSetClusterInfo(pMonitor, &clusterInfo);
monSetVgroupInfo(pMonitor, &vgroupInfo);
monSetGrantInfo(pMonitor, &grantInfo);
......
......@@ -16,8 +16,6 @@
#define _DEFAULT_SOURCE
#include "dndInt.h"
static int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc, SEpSet *pEpSet) {
SRpcConnInfo connInfo = {0};
if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) {
......@@ -34,7 +32,7 @@ static int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc, SEpSet *pEpSet) {
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) {
dmUpdateMnodeEpSet(dndGetWrapper(pWrapper->pDnode, DNODE)->pMgmt, pEpSet);
dmUpdateMnodeEpSet(dndAcquireWrapper(pWrapper->pDnode, DNODE)->pMgmt, pEpSet);
}
int32_t code = -1;
......@@ -91,13 +89,13 @@ static SMgmtWrapper *dndGetWrapperFromMsg(SDnode *pDnode, SNodeMsg *pMsg) {
SMgmtWrapper *pWrapper = NULL;
switch (pMsg->rpcMsg.msgType) {
case TDMT_DND_CREATE_MNODE:
return dndGetWrapper(pDnode, MNODE);
return dndAcquireWrapper(pDnode, MNODE);
case TDMT_DND_CREATE_QNODE:
return dndGetWrapper(pDnode, QNODE);
return dndAcquireWrapper(pDnode, QNODE);
case TDMT_DND_CREATE_SNODE:
return dndGetWrapper(pDnode, SNODE);
return dndAcquireWrapper(pDnode, SNODE);
case TDMT_DND_CREATE_BNODE:
return dndGetWrapper(pDnode, BNODE);
return dndAcquireWrapper(pDnode, BNODE);
default:
return NULL;
}
......
......@@ -94,6 +94,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) {
}
pWrapper->procType = PROC_SINGLE;
taosInitRWLatch(&pWrapper->latch);
}
code = 0;
......@@ -136,3 +137,29 @@ void dndHandleEvent(SDnode *pDnode, EDndEvent event) {
dInfo("dnode object receive event %d, data:%p", event, pDnode);
pDnode->event = event;
}
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[nodeType];
SMgmtWrapper *pRetWrapper = pWrapper;
taosRLockLatch(&pWrapper->latch);
if (pWrapper->deployed) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
dTrace("node:%s, is acquired, refCount:%d", pWrapper->name, refCount);
} else {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
pRetWrapper = NULL;
}
taosRUnLockLatch(&pWrapper->latch);
return pRetWrapper;
}
void dndReleaseWrapper(SMgmtWrapper *pWrapper) {
if (pWrapper == NULL) return;
taosRLockLatch(&pWrapper->latch);
int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1);
taosRUnLockLatch(&pWrapper->latch);
dTrace("node:%s, is released, refCount:%d", pWrapper->name, refCount);
}
\ No newline at end of file
......@@ -125,7 +125,7 @@ static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRp
STransMgmt *pMgmt = &pDnode->trans;
SEpSet epSet = {0};
dmGetMnodeEpSet(dndGetWrapper(pDnode, DNODE)->pMgmt, &epSet);
dmGetMnodeEpSet(dndAcquireWrapper(pDnode, DNODE)->pMgmt, &epSet);
rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp);
}
......@@ -159,7 +159,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
return 0;
}
if (mmGetUserAuth(dndGetWrapper(pDnode, MNODE), user, spi, encrypt, secret, ckey) == 0) {
if (mmGetUserAuth(dndAcquireWrapper(pDnode, MNODE), user, spi, encrypt, secret, ckey) == 0) {
dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
return 0;
}
......@@ -287,14 +287,14 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
SDnode *pDnode = pWrapper->pDnode;
STransMgmt *pTrans = &pDnode->trans;
SEpSet epSet = {0};
dmGetMnodeEpSet(dndGetWrapper(pDnode, DNODE)->pMgmt, &epSet);
dmGetMnodeEpSet(dndAcquireWrapper(pDnode, DNODE)->pMgmt, &epSet);
return dndSendRpcReq(pTrans, &epSet, pReq);
}
}
void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
if (pRsp->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRsp->code == TSDB_CODE_APP_NOT_READY) {
SMgmtWrapper *pDnodeWrapper = dndGetWrapper(pWrapper->pDnode, DNODE);
SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE);
dmSendRedirectRsp(pDnodeWrapper->pMgmt, pRsp);
} else {
rpcSendResponse(pRsp);
......
......@@ -42,7 +42,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
taosRUnLockLatch(&pMgmt->latch);
req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad));
vmGetVnodeLoads(dndGetWrapper(pDnode, VNODES), req.pVloads);
vmGetVnodeLoads(dndAcquireWrapper(pDnode, VNODES), req.pVloads);
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen);
......
......@@ -69,6 +69,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version")
TAOS_DEFINE_ERROR(TSDB_CODE_COMPRESS_ERROR, "Failed to compress msg")
TAOS_DEFINE_ERROR(TSDB_CODE_NODE_ALREADY_DEPLOYED, "Node already deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_NODE_NOT_DEPLOYED, "Node not deployed")
//common & util
TAOS_DEFINE_ERROR(TSDB_CODE_OPS_NOT_SUPPORT, "Operation not supported")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册