提交 1c26f1b5 编写于 作者: L Liu Jicong

fix(wal): reference

上级 26d4af69
......@@ -550,7 +550,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
// 6. execution
if (mndTransPrepare(pMnode, pTrans) != 0) {
ASSERT(0);
mError("failed to prepare trans rebalance since %s", terrstr());
goto REB_FAIL;
}
......
......@@ -42,6 +42,7 @@ void walCloseRef(SWal *pWal, int64_t refId) {
int32_t walRefVer(SWalRef *pRef, int64_t ver) {
SWal *pWal = pRef->pWal;
wDebug("vgId:%d, wal ref version %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, ver, pRef->refId);
if (pRef->refVer != ver) {
taosThreadMutexLock(&pWal->mutex);
if (ver < pWal->vers.firstVer || ver > pWal->vers.lastVer) {
......
......@@ -257,6 +257,8 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
int32_t walBeginSnapshot(SWal *pWal, int64_t ver) {
pWal->vers.verInSnapshotting = ver;
wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64,
pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer);
// check file rolling
if (pWal->cfg.retentionPeriod == 0) {
taosThreadMutexLock(&pWal->mutex);
......@@ -273,6 +275,10 @@ int32_t walEndSnapshot(SWal *pWal) {
int32_t code = 0;
taosThreadMutexLock(&pWal->mutex);
int64_t ver = pWal->vers.verInSnapshotting;
wDebug("vgId:%d, wal end snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64, pWal->cfg.vgId,
ver, pWal->vers.firstVer, pWal->vers.lastVer);
if (ver == -1) {
code = -1;
goto END;
......@@ -287,7 +293,8 @@ int32_t walEndSnapshot(SWal *pWal) {
if (pIter == NULL) break;
SWalRef *pRef = *(SWalRef **)pIter;
if (pRef->refVer == -1) continue;
ver = TMIN(ver, pRef->refVer);
ver = TMIN(ver, pRef->refVer - 1);
wDebug("vgId:%d, wal found ref %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, pRef->refVer, pRef->refId);
}
int deleteCnt = 0;
......@@ -298,8 +305,9 @@ int32_t walEndSnapshot(SWal *pWal) {
SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
if (pInfo) {
if (ver >= pInfo->lastVer) {
pInfo++;
pInfo--;
}
wDebug("vgId:%d, begin remove from %" PRId64, pWal->cfg.vgId, pInfo->firstVer);
// iterate files, until the searched result
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
if ((pWal->cfg.retentionSize != -1 && newTotSize > pWal->cfg.retentionSize) ||
......@@ -315,10 +323,12 @@ int32_t walEndSnapshot(SWal *pWal) {
for (int i = 0; i < deleteCnt; i++) {
pInfo = taosArrayGet(pWal->fileInfoSet, i);
walBuildLogName(pWal, pInfo->firstVer, fnameStr);
wDebug("vgId:%d, remove file %s", pWal->cfg.vgId, fnameStr);
if (taosRemoveFile(fnameStr) < 0) {
goto UPDATE_META;
}
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
wDebug("vgId:%d, remove file %s", pWal->cfg.vgId, fnameStr);
if (taosRemoveFile(fnameStr) < 0) {
ASSERT(0);
}
......@@ -409,7 +419,7 @@ END:
}
static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
SWalIdxEntry entry = {.ver = ver, .offset = offset};
SWalIdxEntry entry = {.ver = ver, .offset = offset};
SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal);
ASSERT(pFileInfo != NULL);
ASSERT(pFileInfo->firstVer >= 0);
......@@ -424,7 +434,8 @@ static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
return -1;
}
ASSERT(taosLSeekFile(pWal->pIdxFile, 0, SEEK_END) == idxOffset + sizeof(SWalIdxEntry) && "Offset of idx entries misaligned");
ASSERT(taosLSeekFile(pWal->pIdxFile, 0, SEEK_END) == idxOffset + sizeof(SWalIdxEntry) &&
"Offset of idx entries misaligned");
return 0;
}
......@@ -432,7 +443,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
const void *body, int32_t bodyLen) {
int64_t code = 0;
int64_t offset = walGetCurFileOffset(pWal);
int64_t offset = walGetCurFileOffset(pWal);
SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal);
ASSERT(pFileInfo != NULL);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册