diff --git a/include/os/osDef.h b/include/os/osDef.h index 2b3678ac6785f2f41eded008a80dda62a9f93362..bb5395f548fc418256fe71702c66c9f8683a8484 100644 --- a/include/os/osDef.h +++ b/include/os/osDef.h @@ -178,6 +178,12 @@ extern "C" { #define setThreadName(name) #endif +#if defined(_WIN32) +#define TD_DIRSEP "\\" +#else +#define TD_DIRSEP "/" +#endif + #ifdef __cplusplus } #endif diff --git a/include/server/mnode/mnode.h b/include/server/mnode/mnode.h index 9bc36cf4ae93a1c87255821bc3ba737b90c95591..9eedb62b605216b798722e95c3cae3823d9a1af2 100644 --- a/include/server/mnode/mnode.h +++ b/include/server/mnode/mnode.h @@ -20,6 +20,8 @@ extern "C" { #endif +typedef enum { MN_STATUS_UNINIT = 0, MN_STATUS_INIT = 1, MN_STATUS_READY = 2, MN_STATUS_CLOSING = 3 } EMnStatus; + typedef struct { /** * Send messages to other dnodes, such as create vnode message. @@ -44,23 +46,12 @@ typedef struct { */ void (*SendRedirectMsg)(struct SRpcMsg *rpcMsg, bool forShell); - /** - * Get the corresponding endpoint information from dnodeId. - * - * @param dnode, the instance of dDnode module. - * @param dnodeId, the id ot dnode. - * @param ep, the endpoint of dnode. - * @param fqdn, the fqdn of dnode. - * @param port, the port of dnode. - */ - void (*GetDnodeEp)(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); - } SMnodeFp; typedef struct { - SMnodeFp fp; - char clusterId[TSDB_CLUSTER_ID_LEN]; - int32_t dnodeId; + SMnodeFp fp; + char clusterId[TSDB_CLUSTER_ID_LEN]; + int32_t dnodeId; } SMnodePara; /** @@ -93,7 +84,7 @@ void mnodeUnDeploy(); * * @return Server status. */ -bool mnodeIsServing(); +EMnStatus mnodeIsServing(); typedef struct { int64_t numOfDnode; diff --git a/source/server/dnode/src/dnodeInt.c b/source/server/dnode/src/dnodeInt.c index 84a7e2565b3a636c9ae5f2d89c5b292137129940..8b7f2f04456ee6d752953a21ec1f7c5f5e526926 100644 --- a/source/server/dnode/src/dnodeInt.c +++ b/source/server/dnode/src/dnodeInt.c @@ -46,7 +46,6 @@ static int32_t dnodeInitVnodeModule(void **unused) { static int32_t dnodeInitMnodeModule(void **unused) { SMnodePara para; - para.fp.GetDnodeEp = dnodeGetDnodeEp; para.fp.SendMsgToDnode = dnodeSendMsgToDnode; para.fp.SendMsgToMnode = dnodeSendMsgToMnode; para.fp.SendRedirectMsg = dnodeSendRedirectMsg; diff --git a/source/server/mnode/CMakeLists.txt b/source/server/mnode/CMakeLists.txt index 331a717bf581616c8e2a4a6c9b610d69c7c6f12d..bf35b381afabd96901c064d4be2e67f5dd743a37 100644 --- a/source/server/mnode/CMakeLists.txt +++ b/source/server/mnode/CMakeLists.txt @@ -8,4 +8,5 @@ target_include_directories( target_link_libraries( mnode PUBLIC transport + PUBLIC cjson ) \ No newline at end of file diff --git a/source/server/mnode/inc/mnodeDef.h b/source/server/mnode/inc/mnodeDef.h index 3ac204f4b6b3fa14ef88e2be1a83fad8c923e17c..7ea3d1567d3e25b40ee623eaef731dbf4d7264ce 100644 --- a/source/server/mnode/inc/mnodeDef.h +++ b/source/server/mnode/inc/mnodeDef.h @@ -20,6 +20,7 @@ #include "tlog.h" #include "trpc.h" #include "ttimer.h" +#include "cJSON.h" #include "mnode.h" #ifdef __cplusplus @@ -94,15 +95,15 @@ typedef enum { MN_AUTH_MAX } EMnAuthOp; -typedef enum { - MN_STATUS_UNINIT = 0, - MN_STATUS_INIT = 1, - MN_STATUS_READY = 2, - MN_STATUS_CLOSING = 3 -} EMnStatus; - +typedef enum { MN_SDB_STAT_AVAIL = 0, MN_SDB_STAT_DROPPED = 1 } EMnSdbStat; +typedef struct { + int8_t type; + int8_t status; + int8_t align[6]; +} SdbHead; typedef struct SClusterObj { + SdbHead head; int64_t id; char uid[TSDB_CLUSTER_ID_LEN]; int64_t createdTime; @@ -110,6 +111,7 @@ typedef struct SClusterObj { } SClusterObj; typedef struct SDnodeObj { + SdbHead head; int32_t id; int32_t vnodes; int64_t createdTime; @@ -126,6 +128,7 @@ typedef struct SDnodeObj { } SDnodeObj; typedef struct SMnodeObj { + SdbHead head; int32_t id; int8_t status; int8_t role; @@ -169,6 +172,7 @@ typedef struct { } SAcctInfo; typedef struct SAcctObj { + SdbHead head; char acct[TSDB_USER_LEN]; int64_t createdTime; int64_t updateTime; @@ -179,6 +183,7 @@ typedef struct SAcctObj { } SAcctObj; typedef struct SUserObj { + SdbHead head; char user[TSDB_USER_LEN]; char pass[TSDB_KEY_LEN]; char acct[TSDB_USER_LEN]; @@ -212,6 +217,7 @@ typedef struct { } SDbCfg; typedef struct SDbObj { + SdbHead head; char name[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char acct[TSDB_USER_LEN]; int64_t createdTime; @@ -255,6 +261,7 @@ typedef struct SVgObj { } SVgObj; typedef struct SSTableObj { + SdbHead head; char tableId[TSDB_TABLE_NAME_LEN]; uint64_t uid; int64_t createdTime; @@ -265,6 +272,7 @@ typedef struct SSTableObj { } SSTableObj; typedef struct SFuncObj { + SdbHead head; char name[TSDB_FUNC_NAME_LEN]; char path[128]; int32_t contLen; diff --git a/source/server/mnode/inc/mnodeInt.h b/source/server/mnode/inc/mnodeInt.h index 77cc8e4024c8ce3619b239059f82a96647806432..913e39ea5a53a10c743c0d250043177e4231efdb 100644 --- a/source/server/mnode/inc/mnodeInt.h +++ b/source/server/mnode/inc/mnodeInt.h @@ -22,10 +22,10 @@ extern "C" { #endif -tmr_h mnodeGetTimer(); -int32_t mnodeGetDnodeId(); -char *mnodeGetClusterId(); -bool mnodeIsRunning(); +tmr_h mnodeGetTimer(); +int32_t mnodeGetDnodeId(); +char *mnodeGetClusterId(); +EMnStatus mnodeIsServing(); void mnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg); void mnodeSendMsgToMnode(struct SRpcMsg *rpcMsg); diff --git a/source/server/mnode/inc/mnodeSdb.h b/source/server/mnode/inc/mnodeSdb.h index 6370a832aebbd095391087907fb3bcdabe883909..cba9538ac259c840642ade0399f3e1239c79c535 100644 --- a/source/server/mnode/inc/mnodeSdb.h +++ b/source/server/mnode/inc/mnodeSdb.h @@ -22,14 +22,28 @@ extern "C" { #endif -int32_t mnodeInitSdb(); -void mnodeCleanupSdb(); +typedef void (*SdbDeployFp)(); +typedef void *(*SdbDecodeFp)(cJSON *root); +typedef int32_t (*SdbEncodeFp)(void *pHead, char *buf, int32_t maxLen); -int32_t mnodeDeploySdb(); -void mnodeUnDeploySdb(); +int32_t sdbInit(); +void sdbCleanup(); -int32_t mnodeReadSdb(); -int32_t mnodeCommitSdb(); +int32_t sdbRead(); +int32_t sdbCommit(); + +int32_t sdbDeploy(); +void sdbUnDeploy(); + +void *sdbInsertRow(EMnSdb sdb, void *pObj); +void sdbDeleteRow(EMnSdb sdb, void *pHead); +void *sdbUpdateRow(EMnSdb sdb, void *pHead); +void *sdbGetRow(EMnSdb sdb, void *pKey); +void *sdbFetchRow(EMnSdb sdb, void *pIter); +void sdbCancelFetch(EMnSdb sdb, void *pIter); +int32_t sdbGetCount(EMnSdb sdb); + +void sdbSetFp(EMnSdb, EMnKey, SdbDeployFp, SdbEncodeFp, SdbDecodeFp, int32_t dataSize); #ifdef __cplusplus } diff --git a/source/server/mnode/src/mnodeAcct.c b/source/server/mnode/src/mnodeAcct.c index 1d63db8554b356d87293b31ee91859764baf5ebd..6807e56ea30bda39b063ca59438be60da90f45c7 100644 --- a/source/server/mnode/src/mnodeAcct.c +++ b/source/server/mnode/src/mnodeAcct.c @@ -15,7 +15,62 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "mnodeInt.h" +#include "mnodeSdb.h" -int32_t mnodeInitAcct() { return 0; } -void mnodeCleanupAcct() {} +static void mnodeCreateDefaultAcct() { + int32_t code = TSDB_CODE_SUCCESS; + + SAcctObj acctObj = {0}; + tstrncpy(acctObj.acct, TSDB_DEFAULT_USER, TSDB_USER_LEN); + acctObj.cfg = (SAcctCfg){.maxUsers = 128, + .maxDbs = 128, + .maxTimeSeries = INT32_MAX, + .maxConnections = 1024, + .maxStreams = 1000, + .maxPointsPerSecond = 10000000, + .maxStorage = INT64_MAX, + .maxQueryTime = INT64_MAX, + .maxInbound = 0, + .maxOutbound = 0, + .accessState = TSDB_VN_ALL_ACCCESS}; + acctObj.acctId = 1; + acctObj.createdTime = taosGetTimestampMs(); + + sdbInsertRow(MN_SDB_ACCT, &acctObj); +} + +int32_t mnodeEncodeAcct(SAcctObj *pAcct, char *buf, int32_t maxLen) { + int32_t len = 0; + + len += snprintf(buf + len, maxLen - len, "{\"type\":%d, ", MN_SDB_ACCT); + len += snprintf(buf + len, maxLen - len, "\"acctId\":\"%d\", ", pAcct->acctId); + len += snprintf(buf + len, maxLen - len, "\"maxUsers\":\"%d\", ", pAcct->cfg.maxUsers); + len += snprintf(buf + len, maxLen - len, "\"maxDbs\":\"%d\", ", pAcct->cfg.maxDbs); + len += snprintf(buf + len, maxLen - len, "\"maxTimeSeries\":\"%d\", ", pAcct->cfg.maxTimeSeries); + len += snprintf(buf + len, maxLen - len, "\"maxConnections\":\"%d\", ", pAcct->cfg.maxConnections); + len += snprintf(buf + len, maxLen - len, "\"maxStreams\":\"%d\", ", pAcct->cfg.maxStreams); + len += snprintf(buf + len, maxLen - len, "\"maxPointsPerSecond\":\"%d\", ", pAcct->cfg.maxPointsPerSecond); + len += snprintf(buf + len, maxLen - len, "\"maxUsers\":\"%" PRIu64 "\", ", pAcct->cfg.maxStorage); + len += snprintf(buf + len, maxLen - len, "\"maxQueryTime\":\"%" PRIu64 "\", ", pAcct->cfg.maxQueryTime); + len += snprintf(buf + len, maxLen - len, "\"maxInbound\"\":%" PRIu64 "\", ", pAcct->cfg.maxInbound); + len += snprintf(buf + len, maxLen - len, "\"maxOutbound\":\"%" PRIu64 "\", ", pAcct->cfg.maxOutbound); + len += snprintf(buf + len, maxLen - len, "\"accessState\":\"%d\", ", pAcct->cfg.accessState); + len += snprintf(buf + len, maxLen - len, "\"createdTime\":\"%" PRIu64 "\", ", pAcct->createdTime); + len += snprintf(buf + len, maxLen - len, "\"updateTime\":\"%" PRIu64 "\"}\n", pAcct->updateTime); + + return len; +} + +SAcctObj *mnodeDecodeAcct(cJSON *root) { + SAcctObj *pAcct = calloc(1, sizeof(SAcctObj)); + return pAcct; +} + +int32_t mnodeInitAcct() { + sdbSetFp(MN_SDB_ACCT, MN_KEY_BINARY, mnodeCreateDefaultAcct, (SdbEncodeFp)mnodeEncodeAcct, + (SdbDecodeFp)(mnodeDecodeAcct), sizeof(SAcctObj)); + + return 0; +} + +void mnodeCleanupAcct() {} diff --git a/source/server/mnode/src/mnodeSdb.c b/source/server/mnode/src/mnodeSdb.c index f7a17c2ecaba592ad4f3a2019c6eb761607ddec6..ea97ab765381121465246c3a4d649dcde396cf6f 100644 --- a/source/server/mnode/src/mnodeSdb.c +++ b/source/server/mnode/src/mnodeSdb.c @@ -15,14 +15,222 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "mnodeInt.h" +#include "thash.h" +#include "tglobal.h" +#include "cJSON.h" +#include "mnodeSdb.h" -int32_t mnodeInitSdb() { return 0; } -void mnodeCleanupSdb() {} +static struct { + char currDir[PATH_MAX]; + char backDir[PATH_MAX]; + char tmpDir[PATH_MAX]; + int64_t version; + EMnKey hashKey[MN_SDB_MAX]; + int32_t dataSize[MN_SDB_MAX]; + SHashObj *hashObj[MN_SDB_MAX]; + SdbDeployFp deployFp[MN_SDB_MAX]; + SdbEncodeFp encodeFp[MN_SDB_MAX]; + SdbDecodeFp decodeFp[MN_SDB_MAX]; +} tsSdb = {0}; -int32_t mnodeDeploySdb() { +static int32_t sdbCreateDir() { + if (!taosMkDir(tsSdb.currDir)) { + mError("failed to create dir:%s", tsSdb.currDir); + return TAOS_SYSTEM_ERROR(errno); + } - // if (!taosMkDir()) + if (!taosMkDir(tsSdb.backDir)) { + mError("failed to create dir:%s", tsSdb.backDir); + return -1; + } + + if (!taosMkDir(tsSdb.tmpDir)) { + mError("failed to create dir:%s", tsSdb.tmpDir); + return -1; + } + + return 0; +} + +static int32_t sdbRunDeployFp() { + for (int32_t i = MN_SDB_START; i < MN_SDB_MAX; ++i) { + SdbDeployFp fp = tsSdb.deployFp[i]; + if (fp) { + (*fp)(); + } + } + + return 0; +} + +static int32_t sdbReadVersion(cJSON *root) { + cJSON *ver = cJSON_GetObjectItem(root, "version"); + if (!ver || ver->type != cJSON_String) { + mError("failed to parse version since version not found"); + return -1; + } + + tsSdb.version = (int64_t)atoll(ver->valuestring); + mTrace("parse version success, version:%" PRIu64, tsSdb.version); + + return 0; +} + +static void sdbWriteVersion(FileFd fd) { + char content[128]; + int32_t len = + snprintf(content, sizeof(content), "{\"type\":0, \"version\":\"%" PRIu64 "\", \"updateTime\":\"%" PRIu64 "\"}\n", + tsSdb.version, taosGetTimestampMs()); + taosWriteFile(fd, content, len); +} + +static int32_t sdbReadDataFile() { + ssize_t _bytes = 0; + size_t len = 4096; + char *line = calloc(1, len); + int32_t code = -1; + FILE *fp = NULL; + cJSON *root = NULL; + + char file[PATH_MAX + 20]; + snprintf(file, sizeof(file), "%ssdb.data", tsSdb.currDir); + fp = fopen(file, "r"); + if (!fp) { + mError("failed to open file:%s for read since %s", file, strerror(errno)); + goto PARSE_SDB_DATA_ERROR; + } + + while (!feof(fp)) { + memset(line, 0, len); + _bytes = tgetline(&line, &len, fp); + if (_bytes < 0) { + break; + } + + line[len - 1] = 0; + if (len <= 10) continue; + + root = cJSON_Parse(line); + if (root == NULL) { + mError("failed to parse since invalid json format, %s", line); + goto PARSE_SDB_DATA_ERROR; + } + + cJSON *type = cJSON_GetObjectItem(root, "type"); + if (!type || type->type != cJSON_Number) { + mError("failed to parse since invalid type not found, %s", line); + goto PARSE_SDB_DATA_ERROR; + } + + if (type->valueint >= MN_SDB_MAX || type->valueint < MN_SDB_START) { + mError("failed to parse since invalid type, %s", line); + goto PARSE_SDB_DATA_ERROR; + } + + if (type->valueint == MN_SDB_START) { + if (sdbReadVersion(root) != 0) { + mError("failed to parse version, %s", line); + goto PARSE_SDB_DATA_ERROR; + } + cJSON_Delete(root); + root = NULL; + continue; + } + + SdbDecodeFp func = tsSdb.decodeFp[type->valueint]; + SdbHead *pHead = (*func)(root); + if (pHead == NULL) { + mError("failed to parse since decode error, %s", line); + goto PARSE_SDB_DATA_ERROR; + } + + sdbInsertRow(pHead->type, pHead); + cJSON_Delete(root); + root = NULL; + } + + code = 0; + +PARSE_SDB_DATA_ERROR: + tfree(line); + fclose(fp); + cJSON_Delete(root); + + return code; +} + +static int32_t sdbWriteDataFile() { + char file[PATH_MAX + 20] = {0}; + snprintf(file, sizeof(file), "%ssdb.data", tsSdb.currDir); + FileFd fd = taosOpenFileCreateWrite(file); + if (fd <= 0) { + mError("failed to open file:%s for write since %s", file, strerror(errno)); + return -1; + } + + int32_t len; + int32_t maxLen = 10240; + char *buf = malloc(maxLen); + + for (int32_t i = MN_SDB_START; i < MN_SDB_MAX; ++i) { + SHashObj *hash = tsSdb.hashObj[i]; + if (!hash) continue; + + SdbEncodeFp fp = tsSdb.encodeFp[i]; + if (!fp) continue; + + SdbHead *pHead = taosHashIterate(hash, NULL); + while (pHead != NULL) { + len = (*fp)(pHead, buf, maxLen); + if (len >= 0) { + taosWriteFile(fd, buf, len); + } + + pHead = taosHashIterate(hash, pHead); + } + } + + sdbWriteVersion(fd); + taosFsyncFile(fd); + taosCloseFile(fd); + + mInfo("write file:%s successfully", file); + return 0; +} + +int32_t sdbCommit() { + int32_t code = sdbWriteDataFile(); + if (code != 0) { + return code; + } + + return 0; +} + +int32_t sdbRead() { + int32_t code = sdbReadDataFile(); + if (code != 0) { + return code; + } + + mInfo("read sdb file successfully"); + return -1; +} + +int32_t sdbDeploy() { + if (sdbCreateDir() != 0) { + return -1; + } + + if (sdbRunDeployFp() != 0) { + return -1; + } + + if (sdbCommit() != 0) { + return -1; + } + + // if (!taosMkDir()) // if (pMinfos == NULL) { // first deploy // tsMint.dnodeId = 1; // bool getuid = taosGetSystemUid(tsMint.clusterId); @@ -42,6 +250,142 @@ int32_t mnodeDeploySdb() { return 0; } -void mnodeUnDeploySdb() {} -int32_t mnodeReadSdb() { return 0; } -int32_t mnodeCommitSdb() { return 0; } \ No newline at end of file +void sdbUnDeploy() {} + +int32_t sdbInit() { + snprintf(tsSdb.currDir, PATH_MAX, "%s%scurrent%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP); + snprintf(tsSdb.backDir, PATH_MAX, "%s%sbackup%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP); + snprintf(tsSdb.tmpDir, PATH_MAX, "%s%stmp%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP); + + for (int32_t i = 0; i < MN_SDB_MAX; ++i) { + int32_t type; + if (tsSdb.hashKey[i] == MN_KEY_INT32) { + type = TSDB_DATA_TYPE_INT; + } else if (tsSdb.hashKey[i] == MN_KEY_INT64) { + type = TSDB_DATA_TYPE_BIGINT; + } else { + type = TSDB_DATA_TYPE_BINARY; + } + + SHashObj *hash = taosHashInit(128, taosGetDefaultHashFunction(type), true, HASH_NO_LOCK); + if (hash == NULL) { + return -1; + } + + tsSdb.hashObj[i] = hash; + } + + return 0; +} + +void sdbCleanup() { + for (int32_t i = 0; i < MN_SDB_MAX; ++i) { + SHashObj *hash = tsSdb.hashObj[i]; + if (hash != NULL) { + taosHashCleanup(hash); + } + tsSdb.hashObj[i] = NULL; + } +} + +void sdbSetFp(EMnSdb sdb, EMnKey keyType, SdbDeployFp deployFp, SdbEncodeFp encodeFp, SdbDecodeFp decodeFp, + int32_t dataSize) { + tsSdb.deployFp[sdb] = deployFp; + tsSdb.encodeFp[sdb] = encodeFp; + tsSdb.decodeFp[sdb] = decodeFp; + tsSdb.dataSize[sdb] = dataSize; + tsSdb.hashKey[sdb] = keyType; +} + +static SHashObj *sdbGetHash(int32_t sdb) { + if (sdb >= MN_SDB_MAX || sdb <= MN_SDB_START) { + return NULL; + } + + SHashObj *hash = tsSdb.hashObj[sdb]; + if (hash == NULL) { + return NULL; + } + + return hash; +} + +void *sdbInsertRow(EMnSdb sdb, void *p) { + SdbHead *pHead = p; + pHead->type = sdb; + pHead->status = MN_SDB_STAT_AVAIL; + + char *pKey = (char *)pHead + sizeof(pHead); + int32_t keySize; + EMnKey keyType = tsSdb.hashKey[pHead->type]; + int32_t dataSize = tsSdb.dataSize[pHead->type]; + + SHashObj *hash = sdbGetHash(pHead->type); + if (hash == NULL) { + return NULL; + } + + if (keyType == MN_KEY_INT32) { + keySize = sizeof(int32_t); + } else if (keyType == MN_KEY_BINARY) { + keySize = strlen(pKey) + 1; + } else { + keySize = sizeof(int64_t); + } + + taosHashPut(hash, pKey, keySize, pHead, dataSize); + return taosHashGet(hash, pKey, keySize); +} + +void sdbDeleteRow(EMnSdb sdb, void *p) { + SdbHead *pHead = p; + pHead->status = MN_SDB_STAT_DROPPED; +} + +void *sdbUpdateRow(EMnSdb sdb, void *pHead) { return sdbInsertRow(sdb, pHead); } + +void *sdbGetRow(EMnSdb sdb, void *pKey) { + SHashObj *hash = sdbGetHash(sdb); + if (hash == NULL) { + return NULL; + } + + int32_t keySize; + EMnKey keyType = tsSdb.hashKey[sdb]; + + if (keyType == MN_KEY_INT32) { + keySize = sizeof(int32_t); + } else if (keyType == MN_KEY_BINARY) { + keySize = strlen(pKey) + 1; + } else { + keySize = sizeof(int64_t); + } + + return taosHashGet(hash, pKey, keySize); +} + +void *sdbFetchRow(EMnSdb sdb, void *pIter) { + SHashObj *hash = sdbGetHash(sdb); + if (hash == NULL) { + return NULL; + } + + return taosHashIterate(hash, pIter); +} + +void sdbCancelFetch(EMnSdb sdb, void *pIter) { + SHashObj *hash = sdbGetHash(sdb); + if (hash == NULL) { + return; + } + + taosHashCancelIterate(hash, pIter); +} + +int32_t sdbGetCount(EMnSdb sdb) { + SHashObj *hash = sdbGetHash(sdb); + if (hash == NULL) { + return 0; + } + return taosHashGetSize(hash); +} \ No newline at end of file diff --git a/source/server/mnode/src/mondeInt.c b/source/server/mnode/src/mondeInt.c index ac53cac7f29d960ed4f587901fe0fd1b418bb412..13dba2c944edb421eac2c2fc004b335087c560d0 100644 --- a/source/server/mnode/src/mondeInt.c +++ b/source/server/mnode/src/mondeInt.c @@ -51,7 +51,7 @@ int32_t mnodeGetDnodeId() { return tsMint.dnodeId; } char *mnodeGetClusterId() { return tsMint.clusterId; } -bool mnodeIsServing() { return tsMint.state == MN_STATUS_READY; } +EMnStatus mnodeIsServing() { return tsMint.state; } void mnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg) { (*tsMint.fp.SendMsgToDnode)(epSet, rpcMsg); @@ -68,7 +68,6 @@ static int32_t mnodeSetPara(SMnodePara para) { tsMint.dnodeId = para.dnodeId; strncpy(tsMint.clusterId, para.clusterId, TSDB_CLUSTER_ID_LEN); - if (tsMint.fp.GetDnodeEp == NULL) return -1; if (tsMint.fp.SendMsgToDnode == NULL) return -1; if (tsMint.fp.SendMsgToMnode == NULL) return -1; if (tsMint.fp.SendRedirectMsg == NULL) return -1; @@ -96,7 +95,7 @@ static int32_t mnodeInitStep1() { struct SSteps *steps = taosStepInit(16, NULL); if (steps == NULL) return -1; - taosStepAdd(steps, "mnode-sdb", mnodeInitSdb, mnodeCleanupSdb); + taosStepAdd(steps, "mnode-sdb", sdbInit, sdbCleanup); taosStepAdd(steps, "mnode-cluster", mnodeInitCluster, mnodeCleanupCluster); taosStepAdd(steps, "mnode-dnode", mnodeInitDnode, mnodeCleanupDnode); taosStepAdd(steps, "mnode-mnode", mnodeInitMnode, mnodeCleanupMnode); @@ -177,11 +176,12 @@ int32_t mnodeDeploy() { } void mnodeUnDeploy() { - mnodeUnDeploySdb(); + sdbUnDeploy(); mnodeCleanup(); } int32_t mnodeInit(SMnodePara para) { + mDebugFlag = 207; if (tsMint.state != MN_STATUS_UNINIT) { return 0; } else { @@ -202,10 +202,10 @@ int32_t mnodeInit(SMnodePara para) { return -1; } - code = mnodeReadSdb(); + code = sdbRead(); if (code != 0) { if (mnodeNeedDeploy()) { - code = mnodeDeploySdb(); + code = sdbDeploy(); if (code != 0) { mnodeCleanupStep1(); tsMint.state = MN_STATUS_UNINIT;