提交 f6c676a1 编写于 作者: S Shengliang Guan

enh: sdb snapshot

上级 dfdaf1cd
...@@ -75,13 +75,12 @@ typedef struct { ...@@ -75,13 +75,12 @@ typedef struct {
} STelemMgmt; } STelemMgmt;
typedef struct { typedef struct {
SWal *pWal; SWal *pWal;
sem_t syncSem; sem_t syncSem;
int64_t sync; int64_t sync;
bool standby; bool standby;
bool restored; int32_t errCode;
int32_t errCode; int32_t transId;
int32_t transId;
} SSyncMgmt; } SSyncMgmt;
typedef struct { typedef struct {
...@@ -90,29 +89,31 @@ typedef struct { ...@@ -90,29 +89,31 @@ typedef struct {
} SGrantInfo; } SGrantInfo;
typedef struct SMnode { typedef struct SMnode {
int32_t selfDnodeId; int32_t selfDnodeId;
int64_t clusterId; int64_t clusterId;
TdThread thread; TdThread thread;
bool deploy; TdThreadRwlock lock;
bool stopped; int32_t refCount;
int8_t replica; bool stopped;
int8_t selfIndex; bool restored;
SReplica replicas[TSDB_MAX_REPLICA]; bool deploy;
char *path; int8_t replica;
int64_t checkTime; int8_t selfIndex;
SSdb *pSdb; SReplica replicas[TSDB_MAX_REPLICA];
SMgmtWrapper *pWrapper; char *path;
SArray *pSteps; int64_t checkTime;
SQHandle *pQuery; SSdb *pSdb;
SShowMgmt showMgmt; SArray *pSteps;
SProfileMgmt profileMgmt; SQHandle *pQuery;
STelemMgmt telemMgmt; SHashObj *infosMeta;
SSyncMgmt syncMgmt; SHashObj *perfsMeta;
SHashObj *infosMeta; SShowMgmt showMgmt;
SHashObj *perfsMeta; SProfileMgmt profileMgmt;
SGrantInfo grant; STelemMgmt telemMgmt;
MndMsgFp msgFp[TDMT_MAX]; SSyncMgmt syncMgmt;
SMsgCb msgCb; SGrantInfo grant;
MndMsgFp msgFp[TDMT_MAX];
SMsgCb msgCb;
} SMnode; } SMnode;
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
......
...@@ -336,7 +336,7 @@ int32_t mndStart(SMnode *pMnode) { ...@@ -336,7 +336,7 @@ int32_t mndStart(SMnode *pMnode) {
mndSyncStart(pMnode); mndSyncStart(pMnode);
if (pMnode->deploy) { if (pMnode->deploy) {
if (sdbDeploy(pMnode->pSdb) != 0) return -1; if (sdbDeploy(pMnode->pSdb) != 0) return -1;
pMnode->syncMgmt.restored = true; pMnode->restored = true;
} }
return mndInitTimer(pMnode); return mndInitTimer(pMnode);
} }
......
...@@ -63,7 +63,7 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) { ...@@ -63,7 +63,7 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
if (!pMnode->deploy) { if (!pMnode->deploy) {
mInfo("mnode sync restore finished"); mInfo("mnode sync restore finished");
mndTransPullup(pMnode); mndTransPullup(pMnode);
pMnode->syncMgmt.restored = true; pMnode->restored = true;
} }
} }
...@@ -85,7 +85,7 @@ int32_t mndSnapshotRead(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, void ...@@ -85,7 +85,7 @@ int32_t mndSnapshotRead(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, void
int32_t mndSnapshotApply(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, char *pBuf, int32_t len) { int32_t mndSnapshotApply(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, char *pBuf, int32_t len) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
pMnode->syncMgmt.restored = false; pMnode->restored = false;
mInfo("start to apply snapshot to sdb, len:%d", len); mInfo("start to apply snapshot to sdb, len:%d", len);
int32_t code = sdbApplySnapshot(pMnode->pSdb, pBuf, len); int32_t code = sdbApplySnapshot(pMnode->pSdb, pBuf, len);
...@@ -93,7 +93,7 @@ int32_t mndSnapshotApply(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, char ...@@ -93,7 +93,7 @@ int32_t mndSnapshotApply(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, char
mError("failed to apply snapshot to sdb, len:%d", len); mError("failed to apply snapshot to sdb, len:%d", len);
} else { } else {
mInfo("successfully to apply snapshot to sdb, len:%d", len); mInfo("successfully to apply snapshot to sdb, len:%d", len);
pMnode->syncMgmt.restored = true; pMnode->restored = true;
} }
// taosMemoryFree(pBuf); // taosMemoryFree(pBuf);
...@@ -250,7 +250,7 @@ bool mndIsMaster(SMnode *pMnode) { ...@@ -250,7 +250,7 @@ bool mndIsMaster(SMnode *pMnode) {
return false; return false;
} }
if (!pMgmt->restored) { if (!pMnode->restored) {
terrno = TSDB_CODE_APP_NOT_READY; terrno = TSDB_CODE_APP_NOT_READY;
return false; return false;
} }
......
...@@ -166,7 +166,6 @@ typedef struct SSdbRow { ...@@ -166,7 +166,6 @@ typedef struct SSdbRow {
typedef struct SSdb { typedef struct SSdb {
SMnode *pMnode; SMnode *pMnode;
char *currDir; char *currDir;
char *syncDir;
char *tmpDir; char *tmpDir;
int64_t lastCommitVer; int64_t lastCommitVer;
int64_t curVer; int64_t curVer;
...@@ -182,6 +181,7 @@ typedef struct SSdb { ...@@ -182,6 +181,7 @@ typedef struct SSdb {
SdbDeployFp deployFps[SDB_MAX]; SdbDeployFp deployFps[SDB_MAX];
SdbEncodeFp encodeFps[SDB_MAX]; SdbEncodeFp encodeFps[SDB_MAX];
SdbDecodeFp decodeFps[SDB_MAX]; SdbDecodeFp decodeFps[SDB_MAX];
TdThreadMutex filelock;
} SSdb; } SSdb;
typedef struct SSdbIter { typedef struct SSdbIter {
......
...@@ -56,6 +56,7 @@ SSdb *sdbInit(SSdbOpt *pOption) { ...@@ -56,6 +56,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
pSdb->curTerm = -1; pSdb->curTerm = -1;
pSdb->lastCommitVer = -1; pSdb->lastCommitVer = -1;
pSdb->pMnode = pOption->pMnode; pSdb->pMnode = pOption->pMnode;
taosThreadMutexInit(&pSdb->filelock, NULL);
mDebug("sdb init successfully"); mDebug("sdb init successfully");
return pSdb; return pSdb;
} }
...@@ -69,10 +70,6 @@ void sdbCleanup(SSdb *pSdb) { ...@@ -69,10 +70,6 @@ void sdbCleanup(SSdb *pSdb) {
taosMemoryFreeClear(pSdb->currDir); taosMemoryFreeClear(pSdb->currDir);
} }
if (pSdb->syncDir != NULL) {
taosMemoryFreeClear(pSdb->syncDir);
}
if (pSdb->tmpDir != NULL) { if (pSdb->tmpDir != NULL) {
taosMemoryFreeClear(pSdb->tmpDir); taosMemoryFreeClear(pSdb->tmpDir);
} }
...@@ -104,6 +101,7 @@ void sdbCleanup(SSdb *pSdb) { ...@@ -104,6 +101,7 @@ void sdbCleanup(SSdb *pSdb) {
mDebug("sdb table:%s is cleaned up", sdbTableName(i)); mDebug("sdb table:%s is cleaned up", sdbTableName(i));
} }
taosThreadMutexDestroy(&pSdb->filelock);
taosMemoryFree(pSdb); taosMemoryFree(pSdb);
mDebug("sdb is cleaned up"); mDebug("sdb is cleaned up");
} }
......
...@@ -22,13 +22,14 @@ ...@@ -22,13 +22,14 @@
#define SDB_RESERVE_SIZE 512 #define SDB_RESERVE_SIZE 512
#define SDB_FILE_VER 1 #define SDB_FILE_VER 1
static int32_t sdbRunDeployFp(SSdb *pSdb) { static int32_t sdbDeployData(SSdb *pSdb) {
mDebug("start to deploy sdb"); mDebug("start to deploy sdb");
for (int32_t i = SDB_MAX - 1; i >= 0; --i) { for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
SdbDeployFp fp = pSdb->deployFps[i]; SdbDeployFp fp = pSdb->deployFps[i];
if (fp == NULL) continue; if (fp == NULL) continue;
mDebug("start to deploy sdb:%s", sdbTableName(i));
if ((*fp)(pSdb->pMnode) != 0) { if ((*fp)(pSdb->pMnode) != 0) {
mError("failed to deploy sdb:%s since %s", sdbTableName(i), terrstr()); mError("failed to deploy sdb:%s since %s", sdbTableName(i), terrstr());
return -1; return -1;
...@@ -39,6 +40,39 @@ static int32_t sdbRunDeployFp(SSdb *pSdb) { ...@@ -39,6 +40,39 @@ static int32_t sdbRunDeployFp(SSdb *pSdb) {
return 0; return 0;
} }
static void sdbResetData(SSdb *pSdb) {
mDebug("start to reset sdb");
for (ESdbType i = 0; i < SDB_MAX; ++i) {
SHashObj *hash = pSdb->hashObjs[i];
if (hash == NULL) continue;
SSdbRow **ppRow = taosHashIterate(hash, NULL);
while (ppRow != NULL) {
SSdbRow *pRow = *ppRow;
if (pRow == NULL) continue;
sdbFreeRow(pSdb, pRow, true);
ppRow = taosHashIterate(hash, ppRow);
}
}
for (ESdbType i = 0; i < SDB_MAX; ++i) {
SHashObj *hash = pSdb->hashObjs[i];
if (hash == NULL) continue;
taosHashClear(pSdb->hashObjs[i]);
pSdb->tableVer[i] = 0;
pSdb->maxId[i] = 0;
mDebug("sdb:%s is reset", sdbTableName(i));
}
pSdb->curVer = -1;
pSdb->curTerm = -1;
pSdb->lastCommitVer = -1;
mDebug("sdb reset successfully");
}
static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) { static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
int64_t sver = 0; int64_t sver = 0;
int32_t ret = taosReadFile(pFile, &sver, sizeof(int64_t)); int32_t ret = taosReadFile(pFile, &sver, sizeof(int64_t));
...@@ -169,11 +203,15 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) { ...@@ -169,11 +203,15 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
return 0; return 0;
} }
int32_t sdbReadFile(SSdb *pSdb) { static int32_t sdbReadFileImp(SSdb *pSdb) {
int64_t offset = 0; int64_t offset = 0;
int32_t code = 0; int32_t code = 0;
int32_t readLen = 0; int32_t readLen = 0;
int64_t ret = 0; int64_t ret = 0;
char file[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
mDebug("start to read file:%s", file);
SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100); SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100);
if (pRaw == NULL) { if (pRaw == NULL) {
...@@ -182,10 +220,6 @@ int32_t sdbReadFile(SSdb *pSdb) { ...@@ -182,10 +220,6 @@ int32_t sdbReadFile(SSdb *pSdb) {
return -1; return -1;
} }
char file[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
mDebug("start to read file:%s", file);
TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
if (pFile == NULL) { if (pFile == NULL) {
taosMemoryFree(pRaw); taosMemoryFree(pRaw);
...@@ -196,8 +230,6 @@ int32_t sdbReadFile(SSdb *pSdb) { ...@@ -196,8 +230,6 @@ int32_t sdbReadFile(SSdb *pSdb) {
if (sdbReadFileHead(pSdb, pFile) != 0) { if (sdbReadFileHead(pSdb, pFile) != 0) {
mError("failed to read file:%s head since %s", file, terrstr()); mError("failed to read file:%s head since %s", file, terrstr());
pSdb->curVer = -1;
pSdb->curTerm = -1;
taosMemoryFree(pRaw); taosMemoryFree(pRaw);
taosCloseFile(&pFile); taosCloseFile(&pFile);
return -1; return -1;
...@@ -264,6 +296,20 @@ _OVER: ...@@ -264,6 +296,20 @@ _OVER:
return code; return code;
} }
int32_t sdbReadFile(SSdb *pSdb) {
taosThreadMutexLock(&pSdb->filelock);
sdbResetData(pSdb);
int32_t code = sdbReadFileImp(pSdb);
if (code != 0) {
mError("failed to read sdb since %s", terrstr());
sdbResetData(pSdb);
}
taosThreadMutexUnlock(&pSdb->filelock);
return code;
}
static int32_t sdbWriteFileImp(SSdb *pSdb) { static int32_t sdbWriteFileImp(SSdb *pSdb) {
int32_t code = 0; int32_t code = 0;
...@@ -378,15 +424,21 @@ int32_t sdbWriteFile(SSdb *pSdb) { ...@@ -378,15 +424,21 @@ int32_t sdbWriteFile(SSdb *pSdb) {
return 0; return 0;
} }
return sdbWriteFileImp(pSdb); taosThreadMutexLock(&pSdb->filelock);
int32_t code = sdbWriteFileImp(pSdb);
if (code != 0) {
mError("failed to write sdb since %s", terrstr());
}
taosThreadMutexUnlock(&pSdb->filelock);
return code;
} }
int32_t sdbDeploy(SSdb *pSdb) { int32_t sdbDeploy(SSdb *pSdb) {
if (sdbRunDeployFp(pSdb) != 0) { if (sdbDeployData(pSdb) != 0) {
return -1; return -1;
} }
if (sdbWriteFileImp(pSdb) != 0) { if (sdbWriteFile(pSdb) != 0) {
return -1; return -1;
} }
...@@ -397,13 +449,16 @@ static SSdbIter *sdbOpenIter(SSdb *pSdb) { ...@@ -397,13 +449,16 @@ static SSdbIter *sdbOpenIter(SSdb *pSdb) {
char datafile[PATH_MAX] = {0}; char datafile[PATH_MAX] = {0};
char tmpfile[PATH_MAX] = {0}; char tmpfile[PATH_MAX] = {0};
snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
snprintf(tmpfile, sizeof(datafile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP); snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
taosThreadMutexLock(&pSdb->filelock);
if (taosCopyFile(datafile, tmpfile) != 0) { if (taosCopyFile(datafile, tmpfile) != 0) {
taosThreadMutexUnlock(&pSdb->filelock);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to copy file %s to %s since %s", datafile, tmpfile, terrstr()); mError("failed to copy file %s to %s since %s", datafile, tmpfile, terrstr());
return NULL; return NULL;
} }
taosThreadMutexUnlock(&pSdb->filelock);
SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter)); SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter));
if (pIter == NULL) { if (pIter == NULL) {
...@@ -422,11 +477,16 @@ static SSdbIter *sdbOpenIter(SSdb *pSdb) { ...@@ -422,11 +477,16 @@ static SSdbIter *sdbOpenIter(SSdb *pSdb) {
return pIter; return pIter;
} }
static void sdbCloseIter(SSdbIter *pIter) { static void sdbCloseIter(SSdb *pSdb, SSdbIter *pIter) {
if (pIter == NULL) return; if (pIter == NULL) return;
if (pIter->file != NULL) { if (pIter->file != NULL) {
taosCloseFile(&pIter->file); taosCloseFile(&pIter->file);
} }
char tmpfile[PATH_MAX] = {0};
snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
taosRemoveFile(tmpfile);
taosMemoryFree(pIter); taosMemoryFree(pIter);
mInfo("sdbiter:%p, is closed", pIter); mInfo("sdbiter:%p, is closed", pIter);
} }
...@@ -453,15 +513,14 @@ static SSdbIter *sdbGetIter(SSdb *pSdb, SSdbIter **ppIter) { ...@@ -453,15 +513,14 @@ static SSdbIter *sdbGetIter(SSdb *pSdb, SSdbIter **ppIter) {
} }
int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *len) { int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *len) {
const int32_t maxlen = 100;
SSdbIter *pIter = sdbGetIter(pSdb, ppIter); SSdbIter *pIter = sdbGetIter(pSdb, ppIter);
if (pIter == NULL) return -1; if (pIter == NULL) return -1;
char *pBuf = taosMemoryCalloc(1, maxlen); int32_t maxlen = 100;
char *pBuf = taosMemoryCalloc(1, maxlen);
if (pBuf == NULL) { if (pBuf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbCloseIter(pIter); sdbCloseIter(pSdb, pIter);
return -1; return -1;
} }
...@@ -472,7 +531,7 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le ...@@ -472,7 +531,7 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le
*ppBuf = NULL; *ppBuf = NULL;
*len = 0; *len = 0;
*ppIter = NULL; *ppIter = NULL;
sdbCloseIter(pIter); sdbCloseIter(pSdb, pIter);
taosMemoryFree(pBuf); taosMemoryFree(pBuf);
return -1; return -1;
} else if (readlen == 0) { } else if (readlen == 0) {
...@@ -480,7 +539,7 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le ...@@ -480,7 +539,7 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le
*ppBuf = NULL; *ppBuf = NULL;
*len = 0; *len = 0;
*ppIter = NULL; *ppIter = NULL;
sdbCloseIter(pIter); sdbCloseIter(pSdb, pIter);
taosMemoryFree(pBuf); taosMemoryFree(pBuf);
return 0; return 0;
} else if ((readlen < maxlen && errno != 0) || readlen == maxlen) { } else if ((readlen < maxlen && errno != 0) || readlen == maxlen) {
...@@ -494,7 +553,7 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le ...@@ -494,7 +553,7 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le
*ppBuf = pBuf; *ppBuf = pBuf;
*len = readlen; *len = readlen;
*ppIter = NULL; *ppIter = NULL;
sdbCloseIter(pIter); sdbCloseIter(pSdb, pIter);
return 0; return 0;
} else { } else {
// impossible // impossible
...@@ -502,7 +561,7 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le ...@@ -502,7 +561,7 @@ int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *le
*ppBuf = NULL; *ppBuf = NULL;
*len = 0; *len = 0;
*ppIter = NULL; *ppIter = NULL;
sdbCloseIter(pIter); sdbCloseIter(pSdb, pIter);
taosMemoryFree(pBuf); taosMemoryFree(pBuf);
return -1; return -1;
} }
...@@ -512,7 +571,7 @@ int32_t sdbApplySnapshot(SSdb *pSdb, char *pBuf, int32_t len) { ...@@ -512,7 +571,7 @@ int32_t sdbApplySnapshot(SSdb *pSdb, char *pBuf, int32_t len) {
char datafile[PATH_MAX] = {0}; char datafile[PATH_MAX] = {0};
char tmpfile[PATH_MAX] = {0}; char tmpfile[PATH_MAX] = {0};
snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
snprintf(tmpfile, sizeof(datafile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP); snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) { if (pFile == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册