提交 5ac3398f 编写于 作者: S Shengliang Guan

Mnode intergate with wal module

上级 0e8c2260
......@@ -188,6 +188,14 @@ int32_t sdbDeploy(SSdb *pSdb);
*/
int32_t sdbReadFile(SSdb *pSdb);
/**
* @brief Write sdb file.
*
* @param pSdb The sdb object.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t sdbWriteFile(SSdb *pSdb);
/**
* @brief Parse and write raw data to sdb, then free the pRaw object
*
......
......@@ -160,6 +160,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SDB_INVALID_DATA_VER TAOS_DEF_ERROR_CODE(0, 0x0339)
#define TSDB_CODE_SDB_INVALID_DATA_LEN TAOS_DEF_ERROR_CODE(0, 0x033A)
#define TSDB_CODE_SDB_INVALID_DATA_CONTENT TAOS_DEF_ERROR_CODE(0, 0x033B)
#define TSDB_CODE_SDB_INVALID_WAl_VER TAOS_DEF_ERROR_CODE(0, 0x033C)
// mnode-dnode
#define TSDB_CODE_MND_DNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0340)
......
......@@ -15,10 +15,10 @@
#define _DEFAULT_SOURCE
#include "mndSync.h"
#include "mndTrans.h"
int32_t mndInitSync(SMnode *pMnode) {
static int32_t mndInitWal(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
tsem_init(&pMgmt->syncSem, 0, 0);
char path[PATH_MAX] = {0};
snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
......@@ -26,12 +26,95 @@ int32_t mndInitSync(SMnode *pMnode) {
.fsyncPeriod = 0,
.rollPeriod = -1,
.segSize = -1,
.retentionPeriod = 0,
.retentionSize = 0,
.retentionPeriod = -1,
.retentionSize = -1,
.level = TAOS_WAL_FSYNC};
pMgmt->pWal = walOpen(path, &cfg);
if (pMgmt->pWal == NULL) {
mError("failed to open wal in %s since %s", path, terrstr());
if (pMgmt->pWal == NULL) return -1;
return 0;
}
static void mndCloseWal(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
if (pMgmt->pWal != NULL) {
walClose(pMgmt->pWal);
pMgmt->pWal = NULL;
}
}
static int32_t mndRestoreWal(SMnode *pMnode) {
SWal *pWal = pMnode->syncMgmt.pWal;
SSdb *pSdb = pMnode->pSdb;
int64_t lastSdbVer = sdbUpdateVer(pSdb, 0);
int32_t code = -1;
SWalReadHandle *pHandle = walOpenReadHandle(pWal);
if (pHandle == NULL) return -1;
int64_t start = walGetFirstVer(pWal);
int64_t end = walGetLastVer(pWal);
start = MAX(lastSdbVer, start);
for (int64_t ver = start; ver >= 0 && ver <= end; ++ver) {
if (walReadWithHandle(pHandle, ver) < 0) {
mError("failed to read with wal handle since %s, ver:%" PRId64, terrstr(), ver);
goto WAL_RESTORE_OVER;
}
SWalHead *pHead = pHandle->pHead;
int64_t sdbVer = sdbUpdateVer(pSdb, 0);
if (sdbVer + 1 != ver) {
terrno = TSDB_CODE_SDB_INVALID_WAl_VER;
mError("failed to write wal to sdb, sdbVer:%" PRId64 " inconsistent with ver:%" PRId64, sdbVer, ver);
goto WAL_RESTORE_OVER;
}
if (sdbWriteNotFree(pSdb, (void *)pHead->head.body) < 0) {
mError("failed to write wal to sdb since %s, ver:%" PRId64, terrstr(), ver);
goto WAL_RESTORE_OVER;
}
sdbUpdateVer(pSdb, 1);
}
int64_t sdbVer = sdbUpdateVer(pSdb, 0);
if (sdbVer != lastSdbVer) {
if (walBeginSnapshot(pWal, sdbVer) < 0) {
goto WAL_RESTORE_OVER;
}
if (sdbVer != lastSdbVer) {
mInfo("sdb restore wal from %" PRId64 " to %" PRId64, lastSdbVer, sdbVer);
if (sdbWriteFile(pSdb) != 0) {
goto WAL_RESTORE_OVER;
}
}
if (walEndSnapshot(pWal) < 0) {
goto WAL_RESTORE_OVER;
}
}
code = 0;
WAL_RESTORE_OVER:
walCloseReadHandle(pHandle);
return 0;
}
int32_t mndInitSync(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
tsem_init(&pMgmt->syncSem, 0, 0);
if (mndInitWal(pMnode) < 0) {
mError("failed to open wal since %s", terrstr());
return -1;
}
if (mndRestoreWal(pMnode) < 0) {
mError("failed to restore wal since %s", terrstr());
return -1;
}
......@@ -42,11 +125,8 @@ int32_t mndInitSync(SMnode *pMnode) {
void mndCleanupSync(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
if (pMgmt->pWal != NULL) {
walClose(pMgmt->pWal);
pMgmt->pWal = NULL;
tsem_destroy(&pMgmt->syncSem);
}
tsem_destroy(&pMgmt->syncSem);
mndCloseWal(pMnode);
}
static int32_t mndSyncApplyCb(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) {
......@@ -71,6 +151,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
}
mTrace("raw:%p has been write to wal, ver:%" PRId64, pRaw, ver);
walCommit(pWal, ver);
walFsync(pWal, true);
#if 1
......
......@@ -72,8 +72,6 @@ typedef struct SSdb {
SdbDecodeFp decodeFps[SDB_MAX];
} SSdb;
int32_t sdbWriteFile(SSdb *pSdb);
const char *sdbTableName(ESdbType type);
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper);
......
......@@ -20,6 +20,8 @@
#include "tchecksum.h"
#include "wal.h"
#include "taoserror.h"
#ifdef __cplusplus
extern "C" {
#endif
......
......@@ -19,8 +19,10 @@
SWalReadHandle *walOpenReadHandle(SWal *pWal) {
SWalReadHandle *pRead = malloc(sizeof(SWalReadHandle));
if (pRead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pRead->pWal = pWal;
pRead->readIdxTfd = -1;
pRead->readLogTfd = -1;
......
......@@ -170,6 +170,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_STATUS_TYPE, "Invalid status type")
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_VER, "Invalid raw data version")
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_LEN, "Invalid raw data len")
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_CONTENT, "Invalid raw data content")
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_WAl_VER, "Invalid wal version")
// mnode-dnode
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ALREADY_EXIST, "Dnode already exists")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册