diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 9dc15519706034845ea6ae932ecb7c6f3b1515e1..e798a0c42af1765848f3c98f4f5fdbaa41bc73aa 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -832,10 +832,8 @@ typedef struct { } SCreateDnodeMsg, SDropDnodeMsg; typedef struct { - int32_t dnodeId; - int32_t mnodeNum; - SDnodeEp mnodeEps[]; -} SCreateMnodeMsg; + int32_t dnodeId; +} SCreateMnodeMsg, SDropMnodeMsg; typedef struct { int32_t dnodeId; diff --git a/include/server/mnode/mnode.h b/include/server/mnode/mnode.h index e41d157057e2e2dc0ac57beb7896b84350ba3741..e0fcdec560df967c54106a99bd695d49f0f041b7 100644 --- a/include/server/mnode/mnode.h +++ b/include/server/mnode/mnode.h @@ -20,41 +20,23 @@ extern "C" { #endif -typedef enum { MN_STATUS_UNINIT = 0, MN_STATUS_INIT = 1, MN_STATUS_READY = 2, MN_STATUS_CLOSING = 3 } EMnStatus; +typedef struct { + int64_t numOfDnode; + int64_t numOfMnode; + int64_t numOfVgroup; + int64_t numOfDatabase; + int64_t numOfSuperTable; + int64_t numOfChildTable; + int64_t numOfColumn; + int64_t totalPoints; + int64_t totalStorage; + int64_t compStorage; +} SMnodeStat; typedef struct { - /** - * Send messages to other dnodes, such as create vnode message. - * - * @param epSet, the endpoint list of the dnodes. - * @param rpcMsg, message to be sent. - */ void (*SendMsgToDnode)(struct SEpSet *epSet, struct SRpcMsg *rpcMsg); - - /** - * Send messages to mnode, such as config message. - * - * @param rpcMsg, message to be sent. - */ void (*SendMsgToMnode)(struct SRpcMsg *rpcMsg); - - /** - * Send redirect message to dnode or shell. - * - * @param rpcMsg, message to be sent. - * @param forShell, used to identify whether to send to shell or dnode. - */ void (*SendRedirectMsg)(struct SRpcMsg *rpcMsg, bool forShell); - - /** - * Get the corresponding endpoint information from dnodeId. - * - * @param dnode, the instance of dDnode module. - * @param dnodeId, the id ot dnode. - * @param ep, the endpoint of dnode. - * @param fqdn, the fqdn of dnode. - * @param port, the port of dnode. - */ void (*GetDnodeEp)(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); } SMnodeFp; @@ -64,77 +46,16 @@ typedef struct { int32_t dnodeId; } SMnodePara; -/** - * Initialize and start mnode module. - * - * @param para, initialization parameters. - * @return Error code. - */ int32_t mnodeInit(SMnodePara para); - -/** - * Stop and cleanup mnode module. - */ -void mnodeCleanup(); - -/** - * Deploy mnode instances in dnode. - * - * @return Error Code. - */ +void mnodeCleanup(); int32_t mnodeDeploy(); +void mnodeUnDeploy(); +int32_t mnodeStart(); +void mnodeStop(); -/** - * Delete the mnode instance deployed in dnode. - */ -void mnodeUnDeploy(); - -/** - * Whether the mnode is in service. - * - * @return Server status. - */ -EMnStatus mnodeGetStatus(); - -typedef struct { - int64_t numOfDnode; - int64_t numOfMnode; - int64_t numOfVgroup; - int64_t numOfDatabase; - int64_t numOfSuperTable; - int64_t numOfChildTable; - int64_t numOfColumn; - int64_t totalPoints; - int64_t totalStorage; - int64_t compStorage; -} SMnodeStat; - -/** - * Get the statistical information of Mnode. - * - * @param stat, statistical information. - * @return Error Code. - */ int32_t mnodeGetStatistics(SMnodeStat *stat); - -/** - * Get the auth information of Mnode. - * - * @param user, username. - * @param spi, security parameter index. - * @param encrypt, encrypt algorithm. - * @param secret, key for authentication. - * @param ckey, ciphering key. - * @return Error Code. - */ int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey); -/** - * Interface for processing messages. - * - * @param rpcMsg, message to be processed. - * @return Error code. - */ void mnodeProcessMsg(SRpcMsg *rpcMsg); #ifdef __cplusplus diff --git a/include/server/vnode/vnode.h b/include/server/vnode/vnode.h index a9f40aad77581165ad29102e645620de4e8f935d..c4f8df79c11ecae8c7d3611ef6dfb3196bc77ee8 100644 --- a/include/server/vnode/vnode.h +++ b/include/server/vnode/vnode.h @@ -66,41 +66,12 @@ typedef struct SVnodeMsg { char pCont[]; } SVnodeMsg; -/** - * Start initialize vnode module. - * - * @return Error code. - */ int32_t vnodeInit(); +void vnodeCleanup(); -/** - * Cleanup vnode module. - */ -void vnodeCleanup(); - -/** - * Get the statistical information of vnode. - * - * @param pVnode, - * @param pStat, statistical information. - * @return Error Code. - */ int32_t vnodeGetStatistics(SVnode *pVnode, SVnodeStatisic *pStat); - -/** - * Get the status of all vnodes. - * - * @param pVnode, - * @param status, status information. - * @return Error Code. - */ int32_t vnodeGetStatus(SVnode *pVnode, SVnodeStatus *pStatus); -/** - * Operation functions of vnode - * - * @return Error Code. - */ SVnode *vnodeOpen(int32_t vgId, const char *path); void vnodeClose(SVnode *pVnode); int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); @@ -109,14 +80,7 @@ int32_t vnodeDrop(SVnode *pVnode); int32_t vnodeCompact(SVnode *pVnode); int32_t vnodeSync(SVnode *pVnode); -/** - * Interface for processing messages. - * - * @param pVnode, - * @param pMsg, message to be processed. - * - */ -int32_t vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg); +void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg); #ifdef __cplusplus } diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 76c5f575a5c64bcf98065ba9bb14d2412816dcb6..0579ae46bcb48e3577c592ab881c39c6e7350bae 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -216,11 +216,13 @@ int32_t* taosGetErrno(); // dnode #define TSDB_CODE_DND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0400) //"Message not processed") #define TSDB_CODE_DND_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0401) //"Dnode out of memory") -#define TSDB_CODE_DND_NO_WRITE_ACCESS TAOS_DEF_ERROR_CODE(0, 0x0402) //"No permission for disk files in dnode") -#define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0403) //"Invalid message length") -#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0404) //"Action in progress") -#define TSDB_CODE_DND_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0405) //"Too many vnode directories") -#define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0406) //"Dnode is exiting" +#define TSDB_CODE_DND_DNODE_ID_NOT_MATCHED TAOS_DEF_ERROR_CODE(0, 0x0402) //"Dnode Id not matched") +#define TSDB_CODE_DND_MNODE_ALREADY_DROPPED TAOS_DEF_ERROR_CODE(0, 0x0403) //"Mnode already deployed") +#define TSDB_CODE_DND_NO_WRITE_ACCESS TAOS_DEF_ERROR_CODE(0, 0x0404) //"No permission for disk files in dnode") +#define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0405) //"Invalid message length") +#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0406) //"Action in progress") +#define TSDB_CODE_DND_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0407) //"Too many vnode directories") +#define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0408) //"Dnode is exiting" // vnode #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) //"Action in progress") diff --git a/source/dnode/mgmt/src/dnodeDnode.c b/source/dnode/mgmt/src/dnodeDnode.c index 37dc9f05a15172abebdd5b1a4a6f3a740a22a46d..5bf5b1d56a0af9e3c5462f1bd76e959ef662dfa2 100644 --- a/source/dnode/mgmt/src/dnodeDnode.c +++ b/source/dnode/mgmt/src/dnodeDnode.c @@ -35,7 +35,6 @@ static struct { int8_t threadStop; pthread_t *threadId; pthread_mutex_t mutex; - MsgFp msgFp[TSDB_MSG_TYPE_MAX]; } tsDnode = {0}; int32_t dnodeGetDnodeId() { @@ -127,7 +126,7 @@ static void dnodeUpdateMnodeEpSet(SEpSet *pEpSet) { pthread_mutex_unlock(&tsDnode.mutex); } -static void dnodePrintEps() { +static void dnodePrintDnodes() { dDebug("print dnode endpoint list, num:%d", tsDnode.dnodeEps->dnodeNum); for (int32_t i = 0; i < tsDnode.dnodeEps->dnodeNum; i++) { SDnodeEp *ep = &tsDnode.dnodeEps->dnodeEps[i]; @@ -135,7 +134,7 @@ static void dnodePrintEps() { } } -static void dnodeResetEps(SDnodeEps *pEps) { +static void dnodeResetDnodes(SDnodeEps *pEps) { assert(pEps != NULL); int32_t size = sizeof(SDnodeEps) + pEps->dnodeNum * sizeof(SDnodeEp); @@ -171,7 +170,7 @@ static void dnodeResetEps(SDnodeEps *pEps) { taosHashPut(tsDnode.dnodeHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp)); } - dnodePrintEps(); + dnodePrintDnodes(); } static bool dnodeIsEpChanged(int32_t dnodeId, char *epStr) { @@ -189,7 +188,7 @@ static bool dnodeIsEpChanged(int32_t dnodeId, char *epStr) { return changed; } -static int32_t dnodeReadEps() { +static int32_t dnodeReadDnodes() { int32_t len = 0; int32_t maxLen = 30000; char *content = calloc(1, maxLen + 1); @@ -199,59 +198,59 @@ static int32_t dnodeReadEps() { fp = fopen(tsDnode.file, "r"); if (!fp) { dDebug("file %s not exist", tsDnode.file); - goto PRASE_EPS_OVER; + goto PRASE_DNODE_OVER; } len = (int32_t)fread(content, 1, maxLen, fp); if (len <= 0) { dError("failed to read %s since content is null", tsDnode.file); - goto PRASE_EPS_OVER; + goto PRASE_DNODE_OVER; } content[len] = 0; root = cJSON_Parse(content); if (root == NULL) { dError("failed to read %s since invalid json format", tsDnode.file); - goto PRASE_EPS_OVER; + goto PRASE_DNODE_OVER; } cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId"); if (!dnodeId || dnodeId->type != cJSON_String) { dError("failed to read %s since dnodeId not found", tsDnode.file); - goto PRASE_EPS_OVER; + goto PRASE_DNODE_OVER; } tsDnode.dnodeId = atoi(dnodeId->valuestring); cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); if (!clusterId || clusterId->type != cJSON_String) { dError("failed to read %s since clusterId not found", tsDnode.file); - goto PRASE_EPS_OVER; + goto PRASE_DNODE_OVER; } tsDnode.clusterId = atoll(clusterId->valuestring); cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); if (!dropped || dropped->type != cJSON_String) { dError("failed to read %s since dropped not found", tsDnode.file); - goto PRASE_EPS_OVER; + goto PRASE_DNODE_OVER; } tsDnode.dropped = atoi(dropped->valuestring); cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos"); if (!dnodeInfos || dnodeInfos->type != cJSON_Array) { dError("failed to read %s since dnodeInfos not found", tsDnode.file); - goto PRASE_EPS_OVER; + goto PRASE_DNODE_OVER; } int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos); if (dnodeInfosSize <= 0) { dError("failed to read %s since dnodeInfos size:%d invalid", tsDnode.file, dnodeInfosSize); - goto PRASE_EPS_OVER; + goto PRASE_DNODE_OVER; } tsDnode.dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps)); if (tsDnode.dnodeEps == NULL) { dError("failed to calloc dnodeEpList since %s", strerror(errno)); - goto PRASE_EPS_OVER; + goto PRASE_DNODE_OVER; } tsDnode.dnodeEps->dnodeNum = dnodeInfosSize; @@ -264,36 +263,36 @@ static int32_t dnodeReadEps() { cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId"); if (!dnodeId || dnodeId->type != cJSON_String) { dError("failed to read %s, dnodeId not found", tsDnode.file); - goto PRASE_EPS_OVER; + goto PRASE_DNODE_OVER; } pEp->dnodeId = atoi(dnodeId->valuestring); cJSON *isMnode = cJSON_GetObjectItem(dnodeInfo, "isMnode"); if (!isMnode || isMnode->type != cJSON_String) { dError("failed to read %s, isMnode not found", tsDnode.file); - goto PRASE_EPS_OVER; + goto PRASE_DNODE_OVER; } pEp->isMnode = atoi(isMnode->valuestring); cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn"); if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { dError("failed to read %s, dnodeFqdn not found", tsDnode.file); - goto PRASE_EPS_OVER; + goto PRASE_DNODE_OVER; } tstrncpy(pEp->dnodeFqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort"); if (!dnodePort || dnodePort->type != cJSON_String) { dError("failed to read %s, dnodePort not found", tsDnode.file); - goto PRASE_EPS_OVER; + goto PRASE_DNODE_OVER; } pEp->dnodePort = atoi(dnodePort->valuestring); } dInfo("succcessed to read file %s", tsDnode.file); - dnodePrintEps(); + dnodePrintDnodes(); -PRASE_EPS_OVER: +PRASE_DNODE_OVER: if (content != NULL) free(content); if (root != NULL) cJSON_Delete(root); if (fp != NULL) fclose(fp); @@ -303,13 +302,13 @@ PRASE_EPS_OVER: return -1; } - dnodeResetEps(tsDnode.dnodeEps); + dnodeResetDnodes(tsDnode.dnodeEps); terrno = 0; return 0; } -static int32_t dnodeWriteEps() { +static int32_t dnodeWriteDnodes() { FILE *fp = fopen(tsDnode.file, "w"); if (!fp) { dError("failed to write %s since %s", tsDnode.file, strerror(errno)); @@ -391,7 +390,7 @@ static void dnodeUpdateCfg(SDnodeCfg *pCfg) { tsDnode.dropped = pCfg->dropped; dInfo("dnodeId is set to %d, clusterId is set to %" PRId64, pCfg->dnodeId, pCfg->clusterId); - dnodeWriteEps(); + dnodeWriteDnodes(); pthread_mutex_unlock(&tsDnode.mutex); } @@ -401,13 +400,13 @@ static void dnodeUpdateDnodeEps(SDnodeEps *pEps) { pthread_mutex_lock(&tsDnode.mutex); if (pEps->dnodeNum != tsDnode.dnodeEps->dnodeNum) { - dnodeResetEps(pEps); - dnodeWriteEps(); + dnodeResetDnodes(pEps); + dnodeWriteDnodes(); } else { int32_t size = pEps->dnodeNum * sizeof(SDnodeEp) + sizeof(SDnodeEps); if (memcmp(tsDnode.dnodeEps, pEps, size) != 0) { - dnodeResetEps(pEps); - dnodeWriteEps(); + dnodeResetDnodes(pEps); + dnodeWriteDnodes(); } } @@ -493,9 +492,9 @@ int32_t dnodeInitDnode() { return TSDB_CODE_DND_OUT_OF_MEMORY; } - int32_t code = dnodeReadEps(); + int32_t code = dnodeReadDnodes(); if (code != 0) { - dError("failed to read dnode endpoint file since %s", tstrerror(code)); + dError("failed to read file:%s since %s", tsDnode.file, tstrerror(code)); return code; } diff --git a/source/dnode/mgmt/src/dnodeMnode.c b/source/dnode/mgmt/src/dnodeMnode.c index 54fa847790118eb6a03404e4d3940fe74793a865..4414d57f3e6ec9ca7cb7ca5c4b0e2c9c3495c2c0 100644 --- a/source/dnode/mgmt/src/dnodeMnode.c +++ b/source/dnode/mgmt/src/dnodeMnode.c @@ -17,59 +17,264 @@ #include "dnodeMnode.h" #include "dnodeDnode.h" #include "dnodeTransport.h" +#include "cJSON.h" #include "mnode.h" -int32_t dnodeInitMnode() { - SMnodePara para; - para.fp.GetDnodeEp = dnodeGetDnodeEp; - para.fp.SendMsgToDnode = dnodeSendMsgToDnode; - para.fp.SendMsgToMnode = dnodeSendMsgToMnode; - para.fp.SendRedirectMsg = dnodeSendRedirectMsg; - para.dnodeId = dnodeGetDnodeId(); - para.clusterId = dnodeGetClusterId(); +static struct { + int8_t deployed; + int8_t dropped; + char file[PATH_MAX + 20]; + pthread_mutex_t mutex; +} tsMnode = {0}; - return mnodeInit(para); -} +static int32_t dnodeReadMnode() { + int32_t len = 0; + int32_t maxLen = 300; + char *content = calloc(1, maxLen + 1); + cJSON *root = NULL; + FILE *fp = NULL; -void dnodeCleanupMnode() { mnodeCleanup(); } + fp = fopen(tsMnode.file, "r"); + if (!fp) { + dDebug("file %s not exist", tsMnode.file); + goto PRASE_MNODE_OVER; + } -static int32_t dnodeStartMnode(SRpcMsg *pMsg) { - SCreateMnodeMsg *pCfg = pMsg->pCont; - pCfg->dnodeId = htonl(pCfg->dnodeId); - pCfg->mnodeNum = htonl(pCfg->mnodeNum); - for (int32_t i = 0; i < pCfg->mnodeNum; ++i) { - pCfg->mnodeEps[i].dnodeId = htonl(pCfg->mnodeEps[i].dnodeId); - pCfg->mnodeEps[i].dnodePort = htons(pCfg->mnodeEps[i].dnodePort); + len = (int32_t)fread(content, 1, maxLen, fp); + if (len <= 0) { + dError("failed to read %s since content is null", tsMnode.file); + goto PRASE_MNODE_OVER; + } + + content[len] = 0; + root = cJSON_Parse(content); + if (root == NULL) { + dError("failed to read %s since invalid json format", tsMnode.file); + goto PRASE_MNODE_OVER; } + cJSON *deployed = cJSON_GetObjectItem(root, "deployed"); + if (!deployed || deployed->type != cJSON_String) { + dError("failed to read %s since deployed not found", tsMnode.file); + goto PRASE_MNODE_OVER; + } + tsMnode.deployed = atoi(deployed->valuestring); + + cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); + if (!dropped || dropped->type != cJSON_String) { + dError("failed to read %s since dropped not found", tsMnode.file); + goto PRASE_MNODE_OVER; + } + tsMnode.dropped = atoi(dropped->valuestring); + + dInfo("succcessed to read file %s", tsMnode.file); + +PRASE_MNODE_OVER: + if (content != NULL) free(content); + if (root != NULL) cJSON_Delete(root); + if (fp != NULL) fclose(fp); + + return 0; +} + +static int32_t dnodeWriteMnode() { + FILE *fp = fopen(tsMnode.file, "w"); + if (!fp) { + dError("failed to write %s since %s", tsMnode.file, strerror(errno)); + return -1; + } + + int32_t len = 0; + int32_t maxLen = 300; + char *content = calloc(1, maxLen + 1); + + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"deployed\": \"%d\",\n", tsMnode.dropped); + len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", tsMnode.dropped); + len += snprintf(content + len, maxLen - len, "}\n"); + + fwrite(content, 1, len, fp); + taosFsyncFile(fileno(fp)); + fclose(fp); + free(content); + terrno = 0; + + dInfo("successed to write %s", tsMnode.file); + return 0; +} + +static int32_t dnodeStartMnode(SCreateMnodeMsg *pCfg) { + int32_t code = 0; + if (pCfg->dnodeId != dnodeGetDnodeId()) { - dDebug("dnode:%d, in create meps msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId()); - return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED; + code = TSDB_CODE_DND_DNODE_ID_NOT_MATCHED; + dError("failed to start mnode since %s", tstrerror(code)); + return code; } - if (mnodeGetStatus() == MN_STATUS_READY) return 0; + if (tsMnode.dropped) { + code = TSDB_CODE_DND_MNODE_ALREADY_DROPPED; + dError("failed to start mnode since %s", tstrerror(code)); + return code; + } + + if (tsMnode.deployed) { + dError("failed to start mnode since its already deployed"); + return 0; + } - return mnodeDeploy(); + tsMnode.deployed = 1; + tsMnode.dropped = 0; + + code = dnodeWriteMnode(); + if (code != 0) { + tsMnode.deployed = 0; + dError("failed to start mnode since %s", tstrerror(code)); + return code; + } + + code = mnodeDeploy(); + if (code != 0) { + tsMnode.deployed = 0; + dError("failed to start mnode since %s", tstrerror(code)); + return code; + } + + code = mnodeStart(); + if (code != 0) { + tsMnode.deployed = 0; + dError("failed to start mnode since %s", tstrerror(code)); + return code; + } + + tsMnode.deployed = 1; + return 0; } -void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg) { - int32_t code = dnodeStartMnode(pMsg); +static int32_t dnodeDropMnode(SDropMnodeMsg *pCfg) { + int32_t code = 0; + + if (pCfg->dnodeId != dnodeGetDnodeId()) { + code = TSDB_CODE_DND_DNODE_ID_NOT_MATCHED; + dError("failed to drop mnode since %s", tstrerror(code)); + return code; + } + + if (tsMnode.dropped) { + code = TSDB_CODE_DND_MNODE_ALREADY_DROPPED; + dError("failed to drop mnode since %s", tstrerror(code)); + return code; + } + + if (!tsMnode.deployed) { + dError("failed to drop mnode since not deployed"); + return 0; + } + + mnodeStop(); + + tsMnode.deployed = 0; + tsMnode.dropped = 1; + + code = dnodeWriteMnode(); + if (code != 0) { + tsMnode.deployed = 1; + tsMnode.dropped = 0; + dError("failed to drop mnode since %s", tstrerror(code)); + return code; + } + + mnodeUnDeploy(); + + tsMnode.deployed = 0; + return 0; +} - SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code}; +static void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg) { + SCreateMnodeMsg *pCfg = pMsg->pCont; + pCfg->dnodeId = htonl(pCfg->dnodeId); + int32_t code = dnodeStartMnode(pCfg); + SRpcMsg rspMsg = {.handle = pMsg->handle, .code = code}; rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); } -void dnodeProcessDropMnodeReq(SRpcMsg *pMsg) { - int32_t code = dnodeStartMnode(pMsg); - - SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code}; +static void dnodeProcessDropMnodeReq(SRpcMsg *pMsg) { + SDropMnodeMsg *pCfg = pMsg->pCont; + pCfg->dnodeId = htonl(pCfg->dnodeId); + int32_t code = dnodeDropMnode(pCfg); + SRpcMsg rspMsg = {.handle = pMsg->handle, .code = code}; rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); } +static bool dnodeNeedDeployMnode() { + if (dnodeGetDnodeId() > 0) return false; + if (dnodeGetClusterId() > 0) return false; + if (strcmp(tsFirst, tsLocalEp) != 0) return false; + return true; +} + +int32_t dnodeInitMnode() { + tsMnode.dropped = 0; + tsMnode.deployed = 0; + snprintf(tsMnode.file, sizeof(tsMnode.file), "%s/mnode.json", tsDnodeDir); + + SMnodePara para; + para.fp.GetDnodeEp = dnodeGetDnodeEp; + para.fp.SendMsgToDnode = dnodeSendMsgToDnode; + para.fp.SendMsgToMnode = dnodeSendMsgToMnode; + para.fp.SendRedirectMsg = dnodeSendRedirectMsg; + para.dnodeId = dnodeGetDnodeId(); + para.clusterId = dnodeGetClusterId(); + + int32_t code = mnodeInit(para); + if (code != 0) { + dError("failed to init mnode module since %s", tstrerror(code)); + return code; + } + + code = dnodeReadMnode(); + if (code != 0) { + dError("failed to read file:%s since %s", tsMnode.file, tstrerror(code)); + return code; + } + + if (tsMnode.dropped) { + dError("mnode already dropped, undeploy it"); + mnodeUnDeploy(); + return 0; + } + + if (!tsMnode.deployed) { + bool needDeploy = dnodeNeedDeployMnode(); + if (needDeploy) { + code = mnodeDeploy(); + } else { + return 0; + } + + if (code != 0) { + dError("failed to deploy mnode since %s", tstrerror(code)); + return code; + } + + tsMnode.deployed = 1; + } + + return mnodeStart(); +} + +void dnodeCleanupMnode() { + if (tsMnode.deployed) { + mnodeStop(); + } + + mnodeCleanup(); +} + void dnodeProcessMnodeMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { switch (pMsg->msgType) { case TSDB_MSG_TYPE_CREATE_MNODE_IN: diff --git a/source/dnode/mnode/inc/mnodeInt.h b/source/dnode/mnode/inc/mnodeInt.h index 5b552e5c51cda2674aa16d8dc8322125a10249e1..5fcd7173eea6effbb5a2821f9d676159d1132287 100644 --- a/source/dnode/mnode/inc/mnodeInt.h +++ b/source/dnode/mnode/inc/mnodeInt.h @@ -22,6 +22,8 @@ extern "C" { #endif +typedef enum { MN_STATUS_UNINIT = 0, MN_STATUS_INIT = 1, MN_STATUS_READY = 2, MN_STATUS_CLOSING = 3 } EMnStatus; + tmr_h mnodeGetTimer(); int32_t mnodeGetDnodeId(); int64_t mnodeGetClusterId(); diff --git a/source/dnode/mnode/src/mondeInt.c b/source/dnode/mnode/src/mondeInt.c index 2b17da247512c97700008987fabe9021a05494f3..669273d8dc8cc8adc6982813e448642ed6957f03 100644 --- a/source/dnode/mnode/src/mondeInt.c +++ b/source/dnode/mnode/src/mondeInt.c @@ -250,3 +250,6 @@ void mnodeCleanup() { mInfo("mnode is cleaned up"); } } + +int32_t mnodeStart() { return 0; } +void mnodeStop() {} \ No newline at end of file diff --git a/source/dnode/vnode/impl/src/vnodeInt.c b/source/dnode/vnode/impl/src/vnodeInt.c index 7a395706e947ff3f01226e210f8039159cfefc0c..6f83542ceff8ab2831899965f34be78006d542bf 100644 --- a/source/dnode/vnode/impl/src/vnodeInt.c +++ b/source/dnode/vnode/impl/src/vnodeInt.c @@ -30,4 +30,4 @@ int32_t vnodeDrop(SVnode *pVnode) { return 0; } int32_t vnodeCompact(SVnode *pVnode) { return 0; } int32_t vnodeSync(SVnode *pVnode) { return 0; } -int32_t vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg) { return 0; } +void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg) {} diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 8e5d7a47fd9797d5aba796ab809859238e79dbf0..22fbeb1883b7bb744a0fb21ca9dbb525054e69c1 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -228,6 +228,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_ALREADY_EXIST, "Topic already exists" // dnode TAOS_DEFINE_ERROR(TSDB_CODE_DND_MSG_NOT_PROCESSED, "Message not processed") TAOS_DEFINE_ERROR(TSDB_CODE_DND_OUT_OF_MEMORY, "Dnode out of memory") +TAOS_DEFINE_ERROR(TSDB_CODE_DND_DNODE_ID_NOT_MATCHED, "Dnode Id not matched") +TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_ALREADY_DROPPED, "Mnode already deployed") TAOS_DEFINE_ERROR(TSDB_CODE_DND_NO_WRITE_ACCESS, "No permission for disk files in dnode") TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, "Invalid message length") TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress")