diff --git a/include/os/osDef.h b/include/os/osDef.h index 2b3678ac6785f2f41eded008a80dda62a9f93362..bb5395f548fc418256fe71702c66c9f8683a8484 100644 --- a/include/os/osDef.h +++ b/include/os/osDef.h @@ -178,6 +178,12 @@ extern "C" { #define setThreadName(name) #endif +#if defined(_WIN32) +#define TD_DIRSEP "\\" +#else +#define TD_DIRSEP "/" +#endif + #ifdef __cplusplus } #endif diff --git a/source/server/mnode/CMakeLists.txt b/source/server/mnode/CMakeLists.txt index 331a717bf581616c8e2a4a6c9b610d69c7c6f12d..bf35b381afabd96901c064d4be2e67f5dd743a37 100644 --- a/source/server/mnode/CMakeLists.txt +++ b/source/server/mnode/CMakeLists.txt @@ -8,4 +8,5 @@ target_include_directories( target_link_libraries( mnode PUBLIC transport + PUBLIC cjson ) \ No newline at end of file diff --git a/source/server/mnode/inc/mnodeDef.h b/source/server/mnode/inc/mnodeDef.h index 3ac204f4b6b3fa14ef88e2be1a83fad8c923e17c..256c3af680732fed7100378fbf13091235fa7eb8 100644 --- a/source/server/mnode/inc/mnodeDef.h +++ b/source/server/mnode/inc/mnodeDef.h @@ -20,6 +20,7 @@ #include "tlog.h" #include "trpc.h" #include "ttimer.h" +#include "cJSON.h" #include "mnode.h" #ifdef __cplusplus @@ -94,15 +95,16 @@ typedef enum { MN_AUTH_MAX } EMnAuthOp; -typedef enum { - MN_STATUS_UNINIT = 0, - MN_STATUS_INIT = 1, - MN_STATUS_READY = 2, - MN_STATUS_CLOSING = 3 -} EMnStatus; +typedef enum { MN_STATUS_UNINIT = 0, MN_STATUS_INIT = 1, MN_STATUS_READY = 2, MN_STATUS_CLOSING = 3 } EMnStatus; +typedef enum { MN_SDB_STAT_AVAIL = 0, MN_SDB_STAT_DROPPED = 1 } EMnSdbStat; +typedef struct { + int8_t type; + int8_t status; +} SdbHead; typedef struct SClusterObj { + SdbHead head; int64_t id; char uid[TSDB_CLUSTER_ID_LEN]; int64_t createdTime; @@ -110,6 +112,7 @@ typedef struct SClusterObj { } SClusterObj; typedef struct SDnodeObj { + SdbHead head; int32_t id; int32_t vnodes; int64_t createdTime; @@ -126,6 +129,7 @@ typedef struct SDnodeObj { } SDnodeObj; typedef struct SMnodeObj { + SdbHead head; int32_t id; int8_t status; int8_t role; @@ -169,6 +173,7 @@ typedef struct { } SAcctInfo; typedef struct SAcctObj { + SdbHead head; char acct[TSDB_USER_LEN]; int64_t createdTime; int64_t updateTime; @@ -179,6 +184,7 @@ typedef struct SAcctObj { } SAcctObj; typedef struct SUserObj { + SdbHead head; char user[TSDB_USER_LEN]; char pass[TSDB_KEY_LEN]; char acct[TSDB_USER_LEN]; @@ -212,6 +218,7 @@ typedef struct { } SDbCfg; typedef struct SDbObj { + SdbHead head; char name[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char acct[TSDB_USER_LEN]; int64_t createdTime; @@ -255,6 +262,7 @@ typedef struct SVgObj { } SVgObj; typedef struct SSTableObj { + SdbHead head; char tableId[TSDB_TABLE_NAME_LEN]; uint64_t uid; int64_t createdTime; @@ -265,6 +273,7 @@ typedef struct SSTableObj { } SSTableObj; typedef struct SFuncObj { + SdbHead head; char name[TSDB_FUNC_NAME_LEN]; char path[128]; int32_t contLen; diff --git a/source/server/mnode/inc/mnodeSdb.h b/source/server/mnode/inc/mnodeSdb.h index 6370a832aebbd095391087907fb3bcdabe883909..e58ab7243a0117bb740a1008f80f32627b6acaa8 100644 --- a/source/server/mnode/inc/mnodeSdb.h +++ b/source/server/mnode/inc/mnodeSdb.h @@ -22,14 +22,28 @@ extern "C" { #endif -int32_t mnodeInitSdb(); -void mnodeCleanupSdb(); +typedef void (*SdbDeployFp)(); +typedef void (*SdbEncodeFp)(void *pHead, FileFd fd); +typedef void *(*SdbDecodeFp)(cJSON *root); -int32_t mnodeDeploySdb(); -void mnodeUnDeploySdb(); +int32_t sdbInit(); +void sdbCleanup(); -int32_t mnodeReadSdb(); -int32_t mnodeCommitSdb(); +int32_t sdbRead(); +int32_t sdbCommit(); + +int32_t sdbDeploy(); +void sdbUnDeploy(); + +void *sdbInsertRow(EMnSdb sdb, void *pObj); +void sdbDeleteRow(EMnSdb sdb, void *pHead); +void *sdbUpdateRow(EMnSdb sdb, void *pHead); +void *sdbGetRow(EMnSdb sdb, void *pKey); +void *sdbFetchRow(EMnSdb sdb, void *pIter); +void sdbCancelFetch(EMnSdb sdb, void *pIter); +int32_t sdbGetCount(EMnSdb sdb); + +void sdbAddFp(EMnSdb, SdbDeployFp, SdbEncodeFp, SdbDecodeFp, int32_t dataSize); #ifdef __cplusplus } diff --git a/source/server/mnode/src/mnodeSdb.c b/source/server/mnode/src/mnodeSdb.c index f7a17c2ecaba592ad4f3a2019c6eb761607ddec6..7972fb7f3db76fa80bf483897041014381425fae 100644 --- a/source/server/mnode/src/mnodeSdb.c +++ b/source/server/mnode/src/mnodeSdb.c @@ -15,14 +15,214 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "mnodeInt.h" +#include "thash.h" +#include "tglobal.h" +#include "cJSON.h" +#include "mnodeSdb.h" -int32_t mnodeInitSdb() { return 0; } -void mnodeCleanupSdb() {} +static struct { + char currDir[PATH_MAX]; + char backDir[PATH_MAX]; + char tmpDir[PATH_MAX]; + int64_t version; + EMnKey hashKey[MN_SDB_MAX]; + int32_t dataSize[MN_SDB_MAX]; + SHashObj *hashObj[MN_SDB_MAX]; + SdbDeployFp deployFp[MN_SDB_MAX]; + SdbEncodeFp encodeFp[MN_SDB_MAX]; + SdbDecodeFp decodeFp[MN_SDB_MAX]; +} tsSdb = {0}; -int32_t mnodeDeploySdb() { +static int32_t sdbCreateDir() { + if (!taosMkDir(tsSdb.currDir)) { + mError("failed to create dir:%s", tsSdb.currDir); + return TAOS_SYSTEM_ERROR(errno); + } - // if (!taosMkDir()) + if (!taosMkDir(tsSdb.backDir)) { + mError("failed to create dir:%s", tsSdb.backDir); + return -1; + } + + if (!taosMkDir(tsSdb.tmpDir)) { + mError("failed to create dir:%s", tsSdb.tmpDir); + return -1; + } + + return 0; +} + +static int32_t sdbRunDeployFp() { + for (int32_t i = MN_SDB_START; i < MN_SDB_MAX; ++i) { + SdbDeployFp fp = tsSdb.deployFp[i]; + if (fp) { + (*fp)(); + } + } + + return 0; +} + +static int32_t sdbReadVersion(cJSON *root) { + cJSON *ver = cJSON_GetObjectItem(root, "version"); + if (!ver || ver->type != cJSON_String) { + mError("failed to parse version since version not found"); + return -1; + } + + tsSdb.version = (int64_t)atoll(ver->valuestring); + mTrace("parse version success, version:%" PRIu64, tsSdb.version); + + return 0; +} + +static void sdbWriteVersion(FileFd fd) { + char content[128]; + int32_t len = + snprintf(content, sizeof(content), "{\"type\":0, \"version\":\"%" PRIu64 "\", \"update\":\"%" PRIu64 "\"}\n", + tsSdb.version, taosGetTimestampMs()); + taosWriteFile(fd, content, len); +} + +static int32_t sdbReadDataFile() { + ssize_t _bytes = 0; + size_t len = 4096; + char *line = calloc(1, len); + int32_t code = -1; + FILE *fp = NULL; + cJSON *root = NULL; + + char file[PATH_MAX + 20]; + snprintf(file, sizeof(file), "%ssdb.data", tsSdb.currDir); + fp = fopen(file, "r"); + if (!fp) { + mError("failed to open file:%s for read since %s", file, strerror(errno)); + goto PARSE_SDB_DATA_ERROR; + } + + while (!feof(fp)) { + memset(line, 0, len); + _bytes = tgetline(&line, &len, fp); + if (_bytes < 0) { + break; + } + + line[len - 1] = 0; + if (len <= 10) continue; + + root = cJSON_Parse(line); + if (root == NULL) { + mError("failed to parse since invalid json format, %s", line); + goto PARSE_SDB_DATA_ERROR; + } + + cJSON *type = cJSON_GetObjectItem(root, "type"); + if (!type || type->type != cJSON_Number) { + mError("failed to parse since invalid type not found, %s", line); + goto PARSE_SDB_DATA_ERROR; + } + + if (type->valueint >= MN_SDB_MAX || type->valueint < MN_SDB_START) { + mError("failed to parse since invalid type, %s", line); + goto PARSE_SDB_DATA_ERROR; + } + + if (type->valueint == MN_SDB_START) { + if (sdbReadVersion(root) != 0) { + mError("failed to parse version, %s", line); + goto PARSE_SDB_DATA_ERROR; + } + cJSON_Delete(root); + root = NULL; + continue; + } + + SdbDecodeFp func = tsSdb.decodeFp[type->valueint]; + SdbHead *pHead = (*func)(root); + if (pHead == NULL) { + mError("failed to parse since decode error, %s", line); + goto PARSE_SDB_DATA_ERROR; + } + + sdbInsertRow(pHead->type, pHead); + cJSON_Delete(root); + root = NULL; + } + + code = 0; + +PARSE_SDB_DATA_ERROR: + tfree(line); + fclose(fp); + cJSON_Delete(root); + + return code; +} + +static int32_t sdbWriteDataFile() { + char file[PATH_MAX + 20] = {0}; + snprintf(file, sizeof(file), "%ssdb.data", tsSdb.currDir); + FileFd fd = taosOpenFileCreateWrite(file); + if (fd <= 0) { + mError("failed to open file:%s for write since %s", file, strerror(errno)); + return -1; + } + + for (int32_t i = MN_SDB_START; i < MN_SDB_MAX; ++i) { + SHashObj *hash = tsSdb.hashObj[i]; + if (!hash) continue; + + SdbEncodeFp fp = tsSdb.encodeFp[i]; + if (!fp) continue; + + SdbHead *pHead = taosHashIterate(hash, pHead); + while (pHead != NULL) { + (*fp)(pHead, fd); + pHead = taosHashIterate(hash, pHead); + } + } + + sdbWriteVersion(fd); + taosFsyncFile(fd); + taosCloseFile(fd); + + mInfo("write file:%s successfully", file); + return 0; +} + +int32_t sdbCommit() { + int32_t code = sdbWriteDataFile(); + if (code != 0) { + return code; + } + + return 0; +} + +int32_t sdbRead() { + int32_t code = sdbReadDataFile(); + if (code != 0) { + return code; + } + + mInfo("read sdb file successfully"); + return -1; +} + +int32_t sdbDeploy() { + if (sdbCreateDir() != 0) { + return -1; + } + + if (sdbRunDeployFp() != 0) { + return -1; + } + + if (sdbCommit() != 0) { + return -1; + } + + // if (!taosMkDir()) // if (pMinfos == NULL) { // first deploy // tsMint.dnodeId = 1; // bool getuid = taosGetSystemUid(tsMint.clusterId); @@ -42,6 +242,140 @@ int32_t mnodeDeploySdb() { return 0; } -void mnodeUnDeploySdb() {} -int32_t mnodeReadSdb() { return 0; } -int32_t mnodeCommitSdb() { return 0; } \ No newline at end of file +void sdbUnDeploy() {} + +int32_t sdbInit() { + snprintf(tsSdb.currDir, PATH_MAX, "%s%scurrent%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP); + snprintf(tsSdb.backDir, PATH_MAX, "%s%sbackup%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP); + snprintf(tsSdb.tmpDir, PATH_MAX, "%s%stmp%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP); + + for (int32_t i = 0; i < MN_SDB_MAX; ++i) { + int32_t type; + if (tsSdb.hashKey[i] == MN_KEY_INT32) { + type = TSDB_DATA_TYPE_INT; + } else if (tsSdb.hashKey[i] == MN_KEY_INT64) { + type = TSDB_DATA_TYPE_BIGINT; + } else { + type = TSDB_DATA_TYPE_BINARY; + } + + SHashObj *hash = taosHashInit(128, taosGetDefaultHashFunction(type), true, HASH_NO_LOCK); + if (hash == NULL) { + return -1; + } + + tsSdb.hashObj[i] = hash; + } + + return 0; +} + +void sdbCleanup() { + for (int32_t i = 0; i < MN_SDB_MAX; ++i) { + SHashObj *hash = tsSdb.hashObj[i]; + if (hash != NULL) { + taosHashCleanup(hash); + } + tsSdb.hashObj[i] = NULL; + } +} + +void sdbAddFp(EMnSdb sdb, SdbDeployFp deployFp, SdbEncodeFp encodeFp, SdbDecodeFp decodeFp, int32_t dataSize) { + tsSdb.deployFp[sdb] = deployFp; + tsSdb.encodeFp[sdb] = encodeFp; + tsSdb.decodeFp[sdb] = decodeFp; + tsSdb.dataSize[sdb] = dataSize; +} + +static SHashObj *sdbGetHash(int32_t sdb) { + if (sdb >= MN_SDB_MAX || sdb <= MN_SDB_START) { + return NULL; + } + + SHashObj *hash = tsSdb.hashObj[sdb]; + if (hash == NULL) { + return NULL; + } + + return hash; +} + +void *sdbInsertRow(EMnSdb sdb, void *p) { + SdbHead *pHead = p; + pHead->type = sdb; + pHead->status = MN_SDB_STAT_AVAIL; + + char *pKey = (char *)pHead + sizeof(pHead); + int32_t keySize; + EMnKey keyType = tsSdb.hashKey[pHead->type]; + int32_t dataSize = tsSdb.dataSize[pHead->type]; + + SHashObj *hash = sdbGetHash(pHead->type); + if (hash == NULL) { + return NULL; + } + + if (keyType == MN_KEY_INT32) { + keySize = sizeof(int32_t); + } else if (keyType == MN_KEY_BINARY) { + keySize = strlen(pKey) + 1; + } else { + keySize = sizeof(int64_t); + } + + taosHashPut(hash, pKey, keySize, pHead, dataSize); + return taosHashGet(hash, pKey, keySize); +} + +void sdbDeleteRow(EMnSdb sdb, void *p) { + SdbHead *pHead = p; + pHead->status = MN_SDB_STAT_DROPPED; +} + +void *sdbUpdateRow(EMnSdb sdb, void *pHead) { return sdbInsertRow(sdb, pHead); } + +void *sdbGetRow(EMnSdb sdb, void *pKey) { + SHashObj *hash = sdbGetHash(sdb); + if (hash == NULL) { + return NULL; + } + + int32_t keySize; + EMnKey keyType = tsSdb.hashKey[sdb]; + + if (keyType == MN_KEY_INT32) { + keySize = sizeof(int32_t); + } else if (keyType == MN_KEY_BINARY) { + keySize = strlen(pKey) + 1; + } else { + keySize = sizeof(int64_t); + } + + return taosHashGet(hash, pKey, keySize); +} + +void *sdbFetchRow(EMnSdb sdb, void *pIter) { + SHashObj *hash = sdbGetHash(sdb); + if (hash == NULL) { + return NULL; + } + + return taosHashIterate(hash, pIter); +} + +void sdbCancelFetch(EMnSdb sdb, void *pIter) { + SHashObj *hash = sdbGetHash(sdb); + if (hash == NULL) { + return; + } + + taosHashCancelIterate(hash, pIter); +} + +int32_t sdbGetCount(EMnSdb sdb) { + SHashObj *hash = sdbGetHash(sdb); + if (hash == NULL) { + return 0; + } + return taosHashGetSize(hash); +} \ No newline at end of file diff --git a/source/server/mnode/src/mondeInt.c b/source/server/mnode/src/mondeInt.c index ac53cac7f29d960ed4f587901fe0fd1b418bb412..2041d5138482d7a6ed9ee58c0f4bd1a6d3d4b6a9 100644 --- a/source/server/mnode/src/mondeInt.c +++ b/source/server/mnode/src/mondeInt.c @@ -96,7 +96,7 @@ static int32_t mnodeInitStep1() { struct SSteps *steps = taosStepInit(16, NULL); if (steps == NULL) return -1; - taosStepAdd(steps, "mnode-sdb", mnodeInitSdb, mnodeCleanupSdb); + taosStepAdd(steps, "mnode-sdb", sdbInit, sdbCleanup); taosStepAdd(steps, "mnode-cluster", mnodeInitCluster, mnodeCleanupCluster); taosStepAdd(steps, "mnode-dnode", mnodeInitDnode, mnodeCleanupDnode); taosStepAdd(steps, "mnode-mnode", mnodeInitMnode, mnodeCleanupMnode); @@ -177,11 +177,12 @@ int32_t mnodeDeploy() { } void mnodeUnDeploy() { - mnodeUnDeploySdb(); + sdbUnDeploy(); mnodeCleanup(); } int32_t mnodeInit(SMnodePara para) { + mDebugFlag = 207; if (tsMint.state != MN_STATUS_UNINIT) { return 0; } else { @@ -202,10 +203,10 @@ int32_t mnodeInit(SMnodePara para) { return -1; } - code = mnodeReadSdb(); + code = sdbRead(); if (code != 0) { if (mnodeNeedDeploy()) { - code = mnodeDeploySdb(); + code = sdbDeploy(); if (code != 0) { mnodeCleanupStep1(); tsMint.state = MN_STATUS_UNINIT;