提交 47e1d695 编写于 作者: S Shengliang Guan

TD-10431 dnode manage

上级 7c5ab044
......@@ -36,6 +36,53 @@ static char *offlineReason[] = {
"unknown",
};
static int32_t mndCreateDefaultDnode(SMnode *pMnode);
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOldDnode, SDnodeObj *pNewDnode);
static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropDnodeMsg(SMnodeMsg *pMsg);
static int32_t mndProcessConfigDnodeMsg(SMnodeMsg *pMsg);
static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg);
int32_t mndInitDnode(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_DNODE,
.keyType = SDB_KEY_INT32,
.deployFp = (SdbDeployFp)mndCreateDefaultDnode,
.encodeFp = (SdbEncodeFp)mndDnodeActionEncode,
.decodeFp = (SdbDecodeFp)mndDnodeActionDecode,
.insertFp = (SdbInsertFp)mndDnodeActionInsert,
.updateFp = (SdbUpdateFp)mndDnodeActionUpdate,
.deleteFp = (SdbDeleteFp)mndDnodeActionDelete};
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_DNODE, mndProcessCreateDnodeMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_DNODE, mndProcessDropDnodeMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CONFIG_DNODE, mndProcessConfigDnodeMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_STATUS, mndProcessStatusMsg);
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupDnode(SMnode *pMnode) {}
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
SDnodeObj dnodeObj = {0};
dnodeObj.id = 1;
dnodeObj.createdTime = taosGetTimestampMs();
dnodeObj.updateTime = dnodeObj.createdTime;
dnodeObj.port = pMnode->replicas[0].port;
memcpy(&dnodeObj.fqdn, pMnode->replicas[0].fqdn, TSDB_FQDN_LEN);
SSdbRaw *pRaw = mndDnodeActionEncode(&dnodeObj);
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("dnode:%d, will be created while deploy sdb", dnodeObj.id);
return sdbWrite(pMnode->pSdb, pRaw);
}
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, SDB_DNODE_VER, sizeof(SDnodeObj));
if (pRaw == NULL) return NULL;
......@@ -107,24 +154,18 @@ static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOldDnode, SDnodeObj
pOldDnode->createdTime = pNewDnode->createdTime;
pOldDnode->updateTime = pNewDnode->updateTime;
pOldDnode->port = pNewDnode->port;
memcpy(pOldDnode->fqdn, pNewDnode->fqdn, TSDB_FQDN_LEN);
memcpy(pOldDnode->fqdn, pNewDnode->fqdn, TSDB_FQDN_LEN);
return 0;
}
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
SDnodeObj dnodeObj = {0};
dnodeObj.id = 1;
dnodeObj.createdTime = taosGetTimestampMs();
dnodeObj.updateTime = dnodeObj.createdTime;
dnodeObj.port = pMnode->replicas[0].port;
memcpy(&dnodeObj.fqdn, pMnode->replicas[0].fqdn, TSDB_FQDN_LEN);
SSdbRaw *pRaw = mndDnodeActionEncode(&dnodeObj);
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
SSdb *pSdb = pMnode->pSdb;
return sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
}
mDebug("dnode:%d, will be created while deploy sdb", dnodeObj.id);
return sdbWrite(pMnode->pSdb, pRaw);
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pDnode);
}
static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
......@@ -322,33 +363,3 @@ static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessDropDnodeMsg(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessConfigDnodeMsg(SMnodeMsg *pMsg) { return 0; }
int32_t mndInitDnode(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_DNODE,
.keyType = SDB_KEY_INT32,
.deployFp = (SdbDeployFp)mndCreateDefaultDnode,
.encodeFp = (SdbEncodeFp)mndDnodeActionEncode,
.decodeFp = (SdbDecodeFp)mndDnodeActionDecode,
.insertFp = (SdbInsertFp)mndDnodeActionInsert,
.updateFp = (SdbUpdateFp)mndDnodeActionUpdate,
.deleteFp = (SdbDeleteFp)mndDnodeActionDelete};
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_DNODE, mndProcessCreateDnodeMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_DNODE, mndProcessDropDnodeMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CONFIG_DNODE, mndProcessConfigDnodeMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_STATUS, mndProcessStatusMsg);
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupDnode(SMnode *pMnode) {}
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
SSdb *pSdb = pMnode->pSdb;
return sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
}
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pDnode);
}
......@@ -18,6 +18,47 @@
#define SDB_MNODE_VER 1
static int32_t mndCreateDefaultMnode(SMnode *pMnode);
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pMnodeObj);
static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw);
static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pMnodeObj);
static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pMnodeObj);
static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOldMnode, SMnodeObj *pNewMnode);
static int32_t mndProcessCreateMnodeMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropMnodeMsg(SMnodeMsg *pMsg);
int32_t mndInitMnode(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_MNODE,
.keyType = SDB_KEY_INT32,
.deployFp = (SdbDeployFp)mndCreateDefaultMnode,
.encodeFp = (SdbEncodeFp)mndMnodeActionEncode,
.decodeFp = (SdbDecodeFp)mndMnodeActionDecode,
.insertFp = (SdbInsertFp)mndMnodeActionInsert,
.updateFp = (SdbUpdateFp)mndMnodeActionUpdate,
.deleteFp = (SdbDeleteFp)mndMnodeActionDelete};
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_MNODE, mndProcessCreateMnodeMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_MNODE, mndProcessDropMnodeMsg);
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupMnode(SMnode *pMnode) {}
static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
SMnodeObj mnodeObj = {0};
mnodeObj.id = 1;
mnodeObj.createdTime = taosGetTimestampMs();
mnodeObj.updateTime = mnodeObj.createdTime;
SSdbRaw *pRaw = mndMnodeActionEncode(&mnodeObj);
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("mnode:%d, will be created while deploy sdb", mnodeObj.id);
return sdbWrite(pMnode->pSdb, pRaw);
}
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pMnodeObj) {
SSdbRaw *pRaw = sdbAllocRaw(SDB_MNODE, SDB_MNODE_VER, sizeof(SMnodeObj));
if (pRaw == NULL) return NULL;
......@@ -89,42 +130,6 @@ static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOldMnode, SMnodeObj
return 0;
}
static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
SMnodeObj mnodeObj = {0};
mnodeObj.id = 1;
mnodeObj.createdTime = taosGetTimestampMs();
mnodeObj.updateTime = mnodeObj.createdTime;
SSdbRaw *pRaw = mndMnodeActionEncode(&mnodeObj);
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("mnode:%d, will be created while deploy sdb", mnodeObj.id);
return sdbWrite(pMnode->pSdb, pRaw);
}
static int32_t mndProcessCreateMnodeMsg(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessDropMnodeMsg(SMnodeMsg *pMsg) { return 0; }
int32_t mndInitMnode(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_MNODE,
.keyType = SDB_KEY_INT32,
.deployFp = (SdbDeployFp)mndCreateDefaultMnode,
.encodeFp = (SdbEncodeFp)mndMnodeActionEncode,
.decodeFp = (SdbDecodeFp)mndMnodeActionDecode,
.insertFp = (SdbInsertFp)mndMnodeActionInsert,
.updateFp = (SdbUpdateFp)mndMnodeActionUpdate,
.deleteFp = (SdbDeleteFp)mndMnodeActionDelete};
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_MNODE, mndProcessCreateMnodeMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_MNODE, mndProcessDropMnodeMsg);
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupMnode(SMnode *pMnode) {}
bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) {
SSdb *pSdb = pMnode->pSdb;
......@@ -157,4 +162,8 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
pEpSet->numOfEps++;
}
}
\ No newline at end of file
}
static int32_t mndProcessCreateMnodeMsg(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessDropMnodeMsg(SMnodeMsg *pMsg) { return 0; }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册