提交 3de4b0ad 编写于 作者: S Shengliang Guan

refact(cluster): node mgmt

上级 a8b890bc
......@@ -26,12 +26,13 @@ int32_t dndOpenNode(SMgmtWrapper *pWrapper);
void dndCloseNode(SMgmtWrapper *pWrapper);
// dndTransport.c
int32_t dndInitTrans(SDnode *pDnode);
int32_t dmInitTrans(SDnode *pDnode);
void dndCleanupTrans(SDnode *pDnode);
SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper);
int32_t dndInitMsgHandle(SDnode *pDnode);
void dndSendMsgToMnode(SDnode *pDnode, SRpcMsg *pReq);
int32_t dndSendMsgToMnode(SDnode *pDnode, SRpcMsg *pReq);
void dndSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
void dmSendToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp);
// mgmt
void dmSetMgmtFp(SMgmtWrapper *pWrapper);
......@@ -54,12 +55,11 @@ int32_t dmWriteFile(SDnodeData *pMgmt);
void dmUpdateDnodeEps(SDnodeData *pMgmt, SArray *pDnodeEps);
// dmHandle.c
void dmInitMsgHandle(SMgmtWrapper *pWrapper);
void dmSendStatusReq(SDnodeData *pMgmt);
int32_t dmProcessConfigReq(SDnodeData *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessStatusRsp(SDnodeData *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessAuthRsp(SDnodeData *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessGrantRsp(SDnodeData *pMgmt, SNodeMsg *pMsg);
void dmSendStatusReq(SDnode *pDnode);
int32_t dmProcessConfigReq(SDnode *pDnode, SNodeMsg *pMsg);
int32_t dmProcessStatusRsp(SDnode *pDnode, SNodeMsg *pMsg);
int32_t dmProcessAuthRsp(SDnode *pDnode, SNodeMsg *pMsg);
int32_t dmProcessGrantRsp(SDnode *pDnode, SNodeMsg *pMsg);
int32_t dmProcessCDnodeReq(SDnode *pDnode, SNodeMsg *pMsg);
// dmMonitor.c
......@@ -67,11 +67,15 @@ void dmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo);
void dmSendMonitorReport(SDnode *pDnode);
// dmWorker.c
int32_t dmStartThread(SDnodeData *pMgmt);
int32_t dmStartWorker(SDnodeData *pMgmt);
void dmStopWorker(SDnodeData *pMgmt);
int32_t dmStartStatusThread(SDnode *pDnode);
void dmStopStatusThread(SDnode *pDnode);
int32_t dmStartMonitorThread(SDnode *pDnode);
void dmStopMonitorThread(SDnode *pDnode);
int32_t dmStartWorker(SDnode *pDnode);
void dmStopWorker(SDnode *pDnode);
int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t dmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t dmProcessStatusMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -299,7 +299,7 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) {
return -1;
}
SMsgCb msgCb = dndCreateMsgcb(pWrapper);
SMsgCb msgCb = dmGetMsgcb(pWrapper);
tmsgSetDefaultMsgCb(&msgCb);
pWrapper->procType = DND_PROC_CHILD;
......
......@@ -16,11 +16,33 @@
#define _DEFAULT_SOURCE
#include "dndImp.h"
void dmSendStatusReq(SDnodeData *pMgmt) {
SDnode *pDnode = pMgmt->pDnode;
static int32_t dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) {
SDnode *pDnode = pMgmt->pDnode;
if (pRsp->code != TSDB_CODE_SUCCESS) {
if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pDnode->data.dropped && pDnode->data.dnodeId > 0) {
dInfo("dnode:%d, set to dropped since not exist in mnode", pDnode->data.dnodeId);
pDnode->data.dropped = 1;
dmWriteFile(pMgmt);
}
} else {
SStatusRsp statusRsp = {0};
if (pRsp->pCont != NULL && pRsp->contLen != 0 &&
tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
pMgmt->dnodeVer = statusRsp.dnodeVer;
dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
dmUpdateDnodeEps(pMgmt, statusRsp.pDnodeEps);
}
tFreeSStatusRsp(&statusRsp);
}
return TSDB_CODE_SUCCESS;
}
void dmSendStatusReq(SDnode *pDnode) {
SStatusReq req = {0};
taosRLockLatch(&pMgmt->latch);
taosRLockLatch(&pDnode->data.latch);
req.sver = tsVersion;
req.dnodeVer = pMgmt->dnodeVer;
req.dnodeId = pDnode->data.dnodeId;
......@@ -38,7 +60,7 @@ void dmSendStatusReq(SDnodeData *pMgmt) {
memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
taosRUnLockLatch(&pMgmt->latch);
taosRUnLockLatch(&pDnode->data.latch);
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, VNODE);
if (pWrapper != NULL) {
......@@ -54,10 +76,11 @@ void dmSendStatusReq(SDnodeData *pMgmt) {
taosArrayDestroy(req.pVloads);
SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)0x9527};
pMgmt->statusSent = 1;
SRpcMsg rspMsg = {0};
dTrace("send req:%s to mnode, app:%p", TMSG_INFO(rpcMsg.msgType), rpcMsg.ahandle);
dndSendMsgToMnode(pDnode, &rpcMsg);
dmSendToMnodeRecv(pDnode, rpcMsg, &rpcRsp);
dmProcessStatusRsp(pDnode, &rpcRsp);
}
static void dmUpdateDnodeCfg(SDnodeData *pMgmt, SDnodeCfg *pCfg) {
......@@ -65,37 +88,12 @@ static void dmUpdateDnodeCfg(SDnodeData *pMgmt, SDnodeCfg *pCfg) {
if (pDnode->data.dnodeId == 0) {
dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
taosWLockLatch(&pMgmt->latch);
taosWLockLatch(&pDnode->data.latch);
pDnode->data.dnodeId = pCfg->dnodeId;
pDnode->data.clusterId = pCfg->clusterId;
dmWriteFile(pMgmt);
taosWUnLockLatch(&pMgmt->latch);
}
}
int32_t dmProcessStatusRsp(SDnodeData *pMgmt, SNodeMsg *pMsg) {
SDnode *pDnode = pMgmt->pDnode;
SRpcMsg *pRsp = &pMsg->rpcMsg;
if (pRsp->code != TSDB_CODE_SUCCESS) {
if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pDnode->data.dropped && pDnode->data.dnodeId > 0) {
dInfo("dnode:%d, set to dropped since not exist in mnode", pDnode->data.dnodeId);
pDnode->data.dropped = 1;
dmWriteFile(pMgmt);
}
} else {
SStatusRsp statusRsp = {0};
if (pRsp->pCont != NULL && pRsp->contLen != 0 &&
tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
pMgmt->dnodeVer = statusRsp.dnodeVer;
dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
dmUpdateDnodeEps(pMgmt, statusRsp.pDnodeEps);
}
tFreeSStatusRsp(&statusRsp);
taosWUnLockLatch(&pDnode->data.latch);
}
pMgmt->statusSent = 0;
return TSDB_CODE_SUCCESS;
}
int32_t dmProcessAuthRsp(SDnodeData *pMgmt, SNodeMsg *pMsg) {
......@@ -194,7 +192,7 @@ int32_t dmProcessCDnodeReq(SDnode *pDnode, SNodeMsg *pMsg) {
}
}
void dmInitMsgHandle(SMgmtWrapper *pWrapper) {
static void dmSetMsgHandle(SMgmtWrapper *pWrapper) {
// Requests handled by DNODE
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dmProcessMgmtMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, dmProcessMgmtMsg, DEFAULT_HANDLE);
......@@ -205,10 +203,84 @@ void dmInitMsgHandle(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, dmProcessMgmtMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, dmProcessMgmtMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, dmProcessMgmtMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, dmProcessMgmtMsg, DEFAULT_HANDLE);
// Requests handled by MNODE
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, dmProcessMonitorMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmProcessMgmtMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmProcessMgmtMsg, DEFAULT_HANDLE);
}
static int32_t dmStart(SMgmtWrapper *pWrapper) { return dmStartStatusThread(pWrapper->pDnode); }
static void dmStop(SMgmtWrapper *pWrapper) { dmStopThread(pWrapper->pDnode); }
static int32_t dmInit(SMgmtWrapper *pWrapper) {
dInfo("dnode-data start to init");
SDnode *pDnode = pWrapper->pDnode;
pDnode->data.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pDnode->data.dnodeHash == NULL) {
dError("failed to init dnode hash");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (dmReadFile(pDnode) != 0) {
dError("failed to read file since %s", terrstr());
return -1;
}
if (pDnode->data.dropped) {
dError("dnode will not start since its already dropped");
return -1;
}
if (dmStartWorker(pDnode) != 0) {
return -1;
}
if (dmInitTrans(pDnode) != 0) {
dError("failed to init transport since %s", terrstr());
return -1;
}
dInfo("dnode-data is initialized");
return 0;
}
static void dmCleanup(SMgmtWrapper *pWrapper) {
dInfo("dnode-data start to clean up");
SDnode *pDnode = pWrapper->pDnode;
dmStopWorker(pDnode);
taosWLockLatch(&pDnode->data.latch);
if (pMgmt->dnodeEps != NULL) {
taosArrayDestroy(pMgmt->dnodeEps);
pMgmt->dnodeEps = NULL;
}
if (pMgmt->dnodeHash != NULL) {
taosHashCleanup(pMgmt->dnodeHash);
pMgmt->dnodeHash = NULL;
}
taosWUnLockLatch(&pDnode->data.latch);
dndCleanupTrans(pDnode);
dInfo("dnode-data is cleaned up");
}
static int32_t dmRequire(SMgmtWrapper *pWrapper, bool *required) {
*required = true;
return 0;
}
void dmSetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = dmInit;
mgmtFp.closeFp = dmCleanup;
mgmtFp.startFp = dmStart;
mgmtFp.stopFp = dmStop;
mgmtFp.requiredFp = dmRequire;
dmSetMsgHandle(pWrapper);
pWrapper->name = "dnode";
pWrapper->fp = mgmtFp;
}
......@@ -17,103 +17,11 @@
#include "dndImp.h"
static int32_t dmStart(SMgmtWrapper *pWrapper) {
dDebug("dnode-mgmt start to run");
return dmStartThread(pWrapper->pMgmt);
}
static int32_t dmInit(SMgmtWrapper *pWrapper) {
SDnode *pDnode = pWrapper->pDnode;
SDnodeData *pMgmt = taosMemoryCalloc(1, sizeof(SDnodeData));
dInfo("dnode-mgmt start to init");
static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
pDnode->data.dnodeId = 0;
pDnode->data.dropped = 0;
pDnode->data.clusterId = 0;
pMgmt->path = pWrapper->path;
pMgmt->pDnode = pDnode;
taosInitRWLatch(&pMgmt->latch);
pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pMgmt->dnodeHash == NULL) {
dError("failed to init dnode hash");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (dmReadFile(pMgmt) != 0) {
dError("failed to read file since %s", terrstr());
return -1;
}
if (pDnode->data.dropped) {
dError("dnode will not start since its already dropped");
return -1;
}
if (dmStartWorker(pMgmt) != 0) {
return -1;
}
if (dndInitTrans(pDnode) != 0) {
dError("failed to init transport since %s", terrstr());
return -1;
}
pWrapper->pMgmt = pMgmt;
pMgmt->msgCb = dndCreateMsgcb(pWrapper);
dInfo("dnode-mgmt is initialized");
return 0;
}
static void dmCleanup(SMgmtWrapper *pWrapper) {
SDnodeData *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
dInfo("dnode-mgmt start to clean up");
SDnode *pDnode = pMgmt->pDnode;
dmStopWorker(pMgmt);
taosWLockLatch(&pMgmt->latch);
if (pMgmt->dnodeEps != NULL) {
taosArrayDestroy(pMgmt->dnodeEps);
pMgmt->dnodeEps = NULL;
}
if (pMgmt->dnodeHash != NULL) {
taosHashCleanup(pMgmt->dnodeHash);
pMgmt->dnodeHash = NULL;
}
taosWUnLockLatch(&pMgmt->latch);
taosMemoryFree(pMgmt);
pWrapper->pMgmt = NULL;
dndCleanupTrans(pDnode);
dInfo("dnode-mgmt is cleaned up");
}
static int32_t dmRequire(SMgmtWrapper *pWrapper, bool *required) {
*required = true;
return 0;
}
void dmSetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = dmInit;
mgmtFp.closeFp = dmCleanup;
mgmtFp.startFp = dmStart;
mgmtFp.requiredFp = dmRequire;
dmInitMsgHandle(pWrapper);
pWrapper->name = "dnode";
pWrapper->fp = mgmtFp;
}
static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
pDnode->data.supportVnodes = pOption->numOfSupportVnodes;
pDnode->data.serverPort = pOption->serverPort;
pDnode->data.dataDir = strdup(pOption->dataDir);
......@@ -139,6 +47,7 @@ static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
}
}
taosInitRWLatch(&pDnode->data.latch);
return 0;
}
......@@ -212,7 +121,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) {
goto _OVER;
}
SMsgCb msgCb = dndCreateMsgcb(&pDnode->wrappers[0]);
SMsgCb msgCb = dmGetMsgcb(&pDnode->wrappers[0]);
tmsgSetDefaultMsgCb(&msgCb);
dInfo("dnode is created, data:%p", pDnode);
......@@ -245,5 +154,3 @@ void dndHandleEvent(SDnode *pDnode, EDndEvent event) {
pDnode->event = event;
}
}
......@@ -247,13 +247,13 @@ void dndSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp);
}
void dndSendMsgToMnode(SDnode *pDnode, SRpcMsg *pReq) {
int32_t dndSendMsgToMnode(SDnode *pDnode, SRpcMsg *pReq) {
SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet);
dndSendRpcReq(pDnode, &epSet, pReq);
return dndSendRpcReq(pDnode, &epSet, pReq);
}
static inline void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp) {
void dmSendToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp) {
SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet);
rpcSendRecv(pDnode->trans.clientRpc, &epSet, pReq, pRsp);
......@@ -453,7 +453,7 @@ static inline int32_t dndRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
SRpcMsg rpcRsp = {0};
dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt);
dndSendMsgToMnodeRecv(pDnode, &rpcMsg, &rpcRsp);
dmSendToMnodeRecv(pDnode, &rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) {
terrno = rpcRsp.code;
......@@ -506,7 +506,7 @@ static void dndCleanupServer(SDnode *pDnode) {
}
}
int32_t dndInitTrans(SDnode *pDnode) {
int32_t dmInitTrans(SDnode *pDnode) {
if (dndInitServer(pDnode) != 0) return -1;
if (dndInitClient(pDnode) != 0) return -1;
......
......@@ -16,41 +16,78 @@
#define _DEFAULT_SOURCE
#include "dndImp.h"
static void *dmThreadRoutine(void *param) {
SDnodeData *pMgmt = param;
SDnode *pDnode = pMgmt->pDnode;
int64_t lastStatusTime = taosGetTimestampMs();
int64_t lastMonitorTime = lastStatusTime;
static void *dmStatusThreadFp(void *param) {
SDnode *pDnode = param;
int64_t lastTime = taosGetTimestampMs();
setThreadName("dnode-hb");
setThreadName("dnode-status");
while (true) {
while (1) {
taosThreadTestCancel();
taosMsleep(200);
if (dndGetStatus(pDnode) != DND_STAT_RUNNING || pDnode->data.dropped) {
if (pDnode->status != DND_STAT_RUNNING || pDnode->data.dropped) {
continue;
}
int64_t curTime = taosGetTimestampMs();
float statusInterval = (curTime - lastStatusTime) / 1000.0f;
if (statusInterval >= tsStatusInterval && !pMgmt->statusSent) {
float interval = (curTime - lastTime) / 1000.0f;
if (interval >= tsStatusInterval) {
dmSendStatusReq(pMgmt);
lastStatusTime = curTime;
lastTime = curTime;
}
}
return NULL;
}
static void *dmMonitorThreadFp(void *param) {
SDnode *pDnode = param;
int64_t lastTime = taosGetTimestampMs();
setThreadName("dnode-monitor");
while (1) {
taosThreadTestCancel();
taosMsleep(200);
if (pDnode->status != DND_STAT_RUNNING || pDnode->data.dropped) {
continue;
}
float monitorInterval = (curTime - lastMonitorTime) / 1000.0f;
if (monitorInterval >= tsMonitorInterval) {
int64_t curTime = taosGetTimestampMs();
float interval = (curTime - lastTime) / 1000.0f;
if (interval >= tsMonitorInterval) {
dmSendMonitorReport(pDnode);
lastMonitorTime = curTime;
lastTime = curTime;
}
}
return TSDB_CODE_SUCCESS;
return NULL;
}
int32_t dmStartStatusThread(SDnode *pDnode) {
pDnode->statusThreadId = taosCreateThread(dmStatusThreadFp, pDnode);
if (pDnode->statusThreadId == NULL) {
dError("failed to init dnode status thread");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
void dmStopStatusThread(SDnode *pDnode) {
if (pDnode->statusThreadId != NULL) {
taosDestoryThread(pDnode->statusThreadId);
pDnode->statusThreadId = NULL;
}
}
int32_t dmStartThread(SDnodeData *pMgmt) {
pMgmt->threadId = taosCreateThread(dmThreadRoutine, pMgmt);
if (pMgmt->threadId == NULL) {
dError("failed to init dnode thread");
int32_t dmStartMonitorThread(SDnode *pDnode) {
pDnode->monitorThreadId = taosCreateThread(dmMonitorThreadFp, pDnode);
if (pDnode->monitorThreadId == NULL) {
dError("failed to init dnode monitor thread");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......@@ -58,21 +95,23 @@ int32_t dmStartThread(SDnodeData *pMgmt) {
return 0;
}
static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SDnodeData *pMgmt = pInfo->ahandle;
void dmStopMonitorThread(SDnode *pDnode) {
if (pMgmt->monitorThreadId != NULL) {
taosDestoryThread(pMgmt->monitorThreadId);
pMgmt->monitorThreadId = NULL;
}
}
SDnode *pDnode = pMgmt->pDnode;
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SDnode *pDnode = pInfo->ahandle;
SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1;
dTrace("msg:%p, will be processed in dnode queue", pMsg);
dTrace("msg:%p, will be processed in dnode-mgmt queue", pMsg);
switch (pRpc->msgType) {
case TDMT_DND_CONFIG_DNODE:
code = dmProcessConfigReq(pMgmt, pMsg);
break;
case TDMT_MND_STATUS_RSP:
code = dmProcessStatusRsp(pMgmt, pMsg);
break;
case TDMT_MND_AUTH_RSP:
code = dmProcessAuthRsp(pMgmt, pMsg);
break;
......@@ -96,15 +135,9 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
}
int32_t dmStartWorker(SDnodeData *pMgmt) {
SSingleWorkerCfg mcfg = {.min = 1, .max = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mcfg) != 0) {
dError("failed to start dnode mgmt worker since %s", terrstr());
return -1;
}
SSingleWorkerCfg scfg = {.min = 1, .max = 1, .name = "dnode-monitor", .fp = (FItem)dmProcessQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &scfg) != 0) {
dError("failed to start dnode monitor worker since %s", terrstr());
SSingleWorkerCfg cfg = {.min = 1, .max = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessMgmtQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) {
dError("failed to start dnode-mgmt worker since %s", terrstr());
return -1;
}
......@@ -114,12 +147,6 @@ int32_t dmStartWorker(SDnodeData *pMgmt) {
void dmStopWorker(SDnodeData *pMgmt) {
tSingleWorkerCleanup(&pMgmt->mgmtWorker);
tSingleWorkerCleanup(&pMgmt->monitorWorker);
if (pMgmt->threadId != NULL) {
taosDestoryThread(pMgmt->threadId);
pMgmt->threadId = NULL;
}
dDebug("dnode workers are closed");
}
......@@ -132,7 +159,7 @@ int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return 0;
}
int32_t dmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
int32_t dmProcessStatusMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SDnodeData *pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->monitorWorker;
......
......@@ -110,29 +110,26 @@ typedef struct {
int64_t updateTime;
int64_t rebootTime;
bool dropped;
int8_t statusSent;
SEpSet mnodeEpSet;
SHashObj *dnodeHash;
SArray *dnodeEps;
TdThread *threadId;
TdThread *statusThreadId;
TdThread *monitorThreadId;
SRWLatch latch;
SSingleWorker mgmtWorker;
SSingleWorker monitorWorker;
SMsgCb msgCb;
SDnode *pDnode;
const char *path;
TdFilePtr lockfile;
struct {
char *localEp;
char *localFqdn;
char *firstEp;
char *secondEp;
char *dataDir;
SDiskCfg *disks;
int32_t numOfDisks;
int32_t supportVnodes;
uint16_t serverPort;
};
char *localEp;
char *localFqdn;
char *firstEp;
char *secondEp;
char *dataDir;
SDiskCfg *disks;
int32_t numOfDisks;
int32_t supportVnodes;
uint16_t serverPort;
} SDnodeData;
typedef struct SDnode {
......
......@@ -37,7 +37,7 @@ void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
void dndGetMonitorSysInfo(SMonSysInfo *pInfo);
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper);
SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper);
// dndFile.c
int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed);
......
......@@ -174,7 +174,7 @@ void dndGetMonitorSysInfo(SMonSysInfo *pInfo) {
taosGetProcIODelta(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk);
}
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) {
SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) {
SMsgCb msgCb = pWrapper->pDnode->data.msgCb;
msgCb.pWrapper = pWrapper;
return msgCb;
......
......@@ -19,7 +19,7 @@
static int32_t bmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); }
static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper);
SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper);
pOption->msgCb = msgCb;
}
......
......@@ -39,7 +39,7 @@ static int32_t mmRequire(SMgmtWrapper *pWrapper, bool *required) {
}
static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper);
SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper);
msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToQueryQueue;
msgCb.queueFps[READ_QUEUE] = mmPutMsgToReadQueue;
msgCb.queueFps[WRITE_QUEUE] = mmPutMsgToWriteQueue;
......
......@@ -19,7 +19,7 @@
static int32_t qmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); }
static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) {
SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper);
SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper);
msgCb.queueFps[QUERY_QUEUE] = qmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = qmPutMsgToFetchQueue;
msgCb.qsizeFp = qmGetQueueSize;
......
......@@ -19,7 +19,7 @@
static int32_t smRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); }
static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) {
SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper);
SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper);
pOption->msgCb = msgCb;
}
......
......@@ -143,7 +143,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return -1;
}
SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper);
SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper);
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
......
......@@ -128,7 +128,7 @@ static void *vmOpenVnodeFunc(void *param) {
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
dndReportStartup(pDnode, "open-vnodes", stepDesc);
SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper);
SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper);
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册