提交 efb3d812 编写于 作者: S Shengliang Guan

add commit version to sdb

上级 99e53317
...@@ -102,25 +102,24 @@ typedef enum { ...@@ -102,25 +102,24 @@ typedef enum {
} ESdbStatus; } ESdbStatus;
typedef enum { typedef enum {
SDB_START = 0, SDB_TRANS = 0,
SDB_TRANS = 1, SDB_CLUSTER = 1,
SDB_CLUSTER = 2, SDB_MNODE = 2,
SDB_MNODE = 3, SDB_QNODE = 3,
SDB_QNODE = 4, SDB_SNODE = 4,
SDB_SNODE = 5, SDB_BNODE = 5,
SDB_BNODE = 6, SDB_DNODE = 6,
SDB_DNODE = 7, SDB_USER = 7,
SDB_USER = 8, SDB_AUTH = 8,
SDB_AUTH = 9, SDB_ACCT = 9,
SDB_ACCT = 10, SDB_CONSUMER = 10,
SDB_CONSUMER = 11, SDB_CGROUP = 11,
SDB_CGROUP = 12, SDB_TOPIC = 12,
SDB_TOPIC = 13, SDB_VGROUP = 13,
SDB_VGROUP = 14, SDB_STB = 14,
SDB_STB = 15, SDB_DB = 15,
SDB_DB = 16, SDB_FUNC = 16,
SDB_FUNC = 17, SDB_MAX = 17
SDB_MAX = 18
} ESdbType; } ESdbType;
typedef struct SSdb SSdb; typedef struct SSdb SSdb;
......
...@@ -17,11 +17,12 @@ ...@@ -17,11 +17,12 @@
#define _TD_SDB_INT_H_ #define _TD_SDB_INT_H_
#include "os.h" #include "os.h"
#include "sdb.h" #include "sdb.h"
#include "tmsg.h"
#include "thash.h" #include "thash.h"
#include "tlockfree.h" #include "tlockfree.h"
#include "tlog.h" #include "tlog.h"
#include "tmsg.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -60,7 +61,7 @@ typedef struct SSdb { ...@@ -60,7 +61,7 @@ typedef struct SSdb {
int64_t lastCommitVer; int64_t lastCommitVer;
int64_t curVer; int64_t curVer;
int64_t tableVer[SDB_MAX]; int64_t tableVer[SDB_MAX];
int32_t maxId[SDB_MAX]; int64_t maxId[SDB_MAX];
EKeyType keyTypes[SDB_MAX]; EKeyType keyTypes[SDB_MAX];
SHashObj *hashObjs[SDB_MAX]; SHashObj *hashObjs[SDB_MAX];
SRWLatch locks[SDB_MAX]; SRWLatch locks[SDB_MAX];
......
...@@ -49,6 +49,9 @@ SSdb *sdbInit(SSdbOpt *pOption) { ...@@ -49,6 +49,9 @@ SSdb *sdbInit(SSdbOpt *pOption) {
for (ESdbType i = 0; i < SDB_MAX; ++i) { for (ESdbType i = 0; i < SDB_MAX; ++i) {
taosInitRWLatch(&pSdb->locks[i]); taosInitRWLatch(&pSdb->locks[i]);
pSdb->maxId[i] = 0;
pSdb->tableVer[i] = -1;
pSdb->keyTypes[i] = SDB_KEY_INT32;
} }
pSdb->curVer = -1; pSdb->curVer = -1;
...@@ -61,10 +64,10 @@ SSdb *sdbInit(SSdbOpt *pOption) { ...@@ -61,10 +64,10 @@ SSdb *sdbInit(SSdbOpt *pOption) {
void sdbCleanup(SSdb *pSdb) { void sdbCleanup(SSdb *pSdb) {
mDebug("start to cleanup sdb"); mDebug("start to cleanup sdb");
// if (pSdb->curVer != pSdb->lastCommitVer) { if (pSdb->curVer != pSdb->lastCommitVer) {
mDebug("write sdb file for curVer:% " PRId64 " and lastVer:%" PRId64, pSdb->curVer, pSdb->lastCommitVer); mDebug("write sdb file for curVer:% " PRId64 " and lastCommitVer:%" PRId64, pSdb->curVer, pSdb->lastCommitVer);
sdbWriteFile(pSdb); sdbWriteFile(pSdb);
// } }
if (pSdb->currDir != NULL) { if (pSdb->currDir != NULL) {
tfree(pSdb->currDir); tfree(pSdb->currDir);
......
...@@ -17,10 +17,13 @@ ...@@ -17,10 +17,13 @@
#include "sdbInt.h" #include "sdbInt.h"
#include "tchecksum.h" #include "tchecksum.h"
#define SDB_TABLE_SIZE 24
#define SDB_RESERVE_SIZE 512
static int32_t sdbRunDeployFp(SSdb *pSdb) { static int32_t sdbRunDeployFp(SSdb *pSdb) {
mDebug("start to deploy sdb"); mDebug("start to deploy sdb");
for (ESdbType i = SDB_MAX - 1; i > SDB_START; --i) { for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
SdbDeployFp fp = pSdb->deployFps[i]; SdbDeployFp fp = pSdb->deployFps[i];
if (fp == NULL) continue; if (fp == NULL) continue;
...@@ -34,6 +37,100 @@ static int32_t sdbRunDeployFp(SSdb *pSdb) { ...@@ -34,6 +37,100 @@ static int32_t sdbRunDeployFp(SSdb *pSdb) {
return 0; return 0;
} }
static int32_t sdbReadFileHead(SSdb *pSdb, FileFd fd) {
int32_t ret = taosReadFile(fd, &pSdb->curVer, sizeof(int64_t));
if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (ret != sizeof(int64_t)) {
terrno = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
int64_t maxId = -1;
ret = taosReadFile(fd, &maxId, sizeof(int64_t));
if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (ret != sizeof(int64_t)) {
terrno = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
if (i < SDB_MAX) {
pSdb->maxId[i] = maxId;
}
}
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
int64_t ver = -1;
ret = taosReadFile(fd, &ver, sizeof(int64_t));
if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (ret != sizeof(int64_t)) {
terrno = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
if (i < SDB_MAX) {
pSdb->tableVer[i] = ver;
}
}
char reserve[SDB_RESERVE_SIZE] = {0};
ret = taosWriteFile(fd, reserve, sizeof(reserve));
if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (ret != sizeof(reserve)) {
terrno = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
return 0;
}
static int32_t sdbWriteFileHead(SSdb *pSdb, FileFd fd) {
if (taosWriteFile(fd, &pSdb->curVer, sizeof(int64_t)) != sizeof(int64_t)) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
int64_t maxId = -1;
if (i < SDB_MAX) {
maxId = pSdb->maxId[i];
}
if (taosWriteFile(fd, &maxId, sizeof(int64_t)) != sizeof(int64_t)) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
}
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
int64_t ver = -1;
if (i < SDB_MAX) {
ver = pSdb->tableVer[i];
}
if (taosWriteFile(fd, &ver, sizeof(int64_t)) != sizeof(int64_t)) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
}
char reserve[512] = {0};
if (taosWriteFile(fd, reserve, sizeof(reserve)) != sizeof(reserve)) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
int32_t sdbReadFile(SSdb *pSdb) { int32_t sdbReadFile(SSdb *pSdb) {
int64_t offset = 0; int64_t offset = 0;
int32_t code = 0; int32_t code = 0;
...@@ -43,7 +140,7 @@ int32_t sdbReadFile(SSdb *pSdb) { ...@@ -43,7 +140,7 @@ int32_t sdbReadFile(SSdb *pSdb) {
SSdbRaw *pRaw = malloc(SDB_MAX_SIZE); SSdbRaw *pRaw = malloc(SDB_MAX_SIZE);
if (pRaw == NULL) { if (pRaw == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed read file since %s", terrstr()); mError("failed read file since %s", terrstr());
return -1; return -1;
} }
...@@ -58,6 +155,14 @@ int32_t sdbReadFile(SSdb *pSdb) { ...@@ -58,6 +155,14 @@ int32_t sdbReadFile(SSdb *pSdb) {
return 0; return 0;
} }
if (sdbReadFileHead(pSdb, fd) != 0) {
mError("failed to read file:%s head since %s", file, terrstr());
pSdb->curVer = -1;
free(pRaw);
taosCloseFile(fd);
return -1;
}
while (1) { while (1) {
readLen = sizeof(SSdbRaw); readLen = sizeof(SSdbRaw);
ret = taosReadFile(fd, pRaw, readLen); ret = taosReadFile(fd, pRaw, readLen);
...@@ -104,6 +209,8 @@ int32_t sdbReadFile(SSdb *pSdb) { ...@@ -104,6 +209,8 @@ int32_t sdbReadFile(SSdb *pSdb) {
} }
code = 0; code = 0;
pSdb->lastCommitVer = pSdb->curVer;
mError("read file:%s successfully, ver:%" PRId64, file, pSdb->lastCommitVer);
PARSE_SDB_DATA_ERROR: PARSE_SDB_DATA_ERROR:
taosCloseFile(fd); taosCloseFile(fd);
...@@ -130,11 +237,17 @@ int32_t sdbWriteFile(SSdb *pSdb) { ...@@ -130,11 +237,17 @@ int32_t sdbWriteFile(SSdb *pSdb) {
return -1; return -1;
} }
for (ESdbType i = SDB_MAX - 1; i > SDB_START; --i) { if (sdbWriteFileHead(pSdb, fd) != 0) {
mError("failed to write file:%s head since %s", tmpfile, terrstr());
taosCloseFile(fd);
return -1;
}
for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
SdbEncodeFp encodeFp = pSdb->encodeFps[i]; SdbEncodeFp encodeFp = pSdb->encodeFps[i];
if (encodeFp == NULL) continue; if (encodeFp == NULL) continue;
mTrace("sdb write %s, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i)); mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
SHashObj *hash = pSdb->hashObjs[i]; SHashObj *hash = pSdb->hashObjs[i];
SRWLatch *pLock = &pSdb->locks[i]; SRWLatch *pLock = &pSdb->locks[i];
...@@ -155,7 +268,7 @@ int32_t sdbWriteFile(SSdb *pSdb) { ...@@ -155,7 +268,7 @@ int32_t sdbWriteFile(SSdb *pSdb) {
pRaw->status = pRow->status; pRaw->status = pRow->status;
int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen; int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen;
if (taosWriteFile(fd, pRaw, writeLen) != writeLen) { if (taosWriteFile(fd, pRaw, writeLen) != writeLen) {
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(errno);
taosHashCancelIterate(hash, ppRow); taosHashCancelIterate(hash, ppRow);
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
break; break;
...@@ -163,7 +276,7 @@ int32_t sdbWriteFile(SSdb *pSdb) { ...@@ -163,7 +276,7 @@ int32_t sdbWriteFile(SSdb *pSdb) {
int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen); int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
if (taosWriteFile(fd, &cksum, sizeof(int32_t)) != sizeof(int32_t)) { if (taosWriteFile(fd, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(errno);
taosHashCancelIterate(hash, ppRow); taosHashCancelIterate(hash, ppRow);
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
break; break;
...@@ -201,7 +314,8 @@ int32_t sdbWriteFile(SSdb *pSdb) { ...@@ -201,7 +314,8 @@ int32_t sdbWriteFile(SSdb *pSdb) {
if (code != 0) { if (code != 0) {
mError("failed to write file:%s since %s", curfile, tstrerror(code)); mError("failed to write file:%s since %s", curfile, tstrerror(code));
} else { } else {
mDebug("write file:%s successfully", curfile); pSdb->lastCommitVer = pSdb->curVer;
mDebug("write file:%s successfully, ver:%" PRId64, curfile, pSdb->lastCommitVer);
} }
terrno = code; terrno = code;
......
...@@ -38,6 +38,10 @@ const char *sdbTableName(ESdbType type) { ...@@ -38,6 +38,10 @@ const char *sdbTableName(ESdbType type) {
return "auth"; return "auth";
case SDB_ACCT: case SDB_ACCT:
return "acct"; return "acct";
case SDB_CONSUMER:
return "consumer";
case SDB_CGROUP:
return "cgroup";
case SDB_TOPIC: case SDB_TOPIC:
return "topic"; return "topic";
case SDB_VGROUP: case SDB_VGROUP:
...@@ -70,7 +74,7 @@ void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper) { ...@@ -70,7 +74,7 @@ void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper) {
} }
static SHashObj *sdbGetHash(SSdb *pSdb, int32_t type) { static SHashObj *sdbGetHash(SSdb *pSdb, int32_t type) {
if (type >= SDB_MAX || type <= SDB_START) { if (type >= SDB_MAX || type < 0) {
terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE; terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
return NULL; return NULL;
} }
...@@ -100,8 +104,6 @@ static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, void *pKey) { ...@@ -100,8 +104,6 @@ static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, void *pKey) {
} }
static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
int32_t code = 0;
SRWLatch *pLock = &pSdb->locks[pRow->type]; SRWLatch *pLock = &pSdb->locks[pRow->type];
taosWLockLatch(pLock); taosWLockLatch(pLock);
...@@ -126,10 +128,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -126,10 +128,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
taosWUnLockLatch(pLock); taosWUnLockLatch(pLock);
if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT32) { int32_t code = 0;
pSdb->maxId[pRow->type] = MAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj));
}
SdbInsertFp insertFp = pSdb->insertFps[pRow->type]; SdbInsertFp insertFp = pSdb->insertFps[pRow->type];
if (insertFp != NULL) { if (insertFp != NULL) {
code = (*insertFp)(pSdb, pRow->pObj); code = (*insertFp)(pSdb, pRow->pObj);
...@@ -143,12 +142,18 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -143,12 +142,18 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
} }
} }
if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT32) {
pSdb->maxId[pRow->type] = MAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj));
}
if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT64) {
pSdb->maxId[pRow->type] = MAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj));
}
pSdb->tableVer[pRow->type]++;
return 0; return 0;
} }
static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) { static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) {
int32_t code = 0;
SRWLatch *pLock = &pSdb->locks[pNewRow->type]; SRWLatch *pLock = &pSdb->locks[pNewRow->type];
taosRLockLatch(pLock); taosRLockLatch(pLock);
...@@ -157,23 +162,24 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -157,23 +162,24 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
taosRUnLockLatch(pLock); taosRUnLockLatch(pLock);
return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize); return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize);
} }
SSdbRow *pOldRow = *ppOldRow;
SSdbRow *pOldRow = *ppOldRow;
pOldRow->status = pRaw->status; pOldRow->status = pRaw->status;
taosRUnLockLatch(pLock); taosRUnLockLatch(pLock);
int32_t code = 0;
SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type]; SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type];
if (updateFp != NULL) { if (updateFp != NULL) {
code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj); code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj);
} }
sdbFreeRow(pSdb, pNewRow); sdbFreeRow(pSdb, pNewRow);
pSdb->tableVer[pOldRow->type]++;
return code; return code;
} }
static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
int32_t code = 0;
SRWLatch *pLock = &pSdb->locks[pRow->type]; SRWLatch *pLock = &pSdb->locks[pRow->type];
taosWLockLatch(pLock); taosWLockLatch(pLock);
...@@ -190,9 +196,10 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -190,9 +196,10 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
taosHashRemove(hash, pOldRow->pObj, keySize); taosHashRemove(hash, pOldRow->pObj, keySize);
taosWUnLockLatch(pLock); taosWUnLockLatch(pLock);
// sdbRelease(pSdb, pOldRow->pObj); pSdb->tableVer[pOldRow->type]++;
sdbFreeRow(pSdb, pRow); sdbFreeRow(pSdb, pRow);
return code; // sdbRelease(pSdb, pOldRow->pObj);
return 0;
} }
int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw) { int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw) {
...@@ -277,7 +284,7 @@ void sdbRelease(SSdb *pSdb, void *pObj) { ...@@ -277,7 +284,7 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
if (pObj == NULL) return; if (pObj == NULL) return;
SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow)); SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow));
if (pRow->type >= SDB_MAX || pRow->type <= SDB_START) return; if (pRow->type >= SDB_MAX ) return;
SRWLatch *pLock = &pSdb->locks[pRow->type]; SRWLatch *pLock = &pSdb->locks[pRow->type];
taosRLockLatch(pLock); taosRLockLatch(pLock);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册