未验证 提交 eb7de2e5 编写于 作者: H Hongze Cheng 提交者: GitHub

Merge pull request #17443 from taosdata/fix/commit_txn

fix: commit txn
...@@ -83,6 +83,12 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen, ...@@ -83,6 +83,12 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
} }
} }
#define TSDB_CHECK_CODE(CODE, LINO, LABEL) \
if (CODE) { \
LINO = __LINE__; \
goto LABEL; \
}
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -32,12 +32,6 @@ extern "C" { ...@@ -32,12 +32,6 @@ extern "C" {
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSD ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) #define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSD ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on // clang-format on
#define TSDB_CHECK_CODE(CODE, LINO, LABEL) \
if (CODE) { \
LINO = __LINE__; \
goto LABEL; \
}
typedef struct TSDBROW TSDBROW; typedef struct TSDBROW TSDBROW;
typedef struct TABLEID TABLEID; typedef struct TABLEID TABLEID;
typedef struct TSDBKEY TSDBKEY; typedef struct TSDBKEY TSDBKEY;
...@@ -247,18 +241,17 @@ void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, ch ...@@ -247,18 +241,17 @@ void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, ch
// SDelFile // SDelFile
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]); void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]);
// tsdbFS.c ============================================================================================== // tsdbFS.c ==============================================================================================
int32_t tsdbFSOpen(STsdb *pTsdb); int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback);
int32_t tsdbFSClose(STsdb *pTsdb); int32_t tsdbFSClose(STsdb *pTsdb);
int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS); int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS);
void tsdbFSDestroy(STsdbFS *pFS); void tsdbFSDestroy(STsdbFS *pFS);
int32_t tDFileSetCmprFn(const void *p1, const void *p2); int32_t tDFileSetCmprFn(const void *p1, const void *p2);
int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFS); int32_t tsdbFSCommit(STsdb *pTsdb);
int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFS); int32_t tsdbFSRollback(STsdb *pTsdb);
int32_t tsdbFSPrepareCommit(STsdb *pTsdb, STsdbFS *pFS);
int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS); int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS);
void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS); void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS);
int32_t tsdbFSRollback(STsdbFS *pFS);
int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet); int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet);
int32_t tsdbFSUpsertDelFile(STsdbFS *pFS, SDelFile *pDelFile); int32_t tsdbFSUpsertDelFile(STsdbFS *pFS, SDelFile *pDelFile);
// tsdbReaderWriter.c ============================================================================================== // tsdbReaderWriter.c ==============================================================================================
......
...@@ -87,11 +87,13 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg); ...@@ -87,11 +87,13 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg);
int32_t vnodeBegin(SVnode* pVnode); int32_t vnodeBegin(SVnode* pVnode);
int32_t vnodeShouldCommit(SVnode* pVnode); int32_t vnodeShouldCommit(SVnode* pVnode);
int32_t vnodeCommit(SVnode* pVnode); int32_t vnodeCommit(SVnode* pVnode);
void vnodeRollback(SVnode* pVnode);
int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo); int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
int32_t vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo); int32_t vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
int32_t vnodeSyncCommit(SVnode* pVnode); int32_t vnodeSyncCommit(SVnode* pVnode);
int32_t vnodeAsyncCommit(SVnode* pVnode); int32_t vnodeAsyncCommit(SVnode* pVnode);
bool vnodeShouldRollback(SVnode* pVnode);
// vnodeSync.c // vnodeSync.c
int32_t vnodeSyncOpen(SVnode* pVnode, char* path); int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
......
...@@ -97,10 +97,11 @@ typedef struct SMCtbCursor SMCtbCursor; ...@@ -97,10 +97,11 @@ typedef struct SMCtbCursor SMCtbCursor;
typedef struct SMStbCursor SMStbCursor; typedef struct SMStbCursor SMStbCursor;
typedef struct STbUidStore STbUidStore; typedef struct STbUidStore STbUidStore;
int metaOpen(SVnode* pVnode, SMeta** ppMeta); int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback);
int metaClose(SMeta* pMeta); int metaClose(SMeta* pMeta);
int metaBegin(SMeta* pMeta, int8_t fromSys); int metaBegin(SMeta* pMeta, int8_t fromSys);
int metaCommit(SMeta* pMeta); int metaCommit(SMeta* pMeta);
int metaFinishCommit(SMeta* pMeta);
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
...@@ -149,10 +150,12 @@ typedef struct { ...@@ -149,10 +150,12 @@ typedef struct {
int32_t metaGetStbStats(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo); int32_t metaGetStbStats(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
// tsdb // tsdb
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg); int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback);
int tsdbClose(STsdb** pTsdb); int tsdbClose(STsdb** pTsdb);
int32_t tsdbBegin(STsdb* pTsdb); int32_t tsdbBegin(STsdb* pTsdb);
int32_t tsdbCommit(STsdb* pTsdb); int32_t tsdbCommit(STsdb* pTsdb);
int32_t tsdbFinishCommit(STsdb* pTsdb);
int32_t tsdbRollbackCommit(STsdb* pTsdb);
int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now); int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
...@@ -200,7 +203,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem ...@@ -200,7 +203,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
// sma // sma
int32_t smaInit(); int32_t smaInit();
void smaCleanUp(); void smaCleanUp();
int32_t smaOpen(SVnode* pVnode); int32_t smaOpen(SVnode* pVnode, int8_t rollback);
int32_t smaClose(SSma* pSma); int32_t smaClose(SSma* pSma);
int32_t smaBegin(SSma* pSma); int32_t smaBegin(SSma* pSma);
int32_t smaSyncPreCommit(SSma* pSma); int32_t smaSyncPreCommit(SSma* pSma);
......
...@@ -34,6 +34,7 @@ int metaBegin(SMeta *pMeta, int8_t fromSys) { ...@@ -34,6 +34,7 @@ int metaBegin(SMeta *pMeta, int8_t fromSys) {
// commit the meta txn // commit the meta txn
int metaCommit(SMeta *pMeta) { return tdbCommit(pMeta->pEnv, &pMeta->txn); } int metaCommit(SMeta *pMeta) { return tdbCommit(pMeta->pEnv, &pMeta->txn); }
int metaFinishCommit(SMeta *pMeta) { return tdbPostCommit(pMeta->pEnv, &pMeta->txn); }
// abort the meta txn // abort the meta txn
int metaAbort(SMeta *pMeta) { return tdbAbort(pMeta->pEnv, &pMeta->txn); } int metaAbort(SMeta *pMeta) { return tdbAbort(pMeta->pEnv, &pMeta->txn); }
...@@ -27,7 +27,7 @@ static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int k ...@@ -27,7 +27,7 @@ static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int k
static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); } static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); }
static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); } static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); }
int metaOpen(SVnode *pVnode, SMeta **ppMeta) { int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
SMeta *pMeta = NULL; SMeta *pMeta = NULL;
int ret; int ret;
int slen; int slen;
...@@ -60,49 +60,49 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { ...@@ -60,49 +60,49 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
taosMkDir(pMeta->path); taosMkDir(pMeta->path);
// open env // open env
ret = tdbOpen(pMeta->path, pVnode->config.szPage, pVnode->config.szCache, &pMeta->pEnv); ret = tdbOpen(pMeta->path, pVnode->config.szPage, pVnode->config.szCache, &pMeta->pEnv, rollback);
if (ret < 0) { if (ret < 0) {
metaError("vgId:%d, failed to open meta env since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d, failed to open meta env since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open pTbDb // open pTbDb
ret = tdbTbOpen("table.db", sizeof(STbDbKey), -1, tbDbKeyCmpr, pMeta->pEnv, &pMeta->pTbDb); ret = tdbTbOpen("table.db", sizeof(STbDbKey), -1, tbDbKeyCmpr, pMeta->pEnv, &pMeta->pTbDb, 0);
if (ret < 0) { if (ret < 0) {
metaError("vgId:%d, failed to open meta table db since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d, failed to open meta table db since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open pSkmDb // open pSkmDb
ret = tdbTbOpen("schema.db", sizeof(SSkmDbKey), -1, skmDbKeyCmpr, pMeta->pEnv, &pMeta->pSkmDb); ret = tdbTbOpen("schema.db", sizeof(SSkmDbKey), -1, skmDbKeyCmpr, pMeta->pEnv, &pMeta->pSkmDb, 0);
if (ret < 0) { if (ret < 0) {
metaError("vgId:%d, failed to open meta schema db since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d, failed to open meta schema db since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open pUidIdx // open pUidIdx
ret = tdbTbOpen("uid.idx", sizeof(tb_uid_t), sizeof(SUidIdxVal), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx); ret = tdbTbOpen("uid.idx", sizeof(tb_uid_t), sizeof(SUidIdxVal), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx, 0);
if (ret < 0) { if (ret < 0) {
metaError("vgId:%d, failed to open meta uid idx since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d, failed to open meta uid idx since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open pNameIdx // open pNameIdx
ret = tdbTbOpen("name.idx", -1, sizeof(tb_uid_t), NULL, pMeta->pEnv, &pMeta->pNameIdx); ret = tdbTbOpen("name.idx", -1, sizeof(tb_uid_t), NULL, pMeta->pEnv, &pMeta->pNameIdx, 0);
if (ret < 0) { if (ret < 0) {
metaError("vgId:%d, failed to open meta name index since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d, failed to open meta name index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open pCtbIdx // open pCtbIdx
ret = tdbTbOpen("ctb.idx", sizeof(SCtbIdxKey), -1, ctbIdxKeyCmpr, pMeta->pEnv, &pMeta->pCtbIdx); ret = tdbTbOpen("ctb.idx", sizeof(SCtbIdxKey), -1, ctbIdxKeyCmpr, pMeta->pEnv, &pMeta->pCtbIdx, 0);
if (ret < 0) { if (ret < 0) {
metaError("vgId:%d, failed to open meta child table index since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d, failed to open meta child table index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open pSuidIdx // open pSuidIdx
ret = tdbTbOpen("suid.idx", sizeof(tb_uid_t), 0, uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pSuidIdx); ret = tdbTbOpen("suid.idx", sizeof(tb_uid_t), 0, uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pSuidIdx, 0);
if (ret < 0) { if (ret < 0) {
metaError("vgId:%d, failed to open meta super table index since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d, failed to open meta super table index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
...@@ -119,27 +119,27 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { ...@@ -119,27 +119,27 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
goto _err; goto _err;
} }
ret = tdbTbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx); ret = tdbTbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx, 0);
if (ret < 0) { if (ret < 0) {
metaError("vgId:%d, failed to open meta tag index since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d, failed to open meta tag index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open pTtlIdx // open pTtlIdx
ret = tdbTbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx); ret = tdbTbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx, 0);
if (ret < 0) { if (ret < 0) {
metaError("vgId:%d, failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d, failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open pSmaIdx // open pSmaIdx
ret = tdbTbOpen("sma.idx", sizeof(SSmaIdxKey), 0, smaIdxKeyCmpr, pMeta->pEnv, &pMeta->pSmaIdx); ret = tdbTbOpen("sma.idx", sizeof(SSmaIdxKey), 0, smaIdxKeyCmpr, pMeta->pEnv, &pMeta->pSmaIdx, 0);
if (ret < 0) { if (ret < 0) {
metaError("vgId:%d, failed to open meta sma index since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d, failed to open meta sma index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
ret = tdbTbOpen("stream.task.db", sizeof(int64_t), -1, taskIdxKeyCmpr, pMeta->pEnv, &pMeta->pStreamDb); ret = tdbTbOpen("stream.task.db", sizeof(int64_t), -1, taskIdxKeyCmpr, pMeta->pEnv, &pMeta->pStreamDb, 0);
if (ret < 0) { if (ret < 0) {
metaError("vgId:%d, failed to open meta stream task index since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d, failed to open meta stream task index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
......
...@@ -165,6 +165,8 @@ int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) { ...@@ -165,6 +165,8 @@ int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
} else { } else {
code = metaCommit(pWriter->pMeta); code = metaCommit(pWriter->pMeta);
if (code) goto _err; if (code) goto _err;
code = metaFinishCommit(pWriter->pMeta);
if (code) goto _err;
} }
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
*ppWriter = NULL; *ppWriter = NULL;
......
...@@ -29,19 +29,19 @@ static int32_t rsmaRestore(SSma *pSma); ...@@ -29,19 +29,19 @@ static int32_t rsmaRestore(SSma *pSma);
pKeepCfg->days = smaEvalDays(v, pCfg->retentions, l, pCfg->precision, pCfg->days); \ pKeepCfg->days = smaEvalDays(v, pCfg->retentions, l, pCfg->precision, pCfg->days); \
} while (0) } while (0)
#define SMA_OPEN_RSMA_IMPL(v, l) \ #define SMA_OPEN_RSMA_IMPL(v, l) \
do { \ do { \
SRetention *r = (SRetention *)VND_RETENTIONS(v) + l; \ SRetention *r = (SRetention *)VND_RETENTIONS(v) + l; \
if (!RETENTION_VALID(r)) { \ if (!RETENTION_VALID(r)) { \
if (l == 0) { \ if (l == 0) { \
goto _err; \ goto _err; \
} \ } \
break; \ break; \
} \ } \
smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \ smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \
if (tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg) < 0) { \ if (tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg, rollback) < 0) { \
goto _err; \ goto _err; \
} \ } \
} while (0) } while (0)
/** /**
...@@ -119,7 +119,7 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty ...@@ -119,7 +119,7 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty
return 0; return 0;
} }
int32_t smaOpen(SVnode *pVnode) { int32_t smaOpen(SVnode *pVnode, int8_t rollback) {
STsdbCfg *pCfg = &pVnode->config.tsdbCfg; STsdbCfg *pCfg = &pVnode->config.tsdbCfg;
ASSERT(!pVnode->pSma); ASSERT(!pVnode->pSma);
......
...@@ -70,17 +70,17 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { ...@@ -70,17 +70,17 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
} }
int32_t tqMetaOpen(STQ* pTq) { int32_t tqMetaOpen(STQ* pTq) {
if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB) < 0) { if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0) < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore) < 0) { if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0) < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore) < 0) { if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0) < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
......
...@@ -1041,38 +1041,20 @@ _exit: ...@@ -1041,38 +1041,20 @@ _exit:
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
STsdb *pTsdb = pCommitter->pTsdb;
STsdb *pTsdb = pCommitter->pTsdb; if (eno) {
SMemTable *pMemTable = pTsdb->imem; code = eno;
TSDB_CHECK_CODE(code, lino, _exit);
ASSERT(eno == 0 && } else {
"tsdbCommit failure" code = tsdbFSPrepareCommit(pCommitter->pTsdb, &pCommitter->fs);
"Restart taosd");
code = tsdbFSCommit1(pTsdb, &pCommitter->fs);
TSDB_CHECK_CODE(code, lino, _exit);
// lock
taosThreadRwlockWrlock(&pTsdb->rwLock);
// commit or rollback
code = tsdbFSCommit2(pTsdb, &pCommitter->fs);
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
pTsdb->imem = NULL; _exit:
// unlock
taosThreadRwlockUnlock(&pTsdb->rwLock);
tsdbUnrefMemTable(pMemTable);
tsdbFSDestroy(&pCommitter->fs); tsdbFSDestroy(&pCommitter->fs);
taosArrayDestroy(pCommitter->aTbDataP); taosArrayDestroy(pCommitter->aTbDataP);
if (code || eno) {
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} else { } else {
tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode)); tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode));
...@@ -1646,3 +1628,50 @@ _exit: ...@@ -1646,3 +1628,50 @@ _exit:
} }
return code; return code;
} }
int32_t tsdbFinishCommit(STsdb *pTsdb) {
int32_t code = 0;
int32_t lino = 0;
SMemTable *pMemTable = pTsdb->imem;
// lock
taosThreadRwlockWrlock(&pTsdb->rwLock);
code = tsdbFSCommit(pTsdb);
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
TSDB_CHECK_CODE(code, lino, _exit);
}
pTsdb->imem = NULL;
// unlock
taosThreadRwlockUnlock(&pTsdb->rwLock);
if (pMemTable) {
tsdbUnrefMemTable(pMemTable);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d tsdb finish commit", TD_VID(pTsdb->pVnode));
}
return code;
}
int32_t tsdbRollbackCommit(STsdb *pTsdb) {
int32_t code = 0;
int32_t lino = 0;
code = tsdbFSRollback(pTsdb);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d tsdb rollback commit", TD_VID(pTsdb->pVnode));
}
return code;
}
\ No newline at end of file
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#include "tsdb.h" #include "tsdb.h"
// ================================================================================================= // =================================================================================================
static int32_t tsdbEncodeFS(uint8_t *p, STsdbFS *pFS) { static int32_t tsdbFSToBinary(uint8_t *p, STsdbFS *pFS) {
int32_t n = 0; int32_t n = 0;
int8_t hasDel = pFS->pDelFile ? 1 : 0; int8_t hasDel = pFS->pDelFile ? 1 : 0;
uint32_t nSet = taosArrayGetSize(pFS->aDFileSet); uint32_t nSet = taosArrayGetSize(pFS->aDFileSet);
...@@ -39,215 +39,113 @@ static int32_t tsdbEncodeFS(uint8_t *p, STsdbFS *pFS) { ...@@ -39,215 +39,113 @@ static int32_t tsdbEncodeFS(uint8_t *p, STsdbFS *pFS) {
return n; return n;
} }
static int32_t tsdbGnrtCurrent(STsdb *pTsdb, STsdbFS *pFS, char *fname) { static int32_t tsdbBinaryToFS(uint8_t *pData, int64_t nData, STsdbFS *pFS) {
int32_t code = 0; int32_t code = 0;
int64_t n; int32_t n = 0;
int64_t size;
uint8_t *pData = NULL;
TdFilePtr pFD = NULL;
// to binary // version
size = tsdbEncodeFS(NULL, pFS) + sizeof(TSCKSUM); n += tGetI8(pData + n, NULL);
pData = taosMemoryMalloc(size);
// SDelFile
int8_t hasDel = 0;
n += tGetI8(pData + n, &hasDel);
if (hasDel) {
pFS->pDelFile = (SDelFile *)taosMemoryCalloc(1, sizeof(SDelFile));
if (pFS->pDelFile == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
n += tGetDelFile(pData + n, pFS->pDelFile);
pFS->pDelFile->nRef = 1;
} else {
pFS->pDelFile = NULL;
}
// aDFileSet
taosArrayClear(pFS->aDFileSet);
uint32_t nSet = 0;
n += tGetU32v(pData + n, &nSet);
for (uint32_t iSet = 0; iSet < nSet; iSet++) {
SDFileSet fSet = {0};
int32_t nt = tGetDFileSet(pData + n, &fSet);
if (nt < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
n += nt;
if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
}
ASSERT(n + sizeof(TSCKSUM) == nData);
_exit:
return code;
}
static int32_t tsdbSaveFSToFile(STsdbFS *pFS, const char *fname) {
int32_t code = 0;
int32_t lino = 0;
// encode to binary
int32_t size = tsdbFSToBinary(NULL, pFS) + sizeof(TSCKSUM);
uint8_t *pData = taosMemoryMalloc(size);
if (pData == NULL) { if (pData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
n = tsdbEncodeFS(pData, pFS); tsdbFSToBinary(pData, pFS);
ASSERT(n + sizeof(TSCKSUM) == size);
taosCalcChecksumAppend(0, pData, size); taosCalcChecksumAppend(0, pData, size);
// create and write // save to file
pFD = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); TdFilePtr pFD = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pFD == NULL) { if (pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
n = taosWriteFile(pFD, pData, size); int64_t n = taosWriteFile(pFD, pData, size);
if (n < 0) { if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
} }
if (taosFsyncFile(pFD) < 0) { if (taosFsyncFile(pFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
} }
taosCloseFile(&pFD); taosCloseFile(&pFD);
_exit:
if (pData) taosMemoryFree(pData); if (pData) taosMemoryFree(pData);
if (code) {
tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
}
return code; return code;
}
_err: int32_t tsdbFSCreate(STsdbFS *pFS) {
tsdbError("vgId:%d, tsdb gnrt current failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); int32_t code = 0;
if (pData) taosMemoryFree(pData);
pFS->pDelFile = NULL;
pFS->aDFileSet = taosArrayInit(0, sizeof(SDFileSet));
if (pFS->aDFileSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
_exit:
return code; return code;
} }
// static int32_t tsdbApplyDFileSetChange(STsdbFS *pFS, SDFileSet *pFrom, SDFileSet *pTo) {
// int32_t code = 0;
// char fname[TSDB_FILENAME_LEN];
// if (pFrom && pTo) {
// bool isSameDisk = (pFrom->diskId.level == pTo->diskId.level) && (pFrom->diskId.id == pTo->diskId.id);
// // head
// if (isSameDisk && pFrom->pHeadF->commitID == pTo->pHeadF->commitID) {
// ASSERT(pFrom->pHeadF->size == pTo->pHeadF->size);
// ASSERT(pFrom->pHeadF->offset == pTo->pHeadF->offset);
// } else {
// tsdbHeadFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pHeadF, fname);
// taosRemoveFile(fname);
// }
// // data
// if (isSameDisk && pFrom->pDataF->commitID == pTo->pDataF->commitID) {
// if (pFrom->pDataF->size > pTo->pDataF->size) {
// code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_DATA_FILE);
// if (code) goto _err;
// }
// } else {
// tsdbDataFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pDataF, fname);
// taosRemoveFile(fname);
// }
// // stt
// if (isSameDisk && pFrom->pLastF->commitID == pTo->pLastF->commitID) {
// if (pFrom->pLastF->size > pTo->pLastF->size) {
// code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_LAST_FILE);
// if (code) goto _err;
// }
// } else {
// tsdbLastFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pLastF, fname);
// taosRemoveFile(fname);
// }
// // sma
// if (isSameDisk && pFrom->pSmaF->commitID == pTo->pSmaF->commitID) {
// if (pFrom->pSmaF->size > pTo->pSmaF->size) {
// code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_SMA_FILE);
// if (code) goto _err;
// }
// } else {
// tsdbSmaFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pSmaF, fname);
// taosRemoveFile(fname);
// }
// } else if (pFrom) {
// // head
// tsdbHeadFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pHeadF, fname);
// taosRemoveFile(fname);
// // data
// tsdbDataFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pDataF, fname);
// taosRemoveFile(fname);
// // stt
// tsdbLastFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pLastF, fname);
// taosRemoveFile(fname);
// // fsm
// tsdbSmaFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pSmaF, fname);
// taosRemoveFile(fname);
// }
// return code;
// _err:
// tsdbError("vgId:%d, tsdb apply disk file set change failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code));
// return code;
// }
// static int32_t tsdbApplyDelFileChange(STsdbFS *pFS, SDelFile *pFrom, SDelFile *pTo) {
// int32_t code = 0;
// char fname[TSDB_FILENAME_LEN];
// if (pFrom && pTo) {
// if (!tsdbDelFileIsSame(pFrom, pTo)) {
// tsdbDelFileName(pFS->pTsdb, pFrom, fname);
// if (taosRemoveFile(fname) < 0) {
// code = TAOS_SYSTEM_ERROR(errno);
// goto _err;
// }
// }
// } else if (pFrom) {
// tsdbDelFileName(pFS->pTsdb, pFrom, fname);
// if (taosRemoveFile(fname) < 0) {
// code = TAOS_SYSTEM_ERROR(errno);
// goto _err;
// }
// } else {
// // do nothing
// }
// return code;
// _err:
// tsdbError("vgId:%d, tsdb apply del file change failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code));
// return code;
// }
// static int32_t tsdbFSApplyDiskChange(STsdbFS *pFS, STsdbFSState *pFrom, STsdbFSState *pTo) {
// int32_t code = 0;
// int32_t iFrom = 0;
// int32_t nFrom = taosArrayGetSize(pFrom->aDFileSet);
// int32_t iTo = 0;
// int32_t nTo = taosArrayGetSize(pTo->aDFileSet);
// SDFileSet *pDFileSetFrom;
// SDFileSet *pDFileSetTo;
// // SDelFile
// code = tsdbApplyDelFileChange(pFS, pFrom->pDelFile, pTo->pDelFile);
// if (code) goto _err;
// // SDFileSet
// while (iFrom < nFrom && iTo < nTo) {
// pDFileSetFrom = (SDFileSet *)taosArrayGet(pFrom->aDFileSet, iFrom);
// pDFileSetTo = (SDFileSet *)taosArrayGet(pTo->aDFileSet, iTo);
// if (pDFileSetFrom->fid == pDFileSetTo->fid) {
// code = tsdbApplyDFileSetChange(pFS, pDFileSetFrom, pDFileSetTo);
// if (code) goto _err;
// iFrom++;
// iTo++;
// } else if (pDFileSetFrom->fid < pDFileSetTo->fid) {
// code = tsdbApplyDFileSetChange(pFS, pDFileSetFrom, NULL);
// if (code) goto _err;
// iFrom++;
// } else {
// iTo++;
// }
// }
// while (iFrom < nFrom) {
// pDFileSetFrom = (SDFileSet *)taosArrayGet(pFrom->aDFileSet, iFrom);
// code = tsdbApplyDFileSetChange(pFS, pDFileSetFrom, NULL);
// if (code) goto _err;
// iFrom++;
// }
// #if 0
// // do noting
// while (iTo < nTo) {
// pDFileSetTo = (SDFileSet *)taosArrayGetP(pTo->aDFileSet, iTo);
// code = tsdbApplyDFileSetChange(pFS, NULL, pDFileSetTo);
// if (code) goto _err;
// iTo++;
// }
// #endif
// return code;
// _err:
// tsdbError("vgId:%d, tsdb fs apply disk change failed sicne %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code));
// return code;
// }
void tsdbFSDestroy(STsdbFS *pFS) { void tsdbFSDestroy(STsdbFS *pFS) {
if (pFS->pDelFile) { if (pFS->pDelFile) {
taosMemoryFree(pFS->pDelFile); taosMemoryFree(pFS->pDelFile);
...@@ -268,76 +166,81 @@ void tsdbFSDestroy(STsdbFS *pFS) { ...@@ -268,76 +166,81 @@ void tsdbFSDestroy(STsdbFS *pFS) {
static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) { static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
int32_t code = 0; int32_t code = 0;
int64_t size; int32_t lino = 0;
char fname[TSDB_FILENAME_LEN]; int64_t size = 0;
char fname[TSDB_FILENAME_LEN] = {0};
// SDelFile // SDelFile
if (pTsdb->fs.pDelFile) { if (pTsdb->fs.pDelFile) {
tsdbDelFileName(pTsdb, pTsdb->fs.pDelFile, fname); tsdbDelFileName(pTsdb, pTsdb->fs.pDelFile, fname);
if (taosStatFile(fname, &size, NULL)) { if (taosStatFile(fname, &size, NULL)) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
if (size != tsdbLogicToFileSize(pTsdb->fs.pDelFile->size, pTsdb->pVnode->config.tsdbPageSize)) { if (size != tsdbLogicToFileSize(pTsdb->fs.pDelFile->size, pTsdb->pVnode->config.tsdbPageSize)) {
code = TSDB_CODE_FILE_CORRUPTED; code = TSDB_CODE_FILE_CORRUPTED;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
// SArray<SDFileSet> // SArray<SDFileSet>
int32_t fid = 0;
for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) { for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet); SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
fid = pSet->fid;
// head ========= // head =========
tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname); tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
if (taosStatFile(fname, &size, NULL)) { if (taosStatFile(fname, &size, NULL)) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
if (size != tsdbLogicToFileSize(pSet->pHeadF->size, pTsdb->pVnode->config.tsdbPageSize)) { if (size != tsdbLogicToFileSize(pSet->pHeadF->size, pTsdb->pVnode->config.tsdbPageSize)) {
code = TSDB_CODE_FILE_CORRUPTED; code = TSDB_CODE_FILE_CORRUPTED;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
// data ========= // data =========
tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname); tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
if (taosStatFile(fname, &size, NULL)) { if (taosStatFile(fname, &size, NULL)) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
if (size < tsdbLogicToFileSize(pSet->pDataF->size, pTsdb->pVnode->config.tsdbPageSize)) { if (size < tsdbLogicToFileSize(pSet->pDataF->size, pTsdb->pVnode->config.tsdbPageSize)) {
code = TSDB_CODE_FILE_CORRUPTED; code = TSDB_CODE_FILE_CORRUPTED;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} else if (size > tsdbLogicToFileSize(pSet->pDataF->size, pTsdb->pVnode->config.tsdbPageSize)) {
code = tsdbDFileRollback(pTsdb, pSet, TSDB_DATA_FILE);
if (code) goto _err;
} }
// else if (size > tsdbLogicToFileSize(pSet->pDataF->size, pTsdb->pVnode->config.tsdbPageSize)) {
// code = tsdbDFileRollback(pTsdb, pSet, TSDB_DATA_FILE);
// TSDB_CHECK_CODE(code, lino, _exit);
// }
// sma ============= // sma =============
tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname); tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
if (taosStatFile(fname, &size, NULL)) { if (taosStatFile(fname, &size, NULL)) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
if (size < tsdbLogicToFileSize(pSet->pSmaF->size, pTsdb->pVnode->config.tsdbPageSize)) { if (size < tsdbLogicToFileSize(pSet->pSmaF->size, pTsdb->pVnode->config.tsdbPageSize)) {
code = TSDB_CODE_FILE_CORRUPTED; code = TSDB_CODE_FILE_CORRUPTED;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} else if (size > tsdbLogicToFileSize(pSet->pSmaF->size, pTsdb->pVnode->config.tsdbPageSize)) {
code = tsdbDFileRollback(pTsdb, pSet, TSDB_SMA_FILE);
if (code) goto _err;
} }
// else if (size > tsdbLogicToFileSize(pSet->pSmaF->size, pTsdb->pVnode->config.tsdbPageSize)) {
// code = tsdbDFileRollback(pTsdb, pSet, TSDB_SMA_FILE);
// TSDB_CHECK_CODE(code, lino, _exit);
// }
// stt =========== // stt ===========
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname); tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname);
if (taosStatFile(fname, &size, NULL)) { if (taosStatFile(fname, &size, NULL)) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
if (size != tsdbLogicToFileSize(pSet->aSttF[iStt]->size, pTsdb->pVnode->config.tsdbPageSize)) { if (size != tsdbLogicToFileSize(pSet->aSttF[iStt]->size, pTsdb->pVnode->config.tsdbPageSize)) {
code = TSDB_CODE_FILE_CORRUPTED; code = TSDB_CODE_FILE_CORRUPTED;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
} }
...@@ -346,10 +249,11 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) { ...@@ -346,10 +249,11 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
// remove those invalid files (todo) // remove those invalid files (todo)
} }
return code; _exit:
if (code) {
_err: tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code),
tsdbError("vgId:%d, tsdb scan and try fix fs failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); fid);
}
return code; return code;
} }
...@@ -363,141 +267,531 @@ int32_t tDFileSetCmprFn(const void *p1, const void *p2) { ...@@ -363,141 +267,531 @@ int32_t tDFileSetCmprFn(const void *p1, const void *p2) {
return 0; return 0;
} }
static int32_t tsdbRecoverFS(STsdb *pTsdb, uint8_t *pData, int64_t nData) { static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) {
SVnode *pVnode = pTsdb->pVnode;
if (pVnode->pTfs) {
if (current) {
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP,
pTsdb->path, TD_DIRSEP);
}
if (current_t) {
snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT.t", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP,
pTsdb->path, TD_DIRSEP);
}
} else {
if (current) {
snprintf(current, TSDB_FILENAME_LEN - 1, "%s%sCURRENT", pTsdb->path, TD_DIRSEP);
}
if (current_t) {
snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%sCURRENT.t", pTsdb->path, TD_DIRSEP);
}
}
}
static int32_t tsdbLoadFSFromFile(const char *fname, STsdbFS *pFS) {
int32_t code = 0; int32_t code = 0;
int8_t hasDel; int32_t lino = 0;
uint32_t nSet; uint8_t *pData = NULL;
int32_t n = 0;
// version // load binary
n += tGetI8(pData + n, NULL); TdFilePtr pFD = taosOpenFile(fname, TD_FILE_READ);
if (pFD == NULL) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
// SDelFile int64_t size;
n += tGetI8(pData + n, &hasDel); if (taosFStatFile(pFD, &size, NULL) < 0) {
if (hasDel) { code = TAOS_SYSTEM_ERROR(errno);
pTsdb->fs.pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile)); taosCloseFile(&pFD);
if (pTsdb->fs.pDelFile == NULL) { TSDB_CHECK_CODE(code, lino, _exit);
}
pData = taosMemoryMalloc(size);
if (pData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (taosReadFile(pFD, pData, size) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (!taosCheckChecksumWhole(pData, size)) {
code = TSDB_CODE_FILE_CORRUPTED;
taosCloseFile(&pFD);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&pFD);
// decode binary
code = tsdbBinaryToFS(pData, size, pFS);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (pData) taosMemoryFree(pData);
if (code) {
tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
}
return code;
}
static int32_t tsdbRemoveFileSet(STsdb *pTsdb, SDFileSet *pSet) {
int32_t code = 0;
char fname[TSDB_FILENAME_LEN] = {0};
int32_t nRef = atomic_sub_fetch_32(&pSet->pHeadF->nRef, 1);
if (nRef == 0) {
tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
(void)taosRemoveFile(fname);
taosMemoryFree(pSet->pHeadF);
}
nRef = atomic_sub_fetch_32(&pSet->pDataF->nRef, 1);
if (nRef == 0) {
tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
taosRemoveFile(fname);
taosMemoryFree(pSet->pDataF);
}
nRef = atomic_sub_fetch_32(&pSet->pSmaF->nRef, 1);
if (nRef == 0) {
tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
taosRemoveFile(fname);
taosMemoryFree(pSet->pSmaF);
}
for (int8_t iStt = 0; iStt < pSet->nSttF; iStt++) {
nRef = atomic_sub_fetch_32(&pSet->aSttF[iStt]->nRef, 1);
if (nRef == 0) {
tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname);
taosRemoveFile(fname);
taosMemoryFree(pSet->aSttF[iStt]);
}
}
_exit:
return code;
}
static int32_t tsdbNewFileSet(STsdb *pTsdb, SDFileSet *pSetTo, SDFileSet *pSetFrom) {
int32_t code = 0;
int32_t lino = 0;
*pSetTo = (SDFileSet){.diskId = pSetFrom->diskId, .fid = pSetFrom->fid, .nSttF = 0};
// head
pSetTo->pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
if (pSetTo->pHeadF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
*pSetTo->pHeadF = *pSetFrom->pHeadF;
pSetTo->pHeadF->nRef = 1;
// data
pSetTo->pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
if (pSetTo->pDataF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
*pSetTo->pDataF = *pSetFrom->pDataF;
pSetTo->pDataF->nRef = 1;
// sma
pSetTo->pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
if (pSetTo->pSmaF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
*pSetTo->pSmaF = *pSetFrom->pSmaF;
pSetTo->pSmaF->nRef = 1;
// stt
for (int32_t iStt = 0; iStt < pSetFrom->nSttF; iStt++) {
pSetTo->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
if (pSetTo->aSttF[iStt] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pSetTo->nSttF++;
*pSetTo->aSttF[iStt] = *pSetFrom->aSttF[iStt];
pSetTo->aSttF[iStt]->nRef = 1;
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSetNew) {
int32_t code = 0;
int32_t lino = 0;
int32_t nRef = 0;
bool sameDisk = ((pSetOld->diskId.level == pSetNew->diskId.level) && (pSetOld->diskId.id == pSetNew->diskId.id));
char fname[TSDB_FILENAME_LEN] = {0};
// head
SHeadFile *pHeadF = pSetOld->pHeadF;
if ((!sameDisk) || (pHeadF->commitID != pSetNew->pHeadF->commitID)) {
pSetOld->pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
if (pSetOld->pHeadF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
*pSetOld->pHeadF = *pSetNew->pHeadF;
pSetOld->pHeadF->nRef = 1;
nRef = atomic_sub_fetch_32(&pHeadF->nRef, 1);
if (nRef == 0) {
tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pHeadF, fname);
(void)taosRemoveFile(fname);
taosMemoryFree(pHeadF);
}
} else {
nRef = pHeadF->nRef;
*pHeadF = *pSetNew->pHeadF;
pHeadF->nRef = nRef;
}
// data
SDataFile *pDataF = pSetOld->pDataF;
if ((!sameDisk) || (pDataF->commitID != pSetNew->pDataF->commitID)) {
pSetOld->pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
if (pSetOld->pDataF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
*pSetOld->pDataF = *pSetNew->pDataF;
pSetOld->pDataF->nRef = 1;
nRef = atomic_sub_fetch_32(&pDataF->nRef, 1);
if (nRef == 0) {
tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pDataF, fname);
(void)taosRemoveFile(fname);
taosMemoryFree(pDataF);
}
} else {
nRef = pDataF->nRef;
*pDataF = *pSetNew->pDataF;
pDataF->nRef = nRef;
}
// sma
SSmaFile *pSmaF = pSetOld->pSmaF;
if ((!sameDisk) || (pSmaF->commitID != pSetNew->pSmaF->commitID)) {
pSetOld->pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
if (pSetOld->pSmaF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; TSDB_CHECK_CODE(code, lino, _exit);
}
*pSetOld->pSmaF = *pSetNew->pSmaF;
pSetOld->pSmaF->nRef = 1;
nRef = atomic_sub_fetch_32(&pSmaF->nRef, 1);
if (nRef == 0) {
tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSmaF, fname);
(void)taosRemoveFile(fname);
taosMemoryFree(pSmaF);
}
} else {
nRef = pSmaF->nRef;
*pSmaF = *pSetNew->pSmaF;
pSmaF->nRef = nRef;
}
// stt
if (sameDisk) {
if (pSetNew->nSttF > pSetOld->nSttF) {
ASSERT(pSetNew->nSttF == pSetOld->nSttF + 1);
pSetOld->aSttF[pSetOld->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
if (pSetOld->aSttF[pSetOld->nSttF] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
*pSetOld->aSttF[pSetOld->nSttF] = *pSetNew->aSttF[pSetOld->nSttF];
pSetOld->aSttF[pSetOld->nSttF]->nRef = 1;
pSetOld->nSttF++;
} else if (pSetNew->nSttF < pSetOld->nSttF) {
ASSERT(pSetNew->nSttF == 1);
for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
SSttFile *pSttFile = pSetOld->aSttF[iStt];
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
if (nRef == 0) {
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
taosRemoveFile(fname);
taosMemoryFree(pSttFile);
}
pSetOld->aSttF[iStt] = NULL;
}
pSetOld->nSttF = 1;
pSetOld->aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
if (pSetOld->aSttF[0] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
*pSetOld->aSttF[0] = *pSetNew->aSttF[0];
pSetOld->aSttF[0]->nRef = 1;
} else {
for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
if (pSetOld->aSttF[iStt]->commitID != pSetNew->aSttF[iStt]->commitID) {
SSttFile *pSttFile = pSetOld->aSttF[iStt];
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
if (nRef == 0) {
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
taosRemoveFile(fname);
taosMemoryFree(pSttFile);
}
pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
if (pSetOld->aSttF[iStt] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
*pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt];
pSetOld->aSttF[iStt]->nRef = 1;
} else {
ASSERT(pSetOld->aSttF[iStt]->size == pSetOld->aSttF[iStt]->size);
ASSERT(pSetOld->aSttF[iStt]->offset == pSetOld->aSttF[iStt]->offset);
}
}
} }
} else {
for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
SSttFile *pSttFile = pSetOld->aSttF[iStt];
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
if (nRef == 0) {
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
(void)taosRemoveFile(fname);
taosMemoryFree(pSttFile);
}
}
pSetOld->nSttF = 0;
for (int32_t iStt = 0; iStt < pSetNew->nSttF; iStt++) {
pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
if (pSetOld->aSttF[iStt] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
*pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt];
pSetOld->aSttF[iStt]->nRef = 1;
pSetOld->nSttF++;
}
}
if (!sameDisk) {
pSetOld->diskId = pSetNew->diskId;
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tsdbFSApplyChange(STsdb *pTsdb, STsdbFS *pFS) {
int32_t code = 0;
int32_t lino = 0;
int32_t nRef = 0;
char fname[TSDB_FILENAME_LEN] = {0};
pTsdb->fs.pDelFile->nRef = 1; // SDelFile
n += tGetDelFile(pData + n, pTsdb->fs.pDelFile); if (pFS->pDelFile) {
SDelFile *pDelFile = pTsdb->fs.pDelFile;
if (pDelFile == NULL || (pDelFile->commitID != pFS->pDelFile->commitID)) {
pTsdb->fs.pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile));
if (pTsdb->fs.pDelFile == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
*pTsdb->fs.pDelFile = *pFS->pDelFile;
pTsdb->fs.pDelFile->nRef = 1;
if (pDelFile) {
nRef = atomic_sub_fetch_32(&pDelFile->nRef, 1);
if (nRef == 0) {
tsdbDelFileName(pTsdb, pDelFile, fname);
(void)taosRemoveFile(fname);
taosMemoryFree(pDelFile);
}
}
}
} else { } else {
pTsdb->fs.pDelFile = NULL; ASSERT(pTsdb->fs.pDelFile == NULL);
} }
// SArray<SDFileSet> // aDFileSet
taosArrayClear(pTsdb->fs.aDFileSet); int32_t iOld = 0;
n += tGetU32v(pData + n, &nSet); int32_t iNew = 0;
for (uint32_t iSet = 0; iSet < nSet; iSet++) { while (true) {
SDFileSet fSet = {0}; int32_t nOld = taosArrayGetSize(pTsdb->fs.aDFileSet);
int32_t nNew = taosArrayGetSize(pFS->aDFileSet);
SDFileSet fSet = {0};
int8_t sameDisk = 0;
if (iOld >= nOld && iNew >= nNew) break;
SDFileSet *pSetOld = (iOld < nOld) ? taosArrayGet(pTsdb->fs.aDFileSet, iOld) : NULL;
SDFileSet *pSetNew = (iNew < nNew) ? taosArrayGet(pFS->aDFileSet, iNew) : NULL;
if (pSetOld && pSetNew) {
if (pSetOld->fid == pSetNew->fid) {
code = tsdbMergeFileSet(pTsdb, pSetOld, pSetNew);
TSDB_CHECK_CODE(code, lino, _exit);
iOld++;
iNew++;
} else if (pSetOld->fid < pSetNew->fid) {
code = tsdbRemoveFileSet(pTsdb, pSetOld);
TSDB_CHECK_CODE(code, lino, _exit);
taosArrayRemove(pTsdb->fs.aDFileSet, iOld);
} else {
code = tsdbNewFileSet(pTsdb, &fSet, pSetNew);
TSDB_CHECK_CODE(code, lino, _exit)
if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
int32_t nt = tGetDFileSet(pData + n, &fSet); iOld++;
if (nt < 0) { iNew++;
code = TSDB_CODE_OUT_OF_MEMORY; }
goto _err; } else if (pSetOld) {
} code = tsdbRemoveFileSet(pTsdb, pSetOld);
TSDB_CHECK_CODE(code, lino, _exit);
taosArrayRemove(pTsdb->fs.aDFileSet, iOld);
} else {
code = tsdbNewFileSet(pTsdb, &fSet, pSetNew);
TSDB_CHECK_CODE(code, lino, _exit)
n += nt; if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (taosArrayPush(pTsdb->fs.aDFileSet, &fSet) == NULL) { iOld++;
code = TSDB_CODE_OUT_OF_MEMORY; iNew++;
goto _err;
} }
} }
ASSERT(n + sizeof(TSCKSUM) == nData); _exit:
return code; if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
_err: }
return code; return code;
} }
// EXPOSED APIS ==================================================================================== // EXPOSED APIS ====================================================================================
int32_t tsdbFSOpen(STsdb *pTsdb) { int32_t tsdbFSCommit(STsdb *pTsdb) {
int32_t code = 0; int32_t code = 0;
SVnode *pVnode = pTsdb->pVnode; int32_t lino = 0;
STsdbFS fs = {0};
// open handle char current[TSDB_FILENAME_LEN] = {0};
pTsdb->fs.pDelFile = NULL; char current_t[TSDB_FILENAME_LEN] = {0};
pTsdb->fs.aDFileSet = taosArrayInit(0, sizeof(SDFileSet)); tsdbGetCurrentFName(pTsdb, current, current_t);
if (pTsdb->fs.aDFileSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
// load fs or keep empty if (!taosCheckExistFile(current_t)) goto _exit;
char fname[TSDB_FILENAME_LEN];
if (pVnode->pTfs) { // rename the file
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, if (taosRenameFile(current_t, current) < 0) {
pTsdb->path, TD_DIRSEP); code = TAOS_SYSTEM_ERROR(errno);
} else { TSDB_CHECK_CODE(code, lino, _exit);
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%sCURRENT", pTsdb->path, TD_DIRSEP);
} }
if (!taosCheckExistFile(fname)) { // Load the new FS
// empty one code = tsdbFSCreate(&fs);
code = tsdbGnrtCurrent(pTsdb, &pTsdb->fs, fname); TSDB_CHECK_CODE(code, lino, _exit);
if (code) goto _err;
} else {
// read
TdFilePtr pFD = taosOpenFile(fname, TD_FILE_READ);
if (pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
int64_t size; code = tsdbLoadFSFromFile(current, &fs);
if (taosFStatFile(pFD, &size, NULL) < 0) { TSDB_CHECK_CODE(code, lino, _exit);
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFD);
goto _err;
}
uint8_t *pData = taosMemoryMalloc(size); // apply file change
if (pData == NULL) { code = tsdbFSApplyChange(pTsdb, &fs);
code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit);
taosCloseFile(&pFD);
goto _err;
}
int64_t n = taosReadFile(pFD, pData, size); _exit:
if (n < 0) { tsdbFSDestroy(&fs);
code = TAOS_SYSTEM_ERROR(errno); if (code) {
taosMemoryFree(pData); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
taosCloseFile(&pFD); }
goto _err; return code;
} }
if (!taosCheckChecksumWhole(pData, size)) { int32_t tsdbFSRollback(STsdb *pTsdb) {
code = TSDB_CODE_FILE_CORRUPTED; int32_t code = 0;
taosMemoryFree(pData); int32_t lino = 0;
taosCloseFile(&pFD);
goto _err;
}
taosCloseFile(&pFD); char current_t[TSDB_FILENAME_LEN] = {0};
tsdbGetCurrentFName(pTsdb, NULL, current_t);
(void)taosRemoveFile(current_t);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(errno));
}
return code;
}
// recover fs int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback) {
code = tsdbRecoverFS(pTsdb, pData, size); int32_t code = 0;
if (code) { int32_t lino = 0;
taosMemoryFree(pData); SVnode *pVnode = pTsdb->pVnode;
goto _err;
// open handle
code = tsdbFSCreate(&pTsdb->fs);
TSDB_CHECK_CODE(code, lino, _exit);
// open impl
char current[TSDB_FILENAME_LEN] = {0};
char current_t[TSDB_FILENAME_LEN] = {0};
tsdbGetCurrentFName(pTsdb, current, current_t);
if (taosCheckExistFile(current)) {
code = tsdbLoadFSFromFile(current, &pTsdb->fs);
TSDB_CHECK_CODE(code, lino, _exit);
if (taosCheckExistFile(current_t)) {
if (rollback) {
code = tsdbFSRollback(pTsdb);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tsdbFSCommit(pTsdb);
TSDB_CHECK_CODE(code, lino, _exit);
}
} }
} else {
// empty one
code = tsdbSaveFSToFile(&pTsdb->fs, current);
TSDB_CHECK_CODE(code, lino, _exit);
taosMemoryFree(pData); ASSERT(!rollback);
} }
// scan and fix FS // scan and fix FS
code = tsdbScanAndTryFixFS(pTsdb); code = tsdbScanAndTryFixFS(pTsdb);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
return code; _exit:
if (code) {
_err: tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d, tsdb fs open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); }
return code; return code;
} }
...@@ -538,19 +832,24 @@ int32_t tsdbFSClose(STsdb *pTsdb) { ...@@ -538,19 +832,24 @@ int32_t tsdbFSClose(STsdb *pTsdb) {
int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
pFS->pDelFile = NULL; pFS->pDelFile = NULL;
pFS->aDFileSet = taosArrayInit(taosArrayGetSize(pTsdb->fs.aDFileSet), sizeof(SDFileSet)); if (pFS->aDFileSet) {
if (pFS->aDFileSet == NULL) { taosArrayClear(pFS->aDFileSet);
code = TSDB_CODE_OUT_OF_MEMORY; } else {
goto _exit; pFS->aDFileSet = taosArrayInit(taosArrayGetSize(pTsdb->fs.aDFileSet), sizeof(SDFileSet));
if (pFS->aDFileSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
} }
if (pTsdb->fs.pDelFile) { if (pTsdb->fs.pDelFile) {
pFS->pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile)); pFS->pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile));
if (pFS->pDelFile == NULL) { if (pFS->pDelFile == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; TSDB_CHECK_CODE(code, lino, _exit);
} }
*pFS->pDelFile = *pTsdb->fs.pDelFile; *pFS->pDelFile = *pTsdb->fs.pDelFile;
...@@ -564,7 +863,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { ...@@ -564,7 +863,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
if (fSet.pHeadF == NULL) { if (fSet.pHeadF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; TSDB_CHECK_CODE(code, lino, _exit);
} }
*fSet.pHeadF = *pSet->pHeadF; *fSet.pHeadF = *pSet->pHeadF;
...@@ -572,7 +871,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { ...@@ -572,7 +871,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile)); fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
if (fSet.pDataF == NULL) { if (fSet.pDataF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; TSDB_CHECK_CODE(code, lino, _exit);
} }
*fSet.pDataF = *pSet->pDataF; *fSet.pDataF = *pSet->pDataF;
...@@ -580,7 +879,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { ...@@ -580,7 +879,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile)); fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
if (fSet.pSmaF == NULL) { if (fSet.pSmaF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; TSDB_CHECK_CODE(code, lino, _exit);
} }
*fSet.pSmaF = *pSet->pSmaF; *fSet.pSmaF = *pSet->pSmaF;
...@@ -589,26 +888,21 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { ...@@ -589,26 +888,21 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
fSet.aSttF[fSet.nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); fSet.aSttF[fSet.nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
if (fSet.aSttF[fSet.nSttF] == NULL) { if (fSet.aSttF[fSet.nSttF] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; TSDB_CHECK_CODE(code, lino, _exit);
} }
*fSet.aSttF[fSet.nSttF] = *pSet->aSttF[fSet.nSttF]; *fSet.aSttF[fSet.nSttF] = *pSet->aSttF[fSet.nSttF];
} }
if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) { if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
_exit: _exit:
return code; if (code) {
} tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
}
int32_t tsdbFSRollback(STsdbFS *pFS) {
int32_t code = 0;
ASSERT(0);
return code; return code;
} }
...@@ -714,336 +1008,21 @@ _exit: ...@@ -714,336 +1008,21 @@ _exit:
return code; return code;
} }
int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFSNew) { int32_t tsdbFSPrepareCommit(STsdb *pTsdb, STsdbFS *pFSNew) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
char tfname[TSDB_FILENAME_LEN]; char tfname[TSDB_FILENAME_LEN];
char fname[TSDB_FILENAME_LEN];
snprintf(tfname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT.t", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, tsdbGetCurrentFName(pTsdb, NULL, tfname);
pTsdb->path, TD_DIRSEP);
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP,
pTsdb->path, TD_DIRSEP);
// gnrt CURRENT.t // gnrt CURRENT.t
code = tsdbGnrtCurrent(pTsdb, pFSNew, tfname); code = tsdbSaveFSToFile(pFSNew, tfname);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
// rename _exit:
code = taosRenameFile(tfname, fname);
if (code) { if (code) {
code = TAOS_SYSTEM_ERROR(code); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
goto _err;
}
return code;
_err:
tsdbError("vgId:%d, tsdb fs commit phase 1 failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
int32_t code = 0;
int32_t nRef;
char fname[TSDB_FILENAME_LEN];
// del
if (pFSNew->pDelFile) {
SDelFile *pDelFile = pTsdb->fs.pDelFile;
if (pDelFile == NULL || (pDelFile->commitID != pFSNew->pDelFile->commitID)) {
pTsdb->fs.pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile));
if (pTsdb->fs.pDelFile == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
*pTsdb->fs.pDelFile = *pFSNew->pDelFile;
pTsdb->fs.pDelFile->nRef = 1;
if (pDelFile) {
nRef = atomic_sub_fetch_32(&pDelFile->nRef, 1);
if (nRef == 0) {
tsdbDelFileName(pTsdb, pDelFile, fname);
taosRemoveFile(fname);
taosMemoryFree(pDelFile);
}
}
}
} else {
ASSERT(pTsdb->fs.pDelFile == NULL);
}
// data
int32_t iOld = 0;
int32_t iNew = 0;
while (true) {
int32_t nOld = taosArrayGetSize(pTsdb->fs.aDFileSet);
int32_t nNew = taosArrayGetSize(pFSNew->aDFileSet);
SDFileSet fSet;
int8_t sameDisk;
if (iOld >= nOld && iNew >= nNew) break;
SDFileSet *pSetOld = (iOld < nOld) ? taosArrayGet(pTsdb->fs.aDFileSet, iOld) : NULL;
SDFileSet *pSetNew = (iNew < nNew) ? taosArrayGet(pFSNew->aDFileSet, iNew) : NULL;
if (pSetOld && pSetNew) {
if (pSetOld->fid == pSetNew->fid) {
goto _merge_old_and_new;
} else if (pSetOld->fid < pSetNew->fid) {
goto _remove_old;
} else {
goto _add_new;
}
} else if (pSetOld) {
goto _remove_old;
} else {
goto _add_new;
}
_merge_old_and_new:
sameDisk = ((pSetOld->diskId.level == pSetNew->diskId.level) && (pSetOld->diskId.id == pSetNew->diskId.id));
// head
fSet.pHeadF = pSetOld->pHeadF;
if ((!sameDisk) || (pSetOld->pHeadF->commitID != pSetNew->pHeadF->commitID)) {
pSetOld->pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
if (pSetOld->pHeadF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
*pSetOld->pHeadF = *pSetNew->pHeadF;
pSetOld->pHeadF->nRef = 1;
nRef = atomic_sub_fetch_32(&fSet.pHeadF->nRef, 1);
if (nRef == 0) {
tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pHeadF, fname);
(void)taosRemoveFile(fname);
taosMemoryFree(fSet.pHeadF);
}
} else {
ASSERT(fSet.pHeadF->size == pSetNew->pHeadF->size);
ASSERT(fSet.pHeadF->offset == pSetNew->pHeadF->offset);
}
// data
fSet.pDataF = pSetOld->pDataF;
if ((!sameDisk) || (pSetOld->pDataF->commitID != pSetNew->pDataF->commitID)) {
pSetOld->pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
if (pSetOld->pDataF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
*pSetOld->pDataF = *pSetNew->pDataF;
pSetOld->pDataF->nRef = 1;
nRef = atomic_sub_fetch_32(&fSet.pDataF->nRef, 1);
if (nRef == 0) {
tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pDataF, fname);
taosRemoveFile(fname);
taosMemoryFree(fSet.pDataF);
}
} else {
ASSERT(pSetOld->pDataF->size <= pSetNew->pDataF->size);
pSetOld->pDataF->size = pSetNew->pDataF->size;
}
// sma
fSet.pSmaF = pSetOld->pSmaF;
if ((!sameDisk) || (pSetOld->pSmaF->commitID != pSetNew->pSmaF->commitID)) {
pSetOld->pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
if (pSetOld->pSmaF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
*pSetOld->pSmaF = *pSetNew->pSmaF;
pSetOld->pSmaF->nRef = 1;
nRef = atomic_sub_fetch_32(&fSet.pSmaF->nRef, 1);
if (nRef == 0) {
tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pSmaF, fname);
(void)taosRemoveFile(fname);
taosMemoryFree(fSet.pSmaF);
}
} else {
ASSERT(pSetOld->pSmaF->size <= pSetNew->pSmaF->size);
pSetOld->pSmaF->size = pSetNew->pSmaF->size;
}
// stt
if (sameDisk) {
if (pSetNew->nSttF > pSetOld->nSttF) {
ASSERT(pSetNew->nSttF == pSetOld->nSttF + 1);
pSetOld->aSttF[pSetOld->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
if (pSetOld->aSttF[pSetOld->nSttF] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
*pSetOld->aSttF[pSetOld->nSttF] = *pSetNew->aSttF[pSetOld->nSttF];
pSetOld->aSttF[pSetOld->nSttF]->nRef = 1;
pSetOld->nSttF++;
} else if (pSetNew->nSttF < pSetOld->nSttF) {
ASSERT(pSetNew->nSttF == 1);
for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
SSttFile *pSttFile = pSetOld->aSttF[iStt];
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
if (nRef == 0) {
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
taosRemoveFile(fname);
taosMemoryFree(pSttFile);
}
pSetOld->aSttF[iStt] = NULL;
}
pSetOld->nSttF = 1;
pSetOld->aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
if (pSetOld->aSttF[0] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
*pSetOld->aSttF[0] = *pSetNew->aSttF[0];
pSetOld->aSttF[0]->nRef = 1;
} else {
for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
if (pSetOld->aSttF[iStt]->commitID != pSetNew->aSttF[iStt]->commitID) {
SSttFile *pSttFile = pSetOld->aSttF[iStt];
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
if (nRef == 0) {
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
taosRemoveFile(fname);
taosMemoryFree(pSttFile);
}
pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
if (pSetOld->aSttF[iStt] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
*pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt];
pSetOld->aSttF[iStt]->nRef = 1;
} else {
ASSERT(pSetOld->aSttF[iStt]->size == pSetOld->aSttF[iStt]->size);
ASSERT(pSetOld->aSttF[iStt]->offset == pSetOld->aSttF[iStt]->offset);
}
}
}
} else {
ASSERT(pSetOld->nSttF == pSetNew->nSttF);
for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
SSttFile *pSttFile = pSetOld->aSttF[iStt];
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
if (nRef == 0) {
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
taosRemoveFile(fname);
taosMemoryFree(pSttFile);
}
pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
if (pSetOld->aSttF[iStt] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
*pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt];
pSetOld->aSttF[iStt]->nRef = 1;
}
}
if (!sameDisk) {
pSetOld->diskId = pSetNew->diskId;
}
iOld++;
iNew++;
continue;
_remove_old:
nRef = atomic_sub_fetch_32(&pSetOld->pHeadF->nRef, 1);
if (nRef == 0) {
tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pHeadF, fname);
(void)taosRemoveFile(fname);
taosMemoryFree(pSetOld->pHeadF);
}
nRef = atomic_sub_fetch_32(&pSetOld->pDataF->nRef, 1);
if (nRef == 0) {
tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pDataF, fname);
taosRemoveFile(fname);
taosMemoryFree(pSetOld->pDataF);
}
nRef = atomic_sub_fetch_32(&pSetOld->pSmaF->nRef, 1);
if (nRef == 0) {
tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pSmaF, fname);
taosRemoveFile(fname);
taosMemoryFree(pSetOld->pSmaF);
}
for (int8_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
nRef = atomic_sub_fetch_32(&pSetOld->aSttF[iStt]->nRef, 1);
if (nRef == 0) {
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->aSttF[iStt], fname);
taosRemoveFile(fname);
taosMemoryFree(pSetOld->aSttF[iStt]);
}
}
taosArrayRemove(pTsdb->fs.aDFileSet, iOld);
continue;
_add_new:
fSet = (SDFileSet){.diskId = pSetNew->diskId, .fid = pSetNew->fid, .nSttF = 1};
// head
fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
if (fSet.pHeadF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
*fSet.pHeadF = *pSetNew->pHeadF;
fSet.pHeadF->nRef = 1;
// data
fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
if (fSet.pDataF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
*fSet.pDataF = *pSetNew->pDataF;
fSet.pDataF->nRef = 1;
// sma
fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
if (fSet.pSmaF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
*fSet.pSmaF = *pSetNew->pSmaF;
fSet.pSmaF->nRef = 1;
// stt
ASSERT(pSetNew->nSttF == 1);
fSet.aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
if (fSet.aSttF[0] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
*fSet.aSttF[0] = *pSetNew->aSttF[0];
fSet.aSttF[0]->nRef = 1;
if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
iOld++;
iNew++;
continue;
} }
return code;
_err:
tsdbError("vgId:%d, tsdb fs commit phase 2 failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
......
...@@ -33,7 +33,7 @@ int32_t tsdbSetKeepCfg(STsdb *pTsdb, STsdbCfg *pCfg) { ...@@ -33,7 +33,7 @@ int32_t tsdbSetKeepCfg(STsdb *pTsdb, STsdbCfg *pCfg) {
* @param dir * @param dir
* @return int * @return int
*/ */
int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKeepCfg) { int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKeepCfg, int8_t rollback) {
STsdb *pTsdb = NULL; STsdb *pTsdb = NULL;
int slen = 0; int slen = 0;
...@@ -66,7 +66,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee ...@@ -66,7 +66,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
} }
// open tsdb // open tsdb
if (tsdbFSOpen(pTsdb) < 0) { if (tsdbFSOpen(pTsdb, rollback) < 0) {
goto _err; goto _err;
} }
......
...@@ -86,12 +86,12 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) { ...@@ -86,12 +86,12 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
} }
// do change fs // do change fs
code = tsdbFSCommit1(pTsdb, &fs); code = tsdbFSPrepareCommit(pTsdb, &fs);
if (code) goto _err; if (code) goto _err;
taosThreadRwlockWrlock(&pTsdb->rwLock); taosThreadRwlockWrlock(&pTsdb->rwLock);
code = tsdbFSCommit2(pTsdb, &fs); code = tsdbFSCommit(pTsdb);
if (code) { if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock); taosThreadRwlockUnlock(&pTsdb->rwLock);
goto _err; goto _err;
......
...@@ -1380,13 +1380,13 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { ...@@ -1380,13 +1380,13 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
code = tsdbSnapWriteDelEnd(pWriter); code = tsdbSnapWriteDelEnd(pWriter);
if (code) goto _err; if (code) goto _err;
code = tsdbFSCommit1(pWriter->pTsdb, &pWriter->fs); code = tsdbFSPrepareCommit(pWriter->pTsdb, &pWriter->fs);
if (code) goto _err; if (code) goto _err;
// lock // lock
taosThreadRwlockWrlock(&pTsdb->rwLock); taosThreadRwlockWrlock(&pTsdb->rwLock);
code = tsdbFSCommit2(pWriter->pTsdb, &pWriter->fs); code = tsdbFSCommit(pWriter->pTsdb);
if (code) { if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock); taosThreadRwlockUnlock(&pTsdb->rwLock);
goto _err; goto _err;
......
...@@ -20,8 +20,6 @@ ...@@ -20,8 +20,6 @@
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo); static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo);
static int vnodeStartCommit(SVnode *pVnode);
static int vnodeEndCommit(SVnode *pVnode);
static int vnodeCommitImpl(void *arg); static int vnodeCommitImpl(void *arg);
static void vnodeWaitCommit(SVnode *pVnode); static void vnodeWaitCommit(SVnode *pVnode);
...@@ -215,6 +213,8 @@ int vnodeSyncCommit(SVnode *pVnode) { ...@@ -215,6 +213,8 @@ int vnodeSyncCommit(SVnode *pVnode) {
} }
int vnodeCommit(SVnode *pVnode) { int vnodeCommit(SVnode *pVnode) {
int32_t code = 0;
int32_t lino = 0;
SVnodeInfo info = {0}; SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN]; char dir[TSDB_FILENAME_LEN];
...@@ -234,8 +234,8 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -234,8 +234,8 @@ int vnodeCommit(SVnode *pVnode) {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path); snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
} }
if (vnodeSaveInfo(dir, &info) < 0) { if (vnodeSaveInfo(dir, &info) < 0) {
vError("vgId:%d, failed to save vnode info since %s", TD_VID(pVnode), tstrerror(terrno)); code = terrno;
return -1; TSDB_CHECK_CODE(code, lino, _exit);
} }
walBeginSnapshot(pVnode->pWal, pVnode->state.applied); walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
...@@ -249,8 +249,8 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -249,8 +249,8 @@ int vnodeCommit(SVnode *pVnode) {
// commit each sub-system // commit each sub-system
if (metaCommit(pVnode->pMeta) < 0) { if (metaCommit(pVnode->pMeta) < 0) {
vError("vgId:%d, failed to commit meta since %s", TD_VID(pVnode), tstrerror(terrno)); code = TSDB_CODE_FAILED;
return -1; TSDB_CHECK_CODE(code, lino, _exit);
} }
if (VND_IS_RSMA(pVnode)) { if (VND_IS_RSMA(pVnode)) {
...@@ -259,22 +259,26 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -259,22 +259,26 @@ int vnodeCommit(SVnode *pVnode) {
return -1; return -1;
} }
} else { } else {
if (tsdbCommit(pVnode->pTsdb) < 0) { code = tsdbCommit(pVnode->pTsdb);
vError("vgId:%d, failed to commit tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); TSDB_CHECK_CODE(code, lino, _exit);
return -1;
}
} }
if (tqCommit(pVnode->pTq) < 0) { if (tqCommit(pVnode->pTq) < 0) {
vError("vgId:%d, failed to commit tq since %s", TD_VID(pVnode), tstrerror(terrno)); code = TSDB_CODE_FAILED;
return -1; TSDB_CHECK_CODE(code, lino, _exit);
} }
// walCommit (TODO)
// commit info // commit info
if (vnodeCommitInfo(dir, &info) < 0) { if (vnodeCommitInfo(dir, &info) < 0) {
vError("vgId:%d, failed to commit vnode info since %s", TD_VID(pVnode), tstrerror(terrno)); code = terrno;
return -1; TSDB_CHECK_CODE(code, lino, _exit);
}
tsdbFinishCommit(pVnode->pTsdb);
if (metaFinishCommit(pVnode->pMeta) < 0) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
} }
pVnode->state.committed = info.state.committed; pVnode->state.committed = info.state.committed;
...@@ -287,11 +291,31 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -287,11 +291,31 @@ int vnodeCommit(SVnode *pVnode) {
// apply the commit (TODO) // apply the commit (TODO)
walEndSnapshot(pVnode->pWal); walEndSnapshot(pVnode->pWal);
vInfo("vgId:%d, commit end", TD_VID(pVnode)); _exit:
if (code) {
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
} else {
vInfo("vgId:%d, commit end", TD_VID(pVnode));
}
return 0; return 0;
} }
bool vnodeShouldRollback(SVnode *pVnode) {
char tFName[TSDB_FILENAME_LEN] = {0};
snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP,
VND_INFO_FNAME_TMP);
return taosCheckExistFile(tFName);
}
void vnodeRollback(SVnode *pVnode) {
char tFName[TSDB_FILENAME_LEN] = {0};
snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP,
VND_INFO_FNAME_TMP);
(void)taosRemoveFile(tFName);
}
static int vnodeCommitImpl(void *arg) { static int vnodeCommitImpl(void *arg) {
SVnode *pVnode = (SVnode *)arg; SVnode *pVnode = (SVnode *)arg;
...@@ -304,16 +328,6 @@ static int vnodeCommitImpl(void *arg) { ...@@ -304,16 +328,6 @@ static int vnodeCommitImpl(void *arg) {
return 0; return 0;
} }
static int vnodeStartCommit(SVnode *pVnode) {
// TODO
return 0;
}
static int vnodeEndCommit(SVnode *pVnode) {
// TODO
return 0;
}
static FORCE_INLINE void vnodeWaitCommit(SVnode *pVnode) { tsem_wait(&pVnode->canCommit); } static FORCE_INLINE void vnodeWaitCommit(SVnode *pVnode) { tsem_wait(&pVnode->canCommit); }
static int vnodeEncodeState(const void *pObj, SJson *pJson) { static int vnodeEncodeState(const void *pObj, SJson *pJson) {
......
...@@ -110,6 +110,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -110,6 +110,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
taosThreadMutexInit(&pVnode->mutex, NULL); taosThreadMutexInit(&pVnode->mutex, NULL);
taosThreadCondInit(&pVnode->poolNotEmpty, NULL); taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
int8_t rollback = vnodeShouldRollback(pVnode);
// open buffer pool // open buffer pool
if (vnodeOpenBufPool(pVnode) < 0) { if (vnodeOpenBufPool(pVnode) < 0) {
vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
...@@ -117,19 +119,19 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -117,19 +119,19 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
} }
// open meta // open meta
if (metaOpen(pVnode, &pVnode->pMeta) < 0) { if (metaOpen(pVnode, &pVnode->pMeta, rollback) < 0) {
vError("vgId:%d, failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open tsdb // open tsdb
if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL) < 0) { if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback) < 0) {
vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open sma // open sma
if (smaOpen(pVnode)) { if (smaOpen(pVnode, rollback)) {
vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
...@@ -153,14 +155,12 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -153,14 +155,12 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
goto _err; goto _err;
} }
#if !VNODE_AS_LIB
// open query // open query
if (vnodeQueryOpen(pVnode)) { if (vnodeQueryOpen(pVnode)) {
vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno));
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
#endif
// vnode begin // vnode begin
if (vnodeBegin(pVnode) < 0) { if (vnodeBegin(pVnode) < 0) {
...@@ -169,13 +169,15 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -169,13 +169,15 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
goto _err; goto _err;
} }
#if !VNODE_AS_LIB
// open sync // open sync
if (vnodeSyncOpen(pVnode, dir)) { if (vnodeSyncOpen(pVnode, dir)) {
vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
#endif
if (rollback) {
vnodeRollback(pVnode);
}
return pVnode; return pVnode;
......
...@@ -27,7 +27,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF ...@@ -27,7 +27,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
char* streamPath = taosMemoryCalloc(1, len); char* streamPath = taosMemoryCalloc(1, len);
sprintf(streamPath, "%s/%s", path, "stream"); sprintf(streamPath, "%s/%s", path, "stream");
pMeta->path = strdup(streamPath); pMeta->path = strdup(streamPath);
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) { if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
taosMemoryFree(streamPath); taosMemoryFree(streamPath);
goto _err; goto _err;
} }
...@@ -36,11 +36,11 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF ...@@ -36,11 +36,11 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
taosMulModeMkDir(streamPath, 0755); taosMulModeMkDir(streamPath, 0755);
taosMemoryFree(streamPath); taosMemoryFree(streamPath);
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) { if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
goto _err; goto _err;
} }
if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb) < 0) { if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) {
goto _err; goto _err;
} }
......
...@@ -99,26 +99,26 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int ...@@ -99,26 +99,26 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
memset(statePath, 0, 300); memset(statePath, 0, 300);
tstrncpy(statePath, path, 300); tstrncpy(statePath, path, 300);
} }
if (tdbOpen(statePath, szPage, pages, &pState->db) < 0) { if (tdbOpen(statePath, szPage, pages, &pState->db, 0) < 0) {
goto _err; goto _err;
} }
// open state storage backend // open state storage backend
if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->db, &pState->pStateDb) < 0) { if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->db, &pState->pStateDb, 0) < 0) {
goto _err; goto _err;
} }
// todo refactor // todo refactor
if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->db, &pState->pFillStateDb) < 0) { if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->db, &pState->pFillStateDb, 0) < 0) {
goto _err; goto _err;
} }
if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->db, if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->db,
&pState->pSessionStateDb) < 0) { &pState->pSessionStateDb, 0) < 0) {
goto _err; goto _err;
} }
if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->db, &pState->pFuncStateDb) < 0) { if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->db, &pState->pFuncStateDb, 0) < 0) {
goto _err; goto _err;
} }
......
...@@ -31,15 +31,17 @@ typedef struct STBC TBC; ...@@ -31,15 +31,17 @@ typedef struct STBC TBC;
typedef struct STxn TXN; typedef struct STxn TXN;
// TDB // TDB
int32_t tdbOpen(const char *dbname, int szPage, int pages, TDB **ppDb); int32_t tdbOpen(const char *dbname, int szPage, int pages, TDB **ppDb, int8_t rollback);
int32_t tdbClose(TDB *pDb); int32_t tdbClose(TDB *pDb);
int32_t tdbBegin(TDB *pDb, TXN *pTxn); int32_t tdbBegin(TDB *pDb, TXN *pTxn);
int32_t tdbCommit(TDB *pDb, TXN *pTxn); int32_t tdbCommit(TDB *pDb, TXN *pTxn);
int32_t tdbPostCommit(TDB *pDb, TXN *pTxn);
int32_t tdbAbort(TDB *pDb, TXN *pTxn); int32_t tdbAbort(TDB *pDb, TXN *pTxn);
int32_t tdbAlter(TDB *pDb, int pages); int32_t tdbAlter(TDB *pDb, int pages);
// TTB // TTB
int32_t tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb); int32_t tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb,
int8_t rollback);
int32_t tdbTbClose(TTB *pTb); int32_t tdbTbClose(TTB *pTb);
int32_t tdbTbDrop(TTB *pTb); int32_t tdbTbDrop(TTB *pTb);
int32_t tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn); int32_t tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn);
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "tdbInt.h" #include "tdbInt.h"
int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb) { int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb, int8_t rollback) {
TDB *pDb; TDB *pDb;
int dsize; int dsize;
int zsize; int zsize;
...@@ -66,7 +66,7 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb) { ...@@ -66,7 +66,7 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb) {
#ifdef USE_MAINDB #ifdef USE_MAINDB
// open main db // open main db
ret = tdbTbOpen(TDB_MAINDB_NAME, -1, sizeof(SBtInfo), NULL, pDb, &pDb->pMainDb); ret = tdbTbOpen(TDB_MAINDB_NAME, -1, sizeof(SBtInfo), NULL, pDb, &pDb->pMainDb, rollback);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
...@@ -129,6 +129,21 @@ int32_t tdbCommit(TDB *pDb, TXN *pTxn) { ...@@ -129,6 +129,21 @@ int32_t tdbCommit(TDB *pDb, TXN *pTxn) {
return 0; return 0;
} }
int32_t tdbPostCommit(TDB *pDb, TXN *pTxn) {
SPager *pPager;
int ret;
for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) {
ret = tdbPagerPostCommit(pPager, pTxn);
if (ret < 0) {
tdbError("failed to commit pager since %s. dbName:%s, txnId:%d", tstrerror(terrno), pDb->dbName, pTxn->txnId);
return -1;
}
}
return 0;
}
int32_t tdbAbort(TDB *pDb, TXN *pTxn) { int32_t tdbAbort(TDB *pDb, TXN *pTxn) {
SPager *pPager; SPager *pPager;
int ret; int ret;
......
...@@ -305,6 +305,18 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { ...@@ -305,6 +305,18 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
return 0; return 0;
} }
int tdbPagerPostCommit(SPager *pPager, TXN *pTxn) {
if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) {
tdbError("failed to remove file due to %s. file:%s", strerror(errno), pPager->jFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pPager->inTran = 0;
return 0;
}
// recovery dirty pages // recovery dirty pages
int tdbPagerAbort(SPager *pPager, TXN *pTxn) { int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
SPage *pPage; SPage *pPage;
...@@ -657,3 +669,13 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) { ...@@ -657,3 +669,13 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
return 0; return 0;
} }
int tdbPagerRollback(SPager *pPager) {
if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) {
tdbError("failed to remove file due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
...@@ -24,7 +24,8 @@ struct STBC { ...@@ -24,7 +24,8 @@ struct STBC {
SBTC btc; SBTC btc;
}; };
int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb) { int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb,
int8_t rollback) {
TTB *pTb; TTB *pTb;
SPager *pPager; SPager *pPager;
int ret; int ret;
...@@ -110,10 +111,14 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF ...@@ -110,10 +111,14 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF
return -1; return -1;
} }
ret = tdbPagerRestore(pPager, pTb->pBt); if (rollback) {
if (ret < 0) { tdbPagerRollback(pPager);
tdbOsFree(pTb); } else {
return -1; ret = tdbPagerRestore(pPager, pTb->pBt);
if (ret < 0) {
tdbOsFree(pTb);
return -1;
}
} }
*ppTb = pTb; *ppTb = pTb;
......
...@@ -190,12 +190,14 @@ int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate, SBTree *pBt); ...@@ -190,12 +190,14 @@ int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate, SBTree *pBt);
int tdbPagerWrite(SPager *pPager, SPage *pPage); int tdbPagerWrite(SPager *pPager, SPage *pPage);
int tdbPagerBegin(SPager *pPager, TXN *pTxn); int tdbPagerBegin(SPager *pPager, TXN *pTxn);
int tdbPagerCommit(SPager *pPager, TXN *pTxn); int tdbPagerCommit(SPager *pPager, TXN *pTxn);
int tdbPagerPostCommit(SPager *pPager, TXN *pTxn);
int tdbPagerAbort(SPager *pPager, TXN *pTxn); int tdbPagerAbort(SPager *pPager, TXN *pTxn);
int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg, int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg,
TXN *pTxn); TXN *pTxn);
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn); void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn);
int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno); int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);
int tdbPagerRestore(SPager *pPager, SBTree *pBt); int tdbPagerRestore(SPager *pPager, SBTree *pBt);
int tdbPagerRollback(SPager *pPager);
// tdbPCache.c ==================================== // tdbPCache.c ====================================
#define TDB_PCACHE_PAGE \ #define TDB_PCACHE_PAGE \
......
...@@ -140,7 +140,7 @@ static void generateBigVal(char *val, int valLen) { ...@@ -140,7 +140,7 @@ static void generateBigVal(char *val, int valLen) {
static TDB *openEnv(char const *envName, int const pageSize, int const pageNum) { static TDB *openEnv(char const *envName, int const pageSize, int const pageNum) {
TDB *pEnv = NULL; TDB *pEnv = NULL;
int ret = tdbOpen(envName, pageSize, pageNum, &pEnv); int ret = tdbOpen(envName, pageSize, pageNum, &pEnv, 0);
if (ret) { if (ret) {
pEnv = NULL; pEnv = NULL;
} }
...@@ -162,8 +162,8 @@ static void insertOfp(void) { ...@@ -162,8 +162,8 @@ static void insertOfp(void) {
// open db // open db
TTB *pDb = NULL; TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc = tKeyCmpr; tdb_cmpr_fn_t compFunc = tKeyCmpr;
// ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb); // ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
ret = tdbTbOpen("ofp_insert.db", 12, -1, compFunc, pEnv, &pDb); ret = tdbTbOpen("ofp_insert.db", 12, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
// open the pool // open the pool
...@@ -211,8 +211,8 @@ TEST(TdbOVFLPagesTest, TbGetTest) { ...@@ -211,8 +211,8 @@ TEST(TdbOVFLPagesTest, TbGetTest) {
// open db // open db
TTB *pDb = NULL; TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc = tKeyCmpr; tdb_cmpr_fn_t compFunc = tKeyCmpr;
// int ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb); // int ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
int ret = tdbTbOpen("ofp_insert.db", 12, -1, compFunc, pEnv, &pDb); int ret = tdbTbOpen("ofp_insert.db", 12, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
// generate value payload // generate value payload
...@@ -253,7 +253,7 @@ TEST(TdbOVFLPagesTest, TbDeleteTest) { ...@@ -253,7 +253,7 @@ TEST(TdbOVFLPagesTest, TbDeleteTest) {
// open db // open db
TTB *pDb = NULL; TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc = tKeyCmpr; tdb_cmpr_fn_t compFunc = tKeyCmpr;
ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb); ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
// open the pool // open the pool
...@@ -354,12 +354,12 @@ TEST(tdb_test, simple_insert1) { ...@@ -354,12 +354,12 @@ TEST(tdb_test, simple_insert1) {
taosRemoveDir("tdb"); taosRemoveDir("tdb");
// Open Env // Open Env
ret = tdbOpen("tdb", pageSize, 64, &pEnv); ret = tdbOpen("tdb", pageSize, 64, &pEnv, 0);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
// Create a database // Create a database
compFunc = tKeyCmpr; compFunc = tKeyCmpr;
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb); ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
{ {
......
...@@ -130,12 +130,12 @@ TEST(tdb_test, DISABLED_simple_insert1) { ...@@ -130,12 +130,12 @@ TEST(tdb_test, DISABLED_simple_insert1) {
taosRemoveDir("tdb"); taosRemoveDir("tdb");
// Open Env // Open Env
ret = tdbOpen("tdb", 4096, 64, &pEnv); ret = tdbOpen("tdb", 4096, 64, &pEnv, 0);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
// Create a database // Create a database
compFunc = tKeyCmpr; compFunc = tKeyCmpr;
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb); ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
{ {
...@@ -250,12 +250,12 @@ TEST(tdb_test, DISABLED_simple_insert2) { ...@@ -250,12 +250,12 @@ TEST(tdb_test, DISABLED_simple_insert2) {
taosRemoveDir("tdb"); taosRemoveDir("tdb");
// Open Env // Open Env
ret = tdbOpen("tdb", 1024, 10, &pEnv); ret = tdbOpen("tdb", 1024, 10, &pEnv, 0);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
// Create a database // Create a database
compFunc = tDefaultKeyCmpr; compFunc = tDefaultKeyCmpr;
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb); ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
{ {
...@@ -346,11 +346,11 @@ TEST(tdb_test, DISABLED_simple_delete1) { ...@@ -346,11 +346,11 @@ TEST(tdb_test, DISABLED_simple_delete1) {
pPool = openPool(); pPool = openPool();
// open env // open env
ret = tdbOpen("tdb", 1024, 256, &pEnv); ret = tdbOpen("tdb", 1024, 256, &pEnv, 0);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
// open database // open database
ret = tdbTbOpen("db.db", -1, -1, tKeyCmpr, pEnv, &pDb); ret = tdbTbOpen("db.db", -1, -1, tKeyCmpr, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
tdbTxnOpen(&txn, 0, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); tdbTxnOpen(&txn, 0, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
...@@ -435,11 +435,11 @@ TEST(tdb_test, DISABLED_simple_upsert1) { ...@@ -435,11 +435,11 @@ TEST(tdb_test, DISABLED_simple_upsert1) {
taosRemoveDir("tdb"); taosRemoveDir("tdb");
// open env // open env
ret = tdbOpen("tdb", 4096, 64, &pEnv); ret = tdbOpen("tdb", 4096, 64, &pEnv, 0);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
// open database // open database
ret = tdbTbOpen("db.db", -1, -1, NULL, pEnv, &pDb); ret = tdbTbOpen("db.db", -1, -1, NULL, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
pPool = openPool(); pPool = openPool();
...@@ -497,12 +497,12 @@ TEST(tdb_test, multi_thread_query) { ...@@ -497,12 +497,12 @@ TEST(tdb_test, multi_thread_query) {
taosRemoveDir("tdb"); taosRemoveDir("tdb");
// Open Env // Open Env
ret = tdbOpen("tdb", 4096, 10, &pEnv); ret = tdbOpen("tdb", 4096, 10, &pEnv, 0);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
// Create a database // Create a database
compFunc = tKeyCmpr; compFunc = tKeyCmpr;
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb); ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
char key[64]; char key[64];
...@@ -614,10 +614,10 @@ TEST(tdb_test, DISABLED_multi_thread1) { ...@@ -614,10 +614,10 @@ TEST(tdb_test, DISABLED_multi_thread1) {
taosRemoveDir("tdb"); taosRemoveDir("tdb");
// Open Env // Open Env
ret = tdbOpen("tdb", 512, 1, &pDb); ret = tdbOpen("tdb", 512, 1, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
ret = tdbTbOpen("db.db", -1, -1, NULL, pDb, &pTb); ret = tdbTbOpen("db.db", -1, -1, NULL, pDb, &pTb, 0);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
auto insert = [](TDB *pDb, TTB *pTb, int nData, int *stop, std::shared_timed_mutex *mu) { auto insert = [](TDB *pDb, TTB *pTb, int nData, int *stop, std::shared_timed_mutex *mu) {
...@@ -726,4 +726,4 @@ TEST(tdb_test, DISABLED_multi_thread1) { ...@@ -726,4 +726,4 @@ TEST(tdb_test, DISABLED_multi_thread1) {
ret = tdbClose(pDb); ret = tdbClose(pDb);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
#endif #endif
} }
\ No newline at end of file
...@@ -343,6 +343,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { ...@@ -343,6 +343,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
TdFilePtr pFile = (TdFilePtr)taosMemoryMalloc(sizeof(TdFile)); TdFilePtr pFile = (TdFilePtr)taosMemoryMalloc(sizeof(TdFile));
if (pFile == NULL) { if (pFile == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
if (fd >= 0) close(fd); if (fd >= 0) close(fd);
if (fp != NULL) fclose(fp); if (fp != NULL) fclose(fp);
return NULL; return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册