提交 c8a4d491 编写于 作者: S Shengliang Guan

shm

上级 e7ed5e3d
......@@ -22,6 +22,21 @@
extern "C" {
#endif
typedef struct SBnodeMgmt {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SBnode *pBnode;
SRWLatch latch;
SDnodeWorker writeWorker;
//
SMsgHandle msgHandles[TDMT_MAX];
SProcObj *pProcess;
bool singleProc;
} SBnodeMgmt;
SMgmtFp bmGetMgmtFp();
int32_t dndInitBnode(SDnode *pDnode);
......
......@@ -179,9 +179,9 @@ static void dndBuildBnodeOption(SDnode *pDnode, SBnodeOpt *pOption) {
pOption->pDnode = pDnode;
pOption->sendReqToDnodeFp = dndSendReqToDnode;
pOption->sendReqToMnodeFp = dndSendReqToMnode;
pOption->sendRedirectRspFp = dndSendRedirectRsp;
pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode);
pOption->sendRedirectRspFp = dmSendRedirectRsp;
pOption->dnodeId = dmGetDnodeId(pDnode);
pOption->clusterId = dmGetClusterId(pDnode);
pOption->sver = tsVersion;
}
......@@ -268,7 +268,7 @@ int32_t dndProcessCreateBnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return -1;
}
if (createReq.dnodeId != dndGetDnodeId(pDnode)) {
if (createReq.dnodeId != dmGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_BNODE_INVALID_OPTION;
dError("failed to create bnode since %s", terrstr());
return -1;
......@@ -284,7 +284,7 @@ int32_t dndProcessDropBnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return -1;
}
if (dropReq.dnodeId != dndGetDnodeId(pDnode)) {
if (dropReq.dnodeId != dmGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_BNODE_INVALID_OPTION;
dError("failed to drop bnode since %s", terrstr());
return -1;
......
......@@ -16,10 +16,6 @@
#ifndef _TD_DND_INT_H_
#define _TD_DND_INT_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "cJSON.h"
......@@ -47,6 +43,11 @@ extern "C" {
#include "snode.h"
#include "tfs.h"
#include "vnode.h"
#include "monitor.h"
#ifdef __cplusplus
extern "C" {
#endif
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
......@@ -61,11 +62,15 @@ typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus;
typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType;
typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EEnvStat;
typedef struct SDnodeMgmt SDnodeMgmt;
typedef struct SMgmtFp SMgmtFp;
typedef struct SMgmtWrapper SMgmtWrapper;
typedef struct SMsgHandle SMsgHandle;
typedef struct SDnodeMgmt SDnodeMgmt;
typedef struct SVnodesMgmt SVnodesMgmt;
typedef struct SMnodeMgmt SMnodeMgmt;
typedef struct SQnodeMgmt SQnodeMgmt;
typedef struct SSnodeMgmt SSnodeMgmt;
typedef struct SBnodeMgmt SBnodeMgmt;
typedef void (*RpcMsgFp)(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEps);
typedef void (*NodeMsgFp)(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
......@@ -80,6 +85,24 @@ typedef struct SMsgHandle {
SMgmtWrapper *pWrapper;
} SMsgHandle;
typedef struct SMgmtFp {
OpenNodeFp openFp;
CloseNodeFp closeFp;
RequireNodeFp requiredFp;
GetMsgHandleFp getMsgHandleFp;
} SMgmtFp;
typedef struct SMgmtWrapper {
const char *name;
char *path;
bool required;
EProcType procType;
SProcObj *pProc;
void *pMgmt;
SDnode *pDnode;
SMgmtFp fp;
} SMgmtWrapper;
typedef struct {
EWorkerType type;
const char *name;
......@@ -94,144 +117,55 @@ typedef struct {
};
} SDnodeWorker;
typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SMnode *pMnode;
SRWLatch latch;
SDnodeWorker readWorker;
SDnodeWorker writeWorker;
SDnodeWorker syncWorker;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
//
SMsgHandle msgHandles[TDMT_MAX];
SProcObj *pProcess;
bool singleProc;
} SMnodeMgmt;
typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SQnode *pQnode;
SRWLatch latch;
SDnodeWorker queryWorker;
SDnodeWorker fetchWorker;
//
SMsgHandle msgHandles[TDMT_MAX];
SProcObj *pProcess;
bool singleProc;
} SQnodeMgmt;
typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SSnode *pSnode;
SRWLatch latch;
SDnodeWorker writeWorker;
//
SMsgHandle msgHandles[TDMT_MAX];
SProcObj *pProcess;
bool singleProc;
} SSnodeMgmt;
typedef struct {
int32_t openVnodes;
int32_t totalVnodes;
int32_t masterNum;
int64_t numOfSelectReqs;
int64_t numOfInsertReqs;
int64_t numOfInsertSuccessReqs;
int64_t numOfBatchInsertReqs;
int64_t numOfBatchInsertSuccessReqs;
} SVnodesStat;
typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SBnode *pBnode;
SRWLatch latch;
SDnodeWorker writeWorker;
//
SMsgHandle msgHandles[TDMT_MAX];
SProcObj *pProcess;
bool singleProc;
} SBnodeMgmt;
typedef struct {
SVnodesStat stat;
SHashObj *hash;
SRWLatch latch;
SQWorkerPool queryPool;
SFWorkerPool fetchPool;
SWWorkerPool syncPool;
SWWorkerPool writePool;
//
SMsgHandle msgHandles[TDMT_MAX];
SProcObj *pProcess;
bool singleProc;
} SVnodesMgmt;
typedef struct {
void *serverRpc;
void *clientRpc;
SMsgHandle msgHandles[TDMT_MAX];
} STransMgmt;
typedef struct SMgmtFp {
OpenNodeFp openFp;
CloseNodeFp closeFp;
RequireNodeFp requiredFp;
GetMsgHandleFp getMsgHandleFp;
} SMgmtFp;
typedef struct SMgmtWrapper {
const char *name;
char *path;
bool required;
EProcType procType;
SProcObj *pProc;
void *pMgmt;
SDnode *pDnode;
SMgmtFp fp;
} SMgmtWrapper;
typedef struct SDnode {
int64_t rebootTime;
EDndStatus status;
EDndEvent event;
EProcType procType;
SDndCfg cfg;
SStartupReq startup;
TdFilePtr pLockFile;
STransMgmt tmgmt;
STfs *pTfs;
STransMgmt trans;
SMgmtFp fps[NODE_MAX];
SMgmtWrapper wrappers[NODE_MAX];
} SDnode;
// dndInt.h
int32_t dndInit();
void dndCleanup();
EDndStatus dndGetStatus(SDnode *pDnode);
void dndSetStatus(SDnode *pDnode, EDndStatus stat);
const char *dndStatStr(EDndStatus stat);
void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc);
void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup);
TdFilePtr dndCheckRunning(char *dataDir);
SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType);
SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType) ;
// dndMonitor.h
void dndSendMonitorReport(SDnode *pDnode);
// dndNode.h
SDnode *dndCreate(SDndCfg *pCfg);
void dndClose(SDnode *pDnode);
int32_t dndRun(SDnode *pDnode);
void dndeHandleEvent(SDnode *pDnode, EDndEvent event);
void dndProcessRpcMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet);
SMgmtFp dmGetMgmtFp();
// dndTransport.h
int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pRpcMsg);
// dndWorker.h
int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum,
int32_t maxNum, void *queueFp);
void dndCleanupWorker(SDnodeWorker *pWorker);
int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen);
#ifdef __cplusplus
}
......
......@@ -22,6 +22,10 @@
extern "C" {
#endif
SDnode *dndCreate(SDndCfg *pCfg);
void dndClose(SDnode *pDnode);
int32_t dndRun(SDnode *pDnode);
void dndeHandleEvent(SDnode *pDnode, EDndEvent event);
void dndProcessRpcMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet);
#ifdef __cplusplus
......
......@@ -27,11 +27,6 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, c
void dndCleanupWorker(SDnodeWorker *pWorker);
int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen);
void dndProcessMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus
}
#endif
......
......@@ -15,9 +15,6 @@
#define _DEFAULT_SOURCE
#include "dndInt.h"
#include "dmHandle.h"
#include "dndTransport.h"
#include "vmInt.h"
static int8_t once = DND_ENV_INIT;
......@@ -38,22 +35,6 @@ int32_t dndInit() {
return -1;
}
if (walInit() != 0) {
dError("failed to init wal since %s", terrstr());
dndCleanup();
return -1;
}
SVnodeOpt vnodeOpt = {0};
vnodeOpt.nthreads = tsNumOfCommitThreads;
vnodeOpt.putReqToVQueryQFp = dndPutReqToVQueryQ;
vnodeOpt.sendReqToDnodeFp = dndSendReqToDnode;
if (vnodeInit(&vnodeOpt) != 0) {
dError("failed to init vnode since %s", terrstr());
dndCleanup();
return -1;
}
SMonCfg monCfg = {0};
monCfg.maxLogs = tsMonitorMaxLogs;
monCfg.port = tsMonitorPort;
......@@ -76,10 +57,7 @@ void dndCleanup() {
}
monCleanup();
vnodeCleanup();
walCleanUp();
rpcCleanup();
taosStopCacheRefreshWorker();
dInfo("dnode env is cleaned up");
}
......@@ -141,8 +119,3 @@ TdFilePtr dndCheckRunning(char *dataDir) {
return pFile;
}
void dndeHandleEvent(SDnode *pDnode, EDndEvent event) {
dInfo("dnode object receive event %d, data:%p", event, pDnode);
pDnode->event = event;
}
......@@ -15,7 +15,9 @@
#define _DEFAULT_SOURCE
#include "dndMonitor.h"
#include "dmMgmt.h"
#include "dmInt.h"
#include "mmInt.h"
#include "vmInt.h"
static int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo) {
tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name));
......@@ -23,23 +25,18 @@ static int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo) {
tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name));
pInfo->tempdir.size = tsTempSpace.size;
if (pDnode->pTfs != NULL) {
return tfsGetMonitorInfo(NULL, pInfo);
}
return 0;
return vmGetTfsMonitorInfo(dndGetWrapper(pDnode, VNODES), pInfo);
}
static void dndGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) {
pInfo->dnode_id = dndGetDnodeId(pDnode);
pInfo->dnode_id = dmGetDnodeId(pDnode);
tstrncpy(pInfo->dnode_ep, tsLocalEp, TSDB_EP_LEN);
pInfo->cluster_id = dndGetClusterId(pDnode);
pInfo->cluster_id = dmGetClusterId(pDnode);
pInfo->protocol = 1;
}
static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
#if 0
pInfo->uptime = (taosGetTimestampMs() - pDnode->dmgmt.rebootTime) / (86400000.0f);
pInfo->uptime = (taosGetTimestampMs() - pDnode->rebootTime) / (86400000.0f);
taosGetCpuUsage(&pInfo->cpu_engine, &pInfo->cpu_system);
pInfo->cpu_cores = tsNumOfCores;
taosGetProcMemory(&pInfo->mem_engine);
......@@ -51,17 +48,8 @@ static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
taosGetCardInfo(&pInfo->net_in, &pInfo->net_out);
taosGetProcIO(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk);
SVnodesStat *pStat = &pDnode->vmgmt.stat;
pInfo->req_select = pStat->numOfSelectReqs;
pInfo->req_insert = pStat->numOfInsertReqs;
pInfo->req_insert_success = pStat->numOfInsertSuccessReqs;
pInfo->req_insert_batch = pStat->numOfBatchInsertReqs;
pInfo->req_insert_batch_success = pStat->numOfBatchInsertSuccessReqs;
pInfo->errors = tsNumOfErrorLogs;
pInfo->vnodes_num = pStat->totalVnodes;
pInfo->masters = pStat->masterNum;
pInfo->has_mnode = pDnode->mmgmt.deployed;
#endif
vmGetVndMonitorInfo(dndGetWrapper(pDnode, VNODES), pInfo);
pInfo->has_mnode = (dndGetWrapper(pDnode, MNODE)->required);
}
void dndSendMonitorReport(SDnode *pDnode) {
......@@ -74,7 +62,7 @@ void dndSendMonitorReport(SDnode *pDnode) {
SMonBasicInfo basicInfo = {0};
dndGetMonitorBasicInfo(pDnode, &basicInfo);
monSetBasicInfo(pMonitor, &basicInfo);
#if 0
SMonClusterInfo clusterInfo = {0};
SMonVgroupInfo vgroupInfo = {0};
SMonGrantInfo grantInfo = {0};
......@@ -97,7 +85,7 @@ void dndSendMonitorReport(SDnode *pDnode) {
taosArrayDestroy(clusterInfo.mnodes);
taosArrayDestroy(vgroupInfo.vgroups);
taosArrayDestroy(diskInfo.datadirs);
#endif
monSendReport(pMonitor);
monCleanupMonitorInfo(pMonitor);
}
\ No newline at end of file
......@@ -74,6 +74,7 @@ SDnode *dndCreate(SDndCfg *pCfg) {
}
dndSetStatus(pDnode, DND_STAT_INIT);
pDnode->rebootTime = taosGetTimestampMs();
pDnode->pLockFile = dndCheckRunning(pCfg->dataDir);
if (pDnode->pLockFile == NULL) {
goto _OVER;
......@@ -257,4 +258,10 @@ int32_t dndRun(SDnode *pDnode) {
return 0;
}
void dndeHandleEvent(SDnode *pDnode, EDndEvent event) {
dInfo("dnode object receive event %d, data:%p", event, pDnode);
pDnode->event = event;
}
void dndProcessRpcMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet) {}
\ No newline at end of file
......@@ -24,7 +24,7 @@
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
SDnode *pDnode = parent;
STransMgmt *pMgmt = &pDnode->tmgmt;
STransMgmt *pMgmt = &pDnode->trans;
tmsg_t msgType = pRsp->msgType;
......@@ -47,7 +47,7 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
}
int32_t dndInitClient(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->tmgmt;
STransMgmt *pMgmt = &pDnode->trans;
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
......@@ -77,7 +77,7 @@ int32_t dndInitClient(SDnode *pDnode) {
}
void dndCleanupClient(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->tmgmt;
STransMgmt *pMgmt = &pDnode->trans;
if (pMgmt->clientRpc) {
rpcClose(pMgmt->clientRpc);
pMgmt->clientRpc = NULL;
......@@ -87,12 +87,12 @@ void dndCleanupClient(SDnode *pDnode) {
static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
SDnode *pDnode = param;
STransMgmt *pMgmt = &pDnode->tmgmt;
STransMgmt *pMgmt = &pDnode->trans;
tmsg_t msgType = pReq->msgType;
if (msgType == TDMT_DND_NETWORK_TEST) {
dTrace("RPC %p, network test req will be processed, app:%p", pReq->handle, pReq->ahandle);
dndProcessStartupReq(pDnode, pReq);
dmProcessStartupReq(pDnode, pReq);
return;
}
......@@ -131,10 +131,10 @@ static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
}
static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRpcRsp) {
STransMgmt *pMgmt = &pDnode->tmgmt;
STransMgmt *pMgmt = &pDnode->trans;
SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet);
dmGetMnodeEpSet(pDnode, &epSet);
rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp);
}
......@@ -208,7 +208,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
}
int32_t dndInitServer(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->tmgmt;
STransMgmt *pMgmt = &pDnode->trans;
int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
if (numOfThreads < 1) {
......@@ -238,7 +238,7 @@ int32_t dndInitServer(SDnode *pDnode) {
}
void dndCleanupServer(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->tmgmt;
STransMgmt *pMgmt = &pDnode->trans;
if (pMgmt->serverRpc) {
rpcClose(pMgmt->serverRpc);
pMgmt->serverRpc = NULL;
......@@ -247,7 +247,7 @@ void dndCleanupServer(SDnode *pDnode) {
}
int32_t dndSetMsgHandle(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->tmgmt;
STransMgmt *pMgmt = &pDnode->trans;
for (ENodeType nodeType = 0; nodeType < NODE_MAX; ++nodeType) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[nodeType];
......@@ -274,7 +274,7 @@ int32_t dndSetMsgHandle(SDnode *pDnode) {
}
int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq) {
STransMgmt *pMgmt = &pDnode->tmgmt;
STransMgmt *pMgmt = &pDnode->trans;
if (pMgmt->clientRpc == NULL) {
terrno = TSDB_CODE_DND_OFFLINE;
return -1;
......@@ -286,6 +286,6 @@ int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq) {
int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pReq) {
SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet);
dmGetMnodeEpSet(pDnode, &epSet);
return dndSendReqToDnode(pDnode, &epSet, pReq);
}
......@@ -24,7 +24,7 @@ extern "C" {
int32_t dmReadFile(SDnodeMgmt *pMgmt);
int32_t dmWriteFile(SDnodeMgmt *pMgmt);
void dndUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps);
void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps);
#ifdef __cplusplus
}
......
......@@ -22,12 +22,11 @@
extern "C" {
#endif
void dndInitMsgHandles(SMgmtWrapper *pWrapper);
void dmInitMsgHandles(SMgmtWrapper *pWrapper);
SMsgHandle dmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex);
void dndSendStatusReq(SDnode *pDnode);
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
void dmSendStatusReq(SDnode *pDnode);
void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -27,7 +27,6 @@ typedef struct SDnodeMgmt {
int32_t dropped;
int64_t clusterId;
int64_t dver;
int64_t rebootTime;
int64_t updateTime;
int8_t statusSent;
SEpSet mnodeEpSet;
......@@ -42,9 +41,16 @@ typedef struct SDnodeMgmt {
SDnode *pDnode;
} SDnodeMgmt;
// dmFile.h
void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet);
// dmHandle.h
void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
// dmInt.h
SMgmtFp dmGetMgmtFp();
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet);
int32_t dmGetDnodeId(SDnode *pDnode);
int64_t dmGetClusterId(SDnode *pDnode);
#ifdef __cplusplus
}
......
......@@ -22,18 +22,18 @@
extern "C" {
#endif
int32_t dndInitMgmt(SDnode *pDnode);
void dndStopMgmt(SDnode *pDnode);
void dndCleanupMgmt(SDnode *pDnode);
int32_t dmInitMgmt(SDnode *pDnode);
void dmStopMgmt(SDnode *pDnode);
void dmCleanupMgmt(SDnode *pDnode);
int32_t dndGetDnodeId(SDnode *pDnode);
int64_t dndGetClusterId(SDnode *pDnode);
void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet);
int32_t dmGetDnodeId(SDnode *pDnode);
int64_t dmGetClusterId(SDnode *pDnode);
void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet);
void dndSendRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg);
void dmSendRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg);
void dmProcessMgmtMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg) ;
void dndProcessMgmtMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg) ;
#ifdef __cplusplus
}
#endif
......
......@@ -210,7 +210,7 @@ int32_t dmWriteFile(SDnodeMgmt *pMgmt) {
return 0;
}
void dndUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps) {
void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps) {
int32_t numOfEps = taosArrayGetSize(pDnodeEps);
if (numOfEps <= 0) return;
......
......@@ -27,29 +27,29 @@ static void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp n
pHandle->rpcMsgFp = dndProcessRpcMsg;
}
void dndInitMsgHandles(SMgmtWrapper *pWrapper) {
void dmInitMsgHandles(SMgmtWrapper *pWrapper) {
// Requests handled by DNODE
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, dmProcessMgmtMsg);
// Requests handled by MNODE
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dndProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmProcessMgmtMsg);
}
SMsgHandle dmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) {
......
......@@ -38,7 +38,7 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp);
static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pRsp);
static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pRsp);
int32_t dndGetDnodeId(SDnode *pDnode) {
int32_t dmGetDnodeId(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosRLockLatch(&pMgmt->latch);
int32_t dnodeId = pMgmt->dnodeId;
......@@ -46,7 +46,7 @@ int32_t dndGetDnodeId(SDnode *pDnode) {
return dnodeId;
}
int64_t dndGetClusterId(SDnode *pDnode) {
int64_t dmGetClusterId(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosRLockLatch(&pMgmt->latch);
int64_t clusterId = pMgmt->clusterId;
......@@ -54,7 +54,7 @@ int64_t dndGetClusterId(SDnode *pDnode) {
return clusterId;
}
void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosRLockLatch(&pMgmt->latch);
......@@ -74,18 +74,18 @@ void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint
taosRUnLockLatch(&pMgmt->latch);
}
void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosRLockLatch(&pMgmt->latch);
*pEpSet = pMgmt->mnodeEpSet;
taosRUnLockLatch(&pMgmt->latch);
}
void dndSendRedirectRsp(SDnode *pDnode, SRpcMsg *pReq) {
void dmSendRedirectRsp(SDnode *pDnode, SRpcMsg *pReq) {
tmsg_t msgType = pReq->msgType;
SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet);
dmGetMnodeEpSet(pDnode, &epSet);
dDebug("RPC %p, req:%s is redirected, num:%d use:%d", pReq->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
......@@ -115,7 +115,7 @@ static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
}
void dndSendStatusReq(SDnode *pDnode) {
void dmSendStatusReq(SDnode *pDnode) {
SStatusReq req = {0};
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
......@@ -183,7 +183,7 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) {
tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
pMgmt->dver = statusRsp.dver;
dndUpdateDnodeCfg(pDnode, &statusRsp.dnodeCfg);
dndUpdateDnodeEps(pDnode, statusRsp.pDnodeEps);
dmUpdateDnodeEps(pDnode, statusRsp.pDnodeEps);
}
taosArrayDestroy(statusRsp.pDnodeEps);
}
......@@ -203,7 +203,7 @@ static int32_t dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return TSDB_CODE_OPS_NOT_SUPPORT;
}
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("startup req is received");
SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq));
......@@ -215,7 +215,7 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
rpcSendResponse(&rpcRsp);
}
void dndStopMgmt(SDnode *pDnode) {
void dmStopMgmt(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
dndCleanupWorker(&pMgmt->mgmtWorker);
dndCleanupWorker(&pMgmt->statusWorker);
......@@ -226,7 +226,7 @@ void dndStopMgmt(SDnode *pDnode) {
}
}
void dndCleanupMgmt(SDnode *pDnode) {
void dmCleanupMgmt(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosWLockLatch(&pMgmt->latch);
......@@ -249,7 +249,7 @@ void dndCleanupMgmt(SDnode *pDnode) {
dInfo("dnode-mgmt is cleaned up");
}
void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
void dmProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pEpSet && pEpSet->numOfEps > 0 && pMsg->msgType == TDMT_MND_STATUS_RSP) {
......@@ -275,25 +275,24 @@ void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
void dndStopMgmt(SDnode *pDnode) {}
void dmStopMgmt(SDnode *pDnode) {}
void dndCleanupMgmt(SDnode *pDnode){}
void dmCleanupMgmt(SDnode *pDnode){}
void dndSendStatusReq(SDnode *pDnode){}
void dmSendStatusReq(SDnode *pDnode){}
void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {}
void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {}
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq){}
void dndProcessMgmtMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg){}
void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq){}
void dmProcessMgmtMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg){}
static int32_t dmInit(SMgmtWrapper *pWrapper) {
SDnodeMgmt *pMgmt = calloc(1, sizeof(SDnodeMgmt));
pMgmt->dnodeId = 0;
pMgmt->rebootTime = taosGetTimestampMs();
pMgmt->dropped = 0;
pMgmt->clusterId = 0;
pMgmt->path = pWrapper->path;
......@@ -325,7 +324,7 @@ static int32_t dmInit(SMgmtWrapper *pWrapper) {
return 0;
// dndSetStatus(pDnode, DND_STAT_RUNNING);
// dndSendStatusReq(pDnode);
// dmSendStatusReq(pDnode);
// dndReportStartup(pDnode, "TDengine", "initialized successfully");
#if 0
......
......@@ -37,7 +37,7 @@ static void *dnodeThreadRoutine(void *param) {
float statusInterval = (curTime - lastStatusTime) / 1000.0f;
if (statusInterval >= tsStatusInterval && !pMgmt->statusSent) {
dndSendStatusReq(pDnode);
dmSendStatusReq(pDnode);
lastStatusTime = curTime;
}
......
......@@ -23,6 +23,27 @@ extern "C" {
#endif
typedef struct SMnodeMgmt {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SMnode *pMnode;
SRWLatch latch;
SDnodeWorker readWorker;
SDnodeWorker writeWorker;
SDnodeWorker syncWorker;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
//
SMsgHandle msgHandles[TDMT_MAX];
SProcObj *pProcess;
bool singleProc;
} SMnodeMgmt;
// interface
SMgmtFp mmGetMgmtFp();
......
......@@ -27,7 +27,7 @@ int32_t mmProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return -1;
}
if (createReq.replica <= 1 || createReq.dnodeId != dndGetDnodeId(pDnode)) {
if (createReq.replica <= 1 || createReq.dnodeId != dmGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to create mnode since %s", terrstr());
return -1;
......@@ -59,7 +59,7 @@ int32_t mmProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return -1;
}
if (alterReq.dnodeId != dndGetDnodeId(pDnode)) {
if (alterReq.dnodeId != dmGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to alter mnode since %s", terrstr());
return -1;
......@@ -93,7 +93,7 @@ int32_t mmProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return -1;
}
if (dropReq.dnodeId != dndGetDnodeId(pDnode)) {
if (dropReq.dnodeId != dmGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to drop mnode since %s", terrstr());
return -1;
......
......@@ -204,11 +204,11 @@ int32_t mmDrop(SDnode *pDnode) {
}
static bool mmDeployRequired(SDnode *pDnode) {
if (dndGetDnodeId(pDnode) > 0) {
if (dmGetDnodeId(pDnode) > 0) {
return false;
}
if (dndGetClusterId(pDnode) > 0) {
if (dmGetClusterId(pDnode) > 0) {
return false;
}
......@@ -223,11 +223,11 @@ static void mmInitOption(SDnode *pDnode, SMnodeOpt *pOption) {
pOption->pDnode = pDnode;
pOption->sendReqToDnodeFp = dndSendReqToDnode;
pOption->sendReqToMnodeFp = dndSendReqToMnode;
pOption->sendRedirectRspFp = dndSendRedirectRsp;
pOption->sendRedirectRspFp = dmSendRedirectRsp;
pOption->putReqToMWriteQFp = mmPutMsgToWriteQueue;
pOption->putReqToMReadQFp = mmPutMsgToReadQueue;
pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode);
pOption->dnodeId = dmGetDnodeId(pDnode);
pOption->clusterId = dmGetClusterId(pDnode);
}
static void mmBuildOptionForDeploy(SDnode *pDnode, SMnodeOpt *pOption) {
......@@ -255,8 +255,8 @@ static void mmBuildOptionForOpen(SDnode *pDnode, SMnodeOpt *pOption) {
int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) {
mmInitOption(pDnode, pOption);
pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode);
pOption->dnodeId = dmGetDnodeId(pDnode);
pOption->clusterId = dmGetClusterId(pDnode);
pOption->replica = pCreate->replica;
pOption->selfIndex = -1;
......
......@@ -70,7 +70,7 @@ void mmInitMsgFp(SMnodeMgmt *pMgmt) {
static void mmSendRpcRsp(SDnode *pDnode, SRpcMsg *pRpc) {
if (pRpc->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRpc->code == TSDB_CODE_APP_NOT_READY) {
dndSendRedirectRsp(pDnode, pRpc);
dmSendRedirectRsp(pDnode, pRpc);
} else {
rpcSendResponse(pRpc);
}
......
......@@ -16,10 +16,26 @@
#ifndef _TD_DND_QNODE_INT_H_
#define _TD_DND_QNODE_INT_H_
#include "dndInt.h"
#ifdef __cplusplus
extern "C" {
#endif
#include "dndInt.h"
typedef struct SQnodeMgmt {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SQnode *pQnode;
SRWLatch latch;
SDnodeWorker queryWorker;
SDnodeWorker fetchWorker;
//
SMsgHandle msgHandles[TDMT_MAX];
SProcObj *pProcess;
bool singleProc;
} SQnodeMgmt;
SMgmtFp qmGetMgmtFp();
......
......@@ -185,9 +185,9 @@ static void dndBuildQnodeOption(SDnode *pDnode, SQnodeOpt *pOption) {
pOption->pDnode = pDnode;
pOption->sendReqToDnodeFp = dndSendReqToDnode;
pOption->sendReqToMnodeFp = dndSendReqToMnode;
pOption->sendRedirectRspFp = dndSendRedirectRsp;
pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode);
pOption->sendRedirectRspFp = dmSendRedirectRsp;
pOption->dnodeId = dmGetDnodeId(pDnode);
pOption->clusterId = dmGetClusterId(pDnode);
pOption->sver = tsVersion;
}
......@@ -274,7 +274,7 @@ int32_t dndProcessCreateQnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return -1;
}
if (createReq.dnodeId != dndGetDnodeId(pDnode)) {
if (createReq.dnodeId != dmGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_QNODE_INVALID_OPTION;
dError("failed to create qnode since %s", terrstr());
return -1;
......@@ -290,7 +290,7 @@ int32_t dndProcessDropQnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return -1;
}
if (dropReq.dnodeId != dndGetDnodeId(pDnode)) {
if (dropReq.dnodeId != dmGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_QNODE_INVALID_OPTION;
dError("failed to drop qnode since %s", terrstr());
return -1;
......
......@@ -16,7 +16,7 @@
#ifndef _TD_DND_SNODE_HANDLE_H_
#define _TD_DND_SNODE_HANDLE_H_
#include "mmInt.h"
#include "smInt.h"
#ifdef __cplusplus
extern "C" {
......
......@@ -16,10 +16,26 @@
#ifndef _TD_DND_SNODE_INT_H_
#define _TD_DND_SNODE_INT_H_
#include "dndInt.h"
#ifdef __cplusplus
extern "C" {
#endif
#include "dndInt.h"
typedef struct SSnodeMgmt {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SSnode *pSnode;
SRWLatch latch;
SDnodeWorker writeWorker;
//
SMsgHandle msgHandles[TDMT_MAX];
SProcObj *pProcess;
bool singleProc;
} SSnodeMgmt;
SMgmtFp smGetMgmtFp();
......
......@@ -21,6 +21,6 @@ void smInitMsgHandles(SMgmtWrapper *pWrapper) {
}
SMsgHandle smGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) {
SBnodeMgmt *pMgmt = pWrapper->pMgmt;
SSnodeMgmt *pMgmt = pWrapper->pMgmt;
return pMgmt->msgHandles[msgIndex];
}
......@@ -179,9 +179,9 @@ static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) {
pOption->pDnode = pDnode;
pOption->sendReqToDnodeFp = dndSendReqToDnode;
pOption->sendReqToMnodeFp = dndSendReqToMnode;
pOption->sendRedirectRspFp = dndSendRedirectRsp;
pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode);
pOption->sendRedirectRspFp = dmSendRedirectRsp;
pOption->dnodeId = dmGetDnodeId(pDnode);
pOption->clusterId = dmGetClusterId(pDnode);
pOption->sver = tsVersion;
}
......@@ -268,7 +268,7 @@ int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return -1;
}
if (createReq.dnodeId != dndGetDnodeId(pDnode)) {
if (createReq.dnodeId != dmGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_SNODE_INVALID_OPTION;
dError("failed to create snode since %s", terrstr());
return -1;
......@@ -284,7 +284,7 @@ int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return -1;
}
if (dropReq.dnodeId != dndGetDnodeId(pDnode)) {
if (dropReq.dnodeId != dmGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_SNODE_INVALID_OPTION;
dError("failed to drop snode since %s", terrstr());
return -1;
......
......@@ -22,6 +22,32 @@
extern "C" {
#endif
typedef struct {
int32_t openVnodes;
int32_t totalVnodes;
int32_t masterNum;
int64_t numOfSelectReqs;
int64_t numOfInsertReqs;
int64_t numOfInsertSuccessReqs;
int64_t numOfBatchInsertReqs;
int64_t numOfBatchInsertSuccessReqs;
} SVnodesStat;
typedef struct SVnodesMgmt {
SVnodesStat stat;
SHashObj *hash;
SRWLatch latch;
SQWorkerPool queryPool;
SFWorkerPool fetchPool;
SWWorkerPool syncPool;
SWWorkerPool writePool;
STfs *pTfs;
SMsgHandle msgHandles[TDMT_MAX];
SProcObj *pProcess;
bool singleProc;
} SVnodesMgmt;
SMgmtFp vmGetMgmtFp() ;
int32_t dndInitVnodes(SDnode *pDnode);
......@@ -39,7 +65,8 @@ int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pReq);
int32_t vmGetTfsMonitorInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo);
void vmGetVndMonitorInfo(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo);
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_VNODE_MGMT_H_
#define _TD_DND_VNODE_MGMT_H_
#include "vmInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pReq);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_VNODE_MGMT_H_*/
......@@ -16,15 +16,40 @@
#define _DEFAULT_SOURCE
#include "vmInt.h"
#include "vmHandle.h"
#include "vmMgmt.h"
bool vmRequireNode(SMgmtWrapper *pWrapper) { return false; }
static int32_t vmInit(SMgmtWrapper *pWrapper) {
SVnodeOpt vnodeOpt = {0};
vnodeOpt.nthreads = tsNumOfCommitThreads;
vnodeOpt.putReqToVQueryQFp = dndPutReqToVQueryQ;
vnodeOpt.sendReqToDnodeFp = dndSendReqToDnode;
if (vnodeInit(&vnodeOpt) != 0) {
dError("failed to init vnode since %s", terrstr());
dndCleanup();
return -1;
}
if (walInit() != 0) {
dError("failed to init wal since %s", terrstr());
dndCleanup();
return -1;
}
return 0;
}
static void vmCleanup(SMgmtWrapper *pWrapper) {
walCleanUp();
vnodeCleanup();
}
static bool vmRequire(SMgmtWrapper *pWrapper) { return false; }
SMgmtFp vmGetMgmtFp() {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = NULL;
mgmtFp.closeFp = NULL;
mgmtFp.requiredFp = vmRequireNode;
mgmtFp.openFp = vmInit;
mgmtFp.closeFp = vmCleanup;
mgmtFp.requiredFp = vmRequire;
mgmtFp.getMsgHandleFp = vmGetMsgHandle;
return mgmtFp;
}
......@@ -554,7 +554,7 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SWrapperCfg wrapperCfg = {0};
dndGenerateWrapperCfg(pDnode, &createReq, &wrapperCfg);
if (createReq.dnodeId != dndGetDnodeId(pDnode)) {
if (createReq.dnodeId != dmGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_VNODE_INVALID_OPTION;
dDebug("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
return -1;
......@@ -1056,3 +1056,26 @@ void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) {
// }
int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pReq){return 0;}
int32_t vmGetTfsMonitorInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo){
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return -1;
return tfsGetMonitorInfo(pMgmt->pTfs, pInfo);;
}
void vmGetVndMonitorInfo(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
SVnodesStat *pStat = &pMgmt->stat;
pInfo->req_select = pStat->numOfSelectReqs;
pInfo->req_insert = pStat->numOfInsertReqs;
pInfo->req_insert_success = pStat->numOfInsertSuccessReqs;
pInfo->req_insert_batch = pStat->numOfBatchInsertReqs;
pInfo->req_insert_batch_success = pStat->numOfBatchInsertSuccessReqs;
pInfo->errors = tsNumOfErrorLogs;
pInfo->vnodes_num = pStat->totalVnodes;
pInfo->masters = pStat->masterNum;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册