提交 ab4b640f 编写于 作者: S Shengliang Guan

dnode-vnodes

上级 12c6ea28
......@@ -36,7 +36,7 @@ enum {
TSDB_MESSAGE_NULL = 0,
#endif
// message from client to dnode
// message from client to vnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" )
......@@ -46,25 +46,12 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_TABLE, "alter-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_TABLE_META, "table-meta" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_TABLES_META, "tables-meta" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STABLE_VGROUP, "stable-vgroup" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY0, "dummy0" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY4, "dummy4" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY5, "dummy5" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY8, "dummy8" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" )
// message from client to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" )
......@@ -88,6 +75,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_FUNCTION, "create-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_FUNCTION, "alter-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_FUNCTION, "drop-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE, "create-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STABLE_VGROUP, "stable-vgroup" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE, "drop-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE, "alter-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_QUERY, "kill-query" )
......@@ -97,54 +85,55 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW, "show" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW_RETRIEVE, "retrieve" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC, "retrieve-func" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE, "compact-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY10, "dummy10" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY11, "dummy11" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY12, "dummy12" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY15, "dummy15" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY16, "dummy16" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY17, "dummy17" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY18, "dummy18" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY19, "dummy19" )
// message from client to qnode
// message from client to dnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" )
// message from mnode to dnode
// message from vnode to vnode
// message from vnode to mnode
// message from vnode to qnode
// message from vnode to dnode
// message from mnode to vnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE_IN, "create-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE_IN, "alter-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE_IN, "drop-stable" )
// message from mnode to mnode
// message from mnode to qnode
// message from mnode to dnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_VNODE_IN, "create-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_VNODE_IN, "alter-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_VNODE_IN, "drop-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH_VNODE_IN, "auth-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_VNODE_IN, "sync-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE_IN, "compact-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE_IN, "create-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE_IN, "drop-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE_IN, "config-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY20, "dummy20" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY21, "dummy21" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY22, "dummy22" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY23, "dummy23" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY24, "dummy24" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY25, "dummy25" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY26, "dummy26" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY27, "dummy27" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY28, "dummy28" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY29, "dummy29" )
// message from qnode to vnode
// message from qnode to mnode
// message from qnode to qnode
// message from qnode to dnode
// message from dnode to vnode
// message from dnode to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STATUS, "status" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_GRANT, "grant" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH, "auth" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY30, "dummy30" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY31, "dummy31" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY32, "dummy32" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY33, "dummy33" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY34, "dummy34" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY35, "dummy35" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY36, "dummy36" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY37, "dummy37" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY38, "dummy38" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY39, "dummy39" )
// message from dnode to qnode
// message from dnode to dnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY0, "dummy0" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY4, "dummy4" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY5, "dummy5" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY8, "dummy8" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" )
#ifndef TAOS_MESSAGE_C
TSDB_MSG_TYPE_MAX // 147
......@@ -428,10 +417,6 @@ typedef struct {
char tableFname[TSDB_TABLE_FNAME_LEN];
} SDropSTableMsg;
typedef struct {
int32_t vgId;
} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg;
typedef struct SColIndex {
int16_t colId; // column id
int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag
......@@ -661,9 +646,8 @@ typedef struct {
typedef struct {
int32_t vgId;
int8_t status;
int8_t role;
int8_t reserved[2];
int8_t reserved[3];
int64_t totalStorage;
int64_t compStorage;
int64_t pointsWritten;
......@@ -737,9 +721,18 @@ typedef struct {
int8_t replica;
int8_t quorum;
int8_t selfIndex;
SVnodeDesc nodes[TSDB_MAX_REPLICA];
SVnodeDesc replicas[TSDB_MAX_REPLICA];
} SCreateVnodeMsg, SAlterVnodeMsg;
typedef struct {
int32_t vgId;
} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg;
typedef struct {
int32_t vgId;
int8_t accessState;
} SAuthVnodeMsg;
typedef struct {
char tableFname[TSDB_TABLE_FNAME_LEN];
int16_t createFlag;
......@@ -1002,6 +995,7 @@ typedef struct {
/* data */
} SUpdateTagValRsp;
#pragma pack(pop)
#ifdef __cplusplus
......
......@@ -27,7 +27,7 @@ extern "C" {
typedef struct SVnode SVnode;
typedef struct {
char dbName[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int32_t cacheBlockSize; // MB
int32_t totalBlocks;
int32_t daysPerFile;
......@@ -47,17 +47,6 @@ typedef struct {
SVnodeDesc replicas[TSDB_MAX_REPLICA];
} SVnodeCfg;
typedef struct {
int64_t totalStorage;
int64_t compStorage;
int64_t pointsWritten;
int64_t tablesNum;
} SVnodeStatisic;
typedef struct {
int8_t syncRole;
} SVnodeStatus;
typedef struct SVnodeMsg {
int32_t msgType;
int32_t code;
......@@ -69,9 +58,6 @@ typedef struct SVnodeMsg {
int32_t vnodeInit();
void vnodeCleanup();
int32_t vnodeGetStatistics(SVnode *pVnode, SVnodeStatisic *pStat);
int32_t vnodeGetStatus(SVnode *pVnode, SVnodeStatus *pStatus);
SVnode *vnodeOpen(int32_t vgId, const char *path);
void vnodeClose(SVnode *pVnode);
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg);
......@@ -80,6 +66,7 @@ int32_t vnodeDrop(SVnode *pVnode);
int32_t vnodeCompact(SVnode *pVnode);
int32_t vnodeSync(SVnode *pVnode);
void vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg);
#ifdef __cplusplus
......
......@@ -42,7 +42,10 @@ void dnodeCleanup();
EDnStat dnodeGetRunStat();
void dnodeSetRunStat();
void dnodeGetStartup(SStartupMsg *);
void dnodeReportStartup(char *name, char *desc);
void dnodeReportStartupFinished(char *name, char *desc);
void dnodeGetStartup(SStartupMsg *);
#ifdef __cplusplus
}
......
......@@ -23,9 +23,14 @@ extern "C" {
int32_t dnodeInitVnodes();
void dnodeCleanupVnodes();
void dnodeProcessVnodesMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeGetVnodes(SVnodeLoads *pVloads);
void dnodeProcessVnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessVnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessVnodeQueryMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
#ifdef __cplusplus
}
#endif
......
......@@ -35,14 +35,14 @@ EDnStat dnodeGetRunStat() { return tsInt.runStat; }
void dnodeSetRunStat(EDnStat stat) { tsInt.runStat = stat; }
static void dnodeReportStartup(char *name, char *desc) {
void dnodeReportStartup(char *name, char *desc) {
SStartupMsg *pStartup = &tsInt.startup;
tstrncpy(pStartup->name, name, strlen(pStartup->name));
tstrncpy(pStartup->desc, desc, strlen(pStartup->desc));
pStartup->finished = 0;
}
static void dnodeReportStartupFinished(char *name, char *desc) {
void dnodeReportStartupFinished(char *name, char *desc) {
SStartupMsg *pStartup = &tsInt.startup;
tstrncpy(pStartup->name, name, strlen(pStartup->name));
tstrncpy(pStartup->desc, desc, strlen(pStartup->desc));
......
......@@ -34,24 +34,22 @@ static struct {
static void dnodeInitMsgFp() {
// msg from client to dnode
tsTrans.msgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_FETCH] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_TABLE_META] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_TABLES_META] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_ACK] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_RESET] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessDnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessVnodeQueryMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_FETCH] = dnodeProcessVnodeFetchMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_TABLE_META] = dnodeProcessVnodeQueryMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_TABLES_META] = dnodeProcessVnodeQueryMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = dnodeProcessVnodeQueryMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dnodeProcessVnodeQueryMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_ACK] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_RESET] = dnodeProcessVnodeWriteMsg;
// msg from client to mnode
tsTrans.msgFp[TSDB_MSG_TYPE_CONNECT] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = dnodeProcessMnodeMsg;
......@@ -77,6 +75,7 @@ static void dnodeInitMsgFp() {
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_KILL_CONN] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_HEARTBEAT] = dnodeProcessMnodeMsg;
......@@ -85,22 +84,29 @@ static void dnodeInitMsgFp() {
tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = dnodeProcessMnodeMsg;
// message from mnode to dnode
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg;
// message from client to dnode
tsTrans.msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessDnodeMsg;
// message from mnode to vnode
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN] = dnodeProcessMnodeMsg;
// message from mnode to dnode
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_AUTH_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_AUTH_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP] = dnodeProcessMnodeMsg;
......
......@@ -14,13 +14,622 @@
*/
#define _DEFAULT_SOURCE
#include "dnodeDnode.h"
#include "dnodeVnodes.h"
#include "thash.h"
#include "tqueue.h"
#include "tstep.h"
#include "tthread.h"
#include "tworker.h"
#include "vnode.h"
int32_t dnodeInitVnodes() { return vnodeInit(); }
typedef struct {
int32_t vgId;
int32_t refCount;
int8_t dropped;
int8_t accessState;
SVnode *pImpl;
taos_queue pWriteQ;
taos_queue pSyncQ;
taos_queue pApplyQ;
taos_queue pQueryQ;
taos_queue pFetchQ;
} SVnodeObj;
void dnodeCleanupVnodes() { vnodeCleanup(); }
typedef struct {
pthread_t *threadId;
int32_t threadIndex;
int32_t failed;
int32_t opened;
int32_t vnodeNum;
SVnodeObj *pVnodes;
} SVThread;
void dnodeProcessVnodesMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { vnodeProcessMsg(NULL, NULL); }
static struct {
SHashObj *hash;
SWorkerPool mgmtPool;
taos_queue pMgmtQ;
SSteps *pSteps;
int32_t openVnodes;
int32_t totalVnodes;
char file[PATH_MAX + 20];
} tsVnodes;
void dnodeGetVnodes(SVnodeLoads *pVloads) {}
\ No newline at end of file
static int32_t dnodeCreateVnodeWrapper(int32_t vgId, SVnode *pImpl) {
SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj));
pVnode->vgId = vgId;
pVnode->refCount = 0;
pVnode->dropped = 0;
pVnode->accessState = TSDB_VN_ALL_ACCCESS;
pVnode->pImpl = pImpl;
pVnode->pWriteQ = NULL;
pVnode->pSyncQ = NULL;
pVnode->pApplyQ = NULL;
pVnode->pQueryQ = NULL;
pVnode->pFetchQ = NULL;
return taosHashPut(tsVnodes.hash, &vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
}
static void dnodeDropVnodeWrapper(SVnodeObj *pVnode) {
taosHashRemove(tsVnodes.hash, &pVnode->vgId, sizeof(int32_t));
//todo wait all queue empty
pVnode->pWriteQ = NULL;
pVnode->pSyncQ = NULL;
pVnode->pApplyQ = NULL;
pVnode->pQueryQ = NULL;
pVnode->pFetchQ = NULL;
}
static int32_t dnodeGetVnodesFromHash(SVnodeObj *pVnodes[], int32_t *numOfVnodes) {
void *pIter = taosHashIterate(tsVnodes.hash, NULL);
while (pIter) {
SVnodeObj **ppVnode = pIter;
if (*ppVnode) {
(*numOfVnodes)++;
if (*numOfVnodes >= TSDB_MAX_VNODES) {
dError("vgId:%d, too many open vnodes, exist:%d max:%d", (*ppVnode)->vgId, *numOfVnodes, TSDB_MAX_VNODES);
continue;
} else {
pVnodes[*numOfVnodes - 1] = (*ppVnode);
}
}
pIter = taosHashIterate(tsVnodes.hash, pIter);
}
return TSDB_CODE_SUCCESS;
}
static int32_t dnodeGetVnodesFromFile(SVnodeObj *pVnodes, int32_t *numOfVnodes) {
pVnodes[0].vgId = 2;
pVnodes[0].dropped = 0;
pVnodes[0].vgId = 3;
pVnodes[0].dropped = 0;
return 0;
}
static int32_t dnodeWriteVnodesToFile() { return 0; }
static int32_t dnodeCreateVnode(int32_t vgId, SVnodeCfg *pCfg) {
int32_t code = 0;
char path[PATH_MAX + 20] = {0};
snprintf(path, sizeof(path),"%s/vnode%d", tsVnodeDir, vgId);
SVnode *pImpl = vnodeCreate(vgId, path, pCfg);
if (pImpl == NULL) {
code = terrno;
return code;
}
code = dnodeCreateVnodeWrapper(vgId, pImpl);
if (code != 0) {
vnodeDrop(pImpl);
return code;
}
code = dnodeWriteVnodesToFile();
if (code != 0) {
vnodeDrop(pImpl);
return code;
}
return code;
}
static int32_t dnodeDropVnode(SVnodeObj *pVnode) {
pVnode->dropped = 1;
int32_t code = dnodeWriteVnodesToFile();
if (code != 0) {
pVnode->dropped = 0;
return code;
}
dnodeDropVnodeWrapper(pVnode);
vnodeDrop(pVnode->pImpl);
dnodeWriteVnodesToFile();
return 0;
}
static SVnodeObj *dnodeAcquireVnode(int32_t vgId) {
SVnodeObj *pVnode = NULL;
taosHashGetClone(tsVnodes.hash, &vgId, sizeof(int32_t), (void *)&pVnode);
if (pVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
}
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
dTrace("vgId:%d, accquire vnode, refCount:%d", pVnode->vgId, refCount);
return pVnode;
}
static void dnodeReleaseVnode(SVnodeObj *pVnode) {
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount);
}
static void *dnodeOpenVnodeFunc(void *param) {
SVThread *pThread = param;
dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
setThreadName("open-vnodes");
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
SVnodeObj *pVnode = &pThread->pVnodes[v];
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pVnode->vgId,
tsVnodes.openVnodes, tsVnodes.totalVnodes);
dnodeReportStartup("open-vnodes", stepDesc);
char path[PATH_MAX + 20] = {0};
snprintf(path, sizeof(path),"%s/vnode%d", tsVnodeDir, pVnode->vgId);
SVnode *pImpl = vnodeOpen(pVnode->vgId, path);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
pThread->failed++;
} else {
dnodeCreateVnodeWrapper(pVnode->vgId, pImpl);
dDebug("vgId:%d, is opened by thread:%d", pVnode->vgId, pThread->threadIndex);
pThread->opened++;
}
atomic_add_fetch_32(&tsVnodes.openVnodes, 1);
}
dDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
pThread->failed);
return NULL;
}
static int32_t dnodeOpenVnodes() {
tsVnodes.hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (tsVnodes.hash == NULL) {
dError("failed to init vnode hash");
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
SVnodeObj pVnodes[TSDB_MAX_VNODES] = {0};
int32_t numOfVnodes = 0;
int32_t code = dnodeGetVnodesFromFile(pVnodes, &numOfVnodes);
if (code != TSDB_CODE_SUCCESS) {
dInfo("failed to get vnode list from disk since %s", tstrerror(code));
return code;
}
tsVnodes.totalVnodes = numOfVnodes;
int32_t threadNum = tsNumOfCores;
int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
SVThread *threads = calloc(threadNum, sizeof(SVThread));
for (int32_t t = 0; t < threadNum; ++t) {
threads[t].threadIndex = t;
threads[t].pVnodes = calloc(vnodesPerThread, sizeof(SVnodeObj));
}
for (int32_t v = 0; v < numOfVnodes; ++v) {
int32_t t = v % threadNum;
SVThread *pThread = &threads[t];
pThread->pVnodes[pThread->vnodeNum++] = pVnodes[v];
}
dInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes);
for (int32_t t = 0; t < threadNum; ++t) {
SVThread *pThread = &threads[t];
if (pThread->vnodeNum == 0) continue;
pThread->threadId = taosCreateThread(dnodeOpenVnodeFunc, pThread);
if (pThread->threadId == NULL) {
dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
}
}
for (int32_t t = 0; t < threadNum; ++t) {
SVThread *pThread = &threads[t];
taosDestoryThread(pThread->threadId);
pThread->threadId = NULL;
free(pThread->pVnodes);
}
free(threads);
if (tsVnodes.openVnodes != tsVnodes.totalVnodes) {
dError("there are total vnodes:%d, opened:%d", tsVnodes.totalVnodes, tsVnodes.openVnodes);
return -1;
} else {
dInfo("total vnodes:%d open successfully", tsVnodes.totalVnodes);
}
return TSDB_CODE_SUCCESS;
}
static void dnodeCloseVnodes() {
SVnodeObj *pVnodes[TSDB_MAX_VNODES] = {0};
int32_t numOfVnodes = 0;
int32_t code = dnodeGetVnodesFromHash(pVnodes, &numOfVnodes);
if (code != TSDB_CODE_SUCCESS) {
dInfo("failed to get dnode list since code %d", code);
return;
}
for (int32_t i = 0; i < numOfVnodes; ++i) {
vnodeClose(pVnodes[i]->pImpl);
}
if (tsVnodes.hash != NULL) {
taosHashCleanup(tsVnodes.hash);
tsVnodes.hash = NULL;
}
dInfo("total vnodes:%d are all closed", numOfVnodes);
}
static int32_t dnodeParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg *pCfg) {
SCreateVnodeMsg *pCreate = rpcMsg->pCont;
*vgId = htonl(pCreate->vgId);
tstrncpy(pCfg->db, pCreate->db, TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN);
pCfg->cacheBlockSize = htonl(pCreate->cacheBlockSize);
pCfg->totalBlocks = htonl(pCreate->totalBlocks);
pCfg->daysPerFile = htonl(pCreate->daysPerFile);
pCfg->daysToKeep0 = htonl(pCreate->daysToKeep0);
pCfg->daysToKeep1 = htonl(pCreate->daysToKeep1);
pCfg->daysToKeep2 = htonl(pCreate->daysToKeep2);
pCfg->minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock);
pCfg->maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock);
pCfg->precision = pCreate->precision;
pCfg->compression = pCreate->compression;
pCfg->cacheLastRow = pCreate->cacheLastRow;
pCfg->update = pCreate->update;
pCfg->quorum = pCreate->quorum;
pCfg->replica = pCreate->replica;
pCfg->walLevel = pCreate->walLevel;
pCfg->fsyncPeriod = htonl(pCreate->fsyncPeriod);
for (int32_t i = 0; i < pCfg->replica; ++i) {
pCfg->replicas[i].port = htons(pCreate->replicas[i].port);
tstrncpy(pCfg->replicas[i].fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN);
}
return 0;
}
static SDropVnodeMsg *vnodeParseDropVnodeReq(SRpcMsg *rpcMsg) {
SDropVnodeMsg *pDrop = rpcMsg->pCont;
pDrop->vgId = htonl(pDrop->vgId);
return pDrop;
}
static SAuthVnodeMsg *vnodeParseAuthVnodeReq(SRpcMsg *rpcMsg) {
SAuthVnodeMsg *pAuth = rpcMsg->pCont;
pAuth->vgId = htonl(pAuth->vgId);
return pAuth;
}
static int32_t vnodeProcessCreateVnodeReq(SRpcMsg *rpcMsg) {
SVnodeCfg vnodeCfg = {0};
int32_t vgId = 0;
dnodeParseCreateVnodeReq(rpcMsg, &vgId, &vnodeCfg);
dDebug("vgId:%d, create vnode req is received", vgId);
SVnodeObj *pVnode = dnodeAcquireVnode(vgId);
if (pVnode != NULL) {
dDebug("vgId:%d, already exist, return success", vgId);
dnodeReleaseVnode(pVnode);
return 0;
}
int32_t code = dnodeCreateVnode(vgId, &vnodeCfg);
if (code != 0) {
dError("vgId:%d, failed to create vnode since %s", vgId, tstrerror(code));
}
return code;
}
static int32_t vnodeProcessAlterVnodeReq(SRpcMsg *rpcMsg) {
SVnodeCfg vnodeCfg = {0};
int32_t vgId = 0;
int32_t code = 0;
dnodeParseCreateVnodeReq(rpcMsg, &vgId, &vnodeCfg);
dDebug("vgId:%d, alter vnode req is received", vgId);
SVnodeObj *pVnode = dnodeAcquireVnode(vgId);
if (pVnode == NULL) {
code = terrno;
dDebug("vgId:%d, failed to alter vnode since %s", vgId, tstrerror(code));
return code;
}
code = vnodeAlter(pVnode->pImpl, &vnodeCfg);
if (code != 0) {
dError("vgId:%d, failed to alter vnode since %s", vgId, tstrerror(code));
}
dnodeReleaseVnode(pVnode);
return code;
}
static int32_t vnodeProcessDropVnodeReq(SRpcMsg *rpcMsg) {
SDropVnodeMsg *pDrop = vnodeParseDropVnodeReq(rpcMsg);
int32_t code = 0;
int32_t vgId = pDrop->vgId;
dDebug("vgId:%d, drop vnode req is received", vgId);
SVnodeObj *pVnode = dnodeAcquireVnode(vgId);
if (pVnode == NULL) {
code = terrno;
dDebug("vgId:%d, failed to drop since %s", vgId, tstrerror(code));
return code;
}
code = vnodeDrop(pVnode->pImpl);
if (code != 0) {
dError("vgId:%d, failed to drop vnode since %s", vgId, tstrerror(code));
}
dnodeReleaseVnode(pVnode);
return code;
}
static int32_t vnodeProcessAuthVnodeReq(SRpcMsg *rpcMsg) {
SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg);
int32_t code = 0;
int32_t vgId = pAuth->vgId;
dDebug("vgId:%d, auth vnode req is received", vgId);
SVnodeObj *pVnode = dnodeAcquireVnode(vgId);
if (pVnode == NULL) {
code = terrno;
dDebug("vgId:%d, failed to auth since %s", vgId, tstrerror(code));
return code;
}
pVnode->accessState = pAuth->accessState;
dnodeReleaseVnode(pVnode);
return code;
}
static int32_t vnodeProcessSyncVnodeReq(SRpcMsg *rpcMsg) {
SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg);
int32_t code = 0;
int32_t vgId = pAuth->vgId;
dDebug("vgId:%d, auth vnode req is received", vgId);
SVnodeObj *pVnode = dnodeAcquireVnode(vgId);
if (pVnode == NULL) {
code = terrno;
dDebug("vgId:%d, failed to auth since %s", vgId, tstrerror(code));
return code;
}
code = vnodeSync(pVnode->pImpl);
if (code != 0) {
dError("vgId:%d, failed to auth vnode since %s", vgId, tstrerror(code));
}
dnodeReleaseVnode(pVnode);
return code;
}
static int32_t vnodeProcessCompactVnodeReq(SRpcMsg *rpcMsg) {
SCompactVnodeMsg *pCompact = (SCompactVnodeMsg *)vnodeParseDropVnodeReq(rpcMsg);
int32_t code = 0;
int32_t vgId = pCompact->vgId;
dDebug("vgId:%d, compact vnode req is received", vgId);
SVnodeObj *pVnode = dnodeAcquireVnode(vgId);
if (pVnode == NULL) {
code = terrno;
dDebug("vgId:%d, failed to compact since %s", vgId, tstrerror(code));
return code;
}
code = vnodeCompact(pVnode->pImpl);
if (code != 0) {
dError("vgId:%d, failed to compact vnode since %s", vgId, tstrerror(code));
}
dnodeReleaseVnode(pVnode);
return code;
}
static void dnodeProcessVnodeMgmtReq(SRpcMsg *pMsg, void *unused) {
int32_t code = 0;
switch (pMsg->msgType) {
case TSDB_MSG_TYPE_CREATE_VNODE_IN:
code = vnodeProcessCreateVnodeReq(pMsg);
break;
case TSDB_MSG_TYPE_ALTER_VNODE_IN:
code = vnodeProcessAlterVnodeReq(pMsg);
break;
case TSDB_MSG_TYPE_DROP_VNODE_IN:
code = vnodeProcessDropVnodeReq(pMsg);
break;
case TSDB_MSG_TYPE_AUTH_VNODE_IN:
code = vnodeProcessAuthVnodeReq(pMsg);
break;
case TSDB_MSG_TYPE_SYNC_VNODE_IN:
code = vnodeProcessSyncVnodeReq(pMsg);
break;
case TSDB_MSG_TYPE_COMPACT_VNODE_IN:
code = vnodeProcessCompactVnodeReq(pMsg);
break;
default:
code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
break;
}
SRpcMsg rsp = {.code = code, .handle = pMsg->handle};
rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static int32_t dnodeWriteToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) {
int32_t code = 0;
if (pQueue == NULL) {
code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
} else {
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg));
if (pMsg == NULL) {
code = TSDB_CODE_DND_OUT_OF_MEMORY;
} else {
*pMsg = *pRpcMsg;
code = taosWriteQitem(pQueue, pMsg);
}
}
if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code};
rpcSendResponse(&rsp);
rpcFreeCont(pRpcMsg->pCont);
}
}
static SVnodeObj *dnodeAcquireVnodeFromMsg(SRpcMsg *pMsg) {
SMsgHead *pHead = (SMsgHead *)pMsg->pCont;
pHead->vgId = htonl(pHead->vgId);
SVnodeObj *pVnode = dnodeAcquireVnode(pHead->vgId);
if (pVnode == NULL) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont);
}
return pVnode;
}
void dnodeProcessVnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { dnodeWriteToVnodeQueue(tsVnodes.pMgmtQ, pMsg); }
void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg);
if (pVnode != NULL) {
dnodeWriteToVnodeQueue(pVnode->pWriteQ, pMsg);
dnodeReleaseVnode(pVnode);
}
}
void dnodeProcessVnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg);
if (pVnode != NULL) {
dnodeWriteToVnodeQueue(pVnode->pSyncQ, pMsg);
dnodeReleaseVnode(pVnode);
}
}
void dnodeProcessVnodeQueryMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg);
if (pVnode != NULL) {
dnodeWriteToVnodeQueue(pVnode->pQueryQ, pMsg);
dnodeReleaseVnode(pVnode);
}
}
void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg);
if (pVnode != NULL) {
dnodeWriteToVnodeQueue(pVnode->pFetchQ, pMsg);
dnodeReleaseVnode(pVnode);
}
}
static int32_t dnodeInitVnodeMgmtWorker() {
SWorkerPool *pPool = &tsVnodes.mgmtPool;
pPool->name = "vnode-mgmt";
pPool->min = 1;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
tsVnodes.pMgmtQ = tWorkerAllocQueue(pPool, NULL, (FProcessItem)dnodeProcessVnodeMgmtReq);
if (tsVnodes.pMgmtQ == NULL) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
return 0;
}
static void dnodeCleanupVnodeMgmtWorker() {
tWorkerFreeQueue(&tsVnodes.mgmtPool, tsVnodes.pMgmtQ);
tWorkerCleanup(&tsVnodes.mgmtPool);
tsVnodes.pMgmtQ = NULL;
}
int32_t dnodeInitVnodes() {
dInfo("dnode-vnodes start to init");
SSteps *pSteps = taosStepInit(3, dnodeReportStartup);
taosStepAdd(pSteps, "dnode-vnode-env", vnodeInit, vnodeCleanup);
taosStepAdd(pSteps, "dnode-vnode-mgmt", dnodeInitVnodeMgmtWorker, dnodeCleanupVnodeMgmtWorker);
taosStepAdd(pSteps, "dnode-vnodes", dnodeOpenVnodes, dnodeCleanupVnodes);
tsVnodes.pSteps = pSteps;
return taosStepExec(pSteps);
}
void dnodeCleanupVnodes() {
if (tsVnodes.pSteps != NULL) {
dInfo("dnode-vnodes start to clean up");
taosStepCleanup(tsVnodes.pSteps);
tsVnodes.pSteps = NULL;
dInfo("dnode-vnodes is cleaned up");
}
}
void dnodeGetVnodes(SVnodeLoads *pLoads) {
pLoads->vnodeNum = taosHashGetSize(tsVnodes.hash);
int32_t v = 0;
void *pIter = taosHashIterate(tsVnodes.hash, NULL);
while (pIter) {
SVnodeObj **ppVnode = pIter;
if (ppVnode == NULL) continue;
SVnodeObj *pVnode = *ppVnode;
if (pVnode) {
SVnodeLoad *pLoad = &pLoads->vnodeLoads[v++];
vnodeGetLoad(pVnode->pImpl, pLoad);
pLoad->vgId = htonl(pLoad->vgId);
pLoad->totalStorage = htobe64(pLoad->totalStorage);
pLoad->compStorage = htobe64(pLoad->compStorage);
pLoad->pointsWritten = htobe64(pLoad->pointsWritten);
pLoad->tablesNum = htobe64(pLoad->tablesNum);
}
pIter = taosHashIterate(tsVnodes.hash, pIter);
}
}
\ No newline at end of file
......@@ -19,9 +19,6 @@
int32_t vnodeInit() { return 0; }
void vnodeCleanup() {}
int32_t vnodeGetStatistics(SVnode *pVnode, SVnodeStatisic *pStat) { return 0; }
int32_t vnodeGetStatus(SVnode *pVnode, SVnodeStatus *pStatus) { return 0; }
SVnode *vnodeOpen(int32_t vgId, const char *path) { return NULL; }
void vnodeClose(SVnode *pVnode) {}
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; }
......@@ -31,3 +28,4 @@ int32_t vnodeCompact(SVnode *pVnode) { return 0; }
int32_t vnodeSync(SVnode *pVnode) { return 0; }
void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg) {}
void vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册