提交 f7cf8229 编写于 作者: S Shengliang Guan

refact(cluster): node mgmt

上级 a86b9faa
......@@ -81,7 +81,7 @@ int32_t dndRun(SDnode *pDnode);
* @param pDnode The dnode object to close.
* @param event The event to handle.
*/
void dndHandleEvent(SDnode *pDnode, EDndEvent event);
void dndSetEvent(SDnode *pDnode, EDndEvent event);
#ifdef __cplusplus
}
......
......@@ -32,7 +32,7 @@ static struct {
static void dndStopDnode(int signum, void *info, void *ctx) {
SDnode *pDnode = atomic_val_compare_exchange_ptr(&global.pDnode, 0, global.pDnode);
if (pDnode != NULL) {
dndHandleEvent(pDnode, DND_EVENT_STOP);
dndSetEvent(pDnode, DND_EVENT_STOP);
}
}
......
......@@ -25,14 +25,6 @@ extern "C" {
int32_t dndOpenNode(SMgmtWrapper *pWrapper);
void dndCloseNode(SMgmtWrapper *pWrapper);
void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId);
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, EDndNodeType nType);
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper);
void dndReleaseWrapper(SMgmtWrapper *pWrapper);
void dndHandleEvent(SDnode *pDnode, EDndEvent event);
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
// dndTransport.c
int32_t dndInitTrans(SDnode *pDnode);
void dndCleanupTrans(SDnode *pDnode);
......
......@@ -149,72 +149,4 @@ void dndHandleEvent(SDnode *pDnode, EDndEvent event) {
}
}
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
SMgmtWrapper *pRetWrapper = pWrapper;
taosRLockLatch(&pWrapper->latch);
if (pWrapper->deployed) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
dTrace("node:%s, is acquired, refCount:%d", pWrapper->name, refCount);
} else {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
pRetWrapper = NULL;
}
taosRUnLockLatch(&pWrapper->latch);
return pRetWrapper;
}
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper) {
int32_t code = 0;
taosRLockLatch(&pWrapper->latch);
if (pWrapper->deployed || (pWrapper->procType == DND_PROC_PARENT && pWrapper->required)) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount);
} else {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
code = -1;
}
taosRUnLockLatch(&pWrapper->latch);
return code;
}
void dndReleaseWrapper(SMgmtWrapper *pWrapper) {
if (pWrapper == NULL) return;
taosRLockLatch(&pWrapper->latch);
int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1);
taosRUnLockLatch(&pWrapper->latch);
dTrace("node:%s, is released, refCount:%d", pWrapper->name, refCount);
}
void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId) {
pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp;
pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId;
}
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) {
SStartupReq *pStartup = &pDnode->startup;
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
pStartup->finished = 0;
}
static void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) {
memcpy(pStartup, &pDnode->startup, sizeof(SStartupReq));
pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING);
}
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("startup req is received");
SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq));
dndGetStartup(pDnode, pStartup);
dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
SRpcMsg rpcRsp = {
.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = pReq->ahandle};
rpcSendResponse(&rpcRsp);
}
\ No newline at end of file
......@@ -16,67 +16,33 @@
#ifndef _TD_DND_INT_H_
#define _TD_DND_INT_H_
#include "dndLog.h"
#include "dndDef.h"
#ifdef __cplusplus
extern "C" {
#endif
const char *dndStatName(EDndRunStatus stat);
const char *dndLogName(EDndNodeType ntype);
const char *dndProcName(EDndNodeType ntype);
const char *dndEventName(EDndEvent ev);
// dndExec.c
int32_t dndOpenNode(SMgmtWrapper *pWrapper);
void dndCloseNode(SMgmtWrapper *pWrapper);
// dndFile.c
int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed);
int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed);
TdFilePtr dndCheckRunning(const char *dataDir);
int32_t dndReadShmFile(SDnode *pDnode);
int32_t dndWriteShmFile(SDnode *pDnode);
// dndInt.c
EDndRunStatus dndGetStatus(SDnode *pDnode);
void dndSetStatus(SDnode *pDnode, EDndRunStatus stat);
void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId);
const char *dndStatName(EDndRunStatus stat);
const char *dndLogName(EDndNodeType ntype);
const char *dndProcName(EDndNodeType ntype);
const char *dndEventName(EDndEvent ev);
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, EDndNodeType nType);
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper);
void dndReleaseWrapper(SMgmtWrapper *pWrapper);
void dndHandleEvent(SDnode *pDnode, EDndEvent event);
EDndRunStatus dndGetStatus(SDnode *pDnode);
void dndSetStatus(SDnode *pDnode, EDndRunStatus stat);
void dndSetEvent(SDnode *pDnode, EDndEvent event);
void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId);
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
// dndTransport.c
int32_t dndInitTrans(SDnode *pDnode);
void dndCleanupTrans(SDnode *pDnode);
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper);
SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper);
int32_t dndInitMsgHandle(SDnode *pDnode);
void dndSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
// mgmt
void dmSetMgmtFp(SMgmtWrapper *pWrapper);
void bmSetMgmtFp(SMgmtWrapper *pWrapper);
void qmSetMgmtFp(SMgmtWrapper *pMgmt);
void smSetMgmtFp(SMgmtWrapper *pWrapper);
void vmSetMgmtFp(SMgmtWrapper *pWrapper);
void mmSetMgmtFp(SMgmtWrapper *pMgmt);
void dmGetMnodeEpSet(SDnodeData *pMgmt, SEpSet *pEpSet);
void dmUpdateMnodeEpSet(SDnodeData *pMgmt, SEpSet *pEpSet);
void dmSendRedirectRsp(SDnodeData *pMgmt, const SRpcMsg *pMsg);
void dmGetMonitorSysInfo(SMonSysInfo *pInfo);
void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo);
void mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonMmInfo *mmInfo);
void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *vmInfo);
void qmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonQmInfo *qmInfo);
void smGetMonitorInfo(SMgmtWrapper *pWrapper, SMonSmInfo *smInfo);
void bmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonBmInfo *bmInfo);
// dndFile.c
int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed);
int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed);
TdFilePtr dndCheckRunning(const char *dataDir);
int32_t dndReadShmFile(SDnode *pDnode);
int32_t dndWriteShmFile(SDnode *pDnode);
#ifdef __cplusplus
}
......
......@@ -14,7 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "dndNode.h"
#include "dndInt.h"
#include "wal.h"
static int8_t once = DND_ENV_INIT;
......
......@@ -232,7 +232,8 @@ int32_t dndWriteShmFile(SDnode *pDnode) {
if (ntype == NODE_END - 1) {
len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\":%d\n", dndProcName(ntype), pWrapper->procShm.size);
} else {
len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\":%d,\n", dndProcName(ntype), pWrapper->procShm.size);
len +=
snprintf(content + len, MAXLEN - len, " \"%s_shmsize\":%d,\n", dndProcName(ntype), pWrapper->procShm.size);
}
}
len += snprintf(content + len, MAXLEN - len, "}\n");
......
......@@ -16,15 +16,6 @@
#define _DEFAULT_SOURCE
#include "dndInt.h"
EDndRunStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; }
void dndSetStatus(SDnode *pDnode, EDndRunStatus status) {
if (pDnode->status != status) {
dDebug("dnode status set from %s to %s", dndStatName(pDnode->status), dndStatName(status));
pDnode->status = status;
}
}
const char *dndStatName(EDndRunStatus status) {
switch (status) {
case DND_STAT_INIT:
......@@ -83,4 +74,89 @@ const char *dndEventName(EDndEvent ev) {
default:
return "UNKNOWN";
}
}
\ No newline at end of file
}
EDndRunStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; }
void dndSetStatus(SDnode *pDnode, EDndRunStatus status) {
if (pDnode->status != status) {
dDebug("dnode status set from %s to %s", dndStatName(pDnode->status), dndStatName(status));
pDnode->status = status;
}
}
void dndSetEvent(SDnode *pDnode, EDndEvent event) {
if (event == DND_EVENT_STOP) {
pDnode->event = event;
}
}
void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId) {
pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp;
pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId;
}
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
SMgmtWrapper *pRetWrapper = pWrapper;
taosRLockLatch(&pWrapper->latch);
if (pWrapper->deployed) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
dTrace("node:%s, is acquired, refCount:%d", pWrapper->name, refCount);
} else {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
pRetWrapper = NULL;
}
taosRUnLockLatch(&pWrapper->latch);
return pRetWrapper;
}
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper) {
int32_t code = 0;
taosRLockLatch(&pWrapper->latch);
if (pWrapper->deployed || (pWrapper->procType == DND_PROC_PARENT && pWrapper->required)) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount);
} else {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
code = -1;
}
taosRUnLockLatch(&pWrapper->latch);
return code;
}
void dndReleaseWrapper(SMgmtWrapper *pWrapper) {
if (pWrapper == NULL) return;
taosRLockLatch(&pWrapper->latch);
int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1);
taosRUnLockLatch(&pWrapper->latch);
dTrace("node:%s, is released, refCount:%d", pWrapper->name, refCount);
}
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) {
SStartupReq *pStartup = &pDnode->startup;
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
pStartup->finished = 0;
}
static void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) {
memcpy(pStartup, &pDnode->startup, sizeof(SStartupReq));
pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING);
}
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("startup req is received");
SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq));
dndGetStartup(pDnode, pStartup);
dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
SRpcMsg rpcRsp = {
.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = pReq->ahandle};
rpcSendResponse(&rpcRsp);
}
......@@ -68,7 +68,7 @@ bool TestServer::Start(const char* path, const char* fqdn, uint16_t port, const
}
void TestServer::Stop() {
dndHandleEvent(pDnode, DND_EVENT_STOP);
dndSetEvent(pDnode, DND_EVENT_STOP);
taosThreadJoin(threadId, NULL);
if (pDnode != NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册