提交 7ab3a1e4 编写于 作者: L Liu Jicong

feat(wal): support restore from snapshot

上级 b23fa19b
...@@ -308,9 +308,11 @@ static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBloc ...@@ -308,9 +308,11 @@ static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBloc
if (pTask->sinkType == TASK_SINK__TABLE) { if (pTask->sinkType == TASK_SINK__TABLE) {
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
taosFreeQitem(pBlock);
} else if (pTask->sinkType == TASK_SINK__SMA) { } else if (pTask->sinkType == TASK_SINK__SMA) {
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
taosFreeQitem(pBlock);
} else { } else {
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE); ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
taosWriteQitem(pTask->outputQueue->queue, pBlock); taosWriteQitem(pTask->outputQueue->queue, pBlock);
......
...@@ -141,6 +141,8 @@ typedef struct SWal { ...@@ -141,6 +141,8 @@ typedef struct SWal {
// ctl // ctl
int64_t refId; int64_t refId;
TdThreadMutex mutex; TdThreadMutex mutex;
// ref
SHashObj *pRefHash; // ref -> SWalRef
// path // path
char path[WAL_PATH_LEN]; char path[WAL_PATH_LEN];
// reusable write head // reusable write head
...@@ -184,7 +186,7 @@ int32_t walRollback(SWal *, int64_t ver); ...@@ -184,7 +186,7 @@ int32_t walRollback(SWal *, int64_t ver);
// notify that previous logs can be pruned safely // notify that previous logs can be pruned safely
int32_t walBeginSnapshot(SWal *, int64_t ver); int32_t walBeginSnapshot(SWal *, int64_t ver);
int32_t walEndSnapshot(SWal *); int32_t walEndSnapshot(SWal *);
void walRestoreFromSnapshot(SWal *, int64_t ver); int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
// int32_t walDataCorrupted(SWal*); // int32_t walDataCorrupted(SWal*);
// read // read
...@@ -199,6 +201,16 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead); ...@@ -199,6 +201,16 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead);
int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead); int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead);
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead); int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead);
typedef struct {
int64_t refId;
int64_t ver;
} SWalRef;
SWalRef *walOpenRef(SWal *);
void walCloseRef(SWalRef *);
int32_t walRefVer(SWalRef *, int64_t ver);
int32_t walUnrefVer(SWal *);
// deprecated // deprecated
#if 0 #if 0
int32_t walRead(SWal *, SWalHead **, int64_t ver); int32_t walRead(SWal *, SWalHead **, int64_t ver);
......
...@@ -1772,6 +1772,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -1772,6 +1772,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
} }
// assign data // assign data
// TODO
ret = taosMemoryCalloc(1, cap + 46); ret = taosMemoryCalloc(1, cap + 46);
ret = POINTER_SHIFT(ret, 46); ret = POINTER_SHIFT(ret, 46);
ret->header.vgId = vgId; ret->header.vgId = vgId;
......
...@@ -132,6 +132,7 @@ static inline void walResetVer(SWalVer* pVer) { ...@@ -132,6 +132,7 @@ static inline void walResetVer(SWalVer* pVer) {
int walLoadMeta(SWal* pWal); int walLoadMeta(SWal* pWal);
int walSaveMeta(SWal* pWal); int walSaveMeta(SWal* pWal);
int walRemoveMeta(SWal* pWal);
int walRollFileInfo(SWal* pWal); int walRollFileInfo(SWal* pWal);
int walCheckAndRepairMeta(SWal* pWal); int walCheckAndRepairMeta(SWal* pWal);
......
...@@ -419,3 +419,12 @@ int walLoadMeta(SWal* pWal) { ...@@ -419,3 +419,12 @@ int walLoadMeta(SWal* pWal) {
taosMemoryFree(buf); taosMemoryFree(buf);
return code; return code;
} }
int walRemoveMeta(SWal* pWal) {
int metaVer = walFindCurMetaVer(pWal);
if (metaVer == -1) return 0;
char fnameStr[WAL_FILE_LEN];
walBuildMetaName(pWal, metaVer, fnameStr);
taosRemoveFile(fnameStr);
return 0;
}
...@@ -75,7 +75,7 @@ void walCleanUp() { ...@@ -75,7 +75,7 @@ void walCleanUp() {
} }
SWal *walOpen(const char *path, SWalCfg *pCfg) { SWal *walOpen(const char *path, SWalCfg *pCfg) {
SWal *pWal = taosMemoryMalloc(sizeof(SWal)); SWal *pWal = taosMemoryCalloc(1, sizeof(SWal));
if (pWal == NULL) { if (pWal == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
...@@ -92,6 +92,13 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -92,6 +92,13 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
return NULL; return NULL;
} }
// init ref
pWal->pRefHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
if (pWal->pRefHash == NULL) {
taosMemoryFree(pWal);
return NULL;
}
// open meta // open meta
walResetVer(&pWal->vers); walResetVer(&pWal->vers);
pWal->pWriteLogTFile = NULL; pWal->pWriteLogTFile = NULL;
...@@ -100,6 +107,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -100,6 +107,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo)); pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo));
if (pWal->fileInfoSet == NULL) { if (pWal->fileInfoSet == NULL) {
wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno)); wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno));
taosHashCleanup(pWal->pRefHash);
taosMemoryFree(pWal); taosMemoryFree(pWal);
return NULL; return NULL;
} }
...@@ -115,12 +123,14 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -115,12 +123,14 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
if (taosThreadMutexInit(&pWal->mutex, NULL) < 0) { if (taosThreadMutexInit(&pWal->mutex, NULL) < 0) {
taosArrayDestroy(pWal->fileInfoSet); taosArrayDestroy(pWal->fileInfoSet);
taosHashCleanup(pWal->pRefHash);
taosMemoryFree(pWal); taosMemoryFree(pWal);
return NULL; return NULL;
} }
pWal->refId = taosAddRef(tsWal.refSetId, pWal); pWal->refId = taosAddRef(tsWal.refSetId, pWal);
if (pWal->refId < 0) { if (pWal->refId < 0) {
taosHashCleanup(pWal->pRefHash);
taosThreadMutexDestroy(&pWal->mutex); taosThreadMutexDestroy(&pWal->mutex);
taosArrayDestroy(pWal->fileInfoSet); taosArrayDestroy(pWal->fileInfoSet);
taosMemoryFree(pWal); taosMemoryFree(pWal);
...@@ -130,6 +140,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -130,6 +140,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
walLoadMeta(pWal); walLoadMeta(pWal);
if (walCheckAndRepairMeta(pWal) < 0) { if (walCheckAndRepairMeta(pWal) < 0) {
taosHashCleanup(pWal->pRefHash);
taosRemoveRef(tsWal.refSetId, pWal->refId); taosRemoveRef(tsWal.refSetId, pWal->refId);
taosThreadMutexDestroy(&pWal->mutex); taosThreadMutexDestroy(&pWal->mutex);
taosArrayDestroy(pWal->fileInfoSet); taosArrayDestroy(pWal->fileInfoSet);
...@@ -175,6 +186,7 @@ void walClose(SWal *pWal) { ...@@ -175,6 +186,7 @@ void walClose(SWal *pWal) {
walSaveMeta(pWal); walSaveMeta(pWal);
taosArrayDestroy(pWal->fileInfoSet); taosArrayDestroy(pWal->fileInfoSet);
pWal->fileInfoSet = NULL; pWal->fileInfoSet = NULL;
taosHashCleanup(pWal->pRefHash);
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
taosRemoveRef(tsWal.refSetId, pWal->refId); taosRemoveRef(tsWal.refSetId, pWal->refId);
......
...@@ -18,12 +18,47 @@ ...@@ -18,12 +18,47 @@
#include "tchecksum.h" #include "tchecksum.h"
#include "walInt.h" #include "walInt.h"
void walRestoreFromSnapshot(SWal *pWal, int64_t ver) { int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
/*pWal->vers.firstVer = -1;*/ taosThreadMutexLock(&pWal->mutex);
void *pIter = NULL;
while (1) {
taosHashIterate(pWal->pRefHash, pIter);
if (pIter == NULL) break;
SWalRef *pRef = (SWalRef *)pIter;
if (pRef->ver != -1) {
taosHashCancelIterate(pWal->pRefHash, pIter);
return -1;
}
}
taosCloseFile(&pWal->pWriteLogTFile);
taosCloseFile(&pWal->pWriteIdxTFile);
if (pWal->vers.firstVer != -1) {
int32_t fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
for (int32_t i = 0; i < fileSetSize; i++) {
SWalFileInfo *pFileInfo = taosArrayGet(pWal->fileInfoSet, i);
char fnameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
taosRemoveFile(fnameStr);
}
}
walRemoveMeta(pWal);
pWal->writeCur = -1;
pWal->totSize = 0;
pWal->lastRollSeq = -1;
taosArrayClear(pWal->fileInfoSet);
pWal->vers.firstVer = -1;
pWal->vers.lastVer = ver; pWal->vers.lastVer = ver;
pWal->vers.commitVer = ver - 1; pWal->vers.commitVer = ver - 1;
pWal->vers.snapshotVer = ver - 1; pWal->vers.snapshotVer = ver - 1;
pWal->vers.verInSnapshotting = -1; pWal->vers.verInSnapshotting = -1;
taosThreadMutexUnlock(&pWal->mutex);
return 0;
} }
int32_t walCommit(SWal *pWal, int64_t ver) { int32_t walCommit(SWal *pWal, int64_t ver) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册