From 23da7d4230211e26e052a755fb23becab097e3e8 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 8 Dec 2021 18:50:52 +0800 Subject: [PATCH] TD-10431 create dnode --- include/common/taosmsg.h | 16 +- include/common/tglobal.h | 1 - include/util/tutil.h | 23 +- source/common/src/tglobal.c | 18 - source/dnode/mnode/impl/inc/mndDef.h | 32 +- source/dnode/mnode/impl/inc/mndDnode.h | 1 + source/dnode/mnode/impl/inc/mndShow.h | 1 + source/dnode/mnode/impl/inc/mndVgroup.h | 1 + source/dnode/mnode/impl/src/mndDnode.c | 428 +++++++++++++++++++++++- source/dnode/mnode/impl/src/mndShow.c | 7 +- source/dnode/mnode/impl/src/mndUser.c | 7 +- source/dnode/mnode/impl/src/mndVgroup.c | 114 ++++++- source/util/src/tutil.c | 21 +- 13 files changed, 589 insertions(+), 81 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 6b8d55c458..08d8eacce9 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -829,7 +829,16 @@ typedef struct SShowRsp { typedef struct { char ep[TSDB_EP_LEN]; // end point, hostname:port -} SCreateDnodeMsg, SDropDnodeMsg; +} SCreateDnodeMsg; + +typedef struct { + int32_t dnodeId; +} SDropDnodeMsg; + +typedef struct { + int32_t dnodeId; + char config[128]; +} SCfgDnodeMsg; typedef struct { int32_t dnodeId; @@ -849,11 +858,6 @@ typedef struct { int32_t vgId; } SConfigVnodeMsg; -typedef struct { - char ep[TSDB_EP_LEN]; // end point, hostname:port - char config[64]; -} SCfgDnodeMsg; - typedef struct { char sql[TSDB_SHOW_SQL_LEN]; int32_t queryId; diff --git a/include/common/tglobal.h b/include/common/tglobal.h index d0f95b786a..e62083e999 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -196,7 +196,6 @@ extern SDiskCfg tsDiskCfg[]; void taosInitGlobalCfg(); int32_t taosCheckGlobalCfg(); int32_t taosCfgDynamicOptions(char *msg); -int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port); bool taosCheckBalanceCfgOptions(const char *option, int32_t *vnodeId, int32_t *dnodeId); void taosAddDataDir(int index, char *v1, int level, int primary); void taosReadDataDirCfg(char *v1, char *v2, char *v3); diff --git a/include/util/tutil.h b/include/util/tutil.h index 573dee9339..41eb2ca26b 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -21,22 +21,22 @@ extern "C" { #endif #include "os.h" -#include "tmd5.h" #include "tcrc32c.h" #include "tdef.h" +#include "tmd5.h" int32_t strdequote(char *src); -int32_t strndequote(char *dst, const char* z, int32_t len); +int32_t strndequote(char *dst, const char *z, int32_t len); int32_t strRmquote(char *z, int32_t len); size_t strtrim(char *src); -char * strnchr(char *haystack, char needle, int32_t len, bool skipquote); -char ** strsplit(char *src, const char *delim, int32_t *num); -char * strtolower(char *dst, const char *src); -char * strntolower(char *dst, const char *src, int32_t n); -char * strntolower_s(char *dst, const char *src, int32_t n); +char *strnchr(char *haystack, char needle, int32_t len, bool skipquote); +char **strsplit(char *src, const char *delim, int32_t *num); +char *strtolower(char *dst, const char *src); +char *strntolower(char *dst, const char *src, int32_t n); +char *strntolower_s(char *dst, const char *src, int32_t n); int64_t strnatoi(char *num, int32_t len); -char * strbetween(char *string, char *begin, char *end); -char * paGetToken(char *src, char **token, int32_t *tokenLen); +char *strbetween(char *string, char *begin, char *end); +char *paGetToken(char *src, char **token, int32_t *tokenLen); int32_t taosByteArrayToHexStr(char bytes[], int32_t len, char hexstr[]); int32_t taosHexStrToByteArray(char hexstr[], char bytes[]); @@ -45,11 +45,12 @@ char *taosIpStr(uint32_t ipInt); uint32_t ip2uint(const char *const ip_addr); void taosIp2String(uint32_t ip, char *str); void taosIpPort2String(uint32_t ip, uint16_t port, char *str); +int32_t taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port); static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) { MD5_CTX context; MD5Init(&context); - MD5Update(&context, inBuf, (unsigned int)inLen); + MD5Update(&context, inBuf, (uint32_t)inLen); MD5Final(&context); memcpy(target, context.digest, TSDB_KEY_LEN); } @@ -58,4 +59,4 @@ static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *tar } #endif -#endif /*_TD_UTIL_UTIL_H*/ +#endif /*_TD_UTIL_UTIL_H*/ diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 1737bd9def..117f908faf 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1738,24 +1738,6 @@ int32_t taosCheckGlobalCfg() { return 0; } -int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port) { - *port = 0; - strcpy(fqdn, ep); - - char *temp = strchr(fqdn, ':'); - if (temp) { - *temp = 0; - *port = atoi(temp+1); - } - - if (*port == 0) { - *port = tsServerPort; - return -1; - } - - return 0; -} - /* * alter dnode 1 balance "vnode:1-dnode:2" */ diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 00d9f31b15..8840df848f 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -286,23 +286,21 @@ typedef struct SFuncObj { char pData[]; } SFuncObj; -typedef struct SShowObj SShowObj; -typedef struct SShowObj { - int8_t type; - int8_t maxReplica; - int16_t numOfColumns; - int32_t id; - int32_t rowSize; - int32_t numOfRows; - int32_t numOfReads; - uint16_t payloadLen; - void *pIter; - void *pVgIter; - SMnode *pMnode; - char db[TSDB_FULL_DB_NAME_LEN]; - int16_t offset[TSDB_MAX_COLUMNS]; - int32_t bytes[TSDB_MAX_COLUMNS]; - char payload[]; +typedef struct { + int32_t id; + int8_t type; + int8_t replica; + int16_t numOfColumns; + int32_t rowSize; + int32_t numOfRows; + int32_t numOfReads; + int32_t payloadLen; + void *pIter; + SMnode *pMnode; + char db[TSDB_FULL_DB_NAME_LEN]; + int16_t offset[TSDB_MAX_COLUMNS]; + int32_t bytes[TSDB_MAX_COLUMNS]; + char payload[]; } SShowObj; typedef struct SMnodeMsg { diff --git a/source/dnode/mnode/impl/inc/mndDnode.h b/source/dnode/mnode/impl/inc/mndDnode.h index 9bb1ab7acd..a25784e5bb 100644 --- a/source/dnode/mnode/impl/inc/mndDnode.h +++ b/source/dnode/mnode/impl/inc/mndDnode.h @@ -26,6 +26,7 @@ int32_t mndInitDnode(SMnode *pMnode); void mndCleanupDnode(SMnode *pMnode); SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId); void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode); +SEpSet mndGetDnodeEpset(SDnodeObj *pDnode); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndShow.h b/source/dnode/mnode/impl/inc/mndShow.h index e7a3fcd45f..37fb9159dc 100644 --- a/source/dnode/mnode/impl/inc/mndShow.h +++ b/source/dnode/mnode/impl/inc/mndShow.h @@ -28,6 +28,7 @@ void mndAddShowMetaHandle(SMnode *pMnode, EShowType showType, ShowMetaFp fp); void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp); void mndAddShowFreeIterHandle(SMnode *pMnode, EShowType msgType, ShowFreeIterFp fp); void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow); +char *mndShowStr(int32_t showType); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index c75bdb5949..5509c81a2e 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -24,6 +24,7 @@ extern "C" { int32_t mndInitVgroup(SMnode *pMnode); void mndCleanupVgroup(SMnode *pMnode); +int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index d62e1464a6..358b07c967 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -16,12 +16,17 @@ #define _DEFAULT_SOURCE #include "mndDnode.h" #include "mndMnode.h" +#include "mndShow.h" #include "mndTrans.h" #include "ttime.h" +#include "tutil.h" -#define SDB_DNODE_VER 1 +#define TSDB_DNODE_VER 1 +#define TSDB_CONFIG_OPTION_LEN 16 +#define TSDB_CONIIG_VALUE_LEN 48 +#define TSDB_CONFIG_NUMBER 8 -static char *offlineReason[] = { +static const char *offlineReason[] = { "", "status msg timeout", "status not received", @@ -36,16 +41,27 @@ static char *offlineReason[] = { "unknown", }; +static const char *dnodeStatus[] = {"offline", "ready", "creating", "dropping"}; + static int32_t mndCreateDefaultDnode(SMnode *pMnode); static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode); static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw); static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode); static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode); static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOldDnode, SDnodeObj *pNewDnode); -static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg); -static int32_t mndProcessDropDnodeMsg(SMnodeMsg *pMsg); -static int32_t mndProcessConfigDnodeMsg(SMnodeMsg *pMsg); -static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg); + +static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg); +static int32_t mndProcessDropDnodeMsg(SMnodeMsg *pMsg); +static int32_t mndProcessConfigDnodeMsg(SMnodeMsg *pMsg); +static int32_t mndProcessConfigDnodeRsp(SMnodeMsg *pMsg); +static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg); + +static int32_t mndGetConfigMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndRetrieveConfigs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter); +static int32_t mndGetDnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter); int32_t mndInitDnode(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_DNODE, @@ -60,8 +76,16 @@ int32_t mndInitDnode(SMnode *pMnode) { mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_DNODE, mndProcessCreateDnodeMsg); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_DNODE, mndProcessDropDnodeMsg); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CONFIG_DNODE, mndProcessConfigDnodeMsg); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP, mndProcessConfigDnodeRsp); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_STATUS, mndProcessStatusMsg); + mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_VARIABLES, mndGetConfigMeta); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VARIABLES, mndRetrieveConfigs); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VARIABLES, mndCancelGetNextConfig); + mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndGetDnodeMeta); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode); + return sdbSetTable(pMnode->pSdb, table); } @@ -84,7 +108,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) { } static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) { - SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, SDB_DNODE_VER, sizeof(SDnodeObj)); + SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER, sizeof(SDnodeObj)); if (pRaw == NULL) return NULL; int32_t dataPos = 0; @@ -102,7 +126,7 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) { int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; - if (sver != SDB_DNODE_VER) { + if (sver != TSDB_DNODE_VER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; mError("failed to decode dnode since %s", terrstr()); return NULL; @@ -168,6 +192,12 @@ void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) { sdbRelease(pSdb, pDnode); } +SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) { + SEpSet epSet = {.inUse = 0, .numOfEps = 1, .port[0] = pDnode->port}; + memcpy(epSet.fqdn[0], pDnode->fqdn, TSDB_FQDN_LEN); + return epSet; +} + static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) { SSdb *pSdb = pMnode->pSdb; @@ -358,8 +388,384 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) { return 0; } -static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) { return 0; } +static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *pCreate) { + SDnodeObj dnodeObj = {0}; + dnodeObj.id = 1; // todo + dnodeObj.createdTime = taosGetTimestampMs(); + dnodeObj.updateTime = dnodeObj.createdTime; + taosGetFqdnPortFromEp(pCreate->ep, dnodeObj.fqdn, &dnodeObj.port); + + if (dnodeObj.fqdn[0] == 0 || dnodeObj.port <= 0) { + terrno = TSDB_CODE_SDB_APP_ERROR; + mError("dnode:%s, failed to create since %s", pCreate->ep, terrstr()); + return terrno; + } + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + if (pTrans == NULL) { + mError("dnode:%s, failed to create since %s", pCreate->ep, terrstr()); + return -1; + } + mDebug("trans:%d, used to create dnode:%s", pTrans->id, pCreate->ep); + + SSdbRaw *pRedoRaw = mndDnodeActionEncode(&dnodeObj); + if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { + mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING); -static int32_t mndProcessDropDnodeMsg(SMnodeMsg *pMsg) { return 0; } + SSdbRaw *pUndoRaw = mndDnodeActionEncode(&dnodeObj); + if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { + mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED); + + SSdbRaw *pCommitRaw = mndDnodeActionEncode(&dnodeObj); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); + + if (mndTransPrepare(pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + + mndTransDrop(pTrans); + return 0; +} + +static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + SCreateDnodeMsg *pCreate = pMsg->rpcMsg.pCont; + + mDebug("dnode:%s, start to create", pCreate->ep); + + if (pCreate->ep[0] == 0) { + terrno = TSDB_CODE_SDB_APP_ERROR; + mError("dnode:%s, failed to create since %s", pCreate->ep, terrstr()); + return -1; + } + + SDnodeObj *pDnode = mndAcquireDnodeByEp(pMnode, pCreate->ep); + if (pDnode != NULL) { + mError("dnode:%d, already exist", pDnode->id); + sdbRelease(pMnode->pSdb, pDnode); + terrno = TSDB_CODE_MND_DNODE_ALREADY_EXIST; + return -1; + } -static int32_t mndProcessConfigDnodeMsg(SMnodeMsg *pMsg) { return 0; } + int32_t code = mndCreateDnode(pMnode, pMsg, pCreate); + + if (code != 0) { + mError("dnode:%s, failed to create since %s", pCreate->ep, terrstr()); + return -1; + } + + return TSDB_CODE_MND_ACTION_IN_PROGRESS; +} + +static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode) { + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + if (pTrans == NULL) { + mError("dnode:%d, failed to drop since %s", pDnode->id, terrstr()); + return -1; + } + mDebug("trans:%d, used to drop user:%d", pTrans->id, pDnode->id); + + SSdbRaw *pRedoRaw = mndDnodeActionEncode(pDnode); + if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { + mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING); + + SSdbRaw *pUndoRaw = mndDnodeActionEncode(pDnode); + if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { + mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY); + + SSdbRaw *pCommitRaw = mndDnodeActionEncode(pDnode); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); + + if (mndTransPrepare(pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + + mndTransDrop(pTrans); + return 0; +} + +static int32_t mndProcessDropDnodeMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + SDropDnodeMsg *pDrop = pMsg->rpcMsg.pCont; + pDrop->dnodeId = htonl(pDrop->dnodeId); + + mDebug("dnode:%d, start to drop", pDrop->dnodeId); + + if (pDrop->dnodeId <= 0) { + terrno = TSDB_CODE_SDB_APP_ERROR; + mError("dnode:%d, failed to create since %s", pDrop->dnodeId, terrstr()); + return -1; + } + + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDrop->dnodeId); + if (pDnode == NULL) { + mError("dnode:%d, not exist", pDrop->dnodeId); + terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; + return -1; + } + + int32_t code = mndDropDnode(pMnode, pMsg, pDnode); + + if (code != 0) { + mError("dnode:%d, failed to create since %s", pDrop->dnodeId, terrstr()); + return -1; + } + + sdbRelease(pMnode->pSdb, pDnode); + return TSDB_CODE_MND_ACTION_IN_PROGRESS; +} + +static int32_t mndProcessConfigDnodeMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + SCfgDnodeMsg *pCfg = pMsg->rpcMsg.pCont; + pCfg->dnodeId = htonl(pCfg->dnodeId); + + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCfg->dnodeId); + if (pDnode == NULL) { + terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; + mError("dnode:%d, failed to cfg since %s ", pCfg->dnodeId, terrstr()); + return -1; + } + + SEpSet epSet = mndGetDnodeEpset(pDnode); + mndReleaseDnode(pMnode, pDnode); + + SCfgDnodeMsg *pCfgDnode = rpcMallocCont(sizeof(SCfgDnodeMsg)); + pCfgDnode->dnodeId = htonl(pCfg->dnodeId); + memcpy(pCfgDnode->config, pCfg->config, 128); + + SRpcMsg rpcMsg = {.msgType = TSDB_MSG_TYPE_CONFIG_DNODE_IN, .pCont = pCfgDnode, .contLen = sizeof(SCfgDnodeMsg)}; + + mInfo("dnode:%d, is configured", pCfg->dnodeId); + mndSendMsgToDnode(pMnode, &epSet, &rpcMsg); + + return 0; +} + +static int32_t mndProcessConfigDnodeRsp(SMnodeMsg *pMsg) { mInfo("cfg dnode rsp is received"); } + +static int32_t mndGetConfigMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { + int32_t cols = 0; + SSchema *pSchema = pMeta->schema; + + pShow->bytes[cols] = TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + tstrncpy(pSchema[cols].name, "name", sizeof(pSchema[cols].name)); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_CONIIG_VALUE_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + tstrncpy(pSchema[cols].name, "value", sizeof(pSchema[cols].name)); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = TSDB_CONFIG_NUMBER; + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + pShow->pIter = NULL; + strcpy(pMeta->tableFname, mndShowStr(pShow->type)); + + return 0; +} + +static int32_t mndRetrieveConfigs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pMsg->pMnode; + int32_t numOfRows = 0; + char *cfgOpts[TSDB_CONFIG_NUMBER] = {0}; + char cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONIIG_VALUE_LEN + 1] = {0}; + char *pWrite; + int32_t cols = 0; + + cfgOpts[numOfRows] = "statusInterval"; + snprintf(cfgVals[numOfRows], TSDB_CONIIG_VALUE_LEN, "%d", pMnode->cfg.statusInterval); + numOfRows++; + + cfgOpts[numOfRows] = "timezone"; + snprintf(cfgVals[numOfRows], TSDB_CONIIG_VALUE_LEN, "%s", pMnode->cfg.timezone); + numOfRows++; + + cfgOpts[numOfRows] = "locale"; + snprintf(cfgVals[numOfRows], TSDB_CONIIG_VALUE_LEN, "%s", pMnode->cfg.locale); + numOfRows++; + + cfgOpts[numOfRows] = "charset"; + snprintf(cfgVals[numOfRows], TSDB_CONIIG_VALUE_LEN, "%s", pMnode->cfg.charset); + numOfRows++; + + for (int32_t i = 0; i < numOfRows; i++) { + cols = 0; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, cfgOpts[i], TSDB_CONFIG_OPTION_LEN); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, cfgVals[i], TSDB_CONIIG_VALUE_LEN); + cols++; + } + + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); + pShow->numOfReads += numOfRows; + return numOfRows; +} + +static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {} + +static int32_t mndGetDnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { + SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; + + int32_t cols = 0; + SSchema *pSchema = pMeta->schema; + + pShow->bytes[cols] = 2; + pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; + strcpy(pSchema[cols].name, "id"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "end point"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 2; + pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; + strcpy(pSchema[cols].name, "vnodes"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 2; + pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; + strcpy(pSchema[cols].name, "cores"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 10 + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "status"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "create time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 24 + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "offline reason"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = sdbGetSize(pSdb, SDB_DNODE); + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + strcpy(pMeta->tableFname, mndShowStr(pShow->type)); + + return 0; +} + +static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + int32_t cols = 0; + SDnodeObj *pDnode = NULL; + char *pWrite; + + while (numOfRows < rows) { + pShow->pIter = sdbFetch(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode); + if (pShow->pIter == NULL) break; + + cols = 0; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int16_t *)pWrite = pDnode->id; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDnode->ep, pShow->bytes[cols]); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int16_t *)pWrite = pDnode->numOfVnodes; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int16_t *)pWrite = pDnode->numOfCores; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + const char *status = dnodeStatus[pDnode->status]; + STR_TO_VARSTR(pWrite, status); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = pDnode->createdTime; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_TO_VARSTR(pWrite, offlineReason[pDnode->offlineReason]); + cols++; + + numOfRows++; + sdbRelease(pSdb, pDnode); + } + + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); + pShow->numOfReads += numOfRows; + return numOfRows; +} + +static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 747ea39237..ac33d89517 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -20,7 +20,6 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowMsg *pMsg); static void mndFreeShowObj(SShowObj *pShow); static SShowObj *mndAcquireShowObj(SMnode *pMnode, int32_t showId); static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove); -static char *mndShowStr(int32_t showType); static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg); static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMsg); static bool mndCheckRetrieveFinished(SShowObj *pShow); @@ -88,10 +87,6 @@ static void mndFreeShowObj(SShowObj *pShow) { ShowFreeIterFp freeFp = pMgmt->freeIterFps[pShow->type]; if (freeFp != NULL) { - if (pShow->pVgIter != NULL) { - // only used in 'show vnodes "ep"' - (*freeFp)(pMnode, pShow->pVgIter); - } if (pShow->pIter != NULL) { (*freeFp)(pMnode, pShow->pIter); } @@ -259,7 +254,7 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) { return TSDB_CODE_SUCCESS; } -static char *mndShowStr(int32_t showType) { +char *mndShowStr(int32_t showType) { switch (showType) { case TSDB_MGMT_TABLE_ACCT: return "show accounts"; diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 4dc114e17a..0be3ffc80b 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -277,7 +277,7 @@ static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOldUser, SUserObj *pNewU return 0; } -static int32_t mndDropUser(SMnode *pMnode, SUserObj *pUser, SMnodeMsg *pMsg) { +static int32_t mndDropUser(SMnode *pMnode, SMnodeMsg *pMsg, SUserObj *pUser) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); if (pTrans == NULL) { mError("user:%s, failed to drop since %s", pUser->user, terrstr()); @@ -437,7 +437,7 @@ static int32_t mndProcessDropUserMsg(SMnodeMsg *pMsg) { return -1; } - int32_t code = mndDropUser(pMnode, pUser, pMsg); + int32_t code = mndDropUser(pMnode, pMsg, pUser); sdbRelease(pMnode->pSdb, pOperUser); if (code != 0) { @@ -489,7 +489,8 @@ static int32_t mndGetUserMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *p pShow->numOfRows = sdbGetSize(pSdb, SDB_USER); pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; - strcpy(pMeta->tableFname, "show users"); + strcpy(pMeta->tableFname, mndShowStr(pShow->type)); + return 0; } diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index b98468b63e..eb820fa3c2 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -14,8 +14,114 @@ */ #define _DEFAULT_SOURCE -#include "os.h" -#include "mndInt.h" +#include "mndVgroup.h" +#include "mndDnode.h" +#include "mndShow.h" +#include "mndTrans.h" +#include "ttime.h" -int32_t mndInitVgroup(SMnode *pMnode) { return 0; } -void mndCleanupVgroup(SMnode *pMnode) {} \ No newline at end of file +static char *syncRole[] = {"unsynced", "slave", "master"}; + +static int32_t mndGetVnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndRetrieveVnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter); + +int32_t mndInitVgroup(SMnode *pMnode) { + mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndGetVnodeMeta); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndRetrieveVnodes); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndCancelGetNextVnode); + + return 0; +} + +void mndCleanupVgroup(SMnode *pMnode) {} + +int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) { + if (dnodeId == 0) { + return 0; + } + + return 0; +} + +static int32_t mndGetVnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { + SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; + + int32_t cols = 0; + SSchema *pSchema = pMeta->schema; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "vgId"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 12 + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "status"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + int32_t dnodeId = 0; + if (pShow->payloadLen > 0) { + dnodeId = atoi(pShow->payload); + } + + pShow->replica = dnodeId; + pShow->numOfRows = mndGetVnodesNum(pMnode, dnodeId); + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + strcpy(pMeta->tableFname, mndShowStr(pShow->type)); + + return 0; +} + +static int32_t mndRetrieveVnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + SVgObj *pVgroup = NULL; + char *pWrite; + int32_t cols = 0; + int32_t dnodeId = pShow->replica; + + while (numOfRows < rows) { + pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup); + if (pShow->pIter == NULL) break; + + for (int32_t i = 0; i < pVgroup->numOfVnodes && numOfRows < rows; ++i) { + SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; + if (pVgid->dnodeId != dnodeId) continue; + + cols = 0; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(uint32_t *)pWrite = pVgroup->vgId; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_TO_VARSTR(pWrite, syncRole[pVgid->role]); + cols++; + numOfRows++; + } + + sdbRelease(pSdb, pVgroup); + } + + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); + pShow->numOfReads += numOfRows; + return numOfRows; +} + +static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); +} \ No newline at end of file diff --git a/source/util/src/tutil.c b/source/util/src/tutil.c index b9d2da6939..3c3c8a1dfc 100644 --- a/source/util/src/tutil.c +++ b/source/util/src/tutil.c @@ -187,7 +187,7 @@ char *strnchr(char *haystack, char needle, int32_t len, bool skipquote) { } char* strtolower(char *dst, const char *src) { - int esc = 0; + int32_t esc = 0; char quote = 0, *p = dst, c; assert(dst != NULL); @@ -214,7 +214,7 @@ char* strtolower(char *dst, const char *src) { } char* strntolower(char *dst, const char *src, int32_t n) { - int esc = 0; + int32_t esc = 0; char quote = 0, *p = dst, c; assert(dst != NULL); @@ -347,7 +347,7 @@ char *strbetween(char *string, char *begin, char *end) { char *_begin = strstr(string, begin); if (_begin != NULL) { char *_end = strstr(_begin + strlen(begin), end); - int size = (int)(_end - _begin); + int32_t size = (int32_t)(_end - _begin); if (_end != NULL && size > 0) { result = (char *)calloc(1, size); memcpy(result, _begin + strlen(begin), size - +strlen(begin)); @@ -402,7 +402,7 @@ int32_t taosHexStrToByteArray(char hexstr[], char bytes[]) { char *taosIpStr(uint32_t ipInt) { static char ipStrArray[3][30]; - static int ipStrIndex = 0; + static int32_t ipStrIndex = 0; char *ipStr = ipStrArray[(ipStrIndex++) % 3]; //sprintf(ipStr, "0x%x:%u.%u.%u.%u", ipInt, ipInt & 0xFF, (ipInt >> 8) & 0xFF, (ipInt >> 16) & 0xFF, (uint8_t)(ipInt >> 24)); @@ -416,4 +416,17 @@ void taosIp2String(uint32_t ip, char *str) { void taosIpPort2String(uint32_t ip, uint16_t port, char *str) { sprintf(str, "%u.%u.%u.%u:%u", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, (uint8_t)(ip >> 24), port); +} + +int32_t taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port) { + *port = 0; + strcpy(fqdn, ep); + + char *temp = strchr(fqdn, ':'); + if (temp) { + *temp = 0; + *port = atoi(temp + 1); + } + + return 0; } \ No newline at end of file -- GitLab