提交 a305bb42 编写于 作者: S slguan

Add code for module dnodeMgmt

上级 39bdc75b
......@@ -6,6 +6,8 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/tsdb/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/common/inc)
INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC)
......
......@@ -20,17 +20,17 @@
extern "C" {
#endif
int dnodeInitMgmt();
void dnodeCleanupMgmt();
void dnodeMgmt(SRpcMsg *);
int32_t dnodeInitMgmt();
void dnodeCleanupMgmt();
void dnodeMgmt(void *rpcMsg);
void* dnodeGetVnode(int vgId);
int dnodeGetVnodeStatus(void *);
void* dnodeGetVnodeRworker(void *);
void* dnodeGetVnodeWworker(void *);
void* dnodeGetVnodeWal(void *);
void* dnodeGetVnodeTsdb(void *);
void dnodeReleaseVnode(void *);
void* dnodeGetVnode(int32_t vgId);
int32_t dnodeGetVnodeStatus(void *pVnode);
void* dnodeGetVnodeRworker(void *pVnode);
void* dnodeGetVnodeWworker(void *pVnode);
void* dnodeGetVnodeWal(void *pVnode);
void* dnodeGetVnodeTsdb(void *pVnode);
void dnodeReleaseVnode(void *pVnode);
#ifdef __cplusplus
}
......
......@@ -15,49 +15,64 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tlog.h"
#include "ihash.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tlog.h"
#include "trpc.h"
#include "dnodeWrite.h"
#include "dnodeRead.h"
#include "tstatus.h"
#include "tsdb.h"
#include "dnodeMgmt.h"
#include "dnodeRead.h"
#include "dnodeWrite.h"
typedef struct {
int32_t vgId; // global vnode group ID
int status; // status: master, slave, notready, deleting
int refCount; // reference count
int32_t status; // status: master, slave, notready, deleting
int32_t refCount; // reference count
int64_t version;
void *wworker;
void *rworker;
void *wal;
void *tsdb;
void *replica;
void *events;
void *cq; // continuous query
void *wworker;
void *rworker;
void *wal;
void *tsdb;
void *replica;
void *events;
void *cq; // continuous query
} SVnodeObj;
static int dnodeOpenVnodes();
static void dnodeCleanupVnodes();
static int dnodeCreateVnode(int32_t vgId, SCreateVnodeMsg *cfg);
static int dnodeDropVnode(SVnodeObj *pVnode);
static void dnodeRemoveVnode(SVnodeObj *pVnode);
static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static void dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
static int32_t dnodeOpenVnodes();
static void dnodeCleanupVnodes();
static int32_t dnodeOpenVnode(int32_t vgId);
static void dnodeCleanupVnode(SVnodeObj *pVnode);
static int32_t dnodeCreateVnode(SCreateVnodeMsg *cfg);
static void dnodeDropVnode(SVnodeObj *pVnode);
static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static void dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
static void * tsDnodeVnodesHash = NULL;
int32_t dnodeInitMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_CREATE_VNODE] = dnodeProcessCreateVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DROP_VNODE] = dnodeProcessDropVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_ALTER_VNODE] = dnodeProcessDropVnodeMsg;
tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt);
if (tsDnodeVnodesHash == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
int dnodeInitMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_FREE_VNODE] = dnodeProcessDropVnodeMsg;
return dnodeOpenVnodes();
}
void dnodeCleanupMgmt() {
dnodeCleanupVnodes();
taosCleanUpIntHash(tsDnodeVnodesHash);
}
void dnodeMgmt(SRpcMsg *pMsg) {
void dnodeMgmt(void *rpcMsg) {
SRpcMsg *pMsg = rpcMsg;
terrno = 0;
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
......@@ -68,28 +83,29 @@ void dnodeMgmt(SRpcMsg *pMsg) {
SRpcMsg rsp;
rsp.handle = pMsg->handle;
rsp.code = terrno;
rsp.pCont = NULL;
rsp.code = terrno;
rsp.pCont = NULL;
rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont); // free the received message
}
void *dnodeGetVnode(int vgId) {
SVnodeObj *pVnode;
// retrieve the pVnode from vgId
void *dnodeGetVnode(int32_t vgId) {
SVnodeObj *pVnode = taosGetIntHashData(tsDnodeVnodesHash, vgId);
if (pVnode == NULL) {
terrno = TSDB_CODE_INVALID_VGROUP_ID;
return NULL;
}
// if (pVnode->status == ....) {
// terrno = ;
// return NULL;
// }
if (pVnode->status != TSDB_VN_STATUS_MASTER && pVnode->status == TSDB_VN_STATUS_SLAVE) {
terrno = TSDB_CODE_INVALID_VNODE_STATUS;
return NULL;
}
atomic_add_fetch_32(&pVnode->refCount, 1);
return pVnode;
}
int dnodeGetVnodeStatus(void *pVnode) {
int32_t dnodeGetVnodeStatus(void *pVnode) {
return ((SVnodeObj *)pVnode)->status;
}
......@@ -109,61 +125,53 @@ void *dnodeGetVnodeTsdb(void *pVnode) {
return ((SVnodeObj *)pVnode)->tsdb;
}
void dnodeReleaseVnode(void *param) {
SVnodeObj *pVnode = (SVnodeObj *)param;
int refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
if (refCount == 0) dnodeRemoveVnode(pVnode);
}
static int dnodeOpenVnode() {
SVnodeObj *pVnode;
// create tsdb
// create wal
// allocate write worker
pVnode->wworker = dnodeAllocateWriteWorker();
// create read queue
pVnode->rworker = dnodeAllocateReadWorker();
// create the replica
// set the status
pVnode->refCount = 1;
return 0;
}
static int dnodeOpenVnodes() {
return 0;
void dnodeReleaseVnode(void *pVnode) {
atomic_sub_fetch_32(&((SVnodeObj *) pVnode)->refCount, 1);
}
static void dnodeCleanupVnode() {
static int32_t dnodeOpenVnodes() {
dPrint("open all vnodes");
return TSDB_CODE_SUCCESS;
}
static void dnodeCleanupVnodes() {
dPrint("clean all vnodes");
}
static int dnodeCreateVnode(int32_t vgId, SCreateVnodeMsg *cfg) {
static int32_t dnodeOpenVnode(int32_t vgId) {
char rootDir[TSDB_FILENAME_LEN] = {0};
sprintf(rootDir, "%s/vnode%d", tsDirectory, vgId);
SVnodeObj *pVnode = malloc(sizeof(SVnodeObj));
// save the vnode info in non-volatile storage
// add into hash, so it can be retrieved
dnodeOpenVnode(pVnode);
void *pTsdb = tsdbOpenRepo(rootDir);
if (pTsdb != NULL) {
return terrno;
}
return 0;
SVnodeObj vnodeObj;
vnodeObj.vgId = vgId;
vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
vnodeObj.refCount = 1;
vnodeObj.version = 0;
vnodeObj.wworker = dnodeAllocateWriteWorker();
vnodeObj.rworker = dnodeAllocateReadWorker();
vnodeObj.wal = NULL;
vnodeObj.tsdb = pTsdb;
vnodeObj.replica = NULL;
vnodeObj.events = NULL;
vnodeObj.cq = NULL;
taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, &vnodeObj);
return TSDB_CODE_SUCCESS;
}
static void dnodeRemoveVnode(SVnodeObj *pVnode) {
static void dnodeCleanupVnode(SVnodeObj *pVnode) {
pVnode->status = TSDB_VN_STATUS_NOT_READY;
int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1);
if (count > 0) {
// wait refcount
}
// remove replica
// remove read queue
......@@ -171,26 +179,79 @@ static void dnodeRemoveVnode(SVnodeObj *pVnode) {
// remove write queue
dnodeFreeWriteWorker(pVnode->wworker);
// remove wal
// remove tsdb
if (pVnode->tsdb) {
tsdbCloseRepo(pVnode->tsdb);
}
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
}
static int dnodeDropVnode(SVnodeObj *pVnode) {
int count = atomic_sub_fetch_32(&pVnode->refCount, 1);
static int32_t dnodeCreateVnode(SCreateVnodeMsg *pVnodeCfg) {
pVnodeCfg->vnode = htonl(pVnodeCfg->vnode);
pVnodeCfg->cfg.vgId = htonl(pVnodeCfg->cfg.vgId);
pVnodeCfg->cfg.maxSessions = htonl(pVnodeCfg->cfg.maxSessions);
pVnodeCfg->cfg.daysPerFile = htonl(pVnodeCfg->cfg.daysPerFile);
STsdbCfg tsdbCfg;
tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.tsdbId = pVnodeCfg->vnode;
tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions;
tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile;
tsdbCfg.minRowsPerFileBlock = -1;
tsdbCfg.maxRowsPerFileBlock = -1;
tsdbCfg.keep = -1;
tsdbCfg.maxCacheSize = -1;
char rootDir[TSDB_FILENAME_LEN] = {0};
sprintf(rootDir, "%s/vnode%d", tsDirectory, pVnodeCfg->cfg.vgId);
void *pTsdb = tsdbCreateRepo(rootDir, &tsdbCfg, NULL);
if (pTsdb != NULL) {
return terrno;
}
if (count<=0) dnodeRemoveVnode(pVnode);
SVnodeObj vnodeObj;
vnodeObj.vgId = pVnodeCfg->cfg.vgId;
vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
vnodeObj.refCount = 1;
vnodeObj.version = 0;
vnodeObj.wworker = dnodeAllocateWriteWorker();
vnodeObj.rworker = dnodeAllocateReadWorker();
vnodeObj.wal = NULL;
vnodeObj.tsdb = pTsdb;
vnodeObj.replica = NULL;
vnodeObj.events = NULL;
vnodeObj.cq = NULL;
taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, &vnodeObj);
return TSDB_CODE_SUCCESS;
}
static void dnodeDropVnode(SVnodeObj *pVnode) {
pVnode->status = TSDB_VN_STATUS_NOT_READY;
int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1);
if (count > 0) {
// wait refcount
}
if (pVnode->tsdb) {
tsdbDropRepo(pVnode->tsdb);
pVnode->tsdb = NULL;
}
return 0;
dnodeCleanupVnode(pVnode);
}
static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg) {
// SVnodeObj *pVnode;
// int vgId;
// int32_t vgId;
// SVPeersMsg *pCfg;
// check everything, if not ok, set terrno;
......@@ -206,7 +267,7 @@ static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg) {
static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg) {
SVnodeObj *pVnode;
int vgId;
int32_t vgId;
// check everything, if not ok, set terrno;
......@@ -220,7 +281,7 @@ static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg) {
static void dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg) {
SVnodeObj *pVnode;
int vgId;
int32_t vgId;
// check everything, if not ok, set terrno;
......
......@@ -28,7 +28,7 @@ static void *tsDnodeMnodeRpc = NULL;
int32_t dnodeInitMnode() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeWrite;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeWrite;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_FREE_VNODE] = dnodeMgmt;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DROP_VNODE] = dnodeMgmt;
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
......
......@@ -166,6 +166,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, 0, 120, "invalid message
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE_TYPE, 0, 121, "invalid table typee")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG_VERSION, 0, 122, "invalid version of message")
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_NOT_EXIST, 0, 123, "dnode not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VGROUP_ID, 0, 124, "invalid vgroup id")
#ifdef TAOS_ERROR_C
};
......
......@@ -41,10 +41,17 @@ extern "C" {
#define TSDB_MSG_TYPE_DNODE_CREATE_TABLE_RSP 10
#define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE 11
#define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP 12
// dnodeMgmt
#define TSDB_MSG_TYPE_CREATE_VNODE 13
#define TSDB_MSG_TYPE_CREATE_VNODE_RSP 14
#define TSDB_MSG_TYPE_FREE_VNODE 15
#define TSDB_MSG_TYPE_FREE_VNODE_RSP 16
#define TSDB_MSG_TYPE_DROP_VNODE 15
#define TSDB_MSG_TYPE_DROP_VNODE_RSP 16
#define TSDB_MSG_TYPE_ALTER_VNODE 17
#define TSDB_MSG_TYPE_ALTER_VNODE_RSP 18
#define TSDB_MSG_TYPE_CONFIG_VNODE 19
#define TSDB_MSG_TYPE_CONFIG_VNODE_RSP 20
#define TSDB_MSG_TYPE_DNODE_CFG 17
#define TSDB_MSG_TYPE_DNODE_CFG_RSP 18
#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM 19
......@@ -93,8 +100,7 @@ extern "C" {
#define TSDB_MSG_TYPE_DROP_TABLE_RSP 66
#define TSDB_MSG_TYPE_ALTER_TABLE 67
#define TSDB_MSG_TYPE_ALTER_TABLE_RSP 68
#define TSDB_MSG_TYPE_VNODE_CFG 69
#define TSDB_MSG_TYPE_VNODE_CFG_RSP 70
#define TSDB_MSG_TYPE_TABLE_CFG 71
#define TSDB_MSG_TYPE_TABLE_CFG_RSP 72
#define TSDB_MSG_TYPE_TABLE_META 73
......
......@@ -259,7 +259,7 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p
if (msgType == TSDB_MSG_TYPE_TABLE_CFG) {
mgmtProcessTableCfgMsg(msgType, pCont, contLen, pConn);
} else if (msgType == TSDB_MSG_TYPE_VNODE_CFG) {
} else if (msgType == TSDB_MSG_TYPE_CONFIG_VNODE) {
mgmtProcessVnodeCfgMsg(msgType, pCont, contLen, pConn);
} else if (msgType == TSDB_MSG_TYPE_DNODE_CREATE_TABLE_RSP) {
mgmtProcessCreateTableRsp(msgType, pCont, contLen, pConn, code);
......@@ -267,7 +267,7 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p
mgmtProcessRemoveTableRsp(msgType, pCont, contLen, pConn, code);
} else if (msgType == TSDB_MSG_TYPE_CREATE_VNODE_RSP) {
mgmtProcessCreateVnodeRsp(msgType, pCont, contLen, pConn, code);
} else if (msgType == TSDB_MSG_TYPE_FREE_VNODE_RSP) {
} else if (msgType == TSDB_MSG_TYPE_DROP_VNODE_RSP) {
mgmtProcessFreeVnodeRsp(msgType, pCont, contLen, pConn, code);
} else if (msgType == TSDB_MSG_TYPE_DROP_STABLE) {
mgmtProcessDropStableRsp(msgType, pCont, contLen, pConn, code);
......@@ -294,7 +294,7 @@ void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle) {
SFreeVnodeMsg *pFreeVnode = rpcMallocCont(sizeof(SFreeVnodeMsg));
if (pFreeVnode != NULL) {
pFreeVnode->vnode = htonl(vnode);
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_FREE_VNODE, pFreeVnode, sizeof(SFreeVnodeMsg), ahandle);
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DROP_VNODE, pFreeVnode, sizeof(SFreeVnodeMsg), ahandle);
}
}
......
......@@ -48,6 +48,7 @@ typedef enum _TSDB_VN_STATUS {
TSDB_VN_STATUS_MASTER,
TSDB_VN_STATUS_CLOSING,
TSDB_VN_STATUS_DELETING,
TSDB_VN_STATUS_NOT_READY
} EVnodeStatus;
enum _TSDB_VN_SYNC_STATUS {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册