提交 dabb896a 编写于 作者: S Shengliang Guan

shm

上级 b574dfd7
......@@ -404,7 +404,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
return -1;
}
tsNumOfThreadsPerCore = cfgGetItem(pCfg, "maxTmrCtrl")->fval;
tsNumOfThreadsPerCore = cfgGetItem(pCfg, "numOfThreadsPerCore")->fval;
tsMaxTmrCtrl = cfgGetItem(pCfg, "maxTmrCtrl")->i32;
tsRpcTimer = cfgGetItem(pCfg, "rpcTimer")->i32;
tsRpcMaxTime = cfgGetItem(pCfg, "rpcMaxTime")->i32;
......
......@@ -25,6 +25,10 @@ extern "C" {
void bmInitMsgHandles(SMgmtWrapper *pWrapper);
SMsgHandle bmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex);
int32_t bmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t bmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
#ifdef __cplusplus
}
#endif
......
......@@ -37,14 +37,14 @@ typedef struct SBnodeMgmt {
bool singleProc;
} SBnodeMgmt;
SMgmtFp bmGetMgmtFp();
void bmGetMgmtFp(SMgmtWrapper *pMgmt);
int32_t dndInitBnode(SDnode *pDnode);
void dndCleanupBnode(SDnode *pDnode);
void dndProcessBnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dndProcessCreateBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t dndProcessDropBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t bmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t bmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
#ifdef __cplusplus
}
......
......@@ -31,9 +31,9 @@ int32_t bmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
void bmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void bmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void bmProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void bmProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void bmProcessReadMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void bmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void bmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -17,6 +17,10 @@
#include "bmHandle.h"
#include "bmWorker.h"
int32_t bmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;}
int32_t bmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;}
void bmInitMsgHandles(SMgmtWrapper *pWrapper) {
}
......
......@@ -19,12 +19,14 @@
bool bmRequireNode(SMgmtWrapper *pWrapper) { return false; }
SMgmtFp bmGetMgmtFp() {
void bmGetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = NULL;
mgmtFp.closeFp = NULL;
mgmtFp.requiredFp = bmRequireNode;
mgmtFp.getMsgHandleFp = bmGetMsgHandle;
return mgmtFp;
// bmInitMsgHandles(pWrapper);
pWrapper->name = "snode";
pWrapper->fp = mgmtFp;
}
......@@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE
// #include "dndBnode.h"
// #include "dmMgmt.h"
// #include "dmInt.h"
// #include "dndTransport.h"
// #include "dndWorker.h"
......@@ -261,7 +261,7 @@ static int32_t dndDropBnode(SDnode *pDnode) {
return 0;
}
int32_t dndProcessCreateBnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
int32_t bmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) {
SDCreateBnodeReq createReq = {0};
if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......@@ -277,7 +277,7 @@ int32_t dndProcessCreateBnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
}
}
int32_t dndProcessDropBnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
int32_t bmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) {
SDDropBnodeReq dropReq = {0};
if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......
......@@ -72,8 +72,8 @@ typedef struct SQnodeMgmt SQnodeMgmt;
typedef struct SSnodeMgmt SSnodeMgmt;
typedef struct SBnodeMgmt SBnodeMgmt;
typedef void (*RpcMsgFp)(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEps);
typedef void (*NodeMsgFp)(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
typedef void (*RpcMsgFp)(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEps);
typedef void (*NodeMsgFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
typedef int32_t (*OpenNodeFp)(SMgmtWrapper *pWrapper);
typedef void (*CloseNodeFp)(SMgmtWrapper *pWrapper);
typedef bool (*RequireNodeFp)(SMgmtWrapper *pWrapper);
......@@ -132,7 +132,6 @@ typedef struct SDnode {
SStartupReq startup;
TdFilePtr pLockFile;
STransMgmt trans;
SMgmtFp fps[NODE_MAX];
SMgmtWrapper wrappers[NODE_MAX];
} SDnode;
......@@ -155,7 +154,7 @@ SDnode *dndCreate(SDndCfg *pCfg);
void dndClose(SDnode *pDnode);
int32_t dndRun(SDnode *pDnode);
void dndeHandleEvent(SDnode *pDnode, EDndEvent event);
void dndProcessRpcMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet);
// dndTransport.h
int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg);
......
......@@ -26,7 +26,7 @@ SDnode *dndCreate(SDndCfg *pCfg);
void dndClose(SDnode *pDnode);
int32_t dndRun(SDnode *pDnode);
void dndeHandleEvent(SDnode *pDnode, EDndEvent event);
void dndProcessRpcMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet);
#ifdef __cplusplus
}
......
......@@ -90,18 +90,12 @@ SDnode *dndCreate(SDndCfg *pCfg) {
goto _OVER;
}
pDnode->wrappers[DNODE].fp = dmGetMgmtFp();
pDnode->wrappers[MNODE].fp = mmGetMgmtFp();
pDnode->wrappers[VNODES].fp = vmGetMgmtFp();
pDnode->wrappers[QNODE].fp = qmGetMgmtFp();
pDnode->wrappers[SNODE].fp = smGetMgmtFp();
pDnode->wrappers[BNODE].fp = bmGetMgmtFp();
pDnode->wrappers[DNODE].name = "dnode";
pDnode->wrappers[MNODE].name = "mnode";
pDnode->wrappers[VNODES].name = "vnodes";
pDnode->wrappers[QNODE].name = "qnode";
pDnode->wrappers[SNODE].name = "snode";
pDnode->wrappers[BNODE].name = "bnode";
dmGetMgmtFp(&pDnode->wrappers[DNODE]);
mmGetMgmtFp(&pDnode->wrappers[MNODE]);
vmGetMgmtFp(&pDnode->wrappers[VNODES]);
qmGetMgmtFp(&pDnode->wrappers[QNODE]);
smGetMgmtFp(&pDnode->wrappers[SNODE]);
bmGetMgmtFp(&pDnode->wrappers[BNODE]);
memcpy(&pDnode->cfg, pCfg, sizeof(SDndCfg));
if (dndSetMsgHandle(pDnode) != 0) {
......@@ -263,5 +257,8 @@ void dndeHandleEvent(SDnode *pDnode, EDndEvent event) {
pDnode->event = event;
}
void dndProcessRpcMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet) {}
\ No newline at end of file
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet) {
if (pEpSet && pEpSet->numOfEps > 0 && pMsg->msgType == TDMT_MND_STATUS_RSP) {
dmUpdateMnodeEpSet(pWrapper->pDnode, pEpSet);
}
}
\ No newline at end of file
......@@ -39,7 +39,7 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
if (pHandle->rpcMsgFp != NULL) {
dTrace("RPC %p, rsp:%s will be processed by %s, code:0x%x app:%p", pRsp->handle, TMSG_INFO(msgType),
pHandle->pWrapper->name, pRsp->code & 0XFFFF, pRsp->ahandle);
(*pHandle->rpcMsgFp)(pDnode, pHandle->pWrapper, pRsp, pEpSet);
(*pHandle->rpcMsgFp)(pHandle->pWrapper, pRsp, pEpSet);
} else {
dError("RPC %p, rsp:%s not processed, app:%p", pRsp->handle, TMSG_INFO(msgType), pRsp->ahandle);
rpcFreeCont(pRsp->pCont);
......@@ -121,7 +121,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
if (pHandle->rpcMsgFp != NULL) {
dTrace("RPC %p, req:%s will be processed by %s, app:%p", pReq->handle, TMSG_INFO(msgType), pHandle->pWrapper->name,
pReq->ahandle);
(*pHandle->rpcMsgFp)(pDnode, pHandle->pWrapper, pReq, pEpSet);
(*pHandle->rpcMsgFp)(pHandle->pWrapper, pReq, pEpSet);
} else {
dError("RPC %p, req:%s not processed since no handle, app:%p", pReq->handle, TMSG_INFO(msgType), pReq->ahandle);
SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pReq->ahandle};
......@@ -251,7 +251,7 @@ int32_t dndSetMsgHandle(SDnode *pDnode) {
for (ENodeType nodeType = 0; nodeType < NODE_MAX; ++nodeType) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[nodeType];
GetMsgHandleFp getMsgHandleFp = pDnode->fps[nodeType].getMsgHandleFp;
GetMsgHandleFp getMsgHandleFp = pWrapper->fp.getMsgHandleFp;
if (getMsgHandleFp == NULL) continue;
for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
......
......@@ -25,8 +25,12 @@ extern "C" {
void dmInitMsgHandles(SMgmtWrapper *pWrapper);
SMsgHandle dmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex);
void dmSendStatusReq(SDnode *pDnode);
void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
void dmSendStatusReq(SDnodeMgmt *pMgmt);
void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
int32_t dmProcessConfigReq(SDnode *pDnode, SRpcMsg *pReq);
void dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp);
void dmProcessAuthRsp(SDnode *pDnode, SRpcMsg *pRsp);
void dmProcessGrantRsp(SDnode *pDnode, SRpcMsg *pRsp);
#ifdef __cplusplus
}
......
......@@ -26,6 +26,8 @@ typedef struct SDnodeMgmt {
int32_t dnodeId;
int32_t dropped;
int64_t clusterId;
char localEp[TSDB_EP_LEN];
char firstEp[TSDB_EP_LEN];
int64_t dver;
int64_t updateTime;
int8_t statusSent;
......@@ -42,12 +44,13 @@ typedef struct SDnodeMgmt {
} SDnodeMgmt;
// dmInt.h
SMgmtFp dmGetMgmtFp();
void dmGetMgmtFp(SMgmtWrapper *pWrapper);
int32_t dmGetDnodeId(SDnode *pDnode);
int64_t dmGetClusterId(SDnode *pDnode);
// dmMgmt.h
void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet);
void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet);
void dmUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet);
void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
void dmSendRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg);
// dmHandle.h
void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_DNODE_MGMT_H_
#define _TD_DND_DNODE_MGMT_H_
#include "dmInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t dmInit(SMgmtWrapper *pWrapper);
void dmCleanup(SMgmtWrapper *pWrapper);
bool dmRequire(SMgmtWrapper *pWrapper);
void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet);
void dmSendRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg);
void dmProcessMgmtMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg) ;
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_DNODE_MGMT_H_*/
\ No newline at end of file
......@@ -24,6 +24,7 @@ extern "C" {
int32_t dmStartWorker();
void dmStopWorker();
void dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -15,8 +15,104 @@
#define _DEFAULT_SOURCE
#include "dmHandle.h"
#include "dndWorker.h"
#include "dmMgmt.h"
#include "dmFile.h"
#include "dmWorker.h"
#include "vmInt.h"
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
SDnode *pDnode = pMgmt->pDnode;
SStatusReq req = {0};
taosRLockLatch(&pMgmt->latch);
req.sver = tsVersion;
req.dver = pMgmt->dver;
req.dnodeId = pMgmt->dnodeId;
req.clusterId = pMgmt->clusterId;
req.rebootTime = pDnode->rebootTime;
req.updateTime = pMgmt->updateTime;
req.numOfCores = tsNumOfCores;
req.numOfSupportVnodes = pDnode->cfg.numOfSupportVnodes;
memcpy(req.dnodeEp, pDnode->cfg.localEp, TSDB_EP_LEN);
req.clusterCfg.statusInterval = tsStatusInterval;
req.clusterCfg.checkTime = 0;
char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
memcpy(req.clusterCfg.timezone, tsTimezone, TD_TIMEZONE_LEN);
memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
taosRUnLockLatch(&pMgmt->latch);
req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad));
vmGetVnodeLoads(pDnode, req.pVloads);
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen);
tSerializeSStatusReq(pHead, contLen, &req);
taosArrayDestroy(req.pVloads);
SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527};
pMgmt->statusSent = 1;
dTrace("pDnode:%p, send status req to mnode", pDnode);
dndSendReqToMnode(pMgmt->pDnode, &rpcMsg);
}
static void dndUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
if (pMgmt->dnodeId == 0) {
dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
taosWLockLatch(&pMgmt->latch);
pMgmt->dnodeId = pCfg->dnodeId;
pMgmt->clusterId = pCfg->clusterId;
dmWriteFile(pMgmt);
taosWUnLockLatch(&pMgmt->latch);
}
}
void dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) {
SDnodeMgmt *pMgmt = dndGetWrapper(pDnode, MNODE)->pMgmt;
if (pRsp->code != TSDB_CODE_SUCCESS) {
if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->dropped && pMgmt->dnodeId > 0) {
dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->dnodeId);
pMgmt->dropped = 1;
dmWriteFile(pMgmt);
}
} else {
SStatusRsp statusRsp = {0};
if (pRsp->pCont != NULL && pRsp->contLen != 0 &&
tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
pMgmt->dver = statusRsp.dver;
dndUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
dmUpdateDnodeEps(pMgmt, statusRsp.pDnodeEps);
}
taosArrayDestroy(statusRsp.pDnodeEps);
}
pMgmt->statusSent = 0;
}
void dmProcessAuthRsp(SDnode *pDnode, SRpcMsg *pReq) { dError("auth rsp is received, but not supported yet"); }
void dmProcessGrantRsp(SDnode *pDnode, SRpcMsg *pReq) { dError("grant rsp is received, but not supported yet"); }
int32_t dmProcessConfigReq(SDnode *pDnode, SRpcMsg *pReq) {
dError("config req is received, but not supported yet");
SDCfgDnodeReq *pCfg = pReq->pCont;
return TSDB_CODE_OPS_NOT_SUPPORT;
}
void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("startup req is received");
SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq));
dndGetStartup(pDnode, pStartup);
dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq)};
rpcSendResponse(&rpcRsp);
}
static void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) {
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
......
......@@ -15,8 +15,9 @@
#define _DEFAULT_SOURCE
#include "dmInt.h"
#include "dmFile.h"
#include "dmHandle.h"
#include "dmMgmt.h"
#include "dmWorker.h"
int32_t dmGetDnodeId(SDnode *pDnode) {
SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE);
......@@ -38,11 +39,147 @@ int64_t dmGetClusterId(SDnode *pDnode) {
return clusterId;
}
SMgmtFp dmGetMgmtFp() {
void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE);
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
taosRLockLatch(&pMgmt->latch);
*pEpSet = pMgmt->mnodeEpSet;
taosRUnLockLatch(&pMgmt->latch);
}
void dmUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);
SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE);
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
taosWLockLatch(&pMgmt->latch);
pMgmt->mnodeEpSet = *pEpSet;
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
}
taosWUnLockLatch(&pMgmt->latch);
}
void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE);
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
taosRLockLatch(&pMgmt->latch);
SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &dnodeId, sizeof(int32_t));
if (pDnodeEp != NULL) {
if (pPort != NULL) {
*pPort = pDnodeEp->ep.port;
}
if (pFqdn != NULL) {
tstrncpy(pFqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN);
}
if (pEp != NULL) {
snprintf(pEp, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port);
}
}
taosRUnLockLatch(&pMgmt->latch);
}
void dmSendRedirectRsp(SDnode *pDnode, SRpcMsg *pReq) {
tmsg_t msgType = pReq->msgType;
SEpSet epSet = {0};
dmGetMnodeEpSet(pDnode, &epSet);
dDebug("RPC %p, req:%s is redirected, num:%d use:%d", pReq->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
if (strcmp(epSet.eps[i].fqdn, pDnode->cfg.localFqdn) == 0 && epSet.eps[i].port == pDnode->cfg.serverPort) {
epSet.inUse = (i + 1) % epSet.numOfEps;
}
epSet.eps[i].port = htons(epSet.eps[i].port);
}
rpcSendRedirectRsp(pReq->handle, &epSet);
}
int32_t dmInit(SMgmtWrapper *pWrapper) {
SDnode *pDnode = pWrapper->pDnode;
SDnodeMgmt *pMgmt = calloc(1, sizeof(SDnodeMgmt));
pMgmt->dnodeId = 0;
pMgmt->dropped = 0;
pMgmt->clusterId = 0;
pMgmt->path = pWrapper->path;
pMgmt->pDnode = pDnode;
memcpy(pMgmt->localEp, pDnode->cfg.localEp, TSDB_EP_LEN);
memcpy(pMgmt->firstEp, pDnode->cfg.firstEp, TSDB_EP_LEN);
taosInitRWLatch(&pMgmt->latch);
pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pMgmt->dnodeHash == NULL) {
dError("node:%s, failed to init dnode hash", pWrapper->name);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (dmReadFile(pMgmt) != 0) {
dError("node:%s, failed to read file since %s", pWrapper->name, terrstr());
return -1;
}
if (pMgmt->dropped) {
dError("node:%s, will not start since its already dropped", pWrapper->name);
return -1;
}
if (dmStartWorker(pMgmt) != 0) {
dError("node:%s, failed to start worker since %s", pWrapper->name, terrstr());
return -1;
}
dndSetStatus(pDnode, DND_STAT_RUNNING);
dmSendStatusReq(pMgmt);
dndReportStartup(pDnode, "TDengine", "initialized successfully");
dInfo("dnode-mgmt is initialized");
return 0;
}
void dmCleanup(SMgmtWrapper *pWrapper) {
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
dmStopWorker(pMgmt);
taosWLockLatch(&pMgmt->latch);
if (pMgmt->pDnodeEps != NULL) {
taosArrayDestroy(pMgmt->pDnodeEps);
pMgmt->pDnodeEps = NULL;
}
if (pMgmt->dnodeHash != NULL) {
taosHashCleanup(pMgmt->dnodeHash);
pMgmt->dnodeHash = NULL;
}
taosWUnLockLatch(&pMgmt->latch);
dInfo("dnode-mgmt is cleaned up");
}
bool dmRequire(SMgmtWrapper *pWrapper) { return true; }
void dmGetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = dmInit;
mgmtFp.closeFp = dmCleanup;
mgmtFp.requiredFp = dmRequire;
mgmtFp.getMsgHandleFp = dmGetMsgHandle;
return mgmtFp;
dmInitMsgHandles(pWrapper);
pWrapper->name = "dnode";
pWrapper->fp = mgmtFp;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dmMgmt.h"
#include "dmWorker.h"
// #include "dmMgmt.h"
#include "dmFile.h"
#include "dmHandle.h"
#include "dndMonitor.h"
// #include "dndBnode.h"
// #include "mm.h"
// #include "dndQnode.h"
// #include "dndSnode.h"
#include "dndTransport.h"
// #include "dndVnodes.h"
#include "dndWorker.h"
// #include "monitor.h"
#if 0
static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg);
static int32_t dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pReq);
static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp);
static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pRsp);
static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pRsp);
void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosRLockLatch(&pMgmt->latch);
SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &dnodeId, sizeof(int32_t));
if (pDnodeEp != NULL) {
if (pPort != NULL) {
*pPort = pDnodeEp->ep.port;
}
if (pFqdn != NULL) {
tstrncpy(pFqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN);
}
if (pEp != NULL) {
snprintf(pEp, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port);
}
}
taosRUnLockLatch(&pMgmt->latch);
}
void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosRLockLatch(&pMgmt->latch);
*pEpSet = pMgmt->mnodeEpSet;
taosRUnLockLatch(&pMgmt->latch);
}
void dmSendRedirectRsp(SDnode *pDnode, SRpcMsg *pReq) {
tmsg_t msgType = pReq->msgType;
SEpSet epSet = {0};
dmGetMnodeEpSet(pDnode, &epSet);
dDebug("RPC %p, req:%s is redirected, num:%d use:%d", pReq->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
if (strcmp(epSet.eps[i].fqdn, pDnode->cfg.localFqdn) == 0 && epSet.eps[i].port == pDnode->cfg.serverPort) {
epSet.inUse = (i + 1) % epSet.numOfEps;
}
epSet.eps[i].port = htons(epSet.eps[i].port);
}
rpcSendRedirectRsp(pReq->handle, &epSet);
}
static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosWLockLatch(&pMgmt->latch);
pMgmt->mnodeEpSet = *pEpSet;
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
}
taosWUnLockLatch(&pMgmt->latch);
}
void dmSendStatusReq(SDnode *pDnode) {
SStatusReq req = {0};
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosRLockLatch(&pMgmt->latch);
req.sver = tsVersion;
req.dver = pMgmt->dver;
req.dnodeId = pMgmt->dnodeId;
req.clusterId = pMgmt->clusterId;
req.rebootTime = pMgmt->rebootTime;
req.updateTime = pMgmt->updateTime;
req.numOfCores = tsNumOfCores;
req.numOfSupportVnodes = pDnode->cfg.numOfSupportVnodes;
memcpy(req.dnodeEp, pDnode->cfg.localEp, TSDB_EP_LEN);
req.clusterCfg.statusInterval = tsStatusInterval;
req.clusterCfg.checkTime = 0;
char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
memcpy(req.clusterCfg.timezone, tsTimezone, TD_TIMEZONE_LEN);
memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
taosRUnLockLatch(&pMgmt->latch);
#if 0
req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad));
dndGetVnodeLoads(pDnode, req.pVloads);
#endif
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen);
tSerializeSStatusReq(pHead, contLen, &req);
taosArrayDestroy(req.pVloads);
SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527};
pMgmt->statusSent = 1;
dTrace("pDnode:%p, send status req to mnode", pDnode);
dndSendReqToMnode(pDnode, &rpcMsg);
}
static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pMgmt->dnodeId == 0) {
dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
taosWLockLatch(&pMgmt->latch);
pMgmt->dnodeId = pCfg->dnodeId;
pMgmt->clusterId = pCfg->clusterId;
dmWriteFile(pDnode);
taosWUnLockLatch(&pMgmt->latch);
}
}
static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pRsp->code != TSDB_CODE_SUCCESS) {
if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->dropped && pMgmt->dnodeId > 0) {
dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->dnodeId);
pMgmt->dropped = 1;
dmWriteFile(pDnode);
}
} else {
SStatusRsp statusRsp = {0};
if (pRsp->pCont != NULL && pRsp->contLen != 0 &&
tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
pMgmt->dver = statusRsp.dver;
dndUpdateDnodeCfg(pDnode, &statusRsp.dnodeCfg);
dmUpdateDnodeEps(pDnode, statusRsp.pDnodeEps);
}
taosArrayDestroy(statusRsp.pDnodeEps);
}
pMgmt->statusSent = 0;
}
static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pReq) { dError("auth rsp is received, but not supported yet"); }
static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pReq) {
dError("grant rsp is received, but not supported yet");
}
static int32_t dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
dError("config req is received, but not supported yet");
SDCfgDnodeReq *pCfg = pReq->pCont;
return TSDB_CODE_OPS_NOT_SUPPORT;
}
void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("startup req is received");
SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq));
dndGetStartup(pDnode, pStartup);
dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq)};
rpcSendResponse(&rpcRsp);
}
void dmStopMgmt(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
dndCleanupWorker(&pMgmt->mgmtWorker);
dndCleanupWorker(&pMgmt->statusWorker);
if (pMgmt->threadId != NULL) {
taosDestoryThread(pMgmt->threadId);
pMgmt->threadId = NULL;
}
}
void dmCleanupMgmt(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosWLockLatch(&pMgmt->latch);
if (pMgmt->pDnodeEps != NULL) {
taosArrayDestroy(pMgmt->pDnodeEps);
pMgmt->pDnodeEps = NULL;
}
if (pMgmt->dnodeHash != NULL) {
taosHashCleanup(pMgmt->dnodeHash);
pMgmt->dnodeHash = NULL;
}
if (pMgmt->file != NULL) {
free(pMgmt->file);
pMgmt->file = NULL;
}
taosWUnLockLatch(&pMgmt->latch);
dInfo("dnode-mgmt is cleaned up");
}
void dmProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pEpSet && pEpSet->numOfEps > 0 && pMsg->msgType == TDMT_MND_STATUS_RSP) {
dndUpdateMnodeEpSet(pDnode, pEpSet);
}
SDnodeWorker *pWorker = &pMgmt->mgmtWorker;
if (pMsg->msgType == TDMT_MND_STATUS_RSP) {
pWorker = &pMgmt->statusWorker;
}
if (dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)) != 0) {
if (pMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY};
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
}
#endif
void dmStopMgmt(SDnode *pDnode) {}
void dmCleanupMgmt(SDnode *pDnode){}
void dmSendStatusReq(SDnode *pDnode){}
void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {}
void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq){}
void dmProcessMgmtMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg){}
int32_t dmInit(SMgmtWrapper *pWrapper) {
SDnodeMgmt *pMgmt = calloc(1, sizeof(SDnodeMgmt));
pMgmt->dnodeId = 0;
pMgmt->dropped = 0;
pMgmt->clusterId = 0;
pMgmt->path = pWrapper->path;
taosInitRWLatch(&pMgmt->latch);
pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pMgmt->dnodeHash == NULL) {
dError("node:%s, failed to init dnode hash", pWrapper->name);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (dmReadFile(pMgmt) != 0) {
dError("node:%s, failed to read file since %s", pWrapper->name, terrstr());
return -1;
}
if (pMgmt->dropped) {
dError("node:%s, will not start since its already dropped", pWrapper->name);
return -1;
}
if (dmStartWorker(pMgmt) != 0) {
dError("node:%s, failed to start worker since %s", pWrapper->name, terrstr());
return -1;
}
dInfo("dnode-mgmt is initialized");
return 0;
// dndSetStatus(pDnode, DND_STAT_RUNNING);
// dmSendStatusReq(pDnode);
// dndReportStartup(pDnode, "TDengine", "initialized successfully");
#if 0
if (dndInitTrans(pDnode) != 0) {
dError("failed to init transport since %s", terrstr());
return -1;
}
SDiskCfg dCfg = {0};
tstrncpy(dCfg.dir, pDnode->cfg.dataDir, TSDB_FILENAME_LEN);
dCfg.level = 0;
dCfg.primary = 1;
SDiskCfg *pDisks = pDnode->cfg.pDisks;
int32_t numOfDisks = pDnode->cfg.numOfDisks;
if (numOfDisks <= 0 || pDisks == NULL) {
pDisks = &dCfg;
numOfDisks = 1;
}
pDnode->pTfs = tfsOpen(pDisks, numOfDisks);
if (pDnode->pTfs == NULL) {
dError("failed to init tfs since %s", terrstr());
return -1;
}
#endif
}
void dmCleanup(SMgmtWrapper *pWrapper) {
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
dmStopWorker(pMgmt);
taosWLockLatch(&pMgmt->latch);
if (pMgmt->pDnodeEps != NULL) {
taosArrayDestroy(pMgmt->pDnodeEps);
pMgmt->pDnodeEps = NULL;
}
if (pMgmt->dnodeHash != NULL) {
taosHashCleanup(pMgmt->dnodeHash);
pMgmt->dnodeHash = NULL;
}
taosWUnLockLatch(&pMgmt->latch);
dInfo("dnode-mgmt is cleaned up");
}
bool dmRequire(SMgmtWrapper *pWrapper) { return true; }
......@@ -16,7 +16,12 @@
#define _DEFAULT_SOURCE
#include "dmWorker.h"
#include "dmHandle.h"
#include "dndWorker.h"
#include "bmInt.h"
#include "mmInt.h"
#include "qmInt.h"
#include "smInt.h"
#include "vmInt.h"
static void *dnodeThreadRoutine(void *param) {
SDnodeMgmt *pMgmt = param;
......@@ -37,7 +42,7 @@ static void *dnodeThreadRoutine(void *param) {
float statusInterval = (curTime - lastStatusTime) / 1000.0f;
if (statusInterval >= tsStatusInterval && !pMgmt->statusSent) {
dmSendStatusReq(pDnode);
dmSendStatusReq(pMgmt);
lastStatusTime = curTime;
}
......@@ -52,61 +57,60 @@ static void *dnodeThreadRoutine(void *param) {
static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
int32_t code = 0;
#if 0
switch (pMsg->msgType) {
case TDMT_DND_CREATE_MNODE:
code = mmProcessCreateMnodeReq(pDnode, pMsg);
code = mmProcessCreateReq(pDnode, pMsg);
break;
case TDMT_DND_ALTER_MNODE:
code = mmProcessAlterMnodeReq(pDnode, pMsg);
code = mmProcessAlterReq(pDnode, pMsg);
break;
case TDMT_DND_DROP_MNODE:
code = mmProcessDropMnodeReq(pDnode, pMsg);
code = mmProcessDropReq(pDnode, pMsg);
break;
case TDMT_DND_CREATE_QNODE:
code = dndProcessCreateQnodeReq(pDnode, pMsg);
code = qmProcessCreateReq(pDnode, pMsg);
break;
case TDMT_DND_DROP_QNODE:
code = dndProcessDropQnodeReq(pDnode, pMsg);
code = qmProcessDropReq(pDnode, pMsg);
break;
case TDMT_DND_CREATE_SNODE:
code = dndProcessCreateSnodeReq(pDnode, pMsg);
code = smProcessCreateReq(pDnode, pMsg);
break;
case TDMT_DND_DROP_SNODE:
code = dndProcessDropSnodeReq(pDnode, pMsg);
code = smProcessDropReq(pDnode, pMsg);
break;
case TDMT_DND_CREATE_BNODE:
code = dndProcessCreateBnodeReq(pDnode, pMsg);
code = bmProcessCreateReq(pDnode, pMsg);
break;
case TDMT_DND_DROP_BNODE:
code = dndProcessDropBnodeReq(pDnode, pMsg);
code = bmProcessDropReq(pDnode, pMsg);
break;
case TDMT_DND_CONFIG_DNODE:
code = dndProcessConfigDnodeReq(pDnode, pMsg);
code = dmProcessConfigReq(pDnode, pMsg);
break;
case TDMT_MND_STATUS_RSP:
dndProcessStatusRsp(pDnode, pMsg);
dmProcessStatusRsp(pDnode, pMsg);
break;
case TDMT_MND_AUTH_RSP:
dndProcessAuthRsp(pDnode, pMsg);
dmProcessAuthRsp(pDnode, pMsg);
break;
case TDMT_MND_GRANT_RSP:
dndProcessGrantRsp(pDnode, pMsg);
dmProcessGrantRsp(pDnode, pMsg);
break;
case TDMT_DND_CREATE_VNODE:
code = dndProcessCreateVnodeReq(pDnode, pMsg);
code = vmProcessCreateVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_ALTER_VNODE:
code = dndProcessAlterVnodeReq(pDnode, pMsg);
code = vmProcessAlterVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_DROP_VNODE:
code = dndProcessDropVnodeReq(pDnode, pMsg);
code = vmProcessDropVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_SYNC_VNODE:
code = dndProcessSyncVnodeReq(pDnode, pMsg);
code = vmProcessSyncVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_COMPACT_VNODE:
code = dndProcessCompactVnodeReq(pDnode, pMsg);
code = vmProcessCompactVnodeReq(pDnode, pMsg);
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
......@@ -114,7 +118,7 @@ static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
dError("RPC %p, dnode msg:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType));
break;
}
#endif
if (pMsg->msgType & 1u) {
if (code != 0) code = terrno;
SRpcMsg rsp = {.code = code, .handle = pMsg->handle, .ahandle = pMsg->ahandle};
......@@ -156,3 +160,21 @@ void dmStopWorker(SDnodeMgmt *pMgmt) {
pMgmt->threadId = NULL;
}
}
void dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
SDnodeWorker *pWorker = &pMgmt->mgmtWorker;
if (pMsg->rpcMsg.msgType == TDMT_MND_STATUS_RSP) {
pWorker = &pMgmt->statusWorker;
}
if (dndWriteMsgToWorker(pWorker, pMsg, sizeof(SNodeMsg)) != 0) {
if (pMsg->rpcMsg.msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .code = TSDB_CODE_OUT_OF_MEMORY};
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
}
}
\ No newline at end of file
......@@ -25,6 +25,10 @@ extern "C" {
void mmInitMsgHandles(SMgmtWrapper *pWrapper);
SMsgHandle mmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex);
int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey);
int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo);
......
......@@ -45,13 +45,15 @@ typedef struct SMnodeMgmt {
// interface
SMgmtFp mmGetMgmtFp();
void mmGetMgmtFp(SMgmtWrapper *pMgmt);
int32_t mmInit(SDnode *pDnode);
void mmCleanup(SDnode *pDnode);
int32_t mmProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
// mmHandle.h
int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey);
int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
......
......@@ -31,9 +31,9 @@ int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void mmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void mmProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void mmProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void mmProcessReadMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -18,9 +18,9 @@
#include "mmWorker.h"
#if 0
#include "dmMgmt.h"
#include "dmInt.h"
int32_t mmProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) {
SDCreateMnodeReq createReq = {0};
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......@@ -52,7 +52,7 @@ int32_t mmProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return mmOpen(pDnode, &option);
}
int32_t mmProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pReq) {
SDAlterMnodeReq alterReq = {0};
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......@@ -86,7 +86,7 @@ int32_t mmProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return code;
}
int32_t mmProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) {
SDDropMnodeReq dropReq = {0};
if (tDeserializeSMCreateDropMnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......@@ -142,6 +142,10 @@ int32_t mmGetUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char
#endif
int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;}
int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;}
int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;}
static void mmSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
......
......@@ -19,14 +19,16 @@
bool mmRequireNode(SMgmtWrapper *pWrapper) { return false; }
SMgmtFp mmGetMgmtFp() {
void mmGetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = NULL;
mgmtFp.closeFp = NULL;
mgmtFp.requiredFp = mmRequireNode;
mgmtFp.getMsgHandleFp = mmGetMsgHandle;
return mgmtFp;
mmInitMsgHandles(pWrapper);
pWrapper->name = "mnode";
pWrapper->fp = mgmtFp;
}
int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
......
......@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "mmInt.h"
#include "dmMgmt.h"
#include "dmInt.h"
#include "dndTransport.h"
#if 0
......
......@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "mmInt.h"
#include "dmMgmt.h"
#include "dmInt.h"
#include "dndTransport.h"
#include "dndWorker.h"
......@@ -268,6 +268,6 @@ static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg) {
#endif
void mmProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {}
void mmProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {}
void mmProcessReadMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {}
\ No newline at end of file
void mmProcessWriteMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {}
void mmProcessSyncMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {}
void mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {}
\ No newline at end of file
......@@ -25,6 +25,9 @@ extern "C" {
void qmInitMsgHandles(SMgmtWrapper *pWrapper);
SMsgHandle qmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex);
int32_t qmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t qmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
#ifdef __cplusplus
}
#endif
......
......@@ -37,15 +37,17 @@ typedef struct SQnodeMgmt {
bool singleProc;
} SQnodeMgmt;
SMgmtFp qmGetMgmtFp();
void qmGetMgmtFp(SMgmtWrapper *pMgmt);
int32_t dndInitQnode(SDnode *pDnode);
void dndCleanupQnode(SDnode *pDnode);
void dndProcessQnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessQnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dndProcessCreateQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t dndProcessDropQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
// qmHandle.h
int32_t qmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t qmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
#ifdef __cplusplus
}
......
......@@ -17,6 +17,9 @@
#include "qmHandle.h"
#include "qmWorker.h"
int32_t qmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;}
int32_t qmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg){return 0;}
void qmInitMsgHandles(SMgmtWrapper *pWrapper) {
}
......
......@@ -19,11 +19,14 @@
bool qmRequireNode(SMgmtWrapper *pWrapper) { return false; }
SMgmtFp qmGetMgmtFp() {
void qmGetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = NULL;
mgmtFp.closeFp = NULL;
mgmtFp.requiredFp = qmRequireNode;
mgmtFp.getMsgHandleFp = qmGetMsgHandle;
return mgmtFp;
// qmInitMsgHandles(pWrapper);
pWrapper->name = "qnode";
pWrapper->fp = mgmtFp;
}
......@@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE
// #include "dndQnode.h"
// #include "dmMgmt.h"
// #include "dmInt.h"
// #include "dndTransport.h"
// #include "dndWorker.h"
......@@ -267,7 +267,7 @@ static int32_t dndDropQnode(SDnode *pDnode) {
return 0;
}
int32_t dndProcessCreateQnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
int32_t qmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) {
SDCreateQnodeReq createReq = {0};
if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......@@ -283,7 +283,7 @@ int32_t dndProcessCreateQnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
}
}
int32_t dndProcessDropQnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
int32_t qmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) {
SDDropQnodeReq dropReq = {0};
if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......
......@@ -37,14 +37,14 @@ typedef struct SSnodeMgmt {
} SSnodeMgmt;
SMgmtFp smGetMgmtFp();
void smGetMgmtFp(SMgmtWrapper *pMgmt);
int32_t dndInitSnode(SDnode *pDnode);
void dndCleanupSnode(SDnode *pDnode);
void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t smProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t smProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
#ifdef __cplusplus
}
......
......@@ -17,6 +17,9 @@
#include "smHandle.h"
#include "smWorker.h"
int32_t smProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;}
int32_t smProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;}
void smInitMsgHandles(SMgmtWrapper *pWrapper) {
}
......
......@@ -20,11 +20,14 @@
bool smRequireNode(SMgmtWrapper *pWrapper) { return false; }
SMgmtFp smGetMgmtFp() {
void smGetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = NULL;
mgmtFp.closeFp = NULL;
mgmtFp.requiredFp = smRequireNode;
mgmtFp.getMsgHandleFp = smGetMsgHandle;
return mgmtFp;
// smInitMsgHandles(pWrapper);
pWrapper->name = "snode";
pWrapper->fp = mgmtFp;
}
......@@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE
// #include "dndSnode.h"
// #include "dmMgmt.h"
// #include "dmInt.h"
// #include "dndTransport.h"
// #include "dndWorker.h"
......@@ -261,7 +261,7 @@ static int32_t dndDropSnode(SDnode *pDnode) {
return 0;
}
int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
int32_t smProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) {
SDCreateSnodeReq createReq = {0};
if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......@@ -277,7 +277,7 @@ int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
}
}
int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
int32_t smProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) {
SDDropSnodeReq dropReq = {0};
if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......
......@@ -25,6 +25,12 @@ extern "C" {
void vmInitMsgHandles(SMgmtWrapper *pWrapper);
SMsgHandle vmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex);
int32_t vmProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t vmProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t vmProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t vmProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t vmProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
#ifdef __cplusplus
}
......
......@@ -48,22 +48,22 @@ typedef struct SVnodesMgmt {
bool singleProc;
} SVnodesMgmt;
SMgmtFp vmGetMgmtFp() ;
void vmGetMgmtFp(SMgmtWrapper *pMgmt) ;
int32_t dndInitVnodes(SDnode *pDnode);
void dndCleanupVnodes(SDnode *pDnode);
void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads);
void vmGetVnodeLoads(SDnode *pDnode, SArray *pLoads);
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t vmProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t vmProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t vmProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t vmProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t vmProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t vmGetTfsMonitorInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo);
void vmGetVndMonitorInfo(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo);
......
......@@ -31,10 +31,10 @@ int32_t vmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
void vmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void vmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void vmProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void vmProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void vmProcessQueryMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void vmProcessFetchMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void vmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -17,6 +17,14 @@
#include "vmHandle.h"
#include "vmWorker.h"
int32_t vmProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq){return 0;}
int32_t vmProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq){return 0;}
int32_t vmProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq){return 0;}
int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq){return 0;}
int32_t vmProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq){return 0;}
int32_t vmProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq){return 0;}
static void vmSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
......
......@@ -19,6 +19,23 @@
#include "vmMgmt.h"
static int32_t vmInit(SMgmtWrapper *pWrapper) {
// SDiskCfg dCfg = {0};
// tstrncpy(dCfg.dir, pDnode->cfg.dataDir, TSDB_FILENAME_LEN);
// dCfg.level = 0;
// dCfg.primary = 1;
// SDiskCfg *pDisks = pDnode->cfg.pDisks;
// int32_t numOfDisks = pDnode->cfg.numOfDisks;
// if (numOfDisks <= 0 || pDisks == NULL) {
// pDisks = &dCfg;
// numOfDisks = 1;
// }
// pDnode->pTfs = tfsOpen(pDisks, numOfDisks);
// if (pDnode->pTfs == NULL) {
// dError("failed to init tfs since %s", terrstr());
// return -1;
// }
SVnodeOpt vnodeOpt = {0};
vnodeOpt.nthreads = tsNumOfCommitThreads;
vnodeOpt.putReqToVQueryQFp = dndPutReqToVQueryQ;
......@@ -45,11 +62,14 @@ static void vmCleanup(SMgmtWrapper *pWrapper) {
static bool vmRequire(SMgmtWrapper *pWrapper) { return false; }
SMgmtFp vmGetMgmtFp() {
void vmGetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = vmInit;
mgmtFp.closeFp = vmCleanup;
mgmtFp.requiredFp = vmRequire;
mgmtFp.getMsgHandleFp = vmGetMsgHandle;
return mgmtFp;
vmInitMsgHandles(pWrapper);
pWrapper->name = "vnodes";
pWrapper->fp = mgmtFp;
}
......@@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE
#include "vmMgmt.h"
#include "dmMgmt.h"
#include "dmInt.h"
#include "dndTransport.h"
// #include "sync.h"
......@@ -539,7 +539,7 @@ static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeReq *pCreate, SWra
pCfg->vgVersion = pCreate->vgVersion;
}
int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
int32_t vmProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SCreateVnodeReq createReq = {0};
if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......@@ -597,7 +597,7 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return 0;
}
int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
int32_t vmProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SAlterVnodeReq alterReq = {0};
if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......@@ -638,7 +638,7 @@ int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return code;
}
int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
int32_t vmProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SDropVnodeReq dropReq = {0};
if (tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......@@ -668,7 +668,7 @@ int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return 0;
}
int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
int32_t vmProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SSyncVnodeReq syncReq = {0};
tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &syncReq);
......@@ -691,7 +691,7 @@ int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return 0;
}
int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
int32_t vmProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SCompactVnodeReq compatcReq = {0};
tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &compatcReq);
......@@ -979,7 +979,7 @@ void dndCleanupVnodes(SDnode *pDnode) {
dInfo("dnode-vnodes is cleaned up");
}
void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) {
void vmGetVnodeLoads(SDnode *pDnode, SArray *pLoads) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SVnodesStat *pStat = &pMgmt->stat;
int32_t totalVnodes = 0;
......@@ -1078,4 +1078,6 @@ void vmGetVndMonitorInfo(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo) {
pInfo->errors = tsNumOfErrorLogs;
pInfo->vnodes_num = pStat->totalVnodes;
pInfo->masters = pStat->masterNum;
}
\ No newline at end of file
}
void vmGetVnodeLoads(SDnode *pDnode, SArray *pLoads) {}
\ No newline at end of file
......@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "vmWorker.h"
void vmProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg){}
void vmProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg){}
void vmProcessQueryMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg){}
void vmProcessFetchMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg){}
\ No newline at end of file
void vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg){}
void vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg){}
void vmProcessQueryMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg){}
void vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg){}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册