From efb3d812b183e7831717045ad158a036cd4b2f68 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 4 Jan 2022 11:23:54 +0800 Subject: [PATCH] add commit version to sdb --- include/dnode/mnode/sdb/sdb.h | 37 ++++---- source/dnode/mnode/sdb/inc/sdbInt.h | 5 +- source/dnode/mnode/sdb/src/sdb.c | 11 ++- source/dnode/mnode/sdb/src/sdbFile.c | 128 +++++++++++++++++++++++++-- source/dnode/mnode/sdb/src/sdbHash.c | 37 ++++---- 5 files changed, 171 insertions(+), 47 deletions(-) diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 48c8df5ba0..497da71c13 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -102,25 +102,24 @@ typedef enum { } ESdbStatus; typedef enum { - SDB_START = 0, - SDB_TRANS = 1, - SDB_CLUSTER = 2, - SDB_MNODE = 3, - SDB_QNODE = 4, - SDB_SNODE = 5, - SDB_BNODE = 6, - SDB_DNODE = 7, - SDB_USER = 8, - SDB_AUTH = 9, - SDB_ACCT = 10, - SDB_CONSUMER = 11, - SDB_CGROUP = 12, - SDB_TOPIC = 13, - SDB_VGROUP = 14, - SDB_STB = 15, - SDB_DB = 16, - SDB_FUNC = 17, - SDB_MAX = 18 + SDB_TRANS = 0, + SDB_CLUSTER = 1, + SDB_MNODE = 2, + SDB_QNODE = 3, + SDB_SNODE = 4, + SDB_BNODE = 5, + SDB_DNODE = 6, + SDB_USER = 7, + SDB_AUTH = 8, + SDB_ACCT = 9, + SDB_CONSUMER = 10, + SDB_CGROUP = 11, + SDB_TOPIC = 12, + SDB_VGROUP = 13, + SDB_STB = 14, + SDB_DB = 15, + SDB_FUNC = 16, + SDB_MAX = 17 } ESdbType; typedef struct SSdb SSdb; diff --git a/source/dnode/mnode/sdb/inc/sdbInt.h b/source/dnode/mnode/sdb/inc/sdbInt.h index 25db988a0c..c99dff57e1 100644 --- a/source/dnode/mnode/sdb/inc/sdbInt.h +++ b/source/dnode/mnode/sdb/inc/sdbInt.h @@ -17,11 +17,12 @@ #define _TD_SDB_INT_H_ #include "os.h" + #include "sdb.h" -#include "tmsg.h" #include "thash.h" #include "tlockfree.h" #include "tlog.h" +#include "tmsg.h" #ifdef __cplusplus extern "C" { @@ -60,7 +61,7 @@ typedef struct SSdb { int64_t lastCommitVer; int64_t curVer; int64_t tableVer[SDB_MAX]; - int32_t maxId[SDB_MAX]; + int64_t maxId[SDB_MAX]; EKeyType keyTypes[SDB_MAX]; SHashObj *hashObjs[SDB_MAX]; SRWLatch locks[SDB_MAX]; diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index fb6ac7bb37..0671434218 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -49,6 +49,9 @@ SSdb *sdbInit(SSdbOpt *pOption) { for (ESdbType i = 0; i < SDB_MAX; ++i) { taosInitRWLatch(&pSdb->locks[i]); + pSdb->maxId[i] = 0; + pSdb->tableVer[i] = -1; + pSdb->keyTypes[i] = SDB_KEY_INT32; } pSdb->curVer = -1; @@ -61,10 +64,10 @@ SSdb *sdbInit(SSdbOpt *pOption) { void sdbCleanup(SSdb *pSdb) { mDebug("start to cleanup sdb"); - // if (pSdb->curVer != pSdb->lastCommitVer) { - mDebug("write sdb file for curVer:% " PRId64 " and lastVer:%" PRId64, pSdb->curVer, pSdb->lastCommitVer); - sdbWriteFile(pSdb); - // } + if (pSdb->curVer != pSdb->lastCommitVer) { + mDebug("write sdb file for curVer:% " PRId64 " and lastCommitVer:%" PRId64, pSdb->curVer, pSdb->lastCommitVer); + sdbWriteFile(pSdb); + } if (pSdb->currDir != NULL) { tfree(pSdb->currDir); diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index 5a03b3409e..c5306478f2 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -17,10 +17,13 @@ #include "sdbInt.h" #include "tchecksum.h" +#define SDB_TABLE_SIZE 24 +#define SDB_RESERVE_SIZE 512 + static int32_t sdbRunDeployFp(SSdb *pSdb) { mDebug("start to deploy sdb"); - for (ESdbType i = SDB_MAX - 1; i > SDB_START; --i) { + for (int32_t i = SDB_MAX - 1; i >= 0; --i) { SdbDeployFp fp = pSdb->deployFps[i]; if (fp == NULL) continue; @@ -34,6 +37,100 @@ static int32_t sdbRunDeployFp(SSdb *pSdb) { return 0; } +static int32_t sdbReadFileHead(SSdb *pSdb, FileFd fd) { + int32_t ret = taosReadFile(fd, &pSdb->curVer, sizeof(int64_t)); + if (ret < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + if (ret != sizeof(int64_t)) { + terrno = TSDB_CODE_FILE_CORRUPTED; + return -1; + } + + for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) { + int64_t maxId = -1; + ret = taosReadFile(fd, &maxId, sizeof(int64_t)); + if (ret < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + if (ret != sizeof(int64_t)) { + terrno = TSDB_CODE_FILE_CORRUPTED; + return -1; + } + if (i < SDB_MAX) { + pSdb->maxId[i] = maxId; + } + } + + for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) { + int64_t ver = -1; + ret = taosReadFile(fd, &ver, sizeof(int64_t)); + if (ret < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + if (ret != sizeof(int64_t)) { + terrno = TSDB_CODE_FILE_CORRUPTED; + return -1; + } + if (i < SDB_MAX) { + pSdb->tableVer[i] = ver; + } + } + + char reserve[SDB_RESERVE_SIZE] = {0}; + ret = taosWriteFile(fd, reserve, sizeof(reserve)); + if (ret < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + if (ret != sizeof(reserve)) { + terrno = TSDB_CODE_FILE_CORRUPTED; + return -1; + } + + return 0; +} + +static int32_t sdbWriteFileHead(SSdb *pSdb, FileFd fd) { + if (taosWriteFile(fd, &pSdb->curVer, sizeof(int64_t)) != sizeof(int64_t)) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) { + int64_t maxId = -1; + if (i < SDB_MAX) { + maxId = pSdb->maxId[i]; + } + if (taosWriteFile(fd, &maxId, sizeof(int64_t)) != sizeof(int64_t)) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + } + + for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) { + int64_t ver = -1; + if (i < SDB_MAX) { + ver = pSdb->tableVer[i]; + } + if (taosWriteFile(fd, &ver, sizeof(int64_t)) != sizeof(int64_t)) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + } + + char reserve[512] = {0}; + if (taosWriteFile(fd, reserve, sizeof(reserve)) != sizeof(reserve)) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return 0; +} + int32_t sdbReadFile(SSdb *pSdb) { int64_t offset = 0; int32_t code = 0; @@ -43,7 +140,7 @@ int32_t sdbReadFile(SSdb *pSdb) { SSdbRaw *pRaw = malloc(SDB_MAX_SIZE); if (pRaw == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("failed read file since %s", terrstr()); + mError("failed read file since %s", terrstr()); return -1; } @@ -58,6 +155,14 @@ int32_t sdbReadFile(SSdb *pSdb) { return 0; } + if (sdbReadFileHead(pSdb, fd) != 0) { + mError("failed to read file:%s head since %s", file, terrstr()); + pSdb->curVer = -1; + free(pRaw); + taosCloseFile(fd); + return -1; + } + while (1) { readLen = sizeof(SSdbRaw); ret = taosReadFile(fd, pRaw, readLen); @@ -104,6 +209,8 @@ int32_t sdbReadFile(SSdb *pSdb) { } code = 0; + pSdb->lastCommitVer = pSdb->curVer; + mError("read file:%s successfully, ver:%" PRId64, file, pSdb->lastCommitVer); PARSE_SDB_DATA_ERROR: taosCloseFile(fd); @@ -130,11 +237,17 @@ int32_t sdbWriteFile(SSdb *pSdb) { return -1; } - for (ESdbType i = SDB_MAX - 1; i > SDB_START; --i) { + if (sdbWriteFileHead(pSdb, fd) != 0) { + mError("failed to write file:%s head since %s", tmpfile, terrstr()); + taosCloseFile(fd); + return -1; + } + + for (int32_t i = SDB_MAX - 1; i >= 0; --i) { SdbEncodeFp encodeFp = pSdb->encodeFps[i]; if (encodeFp == NULL) continue; - mTrace("sdb write %s, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i)); + mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i)); SHashObj *hash = pSdb->hashObjs[i]; SRWLatch *pLock = &pSdb->locks[i]; @@ -155,7 +268,7 @@ int32_t sdbWriteFile(SSdb *pSdb) { pRaw->status = pRow->status; int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen; if (taosWriteFile(fd, pRaw, writeLen) != writeLen) { - code = TAOS_SYSTEM_ERROR(terrno); + code = TAOS_SYSTEM_ERROR(errno); taosHashCancelIterate(hash, ppRow); sdbFreeRaw(pRaw); break; @@ -163,7 +276,7 @@ int32_t sdbWriteFile(SSdb *pSdb) { int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen); if (taosWriteFile(fd, &cksum, sizeof(int32_t)) != sizeof(int32_t)) { - code = TAOS_SYSTEM_ERROR(terrno); + code = TAOS_SYSTEM_ERROR(errno); taosHashCancelIterate(hash, ppRow); sdbFreeRaw(pRaw); break; @@ -201,7 +314,8 @@ int32_t sdbWriteFile(SSdb *pSdb) { if (code != 0) { mError("failed to write file:%s since %s", curfile, tstrerror(code)); } else { - mDebug("write file:%s successfully", curfile); + pSdb->lastCommitVer = pSdb->curVer; + mDebug("write file:%s successfully, ver:%" PRId64, curfile, pSdb->lastCommitVer); } terrno = code; diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 597484dad1..4b11ec3e76 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -38,6 +38,10 @@ const char *sdbTableName(ESdbType type) { return "auth"; case SDB_ACCT: return "acct"; + case SDB_CONSUMER: + return "consumer"; + case SDB_CGROUP: + return "cgroup"; case SDB_TOPIC: return "topic"; case SDB_VGROUP: @@ -70,7 +74,7 @@ void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper) { } static SHashObj *sdbGetHash(SSdb *pSdb, int32_t type) { - if (type >= SDB_MAX || type <= SDB_START) { + if (type >= SDB_MAX || type < 0) { terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE; return NULL; } @@ -100,8 +104,6 @@ static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, void *pKey) { } static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { - int32_t code = 0; - SRWLatch *pLock = &pSdb->locks[pRow->type]; taosWLockLatch(pLock); @@ -126,10 +128,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * taosWUnLockLatch(pLock); - if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT32) { - pSdb->maxId[pRow->type] = MAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj)); - } - + int32_t code = 0; SdbInsertFp insertFp = pSdb->insertFps[pRow->type]; if (insertFp != NULL) { code = (*insertFp)(pSdb, pRow->pObj); @@ -143,12 +142,18 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * } } + if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT32) { + pSdb->maxId[pRow->type] = MAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj)); + } + if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT64) { + pSdb->maxId[pRow->type] = MAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj)); + } + pSdb->tableVer[pRow->type]++; + return 0; } static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) { - int32_t code = 0; - SRWLatch *pLock = &pSdb->locks[pNewRow->type]; taosRLockLatch(pLock); @@ -157,23 +162,24 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * taosRUnLockLatch(pLock); return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize); } - SSdbRow *pOldRow = *ppOldRow; + SSdbRow *pOldRow = *ppOldRow; pOldRow->status = pRaw->status; taosRUnLockLatch(pLock); + int32_t code = 0; SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type]; if (updateFp != NULL) { code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj); } sdbFreeRow(pSdb, pNewRow); + + pSdb->tableVer[pOldRow->type]++; return code; } static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { - int32_t code = 0; - SRWLatch *pLock = &pSdb->locks[pRow->type]; taosWLockLatch(pLock); @@ -190,9 +196,10 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * taosHashRemove(hash, pOldRow->pObj, keySize); taosWUnLockLatch(pLock); - // sdbRelease(pSdb, pOldRow->pObj); + pSdb->tableVer[pOldRow->type]++; sdbFreeRow(pSdb, pRow); - return code; + // sdbRelease(pSdb, pOldRow->pObj); + return 0; } int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw) { @@ -277,7 +284,7 @@ void sdbRelease(SSdb *pSdb, void *pObj) { if (pObj == NULL) return; SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow)); - if (pRow->type >= SDB_MAX || pRow->type <= SDB_START) return; + if (pRow->type >= SDB_MAX ) return; SRWLatch *pLock = &pSdb->locks[pRow->type]; taosRLockLatch(pLock); -- GitLab