提交 765144cc 编写于 作者: S Shengliang Guan

sdb version

上级 f76dbd58
......@@ -178,6 +178,12 @@ extern "C" {
#define setThreadName(name)
#endif
#if defined(_WIN32)
#define TD_DIRSEP "\\"
#else
#define TD_DIRSEP "/"
#endif
#ifdef __cplusplus
}
#endif
......
......@@ -8,4 +8,5 @@ target_include_directories(
target_link_libraries(
mnode
PUBLIC transport
PUBLIC cjson
)
\ No newline at end of file
......@@ -20,6 +20,7 @@
#include "tlog.h"
#include "trpc.h"
#include "ttimer.h"
#include "cJSON.h"
#include "mnode.h"
#ifdef __cplusplus
......@@ -94,15 +95,16 @@ typedef enum {
MN_AUTH_MAX
} EMnAuthOp;
typedef enum {
MN_STATUS_UNINIT = 0,
MN_STATUS_INIT = 1,
MN_STATUS_READY = 2,
MN_STATUS_CLOSING = 3
} EMnStatus;
typedef enum { MN_STATUS_UNINIT = 0, MN_STATUS_INIT = 1, MN_STATUS_READY = 2, MN_STATUS_CLOSING = 3 } EMnStatus;
typedef enum { MN_SDB_STAT_AVAIL = 0, MN_SDB_STAT_DROPPED = 1 } EMnSdbStat;
typedef struct {
int8_t type;
int8_t status;
} SdbHead;
typedef struct SClusterObj {
SdbHead head;
int64_t id;
char uid[TSDB_CLUSTER_ID_LEN];
int64_t createdTime;
......@@ -110,6 +112,7 @@ typedef struct SClusterObj {
} SClusterObj;
typedef struct SDnodeObj {
SdbHead head;
int32_t id;
int32_t vnodes;
int64_t createdTime;
......@@ -126,6 +129,7 @@ typedef struct SDnodeObj {
} SDnodeObj;
typedef struct SMnodeObj {
SdbHead head;
int32_t id;
int8_t status;
int8_t role;
......@@ -169,6 +173,7 @@ typedef struct {
} SAcctInfo;
typedef struct SAcctObj {
SdbHead head;
char acct[TSDB_USER_LEN];
int64_t createdTime;
int64_t updateTime;
......@@ -179,6 +184,7 @@ typedef struct SAcctObj {
} SAcctObj;
typedef struct SUserObj {
SdbHead head;
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
char acct[TSDB_USER_LEN];
......@@ -212,6 +218,7 @@ typedef struct {
} SDbCfg;
typedef struct SDbObj {
SdbHead head;
char name[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
char acct[TSDB_USER_LEN];
int64_t createdTime;
......@@ -255,6 +262,7 @@ typedef struct SVgObj {
} SVgObj;
typedef struct SSTableObj {
SdbHead head;
char tableId[TSDB_TABLE_NAME_LEN];
uint64_t uid;
int64_t createdTime;
......@@ -265,6 +273,7 @@ typedef struct SSTableObj {
} SSTableObj;
typedef struct SFuncObj {
SdbHead head;
char name[TSDB_FUNC_NAME_LEN];
char path[128];
int32_t contLen;
......
......@@ -22,14 +22,28 @@
extern "C" {
#endif
int32_t mnodeInitSdb();
void mnodeCleanupSdb();
typedef void (*SdbDeployFp)();
typedef void (*SdbEncodeFp)(void *pHead, FileFd fd);
typedef void *(*SdbDecodeFp)(cJSON *root);
int32_t mnodeDeploySdb();
void mnodeUnDeploySdb();
int32_t sdbInit();
void sdbCleanup();
int32_t mnodeReadSdb();
int32_t mnodeCommitSdb();
int32_t sdbRead();
int32_t sdbCommit();
int32_t sdbDeploy();
void sdbUnDeploy();
void *sdbInsertRow(EMnSdb sdb, void *pObj);
void sdbDeleteRow(EMnSdb sdb, void *pHead);
void *sdbUpdateRow(EMnSdb sdb, void *pHead);
void *sdbGetRow(EMnSdb sdb, void *pKey);
void *sdbFetchRow(EMnSdb sdb, void *pIter);
void sdbCancelFetch(EMnSdb sdb, void *pIter);
int32_t sdbGetCount(EMnSdb sdb);
void sdbAddFp(EMnSdb, SdbDeployFp, SdbEncodeFp, SdbDecodeFp, int32_t dataSize);
#ifdef __cplusplus
}
......
......@@ -15,14 +15,214 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "mnodeInt.h"
#include "thash.h"
#include "tglobal.h"
#include "cJSON.h"
#include "mnodeSdb.h"
int32_t mnodeInitSdb() { return 0; }
void mnodeCleanupSdb() {}
static struct {
char currDir[PATH_MAX];
char backDir[PATH_MAX];
char tmpDir[PATH_MAX];
int64_t version;
EMnKey hashKey[MN_SDB_MAX];
int32_t dataSize[MN_SDB_MAX];
SHashObj *hashObj[MN_SDB_MAX];
SdbDeployFp deployFp[MN_SDB_MAX];
SdbEncodeFp encodeFp[MN_SDB_MAX];
SdbDecodeFp decodeFp[MN_SDB_MAX];
} tsSdb = {0};
int32_t mnodeDeploySdb() {
static int32_t sdbCreateDir() {
if (!taosMkDir(tsSdb.currDir)) {
mError("failed to create dir:%s", tsSdb.currDir);
return TAOS_SYSTEM_ERROR(errno);
}
// if (!taosMkDir())
if (!taosMkDir(tsSdb.backDir)) {
mError("failed to create dir:%s", tsSdb.backDir);
return -1;
}
if (!taosMkDir(tsSdb.tmpDir)) {
mError("failed to create dir:%s", tsSdb.tmpDir);
return -1;
}
return 0;
}
static int32_t sdbRunDeployFp() {
for (int32_t i = MN_SDB_START; i < MN_SDB_MAX; ++i) {
SdbDeployFp fp = tsSdb.deployFp[i];
if (fp) {
(*fp)();
}
}
return 0;
}
static int32_t sdbReadVersion(cJSON *root) {
cJSON *ver = cJSON_GetObjectItem(root, "version");
if (!ver || ver->type != cJSON_String) {
mError("failed to parse version since version not found");
return -1;
}
tsSdb.version = (int64_t)atoll(ver->valuestring);
mTrace("parse version success, version:%" PRIu64, tsSdb.version);
return 0;
}
static void sdbWriteVersion(FileFd fd) {
char content[128];
int32_t len =
snprintf(content, sizeof(content), "{\"type\":0, \"version\":\"%" PRIu64 "\", \"update\":\"%" PRIu64 "\"}\n",
tsSdb.version, taosGetTimestampMs());
taosWriteFile(fd, content, len);
}
static int32_t sdbReadDataFile() {
ssize_t _bytes = 0;
size_t len = 4096;
char *line = calloc(1, len);
int32_t code = -1;
FILE *fp = NULL;
cJSON *root = NULL;
char file[PATH_MAX + 20];
snprintf(file, sizeof(file), "%ssdb.data", tsSdb.currDir);
fp = fopen(file, "r");
if (!fp) {
mError("failed to open file:%s for read since %s", file, strerror(errno));
goto PARSE_SDB_DATA_ERROR;
}
while (!feof(fp)) {
memset(line, 0, len);
_bytes = tgetline(&line, &len, fp);
if (_bytes < 0) {
break;
}
line[len - 1] = 0;
if (len <= 10) continue;
root = cJSON_Parse(line);
if (root == NULL) {
mError("failed to parse since invalid json format, %s", line);
goto PARSE_SDB_DATA_ERROR;
}
cJSON *type = cJSON_GetObjectItem(root, "type");
if (!type || type->type != cJSON_Number) {
mError("failed to parse since invalid type not found, %s", line);
goto PARSE_SDB_DATA_ERROR;
}
if (type->valueint >= MN_SDB_MAX || type->valueint < MN_SDB_START) {
mError("failed to parse since invalid type, %s", line);
goto PARSE_SDB_DATA_ERROR;
}
if (type->valueint == MN_SDB_START) {
if (sdbReadVersion(root) != 0) {
mError("failed to parse version, %s", line);
goto PARSE_SDB_DATA_ERROR;
}
cJSON_Delete(root);
root = NULL;
continue;
}
SdbDecodeFp func = tsSdb.decodeFp[type->valueint];
SdbHead *pHead = (*func)(root);
if (pHead == NULL) {
mError("failed to parse since decode error, %s", line);
goto PARSE_SDB_DATA_ERROR;
}
sdbInsertRow(pHead->type, pHead);
cJSON_Delete(root);
root = NULL;
}
code = 0;
PARSE_SDB_DATA_ERROR:
tfree(line);
fclose(fp);
cJSON_Delete(root);
return code;
}
static int32_t sdbWriteDataFile() {
char file[PATH_MAX + 20] = {0};
snprintf(file, sizeof(file), "%ssdb.data", tsSdb.currDir);
FileFd fd = taosOpenFileCreateWrite(file);
if (fd <= 0) {
mError("failed to open file:%s for write since %s", file, strerror(errno));
return -1;
}
for (int32_t i = MN_SDB_START; i < MN_SDB_MAX; ++i) {
SHashObj *hash = tsSdb.hashObj[i];
if (!hash) continue;
SdbEncodeFp fp = tsSdb.encodeFp[i];
if (!fp) continue;
SdbHead *pHead = taosHashIterate(hash, pHead);
while (pHead != NULL) {
(*fp)(pHead, fd);
pHead = taosHashIterate(hash, pHead);
}
}
sdbWriteVersion(fd);
taosFsyncFile(fd);
taosCloseFile(fd);
mInfo("write file:%s successfully", file);
return 0;
}
int32_t sdbCommit() {
int32_t code = sdbWriteDataFile();
if (code != 0) {
return code;
}
return 0;
}
int32_t sdbRead() {
int32_t code = sdbReadDataFile();
if (code != 0) {
return code;
}
mInfo("read sdb file successfully");
return -1;
}
int32_t sdbDeploy() {
if (sdbCreateDir() != 0) {
return -1;
}
if (sdbRunDeployFp() != 0) {
return -1;
}
if (sdbCommit() != 0) {
return -1;
}
// if (!taosMkDir())
// if (pMinfos == NULL) { // first deploy
// tsMint.dnodeId = 1;
// bool getuid = taosGetSystemUid(tsMint.clusterId);
......@@ -42,6 +242,140 @@ int32_t mnodeDeploySdb() {
return 0;
}
void mnodeUnDeploySdb() {}
int32_t mnodeReadSdb() { return 0; }
int32_t mnodeCommitSdb() { return 0; }
\ No newline at end of file
void sdbUnDeploy() {}
int32_t sdbInit() {
snprintf(tsSdb.currDir, PATH_MAX, "%s%scurrent%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP);
snprintf(tsSdb.backDir, PATH_MAX, "%s%sbackup%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP);
snprintf(tsSdb.tmpDir, PATH_MAX, "%s%stmp%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP);
for (int32_t i = 0; i < MN_SDB_MAX; ++i) {
int32_t type;
if (tsSdb.hashKey[i] == MN_KEY_INT32) {
type = TSDB_DATA_TYPE_INT;
} else if (tsSdb.hashKey[i] == MN_KEY_INT64) {
type = TSDB_DATA_TYPE_BIGINT;
} else {
type = TSDB_DATA_TYPE_BINARY;
}
SHashObj *hash = taosHashInit(128, taosGetDefaultHashFunction(type), true, HASH_NO_LOCK);
if (hash == NULL) {
return -1;
}
tsSdb.hashObj[i] = hash;
}
return 0;
}
void sdbCleanup() {
for (int32_t i = 0; i < MN_SDB_MAX; ++i) {
SHashObj *hash = tsSdb.hashObj[i];
if (hash != NULL) {
taosHashCleanup(hash);
}
tsSdb.hashObj[i] = NULL;
}
}
void sdbAddFp(EMnSdb sdb, SdbDeployFp deployFp, SdbEncodeFp encodeFp, SdbDecodeFp decodeFp, int32_t dataSize) {
tsSdb.deployFp[sdb] = deployFp;
tsSdb.encodeFp[sdb] = encodeFp;
tsSdb.decodeFp[sdb] = decodeFp;
tsSdb.dataSize[sdb] = dataSize;
}
static SHashObj *sdbGetHash(int32_t sdb) {
if (sdb >= MN_SDB_MAX || sdb <= MN_SDB_START) {
return NULL;
}
SHashObj *hash = tsSdb.hashObj[sdb];
if (hash == NULL) {
return NULL;
}
return hash;
}
void *sdbInsertRow(EMnSdb sdb, void *p) {
SdbHead *pHead = p;
pHead->type = sdb;
pHead->status = MN_SDB_STAT_AVAIL;
char *pKey = (char *)pHead + sizeof(pHead);
int32_t keySize;
EMnKey keyType = tsSdb.hashKey[pHead->type];
int32_t dataSize = tsSdb.dataSize[pHead->type];
SHashObj *hash = sdbGetHash(pHead->type);
if (hash == NULL) {
return NULL;
}
if (keyType == MN_KEY_INT32) {
keySize = sizeof(int32_t);
} else if (keyType == MN_KEY_BINARY) {
keySize = strlen(pKey) + 1;
} else {
keySize = sizeof(int64_t);
}
taosHashPut(hash, pKey, keySize, pHead, dataSize);
return taosHashGet(hash, pKey, keySize);
}
void sdbDeleteRow(EMnSdb sdb, void *p) {
SdbHead *pHead = p;
pHead->status = MN_SDB_STAT_DROPPED;
}
void *sdbUpdateRow(EMnSdb sdb, void *pHead) { return sdbInsertRow(sdb, pHead); }
void *sdbGetRow(EMnSdb sdb, void *pKey) {
SHashObj *hash = sdbGetHash(sdb);
if (hash == NULL) {
return NULL;
}
int32_t keySize;
EMnKey keyType = tsSdb.hashKey[sdb];
if (keyType == MN_KEY_INT32) {
keySize = sizeof(int32_t);
} else if (keyType == MN_KEY_BINARY) {
keySize = strlen(pKey) + 1;
} else {
keySize = sizeof(int64_t);
}
return taosHashGet(hash, pKey, keySize);
}
void *sdbFetchRow(EMnSdb sdb, void *pIter) {
SHashObj *hash = sdbGetHash(sdb);
if (hash == NULL) {
return NULL;
}
return taosHashIterate(hash, pIter);
}
void sdbCancelFetch(EMnSdb sdb, void *pIter) {
SHashObj *hash = sdbGetHash(sdb);
if (hash == NULL) {
return;
}
taosHashCancelIterate(hash, pIter);
}
int32_t sdbGetCount(EMnSdb sdb) {
SHashObj *hash = sdbGetHash(sdb);
if (hash == NULL) {
return 0;
}
return taosHashGetSize(hash);
}
\ No newline at end of file
......@@ -96,7 +96,7 @@ static int32_t mnodeInitStep1() {
struct SSteps *steps = taosStepInit(16, NULL);
if (steps == NULL) return -1;
taosStepAdd(steps, "mnode-sdb", mnodeInitSdb, mnodeCleanupSdb);
taosStepAdd(steps, "mnode-sdb", sdbInit, sdbCleanup);
taosStepAdd(steps, "mnode-cluster", mnodeInitCluster, mnodeCleanupCluster);
taosStepAdd(steps, "mnode-dnode", mnodeInitDnode, mnodeCleanupDnode);
taosStepAdd(steps, "mnode-mnode", mnodeInitMnode, mnodeCleanupMnode);
......@@ -177,11 +177,12 @@ int32_t mnodeDeploy() {
}
void mnodeUnDeploy() {
mnodeUnDeploySdb();
sdbUnDeploy();
mnodeCleanup();
}
int32_t mnodeInit(SMnodePara para) {
mDebugFlag = 207;
if (tsMint.state != MN_STATUS_UNINIT) {
return 0;
} else {
......@@ -202,10 +203,10 @@ int32_t mnodeInit(SMnodePara para) {
return -1;
}
code = mnodeReadSdb();
code = sdbRead();
if (code != 0) {
if (mnodeNeedDeploy()) {
code = mnodeDeploySdb();
code = sdbDeploy();
if (code != 0) {
mnodeCleanupStep1();
tsMint.state = MN_STATUS_UNINIT;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册