提交 fd8f246a 编写于 作者: S Shengliang Guan

refactor: node mgmt

上级 2623202c
......@@ -23,12 +23,9 @@ static void *dmStatusThreadFp(void *param) {
setThreadName("dnode-status");
while (1) {
taosThreadTestCancel();
taosMsleep(200);
if (pMgmt->data.status != DND_STAT_RUNNING || pMgmt->data.dropped) {
continue;
}
taosThreadTestCancel();
if (pMgmt->data.dropped) continue;
int64_t curTime = taosGetTimestampMs();
float interval = (curTime - lastTime) / 1000.0f;
......@@ -48,12 +45,9 @@ static void *dmMonitorThreadFp(void *param) {
setThreadName("dnode-monitor");
while (1) {
taosThreadTestCancel();
taosMsleep(200);
if (pMgmt->data.status != DND_STAT_RUNNING || pMgmt->data.dropped) {
continue;
}
taosThreadTestCancel();
if (pMgmt->data.dropped) continue;
int64_t curTime = taosGetTimestampMs();
float interval = (curTime - lastTime) / 1000.0f;
......
......@@ -266,7 +266,7 @@ int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return 0;
}
SArray *mmGetMsgHandles() {
SArray *vmGetMsgHandles() {
int32_t code = -1;
SArray *pArray = taosArrayInit(64, sizeof(SMgmtHandle));
if (pArray == NULL) goto _OVER;
......
......@@ -16,15 +16,98 @@
#ifndef _TD_DND_IMP_H_
#define _TD_DND_IMP_H_
#include "dmInt.h"
// tobe deleted
#include "uv.h"
#include "dmUtil.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SMgmtWrapper {
SDnode *pDnode;
SMgmtFunc func;
void *pMgmt;
const char *name;
char *path;
int32_t refCount;
SRWLatch latch;
EDndNodeType nodeType;
bool deployed;
bool required;
EDndProcType procType;
int32_t procId;
SProcObj *procObj;
SShm procShm;
NodeMsgFp msgFps[TDMT_MAX];
} SMgmtWrapper;
typedef struct {
EDndNodeType defaultNtype;
bool needCheckVgId;
} SMsgHandle;
typedef struct {
void *serverRpc;
void *clientRpc;
SMsgHandle msgHandles[TDMT_MAX];
} SDnodeTrans;
typedef struct {
char name[TSDB_STEP_NAME_LEN];
char desc[TSDB_STEP_DESC_LEN];
} SStartupInfo;
typedef struct SUdfdData {
bool startCalled;
bool needCleanUp;
uv_loop_t loop;
uv_thread_t thread;
uv_barrier_t barrier;
uv_process_t process;
int spawnErr;
uv_pipe_t ctrlPipe;
uv_async_t stopAsync;
int32_t stopCalled;
int32_t dnodeId;
} SUdfdData;
typedef struct SDnode {
EDndProcType ptype;
EDndNodeType ntype;
EDndEvent event;
EDndRunStatus status;
SStartupInfo startup;
SDnodeTrans trans;
SUdfdData udfdData;
TdThreadMutex mutex;
SRWLatch latch;
SEpSet mnodeEps;
TdFilePtr lockfile;
SMgmtInputOpt input;
SMgmtWrapper wrappers[NODE_END];
} SDnode;
// dmExec.c
int32_t dmOpenNode(SMgmtWrapper *pWrapper);
void dmCloseNode(SMgmtWrapper *pWrapper);
// dmObj.c
SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType nType);
int32_t dmMarkWrapper(SMgmtWrapper *pWrapper);
void dmReleaseWrapper(SMgmtWrapper *pWrapper);
void dmSetStatus(SDnode *pDnode, EDndRunStatus stype);
void dmSetEvent(SDnode *pDnode, EDndEvent event);
void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const char *pDesc);
void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pMsg);
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg);
int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg);
int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg);
// dmTransport.c
int32_t dmInitServer(SDnode *pDnode);
void dmCleanupServer(SDnode *pDnode);
......@@ -36,49 +119,13 @@ int32_t dmInitMsgHandle(SDnode *pDnode);
void dmSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
void dmSendToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp);
// dmEps.c
int32_t dmReadEps(SDnode *pDnode);
int32_t dmWriteEps(SDnode *pDnode);
void dmUpdateEps(SDnode *pDnode, SArray *pDnodeEps);
// dmHandle.c
void dmSendStatusReq(SDnode *pDnode);
int32_t dmProcessConfigReq(SDnode *pDnode, SNodeMsg *pMsg);
int32_t dmProcessAuthRsp(SDnode *pDnode, SNodeMsg *pMsg);
int32_t dmProcessGrantRsp(SDnode *pDnode, SNodeMsg *pMsg);
int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg);
int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg);
// dmMonitor.c
void dmGetVnodeLoads(SDnode *pDnode, SMonVloadInfo *pInfo);
void dmGetMnodeLoads(SDnode *pDnode, SMonMloadInfo *pInfo);
void dmSendMonitorReport(SDnode *pDnode);
// dmWorker.c
int32_t dmStartStatusThread(SDnode *pDnode);
void dmStopStatusThread(SDnode *pDnode);
int32_t dmStartMonitorThread(SDnode *pDnode);
void dmStopMonitorThread(SDnode *pDnode);
int32_t dmStartWorker(SDnode *pDnode);
void dmStopWorker(SDnode *pDnode);
int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t dmProcessStatusMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
// mgmt nodes
void dmInitWrapper(SMgmtWrapper *pWrapper);
void bmInitWrapper(SMgmtWrapper *pWrapper);
void qmInitWrapper(SMgmtWrapper *pWrapper);
void smInitWrapper(SMgmtWrapper *pWrapper);
void vmInitWrapper(SMgmtWrapper *pWrapper);
void mmInitWrapper(SMgmtWrapper *pWrapper);
void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo);
void mmGetMnodeLoads(SMgmtWrapper *pWrapper, SMonMloadInfo *pInfo);
void mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonMmInfo *mmInfo);
void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *vmInfo);
void qmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonQmInfo *qmInfo);
void smGetMonitorInfo(SMgmtWrapper *pWrapper, SMonSmInfo *smInfo);
void bmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonBmInfo *bmInfo);
SMgmtFunc dmGetMgmtFunc();
SMgmtFunc bmGetMgmtFunc();
SMgmtFunc qmGetMgmtFunc();
SMgmtFunc smGetMgmtFunc();
SMgmtFunc vmGetMgmtFunc();
SMgmtFunc mmGetMgmtFunc();
#ifdef __cplusplus
}
......
......@@ -16,15 +16,6 @@
#define _DEFAULT_SOURCE
#include "dmImp.h"
static bool dmRequireNode(SMgmtWrapper *pWrapper) {
bool required = false;
int32_t code = (*pWrapper->fp.requiredFp)(pWrapper, &required);
if (!required) {
dDebug("node:%s, does not require startup", pWrapper->name);
}
return required;
}
static int32_t dmInitParentProc(SMgmtWrapper *pWrapper) {
int32_t shmsize = tsMnodeShmSize;
if (pWrapper->nodeType == VNODE) {
......@@ -123,8 +114,15 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
return -1;
}
SMgmtInputOpt *pInput = &pWrapper->pDnode->input;
SMgmtOutputOpt output = {0};
pInput->msgCb = dmGetMsgcb(pWrapper);
if (pWrapper->nodeType == DNODE) {
tmsgSetDefaultMsgCb(&pInput->msgCb);
}
if (pWrapper->procType == DND_PROC_SINGLE || pWrapper->procType == DND_PROC_CHILD) {
if ((*pWrapper->fp.openFp)(pWrapper) != 0) {
if ((*pWrapper->func.openFp)(pInput, &output) != 0) {
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
return -1;
}
......@@ -136,27 +134,39 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
pWrapper->deployed = true;
} else {
if (dmInitParentProc(pWrapper) != 0) return -1;
if (dmWriteShmFile(pWrapper) != 0) return -1;
if (dmWriteShmFile(pWrapper->path, pWrapper->name, &pWrapper->procShm) != 0) return -1;
if (dmRunParentProc(pWrapper) != 0) return -1;
}
if (output.dnodeId != 0) {
pInput->dnodeId = output.dnodeId;
}
if (output.pMgmt != NULL) {
pWrapper->pMgmt = output.pMgmt;
}
if (output.mnodeEps.numOfEps != 0) {
pWrapper->pDnode->mnodeEps = output.mnodeEps;
}
dmReportStartup(pWrapper->pDnode, pWrapper->name, "openned");
return 0;
}
int32_t dmStartNode(SMgmtWrapper *pWrapper) {
if (!pWrapper->required) return 0;
if (pWrapper->procType == DND_PROC_PARENT) {
dInfo("node:%s, not start in parent process", pWrapper->name);
} else if (pWrapper->procType == DND_PROC_CHILD) {
dInfo("node:%s, start in child process", pWrapper->name);
if (pWrapper->nodeType != DNODE) {
if (pWrapper->fp.startFp != NULL && (*pWrapper->fp.startFp)(pWrapper) != 0) {
if (pWrapper->func.startFp != NULL && (*pWrapper->func.startFp)(pWrapper->pMgmt) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
}
}
} else {
if (pWrapper->fp.startFp != NULL && (*pWrapper->fp.startFp)(pWrapper) != 0) {
if (pWrapper->func.startFp != NULL && (*pWrapper->func.startFp)(pWrapper->pMgmt) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
}
......@@ -167,8 +177,8 @@ int32_t dmStartNode(SMgmtWrapper *pWrapper) {
}
void dmStopNode(SMgmtWrapper *pWrapper) {
if (pWrapper->fp.stopFp != NULL) {
(*pWrapper->fp.stopFp)(pWrapper);
if (pWrapper->func.stopFp != NULL) {
(*pWrapper->func.stopFp)(pWrapper->pMgmt);
}
}
......@@ -190,10 +200,8 @@ void dmCloseNode(SMgmtWrapper *pWrapper) {
}
}
dmStopNode(pWrapper);
taosWLockLatch(&pWrapper->latch);
(*pWrapper->fp.closeFp)(pWrapper);
(*pWrapper->func.closeFp)(pWrapper->pMgmt);
taosWUnLockLatch(&pWrapper->latch);
if (pWrapper->procObj) {
......@@ -207,48 +215,18 @@ void dmCloseNode(SMgmtWrapper *pWrapper) {
static int32_t dmOpenNodes(SDnode *pDnode) {
if (pDnode->ptype == DND_PROC_CHILD) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
pWrapper->required = dmRequireNode(pWrapper);
if (!pWrapper->required) {
dError("dnode:%s, failed to open since not required", pWrapper->name);
}
pWrapper->procType = DND_PROC_CHILD;
if (dmInitClient(pDnode) != 0) {
return -1;
}
pDnode->data.msgCb = dmGetMsgcb(pWrapper);
tmsgSetDefaultMsgCb(&pDnode->data.msgCb);
if (dmOpenNode(pWrapper) != 0) {
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
return -1;
}
return dmOpenNode(pWrapper);
} else {
for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = dmRequireNode(pWrapper);
if (!pWrapper->required) continue;
if (pDnode->ptype == DND_PROC_PARENT && n != DNODE) {
pWrapper->procType = DND_PROC_PARENT;
} else {
pWrapper->procType = DND_PROC_SINGLE;
}
if (n == DNODE) {
if (dmInitClient(pDnode) != 0) {
return -1;
}
pDnode->data.msgCb = dmGetMsgcb(pWrapper);
tmsgSetDefaultMsgCb(&pDnode->data.msgCb);
}
if (dmOpenNode(pWrapper) != 0) {
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
return -1;
pWrapper->procType = DND_PROC_SINGLE;
} else {
pWrapper->procType = pDnode->ptype;
}
return dmOpenNode(pWrapper);
}
}
......@@ -259,7 +237,6 @@ static int32_t dmOpenNodes(SDnode *pDnode) {
static int32_t dmStartNodes(SDnode *pDnode) {
for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
if (dmStartNode(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
......@@ -313,16 +290,27 @@ static void dmWatchNodes(SDnode *pDnode) {
}
int32_t dmRun(SDnode *pDnode) {
if (!tsMultiProcess) {
if (tsMultiProcess == 0) {
pDnode->ptype = DND_PROC_SINGLE;
dInfo("dnode run in single process");
dInfo("dnode run in single process mode");
} else if (tsMultiProcess == 2) {
pDnode->ptype = DND_PROC_TEST;
dInfo("dnode run in multi-process test mode");
} else if (pDnode->ntype == DNODE || pDnode->ntype == NODE_END) {
pDnode->ptype = DND_PROC_PARENT;
dInfo("dnode run in parent process");
dInfo("dnode run in parent process mode");
} else {
pDnode->ptype = DND_PROC_CHILD;
SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
dInfo("%s run in child process", pWrapper->name);
dInfo("%s run in child process mode", pWrapper->name);
}
if (pDnode->ptype != DND_PROC_CHILD) {
if (dmInitServer(pDnode) != 0) {
dError("failed to init transport since %s", terrstr());
return -1;
}
dmReportStartup(pDnode, "dnode-transport", "initialized");
}
if (dmOpenNodes(pDnode) != 0) {
......
......@@ -17,66 +17,72 @@
#include "dmImp.h"
static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
pDnode->data.dnodeId = 0;
pDnode->data.clusterId = 0;
pDnode->data.dnodeVer = 0;
pDnode->data.updateTime = 0;
pDnode->data.rebootTime = taosGetTimestampMs();
pDnode->data.dropped = 0;
pDnode->data.localEp = strdup(pOption->localEp);
pDnode->data.localFqdn = strdup(pOption->localFqdn);
pDnode->data.firstEp = strdup(pOption->firstEp);
pDnode->data.secondEp = strdup(pOption->secondEp);
pDnode->data.dataDir = strdup(pOption->dataDir);
pDnode->data.disks = pOption->disks;
pDnode->data.numOfDisks = pOption->numOfDisks;
pDnode->data.supportVnodes = pOption->numOfSupportVnodes;
pDnode->data.serverPort = pOption->serverPort;
pDnode->ntype = pOption->ntype;
pDnode->input.dnodeId = 0;
pDnode->input.clusterId = 0;
pDnode->input.localEp = strdup(pOption->localEp);
pDnode->input.localFqdn = strdup(pOption->localFqdn);
pDnode->input.firstEp = strdup(pOption->firstEp);
pDnode->input.secondEp = strdup(pOption->secondEp);
pDnode->input.serverPort = pOption->serverPort;
pDnode->input.supportVnodes = pOption->numOfSupportVnodes;
pDnode->input.numOfDisks = pOption->numOfDisks;
pDnode->input.disks = pOption->disks;
pDnode->input.dataDir = strdup(pOption->dataDir);
pDnode->input.pDnode = pDnode;
if (pDnode->data.dataDir == NULL || pDnode->data.localEp == NULL || pDnode->data.localFqdn == NULL ||
pDnode->data.firstEp == NULL || pDnode->data.secondEp == NULL) {
if (pDnode->input.dataDir == NULL || pDnode->input.localEp == NULL || pDnode->input.localFqdn == NULL ||
pDnode->input.firstEp == NULL || pDnode->input.secondEp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pDnode->ntype = pOption->ntype;
if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == NODE_END) {
pDnode->data.lockfile = dmCheckRunning(pDnode->data.dataDir);
if (pDnode->data.lockfile == NULL) {
pDnode->lockfile = dmCheckRunning(pOption->dataDir);
if (pDnode->lockfile == NULL) {
return -1;
}
}
taosInitRWLatch(&pDnode->data.latch);
taosThreadMutexInit(&pDnode->mutex, NULL);
return 0;
}
static void dmClearVars(SDnode *pDnode) {
for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
SMgmtWrapper *pMgmt = &pDnode->wrappers[n];
taosMemoryFreeClear(pMgmt->path);
}
if (pDnode->data.lockfile != NULL) {
taosUnLockFile(pDnode->data.lockfile);
taosCloseFile(&pDnode->data.lockfile);
pDnode->data.lockfile = NULL;
}
taosMemoryFreeClear(pDnode->data.localEp);
taosMemoryFreeClear(pDnode->data.localFqdn);
taosMemoryFreeClear(pDnode->data.firstEp);
taosMemoryFreeClear(pDnode->data.secondEp);
taosMemoryFreeClear(pDnode->data.dataDir);
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
taosMemoryFreeClear(pWrapper->path);
}
if (pDnode->lockfile != NULL) {
taosUnLockFile(pDnode->lockfile);
taosCloseFile(&pDnode->lockfile);
pDnode->lockfile = NULL;
}
taosThreadMutexDestroy(&pDnode->mutex);
memset(&pDnode->mutex, 0, sizeof(pDnode->mutex));
taosMemoryFree(pDnode);
taosMemoryFreeClear(pDnode->input.localEp);
taosMemoryFreeClear(pDnode->input.localFqdn);
taosMemoryFreeClear(pDnode->input.firstEp);
taosMemoryFreeClear(pDnode->input.secondEp);
taosMemoryFreeClear(pDnode->input.dataDir);
dDebug("dnode memory is cleared, data:%p", pDnode);
}
static bool dmRequireNode(SMgmtWrapper *pWrapper) {
bool required = false;
int32_t code = (*pWrapper->func.requiredFp)(&pWrapper->pDnode->input, &required);
if (!required) {
dDebug("node:%s, does not require startup", pWrapper->name);
}
return required;
}
SDnode *dmCreate(const SDnodeOpt *pOption) {
dDebug("start to create dnode");
int32_t code = -1;
char path[PATH_MAX] = {0};
char path[PATH_MAX + 100] = {0};
SDnode *pDnode = NULL;
pDnode = taosMemoryCalloc(1, sizeof(SDnode));
......@@ -85,38 +91,36 @@ SDnode *dmCreate(const SDnodeOpt *pOption) {
goto _OVER;
}
if (dmInitVars(pDnode, pOption) != 0) {
dError("failed to init variables since %s", terrstr());
goto _OVER;
}
dmSetStatus(pDnode, DND_STAT_INIT);
dmInitWrapper(&pDnode->wrappers[DNODE]);
mmInitWrapper(&pDnode->wrappers[MNODE]);
vmInitWrapper(&pDnode->wrappers[VNODE]);
qmInitWrapper(&pDnode->wrappers[QNODE]);
smInitWrapper(&pDnode->wrappers[SNODE]);
bmInitWrapper(&pDnode->wrappers[BNODE]);
pDnode->wrappers[DNODE].func = dmGetMgmtFunc();
pDnode->wrappers[MNODE].func = mmGetMgmtFunc();
pDnode->wrappers[VNODE].func = vmGetMgmtFunc();
pDnode->wrappers[QNODE].func = qmGetMgmtFunc();
pDnode->wrappers[SNODE].func = smGetMgmtFunc();
pDnode->wrappers[BNODE].func = bmGetMgmtFunc();
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
snprintf(path, sizeof(path), "%s%s%s", pDnode->data.dataDir, TD_DIRSEP, pWrapper->name);
pWrapper->path = strdup(path);
pWrapper->procShm.id = -1;
pWrapper->pDnode = pDnode;
pWrapper->name = dmNodeName(ntype);
pWrapper->procShm.id = -1;
pWrapper->nodeType = ntype;
pWrapper->procType = DND_PROC_SINGLE;
taosInitRWLatch(&pWrapper->latch);
snprintf(path, sizeof(path), "%s%s%s", pOption->dataDir, TD_DIRSEP, pWrapper->name);
pWrapper->path = strdup(path);
if (pWrapper->path == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
if (ntype != DNODE && dmReadShmFile(pWrapper) != 0) {
if (ntype != DNODE && dmReadShmFile(pWrapper->path, pWrapper->name, &pWrapper->procShm) != 0) {
dError("node:%s, failed to read shm file since %s", pWrapper->name, terrstr());
goto _OVER;
}
pWrapper->required = dmRequireNode(pWrapper);
}
if (dmInitMsgHandle(pDnode) != 0) {
......@@ -124,11 +128,15 @@ SDnode *dmCreate(const SDnodeOpt *pOption) {
goto _OVER;
}
if (dmInitClient(pDnode) != 0) {
goto _OVER;
}
dInfo("dnode is created, data:%p", pDnode);
code = 0;
_OVER:
if (code != 0 && pDnode) {
if (code != 0 && pDnode != NULL) {
dmClearVars(pDnode);
pDnode = NULL;
dError("failed to create dnode since %s", terrstr());
......@@ -139,6 +147,205 @@ _OVER:
void dmClose(SDnode *pDnode) {
if (pDnode == NULL) return;
dmCleanupClient(pDnode);
dmCleanupServer(pDnode);
dmClearVars(pDnode);
dInfo("dnode is closed, data:%p", pDnode);
}
void dmSetStatus(SDnode *pDnode, EDndRunStatus status) {
if (pDnode->status != status) {
dDebug("dnode status set from %s to %s", dmStatStr(pDnode->status), dmStatStr(status));
pDnode->status = status;
}
}
void dmSetEvent(SDnode *pDnode, EDndEvent event) {
if (event == DND_EVENT_STOP) {
pDnode->event = event;
}
}
SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
SMgmtWrapper *pRetWrapper = pWrapper;
taosRLockLatch(&pWrapper->latch);
if (pWrapper->deployed) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
dTrace("node:%s, is acquired, refCount:%d", pWrapper->name, refCount);
} else {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
pRetWrapper = NULL;
}
taosRUnLockLatch(&pWrapper->latch);
return pRetWrapper;
}
int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
int32_t code = 0;
taosRLockLatch(&pWrapper->latch);
if (pWrapper->deployed || (pWrapper->procType == DND_PROC_PARENT && pWrapper->required)) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount);
} else {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
code = -1;
}
taosRUnLockLatch(&pWrapper->latch);
return code;
}
void dmReleaseWrapper(SMgmtWrapper *pWrapper) {
if (pWrapper == NULL) return;
taosRLockLatch(&pWrapper->latch);
int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1);
taosRUnLockLatch(&pWrapper->latch);
dTrace("node:%s, is released, refCount:%d", pWrapper->name, refCount);
}
void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) {
SStartupInfo *pStartup = &pDnode->startup;
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
dInfo("step:%s, %s", pStartup->name, pStartup->desc);
}
void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const char *pDesc) {
dmReportStartup(pWrapper->pDnode, pName, pDesc);
}
static void dmGetServerStatus(SDnode *pDnode, SServerStatusRsp *pStatus) {
pStatus->details[0] = 0;
if (pDnode->status == DND_STAT_INIT) {
pStatus->statusCode = TSDB_SRV_STATUS_NETWORK_OK;
snprintf(pStatus->details, sizeof(pStatus->details), "%s: %s", pDnode->startup.name, pDnode->startup.desc);
} else if (pDnode->status == DND_STAT_STOPPED) {
pStatus->statusCode = TSDB_SRV_STATUS_EXTING;
} else {
#if 0
SDnodeData *pData = &pDnode->data;
if (pData->isMnode && pData->mndState != TAOS_SYNC_STATE_LEADER && pData->mndState == TAOS_SYNC_STATE_CANDIDATE) {
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(pData->mndState));
} else if (pData->unsyncedVgId != 0 && pData->vndState != TAOS_SYNC_STATE_LEADER &&
pData->vndState != TAOS_SYNC_STATE_CANDIDATE) {
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pData->unsyncedVgId,
syncStr(pData->vndState));
} else {
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
}
#endif
}
}
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("net test req is received");
SRpcMsg rsp = {.handle = pReq->handle, .refId = pReq->refId, .ahandle = pReq->ahandle, .code = 0};
rsp.pCont = rpcMallocCont(pReq->contLen);
if (rsp.pCont == NULL) {
rsp.code = TSDB_CODE_OUT_OF_MEMORY;
} else {
rsp.contLen = pReq->contLen;
}
rpcSendResponse(&rsp);
rpcFreeCont(pReq->pCont);
}
void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("server status req is received");
SServerStatusRsp statusRsp = {0};
dmGetServerStatus(pDnode, &statusRsp);
SRpcMsg rspMsg = {.handle = pReq->handle, .ahandle = pReq->ahandle, .refId = pReq->refId};
int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
if (rspLen < 0) {
rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
rspMsg.pCont = pRsp;
rspMsg.contLen = rspLen;
_OVER:
rpcSendResponse(&rspMsg);
rpcFreeCont(pReq->pCont);
}
int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg) {
SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
if (pWrapper != NULL) {
dmReleaseWrapper(pWrapper);
terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
dError("failed to create node since %s", terrstr());
return -1;
}
taosThreadMutexLock(&pDnode->mutex);
pWrapper = &pDnode->wrappers[ntype];
if (taosMkDir(pWrapper->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
return -1;
}
int32_t code = (*pWrapper->func.createFp)(pWrapper, pMsg);
if (code != 0) {
dError("node:%s, failed to create since %s", pWrapper->name, terrstr());
} else {
dDebug("node:%s, has been created", pWrapper->name);
(void)dmOpenNode(pWrapper);
pWrapper->required = true;
pWrapper->deployed = true;
pWrapper->procType = pDnode->ptype;
}
taosThreadMutexUnlock(&pDnode->mutex);
return code;
}
int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg) {
SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
if (pWrapper == NULL) {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
dError("failed to drop node since %s", terrstr());
return -1;
}
taosThreadMutexLock(&pDnode->mutex);
int32_t code = (*pWrapper->func.dropFp)(pWrapper, pMsg);
if (code != 0) {
dError("node:%s, failed to drop since %s", pWrapper->name, terrstr());
} else {
dDebug("node:%s, has been dropped", pWrapper->name);
pWrapper->required = false;
pWrapper->deployed = false;
}
dmReleaseWrapper(pWrapper);
if (code == 0) {
dmCloseNode(pWrapper);
taosRemoveDir(pWrapper->path);
}
taosThreadMutexUnlock(&pDnode->mutex);
return code;
}
\ No newline at end of file
......@@ -15,7 +15,6 @@
#define _DEFAULT_SOURCE
#include "dmImp.h"
#include "qworker.h"
#define INTERNAL_USER "_dnd"
......@@ -23,21 +22,21 @@
#define INTERNAL_SECRET "_pwd"
static void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
taosRLockLatch(&pDnode->data.latch);
*pEpSet = pDnode->data.mnodeEps;
taosRUnLockLatch(&pDnode->data.latch);
taosRLockLatch(&pDnode->latch);
*pEpSet = pDnode->mnodeEps;
taosRUnLockLatch(&pDnode->latch);
}
static void dmSetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);
taosWLockLatch(&pDnode->data.latch);
pDnode->data.mnodeEps = *pEpSet;
taosWLockLatch(&pDnode->latch);
pDnode->mnodeEps = *pEpSet;
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
}
taosWUnLockLatch(&pDnode->data.latch);
taosWUnLockLatch(&pDnode->latch);
}
static inline NodeMsgFp dmGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
......@@ -64,7 +63,7 @@ static inline int32_t dmBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) {
if ((pRpc->msgType & 1u)) {
assert(pRpc->refId != 0);
}
// assert(pRpc->handle != NULL && pRpc->refId != 0 && pMsg->rpcMsg.refId != 0);
return 0;
}
......@@ -76,13 +75,9 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe
bool needRelease = false;
bool isReq = msgType & 1U;
if (pEpSet && pEpSet->numOfEps > 0 && msgType == TDMT_MND_STATUS_RSP) {
dmSetMnodeEpSet(pWrapper->pDnode, pEpSet);
}
if (dmMarkWrapper(pWrapper) != 0) goto _OVER;
needRelease = true;
if ((msgFp = dmGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER;
if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER;
if (dmBuildMsg(pMsg, pRpc) != 0) goto _OVER;
......@@ -203,27 +198,25 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
int32_t dmInitMsgHandle(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
SMsgHandle *pHandle = &pTrans->msgHandles[msgIndex];
pHandle->defaultNtype = NODE_END;
}
}
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
SMsgHandle *pHandle = &pTrans->msgHandles[msgIndex];
NodeMsgFp msgFp = pWrapper->msgFps[msgIndex];
bool needCheckVgId = pWrapper->needCheckVgIds[msgIndex];
if (msgFp == NULL) continue;
if (needCheckVgId) pHandle->needCheckVgId = needCheckVgId;
if (!needCheckVgId) {
SArray *pArray = (*pWrapper->func.getHandlesFp)();
if (pArray == NULL) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
SMgmtHandle *pMgmt = taosArrayGet(pArray, i);
SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
if (pMgmt->needCheckVgId) {
pHandle->needCheckVgId = pMgmt->needCheckVgId;
}
if (!pMgmt->needCheckVgId) {
pHandle->defaultNtype = ntype;
}
pWrapper->msgFps[TMSG_INDEX(pMgmt->msgType)] = pMgmt->msgFp;
}
taosArrayDestroy(pArray);
}
return 0;
......@@ -236,7 +229,7 @@ static void dmSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) {
dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->handle, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
if (strcmp(epSet.eps[i].fqdn, pDnode->data.localFqdn) == 0 && epSet.eps[i].port == pDnode->data.serverPort) {
if (strcmp(epSet.eps[i].fqdn, pDnode->input.localFqdn) == 0 && epSet.eps[i].port == pDnode->input.serverPort) {
epSet.inUse = (i + 1) % epSet.numOfEps;
}
......@@ -318,17 +311,6 @@ static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp
}
}
#if 0
static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp, const SEpSet *pNewEpSet) {
ASSERT(pRsp->code == TSDB_CODE_RPC_REDIRECT);
if (pWrapper->procType != DND_PROC_CHILD) {
rpcSendRedirectRsp(pRsp->handle, pNewEpSet);
} else {
taosProcPutToParentQ(pWrapper->procObj, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP);
}
}
#endif
static inline void dmRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
if (pWrapper->procType != DND_PROC_CHILD) {
rpcRegisterBrokenLinkArg(pMsg);
......@@ -452,11 +434,7 @@ int32_t dmInitClient(SDnode *pDnode) {
return -1;
}
pDnode->data.msgCb = dmGetMsgcb(&pDnode->wrappers[DNODE]);
tmsgSetDefaultMsgCb(&pDnode->data.msgCb);
dDebug("dnode rpc client is initialized");
return 0;
}
......@@ -533,8 +511,8 @@ int32_t dmInitServer(SDnode *pDnode) {
SRpcInit rpcInit = {0};
strncpy(rpcInit.localFqdn, pDnode->data.localFqdn, strlen(pDnode->data.localFqdn));
rpcInit.localPort = pDnode->data.serverPort;
strncpy(rpcInit.localFqdn, pDnode->input.localFqdn, strlen(pDnode->input.localFqdn));
rpcInit.localPort = pDnode->input.serverPort;
rpcInit.label = "DND";
rpcInit.numOfThreads = tsNumOfRpcThreads;
rpcInit.cfp = (RpcCfp)dmProcessMsg;
......@@ -571,8 +549,8 @@ SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) {
.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
.releaseHandleFp = dmReleaseHandle,
.reportStartupFp = dmReportStartupByWrapper,
.pWrapper = pWrapper,
.clientRpc = pWrapper->pDnode->trans.clientRpc,
.pWrapper = pWrapper,
};
return msgCb;
}
......@@ -109,6 +109,7 @@ typedef struct {
typedef struct {
int32_t dnodeId;
void *pMgmt;
SEpSet mnodeEps;
} SMgmtOutputOpt;
typedef int32_t (*NodeMsgFp)(void *pMgmt, SNodeMsg *pMsg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册