提交 9c57ee05 编写于 作者: S Shengliang Guan

TD-10431 create dnode

上级 8acbcb7c
......@@ -38,6 +38,15 @@ extern "C" {
dataPos += sizeof(int32_t); \
}
#define SDB_GET_INT16(pData, pRow, dataPos, val) \
{ \
if (sdbGetRawInt16(pRaw, dataPos, val) != 0) { \
sdbFreeRow(pRow); \
return NULL; \
} \
dataPos += sizeof(int16_t); \
}
#define SDB_GET_INT8(pData, pRow, dataPos, val) \
{ \
if (sdbGetRawInt8(pRaw, dataPos, val) != 0) { \
......@@ -74,6 +83,15 @@ extern "C" {
dataPos += sizeof(int32_t); \
}
#define SDB_SET_INT16(pRaw, dataPos, val) \
{ \
if (sdbSetRawInt16(pRaw, dataPos, val) != 0) { \
sdbFreeRaw(pRaw); \
return NULL; \
} \
dataPos += sizeof(int16_t); \
}
#define SDB_SET_INT8(pRaw, dataPos, val) \
{ \
if (sdbSetRawInt8(pRaw, dataPos, val) != 0) { \
......@@ -100,6 +118,7 @@ extern "C" {
} \
}
typedef struct SMnode SMnode;
typedef struct SSdbRaw SSdbRaw;
typedef struct SSdbRow SSdbRow;
typedef enum { SDB_KEY_BINARY = 1, SDB_KEY_INT32 = 2, SDB_KEY_INT64 = 3 } EKeyType;
......@@ -130,7 +149,7 @@ typedef struct SSdb SSdb;
typedef int32_t (*SdbInsertFp)(SSdb *pSdb, void *pObj);
typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj);
typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj);
typedef int32_t (*SdbDeployFp)(SSdb *pSdb);
typedef int32_t (*SdbDeployFp)(SMnode *pMnode);
typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
typedef SSdbRaw *(*SdbEncodeFp)(void *pObj);
......@@ -190,6 +209,12 @@ typedef struct SSdbOpt {
*
*/
const char *path;
/**
* @brief The mnode object.
*
*/
SMnode *pMnode;
} SSdbOpt;
/**
......@@ -291,12 +316,14 @@ int32_t sdbGetSize(SSdb *pSdb, ESdbType type);
SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen);
void sdbFreeRaw(SSdbRaw *pRaw);
int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val);
int32_t sdbSetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t val);
int32_t sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val);
int32_t sdbSetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t val);
int32_t sdbSetRawBinary(SSdbRaw *pRaw, int32_t dataPos, const char *pVal, int32_t valLen);
int32_t sdbSetRawDataLen(SSdbRaw *pRaw, int32_t dataLen);
int32_t sdbSetRawStatus(SSdbRaw *pRaw, ESdbStatus status);
int32_t sdbGetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t *val);
int32_t sdbGetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t *val);
int32_t sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val);
int32_t sdbGetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t *val);
int32_t sdbGetRawBinary(SSdbRaw *pRaw, int32_t dataPos, char *pVal, int32_t valLen);
......
......@@ -78,6 +78,38 @@ typedef enum {
typedef enum { TRN_POLICY_ROLLBACK = 1, TRN_POLICY_RETRY = 2 } ETrnPolicy;
typedef enum {
DND_STATUS_OFFLINE = 0,
DND_STATUS_READY = 1,
DND_STATUS_CREATING = 2,
DND_STATUS_DROPPING = 3
} EDndStatus;
typedef enum {
DND_REASON_ONLINE = 0,
DND_REASON_STATUS_MSG_TIMEOUT,
DND_REASON_STATUS_NOT_RECEIVED,
DND_REASON_RESET_BY_MNODE,
DND_REASON_VERSION_NOT_MATCH,
DND_REASON_DNODE_ID_NOT_MATCH,
DND_REASON_CLUSTER_ID_NOT_MATCH,
DND_REASON_NUM_OF_MNODES_NOT_MATCH,
DND_REASON_ENABLE_BALANCE_NOT_MATCH,
DND_REASON_MN_EQUAL_VN_NOT_MATCH,
DND_REASON_OFFLINE_THRESHOLD_NOT_MATCH,
DND_REASON_STATUS_INTERVAL_NOT_MATCH,
DND_REASON_MAX_TAB_PER_VN_NOT_MATCH,
DND_REASON_MAX_VG_PER_DB_NOT_MATCH,
DND_REASON_ARBITRATOR_NOT_MATCH,
DND_REASON_TIME_ZONE_NOT_MATCH,
DND_REASON_LOCALE_NOT_MATCH,
DND_REASON_CHARSET_NOT_MATCH,
DND_REASON_FLOW_CTRL_NOT_MATCH,
DND_REASON_SLAVE_QUERY_NOT_MATCH,
DND_REASON_ADJUST_MASTER_NOT_MATCH,
DND_REASON_OTHERS
} EDndReason;
typedef struct STrans {
int32_t id;
ETrnStage stage;
......@@ -99,29 +131,32 @@ typedef struct SClusterObj {
} SClusterObj;
typedef struct SDnodeObj {
int32_t id;
int32_t vnodes;
int64_t createdTime;
int64_t updateTime;
int64_t lastAccess;
int64_t rebootTime; // time stamp for last reboot
char fqdn[TSDB_FQDN_LEN];
char ep[TSDB_EP_LEN];
uint16_t port;
int16_t numOfCores; // from dnode status msg
int8_t alternativeRole; // from dnode status msg, 0-any, 1-mgmt, 2-dnode
int8_t status; // set in balance function
int8_t offlineReason;
int32_t id;
int64_t createdTime;
int64_t updateTime;
int64_t rebootTime;
int64_t lastAccessTime;
int16_t numOfMnodes;
int16_t numOfVnodes;
int16_t numOfQnodes;
int16_t numOfSupportMnodes;
int16_t numOfSupportVnodes;
int16_t numOfSupportQnodes;
EDndStatus status;
EDndReason offlineReason;
uint16_t port;
char fqdn[TSDB_FQDN_LEN];
char ep[TSDB_EP_LEN];
} SDnodeObj;
typedef struct SMnodeObj {
int32_t id;
int64_t createdTime;
int64_t updateTime;
int8_t status;
int8_t role;
int32_t roleTerm;
int64_t roleTime;
int64_t createdTime;
int64_t updateTime;
SDnodeObj *pDnode;
} SMnodeObj;
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TRANSACTION_INT_H_
#define _TD_TRANSACTION_INT_H_
#ifndef _TD_MND_TRANS_H_
#define _TD_MND_TRANS_H_
#include "mndInt.h"
......@@ -44,4 +44,4 @@ SSdbRow *mndTransActionDecode(SSdbRaw *pRaw);
}
#endif
#endif /*_TD_TRANSACTION_INT_H_*/
#endif /*_TD_MND_TRANS_H_*/
......@@ -79,7 +79,7 @@ static int32_t mnodeAcctActionUpdate(SSdb *pSdb, SAcctObj *pSrcAcct, SAcctObj *p
return 0;
}
static int32_t mnodeCreateDefaultAcct(SSdb *pSdb) {
static int32_t mnodeCreateDefaultAcct(SMnode *pMnode) {
int32_t code = 0;
SAcctObj acctObj = {0};
......@@ -98,7 +98,7 @@ static int32_t mnodeCreateDefaultAcct(SSdb *pSdb) {
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
return sdbWrite(pSdb, pRaw);
return sdbWrite(pMnode->pSdb, pRaw);
}
int32_t mndInitAcct(SMnode *pMnode) {
......
......@@ -14,8 +14,115 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "mndInt.h"
#include "mndTrans.h"
int32_t mndInitDnode(SMnode *pMnode) { return 0; }
void mndCleanupDnode(SMnode *pMnode) {}
\ No newline at end of file
#define SDB_DNODE_VER 1
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, SDB_DNODE_VER, sizeof(SDnodeObj));
if (pRaw == NULL) return NULL;
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pDnode->id);
SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime)
SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime)
SDB_SET_INT16(pRaw, dataPos, pDnode->port)
SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN)
SDB_SET_DATALEN(pRaw, dataPos);
return pRaw;
}
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sver != SDB_DNODE_VER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to decode dnode since %s", terrstr());
return NULL;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SDnodeObj));
SDnodeObj *pDnode = sdbGetRowObj(pRow);
if (pDnode == NULL) return NULL;
int32_t dataPos = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &pDnode->id)
SDB_GET_INT64(pRaw, pRow, dataPos, &pDnode->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pDnode->updateTime)
SDB_GET_INT16(pRaw, pRow, dataPos, &pDnode->port)
SDB_GET_BINARY(pRaw, pRow, dataPos, pDnode->fqdn, TSDB_FQDN_LEN)
return pRow;
}
static void mnodeResetDnode(SDnodeObj *pDnode) {
pDnode->rebootTime = 0;
pDnode->lastAccessTime = 0;
pDnode->numOfMnodes = 0;
pDnode->numOfVnodes = 0;
pDnode->numOfQnodes = 0;
pDnode->numOfSupportMnodes = 0;
pDnode->numOfSupportVnodes = 0;
pDnode->numOfSupportQnodes = 0;
pDnode->status = DND_STATUS_OFFLINE;
pDnode->offlineReason = DND_REASON_RESET_BY_MNODE;
snprintf(pDnode->ep, TSDB_EP_LEN, "%s:%u", pDnode->fqdn, pDnode->port);
}
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
mnodeResetDnode(pDnode);
return 0;
}
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) { return 0; }
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pSrcDnode, SDnodeObj *pDstDnode) {
pSrcDnode->id = pDstDnode->id;
pSrcDnode->createdTime = pDstDnode->createdTime;
pSrcDnode->updateTime = pDstDnode->updateTime;
pSrcDnode->port = pDstDnode->port;
memcpy(pSrcDnode->fqdn, pDstDnode->fqdn, TSDB_FQDN_LEN);
mnodeResetDnode(pSrcDnode);
}
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
SDnodeObj dnodeObj = {0};
dnodeObj.id = 0;
dnodeObj.createdTime = taosGetTimestampMs();
dnodeObj.updateTime = dnodeObj.createdTime;
dnodeObj.port = pMnode->replicas[0].port;
memcpy(&dnodeObj.fqdn, pMnode->replicas[0].fqdn, TSDB_FQDN_LEN);
SSdbRaw *pRaw = mndDnodeActionEncode(&dnodeObj);
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
return sdbWrite(pMnode->pSdb, pRaw);
}
static int32_t mndProcessCreateDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessDropDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessConfigDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; }
int32_t mndInitDnode(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_USER,
.keyType = SDB_KEY_BINARY,
.deployFp = (SdbDeployFp)mndCreateDefaultDnode,
.encodeFp = (SdbEncodeFp)mndDnodeActionEncode,
.decodeFp = (SdbDecodeFp)mndDnodeActionDecode,
.insertFp = (SdbInsertFp)mndDnodeActionInsert,
.updateFp = (SdbUpdateFp)mndDnodeActionUpdate,
.deleteFp = (SdbDeleteFp)mndDnodeActionDelete};
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_DNODE, mndProcessCreateDnodeMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_DNODE, mndProcessDropDnodeMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CONFIG_DNODE, mndProcessConfigDnodeMsg);
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupDnode(SMnode *pMnode) {}
\ No newline at end of file
......@@ -21,7 +21,7 @@
#define SDB_USER_VER 1
static SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, SDB_USER_VER, sizeof(SAcctObj));
SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, SDB_USER_VER, sizeof(SUserObj));
if (pRaw == NULL) return NULL;
int32_t dataPos = 0;
......@@ -97,7 +97,7 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pSrcUser, SUserObj *pDs
return 0;
}
static int32_t mndCreateDefaultUser(SSdb *pSdb, char *acct, char *user, char *pass) {
static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char *pass) {
SUserObj userObj = {0};
tstrncpy(userObj.user, user, TSDB_USER_LEN);
tstrncpy(userObj.acct, acct, TSDB_USER_LEN);
......@@ -113,15 +113,15 @@ static int32_t mndCreateDefaultUser(SSdb *pSdb, char *acct, char *user, char *pa
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
return sdbWrite(pSdb, pRaw);
return sdbWrite(pMnode->pSdb, pRaw);
}
static int32_t mndCreateDefaultUsers(SSdb *pSdb) {
if (mndCreateDefaultUser(pSdb, TSDB_DEFAULT_USER, TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) {
static int32_t mndCreateDefaultUsers(SMnode *pMnode) {
if (mndCreateDefaultUser(pMnode, TSDB_DEFAULT_USER, TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) {
return -1;
}
if (mndCreateDefaultUser(pSdb, TSDB_DEFAULT_USER, "_" TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) {
if (mndCreateDefaultUser(pMnode, TSDB_DEFAULT_USER, "_" TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) {
return -1;
}
......
......@@ -38,8 +38,8 @@ extern "C" {
typedef struct SSdbRaw {
int8_t type;
int8_t sver;
int8_t status;
int8_t sver;
int8_t reserved;
int32_t dataLen;
char pData[];
......@@ -53,6 +53,7 @@ typedef struct SSdbRow {
} SSdbRow;
typedef struct SSdb {
SMnode *pMnode;
char *currDir;
char *syncDir;
char *tmpDir;
......
......@@ -27,7 +27,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
}
char path[PATH_MAX + 100];
snprintf(path, PATH_MAX + 100, "%s", pOption->path);
snprintf(path, PATH_MAX + 100, "%s%sdata", pOption->path, TD_DIRSEP);
pSdb->currDir = strdup(path);
snprintf(path, PATH_MAX + 100, "%s%ssync", pOption->path, TD_DIRSEP);
pSdb->syncDir = strdup(path);
......@@ -44,6 +44,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
taosInitRWLatch(&pSdb->locks[i]);
}
pSdb->pMnode = pOption->pMnode;
mDebug("sdb init successfully");
return pSdb;
}
......
......@@ -46,7 +46,7 @@ static int32_t sdbRunDeployFp(SSdb *pSdb) {
SdbDeployFp fp = pSdb->deployFps[i];
if (fp == NULL) continue;
if ((*fp)(pSdb) != 0) {
if ((*fp)(pSdb->pMnode) != 0) {
mError("failed to deploy sdb:%d since %s", i, terrstr());
return -1;
}
......
......@@ -61,6 +61,21 @@ int32_t sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val) {
return 0;
}
int32_t sdbSetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t val) {
if (pRaw == NULL) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
if (dataPos + sizeof(int16_t) > pRaw->dataLen) {
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
return -1;
}
*(int16_t *)(pRaw->pData + dataPos) = val;
return 0;
}
int32_t sdbSetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t val) {
if (pRaw == NULL) {
terrno = TSDB_CODE_INVALID_PTR;
......@@ -146,6 +161,21 @@ int32_t sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val) {
return 0;
}
int32_t sdbGetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t *val) {
if (pRaw == NULL) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
if (dataPos + sizeof(int16_t) > pRaw->dataLen) {
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
return -1;
}
*val = *(int16_t *)(pRaw->pData + dataPos);
return 0;
}
int32_t sdbGetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t *val) {
if (pRaw == NULL) {
terrno = TSDB_CODE_INVALID_PTR;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册