提交 36bf800e 编写于 作者: B Benguang Zhao

fix: improve fault tolerance on disk space full with error handling enhanced

上级 5a5cdb31
...@@ -201,7 +201,7 @@ static int32_t mndInitWal(SMnode *pMnode) { ...@@ -201,7 +201,7 @@ static int32_t mndInitWal(SMnode *pMnode) {
pMnode->pWal = walOpen(path, &cfg); pMnode->pWal = walOpen(path, &cfg);
if (pMnode->pWal == NULL) { if (pMnode->pWal == NULL) {
mError("failed to open wal since %s", terrstr()); mError("failed to open wal since %s. wal:%s", terrstr(), path);
return -1; return -1;
} }
......
...@@ -89,6 +89,7 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { ...@@ -89,6 +89,7 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
data = NULL; data = NULL;
if (vnodeEncodeInfo(pInfo, &data) < 0) { if (vnodeEncodeInfo(pInfo, &data) < 0) {
vError("failed to encode json info.");
return -1; return -1;
} }
...@@ -101,7 +102,7 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { ...@@ -101,7 +102,7 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
} }
if (taosWriteFile(pFile, data, strlen(data)) < 0) { if (taosWriteFile(pFile, data, strlen(data)) < 0) {
vError("failed to write info file:%s data:%s", fname, terrstr()); vError("failed to write info file:%s error:%s", fname, terrstr());
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
...@@ -233,15 +234,15 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -233,15 +234,15 @@ int vnodeCommit(SVnode *pVnode) {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path); snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
} }
if (vnodeSaveInfo(dir, &info) < 0) { if (vnodeSaveInfo(dir, &info) < 0) {
ASSERT(0); vError("vgId:%d, failed to save vnode info since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
walBeginSnapshot(pVnode->pWal, pVnode->state.applied); walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
// preCommit // preCommit
// smaSyncPreCommit(pVnode->pSma); // smaSyncPreCommit(pVnode->pSma);
if (smaAsyncPreCommit(pVnode->pSma) < 0) { if(smaAsyncPreCommit(pVnode->pSma) < 0){
ASSERT(0); vError("vgId:%d, failed to async pre-commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
...@@ -250,44 +251,44 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -250,44 +251,44 @@ int vnodeCommit(SVnode *pVnode) {
// commit each sub-system // commit each sub-system
if (metaCommit(pVnode->pMeta) < 0) { if (metaCommit(pVnode->pMeta) < 0) {
ASSERT(0); vError("vgId:%d, failed to commit meta since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
if (VND_IS_RSMA(pVnode)) { if (VND_IS_RSMA(pVnode)) {
if (smaAsyncCommit(pVnode->pSma) < 0) { if (smaAsyncCommit(pVnode->pSma) < 0) {
ASSERT(0); vError("vgId:%d, failed to async commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
if (tsdbCommit(VND_RSMA0(pVnode)) < 0) { if (tsdbCommit(VND_RSMA0(pVnode)) < 0) {
ASSERT(0); vError("vgId:%d, failed to commit tsdb rsma0 since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
if (tsdbCommit(VND_RSMA1(pVnode)) < 0) { if (tsdbCommit(VND_RSMA1(pVnode)) < 0) {
ASSERT(0); vError("vgId:%d, failed to commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
if (tsdbCommit(VND_RSMA2(pVnode)) < 0) { if (tsdbCommit(VND_RSMA2(pVnode)) < 0) {
ASSERT(0); vError("vgId:%d, failed to commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
} else { } else {
if (tsdbCommit(pVnode->pTsdb) < 0) { if (tsdbCommit(pVnode->pTsdb) < 0) {
ASSERT(0); vError("vgId:%d, failed to commit tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
} }
if (tqCommit(pVnode->pTq) < 0) { if (tqCommit(pVnode->pTq) < 0) {
ASSERT(0); vError("vgId:%d, failed to commit tq since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
// walCommit (TODO) // walCommit (TODO)
// commit info // commit info
if (vnodeCommitInfo(dir, &info) < 0) { if (vnodeCommitInfo(dir, &info) < 0) {
ASSERT(0); vError("vgId:%d, failed to commit vnode info since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
...@@ -296,7 +297,7 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -296,7 +297,7 @@ int vnodeCommit(SVnode *pVnode) {
// postCommit // postCommit
// smaSyncPostCommit(pVnode->pSma); // smaSyncPostCommit(pVnode->pSma);
if (smaAsyncPostCommit(pVnode->pSma) < 0) { if (smaAsyncPostCommit(pVnode->pSma) < 0) {
ASSERT(0); vError("vgId:%d, failed to async post-commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
......
...@@ -140,7 +140,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -140,7 +140,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg)); pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg));
if (pVnode->pWal == NULL) { if (pVnode->pWal == NULL) {
vError("vgId:%d, failed to open vnode wal since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode wal since %s. wal:%s", TD_VID(pVnode), tstrerror(terrno), tdir);
goto _err; goto _err;
} }
......
...@@ -265,10 +265,16 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -265,10 +265,16 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
_do_commit: _do_commit:
vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version); vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
// commit current change // commit current change
vnodeCommit(pVnode); if (vnodeCommit(pVnode) < 0) {
vError("vgId:%d, failed to commit vnode since %s.", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// start a new one // start a new one
vnodeBegin(pVnode); if (vnodeBegin(pVnode) < 0) {
vError("vgId:%d, failed to begin vnode since %s.", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
} }
return 0; return 0;
......
...@@ -2726,7 +2726,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI ...@@ -2726,7 +2726,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
if (code != 0) { if (code != 0) {
// del resp mgr, call FpCommitCb // del resp mgr, call FpCommitCb
ASSERT(0); sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno));
return -1; return -1;
} }
......
...@@ -234,8 +234,6 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ...@@ -234,8 +234,6 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
snprintf(logBuf, sizeof(logBuf), "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", snprintf(logBuf, sizeof(logBuf), "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
pEntry->index, err, err, errStr, sysErr, sysErrStr); pEntry->index, err, err, errStr, sysErr, sysErrStr);
syncNodeErrorLog(pData->pSyncNode, logBuf); syncNodeErrorLog(pData->pSyncNode, logBuf);
ASSERT(0);
return -1; return -1;
} }
pEntry->index = index; pEntry->index = index;
......
...@@ -106,7 +106,7 @@ int32_t tdbBegin(TDB *pDb, TXN *pTxn) { ...@@ -106,7 +106,7 @@ int32_t tdbBegin(TDB *pDb, TXN *pTxn) {
for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) { for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) {
ret = tdbPagerBegin(pPager, pTxn); ret = tdbPagerBegin(pPager, pTxn);
if (ret < 0) { if (ret < 0) {
ASSERT(0); tdbError("failed to begin pager since %s. dbName:%s, txnId:%d", tstrerror(terrno), pDb->dbName, pTxn->txnId);
return -1; return -1;
} }
} }
...@@ -121,7 +121,7 @@ int32_t tdbCommit(TDB *pDb, TXN *pTxn) { ...@@ -121,7 +121,7 @@ int32_t tdbCommit(TDB *pDb, TXN *pTxn) {
for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) { for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) {
ret = tdbPagerCommit(pPager, pTxn); ret = tdbPagerCommit(pPager, pTxn);
if (ret < 0) { if (ret < 0) {
ASSERT(0); tdbError("failed to commit pager. dbName:%s, txnId:%d", pDb->dbName, pTxn->txnId);
return -1; return -1;
} }
} }
...@@ -136,7 +136,7 @@ int32_t tdbAbort(TDB *pDb, TXN *pTxn) { ...@@ -136,7 +136,7 @@ int32_t tdbAbort(TDB *pDb, TXN *pTxn) {
for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) { for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) {
ret = tdbPagerAbort(pPager, pTxn); ret = tdbPagerAbort(pPager, pTxn);
if (ret < 0) { if (ret < 0) {
ASSERT(0); tdbError("failed to abort pager. dbName:%s, txnId:%d", pDb->dbName, pTxn->txnId);
return -1; return -1;
} }
} }
......
...@@ -210,7 +210,7 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) { ...@@ -210,7 +210,7 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) {
if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize) { if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize) {
ret = tdbPagerWritePageToJournal(pPager, pPage); ret = tdbPagerWritePageToJournal(pPager, pPage);
if (ret < 0) { if (ret < 0) {
ASSERT(0); tdbError("failed to write page to journal since %s", tstrerror(terrno));
return -1; return -1;
} }
} }
...@@ -226,6 +226,8 @@ int tdbPagerBegin(SPager *pPager, TXN *pTxn) { ...@@ -226,6 +226,8 @@ int tdbPagerBegin(SPager *pPager, TXN *pTxn) {
// Open the journal // Open the journal
pPager->jfd = tdbOsOpen(pPager->jFileName, TDB_O_CREAT | TDB_O_RDWR, 0755); pPager->jfd = tdbOsOpen(pPager->jFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
if (pPager->jfd < 0) { if (pPager->jfd < 0) {
tdbError("failed to open file due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
...@@ -243,9 +245,9 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { ...@@ -243,9 +245,9 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
// sync the journal file // sync the journal file
ret = tdbOsFSync(pPager->jfd); ret = tdbOsFSync(pPager->jfd);
if (ret < 0) { if (ret < 0) {
// TODO tdbError("failed to fsync jfd due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
ASSERT(0); terrno = TAOS_SYSTEM_ERROR(errno);
return 0; return -1;
} }
// loop to write the dirty pages to file // loop to write the dirty pages to file
...@@ -255,7 +257,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { ...@@ -255,7 +257,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
pPage = (SPage *)pNode; pPage = (SPage *)pNode;
ret = tdbPagerWritePageToDB(pPager, pPage); ret = tdbPagerWritePageToDB(pPager, pPage);
if (ret < 0) { if (ret < 0) {
ASSERT(0); tdbError("failed to write page to db since %s", tstrerror(terrno));
return -1; return -1;
} }
} }
...@@ -277,11 +279,25 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { ...@@ -277,11 +279,25 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
tRBTreeCreate(&pPager->rbt, pageCmpFn); tRBTreeCreate(&pPager->rbt, pageCmpFn);
// sync the db file // sync the db file
tdbOsFSync(pPager->fd); if (tdbOsFSync(pPager->fd) < 0) {
tdbError("failed to fsync fd due to %s. file:%s", strerror(errno), pPager->dbFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
// remove the journal file // remove the journal file
tdbOsClose(pPager->jfd); if (tdbOsClose(pPager->jfd) < 0) {
tdbOsRemove(pPager->jFileName); tdbError("failed to close jfd due to %s. file:%s", strerror(errno), pPager->jFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) {
tdbError("failed to remove file due to %s. file:%s", strerror(errno), pPager->jFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pPager->inTran = 0; pPager->inTran = 0;
return 0; return 0;
...@@ -297,14 +313,14 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) { ...@@ -297,14 +313,14 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
// 0, sync the journal file // 0, sync the journal file
ret = tdbOsFSync(pPager->jfd); ret = tdbOsFSync(pPager->jfd);
if (ret < 0) { if (ret < 0) {
// TODO tdbError("failed to fsync jfd due to %s. file:%s", strerror(errno), pPager->jFileName);
ASSERT(0); terrno = TAOS_SYSTEM_ERROR(errno);
return 0; return -1;
} }
tdb_fd_t jfd = tdbOsOpen(pPager->jFileName, TDB_O_RDWR, 0755); tdb_fd_t jfd = tdbOsOpen(pPager->jFileName, TDB_O_RDWR, 0755);
if (jfd == NULL) { if (jfd == NULL) {
return 0; return -1;
} }
ret = tdbGetFileSize(jfd, pPager->pageSize, &journalSize); ret = tdbGetFileSize(jfd, pPager->pageSize, &journalSize);
...@@ -516,11 +532,15 @@ static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage) { ...@@ -516,11 +532,15 @@ static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage) {
ret = tdbOsWrite(pPager->jfd, &pgno, sizeof(pgno)); ret = tdbOsWrite(pPager->jfd, &pgno, sizeof(pgno));
if (ret < 0) { if (ret < 0) {
tdbError("failed to write pgno due to %s. file:%s, pgno:%u", strerror(errno), pPager->jFileName, pgno);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
ret = tdbOsWrite(pPager->jfd, pPage->pData, pPage->pageSize); ret = tdbOsWrite(pPager->jfd, pPage->pData, pPage->pageSize);
if (ret < 0) { if (ret < 0) {
tdbError("failed to write page data due to %s. file:%s, pageSize:%ld", strerror(errno), pPager->jFileName, pPage->pageSize);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
...@@ -540,13 +560,15 @@ static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) { ...@@ -540,13 +560,15 @@ static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) {
offset = (i64)pPage->pageSize * (TDB_PAGE_PGNO(pPage) - 1); offset = (i64)pPage->pageSize * (TDB_PAGE_PGNO(pPage) - 1);
if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) { if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
ASSERT(0); tdbError("failed to lseek due to %s. file:%s, offset:%ld", strerror(errno), pPager->dbFileName, offset);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
ret = tdbOsWrite(pPager->fd, pPage->pData, pPage->pageSize); ret = tdbOsWrite(pPager->fd, pPage->pData, pPage->pageSize);
if (ret < 0) { if (ret < 0) {
ASSERT(0); tdbError("failed to write page data due to %s. file:%s, pageSize:%ld", strerror(errno), pPager->dbFileName, pPage->pageSize);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
...@@ -590,23 +612,38 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) { ...@@ -590,23 +612,38 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
i64 offset = pPager->pageSize * (pgno - 1); i64 offset = pPager->pageSize * (pgno - 1);
if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) { if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
ASSERT(0); tdbError("failed to lseek fd due to %s. file:%s, offset:%ld", strerror(errno), pPager->dbFileName, offset);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
ret = tdbOsWrite(pPager->fd, pageBuf, pPager->pageSize); ret = tdbOsWrite(pPager->fd, pageBuf, pPager->pageSize);
if (ret < 0) { if (ret < 0) {
ASSERT(0); tdbError("failed to write buf due to %s. file: %s, bufsize:%d", strerror(errno), pPager->dbFileName, pPager->pageSize);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
} }
tdbOsFSync(pPager->fd); if (tdbOsFSync(pPager->fd) < 0) {
tdbError("failed to fsync fd due to %s. dbfile:%s", strerror(errno), pPager->dbFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
tdbOsFree(pageBuf); tdbOsFree(pageBuf);
tdbOsClose(jfd); if (tdbOsClose(jfd) < 0) {
tdbOsRemove(pPager->jFileName); tdbError("failed to close jfd due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) {
tdbError("failed to remove file due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0; return 0;
} }
...@@ -492,14 +492,20 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) { ...@@ -492,14 +492,20 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) {
ASSERT(taosArrayGetSize(pWal->fileInfoSet) == 0); ASSERT(taosArrayGetSize(pWal->fileInfoSet) == 0);
cJSON *pRoot, *pMeta, *pFiles, *pInfoJson, *pField; cJSON *pRoot, *pMeta, *pFiles, *pInfoJson, *pField;
pRoot = cJSON_Parse(bytes); pRoot = cJSON_Parse(bytes);
if (!pRoot) goto _err;
pMeta = cJSON_GetObjectItem(pRoot, "meta"); pMeta = cJSON_GetObjectItem(pRoot, "meta");
if (!pMeta) goto _err;
pField = cJSON_GetObjectItem(pMeta, "firstVer"); pField = cJSON_GetObjectItem(pMeta, "firstVer");
if (!pField) goto _err;
pWal->vers.firstVer = atoll(cJSON_GetStringValue(pField)); pWal->vers.firstVer = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pMeta, "snapshotVer"); pField = cJSON_GetObjectItem(pMeta, "snapshotVer");
if (!pField) goto _err;
pWal->vers.snapshotVer = atoll(cJSON_GetStringValue(pField)); pWal->vers.snapshotVer = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pMeta, "commitVer"); pField = cJSON_GetObjectItem(pMeta, "commitVer");
if (!pField) goto _err;
pWal->vers.commitVer = atoll(cJSON_GetStringValue(pField)); pWal->vers.commitVer = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pMeta, "lastVer"); pField = cJSON_GetObjectItem(pMeta, "lastVer");
if (!pField) goto _err;
pWal->vers.lastVer = atoll(cJSON_GetStringValue(pField)); pWal->vers.lastVer = atoll(cJSON_GetStringValue(pField));
pFiles = cJSON_GetObjectItem(pRoot, "files"); pFiles = cJSON_GetObjectItem(pRoot, "files");
...@@ -510,16 +516,22 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) { ...@@ -510,16 +516,22 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) {
SWalFileInfo* pData = pArray->pData; SWalFileInfo* pData = pArray->pData;
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
cJSON* pInfoJson = cJSON_GetArrayItem(pFiles, i); cJSON* pInfoJson = cJSON_GetArrayItem(pFiles, i);
if (!pInfoJson) goto _err;
SWalFileInfo* pInfo = &pData[i]; SWalFileInfo* pInfo = &pData[i];
pField = cJSON_GetObjectItem(pInfoJson, "firstVer"); pField = cJSON_GetObjectItem(pInfoJson, "firstVer");
if (!pField) goto _err;
pInfo->firstVer = atoll(cJSON_GetStringValue(pField)); pInfo->firstVer = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pInfoJson, "lastVer"); pField = cJSON_GetObjectItem(pInfoJson, "lastVer");
if (!pField) goto _err;
pInfo->lastVer = atoll(cJSON_GetStringValue(pField)); pInfo->lastVer = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pInfoJson, "createTs"); pField = cJSON_GetObjectItem(pInfoJson, "createTs");
if (!pField) goto _err;
pInfo->createTs = atoll(cJSON_GetStringValue(pField)); pInfo->createTs = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pInfoJson, "closeTs"); pField = cJSON_GetObjectItem(pInfoJson, "closeTs");
if (!pField) goto _err;
pInfo->closeTs = atoll(cJSON_GetStringValue(pField)); pInfo->closeTs = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pInfoJson, "fileSize"); pField = cJSON_GetObjectItem(pInfoJson, "fileSize");
if (!pField) goto _err;
pInfo->fileSize = atoll(cJSON_GetStringValue(pField)); pInfo->fileSize = atoll(cJSON_GetStringValue(pField));
} }
taosArraySetSize(pArray, sz); taosArraySetSize(pArray, sz);
...@@ -527,6 +539,10 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) { ...@@ -527,6 +539,10 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) {
pWal->writeCur = sz - 1; pWal->writeCur = sz - 1;
cJSON_Delete(pRoot); cJSON_Delete(pRoot);
return 0; return 0;
_err:
cJSON_Delete(pRoot);
return -1;
} }
static int walFindCurMetaVer(SWal* pWal) { static int walFindCurMetaVer(SWal* pWal) {
...@@ -626,6 +642,10 @@ int walLoadMeta(SWal* pWal) { ...@@ -626,6 +642,10 @@ int walLoadMeta(SWal* pWal) {
} }
// load into fileInfoSet // load into fileInfoSet
int code = walMetaDeserialize(pWal, buf); int code = walMetaDeserialize(pWal, buf);
if (code < 0) {
wError("failed to deserialize wal meta. file:%s", fnameStr);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
taosCloseFile(&pFile); taosCloseFile(&pFile);
taosMemoryFree(buf); taosMemoryFree(buf);
return code; return code;
......
...@@ -81,6 +81,12 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -81,6 +81,12 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
return NULL; return NULL;
} }
if (taosThreadMutexInit(&pWal->mutex, NULL) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(pWal);
return NULL;
}
// set config // set config
memcpy(&pWal->cfg, pCfg, sizeof(SWalCfg)); memcpy(&pWal->cfg, pCfg, sizeof(SWalCfg));
...@@ -98,15 +104,14 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -98,15 +104,14 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
tstrncpy(pWal->path, path, sizeof(pWal->path)); tstrncpy(pWal->path, path, sizeof(pWal->path));
if (taosMkDir(pWal->path) != 0) { if (taosMkDir(pWal->path) != 0) {
wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, strerror(errno)); wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, strerror(errno));
taosMemoryFree(pWal); goto _err;
return NULL;
} }
// init ref // init ref
pWal->pRefHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); pWal->pRefHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (pWal->pRefHash == NULL) { if (pWal->pRefHash == NULL) {
taosMemoryFree(pWal); wError("failed to init hash since %s", tstrerror(terrno));
return NULL; goto _err;
} }
// open meta // open meta
...@@ -117,9 +122,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -117,9 +122,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo)); pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo));
if (pWal->fileInfoSet == NULL) { if (pWal->fileInfoSet == NULL) {
wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno)); wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno));
taosHashCleanup(pWal->pRefHash); goto _err;
taosMemoryFree(pWal);
return NULL;
} }
// init status // init status
...@@ -131,46 +134,36 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -131,46 +134,36 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal->writeHead.head.protoVer = WAL_PROTO_VER; pWal->writeHead.head.protoVer = WAL_PROTO_VER;
pWal->writeHead.magic = WAL_MAGIC; pWal->writeHead.magic = WAL_MAGIC;
if (taosThreadMutexInit(&pWal->mutex, NULL) < 0) { (void)walLoadMeta(pWal);
taosArrayDestroy(pWal->fileInfoSet);
taosHashCleanup(pWal->pRefHash);
taosMemoryFree(pWal);
return NULL;
}
pWal->refId = taosAddRef(tsWal.refSetId, pWal);
if (pWal->refId < 0) {
taosHashCleanup(pWal->pRefHash);
taosThreadMutexDestroy(&pWal->mutex);
taosArrayDestroy(pWal->fileInfoSet);
taosMemoryFree(pWal);
return NULL;
}
walLoadMeta(pWal);
if (walCheckAndRepairMeta(pWal) < 0) { if (walCheckAndRepairMeta(pWal) < 0) {
wError("vgId:%d cannot open wal since repair meta file failed", pWal->cfg.vgId); wError("vgId:%d cannot open wal since repair meta file failed", pWal->cfg.vgId);
taosHashCleanup(pWal->pRefHash); goto _err;
taosRemoveRef(tsWal.refSetId, pWal->refId);
taosThreadMutexDestroy(&pWal->mutex);
taosArrayDestroy(pWal->fileInfoSet);
return NULL;
} }
if (walCheckAndRepairIdx(pWal) < 0) { if (walCheckAndRepairIdx(pWal) < 0) {
wError("vgId:%d cannot open wal since repair idx file failed", pWal->cfg.vgId); wError("vgId:%d cannot open wal since repair idx file failed", pWal->cfg.vgId);
taosHashCleanup(pWal->pRefHash); goto _err;
taosRemoveRef(tsWal.refSetId, pWal->refId); }
taosThreadMutexDestroy(&pWal->mutex);
taosArrayDestroy(pWal->fileInfoSet); // add ref
return NULL; pWal->refId = taosAddRef(tsWal.refSetId, pWal);
if (pWal->refId < 0) {
wError("failed to add ref for Wal since %s", tstrerror(terrno));
goto _err;
} }
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level,
pWal->cfg.fsyncPeriod); pWal->cfg.fsyncPeriod);
return pWal; return pWal;
_err:
taosArrayDestroy(pWal->fileInfoSet);
taosHashCleanup(pWal->pRefHash);
taosThreadMutexDestroy(&pWal->mutex);
taosMemoryFree(pWal);
pWal = NULL;
return NULL;
} }
int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册